Skip to content

Commit 3d69076

Browse files
authored
Merge PR #472: Activity feed load from disk
Activity feed: load recent history from disk
2 parents 16c4f16 + 235d864 commit 3d69076

2 files changed

Lines changed: 85 additions & 4 deletions

File tree

server/activityFeedService.js

Lines changed: 67 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ const logger = winston.createLogger({
1919
const DEFAULT_DIR = path.join(os.homedir(), '.orchestrator');
2020
const DEFAULT_FILE = path.join(DEFAULT_DIR, 'activity.jsonl');
2121
const DEFAULT_MAX_EVENTS = 500;
22+
const DEFAULT_LOAD_MAX_BYTES = 1024 * 1024; // 1MB tail read
2223

2324
function safeJsonLine(obj) {
2425
try {
@@ -28,12 +29,34 @@ function safeJsonLine(obj) {
2829
}
2930
}
3031

32+
function safeJsonParse(line) {
33+
try {
34+
return JSON.parse(line);
35+
} catch {
36+
return null;
37+
}
38+
}
39+
3140
function clampInt(n, { min, max, fallback }) {
3241
const v = Number.parseInt(String(n || ''), 10);
3342
if (!Number.isFinite(v)) return fallback;
3443
return Math.max(min, Math.min(max, v));
3544
}
3645

46+
function dedupeByIdKeepLast(items) {
47+
const seen = new Set();
48+
const out = [];
49+
for (let i = items.length - 1; i >= 0; i -= 1) {
50+
const ev = items[i];
51+
const id = ev?.id;
52+
if (!id || seen.has(id)) continue;
53+
seen.add(id);
54+
out.push(ev);
55+
}
56+
out.reverse();
57+
return out;
58+
}
59+
3760
class ActivityFeedService {
3861
constructor({ filePath = DEFAULT_FILE, maxEvents = DEFAULT_MAX_EVENTS } = {}) {
3962
this.filePath = filePath;
@@ -59,13 +82,55 @@ class ActivityFeedService {
5982
this._loaded = true;
6083
try {
6184
await fs.mkdir(path.dirname(this.filePath), { recursive: true });
62-
// Do not load entire history into memory; keep v1 in-memory only.
63-
// Existence is enough to ensure appends won't throw due to missing dir.
85+
await this.loadRecentFromDisk();
6486
} catch (error) {
6587
logger.warn('Failed to ensure activity dir exists', { error: error.message });
6688
}
6789
}
6890

91+
async loadRecentFromDisk({ maxBytes = DEFAULT_LOAD_MAX_BYTES } = {}) {
92+
const byteBudget = clampInt(maxBytes, { min: 16 * 1024, max: 10 * 1024 * 1024, fallback: DEFAULT_LOAD_MAX_BYTES });
93+
try {
94+
const stat = await fs.stat(this.filePath);
95+
const size = Number(stat.size) || 0;
96+
if (!size) return;
97+
98+
const start = Math.max(0, size - byteBudget);
99+
const length = Math.max(0, size - start);
100+
if (!length) return;
101+
102+
const handle = await fs.open(this.filePath, 'r');
103+
try {
104+
const buf = Buffer.alloc(length);
105+
const { bytesRead } = await handle.read(buf, 0, length, start);
106+
const text = buf.toString('utf8', 0, bytesRead);
107+
let lines = text.split('\n').filter(Boolean);
108+
// If we started mid-file, first line may be partial; drop it.
109+
if (start > 0 && lines.length > 0) {
110+
lines = lines.slice(1);
111+
}
112+
113+
const parsed = [];
114+
for (const line of lines) {
115+
const ev = safeJsonParse(line);
116+
if (!ev || typeof ev !== 'object') continue;
117+
if (!ev.id || !ev.kind || !ev.ts) continue;
118+
parsed.push(ev);
119+
}
120+
121+
const merged = dedupeByIdKeepLast([...parsed, ...this.events]);
122+
this.events = merged.slice(-this.maxEvents);
123+
} finally {
124+
await handle.close();
125+
}
126+
} catch (error) {
127+
// If the file doesn't exist, that's fine. Any other errors should be logged but not fatal.
128+
if (String(error?.code || '') !== 'ENOENT') {
129+
logger.warn('Failed to load activity history from disk', { error: error.message });
130+
}
131+
}
132+
}
133+
69134
list({ since = 0, limit = 200 } = {}) {
70135
const sinceMs = Number.isFinite(Number(since)) ? Number(since) : 0;
71136
const lim = clampInt(limit, { min: 1, max: 1000, fallback: 200 });
@@ -114,4 +179,3 @@ class ActivityFeedService {
114179
}
115180

116181
module.exports = { ActivityFeedService };
117-

tests/unit/activityFeedService.test.js

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,24 @@ describe('ActivityFeedService', () => {
3333
nowSpy.mockRestore();
3434
});
3535

36+
test('ensureLoaded() loads recent events from existing JSONL file', async () => {
37+
const filePath = nextTmpFile('activity.jsonl');
38+
await fs.mkdir(path.dirname(filePath), { recursive: true });
39+
await fs.writeFile(filePath, [
40+
JSON.stringify({ id: 'a', ts: 1, kind: 'k1', data: { n: 1 } }),
41+
JSON.stringify({ id: 'b', ts: 2, kind: 'k2', data: { n: 2 } })
42+
].join('\n') + '\n', 'utf8');
43+
44+
const svc = new ActivityFeedService({ filePath, maxEvents: 10 });
45+
expect(svc.list({ limit: 10 })).toHaveLength(0);
46+
47+
await svc.ensureLoaded();
48+
const list = svc.list({ limit: 10 });
49+
expect(list).toHaveLength(2);
50+
expect(list[0].id).toBe('b');
51+
expect(list[1].id).toBe('a');
52+
});
53+
3654
test('track() caps in-memory events to maxEvents', () => {
3755
const svc = new ActivityFeedService({ filePath: nextTmpFile('activity.jsonl'), maxEvents: 2 });
3856
const nowSpy = jest.spyOn(Date, 'now');
@@ -74,4 +92,3 @@ describe('ActivityFeedService', () => {
7492
nowSpy.mockRestore();
7593
});
7694
});
77-

0 commit comments

Comments
 (0)