Skip to content

Commit acdc818

Browse files
sanil-23 and claude authored
fix(voice): atomic install-start guard for Whisper/Piper install RPCs (tinyhumansai#1787)
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent b778433 commit acdc818

4 files changed

Lines changed: 406 additions & 64 deletions

File tree

src/openhuman/composio/client.rs

Lines changed: 32 additions & 40 deletions
Original file line number | Diff line number | Diff line change
@@ -150,6 +150,38 @@ impl ComposioClient {
150150

151151
// ── Execute ─────────────────────────────────────────────────────
152152

153+
/// `POST /agent-integrations/composio/execute` — single, non-retrying
154+
/// HTTP round-trip. Use this when the caller owns the retry loop
155+
/// (e.g. `auth_retry`) to avoid double-retry.
156+
pub(crate) async fn execute_tool_once(
157+
&self,
158+
tool: &str,
159+
arguments: Option<serde_json::Value>,
160+
) -> Result<ComposioExecuteResponse> {
161+
let tool = tool.trim();
162+
if tool.is_empty() {
163+
anyhow::bail!("composio.execute_tool_once: tool slug must not be empty");
164+
}
165+
let arguments = arguments.unwrap_or(serde_json::Value::Object(Default::default()));
166+
tracing::debug!(tool = %tool, "[composio] execute_tool_once (no built-in retry)");
167+
let body = json!({ "tool": tool, "arguments": arguments });
168+
let result = self.post_execute_tool(&body).await;
169+
match &result {
170+
Ok(resp) => tracing::debug!(
171+
tool = %tool,
172+
successful = resp.successful,
173+
has_error = resp.error.is_some(),
174+
"[composio] execute_tool_once completed"
175+
),
176+
Err(err) => tracing::error!(
177+
tool = %tool,
178+
error = %err,
179+
"[composio] execute_tool_once failed"
180+
),
181+
}
182+
result
183+
}
184+
153185
/// `POST /agent-integrations/composio/execute` — run a Composio
154186
/// action and return the provider result + cost.
155187
pub async fn execute_tool(
@@ -233,46 +265,6 @@ impl ComposioClient {
233265
.await
234266
}
235267

236-
/// Single-shot `execute_tool` — same body construction and slug validation
237-
/// as [`Self::execute_tool`], but **without** the inner post-OAuth retry
238-
/// that [`Self::execute_tool_with_post_oauth_retry`] performs. Reserved
239-
/// for callers that already own a higher-level retry policy and would
240-
/// otherwise stack two retry layers (4 hits to the gateway instead of 2).
241-
/// In particular, [`super::auth_retry::execute_with_auth_retry`] uses
242-
/// this entry point so its `must retry exactly once` contract still
243-
/// holds after PR #1707 introduced the inner retry.
244-
pub(crate) async fn execute_tool_once(
245-
&self,
246-
tool: &str,
247-
arguments: Option<serde_json::Value>,
248-
) -> Result<ComposioExecuteResponse> {
249-
let tool = tool.trim();
250-
if tool.is_empty() {
251-
anyhow::bail!("composio.execute_tool_once: tool slug must not be empty");
252-
}
253-
let arguments = arguments.unwrap_or(serde_json::Value::Object(Default::default()));
254-
tracing::debug!(
255-
tool = %tool,
256-
"[composio] execute_tool_once start"
257-
);
258-
let body = json!({ "tool": tool, "arguments": arguments });
259-
let result = self.post_execute_tool(&body).await;
260-
match &result {
261-
Ok(resp) => tracing::debug!(
262-
tool = %tool,
263-
successful = resp.successful,
264-
has_error = resp.error.is_some(),
265-
"[composio] execute_tool_once completed"
266-
),
267-
Err(err) => tracing::debug!(
268-
tool = %tool,
269-
error = %err,
270-
"[composio] execute_tool_once failed"
271-
),
272-
}
273-
result
274-
}
275-
276268
/// `GET /agent-integrations/composio/github/repos` — list repositories
277269
/// available via the user's authorized GitHub connected account.
278270
pub async fn list_github_repos(

src/openhuman/local_ai/schemas.rs

Lines changed: 46 additions & 22 deletions
Original file line number | Diff line number | Diff line change
@@ -1036,20 +1036,29 @@ fn handle_local_ai_install_whisper(params: Map<String, Value>) -> ControllerFutu
10361036
let config = config_rpc::load_config_with_timeout().await?;
10371037
let force = p.force.unwrap_or(false);
10381038

1039-
// Idempotency: a duplicate click while an install is already in
1040-
// flight should be a no-op, not a second concurrent download.
1041-
let current = crate::openhuman::local_ai::voice_install_common::read_status(
1039+
// Atomic install-start guard. A duplicate click while an install
1040+
// is already in flight (or a parallel auto-install firing
1041+
// alongside a manual click) must be a no-op — not a second
1042+
// concurrent download racing on the same `.part` file inside
1043+
// `download_to_file`. The previous read_status -> check ->
1044+
// write_status sequence was non-atomic and let two callers slip
1045+
// through; `try_acquire_install_slot` does the check-and-claim
1046+
// under a single mutex acquisition.
1047+
let slot = match crate::openhuman::local_ai::voice_install_common::try_acquire_install_slot(
10421048
crate::openhuman::local_ai::voice_install_common::ENGINE_WHISPER,
1043-
);
1044-
if current.state
1045-
== crate::openhuman::local_ai::voice_install_common::VoiceInstallState::Installing
1046-
{
1047-
tracing::debug!(
1048-
"[voice-install:whisper] already installing — returning current status"
1049-
);
1050-
return serde_json::to_value(current)
1051-
.map_err(|e| format!("serialize whisper status: {e}"));
1052-
}
1049+
) {
1050+
Some(slot) => slot,
1051+
None => {
1052+
tracing::debug!(
1053+
"[voice-install:whisper] slot already held — returning current status"
1054+
);
1055+
let current = crate::openhuman::local_ai::voice_install_common::read_status(
1056+
crate::openhuman::local_ai::voice_install_common::ENGINE_WHISPER,
1057+
);
1058+
return serde_json::to_value(current)
1059+
.map_err(|e| format!("serialize whisper status: {e}"));
1060+
}
1061+
};
10531062

10541063
// Mark "installing" before the spawn so the very next status poll
10551064
// (≤ 2s away) reflects the new state without a stale read.
@@ -1073,7 +1082,12 @@ fn handle_local_ai_install_whisper(params: Map<String, Value>) -> ControllerFutu
10731082
"[voice-install:whisper] spawning background install"
10741083
);
10751084
let model_size = p.model_size.clone();
1085+
// Move the slot into the spawned task so it lives for the actual
1086+
// install duration (download + extract + validate), not just the
1087+
// RPC handler's lifetime. The slot's Drop releases the
1088+
// single-writer guard on task exit, including via panic.
10761089
tokio::spawn(async move {
1090+
let _slot = slot;
10771091
if let Err(e) = crate::openhuman::local_ai::install_whisper::install_whisper(
10781092
&config, model_size, force,
10791093
)
@@ -1096,16 +1110,23 @@ fn handle_local_ai_install_piper(params: Map<String, Value>) -> ControllerFuture
10961110
let config = config_rpc::load_config_with_timeout().await?;
10971111
let force = p.force.unwrap_or(false);
10981112

1099-
let current = crate::openhuman::local_ai::voice_install_common::read_status(
1113+
// See the whisper handler above for why this is an atomic slot
1114+
// acquisition rather than a read_status / write_status pair.
1115+
let slot = match crate::openhuman::local_ai::voice_install_common::try_acquire_install_slot(
11001116
crate::openhuman::local_ai::voice_install_common::ENGINE_PIPER,
1101-
);
1102-
if current.state
1103-
== crate::openhuman::local_ai::voice_install_common::VoiceInstallState::Installing
1104-
{
1105-
tracing::debug!("[voice-install:piper] already installing — returning current status");
1106-
return serde_json::to_value(current)
1107-
.map_err(|e| format!("serialize piper status: {e}"));
1108-
}
1117+
) {
1118+
Some(slot) => slot,
1119+
None => {
1120+
tracing::debug!(
1121+
"[voice-install:piper] slot already held — returning current status"
1122+
);
1123+
let current = crate::openhuman::local_ai::voice_install_common::read_status(
1124+
crate::openhuman::local_ai::voice_install_common::ENGINE_PIPER,
1125+
);
1126+
return serde_json::to_value(current)
1127+
.map_err(|e| format!("serialize piper status: {e}"));
1128+
}
1129+
};
11091130

11101131
crate::openhuman::local_ai::voice_install_common::write_status(
11111132
crate::openhuman::local_ai::voice_install_common::VoiceInstallStatus {
@@ -1126,7 +1147,10 @@ fn handle_local_ai_install_piper(params: Map<String, Value>) -> ControllerFuture
11261147
"[voice-install:piper] spawning background install"
11271148
);
11281149
let voice_id = p.voice_id.clone();
1150+
// Move the slot into the spawned task — same rationale as the
1151+
// whisper handler.
11291152
tokio::spawn(async move {
1153+
let _slot = slot;
11301154
if let Err(e) =
11311155
crate::openhuman::local_ai::install_piper::install_piper(&config, voice_id, force)
11321156
.await

src/openhuman/local_ai/schemas_tests.rs

Lines changed: 127 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -269,3 +269,130 @@ async fn handle_set_ollama_path_accepts_empty_string_to_clear() {
269269
std::env::remove_var("OPENHUMAN_WORKSPACE");
270270
}
271271
}
272+
273+
/// Regression test for the CodeRabbit #7 race on PR #1755: when two
274+
/// concurrent RPC calls (e.g. a double-click, or the auto-install firing
275+
/// alongside a manual click) hit `handle_local_ai_install_whisper` at
276+
/// the same time, only one of them must spawn a real install task. The
277+
/// other must short-circuit and return the in-flight status without
278+
/// starting a second download that would race on the same `.part` file.
279+
///
280+
/// We exercise the actual handler — not just the slot primitive — so
281+
/// the wiring at the call site is also covered.
282+
#[tokio::test]
283+
async fn install_whisper_handler_serializes_concurrent_calls() {
284+
let _g = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
285+
let tmp = TempDir::new().unwrap();
286+
unsafe {
287+
std::env::set_var("OPENHUMAN_WORKSPACE", tmp.path());
288+
}
289+
290+
// Pre-acquire the install slot from the test so we're guaranteed to
291+
// observe the "already in flight" code path. Holding the slot here
292+
// also means the handler under test will short-circuit immediately
293+
// rather than spawning a real install task that would try to hit
294+
// the network in CI.
295+
let slot = crate::openhuman::local_ai::voice_install_common::try_acquire_install_slot(
296+
crate::openhuman::local_ai::voice_install_common::ENGINE_WHISPER,
297+
)
298+
.expect("test should be able to claim the slot first");
299+
300+
// Mark the status table as `Installing` so the handler's
301+
// short-circuit branch (which reads current status to return) sees
302+
// a coherent snapshot.
303+
crate::openhuman::local_ai::voice_install_common::write_status(
304+
crate::openhuman::local_ai::voice_install_common::VoiceInstallStatus {
305+
engine: crate::openhuman::local_ai::voice_install_common::ENGINE_WHISPER.to_string(),
306+
state: crate::openhuman::local_ai::voice_install_common::VoiceInstallState::Installing,
307+
progress: Some(0),
308+
downloaded_bytes: None,
309+
total_bytes: None,
310+
stage: Some("queued".to_string()),
311+
error_detail: None,
312+
},
313+
);
314+
315+
// Fire two handler calls in parallel. Both must succeed and both
316+
// must return the existing `Installing` status — neither must
317+
// mutate or re-spawn. This is exactly the double-click / auto-fire
318+
// shape described in CodeRabbit #7.
319+
let (r1, r2) = tokio::join!(
320+
handle_local_ai_install_whisper(Map::new()),
321+
handle_local_ai_install_whisper(Map::new())
322+
);
323+
324+
unsafe {
325+
std::env::remove_var("OPENHUMAN_WORKSPACE");
326+
}
327+
drop(slot);
328+
// Clean up so other tests see Missing.
329+
crate::openhuman::local_ai::voice_install_common::reset_status(
330+
crate::openhuman::local_ai::voice_install_common::ENGINE_WHISPER,
331+
);
332+
333+
let v1 = r1.expect("first call ok");
334+
let v2 = r2.expect("second call ok");
335+
// Both calls must report the engine is already installing — proving
336+
// the handler short-circuited rather than running the spawn path.
337+
for (label, v) in [("first", &v1), ("second", &v2)] {
338+
let state = v.get("state").and_then(|s| s.as_str());
339+
assert_eq!(
340+
state,
341+
Some("installing"),
342+
"{label} concurrent call should see Installing, got {v:?}"
343+
);
344+
}
345+
}
346+
347+
/// Same regression for Piper. The two handlers share the slot
348+
/// infrastructure but live in separate code paths, so the wiring needs
349+
/// independent coverage.
350+
#[tokio::test]
351+
async fn install_piper_handler_serializes_concurrent_calls() {
352+
let _g = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
353+
let tmp = TempDir::new().unwrap();
354+
unsafe {
355+
std::env::set_var("OPENHUMAN_WORKSPACE", tmp.path());
356+
}
357+
358+
let slot = crate::openhuman::local_ai::voice_install_common::try_acquire_install_slot(
359+
crate::openhuman::local_ai::voice_install_common::ENGINE_PIPER,
360+
)
361+
.expect("test should be able to claim the slot first");
362+
363+
crate::openhuman::local_ai::voice_install_common::write_status(
364+
crate::openhuman::local_ai::voice_install_common::VoiceInstallStatus {
365+
engine: crate::openhuman::local_ai::voice_install_common::ENGINE_PIPER.to_string(),
366+
state: crate::openhuman::local_ai::voice_install_common::VoiceInstallState::Installing,
367+
progress: Some(0),
368+
downloaded_bytes: None,
369+
total_bytes: None,
370+
stage: Some("queued".to_string()),
371+
error_detail: None,
372+
},
373+
);
374+
375+
let (r1, r2) = tokio::join!(
376+
handle_local_ai_install_piper(Map::new()),
377+
handle_local_ai_install_piper(Map::new())
378+
);
379+
380+
unsafe {
381+
std::env::remove_var("OPENHUMAN_WORKSPACE");
382+
}
383+
drop(slot);
384+
crate::openhuman::local_ai::voice_install_common::reset_status(
385+
crate::openhuman::local_ai::voice_install_common::ENGINE_PIPER,
386+
);
387+
388+
let v1 = r1.expect("first call ok");
389+
let v2 = r2.expect("second call ok");
390+
for (label, v) in [("first", &v1), ("second", &v2)] {
391+
let state = v.get("state").and_then(|s| s.as_str());
392+
assert_eq!(
393+
state,
394+
Some("installing"),
395+
"{label} concurrent call should see Installing, got {v:?}"
396+
);
397+
}
398+
}

0 commit comments

Comments
 (0)