@@ -1412,37 +1412,39 @@ sendMessagesB' c reqs = do
14121412sendMessagesB_ :: forall t . Traversable t => AgentClient -> t (Either AgentErrorType MsgReq ) -> Set ConnId -> AM' (t (Either AgentErrorType (AgentMsgId , PQEncryption )))
14131413sendMessagesB_ c reqs connIds = withConnLocks c connIds " sendMessages" $ do
14141414 prev <- newTVarIO Nothing
1415- reqs' <- withStoreBatch c $ \ db -> fmap (bindRight $ getConn_ db prev) reqs
1415+ reqs' <- withStoreBatch c $ \ db -> fmap (mapM $ getConn_ db prev) reqs
14161416 let (toEnable, reqs'') = mapAccumL prepareConn [] reqs'
14171417 void $ withStoreBatch' c $ \ db -> map (\ connId -> setConnPQSupport db connId PQSupportOn ) $ S. toList toEnable
14181418 enqueueMessagesB c reqs''
14191419 where
1420- getConn_ :: DB. Connection -> TVar (Maybe (Either AgentErrorType SomeConn )) -> MsgReq -> IO (Either AgentErrorType ( MsgReq , SomeConn ) )
1420+ getConn_ :: DB. Connection -> TVar (Maybe (Either AgentErrorType SomeConn )) -> MsgReq -> IO (MsgReq , Either AgentErrorType SomeConn )
14211421 getConn_ db prev req@ (connId, _, _, _) =
14221422 (req,)
1423- <$$ > if B. null connId
1423+ <$> if B. null connId
14241424 then fromMaybe (Left $ INTERNAL " sendMessagesB_: empty prev connId" ) <$> readTVarIO prev
14251425 else do
14261426 conn <- first storeError <$> getConn db connId
14271427 conn <$ atomically (writeTVar prev $ Just conn)
1428- prepareConn :: Set ConnId -> Either AgentErrorType (MsgReq , SomeConn ) -> (Set ConnId , Either AgentErrorType (ConnData , NonEmpty SndQueue , Maybe PQEncryption , MsgFlags , ValueOrRef AMessage ))
1428+ prepareConn :: Set ConnId -> Either AgentErrorType (MsgReq , Either AgentErrorType SomeConn ) -> (Set ConnId , Either AgentErrorType (Either AgentErrorType ( ConnData , NonEmpty SndQueue ) , Maybe PQEncryption , MsgFlags , ValueOrRef AMessage ))
14291429 prepareConn s (Left e) = (s, Left e)
1430- prepareConn s (Right ((_, pqEnc, msgFlags, msgOrRef), SomeConn _ conn)) = case conn of
1431- DuplexConnection cData _ sqs -> prepareMsg cData sqs
1432- SndConnection cData sq -> prepareMsg cData [sq]
1433- _ -> (s, Left $ CONN SIMPLEX " sendMessagesB_" )
1430+ prepareConn s (Right ((_, pqEnc, msgFlags, msgOrRef), conn_)) = case conn_ of
1431+ Right (SomeConn cType conn) -> case conn of
1432+ DuplexConnection cData _ sqs -> prepareMsg cData sqs
1433+ SndConnection cData sq -> prepareMsg cData [sq]
1434+ -- we can't fail here, as it may prevent delivery of subsequent messages that reference the body of the failed message.
1435+ _ -> (s, mkReq $ Left $ CONN SIMPLEX $ " sendMessagesB_ " <> show (connType cType))
1436+ Left e -> (s, mkReq $ Left e)
14341437 where
1435- prepareMsg :: ConnData -> NonEmpty SndQueue -> (Set ConnId , Either AgentErrorType (ConnData , NonEmpty SndQueue , Maybe PQEncryption , MsgFlags , ValueOrRef AMessage ))
1438+ prepareMsg :: ConnData -> NonEmpty SndQueue -> (Set ConnId , Either AgentErrorType (Either AgentErrorType ( ConnData , NonEmpty SndQueue ) , Maybe PQEncryption , MsgFlags , ValueOrRef AMessage ))
14361439 prepareMsg cData@ ConnData {connId, pqSupport} sqs
1437- | ratchetSyncSendProhibited cData = (s, Left $ CMD PROHIBITED " sendMessagesB: send prohibited" )
1440+ | ratchetSyncSendProhibited cData = (s, mkReq $ Left $ CMD PROHIBITED " sendMessagesB: send prohibited" )
14381441 -- connection is only updated if PQ encryption was disabled, and now it has to be enabled.
14391442 -- support for PQ encryption (small message envelopes) will not be disabled when message is sent.
14401443 | pqEnc == PQEncOn && pqSupport == PQSupportOff =
14411444 let cData' = cData {pqSupport = PQSupportOn } :: ConnData
1442- in (S. insert connId s, mkReq cData')
1443- | otherwise = (s, mkReq cData)
1444- where
1445- mkReq cData' = Right (cData', sqs, Just pqEnc, msgFlags, A_MSG <$> msgOrRef)
1445+ in (S. insert connId s, mkReq $ Right (cData', sqs))
1446+ | otherwise = (s, mkReq $ Right (cData, sqs))
1447+ mkReq csqs_ = Right (csqs_, Just pqEnc, msgFlags, A_MSG <$> msgOrRef)
14461448
14471449-- / async command processing v v v
14481450
@@ -1626,10 +1628,10 @@ enqueueMessages c cData sqs msgFlags aMessage = do
16261628
16271629enqueueMessages' :: AgentClient -> ConnData -> NonEmpty SndQueue -> MsgFlags -> AMessage -> AM (AgentMsgId , CR. PQEncryption )
16281630enqueueMessages' c cData sqs msgFlags aMessage =
1629- ExceptT $ runIdentity <$> enqueueMessagesB c (Identity (Right (cData, sqs, Nothing , msgFlags, vrValue aMessage)))
1631+ ExceptT $ runIdentity <$> enqueueMessagesB c (Identity (Right (Right ( cData, sqs) , Nothing , msgFlags, vrValue aMessage)))
16301632{-# INLINE enqueueMessages' #-}
16311633
1632- enqueueMessagesB :: Traversable t => AgentClient -> t (Either AgentErrorType (ConnData , NonEmpty SndQueue , Maybe PQEncryption , MsgFlags , ValueOrRef AMessage )) -> AM' (t (Either AgentErrorType (AgentMsgId , PQEncryption )))
1634+ enqueueMessagesB :: Traversable t => AgentClient -> t (Either AgentErrorType (Either AgentErrorType ( ConnData , NonEmpty SndQueue ) , Maybe PQEncryption , MsgFlags , ValueOrRef AMessage )) -> AM' (t (Either AgentErrorType (AgentMsgId , PQEncryption )))
16331635enqueueMessagesB c reqs = do
16341636 reqs' <- enqueueMessageB c reqs
16351637 enqueueSavedMessageB c $ mapMaybe snd $ rights $ toList reqs'
@@ -1641,35 +1643,50 @@ isActiveSndQ SndQueue {status} = status == Secured || status == Active
16411643
16421644enqueueMessage :: AgentClient -> ConnData -> SndQueue -> MsgFlags -> AMessage -> AM (AgentMsgId , PQEncryption )
16431645enqueueMessage c cData sq msgFlags aMessage =
1644- ExceptT $ fmap fst . runIdentity <$> enqueueMessageB c (Identity (Right (cData, [sq], Nothing , msgFlags, vrValue aMessage)))
1646+ ExceptT $ fmap fst . runIdentity <$> enqueueMessageB c (Identity (Right (Right ( cData, [sq]) , Nothing , msgFlags, vrValue aMessage)))
16451647{-# INLINE enqueueMessage #-}
16461648
16471649-- this function is used only for sending messages in batch, it returns the list of successes to enqueue additional deliveries
1648- enqueueMessageB :: forall t . Traversable t => AgentClient -> t (Either AgentErrorType (ConnData , NonEmpty SndQueue , Maybe PQEncryption , MsgFlags , ValueOrRef AMessage )) -> AM' (t (Either AgentErrorType ((AgentMsgId , PQEncryption ), Maybe (ConnData , [SndQueue ], AgentMsgId ))))
1650+ enqueueMessageB :: forall t . Traversable t => AgentClient -> t (Either AgentErrorType (Either AgentErrorType ( ConnData , NonEmpty SndQueue ) , Maybe PQEncryption , MsgFlags , ValueOrRef AMessage )) -> AM' (t (Either AgentErrorType ((AgentMsgId , PQEncryption ), Maybe (ConnData , [SndQueue ], AgentMsgId ))))
16491651enqueueMessageB c reqs = do
16501652 cfg <- asks config
16511653 (_, reqMids) <- unsafeWithStore c $ \ db -> do
16521654 mapAccumLM (\ ids r -> storeSentMsg db cfg ids r `E.catchAny` \ e -> (ids,) <$> handleInternal e) IM. empty reqs
1653- forME reqMids $ \ ((cData, sq :| sqs, _, _, _), InternalId msgId, pqSecr) -> do
1655+ forME reqMids $ \ ((csqs_, _, _, _), InternalId msgId, pqSecr) -> forM csqs_ $ \ (cData, sq :| sqs ) -> do
16541656 submitPendingMsg c cData sq
16551657 let sqs' = filter isActiveSndQ sqs
1656- pure $ Right ((msgId, pqSecr), if null sqs' then Nothing else Just (cData, sqs', msgId))
1658+ pure ((msgId, pqSecr), if null sqs' then Nothing else Just (cData, sqs', msgId))
16571659 where
1658- storeSentMsg :: DB. Connection -> AgentConfig -> IntMap (Int64 , AMessage ) -> Either AgentErrorType (ConnData , NonEmpty SndQueue , Maybe PQEncryption , MsgFlags , ValueOrRef AMessage ) -> IO (IntMap (Int64 , AMessage ), Either AgentErrorType ((ConnData , NonEmpty SndQueue , Maybe PQEncryption , MsgFlags , ValueOrRef AMessage ), InternalId , PQEncryption ))
1660+ storeSentMsg ::
1661+ DB. Connection ->
1662+ AgentConfig ->
1663+ IntMap (Maybe Int64 , AMessage ) ->
1664+ Either AgentErrorType (Either AgentErrorType (ConnData , NonEmpty SndQueue ), Maybe PQEncryption , MsgFlags , ValueOrRef AMessage ) ->
1665+ IO (IntMap (Maybe Int64 , AMessage ), Either AgentErrorType ((Either AgentErrorType (ConnData , NonEmpty SndQueue ), Maybe PQEncryption , MsgFlags , ValueOrRef AMessage ), InternalId , PQEncryption ))
16591666 storeSentMsg db cfg aMessageIds = \ case
16601667 Left e -> pure (aMessageIds, Left e)
1661- Right req@ (cData @ ConnData {connId}, sq :| _ , pqEnc_, msgFlags, mbr) -> case mbr of
1668+ Right req@ (csqs_ , pqEnc_, msgFlags, mbr) -> case mbr of
16621669 VRValue i_ aMessage -> case i_ >>= (`IM.lookup` aMessageIds) of
16631670 Just _ -> pure (aMessageIds, Left $ INTERNAL " enqueueMessageB: storeSentMsg duplicate saved message body" )
16641671 Nothing -> do
1665- mbId <- createSndMsgBody db aMessage
1666- let aMessageIds' = maybe id (`IM.insert` (mbId, aMessage)) i_ aMessageIds
1667- (aMessageIds',) <$> storeSentMsg_ mbId aMessage
1668- VRRef i -> (aMessageIds,) <$> case IM. lookup i aMessageIds of
1669- Just (mbId, aMessage) -> storeSentMsg_ mbId aMessage
1670- Nothing -> pure $ Left $ INTERNAL " enqueueMessageB: storeSentMsg missing saved message body id"
1672+ (mbId_, r) <- case csqs_ of
1673+ Left e -> pure (Nothing , Left e)
1674+ Right (cData, sq :| _) -> do
1675+ mbId <- createSndMsgBody db aMessage
1676+ (Just mbId,) <$> storeSentMsg_ cData sq mbId aMessage
1677+ let aMessageIds' = maybe id (`IM.insert` (mbId_, aMessage)) i_ aMessageIds
1678+ pure (aMessageIds', r)
1679+ VRRef i -> case csqs_ of
1680+ Left e -> pure $ (aMessageIds, Left e)
1681+ Right (cData, sq :| _) -> case IM. lookup i aMessageIds of
1682+ Just (Just mbId, aMessage) -> (aMessageIds,) <$> storeSentMsg_ cData sq mbId aMessage
1683+ Just (Nothing , aMessage) -> do
1684+ mbId <- createSndMsgBody db aMessage
1685+ let aMessageIds' = IM. insert i (Just mbId, aMessage) aMessageIds
1686+ (aMessageIds',) <$> storeSentMsg_ cData sq mbId aMessage
1687+ Nothing -> pure (aMessageIds, Left $ INTERNAL " enqueueMessageB: storeSentMsg missing saved message body id" )
16711688 where
1672- storeSentMsg_ sndMsgBodyId aMessage = fmap (first storeError) $ runExceptT $ do
1689+ storeSentMsg_ cData @ ConnData {connId} sq sndMsgBodyId aMessage = fmap (first storeError) $ runExceptT $ do
16731690 let AgentConfig {e2eEncryptVRange} = cfg
16741691 internalTs <- liftIO getCurrentTime
16751692 (internalId, internalSndId, prevMsgHash) <- ExceptT $ updateSndIds db connId
0 commit comments