Skip to content

Commit fe64827

Browse files
committed
[recipes] Thought enrichment pipeline
LLM-based thought classification. Hardened from code review: bounded regex (closes a ReDoS vector), AbortController fetch timeouts, delimited untrusted content with capped outputs, a --max-calls spend cap, cursor pagination, and checkpoint resume.
1 parent 151a8d1 commit fe64827

7 files changed

Lines changed: 392 additions & 88 deletions

File tree

recipes/thought-enrichment/README.md

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,11 @@ Classifies each thought using an LLM and writes structured metadata back to Supa
5151
node enrich-thoughts.mjs --apply --retry-failed
5252
```
5353

54-
**Flags:** `--provider` (openrouter or anthropic), `--concurrency`, `--limit`, `--skip`, `--model`.
54+
**Flags:** `--provider` (openrouter or anthropic), `--concurrency`, `--limit`, `--skip`, `--model`, `--max-calls`, `--reset-state`.
55+
56+
The `--max-calls` flag is a hard ceiling on the number of LLM calls per run. The default is `10000`; pass `--max-calls 0` to disable the cap. When the limit is hit the script aborts cleanly, prints a summary, and leaves remaining rows with `enriched=false` so you can resume later. This protects against a shell typo (e.g. dropping `--limit`) burning unbounded spend against a large un-enriched table.
57+
58+
**Resume.** The script checkpoints `lastProcessedId` to `data/enrichment-state.json` after each concurrency chunk. On startup, if a checkpoint exists and neither `--skip` nor `--reset-state` was passed, the run resumes from `id > lastProcessedId`. The `enriched=false` filter is still applied as a second layer of defense. Pass `--reset-state` to ignore the checkpoint and start from scratch.
5559

5660
### backfill-type.mjs -- Type canonicalization
5761

@@ -81,9 +85,9 @@ Scans thought content for patterns matching SSNs, credit cards, API keys, passwo
8185

8286
2. Apply:
8387

84-
```bash
85-
node backfill-sensitivity.mjs --apply
86-
```
88+
```bash
89+
node backfill-sensitivity.mjs --apply
90+
```
8791

8892
## Recommended execution order
8993

@@ -92,6 +96,11 @@ Scans thought content for patterns matching SSNs, credit cards, API keys, passwo
9296
3. Run `enrich-thoughts.mjs --dry-run --limit 20` to preview LLM classifications.
9397
4. Run `enrich-thoughts.mjs --apply` to enrich all remaining thoughts.
9498

99+
## Security notes
100+
101+
- **Prompt injection:** thought content is wrapped in `<thought_content>` tags and the system prompt instructs the model to treat everything inside as untrusted data. Any literal tag occurrences in content are escaped. Output fields (`summary`, `topics`, `tags`, `people`, `action_items`) are length-capped and control-char-stripped before they are written to `metadata`. Even so, enriching hostile third-party imports (shared chat exports, scraped feeds) can still influence classification labels — review before trusting them as ground truth.
102+
- **Bearer token on the wire:** every request carries your Supabase service-role key. Double-check that `SUPABASE_URL` points at your own Supabase project, not a proxy or debug server.
103+
95104
## Cost expectations
96105

97106
The default OpenRouter model is `openai/gpt-4o-mini` at roughly $0.001--0.002 per thought. For 1,000 thoughts, expect approximately $1--2. The `backfill-type` and `backfill-sensitivity` scripts are free (no LLM calls -- they use local logic only).

recipes/thought-enrichment/backfill-sensitivity.mjs

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,16 @@
1313
import fs from "node:fs";
1414
import path from "node:path";
1515
import { fileURLToPath } from "node:url";
16+
import {
17+
fetchWithTimeout,
18+
resolveTimeoutMs,
19+
DEFAULT_SUPABASE_TIMEOUT_MS,
20+
} from "./lib/memory-core.mjs";
1621

1722
const __dirname = path.dirname(fileURLToPath(import.meta.url));
1823

24+
const SUPABASE_TIMEOUT_MS = resolveTimeoutMs(process.env.FETCH_TIMEOUT_MS, DEFAULT_SUPABASE_TIMEOUT_MS);
25+
1926
// Load env from .env.local
2027
const envPath = path.resolve(__dirname, ".env.local");
2128
const envVars = {};
@@ -94,18 +101,24 @@ console.log(`Mode: ${dryRun ? "DRY RUN (no changes)" : "APPLY (will update DB)"}
94101
console.log();
95102

