Skip to content

Commit 07be0eb

Browse files
committed
perf: Use protobuf instead of JSON to serialize Iceberg partition values
1 parent ea264a3 commit 07be0eb

4 files changed

Lines changed: 337 additions & 90 deletions

File tree

native/core/src/execution/planner.rs

Lines changed: 111 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -2629,6 +2629,106 @@ fn convert_spark_types_to_arrow_schema(
26292629
arrow_schema
26302630
}
26312631

2632+
/// Converts a protobuf PartitionValue to an iceberg Literal.
2633+
///
2634+
/// This replaces JSON parsing with direct protobuf deserialization with a more compact
2635+
/// representation (e.g., timestamps as integers vs strings).
2636+
fn partition_value_to_literal(
2637+
proto_value: &spark_operator::PartitionValue,
2638+
) -> Result<Option<iceberg::spec::Literal>, ExecutionError> {
2639+
use spark_operator::partition_value::Value;
2640+
2641+
if proto_value.is_null {
2642+
return Ok(None);
2643+
}
2644+
2645+
let literal = match &proto_value.value {
2646+
Some(Value::IntVal(v)) => iceberg::spec::Literal::int(*v),
2647+
Some(Value::LongVal(v)) => iceberg::spec::Literal::long(*v),
2648+
Some(Value::DateVal(v)) => {
2649+
// Convert i64 to i32 for date (days since epoch)
2650+
let days = (*v).try_into().map_err(|_| {
2651+
GeneralError(format!("Date value out of range: {}", v))
2652+
})?;
2653+
iceberg::spec::Literal::date(days)
2654+
}
2655+
Some(Value::TimestampVal(v)) => iceberg::spec::Literal::timestamp(*v),
2656+
Some(Value::TimestampTzVal(v)) => iceberg::spec::Literal::timestamptz(*v),
2657+
Some(Value::StringVal(s)) => iceberg::spec::Literal::string(s.clone()),
2658+
Some(Value::DoubleVal(v)) => iceberg::spec::Literal::double(*v),
2659+
Some(Value::FloatVal(v)) => iceberg::spec::Literal::float(*v),
2660+
Some(Value::DecimalVal(bytes)) => {
2661+
// Deserialize unscaled BigInteger bytes to i128
2662+
// BigInteger is serialized as signed big-endian bytes
2663+
if bytes.len() > 16 {
2664+
return Err(GeneralError(format!(
2665+
"Decimal bytes too large: {} bytes (max 16 for i128)",
2666+
bytes.len()
2667+
)));
2668+
}
2669+
2670+
// Convert big-endian bytes to i128
2671+
let mut buf = [0u8; 16];
2672+
let offset = 16 - bytes.len();
2673+
buf[offset..].copy_from_slice(bytes);
2674+
2675+
// Handle sign extension for negative numbers
2676+
let value = if !bytes.is_empty() && (bytes[0] & 0x80) != 0 {
2677+
// Negative number - sign extend
2678+
for i in 0..offset {
2679+
buf[i] = 0xFF;
2680+
}
2681+
i128::from_be_bytes(buf)
2682+
} else {
2683+
// Positive number
2684+
i128::from_be_bytes(buf)
2685+
};
2686+
2687+
iceberg::spec::Literal::decimal(value)
2688+
}
2689+
Some(Value::BoolVal(v)) => iceberg::spec::Literal::bool(*v),
2690+
Some(Value::UuidVal(bytes)) => {
2691+
// Deserialize UUID from 16 bytes
2692+
if bytes.len() != 16 {
2693+
return Err(GeneralError(format!(
2694+
"Invalid UUID bytes length: {} (expected 16)",
2695+
bytes.len()
2696+
)));
2697+
}
2698+
let uuid = uuid::Uuid::from_slice(bytes).map_err(|e| {
2699+
GeneralError(format!("Failed to parse UUID: {}", e))
2700+
})?;
2701+
iceberg::spec::Literal::uuid(uuid)
2702+
}
2703+
Some(Value::FixedVal(bytes)) => iceberg::spec::Literal::fixed(bytes.to_vec()),
2704+
Some(Value::BinaryVal(bytes)) => iceberg::spec::Literal::binary(bytes.to_vec()),
2705+
None => {
2706+
return Err(GeneralError(
2707+
"PartitionValue has no value set and is_null is false".to_string(),
2708+
));
2709+
}
2710+
};
2711+
2712+
Ok(Some(literal))
2713+
}
2714+
2715+
/// Converts a protobuf PartitionData to an iceberg Struct.
2716+
///
2717+
/// Uses the existing Struct::from_iter() API from iceberg-rust to construct the struct
2718+
/// from the list of partition values.
2719+
/// This can potentially be upstreamed to iceberg_rust
2720+
fn partition_data_to_struct(
2721+
proto_partition: &spark_operator::PartitionData,
2722+
) -> Result<iceberg::spec::Struct, ExecutionError> {
2723+
let literals: Vec<Option<iceberg::spec::Literal>> = proto_partition
2724+
.values
2725+
.iter()
2726+
.map(partition_value_to_literal)
2727+
.collect::<Result<Vec<_>, _>>()?;
2728+
2729+
Ok(iceberg::spec::Struct::from_iter(literals))
2730+
}
2731+
26322732
/// Converts protobuf FileScanTasks from Scala into iceberg-rust FileScanTask objects.
26332733
///
26342734
/// Each task contains a residual predicate that is used for row-group level filtering
@@ -2655,19 +2755,6 @@ fn parse_file_scan_tasks(
26552755
})
26562756
.collect::<Result<Vec<_>, _>>()?;
26572757

