Skip to content

Commit 8e5120d

Browse files
committed
fix(tail): reap closed follow-sessions instead of leaking them
The module-level sessions Map dropped an entry only on an explicit ssh_tail_stop. A follow-start whose stream closed (remote ended, connection dropped) but was never explicitly stopped left its state in the Map for the server's lifetime. The 1 MB ring buffer bounds per-session memory, but closed sessions were never reaped. Track readOffset (highest delivered offset) and closedAt per session. reapClosedSessions() drops a closed session once its buffer is fully read, or once it has been closed past a grace period even if unread (the stream is dead). It runs on every follow-start and follow-read. handleSshTailRead's opening sweep excludes the session it is about to serve so the caller still gets one final closed:true read; that session is then reaped at the end of the call. Tests: a closed + fully-read session is reaped on the next read with no explicit stop (fails without the fix), a closed session past the grace window is reaped even if never read, and an open session survives any sweep.
1 parent 55e41fa commit 8e5120d

2 files changed

Lines changed: 124 additions & 2 deletions

File tree

src/tools/tail-tools.js

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ const DEFAULT_LINES = 10;
2121
const DEFAULT_TIMEOUT_MS = 15_000;
2222
const DEFAULT_MAX_LEN = 10_000;
2323
const RING_BUFFER_CAP = 1_000_000;
24+
// Grace period a closed-but-unread session lingers before a sweep reaps it.
25+
const CLOSED_SESSION_GRACE_MS = 5 * 60_000;
2426

2527
/** Session registry -- module-level Map so tools across calls share it. */
2628
const sessions = new Map();
@@ -37,13 +39,41 @@ function rememberStopped(id) {
3739
}
3840
}
3941

42+
/**
43+
* Reap closed follow-sessions so a stream that ended without an explicit
44+
* ssh_tail_stop never lingers for the server's lifetime. A closed session
45+
* drops when its buffer is fully read, or when it closed past the grace
46+
* period (stream dead -- a stale buffer helps nobody).
47+
*
48+
* `excludeId` is left intact even if reapable: handleSshTailRead passes the
49+
* session it is mid-serving so the caller still gets one final closed:true
50+
* read before that session is dropped at the end of the call.
51+
*/
52+
function reapClosedSessions(now = Date.now(), excludeId = null) {
53+
for (const [id, st] of sessions) {
54+
if (id === excludeId) continue;
55+
if (!st.closed) continue;
56+
const fullyRead = st.readOffset >= st.totalBytes;
57+
const expired = st.closedAt != null && (now - st.closedAt) >= CLOSED_SESSION_GRACE_MS;
58+
if (fullyRead || expired) {
59+
sessions.delete(id);
60+
rememberStopped(id);
61+
}
62+
}
63+
}
64+
4065
/** Exposed for tests to introspect internal state. */
4166
export function _sessionsForTest() {
4267
return sessions;
4368
}
4469
export function _stoppedIdsForTest() {
4570
return stoppedIds;
4671
}
72+
/** Exposed for tests: drive the closed-session sweep directly. */
73+
export function _reapClosedSessionsForTest(now) {
74+
return reapClosedSessions(now);
75+
}
76+
export const _CLOSED_SESSION_GRACE_MS = CLOSED_SESSION_GRACE_MS;
4777

4878
/** Coerce numeric arg to a positive integer, with safe fallback. */
4979
function safeLines(n, fallback = DEFAULT_LINES) {
@@ -151,7 +181,9 @@ export async function handleSshTailStart({ getConnection, args }) {
151181
createdAt: Date.now(),
152182
buffer: '',
153183
totalBytes: 0, // lifetime bytes appended (monotonic, pre-truncation)
184+
readOffset: 0, // highest offset a follow-read has delivered to a caller
154185
closed: false,
186+
closedAt: null, // ms timestamp the stream closed (null while open)
155187
stream,
156188
};
157189

@@ -164,11 +196,17 @@ export async function handleSshTailStart({ getConnection, args }) {
164196
}
165197
}
166198

199+
function markClosed() {
200+
if (state.closed) return;
201+
state.closed = true;
202+
state.closedAt = Date.now();
203+
}
167204
stream.on('data', (d) => append(stripAnsi(d.toString('utf8'))));
168205
stream.stderr && stream.stderr.on('data', (d) => append(stripAnsi(d.toString('utf8'))));
169-
stream.on('close', () => { state.closed = true; });
170-
stream.on('error', () => { state.closed = true; });
206+
stream.on('close', markClosed);
207+
stream.on('error', markClosed);
171208

