Configuration

Parsl workflows are developed independently of their execution environment. Parsl programs and their apps can run in many different execution environments, and many of these environments offer several ways to run them. This makes it fairly complex to decide how to configure Parsl for a particular set of choices, although we think the configuration itself is reasonably simple.

Parsl offers an extensible configuration model that configures both the execution environment and communication within that environment. Parsl is configured using a Config object. For more information, see the Config class documentation. The following example shows how a configuration can be specified and loaded.

import parsl
from parsl.config import Config
from parsl.executors.threads import ThreadPoolExecutor

config = Config(
    executors=[ThreadPoolExecutor()],
    lazy_errors=True
)
parsl.load(config)

Note

Please note that all configuration examples below require customization for your account, allocation, Python environment, etc.

How to Configure

The configuration provided to Parsl tells it what resources to use to run the Parsl program and its apps, and how to use them. It is therefore important to carefully evaluate the Parsl program, its apps, and the planned compute resources to find a good configuration match. The key aspects are: 1) where the Parsl apps will execute; 2) how many nodes will be used to execute the apps, and how long the apps will run; 3) whether the scheduler should allocate multiple nodes at one time; and 4) where the main Parsl program will run and how it will communicate with the apps.

Stepping through the following questions should help you formulate a suitable configuration. Examples for some specific systems follow.

1. Where would you like the apps in the Parsl program to run?
Target Executor Provider
Laptop/Workstation LocalProvider
Amazon Web Services AWSProvider
Google Cloud GoogleCloudProvider
Slurm based cluster or supercomputer SlurmProvider
Torque/PBS based cluster or supercomputer TorqueProvider
Cobalt based cluster or supercomputer CobaltProvider
GridEngine based cluster or grid GridEngineProvider
Condor based cluster or grid CondorProvider
Kubernetes cluster KubernetesProvider
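The table above works as a simple lookup. The following minimal sketch transcribes it into a dictionary. The shorthand keys (such as "slurm" or "pbs") are hypothetical labels chosen for this example and are not Parsl identifiers. The function only returns a provider class name as a string. It does not construct a provider.

```python
# Lookup transcribed from the "where will the apps run?" table.
# Keys are hypothetical shorthand labels; values are Parsl provider class names.
PROVIDER_FOR_TARGET = {
    "laptop": "LocalProvider",
    "workstation": "LocalProvider",
    "aws": "AWSProvider",
    "google_cloud": "GoogleCloudProvider",
    "slurm": "SlurmProvider",
    "torque": "TorqueProvider",
    "pbs": "TorqueProvider",
    "cobalt": "CobaltProvider",
    "gridengine": "GridEngineProvider",
    "condor": "CondorProvider",
    "kubernetes": "KubernetesProvider",
}


def suggest_provider(target: str) -> str:
    """Return the provider class name suggested for a target system."""
    key = target.strip().lower()
    try:
        return PROVIDER_FOR_TARGET[key]
    except KeyError:
        known = ", ".join(sorted(PROVIDER_FOR_TARGET))
        raise KeyError(f"unknown target {target!r}; expected one of: {known}") from None


print(suggest_provider("Slurm"))  # SlurmProvider
```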
2. How many nodes will you use to run them? What task durations give good performance on different executors?
Executor Number of Nodes [*] Task duration for good performance
ThreadPoolExecutor 1 (Only local) Any
LowLatencyExecutor <=10 10ms+
IPyParallelExecutor <=128 50ms+
HighThroughputExecutor <=2000 Task duration(s)/#nodes >= 0.01; longer tasks are needed at higher scale

ExtremeScaleExecutor >1000, <=8000 [†] >minutes
WorkQueueExecutor <=20000 [‡] 10s+
[*] We assume that each node has 32 workers. If fewer workers are launched per node, a higher number of nodes could be supported.
[†] 8000 nodes with 32 workers each, totalling 256000 workers, is the maximum scale at which we’ve tested the ExtremeScaleExecutor.
[‡] The largest scale tested for the WorkQueueExecutor is 10000 GPU cores and 20000 CPU cores.
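The rules of thumb in this table can be encoded as a filter. This is a minimal sketch under a few assumptions: task durations are given in seconds, "minutes" for the ExtremeScaleExecutor is read as at least 60 seconds, and each node runs 32 workers, as in footnote [*]. The helper names are hypothetical and are not part of Parsl.

```python
from typing import List

# Assumption from footnote [*]: 32 workers per node.
WORKERS_PER_NODE = 32


def total_workers(nodes: int, workers_per_node: int = WORKERS_PER_NODE) -> int:
    """Number of workers across all nodes."""
    return nodes * workers_per_node


def suitable_executors(nodes: int, task_duration_s: float) -> List[str]:
    """Return the executors whose table row covers this node count and task duration."""
    candidates = []
    if nodes == 1:
        candidates.append("ThreadPoolExecutor")  # local only, any duration
    if nodes <= 10 and task_duration_s >= 0.01:
        candidates.append("LowLatencyExecutor")
    if nodes <= 128 and task_duration_s >= 0.05:
        candidates.append("IPyParallelExecutor")  # deprecated; see the warning below
    if nodes <= 2000 and task_duration_s / nodes >= 0.01:
        candidates.append("HighThroughputExecutor")
    if 1000 < nodes <= 8000 and task_duration_s >= 60:
        candidates.append("ExtremeScaleExecutor")
    if nodes <= 20000 and task_duration_s >= 10:
        candidates.append("WorkQueueExecutor")
    return candidates


print(suitable_executors(100, 2.0))  # ['IPyParallelExecutor', 'HighThroughputExecutor']
```

With 100 nodes and 2-second tasks, the HighThroughputExecutor qualifies because 2.0 / 100 = 0.02 is at least 0.01. The same tasks on 500 nodes would fall below that threshold.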

Warning

IPyParallelExecutor will be deprecated as of Parsl v0.8.0, with HighThroughputExecutor as the recommended replacement.

3. If you are running on a cluster or supercomputer, will you request multiple nodes per batch (scheduler) job? (Here we use the term block to mean a single batch job.)

nodes_per_block = 1
Provider Executor choice Suitable Launchers
Systems that don’t use Aprun Any
Aprun based systems Any
nodes_per_block > 1
Provider Executor choice Suitable Launchers
TorqueProvider Any
CobaltProvider Any
SlurmProvider Any

Note

If you are on a Cray system, you most likely need the AprunLauncher to launch workers, unless you are on a native Slurm system such as Cori (NERSC).
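Because a block is one batch job, the block settings bound how many nodes and workers a configuration can hold at once. The following is a rough sketch. It assumes that the HighThroughputExecutor starts the smaller of max_workers and (cores per node / cores_per_worker) workers on each node, and it ignores the memory limits that the real executor may also apply. Both helpers are hypothetical. The 24 cores per node in the usage line is an assumed figure.

```python
import math


def peak_nodes(nodes_per_block: int, max_blocks: int) -> int:
    """Most nodes held at once: each block is one batch job of nodes_per_block nodes."""
    return nodes_per_block * max_blocks


def workers_per_node(cores_on_node: int, cores_per_worker: float = 1.0,
                     max_workers: float = math.inf) -> int:
    """Approximate workers per node (memory limits ignored)."""
    return int(min(max_workers, math.floor(cores_on_node / cores_per_worker)))


# Settings like the Comet example below: 2 nodes per block, 1 block, max_workers=2,
# on hypothetical 24-core nodes.
nodes = peak_nodes(nodes_per_block=2, max_blocks=1)
print(nodes * workers_per_node(24, max_workers=2))  # 4
```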

4. Where will you run the main Parsl program, given that you already have determined where the apps will run? (This is needed to determine how to communicate between the Parsl program and the apps.)

Parsl program location App execution target Suitable channel
Laptop/Workstation Laptop/Workstation LocalChannel
Laptop/Workstation Cloud Resources None
Laptop/Workstation Clusters with no 2FA SSHChannel
Laptop/Workstation Clusters with 2FA SSHInteractiveLoginChannel
Login node Cluster/Supercomputer LocalChannel
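The channel table can be expressed as a lookup keyed on (program location, app target). In the sketch below, the short labels are hypothetical stand-ins for the table rows. The function returns a channel class name as a string, or None where the table lists no channel.

```python
from typing import Dict, Optional, Tuple

# (Parsl program location, app execution target) -> suggested channel class name.
CHANNEL_FOR_LOCATION: Dict[Tuple[str, str], Optional[str]] = {
    ("laptop", "laptop"): "LocalChannel",
    ("laptop", "cloud"): None,  # the table lists no channel for cloud resources
    ("laptop", "cluster"): "SSHChannel",  # cluster without 2FA
    ("laptop", "cluster_2fa"): "SSHInteractiveLoginChannel",
    ("login_node", "cluster"): "LocalChannel",
}


def suggest_channel(program_location: str, app_target: str) -> Optional[str]:
    """Return the suggested channel class name, or None when no channel is listed."""
    return CHANNEL_FOR_LOCATION[(program_location, app_target)]
```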

Comet (SDSC)

The following snippet shows an example configuration for executing remotely on San Diego Supercomputer Center’s Comet supercomputer. The example is designed to be executed on the login nodes, using the SlurmProvider to interface with the Slurm scheduler used by Comet and the SrunLauncher to launch workers.

Warning

This config has NOT been tested with Parsl v0.9.0

from parsl.config import Config
from parsl.launchers import SrunLauncher
from parsl.providers import SlurmProvider
from parsl.executors import HighThroughputExecutor
from parsl.addresses import address_by_query


config = Config(
    executors=[
        HighThroughputExecutor(
            label='Comet_HTEX_multinode',
            address=address_by_query(),
            worker_logdir_root='YOUR_LOGDIR_ON_COMET',
            max_workers=2,
            provider=SlurmProvider(
                'debug',
                launcher=SrunLauncher(),
                # string to prepend to #SBATCH blocks in the submit
                # script to the scheduler
                scheduler_options='',
                # Command to be run before starting a worker, such as:
                # 'module load Anaconda; source activate parsl_env'.
                worker_init='',
                walltime='00:10:00',
                init_blocks=1,
                max_blocks=1,
                nodes_per_block=2,
            ),
        )
    ]
)
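The examples in this section leave worker_init as an empty string for you to fill in. A hypothetical helper like the following could assemble it from module names and a Conda environment name. It assumes that the worker nodes provide environment modules and a conda installation that still accepts `source activate`, matching the comments in the examples. Newer conda releases use `conda activate`.

```python
import shlex
from typing import Iterable, Optional


def make_worker_init(modules: Iterable[str] = (), conda_env: Optional[str] = None,
                     extra_commands: Iterable[str] = ()) -> str:
    """Join setup commands into one shell string suitable for worker_init."""
    commands = [f"module load {shlex.quote(m)}" for m in modules]
    if conda_env is not None:
        # Older conda syntax, matching the comments in the examples.
        commands.append(f"source activate {shlex.quote(conda_env)}")
    commands.extend(extra_commands)  # appended verbatim, not quoted
    return "; ".join(commands)


print(make_worker_init(["Anaconda"], "parsl_env"))
# module load Anaconda; source activate parsl_env
```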

Cori (NERSC)

The following snippet shows an example configuration for accessing NERSC’s Cori supercomputer. This example uses the HighThroughputExecutor and connects to Cori’s Slurm scheduler. Each block requests 2 nodes. The example also shows where to add scheduler options, for example a constraint that requests a particular node type such as Haswell. It also shows where to configure a specific Python environment on the worker nodes, for example using Anaconda.

from parsl.config import Config
from parsl.providers import SlurmProvider
from parsl.launchers import SrunLauncher
from parsl.executors import HighThroughputExecutor
from parsl.addresses import address_by_interface


config = Config(
    executors=[
        HighThroughputExecutor(
            label='Cori_HTEX_multinode',
            # This is the network interface on the login node to
            # which compute nodes can communicate
            address=address_by_interface('bond0.144'),
            cores_per_worker=2,
            provider=SlurmProvider(
                'regular',  # Partition / QOS
                nodes_per_block=2,
                init_blocks=1,
                # string to prepend to #SBATCH blocks in the submit
                # script to the scheduler eg: '#SBATCH --constraint=knl,quad,cache'
                scheduler_options='',
                # Command to be run before starting a worker, such as:
                # 'module load Anaconda; source activate parsl_env'.
                worker_init='',
                # We request all hyperthreads on a node.
                launcher=SrunLauncher(overrides='-c 272'),
                walltime='00:10:00',
                # Slurm scheduler on Cori can be slow at times,
                # increase the command timeouts
                cmd_timeout=120,
            ),
        )
    ]
)
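As the comments in these examples suggest, scheduler_options is inserted as raw text into the submit script, so each directive needs its own line. The following minimal sketch uses a hypothetical helper that renders options under a given prefix, such as `#SBATCH` for Slurm or `#COBALT` for Cobalt. The Haswell constraint and allocation placeholder are illustrative assumptions. Check your site's documentation for the exact names.

```python
from typing import Iterable


def scheduler_directives(prefix: str, options: Iterable[str]) -> str:
    """Render scheduler options as one directive line each, e.g. '#SBATCH ...'."""
    return "\n".join(f"{prefix} {option}" for option in options)


opts = scheduler_directives("#SBATCH", ["--constraint=haswell", "-A YOUR_ALLOCATION"])
print(opts)
# #SBATCH --constraint=haswell
# #SBATCH -A YOUR_ALLOCATION
```

You would then pass the resulting string as `scheduler_options=opts` to the provider.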

Stampede2 (TACC)

The following snippet shows an example configuration for accessing TACC’s Stampede2 supercomputer. This example uses the HighThroughputExecutor, connects to Stampede2’s Slurm scheduler, and configures Globus-based file staging with GlobusStaging.

from parsl.config import Config
from parsl.providers import SlurmProvider
from parsl.launchers import SrunLauncher
from parsl.executors import HighThroughputExecutor
from parsl.addresses import address_by_hostname
from parsl.data_provider.globus import GlobusStaging


config = Config(
    executors=[
        HighThroughputExecutor(
            label='Stampede2_HTEX',
            address=address_by_hostname(),
            max_workers=2,
            provider=SlurmProvider(
                nodes_per_block=2,
                init_blocks=1,
                min_blocks=1,
                max_blocks=1,
                partition='YOUR_PARTITION',
                # string to prepend to #SBATCH blocks in the submit
                # script to the scheduler eg: '#SBATCH --constraint=knl,quad,cache'
                scheduler_options='',
                # Command to be run before starting a worker, such as:
                # 'module load Anaconda; source activate parsl_env'.
                worker_init='',
                launcher=SrunLauncher(),
                walltime='00:30:00'
            ),
            storage_access=[GlobusStaging(
                endpoint_uuid='ceea5ca0-89a9-11e7-a97f-22000a92523b',
                endpoint_path='/',
                local_path='/'
            )]
        )

    ],
)

Frontera (TACC)

Frontera was deployed in June 2019, when it ranked as the 5th most powerful supercomputer in the world. Frontera replaces the NSF Blue Waters system at NCSA and is the first deployment in the National Science Foundation’s petascale computing program. The configuration below assumes that the user is running on a login node. It uses the SlurmProvider to interface with the scheduler and the SrunLauncher to launch workers.

from parsl.config import Config
from parsl.channels import LocalChannel
from parsl.providers import SlurmProvider
from parsl.executors import HighThroughputExecutor
from parsl.launchers import SrunLauncher
from parsl.addresses import address_by_hostname


""" This config assumes that it is used to launch parsl tasks from the login nodes
of Frontera at TACC. Each job submitted to the scheduler will request 2 nodes for 10 minutes.
"""
config = Config(
    executors=[
        HighThroughputExecutor(
            label="frontera_htex",
            address=address_by_hostname(),
            max_workers=1,          # Set number of workers per node
            provider=SlurmProvider(
                cmd_timeout=60,     # Add extra time for slow scheduler responses
                channel=LocalChannel(),
                nodes_per_block=2,
                init_blocks=1,
                min_blocks=1,
                max_blocks=1,
                partition='normal',                                 # Replace with partition name
                scheduler_options='#SBATCH -A <YOUR_ALLOCATION>',   # Enter scheduler_options if needed

                # Command to be run before starting a worker, such as:
                # 'module load Anaconda; source activate parsl_env'.
                worker_init='',

                # Ideally we set the walltime to the longest supported walltime.
                walltime='00:10:00',
                launcher=SrunLauncher(),
            ),
        )
    ],
)
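Every example here sets walltime as an "HH:MM:SS" string. The Frontera configuration requests 2 nodes for 10 minutes per job. The hypothetical helpers below convert between that format and seconds, and estimate the node-hours that one block could consume if it ran for its full walltime. How a site actually charges an allocation varies, so treat this figure as an upper bound for one block only.

```python
def parse_walltime(walltime: str) -> int:
    """Convert an 'HH:MM:SS' walltime string to seconds."""
    hours, minutes, seconds = (int(part) for part in walltime.split(":"))
    return hours * 3600 + minutes * 60 + seconds


def format_walltime(total_seconds: int) -> str:
    """Convert seconds to the 'HH:MM:SS' form used by the examples."""
    if total_seconds < 0:
        raise ValueError("walltime cannot be negative")
    hours, remainder = divmod(int(total_seconds), 3600)
    minutes, seconds = divmod(remainder, 60)
    return f"{hours:02d}:{minutes:02d}:{seconds:02d}"


def node_hours(nodes_per_block: int, walltime: str) -> float:
    """Node-hours one block can consume if it runs for its full walltime."""
    return nodes_per_block * parse_walltime(walltime) / 3600


print(format_walltime(600))  # 00:10:00
```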

Theta (ALCF)

The following snippet shows an example configuration for executing on Argonne Leadership Computing Facility’s Theta supercomputer. This example uses the HighThroughputExecutor and connects to Theta’s Cobalt scheduler using the CobaltProvider. This configuration assumes that the script is being executed on the login nodes of Theta.

from parsl.config import Config
from parsl.providers import CobaltProvider
from parsl.launchers import AprunLauncher
from parsl.executors import HighThroughputExecutor
from parsl.addresses import address_by_hostname


config = Config(
    executors=[
        HighThroughputExecutor(
            label='theta_local_htex_multinode',
            max_workers=4,
            address=address_by_hostname(),
            provider=CobaltProvider(
                queue='YOUR_QUEUE',
                account='YOUR_ACCOUNT',
                launcher=AprunLauncher(overrides="-d 64"),
                walltime='00:30:00',
                nodes_per_block=2,
                init_blocks=1,
                min_blocks=1,
                max_blocks=1,
                # string to prepend to #COBALT blocks in the submit
                # script to the scheduler eg: '#COBALT -t 50'
                scheduler_options='',
                # Command to be run before starting a worker, such as:
                # 'module load Anaconda; source activate parsl_env'.
                worker_init='',
                cmd_timeout=120,
            ),
        )
    ],
)

Cooley (ALCF)

The following snippet shows an example configuration for executing on Argonne Leadership Computing Facility’s Cooley analysis and visualization system. The example uses the HighThroughputExecutor and connects to Cooley’s Cobalt scheduler using the CobaltProvider. This configuration assumes that the script is being executed on the login nodes of Cooley.

from parsl.config import Config
from parsl.executors import HighThroughputExecutor
from parsl.addresses import address_by_hostname
from parsl.launchers import MpiRunLauncher
from parsl.providers import CobaltProvider


config = Config(
    executors=[
        HighThroughputExecutor(
            label="cooley_htex",
            worker_debug=False,
            cores_per_worker=1,
            address=address_by_hostname(),
            provider=CobaltProvider(
                queue='debug',
                account='YOUR_ACCOUNT',  # project name to submit the job
                launcher=MpiRunLauncher(),
                scheduler_options='',  # string to prepend to #COBALT blocks in the submit script to the scheduler
                worker_init='',  # command to run before starting a worker, such as 'source activate env'
                init_blocks=1,
                max_blocks=1,
                min_blocks=1,
                nodes_per_block=4,
                cmd_timeout=60,
                walltime='00:10:00',
            ),
        )
    ],

)

Blue Waters (Cray)

The following snippet shows an example configuration for executing remotely on Blue Waters, a flagship machine at the National Center for Supercomputing Applications. The configuration assumes the user is running on a login node and uses the TorqueProvider to interface with the scheduler, and uses the AprunLauncher to launch workers.

from parsl.config import Config
from parsl.executors import HighThroughputExecutor
from parsl.addresses import address_by_hostname
from parsl.launchers import AprunLauncher
from parsl.providers import TorqueProvider


config = Config(
    executors=[
        HighThroughputExecutor(
            label="bw_htex",
            cores_per_worker=1,
            worker_debug=False,
            address=address_by_hostname(),
            provider=TorqueProvider(
                queue='normal',
                launcher=AprunLauncher(overrides="-b -- bwpy-environ --"),
                scheduler_options='',  # string to prepend to #PBS blocks in the submit script to the scheduler
                worker_init='',  # command to run before starting a worker, such as 'source activate env'
                init_blocks=1,
                max_blocks=1,
                min_blocks=1,
                nodes_per_block=2,
                walltime='00:10:00'
            ),
        )

    ],

)

Summit (ORNL)

The following snippet shows an example configuration for executing from the login node on Summit, the leadership class supercomputer hosted at the Oak Ridge National Laboratory. The example uses the LSFProvider to provision compute nodes from the LSF cluster scheduler and the JsrunLauncher to launch workers across the compute nodes.

from parsl.config import Config
from parsl.executors import HighThroughputExecutor

from parsl.launchers import JsrunLauncher
from parsl.providers import LSFProvider

from parsl.addresses import address_by_interface

config = Config(
    executors=[
        HighThroughputExecutor(
            label='Summit_HTEX',
            # On Summit ensure that the working dir is writeable from the compute nodes,
            # for eg. paths below /gpfs/alpine/world-shared/
            working_dir='YOUR_WORKING_DIR_ON_SHARED_FS',
            address=address_by_interface('ib0'),  # This assumes Parsl is running on login node
            worker_port_range=(50000, 55000),
            provider=LSFProvider(
                launcher=JsrunLauncher(),
                walltime="00:10:00",
                nodes_per_block=2,
                init_blocks=1,
                max_blocks=1,
                worker_init='',  # Input your worker environment initialization commands
                project='YOUR_PROJECT_ALLOCATION',
                cmd_timeout=60
            ),
        )

    ],
)

CC-IN2P3

The snippet below shows an example configuration for executing from a login node on IN2P3’s Computing Centre. The configuration uses a LocalChannel to run on a login node, primarily to avoid GSISSH, which Parsl does not yet support. This system uses Grid Engine, which Parsl interfaces with through the GridEngineProvider.

from parsl.config import Config
from parsl.channels import LocalChannel
from parsl.providers import GridEngineProvider
from parsl.executors import HighThroughputExecutor
from parsl.addresses import address_by_query

config = Config(
    executors=[
        HighThroughputExecutor(
            label='cc_in2p3_htex',
            address=address_by_query(),
            max_workers=2,
            provider=GridEngineProvider(
                channel=LocalChannel(),
                nodes_per_block=1,
                init_blocks=2,
                max_blocks=2,
                walltime="00:20:00",
                scheduler_options='',     # Input your scheduler_options if needed
                worker_init='',     # Input your worker_init if needed
            ),
        )
    ],
)

Midway (RCC, UChicago)

The Midway cluster is a campus cluster hosted by the Research Computing Center at the University of Chicago. The snippet below shows an example configuration for executing remotely on Midway. The configuration assumes the user is running on a login node. It uses the SlurmProvider to interface with the scheduler and the SrunLauncher to launch workers.

from parsl.config import Config
from parsl.providers import SlurmProvider
from parsl.launchers import SrunLauncher
from parsl.addresses import address_by_hostname
from parsl.executors import HighThroughputExecutor

config = Config(
    executors=[
        HighThroughputExecutor(
            label='Midway_HTEX_multinode',
            worker_debug=False,
            address=address_by_hostname(),
            max_workers=2,
            provider=SlurmProvider(
                'YOUR_PARTITION',  # Partition name, e.g 'broadwl'
                launcher=SrunLauncher(),
                nodes_per_block=2,
                init_blocks=1,
                min_blocks=1,
                max_blocks=1,
                # string to prepend to #SBATCH blocks in the submit
                # script to the scheduler eg: '#SBATCH --constraint=knl,quad,cache'
                scheduler_options='',
                # Command to be run before starting a worker, such as:
                # 'module load Anaconda; source activate parsl_env'.
                worker_init='',
                walltime='00:30:00'
            ),
        )
    ],
)

Open Science Grid

The Open Science Grid (OSG) is a national, distributed computing Grid spanning over 100 individual sites to provide tens of thousands of CPU cores. The snippet below shows an example configuration for executing remotely on OSG. The configuration uses the CondorProvider to interface with the scheduler.

Note

This config was last tested with Parsl v0.8.0.

from parsl.config import Config
from parsl.providers import CondorProvider
from parsl.executors import HighThroughputExecutor
from parsl.addresses import address_by_query

config = Config(
    executors=[
        HighThroughputExecutor(
            label='OSG_HTEX',
            address=address_by_query(),
            max_workers=1,
            provider=CondorProvider(
                nodes_per_block=1,
                init_blocks=4,
                max_blocks=4,
                # This scheduler option string ensures that the compute nodes provisioned
                # will have modules
                scheduler_options='Requirements = OSGVO_OS_STRING == "RHEL 6" && Arch == "X86_64" &&  HAS_MODULES == True',
                # Command to be run before starting a worker, such as:
                # 'module load Anaconda; source activate parsl_env'.
                worker_init='',
                walltime="00:20:00",
            ),
        )
    ]
)
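The scheduler_options string above is an HTCondor Requirements expression built from equality tests joined by `&&`. The following is a minimal sketch of a hypothetical helper that builds such a line from a dictionary. It handles only equality, quotes string values, and rejects strings that contain double quotes rather than guessing at ClassAd escaping rules. Other comparisons, such as `>=`, are out of scope.

```python
def condor_requirements(constraints: dict) -> str:
    """Build an HTCondor 'Requirements = ...' line from attribute/value equalities."""
    clauses = []
    for attribute, value in constraints.items():
        if isinstance(value, bool):  # check bool before int: bool is an int subclass
            literal = "True" if value else "False"
        elif isinstance(value, str):
            if '"' in value:
                raise ValueError(f"double quotes not supported in value for {attribute}")
            literal = f'"{value}"'
        else:
            literal = str(value)
        clauses.append(f"{attribute} == {literal}")
    return "Requirements = " + " && ".join(clauses)


print(condor_requirements({"OSGVO_OS_STRING": "RHEL 6", "Arch": "X86_64", "HAS_MODULES": True}))
# Requirements = OSGVO_OS_STRING == "RHEL 6" && Arch == "X86_64" && HAS_MODULES == True
```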

Amazon Web Services

Note

Please note that the boto3 library is required to use AWS with Parsl. It can be installed via python3 -m pip install parsl[aws]

Amazon Web Services is a commercial cloud service that allows you to rent a range of computers and other computing services. The snippet below shows an example configuration for provisioning nodes from the Elastic Compute Cloud (EC2) service. The first run configures a Virtual Private Cloud and other networking and security infrastructure, which subsequent runs reuse. The configuration uses the AWSProvider to connect to AWS.

from parsl.config import Config
from parsl.providers import AWSProvider
from parsl.executors import HighThroughputExecutor
from parsl.addresses import address_by_query

config = Config(
    executors=[
        HighThroughputExecutor(
            label='ec2_single_node',
            address=address_by_query(),
            provider=AWSProvider(
                # Specify your EC2 AMI id
                'YOUR_AMI_ID',
                # Specify the AWS region to provision from
                # eg. us-east-1
                region='YOUR_AWS_REGION',

                # Specify the name of the key to allow ssh access to nodes
                key_name='YOUR_KEY_NAME',
                profile="default",
                state_file='awsproviderstate.json',
                nodes_per_block=1,
                init_blocks=1,
                max_blocks=1,
                min_blocks=0,
                walltime='01:00:00',
            ),
        )
    ],
)

Kubernetes Clusters

Kubernetes is an open-source system for container management, which includes automating the deployment and scaling of containers. The snippet below shows an example configuration for deploying pods as workers on a Kubernetes cluster. The KubernetesProvider uses the Python Kubernetes client, which assumes that your kube config file is at ~/.kube/config.

from parsl.config import Config
from parsl.executors import HighThroughputExecutor
from parsl.providers import KubernetesProvider
from parsl.addresses import address_by_route


config = Config(
    executors=[
        HighThroughputExecutor(
            label='kube-htex',
            cores_per_worker=1,
            max_workers=1,
            worker_logdir_root='YOUR_WORK_DIR',

            # Address for the pod worker to connect back
            address=address_by_route(),
            provider=KubernetesProvider(
                namespace="default",

                # Docker image url to use for pods
                image='YOUR_DOCKER_URL',

                # Command to be run upon pod start, such as:
                # 'module load Anaconda; source activate parsl_env'.
                # or 'pip install parsl'
                worker_init='',

                # The secret key to download the image
                secret="YOUR_KUBE_SECRET",

                # Should follow the Kubernetes naming rules
                pod_name='YOUR-POD-Name',

                nodes_per_block=1,
                init_blocks=1,
                # Maximum number of pods to scale up
                max_blocks=10,
            ),
        ),
    ]
)
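The comment on pod_name says that it should follow the Kubernetes naming rules. Kubernetes documents pod names as DNS-1123 subdomain names: at most 253 characters, made of lowercase alphanumerics, '-' and '.', and starting and ending with an alphanumeric character. The placeholder 'YOUR-POD-Name' contains uppercase letters, so you must replace it with a valid name. The hypothetical checker below encodes those rules. The provider may derive the final pod name from this value, so a short name is the safer choice.

```python
import re

# DNS-1123 subdomain: lowercase alphanumerics, '-' and '.', starting and
# ending with an alphanumeric character; length checked separately.
_DNS1123_SUBDOMAIN = re.compile(
    r"[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*"
)


def is_valid_pod_name(name: str) -> bool:
    """Return True if name satisfies the Kubernetes DNS-1123 subdomain rules."""
    return len(name) <= 253 and _DNS1123_SUBDOMAIN.fullmatch(name) is not None


print(is_valid_pod_name("YOUR-POD-Name"))  # False
```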

Ad-Hoc Clusters

Any collection of compute nodes without a scheduler set up for task scheduling can be considered an ad-hoc cluster. Often these machines have a shared filesystem such as NFS or Lustre. To use these resources with Parsl, they need to be set up for passwordless SSH access.

To use such an SSH-accessible collection of nodes as an ad-hoc cluster, we use the AdHocProvider with one SSHChannel per node, each identified by hostname. An example configuration follows.

from parsl.providers import AdHocProvider
from parsl.channels import SSHChannel
from parsl.executors import HighThroughputExecutor
from parsl.addresses import address_by_query
from parsl.config import Config

user_opts = {'adhoc':
             {'username': 'YOUR_USERNAME',
              'script_dir': 'YOUR_SCRIPT_DIR',
              'remote_hostnames': ['REMOTE_HOST_URL_1', 'REMOTE_HOST_URL_2']
             }
}

config = Config(
    executors=[
        HighThroughputExecutor(
            label='remote_htex',
            max_workers=2,
            address=address_by_query(),
            worker_logdir_root=user_opts['adhoc']['script_dir'],
            provider=AdHocProvider(
                # Command to be run before starting a worker, such as:
                # 'module load Anaconda; source activate parsl_env'.
                worker_init='',
                channels=[SSHChannel(hostname=m,
                                     username=user_opts['adhoc']['username'],
                                     script_dir=user_opts['adhoc']['script_dir'],
                ) for m in user_opts['adhoc']['remote_hostnames']]
            )
        )
    ],
    # Ad-hoc clusters should not be set up with a scaling strategy.
    strategy=None,
)

Note

Multiple blocks should not be assigned to each node when using the HighThroughputExecutor.

Note

Load-balancing will not work properly with this approach. In future work, a dedicated provider that supports load-balancing will be implemented. You can follow progress on this work here.
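The ad-hoc example builds one SSHChannel per entry in user_opts['adhoc']['remote_hostnames']. The following sketch uses a hypothetical helper that prepares the same per-host keyword arguments and rejects duplicated hostnames. It assumes that the AdHocProvider starts one block per channel, so listing a host twice would put more than one block on that node, which the first note above advises against.

```python
from typing import Dict, List


def adhoc_channel_kwargs(user_opts: dict) -> List[Dict[str, str]]:
    """Build one keyword-argument dict per remote host for an SSH channel."""
    opts = user_opts["adhoc"]
    hostnames = opts["remote_hostnames"]
    duplicates = sorted({h for h in hostnames if hostnames.count(h) > 1})
    if duplicates:
        # Assumption: one block per channel, so a repeated host means >1 block per node.
        raise ValueError(f"hostnames listed more than once: {duplicates}")
    return [
        {"hostname": host, "username": opts["username"], "script_dir": opts["script_dir"]}
        for host in hostnames
    ]
```

Each returned dict could then be unpacked as `SSHChannel(**kwargs)` in place of the list comprehension in the example.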

Work Queue (CCL ND)

The following snippet shows an example configuration for using the Work Queue distributed framework to run applications on remote machines. This example uses the WorkQueueExecutor to schedule tasks locally. It assumes that Work Queue workers have been connected to the master externally, using the work_queue_worker or condor_submit_workers command line utilities from CCTools. For more information on the process of submitting tasks and workers to Work Queue, please refer to the CCTools Work Queue documentation.

from parsl.config import Config
from parsl.executors import WorkQueueExecutor

config = Config(
    executors=[
        WorkQueueExecutor(
            label="wqex_local",
            port=50055,
            project_name="WorkQueue Example",
            shared_fs=True,
            see_worker_output=True
        )
    ]
)

To utilize Work Queue with Parsl, please install the full CCTools software package within an appropriate Anaconda or Miniconda environment (instructions for installing Miniconda can be found here):

This creates a Conda environment on your machine with all the necessary tools and setup needed to utilize Work Queue with the Parsl library.
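Workers connect to the master started by the WorkQueueExecutor, which listens on port 50055 in the example. The following sketch uses a hypothetical helper that builds the work_queue_worker command line as an argument list. It assumes that CCTools is installed on the worker machine, and that work_queue_worker accepts either the master's host and port as positional arguments or `-M <project name>`. Lookup by project name also relies on the master being advertised to a catalog server.

```python
from typing import List, Optional


def worker_command(host: Optional[str] = None, port: Optional[int] = None,
                   project_name: Optional[str] = None) -> List[str]:
    """Build a work_queue_worker command line as an argument list."""
    if project_name is not None:
        # Find the master by project name (assumes it is advertised to a catalog).
        return ["work_queue_worker", "-M", project_name]
    if host is None or port is None:
        raise ValueError("give either project_name, or both host and port")
    return ["work_queue_worker", host, str(port)]


print(worker_command("localhost", 50055))
# ['work_queue_worker', 'localhost', '50055']
```

On a worker machine, the list could be passed to `subprocess.run` so that no shell quoting is needed.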

Further help

For help constructing a configuration, you can click on class names such as Config or HighThroughputExecutor to see the associated class documentation. The same documentation can be accessed interactively at the Python prompt via, for example:

>>> from parsl.config import Config
>>> help(Config)