
[GSoC 2026] Kafka Streams runner — translation framework + Impulse translator #38689

Open
junaiddshaukat wants to merge 2 commits into apache:feat/18479-kafka-streams-runner-skeleton from junaiddshaukat:feat/ks-impulse-translator


@junaiddshaukat
Contributor

Summary

Second sub-issue under the Kafka Streams runner GSoC 2026 project.
Adds a URN-dispatch translation framework and the first concrete transform
translator (Impulse), per design doc §4.1.

What's in this PR

  • URN dispatch in KafkaStreamsPipelineTranslator, which walks the pipeline
    in topological order via QueryablePipeline.
  • ImpulseTranslator + ImpulseProcessor using a dedicated per-application
    bootstrap topic (__beam_impulse_<applicationId>) and a persistent state
    store that records a "fired" flag to prevent re-emission across restarts.
  • KafkaStreamsTranslationContext now holds the Topology and a
    PCollection-id → processor-name map.
  • KafkaStreamsPipelineRunner.run translates and starts a KafkaStreams
    application; returns a KafkaStreamsPortablePipelineResult.
  • processing.guarantee=exactly_once_v2 set internally (not a user option).
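
For reviewers who want the shape of the URN dispatch without opening the diff, here is a minimal, dependency-free sketch. It is not the code in this PR: TransformTranslator and UrnDispatchSketch are hypothetical stand-ins for PTransformTranslator and KafkaStreamsPipelineTranslator, the pipeline is reduced to an ordered map of transform id to URN (assumed to already be in topological order), and the exception type is an assumption. Only the message prefix comes from the PR.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for the PR's PTransformTranslator. */
interface TransformTranslator {
  /** Records what it would add to the topology; the real translator mutates a Topology. */
  void translate(String transformId, List<String> topologyLog);
}

/** Hypothetical stand-in for the dispatch loop in KafkaStreamsPipelineTranslator. */
final class UrnDispatchSketch {
  static final String IMPULSE_URN = "beam:transform:impulse:v1";
  static final String GROUP_BY_KEY_URN = "beam:transform:group_by_key:v1";

  private final Map<String, TransformTranslator> translators = new LinkedHashMap<>();

  void register(String urn, TransformTranslator translator) {
    translators.put(urn, translator);
  }

  /**
   * Walks the transforms in iteration order (assumed topological) and dispatches by URN.
   * The exception type is an assumption; the PR only states the message prefix.
   */
  List<String> translate(Map<String, String> transformIdToUrn) {
    List<String> topologyLog = new ArrayList<>();
    for (Map.Entry<String, String> entry : transformIdToUrn.entrySet()) {
      String urn = entry.getValue();
      TransformTranslator translator = translators.get(urn);
      if (translator == null) {
        // Fail fast on anything without a registered translator.
        throw new UnsupportedOperationException("No translator registered for URN " + urn);
      }
      translator.translate(entry.getKey(), topologyLog);
    }
    return topologyLog;
  }
}
```

Registering only the Impulse URN and then passing a GroupByKey transform reproduces the negative case used in KafkaStreamsPipelineTranslatorTest.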

Tests

  • KafkaStreamsPipelineTranslatorTest — Impulse success path + unsupported
    URN still rejected (uses GroupByKey URN as the negative case).
  • ImpulseTranslatorTest — TopologyTestDriver behavioural tests:
    exactly one empty byte[] in GlobalWindow, no re-emit on second trigger.

Validation

  • ./gradlew :runners:kafka-streams:check green locally.

Deferred / out of scope (documented in code)

  • Bootstrap topic auto-creation (AdminClient) — design doc §12.1.
    Production currently requires the topic to pre-exist; tests bypass via
    TopologyTestDriver.
  • Watermark advancement to TIMESTAMP_MAX_VALUE — Kafka Streams has no
    native Beam watermark; this moves to the (future) runner-side watermark
    manager rather than the record timestamp.
  • ExecutableStage / stateless ParDo — next sub-issue.

Closes #38616
Refs #18479
cc @je-ik

- KafkaStreamsPipelineTranslator now walks the pipeline in topological
  order via QueryablePipeline and dispatches each transform to a
  PTransformTranslator keyed by URN. Unknown URNs still fail fast with
  a clear "No translator registered for URN ..." message.
- ImpulseTranslator implements beam:transform:impulse:v1 per design doc
  §4.1: a per-application bootstrap topic source (__beam_impulse_<app>)
  satisfies Kafka Streams' real-source requirement, ImpulseProcessor
  emits exactly one WindowedValue<byte[]> in the GlobalWindow via a
  one-shot wall-clock punctuator scheduled on init, and a persistent
  state store records a "fired" flag so task restarts do not duplicate.
  Bootstrap-topic auto-creation is deferred to a follow-up sub-issue
  (design doc §12.1); the topic is expected to pre-exist in production.
- KafkaStreamsTranslationContext now holds the Topology being built and
  a PCollection-id -> processor-name map so downstream translators can
  wire to their parent nodes.
- KafkaStreamsPipelineRunner.run now translates and starts the
  KafkaStreams application, returning a KafkaStreamsPortablePipelineResult
  that maps KafkaStreams.State to Beam's PipelineResult.State. Forces
  processing.guarantee=exactly_once_v2.
- Tests:
  * KafkaStreamsPipelineTranslatorTest also covers the Impulse success
    path; the unsupported-URN check now uses GroupByKey.
  * ImpulseTranslatorTest exercises the topology via TopologyTestDriver:
    exactly one empty byte[] in GlobalWindow is emitted, and a second
    wall-clock advance does not re-emit.
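
The one-shot behaviour described above (emit once, record "fired", stay silent after a restart) can be modelled without Kafka Streams on the classpath. The sketch below is illustrative only: OneShotImpulseSketch is a hypothetical name, a plain Map stands in for the persistent state store, a List stands in for the downstream processor, and punctuate() is called by hand instead of by a wall-clock punctuator.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of the ImpulseProcessor emission logic; not the PR's class. */
final class OneShotImpulseSketch {
  static final String FIRED_KEY = "fired";

  // Stand-in for the persistent state store; it outlives a single processor instance.
  private final Map<String, Boolean> stateStore;
  // Stand-in for forwarding to downstream processors.
  private final List<byte[]> downstream;

  OneShotImpulseSketch(Map<String, Boolean> stateStore, List<byte[]> downstream) {
    this.stateStore = stateStore;
    this.downstream = downstream;
  }

  /** Models one wall-clock punctuation. Returns true only when the impulse was emitted. */
  boolean punctuate() {
    if (Boolean.TRUE.equals(stateStore.get(FIRED_KEY))) {
      return false; // already fired, possibly by an earlier incarnation of the task
    }
    downstream.add(new byte[0]); // the single empty element of the Impulse
    stateStore.put(FIRED_KEY, Boolean.TRUE);
    return true;
  }

  public static void main(String[] args) {
    Map<String, Boolean> store = new HashMap<>();
    List<byte[]> out = new ArrayList<>();
    new OneShotImpulseSketch(store, out).punctuate();
    // A "restarted" instance shares the store and therefore does not emit again.
    new OneShotImpulseSketch(store, out).punctuate();
    System.out.println("elements emitted: " + out.size());
  }
}
```

The sketch says nothing about how exactly_once_v2 makes the emit and the flag update atomic; it assumes that guarantee and models only the flag check.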
@gemini-code-assist
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces the core translation framework for the Kafka Streams runner, enabling the conversion of portable Beam pipelines into Kafka Streams topologies. It establishes the infrastructure for URN-based transform dispatching and provides the first concrete implementation for the Impulse transform, which is essential for bootstrapping pipeline execution.

Highlights

  • Translation Framework: Implemented a URN-dispatch translation framework in KafkaStreamsPipelineTranslator that walks the pipeline in topological order and dispatches transforms to specific translators.
  • Impulse Transform: Added the Impulse transform translator and processor, which uses a dedicated bootstrap topic and a persistent state store to ensure exactly-once emission per application.
  • Runner Execution: Updated KafkaStreamsPipelineRunner to handle the full translation lifecycle, including starting the Kafka Streams application and returning a KafkaStreamsPortablePipelineResult.

Contributor

@gemini-code-assist gemini-code-assist Bot left a comment


Code Review

This pull request implements the translation of the Beam Impulse transform into a Kafka Streams topology. It introduces the KafkaStreamsPipelineRunner execution logic, a KafkaStreamsPortablePipelineResult to track execution state, and the ImpulseTranslator and ImpulseProcessor to handle the impulse transform using a wall-clock punctuator and state store. The review makes several critical suggestions to improve robustness:

  • Handle potential race conditions in the pipeline result's state listener.
  • Ensure getState() correctly returns CANCELLED after cancel() is called.
  • Cancel the scheduled punctuator in ImpulseProcessor once the impulse has fired, to avoid periodic wakeups.
  • Guard against NullPointerExceptions when building Kafka Streams properties from potentially null pipeline options.
  • Use a thread-safe list in the test CapturingProcessor.
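
To illustrate the NullPointerException concern: java.util.Properties rejects null keys and values, so an unset option surfaces as a raw NPE at put() time unless it is checked first. The sketch below is one possible guard under stated assumptions, not the PR's code. StreamsPropertiesSketch and requireOption are hypothetical names, and bootstrapServers is assumed to be a second required option; only applicationId is named in this PR. The config keys are the standard Kafka Streams string keys.

```java
import java.util.Properties;

/** Hypothetical helper showing a fail-fast check before building Kafka Streams properties. */
final class StreamsPropertiesSketch {

  /** Rejects a missing option with a descriptive message instead of a later NPE. */
  private static String requireOption(String name, String value) {
    if (value == null || value.isEmpty()) {
      throw new IllegalArgumentException("Missing required option: " + name);
    }
    return value;
  }

  static Properties build(String applicationId, String bootstrapServers) {
    Properties props = new Properties();
    props.put("application.id", requireOption("applicationId", applicationId));
    // bootstrapServers is an assumed option name, used here for illustration only.
    props.put("bootstrap.servers", requireOption("bootstrapServers", bootstrapServers));
    // Set internally rather than exposed as a user option, as described in the PR.
    props.put("processing.guarantee", "exactly_once_v2");
    return props;
  }

  public static void main(String[] args) {
    System.out.println(build("my-app", "localhost:9092"));
  }
}
```

A hand-written check like this is only a stand-in; validating the options object up front with Beam's own validator achieves the same effect with less code.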

- KafkaStreamsPortablePipelineResult: close the race where KafkaStreams
  could transition to a terminal state before the state listener was
  registered, leaving waitUntilFinish() to block forever. Also add a
  volatile cancelled flag so that getState() returns State.CANCELLED
  after a user cancel(), instead of mapping NOT_RUNNING to State.DONE.
- ImpulseProcessor: capture the Cancellable returned by context.schedule
  and cancel the wall-clock punctuator once the impulse has fired (or
  if the state store already records a prior emission), so the
  processor stops doing periodic state-store lookups for the lifetime
  of the task.
- KafkaStreamsPipelineRunner.run: invoke PipelineOptionsValidator.validate
  on the pipeline options at the start of run() so a missing required
  option (e.g. applicationId) fails with a clear IllegalArgumentException
  rather than a raw NullPointerException on Properties.put further down.
- ImpulseTranslatorTest: wrap CapturingProcessor.received in
  Collections.synchronizedList for best-practice thread-safety even
  though TopologyTestDriver runs single-threaded.
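
The first fix above (listener race plus cancelled flag) can be sketched in isolation. This is an illustrative model under stated assumptions, not the PR's KafkaStreamsPortablePipelineResult: the two enums are local mirrors of KafkaStreams.State and a subset of Beam's PipelineResult.State, cancel() simulates the transition that closing the application would trigger, and every mapping other than NOT_RUNNING to DONE and cancelled to CANCELLED is a guess.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical model of the result object; names and most mappings are assumptions. */
final class PipelineResultSketch {
  enum StreamsState { CREATED, REBALANCING, RUNNING, PENDING_SHUTDOWN, NOT_RUNNING, PENDING_ERROR, ERROR }
  enum BeamState { UNKNOWN, RUNNING, DONE, FAILED, CANCELLED }

  private final CountDownLatch terminal = new CountDownLatch(1);
  private volatile StreamsState current;
  private volatile boolean cancelled;

  /** stateAtRegistration is the state observed right after the listener is attached. */
  PipelineResultSketch(StreamsState stateAtRegistration) {
    // Re-check the current state: if it is already terminal, no further
    // listener callback will arrive, so release waiters immediately.
    onStateChange(stateAtRegistration);
  }

  /** Models the state-listener callback. */
  void onStateChange(StreamsState newState) {
    current = newState;
    if (newState == StreamsState.NOT_RUNNING || newState == StreamsState.ERROR) {
      terminal.countDown();
    }
  }

  /** Models a user cancel; the real code would close the KafkaStreams instance. */
  void cancel() {
    cancelled = true;
    onStateChange(StreamsState.NOT_RUNNING);
  }

  BeamState getState() {
    if (cancelled) {
      return BeamState.CANCELLED; // takes precedence over NOT_RUNNING -> DONE
    }
    switch (current) {
      case NOT_RUNNING:
        return BeamState.DONE;
      case PENDING_ERROR:
      case ERROR:
        return BeamState.FAILED;
      case CREATED:
        return BeamState.UNKNOWN;
      default:
        return BeamState.RUNNING;
    }
  }

  /** Returns true if a terminal state was reached within the timeout. */
  boolean waitUntilFinish(long timeoutMillis) {
    try {
      return terminal.await(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```

Without the re-check in the constructor, an instance created after the application had already reached NOT_RUNNING would never see a callback, which is the hang described in the first bullet.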
@github-actions
Contributor

ghost commented May 26, 2026

Assigning reviewers:

R: @jrmccluskey added as fallback since no labels match configuration

Note: If you would like to opt out of this review, comment assign to next reviewer.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@je-ik je-ik self-requested a review May 26, 2026 09:24