|
| 1 | +# Pub/Sub Event System — Audit Fix Plan |
| 2 | + |
| 3 | +Findings from 5-agent parallel audit (2026-03-03). Organized by priority. |
| 4 | +Follow [Implementation Process & Guidelines](pubsub-roadmap.md#implementation-process--guidelines) for each phase. |
| 5 | + |
| 6 | +--- |
| 7 | + |
| 8 | +## Phase 10: Audit Fixes — CRITICAL + HIGH |
| 9 | + |
| 10 | +### 10.1 — Fix `expireTtlRuns` Lua: global concurrency slot leak (CRITICAL) |
| 11 | + |
| 12 | +**Found by**: Redis auditor |
| 13 | +**Severity**: CRITICAL — permanent slot leak |
| 14 | +**File**: `internal-packages/run-engine/src/run-queue/index.ts:2633-2726` |
| 15 | + |
| 16 | +**Problem**: The `expireTtlRuns` Lua script removes from `queueCurrentConcurrency`, `queueCurrentDequeued`, `envCurrentConcurrency`, `envCurrentDequeued` but does NOT remove from `globalCurrentConcurrency`. When a run with an ordering key expires via TTL, the global concurrency slot is permanently leaked, eventually starving the entire queue. |
| 17 | + |
| 18 | +**Fix**: |
| 19 | +1. Add `globalCurrentConcurrencyKey` as a new KEYS parameter to the `expireTtlRuns` Lua |
| 20 | +2. Add `redis.call('SREM', globalCurrentConcurrencyKey, messageId)` alongside existing SREMs |
| 21 | +3. Update `numberOfKeys`, type declaration, and caller to pass the key |
| 22 | +4. Update the caller that invokes `expireTtlRuns` to compute and pass `queueGlobalCurrentConcurrencyKeyFromQueue` |
| 23 | + |
| 24 | +**Verify**: Run existing run-engine TTL tests |
| 25 | + |
| 26 | +### 10.2 — Fix `clearMessageFromConcurrencySets` bare queue name (HIGH) |
| 27 | + |
| 28 | +**Found by**: Redis auditor |
| 29 | +**Severity**: HIGH — SREM to wrong key, slot never released |
| 30 | +**File**: `internal-packages/run-engine/src/engine/index.ts:2240-2243` |
| 31 | + |
| 32 | +**Problem**: `clearMessageFromConcurrencySets` is called with `taskRun.queue` which is a bare queue name (e.g. `"my-task"`), not a full Redis key. `queueGlobalCurrentConcurrencyKeyFromQueue()` expects a full key like `{org:X}:proj:Y:env:Z:queue:my-task` and produces a nonsense key from a bare name. |
| 33 | + |
| 34 | +**Fix**: Trace how other callers of similar methods get the full queue key (likely from the `message.queue` field which includes the full path). Ensure `clearMessageFromConcurrencySets` either: |
| 35 | +- Receives the full queue key, or |
| 36 | +- Has access to the env/org/project context to construct it |
| 37 | + |
| 38 | +**Verify**: Check that the same issue exists for the existing per-key `queueCurrentConcurrencyKeyFromQueue` call (it probably does but SREM on a wrong key is a no-op, not a crash). |
| 39 | + |
| 40 | +### 10.3 — Add `.max()` to batch publish items array (HIGH) |
| 41 | + |
| 42 | +**Found by**: Security auditor |
| 43 | +**Severity**: HIGH — potential DoS |
| 44 | +**File**: `packages/core/src/v3/schemas/api.ts` — `BatchPublishEventRequestBody` |
| 45 | + |
| 46 | +**Fix**: Add `.max(100)` (or similar) to the `items` array in `BatchPublishEventRequestBody`. Matches the pattern of existing batch trigger which has limits. |
| 47 | + |
| 48 | +### 10.4 — Fix publishAndWait schema: parentRunId required but options optional (HIGH) |
| 49 | + |
| 50 | +**Found by**: API auditor |
| 51 | +**Severity**: HIGH — schema mismatch causes runtime 400 instead of Zod validation error |
| 52 | +**File**: `packages/core/src/v3/schemas/api.ts:1658-1671` |
| 53 | + |
| 54 | +**Fix**: Either: |
| 55 | +- Make `options` required in `PublishAndWaitEventRequestBody`, or |
| 56 | +- Move `parentRunId` to be a top-level required field outside of `options` |
| 57 | + |
| 58 | +### 10.5 — Fix ClickHouse interval string interpolation (HIGH) |
| 59 | + |
| 60 | +**Found by**: Security auditor |
| 61 | +**Severity**: HIGH — fragile pattern |
| 62 | +**File**: `apps/webapp/app/routes/api.v1.events.$eventId.stats.ts:54` |
| 63 | + |
| 64 | +**Fix**: Use parameterized query or keep the whitelist validation but use a safer pattern (map from allowed period to interval string rather than interpolating user input). |
| 65 | + |
| 66 | +### 10.6 — Add missing index for pattern subscription query (HIGH) |
| 67 | + |
| 68 | +**Found by**: DB auditor |
| 69 | +**Severity**: HIGH — full table scan on every publish |
| 70 | +**File**: `internal-packages/database/prisma/schema.prisma` — `EventSubscription` |
| 71 | + |
| 72 | +**Fix**: |
| 73 | +1. Add `@@index([projectId, environmentId, enabled])` to EventSubscription model |
| 74 | +2. Create migration with `CREATE INDEX CONCURRENTLY` in its own file |
| 75 | +3. Run `pnpm run db:migrate:deploy && pnpm run generate` |
| 76 | + |
| 77 | +### 10.7 — Fix batch publish partial failure semantics (HIGH) |
| 78 | + |
| 79 | +**Found by**: API auditor |
| 80 | +**Severity**: HIGH — client can't determine which items succeeded |
| 81 | +**File**: `apps/webapp/app/routes/api.v1.events.$eventId.batchPublish.ts:40-57` |
| 82 | + |
| 83 | +**Fix**: Two options: |
| 84 | +- **Option A**: Validate ALL items upfront before triggering any (current approach fails mid-batch) |
| 85 | +- **Option B**: Return partial results with per-item status (more complex but more resilient) |
| 86 | + |
| 87 | +Recommend Option A — validate schema + rate limits for all items first, then trigger. |
| 88 | + |
| 89 | +--- |
| 90 | + |
| 91 | +## Phase 11: Audit Fixes — MEDIUM |
| 92 | + |
| 93 | +### 11.1 — Fix N+1 in DLQ retryAll |
| 94 | + |
| 95 | +**File**: `apps/webapp/app/v3/services/events/deadLetterManagement.server.ts:126-148` |
| 96 | +**Fix**: Remove redundant re-fetch in `retry()` when called from `retryAll()`, or batch the operations. |
| 97 | + |
| 98 | +### 11.2 — Add payload size check before fan-out |
| 99 | + |
| 100 | +**File**: `apps/webapp/app/v3/services/events/publishEvent.server.ts` |
| 101 | +**Fix**: Check payload byte size before triggering subscribers. Return 413 if over limit and object store is not configured. |
| 102 | + |
| 103 | +### 11.3 — Fix inconsistent error handling in routes |
| 104 | + |
| 105 | +**Files**: `api.v1.events.dlq.retry-all.ts`, `api.v1.events.ts` |
| 106 | +**Fix**: Add try/catch with ServiceValidationError handling, matching other routes. |
| 107 | + |
| 108 | +### 11.4 — Add CLI publish options support |
| 109 | + |
| 110 | +**File**: `packages/cli-v3/src/commands/events/publish.ts` |
| 111 | +**Fix**: Add `--delay`, `--tags`, `--idempotency-key`, `--ordering-key` options. |
| 112 | + |
| 113 | +### 11.5 — Fix schema validation silent pass on compilation error |
| 114 | + |
| 115 | +**File**: `apps/webapp/app/v3/services/events/schemaRegistry.server.ts:198-201` |
| 116 | +**Fix**: Log a warning when ajv compilation fails, and optionally reject the publish. |
| 117 | + |
| 118 | +### 11.6 — Add stale subscription cleanup |
| 119 | + |
| 120 | +**File**: `apps/webapp/app/v3/services/events/publishEvent.server.ts` |
| 121 | +**Fix**: When a subscriber trigger fails consistently, log a warning and optionally disable the subscription after N consecutive failures. |
| 122 | + |
| 123 | +### 11.7 — Add data cleanup mechanism |
| 124 | + |
| 125 | +**Fix**: Add a periodic cleanup job (or TTL-based approach) for: |
| 126 | +- Disabled EventSubscriptions older than 30 days |
| 127 | +- Processed DeadLetterEvents (RETRIED/DISCARDED) older than 30 days |
| 128 | +- Deprecated EventDefinitions with no active subscriptions |
| 129 | + |
| 130 | +--- |
| 131 | + |
| 132 | +## Phase 12: Test Coverage |
| 133 | + |
| 134 | +### 12.1 — Tests for ReplayEventsService |
| 135 | + |
| 136 | +**File**: `apps/webapp/test/engine/replayEvents.test.ts` (new) |
| 137 | +**Tests**: |
| 138 | +- Replay with date range filter |
| 139 | +- Replay with task filter |
| 140 | +- Replay dry run (count only) |
| 141 | +- Replay with idempotency (no duplicate triggers) |
| 142 | +- Replay when ClickHouse is unavailable (graceful error) |
| 143 | + |
| 144 | +Note: These require ClickHouse in testcontainers or mocking. |
| 145 | + |
| 146 | +### 12.2 — Tests for DeadLetterService |
| 147 | + |
| 148 | +**File**: `apps/webapp/test/engine/deadLetterService.test.ts` (new) |
| 149 | +**Tests**: |
| 150 | +- Failed event-triggered run creates DLQ entry |
| 151 | +- Non-event run does NOT create DLQ entry |
| 152 | +- DLQ entry has correct eventType, payload, error |
| 153 | +- Multiple failures create separate DLQ entries |
| 154 | + |
| 155 | +### 12.3 — Tests for DeadLetterManagementService |
| 156 | + |
| 157 | +**File**: `apps/webapp/test/engine/deadLetterManagement.test.ts` (new) |
| 158 | +**Tests**: |
| 159 | +- List DLQ entries with pagination |
| 160 | +- List with eventType filter |
| 161 | +- List with status filter |
| 162 | +- Retry creates new run with correct payload |
| 163 | +- Retry marks DLQ entry as RETRIED |
| 164 | +- Discard marks entry as DISCARDED |
| 165 | +- RetryAll processes up to 1000 items |
| 166 | +- Retry/discard nonexistent ID returns error |
| 167 | + |
| 168 | +### 12.4 — Tests for RedisEventRateLimitChecker |
| 169 | + |
| 170 | +**File**: `apps/webapp/test/engine/eventRateLimiter.test.ts` (extend) |
| 171 | +**Tests**: |
| 172 | +- Redis checker allows under limit |
| 173 | +- Redis checker blocks over limit |
| 174 | +- Redis checker returns correct remaining/retryAfter |
| 175 | +- Different configs get separate Ratelimit instances |
| 176 | + |
| 177 | +Note: Requires Redis in testcontainers. |
| 178 | + |
| 179 | +### 12.5 — Tests for SchemaRegistryService.checkCompatibility |
| 180 | + |
| 181 | +**File**: extend existing SchemaRegistryService tests |
| 182 | +**Tests**: |
| 183 | +- Compatible schema change (add optional field) |
| 184 | +- Incompatible change (remove required field) |
| 185 | +- Incompatible change (change field type) |
| 186 | + |
| 187 | +--- |
| 188 | + |
| 189 | +## Phase 13: LOW Priority Fixes |
| 190 | + |
| 191 | +### 13.1 — Add LRU bounds to caches |
| 192 | +- `validatorCache` in SchemaRegistryService: max 1000 entries |
| 193 | +- `patternCache`/`filterCache` in core evaluators: max 1000 entries |
| 194 | +- `InMemoryEventRateLimitChecker.windows`: evict entries older than 2x window |
| 195 | + |
| 196 | +### 13.2 — Tighten Zod schemas |
| 197 | +- `payload: z.any()` → `payload: z.unknown()` |
| 198 | +- `metadata: z.any()` → `metadata: z.record(z.unknown())` |
| 199 | +- Add `.max(256)` to idempotencyKey |
| 200 | +- Add DLQ status validation with Zod instead of `as` cast |
| 201 | + |
| 202 | +### 13.3 — Remove dead code |
| 203 | +- Unused `compileFilter`/`evaluateFilter` exports from core filterEvaluator |
| 204 | + |
| 205 | +### 13.4 — Fix batchPublish URL naming |
| 206 | +- Current: `/api/v1/events/:id/batchPublish` (camelCase) |
| 207 | +- Consider: `/api/v1/events/:id/batch-publish` or keep for consistency |
| 208 | + |
| 209 | +--- |
| 210 | + |
| 211 | +## Execution Order |
| 212 | + |
| 213 | +``` |
| 214 | +Phase 10 (CRITICAL+HIGH) → Phase 12 (Tests) → Phase 11 (MEDIUM) → Phase 13 (LOW) |
| 215 | +``` |
| 216 | + |
| 217 | +Phase 10 first because it contains a CRITICAL bug (permanent slot leak). |
| 218 | +Phase 12 second because tests validate the fixes and catch regressions. |
| 219 | +Phase 11 and 13 are improvements, not blockers. |
| 220 | + |
| 221 | +## Verification per phase |
| 222 | + |
| 223 | +Same as roadmap guidelines: |
| 224 | +1. `pnpm run build --filter @internal/run-engine --filter webapp --filter @trigger.dev/core --filter @trigger.dev/sdk` |
| 225 | +2. `cd internal-packages/run-engine && pnpm run test --run` (run-engine: 236+ must pass) |
| 226 | +3. `cd apps/webapp && pnpm run test ./test/engine/publishEvent.test.ts --run` (24+ must pass) |
| 227 | +4. `cd apps/webapp && pnpm run test ./test/engine/eventRateLimiter.test.ts --run` (11+ must pass) |
| 228 | +5. New test files must pass |
| 229 | +6. Commit after each sub-step: `feat(events): phase X.Y — <description>` |
0 commit comments