parsl.executors.WorkQueueExecutor

class parsl.executors.WorkQueueExecutor(label: str = 'WorkQueueExecutor', provider: parsl.providers.provider_base.ExecutionProvider = LocalProvider( channel=LocalChannel( envs={}, script_dir=None, userhome='/home/docs/checkouts/readthedocs.org/user_builds/parsl/checkouts/latest/docs' ), cmd_timeout=30, init_blocks=1, launcher=SingleNodeLauncher(debug=True, fail_on_any=False), max_blocks=1, min_blocks=0, move_files=None, nodes_per_block=1, parallelism=1, walltime='00:15:00', worker_init='' ), working_dir: str = '.', managed: bool = True, project_name: Optional[str] = None, project_password_file: Optional[str] = None, address: Optional[str] = None, port: int = 0, env: Optional[Dict] = None, shared_fs: bool = False, storage_access: Optional[List[parsl.data_provider.staging.Staging]] = None, use_cache: bool = False, source: bool = False, pack: bool = False, autolabel: bool = False, autolabel_window: int = 1, autocategory: bool = False, init_command: str = '', full_debug: bool = True)[source]

Executor that uses the Work Queue batch system

The WorkQueueExecutor uses the Work Queue framework to efficiently delegate Parsl apps to remote machines in clusters and grids in a fault-tolerant way. Users run the work_queue_worker program on remote machines to connect to the WorkQueueExecutor, and Parsl apps are then sent to those machines for execution, with results retrieved on completion.
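As a sketch of typical usage (the label and port values below are illustrative choices, not defaults), a Parsl configuration using this executor might look like:

```python
# Illustrative sketch; assumes parsl and the work_queue Python
# bindings are installed. Label and port values are arbitrary.
import parsl
from parsl.config import Config
from parsl.executors import WorkQueueExecutor

config = Config(
    executors=[
        WorkQueueExecutor(
            label="wq",   # unique among Work Queue master programs
            port=9123,    # TCP port workers will connect to
        )
    ]
)
parsl.load(config)
```

Workers are then started separately on the remote machines, for example with `work_queue_worker <submit-host> 9123`, and connect back to this port.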

label: str

A human readable label for the executor, unique with respect to other Work Queue master programs. Default is “WorkQueueExecutor”.

working_dir: str

Location for Parsl to perform app delegation to the Work Queue system. Defaults to current directory.

managed: bool

Whether this executor is managed by the DFK or externally handled. Default is True (managed by DFK).

project_name: str

If given, the project name under which the Work Queue master process registers itself, allowing workers to locate it by name. Overrides address. Default is None.

project_password_file: str

Optional password file for the work queue project. Default is None.

address: str

The IP address at which workers can contact this Work Queue master process. If not given, the address of the current machine, as returned by socket.gethostname(), is used. Ignored if project_name is specified.

port: int

TCP port on the Parsl submission machine to which Work Queue workers connect. Workers specify this port number when connecting to Parsl. Default is 9123.

env: dict

Dictionary of environment variables to be set on the Work Queue worker machine.

shared_fs: bool

Whether Parsl and the Work Queue workers are on a shared file system. If they are, Work Queue does not need to transfer and rename files for execution. Default is False.

use_cache: bool

Whether workers should cache files that are common across tasks. Warning: two files are considered identical if they have the same file path; use with care when reusing the executor instance across multiple Parsl workflows. Default is False.

source: bool

Choose whether to transfer Parsl app information as source code. (Note: this increases throughput for @python_apps, but the implementation does not support @bash_apps, so source=False must be used for programs that use @bash_apps.) Default is False. Set to True if pack is True.

pack: bool

Use conda-pack to prepare a self-contained Python environment for each task. This greatly increases task latency, but does not require a common environment or shared file system on execution nodes. Implies source=True.
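When execution nodes lack a compatible Python environment, pack can be enabled; a minimal sketch (the label here is an illustrative choice):

```python
from parsl.executors import WorkQueueExecutor

# pack=True bundles a self-contained environment with conda-pack
# and implies source=True; no shared file system or common
# environment is needed on the execution nodes.
executor = WorkQueueExecutor(
    label="wq_packed",
    pack=True,
)
```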

autolabel: bool

Use the Resource Monitor to automatically determine resource labels based on observed task behavior.

autolabel_window: int

Set the number of tasks considered for autolabeling. Work Queue will wait for a series of N tasks with steady resource requirements before deciding on labels. Increasing this parameter reduces the number of tasks that fail from resource exhaustion during autolabeling, at the cost of more resources spent collecting statistics.

autocategory: bool

Place each app in its own category by default. If all invocations of an app have similar performance characteristics, this will provide a reasonable set of categories automatically.
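The automatic resource-management options above can be combined; for example (the label and window size are illustrative, not recommendations):

```python
from parsl.executors import WorkQueueExecutor

executor = WorkQueueExecutor(
    label="wq_auto",
    autolabel=True,       # infer resource labels from observed task behavior
    autolabel_window=10,  # observe 10 steady tasks before settling on labels
    autocategory=True,    # place each app in its own resource category
)
```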

init_command: str

Command line to run before executing a task on a worker. Default is ''.

__init__(label: str = 'WorkQueueExecutor', provider: parsl.providers.provider_base.ExecutionProvider = LocalProvider( channel=LocalChannel( envs={}, script_dir=None, userhome='/home/docs/checkouts/readthedocs.org/user_builds/parsl/checkouts/latest/docs' ), cmd_timeout=30, init_blocks=1, launcher=SingleNodeLauncher(debug=True, fail_on_any=False), max_blocks=1, min_blocks=0, move_files=None, nodes_per_block=1, parallelism=1, walltime='00:15:00', worker_init='' ), working_dir: str = '.', managed: bool = True, project_name: Optional[str] = None, project_password_file: Optional[str] = None, address: Optional[str] = None, port: int = 0, env: Optional[Dict] = None, shared_fs: bool = False, storage_access: Optional[List[parsl.data_provider.staging.Staging]] = None, use_cache: bool = False, source: bool = False, pack: bool = False, autolabel: bool = False, autolabel_window: int = 1, autocategory: bool = False, init_command: str = '', full_debug: bool = True)[source]

Initialize self. See help(type(self)) for accurate signature.

Methods

__init__([label, provider, working_dir, …])

Initialize self.

handle_errors(error_handler, status)

This method is called by the error management infrastructure after a status poll.

initialize_scaling()

Compose the launch command and call scale out.

run_dir([value])

Path to the run directory.

scale_in(count)

Scale in method.

scale_out([blocks])

Scale out method.

scaling_enabled()

Specify if scaling is enabled.

set_bad_state_and_fail_all(exception)

Allows external error handlers to mark this executor as irrecoverably bad and cause all tasks submitted to it now and in the future to fail.

shutdown(*args, **kwargs)

Shutdown the executor.

start()

Create submit process and collector thread to create, send, and retrieve Parsl tasks within the Work Queue system.

status()

Return the status of all jobs/blocks currently known to this executor.

submit(func, resource_specification, *args, …)

Processes the Parsl app by its arguments and submits the function information to the task queue, to be executed using the Work Queue system.
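In practice this method is invoked by the DataFlowKernel rather than called directly; a sketch of end-to-end use, assuming a configuration containing this executor has already been loaded via parsl.load():

```python
from parsl import python_app

@python_app
def double(x):
    # Executes on a remote work_queue_worker once one connects.
    return 2 * x

future = double(21)
result = future.result()  # blocks until a worker returns the result
```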

Attributes

bad_state_is_set

Returns true if this executor is in an irrecoverable error state.

error_management_enabled

Indicates whether worker error management is supported by this executor.

executor_exception

Returns an exception that indicates why this executor is in an irrecoverable state.

hub_address

Address to the Hub for monitoring.

hub_port

Port to the Hub for monitoring.

provider

status_polling_interval

Returns the interval, in seconds, at which the status method should be called.

tasks

Contains a dictionary mapping task IDs to the corresponding Future objects for all tasks that have been submitted to this executor.