diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java index 1a193ec006e0..dc30d82adcf4 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java @@ -30,6 +30,7 @@ import org.apache.beam.sdk.lineage.LineageOptions; import org.apache.beam.sdk.metrics.Metrics.MetricsFlag; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter; import org.checkerframework.checker.nullness.qual.Nullable; @@ -122,16 +123,24 @@ private static Lineage createLineage(PipelineOptions options, LineageDirection d /** {@link Lineage} representing sources and optionally side inputs. */ public static Lineage getSources() { - return checkNotNull( - sources, - "Lineage not initialized. FileSystems.setDefaultPipelineOptions must be called first."); + Lineage localSources = sources; + if (localSources == null) { + return createDefaultLineage(LineageDirection.SOURCE); + } + return localSources; } /** {@link Lineage} representing sinks. */ public static Lineage getSinks() { - return checkNotNull( - sinks, - "Lineage not initialized. FileSystems.setDefaultPipelineOptions must be called first."); + Lineage localSinks = sinks; + if (localSinks == null) { + return createDefaultLineage(LineageDirection.SINK); + } + return localSinks; + } + + private static Lineage createDefaultLineage(LineageDirection direction) { + return createLineage(PipelineOptionsFactory.create(), direction); } @VisibleForTesting