Parsl tutorial

Parsl is a native Python library that allows you to write functions that execute in parallel and tie them together with dependencies to create workflows. Parsl wraps Python functions as "Apps" using decorators such as @python_app and @bash_app. Decorated functions can run in parallel once all their inputs are ready.

For more comprehensive documentation and examples, please refer to the Parsl documentation.

In [ ]:
import parsl
from parsl.app.app import python_app, bash_app
from parsl.configs.local_threads import config

# parsl.set_stream_logger() # <-- log everything to stdout

print(parsl.__version__)

Configuring Parsl

Parsl separates code and execution. To do so, it relies on a configuration model to describe the pool of resources to be used for execution (e.g., clusters, clouds, threads).

We’ll come back to configuration later in this tutorial. For now, we configure this example to use a local pool of threads to facilitate local parallel execution.

In [ ]:
parsl.load(config)

Apps

In Parsl an app is a piece of code that can be asynchronously executed on an execution resource (e.g., cloud, cluster, or local PC). Parsl provides support for pure Python apps (python_app) and also command-line apps executed via Bash (bash_app).

Python Apps

As a first example let’s define a simple Python function that returns the string ‘Hello World!’. This function is made into a Parsl App using the @python_app decorator.

In [ ]:
@python_app
def hello():
    return 'Hello World!'

print(hello().result())

As can be seen above, Apps wrap standard Python function calls. As such, they can be passed arbitrary arguments and return standard Python objects.

In [ ]:
@python_app
def multiply(a, b):
    return a * b

print(multiply(5,9).result())

Bash Apps

Parsl’s Bash app allows you to wrap execution of external applications from the command-line as you would in a Bash shell. It can also be used to execute Bash scripts directly. To define a Bash app the wrapped Python function must return the command-line string to be executed.

As a first example of a Bash app, let’s use the Linux command echo to return the string ‘Hello World!’. This function is made into a Bash app using the @bash_app decorator.

Note that in this case the echo command prints ‘Hello World!’ to stdout. In order to use this output we need to tell Parsl to capture stdout. This is done by specifying the stdout keyword argument in the app function. The same approach can be used to capture stderr.

In [ ]:
@bash_app
def echo_hello(stdout='echo-hello.stdout', stderr='echo-hello.stderr'):
    return 'echo "Hello World!"'

echo_hello().result()

with open('echo-hello.stdout', 'r') as f:
    print(f.read())

Passing data

Parsl Apps can exchange data as Python objects (as shown above) or in the form of files. In order to enforce dataflow semantics, Parsl must track the data that is passed into and out of an App. To make Parsl aware of these dependencies the app function includes inputs and outputs keyword arguments.

We first create three test files named hello1.txt, hello2.txt, and hello3.txt containing the text “hello 1”, “hello 2”, and “hello 3”.

In [ ]:
!echo "hello 1" > /tmp/hello1.txt
!echo "hello 2" > /tmp/hello2.txt
!echo "hello 3" > /tmp/hello3.txt

We then write an App that will concatenate these files using cat. We pass in the list of hello files (inputs) and concatenate the text into a new file named all_hellos.txt (outputs).

In [ ]:
@bash_app
def cat(inputs=[], outputs=[]):
    return 'cat %s > %s' % (" ".join(inputs), outputs[0])

concat = cat(inputs=['/tmp/hello1.txt','/tmp/hello2.txt','/tmp/hello3.txt'],
             outputs=['all_hellos.txt'])

# Open the concatenated file
with open(concat.outputs[0].result(), 'r') as f:
    print(f.read())

Futures

When a normal Python function is invoked, the Python interpreter waits for the function to complete and then returns the result. For long-running functions it may not be desirable to wait for completion; instead, it is preferable that functions execute asynchronously. Parsl provides such asynchronous behavior by returning a future in lieu of results. A future is essentially an object that allows Parsl to track the status of an asynchronous task so that it may later be interrogated for its status, result, exceptions, and so on.

Parsl provides two types of futures: AppFutures and DataFutures. While related, these two types of futures enable subtly different workflow patterns, as we will see.
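Under the hood, Parsl futures follow the interface of Python’s standard concurrent.futures.Future, so methods such as done() and result() behave as they do in the standard library. The following sketch illustrates those semantics using only the stdlib ThreadPoolExecutor (slow_add is an illustrative function, not part of Parsl):

```python
from concurrent.futures import ThreadPoolExecutor
import time

def slow_add(a, b):
    time.sleep(0.1)
    return a + b

with ThreadPoolExecutor() as pool:
    future = pool.submit(slow_add, 2, 3)  # returns immediately with a Future
    print(future.done())    # likely False: the task is still running
    print(future.result())  # blocks until the task finishes, then prints 5
    print(future.done())    # True: the future is now resolved
```

The same done()/result() calls are used on Parsl’s AppFutures and DataFutures in the examples below.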

AppFutures

AppFutures are the basic building block upon which Parsl scripts are built. Every invocation of a Parsl app returns an AppFuture which may be used to manage execution of the app and control the workflow.

Here we show how AppFutures are used to wait for the result of a Python App.

In [ ]:
@python_app
def hello():
    import time
    time.sleep(5)
    return 'Hello World!'

app_future = hello()

# Check if the app_future is resolved
print('Done: %s' % app_future.done())

# Print the result of the app_future. Note: this
# call will block and wait for the future to resolve
print('Result: %s' % app_future.result())
print('Done: %s' % app_future.done())

DataFutures

While AppFutures represent the execution of an asynchronous app, the DataFuture represents the files it produces. Parsl’s dataflow model, in which data flows from one app to another via files, requires such a construct to enable apps to validate creation of required files and to subsequently resolve dependencies when input files are created. When invoking an app, Parsl requires that a list of output files be specified (using the outputs keyword argument). A DataFuture for each file is returned by the app when it is executed. Throughout execution of the app Parsl will monitor these files to 1) ensure they are created, and 2) pass them to any dependent apps.

In [ ]:
# App that echoes an input message to an output file
@bash_app
def slowecho(message, outputs=[]):
    return 'sleep 5; echo %s &> %s' % (message, outputs[0])

# Call echo specifying the output file
hello = slowecho('Hello World!', outputs=['hello1.txt'])

# The AppFuture's outputs attribute is a list of DataFutures
print(hello.outputs)

# Also check the AppFuture
print('Done: %s' % hello.done())

# Print the contents of the output DataFuture when complete
with open(hello.outputs[0].result(), 'r') as f:
    print(f.read())

# Now that this is complete, check the DataFutures again, and the AppFuture
print(hello.outputs)
print('Done: %s' % hello.done())

Data Management

Parsl is designed to enable implementation of dataflow patterns. These patterns enable workflows to be defined in which the data passed between apps manages the flow of execution. Dataflow programming models are popular as they can cleanly express, via implicit parallelism, the concurrency needed by many applications in a simple and intuitive way.

Files

Parsl’s File abstraction represents access to a file independently of its location; for a local file it requires only the file path. Irrespective of where the script or its apps execute, Parsl uses this abstraction to access the file. When a Parsl File is referenced in an app, Parsl maps the object to the appropriate access path.

In [ ]:
from parsl.data_provider.files import File

# App that copies the contents of 1 or more files to another file
@bash_app
def copy(inputs=[], outputs=[]):
    return 'cat %s &> %s' % (inputs[0], outputs[0])

# Create a test file
with open('cat-in.txt', 'w') as f:
    f.write('Hello World!\n')

# Create Parsl file objects
parsl_infile = File("cat-in.txt")
parsl_outfile = File("cat-out.txt")

# Call the copy app with the Parsl file
copy_future = copy(inputs=[parsl_infile], outputs=[parsl_outfile])

# Read what was redirected to the output file
with open(copy_future.outputs[0].result(), 'r') as f:
    print(f.read())

Remote Files

Parsl is also able to represent remotely accessible files. In this case, you can instantiate a file object using the remote location of the file. Parsl will implicitly stage the file to the execution environment before executing any dependent apps. Parsl will also translate the location of the file into a local file path such that any dependent apps can access the file in the same way as a local file. Parsl supports files that are accessible via Globus, FTP, and HTTP.

Here we create a File object using a publicly accessible file with random numbers. We can pass this file to the sort_numbers app in the same way we would a local file.

In [ ]:
@python_app
def sort_numbers(inputs=[]):
    with open(inputs[0].filepath, 'r') as f:
        strs = [n.strip() for n in f.readlines()]
        # Sort numerically rather than lexicographically
        strs.sort(key=float)
        return strs

unsorted_file = File('https://raw.githubusercontent.com/Parsl/parsl-tutorial/master/input/unsorted.txt')

f = sort_numbers(inputs=[unsorted_file])
print(f.result())

Composing a workflow

Now that we understand all the building blocks, we can create workflows with Parsl. Unlike other workflow systems, Parsl creates implicit workflows based on the passing of control or data between Apps. The flexibility of this model allows for the creation of a wide range of workflows from sequential through to complex nested, parallel workflows. As we will see below, a range of workflows can be created by passing AppFutures and DataFutures between Apps.

Sequential workflow

Simple sequential or procedural workflows can be created by passing an AppFuture from one task to another. The following example shows one such workflow, which first generates a random number and then writes it to a file.

In [ ]:
# App that generates a random number
@python_app
def generate(limit):
    from random import randint
    return randint(1, limit)

# App that writes a message to a file
@bash_app
def save(message, outputs=[]):
    return 'echo %s &> %s' % (message, outputs[0])

# Generate the random number
message = generate(10)
print('Random number: %s' % message.result())

# Save the random number to a file
saved = save(message, outputs=['output.txt'])

# Print the output file
with open(saved.outputs[0].result(), 'r') as f:
    print('File contents: %s' % f.read())

Parallel workflow

The most common way that Parsl Apps are executed in parallel is via looping. The following example shows how a simple loop can be used to create many random numbers in parallel.

In [ ]:
# App that generates a random number
@python_app
def generate(limit):
    from random import randint
    return randint(1,limit)

# Generate 5 random numbers
rand_nums = []
for i in range(5):
    rand_nums.append(generate(10))

# Wait for all apps to finish and collect the results
outputs = [i.result() for i in rand_nums]

# Print results
print(outputs)
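Because Parsl futures follow the standard concurrent.futures.Future interface, stdlib helpers such as as_completed can be used to consume results as tasks finish rather than in submission order. A minimal sketch of the pattern, shown here with the stdlib executor rather than Parsl:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import random
import time

def generate(limit):
    # Simulate a task of unpredictable duration
    time.sleep(random.random() / 10)
    return random.randint(1, limit)

with ThreadPoolExecutor() as pool:
    futures = [pool.submit(generate, 10) for _ in range(5)]
    # Process each result as soon as its task completes
    for fut in as_completed(futures):
        print(fut.result())
```

The same loop works on a list of Parsl AppFutures, which is useful when downstream work can start as soon as any one task finishes.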

Parallel dataflow

Parallel dataflows can be developed by passing data between Apps. In this example we create a set of files, each containing a random number; we then concatenate these files into a single file and compute the sum of all the numbers in that file. The first two apps exchange files; the final app returns the sum as a Python integer.

In [ ]:
# App that generates a random number
@bash_app
def generate(outputs=[]):
    return "echo $(( RANDOM )) &> {}".format(outputs[0])

# App that concatenates input files into a single output file
@bash_app
def concat(inputs=[], outputs=[], stdout="stdout.txt", stderr='stderr.txt'):
    return "cat {0} > {1}".format(" ".join(inputs), outputs[0])

# App that calculates the sum of values in a list of input files
@python_app
def total(inputs=[]):
    total = 0
    with open(inputs[0], 'r') as f:
        for l in f:
            total += int(l)
    return total

# Create 5 files with random numbers
output_files = []
for i in range(5):
    output_files.append(generate(outputs=['random-%s.txt' % i]))

# Concatenate the files into a single file
cc = concat(inputs=[i.outputs[0] for i in output_files], outputs=["all.txt"])

# Calculate the sum of the random numbers
total = total(inputs=[cc.outputs[0]])
print(total.result())

Examples

Monte Carlo workflow

Many scientific applications use the Monte Carlo method to compute results.

If a circle with radius r is inscribed inside a square with side length 2r, then the area of the circle is πr^2 and the area of the square is 4r^2. Thus, if uniformly distributed random points are dropped within the square, approximately π/4 of them will fall inside the circle.
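Before parallelizing, this geometry can be sanity-checked with a plain serial Python sketch that drops points in the unit square and counts the fraction landing inside the quarter circle (estimate_pi is an illustrative name, not from the tutorial):

```python
import random

def estimate_pi(total, seed=0):
    # Drop `total` uniform points in the unit square and count
    # those inside the quarter circle of radius 1
    rng = random.Random(seed)
    count = 0
    for _ in range(total):
        x, y = rng.random(), rng.random()
        if x * x + y * y < 1:
            count += 1
    # The fraction inside approximates pi/4
    return 4 * count / total

print(estimate_pi(10**5))  # close to math.pi
```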

Each call to the function pi() is executed independently and in parallel. The avg_points() app is then used to compute the average of the futures returned from the pi() calls.

The dependency chain looks like this:

App Calls    pi()  pi()   pi()
              \     |     /
Futures        a    b    c
                \   |   /
App Call        avg_points()
                    |
Future            avg_pi
In [ ]:
# App that estimates pi by placing points in a box
@python_app
def pi(total):
    import random

    # Set the size of the box (edge length) in which we drop random points
    edge_length = 10000
    center = edge_length / 2
    c2  = center ** 2
    count = 0

    for i in range(total):
        # Drop a random point in the box.
        x,y = random.randint(1, edge_length),random.randint(1, edge_length)
        # Count points within the circle
        if (x-center)**2 + (y-center)**2 < c2:
            count += 1

    return (count*4/total)

# App that computes the average of the values
@python_app
def avg_points(a, b, c):
    return (a + b + c)/3

# Estimate three values for pi
a, b, c = pi(10**6), pi(10**6), pi(10**6)

# Compute the average of the three estimates
avg_pi  = avg_points(a, b, c)

# Print the results
print("A: {0:.5f} B: {1:.5f} C: {2:.5f}".format(a.result(), b.result(), c.result()))
print("Average: {0:.5f}".format(avg_pi.result()))

Execution and configuration

Parsl is designed to support arbitrary execution providers (e.g., PCs, clusters, supercomputers) and execution models (e.g., threads, pilot jobs, etc.). Rather than encoding execution details in the script itself, the configuration used to run the script tells Parsl how to execute apps on the desired environment. Parsl provides a high-level abstraction, called a Block, for describing the resource configuration for a particular app or script.

Information about the supported execution providers and executors is included in the Parsl documentation.

Above we used the built-in configuration for running with threads. Below we illustrate how to create a config for different environments.

Local execution with threads

As we saw above, we can configure Parsl to execute apps on a local thread pool. This is a good way to parallelize execution on a local PC. The configuration object defines the executors that will be used as well as a range of other options such as authentication method (e.g., if using SSH), checkpoint files, and executor specific configuration. In the case of threads we define the maximum number of threads to be used.

In [ ]:
from parsl.config import Config
from parsl.executors.threads import ThreadPoolExecutor

local_config = Config(
    executors=[
        ThreadPoolExecutor(
            max_threads=8,
            label='local_threads'
        )
    ]
)

Local execution with pilot jobs

We can also define a configuration that uses IPythonParallel as the executor. In this mode, pilot jobs are used to manage the submission. Parsl creates an IPythonParallel controller to manage execution and deploys one or more IPythonParallel engines (workers) to execute the workload. The following config instantiates this infrastructure locally; it can be trivially extended to include a remote provider (e.g., Cori, Theta, etc.) for execution.

In [ ]:
from parsl.config import Config
from parsl.executors.ipp import IPyParallelExecutor
from libsubmit.providers import LocalProvider
from libsubmit.channels import LocalChannel

ipp_config = Config(
    executors=[
        IPyParallelExecutor(
            label="local_ipp",
            provider=LocalProvider(
                channel=LocalChannel(),
                init_blocks=1,
                max_blocks=1,
            )
        )
    ]
)

Running a workflow using a configuration

We can now run the same workflow using either of the two configurations defined above.

First we clear the current configuration and then load one of the two configurations defined above. You can switch between these configurations to see the same workflow executed with different execution methods. You will notice that executing with the IPyParallel executor takes longer, as it must start worker/engine processes locally before executing the tasks.

In [ ]:
parsl.clear()
parsl.load(ipp_config)
#parsl.load(local_config)
In [ ]:
@bash_app
def generate(outputs=[]):
    return "echo $(( RANDOM )) &> {}".format(outputs[0])

@bash_app
def concat(inputs=[], outputs=[], stdout="stdout.txt", stderr='stderr.txt'):
    return "cat {0} > {1}".format(" ".join(inputs), outputs[0])

@python_app
def total(inputs=[]):
    total = 0
    with open(inputs[0], 'r') as f:
        for l in f:
            total += int(l)
    return total

# Create 5 files with random numbers
output_files = []
for i in range(5):
    output_files.append(generate(outputs=['random-%s.txt' % i]))

# Concatenate the files into a single file
cc = concat(inputs=[i.outputs[0].filepath for i in output_files],
            outputs=["combined.txt"])

# Calculate the sum of the random numbers
total = total(inputs=[cc.outputs[0]])

print(total.result())