Skip to content

Commit ca4b3c2

Browse files
committed
Report source lineage from HadoopFormatIO
1 parent 8801afd commit ca4b3c2

1 file changed

Lines changed: 42 additions & 0 deletions

File tree

  • sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format

sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIO.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,9 @@
5252
import org.apache.beam.sdk.coders.CoderRegistry;
5353
import org.apache.beam.sdk.coders.KvCoder;
5454
import org.apache.beam.sdk.io.BoundedSource;
55+
import org.apache.beam.sdk.io.FileSystem;
56+
import org.apache.beam.sdk.io.FileSystems;
57+
import org.apache.beam.sdk.io.fs.ResourceId;
5558
import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
5659
import org.apache.beam.sdk.io.hadoop.WritableCoder;
5760
import org.apache.beam.sdk.options.PipelineOptions;
@@ -83,6 +86,7 @@
8386
import org.apache.hadoop.io.ObjectWritable;
8487
import org.apache.hadoop.io.Writable;
8588
import org.apache.hadoop.mapred.FileAlreadyExistsException;
89+
import org.apache.hadoop.mapred.FileSplit;
8690
import org.apache.hadoop.mapreduce.InputFormat;
8791
import org.apache.hadoop.mapreduce.InputSplit;
8892
import org.apache.hadoop.mapreduce.Job;
@@ -462,6 +466,7 @@ public Read<K, V> withConfiguration(Configuration configuration) {
462466
if (getValueTranslationFunction() == null) {
463467
builder.setValueTypeDescriptor((TypeDescriptor<V>) inputFormatValueClass);
464468
}
469+
465470
return builder.build();
466471
}
467472

@@ -724,7 +729,11 @@ public List<BoundedSource<KV<K, V>>> split(long desiredBundleSizeBytes, Pipeline
724729
LOG.info("Not splitting source {} because source is already split.", this);
725730
return ImmutableList.of(this);
726731
}
732+
727733
computeSplitsIfNecessary();
734+
735+
reportSourceLineage(inputSplits);
736+
728737
LOG.info(
729738
"Generated {} splits. Size of first split is {} ",
730739
inputSplits.size(),
@@ -744,6 +753,39 @@ public List<BoundedSource<KV<K, V>>> split(long desiredBundleSizeBytes, Pipeline
744753
.collect(Collectors.toList());
745754
}
746755

756+
private void reportSourceLineage(final List<SerializableSplit> inputSplits) {
757+
List<ResourceId> fileResources = new ArrayList<>();
758+
759+
for (SerializableSplit split : inputSplits) {
760+
InputSplit inputSplit = split.getSplit();
761+
762+
if (inputSplit instanceof FileSplit) {
763+
String pathString = ((FileSplit) inputSplit).getPath().toString();
764+
ResourceId resourceId = FileSystems.matchNewResource(pathString, false);
765+
fileResources.add(resourceId);
766+
}
767+
}
768+
769+
if (fileResources.size() <= 100) {
770+
for (ResourceId resource : fileResources) {
771+
FileSystems.reportSourceLineage(resource);
772+
}
773+
} else {
774+
HashSet<ResourceId> uniqueDirs = new HashSet<>();
775+
for (ResourceId resource : fileResources) {
776+
ResourceId dir = resource.getCurrentDirectory();
777+
uniqueDirs.add(dir);
778+
if (uniqueDirs.size() > 100) {
779+
FileSystems.reportSourceLineage(dir, FileSystem.LineageLevel.TOP_LEVEL);
780+
return;
781+
}
782+
}
783+
for (ResourceId dir : uniqueDirs) {
784+
FileSystems.reportSourceLineage(dir);
785+
}
786+
}
787+
}
788+
747789
@Override
748790
public long getEstimatedSizeBytes(PipelineOptions po) throws Exception {
749791
if (inputSplit == null) {

0 commit comments

Comments
 (0)