Skip to content

Commit 4df6f04

Browse files
committed
feat(DEVC-1286): add migration logic with tests
1 parent e3dd0b4 commit 4df6f04

5 files changed

Lines changed: 203 additions & 3 deletions

File tree

pyproject.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ dependencies = [
1919
"pydantic-settings >=2.0, <3.0",
2020
"redis == 6.2.0",
2121
"requests >=2.32.3, <3.0.0",
22-
"urllib3 == 2.5.0"
22+
"urllib3 == 2.5.0",
23+
"semver==3.0.4"
2324
]
2425
classifiers = [
2526
"Development Status :: 5 - Production/Stable",

src/corva/cache_adapter.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
)
88

99
import redis
10+
import semver
1011

1112

1213
class CacheRepositoryProtocol(Protocol):
@@ -81,3 +82,70 @@ def delete_many(self, keys: Sequence[str]) -> None:
8182
def delete_all(self) -> None:
8283
self.client.delete(self.hash_name)
8384

85+
86+
class HashMigrator:
87+
MINIMUM_ALLOWED_REDIS_SERVER = semver.Version(major=7, minor=4, patch=0)
88+
89+
def __init__(self, hash_name: str, client: redis.Redis):
90+
self.hash_name = hash_name
91+
self.zset_name = f'{hash_name}.EXPIREAT'
92+
self.client = client
93+
94+
def run(self) -> bool:
95+
"""Migrate from old Lua+ZSET per-field TTL to Redis built-in per-field TTL.
96+
97+
Safe to call multiple times. Behavior:
98+
- If the legacy ZSET ("<hash>.EXPIREAT") does not exist → return False.
99+
- Requires Redis server version >= 7.4.0 (built-in per-field hash TTLs).
100+
- For each zset member (field → absolute ms deadline):
101+
• Past deadline → HDEL the field.
102+
• Future deadline → set per-field TTL via HPEXPIRE (milliseconds).
103+
- After processing completes: PERSIST the hash and DELETE the legacy ZSET.
104+
105+
Returns True if migration was attempted on a supported server (i.e., legacy
106+
ZSET existed); otherwise False.
107+
"""
108+
# Legacy structure must exist; otherwise nothing to do
109+
if not self.client.exists(self.zset_name):
110+
return False
111+
112+
# Require Redis 7.4+ for per-field TTL commands
113+
redis_version_str = self.client.info(section="server").get("redis_version")
114+
if not redis_version_str:
115+
return False
116+
117+
server_version = semver.Version.parse(version=redis_version_str)
118+
119+
if server_version < self.MINIMUM_ALLOWED_REDIS_SERVER:
120+
return False
121+
122+
from corva import Logger
123+
124+
# Current server time in ms
125+
sec, micro = self.client.time()
126+
now_ms = int(sec) * 1000 + int(micro) // 1000
127+
128+
# Queue all operations in a single pipeline and execute once
129+
pipe = self.client.pipeline()
130+
131+
for field, score in self.client.zscan_iter(self.zset_name):
132+
# score is the absolute deadline in ms
133+
deadline_ms = int(float(score))
134+
ttl_ms = deadline_ms - now_ms
135+
if ttl_ms <= 0:
136+
pipe.hdel(self.hash_name, field)
137+
else:
138+
pipe.execute_command(
139+
"HPEXPIRE", self.hash_name, ttl_ms, "FIELDS", 1, field
140+
)
141+
142+
# Execute queued field operations (no-op if nothing queued)
143+
pipe.execute()
144+
145+
# Remove key-level TTL and legacy ZSET now that fields are migrated
146+
self.client.persist(self.hash_name)
147+
self.client.delete(self.zset_name)
148+
149+
# Migration was attempted because legacy ZSET existed
150+
Logger.info(f"Migration success: hash_name = '{self.hash_name}'")
151+
return True

src/corva/service/cache_sdk.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import redis
1414

1515
from corva import cache_adapter
16+
from corva.cache_adapter import HashMigrator
1617

1718

1819
class UserCacheSdkProtocol(Protocol):
@@ -74,6 +75,12 @@ def __init__(
7475
client=cast(redis.Redis, redis_client),
7576
)
7677

78+
try:
79+
migrator = HashMigrator(hash_name, redis_client)
80+
migrator.run()
81+
except Exception:
82+
pass
83+
7784
def set(self, key: str, value: str, ttl: int = SIXTY_DAYS) -> None:
7885
self.cache_repo.set(key=key, value=value, ttl=ttl)
7986

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,11 @@
1111

1212
@pytest.fixture(scope='function', autouse=True)
1313
def clean_real_redis():
14-
redis_client = Redis.from_url(url=SETTINGS.CACHE_URL)
14+
redis_client = Redis.from_url(url=SETTINGS.CACHE_URL, decode_responses=True)
1515

1616
redis_client.flushall()
1717

18-
yield
18+
yield redis_client
1919

