Skip to content

Commit 8c19950

Browse files
committed
feat: allow configuring daemon with extra interests
1 parent f28d1a4 commit 8c19950

3 files changed

Lines changed: 210 additions & 1 deletion

File tree

one/src/daemon.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::{path::PathBuf, time::Duration};
22

33
use crate::{
4-
default_directory, handle_signals, http, http_metrics, metrics, network::Ipfs, DBOpts,
4+
default_directory, handle_signals, http, http_metrics, metrics, network::Ipfs, startup, DBOpts,
55
DBOptsExperimental, Info, LogOpts, Network,
66
};
77
use anyhow::{anyhow, bail, Result};
@@ -274,6 +274,19 @@ pub struct DaemonOpts {
274274
default_value = "file://./pipeline"
275275
)]
276276
object_store_url: url::Url,
277+
278+
/// Comma-separated list of stream IDs or model IDs to register as interests on startup.
279+
/// The node will track streams implementing these models, and sync their data.
280+
///
281+
/// For this to work with user-defined models, the daemon also needs to track the
282+
/// metamodel: kh4q0ozorrgaq2mezktnrmdwleo1d
283+
#[arg(
284+
long,
285+
use_value_delimiter = true,
286+
value_delimiter = ',',
287+
env = "CERAMIC_ONE_EXTRA_INTERESTS"
288+
)]
289+
extra_interests: Vec<String>,
277290
}
278291

