Skip to content

Commit 80fa258

Browse files
vishal-balaclaude
andauthored
test: harden Redis-backed test isolation under pytest-xdist (#546) (#636)
## Motivation The integration suite runs under `pytest -n auto`, and now and then a single matrix job fails on a test that passes everywhere else — and passes again on rerun. The most recent example was `test_hybrid_query_with_geo_filter` returning 0 results instead of 3 on exactly one Python/redis-py/Redis combination. These failures read as version-specific, but they aren't: they're a test-isolation problem wearing a disguise. The key fact is how the suite isolates workers. Each xdist worker gets its **own** Redis container, so two tests can only interfere when they run on the **same** worker *and* reach for the **same** Redis key names. For that to actually corrupt a result, two things have to line up: 1. a **shared resource name** (a name scoped only to the worker, or a hard-coded one), and 2. a **way for state to outlive its test** — cleanup that only runs on the success path, or an index recreated with `overwrite=True` but no `drop=True`, leaving stale documents under a reused prefix. When both hold, one test sees another's leftovers — empty results, inflated counts, dtype mismatches — and which test loses depends entirely on the order the scheduler picked. That's the retry-passing, pseudo-random signature. #543 introduced a `redis_test_name` helper that mints per-test-unique names and began migrating fixtures onto it. This PR finishes that migration for the modules that can genuinely cause flakiness, and tightens their recreate/cleanup so leftovers can't survive. ## What changed **Recreate group** — per-test names + `create(overwrite=True, drop=True)`: - `test_hybrid.py` and `test_aggregation.py` (these two shared the *identical* `user_index_{worker_id}` / `v1_{worker_id}` — the direct cause of the geo-filter failure), plus `test_search_results.py`, `test_svs_integration.py`, and the shared `conftest.py` index fixtures. **Fixed-name collisions** — per-test names + guaranteed teardown: - `from_existing_complex` (hard-coded `"test"`) and `no_proactive_module_validation` (`"my_index"`) in the sync + async search-index suites. - The three `test_embedcache_warnings.py` tests sharing `"test_cache"`. - The shared `"doc"` prefix / `"test-index"` name across `test_no_proactive_module_checks.py`. **Raw `worker_id` collision** — unique names + cleanup: - The two `test_llmcache.py` tests that both created `float64_cache_{worker_id}` and never cleaned up. **Cluster tests** — the one place cross-worker collisions are real, since every cluster test points at a single hard-coded `redis://localhost:7001`: `test_redis_cluster_support.py` and `test_cluster_pipelining.py` now use per-test names, `drop=True`, and `try/finally` cleanup. ## Scope and follow-ups - The `SemanticRouter.delete()` leak of its `{name}:route_config` key is a product-code gap, not a test-only fix, so it's split into #634 (which also owns the dependent `test_key_separator_handling.py` cleanup). - The remaining #546 items are **benign hygiene** — leaks under per-test-unique names that can't contaminate another test, only accumulate harmless keys in a worker's throwaway Redis. Their exact remaining scope is enumerated in a comment on #546. ## Testing Honest caveat for reviewers: this was developed in an environment without Docker, so I could **not** run the integration suite. `black`, `isort`, and `mypy` pass. Please run `make test` and a repeated `pytest -n auto` pass to confirm the flake is gone before merging — this is a test-only change, so behavior is fully validated by the suite itself. <!-- CURSOR_SUMMARY --> --- > [!NOTE] > **Low Risk** > Changes are confined to the test suite; production library behavior is unchanged. > > **Overview** > **Hardens integration tests** so parallel `pytest-xdist` workers and shared Redis/cluster endpoints do not leave stale keys or collide on index/cache names. > > Fixtures and tests now use **`redis_test_name`** (per-test unique index/prefix/cache names) instead of **`worker_id`-only** or hard-coded names like `"test"` / `"my_index"` / `"test_cache"`. Shared **`conftest`** index fixtures (`flat_index`, `hnsw_index`, async variants) and modules such as **`test_hybrid`**, **`test_aggregation`**, **`test_search_results`**, and **`test_svs_integration`** adopt that naming and call **`create(overwrite=True, drop=True)`** so documents under a reused prefix are cleared before load. > > **Teardown** is tightened with **`try/finally`**, **`delete(drop=True)`**, and cache **`clear()`** / **`aclear()`** on failure paths (e.g. `from_existing_complex`, module-validation tests, **`test_llmcache`** dtype caches, **`test_embedcache_warnings`**). **Cluster** tests use per-test names and **`drop=True`** because all workers share **`localhost:7001`**. > > <sup>Reviewed by [Cursor Bugbot](https://cursor.com/bugbot) for commit c4e6b33. Bugbot is set up for automated code reviews on this repo. Configure [here](https://www.cursor.com/dashboard/bugbot).</sup> <!-- /CURSOR_SUMMARY --> Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
1 parent 93b29be commit 80fa258

12 files changed

Lines changed: 268 additions & 185 deletions

tests/conftest.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -518,7 +518,7 @@ def flat_index(sample_data, redis_url, redis_test_name):
518518
)
519519

