Skip to content

Commit 46066f7

Browse files
authored
Add nx and xx flags to save() for conditional persistence (#746)
* Add nx and xx flags to save() for conditional persistence Adds support for Redis NX (only set if not exists) and XX (only set if exists) semantics to the save() method on both HashModel and JsonModel. - save(nx=True): Only saves if the key does NOT exist (insert-only) - save(xx=True): Only saves if the key already exists (update-only) - Returns None if the condition is not met, the model instance otherwise For JsonModel, this uses JSON.SET's native NX/XX support. For HashModel, this uses an EXISTS check before HSET. Closes #703 * Add pipeline tests for nx/xx and prevent HashModel nx/xx with pipeline - HashModel raises ValueError when nx/xx used with pipeline (not atomic) - JsonModel supports nx/xx with pipeline (JSON.SET is atomic) - Added test coverage for these behaviors * fix docs GHA workflow syntax
1 parent 4c37519 commit 46066f7

4 files changed

Lines changed: 301 additions & 13 deletions

File tree

.github/workflows/docs-pages.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
name: Docs: Build and deploy MkDocs site
1+
name: Docs Build and deploy MkDocs site
22

33
on:
44
push:

aredis_om/model/model.py

Lines changed: 69 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2378,8 +2378,25 @@ async def update(self, **field_values):
23782378
raise NotImplementedError
23792379

23802380
async def save(
2381-
self: "Model", pipeline: Optional[redis.client.Pipeline] = None
2382-
) -> "Model":
2381+
self: "Model",
2382+
pipeline: Optional[redis.client.Pipeline] = None,
2383+
nx: bool = False,
2384+
xx: bool = False,
2385+
) -> Optional["Model"]:
2386+
"""Save the model instance to Redis.
2387+
2388+
Args:
2389+
pipeline: Optional Redis pipeline for batching operations.
2390+
nx: If True, only save if the key does NOT exist (insert-only).
2391+
xx: If True, only save if the key already exists (update-only).
2392+
2393+
Returns:
2394+
The model instance if saved successfully, None if nx/xx condition
2395+
was not met.
2396+
2397+
Raises:
2398+
ValueError: If both nx and xx are True.
2399+
"""
23832400
raise NotImplementedError
23842401

23852402
async def expire(
@@ -2615,8 +2632,19 @@ def __init_subclass__(cls, **kwargs):
26152632
)
26162633

26172634
async def save(
2618-
self: "Model", pipeline: Optional[redis.client.Pipeline] = None
2619-
) -> "Model":
2635+
self: "Model",
2636+
pipeline: Optional[redis.client.Pipeline] = None,
2637+
nx: bool = False,
2638+
xx: bool = False,
2639+
) -> Optional["Model"]:
2640+
if nx and xx:
2641+
raise ValueError("Cannot specify both nx and xx")
2642+
if pipeline and (nx or xx):
2643+
raise ValueError(
2644+
"Cannot use nx or xx with pipeline for HashModel. "
2645+
"Use JsonModel if you need conditional saves with pipelines."
2646+
)
2647+
26202648
self.check()
26212649
db = self._get_db(pipeline)
26222650

@@ -2636,20 +2664,33 @@ async def save(
26362664
for k, v in document.items()
26372665
}
26382666

2667+
key = self.key()
2668+
2669+
async def _do_save(conn):
2670+
# Check nx/xx conditions (HSET doesn't support these natively)
2671+
if nx or xx:
2672+
exists = await conn.exists(key)
2673+
if nx and exists:
2674+
return None # Key exists, nx means don't overwrite
2675+
if xx and not exists:
2676+
return None # Key doesn't exist, xx means only update existing
2677+
2678+
await conn.hset(key, mapping=document)
2679+
return self
2680+
26392681
# TODO: Wrap any Redis response errors in a custom exception?
26402682
try:
2641-
await db.hset(self.key(), mapping=document)
2683+
return await _do_save(db)
26422684
except RuntimeError as e:
26432685
if "Event loop is closed" in str(e):
26442686
# Connection is bound to closed event loop, refresh it and retry
26452687
from ..connections import get_redis_connection
26462688

26472689
self.__class__._meta.database = get_redis_connection()
26482690
db = self._get_db(pipeline)
2649-
await db.hset(self.key(), mapping=document)
2691+
return await _do_save(db)
26502692
else:
26512693
raise
2652-
return self
26532694

26542695
@classmethod
26552696
async def all_pks(cls): # type: ignore
@@ -2835,8 +2876,14 @@ def __init__(self, *args, **kwargs):
28352876
super().__init__(*args, **kwargs)
28362877

28372878
async def save(
2838-
self: "Model", pipeline: Optional[redis.client.Pipeline] = None
2839-
) -> "Model":
2879+
self: "Model",
2880+
pipeline: Optional[redis.client.Pipeline] = None,
2881+
nx: bool = False,
2882+
xx: bool = False,
2883+
) -> Optional["Model"]:
2884+
if nx and xx:
2885+
raise ValueError("Cannot specify both nx and xx")
2886+
28402887
self.check()
28412888
db = self._get_db(pipeline)
28422889

