Skip to content

Commit 00dc2e1

Browse files
committed
fix(worker): correct db query in batch cancellation
1 parent a828fad commit 00dc2e1

1 file changed

Lines changed: 80 additions & 44 deletions

File tree

  • netmito/src/service/worker

netmito/src/service/worker/mod.rs

Lines changed: 80 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ pub use queue::{TaskDispatcher, TaskDispatcherOp};
1212
use sea_orm::{
1313
prelude::*,
1414
sea_query::{
15-
extension::postgres::PgExpr, Alias, Asterisk, CommonTableExpression, Nullable, OnConflict,
16-
PgFunc, Query, WithClause,
15+
extension::postgres::PgExpr, Alias, Asterisk, CommonTableExpression, DeleteStatement,
16+
Nullable, OnConflict, PgFunc, Query, WithClause,
1717
},
1818
FromQueryResult, QuerySelect, Set, TransactionTrait,
1919
};
@@ -945,7 +945,8 @@ pub async fn shutdown_workers_by_filter(
945945
);
946946
}
947947

948-
// Execute shutdown operations in a transaction using subqueries
948+
// Execute shutdown operations in a transaction using CTEs.
949+
let builder = pool.db.get_database_backend();
949950
let (shutdown_count, tasks_to_reassign, removed_worker_ids) = pool
950951
.db
951952
.transaction::<_, (u64, Vec<ActiveTask::Model>, Vec<i64>), crate::error::Error>(|txn| {
@@ -957,9 +958,8 @@ pub async fn shutdown_workers_by_filter(
957958

958959
match req.op {
959960
WorkerShutdownOp::Force => {
960-
// Force shutdown: remove all workers immediately using subqueries
961-
962-
// 1. Update tasks assigned to matching workers, get them back for reassignment
961+
// 1. Reassign tasks assigned to matching workers.
962+
// The subquery joins group_worker which still exists at this point.
963963
tasks_to_reassign = ActiveTask::Entity::update_many()
964964
.col_expr(
965965
ActiveTask::Column::AssignedWorker,
@@ -974,68 +974,104 @@ pub async fn shutdown_workers_by_filter(
974974
.exec_with_returning(txn)
975975
.await?;
976976

977-
// 2. Delete group_worker relations using subquery
978-
GroupWorker::Entity::delete_many()
979-
.filter(
980-
GroupWorker::Column::WorkerId
981-
.in_subquery(worker_filter_subquery.clone()),
977+
// 2. CTE: delete all matching workers and their group_worker entries.
978+
let del_gw_cte = CommonTableExpression::new()
979+
.query(
980+
DeleteStatement::new()
981+
.from_table(GroupWorker::Entity)
982+
.and_where(
983+
GroupWorker::Column::WorkerId
984+
.in_subquery(worker_filter_subquery),
985+
)
986+
.returning_col(GroupWorker::Column::WorkerId)
987+
.to_owned(),
982988
)
983-
.exec(txn)
984-
.await?;
985-
986-
// 3. Delete workers using subquery, get back deleted IDs
987-
let deleted_workers = Worker::Entity::delete_many()
988-
.filter(Worker::Column::Id.in_subquery(worker_filter_subquery.clone()))
989-
.exec_with_returning(txn)
990-
.await?;
991-
992-
removed_worker_ids = deleted_workers.into_iter().map(|w| w.id).collect();
989+
.table_name(Alias::new("del_gw"))
990+
.column(Alias::new("worker_id"))
991+
.to_owned();
992+
let del_workers = DeleteStatement::new()
993+
.from_table(Worker::Entity)
994+
.and_where(
995+
Worker::Column::Id.in_subquery(
996+
Query::select()
997+
.column(Alias::new("worker_id"))
998+
.from(Alias::new("del_gw"))
999+
.to_owned(),
1000+
),
1001+
)
1002+
.returning_col(Worker::Column::Id)
1003+
.to_owned();
1004+
let stmt = builder
1005+
.build(&del_workers.with(WithClause::new().cte(del_gw_cte).to_owned()));
1006+
removed_worker_ids = super::task::PartialWorkerId::find_by_statement(stmt)
1007+
.all(txn)
1008+
.await?
1009+
.into_iter()
1010+
.map(|r| r.id)
1011+
.collect();
9931012
}
9941013
WorkerShutdownOp::Graceful => {
9951014
// Graceful shutdown: separate workers with/without tasks
9961015
// No tasks to reassign in graceful mode
9971016
tasks_to_reassign = Vec::new();
9981017

999-
// 1. Delete workers without tasks using subquery with additional filter
1018+
// 1. CTE: delete no-task workers and their group_worker entries.
10001019
let mut worker_filter_no_task = worker_filter_subquery.clone();
10011020
worker_filter_no_task.and_where(
10021021
Expr::col((Worker::Entity, Worker::Column::AssignedTaskId)).is_null(),
10031022
);
1004-
1005-
// Delete group_worker relations for workers without tasks
1006-
GroupWorker::Entity::delete_many()
1007-
.filter(
1008-
GroupWorker::Column::WorkerId
1009-
.in_subquery(worker_filter_no_task.clone()),
1023+
let del_gw_cte = CommonTableExpression::new()
1024+
.query(
1025+
DeleteStatement::new()
1026+
.from_table(GroupWorker::Entity)
1027+
.and_where(
1028+
GroupWorker::Column::WorkerId
1029+
.in_subquery(worker_filter_no_task),
1030+
)
1031+
.returning_col(GroupWorker::Column::WorkerId)
1032+
.to_owned(),
10101033
)
1011-
.exec(txn)
1012-
.await?;
1013-
1014-
// Delete workers without tasks
1015-
let deleted_workers = Worker::Entity::delete_many()
1016-
.filter(Worker::Column::Id.in_subquery(worker_filter_no_task.clone()))
1017-
.exec_with_returning(txn)
1018-
.await?;
1019-
1020-
removed_worker_ids = deleted_workers.into_iter().map(|w| w.id).collect();
1021-
1022-
// 2. Update workers with tasks to GracefulShutdown using subquery
1023-
let mut worker_filter_with_task = worker_filter_subquery.clone();
1034+
.table_name(Alias::new("del_gw"))
1035+
.column(Alias::new("worker_id"))
1036+
.to_owned();
1037+
let del_workers = DeleteStatement::new()
1038+
.from_table(Worker::Entity)
1039+
.and_where(
1040+
Worker::Column::Id.in_subquery(
1041+
Query::select()
1042+
.column(Alias::new("worker_id"))
1043+
.from(Alias::new("del_gw"))
1044+
.to_owned(),
1045+
),
1046+
)
1047+
.returning_col(Worker::Column::Id)
1048+
.to_owned();
1049+
let stmt = builder
1050+
.build(&del_workers.with(WithClause::new().cte(del_gw_cte).to_owned()));
1051+
removed_worker_ids = super::task::PartialWorkerId::find_by_statement(stmt)
1052+
.all(txn)
1053+
.await?
1054+
.into_iter()
1055+
.map(|r| r.id)
1056+
.collect();
1057+
1058+
// 2. Mark with-task workers as GracefulShutdown.
1059+
// After step 1, only with-task workers still have group_worker entries,
1060+
// so the subquery correctly selects only those workers.
1061+
let mut worker_filter_with_task = worker_filter_subquery;
10241062
worker_filter_with_task.and_where(
10251063
Expr::col((Worker::Entity, Worker::Column::AssignedTaskId))
10261064
.is_not_null(),
10271065
);
1028-
10291066
let updated_workers = Worker::Entity::update_many()
10301067
.col_expr(
10311068
Worker::Column::State,
10321069
Expr::value(WorkerState::GracefulShutdown),
10331070
)
10341071
.col_expr(Worker::Column::UpdatedAt, Expr::value(now))
1035-
.filter(Worker::Column::Id.in_subquery(worker_filter_with_task.clone()))
1072+
.filter(Worker::Column::Id.in_subquery(worker_filter_with_task))
10361073
.exec_with_returning(txn)
10371074
.await?;
1038-
10391075
graceful_worker_ids = updated_workers.len() as u64;
10401076
}
10411077
}

0 commit comments

Comments
 (0)