Skip to content

Commit 4cc5062

Browse files
committed
fix:worker
1 parent fc88014 commit 4cc5062

4 files changed

Lines changed: 22 additions & 4 deletions

File tree

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
dprompts

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
# dPrompt
1+
# dPrompts
22

33
## Overview
44

5-
dPrompt is a distributed job processing system using River and PostgreSQL, with support for LLM (Ollama) integration.
5+
dPrompts enables teams to perform distributed, bulk LLM operations locally using Ollama, which is cost-effective and works on most laptops with an integrated GPU.
66

77
## Installation
88

dprompts

-15.8 MB
Binary file not shown.

worker.go

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,14 @@ func (w *DPromptsWorker) Work(ctx context.Context, job *river.Job[DPromptsJobArg
5454
return err
5555
}
5656

57-
_, err = w.db.Exec(ctx,
58-
"INSERT INTO dprompts_results (job_id, response) VALUES ($1, $2)",
57+
tx, err := w.db.Begin(ctx)
58+
if err != nil {
59+
return err
60+
}
61+
defer tx.Rollback(ctx)
62+
63+
_, err = tx.Exec(ctx,
64+
"INSERT INTO dprompts_results (job_id, response) VALUES ($1, $2) ON CONFLICT (job_id) DO NOTHING",
5965
job.ID,
6066
jsonResponse,
6167
)
@@ -64,6 +70,17 @@ func (w *DPromptsWorker) Work(ctx context.Context, job *river.Job[DPromptsJobArg
6470
return err
6571
}
6672

73+
_, err = river.JobCompleteTx[*riverpgxv5.Driver](ctx, tx, job)
74+
if err != nil {
75+
log.Error().Err(err).Msg("Failed to complete job transactionally")
76+
return err
77+
}
78+
79+
if err := tx.Commit(ctx); err != nil {
80+
log.Error().Err(err).Msg("Failed to commit transaction")
81+
return err
82+
}
83+
6784
log.Info().
6885
Str("job_id", strconv.FormatInt(job.ID, 10)).
6986
Msg("Job completed and saved")

0 commit comments

Comments
 (0)