Skip to content

Commit 2089fea

Browse files
hc-sousacursoragent
andcommitted
feat(ops): add AUTH_KEY endpoints to trigger feed sync tasks
Expose POST/GET /api/v1/ops/feeds/sync for inline debug runs or async Celery queueing of news, seismic, and trails sync jobs. Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent 14df5d4 commit 2089fea

5 files changed

Lines changed: 248 additions & 21 deletions

File tree

src/shared/feed_syncs.py

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
"""Feed sync registry — shared by bootstrap command and ops HTTP triggers."""
2+
3+
from __future__ import annotations
4+
5+
import logging
6+
from typing import Any, Callable
7+
8+
logger = logging.getLogger(__name__)
9+
10+
FeedRunner = Callable[[str | None], dict[str, Any]]
11+
12+
FEED_LABELS: tuple[str, ...] = ('news', 'seismic', 'trails')
13+
14+
FEED_TASK_NAMES: dict[str, str] = {
15+
'news': 'news.poll_sources',
16+
'seismic': 'seismic.sync_events',
17+
'trails': 'trails.sync_open_data',
18+
}
19+
20+
21+
def _run_news(island_key: str | None) -> dict[str, Any]:
22+
from news.services import poll_all_sources
23+
24+
return {'status': 'ok', **poll_all_sources(island_key=island_key)}
25+
26+
27+
def _run_seismic(island_key: str | None) -> dict[str, Any]:
28+
from seismic.services import sync_all_events
29+
30+
return {'status': 'ok', **sync_all_events(island_key=island_key)}
31+
32+
33+
def _run_trails(island_key: str | None) -> dict[str, Any]:
34+
from trails.services import sync_all_open_data
35+
36+
return {'status': 'ok', **sync_all_open_data(island_key=island_key)}
37+
38+
39+
FEED_RUNNERS: dict[str, FeedRunner] = {
40+
'news': _run_news,
41+
'seismic': _run_seismic,
42+
'trails': _run_trails,
43+
}
44+
45+
46+
def _queue_news(island_key: str | None):
47+
from news.tasks import poll_sources_task
48+
49+
return poll_sources_task.delay(island_key=island_key)
50+
51+
52+
def _queue_seismic(island_key: str | None):
53+
from seismic.tasks import sync_events_task
54+
55+
return sync_events_task.delay(island_key=island_key)
56+
57+
58+
def _queue_trails(island_key: str | None):
59+
from trails.tasks import sync_open_data_task
60+
61+
return sync_open_data_task.delay(island_key=island_key)
62+
63+
64+
FEED_QUEUERS = {
65+
'news': _queue_news,
66+
'seismic': _queue_seismic,
67+
'trails': _queue_trails,
68+
}
69+
70+
71+
def normalize_feed_param(feed: str) -> list[str]:
72+
value = (feed or 'all').strip().lower()
73+
if value == 'all':
74+
return list(FEED_LABELS)
75+
if value in FEED_LABELS:
76+
return [value]
77+
raise ValueError(f'Unknown feed {feed!r}; use all or one of {", ".join(FEED_LABELS)}')
78+
79+
80+
def run_feed_sync(label: str, *, island_key: str | None = None) -> dict[str, Any]:
81+
runner = FEED_RUNNERS[label]
82+
return runner(island_key)
83+
84+
85+
def queue_feed_sync(label: str, *, island_key: str | None = None) -> dict[str, Any]:
86+
async_result = FEED_QUEUERS[label](island_key)
87+
return {
88+
'task': FEED_TASK_NAMES[label],
89+
'celery_task_id': async_result.id,
90+
}
91+
92+
93+
def trigger_feed_syncs(
94+
labels: list[str],
95+
*,
96+
island_key: str | None = None,
97+
run_async: bool = False,
98+
) -> dict[str, Any]:
99+
results: dict[str, Any] = {}
100+
for label in labels:
101+
try:
102+
if run_async:
103+
queued = queue_feed_sync(label, island_key=island_key)
104+
results[label] = {'ok': True, 'mode': 'async', **queued}
105+
else:
106+
payload = run_feed_sync(label, island_key=island_key)
107+
results[label] = {'ok': True, 'mode': 'sync', **payload}
108+
except Exception as exc:
109+
logger.exception('feed sync failed label=%s island=%s', label, island_key)
110+
results[label] = {
111+
'ok': False,
112+
'mode': 'async' if run_async else 'sync',
113+
'task': FEED_TASK_NAMES.get(label),
114+
'error': str(exc),
115+
'error_type': type(exc).__name__,
116+
}
117+
return results

