Skip to content

Commit 997ac07

Browse files
authored
fix(opencode-plugin): recover stale commit state (#1187)
1 parent c01c14e commit 997ac07

1 file changed

Lines changed: 117 additions & 31 deletions

File tree

examples/opencode-memory-plugin/openviking-memory.ts

Lines changed: 117 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -611,6 +611,31 @@ function upsertBufferedMessage(
611611
sessionMessageBuffer.set(sessionId, freshBuffer)
612612
}
613613

614+
function cleanupOrphanedMessageBuffers(now: number): void {
615+
for (const [sessionId, buffer] of sessionMessageBuffer.entries()) {
616+
if (sessionMap.has(sessionId)) {
617+
continue
618+
}
619+
620+
const oldestMessage = buffer[0]
621+
if (!oldestMessage) {
622+
sessionMessageBuffer.delete(sessionId)
623+
continue
624+
}
625+
626+
if (now - oldestMessage.timestamp <= BUFFERED_MESSAGE_TTL_MS * 2) {
627+
continue
628+
}
629+
630+
log("INFO", "buffer", "Cleaning up orphaned message buffer", {
631+
session_id: sessionId,
632+
buffer_age_ms: now - oldestMessage.timestamp,
633+
message_count: buffer.length,
634+
})
635+
sessionMessageBuffer.delete(sessionId)
636+
}
637+
}
638+
614639
function getAutoCommitIntervalMinutes(config: OpenVikingConfig): number {
615640
const configured = Number(config.autoCommit?.intervalMinutes ?? DEFAULT_CONFIG.autoCommit?.intervalMinutes ?? 10)
616641
if (!Number.isFinite(configured)) {
@@ -730,6 +755,15 @@ function clearCommitState(mapping: SessionMapping): void {
730755
mapping.commitStartedAt = undefined
731756
}
732757

758+
function isMissingCommitTaskError(error: unknown): boolean {
759+
if (!(error instanceof Error)) {
760+
return false
761+
}
762+
763+
const message = error.message.toLowerCase()
764+
return message.includes("resource not found") || message.includes("not found")
765+
}
766+
733767
let backgroundCommitSupported: boolean | null = null
734768
const COMMIT_TIMEOUT_MS = 180000
735769

@@ -988,7 +1022,19 @@ async function pollCommitTaskOnce(
9881022
}
9891023

9901024
if (!mapping.commitTaskId) {
991-
return "running"
1025+
const recoveredTaskId = await findRunningCommitTaskId(mapping.ovSessionId, config)
1026+
if (!recoveredTaskId) {
1027+
log("INFO", "session", "Clearing stale in-flight commit without task id", {
1028+
openviking_session: mapping.ovSessionId,
1029+
opencode_session: opencodeSessionId,
1030+
})
1031+
clearCommitState(mapping)
1032+
debouncedSaveSessionMap()
1033+
return "unknown"
1034+
}
1035+
1036+
mapping.commitTaskId = recoveredTaskId
1037+
debouncedSaveSessionMap()
9921038
}
9931039

9941040
try {
@@ -1041,12 +1087,23 @@ async function pollCommitTaskOnce(
10411087
}
10421088

10431089
return task.status
1044-
} catch (error: any) {
1090+
} catch (error: unknown) {
1091+
if (isMissingCommitTaskError(error)) {
1092+
log("INFO", "session", "Commit task disappeared; clearing stale state", {
1093+
openviking_session: mapping.ovSessionId,
1094+
opencode_session: opencodeSessionId,
1095+
task_id: mapping.commitTaskId,
1096+
})
1097+
clearCommitState(mapping)
1098+
debouncedSaveSessionMap()
1099+
return "unknown"
1100+
}
1101+
10451102
log("ERROR", "session", "Failed to poll OpenViking background commit", {
10461103
openviking_session: mapping.ovSessionId,
10471104
opencode_session: opencodeSessionId,
10481105
task_id: mapping.commitTaskId,
1049-
error: error.message,
1106+
error: error instanceof Error ? error.message : String(error),
10501107
})
10511108
return "unknown"
10521109
}
@@ -1070,41 +1127,63 @@ async function waitForCommitCompletion(
10701127
return null
10711128
}
10721129
if (!mapping.commitTaskId) {
1073-
await sleep(500, abortSignal)
1074-
continue
1130+
const recoveredTaskId = await findRunningCommitTaskId(mapping.ovSessionId, config)
1131+
if (!recoveredTaskId) {
1132+
clearCommitState(mapping)
1133+
debouncedSaveSessionMap()
1134+
return null
1135+
}
1136+
1137+
mapping.commitTaskId = recoveredTaskId
1138+
debouncedSaveSessionMap()
10751139
}
10761140

1077-
const response = await makeRequest<OpenVikingResponse<TaskResult>>(config, {
1078-
method: "GET",
1079-
endpoint: `/api/v1/tasks/${mapping.commitTaskId}`,
1080-
timeoutMs: 5000,
1081-
abortSignal,
1082-
})
1083-
const task = unwrapResponse(response)
1141+
try {
1142+
const response = await makeRequest<OpenVikingResponse<TaskResult>>(config, {
1143+
method: "GET",
1144+
endpoint: `/api/v1/tasks/${mapping.commitTaskId}`,
1145+
timeoutMs: 5000,
1146+
abortSignal,
1147+
})
1148+
const task = unwrapResponse(response)
10841149

1085-
if (task.status === "completed") {
1086-
const memoriesExtracted = totalMemoriesFromResult(task.result)
1087-
const archived = task.result?.archived ?? false
1150+
if (task.status === "completed") {
1151+
const memoriesExtracted = totalMemoriesFromResult(task.result)
1152+
const archived = task.result?.archived ?? false
10881153

1089-
await finalizeCommitSuccess(mapping, opencodeSessionId, config)
1154+
await finalizeCommitSuccess(mapping, opencodeSessionId, config)
10901155

1091-
log("INFO", "memcommit", "Background commit completed while waiting", {
1092-
openviking_session: mapping.ovSessionId,
1093-
opencode_session: opencodeSessionId,
1094-
task_id: task.task_id,
1095-
memories_extracted: memoriesExtracted,
1096-
archived,
1097-
})
1098-
return task
1099-
}
1156+
log("INFO", "memcommit", "Background commit completed while waiting", {
1157+
openviking_session: mapping.ovSessionId,
1158+
opencode_session: opencodeSessionId,
1159+
task_id: task.task_id,
1160+
memories_extracted: memoriesExtracted,
1161+
archived,
1162+
})
1163+
return task
1164+
}
11001165

1101-
if (task.status === "failed") {
1102-
clearCommitState(mapping)
1103-
debouncedSaveSessionMap()
1104-
throw new Error(task.error || "Background commit failed")
1105-
}
1166+
if (task.status === "failed") {
1167+
clearCommitState(mapping)
1168+
debouncedSaveSessionMap()
1169+
throw new Error(task.error || "Background commit failed")
1170+
}
1171+
1172+
await sleep(2000, abortSignal)
1173+
} catch (error: unknown) {
1174+
if (isMissingCommitTaskError(error)) {
1175+
log("INFO", "session", "Commit task disappeared while waiting; clearing stale state", {
1176+
openviking_session: mapping.ovSessionId,
1177+
opencode_session: opencodeSessionId,
1178+
task_id: mapping.commitTaskId,
1179+
})
1180+
clearCommitState(mapping)
1181+
debouncedSaveSessionMap()
1182+
return null
1183+
}
11061184

1107-
await sleep(2000, abortSignal)
1185+
throw error
1186+
}
11081187
}
11091188

11101189
return null
@@ -1117,6 +1196,11 @@ async function waitForCommitCompletion(
11171196
let autoCommitTimer: NodeJS.Timeout | null = null
11181197

11191198
function startAutoCommit(config: OpenVikingConfig) {
1199+
if (autoCommitTimer) {
1200+
log("INFO", "auto-commit", "Auto-commit scheduler already running")
1201+
return
1202+
}
1203+
11201204
if (!config.autoCommit?.enabled) {
11211205
log("INFO", "auto-commit", "Auto-commit disabled in config")
11221206
return
@@ -1146,6 +1230,8 @@ async function checkAndCommitSessions(config: OpenVikingConfig): Promise<void> {
11461230
const intervalMs = getAutoCommitIntervalMinutes(config) * 60 * 1000
11471231
const now = Date.now()
11481232

1233+
cleanupOrphanedMessageBuffers(now)
1234+
11491235
for (const [opencodeSessionId, mapping] of sessionMap.entries()) {
11501236
if (mapping.commitInFlight) {
11511237
await pollCommitTaskOnce(mapping, opencodeSessionId, config)

0 commit comments

Comments
 (0)