Skip to content

Commit 34b1329

Browse files
authored
refactor(cubestore): Improve InfoSchemaTableDef memory usage with single-pass builders (cube-js#10681)
Replace closure-per-column pattern with into_iter() + Arrow builders. Each row's heap data is freed incrementally during iteration instead of keeping all source data alive until RecordBatch creation. Removes Arc wrapping, Box<dyn Fn> allocations, and reduces N passes to 1. | Total cache | Per-row value (avg) | N | OLD peak Δ | NEW peak Δ | Savings | OLD allocs | NEW allocs | OLD time | NEW time | | ----------- | ------------------- | -----: | -----------: | ---------------: | -----------------------: | ---------: | ---------: | -------: | -------: | | ~11 MB | ~1 KB | 10 000 | 16.76 MB | **10.79 MB** | **−5.97 MB (−36 %)** | 26 736 | 46 | 4.1 ms | 2.0 ms | | ~55 MB | ~5 KB | 10 000 | 57.77 MB | **40.17 MB** | **−17.6 MB (−30 %)** | 26 736 | 48 | 10.3 ms | 9.5 ms | | ~127 MB | ~13 KB | 10 000 | 150.78 MB | **79.28 MB** | **−71.5 MB (−47 %)** | 26 737 | 49 | 18.0 ms | 15.0 ms | | ~257 MB | ~26 KB | 10 000 | 438.81 MB | **157.40 MB** | **−281 MB (−64 %)** | 26 737 | 50 | 53.1 ms | 34.1 ms | | ~1.5 GB | **512 KB (fixed)** | 3 000 † | 2 048.70 MB | **1 024.34 MB** | **−1 024 MB (−50 %)** | 8 065 | 54 | 804 ms | 671 ms | | ~1.5 GB | **1 MB (fixed)** | 1 500 † | 2 049.10 MB | **1 024.17 MB** | **−1 025 MB (−50 %)** | 4 062 | 53 | 85.6 ms | 61.1 ms |
1 parent df1b7e7 commit 34b1329

15 files changed

+483
-752
lines changed

rust/cubestore/cubestore/src/queryplanner/info_schema/info_schema_columns.rs

Lines changed: 21 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use crate::metastore::Column;
33
use crate::queryplanner::{InfoSchemaTableDef, InfoSchemaTableDefContext};
44
use crate::CubeError;
55
use async_trait::async_trait;
6-
use datafusion::arrow::array::{ArrayRef, StringArray};
6+
use datafusion::arrow::array::{ArrayRef, StringBuilder};
77
use datafusion::arrow::datatypes::{DataType, Field};
88
use std::sync::Arc;
99

@@ -17,7 +17,7 @@ impl InfoSchemaTableDef for ColumnsInfoSchemaTableDef {
1717
&self,
1818
ctx: InfoSchemaTableDefContext,
1919
_limit: Option<usize>,
20-
) -> Result<Arc<Vec<(Column, TablePath)>>, CubeError> {
20+
) -> Result<Vec<(Column, TablePath)>, CubeError> {
2121
let rows = ctx.meta_store.get_tables_with_path(false).await?;
2222
let mut res = Vec::new();
2323

@@ -28,7 +28,7 @@ impl InfoSchemaTableDef for ColumnsInfoSchemaTableDef {
2828
}
2929
}
3030

31-
Ok(Arc::new(res))
31+
Ok(res)
3232
}
3333

3434
fn schema(&self) -> Vec<Field> {
@@ -40,34 +40,25 @@ impl InfoSchemaTableDef for ColumnsInfoSchemaTableDef {
4040
]
4141
}
4242

