Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ Key motivations inspired by the broader Lance roadmap<sup>[1](https://github.com

- Unified schema for agent messages (`ContextRecord`) with optional embeddings and metadata.
- Automatic versioning via Lance manifests with `checkout(version)` support.
- Remote persistence: point the store at `s3://` URIs with either AWS environment variables or explicit credentials/endpoint overrides.
- Python API (`lance_context.api.Context`) aligned with the Rust implementation.
- Integration tests that exercise real persistence, image serialization, and version rollbacks.

Expand Down Expand Up @@ -65,6 +66,17 @@ ctx.add("assistant", "Let me fetch suggestions…")
ctx.checkout(first_version)

print("Entries after checkout:", ctx.entries())

# Store context in S3 (e.g., for MinIO/moto test endpoints)
ctx = Context.create(
"s3://my-bucket/context.lance",
aws_access_key_id="minioadmin",
aws_secret_access_key="minioadmin",
region="us-east-1",
endpoint_url="http://localhost:9000",
allow_http=True,
)
# AWS_* environment variables work too—pass overrides only when you need custom endpoints.
```

### Rust
Expand Down
2 changes: 1 addition & 1 deletion crates/lance-context-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ mod store;

pub use context::{Context, ContextEntry, Snapshot};
pub use record::{ContextRecord, SearchResult, StateMetadata};
pub use store::ContextStore;
pub use store::{ContextStore, ContextStoreOptions};
78 changes: 65 additions & 13 deletions crates/lance-context-core/src/store.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::collections::HashMap;
use std::sync::Arc;

use arrow_array::builder::{
Expand All @@ -13,7 +14,8 @@ use arrow_array::{
use arrow_schema::{ArrowError, DataType, Field, FieldRef, Schema, TimeUnit};
use chrono::DateTime;
use futures::TryStreamExt;
use lance::dataset::{Dataset, WriteMode, WriteParams};
use lance::dataset::{builder::DatasetBuilder, Dataset, WriteMode, WriteParams};
use lance::io::ObjectStoreParams;
use lance::{Error as LanceError, Result as LanceResult};

use crate::record::{ContextRecord, SearchResult, StateMetadata};
Expand All @@ -28,23 +30,32 @@ pub struct ContextStore {
dataset: Dataset,
}

/// Extra knobs applied when opening a [`ContextStore`], e.g. object-store
/// credentials for remote (S3) URIs.
#[derive(Debug, Clone, Default)]
pub struct ContextStoreOptions {
    pub storage_options: Option<HashMap<String, String>>,
}

impl ContextStoreOptions {
    /// Hand back an owned copy of the configured storage options, if any.
    #[must_use]
    pub fn storage_options(&self) -> Option<HashMap<String, String>> {
        self.storage_options.as_ref().cloned()
    }
}

impl ContextStore {
/// Open an existing context dataset or create a new one with the project schema.
pub async fn open(uri: &str) -> LanceResult<Self> {
match Dataset::open(uri).await {
Self::open_with_options(uri, ContextStoreOptions::default()).await
}

/// Open a dataset with explicit object store configuration (e.g. S3 credentials).
pub async fn open_with_options(uri: &str, options: ContextStoreOptions) -> LanceResult<Self> {
let storage_options = options.storage_options();
match Self::load_with_options(uri, storage_options.clone()).await {
Ok(dataset) => Ok(Self { dataset }),
Err(LanceError::DatasetNotFound { .. }) => {
let schema = Arc::new(Self::schema());
let empty_batch = RecordBatch::new_empty(schema.clone());
let batches = RecordBatchIterator::new(
vec![Ok::<RecordBatch, ArrowError>(empty_batch)].into_iter(),
schema.clone(),
);
let params = WriteParams {
mode: WriteMode::Create,
..Default::default()
};
let dataset = Dataset::write(batches, uri, Some(params)).await?;
let dataset = Self::create_with_options(uri, storage_options).await?;
Ok(Self { dataset })
}
Err(err) => Err(err),
Expand Down Expand Up @@ -156,6 +167,47 @@ impl ContextStore {
])
}

/// Open an existing dataset at `uri`, routing through [`DatasetBuilder`]
/// only when explicit storage options (e.g. S3 credentials) are supplied.
async fn load_with_options(
    uri: &str,
    storage_options: Option<HashMap<String, String>>,
) -> LanceResult<Dataset> {
    match storage_options {
        // Explicit credentials/endpoint overrides must go through the builder.
        Some(options) => {
            DatasetBuilder::from_uri(uri)
                .with_storage_options(options)
                .load()
                .await
        }
        // No overrides: the plain open path picks up ambient configuration.
        None => Dataset::open(uri).await,
    }
}

/// Create a brand-new dataset at `uri`, seeded with a single empty batch so
/// the project schema is persisted, threading storage options through to the
/// object store when provided.
async fn create_with_options(
    uri: &str,
    storage_options: Option<HashMap<String, String>>,
) -> LanceResult<Dataset> {
    let schema = Arc::new(Self::schema());
    // An empty batch is enough to materialize the schema in the manifest.
    let seed = RecordBatch::new_empty(schema.clone());
    let reader = RecordBatchIterator::new(
        std::iter::once(Ok::<RecordBatch, ArrowError>(seed)),
        schema,
    );

    let mut params = WriteParams {
        mode: WriteMode::Create,
        ..Default::default()
    };
    // Only override store params when the caller passed explicit options;
    // otherwise leave the defaults untouched.
    if let Some(options) = storage_options {
        params.store_params = Some(ObjectStoreParams {
            storage_options: Some(options),
            ..Default::default()
        });
    }

    Dataset::write(reader, uri, Some(params)).await
}

fn records_to_batch(entries: &[ContextRecord]) -> LanceResult<RecordBatch> {
let mut id_builder = StringBuilder::new();
let mut run_id_builder = StringBuilder::new();
Expand Down
2 changes: 1 addition & 1 deletion python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ requires = ["maturin>=1.4"]
build-backend = "maturin"

[project.optional-dependencies]
tests = ["pytest", "ruff"]
tests = ["pytest", "ruff", "moto[s3]", "boto3", "botocore"]
dev = ["ruff", "pyright"]

[tool.ruff]
Expand Down
56 changes: 52 additions & 4 deletions python/python/lance_context/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,60 @@ def _normalize_search_hit(raw: dict[str, Any]) -> dict[str, Any]:


class Context:
def __init__(self, uri: str) -> None:
self._inner = _Context.create(uri)
def __init__(
    self,
    uri: str,
    *,
    storage_options: dict[str, Any] | None = None,
    aws_access_key_id: str | None = None,
    aws_secret_access_key: str | None = None,
    aws_session_token: str | None = None,
    region: str | None = None,
    endpoint_url: str | None = None,
    allow_http: bool = False,
) -> None:
    """Open (or create) a context store at ``uri``.

    Explicit AWS keyword arguments are folded into ``storage_options`` and
    override any matching keys supplied there. When no options end up set,
    the store is opened without a storage-options mapping at all.
    """
    options = dict(storage_options or {})
    overrides = {
        "aws_access_key_id": aws_access_key_id,
        "aws_secret_access_key": aws_secret_access_key,
        "aws_session_token": aws_session_token,
        "aws_region": region,
        "aws_endpoint_url": endpoint_url,
    }
    options.update({key: value for key, value in overrides.items() if value is not None})
    if allow_http:
        # The binding layer stringifies booleans, so True is passed as-is here.
        options["aws_allow_http"] = True

    if options:
        self._inner = _Context.create(uri, storage_options=options)
    else:
        self._inner = _Context.create(uri)

@classmethod
def create(cls, uri: str) -> Context:
return cls(uri)
def create(
    cls,
    uri: str,
    *,
    storage_options: dict[str, Any] | None = None,
    aws_access_key_id: str | None = None,
    aws_secret_access_key: str | None = None,
    aws_session_token: str | None = None,
    region: str | None = None,
    endpoint_url: str | None = None,
    allow_http: bool = False,
) -> Context:
    """Construct a :class:`Context`; a convenience alias for the constructor."""
    kwargs: dict[str, Any] = {
        "storage_options": storage_options,
        "aws_access_key_id": aws_access_key_id,
        "aws_secret_access_key": aws_secret_access_key,
        "aws_session_token": aws_session_token,
        "region": region,
        "endpoint_url": endpoint_url,
        "allow_http": allow_http,
    }
    return cls(uri, **kwargs)

def uri(self) -> str:
return self._inner.uri()
Expand Down
55 changes: 52 additions & 3 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::collections::HashMap;
use std::sync::Arc;

use chrono::{SecondsFormat, Utc};
Expand All @@ -8,7 +9,9 @@ use pyo3::IntoPyObject;
use tokio::runtime::Runtime;

use lance_context::serde::CONTENT_TYPE_TEXT;
use lance_context::{Context as RustContext, ContextRecord, ContextStore, SearchResult};
use lance_context::{
Context as RustContext, ContextRecord, ContextStore, ContextStoreOptions, SearchResult,
};

const DEFAULT_BINARY_CONTENT_TYPE: &str = "application/octet-stream";
const BINARY_PLACEHOLDER: &str = "[binary]";
Expand All @@ -26,13 +29,59 @@ struct Context {
run_id: String,
}

/// Convert an optional Python dict into the `HashMap<String, String>` that the
/// store layer expects. `None` values are skipped (treated as "unset"), bools
/// render as "true"/"false", numbers via their `Display` form, and anything
/// else through `str()`. An empty result collapses back to `None`.
fn storage_options_from_dict<'py>(
    dict: Option<&Bound<'py, PyDict>>,
) -> PyResult<Option<HashMap<String, String>>> {
    let Some(mapping) = dict else {
        return Ok(None);
    };

    let mut collected: HashMap<String, String> = HashMap::new();
    for (raw_key, raw_value) in mapping.iter() {
        // Keys must be strings; this errors out even when the value is None.
        let key = raw_key.extract::<String>()?;
        if raw_value.is_none() {
            continue;
        }
        // Probe bool before i64: Python bools also extract as integers.
        let rendered = if let Ok(flag) = raw_value.extract::<bool>() {
            flag.to_string()
        } else if let Ok(integer) = raw_value.extract::<i64>() {
            integer.to_string()
        } else if let Ok(real) = raw_value.extract::<f64>() {
            real.to_string()
        } else {
            raw_value.str()?.to_string()
        };
        collected.insert(key, rendered);
    }

    Ok((!collected.is_empty()).then_some(collected))
}

#[pymethods]
impl Context {
#[classmethod]
fn create(_cls: &Bound<'_, PyType>, uri: &str) -> PyResult<Self> {
#[pyo3(signature = (uri, *, storage_options=None))]
fn create(
_cls: &Bound<'_, PyType>,
uri: &str,
storage_options: Option<&Bound<'_, PyDict>>,
) -> PyResult<Self> {
let runtime = Arc::new(Runtime::new().map_err(to_py_err)?);

let options = ContextStoreOptions {
storage_options: storage_options_from_dict(storage_options)?,
};

let store = runtime
.block_on(ContextStore::open(uri))
.block_on(ContextStore::open_with_options(uri, options))
.map_err(to_py_err)?;
let run_id = new_run_id();
Ok(Self {
Expand Down
Loading
Loading