|
| 1 | +//! HIRO **Action API** (`action-ws`) protocol core — the testable runtime |
| 2 | +//! binding for an OGAR-native ActionHandler (parity brick **B2**). |
| 3 | +//! |
| 4 | +//! This is the *protocol core* of the action-ws adapter — the typed messages |
| 5 | +//! plus the binding `submitAction → ActionInvocation → sendActionResult` — with |
| 6 | +//! **no live WebSocket and no command execution**. It is the deterministic, |
| 7 | +//! unit-tested heart that an outer transport (the live `tokio-tungstenite` loop) |
| 8 | +//! and the executor (parity brick **B1**, the `ExecTarget` runner) wrap. |
| 9 | +//! |
| 10 | +//! Source: the HIRO 7 Action API tutorial (`tutorial-action-handler-action-api`), |
| 11 | +//! transcribed verbatim in `docs/ARAGO-ACTIONHANDLER-PARITY.md` §2. The lifecycle |
| 12 | +//! |
| 13 | +//! ```text |
| 14 | +//! engine ──submitAction──► handler ──acknowledged{200}──► engine |
| 15 | +//! handler (execute) |
| 16 | +//! handler ──sendActionResult──► engine ──acknowledged──► |
| 17 | +//! ``` |
| 18 | +//! |
| 19 | +//! maps field-for-field onto OGAR's [`ActionInvocation`] Rubicon lifecycle |
| 20 | +//! (`Pending → Committed`): `submitAction` builds a `Pending` invocation |
| 21 | +//! ([`submit_to_invocation`]); the engine's final ack is the Lance commit; a |
| 22 | +//! `Committed` invocation yields the `sendActionResult` ([`invocation_to_result`]). |
| 23 | +//! Parameter binding ([`bind_parameters`]) validates the engine's `parameters` |
| 24 | +//! against the capability's [`ActionParam`] signature — the same check arago's |
| 25 | +//! Python handler performs before executing. |
| 26 | +
|
| 27 | +#[cfg(feature = "serde")] |
| 28 | +use serde::{Deserialize, Serialize}; |
| 29 | + |
| 30 | +use ogar_vocab::{ |
| 31 | + ActionDef, ActionInvocation, ActionState, ActionSubject, LokalSpec, ModalSpec, TemporalSpec, |
| 32 | +}; |
| 33 | + |
| 34 | +use crate::do_arm::ActionParam; |
| 35 | + |
| 36 | +/// A `submitAction` message (engine → handler). The engine asks the handler to |
| 37 | +/// run `capability` on a target with the supplied `parameters`. |
| 38 | +#[derive(Debug, Clone, PartialEq, Eq, Default)] |
| 39 | +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] |
| 40 | +pub struct SubmitAction { |
| 41 | + /// Correlation id — `"$appId:$requestId"`. Carried through to the result. |
| 42 | + pub id: String, |
| 43 | + /// The capability requested (e.g. `"ExecuteCommand"`) — must match an |
| 44 | + /// [`ActionDef::predicate`]. |
| 45 | + pub capability: String, |
| 46 | + /// The handler id this action is routed to. |
| 47 | + pub handler: String, |
| 48 | + /// The instance scope (tenant). |
| 49 | + pub scope: Option<String>, |
| 50 | + /// The action inputs (`{host, command, user, …}`), as `(key, value)` pairs. |
| 51 | + pub parameters: Vec<(String, String)>, |
| 52 | + /// Per-action SLA in milliseconds. |
| 53 | + pub timeout_millis: Option<i64>, |
| 54 | +} |
| 55 | + |
| 56 | +/// An `acknowledged` message (either direction): receipt confirmation. |
| 57 | +#[derive(Debug, Clone, PartialEq, Eq)] |
| 58 | +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] |
| 59 | +pub struct Acknowledged { |
| 60 | + /// The id of the message being acknowledged. |
| 61 | + pub id: String, |
| 62 | + /// Status code (`200` on success). |
| 63 | + pub code: u16, |
| 64 | + /// Human-readable note. |
| 65 | + pub message: String, |
| 66 | +} |
| 67 | + |
| 68 | +/// A `sendActionResult` message (handler → engine): the outcome payload — the |
| 69 | +/// capability's `resultParameters` as `(key, value)` pairs. |
| 70 | +#[derive(Debug, Clone, PartialEq, Eq, Default)] |
| 71 | +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] |
| 72 | +pub struct SendActionResult { |
| 73 | + /// The same correlation id as the originating [`SubmitAction`]. |
| 74 | + pub id: String, |
| 75 | + /// The result fields (the `resultParameters` output signature, bound). |
| 76 | + pub result: Vec<(String, String)>, |
| 77 | +} |
| 78 | + |
| 79 | +/// Errors in the protocol binding (the pure core — no I/O errors here). |
| 80 | +#[derive(Debug, Clone, PartialEq, Eq)] |
| 81 | +#[non_exhaustive] |
| 82 | +pub enum ActionWsError { |
| 83 | + /// `submitAction.capability` does not match the [`ActionDef`] it was routed to. |
| 84 | + CapabilityMismatch { |
| 85 | + /// The def's predicate. |
| 86 | + expected: String, |
| 87 | + /// The submitAction's capability. |
| 88 | + got: String, |
| 89 | + }, |
| 90 | + /// A mandatory parameter of the capability signature was not supplied and |
| 91 | + /// has no default. |
| 92 | + MissingMandatoryParam(String), |
| 93 | + /// A result was requested from an invocation that has not reached |
| 94 | + /// [`ActionState::Committed`] (the Rubicon crossing). |
| 95 | + NotCommitted(ActionState), |
| 96 | +} |
| 97 | + |
| 98 | +impl core::fmt::Display for ActionWsError { |
| 99 | + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { |
| 100 | + match self { |
| 101 | + Self::CapabilityMismatch { expected, got } => { |
| 102 | + write!( |
| 103 | + f, |
| 104 | + "capability mismatch: def expects `{expected}`, got `{got}`" |
| 105 | + ) |
| 106 | + } |
| 107 | + Self::MissingMandatoryParam(p) => write!(f, "missing mandatory parameter `{p}`"), |
| 108 | + Self::NotCommitted(s) => write!(f, "invocation not committed (state = {s:?})"), |
| 109 | + } |
| 110 | + } |
| 111 | +} |
| 112 | + |
| 113 | +impl std::error::Error for ActionWsError {} |
| 114 | + |
| 115 | +/// The handler's immediate receipt acknowledgement (code 200), echoing the |
| 116 | +/// action's `id`. Sent before execution; the engine re-sends `submitAction` |
| 117 | +/// until this arrives (at-least-once → idempotency). |
| 118 | +#[must_use] |
| 119 | +pub fn acknowledge(msg: &SubmitAction) -> Acknowledged { |
| 120 | + Acknowledged { |
| 121 | + id: msg.id.clone(), |
| 122 | + code: 200, |
| 123 | + message: "Received the action".to_owned(), |
| 124 | + } |
| 125 | +} |
| 126 | + |
| 127 | +/// Bind the engine-supplied `parameters` to the capability's [`ActionParam`] |
| 128 | +/// signature: every mandatory param must be supplied (or have a default); |
| 129 | +/// optional params fall back to their default when present, and are dropped |
| 130 | +/// when absent. Returns the bound `(name, value)` set in signature order — the |
| 131 | +/// same validation arago's handler runs before executing the `Command`. |
| 132 | +/// |
| 133 | +/// # Errors |
| 134 | +/// |
| 135 | +/// [`ActionWsError::MissingMandatoryParam`] if a mandatory param is neither |
| 136 | +/// supplied nor defaulted. |
| 137 | +pub fn bind_parameters( |
| 138 | + supplied: &[(String, String)], |
| 139 | + signature: &[ActionParam], |
| 140 | +) -> Result<Vec<(String, String)>, ActionWsError> { |
| 141 | + let mut bound = Vec::with_capacity(signature.len()); |
| 142 | + for p in signature { |
| 143 | + if let Some((_, v)) = supplied.iter().find(|(k, _)| k == &p.name) { |
| 144 | + bound.push((p.name.clone(), v.clone())); |
| 145 | + } else if let Some(default) = &p.default { |
| 146 | + bound.push((p.name.clone(), default.clone())); |
| 147 | + } else if p.mandatory { |
| 148 | + return Err(ActionWsError::MissingMandatoryParam(p.name.clone())); |
| 149 | + } |
| 150 | + // optional + absent + no default → omitted |
| 151 | + } |
| 152 | + Ok(bound) |
| 153 | +} |
| 154 | + |
| 155 | +/// The target node an action acts on — arago routes by the `host` parameter |
| 156 | +/// (the MARS node); fall back to the handler id when absent. |
| 157 | +fn target_node(msg: &SubmitAction) -> String { |
| 158 | + msg.parameters |
| 159 | + .iter() |
| 160 | + .find(|(k, _)| k == "host" || k == "node") |
| 161 | + .map(|(_, v)| v.clone()) |
| 162 | + .unwrap_or_else(|| msg.handler.clone()) |
| 163 | +} |
| 164 | + |
| 165 | +/// Build a **`Pending`** [`ActionInvocation`] from a `submitAction`, realizing |
| 166 | +/// `def` (whose [`predicate`](ActionDef::predicate) must equal the action's |
| 167 | +/// `capability`). This is the `submitAction → ActionInvocation` half of the |
| 168 | +/// lifecycle; the invocation then passes through the RBAC/guard/MUL gate |
| 169 | +/// (`commit_via` in `lance-graph-contract`) before reaching `Committed`. |
| 170 | +/// |
| 171 | +/// Field mapping (`docs/ARAGO-ACTIONHANDLER-PARITY.md` §2): |
| 172 | +/// `capability`→`def.predicate`, `id`→`idempotency_key`, `handler`→`lokal.actor`, |
| 173 | +/// `scope`→`lokal.tenant`, target node→`object_instance`. Automation defaults: |
| 174 | +/// `subject = System`, `temporal = Deferred`, `modal = Idempotent` (at-least-once). |
| 175 | +/// |
| 176 | +/// # Errors |
| 177 | +/// |
| 178 | +/// [`ActionWsError::CapabilityMismatch`] if `msg.capability != def.predicate`. |
| 179 | +pub fn submit_to_invocation( |
| 180 | + msg: &SubmitAction, |
| 181 | + def: &ActionDef, |
| 182 | +) -> Result<ActionInvocation, ActionWsError> { |
| 183 | + if msg.capability != def.predicate { |
| 184 | + return Err(ActionWsError::CapabilityMismatch { |
| 185 | + expected: def.predicate.clone(), |
| 186 | + got: msg.capability.clone(), |
| 187 | + }); |
| 188 | + } |
| 189 | + let object_instance = target_node(msg); |
| 190 | + let identity = format!("{}::invocation::{}", def.object_class, msg.id); |
| 191 | + let mut inv = ActionInvocation::new(identity, def.identity.clone(), object_instance); |
| 192 | + inv.subject = ActionSubject::System; |
| 193 | + inv.temporal = TemporalSpec::Deferred; |
| 194 | + inv.modal = ModalSpec::Idempotent; |
| 195 | + inv.state = ActionState::Pending; |
| 196 | + inv.idempotency_key = Some(msg.id.clone()); |
| 197 | + // LokalSpec is #[non_exhaustive] — build via Default + field set, not a literal. |
| 198 | + let mut lokal = LokalSpec::default(); |
| 199 | + lokal.actor = Some(msg.handler.clone()); |
| 200 | + lokal.tenant = msg.scope.clone(); |
| 201 | + inv.lokal = lokal; |
| 202 | + Ok(inv) |
| 203 | +} |
| 204 | + |
| 205 | +/// Build the `sendActionResult` from a **`Committed`** invocation plus the |
| 206 | +/// executor's result payload (the bound `resultParameters`). Only a committed |
| 207 | +/// invocation (the Rubicon crossing) yields a result — a `Pending` / `Failed` / |
| 208 | +/// `Cancelled` invocation has nothing to report on the success path. |
| 209 | +/// |
| 210 | +/// # Errors |
| 211 | +/// |
| 212 | +/// [`ActionWsError::NotCommitted`] if the invocation has not reached |
| 213 | +/// [`ActionState::Committed`]. |
| 214 | +pub fn invocation_to_result( |
| 215 | + inv: &ActionInvocation, |
| 216 | + result: Vec<(String, String)>, |
| 217 | +) -> Result<SendActionResult, ActionWsError> { |
| 218 | + if inv.state != ActionState::Committed { |
| 219 | + return Err(ActionWsError::NotCommitted(inv.state)); |
| 220 | + } |
| 221 | + Ok(SendActionResult { |
| 222 | + id: inv.idempotency_key.clone().unwrap_or_default(), |
| 223 | + result, |
| 224 | + }) |
| 225 | +} |
| 226 | + |
| 227 | +// ───────────────────────────────────────────────────────────── tests ── |
| 228 | +// |
| 229 | +// The pure protocol core: the full submitAction → bind → invocation(Pending) |
| 230 | +// → (Committed) → sendActionResult flow, deterministic and socket-free. |
| 231 | + |
| 232 | +#[cfg(test)] |
| 233 | +mod tests { |
| 234 | + use super::*; |
| 235 | + |
| 236 | + /// An ExecuteCommand-shaped capability signature (the arago SSH handler): |
| 237 | + /// mandatory `command`, optional `timeout` defaulting to `60000`. |
| 238 | + fn execute_command_signature() -> Vec<ActionParam> { |
| 239 | + vec![ |
| 240 | + ActionParam { |
| 241 | + name: "command".to_owned(), |
| 242 | + mandatory: true, |
| 243 | + default: None, |
| 244 | + }, |
| 245 | + ActionParam { |
| 246 | + name: "timeout".to_owned(), |
| 247 | + mandatory: false, |
| 248 | + default: Some("60000".to_owned()), |
| 249 | + }, |
| 250 | + ] |
| 251 | + } |
| 252 | + |
| 253 | + fn execute_command_def() -> ActionDef { |
| 254 | + ActionDef::new( |
| 255 | + "ogit-automation/action_capability::action_def::ExecuteCommand", |
| 256 | + "ExecuteCommand", |
| 257 | + "ogit-automation/mars_machine", |
| 258 | + ) |
| 259 | + } |
| 260 | + |
| 261 | + fn submit() -> SubmitAction { |
| 262 | + SubmitAction { |
| 263 | + id: "app1:req42".to_owned(), |
| 264 | + capability: "ExecuteCommand".to_owned(), |
| 265 | + handler: "handler-7".to_owned(), |
| 266 | + scope: Some("tenant-A".to_owned()), |
| 267 | + parameters: vec![ |
| 268 | + ("host".to_owned(), "node-9".to_owned()), |
| 269 | + ("command".to_owned(), "uptime".to_owned()), |
| 270 | + ], |
| 271 | + timeout_millis: Some(60_000), |
| 272 | + } |
| 273 | + } |
| 274 | + |
| 275 | + #[test] |
| 276 | + fn acknowledge_echoes_id_with_200() { |
| 277 | + let ack = acknowledge(&submit()); |
| 278 | + assert_eq!(ack.id, "app1:req42"); |
| 279 | + assert_eq!(ack.code, 200); |
| 280 | + } |
| 281 | + |
| 282 | + #[test] |
| 283 | + fn bind_parameters_fills_default_and_keeps_supplied() { |
| 284 | + let bound = |
| 285 | + bind_parameters(&submit().parameters, &execute_command_signature()).expect("binds"); |
| 286 | + // `command` supplied, `timeout` defaulted; signature order preserved. |
| 287 | + assert_eq!( |
| 288 | + bound, |
| 289 | + vec![ |
| 290 | + ("command".to_owned(), "uptime".to_owned()), |
| 291 | + ("timeout".to_owned(), "60000".to_owned()), |
| 292 | + ] |
| 293 | + ); |
| 294 | + } |
| 295 | + |
| 296 | + #[test] |
| 297 | + fn bind_parameters_rejects_missing_mandatory() { |
| 298 | + let supplied = vec![("timeout".to_owned(), "5".to_owned())]; |
| 299 | + let err = bind_parameters(&supplied, &execute_command_signature()).unwrap_err(); |
| 300 | + assert_eq!( |
| 301 | + err, |
| 302 | + ActionWsError::MissingMandatoryParam("command".to_owned()) |
| 303 | + ); |
| 304 | + } |
| 305 | + |
| 306 | + #[test] |
| 307 | + fn submit_builds_pending_invocation_with_provenance() { |
| 308 | + let inv = submit_to_invocation(&submit(), &execute_command_def()).expect("builds"); |
| 309 | + assert_eq!(inv.state, ActionState::Pending); |
| 310 | + assert_eq!(inv.object_instance, "node-9"); // routed by the `host` param |
| 311 | + assert_eq!(inv.idempotency_key.as_deref(), Some("app1:req42")); |
| 312 | + assert_eq!(inv.action_def, execute_command_def().identity); |
| 313 | + assert_eq!(inv.lokal.actor.as_deref(), Some("handler-7")); |
| 314 | + assert_eq!(inv.lokal.tenant.as_deref(), Some("tenant-A")); |
| 315 | + assert!(matches!(inv.modal, ModalSpec::Idempotent)); |
| 316 | + } |
| 317 | + |
| 318 | + #[test] |
| 319 | + fn submit_rejects_capability_mismatch() { |
| 320 | + let mut bad = submit(); |
| 321 | + bad.capability = "RunScript".to_owned(); |
| 322 | + let err = submit_to_invocation(&bad, &execute_command_def()).unwrap_err(); |
| 323 | + assert_eq!( |
| 324 | + err, |
| 325 | + ActionWsError::CapabilityMismatch { |
| 326 | + expected: "ExecuteCommand".to_owned(), |
| 327 | + got: "RunScript".to_owned(), |
| 328 | + } |
| 329 | + ); |
| 330 | + } |
| 331 | + |
| 332 | + #[test] |
| 333 | + fn committed_invocation_yields_result_pending_does_not() { |
| 334 | + let mut inv = submit_to_invocation(&submit(), &execute_command_def()).expect("builds"); |
| 335 | + |
| 336 | + // Pending → no result on the success path. |
| 337 | + let pending = invocation_to_result(&inv, vec![]); |
| 338 | + assert_eq!( |
| 339 | + pending.unwrap_err(), |
| 340 | + ActionWsError::NotCommitted(ActionState::Pending) |
| 341 | + ); |
| 342 | + |
| 343 | + // The Rubicon crossing (the gate would set this) → result emitted. |
| 344 | + inv.state = ActionState::Committed; |
| 345 | + let result = invocation_to_result( |
| 346 | + &inv, |
| 347 | + vec![("output".to_owned(), "12:00 up 3 days".to_owned())], |
| 348 | + ) |
| 349 | + .expect("committed → result"); |
| 350 | + assert_eq!(result.id, "app1:req42"); // correlation id round-trips |
| 351 | + assert_eq!( |
| 352 | + result.result, |
| 353 | + vec![("output".to_owned(), "12:00 up 3 days".to_owned())] |
| 354 | + ); |
| 355 | + } |
| 356 | + |
| 357 | + /// The whole loop, end-to-end (socket-free): submit → ack → bind → invoke |
| 358 | + /// → commit → result, with the `id` correlating throughout. |
| 359 | + #[test] |
| 360 | + fn full_action_ws_roundtrip() { |
| 361 | + let msg = submit(); |
| 362 | + let def = execute_command_def(); |
| 363 | + |
| 364 | + let ack = acknowledge(&msg); |
| 365 | + assert_eq!(ack.code, 200); |
| 366 | + |
| 367 | + let _bound = bind_parameters(&msg.parameters, &execute_command_signature()).expect("bind"); |
| 368 | + |
| 369 | + let mut inv = submit_to_invocation(&msg, &def).expect("invoke"); |
| 370 | + assert_eq!(inv.state, ActionState::Pending); |
| 371 | + |
| 372 | + // (the executor + commit_via gate run here; we simulate the crossing) |
| 373 | + inv.state = ActionState::Committed; |
| 374 | + |
| 375 | + let result = invocation_to_result(&inv, vec![("exitcode".to_owned(), "0".to_owned())]) |
| 376 | + .expect("result"); |
| 377 | + assert_eq!(result.id, msg.id); |
| 378 | + } |
| 379 | +} |
0 commit comments