43-
fn columns(&self) -> Vec<Box<dyn Fn(Arc<Vec<(Column, TablePath)>>) -> ArrayRef>> {
43+
fn columns(&self, rows: Vec<(Column, TablePath)>) -> Vec<ArrayRef> {
44+
let num_rows = rows.len();
45+
let mut schema_builder = StringBuilder::with_capacity(num_rows, num_rows * 32);
46+
let mut table_builder = StringBuilder::with_capacity(num_rows, num_rows * 64);
47+
let mut column_builder = StringBuilder::with_capacity(num_rows, num_rows * 32);
48+
let mut type_builder = StringBuilder::with_capacity(num_rows, num_rows * 16);
49+
50+
for (column, table_path) in rows.into_iter() {
51+
schema_builder.append_value(table_path.schema.get_row().get_name());
52+
table_builder.append_value(table_path.table.get_row().get_table_name());
53+
column_builder.append_value(column.get_name());
54+
type_builder.append_value(column.get_column_type().to_string());
55+
}
56+
4457
vec![
45-
Box::new(|tables| {
46-
Arc::new(StringArray::from_iter_values(
47-
tables
48-
.iter()
49-
.map(|(_, row)| row.schema.get_row().get_name()),
50-
))
51-
}),
52-
Box::new(|tables| {
53-
Arc::new(StringArray::from_iter_values(
54-
tables
55-
.iter()
56-
.map(|(_, row)| row.table.get_row().get_table_name()),
57-
))
58-
}),
59-
Box::new(|tables| {
60-
Arc::new(StringArray::from_iter_values(
61-
tables.iter().map(|(column, _)| column.get_name()),
62-
))
63-
}),
64-
Box::new(|tables| {
65-
Arc::new(StringArray::from_iter_values(
66-
tables
67-
.iter()
68-
.map(|(column, _)| column.get_column_type().to_string()),
69-
))
70-
}),
58+
Arc::new(schema_builder.finish()),
59+
Arc::new(table_builder.finish()),
60+
Arc::new(column_builder.finish()),
61+
Arc::new(type_builder.finish()),
7162
]
7263
}
7364
}

rust/cubestore/cubestore/src/queryplanner/info_schema/info_schema_schemata.rs

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use crate::metastore::{IdRow, MetaStoreTable, Schema};
22
use crate::queryplanner::{InfoSchemaTableDef, InfoSchemaTableDefContext};
33
use crate::CubeError;
44
use async_trait::async_trait;
5-
use datafusion::arrow::array::{ArrayRef, StringArray};
5+
use datafusion::arrow::array::{ArrayRef, StringBuilder};
66
use datafusion::arrow::datatypes::{DataType, Field};
77
use std::sync::Arc;
88

@@ -16,20 +16,23 @@ impl InfoSchemaTableDef for SchemataInfoSchemaTableDef {
1616
&self,
1717
ctx: InfoSchemaTableDefContext,
1818
_limit: Option<usize>,
19-
) -> Result<Arc<Vec<Self::T>>, CubeError> {
20-
Ok(Arc::new(ctx.meta_store.schemas_table().all_rows().await?))
19+
) -> Result<Vec<Self::T>, CubeError> {
20+
Ok(ctx.meta_store.schemas_table().all_rows().await?)
2121
}
2222

2323
fn schema(&self) -> Vec<Field> {
2424
vec![Field::new("schema_name", DataType::Utf8, false)]
2525
}
2626

27-
fn columns(&self) -> Vec<Box<dyn Fn(Arc<Vec<Self::T>>) -> ArrayRef>> {
28-
vec![Box::new(|tables| {
29-
Arc::new(StringArray::from_iter_values(
30-
tables.iter().map(|row| row.get_row().get_name()),
31-
))
32-
})]
27+
fn columns(&self, rows: Vec<Self::T>) -> Vec<ArrayRef> {
28+
let num_rows = rows.len();
29+
let mut name_builder = StringBuilder::with_capacity(num_rows, num_rows * 32);
30+
31+
for row in rows.into_iter() {
32+
name_builder.append_value(row.get_row().get_name());
33+
}
34+
35+
vec![Arc::new(name_builder.finish())]
3336
}
3437
}
3538

rust/cubestore/cubestore/src/queryplanner/info_schema/info_schema_tables.rs

Lines changed: 29 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use crate::queryplanner::info_schema::timestamp_nanos_or_panic;
33
use crate::queryplanner::{InfoSchemaTableDef, InfoSchemaTableDefContext};
44
use crate::CubeError;
55
use async_trait::async_trait;
6-
use datafusion::arrow::array::{ArrayRef, StringArray, TimestampNanosecondArray};
6+
use datafusion::arrow::array::{ArrayRef, StringBuilder, TimestampNanosecondBuilder};
77
use datafusion::arrow::datatypes::{DataType, Field, TimeUnit};
88
use std::sync::Arc;
99

@@ -17,8 +17,10 @@ impl InfoSchemaTableDef for TablesInfoSchemaTableDef {
1717
&self,
1818
ctx: InfoSchemaTableDefContext,
1919
_limit: Option<usize>,
20-
) -> Result<Arc<Vec<TablePath>>, CubeError> {
21-
ctx.meta_store.get_tables_with_path(false).await
20+
) -> Result<Vec<TablePath>, CubeError> {
21+
Ok(Arc::unwrap_or_clone(
22+
ctx.meta_store.get_tables_with_path(false).await?,
23+
))
2224
}
2325

