Skip to content

Commit dd2100c

Browse files
committed
feat(sqlite): validate payload types at build time
PR #27 closed the reachable LargeUtf8/Utf8View/list read-back gaps, but the underlying defect class remained: the write side accepted any Arrow type (silent TEXT/NULL fallback) while the read side reconstructed only a subset, so an unsupported payload column built a "successful" sidecar that then failed on the first query (runtimedb#631). Close the class structurally: - Add `payload_type_supported` as the single source of truth for which Arrow types round-trip, and `validate_payload_schema` to gate every build entry point (`SqliteSidecarBuilder::begin`, `open_or_build`). An unsupported column now fails at index-build time with the column named, instead of at query time — which is where runtimedb's index-create step will surface it. - Widen actual support to the common parquet/DuckDB scalar types that previously fell through: Boolean, Date32, Date64, and Timestamp (all units, time zone preserved on read-back). Tests: reject a Decimal128 payload at build time; round-trip Boolean, Date32, and a tz-carrying Timestamp.
1 parent d1cd409 commit dd2100c

2 files changed

Lines changed: 315 additions & 3 deletions

File tree

src/sqlite_provider.rs

Lines changed: 181 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use arrow_array::{
2525
Array, ArrayRef, Float32Array, Float64Array, Int32Array, Int64Array, OffsetSizeTrait,
2626
RecordBatch, StringArray, UInt32Array, UInt64Array,
2727
};
28-
use arrow_schema::{DataType, FieldRef, SchemaRef};
28+
use arrow_schema::{DataType, FieldRef, SchemaRef, TimeUnit};
2929
use async_trait::async_trait;
3030
use datafusion::catalog::{Session, TableProvider};
3131
use datafusion::common::Result as DFResult;
@@ -213,6 +213,7 @@ impl SqliteLookupProvider {
213213
"pool_size must be at least 1".into(),
214214
));
215215
}
216+
validate_payload_schema(&schema, 0)?;
216217
let conn = open_conn(db_path)?;
217218

218219
let table_exists: bool = conn
@@ -336,6 +337,7 @@ impl SqliteSidecarBuilder {
336337
value_col_indices.len()
337338
)));
338339
}
340+
validate_payload_schema(&schema, key_col_index)?;
339341
let (create_sql, insert_sql) = ddl(table_name, &schema);
340342
let conn = open_conn(db_path)?;
341343
// Manual BEGIN/COMMIT rather than a borrowed `Transaction` so the
@@ -981,11 +983,85 @@ fn build_table(
981983

982984
// ── Type conversion helpers ───────────────────────────────────────────────────
983985

986+
/// Single source of truth for which Arrow types this provider can store in the
987+
/// sidecar **and** reconstruct on read-back. Every type that returns `true` here
988+
/// must have a matching arm in all of `arrow_cell_to_sql` (write), `arrow_type_to_sql`
989+
/// (column affinity), and `sql_values_to_arrow` (read). `validate_payload_schema`
990+
/// gates every build against this set so an unsupported column fails at index
991+
/// build time — not later at query time, half a sidecar in.
992+
fn payload_type_supported(dt: &DataType) -> bool {
993+
match dt {
994+
DataType::Boolean
995+
| DataType::Int32
996+
| DataType::Int64
997+
| DataType::UInt32
998+
| DataType::UInt64
999+
| DataType::Float32
1000+
| DataType::Float64
1001+
| DataType::Utf8
1002+
| DataType::LargeUtf8
1003+
| DataType::Utf8View
1004+
| DataType::Date32
1005+
| DataType::Date64
1006+
| DataType::Timestamp(_, _) => true,
1007+
DataType::List(item) | DataType::LargeList(item) => {
1008+
list_item_type_supported(item.data_type())
1009+
}
1010+
_ => false,
1011+
}
1012+
}
1013+
1014+
/// Item types reconstructable by `json_text_to_list`. Lists are serialized to
1015+
/// JSON TEXT, so only types with a JSON representation `serialize_list` writes
1016+
/// and `json_text_to_list` reads are supported.
1017+
fn list_item_type_supported(dt: &DataType) -> bool {
1018+
matches!(
1019+
dt,
1020+
DataType::Utf8
1021+
| DataType::LargeUtf8
1022+
| DataType::Utf8View
1023+
| DataType::Int32
1024+
| DataType::Int64
1025+
)
1026+
}
1027+
1028+
/// Reject a build whose payload columns include a type the read-back path cannot
1029+
/// reconstruct. Called from every build entry point so the failure surfaces at
1030+
/// index-creation time with the offending column named, rather than as a
1031+
/// `unsupported Arrow type` error on the first query against a sidecar that
1032+
/// "built successfully". `key_col_index` is skipped — keys are validated
1033+
/// separately by `extract_key`.
1034+
fn validate_payload_schema(schema: &SchemaRef, key_col_index: usize) -> DFResult<()> {
1035+
for (i, field) in schema.fields().iter().enumerate() {
1036+
if i == key_col_index {
1037+
continue;
1038+
}
1039+
if !payload_type_supported(field.data_type()) {
1040+
return Err(DataFusionError::Execution(format!(
1041+
"SqliteLookupProvider: payload column '{}' has unsupported type {:?}. \
1042+
Supported types: Boolean, Int32/64, UInt32/64, Float32/64, \
1043+
Utf8/LargeUtf8/Utf8View, Date32/64, Timestamp, and List/LargeList \
1044+
of {{Utf8, LargeUtf8, Utf8View, Int32, Int64}}.",
1045+
field.name(),
1046+
field.data_type()
1047+
)));
1048+
}
1049+
}
1050+
Ok(())
1051+
}
1052+
9841053
fn arrow_type_to_sql(dt: &DataType) -> &'static str {
9851054
match dt {
986-
DataType::UInt64 | DataType::UInt32 | DataType::Int32 | DataType::Int64 => "INTEGER",
1055+
DataType::UInt64
1056+
| DataType::UInt32
1057+
| DataType::Int32
1058+
| DataType::Int64
1059+
| DataType::Boolean
1060+
| DataType::Date32
1061+
| DataType::Date64
1062+
| DataType::Timestamp(_, _) => "INTEGER",
9871063
DataType::Float32 | DataType::Float64 => "REAL",
988-
_ => "TEXT", // Utf8, LargeUtf8, List variants → TEXT (JSON for lists)
1064+
_ => "TEXT", // Utf8, LargeUtf8, Utf8View, List variants → TEXT (JSON for lists)
9891065
}
9901066
}
9911067

