Skip to content

Commit a1d6a84

Browse files
authored
Added initial object-store implementation. (#12)
1 parent 4d76d59 commit a1d6a84

File tree

14 files changed

+360
-14
lines changed

14 files changed

+360
-14
lines changed

pyproject.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@ classifiers = [
1414
"Topic :: System :: Networking",
1515
]
1616
dynamic = ["version"]
17+
dependencies = [
18+
"typing-extensions>=4.14.0",
19+
]
1720

1821
[[project.authors]]
1922
name = "Pavel Kirilin"

python/natsrpy/_natsrpy_rs/js/__init__.pyi

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from datetime import timedelta
22
from typing import Any
33

4-
from .managers import KVManager, StreamsManager
4+
from .managers import KVManager, ObjectStoreManager, StreamsManager
55

66
class JetStream:
77
async def publish(
@@ -17,6 +17,8 @@ class JetStream:
1717
def kv(self) -> KVManager: ...
1818
@property
1919
def streams(self) -> StreamsManager: ...
20+
@property
21+
def object_store(self) -> ObjectStoreManager: ...
2022

2123
class JetStreamMessage:
2224
@property

python/natsrpy/_natsrpy_rs/js/managers.pyi

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ from .consumers import (
77
PushConsumerConfig,
88
)
99
from .kv import KeyValue, KVConfig
10+
from .object_store import ObjectStore, ObjectStoreConfig
1011
from .stream import Stream, StreamConfig
1112

1213
class StreamsManager:
@@ -28,3 +29,8 @@ class ConsumersManager:
2829
async def create(self, config: PullConsumerConfig) -> PullConsumer: ...
2930
@overload
3031
async def create(self, config: PushConsumerConfig) -> PushConsumer: ...
32+
33+
class ObjectStoreManager:
34+
async def create(self, config: ObjectStoreConfig) -> ObjectStore: ...
35+
async def get(self, bucket: str) -> ObjectStore: ...
36+
async def delete(self, bucket: str) -> None: ...
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
from datetime import timedelta
2+
3+
from typing_extensions import Writer
4+
5+
from .stream import Placement, StorageType
6+
7+
class ObjectStoreConfig:
8+
bucket: str
9+
description: str | None
10+
max_age: timedelta
11+
max_bytes: int
12+
storage: StorageType
13+
num_replicas: int
14+
compression: bool
15+
placement: Placement | None
16+
17+
def __init__(
18+
self,
19+
bucket: str,
20+
description: str | None = None,
21+
max_age: float | timedelta | None = None,
22+
max_bytes: int | None = None,
23+
storage: StorageType | None = None,
24+
num_replicas: int | None = None,
25+
compression: bool | None = None,
26+
placement: Placement | None = None,
27+
) -> None: ...
28+
29+
class ObjectStore:
30+
async def get(
31+
self,
32+
name: str,
33+
writer: Writer[bytes],
34+
chunk_size: int | None = 24576, # 24MB
35+
) -> None: ...
36+
async def put(
37+
self,
38+
name: str,
39+
value: bytes | str,
40+
chunk_size: int = 24576, # 24MB
41+
description: str | None = None,
42+
headers: dict[str, str | list[str]] | None = None,
43+
metadata: dict[str, str] | None = None,
44+
) -> None: ...
45+
async def delete(self, name: str) -> None: ...

python/natsrpy/js/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
ReplayPolicy,
1111
)
1212
from natsrpy.js.kv import KeyValue, KVConfig
13+
from natsrpy.js.object_store import ObjectStore, ObjectStoreConfig
1314
from natsrpy.js.stream import (
1415
Compression,
1516
ConsumerLimits,
@@ -37,6 +38,8 @@
3738
"JetStream",
3839
"KVConfig",
3940
"KeyValue",
41+
"ObjectStore",
42+
"ObjectStoreConfig",
4043
"PersistenceMode",
4144
"Placement",
4245
"PriorityPolicy",

python/natsrpy/js/object_store.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
from natsrpy._natsrpy_rs.js.object_store import ObjectStore, ObjectStoreConfig
2+
3+
__all__ = [
4+
"ObjectStore",
5+
"ObjectStoreConfig",
6+
]

src/exceptions/rust_err.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ pub type NatsrpyResult<T> = Result<T, NatsrpyError>;
66

77
#[derive(thiserror::Error, Debug)]
88
pub enum NatsrpyError {
9+
#[error(transparent)]
10+
StdIOError(#[from] std::io::Error),
911
#[error("NATS session error: {0}")]
1012
SessionError(String),
1113
#[error("Invalid arguemnt: {0}")]
@@ -72,6 +74,14 @@ pub enum NatsrpyError {
7274
PushConsumerMessageError(#[from] async_nats::jetstream::consumer::push::MessagesError),
7375
#[error(transparent)]
7476
ConsumerStreamError(#[from] async_nats::jetstream::consumer::StreamError),
77+
#[error(transparent)]
78+
ObjectStoreError(#[from] async_nats::jetstream::context::ObjectStoreError),
79+
#[error(transparent)]
80+
ObjectStoreGetError(#[from] async_nats::jetstream::object_store::GetError),
81+
#[error(transparent)]
82+
ObjectStorePutError(#[from] async_nats::jetstream::object_store::PutError),
83+
#[error(transparent)]
84+
ObjectStoreDeleteError(#[from] async_nats::jetstream::object_store::DeleteError),
7585
}
7686

7787
impl From<NatsrpyError> for pyo3::PyErr {

src/js/counters.rs

Whitespace-only changes.

src/js/jetstream.rs

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use tokio::sync::RwLock;
66

77
use crate::{
88
exceptions::rust_err::{NatsrpyError, NatsrpyResult},
9-
js::managers::{kv::KVManager, streams::StreamsManager},
9+
js::managers::{kv::KVManager, object_store::ObjectStoreManager, streams::StreamsManager},
1010
utils::{headers::NatsrpyHeadermapExt, natsrpy_future, py_types::SendableValue},
1111
};
1212

@@ -26,6 +26,24 @@ impl JetStream {
2626

2727
#[pyo3::pymethods]
2828
impl JetStream {
29+
#[getter]
30+
#[must_use]
31+
pub fn kv(&self) -> KVManager {
32+
KVManager::new(self.ctx.clone())
33+
}
34+
35+
#[getter]
36+
#[must_use]
37+
pub fn streams(&self) -> StreamsManager {
38+
StreamsManager::new(self.ctx.clone())
39+
}
40+
41+
#[getter]
42+
#[must_use]
43+
pub fn object_store(&self) -> ObjectStoreManager {
44+
ObjectStoreManager::new(self.ctx.clone())
45+
}
46+
2947
#[pyo3(signature = (
3048
subject,
3149
payload,
@@ -66,16 +84,4 @@ impl JetStream {
6684
Ok(())
6785
})
6886
}
69-
70-
#[getter]
71-
#[must_use]
72-
pub fn kv(&self) -> KVManager {
73-
KVManager::new(self.ctx.clone())
74-
}
75-
76-
#[getter]
77-
#[must_use]
78-
pub fn streams(&self) -> StreamsManager {
79-
StreamsManager::new(self.ctx.clone())
80-
}
8187
}

src/js/managers/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
pub mod consumers;
22
pub mod kv;
3+
pub mod object_store;
34
pub mod streams;
45

56
#[pyo3::pymodule(submodule, name = "managers")]
@@ -9,5 +10,7 @@ pub mod pymod {
910
#[pymodule_export]
1011
use super::kv::KVManager;
1112
#[pymodule_export]
13+
use super::object_store::ObjectStoreManager;
14+
#[pymodule_export]
1215
use super::streams::StreamsManager;
1316
}

0 commit comments

Comments
 (0)