Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions django_celery_beat/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,13 +115,15 @@ class PeriodicTaskAdmin(admin.ModelAdmin):
celery_app = current_app
date_hierarchy = 'start_time'
list_display = ('name', 'enabled', 'scheduler', 'interval', 'start_time',
'last_run_at', 'one_off')
list_filter = ['enabled', 'one_off', 'task', 'start_time', 'last_run_at']
'last_run_at', 'one_off', 'from_configuration')
list_filter = ['enabled', 'one_off', 'task', 'start_time', 'last_run_at',
'from_configuration']
actions = ('enable_tasks', 'disable_tasks', 'toggle_tasks', 'run_tasks')
search_fields = ('name', 'task',)
fieldsets = (
(None, {
'fields': ('name', 'regtask', 'task', 'enabled', 'description',),
'fields': ('name', 'regtask', 'task', 'enabled',
'from_configuration', 'description',),
'classes': ('extrapretty', 'wide'),
}),
(_('Schedule'), {
Expand All @@ -140,7 +142,7 @@ class PeriodicTaskAdmin(admin.ModelAdmin):
}),
)
readonly_fields = (
'last_run_at', 'crontab_translation',
'last_run_at', 'crontab_translation', 'from_configuration',
)

def crontab_translation(self, obj):
Expand All @@ -156,6 +158,17 @@ def changeform_view(self, request, object_id=None, form_url='',
for crontab in crontabs:
crontab_dict[crontab.id] = crontab.human_readable
extra_context['readable_crontabs'] = crontab_dict
if object_id and request.method == 'GET':
obj = self.get_object(request, object_id)
if obj is not None and obj.from_configuration:
messages.warning(
request,
_('This task was imported from CELERY_BEAT_SCHEDULE. '
'Any changes saved here will be reverted the next '
'time celery beat restarts. To make changes durable, '
'edit the task in your application configuration, or '
'remove it from CELERY_BEAT_SCHEDULE first.'),
)
return super().changeform_view(request, object_id,
extra_context=extra_context)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# flake8: noqa
from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('django_celery_beat', '0019_alter_periodictasks_options'),
]

operations = [
migrations.AddField(
model_name='periodictask',
name='from_configuration',
field=models.BooleanField(
default=False,
editable=False,
help_text=(
'Set automatically when the task is imported from the '
'celery beat_schedule configuration. Edits made here '
'will be reverted on the next celery beat startup. '
'Remove the entry from beat_schedule to manage this '
'task only via the database.'
),
verbose_name='From configuration',
),
),
]
10 changes: 10 additions & 0 deletions django_celery_beat/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,16 @@ class PeriodicTask(models.Model):
verbose_name=_('Enabled'),
help_text=_('Set to False to disable the schedule'),
)
from_configuration = models.BooleanField(
default=False,
editable=False,
verbose_name=_('From configuration'),
help_text=_(
'Set automatically when the task is imported from the celery '
'beat_schedule configuration. Edits made here will be reverted '
'on the next celery beat startup. Remove the entry from '
'beat_schedule to manage this task only via the database.'),
)
last_run_at = models.DateTimeField(
auto_now=False, auto_now_add=False,
editable=False, blank=True, null=True,
Expand Down
39 changes: 35 additions & 4 deletions django_celery_beat/schedulers.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,14 @@ def to_model_schedule(cls, schedule):

@classmethod
def from_entry(cls, name, app=None, **entry):
defaults = cls._unpack_fields(**entry)
defaults['from_configuration'] = True
# Config is the source of truth: re-add to beat_schedule re-enables a
# row we previously auto-disabled, and overrides any manual disable
# done in the admin (the admin warns about this).
defaults['enabled'] = True
obj, created = PeriodicTask._default_manager.update_or_create(
name=name, defaults=cls._unpack_fields(**entry),
name=name, defaults=defaults,
)
return cls(obj, app=app)

Expand Down Expand Up @@ -255,8 +261,10 @@ def __init__(self, *args, **kwargs):
or DEFAULT_MAX_INTERVAL)

def setup_schedule(self):
self.install_default_entries(self.schedule)
self.update_from_dict(self.app.conf.beat_schedule)
installed_names = set()
installed_names |= self.install_default_entries(self.schedule)
installed_names |= self.update_from_dict(self.app.conf.beat_schedule)
self._disable_removed_from_configuration(installed_names)
Comment thread
auvipy marked this conversation as resolved.

def all_as_schedule(self):
debug('DatabaseScheduler: Fetching database schedule')
Expand Down Expand Up @@ -471,17 +479,20 @@ def sync(self):

def update_from_dict(self, mapping):
s = {}
installed = set()
for name, entry_fields in mapping.items():
try:
entry = self.Entry.from_entry(name,
app=self.app,
**entry_fields)
installed.add(name)
if entry.model.enabled:
Comment thread
serl marked this conversation as resolved.
s[name] = entry

