From 00c1c80b30eafb69ba67084a0fc6170da68a09e8 Mon Sep 17 00:00:00 2001 From: wardpeet Date: Wed, 29 Apr 2026 23:54:36 +0200 Subject: [PATCH] feat(worker): allow async configureWorker and configureReplayWorker plugin hooks MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Plugin hooks on the Worker side previously had to be synchronous. This made it awkward for plugins that need to perform asynchronous setup — for example, loading remote configuration, dynamically resolving workflow paths, or initializing instrumentation that exposes async factories — since plugin authors had to do that work eagerly outside the hook and assemble the result synchronously. This change widens the return type of `WorkerPlugin.configureWorker` and `WorkerPlugin.configureReplayWorker` to also accept a Promise, and awaits the result at both call sites in `Worker`. Synchronous implementations continue to work unchanged (`await` is transparent on non-Promise values), so this is fully backwards compatible. Tests: - Add `AsyncExamplePlugin` mirroring `ExamplePlugin` but with an async `configureWorker` that yields to the event loop before returning. - Add two new tests (`Basic plugin with async hooks` and `Bundler plugins are passed from worker with async hooks`) that mirror the existing sync plugin tests, proving the worker is fully constructed and runs a workflow end-to-end after the async hook resolves. --- packages/test/src/test-plugins.ts | 70 +++++++++++++++++++++++++++ packages/worker/src/worker-options.ts | 4 +- packages/worker/src/worker.ts | 4 +- 3 files changed, 74 insertions(+), 4 deletions(-) diff --git a/packages/test/src/test-plugins.ts b/packages/test/src/test-plugins.ts index 735cb1fc7..181f86847 100644 --- a/packages/test/src/test-plugins.ts +++ b/packages/test/src/test-plugins.ts @@ -246,3 +246,73 @@ test('SimplePlugin with activities merges them correctly', async (t) => { t.truthy(worker.options.activities.has('existingActivity')); t.truthy(worker.options.activities.has('pluginActivity')); }); + +export class AsyncExamplePlugin implements WorkerPlugin, BundlerPlugin { + readonly name: string = 'async-example-plugin'; + + constructor() {} + + async configureWorker(config: WorkerOptions): Promise { + console.log('AsyncExamplePlugin: Configuring worker'); + await new Promise((resolve) => setImmediate(resolve)); + config.taskQueue = 'plugin-task-queue' + randomUUID(); + return config; + } + + configureBundler(config: BundleOptions): BundleOptions { + console.log('AsyncExamplePlugin: Configuring bundler'); + config.workflowsPath = require.resolve('./workflows/plugins'); + return config; + } +} + +test('Basic plugin with async hooks', async (t) => { + const { connection } = t.context.testEnv; + const client = new Client({ connection }); + + const plugin = new AsyncExamplePlugin(); + const bundle = await bundleWorkflowCode({ + workflowsPath: 'replaced', + plugins: [plugin], + }); + + const worker = await Worker.create({ + workflowBundle: bundle, + connection: t.context.testEnv.nativeConnection, + taskQueue: 'will be overridden', + plugins: [plugin], + }); + + await worker.runUntil(async () => { + t.true(worker.options.taskQueue.startsWith('plugin-task-queue')); + const result = await client.workflow.execute(helloWorkflow, { + taskQueue: worker.options.taskQueue, + workflowExecutionTimeout: '30 seconds', + workflowId: randomUUID(), + }); + + t.is(result, 'Hello'); + }); +}); + +test('Bundler plugins are passed from worker with async hooks', async (t) => { + const { connection } = t.context.testEnv; + const client = new Client({ connection }); + + const worker = await Worker.create({ + workflowsPath: 'replaced', + connection: t.context.testEnv.nativeConnection, + taskQueue: 'will be overridden', + plugins: [new AsyncExamplePlugin()], + }); + await worker.runUntil(async () => { + t.true(worker.options.taskQueue.startsWith('plugin-task-queue')); + const result = await client.workflow.execute(helloWorkflow, { + taskQueue: worker.options.taskQueue, + workflowExecutionTimeout: '30 seconds', + workflowId: randomUUID(), + }); + + t.is(result, 'Hello'); + }); +}); diff --git a/packages/worker/src/worker-options.ts b/packages/worker/src/worker-options.ts index adc14aec8..19bc33869 100644 --- a/packages/worker/src/worker-options.ts +++ b/packages/worker/src/worker-options.ts @@ -1211,7 +1211,7 @@ export interface WorkerPlugin { * the worker configuration before the worker is fully initialized. Plugins * can add activities, workflows, interceptors, or change other settings. */ - configureWorker?(options: WorkerOptions): WorkerOptions; + configureWorker?(options: WorkerOptions): WorkerOptions | Promise; /** * Hook called when creating a replay worker to allow modification of configuration. @@ -1220,7 +1220,7 @@ export interface WorkerPlugin { * the worker configuration before the worker is fully initialized. Plugins * can add workflows, interceptors, or change other settings. */ - configureReplayWorker?(options: ReplayWorkerOptions): ReplayWorkerOptions; + configureReplayWorker?(options: ReplayWorkerOptions): ReplayWorkerOptions | Promise; /** * Hook called when running a worker. diff --git a/packages/worker/src/worker.ts b/packages/worker/src/worker.ts index 5ba752f94..dd152a001 100644 --- a/packages/worker/src/worker.ts +++ b/packages/worker/src/worker.ts @@ -493,7 +493,7 @@ export class Worker { options.plugins = (options.plugins ?? []).concat(options.connection?.plugins ?? []); for (const plugin of options.plugins) { if (plugin.configureWorker !== undefined) { - options = plugin.configureWorker(options); + options = await plugin.configureWorker(options); } } if (!options.taskQueue) { @@ -697,7 +697,7 @@ export class Worker { const plugins = options.plugins ?? []; for (const plugin of plugins) { if (plugin.configureReplayWorker !== undefined) { - options = plugin.configureReplayWorker(options); + options = await plugin.configureReplayWorker(options); } } const nativeWorkerCtor: NativeWorkerConstructor = this.nativeWorkerCtor;