This lesson is in the early stages of development (Alpha version)

Running a parallel job

Overview

Teaching: 30 min
Exercises: 30 min
Questions
  • How do we execute a task in parallel?

Objectives
  • Understand how to run a parallel job on a cluster.

We now have the tools we need to run a multi-processor job. This is a very important aspect of HPC systems, as parallelism is one of the primary tools we have to improve the performance of computational tasks.

Our example implements a stochastic algorithm for estimating the value of π, the ratio of the circumference to the diameter of a circle. The program generates a large number of random points on a 1×1 square centered on (½,½), and checks how many of these points fall inside the unit circle. On average, π/4 of the randomly-selected points should fall in the circle, so π can be estimated from 4f, where f is the observed fraction of points that fall in the circle. Because each sample is independent, this algorithm is easily implemented in parallel.

/hpc-intro/Algorithm%20for%20computing%20pi%20through%20random%20sampling

A Serial Solution to the Problem

We start from a Python script using concepts taught in Software Carpentry’s Programming with Python workshops. We want to allow the user to specify how many random points should be used to calculate π through a command-line parameter. This script will only use a single CPU for its entire run, so it’s classified as a serial process.

Let’s write a Python program, pi.py, to estimate π for us. Start by importing the numpy module for calculating the results, and the sys module to process command-line parameters:

import numpy as np
import sys

We define a Python function inside_circle that accepts a single parameter for the number of random points used to calculate π. See Programming with Python: Creating Functions for a review of Python functions. It randomly samples points with both x and y on the half-open interval [0, 1). It then computes their distances from the origin (i.e., radii), and returns how many of those distances were less than or equal to 1.0. All of this is done using vectors of double-precision (64-bit) floating-point values.

def inside_circle(total_count):
    x = np.random.uniform(size=total_count)
    y = np.random.uniform(size=total_count)
    radii = np.sqrt(x*x + y*y)
    count = len(radii[np.where(radii<=1.0)])
    return count

Next, we create a main function to call the inside_circle function and calculate π from its returned result. See Programming with Python: Command-Line Programs for a review of main functions and parsing command-line parameters.

def main():
    n_samples = int(sys.argv[1])
    counts = inside_circle(n_samples)
    my_pi = 4.0 * counts / n_samples
    print(my_pi)

if __name__ == '__main__':
    main()

If we run the Python script locally with a command-line parameter, as in python pi-serial.py 1024, we should see the script print its estimate of π:

[user@laptop ~]$ python pi-serial.py 1024
3.10546875

Measuring Performance of the Serial Solution

The stochastic method used to estimate π should converge on the true value as the number of random points increases. But as the number of points increases, creating the variables x, y, and radii requires more time and more memory. Eventually, the memory required may exceed what’s available on our local laptop or desktop, or the time required may be too long to meet a deadline. So we’d like to take some measurements of how much memory and time the script requires, and later take the same measurements after creating a parallel version of the script to see the benefits of parallelizing the calculations required.

Estimating Memory Requirements

Since the largest variables in the script are x, y, and radii, each containing n_samples points, we’ll modify the script to report their total memory required. Each point in x, y, or radii is stored as a NumPy float64, we can use NumPy’s dtype function to calculate the size of a float64.

Replace the print(my_pi) line with the following:

size_of_float = np.dtype(np.float64).itemsize
memory_required = 3 * n_samples * size_of_float / (1024**3)
print("Pi: {}, memory: {} GiB".format(my_pi, memory_required))

The first line calculates the bytes of memory required for a single float64 value using the dtypefunction. The second line estimates the total amount of memory required to store three variables containing n_samples float64 values, converting the value into units of gibibytes. The third line prints both the estimate of π and the estimated amount of memory used by the script.

The updated Python script is:

import numpy as np
import sys

def inside_circle(total_count):
    x = np.random.uniform(size=total_count)
    y = np.random.uniform(size=total_count)
    radii = np.sqrt(x*x + y*y)
    count = len(radii[np.where(radii<=1.0)])
    return count

def main():
    n_samples = int(sys.argv[1])
    counts = inside_circle(n_samples)
    my_pi = 4.0 * counts / n_samples
    size_of_float = np.dtype(np.float64).itemsize
    memory_required = 3 * n_samples * size_of_float / (1024**3)
    print("Pi: {}, memory: {} GiB".format(my_pi, memory_required))

