Skip to content

Latest commit

 

History

History
356 lines (290 loc) · 188 KB

File metadata and controls

356 lines (290 loc) · 188 KB

Configuration Settings

DataFusion configurations control various aspects of DataFusion planning and execution

Setting Configuration Options

Programmatically

You can set the options programmatically via the ConfigOptions object. For example, to configure the datafusion.execution.target_partitions using the API:

use datafusion::common::config::ConfigOptions;
let mut config = ConfigOptions::new();
config.execution.target_partitions = 1;

Via Environment Variables

You can also set configuration options via environment variables using ConfigOptions::from_env, for example

DATAFUSION_EXECUTION_TARGET_PARTITIONS=1 ./your_program

Via SQL

You can also set configuration options via SQL using the SET command. For example, to configure datafusion.execution.target_partitions:

SET datafusion.execution.target_partitions = '1';

The following configuration settings are available:

key default description
datafusion.catalog.create_default_catalog_and_schema true Whether the default catalog and schema should be created automatically.
datafusion.catalog.default_catalog datafusion The default catalog name - this impacts what SQL queries use if not specified
datafusion.catalog.default_schema public The default schema name - this impacts what SQL queries use if not specified
datafusion.catalog.information_schema false Should DataFusion provide access to information_schema virtual tables for displaying schema information
datafusion.catalog.location NULL Location scanned to load tables for default schema
datafusion.catalog.format NULL Type of TableProvider to use when loading default schema
datafusion.catalog.has_header true Default value for format.has_header for CREATE EXTERNAL TABLE if not specified explicitly in the statement.
datafusion.catalog.newlines_in_values false Specifies whether newlines in (quoted) CSV values are supported. This is the default value for format.newlines_in_values for CREATE EXTERNAL TABLE if not specified explicitly in the statement. Parsing newlines in quoted values may be affected by execution behaviour such as parallel file scanning. Setting this to true ensures that newlines in values are parsed successfully, which may reduce performance.
datafusion.execution.batch_size 8192 Default batch size while creating new batches, it's especially useful for buffer-in-memory batches since creating tiny batches would result in too much metadata memory consumption
datafusion.execution.perfect_hash_join_small_build_threshold 1024 A perfect hash join (see HashJoinExec for more details) will be considered if the range of keys (max - min) on the build side is < this threshold. This provides a fast path for joins with very small key ranges, bypassing the density check. Currently only supports cases where build_side.num_rows() < u32::MAX. Support for build_side.num_rows() >= u32::MAX will be added in the future.
datafusion.execution.perfect_hash_join_min_key_density 0.15 The minimum required density of join keys on the build side to consider a perfect hash join (see HashJoinExec for more details). Density is calculated as: (number of rows) / (max_key - min_key + 1). A perfect hash join may be used if the actual key density > this value. Currently only supports cases where build_side.num_rows() < u32::MAX. Support for build_side.num_rows() >= u32::MAX will be added in the future.
datafusion.execution.coalesce_batches true When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. This is helpful when there are highly selective filters or joins that could produce tiny output batches. The target batch size is determined by the configuration setting
datafusion.execution.collect_statistics true Should DataFusion collect statistics when first creating a table. Has no effect after the table is created. Applies to the default ListingTableProvider in DataFusion. Defaults to true.
datafusion.execution.target_partitions 0 Number of partitions for query execution. Increasing partitions can increase concurrency. Defaults to the number of CPU cores on the system
datafusion.execution.time_zone NULL The default time zone Some functions, e.g. now return timestamps in this time zone
datafusion.execution.parquet.enable_page_index true (reading) If true, reads the Parquet data page level metadata (the Page Index), if present, to reduce the I/O and number of rows decoded.
datafusion.execution.parquet.pruning true (reading) If true, the parquet reader attempts to skip entire row groups based on the predicate in the query and the metadata (min/max values) stored in the parquet file
datafusion.execution.parquet.skip_metadata true (reading) If true, the parquet reader skip the optional embedded metadata that may be in the file Schema. This setting can help avoid schema conflicts when querying multiple parquet files with schemas containing compatible types but different metadata
datafusion.execution.parquet.metadata_size_hint 524288 (reading) If specified, the parquet reader will try and fetch the last size_hint bytes of the parquet file optimistically. If not specified, two reads are required: One read to fetch the 8-byte parquet footer and another to fetch the metadata length encoded in the footer Default setting to 512 KiB, which should be sufficient for most parquet files, it can reduce one I/O operation per parquet file. If the metadata is larger than the hint, two reads will still be performed.
datafusion.execution.parquet.pushdown_filters false (reading) If true, filter expressions are be applied during the parquet decoding operation to reduce the number of rows decoded. This optimization is sometimes called "late materialization".
datafusion.execution.parquet.reorder_filters false (reading) If true, filter expressions evaluated during the parquet decoding operation will be reordered heuristically to minimize the cost of evaluation. If false, the filters are applied in the same order as written in the query
datafusion.execution.parquet.force_filter_selections false (reading) Force the use of RowSelections for filter results, when pushdown_filters is enabled. If false, the reader will automatically choose between a RowSelection and a Bitmap based on the number and pattern of selected rows.
datafusion.execution.parquet.schema_force_view_types true (reading) If true, parquet reader will read columns of Utf8/Utf8Large with Utf8View, and Binary/BinaryLarge with BinaryView.
datafusion.execution.parquet.binary_as_string false (reading) If true, parquet reader will read columns of Binary/LargeBinary with Utf8, and BinaryView with Utf8View. Parquet files generated by some legacy writers do not correctly set the UTF8 flag for strings, causing string columns to be loaded as BLOB instead.
datafusion.execution.parquet.coerce_int96 NULL (reading) If true, parquet reader will read columns of physical type int96 as originating from a different resolution than nanosecond. This is useful for reading data from systems like Spark which stores microsecond resolution timestamps in an int96 allowing it to write values with a larger date range than 64-bit timestamps with nanosecond resolution.
datafusion.execution.parquet.coerce_int96_tz NULL (reading) Optional timezone applied to INT96 columns when coerce_int96 is set. When Some, INT96 columns coerce to Timestamp(<coerce_int96>, Some(<tz>)) instead of the default Timestamp(<coerce_int96>, None). Spark and other systems write INT96 values as UTC-adjusted instants, so callers that need the resulting Arrow type to be timezone-aware (e.g. for Spark TimestampType semantics) should set this to "UTC". No effect when coerce_int96 is None.
datafusion.execution.parquet.bloom_filter_on_read true (reading) Use any available bloom filters when reading parquet files
datafusion.execution.parquet.max_predicate_cache_size NULL (reading) The maximum predicate cache size, in bytes. When pushdown_filters is enabled, sets the maximum memory used to cache the results of predicate evaluation between filter evaluation and output generation. Decreasing this value will reduce memory usage, but may increase IO and CPU usage. None means use the default parquet reader setting. 0 means no caching.
datafusion.execution.parquet.data_pagesize_limit 1048576 (writing) Sets best effort maximum size of data page in bytes
datafusion.execution.parquet.write_batch_size 1024 (writing) Sets write_batch_size in rows
datafusion.execution.parquet.writer_version 1.0 (writing) Sets parquet writer version valid values are "1.0" and "2.0"
datafusion.execution.parquet.skip_arrow_metadata false (writing) Skip encoding the embedded arrow metadata in the KV_meta This is analogous to the ArrowWriterOptions::with_skip_arrow_metadata. Refer to https://docs.rs/parquet/53.3.0/parquet/arrow/arrow_writer/struct.ArrowWriterOptions.html#method.with_skip_arrow_metadata
datafusion.execution.parquet.compression zstd(3) (writing) Sets default parquet compression codec. Valid values are: uncompressed, snappy, gzip(level), brotli(level), lz4, zstd(level), and lz4_raw. These values are not case sensitive. If NULL, uses default parquet writer setting Note that this default setting is not the same as the default parquet writer setting.
datafusion.execution.parquet.dictionary_enabled true (writing) Sets if dictionary encoding is enabled. If NULL, uses default parquet writer setting
datafusion.execution.parquet.dictionary_page_size_limit 1048576 (writing) Sets best effort maximum dictionary page size, in bytes
datafusion.execution.parquet.statistics_enabled page (writing) Sets if statistics are enabled for any column Valid values are: "none", "chunk", and "page" These values are not case sensitive. If NULL, uses default parquet writer setting
datafusion.execution.parquet.max_row_group_size 1048576 (writing) Target maximum number of rows in each row group (defaults to 1M rows). Writing larger row groups requires more memory to write, but can get better compression and be faster to read.
datafusion.execution.parquet.created_by datafusion version 53.1.0 (writing) Sets "created by" property
datafusion.execution.parquet.column_index_truncate_length 64 (writing) Sets column index truncate length
datafusion.execution.parquet.statistics_truncate_length 64 (writing) Sets statistics truncate length. If NULL, uses default parquet writer setting
datafusion.execution.parquet.data_page_row_count_limit 20000 (writing) Sets best effort maximum number of rows in data page
datafusion.execution.parquet.encoding NULL (writing) Sets default encoding for any column. Valid values are: plain, plain_dictionary, rle, bit_packed, delta_binary_packed, delta_length_byte_array, delta_byte_array, rle_dictionary, and byte_stream_split. These values are not case sensitive. If NULL, uses default parquet writer setting
datafusion.execution.parquet.bloom_filter_on_write false (writing) Write bloom filters for all columns when creating parquet files
datafusion.execution.parquet.bloom_filter_fpp NULL (writing) Sets bloom filter false positive probability. If NULL, uses default parquet writer setting
datafusion.execution.parquet.bloom_filter_ndv NULL (writing) Sets bloom filter number of distinct values. If NULL, uses default parquet writer setting
datafusion.execution.parquet.allow_single_file_parallelism true (writing) Controls whether DataFusion will attempt to speed up writing parquet files by serializing them in parallel. Each column in each row group in each output file are serialized in parallel leveraging a maximum possible core count of n_filesn_row_groupsn_columns.
datafusion.execution.parquet.maximum_parallel_row_group_writers 1 (writing) By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame.
datafusion.execution.parquet.maximum_buffered_record_batches_per_stream 2 (writing) By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame.
datafusion.execution.parquet.use_content_defined_chunking NULL (writing) EXPERIMENTAL: Enable content-defined chunking (CDC) when writing parquet files. When Some, CDC is enabled with the given options; when None (the default), CDC is disabled. When CDC is enabled, parallel writing is automatically disabled since the chunker state must persist across row groups.
datafusion.execution.planning_concurrency 0 Fan-out during initial physical planning. This is mostly use to plan UNION children in parallel. Defaults to the number of CPU cores on the system
datafusion.execution.skip_physical_aggregate_schema_check false When set to true, skips verifying that the schema produced by planning the input of LogicalPlan::Aggregate exactly matches the schema of the input plan. When set to false, if the schema does not match exactly (including nullability and metadata), a planning error will be raised. This is used to workaround bugs in the planner that are now caught by the new schema verification step.
datafusion.execution.spill_compression uncompressed Sets the compression codec used when spilling data to disk. Since datafusion writes spill files using the Arrow IPC Stream format, only codecs supported by the Arrow IPC Stream Writer are allowed. Valid values are: uncompressed, lz4_frame, zstd. Note: lz4_frame offers faster (de)compression, but typically results in larger spill files. In contrast, zstd achieves higher compression ratios at the cost of slower (de)compression speed.
datafusion.execution.sort_spill_reservation_bytes 10485760 Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no DiskManager configured).
datafusion.execution.sort_in_place_threshold_bytes 1048576 When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged.
datafusion.execution.sort_pushdown_buffer_capacity 1073741824 Maximum buffer capacity (in bytes) per partition for BufferExec inserted during sort pushdown optimization. When PushdownSort eliminates a SortExec under SortPreservingMergeExec, a BufferExec is inserted to replace SortExec's buffering role. This prevents I/O stalls by allowing the scan to run ahead of the merge. This uses strictly less memory than the SortExec it replaces (which buffers the entire partition). The buffer respects the global memory pool limit. Setting this to a large value is safe — actual memory usage is bounded by partition size and global memory limits.
datafusion.execution.max_spill_file_size_bytes 134217728 Maximum size in bytes for individual spill files before rotating to a new file. When operators spill data to disk (e.g., RepartitionExec), they write multiple batches to the same file until this size limit is reached, then rotate to a new file. This reduces syscall overhead compared to one-file-per-batch while preventing files from growing too large. A larger value reduces file creation overhead but may hold more disk space. A smaller value creates more files but allows finer-grained space reclamation as files can be deleted once fully consumed. Now only RepartitionExec supports this spill file rotation feature, other spilling operators may create spill files larger than the limit. Default: 128 MB
datafusion.execution.meta_fetch_concurrency 32 Number of files to read in parallel when inferring schema and statistics
datafusion.execution.minimum_parallel_output_files 4 Guarantees a minimum level of output files running in parallel. RecordBatches will be distributed in round robin fashion to each parallel writer. Each writer is closed and a new file opened once soft_max_rows_per_output_file is reached.
datafusion.execution.soft_max_rows_per_output_file 50000000 Target number of rows in output files when writing multiple. This is a soft max, so it can be exceeded slightly. There also will be one file smaller than the limit if the total number of rows written is not roughly divisible by the soft max
datafusion.execution.max_buffered_batches_per_output_file 2 This is the maximum number of RecordBatches buffered for each output file being worked. Higher values can potentially give faster write performance at the cost of higher peak memory consumption
datafusion.execution.listing_table_ignore_subdirectory true Should sub directories be ignored when scanning directories for data files. Defaults to true (ignores subdirectories), consistent with Hive. Note that this setting does not affect reading partitioned tables (e.g. /table/year=2021/month=01/data.parquet).
datafusion.execution.listing_table_factory_infer_partitions true Should a ListingTable created through the ListingTableFactory infer table partitions from Hive compliant directories. Defaults to true (partition columns are inferred and will be represented in the table schema).
datafusion.execution.enable_recursive_ctes true Should DataFusion support recursive CTEs
datafusion.execution.split_file_groups_by_statistics false Attempt to eliminate sorts by packing & sorting files with non-overlapping statistics into the same file groups. Currently experimental
datafusion.execution.keep_partition_by_columns false Should DataFusion keep the columns used for partition_by in the output RecordBatches
datafusion.execution.skip_partial_aggregation_probe_ratio_threshold 0.8 Aggregation ratio (number of distinct groups / number of input rows) threshold for skipping partial aggregation. If the value is greater then partial aggregation will skip aggregation for further input
datafusion.execution.skip_partial_aggregation_probe_rows_threshold 100000 Number of input rows partial aggregation partition should process, before aggregation ratio check and trying to switch to skipping aggregation mode
datafusion.execution.use_row_number_estimates_to_optimize_partitioning false Should DataFusion use row number estimates at the input to decide whether increasing parallelism is beneficial or not. By default, only exact row numbers (not estimates) are used for this decision. Setting this flag to true will likely produce better plans. if the source of statistics is accurate. We plan to make this the default in the future.
datafusion.execution.enforce_batch_size_in_joins false Should DataFusion enforce batch size in joins or not. By default, DataFusion will not enforce batch size in joins. Enforcing batch size in joins can reduce memory usage when joining large tables with a highly-selective join filter, but is also slightly slower.
datafusion.execution.objectstore_writer_buffer_size 10485760 Size (bytes) of data buffer DataFusion uses when writing output files. This affects the size of the data chunks that are uploaded to remote object stores (e.g. AWS S3). If very large (>= 100 GiB) output files are being written, it may be necessary to increase this size to avoid errors from the remote end point.
datafusion.execution.enable_ansi_mode false Whether to enable ANSI SQL mode. The flag is experimental and relevant only for DataFusion Spark built-in functions When enable_ansi_mode is set to true, the query engine follows ANSI SQL semantics for expressions, casting, and error handling. This means: - Strict type coercion rules: implicit casts between incompatible types are disallowed. - Standard SQL arithmetic behavior: operations such as division by zero, numeric overflow, or invalid casts raise runtime errors rather than returning NULL or adjusted values. - Consistent ANSI behavior for string concatenation, comparisons, and NULL handling. When enable_ansi_mode is false (the default), the engine uses a more permissive, non-ANSI mode designed for user convenience and backward compatibility. In this mode: - Implicit casts between types are allowed (e.g., string to integer when possible). - Arithmetic operations are more lenient — for example, abs() on the minimum representable integer value returns the input value instead of raising overflow. - Division by zero or invalid casts may return NULL instead of failing. # Default false — ANSI SQL mode is disabled by default.
datafusion.execution.hash_join_buffering_capacity 0 How many bytes to buffer in the probe side of hash joins while the build side is concurrently being built. Without this, hash joins will wait until the full materialization of the build side before polling the probe side. This is useful in scenarios where the query is not completely CPU bounded, allowing to do some early work concurrently and reducing the latency of the query. Note that when hash join buffering is enabled, the probe side will start eagerly polling data, not giving time for the producer side of dynamic filters to produce any meaningful predicate. Queries with dynamic filters might see performance degradation. Disabled by default, set to a number greater than 0 for enabling it.
datafusion.optimizer.enable_distinct_aggregation_soft_limit true When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read.
datafusion.optimizer.enable_round_robin_repartition true When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores
datafusion.optimizer.enable_topk_aggregation true When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible
datafusion.optimizer.enable_window_limits true When set to true, the optimizer will attempt to push limit operations past window functions, if possible
datafusion.optimizer.enable_window_topn false When set to true, the optimizer will replace Filter(rn<=K) → Window(ROW_NUMBER) → Sort patterns with a PartitionedTopKExec that maintains per-partition heaps, avoiding a full sort of the input. When the window partition key has low cardinality, enabling this optimization can improve performance. However, for high cardinality keys, it may cause regressions in both memory usage and runtime.
datafusion.optimizer.enable_topk_repartition true When set to true, the optimizer will push TopK (Sort with fetch) below hash repartition when the partition key is a prefix of the sort key, reducing data volume before the shuffle.
datafusion.optimizer.enable_topk_dynamic_filter_pushdown true When set to true, the optimizer will attempt to push down TopK dynamic filters into the file scan phase.
datafusion.optimizer.enable_join_dynamic_filter_pushdown true When set to true, the optimizer will attempt to push down Join dynamic filters into the file scan phase.
datafusion.optimizer.enable_aggregate_dynamic_filter_pushdown true When set to true, the optimizer will attempt to push down Aggregate dynamic filters into the file scan phase.
datafusion.optimizer.enable_dynamic_filter_pushdown true When set to true attempts to push down dynamic filters generated by operators (TopK, Join & Aggregate) into the file scan phase. For example, for a query such as SELECT * FROM t ORDER BY timestamp DESC LIMIT 10, the optimizer will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans. This means that if we already have 10 timestamps in the year 2025 any files that only have timestamps in the year 2024 can be skipped / pruned at various stages in the scan. The config will suppress enable_join_dynamic_filter_pushdown, enable_topk_dynamic_filter_pushdown & enable_aggregate_dynamic_filter_pushdown So if you disable enable_topk_dynamic_filter_pushdown, then enable enable_dynamic_filter_pushdown, the enable_topk_dynamic_filter_pushdown will be overridden.
datafusion.optimizer.filter_null_join_keys false When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down.
datafusion.optimizer.repartition_aggregations true Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel using the provided target_partitions level
datafusion.optimizer.repartition_file_min_size 1048576 Minimum total file size in bytes for file-group byte-range splitting to fire. Files (or merged file groups) smaller than this stay as one partition. Lower values produce more, smaller partitions — better at filling target_partitions worth of cores when files are modestly sized, at the cost of slightly more per-partition open / metadata-load overhead.
datafusion.optimizer.repartition_joins true Should DataFusion repartition data using the join keys to execute joins in parallel using the provided target_partitions level
datafusion.optimizer.allow_symmetric_joins_without_pruning true Should DataFusion allow symmetric hash joins for unbounded data sources even when its inputs do not have any ordering or filtering If the flag is not enabled, the SymmetricHashJoin operator will be unable to prune its internal buffers, resulting in certain join types - such as Full, Left, LeftAnti, LeftSemi, Right, RightAnti, and RightSemi - being produced only at the end of the execution. This is not typical in stream processing. Additionally, without proper design for long runner execution, all types of joins may encounter out-of-memory errors.
datafusion.optimizer.repartition_file_scans true When set to true, datasource partitions will be repartitioned to achieve maximum parallelism. This applies to both in-memory partitions and FileSource's file groups (1 group is 1 partition). For FileSources, only Parquet and CSV formats are currently supported. If set to true for a FileSource, all files will be repartitioned evenly (i.e., a single large file might be partitioned into smaller chunks) for parallel scanning. If set to false for a FileSource, different files will be read in parallel, but repartitioning won't happen within a single file. If set to true for an in-memory source, all memtable's partitions will have their batches repartitioned evenly to the desired number of target_partitions. Repartitioning can change the total number of partitions and batches per partition, but does not slice the initial record tables provided to the MemTable on creation.
datafusion.optimizer.preserve_file_partitions 0 Minimum number of distinct partition values required to group files by their Hive partition column values (enabling Hash partitioning declaration). How the option is used: - preserve_file_partitions=0: Disable it. - preserve_file_partitions=1: Always enable it. - preserve_file_partitions=N, actual file partitions=M: Only enable when M >= N. This threshold preserves I/O parallelism when file partitioning is below it. Note: This may reduce parallelism, rooting from the I/O level, if the number of distinct partitions is less than the target_partitions.
datafusion.optimizer.repartition_windows true Should DataFusion repartition data using the partitions keys to execute window functions in parallel using the provided target_partitions level
datafusion.optimizer.repartition_sorts true Should DataFusion execute sorts in a per-partition fashion and merge afterwards instead of coalescing first and sorting globally. With this flag is enabled, plans in the form below text "SortExec: [a@0 ASC]", " CoalescePartitionsExec", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", would turn into the plan below which performs better in multithreaded environments text "SortPreservingMergeExec: [a@0 ASC]", " SortExec: [a@0 ASC]", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
datafusion.optimizer.subset_repartition_threshold 4 Partition count threshold for subset satisfaction optimization. When the current partition count is >= this threshold, DataFusion will skip repartitioning if the required partitioning expression is a subset of the current partition expression such as Hash(a) satisfies Hash(a, b). When the current partition count is < this threshold, DataFusion will repartition to increase parallelism even when subset satisfaction applies. Set to 0 to always repartition (disable subset satisfaction optimization). Set to a high value to always use subset satisfaction. Example (subset_repartition_threshold = 4): text Hash([a]) satisfies Hash([a, b]) because (Hash([a, b]) is subset of Hash([a]) If current partitions (3) < threshold (4), repartition: AggregateExec: mode=FinalPartitioned, gby=[a, b], aggr=[SUM(x)] RepartitionExec: partitioning=Hash([a, b], 8), input_partitions=3 AggregateExec: mode=Partial, gby=[a, b], aggr=[SUM(x)] DataSourceExec: file_groups={...}, output_partitioning=Hash([a], 3) If current partitions (8) >= threshold (4), use subset satisfaction: AggregateExec: mode=SinglePartitioned, gby=[a, b], aggr=[SUM(x)] DataSourceExec: file_groups={...}, output_partitioning=Hash([a], 8)
datafusion.optimizer.prefer_existing_sort false When true, DataFusion will opportunistically remove sorts when the data is already sorted, (i.e. setting preserve_order to true on RepartitionExec and using SortPreservingMergeExec) When false, DataFusion will maximize plan parallelism using RepartitionExec even if this requires subsequently resorting data using a SortExec.
datafusion.optimizer.skip_failed_rules false When set to true, the logical plan optimizer will produce warning messages if any optimization rules produce errors and then proceed to the next rule. When set to false, any rules that produce errors will cause the query to fail
datafusion.optimizer.max_passes 3 Number of times that the optimizer will attempt to optimize the plan
datafusion.optimizer.top_down_join_key_reordering true When set to true, the physical plan optimizer will run a top down process to reorder the join keys
datafusion.optimizer.join_reordering true When set to true, the physical plan optimizer may swap join inputs based on statistics. When set to false, statistics-driven join input reordering is disabled and the original join order in the query is used.
datafusion.optimizer.use_statistics_registry false When set to true, the physical plan optimizer uses the pluggable StatisticsRegistry for statistics propagation across operators. This enables more accurate cardinality estimates compared to each operator's built-in partition_statistics.
datafusion.optimizer.prefer_hash_join true When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. HashJoin can work more efficiently than SortMergeJoin but consumes more memory
datafusion.optimizer.enable_piecewise_merge_join false When set to true, piecewise merge join is enabled. PiecewiseMergeJoin is currently experimental. Physical planner will opt for PiecewiseMergeJoin when there is only one range filter.
datafusion.optimizer.hash_join_single_partition_threshold 1048576 The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition
datafusion.optimizer.hash_join_single_partition_threshold_rows 131072 The maximum estimated size in rows for one input side of a HashJoin will be collected into a single partition
datafusion.optimizer.hash_join_inlist_pushdown_max_size 131072 Maximum size in bytes for the build side of a hash join to be pushed down as an InList expression for dynamic filtering. Build sides larger than this will use hash table lookups instead. Set to 0 to always use hash table lookups. InList pushdown can be more efficient for small build sides because it can result in better statistics pruning as well as use any bloom filters present on the scan side. InList expressions are also more transparent and easier to serialize over the network in distributed uses of DataFusion. On the other hand InList pushdown requires making a copy of the data and thus adds some overhead to the build side and uses more memory. This setting is per-partition, so we may end up using hash_join_inlist_pushdown_max_size * target_partitions memory. The default is 128kB per partition. This should allow point lookup joins (e.g. joining on a unique primary key) to use InList pushdown in most cases but avoids excessive memory usage or overhead for larger joins.
datafusion.optimizer.hash_join_inlist_pushdown_max_distinct_values 150 Maximum number of distinct values (rows) in the build side of a hash join to be pushed down as an InList expression for dynamic filtering. Build sides with more rows than this will use hash table lookups instead. Set to 0 to always use hash table lookups. This provides an additional limit beyond hash_join_inlist_pushdown_max_size to prevent very large IN lists that might not provide much benefit over hash table lookups. This uses the deduplicated row count once the build side has been evaluated. The default is 150 values per partition. This is inspired by Trino's max-filter-keys-per-column setting. See: https://trino.io/docs/current/admin/dynamic-filtering.html#dynamic-filter-collection-thresholds
datafusion.optimizer.default_filter_selectivity 20 The default filter selectivity used by Filter Statistics when an exact selectivity cannot be determined. Valid values are between 0 (no selectivity) and 100 (all rows are selected).
datafusion.optimizer.prefer_existing_union false When set to true, the optimizer will not attempt to convert Union to Interleave
datafusion.optimizer.expand_views_at_output false When set to true, if the returned type is a view type then the output will be coerced to a non-view. Coerces Utf8View to LargeUtf8, and BinaryView to LargeBinary.
datafusion.optimizer.enable_sort_pushdown true Enable sort pushdown optimization. When enabled, attempts to push sort requirements down to data sources that can natively handle them (e.g., by reversing file/row group read order). Returns inexact ordering: Sort operator is kept for correctness, but optimized input enables early termination for TopK queries (ORDER BY ... LIMIT N), providing significant speedup. Memory: No additional overhead (only changes read order). Future: Will add option to detect perfectly sorted data and eliminate Sort completely. Default: true
datafusion.optimizer.enable_leaf_expression_pushdown true When set to true, the optimizer will extract leaf expressions (such as get_field) from filter/sort/join nodes into projections closer to the leaf table scans, and push those projections down towards the leaf nodes.
datafusion.optimizer.enable_unions_to_filter false When set to true, the logical optimizer will rewrite UNION DISTINCT branches that read from the same source and differ only by filter predicates into a single branch with a combined filter. This optimization is conservative and only applies when the branches share the same source and compatible wrapper nodes such as identical projections or aliases.
datafusion.explain.logical_plan_only false When set to true, the explain statement will only print logical plans
datafusion.explain.physical_plan_only false When set to true, the explain statement will only print physical plans
datafusion.explain.show_statistics false When set to true, the explain statement will print operator statistics for physical plans
datafusion.explain.show_sizes true When set to true, the explain statement will print the partition sizes
datafusion.explain.show_schema false When set to true, the explain statement will print schema information
datafusion.explain.format indent Display format of explain. Default is "indent". When set to "tree", it will print the plan in a tree-rendered format.
datafusion.explain.tree_maximum_render_width 240 (format=tree only) Maximum total width of the rendered tree. When set to 0, the tree will have no width limit.
datafusion.explain.analyze_level dev Verbosity level for "EXPLAIN ANALYZE". Default is "dev" "summary" shows common metrics for high-level insights. "dev" provides deep operator-level introspection for developers.
datafusion.explain.analyze_categories all Which metric categories to include in "EXPLAIN ANALYZE" output. Comma-separated list of: "rows", "bytes", "timing", "uncategorized". Use "none" to show plan structure only, or "all" (default) to show everything. Metrics without a declared category are treated as "uncategorized".
datafusion.sql_parser.parse_float_as_decimal false When set to true, SQL parser will parse float as decimal type
datafusion.sql_parser.enable_ident_normalization true When set to true, SQL parser will normalize ident (convert ident to lowercase when not quoted)
datafusion.sql_parser.enable_options_value_normalization false When set to true, SQL parser will normalize options value (convert value to lowercase). Note that this option is ignored and will be removed in the future. All case-insensitive values are normalized automatically.
datafusion.sql_parser.dialect generic Configure the SQL dialect used by DataFusion's parser; supported values include: Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, Ansi, DuckDB and Databricks.
datafusion.sql_parser.support_varchar_with_length true If true, permit lengths for VARCHAR such as VARCHAR(20), but ignore the length. If false, error if a VARCHAR with a length is specified. The Arrow type system does not have a notion of maximum string length and thus DataFusion can not enforce such limits.
datafusion.sql_parser.map_string_types_to_utf8view true If true, string types (VARCHAR, CHAR, Text, and String) are mapped to Utf8View during SQL planning. If false, they are mapped to Utf8. Default is true.
datafusion.sql_parser.collect_spans false When set to true, the source locations relative to the original SQL query (i.e. Span) will be collected and recorded in the logical plan nodes.
datafusion.sql_parser.recursion_limit 50 Specifies the recursion depth limit when parsing complex SQL Queries
datafusion.sql_parser.default_null_ordering nulls_max Specifies the default null ordering for query results. There are 4 options: - nulls_max: Nulls appear last in ascending order. - nulls_min: Nulls appear first in ascending order. - nulls_first: Nulls always be first in any order. - nulls_last: Nulls always be last in any order. By default, nulls_max is used to follow Postgres's behavior. postgres rule: https://www.postgresql.org/docs/current/queries-order.html
datafusion.sql_parser.enable_subquery_sort_elimination true When set to true, DataFusion may remove ORDER BY clauses from subqueries or CTEs during SQL planning when their ordering cannot affect the result, such as when no LIMIT or other order-sensitive operator depends on them. Disable this option to preserve explicit subquery ordering in the planned query.
datafusion.format.safe true If set to true any formatting errors will be written to the output instead of being converted into a [std::fmt::Error]
datafusion.format.null Format string for nulls
datafusion.format.date_format %Y-%m-%d Date format for date arrays
datafusion.format.datetime_format %Y-%m-%dT%H:%M:%S%.f Format for DateTime arrays
datafusion.format.timestamp_format %Y-%m-%dT%H:%M:%S%.f Timestamp format for timestamp arrays
datafusion.format.timestamp_tz_format NULL Timestamp format for timestamp with timezone arrays. When None, ISO 8601 format is used.
datafusion.format.time_format %H:%M:%S%.f Time format for time arrays
datafusion.format.duration_format pretty Duration format. Can be either "pretty" or "ISO8601"
datafusion.format.types_info false Show types in visual representation batches

You can also reset configuration options to default settings via SQL using the RESET command. For example, to set and reset datafusion.execution.batch_size:

SET datafusion.execution.batch_size = '10000';

SHOW datafusion.execution.batch_size;
datafusion.execution.batch_size 10000

RESET datafusion.execution.batch_size;

SHOW datafusion.execution.batch_size;
datafusion.execution.batch_size 8192

Runtime Configuration Settings

DataFusion runtime configurations can be set via SQL using the SET command.

For example, to configure datafusion.runtime.memory_limit:

SET datafusion.runtime.memory_limit = '2G';

The following runtime configuration settings are available:

key default description
datafusion.runtime.file_statistics_cache_limit 20M Maximum memory to use for file statistics cache. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes) or '0' for 0. Example: '2G' for 2 gigabytes.
datafusion.runtime.list_files_cache_limit 1M Maximum memory to use for list files cache. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes) or '0' for 0. Example: '2G' for 2 gigabytes.
datafusion.runtime.list_files_cache_ttl NULL TTL (time-to-live) of the entries in the list file cache. Supports units m (minutes), and s (seconds). Example: '2m' for 2 minutes.
datafusion.runtime.max_temp_directory_size 100G Maximum temporary file directory size. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes) or '0' for 0. Example: '2G' for 2 gigabytes.
datafusion.runtime.memory_limit NULL Maximum memory limit for query execution. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes) or '0' for 0. Example: '2G' for 2 gigabytes.
datafusion.runtime.metadata_cache_limit 50M Maximum memory to use for file metadata cache such as Parquet metadata. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes) or '0' for 0. Example: '2G' for 2 gigabytes.
datafusion.runtime.temp_directory NULL The path to the temporary file directory.

