Skip to content

Commit 1939161

Browse files
committed
fix: add request timeouts, bounded queues, and tool output truncation
1 parent 1a593ff commit 1939161

8 files changed

Lines changed: 109 additions & 30 deletions

File tree

src-tauri/src/backend/app_server.rs

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ fn build_initialize_params(client_version: &str) -> Value {
5252
})
5353
}
5454

55+
const REQUEST_TIMEOUT: Duration = Duration::from_secs(300);
56+
5557
pub(crate) struct WorkspaceSession {
5658
pub(crate) entry: WorkspaceEntry,
5759
pub(crate) child: Mutex<Child>,
@@ -77,9 +79,24 @@ impl WorkspaceSession {
7779
let id = self.next_id.fetch_add(1, Ordering::SeqCst);
7880
let (tx, rx) = oneshot::channel();
7981
self.pending.lock().await.insert(id, tx);
80-
self.write_message(json!({ "id": id, "method": method, "params": params }))
81-
.await?;
82-
rx.await.map_err(|_| "request canceled".to_string())
82+
if let Err(error) = self
83+
.write_message(json!({ "id": id, "method": method, "params": params }))
84+
.await
85+
{
86+
self.pending.lock().await.remove(&id);
87+
return Err(error);
88+
}
89+
match timeout(REQUEST_TIMEOUT, rx).await {
90+
Ok(Ok(value)) => Ok(value),
91+
Ok(Err(_)) => Err("request canceled".to_string()),
92+
Err(_) => {
93+
self.pending.lock().await.remove(&id);
94+
Err(format!(
95+
"request timed out after {} seconds",
96+
REQUEST_TIMEOUT.as_secs()
97+
))
98+
}
99+
}
83100
}
84101

85102
pub(crate) async fn send_notification(
@@ -406,6 +423,9 @@ pub(crate) async fn spawn_workspace_session<E: EventSink>(
406423
}
407424
}
408425
}
426+
427+
// Ensure pending foreground requests cannot accumulate after process output ends.
428+
session_clone.pending.lock().await.clear();
409429
});
410430

411431
let workspace_id = entry.id.clone();

src-tauri/src/remote_backend/mod.rs

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,11 @@ mod transport;
66
use serde_json::{json, Value};
77
use std::sync::atomic::{AtomicU64, Ordering};
88
use std::sync::Arc;
9+
use std::time::Duration;
910

1011
use tauri::AppHandle;
1112
use tokio::sync::Mutex;
13+
use tokio::time::timeout;
1214

