Skip to content

Some Strategies for Using Multiple Nodes of GPUs

Using multiple GPUs is one option to speed up your code. On Apocrita, we have V100, A100 and H100 GPUs available, with up to 4 GPUs per node. On other compute clusters, JADE2 has 8 V100 GPUs per node and Sulis has 3 A100 GPUs per node. If your problem is pleasingly parallel, you can distribute identical or similar tasks to each GPU on a node, or even on multiple nodes.

In Python, there are libraries which can help you use multiple GPUs on one node or multiple nodes. The library multiprocessing, covered in another blog post, can be used to create multiple processes within a node. Each of those processes can utilise a GPU. For multiple nodes, mpi4py can run copies of your code, with different parameters, on different nodes. After running your codes, you can use mpi4py to gather results from the different nodes to be processed. It works well with schedulers such as UGE, used here on Apocrita, and Slurm, used on JADE2 and Sulis.

In this blog, we will demonstrate utilising multiple GPUs using multiprocessing and mpi4py. These were done on Apocrita and Sulis using multiple GPUs and also multiple nodes of GPUs. Rather than a walkthrough, this blog will cover some general strategies and tips when using multiprocessing and mpi4py together.

There are a few ways to use multiple GPUs and we will discuss them here.

Array Jobs

The easiest way to use multiple GPUs is to run an array job. This allows you to run multiple independent jobs when resources become available, not necessarily simultaneously. This is very beginner-friendly and requires minimal changes to your code. However, this only works if your problem can be split into independent jobs. In addition, you will have to write and run another job script to process the results from the array job when it finishes.

multiprocessing

You can use multiprocessing to spawn new Python processes which can run in parallel within a node.

A common way to use multiprocessing is to use Pool.map(), this can spawn new processes each running the same function but with different parameters. In the example below, we create a Pool to call say_hello() with different names.

import multiprocessing

def say_hello(name):
    print(f"Hello {name}")

if __name__ == "__main__":
    names = [
        "Barry",
        "Alice",
        "Barbara",
        "Tom",
    ]
    with multiprocessing.Pool(len(names)) as pool:
        pool.map(say_hello, names)

In a similar spirit, if you have multiple GPUs, you can use Pool.map() to use those GPUs at the same time. In the example below, we use all detectable GPUs simultaneously and put the number 12345 in its memory .

import multiprocessing
import cupy
from cupy import cuda

def use_gpu(gpu_id):
    with cuda.Device(gpu_id):
        cupy.asarray(12345)
        print("Using GPU {gpu_id}")

if __name__ == "__main__":
    multiprocessing.set_start_method("forkserver")
    gpu_ids = range(cuda.runtime.getDeviceCount())
    with multiprocessing.Pool(len(gpu_ids)) as pool:
        pool.map(use_gpu, gpu_ids)

The function cuda.runtime.getDeviceCount() returns the number of GPUs detected. We then spawn that many number of processes, each calling the function use_gpu().

Start methods

Using multiprocessing to use multiple GPUs cannot work when forking processes. You must either spawn processes or create a fork server. This can be done respectively

if __name__ == "__main__":
    multiprocessing.set_start_method("spawn")

and

if __name__ == "__main__":
    multiprocessing.set_start_method("forkserver")

where if __name__ == "__main__": ensures only the main or parent process sets multiprocessing's behaviour. We do not want to give Python an idea for its spawned processes to spawn even more processes. There is more documentation on the different start methods.

I personally prefer to create a subclass of multiprocessing.Process and override the run() method. This has the advantage of using a class structure to represent each process. You can assign all sorts of objects to its member variables, for example, Queues and Pipes to allow communication between processes. Below is the same example but using multiprocessing.Process.

import multiprocessing
import cupy
from cupy import cuda

class Worker(multiprocessing.Process):
    def __init__(self, gpu_id):
        super().__init__()
        self.gpu_id = gpu_id
    def run(self):
        with cuda.Device(self.gpu_id):
            cupy.asarray(12345)
            print(f"Using GPU {self.gpu_id}")

if __name__ == "__main__":
    multiprocessing.set_start_method("forkserver")
    gpu_ids = range(cuda.runtime.getDeviceCount())
    workers = [Worker(gpu_id) for gpu_id in gpu_ids]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()

mpi4py