279292
fn spawn_database_optimizer(
@@ -447,6 +460,10 @@ pub async fn run(opts: DaemonOpts) -> Result<()> {
447460
let node_key = NodeKey::try_from_dir(opts.p2p_key_dir).await?;
448461
let node_id = node_key.id();
449462

463+
// Process initial interests if provided
464+
startup::process_extra_interests(&opts.extra_interests, &interest_svc, &network, &node_id)
465+
.await?;
466+
450467
// Register metrics for all components
451468
let recon_metrics = MetricsHandle::register(recon::Metrics::register);
452469
let peer_svc_store_metrics =

one/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ mod metrics;
88
mod migrations;
99
mod network;
1010
mod query;
11+
mod startup;
1112

1213
use anyhow::{anyhow, bail, Result};
1314
use ceramic_core::ssi::caip2::ChainId;

one/src/startup.rs

Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
//! Startup utilities for the Ceramic One daemon.
2+
3+
use anyhow::{anyhow, Result};
4+
use ceramic_api::InterestService as InterestServiceTrait;
5+
use ceramic_core::{EventId, Interest, NodeId, StreamId};
6+
use ceramic_interest_svc::InterestService;
7+
use std::str::FromStr;
8+
use std::sync::Arc;
9+
use tracing::{debug, info, warn};
10+
11+
/// Process initial interests by registering them with the interest service
12+
pub async fn process_extra_interests(
13+
extra_interests: &[String],
14+
interest_svc: &Arc<InterestService>,
15+
network: &ceramic_core::Network,
16+
node_id: &NodeId,
17+
) -> Result<()> {
18+
if extra_interests.is_empty() {
19+
return Ok(());
20+
}
21+
22+
info!("Processing {} extra interests", extra_interests.len());
23+
24+
for stream_id_str in extra_interests {
25+
let stream_id_str = stream_id_str.trim();
26+
if stream_id_str.is_empty() {
27+
continue;
28+
}
29+
30+
// Validate that the model stream ID is parseable
31+
let _stream_id = StreamId::from_str(stream_id_str)
32+
.map_err(|e| anyhow!("Invalid model ID '{}': {}", stream_id_str, e))?;
33+
34+
// Create an interest for the "model" separator key covering the full range for this stream
35+
// This follows the same pattern as the API endpoint /ceramic/interests/model/{stream_id}
36+
let stream_id_bytes = multibase::decode(stream_id_str)
37+
.map_err(|e| anyhow!("Failed to decode stream ID '{}': {}", stream_id_str, e))?
38+
.1;
39+
let start = EventId::builder()
40+
.with_network(network)
41+
.with_sep("model", &stream_id_bytes)
42+
.with_min_controller()
43+
.with_min_init()
44+
.with_min_event()
45+
.build_fencepost();
46+
let stop = EventId::builder()
47+
.with_network(network)
48+
.with_sep("model", &stream_id_bytes)
49+
.with_max_controller()
50+
.with_max_init()
51+
.with_max_event()
52+
.build_fencepost();
53+
54+
let interest = Interest::builder()
55+
.with_sep_key("model")
56+
.with_peer_id(&node_id.peer_id())
57+
.with_range((start.as_slice(), stop.as_slice()))
58+
.with_not_after(0)
59+
.build();
60+
61+
match interest_svc.insert(interest).await {
62+
Ok(was_inserted) => {
63+
if was_inserted {
64+
info!(
65+
"Successfully registered extra interest for model: {}",
66+
stream_id_str
67+
);
68+
} else {
69+
debug!(
70+
"Interest for model {} was already registered",
71+
stream_id_str
72+
);
73+
}
74+
}
75+
Err(e) => {
76+
warn!(
77+
"Failed to register initial interest for model {}: {}",
78+
stream_id_str, e
79+
);
80+
}
81+
}
82+
}
83+
84+
Ok(())
85+
}
86+
87+
#[cfg(test)]
88+
mod tests {
89+
use super::*;
90+
use ceramic_core::Network;
91+
use ceramic_sql::sqlite::SqlitePool;
92+
use recon::Store;
93+
use std::sync::Arc;
94+
95+
#[tokio::test]
96+
async fn test_process_extra_interests_empty() {
97+
let pool = SqlitePool::connect_in_memory().await.unwrap();
98+
let interest_svc = Arc::new(InterestService::new(pool));
99+
let network = Network::InMemory;
100+
let node_key = ceramic_core::NodeKey::random();
101+
let node_id = node_key.id();
102+
103+
// Test with empty interests
104+
let result = process_extra_interests(&[], &interest_svc, &network, &node_id).await;
105+
assert!(result.is_ok());
106+
}
107+
108+
#[tokio::test]
109+
async fn test_process_extra_interests_invalid_stream_id() {
110+
let pool = SqlitePool::connect_in_memory().await.unwrap();
111+
let interest_svc = Arc::new(InterestService::new(pool));
112+
let network = Network::InMemory;
113+
let node_key = ceramic_core::NodeKey::random();
114+
let node_id = node_key.id();
115+
116+
// Invalid stream ID - tests daemon-specific stream ID parsing and error handling
117+
let stream_ids = vec!["invalid-stream-id".to_string()];
118+
119+
let result = process_extra_interests(&stream_ids, &interest_svc, &network, &node_id).await;
120+
assert!(result.is_err());
121+
122+
// Verify the error message contains information about the invalid stream ID
123+
let error_msg = result.unwrap_err().to_string();
124+
assert!(error_msg.contains("invalid-stream-id"));
125+
}
126+
127+
#[tokio::test]
128+
async fn test_process_extra_interests_mixed_valid_invalid() {
129+
let pool = SqlitePool::connect_in_memory().await.unwrap();
130+
let interest_svc = Arc::new(InterestService::new(pool));
131+
let network = Network::InMemory;
132+
let node_key = ceramic_core::NodeKey::random();
133+
let node_id = node_key.id();
134+
135+
// Mix of valid and invalid stream IDs - tests early failure behavior
136+
let stream_ids = vec![
137+
"k2t6wz4ylx0qr6v7dvbczbxqy7pqjb0879qx930c1e27gacg3r8sllonqt4xx9".to_string(), // Valid
138+
"invalid-stream-id".to_string(), // Invalid
139+
];
140+
141+
let result = process_extra_interests(&stream_ids, &interest_svc, &network, &node_id).await;
142+
// Should fail on the first invalid ID
143+
assert!(result.is_err());
144+
}
145+
146+
#[tokio::test]
147+
async fn test_process_extra_interests_whitespace_handling() {
148+
let pool = SqlitePool::connect_in_memory().await.unwrap();
149+
let interest_svc = Arc::new(InterestService::new(pool));
150+
let network = Network::InMemory;
151+
let node_key = ceramic_core::NodeKey::random();
152+
let node_id = node_key.id();
153+
154+
// Test daemon-specific input sanitization logic
155+
let stream_ids = vec![
156+
" k2t6wz4ylx0qr6v7dvbczbxqy7pqjb0879qx930c1e27gacg3r8sllonqt4xx9 ".to_string(), // Valid with whitespace
157+
"".to_string(), // Empty string
158+
" ".to_string(), // Only whitespace
159+
];
160+
161+
let result = process_extra_interests(&stream_ids, &interest_svc, &network, &node_id).await;
162+
163+
// Should succeed and handle whitespace correctly
164+
assert!(result.is_ok());
165+
166+
let registered_interests = interest_svc.full_range().await.unwrap();
167+
assert_eq!(registered_interests.count(), 1);
168+
}
169+
170+
#[tokio::test]
171+
async fn test_process_extra_interests_multibase_decoding() {
172+
let pool = SqlitePool::connect_in_memory().await.unwrap();
173+
let interest_svc = Arc::new(InterestService::new(pool));
174+
let network = Network::InMemory;
175+
let node_key = ceramic_core::NodeKey::random();
176+
let node_id = node_key.id();
177+
178+
// Test daemon-specific multibase decoding logic
179+
let stream_ids = vec![
180+
"k2t6wz4ylx0qr6v7dvbczbxqy7pqjb0879qx930c1e27gacg3r8sllonqt4xx9".to_string(), // Valid multibase-encoded stream ID
181+
];
182+
183+
let result = process_extra_interests(&stream_ids, &interest_svc, &network, &node_id).await;
184+
185+
// Should succeed with proper multibase decoding
186+
assert!(result.is_ok());
187+
188+
let registered_interests = interest_svc.full_range().await.unwrap();
189+
assert_eq!(registered_interests.count(), 1);
190+
}
191+
}

0 commit comments

Comments
 (0)