Skip to content

feat: added a pluggable WorkerTransport, with Arrow Flight optional.#508

Open
mdashti wants to merge 4 commits into
datafusion-contrib:mainfrom
paradedb:moe/worker-transport
Open

feat: added a pluggable WorkerTransport, with Arrow Flight optional.#508
mdashti wants to merge 4 commits into
datafusion-contrib:mainfrom
paradedb:moe/worker-transport

Conversation

@mdashti

@mdashti mdashti commented Jun 20, 2026

Copy link
Copy Markdown

What

This PR puts a pluggable WorkerTransport in front of the worker comms, with Arrow Flight as one implementation behind a default-on flight feature. It adds an in-memory transport and surfaces partition routing on NetworkBoundary.

Why

The transport is hardcoded to Flight. The coordinator ships plans over gRPC and the consumer reads partitions through an internal, non-swappable connection, so an embedder can't supply its own transport without reaching into crate internals. The crate also can't build without tonic / arrow-flight.

Closed #483

How

  • A WorkerTransport abstraction and a dispatch seam, with Flight as the reference implementation. No behaviour change for the Flight path.
  • route_partition / partitions_per_consumer_task on NetworkBoundary, so a transport can place a produced partition without re-deriving the slice formula by hand.
  • An in-memory transport: every worker runs in the current process, plans delivered by a function call, partitions read from the local task registry. start_localhost_context honours DATAFUSION_DISTRIBUTED_TEST_TRANSPORT=flight, so the generic suite runs over either transport.
  • The flight feature gates tonic / arrow-flight and the gRPC surface. In-memory is the default when it's off.

Tests

CI runs the integration suite over Flight, over the in-memory transport, and once more with --no-default-features. clippy and rustfmt are clean in both feature configs.

@mdashti mdashti marked this pull request as draft June 20, 2026 07:55
@mdashti mdashti force-pushed the moe/worker-transport branch 4 times, most recently from 85c691d to a5f6672 Compare June 22, 2026 08:54

@stuhood stuhood left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks great: thanks!

Comment thread src/coordinator/distributed.rs Outdated
Comment on lines +148 to +152
/// This is the extension point the in-crate shared-memory transport (and its in-process test) executes
/// through: it owns the runtime and drives the head stage itself. The transport's dispatch must
/// complete synchronously: a transport that spawns background delivery work, or a plan that
/// declares work-unit feeds (which are pumped by that background work), is rejected rather than
/// left to stall.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This references code that will never be upstreamed, right?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right. Reworded it on its own terms in 6c4373d. It's the extension point an embedded in-process transport drives the head stage through.

Comment on lines +256 to +263
// One token per execution. In-process transports watch it to tear down the moment this
// output stream drops, whether the consumer read to the end or abandoned it early (a LIMIT
// gather). It rides the dispatch context so every worker fragment shares it.
let token = CancellationToken::new();
let context = Arc::new(task_ctx_with_extension(
&context,
DistributedCancellationToken(token.clone()),
));

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this still necessary with the new per-stream cancellation that you implemented?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Different scope. This one fires once, when the whole output stream drops, so an in-process transport tears down on an early LIMIT gather, not just clean EOF. The per-stream cancel lives in the shm transport, which isn't in this PR.

