Skip to content

Commit 1af1c4f

Browse files
committed
WPB-24076: Add meeting cleaner job in background-worker
1 parent a2aa8c4 commit 1af1c4f

17 files changed

Lines changed: 480 additions & 3 deletions

File tree

charts/wire-server/templates/background-worker/configmap.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,10 @@ data:
9090
{{toYaml .backendNotificationPusher | indent 6 }}
9191
{{- with .backgroundJobs }}
9292
backgroundJobs:
93+
{{ toYaml . | indent 6 }}
94+
{{- end }}
95+
{{- with .meetingsCleanup }}
96+
meetingsCleanup:
9397
{{ toYaml . | indent 6 }}
9498
{{- end }}
9599
{{- if .postgresMigration }}

integration/test/Test/Meetings.hs

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,16 @@ module Test.Meetings where
44

55
import API.Galley
66
import qualified API.GalleyInternal as I
7+
import Control.Monad.Reader (ask)
8+
import qualified Data.Text as Text
9+
import qualified Data.Text.Encoding as Text
710
import Data.Time.Clock
811
import qualified Data.Time.Format as Time
912
import SetupHelpers
13+
import System.Timeout (timeout)
1014
import Testlib.Prelude
15+
import Text.Regex.TDFA ((=~))
16+
import UnliftIO.Concurrent (threadDelay)
1117

1218
-- Helper to extract meetingId and domain from a meeting JSON object
1319
getMeetingIdAndDomain :: (HasCallStack) => Value -> App (String, String)
@@ -311,3 +317,93 @@ testMeetingRemoveInvitationNotFound = do
311317
let removeInvitation = object ["emails" .= ["alice@example.com"]]
312318