src/shared/management/commands/bootstrap_feed_syncs.py

Lines changed: 6 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -6,37 +6,22 @@
66

77
from django.core.management.base import BaseCommand
88

9-
logger = logging.getLogger(__name__)
9+
from shared.feed_syncs import FEED_LABELS, queue_feed_sync
1010

11-
FEED_TASKS: tuple[tuple[str, str], ...] = (
12-
('news', 'news.poll_sources'),
13-
('seismic', 'seismic.sync_events'),
14-
('trails', 'trails.sync_open_data'),
15-
)
11+
logger = logging.getLogger(__name__)
1612

1713

1814
class Command(BaseCommand):
1915
help = 'Queue initial news, seismic, and trails feed sync tasks (runs after migrate on deploy).'
2016

2117
def handle(self, *args: object, **options: object) -> None:
2218
queued: list[str] = []
23-
for label, task_name in FEED_TASKS:
19+
for label in FEED_LABELS:
2420
try:
25-
if label == 'news':
26-
from news.tasks import poll_sources_task
27-
28-
poll_sources_task.delay()
29-
elif label == 'seismic':
30-
from seismic.tasks import sync_events_task
31-
32-
sync_events_task.delay()
33-
elif label == 'trails':
34-
from trails.tasks import sync_open_data_task
35-
36-
sync_open_data_task.delay()
37-
queued.append(task_name)
21+
info = queue_feed_sync(label)
22+
queued.append(info['task'])
3823
except Exception:
39-
logger.exception('bootstrap_feed_syncs failed to queue %s', task_name)
24+
logger.exception('bootstrap_feed_syncs failed to queue %s', label)
4025

