Skip to content

Commit a59ae1c

Browse files
KafkaPublishInput missing swagger attributes (#306)
1 parent 6e4d386 commit a59ae1c

3 files changed

Lines changed: 178 additions & 9 deletions

File tree

src/conductor/client/workflow/task/kafka_publish_input.py

Lines changed: 33 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,39 @@
55

66

77
class KafkaPublishInput:
8-
def __init__(self,
9-
bootstrap_servers: str = None,
10-
key: str = None,
11-
key_serializer: str = None,
12-
value: str = None,
13-
request_timeout_ms: str = None,
14-
max_block_ms: str = None,
15-
headers: Dict[str, Any] = None,
16-
topic: str = None) -> Self:
8+
swagger_types = {
9+
"_bootstrap_servers": "str",
10+
"_key": "str",
11+
"_key_serializer": "str",
12+
"_value": "str",
13+
"_request_timeout_ms": "str",
14+
"_max_block_ms": "str",
15+
"_headers": "dict[str, Any]",
16+
"_topic": "str",
17+
}
18+
19+
attribute_map = {
20+
"_bootstrap_servers": "bootStrapServers",
21+
"_key": "key",
22+
"_key_serializer": "keySerializer",
23+
"_value": "value",
24+
"_request_timeout_ms": "requestTimeoutMs",
25+
"_max_block_ms": "maxBlockMs",
26+
"_headers": "headers",
27+
"_topic": "topic",
28+
}
29+
30+
def __init__(
31+
self,
32+
bootstrap_servers: str = None,
33+
key: str = None,
34+
key_serializer: str = None,
35+
value: str = None,
36+
request_timeout_ms: str = None,
37+
max_block_ms: str = None,
38+
headers: Dict[str, Any] = None,
39+
topic: str = None,
40+
) -> Self:
1741
self._bootstrap_servers = deepcopy(bootstrap_servers)
1842
self._key = deepcopy(key)
1943
self._key_serializer = deepcopy(key_serializer)

tests/unit/workflow/__init__.py

Whitespace-only changes.
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
import unittest
2+
3+
from conductor.client.http.api_client import ApiClient
4+
from conductor.client.workflow.task.kafka_publish_input import \
5+
KafkaPublishInput
6+
7+
8+
class TestKafkaPublishInput(unittest.TestCase):
9+
"""Integration tests for KafkaPublishInput with API client serialization."""
10+
11+
def setUp(self):
12+
"""Set up test fixtures."""
13+
self.api_client = ApiClient()
14+
self.sample_kafka_input = KafkaPublishInput(
15+
bootstrap_servers="kafka-broker:29092",
16+
key="test-key",
17+
key_serializer="org.apache.kafka.common.serialization.StringSerializer",
18+
value='{"test": "data"}',
19+
request_timeout_ms="30000",
20+
max_block_ms="60000",
21+
headers={"content-type": "application/json"},
22+
topic="test-topic",
23+
)
24+
25+
def test_kafka_publish_input_serialization_structure(self):
26+
"""Test that serialized KafkaPublishInput has the correct structure."""
27+
serialized = self.api_client.sanitize_for_serialization(self.sample_kafka_input)
28+
29+
expected_keys = [
30+
"bootStrapServers",
31+
"key",
32+
"keySerializer",
33+
"value",
34+
"requestTimeoutMs",
35+
"maxBlockMs",
36+
"headers",
37+
"topic",
38+
]
39+
40+
for key in expected_keys:
41+
self.assertIn(key, serialized, f"Missing key '{key}' in serialized output")
42+
43+
self.assertEqual(serialized["bootStrapServers"], "kafka-broker:29092")
44+
self.assertEqual(serialized["key"], "test-key")
45+
self.assertEqual(
46+
serialized["keySerializer"],
47+
"org.apache.kafka.common.serialization.StringSerializer",
48+
)
49+
self.assertEqual(serialized["value"], '{"test": "data"}')
50+
self.assertEqual(serialized["requestTimeoutMs"], "30000")
51+
self.assertEqual(serialized["maxBlockMs"], "60000")
52+
self.assertEqual(serialized["headers"], {"content-type": "application/json"})
53+
self.assertEqual(serialized["topic"], "test-topic")
54+
55+
def test_kafka_publish_input_with_none_values_serialization(self):
56+
"""Test that KafkaPublishInput with None values serializes correctly."""
57+
kafka_input = KafkaPublishInput(
58+
bootstrap_servers="kafka:9092", topic="test-topic"
59+
)
60+
61+
serialized = self.api_client.sanitize_for_serialization(kafka_input)
62+
63+
self.assertEqual(serialized["bootStrapServers"], "kafka:9092")
64+
self.assertEqual(serialized["topic"], "test-topic")
65+
66+
self.assertNotIn("key", serialized)
67+
self.assertNotIn("keySerializer", serialized)
68+
self.assertNotIn("value", serialized)
69+
self.assertNotIn("requestTimeoutMs", serialized)
70+
self.assertNotIn("maxBlockMs", serialized)
71+
self.assertNotIn("headers", serialized)
72+
73+
def test_kafka_publish_input_complex_headers_serialization(self):
74+
"""Test that KafkaPublishInput with complex headers serializes correctly."""
75+
complex_headers = {
76+
"content-type": "application/json",
77+
"correlation-id": "test-123",
78+
"user-agent": "conductor-python-sdk",
79+
"custom-header": "custom-value",
80+
}
81+
82+
kafka_input = KafkaPublishInput(
83+
bootstrap_servers="kafka:9092",
84+
headers=complex_headers,
85+
topic="complex-topic",
86+
value='{"complex": "data"}',
87+
)
88+
89+
serialized = self.api_client.sanitize_for_serialization(kafka_input)
90+
91+
self.assertEqual(serialized["headers"], complex_headers)
92+
self.assertEqual(serialized["bootStrapServers"], "kafka:9092")
93+
self.assertEqual(serialized["topic"], "complex-topic")
94+
self.assertEqual(serialized["value"], '{"complex": "data"}')
95+
96+
def test_kafka_publish_input_swagger_types_consistency(self):
97+
"""Test that swagger_types are consistent with actual serialization."""
98+
swagger_types = KafkaPublishInput.swagger_types
99+
100+
kafka_input = KafkaPublishInput(
101+
bootstrap_servers="test",
102+
key="test",
103+
key_serializer="test",
104+
value="test",
105+
request_timeout_ms="test",
106+
max_block_ms="test",
107+
headers={"test": "test"},
108+
topic="test",
109+
)
110+
111+
serialized = self.api_client.sanitize_for_serialization(kafka_input)
112+
113+
for internal_attr, expected_type in swagger_types.items():
114+
external_attr = KafkaPublishInput.attribute_map[internal_attr]
115+
self.assertIn(
116+
external_attr,
117+
serialized,
118+
f"Swagger type '{internal_attr}' not found in serialized output",
119+
)
120+
121+
def test_kafka_publish_input_attribute_map_consistency(self):
122+
"""Test that attribute_map correctly maps all internal attributes."""
123+
kafka_input = self.sample_kafka_input
124+
internal_attrs = [
125+
attr
126+
for attr in dir(kafka_input)
127+
if attr.startswith("_") and not attr.startswith("__")
128+
]
129+
130+
for attr in internal_attrs:
131+
self.assertIn(
132+
attr,
133+
KafkaPublishInput.attribute_map,
134+
f"Internal attribute '{attr}' not found in attribute_map",
135+
)
136+
137+
for internal_attr in KafkaPublishInput.attribute_map.keys():
138+
self.assertTrue(
139+
hasattr(kafka_input, internal_attr),
140+
f"Attribute_map key '{internal_attr}' not found in instance",
141+
)
142+
143+
144+
if __name__ == "__main__":
145+
unittest.main()

0 commit comments

Comments
 (0)