Skip to content

Add portable Mqtt source and sink transforms#32385

Closed
ahmedabu98 wants to merge 1 commit into
apache:masterfrom
ahmedabu98:mqtt_external
Closed

Add portable Mqtt source and sink transforms#32385
ahmedabu98 wants to merge 1 commit into
apache:masterfrom
ahmedabu98:mqtt_external

Conversation

@ahmedabu98
Copy link
Copy Markdown
Contributor

Fixes #21060

@github-actions
Copy link
Copy Markdown
Contributor

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

@ahmedabu98
Copy link
Copy Markdown
Contributor Author

CC: @chamikaramj

You can test this out locally with the following code. @gdiazdelrio can you check this PR out and give it a shot? Let me know if something doesn't work

import apache_beam as beam
from apache_beam.transforms.external_transform_provider import ExternalTransformProvider
from apache_beam.transforms.external import BeamJarExpansionService

provider = ExternalTransformProvider(BeamJarExpansionService("sdks:java:io:expansion-service:shadowJar"))
MqttRead = provider.get_urn("beam:schematransform:org.apache.beam:mqtt_read:v1")
MqttWrite = provider.get_urn("beam:schematransform:org.apache.beam:mqtt_write:v1")

with beam.Pipeline() as p:
  connection_configuration = {
    "server_uri": "tcp://localhost:58494",
    "topic": "WRITE_TOPIC",
    "client_id": "READ_PIPELINE"
  }

  # read
  p | MqttRead(connection_configuration, max_read_time_seconds=10) | beam.Map(print)

  # write
  # p | beam.Create([beam.Row(bytes=bytes([1, 2, 3, 4, 5]))]) | MqttWrite(
  #       connection_configuration=connection_configuration)

@Abacn
Copy link
Copy Markdown
Contributor

Abacn commented Oct 2, 2024

Hi, what is the status of this PR? @ahmedabu98 @chamikaramj

@ahmedabu98
Copy link
Copy Markdown
Contributor Author

@twosom perhaps you'd be interested in trying this out? I can adjust the PR to include your recently added feature to read with metadata (#32668)

@twosom
Copy link
Copy Markdown
Contributor

twosom commented Oct 10, 2024

@twosom perhaps you'd be interested in trying this out? I can adjust the PR to include your recently added feature to read with metadata (#32668)

@ahmedabu98
Thank you for the opportunity. May I give it a try?

I’ve only worked with the Java SDK so far, so this will be my first time dealing with portable development. It might take me some time, but I’ll do my best to work through it.

If I have any questions along the way, would it be alright to leave a comment here on the PR?

@ahmedabu98
Copy link
Copy Markdown
Contributor Author

Thank you for the opportunity. May I give it a try?

Of course! You'd be doing me a favor :) Let me know how I can help

To start, you can git checkout this PR and first run ./gradlew sdks:java:io:expansion-service:build to build the necessary Java jar, then run the python code snippet I pasted above.

@twosom
Copy link
Copy Markdown
Contributor

twosom commented Oct 13, 2024

Thank you for the opportunity. May I give it a try?

Of course! You'd be doing me a favor :) Let me know how I can help

To start, you can git checkout this PR and first run ./gradlew sdks:java:io:expansion-service:build to build the necessary Java jar, then run the python code snippet I pasted above.

@ahmedabu98
Thanks!

I'll add feature for

and also read and write

Should I start by creating my own branch and cherry-picking your commit?

twosom pushed a commit to twosom/beam that referenced this pull request Oct 13, 2024
@shunping
Copy link
Copy Markdown
Collaborator

@ahmedabu98 @twosom Any update for this PR? Are we ready to get it merged?

@twosom
Copy link
Copy Markdown
Contributor

twosom commented Nov 11, 2024

@shunping
Hi!

Thanks for reply.

I've created a SchemaTransformProvider and tested it in Python.

It works well in the Batch scenario, but it doesn't work in the Streaming scenario.

While trying to find the cause, I switched to another task.

I'll continue working on this ^_^

@github-actions
Copy link
Copy Markdown
Contributor

Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @robertwb for label java.
R: @damondouglas for label io.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented Jan 7, 2025

Reminder, please take a look at this pr: @robertwb @damondouglas

@github-actions
Copy link
Copy Markdown
Contributor

Assigning new set of reviewers because Pr has gone too long without review. If you would like to opt out of this review, comment assign to next reviewer:

R: @Abacn for label java.
R: @johnjcasey for label io.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

@github-actions
Copy link
Copy Markdown
Contributor

Reminder, please take a look at this pr: @Abacn @johnjcasey

@github-actions
Copy link
Copy Markdown
Contributor

Assigning new set of reviewers because Pr has gone too long without review. If you would like to opt out of this review, comment assign to next reviewer:

R: @kennknowles for label java.
R: @damondouglas for label io.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

@damondouglas
Copy link
Copy Markdown
Contributor

@ahmedabu98 LGTM pending conflicts resolution

@github-actions
Copy link
Copy Markdown
Contributor

Reminder, please take a look at this pr: @kennknowles @damondouglas

@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented Feb 4, 2025

Assigning new set of reviewers because Pr has gone too long without review. If you would like to opt out of this review, comment assign to next reviewer:

R: @robertwb for label java.
R: @chamikaramj for label io.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

@github-actions
Copy link
Copy Markdown
Contributor

Reminder, please take a look at this pr: @robertwb @chamikaramj

@github-actions
Copy link
Copy Markdown
Contributor

Assigning new set of reviewers because Pr has gone too long without review. If you would like to opt out of this review, comment assign to next reviewer:

R: @damondouglas for label java.
R: @damondouglas for label io.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

@github-actions
Copy link
Copy Markdown
Contributor

Reminder, please take a look at this pr: @damondouglas @damondouglas

@github-actions
Copy link
Copy Markdown
Contributor

Assigning new set of reviewers because Pr has gone too long without review. If you would like to opt out of this review, comment assign to next reviewer:

R: @Abacn for label java.
R: @johnjcasey for label io.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented Mar 7, 2025

Reminder, please take a look at this pr: @Abacn @johnjcasey

@github-actions
Copy link
Copy Markdown
Contributor

Assigning new set of reviewers because Pr has gone too long without review. If you would like to opt out of this review, comment assign to next reviewer:

R: @kennknowles for label java.
R: @damondouglas for label io.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

@Abacn
Copy link
Copy Markdown
Contributor

Abacn commented Mar 11, 2025

waiting on author

@claudevdm
Copy link
Copy Markdown
Collaborator

Hey @ahmedabu98 is this still being worked on?

@derrickaw
Copy link
Copy Markdown
Collaborator

waiting on author

@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented Oct 6, 2025

This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@beam.apache.org list. Thank you for your contributions.

@github-actions github-actions Bot added the stale label Oct 6, 2025
@github-actions
Copy link
Copy Markdown
Contributor

This pull request has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.

@github-actions github-actions Bot closed this Oct 14, 2025
@tkaymak
Copy link
Copy Markdown
Contributor

tkaymak commented May 9, 2026

I have a use case for a Mqtt streaming source in Python, I would pick this up in the future @Abacn if you think it makes sense.

@Abacn
Copy link
Copy Markdown
Contributor

Abacn commented May 12, 2026

Yeah it's a valuable addition. The original plan was to first onboard SchemaTransform for MqttIO (this PR) then add it to

to generate xlang wrapper when Python SDK is built.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

MQTT IO connector for Python

8 participants