diff --git a/README.md b/README.md index c790a72..a65d3fc 100644 --- a/README.md +++ b/README.md @@ -25,7 +25,7 @@ The library requires Java 17+, in order to use it, add the following in your `po tech.illuin data-pipeline - 0.24 + 0.24.1 ``` diff --git a/core/pom.xml b/core/pom.xml index a500c16..4103181 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -7,7 +7,7 @@ tech.illuin data-pipeline-parent - 0.24 + 0.24.1 data-pipeline diff --git a/core/src/main/java/tech/illuin/pipeline/execution/phase/impl/InitializerPhase.java b/core/src/main/java/tech/illuin/pipeline/execution/phase/impl/InitializerPhase.java index 67c7609..6e6e298 100644 --- a/core/src/main/java/tech/illuin/pipeline/execution/phase/impl/InitializerPhase.java +++ b/core/src/main/java/tech/illuin/pipeline/execution/phase/impl/InitializerPhase.java @@ -64,27 +64,14 @@ public PipelineStrategy run(IO io, Context context, MetricTags tags) throws E long start = System.nanoTime(); metrics.setMDC(); - Span span = this.observabilityManager.tracer().nextSpan().name(tag.id()); + Span span = this.observabilityManager.tracer().nextSpan().name("initialization_phase"); try (Tracer.SpanInScope scope = this.observabilityManager.tracer().withSpan(span.start())) { - span.tag("uid", tag.uid()); + span.tag("uid", tag.pipelineTag().uid()); - span.event("initializer:run"); Object payload = this.runInitializer(io.input(), tag, context, metrics); - span.tag("payload_type", payload == null ? "null" : payload.getClass().getName()); - - span.event("output_factory:run"); - Output output = this.outputFactory.create(io.tag(), io.input(), payload, context); - span.tag("output_type", output == null ? "null" : output.getClass().getName()); - - for (Indexer indexer : this.indexers) - { - @SuppressWarnings("unchecked") - Indexer objectIndexer = (Indexer) indexer; - span.event("indexer:run:" + indexer.getClass().getName()); - logger.trace("{}#{} launching indexer {}", io.tag().pipeline(), io.tag().uid(), indexer.getClass().getName()); - objectIndexer.index(payload, output.index()); - } + Output output = this.runOutputFactory(io.input(), io.tag(), payload, context); + this.runIndexers(io.tag(), payload, output); metrics.successCounter().increment(); io.setOutput(output); @@ -93,7 +80,7 @@ public PipelineStrategy run(IO io, Context context, MetricTags tags) throws E } catch (Exception e) { metrics.setMDC(e); - span.event("initializer:error"); + span.event("initialization:error"); metrics.failureCounter().increment(); metrics.errorCounter(e).increment(); throw e; @@ -110,15 +97,62 @@ public PipelineStrategy run(IO io, Context context, MetricTags tags) throws E private Object runInitializer(I input, ComponentTag tag, Context context, PipelineInitializationMetrics metrics) throws Exception { ComponentContext componentContext = wrapContext(input, context, tag.pipelineTag(), tag); - try { + Span span = this.observabilityManager.tracer().nextSpan().name(tag.id()); + try (Tracer.SpanInScope scope = this.observabilityManager.tracer().withSpan(span.start())) + { + span.tag("uid", tag.uid()); + + span.event("initializer:run"); logger.trace("{}#{} initializing payload", tag.pipelineTag().pipeline(), tag.pipelineTag().uid()); - return this.initializer.execute(input, componentContext, this.uidGenerator); + Object payload = this.initializer.execute(input, componentContext, this.uidGenerator); + span.tag("payload_type", payload == null ? "null" : payload.getClass().getName()); + + return payload; } catch (Exception e) { metrics.setMDC(e); + span.event("initializer:error"); logger.error("{}#{} initializer {} threw an {}: {}", tag.pipelineTag().pipeline(), tag.pipelineTag().uid(), tag.id(), e.getClass().getName(), e.getMessage()); return this.initializer.handleException(e, componentContext, this.uidGenerator); } + finally { + span.end(); + } + } + + private Output runOutputFactory(I input, PipelineTag tag, Object payload, Context context) + { + Span span = this.observabilityManager.tracer().nextSpan().name("output_factory"); + try (Tracer.SpanInScope scope = this.observabilityManager.tracer().withSpan(span.start())) + { + span.event("output_factory:run"); + Output output = this.outputFactory.create(tag, input, payload, context); + span.tag("output_type", output == null ? "null" : output.getClass().getName()); + + return output; + } + finally { + span.end(); + } + } + + private void runIndexers(PipelineTag tag, Object payload, Output output) + { + Span span = this.observabilityManager.tracer().nextSpan().name("payload_indexers"); + try (Tracer.SpanInScope scope = this.observabilityManager.tracer().withSpan(span.start())) + { + for (Indexer indexer : this.indexers) + { + @SuppressWarnings("unchecked") + Indexer objectIndexer = (Indexer) indexer; + span.event("indexer:run:" + indexer.getClass().getName()); + logger.trace("{}#{} launching indexer {}", tag.pipeline(), tag.uid(), indexer.getClass().getName()); + objectIndexer.index(payload, output.index()); + } + } + finally { + span.end(); + } } private ComponentTag createTag(PipelineTag pipelineTag, InitializerDescriptor initializer) diff --git a/core/src/main/java/tech/illuin/pipeline/execution/phase/impl/SinkPhase.java b/core/src/main/java/tech/illuin/pipeline/execution/phase/impl/SinkPhase.java index 8491463..167ccfc 100644 --- a/core/src/main/java/tech/illuin/pipeline/execution/phase/impl/SinkPhase.java +++ b/core/src/main/java/tech/illuin/pipeline/execution/phase/impl/SinkPhase.java @@ -60,18 +60,25 @@ public SinkPhase( @Override public PipelineStrategy run(IO io, Context context, MetricTags metricTags) throws Exception { - for (SinkDescriptor descriptor : this.sinks) + Span span = this.observabilityManager.tracer().nextSpan().name("sink_phase"); + try (Tracer.SpanInScope scope = this.observabilityManager.tracer().withSpan(span.start())) { - ComponentTag tag = this.createTag(io.output().tag(), descriptor); - PipelineSinkMetrics metrics = new PipelineSinkMetrics(this.observabilityManager.meterRegistry(), tag, metricTags); + for (SinkDescriptor descriptor : this.sinks) + { + ComponentTag tag = this.createTag(io.output().tag(), descriptor); + PipelineSinkMetrics metrics = new PipelineSinkMetrics(this.observabilityManager.meterRegistry(), tag, metricTags); - if (descriptor.isAsync()) - this.runSinkAsynchronously(descriptor, tag, io, context, metrics); - else - this.runSinkSynchronously(descriptor, tag, io, context, metrics); - } + if (descriptor.isAsync()) + this.runSinkAsynchronously(descriptor, tag, io, context, metrics, span); + else + this.runSinkSynchronously(descriptor, tag, io, context, metrics); + } - return PipelineStrategy.CONTINUE; + return PipelineStrategy.CONTINUE; + } + finally { + span.end(); + } } @SuppressWarnings("IllegalCatch") @@ -108,7 +115,7 @@ private void runSinkSynchronously(SinkDescriptor sink, ComponentTag tag, IO i } @SuppressWarnings("IllegalCatch") - private void runSinkAsynchronously(SinkDescriptor sink, ComponentTag tag, IO io, Context context, PipelineSinkMetrics metrics) + private void runSinkAsynchronously(SinkDescriptor sink, ComponentTag tag, IO io, Context context, PipelineSinkMetrics metrics, Span phaseSpan) { if (this.sinkExecutor == null) throw new IllegalStateException("An asynchronous run has been initiated but there is no active executor"); @@ -121,7 +128,7 @@ private void runSinkAsynchronously(SinkDescriptor sink, ComponentTag tag, IO long start = System.nanoTime(); MDC.setContextMap(mdc); metrics.setMDC(); - Span span = this.observabilityManager.tracer().nextSpan().name(tag.id()); + Span span = this.observabilityManager.tracer().nextSpan(phaseSpan).name(tag.id()); try (Tracer.SpanInScope scope = this.observabilityManager.tracer().withSpan(span.start())) { span.tag("uid", tag.uid()); diff --git a/pom.xml b/pom.xml index 4a1ff86..c43379e 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ tech.illuin data-pipeline-parent - 0.24 + 0.24.1 pom diff --git a/resilience4j/pom.xml b/resilience4j/pom.xml index ead14a4..aa68929 100644 --- a/resilience4j/pom.xml +++ b/resilience4j/pom.xml @@ -7,7 +7,7 @@ tech.illuin data-pipeline-parent - 0.24 + 0.24.1 data-pipeline-resilience4j