520520
# create the index (no data yet)
521-
index.create(overwrite=True)
521+
index.create(overwrite=True, drop=True)
522522

523523
# Prepare and load the data
524524
def hash_preprocess(item: dict) -> dict:
@@ -575,7 +575,7 @@ async def async_flat_index(sample_data, redis_url, redis_test_name):
575575
)
576576

577577
# create the index (no data yet)
578-
await index.create(overwrite=True)
578+
await index.create(overwrite=True, drop=True)
579579

580580
# Prepare and load the data
581581
def hash_preprocess(item: dict) -> dict:
@@ -631,7 +631,7 @@ async def async_hnsw_index(sample_data, redis_url, redis_test_name):
631631
)
632632

633633
# create the index (no data yet)
634-
await index.create(overwrite=True)
634+
await index.create(overwrite=True, drop=True)
635635

636636
# Prepare and load the data
637637
def hash_preprocess(item: dict) -> dict:
@@ -687,7 +687,7 @@ def hnsw_index(sample_data, redis_url, redis_test_name):
687687
)
688688

689689
# create the index (no data yet)
690-
index.create(overwrite=True)
690+
index.create(overwrite=True, drop=True)
691691

692692
# Prepare and load the data
693693
def hash_preprocess(item: dict) -> dict:

tests/integration/test_aggregation.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,13 @@
88

99

1010
@pytest.fixture
11-
def index(multi_vector_data, redis_url, worker_id):
11+
def index(multi_vector_data, redis_url, redis_test_name):
1212

