parsl.executors.MPIExecutor
- class parsl.executors.MPIExecutor(label: str = 'MPIExecutor', provider: ExecutionProvider = LocalProvider(channel=LocalChannel(envs={}, script_dir=None, userhome='/home/docs/checkouts/readthedocs.org/user_builds/parsl/checkouts/stable/docs'), cmd_timeout=30, init_blocks=1, launcher=SingleNodeLauncher(debug=True, fail_on_any=False), max_blocks=1, min_blocks=0, move_files=None, nodes_per_block=1, parallelism=1, worker_init=''), launch_cmd: str | None = None, interchange_launch_cmd: str | None = None, address: str | None = None, worker_ports: Tuple[int, int] | None = None, worker_port_range: Tuple[int, int] | None = (54000, 55000), interchange_port_range: Tuple[int, int] | None = (55000, 56000), storage_access: List[Staging] | None = None, working_dir: str | None = None, worker_debug: bool = False, max_workers_per_block: int = 1, prefetch_capacity: int = 0, heartbeat_threshold: int = 120, heartbeat_period: int = 30, drain_period: int | None = None, poll_period: int = 10, address_probe_timeout: int | None = None, worker_logdir_root: str | None = None, mpi_launcher: str = 'mpiexec', block_error_handler: bool | Callable[[BlockProviderExecutor, Dict[str, JobStatus]], None] = True, encrypted: bool = False)[source]
A version of HighThroughputExecutor tuned for executing multi-node (e.g., MPI) tasks.

The Provider _must_ use the SimpleLauncher, which places a single pool of workers on the first node of a block. Each worker can then make system calls which use an MPI launcher (e.g., mpirun, srun) to spawn multi-node tasks.

Specify the maximum number of multi-node tasks to run at once using max_workers_per_block. The value should be less than or equal to the nodes_per_block in the Provider.

- Parameters:
max_workers_per_block (int) – Maximum number of MPI applications to run at once per block
mpi_launcher (str) – Select one from the list of supported MPI launchers: (“srun”, “aprun”, “mpiexec”). default: “mpiexec”
provider (ExecutionProvider) – Provider to access computation resources. Can be one of EC2Provider, Cobalt, Condor, GoogleCloud, GridEngine, Local, Slurm, or Torque.
label (str) – Label for this executor instance.
launch_cmd (str) – Command line string to launch the process_worker_pool from the provider. The command line string will be formatted with appropriate values for the following keys: debug, task_url, result_url, cores_per_worker, nodes_per_block, heartbeat_period, heartbeat_threshold, logdir. For example: launch_cmd="process_worker_pool.py {debug} -c {cores_per_worker} --task_url={task_url} --result_url={result_url}"
interchange_launch_cmd (Sequence[str]) – Custom sequence of command line tokens to launch the interchange process from the executor. If undefined, the executor will use the default “interchange.py” command.
address (string) – An address for connecting to the main Parsl process, reachable from the network in which workers will be running. This field expects an IPv4 address (xxx.xxx.xxx.xxx). Most login nodes on clusters have several network interfaces available, only some of which can be reached from the compute nodes. Use this field to make the executor listen only on a specific interface and to limit connections to the internal network. By default, the executor will attempt to enumerate and connect through all possible addresses. Setting an address here overrides the default behavior. default=None
worker_ports ((int, int)) – Specify the ports to be used by workers to connect to Parsl. If this option is specified, worker_port_range will not be honored.
worker_port_range ((int, int)) – Worker ports will be chosen between the two integers provided.
interchange_port_range ((int, int)) – Port range used by Parsl to communicate with the Interchange.
working_dir (str) – Working dir to be used by the executor.
worker_debug (bool) – Enables worker debug logging.
prefetch_capacity (int) – Number of tasks that can be prefetched beyond the available worker capacity. When there are few tasks (<100) or when tasks are long running, set this option to 0 for better load balancing. Default is 0.
address_probe_timeout (int | None) – Managers attempt connecting over many different addresses to determine a viable address. This option sets a time limit in seconds on the connection attempt. Default of None implies 30s timeout set on worker.
heartbeat_threshold (int) – Number of seconds since the last message from the counterpart in the communication pair (interchange, manager) after which the counterpart is assumed to be unavailable. Default: 120s
heartbeat_period (int) – Number of seconds after which a heartbeat message indicating liveness is sent to the counterpart (interchange, manager). Default: 30s
poll_period (int) – Timeout period, in milliseconds, used by the executor components. Increasing poll_period trades performance for CPU efficiency. Default: 10ms
drain_period (int) – The number of seconds after start when workers will begin to drain and then exit. Set this to a time that is slightly less than the maximum walltime of batch jobs to avoid killing tasks while they execute. For example, you could set this to the walltime minus a grace period for the batch job to start the workers, minus the expected maximum length of an individual task.
worker_logdir_root (string) – In case of a remote file system, specify the path to where logs will be kept.
encrypted (bool) – Flag to enable/disable encryption (CurveZMQ). Default is False.
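Several of the parameter descriptions above imply small calculations. The following is a minimal sketch of three of them in plain Python, not Parsl's own code: the drain_period arithmetic (walltime minus a startup grace period minus the longest expected task), the guidance that max_workers_per_block should not exceed nodes_per_block, and the filling of a launch_cmd template. The helper names, the URLs, and the "--debug" value are hypothetical, and the use of str.format to fill the template is an assumption made for illustration.

```python
def drain_period_seconds(walltime_s: int, startup_grace_s: int, max_task_s: int) -> int:
    """Walltime minus a startup grace period minus the longest expected task."""
    drain = walltime_s - startup_grace_s - max_task_s
    if drain <= 0:
        raise ValueError("walltime is too short for the given grace period and task length")
    return drain


def check_worker_cap(max_workers_per_block: int, nodes_per_block: int) -> None:
    """Apply the documented guidance: max_workers_per_block <= nodes_per_block."""
    if max_workers_per_block > nodes_per_block:
        raise ValueError(
            f"max_workers_per_block={max_workers_per_block} exceeds "
            f"nodes_per_block={nodes_per_block}"
        )


# A launch_cmd template using the keys named in the parameter description.
launch_cmd = (
    "process_worker_pool.py {debug} -c {cores_per_worker} "
    "--task_url={task_url} --result_url={result_url}"
)

# Placeholder values, chosen only to show how the template is filled.
rendered = launch_cmd.format(
    debug="--debug",
    cores_per_worker=1,
    task_url="tcp://10.0.0.1:54000",
    result_url="tcp://10.0.0.1:54001",
)

# One-hour batch job, two minutes for workers to start, ten-minute tasks.
drain = drain_period_seconds(walltime_s=3600, startup_grace_s=120, max_task_s=600)
check_worker_cap(max_workers_per_block=2, nodes_per_block=4)

print(drain)
print(rendered)
```

With these inputs the workers would begin draining 2880 seconds after start, which leaves room for one more ten-minute task before the batch job reaches its walltime.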
- __init__(label: str = 'MPIExecutor', provider: ExecutionProvider = LocalProvider(channel=LocalChannel(envs={}, script_dir=None, userhome='/home/docs/checkouts/readthedocs.org/user_builds/parsl/checkouts/stable/docs'), cmd_timeout=30, init_blocks=1, launcher=SingleNodeLauncher(debug=True, fail_on_any=False), max_blocks=1, min_blocks=0, move_files=None, nodes_per_block=1, parallelism=1, worker_init=''), launch_cmd: str | None = None, interchange_launch_cmd: str | None = None, address: str | None = None, worker_ports: Tuple[int, int] | None = None, worker_port_range: Tuple[int, int] | None = (54000, 55000), interchange_port_range: Tuple[int, int] | None = (55000, 56000), storage_access: List[Staging] | None = None, working_dir: str | None = None, worker_debug: bool = False, max_workers_per_block: int = 1, prefetch_capacity: int = 0, heartbeat_threshold: int = 120, heartbeat_period: int = 30, drain_period: int | None = None, poll_period: int = 10, address_probe_timeout: int | None = None, worker_logdir_root: str | None = None, mpi_launcher: str = 'mpiexec', block_error_handler: bool | Callable[[BlockProviderExecutor, Dict[str, JobStatus]], None] = True, encrypted: bool = False)[source]
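As an illustration of the constructor, the sketch below builds a Parsl configuration that pairs an MPIExecutor with a Slurm provider and the SimpleLauncher, as the class description requires. It is one possible setup under stated assumptions, not a recommended configuration: the function name build_mpi_config, the label "mpi", the partition name "debug", and the walltime are placeholders to adapt to a real cluster. The Parsl imports sit inside the function so that defining it does not require Parsl to be installed.

```python
def build_mpi_config(nodes_per_block: int = 2, max_workers_per_block: int = 2):
    """Return a Parsl Config with one MPIExecutor (illustrative values only)."""
    # Imported lazily: Parsl is only needed when the function is called.
    from parsl.config import Config
    from parsl.executors import MPIExecutor
    from parsl.launchers import SimpleLauncher
    from parsl.providers import SlurmProvider

    return Config(
        executors=[
            MPIExecutor(
                label="mpi",  # placeholder label
                # At most this many MPI applications run at once per block;
                # keep it less than or equal to nodes_per_block.
                max_workers_per_block=max_workers_per_block,
                mpi_launcher="srun",  # one of "srun", "aprun", "mpiexec"
                provider=SlurmProvider(
                    partition="debug",  # hypothetical partition name
                    nodes_per_block=nodes_per_block,
                    walltime="01:00:00",  # placeholder walltime
                    # Required: a single worker pool on the first node of the block.
                    launcher=SimpleLauncher(),
                ),
            )
        ]
    )
```

Calling build_mpi_config() on a machine with Parsl installed returns a Config object that can then be loaded in the usual way; scheduler-specific options such as account or worker_init are left out here and would normally be needed.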
Methods
__init__([label, provider, launch_cmd, ...])
connected_blocks() – List of connected block ids.
connected_managers() – Returns a list of dicts, one for each connected manager.
create_monitoring_info(status) – Create a monitoring message for each block based on the poll status.
get_usage_information()
handle_errors(status) – This method is called by the error management infrastructure after a status poll.
hold_worker(worker_id) – Puts a worker on hold, preventing scheduling of additional tasks to it.
initialize_scaling() – Compose the launch command and scale out the initial blocks.
monitor_resources() – Should resource monitoring happen for tasks running on this executor?
poll_facade()
scale_in(blocks[, max_idletime]) – Scale in the number of active blocks by the specified amount.
scale_in_facade(n[, max_idletime])
scale_out_facade(n) – Scales out the number of blocks by "blocks".
send_monitoring_info(status)
set_bad_state_and_fail_all(exception) – Allows external error handlers to mark this executor as irrecoverably bad and cause all tasks submitted to it now and in the future to fail.
shutdown([timeout]) – Shutdown the executor, including the interchange.
start() – Create the Interchange process and connect to it.
status() – Return the status of all jobs/blocks currently known to this executor.
submit(func, resource_specification, *args, ...) – Submits work to the outgoing_q.
validate_resource_spec(resource_specification) – HTEX does not support any resource_specification options and will raise InvalidResourceSpecification if any are passed to it.
Attributes
bad_state_is_set – Returns true if this executor is in an irrecoverable error state.
connected_workers – Returns the count of workers across all connected managers.
enable_mpi_mode
executor_exception – Returns an exception that indicates why this executor is in an irrecoverable state.
hub_address – Address to the Hub for monitoring.
hub_zmq_port – Port to the Hub for monitoring.
label
logdir
mpi_launcher
outstanding – Returns the count of tasks outstanding across the interchange and managers.
provider
radio_mode
run_dir – Path to the run directory.
run_id – UUID for the enclosing DFK.
status_facade – Return the status of all jobs/blocks of the executor of this poller.
status_polling_interval – Returns the interval, in seconds, at which the status method should be called.
submit_monitoring_radio – Local radio for sending monitoring messages.
tasks
worker_logdir
workers_per_node