Skip to content

Commit 478a4a3

Browse files
whoabuddyclaude
andcommitted
fix(substrate): review-feedback correctness pass — catch widening, init flag, host requirement, write-back guard, idempotency key
Addresses 7 inline review findings (Copilot + secret-mars + arc0btc) on PR #5 — all substantive items resolved in-tree, no follow-ups left. **Catch widening (Copilot #4, secret-mars [blocking-risk], arc0btc #1)** `runSubstrateIntakeTick` previously only caught `claimNextJob`. Both `jobRowToTaskInput` and `enqueueTask` ran outside the try, so a malformed JobRow or a local SQLite lock contention escaped as a thrown error from `runOnce`. Each call site now has its own catch with a distinct skip reason — `job-parse-fail` and `local-enqueue-fail` — so the contract's "NEVER throws" guarantee is real. The substrate job stays under lease in both failure modes; `releaseExpiredLeases` reconciles on the next cycle. **`substrateDbInitialized` retry on credential fail (Copilot #1, all 3 reviewers seconded)** The flag was set to `true` BEFORE `createSubstrateConnection` succeeded. A transient credential read miss at first tick (e.g. encrypted blob not yet available) permanently disabled substrate intake for the process lifetime — the contract's documented `[substrate] skip reason=credential-fail` log line would never re-fire. Flag now sets only inside the success branch; next tick retries. **`runSubstrateWriteBack` swallows transient PG blips (arc0btc #3)** Was the only substrate fn whose Postgres calls were unguarded — a mid-write-back connection reset would propagate out of `runOnce` even though the local task already finished cleanly. Wrapped both `completeJob` and `failJob` in a single try; new `[substrate] write-back error=<msg> jobs.id=<id>` log line on transient fail. Lease recovery still reconciles eventually. **Default host removed (Copilot #3, arc0btc seconded)** `host` no longer defaults to a hard-coded private IP. When `substrate.enabled: true`, an explicit `substrate.host` is required — `createSubstrateConnection` throws a clear error if unset. Closes the "dev/test slot misconfigured at enabled:true accidentally connects to prod" footgun. **Self-contained log fallbacks (secret-mars #5)** The empty-else branches on `completeJob`/`failJob` `{ ok: false }` results relied on the substrate-db package emitting its own `[substrate] complete-epoch-mismatch ...` log. If that package's log format ever changes, those branches went silent. Added `[substrate] complete-failed jobs.id=<id> epoch=<n>` and `[substrate] fail-failed ...` fallbacks so this code is self-contained. **Idempotency-key threading (arc0btc #2 design, secret-mars idempotency follow-up)** Closes the "side-effecting jobs can execute twice when a lease expires mid-flight" hazard structurally — the fence on `jobs.claim_epoch` only protects the status write, not the action. `jobRowToTaskInput` now threads `payload.idempotency_key = "substrate-<job_id>-e<claim_epoch>"` so downstream side-effecting handlers (email send, PR open, tx broadcast) can dedup against their own per-handler key store. Bounds the *consequence* from "action runs twice" to "action runs once, second attempt no-ops." Substrate tasks also enqueue with `priority: 1` so they execute on the same or next tick — the lease window now tracks real execution latency instead of waiting behind lower-priority work, which shrinks the "lease expires while task waits in local queue" window further. **Silent null claim (Copilot #5)** Contract said null-claim is silent; impl logged `[substrate] claim ... result=none` every tick. Dropped the log — quiet-tick visibility lives in successful-claim and idle-dispatch event lines, not substrate tick-rate noise. **Other nits (Copilot #2, arc0btc #5)** - Removed unused `getTaskById` import in `substrate.ts`. - Documented `substrate` block's shallow-merge behavior in `types.ts` (unlike `profiles`/`adapters` which deep-merge). Slots that extend a base and want to flip only `isLeaseRecoveryOwner: true` must repeat the whole substrate block. Deferred (already declared out of scope by reviewers): - `pg_advisory_lock` guard on `isLeaseRecoveryOwner` race (secret-mars [suggestion]) — multi-slot mis-config protection is a follow-up. - `_substrate_*` payload-injection vs strict validators (secret-mars [suggestion]) — naming is defensive prefixing already. Contract block in src/substrate.ts updated with all new log lines and a new "Side-effect duplicate-execution guard" section. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent f51de46 commit 478a4a3

3 files changed

Lines changed: 120 additions & 38 deletions

File tree

