84 changes: 83 additions & 1 deletion docs/configuration/index-config.md
@@ -594,7 +594,9 @@ This section describes indexing settings for a given index.
| ------------- | ------------- | ------------- |
| `commit_timeout_secs` | Maximum number of seconds before committing a split since its creation. | `60` |
| `split_num_docs_target` | Target number of docs per split. | `10000000` |
| `merge_policy` | Describes the strategy used to trigger split merge operations (see [Merge policies](#merge-policies) section below). |
| `merge_policy` | Describes the strategy used to trigger split merge operations for logs/traces (see [Merge policies](#merge-policies) section below). |
| `parquet_merge_policy` | Describes the merge policy for Parquet (metrics/sketches) splits (see [Parquet merge policy](#parquet-merge-policy) section below). |
| `parquet_indexing` | Parquet-specific indexing settings: sort schema, window duration (see [Parquet indexing settings](#parquet-indexing-settings) section below). |
| `resources.heap_size` | Indexer heap size per source per index. | `2000000000` |
| `docstore_compression_level` | Level of compression used by zstd for the docstore. Lower values may increase ingest speed, at the cost of index size | `8` |
| `docstore_blocksize` | Size of blocks in the docstore, in bytes. Lower values may improve doc retrieval speed, at the cost of index size | `1000000` |
@@ -687,6 +689,86 @@ indexing_settings:
type: "no_merge"
```

### Parquet indexing settings

*For indexes using the Parquet indexing pipeline (metrics, sketches).*

These settings control how the Parquet pipeline sorts, windows, and writes incoming data. They affect both ingest-time performance and downstream query/compaction efficiency.

```yaml
version: 0.7
index_id: "my-metrics-index"
# ...
indexing_settings:
parquet_indexing:
sort_fields: "metric_name|service|env|host|timeseries_id|timestamp_secs/V2"
window_duration_secs: 900
```

| Variable | Description | Default value |
| ------------- | ------------- | ------------- |
| `sort_fields` | Sort schema for row ordering in Parquet files (see syntax below). When omitted, the product-type default is used. | `metric_name\|service\|env\|datacenter\|region\|host\|timeseries_id\|timestamp_secs/V2` |
| `window_duration_secs` | Time window duration in seconds for split partitioning. Must evenly divide 3600. Larger values = fewer splits but coarser time pruning. | `900` (15 minutes) |
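
To make the windowing rule concrete, the window a record lands in can be computed by truncating its timestamp to the window boundary. This is an illustrative sketch only; `window_start_secs` is a hypothetical helper, not part of Quickwit's API:

```rust
/// Truncate a Unix timestamp (in seconds) to the start of its time window.
/// Hypothetical helper for illustration; Quickwit's internals may differ.
fn window_start_secs(timestamp_secs: u64, window_duration_secs: u64) -> u64 {
    timestamp_secs - timestamp_secs % window_duration_secs
}

fn main() {
    // With the default 900 s (15 min) windows, every timestamp maps onto a
    // boundary that is a multiple of 900.
    let ts = 1_700_000_850;
    let start = window_start_secs(ts, 900);
    assert_eq!(start % 900, 0);
    assert!(ts - start < 900);
    println!("window start: {start}");
}
```

Because windows must evenly divide 3600, every hour contains a whole number of windows, so window boundaries align across days.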

#### Sort schema syntax

The sort schema uses pipe-delimited column names with a `/V2` version suffix:

```text
column1|column2|...|timestamp_secs/V2
```

**Column types** are inferred from name suffixes:
- `__s` → string (e.g., `custom_tag__s`)
- `__i` → int64 (e.g., `priority__i`)
- Well-known names like `metric_name`, `service`, `env`, `host`, `timestamp_secs`, and `timeseries_id` have built-in type mappings and don't need suffixes.

**Sort direction** defaults to ascending for most columns and descending for timestamp columns. Override with `+` (ascending) or `-` (descending) as a prefix or suffix on the column name:

```text
# Explicit descending timestamp
metric_name|host|-timestamp_secs/V2

# Ascending host (default), descending timestamp (default)
metric_name|host|timestamp_secs/V2
```
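
To make the syntax above concrete, here is a minimal, hypothetical parser sketch for the pipe-delimited schema. It only handles the `-`/`+` prefix form and assumes timestamp columns default to descending; the real parser in Quickwit may differ:

```rust
#[derive(Debug, PartialEq)]
struct SortColumn {
    name: String,
    descending: bool,
}

/// Parse a `col1|col2|...|timestamp_secs/V2` sort schema string.
/// Minimal sketch: strips the `/V2` suffix, splits on `|`, and reads an
/// optional `+`/`-` direction prefix (the suffix form is not handled here).
fn parse_sort_schema(schema: &str) -> Option<Vec<SortColumn>> {
    let body = schema.strip_suffix("/V2")?;
    let columns = body
        .split('|')
        .map(|raw| {
            let (descending, name) = match raw.strip_prefix('-') {
                Some(rest) => (true, rest),
                None => {
                    let name = raw.strip_prefix('+').unwrap_or(raw);
                    // Assumed convention: timestamps sort descending by default.
                    (name.ends_with("timestamp_secs"), name)
                }
            };
            SortColumn { name: name.to_string(), descending }
        })
        .collect();
    Some(columns)
}

fn main() {
    let cols = parse_sort_schema("metric_name|host|-timestamp_secs/V2").unwrap();
    assert_eq!(cols.len(), 3);
    assert_eq!(cols[0].name, "metric_name");
    assert!(!cols[0].descending);
    assert!(cols[2].descending);
}
```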

**How the sort schema affects behavior:**
- **Query pruning**: queries filtering on leading columns (e.g., `metric_name`) can skip entire splits whose row key ranges don't match.
- **Compression**: grouping similar values together (e.g., all rows for the same metric name) improves columnar compression ratios.
- **Compaction scope**: splits with different sort schemas are never merged together. Changing the sort schema on an existing index creates a new compaction scope — old splits are not re-sorted.

**The `&` marker** (advanced) sets the LSM comparison cutoff: columns after `&` are used for sort order but not for compaction locality decisions. For example, `metric_name|&host|timestamp_secs/V2` sorts by metric_name then host, but only metric_name determines which splits can be merged.

#### Parquet merge policy

*For indexes using the Parquet indexing pipeline (metrics, sketches).*

The Parquet merge policy controls how Parquet splits within a compaction scope (same time window, partition, and sort schema) are merged. It uses a constant write amplification strategy: splits at the same merge level are greedily accumulated until reaching `max_merge_factor` or `target_split_size_bytes`.

```yaml
version: 0.7
index_id: "my-metrics-index"
# ...
indexing_settings:
parquet_merge_policy:
merge_factor: 10
max_merge_factor: 12
max_merge_ops: 4
target_split_size_bytes: 268435456
maturation_period: 48h
max_finalize_merge_operations: 3
```


| Variable | Description | Default value |
| ------------- | ------------- | ------------- |
| `merge_factor` | Minimum number of splits to trigger a merge. | `10` |
| `max_merge_factor` | Maximum number of splits in a single merge operation. | `12` |
| `max_merge_ops` | Maximum number of merges a split can undergo before becoming mature. Bounds total write amplification. | `4` |
| `target_split_size_bytes` | Target size for merged output splits in bytes. Merges trigger when accumulated bytes reach this threshold, even if `merge_factor` is not reached. | `268435456` (256 MiB) |
| `maturation_period` | Duration after creation when a split becomes mature (never merged again). | `48h` |
| `max_finalize_merge_operations` | *(advanced)* Maximum number of merge operations emitted during cold-window finalization at pipeline shutdown. Set to `0` to disable. | `3` |
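
The greedy accumulation described above can be sketched as follows. This is a simplified illustration; split selection in Quickwit also accounts for merge levels, maturity, and time windows, which this sketch omits:

```rust
/// Greedily group split sizes (in bytes) at one merge level into merge
/// operations, closing a group once it holds `max_merge_factor` splits or
/// reaches `target_split_size_bytes`. A trailing group smaller than
/// `merge_factor` splits is left unmerged.
fn plan_merges(
    split_sizes: &[u64],
    merge_factor: usize,
    max_merge_factor: usize,
    target_split_size_bytes: u64,
) -> Vec<Vec<u64>> {
    let mut operations = Vec::new();
    let mut current: Vec<u64> = Vec::new();
    let mut current_bytes = 0u64;
    for &size in split_sizes {
        current.push(size);
        current_bytes += size;
        if current.len() >= max_merge_factor || current_bytes >= target_split_size_bytes {
            operations.push(std::mem::take(&mut current));
            current_bytes = 0;
        }
    }
    // The leftover group only merges if it has at least `merge_factor` splits.
    if current.len() >= merge_factor {
        operations.push(current);
    }
    operations
}

fn main() {
    // Twelve 30 MiB splits, merge_factor=10, max_merge_factor=12, target 256 MiB:
    // the first nine splits reach 270 MiB >= 256 MiB and merge; the remaining
    // three are below merge_factor and stay unmerged.
    let sizes = vec![30 * 1024 * 1024; 12];
    let ops = plan_merges(&sizes, 10, 12, 256 * 1024 * 1024);
    assert_eq!(ops.len(), 1);
    assert_eq!(ops[0].len(), 9);
}
```

Note how `target_split_size_bytes` can close a group before `merge_factor` splits accumulate, which is the byte-threshold behavior described in the table above.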


### Indexer memory usage
1 change: 1 addition & 0 deletions quickwit/Cargo.lock


85 changes: 84 additions & 1 deletion quickwit/quickwit-config/src/index_config/mod.rs
@@ -36,7 +36,7 @@ use siphasher::sip::SipHasher;
use tracing::warn;

use crate::index_config::serialize::VersionedIndexConfig;
use crate::merge_policy_config::MergePolicyConfig;
use crate::merge_policy_config::{MergePolicyConfig, ParquetMergePolicyConfig};

#[derive(Clone, Debug, Serialize, Deserialize, utoipa::ToSchema)]
#[serde(deny_unknown_fields)]
@@ -118,10 +118,91 @@ pub struct IndexingSettings {
pub split_num_docs_target: usize,
#[serde(default)]
pub merge_policy: MergePolicyConfig,
/// Merge policy for Parquet (metrics/sketches) splits. Controls how
/// Parquet splits are compacted within time windows. Only used by
/// indexes that use the Parquet indexing pipeline.
#[serde(default)]
pub parquet_merge_policy: ParquetMergePolicyConfig,
/// Parquet-specific indexing settings (sort schema, window duration,
/// compression). Only used by indexes that use the Parquet pipeline.
#[serde(default)]
pub parquet_indexing: ParquetIndexingConfig,
#[serde(default)]
pub resources: IndexingResources,
}

/// Configuration for the Parquet indexing pipeline (metrics, sketches).
///
/// Controls how incoming data is sorted, windowed, and compressed before
/// writing to Parquet split files. These settings affect both ingest-time
/// performance and downstream query/compaction efficiency.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Hash, utoipa::ToSchema)]
#[serde(deny_unknown_fields)]
pub struct ParquetIndexingConfig {
/// Sort schema defining the physical sort order of rows in Parquet files.
///
/// Uses Husky-style pipe-delimited syntax with a `/V2` version suffix.
/// Each column is sorted ascending by default; use `+` or `-` prefix/suffix
/// to override. Column types are inferred from well-known suffixes
/// (`__s` = string, `__i` = int64, `_secs` = uint64 timestamp).
///
/// The sort order determines:
/// - **Query pruning**: queries that filter on leading sort columns can
/// skip entire splits whose row key ranges don't match.
/// - **Compression**: columns with good locality (e.g., metric_name first)
/// compress better in Parquet's columnar format.
/// - **Compaction scope**: splits with different sort schemas are never
/// merged together.
///
/// When `None`, the product-type default is used (see below).
///
/// # Default (metrics/sketches)
/// ```text
/// metric_name|service|env|datacenter|region|host|timeseries_id|timestamp_secs/V2
/// ```
///
/// # Examples
/// ```text
/// # Minimal: just metric name and timestamp
/// metric_name|timestamp_secs/V2
///
/// # Custom tags in sort order
/// metric_name|service|cluster|host|timestamp_secs/V2
///
/// # Explicit descending timestamp
/// metric_name|host|-timestamp_secs/V2
/// ```
#[serde(default, skip_serializing_if = "Option::is_none")]
pub sort_fields: Option<String>,

/// Time window duration in seconds for split partitioning.
///
/// Incoming data is partitioned into time windows of this duration.
/// Splits within the same window may be compacted together; splits in
/// different windows are never merged. Must evenly divide 3600 (one hour).
///
/// Larger values produce fewer, larger splits (better for bulk queries)
/// but coarser time-based pruning. Smaller values give finer pruning
/// but more splits to manage.
#[serde(default = "ParquetIndexingConfig::default_window_duration_secs")]
pub window_duration_secs: u32,
}

impl ParquetIndexingConfig {
fn default_window_duration_secs() -> u32 {
900
}
}

impl Default for ParquetIndexingConfig {
fn default() -> Self {
Self {
sort_fields: None,
window_duration_secs: Self::default_window_duration_secs(),
}
}
}

impl IndexingSettings {
pub fn commit_timeout(&self) -> Duration {
Duration::from_secs(self.commit_timeout_secs as u64)
@@ -160,6 +241,8 @@ impl Default for IndexingSettings {
docstore_compression_level: Self::default_docstore_compression_level(),
split_num_docs_target: Self::default_split_num_docs_target(),
merge_policy: MergePolicyConfig::default(),
parquet_merge_policy: ParquetMergePolicyConfig::default(),
parquet_indexing: ParquetIndexingConfig::default(),
resources: IndexingResources::default(),
}
}
15 changes: 10 additions & 5 deletions quickwit/quickwit-config/src/lib.rs
@@ -45,9 +45,9 @@ pub use cluster_config::ClusterConfig;
// See #2048
use index_config::serialize::{IndexConfigV0_8, VersionedIndexConfig};
pub use index_config::{
IndexConfig, IndexingResources, IndexingSettings, IngestSettings, RetentionPolicy,
SearchSettings, build_doc_mapper, load_index_config_from_user_config, load_index_config_update,
prepare_doc_mapping_update,
IndexConfig, IndexingResources, IndexingSettings, IngestSettings, ParquetIndexingConfig,
RetentionPolicy, SearchSettings, build_doc_mapper, load_index_config_from_user_config,
load_index_config_update, prepare_doc_mapping_update,
};
pub use quickwit_doc_mapper::DocMapping;
use serde::Serialize;
@@ -67,7 +67,8 @@ use tracing::warn;
use crate::index_template::IndexTemplateV0_8;
pub use crate::index_template::{IndexTemplate, IndexTemplateId, VersionedIndexTemplate};
use crate::merge_policy_config::{
ConstWriteAmplificationMergePolicyConfig, MergePolicyConfig, StableLogMergePolicyConfig,
ConstWriteAmplificationMergePolicyConfig, MergePolicyConfig, ParquetMergePolicyConfig,
StableLogMergePolicyConfig,
};
pub use crate::metastore_config::{
MetastoreBackend, MetastoreConfig, MetastoreConfigs, PostgresMetastoreConfig,
@@ -113,6 +114,8 @@ pub fn disable_ingest_v1() -> bool {
KafkaSourceParams,
KinesisSourceParams,
MergePolicyConfig,
ParquetIndexingConfig,
ParquetMergePolicyConfig,
PubSubSourceParams,
PulsarSourceAuth,
PulsarSourceParams,
@@ -227,7 +230,9 @@ impl ConfigFormat {
}

pub fn parse<T>(&self, payload: &[u8]) -> anyhow::Result<T>
where T: DeserializeOwned {
where
T: DeserializeOwned,
{
match self {
ConfigFormat::Json => {
let mut json_value: JsonValue =
72 changes: 70 additions & 2 deletions quickwit/quickwit-config/src/merge_policy_config.rs
@@ -119,8 +119,74 @@ impl Default for StableLogMergePolicyConfig {
}
}

// --- Parquet merge policy config ---

fn default_target_split_size_bytes() -> u64 {
256 * 1024 * 1024 // 256 MiB
}

fn default_max_finalize_merge_operations() -> usize {
3
}

/// Configuration for the Parquet (metrics/sketches) merge policy.
///
/// Controls how Parquet splits within a compaction scope are merged.
/// Splits at the same `num_merge_ops` level are greedily accumulated
/// until reaching `max_merge_factor` or `target_split_size_bytes`.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Hash, utoipa::ToSchema)]
#[serde(deny_unknown_fields)]
pub struct ParquetMergePolicyConfig {
/// Minimum number of splits to trigger a merge.
#[serde(default = "default_merge_factor")]
pub merge_factor: usize,
/// Maximum number of splits in a single merge operation.
#[serde(default = "default_max_merge_factor")]
pub max_merge_factor: usize,
/// Maximum number of merges a split can undergo before becoming mature.
/// Bounds total write amplification.
#[serde(default = "default_parquet_max_merge_ops")]
pub max_merge_ops: u32,
/// Target size for merged output splits in bytes. Merges are triggered
/// when accumulated bytes reach this threshold, even if `merge_factor`
/// is not reached.
#[serde(default = "default_target_split_size_bytes")]
pub target_split_size_bytes: u64,
/// Duration after creation when a split becomes mature regardless of
/// size or merge count. Mature splits are never merged.
#[schema(value_type = String)]
#[serde(default = "default_maturation_period")]
#[serde(deserialize_with = "parse_human_duration")]
#[serde(serialize_with = "serialize_duration")]
pub maturation_period: Duration,
/// Maximum number of merge operations emitted during cold-window
/// finalization at shutdown. Set to 0 to disable.
#[serde(default = "default_max_finalize_merge_operations")]
#[serde(skip_serializing_if = "is_zero")]
pub max_finalize_merge_operations: usize,
}

fn default_parquet_max_merge_ops() -> u32 {
4
}

impl Default for ParquetMergePolicyConfig {
fn default() -> Self {
Self {
merge_factor: default_merge_factor(),
max_merge_factor: default_max_merge_factor(),
max_merge_ops: default_parquet_max_merge_ops(),
target_split_size_bytes: default_target_split_size_bytes(),
maturation_period: default_maturation_period(),
max_finalize_merge_operations: default_max_finalize_merge_operations(),
}
}
}

fn parse_human_duration<'de, D>(deserializer: D) -> Result<Duration, D::Error>
where D: Deserializer<'de> {
where
D: Deserializer<'de>,
{
let value: String = Deserialize::deserialize(deserializer)?;
let duration = humantime::parse_duration(&value).map_err(|error| {
de::Error::custom(format!(
@@ -131,7 +197,9 @@ where S: Serializer {
}

fn serialize_duration<S>(value: &Duration, s: S) -> Result<S::Ok, S::Error>
where S: Serializer {
where
S: Serializer,
{
let value_str = humantime::format_duration(*value).to_string();
s.serialize_str(&value_str)
}