parsl.dataflow.dflow.DataFlowKernel
class parsl.dataflow.dflow.DataFlowKernel(config=Config(app_cache=True, checkpoint_files=None, checkpoint_mode=None, checkpoint_period=None, executors=[ThreadPoolExecutor(label='threads', managed=True, max_threads=2, storage_access=None, thread_name_prefix='', working_dir=None)], garbage_collect=True, initialize_logging=True, internal_tasks_max_threads=10, max_idletime=120.0, monitoring=None, retries=0, retry_handler=None, run_dir='runinfo', strategy='simple', usage_tracking=False))[source]

The DataFlowKernel adds dependency awareness to an existing executor.
It is responsible for managing futures, such that when dependencies are resolved, pending tasks move to the runnable state.
Here is a simplified diagram of what happens internally:
    User             |        DFK         |    Executor
    ----------------------------------------------------------
                     |                    |
          Task-------+> +Submit           |
        App_Fu<------+--|                 |
                     |  Dependencies met  |
                     |         task-------+--> +Submit
                     |        Ex_Fu<------+----|
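The pattern in the diagram can be sketched with standard-library futures. This is an illustration of the dependency mechanism only, not parsl's actual implementation; `submit_with_dependency` is a hypothetical helper standing in for the DFK:

```python
from concurrent.futures import Future, ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=2)

def submit_with_dependency(func, dep):
    """Return an 'app future' (App_Fu) that completes only after the
    dependency resolves and func has run on its result (Ex_Fu)."""
    app_fu = Future()  # handed back to the user immediately

    def on_dep_done(resolved):
        # Dependencies met: only now submit the real task to the executor.
        exec_fu = executor.submit(func, resolved.result())
        exec_fu.add_done_callback(lambda f: app_fu.set_result(f.result()))

    dep.add_done_callback(on_dep_done)
    return app_fu

first = executor.submit(lambda: 3)                       # upstream task
second = submit_with_dependency(lambda x: x + 1, first)  # waits on first
```

The user holds `second` from the moment of submission, even though the underlying task is not given to the executor until `first` completes.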
__init__(config=Config(app_cache=True, checkpoint_files=None, checkpoint_mode=None, checkpoint_period=None, executors=[ThreadPoolExecutor(label='threads', managed=True, max_threads=2, storage_access=None, thread_name_prefix='', working_dir=None)], garbage_collect=True, initialize_logging=True, internal_tasks_max_threads=10, max_idletime=120.0, monitoring=None, retries=0, retry_handler=None, run_dir='runinfo', strategy='simple', usage_tracking=False))[source]

Initialize the DataFlowKernel.
- Parameters
config (Config) – A specification of all configuration options. For more details see the parsl.config.Config documentation.
Methods

__init__([config]) – Initialize the DataFlowKernel.
add_executors(executors)
check_staging_inhibited(kwargs)
checkpoint([tasks]) – Checkpoint the dfk incrementally to a checkpoint file.
cleanup() – DataFlowKernel cleanup.
handle_app_update(task_record, future) – This function is called as a callback when an AppFuture is in its final state.
handle_exec_update(task_record, future) – This function is called only as a callback from an execution attempt reaching a final state (either successfully or failing).
handle_join_update(task_record, inner_app_future)
launch_if_ready(task_record) – launch_if_ready will launch the specified task, if it is ready to run (for example, without dependencies, and in pending state).
launch_task(task_record, executable, *args, …) – Handle the actual submission of the task to the executor layer.
load_checkpoints(checkpointDirs) – Load checkpoints from the checkpoint files into a dictionary.
sanitize_and_wrap(args, kwargs) – This function should be called when all dependencies have completed.
submit(func, app_args[, executors, cache, …]) – Add task to the dataflow system.
wait_for_current_tasks() – Waits for all tasks in the task list to be completed, by waiting for their AppFuture to be completed.
wipe_task(task_id) – Remove task with task_id from the internal tasks table.

Attributes

config – Returns the fully initialized config that the DFK is actively using.
checkpoint(tasks=None)[source]

Checkpoint the dfk incrementally to a checkpoint file.
When called, every task that has been completed yet not checkpointed is checkpointed to a file.
- Kwargs:
tasks (List of task ids) : List of task ids to checkpoint. Default=None; if set to None, all tasks held by the DFK are checkpointed.
Note
Checkpointing only works if memoization is enabled.
- Returns
Checkpoint dir if checkpoints were written successfully. By default the checkpoints are written to the RUNDIR of the current run under RUNDIR/checkpoints/{tasks.pkl, dfk.pkl}
cleanup()[source]

DataFlowKernel cleanup.
This involves releasing all resources explicitly.
If the executors are managed by the DFK, then we call scale_in on each of the executors and call executor.shutdown. Otherwise, executor cleanup is left to the user.
property config[source]

Returns the fully initialized config that the DFK is actively using.
- Returns
config (dict)
handle_app_update(task_record, future)[source]

This function is called as a callback when an AppFuture is in its final state.
It will trigger post-app processing such as checkpointing.
- Parameters
task_record – Task record
future (Future) – The relevant app future (which should be consistent with the task structure's 'app_fu' entry)
handle_exec_update(task_record, future)[source]

This function is called only as a callback from an execution attempt reaching a final state (either successfully or failing).
It will launch retries if necessary, and update the task structure.
- Parameters
task_record (dict) – Task record
future (Future) – The future object corresponding to the task for which this callback is made
launch_if_ready(task_record)[source]

launch_if_ready will launch the specified task, if it is ready to run (for example, without dependencies, and in pending state).
This should be called by any piece of the DataFlowKernel that thinks a task may have become ready to run.
It is not an error to call launch_if_ready on a task that is not ready to run - launch_if_ready will not incorrectly launch that task.
launch_if_ready is thread safe, so may be called from any thread or callback.
launch_task(task_record, executable, *args, **kwargs)[source]

Handle the actual submission of the task to the executor layer.

If the app task does not have its executors attribute set (default='all'), the task is launched on a randomly selected executor from the list of executors. This behavior could later be updated to support binding to executors based on user-specified criteria.
If the app task specifies a particular set of executors, it will be targeted at those specific executors.
- Parameters
task_record – The task record
executable (callable) – A callable object
args (list of positional args) –
kwargs (arbitrary keyword arguments) –
- Returns
Future that tracks the execution of the submitted executable
load_checkpoints(checkpointDirs)[source]

Load checkpoints from the checkpoint files into a dictionary.

The results are used to pre-populate the memoizer's lookup_table.
- Kwargs:
checkpointDirs (list) : List of run folders to use as checkpoints, e.g. ['runinfo/001', 'runinfo/002']
- Returns
dict containing hash -> future mappings
sanitize_and_wrap(args, kwargs)[source]

This function should be called when all dependencies have completed.
It will rewrite the arguments for that task, replacing each Future with the result of that future.
If the user has hidden futures a level below (for example, inside a data structure), we will not catch them, and this will (most likely) result in a type error.
- Parameters
args (List) – Positional args to app function
kwargs (Dict) – Kwargs to app function
- Returns
a rewritten args list
a rewritten kwargs dict
pairs of exceptions and task ids from any Futures which stored exceptions rather than results
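The rewriting step can be sketched with stdlib futures. This is a simplified illustration: it collects only the exceptions (the real method also pairs them with task ids), and it deliberately resolves only top-level Futures, matching the caveat above:

```python
from concurrent.futures import Future

def sanitize_and_wrap(args, kwargs):
    """Replace each top-level Future with its result; collect any
    exceptions from Futures that failed instead of succeeding."""
    exceptions = []

    def resolve(value):
        if isinstance(value, Future):
            if value.exception() is not None:
                exceptions.append(value.exception())
                return None       # placeholder for a failed dependency
            return value.result()
        return value              # deeper-nested futures pass through

    new_args = [resolve(a) for a in args]
    new_kwargs = {k: resolve(v) for k, v in kwargs.items()}
    return new_args, new_kwargs, exceptions
```

Because the function is only called once all dependencies have completed, `value.exception()` and `value.result()` never block here.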
submit(func, app_args, executors='all', cache=False, ignore_for_cache=None, app_kwargs={}, join=False)[source]

Add task to the dataflow system.

If the app task does not have its executors attribute set (default='all'), the task will be launched on a randomly selected executor from the list of executors. If the app task specifies a particular set of executors, it will be targeted at the specified executors.
- Parameters
func – A function object
- Kwargs:
app_args : Args to the function
executors (list or string) : List of executors this call could go to. Default='all'
cache (Bool) : Whether to enable memoization
ignore_for_cache (list) : List of kwargs to be ignored for memoization/checkpointing
app_kwargs (dict) : Rest of the kwargs to the fn, passed as a dict
- Returns
(AppFuture) [DataFutures,]
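The executor-selection rule described above can be sketched as follows. `pick_executor` and its arguments are hypothetical names mirroring the documented behavior, not parsl's internal code:

```python
import random

def pick_executor(task_executors, available_labels):
    """Choose the executor label a task should be sent to."""
    if task_executors == "all":
        # No preference given: pick at random from everything loaded.
        return random.choice(list(available_labels))
    # Otherwise target only the executors the app asked for.
    eligible = [e for e in available_labels if e in task_executors]
    if not eligible:
        raise ValueError("task requested executors that are not loaded")
    return random.choice(eligible)
```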