parsl.executors.WorkQueueExecutor
- class parsl.executors.WorkQueueExecutor(label: str = 'WorkQueueExecutor', provider: ExecutionProvider = LocalProvider(cmd_timeout=30, init_blocks=1, launcher=SingleNodeLauncher(debug=True, fail_on_any=False), max_blocks=1, min_blocks=0, nodes_per_block=1, parallelism=1, worker_init=''), working_dir: str = '.', project_name: str | None = None, project_password_file: str | None = None, address: str | None = None, port: int = 0, env: Dict | None = None, shared_fs: bool = False, storage_access: List[Staging] | None = None, use_cache: bool = False, source: bool = False, pack: bool = False, extra_pkgs: List[str] | None = None, autolabel: bool = False, autolabel_window: int = 1, autocategory: bool = True, max_retries: int = 1, init_command: str = '', worker_options: str = '', full_debug: bool = True, worker_executable: str = 'work_queue_worker', function_dir: str | None = None, coprocess: bool = False, scaling_cores_per_worker: int = 1)[source]
Executor to use the Work Queue batch system
The WorkQueueExecutor system utilizes the Work Queue framework to efficiently delegate Parsl apps to remote machines in clusters and grids using a fault-tolerant system. Users can run the work_queue_worker program on remote machines to connect to the WorkQueueExecutor, and Parsl apps will then be sent out to these machines for execution and retrieval.
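As an illustrative deployment fragment (the hostname and port below are placeholders, not defaults), a worker is typically connected to a running executor by invoking work_queue_worker with the submitting machine's address and the executor's port:

```shell
# Run on each remote machine. HOST and PORT are placeholders for the
# submitting machine's address and the executor's configured port.
work_queue_worker submit.example.org 9123
```

Alternatively, if a project_name is configured, workers can locate the master via the catalog with `work_queue_worker -M <project_name>`.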
- Parameters:
label (str) – A human readable label for the executor, unique with respect to other Work Queue master programs. Default is “WorkQueueExecutor”.
working_dir (str) – Location for Parsl to perform app delegation to the Work Queue system. Defaults to current directory.
project_name (str) – If a project_name is given, then Work Queue will periodically report its status and performance back to the global WQ catalog, which can be viewed here: http://ccl.cse.nd.edu/software/workqueue/status Default is None. Overrides address.
project_password_file (str) – Optional password file for the work queue project. Default is None.
address (str) – The ip to contact this work queue master process. If not given, uses the address of the current machine as returned by socket.gethostname(). Ignored if project_name is specified.
port (int) –
TCP port on Parsl submission machine for Work Queue workers to connect to. Workers will connect to Parsl using this port.
If 0, Work Queue will allocate a port number automatically. In this case, environment variables can be used to influence the choice of port, documented here: https://ccl.cse.nd.edu/software/manuals/api/html/work__queue_8h.html#a21714a10bcdfcf5c3bd44a96f5dcbda6 Default: WORK_QUEUE_DEFAULT_PORT.
env (dict{str}) – Dictionary that contains the environmental variables that need to be set on the Work Queue worker machine.
shared_fs (bool) – Define if working in a shared file system or not. If Parsl and the Work Queue workers are on a shared file system, Work Queue does not need to transfer and rename files for execution. Default is False.
use_cache (bool) – Whether workers should cache files that are common to tasks. Warning: Two files are considered the same if they have the same filepath name. Use with care when reusing the executor instance across multiple parsl workflows. Default is False.
source (bool) – Choose whether to transfer parsl app information as source code. (Note: this increases throughput for @python_apps, but the implementation does not include functionality for @bash_apps, and thus source=False must be used for programs utilizing @bash_apps.) Default is False. Set to True if pack is True.
pack (bool) – Use conda-pack to prepare a self-contained Python environment for each task. This greatly increases task latency, but does not require a common environment or shared FS on execution nodes. Implies source=True.
extra_pkgs (list) – List of extra pip/conda package names to include when packing the environment. This may be useful if the app executes other (possibly non-Python) programs provided via pip or conda. Scanning the app source for imports would not detect these dependencies, so they need to be manually specified.
autolabel (bool) – Use the Resource Monitor to automatically determine resource labels based on observed task behavior.
autolabel_window (int) – Set the number of tasks considered for autolabeling. Work Queue will wait for a series of N tasks with steady resource requirements before making a decision on labels. Increasing this parameter will reduce the number of failed tasks due to resource exhaustion when autolabeling, at the cost of increased resources spent collecting stats.
autocategory (bool) – Place each app in its own category by default. If all invocations of an app have similar performance characteristics, this will provide a reasonable set of categories automatically.
max_retries (int) – Set the number of retries that Work Queue will make when a task fails. This is distinct from Parsl level retries configured in parsl.config.Config. Set to None to allow Work Queue to retry tasks forever. By default, this is set to 1, so that all retries will be managed by Parsl.
init_command (str) – Command line to run before executing a task in a worker. Default is ‘’.
worker_options (str) – Extra options passed to work_queue_worker. Default is ‘’.
worker_executable (str) – The command used to invoke work_queue_worker. This can be used when the worker needs to be wrapped inside some other command (for example, to run the worker inside a container). Default is ‘work_queue_worker’.
function_dir (str) – The directory where serialized function invocations are placed to be sent to workers. If undefined, this defaults to a directory under runinfo/. If shared_filesystem=True, then this directory must be visible from both the submitting side and workers.
coprocess (bool) – Use Work Queue’s coprocess facility to avoid launching a new Python process for each task. Experimental. This requires a version of Work Queue / cctools after commit 874df524516441da531b694afc9d591e8b134b73 (release 7.5.0 is too early). Default is False.
scaling_cores_per_worker (int) – When using Parsl scaling, this specifies the number of cores that a worker is expected to have available for computation. Default 1. This parameter can be ignored when using a fixed number of blocks, or when using one task per worker (by omitting a cores resource specification for each task).
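As a sketch of how these parameters fit together, a minimal configuration might look like the following. The label, port, and worker_options values are illustrative choices, not defaults:

```python
from parsl.config import Config
from parsl.executors import WorkQueueExecutor

# Illustrative values only: a fixed port so workers know where to
# connect, and worker_options forwarded verbatim to work_queue_worker.
config = Config(
    executors=[
        WorkQueueExecutor(
            label="wq",
            port=9123,
            shared_fs=False,            # files are transferred to workers
            worker_options="--cores 4", # example extra worker flags
        )
    ]
)
```

With this configuration loaded, workers started against the chosen port (or via a project_name and the catalog) will begin receiving tasks.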
- __init__(label: str = 'WorkQueueExecutor', provider: ExecutionProvider = LocalProvider(cmd_timeout=30, init_blocks=1, launcher=SingleNodeLauncher(debug=True, fail_on_any=False), max_blocks=1, min_blocks=0, nodes_per_block=1, parallelism=1, worker_init=''), working_dir: str = '.', project_name: str | None = None, project_password_file: str | None = None, address: str | None = None, port: int = 0, env: Dict | None = None, shared_fs: bool = False, storage_access: List[Staging] | None = None, use_cache: bool = False, source: bool = False, pack: bool = False, extra_pkgs: List[str] | None = None, autolabel: bool = False, autolabel_window: int = 1, autocategory: bool = True, max_retries: int = 1, init_command: str = '', worker_options: str = '', full_debug: bool = True, worker_executable: str = 'work_queue_worker', function_dir: str | None = None, coprocess: bool = False, scaling_cores_per_worker: int = 1)[source]
Methods
- __init__([label, provider, working_dir, ...])
- create_monitoring_info(status) – Create a monitoring message for each block based on the poll status.
- handle_errors(status) – This method is called by the error management infrastructure after a status poll.
- initialize_scaling() – Compose the launch command and call scale out.
- monitor_resources() – Should resource monitoring happen for tasks running on this executor?
- poll_facade()
- scale_in(blocks) – Scale in method.
- scale_in_facade(n[, max_idletime])
- scale_out_facade(n) – Scales out the number of blocks by n.
- send_monitoring_info(status)
- set_bad_state_and_fail_all(exception) – Allows external error handlers to mark this executor as irrecoverably bad and cause all tasks submitted to it now and in the future to fail.
- shutdown(*args, **kwargs) – Shutdown the executor.
- start() – Create submit process and collector thread to create, send, and retrieve Parsl tasks within the Work Queue system.
- status() – Return the status of all jobs/blocks currently known to this executor.
- submit(func, resource_specification, *args, ...) – Processes the Parsl app by its arguments and submits the function information to the task queue, to be executed using the Work Queue system.
Attributes
- bad_state_is_set – Returns true if this executor is in an irrecoverable error state.
- executor_exception – Returns an exception that indicates why this executor is in an irrecoverable state.
- hub_address – Address to the Hub for monitoring.
- hub_zmq_port – Port to the Hub for monitoring.
- label
- outstanding – Count the number of outstanding slots required.
- provider
- run_dir – Path to the run directory.
- run_id – UUID for the enclosing DFK.
- status_facade – Return the status of all jobs/blocks of the executor of this poller.
- status_polling_interval – Returns the interval, in seconds, at which the status method should be called.
- submit_monitoring_radio – Local radio for sending monitoring messages.
- tasks
- initialize_scaling()[source]
Compose the launch command and call scale out.
Scales the workers to the appropriate nodes using the provider.
- property outstanding: int[source]
Count the number of outstanding slots required. This is inefficiently implemented and probably could be replaced with a counter.
- shutdown(*args, **kwargs)[source]
Shutdown the executor. Sets flag to cancel the submit process and collector thread, which shuts down the Work Queue system submission.
- start()[source]
Create submit process and collector thread to create, send, and retrieve Parsl tasks within the Work Queue system.
- submit(func, resource_specification, *args, **kwargs)[source]
Processes the Parsl app by its arguments and submits the function information to the task queue, to be executed using the Work Queue system. The args and kwargs are processed for input and output files to the Parsl app, so that the files are appropriately specified for the Work Queue task.
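In normal use, submit is not called directly; it is invoked through the DataFlowKernel when an app is called. As an illustrative sketch (the resource values are arbitrary examples; running it requires the work_queue Python module from cctools and at least one connected work_queue_worker), an app can pass a parsl_resource_specification that Work Queue uses when scheduling the task:

```python
import parsl
from parsl import python_app
from parsl.config import Config
from parsl.executors import WorkQueueExecutor

@python_app
def double(x):
    return 2 * x

# cores/memory/disk values are arbitrary examples; memory and disk
# are specified in MB.
spec = {"cores": 1, "memory": 100, "disk": 100}

parsl.load(Config(executors=[WorkQueueExecutor()]))
future = double(21, parsl_resource_specification=spec)
print(future.result())
parsl.dfk().cleanup()
```

If autolabel is enabled instead, the resource specification can be omitted and Work Queue will estimate resource labels from observed task behavior.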