if __name__ == '__main__':
    main()

Run the script again with a few different values for the number of samples, and see how the memory required changes:

[user@laptop ~]$ python pi-serial.py 1000
Pi: 3.144, memory: 2.2351741790771484e-05 GiB
[user@laptop ~]$ python pi-serial.py 2000
Pi: 3.18, memory: 4.470348358154297e-05 GiB
[user@laptop ~]$ python pi-serial.py 1000000
Pi: 3.140944, memory: 0.022351741790771484 GiB
[user@laptop ~]$ python pi-serial.py 100000000
Pi: 3.14182724, memory: 2.2351741790771484 GiB

Here we can see that the estimated amount of memory required scales linearly with the number of samples used. In practice, there is some memory required for other parts of the script, but the x, y, and radii variables are by far the largest influence on the total amount of memory required.

Estimating Calculation Time

Most of the calculations required to estimate π are in the inside_circle function:

  1. Generating n_samples random values for x and y.
  2. Calculating n_samples values of radii from x and y.
  3. Counting how many values in radii are under 1.0.

There’s also one multiplication operation and one division operation required to convert the counts value to the final estimate of π in the main function.

A simple way to measure the calculation time is to use Python’s datetime module to store the computer’s current date and time before and after the calculations, and calculate the difference between those times.

To add the time measurement to the script, add the following line below the import sys line:

import datetime

Then, add the following line immediately above the line calculating counts:

start_time = datetime.datetime.now()

Add the following two lines immediately below the line calculating counts:

end_time = datetime.datetime.now()
elapsed_time = (end_time - start_time).total_seconds()

And finally, modify the print statement with the following:

print("Pi: {}, memory: {} GiB, time: {} s".format(my_pi, memory_required,
                                                  elapsed_time))

The final Python script for the serial solution is:

import numpy as np
import sys
import datetime

def inside_circle(total_count):
    x = np.random.uniform(size=total_count)
    y = np.random.uniform(size=total_count)
    radii = np.sqrt(x*x + y*y)
    count = len(radii[np.where(radii<=1.0)])
    return count

def main():
    n_samples = int(sys.argv[1])
    start_time = datetime.datetime.now()
    counts = inside_circle(n_samples)
    my_pi = 4.0 * counts / n_samples
    end_time = datetime.datetime.now()
    elapsed_time = (end_time - start_time).total_seconds()
    size_of_float = np.dtype(np.float64).itemsize
    memory_required = 3 * n_samples * size_of_float / (1024**3)
    print("Pi: {}, memory: {} GiB, time: {} s".format(my_pi, memory_required,
                                                      elapsed_time))

if __name__ == '__main__':
    main()

Run the script again with a few different values for the number of samples, and see how the solution time changes:

[user@laptop ~]$ python pi-serial.py 1000000
Pi: 3.139612, memory: 0.022351741790771484 GiB, time: 0.034872 s
[user@laptop ~]$ python pi-serial.py 10000000
Pi: 3.1425492, memory: 0.22351741790771484 GiB, time: 0.351212 s
[user@laptop ~]$ python pi-serial.py 100000000
Pi: 3.14146608, memory: 2.2351741790771484 GiB, time: 3.735195 s

Here we can see that the amount of time required scales approximately linearly with the number of samples used. There could be some variation in additional runs of the script with the same number of samples, since the elapsed time is affected by other programs running on the computer at the same time. But if the script is the most computationally-intensive process running at the time, its calculations are the largest influence on the elapsed time.

Now that we’ve developed our initial script to estimate π, we can see that as we increase the number of samples:

  1. The estimate of π tends to become more accurate.
  2. The amount of memory required scales approximately linearly.
  3. The amount of time to calculate scales approximately linearly.

In general, achieving a better estimate of π requires a greater number of points. Take a closer look at inside_circle: should we expect to get high accuracy on a single machine?

Probably not. The function allocates three arrays of size N equal to the number of points belonging to this process. Using 64-bit floating point numbers, the memory footprint of these arrays can get quite large. Each 100,000,000 points sampled consumes 2.24 GiB of memory. Sampling 400,000,000 points consumes 8.94 GiB of memory, and if your machine has less RAM than that, it will grind to a halt. If you have 16 GiB installed, you won’t quite make it to 750,000,000 points.

