Skip to content

Commit 2ca171f

Browse files
author
Rafał Hibner
committed
Preserve order when writing with TeeNode
1 parent b71f629 commit 2ca171f

1 file changed

Lines changed: 21 additions & 2 deletions

File tree

cpp/src/arrow/dataset/file_base.cc

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
#include "arrow/dataset/file_base.h"
1919

20+
#include "arrow/acero/accumulation_queue.h"
2021
#include "arrow/acero/exec_plan.h"
2122

2223
#include <algorithm>
@@ -559,13 +560,18 @@ Result<acero::ExecNode*> MakeWriteNode(acero::ExecPlan* plan,
559560

560561
namespace {
561562

562-
class TeeNode : public acero::MapNode {
563+
class TeeNode : public acero::MapNode,
564+
public arrow::acero::util::SerialSequencingQueue::Processor {
563565
public:
564566
// Builds a TeeNode on top of MapNode.
//
// `plan`, `inputs` and `output_schema` are forwarded to the MapNode base.
// `write_options` is stored in write_options_. When its preserve_order flag
// is set, a SerialSequencingQueue is created with this node registered as the
// queue's processor; otherwise sequencer_ stays null.
TeeNode(acero::ExecPlan* plan, std::vector<acero::ExecNode*> inputs,
        std::shared_ptr<Schema> output_schema,
        FileSystemDatasetWriteOptions write_options)
    : MapNode(plan, std::move(inputs), std::move(output_schema)),
      write_options_(std::move(write_options)) {
  // Read the flag from the member: the `write_options` parameter was moved
  // from in the initializer list above and must not be inspected afterwards.
  if (write_options_.preserve_order) {
    sequencer_ = acero::util::SerialSequencingQueue::Make(this);
  }
}
569575

570576
Status StartProducing() override {
571577
ARROW_ASSIGN_OR_RAISE(
@@ -592,6 +598,18 @@ class TeeNode : public acero::MapNode {
592598

593599
// Returns the fixed name identifying this node kind.
const char* kind_name() const override {
  return "TeeNode";
}
594600

601+
// Entry point for batches arriving from the node's single input.
//
// Without a sequencer the batch goes straight to Process(). With one, the
// batch is handed to the sequencer via InsertBatch() instead.
Status InputReceived(ExecNode* input, ExecBatch batch) override {
  DCHECK_EQ(input, inputs_[0]);
  if (!sequencer_) {
    return Process(std::move(batch));
  }
  return sequencer_->InsertBatch(std::move(batch));
}
608+
609+
// Feeds one batch into the regular MapNode input path.
//
// Called directly from InputReceived() when no sequencer is configured;
// presumably also the callback the SerialSequencingQueue invokes on its
// Processor (confirm against accumulation_queue.h).
Status Process(ExecBatch batch) override {
  // `batch` is owned by value and not used again here, so hand it over with
  // std::move rather than passing an lvalue that would force a copy.
  return acero::MapNode::InputReceived(inputs_[0], std::move(batch));
}
612+
595613
// Finish hook: forwards to the dataset writer's Finish().
void Finish() override {
  dataset_writer_->Finish();
}
596614

597615
Result<compute::ExecBatch> ProcessBatch(compute::ExecBatch batch) override {
@@ -625,6 +643,7 @@ class TeeNode : public acero::MapNode {
625643
std::unique_ptr<internal::DatasetWriter> dataset_writer_;
626644
FileSystemDatasetWriteOptions write_options_;
627645
std::atomic<int32_t> backpressure_counter_ = 0;
646+
std::unique_ptr<acero::util::SerialSequencingQueue> sequencer_{nullptr};
628647
};
629648

630649
} // namespace

0 commit comments

Comments
 (0)