Skip to content

Commit 83b6a87

Browse files
committed
feat(DEVC-1286): init changes
- upgrade redis version for test server - bump bunch of related third party dependencies: - fakeredis[lua] -> removed prefix LUA - redis == 6.2.0 latest supported with Python3.9+ - types-redis~=4.6.0 - removed: - all LUA bindings from RedisRepository - use_lua_52 - vacuum(...) and dependant codes - InternalCacheSdkProtocol - InternalRedisSdk - FakeInternalCacheSdk
1 parent 75a363e commit 83b6a87

9 files changed

Lines changed: 76 additions & 357 deletions

File tree

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ up-cache:
112112
-d \
113113
--name python-sdk-redis \
114114
-p 6379:6379 \
115-
redis:6.0.9 # apps use 6.0.9 or 6.2.3
115+
redis:7.4.3 # The same as on PROD, at least at the 15.09.2025 moment
116116

117117
# down-cache: Stop Redis.
118118
.PHONY: down-cache

pyproject.toml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,12 @@ authors = [
1414
keywords = ["corva", "sdk"]
1515
requires-python = ">=3.9,<4.0"
1616
dependencies = [
17-
"fakeredis[lua] >=2.26.2, <2.30.0",
17+
"fakeredis >=2.30.0, <2.32.0",
1818
"pydantic >= 2.0, <3.0",
1919
"pydantic-settings >=2.0, <3.0",
20-
"redis >=5.2.1, <6.0.0",
20+
"redis == 6.2.0",
2121
"requests >=2.32.3, <3.0.0",
22-
"urllib3==2.5.0"
22+
"urllib3 == 2.5.0"
2323
]
2424
classifiers = [
2525
"Development Status :: 5 - Production/Stable",
@@ -40,7 +40,7 @@ dev = [
4040
"ruff==0.12.11",
4141
"mypy>=1.10,<2",
4242
"types-freezegun~=1.1.9",
43-
"types-redis~=4.2.4",
43+
"types-redis~=4.6.0",
4444
"types-requests~=2.27.27",
4545
"coverage==7.6.1",
4646
"freezegun==1.5.1",

src/corva/cache_adapter.py

Lines changed: 19 additions & 182 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import itertools
21
from datetime import timedelta
32
from typing import (
43
Dict,
@@ -8,7 +7,6 @@
87
Sequence,
98
Tuple,
109
Union,
11-
cast,
1210
overload,
1311
)
1412

@@ -47,206 +45,45 @@ def delete_all(self) -> None:
4745

4846

4947
class RedisRepository:
50-
# Deletes M expired keys from the hash.
51-
#
52-
# Complexity: O(log(N) + M) + O(M) + O(M * log(N)) where N is the size of hash
53-
# and M is the number of elements deleted. For constant M (e.g., 3) the complexity
54-
# is O(log(N)).
55-
#
56-
# KEYS:
57-
# hash_name.
58-
# zset_name.
59-
#
60-
# ARGV:
61-
# delete_count.
62-
#
63-
# Returns: nil.
64-
LUA_VACUUM_SCRIPT = """
65-
local hash_name = KEYS[1]
66-
local zset_name = KEYS[2]
67-
local delete_count = tonumber(ARGV[1])
68-
local time = redis.call('TIME')
69-
local pnow = tonumber(time[1]) * 1000 + math.floor(tonumber(time[2]) / 1000)
70-
71-
local keys_to_delete = redis.call(
72-
'ZRANGEBYSCORE', zset_name, '-inf', pnow, 'LIMIT', 0, delete_count
73-
)
74-
75-
if not next(keys_to_delete) then
76-
return
77-
end
78-
79-
redis.call('HDEL', hash_name, unpack(keys_to_delete))
80-
redis.call('ZREM', zset_name, unpack(keys_to_delete))
81-
"""
82-
83-
# Gets either all or only requested keys and values from hash.
84-
#
85-
# Gets all keys if no keys specified in ARGV.
86-
#
87-
# Complexity: O(N) where N is the number of requested keys.
88-
#
89-
# KEYS:
90-
# hash_name.
91-
# zset_name.
92-
#
93-
# Optional ARGV:
94-
# key1.
95-
# key2.
96-
# ...
97-
#
98-
# Returns: list of names and values for non-expired keys.
99-
LUA_GET_SCRIPT = """
100-
local hash_name = KEYS[1]
101-
local zset_name = KEYS[2]
102-
local time = redis.call('TIME')
103-
local pnow = tonumber(time[1]) * 1000 + math.floor(tonumber(time[2]) / 1000)
104-
105-
local keys = ARGV
106-
if next(ARGV) == nil then
107-
keys = redis.call('HKEYS', hash_name)
108-
end
109-
110-
if not next(keys) then
111-
return {}
112-
end
113-
114-
local hash = redis.call('HMGET', hash_name, unpack(keys))
115-
116-
local result = {}
117-
118-
for i, key in ipairs(keys) do
119-
local pexpireat = redis.call('ZSCORE', zset_name, key)
120-
121-
if not pexpireat or pnow < tonumber(pexpireat) then
122-
table.insert(result, key)
123-
table.insert(result, hash[i])
124-
end
125-
end
126-
127-
return result
128-
"""
129-
130-
# Inserts list of keys-value-expiration tuples into the hash.
131-
#
132-
# Complexity: O(N * log(M)), where N is the number of inserted elements and
133-
# M is the hash size.
134-
#
135-
# 1. If hash does not exist, it will automatically create one.
136-
# 2. If the field already exists, its value and ttl will be overwritten.
137-
# 3. Hash and zset ttl is always set to the biggest field's ttl.
138-
# 4. When the field expires it may be deleted by:
139-
# - Manually invoking `vacuum` script.
140-
# - Redis automatically deleting expired hash (see note #3).
141-
#
142-
# KEYS:
143-
# hash_name.
144-
# zset_name.
145-
#
146-
# ARGV:
147-
# key.
148-
# value.
149-
# ttl.
150-
# ...
151-
#
152-
# Returns: nil.
153-
LUA_SET_SCRIPT = """
154-
local hash_name = KEYS[1]
155-
local zset_name = KEYS[2]
156-
local time = redis.call('TIME')
157-
158-
for i, _ in ipairs(ARGV) do
159-
if i % 3 == 1 then
160-
local key = ARGV[i]
161-
local value = ARGV[i + 1]
162-
local ttl = ARGV[i + 2]
163-
local pexpireat = (
164-
(tonumber(time[1]) + ttl) * 1000 + math.floor(tonumber(time[2]) / 1000)
165-
)
166-
167-
redis.call('HSET', hash_name, key, value)
168-
redis.call('ZADD', zset_name, pexpireat, key)
169-
170-
end
171-
end
172-
173-
local max_pexpireat = tonumber(redis.call(
174-
'ZREVRANGEBYSCORE', zset_name, '+inf', '-inf', 'WITHSCORES', 'LIMIT', 0, 1
175-
)[2])
176-
177-
redis.call('PEXPIREAT', hash_name, max_pexpireat)
178-
redis.call('PEXPIREAT', zset_name, max_pexpireat)
179-
"""
180-
181-
# Deletes all data from hash and zset.
182-
#
183-
# Complexity: O(N) where N is the hash size.
184-
#
185-
# KEYS:
186-
# hash_name.
187-
# zset_name.
188-
#
189-
# Returns: nil.
190-
LUA_DELETE_ALL_SCRIPT = """
191-
local hash_name = KEYS[1]
192-
local zset_name = KEYS[2]
193-
194-
redis.call('DEL', hash_name, zset_name)
195-
"""
19648

197-
def __init__(self, hash_name: str, client: redis.Redis, use_lua_52: bool = False):
49+
def __init__(self, hash_name: str, client: redis.Redis):
19850
self.hash_name = hash_name
199-
self.zset_name = f'{hash_name}.EXPIREAT'
20051
self.client = client
201-
self.lua_set_many = self.client.register_script(self.LUA_SET_SCRIPT)
202-
203-
lua_get_script = self.LUA_GET_SCRIPT
204-
if use_lua_52:
205-
# Hack for tests to work with fakeredis, as it uses Lua version > 5.1.
206-
# In Lua 5.1, unpack was a global, but in 5.2 it's been moved to
207-
# table.unpack.
208-
lua_get_script = self.LUA_GET_SCRIPT.replace('unpack', 'table.unpack')
209-
self.lua_get = self.client.register_script(lua_get_script)
210-
211-
self.lua_vacuum = self.client.register_script(self.LUA_VACUUM_SCRIPT)
212-
self.lua_delete_all = self.client.register_script(self.LUA_DELETE_ALL_SCRIPT)
21352

21453
def set(self, key: str, value: str, ttl: int) -> None:
21554
self.set_many(data=[(key, value, ttl)])
21655

21756
def set_many(self, data: Sequence[Tuple[str, str, int]]) -> None:
218-
self.lua_set_many(
219-
keys=[self.hash_name, self.zset_name],
220-
args=list(itertools.chain.from_iterable(data)),
221-
)
57+
pipe = self.client.pipeline()
58+
for key, value, ttl in data:
59+
pipe.hset(self.hash_name, key, value)
60+
pipe.execute_command("HEXPIRE", self.hash_name, ttl, "FIELDS", 1, key)
61+
pipe.execute()
22262

22363
def get(self, key: str) -> Optional[str]:
224-
return self.get_many(keys=[key]).get(key)
64+
val = self.client.hget(self.hash_name, key)
65+
return None if val is None else str(val)
22566

22667
def get_many(self, keys: Sequence[str]) -> Dict[str, Optional[str]]:
227-
data = self.lua_get(keys=[self.hash_name, self.zset_name], args=list(keys))
228-
229-
data = dict(zip(data[::2], data[1::2]))
230-
231-
for missing_key in set(keys) - set(data):
232-
data[missing_key] = None
233-
234-
return data
68+
if not keys:
69+
return {}
70+
values = self.client.hmget(self.hash_name, keys)
71+
# redis-py returns a list of values where non-existent/expired are None
72+
return {k: (None if v is None else str(v)) for k, v in zip(keys, values)}
23573

23674
def get_all(self) -> Dict[str, str]:
237-
return cast(Dict[str, str], self.get_many(keys=[]))
75+
raw = self.client.hgetall(self.hash_name)
76+
return dict(raw)
23877

23978
def delete(self, key: str) -> None:
240-
self.delete_many(keys=[key])
79+
self.client.hdel(self.hash_name, key)
24180

24281
def delete_many(self, keys: Sequence[str]) -> None:
243-
self.set_many(data=[(key, '', -1) for key in keys])
82+
if keys:
83+
self.client.hdel(self.hash_name, *keys)
24484

24585
def delete_all(self) -> None:
246-
self.lua_delete_all(keys=[self.hash_name, self.zset_name])
247-
248-
def vacuum(self, delete_count: int) -> None:
249-
self.lua_vacuum(keys=[self.hash_name, self.zset_name], args=[delete_count])
86+
self.client.delete(self.hash_name)
25087

25188

25289
class DeprecatedRedisAdapter:

src/corva/handlers.py

Lines changed: 2 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
from corva.models.task import RawTaskEvent, TaskEvent, TaskStatus
3737
from corva.service import service
3838
from corva.service.api_sdk import CachingApiSdk, CorvaApiSdk
39-
from corva.service.cache_sdk import FakeInternalCacheSdk, InternalRedisSdk, UserRedisSdk
39+
from corva.service.cache_sdk import UserRedisSdk
4040
from corva.validate_app_init import validate_app_type_context
4141

4242
StreamEventT = TypeVar("StreamEventT", bound=StreamEvent)
@@ -187,9 +187,6 @@ def wrapper(
187187
user_cache_sdk = UserRedisSdk(
188188
hash_name=hash_name, redis_dsn=SETTINGS.CACHE_URL, redis_client=redis_client
189189
)
190-
internal_cache_sdk = InternalRedisSdk(
191-
hash_name=hash_name, redis_client=redis_client
192-
)
193190

194191
records = event.filter_records(
195192
old_max_record_value=event.get_cached_max_record_value(cache=user_cache_sdk)
@@ -222,7 +219,6 @@ def wrapper(
222219
api_sdk=CorvaApiSdk(api_adapter=api),
223220
ttl=SETTINGS.SECRETS_CACHE_TTL,
224221
),
225-
cache_sdk=internal_cache_sdk,
226222
app=functools.partial(
227223
cast(Callable[[StreamEvent, Api, UserRedisSdk], Any], func),
228224
app_event,
@@ -295,9 +291,6 @@ def wrapper(
295291
user_cache_sdk = UserRedisSdk(
296292
hash_name=hash_name, redis_dsn=SETTINGS.CACHE_URL, redis_client=redis_client
297293
)
298-
internal_cache_sdk = InternalRedisSdk(
299-
hash_name=hash_name, redis_client=redis_client
300-
)
301294

302295
if isinstance(event, RawScheduledDataTimeEvent) and event.merge_metadata:
303296
event = event.rebuild_with_modified_times(
@@ -327,7 +320,6 @@ def wrapper(
327320
api_sdk=CorvaApiSdk(api_adapter=api),
328321
ttl=SETTINGS.SECRETS_CACHE_TTL,
329322
),
330-
cache_sdk=internal_cache_sdk,
331323
app=functools.partial(
332324
cast(Callable[[ScheduledEvent, Api, UserRedisSdk], Any], func),
333325
cast(ScheduledEvent, app_event),
@@ -380,7 +372,7 @@ def wrapper(
380372
api_key: str,
381373
aws_request_id: str,
382374
logging_ctx: LoggingContext,
383-
redis_client: redis.Redis,
375+
redis_client: Optional[redis.Redis] = None, # noqa, for safe reasons
384376
) -> Any:
385377
status = TaskStatus.fail
386378
data: Dict[str, Union[dict, str]] = {"payload": {}}
@@ -418,7 +410,6 @@ def wrapper(
418410
api_sdk=CorvaApiSdk(api_adapter=api),
419411
ttl=SETTINGS.SECRETS_CACHE_TTL,
420412
),
421-
cache_sdk=FakeInternalCacheSdk(),
422413
app=functools.partial(
423414
cast(Callable[[TaskEvent, Api], Any], func), app_event, api
424415
),
@@ -505,14 +496,6 @@ def wrapper(
505496
app_key=SETTINGS.APP_KEY,
506497
app_connection_id=event.data.rerun_app_connection_id,
507498
)
508-
internal_cache_hash_name = get_cache_key(
509-
provider=SETTINGS.PROVIDER,
510-
asset_id=event.data.rerun_asset_id,
511-
app_stream_id=event.data.rerun_app_stream_id,
512-
app_key=SETTINGS.APP_KEY,
513-
app_connection_id=event.data.app_connection_id,
514-
)
515-
516499
asset_cache = UserRedisSdk(
517500
hash_name=asset_cache_hash_name,
518501
redis_dsn=SETTINGS.CACHE_URL,
@@ -547,9 +530,6 @@ def wrapper(
547530
api_sdk=CorvaApiSdk(api_adapter=api),
548531
ttl=SETTINGS.SECRETS_CACHE_TTL,
549532
),
550-
cache_sdk=InternalRedisSdk(
551-
hash_name=internal_cache_hash_name, redis_client=redis_client
552-
),
553533
app=functools.partial(
554534
cast(
555535
Callable[

0 commit comments

Comments
 (0)