parsl.dataflow.dflow.DataFlowKernel

class parsl.dataflow.dflow.DataFlowKernel(config=Config(app_cache=True, checkpoint_files=None, checkpoint_mode=None, checkpoint_period=None, executors=[ThreadPoolExecutor(label='threads', managed=True, max_threads=2, storage_access=None, thread_name_prefix='', working_dir=None)], garbage_collect=True, initialize_logging=True, internal_tasks_max_threads=10, max_idletime=120.0, monitoring=None, retries=0, retry_handler=None, run_dir='runinfo', strategy='simple', usage_tracking=False))[source]

The DataFlowKernel adds dependency awareness to an existing executor.

It is responsible for managing futures, such that when dependencies are resolved, pending tasks move to the runnable state.

Here is a simplified diagram of what happens internally:

 User             |        DFK         |    Executor
----------------------------------------------------------
                  |                    |
       Task-------+> +Submit           |
     App_Fu<------+--|                 |
                  |  Dependencies met  |
                  |         task-------+--> +Submit
                  |        Ex_Fu<------+----|
__init__(config=Config(app_cache=True, checkpoint_files=None, checkpoint_mode=None, checkpoint_period=None, executors=[ThreadPoolExecutor(label='threads', managed=True, max_threads=2, storage_access=None, thread_name_prefix='', working_dir=None)], garbage_collect=True, initialize_logging=True, internal_tasks_max_threads=10, max_idletime=120.0, monitoring=None, retries=0, retry_handler=None, run_dir='runinfo', strategy='simple', usage_tracking=False))[source]

Initialize the DataFlowKernel.

Parameters

config (Config) – A specification of all configuration options. For more details see the :class:~`parsl.config.Config` documentation.

Methods

__init__([config])

Initialize the DataFlowKernel.

add_executors(executors)

atexit_cleanup()

check_staging_inhibited(kwargs)

checkpoint([tasks])

Checkpoint the dfk incrementally to a checkpoint file.

cleanup()

DataFlowKernel cleanup.

handle_app_update(task_record, future)

This function is called as a callback when an AppFuture is in its final state.

handle_exec_update(task_record, future)

This function is called only as a callback from an execution attempt reaching a final state (either successfully or failing).

handle_join_update(task_record, inner_app_future)

launch_if_ready(task_record)

launch_if_ready will launch the specified task, if it is ready to run (for example, without dependencies, and in pending state).

launch_task(task_record)

Handle the actual submission of the task to the executor layer.

load_checkpoints(checkpointDirs)

Load checkpoints from the checkpoint files into a dictionary.

log_task_states()

submit(func, app_args[, executors, cache, ...])

Add task to the dataflow system.

update_task_state(task_record, new_state)

Updates a task record state, and recording an appropriate change to task state counters.

wait_for_current_tasks()

Waits for all tasks in the task list to be completed, by waiting for their AppFuture to be completed.

wipe_task(task_id)

Remove task with task_id from the internal tasks table

Attributes

config

Returns the fully initialized config that the DFK is actively using.

add_executors(executors)[source]
atexit_cleanup() None[source]
static check_staging_inhibited(kwargs: Dict[str, Any]) bool[source]
checkpoint(tasks: Optional[Sequence[parsl.dataflow.taskrecord.TaskRecord]] = None) str[source]

Checkpoint the dfk incrementally to a checkpoint file.

When called, every task that has been completed yet not checkpointed is checkpointed to a file.

Kwargs:
  • tasks (List of task records)List of task ids to checkpoint. Default=None

    if set to None, we iterate over all tasks held by the DFK.

Note

Checkpointing only works if memoization is enabled

Returns

Checkpoint dir if checkpoints were written successfully. By default the checkpoints are written to the RUNDIR of the current run under RUNDIR/checkpoints/{tasks.pkl, dfk.pkl}

cleanup() None[source]

DataFlowKernel cleanup.

