Skip to content

Commit 8235c3b

Browse files
authored
Merge pull request #3151 from perspective-dev/sql-model-fixes
SQL model fixes
2 parents 9b70773 + 6c23a6b commit 8235c3b

9 files changed

Lines changed: 440 additions & 16 deletions

File tree

rust/perspective-client/src/rust/virtual_server/data.rs

Lines changed: 139 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,12 @@ use arrow_array::builder::{
1717
BooleanBuilder, Float64Builder, Int32Builder, StringBuilder, TimestampMillisecondBuilder,
1818
};
1919
use arrow_array::{
20-
Array, ArrayRef, BooleanArray, Date32Array, Decimal128Array, Float64Array, Int32Array,
21-
Int64Array, RecordBatch, StringArray, TimestampMicrosecondArray, TimestampMillisecondArray,
22-
TimestampNanosecondArray, TimestampSecondArray,
20+
Array, ArrayRef, BooleanArray, Date32Array, Date64Array, Decimal128Array, Float32Array,
21+
Float64Array, Int8Array, Int16Array, Int32Array, Int64Array, LargeStringArray, RecordBatch,
22+
StringArray, Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray,
23+
Time64NanosecondArray, TimestampMicrosecondArray, TimestampMillisecondArray,
24+
TimestampNanosecondArray, TimestampSecondArray, UInt8Array, UInt16Array, UInt32Array,
25+
UInt64Array,
2326
};
2427
use arrow_ipc::reader::{FileReader, StreamReader};
2528
use arrow_ipc::writer::StreamWriter;
@@ -301,7 +304,10 @@ fn extract_scalar(array: &ArrayRef, row_idx: usize) -> Scalar {
301304
let arr = array.as_any().downcast_ref::<Date32Array>().unwrap();
302305
Scalar::Float(arr.value(row_idx) as f64 * 86_400_000.0)
303306
},
304-
_ => Scalar::String(format!("{:?}", array)),
307+
_ => {
308+
let scalar_arr = array.slice(row_idx, 1);
309+
Scalar::String(format!("{:?}", scalar_arr))
310+
},
305311
}
306312
}
307313

