-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathWorker.hs
More file actions
320 lines (302 loc) · 14.2 KB
/
Worker.hs
File metadata and controls
320 lines (302 loc) · 14.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE MultiWayIf #-}
{-# LANGUAGE StandaloneDeriving #-}
-- | This module provides the background worker for sending notification webhooks.
module Share.BackgroundJobs.Webhooks.Worker (worker) where
import Control.Lens hiding ((.=))
import Control.Monad.Except (ExceptT (..), runExceptT)
import Crypto.JWT (JWTError)
import Data.Aeson (FromJSON (..), ToJSON (..))
import Data.Aeson qualified as Aeson
import Data.ByteString.Lazy.Char8 qualified as BL
import Data.List.Extra qualified as List
import Data.Text qualified as Text
import Data.Text.Encoding qualified as Text
import Data.Time (UTCTime)
import Ki.Unlifted qualified as Ki
import Network.HTTP.Client qualified as HTTPClient
import Network.HTTP.Types qualified as HTTP
import Network.URI (URI)
import Network.URI qualified as URI
import Share.BackgroundJobs.Errors (reportError)
import Share.BackgroundJobs.Monad (Background)
import Share.BackgroundJobs.Webhooks.Queries qualified as WQ
import Share.BackgroundJobs.Workers (newWorker)
import Share.ChatApps (Author (..))
import Share.ChatApps qualified as ChatApps
import Share.Env qualified as Env
import Share.IDs
import Share.IDs qualified as IDs
import Share.JWT (JWTParam (..))
import Share.JWT qualified as JWT
import Share.Metrics qualified as Metrics
import Share.Notifications.Ops qualified as NotOps
import Share.Notifications.Queries qualified as NQ
import Share.Notifications.Types
import Share.Notifications.Webhooks.Secrets (WebhookConfig (..), WebhookSecretError)
import Share.Notifications.Webhooks.Secrets qualified as Webhooks
import Share.Postgres qualified as PG
import Share.Postgres.Notifications qualified as Notif
import Share.Prelude
import Share.Utils.Logging qualified as Logging
import Share.Utils.URI (URIParam (..))
import Share.Web.Authorization qualified as AuthZ
import Share.Web.Share.DisplayInfo.Queries qualified as DisplayInfoQ
import Share.Web.Share.DisplayInfo.Types (UnifiedDisplayInfo)
import Share.Web.Share.DisplayInfo.Types qualified as DisplayInfo
import Share.Web.UI.Links qualified as Links
import UnliftIO qualified
data WebhookSendFailure
= ReceiverError NotificationEventId NotificationWebhookId HTTP.Status BL.ByteString
| InvalidRequest NotificationEventId NotificationWebhookId UnliftIO.SomeException
| WebhookSecretFetchError NotificationEventId NotificationWebhookId WebhookSecretError
| JWTError NotificationEventId NotificationWebhookId JWTError
deriving stock (Show)
instance Logging.Loggable WebhookSendFailure where
toLog = \case
ReceiverError eventId webhookId status body ->
Logging.textLog
( "Webhook receiver error: "
<> Text.pack (show status)
<> " "
<> Text.decodeUtf8 (BL.toStrict body)
)
& Logging.withTag ("status", tShow status)
& Logging.withTag ("event_id", tShow eventId)
& Logging.withTag ("webhook_id", tShow webhookId)
& Logging.withSeverity Logging.UserFault
InvalidRequest eventId webhookId err ->
Logging.textLog ("Invalid request: " <> Text.pack (show err))
& Logging.withTag ("event_id", tShow eventId)
& Logging.withTag ("webhook_id", tShow webhookId)
& Logging.withSeverity Logging.UserFault
WebhookSecretFetchError eventId webhookId err ->
Logging.textLog ("Failed to fetch webhook secret: " <> Text.pack (show err))
& Logging.withTag ("event_id", tShow eventId)
& Logging.withTag ("webhook_id", tShow webhookId)
& Logging.withSeverity Logging.Error
JWTError eventId webhookId err ->
Logging.textLog ("JWT error: " <> Text.pack (show err))
& Logging.withTag ("event_id", tShow eventId)
& Logging.withTag ("webhook_id", tShow webhookId)
& Logging.withSeverity Logging.Error
-- | Check every 10 minutes if we haven't heard on the notifications channel.
-- Just in case we missed a notification.
maxPollingIntervalSeconds :: Int
maxPollingIntervalSeconds = 10 * 60 -- 10 minutes
worker :: Ki.Scope -> Background ()
worker scope = do
authZReceipt <- AuthZ.backgroundJobAuthZ
newWorker scope "notifications:webhooks" $ forever do
Notif.waitOnChannel Notif.WebhooksChannel (maxPollingIntervalSeconds * 1000000)
processWebhooks authZReceipt
where
processWebhooks :: AuthZ.AuthZReceipt -> Background ()
processWebhooks authZReceipt = do
toIO <- UnliftIO.askRunInIO
-- Need to unlift so we can use this in transactions
let tryWebhookIO eventData webhookId = toIO $ tryWebhook eventData webhookId
mayResult <- Metrics.recordWebhookSendingDuration $ PG.runTransactionMode PG.ReadCommitted PG.ReadWrite $ runMaybeT $ do
webhookInfo@(eventId, webhookId) <- MaybeT WQ.getUnsentWebhook
mayErr <- lift $ attemptWebhookSend authZReceipt tryWebhookIO eventId webhookId
pure (mayErr, webhookInfo)
case mayResult of
Just (Just err, _) -> do
case err of
WebhookSecretFetchError {} -> reportError err
_ -> Logging.logErrorText $ "Webhook send errors: " <> tShow err
-- If we got an error, we can retry it.
-- TODO: Add some sort of backoff?
processWebhooks authZReceipt
(Just (Nothing, (eventId, webhookId))) -> do
Logging.textLog ("Webhook sent successfully: " <> Text.pack (show webhookId))
& Logging.withTag ("webhook_id", tShow webhookId)
& Logging.withTag ("event_id", tShow eventId)
& Logging.withSeverity Logging.Info
& Logging.logMsg
-- Keep processing releases until we run out of them.
processWebhooks authZReceipt
Nothing -> do
-- No webhooks ready to try at the moment, we can wait till more are available.
pure ()
--
webhookTimeout :: HTTPClient.ResponseTimeout
webhookTimeout = HTTPClient.responseTimeoutMicro (20 * 1000000 {- 20 seconds -})
data WebhookEventPayload jwt = WebhookEventPayload
{ -- | The event ID of the notification event.
eventId :: NotificationEventId,
-- | The time at which the event occurred.
occurredAt :: UTCTime,
-- | The topic of the notification event.
topic :: NotificationTopic,
-- | The data associated with the notification event.
data_ :: HydratedEvent,
-- | A signed token containing all of the same data.
jwt :: jwt
}
deriving stock (Show, Eq)
deriving via JWT.JSONJWTClaims (WebhookEventPayload ()) instance JWT.AsJWTClaims (WebhookEventPayload ())
instance ToJSON (WebhookEventPayload JWTParam) where
toJSON WebhookEventPayload {eventId, occurredAt, topic, data_, jwt} =
Aeson.object
[ "eventId" Aeson..= eventId,
"occurredAt" Aeson..= occurredAt,
"topic" Aeson..= topic,
"data" Aeson..= data_,
"signed" Aeson..= jwt
]
instance ToJSON (WebhookEventPayload ()) where
toJSON WebhookEventPayload {eventId, occurredAt, topic, data_} =
Aeson.object
[ "eventId" Aeson..= eventId,
"occurredAt" Aeson..= occurredAt,
"topic" Aeson..= topic,
"data" Aeson..= data_
]
instance FromJSON (WebhookEventPayload ()) where
parseJSON = Aeson.withObject "WebhookEventPayload" $ \o ->
WebhookEventPayload
<$> o Aeson..: "eventId"
<*> o Aeson..: "occurredAt"
<*> o Aeson..: "topic"
<*> o Aeson..: "data"
<*> pure ()
tryWebhook ::
NotificationEvent NotificationEventId UnifiedDisplayInfo UTCTime HydratedEvent ->
NotificationWebhookId ->
Background (Maybe WebhookSendFailure)
tryWebhook event webhookId = UnliftIO.handleAny (\someException -> pure $ Just $ InvalidRequest event.eventId webhookId someException) do
fmap (either Just (const Nothing)) $ runExceptT do
proxiedHTTPManager <- asks Env.proxiedHttpClient
WebhookConfig {uri = URIParam uri} <-
lift (Webhooks.fetchWebhookConfig webhookId) >>= \case
Left err -> throwError $ WebhookSecretFetchError event.eventId webhookId err
Right config -> pure config
jwtSettings <- asks Env.jwtSettings
let payload =
WebhookEventPayload
{ eventId = event.eventId,
occurredAt = event.eventOccurredAt,
topic = hydratedEventTopic event.eventData,
data_ = event.eventData,
jwt = ()
}
payloadJWT <-
JWT.signJWT jwtSettings payload >>= \case
Left jwtErr -> throwError $ JWTError event.eventId webhookId jwtErr
Right jwt -> pure jwt
let payloadWithJWT = payload {jwt = JWTParam payloadJWT}
req <- ExceptT $ buildWebhookRequest webhookId uri event payloadWithJWT
resp <- liftIO $ HTTPClient.httpLbs req proxiedHTTPManager
case HTTPClient.responseStatus resp of
httpStatus@(HTTP.Status status _)
| status >= 400 -> throwError $ ReceiverError event.eventId webhookId httpStatus $ HTTPClient.responseBody resp
| otherwise -> pure ()
buildWebhookRequest :: NotificationWebhookId -> URI -> NotificationEvent NotificationEventId UnifiedDisplayInfo UTCTime HydratedEvent -> WebhookEventPayload JWTParam -> Background (Either WebhookSendFailure HTTPClient.Request)
buildWebhookRequest webhookId uri event defaultPayload = do
if
| isSlackWebhook uri -> buildChatAppPayload (Proxy @ChatApps.Slack) uri
| isDiscordWebhook uri -> buildChatAppPayload (Proxy @ChatApps.Discord) uri
| otherwise -> pure $ buildDefaultPayload
where
isSlackWebhook :: URI -> Bool
isSlackWebhook uri =
case URI.uriRegName <$> URI.uriAuthority uri of
Nothing -> False
Just regName -> List.isPrefixOf "hooks.slack.com" regName
isDiscordWebhook :: URI -> Bool
isDiscordWebhook uri =
case (URI.uriRegName <$> URI.uriAuthority uri) of
Just regName ->
Text.isPrefixOf "discord.com" (Text.pack regName)
&& Text.isPrefixOf "/api/webhooks" (Text.pack $ URI.uriPath uri)
_ -> False
buildDefaultPayload :: Either WebhookSendFailure HTTPClient.Request
buildDefaultPayload =
HTTPClient.requestFromURI uri
& mapLeft (\e -> InvalidRequest event.eventId webhookId e)
<&> \req ->
req
{ HTTPClient.method = "POST",
HTTPClient.responseTimeout = webhookTimeout,
HTTPClient.requestHeaders = [(HTTP.hContentType, "application/json")],
HTTPClient.requestBody = HTTPClient.RequestBodyLBS $ Aeson.encode defaultPayload
}
buildChatAppPayload :: forall provider. (ToJSON (ChatApps.MessageContent provider)) => Proxy provider -> URI -> Background (Either WebhookSendFailure HTTPClient.Request)
buildChatAppPayload _ uri = do
let actorName = event.eventActor ^. DisplayInfo.name_
actorHandle = "(" <> IDs.toText (PrefixedID @"@" $ event.eventActor ^. DisplayInfo.handle_) <> ")"
actorAuthor = maybe "" (<> " ") actorName <> actorHandle
actorAvatarUrl = event.eventActor ^. DisplayInfo.avatarUrl_
actorLink <- Links.userProfilePage (event.eventActor ^. DisplayInfo.handle_)
let mainLink = Just event.eventData.hydratedEventLink
messageContent :: ChatApps.MessageContent provider <- case event.eventData.hydratedEventPayload of
HydratedProjectBranchUpdatedPayload payload -> do
let pbShorthand = (projectBranchShortHandFromParts payload.projectInfo.projectShortHand payload.branchInfo.branchShortHand)
title = "Branch " <> IDs.toText pbShorthand <> " was just updated."
preText = title
pure $
ChatApps.MessageContent
{ preText = preText,
content = "Branch updated",
title = title,
mainLink,
author =
Author
{ authorName = Just actorAuthor,
authorLink = Just actorLink,
authorAvatarUrl = actorAvatarUrl
},
thumbnailUrl = Nothing,
timestamp = event.eventOccurredAt
}
HydratedProjectContributionCreatedPayload payload -> do
let pbShorthand = (projectBranchShortHandFromParts payload.projectInfo.projectShortHand payload.contributionInfo.contributionSourceBranch.branchShortHand)
title = payload.contributionInfo.contributionTitle
description = fromMaybe "" $ payload.contributionInfo.contributionDescription
preText = "New Contribution in " <> IDs.toText pbShorthand
pure $
ChatApps.MessageContent
{ preText = preText,
content = description,
title = title,
mainLink,
author =
Author
{ authorName = Just actorAuthor,
authorLink = Just actorLink,
authorAvatarUrl = actorAvatarUrl
},
thumbnailUrl = Nothing,
timestamp = event.eventOccurredAt
}
pure $
HTTPClient.requestFromURI uri
& mapLeft (\e -> InvalidRequest event.eventId webhookId e)
<&> ( \req ->
req
{ HTTPClient.method = "POST",
HTTPClient.responseTimeout = webhookTimeout,
HTTPClient.requestHeaders = [(HTTP.hContentType, "application/json")],
HTTPClient.requestBody = HTTPClient.RequestBodyLBS $ Aeson.encode messageContent
}
)
attemptWebhookSend ::
AuthZ.AuthZReceipt ->
(NotificationEvent NotificationEventId UnifiedDisplayInfo UTCTime HydratedEvent -> NotificationWebhookId -> IO (Maybe WebhookSendFailure)) ->
NotificationEventId ->
NotificationWebhookId ->
PG.Transaction e (Maybe WebhookSendFailure)
attemptWebhookSend _authZReceipt tryWebhookIO eventId webhookId = do
event <- NQ.expectEvent eventId
hydratedEventPayload <- forOf eventData_ event NQ.hydrateEventPayload
hydratedEvent <- for hydratedEventPayload NotOps.hydrateEvent
populatedEvent <- hydratedEvent & DisplayInfoQ.unifiedDisplayInfoForUserOf eventUserInfo_
PG.transactionUnsafeIO (tryWebhookIO populatedEvent webhookId) >>= \case
Just err -> do
WQ.recordFailedDeliveryAttempt eventId webhookId
pure $ Just err
Nothing -> do
WQ.markWebhookAsDelivered eventId webhookId
pure Nothing