parsl.executors.MPIExecutor

class parsl.executors.MPIExecutor(label: str = 'MPIExecutor', provider: ExecutionProvider = LocalProvider(cmd_timeout=30, init_blocks=1, launcher=SingleNodeLauncher(debug=True, fail_on_any=False), max_blocks=1, min_blocks=0, nodes_per_block=1, parallelism=1, worker_init=''), launch_cmd: str | None = None, interchange_launch_cmd: str | None = None, address: str | None = None, loopback_address: str = '127.0.0.1', worker_ports: Tuple[int, int] | None = None, worker_port_range: Tuple[int, int] | None = (54000, 55000), interchange_port_range: Tuple[int, int] | None = (55000, 56000), storage_access: List[Staging] | None = None, working_dir: str | None = None, worker_debug: bool = False, max_workers_per_block: int = 1, prefetch_capacity: int = 0, heartbeat_threshold: int = 120, heartbeat_period: int = 30, drain_period: int | None = None, poll_period: int = 10, address_probe_timeout: int | None = None, worker_logdir_root: str | None = None, mpi_launcher: str = 'mpiexec', block_error_handler: bool | Callable[[BlockProviderExecutor, Dict[str, JobStatus]], None] = True, encrypted: bool = False)[source]

A version of HighThroughputExecutor tuned for executing multi-node (e.g., MPI) tasks.

The Provider _must_ use the SimpleLauncher, which places a single pool of workers on the first node of a block. Each worker can then make system calls which use an MPI launcher (e.g., mpirun, srun) to spawn multi-node tasks.

Specify the maximum number of multi-node tasks to run at once using max_workers_per_block. The value should be less than or equal to the nodes_per_block in the Provider.
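As a sketch, a configuration along these lines pairs MPIExecutor with the required SimpleLauncher; the Slurm partition name, node counts, and launcher choice below are illustrative assumptions, not defaults:

```python
from parsl.config import Config
from parsl.executors import MPIExecutor
from parsl.launchers import SimpleLauncher
from parsl.providers import SlurmProvider

# Illustrative values: 4 nodes per block and at most 2 concurrent
# multi-node tasks, each spawned internally via srun.
config = Config(
    executors=[
        MPIExecutor(
            label="mpi_executor",
            max_workers_per_block=2,       # <= nodes_per_block below
            mpi_launcher="srun",
            provider=SlurmProvider(
                partition="compute",        # assumed partition name
                nodes_per_block=4,
                init_blocks=1,
                launcher=SimpleLauncher(),  # required by MPIExecutor
            ),
        )
    ]
)
```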

Parameters:
  • max_workers_per_block (int) – Maximum number of MPI applications to run at once per block

  • mpi_launcher (str) – Select one of the supported MPI launchers: "srun", "aprun", or "mpiexec". Default: "mpiexec"

  • provider (ExecutionProvider) – Provider to access computation resources. Can be one of EC2Provider, Condor, GoogleCloud, GridEngine, Local, Slurm, or Torque.

  • label (str) – Label for this executor instance.

  • launch_cmd (str) – Command line string to launch the process_worker_pool from the provider. The command line string will be formatted with appropriate values for the following fields: debug, task_url, result_url, cores_per_worker, nodes_per_block, heartbeat_period, heartbeat_threshold, logdir. For example: launch_cmd="process_worker_pool.py {debug} -c {cores_per_worker} --task_url={task_url} --result_url={result_url}"

  • interchange_launch_cmd (Sequence[str]) – Custom sequence of command line tokens to launch the interchange process from the executor. If undefined, the executor will use the default "interchange.py" command.

  • address (string) – An address to connect to the main Parsl process which is reachable from the network in which workers will be running. This field expects an IPv4 or IPv6 address. Most login nodes on clusters have several network interfaces available, only some of which can be reached from the compute nodes. This field can be used to limit the executor to listen only on a specific interface, restricting connections to the internal network. By default, the executor will attempt to enumerate and connect through all possible addresses. Setting an address here overrides the default behavior. Default: None
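If the default enumeration picks an interface the compute nodes cannot reach, Parsl's parsl.addresses helpers can pin the address explicitly; the interface name below is a site-specific assumption:

```python
from parsl.addresses import address_by_interface

# "ib0" is a placeholder: use whichever network interface
# the compute nodes at your site can actually reach.
addr = address_by_interface("ib0")
```

The resulting string can then be passed as the address argument to the executor.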

  • loopback_address (string) – Specify the address used for internal communication between the executor and interchange. Supports IPv4 and IPv6 addresses. Default: 127.0.0.1

  • worker_ports ((int, int)) – Specify the ports to be used by workers to connect to Parsl. If this option is specified, worker_port_range will not be honored.

  • worker_port_range ((int, int)) – Worker ports will be chosen between the two integers provided.

  • interchange_port_range ((int, int)) – Port range used by Parsl to communicate with the Interchange.

  • working_dir (str) – Working dir to be used by the executor.

  • worker_debug (bool) – Enables worker debug logging.

  • prefetch_capacity (int) – Number of tasks that could be prefetched over available worker capacity. When there are a few tasks (<100) or when tasks are long running, this option should be set to 0 for better load balancing. Default is 0.

  • address_probe_timeout (int | None) – Managers attempt connecting over many different addresses to determine a viable address. This option sets a time limit in seconds on the connection attempt. The default of None implies the 30s timeout set on the worker.

  • heartbeat_threshold (int) – Seconds since the last message from the counterpart in the communication pair (interchange, manager) after which the counterpart is assumed to be unavailable. Default: 120s

  • heartbeat_period (int) – Number of seconds after which a heartbeat message indicating liveness is sent to the counterpart (interchange, manager). Default: 30s

  • poll_period (int) – Timeout period, in milliseconds, used by the executor components. Increasing poll_period trades performance for CPU efficiency. Default: 10ms

  • drain_period (int) – The number of seconds after start when workers will begin to drain and then exit. Set this to a time that is slightly less than the maximum walltime of batch jobs to avoid killing tasks while they execute. For example, you could set this to the walltime minus a grace period for the batch job to start the workers, minus the expected maximum length of an individual task.
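With hypothetical numbers (a 60-minute batch job, 5 minutes of startup grace, and tasks that run up to 10 minutes each), the rule above works out as:

```python
# Hypothetical timings for the drain_period rule described above.
walltime_s = 60 * 60       # maximum walltime of the batch job
startup_grace_s = 5 * 60   # allowance for the batch job to start workers
max_task_s = 10 * 60       # expected maximum length of a single task

drain_period = walltime_s - startup_grace_s - max_task_s
print(drain_period)  # 2700 seconds
```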

  • worker_logdir_root (string) – In case of a remote file system, specify the path to where logs will be kept.

  • encrypted (bool) – Flag to enable/disable encryption (CurveZMQ). Default is False.

  • manager_selector (ManagerSelector) – Determines what strategy the interchange uses to select managers during task distribution. See the API reference under "Manager Selectors" regarding the various manager selectors. Default: RandomManagerSelector

__init__(label: str = 'MPIExecutor', provider: ExecutionProvider = LocalProvider(cmd_timeout=30, init_blocks=1, launcher=SingleNodeLauncher(debug=True, fail_on_any=False), max_blocks=1, min_blocks=0, nodes_per_block=1, parallelism=1, worker_init=''), launch_cmd: str | None = None, interchange_launch_cmd: str | None = None, address: str | None = None, loopback_address: str = '127.0.0.1', worker_ports: Tuple[int, int] | None = None, worker_port_range: Tuple[int, int] | None = (54000, 55000), interchange_port_range: Tuple[int, int] | None = (55000, 56000), storage_access: List[Staging] | None = None, working_dir: str | None = None, worker_debug: bool = False, max_workers_per_block: int = 1, prefetch_capacity: int = 0, heartbeat_threshold: int = 120, heartbeat_period: int = 30, drain_period: int | None = None, poll_period: int = 10, address_probe_timeout: int | None = None, worker_logdir_root: str | None = None, mpi_launcher: str = 'mpiexec', block_error_handler: bool | Callable[[BlockProviderExecutor, Dict[str, JobStatus]], None] = True, encrypted: bool = False)[source]

Methods

__init__([label, provider, launch_cmd, ...])

connected_blocks()

List of connected block ids

connected_managers()

Returns a list of dicts, one for each connected manager.

create_monitoring_info(status)

Create a monitoring message for each block based on the poll status.

get_usage_information()

handle_errors(status)

This method is called by the error management infrastructure after a status poll.

hold_worker(worker_id)

Puts a worker on hold, preventing scheduling of additional tasks to it.

initialize_scaling()

Compose the launch command and scale out the initial blocks.

monitor_resources()

Should resource monitoring happen for tasks running on this executor?

poll_facade()

scale_in(blocks[, max_idletime])

Scale in the number of active blocks by the specified amount.

scale_in_facade(n[, max_idletime])

scale_out_facade(n)

Scales out the number of blocks by n.

send_monitoring_info(status)

set_bad_state_and_fail_all(exception)

Allows external error handlers to mark this executor as irrecoverably bad and cause all tasks submitted to it now and in the future to fail.

shutdown([timeout])

Shutdown the executor, including the interchange.

start()

Create the Interchange process and connect to it.

status()

Return the status of all jobs/blocks currently known to this executor.

submit(func, resource_specification, *args, ...)

Submits work to the outgoing_q.

validate_resource_spec(resource_specification)

HTEX supports the following optional resource specification: priority (a lower value means higher priority).

Attributes

bad_state_is_set

Returns true if this executor is in an irrecoverable error state.

connected_workers

Returns the count of workers across all connected managers

enable_mpi_mode

executor_exception

Returns an exception that indicates why this executor is in an irrecoverable state.

hub_address

Address to the Hub for monitoring.

hub_zmq_port

Port to the Hub for monitoring.

label

logdir

mpi_launcher

outstanding

Returns the count of tasks outstanding across the interchange and managers

provider

radio_mode

run_dir

Path to the run directory.

run_id

UUID for the enclosing DFK.

status_facade

Return the status of all jobs/blocks of the executor of this poller.

status_polling_interval

Returns the interval, in seconds, at which the status method should be called.

submit_monitoring_radio

Local radio for sending monitoring messages

tasks

worker_logdir

workers_per_node

validate_resource_spec(resource_specification: dict)[source]

HTEX supports the following optional resource specification: priority (a lower value means higher priority).