MPI (message passing interface) is a way to run multiple copies of your code simultaneously. Each copy, or MPI process, is assigned a rank to uniquely identify each MPI process. Each MPI process can be run within a node and/or on multiple nodes and can communicate with each other.

On Apocrita, when using multiple nodes, this is known as a parallel job.

Back to the say_hello() example, we provide an example below called mpi_example.py.

from mpi4py import MPI
COMM = MPI.COMM_WORLD
RANK = COMM.Get_rank()

def say_hello(name):
    print(f"Hello {name}")

if __name__ == "__main__":
    names = [
        "Barry",
        "Alice",
        "Barbara",
        "Tom",
    ]
    say_hello(names[RANK])

Running python mpi_example.py will produce

Hello Barry

What happened here is in python mpi_example.py, only one process is called with RANK = 0. Thus only Barry was greeted.

To invoke multiple MPI processes on your personal computer, use mpirun with the option -n with the number of processes you want, for example

mpirun -n 4 python mpi_example.py

This may produce the output

Hello Alice
Hello Barbara
Hello Barry
Hello Tom

In this example, there are 4 MPI processes runningmpi_example.py. Each MPI process has a different value of RANK, enabling each process to greet a different person.

MPI becomes very useful should you wish to use multiple nodes of GPUs. But it's also useful if you want to write code, in this MPI framework, where each process only uses one GPU. This can then scale up to multiple GPUs on multiple nodes where you run an MPI process for each GPU.

For MPI processes to communicate with each other, mpi4py provides some very useful functions, for example, gather() to collate results from all MPI processes.

We will cover two ways MPI processes can be distributed across nodes:

  • Create an MPI process for each GPU
  • Create an MPI process for each node

This will give some hints and ideas on how to use multiple nodes of GPUs. We will demonstrate using Apocrita and Sulis, which have nodes of GPUs connected with an InfiniBand.

MPI Process for Each GPU ("Pure" MPI approach)

Sulis and Slurm

When deploying an MPI process for each GPU on multiple nodes, we can write code which works out which GPU to use on a node given a RANK. Here's an example script called mpi_per_gpu.py

import platform

import cupy
from cupy import cuda
from mpi4py import MPI

COMM = MPI.COMM_WORLD
RANK = COMM.Get_rank()

def use_gpu(gpu_id):
    with cuda.Device(gpu_id):
        cupy.asarray(12345)
        print(f"Rank {RANK} on {platform.node()} using GPU {gpu_id}")

if __name__ == "__main__":
    gpu_id = RANK % cuda.runtime.getDeviceCount()
    use_gpu(gpu_id)

In this example code, the function use_gpu() allocates memory to a specified GPU, prints the node's name and the MPI process' RANK. This is to confirm how the MPI processes are distributed across multiple nodes.

On Sulis, Slurm allocates MPI processes using the fill-up rule. This is where the scheduler will allocate a process for each GPU on one node first before moving to the next one. Thus gpu_id = RANK % cuda.runtime.getDeviceCount() should assign gpu_id a number between 0 inclusive and cuda.runtime.getDeviceCount() exclusive on each node.

Unlike our personal computers, Slurm allocates resources to each MPI process. In order to allocate a process per GPU, let's take a look at the GPU node specifications of Sulis. Each node has

  • 2 x AMD EPYC 7742 (Rome) 2.25 GHz 64-core processors (i.e. 128 cores per node)
  • 512 GB DDR4-3200 RAM per node (i.e. 4 GB per core)
  • 3 x NVIDIA A100 40 GB RAM passively-cooled

Each node has 3 GPUs and we want an MPI process per GPU. Following from this, we could suggest the following allocations:

  • 3 processes (or tasks) per node
  • 42 CPU cores per process
  • 3.85 GB per CPU core

These allocations are to be provided in the job script, example below, which requests 2 nodes

#!/bin/bash
#SBATCH --nodes=2
#SBATCH --ntasks-per-node=3
#SBATCH --cpus-per-task=42
#SBATCH --mem-per-cpu=3850
#SBATCH --gres=gpu:ampere_a100:3
#SBATCH --partition=gpu
#SBATCH --time=00:10:00
#SBATCH --account=su008

module purge
module load GCC/10.2.0
module load CUDA/11.1.1
module load OpenMPI/4.0.5
module load CuPy/8.5.0

srun python mpi_per_gpu.py

where srun is the replacement for mpirun when using Slurm. An example output of this is

