@@ -17,9 +17,12 @@ use arrow_array::builder::{
1717 BooleanBuilder , Float64Builder , Int32Builder , StringBuilder , TimestampMillisecondBuilder ,
1818} ;
1919use arrow_array:: {
20- Array , ArrayRef , BooleanArray , Date32Array , Decimal128Array , Float64Array , Int32Array ,
21- Int64Array , RecordBatch , StringArray , TimestampMicrosecondArray , TimestampMillisecondArray ,
22- TimestampNanosecondArray , TimestampSecondArray ,
20+ Array , ArrayRef , BooleanArray , Date32Array , Date64Array , Decimal128Array , Float32Array ,
21+ Float64Array , Int8Array , Int16Array , Int32Array , Int64Array , LargeStringArray , RecordBatch ,
22+ StringArray , Time32MillisecondArray , Time32SecondArray , Time64MicrosecondArray ,
23+ Time64NanosecondArray , TimestampMicrosecondArray , TimestampMillisecondArray ,
24+ TimestampNanosecondArray , TimestampSecondArray , UInt8Array , UInt16Array , UInt32Array ,
25+ UInt64Array ,
2326} ;
2427use arrow_ipc:: reader:: { FileReader , StreamReader } ;
2528use arrow_ipc:: writer:: StreamWriter ;
@@ -301,7 +304,10 @@ fn extract_scalar(array: &ArrayRef, row_idx: usize) -> Scalar {
301304 let arr = array. as_any ( ) . downcast_ref :: < Date32Array > ( ) . unwrap ( ) ;
302305 Scalar :: Float ( arr. value ( row_idx) as f64 * 86_400_000.0 )
303306 } ,
304- _ => Scalar :: String ( format ! ( "{:?}" , array) ) ,
307+ _ => {
308+ let scalar_arr = array. slice ( row_idx, 1 ) ;
309+ Scalar :: String ( format ! ( "{:?}" , scalar_arr) )
310+ } ,
305311 }
306312}
307313
@@ -356,6 +362,47 @@ fn coerce_column(
356362 Field :: new ( name, DataType :: Timestamp ( TimeUnit :: Millisecond , None ) , true ) ,
357363 array. clone ( ) ,
358364 ) ) ,
365+ DataType :: Int8 => {
366+ let arr = array. as_any ( ) . downcast_ref :: < Int8Array > ( ) . unwrap ( ) ;
367+ let result: Int32Array = arr. iter ( ) . map ( |v| v. map ( |v| v as i32 ) ) . collect ( ) ;
368+ Ok ( (
369+ Field :: new ( name, DataType :: Int32 , true ) ,
370+ Arc :: new ( result) as ArrayRef ,
371+ ) )
372+ } ,
373+ DataType :: Int16 => {
374+ let arr = array. as_any ( ) . downcast_ref :: < Int16Array > ( ) . unwrap ( ) ;
375+ let result: Int32Array = arr. iter ( ) . map ( |v| v. map ( |v| v as i32 ) ) . collect ( ) ;
376+ Ok ( (
377+ Field :: new ( name, DataType :: Int32 , true ) ,
378+ Arc :: new ( result) as ArrayRef ,
379+ ) )
380+ } ,
381+ DataType :: UInt8 => {
382+ let arr = array. as_any ( ) . downcast_ref :: < UInt8Array > ( ) . unwrap ( ) ;
383+ let result: Int32Array = arr. iter ( ) . map ( |v| v. map ( |v| v as i32 ) ) . collect ( ) ;
384+ Ok ( (
385+ Field :: new ( name, DataType :: Int32 , true ) ,
386+ Arc :: new ( result) as ArrayRef ,
387+ ) )
388+ } ,
389+ DataType :: UInt16 => {
390+ let arr = array. as_any ( ) . downcast_ref :: < UInt16Array > ( ) . unwrap ( ) ;
391+ let result: Int32Array = arr. iter ( ) . map ( |v| v. map ( |v| v as i32 ) ) . collect ( ) ;
392+ Ok ( (
393+ Field :: new ( name, DataType :: Int32 , true ) ,
394+ Arc :: new ( result) as ArrayRef ,
395+ ) )
396+ } ,
397+ DataType :: UInt32 => {
398+ let arr = array. as_any ( ) . downcast_ref :: < UInt32Array > ( ) . unwrap ( ) ;
399+ let result: Int64Array = arr. iter ( ) . map ( |v| v. map ( |v| v as i64 ) ) . collect ( ) ;
400+ let result: Float64Array = result. iter ( ) . map ( |v| v. map ( |v| v as f64 ) ) . collect ( ) ;
401+ Ok ( (
402+ Field :: new ( name, DataType :: Float64 , true ) ,
403+ Arc :: new ( result) as ArrayRef ,
404+ ) )
405+ } ,
359406 DataType :: Int64 => {
360407 let arr = array. as_any ( ) . downcast_ref :: < Int64Array > ( ) . unwrap ( ) ;
361408 let result: Float64Array = arr. iter ( ) . map ( |v| v. map ( |v| v as f64 ) ) . collect ( ) ;
@@ -364,6 +411,22 @@ fn coerce_column(
364411 Arc :: new ( result) as ArrayRef ,
365412 ) )
366413 } ,
414+ DataType :: UInt64 => {
415+ let arr = array. as_any ( ) . downcast_ref :: < UInt64Array > ( ) . unwrap ( ) ;
416+ let result: Float64Array = arr. iter ( ) . map ( |v| v. map ( |v| v as f64 ) ) . collect ( ) ;
417+ Ok ( (
418+ Field :: new ( name, DataType :: Float64 , true ) ,
419+ Arc :: new ( result) as ArrayRef ,
420+ ) )
421+ } ,
422+ DataType :: Float32 => {
423+ let arr = array. as_any ( ) . downcast_ref :: < Float32Array > ( ) . unwrap ( ) ;
424+ let result: Float64Array = arr. iter ( ) . map ( |v| v. map ( |v| v as f64 ) ) . collect ( ) ;
425+ Ok ( (
426+ Field :: new ( name, DataType :: Float64 , true ) ,
427+ Arc :: new ( result) as ArrayRef ,
428+ ) )
429+ } ,
367430 DataType :: Decimal128 ( _, scale) => {
368431 let scale = * scale;
369432 let arr = array. as_any ( ) . downcast_ref :: < Decimal128Array > ( ) . unwrap ( ) ;
@@ -374,13 +437,77 @@ fn coerce_column(
374437 Arc :: new ( result) as ArrayRef ,
375438 ) )
376439 } ,
440+ DataType :: Date64 => {
441+ let arr = array. as_any ( ) . downcast_ref :: < Date64Array > ( ) . unwrap ( ) ;
442+ let result: Date32Array = arr
443+ . iter ( )
444+ . map ( |v| v. map ( |v| ( v / 86_400_000 ) as i32 ) )
445+ . collect ( ) ;
446+ Ok ( (
447+ Field :: new ( name, DataType :: Date32 , true ) ,
448+ Arc :: new ( result) as ArrayRef ,
449+ ) )
450+ } ,
377451 DataType :: Timestamp ( unit, _) => {
378452 let casted = timestamp_to_millis ( array, unit) ;
379453 Ok ( (
380454 Field :: new ( name, DataType :: Timestamp ( TimeUnit :: Millisecond , None ) , true ) ,
381455 casted,
382456 ) )
383457 } ,
458+ DataType :: Time32 ( TimeUnit :: Second ) => {
459+ let arr = array. as_any ( ) . downcast_ref :: < Time32SecondArray > ( ) . unwrap ( ) ;
460+ let result: TimestampMillisecondArray =
461+ arr. iter ( ) . map ( |v| v. map ( |v| v as i64 * 1_000 ) ) . collect ( ) ;
462+ Ok ( (
463+ Field :: new ( name, DataType :: Timestamp ( TimeUnit :: Millisecond , None ) , true ) ,
464+ Arc :: new ( result) as ArrayRef ,
465+ ) )
466+ } ,
467+ DataType :: Time32 ( TimeUnit :: Millisecond ) => {
468+ let arr = array
469+ . as_any ( )
470+ . downcast_ref :: < Time32MillisecondArray > ( )
471+ . unwrap ( ) ;
472+ let result: TimestampMillisecondArray =
473+ arr. iter ( ) . map ( |v| v. map ( |v| v as i64 ) ) . collect ( ) ;
474+ Ok ( (
475+ Field :: new ( name, DataType :: Timestamp ( TimeUnit :: Millisecond , None ) , true ) ,
476+ Arc :: new ( result) as ArrayRef ,
477+ ) )
478+ } ,
479+ DataType :: Time64 ( TimeUnit :: Microsecond ) => {
480+ let arr = array
481+ . as_any ( )
482+ . downcast_ref :: < Time64MicrosecondArray > ( )
483+ . unwrap ( ) ;
484+ let result: TimestampMillisecondArray =
485+ arr. iter ( ) . map ( |v| v. map ( |v| v / 1_000 ) ) . collect ( ) ;
486+ Ok ( (
487+ Field :: new ( name, DataType :: Timestamp ( TimeUnit :: Millisecond , None ) , true ) ,
488+ Arc :: new ( result) as ArrayRef ,
489+ ) )
490+ } ,
491+ DataType :: Time64 ( TimeUnit :: Nanosecond ) => {
492+ let arr = array
493+ . as_any ( )
494+ . downcast_ref :: < Time64NanosecondArray > ( )
495+ . unwrap ( ) ;
496+ let result: TimestampMillisecondArray =
497+ arr. iter ( ) . map ( |v| v. map ( |v| v / 1_000_000 ) ) . collect ( ) ;
498+ Ok ( (
499+ Field :: new ( name, DataType :: Timestamp ( TimeUnit :: Millisecond , None ) , true ) ,
500+ Arc :: new ( result) as ArrayRef ,
501+ ) )
502+ } ,
503+ DataType :: LargeUtf8 => {
504+ let arr = array. as_any ( ) . downcast_ref :: < LargeStringArray > ( ) . unwrap ( ) ;
505+ let result: StringArray = arr. iter ( ) . map ( |v| v. map ( |v| v. to_string ( ) ) ) . collect ( ) ;
506+ Ok ( (
507+ Field :: new ( name, DataType :: Utf8 , true ) ,
508+ Arc :: new ( result) as ArrayRef ,
509+ ) )
510+ } ,
384511 dt => {
385512 tracing:: warn!(
386513 "Coercing unknown Arrow type {} to Utf8 for column '{}'" ,
@@ -393,7 +520,8 @@ fn coerce_column(
393520 if array. is_null ( i) {
394521 builder. append_null ( ) ;
395522 } else {
396- builder. append_value ( format ! ( "{:?}" , array) ) ;
523+ let scalar_arr = array. slice ( i, 1 ) ;
524+ builder. append_value ( format ! ( "{:?}" , scalar_arr) ) ;
397525 }
398526 }
399527 Ok ( (
@@ -518,7 +646,11 @@ impl VirtualDataSlice {
518646 }
519647
520648 let new_schema = Arc :: new ( Schema :: new ( new_fields) ) ;
521- self . frozen = Some ( RecordBatch :: try_new ( new_schema, new_arrays) ?) ;
649+ self . frozen = if new_arrays. is_empty ( ) {
650+ Some ( RecordBatch :: new_empty ( new_schema) )
651+ } else {
652+ Some ( RecordBatch :: try_new ( new_schema, new_arrays) ?)
653+ } ;
522654 Ok ( ( ) )
523655 }
524656
@@ -658,7 +790,7 @@ impl VirtualDataSlice {
658790 }
659791
660792 /// Serializes the data to a column-oriented JSON string.
661- pub ( crate ) fn render_to_columns_json ( & mut self ) -> Result < String , Box < dyn Error > > {
793+ pub fn render_to_columns_json ( & mut self ) -> Result < String , Box < dyn Error > > {
662794 let batch = self . freeze ( ) . clone ( ) ;
663795 let schema = batch. schema ( ) ;
664796 let mut map = serde_json:: Map :: new ( ) ;
0 commit comments