Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 1 addition & 34 deletions packages/cubejs-api-gateway/src/gateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import {
ResultArrayWrapper,
ResultMultiWrapper,
ResultWrapper,
rowsToColumnar,
} from '@cubejs-backend/native';
import type {
Application as ExpressApplication,
Expand Down Expand Up @@ -129,40 +130,6 @@ function systemAsyncHandler(handler: (req: Request & { context: ExtendedRequestC
};
}

function rowsToColumnar(rawData: any): { members: string[]; columns: any[][] } {
  // Normalize the input to a plain array: arrays pass through, other
  // truthy values are treated as iterables, falsy values become empty.
  const rows: any[] = Array.isArray(rawData)
    ? rawData
    : rawData
      ? Array.from(rawData as Iterable<any>)
      : [];

  const rowCount = rows.length;
  if (rowCount === 0) {
    return { members: [], columns: [] };
  }

  // Column names come from the first row; every row is assumed to share
  // the same key set.
  const members = Object.keys(rows[0]);

  // One preallocated column per member, filled row by row.
  const columns: any[][] = members.map((member) => {
    const col = new Array(rowCount);
    for (let i = 0; i < rowCount; i++) {
      col[i] = rows[i][member];
    }
    return col;
  });

  return { members, columns };
}

// Prepared CheckAuthFn, default or from config: always async
type PreparedCheckAuthFn = (ctx: any, authorization?: string) => Promise<{
securityContext: any;
Expand Down
52 changes: 48 additions & 4 deletions packages/cubejs-backend-native/js/ResultWrapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,45 @@ export interface DataResult {
getResults(): ResultWrapper[];
}

// Columnar wire shape exchanged with the native (Rust) side: `members`
// holds the column names and `columns[j]` holds the values for
// `members[j]`, one entry per row.
export interface JsRawColumnarData {
  members: string[];
  columns: any[][];
}

/**
 * Pivot row-oriented data (array-of-objects) into the columnar
 * `JsRawColumnarData` wire shape.
 *
 * Accepts an array, any other truthy iterable, or a falsy value
 * (treated as empty). Column names are taken from the first row's keys;
 * all rows are assumed to share the same shape.
 */
export function rowsToColumnar(rawData: any): JsRawColumnarData {
  let rows: any[];
  if (Array.isArray(rawData)) {
    rows = rawData;
  } else {
    rows = rawData ? Array.from(rawData as Iterable<any>) : [];
  }

  if (!rows.length) {
    return { members: [], columns: [] };
  }

  const members = Object.keys(rows[0]);

  // Build each column with a preallocated array to avoid repeated growth.
  const columns: any[][] = members.map((member) => {
    const column = new Array(rows.length);
    rows.forEach((row, i) => {
      column[i] = row[member];
    });
    return column;
  });

  return { members, columns };
}

// Shared base for the wrapper classes in this module. The `isWrapper`
// flag is presumably a cheap runtime marker so other code can detect a
// wrapper instance without `instanceof` — TODO confirm against callers.
class BaseWrapper {
  public readonly isWrapper: boolean = true;
}
Expand Down Expand Up @@ -140,14 +179,19 @@ export class ResultWrapper extends BaseWrapper implements DataResult {
return [this.nativeReference];
}

// Pivot to columnar before serializing: the row-oriented form repeats
// every column name on every row, which inflates JSON size and forces
// the Rust side to allocate a per-row map before transposing back to
// its native columnar `QueryResult` representation.
//
// Serialize to a Buffer so the Rust side can decode via
// serde_json::from_slice instead of walking a JsArray through the
// serde_json::from_slice instead of walking a JsValue through the
// Neon bridge with JsValueDeserializer. On 5 MB of AoO rows
// (~21k rows × 8 fields) the JsArray walk costs ~80 ms locally;
// (~21k rows × 8 fields) the JsValue walk costs ~80 ms locally;
// Buffer + serde_json is ~7× faster (M3 MAX) and tracks V8's JSON.parse
// (~11 ms on the same payload). On a real server it should be 3-6× slower,
// so avoiding the JsArray walk matters even more there.
return [Buffer.from(JSON.stringify(this.jsResult))];
// so avoiding the JsValue walk matters even more there.
return [Buffer.from(JSON.stringify(rowsToColumnar(this.jsResult)))];
}

public setTransformData(td: any) {
Expand Down
69 changes: 24 additions & 45 deletions packages/cubejs-backend-native/src/orchestrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use cubeorchestrator::query_result_transform::{
DBResponsePrimitive, DBResponseValue, RequestResultData, RequestResultDataMulti,
TransformedData,
};
use cubeorchestrator::transport::{JsRawData, TransformDataRequest};
use cubeorchestrator::transport::{JsRawColumnarData, TransformDataRequest};
use cubesql::compile::engine::df::scan::{ColumnarValueObject, FieldValue, ValueObject};
use cubesql::CubeError;
use neon::context::{Context, FunctionContext, ModuleContext};
Expand Down Expand Up @@ -87,41 +87,27 @@ impl ResultWrapper {

let raw_data_js = raw_data_js_arr.first().unwrap();

let query_result =
if let Ok(js_box) = raw_data_js.downcast::<JsBox<Arc<QueryResult>>, _>(cx) {
Arc::clone(&js_box)
} else if let Ok(js_buffer) = raw_data_js.downcast::<JsBuffer, _>(cx) {
let bytes = js_buffer.as_slice(cx);
let js_raw_data: JsRawData = serde_json::from_slice(bytes).map_err(|e| {
CubeError::internal(format!(
"Can't parse raw data JSON from JS ResultWrapper: {}",
e
))
})?;

QueryResult::from_js_raw_data(js_raw_data)
.map(Arc::new)
.map_cube_err("Can't build results data from JS rawData")?
} else if let Ok(js_array) = raw_data_js.downcast::<JsArray, _>(cx) {
let deserializer = JsValueDeserializer::new(cx, js_array.upcast());
let js_raw_data: JsRawData = match Deserialize::deserialize(deserializer) {
Ok(data) => data,
Err(_) => {
return Err(CubeError::internal(
"Can't deserialize results raw data from JS ResultWrapper object"
.to_string(),
));
}
};
let query_result = if let Ok(js_box) =
raw_data_js.downcast::<JsBox<Arc<QueryResult>>, _>(cx)
{
Arc::clone(&js_box)
} else if let Ok(js_buffer) = raw_data_js.downcast::<JsBuffer, _>(cx) {
let bytes = js_buffer.as_slice(cx);
let js_raw_data: JsRawColumnarData = serde_json::from_slice(bytes).map_err(|e| {
CubeError::internal(format!(
"Can't parse raw data JSON from JS ResultWrapper: {}",
e
))
})?;

QueryResult::from_js_raw_data(js_raw_data)
.map(Arc::new)
.map_cube_err("Can't build results data from JS rawData")?
} else {
return Err(CubeError::internal(
"Can't deserialize results raw data from JS ResultWrapper object".to_string(),
));
};
QueryResult::from_js_raw_data(js_raw_data)
.map(Arc::new)
.map_cube_err("Can't build results data from JS rawData")?
} else {
return Err(CubeError::internal(
"Can't deserialize results raw data from JS ResultWrapper object".to_string(),
));
};

Ok(Self {
transform_data: transform_request,
Expand Down Expand Up @@ -315,21 +301,14 @@ fn extract_query_result(
Ok(Arc::clone(&js_box))
} else if let Ok(js_buffer) = data_arg.downcast::<JsBuffer, _>(cx) {
let bytes = js_buffer.as_slice(cx);
let js_raw_data: JsRawData = serde_json::from_slice(bytes)?;

QueryResult::from_js_raw_data(js_raw_data)
.map(Arc::new)
.map_err(anyhow::Error::from)
} else if let Ok(js_array) = data_arg.downcast::<JsArray, _>(cx) {
let deserializer = JsValueDeserializer::new(cx, js_array.upcast());
let js_raw_data: JsRawData = Deserialize::deserialize(deserializer)?;
let js_raw_data: JsRawColumnarData = serde_json::from_slice(bytes)?;

QueryResult::from_js_raw_data(js_raw_data)
.map(Arc::new)
.map_err(anyhow::Error::from)
} else {
Err(anyhow::anyhow!(
"Second argument must be an Array of JsBox<Arc<QueryResult>> or JsArray"
"Second argument must be a JsBox<Arc<QueryResult>> or a JsBuffer with columnar JsRawColumnarData JSON"
))
}
}
Expand Down Expand Up @@ -357,7 +336,7 @@ pub fn get_cubestore_result(mut cx: FunctionContext) -> JsResult<JsValue> {
for (i, row) in result.rows.iter().enumerate() {
let js_row = cx.execute_scoped(|mut cx| {
let js_row = JsObject::new(&mut cx);
for (key, value) in result.columns.iter().zip(row.iter()) {
for (key, value) in result.members.iter().zip(row.iter()) {
let js_key = cx.string(key);
let js_value: Handle<'_, JsValue> = match value {
DBResponseValue::Primitive(DBResponsePrimitive::Null) => cx.null().upcast(),
Expand Down
124 changes: 95 additions & 29 deletions rust/cube/cubeorchestrator/benches/transform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@ use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Through
use cubeorchestrator::query_message_parser::QueryResult;
use cubeorchestrator::query_result_transform::{DBResponsePrimitive, TransformedData};
use cubeorchestrator::transport::{
ConfigItem, JsRawData, MemberOrMemberExpression, NormalizedQuery, QueryType, ResultType,
TransformDataRequest,
ConfigItem, JsRawColumnarData, MemberOrMemberExpression, NormalizedQuery, QueryType,
ResultType, TransformDataRequest,
};
use indexmap::IndexMap;

const ROW_COUNTS: &[usize] = &[1_000, 10_000, 50_000, 100_000];
const COLUMN_COUNTS: &[usize] = &[8, 16, 32, 64];
Expand Down Expand Up @@ -171,40 +170,48 @@ fn build_dataset(
dimensions: &[(String, String)],
measures: &[(String, String)],
time_dims: &[TimeColumn],
) -> JsRawData {
) -> JsRawColumnarData {
let total_cols = dimensions.len() + measures.len() + time_dims.len();
let mut rows = Vec::with_capacity(row_count);

for i in 0..row_count {
let mut row = IndexMap::with_capacity(total_cols);
for (j, (_, alias)) in dimensions.iter().enumerate() {
row.insert(
alias.clone(),
DBResponsePrimitive::String(format!("dim_{}_{}", j, i % 1000)),
);
let mut members = Vec::with_capacity(total_cols);
let mut columns: Vec<Vec<DBResponsePrimitive>> = Vec::with_capacity(total_cols);

for (j, (_, alias)) in dimensions.iter().enumerate() {
members.push(alias.clone());
let mut col = Vec::with_capacity(row_count);
for i in 0..row_count {
col.push(DBResponsePrimitive::String(format!(
"dim_{}_{}",
j,
i % 1000
)));
}
for (j, (_, alias)) in measures.iter().enumerate() {
row.insert(
alias.clone(),
DBResponsePrimitive::Number(((i * (j + 1)) as f64) * 0.5),
);
columns.push(col);
}
for (j, (_, alias)) in measures.iter().enumerate() {
members.push(alias.clone());
let mut col = Vec::with_capacity(row_count);
for i in 0..row_count {
col.push(DBResponsePrimitive::Number(((i * (j + 1)) as f64) * 0.5));
}
for (j, td) in time_dims.iter().enumerate() {
columns.push(col);
}
for (j, td) in time_dims.iter().enumerate() {
members.push(td.alias.clone());
let mut col = Vec::with_capacity(row_count);
for i in 0..row_count {
// Format mirrors typical CubeStore output: ISO-8601 with millisecond
// fractional and no timezone. None of `transform_value`'s six chrono
// parsers fully match this shape, so the function falls through to
// `s.clone()` — measuring the production worst case.
// fractional and no timezone.
let month = ((i + j) % 12) + 1;
let day = ((i / 12) % 28) + 1;
row.insert(
td.alias.clone(),
DBResponsePrimitive::String(format!("2024-{:02}-{:02}T00:00:00.000", month, day)),
);
col.push(DBResponsePrimitive::String(format!(
"2024-{:02}-{:02}T00:00:00.000",
month, day
)));
}
rows.push(row);
columns.push(col);
}

rows
JsRawColumnarData { members, columns }
}

fn bench_transform(c: &mut Criterion) {
Expand Down Expand Up @@ -305,5 +312,64 @@ fn bench_transform_time_scenarios(c: &mut Criterion) {
group.finish();
}

criterion_group!(benches, bench_transform, bench_transform_time_scenarios);
/// Bench the JS→Rust raw-data ingest path: `serde_json::from_slice` then
/// `QueryResult::from_js_raw_data`. This is the part of the pipeline that the
/// columnar wire format change actually touches; `bench_transform` above
/// consumes an already-built `QueryResult` and is unaffected.
/// Bench the JS→Rust raw-data ingest path: `serde_json::from_slice` then
/// `QueryResult::from_js_raw_data`. This is the part of the pipeline that the
/// columnar wire format change actually touches; `bench_transform` above
/// consumes an already-built `QueryResult` and is unaffected.
fn bench_from_js_raw_data(c: &mut Criterion) {
    // (column count, row count) shapes to exercise.
    const COMBOS: [(usize, usize); 4] = [(8, 10_000), (16, 10_000), (16, 100_000), (32, 100_000)];

    let mut group = c.benchmark_group("QueryResult::from_js_raw_data");

    for (col_count, row_count) in COMBOS {
        let (dim_count, measure_count) = split_dim_measure(col_count);
        let dimensions = make_member_aliases("dim", dim_count);
        let measures = make_member_aliases("measure", measure_count);

        // Serialize the columnar dataset once; the benches only deserialize.
        let payload = serde_json::to_vec(&build_dataset(row_count, &dimensions, &measures, &[]))
            .expect("to_vec");

        eprintln!(
            "from_js_raw_data: c{:02}_r{} payload_bytes={}",
            col_count,
            row_count,
            payload.len()
        );

        group.throughput(Throughput::Elements((row_count * col_count) as u64));

        let id_param = format!("c{:02}_r{}", col_count, row_count);

        // Parse only: serde_json::from_slice into the wire type.
        group.bench_with_input(BenchmarkId::new("parse_only", &id_param), &(), |b, _| {
            b.iter(|| {
                black_box(
                    serde_json::from_slice::<JsRawColumnarData>(black_box(&payload))
                        .expect("from_slice"),
                );
            });
        });

        // End-to-end: parse + transpose into QueryResult — what the Neon bridge does.
        group.bench_with_input(
            BenchmarkId::new("parse_plus_build", &id_param),
            &(),
            |b, _| {
                b.iter(|| {
                    let wire: JsRawColumnarData =
                        serde_json::from_slice(black_box(&payload)).expect("from_slice");
                    black_box(QueryResult::from_js_raw_data(wire).expect("from_js_raw_data"));
                });
            },
        );
    }

    group.finish();
}

// Register every benchmark group with the criterion harness;
// `bench_from_js_raw_data` covers the columnar ingest path alongside the
// existing transform benches.
criterion_group!(
    benches,
    bench_transform,
    bench_transform_time_scenarios,
    bench_from_js_raw_data
);
criterion_main!(benches);
Loading
Loading