Skip to content

Commit e34cc96

Browse files
[yaml] - mongodb read normalization (#38772)
* feat(yaml): Add MongoDB read connector draft Squashed draft MongoDB read connector changes from PR #35802. * feat(yaml): Enhance MongoDB Read SchemaTransform, schema/BSON mapping, and tests - Fully implemented MongoDB read configuration and Provider with JSON schema parsing - Enhanced MongoDbUtils with a deep BSON-to-Beam row conversion supporting all primitives, arrays, maps, and nested rows - Added comprehensive Java unit tests for MongoDbUtils and MongoDbReadSchemaTransformProvider - Mapped WriteToMongoDB and ReadFromMongoDB in standard_io.yaml - Implemented end-to-end integration test verifying write/read pipeline against containerized MongoDB * test(yaml): Add error-handling coverage for MongoDB Write and Read transforms - Standardized standard_io mappings to snake_case (error_handling, batch_size) - Extended integration test to verify error-handling queues are empty for clean runs * remove old edits to other transforms from original PR * add python support * revert changes to integration tests not needed from original PR and fix lint issues * fix more lint * fix generate external transforms * pin read to python provider * comment out read part * retry readfrommongodb * fix gemini comments * address gemini * fix spotless * fix bad return type * fix flakey test * address comments and switch to uri * remove non-overlapping parameters --------- Co-authored-by: Arnav Arora <aroraarnav@google.com>
1 parent a86e611 commit e34cc96

11 files changed

Lines changed: 847 additions & 25 deletions

File tree

sdks/java/io/mongodb/build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ dependencies {
3232
implementation library.java.mongodb_driver_core
3333
implementation library.java.slf4j_api
3434
implementation library.java.vendored_guava_32_1_2_jre
35+
provided library.java.everit_json_schema
36+
permitUnusedDeclared library.java.everit_json_schema
3537
testImplementation library.java.junit
3638
testImplementation project(path: ":sdks:java:io:common")
3739
testImplementation project(path: ":sdks:java:testing:test-utils")

sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/AggregationQuery.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -66,13 +66,10 @@ public AggregationQuery withMongoDbPipeline(List<BsonDocument> mongoDbPipeline)
6666

6767
@Override
6868
public MongoCursor<Document> apply(MongoCollection<Document> collection) {
69+
List<BsonDocument> pipeline = new ArrayList<>(mongoDbPipeline());
6970
if (bucket() != null) {
70-
if (mongoDbPipeline().size() == 1) {
71-
mongoDbPipeline().add(bucket());
72-
} else {
73-
mongoDbPipeline().set(mongoDbPipeline().size() - 1, bucket());
74-
}
71+
pipeline.add(bucket());
7572
}
76-
return collection.aggregate(mongoDbPipeline()).iterator();
73+
return collection.aggregate(pipeline).iterator();
7774
}
7875
}
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.sdk.io.mongodb;
19+
20+
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
21+
22+
import com.google.auto.value.AutoValue;
23+
import java.io.Serializable;
24+
import org.apache.beam.sdk.schemas.AutoValueSchema;
25+
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
26+
import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
27+
import org.apache.beam.sdk.schemas.transforms.providers.ErrorHandling;
28+
import org.checkerframework.checker.nullness.qual.Nullable;
29+
30+
/** Configuration class for the MongoDB Read transform. */
31+
@DefaultSchema(AutoValueSchema.class)
32+
@AutoValue
33+
public abstract class MongoDbReadSchemaTransformConfiguration implements Serializable {
34+
35+
@SchemaFieldDescription("The connection URI for the MongoDB server.")
36+
public abstract String getUri();
37+
38+
@SchemaFieldDescription("The MongoDB database to read from.")
39+
public abstract String getDatabase();
40+
41+
@SchemaFieldDescription("The MongoDB collection to read from.")
42+
public abstract String getCollection();
43+
44+
@SchemaFieldDescription(
45+
"The schema in which the data is encoded, defined with JSON-schema syntax (https://json-schema.org/).")
46+
public abstract String getSchema();
47+
48+
@SchemaFieldDescription(
49+
"An optional BSON filter to apply to the read. This should be a valid JSON string.")
50+
@Nullable
51+
public abstract String getFilter();
52+
53+
@SchemaFieldDescription(
54+
"This option specifies whether and where to output rows that failed to be read.")
55+
@Nullable
56+
public abstract ErrorHandling getErrorHandling();
57+
58+
public void validate() {
59+
checkArgument(getUri() != null && !getUri().isEmpty(), "MongoDB URI must be specified.");
60+
checkArgument(
61+
getDatabase() != null && !getDatabase().isEmpty(), "MongoDB database must be specified.");
62+
checkArgument(
63+
getCollection() != null && !getCollection().isEmpty(),
64+
"MongoDB collection must be specified.");
65+
checkArgument(
66+
getSchema() != null && !getSchema().isEmpty(), "MongoDB schema must be specified.");
67+
}
68+
69+
public static Builder builder() {
70+
return new AutoValue_MongoDbReadSchemaTransformConfiguration.Builder();
71+
}
72+
73+
@AutoValue.Builder
74+
public abstract static class Builder {
75+
public abstract Builder setUri(String uri);
76+
77+
public abstract Builder setDatabase(String database);
78+
79+
public abstract Builder setCollection(String collection);
80+
81+
public abstract Builder setSchema(String schema);
82+
83+
public abstract Builder setFilter(String filter);
84+
85+
public abstract Builder setErrorHandling(ErrorHandling errorHandling);
86+
87+
public abstract MongoDbReadSchemaTransformConfiguration build();
88+
}
89+
}
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.sdk.io.mongodb;
19+
20+
import com.google.auto.service.AutoService;
21+
import java.util.Collections;
22+
import java.util.List;
23+
import org.apache.beam.sdk.schemas.Schema;
24+
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
25+
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
26+
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
27+
import org.apache.beam.sdk.schemas.transforms.providers.ErrorHandling;
28+
import org.apache.beam.sdk.schemas.utils.JsonUtils;
29+
import org.apache.beam.sdk.transforms.DoFn;
30+
import org.apache.beam.sdk.transforms.ParDo;
31+
import org.apache.beam.sdk.values.PCollection;
32+
import org.apache.beam.sdk.values.PCollectionRowTuple;
33+
import org.apache.beam.sdk.values.PCollectionTuple;
34+
import org.apache.beam.sdk.values.Row;
35+
import org.apache.beam.sdk.values.TupleTag;
36+
import org.apache.beam.sdk.values.TupleTagList;
37+
import org.bson.Document;
38+
39+
/** An implementation of {@link TypedSchemaTransformProvider} for reading from MongoDB. */
40+
@AutoService(SchemaTransformProvider.class)
41+
public class MongoDbReadSchemaTransformProvider
42+
extends TypedSchemaTransformProvider<MongoDbReadSchemaTransformConfiguration> {
43+
44+
private static final String OUTPUT_TAG_NAME = "output";
45+
public static final TupleTag<Row> OUTPUT_TAG = new TupleTag<Row>() {};
46+
public static final TupleTag<Row> ERROR_TAG = new TupleTag<Row>() {};
47+
48+
private static final org.apache.beam.sdk.metrics.Counter errorCounter =
49+
org.apache.beam.sdk.metrics.Metrics.counter(
50+
MongoDbReadSchemaTransformProvider.class, "MongoDB-read-error-counter");
51+
52+
@Override
53+
protected SchemaTransform from(MongoDbReadSchemaTransformConfiguration configuration) {
54+
return new MongoDbReadSchemaTransform(configuration);
55+
}
56+
57+
@Override
58+
public String identifier() {
59+
return "beam:schematransform:org.apache.beam:mongodb_read:v1";
60+
}
61+
62+
@Override
63+
public List<String> inputCollectionNames() {
64+
return Collections.emptyList();
65+
}
66+
67+
@Override
68+
public List<String> outputCollectionNames() {
69+
return Collections.singletonList(OUTPUT_TAG_NAME);
70+
}
71+
72+
/** The {@link SchemaTransform} that performs the read operation. */
73+
private static class MongoDbReadSchemaTransform extends SchemaTransform {
74+
private final MongoDbReadSchemaTransformConfiguration configuration;
75+
76+
MongoDbReadSchemaTransform(MongoDbReadSchemaTransformConfiguration configuration) {
77+
configuration.validate();
78+
this.configuration = configuration;
79+
}
80+
81+
@Override
82+
public PCollectionRowTuple expand(PCollectionRowTuple input) {
83+
Schema schema = JsonUtils.beamSchemaFromJsonSchema(configuration.getSchema());
84+
85+
MongoDbIO.Read read =
86+
MongoDbIO.read()
87+
.withUri(configuration.getUri())
88+
.withDatabase(configuration.getDatabase())
89+
.withCollection(configuration.getCollection());
90+
91+
final String filterStr = configuration.getFilter();
92+
if (filterStr != null) {
93+
read = read.withQueryFn(FindQuery.create().withFilters(Document.parse(filterStr)));
94+
}
95+
96+
PCollection<Document> mongoDocs = input.getPipeline().apply("ReadFromMongoDb", read);
97+
98+
boolean handleErrors = ErrorHandling.hasOutput(configuration.getErrorHandling());
99+
Schema errorSchema = ErrorHandling.errorSchemaBytes();
100+
101+
PCollectionTuple outputTuple =
102+
mongoDocs.apply(
103+
"ConvertToBeamRows",
104+
ParDo.of(new DocumentToRowFn(schema, handleErrors, errorSchema))
105+
.withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));
106+
107+
PCollection<Row> beamRows = outputTuple.get(OUTPUT_TAG).setRowSchema(schema);
108+
PCollection<Row> errorOutput = outputTuple.get(ERROR_TAG).setRowSchema(errorSchema);
109+
110+
PCollectionRowTuple output = PCollectionRowTuple.of(OUTPUT_TAG_NAME, beamRows);
111+
ErrorHandling errorHandling = configuration.getErrorHandling();
112+
if (handleErrors && errorHandling != null) {
113+
output = output.and(errorHandling.getOutput(), errorOutput);
114+
}
115+
return output;
116+
}
117+
}
118+
119+
/** Converts a MongoDB BSON {@link Document} to a Beam {@link Row}. */
120+
static class DocumentToRowFn extends DoFn<Document, Row> {
121+
private final Schema schema;
122+
private final boolean handleErrors;
123+
private final Schema errorSchema;
124+
125+
DocumentToRowFn(Schema schema, boolean handleErrors, Schema errorSchema) {
126+
this.schema = schema;
127+
this.handleErrors = handleErrors;
128+
this.errorSchema = errorSchema;
129+
}
130+
131+
@ProcessElement
132+
public void processElement(@Element Document doc, MultiOutputReceiver receiver) {
133+
try {
134+
receiver.get(OUTPUT_TAG).output(MongoDbUtils.toRow(doc, schema));
135+
} catch (Exception e) {
136+
if (!handleErrors) {
137+
throw new RuntimeException(
138+
"Failed to convert BSON Document to Beam Row: " + doc.toJson(), e);
139+
}
140+
errorCounter.inc();
141+
byte[] docBytes;
142+
try {
143+
docBytes = doc.toJson().getBytes(java.nio.charset.StandardCharsets.UTF_8);
144+
} catch (Exception jsonEx) {
145+
docBytes = doc.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8);
146+
}
147+
receiver.get(ERROR_TAG).output(ErrorHandling.errorRecord(errorSchema, docBytes, e));
148+
}
149+
}
150+
}
151+
}

0 commit comments

Comments
 (0)