@@ -2847,20 +2894,30 @@ async def save(
28472894
# Apply JSON encoding for complex types (Enums, UUIDs, Sets, etc.)
28482895
data = jsonable_encoder(data)
28492896

2897+
key = self.key()
2898+
path = Path.root_path()
2899+
2900+
async def _do_save(conn):
2901+
# JSON.SET supports nx and xx natively
2902+
result = await conn.json().set(key, path, data, nx=nx, xx=xx)
2903+
# JSON.SET returns None if nx/xx condition not met, "OK" otherwise
2904+
if result is None:
2905+
return None
2906+
return self
2907+
28502908
# TODO: Wrap response errors in a custom exception?
28512909
try:
2852-
await db.json().set(self.key(), Path.root_path(), data)
2910+
return await _do_save(db)
28532911
except RuntimeError as e:
28542912
if "Event loop is closed" in str(e):
28552913
# Connection is bound to closed event loop, refresh it and retry
28562914
from ..connections import get_redis_connection
28572915

28582916
self.__class__._meta.database = get_redis_connection()
28592917
db = self._get_db(pipeline)
2860-
await db.json().set(self.key(), Path.root_path(), data)
2918+
return await _do_save(db)
28612919
else:
28622920
raise
2863-
return self
28642921

28652922
@classmethod
28662923
async def all_pks(cls): # type: ignore

tests/test_hash_model.py

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1357,3 +1357,113 @@ class Meta:
13571357

13581358
assert len(rematerialized) == 1
13591359
assert rematerialized[0].pk == loc1.pk
1360+
1361+
1362+
@py_test_mark_asyncio
1363+
async def test_save_nx_only_saves_if_not_exists(m):
1364+
"""Test that save(nx=True) only saves if the key doesn't exist."""
1365+
await Migrator().run()
1366+
1367+
member = m.Member(
1368+
id=1000,
1369+
first_name="Andrew",
1370+
last_name="Brookins",
1371+
email="a@example.com",
1372+
join_date=today,
1373+
age=38,
1374+
bio="Original bio",
1375+
)
1376+
1377+
# First save should succeed with nx=True
1378+
result = await member.save(nx=True)
1379+
assert result is not None
1380+
assert result.pk == member.pk
1381+
1382+
# Second save with same pk should return None (key exists)
1383+
member2 = m.Member(
1384+
id=1000,
1385+
first_name="Different",
1386+
last_name="Name",
1387+
email="b@example.com",
1388+
join_date=today,
1389+
age=25,
1390+
bio="Different bio",
1391+
)
1392+
result = await member2.save(nx=True)
1393+
assert result is None
1394+
1395+
# Verify original data is unchanged
1396+
fetched = await m.Member.get(member.id)
1397+
assert fetched.first_name == "Andrew"
1398+
1399+
1400+
@py_test_mark_asyncio
1401+
async def test_save_xx_only_saves_if_exists(m):
1402+
"""Test that save(xx=True) only saves if the key already exists."""
1403+
await Migrator().run()
1404+
1405+
member = m.Member(
1406+
id=2000,
1407+
first_name="Andrew",
1408+
last_name="Brookins",
1409+
email="a@example.com",
1410+
join_date=today,
1411+
age=38,
1412+
bio="Original bio",
1413+
)
1414+
1415+
# First save with xx=True should return None (key doesn't exist)
1416+
result = await member.save(xx=True)
1417+
assert result is None
1418+
1419+
# Save without flags to create the key
1420+
await member.save()
1421+
1422+
# Now update with xx=True should succeed
1423+
member.first_name = "Updated"
1424+
result = await member.save(xx=True)
1425+
assert result is not None
1426+
1427+
# Verify data was updated
1428+
fetched = await m.Member.get(member.id)
1429+
assert fetched.first_name == "Updated"
1430+
1431+
1432+
@py_test_mark_asyncio
1433+
async def test_save_nx_xx_mutually_exclusive(m):
1434+
"""Test that save() raises ValueError if both nx and xx are True."""
1435+
await Migrator().run()
1436+
1437+
member = m.Member(
1438+
id=3000,
1439+
first_name="Andrew",
1440+
last_name="Brookins",
1441+
email="a@example.com",
1442+
join_date=today,
1443+
age=38,
1444+
bio="Some bio",
1445+
)
1446+
1447+
with pytest.raises(ValueError, match="Cannot specify both nx and xx"):
1448+
await member.save(nx=True, xx=True)
1449+
1450+
1451+
@py_test_mark_asyncio
1452+
async def test_save_nx_with_pipeline_raises_error(m):
1453+
"""Test that save(nx=True) with pipeline raises an error for HashModel."""
1454+
await Migrator().run()
1455+
1456+
member = m.Member(
1457+
id=4000,
1458+
first_name="Andrew",
1459+
last_name="Brookins",
1460+
email="a@example.com",
1461+
join_date=today,
1462+
age=38,
1463+
bio="Bio 1",
1464+
)
1465+
1466+
# HashModel doesn't support nx/xx with pipeline (HSET doesn't support it natively)
1467+
async with m.Member.db().pipeline(transaction=True) as pipe:
1468+
with pytest.raises(ValueError, match="Cannot use nx or xx with pipeline"):
1469+
await member.save(pipeline=pipe, nx=True)

tests/test_json_model.py

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1572,3 +1572,124 @@ class Meta:
15721572
# Test sorting by NUMERIC field still works
15731573
results = await Product.find().sort_by("price").all()
15741574
assert results == [product3, product2, product1] # 30, 50, 100
1575+
1576+
1577+
@py_test_mark_asyncio
1578+
async def test_save_nx_only_saves_if_not_exists(m, address):
1579+
"""Test that save(nx=True) only saves if the key doesn't exist."""
1580+
await Migrator().run()
1581+
1582+
member = m.Member(
1583+
first_name="Andrew",
1584+
last_name="Brookins",
1585+
email="a@example.com",
1586+
join_date=today,
1587+
age=38,
1588+
address=address,
1589+
)
1590+
1591+
# First save should succeed with nx=True
1592+
result = await member.save(nx=True)
1593+
assert result is not None
1594+
assert result.pk == member.pk
1595+
1596+
# Second save with same pk should return None (key exists)
1597+
member2 = m.Member(
1598+
pk=member.pk,
1599+
first_name="Different",
1600+
last_name="Name",
1601+
email="b@example.com",
1602+
join_date=today,
1603+
age=25,
1604+
address=address,
1605+
)
1606+
result = await member2.save(nx=True)
1607+
assert result is None
1608+
1609+
# Verify original data is unchanged
1610+
fetched = await m.Member.get(member.pk)
1611+
assert fetched.first_name == "Andrew"
1612+
1613+
1614+
@py_test_mark_asyncio
1615+
async def test_save_xx_only_saves_if_exists(m, address):
1616+
"""Test that save(xx=True) only saves if the key already exists."""
1617+
await Migrator().run()
1618+
1619+
member = m.Member(
1620+
first_name="Andrew",
1621+
last_name="Brookins",
1622+
email="a@example.com",
1623+
join_date=today,
1624+
age=38,
1625+
address=address,
1626+
)
1627+
1628+
# First save with xx=True should return None (key doesn't exist)
1629+
result = await member.save(xx=True)
1630+
assert result is None
1631+
1632+
# Save without flags to create the key
1633+
await member.save()
1634+
1635+
# Now update with xx=True should succeed
1636+
member.first_name = "Updated"
1637+
result = await member.save(xx=True)
1638+
assert result is not None
1639+
1640+
# Verify data was updated
1641+
fetched = await m.Member.get(member.pk)
1642+
assert fetched.first_name == "Updated"
1643+
1644+
1645+
@py_test_mark_asyncio
1646+
async def test_save_nx_xx_mutually_exclusive(m, address):
1647+
"""Test that save() raises ValueError if both nx and xx are True."""
1648+
await Migrator().run()
1649+
1650+
member = m.Member(
1651+
first_name="Andrew",
1652+
last_name="Brookins",
1653+
email="a@example.com",
1654+
join_date=today,
1655+
age=38,
1656+
address=address,
1657+
)
1658+
1659+
with pytest.raises(ValueError, match="Cannot specify both nx and xx"):
1660+
await member.save(nx=True, xx=True)
1661+
1662+
1663+
@py_test_mark_asyncio
1664+
async def test_save_nx_with_pipeline(m, address):
1665+
"""Test that save(nx=True) works with pipeline."""
1666+
await Migrator().run()
1667+
1668+
member1 = m.Member(
1669+
first_name="Andrew",
1670+
last_name="Brookins",
1671+
email="a@example.com",
1672+
join_date=today,
1673+
age=38,
1674+
address=address,
1675+
)
1676+
member2 = m.Member(
1677+
first_name="Kim",
1678+
last_name="Brookins",
1679+
email="k@example.com",
1680+
join_date=today,
1681+
age=34,
1682+
address=address,
1683+
)
1684+
1685+
# Save both with nx=True via pipeline
1686+
async with m.Member.db().pipeline(transaction=True) as pipe:
1687+
await member1.save(pipeline=pipe, nx=True)
1688+
await member2.save(pipeline=pipe, nx=True)
1689+
await pipe.execute()
1690+
1691+
# Verify both were saved
1692+
fetched1 = await m.Member.get(member1.pk)
1693+
fetched2 = await m.Member.get(member2.pk)
1694+
assert fetched1.first_name == "Andrew"
1695+
assert fetched2.first_name == "Kim"

0 commit comments

Comments
 (0)