Skip to content

Commit 08d3bfc

Browse files
feat: introduce send_on_commit and send_async helpers in send_event()
Co-Authored-By: Claude <noreply@anthropic.com>
1 parent 1796c5c commit 08d3bfc

6 files changed

Lines changed: 411 additions & 18 deletions

File tree

CHANGELOG.rst

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,26 @@ Change Log
1616
Unreleased
1717
__________
1818

19+
Added
20+
~~~~~
21+
22+
* ``OpenEdxPublicSignal.send_event`` now accepts two new keyword arguments:
23+
24+
* ``send_on_commit`` (bool, default ``False``): defers sending the event until
25+
the current database transaction commits, using ``django.db.transaction.on_commit``.
26+
Sends immediately if there is no open transaction, and does not send at all
27+
if the transaction rolls back.
28+
* ``send_async`` (bool, default ``False``): sends the event from a Celery
29+
task instead of the caller's thread. Event data is serialized with the Avro
30+
serializer so that attrs-based payload classes can round-trip through
31+
Celery's JSON transport. Requires a Celery worker that imports
32+
``openedx_events.tasks``.
33+
34+
The two options compose: combining them defers the Celery dispatch until the
35+
transaction commits.
36+
37+
* Added ``celery`` to ``requirements/base.in``.
38+
1939
[11.2.0] - 2026-04-20
2040
---------------------
2141

docs/how-tos/create-a-new-event.rst

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,25 @@ Here is how the integration could look like:
248248
- Ensure that the event is triggered consistently and only when the event should be triggered. Avoid triggering the event multiple times for the same event unless necessary, e.g., when there is no other way to ensure that the event is triggered consistently.
249249
- Try placing the event after the triggering logic completes successfully to ensure that it is triggered only when needed. This will help ensure that the event is triggered only for factual events. If the triggering logic fails, the event should not be triggered.
250250

251+
Deferring and Backgrounding Event Sends
252+
-----------------------------------------
253+
254+
``send_event`` accepts two optional keyword arguments that control *when* and *where* receivers run:
255+
256+
- ``send_on_commit=True``: Defer the send until the current database transaction commits (via ``django.db.transaction.on_commit``). If there is no open transaction, the event is sent immediately. If the transaction rolls back, the event is not sent. Use this when the event reports a database change that may still be rolled back, to avoid "counterfactual" events.
257+
258+
- ``send_async=True``: Send the event from an asynchronous Celery task instead of the caller's thread. Event data is serialized with the Avro serializer (so that ``attrs`` payload classes round-trip through Celery's JSON transport) and handed off to ``openedx_events.tasks.send_async_event``. Use this when receivers are slow, non-critical, or you want to decouple their cost from the triggering request. Requires a Celery worker that imports ``openedx_events.tasks``.
259+
260+
Both options can be combined. For example, to send a notification event only if the enrollment commits, and to run the (possibly slow) receivers in a worker:
261+
262+
.. code-block:: python
263+
264+
COURSE_ENROLLMENT_CREATED.send_event(
265+
enrollment=enrollment_data,
266+
send_on_commit=True,
267+
send_async=True,
268+
)
269+
251270
Step 7: Test the Event
252271
========================
253272