This involves releasing all resources explicitly.

If the executors are managed by the DFK, then we call scale_in on each of the executors and call executor.shutdown. Otherwise, executor cleanup is left to the user.

property config: parsl.config.Config[source]

Returns the fully initialized config that the DFK is actively using.

Returns

  • Config object

handle_app_update(task_record: parsl.dataflow.taskrecord.TaskRecord, future: parsl.dataflow.futures.AppFuture) None[source]

This function is called as a callback when an AppFuture is in its final state.

It will trigger post-app processing such as checkpointing.

Parameters
  • task_record – Task record

  • future (Future) – The relevant app future (which should be consistent with the task structure ‘app_fu’ entry

handle_exec_update(task_record: parsl.dataflow.taskrecord.TaskRecord, future: concurrent.futures._base.Future) None[source]

This function is called only as a callback from an execution attempt reaching a final state (either successfully or failing).

It will launch retries if necessary, and update the task structure.

Parameters
  • task_record (dict) – Task record

  • future (Future) – The future object corresponding to the task which

  • callback (makes this) –

handle_join_update(task_record: parsl.dataflow.taskrecord.TaskRecord, inner_app_future: parsl.dataflow.futures.AppFuture) None[source]
launch_if_ready(task_record: parsl.dataflow.taskrecord.TaskRecord) None[source]

launch_if_ready will launch the specified task, if it is ready to run (for example, without dependencies, and in pending state).

This should be called by any piece of the DataFlowKernel that thinks a task may have become ready to run.

It is not an error to call launch_if_ready on a task that is not ready to run - launch_if_ready will not incorrectly launch that task.

It is also not an error to call launch_if_ready on a task that has already been launched - launch_if_ready will not re-launch that task.

launch_if_ready is thread safe, so may be called from any thread or callback.

launch_task(task_record: parsl.dataflow.taskrecord.TaskRecord) concurrent.futures._base.Future[source]

Handle the actual submission of the task to the executor layer.

If the app task has the executors attributes not set (default==’all’) the task is launched on a randomly selected executor from the list of executors. This behavior could later be updated to support binding to executors based on user specified criteria.

If the app task specifies a particular set of executors, it will be targeted at those specific executors.

Parameters

task_record – The task record

Returns

Future that tracks the execution of the submitted executable

load_checkpoints(checkpointDirs)[source]

Load checkpoints from the checkpoint files into a dictionary.

The results are used to pre-populate the memoizer’s lookup_table

Kwargs:
  • checkpointDirs (list) : List of run folder to use as checkpoints Eg. [‘runinfo/001’, ‘runinfo/002’]

Returns

  • dict containing, hashed -> future mappings

log_task_states() None[source]
submit(func, app_args, executors='all', cache=False, ignore_for_cache=None, app_kwargs={}, join=False)[source]

Add task to the dataflow system.

If the app task has the executors attributes not set (default==’all’) the task will be launched on a randomly selected executor from the list of executors. If the app task specifies a particular set of executors, it will be targeted at the specified executors.

Parameters

func (-) – A function object

KWargs :
  • app_args : Args to the function

  • executors (list or string)List of executors this call could go to.

    Default=’all’

  • cache (Bool) : To enable memoization or not

  • ignore_for_cache (list) : List of kwargs to be ignored for memoization/checkpointing

  • app_kwargs (dict) : Rest of the kwargs to the fn passed as dict.

Returns

(AppFuture) [DataFutures,]

update_task_state(task_record: parsl.dataflow.taskrecord.TaskRecord, new_state: parsl.dataflow.states.States) None[source]

Updates a task record state, and recording an appropriate change to task state counters.

wait_for_current_tasks() None[source]

Waits for all tasks in the task list to be completed, by waiting for their AppFuture to be completed. This method will not necessarily wait for any tasks added after cleanup has started (such as data stageout?)

wipe_task(task_id: int) None[source]

Remove task with task_id from the internal tasks table