Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 148 additions & 58 deletions crates/bindings-typescript/src/sdk/connection_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,16 @@ export type ConnectionState = {

type Listener = () => void;

export const CONNECTION_MANAGER_RECONNECT_DELAY_MS = 1000;

type ManagedConnection = {
connection?: DbConnectionImpl<any>;
builder?: DbConnectionBuilder<any>;
refCount: number;
state: ConnectionState;
listeners: Set<Listener>;
pendingRelease: ReturnType<typeof setTimeout> | null;
reconnectTimer: ReturnType<typeof setTimeout> | null;
onConnect?: (conn: DbConnectionImpl<any>) => void;
onDisconnect?: (ctx: ErrorContextInterface<any>, error?: Error) => void;
onConnectError?: (ctx: ErrorContextInterface<any>, error: Error) => void;
Expand Down Expand Up @@ -91,10 +95,12 @@ class ConnectionManagerImpl {
}
const managed: ManagedConnection = {
connection: undefined,
builder: undefined,
refCount: 0,
state: defaultState(),
listeners: new Set(),
pendingRelease: null,
reconnectTimer: null,
};
this.#connections.set(key, managed);
return managed;
Expand All @@ -106,47 +112,24 @@ class ConnectionManagerImpl {
}
}

/**
* Retains a connection, incrementing its reference count.
* Creates the connection on first call; returns existing connection on subsequent calls.
* Cancels any pending release if the connection was about to be cleaned up.
*
* @param key - Unique identifier for the connection (use getKey to generate)
* @param builder - Connection builder to create the connection if needed
* @returns The managed connection instance
*/
retain<T extends DbConnectionImpl<any>>(
key: string,
builder: DbConnectionBuilder<T>
): T {
const managed = this.#ensureEntry(key);
if (managed.pendingRelease) {
clearTimeout(managed.pendingRelease);
managed.pendingRelease = null;
}
managed.refCount += 1;
if (managed.connection) {
return managed.connection as T;
}

const connection = builder.build();
managed.connection = connection;

const updateState = (updates: Partial<ConnectionState>) => {
managed.state = { ...managed.state, ...updates };
this.#notify(managed);
};
#updateState(
managed: ManagedConnection,
updates: Partial<ConnectionState>
): void {
managed.state = { ...managed.state, ...updates };
this.#notify(managed);
}

updateState({
isActive: connection.isActive,
identity: connection.identity,
token: connection.token,
connectionId: connection.connectionId,
connectionError: undefined,
});
#ensureCallbacks(managed: ManagedConnection): void {
if (managed.onConnect) {
return;
}

managed.onConnect = conn => {
updateState({
if (conn !== managed.connection) {
return;
}
this.#updateState(managed, {
isActive: conn.isActive,
identity: conn.identity,
token: conn.token,
Expand All @@ -156,26 +139,136 @@ class ConnectionManagerImpl {
};

managed.onDisconnect = (ctx, error) => {
updateState({
isActive: ctx.isActive,
if (ctx !== managed.connection) {
return;
}
this.#updateState(managed, {
isActive: false,
connectionError: error ?? undefined,
});
this.#scheduleReconnect(managed);
};

managed.onConnectError = (ctx, error) => {
updateState({
isActive: ctx.isActive,
if (ctx !== managed.connection) {
return;
}
this.#updateState(managed, {
isActive: false,
connectionError: error,
});
this.#scheduleReconnect(managed);
};
}

#attachCallbacks<T extends DbConnectionImpl<any>>(
managed: ManagedConnection,
builder: DbConnectionBuilder<T>
): void {
this.#ensureCallbacks(managed);
builder.onConnect(managed.onConnect!);
builder.onDisconnect(managed.onDisconnect!);
builder.onConnectError(managed.onConnectError!);
}

#detachCallbacks(
managed: ManagedConnection,
connection: DbConnectionImpl<any>
): void {
if (managed.onConnect) {
connection.removeOnConnect(managed.onConnect as any);
}
if (managed.onDisconnect) {
connection.removeOnDisconnect(managed.onDisconnect as any);
}
if (managed.onConnectError) {
connection.removeOnConnectError(managed.onConnectError as any);
}
}

builder.onConnect(managed.onConnect);
builder.onDisconnect(managed.onDisconnect);
builder.onConnectError(managed.onConnectError);
#buildManagedConnection<T extends DbConnectionImpl<any>>(
managed: ManagedConnection,
builder: DbConnectionBuilder<T>
): T {
managed.builder = builder;
const connection = builder.build();
managed.connection = connection;
this.#attachCallbacks(managed, builder);

this.#updateState(managed, {
isActive: connection.isActive,
identity: connection.identity,
token: connection.token,
connectionId: connection.connectionId,
connectionError: undefined,
});

