Skip to content

Commit 9331e01

Browse files
authored
feat(trogon-nats): extract release logic from CRON (#115)
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
1 parent 6ccd649 commit 9331e01

File tree

20 files changed

+2661
-16
lines changed

20 files changed

+2661
-16
lines changed

devops/docker/compose/compose.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ name: trogonai
22

33
services:
44
nats:
5-
image: nats:alpine
5+
image: nats:2.11-alpine
66
command:
77
- "--jetstream"
88
- "--store_dir=/data"

rsworkspace/crates/trogon-nats/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ edition = "2024"
77
workspace = true
88

99
[dependencies]
10-
async-nats = { workspace = true, features = ["ring", "nkeys", "jetstream", "object-store"] }
10+
async-nats = { workspace = true, features = ["ring", "nkeys", "jetstream", "kv", "object-store", "server_2_11"] }
1111
bytes = { workspace = true }
1212
futures = { workspace = true }
1313
opentelemetry = { workspace = true }

rsworkspace/crates/trogon-nats/src/jetstream/client.rs

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@ use bytes::Bytes;
88

99
use super::message::{JsAck, JsAckWith, JsDoubleAck, JsDoubleAckWith, JsMessageRef};
1010
use super::traits::{
11-
JetStreamConsumer, JetStreamContext, JetStreamCreateConsumer, JetStreamGetStream,
12-
JetStreamPublisher,
11+
JetStreamConsumer, JetStreamContext, JetStreamCreateConsumer, JetStreamCreateKeyValue,
12+
JetStreamGetKeyValue, JetStreamGetStream, JetStreamPublisher,
1313
};
1414

1515
#[derive(Clone)]
@@ -58,6 +58,28 @@ impl JetStreamPublisher for NatsJetStreamClient {
5858
}
5959
}
6060

