Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 120 additions & 0 deletions src/__tests__/fix-3366-acp-persist-cascade.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/**
* Regression tests for #3366: ACP local storage persist() failure cascade.
*
* Bug: FileAcpLocalStorageProfile.persist() chains writes onto writeChain
* without error recovery. A single failed write poisons all subsequent writes
* because every .then() chains onto the rejected promise.
*
* Fix: persist() now catches errors, resets the chain, and logs the failure.
* Subsequent writes get a fresh attempt.
*/
import { readFile } from 'node:fs/promises';
import fs from 'node:fs';
import path from 'node:path';
import os from 'node:os';
import { afterEach, beforeEach, describe, expect, it } from 'vitest';

import { createFileAcpLocalStorageProfile } from '../services/acp/local-storage.js';

function makeSession(id: string) {
const now = Date.now();
return {
id,
conversationId: `conv-${id}`,
transcriptId: `transcript-${id}`,
tenantId: 'test-tenant',
ownerKeyId: 'test-key',
runnerType: 'acp' as const,
status: 'initializing' as const,
createdAt: now,
updatedAt: now,
metadata: {},
};
}

describe('Issue #3366: ACP local storage persist() failure cascade', () => {
let tmpDir: string;
let filePath: string;

beforeEach(async () => {
tmpDir = await fs.promises.mkdtemp(path.join(os.tmpdir(), 'aegis-3366-'));
filePath = path.join(tmpDir, 'acp-local-storage.json');
});

afterEach(async () => {
// Restore permissions so cleanup can succeed
await fs.promises.chmod(tmpDir, 0o755).catch(() => {});
await fs.promises.rm(tmpDir, { recursive: true, force: true }).catch(() => {});
});

it('recovers from a failed persist — subsequent writes succeed', async () => {
const profile = createFileAcpLocalStorageProfile({ filePath });
await profile.start();
expect(profile.getPersistError()).toBeNull();

// Make the directory read-only to force a write failure
await fs.promises.chmod(tmpDir, 0o444);

// Trigger persist via a mutation — should fail silently
await profile.sessionStore.create(makeSession('fail-1'));
expect(profile.getPersistError()).not.toBeNull();

// Restore write permissions
await fs.promises.chmod(tmpDir, 0o755);

// Trigger another mutation — this should SUCCEED (not cascade!)
await profile.sessionStore.create(makeSession('recover-1'));
expect(profile.getPersistError()).toBeNull();

// Verify the file actually contains the data
const content = await readFile(filePath, 'utf8');
const parsed = JSON.parse(content);
expect(parsed.sessions).toHaveLength(2);

await profile.stop();
});

it('does NOT cascade rejection across multiple sequential failures', async () => {
const profile = createFileAcpLocalStorageProfile({ filePath });
await profile.start();

// Make dir read-only
await fs.promises.chmod(tmpDir, 0o444);

// First failed persist
await profile.sessionStore.create(makeSession('fail-1'));
expect(profile.getPersistError()).not.toBeNull();

// Second failed persist — should NOT be chained to rejected promise
await profile.sessionStore.create(makeSession('fail-2'));
expect(profile.getPersistError()).not.toBeNull();

// Third failed persist — still no cascade
await profile.sessionStore.create(makeSession('fail-3'));
expect(profile.getPersistError()).not.toBeNull();

// Now restore and verify recovery
await fs.promises.chmod(tmpDir, 0o755);
await profile.sessionStore.create(makeSession('recover'));
expect(profile.getPersistError()).toBeNull();

const content = await readFile(filePath, 'utf8');
const parsed = JSON.parse(content);
expect(parsed.sessions).toHaveLength(4);

await profile.stop();
});

it('stop() does not throw even when writeChain has a rejection', async () => {
const profile = createFileAcpLocalStorageProfile({ filePath });
await profile.start();

// Make dir read-only so persist fails
await fs.promises.chmod(tmpDir, 0o444);
await profile.sessionStore.create(makeSession('doomed'));

// stop() should NOT throw — it catches the chain rejection
await fs.promises.chmod(tmpDir, 0o755);
await expect(profile.stop()).resolves.toBeUndefined();
});
});
52 changes: 46 additions & 6 deletions src/services/acp/local-storage.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { mkdir, readFile, writeFile, rename } from 'node:fs/promises';
import { mkdir, readFile, writeFile, rename, unlink } from 'node:fs/promises';
import path from 'node:path';

