Skip to content

Commit 6dc080e

Browse files
committed
Require dedupe timestamp in job queue payloads + dedupe
1 parent 65d3d29 commit 6dc080e

7 files changed

Lines changed: 232 additions & 67 deletions

File tree

aidocs/JobQueues.md

Lines changed: 56 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -73,11 +73,12 @@ Identity model:
7373
- Parse queue bodies of the form:
7474

7575
```text
76-
<uuid> <shell-escaped arguments>
76+
<uuid>:<timestamp> <shell-escaped arguments>
7777
```
7878

79-
- Match only the first UUID-width bytes of the body against configured job
80-
UUIDs.
79+
- Match the UUID prefix against configured job UUIDs.
80+
- Require a 12-15 digit numeric timestamp after the UUID and colon.
81+
- Deduplicate matching queue triggers by `<uuid>:<timestamp>` for 140 seconds.
8182
- Preserve job arguments with spaces by parsing shell-escaped argument text.
8283
- Log an error for malformed or unknown UUID bodies, and log an info event when
8384
a job is triggered from a queue.
@@ -272,9 +273,12 @@ Rules:
272273
- Bodies shorter than 36 bytes are invalid.
273274
- Only bytes `[0:36]` are considered for job selection.
274275
- The UUID prefix must parse as a UUID.
275-
- UUID-only payloads are valid and mean the job has zero arguments.
276-
- If the body has more data, byte 36 must be a single ASCII space.
277-
- Argument text begins at byte 37.
276+
- Byte 36 must be a single ASCII colon.
277+
- A numeric timestamp of 12-15 digits must follow the colon.
278+
- Timestamp-only payloads are valid and mean the job has zero arguments.
279+
- If the body has more data after the timestamp, the next byte must be a single
280+
ASCII space.
281+
- Argument text begins after that space.
278282
- Empty argument text means zero arguments.
279283
- The argument parser tokenizes shell-escaped words without command execution,
280284
variable expansion, globbing, or file access.
@@ -286,13 +290,27 @@ Rules:
286290
Examples:
287291

288292
```text
289-
1104df4c-feeb-43ab-8c85-83663288cea9 alpha two\ words
290-
1104df4c-feeb-43ab-8c85-83663288cea9 'alpha beta' gamma
293+
1104df4c-feeb-43ab-8c85-83663288cea9:17642656976077
294+
1104df4c-feeb-43ab-8c85-83663288cea9:17642656976077 alpha two\ words
295+
1104df4c-feeb-43ab-8c85-83663288cea9:17642656976077 'alpha beta' gamma
291296
```
292297

293298
The existing `resources/gcloud/scripts/queue-job.sh` POC matches this contract
294-
by using `printf "%q "` for each argument and sending the payload as
295-
`text/plain`.
299+
by calculating the timestamp with `echo "$(( $(date +%s%N) / 100000 ))"`,
300+
using `printf "%q "` for each argument, and sending the payload as `text/plain`.
301+
302+
## Queue Trigger Deduplication
303+
304+
The engine records each `<uuid>:<timestamp>` pair for 140 seconds after the
305+
UUID maps to a configured job. During that retention window, another queue
306+
payload with the same UUID and timestamp is acknowledged and discarded before a
307+
second pipeline is started.
308+
309+
Deduplication is intentionally in the engine queue handler, not in queue
310+
providers, so all provider backends share the same behavior. The dedupe key is
311+
stored only in memory and is cleaned opportunistically as new queue messages are
312+
handled. The timestamp is only an idempotency token; it is not interpreted as a
313+
wall-clock freshness check.
296314

297315
## Job Matching And Pipeline Start
298316

