Skip to content

Commit 0a415ec

Browse files
committed
[mqtt] Wire MqttIO into Python xlang wrapper generation
Adds name overrides for the new MqttIO SchemaTransforms in standard_expansion_services.yaml so the generated Python wrappers follow the kafka-style naming (ReadFromMqtt / WriteToMqtt) and become available under apache_beam.io. Regenerates standard_external_transforms.yaml via :sdks:python:generateExternalTransformsConfig — the file now includes the mqtt_read:v1 and mqtt_write:v1 entries with their inferred Row schema for ConnectionConfiguration and the transform descriptions. Adds an I/Os entry in CHANGES.md for the 2.75.0 release covering both batch and streaming.
1 parent 0014f84 commit 0a415ec

3 files changed

Lines changed: 61 additions & 1 deletion

File tree

CHANGES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
## I/Os
6767

6868
* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
69+
* MqttIO: portable `ReadFromMqtt` / `WriteToMqtt` transforms now available in Python via cross-language (`apache_beam.io`), for both batch and streaming ([#32385](https://github.com/apache/beam/issues/32385)).
6970

7071
## New Features / Improvements
7172

sdks/standard_expansion_services.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,10 @@
4444
name: 'WriteToKafka'
4545
'beam:schematransform:org.apache.beam:kafka_read:v1':
4646
name: 'ReadFromKafka'
47+
'beam:schematransform:org.apache.beam:mqtt_write:v1':
48+
name: 'WriteToMqtt'
49+
'beam:schematransform:org.apache.beam:mqtt_read:v1':
50+
name: 'ReadFromMqtt'
4751
skip_transforms:
4852
# Handwritten Kafka wrappers already exist in apache_beam/io/kafka.py
4953
- 'beam:schematransform:org.apache.beam:kafka_write:v1'

sdks/standard_external_transforms.yaml

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
# configuration in /sdks/standard_expansion_services.yaml.
2020
# Refer to gen_xlang_wrappers.py for more info.
2121
#
22-
# Last updated on: 2026-05-06
22+
# Last updated on: 2026-06-10
2323

2424
- default_service: sdks:java:io:expansion-service:shadowJar
2525
description: 'Outputs a PCollection of Beam Rows, each containing a single INT64
@@ -80,6 +80,61 @@
8080
type: str
8181
identifier: beam:schematransform:org.apache.beam:mongodb_write:v1
8282
name: MongodbWrite
83+
- default_service: sdks:java:io:expansion-service:shadowJar
84+
description: 'Reads messages from an MQTT broker and outputs each payload as a single
85+
`bytes` field.
86+
87+
88+
By default the read is unbounded (streaming): it keeps consuming messages from
89+
the subscribed topic until the pipeline is stopped. Setting `maxNumRecords` and/or
90+
`maxReadTimeSeconds` bounds the read, producing a bounded (batch) PCollection.
91+
92+
93+
Note: streaming reads require a runner that supports portable streaming (e.g.
94+
Prism, Flink, or Dataflow). The legacy local Python DirectRunner does not execute
95+
portable streaming cross-language reads.'
96+
destinations:
97+
python: apache_beam/io
98+
fields:
99+
- description: Configuration options to set up the MQTT connection.
100+
name: connection_configuration
101+
nullable: false
102+
type: Row(client_id=typing.Optional[str], password=typing.Optional[str], server_uri=<class
103+
'str'>, topic=typing.Optional[str], username=typing.Optional[str])
104+
- description: The max number of records to receive. Setting this will result in
105+
a bounded PCollection.
106+
name: max_num_records
107+
nullable: true
108+
type: int64
109+
- description: The maximum time for this source to read messages. Setting this will
110+
result in a bounded PCollection.
111+
name: max_read_time_seconds
112+
nullable: true
113+
type: int64
114+
identifier: beam:schematransform:org.apache.beam:mqtt_read:v1
115+
name: ReadFromMqtt
116+
- default_service: sdks:java:io:expansion-service:shadowJar
117+
description: 'Publishes messages to an MQTT broker. Expects an input PCollection
118+
of rows with a single `bytes` field, each of which is published as one MQTT message.
119+
120+
121+
Works with both bounded (batch) and unbounded (streaming) input PCollections.'
122+
destinations:
123+
python: apache_beam/io
124+
fields:
125+
- description: Configuration options to set up the MQTT connection.
126+
name: connection_configuration
127+
nullable: false
128+
type: Row(client_id=typing.Optional[str], password=typing.Optional[str], server_uri=<class
129+
'str'>, topic=typing.Optional[str], username=typing.Optional[str])
130+
- description: Whether or not the publish message should be retained by the messaging
131+
engine. When a subscriber connects, it gets the latest retained message. Defaults
132+
to `False`, which will clear the retained message from the server.
133+
name: retained
134+
nullable: true
135+
type: boolean
136+
identifier: beam:schematransform:org.apache.beam:mqtt_write:v1
137+
name: WriteToMqtt
83138
- default_service: sdks:java:io:expansion-service:shadowJar
84139
description: ''
85140
destinations:

0 commit comments

Comments
 (0)