Skip to content

Commit 3822879

Browse files
authored
Admin: re-use config processing for CreateTopicsResponse (#3036)
1 parent 9a8a2e2 commit 3822879

2 files changed

Lines changed: 49 additions & 25 deletions

File tree

kafka/admin/_configs.py

Lines changed: 40 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -104,35 +104,50 @@ def _describe_configs_request(cls, config_resources, include_synonyms=False):
104104
min_version=min_version)
105105

106106
@staticmethod
107-
def _describe_configs_process_responses(responses, config_filter='modified'):
107+
def _get_config_source(config, resource_type):
108+
if 'config_source' in config:
109+
return ConfigSourceType.build_from(config['config_source'])
110+
elif config['read_only'] and resource_type is ConfigResourceType.BROKER:
111+
return ConfigSourceType.STATIC_BROKER_CONFIG
112+
elif config.get('is_default', False):
113+
return ConfigSourceType.DEFAULT_CONFIG
114+
else:
115+
return ConfigSourceType.dynamic_for_resource_type(resource_type)
116+
117+
@classmethod
118+
def _should_skip_config(cls, config, config_filter, resource_type):
119+
if config_filter == ConfigFilterType.DYNAMIC and config['read_only']:
120+
return True
121+
config_source = cls._get_config_source(config, resource_type)
122+
if config_filter.should_skip(config_source):
123+
return True
124+
return False
125+
126+
@classmethod
127+
def _process_config(cls, config, resource_type):
128+
name = config.pop('name')
129+
config_source = cls._get_config_source(config, resource_type)
130+
config['config_source'] = config_source.name
131+
if 'synonyms' in config:
132+
for synonym in config['synonyms']:
133+
synonym['source'] = ConfigSourceType(synonym['source']).name
134+
if 'config_type' in config:
135+
config['config_type'] = ConfigType(config['config_type']).name
136+
return name
137+
138+
@classmethod
139+
def _describe_configs_process_responses(cls, responses, config_filter='modified'):
108140
config_filter = ConfigFilterType.build_from(config_filter)
109141
ret = defaultdict(dict)
110142
for response in responses:
111143
for result in response.results:
112144
resource_type = ConfigResourceType(result.resource_type)
113145
resource_configs = {}
114-
for config in result.configs:
115-
config = config.to_dict()
116-
name = config.pop('name')
117-
if config_filter == ConfigFilterType.DYNAMIC and config['read_only']:
118-
continue
119-
if 'config_source' in config:
120-
config_source = ConfigSourceType(config['config_source'])
121-
elif config['read_only'] and resource_type is ConfigResourceType.BROKER:
122-
config_source = ConfigSourceType.STATIC_BROKER_CONFIG
123-
elif config['is_default']:
124-
config_source = ConfigSourceType.DEFAULT_CONFIG
125-
else:
126-
config_source = ConfigSourceType.dynamic_for_resource_type(resource_type)
127-
if config_filter.should_skip(config_source):
128-
continue
129-
config['config_source'] = config_source.name
130-
if 'synonyms' in config:
131-
for synonym in config['synonyms']:
132-
synonym['source'] = ConfigSourceType(synonym['source']).name
133-
if 'config_type' in config:
134-
config['config_type'] = ConfigType(config['config_type']).name
135-
resource_configs[name] = config
146+
for config_struct in result.configs:
147+
config = config_struct.to_dict()
148+
name = cls._process_config(config, resource_type)
149+
if not cls._should_skip_config(config, config_filter, resource_type):
150+
resource_configs[name] = config
136151
ret[resource_type.name.lower()][result.resource_name] = resource_configs
137152
return dict(ret)
138153

@@ -419,7 +434,7 @@ def __repr__(self):
419434
return f"ConfigResource({self.resource_type}, {self.name}, {self.configs})"
420435

421436

422-
class ConfigType(IntEnum):
437+
class ConfigType(EnumHelper, IntEnum):
423438
UNKNOWN = 0
424439
BOOLEAN = 1
425440
STRING = 2
@@ -432,7 +447,7 @@ class ConfigType(IntEnum):
432447
PASSWORD = 9
433448

434449

435-
class ConfigSourceType(IntEnum):
450+
class ConfigSourceType(EnumHelper, IntEnum):
436451
UNKNOWN = 0
437452
DYNAMIC_TOPIC_CONFIG = 1
438453
DYNAMIC_BROKER_CONFIG = 2

kafka/admin/_topics.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import kafka.errors as Errors
1414
from kafka.errors import IncompatibleBrokerVersion
1515
from kafka.protocol.admin import CreateTopicsRequest, DeleteTopicsRequest, CreatePartitionsRequest
16+
from ._configs import ConfigResourceType
1617

1718
if TYPE_CHECKING:
1819
from kafka.net.manager import KafkaConnectionManager
@@ -143,6 +144,14 @@ def response_errors(r):
143144
self.wait_for_topics([new_topic.name for new_topic in request.topics])
144145
result = response.to_dict()
145146
result.pop('throttle_time_ms', None)
147+
for topic in result['topics']:
148+
configs = topic.pop('configs', None)
149+
if configs:
150+
processed_configs = {}
151+
for config in configs:
152+
name = self._process_config(config, ConfigResourceType.TOPIC)
153+
processed_configs[name] = config
154+
topic['configs'] = processed_configs
146155
return result
147156

148157
def wait_for_topics(self, topic_names, timeout_ms=10000):

0 commit comments

Comments
 (0)