2658-
let partition_type_cache: Vec<iceberg::spec::StructType> = proto_scan
2659-
.partition_type_pool
2660-
.iter()
2661-
.map(|json| {
2662-
serde_json::from_str(json).map_err(|e| {
2663-
ExecutionError::GeneralError(format!(
2664-
"Failed to parse partition type JSON from pool: {}",
2665-
e
2666-
))
2667-
})
2668-
})
2669-
.collect::<Result<Vec<_>, _>>()?;
2670-
26712758
let partition_spec_cache: Vec<Option<Arc<iceberg::spec::PartitionSpec>>> = proto_scan
26722759
.partition_spec_pool
26732760
.iter()
@@ -2721,19 +2808,7 @@ fn parse_file_scan_tasks(
27212808
})
27222809
.collect::<Result<Vec<_>, _>>()?;
27232810

2724-
let partition_data_cache: Vec<serde_json::Value> = proto_scan
2725-
.partition_data_pool
2726-
.iter()
2727-
.map(|json| {
2728-
serde_json::from_str(json).map_err(|e| {
2729-
ExecutionError::GeneralError(format!(
2730-
"Failed to parse partition data JSON from pool: {}",
2731-
e
2732-
))
2733-
})
2734-
})
2735-
.collect::<Result<Vec<_>, _>>()?;
2736-
2811+
// The partition data pool already holds protobuf messages, so no up-front parsing pass is needed.
27372812
let results: Result<Vec<_>, _> = proto_tasks
27382813
.iter()
27392814
.map(|proto_task| {
@@ -2787,48 +2862,24 @@ fn parse_file_scan_tasks(
27872862
};
27882863

27892864
let partition = if let Some(partition_data_idx) = proto_task.partition_data_idx {
2790-
let partition_type_idx = proto_task.partition_type_idx.ok_or_else(|| {
2791-
ExecutionError::GeneralError(
2792-
"partition_type_idx is required when partition_data_idx is present"
2793-
.to_string(),
2794-
)
2795-
})?;
2796-
2797-
let partition_data_value = partition_data_cache
2865+
// Get partition data from protobuf pool
2866+
let partition_data_proto = proto_scan
2867+
.partition_data_pool
27982868
.get(partition_data_idx as usize)
27992869
.ok_or_else(|| {
28002870
ExecutionError::GeneralError(format!(
2801-
"Invalid partition_data_idx: {} (cache size: {})",
2871+
"Invalid partition_data_idx: {} (pool size: {})",
28022872
partition_data_idx,
2803-
partition_data_cache.len()
2873+
proto_scan.partition_data_pool.len()
28042874
))
28052875
})?;
28062876

2807-
let partition_type = partition_type_cache
2808-
.get(partition_type_idx as usize)
2809-
.ok_or_else(|| {
2810-
ExecutionError::GeneralError(format!(
2811-
"Invalid partition_type_idx: {} (cache size: {})",
2812-
partition_type_idx,
2813-
partition_type_cache.len()
2814-
))
2815-
})?;
2816-
2817-
match iceberg::spec::Literal::try_from_json(
2818-
partition_data_value.clone(),
2819-
&iceberg::spec::Type::Struct(partition_type.clone()),
2820-
) {
2821-
Ok(Some(iceberg::spec::Literal::Struct(s))) => Some(s),
2822-
Ok(None) => None,
2823-
Ok(other) => {
2824-
return Err(GeneralError(format!(
2825-
"Expected struct literal for partition data, got: {:?}",
2826-
other
2827-
)))
2828-
}
2877+
// Convert protobuf PartitionData to iceberg Struct
2878+
match partition_data_to_struct(partition_data_proto) {
2879+
Ok(s) => Some(s),
28292880
Err(e) => {
2830-
return Err(GeneralError(format!(
2831-
"Failed to deserialize partition data from JSON: {}",
2881+
return Err(ExecutionError::GeneralError(format!(
2882+
"Failed to deserialize partition data from protobuf: {}",
28322883
e
28332884
)))
28342885
}

native/proto/src/proto/operator.proto

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,32 @@ message CsvOptions {
130130
bool truncated_rows = 8;
131131
}
132132

133+
// A single value within Iceberg partition data.
//
// Either `is_null` is true, or exactly one member of `value` is set. The native
// reader checks `is_null` first and reports an error when `is_null` is false
// and no member of `value` is present.
message PartitionValue {
  int32 field_id = 1;

  oneof value {
    int32 int_val = 2;
    int64 long_val = 3;
    // Days since epoch. The native reader narrows this to a 32-bit integer and
    // reports an error if it does not fit.
    int64 date_val = 4;
    // Microseconds since epoch.
    int64 timestamp_val = 5;
    // Microseconds, for timestamp-with-timezone values.
    int64 timestamp_tz_val = 6;
    string string_val = 7;
    double double_val = 8;
    float float_val = 9;
    // Unscaled BigInteger bytes (signed, big-endian). The native reader accepts
    // at most 16 bytes.
    bytes decimal_val = 10;
    bool bool_val = 11;
    // The native reader requires exactly 16 bytes.
    bytes uuid_val = 12;
    bytes fixed_val = 13;
    bytes binary_val = 14;
  }

  // When true, the value is null and `value` is ignored by the native reader.
  bool is_null = 15;
}
153+
154+
// The partition values that make up a single partition.
// The native reader converts them in the order listed here.
message PartitionData {
  repeated PartitionValue values = 1;
}
158+
133159
message IcebergScan {
134160
// Schema to read
135161
repeated SparkStructField required_schema = 1;
@@ -149,7 +175,7 @@ message IcebergScan {
149175
repeated string partition_spec_pool = 7;
150176
repeated string name_mapping_pool = 8;
151177
repeated ProjectFieldIdList project_field_ids_pool = 9;
152-
repeated string partition_data_pool = 10;
178+
repeated PartitionData partition_data_pool = 10;
153179
repeated DeleteFileList delete_files_pool = 11;
154180
repeated spark.spark_expression.Expr residual_pool = 12;
155181
}

0 commit comments

Comments
 (0)