[GSoC 2026] Kafka Streams Runner — ExecutableStage (stateless ParDo) translator #38743

@junaiddshaukat

Tracking issue: #18479
Depends on: #38616 (translation framework + Impulse, merged via #38689)

Summary

Third sub-issue of #18479. Adds the ExecutableStage translator (beam:runner:executable_stage:v1) so that fused stateless user code (ParDo etc.) runs in the SDK harness over the Fn API and its outputs flow back into the Kafka Streams topology. See design doc §4.2.

Scope

  • ExecutableStageTranslator: register ExecutableStage.URN, add a
    processor node wired to the parent via the PCollection→processor map.
  • ExecutableStageProcessor (Kafka Streams Processor): on init build a
    StageBundleFactory from the ExecutableStagePayload + JobInfo; for each
    KStreamsPayload data record push the WindowedValue to the harness input
    receiver and forward harness outputs downstream as data payloads; on a
    watermark payload flush the open bundle and propagate the watermark.
  • SDK-harness wiring via runners-java-fn-execution
    (ExecutableStageContext.Factory, StageBundleFactory, RemoteBundle,
    OutputReceiverFactory, FnDataReceiver) — same surface Flink/Spark use.
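
The data/watermark handling in the second bullet can be sketched without any Beam or Kafka Streams dependency. This is only a model under stated assumptions: `StageProcessorSketch`, `Payload`, and `forwarded()` are hypothetical names, a plain `Function` stands in for the SDK harness, and outputs are held until the bundle closes, which is a simplification and may not match when the real output receivers fire.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/**
 * Dependency-free model of the data/watermark handling. Every name here is
 * hypothetical; the real processor would talk to a StageBundleFactory and a
 * RemoteBundle rather than a local Function.
 */
final class StageProcessorSketch {

  /** Stand-in for KStreamsPayload: either a data element or a watermark. */
  static final class Payload {
    final String data;     // non-null for data payloads
    final long watermark;  // only meaningful when data == null

    private Payload(String data, long watermark) {
      this.data = data;
      this.watermark = watermark;
    }

    static Payload data(String element) {
      if (element == null) {
        throw new IllegalArgumentException("element must not be null");
      }
      return new Payload(element, Long.MIN_VALUE);
    }

    static Payload watermark(long millis) {
      return new Payload(null, millis);
    }

    boolean isData() {
      return data != null;
    }

    @Override
    public String toString() {
      return isData() ? "data(" + data + ")" : "watermark(" + watermark + ")";
    }
  }

  private final Function<String, String> userFn;               // stands in for the SDK harness
  private final List<Payload> downstream = new ArrayList<>();  // stands in for forwarding downstream
  private List<String> openBundle;                             // null while no bundle is open

  StageProcessorSketch(Function<String, String> userFn) {
    this.userFn = userFn;
  }

  void process(Payload payload) {
    if (payload.isData()) {
      if (openBundle == null) {
        openBundle = new ArrayList<>();  // open a bundle lazily on the first element
      }
      openBundle.add(userFn.apply(payload.data));
    } else {
      flushBundle();            // close the open bundle before the watermark moves on
      downstream.add(payload);  // then propagate the watermark unchanged
    }
  }

  private void flushBundle() {
    if (openBundle == null) {
      return;  // nothing to flush
    }
    for (String out : openBundle) {
      downstream.add(Payload.data(out));
    }
    openBundle = null;
  }

  /** Everything forwarded downstream so far, in order. */
  List<Payload> forwarded() {
    return downstream;
  }
}
```

Feeding two data payloads followed by a watermark forwards the two transformed elements first and the watermark last, which is the ordering a downstream processor would need in order to treat the watermark as a completeness signal.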

Testing approach

ExecutableStage needs a running SDK harness. The execution test uses the EMBEDDED (in-JVM Java) environment, the same approach as Flink's PortableExecutionTest, so a trivial DoFn round-trips in-process with no Docker or broker. TopologyTestDriver drives the topology. If running the embedded harness inside TopologyTestDriver proves awkward, the fallback is to drive ExecutableStageProcessor through MockProcessorContext with the same embedded harness behind it.

Out of scope (later sub-issues)

  • State, timers, side inputs.
  • GroupByKey / repartition (first real topic boundary; that's where
    KStreamsPayload gets a Coder-based byte[] Serde).
  • Watermark manager (comes when the first stateful transform forces it).
  • ctx.commit() bundle-boundary tuning (design doc §6).

Acceptance criteria

  • ./gradlew :runners:kafka-streams:check green.
  • Impulse → ParDo pipeline translates; topology contains the
    executable-stage processor wired to the Impulse output.
  • A trivial DoFn round-trips one element end-to-end under the EMBEDDED
    environment.

Note: watermark-triggered flush is provisional

The current ExecutableStageProcessor flushes the open bundle whenever it receives a watermark payload. This will change when the WatermarkManager is introduced: a stage receives watermarks from multiple parent instances, so the output watermark is min() across them, and the bundle flush / output-watermark advance should happen only when that minimum actually moves forward (i.e. when the partition holding the current minimum updates), not on every received watermark. Captured here so the follow-up is explicit.
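
The intended follow-up behaviour can be illustrated with a small tracker that reports an advance only when the minimum across parents moves forward. This is a sketch, not the planned WatermarkManager: `MinWatermarkSketch` and its methods are hypothetical, parents are identified by a plain integer, and the output watermark is assumed to stay at `Long.MIN_VALUE` until every parent has reported at least once.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical tracker: output watermark = min over the latest watermark of each parent. */
final class MinWatermarkSketch {
  private final int parentCount;
  private final Map<Integer, Long> latestPerParent = new HashMap<>();
  private long outputWatermark = Long.MIN_VALUE;

  MinWatermarkSketch(int parentCount) {
    if (parentCount <= 0) {
      throw new IllegalArgumentException("parentCount must be positive");
    }
    this.parentCount = parentCount;
  }

  /**
   * Records a watermark from one parent instance.
   *
   * @return true only if the minimum across all parents advanced, i.e. the
   *     moment at which the bundle flush and output-watermark advance should happen
   */
  boolean update(int parent, long watermark) {
    // A parent's watermark never moves backwards, so keep the larger value.
    latestPerParent.merge(parent, watermark, Math::max);
    if (latestPerParent.size() < parentCount) {
      return false;  // assumption: the minimum is unknown until every parent has reported
    }
    long min = Collections.min(latestPerParent.values());
    if (min > outputWatermark) {
      outputWatermark = min;
      return true;
    }
    return false;
  }

  long outputWatermark() {
    return outputWatermark;
  }
}
```

With two parents, an update from the parent that is already ahead returns false and triggers nothing; only an update from the parent holding the current minimum can return true.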

cc @je-ik
