Skip to content

Commit a7d65e6

Browse files
committed
update
1 parent 7cedd5a commit a7d65e6

2 files changed

Lines changed: 17 additions & 2 deletions

File tree

fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@
9797
import org.apache.doris.thrift.PaloInternalServiceVersion;
9898
import org.apache.doris.thrift.TAIResource;
9999
import org.apache.doris.thrift.TBrokerScanRange;
100+
import org.apache.doris.thrift.TDataSink;
100101
import org.apache.doris.thrift.TDataSinkType;
101102
import org.apache.doris.thrift.TDescriptorTable;
102103
import org.apache.doris.thrift.TErrorTabletInfo;
@@ -565,12 +566,19 @@ private static void assignAdaptiveRandomBucketForFragment(
565566
if (partitionAssignments == null) {
566567
continue;
567568
}
569+
TOlapTableSink copiedSink = deepCopyOlapTableSinkForCurrentBackend(sinkParam);
568570
OlapTableSink.applyAdaptiveRandomBucketAssignments(
569-
sinkParam.getFragment().getOutputSink().getOlapTableSink().getPartition().getPartitions(),
571+
copiedSink.getPartition().getPartitions(),
570572
partitionAssignments);
571573
}
572574
}
573575

576+
private static TOlapTableSink deepCopyOlapTableSinkForCurrentBackend(TPipelineFragmentParams sinkParam) {
577+
TDataSink copiedOutputSink = sinkParam.getFragment().getOutputSink().deepCopy();
578+
sinkParam.getFragment().setOutputSink(copiedOutputSink);
579+
return copiedOutputSink.getOlapTableSink();
580+
}
581+
574582
// Initialize
575583
protected void prepare() throws UserException {
576584
for (PlanFragment fragment : fragments) {

fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -295,12 +295,19 @@ private static void assignAdaptiveRandomBucketForSinkParams(List<TPipelineFragme
295295
if (partitionAssignments == null) {
296296
continue;
297297
}
298+
TOlapTableSink copiedSink = deepCopyOlapTableSinkForCurrentBackend(sinkParam);
298299
OlapTableSink.applyAdaptiveRandomBucketAssignments(
299-
sinkParam.getFragment().getOutputSink().getOlapTableSink().getPartition().getPartitions(),
300+
copiedSink.getPartition().getPartitions(),
300301
partitionAssignments);
301302
}
302303
}
303304

305+
private static TOlapTableSink deepCopyOlapTableSinkForCurrentBackend(TPipelineFragmentParams sinkParam) {
306+
TDataSink copiedOutputSink = sinkParam.getFragment().getOutputSink().deepCopy();
307+
sinkParam.getFragment().setOutputSink(copiedOutputSink);
308+
return copiedOutputSink.getOlapTableSink();
309+
}
310+
304311
private static Multiset<DistributedPlanWorker> computeInstanceNumPerWorker(
305312
List<PipelineDistributedPlan> distributedPlans) {
306313
Multiset<DistributedPlanWorker> workerCounter = LinkedHashMultiset.create();

0 commit comments

Comments
 (0)