diff --git a/packages/test/src/test-plugins.ts b/packages/test/src/test-plugins.ts index 735cb1fc7..181f86847 100644 --- a/packages/test/src/test-plugins.ts +++ b/packages/test/src/test-plugins.ts @@ -246,3 +246,73 @@ test('SimplePlugin with activities merges them correctly', async (t) => { t.truthy(worker.options.activities.has('existingActivity')); t.truthy(worker.options.activities.has('pluginActivity')); }); + +export class AsyncExamplePlugin implements WorkerPlugin, BundlerPlugin { + readonly name: string = 'async-example-plugin'; + + constructor() {} + + async configureWorker(config: WorkerOptions): Promise { + console.log('AsyncExamplePlugin: Configuring worker'); + await new Promise((resolve) => setImmediate(resolve)); + config.taskQueue = 'plugin-task-queue' + randomUUID(); + return config; + } + + configureBundler(config: BundleOptions): BundleOptions { + console.log('AsyncExamplePlugin: Configuring bundler'); + config.workflowsPath = require.resolve('./workflows/plugins'); + return config; + } +} + +test('Basic plugin with async hooks', async (t) => { + const { connection } = t.context.testEnv; + const client = new Client({ connection }); + + const plugin = new AsyncExamplePlugin(); + const bundle = await bundleWorkflowCode({ + workflowsPath: 'replaced', + plugins: [plugin], + }); + + const worker = await Worker.create({ + workflowBundle: bundle, + connection: t.context.testEnv.nativeConnection, + taskQueue: 'will be overridden', + plugins: [plugin], + }); + + await worker.runUntil(async () => { + t.true(worker.options.taskQueue.startsWith('plugin-task-queue')); + const result = await client.workflow.execute(helloWorkflow, { + taskQueue: worker.options.taskQueue, + workflowExecutionTimeout: '30 seconds', + workflowId: randomUUID(), + }); + + t.is(result, 'Hello'); + }); +}); + +test('Bundler plugins are passed from worker with async hooks', async (t) => { + const { connection } = t.context.testEnv; + const client = new Client({ connection }); + + const worker = await Worker.create({ + workflowsPath: 'replaced', + connection: t.context.testEnv.nativeConnection, + taskQueue: 'will be overridden', + plugins: [new AsyncExamplePlugin()], + }); + await worker.runUntil(async () => { + t.true(worker.options.taskQueue.startsWith('plugin-task-queue')); + const result = await client.workflow.execute(helloWorkflow, { + taskQueue: worker.options.taskQueue, + workflowExecutionTimeout: '30 seconds', + workflowId: randomUUID(), + }); + + t.is(result, 'Hello'); + }); +}); diff --git a/packages/worker/src/worker-options.ts b/packages/worker/src/worker-options.ts index adc14aec8..19bc33869 100644 --- a/packages/worker/src/worker-options.ts +++ b/packages/worker/src/worker-options.ts @@ -1211,7 +1211,7 @@ export interface WorkerPlugin { * the worker configuration before the worker is fully initialized. Plugins * can add activities, workflows, interceptors, or change other settings. */ - configureWorker?(options: WorkerOptions): WorkerOptions; + configureWorker?(options: WorkerOptions): WorkerOptions | Promise; /** * Hook called when creating a replay worker to allow modification of configuration. @@ -1220,7 +1220,7 @@ export interface WorkerPlugin { * the worker configuration before the worker is fully initialized. Plugins * can add workflows, interceptors, or change other settings. */ - configureReplayWorker?(options: ReplayWorkerOptions): ReplayWorkerOptions; + configureReplayWorker?(options: ReplayWorkerOptions): ReplayWorkerOptions | Promise; /** * Hook called when running a worker. diff --git a/packages/worker/src/worker.ts b/packages/worker/src/worker.ts index 5ba752f94..dd152a001 100644 --- a/packages/worker/src/worker.ts +++ b/packages/worker/src/worker.ts @@ -493,7 +493,7 @@ export class Worker { options.plugins = (options.plugins ?? []).concat(options.connection?.plugins ?? []); for (const plugin of options.plugins) { if (plugin.configureWorker !== undefined) { - options = plugin.configureWorker(options); + options = await plugin.configureWorker(options); } } if (!options.taskQueue) { @@ -697,7 +697,7 @@ export class Worker { const plugins = options.plugins ?? []; for (const plugin of plugins) { if (plugin.configureReplayWorker !== undefined) { - options = plugin.configureReplayWorker(options); + options = await plugin.configureReplayWorker(options); } } const nativeWorkerCtor: NativeWorkerConstructor = this.nativeWorkerCtor;