@@ -308,15 +326,18 @@ Flow:
308326
2. Parse and validate the queue body.
309327
3. Lookup the normalized UUID in the current enabled-job UUID map.
310328
4. If no job matches, log `Error` and return `QueueAck`.
311-
5. Parse shell-escaped arguments.
312-
6. Validate supplied arguments against the job's `Arguments` matchers:
329+
5. Record the `<uuid>:<timestamp>` dedupe key for 140 seconds.
330+
6. If the dedupe key is already present, log `Info` and return `QueueAck`
331+
without starting another pipeline.
332+
7. Parse shell-escaped arguments.
333+
8. Validate supplied arguments against the job's `Arguments` matchers:
313334
- Too few arguments is an error.
314335
- Existing behavior for extra arguments is preserved.
315336
- Invalid argument values log `Error`.
316337
- Queue triggers never prompt for missing arguments.
317-
7. Log `Info` that job `<name>` was triggered from queue provider `<provider>`.
318-
8. Start the job pipeline with a new pipeline type, `queuedJob`.
319-
9. Return `QueueAck` once the pipeline has been accepted for execution.
338+
9. Log `Info` that job `<name>` was triggered from queue provider `<provider>`.
339+
10. Start the job pipeline with a new pipeline type, `queuedJob`.
340+
11. Return `QueueAck` once the pipeline has been accepted for execution.
320341

321342
The worker should be built like a scheduled job worker:
322343

@@ -390,15 +411,19 @@ Google resource alignment:
390411
`job-triggers` and subscription `job-triggers-pull`.
391412
- `resources/gcloud/scripts/create-job-webhook-resources.sh` deploys a Cloud
392413
Function with a UUID-bearing URL and publisher service account.
393-
- `resources/gcloud/scripts/webhook/index.js` publishes the raw request body.
394-
- `resources/gcloud/scripts/queue-job.sh` sends the body contract expected by
395-
the engine.
414+
- `resources/gcloud/scripts/webhook/index.js` validates the raw request body
415+
has the required UUID, timestamp, and argument separator shape before
416+
publishing it.
417+
- `resources/gcloud/scripts/queue-job.sh` calculates a 14-digit timestamp and
418+
sends the body contract expected by the engine.
396419

397420
## Security Notes
398421

399422
- Queue provider config is engine/provider config, not extension config.
400423
- `UUIDTrigger` values are bearer trigger secrets and should normally be stored
401424
in `conf/variables/<environment>.yaml` under `Secrets`.
425+
- Queue timestamps are idempotency tokens paired with a UUID trigger. They are
426+
not authorization material.
402427
- Logs must never print configured UUIDs or full queue bodies.
403428
- Unknown UUIDs should log only a short diagnostic such as provider name,
404429
provider message ID, and body length.
@@ -414,6 +439,7 @@ Google resource alignment:
414439
First-slice semantics are accepted-trigger acknowledgment:
415440

416441
- Ack after the engine validates the queue body and accepts the job pipeline.
442+
- Ack duplicate UUID/timestamp pairs after discarding the duplicate.
417443
- Ack malformed bodies and unknown UUIDs after logging an error.
418444
- Retry only when the engine is shutting down or temporarily unable to make a
419445
routing decision.
@@ -431,7 +457,8 @@ Implications:
431457
### 1) Change Summary
432458

433459
- Slice name: job queue provider facility.
434-
- Goal: allow remote queues to trigger configured jobs by UUID and arguments.
460+
- Goal: allow remote queues to trigger configured jobs by UUID/timestamp
461+
prefixes and arguments.
435462
- Out of scope: completion-coupled queue retries, user-scheduled jobs, extension
436463
queue APIs, and connector-based queue routing.
437464

@@ -447,7 +474,8 @@ Expected implementation files and directories:
447474
- `bot/config_validate.go`: `conf/queues/*.yaml` validation.
448475
- `bot/tasks.go`: `Job.UUIDTrigger`.
449476
- `bot/taskconf.go`: UUID trigger parsing, validation, duplicate detection.
450-
- `bot/queue_runtime.go`: provider lifecycle and engine queue message handler.
477+
- `bot/queue_runtime.go`: provider lifecycle, queue body parsing, dedupe cache,
478+
and engine queue message handler.
451479
- `bot/bot_process.go`: startup and shutdown placement.
452480
- `bot/run_pipelines.go`, `bot/constants.go`, `bot/pipelinetype_string.go`:
453481
queued-job pipeline source.
@@ -487,6 +515,8 @@ What changes:
487515
- Root config can enable queue providers.
488516
- Jobs can declare `UUIDTrigger`.
489517
- Queue providers poll external systems and submit queue messages to the engine.
518+
- Queue bodies carry a UUID/timestamp prefix, and the engine deduplicates
519+
matching triggers with the same prefix for 140 seconds.
490520
- The engine starts matching jobs from queue messages with a `queuedJob`
491521
pipeline type.
492522

