-
-
Notifications
You must be signed in to change notification settings - Fork 95
Expand file tree
/
Copy pathNtfSubSupervisor.hs
More file actions
583 lines (554 loc) · 29.1 KB
/
NtfSubSupervisor.hs
File metadata and controls
583 lines (554 loc) · 29.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedLists #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}
{-# OPTIONS_GHC -fno-warn-ambiguous-fields #-}
module Simplex.Messaging.Agent.NtfSubSupervisor
( runNtfSupervisor,
nsUpdateToken,
nsRemoveNtfToken,
sendNtfSubCommand,
instantNotifications,
deleteToken,
closeNtfSupervisor,
)
where
import Control.Logger.Simple (logError, logInfo)
import Control.Monad
import Control.Monad.Reader
import Control.Monad.Trans.Except
import Crypto.Random (ChaChaDRG)
import Data.Bifunctor (first)
import Data.Either (fromRight, partitionEithers)
import Data.Functor (($>))
import Data.List (foldl')
import Data.List.NonEmpty (NonEmpty (..))
import qualified Data.List.NonEmpty as L
import qualified Data.Map.Strict as M
import Data.Maybe (catMaybes)
import qualified Data.Set as S
import Data.Time (UTCTime, addUTCTime, getCurrentTime)
import Data.Time.Clock (diffUTCTime)
import Simplex.Messaging.Agent.Client
import Simplex.Messaging.Agent.Env.SQLite
import Simplex.Messaging.Agent.Protocol
import Simplex.Messaging.Agent.RetryInterval
import Simplex.Messaging.Agent.Stats
import Simplex.Messaging.Agent.Store
import Simplex.Messaging.Agent.Store.AgentStore
import qualified Simplex.Messaging.Agent.Store.DB as DB
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Notifications.Protocol
import Simplex.Messaging.Notifications.Types
import Simplex.Messaging.Protocol (NtfServer, sameSrvAddr)
import qualified Simplex.Messaging.Protocol as SMP
import Simplex.Messaging.Util (diffToMicroseconds, threadDelay', tshow)
import System.Random (randomR)
import UnliftIO
import UnliftIO.Concurrent (forkIO)
import qualified UnliftIO.Exception as E
-- | Main loop of the notification subscription supervisor.
--
-- Before entering the loop it requests a token-deletion worker for every server returned by
-- 'getPendingDelTknServers'. It then reads commands from the supervisor queue forever and
-- processes each one inside an 'AONtfNetwork' operation bracket. Agent errors and exceptions
-- raised while processing a command are reported to the client as internal errors.
runNtfSupervisor :: AgentClient -> AM' ()
runNtfSupervisor c = do
  ns <- asks ntfSupervisor
  reportLeft =<< runExceptT resumeTknDeletion
  forever $ do
    cmd <- atomically $ readTBQueue (ntfSubQ ns)
    catchAll . agentOperationBracket c AONtfNetwork waitUntilActive $
      runExceptT (processNtfCmd c cmd) >>= reportLeft
  where
    -- request deletion workers for servers that still have tokens pending deletion
    resumeTknDeletion :: AM ()
    resumeTknDeletion = do
      srvs <- withStore' c getPendingDelTknServers
      lift $ forM_ srvs (getNtfTknDelWorker True c)
    -- a failed result is reported to the client, a successful one is ignored
    reportLeft :: Either AgentErrorType () -> AM' ()
    reportLeft = either notifyErr pure
    -- exceptions are logged and reported instead of being propagated
    catchAll :: AM' () -> AM' ()
    catchAll = E.handle $ \(e :: E.SomeException) -> do
      logError $ "runNtfSupervisor error " <> tshow e
      notifyErr e
    notifyErr :: Show e => e -> AM' ()
    notifyErr e = notifyInternalError' c $ "runNtfSupervisor error " <> show e
-- | Pairs every input item with its batch result and separates failures from successes.
--
-- Each error is tagged with the connection ID that the first argument extracts from the
-- corresponding item; successful results are returned untagged. Items and results are matched
-- positionally, so extra elements of the longer list are ignored.
partitionErrs :: (a -> ConnId) -> [a] -> [Either AgentErrorType b] -> ([(ConnId, AgentErrorType)], [b])
partitionErrs toConnId items = partitionEithers . zipWith tagErr items
  where
    tagErr item = first (toConnId item,)
{-# INLINE partitionErrs #-}
-- | Connection ID stored in a notification subscription record.
ntfSubConnId :: NtfSubscription -> ConnId
ntfSubConnId = \NtfSubscription {connId} -> connId
-- | Processes one supervisor command for a batch of connections.
--
-- The command is logged and then dispatched by constructor:
--
-- * 'NSCCreate' loads the primary receive queue and the stored notification subscription of every
--   connection, then creates new subscription records, resets existing ones, or requests the
--   SMP / NTF workers (with @hasWork = True@) for subscriptions whose work should simply continue.
-- * 'NSCSmpDelete' sets the subscription action of every connection to @NSASMP NSASmpDelete@ and
--   requests the SMP workers of the affected servers.
-- * 'NSCDeleteSub' deletes the stored subscription records; the batch results are discarded.
--
-- Per-connection store errors are collected and passed to 'notifyErrs'.
processNtfCmd :: AgentClient -> (NtfSupervisorCommand, NonEmpty ConnId) -> AM ()
processNtfCmd c (cmd, connIds) = do
logInfo $ "processNtfCmd - cmd = " <> tshow cmd
let connIds' = L.toList connIds
case cmd of
NSCCreate -> do
(cErrs, rqSubActions) <- lift $ partitionErrs id connIds' <$> withStoreBatch c (\db -> map (getQueueSub db) connIds')
notifyErrs c cErrs
logInfo $ "processNtfCmd, NSCCreate - length rqSubs = " <> tshow (length rqSubActions)
-- ns: queues needing a new subscription, rs: subscriptions to reset,
-- css / cns: SMP / NTF servers whose workers should continue existing work
let (ns, rs, css, cns) = partitionQueueSubActions rqSubActions
createNewSubs ns
resetSubs rs
lift $ do
-- servers are de-duplicated via Set before requesting their workers
mapM_ (getNtfSMPWorker True c) (S.fromList css)
mapM_ (getNtfNTFWorker True c) (S.fromList cns)
where
-- Loads the primary receive queue of the connection together with its stored subscription (if any);
-- a failure to load the queue is converted with storeError.
getQueueSub ::
DB.Connection ->
ConnId ->
IO (Either AgentErrorType (RcvQueue, Maybe NtfSupervisorSub))
getQueueSub db connId = fmap (first storeError) $ runExceptT $ do
rq <- ExceptT $ getPrimaryRcvQueue db connId
sub <- liftIO $ getNtfSubscription db connId
pure (rq, sub)
-- Stores a new subscription record (status NASNew, action NSASMP NSASmpKey) for every queue and
-- requests the SMP workers; does nothing when there is no token (see withTokenServer).
createNewSubs :: [RcvQueue] -> AM ()
createNewSubs rqs = do
withTokenServer $ \ntfServer -> do
let newSubs = map (rqToNewSub ntfServer) rqs
(cErrs, _) <- lift $ partitionErrs ntfSubConnId newSubs <$> withStoreBatch c (\db -> map (storeNewSub db) newSubs)
notifyErrs c cErrs
kickSMPWorkers rqs
where
rqToNewSub :: NtfServer -> RcvQueue -> NtfSubscription
rqToNewSub ntfServer RcvQueue {userId, connId, server} = newNtfSubscription userId connId server Nothing ntfServer NASNew
storeNewSub :: DB.Connection -> NtfSubscription -> IO (Either AgentErrorType ())
storeNewSub db sub = first storeError <$> createNtfSubscription db sub (NSASMP NSASmpKey)
-- Resets existing subscription records to the initial state (see toResetSub) with action
-- NSASMP NSASmpKey and requests the SMP workers; does nothing when there is no token.
resetSubs :: [(RcvQueue, NtfSubscription)] -> AM ()
resetSubs rqSubs = do
withTokenServer $ \ntfServer -> do
let subsToReset = map (toResetSub ntfServer) rqSubs
(cErrs, _) <- lift $ partitionErrs ntfSubConnId subsToReset <$> withStoreBatch' c (\db -> map (storeResetSub db) subsToReset)
notifyErrs c cErrs
let rqs = map fst rqSubs
kickSMPWorkers rqs
where
-- takes the SMP server from the queue and the NTF server from the current token,
-- clears the notifier queue ID and the subscription ID, and sets the status to NASNew
toResetSub :: NtfServer -> (RcvQueue, NtfSubscription) -> NtfSubscription
toResetSub ntfServer (rq, sub) =
let RcvQueue {server = smpServer} = rq
in sub {smpServer, ntfQueueId = Nothing, ntfServer, ntfSubId = Nothing, ntfSubStatus = NASNew}
storeResetSub :: DB.Connection -> NtfSubscription -> IO ()
storeResetSub db sub = supervisorUpdateNtfSub db sub (NSASMP NSASmpKey)
-- Decides, for each queue, whether its subscription has to be created, reset, or whether
-- the already scheduled SMP / NTF work should continue.
partitionQueueSubActions ::
[(RcvQueue, Maybe NtfSupervisorSub)] ->
( [RcvQueue], -- new subs
[(RcvQueue, NtfSubscription)], -- reset subs
[SMPServer], -- continue work (SMP)
[NtfServer] -- continue work (Ntf)
)
partitionQueueSubActions = foldr decideSubWork ([], [], [], [])
where
-- sub = Nothing, needs to be created
decideSubWork (rq, Nothing) (ns, rs, css, cns) = (rq : ns, rs, css, cns)
decideSubWork (rq, Just (sub, subAction_)) (ns, rs, css, cns) =
case (clientNtfCreds rq, ntfQueueId sub) of
-- notifier ID created on SMP server (on ntf server subscription can be registered or not yet),
-- need to clarify action
(Just ClientNtfCreds {notifierId}, Just ntfQueueId')
| sameSrvAddr (qServer rq) subSMPServer && notifierId == ntfQueueId' -> contOrReset
| otherwise -> reset
-- neither the queue nor the subscription has a notifier ID yet
(Nothing, Nothing) -> contOrReset
-- only one side has a notifier ID: the records are inconsistent, start over
_ -> reset
where
NtfSubscription {ntfServer = subNtfServer, smpServer = subSMPServer} = sub
contOrReset = case subAction_ of
-- action was set to NULL after worker internal error
Nothing -> reset
Just (action, _)
-- subscription was marked for deletion / is being deleted
| isDeleteNtfSubAction action -> reset
-- continue work on subscription (e.g. supervisor was repeatedly tasked with creating a subscription)
| otherwise -> case action of
NSASMP _ -> (ns, rs, qServer rq : css, cns)
NSANtf _ -> (ns, rs, css, subNtfServer : cns)
reset = (ns, (rq, sub) : rs, css, cns)
NSCSmpDelete -> do
(cErrs, rqs) <- lift $ partitionErrs id connIds' <$> withStoreBatch c (\db -> map (getQueue db) connIds')
logInfo $ "processNtfCmd, NSCSmpDelete - length rqs = " <> tshow (length rqs)
(cErrs', _) <- lift $ partitionErrs qConnId rqs <$> withStoreBatch' c (\db -> map (updateAction db) rqs)
-- errors from loading the queues and from updating the actions are reported together
notifyErrs c (cErrs <> cErrs')
kickSMPWorkers rqs
where
getQueue :: DB.Connection -> ConnId -> IO (Either AgentErrorType RcvQueue)
getQueue db connId = first storeError <$> getPrimaryRcvQueue db connId
updateAction :: DB.Connection -> RcvQueue -> IO ()
updateAction db rq = supervisorUpdateNtfAction db (qConnId rq) (NSASMP NSASmpDelete)
NSCDeleteSub -> void $ lift $ withStoreBatch' c $ \db -> map (deleteNtfSubscription' db) connIds'
where
-- Requests (with hasWork = True) the SMP worker of every distinct server used by the given queues.
kickSMPWorkers :: [RcvQueue] -> AM ()
kickSMPWorkers rqs = do
let smpServers = S.fromList $ map qServer rqs
lift $ mapM_ (getNtfSMPWorker True c) smpServers
-- | Obtains, via 'getAgentWorker', the worker registered under the given notification server in
-- the supervisor's @ntfWorkers@ map; the worker body is 'runNtfWorker' for that server.
-- The @hasWork@ flag is passed through to 'getAgentWorker' unchanged.
getNtfNTFWorker :: Bool -> AgentClient -> NtfServer -> AM' Worker
getNtfNTFWorker hasWork c server =
  asks (ntfWorkers . ntfSupervisor) >>= \workers ->
    getAgentWorker "ntf_ntf" hasWork c server workers (runNtfWorker c server)
-- | Obtains, via 'getAgentWorker', the worker registered under the given SMP server in the
-- supervisor's @ntfSMPWorkers@ map; the worker body is 'runNtfSMPWorker' for that server.
-- The @hasWork@ flag is passed through to 'getAgentWorker' unchanged.
getNtfSMPWorker :: Bool -> AgentClient -> SMPServer -> AM' Worker
getNtfSMPWorker hasWork c server =
  asks (ntfSMPWorkers . ntfSupervisor) >>= \workers ->
    getAgentWorker "ntf_smp" hasWork c server workers (runNtfSMPWorker c server)
-- | Obtains, via 'getAgentWorker', the worker registered under the given notification server in
-- the supervisor's @ntfTknDelWorkers@ map; the worker body is 'runNtfTknDelWorker' for that server.
-- The @hasWork@ flag is passed through to 'getAgentWorker' unchanged.
getNtfTknDelWorker :: Bool -> AgentClient -> NtfServer -> AM' Worker
getNtfTknDelWorker hasWork c server =
  asks (ntfTknDelWorkers . ntfSupervisor) >>= \workers ->
    getAgentWorker "ntf_tkn_del" hasWork c server workers (runNtfTknDelWorker c server)
-- | Runs the action with the notification server of the current token.
-- When no token is set the action is not run at all.
withTokenServer :: (NtfServer -> AM ()) -> AM ()
withTokenServer action = do
  tkn_ <- lift getNtfToken
  forM_ tkn_ $ \NtfToken {ntfServer} -> action ntfServer
-- | Worker loop for subscription actions that are performed on one notification server.
--
-- Each iteration waits until the worker is signalled that there is work, then runs one batch of
-- actions inside an 'AONtfNetwork' operation bracket (using 'throwWhenInactive').
runNtfWorker :: AgentClient -> NtfServer -> Worker -> AM ()
runNtfWorker c srv Worker {doWork} =
forever $ do
waitForWork doWork
ExceptT $ agentOperationBracket c AONtfNetwork throwWhenInactive $ runExceptT runNtfOperation
where
-- Loads up to ntfBatchSize next actions for this server, groups them by action type and runs every
-- group with retries. If no action can run now, the worker is rescheduled for a later time.
runNtfOperation :: AM ()
runNtfOperation = do
ntfBatchSize <- asks $ ntfBatchSize . config
withWorkItems c doWork (\db -> getNextNtfSubNTFActions db srv ntfBatchSize) $ \nextSubs -> do
logInfo $ "runNtfWorker - length nextSubs = " <> tshow (length nextSubs)
currTs <- liftIO getCurrentTime
let (creates, checks, deletes, rotates) = splitActions currTs nextSubs
-- all groups can only be empty when every loaded action is a check scheduled in the future
-- (see splitActions), so the worker waits until the timestamp of the first loaded action
if null creates && null checks && null deletes && null rotates
then
let (_, _, firstActionTs) = L.head nextSubs
in lift $ rescheduleWork doWork currTs firstActionTs
else do
retrySubActions c creates createSubs
retrySubActions c checks checkSubs
retrySubActions c deletes deleteSubs
retrySubActions c rotates rotateSubs
-- Groups subscriptions by action as (creates, checks, deletes, rotates).
-- Checks that are not yet due (action timestamp after the current time) are dropped from the batch.
splitActions :: UTCTime -> NonEmpty (NtfSubNTFAction, NtfSubscription, NtfActionTs) -> ([NtfSubscription], [NtfSubscription], [NtfSubscription], [NtfSubscription])
splitActions currTs = foldr addAction ([], [], [], [])
where
addAction (cmd, sub, ts) acc@(creates, checks, deletes, rotates) = case cmd of
NSACreate -> (sub : creates, checks, deletes, rotates)
NSACheck
| ts <= currTs -> (creates, sub : checks, deletes, rotates)
| otherwise -> acc
NSADelete -> (creates, checks, sub : deletes, rotates)
NSARotate -> (creates, checks, deletes, sub : rotates)
-- Creates subscriptions on the notification server in one batch request.
-- Requires a token that has an ID, is in NTActive status and uses NMInstant mode; otherwise every
-- subscription is reported with an internal error. Returns the subscriptions that failed with a
-- temporary error and should be retried.
createSubs :: [NtfSubscription] -> AM' [NtfSubscription]
createSubs ntfSubs =
getNtfToken >>= \case
Just tkn@NtfToken {ntfServer, ntfTokenId = Just tknId, ntfTknStatus = NTActive, ntfMode = NMInstant} -> do
subsRqs_ <- zip ntfSubs <$> withStoreBatch c (\db -> map (getQueue db) ntfSubs)
-- errs1: subscriptions whose queue could not be loaded or has no notifier credentials
let (errs1, subs_, newSubs_) = splitSubs tknId subsRqs_
incStatByUserId ntfServer ntfCreateAttempts subs_
case (L.nonEmpty subs_, L.nonEmpty newSubs_) of
(Just subs, Just newSubs) -> do
rs <- L.zip subs <$> agentNtfCreateSubscriptions c tkn newSubs
-- ntfSubs': temporary failures to retry, errs2: permanent failures, nSubIds: created subscriptions
let (ntfSubs', errs2, nSubIds) = splitResults $ L.toList rs
subs' = map fst nSubIds
errs2' = map (first ntfSubConnId) errs2
incStatByUserId ntfServer ntfCreated subs'
ts <- liftIO getCurrentTime
int <- asks $ ntfSubFirstCheckInterval . config
-- the first check is scheduled ntfSubFirstCheckInterval after creation
let checkTs = addUTCTime int ts
(errs3, _) <- partitionErrs ntfSubConnId subs' <$> withStoreBatch' c (\db -> map (updateSubNSACheck db checkTs) nSubIds)
workerErrors c $ errs1 <> errs2' <> errs3
pure ntfSubs'
_ -> workerErrors c errs1 $> []
_ -> do
let errs = map (\sub -> (ntfSubConnId sub, INTERNAL "NSACreate - no active token")) ntfSubs
workerErrors c errs
pure []
where
getQueue :: DB.Connection -> NtfSubscription -> IO (Either AgentErrorType RcvQueue)
getQueue db NtfSubscription {connId} = first storeError <$> getPrimaryRcvQueue db connId
-- Splits into (errors, subscriptions that can be created, corresponding server requests);
-- the second and third lists have the same length and order.
splitSubs :: NtfTokenId -> [(NtfSubscription, Either AgentErrorType RcvQueue)] -> ([(ConnId, AgentErrorType)], [NtfSubscription], [NewNtfEntity 'Subscription])
splitSubs tknId = foldr splitSub ([], [], [])
where
splitSub (sub, rq) (errs, subs, newSubs) = case rq of
Right RcvQueue {clientNtfCreds = Just creds} -> (errs, sub : subs, toNewSub sub creds : newSubs)
Right _ -> ((ntfSubConnId sub, INTERNAL "NSACreate - no notifier queue credentials") : errs, subs, newSubs)
Left e -> ((ntfSubConnId sub, e) : errs, subs, newSubs)
toNewSub NtfSubscription {smpServer} ClientNtfCreds {ntfPrivateKey, notifierId} =
NewNtfSub tknId (SMPQueueNtf smpServer notifierId) ntfPrivateKey
-- stores the subscription ID received from the server and schedules the first check
updateSubNSACheck :: DB.Connection -> UTCTime -> (NtfSubscription, NtfSubscriptionId) -> IO ()
updateSubNSACheck db checkTs (sub, nSubId) = updateNtfSubscription db sub {ntfSubId = Just nSubId, ntfSubStatus = NASCreated NSNew} (NSANtf NSACheck) checkTs
-- Checks subscription statuses on the notification server in one batch request.
-- Requires a token in NTActive status that uses NMInstant mode; otherwise every subscription is
-- reported with an internal error. Returns the subscriptions that failed with a temporary error
-- and should be retried.
checkSubs :: [NtfSubscription] -> AM' [NtfSubscription]
checkSubs ntfSubs =
getNtfToken >>= \case
Just tkn@NtfToken {ntfServer, ntfTknStatus = NTActive, ntfMode = NMInstant} -> do
-- errs1: subscriptions that have no subscription ID and therefore cannot be checked
let (errs1, subs_, subIds_) = splitSubs ntfSubs
incStatByUserId ntfServer ntfCheckAttempts subs_
case (L.nonEmpty subs_, L.nonEmpty subIds_) of
(Just subs, Just subIds) -> do
rs <- L.zip subs <$> agentNtfCheckSubscriptions c tkn subIds
let (ntfSubs', errs2, nSubStatuses) = splitResults $ L.toList rs
subs' = map fst nSubStatuses
-- NTF AUTH errors are not reported: these subscriptions are re-created below
(errs2', authSubs) = partitionEithers $ map (\case (sub, NTF _ SMP.AUTH) -> Right sub; e -> Left $ first ntfSubConnId e) errs2
incStatByUserId ntfServer ntfChecked subs'
ts <- liftIO getCurrentTime
int <- asks $ ntfSubCheckInterval . config
let nextCheckTs = addUTCTime int ts
(errs3, srvs) <- partitionErrs ntfSubConnId subs' <$> withStoreBatch' c (\db -> map (updateSub db ntfServer ts nextCheckTs) nSubStatuses)
(errs4, srvs') <- partitionErrs ntfSubConnId authSubs <$> withStoreBatch' c (\db -> map (recreateNtfSub db ntfServer ts) authSubs)
-- request SMP workers for the servers of all subscriptions that were reset for re-creation
mapM_ (getNtfSMPWorker True c) $ S.fromList (catMaybes srvs <> srvs')
workerErrors c $ errs1 <> errs2' <> errs3 <> errs4
pure ntfSubs'
_ -> workerErrors c errs1 $> []
_ -> do
let errs = map (\sub -> (ntfSubConnId sub, INTERNAL "NSACheck - no active token")) ntfSubs
workerErrors c errs
pure []
where
-- Splits into (errors, subscriptions that have an ID, their IDs);
-- the second and third lists have the same length and order.
splitSubs :: [NtfSubscription] -> ([(ConnId, AgentErrorType)], [NtfSubscription], [NtfSubscriptionId])
splitSubs = foldr splitSub ([], [], [])
where
splitSub sub (errs, subs, subIds) = case sub of
NtfSubscription {ntfSubId = Just subId} -> (errs, sub : subs, subId : subIds)
_ -> ((ntfSubConnId sub, INTERNAL "NSACheck - no subscription ID") : errs, subs, subIds)
-- Stores the received status and schedules the next check when ntfShouldSubscribe holds for it;
-- otherwise resets the subscription and returns its SMP server.
updateSub :: DB.Connection -> NtfServer -> UTCTime -> UTCTime -> (NtfSubscription, NtfSubStatus) -> IO (Maybe SMPServer)
updateSub db ntfServer ts nextCheckTs (sub, status)
| ntfShouldSubscribe status =
let sub' = sub {ntfSubStatus = NASCreated status}
in Nothing <$ updateNtfSubscription db sub' (NSANtf NSACheck) nextCheckTs
-- ntf server stopped subscribing to this queue
| otherwise = Just <$> recreateNtfSub db ntfServer ts sub
-- Resets the subscription to NASNew without notifier queue ID and subscription ID, sets its
-- action to NSASMP NSASmpKey with the given timestamp, and returns its SMP server.
recreateNtfSub :: DB.Connection -> NtfServer -> UTCTime -> NtfSubscription -> IO SMPServer
recreateNtfSub db ntfServer ts sub@NtfSubscription {smpServer} =
let sub' = sub {ntfServer, ntfQueueId = Nothing, ntfSubId = Nothing, ntfSubStatus = NASNew}
in smpServer <$ updateNtfSubscription db sub' (NSASMP NSASmpKey) ts
-- Adds, for every user, the number of that user's subscriptions in the list to the selected
-- statistic of the notification server.
incStatByUserId :: NtfServer -> (AgentNtfServerStats -> TVar Int) -> [NtfSubscription] -> AM' ()
incStatByUserId ntfServer sel ss =
forM_ (M.assocs userIdsCounts) $ \(userId, count) ->
atomically $ incNtfServerStat' c userId ntfServer sel count
where
userIdsCounts = foldl' (\acc NtfSubscription {userId} -> M.insertWith (+) userId 1 acc) M.empty ss
-- NSADelete and NSARotate are deprecated, but their processing is kept for legacy db records;
-- These actions are not batched
-- Returns the subscriptions whose deletion failed with a temporary error and should be retried.
deleteSubs :: [NtfSubscription] -> AM' [NtfSubscription]
deleteSubs ntfSubs = do
retrySubs_ <- mapM (runCatching deleteSub) ntfSubs
pure $ catMaybes retrySubs_
where
-- after deleting on the notification server, the subscription is switched to NASOff with
-- action NSASMP NSASmpDelete and the SMP worker of its server is requested
deleteSub :: NtfSubscription -> AM (Maybe NtfSubscription)
deleteSub sub@NtfSubscription {smpServer} =
deleteNtfSub sub $ do
let sub' = sub {ntfSubId = Nothing, ntfSubStatus = NASOff}
ts <- liftIO getCurrentTime
withStore' c $ \db -> updateNtfSubscription db sub' (NSASMP NSASmpDelete) ts
lift . void $ getNtfSMPWorker True c smpServer
-- Returns the subscriptions whose deletion failed with a temporary error and should be retried.
rotateSubs :: [NtfSubscription] -> AM' [NtfSubscription]
rotateSubs ntfSubs = do
retrySubs_ <- mapM (runCatching rotateSub) ntfSubs
pure $ catMaybes retrySubs_
where
-- after deleting on the notification server, the local record is removed and the supervisor is
-- asked to create the subscription again
rotateSub :: NtfSubscription -> AM (Maybe NtfSubscription)
rotateSub sub@NtfSubscription {connId} =
deleteNtfSub sub $ do
withStore' c $ \db -> deleteNtfSubscription db connId
ns <- asks ntfSupervisor
atomically $ writeTBQueue (ntfSubQ ns) (NSCCreate, [connId])
-- Runs the action for one subscription; an agent error is passed to workerInternalError
-- and results in Nothing (no retry).
runCatching :: (NtfSubscription -> AM (Maybe NtfSubscription)) -> NtfSubscription -> AM' (Maybe NtfSubscription)
runCatching action sub@NtfSubscription {connId} =
fromRight Nothing
<$> runExceptT (action sub `catchAgentError` \e -> workerInternalError c connId (show e) $> Nothing)
-- deleteNtfSub is only used in NSADelete and NSARotate, so also deprecated
-- Deletes the subscription on the notification server when it has a subscription ID and a token
-- is set, then runs the continuation. Returns Just the subscription only when the server request
-- failed with a temporary or host error; in that case the continuation is not run.
deleteNtfSub :: NtfSubscription -> AM () -> AM (Maybe NtfSubscription)
deleteNtfSub sub@NtfSubscription {userId, ntfSubId} continue = case ntfSubId of
Just nSubId ->
lift getNtfToken >>= \case
Just tkn@NtfToken {ntfServer} -> do
atomically $ incNtfServerStat c userId ntfServer ntfDelAttempts
tryAgentError (agentNtfDeleteSubscription c nSubId tkn) >>= \case
Right _ -> do
atomically $ incNtfServerStat c userId ntfServer ntfDeleted
continue'
Left e
| temporaryOrHostError e -> pure $ Just sub -- don't continue, retry
| otherwise -> continue'
Nothing -> continue'
_ -> continue'
where
continue' = continue $> Nothing -- continue without retry
-- | Worker loop for subscription actions that are performed on one SMP server.
--
-- Each iteration waits until the worker is signalled that there is work, then runs one batch of
-- actions inside an 'AONtfNetwork' operation bracket (using 'throwWhenInactive').
runNtfSMPWorker :: AgentClient -> SMPServer -> Worker -> AM ()
runNtfSMPWorker c srv Worker {doWork} = forever $ do
waitForWork doWork
ExceptT $ agentOperationBracket c AONtfNetwork throwWhenInactive $ runExceptT runNtfSMPOperation
where
-- Loads up to ntfBatchSize next actions for this server and runs key creation and
-- key deletion groups with retries.
runNtfSMPOperation :: AM ()
runNtfSMPOperation = do
ntfBatchSize <- asks $ ntfBatchSize . config
withWorkItems c doWork (\db -> getNextNtfSubSMPActions db srv ntfBatchSize) $ \nextSubs -> do
logInfo $ "runNtfSMPWorker - length nextSubs = " <> tshow (length nextSubs)
let (creates, deletes) = splitActions nextSubs
retrySubActions c creates createNotifierKeys
retrySubActions c deletes deleteNotifierKeys
-- Groups subscriptions by action as (key creations, key deletions).
splitActions :: NonEmpty (NtfSubSMPAction, NtfSubscription) -> ([NtfSubscription], [NtfSubscription])
splitActions = foldr addAction ([], [])
where
addAction (cmd, sub) (creates, deletes) = case cmd of
NSASmpKey -> (sub : creates, deletes)
NSASmpDelete -> (creates, sub : deletes)
-- Enables notifications for the queues of the given subscriptions.
-- Requires a token in NTActive status that uses NMInstant mode; otherwise every subscription is
-- reported with an internal error. Returns the subscriptions that failed with a temporary error
-- and should be retried.
createNotifierKeys :: [NtfSubscription] -> AM' [NtfSubscription]
createNotifierKeys ntfSubs =
getNtfToken >>= \case
Just NtfToken {ntfTknStatus = NTActive, ntfMode = NMInstant} -> do
(errs1, subRqKeys) <- prepareQueueSmpKey ntfSubs
rs <- enableQueuesNtfs c subRqKeys
-- subRqKeys': temporary failures to retry, errs2: permanent failures, successes: enabled queues
let (subRqKeys', errs2, successes) = splitResults rs
ntfSubs' = map eqnrNtfSub subRqKeys'
errs2' = map (first (qConnId . eqnrRq)) errs2
ts <- liftIO getCurrentTime
(errs3, srvs) <- partitionErrs (qConnId . eqnrRq . fst) successes <$> withStoreBatch' c (\db -> map (storeNtfSubCreds db ts) successes)
-- the next step (NSACreate) is performed by the workers of the notification servers
mapM_ (getNtfNTFWorker True c) $ S.fromList srvs
workerErrors c $ errs1 <> errs2' <> errs3
pure ntfSubs'
_ -> do
let errs = map (\sub -> (ntfSubConnId sub, INTERNAL "NSASmpKey - no active token")) ntfSubs
workerErrors c errs
pure []
where
-- Loads the primary receive queue of every subscription and generates a new auth key pair
-- (with the configured rcvAuthAlg) and a new key pair for the notification DH exchange.
prepareQueueSmpKey :: [NtfSubscription] -> AM' ([(ConnId, AgentErrorType)], [EnableQueueNtfReq])
prepareQueueSmpKey subs = do
alg <- asks (rcvAuthAlg . config)
g <- asks random
partitionErrs ntfSubConnId subs <$> withStoreBatch c (\db -> map (getQueue db alg g) subs)
where
getQueue :: DB.Connection -> C.AuthAlg -> TVar ChaChaDRG -> NtfSubscription -> IO (Either AgentErrorType EnableQueueNtfReq)
getQueue db (C.AuthAlg a) g sub = fmap (first storeError) $ runExceptT $ do
rq <- ExceptT $ getPrimaryRcvQueue db (ntfSubConnId sub)
authKeyPair <- atomically $ C.generateAuthKeyPair a g
rcvNtfKeyPair <- atomically $ C.generateKeyPair g
pure (EnableQueueNtfReq sub rq authKeyPair rcvNtfKeyPair)
-- Stores the notifier credentials on the queue (the DH secret is computed from the server public
-- key and the locally generated private key), moves the subscription to NASKey with action
-- NSANtf NSACreate, and returns the notification server of the subscription.
storeNtfSubCreds :: DB.Connection -> UTCTime -> (EnableQueueNtfReq, (SMP.NotifierId, SMP.RcvNtfPublicDhKey)) -> IO NtfServer
storeNtfSubCreds db ts (EnableQueueNtfReq {eqnrNtfSub, eqnrAuthKeyPair = (ntfPublicKey, ntfPrivateKey), eqnrRcvKeyPair = (_, pk)}, (notifierId, srvPubDhKey)) = do
let NtfSubscription {ntfServer} = eqnrNtfSub
rcvNtfDhSecret = C.dh' srvPubDhKey pk
setRcvQueueNtfCreds db (ntfSubConnId eqnrNtfSub) $ Just ClientNtfCreds {ntfPublicKey, ntfPrivateKey, notifierId, rcvNtfDhSecret}
updateNtfSubscription db eqnrNtfSub {ntfQueueId = Just notifierId, ntfSubStatus = NASKey} (NSANtf NSACreate) ts
pure ntfServer
-- Disables notifications for the queues of the given subscriptions and deletes the subscription
-- records of the queues for which the request succeeded. Returns the subscriptions that failed
-- with a temporary error and should be retried. Unlike key creation, this does not require a token.
deleteNotifierKeys :: [NtfSubscription] -> AM' [NtfSubscription]
deleteNotifierKeys ntfSubs = do
(errs1, subRqs) <- partitionErrs ntfSubConnId ntfSubs <$> withStoreBatch c (\db -> map (resetCredsGetQueue db) ntfSubs)
rs <- disableQueuesNtfs c subRqs
let (subRqs', errs2, successes) = splitResults rs
ntfSubs' = map fst subRqs'
errs2' = map (first (qConnId . snd)) errs2
disabledRqs = map (snd . fst) successes
(errs3, _) <- partitionErrs qConnId disabledRqs <$> withStoreBatch' c (\db -> map (deleteSub db) disabledRqs)
workerErrors c $ errs1 <> errs2' <> errs3
pure ntfSubs'
where
-- the stored notifier credentials are cleared before the queue is loaded
resetCredsGetQueue :: DB.Connection -> NtfSubscription -> IO (Either AgentErrorType DisableQueueNtfReq)
resetCredsGetQueue db sub@NtfSubscription {connId} = fmap (first storeError) $ runExceptT $ do
liftIO $ setRcvQueueNtfCreds db connId Nothing
rq <- ExceptT $ getPrimaryRcvQueue db connId
pure (sub, rq)
deleteSub :: DB.Connection -> RcvQueue -> IO ()
deleteSub db rq = deleteNtfSubscription db (qConnId rq)
-- | Runs a batch action over the subscriptions, repeating it with the configured reconnect
-- interval for as long as the action returns subscriptions to retry.
--
-- The action receives the subscriptions still pending and returns those that must be retried.
-- Before every attempt the loop waits while the agent is suspended and until the user network is
-- available. An empty input list is a no-op.
retrySubActions :: AgentClient -> [NtfSubscription] -> ([NtfSubscription] -> AM' [NtfSubscription]) -> AM ()
retrySubActions c subs action = unless (null subs) $ do
  pending <- newTVarIO subs
  ri <- asks $ reconnectInterval . config
  withRetryInterval ri $ \_ loop -> do
    liftIO $ waitWhileSuspended c
    liftIO $ waitForUserNetwork c
    remaining <- lift . action =<< readTVarIO pending
    case remaining of
      [] -> pure ()
      _ -> do
        -- only the failed subscriptions are attempted on the next iteration
        atomically $ writeTVar pending remaining
        retryNetworkLoop c loop
-- | Splits batch results, preserving order, into
-- (items that failed with a temporary or host error, items with other errors, successes).
splitResults :: [(a, Either AgentErrorType r)] -> ([a], [(a, AgentErrorType)], [(a, r)])
splitResults = foldr step ([], [], [])
  where
    step (a, Right r) (retries, failures, oks) = (retries, failures, (a, r) : oks)
    step (a, Left e) (retries, failures, oks)
      | temporaryOrHostError e = (a : retries, failures, oks)
      | otherwise = (retries, (a, e) : failures, oks)
-- | Clears the worker's work flag and sets it again from a forked thread after the time
-- difference between the action timestamp and the given current timestamp has passed.
rescheduleWork :: TMVar () -> UTCTime -> UTCTime -> AM' ()
rescheduleWork doWork ts actionTs = do
  void $ atomically (tryTakeTMVar doWork)
  void $ forkIO wakeUpLater
  where
    delay = diffToMicroseconds (diffUTCTime actionTs ts)
    wakeUpLater = do
      liftIO $ threadDelay' delay
      atomically $ hasWorkToDo' doWork
-- | Continues a retry loop after leaving and re-entering the 'AONtfNetwork' operation.
--
-- Between ending and beginning the operation 'throwWhenInactive' is called, so the loop
-- is not continued if that call throws.
retryNetworkLoop :: AgentClient -> AM () -> AM ()
retryNetworkLoop c loop =
  leaveOperation >> liftIO (throwWhenInactive c) >> enterOperation >> loop
  where
    leaveOperation = atomically $ endAgentOperation c AONtfNetwork
    enterOperation = atomically $ beginAgentOperation c AONtfNetwork
-- | Handles per-connection worker errors: sets the subscription action of every failed connection
-- to NULL in the store (batch results are ignored) and reports the errors to the client.
-- An empty list is a no-op.
workerErrors :: AgentClient -> [(ConnId, AgentErrorType)] -> AM' ()
workerErrors c connErrs
  | null connErrs = pure ()
  | otherwise = do
      void $ withStoreBatch' c $ \db -> map (setNullNtfSubscriptionAction db) failedConnIds
      notifyErrs c connErrs
  where
    failedConnIds = map fst connErrs
-- | Sets the subscription action of the connection to NULL in the store and reports
-- the given message to the client as an internal error of that connection.
workerInternalError :: AgentClient -> ConnId -> String -> AM ()
workerInternalError c connId internalErrStr =
  withStore' c (`setNullNtfSubscriptionAction` connId)
    >> notifyInternalError c connId internalErrStr
-- TODO change error
-- | Writes an @ERR (INTERNAL ..)@ connection event for the given connection to the client's
-- subscriber queue, with an empty correlation ID.
notifyInternalError :: MonadIO m => AgentClient -> ConnId -> String -> m ()
notifyInternalError AgentClient {subQ} connId internalErrStr =
  atomically $ writeTBQueue subQ ("", connId, evt)
  where
    evt = AEvt SAEConn (ERR (INTERNAL internalErrStr))
{-# INLINE notifyInternalError #-}
-- | Same as 'notifyInternalError', but the event is not attributed to any connection
-- (the connection ID is empty).
notifyInternalError' :: MonadIO m => AgentClient -> String -> m ()
notifyInternalError' AgentClient {subQ} internalErrStr =
  atomically $ writeTBQueue subQ ("", "", evt)
  where
    evt = AEvt SAEConn (ERR (INTERNAL internalErrStr))
{-# INLINE notifyInternalError' #-}
-- | Reports a list of per-connection errors to the client as a single @ERRS@ event.
-- Nothing is written to the queue when the list is empty.
notifyErrs :: MonadIO m => AgentClient -> [(ConnId, AgentErrorType)] -> m ()
notifyErrs AgentClient {subQ} connErrs
  | null connErrs = pure ()
  | otherwise = atomically $ writeTBQueue subQ ("", "", AEvt SAENone (ERRS connErrs))
{-# INLINE notifyErrs #-}
-- | Reads the current notification token (if any) from the supervisor state.
getNtfToken :: AM' (Maybe NtfToken)
getNtfToken = asks (ntfTkn . ntfSupervisor) >>= readTVarIO
-- | Replaces the token held by the supervisor with the given one.
nsUpdateToken :: NtfSupervisor -> NtfToken -> STM ()
nsUpdateToken ns = writeTVar (ntfTkn ns) . Just
-- | Clears the token held by the supervisor.
nsRemoveNtfToken :: NtfSupervisor -> STM ()
nsRemoveNtfToken ns = writeTVar tknVar Nothing
  where
    tknVar = ntfTkn ns
-- | Queues a command for the supervisor, but only when the current token allows instant
-- notifications (see 'instantNotifications'); otherwise the command is dropped.
sendNtfSubCommand :: NtfSupervisor -> (NtfSupervisorCommand, NonEmpty ConnId) -> STM ()
sendNtfSubCommand ns cmd = do
  instant <- instantNotifications <$> readTVar (ntfTkn ns)
  when instant $ writeTBQueue (ntfSubQ ns) cmd
-- | True only for a token that is in 'NTActive' status and uses 'NMInstant' mode.
instantNotifications :: Maybe NtfToken -> Bool
instantNotifications (Just NtfToken {ntfTknStatus = NTActive, ntfMode = NMInstant}) = True
instantNotifications _ = False
-- | Removes the token locally and schedules its deletion on the notification server.
--
-- The token record is removed from the store and from the supervisor state. If the token has a
-- server-side ID, a pending-deletion record is added in the same store call and the
-- token-deletion worker of the server is requested.
deleteToken :: AgentClient -> NtfToken -> AM ()
deleteToken c tkn@NtfToken {ntfServer, ntfTokenId, ntfPrivKey} = do
  scheduled <- withStore' c removeAndSchedule
  asks ntfSupervisor >>= atomically . nsRemoveNtfToken
  when scheduled . void . lift $ getNtfTknDelWorker True c ntfServer
  where
    -- returns True when a pending-deletion record was added
    removeAndSchedule :: DB.Connection -> IO Bool
    removeAndSchedule db = do
      removeNtfToken db tkn
      case ntfTokenId of
        Nothing -> pure False
        Just tknId -> True <$ addNtfTokenToDelete db ntfServer ntfPrivKey tknId
-- | Worker loop that deletes tokens pending deletion on one notification server.
--
-- Each iteration waits until the worker is signalled that there is work, then processes the next
-- pending token inside an 'AONtfNetwork' operation bracket. Temporary and host errors are retried
-- with the configured reconnect interval; any other error removes the pending record and is
-- reported to the client.
runNtfTknDelWorker :: AgentClient -> NtfServer -> Worker -> AM ()
runNtfTknDelWorker c srv Worker {doWork} = forever $ do
  waitForWork doWork
  ExceptT . agentOperationBracket c AONtfNetwork throwWhenInactive $ runExceptT deleteNextToken
  where
    deleteNextToken :: AM ()
    deleteNextToken =
      withWork c doWork (\db -> getNextNtfTokenToDelete db srv) $ \tknToDelete -> do
        logInfo $ "runNtfTknDelWorker, nextTknToDelete " <> tshow tknToDelete
        ri <- asks $ reconnectInterval . config
        withRetryInterval ri $ \_ loop -> do
          liftIO $ waitWhileSuspended c
          liftIO $ waitForUserNetwork c
          deleteOnServer tknToDelete `catchAgentError` onError loop tknToDelete
    -- retry on temporary errors, otherwise give up on this token and report the error
    onError :: AM () -> NtfTokenToDelete -> AgentErrorType -> AM ()
    onError loop (tknDbId, _, _) e = do
      logError $ "ntf tkn del error: " <> tshow e
      recover
      where
        recover
          | temporaryOrHostError e = retryNetworkLoop c loop
          | otherwise = do
              dropPendingRecord tknDbId
              notifyInternalError' c (show e)
    -- the pending record is removed only after the server request succeeded
    deleteOnServer :: NtfTokenToDelete -> AM ()
    deleteOnServer (tknDbId, ntfPrivKey, tknId) = do
      agentNtfDeleteToken c srv ntfPrivKey tknId
      dropPendingRecord tknDbId
    dropPendingRecord tknDbId = withStore' c $ \db -> deleteNtfTokenToDelete db tknDbId
-- | Stops all supervisor workers: each of the three worker maps is atomically replaced
-- with an empty map, and every worker that was in it is cancelled.
closeNtfSupervisor :: NtfSupervisor -> IO ()
closeNtfSupervisor ns = do
  stopAll (ntfWorkers ns)
  stopAll (ntfSMPWorkers ns)
  stopAll (ntfTknDelWorkers ns)
  where
    stopAll workersVar = do
      stopped <- atomically $ swapTVar workersVar M.empty
      forM_ stopped cancelWorker