MODULE-3
Distributed memory programming with MPI –
MPI functions, The trapezoidal rule in MPI, Dealing with I/O, Collective communication, MPI-
derived datatypes, Performance evaluation of MPI programs, A parallel sorting algorithm.
Distributed memory programming with MPI
In the world of parallel MIMD (Multiple Instruction, Multiple Data) systems, we typically
distinguish between distributed-memory and shared-memory architectures. In a
distributed-memory system (see Fig. 3.1), each core has access to its own private
memory, and cores are connected via a network. A core can only access its own memory
directly. Communication with other cores requires explicit message-passing.
In contrast, a shared-memory system (see Fig. 3.2) allows all cores to access a common
global memory. This shared access simplifies some aspects of programming but introduces
new challenges like synchronization and cache coherence.
To program distributed-memory systems, we use message-passing programming models.
A program running on each core-memory pair is referred to as a process, and communication
occurs using send and receive functions. The most widely used such system is MPI (the
Message-Passing Interface), a standardized library of functions that can be called from C and
Fortran programs.
MPI supports not only basic point-to-point communication but also collective
communication, where multiple processes participate in operations like broadcast,
gather, scatter, and reduce. These collective functions are crucial for implementing scalable
distributed algorithms.
While learning MPI, it's also necessary to understand the fundamental concerns in message-
passing systems, such as data partitioning, which involves splitting the problem and data
among different processes, and distributed I/O, which deals with efficiently managing input
and output across processes.
These programming aspects directly impact parallel program performance, making it
essential to study communication cost, load balancing, and synchronization overheads in
distributed-memory environments.
3.1 Getting Started
The traditional "hello, world" program from Kernighan and Ritchie's text introduces basic
program output. In parallel programming using MPI (Message-Passing Interface), we adapt
this by letting only one process (rank 0) handle the output, while the other processes send
messages to it.
MPI identifies processes by non-negative integer ranks, ranging from 0 to p−1, where p is
the total number of processes.
In this MPI-based hello world example (see Program 3.1), the workflow is:
• Process 0 acts as the receiver and printer.
• All other processes use MPI_Send to send greeting messages to process 0.
• Process 0 uses MPI_Recv to receive messages and prints them.
3.1.1 Compilation and Execution
The method for compiling and executing MPI programs depends on the system in
use. On most systems, developers write the code using a text editor and then use the
command line to compile and run it.
Compilation
Most systems provide an mpicc command to compile MPI programs.
mpicc is a wrapper script around the C compiler. It automatically includes:
• Necessary header files
• Required MPI libraries
Example Command:
$ mpicc -g -Wall -o mpi_hello mpi_hello.c
• -g enables debugging information
• -Wall enables all warnings
• -o mpi_hello specifies the output executable
Execution
• The command mpiexec starts the specified number of instances of the compiled MPI
program.
• It may also control which cores the instances are assigned to.
• The MPI implementation ensures that these processes can communicate with each
other as needed for the program to function.
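For example, to start four instances of the compiled program, one would typically type
something like:
$ mpiexec -n 4 ./mpi_hello
Here -n specifies the number of processes to start; the exact options can vary slightly
between MPI implementations.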
3.1.2 MPI Programs
MPI programs are written in standard C, which means they start just like any other C
program. For instance, the program includes the usual C header files such as <stdio.h> and
<string.h>, and it defines a standard main function.
What distinguishes an MPI program is the inclusion of the <mpi.h> header file (as shown in
Line 3 of the code). This header is essential because it provides function prototypes, macro
definitions, type definitions, and all other declarations required to compile an MPI-based
program.
An important convention in MPI programming is the naming scheme used for its identifiers.
All identifiers provided by MPI begin with MPI_. For function names and MPI-defined types,
the first letter after the underscore is capitalized. For example:
• Function: MPI_Init, MPI_Send, MPI_Comm_rank
• Type: MPI_Comm, MPI_Status
In contrast, MPI-defined macros and constants are written in all capital letters, such as
MPI_CHAR, MPI_COMM_WORLD, and MPI_STATUS_IGNORE. This consistent naming makes it
easy to distinguish between MPI-provided elements and user-defined identifiers.
3.1.3 MPI_Init and MPI_Finalize
In Line 12 of the program, the call to MPI_Init is crucial as it tells the MPI system to perform
all necessary initialization steps. These steps may include allocating message buffers and
assigning ranks to processes. Importantly, no other MPI function should be called before
MPI_Init. Its syntax is:
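int MPI_Init(
int* argc_p, // in/out
char*** argv_p // in/out
);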
The parameters argc_p and argv_p are pointers to the program's command-line arguments
(argc and argv). However, if the program doesn’t use command-line arguments, we can simply
pass NULL for both. Like most MPI functions, MPI_Init returns an integer error code, which
we usually ignore to avoid cluttering the code.
At the end of the program (Line 31), the call to MPI_Finalize signals that the program is done
using MPI and any resources allocated can be released. Its syntax is very simple:
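int MPI_Finalize(void);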
As a rule, no MPI calls should be made after MPI_Finalize.
A typical structure of an MPI program includes these functions as:
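...
#include <mpi.h>
...
int main(int argc, char* argv[]) {
...
/* No MPI calls before this */
MPI_Init(&argc, &argv);
...
MPI_Finalize();
/* No MPI calls after this */
...
return 0;
}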
3.1.4 Communicators, MPI_Comm_size, and MPI_Comm_rank
In MPI, a communicator is a group of processes that can communicate with each other. One
of the key tasks of MPI_Init is to create the default communicator MPI_COMM_WORLD, which
includes all processes initiated when the program starts. To retrieve details about the
processes in this communicator, we use:
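int MPI_Comm_size(
MPI_Comm comm, // in
int* comm_sz_p // out
);

int MPI_Comm_rank(
MPI_Comm comm, // in
int* my_rank_p // out
);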
The parameter comm is of type MPI_Comm and refers to the communicator (typically
MPI_COMM_WORLD). The function MPI_Comm_size stores the total number of processes in
comm_sz_p, while MPI_Comm_rank stores the rank (or ID) of the calling process in
my_rank_p. Conventionally, we use the variables comm_sz for total number of processes and
my_rank for the process’s rank within MPI_COMM_WORLD.
3.1.5 SPMD Programs
Despite the fact that different processes might perform different tasks, we usually compile
and execute a single program in MPI. This programming style is known as Single Program,
Multiple Data (SPMD). In our example, although process 0 prints messages and the other
processes send messages, all processes still run the same compiled code. The if-else
construct in lines 16–29 helps differentiate the behavior based on each process's rank.
This approach makes the program flexible and scalable—it can run with any number of
processes, such as 1, 4, 1000, or even more, depending on the system's resources. Even
though MPI doesn’t require this, it is good practice to write MPI programs that can adapt to
varying numbers of processes, since the available resources can change from system to
system or over time.
3.1.6 Communication
In the MPI greeting program, each process except process 0 constructs a message using
sprintf, which formats the message into a string instead of printing it to the console. Then, the
message is sent to process 0 using MPI_Send.
Process 0, on the other hand, prints its own greeting directly using printf, and then uses a
loop to receive messages from all other processes (1 to comm_sz − 1) using MPI_Recv. This
pattern illustrates a basic example of point-to-point communication in MPI, where one
process sends and another receives a message.
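A minimal sketch of this pattern (assuming greeting is a character buffer of size MAX_STRING
and q is an int loop variable) looks roughly like this:
if (my_rank != 0) {
    sprintf(greeting, "Greetings from process %d of %d!", my_rank, comm_sz);
    MPI_Send(greeting, strlen(greeting) + 1, MPI_CHAR, 0, 0, MPI_COMM_WORLD);
} else {
    printf("Greetings from process %d of %d!\n", my_rank, comm_sz);
    for (q = 1; q < comm_sz; q++) {
        MPI_Recv(greeting, MAX_STRING, MPI_CHAR, q, 0, MPI_COMM_WORLD,
                 MPI_STATUS_IGNORE);
        printf("%s\n", greeting);
    }
}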
3.1.7 MPI_Send
The function MPI_Send is used to transmit messages between processes. Its prototype is:
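int MPI_Send(
void* msg_buf_p, // in
int msg_size, // in
MPI_Datatype msg_type, // in
int dest, // in
int tag, // in
MPI_Comm communicator // in
);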
• msg_buf_p points to the data buffer (e.g., greeting).
• msg_size and msg_type define how much data is sent (e.g., strlen(greeting)+1 with
type MPI_CHAR).
• dest is the rank of the destination process.
• tag is a message identifier, used to distinguish between different types of messages.
• communicator (e.g., MPI_COMM_WORLD) defines the scope of communication.
Important: MPI types such as MPI_CHAR, MPI_INT, etc., are of type MPI_Datatype, not regular
C types.
Example Use-Case for Tags: If process 1 sends some data to process 0 and it includes two
kinds of data (e.g., print vs. compute), tags like 0 and 1 can help differentiate them.
Communicators help isolate communications. For example, if you're using two independent
MPI-based libraries (say, atmosphere and ocean simulations), using separate
communicators ensures their messages don’t interfere with each other. This avoids
complicated tag-based coordination and improves modularity.
3.1.8 MPI_Recv
The corresponding receive function is MPI_Recv:
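int MPI_Recv(
void* msg_buf_p, // out
int buf_size, // in
MPI_Datatype buf_type, // in
int source, // in
int tag, // in
MPI_Comm communicator, // in
MPI_Status* status_p // out
);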
• msg_buf_p is where the received message will be stored.
• buf_size and buf_type define how much can be stored.
• source specifies the rank of the sender.
• tag must match the sender’s tag.
• communicator must match that of the sender.
• status_p is used to inspect additional info (e.g., actual source or tag); it can be ignored
using MPI_STATUS_IGNORE.
MPI_Recv needs to match the send exactly in data type, size, communicator, source, and
tag to complete successfully.
3.1.9 Message Matching
In MPI, for a message sent by one process to be successfully received by another, several
parameters must match between the MPI_Send and MPI_Recv calls.
Matching Conditions
Suppose process q sends a message:
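MPI_Send(send_buf_p, send_buf_sz, send_type, dest, send_tag, send_comm);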
And process r receives a message:
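MPI_Recv(recv_buf_p, recv_buf_sz, recv_type, src, recv_tag, recv_comm, &status);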
The message is received successfully only if the following conditions hold:
• recv_comm == send_comm
• recv_tag == send_tag
• dest == r
• src == q
These are necessary, but not sufficient. The buffers must also be compatible:
If recv_type == send_type and recv_buf_sz ≥ send_buf_sz, the message can be received
successfully.
Unpredictable Order Handling
In many cases, the receiver doesn’t know the order in which messages will arrive. For
example:
• Process 0 distributes work to others (1, 2, ..., comm_sz−1)
• Those processes return results to process 0
• Some tasks may finish earlier, but process 0 must wait in order if using fixed ranks
This creates delays.
Solution: MPI_ANY_SOURCE
To allow receiving from any process, MPI provides the wildcard constant MPI_ANY_SOURCE.
for (i = 1; i < comm_sz; i++) {
MPI_Recv(result, result_sz, result_type, MPI_ANY_SOURCE, result_tag, comm,
MPI_STATUS_IGNORE);
Process_result(result);
}
This allows process 0 to receive results as they come in, not in process-rank order.
Wildcard for Tags
Similarly, if a process is expecting multiple message types (e.g., with different tags), it can
use:
MPI_ANY_TAG
This enables the process to receive any incoming message, regardless of its tag.
Wildcard constants (MPI_ANY_SOURCE, MPI_ANY_TAG) can only be used in receive
operations.
• Senders must specify both destination rank and tag explicitly.
• Hence, MPI uses a push model (data is sent to receivers), not pull.
No wildcard for communicators:
• Both sender and receiver must agree on the communicator (MPI_COMM_WORLD,
etc.).
3.1.10 The status_p Argument
In MPI, a process can receive a message without prior knowledge of the message size,
sender, or tag. This is made possible through the final argument of MPI_Recv, which is a
pointer to a structure of type MPI_Status. This structure includes fields such as MPI_SOURCE,
MPI_TAG, and MPI_ERROR. After a receive call like
MPI_Status status;
MPI_Recv(..., &status);
the sender’s rank and the message tag can be accessed using status.MPI_SOURCE and
status.MPI_TAG.
However, to determine the number of elements received, we use the function MPI_Get_count,
since this information isn’t directly stored in MPI_Status. Its syntax is:
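int MPI_Get_count(
MPI_Status* status_p, // in
MPI_Datatype type, // in
int* count_p // out
);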
This function computes the count based on the data type and stores it in count_p. The reason
for using a function instead of a struct field is that the count depends on data type and might
involve computations like dividing the number of received bytes by the size of the data type.
To avoid unnecessary overhead, this calculation is only done when explicitly requested by
the programmer.
3.1.11 Semantics of MPI_Send and MPI_Recv
When a process executes MPI_Send, the message is assembled with both data and envelope
information such as destination rank, sender rank, tag, communicator, and message
size. After this, two behaviors are possible: the MPI system may either buffer the message or
block the sending process. If buffered, the message is stored internally and MPI_Send returns
immediately. If blocked, the function waits until it can transmit the message. Importantly,
once MPI_Send returns, we cannot be certain that the message has been transmitted—only
that the send buffer is safe for reuse. For more control, MPI offers alternative send
functions with different behaviors.
The behavior of MPI_Send also depends on the message size. Many implementations use a
cutoff size: messages smaller than the cutoff are typically buffered, while larger ones cause
blocking.
Unlike MPI_Send, MPI_Recv always blocks until a matching message has been received. Thus,
after it returns, we are guaranteed that a message resides in the receive buffer (barring any
errors). There is also a non-blocking alternative for receiving, which checks whether a
matching message is available and returns immediately, whether or not one has arrived.
MPI ensures that messages are non-overtaking: if process q sends two messages to
process r, the first message must be received before the second. However, messages from
different processes (e.g., q and t) may arrive in any order, regardless of their sending
sequence. This flexibility is necessary since MPI cannot enforce network performance—e.g., a
message from Mars can’t be guaranteed to arrive before one from San Francisco, even if
sent earlier.
3.1.12 Some Potential Pitfalls
A key pitfall in MPI programming lies in the semantics of MPI_Recv, which blocks until a
matching send is issued. If no such send exists, the process will hang indefinitely. This
highlights the importance of ensuring that every receive has a corresponding send. Coding
errors, such as mismatched tags or incorrect source/destination ranks, can prevent a
match, causing unexpected hangs or, worse, messages being received by the wrong process.
On the sending side, a call to MPI_Send may either block or buffer. If it blocks and no
matching receive is available, the sending process will hang. If it buffers but still lacks a
matching receive, the message will be lost. Hence, meticulous attention must be paid during
both design and coding to maintain correct communication patterns and avoid these silent
failures.
3.2 The Trapezoidal Rule in MPI
While simple programs like printing messages help us understand MPI basics, the real benefit
of parallel programming comes when we apply it to more useful computations. A good
example is the trapezoidal rule for numerical integration, which is a method used to
estimate the definite integral of a function. In a serial implementation, this involves dividing
the integration interval into smaller subintervals, computing the function at each point, and
applying the trapezoidal formula. In MPI, this process can be parallelized by dividing the
integration interval among multiple processes. Each process computes the area under the
curve over its assigned subinterval and sends its partial result to a designated process
(usually rank 0), which then combines them to produce the final result. This example
demonstrates how computation and communication are coordinated in MPI programs,
moving from basic I/O tasks to solving actual mathematical problems.
3.2.1 The Trapezoidal Rule
The trapezoidal rule provides a method to numerically approximate definite integrals by
dividing the interval [a, b] into n equal subintervals and summing up the areas of
trapezoids under the curve. As shown in Figure 3.3, we consider the area between the graph
of y = f(x), two vertical lines, and the x-axis. Each subinterval is treated as a
trapezoid, with its area approximated using the function values at the subinterval’s
endpoints. According to Figure 3.4, the height of each trapezoid is the interval width
h = (b − a)/n, and its area is calculated as:
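Area of one trapezoid = (h/2) [ f(x_i) + f(x_(i+1)) ],
where x_i and x_(i+1) are the left and right endpoints of the subinterval.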
Summing the areas of all trapezoids gives the total approximation:
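Approximate total area = h [ f(x_0)/2 + f(x_1) + f(x_2) + ... + f(x_(n-1)) + f(x_n)/2 ],
where x_i = a + i*h, for i = 0, 1, ..., n.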
A serial pseudocode for implementing this is:
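/* Input: a, b, n */
h = (b - a) / n;
approx = (f(a) + f(b)) / 2.0;
for (i = 1; i <= n - 1; i++) {
    x_i = a + i * h;
    approx += f(x_i);
}
approx = h * approx;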
This method provides a foundation for parallelization using MPI, where the interval and
corresponding computations can be distributed across multiple processes.
3.2.2 Parallelizing the Trapezoidal Rule
To parallelize the trapezoidal rule, we follow the standard steps in parallel program design:
(1) partition the computation into tasks, (2) identify communication, (3) aggregate tasks, and
(4) map tasks to cores. In this case, we identify two main tasks: computing the area of each
trapezoid and summing the results. As shown in Figure 3.5, each area-computing task sends
its result to a central summing task.
Assuming the number of processes comm_sz evenly divides the number of trapezoids n, we
can assign n/comm_sz trapezoids to each process. Each process computes a local integral
over its subinterval [local_a, local_b]. After computing, processes send their local results to
process 0, which receives and sums them to get the final approximation.
The pseudocode is as follows:
Get a, b, n;
h = (b - a)/n;
local_n = n / comm_sz;
local_a = a + my_rank * local_n * h;
local_b = local_a + local_n * h;
local_integral = Trap(local_a, local_b, local_n, h);
if (my_rank != 0)
Send local_integral to process 0;
else {
total_integral = local_integral;
for (proc = 1; proc < comm_sz; proc++) {
Receive local_integral from proc;
total_integral += local_integral;
}
}
if (my_rank == 0)
print result;
Here, Trap() is a function implementing the serial trapezoidal rule. Note the distinction
between local variables (like local_a, local_b, and local_n, which are private to each process)
and global variables (a, b, and n, which are meaningful across all processes). Although these
definitions differ from traditional programming terms, the context in parallel programming
makes them intuitive.
The complete MPI program is shown in Program 3.2. The Trap function is just an
implementation of the serial trapezoidal rule; see Program 3.3.
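For reference, a sketch of the Trap function along these lines (assuming a user-supplied
function f that evaluates the integrand) is:
double Trap(double left_endpt, double right_endpt, int trap_count, double base_len) {
    double estimate, x;
    int i;

    estimate = (f(left_endpt) + f(right_endpt)) / 2.0;
    for (i = 1; i <= trap_count - 1; i++) {
        x = left_endpt + i * base_len;
        estimate += f(x);
    }
    return estimate * base_len;
}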
3.3 Dealing with I/O
The current version of the parallel trapezoidal rule has a major limitation: it only works for
the interval [0, 3] using 1024 trapezoids. While we could manually edit the code and
recompile for new values, this is inconvenient compared to simply typing in three new input
values during execution. Therefore, it's necessary to address the issue of user input in
parallel programs. And while discussing input handling, it's also a good opportunity to look
at output mechanisms in parallel MPI programs.
3.3.1 Output
In both the “greetings” program and the trapezoidal rule program, we assumed that
process 0 can write to stdout via printf, and this behaves as expected. While the MPI
standard doesn't strictly define which processes can access I/O devices, most MPI
implementations allow all processes in MPI_COMM_WORLD to access both stdout and
stderr. However, there is no automatic scheduling of output access. If multiple processes
try to write simultaneously to stdout, the output order becomes unpredictable, and one
process’s output might even be interrupted by another’s.
For instance, when running a program (as in Program 3.4) where each process prints a line,
the output order may appear sequential with five processes, but becomes nondeterministic
with six or more processes. This behavior occurs because the processes compete for access
to shared output, leading to race conditions in printing. The result is nondeterminism—
output may differ between runs.
To maintain a predictable output order, it's the programmer’s responsibility to manage
output. A common solution is to have each non-zero process send its output as a message
to process 0, which then prints in rank order—as demonstrated in the “greetings”
program.
3.3.2 Input
Unlike output, most MPI implementations allow only process 0 in MPI_COMM_WORLD to
access stdin. This restriction is logical, since if multiple processes accessed stdin, it would be
ambiguous as to which process should receive which part of the input. For example, should
process 0 receive the first line and process 1 the second, or should input be distributed
character-by-character?
To handle input using functions like scanf, MPI programs typically branch on process rank.
That is, process 0 reads the input and then broadcasts or sends it to the other processes. For
instance, in the parallel trapezoidal rule program, the Get_input function (see Program 3.5)
allows process 0 to read the values for
a, b, and n, and then sends these values to each of the other processes. This communication
follows the same pattern as in the “greetings” program, except that now, process 0 sends the
data, while the other processes receive it.
To use this input function, a call to Get_input should be inserted inside main, after initializing
my_rank and comm_sz.
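A sketch of such a Get_input function, consistent with the description above (the exact
code is given in Program 3.5), might look like this:
void Get_input(int my_rank, int comm_sz, double* a_p, double* b_p, int* n_p) {
    int dest;

    if (my_rank == 0) {
        printf("Enter a, b, and n\n");
        scanf("%lf %lf %d", a_p, b_p, n_p);
        for (dest = 1; dest < comm_sz; dest++) {
            MPI_Send(a_p, 1, MPI_DOUBLE, dest, 0, MPI_COMM_WORLD);
            MPI_Send(b_p, 1, MPI_DOUBLE, dest, 0, MPI_COMM_WORLD);
            MPI_Send(n_p, 1, MPI_INT, dest, 0, MPI_COMM_WORLD);
        }
    } else {  /* every other process receives the values from process 0 */
        MPI_Recv(a_p, 1, MPI_DOUBLE, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        MPI_Recv(b_p, 1, MPI_DOUBLE, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        MPI_Recv(n_p, 1, MPI_INT, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
    }
}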
3.4 Collective Communication
In our trapezoidal rule MPI program, one inefficiency is evident in how the global sum is
computed. After calculating local integrals, every process sends its value to process 0, which
alone performs the summation. This results in poor workload distribution, as process 0
does most of the work, while the other processes become idle after sending. Such imbalance
is inefficient, especially when resources are underutilized. A better approach would be to
distribute the summation work, so all processes contribute equally, similar to how students
can collaboratively add numbers rather than relying on one to sum them all.
3.4.1 Tree-Structured Communication
An efficient solution is the use of tree-structured communication, such as a binary tree
(see Fig. 3.6). Initially, pairs like processes 1 & 0, 3 & 2, 5 & 4, and 7 & 6 exchange and add
values. In successive steps, the intermediate results are sent and summed by fewer
processes, until process 0 receives the final total. This method reduces the number of receives
and additions by process 0 from comm_sz - 1 to just log₂(comm_sz) operations. For example,
with comm_sz = 1024, the number of operations drops from 1023 to just 10, significantly
improving performance.
The advantage lies in concurrent execution—several adds and receives happen
simultaneously in early phases, which reduces total execution time. However, coding this
manually can be complex, especially when choosing optimal pairings (e.g., pairing 0 & 4, 1 &
5, etc., as in Fig. 3.7). Each strategy may perform differently based on the system architecture
and number of processes, making it difficult to predict the best approach without
experimentation or system-specific optimization.
3.4.2 MPI_Reduce
To avoid redundant effort in writing optimized global sum functions, MPI offers a built-in
collective communication routine called MPI_Reduce, which shifts the responsibility of
optimization to the MPI implementation. This function enables multiple processes in a
communicator (like MPI_COMM_WORLD) to perform a reduction operation—such as sum,
maximum, minimum, or product—on data distributed across processes. Such collective
communications contrast with point-to-point operations (MPI_Send, MPI_Recv) by
involving all processes in the communicator.
The prototype for MPI_Reduce is:
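int MPI_Reduce(
void* input_data_p, // in
void* output_data_p, // out
int count, // in
MPI_Datatype datatype, // in
MPI_Op operator, // in
int dest_process, // in
MPI_Comm comm // in
);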
The fifth parameter—the operator of type MPI_Op—is central to its generality. MPI defines
several built-in operations for this parameter, such as MPI_SUM, MPI_PROD, MPI_MAX, and
MPI_MIN, which are listed in Table 3.2. Programmers can also define custom reduction
operations using MPI_Op_create.
In the parallel trapezoidal rule program, this call replaces the manual summation logic in
the root process:
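MPI_Reduce(&local_int, &total_int, 1, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);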
This line aggregates local_int from each process into total_int at process 0. For arrays,
MPI_Reduce works similarly. For instance, with double local_x[N];, one can compute a
component-wise sum as:
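double local_x[N], sum[N];
...
MPI_Reduce(local_x, sum, N, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);
Here sum is an array of the same size as local_x; its contents are significant only on the
destination process (process 0).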
Thus, MPI_Reduce provides a simple, efficient, and portable way to perform global
reductions across processes.
3.4.3 Collective vs. Point-to-Point Communications
Collective communications in MPI differ significantly from point-to-point communications in
the following ways:
1. All processes in the communicator must call the same collective function. For
instance, calling MPI_Reduce on one process and MPI_Recv on another is incorrect and
will likely cause the program to hang or crash.
2. Arguments passed by each process must be compatible. If one process uses
dest_process = 0 and another uses dest_process = 1 in a call to MPI_Reduce, the result
is erroneous.
3. The output_data_p argument is used only by the destination process, but all
processes must still pass a valid argument (e.g., NULL for non-destination processes).
4. Collective communications do not use tags, unlike point-to-point operations. They
are matched only by communicator and the order of calls. As shown in Table 3.3, if
each process calls MPI_Reduce twice, the sequence determines matching—not variable
names—so values accumulate in the order of function invocation.
A final cautionary point: using the same buffer for input and output (e.g., MPI_Reduce(&x,
&x, ...)) is illegal in MPI, due to aliasing restrictions. This restriction, rooted in Fortran
compatibility, ensures that output arguments don’t overlap with input memory, preventing
undefined or erroneous behavior.
3.4.4 MPI_Allreduce
In some parallel programs, all processes may need access to the result of a global
computation, such as a sum. While MPI_Reduce provides the result only to a designated
destination process, MPI_Allreduce ensures that the result is distributed to all processes
in the communicator. This eliminates the need to implement custom distribution schemes,
such as reversing the branches of a tree structure (as in Fig. 3.8) or designing a butterfly
communication pattern (Fig. 3.9), which could be complex to code and optimize.
MPI handles this efficiently through the following function:
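int MPI_Allreduce(
void* input_data_p, // in
void* output_data_p, // out
int count, // in
MPI_Datatype datatype, // in
MPI_Op operator, // in
MPI_Comm comm // in
);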
The parameters are identical to those of MPI_Reduce, except that there is no dest_process—
because every process receives the result. This function simplifies parallel programming by
removing the burden of manually distributing aggregated data across processes and allows all
processes to proceed with further computation using the shared result.
3.4.5 Broadcast
To optimize input distribution in a parallel program—similar to how we optimized the global
sum using tree-structured communication—we can use broadcasting. A broadcast is a
collective communication operation where data from a single process (the source) is sent
to all other processes in the communicator. For example, if process 0 reads the input data, it
can broadcast the values of a, b, and n to all other processes using a tree structure like the
one in Fig. 3.10.
MPI provides the MPI_Bcast function for this purpose:
int MPI_Bcast(
void* data_p, // in/out
int count, // in
MPI_Datatype datatype, // in
int source_proc, // in
MPI_Comm comm // in
);
In this function, the process identified by source_proc sends data to all processes in the
communicator comm. The data_p parameter is an input on the source process and an output
on all others. Although labeled in/out, its role depends on the process’s rank.
Using MPI_Bcast, we avoid writing separate send operations to each process and benefit
from optimized implementations provided by MPI—just like with MPI_Reduce.
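For example, after process 0 has read n, a single line suffices to make it available
everywhere (a minimal illustration):
MPI_Bcast(&n, 1, MPI_INT, 0, MPI_COMM_WORLD);
Every process, including process 0, executes the same call; after the call, the value stored
in n on process 0 is the value stored in n on every process.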
3.4.7 Scatter
When testing a vector addition function in MPI, it’s helpful to input the dimension and
elements of vectors x and y. While MPI_Bcast works well for broadcasting the vector size,
using it to broadcast entire vectors would be inefficient. For example, with 10 processes and
vectors of 10,000 elements, broadcasting the full vectors would force every process to allocate
memory for 10,000 elements—despite only working on a subset.
To solve this, MPI offers MPI_Scatter, which allows process 0 to distribute only the
required segment of a vector to each process. This enables each process to store and
operate only on its assigned subvector, greatly optimizing memory usage. The function
signature is:
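int MPI_Scatter(
void* send_buf_p, // in
int send_count, // in
MPI_Datatype send_type, // in
void* recv_buf_p, // out
int recv_count, // in
MPI_Datatype recv_type, // in
int src_proc, // in
MPI_Comm comm // in
);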
If using a block distribution and if n is divisible by comm_sz, then send_count and recv_count
should both be local_n = n / comm_sz, and send_type / recv_type should be MPI_DOUBLE. Each
process receives its local_n chunk from send_buf_p, while process 0 reads and holds the entire
input vector. The MPI_Scatter call sends the first block to process 0, second to process 1,
and so on, making it ideal for block distributions only. This approach is used in the
Read_vector function in Program 3.9.
A key caveat: this strategy only works efficiently when n is evenly divisible by comm_sz
and block distribution is used.
3.4.8 Gather
To make a vector addition program useful, we need a way to view the results. Since the
resulting vector is distributed across processes, we need to collect all subvector results onto
process 0, where they can be printed in full. MPI provides the collective function MPI_Gather
for this purpose. The function allows each process to send its portion of the vector to a single
destination process (typically rank 0), which stores the data in the correct order.
The function prototype is:
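int MPI_Gather(
void* send_buf_p, // in
int send_count, // in
MPI_Datatype send_type, // in
void* recv_buf_p, // out
int recv_count, // in
MPI_Datatype recv_type, // in
int dest_proc, // in
MPI_Comm comm // in
);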
Each process sends the contents of send_buf_p to dest_proc. On the destination process (e.g.,
process 0), these chunks are stored in recv_buf_p in rank order: the data from process 0 goes
into the first block, from process 1 into the second, and so on. So, if using block
distribution with each process holding local_n elements, then recv_count should also be
local_n. Importantly, recv_count refers to the number of elements from each process, not the
total number collected.
This is effectively the reverse operation of MPI_Scatter, and similar constraints apply: it
assumes all blocks are of equal size and works best with block-distributed vectors. The
gathering logic for printing a distributed vector is implemented in Program 3.10.
3.4.9 Allgather
To perform parallel matrix-vector multiplication, we begin with the mathematical
formulation: if A is an m×n matrix and x is a vector of n components, then y = Ax results in a
vector y of m components, where y_i = a_i0 x_0 + a_i1 x_1 + ... + a_i,n-1 x_n-1 (the dot
product of the ith row of A with x). To parallelize this,
we use block row distribution for matrix A so that each process gets a set of rows, and the
corresponding block distribution of y so that process q computes the values of y for its rows
of A.
The challenge arises with vector x, since each component of y requires all components of x.
In a serial code, this is straightforward, but in parallel code—especially in iterative
applications where y becomes the next x—we typically use the same block distribution for
both x and y. To compute the result, each process needs the entire vector x, not just its block.
One naïve approach is to use MPI_Gather followed by MPI_Bcast, but this introduces
redundant communication. Instead, MPI provides MPI_Allgather, a collective operation that
gathers data from all processes and distributes the complete result to all processes in a single
call.
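int MPI_Allgather(
void* send_buf_p, // in
int send_count, // in
MPI_Datatype send_type, // in
void* recv_buf_p, // out
int recv_count, // in
MPI_Datatype recv_type, // in
MPI_Comm comm // in
);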
This function collects send_buf_p from each process and concatenates the segments into
recv_buf_p on every process. Each process ends up with a full copy of the distributed
vector, enabling them to compute their dot-products locally. Typically, send_count =
recv_count, and the function is efficient for repeated calls, such as in iterative solvers, where
x is reused. For performance, it's advisable to allocate x once in the calling function and pass
it to the matrix-vector multiplication routine, as done in Program 3.12.
3.5 MPI-derived Datatypes
In distributed-memory systems, communication is significantly more costly than local
computation. For instance, sending a double between nodes is far slower than a local addition.
Moreover, sending many small messages is much less efficient than sending one large
message. As demonstrated in the example with double x[1000], a loop sending individual
elements takes 50–100× longer than sending the entire array in a single call. To reduce
message overhead, MPI supports three strategies: using the count parameter in
communication functions, derived datatypes, and MPI_Pack/Unpack.
Derived datatypes allow grouping multiple memory elements—including those of different
types—into a single transferable unit. For instance, in the trapezoidal rule program, three
separate MPI_Bcast calls (for a, b, and n) can be replaced with a single call using a derived
datatype composed of two MPI_DOUBLEs and one MPI_INT. This requires knowledge of the
displacements of variables in memory. Suppose the addresses of a, b, and n are 24, 40, and
48 respectively. Then the derived type is defined by:
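{(MPI_DOUBLE, 0), (MPI_DOUBLE, 16), (MPI_INT, 24)}
The first element of each pair is the type of the variable, and the second is its
displacement from the start of the block (the address of a).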
This method simplifies communication and enhances performance. These steps are
implemented in a reusable Build_mpi_type function, which is called from an updated
Get_input routine (see Program 3.13). This technique is highly useful for compact
communication in heterogeneous data structures.
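A sketch of such a Build_mpi_type function, using the standard MPI_Get_address,
MPI_Type_create_struct, and MPI_Type_commit calls (the exact code appears in Program 3.13),
is:
void Build_mpi_type(double* a_p, double* b_p, int* n_p,
                    MPI_Datatype* input_mpi_t_p) {
    int array_of_blocklengths[3] = {1, 1, 1};
    MPI_Datatype array_of_types[3] = {MPI_DOUBLE, MPI_DOUBLE, MPI_INT};
    MPI_Aint a_addr, b_addr, n_addr;
    MPI_Aint array_of_displacements[3] = {0, 0, 0};

    /* Compute each variable's displacement from the address of a */
    MPI_Get_address(a_p, &a_addr);
    MPI_Get_address(b_p, &b_addr);
    MPI_Get_address(n_p, &n_addr);
    array_of_displacements[1] = b_addr - a_addr;
    array_of_displacements[2] = n_addr - a_addr;

    MPI_Type_create_struct(3, array_of_blocklengths, array_of_displacements,
                           array_of_types, input_mpi_t_p);
    MPI_Type_commit(input_mpi_t_p);
}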
3.6 Performance Evaluation of MPI Programs
One of the main reasons we write parallel MPI programs is to achieve better performance
compared to serial versions. But how do we measure this improvement? The key is to evaluate
only the computational part (like matrix-vector multiplication), ignoring unrelated tasks
like input and output.
MPI provides a built-in timing function called MPI_Wtime(), which returns the elapsed wall
clock time (real-world time, including wait and idle periods). This differs from CPU time
(returned by functions like clock()), which doesn't account for waiting during communication.
3.6.1 Taking Timings
When timing MPI code, we typically exclude non-computational steps (e.g., input, output).
We use MPI_Wtime() as follows:
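double start, finish;
...
start = MPI_Wtime();
/* Code to be timed */
finish = MPI_Wtime();
printf("Proc %d > Elapsed time = %e seconds\n", my_rank, finish - start);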
For serial code, MPI is not needed. We can use the GET_TIME macro (defined in timer.h,
downloadable from the book's website) which wraps gettimeofday():
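#include "timer.h"
...
double start, finish;
...
GET_TIME(start);
/* Code to be timed */
GET_TIME(finish);
printf("Elapsed time = %e seconds\n", finish - start);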
Be aware that GET_TIME is a macro, not a function, and takes a double (not a pointer). Also,
if timer.h is not in your working directory, compile with:
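$ gcc -g -Wall -I/path/to/timer -o prog prog.c
Here -I tells the compiler which directory to search for timer.h; the path and program
names above are placeholders.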
Both MPI_Wtime() and GET_TIME report wall clock time, which includes communication
delays (e.g., time spent waiting in MPI_Recv). This is more relevant in parallel environments
where idle time impacts real performance.
To obtain a single elapsed time for the parallel run (instead of separate times for each
process), we use MPI_Barrier to synchronize processes and MPI_Reduce with the MPI_MAX
operator to gather the maximum elapsed time:
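double local_start, local_finish, local_elapsed, elapsed;
...
MPI_Barrier(comm);
local_start = MPI_Wtime();
/* Code to be timed */
local_finish = MPI_Wtime();
local_elapsed = local_finish - local_start;
MPI_Reduce(&local_elapsed, &elapsed, 1, MPI_DOUBLE, MPI_MAX, 0, comm);
if (my_rank == 0)
    printf("Elapsed time = %e seconds\n", elapsed);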
This gives a more realistic total parallel execution time.
Also, we must account for variability in timings due to OS activity and other system-level
factors. Even with identical input and system load, execution time may vary across runs.
Hence, we usually report the minimum observed runtime, assuming the fastest run
occurred with the least interference.
When running MPI on hybrid multicore systems, it’s often more efficient to run one MPI
process per node, reducing communication overhead and contention, leading to more
consistent and improved timings.
3.6.2 Results
The timing results for the matrix-vector multiplication program are summarized in Table
3.5. The input matrices are square, and times are shown in milliseconds, rounded to two
significant digits.
• For comm_sz = 1, the timings represent a serial run on a single core.
• As expected, for a fixed number of processes, increasing the matrix size n leads to
longer runtimes.
• For small numbers of processes, doubling n roughly quadruples the runtime. But
for larger process counts, this pattern does not always hold.
If we fix n and increase comm_sz (number of processes):
• Runtime usually decreases, especially for large values of n.
• Doubling the number of processes roughly halves the runtime.
• But for small n, adding more processes offers diminishing or no benefit.
For instance:
This behavior is typical of parallel programs. As problem size increases, runtimes
increase regardless of the number of processes, though the rate of increase
depends on the number of processes:
• One-process times increase steadily.
• Multi-process times can fluctuate (especially at 16 processes).
A useful performance model expresses parallel time as:
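T_parallel(n, p) = T_serial(n) / p + T_overhead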
Where:
• T_serial(n) is the time taken by the serial program on input size n.
• T_parallel(n, p) is the time taken by the parallel program on input size n using p
processes.
• T_overhead represents the parallelization cost, often due to communication,
especially in MPI programs.
For the matrix-vector multiplication, the serial code has two nested loops:
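Assuming the matrix is stored in a one-dimensional array in row-major order (variable
names here are illustrative), the loops look like this:
for (i = 0; i < m; i++) {
    y[i] = 0.0;
    for (j = 0; j < n; j++)
        y[i] += A[i*n + j] * x[j];
}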
This performs:
• 2n floating point operations per row (n multiplications + n additions),
• Executed m times, giving 2mn total floating point operations.
So, if m = n, the serial runtime can be approximated as:
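T_serial(n) ≈ a·n²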
(for some constant a).
In the parallel version:
• Each process handles (n/p) × n elements.
• So the local computation time is approximately:
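T_serial(n) / p ≈ a·n² / p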
But there's an additional overhead due to MPI_Allgather, so the parallel time becomes:
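T_parallel(n, p) ≈ a·n² / p + T_allgather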
Empirical Observations
For small p (2 or 4) and large n, the runtime behaves close to T_serial(n) / p:
Doubling p approximately halves the runtime:
Thus, the MPI_Allgather overhead appears to have little impact when:
• n is large, and p is small.
Breakdown for Small n and Large p
However, the approximation fails when:
• n is small, and p is large.
For example:
This indicates that for small problem sizes, the communication cost (T_allgather)
dominates the performance, limiting scalability.
3.6.3 Speedup and Efficiency
An essential metric in evaluating parallel program performance is speedup, denoted
as:
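S(n, p) = T_serial(n) / T_parallel(n, p)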
This represents how many times faster the parallel program runs with p processes compared
to the serial version on input of size n. The ideal case is when:
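S(n, p) = p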
This is called linear speedup, indicating perfect scaling—i.e., a program with 16 processes is
16 times faster than the serial one. However, linear speedup is rare in practice due to
communication overhead and load imbalance.
In the case of the matrix-vector multiplication program, the speedups obtained are listed
in Table 3.6. Observations include:
• For small p and large n, the program achieved near-linear speedup.
• For large p and small n, the speedup was significantly less than p.
• The worst performance occurred at n = 1024 with p = 16, where the speedup
dropped to only 2.4, far from the ideal value of 16.
Another related metric is parallel efficiency, defined as the speedup per process:
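E(n, p) = S(n, p) / p = T_serial(n) / (p · T_parallel(n, p))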
A value of 1.0 indicates perfect efficiency, where each process contributes fully to the
speedup. In general, efficiency is less than 1, and decreases as communication and
synchronization overheads increase.
The efficiencies for matrix-vector multiplication are shown in Table 3.7:
• When n is large and p is small, efficiency is close to 1.0.
• As p increases and n remains small, efficiency drops significantly.
This reinforces the principle that parallelism pays off when applied to large problems and
that adding more processes to small problems often results in poor scalability.
3.6.4 Scalability
When analyzing the performance of parallel programs, it's not enough to look only at speedup
and efficiency. Another crucial factor is scalability.
A parallel program is said to be scalable if the problem size can be increased at a rate that
maintains the same efficiency as the number of processes (p) increases. In other words, a
scalable program keeps working well even when we use more processors, provided we
increase the workload proportionally.
However, this definition has some ambiguity—specifically in the phrase: “problem size can be
increased at a rate...” Let's examine two hypothetical programs:
• Program A maintains a constant efficiency of 0.75 for all p≥2, regardless of
problem size.
• Program B has efficiency defined as:
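E(n, p) = n / (n + p)
(This particular formula is a representative example; any efficiency that grows with n and
shrinks with p in this way behaves similarly.)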
In Program B, if we increase both n and p proportionally, we can maintain constant
efficiency. For example:
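With the efficiency above, taking n = 1000 and p = 10 gives E = 1000/1010 ≈ 0.99, and
doubling both to n = 2000 and p = 20 gives E = 2000/2020 ≈ 0.99. In general, multiplying n
and p by the same factor k leaves n/(n + p) unchanged, since kn/(kn + kp) = n/(n + p).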
So both programs are technically scalable, but Program A is more scalable
because it does not even require an increase in problem size to maintain efficiency. This
leads to two common categories of scalability:
• Strong scalability: Constant efficiency without increasing problem size (Program A).
• Weak scalability: Constant efficiency if the problem size increases proportionally
with the number of processes (Program B).
Matrix-Vector Multiplication and Scalability
Looking at the parallel efficiency values in Table 3.7, we can conclude:
• The matrix-vector multiplication program is not strongly scalable. For nearly every
case, increasing the number of processes results in reduced efficiency.
• However, the program shows traits of weak scalability. When both p and n are
doubled, parallel efficiency often improves.
For instance:
• Increasing p from 4 to 8 or 8 to 16 with a corresponding increase in n consistently
improves or maintains efficiency.
• The only slight deviation happens in going from p = 2 to p = 4, but this is usually not a
concern, since scalability discussions often focus on larger processor counts.
Thus, the matrix-vector multiplication program is weakly scalable. While not ideal
for small problems or low processor counts, it handles larger data and processor
configurations reasonably well—a desirable property in large-scale parallel
computing.
3.7 A Parallel Sorting Algorithm
In a distributed memory environment, a parallel sorting algorithm deals with data (keys)
spread across multiple processes. The input and output depend on the data distribution—
keys can either be distributed among processes or assigned to a single process. In this
case, we focus on an algorithm where both input and output are distributed, meaning each
process starts and ends with keys, assuming n is divisible by the number of processes
p=comm_sz. Initially, keys are randomly distributed across processes. The goal is to ensure
that, after execution:
• Each process’s local keys are sorted in increasing order, and
• For any two processes q<r, every key in process q is less than or equal to every key
in process r.
Thus, if we concatenate the keys from all processes in order of increasing rank, we get a fully
sorted list. For simplicity, we assume the keys are of type int. This sets the stage for designing
efficient and scalable parallel sorting algorithms that operate correctly and coherently in a
distributed setting.
3.7.1 Some Simple Serial Sorting Algorithms
To better understand parallel sorting, let’s briefly review a couple of serial sorting
techniques. One well-known method is Bubble Sort, which repeatedly compares and swaps
adjacent values to move larger elements toward the end of the list. On each outer loop pass,
the largest unsorted value "bubbles" into its correct position, and the unsorted portion shrinks
by one.
However, Bubble Sort is inherently sequential—a swap at one position may affect adjacent
comparisons. For instance, a sequence like 9, 5, 7 needs ordered swaps to correctly become 5,
7, 9; swapping 5 and 7 prematurely would yield the wrong result.
To introduce parallelism, we turn to Odd-Even Transposition Sort, which breaks
dependencies using phases:
• Even phases compare and swap: (a[0], a[1]), (a[2], a[3]), ...
• Odd phases compare and swap: (a[1], a[2]), (a[3], a[4]), ...
Example:
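Starting from an arbitrary four-element list (the pairs compared in each phase are shown
in parentheses):
Start:           5, 9, 4, 3
Phase 0 (even):  compare-swap (5,9) and (4,3)  ->  5, 9, 3, 4
Phase 1 (odd):   compare-swap (9,3)            ->  5, 3, 9, 4
Phase 2 (even):  compare-swap (5,3) and (9,4)  ->  3, 5, 4, 9
Phase 3 (odd):   compare-swap (5,4)            ->  3, 4, 5, 9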
A theorem guarantees that n elements will be sorted in at most n phases.
3.7.2 Parallel Odd-Even Transposition Sort
In a distributed memory system, the same concept is extended across p processes, each
holding n/p elements (assuming n is evenly divisible by p).
When n=p (each process has one key), the algorithm is intuitive:
At each phase, process i exchanges keys with either i-1 or i+1 based on phase type (even or
odd), and keeps either the smaller or larger key based on its rank.
But this is impractical for real applications (we usually have n>p), and communication
overhead outweighs benefits for small inputs.
When each process holds multiple keys, the process is:
1. Locally sort keys using fast serial sort like qsort.
2. In each phase:
o Determine a partner (left or right neighbor depending on phase).
o Exchange full key arrays with the partner.
o Merge the two arrays:
▪ Lower-ranked process keeps the smaller half.
▪ Higher-ranked process keeps the larger half.
This ensures that:
• Each process maintains sorted keys.
• Global ordering progresses toward a fully sorted distributed list.
Table 3.8 illustrates this procedure across 4 processes and 16 keys.
Theorem
If parallel odd-even transposition sort is run with p processes, then after p phases, the entire
input list will be sorted.
Pseudocode
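A possible outline (Compute_partner is an illustrative helper that returns the phase
partner, or MPI_PROC_NULL when the process sits out a phase):
Sort local keys with a fast serial sort (e.g., qsort);
for (phase = 0; phase < comm_sz; phase++) {
    partner = Compute_partner(phase, my_rank);
    if (partner != MPI_PROC_NULL) {
        Send my keys to partner;
        Receive partner's keys;
        if (my_rank < partner)
            Keep the smaller half of the merged keys;
        else
            Keep the larger half of the merged keys;
    }
}

/* Inside Compute_partner: */
if (phase % 2 == 0)                        /* even phase */
    partner = (my_rank % 2 == 0) ? my_rank + 1 : my_rank - 1;
else                                       /* odd phase  */
    partner = (my_rank % 2 == 0) ? my_rank - 1 : my_rank + 1;
if (partner == -1 || partner == comm_sz)
    partner = MPI_PROC_NULL;               /* idle in this phase */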
MPI_PROC_NULL is a special constant indicating no communication should occur for that
process in that phase.
3.7.3 Safety in MPI Programs
In a parallel MPI program, communication between processes is typically implemented using
MPI_Send and MPI_Recv calls. For example:
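In the odd-even sort, for instance, each process might exchange keys with its partner like
this (buffer names are illustrative):
MPI_Send(my_keys, n/comm_sz, MPI_INT, partner, 0, comm);
MPI_Recv(recv_keys, n/comm_sz, MPI_INT, partner, 0, comm, MPI_STATUS_IGNORE);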
However, this approach can lead to problems such as program hanging or crashing,
especially when processes engage in mutual communication simultaneously. The behavior of
MPI_Send is defined by the MPI standard to be either buffered (non-blocking) or blocking.
Buffered mode allows the program to proceed without waiting for a corresponding receive,
while blocking mode waits until a matching MPI_Recv is initiated.
Most MPI implementations use a threshold mechanism: small messages are typically
buffered, whereas larger messages invoke blocking behavior. When all processes perform
MPI_Send simultaneously with large messages, none may reach the MPI_Recv stage, leading
to a deadlock—a situation where every process is waiting indefinitely for another to proceed.
A program that implicitly assumes buffered communication is termed an unsafe MPI
program. While such a program might execute correctly for small problem sizes, it is prone
to failure when the message sizes increase, or system buffering behavior changes.
Detecting Unsafe Communication
To determine whether a program is safe, the MPI standard provides a synchronous version of
MPI_Send called MPI_Ssend. The call to MPI_Ssend ensures that the send operation blocks
until the matching receive is initiated by the receiving process.
By substituting MPI_Send with MPI_Ssend, one can check the safety of the program. If the
program executes correctly with MPI_Ssend, it is considered safe. Otherwise, if it hangs or
crashes, the original version using MPI_Send is unsafe.
Causes of Unsafe Programs and Restructuring
Unsafe communication often arises when all processes attempt to send first and receive
later. For instance, in a ring communication pattern, where process q sends to (q + 1) %
comm_sz, simultaneous send operations can result in a deadlock:
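MPI_Send(msg, size, MPI_INT, (my_rank + 1) % comm_sz, 0, comm);
MPI_Recv(new_msg, size, MPI_INT, (my_rank + comm_sz - 1) % comm_sz, 0, comm,
MPI_STATUS_IGNORE);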
To make such communication patterns safe, the communication must be restructured, so that
some processes receive before sending. An example of a safe structure is:
if (my_rank % 2 == 0) {
MPI_Send(msg, size, MPI_INT, (my_rank + 1) % comm_sz, 0, comm);
MPI_Recv(new_msg, size, MPI_INT, (my_rank + comm_sz - 1) % comm_sz, 0, comm,
MPI_STATUS_IGNORE);
} else {
MPI_Recv(new_msg, size, MPI_INT, (my_rank + comm_sz - 1) % comm_sz, 0, comm,
MPI_STATUS_IGNORE);
MPI_Send(msg, size, MPI_INT, (my_rank + 1) % comm_sz, 0, comm);
}
This pattern ensures that half of the processes perform Recv before Send, breaking the
circular dependency that leads to deadlock. This method is effective for both even and odd
values of comm_sz, as shown in Figure 3.13, which illustrates how a safe communication order
can be maintained even in an odd-sized process ring.
Using MPI_Sendrecv for Safe Communication
An alternative and cleaner approach to avoid explicit coordination is to use the MPI_Sendrecv
function, which performs a send and receive operation in a single call, ensuring safe
execution:
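int MPI_Sendrecv(
void* send_buf_p, // in
int send_buf_size, // in
MPI_Datatype send_buf_type, // in
int dest, // in
int send_tag, // in
void* recv_buf_p, // out
int recv_buf_size, // in
MPI_Datatype recv_buf_type, // in
int source, // in
int recv_tag, // in
MPI_Comm communicator, // in
MPI_Status* status_p // out
);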
This method guarantees that no deadlock will occur because MPI internally schedules the
communication to ensure that one side sends while the other receives.
Additionally, if the send and receive buffers are the same, MPI provides:
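int MPI_Sendrecv_replace(
void* buf_p, // in/out
int buf_size, // in
MPI_Datatype buf_type, // in
int dest, // in
int send_tag, // in
int source, // in
int recv_tag, // in
MPI_Comm communicator, // in
MPI_Status* status_p // out
);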
This function sends the data in buf, replaces it with the received data, and is useful for in-place
communication patterns, such as in odd-even transposition sort.
3.7.4 Final Details of Parallel Odd-Even Sort
The final implementation of the parallel odd-even transposition sort uses a safe
communication strategy by combining the send and receive operations into a single call to
MPI_Sendrecv. This helps avoid deadlock issues that arise from unsafe usage of MPI_Send and
MPI_Recv, particularly when all processes attempt to send first. The improved communication
structure ensures reliable exchange of keys between paired processes during each phase. The
general algorithm remains the same: each process sorts its local keys first, and during each
phase, communicates with a partner process, compares the combined keys, and retains either
the smaller or larger half depending on its rank.
To make the process efficient, we avoid sorting the full list of combined keys. Instead, we
merge the two sorted sublists—each of size n/p—to select the required subset. If a process is
supposed to keep the smaller keys, it performs a forward merge and stops once n/p keys are
obtained. If it must keep the larger keys, it merges backward from the end of the arrays. This
partial merge is faster than full sorting. An additional optimization skips the need to copy
arrays entirely by simply swapping pointers, which reduces memory operations (this
pointer-swapping trick is elaborated in Exercise 3.28).
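A minimal sketch of the "keep the smaller half" merge, assuming my_keys and recv_keys are
sorted arrays of local_n ints and temp_keys is scratch storage of the same size (all names
are illustrative), is:
#include <string.h>  /* for memcpy */

void Merge_low(int my_keys[], int recv_keys[], int temp_keys[], int local_n) {
    int m_i = 0, r_i = 0, t_i = 0;

    while (t_i < local_n) {                 /* stop after the local_n smallest keys */
        if (my_keys[m_i] <= recv_keys[r_i])
            temp_keys[t_i++] = my_keys[m_i++];
        else
            temp_keys[t_i++] = recv_keys[r_i++];
    }
    memcpy(my_keys, temp_keys, local_n * sizeof(int));
}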
The performance of this final version is reflected in Table 3.9, which shows the run-times.
It’s important to note that for a single process, the sorting is done using serial quicksort,
which is significantly faster than a serial odd-even sort. As expected, run-times improve with
increasing number of processes, especially when n is large enough to offset communication
costs. However, for smaller n, gains are modest.
This final version represents a practical and scalable parallel sorting strategy for distributed
memory systems, leveraging local sorting, partner communication, safe message passing, and
efficient merging.