From 8432d3486cd9763a76bdd663d44a8aa3eb8a0cc3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E7=B1=B3=E5=89=8D=E6=9C=89=E8=95=89=E7=9A=AE?= Date: Tue, 30 Dec 2025 09:31:33 +0800 Subject: [PATCH 1/6] Add async_start function for asynchronous execution --- src/graph/graph.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/graph/graph.rs b/src/graph/graph.rs index 498e42f..76a2f9d 100644 --- a/src/graph/graph.rs +++ b/src/graph/graph.rs @@ -166,6 +166,12 @@ impl Graph { /// This function is used for the execution of a single dag. pub fn start(&mut self) -> Result<(), GraphError> { + tokio::runtime::Runtime::new() + .unwrap() + .block_on(async { self.async_start().await }) + } + /// This function is used for the execution of a single dag with async. + pub async fn async_start(&mut self) -> Result<(), GraphError> { self.init(); let is_loop = self.check_loop_and_partition(); if is_loop { @@ -175,10 +181,7 @@ impl Graph { if !self.is_active.load(Ordering::Relaxed) { return Err(GraphError::GraphNotActive); } - - tokio::runtime::Runtime::new() - .unwrap() - .block_on(async { self.run().await }) + self.run().await } /// Executes the graph's nodes in a concurrent manner, respecting the block structure. From 7def253d36f422003fd1f4e4546151131d7989cc Mon Sep 17 00:00:00 2001 From: cn-kali-team Date: Tue, 30 Dec 2025 10:27:22 +0800 Subject: [PATCH 2/6] fix blocking_lock --- src/graph/graph.rs | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/src/graph/graph.rs b/src/graph/graph.rs index 76a2f9d..4630f84 100644 --- a/src/graph/graph.rs +++ b/src/graph/graph.rs @@ -173,7 +173,7 @@ impl Graph { /// This function is used for the execution of a single dag with async. pub async fn async_start(&mut self) -> Result<(), GraphError> { self.init(); - let is_loop = self.check_loop_and_partition(); + let is_loop = self.check_loop_and_partition().await; if is_loop { return Err(GraphError::GraphLoopDetected); } @@ -266,9 +266,10 @@ impl Graph { } } Err(_) => { - // Close all the channels - node_ref.blocking_lock().input_channels().close_all(); - node_ref.blocking_lock().output_channels().close_all(); + // Close all the channels using the async lock (do not use blocking_lock inside runtime) + let mut node_guard = node_ref.lock().await; + node_guard.input_channels().close_all(); + node_guard.output_channels().close_all(); error!("Execution failed [name: {}, id: {}]", node_name, node_id,); let mut errors_lock = errors.lock().await; @@ -312,7 +313,7 @@ impl Graph { /// - Groups nodes into blocks, creating a new block whenever a conditional node / loop is encountered /// /// Returns true if the graph contains a cycle, false otherwise. - pub fn check_loop_and_partition(&mut self) -> bool { + pub async fn check_loop_and_partition(&mut self) -> bool { // Check for cycles using abstract graph let has_cycle = self.abstract_graph.check_loop(); @@ -337,10 +338,13 @@ impl Graph { // Create new block if conditional node / loop encountered let node = self.nodes.get(node_id).unwrap(); - if node.blocking_lock().is_condition() { + // Use an async lock here to avoid blocking the runtime + let node_guard = node.lock().await; + if node_guard.is_condition() { self.blocks.push(current_block); current_block = HashSet::new(); } + // node_guard is dropped here } } From 7840fa735e4a2df7207e3f47c22e314818f7a21a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E7=B1=B3=E5=89=8D=E6=9C=89=E8=95=89=E7=9A=AE?= Date: Tue, 30 Dec 2025 10:31:09 +0800 Subject: [PATCH 3/6] Update src/graph/graph.rs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/graph/graph.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/graph/graph.rs b/src/graph/graph.rs index 4630f84..14996b9 100644 --- a/src/graph/graph.rs +++ b/src/graph/graph.rs @@ -166,9 +166,9 @@ impl Graph { /// This function is used for the execution of a single dag. pub fn start(&mut self) -> Result<(), GraphError> { - tokio::runtime::Runtime::new() - .unwrap() - .block_on(async { self.async_start().await }) + let runtime = tokio::runtime::Runtime::new() + .map_err(GraphError::RuntimeCreationFailed)?; + runtime.block_on(async { self.async_start().await }) } /// This function is used for the execution of a single dag with async. pub async fn async_start(&mut self) -> Result<(), GraphError> { From c2d5abba65a273993aed5d7a1d2a63cda08e6a16 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E7=B1=B3=E5=89=8D=E6=9C=89=E8=95=89=E7=9A=AE?= Date: Tue, 30 Dec 2025 10:31:27 +0800 Subject: [PATCH 4/6] Update src/graph/graph.rs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/graph/graph.rs | 36 +++++++++++++++++++++++++++++++++++- 1 file changed, 35 insertions(+), 1 deletion(-) diff --git a/src/graph/graph.rs b/src/graph/graph.rs index 14996b9..82542d7 100644 --- a/src/graph/graph.rs +++ b/src/graph/graph.rs @@ -170,7 +170,41 @@ impl Graph { .map_err(GraphError::RuntimeCreationFailed)?; runtime.block_on(async { self.async_start().await }) } - /// This function is used for the execution of a single dag with async. + /// Executes a single DAG within an existing async runtime. + /// + /// Use this method when you are already running inside an async context + /// (for example, inside a `tokio::main` function or a task spawned on a + /// Tokio runtime) and you do **not** want `Graph` to create and manage + /// its own Tokio runtime. + /// + /// Unlike [`start`], this method: + /// - Does not create a new Tokio runtime. + /// - Assumes it is called on a thread where a Tokio runtime is already + /// active. + /// - Can be `await`-ed like any other async function. + /// + /// # Requirements + /// + /// - A Tokio runtime must be active on the current thread when this + /// method is called. + /// - The graph must have been properly configured (nodes and edges + /// added) before calling this method. + /// + /// If those conditions are not met, execution may fail at runtime. + /// + /// # Examples + /// + /// ```ignore + /// #[tokio::main] + /// async fn main() -> Result<(), Box> { + /// let mut graph = build_graph_somehow(); + /// + /// // Use `async_start` because we are already inside a Tokio runtime. + /// graph.async_start().await?; + /// + /// Ok(()) + /// } + /// ``` pub async fn async_start(&mut self) -> Result<(), GraphError> { self.init(); let is_loop = self.check_loop_and_partition().await; From 2ba99052948d7ad270234de1b6450208a4d7a49c Mon Sep 17 00:00:00 2001 From: cn-kali-team Date: Tue, 30 Dec 2025 10:41:35 +0800 Subject: [PATCH 5/6] add RuntimeCreationFailed --- src/graph/error.rs | 2 ++ src/graph/graph.rs | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/src/graph/error.rs b/src/graph/error.rs index b3016b0..b7bf81d 100644 --- a/src/graph/error.rs +++ b/src/graph/error.rs @@ -12,6 +12,8 @@ pub enum GraphError { node_id: usize, }, MultipleErrors(Vec), + /// Contains the original error message when runtime creation failed + RuntimeCreationFailed(String), } impl std::fmt::Display for GraphError { diff --git a/src/graph/graph.rs b/src/graph/graph.rs index 82542d7..c98894f 100644 --- a/src/graph/graph.rs +++ b/src/graph/graph.rs @@ -167,7 +167,7 @@ impl Graph { /// This function is used for the execution of a single dag. pub fn start(&mut self) -> Result<(), GraphError> { let runtime = tokio::runtime::Runtime::new() - .map_err(GraphError::RuntimeCreationFailed)?; + .map_err(|e| GraphError::RuntimeCreationFailed(e.to_string()))?; runtime.block_on(async { self.async_start().await }) } /// Executes a single DAG within an existing async runtime. From bc68aa82ae1f2d3861497e31446e9afc2fcc3225 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E7=B1=B3=E5=89=8D=E6=9C=89=E8=95=89=E7=9A=AE?= Date: Tue, 30 Dec 2025 10:42:24 +0800 Subject: [PATCH 6/6] Update src/graph/graph.rs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/graph/graph.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/graph/graph.rs b/src/graph/graph.rs index c98894f..32c8a9a 100644 --- a/src/graph/graph.rs +++ b/src/graph/graph.rs @@ -378,7 +378,6 @@ impl Graph { self.blocks.push(current_block); current_block = HashSet::new(); } - // node_guard is dropped here } }