parsl.executors.HighThroughputExecutor

class parsl.executors.HighThroughputExecutor(label: str = 'HighThroughputExecutor', provider: parsl.providers.provider_base.ExecutionProvider = LocalProvider(channel=LocalChannel(envs={}, script_dir=None, userhome='/home/docs/checkouts/readthedocs.org/user_builds/parsl/checkouts/1.2.0/docs'), cmd_timeout=30, init_blocks=1, launcher=SingleNodeLauncher(debug=True, fail_on_any=False), max_blocks=1, min_blocks=0, move_files=None, nodes_per_block=1, parallelism=1, worker_init=''), launch_cmd: Optional[str] = None, address: Optional[str] = None, worker_ports: Optional[Tuple[int, int]] = None, worker_port_range: Optional[Tuple[int, int]] = (54000, 55000), interchange_port_range: Optional[Tuple[int, int]] = (55000, 56000), storage_access: Optional[List[parsl.data_provider.staging.Staging]] = None, working_dir: Optional[str] = None, worker_debug: bool = False, cores_per_worker: float = 1.0, mem_per_worker: Optional[float] = None, max_workers: Union[int, float] = inf, cpu_affinity: str = 'none', prefetch_capacity: int = 0, heartbeat_threshold: int = 120, heartbeat_period: int = 30, poll_period: int = 10, address_probe_timeout: Optional[int] = None, managed: bool = True, worker_logdir_root: Optional[str] = None)[source]

Executor designed for cluster-scale execution.

The HighThroughputExecutor system has the following components:
  1. The HighThroughputExecutor instance which is run as part of the Parsl script.

  2. The Interchange, which acts as a load-balancing proxy between workers and Parsl.

  3. The multiprocessing-based worker pool, which coordinates task execution over several cores on a node.

  4. ZeroMQ pipes, which connect the HighThroughputExecutor, the Interchange and the process_worker_pool.

Here is a diagram

             |  Data   |  Executor   |  Interchange  | External Process(es)
             |  Flow   |             |               |
        Task | Kernel  |             |               |
      +----->|-------->|------------>|->outgoing_q---|-> process_worker_pool
      |      |         |             | batching      |    |         |
Parsl<---Fut-|         |             | load-balancing|  result   exception
          ^  |         |             | watchdogs     |    |         |
          |  |         |   Q_mngmnt  |               |    V         V
          |  |         |    Thread<--|-incoming_q<---|--- +---------+
          |  |         |      |      |               |
          |  |         |      |      |               |
          +----update_fut-----+
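
The flow in the diagram can be mimicked with a toy model. This is only a sketch under loud assumptions: in-process `queue.Queue` objects stand in for the ZeroMQ pipes, threads stand in for the Interchange and the external process_worker_pool, and the message tuples and helper names (`worker_pool`, `queue_management`, `submit`, `run_demo`) are hypothetical, not Parsl's actual wire format or API.

```python
import queue
import threading
from concurrent.futures import Future

# Stand-ins for the ZeroMQ pipes shown in the diagram (assumption: plain queues).
outgoing_q = queue.Queue()
incoming_q = queue.Queue()
tasks = {}  # task id -> Future, mirroring the executor's `tasks` attribute


def worker_pool():
    """Pull tasks from outgoing_q and push a result or an exception back."""
    while True:
        item = outgoing_q.get()
        if item is None:  # hypothetical shutdown sentinel
            incoming_q.put(None)
            break
        task_id, func, args = item
        try:
            incoming_q.put((task_id, "result", func(*args)))
        except Exception as exc:
            incoming_q.put((task_id, "exception", exc))


def queue_management():
    """Play the role of the Q_mngmnt thread: update futures from incoming_q."""
    while True:
        msg = incoming_q.get()
        if msg is None:
            break
        task_id, kind, payload = msg
        future = tasks[task_id]
        if kind == "result":
            future.set_result(payload)
        else:
            future.set_exception(payload)


def submit(task_id, func, *args):
    """Register a Future for the task and place the task on outgoing_q."""
    future = Future()
    tasks[task_id] = future
    outgoing_q.put((task_id, func, args))
    return future


def run_demo():
    threads = [threading.Thread(target=worker_pool),
               threading.Thread(target=queue_management)]
    for t in threads:
        t.start()
    ok = submit(0, pow, 2, 10)           # completes with a result
    bad = submit(1, int, "not a number")  # completes with an exception
    outgoing_q.put(None)
    for t in threads:
        t.join()
    return ok.result(), type(bad.exception()).__name__
```

Calling `run_demo()` drives one successful task and one failing task through both queues, so each future ends up holding either a result or an exception, as in the two branches at the right of the diagram.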

Each worker in each process_worker_pool has access to its local rank through the environment variable PARSL_WORKER_RANK. The local rank is unique for each process and is an integer in the range from 0 to the number of workers in the pool minus 1. The workers also have access to the ID of the worker pool as PARSL_WORKER_POOL_ID and the size of the worker pool as PARSL_WORKER_COUNT.
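
As a minimal sketch of how task code might use these variables, the helpers below read them and use the rank to pick a slice of work. The function names `worker_identity` and `shard_for_worker` are hypothetical, and the assumption that the rank and count parse as integers follows from the description above.

```python
import os


def worker_identity(environ=os.environ):
    """Return (rank, worker_count, pool_id) from the worker's environment."""
    rank = int(environ["PARSL_WORKER_RANK"])
    count = int(environ["PARSL_WORKER_COUNT"])
    pool_id = environ["PARSL_WORKER_POOL_ID"]
    return rank, count, pool_id


def shard_for_worker(items, rank, count):
    """Give each local rank a disjoint, strided slice of `items`."""
    return items[rank::count]
```

Inside a task running on a worker, `worker_identity()` can be called with no arguments. Outside a worker the variables are not set, so pass a dictionary when experimenting locally.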

Parameters
  • provider (ExecutionProvider) – Provider to access computation resources. Can be one of EC2Provider, Cobalt, Condor, GoogleCloud, GridEngine, Local, Slurm, or Torque.

  • label (str) – Label for this executor instance.

  • launch_cmd (str) – Command line string to launch the process_worker_pool from the provider. The command line string will be formatted with appropriate values for the following keys: debug, task_url, result_url, cores_per_worker, nodes_per_block, heartbeat_period, heartbeat_threshold, logdir. For example: launch_cmd="process_worker_pool.py {debug} -c {cores_per_worker} --task_url={task_url} --result_url={result_url}"

  • address (string) – An address to connect to the main Parsl process which is reachable from the network in which workers will be running. This can be either a hostname as returned by hostname or an IP address. Most login nodes on clusters have several network interfaces available, only some of which can be reached from the compute nodes. By default, the executor will attempt to enumerate and connect through all possible addresses. Setting an address here overrides the default behavior. Default: None

  • worker_ports ((int, int)) – Specify the ports to be used by workers to connect to Parsl. If this option is specified, worker_port_range will not be honored.

  • worker_port_range ((int, int)) – Worker ports will be chosen between the two integers provided.

  • interchange_port_range ((int, int)) – Port range used by Parsl to communicate with the Interchange.

  • working_dir (str) – Working dir to be used by the executor.

  • worker_debug (Bool) – Enables worker debug logging.

  • managed (Bool) – If this executor is managed by the DFK or externally handled.

  • cores_per_worker (float) – Cores to be assigned to each worker. Oversubscription is possible by setting cores_per_worker < 1.0. Default: 1.0

  • mem_per_worker (float) – GB of memory required per worker. If this option is specified, the node manager will check the available memory at startup and limit the number of workers so that there is sufficient memory for each worker. Default: None

  • max_workers (int) – Caps the number of workers launched per node. Default: infinity

  • cpu_affinity (string) – Whether or how each worker process sets thread affinity. Options are “none” to forgo any CPU affinity configuration, “block” to assign adjacent cores to workers (ex: assign 0-1 to worker 0, 2-3 to worker 1), and “alternating” to assign cores to workers in round-robin (ex: assign 0,2 to worker 0, 1,3 to worker 1).

  • prefetch_capacity (int) – Number of tasks that could be prefetched over available worker capacity. When there are few tasks (<100) or when tasks are long running, this option should be set to 0 for better load balancing. Default: 0

  • address_probe_timeout (int | None) – Managers attempt connecting over many different addresses to determine a viable address. This option sets a time limit in seconds on the connection attempt. The default of None implies a 30s timeout set on the worker.

  • heartbeat_threshold (int) – Seconds since the last message from the counterpart in the communication pair: (interchange, manager) after which the counterpart is assumed to be un-available. Default: 120s

  • heartbeat_period (int) – Number of seconds after which a heartbeat message indicating liveness is sent to the counterpart (interchange, manager). Default: 30s

  • poll_period (int) – Timeout period to be used by the executor components in milliseconds. Increasing poll_period trades performance for CPU efficiency. Default: 10ms

  • worker_logdir_root (string) – In case of a remote file system, specify the path to where logs will be kept.
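
A minimal configuration sketch using a few of the parameters above. The values are illustrative assumptions rather than recommendations, and the helper names `htex_kwargs` and `make_config` are hypothetical. The Parsl imports are kept inside `make_config` so the keyword arguments can be inspected without Parsl installed.

```python
def htex_kwargs():
    """Keyword arguments for HighThroughputExecutor (illustrative values)."""
    return dict(
        label="htex_local",
        cores_per_worker=1.0,               # one core per worker, no oversubscription
        max_workers=4,                      # cap on workers launched per node
        worker_port_range=(54000, 55000),   # same as the documented default
        heartbeat_period=30,                # seconds between heartbeats
        heartbeat_threshold=120,            # seconds of silence before a peer is assumed gone
    )


def make_config():
    """Build a Parsl Config with a single executor on the local machine."""
    from parsl.config import Config
    from parsl.executors import HighThroughputExecutor
    from parsl.providers import LocalProvider

    executor = HighThroughputExecutor(
        provider=LocalProvider(init_blocks=1, max_blocks=1),
        **htex_kwargs(),
    )
    return Config(executors=[executor])
```

The resulting object would typically be passed to `parsl.load(make_config())` before any apps are invoked. Keeping heartbeat_threshold several times larger than heartbeat_period, as in the defaults, leaves room for a few missed heartbeats before a counterpart is considered unavailable.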

__init__(label: str = 'HighThroughputExecutor', provider: parsl.providers.provider_base.ExecutionProvider = LocalProvider(channel=LocalChannel(envs={}, script_dir=None, userhome='/home/docs/checkouts/readthedocs.org/user_builds/parsl/checkouts/1.2.0/docs'), cmd_timeout=30, init_blocks=1, launcher=SingleNodeLauncher(debug=True, fail_on_any=False), max_blocks=1, min_blocks=0, move_files=None, nodes_per_block=1, parallelism=1, worker_init=''), launch_cmd: Optional[str] = None, address: Optional[str] = None, worker_ports: Optional[Tuple[int, int]] = None, worker_port_range: Optional[Tuple[int, int]] = (54000, 55000), interchange_port_range: Optional[Tuple[int, int]] = (55000, 56000), storage_access: Optional[List[parsl.data_provider.staging.Staging]] = None, working_dir: Optional[str] = None, worker_debug: bool = False, cores_per_worker: float = 1.0, mem_per_worker: Optional[float] = None, max_workers: Union[int, float] = inf, cpu_affinity: str = 'none', prefetch_capacity: int = 0, heartbeat_threshold: int = 120, heartbeat_period: int = 30, poll_period: int = 10, address_probe_timeout: Optional[int] = None, managed: bool = True, worker_logdir_root: Optional[str] = None)[source]

Initialize self. See help(type(self)) for accurate signature.

Methods

__init__([label, provider, launch_cmd, …])

Initialize self.

create_monitoring_info(status)

Create a msg for monitoring based on the poll status

handle_errors(error_handler, status)

This method is called by the error management infrastructure after a status poll.

hold_worker(worker_id)

Puts a worker on hold, preventing scheduling of additional tasks to it.

initialize_scaling()

Compose the launch command and call scale_out.

monitor_resources()

Should resource monitoring happen for tasks running on this executor?

scale_in([blocks, block_ids, force, …])

Scale in the number of active blocks by specified amount.

scale_out([blocks])

Scales out the number of blocks by “blocks”

set_bad_state_and_fail_all(exception)

Allows external error handlers to mark this executor as irrecoverably bad and cause all tasks submitted to it now and in the future to fail.

shutdown([hub, targets, block])

Shutdown the executor, including all workers and controllers.

start()

Create the Interchange process and connect to it.

status()

Return status of all blocks.

submit(func, resource_specification, *args, …)

Submits work to the outgoing_q.

Attributes

bad_state_is_set

Returns true if this executor is in an irrecoverable error state.

connected_managers

connected_workers

error_management_enabled

Indicates whether worker error management is supported by this executor.

executor_exception

Returns an exception that indicates why this executor is in an irrecoverable state.

hub_address

Address to the Hub for monitoring.

hub_port

Port to the Hub for monitoring.

outstanding

This should return the number of tasks that the executor has been given to run (waiting to run, and running now)

provider

run_dir

Path to the run directory.

scaling_enabled

Specify if scaling is enabled.

status_polling_interval

Returns the interval, in seconds, at which the status method should be called.

tasks

Contains a dictionary mapping task IDs to the corresponding Future objects for all tasks that have been submitted to this executor.

workers_per_node

property connected_managers[source]
property connected_workers[source]
create_monitoring_info(status)[source]

Create a msg for monitoring based on the poll status

hold_worker(worker_id)[source]

Puts a worker on hold, preventing scheduling of additional tasks to it.

This is called “hold” mostly because this only stops scheduling of tasks, and does not actually kill the worker.

Parameters

worker_id (str) – Worker id to be put on hold

initialize_scaling()[source]

Compose the launch command and call scale_out.

This should be implemented in the child classes to take care of executor specific oddities.

label: str[source]
property outstanding[source]

This should return the number of tasks that the executor has been given to run (waiting to run, and running now)

scale_in(blocks=None, block_ids=[], force=True, max_idletime=None)[source]

Scale in the number of active blocks by specified amount.

The scale_in method here is very rude. It doesn’t give the workers the opportunity to finish current tasks or clean up. This is tracked in issue #530.

Parameters
  • blocks (int) – Number of blocks to terminate and scale_in by

  • force (Bool) – Used along with blocks to indicate whether blocks should be terminated by force. When force = True, blocks are killed regardless of whether they are busy. When force = False, only idle blocks are terminated. If the number of idle blocks is less than blocks, the list of jobs marked for termination will be in the range 0 to blocks.

  • max_idletime (float) – A time to indicate how long a block can be idle. Used along with force = False to kill blocks that have been idle for that long.

  • block_ids (list) – List of specific block ids to terminate. Optional

Returns

List of job_ids marked for termination
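
A small sketch of a gentler call pattern, based only on the signature and parameter descriptions above: passing force=False together with max_idletime so that busy blocks are left alone. The wrapper `scale_in_idle_blocks` and the `RecordingExecutor` stand-in are hypothetical. The stand-in merely records the call so the example runs without a provider.

```python
class RecordingExecutor:
    """Hypothetical stand-in with the same scale_in signature; records calls only."""

    def __init__(self):
        self.calls = []

    def scale_in(self, blocks=None, block_ids=(), force=True, max_idletime=None):
        self.calls.append(
            {"blocks": blocks, "force": force, "max_idletime": max_idletime}
        )
        return []  # a real executor returns the job ids marked for termination


def scale_in_idle_blocks(executor, blocks, max_idletime=None):
    """Ask for up to `blocks` blocks to be terminated, skipping busy ones."""
    return executor.scale_in(blocks=blocks, force=False, max_idletime=max_idletime)
```

With a real executor, the returned list may be shorter than `blocks` when fewer idle blocks are available.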

property scaling_enabled[source]

Specify if scaling is enabled.

The callers of ParslExecutors need to differentiate between Executors and Executors wrapped in a resource provider

shutdown(hub=True, targets='all', block=False)[source]

Shutdown the executor, including all workers and controllers.

This is not implemented.

Kwargs:
  • hub (Bool): Whether the hub should be shut down. Default: True

  • targets (list of ints | ‘all’): List of block ids to kill. Default: ‘all’

  • block (Bool): To block for confirmations or not

start()[source]

Create the Interchange process and connect to it.

submit(func, resource_specification, *args, **kwargs)[source]

Submits work to the outgoing_q.

An external process listens on the outgoing_q for new work. This method behaves like a submit call as described in the Python docs.

Parameters
  • func (callable) – Callable function

  • args (list) – List of arbitrary positional arguments.

Kwargs:
  • kwargs (dict) : A dictionary of arbitrary keyword args for func.

Returns

Future
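
For comparison, the standard-library submit pattern that this method is modelled on looks as follows. This sketch uses `concurrent.futures.ThreadPoolExecutor`, not Parsl. The one visible difference in the signature above is that HighThroughputExecutor.submit places resource_specification between func and the task's positional arguments. The function names `add` and `demo` are illustrative.

```python
from concurrent.futures import ThreadPoolExecutor


def add(a, b=0):
    return a + b


def demo():
    # submit() returns immediately with a Future; result() blocks until done.
    with ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(add, 2, b=3)
        return future.result()
```

In typical Parsl scripts, submit is not called directly; work reaches the executor through Parsl apps, and the returned Future is consumed in the same way.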

property workers_per_node[source]