|
1 | 1 | import uuid |
| 2 | +from datetime import datetime |
2 | 3 |
|
3 | 4 | from natsrpy.js import ( |
4 | 5 | AckPolicy, |
@@ -166,3 +167,107 @@ async def test_message_headers_empty(js: JetStream) -> None: |
166 | 167 | assert isinstance(messages[0].headers, dict) |
167 | 168 | finally: |
168 | 169 | await js.streams.delete(stream_name) |
| 170 | + |
| 171 | + |
| 172 | +async def test_message_stream_sequence_and_consumer_sequence(js: JetStream) -> None: |
| 173 | + stream_name = f"test-seqs-{uuid.uuid4().hex[:8]}" |
| 174 | + subj = f"{stream_name}.data" |
| 175 | + config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"]) |
| 176 | + stream = await js.streams.create(config) |
| 177 | + try: |
| 178 | + await js.publish(subj, b"seq-msg", wait=True) |
| 179 | + consumer = await stream.consumers.create( |
| 180 | + PullConsumerConfig(name=f"consumer-{uuid.uuid4().hex[:8]}"), |
| 181 | + ) |
| 182 | + messages = await consumer.fetch(max_messages=1, timeout=5.0) |
| 183 | + assert len(messages) == 1 |
| 184 | + msg = messages[0] |
| 185 | + assert isinstance(msg.stream_sequence, int) |
| 186 | + assert msg.stream_sequence >= 1 |
| 187 | + assert isinstance(msg.consumer_sequence, int) |
| 188 | + assert msg.consumer_sequence >= 1 |
| 189 | + finally: |
| 190 | + await js.streams.delete(stream_name) |
| 191 | + |
| 192 | + |
| 193 | +async def test_message_consumer_and_stream_names(js: JetStream) -> None: |
| 194 | + stream_name = f"test-names-{uuid.uuid4().hex[:8]}" |
| 195 | + subj = f"{stream_name}.data" |
| 196 | + config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"]) |
| 197 | + stream = await js.streams.create(config) |
| 198 | + try: |
| 199 | + await js.publish(subj, b"names-msg", wait=True) |
| 200 | + consumer_name = f"consumer-{uuid.uuid4().hex[:8]}" |
| 201 | + consumer = await stream.consumers.create( |
| 202 | + PullConsumerConfig(name=consumer_name), |
| 203 | + ) |
| 204 | + messages = await consumer.fetch(max_messages=1, timeout=5.0) |
| 205 | + assert len(messages) == 1 |
| 206 | + msg = messages[0] |
| 207 | + assert msg.stream == stream_name |
| 208 | + assert msg.consumer == consumer_name |
| 209 | + finally: |
| 210 | + await js.streams.delete(stream_name) |
| 211 | + |
| 212 | + |
| 213 | +async def test_message_delivered_and_pending(js: JetStream) -> None: |
| 214 | + stream_name = f"test-delpend-{uuid.uuid4().hex[:8]}" |
| 215 | + subj = f"{stream_name}.data" |
| 216 | + config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"]) |
| 217 | + stream = await js.streams.create(config) |
| 218 | + try: |
| 219 | + await js.publish(subj, b"msg-1", wait=True) |
| 220 | + await js.publish(subj, b"msg-2", wait=True) |
| 221 | + consumer = await stream.consumers.create( |
| 222 | + PullConsumerConfig(name=f"consumer-{uuid.uuid4().hex[:8]}"), |
| 223 | + ) |
| 224 | + messages = await consumer.fetch(max_messages=1, timeout=5.0) |
| 225 | + assert len(messages) == 1 |
| 226 | + msg = messages[0] |
| 227 | + assert isinstance(msg.delivered, int) |
| 228 | + assert msg.delivered >= 1 |
| 229 | + assert isinstance(msg.pending, int) |
| 230 | + assert msg.pending >= 0 |
| 231 | + await msg.ack() |
| 232 | + finally: |
| 233 | + await js.streams.delete(stream_name) |
| 234 | + |
| 235 | + |
| 236 | +async def test_message_published_timestamp(js: JetStream) -> None: |
| 237 | + stream_name = f"test-pub-ts-{uuid.uuid4().hex[:8]}" |
| 238 | + subj = f"{stream_name}.data" |
| 239 | + config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"]) |
| 240 | + stream = await js.streams.create(config) |
| 241 | + try: |
| 242 | + await js.publish(subj, b"ts-msg", wait=True) |
| 243 | + consumer = await stream.consumers.create( |
| 244 | + PullConsumerConfig(name=f"consumer-{uuid.uuid4().hex[:8]}"), |
| 245 | + ) |
| 246 | + messages = await consumer.fetch(max_messages=1, timeout=5.0) |
| 247 | + assert len(messages) == 1 |
| 248 | + msg = messages[0] |
| 249 | + assert isinstance(msg.published, datetime) |
| 250 | + finally: |
| 251 | + await js.streams.delete(stream_name) |
| 252 | + |
| 253 | + |
| 254 | +async def test_message_length_and_dunder_len(js: JetStream) -> None: |
| 255 | + stream_name = f"test-msglen-{uuid.uuid4().hex[:8]}" |
| 256 | + subj = f"{stream_name}.data" |
| 257 | + config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"]) |
| 258 | + stream = await js.streams.create(config) |
| 259 | + try: |
| 260 | + payload = b"length-test-payload" |
| 261 | + await js.publish(subj, payload, wait=True) |
| 262 | + consumer = await stream.consumers.create( |
| 263 | + PullConsumerConfig(name=f"consumer-{uuid.uuid4().hex[:8]}"), |
| 264 | + ) |
| 265 | + messages = await consumer.fetch(max_messages=1, timeout=5.0) |
| 266 | + assert len(messages) == 1 |
| 267 | + msg = messages[0] |
| 268 | + assert isinstance(msg.length, int) |
| 269 | + assert msg.length >= len(payload) |
| 270 | + assert len(msg) == msg.length |
| 271 | + await msg.ack() |
| 272 | + finally: |
| 273 | + await js.streams.delete(stream_name) |
0 commit comments