Skip to content

Commit 9950260

Browse files
authored
test: add basic correctness test (#51)
1 parent 89ba250 commit 9950260

5 files changed

Lines changed: 102 additions & 12 deletions

File tree

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ jobs:
2424
- name: Code quality check
2525
run: uv run poe cq-check
2626
- name: Unit tests
27-
run: uv run pytest tests/ -v -m 'not (account or basin or stream or metrics)'
27+
run: uv run pytest tests/ -v -m 'not (account or basin or stream or metrics or correctness)'
2828
- name: Check docs build
2929
working-directory: ./docs
3030
run: |

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,3 +75,4 @@ e2e-tests = "uv run pytest tests/ -v -s -m 'account or basin or stream'"
7575
e2e-account-tests = "uv run pytest tests/ -v -s -m account"
7676
e2e-basin-tests = "uv run pytest tests/ -v -s -m basin"
7777
e2e-stream-tests = "uv run pytest tests/ -v -s -m stream"
78+
correctness-tests = "uv run pytest tests/ -v -s -m correctness"

pytest.ini

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,3 +18,4 @@ markers =
1818
stream: tests for stream operations
1919
metrics: tests for metrics operations
2020
access_tokens: tests for access token operations
21+
correctness: correctness tests

tests/conftest.py

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import pytest
66
import pytest_asyncio
77

8-
from s2_sdk import S2, Compression, Endpoints, S2Basin, S2Stream
8+
from s2_sdk import S2, Compression, Endpoints, Retry, S2Basin, S2Stream
99

1010
pytest_plugins = ["pytest_asyncio"]
1111

@@ -50,22 +50,35 @@ def endpoints() -> Endpoints | None:
5050
return None
5151

5252

53+
@pytest.fixture(scope="session")
54+
def retry() -> Retry | None:
55+
return None
56+
57+
5358
@pytest_asyncio.fixture(scope="session")
5459
async def s2(
55-
access_token: str, compression: Compression, endpoints: Endpoints | None
60+
access_token: str,
61+
compression: Compression,
62+
endpoints: Endpoints | None,
63+
retry: Retry | None,
5664
) -> AsyncGenerator[S2, None]:
57-
async with S2(access_token, endpoints=endpoints, compression=compression) as s2:
65+
async with S2(
66+
access_token,
67+
endpoints=endpoints,
68+
compression=compression,
69+
retry=retry,
70+
) as s2:
5871
yield s2
5972

6073

6174
@pytest.fixture
62-
def basin_name() -> str:
63-
return _basin_name()
75+
def basin_name(basin_prefix: str) -> str:
76+
return _basin_name(basin_prefix)
6477

6578

6679
@pytest.fixture
67-
def basin_names() -> list[str]:
68-
return [_basin_name() for _ in range(3)]
80+
def basin_names(basin_prefix: str) -> list[str]:
81+
return [_basin_name(basin_prefix) for _ in range(3)]
6982

7083

7184
@pytest.fixture
@@ -94,8 +107,8 @@ async def basin(s2: S2, basin_name: str) -> AsyncGenerator[S2Basin, None]:
94107

95108

96109
@pytest_asyncio.fixture(scope="class")
97-
async def shared_basin(s2: S2) -> AsyncGenerator[S2Basin, None]:
98-
basin_name = _basin_name()
110+
async def shared_basin(s2: S2, basin_prefix: str) -> AsyncGenerator[S2Basin, None]:
111+
basin_name = _basin_name(basin_prefix)
99112
await s2.create_basin(name=basin_name)
100113

101114
try:
@@ -117,8 +130,12 @@ async def stream(
117130
await basin.delete_stream(stream_name)
118131

119132

120-
def _basin_name() -> str:
121-
return f"{BASIN_PREFIX}-{uuid.uuid4().hex[:8]}"
133+
def _basin_name(prefix: str) -> str:
134+
suffix = uuid.uuid4().hex[:8]
135+
prefix = prefix.strip("-")[: 48 - len(suffix) - 1].strip("-")
136+
if not prefix:
137+
return suffix
138+
return f"{prefix}-{suffix}"
122139

123140

124141
def _stream_name() -> str:

tests/test_correctness.py

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
import asyncio
2+
import sys
3+
4+
import pytest
5+
6+
from s2_sdk import Batching, ReadLimit, Record, Retry, S2Stream, SeqNum
7+
8+
TOTAL_RECORDS = 1024
9+
10+
11+
@pytest.fixture(scope="session")
12+
def retry() -> Retry:
13+
return Retry(max_attempts=sys.maxsize)
14+
15+
16+
@pytest.fixture(scope="session")
17+
def basin_prefix() -> str:
18+
return "python-correctness"
19+
20+
21+
@pytest.mark.correctness
22+
@pytest.mark.asyncio
23+
async def test_concurrent_producer_and_consumer_remain_gapless(stream: S2Stream):
24+
async def read_records() -> None:
25+
highest_contiguous_index = -1
26+
last_seq_num: int | None = None
27+
observed_records = 0
28+
29+
async for batch in stream.read_session(
30+
start=SeqNum(0), limit=ReadLimit(count=TOTAL_RECORDS), wait=60
31+
):
32+
for record in batch.records:
33+
assert observed_records < TOTAL_RECORDS
34+
35+
seq_num = record.seq_num
36+
if last_seq_num is None:
37+
assert seq_num == 0
38+
else:
39+
assert seq_num == last_seq_num + 1
40+
last_seq_num = seq_num
41+
42+
body = record.body.decode()
43+
index = int(body)
44+
assert 0 <= index < TOTAL_RECORDS
45+
assert index <= highest_contiguous_index + 1
46+
47+
if index == highest_contiguous_index + 1:
48+
highest_contiguous_index = index
49+
observed_records += 1
50+
51+
assert highest_contiguous_index == TOTAL_RECORDS - 1
52+
assert last_seq_num == TOTAL_RECORDS - 1
53+
assert observed_records == TOTAL_RECORDS
54+
55+
async def append_records() -> None:
56+
async with stream.producer(batching=Batching(max_records=16)) as producer:
57+
tickets = []
58+
for i in range(TOTAL_RECORDS):
59+
ticket = await producer.submit(Record(body=str(i).encode()))
60+
tickets.append(ticket)
61+
62+
for ticket in tickets:
63+
ack = await ticket
64+
assert ack.seq_num >= 0
65+
66+
async with asyncio.TaskGroup() as task_group:
67+
task_group.create_task(read_records())
68+
task_group.create_task(append_records())
69+
70+
tail = await stream.check_tail()
71+
assert tail.seq_num == TOTAL_RECORDS

0 commit comments

Comments
 (0)