parsl.executors.HighThroughputExecutor

class parsl.executors.HighThroughputExecutor(label: str = 'HighThroughputExecutor', provider: ~parsl.providers.base.ExecutionProvider = LocalProvider(     channel=LocalChannel(         envs={},          script_dir=None,          userhome='/home/docs/checkouts/readthedocs.org/user_builds/parsl/checkouts/desc/docs'     ),      cmd_timeout=30,      init_blocks=1,      launcher=SingleNodeLauncher(debug=True, fail_on_any=False),      max_blocks=1,      min_blocks=0,      move_files=None,      nodes_per_block=1,      parallelism=1,      worker_init='' ), launch_cmd: str | None = None, interchange_launch_cmd: ~typing.Sequence[str] | None = None, address: str | None = None, worker_ports: ~typing.Tuple[int, int] | None = None, worker_port_range: ~typing.Tuple[int, int] | None = (54000, 55000), interchange_port_range: ~typing.Tuple[int, int] | None = (55000, 56000), storage_access: ~typing.List[~parsl.data_provider.staging.Staging] | None = None, working_dir: str | None = None, worker_debug: bool = False, cores_per_worker: float = 1.0, mem_per_worker: float | None = None, max_workers: int | float | None = None, max_workers_per_node: int | float | None = None, cpu_affinity: str = 'none', available_accelerators: int | ~typing.Sequence[str] = (), prefetch_capacity: int = 0, heartbeat_threshold: int = 120, heartbeat_period: int = 30, drain_period: int | None = None, poll_period: int = 10, address_probe_timeout: int | None = None, worker_logdir_root: str | None = None, enable_mpi_mode: bool = False, mpi_launcher: str = 'mpiexec', manager_selector: ~parsl.executors.high_throughput.manager_selector.ManagerSelector = <parsl.executors.high_throughput.manager_selector.RandomManagerSelector object>, block_error_handler: bool | ~typing.Callable[[~parsl.executors.status_handling.BlockProviderExecutor, ~typing.Dict[str, ~parsl.jobs.states.JobStatus]], None] = True, encrypted: bool = False)[source]

Executor designed for cluster-scale, high-throughput task execution.

The HighThroughputExecutor system has the following components:
  1. The HighThroughputExecutor instance, which runs as part of the Parsl script.

  2. The Interchange, which acts as a load-balancing proxy between workers and Parsl.

  3. The multiprocessing-based worker pool (process_worker_pool), which coordinates task execution over several cores on a node.

  4. ZeroMQ pipes, which connect the HighThroughputExecutor, the Interchange, and the process_worker_pool.

Here is a diagram of how these components are connected:

             |  Data   |  Executor   |  Interchange  | External Process(es)
             |  Flow   |             |               |
        Task | Kernel  |             |               |
      +----->|-------->|------------>|->outgoing_q---|-> process_worker_pool
      |      |         |             | batching      |    |         |
Parsl<---Fut-|         |             | load-balancing|  result   exception
          ^  |         |             | watchdogs     |    |         |
          |  |         |    Result   |               |    |         |
          |  |         |    Queue    |               |    V         V
          |  |         |    Thread<--|-incoming_q<---|--- +---------+
          |  |         |      |      |               |
          |  |         |      |      |               |
          +----update_fut-----+

Each of the workers in each process_worker_pool has access to its local rank through the environment variable PARSL_WORKER_RANK. The local rank is unique for each worker process and is an integer in the range from 0 to the number of workers in the pool minus 1. The workers also have access to the ID of the worker pool as PARSL_WORKER_POOL_ID and the size of the worker pool as PARSL_WORKER_COUNT.
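For illustration, a minimal sketch of a task that reads these variables (the app name is hypothetical; the body runs inside one worker of the pool):

    from parsl import python_app

    @python_app
    def report_worker():
        # Runs inside one worker of a process_worker_pool; the pool sets these
        # environment variables for each worker process.
        import os
        rank = int(os.environ["PARSL_WORKER_RANK"])    # unique per worker, 0 .. count - 1
        pool_id = os.environ["PARSL_WORKER_POOL_ID"]   # ID of the worker pool
        count = int(os.environ["PARSL_WORKER_COUNT"])  # number of workers in the pool
        return f"worker {rank} of {count} in pool {pool_id}"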

Parameters:
  • provider (ExecutionProvider) – Provider to access computation resources. Can be one of EC2Provider, Cobalt, Condor, GoogleCloud, GridEngine, Local, Slurm, or Torque.

  • label (str) – Label for this executor instance.

  • launch_cmd (str) – Command line string to launch the process_worker_pool from the provider. The command line string will be formatted with appropriate values for the following fields (debug, task_url, result_url, cores_per_worker, nodes_per_block, heartbeat_period, heartbeat_threshold, logdir). For example: launch_cmd="process_worker_pool.py {debug} -c {cores_per_worker} --task_url={task_url} --result_url={result_url}"

  • interchange_launch_cmd (Sequence[str]) – Custom sequence of command line tokens to launch the interchange process from the executor. If undefined, the executor will use the default “interchange.py” command.

  • address (string) – An address to connect to the main Parsl process which is reachable from the network in which workers will be running. This field expects an IPv4 address (xxx.xxx.xxx.xxx). Most login nodes on clusters have several network interfaces available, only some of which can be reached from the compute nodes. This field can be used to restrict the executor to listen only on a specific interface, limiting connections to the internal network. By default, the executor will attempt to enumerate and connect through all possible addresses. Setting an address here overrides the default behavior. default=None

  • worker_ports ((int, int)) – Specify the ports to be used by workers to connect to Parsl. If this option is specified, worker_port_range will not be honored.

  • worker_port_range ((int, int)) – Worker ports will be chosen between the two integers provided.

  • interchange_port_range ((int, int)) – Port range used by Parsl to communicate with the Interchange.

  • working_dir (str) – Working dir to be used by the executor.

  • worker_debug (Bool) – Enables worker debug logging.

  • prefetch_capacity (int) – Number of tasks that could be prefetched over available worker capacity. When there are only a few tasks (< 100) or when tasks are long-running, this option should be set to 0 for better load balancing. Default is 0.

  • address_probe_timeout (int | None) – Managers attempt connecting over many different addresses to determine a viable address. This option sets a time limit in seconds on the connection attempt. Default of None implies 30s timeout set on worker.

  • heartbeat_threshold (int) – Seconds since the last message from the counterpart in the communication pair (interchange, manager) after which the counterpart is assumed to be unavailable. Default: 120s

  • heartbeat_period (int) – Number of seconds after which a heartbeat message indicating liveness is sent to the counterpart (interchange, manager). Default: 30s

  • poll_period (int) – Timeout period to be used by the executor components in milliseconds. Increasing poll_period trades performance for CPU efficiency. Default: 10ms

  • drain_period (int) – The number of seconds after start when workers will begin to drain and then exit. Set this to a time that is slightly less than the maximum walltime of batch jobs to avoid killing tasks while they execute. For example, you could set this to the walltime minus a grace period for the batch job to start the workers, minus the expected maximum length of an individual task.

  • worker_logdir_root (string) – In the case of a remote file system, specify the path where logs will be kept.

  • encrypted (bool) – Flag to enable/disable encryption (CurveZMQ). Default is False.
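As a hedged illustration of how several of these parameters fit together, here is a minimal local configuration sketch (the label and values are arbitrary choices, not recommendations):

    import parsl
    from parsl.config import Config
    from parsl.executors import HighThroughputExecutor
    from parsl.providers import LocalProvider

    config = Config(
        executors=[
            HighThroughputExecutor(
                label="htex_local",                # label for this executor instance
                provider=LocalProvider(init_blocks=1, max_blocks=1),
                worker_debug=True,                 # enable worker debug logging
                worker_port_range=(54000, 55000),  # range from which worker ports are chosen
                heartbeat_period=30,               # seconds between liveness heartbeats
                heartbeat_threshold=120,           # seconds of silence before a peer is presumed lost
                encrypted=True,                    # enable CurveZMQ encryption
            )
        ]
    )

    parsl.load(config)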

cores_per_worker: float

Cores to be assigned to each worker. Oversubscription is possible by setting cores_per_worker < 1.0. Default: 1

mem_per_worker: float

GB of memory required per worker. If this option is specified, the node manager will check the available memory at startup and limit the number of workers such that there's sufficient memory for each worker. Default: None

max_workers: int

Deprecated. Please use max_workers_per_node instead.

max_workers_per_node: int

Caps the number of workers launched per node. Default: None

cpu_affinity: string

Whether or how each worker process sets thread affinity. Options include "none" to forgo any CPU affinity configuration, "block" to assign adjacent cores to workers (ex: assign 0-1 to worker 0, 2-3 to worker 1), and "alternating" to assign cores to workers in round-robin (ex: assign 0,2 to worker 0, 1,3 to worker 1). The "block-reverse" option assigns adjacent cores to workers, but assigns the CPUs with the largest indices to the lowest-index workers (ex: assign 2-3 to worker 0, 0-1 to worker 1).
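For example, a hedged sketch of the "block" policy with two cores per worker (the label and values are illustrative):

    from parsl.executors import HighThroughputExecutor

    htex_pinned = HighThroughputExecutor(
        label="htex_pinned",
        cores_per_worker=2,    # each worker owns two cores
        cpu_affinity="block",  # worker 0 gets cores 0-1, worker 1 gets cores 2-3, ...
    )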

available_accelerators: int | list

Accelerators available for workers to use. Each worker will be pinned to exactly one of the provided accelerators, and no more workers will be launched than the number of accelerators.

Either provide the list of accelerator names or the number available. If a number is provided, Parsl will create names as integers starting with 0.

default: empty list
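For instance, on a node with four accelerators either form below could be used (a hedged sketch; the labels and device count are illustrative):

    from parsl.executors import HighThroughputExecutor

    # Provide a count: Parsl generates names "0".."3" and launches at most
    # four workers, each pinned to one accelerator.
    htex_by_count = HighThroughputExecutor(
        label="htex_accel_count",
        available_accelerators=4,
    )

    # Or provide explicit names: one worker is pinned to each named device.
    htex_by_name = HighThroughputExecutor(
        label="htex_accel_names",
        available_accelerators=["0", "1", "2", "3"],
    )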

enable_mpi_mode: bool

If enabled, MPI launch prefixes will be composed for the batch scheduler based on the nodes available in each batch job and the resource_specification dict passed from the app. This is an experimental feature, please refer to the following doc section before use: https://parsl.readthedocs.io/en/stable/userguide/mpi_apps.html

mpi_launcher: str

This field is only used if enable_mpi_mode is set. Select one from the list of supported MPI launchers = (“srun”, “aprun”, “mpiexec”). default: “mpiexec”
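A hedged sketch of enabling this mode (the per-task resource_specification keys are described in the linked MPI guide and are not shown here; the label is illustrative):

    from parsl.executors import HighThroughputExecutor

    htex_mpi = HighThroughputExecutor(
        label="htex_mpi",
        enable_mpi_mode=True,  # compose MPI launch prefixes per task
        mpi_launcher="srun",   # one of "srun", "aprun", "mpiexec"
    )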

__init__(label: str = 'HighThroughputExecutor', provider: ~parsl.providers.base.ExecutionProvider = LocalProvider(     channel=LocalChannel(         envs={},          script_dir=None,          userhome='/home/docs/checkouts/readthedocs.org/user_builds/parsl/checkouts/desc/docs'     ),      cmd_timeout=30,      init_blocks=1,      launcher=SingleNodeLauncher(debug=True, fail_on_any=False),      max_blocks=1,      min_blocks=0,      move_files=None,      nodes_per_block=1,      parallelism=1,      worker_init='' ), launch_cmd: str | None = None, interchange_launch_cmd: ~typing.Sequence[str] | None = None, address: str | None = None, worker_ports: ~typing.Tuple[int, int] | None = None, worker_port_range: ~typing.Tuple[int, int] | None = (54000, 55000), interchange_port_range: ~typing.Tuple[int, int] | None = (55000, 56000), storage_access: ~typing.List[~parsl.data_provider.staging.Staging] | None = None, working_dir: str | None = None, worker_debug: bool = False, cores_per_worker: float = 1.0, mem_per_worker: float | None = None, max_workers: int | float | None = None, max_workers_per_node: int | float | None = None, cpu_affinity: str = 'none', available_accelerators: int | ~typing.Sequence[str] = (), prefetch_capacity: int = 0, heartbeat_threshold: int = 120, heartbeat_period: int = 30, drain_period: int | None = None, poll_period: int = 10, address_probe_timeout: int | None = None, worker_logdir_root: str | None = None, enable_mpi_mode: bool = False, mpi_launcher: str = 'mpiexec', manager_selector: ~parsl.executors.high_throughput.manager_selector.ManagerSelector = <parsl.executors.high_throughput.manager_selector.RandomManagerSelector object>, block_error_handler: bool | ~typing.Callable[[~parsl.executors.status_handling.BlockProviderExecutor, ~typing.Dict[str, ~parsl.jobs.states.JobStatus]], None] = True, encrypted: bool = False)[source]

Methods

__init__([label, provider, launch_cmd, ...])

connected_blocks()

List of connected block ids

connected_managers()

Returns a list of dicts, one for each connected manager.

create_monitoring_info(status)

Create a monitoring message for each block based on the poll status.

get_usage_information()

handle_errors(status)

This method is called by the error management infrastructure after a status poll.

hold_worker(worker_id)

Puts a worker on hold, preventing scheduling of additional tasks to it.

initialize_scaling()

Compose the launch command and scale out the initial blocks.

monitor_resources()

Should resource monitoring happen for tasks running on this executor?

poll_facade()

scale_in(blocks[, max_idletime])

Scale in the number of active blocks by specified amount.

scale_in_facade(n[, max_idletime])

scale_out_facade(n)

Scales out the number of blocks by n.

send_monitoring_info(status)

set_bad_state_and_fail_all(exception)

Allows external error handlers to mark this executor as irrecoverably bad and cause all tasks submitted to it now and in the future to fail.

shutdown([timeout])

Shutdown the executor, including the interchange.

start()

Create the Interchange process and connect to it.

status()

Return the status of all jobs/blocks currently known to this executor.

submit(func, resource_specification, *args, ...)

Submits work to the outgoing_q.

Attributes

bad_state_is_set

Returns true if this executor is in an irrecoverable error state.

connected_workers

Returns the count of workers across all connected managers

executor_exception

Returns an exception that indicates why this executor is in an irrecoverable state.

hub_address

Address to the Hub for monitoring.

hub_zmq_port

Port to the Hub for monitoring.

label

logdir

max_workers

outstanding

Returns the count of tasks outstanding across the interchange and managers

provider

radio_mode

run_dir

Path to the run directory.

run_id

UUID for the enclosing DFK.

status_facade

Return the status of all jobs/blocks of the executor of this poller.

status_polling_interval

Returns the interval, in seconds, at which the status method should be called.

submit_monitoring_radio

Local radio for sending monitoring messages

tasks

worker_logdir

workers_per_node

connected_blocks() List[str][source]

List of connected block ids

connected_managers() List[Dict[str, Any]][source]

Returns a list of dicts, one for each connected manager. Each dict contains info on the manager (str: manager_id), block_id, worker_count, tasks (int), idle_durations (float), and active (bool).
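A hedged sketch of inspecting a running executor (assumes a configuration has been loaded whose executor is labelled "htex_local"; the label is illustrative):

    import parsl

    dfk = parsl.dfk()                    # the currently loaded DataFlowKernel
    htex = dfk.executors["htex_local"]   # illustrative executor label

    for manager in htex.connected_managers():
        # one dict per connected manager: manager id, block_id,
        # worker_count, tasks, idle duration, active flag
        print(manager)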

property connected_workers: int[source]

Returns the count of workers across all connected managers

get_usage_information()[source]
hold_worker(worker_id: str) None[source]

Puts a worker on hold, preventing scheduling of additional tasks to it.

This is called “hold” mostly because this only stops scheduling of tasks, and does not actually kill the worker.

Parameters:

worker_id (str) – Worker id to be put on hold

initialize_scaling()[source]

Compose the launch command and scale out the initial blocks.

property logdir[source]
property max_workers[source]
property outstanding: int[source]

Returns the count of tasks outstanding across the interchange and managers

scale_in(blocks: int, max_idletime: float | None = None) List[str][source]

Scale in the number of active blocks by specified amount.

The scale in method here is very rude. It doesn't give the workers the opportunity to finish current tasks or clean up. This is tracked in issue #530.

Parameters:
  • blocks (int) – Number of blocks to terminate and scale_in by

  • max_idletime (float) –

    A time to indicate how long a block should be idle to be a candidate for scaling in.

    If None then blocks will be force scaled in even if they are busy.

    If a float, then only idle blocks will be terminated, which may be less than the requested number.

Returns:

List of block IDs scaled in

shutdown(timeout: float = 10.0)[source]

Shutdown the executor, including the interchange. This does not shut down any workers directly - workers should be terminated by the scaling mechanism or by heartbeat timeout.

Parameters:

timeout (float) – Amount of time to wait for the Interchange process to terminate before we forcefully kill it.

start()[source]

Create the Interchange process and connect to it.

status() Dict[str, JobStatus][source]

Return the status of all jobs/blocks currently known to this executor.

Returns:

a dictionary mapping block ids (in string) to job status

submit(func, resource_specification, *args, **kwargs)[source]

Submits work to the outgoing_q.

The outgoing_q is a queue on which an external process listens for new work. This method behaves like the submit call described in the Python concurrent.futures documentation.

Parameters:
  • func (-) – Callable function

  • resource_specification (-) – Dictionary containing relevant info about task that is needed by underlying executors.

  • args (-) – List of arbitrary positional arguments.

Kwargs:
  • kwargs (dict) : A dictionary of arbitrary keyword args for func.

Returns:

Future
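Submission is normally driven by the DataFlowKernel when an app is invoked, but the executor can also be exercised directly; a hedged sketch (assumes a configuration containing this executor has been loaded and has started workers; names are illustrative):

    import parsl
    from parsl.config import Config
    from parsl.executors import HighThroughputExecutor

    parsl.load(Config(executors=[HighThroughputExecutor(label="htex_local")]))
    htex = parsl.dfk().executors["htex_local"]

    def double(x):
        return 2 * x

    future = htex.submit(double, {}, 21)  # empty resource_specification, one positional arg
    print(future.result())                # blocks until a worker returns 42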

property worker_logdir[source]
property workers_per_node: int | float[source]