Developer Guide
Parsl is a Parallel Scripting Library, designed to enable efficient workflow execution.
Importing
To get all the required functionality, we suggest importing the library as follows:
>>> import parsl
>>> from parsl import *
Logging
Following the general logging philosophy of Python libraries, Parsl does not log anything by default. However, the following helper functions are provided for logging:
- set_stream_logger: Attaches a StreamHandler to the logger. This is useful when working from a Jupyter notebook.
- set_file_logger: Sends log output to a file. This is ideal for reporting issues to the dev team.
Constants
- AUTO_LOGNAME: Special value that indicates Parsl should construct a filename for logging.
parsl.set_stream_logger(name: str = 'parsl', level: int = 10, format_string: Optional[str] = None)
Add a stream log handler.
Parameters:
- name – Set the logger name.
- level – Set to logging.DEBUG (10) by default.
- format_string – Set to None by default.
Returns: None
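As a hedged illustration of what adding a stream log handler involves, the sketch below uses only the standard logging module rather than calling Parsl. The function name add_stream_handler, its extra stream parameter, and its default format string are assumptions for this example, not Parsl's API or implementation.

```python
import io
import logging
from typing import Optional, TextIO


def add_stream_handler(name: str = "parsl",
                       level: int = logging.DEBUG,
                       format_string: Optional[str] = None,
                       stream: Optional[TextIO] = None) -> logging.Handler:
    """Attach a StreamHandler to the named logger (hypothetical stdlib-only sketch)."""
    if format_string is None:
        # Assumed default format for this sketch; Parsl's own default may differ.
        format_string = "%(asctime)s %(name)s [%(levelname)s] %(message)s"
    logger = logging.getLogger(name)
    logger.setLevel(level)
    handler = logging.StreamHandler(stream)  # stream=None writes to sys.stderr
    handler.setLevel(level)
    handler.setFormatter(logging.Formatter(format_string))
    logger.addHandler(handler)
    return handler


# Usage: send output to an in-memory buffer instead of the terminal.
buffer = io.StringIO()
handler = add_stream_handler("parsl.demo", format_string="%(levelname)s:%(message)s",
                             stream=buffer)
logging.getLogger("parsl.demo").debug("task submitted")
print(buffer.getvalue().strip())  # DEBUG:task submitted
```

In a Parsl script or notebook you would normally just call parsl.set_stream_logger() with the parameters listed above.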
parsl.set_file_logger(filename: str, name: str = 'parsl', level: int = 10, format_string: Optional[str] = None)
Add a file log handler.
Parameters:
- filename – Name of the file to write logs to.
- name – Logger name.
- level – Set the logging level.
- format_string – Set the format string.
Returns: None
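A comparable stdlib-only sketch for file logging follows. add_file_handler and its default format string are hypothetical and only approximate what set_file_logger is documented to do.

```python
import logging
import os
import tempfile
from typing import Optional


def add_file_handler(filename: str,
                     name: str = "parsl",
                     level: int = logging.DEBUG,
                     format_string: Optional[str] = None) -> logging.FileHandler:
    """Attach a FileHandler to the named logger (hypothetical stdlib-only sketch)."""
    if format_string is None:
        # Assumed default format for this sketch; Parsl's own default may differ.
        format_string = "%(asctime)s %(name)s [%(levelname)s] %(message)s"
    logger = logging.getLogger(name)
    logger.setLevel(level)
    handler = logging.FileHandler(filename)  # appends to the file by default
    handler.setLevel(level)
    handler.setFormatter(logging.Formatter(format_string))
    logger.addHandler(handler)
    return handler


# Usage: write a log file that could be attached to a bug report.
log_path = os.path.join(tempfile.mkdtemp(), "parsl.log")
logger = logging.getLogger("parsl.filedemo")
handler = add_file_handler(log_path, name="parsl.filedemo",
                           format_string="%(levelname)s %(message)s")
logger.info("run started")
logger.removeHandler(handler)
handler.close()  # flushes and closes the file

with open(log_path) as f:
    contents = f.read()
print(contents.strip())  # INFO run started
```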
Apps
Apps are parallelized functions that execute independently of the control flow of the main Python interpreter. There are two main types of Apps, PythonApps and BashApps, both of which are subclassed from AppBase.
AppBase
This is the base class that defines the two externally facing methods that an App must implement: __init__(), which is called when the interpreter sees the definition of the decorated function, and __call__(), which is invoked when the user calls the decorated function.
class parsl.app.app.AppBase(func, data_flow_kernel=None, walltime=60, executors='all', cache=False)
Base class that defines the __init__() and __call__() methods every App must implement.
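To make the split between __init__() and __call__() concrete, here is a toy decorator-based app in plain Python. MiniApp and mini_app are hypothetical names, and a concurrent.futures.ThreadPoolExecutor stands in for the DataFlowKernel and executors, so this is a sketch of the pattern rather than how AppBase is implemented.

```python
from concurrent.futures import Future, ThreadPoolExecutor


class MiniApp:
    """Hypothetical stand-in for AppBase; not Parsl's implementation."""

    def __init__(self, func, executor):
        # Runs once, when the decorator is applied to the function definition.
        self.func = func
        self.executor = executor

    def __call__(self, *args, **kwargs) -> Future:
        # Runs on every call: hand the work off and return a future immediately.
        return self.executor.submit(self.func, *args, **kwargs)


pool = ThreadPoolExecutor(max_workers=2)


def mini_app(func):
    """Hypothetical decorator that wraps a plain function in MiniApp."""
    return MiniApp(func, pool)


@mini_app
def add(x, y):
    return x + y


future = add(2, 3)      # returns at once; the work runs on a pool thread
print(future.result())  # 5 (blocks until the result is available)
pool.shutdown()
```

In Parsl itself, the @python_app and @bash_app decorators play the role of mini_app.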
PythonApp
Concrete subclass of AppBase that implements the Python App functionality.
Futures
Futures are returned as proxies for a parallel execution initiated by a call to an App.
Parsl has two kinds of futures: AppFutures and DataFutures.
AppFutures
class parsl.dataflow.futures.AppFuture(task_def)
An AppFuture wraps a sequence of Futures which may fail and be retried.
The AppFuture will wait for the DFK to provide a result from an appropriate parent future, through parent_callback. It will set its result to the result of that parent future if that parent future completes without an exception. This result setting should cause .result(), .exception() and done callbacks to fire as expected.
The AppFuture will not set its result to the result of the parent future if that parent future completes with an exception and has retries left. In that case, no .result(), .exception() or done callbacks should report a result.
The AppFuture will set its result to the result of the parent future if that parent future completes with an exception and has no retries left, or has no retry field. .result(), .exception() and done callbacks should then give a result as expected when a Future has a result set.
The parent future may return a RemoteExceptionWrapper as a result, and the AppFuture will treat this as an exception for the above retry and result handling behaviour.
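The three result-setting rules can be modelled with the standard concurrent.futures.Future. In this hypothetical RetryAwareFuture, the retry budget lives on the wrapper rather than on the parent task, and RemoteExceptionWrapper handling is omitted, so treat it as a simplified sketch of the behaviour described above rather than the AppFuture source.

```python
from concurrent.futures import Future


class RetryAwareFuture(Future):
    """Hypothetical, simplified model of the AppFuture rules above."""

    def __init__(self, retries: int):
        super().__init__()
        self.retries_left = retries

    def parent_callback(self, parent: Future) -> None:
        exc = parent.exception()  # parent is already done, so this does not block
        if exc is None:
            # Rule 1: success is always propagated.
            self.set_result(parent.result())
        elif self.retries_left > 0:
            # Rule 2: a retry will follow, so report nothing yet.
            self.retries_left -= 1
        else:
            # Rule 3: out of retries, so propagate the failure.
            self.set_exception(exc)


def attempt(app_fu: RetryAwareFuture, error=None, value=None) -> None:
    """Simulate one execution attempt finishing with either an error or a value."""
    parent = Future()
    parent.add_done_callback(app_fu.parent_callback)
    if error is not None:
        parent.set_exception(error)
    else:
        parent.set_result(value)


flaky = RetryAwareFuture(retries=1)
attempt(flaky, error=RuntimeError("first try"))
print(flaky.done())    # False: a retry is still allowed
attempt(flaky, value=42)
print(flaky.result())  # 42

doomed = RetryAwareFuture(retries=1)
attempt(doomed, error=RuntimeError("first try"))
attempt(doomed, error=RuntimeError("second try"))
print(repr(doomed.exception()))  # RuntimeError('second try')
```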
- __init__(task_def): Initialize the AppFuture.
  Parameters: task_def – The DFK task definition dictionary for the task represented by this future.
- __repr__(): Return repr(self).
- cancel(): Cancel the future if possible.
  Returns True if the future was cancelled, False otherwise. A future cannot be cancelled if it is running or has already completed.
- cancelled(): Return True if the future was cancelled.
- parent_callback(executor_fu): Callback from a parent future to update the AppFuture.
  Used internally by AppFuture, and should not be called by code using AppFuture.
  Updates the future with the result() or exception() of the parent.
  Parameters: executor_fu – Future returned by the executor along with callback.
  Returns: None
DataFutures
class parsl.app.futures.DataFuture(fut, file_obj, tid=None)
A DataFuture points at an AppFuture.
It simply wraps an AppFuture, adding the specific case where, if the future is resolved (i.e., the file exists), then the DataFuture is assumed to be resolved.
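A DataFuture's result is the file rather than the app's return value. The sketch below assumes pathlib.Path in place of Parsl's File class; MiniDataFuture is a hypothetical name, and the class only models the wrapping idea described above.

```python
from concurrent.futures import Future
from pathlib import Path


class MiniDataFuture(Future):
    """Hypothetical simplification of DataFuture: resolves to a file path
    once the wrapped app future completes."""

    def __init__(self, app_fut: Future, file_obj):
        super().__init__()
        # A plain string is converted to a path object, mirroring the string -> File conversion.
        self.file_obj = Path(file_obj) if isinstance(file_obj, str) else file_obj
        app_fut.add_done_callback(self.parent_callback)

    @property
    def filepath(self) -> str:
        return str(self.file_obj)

    def parent_callback(self, parent: Future) -> None:
        exc = parent.exception()
        if exc is None:
            self.set_result(self.file_obj)  # the result is the file, not the app's return value
        else:
            self.set_exception(exc)


app_fut = Future()
data_fut = MiniDataFuture(app_fut, "out/result.txt")
print(data_fut.done())    # False until the app finishes
app_fut.set_result(0)     # e.g. a bash app's exit code
print(data_fut.result())  # the Path object for out/result.txt
```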
- __init__(fut, file_obj, tid=None): Construct the DataFuture object.
  If file_obj is a string, it is converted to a File.
  Parameters:
  - fut – AppFuture that this DataFuture will track.
  - file_obj – Something representing file(s).
  - tid (task_id) – Task id that this DataFuture tracks (keyword argument).
- __repr__(): Return repr(self).
- cancel(): Cancel the future if possible.
  Returns True if the future was cancelled, False otherwise. A future cannot be cancelled if it is running or has already completed.
- cancelled(): Return True if the future was cancelled.
- filename: Filepath of the File object this DataFuture represents.
- filepath: Filepath of the File object this DataFuture represents.
- parent_callback(parent_fu): Callback from executor future to update the parent.
  Updates the future with the result (the File object) or the parent future's exception.
  Parameters: parent_fu – Future returned by the executor along with callback.
  Returns: None
- running(): Return True if the future is currently executing.
- tid: Returns the task_id of the task that will resolve this DataFuture.
Exceptions
- class parsl.app.errors.ParslError: Base class for all exceptions.
  Only to be raised when a more specific error is not available.
- class parsl.app.errors.NotFutureError: A non-future item was passed to a function that expected a future.
  This is essentially a type error.
- class parsl.app.errors.InvalidAppTypeError: An invalid app type was requested from the @App decorator.
- class parsl.app.errors.AppException: An error raised during execution of an app.
  What this exception contains depends entirely on context.
- class parsl.app.errors.AppBadFormatting: An error raised during formatting of a bash function.
- class parsl.app.errors.AppFailure(reason, exitcode, retries=None): An error raised during execution of an app.
  What this exception contains depends entirely on context. Contains: reason (str), exitcode (int), retries (int or None).
- class parsl.app.errors.MissingOutputs(reason, outputs): Error raised at the end of app execution due to missing output files.
  Contains: reason (str), outputs (list of strings/files).
- class parsl.app.errors.DependencyError(dependent_exceptions, reason, outputs): Error raised when one or more of an app's dependencies failed, so the app itself could not run.
  Contains: dependent_exceptions (the exceptions raised by the failed dependencies), reason (str), outputs (list of strings/files).
- class parsl.dataflow.error.DataFlowException: Base class for all exceptions.
  Only to be raised when a more specific error is not available.
DataFlowKernel
class parsl.dataflow.dflow.DataFlowKernel(config=Config( app_cache=True, checkpoint_files=None, checkpoint_mode=None, checkpoint_period=None, data_management_max_threads=10, executors=[ThreadPoolExecutor( label='threads', managed=True, max_threads=2, storage_access=None, thread_name_prefix='', working_dir=None )], initialize_logging=True, lazy_errors=True, max_idletime=120.0, monitoring=None, retries=0, run_dir='runinfo', strategy='simple', usage_tracking=False ))
The DataFlowKernel adds dependency awareness to an existing executor.
It is responsible for managing futures, such that when dependencies are resolved, pending tasks move to the runnable state.
Here is a simplified diagram of what happens internally:
User           |        DFK          |    Executor
----------------------------------------------------------
               |                     |
    Task-------+> +Submit            |
  App_Fu<------+--|                  |
               |  Dependencies met   |
               |          task-------+--> +Submit
               |         Ex_Fu<------+----|
- __init__(config): Initialize the DataFlowKernel. The default config is the same as in the class signature above.
  Parameters: config (Config) – A specification of all configuration options. For more details see the parsl.config.Config documentation.
- checkpoint(tasks=None): Checkpoint the DFK incrementally to a checkpoint file.
  When called, every task that has been completed but not yet checkpointed is checkpointed to a file.
  Parameters: tasks (list of task ids) – List of task ids to checkpoint. Default=None; if set to None, all tasks held by the DFK are iterated over.
  Note: Checkpointing only works if memoization is enabled.
  Returns: Checkpoint dir if checkpoints were written successfully. By default the checkpoints are written to the RUNDIR of the current run under RUNDIR/checkpoints/{tasks.pkl, dfk.pkl}.
- cleanup(): DataFlowKernel cleanup.
  This involves releasing all resources explicitly.
  If the executors are managed by the DFK, then scale_in is called on each of the executors, followed by executor.shutdown. Otherwise, executor cleanup is left to the user.
- config: Returns the fully initialized config that the DFK is actively using.
  Returns: config (dict)
- handle_app_update(task_id, future, memo_cbk=False): This function is called as a callback when an AppFuture is in its final state.
  It will trigger post-app processing such as checkpointing.
  Parameters:
  - task_id (string) – Task id.
  - future (Future) – The relevant app future (which should be consistent with the task structure 'app_fu' entry).
  - memo_cbk (bool) – Keyword argument indicating that the call is coming from a memo update, which does not require additional memo updates.
- handle_exec_update(task_id, future): This function is called only as a callback from an execution attempt reaching a final state (either successfully or failing).
  It will launch retries if necessary, and update the task structure.
  Parameters:
  - task_id (string) – Task id, which is a uuid string.
  - future (Future) – The future object corresponding to the task which makes this callback.
- launch_if_ready(task_id): Launch the specified task if it is ready to run (for example, with no outstanding dependencies, and in the pending state).
  This should be called by any piece of the DataFlowKernel that thinks a task may have become ready to run.
  It is not an error to call launch_if_ready on a task that is not ready to run; launch_if_ready will not incorrectly launch that task.
  launch_if_ready is thread safe, so it may be called from any thread or callback.
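The contract above (safe to call repeatedly, safe to call on tasks that are not ready, safe from any thread) can be sketched with a lock around a check-then-launch step. MiniDFK, its status strings, and its task dictionary layout are assumptions for illustration; a real DataFlowKernel would also hand the task to an executor at the point where this sketch only records it.

```python
import threading
from concurrent.futures import Future


class MiniDFK:
    """Hypothetical, heavily simplified model of launch_if_ready."""

    def __init__(self):
        self.tasks = {}       # task_id -> {"status": str, "depends": [Future]}
        self.launched = []    # order in which tasks were actually launched
        self._lock = threading.Lock()

    def add_task(self, task_id, depends=()):
        self.tasks[task_id] = {"status": "pending", "depends": list(depends)}

    def launch_if_ready(self, task_id):
        # The lock makes the check-then-launch step atomic, so concurrent
        # callbacks cannot launch the same task twice.
        with self._lock:
            task = self.tasks[task_id]
            if task["status"] != "pending":
                return        # already launched: do nothing
            if not all(dep.done() for dep in task["depends"]):
                return        # not ready yet: calling this is harmless
            task["status"] = "launched"
            self.launched.append(task_id)


dfk = MiniDFK()
upstream = Future()
dfk.add_task("a")
dfk.add_task("b", depends=[upstream])

dfk.launch_if_ready("b")  # ignored: upstream is unresolved
dfk.launch_if_ready("a")
dfk.launch_if_ready("a")  # ignored: "a" is no longer pending
upstream.set_result(1)
dfk.launch_if_ready("b")  # now ready
print(dfk.launched)       # ['a', 'b']
```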
- launch_task(task_id, executable, *args, **kwargs): Handle the actual submission of the task to the executor layer.
  If the app task does not set the executors attribute (default 'all'), the task is launched on a randomly selected executor from the list of executors. This behavior could later be updated to support binding to executors based on user-specified criteria.
  If the app task specifies a particular set of executors, it will be targeted at those specific executors.
  Parameters:
  - task_id (uuid string) – A uuid string that uniquely identifies the task.
  - executable (callable) – A callable object.
  - args (list of positional args) – Positional arguments passed to the executable.
  - kwargs (arbitrary keyword arguments) – Keyword arguments passed to the executable.
  Returns: Future that tracks the execution of the submitted executable.
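Executor selection as described for launch_task can be sketched as follows. choose_executor is a hypothetical helper, and choosing randomly within an explicitly requested set (not only when 'all' is given) is an assumption of this sketch.

```python
import random
from typing import List, Union


def choose_executor(requested: Union[str, List[str]], available: List[str],
                    rng: random.Random) -> str:
    """Pick an executor label for a task (hypothetical sketch, not Parsl's code)."""
    if requested == "all":
        candidates = list(available)
    else:
        # Accept a single label or a list of labels; only consider those requested.
        wanted = [requested] if isinstance(requested, str) else list(requested)
        candidates = [label for label in available if label in wanted]
    if not candidates:
        raise ValueError(f"no matching executor for {requested!r}")
    return rng.choice(candidates)


labels = ["threads", "htex_local"]
rng = random.Random(0)  # seeded so runs are reproducible
print(choose_executor(["htex_local"], labels, rng))   # htex_local
print(choose_executor("all", labels, rng) in labels)  # True
```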
- load_checkpoints(checkpointDirs): Load checkpoints from the checkpoint files into a dictionary.
  The results are used to pre-populate the memoizer's lookup_table.
  Parameters: checkpointDirs (list) – List of run folders to use as checkpoints, e.g. ['runinfo/001', 'runinfo/002'].
  Returns: dict containing hashed -> future mappings.
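The checkpoint-and-reload round trip can be sketched with pickle. The checkpoints/tasks.pkl location follows the path given for checkpoint(), but the record layout (a dict with hash and result keys) and both helper names are assumptions, so this sketch will not read real Parsl checkpoint files.

```python
import os
import pickle
import tempfile
from concurrent.futures import Future


def write_checkpoint(run_dir: str, results: dict) -> str:
    """Append {task_hash: result} records to run_dir/checkpoints/tasks.pkl (hypothetical sketch)."""
    ckpt_dir = os.path.join(run_dir, "checkpoints")
    os.makedirs(ckpt_dir, exist_ok=True)
    with open(os.path.join(ckpt_dir, "tasks.pkl"), "ab") as f:
        for task_hash, result in results.items():
            # One pickled record per task, appended so checkpoints can be incremental.
            pickle.dump({"hash": task_hash, "result": result}, f)
    return ckpt_dir


def load_checkpoint_dirs(run_dirs: list) -> dict:
    """Rebuild a {task_hash: Future} lookup table from earlier runs (hypothetical sketch)."""
    memo = {}
    for run_dir in run_dirs:
        path = os.path.join(run_dir, "checkpoints", "tasks.pkl")
        with open(path, "rb") as f:
            while True:
                try:
                    record = pickle.load(f)
                except EOFError:
                    break
                fut = Future()
                fut.set_result(record["result"])  # an already-resolved future
                memo[record["hash"]] = fut
    return memo


run_dir = tempfile.mkdtemp()
write_checkpoint(run_dir, {"hash-a": 1})
write_checkpoint(run_dir, {"hash-b": 2})  # a second, incremental checkpoint
table = load_checkpoint_dirs([run_dir])
print(sorted(table), table["hash-b"].result())  # ['hash-a', 'hash-b'] 2
```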
- sanitize_and_wrap(task_id, args, kwargs): This function should be called only when all the futures we track have been resolved.
  If the user hid futures a level below (for example, nested inside a list or dict argument), they will not be caught, which will most likely result in a type error.
  Parameters:
  - task_id (uuid str) – Task id.
  - func (Function) – App function.
  - args (List) – Positional args to app function.
  - kwargs (Dict) – Kwargs to app function.
  Returns: partial function evaluated with all dependencies in args, kwargs and kwargs['inputs'] evaluated.
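The unwrapping step, including the caveat about futures hidden one level down, can be sketched like this. sanitize_and_wrap_sketch and resolve are hypothetical names, and the sketch takes func directly, matching the parameter list rather than the three-argument signature.

```python
from concurrent.futures import Future
from functools import partial


def resolve(value):
    """Replace a completed Future by its result; leave anything else alone."""
    return value.result() if isinstance(value, Future) else value


def sanitize_and_wrap_sketch(func, args, kwargs):
    """Hypothetical sketch: swap resolved futures for their values, then bind."""
    new_args = [resolve(a) for a in args]
    new_kwargs = {k: resolve(v) for k, v in kwargs.items()}
    if "inputs" in new_kwargs:
        # Futures inside kwargs['inputs'] are unwrapped too (one level only).
        new_kwargs["inputs"] = [resolve(i) for i in new_kwargs["inputs"]]
    return partial(func, *new_args, **new_kwargs)


def total(a, b, inputs=()):
    return a + b + sum(inputs)


done = Future()
done.set_result(10)
wrapped = sanitize_and_wrap_sketch(total, [done, 1], {"inputs": [done, 5]})
print(wrapped())  # 26, i.e. 10 + 1 + (10 + 5)

# A future nested inside a list argument is NOT unwrapped.
nested = sanitize_and_wrap_sketch(lambda xs: xs, [[done]], {})
print(type(nested()[0]).__name__)  # Future
```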
- submit(func, *args, executors='all', fn_hash=None, cache=False, **kwargs): Add task to the dataflow system.
  If the app task does not set the executors attribute (default 'all'), the task will be launched on a randomly selected executor from the list of executors. If the app task specifies a particular set of executors, it will be targeted at the specified executors.
      IF all deps are met:
          send to the runnable queue and launch the task
      ELSE:
          post the task in the pending queue
  Parameters:
  - func – A function object.
  - *args – Args to the function.
  - executors (list or string) – List of executors this call could go to. Default='all'.
  - fn_hash (str) – Hash of the function and inputs. Default=None.
  - cache (bool) – Whether to enable memoization.
  - kwargs (dict) – Rest of the kwargs to the function, passed as a dict.
  Returns: (AppFuture) [DataFutures,]
Executors
Executors are abstractions that represent available compute resources to which you can submit arbitrary App tasks. An executor initialized with an Execution Provider can dynamically scale with the resource requirements of the workflow.
We currently have thread pools for local execution, remote workers from ipyparallel for executing on high-throughput systems such as campus clusters, and a Swift/T executor for HPC systems.
ParslExecutor (Abstract Base Class)
class parsl.executors.base.ParslExecutor
Define the strict interface for all Executor classes.
This is an abstract base class (built with the ABCMeta metaclass) that only enforces concrete implementations of functionality by the child classes.
In addition to the listed methods, a ParslExecutor instance must always have a member field:
- label: str – a human-readable label for the executor, unique with respect to other executors.
An executor may optionally expose:
- storage_access: List[parsl.data_provider.staging.Staging] – a list of staging providers that will be used for file staging. In the absence of this attribute, or if this attribute is None, a default value of parsl.data_provider.staging.default_staging will be used by the staging code.
Typechecker note: Ideally storage_access would be declared on executor __init__ methods as List[Staging]; however, lists are by default invariant, not covariant, and it looks like @typeguard cannot be persuaded otherwise. So if you are implementing an executor and want to @typeguard the constructor, you will have to use List[Any] here.
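A minimal sketch of the abstract-base-class pattern, using abc.ABCMeta. MiniExecutorBase and InlineExecutor are hypothetical, and the four abstract methods are taken from methods documented in this section rather than the complete ParslExecutor interface.

```python
from abc import ABCMeta, abstractmethod
from concurrent.futures import Future


class MiniExecutorBase(metaclass=ABCMeta):
    """Hypothetical interface in the spirit of ParslExecutor (not its real definition)."""

    label: str  # every concrete executor must set a unique, human-readable label

    @abstractmethod
    def start(self) -> None: ...

    @abstractmethod
    def submit(self, func, *args, **kwargs) -> Future: ...

    @abstractmethod
    def scale_out(self, blocks: int) -> None: ...

    @abstractmethod
    def scale_in(self, blocks: int) -> None: ...


class InlineExecutor(MiniExecutorBase):
    """Hypothetical executor that runs each task synchronously in the calling thread."""

    def __init__(self, label: str = "inline"):
        self.label = label

    def start(self) -> None:
        pass  # nothing to spin up

    def submit(self, func, *args, **kwargs) -> Future:
        fut = Future()
        try:
            fut.set_result(func(*args, **kwargs))
        except Exception as exc:
            fut.set_exception(exc)
        return fut

    def scale_out(self, blocks: int) -> None:
        raise NotImplementedError("inline executor cannot scale")

    def scale_in(self, blocks: int) -> None:
        raise NotImplementedError("inline executor cannot scale")


try:
    MiniExecutorBase()  # abstract methods are missing
except TypeError as err:
    print("cannot instantiate:", type(err).__name__)

ex = InlineExecutor()
ex.start()
print(ex.submit(pow, 2, 8).result())  # 256
```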
- scale_in(blocks: int) → None: Scale in method.
  Cause the executor to reduce the number of blocks by the given count.
  Ideally the scale in method would simply take a resource object which has the scaling methods; scale_in itself should be a coroutine, since scaling tasks can be slow.
- scale_out(blocks: int) → None: Scale out method.
  Ideally the scale out method would simply take a resource object which has the scaling methods; scale_out itself should be a coroutine, since scaling tasks can be slow.
ThreadPoolExecutor
class parsl.executors.threads.ThreadPoolExecutor(label: str = 'threads', max_threads: int = 2, thread_name_prefix: str = '', storage_access: List[Any] = None, working_dir: Optional[str] = None, managed: bool = True)
A thread-based executor.
Parameters:
- max_threads (int) – Number of threads. Default is 2.
- thread_name_prefix (string) – Thread name prefix (only supported in Python v3.6+).
- storage_access (list of Staging) – Specifications for accessing data remotely from this executor.
- managed (bool) – If True, Parsl will control dynamic scaling of this executor and be responsible for it. Otherwise, this is managed by the user.
- __init__: Initialize self. Accepts the same arguments, with the same defaults, as the class signature above.
- scale_in(blocks): Scale in the number of active blocks by the specified amount.
  This method is not implemented for threads and will raise an error if called.
  Raises: NotImplementedError
- scale_out(workers=1): Scale out the number of active workers by 1.
  This method is not implemented for threads and will raise an error if called.
  Raises: NotImplementedError
- scaling_enabled: Specify if scaling is enabled.
  The callers of ParslExecutors need to differentiate between Executors and Executors wrapped in a resource provider.
- start(): Start the executor.
  Any spin-up operations (for example: starting thread pools) should be performed here.
- submit(*args, **kwargs): Submits work to the thread pool.
  This method is simply a pass-through and behaves like the submit call described in the Python concurrent.futures documentation.
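Because submit is documented as a pass-through, the standard library's concurrent.futures.ThreadPoolExecutor shows the behaviour to expect. Mapping max_threads to max_workers is an assumption about how the wrapper forwards its settings; the prefix "parsl-demo" is an arbitrary example value.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def whoami(x):
    # Report which pool thread ran the task, plus a computed value.
    return threading.current_thread().name, x * x


# max_workers and thread_name_prefix play the roles of max_threads and thread_name_prefix above.
with ThreadPoolExecutor(max_workers=2, thread_name_prefix="parsl-demo") as pool:
    futures = [pool.submit(whoami, i) for i in range(4)]
    results = [f.result() for f in futures]

print([value for _, value in results])                             # [0, 1, 4, 9]
print(all(name.startswith("parsl-demo_") for name, _ in results))  # True
```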
IPyParallelExecutor
Warning: Deprecated as of v0.9.0.
class parsl.executors.ipp.IPyParallelExecutor(provider=LocalProvider( channel=LocalChannel( envs={}, script_dir=None, userhome='/home/docs/checkouts/readthedocs.org/user_builds/parsl/checkouts/0.9.0/docs' ), cmd_timeout=30, init_blocks=4, launcher=SingleNodeLauncher(), max_blocks=10, min_blocks=0, move_files=None, nodes_per_block=1, parallelism=1, walltime='00:15:00', worker_init='' ), label='ipp', working_dir=None, controller=Controller( interfaces=None, ipython_dir='~/.ipython', log=True, mode='auto', port=None, port_range=None, profile='default', public_ip=None, reuse=False ), container_image=None, engine_dir=None, storage_access=None, engine_debug_level=None, workers_per_node=1, managed=True)
The IPython Parallel executor.
This executor uses IPythonParallel's pilot execution system to manage multiple processes running locally or remotely.
Parameters:
- provider (ExecutionProvider) – Provider to access computation resources. Can be one of EC2Provider, Cobalt, Condor, GoogleCloud, GridEngine, Jetstream, Local, Slurm, or Torque.
- label (str) – Label for this executor instance.
- controller (Controller) – Which Controller instance to use. Default is Controller().
- workers_per_node (int) – Number of workers to be launched per node. Default=1.
- container_image (str) – Launch tasks in a container using this Docker image. If set to None, no container is used. Default is None.
- engine_dir (str) – Directory where engine logs and configuration files will be stored.
- working_dir (str) – Directory where input data should be staged to.
- storage_access (list of Staging) – Specifications for accessing data remotely from this executor.
- managed (bool) – If True, Parsl will control dynamic scaling of this executor and be responsible for it. Otherwise, this is managed by the user.
- engine_debug_level (int | str) – Sets engine logging to the specified debug level. Choices: (0, 10, 20, 30, 40, 50, 'DEBUG', 'INFO', 'WARN', 'ERROR', 'CRITICAL').
Note: Some deficiencies with this executor are:
- IPython engines (ipengines) execute one task at a time. This means one engine per core is necessary to exploit the full parallelism of a node.
- No notion of remaining walltime.
- Lack of throttling means tasks could be queued up on a worker.
- __init__: Initialize self. Accepts the same arguments, with the same defaults, as the class signature above.
- compose_launch_cmd(filepath, engine_dir, container_image): Reads the JSON contents from filepath and uses that to compose the engine launch command.
  Parameters:
  - filepath – Path to the engine file.
  - engine_dir – CWD for the engines.
- scale_out(blocks=1): Scale out the number of active blocks by the given number.
  Parameters: blocks (int) – Number of blocks to be provisioned.
- scaling_enabled: Specify if scaling is enabled.
  The callers of ParslExecutors need to differentiate between Executors and Executors wrapped in a resource provider.
- start(): Start the executor.
  Any spin-up operations (for example: starting thread pools) should be performed here.
- submit(*args, **kwargs): Submits work to the executor.
  This method is simply a pass-through and behaves like the submit call described in the Python concurrent.futures documentation.
  Returns: Future
HighThroughputExecutor
class parsl.executors.HighThroughputExecutor(label: str = 'HighThroughputExecutor', provider: parsl.providers.provider_base.ExecutionProvider = LocalProvider( channel=LocalChannel( envs={}, script_dir=None, userhome='/home/docs/checkouts/readthedocs.org/user_builds/parsl/checkouts/0.9.0/docs' ), cmd_timeout=30, init_blocks=4, launcher=SingleNodeLauncher(), max_blocks=10, min_blocks=0, move_files=None, nodes_per_block=1, parallelism=1, walltime='00:15:00', worker_init='' ), launch_cmd: Optional[str] = None, address: str = '127.0.0.1', worker_ports: Optional[Tuple[int, int]] = None, worker_port_range: Optional[Tuple[int, int]] = (54000, 55000), interchange_port_range: Optional[Tuple[int, int]] = (55000, 56000), storage_access: Optional[List[parsl.data_provider.staging.Staging]] = None, working_dir: Optional[str] = None, worker_debug: bool = False, cores_per_worker: float = 1.0, mem_per_worker: Optional[float] = None, max_workers: Union[int, float] = inf, prefetch_capacity: int = 0, heartbeat_threshold: int = 120, heartbeat_period: int = 30, poll_period: int = 10, suppress_failure: bool = True, managed: bool = True, worker_logdir_root: Optional[str] = None)
Executor designed for cluster scale.
The HighThroughputExecutor system has the following components:
- The HighThroughputExecutor instance, which is run as part of the Parsl script.
- The Interchange, which acts as a load-balancing proxy between workers and Parsl.
- The multiprocessing-based worker pool, which coordinates task execution over several cores on a node.
- ZeroMQ pipes, which connect the HighThroughputExecutor, the Interchange and the process_worker_pool.
Here is a diagram:
                    |  Data   |  Executor   |  Interchange  | External Process(es)
                    |  Flow   |             |               |
               Task | Kernel  |             |               |
             +----->|-------->|------------>|->outgoing_q---|-> process_worker_pool
             |      |         |             | batching      |    |         |
       Parsl<---Fut-|         |             | load-balancing|  result   exception
                 ^  |         |             | watchdogs     |    |         |
                 |  |         |   Q_mngmnt  |               |    V         V
                 |  |         |    Thread<--|-incoming_q<---|--- +---------+
                 |  |         |      |      |               |
                 |  |         |      |      |               |
                 +----update_fut-----+
Each of the workers in each process_worker_pool has access to its local rank through an environment variable, PARSL_WORKER_RANK. The local rank is unique for each process and is an integer in the range from 0 to the number of workers in the pool minus 1. The workers also have access to the ID of the worker pool as PARSL_WORKER_POOL_ID and the size of the worker pool as PARSL_WORKER_COUNT.
Parameters:
- provider (ExecutionProvider) – Provider to access computation resources. Can be one of EC2Provider, Cobalt, Condor, GoogleCloud, GridEngine, Jetstream, Local, Slurm, or Torque.
- label (str) – Label for this executor instance.
- launch_cmd (str) – Command line string to launch the process_worker_pool from the provider. The command line string will be formatted with appropriate values for the following fields: debug, task_url, result_url, cores_per_worker, nodes_per_block, heartbeat_period, heartbeat_threshold, logdir. For example: launch_cmd="process_worker_pool.py {debug} -c {cores_per_worker} --task_url={task_url} --result_url={result_url}"
- address (string) – An address to connect to the main Parsl process which is reachable from the network in which workers will be running. This can be either a hostname as returned by hostname or an IP address. Most login nodes on clusters have several network interfaces available, only some of which can be reached from the compute nodes. Some trial and error might be necessary to identify what addresses are reachable from compute nodes.
- worker_ports ((int, int)) – Specify the ports to be used by workers to connect to Parsl. If this option is specified, worker_port_range will not be honored.
- worker_port_range ((int, int)) – Worker ports will be chosen between the two integers provided.
- interchange_port_range ((int, int)) – Port range used by Parsl to communicate with the Interchange.
- working_dir (str) – Working dir to be used by the executor.
- worker_debug (bool) – Enables worker debug logging.
- managed (bool) – Whether this executor is managed by the DFK or handled externally.
- cores_per_worker (float) – Cores to be assigned to each worker. Oversubscription is possible by setting cores_per_worker < 1.0. Default=1.
- mem_per_worker (float) – GB of memory required per worker. If this option is specified, the node manager will check the available memory at startup and limit the number of workers such that there is sufficient memory for each worker. Default: None
- max_workers (int) – Caps the number of workers launched by the manager. Default: infinity
- prefetch_capacity (int) – Number of tasks that can be prefetched over available worker capacity. When there are few tasks (<100) or when tasks are long-running, this option should be set to 0 for better load balancing. Default is 0.
- suppress_failure (bool) – If set, the interchange will suppress failures rather than terminate early. Default: True
- heartbeat_threshold (int) – Seconds since the last message from the counterpart in the communication pair (interchange, manager) after which the counterpart is assumed to be unavailable. Default: 120s
- heartbeat_period (int) – Number of seconds after which a heartbeat message indicating liveness is sent to the counterpart (interchange, manager). Default: 30s
- poll_period (int) – Timeout period to be used by the executor components, in milliseconds. Increasing poll_period trades performance for CPU efficiency. Default: 10ms
- worker_logdir_root (string) – In case of a remote file system, specify the path to where logs will be kept.
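A rough sketch of the worker-pool arithmetic and the per-worker environment variables described above. The sizing rule (the smaller of the core-based and memory-based slot counts, then capped by max_workers) is an assumption inferred from the parameter descriptions, and both helper names and the "pool-a" id are hypothetical.

```python
import math
from typing import Mapping, Optional


def workers_per_node(cores_on_node: int, cores_per_worker: float = 1.0,
                     mem_on_node_gb: Optional[float] = None,
                     mem_per_worker: Optional[float] = None,
                     max_workers: float = math.inf) -> int:
    """Hypothetical estimate of how many workers a node manager would start."""
    slots = cores_on_node / cores_per_worker  # exceeds the core count when cores_per_worker < 1
    if mem_per_worker is not None and mem_on_node_gb is not None:
        slots = min(slots, mem_on_node_gb / mem_per_worker)
    return int(min(max_workers, math.floor(slots)))


def worker_identity(env: Mapping[str, str]) -> dict:
    """Read the per-worker variables from an environment mapping (hypothetical helper)."""
    return {
        "rank": int(env["PARSL_WORKER_RANK"]),  # 0 .. PARSL_WORKER_COUNT - 1
        "count": int(env["PARSL_WORKER_COUNT"]),
        "pool_id": env["PARSL_WORKER_POOL_ID"],
    }


print(workers_per_node(16, cores_per_worker=0.5))                 # 32 (oversubscribed)
print(workers_per_node(16, mem_on_node_gb=32, mem_per_worker=4))  # 8 (memory-bound)
print(workers_per_node(16, max_workers=4))                        # 4 (capped)

identity = worker_identity({"PARSL_WORKER_RANK": "2",
                            "PARSL_WORKER_COUNT": "4",
                            "PARSL_WORKER_POOL_ID": "pool-a"})
print(identity)  # {'rank': 2, 'count': 4, 'pool_id': 'pool-a'}
```

Inside an app running on a HighThroughputExecutor worker, you would pass os.environ to worker_identity instead of a hand-built dictionary.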
- __init__: Initialize self. Accepts the same arguments, with the same defaults, as the class signature above.
- _start_local_queue_process(): Starts the interchange process locally.
  Starts the interchange process locally and uses an internal command queue to get the worker task and result ports that the interchange has bound to.
- _start_queue_management_thread(): Method to start the management thread as a daemon.
  Checks if a thread already exists, then starts it. Could be used later as a restart if the management thread dies.
- hold_worker(worker_id): Puts a worker on hold, preventing scheduling of additional tasks to it.
  This is called "hold" mostly because this only stops scheduling of tasks, and does not actually kill the worker.
  Parameters: worker_id (str) – Worker id to be put on hold.
- scale_in(blocks=None, block_ids=[]): Scale in the number of active blocks by the specified amount.
  The scale in method here is very rude. It does not give the workers the opportunity to finish current tasks or clean up. This is tracked in issue #530.
  Parameters:
  - blocks (int) – Number of blocks to terminate and scale in by.
  - block_ids (list) – List of specific block ids to terminate. Optional.
- scale_out(blocks=1): Scales out the number of blocks by "blocks".
  Raises: NotImplementedError
- scaling_enabled: Specify if scaling is enabled.
  The callers of ParslExecutors need to differentiate between Executors and Executors wrapped in a resource provider.
- submit(func, *args, **kwargs): Submits work to the outgoing_q.
  An external process listens on the outgoing_q for new work. This method behaves like the submit call described in the Python concurrent.futures documentation.
  Parameters:
  - func – Callable function.
  - *args – List of arbitrary positional arguments.
  - **kwargs (dict) – A dictionary of arbitrary keyword args for func.
  Returns: Future
WorkQueueExecutor
class parsl.executors.WorkQueueExecutor(label='WorkQueueExecutor', working_dir='.', managed=True, project_name=None, project_password=None, project_password_file=None, port=0, env=None, shared_fs=False, source=False, init_command='', full_debug=True, see_worker_output=False)
Executor that uses the Work Queue batch system.
The WorkQueueExecutor system utilizes the Work Queue framework to efficiently delegate Parsl apps to remote machines in clusters and grids using a fault-tolerant system. Users can run the work_queue_worker program on remote machines to connect to the WorkQueueExecutor, and Parsl apps will then be sent out to these machines for execution and retrieval.
- label: str
- A human readable label for the executor, unique with respect to other Work Queue master programs
- working_dir: str
- Location for Parsl to perform app delegation to the Work Queue system
- managed: bool
- If this executor is managed by the DFK or externally handled
- project_name: str
- Work Queue process name
- project_password: str
- Optional password for the Work Queue project
- project_password_file: str
- Optional password file for the work queue project
- port: int
- TCP port on Parsl submission machine for Work Queue workers to connect to. Workers will specify this port number when trying to connect to Parsl
- env: dict{str}
- Dictionary that contains the environmental variables that need to be set on the Work Queue worker machine
- shared_fs: bool
- Define if working in a shared file system or not. If Parsl and the Work Queue workers are on a shared file system, Work Queue does not need to transfer and rename files for execution
- source: bool
- Choose whether to transfer parsl app information as source code. (Note: this increases throughput for @python_apps, but the implementation does not include functionality for @bash_apps, and thus source=False must be used for programs utilizing @bash_apps.)
- init_command: str
- Command to run before the constructed Work Queue command
- see_worker_output: bool
- Prints worker standard output if true
-
__init__
(label='WorkQueueExecutor', working_dir='.', managed=True, project_name=None, project_password=None, project_password_file=None, port=0, env=None, shared_fs=False, source=False, init_command='', full_debug=True, see_worker_output=False)[source]¶ Initialize self. See help(type(self)) for accurate signature.
-
create_name_tuple
(parsl_file_obj, in_or_out)[source]¶ Returns a tuple containing information about an input or output file to a Parsl app. Utilized to specify input and output files for a specific Work Queue task within the system.
Parameters:
-
create_new_name
(file_name)[source]¶ Returns a unique file name for an input file name. If the file name is already unique with respect to the Parsl process, then it returns the original file name.
Parameters: file_name (str) – Name of file that needs to be unique
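To illustrate the contract described for create_new_name, here is a minimal sketch. The UniqueNamer class and its numeric-suffix scheme are hypothetical and are not the WorkQueueExecutor's actual renaming logic; the sketch only shows returning the original name while it is still unused and a fresh, process-unique name otherwise.

```python
import os


class UniqueNamer:
    """Hypothetical helper: hand out file names that are unique within one process."""

    def __init__(self):
        self.used = set()

    def create_new_name(self, file_name):
        # Keep the original name if it has not been handed out yet.
        if file_name not in self.used:
            self.used.add(file_name)
            return file_name
        # Otherwise insert a numeric suffix before the extension until the name is unused.
        stem, ext = os.path.splitext(file_name)
        i = 1
        while f"{stem}-{i}{ext}" in self.used:
            i += 1
        new_name = f"{stem}-{i}{ext}"
        self.used.add(new_name)
        return new_name


namer = UniqueNamer()
print(namer.create_new_name("data.txt"), namer.create_new_name("data.txt"))
```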
-
shutdown
(*args, **kwargs)[source]¶ Shut down the executor. Sets a flag to cancel the submit process and collector thread, which shuts down the Work Queue system submission.
-
start
()[source]¶ Create submit process and collector thread to create, send, and retrieve Parsl tasks within the Work Queue system.
-
submit
(func, *args, **kwargs)[source]¶ Processes the Parsl app by its arguments and submits the function information to the task queue, to be executed using the Work Queue system. The args and kwargs are processed for input and output files to the Parsl app, so that the files are appropriately specified for the Work Queue task.
Parameters:
ExtremeScaleExecutor¶
-
class
parsl.executors.
ExtremeScaleExecutor
(label='ExtremeScaleExecutor', provider=LocalProvider( channel=LocalChannel( envs={}, script_dir=None, userhome='/home/docs/checkouts/readthedocs.org/user_builds/parsl/checkouts/0.9.0/docs' ), cmd_timeout=30, init_blocks=4, launcher=SingleNodeLauncher(), max_blocks=10, min_blocks=0, move_files=None, nodes_per_block=1, parallelism=1, walltime='00:15:00', worker_init='' ), launch_cmd=None, address='127.0.0.1', worker_ports=None, worker_port_range=(54000, 55000), interchange_port_range=(55000, 56000), storage_access=None, working_dir=None, worker_debug=False, ranks_per_node=1, heartbeat_threshold=120, heartbeat_period=30, managed=True)[source]¶ Executor designed for leadership class supercomputer scale
The ExtremeScaleExecutor extends the Executor interface to enable task execution on supercomputing systems (>1K Nodes). When functions and their arguments are submitted to the interface, a future is returned that tracks the execution of the function on a distributed compute environment.
- The ExtremeScaleExecutor system has the following components:
- The ExtremeScaleExecutor instance which is run as part of the Parsl script
- The Interchange, which acts as a load-balancing proxy between workers and Parsl
- The MPI-based mpi_worker_pool, which coordinates task execution over several nodes. With MPI communication between workers, we can exploit low-latency networking on HPC systems.
- ZeroMQ pipes that connect the ExtremeScaleExecutor, Interchange and the mpi_worker_pool
Our design assumes that there is a single MPI application (mpi_worker_pool) running over a block and that there might be several such instances. Here is a diagram:
                    |  Data   |  Executor   |  Interchange  | External Process(es)
                    |  Flow   |             |               |
               Task | Kernel  |             |               |
             +----->|-------->|------------>|->outgoing_q---|-> mpi_worker_pool
             |      |         |             | batching      |    |         |
       Parsl<---Fut-|         |             | load-balancing|  result   exception
                 ^  |         |             | watchdogs     |    |         |
                 |  |         |   Q_mngmnt  |               |    V         V
                 |  |         |    Thread<--|-incoming_q<---|--- +---------+
                 |  |         |      |      |               |
                 |  |         |      |      |               |
                 +----update_fut-----+
Parameters: - provider (ExecutionProvider) – Provider to access computation resources. Can be any provider in parsl.providers: Cobalt, Condor, GoogleCloud, GridEngine, Jetstream, Local, Slurm, or Torque.
- label (str) – Label for this executor instance.
- launch_cmd (str) – Command line string to launch the mpi_worker_pool from the provider. The command line string will be formatted with appropriate values for the following keys: debug, task_url, result_url, ranks_per_node, nodes_per_block, heartbeat_period, heartbeat_threshold, and logdir. For example: launch_cmd="mpiexec -np {ranks_per_node} mpi_worker_pool.py {debug} --task_url={task_url} --result_url={result_url}"
- address (string) – An address to connect to the main Parsl process which is reachable from the network in which workers will be running. This can be either a hostname as returned by the hostname command or an IP address. Most login nodes on clusters have several network interfaces available, only some of which can be reached from the compute nodes. Some trial and error might be necessary to identify what addresses are reachable from compute nodes.
- worker_ports ((int, int)) – Specify the ports to be used by workers to connect to Parsl. If this option is specified, worker_port_range will not be honored.
- worker_port_range ((int, int)) – Worker ports will be chosen between the two integers provided.
- interchange_port_range ((int, int)) – Port range used by Parsl to communicate with the Interchange.
- working_dir (str) – Working dir to be used by the executor.
- worker_debug (Bool) – Enables engine debug logging.
- managed (Bool) – Whether this executor is managed by the DFK or handled externally.
- ranks_per_node (int) – Specify the ranks to be launched per node.
- heartbeat_threshold (int) – Seconds since the last message from the counterpart in the communication pair (interchange, manager) after which the counterpart is assumed to be unavailable. Default: 120 s
- heartbeat_period (int) – Number of seconds after which a heartbeat message indicating liveness is sent to the counterpart (interchange, manager). Default: 30 s
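A rough sketch of how the launch_cmd template and the heartbeat settings listed above could be used. The template is the example from the launch_cmd description; the debug flag, URLs, and the helpers render_launch_cmd and counterpart_alive are hypothetical, chosen only for illustration. Note that str.format silently ignores keyword values the template does not reference, so supplying every documented key is harmless.

```python
import time

# Template copied from the launch_cmd example above.
LAUNCH_CMD = ("mpiexec -np {ranks_per_node} mpi_worker_pool.py {debug} "
              "--task_url={task_url} --result_url={result_url}")


def render_launch_cmd(template, **values):
    """Fill the placeholders; str.format ignores keyword values the template does not use."""
    return template.format(**values)


def counterpart_alive(last_message_at, now=None, heartbeat_threshold=120):
    """Hypothetical check: True if the counterpart was heard from within heartbeat_threshold seconds."""
    now = time.time() if now is None else now
    return (now - last_message_at) <= heartbeat_threshold


# Hypothetical values; a real executor supplies its own URLs and flags.
cmd = render_launch_cmd(
    LAUNCH_CMD,
    debug="--debug",
    task_url="tcp://10.0.0.1:55001",
    result_url="tcp://10.0.0.1:55002",
    ranks_per_node=4,
    nodes_per_block=1,
    heartbeat_period=30,
    heartbeat_threshold=120,
    logdir="runinfo",
)
print(cmd)
```

In this picture each side would refresh last_message_at whenever a message arrives, and heartbeat_period controls how often an otherwise idle side sends one.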
-
__init__
(label='ExtremeScaleExecutor', provider=LocalProvider( channel=LocalChannel( envs={}, script_dir=None, userhome='/home/docs/checkouts/readthedocs.org/user_builds/parsl/checkouts/0.9.0/docs' ), cmd_timeout=30, init_blocks=4, launcher=SingleNodeLauncher(), max_blocks=10, min_blocks=0, move_files=None, nodes_per_block=1, parallelism=1, walltime='00:15:00', worker_init='' ), launch_cmd=None, address='127.0.0.1', worker_ports=None, worker_port_range=(54000, 55000), interchange_port_range=(55000, 56000), storage_access=None, working_dir=None, worker_debug=False, ranks_per_node=1, heartbeat_threshold=120, heartbeat_period=30, managed=True)[source]¶ Initialize self. See help(type(self)) for accurate signature.
-
_start_local_queue_process
()[source]¶ Starts the interchange process locally
Starts the interchange process locally and uses an internal command queue to get the worker task and result ports that the interchange has bound to.
-
_start_queue_management_thread
()[source]¶ Method to start the management thread as a daemon.
Checks if a thread already exists, then starts it. Could be used later as a restart if the management thread dies.
-
hold_worker
(worker_id)[source]¶ Puts a worker on hold, preventing scheduling of additional tasks to it.
This is called “hold” mostly because this only stops scheduling of tasks, and does not actually kill the worker.
Parameters: worker_id (str) – Worker id to be put on hold
-
scale_in
(blocks=None, block_ids=[])[source]¶ Scale in the number of active blocks by the specified amount.
This scale-in method is abrupt: it does not give the workers the opportunity to finish current tasks or clean up. This is tracked in issue #530.
Parameters:
-
scale_out
(blocks=1)[source]¶ Scales out the number of blocks by “blocks”
Raises: NotImplementedError
-
scaling_enabled
[source]¶ Specifies whether scaling is enabled.
The callers of ParslExecutors need to differentiate between Executors and Executors wrapped in a resource provider.
-
submit
(func, *args, **kwargs)[source]¶ Submits work to the outgoing_q.
An external process listens on the outgoing_q for new work. This method behaves like a submit call as described in the Python docs for concurrent.futures.
Parameters: - func (-) – Callable function
- *args (-) –
List of arbitrary positional arguments.
- Kwargs:
- **kwargs (dict) : A dictionary of arbitrary keyword args for func.
Returns: Future
Swift/Turbine Executor¶
-
class
parsl.executors.swift_t.
TurbineExecutor
(label='turbine', storage_access=None, working_dir=None, managed=True)[source]¶ The Turbine executor.
Bypass the Swift/T language and run on top of the Turbine engines in an MPI environment.
Here is a diagram:
                    |  Data   |  Executor   |    IPC     | External Process(es)
                    |  Flow   |             |            |
               Task | Kernel  |             |            |
             +----->|-------->|------------>|outgoing_q -|-> Worker_Process
             |      |         |             |            |    |         |
       Parsl<---Fut-|         |             |            |  result   exception
                 ^  |         |             |            |    |         |
                 |  |         |   Q_mngmnt  |            |    V         V
                 |  |         |    Thread<--|incoming_q<-|--- +---------+
                 |  |         |      |      |            |
                 |  |         |      |      |            |
                 +----update_fut-----+
-
__init__
(label='turbine', storage_access=None, working_dir=None, managed=True)[source]¶ Initialize the thread pool.
Trying to implement the emews model.
-
_queue_management_worker
()[source]¶ Listen to the queue for task status messages and handle them.
Depending on the message, tasks will be updated with results, exceptions, or updates. It expects the following messages:
{
   "task_id" : <task_id>
   "result" : serialized result object, if task succeeded
   ... more tags could be added later
}
{
   "task_id" : <task_id>
   "exception" : serialized exception object, on failure
}
We do not support these yet, but they could be added easily.
{
   "task_id" : <task_id>
   "cpu_stat" : <>
   "mem_stat" : <>
   "io_stat" : <>
   "started" : tstamp
}
The None message is a die request.
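The message handling described above can be sketched as follows, assuming pickle as the serializer and a plain dict mapping each task_id to a concurrent.futures.Future. Both assumptions, and the function body, are illustrative rather than the TurbineExecutor's actual code.

```python
import pickle
import queue
from concurrent.futures import Future


def queue_management_worker(incoming_q, tasks):
    """Hypothetical sketch: resolve futures in `tasks` (task_id -> Future) from status messages."""
    while True:
        msg = incoming_q.get()
        if msg is None:
            # The None message is a die request.
            return
        fut = tasks.pop(msg["task_id"])
        if "result" in msg:
            fut.set_result(pickle.loads(msg["result"]))
        elif "exception" in msg:
            fut.set_exception(pickle.loads(msg["exception"]))


futures = {"t1": Future(), "t2": Future()}
q = queue.Queue()
q.put({"task_id": "t1", "result": pickle.dumps(42)})
q.put({"task_id": "t2", "exception": pickle.dumps(ValueError("bad input"))})
q.put(None)

# Called directly here for clarity; an executor would run this loop on a daemon thread.
queue_management_worker(q, dict(futures))
print(futures["t1"].result(), repr(futures["t2"].exception()))
```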
-
_start_queue_management_thread
()[source]¶ Method to start the management thread as a daemon.
Checks if a thread already exists, then starts it. Could be used later as a restart if the management thread dies.
-
scale_in
(blocks)[source]¶ Scale in the number of active blocks by specified amount.
This method is not implemented for turbine and will raise an error if called.
Raises: NotImplementedError
-
scale_out
(blocks=1)[source]¶ Scales out the number of active workers by 1.
This method is not implemented for the Turbine executor and will raise an error if called. This would be nice to have, and can be done.
Raises: NotImplementedError
-
submit
(func, *args, **kwargs)[source]¶ Submits work to the outgoing_q.
An external process listens on the outgoing_q for new work. This method is a simple pass-through and behaves like a submit call as described in the Python docs for concurrent.futures.
Parameters: - func (-) – Callable function
- *args (-) –
List of arbitrary positional arguments.
- Kwargs:
- **kwargs (dict) : A dictionary of arbitrary keyword args for func.
Returns: Future
-
-
parsl.executors.swift_t.
runner
(incoming_q, outgoing_q)[source]¶ This is a function that mocks the Swift-T side.
It listens on the incoming_q for tasks and posts results on the outgoing_q.
Parameters: - incoming_q (-) – The queue to listen on
- outgoing_q (-) – Queue to post results on
The messages posted on the incoming_q will be of the form:
{
   "task_id" : <uuid.uuid4 string>,
   "buffer" : serialized buffer containing the fn, args and kwargs
}
If None is received, the runner will exit. Response messages should be of the form:
{
   "task_id" : <uuid.uuid4 string>,
   "result" : serialized buffer containing result
   "exception" : serialized exception object
}
On exiting, the runner will post None to the outgoing_q.
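A minimal, self-contained mock in the spirit of runner, using queue.Queue and a thread. It assumes pickle for the serialized buffers and stores a (fn, args, kwargs) tuple in "buffer"; both choices are assumptions. Each reply here carries either "result" or "exception", matching the two message shapes that _queue_management_worker expects.

```python
import pickle
import queue
import threading
import uuid


def runner(incoming_q, outgoing_q):
    """Mock of the Swift/T side: run tasks from incoming_q, post replies on outgoing_q."""
    while True:
        msg = incoming_q.get()
        if msg is None:
            # Exit request: acknowledge by posting None to the outgoing_q.
            outgoing_q.put(None)
            return
        fn, args, kwargs = pickle.loads(msg["buffer"])
        reply = {"task_id": msg["task_id"]}
        try:
            reply["result"] = pickle.dumps(fn(*args, **kwargs))
        except Exception as e:  # ship the failure back instead of crashing the runner
            reply["exception"] = pickle.dumps(e)
        outgoing_q.put(reply)


incoming_q, outgoing_q = queue.Queue(), queue.Queue()
worker = threading.Thread(target=runner, args=(incoming_q, outgoing_q), daemon=True)
worker.start()

ok_id, bad_id = str(uuid.uuid4()), str(uuid.uuid4())
# Builtins such as pow and divmod pickle by reference, so they survive the round trip.
incoming_q.put({"task_id": ok_id, "buffer": pickle.dumps((pow, (2, 10), {}))})
incoming_q.put({"task_id": bad_id, "buffer": pickle.dumps((divmod, (1, 0), {}))})
incoming_q.put(None)
worker.join()

replies = []
while True:
    reply = outgoing_q.get()
    if reply is None:  # the runner's exit acknowledgement
        break
    replies.append(reply)
```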
Execution Providers¶
Execution providers are responsible for managing execution resources that have a Local Resource Manager (LRM). For instance, campus clusters and supercomputers generally have LRMs (schedulers) such as Slurm, Torque/PBS, Condor and Cobalt. Clouds, on the other hand, have API interfaces that allow much more fine-grained composition of an execution environment. An execution provider abstracts these types of resources and provides a single uniform interface to them.
ExecutionProvider (Base)¶
-
class
parsl.providers.provider_base.
ExecutionProvider
[source]¶ Define the strict interface for all Execution Providers
                          +------------------
                          |
    script_string ------->|  submit
         id      <--------|---+
                          |
    [ ids ]       ------->|  status
    [statuses]   <--------|----+
                          |
    [ ids ]       ------->|  cancel
    [cancel]     <--------|----+
                          |
                          +-------------------
-
cancel
(job_ids: List[Any]) → List[bool][source]¶ Cancels the resources identified by the job_ids provided by the user.
Parameters: job_ids (-) – A list of job identifiers
Returns: - A list of status from cancelling the job which can be True, False
Raises: - ExecutionProviderException or its subclasses
-
cores_per_node
[source]¶ Number of cores to provision per node.
Providers which set this property should ask for cores_per_node cores when provisioning resources, and set the corresponding environment variable PARSL_CORES before executing submitted commands.
If this property is set, executors may use it to calculate how many tasks can run concurrently per node. This information is used by dataflow.Strategy to estimate the resources required to run all outstanding tasks.
-
mem_per_node
[source]¶ Real memory to provision per node in GB.
Providers which set this property should ask for mem_per_node of memory when provisioning resources, and set the corresponding environment variable PARSL_MEMORY_GB before executing submitted commands.
If this property is set, executors may use it to calculate how many tasks can run concurrently per node. This information is used by dataflow.Strategy to estimate the resources required to run all outstanding tasks.
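As a sketch of how a worker-side process might consume these settings, the hypothetical helper below reads PARSL_CORES and PARSL_MEMORY_GB from the environment and computes how many tasks fit on a node. The per-task resource parameters are assumptions for illustration, not Parsl options.

```python
import math
import os


def tasks_that_fit(cores_per_task=1.0, mem_per_task_gb=None, environ=None):
    """Hypothetical: how many tasks fit on one node, based on PARSL_CORES / PARSL_MEMORY_GB."""
    environ = os.environ if environ is None else environ
    # Fall back to the local CPU count when the provider did not set PARSL_CORES.
    cores = float(environ.get("PARSL_CORES", os.cpu_count() or 1))
    slots = math.floor(cores / cores_per_task)
    mem = environ.get("PARSL_MEMORY_GB")
    if mem is not None and mem_per_task_gb:
        # Memory can only tighten the limit imposed by cores.
        slots = min(slots, math.floor(float(mem) / mem_per_task_gb))
    return slots


print(tasks_that_fit(2, 8, {"PARSL_CORES": "16", "PARSL_MEMORY_GB": "32"}))
```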
-
status
(job_ids: List[Any]) → List[str][source]¶ Get the status of a list of jobs identified by the job identifiers returned from the submit request.
Parameters: job_ids (-) – A list of job identifiers
Returns: - A list of status from [‘PENDING’, ‘RUNNING’, ‘CANCELLED’, ‘COMPLETED’, ‘FAILED’, ‘TIMEOUT’] corresponding to each job_id in the job_ids list.
Raises: - ExecutionProviderException or its subclasses
-
submit
(command: str, tasks_per_node: int, job_name: str = 'parsl.auto') → Any[source]¶ The submit method takes the command string to be executed upon instantiation of a resource most often to start a pilot (such as IPP engine or even Swift-T engines).
- Args :
- command (str) : The bash command string to be executed
- tasks_per_node (int) : command invocations to be launched per node
- KWargs:
- job_name (str) : Human friendly name to be assigned to the job request
Returns: A job identifier (an integer, string, etc.), or None or any other object that evaluates to boolean False if submission failed but an exception isn't thrown.
Raises: - ExecutionProviderException or its subclasses
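The submit/status/cancel contract above can be sketched with an abstract base class and a toy, in-memory provider. MinimalProvider, InMemoryProvider, the max_jobs capacity limit, and the job-id format are hypothetical; a real provider would talk to a scheduler through its channel.

```python
import abc
import itertools
from typing import Any, List


class MinimalProvider(abc.ABC):
    """Hypothetical restatement of the submit / status / cancel interface."""

    @abc.abstractmethod
    def submit(self, command: str, tasks_per_node: int, job_name: str = "parsl.auto") -> Any:
        """Return a job identifier, or a falsy value if submission failed."""

    @abc.abstractmethod
    def status(self, job_ids: List[Any]) -> List[str]:
        """Return one status string per job id."""

    @abc.abstractmethod
    def cancel(self, job_ids: List[Any]) -> List[bool]:
        """Return True for each job that was cancelled."""


class InMemoryProvider(MinimalProvider):
    """Toy provider that records jobs in a dict instead of contacting a scheduler."""

    ACTIVE = ("PENDING", "RUNNING")

    def __init__(self, max_jobs: int = 2):
        self.max_jobs = max_jobs
        self.jobs = {}
        self._ids = itertools.count(1)

    def submit(self, command, tasks_per_node, job_name="parsl.auto"):
        active = sum(state in self.ACTIVE for state in self.jobs.values())
        if active >= self.max_jobs:
            return None  # at capacity: signal failure with a falsy id instead of raising
        job_id = f"{job_name}.{next(self._ids)}"
        self.jobs[job_id] = "PENDING"
        return job_id

    def status(self, job_ids):
        return [self.jobs[job_id] for job_id in job_ids]

    def cancel(self, job_ids):
        results = []
        for job_id in job_ids:
            cancelled = self.jobs.get(job_id) in self.ACTIVE
            if cancelled:
                self.jobs[job_id] = "CANCELLED"
            results.append(cancelled)
        return results


provider = InMemoryProvider(max_jobs=1)
job = provider.submit("echo hello", tasks_per_node=1)
print(job, provider.status([job]), provider.cancel([job]))
```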
-
Local¶
-
class
parsl.providers.
LocalProvider
(channel=LocalChannel( envs={}, script_dir=None, userhome='/home/docs/checkouts/readthedocs.org/user_builds/parsl/checkouts/0.9.0/docs' ), nodes_per_block=1, launcher=SingleNodeLauncher(), init_blocks=4, min_blocks=0, max_blocks=10, walltime='00:15:00', worker_init='', cmd_timeout=30, parallelism=1, move_files=None)[source]¶ Local Execution Provider
This provider is used to provide execution resources from the localhost.
Parameters: - min_blocks (int) – Minimum number of blocks to maintain.
- max_blocks (int) – Maximum number of blocks to maintain.
- parallelism (float) – Ratio of provisioned task slots to active tasks. A parallelism value of 1 represents aggressive scaling where as many resources as possible are used; parallelism close to 0 represents the opposite situation in which as few resources as possible (i.e., min_blocks) are used.
- move_files (Optional[Bool]) – Should files be moved? By default (None), Parsl will try to figure this out itself. If True, files will always be moved; if False, they will never be moved.
- worker_init (str) – Command to be run before starting a worker, such as ‘module load Anaconda; source activate env’.
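To make the parallelism ratio concrete, the hypothetical function below estimates how many blocks to request for a given number of active tasks, clamped to min_blocks and max_blocks (defaulting to LocalProvider's 0 and 10). It is a simplified illustration of the ratio described above, not Parsl's scaling strategy; slots_per_block is an assumed input.

```python
import math


def blocks_to_provision(active_tasks, slots_per_block, parallelism, min_blocks=0, max_blocks=10):
    """Hypothetical estimate: enough blocks for parallelism * active_tasks task slots."""
    wanted_slots = active_tasks * parallelism
    blocks = math.ceil(wanted_slots / slots_per_block)
    # Clamp to the provider's configured bounds.
    return max(min_blocks, min(max_blocks, blocks))


# parallelism=1 asks for a slot per task; parallelism near 0 falls back to min_blocks.
print(blocks_to_provision(100, 10, 1), blocks_to_provision(100, 10, 0.5))
```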
-
__init__
(channel=LocalChannel( envs={}, script_dir=None, userhome='/home/docs/checkouts/readthedocs.org/user_builds/parsl/checkouts/0.9.0/docs' ), nodes_per_block=1, launcher=SingleNodeLauncher(), init_blocks=4, min_blocks=0, max_blocks=10, walltime='00:15:00', worker_init='', cmd_timeout=30, parallelism=1, move_files=None)[source]¶ Initialize self. See help(type(self)) for accurate signature.
-
cancel
(job_ids)[source]¶ Cancels the jobs specified by a list of job ids
Args: job_ids : [<job_id> …]
Returns : [True/False…] : If the cancel operation fails the entire list will be False.
-
status
(job_ids)[source]¶ Get the status of a list of jobs identified by their ids.
Parameters: job_ids (-) – List of identifiers for the jobs Returns: - List of status codes.
-
submit
(command, tasks_per_node, job_name='parsl.auto')[source]¶ Submits the command onto a Local Resource Manager job. Submit returns an ID that corresponds to the task that was just submitted.
- If tasks_per_node < 1:
- 1/tasks_per_node is provisioned
- If tasks_per_node == 1:
- A single node is provisioned
- If tasks_per_node > 1 :
- tasks_per_node nodes are provisioned.
Parameters: - command (-) – (String) Commandline invocation to be made on the remote side.
- tasks_per_node (-) – command invocations to be launched per node
- Kwargs:
- job_name (String): Name for job, must be unique
Returns: None if at capacity and cannot provision more; otherwise job_id (str), the identifier for the job.
Slurm¶
-
class
parsl.providers.
SlurmProvider
(partition, channel=LocalChannel( envs={}, script_dir=None, userhome='/home/docs/checkouts/readthedocs.org/user_builds/parsl/checkouts/0.9.0/docs' ), nodes_per_block=1, cores_per_node=None, mem_per_node=None, init_blocks=1, min_blocks=0, max_blocks=10, parallelism=1, walltime='00:10:00', scheduler_options='', worker_init='', cmd_timeout=10, exclusive=True, move_files=True, launcher=SingleNodeLauncher())[source]¶ Slurm Execution Provider
This provider uses sbatch to submit, squeue for status and scancel to cancel jobs. The sbatch script to be used is created from a template file in this same module.
Parameters: - partition (str) – Slurm partition to request blocks from.
- channel (Channel) – Channel for accessing this provider. Possible channels include LocalChannel (the default), SSHChannel, or SSHInteractiveLoginChannel.
- nodes_per_block (int) – Nodes to provision per block.
- cores_per_node (int) – Specify the number of cores to provision per node. If set to None, executors will assume all cores on the node are available for computation. Default is None.
- mem_per_node (float) – Specify the real memory to provision per node in GB. If set to None, no explicit request to the scheduler will be made. Default is None.
- min_blocks (int) – Minimum number of blocks to maintain.
- max_blocks (int) – Maximum number of blocks to maintain.
- parallelism (float) – Ratio of provisioned task slots to active tasks. A parallelism value of 1 represents aggressive scaling where as many resources as possible are used; parallelism close to 0 represents the opposite situation in which as few resources as possible (i.e., min_blocks) are used.
- walltime (str) – Walltime requested per block in HH:MM:SS.
- scheduler_options (str) – String to prepend to the #SBATCH blocks in the submit script to the scheduler.
- worker_init (str) – Command to be run before starting a worker, such as ‘module load Anaconda; source activate env’.
- exclusive (bool (Default = True)) – Requests nodes which are not shared with other running jobs.
- launcher (Launcher) – Launcher for this provider. Possible launchers include SingleNodeLauncher (the default), SrunLauncher, or AprunLauncher.
- move_files (Optional[Bool]) – Should files be moved? By default, Parsl will try to move files.
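A sketch of building an sbatch script from a template, in the spirit of the description above. The template text and render_sbatch are hypothetical (Parsl's own template file may differ); the #SBATCH options used (--job-name, --partition, --nodes, --time, --exclusive, --mem) are standard sbatch flags, and --mem is given in megabytes, sbatch's default unit.

```python
from string import Template

# Hypothetical template for illustration; Parsl's own template file may differ.
SBATCH_TEMPLATE = Template("""#!/bin/bash
#SBATCH --job-name=${job_name}
#SBATCH --partition=${partition}
#SBATCH --nodes=${nodes}
#SBATCH --time=${walltime}
${extra_directives}
${worker_init}
${command}
""")


def render_sbatch(command, partition, nodes_per_block=1, walltime="00:10:00",
                  exclusive=True, mem_per_node=None, worker_init="", job_name="parsl.auto"):
    """Hypothetical helper: fill the template from SlurmProvider-style options."""
    extra = []
    if exclusive:
        extra.append("#SBATCH --exclusive")
    if mem_per_node is not None:
        # mem_per_node is in GB; sbatch reads a bare --mem value as megabytes.
        extra.append(f"#SBATCH --mem={int(mem_per_node * 1024)}")
    return SBATCH_TEMPLATE.substitute(
        job_name=job_name, partition=partition, nodes=nodes_per_block,
        walltime=walltime, extra_directives="\n".join(extra),
        worker_init=worker_init, command=command)


print(render_sbatch("echo hello", "debug", nodes_per_block=2, mem_per_node=4))
```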
-
__init__
(partition, channel=LocalChannel( envs={}, script_dir=None, userhome='/home/docs/checkouts/readthedocs.org/user_builds/parsl/checkouts/0.9.0/docs' ), nodes_per_block=1, cores_per_node=None, mem_per_node=None, init_blocks=1, min_blocks=0, max_blocks=10, parallelism=1, walltime='00:10:00', scheduler_options='', worker_init='', cmd_timeout=10, exclusive=True, move_files=True, launcher=SingleNodeLauncher())[source]¶ Initialize self. See help(type(self)) for accurate signature.
-
cancel
(job_ids)[source]¶ Cancels the jobs specified by a list of job ids
Args: job_ids : [<job_id> …]
Returns : [True/False…] : If the cancel operation fails the entire list will be False.
Cobalt¶
-
class
parsl.providers.
CobaltProvider
(channel=LocalChannel( envs={}, script_dir=None, userhome='/home/docs/checkouts/readthedocs.org/user_builds/parsl/checkouts/0.9.0/docs' ), nodes_per_block=1, init_blocks=0, min_blocks=0, max_blocks=10, parallelism=1, walltime='00:10:00', account=None, queue=None, scheduler_options='', worker_init='', launcher=AprunLauncher(overrides=''), cmd_timeout=10)[source]¶ Cobalt Execution Provider
This provider uses Cobalt to submit (qsub), obtain the status of (qstat), and cancel (qdel) jobs. The script to be used is created from a template file in this same module.
Parameters: - channel (Channel) – Channel for accessing this provider. Possible channels include LocalChannel (the default), SSHChannel, or SSHInteractiveLoginChannel.
- nodes_per_block (int) – Nodes to provision per block.
- min_blocks (int) – Minimum number of blocks to maintain.
- max_blocks (int) – Maximum number of blocks to maintain.
- walltime (str) – Walltime requested per block in HH:MM:SS.
- account (str) – Account that the job will be charged against.
- queue (str) – Cobalt queue to request blocks from.
- scheduler_options (str) – String to prepend to the submit script to the scheduler.
- worker_init (str) – Command to be run before starting a worker, such as ‘module load Anaconda; source activate env’.
- launcher (Launcher) – Launcher for this provider. Possible launchers include AprunLauncher (the default) or SingleNodeLauncher.
-
__init__
(channel=LocalChannel( envs={}, script_dir=None, userhome='/home/docs/checkouts/readthedocs.org/user_builds/parsl/checkouts/0.9.0/docs' ), nodes_per_block=1, init_blocks=0, min_blocks=0, max_blocks=10, parallelism=1, walltime='00:10:00', account=None, queue=None, scheduler_options='', worker_init='', launcher=AprunLauncher(overrides=''), cmd_timeout=10)[source]¶ Initialize self. See help(type(self)) for accurate signature.
-
cancel
(job_ids)[source]¶ Cancels the jobs specified by a list of job ids
Args: job_ids : [<job_id> …]
Returns : [True/False…] : If the cancel operation fails the entire list will be False.
-
submit
(command, tasks_per_node, job_name='parsl.auto')[source]¶ Submits the command onto a Local Resource Manager job of parallel elements. Submit returns an ID that corresponds to the task that was just submitted.
- If tasks_per_node < 1:
- This is illegal; tasks_per_node should be an integer.
- If tasks_per_node == 1:
- A single node is provisioned
- If tasks_per_node > 1 :
- tasks_per_node number of nodes are provisioned.
Parameters: - command (-) – (String) Commandline invocation to be made on the remote side.
- tasks_per_node (-) – command invocations to be launched per node
- Kwargs:
- job_name (String): Name for job, must be unique
Returns: None if at capacity and cannot provision more; otherwise job_id (str), the identifier for the job.
Condor¶
-
class
parsl.providers.
CondorProvider
(channel: parsl.channels.base.Channel = LocalChannel( envs={}, script_dir=None, userhome='/home/docs/checkouts/readthedocs.org/user_builds/parsl/checkouts/0.9.0/docs' ), nodes_per_block: int = 1, cores_per_slot: Optional[int] = None, mem_per_slot: Optional[float] = None, init_blocks: int = 1, min_blocks: int = 0, max_blocks: int = 10, parallelism: float = 1, environment: Optional[Dict[str, str]] = None, project: str = '', scheduler_options: str = '', transfer_input_files: List[str] = [], walltime: str = '00:10:00', worker_init: str = '', launcher: parsl.launchers.launchers.Launcher = SingleNodeLauncher(), requirements: str = '', cmd_timeout: int = 60)[source]¶ HTCondor Execution Provider.
Parameters: - channel (Channel) – Channel for accessing this provider. Possible channels include LocalChannel (the default), SSHChannel, or SSHInteractiveLoginChannel.
- nodes_per_block (int) – Nodes to provision per block.
- cores_per_slot (int) – Specify the number of cores to provision per slot. If set to None, executors will assume all cores on the node are available for computation. Default is None.
- mem_per_slot (float) – Specify the real memory to provision per slot in GB. If set to None, no explicit request to the scheduler will be made. Default is None.
- init_blocks (int) – Number of blocks to provision at time of initialization
- min_blocks (int) – Minimum number of blocks to maintain
- max_blocks (int) – Maximum number of blocks to maintain.
- parallelism (float) – Ratio of provisioned task slots to active tasks. A parallelism value of 1 represents aggressive scaling where as many resources as possible are used; parallelism close to 0 represents the opposite situation in which as few resources as possible (i.e., min_blocks) are used.
- environment (dict of str) – A dictionary of environment variable name and value pairs which will be set before running a task.
- project (str) – Project which the job will be charged against
- scheduler_options (str) – String to add specific condor attributes to the HTCondor submit script.
- transfer_input_files (list(str)) – List of strings of paths to additional files or directories to transfer to the job
- worker_init (str) – Command to be run before starting a worker.
- requirements (str) – Condor requirements.
- launcher (Launcher) – Launcher for this provider. Possible launchers include SingleNodeLauncher (the default).
- cmd_timeout (int) – Timeout for commands made to the scheduler in seconds.
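The hypothetical helper below shows how several of these options could map onto HTCondor submit-description commands (request_cpus, request_memory, environment, transfer_input_files, requirements). It is an illustration only, not CondorProvider's actual template; request_memory is written in megabytes, HTCondor's default unit for that command.

```python
def render_condor_submit(executable, cores_per_slot=None, mem_per_slot=None,
                         environment=None, transfer_input_files=(), requirements=""):
    """Hypothetical: map CondorProvider-style options onto submit-description commands."""
    lines = ["universe = vanilla", f"executable = {executable}"]
    if cores_per_slot is not None:
        lines.append(f"request_cpus = {cores_per_slot}")
    if mem_per_slot is not None:
        # mem_per_slot is in GB; request_memory without a unit is read as megabytes.
        lines.append(f"request_memory = {int(mem_per_slot * 1024)}")
    if environment:
        # "New" environment syntax: one double-quoted string of space-separated NAME=value
        # pairs (values containing spaces would need extra quoting, omitted in this sketch).
        pairs = " ".join(f"{name}={value}" for name, value in environment.items())
        lines.append(f'environment = "{pairs}"')
    if transfer_input_files:
        lines.append("transfer_input_files = " + ",".join(transfer_input_files))
    if requirements:
        lines.append(f"requirements = {requirements}")
    lines.append("queue 1")
    return "\n".join(lines) + "\n"


print(render_condor_submit("worker.sh", cores_per_slot=4, mem_per_slot=2,
                           environment={"A": "1", "B": "2"}))
```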
-
__init__
(channel: parsl.channels.base.Channel = LocalChannel( envs={}, script_dir=None, userhome='/home/docs/checkouts/readthedocs.org/user_builds/parsl/checkouts/0.9.0/docs' ), nodes_per_block: int = 1, cores_per_slot: Optional[int] = None, mem_per_slot: Optional[float] = None, init_blocks: int = 1, min_blocks: int = 0, max_blocks: int = 10, parallelism: float = 1, environment: Optional[Dict[str, str]] = None, project: str = '', scheduler_options: str = '', transfer_input_files: List[str] = [], walltime: str = '00:10:00', worker_init: str = '', launcher: parsl.launchers.launchers.Launcher = SingleNodeLauncher(), requirements: str = '', cmd_timeout: int = 60) → None[source]¶ Initialize self. See help(type(self)) for accurate signature.
-
cancel
(job_ids)[source]¶ Cancels the jobs specified by a list of job IDs.
Parameters: job_ids (list of str) – The job IDs to cancel. Returns: Each entry in the list will be True if the job is cancelled successfully, otherwise False. Return type: list of bool
-
current_capacity
[source]¶ Returns the currently provisioned blocks. This may need to return more information in the future: { minsize, maxsize, current_requested }
-
status
(job_ids)[source]¶ Get the status of a list of jobs identified by their ids.
Parameters: job_ids (list of int) – Identifiers of jobs for which the status will be returned. Returns: Status codes for the requested jobs. Return type: List of int
-
submit
(command, tasks_per_node, job_name='parsl.auto')[source]¶ Submits the command onto a Local Resource Manager job.
- Example file with the complex case of multiple submits per job:
    Universe = vanilla
    output = out.$(Cluster).$(Process)
    error = err.$(Cluster).$(Process)
    log = log.$(Cluster)
    leave_in_queue = true
    executable = test.sh
    queue 5
    executable = foo
    queue 1
  $ condor_submit test.sub
  Submitting job(s)......
  5 job(s) submitted to cluster 118907.
  1 job(s) submitted to cluster 118908.
Parameters: Returns: None if at capacity and cannot provision more; otherwise the identifier for the job.
Return type:
Torque¶
-
class
parsl.providers.
TorqueProvider
(channel=LocalChannel( envs={}, script_dir=None, userhome='/home/docs/checkouts/readthedocs.org/user_builds/parsl/checkouts/0.9.0/docs' ), account=None, queue=None, scheduler_options='', worker_init='', nodes_per_block=1, init_blocks=1, min_blocks=0, max_blocks=100, parallelism=1, launcher=AprunLauncher(overrides=''), walltime='00:20:00', cmd_timeout=120)[source]¶ Torque Execution Provider
This provider uses qsub to submit, qstat for status, and qdel to cancel jobs. The submit script to be used is created from a template file in this same module.
Parameters: - channel (Channel) – Channel for accessing this provider. Possible channels include LocalChannel (the default), SSHChannel, or SSHInteractiveLoginChannel.
- account (str) – Account the job will be charged against.
- queue (str) – Torque queue to request blocks from.
- nodes_per_block (int) – Nodes to provision per block.
- init_blocks (int) – Number of blocks to provision at the start of the run. Default is 1.
- min_blocks (int) – Minimum number of blocks to maintain. Default is 0.
- max_blocks (int) – Maximum number of blocks to maintain.
- parallelism (float) – Ratio of provisioned task slots to active tasks. A parallelism value of 1 represents aggressive scaling where as many resources as possible are used; parallelism close to 0 represents the opposite situation in which as few resources as possible (i.e., min_blocks) are used.
- walltime (str) – Walltime requested per block in HH:MM:SS.
- scheduler_options (str) – String to prepend to the #PBS blocks in the submit script to the scheduler.
- worker_init (str) – Command to be run before starting a worker, such as ‘module load Anaconda; source activate env’.
- launcher (Launcher) – Launcher for this provider. Possible launchers include AprunLauncher (the default) or SingleNodeLauncher.
-
__init__
(channel=LocalChannel( envs={}, script_dir=None, userhome='/home/docs/checkouts/readthedocs.org/user_builds/parsl/checkouts/0.9.0/docs' ), account=None, queue=None, scheduler_options='', worker_init='', nodes_per_block=1, init_blocks=1, min_blocks=0, max_blocks=100, parallelism=1, launcher=AprunLauncher(overrides=''), walltime='00:20:00', cmd_timeout=120)[source]¶ Initialize self. See help(type(self)) for accurate signature.
-
cancel
(job_ids)[source]¶ Cancels the jobs specified by a list of job ids
Args: job_ids : [<job_id> …]
Returns : [True/False…] : If the cancel operation fails the entire list will be False.
-
submit
(command, tasks_per_node, job_name='parsl.auto')[source]¶ Submits the command onto a Local Resource Manager job. Submit returns an ID that corresponds to the task that was just submitted.
- If tasks_per_node < 1:
- This is illegal; tasks_per_node should be an integer.
- If tasks_per_node == 1:
- A single node is provisioned
- If tasks_per_node > 1 :
- tasks_per_node number of nodes are provisioned.
Parameters: - command (-) – (String) Commandline invocation to be made on the remote side.
- tasks_per_node (-) – command invocations to be launched per node
- Kwargs:
- job_name (String): Name for job, must be unique
Returns: None if at capacity and cannot provision more; otherwise job_id (str), the identifier for the job.
GridEngine¶
-
class
parsl.providers.
GridEngineProvider
(channel=LocalChannel( envs={}, script_dir=None, userhome='/home/docs/checkouts/readthedocs.org/user_builds/parsl/checkouts/0.9.0/docs' ), nodes_per_block=1, init_blocks=1, min_blocks=0, max_blocks=10, parallelism=1, walltime='00:10:00', scheduler_options='', worker_init='', launcher=SingleNodeLauncher())[source]¶ A provider for the Grid Engine scheduler.
Parameters: - channel (Channel) – Channel for accessing this provider. Possible channels include LocalChannel (the default), SSHChannel, or SSHInteractiveLoginChannel.
- nodes_per_block (int) – Nodes to provision per block.
- min_blocks (int) – Minimum number of blocks to maintain.
- max_blocks (int) – Maximum number of blocks to maintain.
- parallelism (float) – Ratio of provisioned task slots to active tasks. A parallelism value of 1 represents aggressive scaling where as many resources as possible are used; parallelism close to 0 represents the opposite situation in which as few resources as possible (i.e., min_blocks) are used.
- walltime (str) – Walltime requested per block in HH:MM:SS.
- scheduler_options (str) – String to prepend to the #$ blocks in the submit script to the scheduler.
- worker_init (str) – Command to be run before starting a worker, such as ‘module load Anaconda; source activate env’.
- launcher (Launcher) – Launcher for this provider. Possible launchers include SingleNodeLauncher (the default).
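The parallelism parameter above is described as a ratio of provisioned task slots to active tasks. As a rough, hypothetical illustration of that ratio (not Parsl's actual scaling strategy, which this page does not describe), the number of blocks it implies could be computed as follows, assuming each block offers a fixed number of task slots. `blocks_needed` and `slots_per_block` are names invented for this sketch.

```python
import math


def blocks_needed(active_tasks: int, parallelism: float, slots_per_block: int,
                  min_blocks: int, max_blocks: int) -> int:
    """Hypothetical helper: blocks implied by a parallelism ratio.

    Requested task slots = ceil(active_tasks * parallelism); the slot count is
    then rounded up to whole blocks and clamped to [min_blocks, max_blocks].
    """
    slots_wanted = math.ceil(active_tasks * parallelism)
    blocks = math.ceil(slots_wanted / slots_per_block)
    return max(min_blocks, min(blocks, max_blocks))


# 100 active tasks, 10 task slots per block, min_blocks=0, max_blocks=10
print(blocks_needed(100, 1.0, 10, 0, 10))  # 10: one slot per active task
print(blocks_needed(100, 0.5, 10, 0, 10))  # 5: half as many slots
print(blocks_needed(100, 0.0, 10, 0, 10))  # 0: falls back to min_blocks
```

The clamp reflects the min_blocks and max_blocks parameters documented above; the real scaling logic may round or react over time differently.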
cancel(job_ids)
Cancels the resources identified by the job_ids provided by the user.
Parameters: job_ids – A list of job identifiers.
Returns: A list of statuses from cancelling the jobs, each of which can be True or False.
Raises: ExecutionProviderException or its subclasses.
get_configs(command, tasks_per_node)
Compose a dictionary with information for writing the submit script.
submit(command, tasks_per_node, job_name='parsl.auto')
The submit method takes the command string to be executed upon instantiation of a resource, most often to start a pilot (such as an IPP engine or even a Swift-T engine).
Args:
- command (str): The bash command string to be executed.
- tasks_per_node (int): Command invocations to be launched per node.
Kwargs:
- job_name (str): Human-friendly name to be assigned to the job request.
Returns: A job identifier; this could be an integer, string, etc.
Raises: ExecutionProviderException or its subclasses.
Amazon Web Services
class parsl.providers.AWSProvider(image_id, key_name, init_blocks=1, min_blocks=0, max_blocks=10, nodes_per_block=1, parallelism=1, worker_init='', instance_type='t2.small', region='us-east-2', spot_max_bid=0, key_file=None, profile=None, iam_instance_profile_arn='', state_file=None, walltime='01:00:00', linger=False, launcher=SingleNodeLauncher())
A provider for using Amazon Elastic Compute Cloud (EC2) resources.
One of three methods is required to authenticate: a key file, a profile, or environment variables. If neither key_file nor profile is set, the following environment variables must be set: AWS_ACCESS_KEY_ID (the access key for your AWS account), AWS_SECRET_ACCESS_KEY (the secret key for your AWS account), and (optionally) AWS_SESSION_TOKEN (the session key for your AWS account).
Parameters: - image_id (str) – Identifier of the Amazon Machine Image (AMI).
- worker_init (str) – String to append to the Userdata script executed in the cloudinit phase of instance initialization.
- walltime (str) – Walltime requested per block in HH:MM:SS. This option is not currently honored by this provider.
- key_file (str) – Path to a JSON file that contains ‘AWSAccessKeyId’ and ‘AWSSecretKey’.
- nodes_per_block (int) – Nodes to provision per block. This is always 1 for EC2. Default is 1.
- profile (str) – Profile to be used from the standard AWS config file ~/.aws/config.
- init_blocks (int) – Number of blocks to provision at the start of the run. Default is 1.
- min_blocks (int) – Minimum number of blocks to maintain. Default is 0.
- max_blocks (int) – Maximum number of blocks to maintain. Default is 10.
- instance_type (str) – EC2 instance type. Instance types comprise varying combinations of CPU, memory, storage, and networking capacity; for more information on possible instance types, see the Amazon EC2 instance types documentation. Default is ‘t2.small’.
- region (str) – Amazon Web Services (AWS) region in which to launch machines. Default is ‘us-east-2’.
- key_name (str) – Name of the AWS private key (.pem file) that is usually generated on the console to allow SSH access to the EC2 instances. This is mostly used for debugging.
- spot_max_bid (float) – Maximum bid price (if requesting spot market machines).
- iam_instance_profile_arn (str) – Launch instance with a specific role.
- state_file (str) – Path to the state file from a previous run to re-use.
- launcher (Launcher) – Launcher for this provider. Possible launchers include SingleNodeLauncher (the default), SrunLauncher, or AprunLauncher.
- linger (bool) – When set to True, the workers will not halt. The user is responsible for shutting down the node.
cancel(job_ids)
Cancel the jobs specified by a list of job ids.
Parameters: job_ids (list of str) – List of job identifiers.
Returns: Each entry in the list will contain False if the operation fails; otherwise, the entry will be True.
Return type: list of bool
config_route_table(vpc, internet_gateway)
Configure the route table for the Virtual Private Cloud (VPC).
create_session()
Create a session.
First we look in self.key_file for a path to a JSON file with the credentials. The key file should have ‘AWSAccessKeyId’ and ‘AWSSecretKey’.
Next we look at self.profile for a profile name and try to use the Session call to automatically pick up the keys for the profile from the user default keys file ~/.aws/config.
Finally, boto3 will look for the keys in environment variables:
- AWS_ACCESS_KEY_ID: The access key for your AWS account.
- AWS_SECRET_ACCESS_KEY: The secret key for your AWS account.
- AWS_SESSION_TOKEN: The session key for your AWS account. This is only needed when you are using temporary credentials. The AWS_SECURITY_TOKEN environment variable can also be used, but is only supported for backwards compatibility purposes. AWS_SESSION_TOKEN is supported by multiple AWS SDKs besides Python.
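The credential lookup order just described can be sketched with the standard library alone. This is a hedged illustration, not the provider's code: `resolve_aws_credentials` is a hypothetical helper, and it only reports where the keys would come from instead of creating a boto3 session.

```python
import json
import os
import tempfile
from typing import Optional


def resolve_aws_credentials(key_file: Optional[str] = None,
                            profile: Optional[str] = None) -> dict:
    """Hypothetical helper mirroring the lookup order described above.

    1. key_file: a JSON file with 'AWSAccessKeyId' and 'AWSSecretKey'.
    2. profile: a named profile; boto3.session.Session(profile_name=profile)
       would resolve the keys from the AWS config files.
    3. Otherwise, the environment variables that boto3 reads.
    """
    if key_file is not None:
        with open(key_file) as fh:
            creds = json.load(fh)
        return {"source": "key_file",
                "aws_access_key_id": creds["AWSAccessKeyId"],
                "aws_secret_access_key": creds["AWSSecretKey"]}
    if profile is not None:
        return {"source": "profile", "profile_name": profile}
    return {"source": "environment",
            "aws_access_key_id": os.environ.get("AWS_ACCESS_KEY_ID"),
            "aws_secret_access_key": os.environ.get("AWS_SECRET_ACCESS_KEY"),
            # Only needed for temporary credentials
            "aws_session_token": os.environ.get("AWS_SESSION_TOKEN")}


# Usage: a throwaway key file takes precedence over a profile name.
with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as tmp:
    json.dump({"AWSAccessKeyId": "EXAMPLE_ID", "AWSSecretKey": "EXAMPLE_SECRET"}, tmp)
creds = resolve_aws_credentials(key_file=tmp.name, profile="default")
os.remove(tmp.name)
print(creds["source"])  # key_file
```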
create_vpc()
Create and configure a VPC.
We create a VPC with CIDR 10.0.0.0/16, which provides 65,536 IP addresses, i.e. room for up to about 64,000 instances.
We attach a subnet for each availability zone within the region specified in the config. We give each subnet an IP range like 10.0.X.0/20, which is large enough for approximately 4,000 instances.
Security groups are configured in function security_group.
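The address arithmetic behind this layout can be checked with Python's standard ipaddress module. The zone names below are only examples; the sketch illustrates the CIDR math, not how the provider enumerates zones. Because /20 blocks are 4,096 addresses wide, the X in 10.0.X.0/20 steps in multiples of 16, and AWS reserves a few addresses in every subnet, which is why the text says "approximately".

```python
import ipaddress

vpc = ipaddress.ip_network("10.0.0.0/16")
print(vpc.num_addresses)            # 65536

# Carve the /16 into /20 subnets (16 of them, 4096 addresses each).
subnets = list(vpc.subnets(new_prefix=20))
print(len(subnets))                 # 16
print(subnets[0], subnets[1])       # 10.0.0.0/20 10.0.16.0/20
print(subnets[0].num_addresses)     # 4096

# One subnet per availability zone; these zone names are only examples.
zones = ["us-east-2a", "us-east-2b", "us-east-2c"]
assignment = dict(zip(zones, subnets))
for zone, net in assignment.items():
    print(zone, net)
```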
get_instance_state(instances=None)
Get the states of all instances on EC2 that were started by this file.
read_state_file(state_file)
Read the state file, if it exists.
If this script has been run previously, resource IDs will have been written to a state file. On starting a run, a state file will be looked for before creating new infrastructure. Information on VPCs, security groups, and subnets is saved, as well as running instances and their states.
AWS has a maximum number of VPCs per region per account, so we do not want to clutter users’ AWS accounts with security groups and VPCs that will be used only once.
security_group(vpc)
Create and configure a new security group.
Allows all ICMP in, and all TCP and UDP in from within the VPC.
This security group is very open. It allows all incoming ping (ICMP) requests, and it also allows all outgoing traffic on all ports. This can be limited by changing the allowed port ranges.
Parameters: vpc (VPC instance) – VPC in which to set up the security group.
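The rules described above can be written down as data. This sketch assumes one reading of the description (ICMP from anywhere, TCP and UDP only from inside the VPC) and builds the IpPermissions list in the shape boto3's EC2 `SecurityGroup.authorize_ingress` accepts; `ingress_permissions` is a hypothetical helper, and no AWS call is made.

```python
def ingress_permissions(vpc_cidr: str) -> list:
    """Hypothetical helper: ingress rules in the IpPermissions shape that
    boto3's EC2 SecurityGroup.authorize_ingress accepts.

    ICMP is opened to everyone; TCP and UDP only to addresses inside the VPC.
    """
    def rule(protocol: str, from_port: int, to_port: int, cidr: str) -> dict:
        return {"IpProtocol": protocol, "FromPort": from_port,
                "ToPort": to_port, "IpRanges": [{"CidrIp": cidr}]}

    return [
        rule("icmp", -1, -1, "0.0.0.0/0"),   # -1/-1 means all ICMP types and codes
        rule("tcp", 0, 65535, vpc_cidr),     # every TCP port, VPC-internal only
        rule("udp", 0, 65535, vpc_cidr),     # every UDP port, VPC-internal only
    ]


perms = ingress_permissions("10.0.0.0/16")
# With boto3 one might then call: security_group.authorize_ingress(IpPermissions=perms)
for p in perms:
    print(p["IpProtocol"], p["FromPort"], p["ToPort"], p["IpRanges"][0]["CidrIp"])
```

Narrowing the port ranges in these rules is the kind of change the text suggests for a less open group.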
shut_down_instance(instances=None)
Shut down a list of instances, if provided.
If no instance is provided, the last instance started up will be shut down.
spin_up_instance(command, job_name)
Start an instance in the VPC in the first available subnet.
Starting N instances when nodes_per_block > 1 is not supported; only one node per block is provisioned.
status(job_ids)
Get the status of a list of jobs identified by their ids.
Parameters: job_ids (list of str) – Identifiers for the jobs.
Returns: The status codes of the requested jobs.
Return type: list of int
submit(command='sleep 1', tasks_per_node=1, job_name='parsl.auto')
Submit the command onto a freshly instantiated AWS EC2 instance.
Submit returns an ID that corresponds to the task that was just submitted.
Returns: If at capacity, None will be returned. Otherwise, the job identifier will be returned.
Google Cloud Platform
class parsl.providers.GoogleCloudProvider(project_id, key_file, region, os_project, os_family, google_version='v1', instance_type='n1-standard-1', init_blocks=1, min_blocks=0, max_blocks=10, launcher=SingleNodeLauncher(), parallelism=1)
A provider for using resources from the Google Compute Engine.
Parameters: - project_id (str) – Project ID from Google compute engine.
- key_file (str) – Path to authorization private key json file. This is required for auth. A new one can be generated here: https://console.cloud.google.com/apis/credentials
- region (str) – Region in which to start instances
- os_project (str) – OS project code for Google compute engine.
- os_family (str) – OS family to request.
- google_version (str) – Google Compute Engine version to use. Possibilities include ‘v1’ (default) or ‘beta’.
- instance_type (str) – Instance type to request. Default is ‘n1-standard-1’.
- init_blocks (int) – Number of blocks to provision immediately. Default is 1.
- min_blocks (int) – Minimum number of blocks to maintain. Default is 0.
- max_blocks (int) – Maximum number of blocks to maintain. Default is 10.
- parallelism (float) – Ratio of provisioned task slots to active tasks. A parallelism value of 1 represents aggressive scaling where as many resources as possible are used; parallelism close to 0 represents the opposite situation in which as few resources as possible (i.e., min_blocks) are used.
The provider interface in summary: submit(script_string) -> id; status([ids]) -> [statuses]; cancel([ids]) -> [cancel results].
cancel(job_ids)
Cancels the resources identified by the job_ids provided by the user.
Parameters: job_ids – A list of job identifiers.
Returns: A list of statuses from cancelling the jobs, each of which can be True or False.
Raises: ExecutionProviderException or its subclasses.
status(job_ids)
Get the status of a list of jobs identified by the job identifiers returned from the submit request.
Parameters: job_ids – A list of job identifiers.
Returns: A list of statuses from [‘PENDING’, ‘RUNNING’, ‘CANCELLED’, ‘COMPLETED’, ‘FAILED’, ‘TIMEOUT’] corresponding to each job_id in the job_ids list.
Raises: ExecutionProviderException or its subclasses.
submit(command, tasks_per_node, job_name='parsl.auto')
The submit method takes the command string to be executed upon instantiation of a resource, most often to start a pilot.
Args:
- command (str): The bash command string to be executed.
- tasks_per_node (int): Command invocations to be launched per node.
Kwargs:
- job_name (str): Human-friendly name to be assigned to the job request.
Returns: A job identifier; this could be an integer, string, etc.
Raises: ExecutionProviderException or its subclasses.
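The submit/status/cancel contract shared by these providers can be illustrated with a toy in-memory stand-in. `InMemoryProvider` is hypothetical and launches nothing; it simply returns None at capacity, reports status strings from the documented set, and answers cancel with one boolean per job id.

```python
import itertools
from typing import Dict, List, Optional

ACTIVE = ("PENDING", "RUNNING")


class InMemoryProvider:
    """Hypothetical stand-in that follows the submit/status/cancel contract
    described above. It launches nothing; jobs are just dictionary entries."""

    def __init__(self, max_blocks: int = 2):
        self.max_blocks = max_blocks
        self._counter = itertools.count()
        self._jobs: Dict[str, str] = {}

    def submit(self, command: str, tasks_per_node: int,
               job_name: str = "parsl.auto") -> Optional[str]:
        active = sum(1 for state in self._jobs.values() if state in ACTIVE)
        if active >= self.max_blocks:
            return None  # at capacity: nothing more can be provisioned
        job_id = f"{job_name}.{next(self._counter)}"
        self._jobs[job_id] = "PENDING"
        return job_id

    def status(self, job_ids: List[str]) -> List[str]:
        # One entry per requested id, drawn from the documented status strings
        return [self._jobs[job_id] for job_id in job_ids]

    def cancel(self, job_ids: List[str]) -> List[bool]:
        results = []
        for job_id in job_ids:
            if self._jobs.get(job_id) in ACTIVE:
                self._jobs[job_id] = "CANCELLED"
                results.append(True)
            else:
                results.append(False)
        return results


provider = InMemoryProvider(max_blocks=2)
first = provider.submit("echo hello", tasks_per_node=1)
second = provider.submit("echo hello", tasks_per_node=1)
third = provider.submit("echo hello", tasks_per_node=1)
print(first, second, third)               # parsl.auto.0 parsl.auto.1 None
print(provider.cancel([first]))           # [True]
print(provider.status([first, second]))   # ['CANCELLED', 'PENDING']
```

This mock reports cancellation per job; note that some providers on this page instead document that a failed cancel makes the entire returned list False.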
Kubernetes
class parsl.providers.KubernetesProvider(image: str, namespace: str = 'default', nodes_per_block: int = 1, init_blocks: int = 4, min_blocks: int = 0, max_blocks: int = 10, max_cpu: float = 2, max_mem: str = '500Mi', init_cpu: float = 1, init_mem: str = '250Mi', parallelism: float = 1, worker_init: str = '', pod_name: Optional[str] = None, user_id: Optional[str] = None, group_id: Optional[str] = None, run_as_non_root: bool = False, secret: Optional[str] = None, persistent_volumes: List[Tuple[str, str]] = [])
Kubernetes execution provider.
Parameters: - namespace (str) – Kubernetes namespace to create deployments.
- image (str) – Docker image to use in the deployment.
- nodes_per_block (int) – Nodes to provision per block.
- init_blocks (int) – Number of blocks to provision at the start of the run. Default is 4.
- min_blocks (int) – Minimum number of blocks to maintain.
- max_blocks (int) – Maximum number of blocks to maintain.
- max_cpu (float) – CPU limits of the blocks (pods), in cpu units. This is the cpu “limits” option for resource specification. Check the Kubernetes docs for more details. Default is 2.
- max_mem (str) – Memory limits of the blocks (pods), in Mi or Gi. This is the memory “limits” option for resource specification on Kubernetes. Check the Kubernetes docs for more details. Default is 500Mi.
- init_cpu (float) – CPU requests of the blocks (pods), in cpu units. This is the cpu “requests” option for resource specification. Check the Kubernetes docs for more details. Default is 1.
- init_mem (str) – Memory requests of the blocks (pods), in Mi or Gi. This is the memory “requests” option for resource specification on Kubernetes. Check the Kubernetes docs for more details. Default is 250Mi.
- parallelism (float) – Ratio of provisioned task slots to active tasks. A parallelism value of 1 represents aggressive scaling where as many resources as possible are used; parallelism close to 0 represents the opposite situation in which as few resources as possible (i.e., min_blocks) are used.
- worker_init (str) – Command to be run first for the workers, such as python start.py.
- secret (str) – Docker secret to use to pull images
- pod_name (str) – The name for the pod, will be appended with a timestamp. Default is None, meaning parsl automatically names the pod.
- user_id (str) – Unix user id to run the container as.
- group_id (str) – Unix group id to run the container as.
- run_as_non_root (bool) – Run as non-root (True) or run as root (False).
- persistent_volumes (list[(str, str)]) – List of tuples describing persistent volumes to be mounted in the pod. The tuples consist of (PVC Name, Mount Directory).
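The init_* and max_* arguments above correspond to the Kubernetes "requests" and "limits" of a container. As a hedged sketch of that mapping (the helpers `to_mebibytes` and `pod_resources` are hypothetical, and only Mi/Gi suffixes are handled), the defaults would produce the following resources stanza:

```python
UNITS_MI = {"Mi": 1, "Gi": 1024}


def to_mebibytes(quantity: str) -> float:
    """Convert a quantity such as '250Mi' or '1Gi' to MiB (only Mi/Gi handled)."""
    number, unit = quantity[:-2], quantity[-2:]
    return float(number) * UNITS_MI[unit]


def pod_resources(init_cpu: float = 1, init_mem: str = "250Mi",
                  max_cpu: float = 2, max_mem: str = "500Mi") -> dict:
    """Hypothetical helper: map init_* onto Kubernetes 'requests' and max_*
    onto 'limits' for a container's resources stanza."""
    if init_cpu > max_cpu or to_mebibytes(init_mem) > to_mebibytes(max_mem):
        # Kubernetes rejects a container whose request exceeds its limit
        raise ValueError("requests must not exceed limits")
    return {
        "requests": {"cpu": str(init_cpu), "memory": init_mem},
        "limits": {"cpu": str(max_cpu), "memory": max_mem},
    }


print(pod_resources())
# {'requests': {'cpu': '1', 'memory': '250Mi'}, 'limits': {'cpu': '2', 'memory': '500Mi'}}
```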
cancel(job_ids)
Cancels the jobs specified by a list of job ids.
Args: job_ids : [<job_id> …]
Returns: [True/False…] : If the cancel operation fails, the entire list will be False.
status(job_ids)
Get the status of a list of jobs identified by the job identifiers returned from the submit request.
Parameters: job_ids (list) – A list of job identifiers.
Returns: A list of statuses from [‘PENDING’, ‘RUNNING’, ‘CANCELLED’, ‘COMPLETED’, ‘FAILED’, ‘TIMEOUT’] corresponding to each job_id in the job_ids list.
Raises: ExecutionProviderException or its subclasses.
submit(cmd_string, tasks_per_node, job_name='parsl')
Submit a job.
Parameters: - cmd_string (str) – Name of the container to initiate.
- tasks_per_node (int) – Command invocations to be launched per node.
- Kwargs:
- job_name (str): Name for the job; must be unique.
Returns: None if at capacity and no more resources can be provisioned; otherwise job_id (str), the identifier for the job.
Channels
For certain resources such as campus clusters or supercomputers at research laboratories, access may require authentication. For instance, some resources may allow access to their job schedulers only from their login nodes, which require you to authenticate through SSH or GSI-SSH, and sometimes even require two-factor authentication. Channels are simple abstractions that enable the ExecutionProvider component to talk to the resource managers of compute facilities. The simplest Channel, LocalChannel, executes commands locally in a shell, while the SSHChannel authenticates you to remote systems.
class parsl.channels.base.Channel
Define the interface to all channels. Channels are usually called via the execute_wait function. For channels that execute remotely, a push_file function allows you to copy over files.
Interface summary:
- execute_wait(cmd, wtime) -> (ec, stdout, stderr)
- execute_no_wait(cmd, wtime) -> (ec, stdout, stderr)
- push_file(src, dst_dir) -> dst_path
- script_dir -> dst_script_dir
abspath(path)
Return the absolute path.
Parameters: path (str) – Path for which the absolute path will be returned.
execute_no_wait(cmd, walltime, envs={}, *args, **kwargs)
Execute asynchronously without waiting for the exit code.
Parameters: - cmd – Command string to execute over the channel.
- walltime – Timeout in seconds.
- Kwargs:
- envs (dict): Environment variables to push to the remote side.
Returns: The type of the return value is channel specific.
execute_wait(cmd, walltime=None, envs={}, *args, **kwargs)
Executes the cmd, with a defined walltime.
Parameters: - cmd – Command string to execute over the channel.
- walltime – Timeout in seconds, optional.
- Kwargs:
- envs (Dict[str, str]): Environment variables to push to the remote side.
Returns: (exit_code, stdout, stderr) as (int, optional string, optional string). If the exit code is a failure code, the stdout and stderr return values may be None.
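To make the execute_wait contract concrete, here is a minimal local sketch using subprocess. It is a hypothetical function, not a Channel implementation: it assumes a POSIX shell, merges envs into the current environment, and maps a timeout to (-1, None, None), which is one possible reading of "stdout and stderr may be None" on failure.

```python
import os
import subprocess
from typing import Dict, Optional, Tuple


def execute_wait(cmd: str, walltime: Optional[float] = None,
                 envs: Optional[Dict[str, str]] = None
                 ) -> Tuple[int, Optional[str], Optional[str]]:
    """Hypothetical local sketch of the execute_wait contract: run cmd in a
    shell, wait up to walltime seconds, return (exit_code, stdout, stderr)."""
    env = dict(os.environ)
    env.update(envs or {})  # extra variables for the command's environment
    try:
        proc = subprocess.run(cmd, shell=True, env=env, capture_output=True,
                              text=True, timeout=walltime)
    except subprocess.TimeoutExpired:
        # Timed out: report failure and no output
        return -1, None, None
    return proc.returncode, proc.stdout, proc.stderr


code, out, err = execute_wait("echo $GREETING", walltime=10, envs={"GREETING": "hello"})
print(code, out.strip())  # 0 hello
```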
isdir(path)
Return True if the path refers to an existing directory.
Parameters: path (str) – Path of directory to check.
makedirs(path, mode=511, exist_ok=False)
Create a directory.
If intermediate directories do not exist, they will be created.
Parameters: path (str), mode (int; 511 is the decimal form of the octal permission 0o777), exist_ok (bool).
LocalChannel
class parsl.channels.LocalChannel(userhome='.', envs={}, script_dir=None, **kwargs)
This is not really a channel: opening a local shell is lightweight and done infrequently enough that a persistent channel is not needed.
__init__(userhome='.', envs={}, script_dir=None, **kwargs)
Initialize the local channel. script_dir is required but is set to a default.
- KwArgs:
- userhome (string): (default=’.’) This is provided as a way to override and set a specific userhome.
- envs (dict): A dictionary of env variables to be set when launching the shell.
- script_dir (string): Directory to place scripts.
abspath(path)
Return the absolute path.
Parameters: path (str) – Path for which the absolute path will be returned.
close()
There’s nothing to close here, so this does nothing.
Returns: False, because it did not really “close” this channel.
execute_no_wait(cmd, walltime, envs={})
Asynchronously execute a commandline string on the shell, without waiting for it to finish.
Parameters: - cmd – Commandline string to execute.
- walltime – Walltime in seconds; this is not currently used.
Returns a tuple containing:
- pid: process id
- proc: a subprocess.Popen object
Raises: None.
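The (pid, proc) return value described above can be sketched with subprocess.Popen. This is a hypothetical stand-in for illustration; it assumes a POSIX shell and leaves the waiting to the caller.

```python
import os
import subprocess
from typing import Dict, Optional, Tuple


def execute_no_wait(cmd: str, envs: Optional[Dict[str, str]] = None
                    ) -> Tuple[int, subprocess.Popen]:
    """Hypothetical sketch: start cmd in a shell and return at once with
    (pid, Popen object), the tuple described above."""
    env = dict(os.environ)
    env.update(envs or {})
    proc = subprocess.Popen(cmd, shell=True, env=env,
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    return proc.pid, proc


pid, proc = execute_no_wait("echo started")
# The caller decides when (or whether) to collect the exit code and output.
stdout, stderr = proc.communicate(timeout=10)
print(proc.returncode, stdout.decode().strip())  # 0 started
```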
execute_wait(cmd, walltime=None, envs={})
Synchronously execute a commandline string on the shell.
Parameters: - cmd – Commandline string to execute.
- walltime – Walltime in seconds; this is not currently used.
- Kwargs:
- envs (dict): Dictionary of env variables. This will be used to override the envs set at channel initialization.
Returns: A tuple (retcode, stdout, stderr): the return code from the execution (-1 on failure), the stdout string, and the stderr string.
Raises: None.
isdir(path)
Return True if the path refers to an existing directory.
Parameters: path (str) – Path of directory to check.
makedirs(path, mode=511, exist_ok=False)
Create a directory.
If intermediate directories do not exist, they will be created.
Parameters: path (str), mode (int; 511 is the decimal form of the octal permission 0o777), exist_ok (bool).
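The makedirs signature mirrors Python's os.makedirs. Assuming the local channel behaves like that standard function (an assumption; this page does not show its body), the parameters work as follows. Note that the effective permissions are also filtered by the process umask.

```python
import os
import tempfile

# mode=511 in the signature is just the decimal spelling of octal 0o777.
print(511 == 0o777)  # True

with tempfile.TemporaryDirectory() as root:
    target = os.path.join(root, "a", "b", "c")  # "a" and "a/b" do not exist yet
    os.makedirs(target, mode=511, exist_ok=False)
    created = os.path.isdir(target)
    os.makedirs(target, mode=511, exist_ok=True)  # already exists: no error
    try:
        os.makedirs(target, mode=511, exist_ok=False)
        second_call_failed = False
    except FileExistsError:
        second_call_failed = True

print(created, second_call_failed)  # True True
```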
push_file(source, dest_dir)
If the source file’s directory is the same as dest_dir, a copy is not necessary and nothing is done; otherwise, a copy is made.
Parameters: - source – Path to the source file.
- dest_dir – Path to the directory to which the file is to be copied.
Returns: Absolute path of the destination file.
Return type: destination_path (str)
Raises: FileCopyException – If the file copy failed.
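The copy-unless-already-there behavior described for push_file can be sketched locally with shutil. This `push_file` is a hypothetical function written for illustration; the documented method raises FileCopyException on failure, whereas this sketch simply lets any OSError propagate.

```python
import os
import shutil
import tempfile


def push_file(source: str, dest_dir: str) -> str:
    """Hypothetical sketch of the local push_file behaviour described above."""
    destination = os.path.abspath(os.path.join(dest_dir, os.path.basename(source)))
    if os.path.dirname(os.path.abspath(source)) == os.path.abspath(dest_dir):
        return destination  # already in place: nothing to copy
    # The documented method raises FileCopyException; here OSError propagates.
    shutil.copyfile(source, destination)
    return destination


with tempfile.TemporaryDirectory() as src_dir, tempfile.TemporaryDirectory() as dst_dir:
    src = os.path.join(src_dir, "job.sh")
    with open(src, "w") as fh:
        fh.write("echo hi\n")
    copied = push_file(src, dst_dir)       # different directory: copied
    in_place = push_file(src, src_dir)     # same directory: no copy made
    results = (os.path.isfile(copied),
               copied == os.path.abspath(os.path.join(dst_dir, "job.sh")),
               in_place == os.path.abspath(src))

print(results)  # (True, True, True)
```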
SSHChannel
class parsl.channels.SSHChannel(hostname, username=None, password=None, script_dir=None, envs=None, gssapi_auth=False, skip_auth=False, port=22, **kwargs)
SSH persistent channel. This enables remote execution on sites accessible via SSH. It is assumed that the user has set up host keys so as to SSH to the remote host; that is, the following test on the command line should work:
$ ssh <username>@<hostname>
__init__(hostname, username=None, password=None, script_dir=None, envs=None, gssapi_auth=False, skip_auth=False, port=22, **kwargs)
Initialize a persistent connection to the remote system. We should know at this point whether SSH connectivity is possible.
Parameters: hostname – Hostname
- KWargs:
- username (string): Username on the remote system
- password (string): Password for the remote system
- port: The port designated for the SSH connection. Default is 22.
- script_dir (string): Full path to a script dir where generated scripts could be sent to.
- envs (dict): A dictionary of environment variables to be set when executing commands
abspath(path)
Return the absolute path on the remote side.
Parameters: path (str) – Path for which the absolute path will be returned.
execute_no_wait(cmd, walltime=2, envs={})
Execute asynchronously without waiting for the exit code.
Parameters: - cmd – Commandline string to be executed on the remote side.
- walltime – Timeout passed to exec_command.
- KWargs:
- envs (dict): A dictionary of env variables.
Returns: None, stdout (readable stream), stderr (readable stream)
Raises: ChannelExecFailed (reason)
execute_wait(cmd, walltime=2, envs={})
Synchronously execute a commandline string on the shell.
Parameters: - cmd – Commandline string to execute.
- walltime – Walltime in seconds.
- Kwargs:
- envs (dict): Dictionary of env variables.
Returns: A tuple (retcode, stdout, stderr): the return code from the execution (-1 on failure), the stdout string, and the stderr string.
Raises: None.
isdir(path)
Return True if the path refers to an existing directory.
Parameters: path (str) – Path of directory on the remote side to check.
makedirs(path, mode=511, exist_ok=False)
Create a directory on the remote side.
If intermediate directories do not exist, they will be created.
Parameters: path (str), mode (int; 511 is the decimal form of the octal permission 0o777), exist_ok (bool).
pull_file(remote_source, local_dir)
Transport a file on the remote side to a local directory.
Parameters: - remote_source – Path of the file on the remote side.
- local_dir – Local directory to copy to.
Returns: Local path to the file.
Return type: str
Raises: - FileExists – Name collision at the local directory.
- FileCopyException – FileCopy failed.
push_file(local_source, remote_dir)
Transport a local file to a directory on a remote machine.
Parameters: - local_source – Path of the local file.
- remote_dir – Remote path.
Returns: Path to the copied file on the remote machine.
Return type: str
Raises: - BadScriptPath – If the script path on the remote side is bad.
- BadPermsScriptPath – You do not have permissions to make the channel script dir.
- FileCopyException – FileCopy failed.
SSH Interactive Login Channel
class parsl.channels.SSHInteractiveLoginChannel(hostname, username=None, password=None, script_dir=None, envs=None, **kwargs)
SSH persistent channel. This enables remote execution on sites accessible via SSH. This channel supports interactive login and is appropriate when keys are not set up.
__init__(hostname, username=None, password=None, script_dir=None, envs=None, **kwargs)
Initialize a persistent connection to the remote system. We should know at this point whether SSH connectivity is possible.
Parameters: hostname – Hostname
- KWargs:
- username (string): Username on the remote system
- password (string): Password for the remote system
- script_dir (string): Full path to a script dir where generated scripts could be sent to.
- envs (dict): A dictionary of env variables to be set when executing commands
ExecutionProviders
An execution provider is an adapter to various types of execution resources. The providers abstract away the interfaces provided by various systems to request, monitor, and cancel compute resources.
Slurm
class parsl.providers.SlurmProvider(partition, channel=LocalChannel(), nodes_per_block=1, cores_per_node=None, mem_per_node=None, init_blocks=1, min_blocks=0, max_blocks=10, parallelism=1, walltime='00:10:00', scheduler_options='', worker_init='', cmd_timeout=10, exclusive=True, move_files=True, launcher=SingleNodeLauncher())
Slurm Execution Provider
This provider uses sbatch to submit, squeue for status and scancel to cancel jobs. The sbatch script to be used is created from a template file in this same module.
Parameters: - partition (str) – Slurm partition to request blocks from.
- channel (Channel) – Channel for accessing this provider. Possible channels include LocalChannel (the default), SSHChannel, or SSHInteractiveLoginChannel.
- nodes_per_block (int) – Nodes to provision per block.
- cores_per_node (int) – Specify the number of cores to provision per node. If set to None, executors will assume all cores on the node are available for computation. Default is None.
- mem_per_node (float) – Specify the real memory to provision per node in GB. If set to None, no explicit request to the scheduler will be made. Default is None.
- min_blocks (int) – Minimum number of blocks to maintain.
- max_blocks (int) – Maximum number of blocks to maintain.
- parallelism (float) – Ratio of provisioned task slots to active tasks. A parallelism value of 1 represents aggressive scaling where as many resources as possible are used; parallelism close to 0 represents the opposite situation in which as few resources as possible (i.e., min_blocks) are used.
- walltime (str) – Walltime requested per block in HH:MM:SS.
- scheduler_options (str) – String to prepend to the #SBATCH blocks in the submit script to the scheduler.
- worker_init (str) – Command to be run before starting a worker, such as ‘module load Anaconda; source activate env’.
- exclusive (bool (Default = True)) – Requests nodes which are not shared with other running jobs.
- launcher (Launcher) – Launcher for this provider. Possible launchers include SingleNodeLauncher (the default), SrunLauncher, or AprunLauncher.
- move_files (Optional[bool]) – Should files be moved? By default, Parsl will try to move files.
_status()
Internal: Do not call. Returns the status list for a list of job_ids.
Returns: Status list of all jobs.
Return type: [status…]
_write_submit_script(template, script_filename, job_name, configs)
Generate a submit script and write it to a file.
Parameters: - template – The template string to be used for writing the submit script.
- script_filename – Name of the submit script.
- job_name – Job name.
- configs – Configs that get pushed into the template.
Returns: True on success.
Return type: bool
Raises: - SchedulerMissingArgs – If the template is missing args.
- ScriptPathError – Unable to write the submit script out.
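The template-plus-configs step described above can be sketched with string.Template. Everything here is illustrative: the template is a simplified one written for this example (not the one shipped with the provider), and the two exception classes are local stand-ins that only share names with the ones documented above.

```python
import os
import tempfile
from string import Template


class SchedulerMissingArgs(Exception):
    """Hypothetical stand-in for the exception named above."""


class ScriptPathError(Exception):
    """Hypothetical stand-in for the exception named above."""


# A simplified illustrative template, not the one shipped with the provider.
TEMPLATE = """#!/bin/bash
#SBATCH --job-name=${jobname}
#SBATCH --nodes=${nodes}
#SBATCH --time=${walltime}
${scheduler_options}
${worker_init}
${user_script}
"""


def write_submit_script(template: str, script_filename: str,
                        job_name: str, configs: dict) -> bool:
    try:
        body = Template(template).substitute({**configs, "jobname": job_name})
    except KeyError as missing:
        raise SchedulerMissingArgs(f"template needs {missing}") from missing
    try:
        with open(script_filename, "w") as fh:
            fh.write(body)
    except OSError as exc:
        raise ScriptPathError(f"cannot write {script_filename}") from exc
    return True


configs = {"nodes": 1, "walltime": "00:10:00",
           "scheduler_options": "#SBATCH --exclusive",
           "worker_init": "module load Anaconda; source activate env",
           "user_script": "echo hello"}
path = os.path.join(tempfile.mkdtemp(), "parsl.auto.submit")
ok = write_submit_script(TEMPLATE, path, "parsl.auto", configs)

try:
    write_submit_script(TEMPLATE, path, "parsl.auto", {"nodes": 1})
    missing_error = None
except SchedulerMissingArgs as err:
    missing_error = str(err)
print(ok, missing_error)  # True template needs 'walltime'
```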
cancel(job_ids)
Cancels the jobs specified by a list of job ids.
Args: job_ids : [<job_id> …]
Returns: [True/False…] : If the cancel operation fails, the entire list will be False.
current_capacity
Returns the currently provisioned blocks. This may need to return more information in the future: { minsize, maxsize, current_requested }
status(job_ids)
Get the status of a list of jobs identified by the job identifiers returned from the submit request.
Parameters: job_ids – A list of job identifiers.
Returns: A list of statuses from [‘PENDING’, ‘RUNNING’, ‘CANCELLED’, ‘COMPLETED’, ‘FAILED’, ‘TIMEOUT’] corresponding to each job_id in the job_ids list.
Raises: ExecutionProviderException or its subclasses.
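Since this provider queries jobs with squeue, its status list has to be derived from Slurm's compact state codes. The mapping below is an assumption for illustration (it is not Parsl's own table), and `parse_squeue` is a hypothetical helper that reads output in the `%i %t` (job id, compact state) format.

```python
# Illustrative mapping from squeue compact state codes to the status strings
# listed above; an assumption for illustration, not Parsl's own table.
SQUEUE_TO_STATUS = {
    "PD": "PENDING",
    "CF": "PENDING",    # configuring
    "R": "RUNNING",
    "CG": "RUNNING",    # completing
    "CA": "CANCELLED",
    "CD": "COMPLETED",
    "F": "FAILED",
    "NF": "FAILED",     # node failure
    "TO": "TIMEOUT",
}


def parse_squeue(output: str) -> dict:
    """Parse lines of `squeue --noheader --format="%i %t"` (job id, state code)."""
    statuses = {}
    for line in output.strip().splitlines():
        job_id, code = line.split()
        # Codes not in the table are flagged rather than guessed
        statuses[job_id] = SQUEUE_TO_STATUS.get(code, "UNKNOWN")
    return statuses


sample = """1001 R
1002 PD
1003 TO"""
print(parse_squeue(sample))  # {'1001': 'RUNNING', '1002': 'PENDING', '1003': 'TIMEOUT'}
```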
submit(command, tasks_per_node, job_name='parsl.auto')
Submit the command as a Slurm job.
Returns: If at capacity, None; otherwise, a string identifier for the job.
Cobalt
class parsl.providers.CobaltProvider(channel=LocalChannel(), nodes_per_block=1, init_blocks=0, min_blocks=0, max_blocks=10, parallelism=1, walltime='00:10:00', account=None, queue=None, scheduler_options='', worker_init='', launcher=AprunLauncher(overrides=''), cmd_timeout=10)
Cobalt Execution Provider
This provider uses Cobalt to submit (qsub), obtain the status of (qstat), and cancel (qdel) jobs. The script to be used is created from a template file in this same module.
Parameters: - channel (Channel) – Channel for accessing this provider. Possible channels include LocalChannel (the default), SSHChannel, or SSHInteractiveLoginChannel.
- nodes_per_block (int) – Nodes to provision per block.
- min_blocks (int) – Minimum number of blocks to maintain.
- max_blocks (int) – Maximum number of blocks to maintain.
- walltime (str) – Walltime requested per block in HH:MM:SS.
- account (str) – Account that the job will be charged against.
- queue (str) – Cobalt queue to request blocks from.
- scheduler_options (str) – String to prepend to the submit script to the scheduler.
- worker_init (str) – Command to be run before starting a worker, such as ‘module load Anaconda; source activate env’.
- launcher (Launcher) – Launcher for this provider. Possible launchers include AprunLauncher (the default) or SingleNodeLauncher.
_status()
Internal: Do not call. Returns the status list for a list of job_ids.
Returns: Status list of all jobs.
Return type: [status…]
-
_write_submit_script
(template, script_filename, job_name, configs)[source]¶ Generate submit script and write it to a file.
Parameters: - template (-) – The template string to be used for writing the submit script
- script_filename (-) – Name of the submit script
- job_name (-) – job name
- configs (-) – configs that get pushed into the template
Returns: True on success
Raises: - SchedulerMissingArgs – If template is missing args
- ScriptPathError – Unable to write submit script out
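The behaviour described for _write_submit_script (fill a template from configs, fail loudly when a placeholder has no value, report write failures) can be sketched with Python's string.Template. The function and exception names are hypothetical stand-ins, and the template is illustrative only, not the template that ships with this provider.

```python
import string


class MissingTemplateArgs(Exception):
    """Hypothetical stand-in for SchedulerMissingArgs."""


class ScriptWriteError(Exception):
    """Hypothetical stand-in for ScriptPathError."""


def write_submit_script(template: str, script_filename: str,
                        job_name: str, configs: dict) -> bool:
    """Fill a $-placeholder template with configs and write it to disk."""
    values = dict(configs, jobname=job_name)
    try:
        body = string.Template(template).substitute(values)
    except KeyError as exc:
        # substitute() raises KeyError naming the first placeholder without a value.
        raise MissingTemplateArgs(f"template is missing arg {exc}") from exc
    try:
        with open(script_filename, "w") as fh:
            fh.write(body)
    except OSError as exc:
        raise ScriptWriteError(f"unable to write {script_filename}") from exc
    return True


# Illustrative template; "$$" is string.Template's escape for a literal "$".
TEMPLATE = """#!/bin/bash
# job name: ${jobname}
# walltime: ${walltime}
echo "running as $$USER"
${user_script}
"""
```

Using substitute() rather than safe_substitute() is what makes a missing config value an error instead of leaving a literal ${...} in the script.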
-
cancel
(job_ids)[source] Cancels the jobs specified by a list of job ids.
Args: job_ids : [<job_id> …]
Returns : [True/False…] : If the cancel operation fails, the entire list will be False.
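The all-or-nothing return value falls out naturally when a single scheduler command cancels every job at once. A minimal sketch, assuming the cancel command accepts all job IDs as arguments; cancel_jobs is a hypothetical helper, and a real provider would run the command through its channel rather than calling subprocess directly.

```python
import subprocess
from typing import List


def cancel_jobs(job_ids: List[str], cancel_cmd: str = "qdel") -> List[bool]:
    """Cancel all job_ids with one command; every entry shares the outcome."""
    if not job_ids:
        return []
    try:
        result = subprocess.run([cancel_cmd, *job_ids],
                                capture_output=True, text=True)
        succeeded = result.returncode == 0
    except FileNotFoundError:
        # The cancel command is not installed on this machine.
        succeeded = False
    return [succeeded] * len(job_ids)
```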
-
current_capacity
[source]¶ Returns the currently provisioned blocks. This may need to return more information in the future: { minsize, maxsize, current_requested }
-
status
(job_ids)[source]¶ Get the status of a list of jobs identified by the job identifiers returned from the submit request.
Parameters: job_ids (-) – A list of job identifiers
Returns: - A list of statuses, each one of [‘PENDING’, ‘RUNNING’, ‘CANCELLED’, ‘COMPLETED’, ‘FAILED’, ‘TIMEOUT’], corresponding to each job_id in the job_ids list.
Raises: - ExecutionProviderException or its subclasses
-
submit
(command, tasks_per_node, job_name='parsl.auto')[source] Submits the command onto a Local Resource Manager job of parallel elements. Submit returns an ID that corresponds to the task that was just submitted.
- If tasks_per_node < 1: this is illegal; tasks_per_node should be an integer.
- If tasks_per_node == 1:
- A single node is provisioned
- If tasks_per_node > 1 :
- tasks_per_node number of nodes are provisioned.
Parameters: - command (-) – (String) Commandline invocation to be made on the remote side.
- tasks_per_node (-) – command invocations to be launched per node
- Kwargs:
- job_name (String): Name for job, must be unique
Returns: None if at capacity and cannot provision more; otherwise job_id (string), the identifier for the job.
Condor¶
-
class
parsl.providers.
CondorProvider
(channel: parsl.channels.base.Channel = LocalChannel( envs={}, script_dir=None, userhome='/home/docs/checkouts/readthedocs.org/user_builds/parsl/checkouts/0.9.0/docs' ), nodes_per_block: int = 1, cores_per_slot: Optional[int] = None, mem_per_slot: Optional[float] = None, init_blocks: int = 1, min_blocks: int = 0, max_blocks: int = 10, parallelism: float = 1, environment: Optional[Dict[str, str]] = None, project: str = '', scheduler_options: str = '', transfer_input_files: List[str] = [], walltime: str = '00:10:00', worker_init: str = '', launcher: parsl.launchers.launchers.Launcher = SingleNodeLauncher(), requirements: str = '', cmd_timeout: int = 60)[source] HTCondor Execution Provider.
Parameters: - channel (Channel) – Channel for accessing this provider. Possible channels include LocalChannel (the default), SSHChannel, or SSHInteractiveLoginChannel.
- nodes_per_block (int) – Nodes to provision per block.
- cores_per_slot (int) – Specify the number of cores to provision per slot. If set to None, executors will assume all cores on the node are available for computation. Default is None.
- mem_per_slot (float) – Specify the real memory to provision per slot in GB. If set to None, no explicit request to the scheduler will be made. Default is None.
- init_blocks (int) – Number of blocks to provision at time of initialization
- min_blocks (int) – Minimum number of blocks to maintain
- max_blocks (int) – Maximum number of blocks to maintain.
- parallelism (float) – Ratio of provisioned task slots to active tasks. A parallelism value of 1 represents aggressive scaling where as many resources as possible are used; parallelism close to 0 represents the opposite situation in which as few resources as possible (i.e., min_blocks) are used.
- environment (dict of str) – A dictionary of environment variable name and value pairs which will be set before running a task.
- project (str) – Project which the job will be charged against
- scheduler_options (str) – String to add specific condor attributes to the HTCondor submit script.
- transfer_input_files (list(str)) – List of strings of paths to additional files or directories to transfer to the job
- worker_init (str) – Command to be run before starting a worker.
- requirements (str) – Condor requirements.
- launcher (Launcher) – Launcher for this provider. Possible launchers include SingleNodeLauncher (the default).
- cmd_timeout (int) – Timeout for commands made to the scheduler in seconds.
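One way to read the parallelism ratio is as a scaling factor from active tasks to task slots, which is then rounded up to whole blocks and clamped to [min_blocks, max_blocks]. The sketch below is a simplified illustration of that idea, not Parsl's scaling strategy; slots_per_block is a hypothetical input (for example, nodes per block times workers per node).

```python
import math


def blocks_needed(active_tasks: int, parallelism: float, slots_per_block: int,
                  min_blocks: int, max_blocks: int) -> int:
    """Estimate how many blocks to keep provisioned (simplified illustration)."""
    # Assumption: parallelism lies in [0, 1], matching the description above.
    if not 0.0 <= parallelism <= 1.0:
        raise ValueError("parallelism must be between 0 and 1")
    if slots_per_block < 1:
        raise ValueError("slots_per_block must be at least 1")
    # parallelism=1 asks for one task slot per active task; 0 asks for none.
    wanted_slots = math.ceil(active_tasks * parallelism)
    wanted_blocks = math.ceil(wanted_slots / slots_per_block)
    # Never drop below min_blocks or exceed max_blocks.
    return max(min_blocks, min(max_blocks, wanted_blocks))


print(blocks_needed(20, 1.0, 8, 0, 10))   # 3
print(blocks_needed(20, 0.5, 8, 0, 10))   # 2
print(blocks_needed(100, 1.0, 8, 0, 10))  # 10 (capped by max_blocks)
print(blocks_needed(20, 0.0, 8, 1, 10))   # 1 (held at min_blocks)
```

Because this sketch rounds up at both steps, any nonzero parallelism with at least one active task yields at least one block.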
-
__init__
(channel: parsl.channels.base.Channel = LocalChannel( envs={}, script_dir=None, userhome='/home/docs/checkouts/readthedocs.org/user_builds/parsl/checkouts/0.9.0/docs' ), nodes_per_block: int = 1, cores_per_slot: Optional[int] = None, mem_per_slot: Optional[float] = None, init_blocks: int = 1, min_blocks: int = 0, max_blocks: int = 10, parallelism: float = 1, environment: Optional[Dict[str, str]] = None, project: str = '', scheduler_options: str = '', transfer_input_files: List[str] = [], walltime: str = '00:10:00', worker_init: str = '', launcher: parsl.launchers.launchers.Launcher = SingleNodeLauncher(), requirements: str = '', cmd_timeout: int = 60) → None[source] Initialize self. See help(type(self)) for accurate signature.
-
_write_submit_script
(template, script_filename, job_name, configs)[source]¶ Generate submit script and write it to a file.
Parameters: - template (-) – The template string to be used for writing the submit script
- script_filename (-) – Name of the submit script
- job_name (-) – job name
- configs (-) – configs that get pushed into the template
Returns: True on success
Raises: - SchedulerMissingArgs – If template is missing args
- ScriptPathError – Unable to write submit script out
-
cancel
(job_ids)[source] Cancels the jobs specified by a list of job IDs.
Parameters: job_ids (list of str) – The job IDs to cancel. Returns: Each entry in the list will be True if the job is cancelled successfully, otherwise False. Return type: list of bool
-
current_capacity
[source] Returns the currently provisioned blocks. This may need to return more information in the future: { minsize, maxsize, current_requested }
-
status
(job_ids)[source] Get the status of a list of jobs identified by their ids.
Parameters: job_ids (list of int) – Identifiers of jobs for which the status will be returned. Returns: Status codes for the requested jobs. Return type: List of int
-
submit
(command, tasks_per_node, job_name='parsl.auto')[source] Submits the command onto a Local Resource Manager job.
- Example submit file with the complex case of multiple submits per job:
- Universe = vanilla
  output = out.$(Cluster).$(Process)
  error = err.$(Cluster).$(Process)
  log = log.$(Cluster)
  leave_in_queue = true
  executable = test.sh
  queue 5
  executable = foo
  queue 1
$ condor_submit test.sub
Submitting job(s)……
5 job(s) submitted to cluster 118907.
1 job(s) submitted to cluster 118908.
Returns: None if at capacity and cannot provision more; otherwise the identifier for the job.
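The condor_submit output shown for submit reports one cluster per queue statement. A minimal sketch of recovering the cluster IDs from that text with a regular expression; the pattern is written against the sample output only and is an assumption, not necessarily how this provider parses it.

```python
import re
from typing import List, Tuple

# Matches lines such as "5 job(s) submitted to cluster 118907."
SUBMITTED = re.compile(r"(\d+) job\(s\) submitted to cluster (\d+)\.")


def parse_condor_submit(output: str) -> List[Tuple[int, str]]:
    """Return (job_count, cluster_id) pairs found in condor_submit output."""
    return [(int(count), cluster) for count, cluster in SUBMITTED.findall(output)]


sample = """Submitting job(s)......
5 job(s) submitted to cluster 118907.
1 job(s) submitted to cluster 118908.
"""
print(parse_condor_submit(sample))  # [(5, '118907'), (1, '118908')]
```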
Torque¶
-
class
parsl.providers.
TorqueProvider
(channel=LocalChannel( envs={}, script_dir=None, userhome='/home/docs/checkouts/readthedocs.org/user_builds/parsl/checkouts/0.9.0/docs' ), account=None, queue=None, scheduler_options='', worker_init='', nodes_per_block=1, init_blocks=1, min_blocks=0, max_blocks=100, parallelism=1, launcher=AprunLauncher(overrides=''), walltime='00:20:00', cmd_timeout=120)[source] Torque Execution Provider
This provider uses qsub to submit, qstat for status, and qdel to cancel jobs. The submit script to be used is created from a template file in this same module.
Parameters: - channel (Channel) – Channel for accessing this provider. Possible channels include LocalChannel (the default), SSHChannel, or SSHInteractiveLoginChannel.
- account (str) – Account the job will be charged against.
- queue (str) – Torque queue to request blocks from.
- nodes_per_block (int) – Nodes to provision per block.
- init_blocks (int) – Number of blocks to provision at the start of the run. Default is 1.
- min_blocks (int) – Minimum number of blocks to maintain. Default is 0.
- max_blocks (int) – Maximum number of blocks to maintain.
- parallelism (float) – Ratio of provisioned task slots to active tasks. A parallelism value of 1 represents aggressive scaling where as many resources as possible are used; parallelism close to 0 represents the opposite situation in which as few resources as possible (i.e., min_blocks) are used.
- walltime (str) – Walltime requested per block in HH:MM:SS.
- scheduler_options (str) – String to prepend to the #PBS blocks in the submit script to the scheduler.
- worker_init (str) – Command to be run before starting a worker, such as ‘module load Anaconda; source activate env’.
- launcher (Launcher) – Launcher for this provider. Possible launchers include AprunLauncher (the default) or SingleNodeLauncher.
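To make the roles of these parameters concrete, the sketch below assembles a minimal Torque submit script by hand. The layout is an assumption for illustration and is not the template this provider uses; #PBS -N, -q, -A, -l walltime= and -l nodes= are standard Torque directives, and scheduler_options is placed ahead of the generated #PBS lines as the parameter description says. torque_script is a hypothetical helper.

```python
from typing import Optional


def torque_script(command: str, job_name: str, walltime: str = "00:20:00",
                  nodes_per_block: int = 1, queue: Optional[str] = None,
                  account: Optional[str] = None, scheduler_options: str = "",
                  worker_init: str = "") -> str:
    """Build an illustrative Torque submit script (hypothetical helper)."""
    lines = ["#!/bin/bash"]
    if scheduler_options:
        # Extra user-supplied directives go before the generated #PBS lines.
        lines.append(scheduler_options.rstrip("\n"))
    lines.append(f"#PBS -N {job_name}")
    lines.append(f"#PBS -l walltime={walltime}")
    lines.append(f"#PBS -l nodes={nodes_per_block}")
    if queue:
        lines.append(f"#PBS -q {queue}")
    if account:
        lines.append(f"#PBS -A {account}")
    if worker_init:
        # Environment setup runs before the worker command.
        lines.append(worker_init)
    lines.append(command)
    return "\n".join(lines) + "\n"


print(torque_script("hostname", "parsl.auto", queue="batch",
                    worker_init="module load Anaconda; source activate env"))
```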
-
__init__
(channel=LocalChannel( envs={}, script_dir=None, userhome='/home/docs/checkouts/readthedocs.org/user_builds/parsl/checkouts/0.9.0/docs' ), account=None, queue=None, scheduler_options='', worker_init='', nodes_per_block=1, init_blocks=1, min_blocks=0, max_blocks=100, parallelism=1, launcher=AprunLauncher(overrides=''), walltime='00:20:00', cmd_timeout=120)[source] Initialize self. See help(type(self)) for accurate signature.
-
_status
()[source]¶ Internal: Do not call. Returns the status list for a list of job_ids.
Parameters: self
Returns: Status list of all jobs
Return type: [status…]
-
_write_submit_script
(template, script_filename, job_name, configs)[source]¶ Generate submit script and write it to a file.
Parameters: - template (-) – The template string to be used for writing the submit script
- script_filename (-) – Name of the submit script
- job_name (-) – job name
- configs (-) – configs that get pushed into the template
Returns: True on success
Raises: - SchedulerMissingArgs – If template is missing args
- ScriptPathError – Unable to write submit script out
-
cancel
(job_ids)[source] Cancels the jobs specified by a list of job ids.
Args: job_ids : [<job_id> …]
Returns : [True/False…] : If the cancel operation fails, the entire list will be False.
-
current_capacity
[source]¶ Returns the currently provisioned blocks. This may need to return more information in the future: { minsize, maxsize, current_requested }
-
status
(job_ids)[source]¶ Get the status of a list of jobs identified by the job identifiers returned from the submit request.
Parameters: job_ids (-) – A list of job identifiers
Returns: - A list of statuses, each one of [‘PENDING’, ‘RUNNING’, ‘CANCELLED’, ‘COMPLETED’, ‘FAILED’, ‘TIMEOUT’], corresponding to each job_id in the job_ids list.
Raises: - ExecutionProviderException or its subclasses
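A status call like this one typically runs qstat and translates Torque's one-letter job states into the statuses listed above. A minimal sketch that parses default-format qstat output; the sample text, the state table, and the choice to report jobs no longer listed by qstat as COMPLETED are all assumptions for illustration.

```python
from typing import Dict, List

# Assumed mapping from Torque job-state letters to the documented statuses.
TORQUE_STATES: Dict[str, str] = {
    "Q": "PENDING",    # queued
    "H": "PENDING",    # held
    "W": "PENDING",    # waiting for its execution time
    "T": "PENDING",    # being moved to a new location
    "R": "RUNNING",    # running
    "E": "COMPLETED",  # exiting after having run
    "C": "COMPLETED",  # completed
}


def parse_qstat(output: str) -> Dict[str, str]:
    """Map job ID -> state letter from default qstat output."""
    states = {}
    for line in output.splitlines():
        parts = line.split()
        # Data rows have six columns: Job ID, Name, User, Time Use, S, Queue.
        # The dashed separator row also has six fields, so skip it explicitly.
        if len(parts) == 6 and not parts[0].startswith("-"):
            states[parts[0]] = parts[4]
    return states


def status(job_ids: List[str], qstat_output: str) -> List[str]:
    states = parse_qstat(qstat_output)
    # Jobs absent from qstat are assumed finished ("C"); unknown letters
    # fall back to PENDING.
    return [TORQUE_STATES.get(states.get(j, "C"), "PENDING") for j in job_ids]


SAMPLE = """\
Job ID                    Name             User            Time Use S Queue
------------------------- ---------------- --------------- -------- - -----
1234.server               parsl.auto.1     alice           00:00:01 R batch
1235.server               parsl.auto.2     alice                  0 Q batch
"""
print(status(["1234.server", "1235.server", "9999.server"], SAMPLE))
# ['RUNNING', 'PENDING', 'COMPLETED']
```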
-
submit
(command, tasks_per_node, job_name='parsl.auto')[source] Submits the command onto a Local Resource Manager job. Submit returns an ID that corresponds to the task that was just submitted.
- If tasks_per_node < 1: this is illegal; tasks_per_node should be an integer.
- If tasks_per_node == 1:
- A single node is provisioned
- If tasks_per_node > 1 :
- tasks_per_node number of nodes are provisioned.
Parameters: - command (-) – (String) Commandline invocation to be made on the remote side.
- tasks_per_node (-) – command invocations to be launched per node
- Kwargs:
- job_name (String): Name for job, must be unique
Returns: None if at capacity and cannot provision more; otherwise job_id (string), the identifier for the job.
Local¶
-
class
parsl.providers.
LocalProvider
(channel=LocalChannel( envs={}, script_dir=None, userhome='/home/docs/checkouts/readthedocs.org/user_builds/parsl/checkouts/0.9.0/docs' ), nodes_per_block=1, launcher=SingleNodeLauncher(), init_blocks=4, min_blocks=0, max_blocks=10, walltime='00:15:00', worker_init='', cmd_timeout=30, parallelism=1, move_files=None)[source] Local Execution Provider
This provider is used to provide execution resources from the localhost.
Parameters: - min_blocks (int) – Minimum number of blocks to maintain.
- max_blocks (int) – Maximum number of blocks to maintain.
- parallelism (float) – Ratio of provisioned task slots to active tasks. A parallelism value of 1 represents aggressive scaling where as many resources as possible are used; parallelism close to 0 represents the opposite situation in which as few resources as possible (i.e., min_blocks) are used.
- move_files (Optional[Bool]) – Should files be moved? By default (None), Parsl will try to figure this out itself. If True, files will always be moved. If False, they will never be moved.
- worker_init (str) – Command to be run before starting a worker, such as ‘module load Anaconda; source activate env’.
-
__init__
(channel=LocalChannel( envs={}, script_dir=None, userhome='/home/docs/checkouts/readthedocs.org/user_builds/parsl/checkouts/0.9.0/docs' ), nodes_per_block=1, launcher=SingleNodeLauncher(), init_blocks=4, min_blocks=0, max_blocks=10, walltime='00:15:00', worker_init='', cmd_timeout=30, parallelism=1, move_files=None)[source] Initialize self. See help(type(self)) for accurate signature.
-
cancel
(job_ids)[source] Cancels the jobs specified by a list of job ids.
Args: job_ids : [<job_id> …]
Returns : [True/False…] : If the cancel operation fails, the entire list will be False.
-
status
(job_ids)[source] Get the status of a list of jobs identified by their ids.
Parameters: job_ids (-) – List of identifiers for the jobs Returns: - List of status codes.
-
submit
(command, tasks_per_node, job_name='parsl.auto')[source] Submits the command onto a Local Resource Manager job. Submit returns an ID that corresponds to the task that was just submitted.
- If tasks_per_node < 1:
- 1/tasks_per_node is provisioned
- If tasks_per_node == 1:
- A single node is provisioned
- If tasks_per_node > 1 :
- tasks_per_node nodes are provisioned.
Parameters: - command (-) – (String) Commandline invocation to be made on the remote side.
- tasks_per_node (-) – command invocations to be launched per node
- Kwargs:
- job_name (String): Name for job, must be unique
Returns: None if at capacity and cannot provision more; otherwise job_id (string), the identifier for the job.
AWS¶
-
class
parsl.providers.
AWSProvider
(image_id, key_name, init_blocks=1, min_blocks=0, max_blocks=10, nodes_per_block=1, parallelism=1, worker_init='', instance_type='t2.small', region='us-east-2', spot_max_bid=0, key_file=None, profile=None, iam_instance_profile_arn='', state_file=None, walltime='01:00:00', linger=False, launcher=SingleNodeLauncher())[source] A provider for using Amazon Elastic Compute Cloud (EC2) resources.
One of three methods is required to authenticate: a key file, a profile, or environment variables. If neither key_file nor profile is set, the following environment variables must be set:
AWS_ACCESS_KEY_ID (the access key for your AWS account), AWS_SECRET_ACCESS_KEY (the secret key for your AWS account), and (optionally) AWS_SESSION_TOKEN (the session key for your AWS account).
Parameters: - image_id (str) – Identification of the Amazon Machine Image (AMI).
- worker_init (str) – String to append to the Userdata script executed in the cloudinit phase of instance initialization.
- key_file (str) – Path to json file that contains ‘AWSAccessKeyId’ and ‘AWSSecretKey’.
- nodes_per_block (int) – Nodes to provision per block. This is always 1 for EC2. Default is 1.
- profile (str) – Profile to be used from the standard aws config file ~/.aws/config.
- init_blocks (int) – Number of blocks to provision at the start of the run. Default is 1.
- min_blocks (int) – Minimum number of blocks to maintain. Default is 0.
- max_blocks (int) – Maximum number of blocks to maintain. Default is 10.
- instance_type (str) – EC2 instance type. Instance types comprise varying combinations of CPU, memory, storage, and networking capacity. For more information on possible instance types, see the Amazon EC2 instance types documentation. Default is ‘t2.small’.
- region (str) – Amazon Web Service (AWS) region to launch machines. Default is ‘us-east-2’.
- key_name (str) – Name of the AWS private key (.pem file) that is usually generated on the console to allow SSH access to the EC2 instances. This is mostly used for debugging.
- spot_max_bid (float) – Maximum bid price (if requesting spot market machines).
- iam_instance_profile_arn (str) – Launch instance with a specific role.
- state_file (str) – Path to the state file from a previous run to re-use.
- walltime (str) – Walltime requested per block in HH:MM:SS. This option is not currently honored by this provider.
- launcher (Launcher) – Launcher for this provider. Possible launchers include SingleNodeLauncher (the default), SrunLauncher, or AprunLauncher.
- linger (Bool) – When set to True, the workers will not halt. The user is responsible for shutting down the node.
-
__init__
(image_id, key_name, init_blocks=1, min_blocks=0, max_blocks=10, nodes_per_block=1, parallelism=1, worker_init='', instance_type='t2.small', region='us-east-2', spot_max_bid=0, key_file=None, profile=None, iam_instance_profile_arn='', state_file=None, walltime='01:00:00', linger=False, launcher=SingleNodeLauncher())[source] Initialize self. See help(type(self)) for accurate signature.
-
cancel
(job_ids)[source] Cancel the jobs specified by a list of job ids.
Parameters: job_ids (list of str) – List of job identifiers. Returns: Each entry in the list will contain False if the operation fails. Otherwise, the entry will be True. Return type: list of bool
-
create_session
()[source] Create a session.
First we look in self.key_file for a path to a json file with the credentials. The key file should have ‘AWSAccessKeyId’ and ‘AWSSecretKey’.
Next we look at self.profile for a profile name and try to use the Session call to automatically pick up the keys for the profile from the user default keys file ~/.aws/config.
Finally, boto3 will look for the keys in environment variables:
- AWS_ACCESS_KEY_ID: The access key for your AWS account.
- AWS_SECRET_ACCESS_KEY: The secret key for your AWS account.
- AWS_SESSION_TOKEN: The session key for your AWS account. This is only needed when you are using temporary credentials. The AWS_SECURITY_TOKEN environment variable can also be used, but is only supported for backwards compatibility purposes. AWS_SESSION_TOKEN is supported by multiple AWS SDKs besides Python.
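The lookup order in create_session can be sketched without contacting AWS. The function below reports which source was chosen and returns keyword arguments in the form accepted by boto3.session.Session (aws_access_key_id, aws_secret_access_key, profile_name). resolve_aws_credentials is a hypothetical helper, not part of Parsl, and it does not import boto3.

```python
import json
import os
from typing import Dict, Mapping, Optional, Tuple


def resolve_aws_credentials(key_file: Optional[str] = None,
                            profile: Optional[str] = None,
                            environ: Optional[Mapping[str, str]] = None
                            ) -> Tuple[str, Dict[str, str]]:
    """Return (source, session_kwargs), trying key_file, then profile, then env."""
    env = os.environ if environ is None else environ
    if key_file:
        with open(key_file) as fh:
            creds = json.load(fh)
        return "key_file", {
            "aws_access_key_id": creds["AWSAccessKeyId"],
            "aws_secret_access_key": creds["AWSSecretKey"],
        }
    if profile:
        # boto3 resolves the keys for a named profile from the user's AWS config files.
        return "profile", {"profile_name": profile}
    if "AWS_ACCESS_KEY_ID" in env and "AWS_SECRET_ACCESS_KEY" in env:
        # boto3 reads these variables (and AWS_SESSION_TOKEN) on its own.
        return "environment", {}
    raise RuntimeError("no AWS credentials: set key_file, profile, or AWS_* variables")
```

Passing environ explicitly keeps the function testable without touching the real process environment.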
-
create_vpc
()[source] Create and configure VPC
We create a VPC with CIDR 10.0.0.0/16, which provides up to 64,000 instances.
We attach a subnet for each availability zone within the region specified in the config. We give each subnet an ip range like 10.0.X.0/20, which is large enough for approx. 4000 instances.
Security groups are configured in function security_group.
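The address arithmetic behind these figures can be checked with Python's standard ipaddress module: a /16 holds 65,536 addresses and splits into sixteen /20 subnets of 4,096 addresses each, so in 10.0.X.0/20 the value X is a multiple of 16. AWS reserves five addresses in every subnet. Assigning consecutive /20 blocks to availability zones, and the zone names used, are assumptions for illustration.

```python
import ipaddress

# The VPC-wide range described above.
vpc = ipaddress.ip_network("10.0.0.0/16")
print(vpc.num_addresses)  # 65536

# Carve it into /20 subnets: 10.0.0.0/20, 10.0.16.0/20, 10.0.32.0/20, ...
subnets = list(vpc.subnets(new_prefix=20))
print(len(subnets))              # 16
print(subnets[1])                # 10.0.16.0/20
print(subnets[0].num_addresses)  # 4096

# AWS reserves 5 addresses in every subnet, leaving 4091 usable.
print(subnets[0].num_addresses - 5)  # 4091

# Hypothetical zone names; one subnet per availability zone.
zones = ["us-east-2a", "us-east-2b", "us-east-2c"]
assignment = dict(zip(zones, subnets))
print(assignment["us-east-2c"])  # 10.0.32.0/20
```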
-
current_capacity
[source] Returns the current blocksize.
-
read_state_file
(state_file)[source] Read the state file, if it exists.
If this script has been run previously, resource IDs will have been written to a state file. On starting a run, a state file will be looked for before creating new infrastructure. Information on VPCs, security groups, and subnets is saved, as well as running instances and their states.
AWS has a maximum number of VPCs per region per account, so we do not want to clutter users’ AWS accounts with security groups and VPCs that will be used only once.
-
security_group
(vpc)[source] Create and configure a new security group.
Allows all inbound ICMP, and all inbound TCP and UDP from within the VPC.
This security group is very open. It allows all incoming ping requests, and all outgoing traffic on all ports. This can be limited by changing the allowed port ranges.
Parameters: vpc (VPC instance) – VPC in which to set up security group.
-
status
(job_ids)[source] Get the status of a list of jobs identified by their ids.
Parameters: job_ids (list of str) – Identifiers for the jobs. Returns: The status codes of the requested jobs. Return type: list of int
-
submit
(command='sleep 1', tasks_per_node=1, job_name='parsl.auto')[source] Submit the command onto a freshly instantiated AWS EC2 instance.
Submit returns an ID that corresponds to the task that was just submitted.
Returns: If at capacity, None will be returned. Otherwise, the job identifier will be returned.
-
write_state_file
()[source] Save information that must persist to a file.
We do not want to create a new VPC and new identical security groups, so we save information about them in a file between runs.
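A minimal sketch of the state-file round trip described by read_state_file and write_state_file, using JSON. The keys and placeholder IDs are hypothetical, and writing to a temporary file and then renaming it is a design choice of this sketch, so an interrupted run never leaves a half-written state file behind.

```python
import json
import os
import tempfile
from typing import Any, Dict


def write_state_file(path: str, state: Dict[str, Any]) -> None:
    """Persist state as JSON, replacing any previous file atomically."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    with os.fdopen(fd, "w") as fh:
        json.dump(state, fh, indent=2)
    # os.replace is atomic when source and target are on the same filesystem.
    os.replace(tmp_path, path)


def read_state_file(path: str) -> Dict[str, Any]:
    """Return the saved state, or an empty dict on a first run."""
    if not os.path.exists(path):
        return {}
    with open(path) as fh:
        return json.load(fh)


# Hypothetical state layout with placeholder IDs.
example_state = {
    "vpc_id": "vpc-EXAMPLE",
    "sg_id": "sg-EXAMPLE",
    "instances": ["i-EXAMPLE1", "i-EXAMPLE2"],
}
```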
GridEngine¶
-
class
parsl.providers.
GridEngineProvider
(channel=LocalChannel( envs={}, script_dir=None, userhome='/home/docs/checkouts/readthedocs.org/user_builds/parsl/checkouts/0.9.0/docs' ), nodes_per_block=1, init_blocks=1, min_blocks=0, max_blocks=10, parallelism=1, walltime='00:10:00', scheduler_options='', worker_init='', launcher=SingleNodeLauncher())[source] A provider for the Grid Engine scheduler.
Parameters: - channel (Channel) – Channel for accessing this provider. Possible channels include LocalChannel (the default), SSHChannel, or SSHInteractiveLoginChannel.
- nodes_per_block (int) – Nodes to provision per block.
- min_blocks (int) – Minimum number of blocks to maintain.
- max_blocks (int) – Maximum number of blocks to maintain.
- parallelism (float) – Ratio of provisioned task slots to active tasks. A parallelism value of 1 represents aggressive scaling where as many resources as possible are used; parallelism close to 0 represents the opposite situation in which as few resources as possible (i.e., min_blocks) are used.
- walltime (str) – Walltime requested per block in HH:MM:SS.
- scheduler_options (str) – String to prepend to the #$ blocks in the submit script to the scheduler.
- worker_init (str) – Command to be run before starting a worker, such as ‘module load Anaconda; source activate env’.
- launcher (Launcher) – Launcher for this provider. Possible launchers include SingleNodeLauncher (the default).
-
__init__
(channel=LocalChannel( envs={}, script_dir=None, userhome='/home/docs/checkouts/readthedocs.org/user_builds/parsl/checkouts/0.9.0/docs' ), nodes_per_block=1, init_blocks=1, min_blocks=0, max_blocks=10, parallelism=1, walltime='00:10:00', scheduler_options='', worker_init='', launcher=SingleNodeLauncher())[source] Initialize self. See help(type(self)) for accurate signature.
-
cancel
(job_ids)[source] Cancels the resources identified by the job_ids provided by the user.
Parameters: job_ids (-) – A list of job identifiers
Returns: - A list of statuses from cancelling the jobs, each either True or False
Raises: - ExecutionProviderException or its subclasses
-
current_capacity
[source]¶ Returns the currently provisioned blocks. This may need to return more information in the future: { minsize, maxsize, current_requested }
-
status
(job_ids)[source]¶ Get the status of a list of jobs identified by the job identifiers returned from the submit request.
Parameters: job_ids (-) – A list of job identifiers
Returns: - A list of statuses, each one of [‘PENDING’, ‘RUNNING’, ‘CANCELLED’, ‘COMPLETED’, ‘FAILED’, ‘TIMEOUT’], corresponding to each job_id in the job_ids list.
Raises: - ExecutionProviderException or its subclasses
-
submit
(command, tasks_per_node, job_name='parsl.auto')[source] The submit method takes the command string to be executed upon instantiation of a resource, most often to start a pilot (such as an IPP engine or even Swift-T engines).
- Args :
- command (str) : The bash command string to be executed.
- tasks_per_node (int) : command invocations to be launched per node
- KWargs:
- job_name (str) : Human friendly name to be assigned to the job request
Returns: - A job identifier; this could be an integer, a string, etc.
Raises: - ExecutionProviderException or its subclasses
Channels¶
For certain resources such as campus clusters or supercomputers at research laboratories, access may require authentication. For instance, some resources may allow access to their job schedulers only from their login nodes, which require you to authenticate through SSH or GSI-SSH, and sometimes even require two-factor authentication. Channels are simple abstractions that enable the ExecutionProvider component to talk to the resource managers of compute facilities. The simplest Channel, LocalChannel, simply executes commands locally on a shell, while the SSHChannel authenticates you to remote systems.
-
class
parsl.channels.base.
Channel
[source] Define the interface to all channels. Channels are usually called via the execute_wait function. For channels that execute remotely, a push_file function allows you to copy over files.
                      +------------------
                      |
cmd, wtime    ------->|  execute_wait
(ec, stdout, stderr)<-|---+
                      |
cmd, wtime    ------->|  execute_no_wait
(ec, stdout, stderr)<-|---+
                      |
src, dst_dir  ------->|  push_file
   dst_path  <--------|----+
                      |
dst_script_dir <------|  script_dir
                      |
                      +-------------------
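The channel interface can be expressed as a Python abstract base class. This is a reduced, hypothetical sketch (MinimalChannel and DryRunChannel are not Parsl classes) showing which operations a concrete channel must provide, plus a toy implementation that records commands instead of running them.

```python
import abc
from typing import Dict, List, Optional, Tuple


class MinimalChannel(abc.ABC):
    """Hypothetical reduced version of the Channel interface."""

    @abc.abstractmethod
    def execute_wait(self, cmd: str, walltime: Optional[float] = None,
                     envs: Optional[Dict[str, str]] = None
                     ) -> Tuple[int, Optional[str], Optional[str]]:
        """Run cmd to completion and return (exit_code, stdout, stderr)."""

    @abc.abstractmethod
    def execute_no_wait(self, cmd: str, walltime: float,
                        envs: Optional[Dict[str, str]] = None):
        """Start cmd and return immediately; the return type is channel specific."""

    @abc.abstractmethod
    def push_file(self, source: str, dest_dir: str) -> str:
        """Copy source into dest_dir and return the destination path."""

    @property
    @abc.abstractmethod
    def script_dir(self) -> str:
        """Directory holding generated scripts such as scheduler submit scripts."""


class DryRunChannel(MinimalChannel):
    """Toy channel that records commands instead of running them."""

    def __init__(self, script_dir: str = "/tmp/parsl-scripts"):
        self._script_dir = script_dir
        self.log: List[str] = []

    def execute_wait(self, cmd, walltime=None, envs=None):
        self.log.append(cmd)
        return 0, "", ""

    def execute_no_wait(self, cmd, walltime, envs=None):
        self.log.append(cmd)
        return None

    def push_file(self, source, dest_dir):
        self.log.append(f"push {source} -> {dest_dir}")
        return f"{dest_dir.rstrip('/')}/{source.rsplit('/', 1)[-1]}"

    @property
    def script_dir(self):
        return self._script_dir
```

Because every method is abstract, forgetting to implement one in a subclass fails at construction time with a TypeError rather than later at call time.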
-
close
()[source] Closes the channel. Clean out any auth credentials.
Parameters: None
Returns: Bool
-
execute_no_wait
(cmd, walltime, envs={}, *args, **kwargs)[source] Execute asynchronously without waiting for the exit code.
Parameters: - cmd (-) – Command string to execute over the channel
- walltime (-) – Timeout in seconds
- KWargs:
- envs (dict) : Environment variables to push to the remote side
Returns: - the type of return value is channel specific
-
execute_wait
(cmd, walltime=None, envs={}, *args, **kwargs)[source] Executes the cmd, with a defined walltime.
Parameters: - cmd (-) – Command string to execute over the channel
- walltime (-) – Timeout in seconds, optional
- KWargs:
- envs (Dict[str, str]) : Environment variables to push to the remote side
Returns: - (exit_code, stdout, stderr) (int, optional string, optional string) If the exit code is a failure code, the stdout and stderr return values may be None.
-
push_file
(source, dest_dir)[source] Channel will take care of moving the file from source to the destination directory
Parameters: - source (string) – Full filepath of the file to be moved
- dest_dir (string) – Absolute path of the directory to move to
Returns: destination_path (string)
-
script_dir
[source] This is a property. Returns the directory assigned for storing all internal scripts such as scheduler submit scripts. This is usually where error logs from the scheduler would reside on the channel destination side.
Parameters: None
Returns: - Channel script dir
-
LocalChannel¶
-
class
parsl.channels.
LocalChannel
(userhome='.', envs={}, script_dir=None, **kwargs)[source] This is not really a channel: opening a local shell is lightweight and done so infrequently that it does not need a persistent channel.
-
__init__
(userhome='.', envs={}, script_dir=None, **kwargs)[source] Initialize the local channel. script_dir is required but set to a default.
- KwArgs:
- userhome (string): (default=’.’) This is provided as a way to override and set a specific userhome
- envs (dict) : A dictionary of env variables to be set when launching the shell
- script_dir (string): Directory to place scripts
-
close
()[source] There’s nothing to close here, and this really doesn’t do anything
Returns: - False, because it really did not “close” this channel.
-
execute_no_wait
(cmd, walltime, envs={})[source] Asynchronously execute a commandline string on the shell, without waiting for it to finish.
Parameters: - cmd (-) – Commandline string to execute
- walltime (-) – walltime in seconds, this is not really used now.
Returns a tuple containing:
- pid : process id
- proc : a subprocess.Popen object
Raises: None.
-
execute_wait
(cmd, walltime=None, envs={})[source] Synchronously execute a commandline string on the shell.
Parameters: - cmd (-) – Commandline string to execute
- walltime (-) – walltime in seconds, this is not really used now.
- Kwargs:
- envs (dict) : Dictionary of env variables. This will be used to override the envs set at channel initialization.
Returns: - retcode : Return code from the execution, -1 on fail
- stdout : stdout string
- stderr : stderr string
Raises: None.
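Both local execution modes map onto Python's subprocess module. A minimal sketch, assuming a POSIX shell: execute_wait blocks and returns (retcode, stdout, stderr) with -1 on failure, while execute_no_wait returns (pid, Popen object) immediately. These are hypothetical free functions, not the LocalChannel methods; unlike the documented LocalChannel, this sketch enforces walltime as a timeout, and it ignores channel-level envs.

```python
import os
import subprocess
from typing import Dict, Optional, Tuple


def _merged_env(envs: Optional[Dict[str, str]]) -> Dict[str, str]:
    # Start from the current environment and let per-call envs override it.
    env = dict(os.environ)
    env.update(envs or {})
    return env


def execute_wait(cmd: str, walltime: Optional[float] = None,
                 envs: Optional[Dict[str, str]] = None) -> Tuple[int, str, str]:
    """Run cmd in a shell and wait; return (retcode, stdout, stderr), -1 on failure."""
    try:
        proc = subprocess.run(cmd, shell=True, env=_merged_env(envs),
                              capture_output=True, text=True, timeout=walltime)
    except subprocess.TimeoutExpired:
        return -1, "", f"timed out after {walltime} s"
    except OSError as exc:
        return -1, "", str(exc)
    return proc.returncode, proc.stdout, proc.stderr


def execute_no_wait(cmd: str, envs: Optional[Dict[str, str]] = None
                    ) -> Tuple[int, subprocess.Popen]:
    """Start cmd in a shell without waiting; return (pid, Popen object)."""
    proc = subprocess.Popen(cmd, shell=True, env=_merged_env(envs),
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                            text=True)
    return proc.pid, proc
```

The caller of execute_no_wait owns the returned Popen object and should eventually call communicate() or wait() on it so the child process is reaped.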
-
push_file
(source, dest_dir)[source] If the source file's directory is the same as dest_dir, a copy is not necessary, and nothing is done. Otherwise, a copy is made.
Parameters: - source (-) – Path to the source file
- dest_dir (-) – Path to the directory to which the file is to be copied
Returns: Absolute path of the destination file
Return type: - destination_path (String)
Raises: - FileCopyException – If file copy failed.
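The same-directory shortcut in push_file can be sketched with os.path and shutil. push_file here is a hypothetical free function, and FileCopyError stands in for FileCopyException; as in the description above, the destination directory is assumed to exist already.

```python
import os
import shutil


class FileCopyError(Exception):
    """Hypothetical stand-in for FileCopyException."""


def push_file(source: str, dest_dir: str) -> str:
    """Copy source into dest_dir unless it is already there; return the absolute destination path."""
    source = os.path.abspath(source)
    dest_dir = os.path.abspath(dest_dir)
    dest_path = os.path.join(dest_dir, os.path.basename(source))
    if os.path.dirname(source) == dest_dir:
        # Same directory: no copy is necessary.
        return dest_path
    try:
        shutil.copy2(source, dest_path)
    except OSError as exc:
        raise FileCopyError(f"copying {source} to {dest_dir} failed") from exc
    return dest_path
```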
-
script_dir
[source] This is a property. Returns the directory assigned for storing all internal scripts such as scheduler submit scripts. This is usually where error logs from the scheduler would reside on the channel destination side.
Parameters: None
Returns: - Channel script dir
-
SSHChannel¶
-
class
parsl.channels.
SSHChannel
(hostname, username=None, password=None, script_dir=None, envs=None, gssapi_auth=False, skip_auth=False, port=22, **kwargs)[source] SSH persistent channel. This enables remote execution on sites accessible via ssh. It is assumed that the user has set up host keys so as to ssh to the remote host; that is, the following test on the command line should work:
$ ssh <username>@<hostname>
-
__init__
(hostname, username=None, password=None, script_dir=None, envs=None, gssapi_auth=False, skip_auth=False, port=22, **kwargs)[source] Initialize a persistent connection to the remote system. We should know at this point whether ssh connectivity is possible
Parameters: hostname (-) – Hostname - KWargs:
- username (string) : Username on remote system
- password (string) : Password for remote system
- port : The port designated for the ssh connection. Default is 22.
- script_dir (string) : Full path to a script dir where generated scripts could be sent to.
- envs (dict) : A dictionary of environment variables to be set when executing commands
Raises:
-
close
()[source] Closes the channel. Clean out any auth credentials.
Parameters: None
Returns: Bool
-
execute_no_wait
(cmd, walltime=2, envs={})[source] Execute asynchronously without waiting for the exit code.
Parameters: - cmd (-) – Commandline string to be executed on the remote side
- walltime (-) – timeout to exec_command
- KWargs:
- envs (dict): A dictionary of env variables
Returns: - None, stdout (readable stream), stderr (readable stream)
Raises: - ChannelExecFailed (reason)
-
execute_wait
(cmd, walltime=2, envs={})[source] Synchronously execute a commandline string on the shell.
Parameters: - cmd (-) – Commandline string to execute
- walltime (-) – walltime in seconds
- Kwargs:
- envs (dict) : Dictionary of env variables
Returns: - retcode : Return code from the execution, -1 on fail
- stdout : stdout string
- stderr : stderr string
Raises: None.
-
pull_file
(remote_source, local_dir)[source] Transport file on the remote side to a local directory
Parameters: - remote_source (-) – remote_source
- local_dir (-) – Local directory to copy to
Returns: Local path to file
Return type: - str
Raises: - FileExists – Name collision at local directory.
- FileCopyException – FileCopy failed.
-
push_file
(local_source, remote_dir)[source] Transport a local file to a directory on a remote machine
Parameters: - local_source (-) – Path
- remote_dir (-) – Remote path
Returns: Path to copied file on remote machine
Return type: - str
Raises: - BadScriptPath – if script path on the remote side is bad
- BadPermsScriptPath – You do not have permission to create the channel script dir
- FileCopyException – FileCopy failed.
-
script_dir
[source] This is a property. Returns the directory assigned for storing all internal scripts such as scheduler submit scripts. This is usually where error logs from the scheduler would reside on the channel destination side.
Parameters: None
Returns: - Channel script dir
-
SSHILChannel¶
-
class
parsl.channels.
SSHInteractiveLoginChannel
(hostname, username=None, password=None, script_dir=None, envs=None, **kwargs) SSH persistent channel. This enables remote execution on sites accessible via ssh. This channel supports interactive login and is appropriate when keys are not set up.
-
__init__
(hostname, username=None, password=None, script_dir=None, envs=None, **kwargs) Initialize a persistent connection to the remote system. We should know at this point whether SSH connectivity is possible.
Parameters: hostname – Hostname - Kwargs:
- username (string): Username on remote system
- password (string): Password for remote system
- script_dir (string): Full path to a script dir where generated scripts could be sent to.
- envs (dict): A dictionary of env variables to be set when executing commands
Raises:
-
execute_no_wait
(cmd, walltime=2, envs={}) Execute asynchronously without waiting for the exit code.
Parameters: - cmd – Command-line string to be executed on the remote side
- walltime – Timeout passed to exec_command
- Kwargs:
- envs (dict): A dictionary of environment variables
Returns: - A tuple (None, stdout, stderr), where stdout and stderr are readable streams
Raises: - ChannelExecFailed (reason)
-
execute_wait
(cmd, walltime=2, envs={}) Synchronously execute a command-line string on the shell.
Parameters: - cmd – Command-line string to execute
- walltime – Walltime in seconds
- Kwargs:
- envs (dict): Dictionary of environment variables
Returns: A tuple (retcode, stdout, stderr): the return code from the execution (-1 on failure), the stdout string, and the stderr string
Return type: - tuple (int, str, str)
Raises: None.
-
pull_file
(remote_source, local_dir) Transport a file on the remote side to a local directory.
Parameters: - remote_source – Path to the file on the remote side
- local_dir – Local directory to copy to
Returns: Local path to file
Return type: - str
Raises: - FileExists – Name collision at local directory.
- FileCopyException – FileCopy failed.
-
push_file
(local_source, remote_dir) Transport a local file to a directory on a remote machine.
Parameters: - local_source – Path of the local file
- remote_dir – Remote directory to copy to
Returns: Path to copied file on remote machine
Return type: - str
Raises: - BadScriptPath – If the script path on the remote side is bad
- BadPermsScriptPath – You do not have permission to make the channel script dir
- FileCopyException – FileCopy failed.
-
Launchers
Launchers are wrappers for user-submitted scripts as they are submitted to a specific execution resource.
SimpleLauncher
SingleNodeLauncher
-
class
parsl.launchers.
SingleNodeLauncher
Worker launcher that wraps the user's command with the framework to launch multiple command invocations in parallel. This wrapper sets the bash environment variable CORES to the number of cores on the machine. By setting task_blocks to an integer or to a bash expression, you can control the number of command invocations to launch.
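To make the wrapping idea concrete, the sketch below builds a bash script that sets CORES from the number of online processors, launches task_blocks copies of the command in the background, and waits for all of them. The function name make_single_node_wrapper, the use of `getconf _NPROCESSORS_ONLN`, and the script layout are assumptions for illustration, not the script SingleNodeLauncher actually generates.

```python
def make_single_node_wrapper(command, task_blocks):
    """Hypothetical sketch of a single-node launcher wrapper.

    `task_blocks` may be an int or a bash arithmetic expression such as
    "$CORES/2"; bash evaluates it when the script runs.
    """
    return """
export CORES=$(getconf _NPROCESSORS_ONLN)
NUM_INVOCATIONS=$(( {task_blocks} ))
for ((i = 0; i < NUM_INVOCATIONS; i++)); do
    # Run each invocation in the background so they execute in parallel.
    ( {command} ) &
done
wait
""".format(command=command, task_blocks=task_blocks)
```

For example, `make_single_node_wrapper("python worker.py", "$CORES")` would start one invocation per core.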
AprunLauncher
SrunLauncher
SrunMPILauncher
-
class
parsl.launchers.
SrunMPILauncher
(overrides='') Launches as many workers as MPI tasks to be executed concurrently within a block.
Use this launcher instead of SrunLauncher if each block will execute multiple MPI applications at the same time. Workers should be launched with independent srun calls so as to set up the environment for MPI application launch.
Flow Control
This section deals with functionality related to controlling the flow of tasks to various executors.
FlowControl
-
class
parsl.dataflow.flow_control.
FlowControl
(dfk, *args, threshold=20, interval=5) Implements threshold-interval based flow control.
The overall goal is to trap the flow of apps from the workflow, measure it, and redirect it to the appropriate executors for processing.
This is based on the following logic:
BEGIN (INTERVAL, THRESHOLD, callback) :
    start = current_time()
    while (current_time() - start < INTERVAL) :
        count = get_events_since(start)
        if count >= THRESHOLD :
            break
    callback()
This logic ensures that callbacks are activated with a maximum delay of interval, both for systems with infrequent events and for systems that generate large bursts of events. Once a callback is triggered, it generally runs a strategy method on the available sites as well as the queue.
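The threshold-interval rule can be checked without threads by simulating it over a list of event timestamps (in seconds). The sketch below is a hypothetical, purely illustrative function, callback_times, which is not part of Parsl: each window starts at the previous callback, and the callback fires when THRESHOLD events have arrived or when INTERVAL seconds have elapsed, whichever comes first.

```python
def callback_times(event_times, threshold, interval, horizon):
    """Return the times in [0, horizon] at which the callback would fire."""
    events = sorted(event_times)
    fires = []
    start, i = 0.0, 0  # i indexes the first event not yet counted
    while True:
        deadline = start + interval
        fire_at = deadline  # default: the interval expires
        count = 0
        while i < len(events) and events[i] < deadline:
            count += 1
            i += 1
            if count >= threshold:
                fire_at = events[i - 1]  # threshold reached early
                break
        if fire_at > horizon:
            break
        fires.append(fire_at)
        start = fire_at  # the next window begins at this callback
    return fires
```

A quiet workflow still gets a callback every INTERVAL seconds, while a burst triggers a callback as soon as THRESHOLD events accumulate.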
TODO: When the debug logs are enabled this module emits duplicate messages. This issue needs more debugging. What I’ve learnt so far is that the duplicate messages are present only when the timer thread is started, so this could be from a duplicate logger being added by the thread.
FlowNoControl
-
class
parsl.dataflow.flow_control.
FlowNoControl
(dfk, *args, threshold=2, interval=2) FlowNoControl implements the same interface as FlowControl.
Null handlers are used to mimic the FlowControl class.
Timer
-
class
parsl.dataflow.flow_control.
Timer
(callback, *args, interval=5, name=None) This timer is a simplified version of the FlowControl timer. It does not employ notify events.
This is based on the following logic:
BEGIN (INTERVAL, callback) :
    start = current_time()
    while (current_time() - start < INTERVAL) :
        wait()
        break
    callback()
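A minimal thread-based sketch of this timer follows, assuming a threading.Event stands in for the wait() step. RepeatingTimer and its close() method are hypothetical names chosen for illustration. The callback runs every interval seconds with the extra positional arguments until close() is called.

```python
import threading

class RepeatingTimer:
    """Hypothetical sketch: call callback(*args) every `interval` seconds."""

    def __init__(self, callback, *args, interval=5, name=None):
        self.callback = callback
        self.cb_args = args
        self.interval = interval
        self._stopped = threading.Event()
        # The thread starts immediately, as the constructor description says.
        self._thread = threading.Thread(target=self._run, name=name, daemon=True)
        self._thread.start()

    def _run(self):
        # Event.wait() returns False when the interval elapses and True once
        # close() sets the event, which ends the loop.
        while not self._stopped.wait(self.interval):
            self.callback(*self.cb_args)

    def close(self):
        self._stopped.set()
        self._thread.join()
```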
-
__init__
(callback, *args, interval=5, name=None) Initialize the timer object and start the timer thread.
Parameters: - callback – Function to invoke each time the timer expires
- *args – Arguments passed to the callback
- Kwargs:
- interval (int): Seconds after which the timer expires
- name (str): A base name to use when naming the started thread
-
Strategy
Strategies are responsible for tracking the compute requirements of a workflow as it is executed and scaling the resources to match it.
-
class
parsl.dataflow.strategy.
Strategy
(dfk) FlowControl strategy.
As a workflow DAG is processed by Parsl, new tasks are added and completed asynchronously. Parsl interfaces executors with execution providers to construct scalable executors that handle the variable workload generated by the workflow. This component is responsible for periodically checking outstanding tasks and available compute capacity, and for triggering scaling events to match workflow needs.
Here's a diagram of an executor. An executor consists of blocks, which are usually created by single requests to a Local Resource Manager (LRM) such as Slurm, Condor, Torque, or even the AWS API. Each block can contain several task blocks, which are separate worker instances.
           |<--min_blocks     |<-init_blocks             max_blocks-->|
           +----------------------------------------------------------+
           |  +--------block----------+       +--------block--------+ |
executor = |  | task          task    | ...   |    task      task   | |
           |  +-----------------------+       +---------------------+ |
           +----------------------------------------------------------+
- The relevant specification options are:
- min_blocks: Minimum number of blocks to maintain
- init_blocks: number of blocks to provision at initialization of workflow
- max_blocks: Maximum number of blocks that can be active due to one workflow
slots = current_capacity * tasks_per_node * nodes_per_block
active_tasks = pending_tasks + running_tasks
Parallelism p = slots / active_tasks, in the range [0, 1] (i.e., 0 <= p <= 1)
For example:
- When p = 0,
=> Compute with the least resources possible: infinitely many tasks are stacked per slot.
blocks = min_blocks            if active_tasks == 0
         max(min_blocks, 1)    otherwise
- When p = 1,
=> Compute with the most resources: one task is stacked per slot.
blocks = min(max_blocks, ceil(active_tasks / (tasks_per_node * nodes_per_block)))
- When p = 1/2,
- => We stack up to 2 tasks per slot before we overflow and request a new block.
For example, let's say min:init:max = 0:0:4 and task_blocks = 2, with the following configuration:
min_blocks = 0
init_blocks = 0
max_blocks = 4
tasks_per_node = 2
nodes_per_block = 1
In the diagram, X <- task
At 2 tasks:
+---Block---|
|           |
| X      X  |
|slot   slot|
+-----------+
At 5 tasks, we overflow as the capacity of a single block is fully used.
+---Block---|       +---Block---|
| X      X  | ----> |           |
| X      X  |       | X         |
|slot   slot|       |slot   slot|
+-----------+       +-----------+
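The block-count rules above can be collected into one function. The sketch below is a hypothetical helper (target_blocks is not Parsl's API, and this is not the formula Parsl's Strategy class uses internally). It assumes a block absorbs slots_per_block / p tasks before overflowing, which reproduces the p = 0, p = 1, and p = 1/2 cases described here; the min_blocks floor applied while tasks are active is an added assumption.

```python
import math

def target_blocks(active_tasks, parallelism, min_blocks, max_blocks,
                  tasks_per_node, nodes_per_block):
    """Hypothetical sketch: number of blocks to request for a parallelism p."""
    if not 0 <= parallelism <= 1:
        raise ValueError("parallelism must be in [0, 1]")
    if active_tasks == 0:
        return min_blocks
    slots_per_block = tasks_per_node * nodes_per_block
    # With p = 1/2, each slot holds up to 2 tasks, so a block holds 2x its slots.
    wanted = math.ceil(active_tasks * parallelism / slots_per_block)
    # Keep at least one block while there is work; never exceed max_blocks.
    return min(max_blocks, max(min_blocks, 1, wanted))
```

With the example configuration (tasks_per_node = 2, nodes_per_block = 1, p = 1/2), 2 tasks fit in one block and 5 tasks overflow into a second block, matching the diagrams.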
Memoization
-
class
parsl.dataflow.memoization.
Memoizer
(dfk, memoize=True, checkpoint={}) Memoizer is responsible for ensuring that identical work is not repeated.
When a task is repeated, i.e., the same function is called with the same exact arguments, the result from a previous execution is reused.
The memoizer implementation here does not collapse duplicate calls at call time, but works only when the result of a previous call is available at the time the duplicate call is made.
For instance:
 No advantage from                 Memoization helps
 memoization here:                 here:

  TaskA                            TaskB
    |   TaskA                        |
    |     |                        TaskA done  (TaskB)
    |     |                                      |  (TaskB)
  done    |                                      |
         done                                    |
                                                done
The memoizer creates a lookup table by hashing the function name and its inputs, and storing the results of the function.
When a task is ready for launch, i.e., all of its arguments have resolved, we add its hash to the task data structure.
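The lookup-table idea can be sketched in a few lines. This is a hypothetical illustration (MemoTable and its method signatures are not Parsl's API); it hashes the function name and inputs with hashlib.sha256 over pickle in place of the ipyparallel-based serialization that make_hash uses, and its check_memo returns a (present, result) tuple so that a memoized None is not mistaken for a miss.

```python
import hashlib
import pickle

class MemoTable:
    """Hypothetical memo table keyed on a hash of function name + inputs."""

    def __init__(self):
        self.memo_lookup_table = {}

    @staticmethod
    def make_hash(func_name, args, kwargs):
        # Sort kwargs so f(a=1, b=2) and f(b=2, a=1) produce the same hash.
        payload = (func_name, tuple(args), sorted(kwargs.items()))
        # pickle.dumps raises if an input cannot be serialized.
        return hashlib.sha256(pickle.dumps(payload)).hexdigest()

    def check_memo(self, hashsum):
        # Return (present, result): None alone would be ambiguous.
        if hashsum in self.memo_lookup_table:
            return True, self.memo_lookup_table[hashsum]
        return False, None

    def update_memo(self, hashsum, result):
        self.memo_lookup_table[hashsum] = result
```

pickle output is not guaranteed to be identical for all equal values (sets, for instance), so a production memoizer needs a more careful canonical serialization than this sketch.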
-
__init__
(dfk, memoize=True, checkpoint={}) Initialize the memoizer.
Parameters: dfk – The DFK object
- Kwargs:
- memoize (Bool): Enable memoization or not.
- checkpoint (Dict): A checkpoint loaded as a dict.
-
check_memo
(task_id, task) Create a hash of the task and its inputs and check the lookup table for this hash.
If present, the result is returned. The return value is a tuple indicating whether a memo exists, together with the result, because None is a possible result and would otherwise be ambiguous. This avoids relying on a cache_miss exception.
Parameters: task – Task from the dfk.tasks table
Returns: - present (Bool): Whether the hash is present in the memo_lookup_table
- Result (Py Obj): Result of the function if present in the table
Return type: tuple of (present, Result). This call also sets task['hashsum'] to the unique hashsum for the func+inputs.
-
hash_lookup
(hashsum) Look up a hash in the memoization table.
Parameters: hashsum – The same hashes used to uniquely identify apps+inputs
Returns: - Lookup result
Raises: - KeyError – If the hash is not in the table
-
make_hash
(task) Create a hash of the task inputs.
This uses a serialization library borrowed from ipyparallel. If serialization fails here, then all ipp calls are also likely to fail, since they rely on the same serialization.
Parameters: task – Task dictionary from dfk.tasks
Returns: A unique hash string
Return type: - hash (str)
-