Skip to content

Commit 0468584

Browse files
agent: process SMP messages concurrently between different connections
1 parent f0b7a4b commit 0468584

8 files changed

Lines changed: 309 additions & 139 deletions

File tree

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
# Parallel Message Processing - Eliminate Single-Thread Bottlenecks
2+
3+
## Problem
4+
5+
Message reception flows through two single-thread bottlenecks:
6+
7+
1. **Agent `msgQ` bottleneck**: Multiple SMP server connections write to one shared `TBQueue` (`AgentClient.msgQ` / `SMPClientAgent.msgQ`). A single `subscriber` thread reads and processes all messages sequentially - DB lookups, double-ratchet decryption, DB writes - regardless of which connection they came from.
8+
9+
2. **Chat `subQ` bottleneck**: The agent's `subscriber` thread writes processed events to one shared `TBQueue` (`AgentClient.subQ`). A single `agentSubscriber` thread in simplex-chat reads and processes all events sequentially.
10+
11+
Both bottlenecks serialize work that could run in parallel, since messages from different connections are independent.
12+
13+
## Solution
14+
15+
Replace queues with callbacks at both layers. The producer calls a processing function directly in its own thread.
16+
17+
### Layer 1: SMP client - eliminate `msgQ`
18+
19+
**Current flow:**
20+
```
21+
SMP connection thread -> writeTBQueue msgQ -> subscriber thread -> processSMPTransmissions
22+
```
23+
24+
**New flow:**
25+
```
26+
SMP connection thread -> processMsg callback (with per-client MVar lock)
27+
```
28+
29+
**Why the MVar lock:** Within one SMP client, two threads produce messages:
30+
- The receive loop (`processMsgs` in `Client.hs:686`)
31+
- `writeSMPMessage` (`Client.hs:874`) - called from `processSUBResponse_` when a SUB response includes an inline MSG
32+
33+
These two must be serialized within one client. An MVar lock ensures they take turns calling the callback. Across different clients (different server connections), no lock is shared - natural parallelism.
34+
35+
#### Changes
36+
37+
**`src/Simplex/Messaging/Client.hs`:**
38+
- In `PClient`: replace `msgQ :: Maybe (TBQueue ...)` with `processServerMsg :: Maybe (ServerTransmissionBatch v err msg -> IO ())` and `processLock :: MVar ()`
39+
- `processMsgs`: acquire `processLock`, call `processServerMsg` with the batch
40+
- `writeSMPMessage`: acquire `processLock`, call `processServerMsg`
41+
- `getProtocolClient`: takes `Maybe (ServerTransmissionBatch v err msg -> IO ())` instead of `Maybe (TBQueue ...)`
42+
- `smpClientStub`: sets `processServerMsg = Nothing`
43+
- `serverTransmission`: unchanged
44+
45+
**`src/Simplex/Messaging/Agent/Client.hs`:**
46+
- Remove `msgQ` field from `AgentClient`
47+
- `smpConnectClient`: pass `processSMPTransmissions` wrapper as callback instead of `Just msgQ`
48+
- Remove `AgentQueuesInfo` and `getAgentQueuesInfo` entirely (dead with no queues to monitor)
49+
- Add `inflightCallbacks :: TVar Int` for monitoring instead - increment before callback, decrement in bracket
50+
51+
**`src/Simplex/Messaging/Agent.hs`:**
52+
- Remove `subscriber` function
53+
- Remove `subscriber` from `runAgentThreads`
54+
- `processSMPTransmissions` stays, called directly from SMP client threads
55+
- `agentOperationBracket c AORcvNetwork` moves into the callback wrapper
56+
- Exception handling: wrap callback with `catchOwn` matching current `subscriber`'s error handling
57+
58+
**`src/Simplex/Messaging/Client/Agent.hs`:**
59+
- `SMPClientAgent`: replace `msgQ` with callback field `processServerMsg :: ServerTransmissionBatch SMPVersion ErrorType BrokerMsg -> IO ()`
60+
- `newSMPClientAgent`: takes callback parameter instead of creating `msgQ`
61+
- `connectClient`: passes callback to `getProtocolClient`
62+
63+
**`src/Simplex/Messaging/Notifications/Server.hs`:**
64+
- `ntfSubscriber`: remove `receiveSMP` loop; the processing logic becomes the callback passed via `SMPClientAgent`
65+
- Processing stays in M (via `UnliftIO` or pre-bound env)
66+
67+
**Tests (`tests/SMPProxyTests.hs`):**
68+
- 2 sites: change `getProtocolClient ... (Just msgQ) ...` to pass a callback that writes to a local test TBQueue
69+
70+
### Layer 2: Agent to chat - eliminate `subQ`
71+
72+
**Current flow:**
73+
```
74+
agent processSMPTransmissions -> writeTBQueue subQ -> chat agentSubscriber -> process
75+
```
76+
77+
**New flow:**
78+
```
79+
agent processSMPTransmissions -> processEvent callback [events]
80+
```
81+
82+
**Key design decisions:**
83+
- Callback takes `[ATransmission]` (list), not single event. All events from one connection batch are passed together to maintain ordering within a connection.
84+
- Error notifications (currently `nonBlockingWriteTBQueue`) use `forkIO $ callback [event]` - fire-and-forget, order doesn't matter for errors.
85+
- The `isFullTBQueue subQ` / pending mechanism disappears - the callback receives the full list directly, no need to buffer/flush.
86+
- `AgentClient` keeps `testQ :: Maybe (TBQueue ATransmission)` for tests only.
87+
88+
#### Changes
89+
90+
**`src/Simplex/Messaging/Agent/Client.hs`:**
91+
- Replace `subQ :: TBQueue ATransmission` with:
92+
- `processEvent :: [ATransmission] -> IO ()` - callback, accepts event list
93+
- `testQ :: Maybe (TBQueue ATransmission)` - test-only, `Nothing` in production
94+
- Remove `AgentQueuesInfo` / `getAgentQueuesInfo`
95+
- Add `inflightCallbacks :: TVar Int` with bracket: `withInflight c $ processEvent c events`
96+
97+
**`src/Simplex/Messaging/Agent.hs`:**
98+
- `processSMPTransmissions`: accumulate events in a local list (currently uses `pendingMsgs` TVar + flush pattern). Call `processEvent` once at end with the full list.
99+
- `runCommandProcessing`: same - call `processEvent` once with all events for the command batch. Remove `isFullTBQueue`/pending logic.
100+
- All `notify`/`notify'` helpers within `processSMPTransmissions` write to a local `TVar [ATransmission]` instead of directly to `subQ`. Flushed at end as single `processEvent` call.
101+
- Error sites (currently `nonBlockingWriteTBQueue`): use `forkIO $ processEvent c [event]`
102+
- Other direct `writeTBQueue subQ` sites (CONNECT/DISCONNECT events, SUSPENDED, etc.): call `processEvent c [event]` directly.
103+
- Remove `subscriber` function entirely.
104+
- Exception safety: `processEvent` call wrapped in bracket that catches "own" exceptions and logs them.
105+
106+
**`src/Simplex/Messaging/Agent/Client.hs`:**
107+
- `notifySub'` (line 838): change to `forkIO $ processEvent c [event]` (non-blocking error notification)
108+
109+
**`src/Simplex/Messaging/Agent/NtfSubSupervisor.hs`:**
110+
- 1 site: change `nonBlockingWriteTBQueue subQ event` to `forkIO $ processEvent c [event]`
111+
112+
**`src/Simplex/FileTransfer/Agent.hs`:**
113+
- 1 site (line 351): `notify` helper changes to `processEvent c [event]`
114+
115+
**`simplex-chat/src/Simplex/Chat/Library/Commands.hs`:**
116+
- Remove `agentSubscriber` thread
117+
- Pass chat's `process` function (adapted to accept `[ATransmission]`) as `processEvent` callback at agent initialization
118+
119+
**Tests:**
120+
- `pGet` changes from `readTBQueue (subQ c)` to `readTBQueue (fromJust $ testQ c)` - 1 line
121+
- Agent test setup: `processEvent = mapM_ (atomically . writeTBQueue q)` where `q` is `testQ`
122+
- ~348 test call sites unchanged
123+
124+
## Concurrency Safety
125+
126+
- **Per-SMP-connection:** MVar in each SMP client serializes `processMsgs` and `writeSMPMessage`
127+
- **Cross-connection:** Different SMP clients have different MVars, run in different threads - fully parallel
128+
- **Per-connection-id:** `withConnLock connId` in `processSMPTransmissions` handles per-connection locking
129+
- **Chat callback:** Must be safe for concurrent calls from different agent threads. Chat dispatches by entity type and connection ID; individual handlers use their own locks.
130+
- **Exception safety:** Callback wrapped with bracket pattern - catches own exceptions, logs, decrements inflight counter. Exceptions don't kill SMP client threads.
131+
132+
## Implementation Order
133+
134+
Both layers change in one PR since they share `Client.hs`.
135+
136+
### Phase 1: SMP client callback (`Client.hs` + both agent types)
137+
138+
- [ ] 1.1 `Client.hs`: Replace `msgQ` with `processServerMsg` callback + `processLock` MVar in `PClient`
139+
- [ ] 1.2 `Client.hs`: Update `processMsgs`, `writeSMPMessage`, `getProtocolClient`, `smpClientStub`
140+
- [ ] 1.3 `Client/Agent.hs`: Replace `msgQ` in `SMPClientAgent` with callback field, update `newSMPClientAgent`, `connectClient`
141+
- [ ] 1.4 `Agent/Client.hs`: Remove `msgQ` from `AgentClient`, update `smpConnectClient` to pass `processSMPTransmissions` as callback
142+
- [ ] 1.5 `Agent.hs`: Remove `subscriber` thread from `runAgentThreads`, add exception wrapper to callback
143+
- [ ] 1.6 `Notifications/Server.hs`: Convert `receiveSMP` from loop to callback passed to `SMPClientAgent`
144+
- [ ] 1.7 `SMPProxyTests.hs`: Update 2 call sites to use callback + local test queue
145+
146+
### Phase 2: Agent event callback (`subQ` -> `processEvent`)
147+
148+
- [ ] 2.1 `Agent/Client.hs`: Add `processEvent :: [ATransmission] -> IO ()` and `testQ :: Maybe (TBQueue ATransmission)`, remove `subQ`, remove `AgentQueuesInfo`
149+
- [ ] 2.2 `Agent.hs`: Rewrite `processSMPTransmissions` to accumulate events in local list and call `processEvent` once at end
150+
- [ ] 2.3 `Agent.hs`: Update `runCommandProcessing` - remove pending/isFullTBQueue pattern, call `processEvent` with list
151+
- [ ] 2.4 `Agent.hs`, `Agent/Client.hs`, `NtfSubSupervisor.hs`, `FileTransfer/Agent.hs`: Update all `writeTBQueue subQ` / `nonBlockingWriteTBQueue subQ` sites (~32 total)
152+
- [ ] 2.5 `Agent/Client.hs`: Add inflight counter with bracket
153+
- [ ] 2.6 Update `pGet` to read from `testQ` (1 line), update test agent setup
154+
- [ ] 2.7 `simplex-chat`: Pass chat's `process` as callback, remove `agentSubscriber`
155+
- [ ] 2.8 Fix any multi-server test ordering issues
156+
157+
## Risks
158+
159+
- **Chat thread safety:** Chat's `process` may not be safe for concurrent calls. Audit needed.
160+
- **Backpressure:** Slow callback blocks SMP client receive thread. Acceptable - the connection that produced the message waits. Cross-connection interference eliminated.
161+
- **Ordering:** Within one SMP connection - preserved (MVar + list callback). Across connections - non-deterministic (same as today, since `msgQ` interleaving was arbitrary). Most tests use 1 server.