61+
impl JetStreamCreateKeyValue for NatsJetStreamClient {
62+
type Store = jetstream::kv::Store;
63+
64+
async fn create_key_value(
65+
&self,
66+
config: jetstream::kv::Config,
67+
) -> Result<Self::Store, async_nats::jetstream::context::CreateKeyValueError> {
68+
self.context.create_key_value(config).await
69+
}
70+
}
71+
72+
impl JetStreamGetKeyValue for NatsJetStreamClient {
73+
type Store = jetstream::kv::Store;
74+
75+
async fn get_key_value<T: Into<String> + Send>(
76+
&self,
77+
bucket: T,
78+
) -> Result<Self::Store, async_nats::jetstream::context::KeyValueError> {
79+
self.context.get_key_value(bucket).await
80+
}
81+
}
82+
6183
impl JsMessageRef for jetstream::Message {
6284
fn message(&self) -> &async_nats::Message {
6385
&self.message

rsworkspace/crates/trogon-nats/src/jetstream/mocks.rs

Lines changed: 299 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
use std::collections::VecDeque;
22
use std::sync::{Arc, Mutex};
3+
use std::time::Duration;
34

45
use async_nats::HeaderMap;
5-
use async_nats::jetstream::AckKind;
66
use async_nats::jetstream::consumer::pull;
7+
use async_nats::jetstream::context::{self, CreateKeyValueError, CreateKeyValueErrorKind};
8+
use async_nats::jetstream::kv;
79
use async_nats::jetstream::publish::PublishAck;
810
use async_nats::jetstream::stream;
11+
use async_nats::jetstream::{self, AckKind};
912
use async_nats::subject::ToSubject;
1013
use bytes::Bytes;
1114
use futures::channel::mpsc;
@@ -14,7 +17,9 @@ use futures::stream::BoxStream;
1417
use super::message::{JsAck, JsAckWith, JsDoubleAck, JsDoubleAckWith, JsMessageRef};
1518
use super::object_store::{ObjectStoreGet, ObjectStorePut};
1619
use super::traits::{
17-
JetStreamConsumer, JetStreamContext, JetStreamCreateConsumer, JetStreamGetStream,
20+
JetStreamConsumer, JetStreamContext, JetStreamCreateConsumer, JetStreamCreateKeyValue,
21+
JetStreamGetKeyValue, JetStreamGetStream, JetStreamKeyValueCreateWithTtl,
22+
JetStreamKeyValueDeleteExpectRevision, JetStreamKeyValueStatus, JetStreamKeyValueUpdate,
1823
JetStreamPublisher,
1924
};
2025
use crate::mocks::MockError;
@@ -300,6 +305,298 @@ impl JetStreamPublisher for MockJetStreamPublisher {
300305
}
301306
}
302307

308+
#[derive(Clone, Debug)]
309+
pub struct MockJetStreamKvStore {
310+
settings: Arc<Mutex<Result<MockKeyValueSettings, kv::StatusErrorKind>>>,
311+
create_with_ttl_result: Arc<Mutex<Result<u64, kv::CreateErrorKind>>>,
312+
update_result: Arc<Mutex<Result<u64, kv::UpdateErrorKind>>>,
313+
delete_result: Arc<Mutex<Result<(), kv::DeleteErrorKind>>>,
314+
create_with_ttl_calls: Arc<Mutex<CreateWithTtlCalls>>,
315+
update_calls: Arc<Mutex<UpdateCalls>>,
316+
delete_calls: Arc<Mutex<DeleteCalls>>,
317+
}
318+
319+
#[derive(Clone, Copy, Debug)]
320+
struct MockKeyValueSettings {
321+
history: i64,
322+
max_age: Duration,
323+
allow_message_ttl: bool,
324+
subject_delete_marker_ttl: Option<Duration>,
325+
}
326+
327+
type CreateWithTtlCalls = Vec<(String, Bytes, Duration)>;
328+
type UpdateCalls = Vec<(String, Bytes, u64)>;
329+
type DeleteCalls = Vec<(String, Option<u64>)>;
330+
331+
impl MockJetStreamKvStore {
332+
pub fn new() -> Self {
333+
Self {
334+
settings: Arc::new(Mutex::new(Ok(MockKeyValueSettings {
335+
history: 1,
336+
max_age: Duration::ZERO,
337+
allow_message_ttl: true,
338+
subject_delete_marker_ttl: None,
339+
}))),
340+
create_with_ttl_result: Arc::new(Mutex::new(Ok(1))),
341+
update_result: Arc::new(Mutex::new(Ok(1))),
342+
delete_result: Arc::new(Mutex::new(Ok(()))),
343+
create_with_ttl_calls: Arc::new(Mutex::new(Vec::new())),
344+
update_calls: Arc::new(Mutex::new(Vec::new())),
345+
delete_calls: Arc::new(Mutex::new(Vec::new())),
346+
}
347+
}
348+
349+
pub fn set_settings(
350+
&self,
351+
history: i64,
352+
max_age: Duration,
353+
allow_message_ttl: bool,
354+
subject_delete_marker_ttl: Option<Duration>,
355+
) {
356+
*self.settings.lock().unwrap() = Ok(MockKeyValueSettings {
357+
history,
358+
max_age,
359+
allow_message_ttl,
360+
subject_delete_marker_ttl,
361+
});
362+
}
363+
364+
pub fn fail_status(&self, kind: kv::StatusErrorKind) {
365+
*self.settings.lock().unwrap() = Err(kind);
366+
}
367+
368+
pub fn set_create_with_ttl_result(&self, result: Result<u64, kv::CreateErrorKind>) {
369+
*self.create_with_ttl_result.lock().unwrap() = result;
370+
}
371+
372+
pub fn set_update_result(&self, result: Result<u64, kv::UpdateErrorKind>) {
373+
*self.update_result.lock().unwrap() = result;
374+
}
375+
376+
pub fn set_delete_result(&self, result: Result<(), kv::DeleteErrorKind>) {
377+
*self.delete_result.lock().unwrap() = result;
378+
}
379+
380+
pub fn create_with_ttl_calls(&self) -> Vec<(String, Bytes, Duration)> {
381+
self.create_with_ttl_calls.lock().unwrap().clone()
382+
}
383+
384+
pub fn update_calls(&self) -> Vec<(String, Bytes, u64)> {
385+
self.update_calls.lock().unwrap().clone()
386+
}
387+
388+
pub fn delete_calls(&self) -> Vec<(String, Option<u64>)> {
389+
self.delete_calls.lock().unwrap().clone()
390+
}
391+
}
392+
393+
impl Default for MockJetStreamKvStore {
394+
fn default() -> Self {
395+
Self::new()
396+
}
397+
}
398+
399+
impl JetStreamKeyValueStatus for MockJetStreamKvStore {
400+
async fn status(&self) -> Result<async_nats::jetstream::kv::bucket::Status, kv::StatusError> {
401+
self.settings
402+
.lock()
403+
.unwrap()
404+
.clone()
405+
.map(|settings| status_from_settings("bucket", settings))
406+
.map_err(kv::StatusError::new)
407+
}
408+
}
409+
410+
impl JetStreamKeyValueCreateWithTtl for MockJetStreamKvStore {
411+
async fn create_with_ttl(
412+
&self,
413+
key: &str,
414+
value: Bytes,
415+
ttl: Duration,
416+
) -> Result<u64, kv::CreateError> {
417+
self.create_with_ttl_calls
418+
.lock()
419+
.unwrap()
420+
.push((key.to_string(), value, ttl));
421+
(*self.create_with_ttl_result.lock().unwrap()).map_err(kv::CreateError::new)
422+
}
423+
}
424+
425+
impl JetStreamKeyValueUpdate for MockJetStreamKvStore {
426+
async fn update(&self, key: &str, value: Bytes, revision: u64) -> Result<u64, kv::UpdateError> {
427+
self.update_calls
428+
.lock()
429+
.unwrap()
430+
.push((key.to_string(), value, revision));
431+
(*self.update_result.lock().unwrap()).map_err(kv::UpdateError::new)
432+
}
433+
}
434+
435+
impl JetStreamKeyValueDeleteExpectRevision for MockJetStreamKvStore {
436+
async fn delete_expect_revision(
437+
&self,
438+
key: &str,
439+
revision: Option<u64>,
440+
) -> Result<(), kv::DeleteError> {
441+
self.delete_calls
442+
.lock()
443+
.unwrap()
444+
.push((key.to_string(), revision));
445+
(*self.delete_result.lock().unwrap()).map_err(kv::DeleteError::new)
446+
}
447+
}
448+
449+
#[derive(Clone, Debug)]
450+
pub struct MockJetStreamKvClient {
451+
create_result: Arc<Mutex<MockCreateKeyValueResult>>,
452+
get_result: Arc<Mutex<MockGetKeyValueResult>>,
453+
create_configs: Arc<Mutex<Vec<kv::Config>>>,
454+
requested_buckets: Arc<Mutex<Vec<String>>>,
455+
}
456+
457+
#[derive(Clone, Debug)]
458+
enum MockCreateKeyValueResult {
459+
Ok(MockJetStreamKvStore),
460+
Err(CreateKeyValueErrorKind),
461+
AlreadyExists,
462+
}
463+
464+
#[derive(Clone, Debug)]
465+
enum MockGetKeyValueResult {
466+
Ok(MockJetStreamKvStore),
467+
Err(context::KeyValueErrorKind),
468+
}
469+
470+
impl MockJetStreamKvClient {
471+
pub fn new() -> Self {
472+
Self {
473+
create_result: Arc::new(Mutex::new(MockCreateKeyValueResult::Ok(
474+
MockJetStreamKvStore::new(),
475+
))),
476+
get_result: Arc::new(Mutex::new(MockGetKeyValueResult::Ok(
477+
MockJetStreamKvStore::new(),
478+
))),
479+
create_configs: Arc::new(Mutex::new(Vec::new())),
480+
requested_buckets: Arc::new(Mutex::new(Vec::new())),
481+
}
482+
}
483+
484+
pub fn set_create_result(&self, result: MockJetStreamKvStore) {
485+
*self.create_result.lock().unwrap() = MockCreateKeyValueResult::Ok(result);
486+
}
487+
488+
pub fn set_get_result(&self, result: MockJetStreamKvStore) {
489+
*self.get_result.lock().unwrap() = MockGetKeyValueResult::Ok(result);
490+
}
491+
492+
pub fn fail_create_already_exists(&self) {
493+
*self.create_result.lock().unwrap() = MockCreateKeyValueResult::AlreadyExists;
494+
}
495+
496+
pub fn fail_create(&self, kind: CreateKeyValueErrorKind) {
497+
*self.create_result.lock().unwrap() = MockCreateKeyValueResult::Err(kind);
498+
}
499+
500+
pub fn fail_get(&self, kind: context::KeyValueErrorKind) {
501+
*self.get_result.lock().unwrap() = MockGetKeyValueResult::Err(kind);
502+
}
503+
504+
pub fn create_configs(&self) -> Vec<kv::Config> {
505+
self.create_configs.lock().unwrap().clone()
506+
}
507+
508+
pub fn requested_buckets(&self) -> Vec<String> {
509+
self.requested_buckets.lock().unwrap().clone()
510+
}
511+
}
512+
513+
impl Default for MockJetStreamKvClient {
514+
fn default() -> Self {
515+
Self::new()
516+
}
517+
}
518+
519+
impl JetStreamCreateKeyValue for MockJetStreamKvClient {
520+
type Store = MockJetStreamKvStore;
521+
522+
async fn create_key_value(
523+
&self,
524+
config: kv::Config,
525+
) -> Result<Self::Store, CreateKeyValueError> {
526+
self.create_configs.lock().unwrap().push(config);
527+
match self.create_result.lock().unwrap().clone() {
528+
MockCreateKeyValueResult::Ok(store) => Ok(store),
529+
MockCreateKeyValueResult::Err(kind) => Err(CreateKeyValueError::new(kind)),
530+
MockCreateKeyValueResult::AlreadyExists => Err(CreateKeyValueError::with_source(
531+
CreateKeyValueErrorKind::BucketCreate,
532+
mock_stream_exists_error(),
533+
)),
534+
}
535+
}
536+
}
537+
538+
impl JetStreamGetKeyValue for MockJetStreamKvClient {
539+
type Store = MockJetStreamKvStore;
540+
541+
async fn get_key_value<T: Into<String> + Send>(
542+
&self,
543+
bucket: T,
544+
) -> Result<Self::Store, context::KeyValueError> {
545+
self.requested_buckets.lock().unwrap().push(bucket.into());
546+
match self.get_result.lock().unwrap().clone() {
547+
MockGetKeyValueResult::Ok(store) => Ok(store),
548+
MockGetKeyValueResult::Err(kind) => Err(context::KeyValueError::new(kind)),
549+
}
550+
}
551+
}
552+
553+
fn mock_stream_exists_error() -> context::CreateStreamError {
554+
let source: jetstream::Error = serde_json::from_str(
555+
r#"{"code":400,"err_code":10058,"description":"stream name already in use with a different configuration"}"#,
556+
)
557+
.unwrap();
558+
559+
context::CreateStreamError::new(context::CreateStreamErrorKind::JetStream(source))
560+
}
561+
562+
fn status_from_settings(
563+
bucket: &str,
564+
settings: MockKeyValueSettings,
565+
) -> async_nats::jetstream::kv::bucket::Status {
566+
let config = stream::Config {
567+
name: format!("KV_{bucket}"),
568+
max_messages_per_subject: settings.history,
569+
max_age: settings.max_age,
570+
allow_message_ttl: settings.allow_message_ttl,
571+
subject_delete_marker_ttl: settings.subject_delete_marker_ttl,
572+
..Default::default()
573+
};
574+
575+
let info: stream::Info = serde_json::from_value(serde_json::json!({
576+
"config": config,
577+
"created": "1970-01-01T00:00:00Z",
578+
"state": {
579+
"messages": 0_u64,
580+
"bytes": 0_u64,
581+
"first_seq": 0_u64,
582+
"first_ts": "1970-01-01T00:00:00Z",
583+
"last_seq": 0_u64,
584+
"last_ts": "1970-01-01T00:00:00Z",
585+
"consumer_count": 0_usize,
586+
"num_subjects": 0_u64
587+
},
588+
"cluster": null,
589+
"mirror": null,
590+
"sources": []
591+
}))
592+
.unwrap();
593+
594+
async_nats::jetstream::kv::bucket::Status {
595+
info,
596+
bucket: bucket.to_string(),
597+
}
598+
}
599+
303600
pub struct MockJetStreamConsumerFactory {
304601
consumers: Arc<Mutex<VecDeque<MockJetStreamConsumer>>>,
305602
get_stream_fail_at: Arc<Mutex<Option<u32>>>,

0 commit comments

Comments
 (0)