parsl.executors.taskvine.TaskVineExecutor

class parsl.executors.taskvine.TaskVineExecutor(label: str = 'TaskVineExecutor', worker_launch_method: Literal['provider'] | Literal['factory'] | Literal['manual'] = 'factory', function_exec_mode: Literal['regular'] | Literal['serverless'] = 'regular', manager_config: TaskVineManagerConfig = TaskVineManagerConfig(port=0, address=None, project_name=None, project_password_file=None, env_vars=None, init_command='', env_pack=None, app_pack=False, extra_pkgs=None, max_retries=1, library_config=None, shared_fs=False, autolabel=False, autolabel_algorithm='max-xput', autolabel_window=None, autocategory=True, enable_peer_transfers=True, wait_for_workers=None, vine_log_dir=None), factory_config: TaskVineFactoryConfig = TaskVineFactoryConfig(factory_timeout=300, scratch_dir=None, min_workers=1, max_workers=1, workers_per_cycle=1, worker_options=None, worker_executable='vine_worker', worker_timeout=300, cores=None, gpus=None, memory=None, disk=None, python_env=None, batch_type='local', condor_requirements=None, batch_options=None, _project_port=0, _project_address=None, _project_name=None, _project_password_file=None), provider: ExecutionProvider | None = LocalProvider(channel=LocalChannel(envs={}, script_dir=None, userhome='/home/docs/checkouts/readthedocs.org/user_builds/parsl/checkouts/latest/docs'), cmd_timeout=30, init_blocks=1, launcher=SingleNodeLauncher(debug=True, fail_on_any=False), max_blocks=1, min_blocks=0, move_files=None, nodes_per_block=1, parallelism=1, worker_init=''), storage_access: List[Staging] | None = None)[source]

Executor to use the TaskVine dynamic workflow system.

The TaskVineExecutor uses the TaskVine framework to efficiently delegate Parsl apps to remote machines in clusters and grids via a fault-tolerant system. Users can run the vine_worker program on remote machines to connect to the TaskVineExecutor; Parsl apps are then dispatched to those machines for execution and their results retrieved.

This executor sets up configurations for the TaskVine manager and the TaskVine factory, and runs both in separate processes. Tasks are sent and results received through Python's native multiprocessing module.

Parameters:
  • label (str) – A human readable label for the executor, unique with respect to other executors. Default is “TaskVineExecutor”.

  • worker_launch_method (Union[Literal['provider'], Literal['factory'], Literal['manual']]) – Choose to use Parsl provider, TaskVine factory, or manual user-provided workers to scale workers. Options are among {‘provider’, ‘factory’, ‘manual’}. Default is ‘factory’.

  • function_exec_mode (Union[Literal['regular'], Literal['serverless']]) – Choose to execute functions with a regular fresh python process or a pre-warmed forked python process. Default is ‘regular’.

  • manager_config (TaskVineManagerConfig) – Configuration for the TaskVine manager. Defaults to a TaskVineManagerConfig with default settings.

  • factory_config (TaskVineFactoryConfig) – Configuration for the TaskVine factory. Use of factory is disabled by default.

  • provider (ExecutionProvider) – The Parsl provider that will spawn worker processes. Defaults to spawning one local vine worker process.

  • storage_access (List[Staging]) – Define Parsl file staging providers for this executor. Default is None.
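The parameters above can be combined into a Parsl configuration. A minimal sketch follows; it assumes parsl and the ndcctools TaskVine runtime are installed, and the port number 9123 is an illustrative choice rather than a default (the default port of 0 lets TaskVine pick one):

```python
# Minimal sketch: a Parsl Config using TaskVineExecutor with the
# factory launch method, which spawns local vine workers automatically.
import parsl
from parsl.config import Config
from parsl.executors.taskvine import TaskVineExecutor, TaskVineManagerConfig

config = Config(
    executors=[
        TaskVineExecutor(
            label="TaskVineExecutor",
            worker_launch_method="factory",   # TaskVine factory scales workers
            function_exec_mode="regular",     # fresh Python process per task
            manager_config=TaskVineManagerConfig(port=9123),
        )
    ]
)

@parsl.python_app
def double(x):
    return 2 * x

parsl.load(config)
future = double(21)      # runs on a vine worker once one connects
print(future.result())
```

With worker_launch_method='manual', no workers are started for you; instead you would run vine_worker by hand on each remote machine, pointing it at the manager's address and port.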

__init__(label: str = 'TaskVineExecutor', worker_launch_method: Literal['provider'] | Literal['factory'] | Literal['manual'] = 'factory', function_exec_mode: Literal['regular'] | Literal['serverless'] = 'regular', manager_config: TaskVineManagerConfig = TaskVineManagerConfig(port=0, address=None, project_name=None, project_password_file=None, env_vars=None, init_command='', env_pack=None, app_pack=False, extra_pkgs=None, max_retries=1, library_config=None, shared_fs=False, autolabel=False, autolabel_algorithm='max-xput', autolabel_window=None, autocategory=True, enable_peer_transfers=True, wait_for_workers=None, vine_log_dir=None), factory_config: TaskVineFactoryConfig = TaskVineFactoryConfig(factory_timeout=300, scratch_dir=None, min_workers=1, max_workers=1, workers_per_cycle=1, worker_options=None, worker_executable='vine_worker', worker_timeout=300, cores=None, gpus=None, memory=None, disk=None, python_env=None, batch_type='local', condor_requirements=None, batch_options=None, _project_port=0, _project_address=None, _project_name=None, _project_password_file=None), provider: ExecutionProvider | None = LocalProvider(channel=LocalChannel(envs={}, script_dir=None, userhome='/home/docs/checkouts/readthedocs.org/user_builds/parsl/checkouts/latest/docs'), cmd_timeout=30, init_blocks=1, launcher=SingleNodeLauncher(debug=True, fail_on_any=False), max_blocks=1, min_blocks=0, move_files=None, nodes_per_block=1, parallelism=1, worker_init=''), storage_access: List[Staging] | None = None)[source]

Methods

__init__([label, worker_launch_method, ...])

atexit_cleanup()

create_monitoring_info(status)

Create a monitoring message for each block based on the poll status.

handle_errors(status)

This method is called by the error management infrastructure after a status poll.

initialize_scaling()

Compose the launch command and call scale out

monitor_resources()

Should resource monitoring happen for tasks running on this executor?

poll_facade()

scale_in(count)

Scale in method.

scale_in_facade(n[, max_idletime])

scale_out([blocks])

Scales out the number of blocks by "blocks"

scale_out_facade(n)

send_monitoring_info(status)

set_bad_state_and_fail_all(exception)

Allows external error handlers to mark this executor as irrecoverably bad and cause all tasks submitted to it now and in the future to fail.

shutdown(*args, **kwargs)

Shutdown the executor.

start()

Create submit process and collector thread to create, send, and retrieve Parsl tasks within the TaskVine system.

status()

Return the status of all jobs/blocks currently known to this executor.

submit(func, resource_specification, *args, ...)

Processes the Parsl app by its arguments and submits the function information to the task queue, to be executed using the TaskVine system.

Attributes

bad_state_is_set

Returns true if this executor is in an irrecoverable error state.

executor_exception

Returns an exception that indicates why this executor is in an irrecoverable state.

hub_address

Address to the Hub for monitoring.

hub_port

Port to the Hub for monitoring.

label

monitoring_radio

Local radio for sending monitoring messages

outstanding

Count the number of outstanding tasks.

provider

radio_mode

run_dir

Path to the run directory.

run_id

UUID for the enclosing DFK.

status_facade

Return the status of all jobs/blocks of this poller's executor.

status_polling_interval

Returns the interval, in seconds, at which the status method should be called.

tasks

workers_per_node

atexit_cleanup()[source]
initialize_scaling()[source]

Compose the launch command and call scale out.

Scales workers to the appropriate nodes using the provider.

property outstanding: int[source]

Count the number of outstanding tasks.

radio_mode: str = 'filesystem'[source]
scale_in(count: int) List[str][source]

Scale in method. Cancel a given number of blocks.

shutdown(*args, **kwargs)[source]

Shutdown the executor. Sets flag to cancel the submit process and collector thread, which shuts down the TaskVine system submission.

start()[source]

Create submit process and collector thread to create, send, and retrieve Parsl tasks within the TaskVine system.

submit(func, resource_specification, *args, **kwargs)[source]

Processes the Parsl app's arguments and submits the function information to the task queue, to be executed by the TaskVine system. The args and kwargs are scanned for input and output files of the Parsl app, so that files are appropriately specified for the TaskVine task.

Parameters:
  • func (function) – Parsl app to be submitted to the TaskVine system.

  • resource_specification (dict) – Dictionary containing relevant information about the task, including its resource requirements and its execution mode (one of {regular, serverless}).

  • args (list) – Arguments to the Parsl app

  • kwargs (dict) – Keyword arguments to the Parsl app
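In practice, resource_specification is usually not passed to submit() directly; Parsl apps accept it as the parsl_resource_specification keyword argument and forward it to the executor. A hedged sketch follows; the keys shown ('cores', 'memory', 'disk') follow TaskVine's resource conventions, but the exact set of supported keys should be checked against your Parsl version:

```python
# Sketch: requesting per-task resources from TaskVine via a Parsl app.
# Assumes a parsl.Config with a TaskVineExecutor has already been loaded.
import parsl

@parsl.python_app
def simulate(n):
    # CPU-bound toy workload to attach resource requirements to
    return sum(i * i for i in range(n))

spec = {
    "cores": 1,       # CPU cores requested for the task
    "memory": 1000,   # memory in MB
    "disk": 1000,     # disk in MB
}

# The spec reaches TaskVineExecutor.submit() as resource_specification.
future = simulate(10_000, parsl_resource_specification=spec)
```

Labeling tasks this way lets the TaskVine scheduler pack tasks onto workers by their declared needs instead of assuming whole-worker allocation.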

property workers_per_node: int | float[source]