Skip to content

Commit 1f6ea28

Browse files
committed
feat: allow configuring daemon with initial interests
1 parent f28d1a4 commit 1f6ea28

3 files changed

Lines changed: 202 additions & 1 deletion

File tree

one/src/daemon.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::{path::PathBuf, time::Duration};
22

33
use crate::{
4-
default_directory, handle_signals, http, http_metrics, metrics, network::Ipfs, DBOpts,
4+
default_directory, handle_signals, http, http_metrics, metrics, network::Ipfs, startup, DBOpts,
55
DBOptsExperimental, Info, LogOpts, Network,
66
};
77
use anyhow::{anyhow, bail, Result};
@@ -274,6 +274,16 @@ pub struct DaemonOpts {
274274
default_value = "file://./pipeline"
275275
)]
276276
object_store_url: url::Url,
277+
278+
/// Comma-separated list of stream IDs or model IDs to register as initial interests on startup.
279+
/// The node will track these streams and sync their data.
280+
#[arg(
281+
long,
282+
use_value_delimiter = true,
283+
value_delimiter = ',',
284+
env = "CERAMIC_ONE_INITIAL_INTERESTS"
285+
)]
286+
initial_interests: Vec<String>,
277287
}
278288

279289
fn spawn_database_optimizer(
@@ -447,6 +457,10 @@ pub async fn run(opts: DaemonOpts) -> Result<()> {
447457
let node_key = NodeKey::try_from_dir(opts.p2p_key_dir).await?;
448458
let node_id = node_key.id();
449459

460+
// Process initial interests if provided
461+
startup::process_initial_interests(&opts.initial_interests, &interest_svc, &network, &node_id)
462+
.await?;
463+
450464
// Register metrics for all components
451465
let recon_metrics = MetricsHandle::register(recon::Metrics::register);
452466
let peer_svc_store_metrics =

one/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ mod metrics;
88
mod migrations;
99
mod network;
1010
mod query;
11+
mod startup;
1112

1213
use anyhow::{anyhow, bail, Result};
1314
use ceramic_core::ssi::caip2::ChainId;

one/src/startup.rs

Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
//! Startup utilities for the Ceramic One daemon.
2+
3+
use anyhow::{anyhow, Result};
4+
use ceramic_api::InterestService as InterestServiceTrait;
5+
use ceramic_core::{EventId, Interest, NodeId, StreamId};
6+
use ceramic_interest_svc::InterestService;
7+
use std::str::FromStr;
8+
use std::sync::Arc;
9+
use tracing::{debug, info, warn};
10+
11+
/// Process initial interests by registering them with the interest service
12+
pub async fn process_initial_interests(
13+
initial_interests: &[String],
14+
interest_svc: &Arc<InterestService>,
15+
network: &ceramic_core::Network,
16+
node_id: &NodeId,
17+
) -> Result<()> {
18+
if initial_interests.is_empty() {
19+
return Ok(());
20+
}
21+
22+
info!("Processing {} initial interests", initial_interests.len());
23+
24+
for stream_id_str in initial_interests {
25+
let stream_id_str = stream_id_str.trim();
26+
if stream_id_str.is_empty() {
27+
continue;
28+
}
29+
30+
// Validate that the model stream ID is parseable
31+
let _stream_id = StreamId::from_str(stream_id_str)
32+
.map_err(|e| anyhow!("Invalid model ID '{}': {}", stream_id_str, e))?;
33+
34+
// Create an interest for the "model" separator key covering the full range for this stream
35+
// This follows the same pattern as the API endpoint /ceramic/interests/model/{stream_id}
36+
let stream_id_bytes = multibase::decode(stream_id_str)
37+
.map_err(|e| anyhow!("Failed to decode stream ID '{}': {}", stream_id_str, e))?
38+
.1;
39+
let start = EventId::builder()
40+
.with_network(network)
41+
.with_sep("model", &stream_id_bytes)
42+
.with_min_controller()
43+
.with_min_init()
44+
.with_min_event()
45+
.build_fencepost();
46+
let stop = EventId::builder()
47+
.with_network(network)
48+
.with_sep("model", &stream_id_bytes)
49+
.with_max_controller()
50+
.with_max_init()
51+
.with_max_event()
52+
.build_fencepost();
53+
54+
let interest = Interest::builder()
55+
.with_sep_key("model")
56+
.with_peer_id(&node_id.peer_id())
57+
.with_range((start.as_slice(), stop.as_slice()))
58+
.with_not_after(0)
59+
.build();
60+
61+
match interest_svc.insert(interest).await {
62+
Ok(was_inserted) => {
63+
if was_inserted {
64+
info!(
65+
"Successfully registered initial interest for stream: {}",
66+
stream_id_str
67+
);
68+
} else {
69+
debug!(
70+
"Interest for stream {} was already registered",
71+
stream_id_str
72+
);
73+
}
74+
}
75+
Err(e) => {
76+
warn!(
77+
"Failed to register initial interest for stream {}: {}",
78+
stream_id_str, e
79+
);
80+
}
81+
}
82+
}
83+
84+
Ok(())
85+
}
86+
87+
#[cfg(test)]
88+
mod tests {
89+
use super::*;
90+
use ceramic_core::Network;
91+
use ceramic_sql::sqlite::SqlitePool;
92+
use std::sync::Arc;
93+
94+
#[tokio::test]
95+
async fn test_process_initial_interests_empty() {
96+
let pool = SqlitePool::connect_in_memory().await.unwrap();
97+
let interest_svc = Arc::new(InterestService::new(pool));
98+
let network = Network::InMemory;
99+
let node_key = ceramic_core::NodeKey::random();
100+
let node_id = node_key.id();
101+
102+
// Test with empty interests
103+
let result = process_initial_interests(&[], &interest_svc, &network, &node_id).await;
104+
assert!(result.is_ok());
105+
}
106+
107+
#[tokio::test]
108+
async fn test_process_initial_interests_invalid_stream_id() {
109+
let pool = SqlitePool::connect_in_memory().await.unwrap();
110+
let interest_svc = Arc::new(InterestService::new(pool));
111+
let network = Network::InMemory;
112+
let node_key = ceramic_core::NodeKey::random();
113+
let node_id = node_key.id();
114+
115+
// Invalid stream ID - tests daemon-specific stream ID parsing and error handling
116+
let stream_ids = vec!["invalid-stream-id".to_string()];
117+
118+
let result =
119+
process_initial_interests(&stream_ids, &interest_svc, &network, &node_id).await;
120+
assert!(result.is_err());
121+
122+
// Verify the error message contains information about the invalid stream ID
123+
let error_msg = result.unwrap_err().to_string();
124+
assert!(error_msg.contains("invalid-stream-id"));
125+
}
126+
127+
#[tokio::test]
128+
async fn test_process_initial_interests_mixed_valid_invalid() {
129+
let pool = SqlitePool::connect_in_memory().await.unwrap();
130+
let interest_svc = Arc::new(InterestService::new(pool));
131+
let network = Network::InMemory;
132+
let node_key = ceramic_core::NodeKey::random();
133+
let node_id = node_key.id();
134+
135+
// Mix of valid and invalid stream IDs - tests early failure behavior
136+
let stream_ids = vec![
137+
"k2t6wz4ylx0qr6v7dvbczbxqy7pqjb0879qx930c1e27gacg3r8sllonqt4xx9".to_string(), // Valid
138+
"invalid-stream-id".to_string(), // Invalid
139+
];
140+
141+
let result =
142+
process_initial_interests(&stream_ids, &interest_svc, &network, &node_id).await;
143+
// Should fail on the first invalid ID
144+
assert!(result.is_err());
145+
}
146+
147+
#[tokio::test]
148+
async fn test_process_initial_interests_whitespace_handling() {
149+
let pool = SqlitePool::connect_in_memory().await.unwrap();
150+
let interest_svc = Arc::new(InterestService::new(pool));
151+
let network = Network::InMemory;
152+
let node_key = ceramic_core::NodeKey::random();
153+
let node_id = node_key.id();
154+
155+
// Test daemon-specific input sanitization logic
156+
let stream_ids = vec![
157+
" k2t6wz4ylx0qr6v7dvbczbxqy7pqjb0879qx930c1e27gacg3r8sllonqt4xx9 ".to_string(), // Valid with whitespace
158+
"".to_string(), // Empty string
159+
" ".to_string(), // Only whitespace
160+
];
161+
162+
let result =
163+
process_initial_interests(&stream_ids, &interest_svc, &network, &node_id).await;
164+
// Should succeed and handle whitespace correctly
165+
assert!(result.is_ok());
166+
}
167+
168+
#[tokio::test]
169+
async fn test_process_initial_interests_multibase_decoding() {
170+
let pool = SqlitePool::connect_in_memory().await.unwrap();
171+
let interest_svc = Arc::new(InterestService::new(pool));
172+
let network = Network::InMemory;
173+
let node_key = ceramic_core::NodeKey::random();
174+
let node_id = node_key.id();
175+
176+
// Test daemon-specific multibase decoding logic
177+
let stream_ids = vec![
178+
"k2t6wz4ylx0qr6v7dvbczbxqy7pqjb0879qx930c1e27gacg3r8sllonqt4xx9".to_string(), // Valid multibase-encoded stream ID
179+
];
180+
181+
let result =
182+
process_initial_interests(&stream_ids, &interest_svc, &network, &node_id).await;
183+
// Should succeed with proper multibase decoding
184+
assert!(result.is_ok());
185+
}
186+
}

0 commit comments

Comments
 (0)