Skip to content

Commit f376615

Browse files
fix(telegram): persist topic map + report channel health (#3169, #3168)
PR #3171 — approved by aegis-gh-agent[bot] (Argus). Topic map persisted to disk (atomic rename). Health endpoint reports delivery state. Fixes silent event delivery failure.
1 parent 9e7e163 commit f376615

2 files changed

Lines changed: 140 additions & 1 deletion

File tree

src/channels/manager.ts

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -123,6 +123,10 @@ export class ChannelManager {
123123
if (ch.filter && !ch.filter(payload.event)) { span.end(); return; }
124124
await call(ch);
125125
// Success — reset failure count (channel may have been in cooldown)
126+
const prevHealth = this.health.get(ch.name);
127+
if (prevHealth && prevHealth.failCount > 0) {
128+
console.log(`Channel ${ch.name} recovered after ${prevHealth.failCount} failures`);
129+
}
126130
this.health.set(ch.name, { failCount: 0, disabledUntil: 0 });
127131
spanOk(span);
128132
} catch (e) {

src/channels/telegram.ts

Lines changed: 136 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,13 @@
99

1010
import type {
1111
Channel,
12+
ChannelHealthStatus,
1213
SessionEventPayload,
1314
InboundHandler,
1415
} from './types.js';
1516
import { homedir } from 'node:os';
17+
import { join } from 'node:path';
18+
import { readFileSync, writeFileSync, mkdirSync, existsSync, renameSync, unlinkSync } from 'node:fs';
1619

1720
import {
1821
esc,
@@ -593,6 +596,83 @@ function formatProgressCard(progress: SessionProgress): string {
593596
return parts.join('\n');
594597
}
595598

599+
600+
// ── Topic Map Persistence ──────────────────────────────────────────────────
601+
602+
/**
603+
* Persists the session→topic mapping to disk so it survives server restarts.
604+
* Issue #3168: Without persistence, the topic map is lost on every restart,
605+
* causing all event deliveries to silently fail (no topic to send to).
606+
*/
607+
class TopicPersistence {
  /** Path of the JSON file the topic map is written to. */
  private readonly filePath: string;
  /** Directory containing `filePath`; created on demand by `save()`. */
  private readonly dir: string;

  /**
   * @param stateDir Directory for the state file. Defaults to `~/.aegis`.
   */
  constructor(stateDir?: string) {
    const baseDir = stateDir ?? join(homedir(), '.aegis');
    this.filePath = join(baseDir, 'telegram-topics.json');
    // Derive the parent from the final path so it is normalised the same way.
    this.dir = join(this.filePath, '..');
  }

  /**
   * Write the current topic map to disk. Failures are logged, never thrown.
   *
   * The payload is written to a sibling `.tmp` file and then renamed over the
   * real file, so a reader never observes a half-written document.
   *
   * @param topics Live session→topic map; entries flagged `deleting` are skipped.
   */
  save(topics: Map<string, SessionTopic>): void {
    const entries: Array<{
      sessionId: string;
      topicId: number;
      displayName: string;
      endedAt: number | null;
    }> = [];
    for (const t of topics.values()) {
      // Don't persist topics that are being deleted
      if (t.deleting) continue;
      entries.push({
        sessionId: t.sessionId,
        topicId: t.topicId,
        displayName: t.displayName,
        endedAt: t.endedAt,
      });
    }

    const tmp = this.filePath + '.tmp';
    try {
      // `recursive: true` is a no-op when the directory already exists, so no
      // separate existence check (and no check-then-create race) is needed.
      mkdirSync(this.dir, { recursive: true });
      writeFileSync(tmp, JSON.stringify({ version: 1, topics: entries }), 'utf-8');
      // Atomic rename
      renameSync(tmp, this.filePath);
    } catch (e) {
      console.error('Telegram: failed to persist topic map:', e);
      // Don't leave a stale temp file behind after a failed write/rename.
      try {
        if (existsSync(tmp)) unlinkSync(tmp);
      } catch { /* best effort */ }
    }
  }

  /**
   * Read the persisted topic map. Never throws.
   *
   * Each entry is validated on its own: a malformed entry (not an object,
   * non-string/empty `sessionId`, non-numeric/zero `topicId`) is skipped
   * without discarding the valid entries around it. Transient state
   * (`cleanupScheduledAt`, `deleting`) is always reset.
   *
   * @returns The restored map; empty when the file is missing, unreadable,
   *          or does not contain a `topics` array.
   */
  load(): Map<string, SessionTopic> {
    const result = new Map<string, SessionTopic>();
    try {
      if (!existsSync(this.filePath)) return result;
      const data: unknown = JSON.parse(readFileSync(this.filePath, 'utf-8'));
      const rawTopics: unknown = (data as { topics?: unknown } | null | undefined)?.topics;
      if (!Array.isArray(rawTopics)) return result;

      for (const raw of rawTopics as unknown[]) {
        if (typeof raw !== 'object' || raw === null) continue;
        const { sessionId, topicId, displayName, endedAt } = raw as Record<string, unknown>;
        if (typeof sessionId !== 'string' || sessionId === '') continue;
        if (typeof topicId !== 'number' || !Number.isFinite(topicId) || topicId === 0) continue;

        result.set(sessionId, {
          sessionId,
          topicId,
          displayName:
            typeof displayName === 'string' && displayName !== ''
              ? displayName
              : sessionId.slice(0, 8),
          endedAt: typeof endedAt === 'number' && Number.isFinite(endedAt) ? endedAt : null,
          cleanupScheduledAt: null,
          deleting: false,
        });
      }
    } catch (e) {
      console.error('Telegram: failed to load topic map:', e);
    }
    return result;
  }

  /** Delete the persisted file if present. Non-critical: failures are logged only. */
  clear(): void {
    try {
      if (existsSync(this.filePath)) {
        unlinkSync(this.filePath);
      }
    } catch (e) {
      console.error('Telegram: failed to clear topic map:', e);
    }
  }
}
675+
596676
// ── Telegram Channel ────────────────────────────────────────────────────────
597677

598678
export class TelegramChannel implements Channel {
@@ -633,12 +713,27 @@ export class TelegramChannel implements Channel {
633713
private lastUserMessage = new Map<string, string>();
634714

635715

636-
constructor(private config: TelegramChannelConfig) {
716+
// Issue #3169: Health tracking for getHealth()
717+
private lastSuccessAt: number | null = null;
718+
private lastErrorAt: number | null = null;
719+
private lastErrorMessage: string | null = null;
720+
private deliveryFailCount = 0;
721+
722+
// Issue #3168: Topic persistence
723+
private readonly topicPersistence: TopicPersistence;
724+
725+
constructor(private config: TelegramChannelConfig, stateDir?: string) {
  // Resolve the topic TTL: use the configured value when it is a finite
  // number (clamped to >= 0), otherwise fall back to the class default.
  const fallbackTtlMs = TelegramChannel.DEFAULT_TOPIC_TTL_MS;
  const requestedTtlMs = config.topicTtlMs ?? fallbackTtlMs;
  if (Number.isFinite(requestedTtlMs)) {
    this.topicTtlMs = Math.max(0, requestedTtlMs);
  } else {
    this.topicTtlMs = fallbackTtlMs;
  }

  // Auto-delete is on unless the config explicitly says otherwise.
  this.topicAutoDelete = config.topicAutoDelete ?? true;

  // Issue #3168: reload the session→topic map written by a previous run.
  // `stateDir` is forwarded as-is; TopicPersistence picks its own default.
  this.topicPersistence = new TopicPersistence(stateDir);
  const restored = this.topicPersistence.load();
  this.topics = restored;
  if (restored.size > 0) {
    console.log(`Telegram: restored ${restored.size} topic mappings from disk`);
  }
}
643738

644739
/** Call Telegram Bot API with retry on 429. Instance method so it can access rateLimitUntil. */
@@ -759,6 +854,9 @@ export class TelegramChannel implements Channel {
759854
formatSessionCreated(payload.session.name, payload.session.workDir, payload.session.id, payload.meta),
760855
);
761856

857+
this.trackSuccess();
858+
this.topicPersistence.save(this.topics);
859+
762860
// Issue #46: Replay any messages that arrived before topic was created
763861
const buffered = this.preTopicBuffer.get(payload.session.id);
764862
if (buffered && buffered.length > 0) {
@@ -1130,6 +1228,7 @@ export class TelegramChannel implements Channel {
11301228
try {
11311229
const result = (await this.tgApi('sendMessage', body)) as { message_id: number };
11321230
this.lastSent.set(sessionId, Date.now());
1231+
this.trackSuccess();
11331232
return result.message_id;
11341233
} catch {
11351234
// Fallback: strip HTML + buttons, send plain
@@ -1146,9 +1245,11 @@ export class TelegramChannel implements Channel {
11461245
disable_web_page_preview: true,
11471246
})) as { message_id: number };
11481247
this.lastSent.set(sessionId, Date.now());
1248+
this.trackSuccess();
11491249
return result.message_id;
11501250
} catch (e) {
11511251
console.error(`Telegram: failed to send styled to topic ${topic.topicId}:`, this.redactError(e));
1252+
this.trackFailure(e);
11521253
return null;
11531254
}
11541255
}
@@ -1236,6 +1337,7 @@ export class TelegramChannel implements Channel {
12361337
})) as { message_id: number };
12371338
this.lastSent.set(sessionId, Date.now());
12381339
this.decrementInFlight(sessionId);
1340+
this.trackSuccess();
12391341
return result.message_id;
12401342
} catch {
12411343
// Fallback: strip HTML, send plain
@@ -1253,10 +1355,12 @@ export class TelegramChannel implements Channel {
12531355
})) as { message_id: number };
12541356
this.lastSent.set(sessionId, Date.now());
12551357
this.decrementInFlight(sessionId);
1358+
this.trackSuccess();
12561359
return result.message_id;
12571360
} catch (e) {
12581361
console.error(`Telegram: failed to send to topic ${topic.topicId}:`, this.redactError(e));
12591362
this.decrementInFlight(sessionId);
1363+
this.trackFailure(e);
12601364
return null;
12611365
}
12621366
}
@@ -1440,9 +1544,11 @@ export class TelegramChannel implements Channel {
14401544

14411545
await this.tgApi('deleteForumTopic', body);
14421546
this.topics.delete(sessionId);
1547+
this.topicPersistence.save(this.topics);
14431548
} catch (e) {
14441549
if (this.isIgnorableTopicDeleteError(e)) {
14451550
this.topics.delete(sessionId);
1551+
this.topicPersistence.save(this.topics);
14461552
} else {
14471553
console.error(`Telegram: failed to cleanup topic for session ${sessionId}:`, this.redactError(e));
14481554
topic.deleting = false;
@@ -1464,6 +1570,35 @@ export class TelegramChannel implements Channel {
14641570
}
14651571

14661572

1573+
1574+
// ── Health Reporting ──────────────────────────────────────────────────────
1575+
1576+
/** Issue #3169: Report actual channel health including delivery state. */
1577+
getHealth(): ChannelHealthStatus {
  const { lastSuccessAt, lastErrorAt } = this;

  // Healthy when no failure has ever been recorded, or when the most recent
  // recorded success is strictly newer than the most recent recorded failure.
  let healthy = true;
  if (lastErrorAt !== null) {
    healthy = lastSuccessAt !== null && lastSuccessAt > lastErrorAt;
  }

  // Note: `lastError` keeps the last recorded message even after recovery.
  return {
    channel: this.name,
    healthy,
    lastSuccess: lastSuccessAt,
    lastError: this.lastErrorMessage,
    pendingCount: this.messageQueue.size,
  };
}
1586+
1587+
/** Track successful delivery for health reporting. */
1588+
private trackSuccess(): void {
  // A successful delivery clears the consecutive-failure counter and
  // stamps the time that getHealth() compares against the last error.
  this.deliveryFailCount = 0;
  this.lastSuccessAt = Date.now();
}
1592+
1593+
/** Track delivery failure for health reporting. */
1594+
private trackFailure(error: unknown): void {
  this.lastErrorAt = Date.now();
  // Redact once and reuse the result instead of re-running redactError for
  // the type check and again for each branch. Only the redacted value is
  // stored, since lastErrorMessage is exposed through getHealth().
  // NOTE(review): presumably redactError strips secrets such as the bot
  // token — confirm against its implementation.
  const redacted: unknown = this.redactError(error);
  this.lastErrorMessage = redacted instanceof Error ? redacted.message : String(redacted);
  this.deliveryFailCount++;
}
1601+
14671602
// ── Bidirectional polling ─────────────────────────────────────────────────
14681603

14691604
private async pollLoop(): Promise<void> {

0 commit comments

Comments
(0)