src/runtime.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -210,15 +210,18 @@ export async function runOnce(db: Database, config: RuntimeConfig): Promise<Reco
210210
// ---------------------------------------------------------------------------
211211
const subConfig = resolveSubstrateConfig(config);
212212
if (subConfig) {
213-
// Lazy-init the substrate DB connection (credential resolved once per process).
213+
// Lazy-init the substrate DB connection (credential resolved on success;
214+
// re-attempted next tick on failure so a transient credential / host
215+
// read miss does not permanently disable substrate intake).
214216
if (!substrateDbInitialized) {
215-
substrateDbInitialized = true;
216217
try {
217218
substrateDb = await createSubstrateConnection(subConfig);
219+
substrateDbInitialized = true; // only mark initialized on success
218220
} catch (error) {
219221
const msg = error instanceof Error ? error.message : String(error);
220222
console.error(`[substrate] skip reason=credential-fail error=${msg}`);
221223
substrateDb = null;
224+
// substrateDbInitialized stays false — next tick retries.
222225
}
223226
}
224227

src/substrate.ts

Lines changed: 111 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,36 @@
55
// Zero behavior change on any slot where substrate is not configured.
66
//
77
// Contract (stable, parseable log lines):
8-
// [substrate] claim slot=<id> kinds=<list> result=<jobs.id|none>
8+
// [substrate] claim slot=<id> kinds=<list> result=<jobs.id> (success — null claim is silent)
99
// [substrate] complete jobs.id=<id> epoch=<n>
1010
// [substrate] fail jobs.id=<id> epoch=<n> reason=<...>
1111
// [substrate] lease-recovery released=<n>
1212
// [substrate] skip reason=credential-fail error=<...>
1313
// [substrate] skip reason=db-unreachable error=<...>
14+
// [substrate] skip reason=job-parse-fail error=<...> jobs.id=<id>
15+
// [substrate] skip reason=local-enqueue-fail error=<...> jobs.id=<id>
1416
// [substrate] complete-epoch-mismatch jobs.id=<id> expected=<n> actual=<m> — ...
17+
// [substrate] complete-failed jobs.id=<id> epoch=<n> (self-contained fallback)
18+
// [substrate] fail-failed jobs.id=<id> epoch=<n> (self-contained fallback)
19+
// [substrate] write-back error=<...> jobs.id=<id> (transient PG blip; swallowed)
1520
//
1621
// Failure conditions:
1722
// - Substrate unreachable → tick logs [substrate] skip and continues (NOT crash)
18-
// - Credential resolution fail → tick logs [substrate] skip with distinct line
19-
// - claimNextJob returns null → no-op tick (silent)
23+
// - Credential resolution fail → tick logs [substrate] skip with distinct line; flag NOT
24+
// marked initialized so the next tick retries.
25+
// - claimNextJob returns null → no-op tick (silent — quiet-tick visibility goes through
26+
// successful-claim and idle-dispatch event lines, not substrate-specific log spam)
27+
// - jobRowToTaskInput / enqueueTask throw → caught with distinct reason; substrate job
28+
// stays held under lease and gets returned by releaseExpiredLeases.
29+
// - completeJob / failJob throw mid-write-back → caught; lease recovery reconciles.
30+
//
31+
// Side-effect duplicate-execution guard:
32+
// Substrate `jobs.id` + `claim_epoch` are threaded into the local TaskInput as both
33+
// `payload._substrate_*` fields (for write-back fencing) AND a top-level
34+
// `payload.idempotency_key = "substrate-<job_id>-e<claim_epoch>"` (for downstream
35+
// side-effecting handlers to dedup against). Substrate tasks also enqueue with
36+
// `priority: 1` to maximize same/next-tick execution and minimize the window
37+
// where a lease can expire on a queued-but-unrun task.
2038
// ---------------------------------------------------------------------------
2139

2240
import type { Database } from "bun:sqlite";
@@ -30,7 +48,7 @@ import {
3048
} from "@genesis-works/substrate-db";
3149
import type { RuntimeConfig, SubstrateConfig, TaskInput, CanonicalOutcome, TaskRecord } from "./types";
3250
import { resolveCredentialRefs } from "./credentials";
33-
import { enqueueTask, getTaskById } from "./db";
51+
import { enqueueTask } from "./db";
3452

3553
// ---------------------------------------------------------------------------
3654
// Types
@@ -73,6 +91,15 @@ export function resolveSubstrateConfig(config: RuntimeConfig): SubstrateConfig |
7391
export async function createSubstrateConnection(
7492
sub: SubstrateConfig
7593
): Promise<SubstrateDb> {
94+
// Host is REQUIRED when substrate is enabled — no implicit default to any private IP.
95+
// A misconfigured slot with substrate.enabled=true but no host would otherwise quietly
96+
// connect to whatever sits at the prior default; explicit host config is the safer floor.
97+
if (!sub.host || sub.host.trim().length === 0) {
98+
throw new Error(
99+
'Substrate enabled but substrate.host is not set — explicit host required (no implicit default).'
100+
);
101+
}
102+
76103
// Resolve the credential — the env key SUBSTRATE_DB_CREDENTIAL holds the id.
77104
// We use resolveCredentialRefs on a fake env object to reuse the existing pattern.
78105
const fakeEnv: Record<string, string> = {
@@ -85,7 +112,7 @@ export async function createSubstrateConnection(
85112
}
86113

87114
return createSubstrateClient({
88-
host: sub.host ?? "192.168.1.31",
115+
host: sub.host,
89116
port: sub.port ?? 5432,
90117
database: sub.database ?? "substrate",
91118
user: sub.user ?? "substrate_app",
@@ -101,6 +128,12 @@ export async function createSubstrateConnection(
101128

102129
export function jobRowToTaskInput(job: JobRow): TaskInput {
103130
const p = (job.payload ?? {}) as Record<string, unknown>;
131+
// Side-effect duplicate guard: every substrate-sourced task carries a stable
132+
// idempotency_key derived from (job_id, claim_epoch). Downstream handlers that
133+
// perform external side-effects (email send, PR open, tx broadcast) check this
134+
// key in their own dedup store before acting — bounds duplicate execution from
135+
// lease-expiry-mid-flight scenarios to a no-op on the second handler call.
136+
const idempotencyKey = `substrate-${job.id}-e${job.claim_epoch}`;
104137
return {
105138
kind: job.kind,
106139
source: `substrate:${job.id}`, // ties runtime task back to the substrate job
@@ -110,9 +143,13 @@ export function jobRowToTaskInput(job: JobRow): TaskInput {
110143
...p,
111144
_substrate_job_id: job.id, // load-bearing: write-back uses this
112145
_substrate_claim_epoch: job.claim_epoch, // load-bearing: epoch fencing
146+
idempotency_key: idempotencyKey, // dedup key for side-effecting handlers
113147
},
114148
requested_profile: typeof p.requested_profile === "string" ? p.requested_profile : undefined,
115149
requested_adapter: typeof p.requested_adapter === "string" ? p.requested_adapter : undefined,
150+
// Priority 1 maximizes same/next-tick execution so the lease window roughly
151+
// tracks real execution latency instead of waiting behind lower-priority work.
152+
priority: 1,
116153
max_attempts: 1, // substrate handles retry via releaseExpiredLeases
117154
};
118155
}
@@ -128,7 +165,13 @@ export function jobRowToTaskInput(job: JobRow): TaskInput {
128165
// Returns { claimed: false } if substrate is disabled, unreachable, or no jobs.
129166
// Returns { claimed: true, jobId, taskId, epochUsed } on successful enqueue.
130167
//
131-
// NEVER throws — all errors are logged and result in { claimed: false }.
168+
// NEVER throws — every code path inside is wrapped. Distinct skip reasons:
169+
// db-unreachable — claimNextJob threw (Postgres connectivity)
170+
// job-parse-fail — jobRowToTaskInput threw on a malformed JobRow
171+
// local-enqueue-fail — enqueueTask threw (sqlite lock, validator, etc.)
172+
// In the latter two, the substrate `jobs` row stays held under lease and is
173+
// returned by releaseExpiredLeases — another slot (or this slot on a later
174+
// tick) re-claims with a fresh epoch.
132175
// ---------------------------------------------------------------------------
133176

134177
export async function runSubstrateIntakeTick(
@@ -148,17 +191,37 @@ export async function runSubstrateIntakeTick(
148191
return { claimed: false, reason: "db-unreachable" };
149192
}
150193

151-
const kindList = sub.kinds.join(",");
194+
// Silent no-op on null claim (per contract). Successful claims and idle dispatch
195+
// are already logged elsewhere; substrate-specific result=none lines would just
196+
// be tick-rate noise on a queue that's frequently empty.
152197
if (!job) {
153-
console.info(`[substrate] claim slot=${sub.slotId} kinds=${kindList} result=none`);
154198
return { claimed: false };
155199
}
156200

201+
const kindList = sub.kinds.join(",");
157202
console.info(`[substrate] claim slot=${sub.slotId} kinds=${kindList} result=${job.id}`);
158203

159-
// Convert to TaskInput and enqueue locally
160-
const taskInput = jobRowToTaskInput(job);
161-
const task = enqueueTask(localDb, config, taskInput);
204+
// Convert to TaskInput and enqueue locally — both can throw on malformed payload
205+
// or local DB pressure. A throw here leaves the substrate job under lease so it
206+
// gets re-claimed by another slot via releaseExpiredLeases. We DO NOT release the
207+
// lease eagerly here: doing so would race with the lease-recovery owner.
208+
let taskInput: TaskInput;
209+
try {
210+
taskInput = jobRowToTaskInput(job);
211+
} catch (error) {
212+
const msg = error instanceof Error ? error.message : String(error);
213+
console.error(`[substrate] skip reason=job-parse-fail error=${msg} jobs.id=${job.id}`);
214+
return { claimed: false, reason: "job-parse-fail" };
215+
}
216+
217+
let task;
218+
try {
219+
task = enqueueTask(localDb, config, taskInput);
220+
} catch (error) {
221+
const msg = error instanceof Error ? error.message : String(error);
222+
console.error(`[substrate] skip reason=local-enqueue-fail error=${msg} jobs.id=${job.id}`);
223+
return { claimed: false, reason: "local-enqueue-fail" };
224+
}
162225

163226
return {
164227
claimed: true,
@@ -200,33 +263,45 @@ export async function runSubstrateWriteBack(
200263

201264
const isSuccess = outcome.status === "completed";
202265

203-
if (isSuccess) {
204-
const result = await completeJob(
205-
substrateDb,
206-
jobId,
207-
{
208-
task_id: task.task_id,
209-
status: "completed",
210-
operator_summary: outcome.operator_summary,
211-
artifact_paths: outcome.artifact_paths ?? [],
212-
machine_status: outcome.machine_status,
213-
},
214-
undefined,
215-
claimEpoch
216-
);
217-
if (result.ok) {
218-
console.info(`[substrate] complete jobs.id=${jobId} epoch=${claimEpoch ?? "none"}`);
219-
} else {
220-
// conflict is already logged by completeJob with the [substrate] prefix
221-
}
222-
} else {
223-
const reason = outcome.operator_summary.slice(0, 500);
224-
const result = await failJob(substrateDb, jobId, reason, claimEpoch);
225-
if (result.ok) {
226-
console.info(`[substrate] fail jobs.id=${jobId} epoch=${claimEpoch ?? "none"} reason=${reason.slice(0, 100)}`);
266+
// The whole substrate write-back is wrapped: a transient PG blip mid-write
267+
// must NOT propagate out of runOnce and turn a clean local-task completion
268+
// into an errored tick. The substrate `jobs` row stays held under lease and
269+
// releaseExpiredLeases reconciles on the next cycle.
270+
try {
271+
if (isSuccess) {
272+
const result = await completeJob(
273+
substrateDb,
274+
jobId,
275+
{
276+
task_id: task.task_id,
277+
status: "completed",
278+
operator_summary: outcome.operator_summary,
279+
artifact_paths: outcome.artifact_paths ?? [],
280+
machine_status: outcome.machine_status,
281+
},
282+
undefined,
283+
claimEpoch
284+
);
285+
if (result.ok) {
286+
console.info(`[substrate] complete jobs.id=${jobId} epoch=${claimEpoch ?? "none"}`);
287+
} else {
288+
// Self-contained fallback log: don't assume the substrate-db package's
289+
// log format stays stable across versions. Mismatch / already-done
290+
// both land here; the package may also log its own line.
291+
console.warn(`[substrate] complete-failed jobs.id=${jobId} epoch=${claimEpoch ?? "none"}`);
292+
}
227293
} else {
228-
// conflict is already logged by failJob
294+
const reason = outcome.operator_summary.slice(0, 500);
295+
const result = await failJob(substrateDb, jobId, reason, claimEpoch);
296+
if (result.ok) {
297+
console.info(`[substrate] fail jobs.id=${jobId} epoch=${claimEpoch ?? "none"} reason=${reason.slice(0, 100)}`);
298+
} else {
299+
console.warn(`[substrate] fail-failed jobs.id=${jobId} epoch=${claimEpoch ?? "none"}`);
300+
}
229301
}
302+
} catch (error) {
303+
const msg = error instanceof Error ? error.message : String(error);
304+
console.error(`[substrate] write-back error=${msg} jobs.id=${jobId}`);
230305
}
231306
}
232307

src/types.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,10 @@ export type RuntimeConfig = {
5353
profiles: Record<string, string>;
5454
adapters: Record<string, AdapterConfig>;
5555
// Opt-in substrate dispatch intake (Phase 5 — disabled by default).
56+
// Note: this block is shallow-overridden by mergeRuntimeConfig (unlike profiles /
57+
// adapters which deep-merge). Slots that extend a base and want to set only
58+
// `isLeaseRecoveryOwner: true` must repeat the whole substrate block in their
59+
// host config; a deep-merge here is a follow-up if that pattern becomes common.
5660
substrate?: SubstrateConfig;
5761
};
5862

0 commit comments

Comments
 (0)