4126
if queued:
4227
self.stdout.write(f'Queued feed sync tasks: {", ".join(queued)}')
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
"""Ops feed sync trigger tests."""
2+
3+
from unittest.mock import patch
4+
5+
from django.conf import settings
6+
from django.test import TestCase, override_settings
7+
from rest_framework.test import APIClient
8+
9+
10+
@override_settings(AUTH_KEY='test-auth-key')
11+
class OpsFeedSyncTestCase(TestCase):
12+
def setUp(self):
13+
self.client = APIClient()
14+
self.url = '/api/v1/ops/feeds/sync'
15+
self.auth = {'QUERY_STRING': f'key={settings.AUTH_KEY}'}
16+
17+
def test_requires_auth_key(self):
18+
response = self.client.post(f'{self.url}?feed=news')
19+
self.assertEqual(response.status_code, 401)
20+
21+
def test_invalid_feed(self):
22+
response = self.client.post(f'{self.url}?key={settings.AUTH_KEY}&feed=bad')
23+
self.assertEqual(response.status_code, 400)
24+
25+
@patch('shared.feed_syncs.run_feed_sync')
26+
def test_sync_trails_inline(self, mock_run):
27+
mock_run.return_value = {
28+
'status': 'ok',
29+
'islands': 1,
30+
'trails_created': 2,
31+
'trails_updated': 0,
32+
'pois_created': 0,
33+
'pois_updated': 0,
34+
'skipped': 0,
35+
}
36+
response = self.client.post(
37+
f'{self.url}?key={settings.AUTH_KEY}&feed=trails&island=sao-miguel',
38+
)
39+
self.assertEqual(response.status_code, 200)
40+
body = response.json()
41+
self.assertTrue(body['ok'])
42+
self.assertFalse(body['async'])
43+
self.assertEqual(body['feeds']['trails']['mode'], 'sync')
44+
self.assertEqual(body['feeds']['trails']['trails_created'], 2)
45+
mock_run.assert_called_once_with('trails', island_key='sao-miguel')
46+
47+
@patch('shared.feed_syncs.queue_feed_sync')
48+
def test_async_queues_celery(self, mock_queue):
49+
mock_queue.return_value = {
50+
'task': 'trails.sync_open_data',
51+
'celery_task_id': 'abc-123',
52+
}
53+
response = self.client.post(
54+
f'{self.url}?key={settings.AUTH_KEY}&feed=trails&async=true',
55+
)
56+
self.assertEqual(response.status_code, 200)
57+
body = response.json()
58+
self.assertTrue(body['async'])
59+
self.assertEqual(body['feeds']['trails']['celery_task_id'], 'abc-123')
60+
61+
@patch('shared.feed_syncs.run_feed_sync')
62+
def test_sync_all_feeds(self, mock_run):
63+
mock_run.return_value = {'status': 'ok', 'created': 0}
64+
response = self.client.get(f'{self.url}?key={settings.AUTH_KEY}&feed=all')
65+
self.assertEqual(response.status_code, 200)
66+
self.assertEqual(mock_run.call_count, 3)
67+
68+
@patch('shared.feed_syncs.run_feed_sync', side_effect=RuntimeError('boom'))
69+
def test_sync_error_returns_502(self, _mock_run):
70+
response = self.client.post(f'{self.url}?key={settings.AUTH_KEY}&feed=news')
71+
self.assertEqual(response.status_code, 502)
72+
body = response.json()
73+
self.assertFalse(body['ok'])
74+
self.assertEqual(body['feeds']['news']['error_type'], 'RuntimeError')

src/tenancy/urls.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,9 @@
88
views.cancel_all_celery_jobs,
99
name='ops_celery_cancel_all',
1010
),
11+
path(
12+
'feeds/sync',
13+
views.trigger_feed_sync,
14+
name='ops_feed_sync',
15+
),
1116
]

src/tenancy/views.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,3 +51,49 @@ def cancel_all_celery_jobs(request: Request) -> Response:
5151
**result,
5252
}
5353
)
54+
55+
56+
@api_view(['GET', 'POST'])
57+
@permission_classes([AllowAny])
58+
def trigger_feed_sync(request: Request) -> Response:
59+
"""
60+
Run or queue feed sync tasks (news, seismic, trails) for debugging.
61+
62+
Query params:
63+
- key / X-Auth-Key: AUTH_KEY (required)
64+
- feed: all | news | seismic | trails (default all)
65+
- island: island key slug, e.g. sao-miguel (optional)
66+
- async: true to queue on Celery; false to run inline and return counts (default false)
67+
"""
68+
denied = _require_auth_key(request)
69+
if denied:
70+
return denied
71+
72+
from shared.feed_syncs import FEED_LABELS, normalize_feed_param, trigger_feed_syncs
73+
74+
feed_param = request.query_params.get('feed', 'all')
75+
island_key = (request.query_params.get('island') or '').strip() or None
76+
run_async = request.query_params.get('async', 'false').lower() in ('1', 'true', 'yes')
77+
78+
try:
79+
labels = normalize_feed_param(feed_param)
80+
except ValueError as exc:
81+
return Response(
82+
{
83+
'error': str(exc),
84+
'allowed_feeds': ['all', *FEED_LABELS],
85+
},
86+
status=400,
87+
)
88+
89+
results = trigger_feed_syncs(labels, island_key=island_key, run_async=run_async)
90+
all_ok = all(item.get('ok') for item in results.values())
91+
return Response(
92+
{
93+
'ok': all_ok,
94+
'island_key': island_key,
95+
'async': run_async,
96+
'feeds': results,
97+
},
98+
status=200 if all_ok else 502,
99+
)

0 commit comments

Comments
 (0)