Skip to content

Commit 96a844b

Browse files
Copilothotlong
andcommitted
feat(json-rpc): Add session management, progress notifications, and method call chaining
- Implement session management with ID passing and timeout - Add session.create, session.get, session.set, session.destroy methods - Implement progress notifications via Server-Sent Events (SSE) - Add GET /rpc/progress endpoint for SSE connections - Support method call chaining in batch requests - Implement result reference syntax ($1.result.id) - Execute batch requests in dependency order with chaining enabled - Add session storage with configurable timeout (default: 30 minutes) - Clean up sessions and SSE clients on timeout/disconnect - Update version to 0.2.0 Co-authored-by: hotlong <50353452+hotlong@users.noreply.github.com>
1 parent 2c3143c commit 96a844b

1 file changed

Lines changed: 271 additions & 11 deletions

File tree

packages/protocols/json-rpc/src/index.ts

Lines changed: 271 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,37 @@ export interface JSONRPCPluginConfig {
2121
enableCORS?: boolean;
2222
/** Enable introspection methods */
2323
enableIntrospection?: boolean;
24+
/** Enable session management */
25+
enableSessions?: boolean;
26+
/** Session timeout in milliseconds (default: 30 minutes) */
27+
sessionTimeout?: number;
28+
/** Enable progress notifications via SSE */
29+
enableProgress?: boolean;
30+
/** Enable method call chaining in batch requests */
31+
enableChaining?: boolean;
32+
}
33+
34+
/**
35+
* Session data
36+
*/
37+
interface Session {
38+
id: string;
39+
data: Record<string, any>;
40+
lastAccess: number;
41+
timeout?: NodeJS.Timeout;
42+
}
43+
44+
/**
45+
* Progress notification
46+
*/
47+
interface ProgressNotification {
48+
method: string;
49+
params: {
50+
id: string;
51+
progress: number;
52+
total: number;
53+
message?: string;
54+
};
2455
}
2556