Tuning Guide

Short Queries

By default DataFusion will attempt to maximize parallelism and use all cores -- For example, if you have 32 cores, each plan will split the data into 32 partitions. However, if your data is small, the overhead of splitting the data to enable parallelization can dominate the actual computation.

You can find out how many cores are being used via the EXPLAIN command and look at the number of partitions in the plan.

The datafusion.optimizer.repartition_file_min_size option controls the minimum file size the ListingTable provider will attempt to repartition. However, this does not apply to user defined data sources and only works when DataFusion has accurate statistics.

If you know your data is small, you can set the datafusion.execution.target_partitions option to a smaller number to reduce the overhead of repartitioning. For very small datasets (e.g. less than 1MB), we recommend setting target_partitions to 1 to avoid repartitioning altogether.

SET datafusion.execution.target_partitions = '1';

Memory-limited Queries

When executing a memory-consuming query under a tight memory limit, DataFusion will spill intermediate results to disk.

When the FairSpillPool is used, memory is divided evenly among partitions. The higher the value of datafusion.execution.target_partitions, the less memory is allocated to each partition, and the out-of-core execution path may trigger more frequently, possibly slowing down execution.

Additionally, while spilling, data is read back in datafusion.execution.batch_size size batches. The larger this value, the fewer spilled sorted runs can be merged. Decreasing this setting can help reduce the number of subsequent spills required.

