Skip to content

Commit 52876a0

Browse files
mesh changed sdk
1 parent 094189c commit 52876a0

7 files changed

Lines changed: 66 additions & 39 deletions

File tree

node/mesh_peer.ts

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,11 @@ async function main() {
2222
await mesh.start();
2323

2424
// 3. Register Local Service (Just to be a good citizen, though we mainly act as client here for this test layout)
25-
await mesh.registerService(MY_SERVICE_NAME, {
26-
onRequestChannel: async (channel: IChannel) => {
25+
mesh.registerService(MY_SERVICE_NAME)
26+
.onRequestChannel(async (channel: IChannel) => {
2727
console.log(`[Node] Accepted inbound connection (Unexpected for this test flow, but handling...)`);
2828
channel.close();
29-
}
30-
});
29+
});
3130
console.log(`[Node] Registered '${MY_SERVICE_NAME}'`);
3231

3332
// 4. Connect to Python Peer and Start Streaming

node/src/abstract.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,14 @@ export interface IServiceHandlers {
6666
onRequestReply?: (msg: any) => Promise<any>;
6767
}
6868

69+
export interface IServiceRegistration {
70+
onRequestChannel(handler: (channel: IChannel) => void): IServiceRegistration;
71+
onRequestReply(handler: (msg: any) => Promise<any>): IServiceRegistration;
72+
}
73+
6974
export interface IMesh {
7075
start(): Promise<void>;
71-
registerService(serviceName: string, handlers: IServiceHandlers): Promise<void>;
76+
registerService(serviceName: string): IServiceRegistration;
7277
service(name: string): IServiceClient;
7378
topic(name: string): ITopic;
7479
close(): Promise<void>;

node/src/mesh.ts

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@ import { MeshConfig, Node } from './shared/types';
44
import { RedisServiceDiscovery } from './discovery/redis-discovery';
55
import { H2Communication } from './communication/h2-communication';
66
import { ServiceClient } from './service-client';
7-
import { IMesh, IServiceClient, ITopic, IServiceHandlers, IChannel, IServiceDiscovery, ICommunication, IPubSub } from './abstract';
7+
import { IMesh, IServiceClient, ITopic, IServiceHandlers, IChannel, IServiceDiscovery, ICommunication, IPubSub, IServiceRegistration } from './abstract';
88
import { RedisPubSub, Topic } from './communication/pubsub';
99

10-
export { MeshConfig, Node, IChannel, IServiceHandlers, IChannel as Channel, IServiceHandlers as ServiceHandlers, Topic }; // Export both for compat
10+
export { MeshConfig, Node, IChannel, IServiceHandlers, IServiceRegistration, IChannel as Channel, IServiceHandlers as ServiceHandlers, Topic }; // Export both for compat
1111
// User asked to standardize naming. I'll export renamed IChannel and IServiceHandlers.
1212
// IF old code relies on Channel/ServiceHandlers export from Mesh, I should alias them or update consumer code.
1313
// Given "standardize abstraction", ideally I use IChannel everywhere.
@@ -46,25 +46,31 @@ export class Mesh implements IMesh {
4646
await this.pubsub.start();
4747
}
4848

49-
public async registerService(serviceName: string, handlers: IServiceHandlers) {
50-
// Setup listeners
51-
if (handlers.onRequestChannel) {
52-
this.communication.listenChannel(serviceName, handlers.onRequestChannel);
53-
}
54-
if (handlers.onRequestReply) {
55-
this.communication.listenUnary(serviceName, handlers.onRequestReply);
56-
}
49+
public registerService(serviceName: string): IServiceRegistration {
50+
const registration = {
51+
onRequestChannel: (handler: (channel: IChannel) => void) => {
52+
this.communication.listenChannel(serviceName, handler);
53+
return registration;
54+
},
55+
onRequestReply: (handler: (msg: any) => Promise<any>) => {
56+
this.communication.listenUnary(serviceName, handler);
57+
return registration;
58+
}
59+
};
5760

58-
// Register with SD
61+
// Register with SD immediately
5962
const node: Node = {
6063
id: this.nodeId,
61-
host: this.config.host!, // Detected in constructor
64+
host: this.config.host!,
6265
port: this.config.port,
6366
service_name: serviceName
6467
};
6568

66-
await this.discovery.registerService(node);
67-
console.log(`[Mesh] Registered service '${serviceName}' on ${node.host}:${node.port}`);
69+
this.discovery.registerService(node)
70+
.then(() => console.log(`[Mesh] Registered service '${serviceName}' on ${node.host}:${node.port}`))
71+
.catch(err => console.error(`[Mesh] Failed to register service '${serviceName}':`, err));
72+
73+
return registration;
6874
}
6975

7076
private getLocalIP(): string {

python/mesh_peer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ async def receive_loop():
6767

6868
logger.info("Connection handler finished.")
6969

70-
await mesh.register_service("echo-service", {"on_request_channel": incoming_handler})
70+
mesh.register_service("echo-service").on_request_channel(incoming_handler)
7171
logger.info(f"Registered 'echo-service' and waiting for connections...")
7272

7373
# Register signal handlers

python/src/mesh/abstract.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,15 @@ def get_service(self, service_name: str) -> List[Node]:
141141
"""
142142
pass
143143

144+
class IServiceRegistration(ABC):
145+
@abstractmethod
146+
def on_request_channel(self, handler: Callable[['IChannel'], Awaitable[None]]) -> 'IServiceRegistration':
147+
pass
148+
149+
@abstractmethod
150+
def on_request_reply(self, handler: Callable[[Any], Awaitable[Any]]) -> 'IServiceRegistration':
151+
pass
152+
144153
class IMesh(ABC):
145154
@abstractmethod
146155
async def start(self):
@@ -150,9 +159,9 @@ async def start(self):
150159
pass
151160

152161
@abstractmethod
153-
async def register_service(self, service_name: str, handlers: IServiceHandlers):
162+
def register_service(self, service_name: str) -> IServiceRegistration:
154163
"""
155-
Register a service provider with handlers.
164+
Register a service provider. Returns a registration object to attach handlers.
156165
"""
157166
pass
158167

python/src/mesh/mesh.py

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
import asyncio
22
import uuid
33
import socket
4-
from typing import Optional, List, Callable, Awaitable
4+
from typing import Optional, List, Callable, Awaitable, Any
55
from mesh.shared.types import MeshConfig, Node
66
from mesh.service_discovery import RedisServiceDiscovery
77
from mesh.communication import H2Communication
88
from mesh.communication import H2Communication
99
from mesh.communication.pubsub import RedisPubSub
10-
from mesh.abstract import IMesh, IChannel, ITopic, IServiceHandlers
10+
from mesh.abstract import IMesh, IChannel, ITopic, IServiceHandlers, IServiceRegistration
1111
from mesh.service_client import ServiceClient
1212

1313
class Mesh(IMesh):
@@ -46,18 +46,24 @@ async def start(self):
4646
# Start a background task to keep the process alive or handle signals?
4747
# For now, just starts the asyncio server.
4848

49-
async def register_service(
49+
def register_service(
5050
self,
51-
service_name: str,
52-
handlers: IServiceHandlers
53-
):
54-
# 1. Setup listeners
55-
if handlers.get('on_request_channel'):
56-
self.communication.listen_channel(service_name, handlers['on_request_channel'])
57-
58-
if handlers.get('on_request_reply'):
59-
self.communication.listen_unary(service_name, handlers['on_request_reply'])
60-
51+
service_name: str
52+
) -> IServiceRegistration:
53+
# Internal registration class
54+
class ServiceRegistration(IServiceRegistration):
55+
def __init__(self, mesh, service_name):
56+
self.mesh = mesh
57+
self.service_name = service_name
58+
59+
def on_request_channel(self, handler: Callable[['IChannel'], Awaitable[None]]) -> 'IServiceRegistration':
60+
self.mesh.communication.listen_channel(self.service_name, handler)
61+
return self
62+
63+
def on_request_reply(self, handler: Callable[[Any], Awaitable[Any]]) -> 'IServiceRegistration':
64+
self.mesh.communication.listen_unary(self.service_name, handler)
65+
return self
66+
6167
# 2. Register with SD
6268
node = Node(
6369
id=self._node_id,
@@ -69,6 +75,8 @@ async def register_service(
6975
# Register with SD
7076
self.service_discovery.register_service(node)
7177
print(f"Mesh: Registered service '{service_name}' on {node.host}:{node.port} (ID: {node.id})")
78+
79+
return ServiceRegistration(self, service_name)
7280

7381

7482
def service(self, service_name: str) -> ServiceClient:

python/tests/test_mesh.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,8 @@ async def handler2(channel):
6666
await mesh1.start()
6767
await mesh2.start()
6868

69-
await mesh1.register_service(service_name, {"on_request_channel": handler1})
70-
await mesh2.register_service(service_name, {"on_request_channel": handler2})
69+
mesh1.register_service(service_name).on_request_channel(handler1)
70+
mesh2.register_service(service_name).on_request_channel(handler2)
7171

7272
# Wait for binding
7373
await asyncio.sleep(0.1)
@@ -112,8 +112,8 @@ def get_service(self, service_name):
112112
# Re-register (since we swapped the class, previous registers might be lost or we just restart flow)
113113
# Actually simpler to just restart flow with correct mock
114114

115-
await mesh1.register_service(service_name, {})
116-
await mesh2.register_service(service_name, {})
115+
mesh1.register_service(service_name)
116+
mesh2.register_service(service_name)
117117

118118
# Verify SD
119119
nodes = SharedMockSD(None).get_service(service_name)

0 commit comments

Comments
 (0)