diff --git a/dapr/Cargo.toml b/dapr/Cargo.toml index 9d532f4..e14001d 100644 --- a/dapr/Cargo.toml +++ b/dapr/Cargo.toml @@ -18,7 +18,7 @@ workflow = ["dep:dapr-durabletask"] async-trait = { workspace = true } axum = "0.7" chrono = "0.4" -dapr-durabletask = { version = "0.0.2", optional = true } +dapr-durabletask = { version = "0.0.3", optional = true } futures = "0.3" http = "1" log = "0.4" diff --git a/dapr/src/client/mod.rs b/dapr/src/client/mod.rs index 13ede08..03debd3 100644 --- a/dapr/src/client/mod.rs +++ b/dapr/src/client/mod.rs @@ -192,10 +192,7 @@ impl Client { where S: Into, { - let mut mdata = HashMap::::new(); - if let Some(m) = metadata { - mdata = m; - } + let mdata = metadata.unwrap_or_default(); self.0 .publish_event(PublishEventRequest { pubsub_name: pubsub_name.into(), @@ -263,10 +260,7 @@ impl Client { where S: Into, { - let mut mdata = HashMap::::new(); - if let Some(m) = metadata { - mdata = m; - } + let mdata = metadata.unwrap_or_default(); self.0 .get_state(GetStateRequest { @@ -347,10 +341,7 @@ impl Client { where S: Into, { - let mut mdata = HashMap::::new(); - if let Some(m) = metadata { - mdata = m; - } + let mdata = metadata.unwrap_or_default(); self.0 .query_state_alpha1(QueryStateRequest { @@ -395,10 +386,7 @@ impl Client { where S: Into, { - let mut mdata = HashMap::::new(); - if let Some(m) = metadata { - mdata = m; - } + let mdata = metadata.unwrap_or_default(); self.0 .delete_state(DeleteStateRequest { @@ -456,10 +444,7 @@ impl Client { TInput: Serialize, TOutput: for<'a> Deserialize<'a>, { - let mut mdata = HashMap::::new(); - if let Some(m) = metadata { - mdata = m; - } + let mut mdata = metadata.unwrap_or_default(); mdata.insert("Content-Type".to_string(), "application/json".to_string()); diff --git a/dapr/src/workflow/context.rs b/dapr/src/workflow/context.rs index aa94963..9c9899d 100644 --- a/dapr/src/workflow/context.rs +++ b/dapr/src/workflow/context.rs @@ -1,8 +1,8 @@ use std::future::Future; use std::time::Duration; +use dapr_durabletask::api::ExternalEventResult; use dapr_durabletask::worker::{ActivityResult, OrchestratorResult, Registry}; -use futures::future::Either; use serde::Serialize; use serde::de::DeserializeOwned; @@ -81,11 +81,9 @@ pub trait WorkflowContextExt { /// Wait for an external event with an optional timeout and deserialize the payload. /// - /// When `timeout` is `Some`, a durable timer is scheduled alongside the - /// external event wait. The underlying timer cannot be cancelled in - /// `dapr-durabletask 0.0.1`, so the timer event is persisted into history - /// even when the external event arrives first; this is the same behaviour - /// as other Durable Task SDKs. + /// When `timeout` is `Some`, `dapr-durabletask` schedules a durable timer + /// tagged with the event name and removes the pending event waiter if the + /// timer fires first. /// /// # Arguments /// @@ -175,21 +173,20 @@ impl WorkflowContextExt for WorkflowContext { where T: DeserializeOwned + Send + 'static, { - let event = self.wait_for_external_event(name); - let timer = timeout.map(|duration| self.create_timer(duration)); + let ctx = self.clone(); + let name = name.to_string(); async move { - let output = match timer { - Some(timer) => { - futures::pin_mut!(event); - futures::pin_mut!(timer); - match futures::future::select(event, timer).await { - Either::Left((event_result, _)) => event_result?, - Either::Right(_) => { - return Err(dapr_durabletask::api::DurableTaskError::Timeout); - } + let output = match timeout { + Some(duration) => match ctx + .wait_for_external_event_with_timeout(&name, duration) + .await? + { + ExternalEventResult::Received(output) => output, + ExternalEventResult::TimedOut => { + return Err(dapr_durabletask::api::DurableTaskError::Timeout); } - } - None => event.await?, + }, + None => ctx.wait_for_external_event(&name).await?, }; deserialize_task_output(output) }