parsl.executors.FluxExecutor

class parsl.executors.FluxExecutor(provider: Optional[parsl.providers.provider_base.ExecutionProvider] = None, managed: bool = True, working_dir: Optional[str] = None, label: str = 'FluxExecutor', flux_executor_kwargs: collections.abc.Mapping = {}, flux_path: Optional[str] = None, launch_cmd: Optional[str] = None)

Executor that uses Flux to schedule and run jobs.

Every callable submitted to the executor is wrapped into a Flux job.

This executor requires that there be a Flux installation available locally, and that it can be located either in PATH or through the flux_path argument.

Flux jobs are fairly heavyweight. As of Flux v0.25, a single Flux instance is (on many systems) capped at 50 jobs per second. As such, this executor is not a good fit for use cases consisting of large numbers of small, fast jobs.

However, Flux is great at handling jobs with large resource requirements, and collections of jobs with varying resource requirements.

Note that due to vendor-specific extensions, on certain Cray machines such as ALCF’s Theta or LANL’s Trinitite/Trinity, Flux cannot run applications that use the default MPI library. Generally, the only workaround is to recompile against another MPI library, such as OpenMPI.

This executor acts as a wrapper around a flux.job.FluxExecutor, which can be confusing since the wrapped and wrapper classes share the same name. Whenever possible, the underlying executor is referred to by its fully qualified name, flux.job.FluxExecutor.
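
For orientation, here is a minimal usage sketch. It assumes a working local Flux installation discoverable in PATH and an otherwise standard Parsl setup; treat it as illustrative rather than an official recipe.

    import parsl
    from parsl import python_app
    from parsl.config import Config
    from parsl.executors import FluxExecutor

    # A config whose only executor is a FluxExecutor. flux_path is omitted,
    # so the `flux` executable must be found in PATH.
    parsl.load(Config(executors=[FluxExecutor()]))

    @python_app
    def add(x, y):
        return x + y

    print(add(1, 2).result())  # executes as a Flux job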

Parameters
  • managed (bool) – Whether this executor is managed by the DFK or handled externally.

  • working_dir (str) – Directory in which the executor should place its files, possibly overwriting existing files. If None, a unique directory is generated.

  • label (str) – Label for this executor instance.

  • flux_handle_args (collections.abc.Sequence) – Positional arguments passed to the flux.Flux() handle constructor, if any. The first positional argument, url, is supplied by this executor.

  • flux_executor_kwargs (collections.abc.Mapping) – Keyword arguments to pass to the underlying flux.job.FluxExecutor() instance, if any. Note that the handle_args keyword argument is provided by this executor, in order to supply the URL of a remote Flux instance.

  • flux_path (str) – Path to flux installation to use, or None to search PATH for flux.

  • launch_cmd (str) – The command to use when launching the executor’s backend. The default command is available as the DEFAULT_LAUNCH_CMD attribute. The default command starts a new Flux instance, which may not be desirable if a Flux instance has already been provisioned (an unlikely situation); see the sketch after this list.
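
To illustrate launch_cmd, the sketch below derives a custom command from the DEFAULT_LAUNCH_CMD template. The six named fields are substituted by the executor at launch time and must all be preserved; the -o broker-option syntax handed to flux start is an assumption about the local Flux version, not something this executor mandates.

    from parsl.executors import FluxExecutor

    # DEFAULT_LAUNCH_CMD is
    #   '{flux} start {python} {manager} {protocol} {hostname} {port}'
    # Keep every template field; here we only splice extra options into
    # `flux start` (a broker log file -- assumed flag syntax, check your
    # Flux version).
    custom_cmd = FluxExecutor.DEFAULT_LAUNCH_CMD.replace(
        "{flux} start",
        "{flux} start -o,-Slog-filename=flux.log",
    )
    executor = FluxExecutor(launch_cmd=custom_cmd)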

__init__(provider: Optional[parsl.providers.provider_base.ExecutionProvider] = None, managed: bool = True, working_dir: Optional[str] = None, label: str = 'FluxExecutor', flux_executor_kwargs: collections.abc.Mapping = {}, flux_path: Optional[str] = None, launch_cmd: Optional[str] = None)

Initialize self. See help(type(self)) for accurate signature.

Methods

__init__([provider, managed, working_dir, …])

Initialize self.

create_monitoring_info(status)

Create a monitoring message for each block based on the poll status.

handle_errors(error_handler, status)

This method is called by the error management infrastructure after a status poll.

monitor_resources()

Should resource monitoring happen for tasks running on this executor?

scale_in(*args, **kwargs)

Scale in method.

scale_out()

Scale out method.

scaling_enabled()

Specify if scaling is enabled.

set_bad_state_and_fail_all(exception)

Allows external error handlers to mark this executor as irrecoverably bad and cause all tasks submitted to it now and in the future to fail.

shutdown([wait])

Shut down the executor, causing further calls to submit to fail.

start()

Called by the DFK to start the executor when the config is loaded.

status()

Return the status of all jobs/blocks currently known to this executor.

submit(func, resource_specification, *args, …)

Wrap a callable in a Flux job and submit it to Flux.

Attributes

DEFAULT_LAUNCH_CMD

bad_state_is_set

Returns true if this executor is in an irrecoverable error state.

error_management_enabled

Indicates whether worker error management is supported by this executor.

executor_exception

Returns an exception that indicates why this executor is in an irrecoverable state.

hub_address

Address to the Hub for monitoring.

hub_port

Port to the Hub for monitoring.

provider

run_dir

Path to the run directory.

status_polling_interval

Returns the interval, in seconds, at which the status method should be called.

tasks

Contains a dictionary mapping task IDs to the corresponding Future objects for all tasks that have been submitted to this executor.

DEFAULT_LAUNCH_CMD = '{flux} start {python} {manager} {protocol} {hostname} {port}'
label: str
scale_in(*args, **kwargs)

Scale in method.

Cause the executor to reduce the number of blocks by count.

Ideally, this method would simply take a resource object that provides the scaling methods, and scale_in itself would be a coroutine, since scaling tasks can be slow.

Returns

A list of block ids corresponding to the blocks that were removed.

scale_out()

Scale out method.

Ideally, this method would simply take a resource object that provides the scaling methods, and scale_out itself would be a coroutine, since scaling tasks can be slow.

Returns

A list of block ids corresponding to the blocks that were added.

scaling_enabled()

Specify if scaling is enabled.

Callers of ParslExecutors need to differentiate between plain executors and executors wrapped in a resource provider.

shutdown(wait=True)

Shut down the executor, causing further calls to submit to fail.

Parameters

wait – If True, do not return until all submitted Futures are done.
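
A minimal shutdown sketch, assuming executor is a started FluxExecutor instance:

    # Block until all outstanding futures complete, then tear down the
    # executor; any later submit() call will fail.
    executor.shutdown(wait=True)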

start()

Called by the DFK to start the executor when the config is loaded.

submit(func: collections.abc.Callable, resource_specification: Dict[str, Any], *args: Any, **kwargs: Any)

Wrap a callable in a Flux job and submit it to Flux.

Parameters
  • func – The callable to submit as a job to Flux

  • resource_specification

    A mapping defining the resources to allocate to the Flux job.

    Only the following keys are checked for:

    • num_tasks: the number of tasks to launch (MPI ranks for an MPI job), default 1

    • cores_per_task: cores per task, default 1

    • gpus_per_task: gpus per task, default 1

    • num_nodes: if > 0, evenly distribute the allocated cores/gpus across the given number of nodes. Does not give the job exclusive access to those nodes; this option only affects distribution.

  • args – positional arguments for the callable

  • kwargs – keyword arguments for the callable
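
As a closing sketch, here is a direct submit call using only the documented resource keys. In ordinary Parsl scripts the DataFlowKernel calls submit on your behalf (per-invocation resources are conventionally passed through an app's parsl_resource_specification keyword argument), so treat the direct call as illustrative, with executor assumed to be a started instance.

    def triple(x):
        return 3 * x

    # Only keys listed above are used; num_tasks stays at 1 so the
    # callable runs once and the future resolves to its return value.
    spec = {"num_tasks": 1, "cores_per_task": 2}
    future = executor.submit(triple, spec, 7)
    print(future.result())  # 21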