Skip to content

Commit 525b82e

Browse files
committed
WPB-24076: Add meeting cleaner job in background-worker
1 parent 0b26efe commit 525b82e

20 files changed

Lines changed: 491 additions & 3 deletions

File tree

changelog.d/5-internal/WPB-24076

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Add meeting cleaner job in `background-worker`.

charts/wire-server/templates/background-worker/configmap.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,10 @@ data:
9090
{{toYaml .backendNotificationPusher | indent 6 }}
9191
{{- with .backgroundJobs }}
9292
backgroundJobs:
93+
{{ toYaml . | indent 6 }}
94+
{{- end }}
95+
{{- with .meetingsCleanup }}
96+
meetingsCleanup:
9397
{{ toYaml . | indent 6 }}
9498
{{- end }}
9599
{{- if .postgresMigration }}

charts/wire-server/values.yaml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -977,6 +977,15 @@ background-worker:
977977
# Total attempts, including the first try
978978
maxAttempts: 3
979979

980+
# Meetings cleanup configuration
981+
meetingsCleanup:
982+
# Delete meetings older than this many hours (48 hours = 2 days)
983+
cleanOlderThanHours: 48.0
984+
# Maximum number of meetings to delete per batch
985+
batchSize: 1000
986+
# Cron schedule for the cleanup job (0 * * * * = every hour)
987+
schedule: "0 * * * *"
988+
980989
# Controls where conversation data is stored/accessed
981990
postgresMigration:
982991
conversation: cassandra

integration/test/Test/Meetings.hs

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,16 @@ module Test.Meetings where
44

55
import API.Galley
66
import qualified API.GalleyInternal as I
7+
import Control.Monad.Reader (ask)
8+
import qualified Data.Text as Text
9+
import qualified Data.Text.Encoding as Text
710
import Data.Time.Clock
811
import qualified Data.Time.Format as Time
912
import SetupHelpers
13+
import System.Timeout (timeout)
1014
import Testlib.Prelude
15+
import Text.Regex.TDFA ((=~))
16+
import UnliftIO.Concurrent (threadDelay)
1117

