Skip to content

Commit 51596f5

Browse files
author
Rafał Hibner
committed
Release lock when pushing result
1 parent b98589c commit 51596f5

1 file changed

Lines changed: 16 additions & 16 deletions

File tree

cpp/src/arrow/acero/asof_join_node.cc

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1063,28 +1063,28 @@ class AsofJoinNode : public ExecNode {
10631063
// Process batches while we have data
10641064
for (;;) {
10651065
backpressure_future_.Wait();
1066+
Result<std::shared_ptr<RecordBatch>> result;
10661067
{
10671068
std::lock_guard<std::mutex> guard(gate_);
10681069
if (!CheckEnded()) {
10691070
return false;
10701071
}
1071-
Result<std::shared_ptr<RecordBatch>> result = ProcessInner();
1072-
1073-
if (result.ok()) {
1074-
auto out_rb = *result;
1075-
if (!out_rb) break;
1076-
ExecBatch out_b(*out_rb);
1077-
out_b.index = batches_produced_++;
1078-
DEBUG_SYNC(this, "produce batch ", out_b.index, ":", DEBUG_MANIP(std::endl),
1079-
out_rb->ToString(), DEBUG_MANIP(std::endl));
1080-
Status st = output_->InputReceived(this, std::move(out_b));
1081-
if (!st.ok()) {
1082-
EndFromProcessThread(std::move(st));
1083-
}
1084-
} else {
1085-
EndFromProcessThread(result.status());
1086-
return false;
1072+
result = ProcessInner();
1073+
}
1074+
if (result.ok()) {
1075+
auto out_rb = *result;
1076+
if (!out_rb) break;
1077+
ExecBatch out_b(*out_rb);
1078+
out_b.index = batches_produced_++;
1079+
DEBUG_SYNC(this, "produce batch ", out_b.index, ":", DEBUG_MANIP(std::endl),
1080+
out_rb->ToString(), DEBUG_MANIP(std::endl));
1081+
Status st = output_->InputReceived(this, std::move(out_b));
1082+
if (!st.ok()) {
1083+
EndFromProcessThread(std::move(st));
10871084
}
1085+
} else {
1086+
EndFromProcessThread(result.status());
1087+
return false;
10881088
}
10891089
}
10901090

0 commit comments

Comments
 (0)