2426
fn schema(&self) -> Vec<Field> {
@@ -38,42 +40,31 @@ impl InfoSchemaTableDef for TablesInfoSchemaTableDef {
3840
]
3941
}
4042

41-
fn columns(&self) -> Vec<Box<dyn Fn(Arc<Vec<Self::T>>) -> ArrayRef>> {
43+
fn columns(&self, rows: Vec<Self::T>) -> Vec<ArrayRef> {
44+
let num_rows = rows.len();
45+
let mut schema_builder = StringBuilder::with_capacity(num_rows, num_rows * 32);
46+
let mut name_builder = StringBuilder::with_capacity(num_rows, num_rows * 64);
47+
let mut range_end_builder = TimestampNanosecondBuilder::with_capacity(num_rows);
48+
let mut seal_builder = TimestampNanosecondBuilder::with_capacity(num_rows);
49+
50+
for row in rows.into_iter() {
51+
schema_builder.append_value(row.schema.get_row().get_name());
52+
let table = row.table.get_row();
53+
name_builder.append_value(table.get_table_name());
54+
range_end_builder.append_option(
55+
table
56+
.build_range_end()
57+
.as_ref()
58+
.map(timestamp_nanos_or_panic),
59+
);
60+
seal_builder.append_option(table.seal_at().as_ref().map(timestamp_nanos_or_panic));
61+
}
62+
4263
vec![
43-
Box::new(|tables| {
44-
Arc::new(StringArray::from_iter_values(
45-
tables.iter().map(|row| row.schema.get_row().get_name()),
46-
))
47-
}),
48-
Box::new(|tables| {
49-
Arc::new(StringArray::from_iter_values(
50-
tables
51-
.iter()
52-
.map(|row| row.table.get_row().get_table_name()),
53-
))
54-
}),
55-
Box::new(|tables| {
56-
Arc::new(TimestampNanosecondArray::from_iter(tables.iter().map(
57-
|row| {
58-
row.table
59-
.get_row()
60-
.build_range_end()
61-
.as_ref()
62-
.map(timestamp_nanos_or_panic)
63-
},
64-
)))
65-
}),
66-
Box::new(|tables| {
67-
Arc::new(TimestampNanosecondArray::from_iter(tables.iter().map(
68-
|row| {
69-
row.table
70-
.get_row()
71-
.seal_at()
72-
.as_ref()
73-
.map(timestamp_nanos_or_panic)
74-
},
75-
)))
76-
}),
64+
Arc::new(schema_builder.finish()),
65+
Arc::new(name_builder.finish()),
66+
Arc::new(range_end_builder.finish()),
67+
Arc::new(seal_builder.finish()),
7768
]
7869
}
7970
}

rust/cubestore/cubestore/src/queryplanner/info_schema/rocksdb_properties.rs

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use crate::metastore::RocksPropertyRow;
22
use crate::queryplanner::{InfoSchemaTableDef, InfoSchemaTableDefContext};
33
use crate::CubeError;
44
use async_trait::async_trait;
5-
use datafusion::arrow::array::{ArrayRef, StringArray};
5+
use datafusion::arrow::array::{ArrayRef, StringBuilder};
66
use datafusion::arrow::datatypes::{DataType, Field};
77
use std::sync::Arc;
88

@@ -32,12 +32,12 @@ impl InfoSchemaTableDef for RocksDBPropertiesTableDef {
3232
&self,
3333
ctx: InfoSchemaTableDefContext,
3434
_limit: Option<usize>,
35-
) -> Result<Arc<Vec<Self::T>>, CubeError> {
36-
Ok(Arc::new(if self.for_cachestore {
35+
) -> Result<Vec<Self::T>, CubeError> {
36+
Ok(if self.for_cachestore {
3737
ctx.cache_store.rocksdb_properties().await?
3838
} else {
3939
ctx.meta_store.rocksdb_properties().await?
40-
}))
40+
})
4141
}
4242

4343
fn schema(&self) -> Vec<Field> {
@@ -47,18 +47,22 @@ impl InfoSchemaTableDef for RocksDBPropertiesTableDef {
4747
]
4848
}
4949

