parsl.executors.WorkQueueExecutor

class parsl.executors.WorkQueueExecutor(label: str = 'WorkQueueExecutor', provider: ExecutionProvider = LocalProvider(channel=LocalChannel(envs={}, script_dir=None, userhome='/home/docs/checkouts/readthedocs.org/user_builds/parsl/checkouts/stable/docs'), cmd_timeout=30, init_blocks=1, launcher=SingleNodeLauncher(debug=True, fail_on_any=False), max_blocks=1, min_blocks=0, move_files=None, nodes_per_block=1, parallelism=1, worker_init=''), working_dir: str = '.', project_name: str | None = None, project_password_file: str | None = None, address: str | None = None, port: int = 0, env: Dict | None = None, shared_fs: bool = False, storage_access: List[Staging] | None = None, use_cache: bool = False, source: bool = False, pack: bool = False, extra_pkgs: List[str] | None = None, autolabel: bool = False, autolabel_window: int = 1, autocategory: bool = True, max_retries: int = 1, init_command: str = '', worker_options: str = '', full_debug: bool = True, worker_executable: str = 'work_queue_worker', function_dir: str | None = None, coprocess: bool = False)[source]

Executor to use Work Queue batch system

The WorkQueueExecutor system utilizes the Work Queue framework to efficiently delegate Parsl apps to remote machines in clusters and grids using a fault-tolerant system. Users can run the work_queue_worker program on remote machines to connect to the WorkQueueExecutor, and Parsl apps will then be sent out to these machines for execution and retrieval.
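As a sketch of the worker-side step described above, a worker might be launched on a remote node along these lines (HOST and PORT are placeholders for wherever the executor's master process is listening; the idle timeout value is an illustrative choice):

```shell
# Connect a Work Queue worker back to the executor's master process.
# Replace HOST and PORT with the executor's address and port.
# -t 3600 makes the worker exit after an hour with no work (optional).
work_queue_worker -t 3600 HOST PORT
```

If project_name is set on the executor, workers can instead locate the master through the catalog server with the -M option.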

Parameters:
  • label (str) – A human readable label for the executor, unique with respect to other Work Queue master programs. Default is “WorkQueueExecutor”.

  • working_dir (str) – Location for Parsl to perform app delegation to the Work Queue system. Defaults to current directory.

  • project_name (str) – If a project_name is given, then Work Queue will periodically report its status and performance back to the global WQ catalog, which can be viewed at http://ccl.cse.nd.edu/software/workqueue/status. Default is None. Overrides address.

  • project_password_file (str) – Optional password file for the work queue project. Default is None.

  • address (str) – The IP address that Work Queue workers use to contact this Work Queue master process. If not given, uses the address of the current machine as returned by socket.gethostname(). Ignored if project_name is specified.

  • port (int) –

    TCP port on the Parsl submission machine that Work Queue workers connect to.

    If 0, Work Queue will allocate a port number automatically. In this case, environment variables can be used to influence the choice of port, as documented at https://ccl.cse.nd.edu/software/manuals/api/html/work__queue_8h.html#a21714a10bcdfcf5c3bd44a96f5dcbda6. Default: WORK_QUEUE_DEFAULT_PORT.

  • env (dict{str}) – Dictionary of environment variables that need to be set on the Work Queue worker machine.

  • shared_fs (bool) – Define if working in a shared file system or not. If Parsl and the Work Queue workers are on a shared file system, Work Queue does not need to transfer and rename files for execution. Default is False.

  • use_cache (bool) – Whether workers should cache files that are common to tasks. Warning: Two files are considered the same if they have the same filepath name. Use with care when reusing the executor instance across multiple parsl workflows. Default is False.

  • source (bool) – Choose whether to transfer parsl app information as source code. (Note: this increases throughput for @python_apps, but the implementation does not include functionality for @bash_apps, and thus source=False must be used for programs utilizing @bash_apps.) Default is False. Automatically set to True if pack is True.

  • pack (bool) – Use conda-pack to prepare a self-contained Python environment for each task. This greatly increases task latency, but does not require a common environment or shared FS on execution nodes. Implies source=True.

  • extra_pkgs (list) – List of extra pip/conda package names to include when packing the environment. This may be useful if the app executes other (possibly non-Python) programs provided via pip or conda. Scanning the app source for imports would not detect these dependencies, so they need to be manually specified.

  • autolabel (bool) – Use the Resource Monitor to automatically determine resource labels based on observed task behavior.

  • autolabel_window (int) – Set the number of tasks considered for autolabeling. Work Queue will wait for a series of N tasks with steady resource requirements before making a decision on labels. Increasing this parameter will reduce the number of failed tasks due to resource exhaustion when autolabeling, at the cost of increased resources spent collecting stats.

  • autocategory (bool) – Place each app in its own category by default. If all invocations of an app have similar performance characteristics, this will provide a reasonable set of categories automatically.

  • max_retries (int) – Set the number of retries that Work Queue will make when a task fails. This is distinct from Parsl level retries configured in parsl.config.Config. Set to None to allow Work Queue to retry tasks forever. By default, this is set to 1, so that all retries will be managed by Parsl.

  • init_command (str) – Command line to run before executing a task in a worker. Default is ‘’.

  • worker_options (str) – Extra options passed to work_queue_worker. Default is ‘’.

  • worker_executable (str) – The command used to invoke work_queue_worker. This can be used when the worker needs to be wrapped inside some other command (for example, to run the worker inside a container). Default is ‘work_queue_worker’.

  • function_dir (str) – The directory where serialized function invocations are placed to be sent to workers. If undefined, this defaults to a directory under runinfo/. If shared_fs=True, then this directory must be visible from both the submitting side and workers.

  • coprocess (bool) – Use Work Queue’s coprocess facility to avoid launching a new Python process for each task. Experimental. This requires a version of Work Queue / cctools after commit 874df524516441da531b694afc9d591e8b134b73 (release 7.5.0 is too early). Default is False.
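As a sketch of how these parameters fit together, a minimal configuration might look like the following. The label, port, and other values here are illustrative assumptions, not required settings:

```python
# Illustrative sketch: a Parsl Config using WorkQueueExecutor.
# The specific parameter values are assumptions for this example.
from parsl.config import Config
from parsl.executors import WorkQueueExecutor

config = Config(
    executors=[
        WorkQueueExecutor(
            label="wq",           # unique among Work Queue masters
            port=9123,            # fixed port; 0 lets Work Queue choose
            shared_fs=False,      # workers do not share a filesystem
            autolabel=True,       # size tasks via the Resource Monitor
            autocategory=True,    # one resource category per app
            max_retries=1,        # leave retry logic to Parsl
        )
    ]
)
```

With a configuration like this loaded via parsl.load(config), workers started on remote nodes with work_queue_worker will connect on the given port and begin receiving tasks.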

__init__(label: str = 'WorkQueueExecutor', provider: ExecutionProvider = LocalProvider(channel=LocalChannel(envs={}, script_dir=None, userhome='/home/docs/checkouts/readthedocs.org/user_builds/parsl/checkouts/stable/docs'), cmd_timeout=30, init_blocks=1, launcher=SingleNodeLauncher(debug=True, fail_on_any=False), max_blocks=1, min_blocks=0, move_files=None, nodes_per_block=1, parallelism=1, worker_init=''), working_dir: str = '.', project_name: str | None = None, project_password_file: str | None = None, address: str | None = None, port: int = 0, env: Dict | None = None, shared_fs: bool = False, storage_access: List[Staging] | None = None, use_cache: bool = False, source: bool = False, pack: bool = False, extra_pkgs: List[str] | None = None, autolabel: bool = False, autolabel_window: int = 1, autocategory: bool = True, max_retries: int = 1, init_command: str = '', worker_options: str = '', full_debug: bool = True, worker_executable: str = 'work_queue_worker', function_dir: str | None = None, coprocess: bool = False)[source]

Methods

__init__([label, provider, working_dir, ...])

atexit_cleanup()

create_monitoring_info(status)

Create a monitoring message for each block based on the poll status.

handle_errors(status)

This method is called by the error management infrastructure after a status poll.

initialize_scaling()

Compose the launch command and call scale out

monitor_resources()

Should resource monitoring happen for tasks running on this executor?

poll_facade()

scale_in(count)

Scale in method.

scale_in_facade(n[, max_idletime])

scale_out([blocks])

Scales out the number of blocks by "blocks"

scale_out_facade(n)

send_monitoring_info(status)

set_bad_state_and_fail_all(exception)

Allows external error handlers to mark this executor as irrecoverably bad and cause all tasks submitted to it now and in the future to fail.

shutdown(*args, **kwargs)

Shutdown the executor.

start()

Create submit process and collector thread to create, send, and retrieve Parsl tasks within the Work Queue system.

status()

Return the status of all jobs/blocks currently known to this executor.

submit(func, resource_specification, *args, ...)

Processes the Parsl app by its arguments and submits the function information to the task queue, to be executed using the Work Queue system.

Attributes

bad_state_is_set

Returns true if this executor is in an irrecoverable error state.

executor_exception

Returns an exception that indicates why this executor is in an irrecoverable state.

hub_address

Address to the Hub for monitoring.

hub_port

Port to the Hub for monitoring.

label

monitoring_radio

Local radio for sending monitoring messages

outstanding

Count the number of outstanding tasks.

provider

radio_mode

run_dir

Path to the run directory.

run_id

UUID for the enclosing DFK.

status_facade

Return the status of all jobs/blocks of the executor of this poller.

status_polling_interval

Returns the interval, in seconds, at which the status method should be called.

tasks

workers_per_node

atexit_cleanup()[source]
initialize_scaling()[source]

Compose the launch command and call scale out

Scales the workers to the appropriate nodes with provider

property outstanding: int[source]

Count the number of outstanding tasks. This is inefficiently implemented and probably could be replaced with a counter.

radio_mode: str = 'filesystem'[source]
scale_in(count: int) List[str][source]

Scale in method.

shutdown(*args, **kwargs)[source]

Shutdown the executor. Sets flag to cancel the submit process and collector thread, which shuts down the Work Queue system submission.

start()[source]

Create submit process and collector thread to create, send, and retrieve Parsl tasks within the Work Queue system.

submit(func, resource_specification, *args, **kwargs)[source]

Processes the Parsl app by its arguments and submits the function information to the task queue, to be executed using the Work Queue system. The args and kwargs are processed for input and output files to the Parsl app, so that the files are appropriately specified for the Work Queue task.

Parameters:
  • func (function) – Parsl app to be submitted to the Work Queue system

  • args (list) – Arguments to the Parsl app

  • kwargs (dict) – Keyword arguments to the Parsl app
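In practice, submit() is usually invoked indirectly through a decorated Parsl app rather than called by hand. A sketch of that path, assuming a Config with a WorkQueueExecutor has already been loaded (the function name and resource values are illustrative; the parsl_resource_specification keyword follows Parsl's documented per-app resource mechanism):

```python
# Sketch: tasks reach WorkQueueExecutor.submit() via decorated apps.
# Assumes parsl.load(...) was already called with a WorkQueueExecutor.
from parsl import python_app

@python_app
def double(x, parsl_resource_specification={'cores': 1,
                                            'memory': 100,   # MB
                                            'disk': 100}):   # MB
    return 2 * x

future = double(21)
# future.result() blocks until a Work Queue worker returns the value.
```

The resource specification lets Work Queue schedule the task against a matching worker slot; with autolabel=True these values can instead be inferred from observed task behavior.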

property workers_per_node: int | float[source]