313319
deleteMeetingInvitation owner "example.com" fakeMeetingId removeInvitation >>= assertStatus 404
320+
321+
testMeetingCleanup :: (HasCallStack) => App ()
322+
testMeetingCleanup = do
323+
env <- ask
324+
timedOutResult <- liftIO $ timeout (2 * 60 * 1_000_000) $ runAppWithEnv env $ do
325+
-- 2 minutes timeout
326+
(owner, _tid, _members) <- createTeam OwnDomain 1
327+
now <- liftIO getCurrentTime
328+
-- Create a meeting that ends now.
329+
-- Configured retention is 0.0014 hours (~5 seconds).
330+
-- cutoffTime will be now' - 5s.
331+
-- We need end_date < cutoffTime.
332+
-- If we wait 6 seconds, now' = now + 6s.
333+
-- cutoffTime = now + 6s - 5s = now + 1s.
334+
-- end_date (now) < cutoffTime (now + 1s).
335+
let startTime = addUTCTime (negate 3600) now
336+
endTime = now
337+
newMeeting = defaultMeetingJson "Cleanup Test" startTime endTime []
338+
339+
r1 <- postMeetings owner newMeeting
340+
assertSuccess r1
341+
meeting <- getJSON 201 r1
342+
(meetingId, domain) <- getMeetingIdAndDomain meeting
343+
344+
-- Wait 6 seconds to ensure meeting is old enough
345+
liftIO $ threadDelay 6_000_000
346+
347+
-- Wait for cleanup job to run
348+
waitForCleanupJob OwnDomain
349+
350+
-- Check it's gone
351+
getMeeting owner domain meetingId >>= assertStatus 404
352+
353+
case timedOutResult of
354+
Just () -> pure ()
355+
Nothing -> assertFailure "testMeetingCleanup timed out after 2 minutes"
356+
357+
waitForCleanupJob :: (HasCallStack, MakesValue domain) => domain -> App ()
358+
waitForCleanupJob domain = do
359+
initialMetrics <- getMetricsBody domain
360+
let initialCount = getRunCount initialMetrics
361+
362+
waitForIncrease domain initialCount
363+
where
364+
getMetricsBody d = do
365+
getMetrics d BackgroundWorker `bindResponse` \resp -> do
366+
resp.status `shouldMatchInt` 200
367+
pure $ Text.unpack $ Text.decodeUtf8 resp.body
368+
369+
getRunCount metrics =
370+
let (_, _, _, matches) :: (String, String, String, [String]) = (metrics =~ "wire_meetings_cleanup_runs_total ([0-9]+)")
371+
in case matches of
372+
[val] -> read val :: Int
373+
_ -> 0
374+
375+
waitForIncrease d oldVal = do
376+
metrics <- getMetricsBody d
377+
let newVal = getRunCount metrics
378+
-- We wait until it increases.
379+
-- Note: if oldVal was 0 (metric didn't exist), getting 0 again means it hasn't run.
380+
-- If it runs, it should become >= 1.
381+
-- But wait, if matches is empty, we return 0.
382+
-- If the metric appears, it will be >= 1 (initialized at 0? Counter starts at 0).
383+
-- If it runs, it increments.
384+
when (newVal <= oldVal) $ do
385+
liftIO $ threadDelay 1_000_000 -- Wait 1s
386+
waitForIncrease d oldVal
387+
388+
testMeetingExpiration :: (HasCallStack) => App ()
389+
testMeetingExpiration = do
390+
(owner, _tid, _members) <- createTeam OwnDomain 1
391+
now <- liftIO getCurrentTime
392+
let startTime = addUTCTime (negate 3600) now
393+
-- meetingValidityPeriodSeconds is configured to 5 seconds in galley.integration.yaml
394+
endTime = now
395+
newMeeting = defaultMeetingJson "Expiring Meeting" startTime endTime []
396+
397+
r1 <- postMeetings owner newMeeting
398+
assertSuccess r1
399+
meeting <- getJSON 201 r1
400+
(meetingId, domain) <- getMeetingIdAndDomain meeting
401+
402+
-- Check it is accessible immediately (endDate = now, so valid until now + 5s)
403+
getMeeting owner domain meetingId >>= assertStatus 200
404+
405+
-- Wait 6 seconds
406+
liftIO $ threadDelay 6_000_000
407+
408+
-- Check it is expired
409+
getMeeting owner domain meetingId >>= assertStatus 404

libs/wire-subsystems/src/Wire/MeetingsStore.hs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,5 +164,13 @@ data MeetingsStore m a where
164164
MeetingId ->
165165
[EmailAddress] ->
166166
MeetingsStore m ()
167+
-- Cleanup operations
168+
GetOldMeetings ::
169+
UTCTime ->
170+
Int ->
171+
MeetingsStore m [StoredMeeting]
172+
DeleteMeetingBatch ::
173+
[MeetingId] ->
174+
MeetingsStore m Int64
167175

168176
makeSem ''MeetingsStore

libs/wire-subsystems/src/Wire/MeetingsStore/Postgres.hs

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ import Hasql.Statement
3636
import Hasql.TH
3737
import Imports
3838
import Polysemy
39-
import Polysemy.Error (throw)
39+
import Polysemy.Error (Error, throw)
4040
import Polysemy.Input
4141
import Wire.API.Meeting (Recurrence)
4242
import Wire.API.PostgresMarshall (PostgresMarshall (..), PostgresUnmarshall (..), dimapPG)
@@ -63,6 +63,10 @@ interpretMeetingsStoreToPostgres =
6363
addInvitedEmailsImpl meetingId email
6464
RemoveInvitedEmails meetingId emails ->
6565
removeInvitedEmailsImpl meetingId emails
66+
GetOldMeetings cutoffTime batchSize ->
67+
getOldMeetingsImpl cutoffTime batchSize
68+
DeleteMeetingBatch meetingIds ->
69+
deleteMeetingBatchImpl meetingIds
6670

6771
-- * Create
6872

@@ -370,3 +374,56 @@ removeInvitedEmailsImpl meetingId emails = do
370374
updated_at = NOW()
371375
WHERE id = ($2 :: uuid)
372376
|]
377+
378+
getOldMeetingsImpl ::
379+
( Member (Input Pool) r,
380+
Member (Embed IO) r,
381+
Member (Error UsageError) r
382+
) =>
383+
UTCTime ->
384+
Int ->
385+
Sem r [StoredMeeting]
386+
getOldMeetingsImpl cutoffTime batchSize = do
387+
pool <- input
388+
result <- liftIO $ use pool session
389+
either throw pure result
390+
where
391+
session :: Session [StoredMeeting]
392+
session = statement (cutoffTime, fromIntegral batchSize) $ V.toList <$> listStatement
393+
listStatement :: Statement (UTCTime, Int32) (V.Vector StoredMeeting)
394+
listStatement =
395+
refineResult
396+
(traverse (postgresUnmarshall @StoredMeetingTuple @StoredMeeting))
397+
$ [vectorStatement|
398+
SELECT
399+
id :: uuid, title :: text, creator :: uuid,
400+
start_time :: timestamptz, end_time :: timestamptz,
401+
recurrence_frequency :: text?, recurrence_interval :: int4?, recurrence_until :: timestamptz?,
402+
conversation_id :: uuid, invited_emails :: text[], trial :: boolean,
403+
created_at :: timestamptz, updated_at :: timestamptz
404+
FROM meetings
405+
WHERE end_date < ($1 :: timestamptz)
406+
ORDER BY end_date ASC
407+
LIMIT ($2 :: int4)
408+
|]
409+
410+
deleteMeetingBatchImpl ::
411+
( Member (Input Pool) r,
412+
Member (Embed IO) r,
413+
Member (Error UsageError) r
414+
) =>
415+
[MeetingId] ->
416+
Sem r Int64
417+
deleteMeetingBatchImpl meetingIds = do
418+
pool <- input
419+
result <- liftIO $ use pool session
420+
either throw pure result
421+
where
422+
session :: Session Int64
423+
session = statement (V.fromList (toUUID <$> meetingIds)) deleteStatement
424+
deleteStatement :: Statement (V.Vector UUID) Int64
425+
deleteStatement =
426+
[rowsAffectedStatement|
427+
DELETE FROM meetings
428+
WHERE (id, domain) IN (SELECT * FROM unnest($1::uuid[]))
429+
|]
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
{-# LANGUAGE TemplateHaskell #-}
2+
3+
-- This file is part of the Wire Server implementation.
4+
--
5+
-- Copyright (C) 2026 Wire Swiss GmbH <opensource@wire.com>
6+
--
7+
-- This program is free software: you can redistribute it and/or modify it under
8+
-- the terms of the GNU Affero General Public License as published by the Free
9+
-- Software Foundation, either version 3 of the License, or (at your option) any
10+
-- later version.
11+
--
12+
-- This program is distributed in the hope that it will be useful, but WITHOUT
13+
-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
14+
-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
15+
-- details.
16+
--
17+
-- You should have received a copy of the GNU Affero General Public License along
18+
-- with this program. If not, see <https://www.gnu.org/licenses/>.
19+
20+
module Wire.MeetingsSubsystemCleaning where
21+
22+
import Data.Time.Clock (UTCTime)
23+
import Imports
24+
import Polysemy
25+
26+
data MeetingsSubsystemCleaning m a where
27+
CleanupOldMeetings ::
28+
UTCTime ->
29+
Int ->
30+
MeetingsSubsystemCleaning m Int64
31+
32+
makeSem ''MeetingsSubsystemCleaning
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
{-# LANGUAGE DuplicateRecordFields #-}
2+
3+
-- This file is part of the Wire Server implementation.
4+
--
5+
-- Copyright (C) 2026 Wire Swiss GmbH <opensource@wire.com>
6+
--
7+
-- This program is free software: you can redistribute it and/or modify it under
8+
-- the terms of the GNU Affero General Public License as published by the Free
9+
-- Software Foundation, either version 3 of the License, or (at your option) any
10+
-- later version.
11+
--
12+
-- This program is distributed in the hope that it will be useful, but WITHOUT
13+
-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
14+
-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
15+
-- details.
16+
--
17+
-- You should have received a copy of the GNU Affero General Public License along
18+
-- with this program. If not, see <https://www.gnu.org/licenses/>.
19+
20+
module Wire.MeetingsSubsystemCleaning.Interpreter where
21+
22+
import Data.Time.Clock (UTCTime)
23+
import Imports
24+
import Polysemy
25+
import Wire.API.Conversation (GroupConvType (MeetingConversation), cnvmGroupConvType)
26+
import Wire.ConversationStore qualified as ConvStore
27+
import Wire.MeetingsStore qualified as Store
28+
import Wire.MeetingsSubsystemCleaning
29+
import Wire.StoredConversation (StoredConversation (..))
30+
31+
interpretMeetingsSubsystemCleaning ::
32+
( Member Store.MeetingsStore r,
33+
Member ConvStore.ConversationStore r
34+
) =>
35+
InterpreterFor MeetingsSubsystemCleaning r
36+
interpretMeetingsSubsystemCleaning = interpret $ \case
37+
CleanupOldMeetings cutoffTime batchSize ->
38+
cleanupOldMeetingsImpl cutoffTime batchSize
39+
40+
cleanupOldMeetingsImpl ::
41+
( Member Store.MeetingsStore r,
42+
Member ConvStore.ConversationStore r
43+
) =>
44+
UTCTime ->
45+
Int ->
46+
Sem r Int64
47+
cleanupOldMeetingsImpl cutoffTime batchSize = do
48+
-- 1. Fetch old meetings
49+
oldMeetings <- Store.getOldMeetings cutoffTime batchSize
50+
51+
if null oldMeetings
52+
then pure 0
53+
else do
54+
-- 2. Extract meeting IDs and conversation IDs
55+
let meetingIds = map (\Store.StoredMeeting {id = mid} -> mid) oldMeetings
56+
convIds = map (\Store.StoredMeeting {conversationId = cid} -> cid) oldMeetings
57+
58+
-- 3. Delete meetings from database
59+
deletedCount <- Store.deleteMeetingBatch meetingIds
60+
61+
-- 4. Delete associated conversations if they are meeting conversations
62+
-- We need to check if conversation has GroupConvType = MeetingConversation
63+
for_ (zip oldMeetings convIds) $ \(meeting, convId) -> do
64+
maybeConv <- ConvStore.getConversation convId
65+
case maybeConv of
66+
Just conv
67+
| conv.metadata.cnvmGroupConvType == Just MeetingConversation,
68+
conv.id_ == convId,
69+
meeting.conversationId == convId ->
70+
ConvStore.deleteConversation convId
71+
_ -> pure ()
72+
73+
pure deletedCount

libs/wire-subsystems/test/unit/Wire/MockInterpreters/MeetingsStore.hs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,3 +102,15 @@ inMemoryMeetingsStoreInterpreter = interpret $ \case
102102
updatedAt = now
103103
}
104104
modify (Map.insert mid updatedMeeting)
105+
GetOldMeetings cutoffTime batchSize ->
106+
gets $
107+
take batchSize
108+
. List.sortOn (.endTime)
109+
. filter (\sm -> sm.endTime < cutoffTime)
110+
. Map.elems
111+
DeleteMeetingBatch meetingIds -> do
112+
let deleteOne mid = do
113+
exists <- gets (Map.member mid)
114+
when exists $ modify (Map.delete mid)
115+
pure exists
116+
fromIntegral . length <$> traverse deleteOne meetingIds

libs/wire-subsystems/wire-subsystems.cabal

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -367,6 +367,8 @@ library
367367
Wire.MeetingsStore.Postgres
368368
Wire.MeetingsSubsystem
369369
Wire.MeetingsSubsystem.Interpreter
370+
Wire.MeetingsSubsystemCleaning
371+
Wire.MeetingsSubsystemCleaning.Interpreter
370372
Wire.Migration
371373
Wire.MigrationLock
372374
Wire.NotificationSubsystem

services/background-worker/background-worker.cabal

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ library
2121
Wire.BackgroundWorker.Options
2222
Wire.BackgroundWorker.Util
2323
Wire.DeadUserNotificationWatcher
24+
Wire.MeetingsCleanupWorker
2425
Wire.PostgresMigrations
2526

2627
hs-source-dirs: src
@@ -39,6 +40,7 @@ library
3940
, bytestring-conversion
4041
, cassandra-util
4142
, containers
43+
, cron
4244
, data-timeout
4345
, exceptions
4446
, extended
@@ -61,6 +63,7 @@ library
6163
, ssl-util
6264
, tagged
6365
, text
66+
, time
6467
, tinylog
6568
, transformers
6669
, transformers-base

services/background-worker/background-worker.integration.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,12 @@ backgroundJobs:
6464
jobTimeout: 5s
6565
maxAttempts: 3
6666

67+
# Meetings cleanup configuration for integration
68+
meetingsCleanup:
69+
cleanOlderThanHours: 0.0014 # Clean meetings older than ~5 seconds
70+
batchSize: 100
71+
schedule: "* * * * *" # Run every minute
72+
6773
postgresMigration:
6874
conversation: postgresql
6975
conversationCodes: postgresql

0 commit comments

Comments
 (0)