Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
.gitignore~
**/target
/logs
/data
*.iml
.idea
.java-version
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import tech.illuin.pipeline.input.indexer.Indexer;
import tech.illuin.pipeline.input.initializer.builder.InitializerDescriptor;
import tech.illuin.pipeline.input.uid_generator.UIDGenerator;
import tech.illuin.pipeline.metering.PipelineMarkerManager;
import tech.illuin.pipeline.metering.PipelineMetrics;
import tech.illuin.pipeline.metering.manager.ObservabilityManager;
import tech.illuin.pipeline.metering.tag.MetricTags;
Expand Down Expand Up @@ -120,7 +121,8 @@ public Output run(I input, Context context) throws PipelineException
{
PipelineTag tag = this.createTag(input, context);
MetricTags metricTags = this.tagResolver.resolve(input, context);
PipelineMetrics metrics = new PipelineMetrics(this.observabilityManager.meterRegistry(), tag, metricTags);
PipelineMarkerManager markerManager = new PipelineMarkerManager(tag, metricTags);
PipelineMetrics metrics = new PipelineMetrics(this.observabilityManager.meterRegistry(), markerManager);
IO<I> io = new IO<>(tag, input);

long start = System.nanoTime();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package tech.illuin.pipeline.builder.runner_compiler.argument_resolver.mapper_factory;

import tech.illuin.pipeline.metering.MarkerManager;

import java.lang.annotation.Annotation;
import java.lang.reflect.Parameter;

/**
* @author Pierre Lecerf (pierre.lecerf@illuin.tech)
*/
public class MarkerManagerMapperFactory<T, I> implements MethodArgumentMapperFactory<T, I>
{
@Override
public boolean canHandle(Annotation category, Class<?> parameterType)
{
return MarkerManager.class.isAssignableFrom(parameterType);
}

@Override
public MethodArgumentMapper<T, I> produce(Annotation category, Parameter parameter)
{
Class<?> parameterType = parameter.getType();
return args -> {
if (!parameterType.isAssignableFrom(args.markerManager().getClass()))
throw new IllegalArgumentException("This component expects the marker generator to be a subtype of " + parameterType.getSimpleName() + " actual manager is of type " + args.markerManager().getClass().getSimpleName());
return args.markerManager();
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import tech.illuin.pipeline.context.Context;
import tech.illuin.pipeline.input.uid_generator.UIDGenerator;
import tech.illuin.pipeline.metering.MarkerManager;
import tech.illuin.pipeline.output.ComponentTag;
import tech.illuin.pipeline.output.Output;
import tech.illuin.pipeline.output.PipelineTag;
Expand All @@ -21,5 +22,6 @@ public record MethodArguments<T, I>(
Context context,
PipelineTag pipelineTag,
ComponentTag componentTag,
UIDGenerator uidGenerator
UIDGenerator uidGenerator,
MarkerManager markerManager
) {}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package tech.illuin.pipeline.context;

import tech.illuin.pipeline.input.uid_generator.UIDGenerator;
import tech.illuin.pipeline.metering.MarkerManager;
import tech.illuin.pipeline.output.ComponentTag;
import tech.illuin.pipeline.output.Output;
import tech.illuin.pipeline.output.PipelineTag;
Expand All @@ -15,22 +16,22 @@ public class ComponentContext implements LocalContext
{
private final Context globalContext;
private final Object input;
private final PipelineTag pipelineTag;
private final ComponentTag componentTag;
private final UIDGenerator uidGenerator;
private final MarkerManager markerManager;

public ComponentContext(
Context globalContext,
Object input,
PipelineTag pipelineTag,
ComponentTag componentTag,
UIDGenerator uidGenerator
UIDGenerator uidGenerator,
MarkerManager markerManager
) {
this.globalContext = globalContext;
this.input = input;
this.pipelineTag = pipelineTag;
this.componentTag = componentTag;
this.uidGenerator = uidGenerator;
this.markerManager = markerManager;
}

@Override
Expand All @@ -42,7 +43,7 @@ public Object input()
@Override
public PipelineTag pipelineTag()
{
return this.pipelineTag;
return this.componentTag.pipelineTag();
}

@Override
Expand All @@ -57,6 +58,12 @@ public UIDGenerator uidGenerator()
return this.uidGenerator;
}

@Override
public MarkerManager markerManager()
{
return this.markerManager;
}

@Override
public Optional<Output> parent()
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package tech.illuin.pipeline.context;

import tech.illuin.pipeline.input.uid_generator.UIDGenerator;
import tech.illuin.pipeline.metering.MarkerManager;
import tech.illuin.pipeline.output.ComponentTag;
import tech.illuin.pipeline.output.PipelineTag;

Expand All @@ -16,4 +17,6 @@ public interface LocalContext extends Context
ComponentTag componentTag();

UIDGenerator uidGenerator();

MarkerManager markerManager();
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@
import org.slf4j.LoggerFactory;
import tech.illuin.pipeline.context.ComponentContext;
import tech.illuin.pipeline.context.Context;
import tech.illuin.pipeline.context.LocalContext;
import tech.illuin.pipeline.execution.phase.IO;
import tech.illuin.pipeline.execution.phase.PipelinePhase;
import tech.illuin.pipeline.execution.phase.PipelineStrategy;
import tech.illuin.pipeline.input.indexer.Indexer;
import tech.illuin.pipeline.input.initializer.builder.InitializerDescriptor;
import tech.illuin.pipeline.input.initializer.metering.InitializationMarkerManager;
import tech.illuin.pipeline.input.initializer.metering.InitializationMetrics;
import tech.illuin.pipeline.input.uid_generator.UIDGenerator;
import tech.illuin.pipeline.metering.PipelineInitializationMetrics;
import tech.illuin.pipeline.metering.manager.ObservabilityManager;
import tech.illuin.pipeline.metering.tag.MetricTags;
import tech.illuin.pipeline.metering.tag.TagResolver;
Expand Down Expand Up @@ -60,7 +62,9 @@ public PipelineStrategy run(IO<I> io, Context context, MetricTags tags) throws E

ComponentTag tag = this.createTag(io.tag(), this.initializer);
MetricTags metricTags = this.tagResolver.resolve(io.input(), context);
PipelineInitializationMetrics metrics = new PipelineInitializationMetrics(this.observabilityManager.meterRegistry(), tag, metricTags);
InitializationMarkerManager markerManager = new InitializationMarkerManager(tag, metricTags);
InitializationMetrics metrics = new InitializationMetrics(this.observabilityManager.meterRegistry(), markerManager);
LocalContext localContext = new ComponentContext(context, io.input(), tag, this.uidGenerator, markerManager);

long start = System.nanoTime();
metrics.setMDC();
Expand All @@ -69,8 +73,8 @@ public PipelineStrategy run(IO<I> io, Context context, MetricTags tags) throws E
{
span.tag("uid", tag.pipelineTag().uid());

Object payload = this.runInitializer(io.input(), tag, context, metrics);
Output output = this.runOutputFactory(io.input(), io.tag(), payload, context);
Object payload = this.runInitializer(io.input(), tag, localContext, metrics);
Output output = this.runOutputFactory(io.input(), io.tag(), payload, localContext);
this.runIndexers(io.tag(), payload, output);

metrics.successCounter().increment();
Expand All @@ -94,17 +98,16 @@ public PipelineStrategy run(IO<I> io, Context context, MetricTags tags) throws E
}

@SuppressWarnings("IllegalCatch")
private Object runInitializer(I input, ComponentTag tag, Context context, PipelineInitializationMetrics metrics) throws Exception
private Object runInitializer(I input, ComponentTag tag, LocalContext context, InitializationMetrics metrics) throws Exception
{
ComponentContext componentContext = wrapContext(input, context, tag.pipelineTag(), tag);
Span span = this.observabilityManager.tracer().nextSpan().name(tag.id());
try (Tracer.SpanInScope scope = this.observabilityManager.tracer().withSpan(span.start()))
{
span.tag("uid", tag.uid());

span.event("initializer:run");
logger.trace("{}#{} initializing payload", tag.pipelineTag().pipeline(), tag.pipelineTag().uid());
Object payload = this.initializer.execute(input, componentContext, this.uidGenerator);
Object payload = this.initializer.execute(input, context, this.uidGenerator);
span.tag("payload_type", payload == null ? "null" : payload.getClass().getName());

return payload;
Expand All @@ -113,14 +116,14 @@ private Object runInitializer(I input, ComponentTag tag, Context context, Pipeli
metrics.setMDC(e);
span.event("initializer:error");
logger.error("{}#{} initializer {} threw an {}: {}", tag.pipelineTag().pipeline(), tag.pipelineTag().uid(), tag.id(), e.getClass().getName(), e.getMessage());
return this.initializer.handleException(e, componentContext, this.uidGenerator);
return this.initializer.handleException(e, context, this.uidGenerator);
}
finally {
span.end();
}
}

private Output runOutputFactory(I input, PipelineTag tag, Object payload, Context context)
private Output runOutputFactory(I input, PipelineTag tag, Object payload, LocalContext context)
{
Span span = this.observabilityManager.tracer().nextSpan().name("output_factory");
try (Tracer.SpanInScope scope = this.observabilityManager.tracer().withSpan(span.start()))
Expand Down Expand Up @@ -159,9 +162,4 @@ private ComponentTag createTag(PipelineTag pipelineTag, InitializerDescriptor<?>
{
return new ComponentTag(this.uidGenerator.generate(), pipelineTag, initializer.id(), ComponentFamily.INITIALIZER);
}

private ComponentContext wrapContext(I input, Context context, PipelineTag pipelineTag, ComponentTag componentTag)
{
return new ComponentContext(context, input, pipelineTag, componentTag, this.uidGenerator);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,19 @@
import org.slf4j.MDC;
import tech.illuin.pipeline.context.ComponentContext;
import tech.illuin.pipeline.context.Context;
import tech.illuin.pipeline.context.LocalContext;
import tech.illuin.pipeline.execution.phase.IO;
import tech.illuin.pipeline.execution.phase.PipelinePhase;
import tech.illuin.pipeline.execution.phase.PipelineStrategy;
import tech.illuin.pipeline.input.uid_generator.UIDGenerator;
import tech.illuin.pipeline.metering.PipelineSinkMetrics;
import tech.illuin.pipeline.metering.manager.ObservabilityManager;
import tech.illuin.pipeline.metering.tag.MetricTags;
import tech.illuin.pipeline.output.ComponentFamily;
import tech.illuin.pipeline.output.ComponentTag;
import tech.illuin.pipeline.output.PipelineTag;
import tech.illuin.pipeline.sink.builder.SinkDescriptor;
import tech.illuin.pipeline.sink.metering.SinkMarkerManager;
import tech.illuin.pipeline.sink.metering.SinkMetrics;

import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -66,12 +68,14 @@ public PipelineStrategy run(IO<I> io, Context context, MetricTags metricTags) th
for (SinkDescriptor descriptor : this.sinks)
{
ComponentTag tag = this.createTag(io.output().tag(), descriptor);
PipelineSinkMetrics metrics = new PipelineSinkMetrics(this.observabilityManager.meterRegistry(), tag, metricTags);
SinkMarkerManager markerManager = new SinkMarkerManager(tag, metricTags);
SinkMetrics metrics = new SinkMetrics(this.observabilityManager.meterRegistry(), markerManager);
LocalContext localContext = new ComponentContext(context, io.input(), tag, this.uidGenerator, markerManager);

if (descriptor.isAsync())
this.runSinkAsynchronously(descriptor, tag, io, context, metrics, span);
this.runSinkAsynchronously(descriptor, tag, io, localContext, metrics, span);
else
this.runSinkSynchronously(descriptor, tag, io, context, metrics);
this.runSinkSynchronously(descriptor, tag, io, localContext, metrics);
}

return PipelineStrategy.CONTINUE;
Expand All @@ -82,10 +86,8 @@ public PipelineStrategy run(IO<I> io, Context context, MetricTags metricTags) th
}

@SuppressWarnings("IllegalCatch")
private void runSinkSynchronously(SinkDescriptor sink, ComponentTag tag, IO<I> io, Context context, PipelineSinkMetrics metrics) throws Exception
private void runSinkSynchronously(SinkDescriptor sink, ComponentTag tag, IO<I> io, LocalContext context, SinkMetrics metrics) throws Exception
{
ComponentContext componentContext = wrapContext(io.input(), context, tag.pipelineTag(), tag);

long start = System.nanoTime();
metrics.setMDC();
Span span = this.observabilityManager.tracer().nextSpan().name(tag.id());
Expand All @@ -95,7 +97,7 @@ private void runSinkSynchronously(SinkDescriptor sink, ComponentTag tag, IO<I> i

span.event("sink:run_sync");
logger.trace("{}#{} launching sink {}", tag.pipelineTag().pipeline(), tag.pipelineTag().uid(), tag.id());
sink.execute(io.output(), componentContext);
sink.execute(io.output(), context);
metrics.successCounter().increment();
}
catch (Exception e) {
Expand All @@ -115,13 +117,11 @@ private void runSinkSynchronously(SinkDescriptor sink, ComponentTag tag, IO<I> i
}

@SuppressWarnings("IllegalCatch")
private void runSinkAsynchronously(SinkDescriptor sink, ComponentTag tag, IO<I> io, Context context, PipelineSinkMetrics metrics, Span phaseSpan)
private void runSinkAsynchronously(SinkDescriptor sink, ComponentTag tag, IO<I> io, LocalContext context, SinkMetrics metrics, Span phaseSpan)
{
if (this.sinkExecutor == null)
throw new IllegalStateException("An asynchronous run has been initiated but there is no active executor");

ComponentContext componentContext = wrapContext(io.input(), context, tag.pipelineTag(), tag);

logger.trace("{}#{} queuing sink {}", tag.pipelineTag().pipeline(), tag.pipelineTag().uid(), tag.id());
Map<String, String> mdc = MDC.getCopyOfContextMap();
CompletableFuture.runAsync(() -> {
Expand All @@ -135,7 +135,7 @@ private void runSinkAsynchronously(SinkDescriptor sink, ComponentTag tag, IO<I>

span.event("sink:run_async");
logger.trace("{}#{} launching sink {}", tag.pipelineTag().pipeline(), tag.pipelineTag().uid(), tag.id());
sink.execute(io.output(), componentContext);
sink.execute(io.output(), context);
metrics.successCounter().increment();
}
catch (Exception e) {
Expand All @@ -144,7 +144,7 @@ private void runSinkAsynchronously(SinkDescriptor sink, ComponentTag tag, IO<I>
logger.error("{}#{} sink {} threw an {}: {}", tag.pipelineTag().pipeline(), tag.pipelineTag().uid(), tag.id(), e.getClass().getName(), e.getMessage());
metrics.failureCounter().increment();
metrics.errorCounter(e).increment();
sink.handleExceptionThenSwallow(e, io.output(), componentContext);
sink.handleExceptionThenSwallow(e, io.output(), context);
}
finally {
metrics.runTimer().record(System.nanoTime() - start, TimeUnit.NANOSECONDS);
Expand All @@ -160,11 +160,6 @@ private ComponentTag createTag(PipelineTag pipelineTag, SinkDescriptor sink)
return new ComponentTag(this.uidGenerator.generate(), pipelineTag, sink.id(), ComponentFamily.SINK);
}

private ComponentContext wrapContext(I input, Context context, PipelineTag pipelineTag, ComponentTag componentTag)
{
return new ComponentContext(context, input, pipelineTag, componentTag, this.uidGenerator);
}

private ExecutorService initExecutor(Supplier<ExecutorService> provider)
{
if (this.sinks.stream().anyMatch(SinkDescriptor::isAsync))
Expand Down
Loading