Simplifying Cluster (PVM and MPI) Application Programming Using Interface Classes: Part 2
Introduction
In part 1 of this series, we introduced the Parallel Virtual Machine (PVM) library, one of the early standards used for parallel and cluster application programming. Our goal in part 1 was to show how interface classes can be used to simplify the PVM's message-passing interface and how simplifying that interface allows us to concentrate on and clarify the parallel programming logic required in the application. In this article (part 2), we dig deeper into our interface and stream classes and see how they can be applied to the Message Passing Interface (MPI) system.
While the PVM system was one of the early standards for parallel and cluster-based programming, the MPI system is the current and de facto standard. We include both standards in our discussion because they're both in heavy use. The PVM has a small number of library routines and for certain tasks is easier to use than most MPI implementations. The MPI has a large comprehensive API (more than 100 functions). It has language bindings for C, C++, and Fortran. For large-scale parallel or cluster-based applications, MPI is the system of choice.
The following table shows the general categories of functions that the MPI standard contains.
Category of MPI Function | Routines for ...
Process management | Creating and managing MPI processes
Remote memory operations | Directly accessing memory in another process
Communication | Transferring a sequence of contiguous identical elements in memory
Communicators | Dividing processes into groups
User-defined datatypes/packing and unpacking | Transferring noncontiguous memory locations in a single communication
Environment management | Setting up, executing, entering, and leaving the MPI environment
File management | Managing files
A basic MPI program consists of two or more processes that work together to solve some problem or perform some task. Each process belongs to one or more communication domains. A communication domain is a shared structure that allows processes in the same group (or groups of processes) to communicate with each other. Each communication domain is represented by a communicator. The size of a communication domain is determined by the number of processes that belong to it. Processes in the same communication domain are ordered and are identified by an integer, referred to as the rank of the MPI process. Following are the most fundamental operations of any MPI process:
- Call the MPI_Init() function for the process
- Determine the rank of the process with the MPI_Comm_rank() function
- Determine the size of the communication domain with the MPI_Comm_size() function
- Send and/or receive messages from other processes (MPI_Send() and MPI_Recv() functions)
- Call the MPI_Finalize() function for the process
Figure 1 shows the anatomy of a basic MPI program.
Figure 1 The anatomy of a basic MPI program.
The send and receive operations in Figure 1 occur within or between communication domains. Communication within a single communication domain uses an intracommunicator. Communication between communication domains uses an intercommunicator. MPI programs start with a call to MPI_Init() and finish with a call to MPI_Finalize().
Although the MPI standard has a C++ binding, its C++ binding doesn't provide mpi_stream classes. The MPI standard presents the same problems as the PVM standard. The programmer has to fiddle with and worry about the correct datatypes during send and receive operations. The setup of the send and receive operation can be tedious if many datatypes are involved. Trying to send and receive user-defined datatypes between cluster nodes is even more cumbersome. The tedious code required for setting up the send and receive operations can muddy the logic flow in those parts of the program that use parallelism. This is especially the case if communications between the nodes are highly interactive. Parallel programming is challenging enough; it's advantageous to clarify the logic wherever we can. The C++ stream metaphor helps to simplify input and output operations. We want to maintain the stream metaphor in our MPI programs. The introduction of mpi_stream classes is a step in the right direction.
The MPI standard has a family of SEND and RECV functions. The following tables show the most commonly used functions.
#include "mpi.h"
MPI Send Routine | Description
int MPI_Send(void *Buffer,int Count,MPI_Datatype Type, ... | Performs a basic send
int MPI_Send_init(void *Buffer,int Count,MPI_Datatype Type, ... | Initializes a handle for a standard send
int MPI_Ssend(void *Buffer,int Count,MPI_Datatype Type, ... | Performs a basic synchronous send
int MPI_Ssend_init(void *Buffer,int Count,MPI_Datatype Type, ... | Initializes a handle for a synchronous send
int MPI_Rsend(void *Buffer,int Count,MPI_Datatype Type, ... | Performs a basic ready send
int MPI_Rsend_init(void *Buffer,int Count,MPI_Datatype Type, ... | Initializes a handle for a ready send
int MPI_Isend(void *Buffer,int Count,MPI_Datatype Type, ... | Starts a nonblocking send
int MPI_Issend(void *Buffer,int Count,MPI_Datatype Type, ... | Starts a nonblocking synchronous send
int MPI_Irsend(void *Buffer,int Count,MPI_Datatype Type, ... | Starts a nonblocking ready send

MPI Receive Routine | Description
int MPI_Recv(void *Buffer,int Count,MPI_Datatype Type, ... | Performs a basic receive
int MPI_Recv_init(void *Buffer,int Count,MPI_Datatype Type, ... | Initializes a handle for a receive
int MPI_Irecv(void *Buffer,int Count,MPI_Datatype Type, ... | Begins a nonblocking receive
int MPI_Sendrecv(void *sendBuffer,int SendCount, ... | Sends and receives a message
int MPI_Sendrecv_replace(void *Buffer,int Count,MPI_Datatype Type, ... | Sends and receives using a single buffer
The MPI_SEND and MPI_RECV functions are similar to the pvm_send and pvm_recv functions discussed in part 1. Listing 1 is an example of a simple MPI program that uses the basic MPI_SEND and MPI_RECV functions.
Listing 1 A simple MPI program that uses MPI_SEND and MPI_RECV functions.
#include <mpi.h>
#include <unistd.h>
#include <cmath>
#include <iostream>

using std::cout;
using std::endl;

const int Len = 100;
const int Boss = 0;

int main(int argc,char *argv[])
{
   char HostName[Len];
   int Rank,NumProcs;
   int MessageTag = 25;
   float SomeNumber = 3.1415;
   float Y;
   MPI_Status Status;
   MPI_Init(&argc,&argv);
   MPI_Comm_rank(MPI_COMM_WORLD,&Rank);
   MPI_Comm_size(MPI_COMM_WORLD,&NumProcs);
   int Value = -99;
   if(Rank == Boss){
      // Boss: send each worker its own rank, then a float
      gethostname(HostName,Len);
      cout << "Rank " << Rank << " At " << HostName << endl;
      for(int N = 1;N < NumProcs;N++)
      {
         MPI_Send(&N,1,MPI_INT,N,MessageTag,MPI_COMM_WORLD);
         Y = SomeNumber + N;
         MPI_Send(&Y,1,MPI_FLOAT,N,MessageTag,MPI_COMM_WORLD);
      }
   }
   else{
      // Worker: receive in the same order the Boss sent
      MPI_Recv(&Value,1,MPI_INT,Boss,MessageTag,MPI_COMM_WORLD,&Status);
      MPI_Recv(&Y,1,MPI_FLOAT,Boss,MessageTag,MPI_COMM_WORLD,&Status);
      cout << "From Rank: " << Value << " " << std::sqrt(Y - Value) << endl;
   }
   MPI_Finalize();
   return 0;
}
The program shown in Listing 1 is executed by every process in the cluster job. The MPI runtime gives each process a unique rank within MPI_COMM_WORLD, starting at 0. For our purposes, the process with rank 0 is referred to as the Boss. All other processes that execute the program are referred to as workers. Listing 1 is a simple example of the SPMD (Single Program, Multiple Data) model of parallelism, implemented here in a boss and worker configuration. In this model each process receives the same program to execute but might take a different path through it. In Listing 1 the Boss takes one path and the workers take another. Both the boss and the worker processes have to code the MPI_SEND and MPI_RECV calls with the proper datatypes. We can simplify the program in Listing 1 by using the MPI C++ bindings. Listing 2 shows the same program using the C++ bindings.
Listing 2 Simplification of Listing 1 using C++ bindings.
#include <mpi.h>
#include <unistd.h>
#include <cmath>
#include <iostream>

using std::cout;
using std::endl;

const int Len = 100;
const int Boss = 0;

using namespace MPI;

int main(int argc,char *argv[])
{
   char HostName[Len];
   int Rank,NumProcs;
   int MessageTag = 25;
   float SomeNumber = 3.1415;
   float Y;
   int Value = -99;
   MPI::Status Status;
   MPI::Init(argc,argv);
   Rank = COMM_WORLD.Get_rank();
   NumProcs = COMM_WORLD.Get_size();
   if(Rank == Boss){
      gethostname(HostName,Len);
      cout << "Rank " << Rank << " At " << HostName << endl;
      for(int N = 1;N < NumProcs;N++)
      {
         COMM_WORLD.Send(&N,1,MPI::INT,N,MessageTag);
         Y = SomeNumber + N;
         COMM_WORLD.Send(&Y,1,MPI::FLOAT,N,MessageTag);
      }
   }
   else{
      COMM_WORLD.Recv(&Value,1,MPI::INT,Boss,MessageTag,Status);
      COMM_WORLD.Recv(&Y,1,MPI::FLOAT,Boss,MessageTag,Status);
      cout << "From Rank: " << Value << " " << std::sqrt(Y - Value) << endl;
   }
   Finalize();
   return 0;
}
The C++ bindings are declared in an MPI namespace. Notice the following in Listing 2:
using namespace MPI;
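Listing 1 and Listing 2 share the same boss and worker control flow, and that flow can be traced on a single machine without an MPI installation. The following is only a sketch: fakeSend, fakeRecv, Mailbox, runRank, and simulate are hypothetical names that do not come from MPI, and a FIFO queue per destination rank stands in for the message passing. It shows how one function, executed once per rank, takes the boss path or the worker path depending only on the rank.

```cpp
#include <cmath>
#include <deque>
#include <map>
#include <vector>

// Hypothetical stand-in for the MPI runtime: one FIFO mailbox per destination rank.
// Values are stored as double purely to keep the sketch short.
static std::map<int, std::deque<double>> Mailbox;

void fakeSend(double Value, int Dest) { Mailbox[Dest].push_back(Value); }

// Precondition: a message is already waiting for rank Self.
double fakeRecv(int Self)
{
    double Value = Mailbox[Self].front();
    Mailbox[Self].pop_front();
    return Value;
}

const int Boss = 0;

// Every rank runs this same function; the rank alone selects the path taken.
// Returns the number a worker would print (0 for the Boss).
double runRank(int Rank, int NumProcs)
{
    const float SomeNumber = 3.1415f;
    if (Rank == Boss) {
        for (int N = 1; N < NumProcs; N++) {
            fakeSend(N, N);               // first message: the worker's own rank
            fakeSend(SomeNumber + N, N);  // second message: a float payload
        }
        return 0.0;
    }
    // Worker path: receive in the same order the Boss sent.
    int Value = static_cast<int>(fakeRecv(Rank));
    float Y = static_cast<float>(fakeRecv(Rank));
    return std::sqrt(Y - Value);
}

// Runs the Boss first, then each worker in turn, and collects the worker results.
std::vector<double> simulate(int NumProcs)
{
    Mailbox.clear();
    std::vector<double> Results;
    for (int Rank = 0; Rank < NumProcs; Rank++) {
        double R = runRank(Rank, NumProcs);
        if (Rank != Boss) Results.push_back(R);
    }
    return Results;
}
```

Calling simulate(4) returns three values, one per worker, each close to the square root of 3.1415. In a real MPI job the ranks run concurrently rather than one after another, so the sketch illustrates the branching and the message order, not the timing.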
Although the program in Listing 2 is an improvement over the program in Listing 1, we can do better. Here is where the stream metaphor can be used to simplify matters. In part 1 of this series, we built a simple pvm_stream class; here we show its mpi_stream counterpart. Our mpi_stream classes will take advantage of the MPI::Intracomm, which encapsulates the notion of the MPI communicator. The Intracomm class deals with communicators between processes in the same communication domain. This is in contrast to the MPI::Intercomm class, which deals with communicators between communication domains. We have four mpi_stream classes:
- message_communicator
- impi_stream
- ompi_stream
- mpi_stream
The message_communicator class is a simple interface class for MPI::Intracomm. Listing 3 shows the declaration of the message_communicator class.
Listing 3 Declaration of message_communicator.
#include <mpi.h>
using namespace MPI;

class message_communicator{
public:
   message_communicator(void);
   message_communicator(MPI_Comm X);
   void messageRank(int X);
   int messageRank(void);
   int rank(void);
   void messageId(int X);
   int size(void);
protected:
   Intracomm Communicator;
   int MessageRank;
   int MessageId;
};
The impi_stream class is used to wrap MPI_RECV operations. The ompi_stream class is used to wrap MPI_SEND operations. The mpi_stream class uses multiple inheritance to combine the impi_stream and ompi_stream classes. The impi_stream and ompi_stream classes will define the operator>> and operator<< methods for the built-in datatypes. We'll also define operator>> and operator<< for certain user-defined datatypes. Setting up these mpi_stream classes requires extra effort up front, but that effort is repaid every time the classes are reused. Listing 4 shows the declarations of the stream classes.
Listing 4 The declarations of the stream classes.
// Assumes the message_communicator declaration from Listing 3 is visible here
#include <list>
#include <string>
#include <vector>
using namespace std;

class impi_stream : public virtual message_communicator{
public:
   impi_stream(void);
   impi_stream(MPI_Comm X);
   impi_stream &operator>>(int &Data);
   impi_stream &operator>>(string &Data);
   impi_stream &operator>>(float &Data);
   impi_stream &operator>>(vector<string> &X);
   impi_stream &operator>>(list<string> &X);
private:
   MPI::Status Status;
};

class ompi_stream : public virtual message_communicator{
public:
   ompi_stream(void);
   ompi_stream(MPI_Comm X);
   ompi_stream &operator<<(int Data);
   ompi_stream &operator<<(string Data);
   ompi_stream &operator<<(float Data);
   ompi_stream &operator<<(vector<string> &X);
   ompi_stream &operator<<(list<string> &X);
private:
   MPI::Status Status;
};

class mpi_stream : public ompi_stream, public impi_stream{
public:
   mpi_stream(void);
   mpi_stream(MPI_Comm X);
};
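The idea behind these declarations can be tried out without an MPI installation. The following is a minimal sketch under stated assumptions: fake_channel and sketch_stream are hypothetical names, and an in-memory queue of byte buffers stands in for the communicator's Send and Recv calls. It shows how each overload selects the element type on the caller's behalf, how returning *this allows chained insertions and extractions, and how a string can travel as a length followed by its characters.

```cpp
#include <cstddef>
#include <cstring>
#include <deque>
#include <string>
#include <vector>

// Hypothetical stand-in for a communicator: a FIFO of raw byte buffers.
// A real mpi_stream would call Communicator.Send and Communicator.Recv instead.
class fake_channel {
public:
    void send(const void *Buffer, std::size_t Bytes)
    {
        const char *P = static_cast<const char *>(Buffer);
        Queue.push_back(std::vector<char>(P, P + Bytes));
    }
    // Precondition: a message of exactly Bytes bytes is waiting.
    void recv(void *Buffer, std::size_t Bytes)
    {
        if (Bytes > 0) std::memcpy(Buffer, Queue.front().data(), Bytes);
        Queue.pop_front();
    }
    std::size_t pending() const { return Queue.size(); }
private:
    std::deque<std::vector<char>> Queue;
};

class sketch_stream {
public:
    explicit sketch_stream(fake_channel &C) : Channel(C) {}

    // One overload per type; the caller never names a datatype.
    sketch_stream &operator<<(int Data)   { Channel.send(&Data, sizeof Data); return *this; }
    sketch_stream &operator<<(float Data) { Channel.send(&Data, sizeof Data); return *this; }
    sketch_stream &operator>>(int &Data)   { Channel.recv(&Data, sizeof Data); return *this; }
    sketch_stream &operator>>(float &Data) { Channel.recv(&Data, sizeof Data); return *this; }

    // Strings travel as two messages: the length, then the characters
    // (no terminating null is sent).
    sketch_stream &operator<<(const std::string &Data)
    {
        *this << static_cast<int>(Data.size());
        Channel.send(Data.data(), Data.size());
        return *this;
    }
    sketch_stream &operator>>(std::string &Data)
    {
        int Size = 0;
        *this >> Size;
        std::vector<char> Buffer(static_cast<std::size_t>(Size));
        Channel.recv(Buffer.data(), Buffer.size());
        // Build the string from the exact byte count, not from a C string.
        Data.assign(Buffer.begin(), Buffer.end());
        return *this;
    }
private:
    fake_channel &Channel;
};
```

With a fake_channel C and a sketch_stream S(C), the statement S << 7 << 2.5f << std::string("hello"); queues four messages, and S >> I >> F >> Text; reads them back in the same order. The sender and receiver must agree on that order, exactly as the paired MPI_SEND and MPI_RECV calls must.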
Once we complete the definitions for the classes declared in Listing 3 and Listing 4, we can simplify our original MPI program by using a C++ stream metaphor. The stream metaphor gets rid of the specific references to MPI_SEND and MPI_RECV functions. It also gets rid of specific references for individual datatypes. Listing 5 shows a streamlined version of the program introduced in Listing 1.
Listing 5 The complete object-oriented version of Listing 1.
#include <unistd.h>
#include <cmath>
#include <iostream>
#include "mpi_stream2.h"

using std::cout;
using std::endl;

const int Len = 100;
const int Boss = 0;

int main(int argc,char *argv[])
{
   char HostName[Len];
   int Rank,NumProcs;
   int MessageTag = 25;
   float SomeNumber = 3.1415;
   float Y;
   MPI::Status Status;
   MPI::Init(argc,argv);
   mpi_stream Communicator(MPI::COMM_WORLD);
   NumProcs = Communicator.size();
   Communicator.messageId(MessageTag);
   int Value = -99;
   if(Communicator.rank() == Boss){
      gethostname(HostName,Len);
      cout << "Boss Rank: " << Communicator.rank() << " At " << HostName << endl;
      for(int N = 1;N < NumProcs;N++)
      {
         // Address worker N, then insert both values into the stream
         Communicator.messageRank(N);
         Y = SomeNumber + N;
         Communicator << N << Y;
      }
   }
   else{
      // Extract in the same order the Boss inserted
      Communicator.messageRank(Boss);
      Communicator >> Value >> Y;
      cout << "From Rank: " << Value << " " << std::sqrt(Y - Value) << endl;
   }
   MPI::Finalize();
   return 0;
}
The program in Listing 5 takes better advantage of an object-oriented approach to MPI programming. Objects are inserted into an mpi_stream with the << operator and extracted from it with the >> operator. The details of the processing that these operators perform are encapsulated and hidden from the user of the mpi_stream objects. To the developer accustomed to the C++ object-oriented paradigm, the program in Listing 5 is easier to read and understand than the programs in Listing 1 and Listing 2. Also, the MPI syntax doesn't get in the way of the logic of the program. The supplier of the mpi_stream class does the heavy lifting and the user of the mpi_stream class receives all of the benefit. Listing 6 contains the definitions for the methods of the message_communicator, impi_stream, ompi_stream, and mpi_stream classes.
Listing 6 Definitions for the mpi_stream classes.
#include "mpi_stream.h"

int message_communicator::size(void)
{
   return(Communicator.Get_size());
}

int message_communicator::rank(void)
{
   return(Communicator.Get_rank());
}

void message_communicator::messageId(int X)
{
   MessageId = X;
}

void message_communicator::messageRank(int X)
{
   MessageRank = X;
}

int message_communicator::messageRank(void)
{
   return(MessageRank);
}

message_communicator::message_communicator(void)
   : Communicator(MPI::COMM_WORLD), MessageRank(0), MessageId(0)
{}

message_communicator::message_communicator(MPI_Comm X)
   : Communicator(X), MessageRank(0), MessageId(0)
{}

impi_stream::impi_stream(void) : message_communicator(MPI::COMM_WORLD)
{}

impi_stream::impi_stream(MPI_Comm X) : message_communicator(X)
{}

impi_stream &impi_stream::operator>>(int &Data)
{
   Communicator.Recv(&Data,1,MPI::INT,MessageRank,MessageId,Status);
   return(*this);
}

impi_stream &impi_stream::operator>>(float &Data)
{
   Communicator.Recv(&Data,1,MPI::FLOAT,MessageRank,MessageId,Status);
   return(*this);
}

impi_stream &impi_stream::operator>>(string &Data)
{
   char *Value;
   int Size;
   // The sender transmits the length first, then the characters
   Communicator.Recv(&Size,1,MPI::INT,MessageRank,MessageId,Status);
   Value = new char[Size];
   Communicator.Recv(Value,Size,MPI::CHAR,MessageRank,MessageId,Status);
   // No terminating null is sent, so copy exactly Size characters
   Data.assign(Value,Size);
   delete[] Value;
   return(*this);
}

impi_stream &impi_stream::operator>>(vector<string> &X)
{
   int NumWords;
   *this >> NumWords;
   string Data;
   for(int N = 0;N < NumWords;N++)
   {
      *this >> Data;
      X.push_back(Data);
   }
   return(*this);
}

impi_stream &impi_stream::operator>>(list<string> &X)
{
   // Body left empty in the original listing; filled in here to mirror
   // the vector<string> version and the matching ompi_stream operator
   int NumWords;
   *this >> NumWords;
   string Data;
   for(int N = 0;N < NumWords;N++)
   {
      *this >> Data;
      X.push_back(Data);
   }
   return(*this);
}

ompi_stream::ompi_stream(void) : message_communicator(MPI::COMM_WORLD)
{}

ompi_stream::ompi_stream(MPI_Comm X) : message_communicator(X)
{}

ompi_stream &ompi_stream::operator<<(int Data)
{
   Communicator.Send(&Data,1,MPI::INT,MessageRank,MessageId);
   return(*this);
}

ompi_stream &ompi_stream::operator<<(float Data)
{
   Communicator.Send(&Data,1,MPI::FLOAT,MessageRank,MessageId);
   return(*this);
}

ompi_stream &ompi_stream::operator<<(string Data)
{
   int Size;
   Size = Data.length();
   Communicator.Send(&Size,1,MPI::INT,MessageRank,MessageId);
   Communicator.Send((char *)Data.c_str(),Size,MPI::CHAR,MessageRank,MessageId);
   return(*this);
}

ompi_stream &ompi_stream::operator<<(vector<string> &X)
{
   int Size = X.size();
   *this << Size;
   for(int N = 0;N < Size;N++)
   {
      *this << X[N];
   }
   return(*this);
}

ompi_stream &ompi_stream::operator<<(list<string> &X)
{
   // Iterates instead of popping, so the caller's list is left intact
   *this << int(X.size());
   for(list<string>::iterator I = X.begin();I != X.end();++I)
   {
      *this << *I;
   }
   return(*this);
}

// message_communicator is a virtual base, so the most-derived class
// must initialize it; otherwise its default constructor is used
mpi_stream::mpi_stream(void)
   : message_communicator(MPI::COMM_WORLD),
     ompi_stream(MPI::COMM_WORLD), impi_stream(MPI::COMM_WORLD)
{}

mpi_stream::mpi_stream(MPI_Comm X)
   : message_communicator(X), ompi_stream(X), impi_stream(X)
{}
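One detail of this class design deserves a closer look. Because message_communicator is a virtual base of both impi_stream and ompi_stream, C++ lets only the most-derived class initialize it. If mpi_stream does not name message_communicator in its own initializer list, the base is default-constructed and the communicator passed to the mpi_stream constructor never reaches it. The following MPI-free sketch shows the effect in isolation. All of the class names are hypothetical stand-ins, and a string takes the place of the communicator.

```cpp
#include <string>

// Hypothetical stand-in for message_communicator.
struct base_comm {
    std::string Name;
    base_comm() : Name("default") {}
    explicit base_comm(const std::string &N) : Name(N) {}
};

// Hypothetical stand-ins for impi_stream and ompi_stream.
struct in_part : public virtual base_comm {
    explicit in_part(const std::string &N) : base_comm(N) {}
};

struct out_part : public virtual base_comm {
    explicit out_part(const std::string &N) : base_comm(N) {}
};

// The virtual base is not named here, so its default constructor runs and
// the base_comm(N) initializers in in_part and out_part are ignored.
struct joined_implicit : public out_part, public in_part {
    explicit joined_implicit(const std::string &N) : out_part(N), in_part(N) {}
};

// Naming the virtual base in the most-derived class forwards the argument.
struct joined_explicit : public out_part, public in_part {
    explicit joined_explicit(const std::string &N)
        : base_comm(N), out_part(N), in_part(N) {}
};
```

Constructing joined_implicit("custom") leaves Name equal to "default", while joined_explicit("custom") stores "custom". An in_part or out_part object created on its own is the most-derived class in that case, so its base_comm(N) initializer is honored.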
Although there are several approaches for adding a stream-type metaphor to the C++ bindings being considered by the MPI development community, none have been accepted into the official MPI standard. This article has taken one of the simplest approaches being considered.