In this section, we will explain how return values from steps are organized and the different ways you can access them in a pipeline.
The core principle to keep in mind is that in data-pipeline, there are three ways data is expected to flow:
- inbound from the outside: the pipeline's input, considered immutable
- within the pipeline: step results, considered immutable
- outbound from the pipeline: sinks with side effects, or eventually via the `Output` container
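As a minimal sketch of that outbound flow, the `Output` container is simply the return value of a pipeline run (the `pipeline` variable here is a placeholder for any configured pipeline):

```java
// input flows in as an argument; the Output container flows back out
var output = pipeline.run("some-input");
```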
Each step in a pipeline produces a Result, which is pushed into and tracked by a structure called the ResultContainer.
Afterwards, every subsequent component along the way has access to previously pushed results.
For instance, if we were to write the Step C from above, we could ask for Result A and Result B to be passed as arguments:
```java
public class StepC
{
    //where A, B and C are Results
    @StepConfig
    public C doStuff(@Current A resultA, @Current B resultB) {}
}
```

Or in a Sink, you would have access to all produced results (assuming all steps were executed and no result was discarded):
```java
public class MySink
{
    //where A, B and C are Results
    @SinkConfig
    public void doStuff(@Current A resultA, @Current B resultB, @Current C resultC) {}
}
```

data-pipeline puts an emphasis on the ability to track the provenance, or lineage, of its executions and the data they produce.
For that purpose, it generates "tags" along the way and applies them to runs, results, and observability byproducts, making it possible to uniquely identify each of them.
As it stands, there are two types of tags: PipelineTag and ComponentTag.
The PipelineTag is generated at the start of a pipeline run, containing:
- a pipeline `name` (as determined by the Pipeline's `id`)
- a unique run `uid` (as determined by the `UIDGenerator`)
- a run `author` (as determined by the `AuthorResolver`)
A typical PipelineTag may look like this:

```json
{
  "name": "my-pipeline",
  "uid": "2axhxfxn4x0agygvfqdevgc69ru",
  "author": "some_user@myapp.com"
}
```

PipelineTags can be injected in Initializer, Step and Sink functions by type, as described in their respective documentation.
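For illustration, here is a minimal sketch of a step receiving the current PipelineTag by type; the `MyStep` and `MyResult` names are hypothetical placeholders:

```java
public class MyStep
{
    //where MyResult is a Result
    @StepConfig
    public MyResult doStuff(PipelineTag tag)
    {
        //the tag uniquely identifies the enclosing pipeline run and its author
        System.out.println("running within pipeline run: " + tag);
        return new MyResult();
    }
}
```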
The ComponentTag is generated at the start of each component run (step, sink, etc.), containing:
- a component `id` (as determined by the component's `id`)
- a component `family` depending on the type of component (`INITIALIZER`, `STEP` or `SINK`)
- a unique run `uid` (as determined by the `UIDGenerator`)
- a `pipelineTag` reference to the current pipeline's `PipelineTag`
A typical ComponentTag may look like this:

```json
{
  "id": "my-step",
  "family": "STEP",
  "uid": "2axhxgjnoznxdpvzvas3vnrqh1q",
  "pipelineTag": {
    "name": "my-pipeline",
    "uid": "2axhxfxn4x0agygvfqdevgc69ru",
    "author": "some_user@myapp.com"
  }
}
```

ComponentTags can be injected in Initializer, Step and Sink functions by type, as described in their respective documentation.
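Likewise, a minimal sketch of a sink receiving its ComponentTag by type; `MySink` and the result type `C` are hypothetical placeholders:

```java
public class MySink
{
    //where C is a Result
    @SinkConfig
    public void doStuff(@Current C resultC, ComponentTag tag)
    {
        //the component tag identifies this sink execution and references the enclosing pipeline run
        System.out.println("sink executed: " + tag);
    }
}
```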
The ResultContainer is responsible for holding references to all results produced during a Pipeline run.
It comes with a variety of methods for accessing results, allowing for different strategies:
- results can be identified by their Java type (the most common use case)
- results can be identified by a user-defined name (for discriminating generic result types)
- results can be considered `Optional` if a step execution is optional or if it returns results of varying identifiers
- results can be considered a `Stream` if several steps produce results with the same identifier
- results have a scope: `current` and `latest`
  - `current` covers the current execution scope, i.e. only what steps produced since the pipeline started
  - `latest` covers both the current execution scope and results supplied via pipeline inheritance
Examples can be found in the documentation, and a short sketch follows below:
- for steps: type-binding and name-binding, each with `@Current` and `@Latest` variants, as well as `Optional` and `Stream` variants
- for sinks: type-binding and name-binding, each with `@Current` and `@Latest` variants, as well as `Optional` and `Stream` variants
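To make these variants concrete, here is a minimal sketch of a step combining type-binding with the `Optional` and `Stream` variants; the step and result type names are hypothetical, and the exact binding annotations should be checked against the linked documentation:

```java
public class MyStep
{
    //where A, B, C and MyResult are Results
    @StepConfig
    public MyResult doStuff(
        @Current A resultA,          //type-binding in the current scope
        @Latest Optional<B> maybeB,  //empty if no B result is available in the latest scope
        @Current Stream<C> allCs     //every C result produced since the pipeline started
    ) {}
}
```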
One of the core aspects of this data model is that results can be stacked in order to enable state continuity between pipelines.
When a pipeline is executed with a Context initialized from a previous output, the pipeline's result container is able to access previous results in the latest scope.
Setting it up is done the following way:

```java
var firstOutput = firstPipeline.run("some-input");
otherPipeline.run("other-input", new SimpleContext<>(firstOutput));
```
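With this setup, a step in `otherPipeline` can then request a result that was produced during `firstPipeline`'s run through the latest scope; a minimal sketch, assuming `A` is a Result type produced by `firstPipeline` and `D` is a hypothetical Result:

```java
public class StepInOtherPipeline
{
    //A was produced by firstPipeline and is inherited through the context
    @StepConfig
    public D doStuff(@Latest A inheritedResultA) {}
}
```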

