Skip to content

Commit 8955075

Browse files
committed
feat: Add timestamp nanosecond primitive types
1 parent 0ca52ba commit 8955075

35 files changed

Lines changed: 792 additions & 56 deletions

src/iceberg/avro/avro_data_util.cc

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -431,6 +431,19 @@ Status AppendPrimitiveValueToBuilder(const ::avro::NodePtr& avro_node,
431431
return {};
432432
}
433433

434+
case TypeId::kTimestampNs:
435+
case TypeId::kTimestampTzNs: {
436+
if (avro_node->type() != ::avro::AVRO_LONG ||
437+
avro_node->logicalType().type() != ::avro::LogicalType::TIMESTAMP_NANOS) {
438+
return InvalidArgument(
439+
"Expected Avro long with TIMESTAMP_NANOS for timestamp field, got: {}",
440+
ToString(avro_node));
441+
}
442+
auto* builder = internal::checked_cast<::arrow::TimestampBuilder*>(array_builder);
443+
ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(avro_datum.value<int64_t>()));
444+
return {};
445+
}
446+
434447
default:
435448
return InvalidArgument("Unsupported primitive type {} to append avro node {}",
436449
projected_field.type()->ToString(), ToString(avro_node));

src/iceberg/avro/avro_direct_decoder.cc

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -562,6 +562,20 @@ Status DecodePrimitiveValueToBuilder(const ::avro::NodePtr& avro_node,
562562
return {};
563563
}
564564

565+
case TypeId::kTimestampNs:
566+
case TypeId::kTimestampTzNs: {
567+
if (avro_node->type() != ::avro::AVRO_LONG ||
568+
avro_node->logicalType().type() != ::avro::LogicalType::TIMESTAMP_NANOS) {
569+
return InvalidArgument(
570+
"Expected Avro long with TIMESTAMP_NANOS for timestamp field, got: {}",
571+
ToString(avro_node));
572+
}
573+
auto* builder = internal::checked_cast<::arrow::TimestampBuilder*>(array_builder);
574+
int64_t value = decoder.decodeLong();
575+
ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(value));
576+
return {};
577+
}
578+
565579
default:
566580
return InvalidArgument("Unsupported primitive type {} to decode from avro node {}",
567581
projected_field.type()->ToString(), ToString(avro_node));

src/iceberg/avro/avro_schema_util.cc

Lines changed: 63 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,24 @@ Status ToAvroNodeVisitor::Visit(const TimestampTzType& type, ::avro::NodePtr* no
194194
return {};
195195
}
196196

