Skip to content

Commit 4708948

Browse files
Jdubricktisnik
authored andcommitted
clear test registry with dedicated user ids and request ids
Signed-off-by: Jordan Dubrick <jdubrick@redhat.com>
1 parent d8c3585 commit 4708948

2 files changed

Lines changed: 52 additions & 43 deletions

File tree

tests/integration/endpoints/test_stream_interrupt_integration.py

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -10,35 +10,35 @@
1010
from models.requests import StreamingInterruptRequest
1111
from utils.stream_interrupts import StreamInterruptRegistry
1212

13+
TEST_REQUEST_ID = "123e4567-e89b-12d3-a456-426614174003"
14+
OWNER_USER_ID = "00000001-0001-0001-0001-000000000001"
15+
1316

1417
@pytest.fixture(name="registry")
1518
def registry_fixture() -> Generator[StreamInterruptRegistry, None, None]:
16-
"""Provide the shared registry, cleared before and after each test."""
19+
"""Provide singleton registry with deterministic per-test cleanup."""
1720
registry = StreamInterruptRegistry()
18-
registry._streams.clear()
21+
registry.deregister_stream(TEST_REQUEST_ID)
1922
yield registry
20-
registry._streams.clear()
23+
registry.deregister_stream(TEST_REQUEST_ID)
2124

2225

2326
@pytest.mark.asyncio
2427
async def test_stream_interrupt_full_round_trip(
2528
registry: StreamInterruptRegistry,
2629
) -> None:
2730
"""Full lifecycle: register, interrupt, then verify deregistration."""
28-
request_id = "123e4567-e89b-12d3-a456-426614174003"
29-
user_id = "00000001-0001-0001-0001-000000000001"
30-
3131
async def pending_stream() -> None:
3232
await asyncio.sleep(10)
3333

3434
task = asyncio.create_task(pending_stream())
35-
registry.register_stream(request_id, user_id, task)
35+
registry.register_stream(TEST_REQUEST_ID, OWNER_USER_ID, task)
3636

37-
assert registry.get_stream(request_id) is not None
37+
assert registry.get_stream(TEST_REQUEST_ID) is not None
3838

3939
response = await stream_interrupt_endpoint_handler(
40-
interrupt_request=StreamingInterruptRequest(request_id=request_id),
41-
auth=(user_id, "mock_username", False, "mock_token"),
40+
interrupt_request=StreamingInterruptRequest(request_id=TEST_REQUEST_ID),
41+
auth=(OWNER_USER_ID, "mock_username", False, "mock_token"),
4242
registry=registry,
4343
)
4444
assert response.interrupted is True
@@ -47,19 +47,19 @@ async def pending_stream() -> None:
4747
await task
4848

4949
completed_response = await stream_interrupt_endpoint_handler(
50-
interrupt_request=StreamingInterruptRequest(request_id=request_id),
51-
auth=(user_id, "mock_username", False, "mock_token"),
50+
interrupt_request=StreamingInterruptRequest(request_id=TEST_REQUEST_ID),
51+
auth=(OWNER_USER_ID, "mock_username", False, "mock_token"),
5252
registry=registry,
5353
)
5454
assert completed_response.interrupted is False
5555

56-
registry.deregister_stream(request_id)
57-
assert registry.get_stream(request_id) is None
56+
registry.deregister_stream(TEST_REQUEST_ID)
57+
assert registry.get_stream(TEST_REQUEST_ID) is None
5858

5959
with pytest.raises(HTTPException) as exc_info:
6060
await stream_interrupt_endpoint_handler(
61-
interrupt_request=StreamingInterruptRequest(request_id=request_id),
62-
auth=(user_id, "mock_username", False, "mock_token"),
61+
interrupt_request=StreamingInterruptRequest(request_id=TEST_REQUEST_ID),
62+
auth=(OWNER_USER_ID, "mock_username", False, "mock_token"),
6363
registry=registry,
6464
)
6565
assert exc_info.value.status_code == 404

tests/unit/app/endpoints/test_stream_interrupt.py

Lines changed: 36 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -11,38 +11,52 @@
1111
from models.responses import StreamingInterruptResponse
1212
from utils.stream_interrupts import StreamInterruptRegistry
1313

14+
REQUEST_ID_SUCCESS = "123e4567-e89b-12d3-a456-426614174000"
15+
REQUEST_ID_NOT_FOUND = "123e4567-e89b-12d3-a456-426614174001"
16+
REQUEST_ID_WRONG_USER = "123e4567-e89b-12d3-a456-426614174002"
17+
REQUEST_ID_ALREADY_COMPLETED = "123e4567-e89b-12d3-a456-426614174004"
18+
19+
OWNER_USER_ID = "00000001-0001-0001-0001-000000000001"
20+
NON_OWNER_USER_ID = "00000001-0001-0001-0001-000000000999"
21+
22+
TEST_REQUEST_IDS = (
23+
REQUEST_ID_SUCCESS,
24+
REQUEST_ID_NOT_FOUND,
25+
REQUEST_ID_WRONG_USER,
26+
REQUEST_ID_ALREADY_COMPLETED,
27+
)
28+
1429