1313
index = SearchIndex.from_dict(
1414
{
1515
"index": {
16-
"name": f"user_index_{worker_id}",
17-
"prefix": f"v1_{worker_id}",
16+
"name": redis_test_name("user_index"),
17+
"prefix": redis_test_name("v1"),
1818
"storage_type": "hash",
1919
},
2020
"fields": [
@@ -59,8 +59,9 @@ def index(multi_vector_data, redis_url, worker_id):
5959
redis_url=redis_url,
6060
)
6161

62-
# create the index (no data yet)
63-
index.create(overwrite=True)
62+
# create the index (no data yet); drop any stale docs left by an
63+
# interrupted earlier run sharing this worker's Redis database
64+
index.create(overwrite=True, drop=True)
6465

6566
# prepare and load the data
6667
def hash_preprocess(item: dict) -> dict:

tests/integration/test_async_search_index.py

Lines changed: 38 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -113,11 +113,11 @@ async def test_search_index_from_existing_url(async_index, redis_url):
113113

114114

115115
@pytest.mark.asyncio
116-
async def test_search_index_from_existing_complex(async_client):
116+
async def test_search_index_from_existing_complex(async_client, redis_test_name):
117117
schema = {
118118
"index": {
119-
"name": "test",
120-
"prefix": "test",
119+
"name": redis_test_name("complex_index"),
120+
"prefix": redis_test_name("complex"),
121121
"storage_type": "json",
122122
},
123123
"fields": [
@@ -143,33 +143,37 @@ async def test_search_index_from_existing_complex(async_client):
143143
],
144144
}
145145
async_index = AsyncSearchIndex.from_dict(schema, redis_client=async_client)
146-
await async_index.create(overwrite=True)
146+
await async_index.create(overwrite=True, drop=True)
147147

148148
try:
149-
async_index2 = await AsyncSearchIndex.from_existing(
150-
async_index.name, redis_client=async_client
151-
)
152-
except Exception as e:
153-
pytest.skip(str(e))
154-
155-
# Verify index metadata matches
156-
assert async_index2.schema.index.name == async_index.schema.index.name
157-
assert async_index2.schema.index.prefix == async_index.schema.index.prefix
158-
assert (
159-
async_index2.schema.index.storage_type == async_index.schema.index.storage_type
160-
)
161-
162-
# Verify non-vector fields are present
163-
for field_name in ["user", "credit_score", "job", "age"]:
164-
assert field_name in async_index2.schema.fields
149+
try:
150+
async_index2 = await AsyncSearchIndex.from_existing(
151+
async_index.name, redis_client=async_client
152+
)
153+
except Exception as e:
154+
pytest.skip(str(e))
155+
156+
# Verify index metadata matches
157+
assert async_index2.schema.index.name == async_index.schema.index.name
158+
assert async_index2.schema.index.prefix == async_index.schema.index.prefix
165159
assert (
166-
async_index2.schema.fields[field_name].type
167-
== async_index.schema.fields[field_name].type
160+
async_index2.schema.index.storage_type
161+
== async_index.schema.index.storage_type
168162
)
169163

170-
# Vector field may not be present on older Redis versions
171-
if "user_embedding" in async_index2.schema.fields:
172-
assert async_index2.schema.fields["user_embedding"].type == "vector"
164+
# Verify non-vector fields are present
165+
for field_name in ["user", "credit_score", "job", "age"]:
166+
assert field_name in async_index2.schema.fields
167+
assert (
168+
async_index2.schema.fields[field_name].type
169+
== async_index.schema.fields[field_name].type
170+
)
171+
172+
# Vector field may not be present on older Redis versions
173+
if "user_embedding" in async_index2.schema.fields:
174+
assert async_index2.schema.fields["user_embedding"].type == "vector"
175+
finally:
176+
await async_index.delete(drop=True)
173177

174178

175179
def test_search_index_no_prefix(index_schema):
@@ -493,7 +497,9 @@ async def test_search_index_that_owns_client_disconnect_sync(index_schema, redis
493497

494498

495499
@pytest.mark.asyncio
496-
async def test_async_search_index_no_proactive_module_validation(redis_url):
500+
async def test_async_search_index_no_proactive_module_validation(
501+
redis_url, redis_test_name
502+
):
497503
"""
498504
Updated test for issue #370: AsyncSearchIndex should not validate modules proactively.
499505
Operations should fail naturally if modules are missing.
@@ -505,7 +511,7 @@ async def test_async_search_index_no_proactive_module_validation(redis_url):
505511
# Create index - validation should only set lib name, not check modules
506512
index = AsyncSearchIndex(
507513
schema=IndexSchema.from_dict(
508-
{"index": {"name": "my_index"}, "fields": fields}
514+
{"index": {"name": redis_test_name("my_index")}, "fields": fields}
509515
),
510516
redis_client=client,
511517
)
@@ -517,8 +523,11 @@ async def test_async_search_index_no_proactive_module_validation(redis_url):
517523
# The actual operation (create) will succeed if modules are present
518524
await index.create(overwrite=True, drop=True)
519525

520-
# Verify index was created successfully (modules are present in test env)
521-
assert await index.exists()
526+
try:
527+
# Verify index was created successfully (modules are present in test env)
528+
assert await index.exists()
529+
finally:
530+
await index.delete(drop=True)
522531

523532

524533
@pytest.mark.asyncio

tests/integration/test_cluster_pipelining.py

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,20 @@ def test_real_cluster_pipeline_get_protocol_version(redis_cluster_url):
3333

3434

3535
@pytest.mark.requires_cluster
36-
def test_real_searchindex_with_cluster_batch_operations(redis_cluster_url):
36+
def test_real_searchindex_with_cluster_batch_operations(
37+
redis_cluster_url, redis_test_name
38+
):
3739
"""
3840
Test SearchIndex.load() with Redis Cluster.
3941
"""
4042
# Create schema like the user had
43+
index_prefix = redis_test_name("doc")
4144
schema_dict = {
42-
"index": {"name": "test-real-365", "prefix": "doc", "storage_type": "hash"},
45+
"index": {
46+
"name": redis_test_name("test-real-365"),
47+
"prefix": index_prefix,
48+
"storage_type": "hash",
49+
},
4350
"fields": [
4451
{"name": "id", "type": "tag"},
4552
{"name": "text", "type": "text"},
@@ -52,7 +59,7 @@ def test_real_searchindex_with_cluster_batch_operations(redis_cluster_url):
5259
index = SearchIndex(schema, redis_url=redis_cluster_url)
5360

5461
# Create the index
55-
index.create(overwrite=True)
62+
index.create(overwrite=True, drop=True)
5663

5764
try:
5865
# Test data like user had
@@ -67,11 +74,11 @@ def test_real_searchindex_with_cluster_batch_operations(redis_cluster_url):
6774
)
6875

6976
assert len(keys) == 10
70-
assert all(k.startswith("doc:") for k in keys)
77+
assert all(k.startswith(f"{index_prefix}:") for k in keys)
7178

7279
finally:
7380
# Clean up
74-
index.delete()
81+
index.delete(drop=True)
7582

7683

7784
@pytest.mark.requires_cluster
@@ -114,14 +121,18 @@ def test_cluster_pipeline_protocol_version_directly():
114121

115122

116123
@pytest.mark.requires_cluster
117-
def test_batch_search_with_real_cluster(redis_cluster_url):
124+
def test_batch_search_with_real_cluster(redis_cluster_url, redis_test_name):
118125
"""
119126
Test batch_search which uses get_protocol_version internally.
120127
"""
121128
from redisvl.query import FilterQuery
122129

123130
schema_dict = {
124-
"index": {"name": "test-batch-365", "prefix": "batch", "storage_type": "json"},
131+
"index": {
132+
"name": redis_test_name("test-batch-365"),
133+
"prefix": redis_test_name("batch"),
134+
"storage_type": "json",
135+
},
125136
"fields": [
126137
{"name": "id", "type": "tag"},
127138
{"name": "category", "type": "tag"},
@@ -131,7 +142,7 @@ def test_batch_search_with_real_cluster(redis_cluster_url):
131142
schema = IndexSchema.from_dict(schema_dict)
132143
index = SearchIndex(schema, redis_url=redis_cluster_url)
133144

134-
index.create(overwrite=True)
145+
index.create(overwrite=True, drop=True)
135146

136147
try:
137148
# Load test data
@@ -151,17 +162,21 @@ def test_batch_search_with_real_cluster(redis_cluster_url):
151162
assert len(results) == 3
152163

153164
finally:
154-
index.delete()
165+
index.delete(drop=True)
155166

156167

157168
@pytest.mark.requires_cluster
158169
@pytest.mark.parametrize("ttl", [None, 30])
159-
def test_cluster_load_with_ttl(redis_cluster_url, ttl):
170+
def test_cluster_load_with_ttl(redis_cluster_url, ttl, redis_test_name):
160171
"""
161172
Test that TTL is correctly set on keys when using load() with ttl parameter on cluster.
162173
"""
163174
schema_dict = {
164-
"index": {"name": "test-ttl-cluster", "prefix": "ttl", "storage_type": "hash"},
175+
"index": {
176+
"name": redis_test_name("test-ttl-cluster"),
177+
"prefix": redis_test_name("ttl"),
178+
"storage_type": "hash",
179+
},
165180
"fields": [
166181
{"name": "id", "type": "tag"},
167182
{"name": "text", "type": "text"},
@@ -171,7 +186,7 @@ def test_cluster_load_with_ttl(redis_cluster_url, ttl):
171186
schema = IndexSchema.from_dict(schema_dict)
172187
index = SearchIndex(schema, redis_url=redis_cluster_url)
173188

174-
index.create(overwrite=True)
189+
index.create(overwrite=True, drop=True)
175190

176191
try:
177192
# Load test data with TTL parameter
@@ -190,4 +205,4 @@ def test_cluster_load_with_ttl(redis_cluster_url, ttl):
190205
assert abs(key_ttl - ttl) <= 5
191206

192207
finally:
193-
index.delete()
208+
index.delete(drop=True)

tests/integration/test_embedcache_warnings.py

Lines changed: 26 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,14 @@ def reset_warning_flag():
1919

2020

2121
@pytest.mark.asyncio
22-
async def test_sync_methods_warn_with_async_only_client(async_client, caplog):
22+
async def test_sync_methods_warn_with_async_only_client(
23+
async_client, caplog, redis_test_name
24+
):
2325
"""Test that sync methods warn when only async client is provided."""
2426
# Initialize EmbeddingsCache with only async_redis_client
25-
cache = EmbeddingsCache(name="test_cache", async_redis_client=async_client)
27+
cache = EmbeddingsCache(
28+
name=redis_test_name("test_cache"), async_redis_client=async_client
29+
)
2630

2731
# Mock _get_redis_client to prevent actual connection attempt
2832
with patch.object(cache, "_get_redis_client") as mock_get_client:
@@ -53,15 +57,15 @@ async def test_sync_methods_warn_with_async_only_client(async_client, caplog):
5357
assert len(caplog.records) == 0
5458

5559

56-
def test_no_warning_with_sync_client(redis_url):
60+
def test_no_warning_with_sync_client(redis_url, redis_test_name):
5761
"""Test that no warning is shown when sync client is provided."""
5862
# Create sync redis client from redis_url
5963
sync_client = Redis.from_url(redis_url)
6064

65+
cache = EmbeddingsCache(
66+
name=redis_test_name("test_cache"), redis_client=sync_client
67+
)
6168
try:
62-
# Initialize EmbeddingsCache with sync_redis_client
63-
cache = EmbeddingsCache(name="test_cache", redis_client=sync_client)
64-
6569
with patch("redisvl.utils.log.get_logger") as mock_logger:
6670
# Sync methods should not warn
6771
_ = cache.get_by_key("test_key")
@@ -70,19 +74,27 @@ def test_no_warning_with_sync_client(redis_url):
7074
# No warnings should have been logged
7175
mock_logger.return_value.warning.assert_not_called()
7276
finally:
77+
cache.clear()
7378
sync_client.close()
7479

7580

7681
@pytest.mark.asyncio
77-
async def test_async_methods_no_warning(async_client):
82+
async def test_async_methods_no_warning(async_client, redis_test_name):
7883
"""Test that async methods don't trigger warnings."""
7984
# Initialize EmbeddingsCache with only async_redis_client
80-
cache = EmbeddingsCache(name="test_cache", async_redis_client=async_client)
85+
cache = EmbeddingsCache(
86+
name=redis_test_name("test_cache"), async_redis_client=async_client
87+
)
8188

82-
with patch("redisvl.utils.log.get_logger") as mock_logger:
83-
# Async methods should not warn
84-
_ = await cache.aget_by_key("test_key")
85-
_ = await cache.aset(content="test", model_name="model", embedding=[0.1, 0.2])
89+
try:
90+
with patch("redisvl.utils.log.get_logger") as mock_logger:
91+
# Async methods should not warn
92+
_ = await cache.aget_by_key("test_key")
93+
_ = await cache.aset(
94+
content="test", model_name="model", embedding=[0.1, 0.2]
95+
)
8696

87-
# No warnings should have been logged
88-
mock_logger.return_value.warning.assert_not_called()
97+
# No warnings should have been logged
98+
mock_logger.return_value.warning.assert_not_called()
99+
finally:
100+
await cache.aclear()

tests/integration/test_hybrid.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,12 @@
1919

2020

2121
@pytest.fixture
22-
def index_schema(worker_id):
22+
def index_schema(redis_test_name):
2323
return IndexSchema.from_dict(
2424
{
2525
"index": {
26-
"name": f"user_index_{worker_id}",
27-
"prefix": f"v1_{worker_id}",
26+
"name": redis_test_name("user_index"),
27+
"prefix": redis_test_name("v1"),
2828
"storage_type": "hash",
2929
},
3030
"fields": [
@@ -73,8 +73,9 @@ def index_schema(worker_id):
7373
def index(index_schema, multi_vector_data, redis_url):
7474
index = SearchIndex(schema=index_schema, redis_url=redis_url)
7575

76-
# create the index (no data yet)
77-
index.create(overwrite=True)
76+
# create the index (no data yet); drop any stale docs left by an
77+
# interrupted earlier run sharing this worker's Redis database
78+
index.create(overwrite=True, drop=True)
7879

7980
# prepare and load the data
8081
def hash_preprocess(item: dict) -> dict:
@@ -97,7 +98,7 @@ def hash_preprocess(item: dict) -> dict:
9798
@pytest.fixture
9899
async def async_index(index_schema, multi_vector_data, async_client):
99100
index = AsyncSearchIndex(schema=index_schema, redis_client=async_client)
100-
await index.create(overwrite=True)
101+
await index.create(overwrite=True, drop=True)
101102

102103
def hash_preprocess(item: dict) -> dict:
103104
return {

0 commit comments

Comments
 (0)