Skip to content

Commit 363e43a

Browse files
authored
Merge pull request #40 from AxmeAI/feat/audit-dedup-20260407
fix: prevent duplicate sessions, audits, and stuck audit logs
2 parents 2d02fcc + 2d44cc7 commit 363e43a

5 files changed

Lines changed: 366 additions & 27 deletions

File tree

src/cli.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -403,6 +403,25 @@ async function main() {
403403
process.stderr.write(
404404
`axme-code audit-session: workspace=${workspacePath} session=${sessionId} pid=${process.pid}\n`,
405405
);
406+
// Register signal handlers so SIGTERM/SIGINT (OOM killer, manual kill)
407+
// updates audit status before exit. SIGKILL is uncatchable - handled
408+
// by 15-minute stale timeout in runSessionCleanup.
409+
/**
 * Best-effort handler for catchable termination signals.
 *
 * Loads the session being audited and, if its audit is still "pending",
 * marks it "failed" with the signal name and a finish timestamp before the
 * process exits with code 1.
 *
 * The sessions module is loaded with a dynamic `import()` (same style as the
 * other lazy imports in this file) rather than CommonJS `require`, which is
 * not defined when the file runs as a native ES module; in that case the old
 * code threw inside the try block and silently skipped the status update.
 *
 * Failures are reported on stderr instead of being swallowed, and
 * `process.exit(1)` is always reached via `finally`.
 *
 * @param signal Name of the received signal (used in log and error text).
 */
const signalCleanup = (signal: string): void => {
  process.stderr.write(`axme-code audit-session: received ${signal}, cleaning up\n`);
  // Fire-and-forget async body so the handler keeps its synchronous
  // `(signal) => void` shape for the `process.on(...)` registrations.
  void (async () => {
    try {
      const { loadSession, writeSession } = await import("./storage/sessions.js");
      const s = loadSession(workspacePath, sessionId);
      // Only touch sessions whose audit is still in flight; a finished or
      // already-failed audit must keep its recorded outcome.
      if (s && s.auditStatus === "pending") {
        s.auditStatus = "failed";
        s.lastAuditError = `killed by ${signal}`;
        s.auditFinishedAt = new Date().toISOString();
        writeSession(workspacePath, s);
      }
    } catch (err) {
      // Still best-effort, but leave a trace so a broken cleanup is visible.
      const reason = err instanceof Error ? err.message : String(err);
      process.stderr.write(
        `axme-code audit-session: could not record ${signal} in session state: ${reason}\n`,
      );
    } finally {
      process.exit(1);
    }
  })();
};
423+
process.on("SIGTERM", () => signalCleanup("SIGTERM"));
424+
process.on("SIGINT", () => signalCleanup("SIGINT"));
406425
try {
407426
const { runSessionCleanup } = await import("./session-cleanup.js");
408427
const result = await runSessionCleanup(workspacePath, sessionId);

src/server.ts

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ import {
3131
clearLegacyPendingAuditsDir,
3232
readClaudeSessionMapping,
3333
isPidAlive,
34+
loadSession,
35+
closeSession,
3436
} from "./storage/sessions.js";
3537
import { logEvent } from "./storage/worklog.js";
3638
import { spawnDetachedAuditWorker } from "./audit-spawner.js";
@@ -138,10 +140,40 @@ async function cleanupAndExit(reason: string): Promise<void> {
138140
try {
139141
const mappings = listClaudeSessionMappings(defaultProjectPath);
140142
const owned = mappings.filter(m => m.ownerPpid === OWN_PPID);
143+
144+
// Deduplicate: group AXME sessions by Claude session ID.
145+
// Multiple AXME sessions can share the same Claude session (race condition
146+
// from parallel hooks). Only audit one per Claude session — the newest.
147+
const claudeToAxme = new Map<string, { axmeId: string; createdAt: number }[]>();
148+
for (const m of owned) {
149+
const session = loadSession(defaultProjectPath, m.axmeSessionId);
150+
if (!session) continue;
151+
for (const ref of session.claudeSessions ?? []) {
152+
const list = claudeToAxme.get(ref.id) ?? [];
153+
list.push({ axmeId: m.axmeSessionId, createdAt: Date.parse(session.createdAt) || 0 });
154+
claudeToAxme.set(ref.id, list);
155+
}
156+
}
157+
const toAudit = new Set<string>();
158+
const toSkip = new Set<string>();
159+
for (const [, entries] of claudeToAxme) {
160+
entries.sort((a, b) => b.createdAt - a.createdAt); // newest first
161+
toAudit.add(entries[0].axmeId);
162+
for (let i = 1; i < entries.length; i++) toSkip.add(entries[i].axmeId);
163+
}
164+
// Mark duplicates as done so they don't linger
165+
for (const skipId of toSkip) {
166+
try { closeSession(defaultProjectPath, skipId); } catch {}
167+
}
168+
141169
process.stderr.write(
142-
`AXME cleanup (${reason}): ${owned.length} owned session(s) of ${mappings.length} total — spawning detached audit workers\n`,
170+
`AXME cleanup (${reason}): ${owned.length} owned, ${toAudit.size} to audit, ${toSkip.size} deduped\n`,
143171
);
144172
for (const m of owned) {
173+
if (!toAudit.has(m.axmeSessionId)) {
174+
try { clearClaudeSessionMapping(defaultProjectPath, m.claudeSessionId); } catch {}
175+
continue;
176+
}
145177
try {
146178
spawnDetachedAuditWorker(defaultProjectPath, m.axmeSessionId);
147179
} catch (err) {

src/session-cleanup.ts

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import {
3131
MAX_AUDIT_ATTEMPTS,
3232
isRetryableError,
3333
RETRYABLE_MAX_ATTEMPTS,
34+
listSessions,
3435
type AuditLog,
3536
type AuditLogExtraction,
3637
type AuditLogResumeInfo,
@@ -230,6 +231,26 @@ export async function runSessionCleanup(
230231
session.auditAttempts = 0;
231232
}
232233

234+
// Dedup 2b: cross-session concurrent-audit protection.
235+
// Multiple AXME sessions can share the same Claude session ID (parallel hook
236+
// race condition). If another AXME session with the same Claude transcript
237+
// is already pending audit (not stale), skip to avoid duplicate LLM calls.
238+
const myClaudeIds = new Set((session.claudeSessions ?? []).map((c: any) => c.id));
239+
if (myClaudeIds.size > 0) {
240+
const allSessions = listSessions(workspacePath);
241+
for (const other of allSessions) {
242+
if (other.id === sessionId) continue;
243+
if (other.auditStatus !== "pending" || !other.auditStartedAt) continue;
244+
const startedMs = Date.parse(other.auditStartedAt);
245+
if (!Number.isFinite(startedMs) || Date.now() - startedMs > AUDIT_STALE_TIMEOUT_MS) continue;
246+
const otherClaudeIds = new Set((other.claudeSessions ?? []).map((c: any) => c.id));
247+
const overlap = [...myClaudeIds].some(id => otherClaudeIds.has(id));
248+
if (overlap) {
249+
return { ...base, skipped: "concurrent-audit" };
250+
}
251+
}
252+
}
253+
233254
// Dedup 3: retry cap. If the session already used up its audit attempts
234255
// and still has no auditedAt, do NOT retry — it either hit a deterministic
235256
// failure (too-large prompt, parser rejection) or a bug that needs manual
@@ -387,6 +408,7 @@ export async function runSessionCleanup(
387408
// Audit log failure is non-fatal.
388409
}
389410

411+
let auditLogFinalized = false;
390412
try {
391413
const { runSessionAudit } = await import("./agents/session-auditor.js");
392414

@@ -707,6 +729,7 @@ export async function runSessionCleanup(
707729
safetyDeduped: sDeduped,
708730
},
709731
});
732+
auditLogFinalized = true;
710733
}
711734
} catch (err) {
712735
// Audit failure is non-fatal for the caller (we still close the session),
@@ -721,6 +744,20 @@ export async function runSessionCleanup(
721744
durationMs: Date.now() - auditStartMs,
722745
error: err instanceof Error ? err.message : String(err),
723746
});
747+
auditLogFinalized = true;
748+
}
749+
} finally {
750+
// Safety net: if neither success nor catch finalized the audit log
751+
// (e.g., process received SIGTERM mid-LLM-call), mark it failed.
752+
if (!auditLogFinalized && auditLogPath) {
753+
try {
754+
updateAuditLog(auditLogPath, {
755+
phase: "failed",
756+
finishedAt: new Date().toISOString(),
757+
durationMs: Date.now() - auditStartMs,
758+
error: "audit worker terminated unexpectedly",
759+
});
760+
} catch {}
724761
}
725762
}
726763
}

src/storage/sessions.ts

Lines changed: 88 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
*/
1818

1919
import { join, resolve } from "node:path";
20-
import { readdirSync, readFileSync, rmSync } from "node:fs";
20+
import { readdirSync, readFileSync, rmSync, openSync, closeSync, unlinkSync, statSync } from "node:fs";
2121
import { randomUUID } from "node:crypto";
2222
import { ensureDir, writeJson, readJson, pathExists, atomicWrite, removeFile, readSafe } from "./engine.js";
2323
import { logSessionStart } from "./worklog.js";
@@ -478,6 +478,54 @@ export function clearLegacyActiveSession(projectPath: string): void {
478478
removeFile(legacyActiveSessionPath(projectPath));
479479
}
480480

481+
/**
482+
* Filesystem lock for session creation to prevent parallel hooks from
483+
* creating duplicate AXME sessions for the same Claude session.
484+
*
485+
* Uses O_EXCL (atomic create-or-fail) as a cross-process mutex.
486+
* On contention: spin-wait up to 500ms, then re-read the mapping.
487+
* Stale lock (>5s) auto-cleaned to handle crashed lock holders.
488+
*/
489+
const LOCK_STALE_MS = 5_000;
490+
const LOCK_WAIT_MS = 500;
491+
const LOCK_POLL_MS = 50;
492+
493+
// Exported for testing
494+
/**
 * Build the path of the lock file used while creating an AXME session for a
 * given Claude session.
 *
 * @param projectPath Project root passed through to `activeSessionsDir`.
 * @param claudeSessionId Claude session id; becomes the lock file's base name.
 * @returns `<active sessions dir>/<claudeSessionId>.lock`
 */
export function sessionLockPath(projectPath: string, claudeSessionId: string): string {
  const lockDir = activeSessionsDir(projectPath);
  const lockFileName = `${claudeSessionId}.lock`;
  return join(lockDir, lockFileName);
}
497+
498+
/**
 * Try once, without waiting, to take the session-creation lock.
 *
 * A lock file whose mtime is older than `LOCK_STALE_MS` is removed first so a
 * crashed holder cannot block creation forever. The lock itself is taken by
 * opening the file with the exclusive-create flag ("wx"), which fails if the
 * file already exists.
 *
 * @param projectPath Project root that owns the active-sessions directory.
 * @param claudeSessionId Claude session id the lock is keyed on.
 * @returns `true` if this call created the lock file, `false` on any failure
 *          to create it (already held, or another filesystem error).
 */
export function acquireLock(projectPath: string, claudeSessionId: string): boolean {
  const lockFile = sessionLockPath(projectPath, claudeSessionId);
  ensureDir(activeSessionsDir(projectPath));

  // Stale-lock sweep: errors here (no lock file, unlink failure) are ignored
  // and we simply proceed to the exclusive create below.
  try {
    const lockStat = statSync(lockFile);
    const ageMs = Date.now() - lockStat.mtimeMs;
    if (ageMs > LOCK_STALE_MS) {
      unlinkSync(lockFile);
    }
  } catch {
    // nothing to clean up
  }

  try {
    const handle = openSync(lockFile, "wx");
    closeSync(handle);
  } catch {
    // EEXIST (someone else holds it) or any other error: report "not acquired"
    // so callers can degrade gracefully.
    return false;
  }
  return true;
}
514+
515+
/**
 * Release the session-creation lock by deleting its lock file.
 * Any error (for example the file is already gone) is ignored.
 *
 * @param projectPath Project root that owns the active-sessions directory.
 * @param claudeSessionId Claude session id the lock is keyed on.
 */
export function releaseLock(projectPath: string, claudeSessionId: string): void {
  try {
    const lockFile = sessionLockPath(projectPath, claudeSessionId);
    unlinkSync(lockFile);
  } catch {
    // best-effort: a missing lock file is not an error for the caller
  }
}
518+
519+
/** Block the current thread for `ms` milliseconds without consuming CPU. */
function blockingSleepMs(ms: number): void {
  // Atomics.wait on a private buffer never gets notified, so it simply
  // returns "timed-out" after `ms`; this is a synchronous sleep.
  Atomics.wait(new Int32Array(new SharedArrayBuffer(4)), 0, 0, ms);
}

/**
 * Repeatedly try to take the session-creation lock until it is acquired or
 * `LOCK_WAIT_MS` has elapsed, pausing `LOCK_POLL_MS` between attempts.
 *
 * The pause is a blocking sleep rather than a busy loop, so a contended hook
 * no longer pins a CPU core while it waits. The sleep is clamped to the time
 * remaining, and a final attempt is always made at the deadline.
 *
 * @param projectPath Project root that owns the active-sessions directory.
 * @param claudeSessionId Claude session id the lock is keyed on.
 * @returns `true` if the lock was acquired, `false` if the wait timed out.
 */
function waitForLock(projectPath: string, claudeSessionId: string): boolean {
  const deadline = Date.now() + LOCK_WAIT_MS;
  for (;;) {
    if (acquireLock(projectPath, claudeSessionId)) return true;
    const remainingMs = deadline - Date.now();
    if (remainingMs <= 0) return false;
    blockingSleepMs(Math.min(LOCK_POLL_MS, remainingMs));
  }
}
528+
481529
/**
482530
* Ensure an AXME session exists for the given Claude session. Lazy-created
483531
* on the first hook call that knows its Claude session_id.
@@ -507,6 +555,7 @@ export function ensureAxmeSessionForClaude(
507555
* the existing session id instead of creating a fresh empty-tail session. */
508556
toolName?: string,
509557
): string {
558+
// Fast path: live mapping exists, just reuse it (no lock needed).
510559
const existing = readClaudeSessionMapping(projectPath, claudeSessionId);
511560
if (existing) {
512561
const existingSession = loadSession(projectPath, existing);
@@ -515,43 +564,56 @@ export function ensureAxmeSessionForClaude(
515564
existingSession.auditedAt != null ||
516565
(existingSession.pid != null && !isPidAlive(existingSession.pid));
517566
if (!isStale) {
518-
// Live mapping — attach transcript and reuse.
519567
attachClaudeSession(projectPath, existing, {
520568
id: claudeSessionId,
521569
transcriptPath,
522570
role: "main",
523571
});
524-
// Always refresh ownerPpid — after VS Code reload the Claude Code
525-
// PID changes but the old process may still be alive (different
526-
// window or zombie). The MCP server matches by ownerPpid === OWN_PPID,
527-
// so the mapping must point to the current Claude Code instance.
528572
writeClaudeSessionMapping(projectPath, claudeSessionId, existing);
529573
return existing;
530574
}
531-
// Read-only tools (Read/Glob/Grep) should not create fresh sessions from
532-
// stale mappings — that produces empty "tail" sessions with 0 extractions.
533-
// Return the stale id instead; the next mutation tool will create a fresh one.
575+
// Read-only tools should not create fresh sessions from stale mappings.
534576
const READ_ONLY_TOOLS = ["Read", "Glob", "Grep"];
535-
if (toolName && READ_ONLY_TOOLS.includes(toolName) && existing) {
577+
if (toolName && READ_ONLY_TOOLS.includes(toolName)) {
536578
return existing;
537579
}
538-
// Stale mapping: log once and fall through to create a fresh session,
539-
// which will overwrite the mapping file below.
540-
process.stderr.write(
541-
`AXME: stale mapping for Claude session ${claudeSessionId} → ` +
542-
`AXME ${existing} (audited=${existingSession?.auditedAt ?? "no"}, ` +
543-
`pid=${existingSession?.pid ?? "?"}). Creating fresh AXME session.\n`,
544-
);
545580
}
546-
const axmeSession = createSession(projectPath);
547-
try { logSessionStart(projectPath, axmeSession.id); } catch {}
548-
writeClaudeSessionMapping(projectPath, claudeSessionId, axmeSession.id);
549-
attachClaudeSession(projectPath, axmeSession.id, {
550-
id: claudeSessionId,
551-
transcriptPath,
552-
role: "main",
553-
});
554-
return axmeSession.id;
581+
582+
// Slow path: need to create a new session (stale mapping OR first time).
583+
// Acquire filesystem lock to prevent parallel hooks from each creating one.
584+
const gotLock = waitForLock(projectPath, claudeSessionId);
585+
try {
586+
// Re-check inside lock — another process may have won the race.
587+
// If a DIFFERENT mapping appeared (created by the lock winner), use it
588+
// unconditionally — the winner just created it, so it's fresh by definition.
589+
// Do NOT re-run stale checks here: the winner's process may have already
590+
// exited (test workers, short-lived hooks), making the pid look dead.
591+
const recheck = readClaudeSessionMapping(projectPath, claudeSessionId);
592+
if (recheck && recheck !== existing) {
593+
attachClaudeSession(projectPath, recheck, { id: claudeSessionId, transcriptPath, role: "main" });
594+
return recheck;
595+
}
596+
// We won the race (or lock timed out) — create fresh session.
597+
if (existing) {
598+
const existingSession = loadSession(projectPath, existing);
599+
process.stderr.write(
600+
`AXME: stale mapping for Claude session ${claudeSessionId} → ` +
601+
`AXME ${existing} (audited=${existingSession?.auditedAt ?? "no"}, ` +
602+
`pid=${existingSession?.pid ?? "?"}). Creating fresh AXME session.\n`,
603+
);
604+
}
605+
const axmeSession = createSession(projectPath);
606+
try { logSessionStart(projectPath, axmeSession.id); } catch {}
607+
writeClaudeSessionMapping(projectPath, claudeSessionId, axmeSession.id);
608+
attachClaudeSession(projectPath, axmeSession.id, {
609+
id: claudeSessionId,
610+
transcriptPath,
611+
role: "main",
612+
});
613+
return axmeSession.id;
614+
} finally {
615+
if (gotLock) releaseLock(projectPath, claudeSessionId);
616+
}
555617
}
556618

557619
// --- Legacy single-file API (DEPRECATED, kept for backward compatibility) ---

0 commit comments

Comments
 (0)