Skip to content

Commit 3dad45f

Browse files
removed message class, can send any via channel
1 parent 2f1cc31 commit 3dad45f

13 files changed

Lines changed: 56 additions & 65 deletions

File tree

node/mesh_peer.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,9 @@ async function main() {
4747
const sendLoop = async () => {
4848
let counter = 0;
4949
while (true) {
50-
const payload = `Message from Node #${counter}`;
51-
await channel.send({ function_name: "stream", payload });
52-
console.log(`[Node] Sent: ${payload}`);
50+
const payload = { text: `Message from Node #${counter}`, count: counter };
51+
await channel.send(payload);
52+
console.log(`[Node] Sent:`, payload);
5353
counter++;
5454
await new Promise(r => setTimeout(r, 1000)); // Send every 1s
5555
}

node/src/abstract.ts

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
1-
import { Node, Message } from './shared/types';
1+
import { Node } from './shared/types';
22
// Removed import { Channel } from './communication/h2-communication' to avoid circular dependency
33
// Channel is now defined here.
44
// User requested interfaces start with I.
55
export interface IChannel {
6-
send(message: Message): Promise<void>;
7-
receive(): Promise<Message>;
6+
send(message: any): Promise<void>;
7+
receive(): Promise<any>;
88
close(): Promise<void>;
99
// onClose is specific to H2 implementation? Python abstract Channel doesn't have it.
1010
// Python abstract Channel has: send, receive, close.
@@ -14,7 +14,7 @@ export interface IChannel {
1414
// "but python abstraction has more interfaces than node"
1515
// I will include onClose as it is useful for Node.
1616
onClose?(handler: () => void): void;
17-
[Symbol.asyncIterator](): AsyncIterator<Message>;
17+
[Symbol.asyncIterator](): AsyncIterator<any>;
1818
}
1919

2020
export interface ICommunication {
@@ -27,8 +27,8 @@ export interface ICommunication {
2727
listenChannel(serviceName: string, callback: (channel: IChannel) => void): void;
2828

2929
// Unary support
30-
sendUnary(node: Node, message: Message): Promise<Message>;
31-
listenUnary(serviceName: string, callback: (msg: Message) => Promise<Message>): void;
30+
sendUnary(node: Node, message: any): Promise<any>;
31+
listenUnary(serviceName: string, callback: (msg: any) => Promise<any>): void;
3232
}
3333

3434
export interface IServiceDiscovery {
@@ -38,7 +38,7 @@ export interface IServiceDiscovery {
3838
}
3939

4040
export interface IServiceClient {
41-
requestReply(sessionId: string, message: Message): Promise<Message>;
41+
requestReply(sessionId: string, message: any): Promise<any>;
4242
requestChannel(sessionId: string): Promise<IChannel>;
4343
}
4444

@@ -63,7 +63,7 @@ export interface IPubSub {
6363

6464
export interface IServiceHandlers {
6565
onRequestChannel?: (channel: IChannel) => void;
66-
onRequestReply?: (msg: Message) => Promise<Message>;
66+
onRequestReply?: (msg: any) => Promise<any>;
6767
}
6868

6969
export interface IMesh {

node/src/communication/h2-communication.ts

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import * as http2 from 'http2';
22
import { EventEmitter } from 'events';
3-
import { Node, MeshConfig, Message } from '../shared/types';
3+
import { Node, MeshConfig } from '../shared/types';
44

55
import { IChannel, ICommunication } from '../abstract';
66

@@ -9,8 +9,8 @@ import { IChannel, ICommunication } from '../abstract';
99
export class H2Channel implements IChannel {
1010
private stream: http2.ClientHttp2Stream | http2.ServerHttp2Stream;
1111
// We need a queue for received messages because receive() calls might not align with data events
12-
private receiveQueue: Message[] = [];
13-
private receiveResolvers: Array<{ resolve: (msg: Message) => void, reject: (err: Error) => void }> = [];
12+
private receiveQueue: any[] = [];
13+
private receiveResolvers: Array<{ resolve: (msg: any) => void, reject: (err: Error) => void }> = [];
1414
private buffer: Buffer = Buffer.alloc(0);
1515
private isClosed: boolean = false;
1616
private closeHandlers: (() => void)[] = [];
@@ -58,7 +58,7 @@ export class H2Channel implements IChannel {
5858

5959
try {
6060
const msgString = payloadBuf.toString('utf8');
61-
const msg: Message = JSON.parse(msgString);
61+
const msg = JSON.parse(msgString);
6262
this._pushMessage(msg);
6363
} catch (e) {
6464
console.error("Failed to parse message", e);
@@ -69,7 +69,7 @@ export class H2Channel implements IChannel {
6969
}
7070
}
7171

72-
private _pushMessage(msg: Message) {
72+
private _pushMessage(msg: any) {
7373
if (this.receiveResolvers.length > 0) {
7474
const { resolve } = this.receiveResolvers.shift()!;
7575
resolve(msg);
@@ -78,7 +78,7 @@ export class H2Channel implements IChannel {
7878
}
7979
}
8080

81-
public async send(message: Message): Promise<void> {
81+
public async send(message: any): Promise<void> {
8282
if (this.isClosed) {
8383
throw new Error("Channel closed");
8484
}
@@ -95,7 +95,7 @@ export class H2Channel implements IChannel {
9595
});
9696
}
9797

98-
public async receive(): Promise<Message> {
98+
public async receive(): Promise<any> {
9999
if (this.receiveQueue.length > 0) {
100100
return this.receiveQueue.shift()!;
101101
}
@@ -122,7 +122,7 @@ export class H2Channel implements IChannel {
122122
}
123123
}
124124

125-
public async *[Symbol.asyncIterator](): AsyncIterator<Message> {
125+
public async *[Symbol.asyncIterator](): AsyncIterator<any> {
126126
while (!this.isClosed) {
127127
try {
128128
const msg = await this.receive();
@@ -141,7 +141,7 @@ export class H2Communication extends EventEmitter implements ICommunication {
141141
private config: MeshConfig;
142142
private server?: http2.Http2Server;
143143
private channelListeners: { [service: string]: (channel: IChannel) => void } = {};
144-
private unaryListeners: { [service: string]: (msg: Message) => Promise<Message> } = {};
144+
private unaryListeners: { [service: string]: (msg: any) => Promise<any> } = {};
145145

146146
constructor(config: MeshConfig) {
147147
super();
@@ -213,7 +213,7 @@ export class H2Communication extends EventEmitter implements ICommunication {
213213
this.channelListeners[serviceName] = callback;
214214
}
215215

216-
public listenUnary(serviceName: string, callback: (msg: Message) => Promise<Message>) {
216+
public listenUnary(serviceName: string, callback: (msg: any) => Promise<any>) {
217217
this.unaryListeners[serviceName] = callback;
218218
}
219219

@@ -233,7 +233,7 @@ export class H2Communication extends EventEmitter implements ICommunication {
233233
});
234234
}
235235

236-
public async sendUnary(node: Node, message: Message): Promise<Message> {
236+
public async sendUnary(node: Node, message: any): Promise<any> {
237237
return new Promise((resolve, reject) => {
238238
const client = http2.connect(`http://${node.host}:${node.port}`);
239239
client.on('error', (err) => reject(err));

node/src/mesh.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
import { v4 as uuidv4 } from 'uuid';
22
import * as os from 'os';
3-
import { MeshConfig, Node, Message } from './shared/types';
3+
import { MeshConfig, Node } from './shared/types';
44
import { RedisServiceDiscovery } from './discovery/redis-discovery';
55
import { H2Communication } from './communication/h2-communication';
66
import { ServiceClient } from './service-client';
77
import { IMesh, IServiceClient, ITopic, IServiceHandlers, IChannel, IServiceDiscovery, ICommunication, IPubSub } from './abstract';
88
import { RedisPubSub, Topic } from './communication/pubsub';
99

10-
export { MeshConfig, Node, Message, IChannel, IServiceHandlers, IChannel as Channel, IServiceHandlers as ServiceHandlers, Topic }; // Export both for compat
10+
export { MeshConfig, Node, IChannel, IServiceHandlers, IChannel as Channel, IServiceHandlers as ServiceHandlers, Topic }; // Export both for compat
1111
// User asked to standardize naming. I'll export renamed IChannel and IServiceHandlers.
1212
// IF old code relies on Channel/ServiceHandlers export from Mesh, I should alias them or update consumer code.
1313
// Given "standardize abstraction", ideally I use IChannel everywhere.

node/src/service-client.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { Message } from './shared/types';
1+
import { Node } from './shared/types';
22
import { IChannel, ICommunication, IServiceDiscovery } from './abstract';
33
import { ConsistentHash } from './shared/hashing';
44
import { IServiceClient } from './abstract';
@@ -10,7 +10,7 @@ export class ServiceClient implements IServiceClient {
1010
private communication: ICommunication
1111
) { }
1212

13-
public async requestReply(sessionId: string, message: Message): Promise<Message> {
13+
public async requestReply(sessionId: string, message: any): Promise<any> {
1414
const nodes = await this.discovery.getService(this.serviceName);
1515
if (nodes.length === 0) {
1616
throw new Error(`No nodes found for service '${this.serviceName}'`);

node/src/shared/types.ts

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,7 @@ export interface Node {
55
service_name: string;
66
}
77

8-
export interface Message {
9-
function_name: string;
10-
payload: any;
11-
}
8+
129

1310
// Configuration Interfaces
1411
export interface RedisConfig {

python/mesh_peer.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import logging
33
import random
44
from mesh_lib import Mesh
5-
from mesh_lib.shared.types import MeshConfig, ServiceDiscoveryConfig, Message
5+
from mesh_lib.shared.types import MeshConfig, ServiceDiscoveryConfig
66

77
# --- Configuration & Setup ---
88
logging.basicConfig(level=logging.INFO, format='%(asctime)s - [Python] - %(message)s')
@@ -37,7 +37,7 @@ async def send_loop():
3737
try:
3838
while True:
3939
msg_payload = f"Message from Python #{counter}"
40-
out_msg = Message(function_name="stream", payload=msg_payload)
40+
out_msg = {"text": msg_payload, "count": counter}
4141
await channel.send(out_msg)
4242
logger.info(f"Sent: {msg_payload}")
4343
counter += 1
@@ -49,7 +49,7 @@ async def send_loop():
4949
async def receive_loop():
5050
try:
5151
async for msg in channel:
52-
logger.info(f"Received: {msg.payload}")
52+
logger.info(f"Received: {msg}")
5353
except Exception as e:
5454
logger.warning(f"Receive loop ended: {e}")
5555

python/src/mesh_lib/abstract.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
from abc import ABC, abstractmethod
22
from typing import Callable, Awaitable, List, Optional, Any, TypedDict, Union
3-
from mesh_lib.shared.types import Message, Node
3+
from mesh_lib.shared.types import Node
44

55
class IServiceHandlers(TypedDict, total=False):
66
on_request_channel: Optional[Callable[['IChannel'], Awaitable[None]]]
7-
on_request_reply: Optional[Callable[[Message], Awaitable[Message]]]
7+
on_request_reply: Optional[Callable[[Any], Awaitable[Any]]]
88

99
class IServiceClient(ABC):
1010
@abstractmethod
11-
async def request_reply(self, session_id: str, message: Message) -> Message:
11+
async def request_reply(self, session_id: str, message: Any) -> Any:
1212
pass
1313

1414
@abstractmethod
@@ -68,14 +68,14 @@ async def close(self):
6868

6969
class IChannel(ABC):
7070
@abstractmethod
71-
async def send(self, message: Message):
71+
async def send(self, message: Any):
7272
"""
7373
Send a message over this channel.
7474
"""
7575
pass
7676

7777
@abstractmethod
78-
async def receive(self) -> Message:
78+
async def receive(self) -> Any:
7979
"""
8080
Receive the next message from this channel.
8181
"""

python/src/mesh_lib/communication/h2_communication.py

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import asyncio
22
import json
3-
from typing import Callable, Awaitable, Dict, List, Union
4-
from mesh_lib.shared.types import Message, MeshConfig, Node
3+
from typing import Callable, Awaitable, Dict, List, Union, Any
4+
from mesh_lib.shared.types import MeshConfig, Node
55
from mesh_lib.communication.h2_protocol import H2Protocol
66
from mesh_lib.abstract import ICommunication, IChannel, IServiceDiscovery
77

@@ -12,16 +12,13 @@ def __init__(self, protocol: H2Protocol, stream_id: int):
1212
self.protocol.ensure_stream_queue(stream_id)
1313
self.buffer = b''
1414

15-
async def send(self, message: Message):
16-
payload = json.dumps({
17-
"function_name": message.function_name,
18-
"payload": message.payload
19-
}).encode('utf-8')
15+
async def send(self, message: Any):
16+
payload = json.dumps(message).encode('utf-8')
2017
length = len(payload)
2118
header = length.to_bytes(4, byteorder='big')
2219
self.protocol.send_data(self.stream_id, header + payload)
2320

24-
async def receive(self) -> Message:
21+
async def receive(self) -> Any:
2522
# Read until we have at least 4 bytes for length
2623
while len(self.buffer) < 4:
2724
chunk = await self.protocol.read_stream(self.stream_id)
@@ -45,7 +42,7 @@ async def receive(self) -> Message:
4542

4643
try:
4744
obj = json.loads(payload_data.decode('utf-8'))
48-
return Message(**obj)
45+
return obj
4946
except Exception as e:
5047
print(f"Decode error: {e}")
5148
raise
@@ -73,7 +70,7 @@ def __init__(self, config: MeshConfig, service_discovery: IServiceDiscovery):
7370
self.config = config
7471
self.service_discovery = service_discovery
7572
self.listeners: Dict[str, Callable[[IChannel], Awaitable[None]]] = {}
76-
self.unary_listeners: Dict[str, Callable[[Message], Awaitable[Message]]] = {}
73+
self.unary_listeners: Dict[str, Callable[[Any], Awaitable[Any]]] = {}
7774
self._server = None
7875

7976
async def start(self):
@@ -145,7 +142,7 @@ def _handle_channel(self, stream_id: int, service_name: str, protocol: H2Protoco
145142
def listen_channel(self, service_name: str, callback: Callable[[IChannel], Awaitable[None]]):
146143
self.listeners[service_name] = callback
147144

148-
def listen_unary(self, service_name: str, callback: Callable[[Message], Awaitable[Message]]):
145+
def listen_unary(self, service_name: str, callback: Callable[[Any], Awaitable[Any]]):
149146
self.unary_listeners[service_name] = callback
150147

151148
async def create_channel(self, target: Union[str, Node]) -> IChannel:
@@ -184,7 +181,7 @@ async def create_channel(self, target: Union[str, Node]) -> IChannel:
184181

185182
return H2Channel(protocol, stream_id)
186183

187-
async def send_unary(self, node: Node, message: Message) -> Message:
184+
async def send_unary(self, node: Node, message: Any) -> Any:
188185
loop = asyncio.get_running_loop()
189186
try:
190187
transport, protocol = await loop.create_connection(

python/src/mesh_lib/mesh.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import uuid
33
import socket
44
from typing import Optional, List, Callable, Awaitable
5-
from mesh_lib.shared.types import MeshConfig, Node, Message
5+
from mesh_lib.shared.types import MeshConfig, Node
66
from mesh_lib.service_discovery import RedisServiceDiscovery
77
from mesh_lib.communication import H2Communication
88
from mesh_lib.communication import H2Communication

0 commit comments

Comments
 (0)