parsl.executors.ExtremeScaleExecutor

class parsl.executors.ExtremeScaleExecutor(label='ExtremeScaleExecutor', provider=LocalProvider(channel=LocalChannel(envs={}, script_dir=None, userhome='/home/docs/checkouts/readthedocs.org/user_builds/parsl/checkouts/1.2.0/docs'), cmd_timeout=30, init_blocks=1, launcher=SingleNodeLauncher(debug=True, fail_on_any=False), max_blocks=1, min_blocks=0, move_files=None, nodes_per_block=1, parallelism=1, worker_init=''), launch_cmd=None, address='127.0.0.1', worker_ports=None, worker_port_range=(54000, 55000), interchange_port_range=(55000, 56000), storage_access=None, working_dir=None, worker_debug=False, ranks_per_node=1, heartbeat_threshold=120, heartbeat_period=30, managed=True)[source]

Executor designed for leadership class supercomputer scale

The ExtremeScaleExecutor extends the Executor interface to enable task execution on supercomputing systems (>1K Nodes). When functions and their arguments are submitted to the interface, a future is returned that tracks the execution of the function on a distributed compute environment.

The ExtremeScaleExecutor system has the following components:
  1. The ExtremeScaleExecutor instance which is run as part of the Parsl script

  2. The Interchange which is acts as a load-balancing proxy between workers and Parsl

  3. The MPI based mpi_worker_pool which coordinates task execution over several nodes With MPI communication between workers, we can exploit low latency networking on HPC systems.

  4. ZeroMQ pipes that connect the ExtremeScaleExecutor, Interchange and the mpi_worker_pool

Our design assumes that there is a single MPI application (mpi_worker_pool) running over a block and that there might be several such instances.

Here is a diagram

             |  Data   |  Executor   |  Interchange  | External Process(es)
             |  Flow   |             |               |
        Task | Kernel  |             |               |
      +----->|-------->|------------>|->outgoing_q---|-> mpi_worker_pool
      |      |         |             | batching      |    |         |
Parsl<---Fut-|         |             | load-balancing|  result   exception
          ^  |         |             | watchdogs     |    |         |
          |  |         |   Q_mngmnt  |               |    V         V
          |  |         |    Thread<--|-incoming_q<---|--- +---------+
          |  |         |      |      |               |
          |  |         |      |      |               |
          +----update_fut-----+
Parameters
  • provider (ExecutionProvider) –

    Provider to access computation resources. Can be any providers in parsl.providers:

    Cobalt, Condor, GoogleCloud, GridEngine, Local, GridEngine, Slurm, or Torque.

  • label (str) – Label for this executor instance.

  • launch_cmd (str) – Command line string to launch the mpi_worker_pool from the provider. The command line string will be formatted with appropriate values for the following values (debug, task_url, result_url, ranks_per_node, nodes_per_block, heartbeat_period ,heartbeat_threshold, logdir). For example: launch_cmd=”mpiexec -np {ranks_per_node} mpi_worker_pool.py {debug} –task_url={task_url} –result_url={result_url}”

  • address (string) – An address to connect to the main Parsl process which is reachable from the network in which workers will be running. This can be either a hostname as returned by hostname or an IP address. Most login nodes on clusters have several network interfaces available, only some of which can be reached from the compute nodes. Some trial and error might be necessary to identify what addresses are reachable from compute nodes.

  • worker_ports ((int, int)) – Specify the ports to be used by workers to connect to Parsl. If this option is specified, worker_port_range will not be honored.

  • worker_port_range ((int, int)) – Worker ports will be chosen between the two integers provided.

  • interchange_port_range ((int, int)) – Port range used by Parsl to communicate with the Interchange.

  • working_dir (str) – Working dir to be used by the executor.

  • worker_debug (Bool) – Enables engine debug logging.

  • managed (Bool) – If this executor is managed by the DFK or externally handled.

  • ranks_per_node (int) – Specify the ranks to be launched per node.

  • heartbeat_threshold (int) – Seconds since the last message from the counterpart in the communication pair: (interchange, manager) after which the counterpart is assumed to be un-available. Default:120s

  • heartbeat_period (int) – Number of seconds after which a heartbeat message indicating liveness is sent to the counterpart (interchange, manager). Default:30s

__init__(label='ExtremeScaleExecutor', provider=LocalProvider(channel=LocalChannel(envs={}, script_dir=None, userhome='/home/docs/checkouts/readthedocs.org/user_builds/parsl/checkouts/1.2.0/docs'), cmd_timeout=30, init_blocks=1, launcher=SingleNodeLauncher(debug=True, fail_on_any=False), max_blocks=1, min_blocks=0, move_files=None, nodes_per_block=1, parallelism=1, worker_init=''), launch_cmd=None, address='127.0.0.1', worker_ports=None, worker_port_range=(54000, 55000), interchange_port_range=(55000, 56000), storage_access=None, working_dir=None, worker_debug=False, ranks_per_node=1, heartbeat_threshold=120, heartbeat_period=30, managed=True)[source]

Initialize self. See help(type(self)) for accurate signature.

Methods

__init__([label, provider, launch_cmd, …])

Initialize self.

create_monitoring_info(status)

Create a msg for monitoring based on the poll status

handle_errors(error_handler, status)

This method is called by the error management infrastructure after a status poll.

hold_worker(worker_id)

Puts a worker on hold, preventing scheduling of additional tasks to it.

initialize_scaling()

Compose the launch command and call the scale_out

monitor_resources()

Should resource monitoring happen for tasks on running on this executor?

scale_in([blocks, block_ids, force, …])

Scale in the number of active blocks by specified amount.

scale_out([blocks])

Scales out the number of blocks by “blocks”

set_bad_state_and_fail_all(exception)

Allows external error handlers to mark this executor as irrecoverably bad and cause all tasks submitted to it now and in the future to fail.

shutdown([hub, targets, block])

Shutdown the executor, including all workers and controllers.

start()

Create the Interchange process and connect to it.

status()

Return status of all blocks.

submit(func, resource_specification, *args, …)

Submits work to the the outgoing_q.

Attributes

bad_state_is_set

Returns true if this executor is in an irrecoverable error state.

connected_managers

connected_workers

error_management_enabled

Indicates whether worker error management is supported by this executor.

executor_exception

Returns an exception that indicates why this executor is in an irrecoverable state.

hub_address

Address to the Hub for monitoring.

hub_port

Port to the Hub for monitoring.

outstanding

This should return the number of tasks that the executor has been given to run (waiting to run, and running now)

provider

run_dir

Path to the run directory.

scaling_enabled

Specify if scaling is enabled.

status_polling_interval

Returns the interval, in seconds, at which the status method should be called.

tasks

Contains a dictionary mapping task IDs to the corresponding Future objects for all tasks that have been submitted to this executor.

workers_per_node

initialize_scaling()[source]

Compose the launch command and call the scale_out

This should be implemented in the child classes to take care of executor specific oddities.

label: str[source]
start()[source]

Create the Interchange process and connect to it.