Add portable Mqtt source and sink transforms#32385
Conversation
|
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment |
|
CC: @chamikaramj You can test this out locally with the following code. @gdiazdelrio can you check this PR out and give it a shot? Let me know if something doesn't work import apache_beam as beam
from apache_beam.transforms.external_transform_provider import ExternalTransformProvider
from apache_beam.transforms.external import BeamJarExpansionService
provider = ExternalTransformProvider(BeamJarExpansionService("sdks:java:io:expansion-service:shadowJar"))
MqttRead = provider.get_urn("beam:schematransform:org.apache.beam:mqtt_read:v1")
MqttWrite = provider.get_urn("beam:schematransform:org.apache.beam:mqtt_write:v1")
with beam.Pipeline() as p:
connection_configuration = {
"server_uri": "tcp://localhost:58494",
"topic": "WRITE_TOPIC",
"client_id": "READ_PIPELINE"
}
# read
p | MqttRead(connection_configuration, max_read_time_seconds=10) | beam.Map(print)
# write
# p | beam.Create([beam.Row(bytes=bytes([1, 2, 3, 4, 5]))]) | MqttWrite(
# connection_configuration=connection_configuration) |
|
Hi, what is the status of this PR? @ahmedabu98 @chamikaramj |
@ahmedabu98 I’ve only worked with the Java SDK so far, so this will be my first time dealing with portable development. It might take me some time, but I’ll do my best to work through it. If I have any questions along the way, would it be alright to leave a comment here on the PR? |
Of course! You'd be doing me a favor :) Let me know how I can help To start, you can git checkout this PR and first run |
@ahmedabu98 I'll add feature for
and also Should I start by creating my own branch and cherry-picking your commit? |
|
@ahmedabu98 @twosom Any update for this PR? Are we ready to get it merged? |
|
@shunping Thanks for reply. I've created a SchemaTransformProvider and tested it in Python. It works well in the Batch scenario, but it doesn't work in the Streaming scenario. While trying to find the cause, I switched to another task. I'll continue working on this ^_^ |
|
Assigning reviewers. If you would like to opt out of this review, comment R: @robertwb for label java. Available commands:
The PR bot will only process comments in the main thread (not review comments). |
|
Reminder, please take a look at this pr: @robertwb @damondouglas |
|
Assigning new set of reviewers because Pr has gone too long without review. If you would like to opt out of this review, comment R: @Abacn for label java. Available commands:
|
|
Reminder, please take a look at this pr: @Abacn @johnjcasey |
|
Assigning new set of reviewers because Pr has gone too long without review. If you would like to opt out of this review, comment R: @kennknowles for label java. Available commands:
|
|
@ahmedabu98 LGTM pending conflicts resolution |
|
Reminder, please take a look at this pr: @kennknowles @damondouglas |
|
Assigning new set of reviewers because Pr has gone too long without review. If you would like to opt out of this review, comment R: @robertwb for label java. Available commands:
|
|
Reminder, please take a look at this pr: @robertwb @chamikaramj |
|
Assigning new set of reviewers because Pr has gone too long without review. If you would like to opt out of this review, comment R: @damondouglas for label java. Available commands:
|
|
Reminder, please take a look at this pr: @damondouglas @damondouglas |
|
Assigning new set of reviewers because Pr has gone too long without review. If you would like to opt out of this review, comment R: @Abacn for label java. Available commands:
|
|
Reminder, please take a look at this pr: @Abacn @johnjcasey |
|
Assigning new set of reviewers because Pr has gone too long without review. If you would like to opt out of this review, comment R: @kennknowles for label java. Available commands:
|
|
waiting on author |
|
Hey @ahmedabu98 is this still being worked on? |
|
waiting on author |
|
This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@beam.apache.org list. Thank you for your contributions. |
|
This pull request has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time. |
|
I have a use case for a Mqtt streaming source in Python, I would pick this up in the future @Abacn if you think it makes sense. |
|
Yeah it's a valuable addition. The original plan was to first onboard SchemaTransform for MqttIO (this PR) then add it to
to generate xlang wrapper when Python SDK is built. |
Adds MqttReadSchemaTransformProvider and MqttWriteSchemaTransformProvider so MqttIO can be used through the portable SchemaTransform API and exposed as cross-language transforms. Decorates MqttIO.ConnectionConfiguration with @DefaultSchema(AutoValueSchema.class) and @SchemaFieldDescription so the config round-trips through Beam Schemas. Wires :sdks:java:io:mqtt into :sdks:java:io:expansion-service so the SchemaTransforms are picked up by ExpansionService via @autoservice. Both batch and streaming are supported on the read side: omitting maxNumRecords/maxReadTimeSeconds yields an unbounded (streaming) read, while setting either bounds it to a batch read. The provider descriptions document this and note that streaming requires a portable streaming runner (e.g. Prism, Flink, Dataflow); the legacy local Python DirectRunner does not execute portable streaming cross-language reads. Tests cover read-with-timeout-no-data, an unbounded streaming read (publish/collect/cancel), and a write-then-read round trip against an embedded ActiveMQ broker. Revives the approved diff from PR apache#32385 (ahmedabu98, twosom) and adapts it to the post-apache#32668 generic API (MqttIO.Read<T> / MqttIO.Write<T>). Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Adds MqttReadSchemaTransformProvider and MqttWriteSchemaTransformProvider so MqttIO can be used through the portable SchemaTransform API and exposed as cross-language transforms. Decorates MqttIO.ConnectionConfiguration with @DefaultSchema(AutoValueSchema.class) and @SchemaFieldDescription so the config round-trips through Beam Schemas. Wires :sdks:java:io:mqtt into :sdks:java:io:expansion-service so the SchemaTransforms are picked up by ExpansionService via @autoservice. Both batch and streaming are supported on the read side: omitting maxNumRecords/maxReadTimeSeconds yields an unbounded (streaming) read, while setting either bounds it to a batch read. The provider descriptions document this and note that streaming requires a portable streaming runner (e.g. Prism, Flink, Dataflow); the legacy local Python DirectRunner does not execute portable streaming cross-language reads. Tests cover read-with-timeout-no-data, an unbounded streaming read (publish/collect/cancel), and a write-then-read round trip against an embedded ActiveMQ broker. Revives the approved diff from PR apache#32385 (ahmedabu98, twosom) and adapts it to the post-apache#32668 generic API (MqttIO.Read<T> / MqttIO.Write<T>).
Adds MqttReadSchemaTransformProvider and MqttWriteSchemaTransformProvider so MqttIO can be used through the portable SchemaTransform API and exposed as cross-language transforms. Decorates MqttIO.ConnectionConfiguration with @DefaultSchema(AutoValueSchema.class) and @SchemaFieldDescription so the config round-trips through Beam Schemas. Wires :sdks:java:io:mqtt into :sdks:java:io:expansion-service so the SchemaTransforms are picked up by ExpansionService via @autoservice. Both batch and streaming are supported on the read side: omitting maxNumRecords/maxReadTimeSeconds yields an unbounded (streaming) read, while setting either bounds it to a batch read. The provider descriptions document this and note that streaming requires a portable streaming runner (e.g. Prism, Flink, Dataflow); the legacy local Python DirectRunner does not execute portable streaming cross-language reads. Tests cover read-with-timeout-no-data, an unbounded streaming read (publish/collect/cancel), and a write-then-read round trip against an embedded ActiveMQ broker. Revives the approved diff from PR apache#32385 (ahmedabu98, twosom) and adapts it to the post-apache#32668 generic API (MqttIO.Read<T> / MqttIO.Write<T>).
Adds MqttReadSchemaTransformProvider and MqttWriteSchemaTransformProvider so MqttIO can be used through the portable SchemaTransform API and exposed as cross-language transforms. Decorates MqttIO.ConnectionConfiguration with @DefaultSchema(AutoValueSchema.class) and @SchemaFieldDescription so the config round-trips through Beam Schemas. Both batch and streaming are supported on the read side: omitting maxNumRecords/maxReadTimeSeconds yields an unbounded (streaming) read, while setting either bounds it to a batch read. The provider descriptions document this and note that streaming requires a portable streaming runner (e.g. Prism, Flink, Dataflow); the legacy local Python DirectRunner does not execute portable streaming cross-language reads. Tests cover read-with-timeout-no-data, an unbounded streaming read (publish/collect/cancel), and a write-then-read round trip against an embedded ActiveMQ broker. Revives the approved diff from PR apache#32385 (ahmedabu98, twosom) and adapts it to the post-apache#32668 generic API (MqttIO.Read<T> / MqttIO.Write<T>).
Adds MqttReadSchemaTransformProvider and MqttWriteSchemaTransformProvider so MqttIO can be used through the portable SchemaTransform API and exposed as cross-language transforms. Decorates MqttIO.ConnectionConfiguration with @DefaultSchema(AutoValueSchema.class) and @SchemaFieldDescription so the config round-trips through Beam Schemas. Both batch and streaming are supported on the read side: omitting maxNumRecords/maxReadTimeSeconds yields an unbounded (streaming) read, while setting either bounds it to a batch read. The provider descriptions document this and note that streaming requires a portable streaming runner (e.g. Prism, Flink, Dataflow); the legacy local Python DirectRunner does not execute portable streaming cross-language reads. Tests cover read-with-timeout-no-data, an unbounded streaming read (publish/collect/cancel), and a write-then-read round trip against an embedded ActiveMQ broker. Revives the approved diff from PR apache#32385 (ahmedabu98, twosom) and adapts it to the post-apache#32668 generic API (MqttIO.Read<T> / MqttIO.Write<T>).
…ing (revives #32385) (#38493) * [mqtt] Add SchemaTransform providers for MqttIO Read/Write Adds MqttReadSchemaTransformProvider and MqttWriteSchemaTransformProvider so MqttIO can be used through the portable SchemaTransform API and exposed as cross-language transforms. Decorates MqttIO.ConnectionConfiguration with @DefaultSchema(AutoValueSchema.class) and @SchemaFieldDescription so the config round-trips through Beam Schemas. Both batch and streaming are supported on the read side: omitting maxNumRecords/maxReadTimeSeconds yields an unbounded (streaming) read, while setting either bounds it to a batch read. The provider descriptions document this and note that streaming requires a portable streaming runner (e.g. Prism, Flink, Dataflow); the legacy local Python DirectRunner does not execute portable streaming cross-language reads. Tests cover read-with-timeout-no-data, an unbounded streaming read (publish/collect/cancel), and a write-then-read round trip against an embedded ActiveMQ broker. Revives the approved diff from PR #32385 (ahmedabu98, twosom) and adapts it to the post-#32668 generic API (MqttIO.Read<T> / MqttIO.Write<T>). * [mqtt] Add messaging expansion service and wire MqttIO into Python xlang Adds a new :sdks:java:io:messaging-expansion-service module that serves messaging IOs (MQTT for now, with room for JMS/Solace later) instead of adding MqttIO to the shared :sdks:java:io:expansion-service, per review feedback from @Abacn and @chamikaramj. Registers MqttIO's SchemaTransforms in standard_expansion_services.yaml under the new service with kafka-style names (ReadFromMqtt / WriteToMqtt), skipping the core SchemaTransforms it bundles transitively (those are generated from the Java IO expansion service). Regenerates standard_external_transforms.yaml so the generated Python wrappers are served by the messaging expansion service, and registers the new target in the generateExternalTransformsConfig task and the xlang wrapper-validation list. The CHANGES.md announcement is deferred to the follow-up PR that sets up the Xlang Messaging PostCommit, per review feedback. * [expansion-service] Remove obsolete upToDateWhen workaround outputs.upToDateWhen { false } in the shadowJar block was a workaround for a corrupted gradle cache and is no longer needed (review feedback on PR #38493).
Fixes #21060