Skip to content

Commit e3e49d5

Browse files
authored
feat: add multimodal serde serialization (#9)
1 parent a539c46 commit e3e49d5

5 files changed

Lines changed: 149 additions & 0 deletions

File tree

.gitignore

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,11 @@ cython_debug/
198198
# Cursor is an AI-powered code editor. `.cursorignore` specifies files/directories to
199199
# exclude from AI features like autocomplete and code analysis. Recommended for sensitive data
200200
# refer to https://docs.cursor.com/context/ignore-files
201+
202+
# BSP / IDE workspace files
203+
.bazelbsp/
204+
.bsp/
205+
.ijwb/
201206
.cursorignore
202207
.cursorindexingignore
203208

rust/lance-context/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rust/lance-context/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ categories = ["database", "data-structures", "science"]
1212

1313
[dependencies]
1414
arrow-array = "56.2.0"
15+
arrow-ipc = "56.2.0"
1516
arrow-schema = "56.2.0"
1617
chrono = { version = "0.4", default-features = false, features = ["clock"] }
1718
lance = "1.0.0"

rust/lance-context/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
33
mod context;
44
mod record;
5+
pub mod serde;
56
mod store;
67

78
pub use context::{Context, ContextEntry, Snapshot};

rust/lance-context/src/serde.rs

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
use arrow_array::RecordBatch;
2+
use arrow_ipc::writer::StreamWriter;
3+
use arrow_schema::ArrowError;
4+
use serde::{Deserialize, Serialize};
5+
6+
pub const CONTENT_TYPE_TEXT: &str = "text/plain";
7+
pub const CONTENT_TYPE_ARROW_STREAM: &str = "application/vnd.apache.arrow.stream";
8+
9+
/// A multimodal payload in a transport/storage-friendly form.
///
/// `content_type` is a MIME-style tag (e.g. [`CONTENT_TYPE_TEXT`], an image
/// MIME type, or [`CONTENT_TYPE_ARROW_STREAM`]) describing which payload
/// field carries the data. NOTE(review): the type itself does not enforce
/// that exactly one of the two payload fields is `Some` — the constructors
/// in the `impl` maintain that invariant; deserialized values may not.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SerializedContent {
    /// MIME-style content type describing the payload.
    pub content_type: String,
    /// UTF-8 payload for textual content; `None` for binary content.
    pub text_payload: Option<String>,
    /// Raw bytes for binary content (images, Arrow IPC streams); `None` for text.
    pub binary_payload: Option<Vec<u8>>,
}
15+
16+
impl SerializedContent {
17+
pub fn text(value: impl Into<String>, content_type: Option<&str>) -> Self {
18+
Self {
19+
content_type: content_type.unwrap_or(CONTENT_TYPE_TEXT).to_string(),
20+
text_payload: Some(value.into()),
21+
binary_payload: None,
22+
}
23+
}
24+
25+
pub fn image(bytes: impl Into<Vec<u8>>, mime: impl Into<String>) -> Self {
26+
Self {
27+
content_type: mime.into(),
28+
text_payload: None,
29+
binary_payload: Some(bytes.into()),
30+
}
31+
}
32+
33+
pub fn dataframe_batches(batches: &[RecordBatch]) -> Result<Self, ArrowError> {
34+
let ipc_bytes = record_batches_to_ipc(batches)?;
35+
Ok(Self::dataframe_ipc_bytes(ipc_bytes))
36+
}
37+
38+
pub fn dataframe_ipc_bytes(bytes: impl Into<Vec<u8>>) -> Self {
39+
Self {
40+
content_type: CONTENT_TYPE_ARROW_STREAM.to_string(),
41+
text_payload: None,
42+
binary_payload: Some(bytes.into()),
43+
}
44+
}
45+
}
46+
47+
pub fn serialize_image(bytes: impl Into<Vec<u8>>, mime: impl Into<String>) -> SerializedContent {
48+
SerializedContent::image(bytes, mime)
49+
}
50+
51+
pub fn serialize_dataframe(batches: &[RecordBatch]) -> Result<SerializedContent, ArrowError> {
52+
SerializedContent::dataframe_batches(batches)
53+
}
54+
55+
pub fn serialize_dataframe_ipc(bytes: impl Into<Vec<u8>>) -> SerializedContent {
56+
SerializedContent::dataframe_ipc_bytes(bytes)
57+
}
58+
59+
fn record_batches_to_ipc(batches: &[RecordBatch]) -> Result<Vec<u8>, ArrowError> {
60+
if batches.is_empty() {
61+
return Err(ArrowError::InvalidArgumentError(
62+
"no record batches provided".to_string(),
63+
));
64+
}
65+
66+
let schema = batches[0].schema();
67+
let mut buffer = Vec::new();
68+
{
69+
let mut writer = StreamWriter::try_new(&mut buffer, &schema)?;
70+
for batch in batches {
71+
if batch.schema() != schema {
72+
return Err(ArrowError::SchemaError(
73+
"record batch schema mismatch".to_string(),
74+
));
75+
}
76+
writer.write(batch)?;
77+
}
78+
writer.finish()?;
79+
}
80+
Ok(buffer)
81+
}
82+
83+
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{Int32Array, RecordBatch, StringArray};
    use arrow_ipc::reader::StreamReader;
    use arrow_schema::{DataType, Field, Schema};
    use std::io::Cursor;
    use std::sync::Arc;

    /// Builds the two-row, two-column (`id: Int32`, `name: Utf8`) batch used
    /// across the tests.
    fn make_batch() -> RecordBatch {
        let fields = vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, false),
        ];
        let schema = Arc::new(Schema::new(fields));
        let columns: Vec<arrow_array::ArrayRef> = vec![
            Arc::new(Int32Array::from(vec![1, 2])),
            Arc::new(StringArray::from(vec!["alpha", "beta"])),
        ];
        RecordBatch::try_new(schema, columns).unwrap()
    }

    #[test]
    fn image_serialization_sets_payloads() {
        let content = serialize_image(vec![1, 2, 3], "image/png");
        assert_eq!(content.content_type, "image/png");
        assert_eq!(content.text_payload, None);
        assert_eq!(content.binary_payload, Some(vec![1, 2, 3]));
    }

    #[test]
    fn dataframe_serialization_writes_ipc_stream() {
        let batch = make_batch();
        let content = serialize_dataframe(&[batch.clone()]).unwrap();
        assert_eq!(content.content_type, CONTENT_TYPE_ARROW_STREAM);
        let bytes = content.binary_payload.expect("expected IPC payload");

        // Round-trip the payload through the IPC reader and confirm the
        // original batch comes back intact.
        let reader = StreamReader::try_new(Cursor::new(bytes), None).unwrap();
        let decoded = reader.collect::<Result<Vec<RecordBatch>, _>>().unwrap();
        assert_eq!(decoded.len(), 1);
        assert_eq!(decoded[0].schema(), batch.schema());
        assert_eq!(decoded[0].num_rows(), batch.num_rows());
    }

    #[test]
    fn dataframe_serialization_rejects_empty_batches() {
        match serialize_dataframe(&[]) {
            Err(ArrowError::InvalidArgumentError(_)) => {}
            other => panic!("expected InvalidArgumentError, got {other:?}"),
        }
    }

    #[test]
    fn dataframe_serialization_rejects_mismatched_schema() {
        let batch = make_batch();
        // A single-column batch whose schema disagrees with `make_batch()`.
        let narrow_schema =
            Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
        let narrow_batch = RecordBatch::try_new(
            narrow_schema,
            vec![Arc::new(Int32Array::from(vec![1, 2]))],
        )
        .unwrap();

        let err = serialize_dataframe(&[batch, narrow_batch]).unwrap_err();
        assert!(matches!(err, ArrowError::SchemaError(_)));
    }
}

0 commit comments

Comments
 (0)