50-
fn columns(&self) -> Vec<Box<dyn Fn(Arc<Vec<Self::T>>) -> ArrayRef>> {
50+
fn columns(&self, rows: Vec<Self::T>) -> Vec<ArrayRef> {
51+
let num_rows = rows.len();
52+
let mut key_builder = StringBuilder::with_capacity(num_rows, num_rows * 64);
53+
let mut value_builder = StringBuilder::with_capacity(num_rows, num_rows * 64);
54+
55+
for row in rows.into_iter() {
56+
key_builder.append_value(&row.key);
57+
match row.value.as_ref() {
58+
Some(v) => value_builder.append_value(v),
59+
None => value_builder.append_null(),
60+
}
61+
}
62+
5163
vec![
52-
Box::new(|items| {
53-
Arc::new(StringArray::from_iter_values(
54-
items.iter().map(|row| row.key.clone()),
55-
))
56-
}),
57-
Box::new(|items| {
58-
Arc::new(StringArray::from_iter(
59-
items.iter().map(|row| row.value.as_ref()),
60-
))
61-
}),
64+
Arc::new(key_builder.finish()),
65+
Arc::new(value_builder.finish()),
6266
]
6367
}
6468
}

rust/cubestore/cubestore/src/queryplanner/info_schema/system_cache.rs

Lines changed: 22 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use crate::queryplanner::info_schema::timestamp_nanos_or_panic;
44
use crate::queryplanner::{InfoSchemaTableDef, InfoSchemaTableDefContext};
55
use crate::CubeError;
66
use async_trait::async_trait;
7-
use datafusion::arrow::array::{ArrayRef, StringArray, TimestampNanosecondArray};
7+
use datafusion::arrow::array::{ArrayRef, StringBuilder, TimestampNanosecondBuilder};
88
use datafusion::arrow::datatypes::{DataType, Field, TimeUnit};
99
use std::sync::Arc;
1010

@@ -18,8 +18,8 @@ impl InfoSchemaTableDef for SystemCacheTableDef {
1818
&self,
1919
ctx: InfoSchemaTableDefContext,
2020
limit: Option<usize>,
21-
) -> Result<Arc<Vec<Self::T>>, CubeError> {
22-
Ok(Arc::new(ctx.cache_store.cache_all(limit).await?))
21+
) -> Result<Vec<Self::T>, CubeError> {
22+
Ok(ctx.cache_store.cache_all(limit).await?)
2323
}
2424

2525
fn schema(&self) -> Vec<Field> {
@@ -35,33 +35,26 @@ impl InfoSchemaTableDef for SystemCacheTableDef {
3535
]
3636
}
3737

38-
fn columns(&self) -> Vec<Box<dyn Fn(Arc<Vec<Self::T>>) -> ArrayRef>> {
38+
fn columns(&self, rows: Vec<Self::T>) -> Vec<ArrayRef> {
39+
let num_rows = rows.len();
40+
let mut key_builder = StringBuilder::with_capacity(num_rows, num_rows * 64);
41+
let mut prefix_builder = StringBuilder::with_capacity(num_rows, num_rows * 64);
42+
let mut expire_builder = TimestampNanosecondBuilder::with_capacity(num_rows);
43+
let mut value_builder = StringBuilder::with_capacity(num_rows, num_rows * 64);
44+
45+
for row in rows.into_iter() {
46+
let item = row.get_row();
47+
key_builder.append_value(item.get_key());
48+
prefix_builder.append_option(item.get_prefix().as_deref());
49+
expire_builder.append_option(item.get_expire().as_ref().map(timestamp_nanos_or_panic));
50+
value_builder.append_value(item.get_value());
51+
}
52+
3953
vec![
40-
Box::new(|items| {
41-
Arc::new(StringArray::from_iter(
42-
items.iter().map(|row| Some(row.get_row().get_key())),
43-
))
44-
}),
45-
Box::new(|items| {
46-
Arc::new(StringArray::from_iter(
47-
items.iter().map(|row| row.get_row().get_prefix().clone()),
48-
))
49-
}),
50-
Box::new(|items| {
51-
Arc::new(TimestampNanosecondArray::from_iter(items.iter().map(
52-
|row| {
53-
row.get_row()
54-
.get_expire()
55-
.as_ref()
56-
.map(timestamp_nanos_or_panic)
57-
},
58-
)))
59-
}),
60-
Box::new(|items| {
61-
Arc::new(StringArray::from_iter(
62-
items.iter().map(|row| Some(row.get_row().get_value())),
63-
))
64-
}),
54+
Arc::new(key_builder.finish()),
55+
Arc::new(prefix_builder.finish()),
56+
Arc::new(expire_builder.finish()),
57+
Arc::new(value_builder.finish()),
6558
]
6659
}
6760
}

0 commit comments

Comments
 (0)