In conclusion, for queries under a very tight memory limit, it's recommended to set target_partitions and batch_size to smaller values.

-- Query still gets parallelized, but each partition will have more memory to use
SET datafusion.execution.target_partitions = 4;
-- Smaller than the default '8192', while still keep the benefit of vectorized execution
SET datafusion.execution.batch_size = 1024;

Join Queries

Currently Apache Datafusion supports the following join algorithms:

  • Nested Loop Join
  • Sort Merge Join
  • Hash Join
  • Symmetric Hash Join
  • Piecewise Merge Join (experimental)

The physical planner will choose the appropriate algorithm based on the statistics + join condition of the two tables.

Join Algorithm Optimizer Configurations

You can modify join optimization behavior in your queries by setting specific configuration values. Use the following command to update a configuration:

SET datafusion.optimizer.<configuration_name>;

Example

SET datafusion.optimizer.prefer_hash_join = false;

Adjusting the following configuration values influences how the optimizer selects the join algorithm used to execute your SQL query:

Join Optimizer Configurations

Adjusting the following configuration values influences how the optimizer selects the join algorithm used to execute your SQL query.

allow_symmetric_joins_without_pruning (bool, default = true)

Controls whether symmetric hash joins are allowed for unbounded data sources even when their inputs lack ordering or filtering.

  • If disabled, the SymmetricHashJoin operator cannot prune its internal buffers to be produced only at the end of execution.

prefer_hash_join (bool, default = true)

Determines whether the optimizer prefers Hash Join over Sort Merge Join during physical plan selection.

  • true: favors HashJoin for faster execution when sufficient memory is available.
  • false: allows SortMergeJoin to be chosen when more memory-efficient execution is needed.

enable_piecewise_merge_join (bool, default = false)

Enables the experimental Piecewise Merge Join algorithm.

  • When enabled, the physical planner may select PiecewiseMergeJoin if there is exactly one range filter in the join condition.
  • Piecewise Merge Join is faster than Nested Loop Join performance wise for single range filter except for cases where it is joining two large tables (num_rows > 100,000) that are approximately equal in size.