@@ -191,12 +191,17 @@ def upsert_postings(self, source: str, postings: Iterable) -> int:
191191 matching attributes — `id`, `source`, `title`, `company`, etc.).
192192 Conflict key is (source, job_id) so re-runs are idempotent.
193193
194- Tier 2 embed-on-write: when hybrid search is enabled, each row's
195- `embedding` is computed in the same call so the corpus stays
196- current without a re-run of the backfill. This is STRICTLY
197- non-fatal — if the embeddings call fails, the jobs are still
198- upserted (without an embedding; the hybrid RPC degrades those
199- rows to lexical until the next backfill / refresh fills them).
194+ Tier 2 embed-on-write: when hybrid search is enabled, rows that
195+ are NEW to the cache (no embedding stored yet) get an
196+ `embedding` computed in this same call. Rows already embedded
197+ are NOT re-embedded — re-writing every job's vector on every
198+ 4-hour refresh churned the HNSW index hard enough to blow the
199+ upsert statement timeout (DEVLOG Day 75). STRICTLY non-fatal:
200+ if the embeddings call fails the jobs are still upserted, just
201+ without an embedding. The upsert is issued as up to two batches
202+ (rows without an `embedding`, then rows with one) so each
203+ PostgREST request has a homogeneous column set and the HNSW
204+ index is only touched for the small newly-embedded batch.
200205 """
201206 client = self ._require_client ()
202207 rows = []
@@ -233,33 +238,48 @@ def upsert_postings(self, source: str, postings: Iterable) -> int:
233238 if not rows :
234239 return 0
235240 # Tier 2 embed-on-write — non-fatal. Attaches an `embedding` to
236- # each row dict in place. Any failure inside leaves the rows
237- # embedding-free and the upsert proceeds regardless.
238- self ._attach_embeddings_on_write (rows )
241+ # the subset of `rows` that are NEW to the cache; already-
242+ # embedded rows are left untouched so the upsert never rewrites
243+ # their vector.
244+ self ._embed_new_rows (source , rows )
245+ # Split by column shape: a row dict WITHOUT an `embedding` key
246+ # leaves the stored vector intact on conflict; rows WITH a fresh
247+ # embedding go in their own batch. Two homogeneous upserts keep
248+ # each PostgREST request's column set consistent and confine the
249+ # HNSW index writes to the (small) new-rows batch.
250+ without_embedding = [row for row in rows if "embedding" not in row ]
251+ with_embedding = [row for row in rows if "embedding" in row ]
252+ touched = 0
239253 try :
240- response = (
241- client .table (self ._table )
242- .upsert (rows , on_conflict = "source,job_id" )
243- .execute ()
244- )
254+ for batch in (without_embedding , with_embedding ):
255+ if not batch :
256+ continue
257+ response = (
258+ client .table (self ._table )
259+ .upsert (batch , on_conflict = "source,job_id" )
260+ .execute ()
261+ )
262+ touched += len (self ._extract_rows (response ))
245263 except Exception as exc :
246264 raise AppError (
247265 "Failed to upsert cached jobs." ,
248266 details = f"{ type (exc ).__name__ } : { exc } " ,
249267 ) from exc
250- return len ( self . _extract_rows ( response ))
268+ return touched
251269
252- def _attach_embeddings_on_write (self , rows : list [dict ]) -> None :
253- """Compute + attach an `embedding` to each upsert row in place .
270+ def _embed_new_rows (self , source : str , rows : list [dict ]) -> None :
271+ """Attach an `embedding` to the `rows` that are new to the cache .
254272
255- The Tier 2 embed-on-write path. Mutates `rows`: on success each
256- dict gains an `embedding` key (a list[float]); on ANY failure
257- the rows are left untouched and the caller upserts them without
258- embeddings. This method NEVER raises — embed-on-write must not
259- be able to break the refresh worker .
273+ The Tier 2 embed-on-write path. Mutates `rows` in place: each
274+ NEW row's dict gains an `embedding` key (a list[float]). Rows
275+ already carrying a vector are left untouched — re-embedding the
276+ whole corpus on every refresh churned the HNSW index and timed
277+ out the chunk upserts (DEVLOG Day 75) .
260278
261- Skipped entirely when hybrid search is disabled (no point
262- spending tokens on a column nothing queries yet).
279+ NEVER raises — embed-on-write must not be able to break the
280+ refresh worker. Skipped entirely when hybrid search is disabled
281+ or no OpenAI service is available (no point spending tokens on a
282+ column nothing queries yet).
263283 """
264284 if not rows :
265285 return
@@ -268,14 +288,37 @@ def _attach_embeddings_on_write(self, rows: list[dict]) -> None:
268288 service = self ._get_openai_service ()
269289 if service is None or not service .is_available ():
270290 return
291+ # Only NEW / un-embedded jobs need work. A failure to determine
292+ # which those are degrades to "embed nothing this chunk" rather
293+ # than risking a re-embed of the whole corpus.
294+ try :
295+ already_embedded = self ._already_embedded_job_ids (
296+ source , [row ["job_id" ] for row in rows ]
297+ )
298+ except Exception as exc : # noqa: BLE001 — embed-on-write is non-fatal
299+ log_event (
300+ LOGGER ,
301+ logging .WARNING ,
302+ "cached_jobs_embed_diff_failed" ,
303+ "Could not determine which jobs still need embeddings; "
304+ "skipping embed-on-write for this chunk." ,
305+ row_count = len (rows ),
306+ error = f"{ type (exc ).__name__ } : { exc } " ,
307+ )
308+ return
309+ new_rows = [
310+ row for row in rows if row ["job_id" ] not in already_embedded
311+ ]
312+ if not new_rows :
313+ return
271314 try :
272315 inputs = [
273316 build_job_embedding_input (
274317 title = row .get ("title" , "" ),
275318 company = row .get ("company" , "" ),
276319 description = row .get ("description" , "" ),
277320 )
278- for row in rows
321+ for row in new_rows
279322 ]
280323 vectors = service .create_embeddings (
281324 inputs , task_name = "job_embedding_on_write"
@@ -285,28 +328,81 @@ def _attach_embeddings_on_write(self, rows: list[dict]) -> None:
285328 LOGGER ,
286329 logging .WARNING ,
287330 "cached_jobs_embed_on_write_failed" ,
288- "Embed-on-write failed; jobs cached without embeddings "
289- "(they fall back to lexical until the next backfill )." ,
290- row_count = len (rows ),
331+ "Embed-on-write failed; new jobs cached without "
332+ "embeddings (a later refresh re-embeds them )." ,
333+ row_count = len (new_rows ),
291334 error = f"{ type (exc ).__name__ } : { exc } " ,
292335 )
293336 return
294- if len (vectors ) != len (rows ):
337+ if len (vectors ) != len (new_rows ):
295338 # Count mismatch — can't safely pair vectors to rows; skip
296339 # attaching rather than risk the wrong vector on a job.
297340 log_event (
298341 LOGGER ,
299342 logging .WARNING ,
300343 "cached_jobs_embed_on_write_count_mismatch" ,
301344 "Embed-on-write returned a mismatched vector count; "
302- "jobs cached without embeddings." ,
303- row_count = len (rows ),
345+ "new jobs cached without embeddings." ,
346+ row_count = len (new_rows ),
304347 vector_count = len (vectors ),
305348 )
306349 return
307- for row , vector in zip (rows , vectors ):
350+ for row , vector in zip (new_rows , vectors ):
308351 row ["embedding" ] = vector
309352
353+ def _already_embedded_job_ids (
354+ self , source : str , job_ids : list [str ]
355+ ) -> set [str ]:
356+ """job_ids (within `source`) already in cached_jobs WITH a
357+ non-NULL embedding.
358+
359+ Embed-on-write skips these so the upsert never rewrites a stored
360+ vector. Computed with two cheap (source, job_id)-indexed reads:
361+ the rows that exist, and (of those) the ones whose embedding is
362+ still NULL — the difference is the already-embedded set. A row
363+ that exists but has a NULL embedding (e.g. a prior embed-on-
364+ write failure) is therefore NOT in the set, so the next refresh
365+ retries it.
366+
367+ Raises on a query failure — the caller treats that as "embed
368+ nothing this chunk".
369+ """
370+ client = self ._require_client ()
371+ ids = [jid for jid in job_ids if jid ]
372+ if not ids :
373+ return set ()
374+ existing : set [str ] = set ()
375+ null_embedding : set [str ] = set ()
376+ # Bound the IN-list so a large chunk can't build an over-long
377+ # PostgREST URL. The refresh upserts ~30 at a time, so this is
378+ # normally a single pass.
379+ for start in range (0 , len (ids ), 200 ):
380+ batch = ids [start : start + 200 ]
381+ existing_response = (
382+ client .table (self ._table )
383+ .select ("job_id" )
384+ .eq ("source" , source )
385+ .in_ ("job_id" , batch )
386+ .execute ()
387+ )
388+ for row in self ._extract_rows (existing_response ):
389+ jid = str (row .get ("job_id" ) or "" ).strip ()
390+ if jid :
391+ existing .add (jid )
392+ null_response = (
393+ client .table (self ._table )
394+ .select ("job_id" )
395+ .eq ("source" , source )
396+ .in_ ("job_id" , batch )
397+ .is_ ("embedding" , "null" )
398+ .execute ()
399+ )
400+ for row in self ._extract_rows (null_response ):
401+ jid = str (row .get ("job_id" ) or "" ).strip ()
402+ if jid :
403+ null_embedding .add (jid )
404+ return existing - null_embedding
405+
310406 def cleanup_missing (
311407 self ,
312408 * ,
0 commit comments