Rank 3 on gpu009.sulis.hpc using GPU 0
Rank 1 on gpu008.sulis.hpc using GPU 1
Rank 4 on gpu009.sulis.hpc using GPU 1
Rank 5 on gpu009.sulis.hpc using GPU 2
Rank 0 on gpu008.sulis.hpc using GPU 0
Rank 2 on gpu008.sulis.hpc using GPU 2

which confirms that there were 3 MPI processes per node, each MPI process using a GPU. We observe the fill-up allocation happening here because the first three ranks are on gpu008.sulis.hpc and the remaining on gpu009.sulis.hpc.

Apocrita and UGE

On Apocrita, to request multiple nodes of GPUs, you use -pe parallel. However, there isn't a one-to-one equivalent of Slurm's --ntasks-per-node on UGE. Apocrita's -pe parallel will default to an MPI process per CPU core using the fill-up allocation rule.

To get around this, we suggest using intelmpi's round-robin allocation rule. This type of allocation allocates the MPI processes evenly across nodes. We can adjust the script mpi_per_gpu.py accordingly by changing gpu_id

import platform

import cupy
from cupy import cuda
from mpi4py import MPI

COMM = MPI.COMM_WORLD
RANK = COMM.Get_rank()
SIZE = COMM.Get_size()

def use_gpu(gpu_id):
    with cuda.Device(gpu_id):
        cupy.asarray(12345)
        print(f"Rank {RANK} on {platform.node()} using GPU {gpu_id}")

if __name__ == "__main__":
    n_node = SIZE / cuda.runtime.getDeviceCount()
    gpu_id = RANK // int(n_node)
    use_gpu(gpu_id)

where we want SIZE to be the total number of GPUs across nodes.

Here's an example bash script which requests 2 nodes of 4 GPUs to run this code on.

#!/bin/bash
#$ -l h_rt=240:0:0
#$ -pe parallel 96
#$ -l gpu=4
#$ -l gpu_type=ampere
#$ -cwd
#$ -j y
#$ -q test.q

module load intelmpi/2022.2
module load cuda/12.0.0
module load python/3.10.7

source ../venv/bin/activate
mpirun -rr -n 8 python mpi_per_gpu.py

Do note the following:

  • -pe parallel 96 requests 96 CPU cores, this is equivalent to two nodes
  • -l gpu=4 requests 4 GPUs per node
  • venv is some virtual environment containing the python packages we need
  • With intelmpi, mpirun has the option -rr to use round-robin allocation option
  • mpirun has the option -n 8 to allocate 8 MPI processes

The job's resulting output may look something like this

Rank 1 on rdg12 using GPU 0
Rank 0 on rdg13 using GPU 0
Rank 2 on rdg13 using GPU 1
Rank 6 on rdg13 using GPU 3
Rank 4 on rdg13 using GPU 2
Rank 3 on rdg12 using GPU 1
Rank 5 on rdg12 using GPU 2
Rank 7 on rdg12 using GPU 3

which shows there is a unique rank for each node-GPU combination, meaning there is an MPI process for each GPU. The round-robin allocation is verified by noticing that the even ranks are on rdg13 and the odd ranks are on rdg12.

MPI Process for Each Node (Hybrid Approach)

Another option is to allocate an MPI process per node. Each MPI process uses multiprocessing to utilise all GPUs on a node. This is also known as a hybrid approach. This works because MPI runs copies of your code, including code which uses multiple processes or threads.

MPI forking processes

An MPI process cannot fork more processes. Using multiprocessing.set_start_method("fork") will upset mpi4py. Please use

if __name__ == "__main__":
    multiprocessing.set_start_method("spawn")

or

if __name__ == "__main__":
    multiprocessing.set_start_method("forkserver")

as explained previously.

Below is an example Python code, called mpi_per_node.py which uses all GPUs on a node, prints the node's name and the MPI process' RANK.

import multiprocessing
import platform

import cupy
from cupy import cuda

if __name__ == "__main__":
    multiprocessing.set_start_method("forkserver")
    from mpi4py import MPI
    COMM = MPI.COMM_WORLD
    RANK = COMM.Get_rank()

class Worker(multiprocessing.Process):
    def __init__(self, gpu_id, rank):
        super().__init__()
        self.gpu_id = gpu_id
        self.rank = rank
    def run(self):
        with cuda.Device(self.gpu_id):
            cupy.asarray(12345)
            print(f"Rank {self.rank} on {platform.node()} "
                  f"using GPU {self.gpu_id}")

