99import redis
1010import semver
1111
12+ from version import VERSION
13+
1214
1315class CacheRepositoryProtocol (Protocol ):
1416 def set (
1517 self ,
1618 key : str ,
1719 value : str ,
1820 ttl : int ,
19- ) -> None :
20- ...
21+ ) -> None : ...
2122
22- def set_many (self , data : Sequence [Tuple [str , str , int ]]) -> None :
23- ...
23+ def set_many (self , data : Sequence [Tuple [str , str , int ]]) -> None : ...
2424
25- def get (self , key : str ) -> Optional [str ]:
26- ...
25+ def get (self , key : str ) -> Optional [str ]: ...
2726
28- def get_many (self , keys : Sequence [str ]) -> Dict [str , Optional [str ]]:
29- ...
27+ def get_many (self , keys : Sequence [str ]) -> Dict [str , Optional [str ]]: ...
3028
31- def get_all (self ) -> Dict [str , str ]:
32- ...
29+ def get_all (self ) -> Dict [str , str ]: ...
3330
34- def delete (self , key : str ) -> None :
35- ...
31+ def delete (self , key : str ) -> None : ...
3632
37- def delete_many (self , keys : Sequence [str ]) -> None :
38- ...
33+ def delete_many (self , keys : Sequence [str ]) -> None : ...
3934
40- def delete_all (self ) -> None :
41- ...
35+ def delete_all (self ) -> None : ...
4236
4337
4438class RedisRepository :
45-
4639 def __init__ (self , hash_name : str , client : redis .Redis ):
4740 self .hash_name = hash_name
4841 self .client = client
@@ -85,45 +78,49 @@ def delete_all(self) -> None:
8578
8679class HashMigrator :
8780 MINIMUM_ALLOWED_REDIS_SERVER = semver .Version (major = 7 , minor = 4 , patch = 0 )
81+ NEW_HASH_PREFIX = "/new"
8882
8983 def __init__ (self , hash_name : str , client : redis .Redis ):
9084 self .hash_name = hash_name
91- self .zset_name = f' { hash_name } .EXPIREAT'
85+ self .zset_name = f" { hash_name } .EXPIREAT"
9286 self .client = client
9387
94- def _proper_redis_server_version (self ) -> bool :
88+ def check_redis_server_version (self ) -> None :
9589 # Require Redis 7.4+ for per-field TTL commands
96- redis_version_str = self .client .info (section = "server" ).get ("redis_version" )
97-
98- if not redis_version_str :
99- return False
100-
90+ redis_version_str = self .client .info (section = "server" )["redis_version" ]
10191 server_version = semver .Version .parse (version = redis_version_str )
10292
10393 if server_version < self .MINIMUM_ALLOWED_REDIS_SERVER :
104- return False
105-
106- return True
94+ raise RuntimeError (
95+ f"Redis server version { server_version } "
96+ f"less then { self .MINIMUM_ALLOWED_REDIS_SERVER } -> "
97+ f"incompatible with used python SDK version `{ VERSION } `" )
10798
10899 def run (self ) -> bool :
109- """Migrate from old Lua+ZSET per-field TTL to Redis built-in per-field TTL .
100+ """Prepare parallel new-style cache while keeping legacy structures .
110101
111- Safe to call multiple times. Behavior:
112- - If the legacy ZSET ("<hash>.EXPIREAT") does not exist → return False.
113- - Requires Redis server version >= 7.4.0 (built-in per-field hash TTLs).
102+ Behavior (idempotent):
103+ - If legacy ZSET ("<hash>.EXPIREAT") does not exist → return False.
104+ - Requires Redis server >= 7.4.0 for per-field hash TTL commands.
105+ - Creates a new hash key with prefix NEW_HASH_PREFIX + hash_name.
114106 - For each zset member (field → absolute ms deadline):
115- • Past deadline → HDEL the field.
116- • Future deadline → set per-field TTL via HPEXPIRE (milliseconds).
117- - After processing completes: PERSIST the hash and DELETE the legacy ZSET.
107+ • Past deadline → do not copy field to new hash
108+ • Future deadline → copy current value from legacy hash to new hash and
109+ set per-field TTL via HPEXPIRE (milliseconds) on the new hash.
110+ - Legacy hash and legacy ZSET are preserved intact to allow rollback.
118111
119- Returns True if migration was attempted on a supported server (i.e., legacy
120- ZSET existed); otherwise False.
112+ Returns True if the new-style cache was created during this run
121113 """
114+ self .check_redis_server_version ()
115+
122116 # Legacy structure must exist; otherwise nothing to do
123117 if not self .client .exists (self .zset_name ):
124118 return False
125119
126- if not self ._proper_redis_server_version ():
120+ new_hash_name = self .NEW_HASH_PREFIX + self .hash_name
121+
122+ # If new hash already exists, consider migration already done
123+ if self .client .exists (new_hash_name ):
127124 return False
128125
129126 from corva import Logger
@@ -132,27 +129,35 @@ def run(self) -> bool:
132129 sec , micro = self .client .time ()
133130 now_ms = int (sec ) * 1000 + int (micro ) // 1000
134131
135- # Queue all operations in a single pipeline and execute once
132+ # Create pipeline for batched ops on the NEW hash
136133 pipe = self .client .pipeline ()
137134
135+ # Ensure the new hash key exists
136+ # Copy fields from old hash into the new one based on ZSET deadlines
138137 for field , score in self .client .zscan_iter (self .zset_name ):
139138 # score is the absolute deadline in ms
140139 deadline_ms = int (float (score ))
141- ttl_ms = deadline_ms - now_ms
142- if ttl_ms <= 0 :
143- pipe .hdel (self .hash_name , field )
144- else :
145- pipe .execute_command (
146- "HPEXPIRE" , self .hash_name , ttl_ms , "FIELDS" , 1 , field
147- )
140+ remaining_ttl_ms = deadline_ms - now_ms
141+ if remaining_ttl_ms <= 0 :
142+ continue
143+
144+ value = self .client .hget (self .hash_name , field )
145+ if value is None :
146+ # No value to copy (may have been removed already)
147+ continue
148+
149+ # Write to the new hash and apply per-field TTL there
150+ pipe .hset (new_hash_name , field , value )
151+ pipe .execute_command (
152+ "HPEXPIRE" , new_hash_name , remaining_ttl_ms , "FIELDS" , 1 , field
153+ )
148154
149155 # Execute queued field operations (no-op if nothing queued)
150156 pipe .execute ()
151157
152- # Remove key-level TTL and legacy ZSET now that fields are migrated
153- self .client .persist (self .hash_name )
154- self .client .delete (self .zset_name )
155-
156- # Migration was attempted because legacy ZSET existed
157- Logger .info (f"Migration success: hash_name = '{ self .hash_name } '" )
158+ # Do NOT modify/persist legacy structures; keep them for rollback
159+ Logger .info (
160+ f"Migration prepared parallel cache: legacy='{ self .hash_name } ', "
161+ f"new='{ new_hash_name } '"
162+ )
158163 return True
0 commit comments