@@ -1057,7 +1133,54 @@ fn arrow_cell_to_sql(col: &ArrayRef, row: usize) -> SqlValue {
10571133
.unwrap()
10581134
.value(row),
10591135
),
1136+
DataType::Boolean => SqlValue::Integer(
1137+
col.as_any()
1138+
.downcast_ref::<arrow_array::BooleanArray>()
1139+
.unwrap()
1140+
.value(row) as i64,
1141+
),
1142+
DataType::Date32 => SqlValue::Integer(
1143+
col.as_any()
1144+
.downcast_ref::<arrow_array::Date32Array>()
1145+
.unwrap()
1146+
.value(row) as i64,
1147+
),
1148+
DataType::Date64 => SqlValue::Integer(
1149+
col.as_any()
1150+
.downcast_ref::<arrow_array::Date64Array>()
1151+
.unwrap()
1152+
.value(row),
1153+
),
1154+
// Timestamps are stored as their raw i64 tick count; the unit and time
1155+
// zone live in the schema and are restored on read-back.
1156+
DataType::Timestamp(unit, _) => {
1157+
let v = match unit {
1158+
TimeUnit::Second => col
1159+
.as_any()
1160+
.downcast_ref::<arrow_array::TimestampSecondArray>()
1161+
.unwrap()
1162+
.value(row),
1163+
TimeUnit::Millisecond => col
1164+
.as_any()
1165+
.downcast_ref::<arrow_array::TimestampMillisecondArray>()
1166+
.unwrap()
1167+
.value(row),
1168+
TimeUnit::Microsecond => col
1169+
.as_any()
1170+
.downcast_ref::<arrow_array::TimestampMicrosecondArray>()
1171+
.unwrap()
1172+
.value(row),
1173+
TimeUnit::Nanosecond => col
1174+
.as_any()
1175+
.downcast_ref::<arrow_array::TimestampNanosecondArray>()
1176+
.unwrap()
1177+
.value(row),
1178+
};
1179+
SqlValue::Integer(v)
1180+
}
10601181
DataType::List(_) | DataType::LargeList(_) => SqlValue::Text(serialize_list(col, row)),
1182+
// Unreachable for any schema that passed `validate_payload_schema`; kept
1183+
// as a defensive fallback rather than a panic.
10611184
_ => SqlValue::Null,
10621185
}
10631186
}
@@ -1239,6 +1362,61 @@ fn sql_values_to_arrow(dt: &DataType, values: Vec<SqlValue>) -> DFResult<ArrayRe
12391362
}
12401363
Arc::new(b.finish())
12411364
}
1365+
DataType::Boolean => {
1366+
let arr: arrow_array::BooleanArray = values
1367+
.iter()
1368+
.map(|v| match v {
1369+
SqlValue::Integer(i) => Some(*i != 0),
1370+
_ => None,
1371+
})
1372+
.collect();
1373+
Arc::new(arr)
1374+
}
1375+
DataType::Date32 => {
1376+
let arr: arrow_array::Date32Array = values
1377+
.iter()
1378+
.map(|v| match v {
1379+
SqlValue::Integer(i) => Some(*i as i32),
1380+
_ => None,
1381+
})
1382+
.collect();
1383+
Arc::new(arr)
1384+
}
1385+
DataType::Date64 => {
1386+
let arr: arrow_array::Date64Array = values
1387+
.iter()
1388+
.map(|v| match v {
1389+
SqlValue::Integer(i) => Some(*i),
1390+
_ => None,
1391+
})
1392+
.collect();
1393+
Arc::new(arr)
1394+
}
1395+
// Rebuild the unit-specific array, then restore the schema's time zone so
1396+
// the reconstructed column's DataType matches the original exactly.
1397+
DataType::Timestamp(unit, tz) => {
1398+
let it = values.iter().map(|v| match v {
1399+
SqlValue::Integer(i) => Some(*i),
1400+
_ => None,
1401+
});
1402+
match unit {
1403+
TimeUnit::Second => Arc::new(
1404+
arrow_array::TimestampSecondArray::from_iter(it).with_timezone_opt(tz.clone()),
1405+
),
1406+
TimeUnit::Millisecond => Arc::new(
1407+
arrow_array::TimestampMillisecondArray::from_iter(it)
1408+
.with_timezone_opt(tz.clone()),
1409+
),
1410+
TimeUnit::Microsecond => Arc::new(
1411+
arrow_array::TimestampMicrosecondArray::from_iter(it)
1412+
.with_timezone_opt(tz.clone()),
1413+
),
1414+
TimeUnit::Nanosecond => Arc::new(
1415+
arrow_array::TimestampNanosecondArray::from_iter(it)
1416+
.with_timezone_opt(tz.clone()),
1417+
),
1418+
}
1419+
}
12421420
DataType::List(item_field) => json_text_to_list::<i32>(item_field, &values)?,
12431421
DataType::LargeList(item_field) => json_text_to_list::<i64>(item_field, &values)?,
12441422
DataType::Float64 => {

tests/sqlite_provider_test.rs

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -781,3 +781,137 @@ async fn test_custom_key_column_name() {
781781
.unwrap();
782782
assert_eq!(key_col.value(0), 1);
783783
}
784+
785+
/// The defect class behind hotdata-dev/runtimedb#631 was that the write side
786+
/// accepted any Arrow type (silent TEXT/NULL fallback) while the read side only
787+
/// reconstructed a subset — so an unsupported payload column built a "successful"
788+
/// sidecar that then exploded on the first query. This asserts the failure now
789+
/// surfaces at *build* time (`begin`) with the offending column named, which is
790+
/// where runtimedb's index-create step will catch it.
791+
#[tokio::test]
792+
async fn test_unsupported_payload_type_rejected_at_build() {
793+
let dir = tempdir().unwrap();
794+
795+
// Decimal128 is a common parquet type the read-back path cannot reconstruct.
796+
let schema = Arc::new(Schema::new(vec![
797+
Field::new("rowid", DataType::Int64, false),
798+
Field::new("price", DataType::Decimal128(10, 2), true),
799+
]));
800+
801+
let db_path = dir.path().join("bad.db");
802+
let err =
803+
SqliteSidecarBuilder::begin(db_path.to_str().unwrap(), "models", 2, schema, 0, vec![1])
804+
.map(|_| ())
805+
.expect_err("a Decimal128 payload column must be rejected at build time");
806+
807+
let msg = err.to_string();
808+
assert!(
809+
msg.contains("price") && msg.contains("unsupported type"),
810+
"error should name the offending column and type, got: {msg}"
811+
);
812+
}
813+
814+
/// Round-trips the basic scalar types added alongside the validation gate
815+
/// (Boolean, Date32, and a timezone-carrying Timestamp). These are exactly the
816+
/// kinds of columns DuckDB/parquet emit that previously fell through to the
817+
/// "unsupported Arrow type" error on read-back.
818+
#[tokio::test]
819+
async fn test_basic_scalar_types_roundtrip() {
820+
use arrow_array::{BooleanArray, Date32Array, TimestampMicrosecondArray};
821+
use arrow_schema::TimeUnit;
822+
823+
let dir = tempdir().unwrap();
824+
825+
let ts_type = DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into()));
826+
let schema = Arc::new(Schema::new(vec![
827+
Field::new("rowid", DataType::Int64, false),
828+
Field::new("flag", DataType::Boolean, true),
829+
Field::new("day", DataType::Date32, true),
830+
Field::new("ts", ts_type.clone(), true),
831+
]));
832+
833+
let ts_array =
834+
TimestampMicrosecondArray::from(vec![Some(1_000_000_i64), None, Some(3_000_000)])
835+
.with_timezone_opt(Some("UTC".to_string()));
836+
837+
let batch = RecordBatch::try_new(
838+
schema.clone(),
839+
vec![
840+
Arc::new(Int64Array::from(vec![1_i64, 2, 3])),
841+
Arc::new(BooleanArray::from(vec![Some(true), Some(false), None])),
842+
Arc::new(Date32Array::from(vec![
843+
Some(19_000_i32),
844+
None,
845+
Some(19_002),
846+
])),
847+
Arc::new(ts_array),
848+
],
849+
)
850+
.unwrap();
851+
852+
let db_path = dir.path().join("basics.db");
853+
let mut builder = SqliteSidecarBuilder::begin(
854+
db_path.to_str().unwrap(),
855+
"models",
856+
2,
857+
schema,
858+
0,
859+
vec![1, 2, 3],
860+
)
861+
.unwrap();
862+
builder.push_batch(&batch).unwrap();
863+
let provider = builder.finish().unwrap();
864+
865+
let batches = provider
866+
.fetch_by_keys(&[1, 3], "rowid", None)
867+
.await
868+
.unwrap();
869+
assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 2);
870+
871+
// The timestamp column must reconstruct with its original unit *and* time
872+
// zone, otherwise the column's DataType won't match the declared schema.
873+
for b in &batches {
874+
assert_eq!(b.column_by_name("ts").unwrap().data_type(), &ts_type);
875+
b.column_by_name("flag")
876+
.unwrap()
877+
.as_any()
878+
.downcast_ref::<BooleanArray>()
879+
.expect("flag should reconstruct as BooleanArray");
880+
b.column_by_name("day")
881+
.unwrap()
882+
.as_any()
883+
.downcast_ref::<Date32Array>()
884+
.expect("day should reconstruct as Date32Array");
885+
}
886+
887+
// Spot-check rowid 1's values survived the round-trip.
888+
let rowid_one = batches
889+
.iter()
890+
.flat_map(|b| {
891+
let rid = b
892+
.column_by_name("rowid")
893+
.unwrap()
894+
.as_any()
895+
.downcast_ref::<Int64Array>()
896+
.unwrap();
897+
let flag = b
898+
.column_by_name("flag")
899+
.unwrap()
900+
.as_any()
901+
.downcast_ref::<BooleanArray>()
902+
.unwrap();
903+
let day = b
904+
.column_by_name("day")
905+
.unwrap()
906+
.as_any()
907+
.downcast_ref::<Date32Array>()
908+
.unwrap();
909+
(0..b.num_rows())
910+
.filter(|&i| rid.value(i) == 1)
911+
.map(|i| (flag.value(i), day.value(i)))
912+
.collect::<Vec<_>>()
913+
})
914+
.next()
915+
.expect("rowid 1 should be present");
916+
assert_eq!(rowid_one, (true, 19_000));
917+
}

0 commit comments

Comments
 (0)