Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions django/conf/global_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -672,3 +672,8 @@ def gettext_noop(s):
# HTTPS as the default protocol in urlize and urlizetrunc when no protocol is
# provided. Set to True to assume HTTPS during the Django 6.x release cycle.
URLIZE_ASSUME_HTTPS = False

#########
# TASKS #
#########
TASKS = {"default": {"BACKEND": "django.tasks.backends.immediate.ImmediateBackend"}}
45 changes: 45 additions & 0 deletions django/tasks/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from django.utils.connection import BaseConnectionHandler, ConnectionProxy
from django.utils.module_loading import import_string

from . import checks, signals # NOQA
from .base import (
DEFAULT_TASK_BACKEND_ALIAS,
DEFAULT_TASK_QUEUE_NAME,
Task,
TaskContext,
TaskResult,
TaskResultStatus,
task,
)
from .exceptions import InvalidTaskBackend

__all__ = [
"DEFAULT_TASK_BACKEND_ALIAS",
"DEFAULT_TASK_QUEUE_NAME",
"default_task_backend",
"task",
"task_backends",
"Task",
"TaskContext",
"TaskResult",
"TaskResultStatus",
]


class TaskBackendHandler(BaseConnectionHandler):
settings_name = "TASKS"
exception_class = InvalidTaskBackend

def create_connection(self, alias):
params = self.settings[alias]
backend = params["BACKEND"]
try:
backend_cls = import_string(backend)
except ImportError as e:
raise InvalidTaskBackend(f"Could not find backend '{backend}': {e}") from e
return backend_cls(alias=alias, params=params)


task_backends = TaskBackendHandler()

default_task_backend = ConnectionProxy(task_backends, DEFAULT_TASK_BACKEND_ALIAS)
Empty file.
138 changes: 138 additions & 0 deletions django/tasks/backends/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
from abc import ABCMeta, abstractmethod
from inspect import iscoroutinefunction

from asgiref.sync import sync_to_async

from django.conf import settings
from django.core import checks
from django.db import connections
from django.tasks import DEFAULT_TASK_QUEUE_NAME
from django.tasks.base import (
DEFAULT_TASK_PRIORITY,
TASK_MAX_PRIORITY,
TASK_MIN_PRIORITY,
Task,
)
from django.tasks.exceptions import InvalidTask
from django.utils import timezone
from django.utils.inspect import get_func_args, is_module_level_function


class BaseTaskBackend(metaclass=ABCMeta):
task_class = Task

# Does the backend support Tasks to be enqueued with the run_after
# attribute?
supports_defer = False

# Does the backend support coroutines to be enqueued?
supports_async_task = False

# Does the backend support results being retrieved (from any
# thread/process)?
supports_get_result = False

# Does the backend support executing Tasks in a given
# priority order?
supports_priority = False

def __init__(self, alias, params):
self.alias = alias
self.queues = set(params.get("QUEUES", [DEFAULT_TASK_QUEUE_NAME]))
self.enqueue_on_commit = bool(params.get("ENQUEUE_ON_COMMIT", True))
self.options = params.get("OPTIONS", {})

def _get_enqueue_on_commit_for_task(self, task):
return (
task.enqueue_on_commit
if task.enqueue_on_commit is not None
else self.enqueue_on_commit
)

def validate_task(self, task):
"""
Determine whether the provided Task can be executed by the backend.
"""
if not is_module_level_function(task.func):
raise InvalidTask("Task function must be defined at a module level.")

if not self.supports_async_task and iscoroutinefunction(task.func):
raise InvalidTask("Backend does not support async Tasks.")

task_func_args = get_func_args(task.func)
if task.takes_context and (
not task_func_args or task_func_args[0] != "context"
):
raise InvalidTask(
"Task takes context but does not have a first argument of 'context'."
)

if not self.supports_priority and task.priority != DEFAULT_TASK_PRIORITY:
raise InvalidTask("Backend does not support setting priority of tasks.")
if (
task.priority < TASK_MIN_PRIORITY
or task.priority > TASK_MAX_PRIORITY
or int(task.priority) != task.priority
):
raise InvalidTask(
f"priority must be a whole number between {TASK_MIN_PRIORITY} and "
f"{TASK_MAX_PRIORITY}."
)

if not self.supports_defer and task.run_after is not None:
raise InvalidTask("Backend does not support run_after.")

if (
settings.USE_TZ
and task.run_after is not None
and not timezone.is_aware(task.run_after)
):
raise InvalidTask("run_after must be an aware datetime.")

if self.queues and task.queue_name not in self.queues:
raise InvalidTask(f"Queue '{task.queue_name}' is not valid for backend.")

@abstractmethod
def enqueue(self, task, args, kwargs):
"""Queue up a task to be executed."""

async def aenqueue(self, task, args, kwargs):
"""Queue up a task function (or coroutine) to be executed."""
return await sync_to_async(self.enqueue, thread_sensitive=True)(
task=task, args=args, kwargs=kwargs
)

def get_result(self, result_id):
"""
Retrieve a task result by id.

Raise TaskResultDoesNotExist if such result does not exist.
"""
raise NotImplementedError(
"This backend does not support retrieving or refreshing results."
)

async def aget_result(self, result_id):
"""See get_result()."""
return await sync_to_async(self.get_result, thread_sensitive=True)(
result_id=result_id
)