@@ -532,8 +562,8 @@ What does not change:
532562
- Race risks: reload while queue item is being matched; provider shutdown while
533563
a message callback is active; duplicate queue delivery from provider restart.
534564
- Mitigations: immutable config snapshots, provider stop channels, retry during
535-
shutdown, idempotent job design guidance, and unit tests for reload/shutdown
536-
behavior.
565+
shutdown, engine-side UUID/timestamp dedupe, and unit tests for
566+
reload/shutdown behavior.
537567

538568
### 8) Backward Compatibility
539569

@@ -548,10 +578,12 @@ What does not change:
548578
Focused tests:
549579

550580
- Config validation accepts `conf/queues/gcloud.yaml` with `QueueConfig`.
551-
- Root `QueueConfig` is rejected with a migration-style error.
581+
- Root `QueueConfig` is rejected with a configuration-placement error.
552582
- `QueueProviders` parses and normalizes provider names.
553583
- `UUIDTrigger` accepts valid UUIDs and rejects invalid or duplicate values.
554584
- Queue body parsing preserves arguments with spaces.
585+
- Queue body parsing requires a 12-15 digit timestamp after the UUID.
586+
- Duplicate UUID/timestamp pairs are discarded for 140 seconds.
555587
- Unknown UUID logs an error and returns `QueueAck`.
556588
- Shutdown returns `QueueRetry`.
557589
- Queue-triggered jobs start with `automaticTask=true`, expected args, and
@@ -581,8 +613,7 @@ Manual verification:
581613
- `aidocs/SCHEDULER_FLOW.md`: no behavior change, but cross-reference queued
582614
jobs if helpful.
583615
- `aidocs/COMPONENT_MAP.md`: add `queues/`, `conf/queues/`, and this document.
584-
- `aidocs/V3_COMPATIBILITY_CONTRACT.md`: no required change unless config
585-
migration guidance changes.
616+
- `aidocs/V3_COMPATIBILITY_CONTRACT.md`: no required change.
586617
- `conf/README.md`: add `queues/<provider>.yaml`.
587618
- `resources/gcloud/README.md`: document queue resource scripts and curl flow.
588619

aidocs/PIPELINE_LIFECYCLE.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,8 +134,9 @@ Catch-all mode scoping:
134134
- Plugin match → `startPipeline(..., plugCommand|plugMessage, ...)`: `bot/dispatch.go:checkPluginMatchersAndRun`, `bot/constants.go` `pipelineType`.
135135
- Job trigger / command → `startPipeline(..., jobTrigger|jobCommand, ...)`: `bot/jobrun.go:checkJobMatchersAndRun`, `bot/constants.go` `pipelineType`.
136136
- Queue trigger → `triggerJobFromQueue(...)` parses the queue body, matches a
137-
job `UUIDTrigger`, and starts `startPipeline(..., queuedJob, ...)` without
138-
entering connector message routing: `bot/queue_runtime.go`.
137+
job `UUIDTrigger`, deduplicates matching UUID/timestamp prefixes, and starts
138+
`startPipeline(..., queuedJob, ...)` without entering connector message
139+
routing: `bot/queue_runtime.go`.
139140
- `startPipeline` now stamps each pipeline with `startedAt`, effective timeout settings, and operator-channel routing metadata before the primary task runs: `bot/run_pipelines.go`, `bot/pipecontext.go`.
140141
- A bounded live log buffer is attached to every pipeline through `newPipelineLiveLogger(...)`: `bot/history.go`, `bot/pipeline_monitoring.go`.
141142
- The live buffer tees normal history logging and keeps recent section markers, `Robot.Log(...)` / `worker.Log(...)`, and child stdout/stderr even when a job/plugin later discards persisted history (`KeepLogs: 0`).

bot/queue_runtime.go

Lines changed: 82 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,24 @@ import (
77
"sort"
88
"strings"
99
"sync"
10+
"time"
1011

1112
shlex "github.com/anmitsu/go-shlex"
1213
"github.com/google/uuid"
1314
"github.com/lnxjedi/gopherbot/robot"
1415
)
1516