except Exception as exc:
logger.exception(ADD_ENTRY_ERROR, name, exc, entry_fields)
self.schedule.update(s)
return installed

def install_default_entries(self, data):
entries = {}
Expand All @@ -493,7 +504,27 @@ def install_default_entries(self, data):
'options': {'expire_seconds': 12 * 3600},
},
)
self.update_from_dict(entries)
return self.update_from_dict(entries)

def _disable_removed_from_configuration(self, installed_names):
"""Disable rows imported from config whose names are no longer there.

Tasks created directly (admin/ORM) have ``from_configuration=False``
and are left untouched. We disable rather than delete so history
(last_run_at, total_run_count) is preserved and admins can review
what disappeared.
"""
qs = self.Model.objects.filter(
from_configuration=True, enabled=True,
).exclude(name__in=installed_names)
removed = list(qs.values_list('name', flat=True))
if removed:
qs.update(enabled=False, last_run_at=None)
PeriodicTasks.update_changed()
info(
'DatabaseScheduler: Disabled %d task(s) removed from '
'configuration: %s', len(removed), ', '.join(removed),
)

def schedules_equal(self, *args, **kwargs):
if self._heap_invalidated:
Expand Down
24 changes: 24 additions & 0 deletions docs/includes/introduction.txt
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,30 @@ You can use the ``enabled`` flag to temporarily disable a periodic task:
>>> periodic_task.save()


Tasks imported from ``CELERY_BEAT_SCHEDULE``
--------------------------------------------

Tasks defined in ``CELERY_BEAT_SCHEDULE`` (or ``app.conf.beat_schedule``) are
imported into the database when the **celery beat** service starts. Each
imported row is flagged with ``from_configuration=True`` so the scheduler can
tell it apart from tasks created directly via the admin or the ORM.

Two consequences follow:

* If you remove an entry from ``CELERY_BEAT_SCHEDULE`` and restart **beat**,
the corresponding ``PeriodicTask`` row is automatically **disabled** (not
deleted) on next startup. Re-adding the entry to ``CELERY_BEAT_SCHEDULE``
and restarting **beat** re-enables the row. Tasks created by hand via the
admin or the ORM are never touched.

* Edits made in the Django admin to a task imported from configuration are
**reverted on the next beat restart**, because the configuration is
re-imported via ``update_or_create`` and ``enabled`` is restored to
``True``. The admin shows a warning banner on the change form for these
tasks. To make changes durable, edit the task in your application
configuration, or remove it from ``CELERY_BEAT_SCHEDULE`` first.


Example running periodic tasks
------------------------------

Expand Down
159 changes: 154 additions & 5 deletions t/unit/test_schedulers.py
Original file line number Diff line number Diff line change
Expand Up @@ -590,16 +590,20 @@ def test_periodic_task_model_enabled_schedule(self):
assert e.model.expires is None
assert e.model.expire_seconds == 12 * 3600

def test_periodic_task_model_disabled_schedule(self):
self.m1.enabled = False
def test_periodic_task_model_disabled_schedule_is_re_enabled(self):
# Disabling a row that is still in beat_schedule does not stick:
# config is the source of truth, so the import re-enables it.
# (The admin warns that edits to imported tasks revert on restart.)
self.m1.save()
PeriodicTask.objects.filter(name=self.entry_name).update(enabled=False)

s = self.Scheduler(app=self.app)
sched = s.schedule
assert sched
assert len(sched) == 1
assert len(sched) == 2
assert 'celery.backend_cleanup' in sched
assert self.entry_name not in sched
assert self.entry_name in sched
task = PeriodicTask.objects.get(name=self.entry_name)
assert task.enabled is True

def test_periodic_task_model_schedule_type_change(self):
self.m1.interval = None
Expand All @@ -611,6 +615,117 @@ def test_periodic_task_model_schedule_type_change(self):
assert self.m1.interval
assert self.m1.crontab is None

def test_imported_task_marked_from_configuration(self):
self.Scheduler(app=self.app)
task = PeriodicTask.objects.get(name=self.entry_name)
assert task.from_configuration is True
backend_cleanup = PeriodicTask.objects.get(
name='celery.backend_cleanup')
assert backend_cleanup.from_configuration is True


@pytest.mark.django_db
class test_DatabaseSchedulerRemovedFromConfiguration(SchedulerCase):
"""Tasks removed from beat_schedule are auto-disabled (#248, #654)."""

Scheduler = TrackingScheduler

@pytest.fixture(autouse=True)
def setup_scheduler(self, app):
self.app = app
self.entry_name, entry = self.create_conf_entry()
self.app.conf.beat_schedule = {self.entry_name: entry}

