@@ -32,43 +32,41 @@ export async function createDb(databaseUrl: string): Promise<DbClient> {
3232
3333 return {
3434 async upsertDocument ( doc : Partial < DocumentRecord > & { id : string } ) {
35- const existing = await sql `SELECT id FROM documents WHERE id = ${ doc . id } ` . then (
36- ( rows ) => rows [ 0 ]
35+ const columns = Object . keys ( doc ) . filter (
36+ ( k ) => doc [ k as keyof typeof doc ] !== undefined
3737 ) ;
38+ const values = columns . map ( ( k ) => doc [ k as keyof typeof doc ] ) ;
39+ const placeholders = columns . map ( ( _ , i ) => `$${ i + 1 } ` ) . join ( ", " ) ;
3840
39- if ( existing ) {
40- // Build dynamic update
41- const updates : Record < string , unknown > = { } ;
42- for ( const [ key , value ] of Object . entries ( doc ) ) {
43- if ( key !== "id" && value !== undefined ) {
44- updates [ key ] = value ;
45- }
46- }
47-
48- if ( Object . keys ( updates ) . length > 0 ) {
49- // Use raw SQL for dynamic updates
50- const setClauses = Object . keys ( updates )
51- . map ( ( key , i ) => `${ key } = $${ i + 2 } ` )
52- . join ( ", " ) ;
53- const values = [ doc . id , ...Object . values ( updates ) ] ;
54-
55- await sql . unsafe (
56- `UPDATE documents SET ${ setClauses } WHERE id = $1` ,
57- values
58- ) ;
59- }
60- } else {
61- // Insert
62- const columns = Object . keys ( doc ) . filter (
63- ( k ) => doc [ k as keyof typeof doc ] !== undefined
64- ) ;
65- const values = columns . map ( ( k ) => doc [ k as keyof typeof doc ] ) ;
66- const placeholders = columns . map ( ( _ , i ) => `$${ i + 1 } ` ) . join ( ", " ) ;
41+ // Check if this looks like an insert (has required fields) or update (partial)
42+ const hasRequiredFields = "source_url" in doc && "crawl_id" in doc ;
43+
44+ if ( hasRequiredFields ) {
45+ // Full insert with ON CONFLICT for atomic upsert
46+ const updateColumns = columns . filter ( ( k ) => k !== "id" ) ;
47+ const updateClauses = updateColumns
48+ . map ( ( key ) => `${ key } = EXCLUDED.${ key } ` )
49+ . join ( ", " ) ;
6750
6851 await sql . unsafe (
69- `INSERT INTO documents (${ columns . join ( ", " ) } ) VALUES (${ placeholders } )` ,
52+ `INSERT INTO documents (${ columns . join ( ", " ) } ) VALUES (${ placeholders } )
53+ ON CONFLICT (id) DO UPDATE SET ${ updateClauses } ` ,
7054 values as unknown [ ]
7155 ) ;
56+ } else {
57+ // Partial update - only update specified fields
58+ const updateColumns = columns . filter ( ( k ) => k !== "id" ) ;
59+ if ( updateColumns . length === 0 ) return ;
60+
61+ const setClauses = updateColumns
62+ . map ( ( key , i ) => `${ key } = $${ i + 2 } ` )
63+ . join ( ", " ) ;
64+ const updateValues = [ doc . id , ...updateColumns . map ( ( k ) => doc [ k as keyof typeof doc ] ) ] ;
65+
66+ await sql . unsafe (
67+ `UPDATE documents SET ${ setClauses } WHERE id = $1` ,
68+ updateValues
69+ ) ;
7270 }
7371 } ,
7472
0 commit comments