197+
/// Produce the Avro node for a nanosecond timestamp type: an Avro `long`
/// carrying the TIMESTAMP_NANOS logical type, with the adjust-to-UTC custom
/// attribute set to "false".
///
/// \param type The type being converted (not inspected here).
/// \param node Output slot that receives the newly created node.
/// \return An empty (OK) status.
Status ToAvroNodeVisitor::Visit(const TimestampNsType& type, ::avro::NodePtr* node) {
  ::avro::NodePtr& out = *node;
  out = std::make_shared<::avro::NodePrimitive>(::avro::AVRO_LONG);

  const ::avro::LogicalType nanos_logical_type{::avro::LogicalType::TIMESTAMP_NANOS};
  out->setLogicalType(nanos_logical_type);

  // The attribute value is written without surrounding quotes.
  ::avro::CustomAttributes adjust_to_utc;
  adjust_to_utc.addAttribute(std::string(kAdjustToUtcProp), "false",
                             /*addQuotes=*/false);
  out->addCustomAttributesForField(adjust_to_utc);
  return {};
}
205+
206+
/// Produce the Avro node for a nanosecond timestamp-with-zone type: an Avro
/// `long` carrying the TIMESTAMP_NANOS logical type, with the adjust-to-UTC
/// custom attribute set to "true".
///
/// \param type The type being converted (not inspected here).
/// \param node Output slot that receives the newly created node.
/// \return An empty (OK) status.
Status ToAvroNodeVisitor::Visit(const TimestampTzNsType& type, ::avro::NodePtr* node) {
  ::avro::NodePtr& out = *node;
  out = std::make_shared<::avro::NodePrimitive>(::avro::AVRO_LONG);

  const ::avro::LogicalType nanos_logical_type{::avro::LogicalType::TIMESTAMP_NANOS};
  out->setLogicalType(nanos_logical_type);

  // The attribute value is written without surrounding quotes.
  ::avro::CustomAttributes adjust_to_utc;
  adjust_to_utc.addAttribute(std::string(kAdjustToUtcProp), "true",
                             /*addQuotes=*/false);
  out->addCustomAttributesForField(adjust_to_utc);
  return {};
}
214+
197215
Status ToAvroNodeVisitor::Visit(const StringType& type, ::avro::NodePtr* node) {
198216
*node = std::make_shared<::avro::NodePrimitive>(::avro::AVRO_STRING);
199217
return {};
@@ -457,6 +475,19 @@ std::optional<std::string> GetAdjustToUtc(const ::avro::NodePtr& node) {
457475
return node->customAttributesAt(0).getAttribute(std::string(kAdjustToUtcProp));
458476
}
459477

478+
std::optional<std::string> GetAdjustToUtc(
479+
const ::avro::NodePtr& node,
480+
const std::optional<std::reference_wrapper<const ::avro::CustomAttributes>>&
481+
field_attributes) {
482+
if (auto adjust_to_utc = GetAdjustToUtc(node); adjust_to_utc.has_value()) {
483+
return adjust_to_utc;
484+
}
485+
if (!field_attributes.has_value()) {
486+
return std::nullopt;
487+
}
488+
return field_attributes->get().getAttribute(std::string(kAdjustToUtcProp));
489+
}
490+
460491
Result<int32_t> GetId(const ::avro::NodePtr& node, const std::string& attr_name,
461492
size_t field_idx) {
462493
if (field_idx >= node->customAttributes()) {
@@ -492,8 +523,10 @@ Result<int32_t> GetFieldId(const ::avro::NodePtr& node, size_t field_idx) {
492523
return GetId(node, kFieldIdKey, field_idx);
493524
}
494525

495-
Status ValidateAvroSchemaEvolution(const Type& expected_type,
496-
const ::avro::NodePtr& avro_node) {
526+
Status ValidateAvroSchemaEvolution(
527+
const Type& expected_type, const ::avro::NodePtr& avro_node,
528+
std::optional<std::reference_wrapper<const ::avro::CustomAttributes>>
529+
field_attributes = std::nullopt) {
497530
switch (expected_type.type_id()) {
498531
case TypeId::kBoolean:
499532
if (avro_node->type() == ::avro::AVRO_BOOL) {
@@ -537,14 +570,28 @@ Status ValidateAvroSchemaEvolution(const Type& expected_type,
537570
case TypeId::kTimestamp:
538571
if (avro_node->type() == ::avro::AVRO_LONG &&
539572
HasLogicalType(avro_node, ::avro::LogicalType::TIMESTAMP_MICROS) &&
540-
GetAdjustToUtc(avro_node).value_or("false") == "false") {
573+
GetAdjustToUtc(avro_node, field_attributes).value_or("false") == "false") {
541574
return {};
542575
}
543576
break;
544577
case TypeId::kTimestampTz:
545578
if (avro_node->type() == ::avro::AVRO_LONG &&
546579
HasLogicalType(avro_node, ::avro::LogicalType::TIMESTAMP_MICROS) &&
547-
GetAdjustToUtc(avro_node).value_or("false") == "true") {
580+
GetAdjustToUtc(avro_node, field_attributes).value_or("false") == "true") {
581+
return {};
582+
}
583+
break;
584+
case TypeId::kTimestampNs:
585+
if (avro_node->type() == ::avro::AVRO_LONG &&
586+
HasLogicalType(avro_node, ::avro::LogicalType::TIMESTAMP_NANOS) &&
587+
GetAdjustToUtc(avro_node, field_attributes).value_or("false") == "false") {
588+
return {};
589+
}
590+
break;
591+
case TypeId::kTimestampTzNs:
592+
if (avro_node->type() == ::avro::AVRO_LONG &&
593+
HasLogicalType(avro_node, ::avro::LogicalType::TIMESTAMP_NANOS) &&
594+
GetAdjustToUtc(avro_node, field_attributes).value_or("false") == "true") {
548595
return {};
549596
}
550597
break;
@@ -653,15 +700,18 @@ Result<FieldProjection> ProjectStruct(const StructType& struct_type,
653700
FieldProjection child_projection;
654701

655702
if (auto iter = node_info_map.find(field_id); iter != node_info_map.cend()) {
703+
std::optional<std::reference_wrapper<const ::avro::CustomAttributes>>
704+
field_attributes =
705+
std::cref(avro_node->customAttributesAt(iter->second.local_index));
656706
::avro::NodePtr field_node;
657707
ICEBERG_RETURN_UNEXPECTED(UnwrapUnion(iter->second.field_node, &field_node));
658708
if (expected_field.type()->is_nested()) {
659709
ICEBERG_ASSIGN_OR_RAISE(
660710
child_projection,
661711
ProjectNested(*expected_field.type(), field_node, prune_source));
662712
} else {
663-
ICEBERG_RETURN_UNEXPECTED(
664-
ValidateAvroSchemaEvolution(*expected_field.type(), field_node));
713+
ICEBERG_RETURN_UNEXPECTED(ValidateAvroSchemaEvolution(
714+
*expected_field.type(), field_node, field_attributes));
665715
}
666716
child_projection.from = iter->second.local_index;
667717
child_projection.kind = FieldProjection::Kind::kProjected;
@@ -771,14 +821,19 @@ Result<FieldProjection> ProjectMap(const MapType& map_type,
771821
for (size_t i = 0; i < map_node->leaves(); ++i) {
772822
FieldProjection sub_projection;
773823
::avro::NodePtr sub_node;
824+
std::optional<std::reference_wrapper<const ::avro::CustomAttributes>>
825+
field_attributes = std::nullopt;
826+
if (map_node->customAttributes() > i) {
827+
field_attributes = std::cref(map_node->customAttributesAt(i));
828+
}
774829
ICEBERG_RETURN_UNEXPECTED(UnwrapUnion(map_node->leafAt(i), &sub_node));
775830
const auto& expected_sub_field = map_type.fields()[i];
776831
if (expected_sub_field.type()->is_nested()) {
777832
ICEBERG_ASSIGN_OR_RAISE(sub_projection, ProjectNested(*expected_sub_field.type(),
778833
sub_node, prune_source));
779834
} else {
780-
ICEBERG_RETURN_UNEXPECTED(
781-
ValidateAvroSchemaEvolution(*expected_sub_field.type(), sub_node));
835+
ICEBERG_RETURN_UNEXPECTED(ValidateAvroSchemaEvolution(*expected_sub_field.type(),
836+
sub_node, field_attributes));
782837
}
783838
sub_projection.kind = FieldProjection::Kind::kProjected;
784839
sub_projection.from = i;

src/iceberg/avro/avro_schema_util_internal.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ class ToAvroNodeVisitor {
5252
Status Visit(const TimeType& type, ::avro::NodePtr* node);
5353
Status Visit(const TimestampType& type, ::avro::NodePtr* node);
5454
Status Visit(const TimestampTzType& type, ::avro::NodePtr* node);
55+
/// Creates an Avro long node with the TIMESTAMP_NANOS logical type and the
/// adjust-to-UTC custom attribute set to "false"; stores it in `*node`.
Status Visit(const TimestampNsType& type, ::avro::NodePtr* node);
56+
/// Creates an Avro long node with the TIMESTAMP_NANOS logical type and the
/// adjust-to-UTC custom attribute set to "true"; stores it in `*node`.
Status Visit(const TimestampTzNsType& type, ::avro::NodePtr* node);
5557
Status Visit(const StringType& type, ::avro::NodePtr* node);
5658
Status Visit(const UuidType& type, ::avro::NodePtr* node);
5759
Status Visit(const FixedType& type, ::avro::NodePtr* node);

src/iceberg/expression/json_serde.cc

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,11 @@ Result<nlohmann::json> ToJson(const Literal& literal) {
272272
case TypeId::kTimestampTz:
273273
return nlohmann::json(
274274
TransformUtil::HumanTimestampWithZone(std::get<int64_t>(value)));
275+
case TypeId::kTimestampNs:
276+
return nlohmann::json(TransformUtil::HumanTimestampNs(std::get<int64_t>(value)));
277+
case TypeId::kTimestampTzNs:
278+
return nlohmann::json(
279+
TransformUtil::HumanTimestampNsWithZone(std::get<int64_t>(value)));
275280
case TypeId::kFloat:
276281
return nlohmann::json(std::get<float>(value));
277282
case TypeId::kDouble:
@@ -390,6 +395,26 @@ Result<Literal> LiteralFromJson(const nlohmann::json& json, const Type* type) {
390395
return Literal::TimestampTz(micros);
391396
}
392397

398+
case TypeId::kTimestampNs: {
399+
if (!json.is_string()) [[unlikely]] {
400+
return JsonParseError("Cannot parse {} as a timestamp_ns value",
401+
SafeDumpJson(json));
402+
}
403+
ICEBERG_ASSIGN_OR_RAISE(auto nanos,
404+
TransformUtil::ParseTimestampNs(json.get<std::string>()));
405+
return Literal::TimestampNs(nanos);
406+
}
407+
408+
case TypeId::kTimestampTzNs: {
409+
if (!json.is_string()) [[unlikely]] {
410+
return JsonParseError("Cannot parse {} as a timestamptz_ns value",
411+
SafeDumpJson(json));
412+
}
413+
ICEBERG_ASSIGN_OR_RAISE(
414+
auto nanos, TransformUtil::ParseTimestampNsWithZone(json.get<std::string>()));
415+
return Literal::TimestampTzNs(nanos);
416+
}
417+
393418
case TypeId::kUuid: {
394419
if (!json.is_string()) [[unlikely]] {
395420
return JsonParseError("Cannot parse {} as a uuid value", SafeDumpJson(json));

src/iceberg/expression/literal.cc

Lines changed: 62 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,10 @@ Result<Literal> LiteralCaster::CastFromLong(
150150
return Literal::Timestamp(long_val);
151151
case TypeId::kTimestampTz:
152152
return Literal::TimestampTz(long_val);
153+
case TypeId::kTimestampNs:
154+
return Literal::TimestampNs(long_val);
155+
case TypeId::kTimestampTzNs:
156+
return Literal::TimestampTzNs(long_val);
153157
default:
154158
return NotSupported("Cast from Long to {} is not supported",
155159
target_type->ToString());
@@ -215,6 +219,15 @@ Result<Literal> LiteralCaster::CastFromString(
215219
TransformUtil::ParseTimestampWithZone(str_val));
216220
return Literal::TimestampTz(micros);
217221
}
222+
case TypeId::kTimestampNs: {
223+
ICEBERG_ASSIGN_OR_RAISE(auto nanos, TransformUtil::ParseTimestampNs(str_val));
224+
return Literal::TimestampNs(nanos);
225+
}
226+
case TypeId::kTimestampTzNs: {
227+
ICEBERG_ASSIGN_OR_RAISE(auto nanos,
228+
TransformUtil::ParseTimestampNsWithZone(str_val));
229+
return Literal::TimestampTzNs(nanos);
230+
}
218231
case TypeId::kBinary: {
219232
ICEBERG_ASSIGN_OR_RAISE(auto bytes, StringUtils::HexStringToBytes(str_val));
220233
return Literal::Binary(std::move(bytes));
@@ -250,14 +263,27 @@ Result<Literal> LiteralCaster::CastFromString(
250263
Result<Literal> LiteralCaster::CastFromTimestamp(
251264
const Literal& literal, const std::shared_ptr<PrimitiveType>& target_type) {
252265
auto timestamp_val = std::get<int64_t>(literal.value_);
266+
const auto& source_timestamp =
267+
internal::checked_cast<const TimestampBase&>(*literal.type());
268+
const bool source_is_nanos = source_timestamp.time_unit() == TimeUnit::kNanosecond;
253269

254270
switch (target_type->type_id()) {
255271
case TypeId::kDate: {
256272
ICEBERG_ASSIGN_OR_RAISE(auto days, TemporalUtils::ExtractDay(literal));
257273
return Literal::Date(std::get<int32_t>(days.value()));
258274
}
275+
case TypeId::kTimestamp:
276+
return source_is_nanos ? Literal::Timestamp(timestamp_val / 1000)
277+
: Literal::Timestamp(timestamp_val);
259278
case TypeId::kTimestampTz:
260-
return Literal::TimestampTz(timestamp_val);
279+
return source_is_nanos ? Literal::TimestampTz(timestamp_val / 1000)
280+
: Literal::TimestampTz(timestamp_val);
281+
case TypeId::kTimestampNs:
282+
return source_is_nanos ? Literal::TimestampNs(timestamp_val)
283+
: Literal::TimestampNs(timestamp_val * 1000);
284+
case TypeId::kTimestampTzNs:
285+
return source_is_nanos ? Literal::TimestampTzNs(timestamp_val)
286+
: Literal::TimestampTzNs(timestamp_val * 1000);
261287
default:
262288
return NotSupported("Cast from Timestamp to {} is not supported",
263289
target_type->ToString());
@@ -266,15 +292,28 @@ Result<Literal> LiteralCaster::CastFromTimestamp(
266292

267293
Result<Literal> LiteralCaster::CastFromTimestampTz(
268294
const Literal& literal, const std::shared_ptr<PrimitiveType>& target_type) {
269-
auto micros = std::get<int64_t>(literal.value_);
295+
auto timestamp_val = std::get<int64_t>(literal.value_);
296+
const auto& source_timestamp =
297+
internal::checked_cast<const TimestampBase&>(*literal.type());
298+
const bool source_is_nanos = source_timestamp.time_unit() == TimeUnit::kNanosecond;
270299

271300
switch (target_type->type_id()) {
272301
case TypeId::kDate: {
273302
ICEBERG_ASSIGN_OR_RAISE(auto days, TemporalUtils::ExtractDay(literal));
274303
return Literal::Date(std::get<int32_t>(days.value()));
275304
}
305+
case TypeId::kTimestampTz:
306+
return source_is_nanos ? Literal::TimestampTz(timestamp_val / 1000)
307+
: Literal::TimestampTz(timestamp_val);
276308
case TypeId::kTimestamp:
277-
return Literal::Timestamp(micros);
309+
return source_is_nanos ? Literal::Timestamp(timestamp_val / 1000)
310+
: Literal::Timestamp(timestamp_val);
311+
case TypeId::kTimestampNs:
312+
return source_is_nanos ? Literal::TimestampNs(timestamp_val)
313+
: Literal::TimestampNs(timestamp_val * 1000);
314+
case TypeId::kTimestampTzNs:
315+
return source_is_nanos ? Literal::TimestampTzNs(timestamp_val)
316+
: Literal::TimestampTzNs(timestamp_val * 1000);
278317
default:
279318
return NotSupported("Cast from TimestampTz to {} is not supported",
280319
target_type->ToString());
@@ -329,6 +368,12 @@ Literal Literal::Timestamp(int64_t value) { return {Value{value}, timestamp()};
329368

330369
/// Wrap `value` in a literal whose type is timestamp_tz().
Literal Literal::TimestampTz(int64_t value) {
  return Literal{Value{value}, timestamp_tz()};
}
331370

371+
/// Wrap `value` in a literal whose type is timestamp_ns().
Literal Literal::TimestampNs(int64_t value) {
  return Literal{Value{value}, timestamp_ns()};
}
372+
373+
/// Wrap `value` in a literal whose type is timestamp_tz_ns().
Literal Literal::TimestampTzNs(int64_t value) {
  return Literal{Value{value}, timestamp_tz_ns()};
}
376+
332377
Literal Literal::Float(float value) { return {Value{value}, float32()}; }
333378

334379
Literal Literal::Double(double value) { return {Value{value}, float64()}; }
@@ -395,8 +440,11 @@ bool Comparable(TypeId lhs, TypeId rhs) {
395440
case TypeId::kLong:
396441
case TypeId::kTimestamp:
397442
case TypeId::kTimestampTz:
443+
case TypeId::kTimestampNs:
444+
case TypeId::kTimestampTzNs:
398445
return rhs == TypeId::kLong || rhs == TypeId::kTimestamp ||
399-
rhs == TypeId::kTimestampTz;
446+
rhs == TypeId::kTimestampTz || rhs == TypeId::kTimestampNs ||
447+
rhs == TypeId::kTimestampTzNs;
400448
default:
401449
return lhs == rhs;
402450
}
@@ -439,7 +487,9 @@ std::partial_ordering Literal::operator<=>(const Literal& other) const {
439487
case TypeId::kLong:
440488
case TypeId::kTime:
441489
case TypeId::kTimestamp:
442-
case TypeId::kTimestampTz: {
490+
case TypeId::kTimestampTz:
491+
case TypeId::kTimestampNs:
492+
case TypeId::kTimestampTzNs: {
443493
auto this_val = std::get<int64_t>(value_);
444494
auto other_val = std::get<int64_t>(other.value_);
445495
return this_val <=> other_val;
@@ -548,7 +598,9 @@ std::string Literal::ToString() const {
548598
}
549599
case TypeId::kTime:
550600
case TypeId::kTimestamp:
551-
case TypeId::kTimestampTz: {
601+
case TypeId::kTimestampTz:
602+
case TypeId::kTimestampNs:
603+
case TypeId::kTimestampTzNs: {
552604
return std::to_string(std::get<int64_t>(value_));
553605
}
554606
case TypeId::kDate: {
@@ -613,6 +665,10 @@ Result<Literal> LiteralCaster::CastTo(const Literal& literal,
613665
return CastFromTimestamp(literal, target_type);
614666
case TypeId::kTimestampTz:
615667
return CastFromTimestampTz(literal, target_type);
668+
case TypeId::kTimestampNs:
669+
return CastFromTimestamp(literal, target_type);
670+
case TypeId::kTimestampTzNs:
671+
return CastFromTimestampTz(literal, target_type);
616672
default:
617673
break;
618674
}

src/iceberg/expression/literal.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@ class ICEBERG_EXPORT Literal : public util::Formattable {
7373
static Literal Time(int64_t value);
7474
static Literal Timestamp(int64_t value);
7575
static Literal TimestampTz(int64_t value);
76+
/// Creates a literal of type timestamp_ns() holding `value`, a
/// nanosecond-precision timestamp count.
static Literal TimestampNs(int64_t value);
77+
/// Creates a literal of type timestamp_tz_ns() holding `value`, a
/// nanosecond-precision timestamp count.
static Literal TimestampTzNs(int64_t value);
7678
static Literal Float(float value);
7779
static Literal Double(double value);
7880
static Literal String(std::string value);
@@ -199,6 +201,8 @@ DEFINE_LITERAL_TRAIT(kLong, int64_t)
199201
DEFINE_LITERAL_TRAIT(kTime, int64_t)
200202
DEFINE_LITERAL_TRAIT(kTimestamp, int64_t)
201203
DEFINE_LITERAL_TRAIT(kTimestampTz, int64_t)
204+
DEFINE_LITERAL_TRAIT(kTimestampNs, int64_t)
205+
DEFINE_LITERAL_TRAIT(kTimestampTzNs, int64_t)
202206
DEFINE_LITERAL_TRAIT(kFloat, float)
203207
DEFINE_LITERAL_TRAIT(kDouble, double)
204208
DEFINE_LITERAL_TRAIT(kDecimal, Decimal)

src/iceberg/expression/predicate.cc

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -496,8 +496,9 @@ bool BoundLiteralPredicate::Equals(const Expression& other) const {
496496

497497
// Integral-valued types, including the nanosecond timestamp types.
498498
static const std::unordered_set<TypeId> kIntegralTypes = {
499-
TypeId::kInt, TypeId::kLong, TypeId::kDate,
500-
TypeId::kTime, TypeId::kTimestamp, TypeId::kTimestampTz};
499+
TypeId::kInt, TypeId::kLong, TypeId::kDate,
500+
TypeId::kTime, TypeId::kTimestamp, TypeId::kTimestampTz,
501+
TypeId::kTimestampNs, TypeId::kTimestampTzNs};
501502

502503
if (kIntegralTypes.contains(term_->type()->type_id()) &&
503504
term_->Equals(*other_pred->term())) {

0 commit comments

Comments
 (0)