Developer Documentation

Parsl

Parallel Scripting Library, designed to enable efficient workflow execution.

Importing

To get all the required functionality, we suggest importing the library as follows:

>>> import parsl
>>> from parsl import *

Logging

Following the general logging philosophy of Python libraries, by default Parsl doesn't log anything. However, the following helper functions are provided for logging:

  1. set_stream_logger
    This attaches a StreamHandler to the logger, which is quite useful when working from a Jupyter notebook.
  2. set_file_logger
    This writes logs to a file, which is ideal when reporting issues to the dev team.
parsl.set_stream_logger(name='parsl', level=10, format_string=None)

Add a stream log handler

Args:
  • name (string) : Set the logger name.
  • level (logging.LEVEL) : Set to logging.DEBUG by default.
  • format_string (string) : Set to None by default.
Returns:
  • None
parsl.set_file_logger(filename, name='parsl', level=10, format_string=None)

Add a file log handler

Args:
  • filename (string): Name of the file to write logs to
  • name (string): Logger name
  • level (logging.LEVEL): Set the logging level.
  • format_string (string): Set the format string
Returns:
  • None
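
For example, here is a minimal sketch using the two helpers documented above; the log filename is a placeholder:

import logging
import parsl

# Stream Parsl logs to the console at DEBUG level (handy in a Jupyter notebook)
parsl.set_stream_logger(name='parsl', level=logging.DEBUG)

# Or write them to a file that can be attached to an issue report
parsl.set_file_logger('parsl_run.log', name='parsl', level=logging.DEBUG)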

Apps

Apps are parallelized functions that execute independently of the control flow of the main Python interpreter. There are two main types of Apps: PythonApps and BashApps, both subclassed from AppBase. A minimal sketch of defining one App of each type is shown below.
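
The DataFlowKernel/ThreadPoolExecutor wiring in this sketch is an assumption for illustration; the decorator arguments and kernel construction may differ slightly between releases:

from parsl import App, DataFlowKernel, ThreadPoolExecutor

workers = ThreadPoolExecutor(max_workers=4)
dfk = DataFlowKernel(executors=[workers])

@App('python', dfk)
def double(x):
    # Runs in a worker, independently of the main interpreter's control flow
    return 2 * x

@App('bash', dfk)
def echo(message, stdout=None, stderr=None):
    # Bash apps return the command line to execute
    return 'echo "{0}"'.format(message)

print(double(21).result())   # -> 42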

AppBase

This is the base class that defines the two external-facing functions that an App must define: __init__(), which is called when the interpreter sees the definition of the decorated function, and __call__(), which is invoked when a decorated function is called by the user.

class parsl.app.app.AppBase(func, executor, walltime=60, sites='all', exec_type='bash')

This is the base class that defines the two external-facing functions that an App must define: __init__(), which is called when the interpreter sees the definition of the decorated function, and __call__(), which is invoked when a decorated function is called by the user.

__call__(*args, **kwargs)

The __call__ function must be implemented in the subclasses

__init__(func, executor, walltime=60, sites='all', exec_type='bash')

Constructor for the APP object.

Args:
  • func (function): Takes the function to be made into an App
  • executor (executor): Executor for the execution resource
Kwargs:
  • walltime (int) : Walltime in seconds for the app execution
  • sites (str|list) : List of site names that this app could execute over. Default is 'all'.
  • exec_type (string) : App type (bash|python)
Returns:
  • APP object.

PythonApp

Concrete subclass of AppBase that implements the Python App functionality.

class parsl.app.python_app.PythonApp(func, executor, walltime=60, sites='all')

Extends AppBase to cover the Python App

__call__(*args, **kwargs)

This is where the call to a python app is handled

Args:
  • Arbitrary
Kwargs:
  • Arbitrary
Returns:
  • If outputs=[…] was passed as a kwarg: (App_fut, [Data_Futures…])
  • Otherwise: App_fut
__init__(func, executor, walltime=60, sites='all')

Initialize the superclass. This is the same for both Bash and Python apps.
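
To illustrate the return convention above, here is a hedged sketch that reuses the dfk from the Apps section; the output filename is a placeholder:

@App('python', dfk)
def save_number(x, outputs=[]):
    # Write the value to the first requested output file
    with open(outputs[0], 'w') as f:
        f.write(str(x))

# Because outputs=[...] was passed, the call returns (App_fut, [Data_Futures...])
app_fut, data_futs = save_number(7, outputs=['seven.txt'])
print(data_futs[0].filepath)   # path of the file the app will produce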

BashApp

Concrete subclass of AppBase that implements the Bash App functionality.

class parsl.app.bash_app.BashApp(func, executor, walltime=60, sites='all')
__call__(*args, **kwargs)

This is where the call to a Bash app is handled

Args:
  • Arbitrary
Kwargs:
  • Arbitrary
Returns:
  • If outputs=[…] was passed as a kwarg: (App_fut, [Data_Futures…])
  • Otherwise: App_fut
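
A hedged sketch of a Bash app call, again reusing the dfk from the Apps section; filenames are placeholders:

@App('bash', dfk)
def count_lines(inputs=[], outputs=[], stdout=None, stderr=None):
    # The returned string is the command line that gets executed
    return 'wc -l {0} > {1}'.format(inputs[0], outputs[0])

app_fut, data_futs = count_lines(inputs=['data.txt'],
                                 outputs=['counts.txt'],
                                 stdout='count.out',
                                 stderr='count.err')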

Futures

Futures are returned as proxies to a parallel execution initiated by a call to an App. We have two kinds of futures in Parsl: AppFutures and DataFutures.

AppFutures

class parsl.dataflow.futures.AppFuture(parent, tid=None, stdout=None, stderr=None)

An AppFuture points at a Future returned from an Executor

The AppFuture acts as a proxy for the task submitted to the executor; its result, exception, and completion status are updated from the underlying executor future via callbacks.

done()

Check if the future is done. If a parent is set, the status of the parent is returned; otherwise no parent has been assigned yet and the status is False.

Returns:
  • True : If the future has successfully resolved.
  • False : Pending resolution
parent_callback(executor_fu)

Callback from executor future to update the parent.

Args:
  • executor_fu (Future): Future returned by the executor along with callback
Returns:
  • None

Updates the super() with the result() or exception()

update_parent(fut)

Handle the case where the user has called result on the AppFuture before the parent exists. Add a callback to the parent to update the state
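
In practice an AppFuture behaves like a concurrent.futures.Future. A short sketch, assuming the double App defined in the Apps section:

fut = double(21)

print(fut.done())     # False while the task is still pending or running
print(fut.result())   # blocks until the task completes, then returns 42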

DataFutures

class parsl.app.futures.DataFuture(fut, file_obj, parent=None, tid=None)

A DataFuture points at an AppFuture.

We simply wrap the AppFuture and add the special case that if the future is resolved, i.e. the file exists, then the DataFuture is considered resolved.

cancel()

Cancel the task that this DataFuture is tracking.

Note: This may not work

filename

Filepath of the File object this datafuture represents

filepath

Filepath of the File object this datafuture represents

parent_callback(parent_fu)

Callback invoked by the parent future to update this DataFuture.

Args:
  • parent_fu (Future): The parent future that invoked this callback
Returns:
  • None

Updates the super() with the result() or exception()

result(timeout=None)

A blocking call that returns either the result or raises an exception. Assumptions: a DataFuture always has a parent AppFuture, and the AppFuture invokes callbacks once set up.

Kwargs:
  • timeout (int): Timeout in seconds
Returns:
  • If App completed successfully returns the filepath.
Raises:
  • Exception raised by app if failed.
tid

Returns the task_id of the task that will resolve this DataFuture
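
A short sketch of working with a DataFuture, assuming the save_number App defined in the PythonApp section:

app_fut, (seven,) = save_number(7, outputs=['seven.txt'])

print(seven.tid)        # id of the task that will resolve this DataFuture
print(seven.filepath)   # path of the File object this DataFuture represents
print(seven.result())   # blocks until the app completes, then returns the filepath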

Exceptions

class parsl.app.errors.ParslError

Base class for all exceptions

Only to be invoked when a more specific error is not available.

class parsl.app.errors.NotFutureError

Basically a type error: a non-future item was passed to a function that expected a future.

class parsl.app.errors.InvalidAppTypeError

An invalid app type was requested from the @App decorator.

class parsl.app.errors.AppException

An error raised during execution of an app. What this exception contains depends entirely on context

class parsl.app.errors.AppBadFormatting(reason, exitcode, retries=None)

An error raised during formatting of a Bash function. What this exception contains depends entirely on context. Contains: reason (string), exitcode (int), retries (int/None).

class parsl.app.errors.AppFailure(reason, exitcode, retries=None)

An error raised during execution of an app. What this exception contains depends entirely on context. Contains: reason (string), exitcode (int), retries (int/None).

class parsl.app.errors.MissingOutputs(reason, outputs)

Error raised at the end of app execution due to missing output files

Contains: reason (string), outputs (list of strings/files).

class parsl.app.errors.DependencyError(dependent_exceptions, reason, outputs)

Error raised when one or more of an app's dependencies fail, preventing the app from running.

Contains: dependent_exceptions (list), reason (string), outputs (list of strings/files).

class parsl.dataflow.error.DataFlowExceptions

Base class for all DataFlow exceptions. Only to be invoked when a more specific error is not available.

class parsl.dataflow.error.DuplicateTaskError

Raised by the DataFlowKernel when it finds that a job with the same task-id has been launched before.

class parsl.dataflow.error.MissingFutError

Raised when a particular future is not found within the DataFlowKernel's data structures. Deprecated.
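
Exceptions raised inside an app surface when the corresponding future is resolved. A hedged sketch of catching them, assuming the count_lines App defined earlier; the attribute names follow the "Contains" notes above:

from parsl.app.errors import AppFailure, MissingOutputs, ParslError

app_fut, _ = count_lines(inputs=['missing.txt'], outputs=['counts.txt'])

try:
    app_fut.result()
except MissingOutputs as e:
    print('expected outputs were not produced:', e.outputs)
except AppFailure as e:
    print('app failed:', e.reason)
except ParslError as e:
    print('parsl error:', e)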

Executors

Executors are abstractions that represent available compute resources to which you could submit arbitrary App tasks. An executor initialized with an Execution Provider can dynamically scale with the resource requirements of the workflow.

We currently have thread pools for local execution, remote workers from ipyparallel for executing on high throughput systems such as campus clusters, and a Swift/T executor for HPC systems.

ParslExecutor (Abstract Base Class)

class parsl.executors.base.ParslExecutor

Define the strict interface for all Executor classes. This class only enforces concrete implementations of the required functionality by the child classes.

Note

Shutdown is currently missing, as it is not yet supported by some of the executors (e.g., threads).

__init__

Initialize self. See help(type(self)) for accurate signature.

scale_in(*args, **kwargs)

Scale in method. The scale_in method should simply take a resource object that holds the scaling methods; scale_in itself should be a coroutine, since scaling tasks can be slow.

scale_out(*args, **kwargs)

Scale out method. The scale_out method should simply take a resource object that holds the scaling methods; scale_out itself should be a coroutine, since scaling tasks can be slow.

scaling_enabled

The callers of ParslExecutors need to differentiate between Executors and Executors wrapped in a resource provider

submit(*args, **kwargs)

We haven't yet decided what the args to this can be: whether it should just be (func, args, kwargs) or the partially evaluated function.

ThreadPoolExecutor

class parsl.executors.threads.ThreadPoolExecutor(max_workers=2, thread_name_prefix='', execution_provider=None, config=None, **kwargs)

The thread pool executor

__init__(max_workers=2, thread_name_prefix='', execution_provider=None, config=None, **kwargs)

Initialize the thread pool. Config options that are actually used are:

config.sites.site.execution.options = {“maxThreads” : <int>,
“threadNamePrefix” : <string>}
Kwargs:
  • max_workers (int) : Number of threads (default=2; the workers/threads naming is kept for backward compatibility)
  • thread_name_prefix (string) : Thread name prefix (only supported in Python 3.6+)
  • execution_provider (ep object) : This is ignored here
  • config (dict) : The config dict object for the site
scale_in(workers=1)

Scale in the number of active workers by 1. This method is not implemented for threads and will raise an error if called.

Raises:
NotImplementedError
scale_out(workers=1)

Scales out the number of active workers by 1. This method is not implemented for threads and will raise an error if called.

Raises:
NotImplementedError
submit(*args, **kwargs)

Submits work to the thread pool. This method is a simple pass-through and behaves like a submit call as described in the Python concurrent.futures documentation.

Returns:
Future
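
A minimal sketch of using the thread pool executor standalone; normally it is attached to a DataFlowKernel rather than called directly:

from parsl.executors.threads import ThreadPoolExecutor

pool = ThreadPoolExecutor(max_workers=4)

fut = pool.submit(pow, 2, 10)   # pass-through to the underlying thread pool
print(fut.result())             # -> 1024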

IPyParallelExecutor

class parsl.executors.ipp.IPyParallelExecutor(execution_provider=None, reuse_controller=True, engine_json_file='~/.ipython/profile_default/security/ipcontroller-engine.json', engine_dir='.', controller=None, config=None)

The IPython Parallel executor. This executor allows us to take advantage of multiple processes running locally or remotely via IPyParallel's pilot execution system.

Note

Some deficiencies with this executor are:

  1. IPython engines execute one task at a time. This means one engine per core is necessary to exploit the full parallelism of a node.
  2. No notion of remaining walltime.
  3. Lack of throttling means tasks could be queued up on a worker.
__init__(execution_provider=None, reuse_controller=True, engine_json_file='~/.ipython/profile_default/security/ipcontroller-engine.json', engine_dir='.', controller=None, config=None)

Initialize the IPyParallel pool. The initialization takes all relevant parameters via KWargs.

Note

If initBlocks > 0, and a scalable execution_provider is attached, then the provider will be initialized here.

Args:
  • self
KWargs:
  • execution_provider (ExecutionProvider object)
  • reuse_controller (Bool) : If True ipp executor will attempt to connect to an available controller. Default: True
  • engine_json_file (str): Path to json engine file that will be used to compose ipp launch commands at scaling events. Default : ‘~/.ipython/profile_default/security/ipcontroller-engine.json’
  • engine_dir (str) : Alternative to the above; specify the engine directory. Default: '.'
  • config (dict) : The config dict object for the site
compose_launch_cmd(filepath, engine_dir)

Reads the json contents from filepath and uses that to compose the engine launch command

Args:
  • filepath : Path to the engine file
  • engine_dir : CWD for the engines
scale_in(*args, **kwargs)

Scale in the number of active workers by 1. This method is not implemented for this executor and will raise an error if called.

Raises:
NotImplementedError
scale_out(*args, **kwargs)

Scales out the number of active workers by 1. This method is not implemented for this executor and will raise an error if called.

Raises:
NotImplementedError
submit(*args, **kwargs)

Submits work to the executor. This method is a simple pass-through and behaves like a submit call as described in the Python concurrent.futures documentation.

Returns:
Future
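
A hedged sketch of constructing the executor against an already-running controller, using the default engine_json_file from the signature above:

from parsl.executors.ipp import IPyParallelExecutor

ipp = IPyParallelExecutor(
    reuse_controller=True,
    engine_json_file='~/.ipython/profile_default/security/ipcontroller-engine.json',
)

fut = ipp.submit(pow, 2, 10)
print(fut.result())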

Swift/Turbine Executor

class parsl.executors.swift_t.TurbineExecutor(swift_attribs=None, config=None, **kwargs)

The Turbine executor. Bypass the Swift/T language and run on top of the Turbine engines in an MPI environment.

Here's a simple diagram:

             |  Data   |  Executor   |   IPC      | External Process(es)
             |  Flow   |             |            |
        Task | Kernel  |             |            |
      +----->|-------->|------------>|outgoing_q -|-> Worker_Process
      |      |         |             |            |    |         |
Parsl<---Fut-|         |             |            |  result   exception
          ^  |         |             |            |    |         |
          |  |         |   Q_mngmnt  |            |    V         V
          |  |         |    Thread<--|incoming_q<-|--- +---------+
          |  |         |      |      |            |
          |  |         |      |      |            |
          +----update_fut-----+
__init__(swift_attribs=None, config=None, **kwargs)

Initialize the executor. This tries to implement the EMEWS model.

Kwargs:
  • swift_attribs : Takes a dict of Swift attribs. For future use.
_queue_management_worker()

The queue management worker is responsible for listening to the incoming_q for task status messages and updating tasks with results/exceptions/updates

It expects the following messages:

{
   "task_id" : <task_id>
   "result"  : serialized result object, if task succeeded
   ... more tags could be added later
}

{
   "task_id" : <task_id>
   "exception" : serialized exception object, on failure
}

We don't support these yet, but they could be added easily as a heartbeat.

{
   "task_id" : <task_id>
   "cpu_stat" : <>
   "mem_stat" : <>
   "io_stat"  : <>
   "started"  : tstamp
}

The None message is a die request.

_start_queue_management_thread()

Method to start the management thread as a daemon. Checks if a thread already exists, then starts it. Could be used later as a restart if the management thread dies.

scale_in(workers=1)

Scale in the number of active workers by 1. This method is not implemented for this executor and will raise an error if called.

Raises:
NotImplementedError
scale_out(workers=1)

Scales out the number of active workers by 1. This method is not implemented for this executor and will raise an error if called. This would be nice to have, and could be added.

Raises:
NotImplementedError
shutdown()

Shutdown method, to kill the threads and workers.

submit(func, *args, **kwargs)

Submits work to the outgoing_q; an external process listens on this queue for new work. This method is a simple pass-through and behaves like a submit call as described in the Python concurrent.futures documentation.

Args:
  • func (callable) : Callable function
  • *args (list) : List of arbitrary positional arguments.
Kwargs:
  • **kwargs (dict) : A dictionary of arbitrary keyword args for func.
Returns:
Future
parsl.executors.swift_t.runner(incoming_q, outgoing_q)

This is a function that mocks the Swift/T side. It listens on the incoming_q for tasks and posts results on the outgoing_q.

Args:
  • incoming_q (Queue object) : The queue to listen on
  • outgoing_q (Queue object) : Queue to post results on

The messages posted on the incoming_q will be of the form :

{
   "task_id" : <uuid.uuid4 string>,
   "buffer"  : serialized buffer containing the fn, args and kwargs
}

If None is received, the runner will exit.

Response messages should be of the form:

{
   "task_id" : <uuid.uuid4 string>,
   "result"  : serialized buffer containing result
   "exception" : serialized exception object
}

On exiting the runner will post None to the outgoing_q

Execution Providers

Execution providers are responsible for managing execution resources that have a Local Resource Manager (LRM). For instance, campus clusters and supercomputers generally have LRMs (schedulers) such as Slurm, Torque/PBS, Condor and Cobalt. Clouds, on the other hand, have API interfaces that allow much more fine-grained composition of an execution environment. An execution provider abstracts these types of resources and provides a single uniform interface to them.

ExecutionProvider (Base)

class libsubmit.providers.provider_base.ExecutionProvider

Define the strict interface for all Execution Providers.

                      +------------------
                      |
script_string ------->|  submit
     id      <--------|---+
                      |
[ ids ]       ------->|  status
[statuses]   <--------|----+
                      |
[ ids ]       ------->|  cancel
[cancel]     <--------|----+
                      |
[True/False] <--------|  scaling_enabled
                      |
                      +-------------------
__init__

Initialize self. See help(type(self)) for accurate signature.

cancel(job_ids)

Cancels the resources identified by the job_ids provided by the user.

Args:
  • job_ids (list): A list of job identifiers
Returns:
  • A list of statuses from cancelling the jobs, each of which can be True or False
Raises:
  • ExecutionProviderExceptions or its subclasses
channels_required

Does the execution provider require a channel to function? Generally, cloud APIs require no channels, while all bash-script-based systems such as campus cluster schedulers (Slurm, Torque, Cobalt, Condor...) need channels.

Returns:
  • Status (Bool)
scaling_enabled

The callers of ParslExecutors need to differentiate between Executors and Executors wrapped in a resource provider

Returns:
  • Status (Bool)
status(job_ids)

Get the status of a list of jobs identified by the job identifiers returned from the submit request.

Args:
  • job_ids (list) : A list of job identifiers
Returns:
  • A list of statuses, each from ['PENDING', 'RUNNING', 'CANCELLED', 'COMPLETED', 'FAILED', 'TIMEOUT'], corresponding to each job_id in the job_ids list.
Raises:
  • ExecutionProviderExceptions or its subclasses
submit(cmd_string, blocksize, job_name='parsl.auto')

The submit method takes the command string to be executed upon instantiation of a resource, most often to start a pilot (such as IPP engines or Swift/T engines).

Args :
  • cmd_string (str) : The bash command string to be executed.
  • blocksize (int) : Blocksize to be requested
KWargs:
  • job_name (str) : Human friendly name to be assigned to the job request
Returns:
  • A job identifier; this could be an integer, string, etc.
Raises:
  • ExecutionProviderExceptions or its subclasses
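
The interface above is normally exercised by an executor. A hedged sketch of the call pattern, where provider stands for any concrete subclass (Slurm, Cobalt, Local, ...) and the worker command is illustrative:

def run_block(provider):
    # Exercise the ExecutionProvider interface on any concrete provider
    job_id = provider.submit('ipengine --file=engine.json', 1,
                             job_name='parsl.auto')

    print(provider.status([job_id]))   # e.g. ['PENDING'] or ['RUNNING']

    if provider.scaling_enabled:
        print(provider.cancel([job_id]))   # e.g. [True] on success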

Slurm

class libsubmit.providers.slurm.slurm.Slurm(config, channel=None)

Slurm Execution Provider

This provider uses sbatch to submit, squeue for status and scancel to cancel jobs. The sbatch script to be used is created from a template file in this same module.

Warning

Please note that in the config documented below, description and values are placed inside a schema that is delimited by #{ schema.. }

Here’s a sample config for the Slurm provider:

{ "execution" : { # Definition of all execution aspects of a site

     "executor"   : #{Description: Define the executor used as task executor,
                    # Type : String,
                    # Expected : "ipp",
                    # Required : True},

     "provider"   : #{Description : The provider name, in this case slurm
                    # Type : String,
                    # Expected : "slurm",
                    # Required :  True },

     "scriptDir"  : #{Description : Relative or absolute path to a
                    # directory in which intermediate scripts are placed
                    # Type : String,
                    # Default : "./.scripts"},

     "block" : { # Definition of a block

         "nodes"      : #{Description : # of nodes to provision per block
                        # Type : Integer,
                        # Default: 1},

         "taskBlocks" : #{Description : # of workers to launch per block
                        # as either an number or as a bash expression.
                        # for eg, "1" , "$(($CORES / 2))"
                        # Type : String,
                        #  Default: "1" },

         "walltime"  :  #{Description : Walltime requested per block in HH:MM:SS
                        # Type : String,
                        # Default : "00:20:00" },

         "initBlocks" : #{Description : # of blocks to provision at the start of
                        # the DFK
                        # Type : Integer
                        # Default : ?
                        # Required :    },

         "minBlocks" :  #{Description : Minimum # of blocks outstanding at any time
                        # WARNING :: Not Implemented
                        # Type : Integer
                        # Default : 0 },

         "maxBlocks" :  #{Description : Maximum # Of blocks outstanding at any time
                        # WARNING :: Not Implemented
                        # Type : Integer
                        # Default : ? },

         "options"   : {  # Scheduler specific options

             "partition" : #{Description : Slurm partition to request blocks from
                           # Type : String,
                           # Required : True },

             "overrides" : #{"Description : String to append to the #SBATCH blocks
                           # in the submit script to the scheduler
                           # Type : String,
                           # Required : False },
         }
     }
   }
}
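
Filled in, a minimal config for this provider might look like the following sketch. The surrounding "sites" wrapper follows the config.sites.site.execution layout mentioned for the ThreadPoolExecutor; partition, walltime, and overrides values are placeholders:

config = {
    "sites": [{
        "site": "my_slurm_site",
        "execution": {
            "executor": "ipp",
            "provider": "slurm",
            "scriptDir": "./.scripts",
            "block": {
                "nodes": 1,
                "taskBlocks": "1",
                "walltime": "00:20:00",
                "initBlocks": 1,
                "options": {
                    "partition": "debug",
                    "overrides": "#SBATCH --constraint=haswell"
                }
            }
        }
    }]
}
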
__init__(config, channel=None)

Initialize the Slurm class

Args:
  • Config (dict): Dictionary with all the config options.
KWargs:
  • Channel (None): A channel is required for slurm.
_status()

Internal: Do not call. Returns the status list for a list of job_ids

Args:
self
Returns:
[status…] : Status list of all jobs
_write_submit_script(template_string, script_filename, job_name, configs)

Load the template string with config values and write the generated submit script to a submit script file.

Args:
  • template_string (string) : The template string to be used for writing the submit script
  • script_filename (string) : Name of the submit script
  • job_name (string) : job name
  • configs (dict) : configs that get pushed into the template
Returns:
  • True: on success
Raises:
  • SchedulerMissingArgs : If the template is missing args
  • ScriptPathError : Unable to write the submit script out
cancel(job_ids)

Cancels the jobs specified by a list of job ids

Args: job_ids : [<job_id> …]

Returns : [True/False…] : If the cancel operation fails the entire list will be False.

channels_required

Returns Bool on whether a channel is required

current_capacity

Returns the current blocksize. This may need to return more information in the future: { minsize, maxsize, current_requested }

status(job_ids)

Get the status of a list of jobs identified by their ids.

Args:
  • job_ids (List of ids) : List of identifiers for the jobs
Returns:
  • List of status codes.
submit(cmd_string, blocksize, job_name='parsl.auto')

Submits the cmd_string onto a Local Resource Manager job of blocksize parallel elements. Submit returns an ID that corresponds to the task that was just submitted.

If tasks_per_node < 1: this is illegal; tasks_per_node should be an integer.

If tasks_per_node == 1:
A single node is provisioned
If tasks_per_node > 1 :
tasks_per_node * blocksize number of nodes are provisioned.
Args:
  • cmd_string :(String) Commandline invocation to be made on the remote side.
  • blocksize :(float)
Kwargs:
  • job_name (String): Name for job, must be unique
Returns:
  • None: At capacity, cannot provision more
  • job_id: (string) Identifier for the job

Cobalt

class libsubmit.providers.cobalt.cobalt.Cobalt(config, channel=None)

Cobalt Execution Provider

This provider uses Cobalt to submit (qsub), check status (qstat), and cancel (qdel) jobs. The script to be used is created from a template file in this same module.

Warning

Please note that in the config documented below, description and values are placed inside a schema that is delimited by #{ schema.. }

Here's the schema for the Cobalt provider:

{ "execution" : { # Definition of all execution aspects of a site

     "executor"   : #{Description: Define the executor used as task executor,
                    # Type : String,
                    # Expected : "ipp",
                    # Required : True},

     "provider"   : #{Description : The provider name, in this case cobalt
                    # Type : String,
                    # Expected : "cobalt",
                    # Required :  True },

     "launcher"   : #{Description : Launcher to use for launching workers
                    # it is often necessary to use a launcher that the scheduler supports to
                    # launch workers on multi-node jobs, or to partition MPI jobs
                    # Type : String,
                    # Default : "singleNode" },

     "scriptDir"  : #{Description : Relative or absolute path to a
                    # directory in which intermediate scripts are placed
                    # Type : String,
                    # Default : "./.scripts"},

     "block" : { # Definition of a block

         "nodes"      : #{Description : # of nodes to provision per block
                        # Type : Integer,
                        # Default: 1},

         "taskBlocks" : #{Description : # of workers to launch per block
                        # as either an number or as a bash expression.
                        # for eg, "1" , "$(($CORES / 2))"
                        # Type : String,
                        #  Default: "1" },

         "walltime"  :  #{Description : Walltime requested per block in HH:MM:SS
                        # Type : String,
                        # Default : "01:00:00" },

         "initBlocks" : #{Description : # of blocks to provision at the start of
                        # the DFK
                        # Type : Integer
                        # Default : ?
                        # Required :    },

         "minBlocks" :  #{Description : Minimum # of blocks outstanding at any time
                        # WARNING :: Not Implemented
                        # Type : Integer
                        # Default : 0 },

         "maxBlocks" :  #{Description : Maximum # Of blocks outstanding at any time
                        # WARNING :: Not Implemented
                        # Type : Integer
                        # Default : ? },

         "options"   : {  # Scheduler specific options

             "account"   : #{Description : Account that the job will be charged against
                           # Type : String,
                           # Required : True },

             "queue"     : #{Description : Torque queue to request blocks from
                           # Type : String,
                           # Required : False },

             "overrides" : #{"Description : String to append to the Torque submit script
                           # in the submit script to the scheduler
                           # Type : String,
                           # Required : False },
         }
     }
   }
}
__init__(config, channel=None)

Initialize the Cobalt execution provider class

Args:
  • Config (dict): Dictionary with all the config options.
KWargs :
  • channel (channel object) : default=None A channel object
_status()

Internal: Do not call. Returns the status list for a list of job_ids

Args:
self
Returns:
[status…] : Status list of all jobs
_write_submit_script(template_string, script_filename, job_name, configs)

Load the template string with config values and write the generated submit script to a submit script file.

Args:
  • template_string (string) : The template string to be used for writing the submit script
  • script_filename (string) : Name of the submit script
  • job_name (string) : job name
  • configs (dict) : configs that get pushed into the template
Returns:
  • True: on success
Raises:
  • SchedulerMissingArgs : If the template is missing args
  • ScriptPathError : Unable to write the submit script out
cancel(job_ids)

Cancels the jobs specified by a list of job ids

Args: job_ids : [<job_id> …]

Returns : [True/False…] : If the cancel operation fails the entire list will be False.

channels_required

Returns Bool on whether a channel is required

status(job_ids)

Get the status of a list of jobs identified by their ids.

Args:
  • job_ids (List of ids) : List of identifiers for the jobs
Returns:
  • List of status codes.
submit(cmd_string, blocksize, job_name='parsl.auto')

Submits the cmd_string onto a Local Resource Manager job of blocksize parallel elements. Submit returns an ID that corresponds to the task that was just submitted.

If tasks_per_node < 1: this is illegal; tasks_per_node should be an integer.

If tasks_per_node == 1:
A single node is provisioned
If tasks_per_node > 1 :
tasks_per_node * blocksize number of nodes are provisioned.
Args:
  • cmd_string :(String) Commandline invocation to be made on the remote side.
  • blocksize :(float)
Kwargs:
  • job_name (String): Name for job, must be unique
Returns:
  • None: At capacity, cannot provision more
  • job_id: (string) Identifier for the job

Condor

class libsubmit.providers.condor.condor.Condor(config, channel=None)

Condor Execution Provider

Warning

Please note that in the config documented below, description and values are placed inside a schema that is delimited by #{ schema.. }

Here’s the schema for the Condor provider:

{ "execution" : { # Definition of all execution aspects of a site

     "executor"   : #{Description: Define the executor used as task executor,
                    # Type : String,
                    # Expected : "ipp",
                    # Required : True},

     "provider"   : #{Description : The provider name, in this case condor
                    # Type : String,
                    # Expected : "condor",
                    # Required :  True },

     "launcher"   : #{Description : Launcher to use for launching workers
                    # Since condor doesn't generally do multi-node, "singleNode" is the
                    # only meaningful launcher.
                    # Type : String,
                    # Default : "singleNode" },

     "scriptDir"  : #{Description : Relative or absolute path to a
                    # directory in which intermediate scripts are placed
                    # Type : String,
                    # Default : "./.scripts"},

     "block" : { # Definition of a block

         "nodes"      : #{Description : # of nodes to provision per block
                        # Type : Integer,
                        # Default: 1},

         "taskBlocks" : #{Description : # of workers to launch per block
                        # as either an number or as a bash expression.
                        # for eg, "1" , "$(($CORES / 2))"
                        # Type : String,
                        #  Default: "1" },

         "walltime"  :  #{Description : Walltime requested per block in HH:MM:SS
                        # Type : String,
                        # Default : "01:00:00" },

         "initBlocks" : #{Description : # of blocks to provision at the start of
                        # the DFK
                        # Type : Integer
                        # Default : ?
                        # Required :    },

         "minBlocks" :  #{Description : Minimum # of blocks outstanding at any time
                        # WARNING :: Not Implemented
                        # Type : Integer
                        # Default : 0 },

         "maxBlocks" :  #{Description : Maximum # Of blocks outstanding at any time
                        # WARNING :: Not Implemented
                        # Type : Integer
                        # Default : ? },

         "options"   : {  # Scheduler specific options

             "project"    : #{Description : Project to which the job will be charged against
                            # Type : String,
                            # Required : True },

             "overrides"  : #{"Description : String to add specific condor attributes to the
                            # Condor submit script
                            # Type : String,
                            # Required : False },

             "workerSetup": #{"Description : String that sets up the env for the workers as well
                            # apps to run
                            # Type : String,
                            # Required : False },

             "requirements": #{"Description : Condor requirements
                             # Type : String,
                             # Required : True },
         }
     }
   }
}
__init__(config, channel=None)

Initialize the Condor class

Args:
  • Config (dict): Dictionary with all the config options.
KWargs:
  • Channel (none): A channel is required for htcondor.
_status()

Internal: Do not call. Returns the status list for a list of job_ids

Args:
self
Returns:
[status…] : Status list of all jobs
_write_submit_script(template_string, script_filename, job_name, configs)

Load the template string with config values and write the generated submit script to a submit script file.

Args:
  • template_string (string) : The template string to be used for writing the submit script
  • script_filename (string) : Name of the submit script
  • job_name (string) : job name
  • configs (dict) : configs that get pushed into the template
Returns:
  • True: on success
Raises:
  • SchedulerMissingArgs : If template is missing args
  • ScriptPathError : Unable to write submit script out
cancel(job_ids)

Cancels the jobs specified by a list of job ids

Args: job_ids : [<job_id> …]

Returns : [True/False…] : If the cancel operation fails the entire list will be False.

channels_required

Returns Bool on whether a channel is required

status(job_ids)

Get the status of a list of jobs identified by their ids.

Args:
  • job_ids (List of ids) : List of identifiers for the jobs
Returns:
  • List of status codes.
submit(cmd_string, blocksize, job_name='parsl.auto')

Submits the cmd_string onto a Local Resource Manager job of blocksize parallel elements.

Example submit file for the complex case of multiple submits per job:

Universe = vanilla
output = out.$(Cluster).$(Process)
error = err.$(Cluster).$(Process)
log = log.$(Cluster)
leave_in_queue = true
executable = test.sh
queue 5
executable = foo
queue 1

$ condor_submit test.sub
Submitting job(s)......
5 job(s) submitted to cluster 118907.
1 job(s) submitted to cluster 118908.

Torque

class libsubmit.providers.torque.torque.Torque(config, channel=None)

Torque Execution Provider

This provider uses qsub to submit, qstat for status, and qdel to cancel jobs. The qsub script to be used is created from a template file in this same module.

Warning

Please note that in the config documented below, description and values are placed inside a schema that is delimited by #{ schema.. }

Here's the schema for the Torque provider:

{ "execution" : { # Definition of all execution aspects of a site

     "executor"   : #{Description: Define the executor used as task executor,
                    # Type : String,
                    # Expected : "ipp",
                    # Required : True},

     "provider"   : #{Description : The provider name, in this case torque
                    # Type : String,
                    # Expected : "torque",
                    # Required :  True },

     "scriptDir"  : #{Description : Relative or absolute path to a
                    # directory in which intermediate scripts are placed
                    # Type : String,
                    # Default : "./scripts"},

     "block" : { # Definition of a block

         "nodes"      : #{Description : # of nodes to provision per block
                        # Type : Integer,
                        # Default: 1},

         "taskBlocks" : #{Description : # of workers to launch per block
                        # as either an number or as a bash expression.
                        # for eg, "1" , "$(($CORES / 2))"
                        # Type : String,
                        #  Default: "1" },

         "walltime"  :  #{Description : Walltime requested per block in HH:MM:SS
                        # Type : String,
                        # Default : "00:20:00" },

         "initBlocks" : #{Description : # of blocks to provision at the start of
                        # the DFK
                        # Type : Integer
                        # Default : ?
                        # Required :    },

         "minBlocks" :  #{Description : Minimum # of blocks outstanding at any time
                        # WARNING :: Not Implemented
                        # Type : Integer
                        # Default : 0 },

         "maxBlocks" :  #{Description : Maximum # Of blocks outstanding at any time
                        # WARNING :: Not Implemented
                        # Type : Integer
                        # Default : ? },

         "options"   : {  # Scheduler specific options

             "account"   : #{Description : Account the job will be charged against
                           # Type : String,
                           # Required : True },

             "queue"     : #{Description : Torque queue to request blocks from
                           # Type : String,
                           # Required : False },

             "overrides" : #{"Description : String to append to the Torque submit script
                           # in the submit script to the scheduler
                           # Type : String,
                           # Required : False },
         }
     }
   }
}
__init__(config, channel=None)

Initialize the Torque class

Args:
  • Config (dict): Dictionary with all the config options.
KWargs:
  • Channel (None): A channel is required for torque.
_status()

Internal: Do not call. Returns the status list for a list of job_ids

Args:
self
Returns:
[status…] : Status list of all jobs
_write_submit_script(template_string, script_filename, job_name, configs)

Load the template string with config values and write the generated submit script to a submit script file.

Args:
  • template_string (string) : The template string to be used for writing the submit script
  • script_filename (string) : Name of the submit script
  • job_name (string) : job name
  • configs (dict) : configs that get pushed into the template
Returns:
  • True: on success
Raises:
  • SchedulerMissingArgs : If the template is missing args
  • ScriptPathError : Unable to write the submit script out
cancel(job_ids)

Cancels the jobs specified by a list of job ids

Args: job_ids : [<job_id> …]

Returns : [True/False…] : If the cancel operation fails the entire list will be False.

channels_required

Returns Bool on whether a channel is required

current_capacity

Returns the current blocksize. This may need to return more information in the future: { minsize, maxsize, current_requested }

status(job_ids)

Get the status of a list of jobs identified by their ids.

Args:
  • job_ids (List of ids) : List of identifiers for the jobs
Returns:
  • List of status codes.
submit(cmd_string, blocksize, job_name='parsl.auto')

Submits the cmd_string onto a Local Resource Manager job of blocksize parallel elements. Submit returns an ID that corresponds to the task that was just submitted.

If tasks_per_node < 1: this is illegal; tasks_per_node should be an integer.

If tasks_per_node == 1:
A single node is provisioned
If tasks_per_node > 1 :
tasks_per_node * blocksize number of nodes are provisioned.
Args:
  • cmd_string :(String) Commandline invocation to be made on the remote side.
  • blocksize :(float)
Kwargs:
  • job_name (String): Name for job, must be unique
Returns:
  • None: At capacity, cannot provision more
  • job_id: (string) Identifier for the job

Local

class libsubmit.providers.local.local.Local(config, channel_script_dir=None, channel=None)

Local Execution Provider

This provider is used to launch IPP engines on the localhost.

Warning

Please note that in the config documented below, description and values are placed inside a schema that is delimited by #{ schema.. }

Here's the schema for the Local provider:

{ "execution" : { # Definition of all execution aspects of a site

     "executor"   : #{Description: Define the executor used as task executor,
                    # Type : String,
                    # Expected : "ipp",
                    # Required : True},

     "provider"   : #{Description : The provider name, in this case local
                    # Type : String,
                    # Expected : "local",
                    # Required :  True },

     "scriptDir"  : #{Description : Relative or absolute path to a
                    # directory in which intermediate scripts are placed
                    # Type : String,
                    # Default : "./.scripts"},

     "block" : { # Definition of a block

         "initBlocks" : #{Description : # of blocks to provision at the start of
                        # the DFK
                        # Type : Integer
                        # Default : ?
                        # Required :    },

         "minBlocks" :  #{Description : Minimum # of blocks outstanding at any time
                        # WARNING :: Not Implemented
                        # Type : Integer
                        # Default : 0 },

         "maxBlocks" :  #{Description : Maximum # Of blocks outstanding at any time
                        # WARNING :: Not Implemented
                        # Type : Integer
                        # Default : ? },
     }
   }
}
__init__(config, channel_script_dir=None, channel=None)

Initialize the local provider class

Args:
  • Config (dict): Dictionary with all the config options.
cancel(job_ids)

Cancels the jobs specified by a list of job ids

Args: job_ids : [<job_id> …]

Returns : [True/False…] : If the cancel operation fails the entire list will be False.

status(job_ids)

Get the status of a list of jobs identified by their ids.

Args:
  • job_ids (List of ids) : List of identifiers for the jobs
Returns:
  • List of status codes.
submit(cmd_string, blocksize, job_name='parsl.auto')

Submits the cmd_string onto a Local Resource Manager job of blocksize parallel elements. Submit returns an ID that corresponds to the task that was just submitted.

If tasks_per_node < 1:
1/tasks_per_node is provisioned
If tasks_per_node == 1:
A single node is provisioned
If tasks_per_node > 1 :
tasks_per_node * blocksize number of nodes are provisioned.
Args:
  • cmd_string :(String) Commandline invocation to be made on the remote side.
  • blocksize :(float) - Not really used for local
Kwargs:
  • job_name (String): Name for job, must be unique
Returns:
  • None: At capacity, cannot provision more
  • job_id: (string) Identifier for the job

AWS

Amazon Web Services

class libsubmit.providers.aws.aws.EC2Provider(config, channel=None)

Here’s a sample config for the EC2 provider:

{ "auth" : { # Definition of authentication method for AWS. One of 3 methods are required to authenticate
             # with AWS : keyfile, profile or env_variables. If keyfile or profile is not set Boto3 will
             # look for the following env variables :
             # AWS_ACCESS_KEY_ID : The access key for your AWS account.
             # AWS_SECRET_ACCESS_KEY : The secret key for your AWS account.
             # AWS_SESSION_TOKEN : The session key for your AWS account.

     "keyfile"    : #{Description: Path to json file that contains 'AWSAccessKeyId' and 'AWSSecretKey'
                    # Type : String,
                    # Required : False},

     "profile"    : #{Description: Specify the profile to be used from the standard aws config file
                    # ~/.aws/config.
                    # Type : String,
                    # Expected : "default", # Use the 'default' aws profile
                    # Required : False},

   },

  "execution" : { # Definition of all execution aspects of a site

     "executor"   : #{Description: Define the executor used as task executor,
                    # Type : String,
                    # Expected : "ipp",
                    # Required : True},

     "provider"   : #{Description : The provider name, in this case ec2
                    # Type : String,
                    # Expected : "aws",
                    # Required :  True },

     "block" : { # Definition of a block

         "nodes"      : #{Description : # of nodes to provision per block
                        # Type : Integer,
                        # Default: 1},

         "taskBlocks" : #{Description : # of workers to launch per block
                        # as either an number or as a bash expression.
                        # for eg, "1" , "$(($CORES / 2))"
                        # Type : String,
                        #  Default: "1" },

         "walltime"  :  #{Description : Walltime requested per block in HH:MM:SS
                        # Type : String,
                        # Default : "00:20:00" },

         "initBlocks" : #{Description : # of blocks to provision at the start of
                        # the DFK
                        # Type : Integer
                        # Default : ?
                        # Required :    },

         "minBlocks" :  #{Description : Minimum # of blocks outstanding at any time
                        # WARNING :: Not Implemented
                        # Type : Integer
                        # Default : 0 },

         "maxBlocks" :  #{Description : Maximum # Of blocks outstanding at any time
                        # WARNING :: Not Implemented
                        # Type : Integer
                        # Default : ? },

         "options"   : {  # Scheduler specific options


             "instanceType" : #{Description : Instance type t2.small|t2...
                              # Type : String,
                              # Required : False
                              # Default : t2.small },

             "imageId"      : #{"Description : String to append to the #SBATCH blocks
                              # in the submit script to the scheduler
                              # Type : String,
                              # Required : False },

             "region"       : #{"Description : AWS region to launch machines in
                              # in the submit script to the scheduler
                              # Type : String,
                              # Default : 'us-east-2',
                              # Required : False },

             "keyName"      : #{"Description : Name of the AWS private key (.pem file)
                              # that is usually generated on the console to allow ssh access
                              # to the EC2 instances, mostly for debugging.
                              # in the submit script to the scheduler
                              # Type : String,
                              # Required : True },

             "spotMaxBid"   : #{"Description : If requesting spot market machines, specify
                              # the max Bid price.
                              # Type : Float,
                              # Required : False },
         }
     }
   }
}
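
Filled in, a minimal config for the EC2 provider might look like the following sketch; the profile, AMI id, key name, and region are placeholders:

config = {
    "sites": [{
        "site": "my_ec2_site",
        "auth": {
            "profile": "default"
        },
        "execution": {
            "executor": "ipp",
            "provider": "aws",
            "block": {
                "nodes": 1,
                "taskBlocks": "1",
                "walltime": "00:20:00",
                "initBlocks": 1,
                "options": {
                    "instanceType": "t2.small",
                    "imageId": "ami-0123456789abcdef0",
                    "region": "us-east-2",
                    "keyName": "my-ec2-keypair"
                }
            }
        }
    }]
}
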
__init__(config, channel=None)

Initialize the EC2Provider class

Args:
  • Config (dict): Dictionary with all the config options.
KWargs:
  • Channel (None): A channel is not required for EC2.
cancel(job_ids)

Cancels the jobs specified by a list of job ids

Args:
job_ids (list) : List of of job identifiers
Returns :
[True/False…] : If the cancel operation fails the entire list will be False.
create_session()

Here we will first look in the ~/.aws/config file.

First we look in config["auth"]["keyfile"] for a path to a JSON file with the credentials. The keyfile should have 'AWSAccessKeyId' and 'AWSSecretKey'.

Next we look for config[“auth”][“profile”] for a profile name and try to use the Session call to auto pick up the keys for the profile from the user default keys file ~/.aws/config.

Lastly, boto3 will look for the keys in environment variables:
  • AWS_ACCESS_KEY_ID : The access key for your AWS account.
  • AWS_SECRET_ACCESS_KEY : The secret key for your AWS account.
  • AWS_SESSION_TOKEN : The session key for your AWS account. This is only needed when you are using temporary credentials. The AWS_SECURITY_TOKEN environment variable can also be used, but is only supported for backwards compatibility purposes. AWS_SESSION_TOKEN is supported by multiple AWS SDKs besides Python.

create_vpc()

Create and configure VPC [TODO] Describe this a bit more …

get_instance_state(instances=None)

Get the status of all EC2 instances that were started by this provider.

read_state_file(statefile)

If this script has been run previously, it will have persisted state by writing resource IDs to a state file. On run, the script looks for a state file before creating new infrastructure.

scale_in(size)

Scale cluster in (smaller)

show_summary()

Print human readable summary of current AWS state to log and to console

shut_down_instance(instances=None)

Shuts down a list of instances if provided or the last instance started up if none provided

[TODO] …

spin_up_instance(cmd_string, job_name)

Starts an instance in the VPC in the first available subnet. Starting up n instances per block (nodes per block > 1) is not supported; we only do 1 node per block.

Args:
  • cmd_string (str) : Command string to execute on the node
  • job_name (str) : Name associated with the instances
status(job_ids)

Get the status of a list of jobs identified by their ids.

Args:
  • job_ids (List of ids) : List of identifiers for the jobs
Returns:
  • List of status codes.
submit(cmd_string='sleep 1', blocksize=1, job_name='parsl.auto')

Submits the cmd_string onto a freshly instantiated AWS EC2 instance. Submit returns an ID that corresponds to the task that was just submitted.

Args:
  • cmd_string (str): Commandline invocation to be made on the remote side.
  • blocksize (int) : Number of blocks requested
Kwargs:
  • job_name (String): Prefix for job name
Returns:
  • None: At capacity, cannot provision more
  • job_id: (string) Identifier for the job
teardown()

Terminate all EC2 instances, delete all subnets, delete security group, delete vpc and reset all instance variables

Azure

class libsubmit.providers.azure.azureProvider.AzureProvider(config)
__init__(config)

Initialize the Azure provider. Uses the Azure Python SDK to provide execution resources.

cancel()

Destroy an Azure VM

status()

Get the status of an Azure VM. Not implemented yet.

submit()

Uses AzureDeployer to spin up an instance and connect it to the iPyParallel controller

Channels

For certain resources such as campus clusters or supercomputers at research laboratories, access may require authentication. For instance, some resources may allow access to their job schedulers only from their login nodes, which require you to authenticate through SSH, GSI-SSH, and sometimes two-factor authentication. Channels are simple abstractions that enable the ExecutionProvider component to talk to the resource managers of compute facilities. The simplest channel, LocalChannel, simply executes commands locally in a shell, while the SshChannel authenticates you to remote systems.

class libsubmit.channels.channel_base.Channel

Define the interface to all channels. Channels are usually called via the execute_wait function. For channels that execute remotely, a push_file function allows you to copy over files.

                      +------------------
                      |
cmd, wtime    ------->|  execute_wait
(ec, stdout, stderr)<-|---+
                      |
cmd, wtime    ------->|  execute_no_wait
(ec, stdout, stderr)<-|---+
                      |
src, dst_dir  ------->|  push_file
   dst_path  <--------|----+
                      |
dst_script_dir <------|  script_dir
                      |
                      +-------------------
close()

Closes the channel. Clean out any auth credentials.

Args:
None
Returns:
Bool
execute_no_wait(cmd, walltime, *args, **kwargs)

Optional. This is infrequently used.

Args:
  • cmd (string): Command string to execute over the channel
  • walltime (int) : Timeout in seconds
Returns:
  • (exit_code(None), stdout, stderr) (int, io_thing, io_thing)
execute_wait(cmd, walltime, *args, **kwargs)

Executes the cmd, with a defined walltime.

Args:
  • cmd (string): Command string to execute over the channel
  • walltime (int) : Timeout in seconds
Returns:
  • (exit_code, stdout, stderr) (int, string, string)
push_file(source, dest_dir)

Channel will take care of moving the file from source to the destination directory

Args:
  • source (string) : Full filepath of the file to be moved
  • dest_dir (string) : Absolute path of the directory to move to
Returns:
destination_path (string)
script_dir

This is a property. Returns the directory assigned for storing all internal scripts such as scheduler submit scripts. This is usually where error logs from the scheduler would reside on the channel destination side.

Args:
  • None
Returns:
  • Channel script dir

LocalChannel

class libsubmit.channels.local.local.LocalChannel(userhome='.', envs={}, scriptDir='./.scripts', **kwargs)

This is not really a channel, since opening a local shell is inexpensive and done so infrequently that a persistent channel is not needed.

__init__(userhome='.', envs={}, scriptDir='./.scripts', **kwargs)

Initialize the local channel. scriptDir is required but set to a default.

KwArgs:
  • userhome (string): (default=’.’) This is provided as a way to override and set a specific userhome
  • envs (dict) : A dictionary of env variables to be set when launching the shell
  • channel_script_dir (string): (default=”./.scripts”) Directory to place scripts
close()

There’s nothing to close here, and this really doesn’t do anything

Returns:
  • False, because it really did not “close” this channel.
execute_no_wait(cmd, walltime)

Synchronously execute a commandline string on the shell.

Args:
  • cmd (string) : Commandline string to execute
  • walltime (int) : walltime in seconds, this is not really used now.

Returns:

  • retcode : Return code from the execution, -1 on fail
  • stdout : stdout string
  • stderr : stderr string
Raises:
None.
execute_wait(cmd, walltime)

Synchronously execute a commandline string on the shell.

Args:
  • cmd (string) : Commandline string to execute
  • walltime (int) : walltime in seconds, this is not really used now.
Returns:
  • retcode : Return code from the execution, -1 on fail
  • stdout : stdout string
  • stderr : stderr string

Raises: None.

push_file(source, dest_dir)

If the source file's dirpath is the same as dest_dir, a copy is not necessary and nothing is done. Otherwise a copy is made.

Args:
  • source (string) : Path to the source file
  • dest_dir (string) : Path to the directory to which the file is to be copied
Returns:
  • destination_path (String) : Absolute path of the destination file
Raises:
  • FileCopyException : If file copy failed.
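
A minimal sketch of using the local channel directly; the command and script directory are placeholders:

from libsubmit.channels.local.local import LocalChannel

channel = LocalChannel(scriptDir='./.scripts')

# execute_wait returns (exit_code, stdout, stderr)
ec, out, err = channel.execute_wait('echo hello', 10)
print(ec, out.strip())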

SshChannel

class libsubmit.channels.ssh.ssh.SshChannel(hostname, username=None, password=None, scriptDir=None, **kwargs)

SSH persistent channel. This enables remote execution on sites accessible via SSH. It is assumed that the user has set up host keys so that SSH to the remote host works, which is to say that the following test on the command line should work:

>>> ssh <username>@<hostname>
__init__(hostname, username=None, password=None, scriptDir=None, **kwargs)

Initialize a persistent connection to the remote system. We should know at this point whether ssh connectivity is possible

Args:
  • hostname (String) : Hostname
KWargs:
  • username (string) : Username on remote system
  • password (string) : Password for remote system
  • channel_script_dir (string) : Full path to a script dir where generated scripts could be sent to.

Raises:

execute_no_wait(cmd, walltime=2, envs={})

Execute asynchronously without waiting for the exit code.

Args:
  • cmd (string): Commandline string to be executed on the remote side
  • walltime (int): timeout to exec_command
KWargs:
  • envs (dict): A dictionary of env variables
Returns:
  • None, stdout (readable stream), stderr (readable stream)
Raises:
  • ChannelExecFailed (reason)
pull_file(remote_source, local_dir)

Transport file on the remote side to a local directory

Args:
  • remote_source (string): remote_source
  • local_dir (string): Local directory to copy to
Returns:
  • str: Local path to file
Raises:
  • FileExists : Name collision at local directory.
  • FileCopyException : FileCopy failed.
push_file(local_source, remote_dir)

Transport a local file to a directory on a remote machine

Args:
  • local_source (string): Path
  • remote_dir (string): Remote path
Returns:
  • str: Path to copied file on remote machine
Raises:
  • BadScriptPath : if script path on the remote side is bad
  • BadPermsScriptPath : You do not have perms to make the channel script dir
  • FileCopyException : FileCopy failed.
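
A hedged sketch of using the SSH channel; the hostname, username, paths, and command are placeholders, and host keys are assumed to be set up as described above:

from libsubmit.channels.ssh.ssh import SshChannel

channel = SshChannel('login.cluster.example.edu', username='alice',
                     scriptDir='/home/alice/.parsl_scripts')

# Run a command remotely and copy a generated script to the remote script dir
ec, out, err = channel.execute_wait('uname -a', 30)
remote_path = channel.push_file('submit.sh', '/home/alice/.parsl_scripts')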

SshILChannel

class libsubmit.channels.ssh_il.ssh_il.SshILChannel(hostname, username=None, password=None, scriptDir=None, **kwargs)

SSH persistent channel. This enables remote execution on sites accessible via SSH. It is assumed that the user has set up host keys so that SSH to the remote host works, which is to say that the following test on the command line should work:

>>> ssh <username>@<hostname>
__init__(hostname, username=None, password=None, scriptDir=None, **kwargs)

Initialize a persistent connection to the remote system. We should know at this point whether ssh connectivity is possible

Args:
  • hostname (String) : Hostname
KWargs:
  • username (string) : Username on remote system
  • password (string) : Password for remote system
  • channel_script_dir (string) : Full path to a script dir where generated scripts could be sent to.

Raises:

execute_no_wait(cmd, walltime=2, envs={})

Execute asynchronously without waiting for the exit code.

Args:
  • cmd (string): Commandline string to be executed on the remote side
  • walltime (int): timeout to exec_command
KWargs:
  • envs (dict): A dictionary of env variables
Returns:
  • None, stdout (readable stream), stderr (readable stream)
Raises:
  • ChannelExecFailed (reason)
pull_file(remote_source, local_dir)

Transport file on the remote side to a local directory

Args:
  • remote_source (string): remote_source
  • local_dir (string): Local directory to copy to
Returns:
  • str: Local path to file
Raises:
  • FileExists : Name collision at local directory.
  • FileCopyException : FileCopy failed.
push_file(local_source, remote_dir)

Transport a local file to a directory on a remote machine

Args:
  • local_source (string): Path
  • remote_dir (string): Remote path
Returns:
  • str: Path to copied file on remote machine
Raises:
  • BadScriptPath : if script path on the remote side is bad
  • BadPermsScriptPath : You do not have perms to make the channel script dir
  • FileCopyException : FileCopy failed.

Launchers

Launchers are wrappers applied to user-submitted scripts as they are submitted to a specific execution resource.

singleNodeLauncher

libsubmit.launchers.singleNodeLauncher(cmd_string, taskBlocks, walltime=None)

Worker launcher that wraps the user’s cmd_string with the framework to launch multiple cmd_string invocations in parallel. This wrapper sets the bash env variable CORES to the number of cores on the machine. By setting taskBlocks to an integer or to a bash expression the number of invocations of the cmd_string to be launched can be controlled.

Args:
  • cmd_string (string): The command string to be launched
  • taskBlock (string) : bash evaluated string.
KWargs:
  • walltime (int) : This is not used by this launcher.
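
A small sketch of calling the launcher with the signature documented above; the worker command is illustrative:

from libsubmit.launchers import singleNodeLauncher

# Wrap a worker launch command so that $(($CORES / 2)) copies run in parallel
wrapped_cmd = singleNodeLauncher('ipengine --file=engine.json', '$(($CORES / 2))')
print(wrapped_cmd)   # a bash snippet that launches the requested number of workers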

srunLauncher

libsubmit.launchers.srunLauncher(cmd_string, taskBlocks, walltime=None)

Worker launcher that wraps the user’s cmd_string with the SRUN launch framework to launch multiple cmd invocations in parallel on a single job allocation.

Args:
  • cmd_string (string): The command string to be launched
  • taskBlock (string) : bash evaluated string.
KWargs:
  • walltime (int) : This is not used by this launcher.

srunMpiLauncher

libsubmit.launchers.srunMpiLauncher(cmd_string, taskBlocks, walltime=None)

Worker launcher that wraps the user’s cmd_string with the SRUN launch framework to launch multiple cmd invocations in parallel on a single job allocation.

Args:
  • cmd_string (string): The command string to be launched
  • taskBlock (string) : bash evaluated string.
KWargs:
  • walltime (int) : This is not used by this launcher.