import { logger } from '../../logger.js';
import type { ServiceHealth } from '../../container.js';
import {
AcpDurableIdentityError,
Expand Down Expand Up @@ -48,6 +49,7 @@ export interface AcpLocalStorageProfile {
start(): Promise<void>;
stop(signal?: AbortSignal): Promise<void>;
health(): Promise<ServiceHealth>;
getPersistError(): Error | null;
}

export interface FileAcpLocalStorageProfileConfig {
Expand Down Expand Up @@ -101,12 +103,17 @@ export class MemoryAcpLocalStorageProfile implements AcpLocalStorageProfile {
async health(): Promise<ServiceHealth> {
return { healthy: true, details: 'memory ACP local storage profile ok' };
}

getPersistError(): Error | null {
return null;
}
}

export class FileAcpLocalStorageProfile implements AcpLocalStorageProfile {
private state = createEmptyState();
private started = false;
private writeChain: Promise<void> = Promise.resolve();
private persistError: Error | null = null;
private readonly memorySessionStore: MemoryAcpSessionStore;
private readonly memoryEventStore: MemoryAcpEventStore;
private readonly memoryActionQueue: MemoryAcpActionQueue;
Expand Down Expand Up @@ -143,7 +150,8 @@ export class FileAcpLocalStorageProfile implements AcpLocalStorageProfile {

async stop(_signal?: AbortSignal): Promise<void> {
if (!this.started) return;
await this.writeChain;
// Best-effort final persist — swallow errors so shutdown completes
await this.writeChain.catch(() => {});
this.started = false;
}

Expand All @@ -156,16 +164,48 @@ export class FileAcpLocalStorageProfile implements AcpLocalStorageProfile {

private async persist(): Promise<void> {
if (!this.started) {
throw new Error('FileAcpLocalStorageProfile: start() must be called before use');
throw new Error('FileAcpLocalStorageProfile: persist() called before start()');
}
// Issue #3045: atomic write to prevent truncation on SIGTERM/OOM kill
// Issue #3366: error recovery — a failed write must not poison subsequent writes
const content = `${JSON.stringify(serializeState(this.state), null, 2)}\n`;
const tmpFile = `${this.config.filePath}.tmp.${process.pid}`;
this.writeChain = this.writeChain.then(
() => writeFile(tmpFile, content, 'utf8').then(() => rename(tmpFile, this.config.filePath)),
);

const prevChain = this.writeChain;
this.writeChain = prevChain
.then(
// Previous write succeeded — do this write
() => writeFile(tmpFile, content, 'utf8').then(() => rename(tmpFile, this.config.filePath)),
// Previous write failed — still attempt this write
() => writeFile(tmpFile, content, 'utf8').then(() => rename(tmpFile, this.config.filePath)),
)
.then(() => {
this.persistError = null;
})
.catch((err: Error) => {
this.persistError = err;
logger.error({
component: 'acp-local-storage',
operation: 'persist',
errorCode: 'PERSIST_FAILED',
attributes: { error: err.message, filePath: this.config.filePath },
});
// Clean up stale tmp file if it exists
unlink(tmpFile).catch(() => {});
// Reset chain so next persist() is not chained to a rejected promise
this.writeChain = Promise.resolve();
});

await this.writeChain;
}

/**
* Returns the last persist error, or null if all writes succeeded.
* Useful for diagnostics and health checks.
*/
getPersistError(): Error | null {
return this.persistError;
}
}

export class MemoryAcpSessionStore implements AcpSessionStore {
Expand Down
Loading