Skip to content

Commit 50c8a38

Browse files
authored
Store shared resources in Redis. (#814)
This moves all the resource locking logic into a Lua script executed atomically in Redis.
1 parent dac763d commit 50c8a38

2 files changed

Lines changed: 288 additions & 117 deletions

File tree

images/assets/patches/0033-Adds-use-of-NewPulpcoreWorker-from-pulp_service.patch

Lines changed: 232 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
From f721dcadf3fd422743f7ee83b5e36ef8ca6fd197 Mon Sep 17 00:00:00 2001
22
From: Dennis Kliban <dkliban@redhat.com>
33
Date: Tue, 25 Nov 2025 11:06:48 -0500
4-
Subject: [PATCH 1/9] Adds use of NewPulpcoreWorker from pulp_service.
4+
Subject: [PATCH 01/10] Adds use of NewPulpcoreWorker from pulp_service.
55

66
This also adds Redis based locking for tasks and resources.
77
---
@@ -570,7 +570,7 @@ index c187bd919..3ef86ba19 100644
570570
From 6ac2f55c2aae9ee53da0d5f4cac4384bff84feb7 Mon Sep 17 00:00:00 2001
571571
From: Dennis Kliban <dkliban@redhat.com>
572572
Date: Fri, 12 Dec 2025 21:27:32 -0500
573-
Subject: [PATCH 2/9] fix releasing of locks for immediate tasks
573+
Subject: [PATCH 02/10] fix releasing of locks for immediate tasks
574574

575575
---
576576
pulpcore/tasking/tasks.py | 94 ++++++++++++++++++++++++++++++++++++---
@@ -727,7 +727,7 @@ index 3ef86ba19..6b7466980 100644
727727
From e70783f2bc64a99dda25c829ff133a0d059585e2 Mon Sep 17 00:00:00 2001
728728
From: Dennis Kliban <dkliban@redhat.com>
729729
Date: Fri, 12 Dec 2025 21:51:11 -0500
730-
Subject: [PATCH 3/9] Release locks in execute task
730+
Subject: [PATCH 03/10] Release locks in execute task
731731

732732
---
733733
pulpcore/tasking/tasks.py | 66 ++++++++++++---------------------------
@@ -828,7 +828,7 @@ index 6b7466980..9e9e6f6d7 100644
828828
From 75f249190eeef4acbae366bb2b5392d01643e41a Mon Sep 17 00:00:00 2001
829829
From: Dennis Kliban <dkliban@redhat.com>
830830
Date: Sat, 13 Dec 2025 05:46:49 -0500
831-
Subject: [PATCH 4/9] Make sure locks are released even if exceptions occur
831+
Subject: [PATCH 04/10] Make sure locks are released even if exceptions occur
832832
outside of execute_task.
833833

834834
---
@@ -932,7 +932,7 @@ index 9e9e6f6d7..2d854099c 100644
932932
From ab0cd196500334388f63087714bdb6b235ef7c50 Mon Sep 17 00:00:00 2001
933933
From: Dennis Kliban <dkliban@redhat.com>
934934
Date: Fri, 19 Dec 2025 14:26:52 -0500
935-
Subject: [PATCH 5/9] API workers acquire Task lock before executing task
935+
Subject: [PATCH 05/10] API workers acquire Task lock before executing task
936936

937937
---
938938
pulpcore/tasking/tasks.py | 142 +++++++++++++++++++++++++++-----------
@@ -1113,7 +1113,7 @@ index 2d854099c..3e7ee72d6 100644
11131113
From 5b024ec6e69c53ca897b2697507c0af1b1b70c83 Mon Sep 17 00:00:00 2001
11141114
From: Dennis Kliban <dkliban@redhat.com>
11151115
Date: Fri, 19 Dec 2025 14:59:28 -0500
1116-
Subject: [PATCH 6/9] Improve logging for resource lock release
1116+
Subject: [PATCH 06/10] Improve logging for resource lock release
11171117

11181118
---
11191119
pulpcore/tasking/tasks.py | 10 +++++++---
@@ -1173,7 +1173,7 @@ index 3e7ee72d6..a9e5c4885 100644
11731173
From d384eecf3d640cfb800a8aa7a80f1847b0d87ac1 Mon Sep 17 00:00:00 2001
11741174
From: Dennis Kliban <dkliban@redhat.com>
11751175
Date: Fri, 19 Dec 2025 21:23:28 -0500
1176-
Subject: [PATCH 7/9] Revert the set_guid() in immediate tasks
1176+
Subject: [PATCH 07/10] Revert the set_guid() in immediate tasks
11771177

11781178
---
11791179
pulpcore/tasking/tasks.py | 6 +-----
@@ -1217,7 +1217,7 @@ index a9e5c4885..23a3641ad 100644
12171217
From 46c224559161991c4ae29a537e94065acd18e73b Mon Sep 17 00:00:00 2001
12181218
From: Dennis Kliban <dkliban@redhat.com>
12191219
Date: Fri, 19 Dec 2025 23:00:16 -0500
1220-
Subject: [PATCH 8/9] Log task state after setting to running.
1220+
Subject: [PATCH 08/10] Log task state after setting to running.
12211221

12221222
---
12231223
pulpcore/tasking/tasks.py | 16 ++++++++++++++++
@@ -1264,7 +1264,7 @@ index 23a3641ad..e52be163d 100644
12641264
From 24f1d953a03bf7145e506fbcbabd920f822fead7 Mon Sep 17 00:00:00 2001
12651265
From: Dennis Kliban <dkliban@redhat.com>
12661266
Date: Sun, 21 Dec 2025 07:53:12 -0500
1267-
Subject: [PATCH 9/9] Don't save immediate Tasks a second time when resources
1267+
Subject: [PATCH 09/10] Don't save immediate Tasks a second time when resources
12681268
are not available
12691269

12701270
---
@@ -1314,3 +1314,226 @@ index e52be163d..8725c3055 100644
13141314
--
13151315
2.52.0
13161316

1317+
1318+
From 93f91370dbb4f007598ec9ab2bcddd404f4ac658 Mon Sep 17 00:00:00 2001
1319+
From: Dennis Kliban <dkliban@redhat.com>
1320+
Date: Mon, 22 Dec 2025 14:48:06 -0500
1321+
Subject: [PATCH 10/10] Move exclusive and shared lock acquisitin to a Lua
1322+
script
1323+
1324+
This acquires resource locks using a Lua script which executes in Redis.
1325+
---
1326+
pulpcore/tasking/_util.py | 178 ++++++++++++++++++++++++++++++++++----
1327+
1 file changed, 162 insertions(+), 16 deletions(-)
1328+
1329+
diff --git a/pulpcore/tasking/_util.py b/pulpcore/tasking/_util.py
1330+
index cdc3303ef..d257323f0 100644
1331+
--- a/pulpcore/tasking/_util.py
1332+
+++ b/pulpcore/tasking/_util.py
1333+
@@ -45,6 +45,72 @@ else
1334+
end
1335+
"""
1336+
1337+
+REDIS_ACQUIRE_LOCKS_SCRIPT = """
1338+
+-- KEYS: [exclusive_lock_keys..., shared_lock_keys...]
1339+
+-- ARGV[1]: lock_owner (task identifier)
1340+
+-- ARGV[2]: number of exclusive resources
1341+
+-- ARGV[3...]: exclusive resource names (for error reporting)
1342+
+-- Returns: empty table if success, table of blocked exclusive resource names if failed
1343+
+
1344+
+local num_exclusive = tonumber(ARGV[2])
1345+
+local lock_owner = ARGV[1]
1346+
+local acquired_exclusive = {}
1347+
+local acquired_shared = {}
1348+
+local blocked_resources = {}
1349+
+
1350+
+-- Try to acquire exclusive locks
1351+
+for i = 1, num_exclusive do
1352+
+ local key = KEYS[i]
1353+
+ local resource_name = ARGV[2 + i]
1354+
+
1355+
+ -- Check if lock exists
1356+
+ if redis.call("exists", key) == 1 then
1357+
+ -- Lock already held, add to blocked list
1358+
+ table.insert(blocked_resources, resource_name)
1359+
+ end
1360+
+end
1361+
+
1362+
+-- If any exclusive locks were blocked, don't proceed
1363+
+if #blocked_resources > 0 then
1364+
+ return blocked_resources
1365+
+end
1366+
+
1367+
+-- Check shared resources - ensure no exclusive locks exist
1368+
+for i = num_exclusive + 1, #KEYS do
1369+
+ local key = KEYS[i]
1370+
+ local shared_resource_name = ARGV[2 + i]
1371+
+
1372+
+ -- Check if there's an exclusive lock (string value)
1373+
+ local lock_type = redis.call("type", key)
1374+
+ if lock_type["ok"] == "string" then
1375+
+ -- Exclusive lock exists on a shared resource we need
1376+
+ -- This counts as a blocked resource
1377+
+ table.insert(blocked_resources, shared_resource_name)
1378+
+ end
1379+
+end
1380+
+
1381+
+-- If any shared resources are blocked by exclusive locks, fail
1382+
+if #blocked_resources > 0 then
1383+
+ return blocked_resources
1384+
+end
1385+
+
1386+
+-- All checks passed, acquire the locks
1387+
+for i = 1, num_exclusive do
1388+
+ local key = KEYS[i]
1389+
+ redis.call("set", key, lock_owner)
1390+
+ table.insert(acquired_exclusive, key)
1391+
+end
1392+
+
1393+
+for i = num_exclusive + 1, #KEYS do
1394+
+ local key = KEYS[i]
1395+
+ redis.call("sadd", key, lock_owner)
1396+
+ table.insert(acquired_shared, key)
1397+
+end
1398+
+
1399+
+-- Return empty table to indicate success
1400+
+return {}
1401+
+"""
1402+
+
1403+
1404+
def resource_to_lock_key(resource_name):
1405+
"""
1406+
@@ -59,34 +125,114 @@ def resource_to_lock_key(resource_name):
1407+
return f"{REDIS_LOCK_PREFIX}{resource_name}"
1408+
1409+
1410+
-def release_resource_locks(redis_conn, lock_owner, resources):
1411+
+def acquire_locks(redis_conn, lock_owner, exclusive_resources, shared_resources):
1412+
"""
1413+
- Release Redis distributed locks for the given resources.
1414+
-
1415+
- Uses a Lua script to ensure we only release locks that we own.
1416+
+ Atomically try to acquire exclusive and shared locks for resources.
1417+
1418+
Args:
1419+
redis_conn: Redis connection
1420+
- lock_owner (str): The identifier of the lock owner
1421+
- resources (list): List of resource names to release locks for
1422+
+ lock_owner (str): The identifier of the lock owner (worker/task)
1423+
+ exclusive_resources (list): List of exclusive resource names
1424+
+ shared_resources (list): List of shared resource names
1425+
+
1426+
+ Returns:
1427+
+ list: Empty list if all locks acquired successfully,
1428+
+ list of blocked resource names if acquisition failed
1429+
"""
1430+
if not redis_conn:
1431+
- return
1432+
+ return []
1433+
+
1434+
+ # Sort resources deterministically to prevent deadlocks
1435+
+ exclusive_resources = sorted(exclusive_resources) if exclusive_resources else []
1436+
+ shared_resources = sorted(shared_resources) if shared_resources else []
1437+
+
1438+
+ if not exclusive_resources and not shared_resources:
1439+
+ return []
1440+
+
1441+
+ # Build KEYS list: exclusive lock keys + shared lock keys
1442+
+ keys = []
1443+
+ for resource in exclusive_resources:
1444+
+ keys.append(resource_to_lock_key(resource))
1445+
+ for resource in shared_resources:
1446+
+ keys.append(resource_to_lock_key(resource))
1447+
+
1448+
+ # Build ARGV list: lock_owner, num_exclusive, resource names (for error reporting)
1449+
+ args = [lock_owner, str(len(exclusive_resources))]
1450+
+ args.extend(exclusive_resources)
1451+
+ args.extend(shared_resources)
1452+
+
1453+
+ # Register and execute the Lua script
1454+
+ acquire_script = redis_conn.register_script(REDIS_ACQUIRE_LOCKS_SCRIPT)
1455+
+ try:
1456+
+ blocked_resources = acquire_script(keys=keys, args=args)
1457+
+ # Redis returns list of blocked resources or empty list
1458+
+ return blocked_resources if blocked_resources else []
1459+
+ except Exception as e:
1460+
+ _logger.error("Error acquiring locks: %s", e)
1461+
+ return ["error"] # Return non-empty list to indicate failure
1462+
+
1463+
+
1464+
+def release_shared_resource_locks(redis_conn, lock_owner, shared_resources):
1465+
+ """
1466+
+ Release shared resource locks by removing task from Redis sets.
1467+
1468+
- # Register the unlock script
1469+
- unlock_script = redis_conn.register_script(REDIS_UNLOCK_SCRIPT)
1470+
+ Args:
1471+
+ redis_conn: Redis connection
1472+
+ lock_owner (str): The identifier of the lock owner (task ID)
1473+
+ shared_resources (list): List of shared resource names
1474+
+ """
1475+
+ if not redis_conn or not shared_resources:
1476+
+ return
1477+
1478+
- for resource in resources:
1479+
+ for resource in shared_resources:
1480+
try:
1481+
lock_key = resource_to_lock_key(resource)
1482+
- # Use Lua script to atomically check and delete only if we own the lock
1483+
- released = unlock_script(keys=[lock_key], args=[lock_owner])
1484+
- if released:
1485+
- _logger.debug("Released lock for resource: %s", resource)
1486+
+ # Remove this task from the shared resource set
1487+
+ removed = redis_conn.srem(lock_key, lock_owner)
1488+
+ if removed:
1489+
+ _logger.debug("Released shared resource: %s", resource)
1490+
else:
1491+
- _logger.warning("Lock for resource %s was not owned by %s", resource, lock_owner)
1492+
+ _logger.warning("Shared resource %s did not contain %s", resource, lock_owner)
1493+
except Exception as e:
1494+
- _logger.error("Error releasing lock for resource %s: %s", resource, e)
1495+
+ _logger.error("Error releasing shared resource %s: %s", resource, e)
1496+
+
1497+
+
1498+
+def release_resource_locks(redis_conn, lock_owner, resources, shared_resources=None):
1499+
+ """
1500+
+ Release Redis distributed locks for exclusive and shared resources.
1501+
+
1502+
+ Uses a Lua script to ensure we only release exclusive locks that we own.
1503+
+ Removes task from shared resource sets.
1504+
+
1505+
+ Args:
1506+
+ redis_conn: Redis connection
1507+
+ lock_owner (str): The identifier of the lock owner
1508+
+ resources (list): List of exclusive resource names to release locks for
1509+
+ shared_resources (list): Optional list of shared resource names
1510+
+ """
1511+
+ if not redis_conn:
1512+
+ return
1513+
+
1514+
+ # Release exclusive locks
1515+
+ if resources:
1516+
+ # Register the unlock script
1517+
+ unlock_script = redis_conn.register_script(REDIS_UNLOCK_SCRIPT)
1518+
+
1519+
+ for resource in resources:
1520+
+ try:
1521+
+ lock_key = resource_to_lock_key(resource)
1522+
+ # Use Lua script to atomically check and delete only if we own the lock
1523+
+ released = unlock_script(keys=[lock_key], args=[lock_owner])
1524+
+ if released:
1525+
+ _logger.debug("Released exclusive lock for resource: %s", resource)
1526+
+ else:
1527+
+ _logger.warning("Lock for resource %s was not owned by %s", resource, lock_owner)
1528+
+ except Exception as e:
1529+
+ _logger.error("Error releasing lock for resource %s: %s", resource, e)
1530+
+
1531+
+ # Release shared resources
1532+
+ if shared_resources:
1533+
+ release_shared_resource_locks(redis_conn, lock_owner, shared_resources)
1534+
1535+
1536+
async def async_release_resource_locks(redis_conn, lock_owner, resources):
1537+
--
1538+
2.52.0
1539+

0 commit comments

Comments
 (0)