parsl.executors.status_handling.BlockProviderExecutor

class parsl.executors.status_handling.BlockProviderExecutor(*, provider: ExecutionProvider | None, block_error_handler: bool | Callable[[BlockProviderExecutor, Dict[str, JobStatus]], None])

A base class for executors which scale using blocks.

This base class is intended to help with executors which:

  • use blocks of workers to execute tasks

  • launch those blocks of workers on a batch system through an ExecutionProvider

An implementing class should implement the abstract methods required by ParslExecutor to submit tasks. It should also implement the BlockProviderExecutor abstract members, which supply the executor-specific command to start a block of workers (the _get_launch_command method) and basic scaling information (the outstanding and workers_per_node properties).
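The sketch below illustrates that contract without importing Parsl. ToyBlockProviderExecutor is a hypothetical stand-in that declares only the three abstract members named above. ToyWorkerPoolExecutor and its worker-pool command are also hypothetical, and the block_id argument of _get_launch_command is an assumption about its signature. A real subclass would additionally implement the ParslExecutor methods such as submit.

```python
from abc import ABC, abstractmethod
from typing import List, Union


class ToyBlockProviderExecutor(ABC):
    """Hypothetical stand-in for the abstract members of BlockProviderExecutor."""

    @abstractmethod
    def _get_launch_command(self, block_id: str) -> str:
        """Return the shell command that starts one block of workers."""

    @property
    @abstractmethod
    def outstanding(self) -> int:
        """Number of tasks given to the executor: waiting plus running."""

    @property
    @abstractmethod
    def workers_per_node(self) -> Union[int, float]:
        """Number of workers each node of a block runs."""


class ToyWorkerPoolExecutor(ToyBlockProviderExecutor):
    """Hypothetical executor whose blocks run a 'worker-pool' command."""

    def __init__(self, workers: int = 4) -> None:
        self._workers = workers
        self._pending: List[object] = []  # tasks waiting for a worker
        self._running: List[object] = []  # tasks currently executing

    def _get_launch_command(self, block_id: str) -> str:
        # Pass the block id through so workers can report which block they belong to.
        return f"worker-pool --block-id {block_id} --workers {self._workers}"

    @property
    def outstanding(self) -> int:
        return len(self._pending) + len(self._running)

    @property
    def workers_per_node(self) -> Union[int, float]:
        return self._workers


ex = ToyWorkerPoolExecutor(workers=8)
print(ex._get_launch_command("0"))  # worker-pool --block-id 0 --workers 8
print(ex.outstanding, ex.workers_per_node)  # 0 8
```

Declaring the members with abc means a subclass that forgets one of them cannot be instantiated at all, which surfaces the omission early.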

This base class provides a scale_out method which will launch new blocks. It does not provide a scale_in method, because scale-in behaviour is not well defined in the Parsl scaling model and so behaviour is left to individual executors.
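A minimal sketch of what "launch new blocks" involves, under stated assumptions: FakeProvider is a hypothetical provider whose submit(command) simply records the command and returns a job id (it is not the ExecutionProvider API), and the blocks_to_job_id mapping name is illustrative. Each new block gets an executor-assigned id, its launch command is submitted, and the resulting provider job id is remembered.

```python
import itertools
from typing import Dict, List


class FakeProvider:
    """Hypothetical provider: records commands instead of submitting batch jobs."""

    def __init__(self) -> None:
        self._ids = itertools.count()
        self.submitted: Dict[str, str] = {}  # job id -> command

    def submit(self, command: str) -> str:
        job_id = f"job-{next(self._ids)}"
        self.submitted[job_id] = command
        return job_id


class ToyExecutor:
    def __init__(self, provider: FakeProvider) -> None:
        self.provider = provider
        self._block_counter = itertools.count()
        self.blocks_to_job_id: Dict[str, str] = {}  # block id -> provider job id

    def _get_launch_command(self, block_id: str) -> str:
        return f"start-workers --block {block_id}"

    def scale_out(self, blocks: int) -> List[str]:
        """Launch `blocks` new blocks and return their block ids."""
        new_ids = []
        for _ in range(blocks):
            block_id = str(next(self._block_counter))
            job_id = self.provider.submit(self._get_launch_command(block_id))
            self.blocks_to_job_id[block_id] = job_id
            new_ids.append(block_id)
        return new_ids

    # No scale_in here: which blocks to remove is left to each executor.


ex = ToyExecutor(FakeProvider())
print(ex.scale_out(2))        # ['0', '1']
print(ex.blocks_to_job_id)    # {'0': 'job-0', '1': 'job-1'}
```

Keeping block ids separate from provider job ids lets the executor name blocks consistently regardless of which batch system is behind the provider.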

Parsl scaling will provide scaling between min_blocks and max_blocks by invoking scale_out, but it will not initialize the blocks requested by any init_blocks parameter. Subclasses must implement that behaviour themselves.
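The division of labour can be sketched as follows. ToyExecutor launches init_blocks itself in start(), while blocks_to_add is a hypothetical strategy step that clamps a wanted block count to the range [min_blocks, max_blocks] and only ever asks for more blocks. Both names, and the simple clamping rule, are illustrative assumptions rather than Parsl's actual strategy code.

```python
from typing import List


class ToyExecutor:
    """Hypothetical executor that honours init_blocks itself in start()."""

    def __init__(self, init_blocks: int, min_blocks: int, max_blocks: int) -> None:
        self.init_blocks = init_blocks
        self.min_blocks = min_blocks
        self.max_blocks = max_blocks
        self.blocks: List[str] = []

    def scale_out(self, blocks: int) -> List[str]:
        new = [str(len(self.blocks) + i) for i in range(blocks)]
        self.blocks.extend(new)
        return new

    def start(self) -> None:
        # The scaling strategy never launches init_blocks, so the executor does it here.
        self.scale_out(self.init_blocks)


def blocks_to_add(executor: ToyExecutor, wanted: int) -> int:
    """Hypothetical strategy step: clamp `wanted` to [min_blocks, max_blocks]."""
    target = max(executor.min_blocks, min(wanted, executor.max_blocks))
    return max(0, target - len(executor.blocks))


ex = ToyExecutor(init_blocks=1, min_blocks=0, max_blocks=4)
ex.start()
print(len(ex.blocks))                        # 1
print(ex.scale_out(blocks_to_add(ex, 10)))   # ['1', '2', '3']
```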

__init__(*, provider: ExecutionProvider | None, block_error_handler: bool | Callable[[BlockProviderExecutor, Dict[str, JobStatus]], None])

Methods

__init__(*, provider, block_error_handler)

create_monitoring_info(status)

Create a monitoring message for each block based on the poll status.

handle_errors(status)

This method is called by the error management infrastructure after a status poll.

monitor_resources()

Should resource monitoring happen for tasks running on this executor?

poll_facade()

scale_in(blocks)

Scale in by removing the given number of blocks.

scale_in_facade(n[, max_idletime])

scale_out_facade(n)

Scale out by launching n additional blocks.

send_monitoring_info(status)

set_bad_state_and_fail_all(exception)

Allows external error handlers to mark this executor as irrecoverably bad and cause all tasks submitted to it now and in the future to fail.

shutdown()

Shutdown the executor.

start()

Start the executor.

status()

Return the status of all jobs/blocks currently known to this executor.

submit(func, resource_specification, *args, ...)

Submit a task for execution and return a Future for its result.

Attributes

bad_state_is_set

Returns true if this executor is in an irrecoverable error state.

executor_exception

Returns an exception that indicates why this executor is in an irrecoverable state.

hub_address

Address to the Hub for monitoring.

hub_zmq_port

Port to the Hub for monitoring.

label

outstanding

This should return the number of tasks that the executor has been given to run (both waiting to run and currently running).

provider

radio_mode

run_dir

Path to the run directory.

run_id

UUID for the enclosing DFK.

status_facade

Return the status of all jobs/blocks of the executor of this poller.

status_polling_interval

Returns the interval, in seconds, at which the status method should be called.

submit_monitoring_radio

Local radio for sending monitoring messages

tasks

workers_per_node

property bad_state_is_set

Returns True if this executor is in an irrecoverable error state. If this property is True, executor_exception should contain an exception indicating the cause.

create_monitoring_info(status: Dict[str, JobStatus]) → Sequence[object]

Create a monitoring message for each block based on the poll status.

property executor_exception

Returns an exception that indicates why this executor is in an irrecoverable state.

handle_errors(status: Dict[str, JobStatus]) → None

This method is called by the error management infrastructure after a status poll. The executor implementing this method is then responsible for detecting abnormal conditions based on the status of submitted jobs. If the executor does not implement any special error handling, this method should return False, in which case a generic error handling scheme will be used.

Parameters:

status – status of all jobs launched by this executor
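The constructor's block_error_handler parameter accepts either a bool or a callable that takes the executor and a dictionary of block ids to JobStatus values. The sketch below shows a callable of that shape. JobState, JobStatus and ToyExecutor are simplified, hypothetical stand-ins (not the Parsl classes), and the "give up only when every block has failed" policy is just one illustrative choice.

```python
from dataclasses import dataclass
from enum import Enum, auto
from typing import Dict, Optional


class JobState(Enum):   # toy stand-in for Parsl's job states
    PENDING = auto()
    RUNNING = auto()
    FAILED = auto()


@dataclass
class JobStatus:        # toy stand-in: a state plus an optional message
    state: JobState
    message: Optional[str] = None


class ToyExecutor:
    def __init__(self) -> None:
        self.executor_exception: Optional[Exception] = None

    @property
    def bad_state_is_set(self) -> bool:
        return self.executor_exception is not None

    def set_bad_state_and_fail_all(self, exception: Exception) -> None:
        self.executor_exception = exception


def all_blocks_failed_handler(executor: ToyExecutor, status: Dict[str, JobStatus]) -> None:
    """Hypothetical handler: mark the executor bad only when every known block failed."""
    if status and all(s.state is JobState.FAILED for s in status.values()):
        details = "; ".join(f"block {bid}: {s.message}" for bid, s in status.items())
        executor.set_bad_state_and_fail_all(RuntimeError(f"All blocks failed: {details}"))


ex = ToyExecutor()
all_blocks_failed_handler(ex, {"0": JobStatus(JobState.RUNNING),
                               "1": JobStatus(JobState.FAILED, "walltime exceeded")})
print(ex.bad_state_is_set)  # False
all_blocks_failed_handler(ex, {"0": JobStatus(JobState.FAILED, "out of memory"),
                               "1": JobStatus(JobState.FAILED, "walltime exceeded")})
print(ex.bad_state_is_set)  # True
```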

abstract property outstanding: int

This should return the number of tasks that the executor has been given to run (both waiting to run and currently running).
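One simple way to satisfy this contract is sketched below, assuming the executor keeps every submitted task's Future in its tasks dictionary (task id to Future, matching the tasks property type) and never removes entries. ToyExecutor and add_task are hypothetical; real executors may compute outstanding differently, for example by asking a separate task-dispatch process.

```python
from concurrent.futures import Future
from typing import Dict


class ToyExecutor:
    def __init__(self) -> None:
        self.tasks: Dict[object, Future] = {}  # task id -> Future

    def add_task(self, task_id: object) -> Future:
        """Hypothetical helper: register a task whose result is not known yet."""
        fut: Future = Future()
        self.tasks[task_id] = fut
        return fut

    @property
    def outstanding(self) -> int:
        # Waiting plus running: every task whose Future has not completed.
        return sum(1 for f in self.tasks.values() if not f.done())


ex = ToyExecutor()
futures = [ex.add_task(i) for i in range(3)]
print(ex.outstanding)   # 3
futures[0].set_result("done")
print(ex.outstanding)   # 2
```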

poll_facade() → None
property provider
scale_in(blocks: int) → List[str]

Scale in method.

Cause the executor to reduce its number of blocks by the count given in blocks.

The default implementation kills blocks without regard to their status or to whether they are executing tasks. Executors with more nuanced scaling strategies might override this method to work with that strategy; see the HighThroughputExecutor for an example.

Returns:

A list of block ids corresponding to the blocks that were removed.
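The difference between a status-blind choice and a more nuanced one can be sketched with two hypothetical selection functions. choose_blocks_naive takes the first count blocks, which is one status-blind policy (not necessarily the order the default implementation uses). choose_blocks_idle_first assumes the executor knows how many tasks each block is running and prefers the least busy blocks.

```python
from typing import Dict, List


def choose_blocks_naive(blocks: List[str], count: int) -> List[str]:
    """Status-blind choice: take the first `count` blocks, busy or not."""
    return blocks[:count]


def choose_blocks_idle_first(tasks_per_block: Dict[str, int], count: int) -> List[str]:
    """Hypothetical idle-aware choice: prefer blocks running the fewest tasks."""
    ordered = sorted(tasks_per_block, key=lambda b: tasks_per_block[b])
    return ordered[:count]


tasks_per_block = {"0": 5, "1": 0, "2": 3}
print(choose_blocks_naive(list(tasks_per_block), 1))   # ['0']  (kills a busy block)
print(choose_blocks_idle_first(tasks_per_block, 2))    # ['1', '2']
```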

scale_in_facade(n: int, max_idletime: float | None = None) → List[str]
scale_out_facade(n: int) → List[str]

Scale out by launching n additional blocks.

send_monitoring_info(status: Dict) → None
set_bad_state_and_fail_all(exception: Exception)

Allows external error handlers to mark this executor as irrecoverably bad and cause all tasks submitted to it now and in the future to fail. The executor is responsible for checking the bad_state_is_set property in its submit() method and raising the appropriate exception, which is available through the executor_exception property.
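A sketch of both halves of that responsibility, using a hypothetical ToyExecutor rather than Parsl's classes: set_bad_state_and_fail_all records the exception and fails every still-pending Future ("now"), and submit checks bad_state_is_set first so later submissions fail too ("in the future"). As a toy, submit never actually runs func; it only leaves a pending Future, as if the task were queued for a worker.

```python
from concurrent.futures import Future
from typing import Any, Callable, Dict, Optional


class ToyExecutor:
    def __init__(self) -> None:
        self._executor_exception: Optional[Exception] = None
        self.tasks: Dict[int, Future] = {}
        self._next_id = 0

    @property
    def bad_state_is_set(self) -> bool:
        return self._executor_exception is not None

    @property
    def executor_exception(self) -> Optional[Exception]:
        return self._executor_exception

    def set_bad_state_and_fail_all(self, exception: Exception) -> None:
        self._executor_exception = exception
        for fut in self.tasks.values():  # fail tasks submitted before the error
            if not fut.done():
                fut.set_exception(exception)

    def submit(self, func: Callable[..., Any], resource_specification: dict,
               *args: Any, **kwargs: Any) -> Future:
        if self.bad_state_is_set:        # fail tasks submitted after the error
            raise self.executor_exception
        fut: Future = Future()           # toy: left pending, func is never run
        self.tasks[self._next_id] = fut
        self._next_id += 1
        return fut


ex = ToyExecutor()
f = ex.submit(print, {}, "hello")
ex.set_bad_state_and_fail_all(RuntimeError("provider unavailable"))
print(repr(f.exception()))  # RuntimeError('provider unavailable')
```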

status() → Dict[str, JobStatus]

Return the status of all jobs/blocks currently known to this executor.

Returns:

a dictionary mapping block ids (as strings) to job status
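To show the shape of that return value, the sketch below tallies blocks per state from a status()-style dictionary. JobState and JobStatus here are simplified, hypothetical stand-ins for Parsl's classes, and count_by_state is an illustrative helper.

```python
from collections import Counter
from dataclasses import dataclass
from enum import Enum
from typing import Dict, Optional


class JobState(Enum):   # toy stand-in for Parsl's job states
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    FAILED = "FAILED"


@dataclass
class JobStatus:        # toy stand-in: a state plus an optional message
    state: JobState
    message: Optional[str] = None


def count_by_state(status: Dict[str, JobStatus]) -> Dict[str, int]:
    """Tally how many blocks are in each state."""
    return dict(Counter(s.state.name for s in status.values()))


status = {
    "0": JobStatus(JobState.RUNNING),
    "1": JobStatus(JobState.RUNNING),
    "2": JobStatus(JobState.PENDING),
}
print(count_by_state(status))  # {'RUNNING': 2, 'PENDING': 1}
```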

property status_facade: Dict[str, JobStatus]

Return the status of all jobs/blocks of the executor of this poller.

Returns:

a dictionary mapping block ids (as strings) to job status

property status_polling_interval

Returns the interval, in seconds, at which the status method should be called. The assumption here is that, once initialized, an executor's polling interval is fixed. In practice, the executor currently uses a single execution provider, and this property delegates to the corresponding value on that provider.

Returns:

the number of seconds to wait between calls to status() or zero if no polling should be done
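A minimal sketch of a caller honouring this contract: call a status function once per interval, and skip polling entirely when the interval is zero. poll_loop and its rounds parameter are hypothetical; Parsl's own polling infrastructure is separate and runs continuously rather than for a fixed number of rounds.

```python
import time
from typing import Callable, Dict, List


def poll_loop(get_status: Callable[[], Dict[str, str]],
              interval: float, rounds: int) -> List[Dict[str, str]]:
    """Hypothetical poller: call get_status() `rounds` times, `interval` seconds apart.

    An interval of zero means "do not poll", as for status_polling_interval.
    """
    results: List[Dict[str, str]] = []
    if interval == 0:
        return results
    for i in range(rounds):
        results.append(get_status())
        if i < rounds - 1:      # no need to sleep after the last poll
            time.sleep(interval)
    return results


print(poll_loop(lambda: {"0": "RUNNING"}, interval=0, rounds=3))         # []
print(len(poll_loop(lambda: {"0": "RUNNING"}, interval=0.01, rounds=3)))  # 3
```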

property tasks: Dict[object, Future]
abstract property workers_per_node: int | float