Skip to content

Commit 89f7afd

Browse files
g-talbot and claude committed
feat(31): PostgreSQL migration 27 + compaction columns in stage/list/publish
Add compaction metadata to the PostgreSQL metastore:

Migration 27:
- 6 new columns: window_start, window_duration_secs, sort_fields, num_merge_ops, row_keys, zonemap_regexes
- Partial index idx_metrics_splits_compaction_scope on (index_uid, sort_fields, window_start) WHERE split_state = 'Published'

stage_metrics_splits:
- INSERT extended from 15 to 21 bind parameters for compaction columns
- ON CONFLICT SET updates all compaction columns

list_metrics_splits:
- PgMetricsSplit construction includes compaction fields (defaults from JSON)

Also fixes pre-existing compilation errors on upstream-10b-parquet-actors:
- Missing StageMetricsSplitsRequestExt import
- index_id vs index_uid type mismatches in publish/mark/delete
- IndexUid binding (to_string() for sqlx)
- ListMetricsSplitsResponseExt trait disambiguation

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent ed6d687 commit 89f7afd

3 files changed

Lines changed: 84 additions & 7 deletions

File tree

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
-- Reverse Phase 31: Remove compaction metadata columns.
2+
DROP INDEX IF EXISTS idx_metrics_splits_compaction_scope;
3+
ALTER TABLE metrics_splits DROP COLUMN IF EXISTS zonemap_regexes;
4+
ALTER TABLE metrics_splits DROP COLUMN IF EXISTS row_keys;
5+
ALTER TABLE metrics_splits DROP COLUMN IF EXISTS num_merge_ops;
6+
ALTER TABLE metrics_splits DROP COLUMN IF EXISTS sort_fields;
7+
ALTER TABLE metrics_splits DROP COLUMN IF EXISTS window_duration_secs;
8+
ALTER TABLE metrics_splits DROP COLUMN IF EXISTS window_start;
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
-- Phase 31: Add compaction metadata columns to metrics_splits.
2+
-- These columns support time-windowed compaction planning and execution.
3+
ALTER TABLE metrics_splits ADD COLUMN IF NOT EXISTS window_start BIGINT;
4+
ALTER TABLE metrics_splits ADD COLUMN IF NOT EXISTS window_duration_secs INTEGER;
5+
ALTER TABLE metrics_splits ADD COLUMN IF NOT EXISTS sort_fields TEXT NOT NULL DEFAULT '';
6+
ALTER TABLE metrics_splits ADD COLUMN IF NOT EXISTS num_merge_ops INTEGER NOT NULL DEFAULT 0;
7+
ALTER TABLE metrics_splits ADD COLUMN IF NOT EXISTS row_keys BYTEA;
8+
ALTER TABLE metrics_splits ADD COLUMN IF NOT EXISTS zonemap_regexes JSONB NOT NULL DEFAULT '{}';
9+
10+
-- Compaction scope index: supports the compaction planner's primary query pattern
11+
-- "give me all Published splits for a given (index_uid, sort_fields, window_start) triple."
12+
CREATE INDEX IF NOT EXISTS idx_metrics_splits_compaction_scope
13+
ON metrics_splits (index_uid, sort_fields, window_start)
14+
WHERE split_state = 'Published';

quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs

