diff --git a/.gitignore b/.gitignore index 89910c85..03b40008 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,7 @@ .gitignore~ **/target /logs +/data *.iml .idea .java-version diff --git a/core/src/main/java/tech/illuin/pipeline/CompositePipeline.java b/core/src/main/java/tech/illuin/pipeline/CompositePipeline.java index cd075e4b..fb2271d3 100644 --- a/core/src/main/java/tech/illuin/pipeline/CompositePipeline.java +++ b/core/src/main/java/tech/illuin/pipeline/CompositePipeline.java @@ -19,6 +19,7 @@ import tech.illuin.pipeline.input.indexer.Indexer; import tech.illuin.pipeline.input.initializer.builder.InitializerDescriptor; import tech.illuin.pipeline.input.uid_generator.UIDGenerator; +import tech.illuin.pipeline.metering.PipelineMarkerManager; import tech.illuin.pipeline.metering.PipelineMetrics; import tech.illuin.pipeline.metering.manager.ObservabilityManager; import tech.illuin.pipeline.metering.tag.MetricTags; @@ -120,7 +121,8 @@ public Output run(I input, Context context) throws PipelineException { PipelineTag tag = this.createTag(input, context); MetricTags metricTags = this.tagResolver.resolve(input, context); - PipelineMetrics metrics = new PipelineMetrics(this.observabilityManager.meterRegistry(), tag, metricTags); + PipelineMarkerManager markerManager = new PipelineMarkerManager(tag, metricTags); + PipelineMetrics metrics = new PipelineMetrics(this.observabilityManager.meterRegistry(), markerManager); IO io = new IO<>(tag, input); long start = System.nanoTime(); diff --git a/core/src/main/java/tech/illuin/pipeline/builder/runner_compiler/argument_resolver/mapper_factory/MarkerManagerMapperFactory.java b/core/src/main/java/tech/illuin/pipeline/builder/runner_compiler/argument_resolver/mapper_factory/MarkerManagerMapperFactory.java new file mode 100644 index 00000000..d5345f02 --- /dev/null +++ b/core/src/main/java/tech/illuin/pipeline/builder/runner_compiler/argument_resolver/mapper_factory/MarkerManagerMapperFactory.java @@ -0,0 +1,29 @@ +package tech.illuin.pipeline.builder.runner_compiler.argument_resolver.mapper_factory; + +import tech.illuin.pipeline.metering.MarkerManager; + +import java.lang.annotation.Annotation; +import java.lang.reflect.Parameter; + +/** + * @author Pierre Lecerf (pierre.lecerf@illuin.tech) + */ +public class MarkerManagerMapperFactory implements MethodArgumentMapperFactory +{ + @Override + public boolean canHandle(Annotation category, Class parameterType) + { + return MarkerManager.class.isAssignableFrom(parameterType); + } + + @Override + public MethodArgumentMapper produce(Annotation category, Parameter parameter) + { + Class parameterType = parameter.getType(); + return args -> { + if (!parameterType.isAssignableFrom(args.markerManager().getClass())) + throw new IllegalArgumentException("This component expects the marker generator to be a subtype of " + parameterType.getSimpleName() + " actual manager is of type " + args.markerManager().getClass().getSimpleName()); + return args.markerManager(); + }; + } +} diff --git a/core/src/main/java/tech/illuin/pipeline/builder/runner_compiler/argument_resolver/mapper_factory/ObservabilityManagerMapperFactory.java b/core/src/main/java/tech/illuin/pipeline/builder/runner_compiler/argument_resolver/mapper_factory/ObservabilityManagerMapperFactory.java new file mode 100644 index 00000000..30676124 --- /dev/null +++ b/core/src/main/java/tech/illuin/pipeline/builder/runner_compiler/argument_resolver/mapper_factory/ObservabilityManagerMapperFactory.java @@ -0,0 +1,29 @@ +package tech.illuin.pipeline.builder.runner_compiler.argument_resolver.mapper_factory; + +import tech.illuin.pipeline.metering.manager.ObservabilityManager; + +import java.lang.annotation.Annotation; +import java.lang.reflect.Parameter; + +/** + * @author Pierre Lecerf (pierre.lecerf@illuin.tech) + */ +public class ObservabilityManagerMapperFactory implements MethodArgumentMapperFactory +{ + @Override + public boolean canHandle(Annotation category, Class parameterType) + { + return ObservabilityManager.class.isAssignableFrom(parameterType); + } + + @Override + public MethodArgumentMapper produce(Annotation category, Parameter parameter) + { + Class parameterType = parameter.getType(); + return args -> { + if (!parameterType.isAssignableFrom(args.observabilityManager().getClass())) + throw new IllegalArgumentException("This component expects the observability manager to be a subtype of " + parameterType.getSimpleName() + " actual manager is of type " + args.observabilityManager().getClass().getSimpleName()); + return args.observabilityManager(); + }; + } +} diff --git a/core/src/main/java/tech/illuin/pipeline/builder/runner_compiler/argument_resolver/method_arguments/MethodArguments.java b/core/src/main/java/tech/illuin/pipeline/builder/runner_compiler/argument_resolver/method_arguments/MethodArguments.java index ef56b58d..a4c5e020 100644 --- a/core/src/main/java/tech/illuin/pipeline/builder/runner_compiler/argument_resolver/method_arguments/MethodArguments.java +++ b/core/src/main/java/tech/illuin/pipeline/builder/runner_compiler/argument_resolver/method_arguments/MethodArguments.java @@ -2,6 +2,8 @@ import tech.illuin.pipeline.context.Context; import tech.illuin.pipeline.input.uid_generator.UIDGenerator; +import tech.illuin.pipeline.metering.MarkerManager; +import tech.illuin.pipeline.metering.manager.ObservabilityManager; import tech.illuin.pipeline.output.ComponentTag; import tech.illuin.pipeline.output.Output; import tech.illuin.pipeline.output.PipelineTag; @@ -21,5 +23,7 @@ public record MethodArguments( Context context, PipelineTag pipelineTag, ComponentTag componentTag, - UIDGenerator uidGenerator + UIDGenerator uidGenerator, + ObservabilityManager observabilityManager, + MarkerManager markerManager ) {} diff --git a/core/src/main/java/tech/illuin/pipeline/context/ComponentContext.java b/core/src/main/java/tech/illuin/pipeline/context/ComponentContext.java index 54349577..cd436a5a 100644 --- a/core/src/main/java/tech/illuin/pipeline/context/ComponentContext.java +++ b/core/src/main/java/tech/illuin/pipeline/context/ComponentContext.java @@ -1,6 +1,8 @@ package tech.illuin.pipeline.context; import tech.illuin.pipeline.input.uid_generator.UIDGenerator; +import tech.illuin.pipeline.metering.MarkerManager; +import tech.illuin.pipeline.metering.manager.ObservabilityManager; import tech.illuin.pipeline.output.ComponentTag; import tech.illuin.pipeline.output.Output; import tech.illuin.pipeline.output.PipelineTag; @@ -15,22 +17,25 @@ public class ComponentContext implements LocalContext { private final Context globalContext; private final Object input; - private final PipelineTag pipelineTag; private final ComponentTag componentTag; private final UIDGenerator uidGenerator; + private final ObservabilityManager observabilityManager; + private final MarkerManager markerManager; public ComponentContext( Context globalContext, Object input, - PipelineTag pipelineTag, ComponentTag componentTag, - UIDGenerator uidGenerator + UIDGenerator uidGenerator, + ObservabilityManager observabilityManager, + MarkerManager markerManager ) { this.globalContext = globalContext; this.input = input; - this.pipelineTag = pipelineTag; this.componentTag = componentTag; this.uidGenerator = uidGenerator; + this.observabilityManager = observabilityManager; + this.markerManager = markerManager; } @Override @@ -42,7 +47,7 @@ public Object input() @Override public PipelineTag pipelineTag() { - return this.pipelineTag; + return this.componentTag.pipelineTag(); } @Override @@ -57,6 +62,18 @@ public UIDGenerator uidGenerator() return this.uidGenerator; } + @Override + public ObservabilityManager observabilityManager() + { + return this.observabilityManager; + } + + @Override + public MarkerManager markerManager() + { + return this.markerManager; + } + @Override public Optional parent() { diff --git a/core/src/main/java/tech/illuin/pipeline/context/LocalContext.java b/core/src/main/java/tech/illuin/pipeline/context/LocalContext.java index 45d6b58d..ae3585e0 100644 --- a/core/src/main/java/tech/illuin/pipeline/context/LocalContext.java +++ b/core/src/main/java/tech/illuin/pipeline/context/LocalContext.java @@ -1,6 +1,8 @@ package tech.illuin.pipeline.context; import tech.illuin.pipeline.input.uid_generator.UIDGenerator; +import tech.illuin.pipeline.metering.MarkerManager; +import tech.illuin.pipeline.metering.manager.ObservabilityManager; import tech.illuin.pipeline.output.ComponentTag; import tech.illuin.pipeline.output.PipelineTag; @@ -16,4 +18,8 @@ public interface LocalContext extends Context ComponentTag componentTag(); UIDGenerator uidGenerator(); + + ObservabilityManager observabilityManager(); + + MarkerManager markerManager(); } diff --git a/core/src/main/java/tech/illuin/pipeline/execution/phase/impl/InitializerPhase.java b/core/src/main/java/tech/illuin/pipeline/execution/phase/impl/InitializerPhase.java index 6e6e2984..c70cb6f5 100644 --- a/core/src/main/java/tech/illuin/pipeline/execution/phase/impl/InitializerPhase.java +++ b/core/src/main/java/tech/illuin/pipeline/execution/phase/impl/InitializerPhase.java @@ -6,13 +6,15 @@ import org.slf4j.LoggerFactory; import tech.illuin.pipeline.context.ComponentContext; import tech.illuin.pipeline.context.Context; +import tech.illuin.pipeline.context.LocalContext; import tech.illuin.pipeline.execution.phase.IO; import tech.illuin.pipeline.execution.phase.PipelinePhase; import tech.illuin.pipeline.execution.phase.PipelineStrategy; import tech.illuin.pipeline.input.indexer.Indexer; import tech.illuin.pipeline.input.initializer.builder.InitializerDescriptor; +import tech.illuin.pipeline.input.initializer.metering.InitializationMarkerManager; +import tech.illuin.pipeline.input.initializer.metering.InitializationMetrics; import tech.illuin.pipeline.input.uid_generator.UIDGenerator; -import tech.illuin.pipeline.metering.PipelineInitializationMetrics; import tech.illuin.pipeline.metering.manager.ObservabilityManager; import tech.illuin.pipeline.metering.tag.MetricTags; import tech.illuin.pipeline.metering.tag.TagResolver; @@ -60,7 +62,9 @@ public PipelineStrategy run(IO io, Context context, MetricTags tags) throws E ComponentTag tag = this.createTag(io.tag(), this.initializer); MetricTags metricTags = this.tagResolver.resolve(io.input(), context); - PipelineInitializationMetrics metrics = new PipelineInitializationMetrics(this.observabilityManager.meterRegistry(), tag, metricTags); + InitializationMarkerManager markerManager = new InitializationMarkerManager(tag, metricTags); + InitializationMetrics metrics = new InitializationMetrics(this.observabilityManager.meterRegistry(), markerManager); + LocalContext localContext = new ComponentContext(context, io.input(), tag, this.uidGenerator, this.observabilityManager, markerManager); long start = System.nanoTime(); metrics.setMDC(); @@ -69,8 +73,8 @@ public PipelineStrategy run(IO io, Context context, MetricTags tags) throws E { span.tag("uid", tag.pipelineTag().uid()); - Object payload = this.runInitializer(io.input(), tag, context, metrics); - Output output = this.runOutputFactory(io.input(), io.tag(), payload, context); + Object payload = this.runInitializer(io.input(), tag, localContext, metrics); + Output output = this.runOutputFactory(io.input(), io.tag(), payload, localContext); this.runIndexers(io.tag(), payload, output); metrics.successCounter().increment(); @@ -94,9 +98,8 @@ public PipelineStrategy run(IO io, Context context, MetricTags tags) throws E } @SuppressWarnings("IllegalCatch") - private Object runInitializer(I input, ComponentTag tag, Context context, PipelineInitializationMetrics metrics) throws Exception + private Object runInitializer(I input, ComponentTag tag, LocalContext context, InitializationMetrics metrics) throws Exception { - ComponentContext componentContext = wrapContext(input, context, tag.pipelineTag(), tag); Span span = this.observabilityManager.tracer().nextSpan().name(tag.id()); try (Tracer.SpanInScope scope = this.observabilityManager.tracer().withSpan(span.start())) { @@ -104,7 +107,7 @@ private Object runInitializer(I input, ComponentTag tag, Context context, Pipeli span.event("initializer:run"); logger.trace("{}#{} initializing payload", tag.pipelineTag().pipeline(), tag.pipelineTag().uid()); - Object payload = this.initializer.execute(input, componentContext, this.uidGenerator); + Object payload = this.initializer.execute(input, context, this.uidGenerator); span.tag("payload_type", payload == null ? "null" : payload.getClass().getName()); return payload; @@ -113,14 +116,14 @@ private Object runInitializer(I input, ComponentTag tag, Context context, Pipeli metrics.setMDC(e); span.event("initializer:error"); logger.error("{}#{} initializer {} threw an {}: {}", tag.pipelineTag().pipeline(), tag.pipelineTag().uid(), tag.id(), e.getClass().getName(), e.getMessage()); - return this.initializer.handleException(e, componentContext, this.uidGenerator); + return this.initializer.handleException(e, context, this.uidGenerator); } finally { span.end(); } } - private Output runOutputFactory(I input, PipelineTag tag, Object payload, Context context) + private Output runOutputFactory(I input, PipelineTag tag, Object payload, LocalContext context) { Span span = this.observabilityManager.tracer().nextSpan().name("output_factory"); try (Tracer.SpanInScope scope = this.observabilityManager.tracer().withSpan(span.start())) @@ -159,9 +162,4 @@ private ComponentTag createTag(PipelineTag pipelineTag, InitializerDescriptor { return new ComponentTag(this.uidGenerator.generate(), pipelineTag, initializer.id(), ComponentFamily.INITIALIZER); } - - private ComponentContext wrapContext(I input, Context context, PipelineTag pipelineTag, ComponentTag componentTag) - { - return new ComponentContext(context, input, pipelineTag, componentTag, this.uidGenerator); - } } diff --git a/core/src/main/java/tech/illuin/pipeline/execution/phase/impl/SinkPhase.java b/core/src/main/java/tech/illuin/pipeline/execution/phase/impl/SinkPhase.java index 167ccfc0..8256d9d7 100644 --- a/core/src/main/java/tech/illuin/pipeline/execution/phase/impl/SinkPhase.java +++ b/core/src/main/java/tech/illuin/pipeline/execution/phase/impl/SinkPhase.java @@ -7,17 +7,19 @@ import org.slf4j.MDC; import tech.illuin.pipeline.context.ComponentContext; import tech.illuin.pipeline.context.Context; +import tech.illuin.pipeline.context.LocalContext; import tech.illuin.pipeline.execution.phase.IO; import tech.illuin.pipeline.execution.phase.PipelinePhase; import tech.illuin.pipeline.execution.phase.PipelineStrategy; import tech.illuin.pipeline.input.uid_generator.UIDGenerator; -import tech.illuin.pipeline.metering.PipelineSinkMetrics; import tech.illuin.pipeline.metering.manager.ObservabilityManager; import tech.illuin.pipeline.metering.tag.MetricTags; import tech.illuin.pipeline.output.ComponentFamily; import tech.illuin.pipeline.output.ComponentTag; import tech.illuin.pipeline.output.PipelineTag; import tech.illuin.pipeline.sink.builder.SinkDescriptor; +import tech.illuin.pipeline.sink.metering.SinkMarkerManager; +import tech.illuin.pipeline.sink.metering.SinkMetrics; import java.util.List; import java.util.Map; @@ -66,12 +68,14 @@ public PipelineStrategy run(IO io, Context context, MetricTags metricTags) th for (SinkDescriptor descriptor : this.sinks) { ComponentTag tag = this.createTag(io.output().tag(), descriptor); - PipelineSinkMetrics metrics = new PipelineSinkMetrics(this.observabilityManager.meterRegistry(), tag, metricTags); + SinkMarkerManager markerManager = new SinkMarkerManager(tag, metricTags); + SinkMetrics metrics = new SinkMetrics(this.observabilityManager.meterRegistry(), markerManager); + LocalContext localContext = new ComponentContext(context, io.input(), tag, this.uidGenerator, this.observabilityManager, markerManager); if (descriptor.isAsync()) - this.runSinkAsynchronously(descriptor, tag, io, context, metrics, span); + this.runSinkAsynchronously(descriptor, tag, io, localContext, metrics, span); else - this.runSinkSynchronously(descriptor, tag, io, context, metrics); + this.runSinkSynchronously(descriptor, tag, io, localContext, metrics); } return PipelineStrategy.CONTINUE; @@ -82,10 +86,8 @@ public PipelineStrategy run(IO io, Context context, MetricTags metricTags) th } @SuppressWarnings("IllegalCatch") - private void runSinkSynchronously(SinkDescriptor sink, ComponentTag tag, IO io, Context context, PipelineSinkMetrics metrics) throws Exception + private void runSinkSynchronously(SinkDescriptor sink, ComponentTag tag, IO io, LocalContext context, SinkMetrics metrics) throws Exception { - ComponentContext componentContext = wrapContext(io.input(), context, tag.pipelineTag(), tag); - long start = System.nanoTime(); metrics.setMDC(); Span span = this.observabilityManager.tracer().nextSpan().name(tag.id()); @@ -95,7 +97,7 @@ private void runSinkSynchronously(SinkDescriptor sink, ComponentTag tag, IO i span.event("sink:run_sync"); logger.trace("{}#{} launching sink {}", tag.pipelineTag().pipeline(), tag.pipelineTag().uid(), tag.id()); - sink.execute(io.output(), componentContext); + sink.execute(io.output(), context); metrics.successCounter().increment(); } catch (Exception e) { @@ -115,13 +117,11 @@ private void runSinkSynchronously(SinkDescriptor sink, ComponentTag tag, IO i } @SuppressWarnings("IllegalCatch") - private void runSinkAsynchronously(SinkDescriptor sink, ComponentTag tag, IO io, Context context, PipelineSinkMetrics metrics, Span phaseSpan) + private void runSinkAsynchronously(SinkDescriptor sink, ComponentTag tag, IO io, LocalContext context, SinkMetrics metrics, Span phaseSpan) { if (this.sinkExecutor == null) throw new IllegalStateException("An asynchronous run has been initiated but there is no active executor"); - ComponentContext componentContext = wrapContext(io.input(), context, tag.pipelineTag(), tag); - logger.trace("{}#{} queuing sink {}", tag.pipelineTag().pipeline(), tag.pipelineTag().uid(), tag.id()); Map mdc = MDC.getCopyOfContextMap(); CompletableFuture.runAsync(() -> { @@ -135,7 +135,7 @@ private void runSinkAsynchronously(SinkDescriptor sink, ComponentTag tag, IO span.event("sink:run_async"); logger.trace("{}#{} launching sink {}", tag.pipelineTag().pipeline(), tag.pipelineTag().uid(), tag.id()); - sink.execute(io.output(), componentContext); + sink.execute(io.output(), context); metrics.successCounter().increment(); } catch (Exception e) { @@ -144,7 +144,7 @@ private void runSinkAsynchronously(SinkDescriptor sink, ComponentTag tag, IO logger.error("{}#{} sink {} threw an {}: {}", tag.pipelineTag().pipeline(), tag.pipelineTag().uid(), tag.id(), e.getClass().getName(), e.getMessage()); metrics.failureCounter().increment(); metrics.errorCounter(e).increment(); - sink.handleExceptionThenSwallow(e, io.output(), componentContext); + sink.handleExceptionThenSwallow(e, io.output(), context); } finally { metrics.runTimer().record(System.nanoTime() - start, TimeUnit.NANOSECONDS); @@ -160,11 +160,6 @@ private ComponentTag createTag(PipelineTag pipelineTag, SinkDescriptor sink) return new ComponentTag(this.uidGenerator.generate(), pipelineTag, sink.id(), ComponentFamily.SINK); } - private ComponentContext wrapContext(I input, Context context, PipelineTag pipelineTag, ComponentTag componentTag) - { - return new ComponentContext(context, input, pipelineTag, componentTag, this.uidGenerator); - } - private ExecutorService initExecutor(Supplier provider) { if (this.sinks.stream().anyMatch(SinkDescriptor::isAsync)) diff --git a/core/src/main/java/tech/illuin/pipeline/execution/phase/impl/StepPhase.java b/core/src/main/java/tech/illuin/pipeline/execution/phase/impl/StepPhase.java index c38a3e58..caa0aa93 100644 --- a/core/src/main/java/tech/illuin/pipeline/execution/phase/impl/StepPhase.java +++ b/core/src/main/java/tech/illuin/pipeline/execution/phase/impl/StepPhase.java @@ -7,12 +7,12 @@ import tech.illuin.pipeline.PipelineResult; import tech.illuin.pipeline.context.ComponentContext; import tech.illuin.pipeline.context.Context; +import tech.illuin.pipeline.context.LocalContext; import tech.illuin.pipeline.execution.phase.IO; import tech.illuin.pipeline.execution.phase.PipelinePhase; import tech.illuin.pipeline.execution.phase.PipelineStrategy; import tech.illuin.pipeline.input.indexer.Indexable; import tech.illuin.pipeline.input.uid_generator.UIDGenerator; -import tech.illuin.pipeline.metering.PipelineStepMetrics; import tech.illuin.pipeline.metering.manager.ObservabilityManager; import tech.illuin.pipeline.metering.tag.MetricTags; import tech.illuin.pipeline.output.ComponentFamily; @@ -20,6 +20,8 @@ import tech.illuin.pipeline.output.PipelineTag; import tech.illuin.pipeline.step.builder.StepDescriptor; import tech.illuin.pipeline.step.execution.evaluator.StepStrategy; +import tech.illuin.pipeline.step.metering.StepMarkerManager; +import tech.illuin.pipeline.step.metering.StepMetrics; import tech.illuin.pipeline.step.result.MultiResult; import tech.illuin.pipeline.step.result.Result; import tech.illuin.pipeline.step.result.ResultDescriptor; @@ -60,7 +62,8 @@ public PipelineStrategy run(IO io, Context context, MetricTags metricTags) th STEP_LOOP: for (StepDescriptor step : this.steps) { ComponentTag tag = this.createTag(io.output().tag(), step); - PipelineStepMetrics metrics = new PipelineStepMetrics(this.observabilityManager.meterRegistry(), tag, metricTags); + StepMarkerManager markerManager = new StepMarkerManager(tag, metricTags); + StepMetrics metrics = new StepMetrics(this.observabilityManager.meterRegistry(), markerManager); span.event("step_phase:select_step:" + tag.id()); /* Arguments are a list of Indexable which satisfy the step's execution predicate */ @@ -73,14 +76,15 @@ public PipelineStrategy run(IO io, Context context, MetricTags metricTags) th /* For each argument we perform the step and register the produced Result */ for (Indexable indexed : arguments) { - Result result = this.runStep(step, tag, indexed, io, context, metrics); + LocalContext localContext = new ComponentContext(context, io.input(), tag, this.uidGenerator, this.observabilityManager, markerManager); + Result result = this.runStep(step, tag, indexed, io, localContext, metrics); if (result instanceof MultiResult multi) multi.results().forEach(r -> metrics.resultCounter(r).increment()); else metrics.resultCounter(result).increment(); - StepStrategy strategy = step.postEvaluation(result, indexed, io.input(), context); + StepStrategy strategy = step.postEvaluation(result, indexed, io.input(), localContext); logger.trace("{}#{} received {} signal after step {} over argument {}", tag.pipelineTag().pipeline(), tag.pipelineTag().uid(), strategy, tag.id(), indexed.uid()); span.event("step_phase:evaluate_strategy:" + strategy.name()); @@ -124,9 +128,8 @@ else if (result instanceof MultiResult mResult) } @SuppressWarnings("IllegalCatch") - private Result runStep(StepDescriptor step, ComponentTag tag, Indexable indexed, IO io, Context context, PipelineStepMetrics metrics) throws Exception + private Result runStep(StepDescriptor step, ComponentTag tag, Indexable indexed, IO io, LocalContext context, StepMetrics metrics) throws Exception { - ComponentContext componentContext = wrapContext(io.input(), context, tag.pipelineTag(), tag); String name = getPrintableName(step); long start = System.nanoTime(); @@ -138,7 +141,7 @@ private Result runStep(StepDescriptor step, ComponentTag tag, Inde span.event("step:run"); logger.trace("{}#{} running step {} over argument {}", tag.pipelineTag().pipeline(), tag.pipelineTag().uid(), name, indexed.uid()); - Result result = step.execute(indexed, io.input(), io.output(), componentContext); + Result result = step.execute(indexed, io.input(), io.output(), context); span.tag("result_type", result.getClass().getName()); metrics.successCounter().increment(); @@ -150,7 +153,7 @@ private Result runStep(StepDescriptor step, ComponentTag tag, Inde logger.error("{}#{} step {} threw an {}: {}", tag.pipelineTag().pipeline(), tag.pipelineTag().uid(), name, e.getClass().getName(), e.getMessage()); metrics.failureCounter().increment(); metrics.errorCounter(e).increment(); - return step.handleException(e, io.input(), io.output().payload(), io.output().results(), componentContext); + return step.handleException(e, io.input(), io.output().payload(), io.output().results(), context); } finally { metrics.runTimer().record(System.nanoTime() - start, TimeUnit.NANOSECONDS); @@ -165,11 +168,6 @@ private ComponentTag createTag(PipelineTag pipelineTag, StepDescriptor ste return new ComponentTag(this.uidGenerator.generate(), pipelineTag, step.id(), ComponentFamily.STEP); } - private ComponentContext wrapContext(I input, Context context, PipelineTag pipelineTag, ComponentTag componentTag) - { - return new ComponentContext(context, input, pipelineTag, componentTag, this.uidGenerator); - } - private static String getPrintableName(StepDescriptor step) { return step.id() + (step.isPinned() ? " (pinned)" : ""); diff --git a/core/src/main/java/tech/illuin/pipeline/input/initializer/Initializer.java b/core/src/main/java/tech/illuin/pipeline/input/initializer/Initializer.java index 0ed2c316..733ef161 100644 --- a/core/src/main/java/tech/illuin/pipeline/input/initializer/Initializer.java +++ b/core/src/main/java/tech/illuin/pipeline/input/initializer/Initializer.java @@ -1,7 +1,7 @@ package tech.illuin.pipeline.input.initializer; import tech.illuin.pipeline.commons.Reflection; -import tech.illuin.pipeline.context.Context; +import tech.illuin.pipeline.context.LocalContext; import tech.illuin.pipeline.input.initializer.runner.InitializerRunner; import tech.illuin.pipeline.input.uid_generator.UIDGenerator; import tech.illuin.pipeline.output.Output; @@ -14,9 +14,9 @@ @FunctionalInterface public interface Initializer { - Object initialize(I input, Context context, UIDGenerator generator) throws Exception; + Object initialize(I input, LocalContext context, UIDGenerator generator) throws Exception; - static

Object initializeFromParentOr(Context context, Supplier

or) + static

Object initializeFromParentOr(LocalContext context, Supplier

or) { return context.parent().map(Output::payload).orElseGet(or); } diff --git a/core/src/main/java/tech/illuin/pipeline/input/initializer/builder/InitializerDescriptor.java b/core/src/main/java/tech/illuin/pipeline/input/initializer/builder/InitializerDescriptor.java index 1e157f13..928146e2 100644 --- a/core/src/main/java/tech/illuin/pipeline/input/initializer/builder/InitializerDescriptor.java +++ b/core/src/main/java/tech/illuin/pipeline/input/initializer/builder/InitializerDescriptor.java @@ -1,6 +1,6 @@ package tech.illuin.pipeline.input.initializer.builder; -import tech.illuin.pipeline.context.Context; +import tech.illuin.pipeline.context.LocalContext; import tech.illuin.pipeline.input.initializer.Initializer; import tech.illuin.pipeline.input.initializer.execution.error.InitializerErrorHandler; import tech.illuin.pipeline.input.uid_generator.UIDGenerator; @@ -24,12 +24,12 @@ public final class InitializerDescriptor this.errorHandler = errorHandler; } - public Object execute(I input, Context ctx, UIDGenerator generator) throws Exception + public Object execute(I input, LocalContext ctx, UIDGenerator generator) throws Exception { return this.initializer.initialize(input, ctx, generator); } - public Object handleException(Exception ex, Context ctx, UIDGenerator generator) throws Exception + public Object handleException(Exception ex, LocalContext ctx, UIDGenerator generator) throws Exception { return this.errorHandler.handle(ex, ctx, generator); } diff --git a/core/src/main/java/tech/illuin/pipeline/input/initializer/builder/InitializerMethodArgumentResolver.java b/core/src/main/java/tech/illuin/pipeline/input/initializer/builder/InitializerMethodArgumentResolver.java index 1dc89f8f..c4e338fd 100644 --- a/core/src/main/java/tech/illuin/pipeline/input/initializer/builder/InitializerMethodArgumentResolver.java +++ b/core/src/main/java/tech/illuin/pipeline/input/initializer/builder/InitializerMethodArgumentResolver.java @@ -30,7 +30,9 @@ class InitializerMethodArgumentResolver implements MethodArgumentResolver< new ContextKeyMapperFactory<>(), new UIDGeneratorMapperFactory<>(), new PipelineTagMapperFactory<>(), - new ComponentTagMapperFactory<>() + new ComponentTagMapperFactory<>(), + new ObservabilityManagerMapperFactory<>(), + new MarkerManagerMapperFactory<>() ); } diff --git a/core/src/main/java/tech/illuin/pipeline/input/initializer/execution/error/InitializerErrorHandler.java b/core/src/main/java/tech/illuin/pipeline/input/initializer/execution/error/InitializerErrorHandler.java index 40c2794d..894f34f3 100644 --- a/core/src/main/java/tech/illuin/pipeline/input/initializer/execution/error/InitializerErrorHandler.java +++ b/core/src/main/java/tech/illuin/pipeline/input/initializer/execution/error/InitializerErrorHandler.java @@ -1,9 +1,8 @@ package tech.illuin.pipeline.input.initializer.execution.error; -import tech.illuin.pipeline.context.Context; +import tech.illuin.pipeline.context.LocalContext; import tech.illuin.pipeline.input.indexer.Indexable; import tech.illuin.pipeline.input.uid_generator.UIDGenerator; -import tech.illuin.pipeline.sink.execution.error.SinkErrorHandler; import java.util.Arrays; import java.util.Collection; @@ -16,7 +15,7 @@ @FunctionalInterface public interface InitializerErrorHandler { - Indexable handle(Exception exception, Context context, UIDGenerator generator) throws Exception; + Indexable handle(Exception exception, LocalContext context, UIDGenerator generator) throws Exception; InitializerErrorHandler RETHROW_ALL = (ex, ctx, gen) -> { throw ex; diff --git a/core/src/main/java/tech/illuin/pipeline/input/initializer/metering/InitializationMarkerManager.java b/core/src/main/java/tech/illuin/pipeline/input/initializer/metering/InitializationMarkerManager.java new file mode 100644 index 00000000..cbf28525 --- /dev/null +++ b/core/src/main/java/tech/illuin/pipeline/input/initializer/metering/InitializationMarkerManager.java @@ -0,0 +1,44 @@ +package tech.illuin.pipeline.input.initializer.metering; + +import io.micrometer.core.instrument.Tag; +import tech.illuin.pipeline.metering.MarkerManager; +import tech.illuin.pipeline.metering.tag.MetricTags; +import tech.illuin.pipeline.output.ComponentTag; + +import java.util.Map; +import java.util.Set; + +import static tech.illuin.pipeline.metering.MetricFunctions.combine; + +public class InitializationMarkerManager implements MarkerManager +{ + private final ComponentTag tag; + private final MetricTags metricTags; + + public InitializationMarkerManager(ComponentTag tag, MetricTags metricTags) + { + this.tag = tag; + this.metricTags = metricTags; + } + + @Override + public Set discriminants() + { + return Set.of( + Tag.of("pipeline", this.tag.pipelineTag().pipeline()), + Tag.of("initializer", this.tag.id()) + ); + } + + @Override + public Set tags() + { + return combine(this.discriminants(), this.metricTags.asTags()); + } + + @Override + public Map markers() + { + return Map.of("initializer", this.tag.id()); + } +} diff --git a/core/src/main/java/tech/illuin/pipeline/input/initializer/metering/InitializationMetrics.java b/core/src/main/java/tech/illuin/pipeline/input/initializer/metering/InitializationMetrics.java new file mode 100644 index 00000000..bd0bef4c --- /dev/null +++ b/core/src/main/java/tech/illuin/pipeline/input/initializer/metering/InitializationMetrics.java @@ -0,0 +1,52 @@ +package tech.illuin.pipeline.input.initializer.metering; + +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Tag; +import tech.illuin.pipeline.metering.BaseMetrics; +import tech.illuin.pipeline.metering.MarkerManager; +import tech.illuin.pipeline.metering.mdc.MDCManager; + +import java.util.Collection; + +import static tech.illuin.pipeline.metering.MeterRegistryKey.*; + +/** + * @author Pierre Lecerf (pierre.lecerf@illuin.tech) + */ +public class InitializationMetrics extends BaseMetrics +{ + public InitializationMetrics(MeterRegistry meterRegistry, InitializationMarkerManager markerManager) + { + super(meterRegistry, markerManager); + } + + public InitializationMetrics(MeterRegistry meterRegistry, InitializationMarkerManager markerManager, MDCManager mdc) + { + super(meterRegistry, markerManager, mdc); + } + + @Override + protected ConstantMeters initializeConstantMeters(MeterRegistry meterRegistry, MarkerManager markerManager) + { + Collection meterTags = this.markerManager.tags(); + return new ConstantMeters( + meterRegistry.timer(PIPELINE_INITIALIZATION_RUN_KEY.id(), fill(PIPELINE_INITIALIZATION_RUN_KEY, meterTags)), + meterRegistry.counter(PIPELINE_INITIALIZATION_RUN_TOTAL_KEY.id(), fill(PIPELINE_INITIALIZATION_RUN_TOTAL_KEY, meterTags)), + meterRegistry.counter(PIPELINE_INITIALIZATION_RUN_SUCCESS_KEY.id(), fill(PIPELINE_INITIALIZATION_RUN_SUCCESS_KEY, meterTags)), + meterRegistry.counter(PIPELINE_INITIALIZATION_RUN_FAILURE_KEY.id(), fill(PIPELINE_INITIALIZATION_RUN_FAILURE_KEY, meterTags)) + ); + } + + @Override + public Counter errorCounter(Exception exception) + { + return this.meterRegistry.counter( + PIPELINE_INITIALIZATION_ERROR_TOTAL_KEY.id(), + fill( + PIPELINE_INITIALIZATION_ERROR_TOTAL_KEY, + this.markerManager.tags(Tag.of("error", exception.getClass().getName())) + ) + ); + } +} diff --git a/core/src/main/java/tech/illuin/pipeline/input/initializer/runner/InitializerRunner.java b/core/src/main/java/tech/illuin/pipeline/input/initializer/runner/InitializerRunner.java index 18cd90c4..37afb033 100644 --- a/core/src/main/java/tech/illuin/pipeline/input/initializer/runner/InitializerRunner.java +++ b/core/src/main/java/tech/illuin/pipeline/input/initializer/runner/InitializerRunner.java @@ -6,7 +6,6 @@ import tech.illuin.pipeline.builder.runner_compiler.argument_resolver.mapper_factory.MethodArgumentMapper; import tech.illuin.pipeline.builder.runner_compiler.argument_resolver.method_arguments.MethodArguments; import tech.illuin.pipeline.commons.Reflection; -import tech.illuin.pipeline.context.Context; import tech.illuin.pipeline.context.LocalContext; import tech.illuin.pipeline.input.initializer.Initializer; import tech.illuin.pipeline.input.initializer.annotation.InitializerConfig; @@ -37,12 +36,9 @@ public InitializerRunner(Object target) } @Override - public Object initialize(I input, Context context, UIDGenerator generator) throws Exception + public Object initialize(I input, LocalContext context, UIDGenerator generator) throws Exception { - if (!(context instanceof LocalContext localContext)) - throw new IllegalArgumentException("Invalid context provided to an InitializerRunner instance"); - - logger.trace("{}#{} launching initializer over target {}#{}", localContext.pipelineTag().pipeline(), localContext.pipelineTag().uid(), this.target.getClass().getName(), Reflection.getMethodSignature(this.method)); + logger.trace("{}#{} launching initializer over target {}#{}", context.pipelineTag().pipeline(), context.pipelineTag().uid(), this.target.getClass().getName(), Reflection.getMethodSignature(this.method)); try { MethodArguments originalArguments = new MethodArguments<>( @@ -52,10 +48,12 @@ public Object initialize(I input, Context context, UIDGenerator generator) throw null, null, null, - localContext, - localContext.pipelineTag(), - localContext.componentTag(), - generator + context, + context.pipelineTag(), + context.componentTag(), + generator, + context.observabilityManager(), + context.markerManager() ); Object[] arguments = this.argumentMappers.stream() .map(mapper -> mapper.map(originalArguments)) @@ -69,13 +67,13 @@ public Object initialize(I input, Context context, UIDGenerator generator) throw throw (Exception) e.getTargetException(); throw new StepRunnerException( - "The target method " + this.method.getName() + " of initializer runner " + localContext.pipelineTag().pipeline() + "#" + localContext.componentTag().id() + "The target method " + this.method.getName() + " of initializer runner " + context.pipelineTag().pipeline() + "#" + context.componentTag().id() + " has thrown an unexpected exception of type " + e.getTargetException().getClass().getName(), e.getTargetException() ); } catch (IllegalAccessException e) { throw new StepRunnerException( - "The target method " + this.method.getName() + " of initializer runner " + localContext.pipelineTag().pipeline() + "#" + localContext.componentTag().id() + "The target method " + this.method.getName() + " of initializer runner " + context.pipelineTag().pipeline() + "#" + context.componentTag().id() + " unexpectedly has illegal access", e ); } diff --git a/core/src/main/java/tech/illuin/pipeline/metering/BaseMetrics.java b/core/src/main/java/tech/illuin/pipeline/metering/BaseMetrics.java new file mode 100644 index 00000000..6873938d --- /dev/null +++ b/core/src/main/java/tech/illuin/pipeline/metering/BaseMetrics.java @@ -0,0 +1,81 @@ +package tech.illuin.pipeline.metering; + +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Timer; +import tech.illuin.pipeline.metering.mdc.DefaultMDCManager; +import tech.illuin.pipeline.metering.mdc.MDCManager; + +import java.util.stream.Stream; + +public abstract class BaseMetrics implements Metrics +{ + protected final MeterRegistry meterRegistry; + protected final MarkerManager markerManager; + protected final MDCManager mdc; + private final ConstantMeters constantMeters; + + public BaseMetrics(MeterRegistry meterRegistry, MarkerManager markerManager) + { + this(meterRegistry, markerManager, new DefaultMDCManager()); + } + + public BaseMetrics(MeterRegistry meterRegistry, MarkerManager markerManager, MDCManager mdc) + { + this.meterRegistry = meterRegistry; + this.markerManager = markerManager; + this.mdc = mdc; + this.constantMeters = this.initializeConstantMeters(meterRegistry, markerManager); + } + + public final void setMDC() + { + this.markerManager.markers().forEach(this.mdc::put); + } + + public final void setMDC(Exception exception) + { + this.mdc.put("error", exception.getClass().getName()); + } + + public final void unsetMDC() + { + Stream.concat( + this.markerManager.markers().keySet().stream(), + Stream.of("error") + ).forEach(this.mdc::remove); + } + + protected abstract ConstantMeters initializeConstantMeters(MeterRegistry meterRegistry, MarkerManager markerManager); + + @Override + public Timer runTimer() + { + return this.constantMeters.runTimer(); + } + + @Override + public Counter totalCounter() + { + return this.constantMeters.totalCounter(); + } + + @Override + public Counter successCounter() + { + return this.constantMeters.successCounter(); + } + + @Override + public Counter failureCounter() + { + return this.constantMeters.failureCounter(); + } + + public record ConstantMeters( + Timer runTimer, + Counter totalCounter, + Counter successCounter, + Counter failureCounter + ) {} +} diff --git a/core/src/main/java/tech/illuin/pipeline/metering/MarkerManager.java b/core/src/main/java/tech/illuin/pipeline/metering/MarkerManager.java new file mode 100644 index 00000000..1fb18b1d --- /dev/null +++ b/core/src/main/java/tech/illuin/pipeline/metering/MarkerManager.java @@ -0,0 +1,25 @@ +package tech.illuin.pipeline.metering; + +import io.micrometer.core.instrument.Tag; + +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public interface MarkerManager +{ + Set discriminants(); + + Set tags(); + + default Set tags(Tag... tags) + { + return Stream.concat( + this.tags().stream(), + Stream.of(tags) + ).collect(Collectors.toSet()); + } + + Map markers(); +} diff --git a/core/src/main/java/tech/illuin/pipeline/metering/MeterRegistryKey.java b/core/src/main/java/tech/illuin/pipeline/metering/MeterRegistryKey.java index a42714cb..340599a6 100644 --- a/core/src/main/java/tech/illuin/pipeline/metering/MeterRegistryKey.java +++ b/core/src/main/java/tech/illuin/pipeline/metering/MeterRegistryKey.java @@ -63,7 +63,7 @@ public String id() * This circumvents an issue with the prometheus client as described here: *

* * @param key the metric key to which tags will be applied diff --git a/core/src/main/java/tech/illuin/pipeline/metering/MetricFunctions.java b/core/src/main/java/tech/illuin/pipeline/metering/MetricFunctions.java index d9d82b5a..9c51b203 100644 --- a/core/src/main/java/tech/illuin/pipeline/metering/MetricFunctions.java +++ b/core/src/main/java/tech/illuin/pipeline/metering/MetricFunctions.java @@ -20,9 +20,9 @@ private MetricFunctions() {} * @param tags the generated tags * @return the resulting tag collection */ - public static Collection combine(Collection mainstayTags, Collection tags) + public static Set combine(Collection mainstayTags, Collection tags) { - List combined = new ArrayList<>(mainstayTags); + Set combined = new HashSet<>(mainstayTags); Set reservedKeys = mainstayTags.stream().map(Tag::getKey).collect(Collectors.toSet()); for (Tag tag : tags) { diff --git a/core/src/main/java/tech/illuin/pipeline/metering/Metrics.java b/core/src/main/java/tech/illuin/pipeline/metering/Metrics.java new file mode 100644 index 00000000..a85dff92 --- /dev/null +++ b/core/src/main/java/tech/illuin/pipeline/metering/Metrics.java @@ -0,0 +1,17 @@ +package tech.illuin.pipeline.metering; + +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.Timer; + +public interface Metrics +{ + Timer runTimer(); + + Counter totalCounter(); + + Counter successCounter(); + + Counter failureCounter(); + + Counter errorCounter(Exception exception); +} diff --git a/core/src/main/java/tech/illuin/pipeline/metering/PipelineInitializationMetrics.java b/core/src/main/java/tech/illuin/pipeline/metering/PipelineInitializationMetrics.java deleted file mode 100644 index 73fd9052..00000000 --- a/core/src/main/java/tech/illuin/pipeline/metering/PipelineInitializationMetrics.java +++ /dev/null @@ -1,99 +0,0 @@ -package tech.illuin.pipeline.metering; - -import io.micrometer.core.instrument.Counter; -import io.micrometer.core.instrument.MeterRegistry; -import io.micrometer.core.instrument.Tag; -import io.micrometer.core.instrument.Timer; -import tech.illuin.pipeline.metering.marker.DefaultMDCManager; -import tech.illuin.pipeline.metering.marker.MDCManager; -import tech.illuin.pipeline.metering.marker.MDCMarker; -import tech.illuin.pipeline.metering.tag.MetricTags; -import tech.illuin.pipeline.output.ComponentTag; - -import java.util.Collection; -import java.util.Map; - -import static tech.illuin.pipeline.metering.MeterRegistryKey.*; -import static tech.illuin.pipeline.metering.MetricFunctions.compileAndFillTags; -import static tech.illuin.pipeline.metering.MetricFunctions.compileTags; - -/** - * @author Pierre Lecerf (pierre.lecerf@illuin.tech) - */ -public class PipelineInitializationMetrics extends MDCMarker -{ - private final MeterRegistry meterRegistry; - private final ComponentTag tag; - private final MetricTags metricTags; - private final Timer runTimer; - private final Counter totalCounter; - private final Counter successCounter; - private final Counter failureCounter; - - public PipelineInitializationMetrics(MeterRegistry meterRegistry, ComponentTag tag, MetricTags metricTags) - { - this(meterRegistry, tag, metricTags, new DefaultMDCManager()); - } - - public PipelineInitializationMetrics(MeterRegistry meterRegistry, ComponentTag tag, MetricTags metricTags, MDCManager mdc) - { - super(mdc); - this.meterRegistry = meterRegistry; - this.tag = tag; - this.metricTags = metricTags; - Tag[] discriminants = computeDiscriminants(this.tag.pipelineTag().pipeline(), this.tag.id()); - Collection meterTags = compileTags(this.metricTags, discriminants); - this.runTimer = meterRegistry.timer(PIPELINE_INITIALIZATION_RUN_KEY.id(), fill(PIPELINE_INITIALIZATION_RUN_KEY, meterTags)); - this.totalCounter = meterRegistry.counter(PIPELINE_INITIALIZATION_RUN_TOTAL_KEY.id(), fill(PIPELINE_INITIALIZATION_RUN_TOTAL_KEY, meterTags)); - this.successCounter = meterRegistry.counter(PIPELINE_INITIALIZATION_RUN_SUCCESS_KEY.id(), fill(PIPELINE_INITIALIZATION_RUN_SUCCESS_KEY, meterTags)); - this.failureCounter = meterRegistry.counter(PIPELINE_INITIALIZATION_RUN_FAILURE_KEY.id(), fill(PIPELINE_INITIALIZATION_RUN_FAILURE_KEY, meterTags)); - } - - public Timer runTimer() - { - return this.runTimer; - } - - public Counter totalCounter() - { - return this.totalCounter; - } - - public Counter successCounter() - { - return this.successCounter; - } - - public Counter failureCounter() - { - return this.failureCounter; - } - - public Counter errorCounter(Exception exception) - { - return this.meterRegistry.counter( - PIPELINE_INITIALIZATION_ERROR_TOTAL_KEY.id(), - compileAndFillTags( - this.metricTags, - PIPELINE_INITIALIZATION_ERROR_TOTAL_KEY, - Tag.of("pipeline", this.tag.pipelineTag().pipeline()), - Tag.of("initializer", this.tag.id()), - Tag.of("error", exception.getClass().getName()) - ) - ); - } - - public static Tag[] computeDiscriminants(String pipeline, String identifier) - { - return new Tag[]{ - Tag.of("pipeline", pipeline), - Tag.of("initializer", identifier), - }; - } - - @Override - public Map compileMarkers() - { - return Map.of("initializer", this.tag.id()); - } -} diff --git a/core/src/main/java/tech/illuin/pipeline/metering/PipelineMarkerManager.java b/core/src/main/java/tech/illuin/pipeline/metering/PipelineMarkerManager.java new file mode 100644 index 00000000..fd30b89f --- /dev/null +++ b/core/src/main/java/tech/illuin/pipeline/metering/PipelineMarkerManager.java @@ -0,0 +1,52 @@ +package tech.illuin.pipeline.metering; + +import io.micrometer.core.instrument.Tag; +import tech.illuin.pipeline.metering.tag.MetricTags; +import tech.illuin.pipeline.output.PipelineTag; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import static java.util.Collections.emptyMap; +import static tech.illuin.pipeline.metering.MetricFunctions.combine; + +public class PipelineMarkerManager implements MarkerManager +{ + private final PipelineTag tag; + private final MetricTags metricTags; + + public PipelineMarkerManager(PipelineTag tag, MetricTags metricTags) + { + this.tag = tag; + this.metricTags = metricTags; + } + + @Override + public Set discriminants() + { + return Set.of( + Tag.of("pipeline", this.tag.pipeline()) + ); + } + + @Override + public Set tags() + { + return combine(this.discriminants(), this.metricTags.asTags()); + } + + @Override + public Map markers() + { + Map markers = new HashMap<>(); + markers.put("pipeline", this.tag.pipeline()); + /* The author value can be null, thus Map.of cannot be used here */ + markers.put("author", this.tag.author()); + return MetricFunctions.compileMarkers( + this.metricTags, + markers, + emptyMap() + ); + } +} diff --git a/core/src/main/java/tech/illuin/pipeline/metering/PipelineMetrics.java b/core/src/main/java/tech/illuin/pipeline/metering/PipelineMetrics.java index 7b8415a0..f518fa71 100644 --- a/core/src/main/java/tech/illuin/pipeline/metering/PipelineMetrics.java +++ b/core/src/main/java/tech/illuin/pipeline/metering/PipelineMetrics.java @@ -3,103 +3,47 @@ import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Tag; -import io.micrometer.core.instrument.Timer; -import tech.illuin.pipeline.metering.marker.DefaultMDCManager; -import tech.illuin.pipeline.metering.marker.MDCManager; -import tech.illuin.pipeline.metering.marker.MDCMarker; -import tech.illuin.pipeline.metering.tag.MetricTags; -import tech.illuin.pipeline.output.PipelineTag; +import tech.illuin.pipeline.metering.mdc.MDCManager; import java.util.Collection; -import java.util.HashMap; -import java.util.Map; -import static java.util.Collections.emptyMap; import static tech.illuin.pipeline.metering.MeterRegistryKey.*; -import static tech.illuin.pipeline.metering.MetricFunctions.compileAndFillTags; -import static tech.illuin.pipeline.metering.MetricFunctions.compileTags; /** * @author Pierre Lecerf (pierre.lecerf@illuin.tech) */ -public class PipelineMetrics extends MDCMarker +public class PipelineMetrics extends BaseMetrics { - private final MeterRegistry meterRegistry; - private final PipelineTag tag; - private final MetricTags metricTags; - private final Timer runTimer; - private final Counter totalCounter; - private final Counter successCounter; - private final Counter failureCounter; - - public PipelineMetrics(MeterRegistry meterRegistry, PipelineTag tag, MetricTags metricTags) - { - this(meterRegistry, tag, metricTags, new DefaultMDCManager()); - } - - public PipelineMetrics(MeterRegistry meterRegistry, PipelineTag tag, MetricTags metricTags, MDCManager mdc) + public PipelineMetrics(MeterRegistry meterRegistry, PipelineMarkerManager markerManager) { - super(mdc); - this.meterRegistry = meterRegistry; - this.tag = tag; - this.metricTags = metricTags; - Tag[] discriminants = computeDiscriminants(tag.pipeline()); - Collection meterTags = compileTags(this.metricTags, discriminants); - this.runTimer = meterRegistry.timer(PIPELINE_RUN_KEY.id(), fill(PIPELINE_RUN_KEY, meterTags)); - this.totalCounter = meterRegistry.counter(PIPELINE_RUN_TOTAL_KEY.id(), fill(PIPELINE_RUN_TOTAL_KEY, meterTags)); - this.successCounter = meterRegistry.counter(PIPELINE_RUN_SUCCESS_KEY.id(), fill(PIPELINE_RUN_SUCCESS_KEY, meterTags)); - this.failureCounter = meterRegistry.counter(PIPELINE_RUN_FAILURE_KEY.id(), fill(PIPELINE_RUN_FAILURE_KEY, meterTags)); + super(meterRegistry, markerManager); } - public Timer runTimer() + public PipelineMetrics(MeterRegistry meterRegistry, PipelineMarkerManager markerManager, MDCManager mdc) { - return this.runTimer; + super(meterRegistry, markerManager, mdc); } - public Counter totalCounter() - { - return this.totalCounter; - } - - public Counter successCounter() - { - return this.successCounter; - } - - public Counter failureCounter() - { - return this.failureCounter; + @Override + protected ConstantMeters initializeConstantMeters(MeterRegistry meterRegistry, MarkerManager markerManager) + { + Collection meterTags = this.markerManager.tags(); + return new ConstantMeters( + meterRegistry.timer(PIPELINE_RUN_KEY.id(), fill(PIPELINE_RUN_KEY, meterTags)), + meterRegistry.counter(PIPELINE_RUN_TOTAL_KEY.id(), fill(PIPELINE_RUN_TOTAL_KEY, meterTags)), + meterRegistry.counter(PIPELINE_RUN_SUCCESS_KEY.id(), fill(PIPELINE_RUN_SUCCESS_KEY, meterTags)), + meterRegistry.counter(PIPELINE_RUN_FAILURE_KEY.id(), fill(PIPELINE_RUN_FAILURE_KEY, meterTags)) + ); } public Counter errorCounter(Exception exception) { return this.meterRegistry.counter( PIPELINE_RUN_ERROR_TOTAL_KEY.id(), - compileAndFillTags( - this.metricTags, + fill( PIPELINE_RUN_ERROR_TOTAL_KEY, - Tag.of("pipeline", this.tag.pipeline()), - Tag.of("error", exception.getClass().getName()) + this.markerManager.tags(Tag.of("error", exception.getClass().getName())) ) ); } - - public static Tag[] computeDiscriminants(String identifier) - { - return new Tag[]{Tag.of("pipeline", identifier)}; - } - - @Override - public Map compileMarkers() - { - Map markers = new HashMap<>(); - markers.put("pipeline", this.tag.pipeline()); - /* The author value can be null, thus Map.of cannot be used here */ - markers.put("author", this.tag.author()); - return MetricFunctions.compileMarkers( - this.metricTags, - markers, - emptyMap() - ); - } } diff --git a/core/src/main/java/tech/illuin/pipeline/metering/PipelineSinkMetrics.java b/core/src/main/java/tech/illuin/pipeline/metering/PipelineSinkMetrics.java deleted file mode 100644 index fa533c34..00000000 --- a/core/src/main/java/tech/illuin/pipeline/metering/PipelineSinkMetrics.java +++ /dev/null @@ -1,99 +0,0 @@ -package tech.illuin.pipeline.metering; - -import io.micrometer.core.instrument.Counter; -import io.micrometer.core.instrument.MeterRegistry; -import io.micrometer.core.instrument.Tag; -import io.micrometer.core.instrument.Timer; -import tech.illuin.pipeline.metering.marker.DefaultMDCManager; -import tech.illuin.pipeline.metering.marker.MDCManager; -import tech.illuin.pipeline.metering.marker.MDCMarker; -import tech.illuin.pipeline.metering.tag.MetricTags; -import tech.illuin.pipeline.output.ComponentTag; - -import java.util.Collection; -import java.util.Map; - -import static tech.illuin.pipeline.metering.MeterRegistryKey.*; -import static tech.illuin.pipeline.metering.MetricFunctions.compileAndFillTags; -import static tech.illuin.pipeline.metering.MetricFunctions.compileTags; - -/** - * @author Pierre Lecerf (pierre.lecerf@illuin.tech) - */ -public class PipelineSinkMetrics extends MDCMarker -{ - private final MeterRegistry meterRegistry; - private final ComponentTag tag; - private final MetricTags metricTags; - private final Timer runTimer; - private final Counter totalCounter; - private final Counter successCounter; - private final Counter failureCounter; - - public PipelineSinkMetrics(MeterRegistry meterRegistry, ComponentTag tag, MetricTags metricTags) - { - this(meterRegistry, tag, metricTags, new DefaultMDCManager()); - } - - public PipelineSinkMetrics(MeterRegistry meterRegistry, ComponentTag tag, MetricTags metricTags, MDCManager mdc) - { - super(mdc); - this.tag = tag; - this.meterRegistry = meterRegistry; - this.metricTags = metricTags; - Tag[] discriminants = computeDiscriminants(this.tag.pipelineTag().pipeline(), this.tag.id()); - Collection meterTags = compileTags(this.metricTags, discriminants); - this.runTimer = meterRegistry.timer(PIPELINE_SINK_RUN_KEY.id(), fill(PIPELINE_SINK_RUN_KEY, meterTags)); - this.totalCounter = meterRegistry.counter(PIPELINE_SINK_RUN_TOTAL_KEY.id(), fill(PIPELINE_SINK_RUN_TOTAL_KEY, meterTags)); - this.successCounter = meterRegistry.counter(PIPELINE_SINK_RUN_SUCCESS_KEY.id(), fill(PIPELINE_SINK_RUN_SUCCESS_KEY, meterTags)); - this.failureCounter = meterRegistry.counter(PIPELINE_SINK_RUN_FAILURE_KEY.id(), fill(PIPELINE_SINK_RUN_FAILURE_KEY, meterTags)); - } - - public Timer runTimer() - { - return this.runTimer; - } - - public Counter totalCounter() - { - return this.totalCounter; - } - - public Counter successCounter() - { - return this.successCounter; - } - - public Counter failureCounter() - { - return this.failureCounter; - } - - public Counter errorCounter(Exception exception) - { - return this.meterRegistry.counter( - PIPELINE_SINK_ERROR_TOTAL_KEY.id(), - compileAndFillTags( - this.metricTags, - PIPELINE_SINK_ERROR_TOTAL_KEY, - Tag.of("pipeline", this.tag.pipelineTag().pipeline()), - Tag.of("sink", this.tag.id()), - Tag.of("error", exception.getClass().getName()) - ) - ); - } - - public static Tag[] computeDiscriminants(String pipeline, String identifier) - { - return new Tag[]{ - Tag.of("pipeline", pipeline), - Tag.of("sink", identifier), - }; - } - - @Override - public Map compileMarkers() - { - return Map.of("sink", this.tag.id()); - } -} diff --git a/core/src/main/java/tech/illuin/pipeline/metering/PipelineStepMetrics.java b/core/src/main/java/tech/illuin/pipeline/metering/PipelineStepMetrics.java deleted file mode 100644 index ada8d298..00000000 --- a/core/src/main/java/tech/illuin/pipeline/metering/PipelineStepMetrics.java +++ /dev/null @@ -1,114 +0,0 @@ -package tech.illuin.pipeline.metering; - -import io.micrometer.core.instrument.Counter; -import io.micrometer.core.instrument.MeterRegistry; -import io.micrometer.core.instrument.Tag; -import io.micrometer.core.instrument.Timer; -import tech.illuin.pipeline.metering.marker.DefaultMDCManager; -import tech.illuin.pipeline.metering.marker.MDCManager; -import tech.illuin.pipeline.metering.marker.MDCMarker; -import tech.illuin.pipeline.metering.tag.MetricTags; -import tech.illuin.pipeline.output.ComponentTag; -import tech.illuin.pipeline.step.result.Result; - -import java.util.Collection; -import java.util.Map; - -import static tech.illuin.pipeline.metering.MeterRegistryKey.*; -import static tech.illuin.pipeline.metering.MetricFunctions.compileAndFillTags; -import static tech.illuin.pipeline.metering.MetricFunctions.compileTags; - -/** - * @author Pierre Lecerf (pierre.lecerf@illuin.tech) - */ -public class PipelineStepMetrics extends MDCMarker -{ - private final MeterRegistry meterRegistry; - private final ComponentTag tag; - private final MetricTags metricTags; - private final Timer runTimer; - private final Counter totalCounter; - private final Counter successCounter; - private final Counter failureCounter; - - public PipelineStepMetrics(MeterRegistry meterRegistry, ComponentTag tag, MetricTags metricTags) - { - this(meterRegistry, tag, metricTags, new DefaultMDCManager()); - } - - public PipelineStepMetrics(MeterRegistry meterRegistry, ComponentTag tag, MetricTags metricTags, MDCManager mdc) - { - super(mdc); - this.meterRegistry = meterRegistry; - this.metricTags = metricTags; - this.tag = tag; - Tag[] discriminants = computeDiscriminants(this.tag.pipelineTag().pipeline(), this.tag.id()); - Collection meterTags = compileTags(this.metricTags, discriminants); - this.runTimer = meterRegistry.timer(PIPELINE_STEP_RUN_KEY.id(), fill(PIPELINE_STEP_RUN_KEY, meterTags)); - this.totalCounter = meterRegistry.counter(PIPELINE_STEP_RUN_TOTAL_KEY.id(), fill(PIPELINE_STEP_RUN_TOTAL_KEY, meterTags)); - this.successCounter = meterRegistry.counter(PIPELINE_STEP_RUN_SUCCESS_KEY.id(), fill(PIPELINE_STEP_RUN_SUCCESS_KEY, meterTags)); - this.failureCounter = meterRegistry.counter(PIPELINE_STEP_RUN_FAILURE_KEY.id(), fill(PIPELINE_STEP_RUN_FAILURE_KEY, meterTags)); - } - - public Timer runTimer() - { - return this.runTimer; - } - - public Counter totalCounter() - { - return this.totalCounter; - } - - public Counter successCounter() - { - return this.successCounter; - } - - public Counter failureCounter() - { - return this.failureCounter; - } - - public Counter resultCounter(Result result) - { - return this.meterRegistry.counter( - PIPELINE_STEP_RESULT_TOTAL_KEY.id(), - compileAndFillTags( - this.metricTags, - PIPELINE_STEP_RESULT_TOTAL_KEY, - Tag.of("pipeline", this.tag.pipelineTag().pipeline()), - Tag.of("step", this.tag.id()), - Tag.of("result", result.name()) - ) - ); - } - - public Counter errorCounter(Exception exception) - { - return this.meterRegistry.counter( - PIPELINE_STEP_ERROR_TOTAL_KEY.id(), - compileAndFillTags( - this.metricTags, - PIPELINE_STEP_ERROR_TOTAL_KEY, - Tag.of("pipeline", this.tag.pipelineTag().pipeline()), - Tag.of("step", this.tag.id()), - Tag.of("error", exception.getClass().getName()) - ) - ); - } - - public static Tag[] computeDiscriminants(String pipeline, String identifier) - { - return new Tag[]{ - Tag.of("pipeline", pipeline), - Tag.of("step", identifier), - }; - } - - @Override - public Map compileMarkers() - { - return Map.of("step", this.tag.id()); - } -} diff --git a/core/src/main/java/tech/illuin/pipeline/metering/marker/MDCMarker.java b/core/src/main/java/tech/illuin/pipeline/metering/marker/MDCMarker.java deleted file mode 100644 index 31c62c9e..00000000 --- a/core/src/main/java/tech/illuin/pipeline/metering/marker/MDCMarker.java +++ /dev/null @@ -1,34 +0,0 @@ -package tech.illuin.pipeline.metering.marker; - -import java.util.Map; -import java.util.stream.Stream; - -public abstract class MDCMarker -{ - private final MDCManager mdc; - - protected MDCMarker(MDCManager mdc) - { - this.mdc = mdc; - } - - public abstract Map compileMarkers(); - - public final void setMDC() - { - this.compileMarkers().forEach(this.mdc::put); - } - - public final void setMDC(Exception exception) - { - this.mdc.put("error", exception.getClass().getName()); - } - - public final void unsetMDC() - { - Stream.concat( - this.compileMarkers().keySet().stream(), - Stream.of("error") - ).forEach(this.mdc::remove); - } -} diff --git a/core/src/main/java/tech/illuin/pipeline/metering/marker/DefaultMDCManager.java b/core/src/main/java/tech/illuin/pipeline/metering/mdc/DefaultMDCManager.java similarity index 87% rename from core/src/main/java/tech/illuin/pipeline/metering/marker/DefaultMDCManager.java rename to core/src/main/java/tech/illuin/pipeline/metering/mdc/DefaultMDCManager.java index 799e61d0..d4027859 100644 --- a/core/src/main/java/tech/illuin/pipeline/metering/marker/DefaultMDCManager.java +++ b/core/src/main/java/tech/illuin/pipeline/metering/mdc/DefaultMDCManager.java @@ -1,4 +1,4 @@ -package tech.illuin.pipeline.metering.marker; +package tech.illuin.pipeline.metering.mdc; import org.slf4j.MDC; diff --git a/core/src/main/java/tech/illuin/pipeline/metering/marker/MDCManager.java b/core/src/main/java/tech/illuin/pipeline/metering/mdc/MDCManager.java similarity index 78% rename from core/src/main/java/tech/illuin/pipeline/metering/marker/MDCManager.java rename to core/src/main/java/tech/illuin/pipeline/metering/mdc/MDCManager.java index f1b4bc64..7cc15911 100644 --- a/core/src/main/java/tech/illuin/pipeline/metering/marker/MDCManager.java +++ b/core/src/main/java/tech/illuin/pipeline/metering/mdc/MDCManager.java @@ -1,4 +1,4 @@ -package tech.illuin.pipeline.metering.marker; +package tech.illuin.pipeline.metering.mdc; public interface MDCManager { diff --git a/core/src/main/java/tech/illuin/pipeline/observer/descriptor/DescriptorObserver.java b/core/src/main/java/tech/illuin/pipeline/observer/descriptor/DescriptorObserver.java index e7c83ba7..7059e497 100644 --- a/core/src/main/java/tech/illuin/pipeline/observer/descriptor/DescriptorObserver.java +++ b/core/src/main/java/tech/illuin/pipeline/observer/descriptor/DescriptorObserver.java @@ -2,28 +2,38 @@ import io.micrometer.core.instrument.*; import io.micrometer.core.instrument.Meter.Id; -import io.micrometer.core.instrument.Timer; import tech.illuin.pipeline.close.OnCloseHandler; import tech.illuin.pipeline.execution.error.PipelineErrorHandler; import tech.illuin.pipeline.input.indexer.Indexable; import tech.illuin.pipeline.input.initializer.builder.InitializerDescriptor; -import tech.illuin.pipeline.metering.*; -import tech.illuin.pipeline.metering.manager.ObservabilityManager; +import tech.illuin.pipeline.input.initializer.metering.InitializationMarkerManager; +import tech.illuin.pipeline.metering.MarkerManager; +import tech.illuin.pipeline.metering.MeterRegistryKey; +import tech.illuin.pipeline.metering.PipelineMarkerManager; +import tech.illuin.pipeline.metering.tag.MetricTags; import tech.illuin.pipeline.observer.Observer; import tech.illuin.pipeline.observer.descriptor.describable.DefaultDescribable; import tech.illuin.pipeline.observer.descriptor.describable.Describable; import tech.illuin.pipeline.observer.descriptor.describable.Description; import tech.illuin.pipeline.observer.descriptor.model.*; +import tech.illuin.pipeline.output.ComponentFamily; +import tech.illuin.pipeline.output.ComponentTag; +import tech.illuin.pipeline.output.PipelineTag; import tech.illuin.pipeline.sink.builder.SinkDescriptor; +import tech.illuin.pipeline.sink.metering.SinkMarkerManager; import tech.illuin.pipeline.step.builder.StepDescriptor; +import tech.illuin.pipeline.step.metering.StepMarkerManager; -import java.util.*; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; -import static java.util.Arrays.asList; import static tech.illuin.pipeline.metering.MeterRegistryKey.*; +import static tech.illuin.pipeline.output.ComponentFamily.*; public class DescriptorObserver implements Observer { @@ -39,24 +49,27 @@ public void init( List onCloseHandlers, MeterRegistry meterRegistry ) { - this.supplier = () -> new PipelineDescription( - id, - createInitializer(id, initializer, meterRegistry), - createSteps(id, steps, meterRegistry), - createSinks(id, sinks, meterRegistry), - compileMetrics( - meterRegistry, - asList(PipelineMetrics.computeDiscriminants(id)), - PIPELINE_RUN_KEY, - PIPELINE_RUN_TOTAL_KEY, - PIPELINE_RUN_SUCCESS_KEY, - PIPELINE_RUN_FAILURE_KEY, - PIPELINE_RUN_ERROR_TOTAL_KEY - ) - ); + this.supplier = () -> { + PipelineTag simulated = new PipelineTag(null, id, null); + return new PipelineDescription( + id, + createInitializer(simulated, initializer, meterRegistry), + createSteps(simulated, steps, meterRegistry), + createSinks(simulated, sinks, meterRegistry), + compileMetrics( + meterRegistry, + new PipelineMarkerManager(simulated, new MetricTags()).discriminants(), + PIPELINE_RUN_KEY, + PIPELINE_RUN_TOTAL_KEY, + PIPELINE_RUN_SUCCESS_KEY, + PIPELINE_RUN_FAILURE_KEY, + PIPELINE_RUN_ERROR_TOTAL_KEY + ) + ); + }; } - private static InitializerDescription createInitializer(String pipelineId, InitializerDescriptor descriptor, MeterRegistry meterRegistry) + private static InitializerDescription createInitializer(PipelineTag tag, InitializerDescriptor descriptor, MeterRegistry meterRegistry) { return new InitializerDescription( descriptor.id(), @@ -64,7 +77,7 @@ private static InitializerDescription createInitializer(String pipelineId, Initi compileDescription(descriptor.errorHandler()), compileMetrics( meterRegistry, - asList(PipelineInitializationMetrics.computeDiscriminants(pipelineId, descriptor.id())), + createMarkerManager(tag, descriptor.id(), INITIALIZER).discriminants(), PIPELINE_INITIALIZATION_RUN_KEY, PIPELINE_INITIALIZATION_RUN_TOTAL_KEY, PIPELINE_INITIALIZATION_RUN_SUCCESS_KEY, @@ -74,7 +87,7 @@ private static InitializerDescription createInitializer(String pipelineId, Initi ); } - private static List createSteps(String pipelineId, List> descriptors, MeterRegistry meterRegistry) + private static List createSteps(PipelineTag tag, List> descriptors, MeterRegistry meterRegistry) { return descriptors.stream() .map(sd -> new StepDescription( @@ -87,7 +100,7 @@ private static List createSteps(String pipelineId, List List createSteps(String pipelineId, List createSinks(String pipelineId, List descriptors, MeterRegistry meterRegistry) + private static List createSinks(PipelineTag tag, List descriptors, MeterRegistry meterRegistry) { return descriptors.stream() .map(sd -> new SinkDescription( @@ -110,7 +123,7 @@ private static List createSinks(String pipelineId, List createSinks(String pipelineId, List new InitializationMarkerManager(new ComponentTag(null, tag, componentId, family), new MetricTags()); + case STEP -> new StepMarkerManager(new ComponentTag(null, tag, componentId, family), new MetricTags()); + case SINK -> new SinkMarkerManager(new ComponentTag(null, tag, componentId, family), new MetricTags()); + }; + } + private static Object compileDescription(Object property) { if (property instanceof Describable describable) @@ -130,7 +152,7 @@ private static Object compileDescription(Object property) return new DefaultDescribable(property).describe(); } - private static Map compileMetrics(MeterRegistry meterRegistry, List discriminants, MeterRegistryKey... keys) + private static Map compileMetrics(MeterRegistry meterRegistry, Collection discriminants, MeterRegistryKey... keys) { return Stream.of(keys).collect(Collectors.toMap( MeterRegistryKey::id, diff --git a/core/src/main/java/tech/illuin/pipeline/sink/Sink.java b/core/src/main/java/tech/illuin/pipeline/sink/Sink.java index aeaf5334..85b298fd 100644 --- a/core/src/main/java/tech/illuin/pipeline/sink/Sink.java +++ b/core/src/main/java/tech/illuin/pipeline/sink/Sink.java @@ -1,7 +1,7 @@ package tech.illuin.pipeline.sink; import tech.illuin.pipeline.commons.Reflection; -import tech.illuin.pipeline.context.Context; +import tech.illuin.pipeline.context.LocalContext; import tech.illuin.pipeline.output.Output; import tech.illuin.pipeline.sink.runner.SinkRunner; @@ -10,7 +10,7 @@ */ public interface Sink { - void execute(Output output, Context context) throws Exception; + void execute(Output output, LocalContext context) throws Exception; default String defaultId() { diff --git a/core/src/main/java/tech/illuin/pipeline/sink/builder/SinkDescriptor.java b/core/src/main/java/tech/illuin/pipeline/sink/builder/SinkDescriptor.java index 510875cf..3136ebb4 100644 --- a/core/src/main/java/tech/illuin/pipeline/sink/builder/SinkDescriptor.java +++ b/core/src/main/java/tech/illuin/pipeline/sink/builder/SinkDescriptor.java @@ -1,6 +1,6 @@ package tech.illuin.pipeline.sink.builder; -import tech.illuin.pipeline.context.Context; +import tech.illuin.pipeline.context.LocalContext; import tech.illuin.pipeline.output.Output; import tech.illuin.pipeline.sink.Sink; import tech.illuin.pipeline.sink.execution.error.SinkErrorHandler; @@ -31,18 +31,18 @@ public final class SinkDescriptor this.errorHandler = errorHandler; } - public void execute(Output output, Context ctx) throws Exception + public void execute(Output output, LocalContext ctx) throws Exception { this.executionWrapper.wrap(this.sink).execute(output, ctx); } - public void handleException(Exception ex, Output output, Context ctx) throws Exception + public void handleException(Exception ex, Output output, LocalContext ctx) throws Exception { this.errorHandler.handle(ex, output, ctx); } @SuppressWarnings("IllegalCatch") - public void handleExceptionThenSwallow(Exception ex, Output output, Context ctx) + public void handleExceptionThenSwallow(Exception ex, Output output, LocalContext ctx) { try { this.handleException(ex, output, ctx); diff --git a/core/src/main/java/tech/illuin/pipeline/sink/builder/SinkMethodArgumentResolver.java b/core/src/main/java/tech/illuin/pipeline/sink/builder/SinkMethodArgumentResolver.java index 5e28c3c1..511e2159 100644 --- a/core/src/main/java/tech/illuin/pipeline/sink/builder/SinkMethodArgumentResolver.java +++ b/core/src/main/java/tech/illuin/pipeline/sink/builder/SinkMethodArgumentResolver.java @@ -34,7 +34,9 @@ class SinkMethodArgumentResolver implements MethodArgumentResolver(), new UIDGeneratorMapperFactory<>(), new PipelineTagMapperFactory<>(), - new ComponentTagMapperFactory<>() + new ComponentTagMapperFactory<>(), + new ObservabilityManagerMapperFactory<>(), + new MarkerManagerMapperFactory<>() ); } diff --git a/core/src/main/java/tech/illuin/pipeline/sink/execution/error/SinkErrorHandler.java b/core/src/main/java/tech/illuin/pipeline/sink/execution/error/SinkErrorHandler.java index 893c8e33..e9912e6a 100644 --- a/core/src/main/java/tech/illuin/pipeline/sink/execution/error/SinkErrorHandler.java +++ b/core/src/main/java/tech/illuin/pipeline/sink/execution/error/SinkErrorHandler.java @@ -1,12 +1,10 @@ package tech.illuin.pipeline.sink.execution.error; -import tech.illuin.pipeline.context.Context; +import tech.illuin.pipeline.context.LocalContext; import tech.illuin.pipeline.output.Output; -import tech.illuin.pipeline.step.execution.error.StepErrorHandler; import java.util.Arrays; import java.util.Collection; -import java.util.Collections; import static tech.illuin.pipeline.execution.error.PipelineErrorHandler.throwUnlessExcepted; @@ -16,16 +14,16 @@ @FunctionalInterface public interface SinkErrorHandler { - void handle(Exception exception, Output output, Context context) throws Exception; + void handle(Exception exception, Output output, LocalContext context) throws Exception; - SinkErrorHandler RETHROW_ALL = (Exception ex, Output output, Context ctx) -> { + SinkErrorHandler RETHROW_ALL = (Exception ex, Output output, LocalContext ctx) -> { throw ex; }; @SuppressWarnings("IllegalCatch") default SinkErrorHandler andThen(SinkErrorHandler nextErrorHandler) { - return (Exception exception, Output output, Context context) -> { + return (Exception exception, Output output, LocalContext context) -> { try { handle(exception, output, context); } diff --git a/core/src/main/java/tech/illuin/pipeline/sink/metering/SinkMarkerManager.java b/core/src/main/java/tech/illuin/pipeline/sink/metering/SinkMarkerManager.java new file mode 100644 index 00000000..90b88ef0 --- /dev/null +++ b/core/src/main/java/tech/illuin/pipeline/sink/metering/SinkMarkerManager.java @@ -0,0 +1,44 @@ +package tech.illuin.pipeline.sink.metering; + +import io.micrometer.core.instrument.Tag; +import tech.illuin.pipeline.metering.MarkerManager; +import tech.illuin.pipeline.metering.tag.MetricTags; +import tech.illuin.pipeline.output.ComponentTag; + +import java.util.Map; +import java.util.Set; + +import static tech.illuin.pipeline.metering.MetricFunctions.combine; + +public class SinkMarkerManager implements MarkerManager +{ + private final ComponentTag tag; + private final MetricTags metricTags; + + public SinkMarkerManager(ComponentTag tag, MetricTags metricTags) + { + this.tag = tag; + this.metricTags = metricTags; + } + + @Override + public Set discriminants() + { + return Set.of( + Tag.of("pipeline", this.tag.pipelineTag().pipeline()), + Tag.of("sink", this.tag.id()) + ); + } + + @Override + public Set tags() + { + return combine(this.discriminants(), this.metricTags.asTags()); + } + + @Override + public Map markers() + { + return Map.of("sink", this.tag.id()); + } +} diff --git a/core/src/main/java/tech/illuin/pipeline/sink/metering/SinkMetrics.java b/core/src/main/java/tech/illuin/pipeline/sink/metering/SinkMetrics.java new file mode 100644 index 00000000..c0758410 --- /dev/null +++ b/core/src/main/java/tech/illuin/pipeline/sink/metering/SinkMetrics.java @@ -0,0 +1,52 @@ +package tech.illuin.pipeline.sink.metering; + +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Tag; +import tech.illuin.pipeline.metering.BaseMetrics; +import tech.illuin.pipeline.metering.MarkerManager; +import tech.illuin.pipeline.metering.mdc.MDCManager; + +import java.util.Collection; + +import static tech.illuin.pipeline.metering.MeterRegistryKey.*; + +/** + * @author Pierre Lecerf (pierre.lecerf@illuin.tech) + */ +public class SinkMetrics extends BaseMetrics +{ + public SinkMetrics(MeterRegistry meterRegistry, SinkMarkerManager markerManager) + { + super(meterRegistry, markerManager); + } + + public SinkMetrics(MeterRegistry meterRegistry, SinkMarkerManager markerManager, MDCManager mdc) + { + super(meterRegistry, markerManager, mdc); + } + + @Override + protected ConstantMeters initializeConstantMeters(MeterRegistry meterRegistry, MarkerManager markerManager) + { + Collection meterTags = this.markerManager.tags(); + return new ConstantMeters( + meterRegistry.timer(PIPELINE_SINK_RUN_KEY.id(), fill(PIPELINE_SINK_RUN_KEY, meterTags)), + meterRegistry.counter(PIPELINE_SINK_RUN_TOTAL_KEY.id(), fill(PIPELINE_SINK_RUN_TOTAL_KEY, meterTags)), + meterRegistry.counter(PIPELINE_SINK_RUN_SUCCESS_KEY.id(), fill(PIPELINE_SINK_RUN_SUCCESS_KEY, meterTags)), + meterRegistry.counter(PIPELINE_SINK_RUN_FAILURE_KEY.id(), fill(PIPELINE_SINK_RUN_FAILURE_KEY, meterTags)) + ); + } + + @Override + public Counter errorCounter(Exception exception) + { + return this.meterRegistry.counter( + PIPELINE_SINK_ERROR_TOTAL_KEY.id(), + fill( + PIPELINE_SINK_ERROR_TOTAL_KEY, + this.markerManager.tags(Tag.of("error", exception.getClass().getName())) + ) + ); + } +} diff --git a/core/src/main/java/tech/illuin/pipeline/sink/runner/SinkRunner.java b/core/src/main/java/tech/illuin/pipeline/sink/runner/SinkRunner.java index ce89ed49..2dde81a0 100644 --- a/core/src/main/java/tech/illuin/pipeline/sink/runner/SinkRunner.java +++ b/core/src/main/java/tech/illuin/pipeline/sink/runner/SinkRunner.java @@ -6,7 +6,6 @@ import tech.illuin.pipeline.builder.runner_compiler.argument_resolver.method_arguments.MethodArguments; import tech.illuin.pipeline.builder.runner_compiler.CompiledMethod; import tech.illuin.pipeline.commons.Reflection; -import tech.illuin.pipeline.context.Context; import tech.illuin.pipeline.context.LocalContext; import tech.illuin.pipeline.observer.descriptor.describable.Describable; import tech.illuin.pipeline.output.Output; @@ -37,25 +36,24 @@ public SinkRunner(Object target) } @Override - public void execute(Output output, Context context) throws Exception + public void execute(Output output, LocalContext context) throws Exception { - if (!(context instanceof LocalContext localContext)) - throw new IllegalArgumentException("Invalid context provided to a SinkRunner instance"); - - logger.trace("{}#{} launching sink over target {}#{}", localContext.pipelineTag().pipeline(), localContext.pipelineTag().uid(), this.target.getClass().getName(), Reflection.getMethodSignature(this.method)); + logger.trace("{}#{} launching sink over target {}#{}", context.pipelineTag().pipeline(), context.pipelineTag().uid(), this.target.getClass().getName(), Reflection.getMethodSignature(this.method)); try { MethodArguments originalArguments = new MethodArguments<>( null, - localContext.input(), + context.input(), output.payload(), output, null, output.results(), - localContext, - localContext.pipelineTag(), - localContext.componentTag(), - localContext.uidGenerator() + context, + context.pipelineTag(), + context.componentTag(), + context.uidGenerator(), + context.observabilityManager(), + context.markerManager() ); Object[] arguments = this.argumentMappers.stream() .map(mapper -> mapper.map(originalArguments)) @@ -68,13 +66,13 @@ public void execute(Output output, Context context) throws Exception throw (Exception) e.getTargetException(); throw new StepRunnerException( - "The target method " + this.method.getName() + " of sink runner " + localContext.pipelineTag().pipeline() + "#" + localContext.componentTag().id() + "The target method " + this.method.getName() + " of sink runner " + context.pipelineTag().pipeline() + "#" + context.componentTag().id() + " has thrown an unexpected exception of type " + e.getTargetException().getClass().getName(), e.getTargetException() ); } catch (IllegalAccessException e) { throw new StepRunnerException( - "The target method " + this.method.getName() + " of sink runner " + localContext.pipelineTag().pipeline() + "#" + localContext.componentTag().id() + "The target method " + this.method.getName() + " of sink runner " + context.pipelineTag().pipeline() + "#" + context.componentTag().id() + " unexpectedly has illegal access", e ); } diff --git a/core/src/main/java/tech/illuin/pipeline/step/Step.java b/core/src/main/java/tech/illuin/pipeline/step/Step.java index d15bed56..6fc799bc 100644 --- a/core/src/main/java/tech/illuin/pipeline/step/Step.java +++ b/core/src/main/java/tech/illuin/pipeline/step/Step.java @@ -1,7 +1,7 @@ package tech.illuin.pipeline.step; import tech.illuin.pipeline.commons.Reflection; -import tech.illuin.pipeline.context.Context; +import tech.illuin.pipeline.context.LocalContext; import tech.illuin.pipeline.input.indexer.Indexable; import tech.illuin.pipeline.output.Output; import tech.illuin.pipeline.step.result.Result; @@ -13,9 +13,9 @@ */ public interface Step { - Result execute(T object, I input, Object payload, ResultView results, Context context) throws Exception; + Result execute(T object, I input, Object payload, ResultView results, LocalContext context) throws Exception; - default Result execute(T object, I input, Output output, Context context) throws Exception + default Result execute(T object, I input, Output output, LocalContext context) throws Exception { return this.execute(object, input, output.payload(Object.class), output.results().view(object), context); } diff --git a/core/src/main/java/tech/illuin/pipeline/step/builder/StepDescriptor.java b/core/src/main/java/tech/illuin/pipeline/step/builder/StepDescriptor.java index 1239f26b..17a2ba50 100644 --- a/core/src/main/java/tech/illuin/pipeline/step/builder/StepDescriptor.java +++ b/core/src/main/java/tech/illuin/pipeline/step/builder/StepDescriptor.java @@ -1,6 +1,7 @@ package tech.illuin.pipeline.step.builder; import tech.illuin.pipeline.context.Context; +import tech.illuin.pipeline.context.LocalContext; import tech.illuin.pipeline.input.indexer.Indexable; import tech.illuin.pipeline.output.Output; import tech.illuin.pipeline.step.Step; @@ -43,7 +44,7 @@ public final class StepDescriptor this.errorHandler = errorHandler; } - public Result execute(T data, I input, Output output, Context ctx) throws Exception + public Result execute(T data, I input, Output output, LocalContext ctx) throws Exception { return this.executionWrapper.wrap(this.step).execute(data, input, output, ctx); } @@ -53,12 +54,12 @@ public boolean canExecute(Indexable indexable, Context ctx) return this.activationPredicate.canExecute(indexable, ctx); } - public StepStrategy postEvaluation(Result result, Indexable object, I input, Context ctx) + public StepStrategy postEvaluation(Result result, Indexable object, I input, LocalContext ctx) { return this.resultEvaluator.evaluate(result, object, input, ctx); } - public Result handleException(Exception ex, I input, Object payload, Results results, Context ctx) throws Exception + public Result handleException(Exception ex, I input, Object payload, Results results, LocalContext ctx) throws Exception { return this.errorHandler.handle(ex, input, payload, results, ctx); } diff --git a/core/src/main/java/tech/illuin/pipeline/step/builder/StepMethodArgumentResolver.java b/core/src/main/java/tech/illuin/pipeline/step/builder/StepMethodArgumentResolver.java index c071a9c2..902bc3a0 100644 --- a/core/src/main/java/tech/illuin/pipeline/step/builder/StepMethodArgumentResolver.java +++ b/core/src/main/java/tech/illuin/pipeline/step/builder/StepMethodArgumentResolver.java @@ -32,7 +32,9 @@ public StepMethodArgumentResolver() new ResultViewMapperFactory<>(), new UIDGeneratorMapperFactory<>(), new PipelineTagMapperFactory<>(), - new ComponentTagMapperFactory<>() + new ComponentTagMapperFactory<>(), + new ObservabilityManagerMapperFactory<>(), + new MarkerManagerMapperFactory<>() ); } diff --git a/core/src/main/java/tech/illuin/pipeline/step/execution/error/StepErrorHandler.java b/core/src/main/java/tech/illuin/pipeline/step/execution/error/StepErrorHandler.java index 48dc7c1b..9fdd97c0 100644 --- a/core/src/main/java/tech/illuin/pipeline/step/execution/error/StepErrorHandler.java +++ b/core/src/main/java/tech/illuin/pipeline/step/execution/error/StepErrorHandler.java @@ -1,13 +1,11 @@ package tech.illuin.pipeline.step.execution.error; -import tech.illuin.pipeline.context.Context; +import tech.illuin.pipeline.context.LocalContext; import tech.illuin.pipeline.step.result.Result; import tech.illuin.pipeline.step.result.Results; import java.util.Arrays; import java.util.Collection; -import java.util.Collections; -import java.util.function.Function; import static tech.illuin.pipeline.execution.error.PipelineErrorHandler.throwUnlessExcepted; @@ -17,16 +15,16 @@ @FunctionalInterface public interface StepErrorHandler { - Result handle(Exception exception, Object input, Object payload, Results results, Context context) throws Exception; + Result handle(Exception exception, Object input, Object payload, Results results, LocalContext context) throws Exception; - StepErrorHandler RETHROW_ALL = (Exception ex, Object input, Object payload, Results results, Context ctx) -> { + StepErrorHandler RETHROW_ALL = (Exception ex, Object input, Object payload, Results results, LocalContext ctx) -> { throw ex; }; @SuppressWarnings("IllegalCatch") default StepErrorHandler andThen(StepErrorHandler nextErrorHandler) { - return (Exception exception, Object input, Object payload, Results results, Context context) -> { + return (Exception exception, Object input, Object payload, Results results, LocalContext context) -> { try { return this.handle(exception, input, payload, results, context); } diff --git a/core/src/main/java/tech/illuin/pipeline/step/metering/StepMarkerManager.java b/core/src/main/java/tech/illuin/pipeline/step/metering/StepMarkerManager.java new file mode 100644 index 00000000..2c00da10 --- /dev/null +++ b/core/src/main/java/tech/illuin/pipeline/step/metering/StepMarkerManager.java @@ -0,0 +1,44 @@ +package tech.illuin.pipeline.step.metering; + +import io.micrometer.core.instrument.Tag; +import tech.illuin.pipeline.metering.MarkerManager; +import tech.illuin.pipeline.metering.tag.MetricTags; +import tech.illuin.pipeline.output.ComponentTag; + +import java.util.Map; +import java.util.Set; + +import static tech.illuin.pipeline.metering.MetricFunctions.combine; + +public class StepMarkerManager implements MarkerManager +{ + private final ComponentTag tag; + private final MetricTags metricTags; + + public StepMarkerManager(ComponentTag tag, MetricTags metricTags) + { + this.tag = tag; + this.metricTags = metricTags; + } + + @Override + public Set discriminants() + { + return Set.of( + Tag.of("pipeline", this.tag.pipelineTag().pipeline()), + Tag.of("step", this.tag.id()) + ); + } + + @Override + public Set tags() + { + return combine(this.discriminants(), this.metricTags.asTags()); + } + + @Override + public Map markers() + { + return Map.of("step", this.tag.id()); + } +} diff --git a/core/src/main/java/tech/illuin/pipeline/step/metering/StepMetrics.java b/core/src/main/java/tech/illuin/pipeline/step/metering/StepMetrics.java new file mode 100644 index 00000000..b4bd3ab9 --- /dev/null +++ b/core/src/main/java/tech/illuin/pipeline/step/metering/StepMetrics.java @@ -0,0 +1,63 @@ +package tech.illuin.pipeline.step.metering; + +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Tag; +import tech.illuin.pipeline.metering.BaseMetrics; +import tech.illuin.pipeline.metering.MarkerManager; +import tech.illuin.pipeline.metering.mdc.MDCManager; +import tech.illuin.pipeline.step.result.Result; + +import java.util.Collection; + +import static tech.illuin.pipeline.metering.MeterRegistryKey.*; + +/** + * @author Pierre Lecerf (pierre.lecerf@illuin.tech) + */ +public class StepMetrics extends BaseMetrics +{ + public StepMetrics(MeterRegistry meterRegistry, StepMarkerManager markerManager) + { + super(meterRegistry, markerManager); + } + + public StepMetrics(MeterRegistry meterRegistry, StepMarkerManager markerManager, MDCManager mdc) + { + super(meterRegistry, markerManager, mdc); + } + + @Override + protected ConstantMeters initializeConstantMeters(MeterRegistry meterRegistry, MarkerManager markerManager) + { + Collection meterTags = this.markerManager.tags(); + return new ConstantMeters( + meterRegistry.timer(PIPELINE_STEP_RUN_KEY.id(), fill(PIPELINE_STEP_RUN_KEY, meterTags)), + meterRegistry.counter(PIPELINE_STEP_RUN_TOTAL_KEY.id(), fill(PIPELINE_STEP_RUN_TOTAL_KEY, meterTags)), + meterRegistry.counter(PIPELINE_STEP_RUN_SUCCESS_KEY.id(), fill(PIPELINE_STEP_RUN_SUCCESS_KEY, meterTags)), + meterRegistry.counter(PIPELINE_STEP_RUN_FAILURE_KEY.id(), fill(PIPELINE_STEP_RUN_FAILURE_KEY, meterTags)) + ); + } + + public Counter resultCounter(Result result) + { + return this.meterRegistry.counter( + PIPELINE_STEP_RESULT_TOTAL_KEY.id(), + fill( + PIPELINE_STEP_RESULT_TOTAL_KEY, + this.markerManager.tags(Tag.of("result", result.name())) + ) + ); + } + + public Counter errorCounter(Exception exception) + { + return this.meterRegistry.counter( + PIPELINE_STEP_ERROR_TOTAL_KEY.id(), + fill( + PIPELINE_STEP_ERROR_TOTAL_KEY, + this.markerManager.tags(Tag.of("error", exception.getClass().getName())) + ) + ); + } +} diff --git a/core/src/main/java/tech/illuin/pipeline/step/runner/StepRunner.java b/core/src/main/java/tech/illuin/pipeline/step/runner/StepRunner.java index 11947432..f92bddc4 100644 --- a/core/src/main/java/tech/illuin/pipeline/step/runner/StepRunner.java +++ b/core/src/main/java/tech/illuin/pipeline/step/runner/StepRunner.java @@ -6,7 +6,6 @@ import tech.illuin.pipeline.builder.runner_compiler.argument_resolver.mapper_factory.MethodArgumentMapper; import tech.illuin.pipeline.builder.runner_compiler.argument_resolver.method_arguments.MethodArguments; import tech.illuin.pipeline.commons.Reflection; -import tech.illuin.pipeline.context.Context; import tech.illuin.pipeline.context.LocalContext; import tech.illuin.pipeline.input.indexer.Indexable; import tech.illuin.pipeline.observer.descriptor.describable.Describable; @@ -40,12 +39,9 @@ public StepRunner(java.lang.Object target) } @Override - public Result execute(T object, I input, Object payload, ResultView results, Context context) throws Exception + public Result execute(T object, I input, Object payload, ResultView results, LocalContext context) throws Exception { - if (!(context instanceof LocalContext localContext)) - throw new IllegalArgumentException("Invalid context provided to a SinkRunner instance"); - - logger.trace("{}#{} launching step over target {}#{}", localContext.pipelineTag().pipeline(), localContext.pipelineTag().uid(), this.target.getClass().getName(), Reflection.getMethodSignature(this.method)); + logger.trace("{}#{} launching step over target {}#{}", context.pipelineTag().pipeline(), context.pipelineTag().uid(), this.target.getClass().getName(), Reflection.getMethodSignature(this.method)); try { MethodArguments originalArguments = new MethodArguments<>( @@ -56,9 +52,11 @@ public Result execute(T object, I input, Object payload, ResultView results, Con results, results, context, - localContext.pipelineTag(), - localContext.componentTag(), - localContext.uidGenerator() + context.pipelineTag(), + context.componentTag(), + context.uidGenerator(), + context.observabilityManager(), + context.markerManager() ); java.lang.Object[] arguments = this.argumentMappers.stream() .map(mapper -> mapper.map(originalArguments)) @@ -83,14 +81,14 @@ public Result execute(T object, I input, Object payload, ResultView results, Con throw (Exception) e.getTargetException(); throw new StepRunnerException( - "The target method " + this.method.getName() + " of step runner " + localContext.pipelineTag().pipeline() + "#" + localContext.componentTag().id() + "The target method " + this.method.getName() + " of step runner " + context.pipelineTag().pipeline() + "#" + context.componentTag().id() + " has thrown an unexpected exception of type " + e.getTargetException().getClass().getName(), e.getTargetException() ); } catch (IllegalAccessException e) { throw new StepRunnerException( - "The target method " + this.method.getName() + " of step runner " + localContext.pipelineTag().pipeline() + "#" + localContext.componentTag().id() + "The target method " + this.method.getName() + " of step runner " + context.pipelineTag().pipeline() + "#" + context.componentTag().id() + " unexpectedly has illegal access", e ); diff --git a/core/src/main/java/tech/illuin/pipeline/step/variant/IndexableStep.java b/core/src/main/java/tech/illuin/pipeline/step/variant/IndexableStep.java index 5e90b403..62f076b8 100644 --- a/core/src/main/java/tech/illuin/pipeline/step/variant/IndexableStep.java +++ b/core/src/main/java/tech/illuin/pipeline/step/variant/IndexableStep.java @@ -1,6 +1,7 @@ package tech.illuin.pipeline.step.variant; import tech.illuin.pipeline.context.Context; +import tech.illuin.pipeline.context.LocalContext; import tech.illuin.pipeline.input.indexer.Indexable; import tech.illuin.pipeline.output.Output; import tech.illuin.pipeline.step.Step; @@ -18,7 +19,7 @@ public interface IndexableStep extends Step /** * @see #execute(Indexable, ResultView, Context) */ - default Result execute(T object, Object input, Object payload, ResultView results, Context context) throws Exception + default Result execute(T object, Object input, Object payload, ResultView results, LocalContext context) throws Exception { return this.execute(object, results, context); } diff --git a/core/src/main/java/tech/illuin/pipeline/step/variant/InputStep.java b/core/src/main/java/tech/illuin/pipeline/step/variant/InputStep.java index 7f6c018b..f68c848a 100644 --- a/core/src/main/java/tech/illuin/pipeline/step/variant/InputStep.java +++ b/core/src/main/java/tech/illuin/pipeline/step/variant/InputStep.java @@ -1,6 +1,7 @@ package tech.illuin.pipeline.step.variant; import tech.illuin.pipeline.context.Context; +import tech.illuin.pipeline.context.LocalContext; import tech.illuin.pipeline.input.indexer.Indexable; import tech.illuin.pipeline.step.Step; import tech.illuin.pipeline.step.result.Result; @@ -14,7 +15,7 @@ public interface InputStep extends Step { Result execute(I input, ResultView results, Context context) throws Exception; - default Result execute(Indexable object, I input, Object payload, ResultView results, Context context) throws Exception + default Result execute(Indexable object, I input, Object payload, ResultView results, LocalContext context) throws Exception { return this.execute(input, results, context); } diff --git a/core/src/main/java/tech/illuin/pipeline/step/variant/PayloadStep.java b/core/src/main/java/tech/illuin/pipeline/step/variant/PayloadStep.java index adb59615..e5a23b23 100644 --- a/core/src/main/java/tech/illuin/pipeline/step/variant/PayloadStep.java +++ b/core/src/main/java/tech/illuin/pipeline/step/variant/PayloadStep.java @@ -1,6 +1,7 @@ package tech.illuin.pipeline.step.variant; import tech.illuin.pipeline.context.Context; +import tech.illuin.pipeline.context.LocalContext; import tech.illuin.pipeline.input.indexer.Indexable; import tech.illuin.pipeline.step.Step; import tech.illuin.pipeline.step.result.Result; @@ -14,7 +15,7 @@ public interface PayloadStep extends Step { Result execute(Object payload, ResultView results, Context context) throws Exception; - default Result execute(Indexable object, Object input, Object payload, ResultView results, Context context) throws Exception + default Result execute(Indexable object, Object input, Object payload, ResultView results, LocalContext context) throws Exception { return this.execute(payload, results, context); } diff --git a/core/src/main/java/tech/illuin/pipeline/step/variant/PipelineStep.java b/core/src/main/java/tech/illuin/pipeline/step/variant/PipelineStep.java index 8146ea52..1762b8e7 100644 --- a/core/src/main/java/tech/illuin/pipeline/step/variant/PipelineStep.java +++ b/core/src/main/java/tech/illuin/pipeline/step/variant/PipelineStep.java @@ -3,7 +3,6 @@ import tech.illuin.pipeline.Pipeline; import tech.illuin.pipeline.PipelineException; import tech.illuin.pipeline.annotation.Experimental; -import tech.illuin.pipeline.context.Context; import tech.illuin.pipeline.context.LocalContext; import tech.illuin.pipeline.context.SimpleContext; import tech.illuin.pipeline.input.indexer.Indexable; @@ -32,22 +31,19 @@ public PipelineStep(Pipeline pipeline, Function resultMapper) } @Override - public Result execute(Indexable object, I input, Object payload, ResultView results, Context context) throws Exception + public Result execute(Indexable object, I input, Object payload, ResultView results, LocalContext context) throws Exception { try { - if (!(context instanceof LocalContext localContext)) - throw new IllegalArgumentException("The provided context is not a LocalContext"); - /* TODO: Bad design -> this should be challenged ASAP */ Output prev = new Output( - localContext.pipelineTag(), + context.pipelineTag(), payload, context ); prev.results().register(results); Output out = this.pipeline.run(input, new SimpleContext(prev) .copyFrom(context) - .set(PARENT_PIPELINE, localContext.pipelineTag()) + .set(PARENT_PIPELINE, context.pipelineTag()) ); return this.resultMapper.apply(out); } diff --git a/core/src/test/java/tech/illuin/pipeline/builder/PipelineBuilderTest.java b/core/src/test/java/tech/illuin/pipeline/builder/PipelineBuilderTest.java index 459a295b..51f0799c 100644 --- a/core/src/test/java/tech/illuin/pipeline/builder/PipelineBuilderTest.java +++ b/core/src/test/java/tech/illuin/pipeline/builder/PipelineBuilderTest.java @@ -10,7 +10,6 @@ import tech.illuin.pipeline.execution.error.PipelineErrorHandler; import tech.illuin.pipeline.input.author_resolver.AuthorResolver; import tech.illuin.pipeline.input.indexer.Indexable; -import tech.illuin.pipeline.input.initializer.Initializer; import tech.illuin.pipeline.input.initializer.builder.InitializerAssembler; import tech.illuin.pipeline.input.initializer.builder.InitializerDescriptor; import tech.illuin.pipeline.observer.Observer; diff --git a/core/src/test/java/tech/illuin/pipeline/execution/phase/PipelinePhaseTest.java b/core/src/test/java/tech/illuin/pipeline/execution/phase/PipelinePhaseTest.java index d8fcc30e..78868cd2 100644 --- a/core/src/test/java/tech/illuin/pipeline/execution/phase/PipelinePhaseTest.java +++ b/core/src/test/java/tech/illuin/pipeline/execution/phase/PipelinePhaseTest.java @@ -4,7 +4,6 @@ import org.junit.jupiter.api.Test; import tech.illuin.pipeline.Pipeline; import tech.illuin.pipeline.builder.SimplePipelineBuilder; -import tech.illuin.pipeline.builder.VoidPayload; import tech.illuin.pipeline.generic.pipeline.TestResult; import tech.illuin.pipeline.step.execution.evaluator.ResultEvaluator; import tech.illuin.pipeline.step.execution.evaluator.StepStrategy; diff --git a/core/src/test/java/tech/illuin/pipeline/generic/pipeline/initializer/TestAnnotatedInitializers.java b/core/src/test/java/tech/illuin/pipeline/generic/pipeline/initializer/TestAnnotatedInitializers.java index 425dbd69..69043ada 100644 --- a/core/src/test/java/tech/illuin/pipeline/generic/pipeline/initializer/TestAnnotatedInitializers.java +++ b/core/src/test/java/tech/illuin/pipeline/generic/pipeline/initializer/TestAnnotatedInitializers.java @@ -2,7 +2,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import tech.illuin.pipeline.context.Context; +import tech.illuin.pipeline.context.LocalContext; import tech.illuin.pipeline.generic.pipeline.initializer.execution.error.WrapAll; import tech.illuin.pipeline.input.indexer.Indexable; import tech.illuin.pipeline.input.initializer.Initializer; @@ -42,7 +42,7 @@ public AnnotatedInitializer(String name, BiFunction function } @Override - public P initialize(I input, Context context, UIDGenerator generator) + public P initialize(I input, LocalContext context, UIDGenerator generator) { logger.info("test:{}: ~", this.name); return this.function.apply(generator, input); diff --git a/core/src/test/java/tech/illuin/pipeline/generic/pipeline/initializer/execution/error/WrapAll.java b/core/src/test/java/tech/illuin/pipeline/generic/pipeline/initializer/execution/error/WrapAll.java index 9f8f9aae..eb7fddeb 100644 --- a/core/src/test/java/tech/illuin/pipeline/generic/pipeline/initializer/execution/error/WrapAll.java +++ b/core/src/test/java/tech/illuin/pipeline/generic/pipeline/initializer/execution/error/WrapAll.java @@ -1,6 +1,6 @@ package tech.illuin.pipeline.generic.pipeline.initializer.execution.error; -import tech.illuin.pipeline.context.Context; +import tech.illuin.pipeline.context.LocalContext; import tech.illuin.pipeline.generic.model.A; import tech.illuin.pipeline.input.initializer.execution.error.InitializerErrorHandler; import tech.illuin.pipeline.input.uid_generator.UIDGenerator; @@ -13,7 +13,7 @@ public class WrapAll implements InitializerErrorHandler { @Override - public A handle(Exception exception, Context context, UIDGenerator generator) + public A handle(Exception exception, LocalContext context, UIDGenerator generator) { return new A(generator.generate(), Collections.emptyList()); } diff --git a/core/src/test/java/tech/illuin/pipeline/generic/pipeline/sink/TestAnnotatedSinks.java b/core/src/test/java/tech/illuin/pipeline/generic/pipeline/sink/TestAnnotatedSinks.java index 257f60ec..6cb4b34b 100644 --- a/core/src/test/java/tech/illuin/pipeline/generic/pipeline/sink/TestAnnotatedSinks.java +++ b/core/src/test/java/tech/illuin/pipeline/generic/pipeline/sink/TestAnnotatedSinks.java @@ -2,7 +2,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import tech.illuin.pipeline.context.Context; +import tech.illuin.pipeline.context.LocalContext; import tech.illuin.pipeline.generic.pipeline.sink.execution.error.WrapAll; import tech.illuin.pipeline.input.indexer.Indexable; import tech.illuin.pipeline.output.Output; @@ -49,7 +49,7 @@ public AnnotatedSink(String name) } @Override - public void execute(Output output, Context context) + public void execute(Output output, LocalContext context) { logger.info("test:{}: ~", this.name); this.function.accept(output.payload()); diff --git a/core/src/test/java/tech/illuin/pipeline/generic/pipeline/sink/execution/error/WrapAll.java b/core/src/test/java/tech/illuin/pipeline/generic/pipeline/sink/execution/error/WrapAll.java index de52f6b3..dd34d816 100644 --- a/core/src/test/java/tech/illuin/pipeline/generic/pipeline/sink/execution/error/WrapAll.java +++ b/core/src/test/java/tech/illuin/pipeline/generic/pipeline/sink/execution/error/WrapAll.java @@ -2,7 +2,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import tech.illuin.pipeline.context.Context; +import tech.illuin.pipeline.context.LocalContext; import tech.illuin.pipeline.output.Output; import tech.illuin.pipeline.sink.execution.error.SinkErrorHandler; @@ -14,7 +14,7 @@ public class WrapAll implements SinkErrorHandler private static final Logger logger = LoggerFactory.getLogger(WrapAll.class); @Override - public void handle(Exception exception, Output output, Context context) + public void handle(Exception exception, Output output, LocalContext context) { logger.error(exception.getMessage()); } diff --git a/core/src/test/java/tech/illuin/pipeline/generic/pipeline/step/execution/error/WrapAll.java b/core/src/test/java/tech/illuin/pipeline/generic/pipeline/step/execution/error/WrapAll.java index efe4eb85..fa3e009b 100644 --- a/core/src/test/java/tech/illuin/pipeline/generic/pipeline/step/execution/error/WrapAll.java +++ b/core/src/test/java/tech/illuin/pipeline/generic/pipeline/step/execution/error/WrapAll.java @@ -1,6 +1,6 @@ package tech.illuin.pipeline.generic.pipeline.step.execution.error; -import tech.illuin.pipeline.context.Context; +import tech.illuin.pipeline.context.LocalContext; import tech.illuin.pipeline.generic.pipeline.TestResult; import tech.illuin.pipeline.step.execution.error.StepErrorHandler; import tech.illuin.pipeline.step.result.Result; @@ -12,7 +12,7 @@ public class WrapAll implements StepErrorHandler { @Override - public Result handle(Exception exception, Object input, Object payload, Results results, Context context) + public Result handle(Exception exception, Object input, Object payload, Results results, LocalContext context) { return new TestResult("error", "ko"); } diff --git a/core/src/test/java/tech/illuin/pipeline/input/uid_generator/UIDGeneratorTest.java b/core/src/test/java/tech/illuin/pipeline/input/uid_generator/UIDGeneratorTest.java index b3a9c1bb..9e7ab208 100644 --- a/core/src/test/java/tech/illuin/pipeline/input/uid_generator/UIDGeneratorTest.java +++ b/core/src/test/java/tech/illuin/pipeline/input/uid_generator/UIDGeneratorTest.java @@ -8,7 +8,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import tech.illuin.pipeline.Pipeline; -import tech.illuin.pipeline.builder.VoidPayload; import tech.illuin.pipeline.generic.pipeline.TestResult; import java.util.UUID; diff --git a/core/src/test/java/tech/illuin/pipeline/metrics/DebugMDCManager.java b/core/src/test/java/tech/illuin/pipeline/metrics/DebugMDCManager.java index ef7d7da6..df2d98da 100644 --- a/core/src/test/java/tech/illuin/pipeline/metrics/DebugMDCManager.java +++ b/core/src/test/java/tech/illuin/pipeline/metrics/DebugMDCManager.java @@ -1,7 +1,7 @@ package tech.illuin.pipeline.metrics; import org.slf4j.spi.MDCAdapter; -import tech.illuin.pipeline.metering.marker.MDCManager; +import tech.illuin.pipeline.metering.mdc.MDCManager; public final class DebugMDCManager implements MDCManager { diff --git a/core/src/test/java/tech/illuin/pipeline/metrics/MetricFunctionsTest.java b/core/src/test/java/tech/illuin/pipeline/metrics/MetricFunctionsTest.java index caabd09e..8fdb61b1 100644 --- a/core/src/test/java/tech/illuin/pipeline/metrics/MetricFunctionsTest.java +++ b/core/src/test/java/tech/illuin/pipeline/metrics/MetricFunctionsTest.java @@ -6,10 +6,11 @@ import tech.illuin.pipeline.metering.MeterRegistryKey; import tech.illuin.pipeline.metering.MetricFunctions; -import java.util.*; +import java.util.Collection; +import java.util.List; +import java.util.Map; import java.util.stream.Collectors; -import static java.util.Collections.emptyList; import static tech.illuin.pipeline.metering.MeterRegistryKey.PIPELINE_RUN_KEY; /** @@ -26,7 +27,7 @@ public void testCombineTags__shouldSucceed() Collection combined = Assertions.assertDoesNotThrow(() -> MetricFunctions.combine(listA, listB)); Assertions.assertEquals(3, combined.size()); - Assertions.assertEquals("c", combined.toArray(new Tag[3])[2].getKey()); + Assertions.assertEquals("c", combined.stream().sorted().toList().toArray(new Tag[3])[2].getKey()); } @Test diff --git a/core/src/test/java/tech/illuin/pipeline/metrics/PipelineInitializerMetricsTest.java b/core/src/test/java/tech/illuin/pipeline/metrics/PipelineInitializerMetricsTest.java index 25a06071..18928505 100644 --- a/core/src/test/java/tech/illuin/pipeline/metrics/PipelineInitializerMetricsTest.java +++ b/core/src/test/java/tech/illuin/pipeline/metrics/PipelineInitializerMetricsTest.java @@ -5,8 +5,9 @@ import org.junit.jupiter.api.Test; import org.slf4j.helpers.BasicMDCAdapter; import org.slf4j.spi.MDCAdapter; +import tech.illuin.pipeline.input.initializer.metering.InitializationMarkerManager; import tech.illuin.pipeline.input.uid_generator.TSIDGenerator; -import tech.illuin.pipeline.metering.PipelineInitializationMetrics; +import tech.illuin.pipeline.input.initializer.metering.InitializationMetrics; import tech.illuin.pipeline.metering.tag.MetricTags; import tech.illuin.pipeline.output.ComponentFamily; import tech.illuin.pipeline.output.ComponentTag; @@ -24,10 +25,12 @@ public class PipelineInitializerMetricsTest @Test public void testCreate() { - Assertions.assertDoesNotThrow(() -> new PipelineInitializationMetrics( + Assertions.assertDoesNotThrow(() -> new InitializationMetrics( new SimpleMeterRegistry(), - createTag("test-create", "test-create-initializer"), - new MetricTags().put("test", "true") + new InitializationMarkerManager( + createTag("test-create", "test-create-initializer"), + new MetricTags().put("test", "true") + ) )); } @@ -35,10 +38,12 @@ public void testCreate() public void testMDC() { MDCAdapter adapter = new BasicMDCAdapter(); - PipelineInitializationMetrics metrics = Assertions.assertDoesNotThrow(() -> new PipelineInitializationMetrics( + InitializationMetrics metrics = Assertions.assertDoesNotThrow(() -> new InitializationMetrics( new SimpleMeterRegistry(), - createTag("test-mdc", "test-mdc-initializer"), - new MetricTags().put("test", "true"), + new InitializationMarkerManager( + createTag("test-mdc", "test-mdc-initializer"), + new MetricTags().put("test", "true") + ), new DebugMDCManager(adapter) )); @@ -58,10 +63,12 @@ public void testMDC() public void testMDCException() { MDCAdapter adapter = new BasicMDCAdapter(); - PipelineInitializationMetrics metrics = Assertions.assertDoesNotThrow(() -> new PipelineInitializationMetrics( + InitializationMetrics metrics = Assertions.assertDoesNotThrow(() -> new InitializationMetrics( new SimpleMeterRegistry(), - createTag("test-mdc-exception", "test-mdc-initializer-exception"), - new MetricTags().put("test", "true"), + new InitializationMarkerManager( + createTag("test-mdc-exception", "test-mdc-initializer-exception"), + new MetricTags().put("test", "true") + ), new DebugMDCManager(adapter) )); diff --git a/core/src/test/java/tech/illuin/pipeline/metrics/PipelineMetricsTest.java b/core/src/test/java/tech/illuin/pipeline/metrics/PipelineMetricsTest.java index 51d1791d..8ef21852 100644 --- a/core/src/test/java/tech/illuin/pipeline/metrics/PipelineMetricsTest.java +++ b/core/src/test/java/tech/illuin/pipeline/metrics/PipelineMetricsTest.java @@ -6,6 +6,7 @@ import org.slf4j.helpers.BasicMDCAdapter; import org.slf4j.spi.MDCAdapter; import tech.illuin.pipeline.input.uid_generator.TSIDGenerator; +import tech.illuin.pipeline.metering.PipelineMarkerManager; import tech.illuin.pipeline.metering.PipelineMetrics; import tech.illuin.pipeline.metering.tag.MetricTags; import tech.illuin.pipeline.output.PipelineTag; @@ -24,8 +25,10 @@ public void testCreate() { Assertions.assertDoesNotThrow(() -> new PipelineMetrics( new SimpleMeterRegistry(), - createTag("test-create"), - new MetricTags().put("test", "true") + new PipelineMarkerManager( + createTag("test-create"), + new MetricTags().put("test", "true") + ) )); } @@ -35,8 +38,10 @@ public void testMDC() MDCAdapter adapter = new BasicMDCAdapter(); PipelineMetrics metrics = Assertions.assertDoesNotThrow(() -> new PipelineMetrics( new SimpleMeterRegistry(), - createTag("test-mdc"), - new MetricTags().put("test", "true"), + new PipelineMarkerManager( + createTag("test-mdc"), + new MetricTags().put("test", "true") + ), new DebugMDCManager(adapter) )); @@ -64,8 +69,10 @@ public void testMDC_shouldHandleNulls() MDCAdapter adapter = new BasicMDCAdapter(); PipelineMetrics metrics = Assertions.assertDoesNotThrow(() -> new PipelineMetrics( new SimpleMeterRegistry(), - new PipelineTag(TSIDGenerator.INSTANCE.generate(), "test-mdc-nullable", null), - new MetricTags().put("test", "true"), + new PipelineMarkerManager( + new PipelineTag(TSIDGenerator.INSTANCE.generate(), "test-mdc-nullable", null), + new MetricTags().put("test", "true") + ), new DebugMDCManager(adapter) )); @@ -86,8 +93,10 @@ public void testMDCException() MDCAdapter adapter = new BasicMDCAdapter(); PipelineMetrics metrics = Assertions.assertDoesNotThrow(() -> new PipelineMetrics( new SimpleMeterRegistry(), - createTag("test-mark-exception"), - new MetricTags().put("test", "true"), + new PipelineMarkerManager( + createTag("test-mark-exception"), + new MetricTags().put("test", "true") + ), new DebugMDCManager(adapter) )); diff --git a/core/src/test/java/tech/illuin/pipeline/metrics/PipelineSinkMetricsTest.java b/core/src/test/java/tech/illuin/pipeline/metrics/SinkMetricsTest.java similarity index 72% rename from core/src/test/java/tech/illuin/pipeline/metrics/PipelineSinkMetricsTest.java rename to core/src/test/java/tech/illuin/pipeline/metrics/SinkMetricsTest.java index dc477496..860da5cd 100644 --- a/core/src/test/java/tech/illuin/pipeline/metrics/PipelineSinkMetricsTest.java +++ b/core/src/test/java/tech/illuin/pipeline/metrics/SinkMetricsTest.java @@ -6,7 +6,8 @@ import org.slf4j.helpers.BasicMDCAdapter; import org.slf4j.spi.MDCAdapter; import tech.illuin.pipeline.input.uid_generator.TSIDGenerator; -import tech.illuin.pipeline.metering.PipelineSinkMetrics; +import tech.illuin.pipeline.sink.metering.SinkMarkerManager; +import tech.illuin.pipeline.sink.metering.SinkMetrics; import tech.illuin.pipeline.metering.tag.MetricTags; import tech.illuin.pipeline.output.ComponentFamily; import tech.illuin.pipeline.output.ComponentTag; @@ -19,15 +20,17 @@ /** * @author Pierre Lecerf (pierre.lecerf@illuin.tech) */ -public class PipelineSinkMetricsTest +public class SinkMetricsTest { @Test public void testCreate() { - Assertions.assertDoesNotThrow(() -> new PipelineSinkMetrics( + Assertions.assertDoesNotThrow(() -> new SinkMetrics( new SimpleMeterRegistry(), - createTag("test-create", "test-create-sink"), - new MetricTags().put("test", "true") + new SinkMarkerManager( + createTag("test-create", "test-create-sink"), + new MetricTags().put("test", "true") + ) )); } @@ -35,10 +38,12 @@ public void testCreate() public void testMDC() { MDCAdapter adapter = new BasicMDCAdapter(); - PipelineSinkMetrics metrics = Assertions.assertDoesNotThrow(() -> new PipelineSinkMetrics( + SinkMetrics metrics = Assertions.assertDoesNotThrow(() -> new SinkMetrics( new SimpleMeterRegistry(), - createTag("test-mdc", "test-mdc-sink"), - new MetricTags().put("test", "true"), + new SinkMarkerManager( + createTag("test-mdc", "test-mdc-sink"), + new MetricTags().put("test", "true") + ), new DebugMDCManager(adapter) )); @@ -58,10 +63,12 @@ public void testMDC() public void testMDCException() { MDCAdapter adapter = new BasicMDCAdapter(); - PipelineSinkMetrics metrics = Assertions.assertDoesNotThrow(() -> new PipelineSinkMetrics( + SinkMetrics metrics = Assertions.assertDoesNotThrow(() -> new SinkMetrics( new SimpleMeterRegistry(), - createTag("test-mdc-exception", "test-mdc-sink-exception"), - new MetricTags().put("test", "true"), + new SinkMarkerManager( + createTag("test-mdc-exception", "test-mdc-sink-exception"), + new MetricTags().put("test", "true") + ), new DebugMDCManager(adapter) )); diff --git a/core/src/test/java/tech/illuin/pipeline/metrics/PipelineStepMetricsTest.java b/core/src/test/java/tech/illuin/pipeline/metrics/StepMetricsTest.java similarity index 72% rename from core/src/test/java/tech/illuin/pipeline/metrics/PipelineStepMetricsTest.java rename to core/src/test/java/tech/illuin/pipeline/metrics/StepMetricsTest.java index 39b854a3..7ab3e80c 100644 --- a/core/src/test/java/tech/illuin/pipeline/metrics/PipelineStepMetricsTest.java +++ b/core/src/test/java/tech/illuin/pipeline/metrics/StepMetricsTest.java @@ -6,7 +6,8 @@ import org.slf4j.helpers.BasicMDCAdapter; import org.slf4j.spi.MDCAdapter; import tech.illuin.pipeline.input.uid_generator.TSIDGenerator; -import tech.illuin.pipeline.metering.PipelineStepMetrics; +import tech.illuin.pipeline.step.metering.StepMarkerManager; +import tech.illuin.pipeline.step.metering.StepMetrics; import tech.illuin.pipeline.metering.tag.MetricTags; import tech.illuin.pipeline.output.ComponentFamily; import tech.illuin.pipeline.output.ComponentTag; @@ -19,15 +20,17 @@ /** * @author Pierre Lecerf (pierre.lecerf@illuin.tech) */ -public class PipelineStepMetricsTest +public class StepMetricsTest { @Test public void testCreate() { - Assertions.assertDoesNotThrow(() -> new PipelineStepMetrics( + Assertions.assertDoesNotThrow(() -> new StepMetrics( new SimpleMeterRegistry(), - createTag("test-create", "test-create-step"), - new MetricTags().put("test", "true") + new StepMarkerManager( + createTag("test-create", "test-create-step"), + new MetricTags().put("test", "true") + ) )); } @@ -35,10 +38,12 @@ public void testCreate() public void testMDC() { MDCAdapter adapter = new BasicMDCAdapter(); - PipelineStepMetrics metrics = Assertions.assertDoesNotThrow(() -> new PipelineStepMetrics( + StepMetrics metrics = Assertions.assertDoesNotThrow(() -> new StepMetrics( new SimpleMeterRegistry(), - createTag("test-mdc", "test-mdc-step"), - new MetricTags().put("test", "true"), + new StepMarkerManager( + createTag("test-mdc", "test-mdc-step"), + new MetricTags().put("test", "true") + ), new DebugMDCManager(adapter) )); @@ -58,10 +63,12 @@ public void testMDC() public void testMDCException() { MDCAdapter adapter = new BasicMDCAdapter(); - PipelineStepMetrics metrics = Assertions.assertDoesNotThrow(() -> new PipelineStepMetrics( + StepMetrics metrics = Assertions.assertDoesNotThrow(() -> new StepMetrics( new SimpleMeterRegistry(), - createTag("test-mdc-exception", "test-mdc-step-exception"), - new MetricTags().put("test", "true"), + new StepMarkerManager( + createTag("test-mdc-exception", "test-mdc-step-exception"), + new MetricTags().put("test", "true") + ), new DebugMDCManager(adapter) )); diff --git a/core/src/test/java/tech/illuin/pipeline/sink/SinkErrorHandlerTest.java b/core/src/test/java/tech/illuin/pipeline/sink/SinkErrorHandlerTest.java index e38e91d3..a925fc87 100644 --- a/core/src/test/java/tech/illuin/pipeline/sink/SinkErrorHandlerTest.java +++ b/core/src/test/java/tech/illuin/pipeline/sink/SinkErrorHandlerTest.java @@ -3,7 +3,6 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import tech.illuin.pipeline.Pipeline; -import tech.illuin.pipeline.builder.VoidPayload; import tech.illuin.pipeline.sink.execution.error.SinkErrorHandler; import java.util.concurrent.atomic.AtomicInteger; diff --git a/core/src/test/java/tech/illuin/pipeline/sink/SinkTest.java b/core/src/test/java/tech/illuin/pipeline/sink/SinkTest.java index 049d45ae..2a8514ba 100644 --- a/core/src/test/java/tech/illuin/pipeline/sink/SinkTest.java +++ b/core/src/test/java/tech/illuin/pipeline/sink/SinkTest.java @@ -3,7 +3,6 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import tech.illuin.pipeline.Pipeline; -import tech.illuin.pipeline.builder.VoidPayload; import java.util.concurrent.atomic.AtomicInteger; diff --git a/core/src/test/java/tech/illuin/pipeline/sink/annotation/PipelineSinkAnnotationTest.java b/core/src/test/java/tech/illuin/pipeline/sink/annotation/PipelineSinkAnnotationTest.java index 01f41428..2f4ee6df 100644 --- a/core/src/test/java/tech/illuin/pipeline/sink/annotation/PipelineSinkAnnotationTest.java +++ b/core/src/test/java/tech/illuin/pipeline/sink/annotation/PipelineSinkAnnotationTest.java @@ -319,44 +319,63 @@ public void testPipeline__shouldCompile_uidGenerator() Assertions.assertEquals("KSUIDGenerator", collector.get()); } + @Test + public void testPipeline__shouldCompile_observabilityManager() + { + StringCollector collector = new StringCollector(); + Pipeline pipeline = Assertions.assertDoesNotThrow(() -> createPipeline_observabilityManager("observability-manager", collector)); + + Assertions.assertDoesNotThrow(() -> pipeline.run("input")); + Assertions.assertDoesNotThrow(pipeline::close); + + Assertions.assertEquals("SimpleMeterRegistry", collector.get()); + } + + @Test + public void testPipeline__shouldCompile_markerManager() + { + StringCollector collector = new StringCollector(); + Pipeline pipeline = Assertions.assertDoesNotThrow(() -> createPipeline_markerManager("test-marker-manager", collector)); + + Assertions.assertDoesNotThrow(() -> pipeline.run("input")); + Assertions.assertDoesNotThrow(pipeline::close); + + Assertions.assertEquals("SinkMarkerManager", collector.get()); + } + public static Pipeline createPipeline_input(String name, StringCollector collector) { return Pipeline.of(name) .registerSink(new SinkWithInput<>(collector)) - .build() - ; + .build(); } public static Pipeline createPipeline_input_assembler(String name, StringCollector collector) { return Pipeline.of(name) .registerSink(builder -> builder.sink(new SinkWithInput<>(collector))) - .build() - ; + .build(); } public static Pipeline createPipeline_input_of(String name, StringCollector collector) { return Pipeline.of(name) .registerSink(Sink.of(new SinkWithInput<>(collector))) - .build() - ; + .build(); } public static Pipeline createPipeline_input_exception(String name) { return Pipeline.of(name) .registerSink(Sink.of(new SinkWithException<>())) - .build() - ; + .build(); } public static Pipeline createPipeline_input_returnValue(String name, StringCollector collector) { return Pipeline.of(name) .registerSink(new SinkWithInputAndReturnValue<>(collector)) - .build() - ; + .build(); } public static Pipeline createPipeline_payload(String name, StringCollector collector) @@ -364,8 +383,7 @@ public static Pipeline createPipeline_payload(String name, StringCollect return Pipeline.of(name, (Initializer) (input, context, generator) -> new TestPayload(new TestObject(generator.generate()))) .registerIndexer((SingleIndexer) TestPayload::object) .registerSink(new SinkWithPayload(collector)) - .build() - ; + .build(); } public static Pipeline createPipeline_output(String name, StringCollector collector) @@ -373,8 +391,7 @@ public static Pipeline createPipeline_output(String name, StringCollecto return Pipeline.of(name, (Initializer) (input, context, generator) -> new TestPayload(new TestObject(generator.generate()))) .registerIndexer((SingleIndexer) TestPayload::object) .registerSink(new SinkWithOutput(collector)) - .build() - ; + .build(); } public static Pipeline createPipeline_current(String name, StringCollector collector) @@ -382,8 +399,7 @@ public static Pipeline createPipeline_current(String name, StringCollect return Pipeline.of(name) .registerStep(new StepWithInput<>()) .registerSink(new SinkWithInputAndCurrent<>(collector)) - .build() - ; + .build(); } public static Pipeline createPipeline_currentOptional(String name, StringCollector collector) @@ -391,8 +407,7 @@ public static Pipeline createPipeline_currentOptional(String name, Strin return Pipeline.of(name) .registerStep(new StepWithInput<>()) .registerSink(new SinkWithInputAndCurrentOptional<>(collector)) - .build() - ; + .build(); } public static Pipeline createPipeline_currentStream(String name, StringCollector collector) @@ -400,8 +415,7 @@ public static Pipeline createPipeline_currentStream(String name, StringC return Pipeline.of(name) .registerStep(new StepWithInput<>()) .registerSink(new SinkWithInputAndCurrentStream<>(collector)) - .build() - ; + .build(); } public static Pipeline createPipeline_currentNamed(String name, StringCollector collector) @@ -409,8 +423,7 @@ public static Pipeline createPipeline_currentNamed(String name, StringCo return Pipeline.of(name) .registerStep(new StepWithInput<>("annotation-named")) .registerSink(new SinkWithInputAndCurrent.Named<>(collector)) - .build() - ; + .build(); } public static Pipeline createPipeline_currentOptionalNamed(String name, StringCollector collector) @@ -418,8 +431,7 @@ public static Pipeline createPipeline_currentOptionalNamed(String name, return Pipeline.of(name) .registerStep(new StepWithInput<>("annotation-named")) .registerSink(new SinkWithInputAndCurrentOptional.Named<>(collector)) - .build() - ; + .build(); } public static Pipeline createPipeline_currentStreamNamed(String name, StringCollector collector) @@ -427,8 +439,7 @@ public static Pipeline createPipeline_currentStreamNamed(String name, St return Pipeline.of(name) .registerStep(new StepWithInput<>("annotation-named")) .registerSink(new SinkWithInputAndCurrentStream.Named<>(collector)) - .build() - ; + .build(); } public static Pipeline createPipeline_latest(String name, StringCollector collector) @@ -436,8 +447,7 @@ public static Pipeline createPipeline_latest(String name, StringCollecto return Pipeline.of(name) .registerStep(new StepWithInput<>()) .registerSink(new SinkWithInputAndLatest<>(collector)) - .build() - ; + .build(); } public static Pipeline createPipeline_latestOptional(String name, StringCollector collector) @@ -445,8 +455,7 @@ public static Pipeline createPipeline_latestOptional(String name, String return Pipeline.of(name) .registerStep(new StepWithInput<>()) .registerSink(new SinkWithInputAndLatestOptional<>(collector)) - .build() - ; + .build(); } public static Pipeline createPipeline_latestStream(String name, StringCollector collector) @@ -454,8 +463,7 @@ public static Pipeline createPipeline_latestStream(String name, StringCo return Pipeline.of(name) .registerStep(new StepWithInput<>()) .registerSink(new SinkWithInputAndLatestStream<>(collector)) - .build() - ; + .build(); } public static Pipeline createPipeline_latestNamed(String name, StringCollector collector) @@ -463,8 +471,7 @@ public static Pipeline createPipeline_latestNamed(String name, StringCol return Pipeline.of(name) .registerStep(new StepWithInput<>("annotation-named")) .registerSink(new SinkWithInputAndLatest.Named<>(collector)) - .build() - ; + .build(); } public static Pipeline createPipeline_latestOptionalNamed(String name, StringCollector collector) @@ -472,8 +479,7 @@ public static Pipeline createPipeline_latestOptionalNamed(String name, S return Pipeline.of(name) .registerStep(new StepWithInput<>("annotation-named")) .registerSink(new SinkWithInputAndLatestOptional.Named<>(collector)) - .build() - ; + .build(); } public static Pipeline createPipeline_latestStreamNamed(String name, StringCollector collector) @@ -481,16 +487,14 @@ public static Pipeline createPipeline_latestStreamNamed(String name, Str return Pipeline.of(name) .registerStep(new StepWithInput<>("annotation-named")) .registerSink(new SinkWithInputAndLatestStream.Named<>(collector)) - .build() - ; + .build(); } public static Pipeline createPipeline_tags(String name, StringCollector collector) { return Pipeline.of(name) .registerSink(new SinkWithTags(collector)) - .build() - ; + .build(); } public static Pipeline createPipeline_results(String name, StringCollector collector) @@ -498,16 +502,14 @@ public static Pipeline createPipeline_results(String name, StringCollect return Pipeline.of(name) .registerStep(new StepWithInput<>()) .registerSink(new SinkWithResults(collector)) - .build() - ; + .build(); } public static Pipeline createPipeline_context(String name, String metadataKey, StringCollector collector) { return Pipeline.of(name) .registerSink(new SinkWithContext(metadataKey, collector)) - .build() - ; + .build(); } public static Pipeline createPipeline_contextKey(String name, List collector) @@ -516,16 +518,28 @@ public static Pipeline createPipeline_contextKey(String name, List createPipeline_uidGenerator(String name, StringCollector collector) { return Pipeline.of(name) .registerSink(new SinkWithUIDGenerator(collector)) - .build() - ; + .build(); + } + + public static Pipeline createPipeline_observabilityManager(String name, StringCollector collector) + { + return Pipeline.of(name) + .registerSink(new SinkWithObservabilityManager(collector)) + .build(); + } + + public static Pipeline createPipeline_markerManager(String name, StringCollector collector) + { + return Pipeline.of(name) + .registerSink(new SinkWithMarkerManager(collector)) + .build(); } public static class StringCollector diff --git a/core/src/test/java/tech/illuin/pipeline/sink/annotation/sink/SinkWithMarkerManager.java b/core/src/test/java/tech/illuin/pipeline/sink/annotation/sink/SinkWithMarkerManager.java new file mode 100644 index 00000000..52af6259 --- /dev/null +++ b/core/src/test/java/tech/illuin/pipeline/sink/annotation/sink/SinkWithMarkerManager.java @@ -0,0 +1,29 @@ +package tech.illuin.pipeline.sink.annotation.sink; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import tech.illuin.pipeline.metering.MarkerManager; +import tech.illuin.pipeline.sink.annotation.PipelineSinkAnnotationTest.StringCollector; +import tech.illuin.pipeline.sink.annotation.SinkConfig; + +/** + * @author Pierre Lecerf (pierre.lecerf@illuin.tech) + */ +public class SinkWithMarkerManager +{ + private final StringCollector collector; + + private static final Logger logger = LoggerFactory.getLogger(SinkWithMarkerManager.class); + + public SinkWithMarkerManager(StringCollector collector) + { + this.collector = collector; + } + + @SinkConfig(id = "sink-with_marker-manager") + public void execute(MarkerManager marker) + { + logger.info("markers={} tags={} discriminants={}", marker.markers(), marker.tags(), marker.discriminants()); + this.collector.update(marker.getClass().getSimpleName()); + } +} diff --git a/core/src/test/java/tech/illuin/pipeline/sink/annotation/sink/SinkWithObservabilityManager.java b/core/src/test/java/tech/illuin/pipeline/sink/annotation/sink/SinkWithObservabilityManager.java new file mode 100644 index 00000000..fdf84c6c --- /dev/null +++ b/core/src/test/java/tech/illuin/pipeline/sink/annotation/sink/SinkWithObservabilityManager.java @@ -0,0 +1,29 @@ +package tech.illuin.pipeline.sink.annotation.sink; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import tech.illuin.pipeline.metering.manager.ObservabilityManager; +import tech.illuin.pipeline.sink.annotation.PipelineSinkAnnotationTest.StringCollector; +import tech.illuin.pipeline.sink.annotation.SinkConfig; + +/** + * @author Pierre Lecerf (pierre.lecerf@illuin.tech) + */ +public class SinkWithObservabilityManager +{ + private final StringCollector collector; + + private static final Logger logger = LoggerFactory.getLogger(SinkWithObservabilityManager.class); + + public SinkWithObservabilityManager(StringCollector collector) + { + this.collector = collector; + } + + @SinkConfig(id = "sink-with_observability-manager") + public void execute(ObservabilityManager manager) + { + logger.info("observability-manager: meterRegistry={} tracer={}", manager.meterRegistry().getClass().getName(), manager.tracer().getClass().getName()); + this.collector.update(manager.meterRegistry().getClass().getSimpleName()); + } +} diff --git a/core/src/test/java/tech/illuin/pipeline/step/AnnotationTest.java b/core/src/test/java/tech/illuin/pipeline/step/AnnotationTest.java index 86c8a515..3839df4f 100644 --- a/core/src/test/java/tech/illuin/pipeline/step/AnnotationTest.java +++ b/core/src/test/java/tech/illuin/pipeline/step/AnnotationTest.java @@ -3,10 +3,12 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import tech.illuin.pipeline.Pipeline; +import tech.illuin.pipeline.context.LocalContext; import tech.illuin.pipeline.generic.TestFactory; import tech.illuin.pipeline.generic.model.A; import tech.illuin.pipeline.generic.pipeline.step.TestAnnotatedSteps; import tech.illuin.pipeline.input.indexer.SingleIndexer; +import tech.illuin.pipeline.input.uid_generator.UIDGenerator; import tech.illuin.pipeline.output.Output; import java.util.List; @@ -40,7 +42,7 @@ public void testPipeline_withAnnotatedSteps() public static Pipeline createAnnotatedPipeline() { - return Pipeline.of("test-annotated", TestFactory::initializerOfEmpty) + return Pipeline.of("test-annotated", (Void input, LocalContext context, UIDGenerator generator) -> TestFactory.initializerOfEmpty(input, context, generator)) .registerIndexer(SingleIndexer.auto()) .registerStep(new TestAnnotatedSteps.ClassActivation<>("1", "ok")) .registerStep(new TestAnnotatedSteps.ConditionActivation<>("2", "ok")) diff --git a/core/src/test/java/tech/illuin/pipeline/step/EvaluationTest.java b/core/src/test/java/tech/illuin/pipeline/step/EvaluationTest.java index 1eeccdfc..bbd30e72 100644 --- a/core/src/test/java/tech/illuin/pipeline/step/EvaluationTest.java +++ b/core/src/test/java/tech/illuin/pipeline/step/EvaluationTest.java @@ -3,9 +3,8 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import tech.illuin.pipeline.Pipeline; -import tech.illuin.pipeline.builder.VoidPayload; import tech.illuin.pipeline.context.ComponentContext; -import tech.illuin.pipeline.context.Context; +import tech.illuin.pipeline.context.LocalContext; import tech.illuin.pipeline.generic.TestFactory; import tech.illuin.pipeline.generic.model.A; import tech.illuin.pipeline.generic.model.B; @@ -13,12 +12,11 @@ import tech.illuin.pipeline.generic.pipeline.step.TestStep; import tech.illuin.pipeline.input.indexer.MultiIndexer; import tech.illuin.pipeline.input.indexer.SingleIndexer; +import tech.illuin.pipeline.input.uid_generator.UIDGenerator; import tech.illuin.pipeline.output.ComponentTag; import tech.illuin.pipeline.output.Output; import tech.illuin.pipeline.step.execution.evaluator.ResultEvaluator; import tech.illuin.pipeline.step.execution.interruption.Interruption; -import tech.illuin.pipeline.step.result.Result; -import tech.illuin.pipeline.step.result.ResultView; import tech.illuin.pipeline.step.variant.InputStep; import java.util.List; @@ -136,7 +134,7 @@ private static void testPipeline__onInterrupt(String name, ResultEvaluator evalu public static Pipeline createAbortingPipeline() { - return Pipeline.of("test-abort", TestFactory::initializer) + return Pipeline.of("test-abort", (Void input, LocalContext context, UIDGenerator generator) -> TestFactory.initializer(input, context, generator)) .registerIndexer(SingleIndexer.auto()) .registerIndexer((MultiIndexer) A::bs) .registerStep(builder -> builder diff --git a/core/src/test/java/tech/illuin/pipeline/step/PinTest.java b/core/src/test/java/tech/illuin/pipeline/step/PinTest.java index 2d225067..961c735f 100644 --- a/core/src/test/java/tech/illuin/pipeline/step/PinTest.java +++ b/core/src/test/java/tech/illuin/pipeline/step/PinTest.java @@ -3,12 +3,14 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import tech.illuin.pipeline.Pipeline; +import tech.illuin.pipeline.context.LocalContext; import tech.illuin.pipeline.generic.TestFactory; import tech.illuin.pipeline.generic.model.A; import tech.illuin.pipeline.generic.pipeline.TestResult; import tech.illuin.pipeline.generic.pipeline.step.TestStep; import tech.illuin.pipeline.input.indexer.MultiIndexer; import tech.illuin.pipeline.input.indexer.SingleIndexer; +import tech.illuin.pipeline.input.uid_generator.UIDGenerator; import tech.illuin.pipeline.output.Output; import tech.illuin.pipeline.step.execution.evaluator.ResultEvaluator; import tech.illuin.pipeline.step.result.Result; @@ -50,7 +52,7 @@ private void testPipeline_shouldHandleErrorWithPinned__runTest(ResultEvaluator e public static Pipeline createErrorHandledWithPinnedPipeline(ResultEvaluator evaluator) { - return Pipeline.of("test-error-handled-with-pinned", TestFactory::initializer) + return Pipeline.of("test-error-handled-with-pinned", (Void input, LocalContext context, UIDGenerator generator) -> TestFactory.initializer(input, context, generator)) .registerIndexer(SingleIndexer.auto()) .registerIndexer((MultiIndexer) A::bs) .registerStep(builder -> builder diff --git a/core/src/test/java/tech/illuin/pipeline/step/StepErrorHandlerTest.java b/core/src/test/java/tech/illuin/pipeline/step/StepErrorHandlerTest.java index cc968dd7..54239b88 100644 --- a/core/src/test/java/tech/illuin/pipeline/step/StepErrorHandlerTest.java +++ b/core/src/test/java/tech/illuin/pipeline/step/StepErrorHandlerTest.java @@ -3,6 +3,7 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import tech.illuin.pipeline.Pipeline; +import tech.illuin.pipeline.context.LocalContext; import tech.illuin.pipeline.generic.TestFactory; import tech.illuin.pipeline.generic.model.A; import tech.illuin.pipeline.generic.model.B; @@ -10,6 +11,7 @@ import tech.illuin.pipeline.generic.pipeline.step.TestStep; import tech.illuin.pipeline.input.indexer.MultiIndexer; import tech.illuin.pipeline.input.indexer.SingleIndexer; +import tech.illuin.pipeline.input.uid_generator.UIDGenerator; import tech.illuin.pipeline.output.Output; import tech.illuin.pipeline.step.execution.error.StepErrorHandler; import tech.illuin.pipeline.step.execution.evaluator.ResultEvaluator; @@ -81,7 +83,7 @@ public void testPipeline_firstErrorHandlerFails_shouldHandleError() public static Pipeline createErrorHandledPipeline() { - return Pipeline.of("test-error-handled", TestFactory::initializer) + return Pipeline.of("test-error-handled", (Void input, LocalContext context, UIDGenerator generator) -> TestFactory.initializer(input, context, generator)) .registerIndexer(SingleIndexer.auto()) .registerIndexer((MultiIndexer) A::bs) .registerStep(builder -> builder @@ -111,7 +113,7 @@ public static Pipeline createErrorHandledPipelineWithTwoErrorHandlers(Atom }; StepErrorHandler secondErrorhandler = (ex, in, payload, res, ctx) -> new TestResult("error", "ko"); - return Pipeline.of("test-composite-error-handled", TestFactory::initializer) + return Pipeline.of("test-composite-error-handled", (Void input, LocalContext context, UIDGenerator generator) -> TestFactory.initializer(input, context, generator)) .registerIndexer(SingleIndexer.auto()) .registerIndexer((MultiIndexer) A::bs) .registerStep(builder -> builder @@ -135,7 +137,7 @@ public static Pipeline createErrorHandledPipelineWithTwoErrorHandlers(Atom public static Pipeline createErrorThrownPipeline() { - return Pipeline.of("test-error-thrown", TestFactory::initializerOfEmpty) + return Pipeline.of("test-error-thrown", (Void input, LocalContext context, UIDGenerator generator) -> TestFactory.initializerOfEmpty(input, context, generator)) .registerIndexer(SingleIndexer.auto()) .registerStep(builder -> builder .step(new TestStep<>("1", "ok")) diff --git a/core/src/test/java/tech/illuin/pipeline/step/annotation/PipelineStepAnnotationTest.java b/core/src/test/java/tech/illuin/pipeline/step/annotation/PipelineStepAnnotationTest.java index b517be4f..50e969ec 100644 --- a/core/src/test/java/tech/illuin/pipeline/step/annotation/PipelineStepAnnotationTest.java +++ b/core/src/test/java/tech/illuin/pipeline/step/annotation/PipelineStepAnnotationTest.java @@ -352,6 +352,34 @@ public void testPipeline__shouldCompile_uidGenerator() ); } + @Test + public void testPipeline__shouldCompile_observabilityManager() + { + Pipeline pipeline = Assertions.assertDoesNotThrow(() -> createPipeline_observabilityManager("test-observability-manager")); + + Output output = Assertions.assertDoesNotThrow(() -> pipeline.run("input")); + Assertions.assertDoesNotThrow(pipeline::close); + + Assertions.assertEquals( + "SimpleMeterRegistry", + output.results().current(TestResult.class).map(TestResult::status).orElse(null) + ); + } + + @Test + public void testPipeline__shouldCompile_markerManager() + { + Pipeline pipeline = Assertions.assertDoesNotThrow(() -> createPipeline_markerManager("test-marker-manager")); + + Output output = Assertions.assertDoesNotThrow(() -> pipeline.run("input")); + Assertions.assertDoesNotThrow(pipeline::close); + + Assertions.assertEquals( + "StepMarkerManager", + output.results().current(TestResult.class).map(TestResult::status).orElse(null) + ); + } + @Test public void testPipeline__shouldCompile_multiResult() { @@ -534,6 +562,20 @@ public static Pipeline createPipeline_uidGenerator(String name) .build(); } + public static Pipeline createPipeline_observabilityManager(String name) + { + return Pipeline.of(name) + .registerStep(new StepWithObservabilityManager()) + .build(); + } + + public static Pipeline createPipeline_markerManager(String name) + { + return Pipeline.of(name) + .registerStep(new StepWithMarkerManager()) + .build(); + } + public static Pipeline createPipeline_multiResult(String name) { return Pipeline.of(name) diff --git a/core/src/test/java/tech/illuin/pipeline/step/annotation/step/StepWithMarkerManager.java b/core/src/test/java/tech/illuin/pipeline/step/annotation/step/StepWithMarkerManager.java new file mode 100644 index 00000000..a17f36a2 --- /dev/null +++ b/core/src/test/java/tech/illuin/pipeline/step/annotation/step/StepWithMarkerManager.java @@ -0,0 +1,23 @@ +package tech.illuin.pipeline.step.annotation.step; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import tech.illuin.pipeline.generic.pipeline.TestResult; +import tech.illuin.pipeline.metering.MarkerManager; +import tech.illuin.pipeline.step.annotation.StepConfig; +import tech.illuin.pipeline.step.result.Result; + +/** + * @author Pierre Lecerf (pierre.lecerf@illuin.tech) + */ +public class StepWithMarkerManager +{ + private static final Logger logger = LoggerFactory.getLogger(StepWithMarkerManager.class); + + @StepConfig(id = "step-with_marker-manager") + public Result execute(MarkerManager marker) + { + logger.info("markers={} tags={} discriminants={}", marker.markers(), marker.tags(), marker.discriminants()); + return new TestResult("annotation-test", marker.getClass().getSimpleName()); + } +} diff --git a/core/src/test/java/tech/illuin/pipeline/step/annotation/step/StepWithObservabilityManager.java b/core/src/test/java/tech/illuin/pipeline/step/annotation/step/StepWithObservabilityManager.java new file mode 100644 index 00000000..decaf29e --- /dev/null +++ b/core/src/test/java/tech/illuin/pipeline/step/annotation/step/StepWithObservabilityManager.java @@ -0,0 +1,23 @@ +package tech.illuin.pipeline.step.annotation.step; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import tech.illuin.pipeline.generic.pipeline.TestResult; +import tech.illuin.pipeline.metering.manager.ObservabilityManager; +import tech.illuin.pipeline.step.annotation.StepConfig; +import tech.illuin.pipeline.step.result.Result; + +/** + * @author Pierre Lecerf (pierre.lecerf@illuin.tech) + */ +public class StepWithObservabilityManager +{ + private static final Logger logger = LoggerFactory.getLogger(StepWithObservabilityManager.class); + + @StepConfig(id = "step-with_observability-manager") + public Result execute(ObservabilityManager manager) + { + logger.info("observability-manager: meterRegistry={} tracer={}", manager.meterRegistry().getClass().getName(), manager.tracer().getClass().getName()); + return new TestResult("annotation-test", manager.meterRegistry().getClass().getSimpleName()); + } +} diff --git a/doc/pipelines.md b/doc/pipelines.md index 63c9311e..e7748bb6 100644 --- a/doc/pipelines.md +++ b/doc/pipelines.md @@ -13,7 +13,7 @@ First, we get a builder from one of the `of` methods: ```java // A call to Pipeline.of(String) with type hinting is the most common scenario -// The type corresponds to the expected input type, the String argument is the pipeline name that will be tracked in tags, metrics and logs +// The type corresponds to the expected input type, the String argument is the pipeline name that will be tracked in tags, metering and logs var builder = Pipeline.of("my-pipeline"); ``` diff --git a/resilience4j/src/main/java/tech/illuin/pipeline/resilience4j/sink/wrapper/circuitbreaker/CircuitBreakerSink.java b/resilience4j/src/main/java/tech/illuin/pipeline/resilience4j/sink/wrapper/circuitbreaker/CircuitBreakerSink.java index a538ce41..c9623852 100644 --- a/resilience4j/src/main/java/tech/illuin/pipeline/resilience4j/sink/wrapper/circuitbreaker/CircuitBreakerSink.java +++ b/resilience4j/src/main/java/tech/illuin/pipeline/resilience4j/sink/wrapper/circuitbreaker/CircuitBreakerSink.java @@ -2,9 +2,9 @@ import io.github.resilience4j.circuitbreaker.CircuitBreaker; import org.slf4j.MDC; -import tech.illuin.pipeline.context.Context; -import tech.illuin.pipeline.resilience4j.execution.wrapper.CircuitBreakerException; +import tech.illuin.pipeline.context.LocalContext; import tech.illuin.pipeline.output.Output; +import tech.illuin.pipeline.resilience4j.execution.wrapper.CircuitBreakerException; import tech.illuin.pipeline.sink.Sink; import tech.illuin.pipeline.sink.execution.wrapper.SinkWrapperException; @@ -26,7 +26,7 @@ public CircuitBreakerSink(Sink sink, CircuitBreaker circuitBreaker) @Override @SuppressWarnings("IllegalCatch") - public void execute(Output output, Context context) throws Exception + public void execute(Output output, LocalContext context) throws Exception { try { Map mdc = MDC.getCopyOfContextMap(); @@ -44,7 +44,7 @@ public void execute(Output output, Context context) throws Exception } @SuppressWarnings("IllegalCatch") - private boolean executeSink(Output output, Context context) throws SinkWrapperException + private boolean executeSink(Output output, LocalContext context) throws SinkWrapperException { try { this.sink.execute(output, context); diff --git a/resilience4j/src/main/java/tech/illuin/pipeline/resilience4j/sink/wrapper/retry/RetrySink.java b/resilience4j/src/main/java/tech/illuin/pipeline/resilience4j/sink/wrapper/retry/RetrySink.java index 7065ceee..86343194 100644 --- a/resilience4j/src/main/java/tech/illuin/pipeline/resilience4j/sink/wrapper/retry/RetrySink.java +++ b/resilience4j/src/main/java/tech/illuin/pipeline/resilience4j/sink/wrapper/retry/RetrySink.java @@ -2,9 +2,9 @@ import io.github.resilience4j.retry.Retry; import org.slf4j.MDC; -import tech.illuin.pipeline.context.Context; -import tech.illuin.pipeline.resilience4j.execution.wrapper.RetryException; +import tech.illuin.pipeline.context.LocalContext; import tech.illuin.pipeline.output.Output; +import tech.illuin.pipeline.resilience4j.execution.wrapper.RetryException; import tech.illuin.pipeline.sink.Sink; import tech.illuin.pipeline.sink.execution.wrapper.SinkWrapperException; @@ -26,7 +26,7 @@ public RetrySink(Sink sink, Retry retry) @Override @SuppressWarnings("IllegalCatch") - public void execute(Output output, Context context) throws Exception + public void execute(Output output, LocalContext context) throws Exception { try { Map mdc = MDC.getCopyOfContextMap(); @@ -44,7 +44,7 @@ public void execute(Output output, Context context) throws Exception } @SuppressWarnings("IllegalCatch") - private boolean executeSink(Output output, Context context) throws SinkWrapperException + private boolean executeSink(Output output, LocalContext context) throws SinkWrapperException { try { this.sink.execute(output, context); diff --git a/resilience4j/src/main/java/tech/illuin/pipeline/resilience4j/sink/wrapper/timelimiter/TimeLimiterSink.java b/resilience4j/src/main/java/tech/illuin/pipeline/resilience4j/sink/wrapper/timelimiter/TimeLimiterSink.java index 1fb92f36..5743359b 100644 --- a/resilience4j/src/main/java/tech/illuin/pipeline/resilience4j/sink/wrapper/timelimiter/TimeLimiterSink.java +++ b/resilience4j/src/main/java/tech/illuin/pipeline/resilience4j/sink/wrapper/timelimiter/TimeLimiterSink.java @@ -2,9 +2,9 @@ import io.github.resilience4j.timelimiter.TimeLimiter; import org.slf4j.MDC; -import tech.illuin.pipeline.context.Context; -import tech.illuin.pipeline.resilience4j.execution.wrapper.TimeLimiterException; +import tech.illuin.pipeline.context.LocalContext; import tech.illuin.pipeline.output.Output; +import tech.illuin.pipeline.resilience4j.execution.wrapper.TimeLimiterException; import tech.illuin.pipeline.sink.Sink; import java.util.Map; @@ -28,7 +28,7 @@ public TimeLimiterSink(Sink sink, TimeLimiter limiter, ExecutorService executor) @Override @SuppressWarnings("IllegalCatch") - public void execute(Output output, Context context) throws Exception + public void execute(Output output, LocalContext context) throws Exception { try { Map mdc = MDC.getCopyOfContextMap(); @@ -47,7 +47,7 @@ public void execute(Output output, Context context) throws Exception } @SuppressWarnings("IllegalCatch") - private void executeSink(Output output, Context context) throws TimeLimiterSinkException + private void executeSink(Output output, LocalContext context) throws TimeLimiterSinkException { try { this.sink.execute(output, context); diff --git a/resilience4j/src/main/java/tech/illuin/pipeline/resilience4j/step/wrapper/circuitbreaker/CircuitBreakerStep.java b/resilience4j/src/main/java/tech/illuin/pipeline/resilience4j/step/wrapper/circuitbreaker/CircuitBreakerStep.java index a1e28e0e..bd0191c8 100644 --- a/resilience4j/src/main/java/tech/illuin/pipeline/resilience4j/step/wrapper/circuitbreaker/CircuitBreakerStep.java +++ b/resilience4j/src/main/java/tech/illuin/pipeline/resilience4j/step/wrapper/circuitbreaker/CircuitBreakerStep.java @@ -2,9 +2,9 @@ import io.github.resilience4j.circuitbreaker.CircuitBreaker; import org.slf4j.MDC; -import tech.illuin.pipeline.context.Context; -import tech.illuin.pipeline.resilience4j.execution.wrapper.CircuitBreakerException; +import tech.illuin.pipeline.context.LocalContext; import tech.illuin.pipeline.input.indexer.Indexable; +import tech.illuin.pipeline.resilience4j.execution.wrapper.CircuitBreakerException; import tech.illuin.pipeline.step.Step; import tech.illuin.pipeline.step.execution.wrapper.StepWrapperException; import tech.illuin.pipeline.step.result.Result; @@ -28,7 +28,7 @@ public CircuitBreakerStep(Step step, CircuitBreaker circuitBreaker) @Override @SuppressWarnings("IllegalCatch") - public Result execute(T object, I input, Object payload, ResultView view, Context context) throws Exception + public Result execute(T object, I input, Object payload, ResultView view, LocalContext context) throws Exception { try { Map mdc = MDC.getCopyOfContextMap(); @@ -46,7 +46,7 @@ public Result execute(T object, I input, Object payload, ResultView view, Contex } @SuppressWarnings("IllegalCatch") - private Result executeStep(T object, I input, Object payload, ResultView view, Context context) throws StepWrapperException + private Result executeStep(T object, I input, Object payload, ResultView view, LocalContext context) throws StepWrapperException { try { return this.step.execute(object, input, payload, view, context); diff --git a/resilience4j/src/main/java/tech/illuin/pipeline/resilience4j/step/wrapper/retry/RetryStep.java b/resilience4j/src/main/java/tech/illuin/pipeline/resilience4j/step/wrapper/retry/RetryStep.java index 2e176371..34fc4ee8 100644 --- a/resilience4j/src/main/java/tech/illuin/pipeline/resilience4j/step/wrapper/retry/RetryStep.java +++ b/resilience4j/src/main/java/tech/illuin/pipeline/resilience4j/step/wrapper/retry/RetryStep.java @@ -2,9 +2,9 @@ import io.github.resilience4j.retry.Retry; import org.slf4j.MDC; -import tech.illuin.pipeline.context.Context; -import tech.illuin.pipeline.resilience4j.execution.wrapper.RetryException; +import tech.illuin.pipeline.context.LocalContext; import tech.illuin.pipeline.input.indexer.Indexable; +import tech.illuin.pipeline.resilience4j.execution.wrapper.RetryException; import tech.illuin.pipeline.step.Step; import tech.illuin.pipeline.step.execution.wrapper.StepWrapperException; import tech.illuin.pipeline.step.result.Result; @@ -28,7 +28,7 @@ public RetryStep(Step step, Retry retry) @Override @SuppressWarnings("IllegalCatch") - public Result execute(T object, I input, Object payload, ResultView view, Context context) throws Exception + public Result execute(T object, I input, Object payload, ResultView view, LocalContext context) throws Exception { try { Map mdc = MDC.getCopyOfContextMap(); @@ -46,7 +46,7 @@ public Result execute(T object, I input, Object payload, ResultView view, Contex } @SuppressWarnings("IllegalCatch") - private Result executeStep(T object, I input, Object payload, ResultView view, Context context) throws StepWrapperException + private Result executeStep(T object, I input, Object payload, ResultView view, LocalContext context) throws StepWrapperException { try { return this.step.execute(object, input, payload, view, context); diff --git a/resilience4j/src/main/java/tech/illuin/pipeline/resilience4j/step/wrapper/timelimiter/TimeLimiterStep.java b/resilience4j/src/main/java/tech/illuin/pipeline/resilience4j/step/wrapper/timelimiter/TimeLimiterStep.java index a13de768..465b3279 100644 --- a/resilience4j/src/main/java/tech/illuin/pipeline/resilience4j/step/wrapper/timelimiter/TimeLimiterStep.java +++ b/resilience4j/src/main/java/tech/illuin/pipeline/resilience4j/step/wrapper/timelimiter/TimeLimiterStep.java @@ -2,9 +2,9 @@ import io.github.resilience4j.timelimiter.TimeLimiter; import org.slf4j.MDC; -import tech.illuin.pipeline.context.Context; -import tech.illuin.pipeline.resilience4j.execution.wrapper.TimeLimiterException; +import tech.illuin.pipeline.context.LocalContext; import tech.illuin.pipeline.input.indexer.Indexable; +import tech.illuin.pipeline.resilience4j.execution.wrapper.TimeLimiterException; import tech.illuin.pipeline.step.Step; import tech.illuin.pipeline.step.execution.wrapper.StepWrapperException; import tech.illuin.pipeline.step.result.Result; @@ -31,7 +31,7 @@ public TimeLimiterStep(Step step, TimeLimiter limiter, ExecutorService exe @Override @SuppressWarnings("IllegalCatch") - public Result execute(T object, I input, Object payload, ResultView view, Context context) throws Exception + public Result execute(T object, I input, Object payload, ResultView view, LocalContext context) throws Exception { try { Map mdc = MDC.getCopyOfContextMap(); @@ -50,7 +50,7 @@ public Result execute(T object, I input, Object payload, ResultView view, Contex } @SuppressWarnings("IllegalCatch") - private Result executeStep(T object, I input, Object payload, ResultView view, Context context) throws StepWrapperException + private Result executeStep(T object, I input, Object payload, ResultView view, LocalContext context) throws StepWrapperException { try { return this.step.execute(object, input, payload, view, context); diff --git a/resilience4j/src/test/java/tech/illuin/pipeline/sink/SinkWrapperTest.java b/resilience4j/src/test/java/tech/illuin/pipeline/sink/SinkWrapperTest.java index d0ba5fc9..b4db7ccc 100644 --- a/resilience4j/src/test/java/tech/illuin/pipeline/sink/SinkWrapperTest.java +++ b/resilience4j/src/test/java/tech/illuin/pipeline/sink/SinkWrapperTest.java @@ -5,6 +5,8 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import tech.illuin.pipeline.Pipeline; +import tech.illuin.pipeline.context.LocalContext; +import tech.illuin.pipeline.input.uid_generator.UIDGenerator; import tech.illuin.pipeline.resilience4j.execution.wrapper.RetryWrapper; import tech.illuin.pipeline.resilience4j.execution.wrapper.TimeLimiterWrapper; import tech.illuin.pipeline.generic.TestFactory; @@ -45,7 +47,7 @@ public static Pipeline createPipeline(AtomicInteger counter) .build() ); - return Pipeline.of("test-wrapper-combined", TestFactory::initializerOfEmpty) + return Pipeline.of("test-wrapper-combined", (Void input, LocalContext context1, UIDGenerator generator) -> TestFactory.initializerOfEmpty(input, context1, generator)) .registerIndexer(SingleIndexer.auto()) .registerSink(builder -> builder .sink((output, context) -> { diff --git a/resilience4j/src/test/java/tech/illuin/pipeline/step/StepWrapperRetryTest.java b/resilience4j/src/test/java/tech/illuin/pipeline/step/StepWrapperRetryTest.java index 687b325b..7f2a7d95 100644 --- a/resilience4j/src/test/java/tech/illuin/pipeline/step/StepWrapperRetryTest.java +++ b/resilience4j/src/test/java/tech/illuin/pipeline/step/StepWrapperRetryTest.java @@ -4,6 +4,8 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import tech.illuin.pipeline.Pipeline; +import tech.illuin.pipeline.context.LocalContext; +import tech.illuin.pipeline.input.uid_generator.UIDGenerator; import tech.illuin.pipeline.resilience4j.execution.wrapper.RetryWrapper; import tech.illuin.pipeline.generic.TestFactory; import tech.illuin.pipeline.generic.model.A; @@ -42,7 +44,7 @@ public void testPipeline_shouldRetryException() public static Pipeline createErrorRetryPipeline() { var counter = new AtomicInteger(0); - return Pipeline.of("test-error-retry", TestFactory::initializerOfEmpty) + return Pipeline.of("test-error-retry", (Void input, LocalContext context, UIDGenerator generator) -> TestFactory.initializerOfEmpty(input, context, generator)) .registerIndexer(SingleIndexer.auto()) .registerStep(new TestStep<>("1", "ok")) .registerStep(builder -> builder diff --git a/resilience4j/src/test/java/tech/illuin/pipeline/step/StepWrapperTest.java b/resilience4j/src/test/java/tech/illuin/pipeline/step/StepWrapperTest.java index cb36353b..e1921e74 100644 --- a/resilience4j/src/test/java/tech/illuin/pipeline/step/StepWrapperTest.java +++ b/resilience4j/src/test/java/tech/illuin/pipeline/step/StepWrapperTest.java @@ -5,6 +5,8 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import tech.illuin.pipeline.Pipeline; +import tech.illuin.pipeline.context.LocalContext; +import tech.illuin.pipeline.input.uid_generator.UIDGenerator; import tech.illuin.pipeline.resilience4j.execution.wrapper.RetryWrapper; import tech.illuin.pipeline.resilience4j.execution.wrapper.TimeLimiterWrapper; import tech.illuin.pipeline.generic.TestFactory; @@ -57,7 +59,7 @@ public static Pipeline createPipeline() ); var counter = new AtomicInteger(0); - return Pipeline.of("test-wrapper-combined", TestFactory::initializerOfEmpty) + return Pipeline.of("test-wrapper-combined", (Void input, LocalContext context, UIDGenerator generator) -> TestFactory.initializerOfEmpty(input, context, generator)) .registerIndexer(SingleIndexer.auto()) .registerStep(new TestStep<>("1", "ok")) .registerStep(builder -> builder