Skip to content

Commit 2204a9d

Browse files
committed
[mqtt] Add messaging expansion service and wire MqttIO into Python xlang
Adds a new :sdks:java:io:messaging-expansion-service module that serves messaging IOs (MQTT for now, with room for JMS/Solace later) instead of adding MqttIO to the shared :sdks:java:io:expansion-service, per review feedback from @Abacn and @chamikaramj. Registers MqttIO's SchemaTransforms in standard_expansion_services.yaml under the new service with kafka-style names (ReadFromMqtt / WriteToMqtt), skipping the core SchemaTransforms it bundles transitively (those are generated from the Java IO expansion service). Regenerates standard_external_transforms.yaml so the generated Python wrappers are served by the messaging expansion service, and registers the new target in the generateExternalTransformsConfig task and the xlang wrapper-validation list. Adds an I/Os entry in CHANGES.md for 2.75.0 covering batch and streaming.
1 parent 8a14afb commit 2204a9d

7 files changed

Lines changed: 129 additions & 2 deletions

File tree

CHANGES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
## I/Os
6767

6868
* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
69+
* MqttIO: portable `ReadFromMqtt` / `WriteToMqtt` transforms now available in Python via cross-language (`apache_beam.io`), for both batch and streaming ([#32385](https://github.com/apache/beam/issues/32385)).
6970

7071
## New Features / Improvements
7172

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* License); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an AS IS BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
apply plugin: 'org.apache.beam.module'
20+
apply plugin: 'application'
21+
mainClassName = "org.apache.beam.sdk.expansion.service.ExpansionService"
22+
23+
applyJavaNature(
24+
automaticModuleName: 'org.apache.beam.sdk.io.messaging.expansion.service',
25+
exportJavadoc: false,
26+
validateShadowJar: false,
27+
shadowClosure: {},
28+
)
29+
30+
shadowJar {
31+
manifest {
32+
attributes(["Multi-Release": true])
33+
}
34+
mergeServiceFiles()
35+
outputs.upToDateWhen { false }
36+
}
37+
38+
description = "Apache Beam :: SDKs :: Java :: IO :: Messaging Expansion Service"
39+
ext.summary = "Expansion service serving messaging IOs (e.g. MQTT)"
40+
41+
dependencies {
42+
implementation project(":sdks:java:expansion-service")
43+
permitUnusedDeclared project(":sdks:java:expansion-service") // BEAM-11761
44+
implementation project(":sdks:java:io:mqtt")
45+
permitUnusedDeclared project(":sdks:java:io:mqtt") // BEAM-11761
46+
runtimeOnly library.java.slf4j_jdk14
47+
}
48+
49+
task runExpansionService (type: JavaExec) {
50+
mainClass = "org.apache.beam.sdk.expansion.service.ExpansionService"
51+
classpath = sourceSets.test.runtimeClasspath
52+
args = [project.findProperty("constructionService.port") ?: "8097"]
53+
}

sdks/python/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ tasks.register("generateExternalTransformsConfig") {
7373
// Need to build all expansion services listed in sdks/standard_expansion_services.yaml
7474
dependsOn ":sdks:java:io:google-cloud-platform:expansion-service:build"
7575
dependsOn ":sdks:java:io:expansion-service:build"
76+
dependsOn ":sdks:java:io:messaging-expansion-service:build"
7677
// Keep this in-sync with pyproject.toml
7778
def PyYaml = "'pyyaml>=3.12,<7.0.0'"
7879

sdks/python/test-suites/xlang/build.gradle

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ project.evaluationDependsOn(":sdks:python")
2525
// relevant fields as done here, then add it to `xlangTasks`.
2626
def gcpExpansionPath = project.project(':sdks:java:io:google-cloud-platform:expansion-service').getPath()
2727
def ioExpansionPath = project.project(':sdks:java:io:expansion-service').getPath()
28+
def messagingExpansionPath = project.project(':sdks:java:io:messaging-expansion-service').getPath()
2829
// Properties that are common across runners.
2930
// Used to launch the expansion service, collect the right tests, and cleanup afterwards
3031
def gcpXlang = new CrossLanguageTask().tap {
@@ -42,7 +43,7 @@ def ioXlang = new CrossLanguageTask().tap {
4243
}
4344

4445
// This list should include all expansion service targets in sdks/python/standard_expansion_services.yaml
45-
def servicesToGenerateFrom = [ioExpansionPath, gcpExpansionPath]
46+
def servicesToGenerateFrom = [ioExpansionPath, messagingExpansionPath, gcpExpansionPath]
4647
def xlangWrapperValidation = new CrossLanguageTask().tap {
4748
name = "xlangWrapperValidation"
4849
expansionProjectPaths = servicesToGenerateFrom

sdks/standard_expansion_services.yaml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,21 @@
5353
- 'beam:schematransform:org.apache.beam:iceberg_read:v1'
5454
- 'beam:schematransform:org.apache.beam:iceberg_cdc_read:v1'
5555

56+
- gradle_target: 'sdks:java:io:messaging-expansion-service:shadowJar'
57+
destinations:
58+
python: 'apache_beam/io'
59+
transforms:
60+
'beam:schematransform:org.apache.beam:mqtt_write:v1':
61+
name: 'WriteToMqtt'
62+
'beam:schematransform:org.apache.beam:mqtt_read:v1':
63+
name: 'ReadFromMqtt'
64+
skip_transforms:
65+
# Core SchemaTransforms bundled via :sdks:java:expansion-service; already
66+
# generated from the Java IO expansion service above.
67+
- 'beam:schematransform:org.apache.beam:generate_sequence:v1'
68+
- 'beam:schematransform:org.apache.beam:tfrecord_read:v1'
69+
- 'beam:schematransform:org.apache.beam:tfrecord_write:v1'
70+
5671
# TODO(ahmedabu98): Enable this service in a future PR
5772
#- gradle_target: 'sdks:java:io:google-cloud-platform:expansion-service:shadowJar'
5873
# destinations:

sdks/standard_external_transforms.yaml

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
# configuration in /sdks/standard_expansion_services.yaml.
2020
# Refer to gen_xlang_wrappers.py for more info.
2121
#
22-
# Last updated on: 2026-05-06
22+
# Last updated on: 2026-06-11
2323

2424
- default_service: sdks:java:io:expansion-service:shadowJar
2525
description: 'Outputs a PCollection of Beam Rows, each containing a single INT64
@@ -145,3 +145,58 @@
145145
type: str
146146
identifier: beam:schematransform:org.apache.beam:tfrecord_write:v1
147147
name: TfrecordWrite
148+
- default_service: sdks:java:io:messaging-expansion-service:shadowJar
149+
description: 'Reads messages from an MQTT broker and outputs each payload as a single
150+
`bytes` field.
151+
152+
153+
By default the read is unbounded (streaming): it keeps consuming messages from
154+
the subscribed topic until the pipeline is stopped. Setting `maxNumRecords` and/or
155+
`maxReadTimeSeconds` bounds the read, producing a bounded (batch) PCollection.
156+
157+
158+
Note: streaming reads require a runner that supports portable streaming (e.g.
159+
Prism, Flink, or Dataflow). The legacy local Python DirectRunner does not execute
160+
portable streaming cross-language reads.'
161+
destinations:
162+
python: apache_beam/io
163+
fields:
164+
- description: Configuration options to set up the MQTT connection.
165+
name: connection_configuration
166+
nullable: false
167+
type: Row(client_id=typing.Optional[str], password=typing.Optional[str], server_uri=<class
168+
'str'>, topic=typing.Optional[str], username=typing.Optional[str])
169+
- description: The max number of records to receive. Setting this will result in
170+
a bounded PCollection.
171+
name: max_num_records
172+
nullable: true
173+
type: int64
174+
- description: The maximum time for this source to read messages. Setting this will
175+
result in a bounded PCollection.
176+
name: max_read_time_seconds
177+
nullable: true
178+
type: int64
179+
identifier: beam:schematransform:org.apache.beam:mqtt_read:v1
180+
name: ReadFromMqtt
181+
- default_service: sdks:java:io:messaging-expansion-service:shadowJar
182+
description: 'Publishes messages to an MQTT broker. Expects an input PCollection
183+
of rows with a single `bytes` field, each of which is published as one MQTT message.
184+
185+
186+
Works with both bounded (batch) and unbounded (streaming) input PCollections.'
187+
destinations:
188+
python: apache_beam/io
189+
fields:
190+
- description: Configuration options to set up the MQTT connection.
191+
name: connection_configuration
192+
nullable: false
193+
type: Row(client_id=typing.Optional[str], password=typing.Optional[str], server_uri=<class
194+
'str'>, topic=typing.Optional[str], username=typing.Optional[str])
195+
- description: Whether or not the publish message should be retained by the messaging
196+
engine. When a subscriber connects, it gets the latest retained message. Defaults
197+
to `False`, which will clear the retained message from the server.
198+
name: retained
199+
nullable: true
200+
type: boolean
201+
identifier: beam:schematransform:org.apache.beam:mqtt_write:v1
202+
name: WriteToMqtt

settings.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,7 @@ include(":sdks:java:io:elasticsearch-tests:elasticsearch-tests-8")
236236
include(":sdks:java:io:elasticsearch-tests:elasticsearch-tests-9")
237237
include(":sdks:java:io:elasticsearch-tests:elasticsearch-tests-common")
238238
include(":sdks:java:io:expansion-service")
239+
include(":sdks:java:io:messaging-expansion-service")
239240
include(":sdks:java:io:file-based-io-tests")
240241
include(":sdks:java:io:bigquery-io-perf-tests")
241242
include(":sdks:java:io:cdap")

0 commit comments

Comments
 (0)