Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions packages/statement-store/src/session/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,20 @@ import { StatementData } from './scale/statementData.js';
import type { StatementProver } from './statementProver.js';
import type { Filter, Message, ResponseMessage, Session } from './types.js';

declare global {
/**
* Optional hook a host application can install to attach human-readable
* labels to statement-store session topics (e.g. `peer-incoming`,
* `self-outgoing`). Called from within {@link createSession} when the
* session opens its first subscription, so a custom statement-store
* adapter can render log lines like `sub#1[peer-incoming]` instead of
* opaque topic hex. Undefined by default — host apps that don't care
* about diagnostics can ignore it.
*/
// eslint-disable-next-line no-var
var __registerStatementStoreTopicLabel: ((topic: unknown, label: string) => void) | undefined;
}

export type SessionParams = {
localAccount: LocalSessionAccount;
remoteAccount: RemoteSessionAccount;
Expand Down Expand Up @@ -165,6 +179,9 @@ export function createSession({
deliverStatementData(statementData);
} else if (statementData.tag === 'response') {
if (state.outgoingRequest?.requestId !== statementData.value.requestId) return;
console.info(
`[StatementStore][session] response received — ${state.outgoingRequest.tokens.length} pending message(s) in batch will be resolved`,
);
const responseMessage: ResponseMessage = {
type: 'response',
localId: statementData.value.requestId,
Expand All @@ -189,16 +206,23 @@ export function createSession({
if (state.outgoingRequest === null) {
const requestId = nanoid();
state.outgoingRequest = { requestId, messages: [encoded], tokens: [token] };
console.info(`[StatementStore][session] outgoing batch START — 1 message (no pending)`);
encodeAndSubmitRequest(requestId, state.outgoingRequest.messages);
} else {
const currentTotal = state.outgoingRequest.messages.reduce((s, m) => s + m.length, 0);
if (currentTotal + encoded.length <= maxRequestSize) {
state.outgoingRequest.messages.push(encoded);
state.outgoingRequest.tokens.push(token);
state.outgoingRequest.requestId = nanoid();
console.info(
`[StatementStore][session] outgoing batch APPEND — ${state.outgoingRequest.messages.length} messages now (previous batch had no response yet)`,
);
encodeAndSubmitRequest(state.outgoingRequest.requestId, state.outgoingRequest.messages);
} else {
state.messageQueue.push({ encoded, token });
console.info(
`[StatementStore][session] outgoing batch FULL — message queued, ${state.messageQueue.length} in queue`,
);
}
}
}
Expand All @@ -215,6 +239,14 @@ export function createSession({

function ensureStoreSubscription(): void {
if (storeUnsub) return;

// Optional consumer-side hook: register human-readable labels for the
// session's topics so any custom statement-store wrapper can render
// logs like `sub#1[peer-incoming]` instead of opaque hex. The hook is
// a global no-op unless the host app has installed it.
globalThis.__registerStatementStoreTopicLabel?.(incomingSessionId, 'peer-incoming');
globalThis.__registerStatementStoreTopicLabel?.(outgoingSessionId, 'self-outgoing');

storeUnsub = statementStore.subscribeStatements({ matchAll: [incomingSessionId] }, page => {
for (const statement of page.statements) {
processIncomingStatement(statement);
Expand Down