parsl.executors.WorkQueueExecutor

class parsl.executors.WorkQueueExecutor(label: str = 'WorkQueueExecutor', provider: parsl.providers.provider_base.ExecutionProvider = LocalProvider(channel=LocalChannel(envs={}, script_dir=None, userhome='/home/docs/checkouts/readthedocs.org/user_builds/parsl/checkouts/stable/docs'), cmd_timeout=30, init_blocks=1, launcher=SingleNodeLauncher(debug=True, fail_on_any=False), max_blocks=1, min_blocks=0, move_files=None, nodes_per_block=1, parallelism=1, worker_init=''), working_dir: str = '.', managed: bool = True, project_name: Optional[str] = None, project_password_file: Optional[str] = None, address: Optional[str] = None, port: int = 0, env: Optional[Dict] = None, shared_fs: bool = False, storage_access: Optional[List[parsl.data_provider.staging.Staging]] = None, use_cache: bool = False, source: bool = False, pack: bool = False, extra_pkgs: Optional[List[str]] = None, autolabel: bool = False, autolabel_window: int = 1, autocategory: bool = True, max_retries: Optional[int] = 1, init_command: str = '', worker_options: str = '', full_debug: bool = True, worker_executable: str = 'work_queue_worker')[source]

Executor that runs Parsl apps through the Work Queue batch system.

The WorkQueueExecutor uses the Work Queue framework to delegate Parsl apps to remote machines in clusters and grids in a fault-tolerant way. Users run the work_queue_worker program on remote machines to connect to the WorkQueueExecutor; Parsl apps are then sent to these machines for execution, and their results are retrieved.
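As a rough illustration of the worker side, the sketch below builds the command line for a worker that connects to a given host and port. It assumes the usual `work_queue_worker [options] <host> <port>` invocation; the helper name `worker_command` and the port value are hypothetical, and the options accepted by your installed worker should be checked with its own help output.

```python
import shlex
import socket


def worker_command(host, port, executable="work_queue_worker", options=""):
    """Build the argv list for a worker that connects to host:port.

    `executable` and `options` mirror the worker_executable and
    worker_options parameters of the executor.
    """
    return [executable, *shlex.split(options), host, str(port)]


# Assumed values: the submit machine's hostname and the port the
# executor listens on.
cmd = worker_command(socket.gethostname(), 9123)
print(shlex.join(cmd))
```

The resulting list can be passed to `subprocess.run` on the remote machine, or printed and pasted into a batch script.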

Parameters
  • label (str) – A human readable label for the executor, unique with respect to other Work Queue master programs. Default is “WorkQueueExecutor”.

  • working_dir (str) – Location for Parsl to perform app delegation to the Work Queue system. Defaults to current directory.

  • managed (bool) – Whether this executor is managed by the DFK or externally handled. Default is True (managed by DFK).

  • project_name (str) – If a project_name is given, Work Queue periodically reports its status and performance to the global WQ catalog, which can be viewed at http://ccl.cse.nd.edu/software/workqueue/status. Overrides address. Default is None.

  • project_password_file (str) – Optional password file for the work queue project. Default is None.

  • address (str) – The IP address that workers use to contact this Work Queue master process. If not given, the address of the current machine, as returned by socket.gethostname(), is used. Ignored if project_name is specified.

  • port (int) – TCP port on Parsl submission machine for Work Queue workers to connect to. Workers will specify this port number when trying to connect to Parsl. Default is 9123.

  • env (dict{str}) – Dictionary of environment variables to set on the Work Queue worker machine.

  • shared_fs (bool) – Whether Parsl and the Work Queue workers share a file system. If they do, Work Queue does not need to transfer and rename files for execution. Default is False.

  • use_cache (bool) – Whether workers should cache files that are common to tasks. Warning: two files are considered the same if they have the same file path. Use with care when reusing the executor instance across multiple Parsl workflows. Default is False.

  • source (bool) – Whether to transfer Parsl app information as source code. (Note: this increases throughput for @python_apps, but the implementation does not support @bash_apps, so source=False must be used for programs that use @bash_apps.) Default is False. Set to True if pack is True.

  • pack (bool) – Use conda-pack to prepare a self-contained Python environment for each task. This greatly increases task latency, but does not require a common environment or shared FS on execution nodes. Implies source=True.

  • extra_pkgs (list) – List of extra pip/conda package names to include when packing the environment. This may be useful if the app executes other (possibly non-Python) programs provided via pip or conda. Scanning the app source for imports would not detect these dependencies, so they need to be manually specified.

  • autolabel (bool) – Use the Resource Monitor to automatically determine resource labels based on observed task behavior.

  • autolabel_window (int) – Set the number of tasks considered for autolabeling. Work Queue will wait for a series of N tasks with steady resource requirements before making a decision on labels. Increasing this parameter will reduce the number of failed tasks due to resource exhaustion when autolabeling, at the cost of increased resources spent collecting stats.

  • autocategory (bool) – Place each app in its own category by default. If all invocations of an app have similar performance characteristics, this will provide a reasonable set of categories automatically.

  • max_retries (Optional[int]) – Set the number of retries that Work Queue will make when a task fails. This is distinct from Parsl level retries configured in parsl.config.Config. Set to None to allow Work Queue to retry tasks forever. By default, this is set to 1, so that all retries will be managed by Parsl.

  • init_command (str) – Command line to run before executing a task in a worker. Default is ‘’.

  • worker_options (str) – Extra options passed to work_queue_worker. Default is ‘’.

  • worker_executable (str) – The command used to invoke work_queue_worker. This can be used when the worker needs to be wrapped inside some other command (for example, to run the worker inside a container). Default is ‘work_queue_worker’.
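A minimal configuration sketch using some of the parameters above. The label `wq_example` and all values are illustrative assumptions, not recommendations, and the helper `make_config` is hypothetical. Calling it requires Parsl and the Work Queue Python bindings to be installed, so the imports are deferred until then.

```python
import socket

# Keyword arguments taken from the parameter list above; the values are
# illustrative assumptions.
wq_kwargs = {
    "label": "wq_example",
    "address": socket.gethostname(),  # same value as the documented fallback
    "port": 9123,
    "shared_fs": False,    # let Work Queue transfer files to the workers
    "use_cache": False,
    "source": False,       # must stay False when any @bash_app is used
    "max_retries": 1,      # leave retrying to Parsl
    "worker_options": "",
}


def make_config(parsl_retries=2):
    """Build a Parsl Config around a WorkQueueExecutor.

    Parsl is imported lazily so this module can be imported on machines
    where Parsl or Work Queue is not installed.
    """
    from parsl.config import Config
    from parsl.executors import WorkQueueExecutor

    return Config(
        executors=[WorkQueueExecutor(**wq_kwargs)],
        retries=parsl_retries,  # Parsl-level retries, distinct from max_retries
    )
```

With both libraries installed, `parsl.load(make_config())` would load this configuration before any apps are invoked.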

__init__(label: str = 'WorkQueueExecutor', provider: parsl.providers.provider_base.ExecutionProvider = LocalProvider(channel=LocalChannel(envs={}, script_dir=None, userhome='/home/docs/checkouts/readthedocs.org/user_builds/parsl/checkouts/stable/docs'), cmd_timeout=30, init_blocks=1, launcher=SingleNodeLauncher(debug=True, fail_on_any=False), max_blocks=1, min_blocks=0, move_files=None, nodes_per_block=1, parallelism=1, worker_init=''), working_dir: str = '.', managed: bool = True, project_name: Optional[str] = None, project_password_file: Optional[str] = None, address: Optional[str] = None, port: int = 0, env: Optional[Dict] = None, shared_fs: bool = False, storage_access: Optional[List[parsl.data_provider.staging.Staging]] = None, use_cache: bool = False, source: bool = False, pack: bool = False, extra_pkgs: Optional[List[str]] = None, autolabel: bool = False, autolabel_window: int = 1, autocategory: bool = True, max_retries: Optional[int] = 1, init_command: str = '', worker_options: str = '', full_debug: bool = True, worker_executable: str = 'work_queue_worker')[source]

Initialize self. See help(type(self)) for accurate signature.

Methods

__init__([label, provider, working_dir, …])

Initialize self.

create_monitoring_info(status)

Create a monitoring message for each block based on the poll status.

handle_errors(error_handler, status)

This method is called by the error management infrastructure after a status poll.

initialize_scaling()

Compose the launch command and call scale out

monitor_resources()

Should resource monitoring happen for tasks running on this executor?

run_dir([value])

Path to the run directory.

scale_in(count)

Scale in method.

scale_out([blocks])

Scales out the number of blocks by “blocks”

scaling_enabled()

Specify if scaling is enabled.

set_bad_state_and_fail_all(exception)

Allows external error handlers to mark this executor as irrecoverably bad and cause all tasks submitted to it now and in the future to fail.

shutdown(*args, **kwargs)

Shut down the executor.

start()

Create submit process and collector thread to create, send, and retrieve Parsl tasks within the Work Queue system.

status()

Return status of all blocks.

submit(func, resource_specification, *args, …)

Processes the Parsl app by its arguments and submits the function information to the task queue, to be executed using the Work Queue system.

Attributes

bad_state_is_set

Returns true if this executor is in an irrecoverable error state.

error_management_enabled

Indicates whether worker error management is supported by this executor.

executor_exception

Returns an exception that indicates why this executor is in an irrecoverable state.

hub_address

Address to the Hub for monitoring.

hub_port

Port to the Hub for monitoring.

outstanding

Count the number of outstanding tasks.

provider

status_polling_interval

Returns the interval, in seconds, at which the status method should be called.

tasks

Contains a dictionary mapping task IDs to the corresponding Future objects for all tasks that have been submitted to this executor.

workers_per_node

initialize_scaling()[source]

Compose the launch command and call scale out

Scales the workers out to the appropriate nodes using the provider.

label: str[source]

property outstanding[source]

Count the number of outstanding tasks. This is inefficiently implemented and probably could be replaced with a counter.
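To make the remark about a counter concrete, the sketch below contrasts scanning a dictionary of futures with maintaining a running count. It is an illustration built on `concurrent.futures.Future`, not the executor's actual code; `outstanding_by_scan` and `OutstandingCounter` are hypothetical names.

```python
import threading
from concurrent.futures import Future


def outstanding_by_scan(tasks):
    """Count unfinished futures by walking the whole task dictionary: O(n)."""
    return sum(1 for fut in tasks.values() if not fut.done())


class OutstandingCounter:
    """Keep a running count that is updated as futures complete: O(1) reads."""

    def __init__(self):
        self._lock = threading.Lock()
        self._count = 0

    def track(self, fut):
        with self._lock:
            self._count += 1
        # The callback fires once, when the future finishes (or immediately
        # if it has already finished).
        fut.add_done_callback(self._on_done)

    def _on_done(self, fut):
        with self._lock:
            self._count -= 1

    @property
    def value(self):
        with self._lock:
            return self._count


# Three tasks submitted, one of them finished.
tasks = {task_id: Future() for task_id in range(3)}
counter = OutstandingCounter()
for fut in tasks.values():
    counter.track(fut)
tasks[0].set_result("done")

print(outstanding_by_scan(tasks), counter.value)
```

Both approaches report the same number here; the counter avoids revisiting completed futures as the task dictionary grows.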

run_dir(value=None)[source]

Path to the run directory.

scale_in(count)[source]

Scale in method. Not implemented.

scaling_enabled()[source]

Specify if scaling is enabled. Not enabled in Work Queue.

shutdown(*args, **kwargs)[source]

Shut down the executor. Sets a flag to cancel the submit process and collector thread, which shuts down the Work Queue system submission.

start()[source]

Create submit process and collector thread to create, send, and retrieve Parsl tasks within the Work Queue system.
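The start and shutdown behavior described here follows a common pattern: one worker pulls tasks from a queue, another collects results and resolves futures, and a shared flag stops both. The toy class below sketches that pattern under simplifying assumptions: it uses two threads rather than a separate process, runs tasks locally instead of through Work Queue, and every name in it (`MiniExecutor` and its members) is hypothetical.

```python
import queue
import threading
from concurrent.futures import Future


class MiniExecutor:
    """Toy stand-in: two threads, local execution, no Work Queue."""

    def __init__(self):
        self.task_queue = queue.Queue()
        self.result_queue = queue.Queue()
        self.tasks = {}                      # task id -> Future
        self.should_stop = threading.Event()
        self._threads = []
        self._next_id = 0

    def start(self):
        for target in (self._submit_loop, self._collect_loop):
            thread = threading.Thread(target=target, daemon=True)
            thread.start()
            self._threads.append(thread)

    def submit(self, func, *args, **kwargs):
        fut = Future()
        task_id = self._next_id
        self._next_id += 1
        self.tasks[task_id] = fut
        self.task_queue.put((task_id, func, args, kwargs))
        return fut

    def _submit_loop(self):
        # Takes queued tasks and runs them; a real executor would hand them
        # to the batch system instead.
        while not self.should_stop.is_set():
            try:
                task_id, func, args, kwargs = self.task_queue.get(timeout=0.1)
            except queue.Empty:
                continue
            try:
                outcome = (task_id, func(*args, **kwargs), None)
            except Exception as exc:
                outcome = (task_id, None, exc)
            self.result_queue.put(outcome)

    def _collect_loop(self):
        # Matches finished tasks back to their futures.
        while not self.should_stop.is_set():
            try:
                task_id, value, exc = self.result_queue.get(timeout=0.1)
            except queue.Empty:
                continue
            fut = self.tasks[task_id]
            if exc is not None:
                fut.set_exception(exc)
            else:
                fut.set_result(value)

    def shutdown(self):
        # Setting the flag makes both loops exit on their next iteration.
        self.should_stop.set()
        for thread in self._threads:
            thread.join()


ex = MiniExecutor()
ex.start()
result = ex.submit(pow, 2, 5).result(timeout=5)
ex.shutdown()
print(result)
```

The short queue timeouts let each loop re-check the stop flag regularly, so `shutdown()` returns promptly even when no tasks are pending.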

submit(func, resource_specification, *args, **kwargs)[source]

Processes the Parsl app by its arguments and submits the function information to the task queue, to be executed using the Work Queue system. The args and kwargs are processed for input and output files to the Parsl app, so that the files are appropriately specified for the Work Queue task.

Parameters
  • func (function) – Parsl app to be submitted to the Work Queue system

  • args (list) – Arguments to the Parsl app

  • kwargs (dict) – Keyword arguments to the Parsl app
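The idea of scanning an app's arguments for input and output files can be sketched as follows. This is an illustration only, not the executor's implementation: `FakeFile` is a hypothetical stand-in for a Parsl file object, and treating positional file arguments as inputs and reading the `inputs` and `outputs` keyword arguments are assumptions made for the example.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FakeFile:
    """Hypothetical stand-in for a Parsl file object."""
    filepath: str


def collect_files(args, kwargs):
    """Return (input_paths, output_paths) found in an app's arguments."""
    # Assumption: file objects passed positionally are inputs.
    inputs = [a.filepath for a in args if isinstance(a, FakeFile)]
    # Assumption: the app follows the inputs=[...] / outputs=[...] convention.
    inputs += [f.filepath for f in kwargs.get("inputs", [])
               if isinstance(f, FakeFile)]
    outputs = [f.filepath for f in kwargs.get("outputs", [])
               if isinstance(f, FakeFile)]
    return inputs, outputs


ins, outs = collect_files(
    (FakeFile("a.txt"), 3),
    {"inputs": [FakeFile("b.txt")], "outputs": [FakeFile("c.txt")]},
)
print(ins, outs)
```

A task specification built from these two lists would tell the batch system which files to send to a worker before the task runs and which to bring back afterwards.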

property workers_per_node[source]