Running the Serial Job on a Compute Node

Create a submission file, requesting one task on a single node and enough memory to prevent the job from running out of memory:

[yourUsername@gra-login1 ~]$ nano serial-pi.sh
[yourUsername@gra-login1 ~]$ cat serial-pi.sh
#!/usr/bin/env bash
#SBATCH -J serial-pi
#SBATCH -p testing
#SBATCH -N 1
#SBATCH -n 1
#SBATCH --mem=3G

# Load the computing environment we need
module load python3

# Execute the task
python pi.py 100000000

Then submit your job. We will use the batch file to set the options, rather than the command line.

[yourUsername@gra-login1 ~]$ sbatch serial-pi.sh

As before, use the status commands to check when your job runs. Use ls to locate the output file, and examine it. Is it what you expected?

Modify the job script to increase both the number of samples and the amount of memory requested (perhaps by a factor of 2, then by a factor of 10), and resubmit the job each time.

Even with sufficient memory for necessary variables, a script could require enormous amounts of time to calculate on a single CPU. To reduce the amount of time required, we need to modify the script to use multiple CPUs for the calculations. In the largest problem scales, we could use multiple CPUs in multiple compute nodes, distributing the memory requirements across all the nodes used to calculate the solution.

Running the Parallel Job

We will run an example that uses the Message Passing Interface (MPI) for parallelism — this is a common tool on HPC systems.

What is MPI?

The Message Passing Interface is a set of tools which allow multiple parallel jobs to communicate with each other. Typically, a single executable is run multiple times, possibly on different machines, and the MPI tools are used to inform each instance of the executable about how many instances there are, which instance it is. MPI also provides tools to allow communication and coordination between instances. An MPI instance typically has its own copy of all the local variables.

While MPI jobs can generally be run as stand-alone executables, in order for them to run in parallel they must use an MPI run-time system, which is a specific implementation of the MPI standard. To do this, they should be started via a command such as mpiexec (or mpirun, or srun, etc. depending on the MPI run-time you need to use), which will ensure that the appropriate run-time support for parallelism is included.

MPI run-time arguments

On their own, commands such as mpiexec can take many arguments specifying how many machines will participate in the execution, and you might need these if you would like to run an MPI program on your laptop (for example). In the context of a queuing system, however, it is frequently the case that we do not need to specify this information as the MPI run-time will have been configured to obtain it from the queuing system, by examining the environment variables set when the job is launched.

What changes are needed for an MPI version of the π calculator?

First, we need to import the MPI object from the Python module mpi4py by adding an from mpi4py import MPI line immediately below the import datetime line.

Second, we need to modify the “main” function to perform the overhead and accounting work required to:

  • subdivide the total number of points to be sampled,
  • partition the total workload among the various parallel processors available,
  • have each parallel process report the results of its workload back to the “rank 0” process, which does the final calculations and prints out the result.

The modifications to the serial script demonstrate four important concepts:

  • COMM_WORLD: the default MPI Communicator, providing a channel for all the processes involved in this mpiexec to exchange information with one another.
  • Scatter: A collective operation in which an array of data on one MPI rank is divided up, with separate portions being sent out to the partner ranks. Each partner rank receives data from the matching index of the host array.
  • Gather: The inverse of scatter. One rank populates a local array, with the array element at each index assigned the value provided by the corresponding partner rank — including the host’s own value.
  • Conditional Output: since every rank is running the same code, the partitioning, the final calculations, and the print statement are wrapped in a conditional so that only one rank performs these operations.

We add the lines:

comm = MPI.COMM_WORLD
cpus = comm.Get_size()
rank = comm.Get_rank()

immediately before the n_samples line to set up the MPI environment for each process.

We replace the start_time and counts lines with the lines:

if rank == 0:
  start_time = datetime.datetime.now()
  partitions = [ int(n_samples / cpus) ] * cpus
  counts = [ int(0) ] * cpus
else:
  partitions = None
  counts = None

This ensures that only the rank 0 process measures times and coordinates the work to be distributed to all the ranks, while the other ranks get placeholder values for the partitions and counts variables.

Immediately below these lines, let’s

by adding the following three lines:

partition_item = comm.scatter(partitions, root=0)
count_item = inside_circle(partition_item)
counts = comm.gather(count_item, root=0)

