Skip to content

Commit 8a14afb

Browse files
committed
[mqtt] Add SchemaTransform providers for MqttIO Read/Write
Adds MqttReadSchemaTransformProvider and MqttWriteSchemaTransformProvider so MqttIO can be used through the portable SchemaTransform API and exposed as cross-language transforms. Decorates MqttIO.ConnectionConfiguration with @DefaultSchema(AutoValueSchema.class) and @SchemaFieldDescription so the config round-trips through Beam Schemas. Both batch and streaming are supported on the read side: omitting maxNumRecords/maxReadTimeSeconds yields an unbounded (streaming) read, while setting either bounds it to a batch read. The provider descriptions document this and note that streaming requires a portable streaming runner (e.g. Prism, Flink, Dataflow); the legacy local Python DirectRunner does not execute portable streaming cross-language reads. Tests cover read-with-timeout-no-data, an unbounded streaming read (publish/collect/cancel), and a write-then-read round trip against an embedded ActiveMQ broker. Revives the approved diff from PR #32385 (ahmedabu98, twosom) and adapts it to the post-#32668 generic API (MqttIO.Read<T> / MqttIO.Write<T>).
1 parent be50185 commit 8a14afb

4 files changed

Lines changed: 541 additions & 0 deletions

File tree

sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,10 @@
3737
import org.apache.beam.sdk.coders.SerializableCoder;
3838
import org.apache.beam.sdk.io.UnboundedSource;
3939
import org.apache.beam.sdk.options.PipelineOptions;
40+
import org.apache.beam.sdk.schemas.AutoValueSchema;
4041
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
42+
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
43+
import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
4144
import org.apache.beam.sdk.transforms.DoFn;
4245
import org.apache.beam.sdk.transforms.PTransform;
4346
import org.apache.beam.sdk.transforms.ParDo;
@@ -205,13 +208,17 @@ public static <InputT> Write<InputT> dynamicWrite() {
205208
private MqttIO() {}
206209

207210
/** A POJO describing a MQTT connection. */
211+
@DefaultSchema(AutoValueSchema.class)
208212
@AutoValue
209213
public abstract static class ConnectionConfiguration implements Serializable {
210214

215+
@SchemaFieldDescription("The MQTT broker URI.")
211216
abstract String getServerUri();
212217

218+
@SchemaFieldDescription("The MQTT topic pattern.")
213219
abstract @Nullable String getTopic();
214220

221+
@SchemaFieldDescription("The client ID prefix, which is used to construct a unique client ID.")
215222
abstract @Nullable String getClientId();
216223

217224
abstract @Nullable String getUsername();
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.sdk.io.mqtt;
19+
20+
import static org.apache.beam.sdk.io.mqtt.MqttIO.ConnectionConfiguration;
21+
import static org.apache.beam.sdk.io.mqtt.MqttReadSchemaTransformProvider.ReadConfiguration;
22+
23+
import com.google.auto.service.AutoService;
24+
import com.google.auto.value.AutoValue;
25+
import java.io.Serializable;
26+
import javax.annotation.Nullable;
27+
import org.apache.beam.sdk.schemas.AutoValueSchema;
28+
import org.apache.beam.sdk.schemas.Schema;
29+
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
30+
import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
31+
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
32+
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
33+
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
34+
import org.apache.beam.sdk.transforms.DoFn;
35+
import org.apache.beam.sdk.transforms.ParDo;
36+
import org.apache.beam.sdk.values.PCollection;
37+
import org.apache.beam.sdk.values.PCollectionRowTuple;
38+
import org.apache.beam.sdk.values.Row;
39+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
40+
import org.joda.time.Duration;
41+
42+
@AutoService(SchemaTransformProvider.class)
43+
public class MqttReadSchemaTransformProvider
44+
extends TypedSchemaTransformProvider<ReadConfiguration> {
45+
@DefaultSchema(AutoValueSchema.class)
46+
@AutoValue
47+
public abstract static class ReadConfiguration implements Serializable {
48+
public static Builder builder() {
49+
return new AutoValue_MqttReadSchemaTransformProvider_ReadConfiguration.Builder();
50+
}
51+
52+
@SchemaFieldDescription("Configuration options to set up the MQTT connection.")
53+
public abstract ConnectionConfiguration getConnectionConfiguration();
54+
55+
@SchemaFieldDescription(
56+
"The max number of records to receive. Setting this will result in a bounded PCollection.")
57+
@Nullable
58+
public abstract Long getMaxNumRecords();
59+
60+
@SchemaFieldDescription(
61+
"The maximum time for this source to read messages. Setting this will result in a bounded PCollection.")
62+
@Nullable
63+
public abstract Long getMaxReadTimeSeconds();
64+
65+
@AutoValue.Builder
66+
public abstract static class Builder {
67+
public abstract Builder setConnectionConfiguration(
68+
ConnectionConfiguration connectionConfiguration);
69+
70+
public abstract Builder setMaxNumRecords(Long maxNumRecords);
71+
72+
public abstract Builder setMaxReadTimeSeconds(Long maxReadTimeSeconds);
73+
74+
public abstract ReadConfiguration build();
75+
}
76+
}
77+
78+
@Override
79+
public String identifier() {
80+
return "beam:schematransform:org.apache.beam:mqtt_read:v1";
81+
}
82+
83+
@Override
84+
public String description() {
85+
return "Reads messages from an MQTT broker and outputs each payload as a single `bytes` "
86+
+ "field.\n"
87+
+ "\n"
88+
+ "By default the read is unbounded (streaming): it keeps consuming messages from the "
89+
+ "subscribed topic until the pipeline is stopped. Setting `maxNumRecords` and/or "
90+
+ "`maxReadTimeSeconds` bounds the read, producing a bounded (batch) PCollection.\n"
91+
+ "\n"
92+
+ "Note: streaming reads require a runner that supports portable streaming (e.g. Prism, "
93+
+ "Flink, or Dataflow). The legacy local Python DirectRunner does not execute portable "
94+
+ "streaming cross-language reads.";
95+
}
96+
97+
@Override
98+
protected SchemaTransform from(ReadConfiguration configuration) {
99+
return new MqttReadSchemaTransform(configuration);
100+
}
101+
102+
private static class MqttReadSchemaTransform extends SchemaTransform {
103+
private final ReadConfiguration config;
104+
105+
MqttReadSchemaTransform(ReadConfiguration configuration) {
106+
this.config = configuration;
107+
}
108+
109+
@Override
110+
public PCollectionRowTuple expand(PCollectionRowTuple input) {
111+
Preconditions.checkState(
112+
input.getAll().isEmpty(),
113+
"Expected zero input PCollections for this source, but found: %s",
114+
input.getAll().keySet());
115+
116+
MqttIO.Read<byte[]> readTransform =
117+
MqttIO.read().withConnectionConfiguration(config.getConnectionConfiguration());
118+
119+
Long maxRecords = config.getMaxNumRecords();
120+
Long maxReadTime = config.getMaxReadTimeSeconds();
121+
if (maxRecords != null) {
122+
readTransform = readTransform.withMaxNumRecords(maxRecords);
123+
}
124+
if (maxReadTime != null) {
125+
readTransform = readTransform.withMaxReadTime(Duration.standardSeconds(maxReadTime));
126+
}
127+
128+
Schema outputSchema = Schema.builder().addByteArrayField("bytes").build();
129+
130+
PCollection<Row> outputRows =
131+
input
132+
.getPipeline()
133+
.apply(readTransform)
134+
.apply(
135+
"Wrap in Beam Rows",
136+
ParDo.of(
137+
new DoFn<byte[], Row>() {
138+
@ProcessElement
139+
public void processElement(
140+
@Element byte[] data, OutputReceiver<Row> outputReceiver) {
141+
outputReceiver.output(
142+
Row.withSchema(outputSchema).addValue(data).build());
143+
}
144+
}))
145+
.setRowSchema(outputSchema);
146+
147+
return PCollectionRowTuple.of("output", outputRows);
148+
}
149+
}
150+
}
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.sdk.io.mqtt;
19+
20+
import static org.apache.beam.sdk.io.mqtt.MqttIO.ConnectionConfiguration;
21+
import static org.apache.beam.sdk.io.mqtt.MqttWriteSchemaTransformProvider.WriteConfiguration;
22+
23+
import com.google.auto.service.AutoService;
24+
import com.google.auto.value.AutoValue;
25+
import java.io.Serializable;
26+
import javax.annotation.Nullable;
27+
import org.apache.beam.sdk.schemas.AutoValueSchema;
28+
import org.apache.beam.sdk.schemas.Schema;
29+
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
30+
import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
31+
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
32+
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
33+
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
34+
import org.apache.beam.sdk.transforms.DoFn;
35+
import org.apache.beam.sdk.transforms.ParDo;
36+
import org.apache.beam.sdk.values.PCollection;
37+
import org.apache.beam.sdk.values.PCollectionRowTuple;
38+
import org.apache.beam.sdk.values.Row;
39+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
40+
41+
@AutoService(SchemaTransformProvider.class)
42+
public class MqttWriteSchemaTransformProvider
43+
extends TypedSchemaTransformProvider<WriteConfiguration> {
44+
@DefaultSchema(AutoValueSchema.class)
45+
@AutoValue
46+
public abstract static class WriteConfiguration implements Serializable {
47+
public static Builder builder() {
48+
return new AutoValue_MqttWriteSchemaTransformProvider_WriteConfiguration.Builder();
49+
}
50+
51+
@SchemaFieldDescription("Configuration options to set up the MQTT connection.")
52+
public abstract ConnectionConfiguration getConnectionConfiguration();
53+
54+
@SchemaFieldDescription(
55+
"Whether or not the publish message should be retained by the messaging engine. "
56+
+ "When a subscriber connects, it gets the latest retained message. "
57+
+ "Defaults to `False`, which will clear the retained message from the server.")
58+
@Nullable
59+
public abstract Boolean getRetained();
60+
61+
@AutoValue.Builder
62+
public abstract static class Builder {
63+
public abstract Builder setConnectionConfiguration(
64+
ConnectionConfiguration connectionConfiguration);
65+
66+
public abstract Builder setRetained(Boolean retained);
67+
68+
public abstract WriteConfiguration build();
69+
}
70+
}
71+
72+
@Override
73+
public String identifier() {
74+
return "beam:schematransform:org.apache.beam:mqtt_write:v1";
75+
}
76+
77+
@Override
78+
public String description() {
79+
return "Publishes messages to an MQTT broker. Expects an input PCollection of rows with a "
80+
+ "single `bytes` field, each of which is published as one MQTT message.\n"
81+
+ "\n"
82+
+ "Works with both bounded (batch) and unbounded (streaming) input PCollections.";
83+
}
84+
85+
@Override
86+
protected SchemaTransform from(WriteConfiguration configuration) {
87+
return new MqttWriteSchemaTransform(configuration);
88+
}
89+
90+
private static class MqttWriteSchemaTransform extends SchemaTransform {
91+
private final WriteConfiguration config;
92+
93+
MqttWriteSchemaTransform(WriteConfiguration configuration) {
94+
this.config = configuration;
95+
}
96+
97+
@Override
98+
public PCollectionRowTuple expand(PCollectionRowTuple input) {
99+
PCollection<Row> inputRows = input.getSinglePCollection();
100+
101+
Preconditions.checkState(
102+
inputRows.getSchema().getFieldCount() == 1
103+
&& inputRows.getSchema().getField(0).getType().equals(Schema.FieldType.BYTES),
104+
"Expected only one Schema field containing bytes, but instead received: %s",
105+
inputRows.getSchema());
106+
107+
MqttIO.Write<byte[]> writeTransform =
108+
MqttIO.write().withConnectionConfiguration(config.getConnectionConfiguration());
109+
Boolean retained = config.getRetained();
110+
if (retained != null) {
111+
writeTransform = writeTransform.withRetained(retained);
112+
}
113+
114+
inputRows
115+
.apply(
116+
"Extract bytes",
117+
ParDo.of(
118+
new DoFn<Row, byte[]>() {
119+
@ProcessElement
120+
public void processElement(
121+
@Element Row row, OutputReceiver<byte[]> outputReceiver) {
122+
outputReceiver.output(
123+
org.apache.beam.sdk.util.Preconditions.checkStateNotNull(
124+
row.getBytes(0)));
125+
}
126+
}))
127+
.apply(writeTransform);
128+
129+
return PCollectionRowTuple.empty(inputRows.getPipeline());
130+
}
131+
}
132+
}

0 commit comments

Comments
 (0)