Skip to content

Commit f2edaf1

Browse files
committed
Add event store
1 parent 547652e commit f2edaf1

2 files changed

Lines changed: 108 additions & 4 deletions

File tree

pkg/capabilities/base_trigger.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,6 @@ func NewBaseTriggerCapability(
6666
tRetransmit, tMax time.Duration,
6767
) *BaseTriggerCapability {
6868
ctx, cancel := context.WithCancel(context.Background())
69-
7069
return &BaseTriggerCapability{
7170
store: store,
7271
send: send,
@@ -80,9 +79,6 @@ func NewBaseTriggerCapability(
8079
}
8180
}
8281

83-
// If you want logs:
84-
func (b *BaseTriggerCapability) SetLogger(l *log.Logger) { b.lggr = l }
85-
8682
func (b *BaseTriggerCapability) Start(ctx context.Context) error {
8783
b.ctx, b.cancel = context.WithCancel(ctx)
8884

pkg/capabilities/event_store.go

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
package capabilities
2+
3+
import (
4+
"context"
5+
"database/sql"
6+
"fmt"
7+
"time"
8+
)
9+
10+
type pgEventStore struct {
11+
db *sql.DB
12+
tableName string
13+
}
14+
15+
// NewPostgresEventStore creates the table (if needed) and returns an EventStore backed by Postgres.
16+
func NewPostgresEventStore(ctx context.Context, db *sql.DB, tableName string) (EventStore, error) {
17+
s := &pgEventStore{db: db, tableName: tableName}
18+
if err := s.ensureSchema(ctx); err != nil {
19+
return nil, err
20+
}
21+
return s, nil
22+
}
23+
24+
func (s *pgEventStore) ensureSchema(ctx context.Context) error {
25+
ddl := fmt.Sprintf(`
26+
CREATE TABLE IF NOT EXISTS %s (
27+
trigger_id TEXT NOT NULL,
28+
workflow_id TEXT NOT NULL,
29+
event_id TEXT NOT NULL,
30+
any_type_url TEXT NOT NULL,
31+
payload BYTEA NOT NULL,
32+
first_at TIMESTAMPTZ NOT NULL,
33+
last_sent_at TIMESTAMPTZ,
34+
attempts INT NOT NULL DEFAULT 0,
35+
PRIMARY KEY (trigger_id, workflow_id, event_id)
36+
);
37+
CREATE INDEX IF NOT EXISTS %s_firstat_idx ON %s (first_at);
38+
`, s.tableName, s.tableName, s.tableName)
39+
40+
// Exec can run multiple statements on Postgres.
41+
_, err := s.db.ExecContext(ctx, ddl)
42+
return err
43+
}
44+
45+
func (s *pgEventStore) Insert(ctx context.Context, rec PendingEvent) error {
46+
q := fmt.Sprintf(`
47+
INSERT INTO %s (trigger_id, workflow_id, event_id, any_type_url, payload, first_at, last_sent_at, attempts)
48+
VALUES ($1,$2,$3,$4,$5,$6,$7,$8)
49+
ON CONFLICT (trigger_id, workflow_id, event_id)
50+
DO UPDATE SET
51+
any_type_url = EXCLUDED.any_type_url,
52+
payload = EXCLUDED.payload,
53+
first_at = LEAST(%s.first_at, EXCLUDED.first_at),
54+
last_sent_at = EXCLUDED.last_sent_at,
55+
attempts = EXCLUDED.attempts;`, s.tableName, s.tableName)
56+
57+
_, err := s.db.ExecContext(
58+
ctx, q,
59+
rec.TriggerId, rec.WorkflowId, rec.EventId,
60+
rec.AnyTypeURL, rec.Payload,
61+
rec.FirstAt, nullTime(rec.LastSentAt), rec.Attempts,
62+
)
63+
return err
64+
}
65+
66+
func (s *pgEventStore) Delete(ctx context.Context, triggerId, workflowId, eventId string) error {
67+
q := fmt.Sprintf(`DELETE FROM %s WHERE trigger_id=$1 AND workflow_id=$2 AND event_id=$3;`, s.tableName)
68+
_, err := s.db.ExecContext(ctx, q, triggerId, workflowId, eventId)
69+
return err
70+
}
71+
72+
func (s *pgEventStore) List(ctx context.Context) ([]PendingEvent, error) {
73+
q := fmt.Sprintf(`
74+
SELECT trigger_id, workflow_id, event_id, any_type_url, payload, first_at, last_sent_at, attempts
75+
FROM %s
76+
ORDER BY first_at ASC;`, s.tableName)
77+
78+
rows, err := s.db.QueryContext(ctx, q)
79+
if err != nil {
80+
return nil, err
81+
}
82+
defer rows.Close()
83+
84+
var out []PendingEvent
85+
for rows.Next() {
86+
var rec PendingEvent
87+
var lastSent sql.NullTime
88+
if err := rows.Scan(
89+
&rec.TriggerId, &rec.WorkflowId, &rec.EventId,
90+
&rec.AnyTypeURL, &rec.Payload,
91+
&rec.FirstAt, &lastSent, &rec.Attempts,
92+
); err != nil {
93+
return nil, err
94+
}
95+
if lastSent.Valid {
96+
rec.LastSentAt = lastSent.Time
97+
}
98+
out = append(out, rec)
99+
}
100+
return out, rows.Err()
101+
}
102+
103+
func nullTime(t time.Time) sql.NullTime {
104+
if t.IsZero() {
105+
return sql.NullTime{Valid: false}
106+
}
107+
return sql.NullTime{Time: t, Valid: true}
108+
}

0 commit comments

Comments
 (0)