1315
use crate::state::AppState;
1416
use crate::types::{BackendMode, RemoteBackendProvider};
@@ -18,6 +20,9 @@ use self::protocol::{build_request_line, DEFAULT_REMOTE_HOST, DISCONNECTED_MESSA
1820
use self::tcp_transport::TcpTransport;
1921
use self::transport::{PendingMap, RemoteTransport, RemoteTransportConfig, RemoteTransportKind};
2022

23+
const REMOTE_REQUEST_TIMEOUT: Duration = Duration::from_secs(300);
24+
const REMOTE_SEND_TIMEOUT: Duration = Duration::from_secs(15);
25+
2126
pub(crate) fn normalize_path_for_remote(path: String) -> String {
2227
let trimmed = path.trim();
2328
if trimmed.is_empty() {
@@ -58,7 +63,7 @@ pub(crate) struct RemoteBackend {
5863
}
5964

6065
struct RemoteBackendInner {
61-
out_tx: tokio::sync::mpsc::UnboundedSender<String>,
66+
out_tx: tokio::sync::mpsc::Sender<String>,
6267
pending: Arc<Mutex<PendingMap>>,
6368
next_id: AtomicU64,
6469
connected: Arc<std::sync::atomic::AtomicBool>,
@@ -75,12 +80,32 @@ impl RemoteBackend {
7580
self.inner.pending.lock().await.insert(id, tx);
7681

7782
let message = build_request_line(id, method, params)?;
78-
if self.inner.out_tx.send(message).is_err() {
79-
self.inner.pending.lock().await.remove(&id);
80-
return Err(DISCONNECTED_MESSAGE.to_string());
83+
match timeout(REMOTE_SEND_TIMEOUT, self.inner.out_tx.send(message)).await {
84+
Ok(Ok(())) => {}
85+
Ok(Err(_)) => {
86+
self.inner.pending.lock().await.remove(&id);
87+
return Err(DISCONNECTED_MESSAGE.to_string());
88+
}
89+
Err(_) => {
90+
self.inner.pending.lock().await.remove(&id);
91+
return Err(format!(
92+
"remote backend request dispatch timed out after {} seconds",
93+
REMOTE_SEND_TIMEOUT.as_secs()
94+
));
95+
}
8196
}
8297

83-
rx.await.map_err(|_| DISCONNECTED_MESSAGE.to_string())?
98+
match timeout(REMOTE_REQUEST_TIMEOUT, rx).await {
99+
Ok(Ok(result)) => result,
100+
Ok(Err(_)) => Err(DISCONNECTED_MESSAGE.to_string()),
101+
Err(_) => {
102+
self.inner.pending.lock().await.remove(&id);
103+
Err(format!(
104+
"remote backend request timed out after {} seconds",
105+
REMOTE_REQUEST_TIMEOUT.as_secs()
106+
))
107+
}
108+
}
84109
}
85110
}
86111

src-tauri/src/remote_backend/orbit_ws_transport.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use super::transport::{
1515
};
1616

1717
pub(crate) struct OrbitWsTransport;
18+
const OUTBOUND_QUEUE_CAPACITY: usize = 512;
1819

1920
impl RemoteTransport for OrbitWsTransport {
2021
fn connect(&self, app: AppHandle, config: RemoteTransportConfig) -> TransportFuture {
@@ -29,7 +30,7 @@ impl RemoteTransport for OrbitWsTransport {
2930
.map_err(|err| format!("Failed to connect to Orbit relay at {ws_url}: {err}"))?;
3031
let (mut writer, mut reader) = stream.split();
3132

32-
let (out_tx, mut out_rx) = mpsc::unbounded_channel::<String>();
33+
let (out_tx, mut out_rx) = mpsc::channel::<String>(OUTBOUND_QUEUE_CAPACITY);
3334
let pending = Arc::new(Mutex::new(PendingMap::new()));
3435
let pending_for_writer = Arc::clone(&pending);
3536
let pending_for_reader = Arc::clone(&pending);

src-tauri/src/remote_backend/transport.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use tokio::sync::{mpsc, oneshot, Mutex};
1212
use super::protocol::{parse_incoming_line, IncomingMessage, DISCONNECTED_MESSAGE};
1313

1414
pub(crate) type PendingMap = HashMap<u64, oneshot::Sender<Result<Value, String>>>;
15+
const OUTBOUND_QUEUE_CAPACITY: usize = 512;
1516

1617
#[derive(Clone, Debug)]
1718
pub(crate) enum RemoteTransportConfig {
@@ -48,7 +49,7 @@ impl RemoteTransportConfig {
4849
}
4950

5051
pub(crate) struct TransportConnection {
51-
pub(crate) out_tx: mpsc::UnboundedSender<String>,
52+
pub(crate) out_tx: mpsc::Sender<String>,
5253
pub(crate) pending: Arc<Mutex<PendingMap>>,
5354
pub(crate) connected: Arc<AtomicBool>,
5455
}
@@ -69,7 +70,7 @@ where
6970
R: AsyncRead + Unpin + Send + 'static,
7071
W: AsyncWrite + Unpin + Send + 'static,
7172
{
72-
let (out_tx, mut out_rx) = mpsc::unbounded_channel::<String>();
73+
let (out_tx, mut out_rx) = mpsc::channel::<String>(OUTBOUND_QUEUE_CAPACITY);
7374
let pending = Arc::new(Mutex::new(PendingMap::new()));
7475
let pending_for_writer = Arc::clone(&pending);
7576
let pending_for_reader = Arc::clone(&pending);

src-tauri/src/shared/codex_aux_core.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -352,7 +352,10 @@ where
352352

353353
let mut response_text = String::new();
354354
let collect_result = timeout(Duration::from_secs(60), async {
355-
while let Some(event) = rx.recv().await {
355+
loop {
356+
let Some(event) = rx.recv().await else {
357+
return Err("Background response stream closed before completion".to_string());
358+
};
356359
let method = event.get("method").and_then(|m| m.as_str()).unwrap_or("");
357360
match method {
358361
"item/agentMessage/delta" => {

src-tauri/src/terminal.rs

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use std::sync::Arc;
44

55
use portable_pty::{native_pty_system, CommandBuilder, PtySize};
66
use serde::Serialize;
7-
use tauri::{AppHandle, State};
7+
use tauri::{AppHandle, Manager, State};
88
use tokio::sync::Mutex;
99

1010
use crate::backend::events::{EventSink, TerminalExit, TerminalOutput};
@@ -66,6 +66,8 @@ fn resolve_locale() -> String {
6666

6767
fn spawn_terminal_reader(
6868
event_sink: impl EventSink,
69+
app: AppHandle,
70+
session: Arc<TerminalSession>,
6971
workspace_id: String,
7072
terminal_id: String,
7173
mut reader: Box<dyn Read + Send>,
@@ -125,10 +127,24 @@ fn spawn_terminal_reader(
125127
Err(_) => break,
126128
}
127129
}
130+
let cleanup_workspace_id = workspace_id.clone();
131+
let cleanup_terminal_id = terminal_id.clone();
132+
let cleanup_session = Arc::clone(&session);
128133
event_sink.emit_terminal_exit(TerminalExit {
129134
workspace_id,
130135
terminal_id,
131136
});
137+
tauri::async_runtime::spawn(async move {
138+
let state = app.state::<AppState>();
139+
let mut sessions = state.terminal_sessions.lock().await;
140+
let key = terminal_key(&cleanup_workspace_id, &cleanup_terminal_id);
141+
let should_remove = sessions
142+
.get(&key)
143+
.is_some_and(|current| Arc::ptr_eq(current, &cleanup_session));
144+
if should_remove {
145+
sessions.remove(&key);
146+
}
147+
});
132148
});
133149
}
134150

@@ -219,10 +235,17 @@ pub(crate) async fn terminal_open(
219235
.await;
220236
return Ok(TerminalSessionInfo { id });
221237
}
222-
sessions.insert(key, session);
238+
sessions.insert(key, Arc::clone(&session));
223239
}
224-
let event_sink = TauriEventSink::new(app);
225-
spawn_terminal_reader(event_sink, workspace_id, terminal_id, reader);
240+
let event_sink = TauriEventSink::new(app.clone());
241+
spawn_terminal_reader(
242+
event_sink,
243+
app,
244+
Arc::clone(&session),
245+
workspace_id,
246+
terminal_id,
247+
reader,
248+
);
226249

227250
Ok(TerminalSessionInfo { id: session_id })
228251
}

src/utils/threadItems.test.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@ describe("threadItems", () => {
2929
}
3030
});
3131

32-
it("preserves tool output for fileChange and commandExecution", () => {
33-
const output = "x".repeat(21000);
32+
it("truncates extremely large tool output for fileChange and commandExecution", () => {
33+
const output = "x".repeat(250000);
3434
const item: ConversationItem = {
3535
id: "tool-1",
3636
kind: "tool",
@@ -42,7 +42,9 @@ describe("threadItems", () => {
4242
const normalized = normalizeItem(item);
4343
expect(normalized.kind).toBe("tool");
4444
if (normalized.kind === "tool") {
45-
expect(normalized.output).toBe(output);
45+
expect(normalized.output).not.toBe(output);
46+
expect(normalized.output?.endsWith("...")).toBe(true);
47+
expect((normalized.output ?? "").length).toBeLessThan(output.length);
4648
}
4749
});
4850

src/utils/threadItems.ts

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,9 @@ import type { ConversationItem } from "../types";
22

33
const MAX_ITEMS_PER_THREAD = 200;
44
const MAX_ITEM_TEXT = 20000;
5+
const MAX_LARGE_TOOL_TEXT = 200000;
56
const TOOL_OUTPUT_RECENT_ITEMS = 40;
6-
const NO_TRUNCATE_TOOL_TYPES = new Set(["fileChange", "commandExecution"]);
7+
const LARGE_TOOL_TYPES = new Set(["fileChange", "commandExecution"]);
78
const READ_COMMANDS = new Set(["cat", "sed", "head", "tail", "less", "more", "nl"]);
89
const LIST_COMMANDS = new Set(["ls", "tree", "find", "fd"]);
910
const SEARCH_COMMANDS = new Set(["rg", "grep", "ripgrep", "findstr"]);
@@ -50,6 +51,13 @@ function truncateText(text: string, maxLength = MAX_ITEM_TEXT) {
5051
return `${text.slice(0, sliceLength)}...`;
5152
}
5253

54+
function truncateToolText(toolType: string, text: string) {
55+
const maxLength = LARGE_TOOL_TYPES.has(toolType)
56+
? MAX_LARGE_TOOL_TEXT
57+
: MAX_ITEM_TEXT;
58+
return truncateText(text, maxLength);
59+
}
60+
5361
function normalizeStringList(value: unknown) {
5462
if (Array.isArray(value)) {
5563
return value.map((entry) => asString(entry)).filter(Boolean);
@@ -94,23 +102,19 @@ export function normalizeItem(item: ConversationItem): ConversationItem {
94102
return { ...item, diff: truncateText(item.diff) };
95103
}
96104
if (item.kind === "tool") {
97-
const isNoTruncateTool = NO_TRUNCATE_TOOL_TYPES.has(item.toolType);
98105
return {
99106
...item,
100107
title: truncateText(item.title, 200),
101108
detail: truncateText(item.detail, 2000),
102-
output: isNoTruncateTool
103-
? item.output
104-
: item.output
105-
? truncateText(item.output)
106-
: item.output,
109+
output: item.output
110+
? truncateToolText(item.toolType, item.output)
111+
: item.output,
107112
changes: item.changes
108113
? item.changes.map((change) => ({
109114
...change,
110-
diff:
111-
isNoTruncateTool || !change.diff
112-
? change.diff
113-
: truncateText(change.diff),
115+
diff: change.diff
116+
? truncateToolText(item.toolType, change.diff)
117+
: change.diff,
114118
}))
115119
: item.changes,
116120
};

0 commit comments

Comments
 (0)