Skip to content

Commit 69fb898

Browse files
feat: enhance concurrency logging to differentiate config vs default values
Co-Authored-By: unknown <>
1 parent a4b6051 commit 69fb898

File tree

1 file changed

+17
-6
lines changed

1 file changed

+17
-6
lines changed

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -233,12 +233,23 @@ def __init__(
233233
concurrency_level = self._LOWEST_SAFE_CONCURRENCY_LEVEL
234234
initial_number_of_partitions_to_generate = self._LOWEST_SAFE_CONCURRENCY_LEVEL // 2
235235

236-
self.logger.info(
237-
"Concurrency configuration: concurrency_level=%d, initial_number_of_partitions_to_generate=%d, concurrency_level_from_manifest=%s",
238-
concurrency_level,
239-
initial_number_of_partitions_to_generate,
240-
"defined" if concurrency_level_from_manifest else "not_defined",
241-
)
236+
if concurrency_level_from_manifest:
237+
raw_default_concurrency = concurrency_level_from_manifest.get("default_concurrency", "N/A")
238+
self.logger.info(
239+
"Concurrency configuration: concurrency_level=%d, initial_number_of_partitions_to_generate=%d, "
240+
"source=manifest (expression=%s), config=%s",
241+
concurrency_level,
242+
initial_number_of_partitions_to_generate,
243+
raw_default_concurrency,
244+
{k: v for k, v in (config or {}).items() if "worker" in k.lower() or "concurren" in k.lower()},
245+
)
246+
else:
247+
self.logger.info(
248+
"Concurrency configuration: concurrency_level=%d, initial_number_of_partitions_to_generate=%d, "
249+
"source=default (_LOWEST_SAFE_CONCURRENCY_LEVEL)",
250+
concurrency_level,
251+
initial_number_of_partitions_to_generate,
252+
)
242253

243254
self._concurrent_source = ConcurrentSource.create(
244255
num_workers=concurrency_level,

0 commit comments

Comments
 (0)