if __name__ == "__main__":
    gpu_ids = range(cuda.runtime.getDeviceCount())
    workers = [Worker(gpu_id, RANK) for gpu_id in gpu_ids]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()

Import guard

The import statement from mpi4py import MPI will trigger initialisation behind the scenes for MPI to work. We have to ensure this import statement is only done by the parent process. Thus any processes created by multiprocessing should not use any mpi4py code, including importing it. To do this, I suggest an import guard, shown below, to ensure only the parent process imports mpi4py.

if __name__ == "__main__":
    from mpi4py import MPI

We placed the guard at the top of the code so that import statements remain at the top of the code.

The above example code is very similar to the example code shown in the multiprocessing example.

Continuing with Sulis and Slurm, we can allocate one entire node per MPI process:

  • 1 process (or tasks) per node
  • 128 CPU cores per process
  • 3.85 GB per CPU core

This can be put in a job script and requesting, for example, 2 nodes.

#!/bin/bash
#SBATCH --nodes=2
#SBATCH --ntasks-per-node=1
#SBATCH --cpus-per-task=128
#SBATCH --mem-per-cpu=3850
#SBATCH --gres=gpu:ampere_a100:3
#SBATCH --partition=gpu
#SBATCH --time=00:10:00
#SBATCH --account=su008

module purge
module load GCC/10.2.0
module load CUDA/11.1.1
module load OpenMPI/4.0.5
module load CuPy/8.5.0

srun python mpi_per_node.py

An example output of the job is

Rank 1 on gpu009.sulis.hpc using GPU 1
Rank 1 on gpu009.sulis.hpc using GPU 2
Rank 1 on gpu009.sulis.hpc using GPU 0
Rank 0 on gpu008.sulis.hpc using GPU 2
Rank 0 on gpu008.sulis.hpc using GPU 0
Rank 0 on gpu008.sulis.hpc using GPU 1

which verifies that there is a MPI process per node, each using all GPUs on the node.

Similarly, on Apocrita, the bash script may look something like this

#!/bin/bash
#$ -l h_rt=240:0:0
#$ -pe parallel 96
#$ -l gpu=4
#$ -l gpu_type=ampere
#$ -cwd
#$ -j y
#$ -q test.q

module load intelmpi/2022.2
module load cuda/12.0.0
module load python/3.10.7

source ../venv/bin/activate
mpirun -rr -n 2 python mpi_per_node.py

The script mpi_per_node.py works both on Apocrita and Sulis without change, so it is independent of the allocation rule.

Discussion

We've covered four methods to use multiple nodes of GPUs:

  • Array jobs
  • multiprocessing
  • MPI (using mpi4py with an MPI process per GPU)
  • Hybrid MPI+multiprocessing (using mpi4py with an MPI process per node)

I believe that there is a use case for these different methods, depending on whether your problem is more suited to be split per node or GPU. I also think it's sensible to choose a method based on your preferred style of programming.

For example, when writing code suitable for array jobs, you want your code to run given a unique parameter for each job. On Apocrita, that unique parameter is your $SGE_TASK_ID. Similarly, for MPI, each MPI process has a unique RANK and runs code given that RANK. This could be described as encapsulating your one process in one script. As we saw in the MPI example, the MPI approach seems simpler. Given a RANK, work out what GPU to use and use it. It is also easily scalable to any number of GPUs and nodes you give it.

On the other hand with multiprocessing and the hybrid approach, when writing code, you are responsible for handling each process on a node in your code. This can be unwieldy but very useful if you want more fine-grained control over your processes. For example, if you want to run additional CPU processes while the GPU processes are running.

The hybrid approach is also appropriate when you already have existing multiprocessing code and want to extend it to use multiple nodes. As we saw in the example with multiprocessing and hybrid code, they are very similar.

With multiprocessing and the hybrid approach, you also have the opportunity to work in a shared memory paradigm. However, with Python's GIL, I personally find this is quite rare and can be difficult to work with.

As we saw in the MPI example, do note that schedulers may allocate processes differently which may affect your code. The hybrid approach works well here as it is independent of the allocation rule.

There are many more ways to use multiple nodes of GPUs we haven't touched on, for example, Dask and PyTorch's DistributedDataParallel. But I think the tools shown in this blog should be a good starting point to assess what tools would be suitable for you to start using multiple nodes of GPUs.