4.6 THE GEOMETRIC DECOMPOSITION PATTERN
Problem
How can an algorithm be organized around a data structure that has been decomposed into concurrently updatable "chunks"?
Context
Many important problems are best understood as a sequence of operations on a core data structure. There may be other work in the computation, but an effective understanding of the full computation can be obtained by understanding how the core data structures are updated. For these types of problems, often the best way to represent the concurrency is in terms of decompositions of these core data structures. (This form of concurrency is sometimes known as domain decomposition, or coarse-grained data parallelism.)
The way these data structures are built is fundamental to the algorithm. If the data structure is recursive, any analysis of the concurrency must take this recursion into account. For recursive data structures, the Recursive Data and Divide and Conquer patterns are likely candidates. For arrays and other linear data structures, we can often reduce the problem to potentially concurrent components by decomposing the data structure into contiguous substructures, in a manner analogous to dividing a geometric region into subregions; hence the name Geometric Decomposition. For arrays, this decomposition is along one or more dimensions, and the resulting subarrays are usually called blocks. We will use the term chunks for the substructures or subregions, to allow for the possibility of more general data structures, such as graphs.
This decomposition of data into chunks then implies a decomposition of the update operation into tasks, where each task represents the update of one chunk, and the tasks execute concurrently. If the computations are strictly local, that is, all required information is within the chunk, the concurrency is embarrassingly parallel and the simpler Task Parallelism pattern should be used. In many cases, however, the update requires information from points in other chunks (frequently from what we can call neighboring chunks: chunks containing data that was nearby in the original global data structure). In these cases, information must be shared between chunks to complete the update.
Example: mesh-computation program
The problem is to model 1D heat diffusion (that is, diffusion of heat along an infinitely narrow pipe). Initially, the whole pipe is at a stable and fixed temperature. At time 0, we set both ends to different temperatures, which will remain fixed throughout the computation. We then calculate how temperatures change in the rest of the pipe over time. (What we expect is that the temperatures will converge to a smooth gradient from one end of the pipe to the other.) Mathematically, the problem is to solve a 1D differential equation representing heat diffusion:

∂U/∂t = ∂²U/∂x²
The approach used is to discretize the problem space (representing U by a one-dimensional array and computing values for a sequence of discrete time steps). We will output values for each time step as they are computed, so we need only save values for U for two time steps; we will call these arrays uk (U at the timestep k) and ukp1 (U at timestep k + 1). At each time step, we then need to compute for each point in array ukp1 the following:
ukp1[i]=uk[i]+ (dt/(dx*dx))*(uk[i+1]-2*uk[i]+uk[i-1]);
Variables dt and dx represent the intervals between discrete time steps and between discrete points, respectively.
Observe that what is being computed is a new value for variable ukp1 at each point, based on data at that point and its left and right neighbors.
We can begin to design a parallel algorithm for this problem by decomposing the arrays uk and ukp1 into contiguous subarrays (the chunks described earlier). These chunks can be operated on concurrently, giving us exploitable concurrency. Notice that we have a situation in which some elements can be updated using only data from within the chunk, while others require data from neighboring chunks, as illustrated by Fig. 4.8.
Figure 4.8 Data dependencies in the heat-equation problem. Solid boxes indicate the element being updated; shaded boxes the elements containing needed data.
Example: matrix-multiplication program
Consider the multiplication of two square matrices (that is, compute C = A · B). As discussed in [FJL+88], the matrices can be decomposed into blocks. The summations in the definition of matrix multiplication are likewise organized into blocks, allowing us to write a blockwise matrix multiplication equation

C_ij = Σ_k (A_ik · B_kj)
where at each step in the summation, we compute the matrix product Aik · Bkj and add it to the running matrix sum.
This equation immediately implies a solution in terms of the Geometric Decomposition pattern; that is, one in which the algorithm is based on decomposing the data structure into chunks (square blocks here) that can be operated on concurrently.
To help visualize this algorithm more clearly, consider the case where we decompose all three matrices into square blocks with each task "owning" corresponding blocks of A, B, and C. Each task will run through the sum over k to compute its block of C, with tasks receiving blocks from other tasks as needed. In Fig. 4.9, we illustrate two steps in this process showing a block being updated (the solid block) and the matrix blocks required at two different steps (the shaded blocks), where blocks of the A matrix are passed across a row and blocks of the B matrix are passed around a column.
Figure 4.9 Data dependencies in the matrix-multiplication problem. Solid boxes indicate the "chunk" being updated (C); shaded boxes indicate the chunks of A (row) and B (column) required to update C at each of the two steps.
Forces
- To exploit the potential concurrency in the problem, we must assign chunks of the decomposed data structure to UEs. Ideally, we want to do this in a way that is simple, portable, scalable, and efficient. As noted in Section 4.1, however, these goals may conflict. A key consideration is balancing the load, that is, ensuring that all UEs have roughly the same amount of work to do.
- We must also ensure that the data required for the update of each chunk is present when needed. This problem is somewhat analogous to the problem of managing data dependencies in the Task Parallelism pattern, and again the design must keep in mind the sometimes-conflicting goals of simplicity, portability, scalability, and efficiency.
Solution
Designs for problems that fit this pattern involve the following key elements:
- partitioning the global data structure into substructures or "chunks" (the data decomposition),
- ensuring that each task has access to all the data it needs to perform the update operation for its chunk (the exchange operation),
- updating the chunks (the update operation), and
- mapping chunks to UEs in a way that gives good performance (the data distribution and task schedule).
Data decomposition
The granularity of the data decomposition has a significant impact on the efficiency of the program. In a coarse-grained decomposition, there are a smaller number of large chunks. This results in a smaller number of large messages, which can greatly reduce communication overhead. A fine-grained decomposition, on the other hand, results in a larger number of smaller chunks, in many cases leading to many more chunks than PEs. This results in a larger number of smaller messages (and hence increases communication overhead), but it greatly facilitates load balancing.
Although it might be possible in some cases to mathematically derive an optimum granularity for the data decomposition, programmers usually experiment with a range of chunk sizes to empirically determine the best size for a given system. This depends, of course, on the computational performance of the PEs and on the performance characteristics of the communication network. Therefore, the program should be implemented so that the granularity is controlled by parameters that can be easily changed at compile or runtime.
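As a minimal sketch of this advice (the CHUNK_SIZE macro and the command-line override are illustrative, not part of any example in this chapter), the granularity might be exposed as a parameter that can be changed without touching the rest of the code:

#include <stdio.h>
#include <stdlib.h>

/* Hypothetical sketch: expose chunk granularity as a parameter that can be
   set at compile time (-DCHUNK_SIZE=...) or overridden on the command line. */
#ifndef CHUNK_SIZE
#define CHUNK_SIZE 256          /* default granularity */
#endif

int main(int argc, char *argv[]) {
    int chunkSize = CHUNK_SIZE;
    if (argc > 1) chunkSize = atoi(argv[1]);    /* runtime override */
    printf("decomposing the global array into chunks of %d elements\n", chunkSize);
    /* ... partition the data structure using chunkSize ... */
    return 0;
}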
The shape of the chunks can also affect the amount of communication needed between tasks. Often, the data to share between tasks is limited to the boundaries of the chunks. In this case, the amount of shared information scales with the surface area of the chunks. Because the computation scales with the number of points within a chunk, it scales as the volume of the region. This surface-to-volume effect can be exploited to maximize the ratio of computation to communication. Therefore, higher-dimensional decompositions are usually preferred. For example, consider two different decompositions of an N by N matrix into four chunks. In one case, we decompose the problem into four column chunks of size N by N/4. In the second case, we decompose the problem into four square chunks of size N/2 by N/2. For the column block decomposition, the surface area is 2N + 2(N/4) or 5N/2. For the square chunk case, the surface area is 4(N/2) or 2N. Hence, the total amount of data that must be exchanged is less for the square chunk decomposition.
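The arithmetic above can be checked with a small throwaway program; the mesh size used here is illustrative only:

#include <stdio.h>

/* Illustrative check of the surface-to-volume argument: for an N-by-N mesh
   split into four chunks, compare the boundary (communication) size per chunk
   for column chunks (N by N/4) versus square chunks (N/2 by N/2). The interior
   (computation) size is N*N/4 in both cases. */
int main(void) {
    int N = 1000;                           /* illustrative mesh size */
    int columnBoundary = 2*N + 2*(N/4);     /* = 5N/2, per the text    */
    int squareBoundary = 4*(N/2);           /* = 2N, per the text      */
    printf("column chunks: %d boundary points per chunk\n", columnBoundary);
    printf("square chunks: %d boundary points per chunk\n", squareBoundary);
    return 0;
}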
In some cases, the preferred shape of the decomposition can be dictated by other concerns. It may be the case, for example, that existing sequential code can be more easily reused with a lower-dimensional decomposition, and the potential increase in performance is not worth the effort of reworking the code. Also, an instance of this pattern can be used as a sequential step in a larger computation. If the decomposition used in an adjacent step differs from the optimal one for this pattern in isolation, it may or may not be worthwhile to redistribute the data for this step. This is especially an issue in distributed-memory systems where redistributing the data can require significant communication that will delay the computation. Therefore, data decomposition decisions must take into account the capability to reuse sequential code and the need to interface with other steps in the computation. Notice that these considerations might lead to a decomposition that would be suboptimal under other circumstances.
Communication can often be more effectively managed by replicating the nonlocal data needed to update the data in a chunk. For example, if the data structure is an array representing the points on a mesh and the update operation uses a local neighborhood of points on the mesh, a common communication-management technique is to surround the data structure for the block with a ghost boundary to contain duplicates of data at the boundaries of neighboring blocks. So now each chunk has two parts: a primary copy owned by the UE (that will be updated directly) and zero or more ghost copies (also referred to as shadow copies). These ghost copies provide two benefits. First, their use may consolidate communication into potentially fewer, larger messages. On latency-sensitive networks, this can greatly reduce communication overhead. Second, communication of the ghost copies can be overlapped (that is, it can be done concurrently) with the update of parts of the array that don't depend on data within the ghost copy. In essence, this hides the communication cost behind useful computation, thereby reducing the observed communication overhead.
For example, in the case of the mesh-computation example discussed earlier, each of the chunks would be extended by one cell on each side. These extra cells would be used as ghost copies of the cells on the boundaries of the chunks. Fig. 4.10 illustrates this scheme.
Figure 4.10 A data distribution with ghost boundaries. Shaded cells are ghost copies; arrows point from primary copies to corresponding secondary copies.
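A minimal sketch of this layout for the 1D mesh (the variable names are illustrative; the MPI example later in this section allocates its chunks in the same way):

#include <stdlib.h>

int main(void) {
    int chunkSize = 25;   /* illustrative number of primary cells per chunk */
    /* one ghost cell at each end, as in Fig. 4.10 */
    double *chunk = malloc(sizeof(double) * (chunkSize + 2));
    /* chunk[1] .. chunk[chunkSize]  : primary copies owned by this UE      */
    /* chunk[0], chunk[chunkSize+1]  : ghost copies of the neighbors' edges */
    free(chunk);
    return 0;
}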
The exchange operation
A key factor in using this pattern correctly is ensuring that nonlocal data required for the update operation is obtained before it is needed.
If all the data needed is present before the beginning of the update operation, the simplest approach is to perform the entire exchange before beginning the update, storing the required nonlocal data in a local data structure designed for that purpose (for example, the ghost boundary in a mesh computation). This approach is relatively straightforward to implement using either copying or message passing.
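For instance, in a shared-memory setting the whole exchange can be a pair of copies into the ghost cells; the sketch below assumes the ghost-cell layout shown earlier and uses illustrative names:

#include <stddef.h>

/* Illustrative sketch only: before the update begins, copy each neighbor's
   edge value into this chunk's ghost cells (chunk[0] and chunk[chunkSize+1]).
   A NULL neighbor marks a physical boundary with no ghost data to fill. */
void exchange_ghosts(double *chunk, int chunkSize,
                     const double *leftNeighbor, const double *rightNeighbor) {
    if (leftNeighbor != NULL)
        chunk[0] = leftNeighbor[chunkSize];       /* left ghost cell  */
    if (rightNeighbor != NULL)
        chunk[chunkSize + 1] = rightNeighbor[1];  /* right ghost cell */
}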
More sophisticated approaches in which computation and communication overlap are also possible. Such approaches are necessary if some data needed for the update is not initially available, and may improve performance in other cases as well. In the mesh-computation example, for instance, the exchange of ghost cells and the update of cells in the interior region (which do not depend on the ghost cells) can proceed concurrently. After the exchange is complete, the boundary layer (the values that do depend on the ghost cells) can be updated. On systems where communication and computation occur in parallel, the savings from such an approach can be significant. This is such a common feature of parallel algorithms that standard communication APIs (such as MPI) include whole classes of message-passing routines to overlap computation and communication. These are discussed in more detail in the MPI appendix.
The low-level details of how the exchange operation is implemented can have a large impact on efficiency. Programmers should seek out optimized implementations of communication patterns used in their programs. In many applications, for example, the collective communication routines in message-passing libraries such as MPI are useful. These have been carefully optimized using techniques beyond the ability of many parallel programmers (we discuss some of these in Sec. 6.4.2) and should be used whenever possible.
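As one hedged illustration (not the code used in the examples below), the row-wise distribution of A blocks in the matrix-multiplication example could be expressed with a collective broadcast over a row communicator; MPI_Comm_split and MPI_Bcast are standard MPI routines, while the function and variable names here are ours:

#include <mpi.h>

/* Sketch: broadcast this phase's block of A to all processes in the same row
   of the process grid, using a collective instead of explicit send/receive
   loops. myID_i/myID_j are the process's row/column indices and kb is the
   current phase; the root must already have copied its block of A into Abuffer. */
void broadcast_A_block(double *Abuffer, int count, int myID_i, int myID_j, int kb) {
    MPI_Comm rowComm;
    /* processes sharing a row index end up in the same communicator,
       ranked by column index, so rank kb within rowComm is the block's owner */
    MPI_Comm_split(MPI_COMM_WORLD, myID_i, myID_j, &rowComm);
    MPI_Bcast(Abuffer, count, MPI_DOUBLE, kb, rowComm);
    MPI_Comm_free(&rowComm);
}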
The update operation
Updating the data structure is done by executing the corresponding tasks (each responsible for the update of one chunk of the data structures) concurrently. If all the needed data is present at the beginning of the update operation, and if none of this data is modified during the course of the update, parallelization is easier and more likely to be efficient.
If the required exchange of information has been performed before beginning the update operation, the update itself is usually straightforward to implement; it is essentially identical to the analogous update in an equivalent sequential program, particularly if good choices have been made about how to represent nonlocal data. If the exchange and update operations overlap, more care is needed to ensure that the update is performed correctly. If a system supports lightweight threads that are well integrated with the communication system, then overlap can be achieved via multithreading within a single task, with one thread computing while another handles communication. In this case, synchronization between the threads is required.
In some systems, for example MPI, nonblocking communication is supported by matching communication primitives: one to start the communication (without blocking), and the other (blocking) to complete the operation and use the results. For maximal overlap, communication should be started as soon as possible, and completed as late as possible. Sometimes, operations can be reordered to allow more overlap without changing the algorithm semantics.
Data distribution and task scheduling
The final step in designing a parallel algorithm for a problem that fits this pattern is deciding how to map the collection of tasks (each corresponding to the update of one chunk) to UEs. Each UE can then be said to "own" a collection of chunks and the data they contain. Thus, we have a two-tiered scheme for distributing data among UEs: partitioning the data into chunks and then assigning these chunks to UEs. This scheme is flexible enough to represent a variety of popular schemes for distributing data among UEs.
In the simplest case, each task can be statically assigned to a separate UE; then all tasks can execute concurrently, and the intertask coordination needed to implement the exchange operation is straightforward. This approach is most appropriate when the computation times of the tasks are uniform and the exchange operation has been implemented to overlap communication and computation within each task.
The simple approach can lead to poor load balance in some situations, however. For example, consider a linear algebra problem in which elements of the matrix are successively eliminated as the computation proceeds. Early in the computation, all the rows and columns of the matrix have numerous elements to work with and decompositions based on assigning full rows or columns to UEs are effective. Later in the computation, however, rows or columns become sparse, the work per row becomes uneven, and the computational load becomes poorly balanced between UEs. The solution is to decompose the problem into many more chunks than there are UEs and to scatter them among the UEs with a cyclic or block-cyclic distribution. (Cyclic and block-cyclic distributions are discussed in the Distributed Array pattern.) Then, as chunks become sparse, there are (with high probability) other nonsparse chunks for any given UE to work on, and the load becomes well balanced. A rule of thumb is that one needs around ten times as many tasks as UEs for this approach to work well.
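A minimal sketch of such a cyclic assignment (the chunk and UE counts are illustrative):

#include <stdio.h>

int main(void) {
    int numUEs = 4;
    int numChunks = 10 * numUEs;   /* rule of thumb: roughly ten times as many chunks as UEs */
    for (int chunk = 0; chunk < numChunks; ++chunk) {
        int owner = chunk % numUEs;        /* cyclic distribution of chunks to UEs */
        printf("chunk %2d -> UE %d\n", chunk, owner);
    }
    return 0;
}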
It is also possible to use dynamic load-balancing algorithms to periodically redistribute the chunks among the UEs to improve the load balance. These algorithms incur overhead that, together with the increased implementation cost, must be traded off against the improvement likely to come from the better load balance. In addition, the resulting program is more complex than those that use one of the static methods. Generally, one should consider the (static) cyclic allocation strategy first.
Program structure
The overall program structure for applications of this pattern will normally use either the Loop Parallelism pattern or the SPMD pattern, with the choice determined largely by the target platform. These patterns are described in the Supporting Structures design space.
Examples
We include two examples with this pattern: a mesh computation and matrix multiplication. The challenges in working with the Geometric Decomposition pattern are best appreciated in the low-level details of the resulting programs. Therefore, even though the techniques used in these programs are not fully developed until much later in the book, we provide full programs in this section rather than high-level descriptions of the solutions.
Mesh computation
This problem is described in the Context section of this pattern. Example 4.11 presents a simple sequential version of a program (some details omitted) that solves the 1D heat-diffusion problem. The program is straightforward, although one detail might need further explanation: After computing new values in ukp1 at each step, conceptually what we want to do is copy them to uk for the next iteration. We avoid a time-consuming actual copy by making uk and ukp1 pointers to their respective arrays and simply swapping them at the end of each step. This causes uk to point to the newly computed values and ukp1 to point to the area to use for computing new values in the next iteration.
Example 4.11. Sequential heat-diffusion program
#include <stdio.h>
#include <stdlib.h>

#define NX 100
#define LEFTVAL 1.0
#define RIGHTVAL 10.0
#define NSTEPS 10000

void initialize(double uk[], double ukp1[]) {
    uk[0] = LEFTVAL; uk[NX-1] = RIGHTVAL;
    for (int i = 1; i < NX-1; ++i) uk[i] = 0.0;
    for (int i = 0; i < NX; ++i) ukp1[i] = uk[i];
}

void printValues(double uk[], int step) { /* NOT SHOWN */ }

int main(void) {
    /* pointers to arrays for two iterations of algorithm */
    double *uk = malloc(sizeof(double) * NX);
    double *ukp1 = malloc(sizeof(double) * NX);
    double *temp;

    double dx = 1.0/NX;
    double dt = 0.5*dx*dx;

    initialize(uk, ukp1);
    for (int k = 0; k < NSTEPS; ++k) {
        /* compute new values */
        for (int i = 1; i < NX-1; ++i) {
            ukp1[i] = uk[i] + (dt/(dx*dx))*(uk[i+1] - 2*uk[i] + uk[i-1]);
        }
        /* "copy" ukp1 to uk by swapping pointers */
        temp = ukp1; ukp1 = uk; uk = temp;
        printValues(uk, k);
    }
    return 0;
}
This program combines a top-level sequential control structure (the time-step loop) with an array-update operation, which can be parallelized using the Geometric Decomposition pattern. We show parallel implementations of this program using OpenMP and MPI.
OpenMP solution
A particularly simple version of the program using OpenMP and the Loop Parallelism pattern is shown in Example 4.12. Because OpenMP is a shared-memory programming model, there is no need to explicitly partition and distribute the two key arrays (uk and ukp1). The creation of the threads and the distribution of the work among the threads are accomplished with the parallel for directive.
#pragma omp parallel for schedule(static)
The schedule(static) clause decomposes the iterations of the parallel loop into one contiguous block per thread, with each block being approximately the same size. This schedule is important for Loop Parallelism programs implementing the Geometric Decomposition pattern. Good performance for most Geometric Decomposition problems (and mesh programs in particular) requires that the data in the processor's cache be used many times before it is displaced by data from new cache lines. Using large blocks of contiguous loop iterations increases the chance that multiple values fetched in a cache line will be utilized and that subsequent loop iterations are likely to find at least some of the required data in cache.
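The contrast can be seen in the following sketch (not one of the example programs): schedule(static) gives each thread one contiguous block of iterations, whereas schedule(static, 1) would deal iterations out cyclically and defeat the cache reuse just described.

#include <stdio.h>
#define NX 100

int main(void) {
    static double uk[NX], ukp1[NX];
    double dx = 1.0/NX, dt = 0.5*dx*dx;

    /* one contiguous block of iterations per thread: good locality */
    #pragma omp parallel for schedule(static)
    for (int i = 1; i < NX-1; ++i)
        ukp1[i] = uk[i] + (dt/(dx*dx))*(uk[i+1] - 2*uk[i] + uk[i-1]);

    /* iterations dealt out one at a time: poor cache reuse for this pattern */
    #pragma omp parallel for schedule(static, 1)
    for (int i = 1; i < NX-1; ++i)
        ukp1[i] = uk[i] + (dt/(dx*dx))*(uk[i+1] - 2*uk[i] + uk[i-1]);

    printf("%f\n", ukp1[NX/2]);
    return 0;
}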
The last detail to discuss for the program in Example 4.12 is the synchronization required to safely copy the pointers. It is essential that all of the threads complete their work before the pointers they manipulate are swapped in preparation for the next iteration. In this program, this synchronization happens automatically due to the implied barrier (see Sec. 6.3.2) at the end of the parallel loop.
The program in Example 4.12 works well with a small number of threads. When large numbers of threads are involved, however, the overhead incurred by placing the thread creation and destruction inside the loop over k would be prohibitive. We can reduce thread-management overhead by splitting the parallel for directive into separate parallel and for directives and moving the thread creation outside the loop over k. This approach is shown in Example 4.13. Because the whole k loop is now inside a parallel region, we must be more careful about how data is shared between threads. The private clause causes the loop indices k and i to be local to each thread. The pointers uk and ukp1 are shared, however, so the swap operation must be protected. The easiest way to do this is to ensure that only one member of the team of threads does the swap. In OpenMP, this is most easily done by placing the update inside a single construct. As described in more detail in the OpenMP appendix, Appendix A, the first thread to encounter the construct will carry out the swap while the other threads wait at the end of the construct.
Example 4.12. Parallel heat-diffusion program using OpenMP
#include <stdio.h>
#include <stdlib.h>

#define NX 100
#define LEFTVAL 1.0
#define RIGHTVAL 10.0
#define NSTEPS 10000

void initialize(double uk[], double ukp1[]) {
    uk[0] = LEFTVAL; uk[NX-1] = RIGHTVAL;
    for (int i = 1; i < NX-1; ++i) uk[i] = 0.0;
    for (int i = 0; i < NX; ++i) ukp1[i] = uk[i];
}

void printValues(double uk[], int step) { /* NOT SHOWN */ }

int main(void) {
    /* pointers to arrays for two iterations of algorithm */
    double *uk = malloc(sizeof(double) * NX);
    double *ukp1 = malloc(sizeof(double) * NX);
    double *temp;

    double dx = 1.0/NX;
    double dt = 0.5*dx*dx;

    initialize(uk, ukp1);
    for (int k = 0; k < NSTEPS; ++k) {
        /* compute new values */
        #pragma omp parallel for schedule(static)
        for (int i = 1; i < NX-1; ++i) {
            ukp1[i] = uk[i] + (dt/(dx*dx))*(uk[i+1] - 2*uk[i] + uk[i-1]);
        }
        /* "copy" ukp1 to uk by swapping pointers */
        temp = ukp1; ukp1 = uk; uk = temp;
        printValues(uk, k);
    }
    return 0;
}
MPI solution
An MPI-based program for this example is shown in Examples 4.14 and 4.15. The program uses a data distribution with ghost cells and the SPMD pattern.
Each process is given a single chunk of the data domain of size NX/NP, where NX is the total size of the global data array and NP is the number of processes. For simplicity, we assume NX is evenly divisible by NP.
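If NX were not evenly divisible by NP, one might instead give the first NX % NP processes one extra point each; the helper below is an illustrative variation, not part of the program shown here:

/* Hypothetical helper: number of points owned by process myID when the total
   number of points does not divide evenly among numProcs processes. */
int chunk_size(int totalPoints, int numProcs, int myID) {
    int base  = totalPoints / numProcs;
    int extra = totalPoints % numProcs;   /* first 'extra' processes get one more point */
    return base + (myID < extra ? 1 : 0);
}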
The update of the chunk is straightforward and essentially identical to that from the sequential code. The length and greater complexity in this MPI program arises from two sources. First, the data initialization is more complex, because it must account for the data values at the edges of the first and last chunks. Second, message-passing routines are required inside the loop over k to exchange ghost cells.
Example 4.13. Parallel heat-diffusion program using OpenMP. This version has less thread-management overhead.
#include <stdio.h>
#include <stdlib.h>
#include <omp.h>

#define NX 100
#define LEFTVAL 1.0
#define RIGHTVAL 10.0
#define NSTEPS 10000

void initialize(double uk[], double ukp1[]) { /* NOT SHOWN */ }
void printValues(double uk[], int step) { /* NOT SHOWN */ }

int main(void) {
    /* pointers to arrays for two iterations of algorithm */
    double *uk = malloc(sizeof(double) * NX);
    double *ukp1 = malloc(sizeof(double) * NX);
    double *temp;
    int i, k;

    double dx = 1.0/NX;
    double dt = 0.5*dx*dx;

    #pragma omp parallel private(k, i)
    {
        initialize(uk, ukp1);
        for (k = 0; k < NSTEPS; ++k) {
            #pragma omp for schedule(static)
            for (i = 1; i < NX-1; ++i) {
                ukp1[i] = uk[i] + (dt/(dx*dx))*(uk[i+1] - 2*uk[i] + uk[i-1]);
            }
            /* "copy" ukp1 to uk by swapping pointers */
            #pragma omp single
            {
                temp = ukp1; ukp1 = uk; uk = temp;
            }
        }
    }
    return 0;
}
The details of the message-passing functions can be found in the MPI appendix, Appendix B. Briefly, transmitting data consists of one process doing a send operation, specifying the buffer containing the data, and another process doing a receive operation, specifying the buffer into which the data should be placed. We need several different pairs of sends and receives because the process that owns the leftmost chunk of the array does not have a left neighbor it needs to communicate with, and similarly the process that owns the rightmost chunk does not have a right neighbor to communicate with.
Example 4.14. Parallel heat-diffusion program using MPI (continued in Example 4.15)
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <mpi.h>

#define NX 100
#define LEFTVAL 1.0
#define RIGHTVAL 10.0
#define NSTEPS 10000

void initialize(double uk[], double ukp1[], int numPoints,
                int numProcs, int myID) {
    for (int i = 1; i <= numPoints; ++i) uk[i] = 0.0;
    /* left endpoint */
    if (myID == 0) uk[1] = LEFTVAL;
    /* right endpoint */
    if (myID == numProcs-1) uk[numPoints] = RIGHTVAL;
    /* copy values to ukp1 */
    for (int i = 1; i <= numPoints; ++i) ukp1[i] = uk[i];
}

void printValues(double uk[], int step, int numPoints, int myID) { /* NOT SHOWN */ }

int main(int argc, char *argv[]) {
    /* pointers to arrays for two iterations of algorithm */
    double *uk, *ukp1, *temp;
    double dx = 1.0/NX;
    double dt = 0.5*dx*dx;
    int numProcs, myID, leftNbr, rightNbr, numPoints;
    MPI_Status status;

    /* MPI initialization */
    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &numProcs);
    MPI_Comm_rank(MPI_COMM_WORLD, &myID);    // get own ID

    /* initialization of other variables */
    leftNbr = myID - 1;       // ID of left "neighbor" process
    rightNbr = myID + 1;      // ID of right "neighbor" process
    numPoints = (NX / numProcs);

    /* uk, ukp1 include a "ghost cell" at each end */
    uk = malloc(sizeof(double) * (numPoints+2));
    ukp1 = malloc(sizeof(double) * (numPoints+2));
    initialize(uk, ukp1, numPoints, numProcs, myID);

    /* continued in next figure */
We could further modify the code in Examples 4.14 and 4.15 to use nonblocking communication to overlap computation and communication, as discussed earlier in this pattern. The first part of the program is unchanged from our first mesh-computation MPI program (that is, Example 4.14). The differences for this case are contained in the second part of the program, which holds the main computation loop. This code is shown in Example 4.16.
Example 4.15. Parallel heat-diffusion program using MPI (continued from Example 4.14)
/* continued from Example 4.14 */

    for (int k = 0; k < NSTEPS; ++k) {
        /* exchange boundary information */
        if (myID != 0)
            MPI_Send(&uk[1], 1, MPI_DOUBLE, leftNbr, 0, MPI_COMM_WORLD);
        if (myID != numProcs-1)
            MPI_Send(&uk[numPoints], 1, MPI_DOUBLE, rightNbr, 0, MPI_COMM_WORLD);
        if (myID != 0)
            MPI_Recv(&uk[0], 1, MPI_DOUBLE, leftNbr, 0, MPI_COMM_WORLD, &status);
        if (myID != numProcs-1)
            MPI_Recv(&uk[numPoints+1], 1, MPI_DOUBLE, rightNbr, 0, MPI_COMM_WORLD, &status);

        /* compute new values for interior points */
        for (int i = 2; i < numPoints; ++i) {
            ukp1[i] = uk[i] + (dt/(dx*dx))*(uk[i+1] - 2*uk[i] + uk[i-1]);
        }
        /* compute new values for boundary points */
        if (myID != 0) {
            int i = 1;
            ukp1[i] = uk[i] + (dt/(dx*dx))*(uk[i+1] - 2*uk[i] + uk[i-1]);
        }
        if (myID != numProcs-1) {
            int i = numPoints;
            ukp1[i] = uk[i] + (dt/(dx*dx))*(uk[i+1] - 2*uk[i] + uk[i-1]);
        }
        /* "copy" ukp1 to uk by swapping pointers */
        temp = ukp1; ukp1 = uk; uk = temp;
        printValues(uk, k, numPoints, myID);
    }

    /* clean up and end */
    MPI_Finalize();
    return 0;
}
While the basic algorithm is the same, the communication is quite different. The immediate-mode communication routines, MPI_Isend and MPI_Irecv, are used to set up and then launch the communication events. These functions (described in more detail in the MPI appendix, Appendix B) return immediately. The update operations on the interior points can then take place because they don't depend on the results of the communication. We then call functions to wait until the communication is complete and update the edges of each UE's chunks using the results of the communication events. In this case, the messages are small in size, so it is unlikely that this version of the program would be any faster than our first one. But it is easy to imagine cases where large, complex communication events would be involved and being able to do useful work while the messages move across the computer network would result in significantly greater performance.
Example 4.16. Parallel heat-diffusion program using MPI with overlapping communication/computation (continued from Example 4.14)
/* continued */

    MPI_Request reqRecvL, reqRecvR, reqSendL, reqSendR;   // needed for nonblocking I/O

    for (int k = 0; k < NSTEPS; ++k) {
        /* initiate communication to exchange boundary information */
        if (myID != 0) {
            MPI_Irecv(&uk[0], 1, MPI_DOUBLE, leftNbr, 0, MPI_COMM_WORLD, &reqRecvL);
            MPI_Isend(&uk[1], 1, MPI_DOUBLE, leftNbr, 0, MPI_COMM_WORLD, &reqSendL);
        }
        if (myID != numProcs-1) {
            MPI_Irecv(&uk[numPoints+1], 1, MPI_DOUBLE, rightNbr, 0, MPI_COMM_WORLD, &reqRecvR);
            MPI_Isend(&uk[numPoints], 1, MPI_DOUBLE, rightNbr, 0, MPI_COMM_WORLD, &reqSendR);
        }
        /* compute new values for interior points */
        for (int i = 2; i < numPoints; ++i) {
            ukp1[i] = uk[i] + (dt/(dx*dx))*(uk[i+1] - 2*uk[i] + uk[i-1]);
        }
        /* wait for communication to complete */
        if (myID != 0) {
            MPI_Wait(&reqRecvL, &status);
            MPI_Wait(&reqSendL, &status);
        }
        if (myID != numProcs-1) {
            MPI_Wait(&reqRecvR, &status);
            MPI_Wait(&reqSendR, &status);
        }
        /* compute new values for boundary points */
        if (myID != 0) {
            int i = 1;
            ukp1[i] = uk[i] + (dt/(dx*dx))*(uk[i+1] - 2*uk[i] + uk[i-1]);
        }
        if (myID != numProcs-1) {
            int i = numPoints;
            ukp1[i] = uk[i] + (dt/(dx*dx))*(uk[i+1] - 2*uk[i] + uk[i-1]);
        }
        /* "copy" ukp1 to uk by swapping pointers */
        temp = ukp1; ukp1 = uk; uk = temp;
        printValues(uk, k, numPoints, myID);
    }

    /* clean up and end */
    MPI_Finalize();
    return 0;
}
Example 4.17. Sequential matrix multiplication
#include <stdio.h>
#include <stdlib.h>

#define N 100
#define NB 4

#define blockstart(M,i,j,rows_per_blk,cols_per_blk,stride) \
    (M + ((i)*(rows_per_blk))*(stride) + (j)*(cols_per_blk))

int main(int argc, char *argv[]) {
    /* matrix dimensions */
    int dimN = N; int dimP = N; int dimM = N;
    /* block dimensions */
    int dimNb = dimN/NB; int dimPb = dimP/NB; int dimMb = dimM/NB;

    /* allocate memory for matrices */
    double *A = malloc(dimN*dimP*sizeof(double));
    double *B = malloc(dimP*dimM*sizeof(double));
    double *C = malloc(dimN*dimM*sizeof(double));

    /* Initialize matrices */
    initialize(A, B, dimN, dimP, dimM);

    /* Do the matrix multiplication */
    for (int ib=0; ib < NB; ++ib) {
        for (int jb=0; jb < NB; ++jb) {
            /* find block[ib][jb] of C */
            double *blockPtr = blockstart(C, ib, jb, dimNb, dimMb, dimM);
            /* clear block[ib][jb] of C (set all elements to zero) */
            matclear(blockPtr, dimNb, dimMb, dimM);
            for (int kb=0; kb < NB; ++kb) {
                /* compute product of block[ib][kb] of A and block[kb][jb] of B
                   and add to block[ib][jb] of C */
                matmul_add(blockstart(A, ib, kb, dimNb, dimPb, dimP),
                           blockstart(B, kb, jb, dimPb, dimMb, dimM),
                           blockPtr,
                           dimNb, dimPb, dimMb, dimP, dimM, dimM);
            }
        }
    }
    /* Code to print results not shown */
    return 0;
}
Matrix multiplication
The matrix multiplication problem is described in the Context section. Example 4.17 presents a simple sequential program to compute the desired result, based on decomposing the N by N matrix into NB*NB square blocks. The notation block[i][j] in comments indicates the (i,j)-th block as described earlier. To simplify the coding in C, we represent the matrices as 1D arrays (internally arranged in row-major order) and define a macro blockstart to find the top-left corner of a submatrix within one of these 1D arrays. We omit code for functions initialize (initialize matrices A and B), printMatrix (print a matrix's values), matclear (clear a matrix, that is, set all values to zero), and matmul_add (compute the matrix product of the two input matrices and add it to the output matrix). Parameters to most of these functions include matrix dimensions, plus a stride that denotes the distance from the start of one row of the matrix to the start of the next and allows us to apply the functions to submatrices as well as to whole matrices.
Example 4.18. Sequential matrix multiplication, revised. We do not show the parts of the program that are not changed from the program in Example 4.17.
/* Declarations, initializations, etc. not shown -- same as first version */

    /* Do the multiply */
    matclear(C, dimN, dimM, dimM);   /* sets all elements to zero */
    for (int kb=0; kb < NB; ++kb) {
        for (int ib=0; ib < NB; ++ib) {
            for (int jb=0; jb < NB; ++jb) {
                /* compute product of block[ib][kb] of A and block[kb][jb] of B
                   and add to block[ib][jb] of C */
                matmul_add(blockstart(A, ib, kb, dimNb, dimPb, dimP),
                           blockstart(B, kb, jb, dimPb, dimMb, dimM),
                           blockstart(C, ib, jb, dimNb, dimMb, dimM),
                           dimNb, dimPb, dimMb, dimP, dimM, dimM);
            }
        }
    }

/* Remaining code is the same as for the first version */
We first observe that we can rearrange the loops without affecting the result of the computation, as shown in Example 4.18.
Observe that with this transformation, we have a program that combines a high-level sequential structure (the loop over kb) with a loop structure (the nested loops over ib and jb) that can be parallelized with the Geometric Decomposition pattern.
OpenMP solution
We can produce a parallel version of this program for a shared-memory environment by parallelizing the inner nested loops (over ib and/or jb) with OpenMP loop directives. As with the mesh example, it is important to keep thread-management overhead small, so once again the parallel directive should appear outside of the loop over kb. A for directive would then be placed prior to one of the inner loops. The issues raised by this algorithm and the resulting source code modifications are essentially the same as those arising from the mesh program example, so we do not show program source code here.
MPI solution
A parallel version of the matrix multiplication program using MPI is shown in Examples 4.19 and 4.20. The natural approach with MPI is to use the SPMD pattern with the Geometric Decomposition pattern. We will use the matrix multiplication algorithm described earlier.
Example 4.19. Parallel matrix multiplication with message passing (continued in Example 4.20)
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <math.h>
#include <mpi.h>

#define N 100

#define blockstart(M,i,j,rows_per_blk,cols_per_blk,stride) \
    (M + ((i)*(rows_per_blk))*(stride) + (j)*(cols_per_blk))

int main(int argc, char *argv[]) {
    /* matrix dimensions */
    int dimN = N; int dimP = N; int dimM = N;
    /* block dimensions */
    int dimNb, dimPb, dimMb;
    /* matrices */
    double *A, *B, *C;
    /* buffers for receiving sections of A, B from other processes */
    double *Abuffer, *Bbuffer;
    int numProcs, myID, myID_i, myID_j, NB;
    MPI_Status status;

    /* MPI initialization */
    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &numProcs);
    MPI_Comm_rank(MPI_COMM_WORLD, &myID);

    /* initialize other variables */
    NB = (int) sqrt((double) numProcs);
    myID_i = myID / NB;
    myID_j = myID % NB;
    dimNb = dimN/NB; dimPb = dimP/NB; dimMb = dimM/NB;
    A = malloc(dimNb*dimPb*sizeof(double));
    B = malloc(dimPb*dimMb*sizeof(double));
    C = malloc(dimNb*dimMb*sizeof(double));
    Abuffer = malloc(dimNb*dimPb*sizeof(double));
    Bbuffer = malloc(dimPb*dimMb*sizeof(double));

    /* Initialize matrices */
    initialize(A, B, dimNb, dimPb, dimPb, dimPb, NB, myID_i, myID_j);

    /* continued in next figure */
The three matrices (A, B, and C) are decomposed into blocks. The UEs (processes in the case of MPI) involved in the computation are organized into a grid such that the indices of the matrix blocks map onto the coordinates of the processes (that is, matrix block (i, j) is associated with the process with row index i and column index j). For simplicity, we assume the number of processes numProcs is a perfect square and its square root evenly divides the order of the matrices (N).
Example 4.20. Parallel matrix multiplication with message passing (continued from Example 4.19)
/* continued from previous figure */

    /* Do the multiply */
    matclear(C, dimNb, dimMb, dimMb);
    for (int kb=0; kb < NB; ++kb) {

        if (myID_j == kb) {
            /* send A to other processes in the same "row" */
            for (int jb=0; jb < NB; ++jb) {
                if (jb != myID_j)
                    MPI_Send(A, dimNb*dimPb, MPI_DOUBLE, myID_i*NB + jb,
                             0, MPI_COMM_WORLD);
            }
            /* copy A to Abuffer */
            memcpy(Abuffer, A, dimNb*dimPb*sizeof(double));
        }
        else {
            MPI_Recv(Abuffer, dimNb*dimPb, MPI_DOUBLE, myID_i*NB + kb,
                     0, MPI_COMM_WORLD, &status);
        }

        if (myID_i == kb) {
            /* send B to other processes in the same "column" */
            for (int ib=0; ib < NB; ++ib) {
                if (ib != myID_i)
                    MPI_Send(B, dimPb*dimMb, MPI_DOUBLE, ib*NB + myID_j,
                             0, MPI_COMM_WORLD);
            }
            /* copy B to Bbuffer */
            memcpy(Bbuffer, B, dimPb*dimMb*sizeof(double));
        }
        else {
            MPI_Recv(Bbuffer, dimPb*dimMb, MPI_DOUBLE, kb*NB + myID_j,
                     0, MPI_COMM_WORLD, &status);
        }

        /* compute product of block[ib][kb] of A and block[kb][jb] of B
           and add to block[ib][jb] of C */
        matmul_add(Abuffer, Bbuffer, C, dimNb, dimPb, dimMb, dimPb, dimMb, dimMb);
    }

    /* Code to print results not shown */

    /* Clean up and end */
    MPI_Finalize();
    return 0;
}
Although the algorithm may seem complex at first, the overall idea is straightforward. The computation proceeds through a number of phases (the loop over kb). At each phase, the process whose column index equals kb sends its block of A across its row of processes. Likewise, the process whose row index equals kb sends its block of B down its column of processes. Following the communication operations, each process then multiplies the A and B blocks it received and sums the result into its block of C. After NB phases, the block of the C matrix on each process will hold the final product.
These types of algorithms are very common when working with MPI. The key to understanding these algorithms is to think in terms of the set of processes, the data owned by each process, and how data from neighboring processes flows among the processes as the calculation unfolds. We revisit these issues in the SPMD and Distributed Array patterns as well as in the MPI appendix.
A great deal of research has been carried out on parallel matrix multiplication and related linear algebra algorithms. A more sophisticated approach, in which the blocks of A and B circulate among processes, arriving at each process just in time to be used, is given in [FJL+88].
Known uses
Most problems involving the solution of differential equations use the Geometric Decomposition pattern. A finite-differencing scheme directly maps onto this pattern. Another class of problems that use this pattern comes from computational linear algebra. The parallel routines in the ScaLAPACK [Sca, BCC+97] library are for the most part based on this pattern. These two classes of problems cover a large portion of all parallel applications in scientific computing.
Related Patterns
If the update required for each chunk can be done without data from other chunks, then this pattern reduces to the embarrassingly parallel algorithm described in the Task Parallelism pattern. As an example of such a computation, consider computing a 2D FFT (Fast Fourier Transform) by first applying a 1D FFT to each row of the matrix and then applying a 1D FFT to each column. Although the decomposition may appear data-based (by rows/by columns), in fact the computation consists of two instances of the Task Parallelism pattern.
If the data structure to be distributed is recursive in nature, then the Divide and Conquer or Recursive Data pattern may be applicable.