
NAME
LLM::Data::Pipeline - Generic pipeline framework for sequential data processing with checkpointing
SYNOPSIS
    use LLM::Data::Pipeline;
    use LLM::Data::Pipeline::Context;
    use LLM::Data::Pipeline::Step;
    use LLM::Data::Pipeline::Plan;
    use LLM::Data::Pipeline::Runner;

    class MyStep does LLM::Data::Pipeline::Step {
        method name(--> Str:D)        { 'my-step' }
        method description(--> Str:D) { 'Does something useful' }
        method requires(--> List)     { ('input',) }
        method optional(--> List)     { ('config',) }
        method provides(--> List)     { ('output',) }

        method execute(LLM::Data::Pipeline::Context:D $ctx --> Nil) {
            my $input = $ctx.get('input');
            $ctx.set('output', "processed: $input");
        }
    }

    my LLM::Data::Pipeline::Plan $plan .= new;
    $plan.add-step(MyStep.new);

    my LLM::Data::Pipeline::Context $ctx .= new;
    $ctx.set('input', 'hello');

    my LLM::Data::Pipeline::Runner $runner .= new(
        :on-step(-> $name, $event { say "$name: $event" })
    );

    $runner.run($plan, $ctx, :checkpoint-path('checkpoint.json'.IO));
    say $ctx.get('output');  # "processed: hello"

    # Resume from checkpoint
    my $ctx2 = $runner.resume($plan, 'checkpoint.json'.IO);
DESCRIPTION
LLM::Data::Pipeline provides a domain-agnostic framework for building sequential data processing pipelines with automatic checkpointing and resume capability.
LLM::Data::Pipeline::Step (Role)
Any class can be a pipeline step by doing this role.
    method name(--> Str:D) { ... }         # Unique step identifier
    method description(--> Str:D) { ... }  # Human-readable description
    method requires(--> List) { () }       # Context keys this step must have
    method optional(--> List) { () }       # Context keys this step can use
    method provides(--> List) { ... }      # Context keys this step writes
    method execute(Context:D $ctx --> Nil) { ... }  # Do the work
LLM::Data::Pipeline::Context
String-keyed data bag flowing through the pipeline. Values must be JSON-serializable.
    my LLM::Data::Pipeline::Context $ctx .= new;
    $ctx.set('key', 'value');
    $ctx.get('key');  # 'value'
    $ctx.has('key');  # True
    $ctx.keys;        # Sorted list of all keys

    # Serialization (for checkpointing)
    my %snap = $ctx.snapshot;  # Deep-copy Hash via JSON round-trip
    my $restored = LLM::Data::Pipeline::Context.from-snapshot(%snap);
LLM::Data::Pipeline::Plan
Ordered list of steps with dependency validation.
    my LLM::Data::Pipeline::Plan $plan .= new;
    $plan.add-step(StepA.new);
    $plan.add-step(StepB.new);

    $plan.steps;           # List of steps in order
    $plan.validate($ctx);  # Dies with diagnostic if deps unsatisfied
Validation walks the steps in order, tracking the keys made available by each step's provides. All missing keys and duplicate step names are reported in a single error.
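For example, a plan whose first step requires a key that neither the context nor any earlier step provides will fail validation before anything runs. This sketch assumes the API above; NeedsMissing is a hypothetical step written only to trigger the diagnostic:

    use LLM::Data::Pipeline::Step;
    use LLM::Data::Pipeline::Plan;
    use LLM::Data::Pipeline::Context;

    # Hypothetical step: requires a key nothing provides
    class NeedsMissing does LLM::Data::Pipeline::Step {
        method name(--> Str:D)        { 'needs-missing' }
        method description(--> Str:D) { 'Demonstrates a dependency error' }
        method requires(--> List)     { ('no-such-key',) }
        method provides(--> List)     { ('whatever',) }
        method execute(LLM::Data::Pipeline::Context:D $ctx --> Nil) { }
    }

    my LLM::Data::Pipeline::Plan $plan .= new;
    $plan.add-step(NeedsMissing.new);

    my LLM::Data::Pipeline::Context $ctx .= new;

    # Dies; the diagnostic names the unsatisfied key
    $plan.validate($ctx);
    CATCH { default { note .message } }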
LLM::Data::Pipeline::Runner
Executes a Plan against a Context with checkpointing.
    my LLM::Data::Pipeline::Runner $runner .= new(
        :on-step(-> Str:D $name, Str:D $event { ... })  # 'start', 'complete', 'skip'
    );

    # Run from beginning (validates first)
    my $done = $runner.run($plan, $ctx, :checkpoint-path($path));

    # Resume from checkpoint (skips completed steps)
    my $resumed = $runner.resume($plan, $checkpoint-path);
Checkpoints are JSON files written after each step, containing completed step names and the full context snapshot. Writes use temp file + rename for atomicity.
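Based on that description, a checkpoint file might look like the following; the key names here are illustrative assumptions, not the module's actual schema:

    {
      "completed": ["my-step"],
      "context": { "input": "hello", "output": "processed: hello" }
    }

Because each checkpoint is written to a temporary file and then renamed into place, a crash mid-write leaves the previous checkpoint intact rather than a truncated JSON file.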
AUTHOR
Matt Doughty <matt@apogee.guru>
COPYRIGHT AND LICENSE
Copyright 2026 Matt Doughty
This library is free software; you can redistribute it and/or modify it under the Artistic License 2.0.