1- From 30d49b1b5adf0acf72d331b2572f5e54af1c35fb Mon Sep 17 00:00:00 2001
1+ From 52efb38152f8a4386a7e289cd716741ff6f59f66 Mon Sep 17 00:00:00 2001
22From: Dennis Kliban <dkliban@redhat.com>
33Date: Tue, 25 Nov 2025 11:06:48 -0500
44Subject: [PATCH] Adds 'redis' WORKER_TYPE
@@ -13,12 +13,12 @@ PostgreSQL database.
1313 pulpcore/app/viewsets/task.py | 22 +-
1414 pulpcore/tasking/_util.py | 9 +-
1515 pulpcore/tasking/entrypoint.py | 13 +-
16- pulpcore/tasking/redis_locks.py | 245 ++++++
17- pulpcore/tasking/redis_tasks.py | 607 ++++++++++++++
16+ pulpcore/tasking/redis_locks.py | 263 ++++++
17+ pulpcore/tasking/redis_tasks.py | 631 ++++++++++++++
1818 pulpcore/tasking/redis_worker.py | 778 ++++++++++++++++++
1919 pulpcore/tasking/tasks.py | 17 +
2020 pulpcore/tests/functional/api/test_tasking.py | 288 +++++++
21- 11 files changed, 2032 insertions(+), 12 deletions(-)
21+ 11 files changed, 2074 insertions(+), 12 deletions(-)
2222 create mode 100644 pulpcore/tasking/redis_locks.py
2323 create mode 100644 pulpcore/tasking/redis_tasks.py
2424 create mode 100644 pulpcore/tasking/redis_worker.py
@@ -244,10 +244,10 @@ index fdc9b0de8..a3eb7e01c 100644
244244+ raise ValueError(f"Invalid WORKER_TYPE: {worker_type}. Must be 'pulpcore' or 'redis'.")
245245diff --git a/pulpcore/tasking/redis_locks.py b/pulpcore/tasking/redis_locks.py
246246new file mode 100644
247- index 000000000..84e8c6a4b
247+ index 000000000..0ddc6dfff
248248--- /dev/null
249249+++ b/pulpcore/tasking/redis_locks.py
250- @@ -0,0 +1,245 @@
250+ @@ -0,0 +1,263 @@
251251+ """
252252+ Redis distributed lock utilities for task resource coordination.
253253+
@@ -465,40 +465,58 @@ index 000000000..84e8c6a4b
465465+ release_shared_resource_locks(redis_conn, lock_owner, shared_resources)
466466+
467467+
468- + async def async_release_resource_locks(redis_conn, lock_owner, resources):
468+ + async def async_release_resource_locks(redis_conn, lock_owner, resources, shared_resources=None ):
469469+ """
470- + Async version: Release Redis distributed locks for the given resources.
470+ + Async version: Release Redis distributed locks for exclusive and shared resources.
471471+
472- + Uses a Lua script to ensure we only release locks that we own.
472+ + Uses a Lua script to ensure we only release exclusive locks that we own.
473+ + Removes task from shared resource sets.
473474+
474475+ Args:
475476+ redis_conn: Redis connection
476477+ lock_owner (str): The identifier of the lock owner
477- + resources (list): List of resource names to release locks for
478+ + resources (list): List of exclusive resource names to release locks for
479+ + shared_resources (list): Optional list of shared resource names
478480+ """
479481+ if not redis_conn:
480482+ return
481483+
482- + # Register the unlock script
483- + unlock_script = await sync_to_async(redis_conn.register_script)(REDIS_UNLOCK_SCRIPT)
484+ + # Release exclusive locks
485+ + if resources:
486+ + # Register the unlock script
487+ + unlock_script = await sync_to_async(redis_conn.register_script)(REDIS_UNLOCK_SCRIPT)
484488+
485- + for resource in resources:
486- + try:
487- + lock_key = resource_to_lock_key(resource)
488- + # Use Lua script to atomically check and delete only if we own the lock
489- + released = await sync_to_async(unlock_script)(keys=[lock_key], args=[lock_owner])
490- + if released:
491- + _logger.debug("Released lock for resource: %s", resource)
492- + else:
493- + _logger.warning("Lock for resource %s was not owned by %s", resource, lock_owner)
494- + except Exception as e:
495- + _logger.error("Error releasing lock for resource %s: %s", resource, e)
489+ + for resource in resources:
490+ + try:
491+ + lock_key = resource_to_lock_key(resource)
492+ + # Use Lua script to atomically check and delete only if we own the lock
493+ + released = await sync_to_async(unlock_script)(keys=[lock_key], args=[lock_owner])
494+ + if released:
495+ + _logger.debug("Released exclusive lock for resource: %s", resource)
496+ + else:
497+ + _logger.warning("Lock for resource %s was not owned by %s", resource, lock_owner)
498+ + except Exception as e:
499+ + _logger.error("Error releasing lock for resource %s: %s", resource, e)
500+ +
501+ + # Release shared resources
502+ + if shared_resources:
503+ + for resource in shared_resources:
504+ + try:
505+ + lock_key = resource_to_lock_key(resource)
506+ + # Remove this task from the shared resource set
507+ + removed = await sync_to_async(redis_conn.srem)(lock_key, lock_owner)
508+ + if removed:
509+ + _logger.debug("Released shared resource: %s", resource)
510+ + else:
511+ + _logger.warning("Shared resource %s did not contain %s", resource, lock_owner)
512+ + except Exception as e:
513+ + _logger.error("Error releasing shared resource %s: %s", resource, e)
496514diff --git a/pulpcore/tasking/redis_tasks.py b/pulpcore/tasking/redis_tasks.py
497515new file mode 100644
498- index 000000000..8c759c5db
516+ index 000000000..cc950f1db
499517--- /dev/null
500518+++ b/pulpcore/tasking/redis_tasks.py
501- @@ -0,0 +1,607 @@
519+ @@ -0,0 +1,631 @@
502520+ """
503521+ Task dispatch functions for Redis-based worker implementation.
504522+
@@ -726,20 +744,32 @@ index 000000000..8c759c5db
726744+ return None
727745+ finally:
728746+ # Release Redis locks if this was an immediate task
729- + if hasattr(task, '_locked_resources') and task._locked_resources:
747+ + exclusive_resources = getattr(task, '_locked_resources', None)
748+ + shared_resources = getattr(task, '_locked_shared_resources', None)
749+ +
750+ + if exclusive_resources or shared_resources:
730751+ current_app = AppStatus.objects.current()
731752+ redis_conn = get_redis_connection()
732753+ lock_owner = current_app.name if current_app else f"immediate-{task.pk}"
733754+ _logger.info(
734- + "RESOURCE LOCK RELEASE: Task %s releasing resource locks with owner=%s (AppStatus.current=%s) for resources: %s ",
755+ + "RESOURCE LOCK RELEASE: Task %s releasing %d exclusive and %d shared resource locks with owner=%s (AppStatus.current=%s)",
735756+ task.pk,
757+ + len(exclusive_resources) if exclusive_resources else 0,
758+ + len(shared_resources) if shared_resources else 0,
759+ + lock_owner,
760+ + current_app.name if current_app else "None"
761+ + )
762+ + release_resource_locks(
763+ + redis_conn,
736764+ lock_owner,
737- + current_app.name if current_app else "None" ,
738- + task._locked_resources
765+ + exclusive_resources or [] ,
766+ + shared_resources or []
739767+ )
740- + release_resource_locks(redis_conn, lock_owner, task._locked_resources)
741- + # Clear the attribute so worker knows locks were released
742- + del task._locked_resources
768+ + # Clear the attributes so worker knows locks were released
769+ + if hasattr(task, '_locked_resources'):
770+ + del task._locked_resources
771+ + if hasattr(task, '_locked_shared_resources'):
772+ + del task._locked_shared_resources
743773+
744774+
745775+ async def aexecute_task(task):
@@ -784,20 +814,32 @@ index 000000000..8c759c5db
784814+ return None
785815+ finally:
786816+ # Release Redis locks if this was an immediate task
787- + if hasattr(task, '_locked_resources') and task._locked_resources:
817+ + exclusive_resources = getattr(task, '_locked_resources', None)
818+ + shared_resources = getattr(task, '_locked_shared_resources', None)
819+ +
820+ + if exclusive_resources or shared_resources:
788821+ current_app = await sync_to_async(AppStatus.objects.current)()
789822+ redis_conn = get_redis_connection()
790823+ lock_owner = current_app.name if current_app else f"immediate-{task.pk}"
791824+ _logger.info(
792- + "RESOURCE LOCK RELEASE (async): Task %s releasing resource locks with owner=%s (AppStatus.current=%s) for resources: %s ",
825+ + "RESOURCE LOCK RELEASE (async): Task %s releasing %d exclusive and %d shared resource locks with owner=%s (AppStatus.current=%s)",
793826+ task.pk,
827+ + len(exclusive_resources) if exclusive_resources else 0,
828+ + len(shared_resources) if shared_resources else 0,
829+ + lock_owner,
830+ + current_app.name if current_app else "None"
831+ + )
832+ + await async_release_resource_locks(
833+ + redis_conn,
794834+ lock_owner,
795- + current_app.name if current_app else "None" ,
796- + task._locked_resources
835+ + exclusive_resources or [] ,
836+ + shared_resources or []
797837+ )
798- + await async_release_resource_locks(redis_conn, lock_owner, task._locked_resources)
799- + # Clear the attribute so worker knows locks were released
800- + del task._locked_resources
838+ + # Clear the attributes so worker knows locks were released
839+ + if hasattr(task, '_locked_resources'):
840+ + del task._locked_resources
841+ + if hasattr(task, '_locked_shared_resources'):
842+ + del task._locked_shared_resources
801843+
802844+
803845+ def are_resources_available(colliding_resources, task: Task) -> bool:
0 commit comments