Hi! First of all, great project. I just discovered it and I really like the approach.
Please correct me if any of my assumptions are incorrect! Also, is there another channel for asking this kind of question? I don't see a Discord, Slack, or any other communication channel in the repo.
River's unique jobs + `scheduled_at` pattern is exactly the foundation we need for our event processing system.
Use case
We have a CDC pipeline (outbox -> Pub/Sub -> processors) where multiple events can arrive for the same entity in quick succession. We want to collect all events that arrive within a fixed window and process them together in a single job.
What we're doing
We're using unique jobs with `ByPeriod` + `ScheduledAt` to ensure only one job exists per entity during the collection window. That part works perfectly.
The problem is attaching the event data. When the second event arrives and the insert returns `UniqueSkippedAsDuplicate`, the new event's args are discarded. We need that data.
Our current workaround is an associated table:
```go
result, err := riverClient.Insert(ctx, ProcessArgs{PartitionKey: "installation:123"}, &river.InsertOpts{
	ScheduledAt: time.Now().Add(5 * time.Second),
	UniqueOpts:  river.UniqueOpts{ByPeriod: 5 * time.Second},
})
if err != nil {
	return err
}

// Whether the insert created a new job or was skipped as a duplicate,
// result.Job points at the surviving job — attach the event data to it.
_, err = db.ExecContext(ctx,
	"INSERT INTO job_events (job_id, event_id, event_type, data) VALUES ($1, $2, $3, $4)",
	result.Job.ID, eventID, eventType, eventData)
if err != nil {
	return err
}
```
When the worker picks up the job, it joins against `job_events` to get all collected events.
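For completeness, the child table and worker-side query look roughly like this (a simplified sketch — the table and column names are ours; `river_job` is referenced only for the foreign key):

```sql
-- Child table collecting events for a pending aggregation job.
CREATE TABLE job_events (
    job_id     BIGINT NOT NULL REFERENCES river_job (id),
    event_id   TEXT   NOT NULL,
    event_type TEXT   NOT NULL,
    data       JSONB  NOT NULL,
    PRIMARY KEY (job_id, event_id)
);

-- Inside the worker: fetch everything collected for the claimed job.
SELECT event_id, event_type, data
FROM job_events
WHERE job_id = $1
ORDER BY event_id;
```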
My questions:
Is there a recommended pattern within River for this kind of event aggregation? Specifically:
- Is there a way to append/merge data into an existing unique job’s args rather than discarding the duplicate’s args?
- If not, is the child table approach reasonable, or is there a better pattern you’d suggest?
- Any concerns with inserting into a child table referencing `result.Job.ID` from a `UniqueSkippedAsDuplicate` result — particularly around race conditions with the worker claiming the job between the insert check and the child row write?
Happy to share more context on our architecture if helpful. Thanks!
(P.S. Written in the GitHub mobile app, sorry for the formatting!)