Skip to content

Commit 3c4a1a9

Browse files
committed
store: Clarify the logic of the control loop in copy_data_internal
1 parent 92b2df4 commit 3c4a1a9

1 file changed

Lines changed: 31 additions & 17 deletions

File tree

store/postgres/src/copy.rs

Lines changed: 31 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1035,34 +1035,48 @@ impl Connection {
10351035
};
10361036
workers.push(worker);
10371037
}
1038+
10381039
self.assert_progress(workers.len(), &state)?;
10391040
let (result, _idx, remaining) = select_all(workers).await;
10401041
workers = remaining;
10411042

1042-
let worker = match result {
1043-
Ok(worker) => worker,
1043+
// Analyze `result` and take another trip through the loop if
1044+
// everything is ok; wait for pending workers and return if
1045+
// there was an error or if copying was cancelled.
1046+
match result {
10441047
Err(e) => {
10451048
// This is a panic in the background task. We need to
10461049
// cancel all other tasks and return the error
10471050
self.cancel_workers(progress, workers).await;
10481051
return Err(e);
10491052
}
1050-
};
1051-
1052-
// Put the connection back into self.conn so that we can use it
1053-
// in the next iteration.
1054-
self.conn = Some(worker.conn);
1055-
state.finished.push(worker.table);
1056-
1057-
if worker.result.is_err() {
1058-
self.cancel_workers(progress, workers).await;
1059-
return worker.result;
1060-
}
1053+
Ok(worker) => {
1054+
// Put the connection back into self.conn so that we can use it
1055+
// in the next iteration.
1056+
self.conn = Some(worker.conn);
10611057

1062-
if progress.is_cancelled() {
1063-
self.cancel_workers(progress, workers).await;
1064-
return Ok(Status::Cancelled);
1065-
}
1058+
match (worker.result, progress.is_cancelled()) {
1059+
(Ok(Status::Finished), false) => {
1060+
// The worker finished successfully, and nothing was
1061+
// cancelled; take another trip through the loop
1062+
state.finished.push(worker.table);
1063+
}
1064+
(Ok(Status::Finished), true) => {
1065+
state.finished.push(worker.table);
1066+
self.cancel_workers(progress, workers).await;
1067+
return Ok(Status::Cancelled);
1068+
}
1069+
(Ok(Status::Cancelled), _) => {
1070+
self.cancel_workers(progress, workers).await;
1071+
return Ok(Status::Cancelled);
1072+
}
1073+
(Err(e), _) => {
1074+
self.cancel_workers(progress, workers).await;
1075+
return Err(e);
1076+
}
1077+
}
1078+
}
1079+
};
10661080
}
10671081
debug_assert!(self.conn.is_some());
10681082

0 commit comments

Comments
 (0)