Checkpointing¶
Large-scale Parsl programs are likely to encounter errors due to node failures, application or environment errors, and myriad other issues. Parsl offers an application-level checkpointing model to improve resilience, fault tolerance, and efficiency.
Note
Checkpointing is only possible for apps which have app caching enabled.
If app caching is disabled in the config Config.app_cache
, checkpointing will
not work.
Parsl follows an incremental checkpointing model, where each checkpoint file contains all results that have been updated since the last checkpoint.
When a Parsl program loads a checkpoint file and is executed, it will use checkpointed results for any apps that have been previously executed. Like app caching, checkpoints use the hash of the app and the invocation input parameters to identify previously computed results. If multiple checkpoints exist for an app (with the same hash) the most recent entry will be used.
Parsl provides four checkpointing modes:
task_exit
: a checkpoint is created each time an app completes or fails (after retries if enabled). This mode minimizes the risk of losing information from completed tasks.>>> from parsl.configs.local_threads import config >>> config.checkpoint_mode = 'task_exit'
periodic
: a checkpoint is created periodically using a user-specified checkpointing interval. Results will be saved to the checkpoint file forall tasks that have completed during this period.
>>> from parsl.configs.local_threads import config >>> config.checkpoint_mode = 'periodic' >>> config.checkpoint_period = "01:00:00"
dfk_exit
: checkpoints are created when Parsl is about to exit. This reduces the risk of losing results due to premature program termination from exceptions, terminate signals, etc. However it is still possible that information might be lost if the program is terminated abruptly (machine failure, SIGKILL, etc.)>>> from parsl.configs.local_threads import config >>> config.checkpoint_mode = 'dfk_exit'
Manual: in addition to these automated checkpointing modes, it is also possible to manually initiate a checkpoint by calling
DataFlowKernel.checkpoint()
in the Parsl program code.>>> import parsl >>> from parsl.configs.local_threads import config >>> dfk = parsl.load(config) >>> .... >>> dfk.checkpoint()
In all cases the checkpoint file is written out to the runinfo/RUN_ID/checkpoint/
directory.
Note
Checkpoint modes periodic
, dfk_exit
, and manual
can interfere with garbage collection.
In these modes task information will be retained after completion, until checkpointing events are triggered.
Creating a checkpoint¶
Automated checkpointing must be explicitly enabled in the Parsl configuration.
There is no need to modify a Parsl program as checkpointing will occur transparently.
In the following example, checkpointing is enabled at task exit. The results of
each invocation of the slow_double
app will be stored in the checkpoint file.
import parsl
from parsl.app.app import python_app
from parsl.configs.local_threads import config
config.checkpoint_mode = 'task_exit'
parsl.load(config)
@python_app(cache=True)
def slow_double(x):
import time
time.sleep(5)
return x * 2
d = []
for i in range(5):
d.append(slow_double(i))
print([d[i].result() for i in range(5)])
Alternatively, manual checkpointing can be used to explictly specify when the checkpoint
file should be saved. The following example shows how manual checkpointing can be used.
Here, the dfk.checkpoint()
function will save the results of the prior invocations
of the slow_double
app.
import parsl
from parsl import python_app
from parsl.configs.local_threads import config
dfk = parsl.load(config)
@python_app(cache=True)
def slow_double(x, sleep_dur=1):
import time
time.sleep(sleep_dur)
return x * 2
N = 5 # Number of calls to slow_double
d = [] # List to store the futures
for i in range(0, N):
d.append(slow_double(i))
# Wait for the results
[i.result() for i in d]
cpt_dir = dfk.checkpoint()
print(cpt_dir) # Prints the checkpoint dir
Resuming from a checkpoint¶
When resuming a program from a checkpoint Parsl allows the user to select
which checkpoint file(s) to use.
Checkpoint files are stored in the runinfo/RUNID/checkpoint
directory.
The example below shows how to resume using all available checkpoints.
Here, the program re-executes the same calls to the slow_double
app
as above and instead of waiting for results to be computed, the values
from the checkpoint file are are immediately returned.
import parsl
from parsl.tests.configs.local_threads import config
from parsl.utils import get_all_checkpoints
config.checkpoint_files = get_all_checkpoints()
parsl.load(config)
# Rerun the same workflow
d = []
for i in range(5):
d.append(slow_double(i))
# wait for results
print([d[i].result() for i in range(5)])