Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ The library requires Java 17+, in order to use it, add the following in your `po
<dependency>
<groupId>tech.illuin</groupId>
<artifactId>data-pipeline</artifactId>
<version>0.24</version>
<version>0.24.1</version>
</dependency>
```

Expand Down
2 changes: 1 addition & 1 deletion core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<parent>
<groupId>tech.illuin</groupId>
<artifactId>data-pipeline-parent</artifactId>
<version>0.24</version>
<version>0.24.1</version>
</parent>

<artifactId>data-pipeline</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,27 +64,14 @@ public PipelineStrategy run(IO<I> io, Context context, MetricTags tags) throws E

long start = System.nanoTime();
metrics.setMDC();
Span span = this.observabilityManager.tracer().nextSpan().name(tag.id());
Span span = this.observabilityManager.tracer().nextSpan().name("initialization_phase");
try (Tracer.SpanInScope scope = this.observabilityManager.tracer().withSpan(span.start()))
{
span.tag("uid", tag.uid());
span.tag("uid", tag.pipelineTag().uid());

span.event("initializer:run");
Object payload = this.runInitializer(io.input(), tag, context, metrics);
span.tag("payload_type", payload == null ? "null" : payload.getClass().getName());

span.event("output_factory:run");
Output output = this.outputFactory.create(io.tag(), io.input(), payload, context);
span.tag("output_type", output == null ? "null" : output.getClass().getName());

for (Indexer<?> indexer : this.indexers)
{
@SuppressWarnings("unchecked")
Indexer<Object> objectIndexer = (Indexer<Object>) indexer;
span.event("indexer:run:" + indexer.getClass().getName());
logger.trace("{}#{} launching indexer {}", io.tag().pipeline(), io.tag().uid(), indexer.getClass().getName());
objectIndexer.index(payload, output.index());
}
Output output = this.runOutputFactory(io.input(), io.tag(), payload, context);
this.runIndexers(io.tag(), payload, output);

metrics.successCounter().increment();
io.setOutput(output);
Expand All @@ -93,7 +80,7 @@ public PipelineStrategy run(IO<I> io, Context context, MetricTags tags) throws E
}
catch (Exception e) {
metrics.setMDC(e);
span.event("initializer:error");
span.event("initialization:error");
metrics.failureCounter().increment();
metrics.errorCounter(e).increment();
throw e;
Expand All @@ -110,15 +97,62 @@ public PipelineStrategy run(IO<I> io, Context context, MetricTags tags) throws E
private Object runInitializer(I input, ComponentTag tag, Context context, PipelineInitializationMetrics metrics) throws Exception
{
ComponentContext componentContext = wrapContext(input, context, tag.pipelineTag(), tag);
try {
Span span = this.observabilityManager.tracer().nextSpan().name(tag.id());
try (Tracer.SpanInScope scope = this.observabilityManager.tracer().withSpan(span.start()))
{
span.tag("uid", tag.uid());

span.event("initializer:run");
logger.trace("{}#{} initializing payload", tag.pipelineTag().pipeline(), tag.pipelineTag().uid());
return this.initializer.execute(input, componentContext, this.uidGenerator);
Object payload = this.initializer.execute(input, componentContext, this.uidGenerator);
span.tag("payload_type", payload == null ? "null" : payload.getClass().getName());

return payload;
}
catch (Exception e) {
metrics.setMDC(e);
span.event("initializer:error");
logger.error("{}#{} initializer {} threw an {}: {}", tag.pipelineTag().pipeline(), tag.pipelineTag().uid(), tag.id(), e.getClass().getName(), e.getMessage());
return this.initializer.handleException(e, componentContext, this.uidGenerator);
}
finally {
span.end();
}
}

private Output runOutputFactory(I input, PipelineTag tag, Object payload, Context context)
{
Span span = this.observabilityManager.tracer().nextSpan().name("output_factory");
try (Tracer.SpanInScope scope = this.observabilityManager.tracer().withSpan(span.start()))
{
span.event("output_factory:run");
Output output = this.outputFactory.create(tag, input, payload, context);
span.tag("output_type", output == null ? "null" : output.getClass().getName());

return output;
}
finally {
span.end();
}
}

private void runIndexers(PipelineTag tag, Object payload, Output output)
{
Span span = this.observabilityManager.tracer().nextSpan().name("payload_indexers");
try (Tracer.SpanInScope scope = this.observabilityManager.tracer().withSpan(span.start()))
{
for (Indexer<?> indexer : this.indexers)
{
@SuppressWarnings("unchecked")
Indexer<Object> objectIndexer = (Indexer<Object>) indexer;
span.event("indexer:run:" + indexer.getClass().getName());
logger.trace("{}#{} launching indexer {}", tag.pipeline(), tag.uid(), indexer.getClass().getName());
objectIndexer.index(payload, output.index());
}
}
finally {
span.end();
}
}

private ComponentTag createTag(PipelineTag pipelineTag, InitializerDescriptor<?> initializer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,18 +60,25 @@ public SinkPhase(
@Override
public PipelineStrategy run(IO<I> io, Context context, MetricTags metricTags) throws Exception
{
for (SinkDescriptor descriptor : this.sinks)
Span span = this.observabilityManager.tracer().nextSpan().name("sink_phase");
try (Tracer.SpanInScope scope = this.observabilityManager.tracer().withSpan(span.start()))
{
ComponentTag tag = this.createTag(io.output().tag(), descriptor);
PipelineSinkMetrics metrics = new PipelineSinkMetrics(this.observabilityManager.meterRegistry(), tag, metricTags);
for (SinkDescriptor descriptor : this.sinks)
{
ComponentTag tag = this.createTag(io.output().tag(), descriptor);
PipelineSinkMetrics metrics = new PipelineSinkMetrics(this.observabilityManager.meterRegistry(), tag, metricTags);

if (descriptor.isAsync())
this.runSinkAsynchronously(descriptor, tag, io, context, metrics);
else
this.runSinkSynchronously(descriptor, tag, io, context, metrics);
}
if (descriptor.isAsync())
this.runSinkAsynchronously(descriptor, tag, io, context, metrics, span);
else
this.runSinkSynchronously(descriptor, tag, io, context, metrics);
}

return PipelineStrategy.CONTINUE;
return PipelineStrategy.CONTINUE;
}
finally {
span.end();
}
}

@SuppressWarnings("IllegalCatch")
Expand Down Expand Up @@ -108,7 +115,7 @@ private void runSinkSynchronously(SinkDescriptor sink, ComponentTag tag, IO<I> i
}

@SuppressWarnings("IllegalCatch")
private void runSinkAsynchronously(SinkDescriptor sink, ComponentTag tag, IO<I> io, Context context, PipelineSinkMetrics metrics)
private void runSinkAsynchronously(SinkDescriptor sink, ComponentTag tag, IO<I> io, Context context, PipelineSinkMetrics metrics, Span phaseSpan)
{
if (this.sinkExecutor == null)
throw new IllegalStateException("An asynchronous run has been initiated but there is no active executor");
Expand All @@ -121,7 +128,7 @@ private void runSinkAsynchronously(SinkDescriptor sink, ComponentTag tag, IO<I>
long start = System.nanoTime();
MDC.setContextMap(mdc);
metrics.setMDC();
Span span = this.observabilityManager.tracer().nextSpan().name(tag.id());
Span span = this.observabilityManager.tracer().nextSpan(phaseSpan).name(tag.id());
try (Tracer.SpanInScope scope = this.observabilityManager.tracer().withSpan(span.start()))
{
span.tag("uid", tag.uid());
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>tech.illuin</groupId>
<artifactId>data-pipeline-parent</artifactId>
<version>0.24</version>
<version>0.24.1</version>
<packaging>pom</packaging>

<modules>
Expand Down
2 changes: 1 addition & 1 deletion resilience4j/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<parent>
<groupId>tech.illuin</groupId>
<artifactId>data-pipeline-parent</artifactId>
<version>0.24</version>
<version>0.24.1</version>
</parent>

<artifactId>data-pipeline-resilience4j</artifactId>
Expand Down