parsl.executors.base.ParslExecutor

class parsl.executors.base.ParslExecutor[source]

Executors are abstractions that represent available compute resources to which you could submit arbitrary App tasks.

This is a metaclass that only enforces concrete implementations of functionality by the child classes.

Can be used as a context manager. On exit, calls self.shutdown() with no arguments and re-raises any thrown exception.

In addition to the listed methods, a ParslExecutor instance must always have a member field:

label: str - a human readable label for the executor, unique

with respect to other executors.

An executor may optionally expose:

storage_access: List[parsl.data_provider.staging.Staging] - a list of staging

providers that will be used for file staging. In the absence of this attribute, or if this attribute is None, then a default value of parsl.data_provider.staging.default_staging will be used by the staging code.

Typechecker note: Ideally storage_access would be declared on executor __init__ methods as List[Staging] - however, lists are by default invariant, not co-variant, and it looks like @typeguard cannot be persuaded otherwise. So if you’re implementing an executor and want to @typeguard the constructor, you’ll have to use List[Any] here.

__init__()[source]

Initialize self. See help(type(self)) for accurate signature.

Methods

__init__()

Initialize self.

create_monitoring_info(status)

Create a monitoring message for each block based on the poll status.

handle_errors(error_handler, status)

This method is called by the error management infrastructure after a status poll.

monitor_resources()

Should resource monitoring happen for tasks on running on this executor?

scale_in(blocks)

Scale in method.

scale_out(blocks)

Scale out method.

set_bad_state_and_fail_all(exception)

Allows external error handlers to mark this executor as irrecoverably bad and cause all tasks submitted to it now and in the future to fail.

shutdown()

Shutdown the executor.

start()

Start the executor.

status()

Return the status of all jobs/blocks currently known to this executor.

submit(func, resource_specification, *args, …)

Submit.

Attributes

bad_state_is_set

Returns true if this executor is in an irrecoverable error state.

error_management_enabled

Indicates whether worker error management is supported by this executor.

executor_exception

Returns an exception that indicates why this executor is in an irrecoverable state.

hub_address

Address to the Hub for monitoring.

hub_port

Port to the Hub for monitoring.

run_dir

Path to the run directory.

scaling_enabled

Specify if scaling is enabled.

status_polling_interval

Returns the interval, in seconds, at which the status method should be called.

tasks

Contains a dictionary mapping task IDs to the corresponding Future objects for all tasks that have been submitted to this executor.

abstract property bad_state_is_set[source]

Returns true if this executor is in an irrecoverable error state. If this method returns true, :property:executor_exception should contain an exception indicating the cause.

create_monitoring_info(status: Dict[str, parsl.providers.provider_base.JobStatus]) → List[object][source]

Create a monitoring message for each block based on the poll status.

Returns

a list of dictionaries mapping to the info of each block

abstract property error_management_enabled[source]

Indicates whether worker error management is supported by this executor. Worker error management is done externally to the executor. However, the executor must implement certain status handling methods that allow this to function. These methods are:

:method:handle_errors :method:set_bad_state_and_fail_all

The basic idea of worker error management is that an external entity maintains a view of the state of the workers by calling :method:status() which is then processed to detect abnormal conditions. This can be done externally, as well as internally, through :method:handle_errors. If an entity external to the executor detects an abnormal condition, it can notify the executor using :method:set_bad_state_and_fail_all(exception).

Some of the scaffolding needed for implementing error management inside executors, including implementations for the status handling methods above, is available in :class:parsl.executors.status_handling.BlockProviderExecutor, which interested executors should inherit from. Noop versions of methods that are related to status handling and running parsl tasks through workers are implemented by :class:parsl.executors.status_handling.NoStatusHandlingExecutor.

abstract property executor_exception[source]

Returns an exception that indicates why this executor is in an irrecoverable state.

abstract handle_errors(error_handler: parsl.dataflow.job_error_handler.JobErrorHandler, status: Dict[str, parsl.providers.provider_base.JobStatus])bool[source]

This method is called by the error management infrastructure after a status poll. The executor implementing this method is then responsible for detecting abnormal conditions based on the status of submitted jobs. If the executor does not implement any special error handling, this method should return False, in which case a generic error handling scheme will be used. :param error_handler: a reference to the generic error handler calling this method :param status: status of all jobs launched by this executor :return: True if this executor implements custom error handling, or False otherwise

property hub_address[source]

Address to the Hub for monitoring.

property hub_port[source]

Port to the Hub for monitoring.

label: str[source]
monitor_resources()bool[source]

Should resource monitoring happen for tasks on running on this executor?

Parsl resource monitoring conflicts with execution styles which use threads, and can deadlock while running.

This function allows resource monitoring to be disabled per executor implementation.

property run_dir[source]

Path to the run directory.

abstract scale_in(blocks: int) → List[str][source]

Scale in method.

Cause the executor to reduce the number of blocks by count.

We should have the scale in method simply take resource object which will have the scaling methods, scale_in itself should be a coroutine, since scaling tasks can be slow.

Returns

A list of block ids corresponding to the blocks that were removed.

abstract scale_out(blocks: int) → List[str][source]

Scale out method.

We should have the scale out method simply take resource object which will have the scaling methods, scale_out itself should be a coroutine, since scaling tasks can be slow.

Returns

A list of block ids corresponding to the blocks that were added.

abstract property scaling_enabled[source]

Specify if scaling is enabled.

The callers of ParslExecutors need to differentiate between Executors and Executors wrapped in a resource provider

abstract set_bad_state_and_fail_all(exception: Exception)[source]

Allows external error handlers to mark this executor as irrecoverably bad and cause all tasks submitted to it now and in the future to fail. The executor is responsible for checking :method:bad_state_is_set() in the :method:submit() method and raising the appropriate exception, which is available through :method:executor_exception().

abstract shutdown()bool[source]

Shutdown the executor.

This includes all attached resources such as workers and controllers.

abstract start() → Optional[List[str]][source]

Start the executor.

Any spin-up operations (for example: starting thread pools) should be performed here.

abstract status() → Dict[str, parsl.providers.provider_base.JobStatus][source]

Return the status of all jobs/blocks currently known to this executor.

Returns

a dictionary mapping block ids (in string) to job status

abstract property status_polling_interval[source]

Returns the interval, in seconds, at which the status method should be called. The assumption here is that, once initialized, an executor’s polling interval is fixed. In practice, at least given the current situation, the executor uses a single task provider and this method is a delegate to the corresponding method in the provider.

Returns

the number of seconds to wait between calls to status() or zero if no polling should be done

abstract submit(func: Callable, resource_specification: Dict[str, Any], *args: Any, **kwargs: Any) → concurrent.futures._base.Future[source]

Submit.

abstract property tasks[source]

Contains a dictionary mapping task IDs to the corresponding Future objects for all tasks that have been submitted to this executor.