def test_disables_task_when_removed_from_configuration(self):
scheduler = self.Scheduler(app=self.app)
task = PeriodicTask.objects.get(name=self.entry_name)
assert task.enabled is True
assert task.from_configuration is True
task.last_run_at = timezone.now()
task.save()

# Remove from configuration and reprocess.
self.app.conf.beat_schedule = {}
scheduler.setup_schedule()

# Row preserved, but disabled. last_run_at cleared.
task.refresh_from_db()
assert task.enabled is False
assert task.from_configuration is True
assert task.last_run_at is None
assert self.entry_name not in scheduler.schedule

def test_does_not_touch_orm_created_tasks(self):
# Drop config entirely; only an ORM-created task exists.
self.app.conf.beat_schedule = {}
orm_task = self.create_model_interval(schedule(timedelta(seconds=30)))
orm_task.name = 'orm-only-task'
orm_task.save()

self.Scheduler(app=self.app)

orm_task.refresh_from_db()
assert orm_task.enabled is True
assert orm_task.from_configuration is False

def test_backend_cleanup_disabled_when_result_expires_cleared(self):
self.app.conf.result_expires = 3600
scheduler = self.Scheduler(app=self.app)
cleanup = PeriodicTask.objects.get(name='celery.backend_cleanup')
assert cleanup.enabled is True
assert cleanup.from_configuration is True

# Clear result_expires: install_default_entries no longer adds it,
# so the orphan-disable step disables the existing row.
self.app.conf.result_expires = 0
self.app.conf.beat_schedule = {}
scheduler.setup_schedule()

cleanup.refresh_from_db()
assert cleanup.enabled is False
assert cleanup.from_configuration is True

def test_re_adding_task_to_configuration_re_enables_it(self):
scheduler = self.Scheduler(app=self.app)

# Remove from config and reprocess: orphan-disable kicks in.
original_schedule = dict(self.app.conf.beat_schedule)
self.app.conf.beat_schedule = {}
scheduler.setup_schedule()
task = PeriodicTask.objects.get(name=self.entry_name)
assert task.enabled is False

# Re-add to config and reprocess: row is re-enabled.
self.app.conf.beat_schedule = original_schedule
scheduler.setup_schedule()
task.refresh_from_db()
assert task.enabled is True
assert task.from_configuration is True

def test_invalid_edit_disables_task(self):
# An entry edited to an invalid form (e.g. unrecognized schedule
# type) must NOT silently keep running on the old stale schedule.
# The existing DB row gets disabled; the error is logged.
scheduler = self.Scheduler(app=self.app)
task = PeriodicTask.objects.get(name=self.entry_name)
assert task.enabled is True

broken_entry = dict(
task='djcelery.unittest.add',
schedule=object(),
args=(),
relative=False,
kwargs={},
options={'queue': 'extra_queue'},
)
self.app.conf.beat_schedule = {self.entry_name: broken_entry}
scheduler.setup_schedule()

task.refresh_from_db()
assert task.enabled is False
assert task.from_configuration is True
assert self.entry_name not in scheduler.schedule


@pytest.mark.django_db
class test_DatabaseScheduler(SchedulerCase):
Expand Down Expand Up @@ -1648,6 +1763,40 @@ def mock_apply_async(*args, **kwargs):
assert 'periodic_task_name' in self.captured_headers
assert self.captured_headers['periodic_task_name'] == self.m1.name

def test_changeform_warns_for_from_configuration_task(self, monkeypatch):
# Bypass the parent's template rendering; we only care about the
# message side effect.
monkeypatch.setattr(
'django.contrib.admin.ModelAdmin.changeform_view',
lambda self, request, object_id=None, form_url='',
extra_context=None: None,
)
self.m1.from_configuration = True
self.m1.save()

ma = PeriodicTaskAdmin(PeriodicTask, self.site)
self.request = self.patch_request(self.request_factory.get('/'))
ma.changeform_view(self.request, object_id=str(self.m1.pk))

queued = self.request._messages._queued_messages
assert len(queued) == 1
assert 'CELERY_BEAT_SCHEDULE' in str(queued[0].message)

def test_changeform_no_warning_for_regular_task(self, monkeypatch):
monkeypatch.setattr(
'django.contrib.admin.ModelAdmin.changeform_view',
lambda self, request, object_id=None, form_url='',
extra_context=None: None,
)
# m1 was created without going through config import.
assert self.m1.from_configuration is False

ma = PeriodicTaskAdmin(PeriodicTask, self.site)
self.request = self.patch_request(self.request_factory.get('/'))
ma.changeform_view(self.request, object_id=str(self.m1.pk))

assert self.request._messages._queued_messages == []


@pytest.mark.django_db
class test_timezone_offset_handling:
Expand Down
Loading