|
17 | 17 | from streamstore.utils import metered_bytes |
18 | 18 |
|
19 | 19 |
|
20 | | -@pytest.mark.asyncio(loop_scope="session") |
21 | 20 | @pytest.mark.stream |
22 | 21 | class TestStreamOperations: |
23 | 22 | async def test_check_tail_empty_stream(self, stream: Stream): |
@@ -64,24 +63,24 @@ async def test_append_with_match_seq_num(self, stream: Stream): |
64 | 63 | assert output_1.next_seq_num == 2 |
65 | 64 |
|
66 | 65 | async def test_append_with_timestamp(self, stream: Stream): |
67 | | - timestamp_1 = int(time.time()) |
| 66 | + timestamp_0 = int(time.time()) |
68 | 67 | await asyncio.sleep(0.1) |
69 | | - timestamp_2 = int(time.time()) |
| 68 | + timestamp_1 = int(time.time()) |
70 | 69 |
|
71 | 70 | input = AppendInput( |
72 | 71 | records=[ |
73 | | - Record(body=b"record-0", timestamp=timestamp_1), |
74 | | - Record(body=b"record-1", timestamp=timestamp_2), |
| 72 | + Record(body=b"record-0", timestamp=timestamp_0), |
| 73 | + Record(body=b"record-1", timestamp=timestamp_1), |
75 | 74 | ] |
76 | 75 | ) |
77 | 76 | output = await stream.append(input) |
78 | 77 |
|
79 | 78 | assert output.start_seq_num == 0 |
80 | | - assert output.start_timestamp == timestamp_1 |
| 79 | + assert output.start_timestamp == timestamp_0 |
81 | 80 | assert output.end_seq_num == 2 |
82 | | - assert output.end_timestamp == timestamp_2 |
| 81 | + assert output.end_timestamp == timestamp_1 |
83 | 82 | assert output.next_seq_num == 2 |
84 | | - assert output.last_timestamp == timestamp_2 |
| 83 | + assert output.last_timestamp == timestamp_1 |
85 | 84 |
|
86 | 85 | async def test_read_from_seq_num_zero(self, stream: Stream): |
87 | 86 | await stream.append( |
@@ -133,6 +132,29 @@ async def test_read_from_tail_offset(self, stream: Stream): |
133 | 132 | assert records[0].body == b"record-3" |
134 | 133 | assert records[1].body == b"record-4" |
135 | 134 |
|
| 135 | + async def test_read_until_timestamp(self, stream: Stream): |
| 136 | + timestamp_0 = int(time.time() * 1000) |
| 137 | + await asyncio.sleep(0.2) |
| 138 | + timestamp_1 = int(time.time() * 1000) |
| 139 | + await asyncio.sleep(0.2) |
| 140 | + timestamp_2 = int(time.time() * 1000) |
| 141 | + |
| 142 | + await stream.append( |
| 143 | + AppendInput( |
| 144 | + records=[ |
| 145 | + Record(body=b"record-0", timestamp=timestamp_0), |
| 146 | + Record(body=b"record-1", timestamp=timestamp_1), |
| 147 | + Record(body=b"record-2", timestamp=timestamp_2), |
| 148 | + ] |
| 149 | + ) |
| 150 | + ) |
| 151 | + |
| 152 | + records = await stream.read(start=Timestamp(timestamp_0), until=timestamp_2) |
| 153 | + assert isinstance(records, list) |
| 154 | + assert len(records) == 2 |
| 155 | + assert records[0].timestamp == timestamp_0 |
| 156 | + assert records[1].timestamp == timestamp_1 |
| 157 | + |
136 | 158 | async def test_read_beyond_tail(self, stream: Stream): |
137 | 159 | await stream.append( |
138 | 160 | AppendInput(records=[Record(body=f"record-{i}".encode()) for i in range(5)]) |
@@ -188,7 +210,9 @@ async def producer(): |
188 | 210 | producer_task = asyncio.create_task(producer()) |
189 | 211 |
|
190 | 212 | try: |
191 | | - async for output in stream.read_session(start=SeqNum(tail.next_seq_num)): |
| 213 | + async for output in stream.read_session( |
| 214 | + start=SeqNum(tail.next_seq_num), clamp=True |
| 215 | + ): |
192 | 216 | if isinstance(output, list) and len(output) > 0: |
193 | 217 | assert output[0].body == b"record-0" |
194 | 218 | break |
|
0 commit comments