1218
-- Helper to extract meetingId and domain from a meeting JSON object
1319
getMeetingIdAndDomain :: (HasCallStack) => Value -> App (String, String)
@@ -359,3 +365,93 @@ testMeetingDeleteUnauthorized = do
359365
meeting <- getJSON 201 r1
360366
(meetingId, domain) <- getMeetingIdAndDomain meeting
361367
deleteMeeting otherUser domain meetingId >>= assertStatus 404
368+
369+
testMeetingCleanup :: (HasCallStack) => App ()
370+
testMeetingCleanup = do
371+
env <- ask
372+
timedOutResult <- liftIO $ timeout (2 * 60 * 1_000_000) $ runAppWithEnv env $ do
373+
-- 2 minutes timeout
374+
(owner, _tid, _members) <- createTeam OwnDomain 1
375+
now <- liftIO getCurrentTime
376+
-- Create a meeting that ends now.
377+
-- Configured retention is 0.0014 hours (~5 seconds).
378+
-- cutoffTime will be now' - 5s.
379+
-- We need end_date < cutoffTime.
380+
-- If we wait 6 seconds, now' = now + 6s.
381+
-- cutoffTime = now + 6s - 5s = now + 1s.
382+
-- end_date (now) < cutoffTime (now + 1s).
383+
let startTime = addUTCTime (negate 3600) now
384+
endTime = now
385+
newMeeting = defaultMeetingJson "Cleanup Test" startTime endTime []
386+
387+
r1 <- postMeetings owner newMeeting
388+
assertSuccess r1
389+
meeting <- getJSON 201 r1
390+
(meetingId, domain) <- getMeetingIdAndDomain meeting
391+
392+
-- Wait 6 seconds to ensure meeting is old enough
393+
liftIO $ threadDelay 6_000_000
394+
395+
-- Wait for cleanup job to run
396+
waitForCleanupJob OwnDomain
397+
398+
-- Check it's gone
399+
getMeeting owner domain meetingId >>= assertStatus 404
400+
401+
case timedOutResult of
402+
Just () -> pure ()
403+
Nothing -> assertFailure "testMeetingCleanup timed out after 2 minutes"
404+
405+
waitForCleanupJob :: (HasCallStack, MakesValue domain) => domain -> App ()
406+
waitForCleanupJob domain = do
407+
initialMetrics <- getMetricsBody domain
408+
let initialCount = getRunCount initialMetrics
409+
410+
waitForIncrease domain initialCount
411+
where
412+
getMetricsBody d = do
413+
getMetrics d BackgroundWorker `bindResponse` \resp -> do
414+
resp.status `shouldMatchInt` 200
415+
pure $ Text.unpack $ Text.decodeUtf8 resp.body
416+
417+
getRunCount metrics =
418+
let (_, _, _, matches) :: (String, String, String, [String]) = (metrics =~ "wire_meetings_cleanup_runs_total ([0-9]+)")
419+
in case matches of
420+
[val] -> read val :: Int
421+
_ -> 0
422+
423+
waitForIncrease d oldVal = do
424+
metrics <- getMetricsBody d
425+
let newVal = getRunCount metrics
426+
-- We wait until it increases.
427+
-- Note: if oldVal was 0 (metric didn't exist), getting 0 again means it hasn't run.
428+
-- If it runs, it should become >= 1.
429+
-- But wait, if matches is empty, we return 0.
430+
-- If the metric appears, it will be >= 1 (initialized at 0? Counter starts at 0).
431+
-- If it runs, it increments.
432+
when (newVal <= oldVal) $ do
433+
liftIO $ threadDelay 1_000_000 -- Wait 1s
434+
waitForIncrease d oldVal
435+
436+
testMeetingExpiration :: (HasCallStack) => App ()
437+
testMeetingExpiration = do
438+
(owner, _tid, _members) <- createTeam OwnDomain 1
439+
now <- liftIO getCurrentTime
440+
let startTime = addUTCTime (negate 3600) now
441+
-- meetingValidityPeriodSeconds is configured to 5 seconds in galley.integration.yaml
442+
endTime = now
443+
newMeeting = defaultMeetingJson "Expiring Meeting" startTime endTime []
444+
445+
r1 <- postMeetings owner newMeeting
446+
assertSuccess r1
447+
meeting <- getJSON 201 r1
448+
(meetingId, domain) <- getMeetingIdAndDomain meeting
449+
450+
-- Check it is accessible immediately (endDate = now, so valid until now + 5s)
451+
getMeeting owner domain meetingId >>= assertStatus 200
452+
453+
-- Wait 6 seconds
454+
liftIO $ threadDelay 6_000_000
455+
456+
-- Check it is expired
457+
getMeeting owner domain meetingId >>= assertStatus 404

libs/wire-subsystems/src/Wire/MeetingsStore.hs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,5 +167,13 @@ data MeetingsStore m a where
167167
MeetingId ->
168168
[EmailAddress] ->
169169
MeetingsStore m ()
170+
-- Cleanup operations
171+
GetOldMeetings ::
172+
UTCTime ->
173+
Int ->
174+
MeetingsStore m [StoredMeeting]
175+
DeleteMeetingBatch ::
176+
[MeetingId] ->
177+
MeetingsStore m Int64
170178

171179
makeSem ''MeetingsStore

libs/wire-subsystems/src/Wire/MeetingsStore/Postgres.hs

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,10 @@ interpretMeetingsStoreToPostgres =
6565
addInvitedEmailsImpl meetingId email
6666
RemoveInvitedEmails meetingId emails ->
6767
removeInvitedEmailsImpl meetingId emails
68+
GetOldMeetings cutoffTime batchSize ->
69+
getOldMeetingsImpl cutoffTime batchSize
70+
DeleteMeetingBatch meetingIds ->
71+
deleteMeetingBatchImpl meetingIds
6872

6973
-- * Create
7074

@@ -395,3 +399,56 @@ removeInvitedEmailsImpl meetingId emails = do
395399
updated_at = NOW()
396400
WHERE id = ($2 :: uuid)
397401
|]
402+
403+
getOldMeetingsImpl ::
404+
( Member (Input Pool) r,
405+
Member (Embed IO) r,
406+
Member (Error UsageError) r
407+
) =>
408+
UTCTime ->
409+
Int ->
410+
Sem r [StoredMeeting]
411+
getOldMeetingsImpl cutoffTime batchSize = do
412+
pool <- input
413+
result <- liftIO $ use pool session
414+
either throw pure result
415+
where
416+
session :: Session [StoredMeeting]
417+
session = statement (cutoffTime, fromIntegral batchSize) $ V.toList <$> listStatement
418+
listStatement :: Statement (UTCTime, Int32) (V.Vector StoredMeeting)
419+
listStatement =
420+
refineResult
421+
(traverse (postgresUnmarshall @StoredMeetingTuple @StoredMeeting))
422+
$ [vectorStatement|
423+
SELECT
424+
id :: uuid, title :: text, creator :: uuid,
425+
start_time :: timestamptz, end_time :: timestamptz,
426+
recurrence_frequency :: text?, recurrence_interval :: int4?, recurrence_until :: timestamptz?,
427+
conversation_id :: uuid, invited_emails :: text[], trial :: boolean,
428+
created_at :: timestamptz, updated_at :: timestamptz
429+
FROM meetings
430+
WHERE end_time < ($1 :: timestamptz)
431+
ORDER BY end_time ASC
432+
LIMIT ($2 :: int4)
433+
|]
434+
435+
deleteMeetingBatchImpl ::
436+
( Member (Input Pool) r,
437+
Member (Embed IO) r,
438+
Member (Error UsageError) r
439+
) =>
440+
[MeetingId] ->
441+
Sem r Int64
442+
deleteMeetingBatchImpl meetingIds = do
443+
pool <- input
444+
result <- liftIO $ use pool session
445+
either throw pure result
446+
where
447+
session :: Session Int64
448+
session = statement (V.fromList (toUUID <$> meetingIds)) deleteStatement
449+
deleteStatement :: Statement (V.Vector UUID) Int64
450+
deleteStatement =
451+
[rowsAffectedStatement|
452+
DELETE FROM meetings
453+
WHERE id IN (SELECT unnest($1::uuid[]))
454+
|]
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
{-# LANGUAGE TemplateHaskell #-}
2+
3+
-- This file is part of the Wire Server implementation.
4+
--
5+
-- Copyright (C) 2026 Wire Swiss GmbH <opensource@wire.com>
6+
--
7+
-- This program is free software: you can redistribute it and/or modify it under
8+
-- the terms of the GNU Affero General Public License as published by the Free
9+
-- Software Foundation, either version 3 of the License, or (at your option) any
10+
-- later version.
11+
--
12+
-- This program is distributed in the hope that it will be useful, but WITHOUT
13+
-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
14+
-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
15+
-- details.
16+
--
17+
-- You should have received a copy of the GNU Affero General Public License along
18+
-- with this program. If not, see <https://www.gnu.org/licenses/>.
19+
20+
module Wire.MeetingsSubsystemCleaning where
21+
22+
import Data.Time.Clock (UTCTime)
23+
import Imports
24+
import Polysemy
25+
26+
data MeetingsSubsystemCleaning m a where
27+
CleanupOldMeetings ::
28+
UTCTime ->
29+
Int ->
30+
MeetingsSubsystemCleaning m Int64
31+
32+
makeSem ''MeetingsSubsystemCleaning
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
{-# LANGUAGE DuplicateRecordFields #-}
2+
3+
-- This file is part of the Wire Server implementation.
4+
--
5+
-- Copyright (C) 2026 Wire Swiss GmbH <opensource@wire.com>
6+
--
7+
-- This program is free software: you can redistribute it and/or modify it under
8+
-- the terms of the GNU Affero General Public License as published by the Free
9+
-- Software Foundation, either version 3 of the License, or (at your option) any
10+
-- later version.
11+
--
12+
-- This program is distributed in the hope that it will be useful, but WITHOUT
13+
-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
14+
-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
15+
-- details.
16+
--
17+
-- You should have received a copy of the GNU Affero General Public License along
18+
-- with this program. If not, see <https://www.gnu.org/licenses/>.
19+
20+
module Wire.MeetingsSubsystemCleaning.Interpreter where
21+
22+
import Data.Time.Clock (UTCTime)
23+
import Imports
24+
import Polysemy
25+
import Wire.API.Conversation (GroupConvType (MeetingConversation), cnvmGroupConvType)
26+
import Wire.ConversationStore qualified as ConvStore
27+
import Wire.MeetingsStore qualified as Store
28+
import Wire.MeetingsSubsystemCleaning
29+
import Wire.StoredConversation (StoredConversation (..))
30+
31+
interpretMeetingsSubsystemCleaning ::
32+
( Member Store.MeetingsStore r,
33+
Member ConvStore.ConversationStore r
34+
) =>
35+
InterpreterFor MeetingsSubsystemCleaning r
36+
interpretMeetingsSubsystemCleaning = interpret $ \case
37+
CleanupOldMeetings cutoffTime batchSize ->
38+
cleanupOldMeetingsImpl cutoffTime batchSize
39+
40+
cleanupOldMeetingsImpl ::
41+
( Member Store.MeetingsStore r,
42+
Member ConvStore.ConversationStore r
43+
) =>
44+
UTCTime ->
45+
Int ->
46+
Sem r Int64
47+
cleanupOldMeetingsImpl cutoffTime batchSize = do
48+
-- 1. Fetch old meetings
49+
oldMeetings <- Store.getOldMeetings cutoffTime batchSize
50+
51+
if null oldMeetings
52+
then pure 0
53+
else do
54+
-- 2. Extract meeting IDs and conversation IDs
55+
let meetingIds = map (\Store.StoredMeeting {id = mid} -> mid) oldMeetings
56+
convIds = map (\Store.StoredMeeting {conversationId = cid} -> cid) oldMeetings
57+
58+
-- 3. Delete meetings from database
59+
deletedCount <- Store.deleteMeetingBatch meetingIds
60+
61+
-- 4. Delete associated conversations if they are meeting conversations
62+
-- We need to check if conversation has GroupConvType = MeetingConversation
63+
for_ (zip oldMeetings convIds) $ \(meeting, convId) -> do
64+
maybeConv <- ConvStore.getConversation convId
65+
case maybeConv of
66+
Just conv
67+
| conv.metadata.cnvmGroupConvType == Just MeetingConversation,
68+
conv.id_ == convId,
69+
meeting.conversationId == convId ->
70+
ConvStore.deleteConversation convId
71+
_ -> pure ()
72+
73+
pure deletedCount

libs/wire-subsystems/test/unit/Wire/MockInterpreters/MeetingsStore.hs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,3 +103,15 @@ inMemoryMeetingsStoreInterpreter = interpret $ \case
103103
}
104104
modify (Map.insert mid updatedMeeting)
105105
DeleteMeeting mid -> modify (Map.delete mid)
106+
GetOldMeetings cutoffTime batchSize ->
107+
gets $
108+
take batchSize
109+
. List.sortOn (.endTime)
110+
. filter (\sm -> sm.endTime < cutoffTime)
111+
. Map.elems
112+
DeleteMeetingBatch meetingIds -> do
113+
let deleteOne mid = do
114+
exists <- gets (Map.member mid)
115+
when exists $ modify (Map.delete mid)
116+
pure exists
117+
fromIntegral . length <$> traverse deleteOne meetingIds

libs/wire-subsystems/wire-subsystems.cabal

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -367,6 +367,8 @@ library
367367
Wire.MeetingsStore.Postgres
368368
Wire.MeetingsSubsystem
369369
Wire.MeetingsSubsystem.Interpreter
370+
Wire.MeetingsSubsystemCleaning
371+
Wire.MeetingsSubsystemCleaning.Interpreter
370372
Wire.Migration
371373
Wire.MigrationLock
372374
Wire.NotificationSubsystem

0 commit comments

Comments
 (0)