feat: added a pluggable WorkerTransport, with Arrow Flight optional. #508
mdashti wants to merge 4 commits into
Force-pushed from 85c691d to a5f6672.
    /// This is the extension point the in-crate shared-memory transport (and its in-process test) executes
    /// through: it owns the runtime and drives the head stage itself. The transport's dispatch must
    /// complete synchronously: a transport that spawns background delivery work, or a plan that
    /// declares work-unit feeds (which are pumped by that background work), is rejected rather than
    /// left to stall.
This references code that will never be upstreamed, right?
Right. Reworded it on its own terms in 6c4373d. It's the extension point an embedded in-process transport drives the head stage through.
    // One token per execution. In-process transports watch it to tear down the moment this
    // output stream drops, whether the consumer read to the end or abandoned it early (a LIMIT
    // gather). It rides the dispatch context so every worker fragment shares it.
    let token = CancellationToken::new();
    let context = Arc::new(task_ctx_with_extension(
        &context,
        DistributedCancellationToken(token.clone()),
    ));
Is this still necessary with the new per-stream cancellation that you implemented?
Different scope. This one fires once, when the whole output stream drops, so an in-process transport tears down on an early LIMIT gather, not just clean EOF. The per-stream cancel lives in the shm transport, which isn't in this PR.
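The "fires once, when the whole output stream drops" behaviour can be pictured with a small standard-library sketch. Everything here is hypothetical: `Token` stands in for the real cancellation token, `OutputStream` for the execution's result stream, and a plain `Iterator` replaces the async stream. It only illustrates that a `Drop` hook covers both clean EOF and an early abandon.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for a cancellation token: a shared flag that only
/// ever moves from "live" to "cancelled".
#[derive(Clone, Default)]
struct Token(Arc<AtomicBool>);

impl Token {
    fn cancel(&self) {
        self.0.store(true, Ordering::SeqCst);
    }

    fn is_cancelled(&self) -> bool {
        self.0.load(Ordering::SeqCst)
    }
}

/// Hypothetical output stream: forwards items from `inner` and cancels
/// `token` when it is dropped, however far the consumer got.
struct OutputStream<I> {
    inner: I,
    token: Token,
}

impl<I: Iterator> Iterator for OutputStream<I> {
    type Item = I::Item;

    fn next(&mut self) -> Option<I::Item> {
        self.inner.next()
    }
}

impl<I> Drop for OutputStream<I> {
    fn drop(&mut self) {
        // Runs on clean EOF and on early abandon alike.
        self.token.cancel();
    }
}

/// Reads at most `limit` items (a LIMIT-style early stop), then lets the
/// stream drop. Returns the items read and whether the token fired.
fn run_with_limit(limit: usize) -> (Vec<u32>, bool) {
    let token = Token::default();
    let stream = OutputStream { inner: 0..10u32, token: token.clone() };
    let items: Vec<u32> = stream.take(limit).collect();
    (items, token.is_cancelled())
}
```

In this sketch a worker fragment would hold a clone of `Token` and poll `is_cancelled()`; the real token presumably offers an awaitable form instead.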
| /// which closes the coordinator->worker streams and propagates EOS to the workers so they can clean | ||
| /// up; the coordinator holds it until the query's result stream is drained. | ||
| #[derive(Default)] | ||
| pub(crate) struct FlightWorkerDispatch { |
Is this supposed to be feature-flagged for flight, or is it not actually specific to that backend?
It is flight-gated. The `#[cfg(feature = "flight")]` sits on `mod query_coordinator;` in `coordinator/mod.rs`, so it's easy to miss from the struct. Added a module note in 6c4373d so the gating's visible here too.
A followup PR should probably split this module (and others with large swaths of feature flags) into transport and non-transport specific modules.
Agreed, that's a good followup. I'd rather keep this PR to the abstraction and leave the module split as its own change, to make this PR easier to review, too. The follow-up can be a refactor-only change.
Carries the extension points a transport needs beyond plan delivery and reads: a per-execution cancellation token that `DistributedExec::execute` attaches and fires when the output stream drops, `NetworkBoundary` partition routing (`route_partition` / `partitions_per_consumer_task` / `PartitionRoute`), and `DistributedExec::prepare_in_process_plan` for a transport that drives the head stage itself. The default stays Arrow Flight; in-memory and shared-memory transports plug in through these extension points without further changes to the core.
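To make the routing idea concrete, here is a sketch of what placing a produced partition could look like. The contiguous-slice formula, the `Route` struct, and the function names `per_task_width` and `route_of` are all assumptions made for illustration; the PR does not spell out the formula, and the real `route_partition` / `partitions_per_consumer_task` / `PartitionRoute` may differ in both shape and arithmetic.

```rust
/// Hypothetical route: which consumer task reads a produced partition, and
/// at which index inside that task's slice.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
struct Route {
    consumer_task: usize,
    local_partition: usize,
}

/// Assumed layout: each consumer task owns a contiguous slice of partitions.
/// Ceiling division, so the last task may own a shorter slice.
fn per_task_width(total_partitions: usize, consumer_tasks: usize) -> usize {
    if consumer_tasks == 0 {
        return 0;
    }
    (total_partitions + consumer_tasks - 1) / consumer_tasks
}

/// Maps a produced partition index to its consumer task and local index
/// under the assumed contiguous-slice layout. Returns `None` for an
/// out-of-range partition or when there are no consumer tasks.
fn route_of(partition: usize, total_partitions: usize, consumer_tasks: usize) -> Option<Route> {
    if consumer_tasks == 0 || partition >= total_partitions {
        return None;
    }
    // `partition < total_partitions` guarantees a width of at least 1 here.
    let width = per_task_width(total_partitions, consumer_tasks);
    Some(Route {
        consumer_task: partition / width,
        local_partition: partition % width,
    })
}
```

The point of exposing the routing on `NetworkBoundary` is that a transport calls one function rather than repeating arithmetic like this at each call site.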
This PR adds a `WorkerTransport` that hosts its workers in the current process: plans are delivered with a direct `Worker::set_task_plan` call and partitions are read straight from the local task registry, with no gRPC underneath. It is the reference implementation for the transport extension points and the basis for running distributed plans without the Flight stack. To keep the dispatch paths from drifting, plan delivery is factored into transport-neutral pieces both transports share: `encode_task_plan` (task specialization + codec), the `Worker::set_task_plan` core that the Flight coordinator stream now wraps, and `collect_task_work_unit_feeds`. The in-memory read side runs one `execute_local_task` over the whole partition range and pumps each partition into a buffer, so a consumer that interleaves partition polls of a partitioned join can't leave partitions empty.
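The read-side buffering described above can be sketched with standard-library channels. This is not the PR's code: `spawn_pump` is a hypothetical stand-in for one `execute_local_task` run over the whole partition range, plain `u32` values replace record batches, and the unbounded `mpsc` channel is a simplification of whatever buffer the transport actually uses.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::thread;

/// Hypothetical pump: one producer thread covers every partition and pushes
/// each partition's batches into that partition's own buffer.
fn spawn_pump(partitions: usize, batches_per_partition: u32) -> Vec<Receiver<u32>> {
    let (senders, receivers): (Vec<_>, Vec<_>) =
        (0..partitions).map(|_| channel::<u32>()).unzip();
    thread::spawn(move || {
        for batch in 0..batches_per_partition {
            for (p, tx) in senders.iter().enumerate() {
                // A send error only means that partition's reader is gone;
                // the other partitions are still fed.
                let _ = tx.send(p as u32 * 100 + batch);
            }
        }
        // The senders drop here, which ends every receiver's iteration.
    });
    receivers
}

/// A consumer that interleaves its polls across partitions, one item per
/// partition per round, until every partition reports end of stream.
fn read_interleaved(receivers: Vec<Receiver<u32>>) -> Vec<Vec<u32>> {
    let mut out: Vec<Vec<u32>> = receivers.iter().map(|_| Vec::new()).collect();
    let mut open = vec![true; receivers.len()];
    while open.iter().any(|o| *o) {
        for (p, rx) in receivers.iter().enumerate() {
            if !open[p] {
                continue;
            }
            match rx.recv() {
                Ok(v) => out[p].push(v),
                Err(_) => open[p] = false,
            }
        }
    }
    out
}
```

Because the producer never waits on a particular reader, the order in which the consumer polls partitions does not decide which partitions receive data.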
With `flight` off there is no `tonic` / `arrow-flight` and the in-memory transport is the default, so distributed plans still run. The integration suite runs over the in-memory transport in both build configurations: `start_localhost_context` builds an `InMemoryWorkerTransport` cluster instead of a gRPC one, and the gRPC harness moved to `start_localhost_flight_context`. A `unit-test-flight-transport` job sets `DATAFUSION_DISTRIBUTED_TEST_TRANSPORT=flight` to keep full Flight coverage.

For the no-flight suite to run, not just build, the `tpch` / `tpcds` / `clickbench` / `stateful_data_cleanup` dataset tests move into the benchmarks crate: as a dev-dependency of the library they re-enabled `flight` on every test build through feature unification. With that gone, `cargo test --no-default-features --features integration --lib --tests` is genuinely Flight-free, and a `unit-test-no-flight` job runs it.

The only tests still gated on `flight` are the ones that need a real wire: the `URLEmitter` routing tests, which assert per-URL worker identity, and the network-boundary connection metrics (`bytes_transferred`, latency). The rest, including the worker-hook and metrics tests, register `InMemoryWorkerTransport` directly and run either way.
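The harness switch can be pictured as a small selection function. This is a sketch under assumptions: `TestTransport` and `select_transport` are hypothetical names, the `flight_enabled` argument stands in for `cfg!(feature = "flight")`, and treating any value other than `flight` as an error is a guess rather than the harness's documented behaviour. Only the variable name `DATAFUSION_DISTRIBUTED_TEST_TRANSPORT` and its `flight` value come from the PR.

```rust
/// Hypothetical enum naming the two test transports.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum TestTransport {
    InMemory,
    Flight,
}

/// Picks a transport from the raw environment value. `flight_enabled`
/// stands in for `cfg!(feature = "flight")` so the logic can be exercised
/// in either build configuration.
fn select_transport(env_value: Option<&str>, flight_enabled: bool) -> Result<TestTransport, String> {
    match env_value {
        // Unset: the suite runs over the in-memory transport.
        None => Ok(TestTransport::InMemory),
        Some("flight") if flight_enabled => Ok(TestTransport::Flight),
        Some("flight") => Err("flight transport requested but the `flight` feature is off".to_string()),
        // Assumption: unknown values are rejected rather than silently ignored.
        Some(other) => Err(format!("unknown test transport: {other}")),
    }
}

/// Reads the real environment variable and delegates to `select_transport`.
fn select_transport_from_env(flight_enabled: bool) -> Result<TestTransport, String> {
    let value = std::env::var("DATAFUSION_DISTRIBUTED_TEST_TRANSPORT").ok();
    select_transport(value.as_deref(), flight_enabled)
}
```

Keeping the parsing separate from the environment read makes the choice testable without mutating process-wide state.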
Force-pushed from a5f6672 to 53ff817.
The `prepare_in_process_plan` doc referenced an out-of-crate transport that won't be upstreamed; reworded it to stand on its own. The `query_coordinator` module note makes its `flight` gating visible without opening `mod.rs`.
What

This PR puts a pluggable `WorkerTransport` in front of the worker comms, with Arrow Flight as one implementation behind a default-on `flight` feature. It adds an in-memory transport and surfaces partition routing on `NetworkBoundary`.

Why

The transport is hardcoded to Flight. The coordinator ships plans over gRPC and the consumer reads partitions through an internal, non-swappable connection, so an embedder can't supply its own transport without reaching into crate internals. The crate also can't build without `tonic` / `arrow-flight`.

Closed #483

How

- `WorkerTransport` abstraction and a dispatch seam, with Flight as the reference implementation. No behaviour change for the Flight path.
- `route_partition` / `partitions_per_consumer_task` on `NetworkBoundary`, so a transport can place a produced partition without re-deriving the slice formula by hand.
- `start_localhost_context` honours `DATAFUSION_DISTRIBUTED_TEST_TRANSPORT=flight`, so the generic suite runs over either transport.
- `flight` feature gates `tonic` / `arrow-flight` and the gRPC surface. In-memory is the default when it's off.

Tests

CI runs the integration suite over Flight, over the in-memory transport, and once more with `--no-default-features`. `clippy` and `rustfmt` are clean in both feature configs.
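For readers new to the design, the transport seam described under How can be pictured with a heavily simplified sketch. The `Transport` trait, its two methods, and the `InProcess` type below are hypothetical and synchronous; the crate's actual `WorkerTransport` trait is not shown in this conversation and will differ (async streams, real plans, real record batches). The sketch only shows the shape: coordinator-side code depends on a trait, and an in-process implementation satisfies it with a local registry instead of gRPC.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Hypothetical, simplified transport seam: deliver a task's plan, then
/// read one of its partitions back.
trait Transport {
    fn set_task_plan(&self, task: u64, plan: Vec<u8>) -> Result<(), String>;
    fn read_partition(&self, task: u64, partition: usize) -> Result<Vec<u8>, String>;
}

/// In-process implementation: the "workers" are a registry in this process.
#[derive(Default)]
struct InProcess {
    registry: Mutex<HashMap<u64, Vec<u8>>>,
}

impl Transport for InProcess {
    fn set_task_plan(&self, task: u64, plan: Vec<u8>) -> Result<(), String> {
        self.registry
            .lock()
            .map_err(|e| e.to_string())?
            .insert(task, plan);
        Ok(())
    }

    fn read_partition(&self, task: u64, partition: usize) -> Result<Vec<u8>, String> {
        let registry = self.registry.lock().map_err(|e| e.to_string())?;
        let plan = registry
            .get(&task)
            .ok_or_else(|| format!("no plan for task {task}"))?;
        // Toy "execution": echo the plan bytes tagged with the partition index.
        let mut out = plan.clone();
        out.push(partition as u8);
        Ok(out)
    }
}

/// Coordinator-side code sees only the trait, so swapping transports does
/// not touch it.
fn run(transport: &dyn Transport, task: u64, plan: &[u8], partitions: usize) -> Result<Vec<Vec<u8>>, String> {
    transport.set_task_plan(task, plan.to_vec())?;
    (0..partitions)
        .map(|p| transport.read_partition(task, p))
        .collect()
}
```

A gRPC-backed implementation would satisfy the same trait by sending the plan over the wire in `set_task_plan` and opening a remote stream in `read_partition`.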