Skip to content

Add portable Mqtt source and sink transforms#32385

Closed
ahmedabu98 wants to merge 1 commit into
apache:masterfrom
ahmedabu98:mqtt_external
Closed

Add portable Mqtt source and sink transforms#32385
ahmedabu98 wants to merge 1 commit into
apache:masterfrom
ahmedabu98:mqtt_external

Conversation

@ahmedabu98

Copy link
Copy Markdown
Contributor

Fixes #21060

@github-actions

Copy link
Copy Markdown
Contributor

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

@ahmedabu98

Copy link
Copy Markdown
Contributor Author

CC: @chamikaramj

You can test this out locally with the following code. @gdiazdelrio can you check this PR out and give it a shot? Let me know if something doesn't work

import apache_beam as beam
from apache_beam.transforms.external_transform_provider import ExternalTransformProvider
from apache_beam.transforms.external import BeamJarExpansionService

provider = ExternalTransformProvider(BeamJarExpansionService("sdks:java:io:expansion-service:shadowJar"))
MqttRead = provider.get_urn("beam:schematransform:org.apache.beam:mqtt_read:v1")
MqttWrite = provider.get_urn("beam:schematransform:org.apache.beam:mqtt_write:v1")

with beam.Pipeline() as p:
  connection_configuration = {
    "server_uri": "tcp://localhost:58494",
    "topic": "WRITE_TOPIC",
    "client_id": "READ_PIPELINE"
  }

  # read
  p | MqttRead(connection_configuration, max_read_time_seconds=10) | beam.Map(print)

  # write
  # p | beam.Create([beam.Row(bytes=bytes([1, 2, 3, 4, 5]))]) | MqttWrite(
  #       connection_configuration=connection_configuration)

@Abacn

Abacn commented Oct 2, 2024

Copy link
Copy Markdown
Contributor

Hi, what is the status of this PR? @ahmedabu98 @chamikaramj

@ahmedabu98

Copy link
Copy Markdown
Contributor Author

@twosom perhaps you'd be interested in trying this out? I can adjust the PR to include your recently added feature to read with metadata (#32668)

@twosom

twosom commented Oct 10, 2024

Copy link
Copy Markdown
Contributor

@twosom perhaps you'd be interested in trying this out? I can adjust the PR to include your recently added feature to read with metadata (#32668)

@ahmedabu98
Thank you for the opportunity. May I give it a try?

I’ve only worked with the Java SDK so far, so this will be my first time dealing with portable development. It might take me some time, but I’ll do my best to work through it.

If I have any questions along the way, would it be alright to leave a comment here on the PR?

@ahmedabu98

Copy link
Copy Markdown
Contributor Author

Thank you for the opportunity. May I give it a try?

Of course! You'd be doing me a favor :) Let me know how I can help

To start, you can git checkout this PR and first run ./gradlew sdks:java:io:expansion-service:build to build the necessary Java jar, then run the python code snippet I pasted above.

@twosom

twosom commented Oct 13, 2024

Copy link
Copy Markdown
Contributor

Thank you for the opportunity. May I give it a try?

Of course! You'd be doing me a favor :) Let me know how I can help

To start, you can git checkout this PR and first run ./gradlew sdks:java:io:expansion-service:build to build the necessary Java jar, then run the python code snippet I pasted above.

@ahmedabu98
Thanks!

I'll add feature for

and also read and write

Should I start by creating my own branch and cherry-picking your commit?

twosom pushed a commit to twosom/beam that referenced this pull request Oct 13, 2024
@shunping

Copy link
Copy Markdown
Collaborator

@ahmedabu98 @twosom Any update for this PR? Are we ready to get it merged?

@twosom

twosom commented Nov 11, 2024

Copy link
Copy Markdown
Contributor

@shunping
Hi!

Thanks for reply.

I've created a SchemaTransformProvider and tested it in Python.

It works well in the Batch scenario, but it doesn't work in the Streaming scenario.

While trying to find the cause, I switched to another task.

I'll continue working on this ^_^

@github-actions

Copy link
Copy Markdown
Contributor

Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @robertwb for label java.
R: @damondouglas for label io.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@github-actions

