1+ use std:: collections:: HashMap ;
12use std:: sync:: Arc ;
23
34use arrow_array:: builder:: {
@@ -13,7 +14,8 @@ use arrow_array::{
1314use arrow_schema:: { ArrowError , DataType , Field , FieldRef , Schema , TimeUnit } ;
1415use chrono:: DateTime ;
1516use futures:: TryStreamExt ;
16- use lance:: dataset:: { Dataset , WriteMode , WriteParams } ;
17+ use lance:: dataset:: { builder:: DatasetBuilder , Dataset , WriteMode , WriteParams } ;
18+ use lance:: io:: ObjectStoreParams ;
1719use lance:: { Error as LanceError , Result as LanceResult } ;
1820
1921use crate :: record:: { ContextRecord , SearchResult , StateMetadata } ;
@@ -28,23 +30,32 @@ pub struct ContextStore {
2830 dataset : Dataset ,
2931}
3032
/// Additional configuration when opening a [`ContextStore`].
#[derive(Debug, Clone, Default)]
pub struct ContextStoreOptions {
    // Key/value pairs forwarded to the underlying object store
    // (presumably S3-style credentials/region settings — confirm against
    // lance's ObjectStoreParams docs).
    pub storage_options: Option<HashMap<String, String>>,
}

impl ContextStoreOptions {
    /// Returns an owned copy of the configured storage options, if any.
    #[must_use]
    pub fn storage_options(&self) -> Option<HashMap<String, String>> {
        self.storage_options.as_ref().cloned()
    }
}
45+
3146impl ContextStore {
3247 /// Open an existing context dataset or create a new one with the project schema.
3348 pub async fn open ( uri : & str ) -> LanceResult < Self > {
34- match Dataset :: open ( uri) . await {
49+ Self :: open_with_options ( uri, ContextStoreOptions :: default ( ) ) . await
50+ }
51+
52+ /// Open a dataset with explicit object store configuration (e.g. S3 credentials).
53+ pub async fn open_with_options ( uri : & str , options : ContextStoreOptions ) -> LanceResult < Self > {
54+ let storage_options = options. storage_options ( ) ;
55+ match Self :: load_with_options ( uri, storage_options. clone ( ) ) . await {
3556 Ok ( dataset) => Ok ( Self { dataset } ) ,
3657 Err ( LanceError :: DatasetNotFound { .. } ) => {
37- let schema = Arc :: new ( Self :: schema ( ) ) ;
38- let empty_batch = RecordBatch :: new_empty ( schema. clone ( ) ) ;
39- let batches = RecordBatchIterator :: new (
40- vec ! [ Ok :: <RecordBatch , ArrowError >( empty_batch) ] . into_iter ( ) ,
41- schema. clone ( ) ,
42- ) ;
43- let params = WriteParams {
44- mode : WriteMode :: Create ,
45- ..Default :: default ( )
46- } ;
47- let dataset = Dataset :: write ( batches, uri, Some ( params) ) . await ?;
58+ let dataset = Self :: create_with_options ( uri, storage_options) . await ?;
4859 Ok ( Self { dataset } )
4960 }
5061 Err ( err) => Err ( err) ,
@@ -156,6 +167,47 @@ impl ContextStore {
156167 ] )
157168 }
158169
170+ async fn load_with_options (
171+ uri : & str ,
172+ storage_options : Option < HashMap < String , String > > ,
173+ ) -> LanceResult < Dataset > {
174+ if let Some ( options) = storage_options {
175+ DatasetBuilder :: from_uri ( uri)
176+ . with_storage_options ( options)
177+ . load ( )
178+ . await
179+ } else {
180+ Dataset :: open ( uri) . await
181+ }
182+ }
183+
184+ async fn create_with_options (
185+ uri : & str ,
186+ storage_options : Option < HashMap < String , String > > ,
187+ ) -> LanceResult < Dataset > {
188+ let schema = Arc :: new ( Self :: schema ( ) ) ;
189+ let empty_batch = RecordBatch :: new_empty ( schema. clone ( ) ) ;
190+ let batches = RecordBatchIterator :: new (
191+ vec ! [ Ok :: <RecordBatch , ArrowError >( empty_batch) ] . into_iter ( ) ,
192+ schema. clone ( ) ,
193+ ) ;
194+
195+ let mut params = WriteParams {
196+ mode : WriteMode :: Create ,
197+ ..Default :: default ( )
198+ } ;
199+
200+ if let Some ( options) = storage_options {
201+ let store_params = ObjectStoreParams {
202+ storage_options : Some ( options) ,
203+ ..Default :: default ( )
204+ } ;
205+ params. store_params = Some ( store_params) ;
206+ }
207+
208+ Dataset :: write ( batches, uri, Some ( params) ) . await
209+ }
210+
159211 fn records_to_batch ( entries : & [ ContextRecord ] ) -> LanceResult < RecordBatch > {
160212 let mut id_builder = StringBuilder :: new ( ) ;
161213 let mut run_id_builder = StringBuilder :: new ( ) ;
0 commit comments