[GSoC 2026] Kafka Streams runner — translation framework + Impulse translator#38689
Conversation
- KafkaStreamsPipelineTranslator now walks the pipeline in topological
order via QueryablePipeline and dispatches each transform to a
PTransformTranslator keyed by URN. Unknown URNs still fail fast with
a clear "No translator registered for URN ..." message.
- ImpulseTranslator implements beam:transform:impulse:v1 per design doc
§4.1: a per-application bootstrap topic source (__beam_impulse_<app>)
satisfies Kafka Streams' real-source requirement, ImpulseProcessor
emits exactly one WindowedValue<byte[]> in the GlobalWindow via a
one-shot wall-clock punctuator scheduled on init, and a persistent
state store records a "fired" flag so task restarts do not duplicate.
Bootstrap-topic auto-creation is deferred to a follow-up sub-issue
(design doc §12.1); the topic is expected to pre-exist in production.
- KafkaStreamsTranslationContext now holds the Topology being built and
a PCollection-id -> processor-name map so downstream translators can
wire to their parent nodes.
- KafkaStreamsPipelineRunner.run now translates and starts the
KafkaStreams application, returning a KafkaStreamsPortablePipelineResult
that maps KafkaStreams.State to Beam's PipelineResult.State. Forces
processing.guarantee=exactly_once_v2.
- Tests:
* KafkaStreamsPipelineTranslatorTest also covers the Impulse success
path; the unsupported-URN check now uses GroupByKey.
* ImpulseTranslatorTest exercises the topology via TopologyTestDriver:
exactly one empty byte[] in GlobalWindow is emitted, and a second
wall-clock advance does not re-emit.
Summary of ChangesHello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request introduces the core translation framework for the Kafka Streams runner, enabling the conversion of portable Beam pipelines into Kafka Streams topologies. It establishes the infrastructure for URN-based transform dispatching and provides the first concrete implementation for the Impulse transform, which is essential for bootstrapping pipeline execution. Highlights
New Features🧠 You can now enable Memory (public preview) to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here. Footnotes
|
There was a problem hiding this comment.
Code Review
This pull request implements the translation of the Beam Impulse transform into a Kafka Streams topology. It introduces the KafkaStreamsPipelineRunner execution logic, a KafkaStreamsPortablePipelineResult to track execution state, and the ImpulseTranslator and ImpulseProcessor to handle the impulse transform using a wall-clock punctuator and state store. The reviewer provided several critical suggestions to improve robustness: handling potential race conditions in the pipeline result's state listener, ensuring getState() correctly returns CANCELLED after cancel() is called, canceling the scheduled punctuator in ImpulseProcessor once the impulse has fired to avoid periodic wakeups, guarding against NullPointerExceptions when building Kafka Streams properties from potentially null pipeline options, and using a thread-safe list in the test CapturingProcessor.
- KafkaStreamsPortablePipelineResult: close the race where KafkaStreams could transition to a terminal state before the state listener was registered, leaving waitUntilFinish() to block forever. Also add a volatile cancelled flag so that getState() returns State.CANCELLED after a user cancel(), instead of mapping NOT_RUNNING to State.DONE. - ImpulseProcessor: capture the Cancellable returned by context.schedule and cancel the wall-clock punctuator once the impulse has fired (or if the state store already records a prior emission), so the processor stops doing periodic state-store lookups for the lifetime of the task. - KafkaStreamsPipelineRunner.run: invoke PipelineOptionsValidator.validate on the pipeline options at the start of run() so a missing required option (e.g. applicationId) fails with a clear IllegalArgumentException rather than a raw NullPointerException on Properties.put further down. - ImpulseTranslatorTest: wrap CapturingProcessor.received in Collections.synchronizedList for best-practice thread-safety even though TopologyTestDriver runs single-threaded.
|
Assigning reviewers: R: @jrmccluskey added as fallback since no labels match configuration Note: If you would like to opt out of this review, comment Available commands:
The PR bot will only process comments in the main thread (not review comments). |
Summary
Second sub-issue under the Kafka Streams runner GSoC 2026 project.
Adds URN-dispatch translation framework and the first concrete transform
translator (Impulse), per design doc §4.1.
What's in this PR
KafkaStreamsPipelineTranslatorwalking viaQueryablePipeline.ImpulseTranslator+ImpulseProcessorusing a dedicated per-applicationbootstrap topic (
__beam_impulse_<applicationId>) and a persistent statestore keyed
firedto prevent re-emission across restarts.KafkaStreamsTranslationContextnow holds theTopologyand aPCollection-id → processor-name map.
KafkaStreamsPipelineRunner.runtranslates and starts aKafkaStreamsapplication; returns a
KafkaStreamsPortablePipelineResult.processing.guarantee=exactly_once_v2set internally (not a user option).Tests
KafkaStreamsPipelineTranslatorTest— Impulse success path + unsupportedURN still rejected (uses GroupByKey URN as the negative case).
ImpulseTranslatorTest— TopologyTestDriver behavioural tests:exactly one empty
byte[]inGlobalWindow, no re-emit on second trigger.Validation
./gradlew :runners:kafka-streams:checkgreen locally.Deferred / out of scope (documented in code)
AdminClient) — design doc §12.1.Production currently requires the topic to pre-exist; tests bypass via
TopologyTestDriver.TIMESTAMP_MAX_VALUE— Kafka Streams has nonative Beam watermark; this moves to the (future) runner-side watermark
manager rather than the record timestamp.
Closes #38616
Refs #18479
cc @je-ik