1530
@pytest.fixture(name="registry")
1631
def registry_fixture() -> Generator[StreamInterruptRegistry, None, None]:
17-
"""Provide the shared registry, cleared before and after each test."""
32+
"""Provide singleton registry with deterministic per-test cleanup."""
1833
registry = StreamInterruptRegistry()
19-
registry._streams.clear()
34+
for request_id in TEST_REQUEST_IDS:
35+
registry.deregister_stream(request_id)
2036
yield registry
21-
registry._streams.clear()
37+
for request_id in TEST_REQUEST_IDS:
38+
registry.deregister_stream(request_id)
2239

2340

2441
@pytest.mark.asyncio
2542
async def test_stream_interrupt_endpoint_success(
2643
registry: StreamInterruptRegistry,
2744
) -> None:
2845
"""Interrupt endpoint cancels an active stream for the same user."""
29-
request_id = "123e4567-e89b-12d3-a456-426614174000"
30-
user_id = "00000001-0001-0001-0001-000000000001"
31-
3246
async def pending_stream() -> None:
3347
await asyncio.sleep(10)
3448

3549
task = asyncio.create_task(pending_stream())
36-
registry.register_stream(request_id, user_id, task)
50+
registry.register_stream(REQUEST_ID_SUCCESS, OWNER_USER_ID, task)
3751

3852
response = await stream_interrupt_endpoint_handler(
39-
interrupt_request=StreamingInterruptRequest(request_id=request_id),
40-
auth=(user_id, "mock_username", False, "mock_token"),
53+
interrupt_request=StreamingInterruptRequest(request_id=REQUEST_ID_SUCCESS),
54+
auth=(OWNER_USER_ID, "mock_username", False, "mock_token"),
4155
registry=registry,
4256
)
4357

4458
assert isinstance(response, StreamingInterruptResponse)
45-
assert response.request_id == request_id
59+
assert response.request_id == REQUEST_ID_SUCCESS
4660
assert response.interrupted is True
4761

4862
with pytest.raises(asyncio.CancelledError):
@@ -54,13 +68,11 @@ async def test_stream_interrupt_endpoint_not_found(
5468
registry: StreamInterruptRegistry,
5569
) -> None:
5670
"""Interrupt endpoint returns 404 for unknown request id."""
57-
request_id = "123e4567-e89b-12d3-a456-426614174001"
58-
5971
with pytest.raises(HTTPException) as exc_info:
6072
await stream_interrupt_endpoint_handler(
61-
interrupt_request=StreamingInterruptRequest(request_id=request_id),
73+
interrupt_request=StreamingInterruptRequest(request_id=REQUEST_ID_NOT_FOUND),
6274
auth=(
63-
"00000001-0001-0001-0001-000000000001",
75+
OWNER_USER_ID,
6476
"mock_username",
6577
False,
6678
"mock_token",
@@ -76,23 +88,21 @@ async def test_stream_interrupt_endpoint_wrong_user(
7688
registry: StreamInterruptRegistry,
7789
) -> None:
7890
"""Interrupt endpoint does not cancel streams owned by other users."""
79-
request_id = "123e4567-e89b-12d3-a456-426614174002"
80-
8191
async def pending_stream() -> None:
8292
await asyncio.sleep(10)
8393

8494
task = asyncio.create_task(pending_stream())
8595
registry.register_stream(
86-
request_id=request_id,
87-
user_id="00000001-0001-0001-0001-000000000001",
96+
request_id=REQUEST_ID_WRONG_USER,
97+
user_id=OWNER_USER_ID,
8898
task=task,
8999
)
90100

91101
with pytest.raises(HTTPException) as exc_info:
92102
await stream_interrupt_endpoint_handler(
93-
interrupt_request=StreamingInterruptRequest(request_id=request_id),
103+
interrupt_request=StreamingInterruptRequest(request_id=REQUEST_ID_WRONG_USER),
94104
auth=(
95-
"00000001-0001-0001-0001-000000000999",
105+
NON_OWNER_USER_ID,
96106
"mock_username",
97107
False,
98108
"mock_token",
@@ -113,22 +123,21 @@ async def test_stream_interrupt_endpoint_already_completed(
113123
registry: StreamInterruptRegistry,
114124
) -> None:
115125
"""Interrupt endpoint reports already-completed streams without error."""
116-
request_id = "123e4567-e89b-12d3-a456-426614174004"
117-
user_id = "00000001-0001-0001-0001-000000000001"
118-
119126
async def completed_stream() -> None:
120127
return None
121128

122129
task = asyncio.create_task(completed_stream())
123130
await task
124-
registry.register_stream(request_id, user_id, task)
131+
registry.register_stream(REQUEST_ID_ALREADY_COMPLETED, OWNER_USER_ID, task)
125132

126133
response = await stream_interrupt_endpoint_handler(
127-
interrupt_request=StreamingInterruptRequest(request_id=request_id),
128-
auth=(user_id, "mock_username", False, "mock_token"),
134+
interrupt_request=StreamingInterruptRequest(
135+
request_id=REQUEST_ID_ALREADY_COMPLETED
136+
),
137+
auth=(OWNER_USER_ID, "mock_username", False, "mock_token"),
129138
registry=registry,
130139
)
131140

132141
assert isinstance(response, StreamingInterruptResponse)
133-
assert response.request_id == request_id
142+
assert response.request_id == REQUEST_ID_ALREADY_COMPLETED
134143
assert response.interrupted is False

0 commit comments

Comments
 (0)