Skip to content

Commit c379a36

Browse files
authored
[feat] Support reverse mapping from global_indexes to keys in KV interface (Ascend#41)
## Background Previously, the KV interface only provided the `(async_)kv_retrieve_meta` function, which translates `keys` into `BatchMeta`. It lacked the inverse translation capability to map `BatchMeta (global_indexes)` back to their corresponding `keys`. ## Changes - Introduced new `(async_)kv_retrieve_keys` functions. - Function rename: `(async_)kv_retrieve_keys` -> `(async_)kv_retrieve_meta` --------- Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
1 parent ba5710e commit c379a36

7 files changed

Lines changed: 616 additions & 87 deletions

File tree

tests/test_client.py

Lines changed: 254 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,9 @@ def _handle_requests(self):
144144
"message": "Consumption reset successfully",
145145
}
146146
response_type = ZMQRequestType.RESET_CONSUMPTION_RESPONSE
147+
elif request_msg.request_type == ZMQRequestType.KV_RETRIEVE_META:
148+
response_body = self._mock_kv_retrieve_meta(request_msg.body)
149+
response_type = ZMQRequestType.KV_RETRIEVE_META_RESPONSE
147150
elif request_msg.request_type == ZMQRequestType.KV_RETRIEVE_KEYS:
148151
response_body = self._mock_kv_retrieve_keys(request_msg.body)
149152
response_type = ZMQRequestType.KV_RETRIEVE_KEYS_RESPONSE
@@ -193,7 +196,7 @@ def _mock_batch_meta(self, request_body):
193196

194197
return {"metadata": metadata}
195198

196-
def _mock_kv_retrieve_keys(self, request_body):
199+
def _mock_kv_retrieve_meta(self, request_body):
197200
"""Mock KV retrieve keys response."""
198201
keys = request_body.get("keys", [])
199202
create = request_body.get("create", False)
@@ -250,6 +253,42 @@ def _mock_kv_list(self, request_body):
250253

251254
return {"partition_info": {partition_id: {k: {} for k in keys}}, "message": "success"}
252255

256+
def _mock_kv_retrieve_keys(self, request_body):
257+
"""Mock KV retrieve indexes response."""
258+
global_indexes = request_body.get("global_indexes", [])
259+
partition_id = request_body.get("partition_id", "")
260+
261+
# Initialize key tracking if not exists
262+
if not hasattr(self, "_kv_partition_keys"):
263+
self._kv_partition_keys = {}
264+
265+
# Initialize index to key mapping if not exists
266+
if not hasattr(self, "_kv_index_to_key"):
267+
self._kv_index_to_key = {}
268+
269+
# Get keys for this partition
270+
partition_keys = self._kv_partition_keys.get(partition_id, [])
271+
272+
# Build reverse mapping from index to key if needed
273+
if not hasattr(self, "_kv_partition_index_map"):
274+
self._kv_partition_index_map = {}
275+
276+
if partition_id not in self._kv_partition_index_map:
277+
# Build the mapping from stored keys
278+
start_idx = self._get_next_kv_index(partition_id) - len(partition_keys)
279+
self._kv_partition_index_map[partition_id] = {}
280+
for i, key in enumerate(partition_keys):
281+
self._kv_partition_index_map[partition_id][start_idx + i] = key
282+
283+
index_map = self._kv_partition_index_map.get(partition_id, {})
284+
285+
# Retrieve keys for the given global_indexes
286+
keys = []
287+
for idx in global_indexes:
288+
keys.append(index_map.get(idx, None))
289+
290+
return {"keys": keys}
291+
253292
def _get_next_kv_index(self, partition_id):
254293
"""Get next available index for KV keys in partition."""
255294
if not hasattr(self, "_kv_index_map"):
@@ -970,12 +1009,12 @@ class TestClientKVInterface:
9701009
"""Tests for client KV interface methods."""
9711010

9721011
@pytest.mark.asyncio
973-
async def test_async_kv_retrieve_keys_single(self, client_setup):
974-
"""Test async_kv_retrieve_keys with single key."""
1012+
async def test_async_kv_retrieve_meta_single(self, client_setup):
1013+
"""Test async_kv_retrieve_meta with single key."""
9751014
client, _, _ = client_setup
9761015

