|
| 1 | +# Design: Temporal Signals & Task Versioning (API/SDK) |
| 2 | + |
| 3 | +## Status: Draft / Proposal |
| 4 | + |
| 5 | +## Problem |
| 6 | + |
| 7 | +When a task definition changes between deploys, in-flight runs can encounter mismatches: |
| 8 | + |
| 9 | +1. **Input streams** - A sender calls `.send()` with data shaped for the new schema, but the running task was deployed with the old schema and has different `.on()` / `.once()` handlers. |
| 10 | +2. **Wait tokens** - A token created by version N is completed with data shaped for version N+1 (or vice versa), causing runtime deserialization failures or silent data loss. |
| 11 | +3. **Trigger payloads** - A run was triggered against version N's schema, but by the time it executes the worker is running version N+1. |
| 12 | + |
| 13 | +Today there is no mechanism for the platform to detect or prevent these mismatches. This design proposes **version-aware delivery** for input streams and wait tokens so that senders can target or adapt to the version a run is actually executing. |
| 14 | + |
| 15 | +--- |
| 16 | + |
| 17 | +## Goals |
| 18 | + |
| 19 | +- Expose the **version of the running task** to any code that sends data to it (input streams, wait token completion). |
| 20 | +- Let senders **opt in** to version-aware delivery: either target a specific version or receive the version and branch on it. |
| 21 | +- Remain **fully backwards-compatible** — existing code that ignores versioning continues to work unchanged. |
| 22 | +- Keep the **SDK surface small** — no new top-level concepts; extend existing `streams.input` and `wait` APIs. |
| 23 | + |
| 24 | +## Non-goals (for this document) |
| 25 | + |
| 26 | +- Database schema changes (separate design). |
| 27 | +- Automatic schema migration / coercion at the platform level. |
| 28 | +- Breaking changes to any public API. |
| 29 | + |
| 30 | +--- |
| 31 | + |
| 32 | +## Design |
| 33 | + |
| 34 | +### 1. Version Metadata on Runs |
| 35 | + |
| 36 | +Every task run already carries metadata about which deployment created it. We surface a **version identifier** (the deployment version string, e.g. `"20240815.1"`) in two places: |
| 37 | + |
| 38 | +| Surface | How | |
| 39 | +|---------|-----| |
| 40 | +| **Run object returned by `.trigger()` / `.batchTrigger()`** | Add `version: string` to the returned handle | |
| 41 | +| **`wait.retrieveToken()` / `wait.listTokens()`** | Add `runVersion?: string` to each token item — the version of the run that is waiting on this token | |
| 42 | + |
| 43 | +This lets external callers discover what version a run is executing before they send it data. |
| 44 | + |
| 45 | +### 2. Input Streams — Version-Aware `.send()` |
| 46 | + |
| 47 | +#### Current API |
| 48 | + |
| 49 | +```ts |
| 50 | +const approval = streams.input<ApprovalData>({ id: "approval" }); |
| 51 | + |
| 52 | +// sender side |
| 53 | +await approval.send(runId, { approved: true, reviewer: "alice" }); |
| 54 | +``` |
| 55 | + |
| 56 | +#### Proposed Addition |
| 57 | + |
| 58 | +```ts |
| 59 | +await approval.send(runId, data, { |
| 60 | + // New optional field: |
| 61 | + ifVersion?: string | ((version: string) => boolean); |
| 62 | +}); |
| 63 | +``` |
| 64 | + |
| 65 | +**Semantics:** |
| 66 | + |
| 67 | +| `ifVersion` value | Behavior | |
| 68 | +|---|---| |
| 69 | +| _omitted_ | Send unconditionally (current behavior, fully backwards-compatible) | |
| 70 | +| `"20240815.1"` (string) | Send only if the run's task version matches exactly; otherwise reject with `VersionMismatchError` | |
| 71 | +| `(v) => v >= "20240815.1"` (predicate) | Send only if predicate returns `true` for the run's version; otherwise reject with `VersionMismatchError` | |
| 72 | + |
| 73 | +This keeps `.send()` simple for callers who don't care about versioning while giving precise control to those who do. |
| 74 | + |
| 75 | +#### Alternative: Version-Returning `.send()` |
| 76 | + |
| 77 | +Instead of (or in addition to) guarding, `.send()` could return metadata: |
| 78 | + |
| 79 | +```ts |
| 80 | +const result = await approval.send(runId, data); |
| 81 | +// result.runVersion === "20240815.1" |
| 82 | +``` |
| 83 | + |
| 84 | +This lets the caller inspect the version after the fact. Useful for logging/observability but doesn't prevent mismatched data from being delivered. |
| 85 | + |
| 86 | +**Recommendation:** Support both — the return value always includes `runVersion`, and the optional `ifVersion` guard prevents delivery on mismatch. |
| 87 | + |
| 88 | +### 3. Wait Tokens — Version-Aware `.completeToken()` |
| 89 | + |
| 90 | +#### Current API |
| 91 | + |
| 92 | +```ts |
| 93 | +await wait.completeToken(tokenId, { status: "done" }); |
| 94 | +``` |
| 95 | + |
| 96 | +#### Proposed Addition |
| 97 | + |
| 98 | +```ts |
| 99 | +await wait.completeToken(tokenId, data, { |
| 100 | + // New optional field: |
| 101 | + ifVersion?: string | ((version: string) => boolean); |
| 102 | +}); |
| 103 | +``` |
| 104 | + |
| 105 | +Same semantics as input streams above. The platform checks the version of the run that owns the waitpoint before delivering the completion. |
| 106 | + |
| 107 | +Additionally, `wait.createToken()` response already includes a `url` for webhook-based completion. The webhook endpoint should accept an optional `X-Trigger-If-Version` header with the same guard semantics, returning `409 Conflict` on mismatch. |
| 108 | + |
| 109 | +### 4. Extracting Version — `runs.retrieve()` Enhancement |
| 110 | + |
| 111 | +To support the "check then act" pattern, `runs.retrieve()` should include the version: |
| 112 | + |
| 113 | +```ts |
| 114 | +const run = await runs.retrieve(runId); |
| 115 | +// run.version === "20240815.1" |
| 116 | +// run.taskIdentifier === "my-task" |
| 117 | +``` |
| 118 | + |
| 119 | +This field already exists internally on the `TaskRun` model via the associated `BackgroundWorkerTask` → `BackgroundWorker` → `version`. We just need to surface it in the API response. |
| 120 | + |
| 121 | +### 5. Version Inside the Running Task |
| 122 | + |
| 123 | +Task code can already access `ctx.run` metadata. We add: |
| 124 | + |
| 125 | +```ts |
| 126 | +export const myTask = task({ |
| 127 | + id: "my-task", |
| 128 | + run: async (payload, { ctx }) => { |
| 129 | + console.log(ctx.deployment.version); // "20240815.1" |
| 130 | + }, |
| 131 | +}); |
| 132 | +``` |
| 133 | + |
| 134 | +This lets `.on()` handlers inside a task know their own version (useful for logging or conditional logic): |
| 135 | + |
| 136 | +```ts |
| 137 | +approval.on((data) => { |
| 138 | + logger.info("Received approval", { |
| 139 | + taskVersion: ctx.deployment.version, |
| 140 | + }); |
| 141 | +}); |
| 142 | +``` |
| 143 | + |
| 144 | +### 6. `VersionMismatchError` |
| 145 | + |
| 146 | +A new typed error for version guard failures: |
| 147 | + |
| 148 | +```ts |
| 149 | +import { VersionMismatchError } from "@trigger.dev/sdk"; |
| 150 | + |
| 151 | +try { |
| 152 | + await approval.send(runId, data, { ifVersion: "20240815.1" }); |
| 153 | +} catch (err) { |
| 154 | + if (err instanceof VersionMismatchError) { |
| 155 | + console.log(err.expectedVersion); // "20240815.1" |
| 156 | + console.log(err.actualVersion); // "20240816.3" |
| 157 | + console.log(err.runId); // "run_abc123" |
| 158 | + // Decide: retry with adapted payload, skip, alert, etc. |
| 159 | + } |
| 160 | +} |
| 161 | +``` |
| 162 | + |
| 163 | +```ts |
| 164 | +class VersionMismatchError extends Error { |
| 165 | + name = "VersionMismatchError"; |
| 166 | + constructor( |
| 167 | + public readonly runId: string, |
| 168 | + public readonly expectedVersion: string, |
| 169 | + public readonly actualVersion: string, |
| 170 | + ) { |
| 171 | + super( |
| 172 | + `Version mismatch for run ${runId}: expected ${expectedVersion}, got ${actualVersion}` |
| 173 | + ); |
| 174 | + } |
| 175 | +} |
| 176 | +``` |
| 177 | + |
| 178 | +--- |
| 179 | + |
| 180 | +## API Surface Summary |
| 181 | + |
| 182 | +### New Fields on Existing Types |
| 183 | + |
| 184 | +| Type | New Field | Description | |
| 185 | +|------|-----------|-------------| |
| 186 | +| `TriggerResult` (from `.trigger()`) | `version: string` | Deployment version of the triggered run | |
| 187 | +| `RetrievedRun` (from `runs.retrieve()`) | `version: string` | Deployment version | |
| 188 | +| `WaitpointRetrievedToken` | `runVersion?: string` | Version of the run waiting on this token | |
| 189 | +| `TaskRunContext` (`ctx`) | `deployment.version: string` | Version of the current deployment | |
| 190 | + |
| 191 | +### New Options on Existing Methods |
| 192 | + |
| 193 | +| Method | New Option | Type | |
| 194 | +|--------|-----------|------| |
| 195 | +| `streams.input().send()` | `ifVersion` | `string \| ((version: string) => boolean)` | |
| 196 | +| `wait.completeToken()` | `ifVersion` | `string \| ((version: string) => boolean)` | |
| 197 | + |
| 198 | +### New Return Fields on Existing Methods |
| 199 | + |
| 200 | +| Method | New Return Field | Type | |
| 201 | +|--------|-----------------|------| |
| 202 | +| `streams.input().send()` | `runVersion` | `string` | |
| 203 | +| `wait.completeToken()` | `runVersion` | `string` | |
| 204 | + |
| 205 | +### New Types |
| 206 | + |
| 207 | +| Type | Location | |
| 208 | +|------|----------| |
| 209 | +| `VersionMismatchError` | `@trigger.dev/sdk` | |
| 210 | + |
| 211 | +--- |
| 212 | + |
| 213 | +## Usage Examples |
| 214 | + |
| 215 | +### Example 1: Guard Input Stream Delivery |
| 216 | + |
| 217 | +```ts |
| 218 | +import { streams, VersionMismatchError } from "@trigger.dev/sdk"; |
| 219 | + |
| 220 | +const chatInput = streams.input<{ message: string }>({ id: "chat" }); |
| 221 | + |
| 222 | +// Only deliver if the run is on the expected version |
| 223 | +try { |
| 224 | + await chatInput.send(runId, { message: "hello" }, { |
| 225 | + ifVersion: deployedVersion, |
| 226 | + }); |
| 227 | +} catch (err) { |
| 228 | + if (err instanceof VersionMismatchError) { |
| 229 | + // Run is on a different version — handle gracefully |
| 230 | + console.warn(`Skipping: run is on ${err.actualVersion}`); |
| 231 | + } |
| 232 | +} |
| 233 | +``` |
| 234 | + |
| 235 | +### Example 2: Adaptive Token Completion |
| 236 | + |
| 237 | +```ts |
| 238 | +import { wait, runs } from "@trigger.dev/sdk"; |
| 239 | + |
| 240 | +// Check the run's version, send version-appropriate data |
| 241 | +const run = await runs.retrieve(runId); |
| 242 | + |
| 243 | +if (run.version >= "20240815.1") { |
| 244 | + await wait.completeToken(tokenId, { status: "done", metadata: { v2: true } }); |
| 245 | +} else { |
| 246 | + await wait.completeToken(tokenId, { status: "done" }); // legacy shape |
| 247 | +} |
| 248 | +``` |
| 249 | + |
| 250 | +### Example 3: Version Predicate |
| 251 | + |
| 252 | +```ts |
| 253 | +await approval.send(runId, data, { |
| 254 | + ifVersion: (v) => v.startsWith("2024"), |
| 255 | +}); |
| 256 | +``` |
| 257 | + |
| 258 | +--- |
| 259 | + |
| 260 | +## Migration & Backwards Compatibility |
| 261 | + |
| 262 | +- **All new fields are optional / additive** — no existing call signatures change. |
| 263 | +- **`ifVersion` defaults to `undefined`** — omitting it preserves current unconditional behavior. |
| 264 | +- **Return type changes are additive** — new fields on response objects don't break existing destructuring. |
| 265 | +- **No new required configuration** — versioning is opt-in for senders. |
| 266 | + |
| 267 | +--- |
| 268 | + |
| 269 | +## Open Questions |
| 270 | + |
| 271 | +1. **Version format** — Should we use the deployment version string as-is (e.g. `"20240815.1"`), or introduce a monotonically increasing integer version? Strings are human-readable but harder to compare with `>=`. Integers are easy to compare but less meaningful. |
| 272 | + |
| 273 | +2. **Predicate serialization** — The `ifVersion: (v) => boolean` form only works in SDK calls (can't be sent over HTTP). For the webhook/REST path, should we support a simple comparison DSL (e.g. `">=20240815.1"`) or only exact match strings? |
| 274 | + |
| 275 | +3. **Batch operations** — For `batchTrigger()` where runs may land on different versions, should we return per-item version info? |
| 276 | + |
| 277 | +4. **Event-driven alternative** — Instead of (or in addition to) guards, should we emit a `version.mismatch` event/hook that middleware can intercept? |
0 commit comments