@@ -271,9 +271,7 @@ getSMPAgentClient_ clientId cfg initServers@InitialAgentServers {smp, xftp, netC
271271 currentTs <- liftIO getCurrentTime
272272 notices <- liftIO $ withTransaction store (`getClientNotices` presetServers) `catchAll_` pure []
273273 env <- ask
274- let processMsg c t =
275- agentOperationBracket c AORcvNetwork waitUntilActive (processSMPTransmissions c t) `runReaderT` env
276- `catchOwn` \ e -> atomically $ writeTBQueue (subQ c) (" " , " " , AEvt SAEConn $ ERR $ CRITICAL True $ " subscriber error: " <> show e)
274+ let processMsg c t = subscriber c t `runReaderT` env
277275 c@ AgentClient {acThread} <- liftIO $ newAgentClient clientId initServers currentTs notices processMsg env
278276 t <- runAgentThreads c `forkFinally` const (liftIO $ disconnectAgentClient c)
279277 atomically . writeTVar acThread . Just =<< mkWeakThreadId t
@@ -2985,6 +2983,14 @@ getNextSMPServer :: AgentClient -> UserId -> [SMPServer] -> AM SMPServerWithAuth
29852983getNextSMPServer c userId = getNextServer c userId storageSrvs
29862984{-# INLINE getNextSMPServer #-}
29872985
2986+ subscriber :: AgentClient -> ServerTransmissionBatch SMPVersion ErrorType BrokerMsg -> AM' ()
2987+ subscriber c@ AgentClient {subQ} t = run $
2988+ agentOperationBracket c AORcvNetwork waitUntilActive $
2989+ processSMPTransmissions c t
2990+ where
2991+ run a = a `catchOwn` \ e -> notify $ CRITICAL True $ " subscriber error: " <> show e
2992+ notify err = atomically $ writeTBQueue subQ (" " , " " , AEvt SAEConn $ ERR err)
2993+
29882994
29892995cleanupManager :: AgentClient -> AM' ()
29902996cleanupManager c@ AgentClient {subQ} = do
0 commit comments