Lines changed: 62 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -75,9 +75,9 @@ use crate::file_backed::MutationOccurred;
7575
use crate::metastore::postgres::model::Shards;
7676
use crate::metastore::postgres::utils::split_maturity_timestamp;
7777
use crate::metastore::{
78-
IndexesMetadataResponseExt, ListMetricsSplitsResponseExt, PublishMetricsSplitsRequestExt,
79-
PublishSplitsRequestExt, STREAM_SPLITS_CHUNK_SIZE, StageMetricsSplitsRequestExt,
80-
UpdateSourceRequestExt, use_shard_api,
78+
IndexesMetadataResponseExt, ListMetricsSplitsRequestExt, ListMetricsSplitsResponseExt,
79+
PublishMetricsSplitsRequestExt, PublishSplitsRequestExt, STREAM_SPLITS_CHUNK_SIZE,
80+
StageMetricsSplitsRequestExt, UpdateSourceRequestExt, use_shard_api,
8181
};
8282
use crate::{
8383
AddSourceRequestExt, CreateIndexRequestExt, IndexMetadata, IndexMetadataResponseExt,
@@ -1803,6 +1803,12 @@ impl MetastoreService for PostgresqlMetastore {
18031803
let mut num_rows_list = Vec::with_capacity(splits_metadata.len());
18041804
let mut size_bytes_list = Vec::with_capacity(splits_metadata.len());
18051805
let mut split_metadata_jsons = Vec::with_capacity(splits_metadata.len());
1806+
let mut window_starts: Vec<Option<i64>> = Vec::with_capacity(splits_metadata.len());
1807+
let mut window_duration_secs_list: Vec<Option<i32>> = Vec::with_capacity(splits_metadata.len());
1808+
let mut sort_fields_list: Vec<String> = Vec::with_capacity(splits_metadata.len());
1809+
let mut num_merge_ops_list: Vec<i32> = Vec::with_capacity(splits_metadata.len());
1810+
let mut row_keys_list: Vec<Option<Vec<u8>>> = Vec::with_capacity(splits_metadata.len());
1811+
let mut zonemap_regexes_json_list: Vec<String> = Vec::with_capacity(splits_metadata.len());
18061812

18071813
for metadata in &splits_metadata {
18081814
let insertable =
@@ -1837,6 +1843,16 @@ impl MetastoreService for PostgresqlMetastore {
18371843
num_rows_list.push(insertable.num_rows);
18381844
size_bytes_list.push(insertable.size_bytes);
18391845
split_metadata_jsons.push(insertable.split_metadata_json);
1846+
window_starts.push(insertable.window_start);
1847+
window_duration_secs_list.push(if insertable.window_duration_secs == 0 {
1848+
None
1849+
} else {
1850+
Some(insertable.window_duration_secs)
1851+
});
1852+
sort_fields_list.push(insertable.sort_fields);
1853+
num_merge_ops_list.push(insertable.num_merge_ops);
1854+
row_keys_list.push(insertable.row_keys);
1855+
zonemap_regexes_json_list.push(insertable.zonemap_regexes);
18401856
}
18411857

18421858
info!(
@@ -1863,6 +1879,12 @@ impl MetastoreService for PostgresqlMetastore {
18631879
num_rows,
18641880
size_bytes,
18651881
split_metadata_json,
1882+
window_start,
1883+
window_duration_secs,
1884+
sort_fields,
1885+
num_merge_ops,
1886+
row_keys,
1887+
zonemap_regexes,
18661888
create_timestamp,
18671889
update_timestamp
18681890
)
@@ -1887,6 +1909,12 @@ impl MetastoreService for PostgresqlMetastore {
18871909
num_rows,
18881910
size_bytes,
18891911
split_metadata_json,
1912+
window_start,
1913+
window_duration_secs,
1914+
sort_fields,
1915+
num_merge_ops,
1916+
row_keys,
1917+
zonemap_regexes_json::jsonb,
18901918
(CURRENT_TIMESTAMP AT TIME ZONE 'UTC'),
18911919
(CURRENT_TIMESTAMP AT TIME ZONE 'UTC')
18921920
FROM UNNEST(
@@ -1904,7 +1932,13 @@ impl MetastoreService for PostgresqlMetastore {
19041932
$12::text[],
19051933
$13::bigint[],
19061934
$14::bigint[],
1907-
$15::text[]
1935+
$15::text[],
1936+
$16::bigint[],
1937+
$17::int[],
1938+
$18::text[],
1939+
$19::int[],
1940+
$20::bytea[],
1941+
$21::text[]
19081942
) AS staged(
19091943
split_id,
19101944
split_state,
@@ -1920,7 +1954,13 @@ impl MetastoreService for PostgresqlMetastore {
19201954
high_cardinality_tag_keys_json,
19211955
num_rows,
19221956
size_bytes,
1923-
split_metadata_json
1957+
split_metadata_json,
1958+
window_start,
1959+
window_duration_secs,
1960+
sort_fields,
1961+
num_merge_ops,
1962+
row_keys,
1963+
zonemap_regexes_json
19241964
)
19251965
ON CONFLICT (split_id) DO UPDATE
19261966
SET
@@ -1937,6 +1977,12 @@ impl MetastoreService for PostgresqlMetastore {
19371977
num_rows = EXCLUDED.num_rows,
19381978
size_bytes = EXCLUDED.size_bytes,
19391979
split_metadata_json = EXCLUDED.split_metadata_json,
1980+
window_start = EXCLUDED.window_start,
1981+
window_duration_secs = EXCLUDED.window_duration_secs,
1982+
sort_fields = EXCLUDED.sort_fields,
1983+
num_merge_ops = EXCLUDED.num_merge_ops,
1984+
row_keys = EXCLUDED.row_keys,
1985+
zonemap_regexes = EXCLUDED.zonemap_regexes,
19401986
update_timestamp = (CURRENT_TIMESTAMP AT TIME ZONE 'UTC')
19411987
WHERE metrics_splits.split_state = 'Staged'
19421988
RETURNING split_id
@@ -1962,6 +2008,12 @@ impl MetastoreService for PostgresqlMetastore {
19622008
.bind(&num_rows_list)
19632009
.bind(&size_bytes_list)
19642010
.bind(&split_metadata_jsons)
2011+
.bind(&window_starts)
2012+
.bind(&window_duration_secs_list)
2013+
.bind(&sort_fields_list)
2014+
.bind(&num_merge_ops_list)
2015+
.bind(&row_keys_list)
2016+
.bind(&zonemap_regexes_json_list)
19652017
.fetch_all(tx.as_mut())
19662018
.await
19672019
.map_err(|sqlx_error| convert_sqlx_err(&index_id_for_err, sqlx_error))
@@ -2312,20 +2364,23 @@ impl MetastoreService for PostgresqlMetastore {
23122364
size_bytes: row.13,
23132365
split_metadata_json: row.14,
23142366
update_timestamp: row.15,
2367+
// Compaction fields are read from the JSON blob via
2368+
// to_metadata() — the SQL columns are only used for
2369+
// filtering and SS-5 consistency checks.
23152370
window_start: None,
23162371
window_duration_secs: 0,
23172372
sort_fields: String::new(),
23182373
num_merge_ops: 0,
23192374
row_keys: None,
2320-
zonemap_regexes: String::new(),
2375+
zonemap_regexes: "{}".to_string(),
23212376
};
23222377

23232378
let state = pg_split.split_state().unwrap_or(MetricsSplitState::Staged);
23242379
let metadata = pg_split.to_metadata().ok()?;
23252380

23262381
Some(MetricsSplitRecord {
23272382
state,
2328-
update_timestamp: row.15,
2383+
update_timestamp: pg_split.update_timestamp,
23292384
metadata,
23302385
})
23312386
})

0 commit comments

Comments
 (0)