src/Simplex/Messaging/Agent.hs

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@ where
145145

146146
import Control.Applicative ((<|>))
147147
import Control.Concurrent.STM (retry)
148+
import Data.IORef
148149
import Control.Logger.Simple
149150
import Control.Monad
150151
import Control.Monad.Except
@@ -270,19 +271,25 @@ getSMPAgentClient_ clientId cfg initServers@InitialAgentServers {smp, xftp, netC
270271
liftIO $ checkServers "SMP" smp >> checkServers "XFTP" xftp
271272
currentTs <- liftIO getCurrentTime
272273
notices <- liftIO $ withTransaction store (`getClientNotices` presetServers) `catchAll_` pure []
273-
c@AgentClient {acThread} <- liftIO . newAgentClient clientId initServers currentTs notices =<< ask
274+
env <- ask
275+
cRef <- liftIO $ newIORef (error "agent client not initialized")
276+
let processMsg t = do
277+
c <- readIORef cRef
278+
agentOperationBracket c AORcvNetwork waitUntilActive (processSMPTransmissions c t) `runReaderT` env
279+
`catchOwn` \e -> atomically $ writeTBQueue (subQ c) ("", "", AEvt SAEConn $ ERR $ CRITICAL True $ "subscriber error: " <> show e)
280+
c@AgentClient {acThread} <- liftIO $ newAgentClient clientId initServers currentTs notices processMsg env
281+
liftIO $ writeIORef cRef c
274282
t <- runAgentThreads c `forkFinally` const (liftIO $ disconnectAgentClient c)
275283
atomically . writeTVar acThread . Just =<< mkWeakThreadId t
276284
pure c
277285
checkServers protocol srvs =
278286
forM_ (M.assocs srvs) $ \(userId, srvs') -> checkUserServers ("getSMPAgentClient " <> protocol <> " " <> tshow userId) srvs'
279287
runAgentThreads c
280-
| backgroundMode = run c "subscriber" $ subscriber c
288+
| backgroundMode = forever $ liftIO $ threadDelay maxBound
281289
| otherwise = do
282290
restoreServersStats c
283291
raceAny_
284-
[ run c "subscriber" $ subscriber c,
285-
run c "runNtfSupervisor" $ runNtfSupervisor c,
292+
[ run c "runNtfSupervisor" $ runNtfSupervisor c,
286293
run c "cleanupManager" $ cleanupManager c,
287294
run c "logServersStats" $ logServersStats c
288295
]
@@ -2982,14 +2989,6 @@ getNextSMPServer :: AgentClient -> UserId -> [SMPServer] -> AM SMPServerWithAuth
29822989
getNextSMPServer c userId = getNextServer c userId storageSrvs
29832990
{-# INLINE getNextSMPServer #-}
29842991

2985-
subscriber :: AgentClient -> AM' ()
2986-
subscriber c@AgentClient {msgQ, subQ} = run $ forever $ do
2987-
t <- atomically $ readTBQueue msgQ
2988-
agentOperationBracket c AORcvNetwork waitUntilActive $
2989-
processSMPTransmissions c t
2990-
where
2991-
run a = a `catchOwn` \e -> notify $ CRITICAL True $ "Agent subscriber stopped: " <> show e
2992-
notify err = atomically $ writeTBQueue subQ ("", "", AEvt SAEConn $ ERR err)
29932992

29942993
cleanupManager :: AgentClient -> AM' ()
29952994
cleanupManager c@AgentClient {subQ} = do

src/Simplex/Messaging/Agent/Client.hs

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -338,7 +338,7 @@ data AgentClient = AgentClient
338338
{ acThread :: TVar (Maybe (Weak ThreadId)),
339339
active :: TVar Bool,
340340
subQ :: TBQueue ATransmission,
341-
msgQ :: TBQueue (ServerTransmissionBatch SMPVersion ErrorType BrokerMsg),
341+
processServerMsg :: ServerTransmissionBatch SMPVersion ErrorType BrokerMsg -> IO (),
342342
smpServers :: TMap UserId (UserServers 'PSMP),
343343
smpClients :: TMap SMPTransportSession SMPClientVar,
344344
useClientServices :: TMap UserId Bool,
@@ -505,15 +505,14 @@ data UserNetworkType = UNNone | UNCellular | UNWifi | UNEthernet | UNOther
505505
deriving (Eq, Show)
506506

507507
-- | Creates an SMP agent client instance that receives commands and sends responses via 'TBQueue's.
508-
newAgentClient :: Int -> InitialAgentServers -> UTCTime -> Map (Maybe SMPServer) (Maybe SystemSeconds) -> Env -> IO AgentClient
509-
newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg, useServices, presetDomains, presetServers} currentTs notices agentEnv = do
508+
newAgentClient :: Int -> InitialAgentServers -> UTCTime -> Map (Maybe SMPServer) (Maybe SystemSeconds) -> (ServerTransmissionBatch SMPVersion ErrorType BrokerMsg -> IO ()) -> Env -> IO AgentClient
509+
newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg, useServices, presetDomains, presetServers} currentTs notices processServerMsg agentEnv = do
510510
let cfg = config agentEnv
511511
qSize = tbqSize cfg
512512
proxySessTs <- newTVarIO =<< getCurrentTime
513513
acThread <- newTVarIO Nothing
514514
active <- newTVarIO True
515515
subQ <- newTBQueueIO qSize
516-
msgQ <- newTBQueueIO qSize
517516
smpServers <- newTVarIO $ M.map mkUserServers smp
518517
smpClients <- TM.emptyIO
519518
useClientServices <- newTVarIO useServices
@@ -553,7 +552,7 @@ newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg, useServices
553552
{ acThread,
554553
active,
555554
subQ,
556-
msgQ,
555+
processServerMsg,
557556
smpServers,
558557
smpClients,
559558
useClientServices,
@@ -733,7 +732,7 @@ getSMPProxyClient c@AgentClient {active, smpClients, smpProxiedRelays, workerSeq
733732
Nothing -> Left $ BROKER (B.unpack $ strEncode srv) TIMEOUT
734733

735734
smpConnectClient :: AgentClient -> NetworkRequestMode -> SMPTransportSession -> TMap SMPServer ProxiedRelayVar -> SMPClientVar -> AM SMPConnectedClient
736-
smpConnectClient c@AgentClient {smpClients, msgQ, proxySessTs, presetDomains} nm tSess@(userId, srv, _) prs v =
735+
smpConnectClient c@AgentClient {smpClients, proxySessTs, presetDomains} nm tSess@(userId, srv, _) prs v =
737736
newProtocolClient c tSess smpClients connectClient v
738737
`catchAllErrors` \e -> lift (resubscribeSMPSession c tSess) >> throwE e
739738
where
@@ -746,7 +745,7 @@ smpConnectClient c@AgentClient {smpClients, msgQ, proxySessTs, presetDomains} nm
746745
env <- ask
747746
smp <- liftError (protocolClientError SMP $ B.unpack $ strEncode srv) $ do
748747
ts <- readTVarIO proxySessTs
749-
ExceptT $ getProtocolClient g nm tSess cfg' presetDomains (Just msgQ) ts $ smpClientDisconnected c tSess env v' prs
748+
ExceptT $ getProtocolClient g nm tSess cfg' presetDomains (Just $ processServerMsg c) ts $ smpClientDisconnected c tSess env v' prs
750749
atomically $ SS.setSessionId tSess (sessionId $ thParams smp) $ currentSubs c
751750
updateClientService service smp
752751
pure SMPConnectedClient {connectedClient = smp, proxiedRelays = prs}
@@ -2835,8 +2834,8 @@ data ClientInfo
28352834
deriving (Show)
28362835

28372836
getAgentQueuesInfo :: AgentClient -> IO AgentQueuesInfo
2838-
getAgentQueuesInfo AgentClient {msgQ, subQ, smpClients} = do
2839-
msgQInfo <- atomically $ getTBQueueInfo msgQ
2837+
getAgentQueuesInfo AgentClient {subQ, smpClients} = do
2838+
let msgQInfo = TBQueueInfo {qLength = 0, qFull = False}
28402839
subQInfo <- atomically $ getTBQueueInfo subQ
28412840
smpClientsMap <- readTVarIO smpClients
28422841
let smpClientsMap' = M.mapKeys (decodeLatin1 . strEncode) smpClientsMap

0 commit comments

Comments
 (0)