parsl.executors.taskvine.TaskVineExecutor
- class parsl.executors.taskvine.TaskVineExecutor(label: str = 'TaskVineExecutor', worker_launch_method: Literal['provider'] | Literal['factory'] | Literal['manual'] = 'factory', function_exec_mode: Literal['regular'] | Literal['serverless'] = 'regular', manager_config: TaskVineManagerConfig = TaskVineManagerConfig(port=0, address=None, project_name=None, project_password_file=None, env_vars=None, init_command='', env_pack=None, app_pack=False, extra_pkgs=None, max_retries=1, library_config=None, shared_fs=False, autolabel=False, autolabel_algorithm='max-xput', autolabel_window=None, autocategory=True, enable_peer_transfers=True, wait_for_workers=None, tune_parameters=None, vine_log_dir=None), factory_config: TaskVineFactoryConfig = TaskVineFactoryConfig(factory_timeout=300, scratch_dir=None, min_workers=1, max_workers=1, workers_per_cycle=1, worker_options=None, worker_executable='vine_worker', worker_timeout=300, cores=None, gpus=None, memory=None, disk=None, python_env=None, batch_type='local', condor_requirements=None, batch_options=None, _project_port=0, _project_address=None, _project_name=None, _project_password_file=None), provider: ExecutionProvider | None = LocalProvider(cmd_timeout=30, init_blocks=1, launcher=SingleNodeLauncher(debug=True, fail_on_any=False), max_blocks=1, min_blocks=0, nodes_per_block=1, parallelism=1, worker_init=''), storage_access: List[Staging] | None = None)[source]
Executor to use TaskVine dynamic workflow system
The TaskVineExecutor system utilizes the TaskVine framework to efficiently delegate Parsl apps to remote machines in clusters and grids using a fault-tolerant system. Users can run the vine_worker program on remote machines to connect to the TaskVineExecutor, and Parsl apps will then be sent out to these machines for execution and retrieval.
This Executor sets up configurations for the TaskVine manager, TaskVine factory, and run both in separate processes. Sending tasks and receiving results are done through multiprocessing module native to Python.
- Parameters:
label (str) – A human readable label for the executor, unique with respect to other executors. Default is “TaskVineExecutor”.
worker_launch_method (Union[Literal['provider'], Literal['factory'], Literal['manual']]) – Choose to use Parsl provider, TaskVine factory, or manual user-provided workers to scale workers. Options are among {‘provider’, ‘factory’, ‘manual’}. Default is ‘factory’.
function_exec_mode (Union[Literal['regular'], Literal['serverless']]) – Choose to execute functions with a regular fresh python process or a pre-warmed forked python process. Default is ‘regular’.
manager_config (TaskVineManagerConfig) – Configuration for the TaskVine manager. Default
factory_config (TaskVineFactoryConfig) – Configuration for the TaskVine factory. Use of factory is disabled by default.
provider (ExecutionProvider) – The Parsl provider that will spawn worker processes. Default to spawning one local vine worker process.
storage_access (List[Staging]) – Define Parsl file staging providers for this executor. Default is None.
- __init__(label: str = 'TaskVineExecutor', worker_launch_method: Literal['provider'] | Literal['factory'] | Literal['manual'] = 'factory', function_exec_mode: Literal['regular'] | Literal['serverless'] = 'regular', manager_config: TaskVineManagerConfig = TaskVineManagerConfig(port=0, address=None, project_name=None, project_password_file=None, env_vars=None, init_command='', env_pack=None, app_pack=False, extra_pkgs=None, max_retries=1, library_config=None, shared_fs=False, autolabel=False, autolabel_algorithm='max-xput', autolabel_window=None, autocategory=True, enable_peer_transfers=True, wait_for_workers=None, tune_parameters=None, vine_log_dir=None), factory_config: TaskVineFactoryConfig = TaskVineFactoryConfig(factory_timeout=300, scratch_dir=None, min_workers=1, max_workers=1, workers_per_cycle=1, worker_options=None, worker_executable='vine_worker', worker_timeout=300, cores=None, gpus=None, memory=None, disk=None, python_env=None, batch_type='local', condor_requirements=None, batch_options=None, _project_port=0, _project_address=None, _project_name=None, _project_password_file=None), provider: ExecutionProvider | None = LocalProvider(cmd_timeout=30, init_blocks=1, launcher=SingleNodeLauncher(debug=True, fail_on_any=False), max_blocks=1, min_blocks=0, nodes_per_block=1, parallelism=1, worker_init=''), storage_access: List[Staging] | None = None)[source]
Methods
__init__
([label, worker_launch_method, ...])create_monitoring_info
(status)Create a monitoring message for each block based on the poll status.
handle_errors
(status)This method is called by the error management infrastructure after a status poll.
Compose the launch command and call scale out
monitor_resources
()Should resource monitoring happen for tasks on running on this executor?
poll_facade
()scale_in
(blocks)Scale in method.
scale_in_facade
(n[, max_idletime])scale_out_facade
(n)Scales out the number of blocks by "blocks"
send_monitoring_info
(status)set_bad_state_and_fail_all
(exception)Allows external error handlers to mark this executor as irrecoverably bad and cause all tasks submitted to it now and in the future to fail.
shutdown
(*args, **kwargs)Shutdown the executor.
start
()Create submit process and collector thread to create, send, and retrieve Parsl tasks within the TaskVine system.
status
()Return the status of all jobs/blocks currently known to this executor.
submit
(func, resource_specification, *args, ...)Processes the Parsl app by its arguments and submits the function information to the task queue, to be executed using the TaskVine system.
Attributes
bad_state_is_set
Returns true if this executor is in an irrecoverable error state.
executor_exception
Returns an exception that indicates why this executor is in an irrecoverable state.
hub_address
Address to the Hub for monitoring.
hub_zmq_port
Port to the Hub for monitoring.
label
Count the number of outstanding tasks.
provider
run_dir
Path to the run directory.
run_id
UUID for the enclosing DFK.
status_facade
Return the status of all jobs/blocks of the executor of this poller.
status_polling_interval
Returns the interval, in seconds, at which the status method should be called.
submit_monitoring_radio
Local radio for sending monitoring messages
tasks
- initialize_scaling()[source]
Compose the launch command and call scale out
Scales the workers to the appropriate nodes with provider
- shutdown(*args, **kwargs)[source]
Shutdown the executor. Sets flag to cancel the submit process and collector thread, which shuts down the TaskVine system submission.
- start()[source]
Create submit process and collector thread to create, send, and retrieve Parsl tasks within the TaskVine system.
- submit(func, resource_specification, *args, **kwargs)[source]
Processes the Parsl app by its arguments and submits the function information to the task queue, to be executed using the TaskVine system. The args and kwargs are processed for input and output files to the Parsl app, so that the files are appropriately specified for the TaskVine task.
- Parameters:
func (function) – Parsl app to be submitted to the TaskVine system
resource_specification (dict) – Dictionary containing relevant info about task. Include information about resources of task, execution mode of task (out of {regular, serverless}).
args (list) – Arguments to the Parsl app
kwargs (dict) – Keyword arguments to the Parsl app