Skip to content

Commit f388dbd

Browse files
authored
fix: fix index and tag filtering for flat format (#7121)
* perf: only decode primary keys in the batch Signed-off-by: evenyag <realevenyag@gmail.com> * fix: don't push none to creator Signed-off-by: evenyag <realevenyag@gmail.com> * chore: implement method to filter __table_id for sparse encoding Signed-off-by: evenyag <realevenyag@gmail.com> * feat: filter table id for sparse encoding separately The __table_id isn't present in the projection so we have to filter it manually Signed-off-by: evenyag <realevenyag@gmail.com> * fix: decode tags for sparse encoding when building bloom filter Signed-off-by: evenyag <realevenyag@gmail.com> * feat: support inverted index for tags under sparse encoding Signed-off-by: evenyag <realevenyag@gmail.com> * feat: skip tag columns in fulltext index Signed-off-by: evenyag <realevenyag@gmail.com> * chore: fix warnings Signed-off-by: evenyag <realevenyag@gmail.com> * style: fix clippy Signed-off-by: evenyag <realevenyag@gmail.com> * test: fix list index metadata test Signed-off-by: evenyag <realevenyag@gmail.com> * fix: decode primary key columns to filter When primary key columns are not in projection but in filters, we need to decode them in compute_filter_mask_flat Signed-off-by: evenyag <realevenyag@gmail.com> * refactor: reuse filter method Signed-off-by: evenyag <realevenyag@gmail.com> * fix: only use dictionary for string type in compat Signed-off-by: evenyag <realevenyag@gmail.com> * refactor: safe to get column by creator's column id Signed-off-by: evenyag <realevenyag@gmail.com> --------- Signed-off-by: evenyag <realevenyag@gmail.com>
1 parent 136b9ee commit f388dbd

11 files changed

Lines changed: 482 additions & 251 deletions

File tree

src/mito2/src/engine/basic_test.rs

Lines changed: 62 additions & 36 deletions
Large diffs are not rendered by default.

src/mito2/src/memtable/bulk/part_reader.rs

Lines changed: 7 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,10 @@
1313
// limitations under the License.
1414

1515
use std::collections::VecDeque;
16-
use std::ops::BitAnd;
1716
use std::sync::Arc;
1817

1918
use bytes::Bytes;
2019
use datatypes::arrow::array::BooleanArray;
21-
use datatypes::arrow::buffer::BooleanBuffer;
2220
use datatypes::arrow::record_batch::RecordBatch;
2321
use parquet::arrow::ProjectionMask;
2422
use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
@@ -30,7 +28,7 @@ use crate::error::{self, ComputeArrowSnafu, DecodeArrowRowGroupSnafu};
3028
use crate::memtable::bulk::context::{BulkIterContext, BulkIterContextRef};
3129
use crate::memtable::bulk::row_group_reader::MemtableRowGroupReaderBuilder;
3230
use crate::sst::parquet::flat_format::sequence_column_index;
33-
use crate::sst::parquet::reader::{MaybeFilter, RowGroupReaderContext};
31+
use crate::sst::parquet::reader::RowGroupReaderContext;
3432

3533
/// Iterator for reading data inside a bulk part.
3634
pub struct EncodedBulkPartIter {
@@ -191,38 +189,13 @@ fn apply_combined_filters(
191189
let num_rows = record_batch.num_rows();
192190
let mut combined_filter = None;
193191

194-
// First, apply predicate filters.
192+
// First, apply predicate filters using the shared method.
195193
if !context.base.filters.is_empty() {
196-
let num_rows = record_batch.num_rows();
197-
let mut mask = BooleanBuffer::new_set(num_rows);
198-
199-
// Run filter one by one and combine them result, similar to RangeBase::precise_filter
200-
for filter_ctx in &context.base.filters {
201-
let filter = match filter_ctx.filter() {
202-
MaybeFilter::Filter(f) => f,
203-
// Column matches.
204-
MaybeFilter::Matched => continue,
205-
// Column doesn't match, filter the entire batch.
206-
MaybeFilter::Pruned => return Ok(None),
207-
};
208-
209-
// Safety: We checked the format type in new().
210-
let Some(column_index) = context
211-
.read_format()
212-
.as_flat()
213-
.unwrap()
214-
.projected_index_by_id(filter_ctx.column_id())
215-
else {
216-
continue;
217-
};
218-
let array = record_batch.column(column_index);
219-
let result = filter
220-
.evaluate_array(array)
221-
.context(crate::error::RecordBatchSnafu)?;
222-
223-
mask = mask.bitand(&result);
224-
}
225-
// Convert the mask to BooleanArray
194+
let predicate_mask = context.base.compute_filter_mask_flat(&record_batch)?;
195+
// If predicate filters out the entire batch, return None early
196+
let Some(mask) = predicate_mask else {
197+
return Ok(None);
198+
};
226199
combined_filter = Some(BooleanArray::from(mask));
227200
}
228201

src/mito2/src/read/compat.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -386,7 +386,8 @@ impl FlatCompatBatch {
386386
/// Repeats the vector value `to_len` times.
387387
fn repeat_vector(vector: &VectorRef, to_len: usize, is_tag: bool) -> Result<ArrayRef> {
388388
assert_eq!(1, vector.len());
389-
if is_tag {
389+
let data_type = vector.data_type();
390+
if is_tag && data_type.is_string() {
390391
let values = vector.to_arrow_array();
391392
if values.is_null(0) {
392393
// Creates a dictionary array with `to_len` null keys.

src/mito2/src/read/flat_projection.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ pub struct FlatProjectionMapper {
4848
/// Ids of columns to project. It keeps ids in the same order as the `projection`
4949
/// indices to build the mapper.
5050
/// The mapper won't deduplicate the column ids.
51+
///
52+
/// Note that this doesn't contain the `__table_id` and `__tsid`.
5153
column_ids: Vec<ColumnId>,
5254
/// Ids and DataTypes of columns of the expected batch.
5355
/// We can use this to check if the batch is compatible with the expected schema.

src/mito2/src/sst/index.rs

Lines changed: 57 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,13 @@ use std::sync::Arc;
2626

2727
use bloom_filter::creator::BloomFilterIndexer;
2828
use common_telemetry::{debug, info, warn};
29+
use datatypes::arrow::array::BinaryArray;
2930
use datatypes::arrow::record_batch::RecordBatch;
31+
use mito_codec::index::IndexValuesCodec;
32+
use mito_codec::row_converter::CompositeValues;
3033
use puffin_manager::SstPuffinManager;
3134
use smallvec::{SmallVec, smallvec};
32-
use snafu::ResultExt;
35+
use snafu::{OptionExt, ResultExt};
3336
use statistics::{ByteCount, RowCount};
3437
use store_api::metadata::RegionMetadataRef;
3538
use store_api::storage::{ColumnId, FileId, RegionId};
@@ -40,7 +43,7 @@ use crate::access_layer::{AccessLayerRef, FilePathProvider, OperationType, Regio
4043
use crate::cache::file_cache::{FileType, IndexKey};
4144
use crate::cache::write_cache::{UploadTracker, WriteCacheRef};
4245
use crate::config::{BloomFilterConfig, FulltextIndexConfig, InvertedIndexConfig};
43-
use crate::error::{BuildIndexAsyncSnafu, Error, Result};
46+
use crate::error::{BuildIndexAsyncSnafu, DecodeSnafu, Error, InvalidRecordBatchSnafu, Result};
4447
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
4548
use crate::metrics::INDEX_CREATE_MEMORY_USAGE;
4649
use crate::read::{Batch, BatchReader};
@@ -57,6 +60,8 @@ use crate::sst::index::fulltext_index::creator::FulltextIndexer;
5760
use crate::sst::index::intermediate::IntermediateManager;
5861
use crate::sst::index::inverted_index::creator::InvertedIndexer;
5962
use crate::sst::parquet::SstInfo;
63+
use crate::sst::parquet::flat_format::primary_key_column_index;
64+
use crate::sst::parquet::format::PrimaryKeyArray;
6065

6166
pub(crate) const TYPE_INVERTED_INDEX: &str = "inverted_index";
6267
pub(crate) const TYPE_FULLTEXT_INDEX: &str = "fulltext_index";
@@ -698,6 +703,56 @@ impl IndexBuildScheduler {
698703
}
699704
}
700705

706+
/// Decodes primary keys from a flat format RecordBatch.
707+
/// Returns a list of (decoded_pk_value, count) tuples where count is the number of consecutive rows sharing that key.
708+
pub(crate) fn decode_primary_keys_with_counts(
709+
batch: &RecordBatch,
710+
codec: &IndexValuesCodec,
711+
) -> Result<Vec<(CompositeValues, usize)>> {
712+
let primary_key_index = primary_key_column_index(batch.num_columns());
713+
let pk_dict_array = batch
714+
.column(primary_key_index)
715+
.as_any()
716+
.downcast_ref::<PrimaryKeyArray>()
717+
.context(InvalidRecordBatchSnafu {
718+
reason: "Primary key column is not a dictionary array",
719+
})?;
720+
let pk_values_array = pk_dict_array
721+
.values()
722+
.as_any()
723+
.downcast_ref::<BinaryArray>()
724+
.context(InvalidRecordBatchSnafu {
725+
reason: "Primary key values are not binary array",
726+
})?;
727+
let keys = pk_dict_array.keys();
728+
729+
// Decodes primary keys and counts consecutive occurrences
730+
let mut result: Vec<(CompositeValues, usize)> = Vec::new();
731+
let mut prev_key: Option<u32> = None;
732+
733+
for i in 0..keys.len() {
734+
let current_key = keys.value(i);
735+
736+
// Checks if current key is the same as previous key
737+
if let Some(prev) = prev_key
738+
&& prev == current_key
739+
{
740+
// Safety: We already have a key in the result vector.
741+
result.last_mut().unwrap().1 += 1;
742+
continue;
743+
}
744+
745+
// New key, decodes it.
746+
let pk_bytes = pk_values_array.value(current_key as usize);
747+
let decoded_value = codec.decoder().decode(pk_bytes).context(DecodeSnafu)?;
748+
749+
result.push((decoded_value, 1));
750+
prev_key = Some(current_key);
751+
}
752+
753+
Ok(result)
754+
}
755+
701756
#[cfg(test)]
702757
mod tests {
703758
use std::sync::Arc;

src/mito2/src/sst/index/bloom_filter/creator.rs

Lines changed: 72 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,18 @@ use std::collections::HashMap;
1616
use std::sync::Arc;
1717
use std::sync::atomic::AtomicUsize;
1818

19+
use api::v1::SemanticType;
1920
use common_telemetry::{debug, warn};
2021
use datatypes::arrow::record_batch::RecordBatch;
2122
use datatypes::schema::SkippingIndexType;
2223
use datatypes::vectors::Helper;
2324
use index::bloom_filter::creator::BloomFilterCreator;
2425
use index::target::IndexTarget;
2526
use mito_codec::index::{IndexValueCodec, IndexValuesCodec};
26-
use mito_codec::row_converter::SortField;
27+
use mito_codec::row_converter::{CompositeValues, SortField};
2728
use puffin::puffin_manager::{PuffinWriter, PutOptions};
2829
use snafu::{ResultExt, ensure};
30+
use store_api::codec::PrimaryKeyEncoding;
2931
use store_api::metadata::RegionMetadataRef;
3032
use store_api::storage::{ColumnId, FileId};
3133
use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
@@ -35,13 +37,13 @@ use crate::error::{
3537
OperateAbortedIndexSnafu, PuffinAddBlobSnafu, PushBloomFilterValueSnafu, Result,
3638
};
3739
use crate::read::Batch;
38-
use crate::sst::index::TYPE_BLOOM_FILTER_INDEX;
3940
use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE;
4041
use crate::sst::index::intermediate::{
4142
IntermediateLocation, IntermediateManager, TempFileProvider,
4243
};
4344
use crate::sst::index::puffin_manager::SstPuffinWriter;
4445
use crate::sst::index::statistics::{ByteCount, RowCount, Statistics};
46+
use crate::sst::index::{TYPE_BLOOM_FILTER_INDEX, decode_primary_keys_with_counts};
4547

4648
/// The buffer size for the pipe used to send index data to the puffin blob.
4749
const PIPE_BUFFER_SIZE_FOR_SENDING_BLOB: usize = 8192;
@@ -289,47 +291,81 @@ impl BloomFilterIndexer {
289291
let n = batch.num_rows();
290292
guard.inc_row_count(n);
291293

292-
for (col_id, creator) in &mut self.creators {
293-
// Get the column name from metadata
294-
if let Some(column_meta) = self.metadata.column_by_id(*col_id) {
295-
let column_name = &column_meta.column_schema.name;
294+
let is_sparse = self.metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse;
295+
let mut decoded_pks: Option<Vec<(CompositeValues, usize)>> = None;
296296

297-
// Find the column in the RecordBatch by name
298-
if let Some(column_array) = batch.column_by_name(column_name) {
299-
// Convert Arrow array to VectorRef
300-
let vector = Helper::try_into_vector(column_array.clone())
301-
.context(crate::error::ConvertVectorSnafu)?;
302-
let sort_field = SortField::new(vector.data_type());
297+
for (col_id, creator) in &mut self.creators {
298+
// Safety: `creators` are created from the metadata so it won't be None.
299+
let column_meta = self.metadata.column_by_id(*col_id).unwrap();
300+
let column_name = &column_meta.column_schema.name;
301+
if let Some(column_array) = batch.column_by_name(column_name) {
302+
// Convert Arrow array to VectorRef
303+
let vector = Helper::try_into_vector(column_array.clone())
304+
.context(crate::error::ConvertVectorSnafu)?;
305+
let sort_field = SortField::new(vector.data_type());
306+
307+
for i in 0..n {
308+
let value = vector.get_ref(i);
309+
let elems = (!value.is_null())
310+
.then(|| {
311+
let mut buf = vec![];
312+
IndexValueCodec::encode_nonnull_value(value, &sort_field, &mut buf)
313+
.context(EncodeSnafu)?;
314+
Ok(buf)
315+
})
316+
.transpose()?;
303317

304-
for i in 0..n {
305-
let value = vector.get_ref(i);
306-
let elems = (!value.is_null())
307-
.then(|| {
308-
let mut buf = vec![];
309-
IndexValueCodec::encode_nonnull_value(value, &sort_field, &mut buf)
310-
.context(EncodeSnafu)?;
311-
Ok(buf)
312-
})
313-
.transpose()?;
318+
creator
319+
.push_row_elems(elems)
320+
.await
321+
.context(PushBloomFilterValueSnafu)?;
322+
}
323+
} else if is_sparse && column_meta.semantic_type == SemanticType::Tag {
324+
// Column not found in the batch; try to decode it from the primary keys for sparse encoding.
325+
if decoded_pks.is_none() {
326+
decoded_pks = Some(decode_primary_keys_with_counts(batch, &self.codec)?);
327+
}
314328

315-
creator
316-
.push_row_elems(elems)
317-
.await
318-
.context(PushBloomFilterValueSnafu)?;
319-
}
320-
} else {
329+
let pk_values_with_counts = decoded_pks.as_ref().unwrap();
330+
let Some(col_info) = self.codec.pk_col_info(*col_id) else {
321331
debug!(
322-
"Column {} not found in the batch during building bloom filter index",
332+
"Column {} not found in primary key during building bloom filter index",
323333
column_name
324334
);
325-
// Push empty elements to maintain alignment
326-
for _ in 0..n {
327-
creator
328-
.push_row_elems(None)
329-
.await
330-
.context(PushBloomFilterValueSnafu)?;
331-
}
335+
continue;
336+
};
337+
let pk_index = col_info.idx;
338+
let field = &col_info.field;
339+
for (decoded, count) in pk_values_with_counts {
340+
let value = match decoded {
341+
CompositeValues::Dense(dense) => dense.get(pk_index).map(|v| &v.1),
342+
CompositeValues::Sparse(sparse) => sparse.get(col_id),
343+
};
344+
345+
let elems = value
346+
.filter(|v| !v.is_null())
347+
.map(|v| {
348+
let mut buf = vec![];
349+
IndexValueCodec::encode_nonnull_value(
350+
v.as_value_ref(),
351+
field,
352+
&mut buf,
353+
)
354+
.context(EncodeSnafu)?;
355+
Ok(buf)
356+
})
357+
.transpose()?;
358+
359+
creator
360+
.push_n_row_elems(*count, elems)
361+
.await
362+
.context(PushBloomFilterValueSnafu)?;
332363
}
364+
} else {
365+
debug!(
366+
"Column {} not found in the batch during building bloom filter index",
367+
column_name
368+
);
333369
}
334370
}
335371

src/mito2/src/sst/index/fulltext_index/creator.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use std::collections::HashMap;
1616
use std::sync::Arc;
1717
use std::sync::atomic::AtomicUsize;
1818

19+
use api::v1::SemanticType;
1920
use common_telemetry::warn;
2021
use datatypes::arrow::array::{Array, LargeStringArray, StringArray};
2122
use datatypes::arrow::datatypes::DataType;
@@ -69,6 +70,17 @@ impl FulltextIndexer {
6970
let mut creators = HashMap::new();
7071

7172
for column in &metadata.column_metadatas {
73+
// Tag columns don't support fulltext index now.
74+
// If we need to support fulltext index for tag columns, we also need to parse
75+
// the codec and handle sparse encoding for flat format specially.
76+
if column.semantic_type == SemanticType::Tag {
77+
common_telemetry::debug!(
78+
"Skip creating fulltext index for tag column {}",
79+
column.column_schema.name
80+
);
81+
continue;
82+
}
83+
7284
let options = column
7385
.column_schema
7486
.fulltext_options()

0 commit comments

Comments
 (0)