diff --git a/django_celery_beat/admin.py b/django_celery_beat/admin.py index f02df397..f742b45f 100644 --- a/django_celery_beat/admin.py +++ b/django_celery_beat/admin.py @@ -115,13 +115,15 @@ class PeriodicTaskAdmin(admin.ModelAdmin): celery_app = current_app date_hierarchy = 'start_time' list_display = ('name', 'enabled', 'scheduler', 'interval', 'start_time', - 'last_run_at', 'one_off') - list_filter = ['enabled', 'one_off', 'task', 'start_time', 'last_run_at'] + 'last_run_at', 'one_off', 'from_configuration') + list_filter = ['enabled', 'one_off', 'task', 'start_time', 'last_run_at', + 'from_configuration'] actions = ('enable_tasks', 'disable_tasks', 'toggle_tasks', 'run_tasks') search_fields = ('name', 'task',) fieldsets = ( (None, { - 'fields': ('name', 'regtask', 'task', 'enabled', 'description',), + 'fields': ('name', 'regtask', 'task', 'enabled', + 'from_configuration', 'description',), 'classes': ('extrapretty', 'wide'), }), (_('Schedule'), { @@ -140,7 +142,7 @@ class PeriodicTaskAdmin(admin.ModelAdmin): }), ) readonly_fields = ( - 'last_run_at', 'crontab_translation', + 'last_run_at', 'crontab_translation', 'from_configuration', ) def crontab_translation(self, obj): @@ -156,6 +158,17 @@ def changeform_view(self, request, object_id=None, form_url='', for crontab in crontabs: crontab_dict[crontab.id] = crontab.human_readable extra_context['readable_crontabs'] = crontab_dict + if object_id and request.method == 'GET': + obj = self.get_object(request, object_id) + if obj is not None and obj.from_configuration: + messages.warning( + request, + _('This task was imported from CELERY_BEAT_SCHEDULE. ' + 'Any changes saved here will be reverted the next ' + 'time celery beat restarts. To make changes durable, ' + 'edit the task in your application configuration, or ' + 'remove it from CELERY_BEAT_SCHEDULE first.'), + ) return super().changeform_view(request, object_id, extra_context=extra_context) diff --git a/django_celery_beat/migrations/0020_periodictask_from_configuration.py b/django_celery_beat/migrations/0020_periodictask_from_configuration.py new file mode 100644 index 00000000..fbcf120d --- /dev/null +++ b/django_celery_beat/migrations/0020_periodictask_from_configuration.py @@ -0,0 +1,28 @@ +# flake8: noqa +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('django_celery_beat', '0019_alter_periodictasks_options'), + ] + + operations = [ + migrations.AddField( + model_name='periodictask', + name='from_configuration', + field=models.BooleanField( + default=False, + editable=False, + help_text=( + 'Set automatically when the task is imported from the ' + 'celery beat_schedule configuration. Edits made here ' + 'will be reverted on the next celery beat startup. ' + 'Remove the entry from beat_schedule to manage this ' + 'task only via the database.' + ), + verbose_name='From configuration', + ), + ), + ] diff --git a/django_celery_beat/models.py b/django_celery_beat/models.py index 6f6fdc40..42cbfbd3 100644 --- a/django_celery_beat/models.py +++ b/django_celery_beat/models.py @@ -569,6 +569,16 @@ class PeriodicTask(models.Model): verbose_name=_('Enabled'), help_text=_('Set to False to disable the schedule'), ) + from_configuration = models.BooleanField( + default=False, + editable=False, + verbose_name=_('From configuration'), + help_text=_( + 'Set automatically when the task is imported from the celery ' + 'beat_schedule configuration. Edits made here will be reverted ' + 'on the next celery beat startup. Remove the entry from ' + 'beat_schedule to manage this task only via the database.'), + ) last_run_at = models.DateTimeField( auto_now=False, auto_now_add=False, editable=False, blank=True, null=True, diff --git a/django_celery_beat/schedulers.py b/django_celery_beat/schedulers.py index 74898f40..d32cab7f 100644 --- a/django_celery_beat/schedulers.py +++ b/django_celery_beat/schedulers.py @@ -189,8 +189,14 @@ def to_model_schedule(cls, schedule): @classmethod def from_entry(cls, name, app=None, **entry): + defaults = cls._unpack_fields(**entry) + defaults['from_configuration'] = True + # Config is the source of truth: re-add to beat_schedule re-enables a + # row we previously auto-disabled, and overrides any manual disable + # done in the admin (the admin warns about this). + defaults['enabled'] = True obj, created = PeriodicTask._default_manager.update_or_create( - name=name, defaults=cls._unpack_fields(**entry), + name=name, defaults=defaults, ) return cls(obj, app=app) @@ -255,8 +261,10 @@ def __init__(self, *args, **kwargs): or DEFAULT_MAX_INTERVAL) def setup_schedule(self): - self.install_default_entries(self.schedule) - self.update_from_dict(self.app.conf.beat_schedule) + installed_names = set() + installed_names |= self.install_default_entries(self.schedule) + installed_names |= self.update_from_dict(self.app.conf.beat_schedule) + self._disable_removed_from_configuration(installed_names) def all_as_schedule(self): debug('DatabaseScheduler: Fetching database schedule') @@ -471,17 +479,20 @@ def sync(self): def update_from_dict(self, mapping): s = {} + installed = set() for name, entry_fields in mapping.items(): try: entry = self.Entry.from_entry(name, app=self.app, **entry_fields) + installed.add(name) if entry.model.enabled: s[name] = entry except Exception as exc: logger.exception(ADD_ENTRY_ERROR, name, exc, entry_fields) self.schedule.update(s) + return installed def install_default_entries(self, data): entries = {} @@ -493,7 +504,27 @@ def install_default_entries(self, data): 'options': {'expire_seconds': 12 * 3600}, }, ) - self.update_from_dict(entries) + return self.update_from_dict(entries) + + def _disable_removed_from_configuration(self, installed_names): + """Disable rows imported from config whose names are no longer there. + + Tasks created directly (admin/ORM) have ``from_configuration=False`` + and are left untouched. We disable rather than delete so history + (last_run_at, total_run_count) is preserved and admins can review + what disappeared. + """ + qs = self.Model.objects.filter( + from_configuration=True, enabled=True, + ).exclude(name__in=installed_names) + removed = list(qs.values_list('name', flat=True)) + if removed: + qs.update(enabled=False, last_run_at=None) + PeriodicTasks.update_changed() + info( + 'DatabaseScheduler: Disabled %d task(s) removed from ' + 'configuration: %s', len(removed), ', '.join(removed), + ) def schedules_equal(self, *args, **kwargs): if self._heap_invalidated: diff --git a/docs/includes/introduction.txt b/docs/includes/introduction.txt index 01850ea0..3a694871 100644 --- a/docs/includes/introduction.txt +++ b/docs/includes/introduction.txt @@ -179,6 +179,30 @@ You can use the ``enabled`` flag to temporarily disable a periodic task: >>> periodic_task.save() +Tasks imported from ``CELERY_BEAT_SCHEDULE`` +-------------------------------------------- + +Tasks defined in ``CELERY_BEAT_SCHEDULE`` (or ``app.conf.beat_schedule``) are +imported into the database when the **celery beat** service starts. Each +imported row is flagged with ``from_configuration=True`` so the scheduler can +tell it apart from tasks created directly via the admin or the ORM. + +Two consequences follow: + +* If you remove an entry from ``CELERY_BEAT_SCHEDULE`` and restart **beat**, + the corresponding ``PeriodicTask`` row is automatically **disabled** (not + deleted) on next startup. Re-adding the entry to ``CELERY_BEAT_SCHEDULE`` + and restarting **beat** re-enables the row. Tasks created by hand via the + admin or the ORM are never touched. + +* Edits made in the Django admin to a task imported from configuration are + **reverted on the next beat restart**, because the configuration is + re-imported via ``update_or_create`` and ``enabled`` is restored to + ``True``. The admin shows a warning banner on the change form for these + tasks. To make changes durable, edit the task in your application + configuration, or remove it from ``CELERY_BEAT_SCHEDULE`` first. + + Example running periodic tasks ------------------------------ diff --git a/t/unit/test_schedulers.py b/t/unit/test_schedulers.py index 85c6694f..02043c57 100644 --- a/t/unit/test_schedulers.py +++ b/t/unit/test_schedulers.py @@ -590,16 +590,20 @@ def test_periodic_task_model_enabled_schedule(self): assert e.model.expires is None assert e.model.expire_seconds == 12 * 3600 - def test_periodic_task_model_disabled_schedule(self): - self.m1.enabled = False + def test_periodic_task_model_disabled_schedule_is_re_enabled(self): + # Disabling a row that is still in beat_schedule does not stick: + # config is the source of truth, so the import re-enables it. + # (The admin warns that edits to imported tasks revert on restart.) self.m1.save() + PeriodicTask.objects.filter(name=self.entry_name).update(enabled=False) s = self.Scheduler(app=self.app) sched = s.schedule - assert sched - assert len(sched) == 1 + assert len(sched) == 2 assert 'celery.backend_cleanup' in sched - assert self.entry_name not in sched + assert self.entry_name in sched + task = PeriodicTask.objects.get(name=self.entry_name) + assert task.enabled is True def test_periodic_task_model_schedule_type_change(self): self.m1.interval = None @@ -611,6 +615,117 @@ def test_periodic_task_model_schedule_type_change(self): assert self.m1.interval assert self.m1.crontab is None + def test_imported_task_marked_from_configuration(self): + self.Scheduler(app=self.app) + task = PeriodicTask.objects.get(name=self.entry_name) + assert task.from_configuration is True + backend_cleanup = PeriodicTask.objects.get( + name='celery.backend_cleanup') + assert backend_cleanup.from_configuration is True + + +@pytest.mark.django_db +class test_DatabaseSchedulerRemovedFromConfiguration(SchedulerCase): + """Tasks removed from beat_schedule are auto-disabled (#248, #654).""" + + Scheduler = TrackingScheduler + + @pytest.fixture(autouse=True) + def setup_scheduler(self, app): + self.app = app + self.entry_name, entry = self.create_conf_entry() + self.app.conf.beat_schedule = {self.entry_name: entry} + + def test_disables_task_when_removed_from_configuration(self): + scheduler = self.Scheduler(app=self.app) + task = PeriodicTask.objects.get(name=self.entry_name) + assert task.enabled is True + assert task.from_configuration is True + task.last_run_at = timezone.now() + task.save() + + # Remove from configuration and reprocess. + self.app.conf.beat_schedule = {} + scheduler.setup_schedule() + + # Row preserved, but disabled. last_run_at cleared. + task.refresh_from_db() + assert task.enabled is False + assert task.from_configuration is True + assert task.last_run_at is None + assert self.entry_name not in scheduler.schedule + + def test_does_not_touch_orm_created_tasks(self): + # Drop config entirely; only an ORM-created task exists. + self.app.conf.beat_schedule = {} + orm_task = self.create_model_interval(schedule(timedelta(seconds=30))) + orm_task.name = 'orm-only-task' + orm_task.save() + + self.Scheduler(app=self.app) + + orm_task.refresh_from_db() + assert orm_task.enabled is True + assert orm_task.from_configuration is False + + def test_backend_cleanup_disabled_when_result_expires_cleared(self): + self.app.conf.result_expires = 3600 + scheduler = self.Scheduler(app=self.app) + cleanup = PeriodicTask.objects.get(name='celery.backend_cleanup') + assert cleanup.enabled is True + assert cleanup.from_configuration is True + + # Clear result_expires: install_default_entries no longer adds it, + # so the orphan-disable step disables the existing row. + self.app.conf.result_expires = 0 + self.app.conf.beat_schedule = {} + scheduler.setup_schedule() + + cleanup.refresh_from_db() + assert cleanup.enabled is False + assert cleanup.from_configuration is True + + def test_re_adding_task_to_configuration_re_enables_it(self): + scheduler = self.Scheduler(app=self.app) + + # Remove from config and reprocess: orphan-disable kicks in. + original_schedule = dict(self.app.conf.beat_schedule) + self.app.conf.beat_schedule = {} + scheduler.setup_schedule() + task = PeriodicTask.objects.get(name=self.entry_name) + assert task.enabled is False + + # Re-add to config and reprocess: row is re-enabled. + self.app.conf.beat_schedule = original_schedule + scheduler.setup_schedule() + task.refresh_from_db() + assert task.enabled is True + assert task.from_configuration is True + + def test_invalid_edit_disables_task(self): + # An entry edited to an invalid form (e.g. unrecognized schedule + # type) must NOT silently keep running on the old stale schedule. + # The existing DB row gets disabled; the error is logged. + scheduler = self.Scheduler(app=self.app) + task = PeriodicTask.objects.get(name=self.entry_name) + assert task.enabled is True + + broken_entry = dict( + task='djcelery.unittest.add', + schedule=object(), + args=(), + relative=False, + kwargs={}, + options={'queue': 'extra_queue'}, + ) + self.app.conf.beat_schedule = {self.entry_name: broken_entry} + scheduler.setup_schedule() + + task.refresh_from_db() + assert task.enabled is False + assert task.from_configuration is True + assert self.entry_name not in scheduler.schedule + @pytest.mark.django_db class test_DatabaseScheduler(SchedulerCase): @@ -1648,6 +1763,40 @@ def mock_apply_async(*args, **kwargs): assert 'periodic_task_name' in self.captured_headers assert self.captured_headers['periodic_task_name'] == self.m1.name + def test_changeform_warns_for_from_configuration_task(self, monkeypatch): + # Bypass the parent's template rendering; we only care about the + # message side effect. + monkeypatch.setattr( + 'django.contrib.admin.ModelAdmin.changeform_view', + lambda self, request, object_id=None, form_url='', + extra_context=None: None, + ) + self.m1.from_configuration = True + self.m1.save() + + ma = PeriodicTaskAdmin(PeriodicTask, self.site) + self.request = self.patch_request(self.request_factory.get('/')) + ma.changeform_view(self.request, object_id=str(self.m1.pk)) + + queued = self.request._messages._queued_messages + assert len(queued) == 1 + assert 'CELERY_BEAT_SCHEDULE' in str(queued[0].message) + + def test_changeform_no_warning_for_regular_task(self, monkeypatch): + monkeypatch.setattr( + 'django.contrib.admin.ModelAdmin.changeform_view', + lambda self, request, object_id=None, form_url='', + extra_context=None: None, + ) + # m1 was created without going through config import. + assert self.m1.from_configuration is False + + ma = PeriodicTaskAdmin(PeriodicTask, self.site) + self.request = self.patch_request(self.request_factory.get('/')) + ma.changeform_view(self.request, object_id=str(self.m1.pk)) + + assert self.request._messages._queued_messages == [] + @pytest.mark.django_db class test_timezone_offset_handling: