Rand Stats

MongoDB::Queue

zef:Zer0-Tolerance

MongoDB::Queue

A fast, persistent, prioritised job queue for Raku backed by MongoDB::Fast.


Installation

zef install MongoDB::Queue

Requires a running MongoDB instance (tested with MongoDB 6+).


Quick start

use MongoDB::Queue;

my $q = MongoDB::Queue.connect(host => 'localhost', db => 'myapp');

# Produce
$q.enqueue({ task => 'send-email', to => 'user@example.com' }, 'email');

# Consume
while my $job = $q.dequeue('email') {
    send-email($job<payload><to>);
    $q.ack($job<_id>);
    CATCH { default { $q.nack($job<_id>) } }
}

Constructor

MongoDB::Queue.connect

The easiest way to create a queue. Opens a MongoDB::Fast connection and returns a ready-to-use instance.

# By host + port
my $q = MongoDB::Queue.connect(
    host       => 'localhost',   # default
    port       => 27017,         # default
    db         => 'myapp',       # database name  (default: 'queues')
    collection => 'jobs',        # collection name (default: 'jobs')
);

# Or by URL  (host:port)
my $q = MongoDB::Queue.connect(url => 'mongo.internal:27017', db => 'myapp');

# With credentials
my $q = MongoDB::Queue.connect(
    host     => 'localhost',
    username => 'myuser',
    password => 's3cret',
    db       => 'myapp',
);

# Queue-wide defaults (all optional)
my $q = MongoDB::Queue.connect(
    host               => 'localhost',
    db                 => 'myapp',
    visibility-timeout => 30,   # seconds before a processing job is reclaimable
    max-attempts       => 3,    # permanent failure threshold
    poll-interval      => 1,    # seconds to sleep when queue is empty (listen)
    dequeue-batch      => 5,    # candidates fetched per dequeue call
);

MongoDB::Queue.new

Pass your own MongoDB::Fast client when you need fine-grained control.

use MongoDB::Fast;

my $client = MongoDB::Fast.new(host => 'mongo.internal', port => 27017);
my $q = MongoDB::Queue.new(
    client          => $client,
    db-name         => 'myapp',
    collection-name => 'jobs',
);

Producing jobs

enqueue

Insert a single job. type is a required positional argument. Returns the job's _id string.

# Minimal
my $id = $q.enqueue({ task => 'send-email', to => 'user@example.com' }, 'email');

# With priority (higher = dequeued first; default 0)
my $id = $q.enqueue({ task => 'urgent-alert' }, 'email', priority => 10);

# With delayed delivery (invisible for 60 seconds)
my $id = $q.enqueue({ task => 'reminder' }, 'email', delay => 60);

# Override per-queue max-attempts for this job only
my $id = $q.enqueue({ task => 'flaky-api' }, 'webhook', max-attempts => 10);

enqueue-many

Insert multiple jobs of the same type in a single MongoDB round trip. Returns an Array of _id strings in insertion order.

my @payloads = ({ task => 'resize', file => "img$_.jpg" } for ^100);
my @ids = $q.enqueue-many(@payloads, 'image', priority => 5);
say "Inserted {@ids.elems} jobs";

Consuming jobs

dequeue

Atomically claim the next available job of the given type. Returns the full job document as a Hash, or Nil when the queue is empty.

while my $job = $q.dequeue('email') {
    my %data = $job<payload>;
    do-work(%data);
    $q.ack($job<_id>);
    CATCH { default { $q.nack($job<_id>) } }
}

Pass :worker-id to label the claim (useful for debugging):

my $job = $q.dequeue('email', worker-id => 'worker-3');

dequeue-many

Claim up to N jobs of the given type in one pass (one find + N update-one claims). Returns an Array of job documents.

while my @jobs = $q.dequeue-many(20, 'email') {
    process($_<payload>) for @jobs;
    $q.ack-many(@jobs.map(*<_id>));
}

ack

Mark a job done after successful processing.

$q.ack($job<_id>);   # returns True on success

ack-many

Acknowledge a batch in a single round trip. Returns the count of jobs marked done.

my $n = $q.ack-many(@jobs.map(*<_id>));

nack

Return a job to the queue for retry. If the job has exhausted max_attempts it is permanently failed instead.

$q.nack($job<_id>);                # retry immediately
$q.nack($job<_id>, delay => 30);   # retry after 30 seconds

fail

Permanently fail a job right now, bypassing remaining attempts. Only works on processing jobs.

$q.fail($job<_id>);   # returns True on success

Event loop

listen

Block forever, processing jobs as they arrive. ack is called automatically on success; nack is called if the callback throws.

$q.listen(
    -> $job { do-work($job<payload>) },
    type => 'email',
);

All options:

$q.listen(
    -> $job { do-work($job<payload>) },
    type          => 'email',    # job type to consume (required)
    worker-id     => 'worker-1', # label for this process
    poll-interval => 2,          # seconds to sleep when queue is empty
    reclaim-every => 60,         # how often to scan for stale jobs (seconds)
    batch         => 5,          # jobs claimed per poll cycle (default 1)
);

When batch > 1, listen uses dequeue-many internally and processes all claimed jobs before sleeping, maximising throughput under high load.


Maintenance

reclaim-stale

Re-queue (or permanently fail) processing jobs whose locked_at is older than the threshold. Returns the count of jobs reclaimed. Called automatically by listen every reclaim-every seconds.

my $n = $q.reclaim-stale;                    # uses visibility-timeout
my $n = $q.reclaim-stale(older-than => 120); # custom threshold

purge

Delete all done and failed jobs. Returns the count deleted.

my $n = $q.purge;

clear

Delete every job regardless of status. Returns the count deleted.

$q.clear;

drop

Drop the entire collection.

$q.drop;

Metrics

All metric methods require the job type:

say $q.size('email');       # pending jobs of that type
say $q.in-flight('email');  # currently processing
say $q.failed('email');     # permanently failed
say $q.total('email');      # all jobs of that type

my %s = $q.stats('email');
# { pending => 42, processing => 3, failed => 1, total => 46 }

Job document schema

Every job stored in MongoDB has these fields:

FieldTypeNotes
_idStr32-char random hex
payloadHashArbitrary application data
typeStrJob type — used to route jobs to the right worker
statusStrpending | processing | done | failed
priorityIntHigher = dequeued first (default 0)
available_atIntEpoch seconds; gates delayed delivery
available_at_dtDateTimeUTC datetime companion
created_atIntEpoch seconds
created_at_dtDateTimeUTC datetime companion
locked_atIntEpoch seconds when claimed (Any if pending)
locked_at_dtDateTimeUTC datetime companion (Any if pending)
locked_byStrWorker identifier (Any if pending)
attemptsIntIncremented on every dequeue
max_attemptsIntPermanent failure threshold
done_atIntEpoch seconds when acked/failed (Any otherwise)
done_at_dtDateTimeUTC datetime companion (Any otherwise)

Complete example — email worker

Runnable scripts live in examples/.

examples/producer.raku

use lib 'lib';
use MongoDB::Queue;

my $q = MongoDB::Queue.connect(host => 'localhost', db => 'myapp');

my @emails = (
    %( to => 'alice@example.com', subject => 'Hello'  ),
    %( to => 'bob@example.com',   subject => 'Hi'     ),
    %( to => 'ceo@example.com',   subject => 'Urgent' ),
);

# Send the CEO's email first (high priority)
$q.enqueue(@emails[2], 'email', priority => 100);

# Bulk-insert the rest in one round trip
my @ids = $q.enqueue-many(@emails[0..1], 'email');

say "Queued 1 high-priority + {@ids.elems} normal emails.";
say "Stats: {$q.stats('email').raku}";

examples/worker.raku

use lib 'lib';
use MongoDB::Queue;

my $q = MongoDB::Queue.connect(
    host               => 'localhost',
    db                 => 'myapp',
    visibility-timeout => 60,
    max-attempts       => 5,
);

say "Worker {$*PID} started. Waiting for jobs...";

$q.listen(
    -> $job {
        my %email = $job<payload>;
        say "[attempt {$job<attempts>}] Sending email to %email<to>%email<subject>";
        send-smtp(%email);
        say "  ✓ done ({$job<_id>})";
    },
    type          => 'email',
    worker-id     => "w-{$*PID}",
    poll-interval => 2,
    reclaim-every => 60,
    batch         => 5,
);

sub send-smtp(%email) { ... }

Run them in separate terminals (MongoDB must be running):

raku examples/producer.raku
raku examples/worker.raku

How it works

Atomic dequeue uses an optimistic-lock pattern:

  1. find a small batch of pending candidates of the requested type, sorted by priority DESC.
  2. For each candidate, issue update-one with { _id => $id, status => 'pending', type => $type } as the filter.
  3. MongoDB's single-document atomicity ensures only one worker wins each document — the first to update sees modifiedCount = 1; all others see 0 and skip to the next candidate.
  4. If every candidate is stolen by a concurrent worker the loop retries with a fresh find.

This avoids findAndModify (which has a known serialisation bug in MongoDB::Fast) while still guaranteeing at-most-one delivery per dequeue call.


License

Artistic-2.0