Tracking issue: #18479
Depends on: #38616 (translation framework + Impulse, merged via #38689)
Summary
Third sub-issue. Adds the ExecutableStage translator (URN beam:runner:executable_stage:v1) so that fused stateless user code (ParDo, etc.) runs in the SDK harness over the Fn API and its outputs flow back into the Kafka Streams topology, per design doc §4.2.
Scope
- ExecutableStageTranslator: register ExecutableStage.URN, add a
processor node wired to the parent via the PCollection→processor map.
- ExecutableStageProcessor (Kafka Streams Processor): on init, build a
StageBundleFactory from the ExecutableStagePayload + JobInfo; for each
KStreamsPayload data record, push the WindowedValue to the harness input
receiver and forward harness outputs downstream as data payloads; on a
watermark payload, flush the open bundle and propagate the watermark.
- SDK-harness wiring via runners-java-fn-execution
(ExecutableStageContext.Factory, StageBundleFactory, RemoteBundle,
OutputReceiverFactory, FnDataReceiver) — same surface Flink/Spark use.
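A rough, stdlib-only model of the intended ExecutableStageProcessor control flow, to make the data/watermark ordering concrete. Every type in it (`Payload`, `Bundle`, `Processor`, and the `userFn` standing in for the fused stage) is hypothetical and is not the Beam or Kafka Streams API; the real processor would go through StageBundleFactory/RemoteBundle and `ProcessorContext.forward`. The sketch also simplifies by producing outputs only when the bundle closes. With a real RemoteBundle, outputs can arrive while the bundle is open, but `close()` is documented to block until all outputs have been consumed, so the ordering shown here (bundle outputs before the watermark) should hold either way.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Stdlib-only model of the ExecutableStageProcessor control flow.
// Every type here is a hypothetical stand-in, NOT a Beam or Kafka Streams API.
public class StageProcessorSketch {

  // Hypothetical stand-in for KStreamsPayload: either a data element or a watermark.
  static final class Payload {
    final String element; // non-null only for data payloads
    final long watermark; // meaningful only for watermark payloads

    private Payload(String element, long watermark) {
      this.element = element;
      this.watermark = watermark;
    }

    static Payload data(String element) { return new Payload(element, Long.MIN_VALUE); }

    static Payload watermark(long timestamp) { return new Payload(null, timestamp); }

    boolean isData() { return element != null; }
  }

  // Hypothetical stand-in for a RemoteBundle: buffers inputs until it is closed.
  static final class Bundle {
    private final List<String> inputs = new ArrayList<>();

    void push(String element) { inputs.add(element); }

    List<String> close() { return inputs; }
  }

  // `userFn` plays the fused stage running in the SDK harness;
  // `downstream` plays ProcessorContext.forward.
  static final class Processor {
    private final Function<String, String> userFn;
    private final Consumer<Payload> downstream;
    private Bundle openBundle; // null while no bundle is open

    Processor(Function<String, String> userFn, Consumer<Payload> downstream) {
      this.userFn = userFn;
      this.downstream = downstream;
    }

    void process(Payload payload) {
      if (payload.isData()) {
        if (openBundle == null) {
          openBundle = new Bundle(); // assumption: open a bundle lazily on the first element
        }
        openBundle.push(payload.element);
      } else {
        flush(); // watermark: drain the open bundle first...
        downstream.accept(payload); // ...then propagate the watermark itself
      }
    }

    private void flush() {
      if (openBundle == null) {
        return;
      }
      for (String input : openBundle.close()) {
        // Each harness output is forwarded downstream as a data payload.
        downstream.accept(Payload.data(userFn.apply(input)));
      }
      openBundle = null;
    }
  }

  public static void main(String[] args) {
    List<Payload> out = new ArrayList<>();
    Processor processor = new Processor(String::toUpperCase, out::add);
    processor.process(Payload.data("a"));
    processor.process(Payload.data("b"));
    processor.process(Payload.watermark(10L));
    for (Payload p : out) {
      System.out.println(p.isData() ? "data " + p.element : "watermark " + p.watermark);
    }
  }
}
```

Opening the bundle lazily means a stage that only sees watermarks never opens an empty bundle, yet every watermark is still forwarded.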
Testing approach
ExecutableStage needs a running SDK harness. Going with the EMBEDDED (in-JVM Java) environment for the execution test, as Flink's PortableExecutionTest does, so a trivial DoFn round-trips in-process with no Docker or broker. TopologyTestDriver drives the topology. (If running the embedded harness inside TopologyTestDriver proves awkward, the fallback is to drive ExecutableStageProcessor via MockProcessorContext with the same embedded harness behind it.)
Out of scope (later sub-issues)
- State, timers, side inputs.
- GroupByKey / repartition (first real topic boundary; that's where
KStreamsPayload gets a Coder-based byte[] Serde).
- Watermark manager (comes when the first stateful transform forces it).
- ctx.commit() bundle-boundary tuning (design doc §6).
Acceptance criteria
- ./gradlew :runners:kafka-streams:check green.
- Impulse → ParDo pipeline translates; topology contains the
executable-stage processor wired to the Impulse output.
- A trivial DoFn round-trips one element end-to-end under the EMBEDDED
environment.
Note: watermark-triggered flush is provisional
The current ExecutableStageProcessor flushes the open bundle whenever it receives a watermark payload. This will change when the WatermarkManager is introduced. A stage receives watermarks from multiple parent instances, so its output watermark is the min() across them; the bundle flush and output-watermark advance should therefore happen only when that minimum actually moves forward (i.e. when the partition holding the current minimum updates), not on every received watermark. Captured here so the follow-up is explicit.
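A minimal sketch of the min()-across-parents rule, assuming the stage knows its number of parent partitions up front and that each parent's watermark is monotonic (a regression from one parent is ignored). `MinWatermarkTracker` and `update` are hypothetical names, not the planned WatermarkManager API. `update` returns a value only when the minimum moves forward, which is the point at which the processor would flush its bundle and propagate the watermark.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical sketch of min()-across-parents watermark tracking.
// Not a Beam or Kafka Streams API; names are illustrative only.
public class MinWatermarkTracker {
  private final Map<Integer, Long> perParent = new HashMap<>(); // parent partition -> latest watermark
  private final int expectedParents; // assumption: parent count is known up front
  private long currentMin = Long.MIN_VALUE; // last output watermark that was emitted

  public MinWatermarkTracker(int expectedParents) {
    this.expectedParents = expectedParents;
  }

  // Records a watermark from one parent. Returns the new output watermark only if the
  // minimum across all parents moved forward; empty means "do not flush, do not propagate".
  public OptionalLong update(int parentPartition, long watermark) {
    // Keep the per-parent watermark monotonic: a lower value never replaces a higher one.
    perParent.merge(parentPartition, watermark, Long::max);
    if (perParent.size() < expectedParents) {
      return OptionalLong.empty(); // some parent has not reported yet, so the min is undefined
    }
    long min = Long.MAX_VALUE;
    for (long w : perParent.values()) {
      min = Math.min(min, w);
    }
    if (min > currentMin) {
      currentMin = min;
      return OptionalLong.of(min);
    }
    return OptionalLong.empty();
  }

  public static void main(String[] args) {
    MinWatermarkTracker tracker = new MinWatermarkTracker(2);
    System.out.println(tracker.update(0, 5)); // empty: partition 1 has not reported yet
    System.out.println(tracker.update(1, 3)); // min becomes 3
    System.out.println(tracker.update(0, 9)); // empty: min is still 3, held by partition 1
    System.out.println(tracker.update(1, 7)); // min advances to 7
  }
}
```

The third call in `main` is the case the note is about: partition 0 advancing from 5 to 9 leaves the minimum at 3 (held by partition 1), so nothing is flushed; only partition 1's update to 7 advances the output watermark.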
cc @je-ik