openedx_events/tasks.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
"""
2+
Celery tasks for sending Open edX events asynchronously.
3+
"""
4+
5+
import base64
6+
from logging import getLogger
7+
8+
from celery import shared_task
9+
10+
log = getLogger(__name__)
11+
12+
13+
@shared_task(name="openedx_events.tasks.send_async_event")
14+
def send_async_event(event_type, metadata_json, event_data_b64) -> None:
15+
"""
16+
Re-send an event from a Celery worker.
17+
18+
Event data is serialized using the Avro serializer (because event payloads
19+
are typically attrs classes that are not JSON-serializable) and base64-encoded
20+
so that it can be passed as a JSON-safe argument to Celery. Metadata is
21+
passed as a JSON string.
22+
23+
Arguments:
24+
event_type (str): The event type of the signal to re-send.
25+
metadata_json (str): JSON-serialized EventsMetadata.
26+
event_data_b64 (str): Base64-encoded Avro-serialized event data.
27+
"""
28+
# Imported here to avoid a circular import at module load time.
29+
from .data import EventsMetadata # pylint: disable=import-outside-toplevel
30+
from .event_bus.avro.deserializer import ( # pylint: disable=import-outside-toplevel
31+
deserialize_bytes_to_event_data,
32+
)
33+
from .tooling import OpenEdxPublicSignal # pylint: disable=import-outside-toplevel
34+
35+
signal = OpenEdxPublicSignal.get_signal_by_type(event_type)
36+
metadata = EventsMetadata.from_json(metadata_json)
37+
event_data = deserialize_bytes_to_event_data(
38+
base64.b64decode(event_data_b64), signal
39+
)
40+
signal._send_event_with_metadata( # pylint: disable=protected-access
41+
metadata=metadata,
42+
**event_data,
43+
)

openedx_events/tests/test_tooling.py

