Skip to content

Commit 2323a0d

Browse files
feat(ipc): Implement IPC effects for sidecar communication
This commit introduces three new ActionEffects to facilitate robust Inter-Process Communication (IPC) with sidecar processes: * **ProxyCallToSideCar**: A high-level effect designed to route an entire RPC call (method and parameters) received by the host to another sidecar. It extracts the method and parameters from a `CallData` JSON object and forwards them using `SendRequestToSideCar`. * **SendNotificationToSideCar**: An effect for sending fire-and-forget notifications to a sidecar. It takes a target sidecar ID, a method name, and parameters, and executes the send operation without awaiting a response. * **SendRequestToSideCar**: A core effect for performing request-response RPC calls to a sidecar. It accepts a target sidecar ID, method name, parameters, and a timeout, then awaits and returns the JSON response from the sidecar. These effects abstract the details of the underlying IPC mechanism (e.g., gRPC) by utilizing a shared `IPCProvider` capability, promoting modularity and testability within the application's architecture.
1 parent c52cd6b commit 2323a0d

File tree

3 files changed

+170
-0
lines changed

3 files changed

+170
-0
lines changed

Source/IPC/ProxyCallToSideCar.rs

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
//! # ProxyCallToSideCar Effect
2+
//!
3+
//! Defines the `ActionEffect` for proxying a generic RPC call to a sidecar
4+
//! process.
5+
6+
use std::sync::Arc;
7+
8+
use serde_json::Value;
9+
10+
use super::IPCProvider::IPCProvider;
11+
use crate::{Effect::ActionEffect::ActionEffect, Error::CommonError::CommonError};
12+
13+
/// Creates an effect that proxies an RPC call to a specified target sidecar.
14+
///
15+
/// This is a powerful utility for scenarios where the host application
16+
/// (`Mountain`) needs to act as a router, forwarding a request it received from
17+
/// one process to another without needing to understand the request's content.
18+
/// The entire call payload is encapsulated within the `CallData` object.
19+
///
20+
/// # Parameters
21+
///
22+
/// * `TargetSideCarIdentifier`: The unique ID of the sidecar to which the call
23+
/// should be proxied.
24+
/// * `CallData`: A JSON `Value` expected to be an object containing `{"Method":
25+
/// "...", "Parameters": ...}`.
26+
///
27+
/// # Returns
28+
///
29+
/// An `ActionEffect` that resolves with the JSON `Value` returned by the
30+
/// target sidecar.
31+
pub fn ProxyCallToSideCar(
32+
TargetSideCarIdentifier:String,
33+
34+
CallData:Value,
35+
) -> ActionEffect<Arc<dyn IPCProvider>, CommonError, Value> {
36+
ActionEffect::New(Arc::new(move |Provider:Arc<dyn IPCProvider>| {
37+
let TargetIdentifierClone = TargetSideCarIdentifier.clone();
38+
39+
let CallDataClone = CallData.clone();
40+
41+
Box::pin(async move {
42+
let MethodString = CallDataClone
43+
.get("Method")
44+
.and_then(Value::as_str)
45+
.ok_or_else(|| {
46+
CommonError::InvalidArgument {
47+
ArgumentName:"CallData.Method".to_string(),
48+
49+
Reason:"Expected a 'Method' string field in CallData for proxying.".to_string(),
50+
}
51+
})?
52+
.to_string();
53+
54+
let ParametersValue = CallDataClone.get("Parameters").cloned().unwrap_or(Value::Null);
55+
56+
// Using a default timeout here; a real implementation might make this
57+
// configurable by extracting it from the CallData payload.
58+
let DefaultTimeoutMilliseconds = 30000;
59+
60+
Provider
61+
.SendRequestToSideCar(TargetIdentifierClone, MethodString, ParametersValue, DefaultTimeoutMilliseconds)
62+
.await
63+
})
64+
}))
65+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
//! # SendNotificationToSideCar Effect
2+
//!
3+
//! Defines the `ActionEffect` for sending a fire-and-forget notification to a
4+
//! sidecar process.
5+
6+
use std::sync::Arc;
7+
8+
use serde_json::Value;
9+
10+
use super::IPCProvider::IPCProvider;
11+
use crate::{Effect::ActionEffect::ActionEffect, Error::CommonError::CommonError};
12+
13+
/// Creates an effect that, when executed, will send a fire-and-forget
14+
/// notification to a specified sidecar process.
15+
///
16+
/// It uses the `IPCProvider` capability from the environment to perform the
17+
/// actual IPC send operation. Unlike `SendRequestToSideCar`, this effect does
18+
/// not wait for or expect a response.
19+
///
20+
/// # Parameters
21+
///
22+
/// * `SideCarIdentifier`: The unique ID of the target sidecar process.
23+
/// * `Method`: The name of the notification method to be invoked on the
24+
/// sidecar.
25+
/// * `Parameters`: A `serde_json::Value` containing the parameters for the
26+
/// notification.
27+
///
28+
/// # Returns
29+
///
30+
/// An `ActionEffect` that resolves to `()` on success.
31+
pub fn SendNotificationToSideCar(
32+
SideCarIdentifier:String,
33+
34+
Method:String,
35+
36+
Parameters:Value,
37+
) -> ActionEffect<Arc<dyn IPCProvider>, CommonError, ()> {
38+
ActionEffect::New(Arc::new(move |Provider:Arc<dyn IPCProvider>| {
39+
let SideCarIdentifierClone = SideCarIdentifier.clone();
40+
41+
let MethodClone = Method.clone();
42+
43+
let ParametersClone = Parameters.clone();
44+
45+
Box::pin(async move {
46+
Provider
47+
.SendNotificationToSideCar(SideCarIdentifierClone, MethodClone, ParametersClone)
48+
.await
49+
})
50+
}))
51+
}

Source/IPC/SendRequestToSideCar.rs

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
//! # SendRequestToSideCar Effect
2+
//!
3+
//! Defines the `ActionEffect` for sending a request-response RPC call to a
4+
//! sidecar process.
5+
6+
use std::sync::Arc;
7+
8+
use serde_json::Value;
9+
10+
use super::IPCProvider::IPCProvider;
11+
use crate::{Effect::ActionEffect::ActionEffect, Error::CommonError::CommonError};
12+
13+
/// Creates an effect that, when executed, will send a request to a specified
14+
/// sidecar process and await its response.
15+
///
16+
/// It uses the `IPCProvider` capability from the environment to perform the
17+
/// actual IPC request operation over the underlying transport (e.g., gRPC).
18+
///
19+
/// # Parameters
20+
///
21+
/// * `SideCarIdentifier`: The unique ID of the target sidecar process.
22+
/// * `Method`: The name of the RPC method to be invoked on the sidecar.
23+
/// * `Parameters`: A `serde_json::Value` containing the parameters for the
24+
/// request.
25+
/// * `TimeoutMilliseconds`: The maximum time to wait for a response before
26+
/// failing.
27+
///
28+
/// # Returns
29+
///
30+
/// An `ActionEffect` that resolves with the `serde_json::Value` response from
31+
/// the sidecar.
32+
pub fn SendRequestToSideCar(
33+
SideCarIdentifier:String,
34+
35+
Method:String,
36+
37+
Parameters:Value,
38+
39+
TimeoutMilliseconds:u64,
40+
) -> ActionEffect<Arc<dyn IPCProvider>, CommonError, Value> {
41+
ActionEffect::New(Arc::new(move |Provider:Arc<dyn IPCProvider>| {
42+
let SideCarIdentifierClone = SideCarIdentifier.clone();
43+
44+
let MethodClone = Method.clone();
45+
46+
let ParametersClone = Parameters.clone();
47+
48+
Box::pin(async move {
49+
Provider
50+
.SendRequestToSideCar(SideCarIdentifierClone, MethodClone, ParametersClone, TimeoutMilliseconds)
51+
.await
52+
})
53+
}))
54+
}

0 commit comments

Comments
 (0)