Skip to content

Commit cb54728

Browse files
simpler
1 parent 8b52f50 commit cb54728

File tree

5 files changed

+44
-60
lines changed

5 files changed

+44
-60
lines changed

src/Simplex/Messaging/Server.hs

Lines changed: 37 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1366,41 +1366,37 @@ client
13661366
labelMyThread . B.unpack $ "client $" <> encode sessionId <> " commands"
13671367
let THandleParams {thVersion} = thParams'
13681368
clntServiceId = (\THClientService {serviceId} -> serviceId) <$> (peerClientService =<< thAuth thParams')
1369-
process prepared t acc@(rs, msgs) =
1369+
process batchSubs t acc@(rs, msgs) =
13701370
(maybe acc (\(!r, !msg_) -> (r : rs, maybe msgs (: msgs) msg_)))
1371-
<$> processCommand clntServiceId thVersion prepared t
1371+
<$> processCommand clntServiceId thVersion batchSubs t
13721372
forever $ do
13731373
batch <- atomically (readTBQueue rcvQ)
1374-
prepared <- prepareBatch clntServiceId batch
1375-
foldrM (process prepared) ([], []) batch
1374+
batchSubs <- prepareBatchSubs clntServiceId batch
1375+
foldrM (process batchSubs) ([], []) batch
13761376
>>= \(rs_, msgs) -> mapM_ (atomically . writeTBQueue sndQ . (,msgs)) (L.nonEmpty rs_)
13771377
where
1378-
prepareBatch :: Maybe ServiceId -> NonEmpty (VerifiedTransmission s) -> M s (Either ErrorType (Map RecipientId (Maybe Message, Maybe (Either ErrorType ()))))
1379-
prepareBatch clntServiceId_ batch = do
1380-
let (subMsgQs, rcvAssocQs, ntfAssocQs) = foldl' classify ([], [], []) $ L.toList batch
1381-
classify (!msgQs, !rcvQs, !ntfQs) = \case
1378+
prepareBatchSubs ::
1379+
Maybe ServiceId ->
1380+
NonEmpty (VerifiedTransmission s) ->
1381+
M s (Either ErrorType (Map RecipientId Message, Map RecipientId (Either ErrorType ()), Map RecipientId (Either ErrorType ())))
1382+
prepareBatchSubs clntServiceId_ batch = do
1383+
let (subMsgQs, rcvAssocQs, ntfAssocQs) = foldr partitionSubs ([], [], []) batch
1384+
partitionSubs t (msgQs, rcvQs, ntfQs) = case t of
13821385
(Just (q, qr), (_, _, Cmd SRecipient SUB))
13831386
| clntServiceId_ /= rcvServiceId qr -> (q : msgQs, q : rcvQs, ntfQs)
13841387
| otherwise -> (q : msgQs, rcvQs, ntfQs)
13851388
(Just (q, qr), (_, _, Cmd SNotifier NSUB))
13861389
| clntServiceId_ /= (notifier qr >>= ntfServiceId) -> (msgQs, rcvQs, q : ntfQs)
1387-
| otherwise -> (msgQs, rcvQs, ntfQs)
13881390
_ -> (msgQs, rcvQs, ntfQs)
13891391
liftIO $ runExceptT $ do
1390-
msgs <- if null subMsgQs then pure M.empty else tryPeekMsgs ms subMsgQs
1391-
rcvUpdated <- if null rcvAssocQs then pure S.empty else ExceptT $ Right <$> setRcvQueueServices @(StoreQueue s) (queueStore ms) clntServiceId_ rcvAssocQs
1392-
ntfUpdated <- if null ntfAssocQs then pure S.empty else ExceptT $ Right <$> setNtfQueueServices @(StoreQueue s) (queueStore ms) clntServiceId_ ntfAssocQs
1393-
let rcvSet = S.fromList $ map recipientId rcvAssocQs
1394-
assocResult rId updated = if S.member rId updated then Right () else Left AUTH
1395-
mkEntry q =
1396-
let rId = recipientId q
1397-
msg_ = M.lookup rId msgs
1398-
assoc_
1399-
| S.member rId rcvSet = Just $ assocResult rId rcvUpdated
1400-
| otherwise = Nothing
1401-
in (rId, (msg_, assoc_))
1402-
mkNtfEntry q = let rId = recipientId q in (rId, (Nothing, Just $ assocResult rId ntfUpdated))
1403-
pure $ M.fromList $ map mkEntry subMsgQs <> map mkNtfEntry ntfAssocQs
1392+
rcvAssocs <- ifNotNull rcvAssocQs $ ExceptT . setService SRecipientService clntServiceId_
1393+
ntfAssocs <- ifNotNull ntfAssocQs $ ExceptT . setService SNotifierService clntServiceId_
1394+
msgs <- ifNotNull subMsgQs $ tryPeekMsgs ms
1395+
pure (msgs, rcvAssocs, ntfAssocs)
1396+
where
1397+
ifNotNull qs f = if null qs then pure M.empty else f qs
1398+
setService :: (PartyI p, ServiceParty p) => SParty p -> Maybe ServiceId -> [StoreQueue s] -> IO (Either ErrorType (Map RecipientId (Either ErrorType ())))
1399+
setService = setQueueServices (queueStore ms)
14041400

14051401
processProxiedCmd :: Transmission (Command 'ProxiedClient) -> M s (Maybe ResponseAndMessage)
14061402
processProxiedCmd (corrId, EntityId sessId, command) = (\t -> ((corrId, EntityId sessId, t), Nothing)) <$$> case command of
@@ -1482,8 +1478,8 @@ client
14821478
mkIncProxyStats ps psOwn own sel = do
14831479
incStat $ sel ps
14841480
when own $ incStat $ sel psOwn
1485-
processCommand :: Maybe ServiceId -> VersionSMP -> Either ErrorType (Map RecipientId (Maybe Message, Maybe (Either ErrorType ()))) -> VerifiedTransmission s -> M s (Maybe ResponseAndMessage)
1486-
processCommand clntServiceId clntVersion prepared (q_, (corrId, entId, cmd)) = case cmd of
1481+
processCommand :: Maybe ServiceId -> VersionSMP -> Either ErrorType (Map RecipientId Message, Map RecipientId (Either ErrorType ()), Map RecipientId (Either ErrorType ())) -> VerifiedTransmission s -> M s (Maybe ResponseAndMessage)
1482+
processCommand clntServiceId clntVersion batchSubs (q_, (corrId, entId, cmd)) = case cmd of
14871483
Cmd SProxiedClient command -> processProxiedCmd (corrId, entId, command)
14881484
Cmd SSender command -> case command of
14891485
SKEY k -> withQueue $ \q qr -> checkMode QMMessaging qr $ secureQueue_ q k
@@ -1495,10 +1491,8 @@ client
14951491
LGET -> withQueue $ \q qr -> checkContact qr $ getQueueLink_ q qr
14961492
Cmd SNotifier NSUB -> response . (corrId,entId,) <$> case q_ of
14971493
Just (q, QueueRec {notifier = Just ntfCreds}) ->
1498-
let assoc_ = case prepared of
1499-
Left _ -> Nothing
1500-
Right prepMap -> M.lookup entId prepMap >>= snd
1501-
in subscribeNotifications assoc_ q ntfCreds
1494+
either (pure . ERR) (\_ -> subscribeNotifications q ntfCreds)
1495+
$ batchSubs >>= sequence . M.lookup (recipientId q) . \(_, _, n) -> n
15021496
_ -> pure $ ERR INTERNAL
15031497
Cmd SNotifierService (NSUBS n idsHash) -> response . (corrId,entId,) <$> case clntServiceId of
15041498
Just serviceId -> subscribeServiceNotifications serviceId (n, idsHash)
@@ -1511,10 +1505,11 @@ client
15111505
pure $ allowNewQueues && maybe True ((== auth_) . Just) newQueueBasicAuth
15121506
Cmd SRecipient command ->
15131507
case command of
1514-
SUB -> case prepared of
1508+
SUB -> case batchSubs of
15151509
Left e -> pure $ Just (err e, Nothing)
1516-
Right prepMap -> let (msg_, assoc_) = maybe (Nothing, Nothing) id (M.lookup entId prepMap)
1517-
in withQueue' $ subscribeQueueAndDeliver msg_ assoc_
1510+
Right (msgs, rcvAssocs, _) -> case sequence $ M.lookup entId rcvAssocs of
1511+
Left e -> pure $ Just (err e, Nothing)
1512+
Right _ -> withQueue' $ subscribeQueueAndDeliver (M.lookup entId msgs)
15181513
GET -> withQueue getMessage
15191514
ACK msgId -> withQueue $ acknowledgeMsg msgId
15201515
KEY sKey -> withQueue $ \q _ -> either err (corrId,entId,) <$> secureQueue_ q sKey
@@ -1655,11 +1650,11 @@ client
16551650
suspendQueue_ :: (StoreQueue s, QueueRec) -> M s (Transmission BrokerMsg)
16561651
suspendQueue_ (q, _) = liftIO $ either err (const ok) <$> suspendQueue (queueStore ms) q
16571652

1658-
subscribeQueueAndDeliver :: Maybe Message -> Maybe (Either ErrorType ()) -> StoreQueue s -> QueueRec -> M s ResponseAndMessage
1659-
subscribeQueueAndDeliver msg_ assocResult q qr@QueueRec {rcvServiceId} =
1653+
subscribeQueueAndDeliver :: Maybe Message -> StoreQueue s -> QueueRec -> M s ResponseAndMessage
1654+
subscribeQueueAndDeliver msg_ q qr@QueueRec {rcvServiceId} =
16601655
liftIO (TM.lookupIO entId $ subscriptions clnt) >>= \case
16611656
Nothing ->
1662-
sharedSubscribeQueue assocResult q SRecipientService rcvServiceId subscribers subscriptions serviceSubsCount (newSubscription NoSub) rcvServices >>= \case
1657+
sharedSubscribeQueue q SRecipientService rcvServiceId subscribers subscriptions serviceSubsCount (newSubscription NoSub) rcvServices >>= \case
16631658
Left e -> pure (err e, Nothing)
16641659
Right s -> deliver s
16651660
Just s@Sub {subThread} -> do
@@ -1761,9 +1756,9 @@ client
17611756
then action q qr
17621757
else liftIO (updateQueueTime (queueStore ms) q t) >>= either (pure . err') (action q)
17631758

1764-
subscribeNotifications :: Maybe (Either ErrorType ()) -> StoreQueue s -> NtfCreds -> M s BrokerMsg
1765-
subscribeNotifications assocResult q NtfCreds {ntfServiceId} =
1766-
sharedSubscribeQueue assocResult q SNotifierService ntfServiceId ntfSubscribers ntfSubscriptions ntfServiceSubsCount (pure ()) ntfServices >>= \case
1759+
subscribeNotifications :: StoreQueue s -> NtfCreds -> M s BrokerMsg
1760+
subscribeNotifications q NtfCreds {ntfServiceId} =
1761+
sharedSubscribeQueue q SNotifierService ntfServiceId ntfSubscribers ntfSubscriptions ntfServiceSubsCount (pure ()) ntfServices >>= \case
17671762
Left e -> pure $ ERR e
17681763
Right (hasSub, _) -> do
17691764
when (isNothing clntServiceId) $
@@ -1772,7 +1767,6 @@ client
17721767

17731768
sharedSubscribeQueue ::
17741769
(PartyI p, ServiceParty p) =>
1775-
Maybe (Either ErrorType ()) ->
17761770
StoreQueue s ->
17771771
SParty p ->
17781772
Maybe ServiceId ->
@@ -1782,7 +1776,7 @@ client
17821776
STM sub ->
17831777
(ServerStats -> ServiceStats) ->
17841778
M s (Either ErrorType (Bool, Maybe sub))
1785-
sharedSubscribeQueue assocResult q party queueServiceId srvSubscribers clientSubs clientServiceSubs mkSub servicesSel = do
1779+
sharedSubscribeQueue q party queueServiceId srvSubscribers clientSubs clientServiceSubs mkSub servicesSel = do
17861780
stats <- asks serverStats
17871781
let incSrvStat sel = incStat $ sel $ servicesSel stats
17881782
writeSub = writeTQueue (subQ srvSubscribers) (CSClient entId queueServiceId clntServiceId, clientId)
@@ -1797,9 +1791,7 @@ client
17971791
incSrvStat srvSubQueues
17981792
incSrvStat srvAssocDuplicate
17991793
pure $ Right (hasSub, Nothing)
1800-
| otherwise -> case assocResult of
1801-
Just (Left e) -> pure $ Left e
1802-
_ -> runExceptT $ do
1794+
| otherwise -> runExceptT $ do
18031795
-- association already done in prepareBatch
18041796
hasSub <- atomically $ (<$ incServiceQueueSubs) =<< hasServiceSub
18051797
atomically writeSub
@@ -1813,9 +1805,7 @@ client
18131805
-- This function is used when queue association with the service is created.
18141806
incServiceQueueSubs = modifyTVar' (clientServiceSubs clnt) $ addServiceSubs (1, queueIdHash (recipientId q)) -- service count and IDS hash
18151807
Nothing -> case queueServiceId of
1816-
Just _ -> case assocResult of
1817-
Just (Left e) -> pure $ Left e
1818-
_ -> runExceptT $ do
1808+
Just _ -> runExceptT $ do
18191809
-- unassociation already done in prepareBatch
18201810
liftIO $ incSrvStat srvAssocRemoved
18211811
-- getSubscription may be Just for receiving service, where clientSubs also hold active deliveries for service subscriptions.
@@ -2125,7 +2115,7 @@ client
21252115
-- rejectOrVerify filters allowed commands, no need to repeat it here.
21262116
-- INTERNAL is used because processCommand never returns Nothing for sender commands (could be extracted for better types).
21272117
-- `fst` removes empty message that is only returned for `SUB` command
2128-
Right t''@(_, (corrId', entId', _)) -> maybe (corrId', entId', ERR INTERNAL) fst <$> lift (processCommand Nothing fwdVersion (Right M.empty) t'')
2118+
Right t''@(_, (corrId', entId', _)) -> maybe (corrId', entId', ERR INTERNAL) fst <$> lift (processCommand Nothing fwdVersion (Right (M.empty, M.empty, M.empty)) t'')
21292119
-- encode response
21302120
r' <- case batchTransmissions clntTHParams [Right (Nothing, encodeTransmission clntTHParams r)] of
21312121
[] -> throwE INTERNAL -- at least 1 item is guaranteed from NonEmpty/Right

src/Simplex/Messaging/Server/MsgStore/Journal.hs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -353,10 +353,8 @@ instance QueueStoreClass (JournalQueue s) (QStore s) where
353353
{-# INLINE getCreateService #-}
354354
setQueueService = withQS setQueueService
355355
{-# INLINE setQueueService #-}
356-
setRcvQueueServices st = withQS (\qs -> setRcvQueueServices qs) st
357-
{-# INLINE setRcvQueueServices #-}
358-
setNtfQueueServices st = withQS (\qs -> setNtfQueueServices qs) st
359-
{-# INLINE setNtfQueueServices #-}
356+
setQueueServices st = withQS (\qs -> setQueueServices qs) st
357+
{-# INLINE setQueueServices #-}
360358
getQueueNtfServices = withQS (getQueueNtfServices @(JournalQueue s))
361359
{-# INLINE getQueueNtfServices #-}
362360
getServiceQueueCountHash = withQS (getServiceQueueCountHash @(JournalQueue s))

src/Simplex/Messaging/Server/QueueStore/Postgres.hs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -504,10 +504,8 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where
504504
atomically $ writeTVar (queueRec sq) $ Just q'
505505
withLog "setQueueService" st $ \sl -> logQueueService sl rId party serviceId
506506

507-
setRcvQueueServices _ _ [] = pure S.empty
508-
setRcvQueueServices _ _ _ = pure S.empty -- TODO batch implementation
509-
setNtfQueueServices _ _ [] = pure S.empty
510-
setNtfQueueServices _ _ _ = pure S.empty -- TODO batch implementation
507+
setQueueServices _ _ _ [] = pure $ Right M.empty
508+
setQueueServices _ _ _ _ = pure $ Right M.empty -- TODO batch implementation
511509

512510
getQueueNtfServices :: PostgresQueueStore q -> [(NotifierId, a)] -> IO (Either ErrorType ([(Maybe ServiceId, [(NotifierId, a)])], [(NotifierId, a)]))
513511
getQueueNtfServices st ntfs = E.uninterruptibleMask_ $ runExceptT $ do

src/Simplex/Messaging/Server/QueueStore/STM.hs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -337,8 +337,7 @@ instance StoreQueueClass q => QueueStoreClass q (STMQueueStore q) where
337337
mapM_ (removeServiceQueue st serviceSel qId) prevSrvId
338338
mapM_ (addServiceQueue st serviceSel qId) serviceId
339339

340-
setRcvQueueServices _ _ _ = pure S.empty -- TODO loop implementation
341-
setNtfQueueServices _ _ _ = pure S.empty -- TODO loop implementation
340+
setQueueServices _ _ _ _ = pure $ Right M.empty -- TODO loop implementation
342341

343342
getQueueNtfServices :: STMQueueStore q -> [(NotifierId, a)] -> IO (Either ErrorType ([(Maybe ServiceId, [(NotifierId, a)])], [(NotifierId, a)]))
344343
getQueueNtfServices st ntfs = do

src/Simplex/Messaging/Server/QueueStore/Types.hs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import Control.Concurrent.STM
1616
import Control.Monad
1717
import Data.Int (Int64)
1818
import Data.List.NonEmpty (NonEmpty)
19-
import Data.Set (Set)
19+
import Data.Map.Strict (Map)
2020
import Data.Text (Text)
2121
import Simplex.Messaging.Protocol
2222
import Simplex.Messaging.Server.QueueStore
@@ -52,8 +52,7 @@ class StoreQueueClass q => QueueStoreClass q s where
5252
deleteStoreQueue :: s -> q -> IO (Either ErrorType QueueRec)
5353
getCreateService :: s -> ServiceRec -> IO (Either ErrorType ServiceId)
5454
setQueueService :: (PartyI p, ServiceParty p) => s -> q -> SParty p -> Maybe ServiceId -> IO (Either ErrorType ())
55-
setRcvQueueServices :: s -> Maybe ServiceId -> [q] -> IO (Set RecipientId)
56-
setNtfQueueServices :: s -> Maybe ServiceId -> [q] -> IO (Set RecipientId)
55+
setQueueServices :: (PartyI p, ServiceParty p) => s -> SParty p -> Maybe ServiceId -> [q] -> IO (Either ErrorType (Map RecipientId (Either ErrorType ())))
5756
getQueueNtfServices :: s -> [(NotifierId, a)] -> IO (Either ErrorType ([(Maybe ServiceId, [(NotifierId, a)])], [(NotifierId, a)]))
5857
getServiceQueueCountHash :: (PartyI p, ServiceParty p) => s -> SParty p -> ServiceId -> IO (Either ErrorType (Int64, IdsHash))
5958

0 commit comments

Comments
 (0)