NAME
Concurrent::PChannel - prioritized channel
SYNOPSIS
use Concurrent::PChannel; my Concurrent::PChannel:D $pchannel .= new( :priorities(10) ); $pchannel.send("low prio", 0); $pchannel.send("high prio", 1); say $pchannel.receive; ‘high prio’ say $pchannel.receive; ‘low prio’
DESCRIPTION
Concurrent::PChannel
implements concurrent channel where each item sent over the channel has a priority attached allowing items with higher priority to be pulled first from the channel even if they were sent later in time.
For example, imagine there is a factory of devices supplying our input with different kind of events. Some event types are considered critical and must be processed ASAP. And some are, say, informative and can be taken care of when we're idling. In code this could be implemented the following way:
my $pchannel = Concurrent::PChannel.new( :priorities(3) ); for $dev-factory.devices -> $dev { start { react { whenever $dev.event-supply -> $event { given $event.type { when EvCritical { $pchannel.send: $event, 2; } when EvInformative { $pchannel.send: $event, 0; } default { $pchannel.send: $event, 1; } } } } } } for ^WORKER-COUNT { start { while $pchannel.receive -> $event { ... } } }
Performance
The performance was the primary target of this module development. It is implemented using highly-concurrent almost lock-less approach. Benchmarking of different numbers of sending/receiving threads (measured over receive()
method) results in send operations been 1.3-8 times slower than sending over the core Channel
; receiving is 1.1-6 times slower. The difference in numbers is only determined by the ratio of sending/receving threads.
What's more important, the speed is almost independent of the number of priorities used! I.e. it doesn't matter if code is using 10 or 1000 priorities – the time needed to process the two channels would only be dependent on the number of items sent over them.
Terms
Closed And Drained
A channel could be in three different states: normal, closed, and drained. The difference between the last two is that when the channel is closed it might still have some data available for receiving. Only when all items were consumed by the user code then the channel transitions into the closed and the drained state.
Priority
Priority is a positive integer value with 0 being the lowest possible priority. The higher the value the sooner an item with this priority will reach the consumer.
ATTRIBUTES
closed
True
if channel has been closed.
closed-promise
A Promise
which is kept with True
when the channel is closed and broken with a cause object if channel is marked as failed.
drained
True
if channel is closed and no items left to fetch.
drained-promise
A Promise
which is kept with True
when the channel is drained.
elems
Likely number of elements ready for fetch. It is "likely" because in a concurrent environment this value might be changing too often.
prio-count
Number of priority queues pre-allocated.
METHODS
new
new
can be used with any parameters. But usually it is recommended to specifiy :priorities(n)
named parameter to specify the expected number of priorities to be used. This allows the class to pre-allocate all required priority queues beforehand. Without this parameter a class instance starts with only one queue. If method send
is used with a priority which doesn't have a queue assigned yet then the class starts allocating new ones by multiplying the number of existing ones by 2 until get enough of them to cover the requested priority. For example:
my $pchannel.new; $pchannel.send(42, 5);
In this case before sending 42
the class allocates 2 -> 4 -> 8 queues.
Queue allocation code is the only place where locking is used.
Use of priorities
parameter is recommended if some really big number of priorities is expected. This might help in reducing the memory footprint of the code by preventing over-allocation of queues.
send(Mu \item, Int:D $priority = 0)
Send a item
using $priority
. If $priority
is omitted then default 0 is used.
receive
Receive an item from channel. If no data available and the channel is not drained then the method await
for the next item. In other words, it soft-blocks allowing the scheduler to reassing the thread onto another task if necessary until some data is ready for pick up.
If the method is called on a drained channel then it returns a Failure
wrapped around X::PChannel::OpOnClosed
exception with its op
attribute set to string "receive".
poll
Non-blocking fetch of an item. Contrary to receive
doesn't wait for a missing item. Instead the method returns Nil but NoData
typeobject. Concurrent::PChannel::NoData
is a dummy role which sole purpose is to indicate that there is no item ready in a queue.
close
Close a channel.
fail($cause)
Marks a channel as failed and sets failure cause to $cause
.
failed
Returns True
if channel is marked as failed.
Supply
Wraps receive
into a supplier.
EXCEPTIONS
Names is the documentation are given as the exception classes are exported.
X::PChannel::Priorities
Thrown if wrong priorities
parameter passed to the method new
. Attribute priorities
contains the value passed.
X::PChannel::NegativePriority
Thrown if a negative priority value has passed in from user code. Attribute prio
contains the value passed.
X::PChannel::OpOnClosed
Thrown or passed in a Failure
when an operation is performed on a closed channel. Attribute op
contains the operation name.
Note that semantics of this exception is a bit different depending on the kind of operation attempted. For receive
this exception is used when channel is drained. For send
, close
, and fail
it is thrown right away if channel is in closed state.
AUTHOR
Vadim Belman vrurg@cpan.org
COPYRIGHT AND LICENSE
Copyright 2020 Vadim Belman
This library is free software; you can redistribute it and/or modify it under the Artistic License 2.0.