parsl.executors.FluxExecutor

class parsl.executors.FluxExecutor(provider: ExecutionProvider | None = None, working_dir: str | None = None, label: str = 'FluxExecutor', flux_executor_kwargs: Mapping = {}, flux_path: str | None = None, launch_cmd: str | None = None)[source]

Executor that uses Flux to schedule and run jobs.

Every callable submitted to the executor is wrapped into a Flux job.

This executor requires that there be a Flux installation available locally, and that it can be located either in PATH or through the flux_path argument.

Flux jobs are fairly heavyweight. As of Flux v0.25, a single Flux instance is (on many systems) capped at 50 jobs per second. As such, this executor is not a good fit for use-cases consisting of large numbers of small, fast jobs.

However, Flux is great at handling jobs with large resource requirements, and collections of jobs with varying resource requirements.

Note that due to vendor-specific extensions, on certain Cray machines like ALCF’s Theta or LANL’s Trinitite/Trinity, Flux cannot run applications that use the default MPI library. Generally the only workaround is to recompile with another MPI library like OpenMPI.

This executor acts as a sort of wrapper around a flux.job.FluxExecutor, which can be confusing since the wrapped and wrapper classes share the same name. Whenever possible, the underlying executor is referred to by its fully qualified name, flux.job.FluxExecutor.

Parameters:
  • working_dir (str) – Directory in which the executor should place its files, possibly overwriting existing files. If None, generate a unique directory.

  • label (str) – Label for this executor instance.

  • flux_handle_args (collections.abc.Sequence) – Positional arguments passed when creating the flux.Flux() handle, if any. The first positional argument, url, is provided by this executor.

  • flux_executor_kwargs (collections.abc.Mapping) – Keyword arguments to pass to the underlying flux.job.FluxExecutor() instance, if any. Note that the handle_args keyword argument is provided by this executor, in order to supply the URL of a remote Flux instance.

  • flux_path (str) – Path to flux installation to use, or None to search PATH for flux.

  • launch_cmd (str) – The command to use when launching the executor’s backend. The default command is available as the DEFAULT_LAUNCH_CMD attribute. The default command starts a new Flux instance, which may not be desirable if a Flux instance has already been provisioned (this is unlikely).
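
For illustration, a minimal configuration using this executor might look like the sketch below. The SlurmProvider settings (partition name, node counts) are assumptions for a hypothetical Slurm cluster, not recommendations; adapt them to the target system or omit the provider argument entirely.

    import parsl
    from parsl.config import Config
    from parsl.executors import FluxExecutor
    from parsl.providers import SlurmProvider

    config = Config(
        executors=[
            FluxExecutor(
                label="flux",
                flux_path=None,  # None: search PATH for the flux binary
                provider=SlurmProvider(
                    partition="debug",   # hypothetical partition name
                    nodes_per_block=2,
                    init_blocks=1,
                ),
            )
        ]
    )

    parsl.load(config)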

__init__(provider: ExecutionProvider | None = None, working_dir: str | None = None, label: str = 'FluxExecutor', flux_executor_kwargs: Mapping = {}, flux_path: str | None = None, launch_cmd: str | None = None)[source]

Methods

__init__([provider, working_dir, label, ...])

monitor_resources()

Should resource monitoring happen for tasks running on this executor?

shutdown([wait])

Shut down the executor, causing further calls to submit to fail.

start()

Called when the DFK starts the executor, which happens when the config is loaded.

submit(func, resource_specification, *args, ...)

Wrap a callable in a Flux job and submit it to Flux.

Attributes

DEFAULT_LAUNCH_CMD

hub_address

Address to the Hub for monitoring.

hub_port

Port to the Hub for monitoring.

label

monitoring_radio

Local radio for sending monitoring messages.

radio_mode

run_dir

Path to the run directory.

run_id

UUID for the enclosing DFK.

DEFAULT_LAUNCH_CMD = '{flux} start {python} {manager} {protocol} {hostname} {port}'[source]
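
The launch command is a format string whose placeholders ({flux}, {python}, {manager}, {protocol}, {hostname}, {port}) are filled in by the executor itself. As a sketch, a custom command can be built from the default; the srun prefix below is purely illustrative and site-specific, not a recommendation.

    from parsl.executors import FluxExecutor

    # Prepend a hypothetical site-specific launcher while keeping the
    # placeholders that FluxExecutor substitutes when it starts its backend.
    custom_cmd = "srun --nodes=2 " + FluxExecutor.DEFAULT_LAUNCH_CMD
    executor = FluxExecutor(launch_cmd=custom_cmd)
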
shutdown(wait=True)[source]

Shut down the executor, causing further calls to submit to fail.

Parameters:

wait – If True, do not return until all submitted Futures are done.

start()[source]

Called when the DFK starts the executor, which happens when the config is loaded.

submit(func: Callable, resource_specification: Dict[str, Any], *args: Any, **kwargs: Any)[source]

Wrap a callable in a Flux job and submit it to Flux.

Parameters:
  • func – The callable to submit as a job to Flux

  • resource_specification

    A mapping defining the resources to allocate to the Flux job.

    Only the following keys are checked for:

    • num_tasks: the number of tasks to launch (MPI ranks for an MPI job), default 1

    • cores_per_task: cores per task, default 1

    • gpus_per_task: gpus per task, default 1

    • num_nodes: if > 0, evenly distribute the allocated cores/gpus across the given number of nodes. Does not give the job exclusive access to those nodes; this option only affects distribution.

  • args – positional arguments for the callable

  • kwargs – keyword arguments for the callable
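
A sketch of calling submit directly, assuming executor is a FluxExecutor that has already been started (normally the DataFlowKernel does this when the config is loaded). The simulate function and the specific resource numbers are hypothetical.

    def simulate(steps):
        # hypothetical workload
        return steps * 2

    # Only the documented keys are checked for; other keys are ignored.
    spec = {
        "num_tasks": 4,       # e.g. 4 MPI ranks
        "cores_per_task": 2,
        "num_nodes": 2,       # distribute the tasks across 2 nodes
    }

    future = executor.submit(simulate, spec, 100)
    print(future.result())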