From 8a298db56e8f7afa266ee485713494bc6b988daf Mon Sep 17 00:00:00 2001 From: Martin Fleck Date: Wed, 8 Apr 2026 19:29:39 +0200 Subject: [PATCH 1/9] feat: add request/response support to server ActionDispatcher Enable the server to send requests to the client and await responses, and to issue requests handled locally by server-side handlers. Complements the client-side changes in glsp-client. - Add `request()` and `requestUntil()` to ActionDispatcher - Intercept responses in `dispatch()` and `doDispatch()` to resolve pending requests - Translate `RejectAction` responses to promise rejections - Bypass action queue for nested requests to prevent deadlocks - Tighten `shouldForwardToClient()` to use `hasValidResponseId()` - Add async `handleClientRequest()` in `DefaultGLSPServer.process()` - Add tests for request/response, deadlocks, timeouts, late responses, and dispose cleanup Relates to https://github.com/eclipse-glsp/glsp/issues/607 --- .../common/actions/action-dispatcher.spec.ts | 246 +++++++++++++++++- .../src/common/actions/action-dispatcher.ts | 214 ++++++++++++++- .../common/actions/client-action-handler.ts | 2 +- .../server/src/common/protocol/glsp-server.ts | 31 +++ packages/server/src/common/test/mock-util.ts | 16 +- 5 files changed, 499 insertions(+), 10 deletions(-) diff --git a/packages/server/src/common/actions/action-dispatcher.spec.ts b/packages/server/src/common/actions/action-dispatcher.spec.ts index 35286f3..8444f70 100644 --- a/packages/server/src/common/actions/action-dispatcher.spec.ts +++ b/packages/server/src/common/actions/action-dispatcher.spec.ts @@ -13,7 +13,7 @@ * * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 ********************************************************************************/ -import { Action, UpdateModelAction } from '@eclipse-glsp/protocol'; +import { Action, Deferred, RequestAction, ResponseAction, UpdateModelAction } from '@eclipse-glsp/protocol'; import { expect } from 'chai'; import { Container, ContainerModule } from 'inversify'; import * as sinon from 'sinon'; @@ -41,14 +41,16 @@ describe('test DefaultActionDispatcher', () => { let registry_get_stub: sinon.SinonStub<[string], ActionHandler[]>; const sandbox = sinon.createSandbox(); + const clientActionForwarderStub = sinon.createStubInstance(ClientActionForwarder); + container.load( new ContainerModule(bind => { bind(Logger).toConstantValue(new mock.StubLogger()); bind(ClientSessionManager).toConstantValue(new mock.StubClientSessionManager()); bind(ClientId).toConstantValue(clientId); bind(ActionHandlerRegistry).toConstantValue(actionHandlerRegistry); - bind(ClientActionKinds).toConstantValue(['response', 'response1', 'response2']); - bind(ClientActionForwarder).toConstantValue(sinon.createStubInstance(ClientActionForwarder)); + bind(ClientActionKinds).toConstantValue(new Set(['response', 'response1', 'response2'])); + bind(ClientActionForwarder).toConstantValue(clientActionForwarderStub); }) ); const actionDispatcher = container.resolve(DefaultActionDispatcher); @@ -288,4 +290,242 @@ describe('test DefaultActionDispatcher', () => { expect(spy_postUpdateHandler_execute.calledOnce); }); }); + + describe('test request/response', () => { + it('request - resolves when matching response is dispatched', async () => { + const requestAction: RequestAction = { + kind: 'testRequest', + requestId: 'req_1' + }; + const responseAction: ResponseAction = { + kind: 'testResponse', + responseId: 'req_1' + }; + + // Configure forwarder: testRequest is forwarded to the client + clientActionForwarderStub.shouldForwardToClient.callsFake(action => action.kind === 'testRequest'); + clientActionForwarderStub.handle.callsFake(action => action.kind === 'testRequest'); + registry_get_stub.callsFake(() => []); + + const responsePromise = actionDispatcher.request(requestAction); + await actionDispatcher.dispatch(responseAction); + + const result = await responsePromise; + expect(result.responseId).to.equal('req_1'); + }); + + it('request - response bypasses queue even when queue is busy', async () => { + const requestAction: RequestAction = { + kind: 'testRequest', + requestId: 'req_deadlock' + }; + const responseAction: ResponseAction = { + kind: 'testResponse', + responseId: 'req_deadlock' + }; + + clientActionForwarderStub.shouldForwardToClient.callsFake(action => action.kind === 'testRequest'); + clientActionForwarderStub.handle.callsFake(action => action.kind === 'testRequest'); + + const handlerRunning = new Deferred(); + + const slowAction = 'slowAction'; + const slowHandler = new mock.StubActionHandler([slowAction]); + slowHandler.execute = async () => { + const resultPromise = actionDispatcher.request(requestAction); + handlerRunning.resolve(); + const result = await resultPromise; + expect(result.responseId).to.equal('req_deadlock'); + return []; + }; + registry_get_stub.callsFake((kind: string) => + kind === slowAction ? [slowHandler] : [] + ); + + const dispatchPromise = actionDispatcher.dispatch({ kind: slowAction }); + await handlerRunning.promise; + + // Response must resolve even though the queue is busy + await actionDispatcher.dispatch(responseAction); + await dispatchPromise; + }); + + it('request - resolves for locally handled request (server→server)', async () => { + const localRequestKind = 'localRequest'; + const localResponseKind = 'localResponse'; + + const handler = new mock.StubActionHandler([localRequestKind]); + sinon.stub(handler, 'execute').callsFake(() => { + const response: ResponseAction = { kind: localResponseKind, responseId: '' }; + return [response]; + }); + registry_get_stub.callsFake((kind: string) => + kind === localRequestKind ? [handler] : [] + ); + + const requestAction: RequestAction = { + kind: localRequestKind, + requestId: '' + }; + + const result = await actionDispatcher.request(requestAction); + expect(result).to.exist; + expect(result.responseId).to.equal(requestAction.requestId); + }); + + it('request - resolves for locally handled request called from inside a handler', async () => { + const innerRequestKind = 'innerRequest'; + const innerResponseKind = 'innerResponse'; + const outerActionKind = 'outerAction'; + + const innerHandler = new mock.StubActionHandler([innerRequestKind]); + sinon.stub(innerHandler, 'execute').callsFake(() => { + const response: ResponseAction = { kind: innerResponseKind, responseId: '' }; + return [response]; + }); + + const outerHandler = new mock.StubActionHandler([outerActionKind]); + outerHandler.execute = async () => { + const innerRequest: RequestAction = { + kind: innerRequestKind, + requestId: '' + }; + const result = await actionDispatcher.request(innerRequest); + expect(result).to.exist; + return []; + }; + + registry_get_stub.callsFake((kind: string) => { + if (kind === outerActionKind) return [outerHandler]; + if (kind === innerRequestKind) return [innerHandler]; + return []; + }); + + // This must not deadlock + await actionDispatcher.dispatch({ kind: outerActionKind }); + }); + + it('requestUntil - rejects on timeout when rejectOnTimeout is true', async () => { + const requestAction: RequestAction = { + kind: 'testRequest', + requestId: 'req_hard' + }; + + clientActionForwarderStub.shouldForwardToClient.callsFake(action => action.kind === 'testRequest'); + clientActionForwarderStub.handle.callsFake(action => action.kind === 'testRequest'); + registry_get_stub.callsFake(() => []); + + try { + await actionDispatcher.requestUntil(requestAction, 100, true); + expect.fail('Should have thrown'); + } catch (error: unknown) { + expect((error as Error).message).to.include('timed out'); + } + }); + + it('requestUntil - resolves undefined on timeout when rejectOnTimeout is false', async () => { + const requestAction: RequestAction = { + kind: 'testRequest', + requestId: 'req_soft' + }; + + clientActionForwarderStub.shouldForwardToClient.callsFake(action => action.kind === 'testRequest'); + clientActionForwarderStub.handle.callsFake(action => action.kind === 'testRequest'); + registry_get_stub.callsFake(() => []); + + const result = await actionDispatcher.requestUntil(requestAction, 100, false); + expect(result).to.be.undefined; + }); + + it('request - auto-generates requestId when empty', async () => { + const requestAction: RequestAction = { + kind: 'testRequest', + requestId: '' + }; + + clientActionForwarderStub.shouldForwardToClient.callsFake(action => action.kind === 'testRequest'); + clientActionForwarderStub.handle.callsFake(action => action.kind === 'testRequest'); + registry_get_stub.callsFake(() => []); + + const responsePromise = actionDispatcher.request(requestAction); + expect(requestAction.requestId).to.match(/^server_.*_\d+$/); + + await actionDispatcher.dispatch({ + kind: 'testResponse', + responseId: requestAction.requestId + } as ResponseAction); + + const result = await responsePromise; + expect(result).to.exist; + }); + + it('request - rejects when dispatch fails (no handler, not a client action)', async () => { + const requestAction: RequestAction = { + kind: 'unknownRequest', + requestId: 'req_fail' + }; + + // NOT forwarded to client, no handler registered → dispatch throws + clientActionForwarderStub.shouldForwardToClient.returns(false); + clientActionForwarderStub.handle.returns(false); + registry_get_stub.callsFake(() => []); + + try { + await actionDispatcher.request(requestAction); + expect.fail('Should have thrown'); + } catch (error: unknown) { + expect((error as Error).message).to.include('No handler registered'); + } + }); + + it('requestUntil - late response after timeout has responseId cleared', async () => { + const requestAction: RequestAction = { + kind: 'testRequest', + requestId: 'req_late' + }; + + clientActionForwarderStub.shouldForwardToClient.callsFake(action => action.kind === 'testRequest'); + clientActionForwarderStub.handle.callsFake(action => action.kind === 'testRequest'); + // Register a no-op handler for testResponse so the late response can be dispatched normally + const noopHandler = new mock.StubActionHandler(['testResponse']); + registry_get_stub.callsFake((kind: string) => + kind === 'testResponse' ? [noopHandler] : [] + ); + + // Request times out + try { + await actionDispatcher.requestUntil(requestAction, 50, true); + expect.fail('Should have thrown'); + } catch (error: unknown) { + expect((error as Error).message).to.include('timed out'); + } + + // Late response arrives — responseId should be cleared, dispatched as normal action + const lateResponse: ResponseAction = { kind: 'testResponse', responseId: 'req_late' }; + await actionDispatcher.dispatch(lateResponse); + expect(lateResponse.responseId).to.equal(''); + }); + + it('dispose - rejects all pending requests', async () => { + const requestAction: RequestAction = { + kind: 'testRequest', + requestId: 'req_dispose' + }; + + clientActionForwarderStub.shouldForwardToClient.callsFake(action => action.kind === 'testRequest'); + clientActionForwarderStub.handle.callsFake(action => action.kind === 'testRequest'); + registry_get_stub.callsFake(() => []); + + const responsePromise = actionDispatcher.request(requestAction); + (actionDispatcher as any).dispose(); + + try { + await responsePromise; + expect.fail('Should have thrown'); + } catch (error: unknown) { + expect((error as Error).message).to.include('cancelled'); + expect((error as Error).message).to.include('req_dispose'); + } + }); + }); }); diff --git a/packages/server/src/common/actions/action-dispatcher.ts b/packages/server/src/common/actions/action-dispatcher.ts index 63fc208..05b5b8c 100644 --- a/packages/server/src/common/actions/action-dispatcher.ts +++ b/packages/server/src/common/actions/action-dispatcher.ts @@ -15,8 +15,10 @@ ********************************************************************************/ import { Action, + Deferred, Disposable, MaybeArray, + RejectAction, RequestAction, ResponseAction, SetModelAction, @@ -65,6 +67,45 @@ export interface ActionDispatcher { */ dispatchAfterNextUpdate(actions: Action[]): void; dispatchAfterNextUpdate(...actions: Action[]): void; + + /** + * Dispatches a request action and returns a promise that resolves when a matching response + * action is dispatched or rejects if the response is a {@link RejectAction}. The response is + * _not_ passed to the registered action handlers. Instead, it is the responsibility of the + * caller of this method to handle the response properly. + * + * If the request's `kind` is registered in `ClientActionKinds`, it is forwarded to the client + * via {@link ClientActionForwarder}. Otherwise it is dispatched locally through server-side + * handlers. + * + * The promise waits indefinitely until a response arrives or the dispatcher is disposed. + * Use {@link requestUntil} if a timeout is needed. + * + * @param action The request action to dispatch. + * @returns A promise that resolves with the matching response action. + */ + request(action: RequestAction): Promise; + + /** + * Dispatches a request and waits for a response until the timeout given in `timeoutMs` has + * been reached. The returned promise is resolved when a response with a matching identifier + * is dispatched or when the timeout has been reached. That response is _not_ passed to the + * registered action handlers. Instead, it is the responsibility of the caller of this method + * to handle the response properly. + * If `rejectOnTimeout` is set to `false` (default) the returned promise will be resolved with + * no value, otherwise it will be rejected. + * + * @param action The request action to dispatch. + * @param timeoutMs Maximum wait time in milliseconds. Defaults to + * {@link RequestAction.timeout} if set, otherwise 2000 ms. + * @param rejectOnTimeout Whether to reject the promise on timeout. + * @returns The matching response, or `undefined` on soft timeout. + */ + requestUntil( + action: RequestAction, + timeoutMs?: number, + rejectOnTimeout?: boolean + ): Promise; } @injectable() @@ -83,8 +124,26 @@ export class DefaultActionDispatcher implements ActionDispatcher, Disposable { protected actionQueue = new PromiseQueue(); protected postUpdateQueue: Action[] = []; + protected readonly pendingRequests = new Map>(); + protected readonly timeouts = new Map(); + protected nextRequestId = 1; + + /** + * Generates a unique request ID for this dispatcher. The `clientId` prefix makes IDs + * distinguishable across sessions in logs. + */ + protected generateRequestId(): string { + return `server_${this.clientId}_${this.nextRequestId++}`; + } dispatch(action: Action): Promise { + // Fast-path: resolve pending requests before the queue to prevent deadlock + // when request() is awaited inside a queued handler and the response + // arrives via process() -> dispatch(). + if (this.interceptPendingResponse(action)) { + return Promise.resolve(); + } + // Dont queue actions that are just delegated to the client if (this.clientActionForwarder.shouldForwardToClient(action)) { return this.doDispatch(action); @@ -93,6 +152,12 @@ export class DefaultActionDispatcher implements ActionDispatcher, Disposable { } protected async doDispatch(action: Action): Promise { + // Intercept responses to pending requests produced by local handlers + // (via dispatchResponses). This enables server→server requests. + if (this.interceptPendingResponse(action)) { + return; + } + this.logger.debug('Dispatch action:', action.kind); const handledOnClient = this.clientActionForwarder.handle(action); @@ -107,10 +172,8 @@ export class DefaultActionDispatcher implements ActionDispatcher, Disposable { responses.push(...response); } - if (this.postUpdateQueue.length > 0 && (UpdateModelAction.is(action) || SetModelAction.is(action))) { - responses.push(...this.postUpdateQueue); - this.postUpdateQueue = []; - } + // Append post-update actions to responses (preserves original dispatch ordering) + responses.push(...this.drainPostUpdateQueue(action)); await this.dispatchResponses(responses); } @@ -144,8 +207,149 @@ export class DefaultActionDispatcher implements ActionDispatcher, Disposable { } } + request(action: RequestAction): Promise { + return this.doRequest(action, undefined, true) as Promise; + } + + requestUntil( + action: RequestAction, + timeoutMs: number = action.timeout ?? 2000, + rejectOnTimeout = false + ): Promise { + return this.doRequest(action, timeoutMs, rejectOnTimeout); + } + + protected doRequest( + action: RequestAction, + timeoutMs: number | undefined, + rejectOnTimeout: boolean + ): Promise { + if (!action.requestId || action.requestId === '') { + action.requestId = this.generateRequestId(); + } + // Stamp the effective timeout onto the action so the receiving side + // (handleServerRequest/handleClientRequest) can respect it. + action.timeout = timeoutMs; + + const deferred = new Deferred(); + this.pendingRequests.set(action.requestId, deferred); + + if (timeoutMs !== undefined) { + const timeout = setTimeout(() => { + if (this.pendingRequests.delete(action.requestId)) { + // Intentionally keep the timeouts entry (do NOT delete). + // The stale entry signals "this request existed but timed out", + // matching the client-side GLSPActionDispatcher pattern. + // Cleaned up when the late response arrives or on dispose(). + const message = `Request '${action.requestId}' (${action.kind}) timed out after ${timeoutMs}ms`; + if (rejectOnTimeout) { + deferred.reject(new Error(message)); + } else { + this.logger.info(message); + deferred.resolve(undefined); + } + } + }, timeoutMs); + + this.timeouts.set(action.requestId, timeout); + } + + // When the queue is busy (request() called from inside a handler), + // bypass it via doDispatch() to avoid deadlock. + // When idle, go through dispatch() to preserve action ordering. + const dispatchPromise = this.actionQueue.isBusy + ? this.doDispatch(action) + : this.dispatch(action); + + dispatchPromise.catch(error => { + if (this.pendingRequests.delete(action.requestId)) { + const timeout = this.timeouts.get(action.requestId); + if (timeout !== undefined) { + clearTimeout(timeout); + this.timeouts.delete(action.requestId); + } + deferred.reject(error); + } + }); + + return deferred.promise as Promise; + } + + /** + * If the given action is a {@link SetModelAction} or {@link UpdateModelAction} and there are actions + * queued via {@link dispatchAfterNextUpdate}, drain and return them. Used in both the normal + * `doDispatch()` path and the `interceptPendingResponse()` short-circuit path. + * + * @returns The drained actions, or an empty array if nothing to drain. + */ + protected drainPostUpdateQueue(action: Action): Action[] { + if (this.postUpdateQueue.length > 0 && (UpdateModelAction.is(action) || SetModelAction.is(action))) { + const actions = [...this.postUpdateQueue]; + this.postUpdateQueue = []; + return actions; + } + return []; + } + + /** + * Checks whether the given action is a response matching a pending {@link request} or + * {@link requestUntil} call. If matched, resolves (or rejects) the corresponding deferred + * and returns `true` so the caller can short-circuit normal dispatch. + * + * For responses with a valid `responseId` but no matching pending request, checks for a stale + * timeout entry (timed-out request) and clears the `responseId` so the action is not forwarded + * by {@link ClientActionForwarder}. If no stale entry exists, the `responseId` is left intact + * for normal forwarding. + * + * Called from both `dispatch()` (for responses arriving from the client) and `doDispatch()` + * (for responses produced by local handlers). + */ + protected interceptPendingResponse(action: Action): boolean { + if (!ResponseAction.hasValidResponseId(action)) { + return false; + } + // Resolve pending request + const deferred = this.pendingRequests.get(action.responseId); + if (deferred !== undefined) { + this.pendingRequests.delete(action.responseId); + const timeout = this.timeouts.get(action.responseId); + if (timeout !== undefined) { + clearTimeout(timeout); + this.timeouts.delete(action.responseId); + } + // Drain post-update actions before resolving — in the intercept path + // there's no responses array, so dispatch them directly. + const postUpdateActions = this.drainPostUpdateQueue(action); + if (RejectAction.is(action)) { + deferred.reject(new Error(`${action.message}${action.detail ? ': ' + action.detail : ''}`)); + } else { + deferred.resolve(action); + } + if (postUpdateActions.length > 0) { + this.dispatchAll(postUpdateActions); + } + return true; + } + // No matching pending request — check for stale timeout (timed-out request). + // Only clear responseId if this response was tracked by request(). + const staleTimeout = this.timeouts.get(action.responseId); + if (staleTimeout !== undefined) { + clearTimeout(staleTimeout); + this.timeouts.delete(action.responseId); + this.logger.debug(`Late response for timed-out request '${action.responseId}', dispatching as normal action`); + action.responseId = ''; + } + return false; + } + dispose(): void { this.actionQueue.clear(); + this.pendingRequests.forEach((deferred, id) => + deferred.reject(new Error(`Request '${id}' cancelled: dispatcher disposed`)) + ); + this.pendingRequests.clear(); + this.timeouts.forEach(timeout => clearTimeout(timeout)); + this.timeouts.clear(); } } @@ -158,7 +362,7 @@ export class DefaultActionDispatcher implements ActionDispatcher, Disposable { */ export function respond(request: Action, response: Action): Action { if (RequestAction.is(request) && ResponseAction.is(response)) { - (response as any).responseId = request.requestId; + response.responseId = request.requestId; } return response; } diff --git a/packages/server/src/common/actions/client-action-handler.ts b/packages/server/src/common/actions/client-action-handler.ts index 2cfa8d7..13942c1 100644 --- a/packages/server/src/common/actions/client-action-handler.ts +++ b/packages/server/src/common/actions/client-action-handler.ts @@ -54,6 +54,6 @@ export class ClientActionForwarder { if (ClientAction.is(action)) { return false; } - return this.actionKinds.has(action.kind) || ResponseAction.is(action); + return this.actionKinds.has(action.kind) || ResponseAction.hasValidResponseId(action); } } diff --git a/packages/server/src/common/protocol/glsp-server.ts b/packages/server/src/common/protocol/glsp-server.ts index ef7571f..796551a 100644 --- a/packages/server/src/common/protocol/glsp-server.ts +++ b/packages/server/src/common/protocol/glsp-server.ts @@ -25,6 +25,9 @@ import { InitializeResult, MaybePromise, MessageAction, + RejectAction, + RequestAction, + ResponseAction, ServerActions, distinctAdd, remove @@ -150,9 +153,37 @@ export class DefaultGLSPServer implements GLSPServer { } const action = message.action; ClientAction.mark(action); + if (RequestAction.is(action)) { + this.handleClientRequest(clientSession, action, message.clientId); + return; + } clientSession.actionDispatcher.dispatch(action).catch(error => this.handleProcessError(message, error)); } + // Fire-and-forget: intentionally not awaited by process() + protected async handleClientRequest( + clientSession: ClientSession, + action: RequestAction, + clientId: string + ): Promise { + try { + const response = action.timeout !== undefined + ? await clientSession.actionDispatcher.requestUntil(action, action.timeout, true) + : await clientSession.actionDispatcher.request(action); + if (response) { + this.sendToClient({ clientId, action: response }); + } + } catch (error) { + const detail = error instanceof GLSPServerError ? error.cause?.toString?.() : error?.toString?.(); + this.logger.error(`Failed to handle request '${action.kind}' (${action.requestId}):`, detail); + const reject = RejectAction.create( + `Failed to handle request '${action.kind}' (${action.requestId})`, + { responseId: action.requestId, detail } + ); + this.sendToClient({ clientId, action: reject }); + } + } + protected handleProcessError(message: ActionMessage, reason: any): void | PromiseLike { let errorMsg = `Could not process action: '${message.action.kind}`; this.logger.error(errorMsg); diff --git a/packages/server/src/common/test/mock-util.ts b/packages/server/src/common/test/mock-util.ts index 755a560..76914cc 100644 --- a/packages/server/src/common/test/mock-util.ts +++ b/packages/server/src/common/test/mock-util.ts @@ -30,7 +30,9 @@ import { MaybeArray, MaybePromise, Point, + RequestAction, RequestEditValidationAction, + ResponseAction, ShapeTypeHint, ValidationStatus } from '@eclipse-glsp/protocol'; @@ -95,7 +97,7 @@ export function createClientSession( export class StubActionHandler implements ActionHandler { constructor(public actionKinds: string[]) {} - execute(action: Action): Action[] { + execute(action: Action): MaybePromise { return []; } } @@ -136,6 +138,18 @@ export class StubActionDispatcher implements ActionDispatcher { dispatchAll(...actions: MaybeArray[]): Promise { return Promise.resolve(); } + + request(action: RequestAction): Promise { + return Promise.reject(new Error('Not implemented in stub')); + } + + requestUntil( + action: RequestAction, + timeoutMs?: number, + rejectOnTimeout?: boolean + ): Promise { + return Promise.reject(new Error('Not implemented in stub')); + } } export class StubClientSessionFactory implements ClientSessionFactory { From f2b499b4742f67983d8df06fe7c725feb7cb5a9f Mon Sep 17 00:00:00 2001 From: Martin Fleck Date: Fri, 10 Apr 2026 09:48:43 +0200 Subject: [PATCH 2/9] fix: address PR review feedback - Document single-handler assumption and late/extra response behavior on request() - Shorten RejectAction/postUpdateQueue comment - Add dispatchDirectly() to bypass the action queue for actions that need immediate processing (e.g. progress notifications) - Use dispatchDirectly() for handler responses and nested requests - Remove client-action queue bypass from dispatch(); all actions are now enqueued to preserve sequential ordering - Extract sendResponseToClient() hook in DefaultGLSPServer --- .../src/common/actions/action-dispatcher.ts | 62 ++++++++++++------- .../model/model-submission-handler.ts | 2 +- .../model/request-model-action-handler.ts | 4 +- .../features/progress/progress-service.ts | 6 +- .../server/src/common/protocol/glsp-server.ts | 8 ++- packages/server/src/common/test/mock-util.ts | 4 ++ 6 files changed, 55 insertions(+), 31 deletions(-) diff --git a/packages/server/src/common/actions/action-dispatcher.ts b/packages/server/src/common/actions/action-dispatcher.ts index 05b5b8c..c95c2b7 100644 --- a/packages/server/src/common/actions/action-dispatcher.ts +++ b/packages/server/src/common/actions/action-dispatcher.ts @@ -43,6 +43,8 @@ export const ActionDispatcher = Symbol('ActionDispatcher'); export interface ActionDispatcher { /** * Processes the given action by dispatching it to all registered handlers. + * Actions are enqueued to preserve sequential ordering. Response actions + * produced by handlers are dispatched directly via {@link dispatchDirectly}. * * @param action The action that should be dispatched. * @returns A promise indicating when the action processing is complete. @@ -58,6 +60,18 @@ export interface ActionDispatcher { dispatchAll(actions: Action[]): Promise; dispatchAll(...actions: Action[]): Promise; + /** + * Dispatches an action directly, bypassing the action queue. Use this for actions that + * need to be processed immediately, e.g. progress notifications sent from inside a handler. + * + * Actions dispatched this way are not sequenced with other queued actions. Callers are + * responsible for ensuring that concurrent execution is safe. + * + * @param action The action to dispatch directly. + * @returns A promise indicating when the action processing is complete. + */ + dispatchDirectly(action: Action): Promise; + /** * Processes all given actions, by dispatching them to the corresponding handlers, after the next model update. * The given actions are queued until the next model update cycle has been completed i.e. an @@ -74,9 +88,12 @@ export interface ActionDispatcher { * _not_ passed to the registered action handlers. Instead, it is the responsibility of the * caller of this method to handle the response properly. * - * If the request's `kind` is registered in `ClientActionKinds`, it is forwarded to the client - * via {@link ClientActionForwarder}. Otherwise it is dispatched locally through server-side - * handlers. + * The request is dispatched directly (bypassing the queue). If its `kind` is registered in + * `ClientActionKinds`, it is forwarded to the client via {@link ClientActionForwarder}. + * If server-side handlers are registered, they are also executed. + * + * Only the first matching response resolves the request. Any additional or late responses + * are dispatched as normal actions. * * The promise waits indefinitely until a response arrives or the dispatcher is disposed. * Use {@link requestUntil} if a timeout is needed. @@ -137,20 +154,16 @@ export class DefaultActionDispatcher implements ActionDispatcher, Disposable { } dispatch(action: Action): Promise { - // Fast-path: resolve pending requests before the queue to prevent deadlock - // when request() is awaited inside a queued handler and the response - // arrives via process() -> dispatch(). if (this.interceptPendingResponse(action)) { return Promise.resolve(); } - - // Dont queue actions that are just delegated to the client - if (this.clientActionForwarder.shouldForwardToClient(action)) { - return this.doDispatch(action); - } return this.actionQueue.enqueue(() => this.doDispatch(action)); } + dispatchDirectly(action: Action): Promise { + return this.doDispatch(action); + } + protected async doDispatch(action: Action): Promise { // Intercept responses to pending requests produced by local handlers // (via dispatchResponses). This enables server→server requests. @@ -183,12 +196,16 @@ export class DefaultActionDispatcher implements ActionDispatcher, Disposable { return responseActions.map(action => respond(request, action)); } + /** + * Dispatches response actions produced by handlers. Responses are dispatched directly + * (bypassing the queue) but sequenced relative to each other via an internal response queue. + */ protected dispatchResponses(actions: Action[]): Promise { if (actions.length === 0) { return Promise.resolve(); } const responseQueue = new PromiseQueue(); - const responses = actions.map(action => responseQueue.enqueue(() => this.doDispatch(action))); + const responses = actions.map(action => responseQueue.enqueue(() => this.dispatchDirectly(action))); return Promise.all(responses).then(() => Promise.resolve()); } @@ -254,12 +271,10 @@ export class DefaultActionDispatcher implements ActionDispatcher, Disposable { this.timeouts.set(action.requestId, timeout); } - // When the queue is busy (request() called from inside a handler), - // bypass it via doDispatch() to avoid deadlock. - // When idle, go through dispatch() to preserve action ordering. - const dispatchPromise = this.actionQueue.isBusy - ? this.doDispatch(action) - : this.dispatch(action); + // Always dispatch directly to avoid deadlock when request() is called + // from inside a queued handler. The response is intercepted before + // normal dispatch, so queue ordering is not affected. + const dispatchPromise = this.dispatchDirectly(action); dispatchPromise.catch(error => { if (this.pendingRequests.delete(action.requestId)) { @@ -301,8 +316,8 @@ export class DefaultActionDispatcher implements ActionDispatcher, Disposable { * by {@link ClientActionForwarder}. If no stale entry exists, the `responseId` is left intact * for normal forwarding. * - * Called from both `dispatch()` (for responses arriving from the client) and `doDispatch()` - * (for responses produced by local handlers). + * Called from `dispatch()` (for responses arriving from the client, before the queue) and + * from `doDispatch()` (for responses produced by local handlers via `dispatchDirectly`). */ protected interceptPendingResponse(action: Action): boolean { if (!ResponseAction.hasValidResponseId(action)) { @@ -317,8 +332,9 @@ export class DefaultActionDispatcher implements ActionDispatcher, Disposable { clearTimeout(timeout); this.timeouts.delete(action.responseId); } - // Drain post-update actions before resolving — in the intercept path - // there's no responses array, so dispatch them directly. + // Drain post-update actions. A RejectAction won't trigger a drain, + // so pending post-update actions remain queued until the next + // successful model update. const postUpdateActions = this.drainPostUpdateQueue(action); if (RejectAction.is(action)) { deferred.reject(new Error(`${action.message}${action.detail ? ': ' + action.detail : ''}`)); @@ -326,7 +342,7 @@ export class DefaultActionDispatcher implements ActionDispatcher, Disposable { deferred.resolve(action); } if (postUpdateActions.length > 0) { - this.dispatchAll(postUpdateActions); + this.dispatchResponses(postUpdateActions); } return true; } diff --git a/packages/server/src/common/features/model/model-submission-handler.ts b/packages/server/src/common/features/model/model-submission-handler.ts index f3e05ef..fe0555b 100644 --- a/packages/server/src/common/features/model/model-submission-handler.ts +++ b/packages/server/src/common/features/model/model-submission-handler.ts @@ -180,7 +180,7 @@ export class ModelSubmissionHandler { } protected async performLiveValidation(validator: ModelValidator): Promise { - this.actionDispatcher.dispatch(StatusAction.create('Validate Model...')); + this.actionDispatcher.dispatchDirectly(StatusAction.create('Validate Model...')); const markers = await validator.validate([this.modelState.root], MarkersReason.LIVE); return this.actionDispatcher.dispatchAll( SetMarkersAction.create(markers, { reason: MarkersReason.LIVE }), diff --git a/packages/server/src/common/features/model/request-model-action-handler.ts b/packages/server/src/common/features/model/request-model-action-handler.ts index 2b054dd..567b260 100644 --- a/packages/server/src/common/features/model/request-model-action-handler.ts +++ b/packages/server/src/common/features/model/request-model-action-handler.ts @@ -81,12 +81,12 @@ export class RequestModelActionHandler implements ActionHandler { } protected reportModelLoading(message: string): ProgressMonitor | undefined { - this.actionDispatcher.dispatch(StatusAction.create(message, { severity: 'INFO' })); + this.actionDispatcher.dispatchDirectly(StatusAction.create(message, { severity: 'INFO' })); return this.progressService.start(message); } protected reportModelLoadingFinished(monitor?: ProgressMonitor): void { - this.actionDispatcher.dispatch(StatusAction.create('', { severity: 'NONE' })); + this.actionDispatcher.dispatchDirectly(StatusAction.create('', { severity: 'NONE' })); monitor?.end(); } diff --git a/packages/server/src/common/features/progress/progress-service.ts b/packages/server/src/common/features/progress/progress-service.ts index b14f2bc..dffa3b6 100644 --- a/packages/server/src/common/features/progress/progress-service.ts +++ b/packages/server/src/common/features/progress/progress-service.ts @@ -65,10 +65,10 @@ export class DefaultProgressService implements ProgressService { start(title: string, options?: ProgressOptions): ProgressMonitor { const progressId = uuid.v4(); - this.actionDispatcher.dispatch(StartProgressAction.create({ progressId, title, ...options })); + this.actionDispatcher.dispatchDirectly(StartProgressAction.create({ progressId, title, ...options })); return { - update: updateOptions => this.actionDispatcher.dispatch(UpdateProgressAction.create(progressId, updateOptions)), - end: () => this.actionDispatcher.dispatch(EndProgressAction.create(progressId)) + update: updateOptions => this.actionDispatcher.dispatchDirectly(UpdateProgressAction.create(progressId, updateOptions)), + end: () => this.actionDispatcher.dispatchDirectly(EndProgressAction.create(progressId)) }; } } diff --git a/packages/server/src/common/protocol/glsp-server.ts b/packages/server/src/common/protocol/glsp-server.ts index 796551a..a5325a4 100644 --- a/packages/server/src/common/protocol/glsp-server.ts +++ b/packages/server/src/common/protocol/glsp-server.ts @@ -171,7 +171,7 @@ export class DefaultGLSPServer implements GLSPServer { ? await clientSession.actionDispatcher.requestUntil(action, action.timeout, true) : await clientSession.actionDispatcher.request(action); if (response) { - this.sendToClient({ clientId, action: response }); + this.sendResponseToClient(clientId, response); } } catch (error) { const detail = error instanceof GLSPServerError ? error.cause?.toString?.() : error?.toString?.(); @@ -180,10 +180,14 @@ export class DefaultGLSPServer implements GLSPServer { `Failed to handle request '${action.kind}' (${action.requestId})`, { responseId: action.requestId, detail } ); - this.sendToClient({ clientId, action: reject }); + this.sendResponseToClient(clientId, reject); } } + protected sendResponseToClient(clientId: string, response: ResponseAction): void { + this.sendToClient({ clientId, action: response }); + } + protected handleProcessError(message: ActionMessage, reason: any): void | PromiseLike { let errorMsg = `Could not process action: '${message.action.kind}`; this.logger.error(errorMsg); diff --git a/packages/server/src/common/test/mock-util.ts b/packages/server/src/common/test/mock-util.ts index 76914cc..6ecc113 100644 --- a/packages/server/src/common/test/mock-util.ts +++ b/packages/server/src/common/test/mock-util.ts @@ -135,6 +135,10 @@ export class StubActionDispatcher implements ActionDispatcher { return Promise.resolve(); } + dispatchDirectly(action: Action): Promise { + return Promise.resolve(); + } + dispatchAll(...actions: MaybeArray[]): Promise { return Promise.resolve(); } From 0410c3ee19e973df307c506096262a7a712f9c13 Mon Sep 17 00:00:00 2001 From: Martin Fleck Date: Sun, 19 Apr 2026 17:47:31 +0200 Subject: [PATCH 3/9] refactor: align ActionDispatcher reentrancy with Java Flow: - External dispatch(): queued to ActionChannel, processed by consumer loop. - Reentrant dispatch() (inside a handler): runs inline via AsyncLocalStorage. - Handler responses / post-update drain: reentrant, dispatched inline in order. - request() response: intercepted at dispatch() entry to avoid deadlock when a handler is awaiting the request. Delta vs Java: - AsyncLocalStorage replaces Thread.currentThread() for reentrancy detection. - ActionChannel + consumer loop replaces BlockingQueue + dedicated thread; no backpressure yet. - interceptPendingResponse() is Node-only; Java has no request/response. - dispose() rejects queued actions and pending requests instead of draining; Disposable.dispose() is synchronous so JoinAction/.join() has no equivalent. --- .../common/actions/action-dispatcher.spec.ts | 145 ++++++++++++++++-- .../src/common/actions/action-dispatcher.ts | 136 ++++++++-------- .../common/actions/client-action-handler.ts | 2 +- .../model/model-submission-handler.ts | 2 +- .../model/request-model-action-handler.ts | 6 +- .../features/progress/progress-service.ts | 8 +- packages/server/src/common/index.ts | 1 + .../server/src/common/protocol/glsp-server.ts | 15 +- packages/server/src/common/test/mock-util.ts | 4 - .../src/common/utils/action-channel.spec.ts | 120 +++++++++++++++ .../server/src/common/utils/action-channel.ts | 101 ++++++++++++ .../server/src/common/utils/promise-queue.ts | 4 + 12 files changed, 453 insertions(+), 91 deletions(-) create mode 100644 packages/server/src/common/utils/action-channel.spec.ts create mode 100644 packages/server/src/common/utils/action-channel.ts diff --git a/packages/server/src/common/actions/action-dispatcher.spec.ts b/packages/server/src/common/actions/action-dispatcher.spec.ts index 8444f70..d8b0411 100644 --- a/packages/server/src/common/actions/action-dispatcher.spec.ts +++ b/packages/server/src/common/actions/action-dispatcher.spec.ts @@ -1,5 +1,5 @@ /******************************************************************************** - * Copyright (c) 2022-2023 STMicroelectronics and others. + * Copyright (c) 2022-2026 STMicroelectronics and others. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v. 2.0 which is available at @@ -338,9 +338,7 @@ describe('test DefaultActionDispatcher', () => { expect(result.responseId).to.equal('req_deadlock'); return []; }; - registry_get_stub.callsFake((kind: string) => - kind === slowAction ? [slowHandler] : [] - ); + registry_get_stub.callsFake((kind: string) => (kind === slowAction ? [slowHandler] : [])); const dispatchPromise = actionDispatcher.dispatch({ kind: slowAction }); await handlerRunning.promise; @@ -359,9 +357,7 @@ describe('test DefaultActionDispatcher', () => { const response: ResponseAction = { kind: localResponseKind, responseId: '' }; return [response]; }); - registry_get_stub.callsFake((kind: string) => - kind === localRequestKind ? [handler] : [] - ); + registry_get_stub.callsFake((kind: string) => (kind === localRequestKind ? [handler] : [])); const requestAction: RequestAction = { kind: localRequestKind, @@ -488,9 +484,7 @@ describe('test DefaultActionDispatcher', () => { clientActionForwarderStub.handle.callsFake(action => action.kind === 'testRequest'); // Register a no-op handler for testResponse so the late response can be dispatched normally const noopHandler = new mock.StubActionHandler(['testResponse']); - registry_get_stub.callsFake((kind: string) => - kind === 'testResponse' ? [noopHandler] : [] - ); + registry_get_stub.callsFake((kind: string) => (kind === 'testResponse' ? [noopHandler] : [])); // Request times out try { @@ -506,6 +500,100 @@ describe('test DefaultActionDispatcher', () => { expect(lateResponse.responseId).to.equal(''); }); + it('request - resolves when response intercept happens from inside doDispatch', async () => { + // A local handler for the request kind returns the matching response action directly. + // The response is dispatched via dispatchResponses() -> dispatch() and intercepted + // via the reentrant path (not via the external dispatch() entry). + const requestKind = 'inlineRequest'; + const responseKind = 'inlineResponse'; + + const handler = new mock.StubActionHandler([requestKind]); + sinon.stub(handler, 'execute').callsFake(() => [{ kind: responseKind, responseId: '' } as ResponseAction]); + registry_get_stub.callsFake((kind: string) => (kind === requestKind ? [handler] : [])); + + const requestAction: RequestAction = { kind: requestKind, requestId: '' }; + const result = await actionDispatcher.request(requestAction); + + expect(result.responseId).to.equal(requestAction.requestId); + }); + }); + + describe('test reentrancy and ordering', () => { + it('dispatch from within a handler runs inline before the next queued action', async () => { + const outerKind = 'reentrantOuter'; + const innerKind = 'reentrantInner'; + const followerKind = 'reentrantFollower'; + + const events: string[] = []; + + const innerHandler = new mock.StubActionHandler([innerKind]); + sinon.stub(innerHandler, 'execute').callsFake(() => { + events.push('inner'); + return []; + }); + + const outerHandler = new mock.StubActionHandler([outerKind]); + outerHandler.execute = async () => { + events.push('outer-start'); + await actionDispatcher.dispatch({ kind: innerKind }); + events.push('outer-end'); + return []; + }; + + const followerHandler = new mock.StubActionHandler([followerKind]); + sinon.stub(followerHandler, 'execute').callsFake(() => { + events.push('follower'); + return []; + }); + + registry_get_stub.callsFake((kind: string) => { + if (kind === outerKind) return [outerHandler]; + if (kind === innerKind) return [innerHandler]; + if (kind === followerKind) return [followerHandler]; + return []; + }); + + actionDispatcher.dispatch({ kind: outerKind }); + await actionDispatcher.dispatch({ kind: followerKind }); + + expect(events).to.deep.equal(['outer-start', 'inner', 'outer-end', 'follower']); + }); + + it('handler response actions are dispatched in order without an ephemeral queue', async () => { + const requestKind = 'orderedRequest'; + const firstResponse = 'orderedResponse1'; + const secondResponse = 'orderedResponse2'; + const order: string[] = []; + + const requestHandler = new mock.StubActionHandler([requestKind]); + sinon.stub(requestHandler, 'execute').callsFake(() => [{ kind: firstResponse }, { kind: secondResponse }]); + + const firstHandler = new mock.StubActionHandler([firstResponse]); + sinon.stub(firstHandler, 'execute').callsFake(async () => { + await mock.delay(20); + order.push(firstResponse); + return []; + }); + + const secondHandler = new mock.StubActionHandler([secondResponse]); + sinon.stub(secondHandler, 'execute').callsFake(() => { + order.push(secondResponse); + return []; + }); + + registry_get_stub.callsFake((kind: string) => { + if (kind === requestKind) return [requestHandler]; + if (kind === firstResponse) return [firstHandler]; + if (kind === secondResponse) return [secondHandler]; + return []; + }); + + await actionDispatcher.dispatch({ kind: requestKind }); + expect(order).to.deep.equal([firstResponse, secondResponse]); + }); + }); + + describe('test dispose', () => { it('dispose - rejects all pending requests', async () => { const requestAction: RequestAction = { kind: 'testRequest', @@ -517,7 +605,7 @@ describe('test DefaultActionDispatcher', () => { registry_get_stub.callsFake(() => []); const responsePromise = actionDispatcher.request(requestAction); - (actionDispatcher as any).dispose(); + actionDispatcher.dispose(); try { await responsePromise; @@ -527,5 +615,40 @@ describe('test DefaultActionDispatcher', () => { expect((error as Error).message).to.include('req_dispose'); } }); + + it('dispose - rejects queued dispatch() promises instead of orphaning them', async () => { + // Use a fresh dispatcher so the shared one is not affected. + const localDispatcher = container.resolve(DefaultActionDispatcher); + const slowKind = 'slowDispose'; + const queuedKind = 'queuedDispose'; + + const slowHandler = new mock.StubActionHandler([slowKind]); + const slowStarted = new Deferred(); + const releaseSlow = new Deferred(); + slowHandler.execute = async () => { + slowStarted.resolve(); + await releaseSlow.promise; + return []; + }; + + registry_get_stub.callsFake((kind: string) => (kind === slowKind ? [slowHandler] : [])); + + const slowPromise = localDispatcher.dispatch({ kind: slowKind }); + await slowStarted.promise; + const queuedPromise = localDispatcher.dispatch({ kind: queuedKind }); + + localDispatcher.dispose(); + + try { + await queuedPromise; + expect.fail('Queued dispatch should have rejected'); + } catch (error: unknown) { + expect((error as Error).message).to.include('ActionDispatcher disposed'); + } + + // Let the slow handler finish so the local dispatcher's consumer loop can exit cleanly. + releaseSlow.resolve(); + await slowPromise; + }); }); }); diff --git a/packages/server/src/common/actions/action-dispatcher.ts b/packages/server/src/common/actions/action-dispatcher.ts index c95c2b7..ec53793 100644 --- a/packages/server/src/common/actions/action-dispatcher.ts +++ b/packages/server/src/common/actions/action-dispatcher.ts @@ -1,5 +1,5 @@ /******************************************************************************** - * Copyright (c) 2022-2023 STMicroelectronics and others. + * Copyright (c) 2022-2026 STMicroelectronics and others. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v. 2.0 which is available at @@ -25,11 +25,12 @@ import { UpdateModelAction, flatPush } from '@eclipse-glsp/protocol'; -import { inject, injectable } from 'inversify'; +import { AsyncLocalStorage } from 'async_hooks'; +import { inject, injectable, postConstruct } from 'inversify'; import { ClientId } from '../di/service-identifiers'; import { GLSPServerError } from '../utils/glsp-server-error'; +import { ActionChannel } from '../utils/action-channel'; import { Logger } from '../utils/logger'; -import { PromiseQueue } from '../utils/promise-queue'; import { ActionHandler } from './action-handler'; import { ActionHandlerRegistry } from './action-handler-registry'; import { ClientActionForwarder } from './client-action-handler'; @@ -43,8 +44,15 @@ export const ActionDispatcher = Symbol('ActionDispatcher'); export interface ActionDispatcher { /** * Processes the given action by dispatching it to all registered handlers. - * Actions are enqueued to preserve sequential ordering. Response actions - * produced by handlers are dispatched directly via {@link dispatchDirectly}. + * + * External callers (e.g. the transport layer or background jobs) are serialized through the + * internal channel. Dispatches that originate from within an action handler are recognized + * via the {@link AsyncLocalStorage} dispatch context and run inline to preserve ordering + * with the containing action. This mirrors the Java dispatcher's `Thread.currentThread()` + * check. + * + * Responses to pending {@link request} calls short-circuit at the entry and resolve the + * corresponding deferred without going through the queue or handlers. * * @param action The action that should be dispatched. * @returns A promise indicating when the action processing is complete. @@ -60,18 +68,6 @@ export interface ActionDispatcher { dispatchAll(actions: Action[]): Promise; dispatchAll(...actions: Action[]): Promise; - /** - * Dispatches an action directly, bypassing the action queue. Use this for actions that - * need to be processed immediately, e.g. progress notifications sent from inside a handler. - * - * Actions dispatched this way are not sequenced with other queued actions. Callers are - * responsible for ensuring that concurrent execution is safe. - * - * @param action The action to dispatch directly. - * @returns A promise indicating when the action processing is complete. - */ - dispatchDirectly(action: Action): Promise; - /** * Processes all given actions, by dispatching them to the corresponding handlers, after the next model update. * The given actions are queued until the next model update cycle has been completed i.e. an @@ -88,9 +84,9 @@ export interface ActionDispatcher { * _not_ passed to the registered action handlers. Instead, it is the responsibility of the * caller of this method to handle the response properly. * - * The request is dispatched directly (bypassing the queue). If its `kind` is registered in - * `ClientActionKinds`, it is forwarded to the client via {@link ClientActionForwarder}. - * If server-side handlers are registered, they are also executed. + * If the request's `kind` is registered in `ClientActionKinds`, it is forwarded to the client + * via {@link ClientActionForwarder}. If server-side handlers are registered, they are also + * executed. * * Only the first matching response resolves the request. Any additional or late responses * are dispatched as normal actions. @@ -139,12 +135,20 @@ export class DefaultActionDispatcher implements ActionDispatcher, Disposable { @inject(ClientId) protected clientId: string; - protected actionQueue = new PromiseQueue(); + protected channel = new ActionChannel(); + protected dispatchContext = new AsyncLocalStorage(); + protected consumerLoop: Promise | undefined; + protected postUpdateQueue: Action[] = []; protected readonly pendingRequests = new Map>(); protected readonly timeouts = new Map(); protected nextRequestId = 1; + @postConstruct() + protected initialize(): void { + this.consumerLoop = this.runConsumerLoop(); + } + /** * Generates a unique request ID for this dispatcher. The `clientId` prefix makes IDs * distinguishable across sessions in logs. @@ -154,23 +158,35 @@ export class DefaultActionDispatcher implements ActionDispatcher, Disposable { } dispatch(action: Action): Promise { + // Order matters: + // 1. Responses to pending request() calls must short-circuit here; queueing would + // deadlock if the awaiting handler is the action the consumer is currently running. + // 2. Reentrant dispatches from inside a handler run inline via the AsyncLocalStorage + // context. Equivalent to the Java thread-identity check. + // 3. Everything else is external and is queued to the channel. if (this.interceptPendingResponse(action)) { return Promise.resolve(); } - return this.actionQueue.enqueue(() => this.doDispatch(action)); + if (this.dispatchContext.getStore()) { + return this.doDispatch(action); + } + return this.channel.push(action); } - dispatchDirectly(action: Action): Promise { - return this.doDispatch(action); + protected async runConsumerLoop(): Promise { + // Enter the dispatch context for each dequeued action so nested dispatch() calls from + // handlers and their awaited continuations are recognized as reentrant. + for await (const entry of this.channel.consume()) { + try { + await this.dispatchContext.run(true, () => this.doDispatch(entry.item)); + entry.resolve(); + } catch (error) { + entry.reject(error); + } + } } protected async doDispatch(action: Action): Promise { - // Intercept responses to pending requests produced by local handlers - // (via dispatchResponses). This enables server→server requests. - if (this.interceptPendingResponse(action)) { - return; - } - this.logger.debug('Dispatch action:', action.kind); const handledOnClient = this.clientActionForwarder.handle(action); @@ -185,7 +201,8 @@ export class DefaultActionDispatcher implements ActionDispatcher, Disposable { responses.push(...response); } - // Append post-update actions to responses (preserves original dispatch ordering) + // Append post-update actions to responses so they are dispatched in the same inline + // batch as the handler responses, preserving sequential order. responses.push(...this.drainPostUpdateQueue(action)); await this.dispatchResponses(responses); @@ -196,17 +213,12 @@ export class DefaultActionDispatcher implements ActionDispatcher, Disposable { return responseActions.map(action => respond(request, action)); } - /** - * Dispatches response actions produced by handlers. Responses are dispatched directly - * (bypassing the queue) but sequenced relative to each other via an internal response queue. - */ - protected dispatchResponses(actions: Action[]): Promise { - if (actions.length === 0) { - return Promise.resolve(); + protected async dispatchResponses(actions: Action[]): Promise { + // Sequential dispatch inside the current dispatch context. Each response goes inline via + // the reentrant path, or is intercepted if it resolves a pending request(). + for (const action of actions) { + await this.dispatch(action); } - const responseQueue = new PromiseQueue(); - const responses = actions.map(action => responseQueue.enqueue(() => this.dispatchDirectly(action))); - return Promise.all(responses).then(() => Promise.resolve()); } dispatchAll(...actions: MaybeArray[]): Promise { @@ -271,10 +283,10 @@ export class DefaultActionDispatcher implements ActionDispatcher, Disposable { this.timeouts.set(action.requestId, timeout); } - // Always dispatch directly to avoid deadlock when request() is called - // from inside a queued handler. The response is intercepted before - // normal dispatch, so queue ordering is not affected. - const dispatchPromise = this.dispatchDirectly(action); + // dispatch() routes correctly on its own: external callers queue, handler-internal + // callers run inline via the AsyncLocalStorage context. The matching response resolves + // the deferred out-of-band via interceptPendingResponse(). + const dispatchPromise = this.dispatch(action); dispatchPromise.catch(error => { if (this.pendingRequests.delete(action.requestId)) { @@ -292,8 +304,7 @@ export class DefaultActionDispatcher implements ActionDispatcher, Disposable { /** * If the given action is a {@link SetModelAction} or {@link UpdateModelAction} and there are actions - * queued via {@link dispatchAfterNextUpdate}, drain and return them. Used in both the normal - * `doDispatch()` path and the `interceptPendingResponse()` short-circuit path. + * queued via {@link dispatchAfterNextUpdate}, drain and return them. * * @returns The drained actions, or an empty array if nothing to drain. */ @@ -315,15 +326,13 @@ export class DefaultActionDispatcher implements ActionDispatcher, Disposable { * timeout entry (timed-out request) and clears the `responseId` so the action is not forwarded * by {@link ClientActionForwarder}. If no stale entry exists, the `responseId` is left intact * for normal forwarding. - * - * Called from `dispatch()` (for responses arriving from the client, before the queue) and - * from `doDispatch()` (for responses produced by local handlers via `dispatchDirectly`). */ protected interceptPendingResponse(action: Action): boolean { if (!ResponseAction.hasValidResponseId(action)) { return false; } - // Resolve pending request + // Node-only: responses to server-initiated requests are resolved here instead of going + // through action handlers. No Java equivalent. const deferred = this.pendingRequests.get(action.responseId); if (deferred !== undefined) { this.pendingRequests.delete(action.responseId); @@ -332,9 +341,9 @@ export class DefaultActionDispatcher implements ActionDispatcher, Disposable { clearTimeout(timeout); this.timeouts.delete(action.responseId); } - // Drain post-update actions. A RejectAction won't trigger a drain, - // so pending post-update actions remain queued until the next - // successful model update. + // Intercepted responses skip doDispatch, so drain post-update actions here when the + // response is an UpdateModel/SetModel. RejectAction does not trigger a drain; pending + // post-update actions stay queued until the next successful update. const postUpdateActions = this.drainPostUpdateQueue(action); if (RejectAction.is(action)) { deferred.reject(new Error(`${action.message}${action.detail ? ': ' + action.detail : ''}`)); @@ -346,8 +355,8 @@ export class DefaultActionDispatcher implements ActionDispatcher, Disposable { } return true; } - // No matching pending request — check for stale timeout (timed-out request). - // Only clear responseId if this response was tracked by request(). + // Late response for a timed-out request: clear responseId so ClientActionForwarder does + // not re-emit it to the client. const staleTimeout = this.timeouts.get(action.responseId); if (staleTimeout !== undefined) { clearTimeout(staleTimeout); @@ -358,14 +367,21 @@ export class DefaultActionDispatcher implements ActionDispatcher, Disposable { return false; } + /** + * Shuts down the dispatcher. Stops the channel, rejects queued actions and pending request() + * deferreds, and clears timers and the post-update queue. The action currently being + * processed runs to completion. + */ dispose(): void { - this.actionQueue.clear(); - this.pendingRequests.forEach((deferred, id) => - deferred.reject(new Error(`Request '${id}' cancelled: dispatcher disposed`)) - ); + // No graceful drain: Disposable.dispose() is synchronous so we cannot await the consumer + // loop. Java's JoinAction/.join() pattern has no equivalent. + this.channel.rejectPending(new Error('ActionDispatcher disposed')); + this.channel.stop(); + this.pendingRequests.forEach((deferred, id) => deferred.reject(new Error(`Request '${id}' cancelled: dispatcher disposed`))); this.pendingRequests.clear(); this.timeouts.forEach(timeout => clearTimeout(timeout)); this.timeouts.clear(); + this.postUpdateQueue = []; } } diff --git a/packages/server/src/common/actions/client-action-handler.ts b/packages/server/src/common/actions/client-action-handler.ts index 13942c1..36c26ed 100644 --- a/packages/server/src/common/actions/client-action-handler.ts +++ b/packages/server/src/common/actions/client-action-handler.ts @@ -1,5 +1,5 @@ /******************************************************************************** - * Copyright (c) 2022-2023 STMicroelectronics and others. + * Copyright (c) 2022-2026 STMicroelectronics and others. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v. 2.0 which is available at diff --git a/packages/server/src/common/features/model/model-submission-handler.ts b/packages/server/src/common/features/model/model-submission-handler.ts index fe0555b..f3e05ef 100644 --- a/packages/server/src/common/features/model/model-submission-handler.ts +++ b/packages/server/src/common/features/model/model-submission-handler.ts @@ -180,7 +180,7 @@ export class ModelSubmissionHandler { } protected async performLiveValidation(validator: ModelValidator): Promise { - this.actionDispatcher.dispatchDirectly(StatusAction.create('Validate Model...')); + this.actionDispatcher.dispatch(StatusAction.create('Validate Model...')); const markers = await validator.validate([this.modelState.root], MarkersReason.LIVE); return this.actionDispatcher.dispatchAll( SetMarkersAction.create(markers, { reason: MarkersReason.LIVE }), diff --git a/packages/server/src/common/features/model/request-model-action-handler.ts b/packages/server/src/common/features/model/request-model-action-handler.ts index 567b260..0b71e21 100644 --- a/packages/server/src/common/features/model/request-model-action-handler.ts +++ b/packages/server/src/common/features/model/request-model-action-handler.ts @@ -1,5 +1,5 @@ /******************************************************************************** - * Copyright (c) 2022-2025 STMicroelectronics and others. + * Copyright (c) 2022-2026 STMicroelectronics and others. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v. 2.0 which is available at @@ -81,12 +81,12 @@ export class RequestModelActionHandler implements ActionHandler { } protected reportModelLoading(message: string): ProgressMonitor | undefined { - this.actionDispatcher.dispatchDirectly(StatusAction.create(message, { severity: 'INFO' })); + this.actionDispatcher.dispatch(StatusAction.create(message, { severity: 'INFO' })); return this.progressService.start(message); } protected reportModelLoadingFinished(monitor?: ProgressMonitor): void { - this.actionDispatcher.dispatchDirectly(StatusAction.create('', { severity: 'NONE' })); + this.actionDispatcher.dispatch(StatusAction.create('', { severity: 'NONE' })); monitor?.end(); } diff --git a/packages/server/src/common/features/progress/progress-service.ts b/packages/server/src/common/features/progress/progress-service.ts index dffa3b6..a7f58a1 100644 --- a/packages/server/src/common/features/progress/progress-service.ts +++ b/packages/server/src/common/features/progress/progress-service.ts @@ -1,5 +1,5 @@ /******************************************************************************** - * Copyright (c) 2023 EclipseSource and others. + * Copyright (c) 2023-2026 EclipseSource and others. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v. 2.0 which is available at @@ -65,10 +65,10 @@ export class DefaultProgressService implements ProgressService { start(title: string, options?: ProgressOptions): ProgressMonitor { const progressId = uuid.v4(); - this.actionDispatcher.dispatchDirectly(StartProgressAction.create({ progressId, title, ...options })); + this.actionDispatcher.dispatch(StartProgressAction.create({ progressId, title, ...options })); return { - update: updateOptions => this.actionDispatcher.dispatchDirectly(UpdateProgressAction.create(progressId, updateOptions)), - end: () => this.actionDispatcher.dispatchDirectly(EndProgressAction.create(progressId)) + update: updateOptions => this.actionDispatcher.dispatch(UpdateProgressAction.create(progressId, updateOptions)), + end: () => this.actionDispatcher.dispatch(EndProgressAction.create(progressId)) }; } } diff --git a/packages/server/src/common/index.ts b/packages/server/src/common/index.ts index da86f5a..ae6fe43 100644 --- a/packages/server/src/common/index.ts +++ b/packages/server/src/common/index.ts @@ -95,6 +95,7 @@ export * from './session/client-session-factory'; export * from './session/client-session-initializer'; export * from './session/client-session-listener'; export * from './session/client-session-manager'; +export * from './utils/action-channel'; export * from './utils/args-util'; export * from './utils/client-options-util'; export * from './utils/console-logger'; diff --git a/packages/server/src/common/protocol/glsp-server.ts b/packages/server/src/common/protocol/glsp-server.ts index a5325a4..76a4cba 100644 --- a/packages/server/src/common/protocol/glsp-server.ts +++ b/packages/server/src/common/protocol/glsp-server.ts @@ -167,19 +167,20 @@ export class DefaultGLSPServer implements GLSPServer { clientId: string ): Promise { try { - const response = action.timeout !== undefined - ? await clientSession.actionDispatcher.requestUntil(action, action.timeout, true) - : await clientSession.actionDispatcher.request(action); + const response = + action.timeout !== undefined + ? await clientSession.actionDispatcher.requestUntil(action, action.timeout, true) + : await clientSession.actionDispatcher.request(action); if (response) { this.sendResponseToClient(clientId, response); } } catch (error) { const detail = error instanceof GLSPServerError ? error.cause?.toString?.() : error?.toString?.(); this.logger.error(`Failed to handle request '${action.kind}' (${action.requestId}):`, detail); - const reject = RejectAction.create( - `Failed to handle request '${action.kind}' (${action.requestId})`, - { responseId: action.requestId, detail } - ); + const reject = RejectAction.create(`Failed to handle request '${action.kind}' (${action.requestId})`, { + responseId: action.requestId, + detail + }); this.sendResponseToClient(clientId, reject); } } diff --git a/packages/server/src/common/test/mock-util.ts b/packages/server/src/common/test/mock-util.ts index 6ecc113..76914cc 100644 --- a/packages/server/src/common/test/mock-util.ts +++ b/packages/server/src/common/test/mock-util.ts @@ -135,10 +135,6 @@ export class StubActionDispatcher implements ActionDispatcher { return Promise.resolve(); } - dispatchDirectly(action: Action): Promise { - return Promise.resolve(); - } - dispatchAll(...actions: MaybeArray[]): Promise { return Promise.resolve(); } diff --git a/packages/server/src/common/utils/action-channel.spec.ts b/packages/server/src/common/utils/action-channel.spec.ts new file mode 100644 index 0000000..08ad664 --- /dev/null +++ b/packages/server/src/common/utils/action-channel.spec.ts @@ -0,0 +1,120 @@ +/******************************************************************************** + * Copyright (c) 2026 EclipseSource and others. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * This Source Code may also be made available under the following Secondary + * Licenses when the conditions for such availability set forth in the Eclipse + * Public License v. 2.0 are satisfied: GNU General Public License, version 2 + * with the GNU Classpath Exception which is available at + * https://www.gnu.org/software/classpath/license.html. + * + * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + ********************************************************************************/ +import { expect } from 'chai'; +import { expectToThrowAsync } from '../test/mock-util'; +import { ActionChannel } from './action-channel'; + +describe('ActionChannel', () => { + it('yields pushed items in FIFO order', async () => { + const channel = new ActionChannel(); + const consumed: number[] = []; + + const consumer = (async (): Promise => { + for await (const entry of channel.consume()) { + consumed.push(entry.item); + entry.resolve(); + } + })(); + + await Promise.all([channel.push(1), channel.push(2), channel.push(3)]); + channel.stop(); + await consumer; + + expect(consumed).to.deep.equal([1, 2, 3]); + }); + + it('resolves the push promise once the consumer resolves the entry', async () => { + const channel = new ActionChannel(); + let entryResolver: (() => void) | undefined; + + const consumer = (async (): Promise => { + for await (const entry of channel.consume()) { + entryResolver = entry.resolve; + return; + } + })(); + + const pushed = channel.push('a'); + // Give the consumer a turn to pick up the entry. + await Promise.resolve(); + await consumer; + expect(entryResolver).to.exist; + entryResolver!(); + await pushed; + }); + + it('propagates reject() from the consumer back to the pushing caller', async () => { + const channel = new ActionChannel(); + + const consumer = (async (): Promise => { + for await (const entry of channel.consume()) { + entry.reject(new Error('boom')); + return; + } + })(); + + const pushed = channel.push(1); + await consumer; + await expectToThrowAsync(() => pushed, 'boom'); + }); + + it('rejects push() after stop()', async () => { + const channel = new ActionChannel(); + channel.stop(); + await expectToThrowAsync(() => channel.push(1), 'ActionChannel is stopped'); + }); + + it('consumer exits after stop() and drain', async () => { + const channel = new ActionChannel(); + const consumed: number[] = []; + + const consumer = (async (): Promise => { + for await (const entry of channel.consume()) { + consumed.push(entry.item); + entry.resolve(); + } + })(); + + await channel.push(1); + await channel.push(2); + channel.stop(); + await consumer; + + expect(consumed).to.deep.equal([1, 2]); + expect(channel.isStopped).to.be.true; + }); + + it('rejectPending() rejects all queued push() promises without stopping', async () => { + const channel = new ActionChannel(); + const pushes = [channel.push(1), channel.push(2)]; + expect(channel.size).to.equal(2); + + channel.rejectPending(new Error('cleared')); + + await expectToThrowAsync(() => pushes[0], 'cleared'); + await expectToThrowAsync(() => pushes[1], 'cleared'); + expect(channel.size).to.equal(0); + expect(channel.isStopped).to.be.false; + }); + + it('size reflects the number of unconsumed entries', async () => { + const channel = new ActionChannel(); + channel.push(1); + channel.push(2); + channel.push(3); + expect(channel.size).to.equal(3); + }); +}); diff --git a/packages/server/src/common/utils/action-channel.ts b/packages/server/src/common/utils/action-channel.ts new file mode 100644 index 0000000..5cc498b --- /dev/null +++ b/packages/server/src/common/utils/action-channel.ts @@ -0,0 +1,101 @@ +/******************************************************************************** + * Copyright (c) 2026 EclipseSource and others. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * This Source Code may also be made available under the following Secondary + * Licenses when the conditions for such availability set forth in the Eclipse + * Public License v. 2.0 are satisfied: GNU General Public License, version 2 + * with the GNU Classpath Exception which is available at + * https://www.gnu.org/software/classpath/license.html. + * + * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + ********************************************************************************/ + +/** + * An entry yielded by {@link ActionChannel.consume}. The consumer must call either + * `resolve()` or `reject(error)` exactly once after processing `item`. + */ +export interface ActionChannelEntry { + item: T; + resolve: () => void; + reject: (error: unknown) => void; +} + +/** + * Producer/consumer channel with a single async consumer loop. Mirrors the Java + * dispatcher's `BlockingQueue` + consumer thread architecture. + * + * Items pushed via {@link push} are yielded by {@link consume} in FIFO order. + * The promise returned by `push()` resolves or rejects when the consumer finishes + * processing the item (so producers can propagate errors back to callers). + */ +export class ActionChannel { + protected queue: ActionChannelEntry[] = []; + protected notify: (() => void) | undefined; + protected stopped = false; + + /** + * Enqueues an item and returns a promise that settles when the consumer processes it. + * Rejects immediately if the channel has been stopped. + */ + push(item: T): Promise { + if (this.stopped) { + return Promise.reject(new Error('ActionChannel is stopped')); + } + return new Promise((resolve, reject) => { + this.queue.push({ item, resolve, reject }); + this.notify?.(); + }); + } + + /** + * Yields pending entries in FIFO order, blocking on a notification promise when empty. + * Exits once the channel is stopped and the queue has been drained. + */ + async *consume(): AsyncGenerator> { + while (!this.stopped || this.queue.length > 0) { + while (this.queue.length > 0) { + yield this.queue.shift()!; + } + if (this.stopped) { + return; + } + await new Promise(resolve => { + this.notify = resolve; + }); + this.notify = undefined; + } + } + + /** + * Stops the channel. Further {@link push} calls reject. The consumer loop exits after + * the remaining queued entries have been yielded (or immediately if the queue is empty). + */ + stop(): void { + this.stopped = true; + this.notify?.(); + } + + /** + * Rejects all queued entries with the given reason so producers awaiting their + * `push()` promises do not hang. Does not stop the channel. + */ + rejectPending(reason: Error = new Error('ActionChannel cleared')): void { + const pending = this.queue; + this.queue = []; + for (const entry of pending) { + entry.reject(reason); + } + } + + get size(): number { + return this.queue.length; + } + + get isStopped(): boolean { + return this.stopped; + } +} diff --git a/packages/server/src/common/utils/promise-queue.ts b/packages/server/src/common/utils/promise-queue.ts index ae256a1..83bd02e 100644 --- a/packages/server/src/common/utils/promise-queue.ts +++ b/packages/server/src/common/utils/promise-queue.ts @@ -29,6 +29,10 @@ export interface PromiseQueueElement { * of promises. Promises that are put in this queue are processed one by one. * i.e. After the first promise in the queue is resolved, it will be removed from the queue and the resolving of the * the next promise (if present) will start. The queue can only resolve one promise at a given time. + * + * @deprecated Since 2.7. The `DefaultActionDispatcher` no longer uses this queue. Kept for + * backwards compatibility; will be removed in a future release. New code should use + * {@link ActionChannel} or native async patterns instead. */ export class PromiseQueue { protected queue: PromiseQueueElement[] = []; From 19868025798065b20fdf4d86b97817dd2a522f89 Mon Sep 17 00:00:00 2001 From: Martin Fleck Date: Fri, 24 Apr 2026 10:26:32 +0200 Subject: [PATCH 4/9] refactor: address ActionDispatcher review feedback - Trim dispatch() interface JSDoc; move reentrancy/queue prose to class doc - Drop unused consumerLoop field; start loop fire-and-forget - Drop verbose comments on generateRequestId and dispose - Replace dispatch() order-matters block with per-branch one-liners - Simplify ActionChannel JSDoc; drop Java reference - Guard ActionChannel against multiple consumers (throw + JSDoc) --- .../src/common/actions/action-dispatcher.ts | 42 +++++------------ packages/server/src/common/index.ts | 2 +- .../src/common/utils/action-channel.spec.ts | 13 ++++++ .../server/src/common/utils/action-channel.ts | 46 +++++++++++-------- .../server/src/common/utils/promise-queue.ts | 2 +- 5 files changed, 53 insertions(+), 52 deletions(-) diff --git a/packages/server/src/common/actions/action-dispatcher.ts b/packages/server/src/common/actions/action-dispatcher.ts index ec53793..336a399 100644 --- a/packages/server/src/common/actions/action-dispatcher.ts +++ b/packages/server/src/common/actions/action-dispatcher.ts @@ -44,15 +44,8 @@ export const ActionDispatcher = Symbol('ActionDispatcher'); export interface ActionDispatcher { /** * Processes the given action by dispatching it to all registered handlers. - * - * External callers (e.g. the transport layer or background jobs) are serialized through the - * internal channel. Dispatches that originate from within an action handler are recognized - * via the {@link AsyncLocalStorage} dispatch context and run inline to preserve ordering - * with the containing action. This mirrors the Java dispatcher's `Thread.currentThread()` - * check. - * - * Responses to pending {@link request} calls short-circuit at the entry and resolve the - * corresponding deferred without going through the queue or handlers. + * Responses matching a pending {@link request} short-circuit and resolve that + * request without being passed to handlers. * * @param action The action that should be dispatched. * @returns A promise indicating when the action processing is complete. @@ -121,6 +114,10 @@ export interface ActionDispatcher { ): Promise; } +/** + * Default {@link ActionDispatcher}. External dispatches are queued and processed one at a + * time; dispatches made from within a running handler run inline with the containing action. + */ @injectable() export class DefaultActionDispatcher implements ActionDispatcher, Disposable { @inject(ActionHandlerRegistry) @@ -137,7 +134,6 @@ export class DefaultActionDispatcher implements ActionDispatcher, Disposable { protected channel = new ActionChannel(); protected dispatchContext = new AsyncLocalStorage(); - protected consumerLoop: Promise | undefined; protected postUpdateQueue: Action[] = []; protected readonly pendingRequests = new Map>(); @@ -146,36 +142,28 @@ export class DefaultActionDispatcher implements ActionDispatcher, Disposable { @postConstruct() protected initialize(): void { - this.consumerLoop = this.runConsumerLoop(); + this.runConsumerLoop(); } - /** - * Generates a unique request ID for this dispatcher. The `clientId` prefix makes IDs - * distinguishable across sessions in logs. - */ protected generateRequestId(): string { return `server_${this.clientId}_${this.nextRequestId++}`; } dispatch(action: Action): Promise { - // Order matters: - // 1. Responses to pending request() calls must short-circuit here; queueing would - // deadlock if the awaiting handler is the action the consumer is currently running. - // 2. Reentrant dispatches from inside a handler run inline via the AsyncLocalStorage - // context. Equivalent to the Java thread-identity check. - // 3. Everything else is external and is queued to the channel. + // Intercept first to avoid deadlock: a handler may be awaiting this response. if (this.interceptPendingResponse(action)) { return Promise.resolve(); } + // Reentrant dispatches run inline to preserve ordering with the containing action. if (this.dispatchContext.getStore()) { return this.doDispatch(action); } + // External dispatches are queued and processed sequentially. return this.channel.push(action); } protected async runConsumerLoop(): Promise { - // Enter the dispatch context for each dequeued action so nested dispatch() calls from - // handlers and their awaited continuations are recognized as reentrant. + // Run each action inside the dispatch context so reentrant dispatch() calls are recognized. for await (const entry of this.channel.consume()) { try { await this.dispatchContext.run(true, () => this.doDispatch(entry.item)); @@ -367,14 +355,8 @@ export class DefaultActionDispatcher implements ActionDispatcher, Disposable { return false; } - /** - * Shuts down the dispatcher. Stops the channel, rejects queued actions and pending request() - * deferreds, and clears timers and the post-update queue. The action currently being - * processed runs to completion. - */ dispose(): void { - // No graceful drain: Disposable.dispose() is synchronous so we cannot await the consumer - // loop. Java's JoinAction/.join() pattern has no equivalent. + // Reject queued actions: no further processing should happen after dispose. this.channel.rejectPending(new Error('ActionDispatcher disposed')); this.channel.stop(); this.pendingRequests.forEach((deferred, id) => deferred.reject(new Error(`Request '${id}' cancelled: dispatcher disposed`))); diff --git a/packages/server/src/common/index.ts b/packages/server/src/common/index.ts index ae6fe43..265e5f5 100644 --- a/packages/server/src/common/index.ts +++ b/packages/server/src/common/index.ts @@ -1,5 +1,5 @@ /******************************************************************************** - * Copyright (c) 2022-2024 STMicroelectronics and others. + * Copyright (c) 2022-2026 STMicroelectronics and others. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v. 2.0 which is available at diff --git a/packages/server/src/common/utils/action-channel.spec.ts b/packages/server/src/common/utils/action-channel.spec.ts index 08ad664..0d92bc4 100644 --- a/packages/server/src/common/utils/action-channel.spec.ts +++ b/packages/server/src/common/utils/action-channel.spec.ts @@ -117,4 +117,17 @@ describe('ActionChannel', () => { channel.push(3); expect(channel.size).to.equal(3); }); + + it('throws when a second consumer is started', async () => { + const channel = new ActionChannel(); + const first = channel.consume(); + // Kick off the first consumer so it registers as the active consumer. + const firstStep = first.next(); + + const second = channel.consume(); + await expectToThrowAsync(() => second.next().then(() => undefined), 'ActionChannel supports only a single consumer'); + + channel.stop(); + await firstStep; + }); }); diff --git a/packages/server/src/common/utils/action-channel.ts b/packages/server/src/common/utils/action-channel.ts index 5cc498b..28d2736 100644 --- a/packages/server/src/common/utils/action-channel.ts +++ b/packages/server/src/common/utils/action-channel.ts @@ -25,21 +25,17 @@ export interface ActionChannelEntry { } /** - * Producer/consumer channel with a single async consumer loop. Mirrors the Java - * dispatcher's `BlockingQueue` + consumer thread architecture. - * - * Items pushed via {@link push} are yielded by {@link consume} in FIFO order. - * The promise returned by `push()` resolves or rejects when the consumer finishes - * processing the item (so producers can propagate errors back to callers). + * Producer/consumer channel with a single async consumer loop. Items are processed in FIFO order. */ export class ActionChannel { protected queue: ActionChannelEntry[] = []; protected notify: (() => void) | undefined; protected stopped = false; + protected consuming = false; /** - * Enqueues an item and returns a promise that settles when the consumer processes it. - * Rejects immediately if the channel has been stopped. + * Enqueues an item. The returned promise settles when the consumer finishes processing it, + * propagating results back to the producer. Rejects immediately if the channel has been stopped. */ push(item: T): Promise { if (this.stopped) { @@ -52,21 +48,31 @@ export class ActionChannel { } /** - * Yields pending entries in FIFO order, blocking on a notification promise when empty. - * Exits once the channel is stopped and the queue has been drained. + * Yields pending entries, suspending until the next {@link push} when the queue is empty. Exits + * once the channel is stopped and the queue has been drained. + * + * Single-consumer: calling `consume()` a second time throws an error. */ async *consume(): AsyncGenerator> { - while (!this.stopped || this.queue.length > 0) { - while (this.queue.length > 0) { - yield this.queue.shift()!; - } - if (this.stopped) { - return; + if (this.consuming) { + throw new Error('ActionChannel supports only a single consumer'); + } + this.consuming = true; + try { + while (!this.stopped || this.queue.length > 0) { + while (this.queue.length > 0) { + yield this.queue.shift()!; + } + if (this.stopped) { + return; + } + await new Promise(resolve => { + this.notify = resolve; + }); + this.notify = undefined; } - await new Promise(resolve => { - this.notify = resolve; - }); - this.notify = undefined; + } finally { + this.consuming = false; } } diff --git a/packages/server/src/common/utils/promise-queue.ts b/packages/server/src/common/utils/promise-queue.ts index 83bd02e..b2cc064 100644 --- a/packages/server/src/common/utils/promise-queue.ts +++ b/packages/server/src/common/utils/promise-queue.ts @@ -1,5 +1,5 @@ /******************************************************************************** - * Copyright (c) 2022-2023 STMicroelectronics and others. + * Copyright (c) 2022-2026 STMicroelectronics and others. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v. 2.0 which is available at From 0af3f6212d98111aa1a28274d63214c46e6c410e Mon Sep 17 00:00:00 2001 From: Martin Fleck Date: Fri, 24 Apr 2026 12:09:22 +0200 Subject: [PATCH 5/9] refactor: decouple AsyncLocalStorage via DI - Introduce ActionDispatchContext interface + DI token in common - Dispatcher injects the context instance directly (toDynamicValue) - Node app-module binds native AsyncLocalStorage from async_hooks - Browser app-module binds AsyncLocalStorage from als-browser polyfill - Keeps the common package runtime-neutral for browser bundling --- packages/server/package.json | 1 + packages/server/src/browser/di/app-module.ts | 7 +++++-- .../src/common/actions/action-dispatcher.ts | 9 +++++---- .../src/common/di/service-identifiers.ts | 13 ++++++++++++- .../src/common/utils/promise-queue.spec.ts | 8 ++++++-- .../actions/action-dispatcher.spec.ts | 18 ++++++++++-------- packages/server/src/node/di/app-module.ts | 6 ++++-- yarn.lock | 5 +++++ 8 files changed, 48 insertions(+), 19 deletions(-) rename packages/server/src/{common => node}/actions/action-dispatcher.spec.ts (97%) diff --git a/packages/server/package.json b/packages/server/package.json index 9b156c4..ffa09bf 100644 --- a/packages/server/package.json +++ b/packages/server/package.json @@ -61,6 +61,7 @@ "@eclipse-glsp/graph": "2.7.0-next", "@eclipse-glsp/protocol": "next", "@types/uuid": "8.3.1", + "als-browser": "^1.0.1", "commander": "^8.3.0", "fast-json-patch": "^3.1.0", "lodash": "4.17.21", diff --git a/packages/server/src/browser/di/app-module.ts b/packages/server/src/browser/di/app-module.ts index 004e77b..b7dfeee 100644 --- a/packages/server/src/browser/di/app-module.ts +++ b/packages/server/src/browser/di/app-module.ts @@ -1,5 +1,5 @@ /******************************************************************************** - * Copyright (c) 2022-2023 EclipseSource and others. + * Copyright (c) 2022-2026 EclipseSource and others. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v. 2.0 which is available at @@ -14,13 +14,16 @@ * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 ********************************************************************************/ +// Side-effect import: patches Promise, timers, XHR, observers on the current realm to preserve async context across awaits. +import { AsyncLocalStorage } from 'als-browser'; import { ContainerModule } from 'inversify'; -import { InjectionContainer, LogLevel, LoggerConfigOptions, configureConsoleLogger } from '../../common/'; +import { ActionDispatchContext, InjectionContainer, LogLevel, LoggerConfigOptions, configureConsoleLogger } from '../../common/'; export function createAppModule(options: LoggerConfigOptions = {}): ContainerModule { const resolvedOptions: LoggerConfigOptions = { consoleLog: true, logLevel: LogLevel.info, ...options }; return new ContainerModule((bind, unbind, isBound, rebind) => { bind(InjectionContainer).toDynamicValue(dynamicContext => dynamicContext.container); + bind(ActionDispatchContext).toDynamicValue(() => new AsyncLocalStorage()); const context = { bind, unbind, isBound, rebind }; configureConsoleLogger(context, resolvedOptions); }); diff --git a/packages/server/src/common/actions/action-dispatcher.ts b/packages/server/src/common/actions/action-dispatcher.ts index 336a399..20bf2ce 100644 --- a/packages/server/src/common/actions/action-dispatcher.ts +++ b/packages/server/src/common/actions/action-dispatcher.ts @@ -25,11 +25,10 @@ import { UpdateModelAction, flatPush } from '@eclipse-glsp/protocol'; -import { AsyncLocalStorage } from 'async_hooks'; import { inject, injectable, postConstruct } from 'inversify'; -import { ClientId } from '../di/service-identifiers'; -import { GLSPServerError } from '../utils/glsp-server-error'; +import { ActionDispatchContext, ClientId } from '../di/service-identifiers'; import { ActionChannel } from '../utils/action-channel'; +import { GLSPServerError } from '../utils/glsp-server-error'; import { Logger } from '../utils/logger'; import { ActionHandler } from './action-handler'; import { ActionHandlerRegistry } from './action-handler-registry'; @@ -132,8 +131,10 @@ export class DefaultActionDispatcher implements ActionDispatcher, Disposable { @inject(ClientId) protected clientId: string; + @inject(ActionDispatchContext) + protected dispatchContext: ActionDispatchContext; + protected channel = new ActionChannel(); - protected dispatchContext = new AsyncLocalStorage(); protected postUpdateQueue: Action[] = []; protected readonly pendingRequests = new Map>(); diff --git a/packages/server/src/common/di/service-identifiers.ts b/packages/server/src/common/di/service-identifiers.ts index 1a5872f..a7f5abe 100644 --- a/packages/server/src/common/di/service-identifiers.ts +++ b/packages/server/src/common/di/service-identifiers.ts @@ -1,5 +1,5 @@ /******************************************************************************** - * Copyright (c) 2022-2023 STMicroelectronics and others. + * Copyright (c) 2022-2026 STMicroelectronics and others. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v. 2.0 which is available at @@ -37,3 +37,14 @@ export const NavigationTargetProviders = Symbol('NavigationTargetProviders'); export type ValidateLabelEditAdapterFactory = (validator: LabelEditValidator) => ValidateLabelEditAdapter; export const Operations = Symbol('Operations'); + +/** + * Scope marker that lets the {@link ActionDispatcher} know whether a call to `dispatch()` + * originates from inside a running handler (reentrant) or from outside (external). + */ +export interface ActionDispatchContext { + run(store: boolean, callback: () => R): R; + getStore(): boolean | undefined; +} + +export const ActionDispatchContext = Symbol('ActionDispatchContext'); diff --git a/packages/server/src/common/utils/promise-queue.spec.ts b/packages/server/src/common/utils/promise-queue.spec.ts index 9c2e1aa..ef195fc 100644 --- a/packages/server/src/common/utils/promise-queue.spec.ts +++ b/packages/server/src/common/utils/promise-queue.spec.ts @@ -1,5 +1,5 @@ /******************************************************************************** - * Copyright (c) 2022-2023 STMicroelectronics and others. + * Copyright (c) 2022-2026 STMicroelectronics and others. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v. 2.0 which is available at @@ -14,8 +14,10 @@ * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 ********************************************************************************/ import { delay } from '../test/mock-util'; -import { PromiseQueue } from './promise-queue'; + import { expect } from 'chai'; +// eslint-disable-next-line import-x/no-deprecated +import { PromiseQueue } from './promise-queue'; // Helper types and functions that are needed for test setup @@ -74,11 +76,13 @@ function newTestPromise(resolveTime: number): TestPromise { return { state, promise }; } +// eslint-disable-next-line @typescript-eslint/no-deprecated, import-x/no-deprecated let queue = new PromiseQueue(); // Test execution describe('test PromiseQueue', () => { beforeEach(() => { + // eslint-disable-next-line import-x/no-deprecated, @typescript-eslint/no-deprecated queue = new PromiseQueue(); }); it('enqueue - one element', async () => { diff --git a/packages/server/src/common/actions/action-dispatcher.spec.ts b/packages/server/src/node/actions/action-dispatcher.spec.ts similarity index 97% rename from packages/server/src/common/actions/action-dispatcher.spec.ts rename to packages/server/src/node/actions/action-dispatcher.spec.ts index d8b0411..4c9bd21 100644 --- a/packages/server/src/common/actions/action-dispatcher.spec.ts +++ b/packages/server/src/node/actions/action-dispatcher.spec.ts @@ -14,17 +14,18 @@ * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 ********************************************************************************/ import { Action, Deferred, RequestAction, ResponseAction, UpdateModelAction } from '@eclipse-glsp/protocol'; +import { AsyncLocalStorage } from 'async_hooks'; import { expect } from 'chai'; import { Container, ContainerModule } from 'inversify'; import * as sinon from 'sinon'; -import { ClientActionKinds, ClientId } from '../di/service-identifiers'; -import { ClientSessionManager } from '../session/client-session-manager'; -import * as mock from '../test/mock-util'; -import { Logger } from '../utils/logger'; -import { DefaultActionDispatcher } from './action-dispatcher'; -import { ActionHandler } from './action-handler'; -import { ActionHandlerRegistry } from './action-handler-registry'; -import { ClientActionForwarder } from './client-action-handler'; +import { DefaultActionDispatcher } from '../../common/actions/action-dispatcher'; +import { ActionHandler } from '../../common/actions/action-handler'; +import { ActionHandlerRegistry } from '../../common/actions/action-handler-registry'; +import { ClientActionForwarder } from '../../common/actions/client-action-handler'; +import { ActionDispatchContext, ClientActionKinds, ClientId } from '../../common/di/service-identifiers'; +import { ClientSessionManager } from '../../common/session/client-session-manager'; +import * as mock from '../../common/test/mock-util'; +import { Logger } from '../../common/utils/logger'; function waitSync(timeInMillis: number): void { const start = Date.now(); @@ -51,6 +52,7 @@ describe('test DefaultActionDispatcher', () => { bind(ActionHandlerRegistry).toConstantValue(actionHandlerRegistry); bind(ClientActionKinds).toConstantValue(new Set(['response', 'response1', 'response2'])); bind(ClientActionForwarder).toConstantValue(clientActionForwarderStub); + bind(ActionDispatchContext).toDynamicValue(() => new AsyncLocalStorage()); }) ); const actionDispatcher = container.resolve(DefaultActionDispatcher); diff --git a/packages/server/src/node/di/app-module.ts b/packages/server/src/node/di/app-module.ts index 06c1a4d..2b1b508 100644 --- a/packages/server/src/node/di/app-module.ts +++ b/packages/server/src/node/di/app-module.ts @@ -1,5 +1,5 @@ /******************************************************************************** - * Copyright (c) 2022-2025 STMicroelectronics and others. + * Copyright (c) 2022-2026 STMicroelectronics and others. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v. 2.0 which is available at @@ -14,15 +14,17 @@ * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 ********************************************************************************/ import { BindingContext } from '@eclipse-glsp/protocol/lib/di'; +import { AsyncLocalStorage } from 'async_hooks'; import { ContainerModule } from 'inversify'; import * as winston from 'winston'; -import { InjectionContainer, LogLevel, Logger, LoggerFactory, NullLogger, getRequestParentName } from '../../common'; +import { ActionDispatchContext, InjectionContainer, LogLevel, Logger, LoggerFactory, NullLogger, getRequestParentName } from '../../common'; import { LaunchOptions } from '../launch/cli-parser'; import { WinstonLogger } from './winston-logger'; export function createAppModule(options: LaunchOptions): ContainerModule { return new ContainerModule((bind, unbind, isBound, rebind) => { bind(InjectionContainer).toDynamicValue(dynamicContext => dynamicContext.container); + bind(ActionDispatchContext).toDynamicValue(() => new AsyncLocalStorage()); const context = { bind, unbind, isBound, rebind }; configureWinstonLogger(context, options); }); diff --git a/yarn.lock b/yarn.lock index ac1b054..4b52522 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1982,6 +1982,11 @@ ajv@^6.12.4, ajv@^6.12.5: json-schema-traverse "^0.4.1" uri-js "^4.2.2" +als-browser@^1.0.1: + version "1.0.1" + resolved "https://registry.yarnpkg.com/als-browser/-/als-browser-1.0.1.tgz#ddd9c2ac8ad2817e7d55f0d470b76aaa70f3d521" + integrity sha512-DjavKf6zf4DFPdEmgsEM474MBjFcZG/1amv2/+WHGf61kVQWqf7XEn4jvpjFS4ssQbh/pkmYThaPfQK1ERC+3g== + ansi-colors@4.1.1: version "4.1.1" resolved "https://registry.yarnpkg.com/ansi-colors/-/ansi-colors-4.1.1.tgz#cbb9ae256bf750af1eab344f229aa27fe94ba348" From 62faece4fed788da451fe7ec4b4abfc990b4dcc3 Mon Sep 17 00:00:00 2001 From: Tobias Ortmayr Date: Fri, 24 Apr 2026 17:58:16 +0200 Subject: [PATCH 6/9] Update next dependencies --- yarn.lock | 106 +++++++++++++++++++++++++++--------------------------- 1 file changed, 53 insertions(+), 53 deletions(-) diff --git a/yarn.lock b/yarn.lock index 4b52522..e4469ad 100644 --- a/yarn.lock +++ b/yarn.lock @@ -223,18 +223,18 @@ resolved "https://registry.yarnpkg.com/@discoveryjs/json-ext/-/json-ext-0.5.7.tgz#1d572bfbbe14b7704e0ba0f39b74815b84870d70" integrity sha512-dBVuXR082gk3jsFp7Rd/JI4kytwGHecnCoTtXFb7DB6CNHp4rg5k1bhg0nWdLGLnOV71lmDzGQaLMy8iPLY0pw== -"@eclipse-glsp/cli@2.7.0-next.10+743aad5": - version "2.7.0-next.10" - resolved "https://registry.yarnpkg.com/@eclipse-glsp/cli/-/cli-2.7.0-next.10.tgz#75cf853feb77396b534495fd50f31b26b271db0d" - integrity sha512-/r8TGvp8jE0Tdzp4Sl7QJxyGL8OmaCBOcwl6V53F6xnay9WmO/U6LNDDc4RLDXLXzNiUEFy4J2V5+nPcdTa2jg== - -"@eclipse-glsp/config-test@2.7.0-next.10+743aad5": - version "2.7.0-next.10" - resolved "https://registry.yarnpkg.com/@eclipse-glsp/config-test/-/config-test-2.7.0-next.10.tgz#aea5b8d3c86027c76b9ee06c42dfbb71746dc22c" - integrity sha512-xhgO3MnDJ5d/xhicXxIQgC6Q5EMXg7pqZfZNdhrhMTmLgx4iR292WAMGNAA8qSVETxddNTaw7zOb8gFoOYXeRA== - dependencies: - "@eclipse-glsp/mocha-config" "2.7.0-next.10+743aad5" - "@eclipse-glsp/nyc-config" "2.7.0-next.10+743aad5" +"@eclipse-glsp/cli@2.7.0-next.13+90c0040": + version "2.7.0-next.13" + resolved "https://registry.yarnpkg.com/@eclipse-glsp/cli/-/cli-2.7.0-next.13.tgz#03d49ae55bb551631154d114e03e7ab32df87faf" + integrity sha512-5Rj+J5ikKDnjpkoYZc2LCIP9KaP09SXF3Ftefj3XYc3BD44/GJdngBe2uKNAxM71rSmBihBmbLoF/LxL0nr+Ow== + +"@eclipse-glsp/config-test@2.7.0-next.13+90c0040": + version "2.7.0-next.13" + resolved "https://registry.yarnpkg.com/@eclipse-glsp/config-test/-/config-test-2.7.0-next.13.tgz#f190f3f08e7d7b0ac54c851e1d2ce411a9ddbbfd" + integrity sha512-8iYAhMfEfCSVPVndqfyRCAexuXEUpdyic1zbXwuiAfAYlH5kAGVAkzCqyeXdo71Be4eX4bS43iRbi91Nmks6Dg== + dependencies: + "@eclipse-glsp/mocha-config" "2.7.0-next.13+90c0040" + "@eclipse-glsp/nyc-config" "2.7.0-next.13+90c0040" "@istanbuljs/nyc-config-typescript" "^1.0.2" "@types/chai" "^4.3.7" "@types/mocha" "^10.0.2" @@ -247,14 +247,14 @@ sinon "^15.1.0" ts-node "^10.9.1" -"@eclipse-glsp/config@2.7.0-next.10+743aad5": - version "2.7.0-next.10" - resolved "https://registry.yarnpkg.com/@eclipse-glsp/config/-/config-2.7.0-next.10.tgz#e6aa4ab50057f828facd521cd2dee198aaf973e9" - integrity sha512-ZqIcL8nPAKLduPsoyOYvUrCb6kRv8YQQ7JXcp831YDzj1Ay3vlOq9yG2My6ytyQvR3hs9MFD09467elgYq9xrw== +"@eclipse-glsp/config@2.7.0-next.13+90c0040": + version "2.7.0-next.13" + resolved "https://registry.yarnpkg.com/@eclipse-glsp/config/-/config-2.7.0-next.13.tgz#56f615c0a4520d5cec48200200649d6c9d52c36d" + integrity sha512-mkqntgl3ARfHx3jMhTYSEPUlHyJ8NJaPfSu0nGiUrmOE8qcgrmc76fdynXymeSlkmbnWgS3iFmR28/kdzyAXUw== dependencies: - "@eclipse-glsp/eslint-config" "2.7.0-next.10+743aad5" - "@eclipse-glsp/prettier-config" "2.7.0-next.10+743aad5" - "@eclipse-glsp/ts-config" "2.7.0-next.10+743aad5" + "@eclipse-glsp/eslint-config" "2.7.0-next.13+90c0040" + "@eclipse-glsp/prettier-config" "2.7.0-next.13+90c0040" + "@eclipse-glsp/ts-config" "2.7.0-next.13+90c0040" "@eslint/js" "^9.0.0" "@stylistic/eslint-plugin" "^2.0.0" "@tony.ganchev/eslint-plugin-header" "^3.1.1" @@ -271,49 +271,49 @@ typescript-eslint "^8.0.0" "@eclipse-glsp/dev@next": - version "2.7.0-next.10" - resolved "https://registry.yarnpkg.com/@eclipse-glsp/dev/-/dev-2.7.0-next.10.tgz#81b1b86c2e17d21074c3e6c8fccaf0d396270fad" - integrity sha512-lsYYLkLoj35Q3Q8Uet+WCHJQQANHFGFR9/o6DcSIMdUrRhUkyX1uS7XezQr7u9+GJ76yIwp5Q1yMYrcEBpCuzg== - dependencies: - "@eclipse-glsp/cli" "2.7.0-next.10+743aad5" - "@eclipse-glsp/config" "2.7.0-next.10+743aad5" - "@eclipse-glsp/config-test" "2.7.0-next.10+743aad5" - -"@eclipse-glsp/eslint-config@2.7.0-next.10+743aad5": - version "2.7.0-next.10" - resolved "https://registry.yarnpkg.com/@eclipse-glsp/eslint-config/-/eslint-config-2.7.0-next.10.tgz#c3a0bd614f52ca3d3c25f1512b8bd28c30777b9a" - integrity sha512-3bUCnz/qPQbABYimg0Bfo2TFMV5Qsk5tcy2uxs6ZTefX+7+AkUHOc57/A9XgRt/o3ngt3wCD+XdRkosBO/N6vw== - -"@eclipse-glsp/mocha-config@2.7.0-next.10+743aad5": - version "2.7.0-next.10" - resolved "https://registry.yarnpkg.com/@eclipse-glsp/mocha-config/-/mocha-config-2.7.0-next.10.tgz#a804c7d3f60974b3df2dc6893efda2e6b1d97c5b" - integrity sha512-mHQh6XCDnSYloOtON4qN1w8N1mSu9siZi32lfXclh2oVRu16ykUn2oaSQ8aXS6HgMK8FoWaItj5HWNP82qmfwg== - -"@eclipse-glsp/nyc-config@2.7.0-next.10+743aad5": - version "2.7.0-next.10" - resolved "https://registry.yarnpkg.com/@eclipse-glsp/nyc-config/-/nyc-config-2.7.0-next.10.tgz#c8cf5a3e00020fefb4d8cd2368bc6da458b9dabb" - integrity sha512-aYHgC8vgdGw8iK9ncNUkHgeHi6xaFso+8dECzeoQ+zLAx8aKqmh1//iyHXSWCIzwg1GsU0LkFX8FWdvX866jUg== - -"@eclipse-glsp/prettier-config@2.7.0-next.10+743aad5": - version "2.7.0-next.10" - resolved "https://registry.yarnpkg.com/@eclipse-glsp/prettier-config/-/prettier-config-2.7.0-next.10.tgz#da3c8b84b033b4c2c0ee809c29fc650386527d62" - integrity sha512-ovjSsvyt487lbrJ1+s2bTX4X2L5vZHW8gMUT9w18YjzPUQPAN6wCfmCVa5NqA+NP9U9DeDOSFp9G5g+YePdUqA== + version "2.7.0-next.13" + resolved "https://registry.yarnpkg.com/@eclipse-glsp/dev/-/dev-2.7.0-next.13.tgz#cd29d7f8fc130602433c05b7e39c14f669aebc06" + integrity sha512-xnMKsqBtq1BZUPI2gC7/o5Le7mFP5KDZLm+tCZV08RNDkNs9o60Ijf+j8s02nLmo/IV8i0U5+dqh8SC15oUf+w== + dependencies: + "@eclipse-glsp/cli" "2.7.0-next.13+90c0040" + "@eclipse-glsp/config" "2.7.0-next.13+90c0040" + "@eclipse-glsp/config-test" "2.7.0-next.13+90c0040" + +"@eclipse-glsp/eslint-config@2.7.0-next.13+90c0040": + version "2.7.0-next.13" + resolved "https://registry.yarnpkg.com/@eclipse-glsp/eslint-config/-/eslint-config-2.7.0-next.13.tgz#8d5ff34651a397afe5a491ba53be94ca40bb440c" + integrity sha512-qaUVaG4ymXXuyPr+7UTq2X5Dbh6Jp4wIYkCwN8YxDZtPdoZKWo5AW7wNTUOpG+P1z8ishSIQI7ZzHpkyqDMXaw== + +"@eclipse-glsp/mocha-config@2.7.0-next.13+90c0040": + version "2.7.0-next.13" + resolved "https://registry.yarnpkg.com/@eclipse-glsp/mocha-config/-/mocha-config-2.7.0-next.13.tgz#0faa95ca78b61e999d37b512b26f56a0ec3f9dc8" + integrity sha512-oxTPOmn45TYJvG6GszXy9BZHRJprZuFDWRYRHhUasFd4HPNIGrchyxtxeXm8qRaVhR0bbKKTfvu14RcWx4AuCQ== + +"@eclipse-glsp/nyc-config@2.7.0-next.13+90c0040": + version "2.7.0-next.13" + resolved "https://registry.yarnpkg.com/@eclipse-glsp/nyc-config/-/nyc-config-2.7.0-next.13.tgz#19e8e28e3914a0a20008136906267600f02dc632" + integrity sha512-HY6AN3eiIM5gHCGnWnKAlGRoSh8JV1094caXw6aKfJ9BlD2lLFfK/G97Z//rlyTB2I+CIGyHPo46m65qh0FPKQ== + +"@eclipse-glsp/prettier-config@2.7.0-next.13+90c0040": + version "2.7.0-next.13" + resolved "https://registry.yarnpkg.com/@eclipse-glsp/prettier-config/-/prettier-config-2.7.0-next.13.tgz#b819c1c384ca9a0cc662727f8fa15b35ab488c59" + integrity sha512-8nbWre4W/t6gbVbVE7yz1Cf883pRA1rnWlkgtMF9OtwfhzEBeJxgpFWIe6I+nBlU8WQ+qxdZ0GOge0eJNMTSfg== dependencies: prettier-plugin-packagejson "~2.4.6" "@eclipse-glsp/protocol@next": - version "2.7.0-next.3" - resolved "https://registry.yarnpkg.com/@eclipse-glsp/protocol/-/protocol-2.7.0-next.3.tgz#446fb5f0b13ca49651b35951903e92c1ef142f28" - integrity sha512-0QAsHKxCDaEqcqjdVIrYFlUhfbzCwcmHruTaYc74hnM/tOQbyk+DMIknhS8fGWGTb5orHjZM2hQTPrTjtHsUgg== + version "2.7.0-next.12" + resolved "https://registry.yarnpkg.com/@eclipse-glsp/protocol/-/protocol-2.7.0-next.12.tgz#af91ae8a48ef8772a189537ada77949995710b6a" + integrity sha512-POB7bGy24sjQ5tPL4XrYZJeupLJNvDhI5jYFwKr3+wx86PZDstVz8bWydpHOa346Q8m4KKwU0vMJS+XMmyEFxQ== dependencies: sprotty-protocol "1.4.0" uuid "~10.0.0" vscode-jsonrpc "8.2.0" -"@eclipse-glsp/ts-config@2.7.0-next.10+743aad5": - version "2.7.0-next.10" - resolved "https://registry.yarnpkg.com/@eclipse-glsp/ts-config/-/ts-config-2.7.0-next.10.tgz#b3e4443d720318408ff299b7613521f226b0a2f2" - integrity sha512-Parige4p4pLPt2mlBnH9AKUXOqAQcOSjjQYVJZJ5vVpVIcuVJr7+q8PVyezlQ/8MmHIrCaJ4o+/xcs6Oft+8cg== +"@eclipse-glsp/ts-config@2.7.0-next.13+90c0040": + version "2.7.0-next.13" + resolved "https://registry.yarnpkg.com/@eclipse-glsp/ts-config/-/ts-config-2.7.0-next.13.tgz#9891d9dbe75dcaf099de44186b4dba6af836bb8d" + integrity sha512-EQEoM982uyxRQyjnnX5AVs54WnhcG7DiDu4cGF+2PWNZRn609wCgxYGz/SXEQCBZMJ2zGhsLX7AVmwEQZHn7Dw== "@emnapi/core@^1.1.0": version "1.6.0" From aaee3dbbca3df4e14c9cd06c79089427565c2ebe Mon Sep 17 00:00:00 2001 From: Tobias Ortmayr Date: Sat, 25 Apr 2026 17:14:51 +0200 Subject: [PATCH 7/9] Fix dispatching for browser entrypoint` Refactor ActionDispatchContext and implementation for the browser side. The used ALS polyfill is not reliable and does not work with all browsers, v8 engines. --- packages/server/src/browser/di/app-module.ts | 5 +- .../browser/di/browser-dispatch-context.ts | 54 +++++++++++++++++++ packages/server/src/browser/index.ts | 1 + .../src/common/actions/action-dispatcher.ts | 36 +++++++++++-- .../src/common/di/service-identifiers.ts | 11 ---- .../node/actions/action-dispatcher.spec.ts | 8 +-- packages/server/src/node/di/app-module.ts | 4 +- .../src/node/di/node-dispatch-context.ts | 33 ++++++++++++ packages/server/src/node/index.ts | 1 + 9 files changed, 130 insertions(+), 23 deletions(-) create mode 100644 packages/server/src/browser/di/browser-dispatch-context.ts create mode 100644 packages/server/src/node/di/node-dispatch-context.ts diff --git a/packages/server/src/browser/di/app-module.ts b/packages/server/src/browser/di/app-module.ts index b7dfeee..7eaaeb9 100644 --- a/packages/server/src/browser/di/app-module.ts +++ b/packages/server/src/browser/di/app-module.ts @@ -14,16 +14,15 @@ * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 ********************************************************************************/ -// Side-effect import: patches Promise, timers, XHR, observers on the current realm to preserve async context across awaits. -import { AsyncLocalStorage } from 'als-browser'; import { ContainerModule } from 'inversify'; import { ActionDispatchContext, InjectionContainer, LogLevel, LoggerConfigOptions, configureConsoleLogger } from '../../common/'; +import { BrowserDispatchContext } from './browser-dispatch-context'; export function createAppModule(options: LoggerConfigOptions = {}): ContainerModule { const resolvedOptions: LoggerConfigOptions = { consoleLog: true, logLevel: LogLevel.info, ...options }; return new ContainerModule((bind, unbind, isBound, rebind) => { bind(InjectionContainer).toDynamicValue(dynamicContext => dynamicContext.container); - bind(ActionDispatchContext).toDynamicValue(() => new AsyncLocalStorage()); + bind(ActionDispatchContext).toDynamicValue(() => new BrowserDispatchContext()); const context = { bind, unbind, isBound, rebind }; configureConsoleLogger(context, resolvedOptions); }); diff --git a/packages/server/src/browser/di/browser-dispatch-context.ts b/packages/server/src/browser/di/browser-dispatch-context.ts new file mode 100644 index 0000000..4f0476a --- /dev/null +++ b/packages/server/src/browser/di/browser-dispatch-context.ts @@ -0,0 +1,54 @@ +/******************************************************************************** + * Copyright (c) 2026 EclipseSource and others. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * This Source Code may also be made available under the following Secondary + * Licenses when the conditions for such availability set forth in the Eclipse + * Public License v. 2.0 are satisfied: GNU General Public License, version 2 + * with the GNU Classpath Exception which is available at + * https://www.gnu.org/software/classpath/license.html. + * + * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + ********************************************************************************/ + +import { Action } from '@eclipse-glsp/protocol'; +import { ActionDispatchContext } from '../../common/actions/action-dispatcher'; +import { ClientAction } from '../../common/protocol/client-action'; + +/** + * Browser-compatible {@link ActionDispatchContext} that uses a simple flag instead of + * `AsyncLocalStorage`. Currently available polyfill implementations of `AsyncLocalStorage` do not work correctly + * in certain browser/javascript engines (e.g. V8) + * + * The flag-based approach has one limitation: + * it cannot distinguish "within the handler's async continuation" from "new event that arrived during an await gap. + * This means, that subsequent client actions might be dispatched inline instead of being queued, if they arrive during an await gap. + * To prevent this, we always treat client-originated actions as out-of-context, ensuring they are queued rather than dispatched inline. + * + * There still is a corner case for server-originated actions that are dispatched in a different async chain e.g timer-based. + * However, in practice this means that these actions are dispatched inline instead of being queued, which does not cause any issues. + * Since they originated from a different async chain, there were no order guarantees with respect to the current action anyway. + */ +export class BrowserDispatchContext implements ActionDispatchContext { + protected active = false; + + run(callback: () => R): R { + const prior = this.active; + this.active = true; + const result = callback(); + if (result instanceof Promise) { + return result.finally(() => { + this.active = prior; + }) as unknown as R; + } + this.active = prior; + return result; + } + + isInContext(action: Action): boolean { + return this.active && !ClientAction.is(action); + } +} diff --git a/packages/server/src/browser/index.ts b/packages/server/src/browser/index.ts index 2f5a370..0285cc0 100644 --- a/packages/server/src/browser/index.ts +++ b/packages/server/src/browser/index.ts @@ -14,5 +14,6 @@ * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 ********************************************************************************/ export * from './di/app-module'; +export * from './di/browser-dispatch-context'; export * from './launch/worker-server-launcher'; export * from './reexport'; diff --git a/packages/server/src/common/actions/action-dispatcher.ts b/packages/server/src/common/actions/action-dispatcher.ts index 20bf2ce..ba7e3e6 100644 --- a/packages/server/src/common/actions/action-dispatcher.ts +++ b/packages/server/src/common/actions/action-dispatcher.ts @@ -26,7 +26,7 @@ import { flatPush } from '@eclipse-glsp/protocol'; import { inject, injectable, postConstruct } from 'inversify'; -import { ActionDispatchContext, ClientId } from '../di/service-identifiers'; +import { ClientId } from '../di/service-identifiers'; import { ActionChannel } from '../utils/action-channel'; import { GLSPServerError } from '../utils/glsp-server-error'; import { Logger } from '../utils/logger'; @@ -113,6 +113,36 @@ export interface ActionDispatcher { ): Promise; } +export const ActionDispatchContext = Symbol('ActionDispatchContext'); + +/** + * Scope marker that lets the {@link ActionDispatcher} know whether a call to `dispatch()` + * originates from inside a running handler (reentrant) or from outside (external). + * + * The consumer loop wraps each action in {@link run} so that reentrant `dispatch()` calls + * (handler responses, injected dispatcher calls) can be recognized via {@link isInContext} + * and executed inline instead of being queued. + * + * Used by the {@link DefaultActionDispatcher} implementation. + */ +export interface ActionDispatchContext { + /** + * Executes the callback inside the dispatch context. While the callback (and its full + * async continuation) is running, {@link isInContext} returns `true` for reentrant calls. + */ + run(callback: () => R): R; + + /** + * Returns `true` if the caller is executing inside a {@link run} callback, meaning the + * dispatch is reentrant (e.g. a handler response or an injected dispatcher call) and + * should run inline rather than being queued. + * + * Implementations may inspect the action to apply additional guards, e.g. to ensure + * client-originated actions are always queued regardless of context state. + */ + isInContext(action: Action): boolean; +} + /** * Default {@link ActionDispatcher}. External dispatches are queued and processed one at a * time; dispatches made from within a running handler run inline with the containing action. @@ -156,7 +186,7 @@ export class DefaultActionDispatcher implements ActionDispatcher, Disposable { return Promise.resolve(); } // Reentrant dispatches run inline to preserve ordering with the containing action. - if (this.dispatchContext.getStore()) { + if (this.dispatchContext.isInContext(action)) { return this.doDispatch(action); } // External dispatches are queued and processed sequentially. @@ -167,7 +197,7 @@ export class DefaultActionDispatcher implements ActionDispatcher, Disposable { // Run each action inside the dispatch context so reentrant dispatch() calls are recognized. for await (const entry of this.channel.consume()) { try { - await this.dispatchContext.run(true, () => this.doDispatch(entry.item)); + await this.dispatchContext.run(() => this.doDispatch(entry.item)); entry.resolve(); } catch (error) { entry.reject(error); diff --git a/packages/server/src/common/di/service-identifiers.ts b/packages/server/src/common/di/service-identifiers.ts index a7f5abe..079f41b 100644 --- a/packages/server/src/common/di/service-identifiers.ts +++ b/packages/server/src/common/di/service-identifiers.ts @@ -37,14 +37,3 @@ export const NavigationTargetProviders = Symbol('NavigationTargetProviders'); export type ValidateLabelEditAdapterFactory = (validator: LabelEditValidator) => ValidateLabelEditAdapter; export const Operations = Symbol('Operations'); - -/** - * Scope marker that lets the {@link ActionDispatcher} know whether a call to `dispatch()` - * originates from inside a running handler (reentrant) or from outside (external). - */ -export interface ActionDispatchContext { - run(store: boolean, callback: () => R): R; - getStore(): boolean | undefined; -} - -export const ActionDispatchContext = Symbol('ActionDispatchContext'); diff --git a/packages/server/src/node/actions/action-dispatcher.spec.ts b/packages/server/src/node/actions/action-dispatcher.spec.ts index 4c9bd21..d38f827 100644 --- a/packages/server/src/node/actions/action-dispatcher.spec.ts +++ b/packages/server/src/node/actions/action-dispatcher.spec.ts @@ -14,18 +14,18 @@ * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 ********************************************************************************/ import { Action, Deferred, RequestAction, ResponseAction, UpdateModelAction } from '@eclipse-glsp/protocol'; -import { AsyncLocalStorage } from 'async_hooks'; import { expect } from 'chai'; import { Container, ContainerModule } from 'inversify'; import * as sinon from 'sinon'; -import { DefaultActionDispatcher } from '../../common/actions/action-dispatcher'; +import { ActionDispatchContext, DefaultActionDispatcher } from '../../common/actions/action-dispatcher'; import { ActionHandler } from '../../common/actions/action-handler'; import { ActionHandlerRegistry } from '../../common/actions/action-handler-registry'; import { ClientActionForwarder } from '../../common/actions/client-action-handler'; -import { ActionDispatchContext, ClientActionKinds, ClientId } from '../../common/di/service-identifiers'; +import { ClientActionKinds, ClientId } from '../../common/di/service-identifiers'; import { ClientSessionManager } from '../../common/session/client-session-manager'; import * as mock from '../../common/test/mock-util'; import { Logger } from '../../common/utils/logger'; +import { NodeDispatchContext } from '../di/node-dispatch-context'; function waitSync(timeInMillis: number): void { const start = Date.now(); @@ -52,7 +52,7 @@ describe('test DefaultActionDispatcher', () => { bind(ActionHandlerRegistry).toConstantValue(actionHandlerRegistry); bind(ClientActionKinds).toConstantValue(new Set(['response', 'response1', 'response2'])); bind(ClientActionForwarder).toConstantValue(clientActionForwarderStub); - bind(ActionDispatchContext).toDynamicValue(() => new AsyncLocalStorage()); + bind(ActionDispatchContext).toDynamicValue(() => new NodeDispatchContext()); }) ); const actionDispatcher = container.resolve(DefaultActionDispatcher); diff --git a/packages/server/src/node/di/app-module.ts b/packages/server/src/node/di/app-module.ts index 2b1b508..c746e63 100644 --- a/packages/server/src/node/di/app-module.ts +++ b/packages/server/src/node/di/app-module.ts @@ -14,17 +14,17 @@ * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 ********************************************************************************/ import { BindingContext } from '@eclipse-glsp/protocol/lib/di'; -import { AsyncLocalStorage } from 'async_hooks'; import { ContainerModule } from 'inversify'; import * as winston from 'winston'; import { ActionDispatchContext, InjectionContainer, LogLevel, Logger, LoggerFactory, NullLogger, getRequestParentName } from '../../common'; import { LaunchOptions } from '../launch/cli-parser'; +import { NodeDispatchContext } from './node-dispatch-context'; import { WinstonLogger } from './winston-logger'; export function createAppModule(options: LaunchOptions): ContainerModule { return new ContainerModule((bind, unbind, isBound, rebind) => { bind(InjectionContainer).toDynamicValue(dynamicContext => dynamicContext.container); - bind(ActionDispatchContext).toDynamicValue(() => new AsyncLocalStorage()); + bind(ActionDispatchContext).toDynamicValue(() => new NodeDispatchContext()); const context = { bind, unbind, isBound, rebind }; configureWinstonLogger(context, options); }); diff --git a/packages/server/src/node/di/node-dispatch-context.ts b/packages/server/src/node/di/node-dispatch-context.ts new file mode 100644 index 0000000..e0dc7d6 --- /dev/null +++ b/packages/server/src/node/di/node-dispatch-context.ts @@ -0,0 +1,33 @@ +/******************************************************************************** + * Copyright (c) 2026 EclipseSource and others. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * This Source Code may also be made available under the following Secondary + * Licenses when the conditions for such availability set forth in the Eclipse + * Public License v. 2.0 are satisfied: GNU General Public License, version 2 + * with the GNU Classpath Exception which is available at + * https://www.gnu.org/software/classpath/license.html. + * + * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + ********************************************************************************/ + +import { AsyncLocalStorage } from 'async_hooks'; +import { ActionDispatchContext } from '../../common/actions/action-dispatcher'; + +/** + * Node.js {@link ActionDispatchContext} backed by native `AsyncLocalStorage`. + */ +export class NodeDispatchContext implements ActionDispatchContext { + protected storage = new AsyncLocalStorage(); + + run(callback: () => R): R { + return this.storage.run(true, callback); + } + + isInContext(): boolean { + return this.storage.getStore() === true; + } +} diff --git a/packages/server/src/node/index.ts b/packages/server/src/node/index.ts index 9de6264..a34c815 100644 --- a/packages/server/src/node/index.ts +++ b/packages/server/src/node/index.ts @@ -15,6 +15,7 @@ ********************************************************************************/ export * from './abstract-json-model-storage'; export * from './di/app-module'; +export * from './di/node-dispatch-context'; export * from './di/winston-logger'; export * from './gmodel/gmodel-storage'; export * from './launch/cli-parser'; From 12201601e2fa5acadb03aef2bde010374ba88f71 Mon Sep 17 00:00:00 2001 From: Martin Fleck Date: Mon, 27 Apr 2026 14:07:16 +0200 Subject: [PATCH 8/9] refactor: address review feedback and rename to ActionDispatchScope - Remove unused `als-browser` dependency - Fix browser scope leaking `active=true` on sync throw - Add grace-period cleanup for stale request-timeout markers (memory leak) - Add `.catch()` to fire-and-forget dispatches in `processActionQueue` and `interceptPendingResponse` - Harden `handleClientRequest` catch: wrap rejection-send in inner try/catch - Make `dispatchAll` sequential to give reentrant calls deterministic ordering - Tighten JSDoc on the browser scope; document residual interleaving risk - Note action mutation in `request()`/`requestUntil()` JSDoc - Document serial-invocation assumption on `BrowserActionDispatchScope.enter()` - Reword "Node-only" comment to "no equivalent in the Java GLSP server" - Rename `ActionDispatchContext` -> `ActionDispatchScope` (interface, symbol, impls) - Rename `run` -> `enter`, `isInContext` -> `isReentrant` - Rename fields: `dispatchContext` -> `dispatchScope`, `channel` -> `actionQueue`, `timeouts` -> `requestTimeouts` - Rename `runConsumerLoop` -> `processActionQueue` - Rename `executeHandler` param `request` -> `action` - Switch scope bindings to `to(Class)` form (transient on purpose: per-session isolation) - Add `BrowserActionDispatchScope` unit tests (sync/async/throw/reject/ClientAction) --- packages/server/package.json | 1 - packages/server/src/browser/di/app-module.ts | 8 +- .../di/browser-action-dispatch-scope.spec.ts | 75 +++++++++++ .../di/browser-action-dispatch-scope.ts | 67 ++++++++++ .../browser/di/browser-dispatch-context.ts | 54 -------- packages/server/src/browser/index.ts | 4 +- .../src/common/actions/action-dispatcher.ts | 120 ++++++++++-------- .../server/src/common/protocol/glsp-server.ts | 14 +- .../node/actions/action-dispatcher.spec.ts | 6 +- packages/server/src/node/di/app-module.ts | 8 +- ...ntext.ts => node-action-dispatch-scope.ts} | 12 +- packages/server/src/node/index.ts | 4 +- yarn.lock | 5 - 13 files changed, 245 insertions(+), 133 deletions(-) create mode 100644 packages/server/src/browser/di/browser-action-dispatch-scope.spec.ts create mode 100644 packages/server/src/browser/di/browser-action-dispatch-scope.ts delete mode 100644 packages/server/src/browser/di/browser-dispatch-context.ts rename packages/server/src/node/di/{node-dispatch-context.ts => node-action-dispatch-scope.ts} (75%) diff --git a/packages/server/package.json b/packages/server/package.json index ffa09bf..9b156c4 100644 --- a/packages/server/package.json +++ b/packages/server/package.json @@ -61,7 +61,6 @@ "@eclipse-glsp/graph": "2.7.0-next", "@eclipse-glsp/protocol": "next", "@types/uuid": "8.3.1", - "als-browser": "^1.0.1", "commander": "^8.3.0", "fast-json-patch": "^3.1.0", "lodash": "4.17.21", diff --git a/packages/server/src/browser/di/app-module.ts b/packages/server/src/browser/di/app-module.ts index 7eaaeb9..023b89b 100644 --- a/packages/server/src/browser/di/app-module.ts +++ b/packages/server/src/browser/di/app-module.ts @@ -15,14 +15,16 @@ ********************************************************************************/ import { ContainerModule } from 'inversify'; -import { ActionDispatchContext, InjectionContainer, LogLevel, LoggerConfigOptions, configureConsoleLogger } from '../../common/'; -import { BrowserDispatchContext } from './browser-dispatch-context'; +import { ActionDispatchScope, InjectionContainer, LogLevel, LoggerConfigOptions, configureConsoleLogger } from '../../common/'; +import { BrowserActionDispatchScope } from './browser-action-dispatch-scope'; export function createAppModule(options: LoggerConfigOptions = {}): ContainerModule { const resolvedOptions: LoggerConfigOptions = { consoleLog: true, logLevel: LogLevel.info, ...options }; return new ContainerModule((bind, unbind, isBound, rebind) => { bind(InjectionContainer).toDynamicValue(dynamicContext => dynamicContext.container); - bind(ActionDispatchContext).toDynamicValue(() => new BrowserDispatchContext()); + // Transient on purpose: a singleton at the server-container level would be shared across + // sessions and leak the browser flag between them. + bind(ActionDispatchScope).to(BrowserActionDispatchScope); const context = { bind, unbind, isBound, rebind }; configureConsoleLogger(context, resolvedOptions); }); diff --git a/packages/server/src/browser/di/browser-action-dispatch-scope.spec.ts b/packages/server/src/browser/di/browser-action-dispatch-scope.spec.ts new file mode 100644 index 0000000..b910496 --- /dev/null +++ b/packages/server/src/browser/di/browser-action-dispatch-scope.spec.ts @@ -0,0 +1,75 @@ +/******************************************************************************** + * Copyright (c) 2026 EclipseSource and others. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * This Source Code may also be made available under the following Secondary + * Licenses when the conditions for such availability set forth in the Eclipse + * Public License v. 2.0 are satisfied: GNU General Public License, version 2 + * with the GNU Classpath Exception which is available at + * https://www.gnu.org/software/classpath/license.html. + * + * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + ********************************************************************************/ +import { Action } from '@eclipse-glsp/protocol'; +import { expect } from 'chai'; +import { ClientAction } from '../../common/protocol/client-action'; +import * as mock from '../../common/test/mock-util'; +import { BrowserActionDispatchScope } from './browser-action-dispatch-scope'; + +describe('BrowserActionDispatchScope', () => { + const action: Action = { kind: 'foo' }; + const markedClientAction = ((): Action => { + const a: Action = { kind: 'bar' }; + ClientAction.mark(a); + return a; + })(); + + let scope: BrowserActionDispatchScope; + beforeEach(() => { + scope = new BrowserActionDispatchScope(); + }); + + it('isReentrant is false outside enter()', () => { + expect(scope.isReentrant(action)).to.be.false; + }); + + it('isReentrant is true during a synchronous enter()', () => { + scope.enter(() => { + expect(scope.isReentrant(action)).to.be.true; + }); + expect(scope.isReentrant(action)).to.be.false; + }); + + it('isReentrant is true during an async enter() and false after settle', async () => { + const probe: Promise = scope.enter(async () => { + await Promise.resolve(); + return scope.isReentrant(action); + }); + expect(await probe).to.be.true; + expect(scope.isReentrant(action)).to.be.false; + }); + + it('resets active flag when callback throws synchronously', () => { + expect(() => + scope.enter(() => { + throw new Error('boom'); + }) + ).to.throw('boom'); + expect(scope.isReentrant(action)).to.be.false; + }); + + it('resets active flag when async callback rejects', async () => { + await mock.expectToThrowAsync(() => scope.enter(() => Promise.reject(new Error('boom'))), 'boom'); + expect(scope.isReentrant(action)).to.be.false; + }); + + it('isReentrant is false for client-originated actions even when scope is active', () => { + scope.enter(() => { + expect(scope.isReentrant(markedClientAction)).to.be.false; + expect(scope.isReentrant(action)).to.be.true; + }); + }); +}); diff --git a/packages/server/src/browser/di/browser-action-dispatch-scope.ts b/packages/server/src/browser/di/browser-action-dispatch-scope.ts new file mode 100644 index 0000000..f9d3825 --- /dev/null +++ b/packages/server/src/browser/di/browser-action-dispatch-scope.ts @@ -0,0 +1,67 @@ +/******************************************************************************** + * Copyright (c) 2026 EclipseSource and others. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * This Source Code may also be made available under the following Secondary + * Licenses when the conditions for such availability set forth in the Eclipse + * Public License v. 2.0 are satisfied: GNU General Public License, version 2 + * with the GNU Classpath Exception which is available at + * https://www.gnu.org/software/classpath/license.html. + * + * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + ********************************************************************************/ + +import { Action } from '@eclipse-glsp/protocol'; +import { injectable } from 'inversify'; +import { ActionDispatchScope } from '../../common/actions/action-dispatcher'; +import { ClientAction } from '../../common/protocol/client-action'; + +/** + * Browser-compatible {@link ActionDispatchScope} backed by a single boolean flag, used because + * available `AsyncLocalStorage` polyfills do not work reliably across browser engines (e.g. V8). + * + * The flag cannot distinguish "still inside the handler's async continuation" from "unrelated + * event fired during the handler's await". Any dispatch arriving in such a gap is observed as + * reentrant and routed inline. Client-originated actions are explicitly treated as non-reentrant + * to cover the dominant case, but server-side dispatches from non-handler contexts (timer + * callbacks, event listeners, adopter code) cannot be filtered this way and may interleave with + * the in-flight handler. + * + * The dispatcher normally serializes handler execution; the inline interleaving breaks that + * guarantee. A handler that pauses on `await` may resume to find that another handler has mutated + * state in between (model state, command stack, caches), leading to unexpected behavior. + * Avoid dispatching from non-handler contexts where possible. + */ +@injectable() +export class BrowserActionDispatchScope implements ActionDispatchScope { + protected active = false; + + // Assumes serial invocation by the dispatcher's queue processor; concurrent enter() calls + // would corrupt the prior-restore logic and leave the flag stuck. + enter(callback: () => R): R { + const prior = this.active; + this.active = true; + let result: R; + try { + result = callback(); + } catch (error) { + this.active = prior; + throw error; + } + if (result instanceof Promise) { + // Cast required because TS cannot prove the .finally() result matches the generic R. + return result.finally(() => { + this.active = prior; + }) as unknown as R; + } + this.active = prior; + return result; + } + + isReentrant(action: Action): boolean { + return this.active && !ClientAction.is(action); + } +} diff --git a/packages/server/src/browser/di/browser-dispatch-context.ts b/packages/server/src/browser/di/browser-dispatch-context.ts deleted file mode 100644 index 4f0476a..0000000 --- a/packages/server/src/browser/di/browser-dispatch-context.ts +++ /dev/null @@ -1,54 +0,0 @@ -/******************************************************************************** - * Copyright (c) 2026 EclipseSource and others. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v. 2.0 which is available at - * http://www.eclipse.org/legal/epl-2.0. - * - * This Source Code may also be made available under the following Secondary - * Licenses when the conditions for such availability set forth in the Eclipse - * Public License v. 2.0 are satisfied: GNU General Public License, version 2 - * with the GNU Classpath Exception which is available at - * https://www.gnu.org/software/classpath/license.html. - * - * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 - ********************************************************************************/ - -import { Action } from '@eclipse-glsp/protocol'; -import { ActionDispatchContext } from '../../common/actions/action-dispatcher'; -import { ClientAction } from '../../common/protocol/client-action'; - -/** - * Browser-compatible {@link ActionDispatchContext} that uses a simple flag instead of - * `AsyncLocalStorage`. Currently available polyfill implementations of `AsyncLocalStorage` do not work correctly - * in certain browser/javascript engines (e.g. V8) - * - * The flag-based approach has one limitation: - * it cannot distinguish "within the handler's async continuation" from "new event that arrived during an await gap. - * This means, that subsequent client actions might be dispatched inline instead of being queued, if they arrive during an await gap. - * To prevent this, we always treat client-originated actions as out-of-context, ensuring they are queued rather than dispatched inline. - * - * There still is a corner case for server-originated actions that are dispatched in a different async chain e.g timer-based. - * However, in practice this means that these actions are dispatched inline instead of being queued, which does not cause any issues. - * Since they originated from a different async chain, there were no order guarantees with respect to the current action anyway. - */ -export class BrowserDispatchContext implements ActionDispatchContext { - protected active = false; - - run(callback: () => R): R { - const prior = this.active; - this.active = true; - const result = callback(); - if (result instanceof Promise) { - return result.finally(() => { - this.active = prior; - }) as unknown as R; - } - this.active = prior; - return result; - } - - isInContext(action: Action): boolean { - return this.active && !ClientAction.is(action); - } -} diff --git a/packages/server/src/browser/index.ts b/packages/server/src/browser/index.ts index 0285cc0..ca99842 100644 --- a/packages/server/src/browser/index.ts +++ b/packages/server/src/browser/index.ts @@ -1,5 +1,5 @@ /******************************************************************************** - * Copyright (c) 2022-2024 EclipseSource and others. + * Copyright (c) 2022-2026 EclipseSource and others. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v. 2.0 which is available at @@ -14,6 +14,6 @@ * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 ********************************************************************************/ export * from './di/app-module'; -export * from './di/browser-dispatch-context'; +export * from './di/browser-action-dispatch-scope'; export * from './launch/worker-server-launcher'; export * from './reexport'; diff --git a/packages/server/src/common/actions/action-dispatcher.ts b/packages/server/src/common/actions/action-dispatcher.ts index ba7e3e6..736ed8b 100644 --- a/packages/server/src/common/actions/action-dispatcher.ts +++ b/packages/server/src/common/actions/action-dispatcher.ts @@ -86,6 +86,8 @@ export interface ActionDispatcher { * The promise waits indefinitely until a response arrives or the dispatcher is disposed. * Use {@link requestUntil} if a timeout is needed. * + * Note: mutates `action.requestId` (if unset) and `action.timeout`. + * * @param action The request action to dispatch. * @returns A promise that resolves with the matching response action. */ @@ -100,6 +102,8 @@ export interface ActionDispatcher { * If `rejectOnTimeout` is set to `false` (default) the returned promise will be resolved with * no value, otherwise it will be rejected. * + * Note: mutates `action.requestId` (if unset) and `action.timeout`. + * * @param action The request action to dispatch. * @param timeoutMs Maximum wait time in milliseconds. Defaults to * {@link RequestAction.timeout} if set, otherwise 2000 ms. @@ -113,34 +117,34 @@ export interface ActionDispatcher { ): Promise; } -export const ActionDispatchContext = Symbol('ActionDispatchContext'); +export const ActionDispatchScope = Symbol('ActionDispatchScope'); /** * Scope marker that lets the {@link ActionDispatcher} know whether a call to `dispatch()` * originates from inside a running handler (reentrant) or from outside (external). * - * The consumer loop wraps each action in {@link run} so that reentrant `dispatch()` calls - * (handler responses, injected dispatcher calls) can be recognized via {@link isInContext} - * and executed inline instead of being queued. + * The {@link DefaultActionDispatcher.processActionQueue} loop wraps each action in {@link enter} + * so that reentrant `dispatch()` calls (handler responses, injected dispatcher calls) can be + * recognized via {@link isReentrant} and executed inline instead of being queued. * * Used by the {@link DefaultActionDispatcher} implementation. */ -export interface ActionDispatchContext { +export interface ActionDispatchScope { /** - * Executes the callback inside the dispatch context. While the callback (and its full - * async continuation) is running, {@link isInContext} returns `true` for reentrant calls. + * Executes the callback inside the dispatch scope. While the callback (and its full async + * continuation) is running, {@link isReentrant} returns `true` for reentrant calls. */ - run(callback: () => R): R; + enter(callback: () => R): R; /** - * Returns `true` if the caller is executing inside a {@link run} callback, meaning the - * dispatch is reentrant (e.g. a handler response or an injected dispatcher call) and - * should run inline rather than being queued. + * Returns `true` if the given dispatch is reentrant — i.e. it originates from within a + * running {@link enter} callback (handler response or injected dispatcher call) and should + * run inline rather than being queued. * * Implementations may inspect the action to apply additional guards, e.g. to ensure - * client-originated actions are always queued regardless of context state. + * client-originated actions are always queued regardless of scope state. */ - isInContext(action: Action): boolean; + isReentrant(action: Action): boolean; } /** @@ -149,6 +153,8 @@ export interface ActionDispatchContext { */ @injectable() export class DefaultActionDispatcher implements ActionDispatcher, Disposable { + protected static readonly STALE_TIMEOUT_GRACE_MS = 30_000; + @inject(ActionHandlerRegistry) protected actionHandlerRegistry: ActionHandlerRegistry; @@ -161,19 +167,21 @@ export class DefaultActionDispatcher implements ActionDispatcher, Disposable { @inject(ClientId) protected clientId: string; - @inject(ActionDispatchContext) - protected dispatchContext: ActionDispatchContext; + @inject(ActionDispatchScope) + protected dispatchScope: ActionDispatchScope; - protected channel = new ActionChannel(); + protected actionQueue = new ActionChannel(); protected postUpdateQueue: Action[] = []; protected readonly pendingRequests = new Map>(); - protected readonly timeouts = new Map(); + protected readonly requestTimeouts = new Map(); protected nextRequestId = 1; @postConstruct() protected initialize(): void { - this.runConsumerLoop(); + // Fire-and-forget: the loop is meant to run for the dispatcher's lifetime; surface any + // unexpected termination via the logger instead of an unhandled rejection. + this.processActionQueue().catch(error => this.logger.error('Action queue processor terminated unexpectedly', error)); } protected generateRequestId(): string { @@ -186,18 +194,18 @@ export class DefaultActionDispatcher implements ActionDispatcher, Disposable { return Promise.resolve(); } // Reentrant dispatches run inline to preserve ordering with the containing action. - if (this.dispatchContext.isInContext(action)) { + if (this.dispatchScope.isReentrant(action)) { return this.doDispatch(action); } // External dispatches are queued and processed sequentially. - return this.channel.push(action); + return this.actionQueue.push(action); } - protected async runConsumerLoop(): Promise { - // Run each action inside the dispatch context so reentrant dispatch() calls are recognized. - for await (const entry of this.channel.consume()) { + protected async processActionQueue(): Promise { + // Process each action inside the dispatch scope so reentrant dispatch() calls are recognized. + for await (const entry of this.actionQueue.consume()) { try { - await this.dispatchContext.run(() => this.doDispatch(entry.item)); + await this.dispatchScope.enter(() => this.doDispatch(entry.item)); entry.resolve(); } catch (error) { entry.reject(error); @@ -227,26 +235,30 @@ export class DefaultActionDispatcher implements ActionDispatcher, Disposable { await this.dispatchResponses(responses); } - protected async executeHandler(handler: ActionHandler, request: Action): Promise { - const responseActions = await handler.execute(request); - return responseActions.map(action => respond(request, action)); + protected async executeHandler(handler: ActionHandler, action: Action): Promise { + const responseActions = await handler.execute(action); + return responseActions.map(response => respond(action, response)); } protected async dispatchResponses(actions: Action[]): Promise { - // Sequential dispatch inside the current dispatch context. Each response goes inline via + // Sequential dispatch inside the current dispatch scope. Each response goes inline via // the reentrant path, or is intercepted if it resolves a pending request(). for (const action of actions) { await this.dispatch(action); } } - dispatchAll(...actions: MaybeArray[]): Promise { + async dispatchAll(...actions: MaybeArray[]): Promise { if (actions.length === 0) { - return Promise.resolve(); + return; } const flat: Action[] = []; flatPush(flat, actions); - return Promise.all(flat.map(action => this.dispatch(action))).then(() => Promise.resolve()); + // Sequential dispatch: external calls were already FIFO via the queue, but reentrant + // calls also need deterministic ordering so handlers see each other's effects in order. + for (const action of flat) { + await this.dispatch(action); + } } dispatchAfterNextUpdate(...actions: MaybeArray[]): void { @@ -285,10 +297,14 @@ export class DefaultActionDispatcher implements ActionDispatcher, Disposable { if (timeoutMs !== undefined) { const timeout = setTimeout(() => { if (this.pendingRequests.delete(action.requestId)) { - // Intentionally keep the timeouts entry (do NOT delete). - // The stale entry signals "this request existed but timed out", - // matching the client-side GLSPActionDispatcher pattern. - // Cleaned up when the late response arrives or on dispose(). + // Keep the requestTimeouts entry briefly as a stale marker so a late response + // can be filtered, then drop it after a grace period to avoid leaking markers + // for requests whose late responses never arrive. + const cleanup = setTimeout( + () => this.requestTimeouts.delete(action.requestId), + DefaultActionDispatcher.STALE_TIMEOUT_GRACE_MS + ); + cleanup.unref?.(); const message = `Request '${action.requestId}' (${action.kind}) timed out after ${timeoutMs}ms`; if (rejectOnTimeout) { deferred.reject(new Error(message)); @@ -299,20 +315,20 @@ export class DefaultActionDispatcher implements ActionDispatcher, Disposable { } }, timeoutMs); - this.timeouts.set(action.requestId, timeout); + this.requestTimeouts.set(action.requestId, timeout); } // dispatch() routes correctly on its own: external callers queue, handler-internal - // callers run inline via the AsyncLocalStorage context. The matching response resolves + // callers run inline via the ActionDispatchScope. The matching response resolves // the deferred out-of-band via interceptPendingResponse(). const dispatchPromise = this.dispatch(action); dispatchPromise.catch(error => { if (this.pendingRequests.delete(action.requestId)) { - const timeout = this.timeouts.get(action.requestId); + const timeout = this.requestTimeouts.get(action.requestId); if (timeout !== undefined) { clearTimeout(timeout); - this.timeouts.delete(action.requestId); + this.requestTimeouts.delete(action.requestId); } deferred.reject(error); } @@ -350,15 +366,15 @@ export class DefaultActionDispatcher implements ActionDispatcher, Disposable { if (!ResponseAction.hasValidResponseId(action)) { return false; } - // Node-only: responses to server-initiated requests are resolved here instead of going - // through action handlers. No Java equivalent. + // Responses to server-initiated requests are resolved here instead of going through + // action handlers. No equivalent in the Java GLSP server implementation. const deferred = this.pendingRequests.get(action.responseId); if (deferred !== undefined) { this.pendingRequests.delete(action.responseId); - const timeout = this.timeouts.get(action.responseId); + const timeout = this.requestTimeouts.get(action.responseId); if (timeout !== undefined) { clearTimeout(timeout); - this.timeouts.delete(action.responseId); + this.requestTimeouts.delete(action.responseId); } // Intercepted responses skip doDispatch, so drain post-update actions here when the // response is an UpdateModel/SetModel. RejectAction does not trigger a drain; pending @@ -370,16 +386,20 @@ export class DefaultActionDispatcher implements ActionDispatcher, Disposable { deferred.resolve(action); } if (postUpdateActions.length > 0) { - this.dispatchResponses(postUpdateActions); + // Fire-and-forget: callers of request() expect the resolved response, not the + // unrelated post-update fan-out; awaiting here would couple them unnecessarily. + this.dispatchResponses(postUpdateActions).catch(error => + this.logger.error('Failed to dispatch post-update actions', error) + ); } return true; } // Late response for a timed-out request: clear responseId so ClientActionForwarder does // not re-emit it to the client. - const staleTimeout = this.timeouts.get(action.responseId); + const staleTimeout = this.requestTimeouts.get(action.responseId); if (staleTimeout !== undefined) { clearTimeout(staleTimeout); - this.timeouts.delete(action.responseId); + this.requestTimeouts.delete(action.responseId); this.logger.debug(`Late response for timed-out request '${action.responseId}', dispatching as normal action`); action.responseId = ''; } @@ -388,12 +408,12 @@ export class DefaultActionDispatcher implements ActionDispatcher, Disposable { dispose(): void { // Reject queued actions: no further processing should happen after dispose. - this.channel.rejectPending(new Error('ActionDispatcher disposed')); - this.channel.stop(); + this.actionQueue.rejectPending(new Error('ActionDispatcher disposed')); + this.actionQueue.stop(); this.pendingRequests.forEach((deferred, id) => deferred.reject(new Error(`Request '${id}' cancelled: dispatcher disposed`))); this.pendingRequests.clear(); - this.timeouts.forEach(timeout => clearTimeout(timeout)); - this.timeouts.clear(); + this.requestTimeouts.forEach(timeout => clearTimeout(timeout)); + this.requestTimeouts.clear(); this.postUpdateQueue = []; } } diff --git a/packages/server/src/common/protocol/glsp-server.ts b/packages/server/src/common/protocol/glsp-server.ts index 76a4cba..d43f063 100644 --- a/packages/server/src/common/protocol/glsp-server.ts +++ b/packages/server/src/common/protocol/glsp-server.ts @@ -177,11 +177,15 @@ export class DefaultGLSPServer implements GLSPServer { } catch (error) { const detail = error instanceof GLSPServerError ? error.cause?.toString?.() : error?.toString?.(); this.logger.error(`Failed to handle request '${action.kind}' (${action.requestId}):`, detail); - const reject = RejectAction.create(`Failed to handle request '${action.kind}' (${action.requestId})`, { - responseId: action.requestId, - detail - }); - this.sendResponseToClient(clientId, reject); + try { + const reject = RejectAction.create(`Failed to handle request '${action.kind}' (${action.requestId})`, { + responseId: action.requestId, + detail + }); + this.sendResponseToClient(clientId, reject); + } catch (sendError) { + this.logger.error(`Failed to send rejection for request '${action.requestId}':`, sendError); + } } } diff --git a/packages/server/src/node/actions/action-dispatcher.spec.ts b/packages/server/src/node/actions/action-dispatcher.spec.ts index d38f827..1532b92 100644 --- a/packages/server/src/node/actions/action-dispatcher.spec.ts +++ b/packages/server/src/node/actions/action-dispatcher.spec.ts @@ -17,7 +17,7 @@ import { Action, Deferred, RequestAction, ResponseAction, UpdateModelAction } fr import { expect } from 'chai'; import { Container, ContainerModule } from 'inversify'; import * as sinon from 'sinon'; -import { ActionDispatchContext, DefaultActionDispatcher } from '../../common/actions/action-dispatcher'; +import { ActionDispatchScope, DefaultActionDispatcher } from '../../common/actions/action-dispatcher'; import { ActionHandler } from '../../common/actions/action-handler'; import { ActionHandlerRegistry } from '../../common/actions/action-handler-registry'; import { ClientActionForwarder } from '../../common/actions/client-action-handler'; @@ -25,7 +25,7 @@ import { ClientActionKinds, ClientId } from '../../common/di/service-identifiers import { ClientSessionManager } from '../../common/session/client-session-manager'; import * as mock from '../../common/test/mock-util'; import { Logger } from '../../common/utils/logger'; -import { NodeDispatchContext } from '../di/node-dispatch-context'; +import { NodeActionDispatchScope } from '../di/node-action-dispatch-scope'; function waitSync(timeInMillis: number): void { const start = Date.now(); @@ -52,7 +52,7 @@ describe('test DefaultActionDispatcher', () => { bind(ActionHandlerRegistry).toConstantValue(actionHandlerRegistry); bind(ClientActionKinds).toConstantValue(new Set(['response', 'response1', 'response2'])); bind(ClientActionForwarder).toConstantValue(clientActionForwarderStub); - bind(ActionDispatchContext).toDynamicValue(() => new NodeDispatchContext()); + bind(ActionDispatchScope).to(NodeActionDispatchScope); }) ); const actionDispatcher = container.resolve(DefaultActionDispatcher); diff --git a/packages/server/src/node/di/app-module.ts b/packages/server/src/node/di/app-module.ts index c746e63..6033e58 100644 --- a/packages/server/src/node/di/app-module.ts +++ b/packages/server/src/node/di/app-module.ts @@ -16,15 +16,17 @@ import { BindingContext } from '@eclipse-glsp/protocol/lib/di'; import { ContainerModule } from 'inversify'; import * as winston from 'winston'; -import { ActionDispatchContext, InjectionContainer, LogLevel, Logger, LoggerFactory, NullLogger, getRequestParentName } from '../../common'; +import { ActionDispatchScope, InjectionContainer, LogLevel, Logger, LoggerFactory, NullLogger, getRequestParentName } from '../../common'; import { LaunchOptions } from '../launch/cli-parser'; -import { NodeDispatchContext } from './node-dispatch-context'; +import { NodeActionDispatchScope } from './node-action-dispatch-scope'; import { WinstonLogger } from './winston-logger'; export function createAppModule(options: LaunchOptions): ContainerModule { return new ContainerModule((bind, unbind, isBound, rebind) => { bind(InjectionContainer).toDynamicValue(dynamicContext => dynamicContext.container); - bind(ActionDispatchContext).toDynamicValue(() => new NodeDispatchContext()); + // Transient on purpose: kept symmetric with the browser binding, which cannot share a + // singleton across sessions. + bind(ActionDispatchScope).to(NodeActionDispatchScope); const context = { bind, unbind, isBound, rebind }; configureWinstonLogger(context, options); }); diff --git a/packages/server/src/node/di/node-dispatch-context.ts b/packages/server/src/node/di/node-action-dispatch-scope.ts similarity index 75% rename from packages/server/src/node/di/node-dispatch-context.ts rename to packages/server/src/node/di/node-action-dispatch-scope.ts index e0dc7d6..359cb86 100644 --- a/packages/server/src/node/di/node-dispatch-context.ts +++ b/packages/server/src/node/di/node-action-dispatch-scope.ts @@ -15,19 +15,21 @@ ********************************************************************************/ import { AsyncLocalStorage } from 'async_hooks'; -import { ActionDispatchContext } from '../../common/actions/action-dispatcher'; +import { injectable } from 'inversify'; +import { ActionDispatchScope } from '../../common/actions/action-dispatcher'; /** - * Node.js {@link ActionDispatchContext} backed by native `AsyncLocalStorage`. + * Node.js {@link ActionDispatchScope} backed by native `AsyncLocalStorage`. */ -export class NodeDispatchContext implements ActionDispatchContext { +@injectable() +export class NodeActionDispatchScope implements ActionDispatchScope { protected storage = new AsyncLocalStorage(); - run(callback: () => R): R { + enter(callback: () => R): R { return this.storage.run(true, callback); } - isInContext(): boolean { + isReentrant(): boolean { return this.storage.getStore() === true; } } diff --git a/packages/server/src/node/index.ts b/packages/server/src/node/index.ts index a34c815..3806206 100644 --- a/packages/server/src/node/index.ts +++ b/packages/server/src/node/index.ts @@ -1,5 +1,5 @@ /******************************************************************************** - * Copyright (c) 2022-2024 EclipseSource and others. + * Copyright (c) 2022-2026 EclipseSource and others. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v. 2.0 which is available at @@ -15,7 +15,7 @@ ********************************************************************************/ export * from './abstract-json-model-storage'; export * from './di/app-module'; -export * from './di/node-dispatch-context'; +export * from './di/node-action-dispatch-scope'; export * from './di/winston-logger'; export * from './gmodel/gmodel-storage'; export * from './launch/cli-parser'; diff --git a/yarn.lock b/yarn.lock index e4469ad..5eedfcf 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1982,11 +1982,6 @@ ajv@^6.12.4, ajv@^6.12.5: json-schema-traverse "^0.4.1" uri-js "^4.2.2" -als-browser@^1.0.1: - version "1.0.1" - resolved "https://registry.yarnpkg.com/als-browser/-/als-browser-1.0.1.tgz#ddd9c2ac8ad2817e7d55f0d470b76aaa70f3d521" - integrity sha512-DjavKf6zf4DFPdEmgsEM474MBjFcZG/1amv2/+WHGf61kVQWqf7XEn4jvpjFS4ssQbh/pkmYThaPfQK1ERC+3g== - ansi-colors@4.1.1: version "4.1.1" resolved "https://registry.yarnpkg.com/ansi-colors/-/ansi-colors-4.1.1.tgz#cbb9ae256bf750af1eab344f229aa27fe94ba348" From 27241716bf196347a55d85d9ca71305d8fbb1a86 Mon Sep 17 00:00:00 2001 From: Martin Fleck Date: Mon, 27 Apr 2026 15:22:34 +0200 Subject: [PATCH 9/9] refactor: address tortmayr review feedback (round 3) - Rename `ActionChannel` -> `ActionQueue` (class, file, all refs) - Drop redundant comment + Java reference in `interceptPendingResponse` - Use `inSingletonScope` for the Node `ActionDispatchScope` binding --- .../src/common/actions/action-dispatcher.ts | 6 ++--- packages/server/src/common/index.ts | 2 +- ...n-channel.spec.ts => action-queue.spec.ts} | 24 ++++++++--------- .../{action-channel.ts => action-queue.ts} | 26 +++++++++---------- .../server/src/common/utils/promise-queue.ts | 2 +- packages/server/src/node/di/app-module.ts | 4 +-- 6 files changed, 30 insertions(+), 34 deletions(-) rename packages/server/src/common/utils/{action-channel.spec.ts => action-queue.spec.ts} (86%) rename packages/server/src/common/utils/{action-channel.ts => action-queue.ts} (76%) diff --git a/packages/server/src/common/actions/action-dispatcher.ts b/packages/server/src/common/actions/action-dispatcher.ts index 736ed8b..8605ef1 100644 --- a/packages/server/src/common/actions/action-dispatcher.ts +++ b/packages/server/src/common/actions/action-dispatcher.ts @@ -27,7 +27,7 @@ import { } from '@eclipse-glsp/protocol'; import { inject, injectable, postConstruct } from 'inversify'; import { ClientId } from '../di/service-identifiers'; -import { ActionChannel } from '../utils/action-channel'; +import { ActionQueue } from '../utils/action-queue'; import { GLSPServerError } from '../utils/glsp-server-error'; import { Logger } from '../utils/logger'; import { ActionHandler } from './action-handler'; @@ -170,7 +170,7 @@ export class DefaultActionDispatcher implements ActionDispatcher, Disposable { @inject(ActionDispatchScope) protected dispatchScope: ActionDispatchScope; - protected actionQueue = new ActionChannel(); + protected actionQueue = new ActionQueue(); protected postUpdateQueue: Action[] = []; protected readonly pendingRequests = new Map>(); @@ -366,8 +366,6 @@ export class DefaultActionDispatcher implements ActionDispatcher, Disposable { if (!ResponseAction.hasValidResponseId(action)) { return false; } - // Responses to server-initiated requests are resolved here instead of going through - // action handlers. No equivalent in the Java GLSP server implementation. const deferred = this.pendingRequests.get(action.responseId); if (deferred !== undefined) { this.pendingRequests.delete(action.responseId); diff --git a/packages/server/src/common/index.ts b/packages/server/src/common/index.ts index 265e5f5..1051a83 100644 --- a/packages/server/src/common/index.ts +++ b/packages/server/src/common/index.ts @@ -95,7 +95,7 @@ export * from './session/client-session-factory'; export * from './session/client-session-initializer'; export * from './session/client-session-listener'; export * from './session/client-session-manager'; -export * from './utils/action-channel'; +export * from './utils/action-queue'; export * from './utils/args-util'; export * from './utils/client-options-util'; export * from './utils/console-logger'; diff --git a/packages/server/src/common/utils/action-channel.spec.ts b/packages/server/src/common/utils/action-queue.spec.ts similarity index 86% rename from packages/server/src/common/utils/action-channel.spec.ts rename to packages/server/src/common/utils/action-queue.spec.ts index 0d92bc4..86ae482 100644 --- a/packages/server/src/common/utils/action-channel.spec.ts +++ b/packages/server/src/common/utils/action-queue.spec.ts @@ -15,11 +15,11 @@ ********************************************************************************/ import { expect } from 'chai'; import { expectToThrowAsync } from '../test/mock-util'; -import { ActionChannel } from './action-channel'; +import { ActionQueue } from './action-queue'; -describe('ActionChannel', () => { +describe('ActionQueue', () => { it('yields pushed items in FIFO order', async () => { - const channel = new ActionChannel(); + const channel = new ActionQueue(); const consumed: number[] = []; const consumer = (async (): Promise => { @@ -37,7 +37,7 @@ describe('ActionChannel', () => { }); it('resolves the push promise once the consumer resolves the entry', async () => { - const channel = new ActionChannel(); + const channel = new ActionQueue(); let entryResolver: (() => void) | undefined; const consumer = (async (): Promise => { @@ -57,7 +57,7 @@ describe('ActionChannel', () => { }); it('propagates reject() from the consumer back to the pushing caller', async () => { - const channel = new ActionChannel(); + const channel = new ActionQueue(); const consumer = (async (): Promise => { for await (const entry of channel.consume()) { @@ -72,13 +72,13 @@ describe('ActionChannel', () => { }); it('rejects push() after stop()', async () => { - const channel = new ActionChannel(); + const channel = new ActionQueue(); channel.stop(); - await expectToThrowAsync(() => channel.push(1), 'ActionChannel is stopped'); + await expectToThrowAsync(() => channel.push(1), 'ActionQueue is stopped'); }); it('consumer exits after stop() and drain', async () => { - const channel = new ActionChannel(); + const channel = new ActionQueue(); const consumed: number[] = []; const consumer = (async (): Promise => { @@ -98,7 +98,7 @@ describe('ActionChannel', () => { }); it('rejectPending() rejects all queued push() promises without stopping', async () => { - const channel = new ActionChannel(); + const channel = new ActionQueue(); const pushes = [channel.push(1), channel.push(2)]; expect(channel.size).to.equal(2); @@ -111,7 +111,7 @@ describe('ActionChannel', () => { }); it('size reflects the number of unconsumed entries', async () => { - const channel = new ActionChannel(); + const channel = new ActionQueue(); channel.push(1); channel.push(2); channel.push(3); @@ -119,13 +119,13 @@ describe('ActionChannel', () => { }); it('throws when a second consumer is started', async () => { - const channel = new ActionChannel(); + const channel = new ActionQueue(); const first = channel.consume(); // Kick off the first consumer so it registers as the active consumer. const firstStep = first.next(); const second = channel.consume(); - await expectToThrowAsync(() => second.next().then(() => undefined), 'ActionChannel supports only a single consumer'); + await expectToThrowAsync(() => second.next().then(() => undefined), 'ActionQueue supports only a single consumer'); channel.stop(); await firstStep; diff --git a/packages/server/src/common/utils/action-channel.ts b/packages/server/src/common/utils/action-queue.ts similarity index 76% rename from packages/server/src/common/utils/action-channel.ts rename to packages/server/src/common/utils/action-queue.ts index 28d2736..e0718b4 100644 --- a/packages/server/src/common/utils/action-channel.ts +++ b/packages/server/src/common/utils/action-queue.ts @@ -15,31 +15,31 @@ ********************************************************************************/ /** - * An entry yielded by {@link ActionChannel.consume}. The consumer must call either + * An entry yielded by {@link ActionQueue.consume}. The consumer must call either * `resolve()` or `reject(error)` exactly once after processing `item`. */ -export interface ActionChannelEntry { +export interface ActionQueueEntry { item: T; resolve: () => void; reject: (error: unknown) => void; } /** - * Producer/consumer channel with a single async consumer loop. Items are processed in FIFO order. + * Producer/consumer queue with a single async consumer loop. Items are processed in FIFO order. */ -export class ActionChannel { - protected queue: ActionChannelEntry[] = []; +export class ActionQueue { + protected queue: ActionQueueEntry[] = []; protected notify: (() => void) | undefined; protected stopped = false; protected consuming = false; /** * Enqueues an item. The returned promise settles when the consumer finishes processing it, - * propagating results back to the producer. Rejects immediately if the channel has been stopped. + * propagating results back to the producer. Rejects immediately if the queue has been stopped. */ push(item: T): Promise { if (this.stopped) { - return Promise.reject(new Error('ActionChannel is stopped')); + return Promise.reject(new Error('ActionQueue is stopped')); } return new Promise((resolve, reject) => { this.queue.push({ item, resolve, reject }); @@ -49,13 +49,13 @@ export class ActionChannel { /** * Yields pending entries, suspending until the next {@link push} when the queue is empty. Exits - * once the channel is stopped and the queue has been drained. + * once the queue is stopped and has been drained. * * Single-consumer: calling `consume()` a second time throws an error. */ - async *consume(): AsyncGenerator> { + async *consume(): AsyncGenerator> { if (this.consuming) { - throw new Error('ActionChannel supports only a single consumer'); + throw new Error('ActionQueue supports only a single consumer'); } this.consuming = true; try { @@ -77,7 +77,7 @@ export class ActionChannel { } /** - * Stops the channel. Further {@link push} calls reject. The consumer loop exits after + * Stops the queue. Further {@link push} calls reject. The consumer loop exits after * the remaining queued entries have been yielded (or immediately if the queue is empty). */ stop(): void { @@ -87,9 +87,9 @@ export class ActionChannel { /** * Rejects all queued entries with the given reason so producers awaiting their - * `push()` promises do not hang. Does not stop the channel. + * `push()` promises do not hang. Does not stop the queue. */ - rejectPending(reason: Error = new Error('ActionChannel cleared')): void { + rejectPending(reason: Error = new Error('ActionQueue cleared')): void { const pending = this.queue; this.queue = []; for (const entry of pending) { diff --git a/packages/server/src/common/utils/promise-queue.ts b/packages/server/src/common/utils/promise-queue.ts index b2cc064..4725ab0 100644 --- a/packages/server/src/common/utils/promise-queue.ts +++ b/packages/server/src/common/utils/promise-queue.ts @@ -32,7 +32,7 @@ export interface PromiseQueueElement { * * @deprecated Since 2.7. The `DefaultActionDispatcher` no longer uses this queue. Kept for * backwards compatibility; will be removed in a future release. New code should use - * {@link ActionChannel} or native async patterns instead. + * {@link ActionQueue} or native async patterns instead. */ export class PromiseQueue { protected queue: PromiseQueueElement[] = []; diff --git a/packages/server/src/node/di/app-module.ts b/packages/server/src/node/di/app-module.ts index 6033e58..d03e199 100644 --- a/packages/server/src/node/di/app-module.ts +++ b/packages/server/src/node/di/app-module.ts @@ -24,9 +24,7 @@ import { WinstonLogger } from './winston-logger'; export function createAppModule(options: LaunchOptions): ContainerModule { return new ContainerModule((bind, unbind, isBound, rebind) => { bind(InjectionContainer).toDynamicValue(dynamicContext => dynamicContext.container); - // Transient on purpose: kept symmetric with the browser binding, which cannot share a - // singleton across sessions. - bind(ActionDispatchScope).to(NodeActionDispatchScope); + bind(ActionDispatchScope).to(NodeActionDispatchScope).inSingletonScope(); const context = { bind, unbind, isBound, rebind }; configureWinstonLogger(context, options); });