96103
const BATCH_SIZE = 500;
97-
let offset = 0;
104+
const PROGRESS_EVERY = 5000;
105+
let afterId = 0;
98106
let scanned = 0;
107+
let scannedAtLastProgress = 0;
99108
let upgradedPersonal = 0;
100109
let upgradedRestricted = 0;
101110
let errors = 0;
102111

112+
// Cursor-based pagination on id. Offset-pagination is unsafe here:
113+
// successful PATCHes shift the "where sensitivity_tier in
114+
// (null,standard,'')" result set, so `offset += BATCH_SIZE` would skip
115+
// un-processed rows. Cursor on id ASC is stable under mutation.
103116
while (true) {
104-
const url = `${BASE_URL}/thoughts?select=id,content,sensitivity_tier&or=(sensitivity_tier.is.null,sensitivity_tier.eq.standard,sensitivity_tier.eq.)&order=id&offset=${offset}&limit=${BATCH_SIZE}`;
105-
const res = await fetch(url, { headers });
117+
const url = `${BASE_URL}/thoughts?select=id,content,sensitivity_tier&or=(sensitivity_tier.is.null,sensitivity_tier.eq.standard,sensitivity_tier.eq.)&id=gt.${afterId}&order=id.asc&limit=${BATCH_SIZE}`;
118+
const res = await fetchWithTimeout(url, { headers }, SUPABASE_TIMEOUT_MS);
106119

107120
if (!res.ok) {
108-
console.error(`Query error at offset ${offset}: ${res.status} ${await res.text()}`);
121+
console.error(`Query error after id ${afterId}: ${res.status} ${await res.text()}`);
109122
errors++;
110123
break;
111124
}
@@ -123,11 +136,11 @@ while (true) {
123136

124137
if (apply) {
125138
const updateUrl = `${BASE_URL}/thoughts?id=eq.${row.id}`;
126-
const updateRes = await fetch(updateUrl, {
139+
const updateRes = await fetchWithTimeout(updateUrl, {
127140
method: "PATCH",
128141
headers,
129142
body: JSON.stringify({ sensitivity_tier: result.tier }),
130-
});
143+
}, SUPABASE_TIMEOUT_MS);
131144

132145
if (!updateRes.ok) {
133146
console.error(` Failed to update thought #${row.id}: ${updateRes.status}`);
@@ -143,11 +156,16 @@ while (true) {
143156
}
144157
}
145158

146-
offset += data.length;
159+
// Advance the cursor past the last id seen, regardless of whether
160+
// any rows in this page were upgraded.
161+
afterId = data[data.length - 1].id;
147162
if (data.length < BATCH_SIZE) break;
148163

149-
if (offset % 5000 === 0) {
164+
// Progress reporter independent of a multiple-of-offset check, so
165+
// partial batches do not silently stop emitting progress.
166+
if (Math.floor(scanned / PROGRESS_EVERY) > Math.floor(scannedAtLastProgress / PROGRESS_EVERY)) {
150167
console.log(` ... scanned ${scanned} thoughts so far (${upgradedPersonal} personal, ${upgradedRestricted} restricted)`);
168+
scannedAtLastProgress = scanned;
151169
}
152170
}
153171

recipes/thought-enrichment/backfill-type.mjs

Lines changed: 67 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,16 @@
1111
import { readFileSync } from "fs";
1212
import { fileURLToPath } from "url";
1313
import { dirname, join } from "path";
14+
import {
15+
fetchWithTimeout,
16+
resolveTimeoutMs,
17+
DEFAULT_SUPABASE_TIMEOUT_MS,
18+
} from "./lib/memory-core.mjs";
1419

1520
const __dirname = dirname(fileURLToPath(import.meta.url));
1621

22+
const SUPABASE_TIMEOUT_MS = resolveTimeoutMs(process.env.FETCH_TIMEOUT_MS, DEFAULT_SUPABASE_TIMEOUT_MS);
23+
1724
// Load env
1825
function loadEnv() {
1926
const envPath = join(__dirname, ".env.local");
@@ -69,10 +76,31 @@ async function sleep(ms) {
6976
return new Promise((resolve) => setTimeout(resolve, ms));
7077
}
7178

72-
async function fetchBatch(offset, retries = 4) {
73-
const url = `${BASE}/thoughts?select=id,metadata->>type&type=eq.reference&limit=${BATCH_SIZE}&offset=${offset}`;
79+
// Cursor-based pagination on id. Offset pagination is unsafe here:
80+
// every successful PATCH removes a row from the `type=eq.reference`
81+
// filter, so `offset += rows.length` would skip unprocessed rows. The
82+
// cursor pattern is id > afterId ORDER BY id ASC. `includeCount` is
83+
// used once on the first call to populate the total for the progress
84+
// bar — every subsequent call omits the count=exact header so
85+
// PostgreSQL does not COUNT(*) the filtered set per page (LOW-7).
86+
async function fetchBatch(afterId, { includeCount = false } = {}, retries = 4) {
87+
const url = `${BASE}/thoughts?select=id,metadata->>type&type=eq.reference&id=gt.${afterId}&order=id.asc&limit=${BATCH_SIZE}`;
88+
const batchHeaders = includeCount ? { ...headers, Prefer: "count=exact" } : { ...headers };
7489
for (let attempt = 0; attempt <= retries; attempt++) {
75-
const r = await fetch(url, { headers: { ...headers, Prefer: "count=exact" } });
90+
let r;
91+
try {
92+
r = await fetchWithTimeout(url, { headers: batchHeaders }, SUPABASE_TIMEOUT_MS);
93+
} catch (err) {
94+
// Treat AbortError/timeouts and other network errors as transient.
95+
const msg = err?.message || String(err);
96+
if (attempt < retries) {
97+
const delay = Math.min(1000 * Math.pow(2, attempt), 16000);
98+
process.stderr.write(`\n[retry] fetch afterId ${afterId} ${msg.slice(0, 120)}, waiting ${delay}ms\n`);
99+
await sleep(delay);
100+
continue;
101+
}
102+
throw err;
103+
}
76104
if (r.ok) {
77105
const contentRange = r.headers.get("content-range");
78106
const total = contentRange ? parseInt(contentRange.split("/")[1], 10) : null;
@@ -83,22 +111,34 @@ async function fetchBatch(offset, retries = 4) {
83111
const isTransient = r.status === 502 || r.status === 503 || r.status === 504 || r.status === 429;
84112
if (isTransient && attempt < retries) {
85113
const delay = Math.min(1000 * Math.pow(2, attempt), 16000);
86-
process.stderr.write(`\n[retry] fetch offset ${offset} got ${r.status}, waiting ${delay}ms\n`);
114+
process.stderr.write(`\n[retry] fetch afterId ${afterId} got ${r.status}, waiting ${delay}ms\n`);
87115
await sleep(delay);
88116
continue;
89117
}
90-
throw new Error(`Fetch failed at offset ${offset}: ${r.status} ${body.slice(0, 200)}`);
118+
throw new Error(`Fetch failed after id ${afterId}: ${r.status} ${body.slice(0, 200)}`);
91119
}
92120
}
93121

94122
async function updateRow(id, newType, retries = 6) {
95123
const url = `${BASE}/thoughts?id=eq.${id}`;
96124
for (let attempt = 0; attempt <= retries; attempt++) {
97-
const r = await fetch(url, {
98-
method: "PATCH",
99-
headers,
100-
body: JSON.stringify({ type: newType }),
101-
});
125+
let r;
126+
try {
127+
r = await fetchWithTimeout(url, {
128+
method: "PATCH",
129+
headers,
130+
body: JSON.stringify({ type: newType }),
131+
}, SUPABASE_TIMEOUT_MS);
132+
} catch (err) {
133+
const msg = err?.message || String(err);
134+
if (attempt < retries) {
135+
const delay = Math.min(1000 * Math.pow(2, attempt), 16000);
136+
process.stderr.write(`\n[retry] id ${id} ${msg.slice(0, 120)}, waiting ${delay}ms (attempt ${attempt + 1}/${retries})\n`);
137+
await sleep(delay);
138+
continue;
139+
}
140+
throw err;
141+
}
102142
if (r.ok) return;
103143
const body = await r.text();
104144
const isTransient = r.status === 502 || r.status === 503 || r.status === 504 || r.status === 429;
@@ -126,8 +166,14 @@ async function main() {
126166
console.log(`Batch size: ${BATCH_SIZE}`);
127167
console.log("");
128168

129-
let offset = 0;
169+
// Cursor replaces offset. afterId starts at 0 (all thought ids are
170+
// positive) and advances to the last id seen in each page, so a
171+
// PATCH that removes rows from the `type=eq.reference` filter cannot
172+
// cause the cursor to skip un-processed rows.
173+
let afterId = 0;
174+
let processedRows = 0;
130175
let total = null;
176+
let firstCountDone = false;
131177
let totalUpdated = 0;
132178
let totalSkippedInvalidType = 0;
133179
let totalSkippedAlreadyCorrect = 0;
@@ -137,7 +183,10 @@ async function main() {
137183
const typeDistribution = {};
138184

139185
while (true) {
140-
const { rows, total: fetchedTotal } = await fetchBatch(offset);
186+
const { rows, total: fetchedTotal } = await fetchBatch(afterId, {
187+
includeCount: !firstCountDone,
188+
});
189+
firstCountDone = true;
141190

142191
if (total === null && fetchedTotal !== null) {
143192
total = fetchedTotal;
@@ -179,18 +228,20 @@ async function main() {
179228
totalUpdated += updates.length;
180229
}
181230

182-
offset += rows.length;
231+
processedRows += rows.length;
232+
// Advance cursor past the highest id seen (rows are ordered by id ASC).
233+
afterId = rows[rows.length - 1].id;
183234

184-
const pct = total ? ((offset / total) * 100).toFixed(1) : "?";
185-
process.stdout.write(`\rProgress: ${offset}/${total ?? "?"} (${pct}%) — updated so far: ${totalUpdated}`);
235+
const pct = total ? ((processedRows / total) * 100).toFixed(1) : "?";
236+
process.stdout.write(`\rProgress: ${processedRows}/${total ?? "?"} (${pct}%) — updated so far: ${totalUpdated}`);
186237

187238
if (rows.length < BATCH_SIZE) break;
188239
}
189240

190241
console.log("\n");
191242
console.log("=== BACKFILL COMPLETE ===");
192243
console.log("");
193-
console.log(`Rows processed: ${offset}`);
244+
console.log(`Rows processed: ${processedRows}`);
194245
console.log(`Rows updated: ${totalUpdated}${DRY_RUN ? " (dry run, not written)" : ""}`);
195246
console.log(`Skipped (already reference): ${totalSkippedAlreadyCorrect}`);
196247
console.log(`Skipped (null/empty type): ${totalSkippedNullType}`);

0 commit comments

Comments
 (0)