Lines changed: 227 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,23 @@
44
Classes:
55
EventsToolingTest: Test events tooling.
66
"""
7+
import base64
78
import datetime
89
import sys
910
from contextlib import contextmanager
10-
from unittest.mock import Mock, patch
11+
from unittest.mock import ANY, Mock, patch
1112
from uuid import UUID, uuid1
1213

1314
import attr
1415
import ddt
1516
import pytest
16-
from django.test import TestCase, override_settings
17+
from django.db import transaction
18+
from django.test import TestCase, TransactionTestCase, override_settings
1719

1820
from openedx_events.data import EventsMetadata
1921
from openedx_events.exceptions import SenderValidationError
22+
from openedx_events.learning.data import UserData, UserPersonalData
23+
from openedx_events.learning.signals import SESSION_LOGIN_COMPLETED
2024
from openedx_events.testing import FreezeSignalCacheMixin
2125
from openedx_events.tooling import OpenEdxPublicSignal, _process_all_signals_modules, load_all_signals
2226

@@ -257,7 +261,8 @@ def test_send_event_with_custom_metadata(self, mock_send_event_with_metadata):
257261

258262
assert response == expected_response
259263
mock_send_event_with_metadata.assert_called_once_with(
260-
metadata=metadata, send_robust=True, foo="bar", from_event_bus=True
264+
metadata=metadata, send_robust=True, foo="bar", from_event_bus=True,
265+
send_on_commit=False, send_async=False,
261266
)
262267

263268
@ddt.data(
@@ -333,6 +338,225 @@ def test_send_event_disabled(self, send_mock):
333338
self.assertListEqual([], result)
334339

335340

341+
def _make_user_data():
342+
"""Build a UserData payload for the real SESSION_LOGIN_COMPLETED signal."""
343+
return UserData(
344+
pii=UserPersonalData(username="alice", email="alice@example.com", name="Alice"),
345+
id=1,
346+
is_active=True,
347+
)
348+
349+
350+
class SendEventOnCommitTests(TestCase):
351+
"""
352+
Tests for the ``send_on_commit`` parameter of ``send_event``.
353+
354+
Uses ``TestCase.captureOnCommitCallbacks`` to observe callbacks that
355+
``transaction.on_commit`` registers inside the outer test-level
356+
transaction.
357+
"""
358+
359+
def setUp(self):
360+
super().setUp()
361+
self.receiver = Mock(return_value="ok")
362+
SESSION_LOGIN_COMPLETED.connect(self.receiver)
363+
self.addCleanup(SESSION_LOGIN_COMPLETED.disconnect, self.receiver)
364+
365+
def test_send_on_commit_defers_until_commit(self):
366+
"""
367+
With ``send_on_commit=True`` inside a transaction, the receiver is
368+
only called once the enclosing transaction commits.
369+
"""
370+
with self.captureOnCommitCallbacks(execute=False) as callbacks:
371+
result = SESSION_LOGIN_COMPLETED.send_event(
372+
send_on_commit=True, user=_make_user_data(),
373+
)
374+
375+
self.receiver.assert_not_called()
376+
self.assertEqual(result, [])
377+
self.assertEqual(len(callbacks), 1)
378+
379+
# Now "commit" and verify receiver runs.
380+
for cb in callbacks:
381+
cb()
382+
self.receiver.assert_called_once()
383+
384+
def test_send_on_commit_callback_runs_receiver(self):
385+
"""
386+
Executing the captured ``on_commit`` callback (simulating a commit)
387+
triggers the receiver, verifying that the work registered under the
388+
callback is the actual signal send.
389+
"""
390+
with self.captureOnCommitCallbacks(execute=True):
391+
SESSION_LOGIN_COMPLETED.send_event(
392+
send_on_commit=True, user=_make_user_data(),
393+
)
394+
self.receiver.assert_called_once()
395+
396+
397+
class SendEventOnCommitRollbackTests(TransactionTestCase):
398+
"""
399+
Tests that ``send_on_commit`` suppresses sending when the transaction
400+
rolls back. Uses ``TransactionTestCase`` so transactions actually commit
401+
and roll back.
402+
"""
403+
404+
def setUp(self):
405+
super().setUp()
406+
self.receiver = Mock(return_value="ok")
407+
SESSION_LOGIN_COMPLETED.connect(self.receiver)
408+
self.addCleanup(SESSION_LOGIN_COMPLETED.disconnect, self.receiver)
409+
410+
def test_send_on_commit_immediate_when_no_transaction(self):
411+
"""
412+
Outside any transaction, ``send_on_commit=True`` sends immediately
413+
(per Django's ``transaction.on_commit`` contract).
414+
"""
415+
SESSION_LOGIN_COMPLETED.send_event(
416+
send_on_commit=True, user=_make_user_data(),
417+
)
418+
self.receiver.assert_called_once()
419+
420+
def test_send_on_commit_not_sent_on_rollback(self):
421+
"""
422+
If the transaction rolls back, the on_commit callback is never run,
423+
so the event is not sent.
424+
"""
425+
class _Rollback(Exception):
426+
pass
427+
428+
with self.assertRaises(_Rollback):
429+
with transaction.atomic():
430+
SESSION_LOGIN_COMPLETED.send_event(
431+
send_on_commit=True, user=_make_user_data(),
432+
)
433+
raise _Rollback()
434+
435+
self.receiver.assert_not_called()
436+
437+
438+
class SendEventAsyncTests(TestCase):
439+
"""
440+
Tests for the ``send_async`` parameter of ``send_event``.
441+
"""
442+
443+
def setUp(self):
444+
super().setUp()
445+
self.receiver = Mock(return_value="ok")
446+
SESSION_LOGIN_COMPLETED.connect(self.receiver)
447+
self.addCleanup(SESSION_LOGIN_COMPLETED.disconnect, self.receiver)
448+
449+
@patch("openedx_events.tasks.send_async_event.delay")
450+
def test_send_async_dispatches_celery_task(self, mock_delay):
451+
"""
452+
With ``send_async=True``, the receiver is not called synchronously.
453+
Instead a Celery task is dispatched with the serialized event data.
454+
"""
455+
result = SESSION_LOGIN_COMPLETED.send_event(
456+
send_async=True, user=_make_user_data(),
457+
)
458+
459+
self.assertEqual(result, [])
460+
self.receiver.assert_not_called()
461+
mock_delay.assert_called_once_with(
462+
SESSION_LOGIN_COMPLETED.event_type, ANY, ANY,
463+
)
464+
_, metadata_json, event_data_b64 = mock_delay.call_args.args
465+
# The metadata round-trips through JSON.
466+
self.assertEqual(
467+
EventsMetadata.from_json(metadata_json).event_type,
468+
SESSION_LOGIN_COMPLETED.event_type,
469+
)
470+
# The event data is valid base64.
471+
base64.b64decode(event_data_b64)
472+
473+
def test_send_async_task_sends_event(self):
474+
"""
475+
End-to-end: when ``send_async=True``, running the dispatched Celery
476+
task synchronously delivers the event to the receiver with the same
477+
metadata and a payload that round-trips through Avro.
478+
"""
479+
captured = {}
480+
481+
def _capture_delay(event_type, metadata_json, event_data_b64):
482+
captured["args"] = (event_type, metadata_json, event_data_b64)
483+
484+
with patch("openedx_events.tasks.send_async_event.delay", side_effect=_capture_delay):
485+
SESSION_LOGIN_COMPLETED.send_event(
486+
send_async=True, user=_make_user_data(),
487+
)
488+
489+
self.receiver.assert_not_called()
490+
491+
# Now run the task body directly, simulating a Celery worker.
492+
from openedx_events.tasks import send_async_event # pylint: disable=import-outside-toplevel
493+
send_async_event(*captured["args"])
494+
495+
self.receiver.assert_called_once()
496+
call_kwargs = self.receiver.call_args.kwargs
497+
self.assertEqual(call_kwargs["signal"], SESSION_LOGIN_COMPLETED)
498+
self.assertEqual(call_kwargs["user"], _make_user_data())
499+
self.assertEqual(call_kwargs["metadata"].event_type, SESSION_LOGIN_COMPLETED.event_type)
500+
self.assertEqual(call_kwargs["from_event_bus"], False)
501+
502+
503+
class SendEventAsyncOnCommitTests(TestCase):
504+
"""
505+
Tests combining ``send_on_commit=True`` with ``send_async=True``: the
506+
Celery task dispatch should be deferred until the transaction commits.
507+
"""
508+
509+
def setUp(self):
510+
super().setUp()
511+
self.receiver = Mock(return_value="ok")
512+
SESSION_LOGIN_COMPLETED.connect(self.receiver)
513+
self.addCleanup(SESSION_LOGIN_COMPLETED.disconnect, self.receiver)
514+
515+
@patch("openedx_events.tasks.send_async_event.delay")
516+
def test_async_on_commit_defers_dispatch(self, mock_delay):
517+
"""
518+
``send_async=True`` + ``send_on_commit=True`` in a transaction: the
519+
Celery dispatch is registered as an on_commit callback, not invoked
520+
immediately.
521+
"""
522+
with self.captureOnCommitCallbacks(execute=False) as callbacks:
523+
SESSION_LOGIN_COMPLETED.send_event(
524+
send_async=True, send_on_commit=True, user=_make_user_data(),
525+
)
526+
527+
mock_delay.assert_not_called()
528+
self.assertEqual(len(callbacks), 1)
529+
530+
for cb in callbacks:
531+
cb()
532+
mock_delay.assert_called_once()
533+
534+
535+
class SendEventAsyncOnCommitRollbackTests(TransactionTestCase):
536+
"""Rollback behavior for ``send_async`` + ``send_on_commit``."""
537+
538+
def setUp(self):
539+
super().setUp()
540+
self.receiver = Mock(return_value="ok")
541+
SESSION_LOGIN_COMPLETED.connect(self.receiver)
542+
self.addCleanup(SESSION_LOGIN_COMPLETED.disconnect, self.receiver)
543+
544+
@patch("openedx_events.tasks.send_async_event.delay")
545+
def test_async_on_commit_not_dispatched_on_rollback(self, mock_delay):
546+
class _Rollback(Exception):
547+
pass
548+
549+
with self.assertRaises(_Rollback):
550+
with transaction.atomic():
551+
SESSION_LOGIN_COMPLETED.send_event(
552+
send_async=True, send_on_commit=True, user=_make_user_data(),
553+
)
554+
raise _Rollback()
555+
556+
mock_delay.assert_not_called()
557+
self.receiver.assert_not_called()
558+
559+
336560
class TestLoadAllSignals(FreezeSignalCacheMixin, TestCase):
337561
""" Tests for the load_all_signals method"""
338562
def setUp(self):

0 commit comments

Comments
 (0)