return connection as T;
}

#scheduleReconnect(managed: ManagedConnection): void {
if (
managed.refCount <= 0 ||
managed.pendingRelease ||
managed.reconnectTimer ||
!managed.builder
) {
return;
}

const connection = managed.connection;
if (connection) {
this.#detachCallbacks(managed, connection);
}
managed.connection = undefined;
managed.reconnectTimer = setTimeout(() => {
managed.reconnectTimer = null;
if (
managed.refCount <= 0 ||
managed.pendingRelease ||
managed.connection ||
!managed.builder
) {
return;
}

this.#buildManagedConnection(managed, managed.builder);
}, CONNECTION_MANAGER_RECONNECT_DELAY_MS);
}

/**
* Retains a connection, incrementing its reference count.
* Creates the connection on first call; returns existing connection on subsequent calls.
* Cancels any pending release if the connection was about to be cleaned up.
*
* @param key - Unique identifier for the connection (use getKey to generate)
* @param builder - Connection builder to create the connection if needed
* @returns The managed connection instance
*/
retain<T extends DbConnectionImpl<any>>(
key: string,
builder: DbConnectionBuilder<T>
): T {
const managed = this.#ensureEntry(key);
if (managed.pendingRelease) {
clearTimeout(managed.pendingRelease);
managed.pendingRelease = null;
}
if (managed.reconnectTimer) {
clearTimeout(managed.reconnectTimer);
managed.reconnectTimer = null;
}

managed.refCount += 1;
managed.builder = builder;

if (managed.connection) {
return managed.connection as T;
}

return this.#buildManagedConnection(managed, builder);
}

release(key: string): void {
const managed = this.#connections.get(key);
if (!managed) {
Expand All @@ -187,24 +280,21 @@ class ConnectionManagerImpl {
return;
}

if (managed.reconnectTimer) {
clearTimeout(managed.reconnectTimer);
managed.reconnectTimer = null;
}

managed.pendingRelease = setTimeout(() => {
managed.pendingRelease = null;
if (managed.refCount > 0) {
return;
}
if (managed.connection) {
if (managed.onConnect) {
managed.connection.removeOnConnect(managed.onConnect as any);
}
if (managed.onDisconnect) {
managed.connection.removeOnDisconnect(managed.onDisconnect as any);
}
if (managed.onConnectError) {
managed.connection.removeOnConnectError(
managed.onConnectError as any
);
}
managed.connection.disconnect();
const connection = managed.connection;
managed.connection = undefined;
if (connection) {
this.#detachCallbacks(managed, connection);
connection.disconnect();
}
this.#connections.delete(key);
}, 0);
Expand Down
4 changes: 2 additions & 2 deletions crates/bindings-typescript/src/sdk/db_connection_impl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -321,12 +321,12 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
this.ws = v;

this.ws.onclose = () => {
this.#emitter.emit('disconnect', this);
this.isActive = false;
this.#emitter.emit('disconnect', this);
};
this.ws.onerror = (e: ErrorEvent) => {
this.#emitter.emit('connectError', this, e);
this.isActive = false;
this.#emitter.emit('connectError', this, e);
};
this.ws.onopen = this.#handleOnOpen.bind(this);
this.ws.onmessage = this.#handleOnMessage.bind(this);
Expand Down
9 changes: 8 additions & 1 deletion crates/bindings-typescript/src/sdk/websocket_test_adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class WebsocketTestAdapter implements WebSocketAdapter {
#onclose: (ev: CloseEvent) => void = () => {};
#onopen: () => void = () => {};
#onmessage: (msg: { data: Uint8Array }) => void = () => {};
#onerror: (msg: ErrorEvent) => void = () => {};

constructor() {
this.messageQueue = [];
Expand All @@ -39,7 +40,13 @@ class WebsocketTestAdapter implements WebSocketAdapter {
this.#onmessage = handler;
}

set onerror(_handler: (msg: ErrorEvent) => void) {}
set onerror(handler: (msg: ErrorEvent) => void) {
this.#onerror = handler;
}

error(error: Error): void {
this.#onerror(error as unknown as ErrorEvent);
}

send(message: Uint8Array<ArrayBuffer>): void {
const rawMessage = message.slice();
Expand Down
Loading
Loading