parsl.dataflow.flow_control.FlowControl

class parsl.dataflow.flow_control.FlowControl(dfk, *args, threshold=20, interval=5)

Implements threshold-interval based flow control.

The overall goal is to trap the flow of apps from the workflow, measure it, and redirect it to the appropriate executors for processing.

This is based on the following logic:

BEGIN (INTERVAL, THRESHOLD, callback) :
    start = current_time()

    while (current_time()-start < INTERVAL) :
         count = get_events_since(start)
         if count >= THRESHOLD :
             break

    callback()

This logic ensures that callbacks are activated with a maximum delay of interval seconds, both for systems with infrequent events and for systems that generate large bursts of events.
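The pseudocode above can be sketched in Python roughly as follows. This is a minimal illustration of one threshold-interval cycle, not Parsl's implementation: the function name `run_flow_control_cycle`, the `get_events_since` callable, the `poll` parameter, and the returned reason string are all assumptions introduced for the example.

```python
import time
from typing import Callable


def run_flow_control_cycle(
    get_events_since: Callable[[float], int],  # hypothetical event counter
    callback: Callable[[], None],
    threshold: int = 20,
    interval: float = 5.0,
    poll: float = 0.01,  # assumed polling period; the pseudocode does not specify one
) -> str:
    """Wait until `threshold` events arrive or `interval` seconds pass, then call back.

    Returns "threshold" or "interval" to report which condition ended the wait.
    """
    start = time.monotonic()
    reason = "interval"
    while time.monotonic() - start < interval:
        if get_events_since(start) >= threshold:
            reason = "threshold"
            break
        time.sleep(poll)  # avoid a busy loop while waiting
    callback()  # always runs, so the delay never exceeds roughly `interval`
    return reason
```

A burst of events ends the cycle early, while a quiet system still gets its callback once the interval expires, which is the bounded-delay property described above.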

Once a callback is triggered, it generally runs a strategy method on the available sites as well as the queue.

TODO: When the debug logs are enabled this module emits duplicate messages. This issue needs more debugging. What I’ve learnt so far is that the duplicate messages are present only when the timer thread is started, so this could be from a duplicate logger being added by the thread.

__init__(dfk, *args, threshold=20, interval=5)

Initialize the FlowControl object.

The timer thread is started here.

Parameters

dfk – DFK (DataFlowKernel) object used to track Parsl progress

Keyword arguments:
  • threshold (int) : Number of tasks after which the callback is triggered

  • interval (int) : Seconds after which the timer expires

Methods

__init__(dfk, *args[, threshold, interval])

Initialize the FlowControl object.

add_executors(executors)

close()

Merge the threads and terminate.

make_callback()

Makes the callback and resets the timer.

add_executors(executors: Sequence[parsl.executors.base.ParslExecutor]) → None

close() → None

Merge the threads and terminate.

make_callback() → None

Makes the callback and resets the timer.
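
To illustrate how a timer thread, a callback that resets the timer, and a close() that joins the thread can fit together, here is a small self-contained sketch. It is a hypothetical stand-in and does not reproduce Parsl's code: the class name `MiniFlowControl`, the `notify` method, and all private attributes are assumptions made for the example.

```python
import threading
import time
from typing import Callable


class MiniFlowControl:
    """Hypothetical threshold-interval timer; not Parsl's FlowControl."""

    def __init__(self, callback: Callable[[], None],
                 threshold: int = 20, interval: float = 5.0) -> None:
        self.callback = callback
        self.threshold = threshold
        self.interval = interval
        self._event_count = 0
        self._lock = threading.Lock()
        self._wake = threading.Event()  # set to wake the timer thread early
        self._stop = threading.Event()  # set to ask the timer thread to exit
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()  # the timer thread starts in the constructor

    def _run(self) -> None:
        while not self._stop.is_set():
            # Sleep until the interval expires or the threshold wakes us.
            self._wake.wait(timeout=self.interval)
            self._wake.clear()
            if self._stop.is_set():
                break
            self.make_callback()

    def notify(self, n: int = 1) -> None:
        """Record n events (assumed hook); wake the timer at the threshold."""
        with self._lock:
            self._event_count += n
            reached = self._event_count >= self.threshold
        if reached:
            self._wake.set()

    def make_callback(self) -> None:
        """Reset the event count and run the callback; the loop then waits a full interval again."""
        with self._lock:
            self._event_count = 0
        self.callback()

    def close(self) -> None:
        """Ask the timer thread to stop and join it."""
        self._stop.set()
        self._wake.set()
        self._thread.join()
```

In this sketch, calling `notify` enough times triggers the callback immediately, an idle instance triggers it once per interval, and `close()` returns only after the timer thread has exited.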