-
Notifications
You must be signed in to change notification settings - Fork 38
Expand file tree
/
Copy pathtest_kafka_publish_input.py
More file actions
145 lines (120 loc) · 5.4 KB
/
test_kafka_publish_input.py
File metadata and controls
145 lines (120 loc) · 5.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
import unittest
from conductor.client.http.api_client import ApiClient
from conductor.client.workflow.task.kafka_publish_input import \
KafkaPublishInput
class TestKafkaPublishInput(unittest.TestCase):
    """Integration tests for KafkaPublishInput with API client serialization.

    Each test builds a ``KafkaPublishInput``, runs it through
    ``ApiClient.sanitize_for_serialization`` and inspects the resulting
    mapping, or cross-checks the class-level ``swagger_types`` and
    ``attribute_map`` tables against an instance.
    """

    def setUp(self):
        """Create a fresh API client and a fully populated sample input."""
        self.api_client = ApiClient()
        self.sample_kafka_input = KafkaPublishInput(
            bootstrap_servers="kafka-broker:29092",
            key="test-key",
            key_serializer="org.apache.kafka.common.serialization.StringSerializer",
            value='{"test": "data"}',
            request_timeout_ms="30000",
            max_block_ms="60000",
            headers={"content-type": "application/json"},
            topic="test-topic",
        )

    def test_kafka_publish_input_serialization_structure(self):
        """Check every external key is present and carries the sample value."""
        serialized = self.api_client.sanitize_for_serialization(
            self.sample_kafka_input
        )

        # Expected keys and values live in one mapping so they cannot drift
        # apart; the values mirror the sample input built in setUp().
        expected_output = {
            "bootStrapServers": "kafka-broker:29092",
            "key": "test-key",
            "keySerializer": "org.apache.kafka.common.serialization.StringSerializer",
            "value": '{"test": "data"}',
            "requestTimeoutMs": "30000",
            "maxBlockMs": "60000",
            "headers": {"content-type": "application/json"},
            "topic": "test-topic",
        }

        for key, expected_value in expected_output.items():
            # subTest reports every failing key instead of stopping at the
            # first one.
            with self.subTest(key=key):
                self.assertIn(
                    key, serialized, f"Missing key '{key}' in serialized output"
                )
                self.assertEqual(serialized[key], expected_value)

    def test_kafka_publish_input_with_none_values_serialization(self):
        """Check that fields left unset are absent from the serialized output."""
        kafka_input = KafkaPublishInput(
            bootstrap_servers="kafka:9092", topic="test-topic"
        )

        serialized = self.api_client.sanitize_for_serialization(kafka_input)

        self.assertEqual(serialized["bootStrapServers"], "kafka:9092")
        self.assertEqual(serialized["topic"], "test-topic")

        # Everything that was not passed to the constructor must be omitted.
        omitted_keys = (
            "key",
            "keySerializer",
            "value",
            "requestTimeoutMs",
            "maxBlockMs",
            "headers",
        )
        for omitted_key in omitted_keys:
            with self.subTest(key=omitted_key):
                self.assertNotIn(omitted_key, serialized)

    def test_kafka_publish_input_complex_headers_serialization(self):
        """Check that a multi-entry headers dict survives serialization intact."""
        complex_headers = {
            "content-type": "application/json",
            "correlation-id": "test-123",
            "user-agent": "conductor-python-sdk",
            "custom-header": "custom-value",
        }
        kafka_input = KafkaPublishInput(
            bootstrap_servers="kafka:9092",
            headers=complex_headers,
            topic="complex-topic",
            value='{"complex": "data"}',
        )

        serialized = self.api_client.sanitize_for_serialization(kafka_input)

        self.assertEqual(serialized["headers"], complex_headers)
        self.assertEqual(serialized["bootStrapServers"], "kafka:9092")
        self.assertEqual(serialized["topic"], "complex-topic")
        self.assertEqual(serialized["value"], '{"complex": "data"}')

    def test_kafka_publish_input_swagger_types_consistency(self):
        """Check every ``swagger_types`` entry appears in the serialized output.

        Only the presence of the mapped external name is verified; the
        declared type strings themselves are not validated here.
        """
        kafka_input = KafkaPublishInput(
            bootstrap_servers="test",
            key="test",
            key_serializer="test",
            value="test",
            request_timeout_ms="test",
            max_block_ms="test",
            headers={"test": "test"},
            topic="test",
        )

        serialized = self.api_client.sanitize_for_serialization(kafka_input)

        # Only the attribute names are needed, so iterate the dict directly
        # rather than unpacking unused values from .items().
        for internal_attr in KafkaPublishInput.swagger_types:
            with self.subTest(attribute=internal_attr):
                external_attr = KafkaPublishInput.attribute_map[internal_attr]
                self.assertIn(
                    external_attr,
                    serialized,
                    f"Swagger type '{internal_attr}' not found in serialized output",
                )

    def test_kafka_publish_input_attribute_map_consistency(self):
        """Check ``attribute_map`` and the instance's private attributes agree.

        The check runs in both directions: every single-underscore attribute
        reported by ``dir()`` must be a key of ``attribute_map``, and every
        ``attribute_map`` key must exist as an attribute on the instance.
        """
        kafka_input = self.sample_kafka_input

        # Single-underscore names only; dunder names are excluded.
        internal_attrs = [
            attr
            for attr in dir(kafka_input)
            if attr.startswith("_") and not attr.startswith("__")
        ]

        for attr in internal_attrs:
            with self.subTest(direction="instance -> attribute_map", attribute=attr):
                self.assertIn(
                    attr,
                    KafkaPublishInput.attribute_map,
                    f"Internal attribute '{attr}' not found in attribute_map",
                )

        for internal_attr in KafkaPublishInput.attribute_map:
            with self.subTest(
                direction="attribute_map -> instance", attribute=internal_attr
            ):
                self.assertTrue(
                    hasattr(kafka_input, internal_attr),
                    f"Attribute_map key '{internal_attr}' not found in instance",
                )
# Run the tests through unittest's command-line entry point when this file is
# executed directly as a script.
if __name__ == "__main__":
    unittest.main()