Skip to content

Commit 4289966

Browse files
RealOrangeOnenessita
authored andcommitted
Fixed #35859 -- Added background Tasks framework interface.
This work implements what was defined in DEP 14 (https://github.com/django/deps/blob/main/accepted/0014-background-workers.rst). Thanks to Raphael Gaschignard, Eric Holscher, Ran Benita, Sarah Boyce, Jacob Walls, and Natalia Bidart for the reviews.
1 parent 218f69f commit 4289966

29 files changed

Lines changed: 3094 additions & 0 deletions

django/conf/global_settings.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -672,3 +672,8 @@ def gettext_noop(s):
672672
# HTTPS as the default protocol in urlize and urlizetrunc when no protocol is
673673
# provided. Set to True to assume HTTPS during the Django 6.x release cycle.
674674
URLIZE_ASSUME_HTTPS = False
675+
676+
#########
677+
# TASKS #
678+
#########
679+
TASKS = {"default": {"BACKEND": "django.tasks.backends.immediate.ImmediateBackend"}}

django/tasks/__init__.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
from django.utils.connection import BaseConnectionHandler, ConnectionProxy
2+
from django.utils.module_loading import import_string
3+
4+
from . import checks, signals # NOQA
5+
from .base import (
6+
DEFAULT_TASK_BACKEND_ALIAS,
7+
DEFAULT_TASK_QUEUE_NAME,
8+
Task,
9+
TaskContext,
10+
TaskResult,
11+
TaskResultStatus,
12+
task,
13+
)
14+
from .exceptions import InvalidTaskBackend
15+
16+
__all__ = [
17+
"DEFAULT_TASK_BACKEND_ALIAS",
18+
"DEFAULT_TASK_QUEUE_NAME",
19+
"default_task_backend",
20+
"task",
21+
"task_backends",
22+
"Task",
23+
"TaskContext",
24+
"TaskResult",
25+
"TaskResultStatus",
26+
]
27+
28+
29+
class TaskBackendHandler(BaseConnectionHandler):
30+
settings_name = "TASKS"
31+
exception_class = InvalidTaskBackend
32+
33+
def create_connection(self, alias):
34+
params = self.settings[alias]
35+
backend = params["BACKEND"]
36+
try:
37+
backend_cls = import_string(backend)
38+
except ImportError as e:
39+
raise InvalidTaskBackend(f"Could not find backend '{backend}': {e}") from e
40+
return backend_cls(alias=alias, params=params)
41+
42+
43+
task_backends = TaskBackendHandler()
44+
45+
default_task_backend = ConnectionProxy(task_backends, DEFAULT_TASK_BACKEND_ALIAS)

django/tasks/backends/__init__.py

Whitespace-only changes.

django/tasks/backends/base.py

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
from abc import ABCMeta, abstractmethod
2+
from inspect import iscoroutinefunction
3+
4+
from asgiref.sync import sync_to_async
5+
6+
from django.conf import settings
7+
from django.core import checks
8+
from django.db import connections
9+
from django.tasks import DEFAULT_TASK_QUEUE_NAME
10+
from django.tasks.base import (
11+
DEFAULT_TASK_PRIORITY,
12+
TASK_MAX_PRIORITY,
13+
TASK_MIN_PRIORITY,
14+
Task,
15+
)
16+
from django.tasks.exceptions import InvalidTask
17+
from django.utils import timezone
18+
from django.utils.inspect import get_func_args, is_module_level_function
19+
20+
21+
class BaseTaskBackend(metaclass=ABCMeta):
22+
task_class = Task
23+
24+
# Does the backend support Tasks to be enqueued with the run_after
25+
# attribute?
26+
supports_defer = False
27+
28+
# Does the backend support coroutines to be enqueued?
29+
supports_async_task = False
30+
31+
# Does the backend support results being retrieved (from any
32+
# thread/process)?
33+
supports_get_result = False
34+
35+
# Does the backend support executing Tasks in a given
36+
# priority order?
37+
supports_priority = False
38+
39+
def __init__(self, alias, params):
40+
self.alias = alias
41+
self.queues = set(params.get("QUEUES", [DEFAULT_TASK_QUEUE_NAME]))
42+
self.enqueue_on_commit = bool(params.get("ENQUEUE_ON_COMMIT", True))
43+
self.options = params.get("OPTIONS", {})
44+
45+
def _get_enqueue_on_commit_for_task(self, task):
46+
return (
47+
task.enqueue_on_commit
48+
if task.enqueue_on_commit is not None
49+
else self.enqueue_on_commit
50+
)
51+
52+
def validate_task(self, task):
53+
"""
54+
Determine whether the provided Task can be executed by the backend.
55+
"""
56+
if not is_module_level_function(task.func):
57+
raise InvalidTask("Task function must be defined at a module level.")
58+
59+
if not self.supports_async_task and iscoroutinefunction(task.func):
60+
raise InvalidTask("Backend does not support async Tasks.")
61+
62+
task_func_args = get_func_args(task.func)
63+
if task.takes_context and (
64+
not task_func_args or task_func_args[0] != "context"
65+
):
66+
raise InvalidTask(
67+
"Task takes context but does not have a first argument of 'context'."
68+
)
69+
70+
if not self.supports_priority and task.priority != DEFAULT_TASK_PRIORITY:
71+
raise InvalidTask("Backend does not support setting priority of tasks.")
72+
if (
73+
task.priority < TASK_MIN_PRIORITY
74+
or task.priority > TASK_MAX_PRIORITY
75+
or int(task.priority) != task.priority
76+
):
77+
raise InvalidTask(
78+
f"priority must be a whole number between {TASK_MIN_PRIORITY} and "
79+
f"{TASK_MAX_PRIORITY}."
80+
)
81+
82+
if not self.supports_defer and task.run_after is not None:
83+
raise InvalidTask("Backend does not support run_after.")
84+
85+
if (
86+
settings.USE_TZ
87+
and task.run_after is not None
88+
and not timezone.is_aware(task.run_after)
89+
):
90+
raise InvalidTask("run_after must be an aware datetime.")
91+
92+
if self.queues and task.queue_name not in self.queues:
93+
raise InvalidTask(f"Queue '{task.queue_name}' is not valid for backend.")
94+
95+
@abstractmethod
96+
def enqueue(self, task, args, kwargs):
97+
"""Queue up a task to be executed."""
98+
99+
async def aenqueue(self, task, args, kwargs):
100+
"""Queue up a task function (or coroutine) to be executed."""
101+
return await sync_to_async(self.enqueue, thread_sensitive=True)(
102+
task=task, args=args, kwargs=kwargs
103+
)
104+
105+
def get_result(self, result_id):
106+
"""
107+
Retrieve a task result by id.
108+
109+
Raise TaskResultDoesNotExist if such result does not exist.
110+
"""
111+
raise NotImplementedError(
112+
"This backend does not support retrieving or refreshing results."
113+
)
114+
115+
async def aget_result(self, result_id):
116+
"""See get_result()."""
117+
return await sync_to_async(self.get_result, thread_sensitive=True)(
118+
result_id=result_id
119+
)
120+
121+
def check(self, **kwargs):
122+
if self.enqueue_on_commit and not connections._settings:
123+
yield checks.Error(
124+
"ENQUEUE_ON_COMMIT cannot be used when no databases are configured.",
125+
hint="Set ENQUEUE_ON_COMMIT to False",
126+
id="tasks.E001",
127+
)
128+
129+
elif (
130+
self.enqueue_on_commit
131+
and not connections["default"].features.supports_transactions
132+
):
133+
yield checks.Error(
134+
"ENQUEUE_ON_COMMIT cannot be used on a database which doesn't support "
135+
"transactions.",
136+
hint="Set ENQUEUE_ON_COMMIT to False",
137+
id="tasks.E002",
138+
)

django/tasks/backends/dummy.py

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
from copy import deepcopy
2+
from functools import partial
3+
4+
from django.db import transaction
5+
from django.tasks.base import TaskResult, TaskResultStatus
6+
from django.tasks.exceptions import TaskResultDoesNotExist
7+
from django.tasks.signals import task_enqueued
8+
from django.utils import timezone
9+
from django.utils.crypto import get_random_string
10+
11+
from .base import BaseTaskBackend
12+
13+
14+
class DummyBackend(BaseTaskBackend):
15+
supports_defer = True
16+
supports_async_task = True
17+
supports_priority = True
18+
19+
def __init__(self, alias, params):
20+
super().__init__(alias, params)
21+
self.results = []
22+
23+
def _store_result(self, result):
24+
object.__setattr__(result, "enqueued_at", timezone.now())
25+
self.results.append(result)
26+
task_enqueued.send(type(self), task_result=result)
27+
28+
def enqueue(self, task, args, kwargs):
29+
self.validate_task(task)
30+
31+
result = TaskResult(
32+
task=task,
33+
id=get_random_string(32),
34+
status=TaskResultStatus.READY,
35+
enqueued_at=None,
36+
started_at=None,
37+
last_attempted_at=None,
38+
finished_at=None,
39+
args=args,
40+
kwargs=kwargs,
41+
backend=self.alias,
42+
errors=[],
43+
worker_ids=[],
44+
)
45+
46+
if self._get_enqueue_on_commit_for_task(task) is not False:
47+
transaction.on_commit(partial(self._store_result, result))
48+
else:
49+
self._store_result(result)
50+
51+
# Copy the task to prevent mutation issues.
52+
return deepcopy(result)
53+
54+
def get_result(self, result_id):
55+
# Results are only scoped to the current thread, hence
56+
# supports_get_result is False.
57+
try:
58+
return next(result for result in self.results if result.id == result_id)
59+
except StopIteration:
60+
raise TaskResultDoesNotExist(result_id) from None
61+
62+
async def aget_result(self, result_id):
63+
try:
64+
return next(result for result in self.results if result.id == result_id)
65+
except StopIteration:
66+
raise TaskResultDoesNotExist(result_id) from None
67+
68+
def clear(self):
69+
self.results.clear()

django/tasks/backends/immediate.py

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
import logging
2+
from functools import partial
3+
from traceback import format_exception
4+
5+
from django.db import transaction
6+
from django.tasks.base import TaskContext, TaskError, TaskResult, TaskResultStatus
7+
from django.tasks.signals import task_enqueued, task_finished, task_started
8+
from django.utils import timezone
9+
from django.utils.crypto import get_random_string
10+
from django.utils.json import normalize_json
11+
12+
from .base import BaseTaskBackend
13+
14+
logger = logging.getLogger(__name__)
15+
16+
17+
class ImmediateBackend(BaseTaskBackend):
18+
supports_async_task = True
19+
supports_priority = True
20+
21+
def __init__(self, alias, params):
22+
super().__init__(alias, params)
23+
self.worker_id = get_random_string(32)
24+
25+
def _execute_task(self, task_result):
26+
"""
27+
Execute the Task for the given TaskResult, mutating it with the
28+
outcome.
29+
"""
30+
object.__setattr__(task_result, "enqueued_at", timezone.now())
31+
task_enqueued.send(type(self), task_result=task_result)
32+
33+
task = task_result.task
34+
task_start_time = timezone.now()
35+
object.__setattr__(task_result, "status", TaskResultStatus.RUNNING)
36+
object.__setattr__(task_result, "started_at", task_start_time)
37+
object.__setattr__(task_result, "last_attempted_at", task_start_time)
38+
task_result.worker_ids.append(self.worker_id)
39+
task_started.send(sender=type(self), task_result=task_result)
40+
41+
try:
42+
if task.takes_context:
43+
raw_return_value = task.call(
44+
TaskContext(task_result=task_result),
45+
*task_result.args,
46+
**task_result.kwargs,
47+
)
48+
else:
49+
raw_return_value = task.call(*task_result.args, **task_result.kwargs)
50+
51+
object.__setattr__(
52+
task_result,
53+
"_return_value",
54+
normalize_json(raw_return_value),
55+
)
56+
except KeyboardInterrupt:
57+
# If the user tried to terminate, let them
58+
raise
59+
except BaseException as e:
60+
object.__setattr__(task_result, "finished_at", timezone.now())
61+
exception_type = type(e)
62+
task_result.errors.append(
63+
TaskError(
64+
exception_class_path=(
65+
f"{exception_type.__module__}.{exception_type.__qualname__}"
66+
),
67+
traceback="".join(format_exception(e)),
68+
)
69+
)
70+
object.__setattr__(task_result, "status", TaskResultStatus.FAILED)
71+
task_finished.send(type(self), task_result=task_result)
72+
else:
73+
object.__setattr__(task_result, "finished_at", timezone.now())
74+
object.__setattr__(task_result, "status", TaskResultStatus.SUCCESSFUL)
75+
task_finished.send(type(self), task_result=task_result)
76+
77+
def enqueue(self, task, args, kwargs):
78+
self.validate_task(task)
79+
80+
task_result = TaskResult(
81+
task=task,
82+
id=get_random_string(32),
83+
status=TaskResultStatus.READY,
84+
enqueued_at=None,
85+
started_at=None,
86+
last_attempted_at=None,
87+
finished_at=None,
88+
args=args,
89+
kwargs=kwargs,
90+
backend=self.alias,
91+
errors=[],
92+
worker_ids=[],
93+
)
94+
95+
if self._get_enqueue_on_commit_for_task(task) is not False:
96+
transaction.on_commit(partial(self._execute_task, task_result))
97+
else:
98+
self._execute_task(task_result)
99+
100+
return task_result

0 commit comments

Comments
 (0)