2657
/**
@@ -62,9 +93,12 @@ interface MethodSignature {
6293
*
6394
* Key Features:
6495
* - Full JSON-RPC 2.0 specification compliance
65-
* - Batch request support
96+
* - Batch request support with call chaining
6697
* - Notification support (requests without id)
6798
* - Built-in introspection methods (system.listMethods, system.describe)
99+
* - Session management for stateful operations
100+
* - Progress notifications via Server-Sent Events (SSE)
101+
* - Method call chaining with result references
68102
* - CRUD operations mapped to RPC methods
69103
* - Named and positional parameter support
70104
* - No direct database access - all operations through ObjectStackProtocolImplementation
@@ -79,6 +113,10 @@ interface MethodSignature {
79113
* - metadata.list() - List all objects
80114
* - metadata.get(objectName) - Get object metadata
81115
* - action.execute(actionName, params) - Execute custom action
116+
* - session.create() - Create a new session (if sessions enabled)
117+
* - session.get(key) - Get session value (if sessions enabled)
118+
* - session.set(key, value) - Set session value (if sessions enabled)
119+
* - session.destroy() - Destroy current session (if sessions enabled)
82120
* - system.listMethods() - List available methods (if introspection enabled)
83121
* - system.describe(method) - Describe method signature (if introspection enabled)
84122
*
@@ -88,7 +126,13 @@ interface MethodSignature {
88126
* import { JSONRPCPlugin } from '@objectql/protocol-json-rpc';
89127
*
90128
* const kernel = new ObjectKernel([
91-
* new JSONRPCPlugin({ port: 9000, basePath: '/rpc' })
129+
* new JSONRPCPlugin({
130+
* port: 9000,
131+
* basePath: '/rpc',
132+
* enableSessions: true,
133+
* enableProgress: true,
134+
* enableChaining: true
135+
* })
92136
* ]);
93137
* await kernel.start();
94138
*
@@ -99,24 +143,37 @@ interface MethodSignature {
99143
* // Client request (named):
100144
* // POST /rpc
101145
* // {"jsonrpc":"2.0","method":"object.find","params":{"objectName":"users","query":{"where":{"active":true}}},"id":1}
146+
*
147+
* // Batch request with chaining:
148+
* // [{"jsonrpc":"2.0","method":"object.create","params":["users",{"name":"John"}],"id":1},
149+
* // {"jsonrpc":"2.0","method":"object.update","params":["users","$1.result.id",{"active":true}],"id":2}]
150+
*
151+
* // Progress notifications (SSE):
152+
* // GET /rpc/progress?session=<session-id>
102153
* ```
103154
*/
104155
export class JSONRPCPlugin implements RuntimePlugin {
105156
name = '@objectql/protocol-json-rpc';
106-
version = '0.1.0';
157+
version = '0.2.0';
107158

108159
private server?: Server;
109160
private engine?: any;
110161
private config: Required<JSONRPCPluginConfig>;
111162
private methods: Map<string, Function>;
112163
private methodSignatures: Map<string, MethodSignature>;
164+
private sessions: Map<string, Session> = new Map();
165+
private progressClients: Map<string, ServerResponse> = new Map();
113166

114167
constructor(config: JSONRPCPluginConfig = {}) {
115168
this.config = {
116169
port: config.port || 9000,
117170
basePath: config.basePath || '/rpc',
118171
enableCORS: config.enableCORS !== false,
119-
enableIntrospection: config.enableIntrospection !== false
172+
enableIntrospection: config.enableIntrospection !== false,
173+
enableSessions: config.enableSessions !== false,
174+
sessionTimeout: config.sessionTimeout || 30 * 60 * 1000, // 30 minutes
175+
enableProgress: config.enableProgress !== false,
176+
enableChaining: config.enableChaining !== false
120177
};
121178

122179
this.methods = new Map();
@@ -444,6 +501,55 @@ export class JSONRPCPlugin implements RuntimePlugin {
444501
description: 'List all registered actions'
445502
});
446503

504+
// Session methods (if enabled)
505+
if (this.config.enableSessions) {
506+
this.methods.set('session.create', async () => {
507+
const sessionId = this.generateSessionId();
508+
this.createSession(sessionId);
509+
return { sessionId };
510+
});
511+
this.methodSignatures.set('session.create', {
512+
params: [],
513+
description: 'Create a new session'
514+
});
515+
516+
this.methods.set('session.get', async (sessionId: string, key: string) => {
517+
const session = this.sessions.get(sessionId);
518+
if (!session) {
519+
throw new Error('Session not found or expired');
520+
}
521+
this.updateSessionAccess(sessionId);
522+
return session.data[key];
523+
});
524+
this.methodSignatures.set('session.get', {
525+
params: ['sessionId', 'key'],
526+
description: 'Get value from session'
527+
});
528+
529+
this.methods.set('session.set', async (sessionId: string, key: string, value: any) => {
530+
const session = this.sessions.get(sessionId);
531+
if (!session) {
532+
throw new Error('Session not found or expired');
533+
}
534+
this.updateSessionAccess(sessionId);
535+
session.data[key] = value;
536+
return { success: true };
537+
});
538+
this.methodSignatures.set('session.set', {
539+
params: ['sessionId', 'key', 'value'],
540+
description: 'Set value in session'
541+
});
542+
543+
this.methods.set('session.destroy', async (sessionId: string) => {
544+
this.destroySession(sessionId);
545+
return { success: true };
546+
});
547+
this.methodSignatures.set('session.destroy', {
548+
params: ['sessionId'],
549+
description: 'Destroy a session'
550+
});
551+
}
552+
447553
// Introspection methods (if enabled)
448554
if (this.config.enableIntrospection) {
449555
this.methods.set('system.listMethods', async () => {
@@ -539,7 +645,7 @@ export class JSONRPCPlugin implements RuntimePlugin {
539645
// Enable CORS if configured
540646
if (this.config.enableCORS) {
541647
res.setHeader('Access-Control-Allow-Origin', '*');
542-
res.setHeader('Access-Control-Allow-Methods', 'POST, OPTIONS');
648+
res.setHeader('Access-Control-Allow-Methods', 'GET, POST, OPTIONS');
543649
res.setHeader('Access-Control-Allow-Headers', 'Content-Type');
544650

545651
if (req.method === 'OPTIONS') {
@@ -558,7 +664,32 @@ export class JSONRPCPlugin implements RuntimePlugin {
558664
return;
559665
}
560666

561-
// Only accept POST requests
667+
// Handle progress SSE endpoint (GET /rpc/progress?session=<id>)
668+
if (this.config.enableProgress && req.method === 'GET' && url.includes('/progress')) {
669+
const sessionId = new URL(url, 'http://localhost').searchParams.get('session');
670+
if (sessionId) {
671+
// Setup SSE
672+
res.writeHead(200, {
673+
'Content-Type': 'text/event-stream',
674+
'Cache-Control': 'no-cache',
675+
'Connection': 'keep-alive'
676+
});
677+
678+
this.progressClients.set(sessionId, res);
679+
680+
// Send initial connection message
681+
res.write('data: {"type":"connected"}\n\n');
682+
683+
// Cleanup on close
684+
req.on('close', () => {
685+
this.progressClients.delete(sessionId);
686+
});
687+
688+
return;
689+
}
690+
}
691+
692+
// Only accept POST requests for RPC
562693
if (req.method !== 'POST') {
563694
this.sendError(res, null, -32600, 'Invalid Request: Method must be POST');
564695
return;
@@ -567,12 +698,17 @@ export class JSONRPCPlugin implements RuntimePlugin {
567698
try {
568699
const body = await this.readBody(req);
569700

570-
// Handle batch requests
701+
// Handle batch requests with optional chaining
571702
if (Array.isArray(body)) {
572-
const responses = await Promise.all(
573-
body.map(request => this.processRequest(request))
574-
);
575-
this.sendJSON(res, 200, responses);
703+
if (this.config.enableChaining) {
704+
const responses = await this.processBatchWithChaining(body);
705+
this.sendJSON(res, 200, responses);
706+
} else {
707+
const responses = await Promise.all(
708+
body.map(request => this.processRequest(request))
709+
);
710+
this.sendJSON(res, 200, responses);
711+
}
576712
} else {
577713
const response = await this.processRequest(body);
578714
// Don't send response for notifications
@@ -659,6 +795,35 @@ export class JSONRPCPlugin implements RuntimePlugin {
659795
}
660796
}
661797

798+
/**
799+
* Process batch requests with call chaining support
800+
*/
801+
private async processBatchWithChaining(requests: any[]): Promise<JSONRPCResponse[]> {
802+
const results = new Map<number | string, any>();
803+
const responses: JSONRPCResponse[] = [];
804+
805+
for (const request of requests) {
806+
// Resolve parameter references
807+
if (request.params) {
808+
request.params = this.resolveReferences(request.params, results);
809+
}
810+
811+
// Process request
812+
const response = await this.processRequest(request);
813+
814+
// Store result for future references
815+
if (response && request.id !== undefined) {
816+
results.set(request.id, response);
817+
}
818+
819+
if (response) {
820+
responses.push(response);
821+
}
822+
}
823+
824+
return responses;
825+
}
826+
662827
/**
663828
* Create JSON-RPC error response
664829
*/
@@ -711,4 +876,99 @@ export class JSONRPCPlugin implements RuntimePlugin {
711876
private sendError(res: ServerResponse, id: any, code: number, message: string): void {
712877
this.sendJSON(res, 200, this.createErrorResponse(id, code, message));
713878
}
879+
880+
/**
881+
* Generate a unique session ID
882+
*/
883+
private generateSessionId(): string {
884+
return `session_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
885+
}
886+
887+
/**
888+
* Create a new session
889+
*/
890+
private createSession(sessionId: string): void {
891+
const session: Session = {
892+
id: sessionId,
893+
data: {},
894+
lastAccess: Date.now(),
895+
timeout: setTimeout(() => {
896+
this.destroySession(sessionId);
897+
}, this.config.sessionTimeout)
898+
};
899+
900+
this.sessions.set(sessionId, session);
901+
}
902+
903+
/**
904+
* Update session access time and reset timeout
905+
*/
906+
private updateSessionAccess(sessionId: string): void {
907+
const session = this.sessions.get(sessionId);
908+
if (!session) return;
909+
910+
session.lastAccess = Date.now();
911+
912+
// Reset timeout
913+
if (session.timeout) {
914+
clearTimeout(session.timeout);
915+
}
916+
session.timeout = setTimeout(() => {
917+
this.destroySession(sessionId);
918+
}, this.config.sessionTimeout);
919+
}
920+
921+
/**
922+
* Destroy a session
923+
*/
924+
private destroySession(sessionId: string): void {
925+
const session = this.sessions.get(sessionId);
926+
if (session && session.timeout) {
927+
clearTimeout(session.timeout);
928+
}
929+
this.sessions.delete(sessionId);
930+
}
931+
932+
/**
933+
* Send progress notification to SSE clients
934+
*/
935+
private sendProgress(sessionId: string, progress: ProgressNotification): void {
936+
const client = this.progressClients.get(sessionId);
937+
if (client) {
938+
client.write(`data: ${JSON.stringify(progress)}\n\n`);
939+
}
940+
}
941+
942+
/**
943+
* Resolve result references in batch requests (e.g., $1.result.id)
944+
*/
945+
private resolveReferences(params: any, results: Map<number | string, any>): any {
946+
if (typeof params === 'string' && params.startsWith('$')) {
947+
// Parse reference: $1.result.id
948+
const match = params.match(/^\$(\d+)\.(.+)$/);
949+
if (match) {
950+
const [, refId, path] = match;
951+
const result = results.get(parseInt(refId));
952+
if (result) {
953+
// Navigate path: result.id
954+
const parts = path.split('.');
955+
let value = result;
956+
for (const part of parts) {
957+
value = value?.[part];
958+
}
959+
return value;
960+
}
961+
}
962+
} else if (Array.isArray(params)) {
963+
return params.map(p => this.resolveReferences(p, results));
964+
} else if (typeof params === 'object' && params !== null) {
965+
const resolved: any = {};
966+
for (const [key, value] of Object.entries(params)) {
967+
resolved[key] = this.resolveReferences(value, results);
968+
}
969+
return resolved;
970+
}
971+
972+
return params;
973+
}
714974
}

0 commit comments

Comments
 (0)