Skip to content

Commit 208aeb9

Browse files
revert to use async method onData
1 parent 41a18cd commit 208aeb9

8 files changed

Lines changed: 19 additions & 24 deletions

File tree

node/README.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ mesh.registerService('my-service-name')
4646
.onRequestChannel(async (channel) => {
4747
console.log(`Accepted connection`);
4848
try {
49-
channel.onData(async (msg) => {
49+
await channel.onData(async (msg) => {
5050
console.log('Received:', msg.payload);
5151
await channel.send({ function_name: 'reply', payload: 'Got it!' });
5252
});
@@ -73,7 +73,7 @@ await channel.send({
7373
});
7474

7575
// Reading messages (Callback handles backpressure)
76-
channel.onData(async (msg) => {
76+
await channel.onData(async (msg) => {
7777
console.log('Reply:', msg.payload);
7878
// To stop receiving, you can close the channel from within handler or check explicitly
7979
if (msg.payload === 'Hello World') {
@@ -92,11 +92,11 @@ await mesh.close();
9292
```
9393

9494
### 6. Callback-based Backpressure
95-
Mesh supports `onData(handler)` for consumers who prefer callbacks but need backpressure. The method returns `void` (fire-and-forget), and the processing loop runs in the background.
95+
Mesh supports `onData(handler)` for consumers who prefer callbacks but need backpressure. The method returns a `Promise` that resolves when the stream is closed, allowing you to `await` the entire processing session.
9696

9797
```typescript
9898
```typescript
99-
channel.onData(async (msg) => {
99+
await channel.onData(async (msg) => {
100100
await processMessage(msg);
101101
// The next message is only requested after this line completes
102102
});

node/mesh_peer.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ async function main() {
5656

5757
// Receive Loop
5858
const receiveLoop = async () => {
59-
channel.onData(async (msg) => {
59+
await channel.onData(async (msg) => {
6060
console.log(`[Node] Received: ${msg.payload}`);
6161
});
6262
};

node/src/abstract.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ export interface IChannel {
1414
// "but python abstraction has more interfaces than node"
1515
// I will include onClose as it is useful for Node.
1616
onClose?(handler: () => void): void;
17-
onData(handler: (msg: any) => Promise<void>): void;
17+
onData(handler: (msg: any) => Promise<void>): Promise<void>;
1818
[Symbol.asyncIterator](): AsyncIterator<any>;
1919
}
2020

node/src/communication/h2-communication.ts

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -122,12 +122,10 @@ export class H2Channel implements IChannel {
122122
}
123123
}
124124

125-
public onData(handler: (msg: any) => Promise<void>): void {
126-
(async () => {
127-
for await (const msg of this) {
128-
await handler(msg);
129-
}
130-
})();
125+
public async onData(handler: (msg: any) => Promise<void>): Promise<void> {
126+
for await (const msg of this) {
127+
await handler(msg);
128+
}
131129
}
132130

133131
public async *[Symbol.asyncIterator](): AsyncIterator<any> {

python/README.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ async def stream_handler(channel):
5959
response = {"function_name": "reply", "payload": "Got it!"}
6060
await channel.send(response)
6161

62-
channel.on_data(handler)
62+
await channel.on_data(handler)
6363
except Exception:
6464
pass
6565

@@ -88,18 +88,18 @@ async def reply_handler(msg):
8888
async def reply_handler(msg):
8989
print(f"Reply: {msg['payload']}")
9090

91-
channel.on_data(reply_handler)
91+
await channel.on_data(reply_handler)
9292

9393
await channel.close()
9494
```
9595

9696
### 5. Callback-based Backpressure
97-
Mesh supports `on_data(handler)` for consuming messages with backpressure. The method returns `None` (fire-and-forget), and the processing loop runs in the background using `asyncio.create_task`.
97+
Mesh supports `on_data(handler)` for consuming messages with backpressure. The method returns an `Awaitable` that resolves when the stream is closed, allowing you to `await` the entire processing session.
9898

9999
```python
100100
async def handler(msg):
101101
await process_message(msg)
102102
# The next message is consumed only after this function returns
103103

104-
channel.on_data(handler)
104+
await channel.on_data(handler)
105105
```

python/mesh_peer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ async def receive_loop():
5050
async def msg_handler(msg):
5151
logger.info(f"Received: {msg}")
5252

53-
channel.on_data(msg_handler)
53+
await channel.on_data(msg_handler)
5454
except Exception as e:
5555
logger.warning(f"Receive loop ended: {e}")
5656

python/src/mesh/abstract.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,11 +95,10 @@ def __aiter__(self):
9595
return self
9696

9797
@abstractmethod
98-
def on_data(self, handler: Callable[[Any], Awaitable[None]]) -> None:
98+
async def on_data(self, handler: Callable[[Any], Awaitable[None]]):
9999
"""
100100
Process messages with a callback for backpressure.
101101
The next message is read only after the handler completes.
102-
Run in background.
103102
"""
104103
pass
105104

python/src/mesh/communication/h2_communication.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -64,11 +64,9 @@ async def __aiter__(self):
6464
return
6565
raise e
6666

67-
def on_data(self, handler: Callable[[Any], Awaitable[None]]) -> None:
68-
async def loop():
69-
async for msg in self:
70-
await handler(msg)
71-
asyncio.create_task(loop())
67+
async def on_data(self, handler: Callable[[Any], Awaitable[None]]):
68+
async for msg in self:
69+
await handler(msg)
7270

7371

7472
class H2Communication(ICommunication):

0 commit comments

Comments
 (0)