Skip to content

Commit f4aebf1

Browse files
authored
Full implementation of object_store. (#20)
1 parent b435dc3 commit f4aebf1

File tree

6 files changed

+390
-9
lines changed

6 files changed

+390
-9
lines changed

python/natsrpy/_natsrpy_rs/js/consumers.pyi

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ class PushConsumerConfig:
169169

170170
@final
171171
class MessagesIterator:
172-
def __aiter__(self) -> MessagesIterator: ...
172+
def __aiter__(self) -> Self: ...
173173
async def __anext__(self) -> JetStreamMessage: ...
174174
async def next(
175175
self,

python/natsrpy/_natsrpy_rs/js/object_store.pyi

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
1-
from datetime import timedelta
2-
from typing import final
1+
from datetime import datetime, timedelta
2+
from typing import Any, final
33

44
from typing_extensions import Self, Writer
55

66
from .stream import Placement, StorageType
77

88
__all__ = [
9+
"ObjectInfo",
10+
"ObjectInfoIterator",
11+
"ObjectLink",
912
"ObjectStore",
1013
"ObjectStoreConfig",
1114
]
@@ -33,6 +36,33 @@ class ObjectStoreConfig:
3336
placement: Placement | None = None,
3437
) -> Self: ...
3538

39+
@final
40+
class ObjectLink:
41+
name: str | None
42+
bucket: str
43+
44+
@final
45+
class ObjectInfo:
46+
name: str
47+
description: str | None
48+
metadata: dict[str, str]
49+
headers: dict[str, Any]
50+
bucket: str
51+
nuid: str
52+
size: int
53+
chunks: int
54+
modified: datetime | None
55+
digest: str | None
56+
deleted: bool
57+
link: ObjectLink | None
58+
max_chunk_size: int | None
59+
60+
@final
61+
class ObjectInfoIterator:
62+
def __aiter__(self) -> Self: ...
63+
async def __anext__(self) -> ObjectInfo: ...
64+
async def next(self, timeout: float | timedelta | None = None) -> ObjectInfo: ...
65+
3666
@final
3767
class ObjectStore:
3868
async def get(
@@ -51,3 +81,17 @@ class ObjectStore:
5181
metadata: dict[str, str] | None = None,
5282
) -> None: ...
5383
async def delete(self, name: str) -> None: ...
84+
async def seal(self) -> None: ...
85+
async def get_info(self, name: str) -> ObjectInfo: ...
86+
async def watch(self, with_history: bool = False) -> ObjectInfoIterator: ...
87+
async def list(self) -> ObjectInfoIterator: ...
88+
async def link_bucket(self, src_bucket: str, dest: str) -> ObjectInfo: ...
89+
async def link_object(self, src: str, dest: str) -> ObjectInfo: ...
90+
async def update_metadata(
91+
self,
92+
name: str,
93+
new_name: str | None = None,
94+
description: str | None = None,
95+
headers: dict[str, Any] | None = None,
96+
metadata: dict[str, str] | None = None,
97+
) -> ObjectInfo: ...

src/exceptions/rust_err.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,20 @@ pub enum NatsrpyError {
8888
ObjectStorePutError(#[from] async_nats::jetstream::object_store::PutError),
8989
#[error(transparent)]
9090
ObjectStoreDeleteError(#[from] async_nats::jetstream::object_store::DeleteError),
91+
#[error(transparent)]
92+
ObjectStoreSealError(#[from] async_nats::jetstream::object_store::SealError),
93+
#[error(transparent)]
94+
ObjectStoreInfoError(#[from] async_nats::jetstream::object_store::InfoError),
95+
#[error(transparent)]
96+
ObjectStoreWatchError(#[from] async_nats::jetstream::object_store::WatchError),
97+
#[error(transparent)]
98+
ObjectStoreWatcherError(#[from] async_nats::jetstream::object_store::WatcherError),
99+
#[error(transparent)]
100+
ObjectStoreAddLinkError(#[from] async_nats::jetstream::object_store::AddLinkError),
101+
#[error(transparent)]
102+
ObjectStoreUpdateMetadataError(
103+
#[from] async_nats::jetstream::object_store::UpdateMetadataError,
104+
),
91105
}
92106

93107
impl From<NatsrpyError> for pyo3::PyErr {

src/js/object_store.rs

Lines changed: 264 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,25 @@
1+
use futures_util::StreamExt;
12
use std::{collections::HashMap, sync::Arc, time::Duration};
23

34
use async_nats::HeaderMap;
4-
use pyo3::{Bound, Py, PyAny, Python, types::PyDict};
5-
use tokio::{io::AsyncReadExt, sync::RwLock};
5+
use pyo3::{
6+
Bound, Py, PyAny, PyRef, Python,
7+
types::{IntoPyDict, PyDateTime, PyDict},
8+
};
9+
use tokio::{
10+
io::AsyncReadExt,
11+
sync::{Mutex, RwLock},
12+
};
613

714
use crate::{
8-
exceptions::rust_err::NatsrpyResult,
15+
exceptions::rust_err::{NatsrpyError, NatsrpyResult},
916
js::stream::{Placement, StorageType},
1017
utils::{
18+
futures::natsrpy_future_with_timeout,
1119
headers::NatsrpyHeadermapExt,
1220
natsrpy_future,
13-
py_types::{SendableValue, TimeValue},
21+
py_types::{SendableValue, TimeValue, ToPyDate},
22+
streamer::Streamer,
1423
},
1524
};
1625

@@ -27,6 +36,95 @@ pub struct ObjectStoreConfig {
2736
pub placement: Option<Placement>,
2837
}
2938

39+
#[pyo3::pyclass(from_py_object, get_all)]
40+
#[derive(Debug, Clone)]
41+
pub struct ObjectLink {
42+
pub name: Option<String>,
43+
pub bucket: String,
44+
}
45+
46+
#[pyo3::pyclass(get_all)]
47+
#[derive(Debug)]
48+
pub struct ObjectInfo {
49+
pub name: String,
50+
pub description: Option<String>,
51+
pub metadata: Py<PyDict>,
52+
pub headers: Py<PyDict>,
53+
pub bucket: String,
54+
pub nuid: String,
55+
pub size: usize,
56+
pub chunks: usize,
57+
pub modified: Option<Py<PyDateTime>>,
58+
pub digest: Option<String>,
59+
pub deleted: bool,
60+
61+
pub link: Option<ObjectLink>,
62+
pub max_chunk_size: Option<usize>,
63+
}
64+
65+
impl From<async_nats::jetstream::object_store::ObjectLink> for ObjectLink {
66+
fn from(value: async_nats::jetstream::object_store::ObjectLink) -> Self {
67+
Self {
68+
name: value.name,
69+
bucket: value.bucket,
70+
}
71+
}
72+
}
73+
74+
impl TryFrom<async_nats::jetstream::object_store::ObjectInfo> for ObjectInfo {
75+
type Error = NatsrpyError;
76+
77+
fn try_from(
78+
value: async_nats::jetstream::object_store::ObjectInfo,
79+
) -> Result<Self, Self::Error> {
80+
Ok(Self {
81+
name: value.name,
82+
description: value.description,
83+
metadata: Python::attach(|gil| {
84+
value.metadata.into_py_dict(gil).map(pyo3::Bound::unbind)
85+
})?,
86+
headers: value
87+
.headers
88+
// To PyResul<Bound<'_, PyDict>> and then to PyResul<Py<PyDict>>
89+
.map(|val| Python::attach(|gil| val.to_pydict(gil).map(pyo3::Bound::unbind)))
90+
.transpose()?
91+
.unwrap_or_else(|| Python::attach(|gil| PyDict::new(gil).unbind())),
92+
bucket: value.bucket,
93+
nuid: value.nuid,
94+
size: value.size,
95+
chunks: value.chunks,
96+
modified: value
97+
.modified
98+
// To PyResul<Bound<'_, PyDateTime>> and then to PyResul<Py<PyDateTime>>
99+
.map(|dt| Python::attach(|gil| dt.to_py_date(gil).map(pyo3::Bound::unbind)))
100+
.transpose()?,
101+
digest: value.digest,
102+
deleted: value.deleted,
103+
link: value
104+
.options
105+
.as_ref()
106+
.and_then(|opts| opts.link.as_ref().map(|link| link.clone().into())),
107+
max_chunk_size: value.options.and_then(|opts| opts.max_chunk_size),
108+
})
109+
}
110+
}
111+
112+
#[pyo3::pymethods]
113+
impl ObjectInfo {
114+
pub fn __str__(&self) -> String {
115+
format!(
116+
"ObjectInfo<name={:?}, bucket={:?}, size={}, modified={:?}, description={:?}>",
117+
self.name,
118+
self.bucket,
119+
self.size,
120+
self.modified
121+
.as_ref()
122+
.map_or_else(|| String::from("None"), ToString::to_string),
123+
self.description,
124+
)
125+
}
126+
}
127+
30128
impl From<ObjectStoreConfig> for async_nats::jetstream::object_store::Config {
31129
fn from(value: ObjectStoreConfig) -> Self {
32130
Self {
@@ -188,10 +286,171 @@ impl ObjectStore {
188286
Ok(())
189287
})
190288
}
289+
290+
#[pyo3(signature=(
291+
name,
292+
new_name=None,
293+
description=None,
294+
headers=None,
295+
metadata=None,
296+
))]
297+
pub fn update_metadata<'py>(
298+
&self,
299+
py: Python<'py>,
300+
name: String,
301+
new_name: Option<String>,
302+
description: Option<String>,
303+
headers: Option<Bound<'py, PyDict>>,
304+
metadata: Option<HashMap<String, String>>,
305+
) -> NatsrpyResult<Bound<'py, PyAny>> {
306+
let ctx_guard = self.object_store.clone();
307+
let headers = headers.map(|val| HeaderMap::from_pydict(val)).transpose()?;
308+
let meta = async_nats::jetstream::object_store::UpdateMetadata {
309+
name: new_name.unwrap_or_else(|| name.clone()),
310+
description,
311+
metadata: metadata.unwrap_or_default(),
312+
headers,
313+
};
314+
natsrpy_future(py, async move {
315+
ObjectInfo::try_from(ctx_guard.read().await.update_metadata(name, meta).await?)
316+
})
317+
}
318+
319+
pub fn seal<'py>(&self, py: Python<'py>) -> NatsrpyResult<Bound<'py, PyAny>> {
320+
let ctx_guard = self.object_store.clone();
321+
natsrpy_future(py, async move {
322+
ctx_guard.write().await.seal().await?;
323+
Ok(())
324+
})
325+
}
326+
327+
pub fn get_info<'py>(&self, py: Python<'py>, name: String) -> NatsrpyResult<Bound<'py, PyAny>> {
328+
let ctx_guard = self.object_store.clone();
329+
natsrpy_future(py, async move {
330+
let info = ctx_guard.write().await.info(name).await?;
331+
ObjectInfo::try_from(info)
332+
})
333+
}
334+
335+
#[pyo3(signature=(with_history=false))]
336+
pub fn watch<'py>(
337+
&self,
338+
py: Python<'py>,
339+
with_history: bool,
340+
) -> NatsrpyResult<Bound<'py, PyAny>> {
341+
let ctx_guard = self.object_store.clone();
342+
natsrpy_future(py, async move {
343+
let watcher = if with_history {
344+
ctx_guard.read().await.watch_with_history().await?
345+
} else {
346+
ctx_guard.read().await.watch().await?
347+
};
348+
Ok(ObjectInfoIterator::new(Streamer::new(watcher)))
349+
})
350+
}
351+
352+
pub fn list<'py>(&self, py: Python<'py>) -> NatsrpyResult<Bound<'py, PyAny>> {
353+
let ctx_guard = self.object_store.clone();
354+
natsrpy_future(py, async move {
355+
Ok(ObjectInfoIterator::new(Streamer::new(
356+
ctx_guard.read().await.list().await?,
357+
)))
358+
})
359+
}
360+
361+
pub fn link_bucket<'py>(
362+
&self,
363+
py: Python<'py>,
364+
src_bucket: String,
365+
dest: String,
366+
) -> NatsrpyResult<Bound<'py, PyAny>> {
367+
let ctx_guard = self.object_store.clone();
368+
natsrpy_future(py, async move {
369+
ObjectInfo::try_from(
370+
ctx_guard
371+
.read()
372+
.await
373+
.add_bucket_link(dest, src_bucket)
374+
.await?,
375+
)
376+
})
377+
}
378+
379+
pub fn link_object<'py>(
380+
&self,
381+
py: Python<'py>,
382+
src: String,
383+
dest: String,
384+
) -> NatsrpyResult<Bound<'py, PyAny>> {
385+
let ctx_guard = self.object_store.clone();
386+
natsrpy_future(py, async move {
387+
let target = ctx_guard.read().await.get(src).await?;
388+
ObjectInfo::try_from(ctx_guard.read().await.add_link(dest, &target.info).await?)
389+
})
390+
}
391+
}
392+
393+
#[pyo3::pyclass(from_py_object)]
394+
#[derive(Clone, Debug)]
395+
pub struct ObjectInfoIterator {
396+
streamer: Arc<
397+
Mutex<
398+
Streamer<
399+
Result<
400+
async_nats::jetstream::object_store::ObjectInfo,
401+
async_nats::jetstream::object_store::WatcherError,
402+
>,
403+
>,
404+
>,
405+
>,
406+
}
407+
408+
impl ObjectInfoIterator {
409+
#[must_use]
410+
pub fn new(
411+
streamer: Streamer<
412+
Result<
413+
async_nats::jetstream::object_store::ObjectInfo,
414+
async_nats::jetstream::object_store::WatcherError,
415+
>,
416+
>,
417+
) -> Self {
418+
Self {
419+
streamer: Arc::new(Mutex::new(streamer)),
420+
}
421+
}
422+
}
423+
424+
#[pyo3::pymethods]
425+
impl ObjectInfoIterator {
426+
#[must_use]
427+
pub const fn __aiter__(slf: PyRef<Self>) -> PyRef<Self> {
428+
slf
429+
}
430+
431+
#[pyo3(signature=(timeout=None))]
432+
pub fn next<'py>(
433+
&self,
434+
py: Python<'py>,
435+
timeout: Option<TimeValue>,
436+
) -> NatsrpyResult<Bound<'py, PyAny>> {
437+
let ctx = self.streamer.clone();
438+
natsrpy_future_with_timeout(py, timeout, async move {
439+
let value = ctx.lock().await.next().await;
440+
match value {
441+
Some(info) => ObjectInfo::try_from(info?),
442+
None => Err(NatsrpyError::AsyncStopIteration),
443+
}
444+
})
445+
}
446+
447+
pub fn __anext__<'py>(&self, py: Python<'py>) -> NatsrpyResult<Bound<'py, PyAny>> {
448+
self.next(py, None)
449+
}
191450
}
192451

193452
#[pyo3::pymodule(submodule, name = "object_store")]
194453
pub mod pymod {
195454
#[pymodule_export]
196-
pub use super::{ObjectStore, ObjectStoreConfig};
455+
pub use super::{ObjectInfo, ObjectInfoIterator, ObjectLink, ObjectStore, ObjectStoreConfig};
197456
}

0 commit comments

Comments
 (0)