Skip to content

Commit 2b20b67

Browse files
committed
tests for log example
Signed-off-by: Sreekanth <prsreekanth920@gmail.com>
1 parent 3dea78e commit 2b20b67

1 file changed

Lines changed: 130 additions & 0 deletions

File tree

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
"""Tests for the log sink example handlers."""
2+
3+
import pytest
4+
5+
from example import UserDefinedSink, udsink_handler
6+
from pynumaflow.sinker.testing import SinkerTestKit, datum
7+
8+
9+
@pytest.fixture(scope="module")
10+
def sinker_kit():
11+
with SinkerTestKit(udsink_handler) as kit:
12+
yield kit
13+
14+
15+
class TestFuncHandler:
16+
def test_is_ready(self, sinker_kit):
17+
with sinker_kit.client() as c:
18+
assert c.is_ready()
19+
20+
def test_single_datum(self, sinker_kit):
21+
with sinker_kit.client() as c:
22+
result = c.send_request(datum(b"hello"))
23+
assert len(result) == 1
24+
result.assert_all_success()
25+
26+
def test_multiple_datums(self, sinker_kit):
27+
with sinker_kit.client() as c:
28+
result = c.send_request(
29+
datum(b"msg-1"),
30+
datum(b"msg-2"),
31+
datum(b"msg-3"),
32+
)
33+
assert len(result) == 3
34+
result.assert_all_success()
35+
result.assert_counts(success=3, failed=0, fallback=0)
36+
37+
def test_str_payload(self, sinker_kit):
38+
with sinker_kit.client() as c:
39+
result = c.send_request(datum("string payload"))
40+
result.assert_all_success()
41+
42+
def test_dict_payload(self, sinker_kit):
43+
with sinker_kit.client() as c:
44+
result = c.send_request(datum({"event": "click", "count": 1}))
45+
result.assert_all_success()
46+
47+
def test_with_keys(self, sinker_kit):
48+
with sinker_kit.client() as c:
49+
result = c.send_request(datum(b"keyed", keys=["k1", "k2"]))
50+
result.assert_all_success()
51+
52+
def test_with_headers(self, sinker_kit):
53+
with sinker_kit.client() as c:
54+
result = c.send_request(
55+
datum(b"with-headers", headers={"x-trace-id": "abc123"})
56+
)
57+
result.assert_all_success()
58+
59+
def test_auto_coerce(self, sinker_kit):
60+
"""send_request accepts raw bytes/str/dict without wrapping in datum()."""
61+
with sinker_kit.client() as c:
62+
result = c.send_request(b"raw", "text", {"k": "v"})
63+
assert len(result) == 3
64+
result.assert_all_success()
65+
66+
def test_large_batch(self, sinker_kit):
67+
with sinker_kit.client() as c:
68+
result = c.send_request(*[datum(f"item-{i}") for i in range(50)])
69+
assert len(result) == 50
70+
result.assert_all_success()
71+
72+
73+
# ---------------------------------------------------------------------------
74+
# Class-based handler (UserDefinedSink)
75+
# ---------------------------------------------------------------------------
76+
77+
@pytest.fixture(scope="module")
78+
def class_kit():
79+
with SinkerTestKit(UserDefinedSink()) as kit:
80+
yield kit
81+
82+
83+
class TestClassHandler:
84+
def test_is_ready(self, class_kit):
85+
with class_kit.client() as c:
86+
assert c.is_ready()
87+
88+
def test_single_datum(self, class_kit):
89+
with class_kit.client() as c:
90+
result = c.send_request(datum(b"hello"))
91+
assert len(result) == 1
92+
result.assert_all_success()
93+
94+
def test_multiple_datums(self, class_kit):
95+
with class_kit.client() as c:
96+
result = c.send_request(
97+
datum(b"msg-1"),
98+
datum(b"msg-2"),
99+
datum(b"msg-3"),
100+
)
101+
assert len(result) == 3
102+
result.assert_all_success()
103+
result.assert_counts(success=3, failed=0, fallback=0)
104+
105+
def test_str_payload(self, class_kit):
106+
with class_kit.client() as c:
107+
result = c.send_request(datum("string payload"))
108+
result.assert_all_success()
109+
110+
def test_dict_payload(self, class_kit):
111+
with class_kit.client() as c:
112+
result = c.send_request(datum({"event": "click", "count": 1}))
113+
result.assert_all_success()
114+
115+
def test_with_keys(self, class_kit):
116+
with class_kit.client() as c:
117+
result = c.send_request(datum(b"keyed", keys=["k1", "k2"]))
118+
result.assert_all_success()
119+
120+
def test_auto_coerce(self, class_kit):
121+
with class_kit.client() as c:
122+
result = c.send_request(b"raw", "text", {"k": "v"})
123+
assert len(result) == 3
124+
result.assert_all_success()
125+
126+
def test_large_batch(self, class_kit):
127+
with class_kit.client() as c:
128+
result = c.send_request(*[datum(f"item-{i}") for i in range(50)])
129+
assert len(result) == 50
130+
result.assert_all_success()

0 commit comments

Comments
 (0)