Illustrations of these steps are shown below.


Setup the MPI environment and initialize local variables — including the vector containing the number of points to generate on each parallel processor:

/hpc-intro/MPI%20initialize

Distribute the number of points from the originating vector to all the parallel processors:

/hpc-intro/MPI%20scatter

Perform the computation in parallel:

/hpc-intro/MPI%20compute

Retrieve counts from all the parallel processes:

/hpc-intro/MPI%20gather

Print out the report:

/hpc-intro/MPI%20finalize

Finally, we’ll ensure the my_pi through print lines only run on rank 0. Otherwise, every parallel processor will print its local value, and the report will become hopelessly garbled:

if rank == 0:
   my_pi = 4.0 * sum(counts) / sum(partitions)
   end_time = datetime.datetime.now()
   elapsed_time = (end_time - start_time).total_seconds()
   size_of_float = np.dtype(np.float64).itemsize
   memory_required = 3 * sum(partitions) * size_of_float / (1024**3)
   print("Pi: {}, memory: {} GiB, time: {} s".format(my_pi, memory_required,
                                                            elapsed_time))

A fully commented version of the final MPI parallel python code is available here.

Our purpose here is to exercise the parallel workflow of the cluster, not to optimize the program to minimize its memory footprint. Rather than push our local machines to the breaking point (or, worse, the login node), let’s give it to a cluster node with more resources.

Create a submission file, requesting more than one task on a single node:

[yourUsername@gra-login1 ~]$ nano parallel-pi.sh
[yourUsername@gra-login1 ~]$ cat parallel-pi.sh

#!/usr/bin/env bash #SBATCH -J parallel-pi #SBATCH -p testing #SBATCH -N 1 #SBATCH -n 4 #SBATCH –mem=3G

Load the computing environment we need

module load python3

Execute the task

mpiexec python pi.py 100000000

{: .language-bash}

Then submit your job. We will use the batch file to set the options, rather than the command line.

[yourUsername@gra-login1 ~]$ sbatch parallel-pi.sh

As before, use the status commands to check when your job runs. Use ls to locate the output file, and examine it. Is it what you expected?

Modify the job script to increase both the number of samples and the amount of memory requested (perhaps by a factor of 2, then by a factor of 10), and resubmit the job each time. You can also increase the number of CPUs.

How Much Does MPI Improve Performance?

In theory, by dividing up the π calculations among n MPI processes, we should see run times reduce by a factor of n. In practice, some time is required to start the additional MPI processes, for the MPI processes to communicate and coordinate, and some types of calculations may only be able to run effectively on a single CPU.

Additionally, if the MPI processes operate on different physical CPUs in the computer, or across multiple compute nodes, additional time is required for communication compared to all processes operating on a single CPU.

Amdahl’s Law is one way of predicting improvements in execution time for a fixed parallel workload. If a workload needs 20 hours to complete on a single core, and one hour of that time is spent on tasks that cannot be parallelized, only the remaining 19 hours could be parallelized. Even if an infinite number of cores were used for the parallel parts of the workload, the total run time cannot be less than one hour.

In practice, it’s common to evaluate the parallelism of an MPI program by

The speedup factor S is calculated as the single-CPU execution time divided by the multi-CPU execution time. For a laptop with 8 cores, the graph of speedup factor versus number of cores used shows relatively consistent improvement when using 2, 4, or 8 cores, but using additional cores shows a diminishing return.

/hpc-intro/MPI%20speedup%20factors%20on%20an%208-core%20laptop

For a set of HPC nodes containing 28 cores each, the graph of speedup factor versus number of cores shows consistent improvements up through three nodes and 84 cores, but worse performance when adding a fourth node with an additional 28 cores. This is due to the amount of communication and coordination required among the MPI processes requiring more time than is gained by reducing the amount of work each MPI process has to complete. This communication overhead is not included in Amdahl’s Law.

/hpc-intro/MPI%20speedup%20factors%20on%20an%208-core%20laptop

In practice, MPI speedup factors are influenced by:

In an HPC environment, we try to reduce the execution time for all types of jobs, and MPI is an extremely common way to combine dozens, hundreds, or thousands of CPUs into solving a single problem.

Key Points

  • Parallelism is an important feature of HPC clusters.

  • MPI parallelism is a common case.

  • The queuing system facilitates executing parallel tasks.