88 "fmt"
99 "io"
1010 "os"
11+ "strings"
1112
1213 "github.com/jackc/pgx/v5"
1314 "github.com/jackc/pgx/v5/pgxpool"
@@ -16,9 +17,11 @@ import (
1617 "github.com/rs/zerolog/log"
1718)
1819
20+
21+
1922type BulkJob struct {
20- Args DPromptsJobArgs `json:"args "`
21- Metadata map [ string ] interface {} `json:"metadata ,omitempty"`
23+ SubTasks [] DPromptsSubTask `json:"sub_tasks "`
24+ BasePrompt string `json:"base_prompt ,omitempty"`
2225}
2326
2427// RunClient enqueues a job with args and metadata as JSON strings.
@@ -109,32 +112,38 @@ func processJSONArray(ctx context.Context, decoder *json.Decoder, riverClient *r
109112
110113 params , err := toInsertParams (job )
111114 if err != nil {
115+ log .Error ().
116+ Int ("job_index" , total ).
117+ Err (err ).
118+ Msg ("Failed to convert job to InsertManyParams" )
112119 return err
113120 }
114121
115122 batch = append (batch , params )
116123 total ++
117124 count ++
118125
119- if total % 100 == 0 {
120- log .Info ().Msgf ("Loaded %d jobs..." , total )
126+ if total % 50 == 0 {
127+ log .Info ().Msgf ("Loaded %d jobs into batch ..." , total )
121128 }
122129
123130 if count == batchSize {
131+ log .Info ().Msgf ("Inserting batch of %d jobs (total so far: %d)" , batchSize , total )
124132 if err := insertBatch (ctx , riverClient , dbPool , batch ); err != nil {
133+ log .Error ().Err (err ).Msg ("Failed to insert batch" )
125134 return err
126135 }
127- log .Info ().Msgf ("Inserted batch of %d jobs (total: %d)" , batchSize , total )
128136 batch = batch [:0 ]
129137 count = 0
130138 }
131139 }
132140
133141 if len (batch ) > 0 {
142+ log .Info ().Msgf ("Inserting final batch of %d jobs (total: %d)" , len (batch ), total )
134143 if err := insertBatch (ctx , riverClient , dbPool , batch ); err != nil {
144+ log .Error ().Err (err ).Msg ("Failed to insert final batch" )
135145 return err
136146 }
137- log .Info ().Msgf ("Inserted final batch of %d jobs (total: %d)" , len (batch ), total )
138147 }
139148
140149 log .Info ().Msgf ("Bulk insert complete. Total jobs inserted: %d" , total )
@@ -160,38 +169,43 @@ func processNDJSON(ctx context.Context, decoder *json.Decoder, riverClient *rive
160169
161170 params , err := toInsertParams (job )
162171 if err != nil {
172+ log .Error ().
173+ Int ("job_index" , total ).
174+ Err (err ).
175+ Msg ("Failed to convert job to InsertManyParams" )
163176 return err
164177 }
165178
166179 batch = append (batch , params )
167180 total ++
168181 count ++
169182
170- if total % 100 == 0 {
171- log .Info ().Msgf ("Loaded %d jobs..." , total )
183+ if total % 50 == 0 {
184+ log .Info ().Msgf ("Loaded %d jobs into batch ..." , total )
172185 }
173186
174187 if count == batchSize {
188+ log .Info ().Msgf ("Inserting batch of %d jobs (total so far: %d)" , batchSize , total )
175189 if err := insertBatch (ctx , riverClient , dbPool , batch ); err != nil {
190+ log .Error ().Err (err ).Msg ("Failed to insert batch" )
176191 return err
177192 }
178- log .Info ().Msgf ("Inserted batch of %d jobs (total: %d)" , batchSize , total )
179193 batch = batch [:0 ]
180194 count = 0
181195 }
182196 }
183197
184198 if len (batch ) > 0 {
199+ log .Info ().Msgf ("Inserting final batch of %d jobs (total: %d)" , len (batch ), total )
185200 if err := insertBatch (ctx , riverClient , dbPool , batch ); err != nil {
201+ log .Error ().Err (err ).Msg ("Failed to insert final batch" )
186202 return err
187203 }
188- log .Info ().Msgf ("Inserted final batch of %d jobs (total: %d)" , len (batch ), total )
189204 }
190205
191206 log .Info ().Msgf ("Bulk insert complete. Total jobs inserted: %d" , total )
192207 return nil
193208}
194-
195209// Helper: Read first non-space token
196210func nextNonSpaceToken (dec * json.Decoder ) (json.Token , error ) {
197211 for {
@@ -206,35 +220,67 @@ func nextNonSpaceToken(dec *json.Decoder) (json.Token, error) {
206220}
207221
208222func toInsertParams (job BulkJob ) (river.InsertManyParams , error ) {
209- var insertOpts * river.InsertOpts
210- if job .Metadata != nil {
211- metaBytes , err := json .Marshal (job .Metadata )
212- if err != nil {
213- return river.InsertManyParams {}, err
223+ if len (job .SubTasks ) == 0 {
224+ return river.InsertManyParams {}, fmt .Errorf ("job has no sub_tasks" )
225+ }
226+
227+ for i , st := range job .SubTasks {
228+ if strings .TrimSpace (st .Prompt ) == "" {
229+ return river.InsertManyParams {}, fmt .Errorf ("sub_task[%d] has empty prompt" , i )
230+ }
231+ }
232+
233+ var opts * river.InsertOpts
234+ if job .SubTasks [0 ].Metadata != nil {
235+ metadataBytes , _ := json .Marshal (job .SubTasks [0 ].Metadata )
236+ opts = & river.InsertOpts {
237+ Metadata : metadataBytes ,
214238 }
215- insertOpts = & river.InsertOpts {Metadata : metaBytes }
216239 }
240+
217241 return river.InsertManyParams {
218- Args : job .Args ,
219- InsertOpts : insertOpts ,
242+ Args : DPromptsJobArgs {
243+ BasePrompt : job .BasePrompt ,
244+ SubTasks : job .SubTasks ,
245+ },
246+ InsertOpts : opts ,
220247 }, nil
221248}
222249
223- func insertBatch (ctx context.Context , riverClient * river.Client [pgx.Tx ], dbPool * pgxpool.Pool , batch []river.InsertManyParams ) error {
250+
251+
252+
253+
254+
255+ func insertBatch (
256+ ctx context.Context ,
257+ riverClient * river.Client [pgx.Tx ],
258+ dbPool * pgxpool.Pool ,
259+ batch []river.InsertManyParams ,
260+ ) error {
261+
224262 tx , err := dbPool .Begin (ctx )
225263 if err != nil {
226264 return err
227265 }
228- defer tx .Rollback (ctx )
266+ defer func () {
267+ _ = tx .Rollback (ctx ) // safe no-op if already committed
268+ }()
229269
230270 if _ , err := riverClient .InsertManyTx (ctx , tx , batch ); err != nil {
231271 return err
232272 }
233273
234- return tx .Commit (ctx )
274+ if err := tx .Commit (ctx ); err != nil {
275+ return err
276+ }
277+
278+ return nil
235279}
236280
237281
282+
283+
238284func newRiverClient (driver * riverpgxv5.Driver ) (* river.Client [pgx.Tx ], error ) {
239285 return river .NewClient [pgx.Tx ](driver , & river.Config {})
240286}
0 commit comments