diff --git a/.gitignore b/.gitignore index b7faf40..90cdcf1 100644 --- a/.gitignore +++ b/.gitignore @@ -198,6 +198,11 @@ cython_debug/ # Cursor is an AI-powered code editor. `.cursorignore` specifies files/directories to # exclude from AI features like autocomplete and code analysis. Recommended for sensitive data # refer to https://docs.cursor.com/context/ignore-files + +# BSP / IDE workspace files +.bazelbsp/ +.bsp/ +.ijwb/ .cursorignore .cursorindexingignore diff --git a/rust/lance-context/Cargo.lock b/rust/lance-context/Cargo.lock index 2914054..8deac67 100644 --- a/rust/lance-context/Cargo.lock +++ b/rust/lance-context/Cargo.lock @@ -3385,6 +3385,7 @@ name = "lance-context" version = "0.1.0" dependencies = [ "arrow-array", + "arrow-ipc", "arrow-schema", "chrono", "lance", diff --git a/rust/lance-context/Cargo.toml b/rust/lance-context/Cargo.toml index 91ed5d1..016d020 100644 --- a/rust/lance-context/Cargo.toml +++ b/rust/lance-context/Cargo.toml @@ -12,6 +12,7 @@ categories = ["database", "data-structures", "science"] [dependencies] arrow-array = "56.2.0" +arrow-ipc = "56.2.0" arrow-schema = "56.2.0" chrono = { version = "0.4", default-features = false, features = ["clock"] } lance = "1.0.0" diff --git a/rust/lance-context/src/lib.rs b/rust/lance-context/src/lib.rs index c6e3165..5e8460a 100644 --- a/rust/lance-context/src/lib.rs +++ b/rust/lance-context/src/lib.rs @@ -2,6 +2,7 @@ mod context; mod record; +pub mod serde; mod store; pub use context::{Context, ContextEntry, Snapshot}; diff --git a/rust/lance-context/src/serde.rs b/rust/lance-context/src/serde.rs new file mode 100644 index 0000000..d0c97b1 --- /dev/null +++ b/rust/lance-context/src/serde.rs @@ -0,0 +1,141 @@ +use arrow_array::RecordBatch; +use arrow_ipc::writer::StreamWriter; +use arrow_schema::ArrowError; +use serde::{Deserialize, Serialize}; + +pub const CONTENT_TYPE_TEXT: &str = "text/plain"; +pub const CONTENT_TYPE_ARROW_STREAM: &str = "application/vnd.apache.arrow.stream"; + 
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+pub struct SerializedContent {
+    pub content_type: String,
+    pub text_payload: Option<String>,
+    pub binary_payload: Option<Vec<u8>>,
+}
+
+impl SerializedContent {
+    pub fn text(value: impl Into<String>, content_type: Option<&str>) -> Self {
+        Self {
+            content_type: content_type.unwrap_or(CONTENT_TYPE_TEXT).to_string(),
+            text_payload: Some(value.into()),
+            binary_payload: None,
+        }
+    }
+
+    pub fn image(bytes: impl Into<Vec<u8>>, mime: impl Into<String>) -> Self {
+        Self {
+            content_type: mime.into(),
+            text_payload: None,
+            binary_payload: Some(bytes.into()),
+        }
+    }
+
+    pub fn dataframe_batches(batches: &[RecordBatch]) -> Result<Self, ArrowError> {
+        let ipc_bytes = record_batches_to_ipc(batches)?;
+        Ok(Self::dataframe_ipc_bytes(ipc_bytes))
+    }
+
+    pub fn dataframe_ipc_bytes(bytes: impl Into<Vec<u8>>) -> Self {
+        Self {
+            content_type: CONTENT_TYPE_ARROW_STREAM.to_string(),
+            text_payload: None,
+            binary_payload: Some(bytes.into()),
+        }
+    }
+}
+
+pub fn serialize_image(bytes: impl Into<Vec<u8>>, mime: impl Into<String>) -> SerializedContent {
+    SerializedContent::image(bytes, mime)
+}
+
+pub fn serialize_dataframe(batches: &[RecordBatch]) -> Result<SerializedContent, ArrowError> {
+    SerializedContent::dataframe_batches(batches)
+}
+
+pub fn serialize_dataframe_ipc(bytes: impl Into<Vec<u8>>) -> SerializedContent {
+    SerializedContent::dataframe_ipc_bytes(bytes)
+}
+
+fn record_batches_to_ipc(batches: &[RecordBatch]) -> Result<Vec<u8>, ArrowError> {
+    if batches.is_empty() {
+        return Err(ArrowError::InvalidArgumentError(
+            "no record batches provided".to_string(),
+        ));
+    }
+
+    let schema = batches[0].schema();
+    let mut buffer = Vec::new();
+    {
+        let mut writer = StreamWriter::try_new(&mut buffer, &schema)?;
+        for batch in batches {
+            if batch.schema() != schema {
+                return Err(ArrowError::SchemaError(
+                    "record batch schema mismatch".to_string(),
+                ));
+            }
+            writer.write(batch)?;
+        }
+        writer.finish()?;
+    }
+    Ok(buffer)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow_array::{Int32Array, RecordBatch, StringArray};
+    use arrow_ipc::reader::StreamReader;
+    use arrow_schema::{DataType, Field, Schema};
+    use std::io::Cursor;
+    use std::sync::Arc;
+
+    fn make_batch() -> RecordBatch {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("name", DataType::Utf8, false),
+        ]));
+        let id_array = Arc::new(Int32Array::from(vec![1, 2]));
+        let name_array = Arc::new(StringArray::from(vec!["alpha", "beta"]));
+        RecordBatch::try_new(schema, vec![id_array, name_array]).unwrap()
+    }
+
+    #[test]
+    fn image_serialization_sets_payloads() {
+        let content = serialize_image(vec![1, 2, 3], "image/png");
+        assert_eq!(content.content_type, "image/png");
+        assert_eq!(content.text_payload, None);
+        assert_eq!(content.binary_payload, Some(vec![1, 2, 3]));
+    }
+
+    #[test]
+    fn dataframe_serialization_writes_ipc_stream() {
+        let batch = make_batch();
+        let content = serialize_dataframe(std::slice::from_ref(&batch)).unwrap();
+        assert_eq!(content.content_type, CONTENT_TYPE_ARROW_STREAM);
+        let bytes = content.binary_payload.expect("expected IPC payload");
+
+        let reader = StreamReader::try_new(Cursor::new(bytes), None).unwrap();
+        let batches: Vec<RecordBatch> = reader.map(|item| item.unwrap()).collect();
+        assert_eq!(batches.len(), 1);
+        assert_eq!(batches[0].schema(), batch.schema());
+        assert_eq!(batches[0].num_rows(), batch.num_rows());
+    }
+
+    #[test]
+    fn dataframe_serialization_rejects_empty_batches() {
+        let err = serialize_dataframe(&[]).unwrap_err();
+        assert!(matches!(err, ArrowError::InvalidArgumentError(_)));
+    }
+
+    #[test]
+    fn dataframe_serialization_rejects_mismatched_schema() {
+        let batch = make_batch();
+        let other_schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
+        let other_batch =
+            RecordBatch::try_new(other_schema, vec![Arc::new(Int32Array::from(vec![1, 2]))])
+                .unwrap();
+
+        let err = serialize_dataframe(&[batch, other_batch]).unwrap_err();
+        assert!(matches!(err, ArrowError::SchemaError(_)));
+    }
+}