Skip to content

Commit 3615c5d

Browse files
committed
feat: change worker to suit for new exec spec
1 parent 2826c93 commit 3615c5d

1 file changed

Lines changed: 16 additions & 9 deletions

File tree

netmito/src/worker.rs

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -619,7 +619,7 @@ async fn execute_task(
619619
_ = task_executor.task_cancel_token.cancelled() => return Ok(()),
620620
_ = tokio::time::sleep(task_executor.polling_interval) => {},
621621
_ = tokio::time::sleep_until(timeout_until) => {
622-
tracing::debug!("Fetching resource timeout, commit this task as canceled");
622+
tracing::debug!("Fetching resource timeout, commit this task as cancelled");
623623
let req = ReportTaskReq {
624624
id: task.id,
625625
op: ReportTaskOp::Cancel,
@@ -709,7 +709,7 @@ async fn execute_task(
709709
}
710710
_ = task_executor.task_cancel_token.cancelled() => return Ok(()),
711711
_ = tokio::time::sleep(std::time::Duration::from_secs(120)) => {
712-
tracing::debug!("Fetching resource timeout, commit this task as canceled");
712+
tracing::debug!("Fetching resource timeout, commit this task as cancelled");
713713
let req = ReportTaskReq {
714714
id: task.id,
715715
op: ReportTaskOp::Cancel,
@@ -736,7 +736,7 @@ async fn execute_task(
736736
return Ok(());
737737
}
738738
_ = tokio::time::sleep_until(timeout_until) => {
739-
tracing::debug!("Fetching resource timeout, commit this task as canceled");
739+
tracing::debug!("Fetching resource timeout, commit this task as cancelled");
740740
let req = ReportTaskReq {
741741
id: task.id,
742742
op: ReportTaskOp::Cancel,
@@ -764,7 +764,7 @@ async fn execute_task(
764764
}
765765
}
766766
} else if resp.status() == StatusCode::NOT_FOUND {
767-
tracing::debug!("Resource not found, commit this task as canceled");
767+
tracing::debug!("Resource not found, commit this task as cancelled");
768768
let req = ReportTaskReq {
769769
id: task.id,
770770
op: ReportTaskOp::Cancel,
@@ -786,7 +786,7 @@ async fn execute_task(
786786
.await;
787787
return Ok(());
788788
} else if resp.status() == StatusCode::FORBIDDEN {
789-
tracing::debug!("Resource is forbidden to be fetched, commit this task as canceled");
789+
tracing::debug!("Resource is forbidden to be fetched, commit this task as cancelled");
790790
let req = ReportTaskReq {
791791
id: task.id,
792792
op: ReportTaskOp::Cancel,
@@ -826,7 +826,9 @@ async fn execute_task(
826826
}
827827
}
828828

829-
if let Some((watched_task_uuid, watched_task_state)) = task.spec.watch {
829+
if let Some((watched_task_uuid, watched_task_state)) =
830+
task.exec_options.as_ref().and_then(|opts| opts.watch)
831+
{
830832
// Watch other tasks to specified state to trigger this task
831833
if task_executor.task_redis_conn.is_some() && task_executor.task_redis_pubsub.is_some() {
832834
task_executor
@@ -845,7 +847,7 @@ async fn execute_task(
845847
},
846848
_ = task_executor.watch_task(&watched_task_uuid, watched_task_state) => {},
847849
_ = tokio::time::sleep_until(timeout_until) => {
848-
tracing::debug!("Watching timeout, commit this task as canceled");
850+
tracing::debug!("Watching timeout, commit this task as cancelled");
849851
task_executor.unsubscribe_task_exec_state(&watched_task_uuid).await;
850852
let req = ReportTaskReq {
851853
id: task.id,
@@ -876,14 +878,19 @@ async fn execute_task(
876878
}
877879
}
878880

881+
// Default timeout is 10 minutes if not specified
882+
let timeout = task
883+
.spec
884+
.timeout
885+
.unwrap_or(std::time::Duration::from_secs(600));
879886
task_executor
880887
.announce_task_state_ex(
881888
&task.uuid,
882889
TaskExecState::ExecPending as i32,
883-
task.timeout.as_secs() + 60,
890+
timeout.as_secs() + 60,
884891
)
885892
.await;
886-
let timeout_until = tokio::time::Instant::now() + task.timeout;
893+
let timeout_until = tokio::time::Instant::now() + timeout;
887894

888895
// Setup new task file path and clean up any stale file
889896
let new_task_path = task_executor.task_cache_path.join("new_task.json");

0 commit comments

Comments
 (0)