Skip to content

Commit 75cd2dd

Browse files
committed
Formatted
1 parent 9e83afc commit 75cd2dd

9 files changed

Lines changed: 13 additions & 80 deletions

File tree

client.go

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@ import (
1717
"github.com/rs/zerolog/log"
1818
)
1919

20-
21-
2220
type BulkJob struct {
2321
SubTasks []DPromptsSubTask `json:"sub_tasks"`
2422
BasePrompt string `json:"base_prompt,omitempty"`
@@ -47,7 +45,6 @@ func RunClient(ctx context.Context, driver *riverpgxv5.Driver, argsJSON string,
4745
log.Fatal().Err(err).Msg("Failed to parse args JSON")
4846
}
4947

50-
5148
var insertOpts *river.InsertOpts
5249
if metadataJSON != "" {
5350
var metadata map[string]interface{}
@@ -206,6 +203,7 @@ func processNDJSON(ctx context.Context, decoder *json.Decoder, riverClient *rive
206203
log.Info().Msgf("Bulk insert complete. Total jobs inserted: %d", total)
207204
return nil
208205
}
206+
209207
// Helper: Read first non-space token
210208
func nextNonSpaceToken(dec *json.Decoder) (json.Token, error) {
211209
for {
@@ -247,11 +245,6 @@ func toInsertParams(job BulkJob) (river.InsertManyParams, error) {
247245
}, nil
248246
}
249247

250-
251-
252-
253-
254-
255248
func insertBatch(
256249
ctx context.Context,
257250
riverClient *river.Client[pgx.Tx],
@@ -278,9 +271,6 @@ func insertBatch(
278271
return nil
279272
}
280273

281-
282-
283-
284274
func newRiverClient(driver *riverpgxv5.Driver) (*river.Client[pgx.Tx], error) {
285275
return river.NewClient[pgx.Tx](driver, &river.Config{})
286276
}

config.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111
"github.com/rs/zerolog/log"
1212
)
1313

14-
1514
func LoadDBConfig(path string) (*DBConfig, error) {
1615
var conf struct {
1716
Database DBConfig
@@ -23,7 +22,6 @@ func LoadDBConfig(path string) (*DBConfig, error) {
2322
return &conf.Database, nil
2423
}
2524

26-
2725
func NewDBPool(ctx context.Context, configPath string) (*pgxpool.Pool, error) {
2826

2927
if configPath == "" {
@@ -84,4 +82,3 @@ func LoadWorkerConfig(path string) (*WorkerConfig, error) {
8482

8583
return &conf.Worker, nil
8684
}
87-

groupmanagement.go

Lines changed: 0 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -24,32 +24,3 @@ func DeleteGroupAndResults(ctx context.Context, db *pgxpool.Pool, groupID int) e
2424
fmt.Printf("Deleted %d results and %d group(s) for group ID %d\n", res1.RowsAffected(), res2.RowsAffected(), groupID)
2525
return nil
2626
}
27-
28-
func DeleteGroupAndResultsAll(ctx context.Context, db *pgxpool.Pool,i int) error {
29-
// Delete all results that belong to any group
30-
_ = i
31-
res1, err := db.Exec(ctx, `
32-
DELETE FROM dprompts_results
33-
WHERE group_id IS NOT NULL
34-
`)
35-
if err != nil {
36-
return fmt.Errorf("failed to delete grouped results: %w", err)
37-
}
38-
39-
// Delete all groups
40-
res2, err := db.Exec(ctx, `
41-
DELETE FROM dprompt_groups
42-
`)
43-
if err != nil {
44-
return fmt.Errorf("failed to delete groups: %w", err)
45-
}
46-
47-
fmt.Printf(
48-
"TEMP CLEANUP: Deleted %d grouped results and %d group(s)\n",
49-
res1.RowsAffected(),
50-
res2.RowsAffected(),
51-
)
52-
53-
return nil
54-
}
55-

main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ func main() {
1616
var configPath string
1717

1818
rootCmd := &cobra.Command{
19-
Use: "dpr",
19+
Use: "dpr",
2020
PersistentPreRunE: func(cmd *cobra.Command, args []string) error {
2121
if configPath == "" {
2222
home, err := os.UserHomeDir()

managequeuedjobs.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ func CountQueuedJobs(ctx context.Context, db *pgxpool.Pool) error {
2525
return nil
2626
}
2727

28-
2928
func ViewQueuedJobs(ctx context.Context, db *pgxpool.Pool, n int) error {
3029
rows, err := db.Query(ctx, `
3130
SELECT id, state, created_at, scheduled_at
@@ -120,8 +119,6 @@ func ViewFirstCompletedJobs(ctx context.Context, db *pgxpool.Pool, n int) error
120119
return rows.Err()
121120
}
122121

123-
124-
125122
func ViewLastCompletedJobs(ctx context.Context, db *pgxpool.Pool, n int) error {
126123
rows, err := db.Query(ctx, `
127124
SELECT id, created_at, finalized_at, args
@@ -159,5 +156,3 @@ func ViewLastCompletedJobs(ctx context.Context, db *pgxpool.Pool, n int) error {
159156

160157
return rows.Err()
161158
}
162-
163-

ollama.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,6 @@ import (
1111
"github.com/BurntSushi/toml"
1212
)
1313

14-
15-
1614
func LoadLLMConfig(configPath string) (*LLMConfig, error) {
1715
var conf struct {
1816
LLM LLMConfig
@@ -91,6 +89,3 @@ func CallOllama(
9189

9290
return ollamaResp.Message.Content, nil
9391
}
94-
95-
96-

types.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,16 +10,15 @@ type DBConfig struct {
1010
}
1111

1212
type DPromptsSubTask struct {
13-
Prompt string `json:"prompt"`
14-
Schema interface{} `json:"schema,omitempty"`
13+
Prompt string `json:"prompt"`
14+
Schema interface{} `json:"schema,omitempty"`
1515
Metadata map[string]interface{} `json:"metadata,omitempty"` // <-- new
1616

1717
}
1818

1919
type DPromptsJobArgs struct {
2020
SubTasks []DPromptsSubTask `json:"sub_tasks"`
2121
BasePrompt string `json:"base_prompt,omitempty"`
22-
2322
}
2423

2524
type DPromptsJobResult struct {
@@ -44,8 +43,6 @@ type OllamaResponse struct {
4443
} `json:"message"`
4544
}
4645

47-
4846
type WorkerConfig struct {
4947
ConcurrentWorkers int `toml:"concurrent_workers"`
5048
}
51-

viewresults.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -147,8 +147,6 @@ func viewTotalGroups(ctx context.Context, db *pgxpool.Pool) error {
147147
return rows.Err()
148148
}
149149

150-
151-
152150
// CLI: Display results filtered by group ID
153151
func viewResultsByGroup(ctx context.Context, db *pgxpool.Pool, groupID int) error {
154152
rows, err := db.Query(ctx, `

worker.go

Lines changed: 9 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"fmt"
77
"os"
88
"os/signal"
9-
"path/filepath"
109
"strconv"
1110
"syscall"
1211
"time"
@@ -41,7 +40,6 @@ func humanizeDuration(d time.Duration) string {
4140
}
4241
}
4342

44-
4543
func (w *DPromptsWorker) Work(ctx context.Context, job *river.Job[DPromptsJobArgs]) error {
4644
jobStart := time.Now()
4745
jobID := strconv.FormatInt(job.ID, 10)
@@ -62,10 +60,10 @@ func (w *DPromptsWorker) Work(ctx context.Context, job *river.Job[DPromptsJobArg
6260

6361
homeDir, err := os.UserHomeDir()
6462
if err != nil {
63+
log.Error().Err(err).Msg("Unable to determine home directory for config file")
6564
return err
6665
}
67-
configPath := filepath.Join(homeDir, ".dprompts.toml")
68-
66+
configPath := homeDir + string(os.PathSeparator) + ".dprompts.toml"
6967
tx, err := w.db.Begin(ctx)
7068
if err != nil {
7169
return err
@@ -85,10 +83,10 @@ func (w *DPromptsWorker) Work(ctx context.Context, job *river.Job[DPromptsJobArg
8583
// ---- subtasks ----
8684
for i, sub := range job.Args.SubTasks {
8785
log.Info().
88-
Str("job_id", jobID).
89-
Int("subtask", i).
90-
Any("metadata", sub.Metadata).
91-
Msg("Subtask started")
86+
Str("job_id", jobID).
87+
Int("subtask", i).
88+
Any("metadata", sub.Metadata).
89+
Msg("Subtask started")
9290
ollamaStart := time.Now()
9391

9492
response, err := CallOllama(
@@ -155,11 +153,6 @@ func (w *DPromptsWorker) Work(ctx context.Context, job *river.Job[DPromptsJobArg
155153
return nil
156154
}
157155

158-
159-
160-
161-
162-
163156
func RegisterWorkers(db *pgxpool.Pool) *river.Workers {
164157
workers := river.NewWorkers()
165158
river.AddWorker(workers, &DPromptsWorker{db: db})
@@ -171,8 +164,8 @@ func createWorkerClient(
171164
workers *river.Workers,
172165
concurrentWorkers int) (*river.Client[pgx.Tx], error) {
173166
log.Info().
174-
Int("concurrent_workers", concurrentWorkers).
175-
Msg("Initializing River worker client")
167+
Int("concurrent_workers", concurrentWorkers).
168+
Msg("Initializing River worker client")
176169
return river.NewClient[pgx.Tx](driver, &river.Config{
177170
Queues: map[string]river.QueueConfig{
178171
river.QueueDefault: {MaxWorkers: concurrentWorkers},
@@ -183,7 +176,7 @@ func createWorkerClient(
183176
}
184177

185178
func RunWorker(ctx context.Context, driver *riverpgxv5.Driver, cancel context.CancelFunc, db *pgxpool.Pool) {
186-
179+
187180
homeDir, err := os.UserHomeDir()
188181
if err != nil {
189182
log.Fatal().Err(err).Msg("Unable to determine home directory")
@@ -256,7 +249,6 @@ func (w *DPromptsWorker) resolveGroup(ctx context.Context, tx pgx.Tx, jobID stri
256249
return &id, nil
257250
}
258251

259-
260252
// insertResult inserts or updates a dprompt result for a job
261253
func (w *DPromptsWorker) insertResult(ctx context.Context, tx pgx.Tx, jobID int64, jsonResponse []byte, groupID *int) error {
262254
_, err := tx.Exec(ctx,
@@ -275,5 +267,3 @@ func (w *DPromptsWorker) insertResult(ctx context.Context, tx pgx.Tx, jobID int6
275267
}
276268
return nil
277269
}
278-
279-

0 commit comments

Comments
 (0)