parsl.executors.LowLatencyExecutor

class parsl.executors.LowLatencyExecutor(label='LowLatencyExecutor', provider=LocalProvider(channel=LocalChannel(envs={}, script_dir=None, userhome='/home/docs/checkouts/readthedocs.org/user_builds/parsl/checkouts/latest/docs'), cmd_timeout=30, init_blocks=1, launcher=SingleNodeLauncher(debug=True, fail_on_any=False), max_blocks=1, min_blocks=0, move_files=None, nodes_per_block=1, parallelism=1, worker_init=''), launch_cmd=None, address='127.0.0.1', worker_port=None, worker_port_range=(54000, 55000), interchange_port_range=(55000, 56000), working_dir=None, worker_debug=False, workers_per_node=1, managed=True)[source]

TODO: docstring for LowLatencyExecutor

__init__(label='LowLatencyExecutor', provider=LocalProvider(channel=LocalChannel(envs={}, script_dir=None, userhome='/home/docs/checkouts/readthedocs.org/user_builds/parsl/checkouts/latest/docs'), cmd_timeout=30, init_blocks=1, launcher=SingleNodeLauncher(debug=True, fail_on_any=False), max_blocks=1, min_blocks=0, move_files=None, nodes_per_block=1, parallelism=1, worker_init=''), launch_cmd=None, address='127.0.0.1', worker_port=None, worker_port_range=(54000, 55000), interchange_port_range=(55000, 56000), working_dir=None, worker_debug=False, workers_per_node=1, managed=True)[source]
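As a sketch of how this constructor might be wired into a parsl configuration (assuming a parsl release that ships LowLatencyExecutor; the label, worker count, and block limits shown are illustrative values, not required defaults):

```python
# Illustrative sketch: configuring a LowLatencyExecutor inside a parsl Config.
# Assumes a parsl version that provides LowLatencyExecutor; all values shown
# (label, workers_per_node, block limits) are examples, not requirements.
import parsl
from parsl.config import Config
from parsl.executors import LowLatencyExecutor
from parsl.providers import LocalProvider

config = Config(
    executors=[
        LowLatencyExecutor(
            label="llex",
            workers_per_node=2,
            provider=LocalProvider(
                init_blocks=1,
                min_blocks=0,
                max_blocks=1,
            ),
        )
    ]
)

parsl.load(config)  # start the DataFlowKernel with this executor
```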

Methods

__init__([label, provider, launch_cmd, ...])

create_monitoring_info(status)

Create a monitoring message for each block based on the poll status.

handle_errors(error_handler, status)

This method is called by the error management infrastructure after a status poll.

monitor_resources()

Should resource monitoring happen for tasks running on this executor?

scale_in(blocks)

Scale in the number of active blocks by the specified amount.

scale_out([blocks])

Scales out the number of active workers by the number of blocks specified.

set_bad_state_and_fail_all(exception)

Allows external error handlers to mark this executor as irrecoverably bad and cause all tasks submitted to it now and in the future to fail.

shutdown([hub, targets, block])

Shutdown the executor, including all workers and controllers.

start()

Create the Interchange process and connect to it.

status()

Return status of all blocks.

submit(func, resource_specification, *args, ...)

TODO: docstring

Attributes

bad_state_is_set

Returns true if this executor is in an irrecoverable error state.

error_management_enabled

Indicates whether worker error management is supported by this executor.

executor_exception

Returns an exception that indicates why this executor is in an irrecoverable state.

hub_address

Address to the Hub for monitoring.

hub_port

Port to the Hub for monitoring.

label

outstanding

Return the number of tasks that the executor has been given to run: those waiting to run and those currently running.

provider

run_dir

Path to the run directory.

scaling_enabled

Specify if scaling is enabled.

status_polling_interval

Returns the interval, in seconds, at which the status method should be called.

tasks

Contains a dictionary mapping task IDs to the corresponding Future objects for all tasks that have been submitted to this executor.

workers_per_node

scale_in(blocks)[source]

Scale in the number of active blocks by the specified amount.

The scale-in implemented here is abrupt: it does not give workers the opportunity to finish their current tasks or to clean up. This is tracked in issue #530.

scale_out(blocks=1)[source]

Scales out the number of active workers by the number of blocks specified.

Parameters

blocks (int) – Number of blocks to scale out. Default is 1.
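The block bookkeeping behind scale_out and scale_in can be illustrated with a small stand-alone sketch. ToyProvider and ToyExecutor below are stand-ins invented for this example, not parsl classes: the provider's submit starts a block and returns a job ID, and cancel kills blocks outright, mirroring the abrupt scale-in described above.

```python
# Toy sketch of scale_out / scale_in block bookkeeping. ToyProvider stands in
# for a parsl execution provider; this is not parsl code.
class ToyProvider:
    def __init__(self):
        self._next_id = 0
        self.jobs = set()  # job IDs of blocks currently running

    def submit(self, command, nodes_per_block):
        # Launch one block and return its job ID.
        job_id = f"job{self._next_id}"
        self._next_id += 1
        self.jobs.add(job_id)
        return job_id

    def cancel(self, job_ids):
        for job_id in job_ids:
            self.jobs.discard(job_id)


class ToyExecutor:
    def __init__(self, provider):
        self.provider = provider
        self.blocks = []  # job IDs of active blocks

    def scale_out(self, blocks=1):
        # Start `blocks` new blocks via the provider.
        for _ in range(blocks):
            self.blocks.append(self.provider.submit("worker-launch-cmd", 1))

    def scale_in(self, blocks):
        # Abruptly cancel `blocks` blocks, mirroring the rude scale-in above.
        to_kill, self.blocks = self.blocks[:blocks], self.blocks[blocks:]
        self.provider.cancel(to_kill)


ex = ToyExecutor(ToyProvider())
ex.scale_out(3)        # three active blocks
ex.scale_in(2)         # abruptly cancel two of them
print(len(ex.blocks))  # → 1
```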

property scaling_enabled[source]

Specify if scaling is enabled.

Callers of ParslExecutors need to differentiate between an Executor and an Executor wrapped in a resource provider.

shutdown(hub=True, targets='all', block=False)[source]

Shutdown the executor, including all workers and controllers.

This is not implemented.

Kwargs:
  • hub (bool): Whether the hub should be shut down. Default: True

  • targets (list of ints | 'all'): List of block IDs to kill. Default: 'all'

  • block (bool): Whether to block for confirmations. Default: False

start()[source]

Create the Interchange process and connect to it.

submit(func, resource_specification, *args, **kwargs)[source]

TODO: docstring
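The submit contract — accept a callable plus a resource specification and return a Future that resolves to the callable's result — can be sketched with a minimal stand-in. The toy below runs tasks in a local thread pool purely for illustration; the real executor ships tasks through an interchange process to remote workers.

```python
# Minimal sketch of the submit(func, resource_specification, *args, **kwargs)
# contract: the caller receives a Future that resolves to func's result.
# ToyLowLatencyExecutor is invented for this example and is not parsl code.
from concurrent.futures import ThreadPoolExecutor


class ToyLowLatencyExecutor:
    def __init__(self, workers_per_node=1):
        self._pool = ThreadPoolExecutor(max_workers=workers_per_node)
        self.tasks = {}  # task ID -> Future, mirroring the `tasks` attribute
        self._task_counter = 0

    def submit(self, func, resource_specification, *args, **kwargs):
        # resource_specification is accepted but unused in this toy.
        future = self._pool.submit(func, *args, **kwargs)
        self.tasks[self._task_counter] = future
        self._task_counter += 1
        return future


ex = ToyLowLatencyExecutor(workers_per_node=2)
fut = ex.submit(lambda x, y: x + y, {}, 2, 3)
print(fut.result())  # → 5
```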