2020
redis_client.flushall()
2121

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
import time
2+
from typing import Dict
3+
4+
import pytest
5+
6+
from corva.cache_adapter import HashMigrator
7+
8+
9+
def _now_ms(redis_client) -> int:
10+
sec, micro = redis_client.time()
11+
return int(sec) * 1000 + int(micro) // 1000
12+
13+
14+
def _zset_name(hash_name: str) -> str:
15+
return f"{hash_name}.EXPIREAT"
16+
17+
18+
@pytest.mark.integration
19+
def test_migrate_returns_false_when_no_legacy_zset_present(clean_real_redis):
20+
hash_name = "test:migration:nozset:" + str(time.time_ns())
21+
zset = _zset_name(hash_name)
22+
23+
clean_real_redis.delete(hash_name, zset)
24+
clean_real_redis.hset(hash_name, mapping={"a": "1"})
25+
26+
migrator = HashMigrator(hash_name=hash_name, client=clean_real_redis)
27+
28+
# No legacy zset exists -> migration should be a no-op and return False
29+
assert clean_real_redis.exists(zset) == 0
30+
migrated = migrator.run()
31+
assert not migrated
32+
assert clean_real_redis.hget(hash_name, "a") == "1"
33+
34+
35+
@pytest.mark.integration
36+
def test_migrate_converts_fields_and_cleans_legacy_structures(clean_real_redis):
37+
hash_name = "test:migration:convert:" + str(time.time_ns())
38+
zset = _zset_name(hash_name)
39+
40+
clean_real_redis.delete(hash_name, zset)
41+
42+
# Prepopulate legacy hash + per-field expirations in zset (absolute ms deadlines)
43+
clean_real_redis.hset(hash_name, mapping={"f1": "v1", "f2": "v2"})
44+
now = _now_ms(clean_real_redis)
45+
future_deadline = now + 2000 # 2 seconds in the future
46+
past_deadline = now - 100 # already expired
47+
48+
clean_real_redis.zadd(zset, mapping={"f1": future_deadline, "f2": past_deadline})
49+
# Simulate old behavior of setting key-level expiry as backstop (optional)
50+
clean_real_redis.pexpireat(hash_name, future_deadline)
51+
clean_real_redis.pexpireat(zset, future_deadline)
52+
53+
migrator = HashMigrator(hash_name=hash_name, client=clean_real_redis)
54+
55+
migrated = migrator.run()
56+
assert migrated
57+
58+
# zset removed and key-level TTL removed (persisted)
59+
assert clean_real_redis.exists(zset) == 0
60+
assert clean_real_redis.ttl(hash_name) == -1
61+
62+
# f1 remains and has a per-field TTL
63+
assert clean_real_redis.hget(hash_name, "f1") == "v1"
64+
f1_hpttl = clean_real_redis.execute_command("HPTTL", hash_name, "FIELDS", 1, "f1")
65+
if isinstance(f1_hpttl, list):
66+
f1_hpttl = f1_hpttl[0]
67+
assert isinstance(f1_hpttl, int) and f1_hpttl > 0
68+
69+
# f2 was expired -> should be deleted from hash
70+
assert clean_real_redis.hget(hash_name, "f2") is None
71+
72+
73+
@pytest.mark.integration
74+
def test_migrate_is_idempotent(clean_real_redis):
75+
hash_name = "test:migration:idempotent:" + str(time.time_ns())
76+
zset = _zset_name(hash_name)
77+
78+
clean_real_redis.delete(hash_name, zset)
79+
80+
clean_real_redis.hset(hash_name, mapping={"k": "v"})
81+
now = _now_ms(clean_real_redis)
82+
clean_real_redis.zadd(zset, mapping={"k": now + 5000})
83+
84+
migrator = HashMigrator(hash_name, clean_real_redis)
85+
migrated = migrator.run()
86+
assert migrated
87+
# Run again – zset is gone so it should be a no-op returning False
88+
migrated = migrator.run()
89+
assert not migrated
90+
91+
92+
@pytest.mark.integration
93+
def test_migrate_large_batch_processes_all_fields(clean_real_redis):
94+
hash_name = "test:migration:large:" + str(time.time_ns())
95+
zset = _zset_name(hash_name)
96+
97+
clean_real_redis.delete(hash_name, zset)
98+
99+
# Create 300 fields to exceed default batch=256
100+
mapping: Dict[str, str] = {f"k{i}": f"v{i}" for i in range(300)}
101+
clean_real_redis.hset(hash_name, mapping=mapping)
102+
103+
now = _now_ms(clean_real_redis)
104+
future = now + 60_000
105+
zmap = {f"k{i}": future for i in range(300)}
106+
clean_real_redis.zadd(zset, mapping=zmap)
107+
108+
migrator = HashMigrator(hash_name, clean_real_redis)
109+
110+
assert migrator.run() is True
111+
112+
# All fields should still be present
113+
assert clean_real_redis.hlen(hash_name) == 300
114+
115+
# zset removed, hash persisted
116+
assert clean_real_redis.exists(zset) == 0
117+
assert clean_real_redis.ttl(hash_name) == -1
118+
119+
# Sample a few fields to ensure per-field TTLs were applied
120+
for key in ["k0", "k128", "k256"]:
121+
hpttl = clean_real_redis.execute_command("HPTTL", hash_name, "FIELDS", 1, key)
122+
if isinstance(hpttl, list):
123+
hpttl = hpttl[0]
124+
assert isinstance(hpttl, int) and hpttl > 0

0 commit comments

Comments
 (0)