Skip to content

Commit 719cd6b

Browse files
authored
Merge pull request #9 from Boanerges1996/feat/cleanup-stale-conferences
feat: add management command to close stale ongoing conferences
2 parents b267fa4 + de092bd commit 719cd6b

4 files changed

Lines changed: 164 additions & 1 deletion

File tree

README.md

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,28 @@ The table below lists all environment variables used by the API. Variables marke
109109
| `GOOGLE_TASK_QUEUE_NAME` | No | - | Name of the Cloud Tasks queue (e.g. `queue-1`). Required when `USE_GOOGLE_TASK_QUEUE` is `True`. |
110110
| `APP_ENGINE_LOCATION` | No | - | App Engine location (e.g. `us-east1`). Required when `USE_GOOGLE_TASK_QUEUE` is `True`. |
111111
| `TASK_QUEUE_DOMAIN` | No | - | Domain for task queue callbacks (e.g. `https://api.example.com/`). Required when `USE_GOOGLE_TASK_QUEUE` is `True`. |
112+
| **Conference Cleanup** | | | |
113+
| `CONFERENCE_TIMEOUT_HOURS` | No | `4` | Close ongoing conferences with no activity for this many hours. |
114+
| `ENABLE_INLINE_CONFERENCE_CLEANUP` | No | `false` | Set to `true` to run cleanup in a background thread (dev/single-process only). |
115+
| `CONFERENCE_CLEANUP_INTERVAL_SECONDS` | No | `3600` | Interval, in seconds, between inline cleanup runs. Set to `0` (or any non-positive value) to disable the inline loop. Only applies when `ENABLE_INLINE_CONFERENCE_CLEANUP` is `true`. |
116+
117+
### Stale Conference Cleanup
118+
119+
Conferences can get stuck as "Ongoing" if the client fails to send the end signal (browser tab closed, network drop). The `cleanup_stale_conferences` management command closes conferences with no activity for a configurable period.
120+
121+
```bash
122+
python manage.py cleanup_stale_conferences # default 4 hour threshold
123+
python manage.py cleanup_stale_conferences --hours 2 # custom threshold
124+
python manage.py cleanup_stale_conferences --dry-run # preview without changes
125+
```
126+
127+
**Development:** Set `ENABLE_INLINE_CONFERENCE_CLEANUP=true` to run the cleanup automatically in a background thread inside the API process.
128+
129+
**Production (multi-worker):** Do not use the inline loop with multiple gunicorn workers: each worker is a separate process and runs its own cleanup loop, so the cleanup would run once per worker. Instead, use an external scheduler:
130+
131+
- **Kubernetes:** CronJob running `python manage.py cleanup_stale_conferences`
132+
- **Docker Compose:** Host cron or a sidecar container on a timer
133+
- **ECS:** Scheduled task via EventBridge
112134

113135
**Example `.env` file for local development:**
114136

app/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
# Points Django at the AppConfig subclass to use for this package.
# NOTE(review): newer Django releases (3.2+) auto-detect a single AppConfig in
# apps.py and deprecate this setting — confirm against the project's Django version.
default_app_config = 'app.apps.UsersConfig'

app/apps.py

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,52 @@
1+
import logging
2+
import os
3+
import threading
4+
import time
5+
16
from django.apps import AppConfig
27

8+
# Module-level logger; the inline cleanup thread uses it to report failures.
logger = logging.getLogger(__name__)
9+
10+
# Flipped to True inside UsersConfig.ready() when the cleanup thread is claimed,
# so repeated ready() calls in the same process start at most one thread.
_cleanup_started = False
11+
# Guards the check-and-set of _cleanup_started in ready().
_cleanup_lock = threading.Lock()
12+
313

