
Commit fb01049

alamb and Omega359 authored

Example for using a separate threadpool for CPU bound work (try 3) (apache#16331)

* Example for using a separate threadpool for CPU bound work (try 3)

  Pare back example

* Update datafusion-examples/examples/thread_pools.rs

  Co-authored-by: Bruce Ritchie <bruce.ritchie@veeva.com>

* Add a note about why the main Runtime is used for IO and not CPU

* remove random thought

* Update comments and simplify shutdown

---------

Co-authored-by: Bruce Ritchie <bruce.ritchie@veeva.com>

1 parent 2bf8441 commit fb01049

2 files changed: 361 additions & 7 deletions

datafusion-examples/examples/thread_pools.rs

Lines changed: 350 additions & 0 deletions
@@ -0,0 +1,350 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! This example shows how to use separate thread pools (tokio [`Runtime`]s) to
//! run the IO and CPU intensive parts of DataFusion plans.
//!
//! # Background
//!
//! DataFusion, by default, plans and executes all operations (both CPU and IO)
//! on the same thread pool. This makes it fast and easy to get started, but
//! can cause issues when running at scale, especially when fetching and operating
//! on data directly from remote sources.
//!
//! Specifically, without configuration such as in this example, DataFusion
//! plans and executes everything on the same thread pool (Tokio Runtime),
//! including any I/O, such as reading Parquet files from remote object storage
//! (e.g. AWS S3), catalog access, and CPU intensive work. Running this diverse
//! workload can lead to issues described in the [Architecture section] such as
//! throttled network bandwidth (due to congestion control) and increased
//! latencies or timeouts while processing network messages.
//!
//! [Architecture section]: https://docs.rs/datafusion/latest/datafusion/index.html#thread-scheduling-cpu--io-thread-pools-and-tokio-runtimes

use arrow::util::pretty::pretty_format_batches;
use datafusion::common::runtime::JoinSet;
use datafusion::error::Result;
use datafusion::execution::SendableRecordBatchStream;
use datafusion::prelude::*;
use futures::stream::StreamExt;
use object_store::client::SpawnedReqwestConnector;
use object_store::http::HttpBuilder;
use std::sync::Arc;
use tokio::runtime::Handle;
use tokio::sync::Notify;
use url::Url;

/// Normally, you don't need to worry about the details of the tokio
/// [`Runtime`], but for this example it is important to understand how the
/// [`Runtime`]s work.
///
/// Each thread has a "current" runtime that is installed in a thread local
/// variable which is used by the `tokio::spawn` function.
///
/// The `#[tokio::main]` macro creates a [`Runtime`] and installs it
/// as the "current" runtime in a thread local variable, on which any `async`
/// [`Future`], [`Stream`]s and [`Task`]s are run.
///
/// This example uses the runtime created by [`tokio::main`] to do I/O and spawn
/// CPU intensive tasks on a separate [`Runtime`], mirroring the common pattern
/// when using Rust libraries such as `tonic`. Using a separate `Runtime` for
/// CPU bound tasks will often be simpler in larger applications, even though it
/// makes this example slightly more complex.
#[tokio::main]
async fn main() -> Result<()> {
    // The first two examples read local files. Enabling the URL table feature
    // lets us treat filenames as tables in SQL.
    let ctx = SessionContext::new().enable_url_table();
    let sql = format!(
        "SELECT * FROM '{}/alltypes_plain.parquet'",
        datafusion::test_util::parquet_test_data()
    );

    // Run a query on the current runtime. Calling `await` runs the future
    // (in this case the `async` function and all spawned work in DataFusion
    // plans) on the current runtime.
    same_runtime(&ctx, &sql).await?;

    // Run the same query but this time on a different runtime.
    //
    // Since we call `await` here, the `async` function itself runs on the
    // current runtime, but internally `different_runtime_basic` executes the
    // DataFusion plan on a different Runtime.
    different_runtime_basic(ctx, sql).await?;

    // Run the same query on a different runtime, including remote IO.
    //
    // NOTE: This is best practice for production systems
    different_runtime_advanced().await?;

    Ok(())
}

/// Run queries directly on the current tokio `Runtime`
///
/// This is how most examples in DataFusion are written and works well for
/// development, local query processing, and non latency sensitive workloads.
async fn same_runtime(ctx: &SessionContext, sql: &str) -> Result<()> {
    // `.sql` is an async function as it may also do network I/O, for example
    // to contact a remote catalog or do an object store LIST
    let df = ctx.sql(sql).await?;

    // While many examples call `collect` or `show()`, those methods buffer the
    // results. Internally DataFusion generates output one RecordBatch at a time.

    // Calling `execute_stream` returns a `SendableRecordBatchStream`. Depending
    // on the plan, this may also do network I/O, for example to begin reading a
    // parquet file from a remote object store.
    let mut stream: SendableRecordBatchStream = df.execute_stream().await?;

    // `next()` drives the plan, incrementally producing new `RecordBatch`es
    // using the current runtime.
    //
    // Perhaps somewhat non obviously, calling `next()` can also result in other
    // tasks being spawned on the current runtime (e.g. for `RepartitionExec` to
    // read data from each of its input partitions in parallel).
    //
    // Executing the plan using this pattern intermixes any IO and CPU intensive
    // work on the same Runtime.
    while let Some(batch) = stream.next().await {
        println!("{}", pretty_format_batches(&[batch?]).unwrap());
    }
    Ok(())
}

/// Run queries on a **different** Runtime dedicated for CPU bound work
///
/// This example is suitable for running DataFusion plans against local data
/// sources (e.g. files) and returning results to an async destination, as might
/// be done to return query results to a remote client.
///
/// Production systems which also read data locally or require very low latency
/// should follow the recommendations on [`different_runtime_advanced`] when
/// processing data from a remote source such as object storage.
async fn different_runtime_basic(ctx: SessionContext, sql: String) -> Result<()> {
    // Since we are already in the context of a runtime (installed by
    // #[tokio::main]), we need a new Runtime (threadpool) for CPU bound tasks
    let cpu_runtime = CpuRuntime::try_new()?;

    // Prepare a task that runs the plan on cpu_runtime and sends
    // the results back to the original runtime via a channel.
    let (tx, mut rx) = tokio::sync::mpsc::channel(2);
    let driver_task = async move {
        // Plan the query (which might require CPU work to evaluate statistics)
        let df = ctx.sql(&sql).await?;
        let mut stream: SendableRecordBatchStream = df.execute_stream().await?;

        // Calling `next()` to drive the plan in this task drives the
        // execution on the cpu runtime (the other thread pool).
        //
        // NOTE any IO run by this plan (for example, reading from an
        // `ObjectStore`) will be done on this new thread pool as well.
        while let Some(batch) = stream.next().await {
            if tx.send(batch).await.is_err() {
                // error means dropped receiver, so nothing will get results anymore
                return Ok(());
            }
        }
        Ok(()) as Result<()>
    };

    // Run the driver task on the cpu runtime. Use a JoinSet to
    // ensure the spawned task is canceled on error/drop
    let mut join_set = JoinSet::new();
    join_set.spawn_on(driver_task, cpu_runtime.handle());

    // Retrieve the results in the original (IO) runtime. This requires only
    // minimal work (pass pointers around).
    while let Some(batch) = rx.recv().await {
        println!("{}", pretty_format_batches(&[batch?])?);
    }

    // wait for completion of the driver task
    drain_join_set(join_set).await;

    Ok(())
}

/// Run CPU intensive work on a different runtime but do IO operations (object
/// store access) on the current runtime.
async fn different_runtime_advanced() -> Result<()> {
    // In this example, we will query a file via https, reading
    // the data directly from the plan

    // The current runtime (created by tokio::main) is used for IO
    //
    // Note this handle should be used for *ALL* remote IO operations in your
    // systems, including remote catalog access, which is not included in this
    // example.
    let cpu_runtime = CpuRuntime::try_new()?;
    let io_handle = Handle::current();

    let ctx = SessionContext::new();

    // By default, the HttpStore uses the same runtime that calls `await` for IO
    // operations. This means that if the DataFusion plan is called from the
    // cpu_runtime, the HttpStore IO operations will *also* run on the CPU
    // runtime, which will error.
    //
    // To avoid this, we use a `SpawnedReqwestConnector` to configure the
    // `ObjectStore` to run the HTTP requests on the IO runtime.
    let base_url = Url::parse("https://github.com").unwrap();
    let http_store = HttpBuilder::new()
        .with_url(base_url.clone())
        // Use the IO runtime (via `io_handle`) to run the HTTP requests.
        // Without this line, you will see an error such as:
        // A Tokio 1.x context was found, but IO is disabled.
        .with_http_connector(SpawnedReqwestConnector::new(io_handle))
        .build()?;

    // Tell DataFusion to process `http://` urls with this wrapped object store
    ctx.register_object_store(&base_url, Arc::new(http_store));

    // As above, plan and execute the query on the cpu runtime.
    let (tx, mut rx) = tokio::sync::mpsc::channel(2);
    let driver_task = async move {
        // Plan / execute the query
        let url = "https://github.com/apache/arrow-testing/raw/master/data/csv/aggregate_test_100.csv";
        let df = ctx
            .sql(&format!("SELECT c1,c2,c3 FROM '{url}' LIMIT 5"))
            .await?;

        let mut stream: SendableRecordBatchStream = df.execute_stream().await?;

        // Note you can do other non trivial CPU work on the results of the
        // stream before sending it back to the original runtime. For example,
        // calling a FlightDataEncoder to convert the results to flight messages
        // to send over the network

        // send results, as above
        while let Some(batch) = stream.next().await {
            if tx.send(batch).await.is_err() {
                return Ok(());
            }
        }
        Ok(()) as Result<()>
    };

    let mut join_set = JoinSet::new();
    join_set.spawn_on(driver_task, cpu_runtime.handle());
    while let Some(batch) = rx.recv().await {
        println!("{}", pretty_format_batches(&[batch?])?);
    }

    Ok(())
}

/// Waits for all tasks in the JoinSet to complete and reports any errors that
/// occurred.
///
/// If we don't do this, any errors that occur in the task (such as IO errors)
/// are not reported.
async fn drain_join_set(mut join_set: JoinSet<Result<()>>) {
    // retrieve any errors from the tasks
    while let Some(result) = join_set.join_next().await {
        match result {
            Ok(Ok(())) => {} // task completed successfully
            Ok(Err(e)) => eprintln!("Task failed: {e}"), // task failed
            Err(e) => eprintln!("JoinSet error: {e}"), // JoinSet error
        }
    }
}

/// Creates a Tokio [`Runtime`] for use with CPU bound tasks
///
/// Tokio forbids dropping `Runtime`s in async contexts, so creating a separate
/// `Runtime` correctly is somewhat tricky. This structure manages the creation
/// and shutdown of a separate thread.
///
/// # Notes
/// On drop, the thread will wait for all remaining tasks to complete.
///
/// Depending on your application, more sophisticated shutdown logic may be
/// required, such as ensuring that no new tasks are added to the runtime.
///
/// # Credits
/// This code is derived from code originally written for [InfluxDB 3.0]
///
/// [InfluxDB 3.0]: https://github.com/influxdata/influxdb3_core/tree/6fcbb004232738d55655f32f4ad2385523d10696/executor
struct CpuRuntime {
    /// Handle is the tokio structure for interacting with a Runtime.
    handle: Handle,
    /// Signal to start shutting down
    notify_shutdown: Arc<Notify>,
    /// When the thread is active, this is Some
    thread_join_handle: Option<std::thread::JoinHandle<()>>,
}

impl Drop for CpuRuntime {
    fn drop(&mut self) {
        // Notify the thread to shut down.
        self.notify_shutdown.notify_one();
        // In a production system you also need to ensure your code stops adding
        // new tasks to the underlying runtime after this point to allow the
        // thread to complete its work and exit cleanly.
        if let Some(thread_join_handle) = self.thread_join_handle.take() {
            // If the thread is still running, we wait for it to finish
            print!("Shutting down CPU runtime thread...");
            if let Err(e) = thread_join_handle.join() {
                eprintln!("Error joining CPU runtime thread: {e:?}");
            } else {
                println!("CPU runtime thread shutdown successfully.");
            }
        }
    }
}

impl CpuRuntime {
    /// Create a new Tokio Runtime for CPU bound tasks
    pub fn try_new() -> Result<Self> {
        let cpu_runtime = tokio::runtime::Builder::new_multi_thread()
            .enable_time()
            .build()?;
        let handle = cpu_runtime.handle().clone();
        let notify_shutdown = Arc::new(Notify::new());
        let notify_shutdown_captured = Arc::clone(&notify_shutdown);

        // The cpu_runtime runs and is dropped on a separate thread
        let thread_join_handle = std::thread::spawn(move || {
            cpu_runtime.block_on(async move {
                notify_shutdown_captured.notified().await;
            });
            // Note: cpu_runtime is dropped here, which will wait for all tasks
            // to complete
        });

        Ok(Self {
            handle,
            notify_shutdown,
            thread_join_handle: Some(thread_join_handle),
        })
    }

    /// Return a handle suitable for spawning CPU bound tasks
    ///
    /// # Notes
    ///
    /// If a task spawned on this handle attempts to do IO, it will error with a
    /// message such as:
    ///
    /// ```text
    /// A Tokio 1.x context was found, but IO is disabled.
    /// ```
    pub fn handle(&self) -> &Handle {
        &self.handle
    }
}
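The `CpuRuntime` above uses tokio's `Notify` to signal a dedicated thread to shut down, and joins that thread in `Drop` so no in-flight work is lost. The same drop-time handshake can be sketched with only the standard library; this is an illustrative simplification (the `CpuPool` name and `Condvar`-based signal are stand-ins, not part of the example):

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Sketch of the CpuRuntime shutdown handshake: a worker thread (standing in
/// for the thread that owns the CPU Runtime) parks until shutdown is
/// signalled, and Drop joins it before returning.
struct CpuPool {
    shutdown: Arc<(Mutex<bool>, Condvar)>,
    thread_join_handle: Option<thread::JoinHandle<()>>,
}

impl CpuPool {
    fn new() -> Self {
        let shutdown = Arc::new((Mutex::new(false), Condvar::new()));
        let shutdown_captured = Arc::clone(&shutdown);
        let thread_join_handle = Some(thread::spawn(move || {
            let (lock, cvar) = &*shutdown_captured;
            let mut stop = lock.lock().unwrap();
            // Wait until Drop sets the flag; checking under the lock avoids
            // a lost wakeup (mirrors `notify_shutdown.notified().await`)
            while !*stop {
                stop = cvar.wait(stop).unwrap();
            }
            // In the real example, the Runtime is dropped here, which waits
            // for all remaining tasks to complete
        }));
        Self { shutdown, thread_join_handle }
    }
}

impl Drop for CpuPool {
    fn drop(&mut self) {
        // Signal shutdown, then wait for the worker thread to finish
        let (lock, cvar) = &*self.shutdown;
        *lock.lock().unwrap() = true;
        cvar.notify_one();
        if let Some(handle) = self.thread_join_handle.take() {
            handle.join().expect("worker thread panicked");
        }
    }
}

fn main() {
    let pool = CpuPool::new();
    drop(pool); // triggers the handshake and joins the thread
    println!("shutdown complete");
}
```

As in the real `CpuRuntime`, the key design point is that the thread owning the pool is joined synchronously on drop, which is why tokio's rule against dropping a `Runtime` inside an async context never comes into play.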

datafusion/core/src/lib.rs

Lines changed: 11 additions & 7 deletions
@@ -510,17 +510,20 @@
 //! initial development and processing local files, but it can lead to problems
 //! under load and/or when reading from network sources such as AWS S3.
 //!
+//! ### Optimizing Latency: Throttled CPU / IO under Highly Concurrent Load
+//!
 //! If your system does not fully utilize either the CPU or network bandwidth
 //! during execution, or you see significantly higher tail (e.g. p99) latencies
 //! responding to network requests, **it is likely you need to use a different
-//! [`Runtime`] for CPU intensive DataFusion plans**. This effect can be especially
-//! pronounced when running several queries concurrently.
+//! [`Runtime`] for DataFusion plans**. The [thread_pools example]
+//! has an example of how to do so.
 //!
-//! As shown in the following figure, using the same [`Runtime`] for both CPU
-//! intensive processing and network requests can introduce significant
-//! delays in responding to those network requests. Delays in processing network
-//! requests can and does lead network flow control to throttle the available
-//! bandwidth in response.
+//! As shown below, using the same [`Runtime`] for both CPU intensive processing
+//! and network requests can introduce significant delays in responding to
+//! those network requests. Delays in processing network requests can and does
+//! lead network flow control to throttle the available bandwidth in response.
+//! This effect can be especially pronounced when running multiple queries
+//! concurrently.
 //!
 //! ```text
 //! Legend
@@ -602,6 +605,7 @@
 //!
 //! [Tokio]: https://tokio.rs
 //! [`Runtime`]: tokio::runtime::Runtime
+//! [thread_pools example]: https://github.com/apache/datafusion/tree/main/datafusion-examples/examples/thread_pools.rs
 //! [`task`]: tokio::task
 //! [Using Rustlang’s Async Tokio Runtime for CPU-Bound Tasks]: https://thenewstack.io/using-rustlangs-async-tokio-runtime-for-cpu-bound-tasks/
 //! [`RepartitionExec`]: physical_plan::repartition::RepartitionExec
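The doc change above recommends driving CPU intensive DataFusion plans on a separate [`Runtime`] and streaming results back to the runtime handling network requests. Stripped of tokio and DataFusion, the hand-off reduces to "produce on a dedicated pool, consume over a channel"; a std-only sketch follows (the `run_on_cpu_pool` name and the summation loop are hypothetical stand-ins for driving a real plan):

```rust
use std::sync::mpsc;
use std::thread;

/// Runs "CPU bound" work on a dedicated thread (standing in for the CPU
/// Runtime) and streams results back to the caller (standing in for the IO
/// runtime) over a channel, mirroring the mpsc hand-off in the example.
fn run_on_cpu_pool(inputs: Vec<u64>) -> mpsc::Receiver<u64> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        for x in inputs {
            // placeholder for CPU intensive work, e.g. producing the next
            // RecordBatch of a DataFusion plan
            let result = (1..=x).sum::<u64>();
            if tx.send(result).is_err() {
                // receiver dropped: no one wants results anymore, stop early
                return;
            }
        }
    });
    rx
}

fn main() {
    let rx = run_on_cpu_pool(vec![3, 4]);
    // the consumer does only minimal work: receive and print
    let results: Vec<u64> = rx.iter().collect();
    println!("{results:?}"); // [6, 10]
}
```

The point of the split is the same as in the figure the doc text refers to: the consumer side stays responsive to network traffic because it only moves pointers off a channel, while all heavy computation is confined to the dedicated pool.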
