Developer Documentation

Parsl

Parallel Scripting Library, designed to enable efficient workflow execution.

Importing

To get all the required functionality, we suggest importing the library as follows:

>>> import parsl
>>> from parsl import *

Logging

Following the general logging philosophy of Python libraries, Parsl does not log anything by default. However, the following helper functions are provided for logging:

  1. set_stream_logger
    This attaches a StreamHandler to the logger. It is quite useful when working from a Jupyter notebook.
  2. set_file_logger
    This sets logging to a file. It is ideal for reporting issues to the dev team.
parsl.set_stream_logger(name='parsl', level=10, format_string=None)

Add a stream log handler

Args:
  • name (string) : Set the logger name.
  • level (logging.LEVEL) : Set to logging.DEBUG by default.
  • format_string (string) : Set to None by default.
Returns:
  • None
parsl.set_file_logger(filename, name='parsl', level=10, format_string=None)

Add a file log handler

Args:
  • filename (string): Name of the file to write logs to
  • name (string): Logger name
  • level (logging.LEVEL): Set the logging level.
  • format_string (string): Set the format string
Returns:
  • None
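
The two helpers can be approximated with the standard logging module. The following is a stdlib-only sketch; the function bodies and the default format string are illustrative, not Parsl's actual implementation:

```python
import logging

# Stdlib-only sketch of the two helpers (illustrative, not Parsl's source):
# attach a handler to the named logger at the given level.
def set_stream_logger(name='parsl', level=logging.DEBUG, format_string=None):
    if format_string is None:
        format_string = "%(asctime)s %(name)s [%(levelname)s] %(message)s"
    logger = logging.getLogger(name)
    logger.setLevel(level)
    handler = logging.StreamHandler()   # writes to stderr; handy in a notebook
    handler.setLevel(level)
    handler.setFormatter(logging.Formatter(format_string))
    logger.addHandler(handler)

def set_file_logger(filename, name='parsl', level=logging.DEBUG, format_string=None):
    if format_string is None:
        format_string = "%(asctime)s %(name)s [%(levelname)s] %(message)s"
    logger = logging.getLogger(name)
    logger.setLevel(level)
    handler = logging.FileHandler(filename)  # writes to a file; good for bug reports
    handler.setLevel(level)
    handler.setFormatter(logging.Formatter(format_string))
    logger.addHandler(handler)

set_stream_logger()
logging.getLogger('parsl').debug("stream logging enabled")
```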

Apps

Apps are parallelized functions that execute independently of the control flow of the main Python interpreter. We have two main types of Apps: PythonApps and BashApps. Both are subclassed from AppBase.

AppBase

This is the base class that defines the two external-facing functions that an App must define: __init__(), which is called when the interpreter sees the definition of the decorated function, and __call__(), which is invoked when the decorated function is called by the user.

class parsl.app.app.AppBase(func, executor, walltime=60, exec_type='bash')

This is the base class that defines the two external-facing functions that an App must define: __init__(), which is called when the interpreter sees the definition of the decorated function, and __call__(), which is invoked when the decorated function is called by the user.

__call__(*args, **kwargs)

The __call__ function must be implemented by the subclasses.

__init__(func, executor, walltime=60, exec_type='bash')

Constructor for the APP object.

Args:
  • func (function): Takes the function to be made into an App
  • executor (executor): Executor for the execution resource
Kwargs:
  • walltime (int) : Walltime in seconds for the app execution
  • exec_type (string) : App type (bash|python)
Returns:
  • APP object.
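
The __init__/__call__ split described above is the standard class-based decorator pattern. The sketch below is a generic illustration of that pattern; the App factory and the PythonApp body are hypothetical stand-ins, not Parsl's actual API:

```python
# Generic sketch of the decorator pattern AppBase implements; the App
# factory and PythonApp body are illustrative, not Parsl's actual code.
class AppBase:
    def __init__(self, func, executor, walltime=60, exec_type='bash'):
        # Runs when the interpreter sees the decorated definition.
        self.func = func
        self.executor = executor
        self.walltime = walltime
        self.exec_type = exec_type

    def __call__(self, *args, **kwargs):
        # Must be implemented by subclasses.
        raise NotImplementedError

class PythonApp(AppBase):
    def __call__(self, *args, **kwargs):
        # A real PythonApp would submit to the executor and return a
        # future; here we simply call the function inline.
        return self.func(*args, **kwargs)

def App(executor):
    # Hypothetical decorator factory wiring a function into an app class.
    def wrapper(func):
        return PythonApp(func, executor, exec_type='python')
    return wrapper

@App(executor=None)
def double(x):
    return x * 2

print(double(21))  # the decorated name is now a PythonApp instance
```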

PythonApp

Concrete subclass of AppBase that implements the Python App functionality.

class parsl.app.app.PythonApp(func, executor, walltime=60)

Extends AppBase to cover the Python App

__call__(*args, **kwargs)

This is where the call to a python app is handled

Args:
  • Arbitrary
Kwargs:
  • Arbitrary
Returns:
If outputs=[...] was a kwarg then:
App_fut, [Data_Futures...]
else:
App_fut
__init__(func, executor, walltime=60)

Initialize the super. This bit is the same for both bash & python apps.

BashApp

Concrete subclass of AppBase that implements the Bash App functionality.

class parsl.app.app.BashApp(func, executor, walltime=60)
__call__(*args, **kwargs)

This is where the call to a Bash app is handled

Args:
  • Arbitrary
Kwargs:
  • Arbitrary
Returns:
If outputs=[...] was a kwarg then:
App_fut, [Data_Futures...]
else:
App_fut
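
A Bash app's essential behavior, where the decorated function's return value is a command line that gets executed in a shell, can be sketched with the stdlib. The helper below is a hypothetical illustration, not Parsl's implementation:

```python
import subprocess

# Hypothetical helper illustrating Bash app semantics: the function body
# builds a command line, which is then run in a shell.
def run_bash_app(func, *args, **kwargs):
    cmd = func(*args, **kwargs)  # the function returns the command line
    proc = subprocess.run(cmd, shell=True, capture_output=True, text=True)
    if proc.returncode != 0:
        raise RuntimeError("command failed with exit code %d" % proc.returncode)
    return proc

def echo(msg):
    return "echo {}".format(msg)

result = run_bash_app(echo, "hello")
print(result.stdout.strip())
```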

Futures

Futures are returned as proxies to a parallel execution initiated by a call to an App. We have two kinds of futures in Parsl: AppFutures and DataFutures.

AppFutures

class parsl.dataflow.futures.AppFuture(parent, tid=None)

An AppFuture points at a Future returned from an Executor.

done()

Check if the future is done. If a parent is set, we return the status of the parent; otherwise no parent has been assigned, meaning the status is False.

Returns:
  • True : If the future has successfully resolved.
  • False : Pending resolution
parent_callback(executor_fu)

Callback from executor future to update the parent.

Args:
  • executor_fu (Future): Future returned by the executor along with callback
Returns:
  • None

Updates the super() with the result() or exception()

update_parent(fut)

Handle the case where the user has called result on the AppFuture before the parent exists. Adds a callback to the parent to update the state.
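
The parent-callback mechanism described above can be illustrated with stdlib futures. This is a simplified sketch, not Parsl's actual class:

```python
from concurrent.futures import Future, ThreadPoolExecutor

# Simplified sketch of the AppFuture pattern: a user-facing future whose
# state is settled by a callback attached to the executor's future.
class AppFuture(Future):
    def parent_callback(self, executor_fu):
        # Copy the executor future's outcome into this future.
        exc = executor_fu.exception()
        if exc is not None:
            self.set_exception(exc)
        else:
            self.set_result(executor_fu.result())

    def update_parent(self, fut):
        # Late-bind the parent and register the callback.
        self.parent = fut
        fut.add_done_callback(self.parent_callback)

with ThreadPoolExecutor(max_workers=1) as pool:
    app_fut = AppFuture()
    app_fut.update_parent(pool.submit(lambda: 40 + 2))
    print(app_fut.result())  # blocks until the callback settles the future
```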

DataFutures

class parsl.app.futures.DataFuture(fut, file_obj, parent=None, tid=None)

A DataFuture points at an AppFuture.

We simply wrap an AppFuture, adding the specific case where, if the future is resolved (i.e., the file exists), then the DataFuture is assumed to be resolved.

cancel()

Cancel the task that this DataFuture is tracking.

Note: Cancellation may not work.

filename

Filepath of the File object this datafuture represents

filepath

Filepath of the File object this datafuture represents

parent_callback(parent_fu)

Callback from the parent future to update this DataFuture.

Args:
  • parent_fu (Future): The parent future that triggered this callback
Returns:
  • None

Updates the super() with the result() or exception()

result(timeout=None)

A blocking call that returns either the result or raises an exception. Assumptions: A DataFuture always has a parent AppFuture, and the AppFuture sets up callbacks.

Kwargs:
  • timeout (int): Timeout in seconds
Returns:
  • If App completed successfully returns the filepath.
Raises:
  • Exception raised by app if failed.
tid

Returns the task_id of the task that will resolve this DataFuture
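
The DataFuture behavior described above, resolving to the filepath once the parent app future completes, can be sketched with stdlib futures. This is a simplified reconstruction, not Parsl's actual class:

```python
from concurrent.futures import Future

# Simplified sketch of the DataFuture pattern: a future tied to a file
# produced by a parent app future; it resolves to the filepath when the
# parent completes.
class DataFuture(Future):
    def __init__(self, parent, filepath, tid=None):
        super().__init__()
        self.parent = parent
        self.filepath = filepath
        self.tid = tid
        parent.add_done_callback(self.parent_callback)

    def parent_callback(self, parent_fu):
        # Propagate the parent's exception, or resolve to the filepath.
        exc = parent_fu.exception()
        if exc is not None:
            self.set_exception(exc)
        else:
            self.set_result(self.filepath)

app_fut = Future()
data_fut = DataFuture(app_fut, "out.txt", tid=1)
app_fut.set_result(0)          # app completes successfully...
print(data_fut.result())       # ...so the DataFuture yields the filepath
```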

Exceptions

class parsl.app.errors.ParslError

Base class for all exceptions

Only to be invoked when a more specific error is not available.

class parsl.app.errors.NotFutureError

Basically a type error. A non future item was passed to a function that expected a future.

class parsl.app.errors.InvalidAppTypeError

An invalid app type was requested from the @App decorator.

class parsl.app.errors.AppException

An error raised during execution of an app. What this exception contains depends entirely on context

class parsl.app.errors.AppBadFormatting(reason, exitcode, retries=None)

An error raised during formatting of a bash function. What this exception contains depends entirely on context. Contains: reason (string), exitcode (int), retries (int/None).

class parsl.app.errors.AppFailure(reason, exitcode, retries=None)

An error raised during execution of an app. What this exception contains depends entirely on context. Contains: reason (string), exitcode (int), retries (int/None).

class parsl.app.errors.MissingOutputs(reason, outputs)

Error raised at the end of app execution due to missing output files

Contains: reason (string), outputs (list of strings/files).

class parsl.app.errors.DependencyError(dependent_exceptions, reason, outputs)

Error raised when an app cannot run because one or more of its dependencies failed.

Contains: dependent_exceptions (list), reason (string), outputs (list of strings/files).

class parsl.dataflow.error.DataFlowExceptions

Base class for all exceptions. Only to be invoked when a more specific error is not available.

class parsl.dataflow.error.DuplicateTaskError

Raised by the DataFlowKernel when it finds that a job with the same task-id has been launched before.

class parsl.dataflow.error.MissingFutError

Raised when a particular future is not found within the DataFlowKernel's data structures. Deprecated.
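
The hierarchy above can be sketched as follows. The class bodies are illustrative reconstructions from the documented signatures, not Parsl's source:

```python
# Illustrative reconstruction of the exception hierarchy from the
# documented signatures (not Parsl's actual definitions).
class ParslError(Exception):
    """Base class for all Parsl exceptions."""

class AppFailure(ParslError):
    """An error raised during execution of an app."""
    def __init__(self, reason, exitcode, retries=None):
        super().__init__(reason)
        self.reason = reason
        self.exitcode = exitcode
        self.retries = retries

try:
    raise AppFailure("simulation exited non-zero", exitcode=127)
except ParslError as e:
    # Catching the base class catches every Parsl error.
    print(type(e).__name__, e.exitcode)
```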

Executors

Executors are abstractions that represent available compute resources to which you could submit arbitrary App tasks. An executor initialized with an Execution Provider can dynamically scale with the resource requirements of the workflow.

We currently have thread pools for local execution, remote workers from ipyparallel for executing on high throughput systems such as campus clusters, and a Swift/T executor for HPC systems.

ParslExecutor

class parsl.executors.base.ParslExecutor

Defines the strict interface for all Executor classes. This is a metaclass that only enforces concrete implementations of functionality by the child classes.

Note

Shutdown is currently missing, as it is not yet supported by some of the executors (e.g., threads).

__init__

Initialize self. See help(type(self)) for accurate signature.

scale_in(*args, **kwargs)

Scale in method. The scale_in method should simply take a resource object which will have the scaling methods; scale_in itself should be a coroutine, since scaling tasks can be slow.

scale_out(*args, **kwargs)

Scale out method. The scale_out method should simply take a resource object which will have the scaling methods; scale_out itself should be a coroutine, since scaling tasks can be slow.

scaling_enabled

Callers of ParslExecutors need to differentiate between Executors and Executors wrapped in a resource provider.

submit(*args, **kwargs)

We haven’t yet decided on what the args to this can be: whether it should just be func, args, and kwargs, or the partially evaluated function.

ThreadPoolExecutor

class parsl.executors.threads.ThreadPoolExecutor(max_workers=2, thread_name_prefix='', execution_provider=None, config=None)

The thread pool executor

__init__(max_workers=2, thread_name_prefix='', execution_provider=None, config=None)

Initialize the thread pool. The config options that are actually used are:

config.sites.site.execution.options = {"maxThreads" : <int>,
                                       "threadNamePrefix" : <string>}

Kwargs:
  • max_workers (int) : Number of threads (Default=2) (keeping the name workers/threads for backward compatibility)
  • thread_name_prefix (string) : Thread name prefix (only supported in Python 3.6+)
  • execution_provider (ep object) : This is ignored here
  • config (dict) : The config dict object for the site
scale_in(workers=1)

Scale in the number of active workers by 1. This method is not implemented for threads and will raise the error if called.

Raises:
NotImplemented exception
scale_out(workers=1)

Scales out the number of active workers by 1. This method is not implemented for threads and will raise the error if called.

Raises:
NotImplemented exception
submit(*args, **kwargs)

Submits work to the thread pool. This method is simply a pass-through and behaves like a submit call as described in the Python docs.

Returns:
Future
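
Since submit is a pass-through to the underlying pool, its behavior matches the stdlib directly:

```python
from concurrent.futures import ThreadPoolExecutor

# submit() hands the callable to the pool and immediately returns a
# Future; result() blocks until a worker thread finishes the call.
with ThreadPoolExecutor(max_workers=2) as pool:
    fut = pool.submit(pow, 2, 10)
    print(fut.result())
```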

IPyParallelExecutor


Swift/Turbine Executor

class parsl.executors.swift_t.TurbineExecutor(swift_attribs=None)

The Turbine executor. Bypasses the Swift/T language and runs on top of the Turbine engines in an MPI environment.

Here’s a simple diagram:

             |  Data   |  Executor   |   IPC      | External Process(es)
             |  Flow   |             |            |
        Task | Kernel  |             |            |
      +----->|-------->|------------>|outgoing_q -|-> Worker_Process
      |      |         |             |            |    |         |
Parsl<---Fut-|         |             |            |  result   exception
          ^  |         |             |            |    |         |
          |  |         |   Q_mngmnt  |            |    V         V
          |  |         |    Thread<--|incoming_q<-|--- +---------+
          |  |         |      |      |            |
          |  |         |      |      |            |
          +----update_fut-----+
__init__(swift_attribs=None)

Initialize the executor. This tries to implement the EMEWS model.

Kwargs:
  • swift_attribs : Takes a dict of Swift attributes. For future use.
_queue_management_worker()

The queue management worker is responsible for listening to the incoming_q for task status messages and updating tasks with results/exceptions/updates.

It expects the following messages:

{
  "task_id" : <task_id>,
  "result"  : <serialized result object, if the task succeeded>
  ... more tags could be added later
}

{
  "task_id"   : <task_id>,
  "exception" : <serialized exception object, on failure>
}

We don’t support these yet, but they could be added easily, e.g. as a heartbeat:

{
  "task_id"  : <task_id>,
  "cpu_stat" : <>,
  "mem_stat" : <>,
  "io_stat"  : <>,
  "started"  : <timestamp>
}

A None message is a die request.
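
The loop described above can be sketched with stdlib queues and futures. This is a simplified reconstruction, not the actual worker:

```python
import queue
from concurrent.futures import Future

# Simplified sketch of the queue-management loop: drain status messages
# from incoming_q and settle the matching task future.
def queue_management_worker(incoming_q, tasks):
    while True:
        msg = incoming_q.get()
        if msg is None:                 # a None message is a die request
            break
        fut = tasks[msg["task_id"]]
        if "result" in msg:
            fut.set_result(msg["result"])
        elif "exception" in msg:
            fut.set_exception(msg["exception"])

incoming_q = queue.Queue()
tasks = {"t1": Future()}
incoming_q.put({"task_id": "t1", "result": 42})
incoming_q.put(None)                    # die request ends the loop
queue_management_worker(incoming_q, tasks)
print(tasks["t1"].result())
```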

_start_queue_management_thread()

Method to start the management thread as a daemon. Checks if a thread already exists, then starts it. Could be used later as a restart if the management thread dies.

scale_in(workers=1)

Scale in the number of active workers by 1. This method is not implemented and will raise the error if called.

Raises:
NotImplemented exception
scale_out(workers=1)

Scales out the number of active workers by 1. This method is not implemented and will raise the error if called. This would be nice to have, and can be done.

Raises:
NotImplemented exception
shutdown()

Shutdown method, to kill the threads and workers.

submit(func, *args, **kwargs)

Submits work to the outgoing_q; an external process listens on this queue for new work. This method is simply a pass-through and behaves like a submit call as described in the Python docs.

Args:
  • func (callable) : Callable function
  • *args (list) : List of arbitrary positional arguments.
Kwargs:
  • **kwargs (dict) : A dictionary of arbitrary keyword args for func.
Returns:
Future
parsl.executors.swift_t.runner(incoming_q, outgoing_q)

This is a function that mocks the Swift/T side. It listens on the incoming_q for tasks and posts results on the outgoing_q.

Args:
  • incoming_q (Queue object) : The queue to listen on
  • outgoing_q (Queue object) : Queue to post results on

The messages posted on the incoming_q will be of the form:

{
  "task_id" : <uuid.uuid4 string>,
  "buffer"  : <serialized buffer containing the fn, args and kwargs>
}

If None is received, the runner will exit.

Response messages should be of the form:

{
  "task_id"   : <uuid.uuid4 string>,
  "result"    : <serialized buffer containing the result>,
  "exception" : <serialized exception object>
}

On exiting, the runner will post None to the outgoing_q.
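
The runner's loop can be sketched as follows; this is an illustrative stand-in that simply echoes the buffer back as the result:

```python
import queue

# Illustrative sketch of the runner mock: consume task messages from
# incoming_q, post responses on outgoing_q, and exit on None.
def runner(incoming_q, outgoing_q):
    while True:
        msg = incoming_q.get()
        if msg is None:
            break
        outgoing_q.put({"task_id": msg["task_id"],
                        "result": msg["buffer"]})  # echo back for illustration
    outgoing_q.put(None)   # signal exit to the other side

in_q, out_q = queue.Queue(), queue.Queue()
in_q.put({"task_id": "abc", "buffer": b"payload"})
in_q.put(None)
runner(in_q, out_q)
resp = out_q.get()
print(resp)
```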

Execution Providers

Execution providers are responsible for managing execution resources with a Local Resource Manager (LRM). For instance, campus clusters and supercomputers generally have schedulers such as Slurm, PBS, and Condor. Clouds, on the other hand, have API interfaces that allow much more fine-grained composition of an execution environment. An execution provider abstracts these resources and provides a single uniform interface to them.

ExecutionProvider

Slurm

Amazon Web Services

Azure