github-actions Bot commented Jan 7, 2025

Copy link
Copy Markdown
Contributor

Reminder, please take a look at this pr: @robertwb @damondouglas

@github-actions

Copy link
Copy Markdown
Contributor

Assigning new set of reviewers because Pr has gone too long without review. If you would like to opt out of this review, comment assign to next reviewer:

R: @Abacn for label java.
R: @johnjcasey for label io.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

@github-actions

Copy link
Copy Markdown
Contributor

Reminder, please take a look at this pr: @Abacn @johnjcasey

@github-actions

Copy link
Copy Markdown
Contributor

Assigning new set of reviewers because Pr has gone too long without review. If you would like to opt out of this review, comment assign to next reviewer:

R: @kennknowles for label java.
R: @damondouglas for label io.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

@damondouglas

Copy link
Copy Markdown
Contributor

@ahmedabu98 LGTM pending conflicts resolution

@github-actions

Copy link
Copy Markdown
Contributor

Reminder, please take a look at this pr: @kennknowles @damondouglas

@github-actions

github-actions Bot commented Feb 4, 2025

Copy link
Copy Markdown
Contributor

Assigning new set of reviewers because Pr has gone too long without review. If you would like to opt out of this review, comment assign to next reviewer:

R: @robertwb for label java.
R: @chamikaramj for label io.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

@github-actions

Copy link
Copy Markdown
Contributor

Reminder, please take a look at this pr: @robertwb @chamikaramj

@github-actions

Copy link
Copy Markdown
Contributor

Assigning new set of reviewers because Pr has gone too long without review. If you would like to opt out of this review, comment assign to next reviewer:

R: @damondouglas for label java.
R: @damondouglas for label io.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

@github-actions

Copy link
Copy Markdown
Contributor

Reminder, please take a look at this pr: @damondouglas @damondouglas

@github-actions

Copy link
Copy Markdown
Contributor

Assigning new set of reviewers because Pr has gone too long without review. If you would like to opt out of this review, comment assign to next reviewer:

R: @Abacn for label java.
R: @johnjcasey for label io.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

@github-actions

github-actions Bot commented Mar 7, 2025

Copy link
Copy Markdown
Contributor

Reminder, please take a look at this pr: @Abacn @johnjcasey

@github-actions

Copy link
Copy Markdown
Contributor

Assigning new set of reviewers because Pr has gone too long without review. If you would like to opt out of this review, comment assign to next reviewer:

R: @kennknowles for label java.
R: @damondouglas for label io.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

@Abacn

Abacn commented Mar 11, 2025

Copy link
Copy Markdown
Contributor

waiting on author

@claudevdm

Copy link
Copy Markdown
Collaborator

Hey @ahmedabu98 is this still being worked on?

@derrickaw

Copy link
Copy Markdown
Collaborator

waiting on author

@github-actions

github-actions Bot commented Oct 6, 2025

Copy link
Copy Markdown
Contributor

This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@beam.apache.org list. Thank you for your contributions.

@github-actions github-actions Bot added the stale label Oct 6, 2025
@github-actions

Copy link
Copy Markdown
Contributor

This pull request has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.

@github-actions github-actions Bot closed this Oct 14, 2025
@tkaymak

tkaymak commented May 9, 2026

Copy link
Copy Markdown
Contributor

I have a use case for a Mqtt streaming source in Python, I would pick this up in the future @Abacn if you think it makes sense.

@Abacn

Abacn commented May 12, 2026

Copy link
Copy Markdown
Contributor

Yeah it's a valuable addition. The original plan was to first onboard SchemaTransform for MqttIO (this PR) then add it to

to generate xlang wrapper when Python SDK is built.

tkaymak added a commit to tkaymak/beam that referenced this pull request Jun 10, 2026
Adds MqttReadSchemaTransformProvider and MqttWriteSchemaTransformProvider
so MqttIO can be used through the portable SchemaTransform API and exposed
as cross-language transforms. Decorates MqttIO.ConnectionConfiguration with
@DefaultSchema(AutoValueSchema.class) and @SchemaFieldDescription so the
config round-trips through Beam Schemas.

