parsl.dataflow.dflow.DataFlowKernel

class parsl.dataflow.dflow.DataFlowKernel(config=Config(app_cache=True, checkpoint_files=None, checkpoint_mode=None, checkpoint_period=None, executors=[ThreadPoolExecutor(label='threads', managed=True, max_threads=2, storage_access=None, thread_name_prefix='', working_dir=None)], garbage_collect=True, initialize_logging=True, internal_tasks_max_threads=10, max_idletime=120.0, monitoring=None, retries=0, retry_handler=None, run_dir='runinfo', strategy='simple', usage_tracking=False))[source]

The DataFlowKernel adds dependency awareness to an existing executor.

It is responsible for managing futures, such that when dependencies are resolved, pending tasks move to the runnable state.

Here is a simplified diagram of what happens internally:

 User             |        DFK         |    Executor
----------------------------------------------------------
                  |                    |
       Task-------+> +Submit           |
     App_Fu<------+--|                 |
                  |  Dependencies met  |
                  |         task-------+--> +Submit
                  |        Ex_Fu<------+----|
__init__(config=Config(app_cache=True, checkpoint_files=None, checkpoint_mode=None, checkpoint_period=None, executors=[ThreadPoolExecutor(label='threads', managed=True, max_threads=2, storage_access=None, thread_name_prefix='', working_dir=None)], garbage_collect=True, initialize_logging=True, internal_tasks_max_threads=10, max_idletime=120.0, monitoring=None, retries=0, retry_handler=None, run_dir='runinfo', strategy='simple', usage_tracking=False))[source]

Initialize the DataFlowKernel.

Parameters

config (Config) – A specification of all configuration options. For more details see the :class:~`parsl.config.Config` documentation.

Methods

__init__([config])

Initialize the DataFlowKernel.

add_executors(executors)

atexit_cleanup()

check_staging_inhibited(kwargs)

checkpoint([tasks])

Checkpoint the dfk incrementally to a checkpoint file.

cleanup()

DataFlowKernel cleanup.

handle_app_update(task_record, future)

This function is called as a callback when an AppFuture is in its final state.

handle_exec_update(task_record, future)

This function is called only as a callback from an execution attempt reaching a final state (either successfully or failing).

handle_join_update(task_record, inner_app_future)

launch_if_ready(task_record)

launch_if_ready will launch the specified task, if it is ready to run (for example, without dependencies, and in pending state).

launch_task(task_record, executable, *args, …)

Handle the actual submission of the task to the executor layer.

load_checkpoints(checkpointDirs)

Load checkpoints from the checkpoint files into a dictionary.

log_task_states()

sanitize_and_wrap(args, kwargs)

This function should be called when all dependencies have completed.

submit(func, app_args[, executors, cache, …])

Add task to the dataflow system.

wait_for_current_tasks()

Waits for all tasks in the task list to be completed, by waiting for their AppFuture to be completed.

wipe_task(task_id)

Remove task with task_id from the internal tasks table

Attributes

config

Returns the fully initialized config that the DFK is actively using.

add_executors(executors)[source]
atexit_cleanup()[source]
static check_staging_inhibited(kwargs)[source]
checkpoint(tasks=None)[source]

Checkpoint the dfk incrementally to a checkpoint file.

When called, every task that has been completed yet not checkpointed is checkpointed to a file.

Kwargs:
  • tasks (List of task ids)List of task ids to checkpoint. Default=None

    if set to None, we iterate over all tasks held by the DFK.

Note

Checkpointing only works if memoization is enabled

Returns

Checkpoint dir if checkpoints were written successfully. By default the checkpoints are written to the RUNDIR of the current run under RUNDIR/checkpoints/{tasks.pkl, dfk.pkl}

cleanup()[source]

DataFlowKernel cleanup.

This involves releasing all resources explicitly.

If the executors are managed by the DFK, then we call scale_in on each of the executors and call executor.shutdown. Otherwise, executor cleanup is left to the user.

property config[source]

Returns the fully initialized config that the DFK is actively using.

Returns

  • config (dict)

handle_app_update(task_record, future)[source]

This function is called as a callback when an AppFuture is in its final state.

It will trigger post-app processing such as checkpointing.

Parameters
  • task_record – Task record

  • future (Future) – The relevant app future (which should be consistent with the task structure ‘app_fu’ entry

handle_exec_update(task_record, future)[source]

This function is called only as a callback from an execution attempt reaching a final state (either successfully or failing).

It will launch retries if necessary, and update the task structure.

Parameters
  • task_record (dict) – Task record

  • future (Future) – The future object corresponding to the task which

  • this callback (makes) –

handle_join_update(task_record, inner_app_future)[source]
launch_if_ready(task_record)[source]

launch_if_ready will launch the specified task, if it is ready to run (for example, without dependencies, and in pending state).

This should be called by any piece of the DataFlowKernel that thinks a task may have become ready to run.

It is not an error to call launch_if_ready on a task that is not ready to run - launch_if_ready will not incorrectly launch that task.

launch_if_ready is thread safe, so may be called from any thread or callback.

launch_task(task_record, executable, *args, **kwargs)[source]

Handle the actual submission of the task to the executor layer.

If the app task has the executors attributes not set (default==’all’) the task is launched on a randomly selected executor from the list of executors. This behavior could later be updated to support binding to executors based on user specified criteria.

If the app task specifies a particular set of executors, it will be targeted at those specific executors.

Parameters
  • task_record – The task record

  • executable (callable) – A callable object

  • args (list of positional args) –

  • kwargs (arbitrary keyword arguments) –

Returns

Future that tracks the execution of the submitted executable

load_checkpoints(checkpointDirs)[source]

Load checkpoints from the checkpoint files into a dictionary.

The results are used to pre-populate the memoizer’s lookup_table

Kwargs:
  • checkpointDirs (list) : List of run folder to use as checkpoints Eg. [‘runinfo/001’, ‘runinfo/002’]

Returns

  • dict containing, hashed -> future mappings

log_task_states()[source]
sanitize_and_wrap(args, kwargs)[source]

This function should be called when all dependencies have completed.

It will rewrite the arguments for that task, replacing each Future with the result of that future.

If the user hid futures a level below, we will not catch it, and will (most likely) result in a type error.

Parameters
  • args (List) – Positional args to app function

  • kwargs (Dict) – Kwargs to app function

Returns

a rewritten args list a rewritten kwargs dict pairs of exceptions, task ids from any Futures which stored exceptions rather than results.

submit(func, app_args, executors='all', cache=False, ignore_for_cache=None, app_kwargs={}, join=False)[source]

Add task to the dataflow system.

If the app task has the executors attributes not set (default==’all’) the task will be launched on a randomly selected executor from the list of executors. If the app task specifies a particular set of executors, it will be targeted at the specified executors.

Parameters

func (-) – A function object

KWargs :
  • app_args : Args to the function

  • executors (list or string)List of executors this call could go to.

    Default=’all’

  • cache (Bool) : To enable memoization or not

  • ignore_for_cache (list) : List of kwargs to be ignored for memoization/checkpointing

  • app_kwargs (dict) : Rest of the kwargs to the fn passed as dict.

Returns

(AppFuture) [DataFutures,]

wait_for_current_tasks()[source]

Waits for all tasks in the task list to be completed, by waiting for their AppFuture to be completed. This method will not necessarily wait for any tasks added after cleanup has started (such as data stageout?)

wipe_task(task_id)[source]

Remove task with task_id from the internal tasks table