@@ -62,8 +62,7 @@ ScalarAggregateNode::MakeAggregateNodeArgs(const std::shared_ptr<Schema>& input_
6262 const std::vector<FieldRef>& keys,
6363 const std::vector<FieldRef>& segment_keys,
6464 const std::vector<Aggregate>& aggs,
65- ExecContext* exec_ctx, size_t concurrency,
66- std::vector<ExecNode*> inputs) {
65+ ExecContext* exec_ctx, size_t concurrency) {
6766 // Copy (need to modify options pointer below)
6867 std::vector<Aggregate> aggregates (aggs);
6968 std::vector<int > segment_field_ids (segment_keys.size ());
@@ -158,27 +157,61 @@ ScalarAggregateNode::MakeAggregateNodeArgs(const std::shared_ptr<Schema>& input_
158157 fields[base + i] = field (aggregates[i].name , out_type.GetSharedPtr ());
159158 }
160159
160+ return AggregateNodeArgs<ScalarAggregateKernel>{schema (std::move (fields)),
161+ /* grouping_key_field_ids=*/ {},
162+ std::move (segment_field_ids),
163+ std::move (segmenter),
164+ std::move (target_fieldsets),
165+ std::move (aggregates),
166+ std::move (kernels),
167+ std::move (kernel_intypes),
168+ std::move (states),
169+ requires_ordering};
170+ }
171+
172+ Result<ExecNode*> ScalarAggregateNode::Make (ExecPlan* plan, std::vector<ExecNode*> inputs,
173+ const ExecNodeOptions& options) {
174+ RETURN_NOT_OK (ValidateExecNodeInputs (plan, inputs, 1 , " ScalarAggregateNode" ));
175+
176+ const auto & aggregate_options = checked_cast<const AggregateNodeOptions&>(options);
177+ auto aggregates = aggregate_options.aggregates ;
178+ const auto & keys = aggregate_options.keys ;
179+ const auto & segment_keys = aggregate_options.segment_keys ;
180+ const auto input = inputs[0 ];
181+ const auto concurrency = plan->query_context ()->max_concurrency ();
182+
183+ if (keys.size () > 0 ) {
184+ return Status::Invalid (" Scalar aggregation with some key" );
185+ }
186+
187+ const auto & input_schema = inputs[0 ]->output_schema ();
188+ auto exec_ctx = plan->query_context ()->exec_context ();
189+
190+ ARROW_ASSIGN_OR_RAISE (
191+ auto args, MakeAggregateNodeArgs (input_schema, keys, segment_keys, aggregates,
192+ exec_ctx, concurrency));
161193 Ordering out_ordering = Ordering::Unordered ();
162- if (requires_ordering && inputs[ 0 ] ->ordering ().is_implicit ()) {
194+ if (args. requires_ordering && input ->ordering ().is_implicit ()) {
163195 out_ordering = Ordering::Implicit ();
164- } else if (requires_ordering) {
196+ } else if (args. requires_ordering ) {
165197 std::vector<compute::SortKey> out_sort_keys;
166- std::unordered_set<int > segmented_key_field_id_set (segment_field_ids .begin (),
167- segment_field_ids .end ());
168- // Propagate output sorting only by segmented keys excluding sorting by regualr keys
198+ std::unordered_set<int > segmented_key_field_id_set (args. segment_key_field_ids .begin (),
199+ args. segment_key_field_ids .end ());
200+ // Propagate output sorting only by segmented keys excluding sorting by regular keys
169201 // since this will break the segmentation.
170- for (auto key : inputs[ 0 ] ->ordering ().sort_keys ()) {
202+ for (auto key : input ->ordering ().sort_keys ()) {
171203 ARROW_ASSIGN_OR_RAISE (auto match, key.target .FindOne (*input_schema));
172204 if (segmented_key_field_id_set.find (match[0 ]) != segmented_key_field_id_set.end ()) {
173205 out_sort_keys.emplace_back (key);
174206 } else {
175207 break ;
176208 }
177209 }
178- if (out_sort_keys.size () > 0 )
210+ if (out_sort_keys.size () > 0 ) {
179211 out_ordering = Ordering (out_sort_keys);
180- else
212+ } else {
181213 out_ordering = Ordering::Implicit ();
214+ }
182215 }
183216
184217 if (!out_ordering.is_unordered ()) {
@@ -189,42 +222,11 @@ ScalarAggregateNode::MakeAggregateNodeArgs(const std::shared_ptr<Schema>& input_
189222 " order_by node)" );
190223 }
191224 }
192-
193- return AggregateNodeArgs<ScalarAggregateKernel>{
194- schema (std::move (fields)),
195- /* grouping_key_field_ids=*/ {}, std::move (segment_field_ids),
196- std::move (segmenter), std::move (target_fieldsets),
197- std::move (aggregates), std::move (kernels),
198- std::move (kernel_intypes), std::move (states),
199- std::move (out_ordering)};
200- }
201-
202- Result<ExecNode*> ScalarAggregateNode::Make (ExecPlan* plan, std::vector<ExecNode*> inputs,
203- const ExecNodeOptions& options) {
204- RETURN_NOT_OK (ValidateExecNodeInputs (plan, inputs, 1 , " ScalarAggregateNode" ));
205-
206- const auto & aggregate_options = checked_cast<const AggregateNodeOptions&>(options);
207- auto aggregates = aggregate_options.aggregates ;
208- const auto & keys = aggregate_options.keys ;
209- const auto & segment_keys = aggregate_options.segment_keys ;
210- const auto concurrency = plan->query_context ()->max_concurrency ();
211-
212- if (keys.size () > 0 ) {
213- return Status::Invalid (" Scalar aggregation with some key" );
214- }
215-
216- const auto & input_schema = inputs[0 ]->output_schema ();
217- auto exec_ctx = plan->query_context ()->exec_context ();
218-
219- ARROW_ASSIGN_OR_RAISE (
220- auto args, MakeAggregateNodeArgs (input_schema, keys, segment_keys, aggregates,
221- exec_ctx, concurrency, inputs));
222-
223225 return plan->EmplaceNode <ScalarAggregateNode>(
224226 plan, std::move (inputs), std::move (args.output_schema ), std::move (args.segmenter ),
225227 std::move (args.segment_key_field_ids ), std::move (args.target_fieldsets ),
226228 std::move (args.aggregates ), std::move (args.kernels ), std::move (args.kernel_intypes ),
227- std::move (args.states ), std::move (args. ordering ));
229+ std::move (args.states ), std::move (out_ordering ));
228230}
229231
230232Status ScalarAggregateNode::DoConsume (const ExecSpan& batch, size_t thread_index) {
0 commit comments