Wires :sdks:java:io:mqtt into :sdks:java:io:expansion-service so the
SchemaTransforms are picked up by ExpansionService via @autoservice.

Both batch and streaming are supported on the read side: omitting
maxNumRecords/maxReadTimeSeconds yields an unbounded (streaming) read,
while setting either bounds it to a batch read. The provider descriptions
document this and note that streaming requires a portable streaming runner
(e.g. Prism, Flink, Dataflow); the legacy local Python DirectRunner does
not execute portable streaming cross-language reads.

Tests cover read-with-timeout-no-data, an unbounded streaming read
(publish/collect/cancel), and a write-then-read round trip against an
embedded ActiveMQ broker.

Revives the approved diff from PR apache#32385 (ahmedabu98, twosom) and adapts
it to the post-apache#32668 generic API (MqttIO.Read<T> / MqttIO.Write<T>).

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
tkaymak added a commit to tkaymak/beam that referenced this pull request Jun 10, 2026
Adds MqttReadSchemaTransformProvider and MqttWriteSchemaTransformProvider
so MqttIO can be used through the portable SchemaTransform API and exposed
as cross-language transforms. Decorates MqttIO.ConnectionConfiguration with
@DefaultSchema(AutoValueSchema.class) and @SchemaFieldDescription so the
config round-trips through Beam Schemas.

Wires :sdks:java:io:mqtt into :sdks:java:io:expansion-service so the
SchemaTransforms are picked up by ExpansionService via @autoservice.

Both batch and streaming are supported on the read side: omitting
maxNumRecords/maxReadTimeSeconds yields an unbounded (streaming) read,
while setting either bounds it to a batch read. The provider descriptions
document this and note that streaming requires a portable streaming runner
(e.g. Prism, Flink, Dataflow); the legacy local Python DirectRunner does
not execute portable streaming cross-language reads.

Tests cover read-with-timeout-no-data, an unbounded streaming read
(publish/collect/cancel), and a write-then-read round trip against an
embedded ActiveMQ broker.

Revives the approved diff from PR apache#32385 (ahmedabu98, twosom) and adapts
it to the post-apache#32668 generic API (MqttIO.Read<T> / MqttIO.Write<T>).
tkaymak added a commit to tkaymak/beam that referenced this pull request Jun 10, 2026
Adds MqttReadSchemaTransformProvider and MqttWriteSchemaTransformProvider
so MqttIO can be used through the portable SchemaTransform API and exposed
as cross-language transforms. Decorates MqttIO.ConnectionConfiguration with
@DefaultSchema(AutoValueSchema.class) and @SchemaFieldDescription so the
config round-trips through Beam Schemas.

Wires :sdks:java:io:mqtt into :sdks:java:io:expansion-service so the
SchemaTransforms are picked up by ExpansionService via @autoservice.

Both batch and streaming are supported on the read side: omitting
maxNumRecords/maxReadTimeSeconds yields an unbounded (streaming) read,
while setting either bounds it to a batch read. The provider descriptions
document this and note that streaming requires a portable streaming runner
(e.g. Prism, Flink, Dataflow); the legacy local Python DirectRunner does
not execute portable streaming cross-language reads.

Tests cover read-with-timeout-no-data, an unbounded streaming read
(publish/collect/cancel), and a write-then-read round trip against an
embedded ActiveMQ broker.

Revives the approved diff from PR apache#32385 (ahmedabu98, twosom) and adapts
it to the post-apache#32668 generic API (MqttIO.Read<T> / MqttIO.Write<T>).
tkaymak added a commit to tkaymak/beam that referenced this pull request Jun 11, 2026
Adds MqttReadSchemaTransformProvider and MqttWriteSchemaTransformProvider
so MqttIO can be used through the portable SchemaTransform API and exposed
as cross-language transforms. Decorates MqttIO.ConnectionConfiguration with
@DefaultSchema(AutoValueSchema.class) and @SchemaFieldDescription so the
config round-trips through Beam Schemas.

Both batch and streaming are supported on the read side: omitting
maxNumRecords/maxReadTimeSeconds yields an unbounded (streaming) read,
while setting either bounds it to a batch read. The provider descriptions
document this and note that streaming requires a portable streaming runner
(e.g. Prism, Flink, Dataflow); the legacy local Python DirectRunner does
not execute portable streaming cross-language reads.

