Skip to content

Commit 11e0532

Browse files
zhangyue19921010
and authored
feat: expose base scoped store bindings to python (#6547)
Tested: Python can write to different S3 buckets within the same multi-base dataset, using different storage configs for each base.configs. ``` from pathlib import Path import shutil import lance import pyarrow as pa from lance import DatasetBasePath def main(): path0 = "s3://ceshi/las/lance/multi-base-write2" path1 = "s3://zy-test-lance/zy-test-lance/multi-base-write2/" storage_options_0 = { "access_key_id": "", "secret_access_key": "==", "aws_endpoint": "", "aws_region": "", "virtual_hosted_style_request": "true", } storage_options_1 = { "access_key_id": "", "secret_access_key": "==", "aws_region": "", "aws_endpoint": "", "virtual_hosted_style_request": "true" } base_store_params = { path1: storage_options_1, } local_rows = pa.Table.from_pylist( [ {"name": "Alice", "age": 20}, {"name": "Jack", "age": 20}, ] ) s3_rows = pa.Table.from_pylist( [ {"name": "Bob", "age": 30}, ] ) ds = lance.write_dataset( local_rows, path0, mode="create", initial_bases=[ DatasetBasePath(path0, name="s3", is_dataset_root=True), DatasetBasePath(path1, name="s3"), ], target_bases=["s3"], base_store_params=base_store_params, storage_options=storage_options_0, ) ds = lance.write_dataset( s3_rows, ds, mode="append", target_bases=["s3"], base_store_params=base_store_params, storage_options=storage_options_0, ) if __name__ == "__main__": main() ``` --------- Co-authored-by: zhangyue19921010 <zhangyue.1010@bytedance.com>
1 parent d88b5cd commit 11e0532

3 files changed

Lines changed: 86 additions & 3 deletions

File tree

python/python/lance/__init__.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ def dataset(
101101
session: Optional[Session] = None,
102102
namespace_client: Optional[LanceNamespace] = None,
103103
table_id: Optional[List[str]] = None,
104+
base_store_params: Optional[Dict[str, Dict[str, str]]] = None,
104105
) -> LanceDataset:
105106
"""
106107
Opens the Lance dataset from the address specified.
@@ -169,6 +170,12 @@ def dataset(
169170
table_id : optional, List[str]
170171
The table identifier when using a namespace (e.g., ["my_table"]).
171172
Must be provided together with `namespace_client`. Cannot be used with `uri`.
173+
base_store_params : dict of str to dict, optional
174+
Runtime-only object store parameters keyed by base path URI. Each key
175+
is a base path URI (e.g., "s3://bucket/path") and each value is a dict
176+
of storage options (credentials, endpoint, etc.) for that base. When a base
177+
has no explicit entry here, the top-level ``storage_options`` is
178+
used as a fallback.
172179
173180
Notes
174181
-----
@@ -244,6 +251,7 @@ def dataset(
244251
namespace_client=namespace_client,
245252
table_id=table_id,
246253
namespace_client_managed_versioning=namespace_client_managed_versioning,
254+
base_store_params=base_store_params,
247255
)
248256
if version is None and asof is not None:
249257
ts_cutoff = sanitize_ts(asof)
@@ -270,6 +278,7 @@ def dataset(
270278
namespace_client=namespace_client,
271279
table_id=table_id,
272280
namespace_client_managed_versioning=namespace_client_managed_versioning,
281+
base_store_params=base_store_params,
273282
)
274283
else:
275284
return ds

python/python/lance/dataset.py

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -583,10 +583,12 @@ def __init__(
583583
namespace_client: Optional[Any] = None,
584584
table_id: Optional[List[str]] = None,
585585
namespace_client_managed_versioning: bool = False,
586+
base_store_params: Optional[Dict[str, Dict[str, str]]] = None,
586587
):
587588
uri = os.fspath(uri) if isinstance(uri, Path) else uri
588589
self._uri = uri
589590
self._storage_options = storage_options
591+
self._base_store_params = base_store_params
590592

591593
# Handle deprecation warning for index_cache_size
592594
if index_cache_size is not None:
@@ -621,6 +623,7 @@ def __init__(
621623
namespace_client=namespace_client,
622624
table_id=table_id,
623625
namespace_client_managed_versioning=namespace_client_managed_versioning,
626+
base_store_params=base_store_params,
624627
)
625628
self._default_scan_options = default_scan_options
626629
self._read_params = read_params
@@ -634,6 +637,7 @@ def __deserialize__(
634637
manifest: bytes,
635638
default_scan_options: Optional[Dict[str, Any]],
636639
read_params: Optional[Dict[str, Any]] = None,
640+
base_store_params: Optional[Dict[str, Dict[str, str]]] = None,
637641
):
638642
return cls(
639643
uri,
@@ -642,6 +646,7 @@ def __deserialize__(
642646
serialized_manifest=manifest,
643647
default_scan_options=default_scan_options,
644648
read_params=read_params,
649+
base_store_params=base_store_params,
645650
)
646651

647652
def __reduce__(self):
@@ -652,6 +657,7 @@ def __reduce__(self):
652657
self._ds.serialized_manifest(),
653658
self._default_scan_options,
654659
self._read_params,
660+
self._base_store_params,
655661
)
656662

657663
def __getstate__(self):
@@ -662,29 +668,34 @@ def __getstate__(self):
662668
self._ds.serialized_manifest(),
663669
self._default_scan_options,
664670
self._read_params,
671+
self._base_store_params,
665672
)
666673

667674
def __setstate__(self, state):
668-
# Handle backwards compatibility - state may not have read_params
675+
# Handle backwards compatibility - state may not have read_params or
676+
# base_store_params.
669677
(
670678
self._uri,
671679
self._storage_options,
672680
version,
673681
manifest,
674682
default_scan_options,
675-
*rest, # Capture optional read_params
683+
*rest, # Capture optional read_params and base_store_params.
676684
) = state
677685
read_params = rest[0] if rest else None
686+
base_store_params = rest[1] if len(rest) > 1 else None
678687
self._ds = _Dataset(
679688
self._uri,
680689
version,
681690
storage_options=self._storage_options,
682691
manifest=manifest,
683692
default_scan_options=default_scan_options,
684693
read_params=read_params,
694+
base_store_params=base_store_params,
685695
)
686696
self._default_scan_options = default_scan_options
687697
self._read_params = read_params
698+
self._base_store_params = base_store_params
688699
self._namespace_client = None
689700
self._table_id = None
690701
self._namespace_client_managed_versioning = False
@@ -693,6 +704,7 @@ def __copy__(self):
693704
ds = LanceDataset.__new__(LanceDataset)
694705
ds._uri = self._uri
695706
ds._storage_options = self._storage_options
707+
ds._base_store_params = self._base_store_params
696708
ds._namespace_client = self._namespace_client
697709
ds._table_id = self._table_id
698710
ds._namespace_client_managed_versioning = (
@@ -792,6 +804,7 @@ def create_branch(
792804
ds._ds = new_ds
793805
ds._uri = new_ds.uri
794806
ds._storage_options = self._storage_options
807+
ds._base_store_params = self._base_store_params
795808
ds._namespace_client = self._namespace_client
796809
ds._table_id = self._table_id
797810
ds._default_scan_options = self._default_scan_options
@@ -3902,6 +3915,15 @@ def session(self) -> Session:
39023915
"""
39033916
return self._ds.session()
39043917

3918+
@staticmethod
3919+
def _inherit_base_store_params(
3920+
dataset_or_uri: Union[str, Path, LanceDataset, None],
3921+
base_store_params: Optional[Dict[str, Dict[str, str]]],
3922+
) -> Optional[Dict[str, Dict[str, str]]]:
3923+
if base_store_params is None and isinstance(dataset_or_uri, LanceDataset):
3924+
return dataset_or_uri._base_store_params
3925+
return base_store_params
3926+
39053927
@staticmethod
39063928
def _commit(
39073929
base_uri: Union[str, Path],
@@ -3931,6 +3953,7 @@ def commit(
39313953
namespace_client: Optional["LanceNamespace"] = None,
39323954
table_id: Optional[List[str]] = None,
39333955
namespace_client_managed_versioning: bool = False,
3956+
base_store_params: Optional[Dict[str, Dict[str, str]]] = None,
39343957
) -> LanceDataset:
39353958
"""Create a new version of dataset
39363959
@@ -4001,6 +4024,8 @@ def commit(
40014024
table_id : List[str], optional
40024025
The table identifier within the namespace (e.g., ["workspace", "table"]).
40034026
Must be provided together with namespace_client.
4027+
base_store_params : dict of str to dict, optional
4028+
Runtime-only object store parameters keyed by base path URI.
40044029
40054030
Returns
40064031
-------
@@ -4028,6 +4053,10 @@ def commit(
40284053
2 3 c
40294054
3 4 d
40304055
"""
4056+
base_store_params = LanceDataset._inherit_base_store_params(
4057+
base_uri, base_store_params
4058+
)
4059+
40314060
if isinstance(base_uri, Path):
40324061
base_uri = str(base_uri)
40334062
elif isinstance(base_uri, LanceDataset):
@@ -4101,6 +4130,7 @@ def commit(
41014130

41024131
ds = LanceDataset.__new__(LanceDataset)
41034132
ds._storage_options = storage_options
4133+
ds._base_store_params = base_store_params
41044134
ds._namespace_client = namespace_client
41054135
ds._table_id = table_id
41064136
ds._namespace_client_managed_versioning = namespace_client_managed_versioning
@@ -4119,6 +4149,7 @@ def commit_batch(
41194149
enable_v2_manifest_paths: Optional[bool] = None,
41204150
detached: Optional[bool] = False,
41214151
max_retries: int = 20,
4152+
base_store_params: Optional[Dict[str, Dict[str, str]]] = None,
41224153
) -> BulkCommitResult:
41234154
"""Create a new version of dataset with multiple transactions.
41244155
@@ -4161,6 +4192,8 @@ def commit_batch(
41614192
the future.
41624193
max_retries : int
41634194
The maximum number of retries to perform when committing the dataset.
4195+
base_store_params : dict of str to dict, optional
4196+
Runtime-only object store parameters keyed by base path URI.
41644197
41654198
Returns
41664199
-------
@@ -4170,6 +4203,10 @@ def commit_batch(
41704203
merged: Transaction
41714204
The merged transaction that was applied to the dataset.
41724205
"""
4206+
base_store_params = LanceDataset._inherit_base_store_params(
4207+
dest, base_store_params
4208+
)
4209+
41734210
if isinstance(dest, Path):
41744211
dest = str(dest)
41754212
elif isinstance(dest, LanceDataset):
@@ -4198,6 +4235,7 @@ def commit_batch(
41984235
ds._ds = new_ds
41994236
ds._uri = new_ds.uri
42004237
ds._storage_options = storage_options
4238+
ds._base_store_params = base_store_params
42014239
ds._namespace_client = None
42024240
ds._table_id = None
42034241
ds._default_scan_options = None
@@ -6361,6 +6399,7 @@ def write_dataset(
63616399
transaction_properties: Optional[Dict[str, str]] = None,
63626400
initial_bases: Optional[List[DatasetBasePath]] = None,
63636401
target_bases: Optional[List[str]] = None,
6402+
base_store_params: Optional[Dict[str, Dict[str, str]]] = None,
63646403
external_blob_mode: Literal["reference", "ingest"] = "reference",
63656404
allow_external_blob_outside_bases: bool = False,
63666405
blob_pack_file_size_threshold: Optional[int] = None,
@@ -6458,6 +6497,12 @@ def write_dataset(
64586497
64596498
**CREATE mode**: References must match bases in `initial_bases`
64606499
**APPEND/OVERWRITE modes**: References must match bases in the existing manifest
6500+
base_store_params : dict of str to dict, optional
6501+
Runtime-only object store parameters keyed by base path URI. Each key
6502+
is a base path URI (e.g., "s3://bucket/path") and each value is a dict
6503+
of storage options (credentials, endpoint, etc.) for that base. These
6504+
are not persisted to the manifest. When a base has no explicit entry
6505+
here, the top-level ``storage_options`` is used as a fallback.
64616506
external_blob_mode: {"reference", "ingest"}, default "reference"
64626507
How external blob URIs are handled on write.
64636508
@@ -6578,6 +6623,7 @@ def write_dataset(
65786623
reader = _coerce_reader(data_obj, schema)
65796624
_validate_schema(reader.schema)
65806625
# TODO add support for passing in LanceDataset and LanceScanner here
6626+
base_store_params = LanceDataset._inherit_base_store_params(uri, base_store_params)
65816627

65826628
# Merge properties and commit_message with priority to commit_message
65836629
merged_properties = _merge_message_to_properties(
@@ -6598,6 +6644,7 @@ def write_dataset(
65986644
"transaction_properties": merged_properties,
65996645
"initial_bases": initial_bases,
66006646
"target_bases": target_bases,
6647+
"base_store_params": base_store_params,
66016648
"external_blob_mode": external_blob_mode,
66026649
"allow_external_blob_outside_bases": allow_external_blob_outside_bases,
66036650
"blob_pack_file_size_threshold": blob_pack_file_size_threshold,
@@ -6628,6 +6675,7 @@ def write_dataset(
66286675

66296676
ds = LanceDataset.__new__(LanceDataset)
66306677
ds._storage_options = storage_options
6678+
ds._base_store_params = base_store_params
66316679
ds._namespace_client = namespace_client
66326680
ds._table_id = table_id
66336681
ds._namespace_client_managed_versioning = namespace_client_managed_versioning

python/src/dataset.rs

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -513,7 +513,7 @@ impl Dataset {
513513
#[allow(clippy::too_many_arguments)]
514514
#[allow(deprecated)]
515515
#[new]
516-
#[pyo3(signature=(uri, version=None, block_size=None, index_cache_size=None, metadata_cache_size=None, commit_handler=None, storage_options=None, manifest=None, metadata_cache_size_bytes=None, index_cache_size_bytes=None, read_params=None, session=None, namespace_client=None, table_id=None, namespace_client_managed_versioning=false))]
516+
#[pyo3(signature=(uri, version=None, block_size=None, index_cache_size=None, metadata_cache_size=None, commit_handler=None, storage_options=None, manifest=None, metadata_cache_size_bytes=None, index_cache_size_bytes=None, read_params=None, session=None, namespace_client=None, table_id=None, namespace_client_managed_versioning=false, base_store_params=None))]
517517
fn new(
518518
py: Python,
519519
uri: String,
@@ -531,6 +531,7 @@ impl Dataset {
531531
namespace_client: Option<&Bound<'_, PyAny>>,
532532
table_id: Option<Vec<String>>,
533533
namespace_client_managed_versioning: bool,
534+
base_store_params: Option<HashMap<String, HashMap<String, String>>>,
534535
) -> PyResult<Self> {
535536
let mut params = ReadParams::default();
536537
if let Some(metadata_cache_size_bytes) = metadata_cache_size_bytes {
@@ -653,6 +654,17 @@ impl Dataset {
653654
}
654655
}
655656

657+
if let Some(base_store_params) = base_store_params {
658+
for (base_path, opts) in base_store_params {
659+
let accessor = Arc::new(StorageOptionsAccessor::with_static_options(opts));
660+
let store_params = ObjectStoreParams {
661+
storage_options_accessor: Some(accessor),
662+
..Default::default()
663+
};
664+
builder = builder.with_base_store_params(base_path, store_params);
665+
}
666+
}
667+
656668
let dataset = rt().block_on(Some(py), builder.load())?;
657669

658670
match dataset {
@@ -3620,6 +3632,20 @@ pub fn get_write_params(options: &Bound<'_, PyDict>) -> PyResult<Option<WritePar
36203632
p = p.with_target_base_names_or_paths(target_bases_list);
36213633
}
36223634

3635+
// Handle base_store_params: per-base storage options keyed by base path URI
3636+
if let Some(base_store_params) =
3637+
get_dict_opt::<HashMap<String, HashMap<String, String>>>(options, "base_store_params")?
3638+
{
3639+
for (base_path, opts) in base_store_params {
3640+
let accessor = Arc::new(StorageOptionsAccessor::with_static_options(opts));
3641+
let store_params = ObjectStoreParams {
3642+
storage_options_accessor: Some(accessor),
3643+
..Default::default()
3644+
};
3645+
p = p.with_base_store_params(base_path, store_params);
3646+
}
3647+
}
3648+
36233649
if let Some(allow_external) =
36243650
get_dict_opt::<bool>(options, "allow_external_blob_outside_bases")?
36253651
{

0 commit comments

Comments
 (0)