Skip to content

Commit 1b932c9

Browse files
authored
fix(pipeline): avoid spawning unguarded coroutines (#643)
coroutines are now spawned only inside the awaited execution path this fixes async runtime warnings when using the experimental `--parallel` flag ```sh sys:1: RuntimeWarning: coroutine 'Pipeline.build_execution_graph.<locals>.run_parallel_tasks' was never awaited sys:1: RuntimeWarning: coroutine 'deploy.<locals>.deploy_runner' was never awaited ```
1 parent f946c98 commit 1b932c9

1 file changed

Lines changed: 20 additions & 20 deletions

File tree

kpops/pipeline/__init__.py

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -104,17 +104,19 @@ def build_execution_graph(
104104
/,
105105
reverse: bool = False,
106106
) -> Awaitable[None]:
107-
async def run_parallel_tasks(
108-
coroutines: list[Coroutine[Any, Any, None]],
107+
async def run_layer_parallel(
108+
components: list[PipelineComponent],
109109
) -> None:
110110
tasks: list[asyncio.Task[None]] = []
111-
for coro in coroutines:
112-
tasks.append(asyncio.create_task(coro))
111+
for component in components:
112+
tasks.append(asyncio.create_task(runner(component)))
113113
await asyncio.gather(*tasks)
114114

115-
async def run_graph_tasks(pending_tasks: list[Awaitable[None]]) -> None:
116-
for pending_task in pending_tasks:
117-
await pending_task
115+
async def run_graph_layers(
116+
pending_layers: list[list[PipelineComponent]],
117+
) -> None:
118+
for layer_components in pending_layers:
119+
await run_layer_parallel(layer_components)
118120

119121
graph: nx.DiGraph[str] = self._graph.copy()
120122

@@ -130,15 +132,15 @@ async def run_graph_tasks(pending_tasks: list[Awaitable[None]]) -> None:
130132

131133
layers_graph: list[list[str]] = list(nx.bfs_layers(graph, root_node))
132134

133-
sorted_tasks: list[Awaitable[None]] = []
135+
sorted_layers: list[list[PipelineComponent]] = []
134136
for layer in layers_graph[1:]:
135-
if parallel_tasks := self.__get_parallel_tasks_from(layer, runner):
136-
sorted_tasks.append(run_parallel_tasks(parallel_tasks))
137+
if parallel_components := self.__get_parallel_components_from(layer):
138+
sorted_layers.append(parallel_components)
137139

138140
if reverse:
139-
sorted_tasks.reverse()
141+
sorted_layers.reverse()
140142

141-
return run_graph_tasks(sorted_tasks)
143+
return run_graph_layers(sorted_layers)
142144

143145
def __getitem__(self, component_id: str) -> PipelineComponent:
144146
try:
@@ -173,18 +175,16 @@ def __add_input(self, topic_id: str, target: str) -> None:
173175
self._graph.add_node(topic_id)
174176
self._graph.add_edge(topic_id, target)
175177

176-
def __get_parallel_tasks_from(
177-
self,
178-
layer: list[str],
179-
runner: Callable[[PipelineComponent], Coroutine[Any, Any, None]],
180-
) -> list[Coroutine[Any, Any, None]]:
181-
def gen_parallel_tasks():
178+
def __get_parallel_components_from(
179+
self, layer: list[str]
180+
) -> list[PipelineComponent]:
181+
def gen_parallel_components() -> Iterator[PipelineComponent]:
182182
for node_in_layer in layer:
183183
# check if component, skip topics
184184
if (component := self._component_index.get(node_in_layer)) is not None:
185-
yield runner(component)
185+
yield component
186186

187-
return list(gen_parallel_tasks())
187+
return list(gen_parallel_components())
188188

189189
def __validate_graph(self) -> None:
190190
if not nx.is_directed_acyclic_graph(self._graph):

0 commit comments

Comments
 (0)