Skip to content

Commit 6dce105

Browse files
authored
Improve flakiness of SSB tests (#2015)
* Add pytest timeouts: - mark batch tests specifically with long timeouts - add timeouts to GH action jobs as a backup - dump full stack-trace on timeout detection to debug hangs in the CI * Avoid crashing the xdist node on failure * Timeout with signal rather than thread to avoid crashing xdist worker node * Add test to verify behaviour with xdist, change behaviour to match test * Attempt bug squashing of hanging test flake in CI through stricter inflight_{objs,refs} logic * Fix typo * Move inflight updates before yield to grpc to avoid racing * Remove debugging logs * Add safety hatch in-case server doesn't clos its side of the stream in time (300s) * Improve shutdown behaviour of client by adding timeouts to joins and cancelling stream if necessary * Improvements to batching logic: - Avoid 60s timeout putting into self.__reqs - Handle graceful stopping of all bg threads - Allow cancelling of hanging streams - Reraise bg_exceptions when they happen - Align shutdown timeout with client-defined insert timeout - Add mock tests of cancelling bidi streamsa * Remove redundant unit tests * Remove wrong if clause when req is Empty * Remove redundant stream cancellation logic, align async with sync impl * Fix missed await * Remove redundant code * Achieve parity in async/sync impls
1 parent 5a32208 commit 6dce105

11 files changed

Lines changed: 330 additions & 90 deletions

File tree

.github/workflows/main.yaml

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,19 +22,19 @@ env:
2222
WEAVIATE_128: 1.28.16
2323
WEAVIATE_129: 1.29.11
2424
WEAVIATE_130: 1.30.22
25-
WEAVIATE_131: 1.31.20
26-
WEAVIATE_132: 1.32.23
27-
WEAVIATE_133: 1.33.10
28-
WEAVIATE_134: 1.34.5
29-
WEAVIATE_135: 1.35.16-efdedfa
30-
WEAVIATE_136: 1.36.9-d905e6c
31-
WEAVIATE_137: 1.37.0-rc.0-b313954.amd64
32-
25+
WEAVIATE_131: 1.31.22
26+
WEAVIATE_132: 1.32.27
27+
WEAVIATE_133: 1.33.18
28+
WEAVIATE_134: 1.34.19
29+
WEAVIATE_135: 1.35.15
30+
WEAVIATE_136: 1.36.6-8edcf08.amd64
31+
WEAVIATE_137: 1.37.0-dev-29d5c87.amd64
3332

3433
jobs:
3534
lint-and-format:
3635
name: Run Linter and Formatter
3736
runs-on: ubuntu-latest
37+
timeout-minutes: 5
3838
steps:
3939
- uses: actions/checkout@v4
4040
- uses: actions/setup-python@v5
@@ -60,6 +60,7 @@ jobs:
6060
type-checking:
6161
name: Run Type Checking
6262
runs-on: ubuntu-latest
63+
timeout-minutes: 5
6364
strategy:
6465
fail-fast: false
6566
matrix:
@@ -80,6 +81,7 @@ jobs:
8081
unit-tests:
8182
name: Run Unit Tests
8283
runs-on: ubuntu-latest
84+
timeout-minutes: 5
8385
strategy:
8486
fail-fast: false
8587
matrix:
@@ -104,6 +106,7 @@ jobs:
104106
proto-test:
105107
name: Run importing protos test
106108
runs-on: ubuntu-latest
109+
timeout-minutes: 5
107110
strategy:
108111
fail-fast: false
109112
matrix:
@@ -124,6 +127,7 @@ jobs:
124127
integration-tests-embedded:
125128
name: Run Integration Tests Embedded
126129
runs-on: ubuntu-latest
130+
timeout-minutes: 30
127131
strategy:
128132
matrix:
129133
version: ["3.10", "3.11", "3.12", "3.13", "3.14"]
@@ -153,6 +157,7 @@ jobs:
153157
integration-tests:
154158
name: Run Integration Tests
155159
runs-on: ubuntu-latest
160+
timeout-minutes: 30
156161
strategy:
157162
fail-fast: false
158163
matrix:
@@ -208,6 +213,7 @@ jobs:
208213
journey-tests:
209214
name: Run Journey Tests
210215
runs-on: ubuntu-latest
216+
timeout-minutes: 30
211217
strategy:
212218
fail-fast: false
213219
matrix:
@@ -243,6 +249,7 @@ jobs:
243249
Codecov:
244250
needs: [Unit-Tests, Integration-Tests]
245251
runs-on: ubuntu-latest
252+
timeout-minutes: 5
246253
if: github.ref_name != 'main' && !github.event.pull_request.head.repo.fork
247254
steps:
248255
- uses: actions/checkout@v4
@@ -273,6 +280,7 @@ jobs:
273280
build-package:
274281
name: Build package
275282
runs-on: ubuntu-latest
283+
timeout-minutes: 10
276284
steps:
277285
- name: Checkout
278286
uses: actions/checkout@v4
@@ -297,6 +305,7 @@ jobs:
297305
test-package:
298306
needs: [build-package]
299307
runs-on: ubuntu-latest
308+
timeout-minutes: 30
300309
strategy:
301310
fail-fast: false
302311
matrix:
@@ -341,6 +350,7 @@ jobs:
341350
name: Build and publish Python 🐍 distributions 📦 to PyPI and TestPyPI
342351
needs: [integration-tests, unit-tests, lint-and-format, type-checking, test-package, proto-test]
343352
runs-on: ubuntu-latest
353+
timeout-minutes: 20
344354
steps:
345355
- name: Checkout
346356
uses: actions/checkout@v4
@@ -366,6 +376,7 @@ jobs:
366376
name: Create a GitHub Release on new tags
367377
if: startsWith(github.ref, 'refs/tags')
368378
runs-on: ubuntu-latest
379+
timeout-minutes: 5
369380
needs: [build-and-publish]
370381
steps:
371382
- name: Download build artifact to append to release

conftest.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
import faulthandler
2+
import os
3+
import threading
4+
5+
import pytest
6+
7+
DEFAULT_TIMEOUT = 300 # 5 minutes
8+
9+
_timeout_timer: threading.Timer | None = None
10+
11+
12+
def _get_timeout(item: pytest.Item) -> float:
13+
marker = item.get_closest_marker("timeout")
14+
if marker and marker.args:
15+
return float(marker.args[0])
16+
return float(DEFAULT_TIMEOUT)
17+
18+
19+
def pytest_runtest_setup(item: pytest.Item) -> None:
20+
"""Start a watchdog timer that dumps all thread stack traces on timeout.
21+
22+
Unlike pytest-timeout, this does NOT raise KeyboardInterrupt (which crashes
23+
xdist workers and corrupts asyncio event loops). Instead it:
24+
1. Writes the test name + all thread tracebacks directly to fd 2 (stderr).
25+
With --capture=sys in pytest.ini, fd 2 is the real stderr (not captured),
26+
so the output goes directly to the CI log even under xdist.
27+
2. Calls os._exit(1) to terminate the worker process.
28+
29+
xdist will report 'node down: Not properly terminated' which is expected —
30+
the diagnostic output will already be in the CI logs above that message.
31+
"""
32+
global _timeout_timer
33+
timeout = _get_timeout(item)
34+
if timeout <= 0:
35+
return
36+
37+
def _on_timeout() -> None:
38+
banner = "=" * 70
39+
os.write(2, f"\n\n{banner}\n".encode())
40+
os.write(2, f"TIMEOUT: {item.nodeid} exceeded {timeout}s\n".encode())
41+
os.write(2, f"{banner}\n\n".encode())
42+
# faulthandler needs a file object — wrap a dup of fd 2 to avoid closing it
43+
with os.fdopen(os.dup(2), "w") as f:
44+
faulthandler.dump_traceback(file=f)
45+
f.flush()
46+
os.write(2, f"\n{banner}\n\n".encode())
47+
os._exit(1)
48+
49+
_timeout_timer = threading.Timer(timeout, _on_timeout)
50+
_timeout_timer.daemon = True
51+
_timeout_timer.start()
52+
53+
54+
def pytest_runtest_teardown(item: pytest.Item, nextitem: pytest.Item | None) -> None:
55+
"""Cancel the watchdog timer after each test completes."""
56+
global _timeout_timer
57+
if _timeout_timer is not None:
58+
_timeout_timer.cancel()
59+
_timeout_timer = None

integration/test_batch_v4.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -410,6 +410,7 @@ def test_add_ref_batch_with_tenant(client_factory: ClientFactory) -> None:
410410
assert ret_obj.references["test"].objects[0].uuid == obj[0]
411411

412412

413+
@pytest.mark.timeout(600)
413414
@pytest.mark.parametrize(
414415
"batching_method",
415416
[
@@ -717,6 +718,7 @@ def test_non_existant_collection(client_factory: ClientFactory) -> None:
717718
# not, so we do not check for errors here
718719

719720

721+
@pytest.mark.timeout(600)
720722
def test_number_of_stored_results_in_batch(client_factory: ClientFactory) -> None:
721723
client, name = client_factory()
722724
with client.batch.dynamic() as batch:
@@ -816,6 +818,7 @@ def test_references_with_to_uuids(client_factory: ClientFactory) -> None:
816818

817819

818820
@pytest.mark.asyncio
821+
@pytest.mark.timeout(600)
819822
async def test_add_one_hundred_thousand_objects_async_client(
820823
async_client_factory: AsyncClientFactory,
821824
) -> None:
@@ -846,6 +849,7 @@ async def test_add_one_hundred_thousand_objects_async_client(
846849
await client.collections.delete(name)
847850

848851

852+
@pytest.mark.timeout(600)
849853
def test_add_one_hundred_thousand_objects_sync_client(
850854
client_factory: ClientFactory,
851855
) -> None:

integration/test_collection_batch.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,7 @@ def test_non_existant_collection(collection_factory_get: CollectionFactoryGet) -
271271

272272

273273
@pytest.mark.asyncio
274+
@pytest.mark.timeout(600)
274275
async def test_batch_one_hundred_thousand_objects_async_collection(
275276
batch_collection_async: BatchCollectionAsync,
276277
) -> None:
@@ -298,6 +299,7 @@ async def test_batch_one_hundred_thousand_objects_async_collection(
298299

299300

300301
@pytest.mark.asyncio
302+
@pytest.mark.timeout(600)
301303
async def test_ingest_one_hundred_thousand_data_objects_async(
302304
batch_collection_async: BatchCollectionAsync,
303305
) -> None:
@@ -319,6 +321,7 @@ async def test_ingest_one_hundred_thousand_data_objects_async(
319321
assert len(results.errors) == 0, [obj.message for obj in results.errors.values()]
320322

321323

324+
@pytest.mark.timeout(600)
322325
def test_ingest_one_hundred_thousand_data_objects(
323326
batch_collection: BatchCollection,
324327
) -> None:

mock_tests/conftest.py

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import json
22
import time
33
from concurrent import futures
4-
from typing import Generator, Mapping
4+
from typing import AsyncGenerator, Generator, Mapping
55

66
import grpc
77
import pytest
@@ -141,6 +141,16 @@ def weaviate_client(
141141
client.close()
142142

143143

144+
@pytest.fixture(scope="function")
145+
async def weaviate_client_async(
146+
weaviate_mock: HTTPServer, start_grpc_server: grpc.Server
147+
) -> AsyncGenerator[weaviate.WeaviateAsyncClient, None]:
148+
client = weaviate.use_async_with_local(port=MOCK_PORT, host=MOCK_IP, grpc_port=MOCK_PORT_GRPC)
149+
await client.connect()
150+
yield client
151+
await client.close()
152+
153+
144154
@pytest.fixture(scope="function")
145155
def weaviate_timeouts_client(
146156
weaviate_timeouts_mock: HTTPServer, start_grpc_server: grpc.Server
@@ -368,3 +378,39 @@ def forbidden(
368378
service = MockForbiddenWeaviateService()
369379
weaviate_pb2_grpc.add_WeaviateServicer_to_server(service, start_grpc_server)
370380
return weaviate_client.collections.use("ForbiddenCollection")
381+
382+
383+
class MockWeaviateService(weaviate_pb2_grpc.WeaviateServicer):
384+
def BatchStream(
385+
self,
386+
request_iterator: Generator[batch_pb2.BatchStreamRequest, None, None],
387+
context: grpc.ServicerContext,
388+
) -> Generator[batch_pb2.BatchStreamReply, None, None]:
389+
while True:
390+
if context.is_active():
391+
time.sleep(0.1)
392+
else:
393+
raise grpc.RpcError(grpc.StatusCode.DEADLINE_EXCEEDED, "Deadline exceeded")
394+
395+
396+
@pytest.fixture(scope="function")
397+
def stream_cancel(
398+
weaviate_client: weaviate.WeaviateClient,
399+
weaviate_mock: HTTPServer,
400+
start_grpc_server: grpc.Server,
401+
) -> Generator[weaviate.collections.Collection, None, None]:
402+
name = "StreamCancelCollection"
403+
weaviate_mock.expect_request(f"/v1/schema/{name}").respond_with_response(
404+
Response(status=404)
405+
) # skips __create_batch_reset vectorizer logic
406+
weaviate_pb2_grpc.add_WeaviateServicer_to_server(MockWeaviateService(), start_grpc_server)
407+
client = weaviate.connect_to_local(
408+
port=MOCK_PORT,
409+
host=MOCK_IP,
410+
grpc_port=MOCK_PORT_GRPC,
411+
additional_config=weaviate.classes.init.AdditionalConfig(
412+
timeout=weaviate.classes.init.Timeout(insert=1)
413+
),
414+
)
415+
yield client.collections.use(name)
416+
client.close()

pytest.ini

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
[pytest]
2-
addopts = -m 'not profiling' --benchmark-skip -l
2+
addopts = -m 'not profiling' --benchmark-skip -l --capture=sys --max-worker-restart=3
33
markers =
44
profiling: marks tests that can be profiled
5+
timeout: marks tests with a custom timeout in seconds (default: 300)
56
asyncio_default_fixture_loop_scope = function

test/test_timeout.py

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
"""Tests for the custom per-test timeout mechanism in conftest.py.
2+
3+
Uses subprocess because the timeout mechanism calls os._exit(1).
4+
"""
5+
6+
import subprocess
7+
import sys
8+
import textwrap
9+
from pathlib import Path
10+
11+
PROJECT_ROOT = Path(__file__).parent.parent
12+
13+
14+
def _run_pytest(tmp_path: Path, test_code: str, *extra_args: str) -> subprocess.CompletedProcess:
15+
"""Run pytest in a subprocess with a copy of our timeout conftest."""
16+
(tmp_path / "conftest.py").write_text((PROJECT_ROOT / "conftest.py").read_text())
17+
(tmp_path / "pytest.ini").write_text(
18+
"[pytest]\naddopts = --capture=sys --max-worker-restart=0\nmarkers =\n timeout: custom timeout\n"
19+
)
20+
(tmp_path / "test_it.py").write_text(textwrap.dedent(test_code))
21+
return subprocess.run(
22+
[
23+
sys.executable,
24+
"-m",
25+
"pytest",
26+
"-v",
27+
"-n",
28+
"auto",
29+
"--dist",
30+
"loadgroup",
31+
"test_it.py",
32+
*extra_args,
33+
],
34+
capture_output=True,
35+
text=True,
36+
timeout=60,
37+
cwd=str(tmp_path),
38+
)
39+
40+
41+
def test_timeout_prints_test_name_and_stacktrace(tmp_path: Path) -> None:
42+
result = _run_pytest(
43+
tmp_path,
44+
"""\
45+
import time
46+
import pytest
47+
48+
@pytest.mark.timeout(2)
49+
def test_hangs():
50+
time.sleep(999)
51+
""",
52+
)
53+
assert result.returncode != 0
54+
assert "TIMEOUT: test_it.py::test_hangs exceeded 2.0s" in result.stderr
55+
assert "test_hangs" in result.stderr
56+
57+
58+
def test_fast_test_not_killed(tmp_path: Path) -> None:
59+
result = _run_pytest(
60+
tmp_path,
61+
"""\
62+
import pytest
63+
64+
@pytest.mark.timeout(10)
65+
def test_fast():
66+
assert True
67+
""",
68+
)
69+
assert result.returncode == 0
70+
assert "TIMEOUT" not in result.stderr
71+
72+
73+
def test_timeout_with_passing_and_hanging_test(tmp_path: Path) -> None:
74+
result = _run_pytest(
75+
tmp_path,
76+
"""\
77+
import time
78+
import pytest
79+
80+
@pytest.mark.timeout(2)
81+
def test_hangs_in_worker():
82+
time.sleep(999)
83+
84+
def test_passes():
85+
assert True
86+
""",
87+
)
88+
assert result.returncode != 0
89+
assert "TIMEOUT: test_it.py::test_hangs_in_worker exceeded 2.0s" in result.stderr
90+
assert "test_hangs_in_worker" in result.stderr

0 commit comments

Comments
 (0)