Rand Stats

Async::Workers

zef:vrurg

NAME

Async::Workers - Asynchronous threaded workers

SYNOPSIS

use Async::Workers;

my $wm = Async::Workers.new( :max-workers(5) );

for 1..10 -> $n {
    $wm.do-async: {
        sleep 1.rand;
        say "Worker #$n";
    }
}

await $wm;

DESCRIPTION

This module provides an easy way to execute a number of jobs in parallel while allowing to keep the resources consumed by your code under control.

Both OO and procedural interfaces are provided.

Reliability

This module has been tested by running 20k repetitions of it's test suite using 56 parallel processes. This doesn't prove there're no bugs, but what can be told for certain is that within the tested scenarios the robustness is high enough.

Terminology

How it works.

The goal is achieved by combining a queue of jobs and a number of pre-spawned threads controlled by workers. A job is picked from the queue by a currently unoccupied worker and the code object associated with it is invoked. Since the number of workers is limited it is easier to predict and plan CPU usage.

Yet, it is still possible to over-use memory and cause heavy swapping or even overflows in cases when there is too many jobs are produced and an average one takes longer to complete than it is needed to generate a new one. To provide some control over this situation one can define hi and lo thresholds on the queue size. When the queue contains as many jobs, as defined by the hi threshold, do-async method blocks upon receiving a new job request and unblocks only when the queue shortens down to its lo threshold.

The worker manager doesn't start workers until the first job is sent to the queue. It is also possible to shutdown all workers if they're no longer needed.

Some internal events are reported with messages from Async::Workers::Msg. See the on_msg method below.

ATTRIBUTES

max-workers

Maximum number of workers. Defaults to $*KERNEL.cpu-cores.

max-jobs

Set the maximum number of jobs a worker should process before stopping and letting the manager to spawn a new one. The functionality is not activated if the attribute is left undefined.

lo-threshold, hi-threshold

Low and high thresholds of the queue size.

queued

Current queue size. If the queue has been blocked due to reaching hi-threshold then jobs awaiting for unblock are not counted toward this value.

running

The number of currently occupied workers.

completed

A Promise which is kept when manager completes all jobs after transitioning into shutdown state. When this happens the job queue is closed and all workers are requested to stop. Submission of a new job with do-async at this point will re-vivify the queue and return the manager into working state.

In case of an internal failure the promise will be broken with an exception.

empty

A Promise which is kept each time the queue gets emptied. Note that the initially empty queue is not reflected with this attribute. Only when the queue contained at least one element and then went down to zero length this promise is kept. In other words, it happens when Async::Workers::Msg::Queue::Empty is emitted.

Immediately after being kept the attribute gets replaced with a fresh Promise. So that the following example will finish only if the queue has been emptied twice:

await $wm.empty;
await $wm.empty;

quiet

If set to True then no exceptions thrown by jobs are reported. In this case it is recommended to monitor messages for Async::Workers::Msg::Job::Died.

METHODS

do-async( &code, |params --> Async::Workers::Job )

Takes a &code object and wraps it into a job object. params are passed to &code when it gets executed.

This method blocks if hi-threshold is defined and the queue size has reached the limit.

If no error happens then the method returns an Async::Workers::Job instance. Otherwise it may throw either X::Manager::Down if the manager is in shutdown or completed status; or it may throw X::Manager::NoQueue if somehow the job queue has not been initialized properly.

shutdown

Switches the manager into shutdown state and closes the job queue. Since the queue might still contain some incomplete jobs it is likely to take some time until the completed promise gets kept. Normally it'd be helpful to await for the manager:

my $wm = Async::Workers.new(...);
...
$wm.shutdown;
await $wm;

In this case the execution blocks until the job queue is emptied. Note that at this point completed might still not been fulfilled because workers are being shutting down in the meanwhile.

workers

Returns the number of started workers.

workers( UInt $num )

Sets the maximum number of workers (max-workers attribute). Can be used at any time without shutting down the manager:

$wm = Async::Worker.new: :max-workers(20);
$wm.do-async: &job1 for ^$repetitions;
$wm.workers($wm.workers - 5);
$wm.do-async: &job2 for ^$repetitions;

If user increases the number of workers then as many additional ones are started as necessary.

On the contrary, if the number of workers is reduced then as many of them are requested to stop as needed to meet user's demand. Note that this is done by injecting special jobs. It means that for a really long queue it may take quite a time before the extra workers receive the stop command. This behaviour may change in the future.

set-threshold( UInt :$lo, Num :$hi )

Dynamically sets high and low queue thresholds. The high might be set to Inf to define unlimited queue size. Note that this would translate into undefined value of hi-threshold attribute.

on_msg( &callback )

Submits a Async::Workers::Msg message object to user code passed in &callback. Internally this method does tapping on a message Supply and returns a resulting Tap object.

The following messages can currently be emitted by the manager (names are shortened to not include Async::Workers::Msg:: prefix):

messages

This method produces a Supply which emits messages.

HELPER SUBS

stop-worker($rc?, :$soft = False)

Bypasses to the current worker stop method.

If called from within a job code it would cause the worker controlling the job to stop. If this would reduce the number of workers to less than max-workers then the manager will spawn as many new ones as needed:

$wm.do-async: {
    if $something-went-wrong {
        stop-worker
    }
}

Note that the job would be stopped too, unless :soft parameter is used. In this case both the job and its worker will be allowed to complete. The worker will stop after the job is done.

PROCEDURAL

Procedural interface hides a singleton object behind it. The following subs are exported by the module:

async-workers( |params --> Async::Workers:D )

Returns the singleton object. Creates it if necessary. If supplied with parameters they're passed to the constructor. If singleton is already created then the parameters are ignored.

do-async

Bypasses to the corresponding method on the singleton.

do-async {
    say "My task";
}

shutdown-workers

Bypasses to shutdown on the singleton.

AUTHOR

Vadim Belman vrurg@cpan.org

LICENSE

Artistic License 2.0

See the LICENSE file in this distribution.