Skip to content

Commit 3e52a85

Browse files
authored
Merge pull request #1 from HexmosTech/rijul/schema-feature
Added schema support, grouping of tasks, queue management commands and data viewing
2 parents 22932cd + 082d256 commit 3e52a85

13 files changed

Lines changed: 471 additions & 25 deletions

README.md

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,45 @@ The JSON file should be an array of job objects:
7575
]
7676
```
7777

78+
### View Last Results
79+
80+
To view the last `n` results processed by the worker:
81+
82+
```
83+
dpr --mode=view --n=20
84+
```
85+
86+
87+
### View Groups
88+
89+
List all groups created in the system:
90+
91+
```
92+
dpr --mode=view --total-groups
93+
```
94+
95+
### View results for a specific group by its ID
96+
97+
```
98+
dpr --mode=view --group=1
99+
```
100+
101+
### View Queued Jobs
102+
103+
```
104+
dpr --mode=queue --action=view --queue-n=20
105+
```
106+
107+
### Clear Queued Jobs
108+
109+
```
110+
dpr --mode=queue --action=clear
111+
```
112+
113+
114+
115+
116+
78117
## Useful Ollama Commands
79118

80119
- **Run Ollama server:**
@@ -115,4 +154,9 @@ The JSON file should be an array of job objects:
115154

116155
- The `.dprompts.toml` file **must** be placed in your home directory.
117156
- You can customize job arguments and metadata using the `--args` and `--metadata` flags (as JSON).
118-
- The worker will process jobs and store results in the configured PostgreSQL database.
157+
- The worker will process jobs and store results in the configured PostgreSQL database.
158+
- **PostgreSQL Storage Details:**
159+
- `dprompt_results` — stores the results of processed jobs.
160+
- `dprompt_groups` — stores job groups with unique `group_name` and `id`.
161+
- Groups are a way to organize related jobs that work toward the same goal.
162+
- This makes it easy to view or analyze all jobs related to a single goal together.
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
[
2+
{
3+
"args": {
4+
"prompt": "Explain the difference between TCP and UDP.",
5+
"group_name": "networking_basics"
6+
},
7+
"metadata": {
8+
"type": "test",
9+
"category": "networking",
10+
"filename": "tcp_udp.md"
11+
}
12+
},
13+
{
14+
"args": {
15+
"prompt": "What is a subnet mask and why is it important?",
16+
"group_name": "networking_basics"
17+
},
18+
"metadata": {
19+
"type": "test",
20+
"category": "networking",
21+
"filename": "subnet_mask.md"
22+
}
23+
},
24+
{
25+
"args": {
26+
"prompt": "How do you reverse a linked list in Python?",
27+
"group_name": "python_algorithms"
28+
},
29+
"metadata": {
30+
"type": "test",
31+
"category": "python",
32+
"filename": "reverse_linked_list.md"
33+
}
34+
},
35+
{
36+
"args": {
37+
"prompt": "Explain Python's list comprehension with examples.",
38+
"group_name": "python_algorithms"
39+
},
40+
"metadata": {
41+
"type": "test",
42+
"category": "python",
43+
"filename": "list_comprehension.md"
44+
}
45+
}
46+
]
47+

client.go

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ func RunClient(ctx context.Context, driver *riverpgxv5.Driver, argsJSON string,
4040
log.Fatal().Err(err).Msg("Failed to parse args JSON")
4141
}
4242

43+
4344
var insertOpts *river.InsertOpts
4445
if metadataJSON != "" {
4546
var metadata map[string]interface{}
@@ -58,7 +59,11 @@ func RunClient(ctx context.Context, driver *riverpgxv5.Driver, argsJSON string,
5859
if _, err := riverClient.Insert(ctx, &args, insertOpts); err != nil {
5960
log.Fatal().Err(err).Msg("Failed to enqueue job")
6061
}
61-
log.Info().Interface("args", args).Interface("metadata", insertOpts).Msg("Enqueued job")
62+
63+
log.Info().
64+
Interface("args", args).
65+
Interface("metadata", insertOpts).
66+
Msg("Enqueued job")
6267
}
6368

6469
func enqueueBulkJobsFromFile(ctx context.Context, riverClient *river.Client[pgx.Tx], dbPool *pgxpool.Pool, filename string) error {
@@ -80,17 +85,19 @@ func enqueueBulkJobsFromFile(ctx context.Context, riverClient *river.Client[pgx.
8085
defer tx.Rollback(ctx)
8186

8287
var jobsToInsert []river.InsertManyParams
83-
for _, job := range jobs {
88+
89+
for i := range jobs {
90+
8491
var insertOpts *river.InsertOpts
85-
if job.Metadata != nil {
86-
metadataBytes, err := json.Marshal(job.Metadata)
92+
if jobs[i].Metadata != nil {
93+
metadataBytes, err := json.Marshal(jobs[i].Metadata)
8794
if err != nil {
8895
return err
8996
}
9097
insertOpts = &river.InsertOpts{Metadata: metadataBytes}
9198
}
9299
jobsToInsert = append(jobsToInsert, river.InsertManyParams{
93-
Args: job.Args,
100+
Args: jobs[i].Args,
94101
InsertOpts: insertOpts,
95102
})
96103
}

config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,4 +65,4 @@ func NewDBPool(ctx context.Context, configPath string) (*pgxpool.Pool, error) {
6565
}
6666

6767
return dbPool, nil
68-
}
68+
}

groupmanagement.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"github.com/jackc/pgx/v5/pgxpool"
8+
)
9+
10+
// DeleteGroupAndResults deletes a group by its ID and all associated results
11+
func DeleteGroupAndResults(ctx context.Context, db *pgxpool.Pool, groupID int) error {
12+
// Delete associated results first
13+
res1, err := db.Exec(ctx, `DELETE FROM dprompts_results WHERE group_id = $1`, groupID)
14+
if err != nil {
15+
return fmt.Errorf("failed to delete results: %w", err)
16+
}
17+
18+
// Delete the group itself
19+
res2, err := db.Exec(ctx, `DELETE FROM dprompt_groups WHERE id = $1`, groupID)
20+
if err != nil {
21+
return fmt.Errorf("failed to delete group: %w", err)
22+
}
23+
24+
fmt.Printf("Deleted %d results and %d group(s) for group ID %d\n", res1.RowsAffected(), res2.RowsAffected(), groupID)
25+
return nil
26+
}

main.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,15 @@ func main() {
1919
metadataJSON := flag.String("metadata", "", "Job metadata as JSON (for client mode)")
2020
bulkFile := flag.String("bulk-from-file", "", "Bulk insert jobs from JSON file")
2121
configPath := flag.String("config", "", "Path to config file (default: $HOME/.dprompt.toml)")
22+
23+
totalGroups := flag.Bool("total-groups", false, "Display total number of groups (view mode)")
24+
groupID := flag.Int("group", 0, "Display results for a specific group ID (view mode)")
25+
deleteGroupID := flag.Int("delete-group-id", 0, "Delete a specific group ID and all its associated results")
26+
27+
n := flag.Int("n", 10, "Number of results to display (view mode)")
28+
29+
queueN := flag.Int("queue-n", 10, "Number of queued jobs to display (for view action)")
30+
queueAction := flag.String("action", "", "Queue action: 'view' to list queued jobs, 'clear' to delete all queued jobs")
2231

2332
flag.Parse()
2433

@@ -39,6 +48,41 @@ func main() {
3948
RunClient(ctx, driver, *argsJSON, *metadataJSON, *bulkFile, dbPool)
4049
case "worker":
4150
RunWorker(ctx, driver, cancel, dbPool)
51+
case "delete-group":
52+
if *deleteGroupID == 0 {
53+
log.Fatal().Msg("Please provide --delete-group-id")
54+
}
55+
if err := DeleteGroupAndResults(ctx, dbPool, *deleteGroupID); err != nil {
56+
log.Fatal().Err(err).Msg("Failed to delete group and results")
57+
}
58+
log.Info().Int("group_id", *deleteGroupID).Msg("Deleted group and associated results")
59+
case "view":
60+
if *totalGroups {
61+
if err := viewTotalGroups(ctx, dbPool); err != nil {
62+
log.Fatal().Err(err).Msg("Failed to get total groups")
63+
}
64+
} else if *groupID != 0 {
65+
if err := viewResultsByGroup(ctx, dbPool, *groupID); err != nil {
66+
log.Fatal().Err(err).Msg("Failed to get results by group")
67+
}
68+
} else {
69+
if err := viewLastResults(ctx, dbPool, *n); err != nil {
70+
log.Fatal().Err(err).Msg("Failed to get last results")
71+
}
72+
}
73+
case "queue":
74+
switch *queueAction {
75+
case "view":
76+
if err := ViewQueuedJobs(ctx, dbPool, *queueN); err != nil {
77+
log.Fatal().Err(err).Msg("Failed to view queued jobs")
78+
}
79+
case "clear":
80+
if err := ClearQueuedJobs(ctx, dbPool); err != nil {
81+
log.Fatal().Err(err).Msg("Failed to clear queued jobs")
82+
}
83+
default:
84+
log.Fatal().Str("action", *queueAction).Msg("Unknown queue action")
85+
}
4286
default:
4387
log.Fatal().Str("mode", *mode).Msg("Unknown mode")
4488
}

managequeuedjobs.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"time"
7+
8+
"github.com/jackc/pgx/v5/pgxpool"
9+
)
10+
11+
func ViewQueuedJobs(ctx context.Context, db *pgxpool.Pool, n int) error {
12+
rows, err := db.Query(ctx, `
13+
SELECT id, state, created_at, scheduled_at
14+
FROM river_job
15+
WHERE state IN ('available', 'scheduled')
16+
ORDER BY created_at DESC
17+
LIMIT $1
18+
`, n)
19+
if err != nil {
20+
return err
21+
}
22+
defer rows.Close()
23+
24+
fmt.Printf("Last %d queued jobs:\n", n)
25+
for rows.Next() {
26+
var id int64
27+
var state string
28+
var createdAt, scheduledAt time.Time
29+
if err := rows.Scan(&id, &state, &createdAt, &scheduledAt); err != nil {
30+
return err
31+
}
32+
fmt.Printf("ID: %d | State: %s | CreatedAt: %s | ScheduledAt: %s\n",
33+
id, state, createdAt.Format(time.RFC3339), scheduledAt.Format(time.RFC3339))
34+
}
35+
return rows.Err()
36+
}
37+
38+
func ClearQueuedJobs(ctx context.Context, db *pgxpool.Pool) error {
39+
res, err := db.Exec(ctx, `
40+
DELETE FROM river_job
41+
WHERE state IN ('available', 'scheduled')
42+
`)
43+
if err != nil {
44+
return err
45+
}
46+
fmt.Printf("Deleted %d queued jobs\n", res.RowsAffected())
47+
return nil
48+
}

ollama.go

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,23 +25,33 @@ func LoadLLMConfig(configPath string) (*LLMConfig, error) {
2525
return &conf.LLM, nil
2626
}
2727

28-
func CallOllama(prompt string, configPath string) (string, error) {
28+
func CallOllama(prompt string, schema interface{}, configPath string) (string, error) {
2929
llmConfig, err := LoadLLMConfig(configPath)
3030
if err != nil {
3131
return "", err
3232
}
3333

34-
reqBody, err := json.Marshal(map[string]interface{}{
35-
"model": llmConfig.Model,
36-
"messages": []map[string]string{{"role": "user", "content": prompt}},
37-
"stream": false,
38-
"format": "json",
34+
// Build request body
35+
req := map[string]interface{}{
36+
"model": llmConfig.Model,
37+
"stream": false,
38+
"messages": []map[string]string{
39+
{"role": "user", "content": prompt},
40+
},
3941
"options": map[string]float64{
4042
"temperature": llmConfig.Temperature,
4143
"top_p": llmConfig.TopP,
42-
"num_predict": 100,
4344
},
44-
})
45+
}
46+
47+
// ⬇️ Only include schema if provided
48+
if schema != nil {
49+
req["format"] = schema
50+
} else {
51+
req["format"] = "json" // default JSON output
52+
}
53+
54+
reqBody, err := json.Marshal(req)
4555
if err != nil {
4656
return "", err
4757
}

sql-queries/dprompts-groups.sql

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
CREATE TABLE dprompt_groups (
2+
id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
3+
group_name TEXT NOT NULL UNIQUE,
4+
created_at TIMESTAMPTZ DEFAULT NOW()
5+
);

sql-queries/dprompts-results.sql

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
CREATE TABLE dprompts_results (
2+
id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
3+
job_id BIGINT UNIQUE,
4+
response JSONB,
5+
created_at TIMESTAMPTZ DEFAULT NOW(),
6+
group_id INT,
7+
CONSTRAINT fk_group
8+
FOREIGN KEY (group_id)
9+
REFERENCES dprompt_groups(id)
10+
);

0 commit comments

Comments
 (0)