/// which closes the coordinator->worker streams and propagates EOS to the workers so they can clean
/// up; the coordinator holds it until the query's result stream is drained.
#[derive(Default)]
pub(crate) struct FlightWorkerDispatch {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this supposed to be feature-flagged for flight, or is it not actually specific to that backend?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is flight-gated. The #[cfg(feature = "flight")] sits on mod query_coordinator; in coordinator/mod.rs, so it's easy to miss from the struct. Added a module note in 6c4373d so the gating's visible here too.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A followup PR should probably split this module (and others with large swaths of feature flags) into transport and non-transport specific modules.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, that's a good followup. I'd rather keep this PR to the abstraction and leave the module split as its own change, to make this PR easier to review, too. The follow-up can be a refactor-only change.

mdashti added 3 commits June 22, 2026 13:18
Carries the extension points a transport needs beyond plan delivery and reads: a per-execution
cancellation token that `DistributedExec::execute` attaches and fires when the output stream
drops, `NetworkBoundary` partition routing (`route_partition` / `partitions_per_consumer_task` /
`PartitionRoute`), and `DistributedExec::prepare_in_process_plan` for a transport that drives the
head stage itself. The default stays Arrow Flight; in-memory and shared-memory transports plug in
through these extension points without further changes to the core.
This PR adds a `WorkerTransport` that hosts its workers in the current process: plans are
delivered with a direct `Worker::set_task_plan` call and partitions are read straight from
the local task registry, with no gRPC underneath. It is the reference implementation for
the transport extension points and the basis for running distributed plans without the
Flight stack.

To keep the dispatch paths from drifting, plan delivery is factored into transport-neutral
pieces both transports share: `encode_task_plan` (task specialization + codec), the
`Worker::set_task_plan` core that the Flight coordinator stream now wraps, and
`collect_task_work_unit_feeds`. The in-memory read side runs one `execute_local_task` over
the whole partition range and pumps each partition into a buffer, so a consumer that
interleaves partition polls of a partitioned join can't leave partitions empty.
With `flight` off there is no `tonic` / `arrow-flight` and the in-memory transport is the
default, so distributed plans still run. The integration suite runs over the in-memory transport
in both build configurations: `start_localhost_context` builds an `InMemoryWorkerTransport`
cluster instead of a gRPC one, and the gRPC harness moved to `start_localhost_flight_context`. A
`unit-test-flight-transport` job sets `DATAFUSION_DISTRIBUTED_TEST_TRANSPORT=flight` to keep full
Flight coverage.

For the no-flight suite to run, not just build, the `tpch` / `tpcds` / `clickbench` /
`stateful_data_cleanup` dataset tests move into the benchmarks crate: as a dev-dependency of the
library they re-enabled `flight` on every test build through feature unification. With that gone,
`cargo test --no-default-features --features integration --lib --tests` is genuinely Flight-free,
and a `unit-test-no-flight` job runs it. The only tests still gated on `flight` are the ones that
need a real wire: the `URLEmitter` routing tests, which assert per-URL worker identity, and the
network-boundary connection metrics (`bytes_transferred`, latency). The rest, including the
worker-hook and metrics tests, register `InMemoryWorkerTransport` directly and run either way.
@mdashti mdashti force-pushed the moe/worker-transport branch from a5f6672 to 53ff817 Compare June 22, 2026 20:20
The `prepare_in_process_plan` doc referenced an out-of-crate transport that
won't be upstreamed; reworded it to stand on its own. The `query_coordinator`
module note makes its `flight` gating visible without opening `mod.rs`.

@mdashti mdashti left a comment

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@stuhood Thanks for the comments.

Comment thread src/coordinator/distributed.rs Outdated
Comment on lines +148 to +152
/// This is the extension point the in-crate shared-memory transport (and its in-process test) executes
/// through: it owns the runtime and drives the head stage itself. The transport's dispatch must
/// complete synchronously: a transport that spawns background delivery work, or a plan that
/// declares work-unit feeds (which are pumped by that background work), is rejected rather than
/// left to stall.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right. Reworded it on its own terms in 6c4373d. It's the extension point an embedded in-process transport drives the head stage through.

Comment on lines +256 to +263
// One token per execution. In-process transports watch it to tear down the moment this
// output stream drops, whether the consumer read to the end or abandoned it early (a LIMIT
// gather). It rides the dispatch context so every worker fragment shares it.
let token = CancellationToken::new();
let context = Arc::new(task_ctx_with_extension(
&context,
DistributedCancellationToken(token.clone()),
));

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Different scope. This one fires once, when the whole output stream drops, so an in-process transport tears down on an early LIMIT gather, not just clean EOF. The per-stream cancel lives in the shm transport, which isn't in this PR.

/// which closes the coordinator->worker streams and propagates EOS to the workers so they can clean
/// up; the coordinator holds it until the query's result stream is drained.
#[derive(Default)]
pub(crate) struct FlightWorkerDispatch {

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is flight-gated. The #[cfg(feature = "flight")] sits on mod query_coordinator; in coordinator/mod.rs, so it's easy to miss from the struct. Added a module note in 6c4373d so the gating's visible here too.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, that's a good followup. I'd rather keep this PR to the abstraction and leave the module split as its own change, to make this PR easier to review, too. The follow-up can be a refactor-only change.

@mdashti mdashti changed the title Added a pluggable WorkerTransport, with Arrow Flight optional. feat: added a pluggable WorkerTransport, with Arrow Flight optional. Jun 23, 2026
@mdashti mdashti marked this pull request as ready for review June 23, 2026 02:41
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Abstract away gRPC details from the worker protocol

2 participants