209+
reapClosedSessions(); // sweep stale sessions before adding a new one
172210
sessions.set(id, state);
173211
resolve(state);
174212
});
@@ -198,6 +236,10 @@ export async function handleSshTailStart({ getConnection, args }) {
198236
export async function handleSshTailRead({ args }) {
199237
const { session_id, since_offset, format = 'markdown' } = args || {};
200238

239+
// Sweep other closed/dead sessions; never the one we are about to serve --
240+
// it gets one final closed:true read, then is reaped at the end of the call.
241+
reapClosedSessions(Date.now(), session_id);
242+
201243
const state = sessions.get(session_id);
202244
if (!state) {
203245
return toMcp(fail('ssh_tail_read', `unknown session_id: ${session_id}`), { format });
@@ -241,6 +283,12 @@ export async function handleSshTailRead({ args }) {
241283
elided_bytes: elided,
242284
};
243285

286+
// Caller has now seen everything up to `total`. Advance the read watermark
287+
// and reap: a closed + fully-read session drops here -- this very response
288+
// already carried closed:true, so the caller knows the stream ended.
289+
state.readOffset = Math.max(state.readOffset, total);
290+
reapClosedSessions();
291+
244292
return toMcp(
245293
ok('ssh_tail_read', data, { server: state.server }),
246294
{ format }

tests/test-tail-tools.js

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ import {
1414
buildTailCommand,
1515
_sessionsForTest,
1616
_stoppedIdsForTest,
17+
_reapClosedSessionsForTest,
18+
_CLOSED_SESSION_GRACE_MS,
1719
} from '../src/tools/tail-tools.js';
1820
import { unwrapTimeout } from './util-timeout-unwrap.js';
1921

@@ -372,6 +374,78 @@ await test('handleSshTailStop: unknown id (never seen) -> structured fail', asyn
372374
assert.strictEqual(r.isError, true);
373375
});
374376

377+
// --------------------------------------------------------------------------
378+
// Closed-session reaping -- a follow-start whose stream closes but is never
379+
// explicitly ssh_tail_stop'd must not linger in the registry forever.
380+
// --------------------------------------------------------------------------
381+
await test('closed + fully-read session is reaped on the next read (no explicit stop)', async () => {
382+
const client = new FollowClient();
383+
const started = await handleSshTailStart({
384+
getConnection: async () => client,
385+
args: { server: 's', file: '/f', format: 'json' },
386+
});
387+
const { session_id } = JSON.parse(started.content[0].text).data;
388+
assert(_sessionsForTest().has(session_id), 'session registered');
389+
390+
client.feed('done\n');
391+
await sleep(5);
392+
// First read consumes the whole buffer.
393+
const r1 = JSON.parse((await handleSshTailRead({ args: { session_id, format: 'json' } })).content[0].text);
394+
assert.strictEqual(r1.data.chunk, 'done\n');
395+
assert(_sessionsForTest().has(session_id), 'still alive while stream open');
396+
397+
// Stream closes (remote ended) -- no ssh_tail_stop is ever issued.
398+
client.lastStream().emit('close', 0);
399+
await sleep(5);
400+
401+
// Next read sees closed:true AND reaps the now-dead, fully-read session.
402+
const r2 = JSON.parse((await handleSshTailRead({ args: { session_id, format: 'json' } })).content[0].text);
403+
assert.strictEqual(r2.data.closed, true, 'caller is told the stream closed');
404+
assert.strictEqual(_sessionsForTest().has(session_id), false,
405+
'closed + fully-read session must be reaped, not leaked');
406+
assert(_stoppedIdsForTest().has(session_id),
407+
'reaped id is remembered so a later stop is a clean no-op');
408+
});
409+
410+
await test('closed session past the grace period is reaped by the sweep even if unread', async () => {
411+
const client = new FollowClient();
412+
const started = await handleSshTailStart({
413+
getConnection: async () => client,
414+
args: { server: 's', file: '/f', format: 'json' },
415+
});
416+
const { session_id } = JSON.parse(started.content[0].text).data;
417+
418+
// Stream closes with buffered-but-never-read data.
419+
client.feed('unread bytes\n');
420+
client.lastStream().emit('close', 0);
421+
await sleep(5);
422+
assert(_sessionsForTest().has(session_id), 'closed-but-recent session still lingers');
423+
424+
// A fresh sweep does nothing yet (within grace).
425+
_reapClosedSessionsForTest(Date.now());
426+
assert(_sessionsForTest().has(session_id), 'not reaped within grace window');
427+
428+
// Backdate closedAt past the grace period; the sweep now reaps it.
429+
_sessionsForTest().get(session_id).closedAt =
430+
Date.now() - _CLOSED_SESSION_GRACE_MS - 1000;
431+
_reapClosedSessionsForTest(Date.now());
432+
assert.strictEqual(_sessionsForTest().has(session_id), false,
433+
'closed session past grace must be reaped even though never read');
434+
});
435+
436+
await test('an OPEN session is never reaped, no matter how old', async () => {
437+
const client = new FollowClient();
438+
const started = await handleSshTailStart({
439+
getConnection: async () => client,
440+
args: { server: 's', file: '/f', format: 'json' },
441+
});
442+
const { session_id } = JSON.parse(started.content[0].text).data;
443+
// Stream stays open. Sweep with a far-future clock.
444+
_reapClosedSessionsForTest(Date.now() + _CLOSED_SESSION_GRACE_MS * 100);
445+
assert(_sessionsForTest().has(session_id), 'open session must survive any sweep');
446+
await handleSshTailStop({ args: { session_id } });
447+
});
448+
375449
// --------------------------------------------------------------------------
376450
// Summary
377451
// --------------------------------------------------------------------------

0 commit comments

Comments
 (0)