11package main
22
33import (
4+ "bufio"
45 "context"
56 "encoding/json"
7+ "errors"
8+ "fmt"
9+ "io"
610 "os"
11+ "strings"
712
813 "github.com/jackc/pgx/v5"
914 "github.com/jackc/pgx/v5/pgxpool"
@@ -13,8 +18,8 @@ import (
1318)
1419
1520type BulkJob struct {
16- Args DPromptsJobArgs `json:"args "`
17- Metadata map [ string ] interface {} `json:"metadata ,omitempty"`
21+ SubTasks [] DPromptsSubTask `json:"sub_tasks "`
22+ BasePrompt string `json:"base_prompt ,omitempty"`
1823}
1924
2025// RunClient enqueues a job with args and metadata as JSON strings.
@@ -40,7 +45,6 @@ func RunClient(ctx context.Context, driver *riverpgxv5.Driver, argsJSON string,
4045 log .Fatal ().Err (err ).Msg ("Failed to parse args JSON" )
4146 }
4247
43-
4448 var insertOpts * river.InsertOpts
4549 if metadataJSON != "" {
4650 var metadata map [string ]interface {}
@@ -73,45 +77,197 @@ func enqueueBulkJobsFromFile(ctx context.Context, riverClient *river.Client[pgx.
7377 }
7478 defer file .Close ()
7579
76- var jobs []BulkJob
77- if err := json .NewDecoder (file ).Decode (& jobs ); err != nil {
78- return err
79- }
80+ decoder := json .NewDecoder (bufio .NewReader (file ))
8081
81- tx , err := dbPool .Begin (ctx )
82+ // Peek first non-whitespace token
83+ tok , err := nextNonSpaceToken (decoder )
8284 if err != nil {
83- return err
85+ return fmt . Errorf ( "cannot read file: %w" , err )
8486 }
85- defer tx .Rollback (ctx )
8687
87- var jobsToInsert []river.InsertManyParams
88+ // NDJSON format (each line = JSON object)
89+ if tok != json .Delim ('[' ) {
90+ return processNDJSON (ctx , decoder , riverClient , dbPool )
91+ }
92+
93+ return processJSONArray (ctx , decoder , riverClient , dbPool )
94+ }
95+
96+ // ------ JSON ARRAY VERSION ------
97+ func processJSONArray (ctx context.Context , decoder * json.Decoder , riverClient * river.Client [pgx.Tx ], dbPool * pgxpool.Pool ) error {
98+ const batchSize = 500
8899
89- for i := range jobs {
100+ batch := make ([]river.InsertManyParams , 0 , batchSize )
101+ total := 0
102+ count := 0
103+
104+ for decoder .More () {
105+ var job BulkJob
106+ if err := decoder .Decode (& job ); err != nil {
107+ return fmt .Errorf ("decode error at item %d: %w" , total , err )
108+ }
109+
110+ params , err := toInsertParams (job )
111+ if err != nil {
112+ log .Error ().
113+ Int ("job_index" , total ).
114+ Err (err ).
115+ Msg ("Failed to convert job to InsertManyParams" )
116+ return err
117+ }
118+
119+ batch = append (batch , params )
120+ total ++
121+ count ++
122+
123+ if total % 50 == 0 {
124+ log .Info ().Msgf ("Loaded %d jobs into batch..." , total )
125+ }
90126
91- var insertOpts * river. InsertOpts
92- if jobs [ i ]. Metadata != nil {
93- metadataBytes , err := json . Marshal ( jobs [ i ]. Metadata )
94- if err != nil {
127+ if count == batchSize {
128+ log . Info (). Msgf ( "Inserting batch of %d jobs (total so far: %d)" , batchSize , total )
129+ if err := insertBatch ( ctx , riverClient , dbPool , batch ); err != nil {
130+ log . Error (). Err ( err ). Msg ( "Failed to insert batch" )
95131 return err
96132 }
97- insertOpts = & river.InsertOpts {Metadata : metadataBytes }
133+ batch = batch [:0 ]
134+ count = 0
98135 }
99- jobsToInsert = append (jobsToInsert , river.InsertManyParams {
100- Args : jobs [i ].Args ,
101- InsertOpts : insertOpts ,
102- })
103136 }
104137
105- results , err := riverClient .InsertManyTx (ctx , tx , jobsToInsert )
138+ if len (batch ) > 0 {
139+ log .Info ().Msgf ("Inserting final batch of %d jobs (total: %d)" , len (batch ), total )
140+ if err := insertBatch (ctx , riverClient , dbPool , batch ); err != nil {
141+ log .Error ().Err (err ).Msg ("Failed to insert final batch" )
142+ return err
143+ }
144+ }
145+
146+ log .Info ().Msgf ("Bulk insert complete. Total jobs inserted: %d" , total )
147+ return nil
148+ }
149+
150+ // ------ NDJSON VERSION ------
151+ func processNDJSON (ctx context.Context , decoder * json.Decoder , riverClient * river.Client [pgx.Tx ], dbPool * pgxpool.Pool ) error {
152+ const batchSize = 500
153+
154+ batch := make ([]river.InsertManyParams , 0 , batchSize )
155+ total := 0
156+ count := 0
157+
158+ for {
159+ var job BulkJob
160+ if err := decoder .Decode (& job ); err != nil {
161+ if errors .Is (err , io .EOF ) {
162+ break
163+ }
164+ return err
165+ }
166+
167+ params , err := toInsertParams (job )
168+ if err != nil {
169+ log .Error ().
170+ Int ("job_index" , total ).
171+ Err (err ).
172+ Msg ("Failed to convert job to InsertManyParams" )
173+ return err
174+ }
175+
176+ batch = append (batch , params )
177+ total ++
178+ count ++
179+
180+ if total % 50 == 0 {
181+ log .Info ().Msgf ("Loaded %d jobs into batch..." , total )
182+ }
183+
184+ if count == batchSize {
185+ log .Info ().Msgf ("Inserting batch of %d jobs (total so far: %d)" , batchSize , total )
186+ if err := insertBatch (ctx , riverClient , dbPool , batch ); err != nil {
187+ log .Error ().Err (err ).Msg ("Failed to insert batch" )
188+ return err
189+ }
190+ batch = batch [:0 ]
191+ count = 0
192+ }
193+ }
194+
195+ if len (batch ) > 0 {
196+ log .Info ().Msgf ("Inserting final batch of %d jobs (total: %d)" , len (batch ), total )
197+ if err := insertBatch (ctx , riverClient , dbPool , batch ); err != nil {
198+ log .Error ().Err (err ).Msg ("Failed to insert final batch" )
199+ return err
200+ }
201+ }
202+
203+ log .Info ().Msgf ("Bulk insert complete. Total jobs inserted: %d" , total )
204+ return nil
205+ }
206+
207+ // Helper: Read first non-space token
208+ func nextNonSpaceToken (dec * json.Decoder ) (json.Token , error ) {
209+ for {
210+ t , err := dec .Token ()
211+ if err != nil {
212+ return nil , err
213+ }
214+ if _ , ok := t .(json.Delim ); ok || t != nil {
215+ return t , nil
216+ }
217+ }
218+ }
219+
220+ func toInsertParams (job BulkJob ) (river.InsertManyParams , error ) {
221+ if len (job .SubTasks ) == 0 {
222+ return river.InsertManyParams {}, fmt .Errorf ("job has no sub_tasks" )
223+ }
224+
225+ for i , st := range job .SubTasks {
226+ if strings .TrimSpace (st .Prompt ) == "" {
227+ return river.InsertManyParams {}, fmt .Errorf ("sub_task[%d] has empty prompt" , i )
228+ }
229+ }
230+
231+ var opts * river.InsertOpts
232+ if job .SubTasks [0 ].Metadata != nil {
233+ metadataBytes , _ := json .Marshal (job .SubTasks [0 ].Metadata )
234+ opts = & river.InsertOpts {
235+ Metadata : metadataBytes ,
236+ }
237+ }
238+
239+ return river.InsertManyParams {
240+ Args : DPromptsJobArgs {
241+ BasePrompt : job .BasePrompt ,
242+ SubTasks : job .SubTasks ,
243+ },
244+ InsertOpts : opts ,
245+ }, nil
246+ }
247+
248+ func insertBatch (
249+ ctx context.Context ,
250+ riverClient * river.Client [pgx.Tx ],
251+ dbPool * pgxpool.Pool ,
252+ batch []river.InsertManyParams ,
253+ ) error {
254+
255+ tx , err := dbPool .Begin (ctx )
106256 if err != nil {
107257 return err
108258 }
259+ defer func () {
260+ _ = tx .Rollback (ctx ) // safe no-op if already committed
261+ }()
262+
263+ if _ , err := riverClient .InsertManyTx (ctx , tx , batch ); err != nil {
264+ return err
265+ }
109266
110267 if err := tx .Commit (ctx ); err != nil {
111268 return err
112269 }
113270
114- log .Info ().Msgf ("Successfully enqueued %d jobs" , len (results ))
115271 return nil
116272}
117273
0 commit comments