Skip to content

Commit e0e6a88

Browse files
rustyconover and claude committed
wire: drop hand-rolled flatbuffer parsing for upstream RecordBatch::custom_metadata
Pin arrow-rs to rustyconover/arrow-rs#feat/recordbatch-custom-metadata (apache/arrow-rs#9445) and rewrite vgi-rpc/src/wire.rs as a thin wrapper around arrow_ipc::reader::StreamReader / writer::StreamWriter.

Per-batch metadata now travels on RecordBatch directly via with_custom_metadata() / custom_metadata(); the Metadata alias becomes HashMap<String, String> and the ReadBatch wrapper is gone. relax_nullability flips with_skip_validation(true) on the inner reader since upstream validates before our schema rewrap.

Also bundles in-progress conformance worker, http, and arrow_type changes that were already pending on the branch.

Conformance: 723/723 across pipe/subprocess/http/unix/externalize.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 91d9c38 commit e0e6a88

29 files changed

Lines changed: 1830 additions & 564 deletions

Cargo.lock

Lines changed: 30 additions & 23 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,17 @@ bincode = "1.3"
3535
syn = { version = "2", features = ["full", "extra-traits"] }
3636
quote = "1"
3737
proc-macro2 = "1"
38+
chrono = { version = "0.4", default-features = false, features = ["std"] }
39+
rust_decimal = { version = "1", default-features = false }
40+
41+
[patch.crates-io]
42+
arrow-array = { git = "https://github.com/rustyconover/arrow-rs.git", branch = "feat/recordbatch-custom-metadata" }
43+
arrow-buffer = { git = "https://github.com/rustyconover/arrow-rs.git", branch = "feat/recordbatch-custom-metadata" }
44+
arrow-data = { git = "https://github.com/rustyconover/arrow-rs.git", branch = "feat/recordbatch-custom-metadata" }
45+
arrow-ipc = { git = "https://github.com/rustyconover/arrow-rs.git", branch = "feat/recordbatch-custom-metadata" }
46+
arrow-schema = { git = "https://github.com/rustyconover/arrow-rs.git", branch = "feat/recordbatch-custom-metadata" }
47+
arrow-select = { git = "https://github.com/rustyconover/arrow-rs.git", branch = "feat/recordbatch-custom-metadata" }
48+
arrow-cast = { git = "https://github.com/rustyconover/arrow-rs.git", branch = "feat/recordbatch-custom-metadata" }
3849

3950
[profile.release]
4051
opt-level = 3

conformance-worker/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ path = "src/main.rs"
1111

1212
[dependencies]
1313
vgi-rpc = { path = "../vgi-rpc", features = ["http"] }
14+
vgi-rpc-s3 = { path = "../vgi-rpc-s3" }
15+
reqwest = { version = "0.12", default-features = false, features = ["blocking", "rustls-tls", "json"] }
1416
arrow-array.workspace = true
1517
arrow-buffer.workspace = true
1618
arrow-data.workspace = true
@@ -26,3 +28,4 @@ parking_lot = "0.12"
2628
serde.workspace = true
2729
serde_json = "1"
2830
ctrlc = "3"
31+
chrono.workspace = true

conformance-worker/src/conformance/mod.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,19 +12,32 @@ mod params;
1212
mod streams;
1313
mod types;
1414
mod unary;
15+
mod wide_types;
1516

1617
use std::sync::Arc;
1718

1819
use vgi_rpc::RpcServer;
1920

2021
/// Build an `RpcServer` with all conformance methods registered.
2122
pub fn build_server() -> RpcServer {
23+
build_server_with_external(None)
24+
}
25+
26+
/// Build an `RpcServer` with all conformance methods registered, optionally
27+
/// wired to an external-location config (used by `TestExternalLocation`).
28+
pub fn build_server_with_external(
29+
external: Option<vgi_rpc::external::ExternalLocationConfig>,
30+
) -> RpcServer {
2231
let mut builder = RpcServer::builder()
2332
.server_id("rust-conf-0001")
2433
.protocol_name("ConformanceService")
2534
.server_version("rust-conformance-0.2.0")
2635
.enable_describe(true);
2736

37+
if let Some(cfg) = external {
38+
builder = builder.with_external_location(cfg);
39+
}
40+
2841
// When VGI_ACCESS_LOG is set, emit JSON-per-call access records to that
2942
// file. Used for manual validation against vgi_rpc.access_log_conformance.
3043
if let Ok(path) = std::env::var("VGI_ACCESS_LOG") {

conformance-worker/src/conformance/types.rs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -57,18 +57,17 @@ impl Point {
5757
let mut buf = Vec::new();
5858
{
5959
let mut w = StreamWriter::new(&mut buf, &schema)?;
60-
w.write(&batch, None)?;
60+
w.write(&batch)?;
6161
w.finish()?;
6262
}
6363
Ok(buf)
6464
}
6565

6666
pub fn deserialize_ipc(bytes: &[u8]) -> Result<Self> {
6767
let mut r = StreamReader::new(bytes)?;
68-
let rb = r
68+
let batch = r
6969
.read_next()?
7070
.ok_or_else(|| RpcError::type_error("empty Point IPC stream"))?;
71-
let batch = rb.batch;
7271
let schema = batch.schema();
7372
let x_idx = schema
7473
.index_of("x")
@@ -338,18 +337,17 @@ impl AllTypes {
338337
let mut buf = Vec::new();
339338
{
340339
let mut w = StreamWriter::new(&mut buf, &all_types_schema())?;
341-
w.write(&batch, None)?;
340+
w.write(&batch)?;
342341
w.finish()?;
343342
}
344343
Ok(buf)
345344
}
346345

347346
pub fn deserialize_ipc(bytes: &[u8]) -> Result<Self> {
348347
let mut r = StreamReader::new(bytes)?;
349-
let rb = r
348+
let batch = r
350349
.read_next()?
351350
.ok_or_else(|| RpcError::type_error("empty AllTypes IPC"))?;
352-
let batch = rb.batch;
353351
AllTypes::from_record_batch(&batch)
354352
}
355353

@@ -679,7 +677,7 @@ pub fn serialize_bounding_box_ipc(top: &Point, bot: &Point, label: &str) -> Resu
679677
let mut buf = Vec::new();
680678
{
681679
let mut w = StreamWriter::new(&mut buf, &schema)?;
682-
w.write(&batch, None)?;
680+
w.write(&batch)?;
683681
w.finish()?;
684682
}
685683
Ok(buf)
@@ -690,5 +688,5 @@ pub fn deserialize_bounding_box_ipc(bytes: &[u8]) -> Result<(Point, Point, Strin
690688
let rb = r
691689
.read_next()?
692690
.ok_or_else(|| RpcError::type_error("empty BoundingBox IPC"))?;
693-
bounding_box_from_batch(&rb.batch)
691+
bounding_box_from_batch(&rb)
694692
}

0 commit comments

Comments
 (0)