parsl.dataflow.dflow.DataFlowKernel
- class parsl.dataflow.dflow.DataFlowKernel(config: Config)[source]
The DataFlowKernel adds dependency awareness to an existing executor.
It is responsible for managing futures, such that when dependencies are resolved, pending tasks move to the runnable state.
Here is a simplified diagram of what happens internally:
User | DFK | Executor ---------------------------------------------------------- | | Task-------+> +Submit | App_Fu<------+--| | | Dependencies met | | task-------+--> +Submit | Ex_Fu<------+----|
- __init__(config: Config) None [source]
Initialize the DataFlowKernel.
- Parameters:
config (Config) – A specification of all configuration options. For more details see the :class:~`parsl.config.Config` documentation.
Methods
__init__
(config)Initialize the DataFlowKernel.
add_executors
(executors)check_staging_inhibited
(kwargs)checkpoint
([tasks])Checkpoint the dfk incrementally to a checkpoint file.
cleanup
()DataFlowKernel cleanup.
default_std_autopath
(taskrecord, kw)handle_app_update
(task_record, future)This function is called as a callback when an AppFuture is in its final state.
handle_exec_update
(task_record, future)This function is called only as a callback from an execution attempt reaching a final state (either successfully or failing).
handle_join_update
(task_record, inner_app_future)launch_if_ready
(task_record)Schedules a task record for re-inspection to see if it is ready for launch and for launch if it is ready.
launch_task
(task_record)Handle the actual submission of the task to the executor layer.
load_checkpoints
(checkpointDirs)Load checkpoints from the checkpoint files into a dictionary.
submit
(func, app_args, executors, cache, ...)Add task to the dataflow system.
update_task_state
(task_record, new_state)Updates a task record state, and recording an appropriate change to task state counters.
Waits for all tasks in the task list to be completed, by waiting for their AppFuture to be completed.
wipe_task
(task_id)Remove task with task_id from the internal tasks table
Attributes
Returns the fully initialized config that the DFK is actively using.
- add_executors(executors: Sequence[ParslExecutor]) None [source]
- checkpoint(tasks: Sequence[TaskRecord] | None = None) str [source]
Checkpoint the dfk incrementally to a checkpoint file.
When called, every task that has been completed yet not checkpointed is checkpointed to a file.
- Kwargs:
- tasks (List of task records)List of task ids to checkpoint. Default=None
if set to None, we iterate over all tasks held by the DFK.
Note
Checkpointing only works if memoization is enabled
- Returns:
Checkpoint dir if checkpoints were written successfully. By default the checkpoints are written to the RUNDIR of the current run under RUNDIR/checkpoints/{tasks.pkl, dfk.pkl}
- cleanup() None [source]
DataFlowKernel cleanup.
This involves releasing all resources explicitly.
We call scale_in on each of the executors and call executor.shutdown.
- property config: Config[source]
Returns the fully initialized config that the DFK is actively using.
- Returns:
Config object
- handle_app_update(task_record: TaskRecord, future: AppFuture) None [source]
This function is called as a callback when an AppFuture is in its final state.
It will trigger post-app processing such as checkpointing.
- Parameters:
task_record – Task record
future (Future) – The relevant app future (which should be consistent with the task structure ‘app_fu’ entry
- handle_exec_update(task_record: TaskRecord, future: Future) None [source]
This function is called only as a callback from an execution attempt reaching a final state (either successfully or failing).
It will launch retries if necessary, and update the task structure.
- Parameters:
task_record (dict) – Task record
future (Future) – The future object corresponding to the task which
callback (makes this) –
- launch_if_ready(task_record: TaskRecord) None [source]
Schedules a task record for re-inspection to see if it is ready for launch and for launch if it is ready. The call will return immediately.
This should be called by any piece of the DataFlowKernel that thinks a task may have become ready to run.
It is not an error to call launch_if_ready on a task that is not ready to run - launch_if_ready will not incorrectly launch that task.
launch_if_ready is thread safe, so may be called from any thread or callback.
- launch_task(task_record: TaskRecord) Future [source]
Handle the actual submission of the task to the executor layer.
- Parameters:
task_record – The task record
- Returns:
Future that tracks the execution of the submitted function
- load_checkpoints(checkpointDirs: Sequence[str] | None) Dict[str, Future] [source]
Load checkpoints from the checkpoint files into a dictionary.
The results are used to pre-populate the memoizer’s lookup_table
- Kwargs:
checkpointDirs (list) : List of run folder to use as checkpoints Eg. [‘runinfo/001’, ‘runinfo/002’]
- Returns:
dict containing, hashed -> future mappings
- submit(func: Callable, app_args: Sequence[Any], executors: str | Sequence[str], cache: bool, ignore_for_cache: Sequence[str] | None, app_kwargs: Dict[str, Any], join: bool = False) AppFuture [source]
Add task to the dataflow system.
If the app task has the executors attributes not set (default==’all’) the task will be launched on a randomly selected executor from the list of executors. If the app task specifies a particular set of executors, it will be targeted at the specified executors.
- Parameters:
func (-) – A function object
- KWargs :
app_args : Args to the function
- executors (list or string)List of executors this call could go to.
Default=’all’
cache (Bool) : To enable memoization or not
ignore_for_cache (sequence) : List of kwargs to be ignored for memoization/checkpointing
app_kwargs (dict) : Rest of the kwargs to the fn passed as dict.
- Returns:
(AppFuture) [DataFutures,]
- update_task_state(task_record: TaskRecord, new_state: States) None [source]
Updates a task record state, and recording an appropriate change to task state counters.