Skip to content

Commit 3d2b6ed

Browse files
added async iterator to read channel messages
1 parent 4746321 commit 3d2b6ed

9 files changed

Lines changed: 53 additions & 71 deletions

File tree

node/README.md

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,12 @@ Define your Redis connection and service details.
2525
import { Mesh, MeshConfig } from '@marketrix/mesh';
2626

2727
const config: MeshConfig = {
28+
redis: { host: 'localhost', port: 6379 }, // Shared Redis Config
2829
service_discovery: {
29-
redis_config: { host: 'localhost', port: 6379 },
3030
heartbeat_interval: 2,
3131
heartbeat_threshold: 3
3232
},
33-
host: '127.0.0.1', // Your local IP/Host
33+
// host: '127.0.0.1', // Optional: Auto-detected if omitted
3434
port: 0 // 0 for random available port
3535
};
3636
```
@@ -51,14 +51,13 @@ await mesh.registerService('my-service-name', {
5151
// 1. Streaming Handler
5252
onRequestChannel: async (channel) => {
5353
console.log(`Accepted connection`);
54-
while (true) {
55-
try {
56-
const msg = await channel.receive();
54+
try {
55+
for await (const msg of channel) {
5756
console.log('Received:', msg.payload);
5857
await channel.send({ function_name: 'reply', payload: 'Got it!' });
59-
} catch (e) {
60-
break;
6158
}
59+
} catch (e) {
60+
console.error(e);
6261
}
6362
},
6463
// 2. Unary Handler (Request/Reply)
@@ -73,14 +72,14 @@ await mesh.registerService('my-service-name', {
7372
Discover and connect to another service.
7473

7574
```typescript
76-
const channel = await mesh.requestChannel('target-service-name', 'session-id-123');
75+
const channel = await mesh.service('target-service-name').requestChannel('session-id-123');
7776

7877
await channel.send({
7978
function_name: 'greet',
8079
payload: 'Hello World'
8180
});
8281

83-
const reply = await channel.receive();
82+
const reply = await channel.receive(); // You can still use receive() for single messages
8483
console.log('Reply:', reply.payload);
8584

8685
await channel.close();

node/mesh_peer.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,7 @@ async function main() {
5757

5858
// Receive Loop
5959
const receiveLoop = async () => {
60-
while (true) {
61-
const msg = await channel.receive();
60+
for await (const msg of channel) {
6261
console.log(`[Node] Received: ${msg.payload}`);
6362
}
6463
};

node/src/abstract.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ export interface IChannel {
1414
// "but python abstraction has more interfaces than node"
1515
// I will include onClose as it is useful for Node.
1616
onClose?(handler: () => void): void;
17+
[Symbol.asyncIterator](): AsyncIterator<Message>;
1718
}
1819

1920
export interface ICommunication {

node/src/communication/h2-communication.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,20 @@ export class H2Channel implements IChannel {
121121
this.closeHandlers.push(handler);
122122
}
123123
}
124+
125+
public async *[Symbol.asyncIterator](): AsyncIterator<Message> {
126+
while (!this.isClosed) {
127+
try {
128+
const msg = await this.receive();
129+
yield msg;
130+
} catch (e: any) {
131+
if (e.message === 'Channel closed') {
132+
return;
133+
}
134+
throw e;
135+
}
136+
}
137+
}
124138
}
125139

126140
export class H2Communication extends EventEmitter implements ICommunication {

node/src/demo_client.ts

Lines changed: 0 additions & 49 deletions
This file was deleted.

python/README.md

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,13 @@ from mesh_lib import Mesh
2626
from mesh_lib.shared.types import MeshConfig, ServiceDiscoveryConfig, Message
2727

2828
config = MeshConfig(
29+
redis={"host": "localhost", "port": 6379}, # Shared Redis Config
2930
service_discovery=ServiceDiscoveryConfig(
30-
redis_config={"host": "localhost", "port": 6379},
3131
heartbeat_interval=2,
3232
heartbeat_threshold=3
3333
),
34-
host="127.0.0.1", # Your local IP/Host
35-
port=0 # 0 for random available port
34+
# host="127.0.0.1", # Optional: Auto-detected if omitted
35+
port=0
3636
)
3737
```
3838

@@ -60,14 +60,14 @@ Register a service name to handle incoming connections.
6060
```python
6161
async def stream_handler(channel):
6262
print("Accepted connection")
63-
while True:
64-
try:
65-
msg = await channel.receive()
63+
print("Accepted connection")
64+
try:
65+
async for msg in channel:
6666
print(f"Received: {msg.payload}")
6767
response = Message(function_name="reply", payload="Got it!")
6868
await channel.send(response)
69-
except Exception:
70-
break
69+
except Exception:
70+
pass
7171

7272
async def unary_handler(msg):
7373
print(f"Received Unary: {msg.payload}")
@@ -83,7 +83,7 @@ await mesh.register_service("my-service-name",
8383
Discover and connect to another service.
8484

8585
```python
86-
channel = await mesh.request_channel("target-service-name", "session-id-123")
86+
channel = await mesh.service("target-service-name").request_channel("session-id-123")
8787

8888
out_msg = Message(function_name="greet", payload="Hello World")
8989
await channel.send(out_msg)

python/mesh_peer.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,7 @@ async def send_loop():
4848
# Start parallel receive loop
4949
async def receive_loop():
5050
try:
51-
while True:
52-
msg = await channel.receive()
51+
async for msg in channel:
5352
logger.info(f"Received: {msg.payload}")
5453
except Exception as e:
5554
logger.warning(f"Receive loop ended: {e}")

python/src/mesh_lib/abstract.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,12 @@ async def close(self):
8888
"""
8989
pass
9090

91+
def __aiter__(self):
92+
"""
93+
Return an async iterator for receiving messages.
94+
"""
95+
return self
96+
9197
class ICommunication(ABC):
9298
@abstractmethod
9399
async def create_channel(self, target: Union[str, Node]) -> IChannel:

python/src/mesh_lib/communication/h2_communication.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,19 @@ async def close(self):
5454
# End stream
5555
self.protocol.send_data(self.stream_id, b'', end_stream=True)
5656

57+
async def __aiter__(self):
58+
while True:
59+
try:
60+
msg = await self.receive()
61+
yield msg
62+
except Exception as e:
63+
# Check for stream closed messages
64+
# In receive(): raise Exception("Stream closed") or "Incomplete message... closed"
65+
msg_str = str(e).lower()
66+
if "stream closed" in msg_str or "incomplete" in msg_str:
67+
return
68+
raise e
69+
5770

5871
class H2Communication(ICommunication):
5972
def __init__(self, config: MeshConfig, service_discovery: IServiceDiscovery):

0 commit comments

Comments
 (0)