MPI Apps
Note
Parsl’s support for MPI Apps described below is pending release.
Please use the mpi_experimental_3
branch to use the functionality
described in this document. To install directly from github:
>> pip install git+https://github.com/Parsl/parsl.git@mpi_experimental_3
MPI applications run multiple copies of a program that complete a single task by
coordinating using messages passed within or across nodes.
Starting MPI application requires invoking a “launcher” code (e.g., mpiexec
) from one node
with options that define how the copies of a program should be distributed to others.
Parsl simplifies this by composing the “launcher” command from the resources specified at the time
each app is invoked.
The broad strokes of a complete solution involves the following components:
- Configuring the
HighThroughputExecutor
with: enable_mpi_mode=True
- Configuring the
Specify an MPI Launcher from one of the supported launchers (“aprun”, “srun”, “mpiexec”) for the
HighThroughputExecutor
with:mpi_launcher="srun"
Specify the provider that matches your cluster, (eg. user
SlurmProvider
for Slurm clusters)Set the non-mpi launcher to
SingleNodeLauncher
Specify resources required by the application via
resource_specification
as shown below:
# Define HighThroughputExecutor(enable_mpi_mode=True, mpi_launcher="mpiexec", ...)
@bash_app
def lammps_mpi_application(infile: File, parsl_resource_specification: Dict):
# PARSL_MPI_PREFIX will resolve to `mpiexec -n 4 -ppn 2 -hosts NODE001,NODE002`
return f"$PARSL_MPI_PREFIX lmp_mpi -in {infile.filepath}"
# Resources in terms of nodes and how ranks are to be distributed are set on a per app
# basis via the resource_spec dictionary.
resource_spec = {
"num_nodes" = 2,
"ranks_per_node" = 2,
"num_ranks" = 4,
}
future = lammps_mpi_application(File('in.file'), resource_specification=resource_spec)
HTEX and MPI Tasks
The HighThroughputExecutor
(HTEX) is the
default executor available through Parsl.
Parsl Apps which invoke MPI code require MPI specific configuration such that:
All workers are started on the lead-node (mom-node in case of Crays)
Resource requirements of Apps are propagated to workers who provision the required number of nodes from within the batch job.
Configuring the Provider
Parsl must be configured to deploy workers on exactly one node per block. This part is
simple. Instead of defining a launcher which will place an executor on each node in the
block, simply use the SingleNodeLauncher
.
The MPI Launcher that the application will use is to be specified via HighThroughputExecutor(mpi_launcher="LAUNCHER")
It is also necessary to specify the desired number of blocks for the executor.
Parsl cannot determine the number of blocks needed to run a set of MPI Tasks,
so they must bet set explicitly (see Issue #1647).
The easiest route is to set the max_blocks
and min_blocks
of the provider
to the desired number of blocks.
Configuring the Executor
Here are the steps for configuring the executor:
Set
HighThroughputExecutor(enable_mpi_mode=True)
Set
HighThroughputExecutor(mpi_launcher="LAUNCHER")
to one from (“srun”, “aprun”, “mpiexec”)Set the
max_workers
to the number of MPI Apps you expect to run per scheduler job (block).Set
cores_per_worker=1e-6
to prevent HTEx from reducing the number of workers if you request more workers than cores.
Example Configuration
Here’s an example configuration which runs MPI tasks on ALCF’s Polaris Supercomputer
import parsl
from typing import Dict
from parsl.config import Config
# PBSPro is the right provider for Polaris:
from parsl.providers import PBSProProvider
# The high throughput executor is for scaling to HPC systems:
from parsl.executors import HighThroughputExecutor
# address_by_interface is needed for the HighThroughputExecutor:
from parsl.addresses import address_by_interface
# For checkpointing:
from parsl.utils import get_all_checkpoints
# Adjust your user-specific options here:
# run_dir="/lus/grand/projects/yourproject/yourrundir/"
user_opts = {
"worker_init": "module load conda; conda activate parsl_mpi_py310",
"scheduler_options":"#PBS -l filesystems=home:eagle:grand\n#PBS -l place=scatter" ,
"account": SET_YOUR_ALCF_ALLOCATION_HERE,
"queue": "debug-scaling",
"walltime": "1:00:00",
"nodes_per_block": 8,
"available_accelerators": 4, # Each Polaris node has 4 GPUs, setting this ensures one worker per GPU
"cores_per_worker": 8, # this will set the number of cpu hardware threads per worker.
}
config = Config(
executors=[
HighThroughputExecutor(
label="htex",
enable_mpi_mode=True,
mpi_launcher="mpiexec",
cores_per_worker=user_opts["cores_per_worker"],
address=address_by_interface("bond0"),
provider=PBSProProvider(
account=user_opts["account"],
queue=user_opts["queue"],
# PBS directives (header lines): for array jobs pass '-J' option
scheduler_options=user_opts["scheduler_options"],
# Command to be run before starting a worker, such as:
worker_init=user_opts["worker_init"],
# number of compute nodes allocated for each block
nodes_per_block=user_opts["nodes_per_block"],
init_blocks=1,
min_blocks=0,
max_blocks=1, # Can increase more to have more parallel jobs
walltime=user_opts["walltime"]
),
),
],
Writing MPI-Compatible Apps
In MPI mode, the HighThroughputExecutor
can execute both Python or Bash Apps which invokes the MPI application.
However, it is important to not that Python Apps that directly use mpi4py
is not supported.
For multi-node MPI applications, especially when running multiple applications within a single batch job,
it is important to specify the resource requirements for the app so that the Parsl worker can provision
the appropriate resources before the application starts. For eg, your Parsl script might contain a molecular
dynamics application that requires 8 ranks over 1 node for certain inputs and 32 ranks over 4 nodes for some
depending on the size of the molecules being simulated. By specifying resources via resource_specification
,
parsl workers will provision the requested resources and then compose MPI launch command prefixes
(Eg: mpiexec -n <ranks> -ppn <ranks_per_node> -hosts <node1..nodeN>
). These launch command prefixes are
shared with the app via environment variables.
@bash_app
def echo_hello(n: int, stderr='std.err', stdout='std.out', parsl_resource_specification: Dict):
return f'$PARSL_MPI_PREFIX hostname'
# The following app will echo the hostname from several MPI ranks
# Alternatively, you could also use the resource_specification to compose a launch
# command using env vars set by Parsl from the resource_specification like this:
@bash_app
def echo_hostname(n: int, stderr='std.err', stdout='std.out', parsl_resource_specification: Dict):
total_ranks = os.environ("")
return f'aprun -N $PARSL_RANKS_PER_NODE -n {total_ranks} /bin/hostname'
All valid key-value pairs set in the resource_specification are exported to the application via env vars,
for eg. parsl_resource_specification = {'RANKS_PER_NODE': 4} `` will set the env var ``PARSL_RANKS_PER_NODE
However, the following options are required for MPI applications :
resource_specification = {
'num_nodes': <int>, # Number of nodes required for the application instance
'ranks_per_node': <int>, # Number of ranks / application elements to be launched per node
'num_ranks': <int>, # Number of ranks in total
}
# The above are made available in the worker env vars:
# echo $PARSL_NUM_NODES, $PARSL_RANKS_PER_NODE, $PARSL_NUM_RANKS
When the above are supplied, the following launch command prefixes are set:
PARSL_MPIEXEC_PREFIX: mpiexec launch command which works for a large number of batch systems especially PBS systems
PARSL_SRUN_PREFIX: srun launch command for Slurm based clusters
PARSL_APRUN_PREFIX: aprun launch command prefix for some Cray machines
PARSL_MPI_PREFIX: Parsl sets the MPI prefix to match the mpi_launcher specified to `HighThroughputExecutor`
PARSL_MPI_NODELIST: List of assigned nodes separated by commas (Eg, NODE1,NODE2)
PARSL_WORKER_POOL_ID: Alphanumeric string identifier for the worker pool
PARSL_WORKER_BLOCK_ID: Batch job ID that the worker belongs to
Example Application: CosmicTagger
TODO: Blurb about what CosmicTagger does CosmicTagger implements models and training utilities to train convolutional networks to separate cosmic pixels, background pixels, and neutrino pixels in a neutrinos dataset. There are several variations. A detailed description of the code can be found in:
Cosmic Background Removal with Deep Neural Networks in SBND
Cosmic Background Removal with Deep Neural Networks in SBND This network is implemented in both PyTorch and TensorFlow. To select between the networks, you can use the –framework parameter. It accepts either tensorflow or torch. The model is available in a development version with sparse convolutions in the torch framework.
This example is broken down into three components. First, configure the Executor for Polaris at
ALCF. The configuration will use the PBSProProvider
to connect to the batch scheduler.
With the goal of running MPI applications, we set the
import parsl
from typing import Dict
from parsl.config import Config
# PBSPro is the right provider for Polaris:
from parsl.providers import PBSProProvider
# The high throughput executor is for scaling to HPC systems:
from parsl.executors import HighThroughputExecutor
# address_by_interface is needed for the HighThroughputExecutor:
from parsl.addresses import address_by_interface
user_opts = {
# Make sure to setup a conda environment before using this config
"worker_init": "module load conda; conda activate parsl_mpi_py310",
"scheduler_options":"#PBS -l filesystems=home:eagle:grand\n#PBS -l place=scatter" ,
"account": <SET_YOUR_ALLOCATION>,
"queue": "debug-scaling",
"walltime": "1:00:00",
"nodes_per_block": 8,
"available_accelerators": 4, # Each Polaris node has 4 GPUs, setting this ensures one worker per GPU
"cores_per_worker": 8, # this will set the number of cpu hardware threads per worker.
}
config = Config(
executors=[
HighThroughputExecutor(
label="htex",
enable_mpi_mode=True,
mpi_launcher="mpiexec",
cores_per_worker=user_opts["cores_per_worker"],
address=address_by_interface("bond0"),
provider=PBSProProvider(
account=user_opts["account"],
queue=user_opts["queue"],
# PBS directives (header lines): for array jobs pass '-J' option
scheduler_options=user_opts["scheduler_options"],
# Command to be run before starting a worker, such as:
worker_init=user_opts["worker_init"],
# number of compute nodes allocated for each block
nodes_per_block=user_opts["nodes_per_block"],
init_blocks=1,
min_blocks=0,
max_blocks=1, # Can increase more to have more parallel jobs
walltime=user_opts["walltime"]
),
),
],
)
Next we define the CosmicTagger MPI application. TODO: Ask Khalid for help.
@parsl.bash_app
def cosmic_tagger(workdir: str,
datatype: str = "float32",
batchsize: int = 8,
framework: str = "torch",
iterations: int = 500,
trial: int = 2,
stdout=parsl.AUTO_LOGNAME,
stderr=parsl.AUTO_LOGNAME,
parsl_resource_specification:Dict={}):
NRANKS = parsl_resource_specification['num_ranks']
return f"""
module purge
module use /soft/modulefiles/
module load conda/2023-10-04
conda activate
echo "PARSL_MPI_PREFIX : $PARSL_MPI_PREFIX"
$PARSL_MPI_PREFIX --cpu-bind numa \
python {workdir}/bin/exec.py --config-name a21 \
run.id=run_plrs_ParslDemo_g${NRANKS}_{datatype}_b{batchsize}_{framework}_i{iterations}_T{trial} \
run.compute_mode=GPU \
run.distributed=True \
framework={framework} \
run.minibatch_size={batchsize} \
run.precision={datatype} \
mode.optimizer.loss_balance_scheme=light \
run.iterations={iterations}
"""
In this example, we run a simple test that does an exploration over the batchsize
parameter
while launching the application over 2-4 nodes.
def run_cosmic_tagger():
futures = {}
for num_nodes in [2, 4]:
for batchsize in [2, 4, 8]:
parsl_res_spec = {"num_nodes": num_nodes,
"num_tasks": num_nodes * 4,
"ranks_per_node": 4}
future = cosmic_tagger(workdir="/home/yadunand/CosmicTagger",
datatype="float32",
batchsize=str(batchsize),
parsl_resource_specification=parsl_res_spec)
print(f"Stdout : {future.stdout}")
print(f"Stderr : {future.stderr}")
futures[(num_nodes, batchsize)] = future
for key in futures:
print(f"Got result for {key}: {futures[key].result()}")
run_cosmic_tagger()
Limitations
Support for MPI tasks in HTEX is limited. It is designed for running many multi-node MPI applications within a single batch job.
MPI tasks may not span across nodes from more than one block.
Parsl does not correctly determine the number of execution slots per block (Issue #1647)
The executor uses a Python process per task, which can use a lot of memory (Issue #2264)