977-
# Test async_kv_retrieve_keys with single key
978-
metadata = await client.async_kv_retrieve_keys(
1016+
# Test async_kv_retrieve_meta with single key
1017+
metadata = await client.async_kv_retrieve_meta(
9791018
keys="test_key_1",
9801019
partition_id="test_partition",
9811020
create=True,
@@ -988,13 +1027,13 @@ async def test_async_kv_retrieve_keys_single(self, client_setup):
9881027
assert metadata.size == 1
9891028

9901029
@pytest.mark.asyncio
991-
async def test_async_kv_retrieve_keys_multiple(self, client_setup):
992-
"""Test async_kv_retrieve_keys with multiple keys."""
1030+
async def test_async_kv_retrieve_meta_multiple(self, client_setup):
1031+
"""Test async_kv_retrieve_meta with multiple keys."""
9931032
client, _, _ = client_setup
9941033

995-
# Test async_kv_retrieve_keys with multiple keys
1034+
# Test async_kv_retrieve_meta with multiple keys
9961035
keys = ["key_a", "key_b", "key_c"]
997-
metadata = await client.async_kv_retrieve_keys(
1036+
metadata = await client.async_kv_retrieve_meta(
9981037
keys=keys,
9991038
partition_id="test_partition",
10001039
create=True,
@@ -1007,19 +1046,19 @@ async def test_async_kv_retrieve_keys_multiple(self, client_setup):
10071046
assert metadata.size == 3
10081047

10091048
@pytest.mark.asyncio
1010-
async def test_async_kv_retrieve_keys_create_false(self, client_setup):
1011-
"""Test async_kv_retrieve_keys with create=False (retrieve existing keys)."""
1049+
async def test_async_kv_retrieve_meta_create_false(self, client_setup):
1050+
"""Test async_kv_retrieve_meta with create=False (retrieve existing keys)."""
10121051
client, _, _ = client_setup
10131052

10141053
# create some keys
1015-
await client.async_kv_retrieve_keys(
1054+
await client.async_kv_retrieve_meta(
10161055
keys="existing_key",
10171056
partition_id="existing_partition",
10181057
create=True,
10191058
)
10201059

10211060
# Then retrieve them with create=False
1022-
metadata = await client.async_kv_retrieve_keys(
1061+
metadata = await client.async_kv_retrieve_meta(
10231062
keys="existing_key",
10241063
partition_id="existing_partition",
10251064
create=False,
@@ -1030,13 +1069,13 @@ async def test_async_kv_retrieve_keys_create_false(self, client_setup):
10301069
assert metadata.size == 1
10311070

10321071
@pytest.mark.asyncio
1033-
async def test_async_kv_retrieve_keys_invalid_keys_type(self, client_setup):
1034-
"""Test async_kv_retrieve_keys raises error with invalid keys type."""
1072+
async def test_async_kv_retrieve_meta_invalid_keys_type(self, client_setup):
1073+
"""Test async_kv_retrieve_meta raises error with invalid keys type."""
10351074
client, _, _ = client_setup
10361075

10371076
# Test with invalid keys type (not string or list)
10381077
with pytest.raises(TypeError):
1039-
await client.async_kv_retrieve_keys(
1078+
await client.async_kv_retrieve_meta(
10401079
keys=123, # Invalid type
10411080
partition_id="test_partition",
10421081
create=True,
@@ -1048,7 +1087,7 @@ async def test_async_kv_list_with_keys(self, client_setup):
10481087
client, mock_controller, _ = client_setup
10491088

10501089
# First register some keys
1051-
await client.async_kv_retrieve_keys(
1090+
await client.async_kv_retrieve_meta(
10521091
keys=["key_1", "key_2"],
10531092
partition_id="kv_partition",
10541093
create=True,
@@ -1069,12 +1108,12 @@ async def test_async_kv_list_multiple_partitions(self, client_setup):
10691108
client, _, _ = client_setup
10701109

10711110
# Create keys in different partitions
1072-
await client.async_kv_retrieve_keys(
1111+
await client.async_kv_retrieve_meta(
10731112
keys="partition_a_key",
10741113
partition_id="partition_a",
10751114
create=True,
10761115
)
1077-
await client.async_kv_retrieve_keys(
1116+
await client.async_kv_retrieve_meta(
10781117
keys="partition_b_key",
10791118
partition_id="partition_b",
10801119
create=True,
@@ -1096,19 +1135,212 @@ async def test_async_kv_list_multiple_partitions(self, client_setup):
10961135
assert list(partition_a["partition_a"].values()) == [{}]
10971136
assert list(partition_b["partition_b"].values()) == [{}]
10981137

1099-
def test_kv_retrieve_keys_type_validation(self, client_setup):
1100-
"""Test synchronous kv_retrieve_keys type validation."""
1138+
def test_kv_retrieve_meta_type_validation(self, client_setup):
1139+
"""Test synchronous kv_retrieve_meta type validation."""
11011140
import asyncio
11021141

11031142
client, _, _ = client_setup
11041143

11051144
# Test with non-string element in list
11061145
async def test_invalid_list():
11071146
with pytest.raises(TypeError):
1108-
await client.async_kv_retrieve_keys(
1147+
await client.async_kv_retrieve_meta(
11091148
keys=["valid_key", 123], # Invalid: 123 is not a string
11101149
partition_id="test_partition",
11111150
create=True,
11121151
)
11131152

11141153
asyncio.run(test_invalid_list())
1154+
1155+
@pytest.mark.asyncio
1156+
async def test_async_kv_retrieve_keys_single(self, client_setup):
1157+
"""Test async_kv_retrieve_keys with single global_index."""
1158+
client, _, _ = client_setup
1159+
partition_id = "test_partition_idx"
1160+
1161+
# First create a key using kv_retrieve_meta
1162+
await client.async_kv_retrieve_meta(
1163+
keys=["test_key"],
1164+
partition_id=partition_id,
1165+
create=True,
1166+
)
1167+
1168+
# Now retrieve the key using global_index 0
1169+
keys = await client.async_kv_retrieve_keys(
1170+
global_indexes=[0],
1171+
partition_id=partition_id,
1172+
)
1173+
1174+
assert keys == ["test_key"]
1175+
1176+
@pytest.mark.asyncio
1177+
async def test_async_kv_retrieve_keys_multiple(self, client_setup):
1178+
"""Test async_kv_retrieve_keys with multiple global_indexes."""
1179+
client, _, _ = client_setup
1180+
partition_id = "test_partition_idx"
1181+
1182+
# First create keys using kv_retrieve_meta
1183+
keys_to_create = ["key_a", "key_b", "key_c"]
1184+
await client.async_kv_retrieve_meta(
1185+
keys=keys_to_create,
1186+
partition_id=partition_id,
1187+
create=True,
1188+
)
1189+
1190+
# Retrieve keys using global_indexes [0, 1, 2]
1191+
keys = await client.async_kv_retrieve_keys(
1192+
global_indexes=[0, 1, 2],
1193+
partition_id=partition_id,
1194+
)
1195+
1196+
assert keys == ["key_a", "key_b", "key_c"]
1197+
1198+
@pytest.mark.asyncio
1199+
async def test_async_kv_retrieve_keys_partial(self, client_setup):
1200+
"""Test async_kv_retrieve_keys with subset of global_indexes."""
1201+
client, _, _ = client_setup
1202+
partition_id = "test_partition_idx"
1203+
1204+
# First create keys using kv_retrieve_meta
1205+
await client.async_kv_retrieve_meta(
1206+
keys=["first_key", "second_key", "third_key"],
1207+
partition_id=partition_id,
1208+
create=True,
1209+
)
1210+
1211+
# Retrieve only first and third keys
1212+
keys = await client.async_kv_retrieve_keys(
1213+
global_indexes=[0, 2],
1214+
partition_id=partition_id,
1215+
)
1216+
1217+
assert keys == ["first_key", "third_key"]
1218+
1219+
@pytest.mark.asyncio
1220+
async def test_async_kv_retrieve_keys_single_int(self, client_setup):
1221+
"""Test async_kv_retrieve_keys accepts a single int."""
1222+
client, _, _ = client_setup
1223+
partition_id = "test_partition_idx"
1224+
1225+
# First create a key using kv_retrieve_meta
1226+
await client.async_kv_retrieve_meta(
1227+
keys=["single_key"],
1228+
partition_id=partition_id,
1229+
create=True,
1230+
)
1231+
1232+
# Now retrieve the key using a single int (not a list)
1233+
keys = await client.async_kv_retrieve_keys(
1234+
global_indexes=0,
1235+
partition_id=partition_id,
1236+
)
1237+
1238+
assert keys == ["single_key"]
1239+
1240+
@pytest.mark.asyncio
1241+
async def test_async_kv_retrieve_keys_invalid_type(self, client_setup):
1242+
"""Test async_kv_retrieve_keys raises error with invalid global_indexes type."""
1243+
client, _, _ = client_setup
1244+
1245+
# Test with invalid type (string instead of int)
1246+
with pytest.raises(TypeError):
1247+
await client.async_kv_retrieve_keys(
1248+
global_indexes=["not_an_int"],
1249+
partition_id="test_partition",
1250+
)
1251+
1252+
@pytest.mark.asyncio
1253+
async def test_async_kv_retrieve_keys_empty_list(self, client_setup):
1254+
"""Test async_kv_retrieve_keys raises error with empty list."""
1255+
client, _, _ = client_setup
1256+
1257+
with pytest.raises(ValueError):
1258+
await client.async_kv_retrieve_keys(
1259+
global_indexes=[],
1260+
partition_id="test_partition",
1261+
)
1262+
1263+
@pytest.mark.asyncio
1264+
async def test_async_kv_retrieve_keys_non_existent(self, client_setup):
1265+
"""Test async_kv_retrieve_keys returns None for non-existent global_indexes."""
1266+
client, _, _ = client_setup
1267+
partition_id = "test_partition_idx"
1268+
1269+
# First create a key using kv_retrieve_meta
1270+
await client.async_kv_retrieve_meta(
1271+
keys=["existing_key"],
1272+
partition_id=partition_id,
1273+
create=True,
1274+
)
1275+
1276+
# Try to retrieve a non-existent global_index
1277+
keys = await client.async_kv_retrieve_keys(
1278+
global_indexes=[99],
1279+
partition_id=partition_id,
1280+
)
1281+
assert keys == [None]
1282+
1283+
@pytest.mark.asyncio
1284+
async def test_async_kv_retrieve_keys_multiple_partitions(self, client_setup):
1285+
"""Test async_kv_retrieve_keys returns keys from the correct partition."""
1286+
client, _, _ = client_setup
1287+
partition_1 = "partition_1"
1288+
partition_2 = "partition_2"
1289+
1290+
# Create keys in both partitions
1291+
await client.async_kv_retrieve_meta(
1292+
keys=["key_1"],
1293+
partition_id=partition_1,
1294+
create=True,
1295+
)
1296+
await client.async_kv_retrieve_meta(
1297+
keys=["key_2"],
1298+
partition_id=partition_2,
1299+
create=True,
1300+
)
1301+
1302+
# Retrieve key from partition_1 (global_index 0)
1303+
keys_1 = await client.async_kv_retrieve_keys(
1304+
global_indexes=[0],
1305+
partition_id=partition_1,
1306+
)
1307+
1308+
# Retrieve key from partition_2 (global_index 0)
1309+
keys_2 = await client.async_kv_retrieve_keys(
1310+
global_indexes=[0],
1311+
partition_id=partition_2,
1312+
)
1313+
1314+
assert keys_1 == ["key_1"]
1315+
assert keys_2 == ["key_2"]
1316+
1317+
def test_kv_retrieve_keys_sync(self, client_setup):
1318+
"""Test synchronous kv_retrieve_keys."""
1319+
client, _, _ = client_setup
1320+
partition_id = "test_partition_sync"
1321+
1322+
# First create a key using kv_retrieve_meta
1323+
client.kv_retrieve_meta(
1324+
keys=["sync_key"],
1325+
partition_id=partition_id,
1326+
create=True,
1327+
)
1328+
1329+
# Now retrieve the key using global_index
1330+
keys = client.kv_retrieve_keys(
1331+
global_indexes=[0],
1332+
partition_id=partition_id,
1333+
)
1334+
1335+
assert keys == ["sync_key"]
1336+
1337+
def test_kv_retrieve_keys_type_validation(self, client_setup):
1338+
"""Test synchronous kv_retrieve_keys type validation."""
1339+
client, _, _ = client_setup
1340+
1341+
# Test with non-int element in list
1342+
with pytest.raises(TypeError):
1343+
client.kv_retrieve_keys(
1344+
global_indexes=[0, "invalid"],
1345+
partition_id="test_partition",
1346+
)

0 commit comments

Comments
 (0)