1617
const queueUUIDPrefixLen = 36
18+
const queueTimestampMinLen = 12
19+
const queueTimestampMaxLen = 15
20+
21+
const queueDedupeRetention = 140 * time.Second
22+
23+
type parsedQueueBody struct {
24+
jobUUID string
25+
timestamp string
26+
args []string
27+
}
1728

1829
type queueHandler struct {
1930
handler
@@ -38,6 +49,13 @@ var runtimeQueueProviders = struct {
3849
runtimes: map[string]*managedQueueProvider{},
3950
}
4051

52+
var queueDedupe = struct {
53+
sync.Mutex
54+
seen map[string]time.Time
55+
}{
56+
seen: map[string]time.Time{},
57+
}
58+
4159
func (h queueHandler) GetQueueConfig(v interface{}) error {
4260
cfg := getQueueConfigFor(h.provider)
4361
if cfg == nil {
@@ -251,29 +269,68 @@ func reconcileQueueProviderRuntimes(providers []string) {
251269
}
252270
}
253271

254-
func parseQueueBody(body []byte) (string, []string, error) {
272+
func parseQueueBody(body []byte) (parsedQueueBody, error) {
255273
if len(body) < queueUUIDPrefixLen {
256-
return "", nil, fmt.Errorf("queue body too short: %d byte(s)", len(body))
274+
return parsedQueueBody{}, fmt.Errorf("queue body too short: %d byte(s)", len(body))
257275
}
258276
id, err := uuid.ParseBytes(body[:queueUUIDPrefixLen])
259277
if err != nil {
260-
return "", nil, fmt.Errorf("invalid queue UUID prefix: %w", err)
278+
return parsedQueueBody{}, fmt.Errorf("invalid queue UUID prefix: %w", err)
279+
}
280+
if len(body) == queueUUIDPrefixLen || body[queueUUIDPrefixLen] != ':' {
281+
return parsedQueueBody{}, fmt.Errorf("queue UUID prefix is not followed by a colon")
282+
}
283+
284+
timestampStart := queueUUIDPrefixLen + 1
285+
timestampEnd := timestampStart
286+
for timestampEnd < len(body) && body[timestampEnd] >= '0' && body[timestampEnd] <= '9' {
287+
timestampEnd++
261288
}
262-
if len(body) == queueUUIDPrefixLen {
263-
return id.String(), nil, nil
289+
timestampLen := timestampEnd - timestampStart
290+
if timestampLen < queueTimestampMinLen || timestampLen > queueTimestampMaxLen {
291+
return parsedQueueBody{}, fmt.Errorf("queue timestamp must be %d-%d digits", queueTimestampMinLen, queueTimestampMaxLen)
264292
}
265-
if body[queueUUIDPrefixLen] != ' ' {
266-
return "", nil, fmt.Errorf("queue UUID prefix is not followed by a space")
293+
if timestampEnd == len(body) {
294+
return parsedQueueBody{
295+
jobUUID: id.String(),
296+
timestamp: string(body[timestampStart:timestampEnd]),
297+
}, nil
267298
}
268-
argText := strings.TrimSpace(string(body[queueUUIDPrefixLen+1:]))
299+
if body[timestampEnd] != ' ' {
300+
return parsedQueueBody{}, fmt.Errorf("queue timestamp is not followed by a space")
301+
}
302+
argText := strings.TrimSpace(string(body[timestampEnd+1:]))
269303
if argText == "" {
270-
return id.String(), nil, nil
304+
return parsedQueueBody{
305+
jobUUID: id.String(),
306+
timestamp: string(body[timestampStart:timestampEnd]),
307+
}, nil
271308
}
272309
args, err := shlex.Split(argText, true)
273310
if err != nil {
274-
return "", nil, fmt.Errorf("parsing shell-escaped queue arguments: %w", err)
311+
return parsedQueueBody{}, fmt.Errorf("parsing shell-escaped queue arguments: %w", err)
275312
}
276-
return id.String(), args, nil
313+
return parsedQueueBody{
314+
jobUUID: id.String(),
315+
timestamp: string(body[timestampStart:timestampEnd]),
316+
args: args,
317+
}, nil
318+
}
319+
320+
func recordQueueDedupe(jobUUID, timestamp string, now time.Time) bool {
321+
key := jobUUID + ":" + timestamp
322+
queueDedupe.Lock()
323+
defer queueDedupe.Unlock()
324+
for seenKey, expiresAt := range queueDedupe.seen {
325+
if !now.Before(expiresAt) {
326+
delete(queueDedupe.seen, seenKey)
327+
}
328+
}
329+
if expiresAt, ok := queueDedupe.seen[key]; ok && now.Before(expiresAt) {
330+
return true
331+
}
332+
queueDedupe.seen[key] = now.Add(queueDedupeRetention)
333+
return false
277334
}
278335

279336
func triggerJobFromQueue(provider string, msg robot.QueueMessage) robot.QueueDisposition {
@@ -284,7 +341,7 @@ func triggerJobFromQueue(provider string, msg robot.QueueMessage) robot.QueueDis
284341
}
285342
state.RUnlock()
286343

287-
jobUUID, args, err := parseQueueBody(msg.Body)
344+
parsed, err := parseQueueBody(msg.Body)
288345
if err != nil {
289346
Log(robot.Error, "Queue provider '%s' message '%s' rejected: %v (body length %d)", provider, msg.ID, err, len(msg.Body))
290347
return robot.QueueAck
@@ -297,14 +354,22 @@ func triggerJobFromQueue(provider string, msg robot.QueueMessage) robot.QueueDis
297354
if protocol == "" {
298355
protocol = currentCfg.protocol
299356
}
300-
taskItem := tasks.uuidTriggers[jobUUID]
357+
taskItem := tasks.uuidTriggers[parsed.jobUUID]
301358
currentCfg.RUnlock()
302359

303360
if taskItem == nil {
304361
Log(robot.Error, "Queue provider '%s' message '%s' had no matching job UUID (body length %d)", provider, msg.ID, len(msg.Body))
305362
return robot.QueueAck
306363
}
307364
task, _, job := getTask(taskItem)
365+
if recordQueueDedupe(parsed.jobUUID, parsed.timestamp, time.Now()) {
366+
jobName := "<unknown>"
367+
if task != nil {
368+
jobName = task.name
369+
}
370+
Log(robot.Info, "Queue provider '%s' message '%s' discarded duplicate queue trigger for job '%s'", provider, msg.ID, jobName)
371+
return robot.QueueAck
372+
}
308373
if job == nil {
309374
Log(robot.Error, "Queue provider '%s' message '%s' matched non-job task '%s'", provider, msg.ID, task.name)
310375
return robot.QueueAck
@@ -313,12 +378,12 @@ func triggerJobFromQueue(provider string, msg robot.QueueMessage) robot.QueueDis
313378
Log(robot.Error, "Queue provider '%s' message '%s' matched disabled job '%s'", provider, msg.ID, task.name)
314379
return robot.QueueAck
315380
}
316-
if len(args) < len(job.Arguments) {
317-
Log(robot.Error, "Queue provider '%s' message '%s' supplied too few arguments for job '%s': %d required but %d given", provider, msg.ID, task.name, len(job.Arguments), len(args))
381+
if len(parsed.args) < len(job.Arguments) {
382+
Log(robot.Error, "Queue provider '%s' message '%s' supplied too few arguments for job '%s': %d required but %d given", provider, msg.ID, task.name, len(job.Arguments), len(parsed.args))
318383
return robot.QueueAck
319384
}
320385
for i, jobarg := range job.Arguments {
321-
if !jobarg.re.MatchString(args[i]) {
386+
if !jobarg.re.MatchString(parsed.args[i]) {
322387
Log(robot.Error, "Queue provider '%s' message '%s' argument %d for job '%s' did not match configured argument pattern", provider, msg.ID, i+1, task.name)
323388
return robot.QueueAck
324389
}
@@ -336,6 +401,6 @@ func triggerJobFromQueue(provider string, msg robot.QueueMessage) robot.QueueDis
336401
queueProvider: provider,
337402
queueMessageID: msg.ID,
338403
}
339-
go w.startPipeline(nil, taskItem, queuedJob, "run", args...)
404+
go w.startPipeline(nil, taskItem, queuedJob, "run", parsed.args...)
340405
return robot.QueueAck
341406
}

0 commit comments

Comments
 (0)