`Developer Guide

Parsl is a Parallel Scripting Library, designed to enable efficient workflow execution.

Importing

To get all the required functionality, we suggest importing the library as follows:

>>> import parsl
>>> from parsl import *

Logging

Following the general logging philosophy of Python libraries, Parsl doesn’t log anything by default. However, the following helper functions are provided for logging:

  1. set_stream_logger
This attaches a StreamHandler to the logger. This is quite useful when working from a Jupyter notebook.
  2. set_file_logger
    This sets the logging to a file. This is ideal for reporting issues to the dev team.
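
For illustration, here is a minimal version of what such a stream-logging helper does, built on the standard logging module (a sketch, not parsl's implementation; the format string is an assumption):

```python
import logging

def set_stream_logger(name='parsl', level=logging.DEBUG, format_string=None):
    """Attach a StreamHandler to the named logger (sketch of the helper)."""
    if format_string is None:
        # Hypothetical default format; the real helper defines its own.
        format_string = "%(asctime)s %(name)s:%(lineno)d [%(levelname)s] %(message)s"
    logger = logging.getLogger(name)
    logger.setLevel(level)
    handler = logging.StreamHandler()
    handler.setLevel(level)
    handler.setFormatter(logging.Formatter(format_string))
    logger.addHandler(handler)
    return logger
```

set_file_logger follows the same pattern with a logging.FileHandler in place of the StreamHandler.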
parsl.set_stream_logger(name='parsl', level=10, format_string=None)[source]

Add a stream log handler.

Parameters:
  • name (string) – Set the logger name.
  • level (int) – Set to logging.DEBUG by default.
  • format_string (string) – Set to None by default.
Returns:

  • None

parsl.set_file_logger(filename, name='parsl', level=10, format_string=None)[source]

Add a file log handler.

Parameters:
  • filename (string) – Name of the file to write logs to
  • name (string) – Logger name
  • level (int) – Set the logging level
  • format_string (string) – Set the format string
Returns:

  • None

Apps

Apps are parallelized functions that execute independently of the control flow of the main Python interpreter. We have two main types of Apps: PythonApps and BashApps. These are subclassed from AppBase.

AppBase

This is the base class that defines the two external-facing functions that an App must define: __init__(), which is called when the interpreter sees the definition of the decorated function, and __call__(), which is invoked when the decorated function is called by the user.

class parsl.app.app.AppBase(func, data_flow_kernel=None, walltime=60, executors='all', cache=False, exec_type='bash')[source]

This is the base class that defines the two external facing functions that an App must define.

The __init__() is called when the interpreter sees the definition of the decorated function, and the __call__() is invoked when the decorated function is called by the user.
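
The decoration/call split can be sketched with a hypothetical `app` decorator (the names, the thread pool, and the decorator itself are illustrative, not parsl's API):

```python
from concurrent.futures import ThreadPoolExecutor, Future

class AppBase:
    """Sketch: __init__ runs when the decorated function is defined,
    __call__ runs when the user invokes it, returning a Future at once."""
    def __init__(self, func, executor):
        self.func = func
        self.executor = executor

    def __call__(self, *args, **kwargs) -> Future:
        # Launch asynchronously; control returns to the caller immediately.
        return self.executor.submit(self.func, *args, **kwargs)

_pool = ThreadPoolExecutor(max_workers=2)

def app(func):
    """Hypothetical decorator wiring a function to the sketch AppBase."""
    return AppBase(func, _pool)

@app
def double(x):
    return 2 * x

fut = double(21)  # returns a Future, not 42
```

Calling `fut.result()` then blocks until the task completes and yields the value.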

PythonApp

Concrete subclass of AppBase that implements the Python App functionality.

BashApp

Concrete subclass of AppBase that implements the Bash App functionality.

Futures

Futures are returned as proxies to a parallel execution initiated by a call to an App. We have two kinds of futures in Parsl: AppFutures and DataFutures.

AppFutures

class parsl.dataflow.futures.AppFuture(parent, tid=None, stdout=None, stderr=None)[source]

An AppFuture points at a Future returned from an Executor.

The AppFuture wraps that executor future, which may not yet exist at the time the AppFuture is created; the AppFuture is updated via callbacks once the executor future resolves.

__init__(parent, tid=None, stdout=None, stderr=None)[source]

Initialize the AppFuture.

Parameters:parent (Future) – The parent future, if one exists. A default value of None should be passed in if the app is not yet launched.
KWargs:
  • tid (int) : Task id; should be any unique identifier (currently an int).
  • stdout (str) : Stdout file of the app.
    Default: None
  • stderr (str) : Stderr file of the app.
    Default: None
__repr__()[source]

Return repr(self).

add_done_callback(fn)[source]

Attaches a callable that will be called when the future finishes.

Parameters:fn – A callable that will be called with this future as its only argument when the future completes or is cancelled. The callable will always be called by a thread in the same process in which it was added. If the future has already completed or been cancelled then the callable will be called immediately. These callables are called in the order that they were added.
cancel()[source]

Cancel the future if possible.

Returns True if the future was cancelled, False otherwise. A future cannot be cancelled if it is running or has already completed.

cancelled()[source]

Return True if the future was cancelled.

done()[source]

Check if the future is done.

If a parent is set, we return the status of the parent; otherwise no parent is assigned, meaning the status is False.

Returns:
  • True : The future has successfully resolved.
  • False : Pending resolution.
exception(timeout=None)[source]

Return the exception raised by the call that the future represents.

Parameters:

timeout – The number of seconds to wait for the exception if the future isn’t done. If None, then there is no limit on the wait time.

Returns:

The exception raised by the call that the future represents or None if the call completed without raising.

Raises:
  • CancelledError – If the future was cancelled.
  • TimeoutError – If the future didn’t finish executing before the given timeout.
parent_callback(executor_fu)[source]

Callback from executor future to update the parent.

Parameters:executor_fu (-) – Future returned by the executor along with callback
Returns:
  • None

Updates the super() with the result() or exception()

result(timeout=None)[source]

Waits for and returns the result of the AppFuture.

Kwargs:
  • timeout (int): Timeout in seconds

running()[source]

Return True if the future is currently executing.

update_parent(fut)[source]

Add a callback to the parent to update the state.

This handles the case where the user has called result on the AppFuture before the parent exists.

DataFutures

class parsl.app.futures.DataFuture(fut, file_obj, parent=None, tid=None)[source]

A datafuture points at an AppFuture.

We simply wrap an AppFuture, adding the specific case that if the future is resolved (i.e. the file exists), then the DataFuture is considered resolved.

__init__(fut, file_obj, parent=None, tid=None)[source]

Construct the DataFuture object.

If the file_obj is a string convert to a File.

Parameters:
  • fut (-) – AppFuture that this DataFuture will track
  • file_obj (-) – Something representing file(s)
Kwargs:
  • parent ()
  • tid (task_id) : Task id that this DataFuture tracks
__repr__()[source]

Return repr(self).

add_done_callback(fn)[source]

Attaches a callable that will be called when the future finishes.

Parameters:fn – A callable that will be called with this future as its only argument when the future completes or is cancelled. The callable will always be called by a thread in the same process in which it was added. If the future has already completed or been cancelled then the callable will be called immediately. These callables are called in the order that they were added.
cancel()[source]

Cancel the task that this DataFuture is tracking.

Note: This may not work

cancelled()[source]

Return True if the future was cancelled.

done()[source]

Return True if the future was cancelled or finished executing.

exception(timeout=None)[source]

Return the exception raised by the call that the future represents.

Parameters:

timeout – The number of seconds to wait for the exception if the future isn’t done. If None, then there is no limit on the wait time.

Returns:

The exception raised by the call that the future represents or None if the call completed without raising.

Raises:
  • CancelledError – If the future was cancelled.
  • TimeoutError – If the future didn’t finish executing before the given timeout.
filename[source]

Filepath of the File object this datafuture represents.

filepath[source]

Filepath of the File object this datafuture represents.

parent_callback(parent_fu)[source]

Callback from executor future to update the parent.

Parameters:parent_fu (-) – Future returned by the executor along with callback
Returns:
  • None

Updates the super() with the result() or exception()

result(timeout=None)[source]

A blocking call that returns either the result or raises an exception.

Assumptions : A DataFuture always has a parent AppFuture. The AppFuture does callbacks when setup.

Kwargs:
  • timeout (int): Timeout in seconds
Returns:

  • If App completed successfully returns the filepath.

Raises:
  • Exception raised by app if failed.
running()[source]

Return True if the future is currently executing.

tid[source]

Returns the task_id of the task that will resolve this DataFuture.
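
The DataFuture idea can be sketched with only concurrent.futures (class name and behavior simplified; the real class also tracks the File object and its path):

```python
from concurrent.futures import Future

class DataFutureSketch(Future):
    """Sketch: resolves to its file object once the parent app future completes."""
    def __init__(self, fut, file_obj, tid=None):
        super().__init__()
        self.tid = tid
        self.file_obj = file_obj
        # When the app future finishes, mirror its outcome onto this future.
        fut.add_done_callback(self.parent_callback)

    def parent_callback(self, parent_fu):
        exc = parent_fu.exception()
        if exc is None:
            self.set_result(self.file_obj)  # app done -> file assumed present
        else:
            self.set_exception(exc)
```

A DataFuture thus never completes before its parent AppFuture, and raising the app's exception on `result()` falls out of the standard Future machinery.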

Exceptions

class parsl.app.errors.ParslError[source]

Base class for all exceptions.

Only to be invoked when a more specific error is not available.

class parsl.app.errors.NotFutureError[source]

A non future item was passed to a function that expected a future.

This is basically a type error.

class parsl.app.errors.InvalidAppTypeError[source]

An invalid app type was requested from the @App decorator.

class parsl.app.errors.AppException[source]

An error raised during execution of an app.

What this exception contains depends entirely on context

class parsl.app.errors.AppBadFormatting(reason, exitcode, retries=None)[source]

An error raised during formatting of a bash function.

What this exception contains depends entirely on context. Contains: reason (string), exitcode (int), retries (int/None)

class parsl.app.errors.AppFailure(reason, exitcode, retries=None)[source]

An error raised during execution of an app.

What this exception contains depends entirely on context. Contains: reason (string), exitcode (int), retries (int/None)

class parsl.app.errors.MissingOutputs(reason, outputs)[source]

Error raised at the end of app execution due to missing output files.

Contains: reason(string) outputs(List of strings/files..)

class parsl.app.errors.DependencyError(dependent_exceptions, reason, outputs)[source]

Error raised when an app cannot run because one or more of its dependencies failed.

Contains: dependent_exceptions, reason (string), outputs (list of strings/files)

class parsl.dataflow.error.DataFlowException[source]

Base class for all exceptions.

Only to be invoked when a more specific error is not available.

class parsl.dataflow.error.DuplicateTaskError[source]

Raised by the DataFlowKernel when it finds that a job with the same task-id has been launched before.

class parsl.dataflow.error.MissingFutError[source]

Raised when a particular future is not found within the dataflowkernel’s datastructures.

Deprecated.

DataFlowKernel

class parsl.dataflow.dflow.DataFlowKernel(config=Config( app_cache=True, checkpoint_files=None, checkpoint_mode=None, checkpoint_period=None, data_management_max_threads=10, db_logger_config=None, executors=[ThreadPoolExecutor( label='threads', managed=True, max_threads=2, storage_access=[], thread_name_prefix='', working_dir=None )], lazy_errors=True, retries=0, run_dir='runinfo', strategy='simple', usage_tracking=True ))[source]

The DataFlowKernel adds dependency awareness to an existing executor.

It is responsible for managing futures, such that when dependencies are resolved, pending tasks move to the runnable state.

Here is a simplified diagram of what happens internally:

 User             |        DFK         |    Executor
----------------------------------------------------------
                  |                    |
       Task-------+> +Submit           |
     App_Fu<------+--|                 |
                  |  Dependencies met  |
                  |         task-------+--> +Submit
                  |        Ex_Fu<------+----|
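
The dependency handling in the diagram can be sketched with stdlib futures (`submit_with_deps` is a hypothetical name, not the DFK API; error propagation is omitted for brevity):

```python
from concurrent.futures import ThreadPoolExecutor, Future

_pool = ThreadPoolExecutor(max_workers=2)

def submit_with_deps(func, *args):
    """Sketch of the DFK pattern: the task launches only after every
    Future among its args has resolved."""
    app_fu = Future()  # handed to the user immediately

    def launch():
        resolved = [a.result() if isinstance(a, Future) else a for a in args]
        ex_fu = _pool.submit(func, *resolved)
        ex_fu.add_done_callback(lambda f: app_fu.set_result(f.result()))

    deps = [a for a in args if isinstance(a, Future)]
    if not deps:
        launch()
    else:
        remaining = {'count': len(deps)}
        def dep_done(_):
            remaining['count'] -= 1
            if remaining['count'] == 0:
                launch()  # dependencies met: pending -> runnable
        for d in deps:
            d.add_done_callback(dep_done)
    return app_fu

a = submit_with_deps(lambda: 3)
b = submit_with_deps(lambda x: x + 4, a)  # b stays pending until a resolves
```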
__init__(config=Config( app_cache=True, checkpoint_files=None, checkpoint_mode=None, checkpoint_period=None, data_management_max_threads=10, db_logger_config=None, executors=[ThreadPoolExecutor( label='threads', managed=True, max_threads=2, storage_access=[], thread_name_prefix='', working_dir=None )], lazy_errors=True, retries=0, run_dir='runinfo', strategy='simple', usage_tracking=True ))[source]

Initialize the DataFlowKernel.

Parameters:config (Config) – A specification of all configuration options. For more details see the parsl.config.Config documentation.
__weakref__[source]

list of weak references to the object (if defined)

checkpoint(tasks=None)[source]

Checkpoint the dfk incrementally to a checkpoint file.

When called, every task that has been completed but not yet checkpointed is checkpointed to a file.

Kwargs:
  • tasks (List of task ids) : List of task ids to checkpoint. Default=None
    if set to None, we iterate over all tasks held by the DFK.

Note

Checkpointing only works if memoization is enabled

Returns:Checkpoint dir if checkpoints were written successfully. By default the checkpoints are written to the RUNDIR of the current run under RUNDIR/checkpoints/{tasks.pkl, dfk.pkl}
cleanup()[source]

DataFlowKernel cleanup.

This involves killing resources explicitly and sending die messages to IPP workers.

If the executors are managed (created by the DFK), then we call scale_in on each of the executors and call executor.shutdown. Otherwise, we do nothing, and executor cleanup is left to the user.

config[source]

Returns the fully initialized config that the DFK is actively using.

DO NOT update.

Returns:
  • config (dict)
handle_update(task_id, future, memo_cbk=False)[source]

This function is called only as a callback from a task being done.

Move done task from runnable -> done Move newly doable tasks from pending -> runnable , and launch

Parameters:
  • task_id (string) – Task id, which is a uuid string
  • future (Future) – The future object corresponding to the task which makes this callback
KWargs:
memo_cbk(Bool) : Indicates that the call is coming from a memo update, that does not require additional memo updates.
launch_task(task_id, executable, *args, **kwargs)[source]

Handle the actual submission of the task to the executor layer.

If the app task does not have its executors attribute set (default == ’all’), the task is launched on a randomly selected executor from the list of executors. This behavior could later be updated to support binding to executors based on user-specified criteria.

If the app task specifies a particular set of executors, it will be targeted at those specific executors.

Parameters:
  • task_id (uuid string) – A uuid string that uniquely identifies the task
  • executable (callable) – A callable object
  • args (list of positional args) –
  • kwargs (arbitrary keyword arguments) –
Returns:

Future that tracks the execution of the submitted executable

load_checkpoints(checkpointDirs)[source]

Load checkpoints from the checkpoint files into a dictionary.

The results are used to pre-populate the memoizer’s lookup_table

Kwargs:
  • checkpointDirs (list) : List of run folders to use as checkpoints, e.g. [‘runinfo/001’, ‘runinfo/002’]
Returns:
  • dict containing hash -> future mappings
sanitize_and_wrap(task_id, args, kwargs)[source]

This function should be called ONLY when all the futures we track have been resolved.

If the user hid futures a level below, we will not catch it, and it will (most likely) result in a type error.

Parameters:
  • task_id (uuid str) – Task id
  • func (Function) – App function
  • args (List) – Positional args to app function
  • kwargs (Dict) – Kwargs to app function
Returns:

partial Function evaluated with all dependencies in args, kwargs and kwargs[‘inputs’] evaluated.
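
The substitution this performs can be sketched as follows, assuming (as the docstring requires) that all tracked futures have already resolved:

```python
from concurrent.futures import Future

def sanitize_and_wrap(args, kwargs):
    """Sketch: replace each resolved Future in args/kwargs with its result.
    Assumes every Future passed in has already completed."""
    new_args = [a.result() if isinstance(a, Future) else a for a in args]
    new_kwargs = {k: v.result() if isinstance(v, Future) else v
                  for k, v in kwargs.items()}
    return new_args, new_kwargs
```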

submit(func, *args, executors='all', fn_hash=None, cache=False, **kwargs)[source]

Add task to the dataflow system.

If the app task does not have its executors attribute set (default == ’all’), the task will be launched on a randomly selected executor from the list of executors. If the app task specifies a particular set of executors, it will be targeted at the specified executors.

>>> IF all deps are met:
>>>   send to the runnable queue and launch the task
>>> ELSE:
>>>   post the task in the pending queue
Parameters:
  • func (-) – A function object
  • *args (-) –

    Args to the function

KWargs :
  • executors (list or string) : List of executors this call could go to.
    Default=’all’
  • fn_hash (Str) : Hash of the function and inputs
    Default=None
  • cache (Bool) : To enable memoization or not
  • kwargs (dict) : Rest of the kwargs to the fn passed as dict.
Returns: AppFuture, with a list of DataFutures for any specified outputs.

Executors

Executors are abstractions that represent available compute resources to which you could submit arbitrary App tasks. An executor initialized with an Execution Provider can dynamically scale with the resource requirements of the workflow.

We currently have thread pools for local execution, remote workers from ipyparallel for executing on high throughput systems such as campus clusters, and a Swift/T executor for HPC systems.

ParslExecutor (Abstract Base Class)

class parsl.executors.base.ParslExecutor[source]

Define the strict interface for all Executor classes.

This is a metaclass that only enforces concrete implementations of functionality by the child classes.

In addition to the listed methods, a ParslExecutor instance must always have a member field:

label: str - a human readable label for the executor, unique
with respect to other executors.
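
The enforced interface can be sketched as an abstract base class (illustrative only, not the actual parsl source):

```python
from abc import ABCMeta, abstractmethod

class ExecutorInterface(metaclass=ABCMeta):
    """Sketch of the interface ParslExecutor enforces on child classes."""

    label: str = "base"  # human-readable, unique among executors

    @abstractmethod
    def submit(self, *args, **kwargs):
        """Submit work for execution; returns a Future."""

    @abstractmethod
    def scale_out(self, *args, **kwargs):
        """Add blocks of resources."""

    @abstractmethod
    def scale_in(self, count):
        """Reduce the number of blocks by count."""

    @property
    @abstractmethod
    def scaling_enabled(self):
        """Whether this executor supports dynamic scaling."""
```

A subclass that fails to implement any of these cannot be instantiated, which is how the "strict interface" is enforced.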
__init__[source]

Initialize self. See help(type(self)) for accurate signature.

scale_in(count)[source]

Scale in method.

Cause the executor to reduce the number of blocks by count.

We should have the scale in method simply take resource object which will have the scaling methods, scale_in itself should be a coroutine, since scaling tasks can be slow.

scale_out(*args, **kwargs)[source]

Scale out method.

We should have the scale out method simply take resource object which will have the scaling methods, scale_out itself should be a coroutine, since scaling tasks can be slow.

scaling_enabled[source]

Specify if scaling is enabled.

The callers of ParslExecutors need to differentiate between Executors and Executors wrapped in a resource provider

submit(*args, **kwargs)[source]

Submit.

We haven’t yet decided on what the args to this can be, whether it should just be func, args, kwargs or be the partially evaluated fn

ThreadPoolExecutor

class parsl.executors.threads.ThreadPoolExecutor(label='threads', max_threads=2, thread_name_prefix='', storage_access=None, working_dir=None, managed=True)[source]

A thread-based executor.

Parameters:
  • max_threads (int) – Number of threads. Default is 2.
  • thread_name_prefix (string) – Thread name prefix (only supported in python v3.6+).
  • storage_access (list of Scheme) – Specifications for accessing data remotely for this executor. Multiple `Scheme`s are not yet supported.
  • managed (bool) – If True, parsl will control dynamic scaling of this executor and be responsible for its management. Otherwise, this is managed by the user.
__init__(label='threads', max_threads=2, thread_name_prefix='', storage_access=None, working_dir=None, managed=True)[source]

Initialize self. See help(type(self)) for accurate signature.

scale_in(blocks)[source]

Scale in the number of active blocks by specified amount.

This method is not implemented for threads and will raise an error if called.

Raises:NotImplementedError
scale_out(workers=1)[source]

Scales out the number of active workers by 1.

This method is not implemented for threads and will raise an error if called.

Raises:NotImplementedError
scaling_enabled[source]

Specify if scaling is enabled.

The callers of ParslExecutors need to differentiate between Executors and Executors wrapped in a resource provider

submit(*args, **kwargs)[source]

Submits work to the thread pool.

This method is a simple pass-through and behaves like the submit call described in the Python concurrent.futures documentation.
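
Because this executor is a pass-through to a standard thread pool, usage mirrors concurrent.futures directly:

```python
from concurrent.futures import ThreadPoolExecutor

# A pool like the one the executor delegates to; submit returns a
# standard concurrent.futures.Future.
pool = ThreadPoolExecutor(max_workers=2, thread_name_prefix='parsl-')
fut = pool.submit(pow, 2, 10)
```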

IPyParallelExecutor

class parsl.executors.ipp.IPyParallelExecutor(provider=LocalProvider( channel=LocalChannel( envs={}, script_dir='/home/docs/checkouts/readthedocs.org/user_builds/parsl/checkouts/latest/docs/.scripts', userhome='/home/docs/checkouts/readthedocs.org/user_builds/parsl/checkouts/latest/docs' ), init_blocks=4, label='local', launcher=<libsubmit.launchers.launchers.SingleNodeLauncher object>, max_blocks=10, min_blocks=0, nodes_per_block=1, parallelism=1, script_dir='parsl_scripts', tasks_per_node=1, walltime='00:15:00' ), label='ipp', engine_file='~/.ipython/profile_default/security/ipcontroller-engine.json', engine_dir='.', working_dir=None, controller=Controller( interfaces=None, ipython_dir='~/.ipython', log=True, mode='auto', port=None, port_range=None, profile='default', public_ip=None, reuse=False ), container_image=None, storage_access=None, engine_debug_level=None, managed=True)[source]

The IPython Parallel executor.

This executor uses IPythonParallel’s pilot execution system to manage multiple processes running locally or remotely.

Parameters:
  • provider (ExecutionProvider) – Provider to access computation resources. Can be one of EC2Provider, AzureProvider, Cobalt, Condor, GoogleCloud, GridEngine, Jetstream, Local, Slurm, or Torque.
  • label (str) – Label for this executor instance.
  • controller (Controller) – Which Controller instance to use. Default is Controller().
  • container_image (str) – Launch tasks in a container using this docker image. If set to None, no container is used. Default is None.
  • engine_file (str) – Path to json engine file that will be used to compose ipp launch commands at scaling events. Default is ‘~/.ipython/profile_default/security/ipcontroller-engine.json’.
  • engine_dir (str) – Alternative to above, specify the engine_dir
  • working_dir (str) – Directory where input data should be staged to.
  • storage_access (list of Scheme) – Specifications for accessing data remotely for this executor. Multiple `Scheme`s are not yet supported.
  • managed (bool) – If True, parsl will control dynamic scaling of this executor and be responsible for its management. Otherwise, this is managed by the user.
  • engine_debug_level (int | str) – Sets engine logging to specified debug level. Choices: (0, 10, 20, 30, 40, 50, ‘DEBUG’, ‘INFO’, ‘WARN’, ‘ERROR’, ‘CRITICAL’)
Note

Some deficiencies with this executor are:

  1. IPython engines execute one task at a time. This means one engine per core is necessary to exploit the full parallelism of a node.
  2. No notion of remaining walltime.
  3. Lack of throttling means tasks could be queued up on a worker.
__init__(provider=LocalProvider( channel=LocalChannel( envs={}, script_dir='/home/docs/checkouts/readthedocs.org/user_builds/parsl/checkouts/latest/docs/.scripts', userhome='/home/docs/checkouts/readthedocs.org/user_builds/parsl/checkouts/latest/docs' ), init_blocks=4, label='local', launcher=<libsubmit.launchers.launchers.SingleNodeLauncher object>, max_blocks=10, min_blocks=0, nodes_per_block=1, parallelism=1, script_dir='parsl_scripts', tasks_per_node=1, walltime='00:15:00' ), label='ipp', engine_file='~/.ipython/profile_default/security/ipcontroller-engine.json', engine_dir='.', working_dir=None, controller=Controller( interfaces=None, ipython_dir='~/.ipython', log=True, mode='auto', port=None, port_range=None, profile='default', public_ip=None, reuse=False ), container_image=None, storage_access=None, engine_debug_level=None, managed=True)[source]

Initialize self. See help(type(self)) for accurate signature.

compose_launch_cmd(filepath, engine_dir, container_image)[source]

Reads the json contents from filepath and uses that to compose the engine launch command.

Parameters:
  • filepath – Path to the engine file
  • engine_dir – CWD for the engines
scale_in(blocks)[source]

Scale in the number of active blocks by the specified number.

scale_out(*args, **kwargs)[source]

Scales out the number of active workers by 1.

scaling_enabled[source]

Specify if scaling is enabled.

The callers of ParslExecutors need to differentiate between Executors and Executors wrapped in a resource provider

submit(*args, **kwargs)[source]

Submits work to the IPyParallel engines.

This method is a simple pass-through and behaves like the submit call described in the Python concurrent.futures documentation.

Returns:Future

Swift/Turbine Executor

class parsl.executors.swift_t.TurbineExecutor(label='turbine', storage_access=None, working_dir=None, managed=True)[source]

The Turbine executor.

Bypass the Swift/T language and run on top of the Turbine engines in an MPI environment.

Here is a diagram

             |  Data   |  Executor   |   IPC      | External Process(es)
             |  Flow   |             |            |
        Task | Kernel  |             |            |
      +----->|-------->|------------>|outgoing_q -|-> Worker_Process
      |      |         |             |            |    |         |
Parsl<---Fut-|         |             |            |  result   exception
          ^  |         |             |            |    |         |
          |  |         |   Q_mngmnt  |            |    V         V
          |  |         |    Thread<--|incoming_q<-|--- +---------+
          |  |         |      |      |            |
          |  |         |      |      |            |
          +----update_fut-----+
__init__(label='turbine', storage_access=None, working_dir=None, managed=True)[source]

Initialize the thread pool.

Trying to implement the emews model.

_queue_management_worker()[source]

Listen to the queue for task status messages and handle them.

Depending on the message, tasks will be updated with results, exceptions, or updates. It expects the following messages:

{
   "task_id" : <task_id>
   "result"  : serialized result object, if task succeeded
   ... more tags could be added later
}

{
   "task_id" : <task_id>
   "exception" : serialized exception object, on failure
}

We do not support these yet, but they could be added easily.

{
   "task_id" : <task_id>
   "cpu_stat" : <>
   "mem_stat" : <>
   "io_stat"  : <>
   "started"  : tstamp
}

The None message is a die request.

_start_queue_management_thread()[source]

Method to start the management thread as a daemon.

Checks if a thread already exists, then starts it. Could be used later as a restart if the management thread dies.

scale_in(workers)[source]

Scale in the number of active blocks by specified amount.

This method is not implemented for turbine and will raise an error if called.

Raises:NotImplementedError
scale_out(workers=1)[source]

Scales out the number of active workers by 1.

This method is not implemented for Turbine and will raise an error if called. This would be nice to have, and can be done.

Raises:NotImplementedError
shutdown()[source]

Shutdown method, to kill the threads and workers.

submit(func, *args, **kwargs)[source]

Submits work to the the outgoing_q.

An external process listens on the outgoing_q for new work. This method is a simple pass-through and behaves like the submit call described in the Python concurrent.futures documentation.

Parameters:
  • func (-) – Callable function
  • *args (-) –

    List of arbitrary positional arguments.

Kwargs:
  • **kwargs (dict) : A dictionary of arbitrary keyword args for func.
Returns:Future
parsl.executors.swift_t.runner(incoming_q, outgoing_q)[source]

This is a function that mocks the Swift-T side.

It listens on the incoming_q for tasks and posts results on the outgoing_q.

Parameters:
  • incoming_q (-) – The queue to listen on
  • outgoing_q (-) – Queue to post results on

The messages posted on the incoming_q will be of the form :

{
   "task_id" : <uuid.uuid4 string>,
   "buffer"  : serialized buffer containing the fn, args and kwargs
}

If None is received, the runner will exit.

Response messages should be of the form:

{
   "task_id" : <uuid.uuid4 string>,
   "result"  : serialized buffer containing result
   "exception" : serialized exception object
}

On exiting the runner will post None to the outgoing_q

Execution Providers

Execution providers are responsible for managing execution resources that have a Local Resource Manager (LRM). For instance, campus clusters and supercomputers generally have LRMs (schedulers) such as Slurm, Torque/PBS, Condor and Cobalt. Clouds, on the other hand, have API interfaces that allow much more fine-grained composition of an execution environment. An execution provider abstracts these types of resources and provides a single uniform interface to them.

ExecutionProvider (Base)

class libsubmit.providers.provider_base.ExecutionProvider[source]

Define the strict interface for all Execution Providers.

                      +------------------
                      |
script_string ------->|  submit
     id      <--------|---+
                      |
[ ids ]       ------->|  status
[statuses]   <--------|----+
                      |
[ ids ]       ------->|  cancel
[cancel]     <--------|----+
                      |
[True/False] <--------|  scaling_enabled
                      |
                      +-------------------
__weakref__[source]

list of weak references to the object (if defined)

cancel(job_ids)[source]

Cancels the resources identified by the job_ids provided by the user.

Parameters:

job_ids (-) – A list of job identifiers

Returns:

  • A list of statuses from cancelling the jobs; each can be True or False

Raises:
  • ExecutionProviderException or its subclasses
scaling_enabled[source]

The callers of ParslExecutors need to differentiate between Executors and Executors wrapped in a resource provider

Returns:
  • Status (Bool)
status(job_ids)[source]

Get the status of a list of jobs identified by the job identifiers returned from the submit request.

Parameters:

job_ids (-) – A list of job identifiers

Returns:

  • A list of status from [‘PENDING’, ‘RUNNING’, ‘CANCELLED’, ‘COMPLETED’, ‘FAILED’, ‘TIMEOUT’] corresponding to each job_id in the job_ids list.

Raises:
  • ExecutionProviderException or its subclasses
submit(command, blocksize, job_name='parsl.auto')[source]

The submit method takes the command string to be executed upon instantiation of a resource, most often to start a pilot job (such as an IPP engine or Swift/T engines).

Args :
  • command (str) : The bash command string to be executed.
  • blocksize (int) : Blocksize to be requested
KWargs:
  • job_name (str) : Human friendly name to be assigned to the job request
Returns:

  • A job identifier; this could be an integer, string, etc.

Raises:
  • ExecutionProviderException or its subclasses
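
A toy provider satisfying this submit/status/cancel interface, using background shell processes (illustrative only; not the real libsubmit implementation, and the class name is hypothetical):

```python
import subprocess
import uuid

class SketchLocalProvider:
    """Toy provider: submit runs a shell command in the background,
    status polls it, cancel terminates it."""
    def __init__(self):
        self._jobs = {}

    def submit(self, command, blocksize=1, job_name='parsl.auto'):
        job_id = str(uuid.uuid4())
        self._jobs[job_id] = subprocess.Popen(command, shell=True)
        return job_id

    def status(self, job_ids):
        # poll() returns None while the process is still running.
        return ['RUNNING' if self._jobs[j].poll() is None else 'COMPLETED'
                for j in job_ids]

    def cancel(self, job_ids):
        statuses = []
        for j in job_ids:
            self._jobs[j].terminate()
            statuses.append(True)
        return statuses
```

A real provider would translate these calls into scheduler commands (e.g. sbatch/squeue/scancel for Slurm) over a channel instead of spawning processes directly.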

Local

class libsubmit.providers.LocalProvider(channel=LocalChannel( envs={}, script_dir='/home/docs/checkouts/readthedocs.org/user_builds/parsl/checkouts/latest/docs/.scripts', userhome='/home/docs/checkouts/readthedocs.org/user_builds/parsl/checkouts/latest/docs' ), label='local', script_dir='parsl_scripts', tasks_per_node=1, nodes_per_block=1, launcher=<libsubmit.launchers.launchers.SingleNodeLauncher object>, init_blocks=4, min_blocks=0, max_blocks=10, walltime='00:15:00', parallelism=1)[source]

Local Execution Provider

This provider is used to provide execution resources from the localhost.

Parameters:
  • min_blocks (int) – Minimum number of blocks to maintain.
  • max_blocks (int) – Maximum number of blocks to maintain.
  • parallelism (float) – Ratio of provisioned task slots to active tasks. A parallelism value of 1 represents aggressive scaling where as many resources as possible are used; parallelism close to 0 represents the opposite situation in which as few resources as possible (i.e., min_blocks) are used.
__init__(channel=LocalChannel( envs={}, script_dir='/home/docs/checkouts/readthedocs.org/user_builds/parsl/checkouts/latest/docs/.scripts', userhome='/home/docs/checkouts/readthedocs.org/user_builds/parsl/checkouts/latest/docs' ), label='local', script_dir='parsl_scripts', tasks_per_node=1, nodes_per_block=1, launcher=<libsubmit.launchers.launchers.SingleNodeLauncher object>, init_blocks=4, min_blocks=0, max_blocks=10, walltime='00:15:00', parallelism=1)[source]

Initialize self. See help(type(self)) for accurate signature.

cancel(job_ids)[source]

Cancels the jobs specified by a list of job ids

Args: job_ids : [<job_id> …]

Returns : [True/False…] : If the cancel operation fails the entire list will be False.

scaling_enabled[source]

The callers of ParslExecutors need to differentiate between Executors and Executors wrapped in a resource provider

Returns:
  • Status (Bool)
status(job_ids)[source]

Get the status of a list of jobs identified by their ids.

Parameters:job_ids (-) – List of identifiers for the jobs
Returns:
  • List of status codes.
submit(command, blocksize, job_name='parsl.auto')[source]

Submits the command as a Local Resource Manager job of blocksize parallel elements. Submit returns an ID that corresponds to the task that was just submitted.

If tasks_per_node < 1:
    1/tasks_per_node nodes are provisioned.
If tasks_per_node == 1:
    A single node is provisioned.
If tasks_per_node > 1:
    tasks_per_node * blocksize nodes are provisioned.

Parameters:
  • command (str) – Command line invocation to be made on the remote side.
  • blocksize (float) – Not really used for the local provider.
Kwargs:
  • job_name (str) – Name for the job; must be unique.
Returns:

None if at capacity and no more jobs can be provisioned; otherwise a string identifier for the job.

Return type:

None or str
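The node-provisioning rules above can be sketched as a small helper; nodes_for_block is purely illustrative and not part of libsubmit:

```python
def nodes_for_block(tasks_per_node, blocksize):
    # Mirrors the provisioning rules stated in the docstring above.
    if tasks_per_node < 1:
        return 1 / tasks_per_node        # 1/tasks_per_node nodes
    if tasks_per_node == 1:
        return 1                         # a single node
    return tasks_per_node * blocksize    # tasks_per_node * blocksize nodes

print(nodes_for_block(2, 3))  # 6
```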

Slurm

class libsubmit.providers.SlurmProvider(partition, label='slurm', channel=LocalChannel( envs={}, script_dir='/home/docs/checkouts/readthedocs.org/user_builds/parsl/checkouts/latest/docs/.scripts', userhome='/home/docs/checkouts/readthedocs.org/user_builds/parsl/checkouts/latest/docs' ), script_dir='parsl_scripts', nodes_per_block=1, tasks_per_node=1, init_blocks=1, min_blocks=0, max_blocks=10, parallelism=1, walltime='00:10:00', overrides='', cmd_timeout=10, launcher=<libsubmit.launchers.launchers.SingleNodeLauncher object>)[source]

Slurm Execution Provider

This provider uses sbatch to submit, squeue for status and scancel to cancel jobs. The sbatch script to be used is created from a template file in this same module.

Parameters:
  • partition (str) – Slurm partition to request blocks from.
  • label (str) – Label for this provider.
  • channel (Channel) – Channel for accessing this provider. Possible channels include LocalChannel (the default), SSHChannel, or SSHInteractiveLoginChannel.
  • script_dir (str) – Relative or absolute path to a directory where intermediate scripts are placed.
  • nodes_per_block (int) – Nodes to provision per block.
  • tasks_per_node (int) – Tasks to run per node.
  • min_blocks (int) – Minimum number of blocks to maintain.
  • max_blocks (int) – Maximum number of blocks to maintain.
  • parallelism (float) – Ratio of provisioned task slots to active tasks. A parallelism value of 1 represents aggressive scaling where as many resources as possible are used; parallelism close to 0 represents the opposite situation in which as few resources as possible (i.e., min_blocks) are used.
  • walltime (str) – Walltime requested per block in HH:MM:SS.
  • overrides (str) – String to prepend to the #SBATCH blocks in the submit script to the scheduler.
  • launcher (Launcher) – Launcher for this provider. Possible launchers include SingleNodeLauncher (the default), SrunLauncher, or AprunLauncher
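As a sketch of how the overrides string fits into a generated submit script (render_sbatch and the exact directives are illustrative, not the template actually shipped with SlurmProvider):

```python
def render_sbatch(command, nodes_per_block=1, walltime="00:10:00",
                  overrides=""):
    # Compose a minimal sbatch script, placing the user-supplied
    # overrides string ahead of the generated #SBATCH directives.
    lines = ["#!/bin/bash"]
    if overrides:
        lines.append(overrides)  # e.g. "#SBATCH --constraint=knl"
    lines.append("#SBATCH --nodes={}".format(nodes_per_block))
    lines.append("#SBATCH --time={}".format(walltime))
    lines.append(command)
    return "\n".join(lines)

print(render_sbatch("hostname", nodes_per_block=2,
                    overrides="#SBATCH --constraint=knl"))
```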
__init__(partition, label='slurm', channel=LocalChannel( envs={}, script_dir='/home/docs/checkouts/readthedocs.org/user_builds/parsl/checkouts/latest/docs/.scripts', userhome='/home/docs/checkouts/readthedocs.org/user_builds/parsl/checkouts/latest/docs' ), script_dir='parsl_scripts', nodes_per_block=1, tasks_per_node=1, init_blocks=1, min_blocks=0, max_blocks=10, parallelism=1, walltime='00:10:00', overrides='', cmd_timeout=10, launcher=<libsubmit.launchers.launchers.SingleNodeLauncher object>)[source]

Initialize self. See help(type(self)) for accurate signature.

cancel(job_ids)[source]

Cancels the jobs specified by a list of job ids

Args: job_ids : [<job_id> …]

Returns: [True/False…] – If the cancel operation fails, the entire list will be False.

submit(command, blocksize, job_name='parsl.auto')[source]

Submit the command as a slurm job of blocksize parallel elements.

Parameters:
  • command (str) – Command to be made on the remote side.
  • blocksize (int) – Not implemented.
  • job_name (str) – Name for the job (must be unique).
Returns:

If at capacity, returns None; otherwise, a string identifier for the job

Return type:

None or str
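The submit/status/cancel lifecycle shared by these providers can be demonstrated with a minimal stand-in (StubProvider is hypothetical; real providers talk to a scheduler and return scheduler-specific status codes):

```python
class StubProvider:
    """Toy provider illustrating the submit/status/cancel contract."""

    def __init__(self, max_blocks=2):
        self.max_blocks = max_blocks
        self.jobs = {}        # job_id -> status string
        self._counter = 0

    def submit(self, command, blocksize, job_name="parsl.auto"):
        if len(self.jobs) >= self.max_blocks:
            return None       # at capacity: cannot provision more
        job_id = "{}.{}".format(job_name, self._counter)
        self._counter += 1
        self.jobs[job_id] = "RUNNING"
        return job_id

    def status(self, job_ids):
        return [self.jobs.get(j, "UNKNOWN") for j in job_ids]

    def cancel(self, job_ids):
        results = []
        for j in job_ids:
            known = j in self.jobs
            if known:
                self.jobs[j] = "CANCELLED"
            results.append(known)
        return results
```

A real provider would translate these calls into sbatch/squeue/scancel (or the equivalents for its scheduler).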

Cobalt

class libsubmit.providers.CobaltProvider(channel=LocalChannel( envs={}, script_dir='/home/docs/checkouts/readthedocs.org/user_builds/parsl/checkouts/latest/docs/.scripts', userhome='/home/docs/checkouts/readthedocs.org/user_builds/parsl/checkouts/latest/docs' ), label='cobalt', script_dir='parsl_scripts', nodes_per_block=1, tasks_per_node=1, init_blocks=0, min_blocks=0, max_blocks=10, parallelism=1, walltime='00:10:00', account=None, queue=None, overrides='', launcher=<libsubmit.launchers.launchers.AprunLauncher object>, cmd_timeout=10)[source]

Cobalt Execution Provider

This provider uses Cobalt to submit (qsub), obtain the status of (qstat), and cancel (qdel) jobs. The script to be used is created from a template file in this same module.

Parameters:
  • channel (Channel) – Channel for accessing this provider. Possible channels include LocalChannel (the default), SSHChannel, or SSHInteractiveLoginChannel.
  • label (str) – Label for this provider.
  • script_dir (str) – Relative or absolute path to a directory where intermediate scripts are placed.
  • nodes_per_block (int) – Nodes to provision per block.
  • tasks_per_node (int) – Tasks to run per node.
  • min_blocks (int) – Minimum number of blocks to maintain.
  • max_blocks (int) – Maximum number of blocks to maintain.
  • walltime (str) – Walltime requested per block in HH:MM:SS.
  • account (str) – Account that the job will be charged against.
  • queue (str) – Cobalt queue to request blocks from.
  • overrides (str) – String to append to the Cobalt submit script on the scheduler.
  • launcher (Launcher) – Launcher for this provider. Possible launchers include AprunLauncher (the default) or, SingleNodeLauncher
__init__(channel=LocalChannel( envs={}, script_dir='/home/docs/checkouts/readthedocs.org/user_builds/parsl/checkouts/latest/docs/.scripts', userhome='/home/docs/checkouts/readthedocs.org/user_builds/parsl/checkouts/latest/docs' ), label='cobalt', script_dir='parsl_scripts', nodes_per_block=1, tasks_per_node=1, init_blocks=0, min_blocks=0, max_blocks=10, parallelism=1, walltime='00:10:00', account=None, queue=None, overrides='', launcher=<libsubmit.launchers.launchers.AprunLauncher object>, cmd_timeout=10)[source]

Initialize self. See help(type(self)) for accurate signature.

cancel(job_ids)[source]

Cancels the jobs specified by a list of job ids

Args: job_ids : [<job_id> …]

Returns: [True/False…] – If the cancel operation fails, the entire list will be False.

submit(command, blocksize, job_name='parsl.auto')[source]

Submits the command as a Local Resource Manager job of blocksize parallel elements. Submit returns an ID that corresponds to the task that was just submitted.

If tasks_per_node < 1:
    Invalid; tasks_per_node must be a positive integer.
If tasks_per_node == 1:
    A single node is provisioned.
If tasks_per_node > 1:
    tasks_per_node * blocksize nodes are provisioned.

Parameters:
  • command (str) – Command line invocation to be made on the remote side.
  • blocksize (float) – Number of blocks requested.
Kwargs:
  • job_name (str) – Name for the job; must be unique.
Returns:

None if at capacity and no more jobs can be provisioned; otherwise a string identifier for the job.

Return type:

None or str

Condor

class libsubmit.providers.CondorProvider(channel=None, label='condor', nodes_per_block=1, tasks_per_node=1, init_blocks=1, min_blocks=0, max_blocks=10, parallelism=1, environment=None, script_dir='parsl_scripts', project='', overrides='', walltime='00:10:00', worker_setup='', launcher=<libsubmit.launchers.launchers.SingleNodeLauncher object>, requirements='')[source]

HTCondor Execution Provider.

Parameters:
  • channel (Channel) – Channel for accessing this provider. Possible channels include LocalChannel (the default), SSHChannel, or SSHInteractiveLoginChannel.
  • label (str) – Label for this provider.
  • nodes_per_block (int) – Nodes to provision per block.
  • tasks_per_node (int) – Workers to start per node
  • init_blocks (int) – Number of blocks to provision at time of initialization
  • min_blocks (int) – Minimum number of blocks to maintain
  • max_blocks (int) – Maximum number of blocks to maintain.
  • parallelism (float) – Ratio of provisioned task slots to active tasks. A parallelism value of 1 represents aggressive scaling where as many resources as possible are used; parallelism close to 0 represents the opposite situation in which as few resources as possible (i.e., min_blocks) are used.
  • environment (dict of str) – A dictionary of environment variable name and value pairs which will be set before running a task.
  • script_dir (str) – Relative or absolute path to a directory where intermediate scripts are placed.
  • project (str) – Project which the job will be charged against
  • overrides (str) – String to add specific condor attributes to the HTCondor submit script.
  • worker_setup (str) – Command to be run before running a task.
  • requirements (str) – Condor requirements.
  • launcher (Launcher) – Launcher for this provider. Possible launchers include SingleNodeLauncher (the default),
__init__(channel=None, label='condor', nodes_per_block=1, tasks_per_node=1, init_blocks=1, min_blocks=0, max_blocks=10, parallelism=1, environment=None, script_dir='parsl_scripts', project='', overrides='', walltime='00:10:00', worker_setup='', launcher=<libsubmit.launchers.launchers.SingleNodeLauncher object>, requirements='')[source]

Initialize self. See help(type(self)) for accurate signature.

cancel(job_ids)[source]

Cancels the jobs specified by a list of job IDs.

Parameters:job_ids (list of str) – The job IDs to cancel.
Returns:Each entry in the list will be True if the job is cancelled successfully, otherwise False.
Return type:list of bool
current_capacity[source]

Returns the currently provisioned blocks. This may need to return more information in the future: { minsize, maxsize, current_requested }.

scaling_enabled[source]

Callers of ParslExecutors need to differentiate between bare Executors and Executors wrapped in a resource provider, so this property reports whether scaling is enabled.

Returns:
  • Bool
status(job_ids)[source]

Get the status of a list of jobs identified by their ids.

Parameters:job_ids (list of int) – Identifiers of jobs for which the status will be returned.
Returns:Status codes for the requested jobs.
Return type:List of int
submit(command, blocksize, job_name='parsl.auto')[source]

Submits the command as a Local Resource Manager job of blocksize parallel elements.

Example submit file for the complex case of multiple submits per job:

    Universe       = vanilla
    output         = out.$(Cluster).$(Process)
    error          = err.$(Cluster).$(Process)
    log            = log.$(Cluster)
    leave_in_queue = true
    executable     = test.sh
    queue 5
    executable     = foo
    queue 1

    $ condor_submit test.sub
    Submitting job(s)......
    5 job(s) submitted to cluster 118907.
    1 job(s) submitted to cluster 118908.
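The cluster IDs in condor_submit output like the above can be recovered with a short parser (parse_condor_submit is illustrative, not part of libsubmit):

```python
import re

def parse_condor_submit(output):
    # Extract (count, cluster_id) pairs from lines of the form
    # "5 job(s) submitted to cluster 118907."
    pattern = r"(\d+) job\(s\) submitted to cluster (\d+)"
    return [(int(n), int(c)) for n, c in re.findall(pattern, output)]

text = ("Submitting job(s)......\n"
        "5 job(s) submitted to cluster 118907.\n"
        "1 job(s) submitted to cluster 118908.")
print(parse_condor_submit(text))  # [(5, 118907), (1, 118908)]
```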

Parameters:
  • command (str) – Command to execute
  • blocksize (int) – Number of blocks to request.
  • job_name (str) – Job name prefix.
Returns:

None if at capacity and cannot provision more; otherwise the identifier for the job.

Return type:

None or str

Torque

class libsubmit.providers.TorqueProvider(channel, account=None, queue=None, overrides='', label='torque', script_dir='parsl_scripts', nodes_per_block=1, tasks_per_node=1, init_blocks=1, min_blocks=0, max_blocks=100, parallelism=1, launcher=<libsubmit.launchers.launchers.AprunLauncher object>, walltime='00:20:00')[source]

Torque Execution Provider

This provider uses qsub to submit, qstat for status, and qdel to cancel jobs. The qsub script to be used is created from a template file in this same module.

Parameters:
  • channel (Channel) – Channel for accessing this provider. Possible channels include LocalChannel (the default), SSHChannel, or SSHInteractiveLoginChannel.
  • account (str) – Account the job will be charged against.
  • queue (str) – Torque queue to request blocks from.
  • label (str) – Label for this provider.
  • script_dir (str) – Relative or absolute path to a directory where intermediate scripts are placed.
  • nodes_per_block (int) – Nodes to provision per block.
  • tasks_per_node (int) – Tasks to run per node.
  • init_blocks (int) – Number of blocks to provision at the start of the run. Default is 1.
  • min_blocks (int) – Minimum number of blocks to maintain. Default is 0.
  • max_blocks (int) – Maximum number of blocks to maintain.
  • parallelism (float) – Ratio of provisioned task slots to active tasks. A parallelism value of 1 represents aggressive scaling where as many resources as possible are used; parallelism close to 0 represents the opposite situation in which as few resources as possible (i.e., min_blocks) are used.
  • walltime (str) – Walltime requested per block in HH:MM:SS.
  • overrides (str) – String to prepend to the Torque submit script.
  • launcher (Launcher) – Launcher for this provider. Possible launchers include AprunLauncher (the default), or SingleNodeLauncher,
__init__(channel, account=None, queue=None, overrides='', label='torque', script_dir='parsl_scripts', nodes_per_block=1, tasks_per_node=1, init_blocks=1, min_blocks=0, max_blocks=100, parallelism=1, launcher=<libsubmit.launchers.launchers.AprunLauncher object>, walltime='00:20:00')[source]

Initialize self. See help(type(self)) for accurate signature.

cancel(job_ids)[source]

Cancels the jobs specified by a list of job ids

Args: job_ids : [<job_id> …]

Returns: [True/False…] – If the cancel operation fails, the entire list will be False.

submit(command, blocksize, job_name='parsl.auto')[source]

Submits the command as a Local Resource Manager job of blocksize parallel elements. Submit returns an ID that corresponds to the task that was just submitted.

If tasks_per_node < 1:
    Invalid; tasks_per_node must be a positive integer.
If tasks_per_node == 1:
    A single node is provisioned.
If tasks_per_node > 1:
    tasks_per_node * blocksize nodes are provisioned.

Parameters:
  • command (str) – Command line invocation to be made on the remote side.
  • blocksize (float) – Number of blocks requested.
Kwargs:
  • job_name (str) – Name for the job; must be unique.
Returns:

None if at capacity and no more jobs can be provisioned; otherwise a string identifier for the job.

Return type:

None or str

GridEngine

class libsubmit.providers.GridEngineProvider(channel=LocalChannel( envs={}, script_dir='/home/docs/checkouts/readthedocs.org/user_builds/parsl/checkouts/latest/docs/.scripts', userhome='/home/docs/checkouts/readthedocs.org/user_builds/parsl/checkouts/latest/docs' ), label='grid_engine', script_dir='parsl_scripts', nodes_per_block=1, tasks_per_node=1, init_blocks=1, min_blocks=0, max_blocks=10, parallelism=1, walltime='00:10:00', overrides='', launcher=<libsubmit.launchers.launchers.SingleNodeLauncher object>)[source]

A provider for the Grid Engine scheduler.

Parameters:
  • channel (Channel) – Channel for accessing this provider. Possible channels include LocalChannel (the default), SSHChannel, or SSHInteractiveLoginChannel.
  • label (str) – Label for this provider.
  • script_dir (str) – Relative or absolute path to a directory where intermediate scripts are placed.
  • nodes_per_block (int) – Nodes to provision per block.
  • tasks_per_node (int) – Tasks to run per node.
  • min_blocks (int) – Minimum number of blocks to maintain.
  • max_blocks (int) – Maximum number of blocks to maintain.
  • parallelism (float) – Ratio of provisioned task slots to active tasks. A parallelism value of 1 represents aggressive scaling where as many resources as possible are used; parallelism close to 0 represents the opposite situation in which as few resources as possible (i.e., min_blocks) are used.
  • walltime (str) – Walltime requested per block in HH:MM:SS.
  • overrides (str) – String to prepend to the submit script to the scheduler.
  • launcher (Launcher) – Launcher for this provider. Possible launchers include SingleNodeLauncher (the default),
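All of these providers accept walltime as an HH:MM:SS string; a small helper (illustrative, not part of libsubmit) converts it to seconds:

```python
def walltime_to_seconds(walltime):
    # Parse "HH:MM:SS" into a number of seconds.
    hours, minutes, seconds = (int(part) for part in walltime.split(":"))
    return hours * 3600 + minutes * 60 + seconds

print(walltime_to_seconds("00:10:00"))  # 600
```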
__init__(channel=LocalChannel( envs={}, script_dir='/home/docs/checkouts/readthedocs.org/user_builds/parsl/checkouts/latest/docs/.scripts', userhome='/home/docs/checkouts/readthedocs.org/user_builds/parsl/checkouts/latest/docs' ), label='grid_engine', script_dir='parsl_scripts', nodes_per_block=1, tasks_per_node=1, init_blocks=1, min_blocks=0, max_blocks=10, parallelism=1, walltime='00:10:00', overrides='', launcher=<libsubmit.launchers.launchers.SingleNodeLauncher object>)[source]

Initialize self. See help(type(self)) for accurate signature.

cancel(job_ids)[source]

Cancels the resources identified by the job_ids provided by the user.

Parameters:

job_ids (-) – A list of job identifiers

Returns:

  • A list of statuses from cancelling the jobs; each entry is True or False.

Raises:
  • ExecutionProviderException or its subclasses
get_configs(command)[source]

Compose a dictionary with information for writing the submit script.

submit(command='', blocksize=1, job_name='parsl.auto')[source]

The submit method takes a command string to be executed upon instantiation of a resource, most often to start a pilot (such as an IPP engine or a Swift-T engine).

Args :
  • command (str) : The bash command string to be executed.
  • blocksize (int) : Blocksize to be requested
KWargs:
  • job_name (str) : Human friendly name to be assigned to the job request
Returns:

  • A job identifier; this could be an integer, a string, etc.

Raises:
  • ExecutionProviderException or its subclasses

Amazon Web Services

class libsubmit.providers.AWSProvider(image_id, label='ec2', init_blocks=1, min_blocks=0, max_blocks=10, tasks_per_node=1, nodes_per_block=1, parallelism=1, overrides='', instance_type='t2.small', region='us-east-2', spot_max_bid=0, key_name=None, key_file=None, profile=None, iam_instance_profile_arn='', state_file=None, walltime='01:00:00', launcher=<libsubmit.launchers.launchers.SingleNodeLauncher object>)[source]

A provider for using Amazon Elastic Compute Cloud (EC2) resources.

One of three methods is required to authenticate: key_file, profile, or environment variables. If neither key_file nor profile is set, the following environment variables must be set: AWS_ACCESS_KEY_ID (the access key for your AWS account), AWS_SECRET_ACCESS_KEY (the secret key for your AWS account), and, optionally, AWS_SESSION_TOKEN (the session key for your AWS account).

Parameters:
  • image_id (str) – Identification of the Amazon Machine Image (AMI).
  • label (str) – Label for this provider.
  • overrides (str) – String to append to the Userdata script executed in the cloudinit phase of instance initialization.
  • key_file (str) – Path to json file that contains ‘AWSAccessKeyId’ and ‘AWSSecretKey’.
  • nodes_per_block (int) – Nodes to provision per block. This is always 1 for EC2. Default is 1.
  • tasks_per_node (int) – Tasks to run per node.
  • profile (str) – Profile to be used from the standard aws config file ~/.aws/config.
  • init_blocks (int) – Number of blocks to provision at the start of the run. Default is 1.
  • min_blocks (int) – Minimum number of blocks to maintain. Default is 0.
  • max_blocks (int) – Maximum number of blocks to maintain. Default is 10.
  • instance_type (str) – EC2 instance type. Instance types comprise varying combinations of CPU, memory, storage, and networking capacity. For more information on possible instance types, see the EC2 documentation. Default is 't2.small'.
  • region (str) – Amazon Web Service (AWS) region to launch machines. Default is ‘us-east-2’.
  • key_name (str) – Name of the AWS private key (.pem file) that is usually generated on the console to allow SSH access to the EC2 instances. This is mostly used for debugging.
  • spot_max_bid (float) – Maximum bid price (if requesting spot market machines).
  • iam_instance_profile_arn (str) – Launch instance with a specific role.
  • state_file (str) – Path to the state file from a previous run to re-use.
  • walltime (str) – Walltime requested per block in HH:MM:SS. This option is not currently honored by this provider.
  • launcher (Launcher) – Launcher for this provider. Possible launchers include SingleNodeLauncher (the default), SrunLauncher, or AprunLauncher
__init__(image_id, label='ec2', init_blocks=1, min_blocks=0, max_blocks=10, tasks_per_node=1, nodes_per_block=1, parallelism=1, overrides='', instance_type='t2.small', region='us-east-2', spot_max_bid=0, key_name=None, key_file=None, profile=None, iam_instance_profile_arn='', state_file=None, walltime='01:00:00', launcher=<libsubmit.launchers.launchers.SingleNodeLauncher object>)[source]

Initialize self. See help(type(self)) for accurate signature.

cancel(job_ids)[source]

Cancel the jobs specified by a list of job ids.

Parameters:job_ids (list of str) – List of job identifiers
Returns:Each entry in the list will contain False if the operation fails. Otherwise, the entry will be True.
Return type:list of bool
config_route_table(vpc, internet_gateway)[source]

Configure route table for Virtual Private Cloud (VPC).

Parameters:
  • vpc (dict) – Representation of the VPC (created by create_vpc()).
  • internet_gateway (dict) – Representation of the internet gateway (created by create_vpc()).
create_session()[source]

Create a session.

First we look in self.key_file for a path to a json file with the credentials. The key file should have ‘AWSAccessKeyId’ and ‘AWSSecretKey’.

Next we look at self.profile for a profile name and try to use the Session call to automatically pick up the keys for the profile from the user default keys file ~/.aws/config.

Finally, boto3 will look for the keys in environment variables:

  • AWS_ACCESS_KEY_ID – The access key for your AWS account.
  • AWS_SECRET_ACCESS_KEY – The secret key for your AWS account.
  • AWS_SESSION_TOKEN – The session key for your AWS account. This is only needed when you are using temporary credentials. The AWS_SECURITY_TOKEN environment variable can also be used, but is only supported for backwards compatibility purposes. AWS_SESSION_TOKEN is supported by multiple AWS SDKs besides python.
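The resolution order described above can be sketched as follows (resolve_auth is a hypothetical helper, not boto3 or libsubmit code):

```python
def resolve_auth(key_file=None, profile=None, env=None):
    # Mirror the lookup order: key_file first, then profile,
    # then AWS_* environment variables.
    env = {} if env is None else env
    if key_file:
        return ("key_file", key_file)
    if profile:
        return ("profile", profile)
    if "AWS_ACCESS_KEY_ID" in env and "AWS_SECRET_ACCESS_KEY" in env:
        return ("environment", None)
    raise RuntimeError("no AWS credentials found")

print(resolve_auth(profile="dev"))  # ('profile', 'dev')
```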

create_vpc()[source]

Create and configure VPC

We create a VPC with CIDR 10.0.0.0/16, which provides up to 64,000 instances.

We attach a subnet for each availability zone within the region specified in the config. We give each subnet an ip range like 10.0.X.0/20, which is large enough for approx. 4000 instances.
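The subnet layout described above can be reproduced with the standard library's ipaddress module (a sketch; the provider's actual subnet allocation may differ):

```python
import ipaddress

def subnets_for_zones(n_zones, vpc_cidr="10.0.0.0/16", new_prefix=20):
    # Carve one /20 subnet per availability zone out of the VPC CIDR.
    vpc = ipaddress.ip_network(vpc_cidr)
    return [str(net) for net in list(vpc.subnets(new_prefix=new_prefix))[:n_zones]]

print(subnets_for_zones(3))
# Each /20 subnet holds 4096 addresses.
```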

Security groups are configured in function security_group.

current_capacity[source]

Returns the current blocksize.

get_instance_state(instances=None)[source]

Get states of all instances on EC2 which were started by this file.

initialize_boto_client()[source]

Initialize the boto client.

read_state_file(state_file)[source]

Read the state file, if it exists.

If this script has been run previously, resource IDs will have been written to a state file. On starting a run, a state file is looked for before creating new infrastructure. Information on VPCs, security groups, and subnets is saved, as well as running instances and their states.

AWS has a maximum number of VPCs per region per account, so we do not want to clutter users’ AWS accounts with security groups and VPCs that will be used only once.

scaling_enabled[source]

Callers of ParslExecutors need to differentiate between bare Executors and Executors wrapped in a resource provider, so this property reports whether scaling is enabled.

Returns:
  • Bool
security_group(vpc)[source]

Create and configure a new security group.

Allows all incoming ICMP traffic, and all incoming TCP and UDP traffic within the VPC.

This security group is very open. It allows all incoming ping requests on all ports. It also allows all outgoing traffic on all ports. This can be limited by changing the allowed port ranges.

Parameters:vpc (VPC instance) – VPC in which to set up security group.
show_summary()[source]

Print human readable summary of current AWS state to log and to console.

shut_down_instance(instances=None)[source]

Shut down a list of instances, if provided.

If no instance is provided, the last instance started up will be shut down.

spin_up_instance(command, job_name)[source]

Start an instance in the VPC in the first available subnet.

N instances would be started if nodes_per_block > 1, but this is not supported; only one node per block is used.

Parameters:
  • command (str) – Command string to execute on the node.
  • job_name (str) – Name associated with the instances.
status(job_ids)[source]

Get the status of a list of jobs identified by their ids.

Parameters:job_ids (list of str) – Identifiers for the jobs.
Returns:The status codes of the requested jobs.
Return type:list of int
submit(command='sleep 1', blocksize=1, job_name='parsl.auto')[source]

Submit the command onto a freshly instantiated AWS EC2 instance.

Submit returns an ID that corresponds to the task that was just submitted.

Parameters:
  • command (str) – Command to be invoked on the remote side.
  • blocksize (int) – Number of blocks requested.
  • job_name (str) – Prefix for the job name.
Returns:

If at capacity, None will be returned. Otherwise, the job identifier will be returned.

Return type:

None or str

teardown()[source]

Tear down the EC2 infrastructure.

Terminate all EC2 instances, delete all subnets, delete security group, delete VPC, and reset all instance variables.

write_state_file()[source]

Save information that must persist to a file.

We do not want to create a new VPC and new identical security groups, so we save information about them in a file between runs.

Azure

class libsubmit.providers.AzureProvider(subscription_id, username, password, label='azure', template_file='template.json', init_blocks=1, min_blocks=0, max_blocks=1, nodes_per_block=1, state_file=None)[source]

A provider for using Azure resources.

Parameters:
  • profile (str) – Profile to be used if different from the standard Azure config file ~/.azure/config.
  • template_file (str) – Location of template file for Azure instance. Default is ‘templates/template.json’.
  • walltime (str) – Walltime requested per block in HH:MM:SS.
  • azure_template_file (str) – Path to the template file for the Azure instance.
  • init_blocks (int) – Number of blocks to provision at the start of the run. Default is 1.
  • min_blocks (int) – Minimum number of blocks to maintain. Default is 0.
  • max_blocks (int) – Maximum number of blocks to maintain. Default is 1.
  • nodes_per_block (int) – Nodes to provision per block. Default is 1.
__init__(subscription_id, username, password, label='azure', template_file='template.json', init_blocks=1, min_blocks=0, max_blocks=1, nodes_per_block=1, state_file=None)[source]

Initialize self. See help(type(self)) for accurate signature.

cancel(job_ids)[source]

Cancel jobs specified by a list of job ids.

Parameters:job_ids (list of str) – List of identifiers of jobs which should be canceled.
Returns:For each entry, True if the cancel operation is successful, otherwise False.
Return type:list of bool
status(job_ids)[source]

Get the status of a list of jobs identified by their ids.

Parameters:job_ids (list of str) – Identifiers for the jobs.
Returns:Status codes for each requested job.
Return type:list of int
submit(command='sleep 1', blocksize=1, job_name='parsl.auto')[source]

Submit command to an Azure instance.

Submit returns an ID that corresponds to the task that was just submitted.

Parameters:
  • command (str) – Command to be invoked on the remote side.
  • blocksize (int) – Number of blocks requested.
  • job_name (str) – Prefix for job name.
Returns:

If at capacity (no more can be provisioned), None is returned. Otherwise, an identifier for the job is returned.

Return type:

None or str

Google Cloud Platform

class libsubmit.providers.GoogleCloudProvider(project_id, key_file, region, os_project, os_family, label='google_cloud', google_version='v1', instance_type='n1-standard-1', script_dir='parsl_scripts', init_blocks=1, min_blocks=0, max_blocks=10, parallelism=1)[source]

A provider for using resources from the Google Compute Engine.

Parameters:
  • project_id (str) – Project ID from Google compute engine.
  • key_file (str) – Path to authorization private key json file. This is required for auth. A new one can be generated here: https://console.cloud.google.com/apis/credentials
  • region (str) – Region in which to start instances
  • os_project (str) – OS project code for Google compute engine.
  • os_family (str) – OS family to request.
  • label (str) – A label for this executor. Default is ‘google_cloud’.
  • google_version (str) – Google compute engine version to use. Possibilities include 'v1' (default) or 'beta'.
  • instance_type (str) – Machine type to use. Default is 'n1-standard-1'.
  • script_dir (str) – Relative or absolute path to a directory where intermediate scripts are placed.
  • init_blocks (int) – Number of blocks to provision immediately. Default is 1.
  • min_blocks (int) – Minimum number of blocks to maintain. Default is 0.
  • max_blocks (int) – Maximum number of blocks to maintain. Default is 10.
  • parallelism (float) – Ratio of provisioned task slots to active tasks. A parallelism value of 1 represents aggressive scaling where as many resources as possible are used; parallelism close to 0 represents the opposite situation in which as few resources as possible (i.e., min_blocks) are used.
                      +------------------
                      |
script_string ------->|  submit
          id <--------|---+
                      |
     [ ids ] -------->|  status
  [statuses] <--------|---+
                      |
     [ ids ] -------->|  cancel
    [cancel] <--------|---+
                      |
[True/False] <--------|  scaling_enabled
                      |
                      +-------------------

__init__(project_id, key_file, region, os_project, os_family, label='google_cloud', google_version='v1', instance_type='n1-standard-1', script_dir='parsl_scripts', init_blocks=1, min_blocks=0, max_blocks=10, parallelism=1)[source]

Initialize self. See help(type(self)) for accurate signature.

cancel(job_ids)[source]

Cancels the resources identified by the job_ids provided by the user.

Parameters:

job_ids (-) – A list of job identifiers

Returns:

  • A list of statuses from cancelling the jobs; each entry is True or False.

Raises:
  • ExecutionProviderException or its subclasses
status(job_ids)[source]

Get the status of a list of jobs identified by the job identifiers returned from the submit request.

Parameters:

job_ids (-) – A list of job identifiers

Returns:

  • A list of status from [‘PENDING’, ‘RUNNING’, ‘CANCELLED’, ‘COMPLETED’, ‘FAILED’, ‘TIMEOUT’] corresponding to each job_id in the job_ids list.

Raises:
  • ExecutionProviderException or its subclasses
submit(command='', blocksize=1, job_name='parsl.auto')[source]

The submit method takes a command string to be executed upon instantiation of a resource, most often to start a pilot.

Args :
  • command (str) : The bash command string to be executed.
  • blocksize (int) : Blocksize to be requested
KWargs:
  • job_name (str) : Human friendly name to be assigned to the job request
Returns:

  • A job identifier; this could be an integer, a string, etc.

Raises:
  • ExecutionProviderException or its subclasses

Kubernetes

class libsubmit.providers.KubernetesProvider(config, channel=None)[source]

Kubernetes execution provider:

TODO: put in a config

__init__(config, channel=None)[source]

Initialize the Kubernetes execution provider class

Parameters:Config (-) – Dictionary with all the config options.
KWargs :
  • channel (channel object) : default=None A channel object
__repr__()[source]

Return repr(self).

cancel(job_ids)[source]

Cancels the jobs specified by a list of job ids

Args: job_ids : [<job_id> …]

Returns: [True/False…] – If the cancel operation fails, the entire list will be False.

scaling_enabled[source]

Callers of ParslExecutors need to differentiate between bare Executors and Executors wrapped in a resource provider, so this property reports whether scaling is enabled.

Returns:
  • Bool
status(job_ids)[source]

Get the status of a list of jobs identified by the job identifiers returned from the submit request.

Parameters:

job_ids (-) – A list of job identifiers

Returns:

  • A list of status from [‘PENDING’, ‘RUNNING’, ‘CANCELLED’, ‘COMPLETED’, ‘FAILED’, ‘TIMEOUT’] corresponding to each job_id in the job_ids list.

Raises:
  • ExecutionProviderExceptions or its subclasses
submit(cmd_string, blocksize, job_name='parsl.auto')[source]

Submit a job

Parameters:
  • cmd_string (-) – (String) - Name of the container to initiate
  • blocksize (-) – (float) - Number of replicas
Kwargs:
  • job_name (String): Name for job, must be unique
Returns:
  • None – At capacity, cannot provision more
  • job_id (string) – Identifier for the job

Channels

For certain resources such as campus clusters or supercomputers at research laboratories, resource requirements may require authentication. For instance, some resources may allow access to their job schedulers only from their login nodes, which require you to authenticate via SSH or GSI-SSH, and sometimes even two-factor authentication. Channels are simple abstractions that enable the ExecutionProvider component to talk to the resource managers of compute facilities. The simplest Channel, LocalChannel, simply executes commands locally in a shell, while the SshChannel authenticates you to remote systems.
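
As a rough illustration of a channel's execute_wait contract, here is a simplified local stand-in built on subprocess. It is not the libsubmit implementation, only a sketch of the (exit_code, stdout, stderr) shape:

```python
import os
import subprocess

def execute_wait(cmd, walltime, envs=None):
    """Run `cmd` in a local shell, merging any extra environment
    variables, and return (exit_code, stdout, stderr)."""
    env = {**os.environ, **(envs or {})}
    proc = subprocess.run(cmd, shell=True, capture_output=True,
                          text=True, timeout=walltime, env=env)
    return proc.returncode, proc.stdout, proc.stderr
```

A remote channel such as SshChannel provides the same tuple but executes the command on the remote host.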

class libsubmit.channels.channel_base.Channel[source]

Define the interface to all channels. Channels are usually called via the execute_wait function. For channels that execute remotely, a push_file function allows you to copy over files.

                      +------------------
                      |
cmd, wtime    ------->|  execute_wait
(ec, stdout, stderr)<-|---+
                      |
cmd, wtime    ------->|  execute_no_wait
(ec, stdout, stderr)<-|---+
                      |
src, dst_dir  ------->|  push_file
   dst_path  <--------|----+
                      |
dst_script_dir <------|  script_dir
                      |
                      +-------------------
__weakref__[source]

list of weak references to the object (if defined)

close()[source]

Closes the channel. Clean out any auth credentials.

Parameters:None
Returns:Bool
execute_no_wait(cmd, walltime, envs={}, *args, **kwargs)[source]

Optional. This is infrequently used.

Parameters:
  • cmd (-) – Command string to execute over the channel
  • walltime (-) – Timeout in seconds
KWargs:
  • envs (dict) : Environment variables to push to the remote side
Returns:
  • (exit_code, stdout, stderr) – exit_code is None; stdout and stderr are readable streams
execute_wait(cmd, walltime, envs={}, *args, **kwargs)[source]

Executes the cmd, with a defined walltime.

Parameters:
  • cmd (-) – Command string to execute over the channel
  • walltime (-) – Timeout in seconds
KWargs:
  • envs (dict) : Environment variables to push to the remote side
Returns:
  • (exit_code, stdout, stderr) (int, string, string)
push_file(source, dest_dir)[source]

Channel will take care of moving the file from source to the destination directory

Parameters:
  • source (string) – Full filepath of the file to be moved
  • dest_dir (string) – Absolute path of the directory to move to
Returns:

destination_path (string)

script_dir[source]

This is a property. Returns the directory assigned for storing all internal scripts such as scheduler submit scripts. This is usually where error logs from the scheduler would reside on the channel destination side.

Parameters:None (-) –
Returns:
  • Channel script dir

LocalChannel

class libsubmit.channels.LocalChannel(userhome='.', envs={}, script_dir='./.scripts', **kwargs)[source]

This is not really a channel: opening a local shell is cheap and done so infrequently that a persistent channel is unnecessary.

__init__(userhome='.', envs={}, script_dir='./.scripts', **kwargs)[source]

Initialize the local channel. script_dir is required but set to a default.

KwArgs:
  • userhome (string): (default=’.’) This is provided as a way to override and set a specific userhome
  • envs (dict) : A dictionary of env variables to be set when launching the shell
  • script_dir (string): (default=”./.scripts”) Directory to place scripts
close()[source]

There’s nothing to close here, and this really doesn’t do anything

Returns:
  • False, because it really did not “close” this channel.
execute_no_wait(cmd, walltime, envs={})[source]

Execute a commandline string on the shell without waiting for completion.

Parameters:
  • cmd (-) – Commandline string to execute
  • walltime (-) – walltime in seconds, this is not really used now.
Returns:
  • retcode – Return code from the execution, -1 on failure
  • stdout – stdout string
  • stderr – stderr string

Raises: None.

execute_wait(cmd, walltime, envs={})[source]

Synchronously execute a commandline string on the shell.

Parameters:
  • cmd (-) – Commandline string to execute
  • walltime (-) – walltime in seconds, this is not really used now.
Kwargs:
  • envs (dict) : Dictionary of env variables. This will be used to override the envs set at channel initialization.
Returns:
  • retcode – Return code from the execution, -1 on failure
  • stdout – stdout string
  • stderr – stderr string

Raises: None.

push_file(source, dest_dir)[source]

If the source file's directory is the same as dest_dir, a copy is not necessary and nothing is done. Otherwise a copy is made.

Parameters:
  • source (-) – Path to the source file
  • dest_dir (-) – Path to the directory to which the file is to be copied
Returns:

Absolute path of the destination file

Return type:

  • destination_path (String)

Raises:

- FileCopyException – If file copy failed.
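
The copy-if-needed behaviour described above can be sketched as follows; this is an illustrative re-implementation, not the LocalChannel source:

```python
import os
import shutil

def push_file(source, dest_dir):
    """Copy `source` into `dest_dir` unless it already lives there;
    return the absolute destination path, as push_file does."""
    dest_path = os.path.join(os.path.abspath(dest_dir),
                             os.path.basename(source))
    # Same directory: the "move" is a no-op
    if os.path.dirname(os.path.abspath(source)) != os.path.abspath(dest_dir):
        shutil.copyfile(source, dest_path)
    return dest_path
```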

script_dir[source]

This is a property. Returns the directory assigned for storing all internal scripts such as scheduler submit scripts. This is usually where error logs from the scheduler would reside on the channel destination side.

Parameters:None (-) –
Returns:
  • Channel script dir

SshChannel

class libsubmit.channels.SSHChannel(hostname, username=None, password=None, script_dir=None, envs=None, **kwargs)[source]

SSH persistent channel. This enables remote execution on sites accessible via ssh. It is assumed that the user has set up host keys for ssh access to the remote host, i.e., the following test on the command line should work:

>>> ssh <username>@<hostname>
__init__(hostname, username=None, password=None, script_dir=None, envs=None, **kwargs)[source]

Initialize a persistent connection to the remote system. We should know at this point whether ssh connectivity is possible.

Parameters:hostname (-) – Hostname
KWargs:
  • username (string) : Username on remote system
  • password (string) : Password for remote system
  • script_dir (string) : Full path to a script dir where generated scripts could be sent to.
  • envs (dict) : A dictionary of environment variables to be set when executing commands

execute_no_wait(cmd, walltime=2, envs={})[source]

Execute asynchronously without waiting for the exit code.

Parameters:
  • cmd (-) – Commandline string to be executed on the remote side
  • walltime (-) – timeout to exec_command
KWargs:
  • envs (dict): A dictionary of env variables
Returns:

  • None, stdout (readable stream), stderr (readable stream)

Raises:
  • ChannelExecFailed (reason)
execute_wait(cmd, walltime=2, envs={})[source]

Synchronously execute a commandline string on the shell.

Parameters:
  • cmd (-) – Commandline string to execute
  • walltime (-) – walltime in seconds, this is not really used now.
Kwargs:
  • envs (dict) : Dictionary of env variables
Returns:
  • retcode – Return code from the execution, -1 on failure
  • stdout – stdout string
  • stderr – stderr string

Raises: None.
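
One way the envs kwarg can reach the remote side is by prepending export statements to the command before it is shipped over ssh. The helper below is a hypothetical sketch of that idea, not the actual SSHChannel mechanism:

```python
def prepend_envs(cmd, envs):
    """Prefix `cmd` with an export statement per environment variable,
    so the variables are set in the remote shell that runs the command."""
    exports = "".join("export {}='{}'; ".format(k, v)
                      for k, v in sorted(envs.items()))
    return exports + cmd
```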

pull_file(remote_source, local_dir)[source]

Transport file on the remote side to a local directory

Parameters:
  • remote_source (-) – remote_source
  • local_dir (-) – Local directory to copy to
Returns:

Local path to file

Return type:

  • str

Raises:
  • - FileExists – Name collision at local directory.
  • - FileCopyException – FileCopy failed.
push_file(local_source, remote_dir)[source]

Transport a local file to a directory on a remote machine

Parameters:
  • local_source (-) – Path
  • remote_dir (-) – Remote path
Returns:

Path to copied file on remote machine

Return type:

  • str

Raises:
  • - BadScriptPath – if script path on the remote side is bad
  • - BadPermsScriptPath – You do not have perms to make the channel script dir
  • - FileCopyException – FileCopy failed.

SSH Interactive Login Channel

class libsubmit.channels.SSHInteractiveLoginChannel(hostname, username=None, password=None, script_dir=None, envs=None, **kwargs)[source]

SSH persistent channel. This enables remote execution on sites accessible via ssh. This channel supports interactive login and is appropriate when keys are not set up.

__init__(hostname, username=None, password=None, script_dir=None, envs=None, **kwargs)[source]

Initialize a persistent connection to the remote system. We should know at this point whether ssh connectivity is possible.

Parameters:hostname (-) – Hostname
KWargs:
  • username (string) : Username on remote system
  • password (string) : Password for remote system
  • script_dir (string) : Full path to a script dir where generated scripts could be sent to.
  • envs (dict) : A dictionary of env variables to be set when executing commands


Launchers

Launchers are wrappers applied to user-submitted scripts as they are submitted to a specific execution resource.

SimpleLauncher

class libsubmit.launchers.SimpleLauncher[source]

Does no wrapping. Just returns the command as-is

SingleNodeLauncher

class libsubmit.launchers.SingleNodeLauncher[source]

Worker launcher that wraps the user’s command with the framework to launch multiple command invocations in parallel. This wrapper sets the bash env variable CORES to the number of cores on the machine. By setting task_blocks to an integer or to a bash expression the number of invocations of the command to be launched can be controlled.
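
A minimal sketch of the wrapping idea, assuming a POSIX shell on the worker node; the real SingleNodeLauncher template differs in detail, and `single_node_wrap` is a hypothetical name:

```python
def single_node_wrap(command, task_blocks):
    """Wrap `command` so that CORES is exported and the command is
    launched `task_blocks` times in parallel, then waited on."""
    return (
        "export CORES=$(getconf _NPROCESSORS_ONLN)\n"
        + "for i in $(seq 1 {}); do\n".format(task_blocks)
        + "    {} &\n".format(command)
        + "done\n"
        + "wait"
    )
```

Setting task_blocks to a bash expression such as `$CORES` lets the invocation count track the machine size.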

AprunLauncher

class libsubmit.launchers.AprunLauncher(aprun_override=None)[source]

Worker launcher that wraps the user’s command with the Aprun launch framework to launch multiple cmd invocations in parallel on a single job allocation

SrunLauncher

class libsubmit.launchers.SrunLauncher[source]

Worker launcher that wraps the user’s command with the SRUN launch framework to launch multiple cmd invocations in parallel on a single job allocation.

SrunMPILauncher

class libsubmit.launchers.SrunMPILauncher[source]

Worker launcher that wraps the user’s command with the SRUN launch framework to launch multiple cmd invocations in parallel on a single job allocation.

Flow Control

This section deals with functionality related to controlling the flow of tasks to various executors.

FlowControl

class parsl.dataflow.flow_control.FlowControl(dfk, *args, threshold=20, interval=5)[source]

Implements threshold-interval based flow control.

The overall goal is to trap the flow of apps from the workflow, measure it and redirect it the appropriate executors for processing.

This is based on the following logic:

BEGIN (INTERVAL, THRESHOLD, callback) :
    start = current_time()

    while (current_time()-start < INTERVAL) :
         count = get_events_since(start)
         if count >= THRESHOLD :
             break

    callback()

This logic ensures that the callbacks are activated with a maximum delay of interval for systems with infrequent events as well as systems which would generate large bursts of events.
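
The pseudocode above can be written as a single Python pass. A real FlowControl runs this inside a timer thread; the sketch below performs one iteration under that assumption:

```python
import time

def flow_control_pass(get_events_since, callback, interval, threshold):
    """One pass of the threshold-interval loop: wait until either
    `interval` seconds elapse or at least `threshold` events have
    arrived since the start of the pass, then fire the callback."""
    start = time.time()
    while time.time() - start < interval:
        if get_events_since(start) >= threshold:
            break          # burst of events: fire early
        time.sleep(0.01)   # idle systems fall through after `interval`
    callback()
```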

Once a callback is triggered, the callback generally runs a strategy method on the available sites as well as the task queue.

TODO: When the debug logs are enabled this module emits duplicate messages. This issue needs more debugging. What I’ve learnt so far is that the duplicate messages are present only when the timer thread is started, so this could be from a duplicate logger being added by the thread.

close()[source]

Merge the threads and terminate.

make_callback(kind=None)[source]

Makes the callback and resets the timer.

KWargs:
  • kind (str): Default=None, used to pass information on what triggered the callback
notify(event_id)[source]

Let the FlowControl system know that there is an event.

FlowNoControl

class parsl.dataflow.flow_control.FlowNoControl(dfk, *args, threshold=2, interval=2)[source]

FlowNoControl implements similar interfaces as FlowControl.

Null handlers are used so as to mimic the FlowControl class.

__init__(dfk, *args, threshold=2, interval=2)[source]

Initialize the flowcontrol object. This does nothing.

Parameters:dfk (-) – DFK object to track parsl progress
KWargs:
  • threshold (int) : Tasks after which the callback is triggered
  • interval (int) : seconds after which timer expires
__weakref__[source]

list of weak references to the object (if defined)

close()[source]

This close fn does nothing.

notify(event_id)[source]

This notify fn does nothing.

Timer

class parsl.dataflow.flow_control.Timer(callback, *args, interval=5)[source]

This timer is a simplified version of the FlowControl timer. This timer does not employ notify events.

This is based on the following logic :

BEGIN (INTERVAL, THRESHOLD, callback) :
    start = current_time()

    while (current_time()-start < INTERVAL) :
         wait()
         break

    callback()
__init__(callback, *args, interval=5)[source]

Initialize the timer object. The timer thread is started here.

Parameters:callback (-) – Callback function invoked when the timer fires
KWargs:
  • interval (int) : seconds after which the timer expires
__weakref__[source]

list of weak references to the object (if defined)

close()[source]

Merge the threads and terminate.

make_callback(kind=None)[source]

Makes the callback and resets the timer.

Strategy

Strategies are responsible for tracking the compute requirements of a workflow as it is executed and scaling the resources to match it.

class parsl.dataflow.strategy.Strategy(dfk)[source]

FlowControl strategy.

As a workflow dag is processed by Parsl, new tasks are added and completed asynchronously. Parsl interfaces executors with execution providers to construct scalable executors to handle the variable workload generated by the workflow. This component is responsible for periodically checking outstanding tasks and available compute capacity and triggering scaling events to match workflow needs.

Here’s a diagram of an executor. An executor consists of blocks, which are usually created by single requests to a Local Resource Manager (LRM) such as Slurm, Condor, Torque, or even the AWS API. A block may contain several task blocks, which are separate instances of workers.

           |<--min_blocks     |<-init_blocks              max_blocks-->|
           +----------------------------------------------------------+
           |  +--------block----------+       +--------block--------+ |
executor = |  | task          task    | ...   |    task      task   | |
           |  +-----------------------+       +---------------------+ |
           +----------------------------------------------------------+
The relevant specification options are:
  1. min_blocks: Minimum number of blocks to maintain
  2. init_blocks: number of blocks to provision at initialization of workflow
  3. max_blocks: Maximum number of blocks that can be active due to one workflow
slots = current_capacity * tasks_per_node * nodes_per_block

active_tasks = pending_tasks + running_tasks

Parallelism = slots / tasks
            = [0, 1] (i.e., 0 <= p <= 1)

For example:

When p = 0,

=> compute with the least resources possible. Infinitely many tasks are stacked per slot.

blocks =  min_blocks           { if active_tasks = 0
          max(min_blocks, 1)   {  else
When p = 1,

=> compute with the most resources. one task is stacked per slot.

blocks = min ( max_blocks,
         ceil( active_tasks / slots ) )
When p = 1/2,
=> We stack up to 2 tasks per slot before we overflow and request a new block

Let’s say min:init:max = 0:0:4 and task_blocks = 2. Consider the following example:

min_blocks = 0
init_blocks = 0
max_blocks = 4
tasks_per_node = 2
nodes_per_block = 1

In the diagram, X <- task

at 2 tasks:

+---Block---|
|           |
| X      X  |
|slot   slot|
+-----------+

at 5 tasks, we overflow as the capacity of a single block is fully used.

+---Block---|       +---Block---|
| X      X  | ----> |           |
| X      X  |       | X         |
|slot   slot|       |slot   slot|
+-----------+       +-----------+
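
The scaling arithmetic above can be sketched as a small function. `blocks_needed` is a hypothetical name; the sketch follows the three parallelism cases described earlier:

```python
import math

def blocks_needed(active_tasks, parallelism, tasks_per_node,
                  nodes_per_block, min_blocks, max_blocks):
    """Number of blocks implied by the parallelism rules above."""
    slots_per_block = tasks_per_node * nodes_per_block
    if active_tasks == 0:
        return min_blocks
    if parallelism == 0:
        # p = 0: least resources possible, keep at least one block
        return max(min_blocks, 1)
    # general case: p tasks' worth of slots per active task
    required = math.ceil(active_tasks * parallelism / slots_per_block)
    return max(min_blocks, min(max_blocks, required))
```

With the example settings (p = 1/2, tasks_per_node = 2, nodes_per_block = 1, max_blocks = 4), 2 active tasks fit in one block while 5 tasks overflow into a second, matching the diagrams above.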
__init__(dfk)[source]

Initialize strategy.

__weakref__[source]

list of weak references to the object (if defined)

unset_logging()[source]

Mute newly added handlers to the root level, right after calling executor.status

Memoization

class parsl.dataflow.memoization.Memoizer(dfk, memoize=True, checkpoint={})[source]

Memoizer is responsible for ensuring that identical work is not repeated.

When a task is repeated, i.e., the same function is called with the same exact arguments, the result from a previous execution is reused.

The memoizer implementation here does not collapse duplicate calls at call time, but works only when the result of a previous call is available at the time the duplicate call is made.

For instance:

No advantage from                 Memoization helps
memoization here:                 here:

 TaskA                            TaskB
   |   TaskA                        |
   |     |   TaskA                done  (TaskB)
   |     |     |                                (TaskB)
 done    |     |
       done    |
             done

The memoizer creates a lookup table by hashing the function name and its inputs, and storing the results of the function.

When a task is ready for launch, i.e., all of its arguments have resolved, we add its hash to the task datastructure.
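
A toy version of this lookup table, hashing the function name together with pickled arguments, might look like the following. Parsl's actual serialization is more robust; the names and pickle-based hashing here are illustrative only:

```python
import hashlib
import pickle

memo_table = {}

def make_hash(func_name, args, kwargs):
    """Hash the function name plus its serialized arguments."""
    payload = pickle.dumps((func_name, args, sorted(kwargs.items())))
    return hashlib.md5(payload).hexdigest()

def check_memo(func_name, args, kwargs):
    """Return (present, result); the tuple avoids ambiguity when the
    memoized result itself is None."""
    h = make_hash(func_name, args, kwargs)
    return h in memo_table, memo_table.get(h)

def update_memo(func_name, args, kwargs, result):
    """Store a completed task's result under its hash."""
    memo_table[make_hash(func_name, args, kwargs)] = result
```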

__init__(dfk, memoize=True, checkpoint={})[source]

Initialize the memoizer.

Parameters:dfk (-) – The DFK object
KWargs:
  • memoize (Bool): enable memoization or not.
  • checkpoint (Dict): A checkpoint loaded as a dict.
__weakref__[source]

list of weak references to the object (if defined)

check_memo(task_id, task)[source]

Create a hash of the task and its inputs and check the lookup table for this hash.

If present, the results are returned. The result is a tuple indicating whether a memo exists and the result, since a Null result is possible and could be confusing. This seems like a reasonable option without relying on a cache_miss exception.

Parameters:task (-) – task from the dfk.tasks table
Returns:
  • present (Bool): Is this present in the memo_lookup_table
  • Result (Py Obj): Result of the function if present in table
Return type:Tuple of the following

This call will also set task[‘hashsum’] to the unique hashsum for the func+inputs.

hash_lookup(hashsum)[source]

Lookup a hash in the memoization table.

Will raise a KeyError if hash is not in the memoization lookup table.

Parameters:hashsum (-) – The same hashes used to uniquely identify apps+inputs
Returns:
  • Lookup result; this is unlikely to be None, since the hashes are set by this library and should not miss entries in its dict.
Raises:- KeyError – if hash not in table
make_hash(task)[source]

Create a hash of the task inputs.

This uses a serialization library borrowed from ipyparallel. If this fails here, then all ipp calls are also likely to fail due to failure at serialization.

Parameters:task (-) – Task dictionary from dfk.tasks
Returns:A unique hash string
Return type:
  • hash (str)
update_memo(task_id, task, r)[source]

Updates the memoization lookup table with the result from a task.

Parameters:
  • task_id (-) – Integer task id
  • task (-) – A task dict from dfk.tasks
  • r (-) – Result future

A warning is issued when a hash collision occurs during the update. This is not likely.