@@ -356,6 +362,47 @@ fn coerce_column(
356362
Field::new(name, DataType::Timestamp(TimeUnit::Millisecond, None), true),
357363
array.clone(),
358364
)),
365+
DataType::Int8 => {
366+
let arr = array.as_any().downcast_ref::<Int8Array>().unwrap();
367+
let result: Int32Array = arr.iter().map(|v| v.map(|v| v as i32)).collect();
368+
Ok((
369+
Field::new(name, DataType::Int32, true),
370+
Arc::new(result) as ArrayRef,
371+
))
372+
},
373+
DataType::Int16 => {
374+
let arr = array.as_any().downcast_ref::<Int16Array>().unwrap();
375+
let result: Int32Array = arr.iter().map(|v| v.map(|v| v as i32)).collect();
376+
Ok((
377+
Field::new(name, DataType::Int32, true),
378+
Arc::new(result) as ArrayRef,
379+
))
380+
},
381+
DataType::UInt8 => {
382+
let arr = array.as_any().downcast_ref::<UInt8Array>().unwrap();
383+
let result: Int32Array = arr.iter().map(|v| v.map(|v| v as i32)).collect();
384+
Ok((
385+
Field::new(name, DataType::Int32, true),
386+
Arc::new(result) as ArrayRef,
387+
))
388+
},
389+
DataType::UInt16 => {
390+
let arr = array.as_any().downcast_ref::<UInt16Array>().unwrap();
391+
let result: Int32Array = arr.iter().map(|v| v.map(|v| v as i32)).collect();
392+
Ok((
393+
Field::new(name, DataType::Int32, true),
394+
Arc::new(result) as ArrayRef,
395+
))
396+
},
397+
DataType::UInt32 => {
398+
let arr = array.as_any().downcast_ref::<UInt32Array>().unwrap();
399+
let result: Int64Array = arr.iter().map(|v| v.map(|v| v as i64)).collect();
400+
let result: Float64Array = result.iter().map(|v| v.map(|v| v as f64)).collect();
401+
Ok((
402+
Field::new(name, DataType::Float64, true),
403+
Arc::new(result) as ArrayRef,
404+
))
405+
},
359406
DataType::Int64 => {
360407
let arr = array.as_any().downcast_ref::<Int64Array>().unwrap();
361408
let result: Float64Array = arr.iter().map(|v| v.map(|v| v as f64)).collect();
@@ -364,6 +411,22 @@ fn coerce_column(
364411
Arc::new(result) as ArrayRef,
365412
))
366413
},
414+
DataType::UInt64 => {
415+
let arr = array.as_any().downcast_ref::<UInt64Array>().unwrap();
416+
let result: Float64Array = arr.iter().map(|v| v.map(|v| v as f64)).collect();
417+
Ok((
418+
Field::new(name, DataType::Float64, true),
419+
Arc::new(result) as ArrayRef,
420+
))
421+
},
422+
DataType::Float32 => {
423+
let arr = array.as_any().downcast_ref::<Float32Array>().unwrap();
424+
let result: Float64Array = arr.iter().map(|v| v.map(|v| v as f64)).collect();
425+
Ok((
426+
Field::new(name, DataType::Float64, true),
427+
Arc::new(result) as ArrayRef,
428+
))
429+
},
367430
DataType::Decimal128(_, scale) => {
368431
let scale = *scale;
369432
let arr = array.as_any().downcast_ref::<Decimal128Array>().unwrap();
@@ -374,13 +437,77 @@ fn coerce_column(
374437
Arc::new(result) as ArrayRef,
375438
))
376439
},
440+
DataType::Date64 => {
441+
let arr = array.as_any().downcast_ref::<Date64Array>().unwrap();
442+
let result: Date32Array = arr
443+
.iter()
444+
.map(|v| v.map(|v| (v / 86_400_000) as i32))
445+
.collect();
446+
Ok((
447+
Field::new(name, DataType::Date32, true),
448+
Arc::new(result) as ArrayRef,
449+
))
450+
},
377451
DataType::Timestamp(unit, _) => {
378452
let casted = timestamp_to_millis(array, unit);
379453
Ok((
380454
Field::new(name, DataType::Timestamp(TimeUnit::Millisecond, None), true),
381455
casted,
382456
))
383457
},
458+
DataType::Time32(TimeUnit::Second) => {
459+
let arr = array.as_any().downcast_ref::<Time32SecondArray>().unwrap();
460+
let result: TimestampMillisecondArray =
461+
arr.iter().map(|v| v.map(|v| v as i64 * 1_000)).collect();
462+
Ok((
463+
Field::new(name, DataType::Timestamp(TimeUnit::Millisecond, None), true),
464+
Arc::new(result) as ArrayRef,
465+
))
466+
},
467+
DataType::Time32(TimeUnit::Millisecond) => {
468+
let arr = array
469+
.as_any()
470+
.downcast_ref::<Time32MillisecondArray>()
471+
.unwrap();
472+
let result: TimestampMillisecondArray =
473+
arr.iter().map(|v| v.map(|v| v as i64)).collect();
474+
Ok((
475+
Field::new(name, DataType::Timestamp(TimeUnit::Millisecond, None), true),
476+
Arc::new(result) as ArrayRef,
477+
))
478+
},
479+
DataType::Time64(TimeUnit::Microsecond) => {
480+
let arr = array
481+
.as_any()
482+
.downcast_ref::<Time64MicrosecondArray>()
483+
.unwrap();
484+
let result: TimestampMillisecondArray =
485+
arr.iter().map(|v| v.map(|v| v / 1_000)).collect();
486+
Ok((
487+
Field::new(name, DataType::Timestamp(TimeUnit::Millisecond, None), true),
488+
Arc::new(result) as ArrayRef,
489+
))
490+
},
491+
DataType::Time64(TimeUnit::Nanosecond) => {
492+
let arr = array
493+
.as_any()
494+
.downcast_ref::<Time64NanosecondArray>()
495+
.unwrap();
496+
let result: TimestampMillisecondArray =
497+
arr.iter().map(|v| v.map(|v| v / 1_000_000)).collect();
498+
Ok((
499+
Field::new(name, DataType::Timestamp(TimeUnit::Millisecond, None), true),
500+
Arc::new(result) as ArrayRef,
501+
))
502+
},
503+
DataType::LargeUtf8 => {
504+
let arr = array.as_any().downcast_ref::<LargeStringArray>().unwrap();
505+
let result: StringArray = arr.iter().map(|v| v.map(|v| v.to_string())).collect();
506+
Ok((
507+
Field::new(name, DataType::Utf8, true),
508+
Arc::new(result) as ArrayRef,
509+
))
510+
},
384511
dt => {
385512
tracing::warn!(
386513
"Coercing unknown Arrow type {} to Utf8 for column '{}'",
@@ -393,7 +520,8 @@ fn coerce_column(
393520
if array.is_null(i) {
394521
builder.append_null();
395522
} else {
396-
builder.append_value(format!("{:?}", array));
523+
let scalar_arr = array.slice(i, 1);
524+
builder.append_value(format!("{:?}", scalar_arr));
397525
}
398526
}
399527
Ok((
@@ -518,7 +646,11 @@ impl VirtualDataSlice {
518646
}
519647

520648
let new_schema = Arc::new(Schema::new(new_fields));
521-
self.frozen = Some(RecordBatch::try_new(new_schema, new_arrays)?);
649+
self.frozen = if new_arrays.is_empty() {
650+
Some(RecordBatch::new_empty(new_schema))
651+
} else {
652+
Some(RecordBatch::try_new(new_schema, new_arrays)?)
653+
};
522654
Ok(())
523655
}
524656

@@ -658,7 +790,7 @@ impl VirtualDataSlice {
658790
}
659791

660792
/// Serializes the data to a column-oriented JSON string.
661-
pub(crate) fn render_to_columns_json(&mut self) -> Result<String, Box<dyn Error>> {
793+
pub fn render_to_columns_json(&mut self) -> Result<String, Box<dyn Error>> {
662794
let batch = self.freeze().clone();
663795
let schema = batch.schema();
664796
let mut map = serde_json::Map::new();

rust/perspective-client/src/rust/virtual_server/generic_sql_model/table_make_view.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,17 @@ impl<'a> ViewQueryContext<'a> {
228228
)
229229
};
230230

231-
let pivot_using = self.select_clauses().join(", ");
231+
let pivot_using: String = self
232+
.config
233+
.columns
234+
.iter()
235+
.flatten()
236+
.map(|col| {
237+
let escaped = col.replace('"', "\"\"").replace('_', "-");
238+
format!("first(\"{}\") as \"{}\"", escaped, escaped)
239+
})
240+
.collect::<Vec<_>>()
241+
.join(", ");
232242
let mut row_id_cols = self.row_path_aliases.clone();
233243
if !self.is_flat_mode() {
234244
row_id_cols.push("__GROUPING_ID__".to_string());

rust/perspective-js/test/js/duckdb/split_by.spec.js

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,50 @@ describeDuckDB("split_by", (getClient) => {
6565
await view.delete();
6666
});
6767

68+
// https://github.com/perspective-dev/perspective/issues/3148
69+
test("split_by with group_by and count aggregate", async function () {
70+
const table = await getClient().open_table("memory.superstore");
71+
const view = await table.view({
72+
columns: ["Row ID"],
73+
split_by: ["Region"],
74+
group_by: ["Category"],
75+
aggregates: { "Row ID": "count" },
76+
});
77+
78+
const json = await view.to_json({ start_row: 0, end_row: 4 });
79+
expect(json).toEqual([
80+
{
81+
__ROW_PATH__: [],
82+
"Central|Row ID": 2323,
83+
"East|Row ID": 2848,
84+
"South|Row ID": 1620,
85+
"West|Row ID": 3203,
86+
},
87+
{
88+
__ROW_PATH__: ["Furniture"],
89+
"Central|Row ID": 481,
90+
"East|Row ID": 601,
91+
"South|Row ID": 332,
92+
"West|Row ID": 707,
93+
},
94+
{
95+
__ROW_PATH__: ["Office Supplies"],
96+
"Central|Row ID": 1422,
97+
"East|Row ID": 1712,
98+
"South|Row ID": 995,
99+
"West|Row ID": 1897,
100+
},
101+
{
102+
__ROW_PATH__: ["Technology"],
103+
"Central|Row ID": 420,
104+
"East|Row ID": 535,
105+
"South|Row ID": 293,
106+
"West|Row ID": 599,
107+
},
108+
]);
109+
await view.delete();
110+
});
111+
68112
test.skip("split_by without group_by", async function () {
69113
const table = await getClient().open_table("memory.superstore");
70114
const view = await table.view({

rust/perspective-python/perspective/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
"AsyncClient",
2323
"AsyncServer",
2424
"GenericSQLVirtualServerModel",
25+
"VirtualDataSlice",
2526
"VirtualServer",
2627
"num_cpus",
2728
"set_num_cpus",
@@ -354,6 +355,7 @@ def delete_callback():
354355
AsyncServer,
355356
AsyncClient,
356357
VirtualServer,
358+
VirtualDataSlice,
357359
GenericSQLVirtualServerModel,
358360
# NOTE: these are classes without constructors,
359361
# so we import them just for type hinting

0 commit comments

Comments (0)