11# DataFusion Distributed
22
3- Library that brings distributed execution capabilities to [ Apache DataFusion] ( https://github.com/apache/datafusion ) .
3+ Library for building distributed query execution engines based on [ Apache DataFusion] ( https://github.com/apache/datafusion ) .
44
55> [ !NOTE]
6- > This is project is not part of Apache DataFusion
6+ > This project is not part of Apache DataFusion
7+
8+ ## Quickstart
9+
10+ Starting with the following dependencies:
11+
12+ <details >
13+ <summary ><code >Cargo.toml</code ></summary >
14+
15+ ``` toml
16+ [package ]
17+ name = " datafusion-distributed-quick-start"
18+ version = " 0.1.0"
19+ edition = " 2024"
20+ default-run = " main"
21+
22+ [dependencies ]
23+ datafusion = " 54"
24+ datafusion-distributed = " 2"
25+ object_store = { version = " 0.13.2" , features = [" http" ] }
26+ tokio = { version = " 1" , features = [" full" ] }
27+ tonic = " 0"
28+ url = " 2"
29+ alloc-no-stdlib = " =2.0.4"
30+
31+ [patch .crates-io ]
32+ # https://github.com/dropbox/rust-brotli/issues/256, issue unrelated to this project, this will stop being
33+ # necessary as sson as rust-brotli fixes it.
34+ alloc-no-stdlib = { git = " https://github.com/dropbox/rust-alloc-no-stdlib" , rev = " 6032b6a9b20e03737135c55a0270ccffcc1438ef" }
35+ alloc-stdlib = { git = " https://github.com/dropbox/rust-alloc-no-stdlib" , rev = " 6032b6a9b20e03737135c55a0270ccffcc1438ef" }
36+
37+ [[bin ]]
38+ name = " main"
39+ path = " src/main.rs"
40+
41+ [[bin ]]
42+ name = " worker"
43+ path = " src/worker.rs"
44+
45+ ```
46+
47+ </details >
48+
49+ ---
50+
51+ ` src/worker.rs ` : Spawn a Distributed DataFusion worker in a localhost port.
52+
53+ <details >
54+ <summary ><code >use</code > statements</summary >
55+
56+ ``` rust
57+ use datafusion :: execution :: runtime_env :: RuntimeEnvBuilder ;
58+ use datafusion_distributed :: Worker ;
59+ use object_store :: http :: HttpBuilder ;
60+ use std :: net :: {IpAddr , Ipv4Addr , SocketAddr };
61+ use std :: sync :: Arc ;
62+ use tonic :: transport :: Server ;
63+ use url :: Url ;
64+ ```
65+
66+ </details >
67+
68+ ``` rust
69+ #[tokio:: main]
70+ async fn main () -> Result <(), Box <dyn std :: error :: Error >> {
71+ // 1. Spawn a Distributed DataFusion worker in a localhost port.
72+ let base = Url :: parse (" https://datasets.clickhouse.com" )? ;
73+ let store = HttpBuilder :: new (). with_url (base . clone ()). build ()? ;
74+ let runtime = RuntimeEnvBuilder :: new (). build_arc ()? ;
75+ runtime . register_object_store (& base , Arc :: new (store ));
76+
77+ let worker = Worker :: default (). with_runtime_env (runtime );
78+
79+ let port = std :: env :: var (" PORT" )? . parse ()? ;
80+ let addr = SocketAddr :: new (IpAddr :: V4 (Ipv4Addr :: LOCALHOST ), port );
81+ println! (" Distributed DataFusion worker listening on {addr}..." );
82+ Ok (Server :: builder ()
83+ . add_service (worker . into_worker_server ())
84+ . serve (addr )
85+ . await ? )
86+ }
87+ ```
88+
89+ ---
90+
91+ ` src/main.rs ` : Prepare the ` SessionContext ` with all the pieces necessary to communicate with the workers above.
92+
93+ <details >
94+ <summary ><code >use</code > statements</summary >
95+
96+ ``` rust
97+ use datafusion :: error :: DataFusionError ;
98+ use datafusion :: execution :: SessionStateBuilder ;
99+ use datafusion :: prelude :: {ParquetReadOptions , SessionConfig , SessionContext };
100+ use datafusion_distributed :: {
101+ DistributedExt , SessionStateBuilderExt , WorkerResolver , display_plan_ascii,
102+ };
103+ use object_store :: http :: HttpBuilder ;
104+ use std :: sync :: Arc ;
105+ use url :: Url ;
106+ ```
107+
108+ </details >
109+
110+ ``` rust
111+ #[tokio:: main]
112+ async fn main () -> Result <(), Box <dyn std :: error :: Error >> {
113+ // 2. Create a WorkerResolver implementation that knows how to resolve
114+ // Distributed DataFusion workers running remotely.
115+ let workers = std :: env :: var (" WORKERS" ). unwrap_or_default ();
116+ let mut urls : Vec <Url > = vec! [];
117+ for port in workers . split (" ," ). filter (| v | ! v . is_empty ()) {
118+ urls . push (Url :: parse (& format! (" http://127.0.0.1:{port}" ))? );
119+ }
120+
121+ struct LocalhostWorkerResolver (Vec <Url >);
122+ impl WorkerResolver for LocalhostWorkerResolver {
123+ fn get_urls (& self ) -> Result <Vec <Url >, DataFusionError > {
124+ Ok (self . 0. clone ())
125+ }
126+ }
127+
128+ // 3. Build the SessionContext as usual. Distributed queries will use
129+ // this SessionContext as a "coordinator", as it'll be in charge of
130+ // distributed planning + fanning out tasks to workers.
131+ let state = SessionStateBuilder :: new ()
132+ . with_default_features ()
133+ . with_config (SessionConfig :: new (). with_information_schema (true ))
134+ . with_distributed_worker_resolver (LocalhostWorkerResolver (urls ))
135+ . with_distributed_planner ()
136+ // A very low value forces queries to be heavily distributed.
137+ . with_distributed_file_scan_config_bytes_per_partition (1 )?
138+ . build ();
139+
140+ let ctx = SessionContext :: from (state );
141+
142+ let base = Url :: parse (" https://datasets.clickhouse.com" )? ;
143+ let store = HttpBuilder :: new (). with_url (base . clone ()). build ()? ;
144+ ctx . register_object_store (& base , Arc :: new (store ));
145+ ctx . register_parquet (
146+ " hits" ,
147+ " https://datasets.clickhouse.com/hits_compatible/athena_partitioned/hits_0.parquet" ,
148+ ParquetReadOptions :: default (),
149+ )
150+ . await ? ;
151+
152+ // 4. Issue the SQL query, and get a nice visualization for distributed plans
153+ // with `display_plan_ascii`.
154+ let sql = std :: env :: args (). skip (1 ). collect :: <Vec <_ >>(). join (" " );
155+ let df = ctx . sql (& sql ). await ? ;
156+ let plan = df . create_physical_plan (). await ? ;
157+ println! (" {}" , display_plan_ascii (plan . as_ref (), false ));
158+ df . show (). await ? ;
159+
160+ Ok (())
161+ }
162+ ```
163+
164+ ---
165+
166+ Start a couple of workers, each on its own port:
167+
168+ ``` sh
169+ PORT=8080 cargo run --bin worker &
170+ PORT=8081 cargo run --bin worker &
171+ PORT=8082 cargo run --bin worker &
172+ ```
173+
174+ Then point the main script at them and run a query:
175+
176+ ``` sh
177+ WORKERS=8080,8081,8082 cargo run -- " SELECT COUNT(*), AVG(\" ResolutionWidth\" ) FROM hits"
178+ ```
179+
180+ You'll see the distributed plan printed, followed by the query results.
181+
182+ ## Benchmarks
183+
184+ DataFusion Distributed consistently outperforms other distributed query engines across TPC-H and
185+ TPC-DS. The chart below shows how much slower each engine is relative to DataFusion Distributed
186+ (lower is better):
187+
188+ ![ How much slower than DataFusion Distributed?] ( ./docs/source/_static/images/summary_relative.png )
189+
190+ <details >
191+ <summary >Per-dataset totals</summary >
192+
193+ | Dataset | df-dist | Ballista | Spark | Trino | Queries compared |
194+ | -------------| --------:| ----------------------------------------------------------------------------------------:| ------:| ------:| -----------------:|
195+ | TPC-H SF1 | ** 7s** | 11s | 30s | 18s | 22 |
196+ | TPC-H SF10 | ** 10s** | 57s | 51s | 33s | 22 |
197+ | TPC-H SF100 | ** 42s** | N/A ([ #1836 ] ( https://github.com/datafusion-contrib/datafusion-distributed/issues/1836 ) ) | 261s | 93s | 19 |
198+ | TPC-DS SF1 | ** 29s** | 72s | 101s | 85s | 67 |
199+
200+ ![ TPC-H SF1] ( ./docs/source/_static/images/tpch_sf1.png )
201+ ![ TPC-H SF10] ( ./docs/source/_static/images/tpch_sf10.png )
202+ ![ TPC-H SF100] ( ./docs/source/_static/images/tpch_sf100.png )
203+ ![ TPC-DS SF1] ( ./docs/source/_static/images/tpcds_sf1.png )
204+
205+ </details >
206+
207+ ** Conditions.** All engines ran on the same cluster: 12 AWS EC2 ` c5n.2xlarge ` instances (8 vCPUs and
208+ 21 GiB of memory each, with up to 25 Gbps networking) reading Parquet files stored in Amazon S3. Each
209+ engine's total is the sum of per-query median (p50) latencies over the queries that all compared engines
210+ completed successfully; lower is better.
211+
212+ The benchmarking code is public an open for anyone to easily reproduce. It uses AWS CDK for automating the creation
213+ of the benchmarking cluster so that anyone can reproduce the same results in their own AWS account. The code can
214+ be found in the [ benchmarks/cdk] ( ./benchmarks/cdk ) directory.
7215
8216## What can you do with this crate?
9217
10- This crate is a toolkit that extends [ Apache DataFusion] ( https://github.com/apache/datafusion ) with distributed
11- capabilities,
12- providing a developer experience as close as possible to vanilla DataFusion while being unopinionated about the
13- networking stack used for hosting the different workers involved in a query.
218+ This crate is a toolkit that extends [ Apache DataFusion] ( https://github.com/apache/datafusion ) with distributed capabilities, providing a developer
219+ experience as close as possible to vanilla DataFusion while being unopinionated about the networking stack.
220+
221+ It's not an out of the box distributed engine, it's instead a library for building distributed query engines with some
222+ sane defaults for when the data sources are just files.
14223
15224Users of this library can expect to take their existing single-node DataFusion-based systems and add distributed
16225capabilities with minimal changes.
@@ -21,49 +230,14 @@ capabilities with minimal changes.
21230 a familiar API for building applications.
22231- Unopinionated about networking. This crate does not take any opinion about the networking stack, and users are
23232 expected to leverage their own infrastructure for hosting DataFusion nodes.
24- - No coordinator-worker architecture. To keep infrastructure simple, any node can act as a coordinator or a worker.
25-
26- # Benchmarks
27-
28- The benchmarking code is public an open for anyone to easily reproduce. It uses AWS CDK for automating the creation
29- of the benchmarking cluster so that anyone can reproduce the same results in their own AWS account. The code can
30- be found in the [ benchmarks/cdk] ( ./benchmarks/cdk ) directory.
31-
32- ### TPC-H SF1
33-
34- ![ benchmarks_sf1.png] ( https://github.com/user-attachments/assets/2f922066-7382-4c31-9e76-74b1ca053bfc )
35-
36- ### TPC-H SF10
37-
38- ![ benchmarks_sf10.png] ( https://github.com/user-attachments/assets/08fd3090-92bf-43fd-b80c-12e3a127e724 )
233+ - No coordinator-worker split. To keep infrastructure simple, any node can act as a coordinator or a worker.
39234
40235# Docs
41236
42- The user guide can be found here:
237+ The user guide can be found here:
43238
44239https://datafusion-contrib.github.io/datafusion-distributed
45240
46241If you'd like to contribute, see the contributor guide:
47242
48243https://datafusion-contrib.github.io/datafusion-distributed/contributor-guide/index.html
49-
50- ## Getting familiar with distributed DataFusion
51-
52- There are some runnable examples showcasing how to provide a localhost implementation for Distributed DataFusion in
53- [ examples/] ( examples ) :
54-
55- - [ localhost_worker.rs] ( examples/localhost_worker.rs ) : code that spawns a Worker listening for physical
56- plans over the network.
57- - [ localhost_run.rs] ( examples/localhost_run.rs ) : code that distributes a query across the spawned Workers and executes
58- it.
59-
60- The integration tests also provide an idea about how to use the library and what can be achieved with it:
61-
62- - [ tpch_validation_test.rs] ( tests/tpch_plans_test.rs ) : executes all TPCH queries and performs assertions over the
63- distributed plans.
64- - [ custom_config_extension.rs] ( tests/custom_config_extension.rs ) : showcases how to propagate custom DataFusion config
65- extensions.
66- - [ custom_extension_codec.rs] ( tests/custom_extension_codec.rs ) : showcases how to propagate custom physical extension
67- codecs.
68- - [ distributed_aggregation.rs] ( tests/distributed_aggregation.rs ) : showcases how to manually place ` ArrowFlightReadExec `
69- nodes in a plan and build a distributed query out of it.
0 commit comments