parsl.executors.WorkQueueExecutor

class parsl.executors.WorkQueueExecutor(label: str = 'WorkQueueExecutor', provider: parsl.providers.provider_base.ExecutionProvider = LocalProvider(channel=LocalChannel(envs={}, script_dir=None, userhome='/home/docs/checkouts/readthedocs.org/user_builds/parsl/checkouts/1.1.0/docs'), cmd_timeout=30, init_blocks=1, launcher=SingleNodeLauncher(debug=True, fail_on_any=False), max_blocks=1, min_blocks=0, move_files=None, nodes_per_block=1, parallelism=1, walltime='00:15:00', worker_init=''), working_dir: str = '.', managed: bool = True, project_name: Optional[str] = None, project_password_file: Optional[str] = None, address: Optional[str] = None, port: int = 0, env: Optional[Dict] = None, shared_fs: bool = False, storage_access: Optional[List[parsl.data_provider.staging.Staging]] = None, use_cache: bool = False, source: bool = False, pack: bool = False, extra_pkgs: Optional[List[str]] = None, autolabel: bool = False, autolabel_window: int = 1, autocategory: bool = True, init_command: str = '', worker_options: str = '', full_debug: bool = True)[source]

Executor that uses the Work Queue batch system

The WorkQueueExecutor uses the Work Queue framework to delegate Parsl apps to remote machines in clusters and grids in a fault-tolerant way. Users run the work_queue_worker program on remote machines to connect to the WorkQueueExecutor; Parsl apps are then sent out to those machines for execution, and their results are retrieved.
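A minimal sketch (not taken verbatim from the Parsl documentation) of how this fits together: configure a WorkQueueExecutor, load it, and run a trivial app. The label, port, and app below are illustrative choices.

    import parsl
    from parsl.config import Config
    from parsl.executors import WorkQueueExecutor
    from parsl.app.app import python_app

    # One executor acting as the Work Queue master on the submit host.
    config = Config(
        executors=[
            WorkQueueExecutor(
                label="wq",    # name reported to Work Queue workers
                port=9123,     # port workers use to reach this master
            )
        ]
    )
    parsl.load(config)

    @python_app
    def double(x):
        return 2 * x

    # On each remote machine, a worker would be started against this master, e.g.:
    #   work_queue_worker <submit-host-address> 9123
    print(double(21).result())  # -> 42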

Parameters
  • label (str) – A human-readable label for the executor, unique with respect to other Work Queue master programs. Default is “WorkQueueExecutor”.

  • working_dir (str) – Location for Parsl to perform app delegation to the Work Queue system. Defaults to current directory.

  • managed (bool) – Whether this executor is managed by the DFK or externally handled. Default is True (managed by DFK).

  • project_name (str) – If given, the name of this Work Queue master process; workers can then locate the master by name. Overrides address. Default is None.

  • project_password_file (str) – Optional password file for the Work Queue project. Default is None.

  • address (str) – The IP address at which to contact this Work Queue master process. If not given, uses the address of the current machine as returned by socket.gethostname(). Ignored if project_name is specified.

  • port (int) – TCP port on the Parsl submission machine that Work Queue workers connect to; workers specify this port when connecting to Parsl. Default is 9123.

  • env (dict) – Dictionary of environment variables to be set on the Work Queue worker machines.

  • shared_fs (bool) – Whether Parsl and the Work Queue workers share a file system. If they do, Work Queue does not need to transfer and rename files for execution. Default is False.

  • use_cache (bool) – Whether workers should cache files that are common to tasks. Warning: two files are considered the same if they have the same file path. Use with care when reusing the executor instance across multiple Parsl workflows. Default is False.

  • source (bool) – Whether to transfer Parsl app information as source code. (Note: this increases throughput for @python_apps, but is not supported for @bash_apps, so source=False must be used for workflows that use @bash_apps.) Default is False. Set to True if pack is True.

  • pack (bool) – Use conda-pack to prepare a self-contained Python environment for each task. This greatly increases task latency, but does not require a common environment or shared FS on execution nodes. Implies source=True.

  • extra_pkgs (list) – List of extra pip/conda package names to include when packing the environment. This may be useful if the app executes other (possibly non-Python) programs provided via pip or conda. Scanning the app source for imports would not detect these dependencies, so they need to be manually specified.

  • autolabel (bool) – Use the Resource Monitor to automatically determine resource labels based on observed task behavior (see the configuration sketch after this parameter list).

  • autolabel_window (int) – Set the number of tasks considered for autolabeling. Work Queue will wait for a series of N tasks with steady resource requirements before making a decision on labels. Increasing this parameter will reduce the number of failed tasks due to resource exhaustion when autolabeling, at the cost of increased resources spent collecting stats.

  • autocategory (bool) – Place each app in its own category by default. If all invocations of an app have similar performance characteristics, this will provide a reasonable set of categories automatically.

  • init_command (str) – Command line to run before executing a task in a worker. Default is ‘’.

  • worker_options (str) – Extra options passed to work_queue_worker. Default is ‘’.
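As referenced above, a hedged configuration sketch exercising several of these parameters. The project name, password file, environment variables, init command, and worker options below are placeholders for illustration, not values from the Parsl documentation.

    from parsl.executors import WorkQueueExecutor

    wq = WorkQueueExecutor(
        label="wq_labeled",
        project_name="my_parsl_project",          # workers locate the master by this name
        project_password_file="wq_password.txt",  # optional shared secret
        shared_fs=False,                 # let Work Queue transfer files to workers
        env={"OMP_NUM_THREADS": "1"},    # environment set on worker machines
        init_command="source ~/setup_env.sh",  # run before each task on a worker
        autolabel=True,                  # measure tasks and assign resource labels
        autolabel_window=5,              # observe 5 tasks before deciding on labels
        autocategory=True,               # one category per app
        worker_options="--cores 4",      # extra flags passed to work_queue_worker
    )
    # Remote workers would then be started against the named project, e.g.:
    #   work_queue_worker -M my_parsl_project --password wq_password.txt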

__init__(label: str = 'WorkQueueExecutor', provider: parsl.providers.provider_base.ExecutionProvider = LocalProvider(channel=LocalChannel(envs={}, script_dir=None, userhome='/home/docs/checkouts/readthedocs.org/user_builds/parsl/checkouts/1.1.0/docs'), cmd_timeout=30, init_blocks=1, launcher=SingleNodeLauncher(debug=True, fail_on_any=False), max_blocks=1, min_blocks=0, move_files=None, nodes_per_block=1, parallelism=1, walltime='00:15:00', worker_init=''), working_dir: str = '.', managed: bool = True, project_name: Optional[str] = None, project_password_file: Optional[str] = None, address: Optional[str] = None, port: int = 0, env: Optional[Dict] = None, shared_fs: bool = False, storage_access: Optional[List[parsl.data_provider.staging.Staging]] = None, use_cache: bool = False, source: bool = False, pack: bool = False, extra_pkgs: Optional[List[str]] = None, autolabel: bool = False, autolabel_window: int = 1, autocategory: bool = True, init_command: str = '', worker_options: str = '', full_debug: bool = True)[source]

Initialize self. See help(type(self)) for accurate signature.

Methods

__init__([label, provider, working_dir, …])

Initialize self.

create_monitoring_info(status)

Create a monitoring message for each block based on the poll status.

handle_errors(error_handler, status)

This method is called by the error management infrastructure after a status poll.

initialize_scaling()

Compose the launch command and call scale out.

monitor_resources()

Should resource monitoring happen for tasks running on this executor?

run_dir([value])

Path to the run directory.

scale_in(count)

Scale in method.

scale_out([blocks])

Scale out method.

scaling_enabled()

Specify if scaling is enabled.

set_bad_state_and_fail_all(exception)

Allows external error handlers to mark this executor as irrecoverably bad and cause all tasks submitted to it now and in the future to fail.

shutdown(*args, **kwargs)

Shutdown the executor.

start()

Create submit process and collector thread to create, send, and retrieve Parsl tasks within the Work Queue system.

status()

Return the status of all jobs/blocks currently known to this executor.

submit(func, resource_specification, *args, …)

Processes the Parsl app by its arguments and submits the function information to the task queue, to be executed using the Work Queue system.
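A hedged usage sketch for submit as seen from the app interface: Parsl forwards the parsl_resource_specification keyword of an app invocation to this executor's submit() as resource_specification. It assumes a WorkQueueExecutor configuration is already loaded (as in the first sketch above) and uses the cores/memory/disk keys, with memory and disk given in MB; the app and values are illustrative.

    from parsl.app.app import python_app

    @python_app
    def simulate(n):
        return sum(range(n))

    # parsl_resource_specification is consumed by Parsl, not by the app itself,
    # and is passed to WorkQueueExecutor.submit() as resource_specification.
    fut = simulate(10 ** 6,
                   parsl_resource_specification={"cores": 1,
                                                 "memory": 1000,   # MB
                                                 "disk": 1000})    # MB
    print(fut.result())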

Attributes

bad_state_is_set

Returns true if this executor is in an irrecoverable error state.

error_management_enabled

Indicates whether worker error management is supported by this executor.

executor_exception

Returns an exception that indicates why this executor is in an irrecoverable state.

hub_address

Address to the Hub for monitoring.

hub_port

Port to the Hub for monitoring.

provider

status_polling_interval

Returns the interval, in seconds, at which the status method should be called.

tasks

Contains a dictionary mapping task IDs to the corresponding Future objects for all tasks that have been submitted to this executor.