Tests cover read-with-timeout-no-data, an unbounded streaming read
(publish/collect/cancel), and a write-then-read round trip against an
embedded ActiveMQ broker.

Revives the approved diff from PR apache#32385 (ahmedabu98, twosom) and adapts
it to the post-apache#32668 generic API (MqttIO.Read<T> / MqttIO.Write<T>).
tkaymak added a commit to tkaymak/beam that referenced this pull request Jun 12, 2026
Adds MqttReadSchemaTransformProvider and MqttWriteSchemaTransformProvider
so MqttIO can be used through the portable SchemaTransform API and exposed
as cross-language transforms. Decorates MqttIO.ConnectionConfiguration with
@DefaultSchema(AutoValueSchema.class) and @SchemaFieldDescription so the
config round-trips through Beam Schemas.

Both batch and streaming are supported on the read side: omitting
maxNumRecords/maxReadTimeSeconds yields an unbounded (streaming) read,
while setting either bounds it to a batch read. The provider descriptions
document this and note that streaming requires a portable streaming runner
(e.g. Prism, Flink, Dataflow); the legacy local Python DirectRunner does
not execute portable streaming cross-language reads.

Tests cover read-with-timeout-no-data, an unbounded streaming read
(publish/collect/cancel), and a write-then-read round trip against an
embedded ActiveMQ broker.

Revives the approved diff from PR apache#32385 (ahmedabu98, twosom) and adapts
it to the post-apache#32668 generic API (MqttIO.Read<T> / MqttIO.Write<T>).
Abacn pushed a commit that referenced this pull request Jun 13, 2026
…ing (revives #32385) (#38493)

* [mqtt] Add SchemaTransform providers for MqttIO Read/Write

Adds MqttReadSchemaTransformProvider and MqttWriteSchemaTransformProvider
so MqttIO can be used through the portable SchemaTransform API and exposed
as cross-language transforms. Decorates MqttIO.ConnectionConfiguration with
@DefaultSchema(AutoValueSchema.class) and @SchemaFieldDescription so the
config round-trips through Beam Schemas.

Both batch and streaming are supported on the read side: omitting
maxNumRecords/maxReadTimeSeconds yields an unbounded (streaming) read,
while setting either bounds it to a batch read. The provider descriptions
document this and note that streaming requires a portable streaming runner
(e.g. Prism, Flink, Dataflow); the legacy local Python DirectRunner does
not execute portable streaming cross-language reads.

Tests cover read-with-timeout-no-data, an unbounded streaming read
(publish/collect/cancel), and a write-then-read round trip against an
embedded ActiveMQ broker.

Revives the approved diff from PR #32385 (ahmedabu98, twosom) and adapts
it to the post-#32668 generic API (MqttIO.Read<T> / MqttIO.Write<T>).

* [mqtt] Add messaging expansion service and wire MqttIO into Python xlang

Adds a new :sdks:java:io:messaging-expansion-service module that serves
messaging IOs (MQTT for now, with room for JMS/Solace later) instead of
adding MqttIO to the shared :sdks:java:io:expansion-service, per review
feedback from @Abacn and @chamikaramj.

Registers MqttIO's SchemaTransforms in standard_expansion_services.yaml
under the new service with kafka-style names (ReadFromMqtt / WriteToMqtt),
skipping the core SchemaTransforms it bundles transitively (those are
generated from the Java IO expansion service). Regenerates
standard_external_transforms.yaml so the generated Python wrappers are
served by the messaging expansion service, and registers the new target in
the generateExternalTransformsConfig task and the xlang wrapper-validation
list.

The CHANGES.md announcement is deferred to the follow-up PR that sets up
the Xlang Messaging PostCommit, per review feedback.

* [expansion-service] Remove obsolete upToDateWhen workaround

outputs.upToDateWhen { false } in the shadowJar block was a workaround for
a corrupted gradle cache and is no longer needed (review feedback on
PR #38493).
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

MQTT IO connector for Python

8 participants