From d933016ebc08074c3fbe0cb34bb42bd32f24ca3a Mon Sep 17 00:00:00 2001 From: Harrison Crosse Date: Fri, 3 Apr 2026 09:23:28 -0400 Subject: [PATCH] feat: make BatchPartitioner::partition_iter public Expose the existing partition_iter method so downstream async consumers can separate CPU-bound partitioning from I/O. Closes #21311 --- .../physical-plan/src/repartition/mod.rs | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index d3020c2756fff..5ab21dd09a111 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -545,12 +545,21 @@ impl BatchPartitioner { }) } - /// Actual implementation of [`partition`](Self::partition). + /// Returns an iterator of `(partition_index, RecordBatch)` pairs for the given batch. /// - /// The reason this was pulled out is that we need to have a variant of `partition` that works w/ sync functions, - /// and one that works w/ async. Using an iterator as an intermediate representation was the best way to achieve - /// this (so we don't need to clone the entire implementation). - fn partition_iter( + /// This is useful for async consumers that want to separate CPU-bound partitioning + /// from I/O. For example, you can iterate results on the async side and send them + /// through a channel, while performing file I/O on a blocking task: + /// + /// ```ignore + /// for result in partitioner.partition_iter(batch)? { + /// let (partition, batch) = result?; + /// tx.send((partition, batch)).await?; + /// } + /// ``` + /// + /// The sync [`partition`](Self::partition) method is implemented on top of this. + pub fn partition_iter( &mut self, batch: RecordBatch, ) -> Result<impl Iterator<Item = Result<(usize, RecordBatch)>> + Send + '_> {