diff --git a/.github/workflows/conventions.yml b/.github/workflows/conventions.yml index 9743a883ba..d7ed482e9d 100644 --- a/.github/workflows/conventions.yml +++ b/.github/workflows/conventions.yml @@ -51,5 +51,13 @@ jobs: - name: Compile all non-rust code run: pnpm --recursive --stream --filter '!@temporalio/core-bridge' --filter '!typescript-sdk' run build + - name: Check generated payload visitor is up to date + run: | + pnpm run gen:payload-visitor + if ! git diff --exit-code -- packages/proto/src/payload-visitor.generated.ts; then + echo "::error::payload-visitor.generated.ts is out of date with the protos. Run 'pnpm run gen:payload-visitor' and commit the result." + exit 1 + fi + - run: pnpm run lint:check - run: pnpm run lint:prune diff --git a/eslint.config.mjs b/eslint.config.mjs index 8b89ca8558..ae68f4d9e9 100644 --- a/eslint.config.mjs +++ b/eslint.config.mjs @@ -4,7 +4,7 @@ import importPlugin from 'eslint-plugin-import'; import prettierConfig from 'eslint-config-prettier'; export default tseslint.config( - { ignores: ['**/node_modules/**', '**/lib/**', '**/*.js', '**/*.mjs', '**/*.cjs'] }, + { ignores: ['**/node_modules/**', '**/lib/**', '**/*.js', '**/*.mjs', '**/*.cjs', '**/*.generated.ts'] }, { files: ['packages/*/src/**/*.ts', 'contrib/*/src/**/*.ts'], extends: [js.configs.recommended, ...tseslint.configs.recommended, prettierConfig], diff --git a/package.json b/package.json index 40643e14b4..509a9972a3 100644 --- a/package.json +++ b/package.json @@ -23,8 +23,9 @@ "scripts": { "rebuild": "pnpm run clean && pnpm run build", "build": "pnpm --recursive --stream run build", - "build:watch": "pnpm run build:protos && tsc --build --watch packages/*/tsconfig.json", + "build:watch": "pnpm run build:protos && pnpm run gen:payload-visitor && tsc --build --watch packages/*/tsconfig.json", "build:protos": "tsx ./packages/proto/scripts/compile-proto.ts", + "gen:payload-visitor": "pnpm --filter @temporalio/proto run gen:payload-visitor", "test": "pnpm --recursive --parallel --workspace-concurrency=1 --stream run test", "test:watch": "pnpm --recursive --stream run test:watch", "ci-stress": "node ./packages/test/lib/load/run-all-stress-ci-scenarios.js", diff --git a/packages/common/src/__tests__/test-payload-visitor.ts b/packages/common/src/__tests__/test-payload-visitor.ts new file mode 100644 index 0000000000..56962f1734 --- /dev/null +++ b/packages/common/src/__tests__/test-payload-visitor.ts @@ -0,0 +1,411 @@ +import test from 'ava'; +import { coresdk } from '@temporalio/proto'; +import type { Payload } from '../interfaces'; +import { + boundTransform, + drain, + visitWorkflowActivation, + visitWorkflowActivationCompletion, + type PayloadTransform, +} from '../internal-non-workflow/payload-visitor'; + +// Lets the event loop run every async step that is currently ready: `setImmediate` fires only after +// the microtask queue drains, so one `await tick()` advances all pending transforms to their next +// `await`. Deterministic alternative to setTimeout. +const tick = (): Promise => new Promise((resolve) => setImmediate(resolve)); + +// A one-shot gate. A transform `await`s `gate.wait()` to park mid-run; the test calls `gate.open()` +// to let every parked transform continue at once. This holds work in flight on demand so a test can +// observe how many transforms ran concurrently, without relying on timing. +const createGate = (): { wait: () => Promise; open: () => void } => { + let open!: () => void; + const opened = new Promise((resolve) => { + open = resolve; + }); + return { wait: () => opened, open }; +}; + +// Never resolves; rejects only when the signal aborts. Lets a transform "run until cancelled" +// without a timer that would hang the test if the abort were not wired up. +const untilAborted = (signal: AbortSignal): Promise => + new Promise((_resolve, reject) => { + signal.addEventListener('abort', () => reject(signal.reason), { once: true }); + }); + +const payload = (data: string): Payload => ({ data: new TextEncoder().encode(data) }); +const read = (p: Payload): string => new TextDecoder().decode(p.data ?? new Uint8Array()); + +test('boundTransform caps the number of in-flight calls', async (t) => { + let active = 0; + let maxActive = 0; + const gate = createGate(); + const bounded = boundTransform( + async (payloads) => { + active += 1; + maxActive = Math.max(maxActive, active); + await gate.wait(); + active -= 1; + return payloads; + }, + 2, + new AbortController() + ); + + const calls = Array.from({ length: 6 }, () => bounded([payload('x')])); + await tick(); // let every call that can acquire a permit start + t.is(maxActive, 2); + + gate.open(); + await drain(calls); + t.is(maxActive, 2, 'never exceeded the limit as the queued calls drained'); +}); + +test('boundTransform with concurrency 1 runs calls one at a time, in launch order', async (t) => { + const order: string[] = []; + let active = 0; + let maxActive = 0; + const bounded = boundTransform( + async (payloads) => { + active += 1; + maxActive = Math.max(maxActive, active); + await tick(); // a broken limit would let a sibling interleave during this yield + order.push(read(payloads[0]!)); + active -= 1; + return payloads; + }, + 1, + new AbortController() + ); + + await drain(['a', 'b', 'c'].map((tag) => bounded([payload(tag)]))); + t.is(maxActive, 1); + t.deepEqual(order, ['a', 'b', 'c']); +}); + +test('drain applies each writeback once its transform resolves', async (t) => { + const message: { result: Payload; arguments: Payload[] } = { + result: payload('r'), + arguments: [payload('x'), payload('y')], + }; + const bounded = boundTransform( + async (payloads) => payloads.map((p) => payload(`${read(p)}!`)), + 4, + new AbortController() + ); + + await drain([ + bounded([message.result]).then(([replacement]) => { + message.result = replacement!; + }), + bounded(message.arguments).then((replacements) => { + message.arguments = replacements; + }), + ]); + + t.is(read(message.result), 'r!'); + t.deepEqual(message.arguments.map(read), ['x!', 'y!']); +}); + +test('first error is surfaced and not-yet-started transforms are skipped', async (t) => { + const started: string[] = []; + const bounded = boundTransform( + async (payloads) => { + const tag = read(payloads[0]!); + started.push(tag); + await tick(); + if (tag === 'boom') throw new Error('boom'); + return payloads; + }, + 1, + new AbortController() + ); + + const error = await t.throwsAsync(drain(['ok', 'boom', 'never'].map((tag) => bounded([payload(tag)])))); + t.is(error?.message, 'boom'); + t.deepEqual(started, ['ok', 'boom'], '"never" is skipped after the failure'); +}); + +test('a failure aborts in-flight siblings instead of waiting them out', async (t) => { + const settled: string[] = []; + const bounded = boundTransform( + async (payloads, signal) => { + const tag = read(payloads[0]!); + if (tag === 'boom') { + await tick(); + throw new Error('boom'); + } + try { + await untilAborted(signal!); + settled.push(tag); + return payloads; + } catch (reason) { + settled.push(`${tag}:aborted`); + throw reason; + } + }, + 3, + new AbortController() + ); + + const error = await t.throwsAsync(drain(['slow1', 'boom', 'slow2'].map((tag) => bounded([payload(tag)])))); + t.is(error?.message, 'boom'); + t.deepEqual(settled.sort(), ['slow1:aborted', 'slow2:aborted']); +}); + +const completionWith = (...results: string[]): coresdk.workflow_completion.IWorkflowActivationCompletion => ({ + successful: { + commands: results.map((r) => ({ completeWorkflowExecution: { result: payload(r) } })), + }, +}); + +test('visitWorkflowActivationCompletion with an already-aborted signal skips the walk', async (t) => { + let calls = 0; + const transform: PayloadTransform = async (payloads) => { + calls += 1; + return payloads; + }; + + const error = await t.throwsAsync( + visitWorkflowActivationCompletion(completionWith('a', 'b'), transform, { + abortSignal: AbortSignal.abort(new Error('stop')), + }) + ); + t.is(error?.message, 'stop'); + t.is(calls, 0); +}); + +// An activation exercising every payload-bearing site shape: repeated lists, singular fields, +// the three map sites (headers / memo / search attributes), a `Payloads`-wrapped field, a oneof, +// and a Failure with a multi-level cause chain. +const activationWithEveryPayloadSite = (): coresdk.workflow_activation.IWorkflowActivation => ({ + jobs: [ + { + initializeWorkflow: { + arguments: [payload('arg0'), payload('arg1')], + headers: { h: payload('hdr') }, + lastCompletionResult: { payloads: [payload('lcr0'), payload('lcr1')] }, + memo: { fields: { m: payload('memo') } }, + searchAttributes: { indexedFields: { s: payload('sa') } }, + continuedFailure: { + encodedAttributes: payload('cf0'), + cause: { encodedAttributes: payload('cf1'), cause: { encodedAttributes: payload('cf2') } }, + }, + }, + }, + { resolveActivity: { result: { completed: { result: payload('act') } } } }, + { + resolveActivity: { + result: { + failed: { + failure: { + encodedAttributes: payload('fenc'), + applicationFailureInfo: { details: { payloads: [payload('afi0'), payload('afi1')] } }, + }, + }, + }, + }, + }, + { signalWorkflow: { input: [payload('sig')], headers: { sh: payload('shdr') } } }, + ], +}); + +const EVERY_SITE_PAYLOAD = [ + 'arg0', + 'arg1', + 'hdr', + 'lcr0', + 'lcr1', + 'memo', + 'sa', + 'cf0', + 'cf1', + 'cf2', + 'act', + 'fenc', + 'afi0', + 'afi1', + 'sig', + 'shdr', +]; + +test('visits every payload site exactly once and writes back in place', async (t) => { + const activation = activationWithEveryPayloadSite(); + + const seen: string[] = []; + await visitWorkflowActivation( + activation, + async (payloads) => { + payloads.forEach((p) => seen.push(read(p))); + return payloads.map((p) => payload(`${read(p)}#`)); + }, + { concurrency: 3 } + ); + t.deepEqual(seen.slice().sort(), EVERY_SITE_PAYLOAD.slice().sort(), 'sees every site exactly once'); + + // A second walk should observe the first walk's rewrites, proving writeback landed in place + // at every site (including deep in the cause chain and inside maps). + const seenAgain: string[] = []; + await visitWorkflowActivation(activation, async (payloads) => { + payloads.forEach((p) => seenAgain.push(read(p))); + return payloads; + }); + t.deepEqual( + seenAgain.slice().sort(), + EVERY_SITE_PAYLOAD.map((s) => `${s}#`).sort(), + 'in-place rewrites from the first walk are visible to the second' + ); +}); + +test('an activation with no payloads makes no transform calls and resolves', async (t) => { + let calls = 0; + const transform: PayloadTransform = async (payloads) => { + calls += 1; + return payloads; + }; + + await visitWorkflowActivation({ jobs: [] }, transform); + await visitWorkflowActivation({}, transform); // no jobs field at all + await visitWorkflowActivation( + { + // Empty repeated / empty map / a non-payload job variant — none should call the transform. + jobs: [{ initializeWorkflow: { arguments: [], headers: {} } }, { fireTimer: {} }], + }, + transform + ); + + t.is(calls, 0); +}); + +test('a transform that breaks singular cardinality fails loudly', async (t) => { + const singular = (): coresdk.workflow_activation.IWorkflowActivation => ({ + jobs: [{ resolveActivity: { result: { completed: { result: payload('x') } } } }], + }); + + await t.throwsAsync( + visitWorkflowActivation(singular(), async () => []), + { message: /expected 1/ }, + 'returning zero payloads for a singular site throws' + ); + await t.throwsAsync( + visitWorkflowActivation(singular(), async (payloads) => [...payloads, payload('extra')]), + { message: /expected 1/ }, + 'returning two payloads for a singular site throws' + ); +}); + +test('a transform that changes a repeated field length fails loudly', async (t) => { + const withArgs = (): coresdk.workflow_activation.IWorkflowActivation => ({ + jobs: [{ signalWorkflow: { input: [payload('a'), payload('b')] } }], + }); + + await t.throwsAsync( + visitWorkflowActivation(withArgs(), async (payloads) => payloads.slice(1)), + { message: /expected 2/ }, + 'dropping a payload from a repeated site throws' + ); + await t.throwsAsync( + visitWorkflowActivation(withArgs(), async (payloads) => [...payloads, payload('c')]), + { message: /expected 2/ }, + 'adding a payload to a repeated site throws' + ); +}); + +test('visits completion command payloads: user metadata and a rejected update', async (t) => { + const completion: coresdk.workflow_completion.IWorkflowActivationCompletion = { + successful: { + commands: [ + { userMetadata: { summary: payload('summary'), details: payload('details') } }, + { updateResponse: { rejected: { encodedAttributes: payload('rejection') } } }, + ], + }, + }; + + const seen: string[] = []; + await visitWorkflowActivationCompletion(completion, async (payloads) => { + payloads.forEach((p) => seen.push(read(p))); + return payloads.map((p) => payload(`${read(p)}!`)); + }); + + t.deepEqual(seen.slice().sort(), ['details', 'rejection', 'summary']); + const commands = completion.successful!.commands!; + t.is(read(commands[0]!.userMetadata!.summary!), 'summary!'); + t.is(read(commands[0]!.userMetadata!.details!), 'details!'); + t.is(read(commands[1]!.updateResponse!.rejected!.encodedAttributes!), 'rejection!'); +}); + +test('walks a decoded protobuf message instance, not just object literals', async (t) => { + const Type = coresdk.workflow_completion.WorkflowActivationCompletion; + const decoded = Type.decode(Type.encode(completionWith('a', 'b')).finish()); + + await visitWorkflowActivationCompletion(decoded, async (payloads) => payloads.map((p) => payload(`${read(p)}!`))); + + const results = decoded.successful!.commands!.map((c) => read(c.completeWorkflowExecution!.result!)); + t.deepEqual(results, ['a!', 'b!']); +}); + +test('an identity transform leaves a real message byte-for-byte unchanged', async (t) => { + const Type = coresdk.workflow_activation.WorkflowActivation; + const original = Type.encode(activationWithEveryPayloadSite()).finish(); + const decoded = Type.decode(original); + + await visitWorkflowActivation(decoded, async (payloads) => payloads); + + t.deepEqual(Buffer.from(Type.encode(decoded).finish()), Buffer.from(original)); +}); + +test('concurrency caps in-flight transforms across the whole activation', async (t) => { + const tracked = () => { + let active = 0; + let max = 0; + const gate = createGate(); + const transform: PayloadTransform = async (payloads) => { + active += 1; + max = Math.max(max, active); + await gate.wait(); + active -= 1; + return payloads; + }; + return { transform, open: gate.open, max: () => max }; + }; + + const capped = tracked(); + const cappedVisit = visitWorkflowActivation(activationWithEveryPayloadSite(), capped.transform, { concurrency: 4 }); + await tick(); + t.is(capped.max(), 4); + capped.open(); + await cappedVisit; + + const sequential = tracked(); + const sequentialVisit = visitWorkflowActivation(activationWithEveryPayloadSite(), sequential.transform, { + concurrency: 1, + }); + await tick(); + t.is(sequential.max(), 1); + sequential.open(); + await sequentialVisit; +}); + +test('aborting mid-walk rejects and cancels in-flight transforms', async (t) => { + const controller = new AbortController(); + let aborted = 0; + const transform: PayloadTransform = async (payloads, signal) => { + try { + await untilAborted(signal!); + return payloads; + } catch (reason) { + aborted += 1; + throw reason; + } + }; + + const visiting = visitWorkflowActivation(activationWithEveryPayloadSite(), transform, { + concurrency: 16, + abortSignal: controller.signal, + }); + await tick(); + controller.abort(new Error('shutdown')); + + const error = await t.throwsAsync(visiting); + t.is(error?.message, 'shutdown'); + t.true(aborted > 0, 'in-flight transforms received the forwarded abort signal'); +}); diff --git a/packages/common/src/internal-non-workflow/payload-visitor.ts b/packages/common/src/internal-non-workflow/payload-visitor.ts new file mode 100644 index 0000000000..78ca1468c6 --- /dev/null +++ b/packages/common/src/internal-non-workflow/payload-visitor.ts @@ -0,0 +1,167 @@ +import type { coresdk } from '@temporalio/proto'; +import { + walkWorkflowActivation, + walkWorkflowActivationCompletion, +} from '@temporalio/proto/lib/payload-visitor.generated'; +import type { Payload } from '../interfaces'; + +/** + * Receives the payloads found at one payload-bearing field of a proto message and returns + * their replacements. The input array MAY have length 0, and the returned array MUST have the + * same length. + * + * `abortSignal` fires when the walk is aborted or when a sibling site's transform rejects. + * + * @internal + * @experimental + */ +export type PayloadTransform = (payloads: Payload[], abortSignal?: AbortSignal) => Promise; + +/** + * @internal + * @experimental + */ +export interface VisitOptions { + /** + * Maximum number of {@link PayloadTransform} calls in flight at once, across all sites in + * the message. Defaults to 1, in which case sites are visited sequentially. + */ + concurrency?: number; + + /** + * Aborts the walk. Already-running transforms are awaited (never left running in the + * background) and not-yet-started ones are skipped. + */ + abortSignal?: AbortSignal; +} + +/** + * A counting semaphore. `acquire` resolves once a permit is available; `release` returns one, + * handing it directly to the longest-waiting acquirer if any. + */ +class Semaphore { + private permits: number; + private readonly waiters: Array<() => void> = []; + + constructor(permits: number) { + this.permits = permits; + } + + async acquire(): Promise { + if (this.permits > 0) { + this.permits -= 1; + return; + } + return new Promise((resolve) => { + this.waiters.push(resolve); + }); + } + + release(): void { + const waiter = this.waiters.shift(); + if (waiter !== undefined) { + waiter(); + } else { + this.permits += 1; + } + } +} + +/** + * Wraps a {@link PayloadTransform} so that at most `concurrency` transformations run at once. + * The first rejection aborts the rest via `failure` AbortController and subsequent calls are + * skipped. + * + * @internal + * @experimental + */ +export function boundTransform( + transform: PayloadTransform, + concurrency: number, + failure: AbortController +): PayloadTransform { + const semaphore = new Semaphore(Math.max(1, concurrency)); + + return async (payloads) => { + failure.signal.throwIfAborted(); + await semaphore.acquire(); + try { + failure.signal.throwIfAborted(); + return await transform(payloads, failure.signal); + } catch (reason) { + failure.abort(reason); + throw reason; + } finally { + semaphore.release(); + } + }; +} + +/** + * Awaits every promise a walk produced, then throws the first rejection in traversal order. + * `allSettled` guarantees no in-flight transform is left running on the error path. + * + * @internal + * @experimental + */ +export async function drain(pending: Promise[]): Promise { + const results = await Promise.allSettled(pending); + for (const result of results) { + if (result.status === 'rejected') { + throw result.reason; + } + } +} + +async function runVisit( + walk: (root: Root, transform: PayloadTransform) => Promise[], + root: Root, + transform: PayloadTransform, + { concurrency = 1, abortSignal }: VisitOptions = {} +): Promise { + const abortController = new AbortController(); + let removeListener: (() => void) | undefined; + if (abortSignal) { + if (abortSignal.aborted) { + abortController.abort(abortSignal.reason); + } else { + const onAbort = () => abortController.abort(abortSignal.reason); + abortSignal.addEventListener('abort', onAbort, { once: true }); + removeListener = () => abortSignal.removeEventListener('abort', onAbort); + } + } + try { + await drain(walk(root, boundTransform(transform, concurrency, abortController))); + } finally { + removeListener?.(); + } +} + +/** + * Applies `transform` to every {@link Payload} in a `WorkflowActivation`, mutating it in place. + * + * @internal + * @experimental + */ +export function visitWorkflowActivation( + root: coresdk.workflow_activation.IWorkflowActivation, + transform: PayloadTransform, + options?: VisitOptions +): Promise { + return runVisit(walkWorkflowActivation, root, transform, options); +} + +/** + * Applies `transform` to every {@link Payload} in a `WorkflowActivationCompletion`, mutating it + * in place. + * + * @internal + * @experimental + */ +export function visitWorkflowActivationCompletion( + root: coresdk.workflow_completion.IWorkflowActivationCompletion, + transform: PayloadTransform, + options?: VisitOptions +): Promise { + return runVisit(walkWorkflowActivationCompletion, root, transform, options); +} diff --git a/packages/proto/package.json b/packages/proto/package.json index 5cfd2a95c9..8d50900688 100644 --- a/packages/proto/package.json +++ b/packages/proto/package.json @@ -10,9 +10,10 @@ "lib/" ], "scripts": { - "build": "pnpm run build:protos && pnpm run build:ts", + "build": "pnpm run build:protos && pnpm run gen:payload-visitor && pnpm run build:ts", "build:ts": "tsc --build", - "build:protos": "tsx ./scripts/compile-proto.ts" + "build:protos": "tsx ./scripts/compile-proto.ts", + "gen:payload-visitor": "tsx ./scripts/gen-payload-visitor.ts" }, "keywords": [ "temporal", diff --git a/packages/proto/scripts/gen-payload-visitor.ts b/packages/proto/scripts/gen-payload-visitor.ts new file mode 100644 index 0000000000..a81d5c6c48 --- /dev/null +++ b/packages/proto/scripts/gen-payload-visitor.ts @@ -0,0 +1,201 @@ +import { resolve } from 'node:path'; +import { writeFileSync } from 'node:fs'; +import * as protobuf from 'protobufjs'; +import * as prettier from 'prettier'; + +// Generates `@temporalio/common`'s `internal-non-workflow/payload-visitor.generated.ts`: a +// synchronous walk of the WorkflowActivation / WorkflowActivationCompletion message trees that +// calls a PayloadTransform at every Payload-bearing field. +// Run explicitly via `pnpm gen:payload-visitor`. + +const PAYLOAD = 'temporal.api.common.v1.Payload'; +const ANY = 'google.protobuf.Any'; + +const ROOTS = [ + { type: 'coresdk.workflow_activation.WorkflowActivation', entry: 'walkWorkflowActivation' }, + { type: 'coresdk.workflow_completion.WorkflowActivationCompletion', entry: 'walkWorkflowActivationCompletion' }, +] as const; + +const jsonModule = require(resolve(__dirname, '../protos/json-module.js')); +const root = protobuf.Root.fromJSON(jsonModule); +root.resolveAll(); + +const fqn = (type: protobuf.Type): string => type.fullName.replace(/^\./, ''); +const fnName = (type: protobuf.Type): string => `walk_${fqn(type).replace(/\./g, '_')}`; +// The top-level proto namespace a type lives under, e.g. `coresdk` for `coresdk.workflow_activation.X`. +const topLevelNamespace = (type: protobuf.Type): string => fqn(type).split('.')[0]!; + +// The message a field points to, or undefined when the field is a scalar or enum (the only +// fields that can hold a Payload, directly or transitively, are message-typed). +const resolvedMessage = (field: protobuf.Field): protobuf.Type | undefined => + field.resolvedType instanceof protobuf.Type ? field.resolvedType : undefined; + +const byFqn = (a: protobuf.Type, b: protobuf.Type): number => fqn(a).localeCompare(fqn(b)); + +/** TS type reference for a message, e.g. `temporal.api.failure.v1.IFailure`. */ +function tsType(type: protobuf.Type): string { + const segments = fqn(type).split('.'); + const last = segments.length - 1; + segments[last] = `I${segments[last]}`; + return segments.join('.'); +} + +// All message types reachable from the roots, excluding the terminal Payload and the opaque Any. +const reachableTypes = (): Map => { + const types = new Map(); + const discover = (type: protobuf.Type): void => { + const key = fqn(type); + if (key === PAYLOAD || key === ANY || types.has(key)) return; + types.set(key, type); + for (const field of type.fieldsArray) { + const message = resolvedMessage(field); + if (message) discover(message); + } + }; + for (const { type } of ROOTS) discover(root.lookupType(type)); + return types; +}; + +const types = reachableTypes(); + +// Which of those types can contain a Payload, directly or nested inside another message? We work +// backwards from Payload. First index the reverse references: `referrers.get(X)` is every type +// that has a field of type X. +const referrers = new Map(); +for (const type of types.values()) { + for (const field of type.fieldsArray) { + const target = resolvedMessage(field); + if (!target) continue; + const list = referrers.get(fqn(target)) ?? []; + list.push(fqn(type)); + referrers.set(fqn(target), list); + } +} + +// Then, starting from Payload, mark every type that can reach it. +const reachesPayload = new Set(); +const toVisit = [PAYLOAD]; +while (toVisit.length > 0) { + const target = toVisit.pop()!; + for (const referrer of referrers.get(target) ?? []) { + if (reachesPayload.has(referrer)) continue; + reachesPayload.add(referrer); + toVisit.push(referrer); + } +} + +// A message contains a Payload if it is one, or if it was marked above. +const hasPayload = (type: protobuf.Type): boolean => fqn(type) === PAYLOAD || reachesPayload.has(fqn(type)); + +/** Every reachable message type that needs a walker (i.e. can contain a Payload), sorted by name. */ +const collectWalkers = (): protobuf.Type[] => [...types.values()].filter(hasPayload).sort(byFqn); + +/** Lines of the walker body for one field, or [] if the field reaches no payload. */ +function emitField(field: protobuf.Field): string[] { + const message = resolvedMessage(field); + if (!message || !hasPayload(message)) return []; + + const access = `o.${field.name}`; + const isPayload = fqn(message) === PAYLOAD; + + if (isPayload) { + if (field.map) { + return [ + `const m = ${access};`, + `if (m) for (const [k, v] of Object.entries(m)) pending.push(visit([v]).then((r) => { m[k] = one(r); }));`, + ]; + } + if (field.repeated) { + return [ + `const a = ${access};`, + `if (a && a.length) pending.push(visit(a).then((r) => { ${access} = many(r, a.length); }));`, + ]; + } + return [`const p = ${access};`, `if (p != null) pending.push(visit([p]).then((r) => { ${access} = one(r); }));`]; + } + + const walk = fnName(message); + if (field.map) { + return [`const m = ${access};`, `if (m) for (const v of Object.values(m)) ${walk}(v, visit, pending);`]; + } + if (field.repeated) { + return [`const a = ${access};`, `if (a) for (const v of a) ${walk}(v, visit, pending);`]; + } + return [`const c = ${access};`, `if (c != null) ${walk}(c, visit, pending);`]; +} + +function emitWalker(type: protobuf.Type): string { + // Each field's statements get their own `{ }` block so the emitted per-field locals don't + // collide. Indentation is left to prettier, which formats the generated file afterward. + const blocks = type.fieldsArray + .map(emitField) + .filter((lines) => lines.length > 0) + .map((lines) => `{\n${lines.join('\n')}\n}`); + return [ + `function ${fnName(type)}(o: ${tsType(type)}, visit: Visit, pending: Promise[]): void {`, + ...blocks, + `}`, + ].join('\n'); +} + +// Prepended to the generated file. `one`/`many` enforce that a transform returns exactly as +// many payloads as it received, failing loudly instead of silently corrupting the message. +const RUNTIME_HELPERS = ` +function one(payloads: Payload[]): Payload { + if (payloads.length !== 1) { + throw new Error(\`payload visitor: a singular field transform returned \${payloads.length} payloads, expected 1\`); + } + return payloads[0]!; +} + +function many(payloads: Payload[], expected: number): Payload[] { + if (payloads.length !== expected) { + throw new Error(\`payload visitor: a repeated field transform returned \${payloads.length} payloads, expected \${expected}\`); + } + return payloads; +}`; + +function emit(): string { + const walkers = collectWalkers(); + const namespaces = [...new Set(walkers.map(topLevelNamespace))].sort(); + + const entries = ROOTS.map(({ type, entry }) => { + const t = root.lookupType(type); + return [ + `export function ${entry}(root: ${tsType(t)}, visit: Visit): Promise[] {`, + ` const pending: Promise[] = [];`, + ` ${fnName(t)}(root, visit, pending);`, + ` return pending;`, + `}`, + ].join('\n'); + }); + + return [ + `// Code generated by packages/proto/scripts/gen-payload-visitor.ts. DO NOT EDIT.`, + ``, + `import type { ${namespaces.join(', ')} } from '../protos/root';`, + ``, + `type Payload = temporal.api.common.v1.IPayload;`, + `type Visit = (payloads: Payload[], abortSignal?: AbortSignal) => Promise;`, + RUNTIME_HELPERS, + ``, + entries.join('\n\n'), + ``, + walkers.map(emitWalker).join('\n\n'), + ``, + ].join('\n'); +} + +const outPath = resolve(__dirname, '../src/payload-visitor.generated.ts'); + +async function main(): Promise { + const config = await prettier.resolveConfig(outPath); + const formatted = await prettier.format(emit(), { ...config, parser: 'typescript' }); + writeFileSync(outPath, formatted); + console.log(`Wrote ${outPath}`); +} + +main().catch((err) => { + console.error(err); + process.exit(1); +}); diff --git a/packages/proto/src/payload-visitor.generated.ts b/packages/proto/src/payload-visitor.generated.ts new file mode 100644 index 0000000000..ebc82a08ab --- /dev/null +++ b/packages/proto/src/payload-visitor.generated.ts @@ -0,0 +1,977 @@ +// Code generated by packages/proto/scripts/gen-payload-visitor.ts. DO NOT EDIT. + +import type { coresdk, temporal } from '../protos/root'; + +type Payload = temporal.api.common.v1.IPayload; +type Visit = (payloads: Payload[], abortSignal?: AbortSignal) => Promise; + +function one(payloads: Payload[]): Payload { + if (payloads.length !== 1) { + throw new Error(`payload visitor: a singular field transform returned ${payloads.length} payloads, expected 1`); + } + return payloads[0]!; +} + +function many(payloads: Payload[], expected: number): Payload[] { + if (payloads.length !== expected) { + throw new Error( + `payload visitor: a repeated field transform returned ${payloads.length} payloads, expected ${expected}` + ); + } + return payloads; +} + +export function walkWorkflowActivation( + root: coresdk.workflow_activation.IWorkflowActivation, + visit: Visit +): Promise[] { + const pending: Promise[] = []; + walk_coresdk_workflow_activation_WorkflowActivation(root, visit, pending); + return pending; +} + +export function walkWorkflowActivationCompletion( + root: coresdk.workflow_completion.IWorkflowActivationCompletion, + visit: Visit +): Promise[] { + const pending: Promise[] = []; + walk_coresdk_workflow_completion_WorkflowActivationCompletion(root, visit, pending); + return pending; +} + +function walk_coresdk_activity_result_ActivityResolution( + o: coresdk.activity_result.IActivityResolution, + visit: Visit, + pending: Promise[] +): void { + { + const c = o.completed; + if (c != null) walk_coresdk_activity_result_Success(c, visit, pending); + } + { + const c = o.failed; + if (c != null) walk_coresdk_activity_result_Failure(c, visit, pending); + } + { + const c = o.cancelled; + if (c != null) walk_coresdk_activity_result_Cancellation(c, visit, pending); + } +} + +function walk_coresdk_activity_result_Cancellation( + o: coresdk.activity_result.ICancellation, + visit: Visit, + pending: Promise[] +): void { + { + const c = o.failure; + if (c != null) walk_temporal_api_failure_v1_Failure(c, visit, pending); + } +} + +function walk_coresdk_activity_result_Failure( + o: coresdk.activity_result.IFailure, + visit: Visit, + pending: Promise[] +): void { + { + const c = o.failure; + if (c != null) walk_temporal_api_failure_v1_Failure(c, visit, pending); + } +} + +function walk_coresdk_activity_result_Success( + o: coresdk.activity_result.ISuccess, + visit: Visit, + pending: Promise[] +): void { + { + const p = o.result; + if (p != null) + pending.push( + visit([p]).then((r) => { + o.result = one(r); + }) + ); + } +} + +function walk_coresdk_child_workflow_Cancellation( + o: coresdk.child_workflow.ICancellation, + visit: Visit, + pending: Promise[] +): void { + { + const c = o.failure; + if (c != null) walk_temporal_api_failure_v1_Failure(c, visit, pending); + } +} + +function walk_coresdk_child_workflow_ChildWorkflowResult( + o: coresdk.child_workflow.IChildWorkflowResult, + visit: Visit, + pending: Promise[] +): void { + { + const c = o.completed; + if (c != null) walk_coresdk_child_workflow_Success(c, visit, pending); + } + { + const c = o.failed; + if (c != null) walk_coresdk_child_workflow_Failure(c, visit, pending); + } + { + const c = o.cancelled; + if (c != null) walk_coresdk_child_workflow_Cancellation(c, visit, pending); + } +} + +function walk_coresdk_child_workflow_Failure( + o: coresdk.child_workflow.IFailure, + visit: Visit, + pending: Promise[] +): void { + { + const c = o.failure; + if (c != null) walk_temporal_api_failure_v1_Failure(c, visit, pending); + } +} + +function walk_coresdk_child_workflow_Success( + o: coresdk.child_workflow.ISuccess, + visit: Visit, + pending: Promise[] +): void { + { + const p = o.result; + if (p != null) + pending.push( + visit([p]).then((r) => { + o.result = one(r); + }) + ); + } +} + +function walk_coresdk_nexus_NexusOperationResult( + o: coresdk.nexus.INexusOperationResult, + visit: Visit, + pending: Promise[] +): void { + { + const p = o.completed; + if (p != null) + pending.push( + visit([p]).then((r) => { + o.completed = one(r); + }) + ); + } + { + const c = o.failed; + if (c != null) walk_temporal_api_failure_v1_Failure(c, visit, pending); + } + { + const c = o.cancelled; + if (c != null) walk_temporal_api_failure_v1_Failure(c, visit, pending); + } + { + const c = o.timedOut; + if (c != null) walk_temporal_api_failure_v1_Failure(c, visit, pending); + } +} + +function walk_coresdk_workflow_activation_DoUpdate( + o: coresdk.workflow_activation.IDoUpdate, + visit: Visit, + pending: Promise[] +): void { + { + const a = o.input; + if (a && a.length) + pending.push( + visit(a).then((r) => { + o.input = many(r, a.length); + }) + ); + } + { + const m = o.headers; + if (m) + for (const [k, v] of Object.entries(m)) + pending.push( + visit([v]).then((r) => { + m[k] = one(r); + }) + ); + } +} + +function walk_coresdk_workflow_activation_InitializeWorkflow( + o: coresdk.workflow_activation.IInitializeWorkflow, + visit: Visit, + pending: Promise[] +): void { + { + const a = o.arguments; + if (a && a.length) + pending.push( + visit(a).then((r) => { + o.arguments = many(r, a.length); + }) + ); + } + { + const m = o.headers; + if (m) + for (const [k, v] of Object.entries(m)) + pending.push( + visit([v]).then((r) => { + m[k] = one(r); + }) + ); + } + { + const c = o.continuedFailure; + if (c != null) walk_temporal_api_failure_v1_Failure(c, visit, pending); + } + { + const c = o.lastCompletionResult; + if (c != null) walk_temporal_api_common_v1_Payloads(c, visit, pending); + } + { + const c = o.memo; + if (c != null) walk_temporal_api_common_v1_Memo(c, visit, pending); + } + { + const c = o.searchAttributes; + if (c != null) walk_temporal_api_common_v1_SearchAttributes(c, visit, pending); + } +} + +function walk_coresdk_workflow_activation_QueryWorkflow( + o: coresdk.workflow_activation.IQueryWorkflow, + visit: Visit, + pending: Promise[] +): void { + { + const a = o.arguments; + if (a && a.length) + pending.push( + visit(a).then((r) => { + o.arguments = many(r, a.length); + }) + ); + } + { + const m = o.headers; + if (m) + for (const [k, v] of Object.entries(m)) + pending.push( + visit([v]).then((r) => { + m[k] = one(r); + }) + ); + } +} + +function walk_coresdk_workflow_activation_ResolveActivity( + o: coresdk.workflow_activation.IResolveActivity, + visit: Visit, + pending: Promise[] +): void { + { + const c = o.result; + if (c != null) walk_coresdk_activity_result_ActivityResolution(c, visit, pending); + } +} + +function walk_coresdk_workflow_activation_ResolveChildWorkflowExecution( + o: coresdk.workflow_activation.IResolveChildWorkflowExecution, + visit: Visit, + pending: Promise[] +): void { + { + const c = o.result; + if (c != null) walk_coresdk_child_workflow_ChildWorkflowResult(c, visit, pending); + } +} + +function walk_coresdk_workflow_activation_ResolveChildWorkflowExecutionStart( + o: coresdk.workflow_activation.IResolveChildWorkflowExecutionStart, + visit: Visit, + pending: Promise[] +): void { + { + const c = o.cancelled; + if (c != null) walk_coresdk_workflow_activation_ResolveChildWorkflowExecutionStartCancelled(c, visit, pending); + } +} + +function walk_coresdk_workflow_activation_ResolveChildWorkflowExecutionStartCancelled( + o: coresdk.workflow_activation.IResolveChildWorkflowExecutionStartCancelled, + visit: Visit, + pending: Promise[] +): void { + { + const c = o.failure; + if (c != null) walk_temporal_api_failure_v1_Failure(c, visit, pending); + } +} + +function walk_coresdk_workflow_activation_ResolveNexusOperation( + o: coresdk.workflow_activation.IResolveNexusOperation, + visit: Visit, + pending: Promise[] +): void { + { + const c = o.result; + if (c != null) walk_coresdk_nexus_NexusOperationResult(c, visit, pending); + } +} + +function walk_coresdk_workflow_activation_ResolveNexusOperationStart( + o: coresdk.workflow_activation.IResolveNexusOperationStart, + visit: Visit, + pending: Promise[] +): void { + { + const c = o.failed; + if (c != null) walk_temporal_api_failure_v1_Failure(c, visit, pending); + } +} + +function walk_coresdk_workflow_activation_ResolveRequestCancelExternalWorkflow( + o: coresdk.workflow_activation.IResolveRequestCancelExternalWorkflow, + visit: Visit, + pending: Promise[] +): void { + { + const c = o.failure; + if (c != null) walk_temporal_api_failure_v1_Failure(c, visit, pending); + } +} + +function walk_coresdk_workflow_activation_ResolveSignalExternalWorkflow( + o: coresdk.workflow_activation.IResolveSignalExternalWorkflow, + visit: Visit, + pending: Promise[] +): void { + { + const c = o.failure; + if (c != null) walk_temporal_api_failure_v1_Failure(c, visit, pending); + } +} + +function walk_coresdk_workflow_activation_SignalWorkflow( + o: coresdk.workflow_activation.ISignalWorkflow, + visit: Visit, + pending: Promise[] +): void { + { + const a = o.input; + if (a && a.length) + pending.push( + visit(a).then((r) => { + o.input = many(r, a.length); + }) + ); + } + { + const m = o.headers; + if (m) + for (const [k, v] of Object.entries(m)) + pending.push( + visit([v]).then((r) => { + m[k] = one(r); + }) + ); + } +} + +function walk_coresdk_workflow_activation_WorkflowActivation( + o: coresdk.workflow_activation.IWorkflowActivation, + visit: Visit, + pending: Promise[] +): void { + { + const a = o.jobs; + if (a) for (const v of a) walk_coresdk_workflow_activation_WorkflowActivationJob(v, visit, pending); + } +} + +function walk_coresdk_workflow_activation_WorkflowActivationJob( + o: coresdk.workflow_activation.IWorkflowActivationJob, + visit: Visit, + pending: Promise[] +): void { + { + const c = o.initializeWorkflow; + if (c != null) walk_coresdk_workflow_activation_InitializeWorkflow(c, visit, pending); + } + { + const c = o.queryWorkflow; + if (c != null) walk_coresdk_workflow_activation_QueryWorkflow(c, visit, pending); + } + { + const c = o.signalWorkflow; + if (c != null) walk_coresdk_workflow_activation_SignalWorkflow(c, visit, pending); + } + { + const c = o.resolveActivity; + if (c != null) walk_coresdk_workflow_activation_ResolveActivity(c, visit, pending); + } + { + const c = o.resolveChildWorkflowExecutionStart; + if (c != null) walk_coresdk_workflow_activation_ResolveChildWorkflowExecutionStart(c, visit, pending); + } + { + const c = o.resolveChildWorkflowExecution; + if (c != null) walk_coresdk_workflow_activation_ResolveChildWorkflowExecution(c, visit, pending); + } + { + const c = o.resolveSignalExternalWorkflow; + if (c != null) walk_coresdk_workflow_activation_ResolveSignalExternalWorkflow(c, visit, pending); + } + { + const c = o.resolveRequestCancelExternalWorkflow; + if (c != null) walk_coresdk_workflow_activation_ResolveRequestCancelExternalWorkflow(c, visit, pending); + } + { + const c = o.doUpdate; + if (c != null) walk_coresdk_workflow_activation_DoUpdate(c, visit, pending); + } + { + const c = o.resolveNexusOperationStart; + if (c != null) walk_coresdk_workflow_activation_ResolveNexusOperationStart(c, visit, pending); + } + { + const c = o.resolveNexusOperation; + if (c != null) walk_coresdk_workflow_activation_ResolveNexusOperation(c, visit, pending); + } +} + +function walk_coresdk_workflow_commands_CompleteWorkflowExecution( + o: coresdk.workflow_commands.ICompleteWorkflowExecution, + visit: Visit, + pending: Promise[] +): void { + { + const p = o.result; + if (p != null) + pending.push( + visit([p]).then((r) => { + o.result = one(r); + }) + ); + } +} + +function walk_coresdk_workflow_commands_ContinueAsNewWorkflowExecution( + o: coresdk.workflow_commands.IContinueAsNewWorkflowExecution, + visit: Visit, + pending: Promise[] +): void { + { + const a = o.arguments; + if (a && a.length) + pending.push( + visit(a).then((r) => { + o.arguments = many(r, a.length); + }) + ); + } + { + const m = o.memo; + if (m) + for (const [k, v] of Object.entries(m)) + pending.push( + visit([v]).then((r) => { + m[k] = one(r); + }) + ); + } + { + const m = o.headers; + if (m) + for (const [k, v] of Object.entries(m)) + pending.push( + visit([v]).then((r) => { + m[k] = one(r); + }) + ); + } + { + const c = o.searchAttributes; + if (c != null) walk_temporal_api_common_v1_SearchAttributes(c, visit, pending); + } +} + +function walk_coresdk_workflow_commands_FailWorkflowExecution( + o: coresdk.workflow_commands.IFailWorkflowExecution, + visit: Visit, + pending: Promise[] +): void { + { + const c = o.failure; + if (c != null) walk_temporal_api_failure_v1_Failure(c, visit, pending); + } +} + +function walk_coresdk_workflow_commands_ModifyWorkflowProperties( + o: coresdk.workflow_commands.IModifyWorkflowProperties, + visit: Visit, + pending: Promise[] +): void { + { + const c = o.upsertedMemo; + if (c != null) walk_temporal_api_common_v1_Memo(c, visit, pending); + } +} + +function walk_coresdk_workflow_commands_QueryResult( + o: coresdk.workflow_commands.IQueryResult, + visit: Visit, + pending: Promise[] +): void { + { + const c = o.succeeded; + if (c != null) walk_coresdk_workflow_commands_QuerySuccess(c, visit, pending); + } + { + const c = o.failed; + if (c != null) walk_temporal_api_failure_v1_Failure(c, visit, pending); + } +} + +function walk_coresdk_workflow_commands_QuerySuccess( + o: coresdk.workflow_commands.IQuerySuccess, + visit: Visit, + pending: Promise[] +): void { + { + const p = o.response; + if (p != null) + pending.push( + visit([p]).then((r) => { + o.response = one(r); + }) + ); + } +} + +function walk_coresdk_workflow_commands_ScheduleActivity( + o: coresdk.workflow_commands.IScheduleActivity, + visit: Visit, + pending: Promise[] +): void { + { + const m = o.headers; + if (m) + for (const [k, v] of Object.entries(m)) + pending.push( + visit([v]).then((r) => { + m[k] = one(r); + }) + ); + } + { + const a = o.arguments; + if (a && a.length) + pending.push( + visit(a).then((r) => { + o.arguments = many(r, a.length); + }) + ); + } +} + +function walk_coresdk_workflow_commands_ScheduleLocalActivity( + o: coresdk.workflow_commands.IScheduleLocalActivity, + visit: Visit, + pending: Promise[] +): void { + { + const m = o.headers; + if (m) + for (const [k, v] of Object.entries(m)) + pending.push( + visit([v]).then((r) => { + m[k] = one(r); + }) + ); + } + { + const a = o.arguments; + if (a && a.length) + pending.push( + visit(a).then((r) => { + o.arguments = many(r, a.length); + }) + ); + } +} + +function walk_coresdk_workflow_commands_ScheduleNexusOperation( + o: coresdk.workflow_commands.IScheduleNexusOperation, + visit: Visit, + pending: Promise[] +): void { + { + const p = o.input; + if (p != null) + pending.push( + visit([p]).then((r) => { + o.input = one(r); + }) + ); + } +} + +function walk_coresdk_workflow_commands_SignalExternalWorkflowExecution( + o: coresdk.workflow_commands.ISignalExternalWorkflowExecution, + visit: Visit, + pending: Promise[] +): void { + { + const a = o.args; + if (a && a.length) + pending.push( + visit(a).then((r) => { + o.args = many(r, a.length); + }) + ); + } + { + const m = o.headers; + if (m) + for (const [k, v] of Object.entries(m)) + pending.push( + visit([v]).then((r) => { + m[k] = one(r); + }) + ); + } +} + +function walk_coresdk_workflow_commands_StartChildWorkflowExecution( + o: coresdk.workflow_commands.IStartChildWorkflowExecution, + visit: Visit, + pending: Promise[] +): void { + { + const a = o.input; + if (a && a.length) + pending.push( + visit(a).then((r) => { + o.input = many(r, a.length); + }) + ); + } + { + const m = o.headers; + if (m) + for (const [k, v] of Object.entries(m)) + pending.push( + visit([v]).then((r) => { + m[k] = one(r); + }) + ); + } + { + const m = o.memo; + if (m) + for (const [k, v] of Object.entries(m)) + pending.push( + visit([v]).then((r) => { + m[k] = one(r); + }) + ); + } + { + const c = o.searchAttributes; + if (c != null) walk_temporal_api_common_v1_SearchAttributes(c, visit, pending); + } +} + +function walk_coresdk_workflow_commands_UpdateResponse( + o: coresdk.workflow_commands.IUpdateResponse, + visit: Visit, + pending: Promise[] +): void { + { + const c = o.rejected; + if (c != null) walk_temporal_api_failure_v1_Failure(c, visit, pending); + } + { + const p = o.completed; + if (p != null) + pending.push( + visit([p]).then((r) => { + o.completed = one(r); + }) + ); + } +} + +function walk_coresdk_workflow_commands_UpsertWorkflowSearchAttributes( + o: coresdk.workflow_commands.IUpsertWorkflowSearchAttributes, + visit: Visit, + pending: Promise[] +): void { + { + const c = o.searchAttributes; + if (c != null) walk_temporal_api_common_v1_SearchAttributes(c, visit, pending); + } +} + +function walk_coresdk_workflow_commands_WorkflowCommand( + o: coresdk.workflow_commands.IWorkflowCommand, + visit: Visit, + pending: Promise[] +): void { + { + const c = o.userMetadata; + if (c != null) walk_temporal_api_sdk_v1_UserMetadata(c, visit, pending); + } + { + const c = o.scheduleActivity; + if (c != null) walk_coresdk_workflow_commands_ScheduleActivity(c, visit, pending); + } + { + const c = o.respondToQuery; + if (c != null) walk_coresdk_workflow_commands_QueryResult(c, visit, pending); + } + { + const c = o.completeWorkflowExecution; + if (c != null) walk_coresdk_workflow_commands_CompleteWorkflowExecution(c, visit, pending); + } + { + const c = o.failWorkflowExecution; + if (c != null) walk_coresdk_workflow_commands_FailWorkflowExecution(c, visit, pending); + } + { + const c = o.continueAsNewWorkflowExecution; + if (c != null) walk_coresdk_workflow_commands_ContinueAsNewWorkflowExecution(c, visit, pending); + } + { + const c = o.startChildWorkflowExecution; + if (c != null) walk_coresdk_workflow_commands_StartChildWorkflowExecution(c, visit, pending); + } + { + const c = o.signalExternalWorkflowExecution; + if (c != null) walk_coresdk_workflow_commands_SignalExternalWorkflowExecution(c, visit, pending); + } + { + const c = o.scheduleLocalActivity; + if (c != null) walk_coresdk_workflow_commands_ScheduleLocalActivity(c, visit, pending); + } + { + const c = o.upsertWorkflowSearchAttributes; + if (c != null) walk_coresdk_workflow_commands_UpsertWorkflowSearchAttributes(c, visit, pending); + } + { + const c = o.modifyWorkflowProperties; + if (c != null) walk_coresdk_workflow_commands_ModifyWorkflowProperties(c, visit, pending); + } + { + const c = o.updateResponse; + if (c != null) walk_coresdk_workflow_commands_UpdateResponse(c, visit, pending); + } + { + const c = o.scheduleNexusOperation; + if (c != null) walk_coresdk_workflow_commands_ScheduleNexusOperation(c, visit, pending); + } +} + +function walk_coresdk_workflow_completion_Failure( + o: coresdk.workflow_completion.IFailure, + visit: Visit, + pending: Promise[] +): void { + { + const c = o.failure; + if (c != null) walk_temporal_api_failure_v1_Failure(c, visit, pending); + } +} + +function walk_coresdk_workflow_completion_Success( + o: coresdk.workflow_completion.ISuccess, + visit: Visit, + pending: Promise[] +): void { + { + const a = o.commands; + if (a) for (const v of a) walk_coresdk_workflow_commands_WorkflowCommand(v, visit, pending); + } +} + +function walk_coresdk_workflow_completion_WorkflowActivationCompletion( + o: coresdk.workflow_completion.IWorkflowActivationCompletion, + visit: Visit, + pending: Promise[] +): void { + { + const c = o.successful; + if (c != null) walk_coresdk_workflow_completion_Success(c, visit, pending); + } + { + const c = o.failed; + if (c != null) walk_coresdk_workflow_completion_Failure(c, visit, pending); + } +} + +function walk_temporal_api_common_v1_Memo( + o: temporal.api.common.v1.IMemo, + visit: Visit, + pending: Promise[] +): void { + { + const m = o.fields; + if (m) + for (const [k, v] of Object.entries(m)) + pending.push( + visit([v]).then((r) => { + m[k] = one(r); + }) + ); + } +} + +function walk_temporal_api_common_v1_Payloads( + o: temporal.api.common.v1.IPayloads, + visit: Visit, + pending: Promise[] +): void { + { + const a = o.payloads; + if (a && a.length) + pending.push( + visit(a).then((r) => { + o.payloads = many(r, a.length); + }) + ); + } +} + +function walk_temporal_api_common_v1_SearchAttributes( + o: temporal.api.common.v1.ISearchAttributes, + visit: Visit, + pending: Promise[] +): void { + { + const m = o.indexedFields; + if (m) + for (const [k, v] of Object.entries(m)) + pending.push( + visit([v]).then((r) => { + m[k] = one(r); + }) + ); + } +} + +function walk_temporal_api_failure_v1_ApplicationFailureInfo( + o: temporal.api.failure.v1.IApplicationFailureInfo, + visit: Visit, + pending: Promise[] +): void { + { + const c = o.details; + if (c != null) walk_temporal_api_common_v1_Payloads(c, visit, pending); + } +} + +function walk_temporal_api_failure_v1_CanceledFailureInfo( + o: temporal.api.failure.v1.ICanceledFailureInfo, + visit: Visit, + pending: Promise[] +): void { + { + const c = o.details; + if (c != null) walk_temporal_api_common_v1_Payloads(c, visit, pending); + } +} + +function walk_temporal_api_failure_v1_Failure( + o: temporal.api.failure.v1.IFailure, + visit: Visit, + pending: Promise[] +): void { + { + const p = o.encodedAttributes; + if (p != null) + pending.push( + visit([p]).then((r) => { + o.encodedAttributes = one(r); + }) + ); + } + { + const c = o.cause; + if (c != null) walk_temporal_api_failure_v1_Failure(c, visit, pending); + } + { + const c = o.applicationFailureInfo; + if (c != null) walk_temporal_api_failure_v1_ApplicationFailureInfo(c, visit, pending); + } + { + const c = o.timeoutFailureInfo; + if (c != null) walk_temporal_api_failure_v1_TimeoutFailureInfo(c, visit, pending); + } + { + const c = o.canceledFailureInfo; + if (c != null) walk_temporal_api_failure_v1_CanceledFailureInfo(c, visit, pending); + } + { + const c = o.resetWorkflowFailureInfo; + if (c != null) walk_temporal_api_failure_v1_ResetWorkflowFailureInfo(c, visit, pending); + } +} + +function walk_temporal_api_failure_v1_ResetWorkflowFailureInfo( + o: temporal.api.failure.v1.IResetWorkflowFailureInfo, + visit: Visit, + pending: Promise[] +): void { + { + const c = o.lastHeartbeatDetails; + if (c != null) walk_temporal_api_common_v1_Payloads(c, visit, pending); + } +} + +function walk_temporal_api_failure_v1_TimeoutFailureInfo( + o: temporal.api.failure.v1.ITimeoutFailureInfo, + visit: Visit, + pending: Promise[] +): void { + { + const c = o.lastHeartbeatDetails; + if (c != null) walk_temporal_api_common_v1_Payloads(c, visit, pending); + } +} + +function walk_temporal_api_sdk_v1_UserMetadata( + o: temporal.api.sdk.v1.IUserMetadata, + visit: Visit, + pending: Promise[] +): void { + { + const p = o.summary; + if (p != null) + pending.push( + visit([p]).then((r) => { + o.summary = one(r); + }) + ); + } + { + const p = o.details; + if (p != null) + pending.push( + visit([p]).then((r) => { + o.details = one(r); + }) + ); + } +}