From 106e6a19dca388bf90e57acc8b335298efc045db Mon Sep 17 00:00:00 2001 From: Frando Date: Wed, 21 May 2025 22:49:35 +0200 Subject: [PATCH 1/5] refactor: switch to irpc --- Cargo.lock | 109 ++++++----- Cargo.toml | 6 +- src/client.rs | 474 +++++++++++------------------------------------- src/lib.rs | 4 +- src/main.rs | 2 +- src/protocol.rs | 136 +++++++++----- 6 files changed, 259 insertions(+), 472 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2083fae4..422072ef 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -290,15 +290,6 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "597bb81c80a54b6a4381b23faba8d7774b144c94cbd1d6fe3f1329bd776554ab" -[[package]] -name = "bincode" -version = "1.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1f45e9417d87227c7a56d22e471c6206462cba514c7590c09aff4cf6d1ddcad" -dependencies = [ - "serde", -] - [[package]] name = "bitflags" version = "1.3.2" @@ -797,18 +788,6 @@ dependencies = [ "zeroize", ] -[[package]] -name = "educe" -version = "0.5.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e4bd92664bf78c4d3dba9b7cdafce6fa15b13ed3ed16175218196942e99168a8" -dependencies = [ - "enum-ordinalize", - "proc-macro2", - "quote", - "syn 2.0.101", -] - [[package]] name = "elliptic-curve" version = "0.13.8" @@ -852,26 +831,6 @@ dependencies = [ "syn 2.0.101", ] -[[package]] -name = "enum-ordinalize" -version = "4.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fea0dcfa4e54eeb516fe454635a95753ddd39acda650ce703031c6973e315dd5" -dependencies = [ - "enum-ordinalize-derive", -] - -[[package]] -name = "enum-ordinalize-derive" -version = "4.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0d28318a75d4aead5c4db25382e8ef717932d0346600cacae6357eb5941bc5ff" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.101", -] - [[package]] name = "enumflags2" version = "0.7.11" @@ -1974,15 +1933,17 @@ dependencies = [ "iroh-blobs", "iroh-gossip", "iroh-metrics", + "irpc", + "irpc-derive", + "irpc-iroh", "n0-future", "rand 0.8.5", "rcan", "serde", "ssh-key", "strum 0.27.1", + "thiserror 2.0.12", "tokio", - "tokio-serde", - "tokio-util", "tracing", "uuid", ] @@ -2088,6 +2049,53 @@ dependencies = [ "z32", ] +[[package]] +name = "irpc" +version = "0.2.3" +source = "git+https://github.com/n0-computer/irpc?branch=Frando/manual-accept-with-auth#4e138054cc634d9b4730bae68a29aff46b85b06a" +dependencies = [ + "anyhow", + "futures-buffered", + "futures-util", + "iroh-quinn", + "n0-future", + "postcard", + "rcgen", + "rustls", + "serde", + "smallvec", + "thiserror 2.0.12", + "tokio", + "tokio-util", + "tracing", +] + +[[package]] +name = "irpc-derive" +version = "0.2.3" +source = "git+https://github.com/n0-computer/irpc?branch=Frando/manual-accept-with-auth#4e138054cc634d9b4730bae68a29aff46b85b06a" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + +[[package]] +name = "irpc-iroh" +version = "0.2.3" +source = "git+https://github.com/n0-computer/irpc?branch=Frando/manual-accept-with-auth#4e138054cc634d9b4730bae68a29aff46b85b06a" +dependencies = [ + "anyhow", + "getrandom 0.3.2", + "iroh", + "irpc", + "n0-future", + "postcard", + "serde", + "tokio", + "tracing", +] + [[package]] name = "itoa" version = "1.0.15" @@ -4260,21 +4268,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "tokio-serde" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "caf600e7036b17782571dd44fa0a5cea3c82f60db5137f774a325a76a0d6852b" -dependencies = [ - "bincode", - "bytes", - "educe", - "futures-core", - "futures-sink", - "pin-project", - "serde", -] - [[package]] name = "tokio-stream" version = "0.1.17" diff --git a/Cargo.toml b/Cargo.toml index 0b2a5231..db61b368 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,9 @@ rust-version = "1.81" anyhow = "1.0.95" derive_more = { version = "2.0.1", features = ["from"] } ed25519-dalek = "2.1.1" +irpc = { git = "https://github.com/n0-computer/irpc", branch = "Frando/manual-accept-with-auth" } +irpc-derive = { git = "https://github.com/n0-computer/irpc", branch = "Frando/manual-accept-with-auth" } +irpc-iroh = { git = "https://github.com/n0-computer/irpc", branch = "Frando/manual-accept-with-auth" } iroh = "0.35" iroh-blobs = "0.35" iroh-gossip = { version = "0.35", default-features = false } @@ -24,9 +27,8 @@ rcan = { git = "https://github.com/n0-computer/rcan", branch = "main" } serde = { version = "1.0.217", features = ["derive"] } ssh-key = { version = "0.6.7", features = ["ed25519"] } strum = { version = "0.27.1", features = ["derive"] } +thiserror = "2.0.12" tokio = "1.43.0" -tokio-serde = { version = "0.9.0", features = ["bincode"] } -tokio-util = { version = "0.7.13", features = ["codec"] } tracing = "0.1.41" uuid = { version = "1.12.1", features = ["v4", "serde"] } diff --git a/src/client.rs b/src/client.rs index 5650cf92..a9848a1e 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,32 +1,29 @@ use std::{path::Path, time::Duration}; -use anyhow::{anyhow, ensure, Context, Result}; -use iroh::{ - endpoint::{RecvStream, SendStream}, - Endpoint, NodeAddr, NodeId, -}; +use anyhow::{anyhow, ensure, Result}; +use iroh::{Endpoint, NodeAddr, NodeId}; use iroh_blobs::{ticket::BlobTicket, BlobFormat, Hash}; use iroh_gossip::proto::TopicId; use iroh_metrics::{MetricsSource, Registry}; -use n0_future::{task::AbortOnDropHandle, SinkExt, StreamExt}; +use irpc_iroh::IrohRemoteConnection; +use n0_future::task::AbortOnDropHandle; use rand::Rng; use rcan::Rcan; -use tokio::sync::{mpsc, oneshot}; -use tokio_serde::formats::Bincode; -use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec}; -use tracing::{debug, warn}; +use tracing::warn; use uuid::Uuid; use crate::{ caps::Caps, - protocol::{ClientMessage, ServerMessage, ALPN}, + protocol::{ + Auth, DeleteTopic, GetTag, N0desClient, Ping, PutBlob, PutMetrics, PutTopic, RemoteError, + ALPN, + }, }; #[derive(Debug)] pub struct Client { - sender: mpsc::Sender, - _actor_task: AbortOnDropHandle<()>, - cap: Rcan, + client: N0desClient, + _metrics_task: Option>, } /// Constructs an IPS client @@ -91,74 +88,79 @@ impl ClientBuilder { } /// Create a new client, connected to the provide service node - pub async fn build(self, remote: impl Into) -> Result { - let cap = self.cap.context("missing capability")?; - - let remote_addr = remote.into(); - let connection = self.endpoint.connect(remote_addr.clone(), ALPN).await?; - - let (send_stream, recv_stream) = connection.open_bi().await?; - - // Delimit frames using a length header - let length_delimited_read = FramedRead::new(recv_stream, LengthDelimitedCodec::new()); - let length_delimited_write = FramedWrite::new(send_stream, LengthDelimitedCodec::new()); - - // Deserialize frames - let reader = tokio_serde::Framed::new( - length_delimited_read, - Bincode::::default(), - ); - - let writer = tokio_serde::Framed::new( - length_delimited_write, - Bincode::::default(), - ); - - let (internal_sender, internal_receiver) = mpsc::channel(64); - - let actor = Actor { - metrics: None, - endpoint: self.endpoint, - reader, - writer, - internal_receiver, - internal_sender: internal_sender.clone(), - session_id: Uuid::new_v4(), - }; - let enable_metrics = self.enable_metrics; - let run_handle = tokio::task::spawn(async move { - actor.run(enable_metrics).await; + pub async fn build(self, remote: impl Into) -> Result { + let cap = self.cap.ok_or(BuildError::MissingCapability)?; + let conn = IrohRemoteConnection::new(self.endpoint.clone(), remote.into(), ALPN.to_vec()); + let client = N0desClient::boxed(conn); + + // If auth fails, the connection is aborted. + let () = client.rpc(Auth { caps: cap }).await?; + + let metrics_task = self.enable_metrics.map(|interval| { + AbortOnDropHandle::new(n0_future::task::spawn( + MetricsTask { + client: client.clone(), + session_id: Uuid::new_v4(), + endpoint: self.endpoint.clone(), + } + .run(interval), + )) }); - let actor_task = AbortOnDropHandle::new(run_handle); - let mut this = Client { - cap, - sender: internal_sender, - _actor_task: actor_task, - }; + Ok(Client { + client, + _metrics_task: metrics_task, + }) + } +} - this.authenticate().await?; +#[derive(thiserror::Error, Debug)] +pub enum BuildError { + #[error("Missing capability")] + MissingCapability, + #[error("Unauthorized")] + Unauthorized, + #[error("Remote error: {0}")] + Remote(#[from] RemoteError), + #[error("Connection error: {0}")] + Rpc(irpc::Error), +} - Ok(this) +impl From for BuildError { + fn from(value: irpc::Error) -> Self { + match value { + irpc::Error::Request(irpc::RequestError::Connection( + iroh::endpoint::ConnectionError::ApplicationClosed(frame), + )) if frame.error_code == 401u32.into() => Self::Unauthorized, + value @ _ => Self::Rpc(value), + } } } +#[derive(thiserror::Error, Debug)] +pub enum Error { + #[error("Remote error: {0}")] + Remote(#[from] RemoteError), + #[error("Connection error: {0}")] + Rpc(#[from] irpc::Error), + #[error(transparent)] + Other(#[from] anyhow::Error), +} + impl Client { pub fn builder(endpoint: &Endpoint) -> ClientBuilder { ClientBuilder::new(endpoint) } - /// Trigger the auth handshake with the server - async fn authenticate(&mut self) -> Result<()> { - let (s, r) = oneshot::channel(); - self.sender - .send(ActorMessage::Auth { - rcan: self.cap.clone(), - s, - }) - .await?; - r.await??; - Ok(()) + /// Pings the remote node. + pub async fn ping(&mut self) -> Result<(), Error> { + let req = rand::thread_rng().gen(); + let pong = self.client.rpc(Ping { req }).await?; + if pong.req == req { + Ok(()) + } else { + Err(Error::Other(anyhow!("unexpected pong response"))) + } } /// Transfer the blob from the local iroh node to the service node. @@ -168,32 +170,16 @@ impl Client { hash: Hash, format: BlobFormat, name: String, - ) -> Result<()> { + ) -> Result<(), Error> { let ticket = BlobTicket::new(node.into(), hash, format)?; - - let (s, r) = oneshot::channel(); - self.sender - .send(ActorMessage::PutBlob { ticket, name, s }) - .await?; - r.await??; - Ok(()) - } - - /// Pings the remote node. - pub async fn ping(&mut self) -> Result<()> { - let (s, r) = oneshot::channel(); - let req = rand::thread_rng().gen(); - self.sender.send(ActorMessage::Ping { req, s }).await?; - r.await??; + self.client.rpc(PutBlob { name, ticket }).await??; Ok(()) } /// Get the `Hash` behind the tag, if available. - pub async fn get_tag(&mut self, name: String) -> Result { - let (s, r) = oneshot::channel(); - self.sender.send(ActorMessage::GetTag { name, s }).await?; - let res = r.await??; - Ok(res) + pub async fn get_tag(&mut self, name: String) -> Result, Error> { + let maybe_hash = self.client.rpc(GetTag { name }).await??; + Ok(maybe_hash) } /// Create a gossip topic. @@ -202,299 +188,57 @@ impl Client { topic: TopicId, label: String, bootstrap: Vec, - ) -> Result<()> { - let (s, r) = oneshot::channel(); - self.sender - .send(ActorMessage::PutTopic { - topic, + ) -> Result<(), Error> { + self.client + .rpc(PutTopic { + topic: *topic.as_bytes(), label, bootstrap, - s, }) - .await?; - r.await??; + .await??; Ok(()) } /// Delete a gossip topic. - pub async fn delete_gossip_topic(&mut self, topic: TopicId) -> Result<()> { - let (s, r) = oneshot::channel(); - self.sender - .send(ActorMessage::DeleteTopic { topic, s }) - .await?; - r.await??; + pub async fn delete_gossip_topic(&mut self, topic: TopicId) -> Result<(), Error> { + self.client + .rpc(DeleteTopic { + topic: *topic.as_bytes(), + }) + .await??; Ok(()) } } -struct Actor { - metrics: Option, - endpoint: Endpoint, - reader: tokio_serde::Framed< - FramedRead, - ClientMessage, - ServerMessage, - Bincode, - >, - writer: tokio_serde::Framed< - FramedWrite, - ClientMessage, - ServerMessage, - Bincode, - >, - internal_receiver: mpsc::Receiver, - internal_sender: mpsc::Sender, +struct MetricsTask { + client: N0desClient, session_id: Uuid, + endpoint: Endpoint, } -#[allow(clippy::large_enum_variant)] -enum ActorMessage { - Auth { - rcan: Rcan, - s: oneshot::Sender>, - }, - PutBlob { - ticket: BlobTicket, - name: String, - s: oneshot::Sender>, - }, - Ping { - req: [u8; 32], - s: oneshot::Sender>, - }, - PutMetrics { - encoded: String, - session_id: Uuid, - s: oneshot::Sender>, - }, - GetTag { - name: String, - s: oneshot::Sender>, - }, - PutTopic { - topic: iroh_gossip::proto::TopicId, - label: String, - bootstrap: Vec, - s: oneshot::Sender>, - }, - DeleteTopic { - topic: iroh_gossip::proto::TopicId, - s: oneshot::Sender>, - }, -} - -impl Actor { - async fn run(mut self, enable_metrics: Option) { - if enable_metrics.is_some() { - let mut registry = Registry::default(); - registry.register_all(self.endpoint.metrics()); - self.metrics = Some(registry); - } - let metrics_time = enable_metrics.unwrap_or_else(|| Duration::from_secs(60 * 60 * 24)); - let mut metrics_timer = tokio::time::interval(metrics_time); +impl MetricsTask { + async fn run(self, interval: Duration) { + let mut registry = Registry::default(); + registry.register_all(self.endpoint.metrics()); + let mut metrics_timer = tokio::time::interval(interval); loop { - tokio::select! { - biased; - msg = self.internal_receiver.recv() => { - match msg { - Some(server_msg) => { - self.handle_message(server_msg).await; - } - None => { - break; - } - } - } - _ = metrics_timer.tick(), if enable_metrics.is_some() => { - debug!("metrics_timer::tick()"); - self.send_metrics().await; - } + metrics_timer.tick().await; + if let Err(err) = self.send_metrics(®istry).await { + warn!("failed to push metrics: {:#?}", err); } } - - debug!("shutting down"); } - async fn handle_message(&mut self, msg: ActorMessage) { - match msg { - ActorMessage::Auth { rcan, s } => { - if let Err(err) = self.writer.send(ServerMessage::Auth(rcan)).await { - s.send(Err(err.into())).ok(); - return; - } - - let response = match self.reader.next().await { - Some(Ok(msg)) => match msg { - ClientMessage::AuthResponse(None) => Ok(()), - ClientMessage::AuthResponse(Some(err)) => { - Err(anyhow!("failed to authenticate: {}", err)) - } - _ => Err(anyhow!("unexpected message from server: {:?}", msg)), - }, - Some(Err(err)) => Err(anyhow!("auth: failed to receive response: {:?}", err)), - None => Err(anyhow!("auth: connection closed")), - }; - s.send(response).ok(); - } - ActorMessage::PutBlob { ticket, name, s } => { - if let Err(err) = self - .writer - .send(ServerMessage::PutBlob { name, ticket }) - .await - { - s.send(Err(err.into())).ok(); - return; - } - let response = match self.reader.next().await { - Some(Ok(msg)) => match msg { - ClientMessage::PutBlobResponse(None) => Ok(()), - ClientMessage::PutBlobResponse(Some(err)) => { - Err(anyhow!("upload failed: {}", err)) - } - _ => Err(anyhow!("unexpected message from server: {:?}", msg)), - }, - Some(Err(err)) => Err(anyhow!("failed to receive response: {:?}", err)), - None => Err(anyhow!("connection closed")), - }; - s.send(response).ok(); - } - ActorMessage::GetTag { name, s } => { - if let Err(err) = self.writer.send(ServerMessage::GetTag { name }).await { - s.send(Err(err.into())).ok(); - return; - }; - let response = match self.reader.next().await { - Some(Ok(msg)) => match msg { - ClientMessage::GetTagResponse(maybe_hash) => match maybe_hash { - Some(hash) => Ok(hash), - None => Err(anyhow!("blob not found")), - }, - _ => Err(anyhow!("unexpected response: {:?}", msg)), - }, - Some(Err(err)) => Err(anyhow!("failed to receive response: {:?}", err)), - None => Err(anyhow!("connection closed")), - }; - s.send(response).ok(); - } - ActorMessage::PutMetrics { - encoded, - session_id, - s, - } => { - let response = self - .writer - .send(ServerMessage::PutMetrics { - encoded, - session_id, - }) - .await; - // we don't expect a response - s.send(response.map_err(Into::into)).ok(); - } - ActorMessage::Ping { req, s } => { - if let Err(err) = self.writer.send(ServerMessage::Ping { req }).await { - s.send(Err(err.into())).ok(); - return; - } - - let response = match self.reader.next().await { - Some(Ok(msg)) => match msg { - ClientMessage::Pong { req: req_back } => { - if req_back != req { - Err(anyhow!("unexpected pong response")) - } else { - Ok(()) - } - } - _ => Err(anyhow!("unexpected message from server: {:?}", msg)), - }, - Some(Err(err)) => Err(anyhow!("failed to receive response: {:?}", err)), - None => Err(anyhow!("connection closed")), - }; - s.send(response).ok(); - } - ActorMessage::PutTopic { - topic, - label, - bootstrap, - s, - } => { - if let Err(err) = self - .writer - .send(ServerMessage::PutTopic { - topic: *topic.as_bytes(), - label, - bootstrap, - }) - .await - { - s.send(Err(err.into())).ok(); - return; - } - let response = match self.reader.next().await { - Some(Ok(msg)) => match msg { - ClientMessage::PutTopicResponse(None) => Ok(()), - ClientMessage::PutTopicResponse(Some(err)) => { - Err(anyhow!("put topic failed: {}", err)) - } - _ => Err(anyhow!("unexpected message from server: {:?}", msg)), - }, - Some(Err(err)) => Err(anyhow!("failed to receive response: {:?}", err)), - None => Err(anyhow!("connection closed")), - }; - s.send(response).ok(); - } - ActorMessage::DeleteTopic { topic, s } => { - if let Err(err) = self - .writer - .send(ServerMessage::DeleteTopic { - topic: *topic.as_bytes(), - }) - .await - { - s.send(Err(err.into())).ok(); - return; - } - let response = match self.reader.next().await { - Some(Ok(msg)) => match msg { - ClientMessage::DeleteTopicResponse(None) => Ok(()), - ClientMessage::DeleteTopicResponse(Some(err)) => { - Err(anyhow!("delete topic failed: {}", err)) - } - _ => Err(anyhow!("unexpected message from server: {:?}", msg)), - }, - Some(Err(err)) => Err(anyhow!("failed to receive response: {:?}", err)), - None => Err(anyhow!("connection closed")), - }; - s.send(response).ok(); - } - } - } - - async fn send_metrics(&mut self) { - if let Some(registry) = &self.metrics { - let dump = registry - .encode_openmetrics_to_string() - .expect("this never fails"); - - let (s, r) = oneshot::channel(); - if let Err(err) = self - .internal_sender - .send(ActorMessage::PutMetrics { - encoded: dump, - session_id: self.session_id, - s, - }) - .await - { - warn!("failed to send internal message: {:?}", err); - } - // spawn a task, to not block the run loop - tokio::task::spawn(async move { - let res = r.await; - debug!("metrics sent: {:?}", res); - }); - } + async fn send_metrics(&self, registry: &iroh_metrics::Registry) -> Result<()> { + let dump = registry + .encode_openmetrics_to_string() + .expect("this never fails"); + let req = PutMetrics { + session_id: self.session_id, + encoded: dump, + }; + self.client.rpc(req).await??; + Ok(()) } } diff --git a/src/lib.rs b/src/lib.rs index a1bc23a9..73d9115a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,9 +1,9 @@ mod client; -mod protocol; pub mod caps; +pub mod protocol; pub use self::{ client::{Client, ClientBuilder}, - protocol::{ClientMessage, ServerMessage, ALPN}, + protocol::ALPN, }; diff --git a/src/main.rs b/src/main.rs index eceac445..0418ef27 100644 --- a/src/main.rs +++ b/src/main.rs @@ -69,7 +69,7 @@ pub async fn main() -> Result<()> { println!("downloaded blob: {:?}", blob); - let server_hash = rpc_client.get_tag(name).await?; + let server_hash = rpc_client.get_tag(name).await?.unwrap(); assert_eq!(server_hash, client_blob.hash); println!("waiting for Ctrl+C.."); diff --git a/src/protocol.rs b/src/protocol.rs index 3e8329b3..77a26978 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -1,5 +1,8 @@ +use anyhow::Result; use iroh::NodeId; use iroh_blobs::{ticket::BlobTicket, Hash}; +use irpc::{channel::oneshot, Service}; +use irpc_derive::rpc_requests; use rcan::Rcan; use serde::{Deserialize, Serialize}; use uuid::Uuid; @@ -8,50 +11,95 @@ use crate::caps::Caps; pub const ALPN: &[u8] = b"/iroh/n0des/1"; +pub type N0desClient = irpc::Client; + +#[derive(Debug, Clone, Copy)] +pub struct N0desService; + +impl Service for N0desService {} + +#[rpc_requests(N0desService, message = N0desMessage)] +#[derive(Debug, Serialize, Deserialize)] +pub enum N0desProtocol { + #[rpc(tx=oneshot::Sender<()>)] + Auth(Auth), + #[rpc(tx=oneshot::Sender>)] + PutBlob(PutBlob), + #[rpc(tx=oneshot::Sender>>)] + GetTag(GetTag), + #[rpc(tx=oneshot::Sender>)] + PutTopic(PutTopic), + #[rpc(tx=oneshot::Sender>)] + DeleteTopic(DeleteTopic), + #[rpc(tx=oneshot::Sender>)] + PutMetrics(PutMetrics), + #[rpc(tx=oneshot::Sender)] + Ping(Ping), +} + +pub type RemoteResult = Result; + +// #[derive(Debug, Serialize, Deserialize)] +// pub struct RemoteError(pub String); + +#[derive(Serialize, Deserialize, thiserror::Error, Debug)] +pub enum RemoteError { + #[error("Missing capability: {}", _0.to_strings().join(", "))] + MissingCapability(Caps), + #[error("Internal server error")] + InternalServerError, +} + +/// Authentication on first request +#[derive(Debug, Serialize, Deserialize)] +pub struct Auth { + pub caps: Rcan, +} + +/// Request that the node fetches the given blob. +#[derive(Debug, Serialize, Deserialize)] +pub struct PutBlob { + pub ticket: BlobTicket, + pub name: String, +} + +/// Request the name of a blob held by the node +#[derive(Debug, Serialize, Deserialize)] +pub struct GetTag { + pub name: String, +} + pub type ProtoTopicId = [u8; 32]; -/// Messages sent from the client to the server -#[allow(clippy::large_enum_variant)] -#[derive(Debug, Clone, Serialize, Deserialize)] -pub enum ServerMessage { - /// Authentication on first request - Auth(Rcan), - /// Request that the node fetches the given blob. - PutBlob { ticket: BlobTicket, name: String }, - /// Request that the node joins the given tossip topic - PutTopic { - topic: ProtoTopicId, - label: String, - bootstrap: Vec, - }, - /// Request that the node joins the given tossip topic - DeleteTopic { topic: ProtoTopicId }, - /// Request the name of a blob held by the node - GetTag { name: String }, - /// Request to store the given metrics data - PutMetrics { encoded: String, session_id: Uuid }, - /// Simple ping requests - Ping { req: [u8; 32] }, -} - -/// Messages sent from the server to the client -#[derive(Debug, Clone, Serialize, Deserialize)] -pub enum ClientMessage { - // Empty reply - Ack, - /// Authentication response - /// if set, error, otherwise ok - AuthResponse(Option), - /// If set, this means it was an error. - PutBlobResponse(Option), - /// If set, this means it was an error. - PutTopicResponse(Option), - /// If set, this means it was an error. - DeleteTopicResponse(Option), - /// Simple pong response - Pong { - req: [u8; 32], - }, - // if **missing**, means there was an error - GetTagResponse(Option), +/// Request that the node joins the given tossip topic +#[derive(Debug, Serialize, Deserialize)] +pub struct PutTopic { + pub topic: ProtoTopicId, + pub label: String, + pub bootstrap: Vec, +} + +/// Request that the node joins the given tossip topic +#[derive(Debug, Serialize, Deserialize)] +pub struct DeleteTopic { + pub topic: ProtoTopicId, +} + +/// Request to store the given metrics data +#[derive(Debug, Serialize, Deserialize)] +pub struct PutMetrics { + pub encoded: String, + pub session_id: Uuid, +} + +/// Simple ping requests +#[derive(Debug, Serialize, Deserialize)] +pub struct Ping { + pub req: [u8; 32], +} + +/// Simple ping response +#[derive(Debug, Serialize, Deserialize)] +pub struct Pong { + pub req: [u8; 32], } From 6f35e3f14d56e4d06cc45718553412b968342f6d Mon Sep 17 00:00:00 2001 From: Frando Date: Wed, 21 May 2025 23:31:21 +0200 Subject: [PATCH 2/5] chore: clippy --- src/client.rs | 2 +- src/protocol.rs | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/src/client.rs b/src/client.rs index a9848a1e..549a6614 100644 --- a/src/client.rs +++ b/src/client.rs @@ -132,7 +132,7 @@ impl From for BuildError { irpc::Error::Request(irpc::RequestError::Connection( iroh::endpoint::ConnectionError::ApplicationClosed(frame), )) if frame.error_code == 401u32.into() => Self::Unauthorized, - value @ _ => Self::Rpc(value), + value => Self::Rpc(value), } } } diff --git a/src/protocol.rs b/src/protocol.rs index 77a26978..8ed01e67 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -20,6 +20,7 @@ impl Service for N0desService {} #[rpc_requests(N0desService, message = N0desMessage)] #[derive(Debug, Serialize, Deserialize)] +#[allow(clippy::large_enum_variant)] pub enum N0desProtocol { #[rpc(tx=oneshot::Sender<()>)] Auth(Auth), From ee423e72106b1f891f6f60d81336f53be7ce0f88 Mon Sep 17 00:00:00 2001 From: b5 Date: Tue, 10 Jun 2025 09:18:01 -0400 Subject: [PATCH 3/5] feat: N0de trait --- src/lib.rs | 7 ++++--- src/n0des.rs | 10 ++++++++++ 2 files changed, 14 insertions(+), 3 deletions(-) create mode 100644 src/n0des.rs diff --git a/src/lib.rs b/src/lib.rs index 73d9115a..5a0ecb35 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,9 +1,10 @@ -mod client; - pub mod caps; -pub mod protocol; +mod client; +mod n0des; +mod protocol; pub use self::{ client::{Client, ClientBuilder}, + n0des::N0de, protocol::ALPN, }; diff --git a/src/n0des.rs b/src/n0des.rs new file mode 100644 index 00000000..24b85f6a --- /dev/null +++ b/src/n0des.rs @@ -0,0 +1,10 @@ +use anyhow::Result; +use iroh::Endpoint; +use n0_future::Future; + +pub trait N0de: 'static + Send { + fn spawn(endpoint: Endpoint) -> impl Future> + Send + where + Self: Sized; + fn shutdown(&mut self) -> impl Future> + Send; +} From a4f78675e1539e6cc53d81d218d0e22ddb068e89 Mon Sep 17 00:00:00 2001 From: b5 Date: Wed, 4 Jun 2025 22:29:31 -0400 Subject: [PATCH 4/5] add exprt macro --- src/n0des.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/n0des.rs b/src/n0des.rs index 24b85f6a..2253c877 100644 --- a/src/n0des.rs +++ b/src/n0des.rs @@ -8,3 +8,13 @@ pub trait N0de: 'static + Send { Self: Sized; fn shutdown(&mut self) -> impl Future> + Send; } + +#[macro_export] +macro_rules! export_node { + ($type:ty) => { + #[no_mangle] + pub extern "C" fn load_plugin() -> Box { + Box::new(<$type>::default()) + } + }; +} From 2d8e6f5d97a31f9e640813ffdd765334e3ca7a88 Mon Sep 17 00:00:00 2001 From: b5 Date: Thu, 5 Jun 2025 11:19:03 -0400 Subject: [PATCH 5/5] WIP: getting compilation to work --- Cargo.lock | 1 + Cargo.toml | 1 + src/n0des.rs | 16 +++++----------- 3 files changed, 7 insertions(+), 11 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 422072ef..ad7727b2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1937,6 +1937,7 @@ dependencies = [ "irpc-derive", "irpc-iroh", "n0-future", + "paste", "rand 0.8.5", "rcan", "serde", diff --git a/Cargo.toml b/Cargo.toml index db61b368..2c061ee6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,7 @@ iroh-blobs = "0.35" iroh-gossip = { version = "0.35", default-features = false } iroh-metrics = "0.34" n0-future = "0.1.2" +paste = "1.0.14" rand = "0.8" rcan = { git = "https://github.com/n0-computer/rcan", branch = "main" } serde = { version = "1.0.217", features = ["derive"] } diff --git a/src/n0des.rs b/src/n0des.rs index 2253c877..8118c28c 100644 --- a/src/n0des.rs +++ b/src/n0des.rs @@ -1,20 +1,14 @@ +use std::future::Future; + use anyhow::Result; use iroh::Endpoint; -use n0_future::Future; +/// A trait for nodes that can be spawned and shut down pub trait N0de: 'static + Send { fn spawn(endpoint: Endpoint) -> impl Future> + Send where Self: Sized; - fn shutdown(&mut self) -> impl Future> + Send; -} -#[macro_export] -macro_rules! export_node { - ($type:ty) => { - #[no_mangle] - pub extern "C" fn load_plugin() -> Box { - Box::new(<$type>::default()) - } - }; + /// Asynchronously shut down the node + fn shutdown(&mut self) -> impl Future> + Send; }