Skip to content

Commit 167203e

Browse files
agent: process SMP messages concurrently between different connections
1 parent f0b7a4b commit 167203e

1 file changed

Lines changed: 161 additions & 0 deletions

File tree

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
# Parallel Message Processing - Eliminate Single-Thread Bottlenecks
2+
3+
## Problem
4+
5+
Message reception flows through two single-thread bottlenecks:
6+
7+
1. **Agent `msgQ` bottleneck**: Multiple SMP server connections write to one shared `TBQueue` (`AgentClient.msgQ` / `SMPClientAgent.msgQ`). A single `subscriber` thread reads and processes all messages sequentially - DB lookups, double-ratchet decryption, DB writes - regardless of which connection they came from.
8+
9+
2. **Chat `subQ` bottleneck**: The agent's `subscriber` thread writes processed events to one shared `TBQueue` (`AgentClient.subQ`). A single `agentSubscriber` thread in simplex-chat reads and processes all events sequentially.
10+
11+
Both bottlenecks serialize work that could run in parallel, since messages from different connections are independent.
12+
13+
## Solution
14+
15+
Replace queues with callbacks at both layers. The producer calls a processing function directly in its own thread.
16+
17+
### Layer 1: SMP client - eliminate `msgQ`
18+
19+
**Current flow:**
20+
```
21+
SMP connection thread -> writeTBQueue msgQ -> subscriber thread -> processSMPTransmissions
22+
```
23+
24+
**New flow:**
25+
```
26+
SMP connection thread -> processMsg callback (with per-client MVar lock)
27+
```
28+
29+
**Why the MVar lock:** Within one SMP client, two threads produce messages:
30+
- The receive loop (`processMsgs` in `Client.hs:686`)
31+
- `writeSMPMessage` (`Client.hs:874`) - called from `processSUBResponse_` when a SUB response includes an inline MSG
32+
33+
These two must be serialized within one client. An MVar lock ensures they take turns calling the callback. Across different clients (different server connections), no lock is shared - natural parallelism.
34+
35+
#### Changes
36+
37+
**`src/Simplex/Messaging/Client.hs`:**
38+
- In `PClient`: replace `msgQ :: Maybe (TBQueue ...)` with `processServerMsg :: Maybe (ServerTransmissionBatch v err msg -> IO ())` and `processLock :: MVar ()`
39+
- `processMsgs`: acquire `processLock`, call `processServerMsg` with the batch
40+
- `writeSMPMessage`: acquire `processLock`, call `processServerMsg`
41+
- `getProtocolClient`: takes `Maybe (ServerTransmissionBatch v err msg -> IO ())` instead of `Maybe (TBQueue ...)`
42+
- `smpClientStub`: sets `processServerMsg = Nothing`
43+
- `serverTransmission`: unchanged
44+
45+
**`src/Simplex/Messaging/Agent/Client.hs`:**
46+
- Remove `msgQ` field from `AgentClient`
47+
- `smpConnectClient`: pass `processSMPTransmissions` wrapper as callback instead of `Just msgQ`
48+
- Remove `AgentQueuesInfo` and `getAgentQueuesInfo` entirely (dead with no queues to monitor)
49+
- Add `inflightCallbacks :: TVar Int` for monitoring instead - increment before callback, decrement in bracket
50+
51+
**`src/Simplex/Messaging/Agent.hs`:**
52+
- Remove `subscriber` function
53+
- Remove `subscriber` from `runAgentThreads`
54+
- `processSMPTransmissions` stays, called directly from SMP client threads
55+
- `agentOperationBracket c AORcvNetwork` moves into the callback wrapper
56+
- Exception handling: wrap callback with `catchOwn` matching current `subscriber`'s error handling
57+
58+
**`src/Simplex/Messaging/Client/Agent.hs`:**
59+
- `SMPClientAgent`: replace `msgQ` with callback field `processServerMsg :: ServerTransmissionBatch SMPVersion ErrorType BrokerMsg -> IO ()`
60+
- `newSMPClientAgent`: takes callback parameter instead of creating `msgQ`
61+
- `connectClient`: passes callback to `getProtocolClient`
62+
63+
**`src/Simplex/Messaging/Notifications/Server.hs`:**
64+
- `ntfSubscriber`: remove `receiveSMP` loop; the processing logic becomes the callback passed via `SMPClientAgent`
65+
- Processing stays in M (via `UnliftIO` or pre-bound env)
66+
67+
**Tests (`tests/SMPProxyTests.hs`):**
68+
- 2 sites: change `getProtocolClient ... (Just msgQ) ...` to pass a callback that writes to a local test TBQueue
69+
70+
### Layer 2: Agent to chat - eliminate `subQ`
71+
72+
**Current flow:**
73+
```
74+
agent processSMPTransmissions -> writeTBQueue subQ -> chat agentSubscriber -> process
75+
```
76+
77+
**New flow:**
78+
```
79+
agent processSMPTransmissions -> processEvent callback [events]
80+
```
81+
82+
**Key design decisions:**
83+
- Callback takes `[ATransmission]` (list), not single event. All events from one connection batch are passed together to maintain ordering within a connection.
84+
- Error notifications (currently `nonBlockingWriteTBQueue`) use `forkIO $ callback [event]` - fire-and-forget, order doesn't matter for errors.
85+
- The `isFullTBQueue subQ` / pending mechanism disappears - the callback receives the full list directly, no need to buffer/flush.
86+
- `AgentClient` keeps `testQ :: Maybe (TBQueue ATransmission)` for tests only.
87+
88+
#### Changes
89+
90+
**`src/Simplex/Messaging/Agent/Client.hs`:**
91+
- Replace `subQ :: TBQueue ATransmission` with:
92+
- `processEvent :: [ATransmission] -> IO ()` - callback, accepts event list
93+
- `testQ :: Maybe (TBQueue ATransmission)` - test-only, `Nothing` in production
94+
- Remove `AgentQueuesInfo` / `getAgentQueuesInfo`
95+
- Add `inflightCallbacks :: TVar Int` with bracket: `withInflight c $ processEvent c events`
96+
97+
**`src/Simplex/Messaging/Agent.hs`:**
98+
- `processSMPTransmissions`: accumulate events in a local list (currently uses `pendingMsgs` TVar + flush pattern). Call `processEvent` once at end with the full list.
99+
- `runCommandProcessing`: same - call `processEvent` once with all events for the command batch. Remove `isFullTBQueue`/pending logic.
100+
- All `notify`/`notify'` helpers within `processSMPTransmissions` write to a local `TVar [ATransmission]` instead of directly to `subQ`. Flushed at end as single `processEvent` call.
101+
- Error sites (currently `nonBlockingWriteTBQueue`): use `forkIO $ processEvent c [event]`
102+
- Other direct `writeTBQueue subQ` sites (CONNECT/DISCONNECT events, SUSPENDED, etc.): call `processEvent c [event]` directly.
103+
- Remove `subscriber` function entirely.
104+
- Exception safety: `processEvent` call wrapped in bracket that catches "own" exceptions and logs them.
105+
106+
**`src/Simplex/Messaging/Agent/Client.hs`:**
107+
- `notifySub'` (line 838): change to `forkIO $ processEvent c [event]` (non-blocking error notification)
108+
109+
**`src/Simplex/Messaging/Agent/NtfSubSupervisor.hs`:**
110+
- 1 site: change `nonBlockingWriteTBQueue subQ event` to `forkIO $ processEvent c [event]`
111+
112+
**`src/Simplex/FileTransfer/Agent.hs`:**
113+
- 1 site (line 351): `notify` helper changes to `processEvent c [event]`
114+
115+
**`simplex-chat/src/Simplex/Chat/Library/Commands.hs`:**
116+
- Remove `agentSubscriber` thread
117+
- Pass chat's `process` function (adapted to accept `[ATransmission]`) as `processEvent` callback at agent initialization
118+
119+
**Tests:**
120+
- `pGet` changes from `readTBQueue (subQ c)` to `readTBQueue (fromJust $ testQ c)` - 1 line
121+
- Agent test setup: `processEvent = mapM_ (atomically . writeTBQueue q)` where `q` is `testQ`
122+
- ~348 test call sites unchanged
123+
124+
## Concurrency Safety
125+
126+
- **Per-SMP-connection:** MVar in each SMP client serializes `processMsgs` and `writeSMPMessage`
127+
- **Cross-connection:** Different SMP clients have different MVars, run in different threads - fully parallel
128+
- **Per-connection-id:** `withConnLock connId` in `processSMPTransmissions` handles per-connection locking
129+
- **Chat callback:** Must be safe for concurrent calls from different agent threads. Chat dispatches by entity type and connection ID; individual handlers use their own locks.
130+
- **Exception safety:** Callback wrapped with bracket pattern - catches own exceptions, logs, decrements inflight counter. Exceptions don't kill SMP client threads.
131+
132+
## Implementation Order
133+
134+
Both layers change in one PR since they share `Client.hs`.
135+
136+
### Phase 1: SMP client callback (`Client.hs` + both agent types)
137+
138+
- [ ] 1.1 `Client.hs`: Replace `msgQ` with `processServerMsg` callback + `processLock` MVar in `PClient`
139+
- [ ] 1.2 `Client.hs`: Update `processMsgs`, `writeSMPMessage`, `getProtocolClient`, `smpClientStub`
140+
- [ ] 1.3 `Client/Agent.hs`: Replace `msgQ` in `SMPClientAgent` with callback field, update `newSMPClientAgent`, `connectClient`
141+
- [ ] 1.4 `Agent/Client.hs`: Remove `msgQ` from `AgentClient`, update `smpConnectClient` to pass `processSMPTransmissions` as callback
142+
- [ ] 1.5 `Agent.hs`: Remove `subscriber` thread from `runAgentThreads`, add exception wrapper to callback
143+
- [ ] 1.6 `Notifications/Server.hs`: Convert `receiveSMP` from loop to callback passed to `SMPClientAgent`
144+
- [ ] 1.7 `SMPProxyTests.hs`: Update 2 call sites to use callback + local test queue
145+
146+
### Phase 2: Agent event callback (`subQ` -> `processEvent`)
147+
148+
- [ ] 2.1 `Agent/Client.hs`: Add `processEvent :: [ATransmission] -> IO ()` and `testQ :: Maybe (TBQueue ATransmission)`, remove `subQ`, remove `AgentQueuesInfo`
149+
- [ ] 2.2 `Agent.hs`: Rewrite `processSMPTransmissions` to accumulate events in local list and call `processEvent` once at end
150+
- [ ] 2.3 `Agent.hs`: Update `runCommandProcessing` - remove pending/isFullTBQueue pattern, call `processEvent` with list
151+
- [ ] 2.4 `Agent.hs`, `Agent/Client.hs`, `NtfSubSupervisor.hs`, `FileTransfer/Agent.hs`: Update all `writeTBQueue subQ` / `nonBlockingWriteTBQueue subQ` sites (~32 total)
152+
- [ ] 2.5 `Agent/Client.hs`: Add inflight counter with bracket
153+
- [ ] 2.6 Update `pGet` to read from `testQ` (1 line), update test agent setup
154+
- [ ] 2.7 `simplex-chat`: Pass chat's `process` as callback, remove `agentSubscriber`
155+
- [ ] 2.8 Fix any multi-server test ordering issues
156+
157+
## Risks
158+
159+
- **Chat thread safety:** Chat's `process` may not be safe for concurrent calls. Audit needed.
160+
- **Backpressure:** Slow callback blocks SMP client receive thread. Acceptable - the connection that produced the message waits. Cross-connection interference eliminated.
161+
- **Ordering:** Within one SMP connection - preserved (MVar + list callback). Across connections - non-deterministic (same as today, since `msgQ` interleaving was arbitrary). Most tests use 1 server.

0 commit comments

Comments
 (0)