414
class UsersConfig(AppConfig):
    """App configuration for the ``app`` package.

    When ``ENABLE_INLINE_CONFERENCE_CLEANUP`` is ``true``, ``ready()`` starts a
    daemon thread that periodically runs the ``cleanup_stale_conferences``
    management command inside the API process.
    """

    name = 'app'

    def ready(self):
        """Optionally start the inline stale-conference cleanup thread.

        The loop is intended for dev/single-process use only. In production
        with multiple gunicorn workers, each worker forks a separate process,
        so the module-level threading lock cannot prevent duplicate loops.
        Use an external scheduler there instead:
            python manage.py cleanup_stale_conferences

        Environment variables read:
            ENABLE_INLINE_CONFERENCE_CLEANUP: must equal ``true``
                (case-insensitive) for the loop to start.
            CONFERENCE_CLEANUP_INTERVAL_SECONDS: seconds between runs
                (default 3600); a non-positive or unparsable value disables
                the loop.
            CONFERENCE_TIMEOUT_HOURS: idle threshold passed to the command on
                each run (default 4); re-read on every iteration.
        """
        global _cleanup_started

        if os.getenv('ENABLE_INLINE_CONFERENCE_CLEANUP', '').lower() != 'true':
            return

        raw_interval = os.getenv('CONFERENCE_CLEANUP_INTERVAL_SECONDS', 3600)
        try:
            interval = int(raw_interval)
        except (TypeError, ValueError):
            # A typo in an optional setting must not abort application startup;
            # treat it as "disabled" and make the misconfiguration visible.
            logger.warning(
                '[ConferenceCleanup] Invalid CONFERENCE_CLEANUP_INTERVAL_SECONDS=%r; '
                'inline cleanup disabled.',
                raw_interval,
            )
            return
        if interval <= 0:
            return

        # Check-and-set under the lock so that repeated ready() calls within
        # this process start at most one cleanup thread.
        with _cleanup_lock:
            if _cleanup_started:
                return
            _cleanup_started = True

        def cleanup_loop():
            """Run the cleanup command forever, sleeping ``interval`` seconds between runs."""
            # Imported lazily so nothing extra is loaded unless the loop is enabled.
            from django.core.management import call_command
            from django.db import close_old_connections

            # Initial delay before the first run.
            time.sleep(60)

            while True:
                try:
                    # This thread lives outside the request cycle, so Django
                    # never recycles its thread-local DB connection for us.
                    close_old_connections()
                    try:
                        hours = int(os.getenv('CONFERENCE_TIMEOUT_HOURS', 4))
                        call_command(
                            'cleanup_stale_conferences',
                            hours=hours,
                            dry_run=False,
                            verbosity=0,
                        )
                    finally:
                        # Do not hold an idle connection across the sleep.
                        close_old_connections()
                except Exception as e:
                    # Best-effort loop: log (with traceback) and try again later.
                    logger.warning('[ConferenceCleanup] Error: %s', e, exc_info=True)

                time.sleep(interval)

        thread = threading.Thread(target=cleanup_loop, daemon=True, name='conference-cleanup')
        thread.start()
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
import datetime
2+
import os
3+
4+
from django.core.management.base import BaseCommand
5+
from django.db import transaction
6+
from django.db.models import Max, Subquery, OuterRef
7+
8+
from app.models.conference import Conference
9+
from app.models.generic_event import GenericEvent
10+
11+
12+
# Fallback idle threshold (hours) for --hours when the CONFERENCE_TIMEOUT_HOURS
# environment variable is not set.
DEFAULT_TIMEOUT_HOURS = 4
13+
14+
15+
class Command(BaseCommand):
    """Close conferences still flagged ``ongoing`` that have had no recent activity.

    A conference's last activity is the most recent of: its latest
    ``GenericEvent.created_at``, its latest connection ``created_at``, and its
    own ``created_at``. Conferences whose last activity is older than
    ``--hours`` are closed (or only listed when ``--dry-run`` is given).
    """

    help = 'Close conferences with no activity for longer than the specified hours'

    def add_arguments(self, parser):
        """Register the ``--hours`` and ``--dry-run`` options."""
        parser.add_argument(
            '--hours',
            type=int,
            default=int(os.getenv('CONFERENCE_TIMEOUT_HOURS', DEFAULT_TIMEOUT_HOURS)),
            help=f'Close conferences with no activity for this many hours (default: {DEFAULT_TIMEOUT_HOURS}, env: CONFERENCE_TIMEOUT_HOURS)',
        )
        parser.add_argument(
            '--dry-run',
            action='store_true',
            help='Show what would be closed without making changes',
        )

    @staticmethod
    def _to_naive_utc(value):
        """Return ``value`` as a naive UTC datetime; naive inputs pass through unchanged."""
        if value is None or value.utcoffset() is None:
            return value
        return value.astimezone(datetime.timezone.utc).replace(tzinfo=None)

    @classmethod
    def _last_activity(cls, conference):
        """Return the most recent known activity timestamp, or None if there is none."""
        timestamps = (
            conference.last_event_at,
            conference.last_connection_at,
            conference.created_at,
        )
        known = [cls._to_naive_utc(ts) for ts in timestamps if ts is not None]
        # Take the latest timestamp, not the first non-null one: a conference
        # with an old event but a fresh connection is still active.
        return max(known) if known else None

    @staticmethod
    def _close_conference(conference, now):
        """End open connections and sessions, then the conference, in one transaction."""
        with transaction.atomic():
            for connection in conference.connections.filter(end_time__isnull=True):
                connection.end(now)
                connection.save()

            for session in conference.sessions.filter(end_time__isnull=True):
                session.should_stop_call(now)
                session.save()

            conference.should_stop_call(now)
            conference.save()

    def handle(self, *args, **options):
        """Find stale ongoing conferences and close them.

        Options:
            hours: idle threshold in hours.
            dry_run: when true, only report what would be closed.

        Errors while closing one conference are written to stderr and counted;
        the remaining conferences are still processed.
        """
        hours = options['hours']
        dry_run = options['dry_run']

        # Read the clock once so the cutoff and the reported idle time agree.
        # Naive UTC, same value the deprecated utcnow() produced.
        # NOTE(review): `now` is passed to the model methods as a naive UTC
        # datetime, as before — confirm this matches the project's USE_TZ setting.
        now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
        cutoff = now - datetime.timedelta(hours=hours)

        # Single annotated query to get last activity per conference (avoids N+1).
        # Materialized once so emptiness, iteration and the count share one query.
        ongoing = list(
            Conference.objects.filter(ongoing=True).annotate(
                last_event_at=Subquery(
                    GenericEvent.objects.filter(conference=OuterRef('pk'))
                    .order_by('-created_at')
                    .values('created_at')[:1]
                ),
                last_connection_at=Max('connections__created_at'),
            )
        )

        if not ongoing:
            self.stdout.write('No ongoing conferences found.')
            return

        stale = []
        for conference in ongoing:
            last_activity = self._last_activity(conference)
            if last_activity is None:
                # No timestamp at all: staleness cannot be judged, so leave it alone.
                self.stderr.write(f'  Skipping {conference.id}: no activity timestamp available')
                continue

            if last_activity < cutoff:
                stale.append((conference, last_activity, now - last_activity))

        if not stale:
            self.stdout.write(f'{len(ongoing)} ongoing conferences found, all have recent activity.')
            return

        self.stdout.write(f'Found {len(stale)} conferences with no activity for more than {hours} hours.')

        closed = 0
        failed = 0

        for conference, last_activity, idle in stale:
            self.stdout.write(f'  {conference.id} ({conference.conference_name or conference.conference_id}) - last activity {idle} ago')

            if dry_run:
                continue

            try:
                self._close_conference(conference, now)
                closed += 1
            except Exception as e:
                # Best-effort: one failing conference must not block the rest.
                failed += 1
                self.stderr.write(f'  Error closing {conference.id}: {e}')

        if dry_run:
            self.stdout.write('\nDry run - no changes made. Run without --dry-run to close these conferences.')
        else:
            self.stdout.write(f'\nClosed {closed} stale conferences.' + (f' {failed} failed.' if failed else ''))

0 commit comments

Comments
 (0)