Skip to content

Commit d65d008

Browse files
authored
feat(reader): support timestamp type in create_column (#2180)
## Which issue does this PR close? Similar to #1847 - Closes #. ## What changes are included in this PR? - RecordBatchTransformer does not support timestamp type. This PR adds logic to create_column in the specific scenario for a schema evolution with a new timestamp column. ## Are these changes tested? <!-- Specify what test covers (unit test, integration test, etc.). If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> 2 unit tests test_create_timestamp_microsecond_with_timezone_array_repeated and test_create_timestamp_microsecond_array_repeated are added.
1 parent 3bb3a1b commit d65d008

1 file changed

Lines changed: 115 additions & 1 deletion

File tree

crates/iceberg/src/arrow/value.rs

Lines changed: 115 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use arrow_array::{
2424
Time64MicrosecondArray, TimestampMicrosecondArray, TimestampNanosecondArray,
2525
};
2626
use arrow_buffer::NullBuffer;
27-
use arrow_schema::{DataType, FieldRef};
27+
use arrow_schema::{DataType, FieldRef, TimeUnit};
2828
use uuid::Uuid;
2929

3030
use super::get_field_id_from_metadata;
@@ -645,6 +645,38 @@ pub(crate) fn create_primitive_array_single_element(
645645
Ok(Arc::new(Int64Array::from(vec![*v])))
646646
}
647647
(DataType::Int64, None) => Ok(Arc::new(Int64Array::from(vec![Option::<i64>::None]))),
648+
(DataType::Timestamp(TimeUnit::Microsecond, timezone), Some(PrimitiveLiteral::Long(v))) => {
649+
let array = TimestampMicrosecondArray::from(vec![*v]);
650+
if let Some(timezone) = timezone {
651+
Ok(Arc::new(array.with_timezone(timezone.clone())))
652+
} else {
653+
Ok(Arc::new(array))
654+
}
655+
}
656+
(DataType::Timestamp(TimeUnit::Microsecond, timezone), None) => {
657+
let array = TimestampMicrosecondArray::from(vec![Option::<i64>::None]);
658+
if let Some(timezone) = timezone {
659+
Ok(Arc::new(array.with_timezone(timezone.clone())))
660+
} else {
661+
Ok(Arc::new(array))
662+
}
663+
}
664+
(DataType::Timestamp(TimeUnit::Nanosecond, timezone), Some(PrimitiveLiteral::Long(v))) => {
665+
let array = TimestampNanosecondArray::from(vec![*v]);
666+
if let Some(timezone) = timezone {
667+
Ok(Arc::new(array.with_timezone(timezone.clone())))
668+
} else {
669+
Ok(Arc::new(array))
670+
}
671+
}
672+
(DataType::Timestamp(TimeUnit::Nanosecond, timezone), None) => {
673+
let array = TimestampNanosecondArray::from(vec![Option::<i64>::None]);
674+
if let Some(timezone) = timezone {
675+
Ok(Arc::new(array.with_timezone(timezone.clone())))
676+
} else {
677+
Ok(Arc::new(array))
678+
}
679+
}
648680
(DataType::Float32, Some(PrimitiveLiteral::Float(v))) => {
649681
Ok(Arc::new(Float32Array::from(vec![v.0])))
650682
}
@@ -720,6 +752,22 @@ pub(crate) fn create_primitive_array_single_element(
720752
DataType::Int64 => {
721753
Ok(Arc::new(Int64Array::from(vec![Option::<i64>::None])) as ArrayRef)
722754
}
755+
DataType::Timestamp(TimeUnit::Microsecond, timezone) => {
756+
let array = TimestampMicrosecondArray::from(vec![Option::<i64>::None]);
757+
if let Some(timezone) = timezone {
758+
Ok(Arc::new(array.with_timezone(timezone.clone())) as ArrayRef)
759+
} else {
760+
Ok(Arc::new(array) as ArrayRef)
761+
}
762+
}
763+
DataType::Timestamp(TimeUnit::Nanosecond, timezone) => {
764+
let array = TimestampNanosecondArray::from(vec![Option::<i64>::None]);
765+
if let Some(timezone) = timezone {
766+
Ok(Arc::new(array.with_timezone(timezone.clone())) as ArrayRef)
767+
} else {
768+
Ok(Arc::new(array) as ArrayRef)
769+
}
770+
}
723771
DataType::Float32 => {
724772
Ok(Arc::new(Float32Array::from(vec![Option::<f32>::None])) as ArrayRef)
725773
}
@@ -793,6 +841,46 @@ pub(crate) fn create_primitive_array_repeated(
793841
let vals: Vec<Option<i64>> = vec![None; num_rows];
794842
Arc::new(Int64Array::from(vals))
795843
}
844+
(
845+
DataType::Timestamp(TimeUnit::Microsecond, timezone),
846+
Some(PrimitiveLiteral::Long(value)),
847+
) => {
848+
let array = TimestampMicrosecondArray::from(vec![*value; num_rows]);
849+
if let Some(timezone) = timezone {
850+
Arc::new(array.with_timezone(timezone.clone()))
851+
} else {
852+
Arc::new(array)
853+
}
854+
}
855+
(DataType::Timestamp(TimeUnit::Microsecond, timezone), None) => {
856+
let vals: Vec<Option<i64>> = vec![None; num_rows];
857+
let array = TimestampMicrosecondArray::from(vals);
858+
if let Some(timezone) = timezone {
859+
Arc::new(array.with_timezone(timezone.clone()))
860+
} else {
861+
Arc::new(array)
862+
}
863+
}
864+
(
865+
DataType::Timestamp(TimeUnit::Nanosecond, timezone),
866+
Some(PrimitiveLiteral::Long(value)),
867+
) => {
868+
let array = TimestampNanosecondArray::from(vec![*value; num_rows]);
869+
if let Some(timezone) = timezone {
870+
Arc::new(array.with_timezone(timezone.clone()))
871+
} else {
872+
Arc::new(array)
873+
}
874+
}
875+
(DataType::Timestamp(TimeUnit::Nanosecond, timezone), None) => {
876+
let vals: Vec<Option<i64>> = vec![None; num_rows];
877+
let array = TimestampNanosecondArray::from(vals);
878+
if let Some(timezone) = timezone {
879+
Arc::new(array.with_timezone(timezone.clone()))
880+
} else {
881+
Arc::new(array)
882+
}
883+
}
796884
(DataType::Float32, Some(PrimitiveLiteral::Float(value))) => {
797885
Arc::new(Float32Array::from(vec![value.0; num_rows]))
798886
}
@@ -1781,4 +1869,30 @@ mod test {
17811869

17821870
assert_eq!(array.len(), num_rows);
17831871
}
1872+
1873+
#[test]
1874+
fn test_create_timestamp_microsecond_array_repeated() {
1875+
let target_type = DataType::Timestamp(TimeUnit::Microsecond, None);
1876+
let value = PrimitiveLiteral::Long(1_740_600_000_000_000);
1877+
let num_rows = 3;
1878+
1879+
let array = create_primitive_array_repeated(&target_type, &Some(value), num_rows)
1880+
.expect("Failed to create repeated timestamp microsecond array");
1881+
1882+
assert_eq!(array.data_type(), &target_type);
1883+
assert_eq!(array.len(), num_rows);
1884+
}
1885+
1886+
#[test]
1887+
fn test_create_timestamp_microsecond_with_timezone_array_repeated() {
1888+
let target_type = DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into()));
1889+
let value = PrimitiveLiteral::Long(1_740_600_000_000_000);
1890+
let num_rows = 2;
1891+
1892+
let array = create_primitive_array_repeated(&target_type, &Some(value), num_rows)
1893+
.expect("Failed to create repeated timestamp microsecond array with timezone");
1894+
1895+
assert_eq!(array.data_type(), &target_type);
1896+
assert_eq!(array.len(), num_rows);
1897+
}
17841898
}

0 commit comments

Comments
 (0)