diff --git a/Cargo.lock b/Cargo.lock index bb1b35ee63182..38fedfc019e9d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5026,6 +5026,7 @@ dependencies = [ "databend-common-storage", "databend-common-tracing", "databend-common-users", + "databend-common-vector", "databend-enterprise-fail-safe", "databend-enterprise-vacuum-handler", "databend-meta-client 260205.11.0", @@ -6683,6 +6684,7 @@ dependencies = [ "databend-common-functions", "databend-common-io", "databend-common-sql-test-support", + "databend-common-vector", "databend-storages-common-table-meta", "divan", "fastrace", diff --git a/src/query/service/src/physical_plans/physical_column_mutation.rs b/src/query/service/src/physical_plans/physical_column_mutation.rs index eaf80a5c8e8d3..b4c90345ce8cd 100644 --- a/src/query/service/src/physical_plans/physical_column_mutation.rs +++ b/src/query/service/src/physical_plans/physical_column_mutation.rs @@ -18,6 +18,7 @@ use std::collections::HashMap; use databend_common_catalog::table::Table; use databend_common_exception::ErrorCode; use databend_common_exception::Result; +use databend_common_expression::DataSchema; use databend_common_expression::DataSchemaRef; use databend_common_expression::RemoteExpr; use databend_common_functions::BUILTIN_FUNCTIONS; @@ -189,7 +190,8 @@ impl IPhysicalPlan for ColumnMutation { let block_thresholds = table.get_block_thresholds(); let cluster_stats_gen = if matches!(self.mutation_kind, MutationKind::Delete) { - table.get_cluster_stats_gen(builder.ctx.clone(), 0, block_thresholds, None)? + let input_schema = DataSchema::from(table.schema_with_stream()).into(); + table.get_cluster_stats_gen(builder.ctx.clone(), 0, block_thresholds, input_schema)? } else { table.cluster_gen_for_append( builder.ctx.clone(), diff --git a/src/query/service/src/physical_plans/physical_multi_table_insert.rs b/src/query/service/src/physical_plans/physical_multi_table_insert.rs index 116c519e087ef..7f8d1b340da6d 100644 --- a/src/query/service/src/physical_plans/physical_multi_table_insert.rs +++ b/src/query/service/src/physical_plans/physical_multi_table_insert.rs @@ -691,7 +691,7 @@ impl IPhysicalPlan for ChunkAppendData { builder.ctx.clone(), 0, block_thresholds, - Some(schema), + schema, )?; let operators = cluster_stats_gen.operators.clone(); if !operators.is_empty() { diff --git a/src/query/service/src/physical_plans/physical_mutation.rs b/src/query/service/src/physical_plans/physical_mutation.rs index f752364ea9df5..e37402097604b 100644 --- a/src/query/service/src/physical_plans/physical_mutation.rs +++ b/src/query/service/src/physical_plans/physical_mutation.rs @@ -165,8 +165,9 @@ impl IPhysicalPlan for Mutation { let table = FuseTable::try_from_table(tbl.as_ref())?; let block_thresholds = table.get_block_thresholds(); + let input_schema = DataSchema::from(table.schema_with_stream()).into(); let cluster_stats_gen = - table.get_cluster_stats_gen(builder.ctx.clone(), 0, block_thresholds, None)?; + table.get_cluster_stats_gen(builder.ctx.clone(), 0, block_thresholds, input_schema)?; let max_threads = builder.settings.get_max_threads()? as usize; let io_request_semaphore = Arc::new(Semaphore::new(max_threads)); diff --git a/src/query/service/src/physical_plans/physical_recluster.rs b/src/query/service/src/physical_plans/physical_recluster.rs index 51ead5b0d1aad..a9d3744bb5b73 100644 --- a/src/query/service/src/physical_plans/physical_recluster.rs +++ b/src/query/service/src/physical_plans/physical_recluster.rs @@ -24,9 +24,9 @@ use databend_common_catalog::table::Table; use databend_common_config::GlobalConfig; use databend_common_exception::ErrorCode; use databend_common_exception::Result; +use databend_common_expression::DataSchema; use databend_common_expression::DataSchemaRef; use databend_common_expression::DataSchemaRefExt; -use databend_common_expression::SortColumnDescription; use databend_common_io::constants::DEFAULT_BLOCK_BUFFER_SIZE; use databend_common_meta_app::schema::TableInfo; use databend_common_metrics::storage::metrics_inc_recluster_block_bytes_to_read; @@ -44,6 +44,7 @@ use databend_common_sql::executor::physical_plans::MutationKind; use databend_common_storages_fuse::FUSE_OPT_KEY_BLOCK_IN_MEM_SIZE_THRESHOLD; use databend_common_storages_fuse::FuseTable; use databend_common_storages_fuse::operations::TransformSerializeBlock; +use databend_common_storages_fuse::operations::TransformVectorClusterKmeans; use databend_common_storages_fuse::statistics::ClusterStatsGenerator; use databend_storages_common_cache::TempDirManager; use databend_storages_common_table_meta::meta::TableMetaTimestamps; @@ -185,11 +186,12 @@ impl IPhysicalPlan for Recluster { .add_transformer(|| TransformAddStreamColumns::new(stream_ctx.clone())); } + let input_schema = DataSchema::from(table.schema_with_stream()).into(); let cluster_stats_gen = table.get_cluster_stats_gen( builder.ctx.clone(), task.level + 1, block_thresholds, - None, + input_schema, )?; let operators = cluster_stats_gen.operators.clone(); if !operators.is_empty() { @@ -203,27 +205,33 @@ impl IPhysicalPlan for Recluster { }); } - // construct output fields - let output_fields = cluster_stats_gen.out_fields.clone(); - let schema = DataSchemaRefExt::create(output_fields); - let sort_descs: Vec<_> = cluster_stats_gen - .cluster_key_index - .iter() - .map(|offset| SortColumnDescription { - offset: *offset, - asc: true, - nulls_first: false, - }) - .collect(); + let settings = builder.ctx.get_settings(); + let max_threads = settings.get_max_threads()? as usize; - // merge sort let (rows_per_block, bytes_per_block) = block_thresholds.calc_rows_for_recluster( task.total_rows, task.total_bytes, task.total_compressed, ); - let settings = builder.ctx.get_settings(); + if let Some(vector_operator) = cluster_stats_gen.vector_operator.clone() { + builder.main_pipeline.try_resize(1)?; + builder.main_pipeline.add_accumulating_transformer(move || { + TransformVectorClusterKmeans::new( + vector_operator.vector_column_input_offset, + vector_operator.info.distance_type, + rows_per_block, + ) + }); + builder.main_pipeline.try_resize(max_threads)?; + } + + // construct output fields + let output_fields = cluster_stats_gen.out_fields.clone(); + let schema = DataSchemaRefExt::create(output_fields); + let sort_descs = cluster_stats_gen.sort_descs(); + + // merge sort let sort_pipeline_builder = SortPipelineBuilder::create( builder.ctx.clone(), schema, @@ -241,7 +249,6 @@ impl IPhysicalPlan for Recluster { let compact_thresholds = block_thresholds .set_rows_per_block(rows_per_block) .set_bytes_per_block(bytes_per_block); - let max_threads = settings.get_max_threads()? as usize; build_ordered_compact_pipeline( &mut builder.main_pipeline, compact_thresholds, diff --git a/src/query/service/src/physical_plans/physical_replace_into.rs b/src/query/service/src/physical_plans/physical_replace_into.rs index 9e502fb467a81..9146a0d731a07 100644 --- a/src/query/service/src/physical_plans/physical_replace_into.rs +++ b/src/query/service/src/physical_plans/physical_replace_into.rs @@ -114,12 +114,8 @@ impl IPhysicalPlan for ReplaceInto { .build_table_by_table_info(&self.table_info, None)?; let table = FuseTable::try_from_table(table.as_ref())?; let schema = DataSchema::from(table.schema()).into(); - let cluster_stats_gen = table.get_cluster_stats_gen( - builder.ctx.clone(), - 0, - self.block_thresholds, - Some(schema), - )?; + let cluster_stats_gen = + table.get_cluster_stats_gen(builder.ctx.clone(), 0, self.block_thresholds, schema)?; // connect to broadcast processor and append transform let serialize_block_transform = TransformSerializeBlock::try_create( diff --git a/src/query/service/tests/it/storages/fuse/bloom_index_meta_size.rs b/src/query/service/tests/it/storages/fuse/bloom_index_meta_size.rs index a3ccb75f23db6..eff08a15cba53 100644 --- a/src/query/service/tests/it/storages/fuse/bloom_index_meta_size.rs +++ b/src/query/service/tests/it/storages/fuse/bloom_index_meta_size.rs @@ -341,6 +341,7 @@ fn build_test_segment_info( spatial_index_size: None, spatial_index_location: None, spatial_stats: None, + vector_stats: None, virtual_block_meta: None, compression: Compression::Lz4, create_on: Some(Utc::now()), diff --git a/src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs b/src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs index deb95b772d20d..08afab6da9d9e 100644 --- a/src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs +++ b/src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs @@ -23,10 +23,14 @@ use databend_common_expression::ColumnRef; use databend_common_expression::DataBlock; use databend_common_expression::Expr; use databend_common_expression::Scalar; +use databend_common_expression::TableDataType; +use databend_common_expression::TableField; use databend_common_expression::TableSchema; use databend_common_expression::TableSchemaRef; use databend_common_expression::types::DataType; +use databend_common_expression::types::F32; use databend_common_expression::types::NumberDataType; +use databend_common_expression::types::VectorDataType; use databend_common_storages_fuse::FuseBlockPartInfo; use databend_common_storages_fuse::FuseTable; use databend_common_storages_fuse::io::MetaWriter; @@ -34,6 +38,7 @@ use databend_common_storages_fuse::io::TableMetaLocationGenerator; use databend_common_storages_fuse::operations::ReclusterMode; use databend_common_storages_fuse::operations::ReclusterMutator; use databend_common_storages_fuse::pruning::create_segment_location_vector; +use databend_common_storages_fuse::statistics::VectorClusterInfo; use databend_common_storages_fuse::statistics::reducers::merge_statistics_mut; use databend_common_storages_fuse::statistics::reducers::reduce_block_metas; use databend_query::sessions::TableContext; @@ -43,8 +48,11 @@ use databend_query::test_kits::*; use databend_storages_common_table_meta::meta; use databend_storages_common_table_meta::meta::BlockMeta; use databend_storages_common_table_meta::meta::ClusterStatistics; +use databend_storages_common_table_meta::meta::ColumnStatistics; use databend_storages_common_table_meta::meta::SegmentInfo; use databend_storages_common_table_meta::meta::Statistics; +use databend_storages_common_table_meta::meta::VectorColumnStatistics; +use databend_storages_common_table_meta::meta::VectorDistanceType; use databend_storages_common_table_meta::meta::Versioned; use rand::Rng; use rand::thread_rng; @@ -235,6 +243,132 @@ async fn gen_recluster_segments_by_ranges( Ok(segment_locations) } +fn test_vector_cluster_key_expr() -> Expr { + Expr::ColumnRef(ColumnRef { + span: None, + data_type: DataType::Vector(VectorDataType::Float32(2)), + id: 1, + display_name: "embedding".to_string(), + }) +} + +fn vector_column_stats(centroid: [f32; 2], radius: f32) -> VectorColumnStatistics { + VectorColumnStatistics { + centroid: centroid.into_iter().map(F32::from).collect(), + radius: F32::from(radius), + } +} + +fn vector_recluster_schema() -> TableSchemaRef { + TableSchemaRef::new(TableSchema::new_from_column_ids( + vec![ + TableField::new_from_column_id( + "tenant", + TableDataType::Number(NumberDataType::Int32), + 0, + ), + TableField::new_from_column_id( + "embedding", + TableDataType::Vector(VectorDataType::Float32(2)), + 1, + ), + ], + Default::default(), + 2, + )) +} + +type VectorBlockStatsSpec = (i32, i32, [f32; 2], f32); +type VectorSegmentStatsSpec = Vec; + +async fn gen_recluster_segments_by_vector_stats( + data_accessor: &opendal::Operator, + location_generator: &TableMetaLocationGenerator, + blocks_by_segment: &[VectorSegmentStatsSpec], + scalar_cluster_stats: bool, + row_count: u64, + block_size: u64, + file_size: u64, + thresholds: BlockThresholds, + cluster_key_id: u32, +) -> anyhow::Result> { + let mut segment_locations = Vec::with_capacity(blocks_by_segment.len()); + for blocks_spec in blocks_by_segment { + let mut blocks = Vec::with_capacity(blocks_spec.len()); + for &(tenant_min, tenant_max, centroid, radius) in blocks_spec { + let block_id = Uuid::new_v4().simple().to_string(); + let location = (block_id, DataBlock::VERSION); + let mut col_stats = HashMap::new(); + col_stats.insert( + 0, + ColumnStatistics::new( + Scalar::from(tenant_min), + Scalar::from(tenant_max), + 0, + 4, + Some(1), + ), + ); + + let mut vector_stats = HashMap::new(); + vector_stats.insert( + (1, VectorDistanceType::L2), + vector_column_stats(centroid, radius), + ); + + let mut block = BlockMeta::new( + row_count, + block_size, + file_size, + col_stats, + HashMap::default(), + Some(ClusterStatistics::new( + cluster_key_id, + if scalar_cluster_stats { + vec![Scalar::from(tenant_min)] + } else { + vec![] + }, + if scalar_cluster_stats { + vec![Scalar::from(tenant_max)] + } else { + vec![] + }, + 0, + None, + )), + location, + None, + 0, + None, + None, + None, + None, + None, + None, + None, + None, + meta::Compression::Lz4Raw, + Some(Utc::now()), + ); + block.vector_stats = Some(vector_stats); + blocks.push(Arc::new(block)); + } + + let block_refs = blocks + .iter() + .map(|block| block.as_ref()) + .collect::>(); + let statistics = reduce_block_metas(&block_refs, thresholds, Some(cluster_key_id)); + let segment = SegmentInfo::new(blocks, statistics); + let segment_location = location_generator + .gen_segment_info_location(TestFixture::default_table_meta_timestamps(), false); + segment.write_meta(data_accessor, &segment_location).await?; + segment_locations.push((segment_location, SegmentInfo::VERSION)); + } + Ok(segment_locations) +} + async fn target_select_segments_by_level_with_mode( level_counts: &[(i32, usize)], thresholds: BlockThresholds, @@ -305,6 +439,7 @@ async fn target_select_segment_locations_with_mode( thresholds, cluster_key_id, max_tasks, + None, ); let segment_windows = mutator.select_segments(&compact_segments, max_segments)?; @@ -478,6 +613,7 @@ async fn test_recluster_mutator_selects_multiple_segment_windows() -> anyhow::Re thresholds, cluster_key_id, 1, + None, ); let segment_windows = mutator.select_segments(&compact_segments, 8)?; @@ -534,6 +670,7 @@ async fn test_recluster_mutator_selects_multiple_segment_windows() -> anyhow::Re thresholds, cluster_key_id, 1, + None, ); let segment_windows = mutator.select_segments(&compact_segments, 3)?; let window_sizes = segment_windows @@ -559,6 +696,211 @@ async fn test_recluster_mutator_selects_multiple_segment_windows() -> anyhow::Re Ok(()) } +#[tokio::test(flavor = "multi_thread")] +async fn test_recluster_mutator_vector_mixed_key_overlap_selection() -> anyhow::Result<()> { + let fixture = TestFixture::setup().await?; + let ctx = fixture.new_query_ctx().await?; + ctx.get_settings().set_max_threads(2)?; + ctx.get_settings().set_recluster_block_size(1000)?; + + let data_accessor = ctx.get_application_level_data_operator()?.operator(); + let location_generator = TableMetaLocationGenerator::new("_prefix".to_owned()); + let cluster_key_id = 0; + let thresholds = BlockThresholds::new(1000, 100, 100, 1); + + let segment_locations = gen_recluster_segments_by_vector_stats( + &data_accessor, + &location_generator, + &[ + vec![(1, 1, [0.0, 0.0], 1.0)], + vec![(1, 1, [0.5, 0.0], 1.0)], + // Vector sphere overlaps with segment 0/1, but scalar range does not. + vec![(2, 2, [0.5, 0.0], 1.0)], + // Scalar range overlaps with segment 0/1, but vector sphere does not. + vec![(1, 1, [10.0, 0.0], 1.0)], + ], + true, + 1000, + 100, + 100, + thresholds, + cluster_key_id, + ) + .await?; + + let schema = vector_recluster_schema(); + let ctx: Arc = ctx.clone(); + let segment_locations = create_segment_location_vector(segment_locations, None); + let compact_segments = FuseTable::segment_pruning( + &ctx, + schema.clone(), + data_accessor.clone(), + &None, + segment_locations, + ) + .await?; + + let vector_cluster_info = VectorClusterInfo { + key_index: 1, + column_id: 1, + column_name: "embedding".to_string(), + distance_type: VectorDistanceType::L2, + }; + let mutator = ReclusterMutator::new( + ctx, + data_accessor, + schema, + vec![test_cluster_key_expr(), test_vector_cluster_key_expr()], + 1.0, + thresholds, + cluster_key_id, + 1, + Some(vector_cluster_info), + ); + + let segment_windows = mutator.select_segments(&compact_segments, 8)?; + let window_index_sets = segment_windows + .iter() + .map(|window| { + window + .iter() + .map(|segment| segment.loc.segment_idx) + .collect::>() + }) + .collect::>(); + assert!(window_index_sets.contains(&HashSet::from([0, 1]))); + assert!( + !window_index_sets + .iter() + .any(|window| window.contains(&0) && window.contains(&2)) + ); + assert!( + !window_index_sets + .iter() + .any(|window| window.contains(&0) && window.contains(&3)) + ); + + let vector_window = segment_windows + .into_iter() + .find(|window| { + window + .iter() + .map(|segment| segment.loc.segment_idx) + .collect::>() + == HashSet::from([0, 1]) + }) + .unwrap(); + let (block_num, parts) = mutator + .target_select(vector_window, ReclusterMode::Normal) + .await?; + assert_eq!(block_num, 2); + assert_eq!(parts.tasks.len(), 1); + assert_eq!(task_part_counts(&parts), vec![2]); + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_recluster_mutator_vector_only_overlap_selection() -> anyhow::Result<()> { + let fixture = TestFixture::setup().await?; + let ctx = fixture.new_query_ctx().await?; + ctx.get_settings().set_max_threads(2)?; + ctx.get_settings().set_recluster_block_size(1000)?; + + let data_accessor = ctx.get_application_level_data_operator()?.operator(); + let location_generator = TableMetaLocationGenerator::new("_prefix".to_owned()); + let cluster_key_id = 0; + let thresholds = BlockThresholds::new(1000, 100, 100, 1); + + let segment_locations = gen_recluster_segments_by_vector_stats( + &data_accessor, + &location_generator, + &[ + vec![(1, 1, [0.0, 0.0], 1.0)], + vec![(2, 2, [0.5, 0.0], 1.0)], + vec![(3, 3, [10.0, 0.0], 1.0)], + ], + false, + 1000, + 100, + 100, + thresholds, + cluster_key_id, + ) + .await?; + + let schema = vector_recluster_schema(); + let ctx: Arc = ctx.clone(); + let segment_locations = create_segment_location_vector(segment_locations, None); + let compact_segments = FuseTable::segment_pruning( + &ctx, + schema.clone(), + data_accessor.clone(), + &None, + segment_locations, + ) + .await?; + + let vector_cluster_info = VectorClusterInfo { + key_index: 0, + column_id: 1, + column_name: "embedding".to_string(), + distance_type: VectorDistanceType::L2, + }; + let mutator = ReclusterMutator::new( + ctx, + data_accessor, + schema, + vec![test_vector_cluster_key_expr()], + 1.0, + thresholds, + cluster_key_id, + 1, + Some(vector_cluster_info), + ); + + let segment_windows = mutator.select_segments(&compact_segments, 8)?; + let window_index_sets = segment_windows + .iter() + .map(|window| { + window + .iter() + .map(|segment| segment.loc.segment_idx) + .collect::>() + }) + .collect::>(); + assert!(window_index_sets.contains(&HashSet::from([0, 1]))); + assert!( + !window_index_sets + .iter() + .any(|window| window.contains(&0) && window.contains(&2)) + ); + assert!( + !window_index_sets + .iter() + .any(|window| window.contains(&1) && window.contains(&2)) + ); + + let vector_window = segment_windows + .into_iter() + .find(|window| { + window + .iter() + .map(|segment| segment.loc.segment_idx) + .collect::>() + == HashSet::from([0, 1]) + }) + .unwrap(); + let (block_num, parts) = mutator + .target_select(vector_window, ReclusterMode::Normal) + .await?; + assert_eq!(block_num, 2); + assert_eq!(parts.tasks.len(), 1); + assert_eq!(task_part_counts(&parts), vec![2]); + + Ok(()) +} + #[tokio::test(flavor = "multi_thread")] async fn test_recluster_mutator_split_tasks_by_parallel_budget() -> anyhow::Result<()> { let fixture = TestFixture::setup().await?; @@ -903,6 +1245,7 @@ async fn test_safety_for_recluster() -> anyhow::Result<()> { threshold, cluster_key_id, max_tasks, + None, )); let segment_windows = mutator.select_segments(&compact_segments, 8)?; // select the blocks with the highest depth. diff --git a/src/query/service/tests/it/storages/fuse/statistics.rs b/src/query/service/tests/it/storages/fuse/statistics.rs index 2622c90b34516..2672d74ce97e2 100644 --- a/src/query/service/tests/it/storages/fuse/statistics.rs +++ b/src/query/service/tests/it/storages/fuse/statistics.rs @@ -587,6 +587,7 @@ async fn test_ft_cluster_stats_with_stats() -> anyhow::Result<()> { 0, block_compactor, vec![], + None, vec![], FunctionContext::default(), ); @@ -629,6 +630,7 @@ async fn test_ft_cluster_stats_with_stats() -> anyhow::Result<()> { 0, block_compactor, operators, + None, vec![], FunctionContext::default(), ); @@ -647,6 +649,7 @@ async fn test_ft_cluster_stats_with_stats() -> anyhow::Result<()> { 0, block_compactor, vec![], + None, vec![], FunctionContext::default(), ); diff --git a/src/query/sql/src/planner/binder/ddl/table.rs b/src/query/sql/src/planner/binder/ddl/table.rs index 2c8a7db056090..645a8d6bc664c 100644 --- a/src/query/sql/src/planner/binder/ddl/table.rs +++ b/src/query/sql/src/planner/binder/ddl/table.rs @@ -2354,6 +2354,7 @@ impl Binder { sql_dialect: self.dialect, }; let mut cluster_keys = Vec::with_capacity(expr_len); + let mut vector_cluster_key_num = 0; for cluster_expr in cluster_exprs.iter() { let (cluster_key, _) = scalar_binder.bind(cluster_expr)?; if cluster_key.used_columns().len() != 1 || !cluster_key.evaluable() { @@ -2372,12 +2373,27 @@ impl Binder { } let data_type = expr.data_type(); - if !Self::valid_cluster_key_type(data_type) { + let (is_valid_type, is_vector_type) = Self::valid_cluster_key_type(data_type); + if !is_valid_type { return Err(ErrorCode::InvalidClusterKeys(format!( "Unsupported data type '{}' for cluster by expression `{:#}`", data_type, cluster_expr ))); } + if is_vector_type { + if matches!(cluster_type, AstClusterType::Hilbert) { + return Err(ErrorCode::InvalidClusterKeys(format!( + "Unsupported data type '{}' for hilbert cluster by expression `{:#}`", + data_type, cluster_expr + ))); + } + vector_cluster_key_num += 1; + if vector_cluster_key_num > 1 { + return Err(ErrorCode::InvalidClusterKeys( + "Only one vector column is supported in cluster by", + )); + } + } let mut cluster_expr = cluster_expr.clone(); cluster_expr.drive_mut(&mut normalizer); @@ -2387,9 +2403,9 @@ impl Binder { Ok(cluster_keys) } - pub(crate) fn valid_cluster_key_type(data_type: &DataType) -> bool { + pub(crate) fn valid_cluster_key_type(data_type: &DataType) -> (bool, bool) { let inner_type = data_type.remove_nullable(); - matches!( + let is_valid_type = matches!( inner_type, DataType::Number(_) | DataType::String @@ -2397,7 +2413,10 @@ impl Binder { | DataType::Date | DataType::Boolean | DataType::Decimal(_) - ) + | DataType::Vector(_) + ); + let is_vector_type = matches!(inner_type, DataType::Vector(_)); + (is_valid_type, is_vector_type) } fn is_column_not_null(&self) -> bool { diff --git a/src/query/sql/src/planner/expression/expression_parser.rs b/src/query/sql/src/planner/expression/expression_parser.rs index 3180bfa70fe47..30a1340dbcf7e 100644 --- a/src/query/sql/src/planner/expression/expression_parser.rs +++ b/src/query/sql/src/planner/expression/expression_parser.rs @@ -476,6 +476,7 @@ pub fn analyze_cluster_keys( }; let mut exprs = Vec::with_capacity(ast_exprs.len()); let mut cluster_keys = Vec::with_capacity(exprs.len()); + let mut vector_cluster_key_num = 0; for ast in ast_exprs { let (scalar, _) = *type_checker.resolve(&ast)?; if scalar.used_columns().len() != 1 || !scalar.evaluable() { @@ -494,12 +495,21 @@ pub fn analyze_cluster_keys( } let data_type = expr.data_type(); - if !Binder::valid_cluster_key_type(data_type) { + let (is_valid_type, is_vector_type) = Binder::valid_cluster_key_type(data_type); + if !is_valid_type { return Err(ErrorCode::InvalidClusterKeys(format!( "Unsupported data type '{}' for cluster by expression `{:#}`", data_type, ast ))); } + if is_vector_type { + vector_cluster_key_num += 1; + if vector_cluster_key_num > 1 { + return Err(ErrorCode::InvalidClusterKeys( + "Only one vector column is supported in cluster by", + )); + } + } exprs.push(expr); diff --git a/src/query/storages/common/cache/src/manager.rs b/src/query/storages/common/cache/src/manager.rs index 251b1437380e7..39914f449644c 100644 --- a/src/query/storages/common/cache/src/manager.rs +++ b/src/query/storages/common/cache/src/manager.rs @@ -1308,6 +1308,7 @@ mod tests { spatial_index_location: None, spatial_index_size: None, spatial_stats: None, + vector_stats: None, virtual_block_meta: None, compression: Compression::Lz4, create_on: None, diff --git a/src/query/storages/common/index/Cargo.toml b/src/query/storages/common/index/Cargo.toml index 0bbd550d60f21..c5773cbcb8be8 100644 --- a/src/query/storages/common/index/Cargo.toml +++ b/src/query/storages/common/index/Cargo.toml @@ -18,6 +18,7 @@ databend-common-exception = { workspace = true } databend-common-expression = { workspace = true } databend-common-functions = { workspace = true } databend-common-io = { workspace = true } +databend-common-vector = { workspace = true } databend-storages-common-table-meta = { workspace = true } anyerror = { workspace = true } diff --git a/src/query/storages/common/index/src/lib.rs b/src/query/storages/common/index/src/lib.rs index 37434d9eb49e3..785279e3a6e49 100644 --- a/src/query/storages/common/index/src/lib.rs +++ b/src/query/storages/common/index/src/lib.rs @@ -33,6 +33,7 @@ mod page_index; mod range_index; mod spatial_index; mod spatial_predicate; +mod vector; mod virtual_column; pub use bloom_index::BloomIndex; @@ -74,6 +75,16 @@ pub use spatial_predicate::SpatialPredicate; pub use spatial_predicate::SpatialPredicateOp; pub use spatial_predicate::SpatialPredicateResult; pub use spatial_predicate::collect_spatial_predicates; +pub use vector::normalize_vector; +pub use vector::preprocess_vector; +pub use vector::vector_angular_distance; +pub use vector::vector_distance; +pub use vector::vector_distance_lower_bound; +pub use vector::vector_distance_type_from_index_option; +pub use vector::vector_distance_type_name; +pub use vector::vector_spheres_overlap; +pub use vector::vector_stat_distance; +pub use vector::vector_statistics_distance_type; pub use virtual_column::VIRTUAL_COLUMN_NODES_KEY; pub use virtual_column::VIRTUAL_COLUMN_SHARED_COLUMN_IDS_KEY; pub use virtual_column::VIRTUAL_COLUMN_STRING_TABLE_JSON_KEY; diff --git a/src/query/storages/common/index/src/page_index.rs b/src/query/storages/common/index/src/page_index.rs index fa599f8ac2912..6a7af47a41656 100644 --- a/src/query/storages/common/index/src/page_index.rs +++ b/src/query/storages/common/index/src/page_index.rs @@ -108,6 +108,10 @@ impl PageIndex { return Ok((true, None)); } + if min_values.is_empty() { + return Ok((true, None)); + } + let pages = min_values.len(); let mut start = 0; let mut end = pages - 1; diff --git a/src/query/storages/common/index/src/range_index.rs b/src/query/storages/common/index/src/range_index.rs index 5713dcf956c55..441919510fd69 100644 --- a/src/query/storages/common/index/src/range_index.rs +++ b/src/query/storages/common/index/src/range_index.rs @@ -296,6 +296,7 @@ pub fn statistics_to_domain(mut stats: Vec<&ColumnStatistics>, data_type: &DataT let inner_domain = statistics_to_domain(stats, inner_ty); Domain::Map(Some(Box::new(inner_domain))) } + DataType::Vector(_) => Domain::full(data_type), _ => { let stat = stats[0]; let min = stat.min(); diff --git a/src/query/storages/common/index/src/vector.rs b/src/query/storages/common/index/src/vector.rs new file mode 100644 index 0000000000000..5c7dea8c60fba --- /dev/null +++ b/src/query/storages/common/index/src/vector.rs @@ -0,0 +1,141 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use databend_common_exception::Result; +use databend_common_expression::types::F32; +use databend_common_vector::cosine_distance; +use databend_common_vector::l1_distance; +use databend_common_vector::l2_distance; +use databend_common_vector::vector_norm; +use databend_storages_common_table_meta::meta::VectorColumnStatistics; +use databend_storages_common_table_meta::meta::VectorDistanceType; + +use crate::DistanceType; + +pub fn vector_distance( + left: &[f32], + right: &[f32], + distance_type: VectorDistanceType, +) -> Result { + match distance_type { + VectorDistanceType::L1 => l1_distance(left, right), + VectorDistanceType::L2 => l2_distance(left, right), + VectorDistanceType::Dot => { + let distance = cosine_distance(left, right)?; + if distance.is_finite() { + Ok(distance) + } else { + Ok(1.0) + } + } + } +} + +pub fn vector_spheres_overlap( + left: &VectorColumnStatistics, + right: &VectorColumnStatistics, + distance_type: VectorDistanceType, +) -> Result { + let left_centroid = vector_centroid_values(left); + let right_centroid = vector_centroid_values(right); + let distance = vector_stat_distance(&left_centroid, &right_centroid, distance_type)?; + + Ok(distance <= left.radius.0 + right.radius.0) +} + +pub fn vector_distance_lower_bound( + query: &[f32], + stats: &VectorColumnStatistics, + distance_type: VectorDistanceType, +) -> Result { + let centroid = vector_centroid_values(stats); + let distance = vector_stat_distance(query, ¢roid, distance_type)?; + let lower_bound = (distance - stats.radius.0).max(0.0); + if matches!(distance_type, VectorDistanceType::Dot) { + return Ok(1.0 - lower_bound.cos()); + } + + Ok(lower_bound) +} + +pub fn vector_stat_distance( + left: &[f32], + right: &[f32], + distance_type: VectorDistanceType, +) -> Result { + if matches!(distance_type, VectorDistanceType::Dot) { + return vector_angular_distance(left, right); + } + + vector_distance(left, right, distance_type) +} + +pub fn vector_angular_distance(left: &[f32], right: &[f32]) -> Result { + // Cosine distance (1 - cos(theta)) is not a metric, so it cannot be used + // directly with the triangle inequality for safe block pruning. Convert it + // to angular distance theta, which is a metric on normalized vectors. + let distance = cosine_distance(left, right)?; + if !distance.is_finite() { + return Ok(std::f32::consts::FRAC_PI_2); + } + + Ok((1.0 - distance).clamp(-1.0, 1.0).acos()) +} + +pub fn preprocess_vector(values: &[F32], distance_type: DistanceType) -> Vec { + let mut vector = values.iter().map(|value| value.0).collect::>(); + if matches!(distance_type, DistanceType::Dot) { + normalize_vector(&mut vector); + } + vector +} + +pub fn normalize_vector(vector: &mut [f32]) { + let norm = vector_norm(vector); + if norm <= f32::EPSILON { + return; + } + for value in vector { + *value /= norm; + } +} + +pub fn vector_statistics_distance_type(distance_type: DistanceType) -> VectorDistanceType { + match distance_type { + DistanceType::L1 => VectorDistanceType::L1, + DistanceType::L2 => VectorDistanceType::L2, + DistanceType::Dot => VectorDistanceType::Dot, + } +} + +pub fn vector_distance_type_from_index_option(distance_type: &str) -> Option { + match distance_type.trim() { + "cosine" | "dot" => Some(VectorDistanceType::Dot), + "l1" => Some(VectorDistanceType::L1), + "l2" => Some(VectorDistanceType::L2), + _ => None, + } +} + +pub fn vector_distance_type_name(distance_type: VectorDistanceType) -> &'static str { + match distance_type { + VectorDistanceType::L1 => "l1", + VectorDistanceType::L2 => "l2", + VectorDistanceType::Dot => "dot", + } +} + +fn vector_centroid_values(stats: &VectorColumnStatistics) -> Vec { + stats.centroid.iter().map(|value| value.0).collect() +} diff --git a/src/query/storages/common/pruner/src/page_pruner.rs b/src/query/storages/common/pruner/src/page_pruner.rs index 2b93def4e8275..4e0e92ebd431a 100644 --- a/src/query/storages/common/pruner/src/page_pruner.rs +++ b/src/query/storages/common/pruner/src/page_pruner.rs @@ -20,6 +20,7 @@ use databend_common_expression::Expr; use databend_common_expression::FunctionContext; use databend_common_expression::RemoteExpr; use databend_common_expression::TableSchemaRef; +use databend_common_expression::types::DataType; use databend_storages_common_index::PageIndex; use databend_storages_common_table_meta::meta::ClusterKey; use databend_storages_common_table_meta::meta::ClusterStatistics; @@ -72,12 +73,25 @@ impl PagePrunerCreator { cluster_key_meta: Option, cluster_keys: Vec>, ) -> Result> { - if cluster_key_meta.is_none() - || cluster_keys.is_empty() - || cluster_keys - .iter() - .any(|expr| !matches!(expr, RemoteExpr::ColumnRef { .. })) - { + if cluster_key_meta.is_none() || cluster_keys.is_empty() { + return Ok(Arc::new(KeepTrue)); + } + + // ClusterStatistics min/max/pages are stored as scalar-only tuples. + // Drop vector keys here so PageIndex fields stay aligned with those tuples; + // vector pruning uses BlockMeta.vector_stats instead of page stats. + let mut scalar_cluster_keys = Vec::with_capacity(cluster_keys.len()); + for expr in cluster_keys { + match expr { + RemoteExpr::ColumnRef { id, data_type, .. } => { + if !matches!(data_type.remove_nullable(), DataType::Vector(_)) { + scalar_cluster_keys.push(id.to_string()); + } + } + _ => return Ok(Arc::new(KeepTrue)), + } + } + if scalar_cluster_keys.is_empty() { return Ok(Arc::new(KeepTrue)); } @@ -85,18 +99,10 @@ impl PagePrunerCreator { Ok(match filter_expr { Some(expr) => { - let cluster_keys = cluster_keys - .iter() - .map(|expr| match expr { - RemoteExpr::ColumnRef { id, .. } => id.to_string(), - _ => unreachable!(), - }) - .collect::>(); - let page_filter = PageIndex::try_create( func_ctx, cluster_key_meta.0, - cluster_keys, + scalar_cluster_keys, expr, schema.clone(), )?; diff --git a/src/query/storages/common/table_meta/src/meta/current/mod.rs b/src/query/storages/common/table_meta/src/meta/current/mod.rs index 7e3b608338774..4e0719702e9e9 100644 --- a/src/query/storages/common/table_meta/src/meta/current/mod.rs +++ b/src/query/storages/common/table_meta/src/meta/current/mod.rs @@ -24,6 +24,8 @@ pub use v2::ExtendedBlockMeta; pub use v2::SegmentStatistics; pub use v2::SpatialStatistics; pub use v2::Statistics; +pub use v2::VectorColumnStatistics; +pub use v2::VectorDistanceType; pub use v2::VirtualBlockMeta; pub use v2::VirtualColumnMeta; pub use v4::CompactSegmentInfo; diff --git a/src/query/storages/common/table_meta/src/meta/statistics.rs b/src/query/storages/common/table_meta/src/meta/statistics.rs index a4079e4bc3fec..afdaf36a36b9b 100644 --- a/src/query/storages/common/table_meta/src/meta/statistics.rs +++ b/src/query/storages/common/table_meta/src/meta/statistics.rs @@ -27,6 +27,8 @@ use uuid::Uuid; use crate::meta::ColumnStatistics; use crate::meta::SegmentStatistics; use crate::meta::SpatialStatistics; +use crate::meta::VectorColumnStatistics; +use crate::meta::VectorDistanceType; use crate::meta::format::compress; use crate::meta::format::encode; use crate::meta::format::read_and_deserialize; @@ -37,6 +39,8 @@ pub type Location = (String, FormatVersion); pub type ClusterKey = (u32, String); pub type StatisticsOfColumns = HashMap; pub type StatisticsOfSpatialColumns = HashMap; +pub type StatisticsOfVectorColumns = + HashMap<(ColumnId, VectorDistanceType), VectorColumnStatistics>; pub type BlockHLL = HashMap; pub type RawBlockHLL = Vec; diff --git a/src/query/storages/common/table_meta/src/meta/v2/mod.rs b/src/query/storages/common/table_meta/src/meta/v2/mod.rs index 2475c054891df..8db7b00c17bf3 100644 --- a/src/query/storages/common/table_meta/src/meta/v2/mod.rs +++ b/src/query/storages/common/table_meta/src/meta/v2/mod.rs @@ -33,4 +33,6 @@ pub use statistics::ClusterStatistics; pub use statistics::ColumnStatistics; pub use statistics::SpatialStatistics; pub use statistics::Statistics; +pub use statistics::VectorColumnStatistics; +pub use statistics::VectorDistanceType; pub use table_snapshot_statistics::TableSnapshotStatistics; diff --git a/src/query/storages/common/table_meta/src/meta/v2/segment.rs b/src/query/storages/common/table_meta/src/meta/v2/segment.rs index 6999b3d0e9ac4..85c0acd262c4b 100644 --- a/src/query/storages/common/table_meta/src/meta/v2/segment.rs +++ b/src/query/storages/common/table_meta/src/meta/v2/segment.rs @@ -39,6 +39,7 @@ use crate::meta::FormatVersion; use crate::meta::Location; use crate::meta::SpatialStatistics; use crate::meta::Statistics; +use crate::meta::StatisticsOfVectorColumns; use crate::meta::Versioned; use crate::meta::v0; use crate::meta::v1; @@ -194,6 +195,7 @@ pub struct BlockMeta { pub spatial_index_size: Option, pub spatial_index_location: Option, pub spatial_stats: Option>, + pub vector_stats: Option, /// The block meta of virtual columns. pub virtual_block_meta: Option, pub compression: Compression, @@ -242,6 +244,7 @@ impl BlockMeta { spatial_index_size, spatial_index_location, spatial_stats, + vector_stats: None, virtual_block_meta, compression, create_on, @@ -380,6 +383,7 @@ impl BlockMeta { spatial_index_size: None, spatial_index_location: None, spatial_stats: None, + vector_stats: None, virtual_block_meta: None, create_on: None, ngram_filter_index_size: None, @@ -411,6 +415,7 @@ impl BlockMeta { spatial_index_size: None, spatial_index_location: None, spatial_stats: None, + vector_stats: None, virtual_block_meta: None, create_on: None, ngram_filter_index_size: None, diff --git a/src/query/storages/common/table_meta/src/meta/v2/statistics.rs b/src/query/storages/common/table_meta/src/meta/v2/statistics.rs index 5ae0a4454bdc8..cfe2de630bf5a 100644 --- a/src/query/storages/common/table_meta/src/meta/v2/statistics.rs +++ b/src/query/storages/common/table_meta/src/meta/v2/statistics.rs @@ -24,6 +24,7 @@ use databend_common_expression::TableField; use databend_common_expression::converts::datavalues::from_scalar; use databend_common_expression::converts::meta::IndexScalar; use databend_common_expression::types::DataType; +use databend_common_expression::types::F32; use databend_common_frozen_api::FrozenAPI; use log::info; use serde::de::Error; @@ -89,6 +90,31 @@ pub struct SpatialStatistics { pub is_valid: bool, } +#[derive( + serde::Serialize, + serde::Deserialize, + Debug, + Clone, + Copy, + PartialEq, + Eq, + PartialOrd, + Ord, + Hash, + FrozenAPI, +)] +pub enum VectorDistanceType { + L1, + L2, + Dot, +} + +#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq, FrozenAPI)] +pub struct VectorColumnStatistics { + pub centroid: Vec, + pub radius: F32, +} + #[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq, Eq, Default, FrozenAPI)] pub struct AdditionalStatsMeta { /// The size of the stats data in bytes. diff --git a/src/query/storages/common/table_meta/src/meta/v3/frozen/block_meta.rs b/src/query/storages/common/table_meta/src/meta/v3/frozen/block_meta.rs index f75701cfdccb2..f73a26d3a8b5f 100644 --- a/src/query/storages/common/table_meta/src/meta/v3/frozen/block_meta.rs +++ b/src/query/storages/common/table_meta/src/meta/v3/frozen/block_meta.rs @@ -68,6 +68,7 @@ impl From for crate::meta::BlockMeta { spatial_index_size: None, spatial_index_location: None, spatial_stats: None, + vector_stats: None, virtual_block_meta: None, compression: value.compression.into(), create_on: None, diff --git a/src/query/storages/fuse/Cargo.toml b/src/query/storages/fuse/Cargo.toml index 0c6d44822b9f0..93609f06e4a7a 100644 --- a/src/query/storages/fuse/Cargo.toml +++ b/src/query/storages/fuse/Cargo.toml @@ -27,6 +27,7 @@ databend-common-statistics = { workspace = true } databend-common-storage = { workspace = true } databend-common-tracing = { workspace = true } databend-common-users = { workspace = true } +databend-common-vector = { workspace = true } databend-enterprise-fail-safe = { workspace = true } databend-enterprise-vacuum-handler = { workspace = true } databend-meta-client = { workspace = true } diff --git a/src/query/storages/fuse/src/io/write/block_writer.rs b/src/query/storages/fuse/src/io/write/block_writer.rs index d79f51815559a..3670422b22444 100644 --- a/src/query/storages/fuse/src/io/write/block_writer.rs +++ b/src/query/storages/fuse/src/io/write/block_writer.rs @@ -199,14 +199,19 @@ impl BlockBuilder { )?; inverted_index_states.push(inverted_index_state); } - let vector_index_state = if let Some(ref vector_index_builder) = self.vector_index_builder { + let (vector_index_state, vector_stats) = if let Some(ref vector_index_builder) = + self.vector_index_builder + { let vector_index_location = self.meta_locations.block_vector_index_location(); let mut vector_index_builder = vector_index_builder.clone(); vector_index_builder.add_block(&data_block)?; - let vector_index_state = vector_index_builder.finalize(&vector_index_location)?; - Some(vector_index_state) + let vector_index_state = vector_index_builder.finalize_block(&vector_index_location)?; + ( + vector_index_state.index_state, + vector_index_state.vector_stats, + ) } else { - None + (None, None) }; let (spatial_index_state, spatial_stats) = @@ -277,6 +282,7 @@ impl BlockBuilder { spatial_index_size: spatial_index_state.as_ref().map(|v| v.size), spatial_index_location: spatial_index_state.as_ref().map(|v| v.location.clone()), spatial_stats, + vector_stats, compression: self.write_settings.table_compression.into(), inverted_index_size, virtual_block_meta: None, diff --git a/src/query/storages/fuse/src/io/write/stream/block_builder.rs b/src/query/storages/fuse/src/io/write/stream/block_builder.rs index 1bed3be29a1dc..f3703efef6fa4 100644 --- a/src/query/storages/fuse/src/io/write/stream/block_builder.rs +++ b/src/query/storages/fuse/src/io/write/stream/block_builder.rs @@ -357,15 +357,20 @@ impl StreamBlockBuilder { } else { None }; - let vector_index_state = - if let Some(ref mut vector_index_builder) = self.vector_index_builder { - let vector_index_location = - self.properties.meta_locations.block_vector_index_location(); - let vector_index_state = vector_index_builder.finalize(&vector_index_location)?; - Some(vector_index_state) - } else { - None - }; + let (vector_index_state, vector_stats) = if let Some(ref mut vector_index_builder) = + self.vector_index_builder + { + let vector_index_location = + self.properties.meta_locations.block_vector_index_location(); + let vector_index_state = vector_index_builder.finalize_block(&vector_index_location)?; + ( + vector_index_state.index_state, + vector_index_state.vector_stats, + ) + } else { + (None, None) + }; + let vector_index_size = vector_index_state.as_ref().map(|v| v.size); let vector_index_location = vector_index_state.as_ref().map(|v| v.location.clone()); @@ -417,6 +422,7 @@ impl StreamBlockBuilder { spatial_index_size, spatial_index_location, spatial_stats, + vector_stats, create_on: Some(Utc::now()), ngram_filter_index_size: bloom_index_state .as_ref() diff --git a/src/query/storages/fuse/src/io/write/stream/cluster_statistics.rs b/src/query/storages/fuse/src/io/write/stream/cluster_statistics.rs index e51670c66269e..fbf696895d50a 100644 --- a/src/query/storages/fuse/src/io/write/stream/cluster_statistics.rs +++ b/src/query/storages/fuse/src/io/write/stream/cluster_statistics.rs @@ -18,18 +18,14 @@ use databend_common_catalog::table::Table; use databend_common_catalog::table_context::TableContext; use databend_common_exception::Result; use databend_common_expression::Column; -use databend_common_expression::ColumnRef; use databend_common_expression::DataBlock; use databend_common_expression::DataSchema; -use databend_common_expression::Expr; use databend_common_expression::FunctionContext; use databend_common_expression::Scalar; use databend_common_expression::TableSchemaRef; -use databend_common_functions::BUILTIN_FUNCTIONS; use databend_common_functions::aggregates::eval_aggr; use databend_common_sql::evaluator::BlockOperator; use databend_storages_common_table_meta::meta::ClusterStatistics; -use databend_storages_common_table_meta::table::ClusterType; use crate::FuseTable; @@ -37,6 +33,7 @@ use crate::FuseTable; pub struct ClusterStatisticsBuilder { cluster_key_id: u32, cluster_key_index: Vec, + vector_cluster_id_offset: Option, extra_key_num: usize, operators: Vec, @@ -49,49 +46,35 @@ impl ClusterStatisticsBuilder { ctx: Arc, source_schema: &TableSchemaRef, ) -> Result> { - let cluster_type = table.cluster_type(); - if cluster_type.is_none_or(|v| v == ClusterType::Hilbert) { - return Ok(Default::default()); - } - let input_schema: Arc = DataSchema::from(source_schema).into(); - let input_field_len = input_schema.fields.len(); - - let cluster_keys = table.linear_cluster_keys(ctx.clone()); - let mut cluster_key_index = Vec::with_capacity(cluster_keys.len()); - let mut extra_key_num = 0; - - let mut exprs = Vec::with_capacity(cluster_keys.len()); - for remote_expr in &cluster_keys { - let expr = remote_expr - .as_expr(&BUILTIN_FUNCTIONS) - .project_column_ref(|name| input_schema.index_of(name))?; - let index = match &expr { - Expr::ColumnRef(ColumnRef { id, .. }) => *id, - _ => { - exprs.push(expr); - let offset = input_field_len + extra_key_num; - extra_key_num += 1; - offset - } - }; - cluster_key_index.push(index); - } - let operators = if exprs.is_empty() { - vec![] - } else { - vec![BlockOperator::Map { - exprs, - projections: None, - }] - }; + let cluster_stats_gen = table.get_cluster_stats_gen( + ctx.clone(), + 0, + table.get_block_thresholds(), + input_schema, + )?; + let vector_cluster_id_offset = cluster_stats_gen + .vector_operator + .as_ref() + .map(|vector_operator| vector_operator.vector_cluster_id_offset); + let extra_key_num = cluster_stats_gen.operator_extra_key_num(); + let cluster_key_index = cluster_stats_gen + .cluster_key_index + .into_iter() + .filter(|index| Some(*index) != vector_cluster_id_offset) + .collect::>(); + + if cluster_key_index.is_empty() && vector_cluster_id_offset.is_none() { + return Ok(Default::default()); + } Ok(Arc::new(Self { cluster_key_id: table.cluster_key_id().unwrap(), cluster_key_index, + vector_cluster_id_offset, extra_key_num, - func_ctx: ctx.get_function_context()?, - operators, + func_ctx: cluster_stats_gen.func_ctx, + operators: cluster_stats_gen.operators, })) } } @@ -115,7 +98,7 @@ impl ClusterStatisticsState { } pub fn add_block(&mut self, input: DataBlock) -> Result { - if self.builder.cluster_key_index.is_empty() { + if !self.has_cluster_key() { return Ok(input); } @@ -125,28 +108,19 @@ impl ClusterStatisticsState { .operators .iter() .try_fold(input, |input, op| op.execute(&self.builder.func_ctx, input))?; - let cols = self - .builder - .cluster_key_index - .iter() - .map(|&i| block.get_by_offset(i).to_column()) - .collect(); - let entries = [Column::Tuple(cols).into()]; - let (min, _) = eval_aggr("min", vec![], &entries, num_rows, vec![])?; - let (max, _) = eval_aggr("max", vec![], &entries, num_rows, vec![])?; - assert_eq!(min.len(), 1); - assert_eq!(max.len(), 1); - self.mins.push(min.index(0).unwrap().to_owned()); - self.maxs.push(max.index(0).unwrap().to_owned()); + + let (min, max) = self.scalar_cluster_min_max(&block, num_rows)?; + self.mins.push(min); + self.maxs.push(max); + block.pop_columns(self.builder.extra_key_num); Ok(block) } pub fn finalize(self, perfect: bool) -> Result> { - if self.builder.cluster_key_index.is_empty() { + if !self.has_cluster_key() { return Ok(None); } - let min = self .mins .into_iter() @@ -164,7 +138,7 @@ impl ClusterStatisticsState { .unwrap() .clone(); - let level = if min == max && perfect { + let level = if self.builder.vector_cluster_id_offset.is_none() && min == max && perfect { -1 } else { self.level @@ -178,4 +152,34 @@ impl ClusterStatisticsState { pages: None, })) } + + fn has_cluster_key(&self) -> bool { + !self.builder.cluster_key_index.is_empty() + || self.builder.vector_cluster_id_offset.is_some() + } + + fn scalar_cluster_min_max( + &self, + block: &DataBlock, + num_rows: usize, + ) -> Result<(Scalar, Scalar)> { + if self.builder.cluster_key_index.is_empty() { + return Ok((Scalar::Tuple(vec![]), Scalar::Tuple(vec![]))); + } + let cols = self + .builder + .cluster_key_index + .iter() + .map(|&i| block.get_by_offset(i).to_column()) + .collect(); + let entries = [Column::Tuple(cols).into()]; + let (min, _) = eval_aggr("min", vec![], &entries, num_rows, vec![])?; + let (max, _) = eval_aggr("max", vec![], &entries, num_rows, vec![])?; + assert_eq!(min.len(), 1); + assert_eq!(max.len(), 1); + Ok(( + min.index(0).unwrap().to_owned(), + max.index(0).unwrap().to_owned(), + )) + } } diff --git a/src/query/storages/fuse/src/io/write/vector_index_writer.rs b/src/query/storages/fuse/src/io/write/vector_index_writer.rs index ca372d66c2c2d..2d9a59b006ce6 100644 --- a/src/query/storages/fuse/src/io/write/vector_index_writer.rs +++ b/src/query/storages/fuse/src/io/write/vector_index_writer.rs @@ -26,6 +26,8 @@ use databend_common_expression::TableDataType; use databend_common_expression::TableField; use databend_common_expression::TableSchemaRef; use databend_common_expression::TableSchemaRefExt; +use databend_common_expression::types::F32; +use databend_common_expression::types::VectorColumn; use databend_common_io::constants::DEFAULT_BLOCK_INDEX_BUFFER_SIZE; use databend_common_meta_app::schema::TableIndex; use databend_common_meta_app::schema::TableIndexType; @@ -33,9 +35,15 @@ use databend_common_metrics::storage::metrics_inc_block_vector_index_generate_mi use databend_storages_common_blocks::blocks_to_parquet; use databend_storages_common_index::DistanceType; use databend_storages_common_index::HNSWIndex; +use databend_storages_common_index::normalize_vector; +use databend_storages_common_index::preprocess_vector; +use databend_storages_common_index::vector_stat_distance; +use databend_storages_common_index::vector_statistics_distance_type; use databend_storages_common_io::ReadSettings; use databend_storages_common_table_meta::meta::Location; use databend_storages_common_table_meta::meta::SingleColumnMeta; +use databend_storages_common_table_meta::meta::StatisticsOfVectorColumns; +use databend_storages_common_table_meta::meta::VectorColumnStatistics; use databend_storages_common_table_meta::table::TableCompression; use log::debug; use log::info; @@ -61,19 +69,25 @@ struct VectorIndexParam { m: usize, ef_construct: usize, distances: Vec, + field_offsets: Vec<(usize, ColumnId)>, } #[derive(Clone)] pub struct VectorIndexBuilder { // Parameters for each vector index index_params: Vec, - field_offsets: Vec>, field_offsets_set: HashSet, + statistics_params: BTreeMap, // Collected vector columns columns: BTreeMap>, } +pub(crate) struct VectorIndexBuildState { + pub(crate) index_state: Option, + pub(crate) vector_stats: Option, +} + impl VectorIndexBuilder { pub fn try_create( table_indexes: &BTreeMap, @@ -81,8 +95,8 @@ impl VectorIndexBuilder { is_sync: bool, ) -> Option { let mut index_params = Vec::with_capacity(table_indexes.len()); - let mut field_offsets = Vec::with_capacity(table_indexes.len()); let mut field_offsets_set = HashSet::new(); + let mut statistics_params = BTreeMap::new(); for index in table_indexes.values() { if !matches!(index.index_type, TableIndexType::Vector) { @@ -109,10 +123,6 @@ impl VectorIndexBuilder { ); continue; } - for (offset, _) in &offsets { - field_offsets_set.insert(*offset); - } - field_offsets.push(offsets); // Parse index parameters let m = match index.options.get("m") { @@ -148,6 +158,16 @@ impl VectorIndexBuilder { ); continue; } + for (offset, column_id) in &offsets { + field_offsets_set.insert(*offset); + // Vector statistics currently use only the first configured distance type + // to avoid scanning the same vectors multiple times during block writing. + // If stat-based pruning needs full coverage for multi-distance indexes, + // extend statistics_params to keep all distance types per column. + statistics_params + .entry(*offset) + .or_insert((*column_id, distances[0])); + } info!( "Added vector index parameters for {}: m={}, ef_construct={}, distances={:?}", index.name, m, ef_construct, distances @@ -158,6 +178,7 @@ impl VectorIndexBuilder { m, ef_construct, distances, + field_offsets: offsets, }; index_params.push(index_param); } @@ -167,11 +188,11 @@ impl VectorIndexBuilder { columns.insert(*offset, vec![]); } - if !field_offsets.is_empty() { + if !field_offsets_set.is_empty() { Some(VectorIndexBuilder { index_params, - field_offsets, field_offsets_set, + statistics_params, columns, }) } else { @@ -197,7 +218,8 @@ impl VectorIndexBuilder { let start = Instant::now(); info!("Start build vector HNSW index for location: {}", location.0); - let result = self.build_vector_index()?; + let concated_columns = self.take_concated_columns()?; + let result = self.build_vector_index(&concated_columns)?; let VectorIndexResult { index_fields, index_columns, @@ -239,8 +261,25 @@ impl VectorIndexBuilder { Ok(state) } + pub(crate) fn finalize_block(&mut self, location: &Location) -> Result { + let concated_columns = self.take_concated_columns()?; + let vector_stats = self.build_vector_statistics(&concated_columns)?; + if self.index_params.is_empty() { + return Ok(VectorIndexBuildState { + index_state: None, + vector_stats, + }); + } + + let index_state = self.finalize_with_columns(location, &concated_columns)?; + Ok(VectorIndexBuildState { + index_state: Some(index_state), + vector_stats, + }) + } + #[async_backtrace::framed] - pub async fn finalize_with_existing( + pub(crate) async fn finalize_with_existing( &mut self, operator: Operator, settings: &ReadSettings, @@ -248,10 +287,10 @@ impl VectorIndexBuilder { existing_location: Option<&Location>, existing_column_metas: Option>, existing_index_meta: Option>, - ) -> Result { + ) -> Result { // If there's no existing vector index, just use the regular finalize method if existing_location.is_none() || existing_column_metas.is_none() { - return self.finalize(location); + return self.finalize_block(location); } // Process new vector index data @@ -283,7 +322,9 @@ impl VectorIndexBuilder { start.elapsed().as_millis() as u64 ); - let result = self.build_vector_index()?; + let concated_columns = self.take_concated_columns()?; + let vector_stats = self.build_vector_statistics(&concated_columns)?; + let result = self.build_vector_index(&concated_columns)?; let VectorIndexResult { mut index_fields, mut index_columns, @@ -342,10 +383,60 @@ impl VectorIndexBuilder { location.0, size, elapsed_ms ); + Ok(VectorIndexBuildState { + index_state: Some(state), + vector_stats, + }) + } + + fn finalize_with_columns( + &self, + location: &Location, + concated_columns: &BTreeMap, + ) -> Result { + let start = Instant::now(); + info!("Start build vector HNSW index for location: {}", location.0); + + let result = self.build_vector_index(concated_columns)?; + let VectorIndexResult { + index_fields, + index_columns, + metadata, + } = result; + + let index_schema = TableSchemaRefExt::create(index_fields); + let index_block = DataBlock::new(index_columns, 1); + + let mut data = Vec::with_capacity(DEFAULT_BLOCK_INDEX_BUFFER_SIZE); + let _ = blocks_to_parquet( + index_schema.as_ref(), + vec![index_block], + &mut data, + TableCompression::Zstd, + false, + Some(metadata), + )?; + + let size = data.len() as u64; + let state = VectorIndexState { + location: location.clone(), + size, + data, + }; + + let elapsed_ms = start.elapsed().as_millis() as u64; + { + metrics_inc_block_vector_index_generate_milliseconds(elapsed_ms); + } + info!( + "Finish build vector HNSW index: location={}, size={} bytes in {} ms", + location.0, size, elapsed_ms + ); + Ok(state) } - fn build_vector_index(&mut self) -> Result { + fn take_concated_columns(&mut self) -> Result> { let mut columns = BTreeMap::new(); for offset in &self.field_offsets_set { columns.insert(*offset, vec![]); @@ -358,13 +449,20 @@ impl VectorIndexBuilder { concated_columns.insert(offset, concated_column); } + Ok(concated_columns) + } + + fn build_vector_index( + &self, + concated_columns: &BTreeMap, + ) -> Result { let mut index_fields = Vec::new(); let mut index_columns = Vec::new(); let mut metadata = Vec::with_capacity(self.index_params.len()); - for (field_offsets, index_param) in self.field_offsets.iter().zip(&self.index_params) { + for index_param in &self.index_params { debug!("Building HNSW index for {}", index_param.index_name); - for (offset, column_id) in field_offsets { + for (offset, column_id) in &index_param.field_offsets { let Some(column) = concated_columns.get(offset) else { return Err(ErrorCode::Internal("Can't find vector column")); }; @@ -394,6 +492,28 @@ impl VectorIndexBuilder { }; Ok(result) } + + fn build_vector_statistics( + &self, + concated_columns: &BTreeMap, + ) -> Result> { + if self.statistics_params.is_empty() { + return Ok(None); + } + + let mut statistics = StatisticsOfVectorColumns::new(); + for (offset, (column_id, distance_type)) in &self.statistics_params { + let Some(column) = concated_columns.get(offset) else { + return Err(ErrorCode::Internal("Can't find vector column")); + }; + let vector_distance_type = vector_statistics_distance_type(*distance_type); + if let Some(vector_stats) = vector_statistics_from_column(column, *distance_type)? { + statistics.insert((*column_id, vector_distance_type), vector_stats); + } + } + + Ok((!statistics.is_empty()).then_some(statistics)) + } } struct VectorIndexResult { @@ -401,3 +521,59 @@ struct VectorIndexResult { index_columns: Vec, metadata: Vec, } + +fn vector_statistics_from_column( + column: &Column, + distance_type: DistanceType, +) -> Result> { + let column = column.remove_nullable(); + match &column { + Column::Vector(VectorColumn::Float32((values, dimension))) => { + vector_statistics_from_vectors(values.as_slice(), *dimension, distance_type) + } + _ => Ok(None), + } +} + +fn vector_statistics_from_vectors( + values: &[F32], + dimension: usize, + distance_type: DistanceType, +) -> Result> { + if dimension == 0 || values.is_empty() || !values.len().is_multiple_of(dimension) { + return Ok(None); + } + + let rows = values.len() / dimension; + let mut centroid = vec![0.0_f64; dimension]; + for vector in values.chunks_exact(dimension) { + let vector = preprocess_vector(vector, distance_type); + for (idx, value) in vector.iter().enumerate() { + centroid[idx] += *value as f64; + } + } + + let mut centroid = centroid + .into_iter() + .map(|value| (value / rows as f64) as f32) + .collect::>(); + if matches!(distance_type, DistanceType::Dot) { + normalize_vector(&mut centroid); + } + + let mut radius = 0.0_f32; + let vector_distance_type = vector_statistics_distance_type(distance_type); + for vector in values.chunks_exact(dimension) { + let vector = preprocess_vector(vector, distance_type); + radius = radius.max(vector_stat_distance( + &vector, + ¢roid, + vector_distance_type, + )?); + } + + Ok(Some(VectorColumnStatistics { + centroid: centroid.into_iter().map(F32::from).collect(), + radius: F32::from(radius), + })) +} diff --git a/src/query/storages/fuse/src/operations/append.rs b/src/query/storages/fuse/src/operations/append.rs index b24997bfc9438..9dab79c7c5dde 100644 --- a/src/query/storages/fuse/src/operations/append.rs +++ b/src/query/storages/fuse/src/operations/append.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use databend_common_catalog::table::Table; use databend_common_catalog::table_context::TableContext; +use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::BlockThresholds; use databend_common_expression::ColumnRef; @@ -24,9 +25,12 @@ use databend_common_expression::DataField; use databend_common_expression::DataSchema; use databend_common_expression::Expr; use databend_common_expression::LimitType; -use databend_common_expression::SortColumnDescription; +use databend_common_expression::types::DataType; +use databend_common_expression::types::NumberDataType; use databend_common_functions::BUILTIN_FUNCTIONS; use databend_common_pipeline::core::Pipeline; +use databend_common_pipeline::core::ProcessorPtr; +use databend_common_pipeline_transforms::AccumulatingTransformer; use databend_common_pipeline_transforms::TransformPipelineHelper; use databend_common_pipeline_transforms::blocks::CompoundBlockOperator; use databend_common_pipeline_transforms::build_compact_block_pipeline; @@ -42,7 +46,10 @@ use crate::io::StreamBlockProperties; use crate::operations::TransformBlockBuilder; use crate::operations::TransformBlockWriter; use crate::operations::TransformSerializeBlock; +use crate::operations::TransformVectorClusterKmeans; use crate::statistics::ClusterStatsGenerator; +use crate::statistics::VectorClusterOperator; +use crate::statistics::vector_cluster_info_from_column; impl FuseTable { pub fn do_append_data( @@ -99,8 +106,9 @@ impl FuseTable { transform_len: usize, need_match: bool, ) -> Result { + let input_schema = DataSchema::from(self.schema_with_stream()).into(); let cluster_stats_gen = - self.get_cluster_stats_gen(ctx.clone(), 0, block_thresholds, None)?; + self.get_cluster_stats_gen(ctx.clone(), 0, block_thresholds, input_schema)?; let operators = cluster_stats_gen.operators.clone(); if !operators.is_empty() { @@ -122,18 +130,30 @@ impl FuseTable { pipeline.add_pipe(builder.finalize()); } - let cluster_keys = &cluster_stats_gen.cluster_key_index; - if !cluster_keys.is_empty() { - let sort_desc: Vec = cluster_keys - .iter() - .map(|index| SortColumnDescription { - offset: *index, - asc: true, - nulls_first: false, - }) - .collect(); - let sort_desc: Arc<[_]> = sort_desc.into(); + if let Some(vector_operator) = cluster_stats_gen.vector_operator.clone() { + let rows_per_block = block_thresholds.max_rows_per_block; + let mut builder = pipeline.add_transform_with_specified_len( + move |input, output| { + Ok(ProcessorPtr::create(AccumulatingTransformer::create( + input, + output, + TransformVectorClusterKmeans::new( + vector_operator.vector_column_input_offset, + vector_operator.info.distance_type, + rows_per_block, + ), + ))) + }, + transform_len, + )?; + if need_match { + builder.add_items_prepend(vec![create_dummy_item()]); + } + pipeline.add_pipe(builder.finalize()); + } + let sort_desc: Arc<[_]> = cluster_stats_gen.sort_descs().into(); + if !sort_desc.is_empty() { let mut builder = pipeline.try_create_transform_pipeline_builder_with_len( || { Ok(TransformSortPartial::new( @@ -158,8 +178,10 @@ impl FuseTable { block_thresholds: BlockThresholds, modified_schema: Option>, ) -> Result { + let input_schema = + modified_schema.unwrap_or(DataSchema::from(self.schema_with_stream()).into()); let cluster_stats_gen = - self.get_cluster_stats_gen(ctx.clone(), 0, block_thresholds, modified_schema)?; + self.get_cluster_stats_gen(ctx.clone(), 0, block_thresholds, input_schema)?; let operators = cluster_stats_gen.operators.clone(); if !operators.is_empty() { @@ -171,19 +193,23 @@ impl FuseTable { }); } - let cluster_keys = &cluster_stats_gen.cluster_key_index; - if !cluster_keys.is_empty() { - let sort_desc: Vec = cluster_keys - .iter() - .map(|index| SortColumnDescription { - offset: *index, - asc: true, - nulls_first: false, - }) - .collect(); - let sort_desc: Arc<[_]> = sort_desc.into(); - pipeline - .add_transformer(|| TransformSortPartial::new(LimitType::None, sort_desc.clone())); + if let Some(vector_operator) = cluster_stats_gen.vector_operator.clone() { + let rows_per_block = block_thresholds.max_rows_per_block; + pipeline.add_accumulating_transformer(move || { + TransformVectorClusterKmeans::new( + vector_operator.vector_column_input_offset, + vector_operator.info.distance_type, + rows_per_block, + ) + }); + } + + let sort_desc: Arc<[_]> = cluster_stats_gen.sort_descs().into(); + if !sort_desc.is_empty() { + pipeline.add_transformer({ + let sort_desc = sort_desc.clone(); + move || TransformSortPartial::new(LimitType::None, sort_desc.clone()) + }); } Ok(cluster_stats_gen) } @@ -193,15 +219,13 @@ impl FuseTable { ctx: Arc, level: i32, block_thresholds: BlockThresholds, - modified_schema: Option>, + input_schema: Arc, ) -> Result { let cluster_type = self.cluster_type(); if cluster_type.is_none_or(|v| v == ClusterType::Hilbert) { return Ok(ClusterStatsGenerator::default()); } - let input_schema = - modified_schema.unwrap_or(DataSchema::from(self.schema_with_stream()).into()); let mut merged = input_schema.fields().clone(); let cluster_keys = self.linear_cluster_keys(ctx.clone()); @@ -209,11 +233,36 @@ impl FuseTable { let mut extra_key_num = 0; let mut exprs = Vec::with_capacity(cluster_keys.len()); + let mut vector_cluster_info = None; + let mut vector_column_input_offset = None; - for remote_expr in &cluster_keys { + for (key_index, remote_expr) in cluster_keys.iter().enumerate() { let expr = remote_expr .as_expr(&BUILTIN_FUNCTIONS) .project_column_ref(|name| input_schema.index_of(name))?; + if matches!(expr.data_type().remove_nullable(), DataType::Vector(_)) { + let Expr::ColumnRef(ColumnRef { id, .. }) = &expr else { + return Err(ErrorCode::InvalidClusterKeys( + "Vector cluster key only supports direct column reference", + )); + }; + if vector_cluster_info.is_some() { + return Err(ErrorCode::InvalidClusterKeys( + "Only one vector column is supported in cluster by", + )); + } + let input_field = input_schema.field(*id); + let schema = self.schema(); + let field = schema.field_with_name(input_field.name())?; + let vector_info = vector_cluster_info_from_column( + &self.table_info.meta.indexes, + key_index, + field.column_id(), + field.name(), + )?; + vector_column_input_offset = Some(*id); + vector_cluster_info = Some(vector_info); + } let index = match &expr { Expr::ColumnRef(ColumnRef { id, .. }) => *id, _ => { @@ -237,6 +286,28 @@ impl FuseTable { projections: None, }] }; + let mut vector_operator = None; + if let Some(vector_info) = vector_cluster_info { + if vector_info.key_index < cluster_key_index.len() { + if let Some(vector_column_input_offset) = vector_column_input_offset { + let cluster_id_offset = merged.len(); + merged.push(DataField::new( + "_vector_cluster_id", + DataType::Number(NumberDataType::UInt64), + )); + extra_key_num += 1; + // Keep the original CLUSTER BY order. For CLUSTER BY (a, embedding, b), + // sorting should use (a, _vector_cluster_id, b), not append the vector + // cluster id after all scalar keys. + cluster_key_index[vector_info.key_index] = cluster_id_offset; + vector_operator = Some(VectorClusterOperator { + info: vector_info, + vector_column_input_offset, + vector_cluster_id_offset: cluster_id_offset, + }); + } + } + } Ok(ClusterStatsGenerator::new( self.cluster_key_id().unwrap(), @@ -246,6 +317,7 @@ impl FuseTable { level, block_thresholds, operators, + vector_operator, merged, ctx.get_function_context()?, )) diff --git a/src/query/storages/fuse/src/operations/common/processors/mod.rs b/src/query/storages/fuse/src/operations/common/processors/mod.rs index 6251dd5eebf78..78203370ef112 100644 --- a/src/query/storages/fuse/src/operations/common/processors/mod.rs +++ b/src/query/storages/fuse/src/operations/common/processors/mod.rs @@ -20,6 +20,7 @@ mod transform_merge_commit_meta; mod transform_mutation_aggregator; mod transform_serialize_block; mod transform_serialize_segment; +mod transform_vector_cluster_kmeans; pub use multi_table_insert_commit::CommitMultiTableInsert; pub use sink_commit::CommitSink; @@ -31,3 +32,4 @@ pub use transform_mutation_aggregator::TableMutationAggregator; pub use transform_serialize_block::TransformSerializeBlock; pub use transform_serialize_segment::TransformSerializeSegment; pub use transform_serialize_segment::new_serialize_segment_processor; +pub use transform_vector_cluster_kmeans::TransformVectorClusterKmeans; diff --git a/src/query/storages/fuse/src/operations/common/processors/transform_mutation_aggregator.rs b/src/query/storages/fuse/src/operations/common/processors/transform_mutation_aggregator.rs index 27a19c3bc946c..db83240836e04 100644 --- a/src/query/storages/fuse/src/operations/common/processors/transform_mutation_aggregator.rs +++ b/src/query/storages/fuse/src/operations/common/processors/transform_mutation_aggregator.rs @@ -29,6 +29,7 @@ use databend_common_expression::DataBlock; use databend_common_expression::Expr; use databend_common_expression::TableSchemaRef; use databend_common_expression::VirtualDataSchema; +use databend_common_expression::types::DataType; use databend_common_pipeline_transforms::processors::AsyncAccumulatingTransform; use databend_common_sql::executor::physical_plans::MutationKind; use databend_common_sql::parse_cluster_keys; @@ -923,8 +924,13 @@ fn fill_missing_segment_cluster_stats( return; } + let scalar_cluster_key_exprs = cluster_key_exprs + .iter() + .filter(|expr| !matches!(expr.data_type().remove_nullable(), DataType::Vector(_))) + .cloned() + .collect::>(); let (min, max) = get_min_max_stats( - cluster_key_exprs, + &scalar_cluster_key_exprs, &summary.col_stats, None, Some(cluster_key_id), diff --git a/src/query/storages/fuse/src/operations/common/processors/transform_vector_cluster_kmeans.rs b/src/query/storages/fuse/src/operations/common/processors/transform_vector_cluster_kmeans.rs new file mode 100644 index 0000000000000..6d1d2b7b73cf4 --- /dev/null +++ b/src/query/storages/fuse/src/operations/common/processors/transform_vector_cluster_kmeans.rs @@ -0,0 +1,392 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use databend_common_exception::ErrorCode; +use databend_common_exception::Result; +use databend_common_expression::Column; +use databend_common_expression::DataBlock; +use databend_common_expression::FromData; +use databend_common_expression::types::UInt64Type; +use databend_common_expression::types::VectorColumn; +use databend_common_pipeline_transforms::AccumulatingTransform; +use databend_storages_common_index::normalize_vector; +use databend_storages_common_index::vector_distance; +use databend_storages_common_table_meta::meta::VectorDistanceType; + +const KMEANS_MAX_ITER: usize = 100; +const KMEANS_SEED: u64 = 0xD47A_BA5E_C1A5_7E12; +const KMEANS_TOLERANCE: f32 = 1e-4; +// Kmeans is computed per bounded batch so the transform can stream instead of +// buffering the whole INSERT/RECLUSTER input. Cluster ids are batch-local. +const KMEANS_BATCH_CLUSTER_COUNT: usize = 64; +const KMEANS_MAX_BATCH_ROWS: usize = 262_144; + +struct KMeansResult { + assignments: Vec, +} + +struct SmallRng { + state: u64, +} + +impl SmallRng { + fn new(seed: u64) -> Self { + Self { state: seed.max(1) } + } + + fn next_u64(&mut self) -> u64 { + self.state = self + .state + .wrapping_mul(6364136223846793005) + .wrapping_add(1442695040888963407); + self.state + } + + fn next_f32(&mut self) -> f32 { + let value = self.next_u64() >> 40; + (value as f32) / ((1u64 << 24) as f32) + } + + fn gen_range(&mut self, upper: usize) -> usize { + (self.next_u64() as usize) % upper + } +} + +pub struct TransformVectorClusterKmeans { + vector_column_input_offset: usize, + distance_type: VectorDistanceType, + rows_per_block: usize, + batch_rows: usize, + pending_blocks: Vec, + pending_rows: usize, +} + +impl TransformVectorClusterKmeans { + pub fn new( + vector_column_input_offset: usize, + distance_type: VectorDistanceType, + rows_per_block: usize, + ) -> Self { + let rows_per_block = rows_per_block.max(1); + Self { + vector_column_input_offset, + distance_type, + rows_per_block, + batch_rows: kmeans_batch_rows(rows_per_block), + pending_blocks: vec![], + pending_rows: 0, + } + } + + fn cluster_blocks(&mut self) -> Result> { + if self.pending_blocks.is_empty() { + return Ok(vec![]); + } + + let mut block = if self.pending_blocks.len() == 1 { + self.pending_rows = 0; + self.pending_blocks.pop().unwrap() + } else { + let blocks = std::mem::take(&mut self.pending_blocks); + self.pending_rows = 0; + DataBlock::concat(&blocks)? + }; + let num_rows = block.num_rows(); + if num_rows <= 1 { + append_cluster_id_column(&mut block, vec![0; num_rows]); + return Ok(vec![block]); + } + + let (samples, dimension) = vector_samples( + block + .get_by_offset(self.vector_column_input_offset) + .to_column(), + self.distance_type, + )?; + let k = num_rows.div_ceil(self.rows_per_block).clamp(1, num_rows); + if k <= 1 { + append_cluster_id_column(&mut block, vec![0; num_rows]); + return Ok(vec![block]); + } + + let result = kmeans( + &samples, + num_rows, + dimension, + k, + KMEANS_MAX_ITER, + KMEANS_TOLERANCE, + KMEANS_SEED, + self.distance_type, + )?; + + append_cluster_id_column(&mut block, result.assignments); + Ok(vec![block]) + } + + fn push_block(&mut self, data: DataBlock, output: &mut Vec) -> Result<()> { + let rows = data.num_rows(); + if rows == 0 { + let mut data = data; + append_cluster_id_column(&mut data, vec![]); + output.push(data); + return Ok(()); + } + + if rows > self.batch_rows { + output.extend(self.cluster_blocks()?); + let (blocks, remain) = data.split_by_rows(self.batch_rows); + for block in blocks { + self.pending_rows = block.num_rows(); + self.pending_blocks.push(block); + output.extend(self.cluster_blocks()?); + } + if let Some(remain) = remain { + self.pending_rows = remain.num_rows(); + self.pending_blocks.push(remain); + } + return Ok(()); + } + + if self.pending_rows > 0 && self.pending_rows.saturating_add(rows) > self.batch_rows { + output.extend(self.cluster_blocks()?); + } + + self.pending_rows += rows; + self.pending_blocks.push(data); + if self.pending_rows >= self.batch_rows { + output.extend(self.cluster_blocks()?); + } + + Ok(()) + } +} + +impl AccumulatingTransform for TransformVectorClusterKmeans { + const NAME: &'static str = "TransformVectorClusterKmeans"; + + fn transform(&mut self, data: DataBlock) -> Result> { + let mut output = Vec::new(); + self.push_block(data, &mut output)?; + Ok(output) + } + + fn on_finish(&mut self, output: bool) -> Result> { + if output { + self.cluster_blocks() + } else { + self.pending_blocks.clear(); + self.pending_rows = 0; + Ok(vec![]) + } + } +} + +fn kmeans_batch_rows(rows_per_block: usize) -> usize { + rows_per_block + .saturating_mul(KMEANS_BATCH_CLUSTER_COUNT) + .min(KMEANS_MAX_BATCH_ROWS) + .max(rows_per_block) + .max(1) +} + +fn vector_samples(column: Column, distance_type: VectorDistanceType) -> Result<(Vec, usize)> { + let column = column.remove_nullable(); + let Column::Vector(VectorColumn::Float32((values, dimension))) = column else { + return Err(ErrorCode::InvalidClusterKeys( + "Vector cluster key only supports float32 vector column", + )); + }; + if dimension == 0 || values.len() % dimension != 0 { + return Err(ErrorCode::InvalidClusterKeys( + "Vector cluster key has invalid vector dimension", + )); + } + + let mut samples = values.iter().map(|value| value.0).collect::>(); + if matches!(&distance_type, VectorDistanceType::Dot) { + for vector in samples.chunks_exact_mut(dimension) { + normalize_vector(vector); + } + } + + Ok((samples, dimension)) +} + +fn append_cluster_id_column(block: &mut DataBlock, assignments: Vec) { + debug_assert_eq!(block.num_rows(), assignments.len()); + let cluster_ids = assignments + .into_iter() + .map(|cluster_id| cluster_id as u64) + .collect::>(); + block.add_column(UInt64Type::from_data(cluster_ids)); +} + +fn kmeans( + data: &[f32], + rows: usize, + dim: usize, + k: usize, + max_iter: usize, + tolerance: f32, + seed: u64, + distance_type: VectorDistanceType, +) -> Result { + if rows == 0 || dim == 0 || k == 0 || k > rows || data.len() != rows * dim { + return Err(ErrorCode::InvalidClusterKeys( + "Invalid kmeans input for vector cluster key", + )); + } + + let mut rng = SmallRng::new(seed ^ 0x9e3779b97f4a7c15); + let mut centroids = init_kmeans_plus_plus(data, rows, dim, k, seed, distance_type)?; + let mut assignments = vec![usize::MAX; rows]; + let mut distances = vec![0.0; rows]; + + for _ in 0..max_iter { + let mut changed = false; + for row_idx in 0..rows { + let (cluster, distance) = + nearest_centroid(row(data, row_idx, dim), ¢roids, k, dim, distance_type)?; + if assignments[row_idx] != cluster { + changed = true; + assignments[row_idx] = cluster; + } + distances[row_idx] = distance; + } + + let mut next_centroids = vec![0.0; k * dim]; + let mut counts = vec![0usize; k]; + for (row_idx, cluster) in assignments.iter().enumerate().take(rows) { + counts[*cluster] += 1; + let point = row(data, row_idx, dim); + let centroid = row_mut(&mut next_centroids, *cluster, dim); + for (dst, src) in centroid.iter_mut().zip(point.iter()) { + *dst += *src; + } + } + + for (cluster, count) in counts.iter().enumerate().take(k) { + if *count == 0 { + let farthest = distances + .iter() + .enumerate() + .max_by(|(_, left), (_, right)| { + left.partial_cmp(right).unwrap_or(std::cmp::Ordering::Equal) + }) + .map(|(idx, _)| idx) + .unwrap_or_else(|| rng.gen_range(rows)); + row_mut(&mut next_centroids, cluster, dim) + .copy_from_slice(row(data, farthest, dim)); + continue; + } + + let inv_count = 1.0 / counts[cluster] as f32; + for value in row_mut(&mut next_centroids, cluster, dim) { + *value *= inv_count; + } + if matches!(&distance_type, VectorDistanceType::Dot) { + normalize_vector(row_mut(&mut next_centroids, cluster, dim)); + } + } + + let mut shift = 0.0; + for cluster in 0..k { + shift += vector_distance( + row(¢roids, cluster, dim), + row(&next_centroids, cluster, dim), + VectorDistanceType::L2, + )?; + } + centroids = next_centroids; + + if !changed || shift <= tolerance { + break; + } + } + + Ok(KMeansResult { assignments }) +} + +fn init_kmeans_plus_plus( + data: &[f32], + rows: usize, + dim: usize, + k: usize, + seed: u64, + distance_type: VectorDistanceType, +) -> Result> { + let mut rng = SmallRng::new(seed); + let mut centroids = vec![0.0; k * dim]; + let first = rng.gen_range(rows); + row_mut(&mut centroids, 0, dim).copy_from_slice(row(data, first, dim)); + + let mut min_distances = vec![f32::INFINITY; rows]; + for cluster in 1..k { + let last_centroid = row(¢roids, cluster - 1, dim); + let mut total = 0.0; + for (row_idx, min_distance) in min_distances.iter_mut().enumerate().take(rows) { + let distance = vector_distance(row(data, row_idx, dim), last_centroid, distance_type)?; + if distance < *min_distance { + *min_distance = distance; + } + total += *min_distance; + } + + let chosen = if total <= f32::EPSILON || !total.is_finite() { + rng.gen_range(rows) + } else { + let mut threshold = rng.next_f32() * total; + let mut picked = rows - 1; + for (idx, distance) in min_distances.iter().enumerate() { + threshold -= *distance; + if threshold <= 0.0 { + picked = idx; + break; + } + } + picked + }; + row_mut(&mut centroids, cluster, dim).copy_from_slice(row(data, chosen, dim)); + } + + Ok(centroids) +} + +fn nearest_centroid( + point: &[f32], + centroids: &[f32], + k: usize, + dim: usize, + distance_type: VectorDistanceType, +) -> Result<(usize, f32)> { + let mut best_idx = 0; + let mut best_dist = f32::INFINITY; + for cluster in 0..k { + let distance = vector_distance(point, row(centroids, cluster, dim), distance_type)?; + if distance < best_dist { + best_idx = cluster; + best_dist = distance; + } + } + Ok((best_idx, best_dist)) +} + +fn row(data: &[f32], idx: usize, dim: usize) -> &[f32] { + &data[idx * dim..(idx + 1) * dim] +} + +fn row_mut(data: &mut [f32], idx: usize, dim: usize) -> &mut [f32] { + &mut data[idx * dim..(idx + 1) * dim] +} diff --git a/src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs b/src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs index 2b4e99e8ad86a..68f562721977a 100644 --- a/src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs +++ b/src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs @@ -27,6 +27,7 @@ use databend_common_catalog::table_context::TableContext; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::BlockThresholds; +use databend_common_expression::ColumnRef; use databend_common_expression::Expr; use databend_common_expression::Scalar; use databend_common_expression::TableSchemaRef; @@ -35,6 +36,7 @@ use databend_common_expression::types::DataType; use databend_common_sql::parse_cluster_keys; use databend_common_storage::ColumnNodes; use databend_storages_common_cache::LoadParams; +use databend_storages_common_index::vector_spheres_overlap; use databend_storages_common_pruner::BlockMetaIndex; use databend_storages_common_table_meta::meta::BlockMeta; use databend_storages_common_table_meta::meta::ClusterStatistics; @@ -44,6 +46,7 @@ use databend_storages_common_table_meta::meta::RawBlockHLL; use databend_storages_common_table_meta::meta::Statistics; use databend_storages_common_table_meta::meta::StatisticsOfColumns; use databend_storages_common_table_meta::meta::TableSnapshot; +use databend_storages_common_table_meta::meta::VectorColumnStatistics; use fastrace::Span; use fastrace::func_path; use fastrace::future::FutureExt; @@ -64,8 +67,10 @@ use crate::SegmentLocation; use crate::io::MetaReaders; use crate::operations::ReclusterMode; use crate::operations::common::BlockMetaIndex as BlockIndex; +use crate::statistics::VectorClusterInfo; use crate::statistics::get_min_max_stats; use crate::statistics::reducers::merge_statistics_mut; +use crate::statistics::vector_cluster_info_from_column; /// Maximum recluster depth allowed when only two blocks remain. /// For two-block layouts, repeated reclustering beyond this level @@ -131,6 +136,11 @@ struct ReclusterBlock { stats: ClusterStatistics, } +struct VectorReclusterSegment { + segment: SelectedReclusterSegment, + block_metas: Vec>, +} + #[derive(Clone)] pub struct SelectedReclusterSegment { pub loc: SegmentLocation, @@ -164,6 +174,7 @@ pub struct ReclusterMutator { pub(crate) memory_threshold: usize, pub(crate) cluster_key_exprs: Vec>, pub(crate) cluster_key_types: Vec, + pub(crate) vector_cluster_info: Option, } impl ReclusterMutator { @@ -188,9 +199,11 @@ impl ReclusterMutator { // safe to unwrap let cluster_keys = table.resolve_cluster_keys().unwrap(); - let cluster_key_exprs = + let full_cluster_key_exprs = parse_cluster_keys(ctx.clone(), Arc::new(table.clone()), cluster_keys)?; - if cluster_key_exprs.is_empty() { + let vector_cluster_info = vector_cluster_info_from_exprs(table, &full_cluster_key_exprs)?; + let cluster_key_exprs = scalar_cluster_key_exprs(full_cluster_key_exprs); + if cluster_key_exprs.is_empty() && vector_cluster_info.is_none() { return Err(ErrorCode::Internal( "recluster requires non-empty cluster key expressions", )); @@ -211,6 +224,7 @@ impl ReclusterMutator { memory_threshold, cluster_key_exprs, cluster_key_types, + vector_cluster_info, }) } @@ -225,9 +239,11 @@ impl ReclusterMutator { block_thresholds: BlockThresholds, cluster_key_id: u32, max_tasks: usize, + vector_cluster_info: Option, ) -> Self { + let cluster_key_exprs = scalar_cluster_key_exprs(cluster_key_exprs); assert!( - !cluster_key_exprs.is_empty(), + !cluster_key_exprs.is_empty() || vector_cluster_info.is_some(), "recluster requires non-empty cluster key expressions" ); let cluster_key_types = cluster_key_exprs @@ -250,6 +266,7 @@ impl ReclusterMutator { memory_threshold, cluster_key_exprs, cluster_key_types, + vector_cluster_info, } } @@ -551,13 +568,17 @@ impl ReclusterMutator { let mut total_rows = 0; let mut total_bytes = 0; let mut total_compressed = 0; + let vector_cluster_info = self.vector_cluster_key(); + let has_scalar_cluster_key = !self.cluster_key_types.is_empty(); let mut points_map: HashMap, (Vec, Vec)> = HashMap::new(); for &i in indices.iter() { let block = &blocks[i]; let stats = &block.stats; - points_map.entry(stats.min().clone()).or_default().0.push(i); - points_map.entry(stats.max().clone()).or_default().1.push(i); + if has_scalar_cluster_key { + points_map.entry(stats.min().clone()).or_default().0.push(i); + points_map.entry(stats.max().clone()).or_default().1.push(i); + } total_rows += block.meta.row_count; total_bytes += block.meta.block_size; @@ -608,7 +629,32 @@ impl ReclusterMutator { average_depth, max_depth, max_point_overlap_count, - } = self.fetch_max_depth(points_map, self.depth_threshold, max_blocks_num)?; + } = match (vector_cluster_info, has_scalar_cluster_key) { + (Some(vector_cluster_info), true) => { + let scalar_selection = self.fetch_max_depth( + points_map, + &self.cluster_key_types, + self.depth_threshold, + max_blocks_num, + )?; + let vector_selection = self.fetch_max_vector_depth( + &indices, + blocks, + vector_cluster_info, + max_blocks_num, + )?; + merge_depth_selections(scalar_selection, vector_selection, max_blocks_num) + } + (Some(vector_cluster_info), false) => { + self.fetch_max_vector_depth(&indices, blocks, vector_cluster_info, max_blocks_num)? + } + (None, _) => self.fetch_max_depth( + points_map, + &self.cluster_key_types, + self.depth_threshold, + max_blocks_num, + )?, + }; let max_total_bytes = self.memory_threshold.saturating_mul(task_budget); // Keep the first, highest-depth blocks within the remaining execution budget. // This is a second-stage guard after candidate selection: the average @@ -813,6 +859,29 @@ impl ReclusterMutator { &self, compact_segments: &[(SegmentLocation, Arc)], max_len: usize, + ) -> Result>> { + // Segment selection follows the cluster key shape: + // - vector-only: use vector sphere overlap directly because there is no scalar range. + // - scalar-only: use scalar ClusterStatistics min/max overlap. + // - mixed: first build scalar windows, then refine each window by vector sphere overlap. + // This avoids running scalar overlap again during vector refinement. + let vector_cluster_info = self.vector_cluster_key(); + if vector_cluster_info.is_some() && self.cluster_key_types.is_empty() { + return self.select_vector_only_segments(compact_segments, max_len); + } + + let scalar_windows = self.select_scalar_segments(compact_segments, max_len)?; + if let Some(vector_cluster_info) = vector_cluster_info { + self.refine_scalar_windows_by_vector(scalar_windows, vector_cluster_info, max_len) + } else { + Ok(scalar_windows) + } + } + + fn select_scalar_segments( + &self, + compact_segments: &[(SegmentLocation, Arc)], + max_len: usize, ) -> Result>> { // Keep a bounded overlap between adjacent hot windows. Anchors carry // still-active segments forward so a long overlap range is not split @@ -996,7 +1065,7 @@ impl ReclusterMutator { // Convert index windows back to segment objects. Empty windows can be // produced only by an empty fallback group and are ignored here. - Ok(windows + let scalar_windows = windows .into_iter() .map(|(selected_indices, _)| { selected_indices @@ -1005,9 +1074,251 @@ impl ReclusterMutator { .collect::>() }) .filter(|window| !window.is_empty()) + .collect::>(); + + Ok(scalar_windows) + } + + fn select_vector_only_segments( + &self, + compact_segments: &[(SegmentLocation, Arc)], + max_len: usize, + ) -> Result>> { + let Some(vector_cluster_info) = self.vector_cluster_key() else { + return Ok(vec![]); + }; + let block_per_seg = self.block_thresholds.block_per_segment; + let max_len = max_len.max(1); + let mut total_blocks = 0; + let mut candidate_indices = IndexSet::new(); + let mut small_segments = IndexSet::new(); + let mut segments: Vec> = + Vec::with_capacity(compact_segments.len()); + segments.resize_with(compact_segments.len(), || None); + + for (i, (loc, compact_segment)) in compact_segments.iter().enumerate() { + let segment = + SelectedReclusterSegment::create(self, loc.clone(), compact_segment.clone()); + let level = segment.stats.level; + if level < 0 && compact_segment.summary.block_count as usize >= block_per_seg { + continue; + } + + let block_metas = compact_segment.block_metas()?; + let current_blocks_num = compact_segment.summary.block_count as usize; + if current_blocks_num < block_per_seg { + small_segments.insert(i); + } + total_blocks += current_blocks_num; + candidate_indices.insert(i); + segments[i] = Some(VectorReclusterSegment { + segment, + block_metas, + }); + } + + let mut windows = Vec::new(); + let mut seen_windows = HashSet::new(); + let mut covered_segments = IndexSet::new(); + + if candidate_indices.len() > 1 && total_blocks > block_per_seg { + let mut overlaps = vec![IndexSet::new(); compact_segments.len()]; + for idx in candidate_indices.iter().copied() { + overlaps[idx].insert(idx); + } + + let candidate_list = candidate_indices.iter().copied().collect::>(); + for left_pos in 0..candidate_list.len() { + for right_pos in left_pos + 1..candidate_list.len() { + let left_idx = candidate_list[left_pos]; + let right_idx = candidate_list[right_pos]; + let Some(left_segment) = segments[left_idx].as_ref() else { + continue; + }; + let Some(right_segment) = segments[right_idx].as_ref() else { + continue; + }; + + if vector_segment_spheres_overlap( + &left_segment.block_metas, + &right_segment.block_metas, + vector_cluster_info, + )? { + overlaps[left_idx].insert(right_idx); + overlaps[right_idx].insert(left_idx); + } + } + } + + let mut depth_order = candidate_indices + .iter() + .copied() + .map(|idx| (idx, overlaps[idx].len())) + .filter(|(_, depth)| *depth > 1) + .collect::>(); + depth_order.sort_by(|(left_idx, left_depth), (right_idx, right_depth)| { + right_depth + .cmp(left_depth) + .then_with(|| left_idx.cmp(right_idx)) + }); + + for (idx, depth) in depth_order { + let mut selected_indices = overlaps[idx].iter().copied().collect::>(); + selected_indices.sort_by(|left_idx, right_idx| { + overlaps[*right_idx] + .len() + .cmp(&overlaps[*left_idx].len()) + .then_with(|| left_idx.cmp(right_idx)) + }); + selected_indices.truncate(max_len); + + let mut window_key = selected_indices.clone(); + window_key.sort_unstable(); + if seen_windows.insert(window_key) { + covered_segments.extend(selected_indices.iter().copied()); + windows.push((selected_indices, depth)); + } + } + + debug!( + "recluster: vector segment selection overlap windows segments={} blocks={} window_count={} covered_segments={}", + candidate_indices.len(), + total_blocks, + windows.len(), + covered_segments.len(), + ); + } + + let max_threads = (self.ctx.get_settings().get_max_threads()? as usize).max(2); + let mut fallback_indices = Vec::new(); + let mut fallback_blocks = 0; + for idx in candidate_indices { + if !covered_segments.contains(&idx) || small_segments.contains(&idx) { + fallback_blocks += compact_segments[idx].1.summary.block_count as usize; + fallback_indices.push(idx); + } + } + + let fallback_groups = if fallback_blocks <= block_per_seg { + vec![fallback_indices] + } else { + fallback_indices + .chunks(max_threads) + .map(|chunk| chunk.to_vec()) + .collect::>() + }; + for selected_indices in fallback_groups { + let mut window_key = selected_indices.clone(); + window_key.sort_unstable(); + if seen_windows.insert(window_key) { + windows.push((selected_indices, 0)); + } + } + + Ok(windows + .into_iter() + .map(|(selected_indices, _)| { + selected_indices + .into_iter() + .filter_map(|i| segments[i].as_ref().map(|segment| segment.segment.clone())) + .collect::>() + }) + .filter(|window| !window.is_empty()) .collect()) } + fn refine_scalar_windows_by_vector( + &self, + scalar_windows: Vec>, + vector_cluster_info: &VectorClusterInfo, + max_len: usize, + ) -> Result>> { + let max_len = max_len.max(1); + let mut refined_windows = Vec::with_capacity(scalar_windows.len()); + let mut seen_windows = HashSet::new(); + + for scalar_window in scalar_windows { + if scalar_window.len() < 2 { + if seen_windows.insert(Self::segment_window_key(&scalar_window)) { + refined_windows.push(scalar_window); + } + continue; + } + + let mut vector_segments = Vec::with_capacity(scalar_window.len()); + for segment in scalar_window.iter().cloned() { + let block_metas = segment.info.block_metas()?; + vector_segments.push(VectorReclusterSegment { + segment, + block_metas, + }); + } + + let mut overlaps = vec![IndexSet::new(); vector_segments.len()]; + for (idx, overlap) in overlaps.iter_mut().enumerate().take(vector_segments.len()) { + overlap.insert(idx); + } + + for left in 0..vector_segments.len() { + for right in left + 1..vector_segments.len() { + if vector_segment_spheres_overlap( + &vector_segments[left].block_metas, + &vector_segments[right].block_metas, + vector_cluster_info, + )? { + overlaps[left].insert(right); + overlaps[right].insert(left); + } + } + } + + let mut has_vector_component = false; + let mut visited = vec![false; vector_segments.len()]; + for start in 0..vector_segments.len() { + if visited[start] { + continue; + } + + let mut stack = vec![start]; + let mut component = Vec::new(); + visited[start] = true; + while let Some(idx) = stack.pop() { + component.push(idx); + for next in overlaps[idx].iter().copied() { + if !visited[next] { + visited[next] = true; + stack.push(next); + } + } + } + + if component.len() < 2 { + continue; + } + has_vector_component = true; + + component.sort_unstable_by_key(|idx| vector_segments[*idx].segment.loc.segment_idx); + for chunk in component.chunks(max_len) { + let window = chunk + .iter() + .map(|idx| vector_segments[*idx].segment.clone()) + .collect::>(); + if seen_windows.insert(Self::segment_window_key(&window)) { + refined_windows.push(window); + } + } + } + + if !has_vector_component + && seen_windows.insert(Self::segment_window_key(&scalar_window)) + { + refined_windows.push(scalar_window); + } + } + + Ok(refined_windows) + } + fn flush_segment_window( current_indices: &mut IndexSet, current_max_depth: &mut usize, @@ -1034,6 +1345,15 @@ impl ReclusterMutator { selected_indices } + fn segment_window_key(window: &[SelectedReclusterSegment]) -> Vec { + let mut key = window + .iter() + .map(|segment| segment.loc.segment_idx) + .collect::>(); + key.sort_unstable(); + key + } + fn build_cluster_stats_for_recluster( &self, cluster_stats: Option<&ClusterStatistics>, @@ -1048,7 +1368,7 @@ impl ReclusterMutator { let (min_stats, max_stats) = get_min_max_stats( &self.cluster_key_exprs, col_stats, - cluster_stats, + None, Some(self.cluster_key_id), self.schema.as_ref(), ); @@ -1168,9 +1488,149 @@ impl ReclusterMutator { .collect()) } + fn vector_cluster_key(&self) -> Option<&VectorClusterInfo> { + self.vector_cluster_info.as_ref() + } + + fn fetch_max_vector_depth( + &self, + indices: &[usize], + blocks: &[ReclusterBlock], + vector_cluster_info: &VectorClusterInfo, + max_len: usize, + ) -> Result { + let mut vector_blocks = Vec::with_capacity(indices.len()); + let mut selected_idx = IndexSet::with_capacity(max_len); + + for &idx in indices { + if let Some(vector_stats) = + block_meta_vector_stats(blocks[idx].meta.as_ref(), vector_cluster_info) + { + vector_blocks.push((idx, vector_stats)); + } else { + selected_idx.insert(idx); + } + } + + if vector_blocks.is_empty() { + selected_idx.truncate(max_len); + return Ok(DepthSelection { + average_depth: 0.0, + max_depth: 0, + max_point_overlap_count: 0, + selected_idx, + }); + } + + let mut overlaps = vec![IndexSet::new(); vector_blocks.len()]; + for (idx, overlap) in overlaps.iter_mut().enumerate() { + overlap.insert(vector_blocks[idx].0); + } + + for left in 0..vector_blocks.len() { + for right in left + 1..vector_blocks.len() { + let left_idx = vector_blocks[left].0; + let right_idx = vector_blocks[right].0; + if !self + .scalar_cluster_stats_overlap(&blocks[left_idx].stats, &blocks[right_idx].stats) + { + continue; + } + + if vector_spheres_overlap( + vector_blocks[left].1, + vector_blocks[right].1, + vector_cluster_info.distance_type, + )? { + overlaps[left].insert(right_idx); + overlaps[right].insert(left_idx); + } + } + } + + let mut max_depth = 0usize; + let mut max_point = 0usize; + let mut sum_depth = 0usize; + for (idx, overlap) in overlaps.iter().enumerate() { + let depth = overlap.len(); + sum_depth += depth; + if depth > max_depth { + max_depth = depth; + max_point = idx; + } + } + + let average_depth = (10000.0 * sum_depth as f64 / overlaps.len() as f64).round() / 10000.0; + let max_point_overlap_count = max_depth; + + let vector_depth_threshold = self.depth_threshold.min(1.0); + if max_depth as f64 > vector_depth_threshold { + selected_idx.extend(overlaps[max_point].iter().copied()); + + let mut depth_order = overlaps + .iter() + .enumerate() + .map(|(idx, overlap)| (idx, overlap.len())) + .collect::>(); + depth_order.sort_by(|(left_idx, left_depth), (right_idx, right_depth)| { + right_depth + .cmp(left_depth) + .then_with(|| vector_blocks[*left_idx].0.cmp(&vector_blocks[*right_idx].0)) + }); + + for (idx, _) in depth_order { + if selected_idx.len() >= max_len { + break; + } + selected_idx.extend(overlaps[idx].iter().copied()); + } + } + + selected_idx.truncate(max_len); + Ok(DepthSelection { + selected_idx, + average_depth, + max_depth, + max_point_overlap_count, + }) + } + + fn scalar_cluster_stats_overlap( + &self, + left: &ClusterStatistics, + right: &ClusterStatistics, + ) -> bool { + let left_min = left.min(); + let left_max = left.max(); + let right_min = right.min(); + let right_max = right.max(); + + let cluster_key_count = left_min.len(); + if cluster_key_count == 0 { + return true; + } + + if left_max.len() < cluster_key_count + || right_min.len() < cluster_key_count + || right_max.len() < cluster_key_count + { + return true; + } + + for key_index in 0..cluster_key_count { + if !scalar_le(&left_min[key_index], &right_max[key_index]) + || !scalar_le(&right_min[key_index], &left_max[key_index]) + { + return false; + } + } + true + } + fn fetch_max_depth( &self, points_map: HashMap, (Vec, Vec)>, + cluster_key_types: &[DataType], depth_threshold: f64, max_len: usize, ) -> Result { @@ -1180,7 +1640,7 @@ impl ReclusterMutator { let mut point_overlaps: Vec> = Vec::with_capacity(points_map.len()); let mut unfinished_intervals = BTreeMap::new(); let (keys, values): (Vec<_>, Vec<_>) = points_map.into_iter().unzip(); - let indices = compare_scalars(keys, &self.cluster_key_types)?; + let indices = compare_scalars(keys, cluster_key_types)?; for (i, idx) in indices.into_iter().enumerate() { let start = &values[idx as usize].0; let end = &values[idx as usize].1; @@ -1331,3 +1791,138 @@ impl ReclusterMutator { open_interval_count + start.len() } } + +fn merge_depth_selections( + mut left: DepthSelection, + right: DepthSelection, + max_len: usize, +) -> DepthSelection { + let DepthSelection { + selected_idx: right_selected_idx, + average_depth: right_average_depth, + max_depth: right_max_depth, + max_point_overlap_count: right_max_point_overlap_count, + } = right; + let average_depth = left.average_depth.max(right_average_depth); + let max_depth = left.max_depth.max(right_max_depth); + let max_point_overlap_count = left + .max_point_overlap_count + .max(right_max_point_overlap_count); + + left.selected_idx.extend(right_selected_idx); + left.selected_idx.truncate(max_len); + DepthSelection { + selected_idx: left.selected_idx, + average_depth, + max_depth, + max_point_overlap_count, + } +} + +fn vector_segment_spheres_overlap( + left_blocks: &[Arc], + right_blocks: &[Arc], + vector_cluster_info: &VectorClusterInfo, +) -> Result { + let mut left_stats = Vec::new(); + let mut right_stats = Vec::new(); + let mut left_missing_stats = false; + let mut right_missing_stats = false; + + for block_meta in left_blocks { + if let Some(vector_stats) = + block_meta_vector_stats(block_meta.as_ref(), vector_cluster_info) + { + left_stats.push(vector_stats); + } else { + left_missing_stats = true; + } + } + + for block_meta in right_blocks { + if let Some(vector_stats) = + block_meta_vector_stats(block_meta.as_ref(), vector_cluster_info) + { + right_stats.push(vector_stats); + } else { + right_missing_stats = true; + } + } + + if left_missing_stats || right_missing_stats || left_stats.is_empty() || right_stats.is_empty() + { + return Ok(true); + } + + for left_stat in &left_stats { + for right_stat in &right_stats { + if vector_spheres_overlap(left_stat, right_stat, vector_cluster_info.distance_type)? { + return Ok(true); + } + } + } + + Ok(false) +} + +fn vector_cluster_info_from_exprs( + table: &FuseTable, + cluster_keys: &[Expr], +) -> Result> { + let table_schema = table.schema(); + let mut vector_cluster_info = None; + + for (key_index, expr) in cluster_keys.iter().enumerate() { + if !matches!(expr.data_type().remove_nullable(), DataType::Vector(_)) { + continue; + } + + let Expr::ColumnRef(ColumnRef { id, .. }) = expr else { + return Err(ErrorCode::InvalidClusterKeys( + "Vector cluster key only supports direct column reference", + )); + }; + + let field = table_schema.field(*id); + let vector_info = vector_cluster_info_from_column( + &table.table_info.meta.indexes, + key_index, + field.column_id(), + field.name(), + )?; + + if vector_cluster_info.is_some() { + return Err(ErrorCode::InvalidClusterKeys( + "Only one vector column is supported in cluster by", + )); + } + + vector_cluster_info = Some(vector_info); + } + + Ok(vector_cluster_info) +} + +fn scalar_cluster_key_exprs(cluster_key_exprs: Vec>) -> Vec> { + cluster_key_exprs + .into_iter() + .filter(|expr| !matches!(expr.data_type().remove_nullable(), DataType::Vector(_))) + .collect() +} + +fn block_meta_vector_stats<'a>( + block_meta: &'a BlockMeta, + vector_cluster_info: &VectorClusterInfo, +) -> Option<&'a VectorColumnStatistics> { + block_meta.vector_stats.as_ref()?.get(&( + vector_cluster_info.column_id, + vector_cluster_info.distance_type, + )) +} + +fn scalar_le(left: &Scalar, right: &Scalar) -> bool { + matches!( + left.partial_cmp(right), + None | Some(cmp::Ordering::Less | cmp::Ordering::Equal) + ) +} diff --git a/src/query/storages/fuse/src/operations/table_index.rs b/src/query/storages/fuse/src/operations/table_index.rs index 35346d1982c40..6a3a8f76f3633 100644 --- a/src/query/storages/fuse/src/operations/table_index.rs +++ b/src/query/storages/fuse/src/operations/table_index.rs @@ -882,7 +882,7 @@ impl AsyncTransform for VectorIndexTransform { let vector_index_location = self.meta_locations.block_vector_index_location(); let existing_location = &block_meta.vector_index_location; - let state = builder + let vector_result = builder .finalize_with_existing( self.operator.clone(), &self.settings, @@ -892,9 +892,17 @@ impl AsyncTransform for VectorIndexTransform { index_meta.clone(), ) .await?; + let Some(state) = vector_result.index_state else { + return Err(ErrorCode::Internal("Failed to build vector index")); + }; new_block_meta.vector_index_size = Some(state.size); new_block_meta.vector_index_location = Some(vector_index_location); + let mut vector_stats = block_meta.vector_stats.clone().unwrap_or_default(); + if let Some(new_vector_stats) = vector_result.vector_stats { + vector_stats.extend(new_vector_stats); + } + new_block_meta.vector_stats = (!vector_stats.is_empty()).then_some(vector_stats); BlockWriter::write_down_vector_index_state(&self.operator, Some(state)).await?; let extended_block_meta = ExtendedBlockMeta { diff --git a/src/query/storages/fuse/src/pruning/vector_index_pruner.rs b/src/query/storages/fuse/src/pruning/vector_index_pruner.rs index 5172546cf2bba..9741996777361 100644 --- a/src/query/storages/fuse/src/pruning/vector_index_pruner.rs +++ b/src/query/storages/fuse/src/pruning/vector_index_pruner.rs @@ -34,6 +34,7 @@ use databend_common_expression::Evaluator; use databend_common_expression::Expr; use databend_common_expression::FunctionContext; use databend_common_expression::RemoteExpr; +use databend_common_expression::Scalar; use databend_common_expression::TableSchemaRef; use databend_common_expression::VECTOR_SCORE_COL_NAME; use databend_common_expression::types::Buffer; @@ -48,9 +49,12 @@ use databend_common_metrics::storage::metrics_inc_bytes_block_vector_index_pruni use databend_storages_common_index::DistanceType; use databend_storages_common_index::FixedLengthPriorityQueue; use databend_storages_common_index::ScoredPointOffset; +use databend_storages_common_index::vector_distance_lower_bound; +use databend_storages_common_index::vector_statistics_distance_type; use databend_storages_common_io::ReadSettings; use databend_storages_common_pruner::BlockMetaIndex; use databend_storages_common_table_meta::meta::BlockMeta; +use databend_storages_common_table_meta::meta::VectorDistanceType; use futures_util::future; use log::info; use tokio::sync::OwnedSemaphorePermit; @@ -71,6 +75,12 @@ struct VectorTopNParam { limit: usize, } +#[derive(Clone)] +struct VectorFilterParam { + filter_expr: Expr, + max_score: Option, +} + /// Vector index pruner. #[derive(Clone)] pub struct VectorIndexPruner { @@ -79,6 +89,9 @@ pub struct VectorIndexPruner { _schema: TableSchemaRef, vector_index: VectorIndexInfo, vector_reader: VectorIndexReader, + query_values: Vec, + vector_distance_type: VectorDistanceType, + vector_filter_param: Option, vector_topn_param: Option, } @@ -113,15 +126,19 @@ impl VectorIndexPruner { ), ]; - let query_values = - unsafe { std::mem::transmute::, Vec>(vector_index.query_values.clone()) }; + let vector_distance_type = vector_statistics_distance_type(distance_type); + let query_values = vector_index + .query_values + .iter() + .map(|value| value.0) + .collect::>(); let vector_reader = VectorIndexReader::create( pruning_ctx.dal.clone(), settings, distance_type, columns, - query_values, + query_values.clone(), ); // If the filter only has the vector score column, we can filter the scores. @@ -137,6 +154,10 @@ impl VectorIndexPruner { } else { None }; + let vector_filter_param = filter_expr.clone().map(|filter_expr| VectorFilterParam { + max_score: vector_score_upper_bound(&filter_expr), + filter_expr, + }); let mut vector_topn_param = None; // If the first sort expr is the vector score column and has the limit value, @@ -162,6 +183,9 @@ impl VectorIndexPruner { _schema: schema, vector_index, vector_reader, + query_values, + vector_distance_type, + vector_filter_param, vector_topn_param, }) } @@ -230,7 +254,13 @@ impl VectorIndexPruner { return Ok(pruned_metas); } - // Unable to do prune, fallback to only calculating the score + if let Some(param) = &self.vector_filter_param { + return self + .vector_index_filter_prune(Some(¶m.filter_expr), param.max_score, metas) + .await; + } + + // Unable to do prune, fallback to only calculating the score. self.vector_index_scores(metas).await } @@ -239,22 +269,45 @@ impl VectorIndexPruner { limit: usize, metas: Vec<(BlockMetaIndex, Arc)>, ) -> Result)>> { - let results = self - .process_vector_pruning_tasks(metas, move |vector_reader, row_count, location| { - let limit = limit; - async move { vector_reader.prune(limit, row_count, &location).await } - }) - .await?; - let mut top_queue = FixedLengthPriorityQueue::new(limit); - let len = results.len(); + let mut candidates = self.vector_prune_candidates(metas)?; + let len = candidates.len(); + candidates.sort_by(|left, right| compare_lower_bound(left.lower_bound, right.lower_bound)); + let mut vector_prune_result_map = HashMap::with_capacity(len); - for vector_prune_result in results { + let mut processed_blocks = 0usize; + let filter_max_score = self + .vector_filter_param + .as_ref() + .and_then(|param| param.max_score); + for candidate in candidates { + if self.can_skip_by_filter_bound(candidate.lower_bound, filter_max_score) { + continue; + } + if self.can_skip_by_lower_bound(candidate.lower_bound, &top_queue, limit) { + break; + } + + let vector_prune_result = self + .process_vector_pruning_candidate( + candidate, + move |vector_reader, row_count, location| async move { + vector_reader.prune(limit, row_count, &location).await + }, + ) + .await?; + processed_blocks += 1; for vector_score in &vector_prune_result.scores { top_queue.push(vector_score.clone()); } vector_prune_result_map.insert(vector_prune_result.block_idx, vector_prune_result); } + if processed_blocks < len { + info!( + "Vector stat lower bound pruned {} blocks before hnsw topn", + len - processed_blocks + ); + } let top_scores = top_queue.into_sorted_vec(); let top_indexes: HashSet = top_scores.iter().map(|s| s.index).collect(); @@ -288,6 +341,12 @@ impl VectorIndexPruner { limit: usize, metas: Vec<(BlockMetaIndex, Arc)>, ) -> Result)>> { + if asc { + return self + .vector_index_topn_prune_by_stat_bounds(filter_expr, limit, metas) + .await; + } + let results = self .process_vector_pruning_tasks( metas, @@ -373,6 +432,73 @@ impl VectorIndexPruner { Ok(pruned_metas) } + async fn vector_index_topn_prune_by_stat_bounds( + &self, + filter_expr: &Option, + limit: usize, + metas: Vec<(BlockMetaIndex, Arc)>, + ) -> Result)>> { + let mut top_queue = FixedLengthPriorityQueue::new(limit); + let mut candidates = self.vector_prune_candidates(metas)?; + let len = candidates.len(); + candidates.sort_by(|left, right| compare_lower_bound(left.lower_bound, right.lower_bound)); + + let mut vector_prune_result_map = HashMap::with_capacity(len); + let mut processed_blocks = 0usize; + for candidate in candidates { + if self.can_skip_by_lower_bound(candidate.lower_bound, &top_queue, limit) { + break; + } + + let vector_prune_result = self + .process_vector_pruning_candidate( + candidate, + |vector_reader, row_count, location| async move { + vector_reader.generate_scores(row_count, &location).await + }, + ) + .await?; + processed_blocks += 1; + + if self.push_scores_to_top_queue( + filter_expr, + true, + &vector_prune_result, + &mut top_queue, + )? { + vector_prune_result_map.insert(vector_prune_result.block_idx, vector_prune_result); + } + } + if processed_blocks < len { + info!( + "Vector stat lower bound pruned {} blocks before score topn", + len - processed_blocks + ); + } + + let top_scores = top_queue.into_sorted_vec(); + let top_indexes: HashSet = top_scores.iter().map(|s| s.index).collect(); + + let mut pruned_metas = Vec::with_capacity(top_indexes.len()); + for index in 0..len { + if !top_indexes.contains(&index) { + continue; + } + let vector_prune_result = vector_prune_result_map.remove(&index).unwrap(); + + let mut vector_scores = Vec::with_capacity(vector_prune_result.scores.len()); + for vector_score in &vector_prune_result.scores { + vector_scores.push((vector_score.row_idx as usize, vector_score.score)); + } + let mut block_meta_index = vector_prune_result.block_meta_index; + block_meta_index.vector_scores = Some(vector_scores); + + pruned_metas.push((block_meta_index, vector_prune_result.block_meta)); + } + + Ok(pruned_metas) + } + async fn vector_index_scores( &self, metas: Vec<(BlockMetaIndex, Arc)>, @@ -423,6 +549,70 @@ impl VectorIndexPruner { Ok(new_metas) } + async fn vector_index_filter_prune( + &self, + filter_expr: Option<&Expr>, + max_score: Option, + metas: Vec<(BlockMetaIndex, Arc)>, + ) -> Result)>> { + let start = Instant::now(); + let mut skipped_blocks = 0usize; + let candidates = self.vector_prune_candidates(metas)?; + let mut keep_metas = Vec::with_capacity(candidates.len()); + for candidate in candidates { + if self.can_skip_by_filter_bound(candidate.lower_bound, max_score) { + skipped_blocks += 1; + continue; + } + keep_metas.push((candidate.block_meta_index, candidate.block_meta)); + } + if keep_metas.is_empty() { + if skipped_blocks > 0 { + info!("Vector stat lower bound pruned {skipped_blocks} blocks before score filter"); + } + return Ok(vec![]); + } + + let results = self + .pruning_ctx + .pruning_cost + .measure_async( + PruningCostKind::BlocksVector, + self.process_vector_pruning_tasks( + keep_metas, + |vector_reader, row_count, location| async move { + vector_reader.generate_scores(row_count, &location).await + }, + ), + ) + .await?; + + let mut new_metas = Vec::with_capacity(results.len()); + for vector_prune_result in results { + if !self.block_matches_vector_filter(filter_expr, &vector_prune_result)? { + continue; + } + + let mut vector_scores = Vec::with_capacity(vector_prune_result.scores.len()); + for vector_score in &vector_prune_result.scores { + vector_scores.push((vector_score.row_idx as usize, vector_score.score)); + } + let mut block_meta_index = vector_prune_result.block_meta_index; + block_meta_index.vector_scores = Some(vector_scores); + + new_metas.push((block_meta_index, vector_prune_result.block_meta)); + } + + let elapsed = start.elapsed().as_millis() as u64; + metrics_inc_block_vector_index_pruning_milliseconds(elapsed); + if skipped_blocks > 0 { + info!("Vector stat lower bound pruned {skipped_blocks} blocks before score filter"); + } + info!("Vector index calculate score filter elapsed: {elapsed}"); + + Ok(new_metas) + } + // Helper function to process vector pruning tasks with different vector reader operations async fn process_vector_pruning_tasks( &self, @@ -498,9 +688,196 @@ impl VectorIndexPruner { Ok(results) } + + async fn process_vector_pruning_candidate( + &self, + candidate: VectorPruneCandidate, + vector_reader_op: F, + ) -> Result + where + F: FnOnce(VectorIndexReader, usize, String) -> Fut, + Fut: Future>> + Send, + { + let VectorPruneCandidate { + index, + block_meta_index, + block_meta, + .. + } = candidate; + let Some(location) = &block_meta.vector_index_location else { + return Err(ErrorCode::StorageUnavailable(format!( + "vector index {} file don't exist, need refresh", + self.vector_index.index_name + ))); + }; + + let row_count = block_meta.row_count as usize; + let score_offsets = + vector_reader_op(self.vector_reader.clone(), row_count, location.0.clone()).await?; + + let mut vector_scores = Vec::with_capacity(score_offsets.len()); + for score_offset in score_offsets { + let vector_score = VectorScore { + index, + row_idx: score_offset.idx, + score: F32::from(score_offset.score), + }; + vector_scores.push(vector_score); + } + + Ok(VectorPruneResult { + block_idx: index, + scores: vector_scores, + block_meta_index, + block_meta, + }) + } + + fn vector_prune_candidates( + &self, + metas: Vec<(BlockMetaIndex, Arc)>, + ) -> Result> { + metas + .into_iter() + .enumerate() + .map(|(index, (block_meta_index, block_meta))| { + let lower_bound = self.vector_stat_lower_bound(block_meta.as_ref())?; + Ok(VectorPruneCandidate { + index, + block_meta_index, + block_meta, + lower_bound, + }) + }) + .collect() + } + + fn vector_stat_lower_bound(&self, block_meta: &BlockMeta) -> Result> { + let Some(vector_stats) = block_meta.vector_stats.as_ref() else { + return Ok(None); + }; + let Some(vector_stat) = + vector_stats.get(&(self.vector_index.column_id, self.vector_distance_type)) + else { + return Ok(None); + }; + + Ok(Some(vector_distance_lower_bound( + &self.query_values, + vector_stat, + self.vector_distance_type, + )?)) + } + + fn can_skip_by_lower_bound( + &self, + lower_bound: Option, + top_queue: &FixedLengthPriorityQueue, + limit: usize, + ) -> bool { + let Some(lower_bound) = lower_bound else { + return false; + }; + if top_queue.len() < limit { + return false; + } + + top_queue + .top() + .is_some_and(|score| lower_bound > score.score.0) + } + + fn can_skip_by_filter_bound(&self, lower_bound: Option, max_score: Option) -> bool { + let (Some(lower_bound), Some(max_score)) = (lower_bound, max_score) else { + return false; + }; + + lower_bound > max_score + } + + fn block_matches_vector_filter( + &self, + filter_expr: Option<&Expr>, + vector_prune_result: &VectorPruneResult, + ) -> Result { + let Some(filter_expr) = filter_expr else { + return Ok(true); + }; + + let num_rows = vector_prune_result.block_meta.row_count as usize; + let mut builder = Vec::with_capacity(num_rows); + for score in &vector_prune_result.scores { + builder.push(F32::from(score.score)); + } + let column = Column::Number(NumberColumn::Float32(Buffer::from(builder))); + let block = DataBlock::new(vec![BlockEntry::from(column)], num_rows); + let evaluator = Evaluator::new(&block, &self.func_ctx, &BUILTIN_FUNCTIONS); + let res = evaluator.run(filter_expr)?; + let res_column = res.into_full_column(filter_expr.data_type(), num_rows); + let res_column = res_column.remove_nullable(); + let bitmap = res_column.as_boolean().unwrap(); + + Ok(bitmap.null_count() != num_rows) + } + + fn push_scores_to_top_queue( + &self, + filter_expr: &Option, + asc: bool, + vector_prune_result: &VectorPruneResult, + top_queue: &mut FixedLengthPriorityQueue, + ) -> Result { + let mut pushed = false; + if let Some(filter_expr) = filter_expr { + let num_rows = vector_prune_result.block_meta.row_count as usize; + let mut builder = Vec::with_capacity(num_rows); + for score in &vector_prune_result.scores { + builder.push(F32::from(score.score)); + } + let column = Column::Number(NumberColumn::Float32(Buffer::from(builder))); + let block = DataBlock::new(vec![BlockEntry::from(column)], num_rows); + let evaluator = Evaluator::new(&block, &self.func_ctx, &BUILTIN_FUNCTIONS); + let res = evaluator.run(filter_expr)?; + let res_column = res.into_full_column(filter_expr.data_type(), num_rows); + let res_column = res_column.remove_nullable(); + let bitmap = res_column.as_boolean().unwrap(); + if bitmap.null_count() == num_rows { + return Ok(false); + } + + for (idx, vector_score) in vector_prune_result.scores.iter().enumerate() { + if bitmap.get_bit(idx) { + if asc { + top_queue.push(vector_score.clone()); + } else { + top_queue.push(vector_score.negative_score()); + } + pushed = true; + } + } + return Ok(pushed); + } + + for vector_score in &vector_prune_result.scores { + if asc { + top_queue.push(vector_score.clone()); + } else { + top_queue.push(vector_score.negative_score()); + } + pushed = true; + } + Ok(pushed) + } } // result of vector index block pruning +struct VectorPruneCandidate { + index: usize, + block_meta_index: BlockMetaIndex, + block_meta: Arc, + lower_bound: Option, +} + struct VectorPruneResult { // the block index in segment block_idx: usize, @@ -539,3 +916,65 @@ impl VectorScore { } } } + +fn compare_lower_bound(left: Option, right: Option) -> Ordering { + let left = left.unwrap_or(f32::INFINITY); + let right = right.unwrap_or(f32::INFINITY); + left.partial_cmp(&right).unwrap_or(Ordering::Equal) +} + +fn vector_score_upper_bound(expr: &Expr) -> Option { + let Expr::FunctionCall(call) = expr else { + return None; + }; + + match call.function.signature.name.as_str() { + "and" | "and_filters" => call + .args + .iter() + .filter_map(vector_score_upper_bound) + .min_by(|left, right| left.partial_cmp(right).unwrap_or(Ordering::Equal)), + "lt" | "lte" => comparison_upper_bound(&call.args, true), + "gt" | "gte" => comparison_upper_bound(&call.args, false), + _ => None, + } +} + +fn comparison_upper_bound(args: &[Expr], column_on_left: bool) -> Option { + if args.len() != 2 { + return None; + } + + if column_on_left { + if is_vector_score_expr(&args[0]) { + return numeric_constant(&args[1]); + } + } else if is_vector_score_expr(&args[1]) { + return numeric_constant(&args[0]); + } + + None +} + +fn is_vector_score_expr(expr: &Expr) -> bool { + match expr { + Expr::ColumnRef(column) => column.id == 0, + Expr::Cast(cast) => is_vector_score_expr(&cast.expr), + _ => false, + } +} + +fn numeric_constant(expr: &Expr) -> Option { + match expr { + Expr::Constant(constant) => scalar_to_f32(&constant.scalar), + Expr::Cast(cast) => numeric_constant(&cast.expr), + _ => None, + } +} + +fn scalar_to_f32(scalar: &Scalar) -> Option { + match scalar { + Scalar::Number(number) => Some(number.to_f32().0), + _ => None, + } +} diff --git a/src/query/storages/fuse/src/statistics/cluster_statistics.rs b/src/query/storages/fuse/src/statistics/cluster_statistics.rs index 1bb229c6f27a7..e88742a75230c 100644 --- a/src/query/storages/fuse/src/statistics/cluster_statistics.rs +++ b/src/query/storages/fuse/src/statistics/cluster_statistics.rs @@ -14,10 +14,13 @@ use std::cmp; use std::cmp::Ordering; +use std::collections::BTreeMap; use std::collections::HashMap; +use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::BlockThresholds; +use databend_common_expression::ColumnId; use databend_common_expression::ConstantFolder; use databend_common_expression::DataBlock; use databend_common_expression::DataField; @@ -25,16 +28,86 @@ use databend_common_expression::Domain; use databend_common_expression::Expr; use databend_common_expression::FunctionContext; use databend_common_expression::Scalar; +use databend_common_expression::SortColumnDescription; use databend_common_expression::TableSchema; use databend_common_expression::compare_scalars; use databend_common_expression::types::DataType; use databend_common_functions::BUILTIN_FUNCTIONS; +use databend_common_functions::aggregates::eval_aggr; +use databend_common_meta_app::schema::TableIndex; +use databend_common_meta_app::schema::TableIndexType; use databend_common_sql::evaluator::BlockOperator; use databend_storages_common_index::statistics_to_domain; +use databend_storages_common_index::vector_distance_type_from_index_option; use databend_storages_common_table_meta::meta::ClusterStatistics; use databend_storages_common_table_meta::meta::StatisticsOfColumns; +use databend_storages_common_table_meta::meta::VectorDistanceType; use log::warn; +#[derive(Clone, Debug)] +pub struct VectorClusterInfo { + pub key_index: usize, + pub column_id: ColumnId, + pub column_name: String, + pub distance_type: VectorDistanceType, +} + +#[derive(Clone, Debug)] +pub struct VectorClusterOperator { + pub info: VectorClusterInfo, + pub vector_column_input_offset: usize, + pub vector_cluster_id_offset: usize, +} + +pub fn vector_cluster_info_from_column( + table_indexes: &BTreeMap, + key_index: usize, + column_id: ColumnId, + column_name: &str, +) -> Result { + let distance_type = vector_cluster_distance_type(table_indexes, column_id, column_name)?; + + Ok(VectorClusterInfo { + key_index, + column_id, + column_name: column_name.to_string(), + distance_type, + }) +} + +fn vector_cluster_distance_type( + table_indexes: &BTreeMap, + column_id: ColumnId, + column_name: &str, +) -> Result { + let mut distance_types = Vec::new(); + for index in table_indexes.values().filter(|index| { + index.index_type == TableIndexType::Vector && index.column_ids.contains(&column_id) + }) { + let Some(distance) = index.options.get("distance") else { + continue; + }; + for distance_type in distance + .split(',') + .filter_map(vector_distance_type_from_index_option) + { + if !distance_types.contains(&distance_type) { + distance_types.push(distance_type); + } + } + } + + match distance_types.as_slice() { + [distance_type] => Ok(*distance_type), + [] => Err(ErrorCode::InvalidClusterKeys(format!( + "Vector cluster key `{column_name}` requires a vector index with distance option" + ))), + _ => Err(ErrorCode::InvalidClusterKeys(format!( + "Vector cluster key `{column_name}` has multiple vector index distance types; use exactly one distance type for vector clustering" + ))), + } +} + #[derive(Clone, Default)] pub struct ClusterStatsGenerator { cluster_key_id: u32, @@ -46,6 +119,7 @@ pub struct ClusterStatsGenerator { pub extra_key_num: usize, pub cluster_key_index: Vec, pub operators: Vec, + pub vector_operator: Option, pub out_fields: Vec, pub func_ctx: FunctionContext, } @@ -60,6 +134,7 @@ impl ClusterStatsGenerator { level: i32, block_thresholds: BlockThresholds, operators: Vec, + vector_operator: Option, out_fields: Vec, func_ctx: FunctionContext, ) -> Self { @@ -71,11 +146,33 @@ impl ClusterStatsGenerator { level, block_thresholds, operators, + vector_operator, out_fields, func_ctx, } } + pub fn sort_descs(&self) -> Vec { + self.cluster_key_index + .iter() + .map(|offset| SortColumnDescription { + offset: *offset, + asc: true, + nulls_first: false, + }) + .collect() + } + + pub fn operator_extra_key_num(&self) -> usize { + self.operators + .iter() + .map(|op| match op { + BlockOperator::Map { exprs, .. } => exprs.len(), + BlockOperator::Project { .. } => 0, + }) + .sum() + } + // This can be used in block append. // The input block contains the cluster key block. pub fn gen_stats_for_append( @@ -126,20 +223,30 @@ impl ClusterStatsGenerator { if self.cluster_key_index.is_empty() { return Ok(None); } - let mut min = Vec::with_capacity(self.cluster_key_index.len()); - let mut max = Vec::with_capacity(self.cluster_key_index.len()); - - for key in self.cluster_key_index.iter() { - let val = data_block.get_by_offset(*key); - let left = unsafe { val.index_unchecked(0) }.to_owned(); - min.push(left); - - // The maximum in cluster statistics neednot larger than the non-trimmed one. - // So we use trim_min directly. - let right = unsafe { val.index_unchecked(val.value().len() - 1) }.to_owned(); - max.push(right); + let vector_cluster_id_offset = self.vector_cluster_id_offset(); + let scalar_cluster_key_index = self.scalar_cluster_key_index(vector_cluster_id_offset); + let mut min = Vec::with_capacity(scalar_cluster_key_index.len()); + let mut max = Vec::with_capacity(scalar_cluster_key_index.len()); + + let vector_key_position = vector_cluster_id_offset + .and_then(|offset| self.cluster_key_index.iter().position(|key| *key == offset)) + .unwrap_or(self.cluster_key_index.len()); + for (key_index, key) in scalar_cluster_key_index.iter().copied() { + if key_index < vector_key_position { + let val = data_block.get_by_offset(key); + let left = unsafe { val.index_unchecked(0) }.to_owned(); + min.push(left); + + // The maximum in cluster statistics neednot larger than the non-trimmed one. + // So we use trim_min directly. + let right = unsafe { val.index_unchecked(val.value().len() - 1) }.to_owned(); + max.push(right); + } else { + let (left, right) = aggregate_cluster_key_min_max(data_block, key)?; + min.push(left); + max.push(right); + } } - debug_assert!( min.iter() .map(Scalar::as_ref) @@ -148,7 +255,8 @@ impl ClusterStatsGenerator { "cluster statistics: min > max, data may not be sorted by cluster key" ); - let level = if min == max + let level = if self.vector_operator.is_none() + && min == max && self.block_thresholds.check_large_enough( data_block.num_rows(), data_block.estimate_block_size(data_block.num_columns() - self.extra_key_num), @@ -161,9 +269,9 @@ impl ClusterStatsGenerator { let pages = if let Some(max_page_size) = self.max_page_size { let mut values = Vec::with_capacity(data_block.num_rows() / max_page_size + 1); for start in (0..data_block.num_rows()).step_by(max_page_size) { - let mut tuple_values = Vec::with_capacity(self.cluster_key_index.len()); - for key in self.cluster_key_index.iter() { - let val = data_block.get_by_offset(*key); + let mut tuple_values = Vec::with_capacity(scalar_cluster_key_index.len()); + for (_, key) in scalar_cluster_key_index.iter().copied() { + let val = data_block.get_by_offset(key); let left = unsafe { val.index_unchecked(start) }; tuple_values.push(left.to_owned()); } @@ -182,6 +290,24 @@ impl ClusterStatsGenerator { pages, ))) } + + fn vector_cluster_id_offset(&self) -> Option { + self.vector_operator + .as_ref() + .map(|vector_operator| vector_operator.vector_cluster_id_offset) + } + + fn scalar_cluster_key_index( + &self, + vector_cluster_id_offset: Option, + ) -> Vec<(usize, usize)> { + self.cluster_key_index + .iter() + .copied() + .enumerate() + .filter(|(_, key)| Some(*key) != vector_cluster_id_offset) + .collect() + } } pub fn sort_by_cluster_stats( @@ -212,6 +338,20 @@ pub fn sort_by_cluster_stats( } } +pub fn aggregate_cluster_key_min_max( + data_block: &DataBlock, + key: usize, +) -> Result<(Scalar, Scalar)> { + let entry = data_block.get_by_offset(key).clone(); + let entries = [entry]; + let (min, _) = eval_aggr("min", vec![], &entries, data_block.num_rows(), vec![])?; + let (max, _) = eval_aggr("max", vec![], &entries, data_block.num_rows(), vec![])?; + Ok(( + min.index(0).unwrap().to_owned(), + max.index(0).unwrap().to_owned(), + )) +} + #[derive(Clone, Copy, Default)] pub struct BlockOverlapDepth { pub overlap: usize, @@ -289,6 +429,20 @@ pub fn get_min_max_stats( let mut mins = Vec::with_capacity(exprs.len()); let mut maxs = Vec::with_capacity(exprs.len()); for expr in exprs { + if matches!(expr.data_type().remove_nullable(), DataType::Vector(_)) { + if let Expr::ColumnRef(column_ref) = expr { + let column_ids = schema.field(column_ref.id).leaf_column_ids(); + if let Some(stats) = column_ids + .first() + .and_then(|column_id| col_stats.get(column_id)) + { + mins.push(stats.min().clone()); + maxs.push(stats.max().clone()); + continue; + } + } + } + // Since the hilbert index does not calc domain, set min max directly. if expr.data_type().remove_nullable() == DataType::Binary { mins.push(Scalar::Binary(vec![])); diff --git a/src/query/storages/fuse/src/statistics/mod.rs b/src/query/storages/fuse/src/statistics/mod.rs index ec4400714d321..67d40de97f3d1 100644 --- a/src/query/storages/fuse/src/statistics/mod.rs +++ b/src/query/storages/fuse/src/statistics/mod.rs @@ -25,9 +25,13 @@ pub use accumulator::RowOrientedSegmentBuilder; pub use accumulator::VirtualColumnAccumulator; pub use cluster_statistics::BlockOverlapDepth; pub use cluster_statistics::ClusterStatsGenerator; +pub use cluster_statistics::VectorClusterInfo; +pub use cluster_statistics::VectorClusterOperator; +pub use cluster_statistics::aggregate_cluster_key_min_max; pub use cluster_statistics::calculate_block_overlap_depths; pub use cluster_statistics::get_min_max_stats; pub use cluster_statistics::sort_by_cluster_stats; +pub use cluster_statistics::vector_cluster_info_from_column; pub use column_statistic::END_OF_UNICODE_RANGE; pub use column_statistic::STATS_STRING_PREFIX_LEN; pub use column_statistic::Trim; diff --git a/src/query/storages/fuse/src/statistics/reducers.rs b/src/query/storages/fuse/src/statistics/reducers.rs index 5a8ed54751bef..6972049afe173 100644 --- a/src/query/storages/fuse/src/statistics/reducers.rs +++ b/src/query/storages/fuse/src/statistics/reducers.rs @@ -305,7 +305,8 @@ pub fn reduce_cluster_statistics>>( .map(Scalar::as_ref) .cmp(y.iter().map(Scalar::as_ref)) }) - .unwrap(); + .unwrap() + .clone(); let max = max_stats .into_iter() .max_by(|x, y| { @@ -313,13 +314,14 @@ pub fn reduce_cluster_statistics>>( .map(Scalar::as_ref) .cmp(y.iter().map(Scalar::as_ref)) }) - .unwrap(); + .unwrap() + .clone(); let level = levels.into_iter().max().unwrap_or(0); Some(ClusterStatistics::new( cluster_key_id, - min.clone(), - max.clone(), + min, + max, level, None, )) diff --git a/src/query/storages/fuse/src/table_functions/clustering_information.rs b/src/query/storages/fuse/src/table_functions/clustering_information.rs index 469bec1b7cd7c..7051cf1357cfa 100644 --- a/src/query/storages/fuse/src/table_functions/clustering_information.rs +++ b/src/query/storages/fuse/src/table_functions/clustering_information.rs @@ -46,6 +46,7 @@ use crate::FuseTable; use crate::Table; use crate::io::SegmentsIO; use crate::sessions::TableContext; +use crate::statistics::BlockOverlapDepth; use crate::statistics::calculate_block_overlap_depths; use crate::statistics::get_min_max_stats; use crate::table_functions::SimpleArgFunc; @@ -223,6 +224,21 @@ impl<'a> ClusteringInformationImpl<'a> { let snapshot = snapshot.unwrap(); let schema = self.table.schema(); + let scalar_exprs = exprs + .into_iter() + .filter(|expr| !matches!(expr.data_type().remove_nullable(), DataType::Vector(_))) + .collect::>(); + let scalar_cluster_key_types = scalar_exprs + .iter() + .map(|v| { + let data_type = v.data_type(); + if matches!(*data_type, DataType::String) { + data_type.wrap_nullable() + } else { + data_type.clone() + } + }) + .collect::>(); let mut ranges = Vec::with_capacity(snapshot.summary.block_count as usize); let mut constant_block_count = 0; @@ -240,7 +256,7 @@ impl<'a> ClusteringInformationImpl<'a> { for segment in segments.into_iter().flatten() { for block in segment.blocks { let (min, max) = get_min_max_stats( - &exprs, + &scalar_exprs, &block.col_stats, block.cluster_stats.as_ref(), default_cluster_key_id, @@ -256,18 +272,11 @@ impl<'a> ClusteringInformationImpl<'a> { } drop(snapshot); - let cluster_key_types = exprs - .into_iter() - .map(|v| { - let data_type = v.data_type(); - if matches!(*data_type, DataType::String) { - data_type.wrap_nullable() - } else { - data_type.clone() - } - }) - .collect::>(); - let stats = calculate_block_overlap_depths(&ranges, &cluster_key_types)?; + let stats = if scalar_cluster_key_types.is_empty() { + vec![BlockOverlapDepth::default(); ranges.len()] + } else { + calculate_block_overlap_depths(&ranges, &scalar_cluster_key_types)? + }; let mut sum_overlap = 0; let mut sum_depth = 0; diff --git a/src/query/storages/fuse/src/table_functions/clustering_statistics.rs b/src/query/storages/fuse/src/table_functions/clustering_statistics.rs index 473378b60308e..32bf9d423529f 100644 --- a/src/query/storages/fuse/src/table_functions/clustering_statistics.rs +++ b/src/query/storages/fuse/src/table_functions/clustering_statistics.rs @@ -38,6 +38,7 @@ use databend_storages_common_table_meta::meta::TableSnapshot; use crate::FuseTable; use crate::io::SegmentsIO; use crate::sessions::TableContext; +use crate::statistics::BlockOverlapDepth; use crate::statistics::calculate_block_overlap_depths; use crate::statistics::get_min_max_stats; use crate::table_functions::TableMetaFunc; @@ -93,7 +94,11 @@ impl TableMetaFunc for ClusteringStatistics { let cluster_keys = tbl.resolve_cluster_keys().unwrap(); let exprs = parse_cluster_keys(ctx.clone(), Arc::new(tbl.clone()), cluster_keys)?; - let cluster_key_types = exprs + let scalar_exprs = exprs + .into_iter() + .filter(|expr| !matches!(expr.data_type().remove_nullable(), DataType::Vector(_))) + .collect::>(); + let scalar_cluster_key_types = scalar_exprs .iter() .map(|v| { let data_type = v.data_type(); @@ -137,7 +142,7 @@ impl TableMetaFunc for ClusteringStatistics { for block in segment.blocks.iter() { let block = block.as_ref(); let (min, max) = get_min_max_stats( - &exprs, + &scalar_exprs, &block.col_stats, block.cluster_stats.as_ref(), Some(cluster_key_id), @@ -159,7 +164,11 @@ impl TableMetaFunc for ClusteringStatistics { } } - let block_depths = calculate_block_overlap_depths(&ranges, &cluster_key_types)?; + let block_depths = if scalar_cluster_key_types.is_empty() { + vec![BlockOverlapDepth::default(); ranges.len()] + } else { + calculate_block_overlap_depths(&ranges, &scalar_cluster_key_types)? + }; let mut segment_name = StringColumnBuilder::with_capacity(output_len); let mut block_name = StringColumnBuilder::with_capacity(output_len); let mut min = Vec::with_capacity(output_len); diff --git a/src/query/storages/fuse/src/table_functions/fuse_block_statistics.rs b/src/query/storages/fuse/src/table_functions/fuse_block_statistics.rs index 24002589ba93f..aee2ba3c3729e 100644 --- a/src/query/storages/fuse/src/table_functions/fuse_block_statistics.rs +++ b/src/query/storages/fuse/src/table_functions/fuse_block_statistics.rs @@ -25,6 +25,7 @@ use databend_common_expression::TableDataType; use databend_common_expression::TableField; use databend_common_expression::TableSchema; use databend_common_expression::TableSchemaRefExt; +use databend_common_expression::types::Float32Type; use databend_common_expression::types::NumberDataType; use databend_common_expression::types::NumberScalar; use databend_common_expression::types::StringType; @@ -32,9 +33,12 @@ use databend_common_expression::types::UInt64Type; use databend_common_expression::types::VariantType; use databend_common_expression::types::number::F64; use databend_common_expression::types::variant::cast_scalar_to_variant; +use databend_storages_common_index::vector_distance_type_name; use databend_storages_common_table_meta::meta::SegmentInfo; use databend_storages_common_table_meta::meta::SpatialStatistics; use databend_storages_common_table_meta::meta::TableSnapshot; +use databend_storages_common_table_meta::meta::VectorColumnStatistics; +use databend_storages_common_table_meta::meta::VectorDistanceType; use crate::FuseTable; use crate::io::SegmentsIO; @@ -60,6 +64,10 @@ impl TableMetaFunc for FuseBlockStatistics { "spatial_statistics", TableDataType::Nullable(Box::new(TableDataType::Variant)), ), + TableField::new( + "vector_statistics", + TableDataType::Nullable(Box::new(TableDataType::Variant)), + ), ]) } @@ -82,6 +90,7 @@ impl TableMetaFunc for FuseBlockStatistics { let mut column_names = Vec::with_capacity(estimated_rows); let mut statistics = Vec::with_capacity(estimated_rows); let mut spatial_statistics = Vec::with_capacity(estimated_rows); + let mut vector_statistics = Vec::with_capacity(estimated_rows); let segments_io = SegmentsIO::create(ctx.clone(), tbl.operator.clone(), schema.clone()); @@ -105,6 +114,10 @@ impl TableMetaFunc for FuseBlockStatistics { .spatial_stats .as_ref() .map(|stats| stats.iter().collect::>()); + let vector_stats = block + .vector_stats + .as_ref() + .map(|stats| stats.iter().collect::>()); for (column_id, column_stat) in col_stats { let Ok(field) = schema.field_of_column_id(*column_id) else { @@ -120,6 +133,7 @@ impl TableMetaFunc for FuseBlockStatistics { ); statistics.push(Some(stat)); spatial_statistics.push(None); + vector_statistics.push(None); num_rows += 1; if num_rows >= limit { @@ -138,6 +152,31 @@ impl TableMetaFunc for FuseBlockStatistics { statistics.push(None); let stat = build_spatial_statistics_variant(spatial_stat, &func_ctx); spatial_statistics.push(Some(stat)); + vector_statistics.push(None); + + num_rows += 1; + if num_rows >= limit { + break 'outer; + } + } + } + + if let Some(vector_stats) = &vector_stats { + for ((column_id, distance_type), vector_stat) in vector_stats { + let Ok(field) = schema.field_of_column_id(*column_id) else { + continue; + }; + block_locations.push(block.location.0.clone()); + column_ids.push(*column_id as u64); + column_names.push(field.name().to_string()); + statistics.push(None); + spatial_statistics.push(None); + let stat = build_vector_statistics_variant( + *distance_type, + vector_stat, + &func_ctx, + ); + vector_statistics.push(Some(stat)); num_rows += 1; if num_rows >= limit { @@ -155,6 +194,7 @@ impl TableMetaFunc for FuseBlockStatistics { StringType::from_data(column_names), VariantType::from_opt_data(statistics), VariantType::from_opt_data(spatial_statistics), + VariantType::from_opt_data(vector_statistics), ])) } } @@ -234,6 +274,38 @@ fn build_spatial_statistics_variant( build_variant(scalar, &data_type, func_ctx) } +fn build_vector_statistics_variant( + distance_type: VectorDistanceType, + vector_stat: &VectorColumnStatistics, + func_ctx: &FunctionContext, +) -> Vec { + let scalar = Scalar::Tuple(vec![ + Scalar::String(vector_distance_type_name(distance_type).to_string()), + Scalar::Array(Float32Type::from_data( + vector_stat + .centroid + .iter() + .map(|value| value.0) + .collect::>(), + )), + Scalar::Number(NumberScalar::Float32(vector_stat.radius)), + ]); + let data_type = TableDataType::Tuple { + fields_name: vec![ + "distance_type".to_string(), + "centroid".to_string(), + "radius".to_string(), + ], + fields_type: vec![ + TableDataType::String, + TableDataType::Array(Box::new(TableDataType::Number(NumberDataType::Float32))), + TableDataType::Number(NumberDataType::Float32), + ], + }; + + build_variant(scalar, &data_type, func_ctx) +} + fn build_variant(scalar: Scalar, data_type: &TableDataType, func_ctx: &FunctionContext) -> Vec { let mut buf = Vec::new(); cast_scalar_to_variant(scalar.as_ref(), &func_ctx.tz, &mut buf, Some(data_type));