def check(self, **kwargs):
if self.enqueue_on_commit and not connections._settings:
yield checks.Error(
"ENQUEUE_ON_COMMIT cannot be used when no databases are configured.",
hint="Set ENQUEUE_ON_COMMIT to False",
id="tasks.E001",
)

elif (
self.enqueue_on_commit
and not connections["default"].features.supports_transactions
):
yield checks.Error(
"ENQUEUE_ON_COMMIT cannot be used on a database which doesn't support "
"transactions.",
hint="Set ENQUEUE_ON_COMMIT to False",
id="tasks.E002",
)
69 changes: 69 additions & 0 deletions django/tasks/backends/dummy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
from copy import deepcopy
from functools import partial

from django.db import transaction
from django.tasks.base import TaskResult, TaskResultStatus
from django.tasks.exceptions import TaskResultDoesNotExist
from django.tasks.signals import task_enqueued
from django.utils import timezone
from django.utils.crypto import get_random_string

from .base import BaseTaskBackend


class DummyBackend(BaseTaskBackend):
supports_defer = True
supports_async_task = True
supports_priority = True

def __init__(self, alias, params):
super().__init__(alias, params)
self.results = []

def _store_result(self, result):
object.__setattr__(result, "enqueued_at", timezone.now())
self.results.append(result)
task_enqueued.send(type(self), task_result=result)

def enqueue(self, task, args, kwargs):
self.validate_task(task)

result = TaskResult(
task=task,
id=get_random_string(32),
status=TaskResultStatus.READY,
enqueued_at=None,
started_at=None,
last_attempted_at=None,
finished_at=None,
args=args,
kwargs=kwargs,
backend=self.alias,
errors=[],
worker_ids=[],
)

if self._get_enqueue_on_commit_for_task(task) is not False:
transaction.on_commit(partial(self._store_result, result))
else:
self._store_result(result)

# Copy the task to prevent mutation issues.
return deepcopy(result)

def get_result(self, result_id):
# Results are only scoped to the current thread, hence
# supports_get_result is False.
try:
return next(result for result in self.results if result.id == result_id)
except StopIteration:
raise TaskResultDoesNotExist(result_id) from None

async def aget_result(self, result_id):
try:
return next(result for result in self.results if result.id == result_id)
except StopIteration:
raise TaskResultDoesNotExist(result_id) from None

def clear(self):
self.results.clear()
100 changes: 100 additions & 0 deletions django/tasks/backends/immediate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import logging
from functools import partial
from traceback import format_exception

from django.db import transaction
from django.tasks.base import TaskContext, TaskError, TaskResult, TaskResultStatus
from django.tasks.signals import task_enqueued, task_finished, task_started
from django.utils import timezone
from django.utils.crypto import get_random_string
from django.utils.json import normalize_json

from .base import BaseTaskBackend

logger = logging.getLogger(__name__)


class ImmediateBackend(BaseTaskBackend):
supports_async_task = True
supports_priority = True

def __init__(self, alias, params):
super().__init__(alias, params)
self.worker_id = get_random_string(32)

def _execute_task(self, task_result):
"""
Execute the Task for the given TaskResult, mutating it with the
outcome.
"""
object.__setattr__(task_result, "enqueued_at", timezone.now())
task_enqueued.send(type(self), task_result=task_result)

task = task_result.task
task_start_time = timezone.now()
object.__setattr__(task_result, "status", TaskResultStatus.RUNNING)
object.__setattr__(task_result, "started_at", task_start_time)
object.__setattr__(task_result, "last_attempted_at", task_start_time)
task_result.worker_ids.append(self.worker_id)
task_started.send(sender=type(self), task_result=task_result)

try:
if task.takes_context:
raw_return_value = task.call(
TaskContext(task_result=task_result),
*task_result.args,
**task_result.kwargs,
)
else:
raw_return_value = task.call(*task_result.args, **task_result.kwargs)

object.__setattr__(
task_result,
"_return_value",
normalize_json(raw_return_value),
)
except KeyboardInterrupt:
# If the user tried to terminate, let them
raise
except BaseException as e:
object.__setattr__(task_result, "finished_at", timezone.now())
exception_type = type(e)
task_result.errors.append(
TaskError(
exception_class_path=(
f"{exception_type.__module__}.{exception_type.__qualname__}"
),
traceback="".join(format_exception(e)),
)
)
object.__setattr__(task_result, "status", TaskResultStatus.FAILED)
task_finished.send(type(self), task_result=task_result)
else:
object.__setattr__(task_result, "finished_at", timezone.now())
object.__setattr__(task_result, "status", TaskResultStatus.SUCCESSFUL)
task_finished.send(type(self), task_result=task_result)

def enqueue(self, task, args, kwargs):
self.validate_task(task)

task_result = TaskResult(
task=task,
id=get_random_string(32),
status=TaskResultStatus.READY,
enqueued_at=None,
started_at=None,
last_attempted_at=None,
finished_at=None,
args=args,
kwargs=kwargs,
backend=self.alias,
errors=[],
worker_ids=[],
)

if self._get_enqueue_on_commit_for_task(task) is not False:
transaction.on_commit(partial(self._execute_task, task_result))
else:
self._execute_task(task_result)

return task_result
Loading
Loading