Skip to content

Commit a9e8ee4

Browse files
author
Ivo
committed
WIP sparse resampling
1 parent 029f892 commit a9e8ee4

4 files changed

Lines changed: 69 additions & 46 deletions

File tree

cpp/arcticdb/processing/clause_resample.cpp

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -366,7 +366,7 @@ std::vector<EntityId> ResampleClause<closed_boundary>::process(std::vector<Entit
366366
);
367367
}
368368
std::optional<Column> aggregated =
369-
aggregator.aggregate(input_index_columns, input_agg_columns, mapping, string_pool);
369+
aggregator.aggregate(input_agg_columns, mapping, string_pool);
370370
if (aggregated) {
371371
seg.add_column(
372372
scalar_field(aggregated->type().data_type(), aggregator.get_output_column_name().value),

cpp/arcticdb/processing/sorted_aggregation.cpp

Lines changed: 67 additions & 34 deletions
Original file line number | Diff line number | Diff line change
@@ -67,6 +67,9 @@ SortedAggregatorOutputColumnInfo SortedAggregator<aggregation_operator, closed_b
6767
auto input_data_type = opt_input_agg_column->column_->type().data_type();
6868
check_aggregator_supported_with_data_type(input_data_type);
6969
add_data_type_impl(input_data_type, output_column_info.data_type_);
70+
if (opt_input_agg_column->column_->is_sparse()) {
71+
output_column_info.maybe_sparse_ = true;
72+
}
7073
} else {
7174
output_column_info.maybe_sparse_ = true;
7275
}
@@ -214,6 +217,16 @@ std::optional<Column> SortedAggregator<aggregation_operator, closed_boundary>::g
214217
);
215218
}
216219

220+
// Build rs index for each sparse input column once, for fast range presence checks below.
221+
std::vector<std::unique_ptr<util::BitIndex>> rs_indices(input_agg_columns.size());
222+
for (size_t col_idx = 0; col_idx < input_agg_columns.size(); ++col_idx) {
223+
const auto& opt_col = input_agg_columns[col_idx];
224+
if (opt_col.has_value() && opt_col->column_->is_sparse()) {
225+
rs_indices[col_idx] = std::make_unique<util::BitIndex>();
226+
opt_col->column_->sparse_map().build_rs_index(rs_indices[col_idx].get());
227+
}
228+
}
229+
217230
util::BitSet sparse_map(output_row_count);
218231
for (int64_t out_row = 0; out_row < output_row_count; ++out_row) {
219232
const auto& start = mapping[out_row];
@@ -222,7 +235,25 @@ std::optional<Column> SortedAggregator<aggregation_operator, closed_boundary>::g
222235
for (size_t col_idx = start.input_column_idx;
223236
col_idx < last_contributing_exclusive && col_idx < input_agg_columns.size();
224237
++col_idx) {
225-
if (input_agg_columns[col_idx].has_value()) {
238+
const auto& opt_col = input_agg_columns[col_idx];
239+
if (!opt_col.has_value()) {
240+
continue;
241+
}
242+
const auto& col = *opt_col->column_;
243+
if (!col.is_sparse()) {
244+
sparse_map.set(out_row);
245+
break;
246+
}
247+
const size_t range_start = (col_idx == start.input_column_idx) ? start.offset : 0;
248+
const size_t range_end_exclusive =
249+
(col_idx == end.input_column_idx) ? end.offset : static_cast<size_t>(col.last_row()) + 1;
250+
if (range_start >= range_end_exclusive) {
251+
continue;
252+
}
253+
const auto cnt = col.sparse_map().count_range(
254+
bv_size(range_start), bv_size(range_end_exclusive - 1), *rs_indices[col_idx]
255+
);
256+
if (cnt > 0) {
226257
sparse_map.set(out_row);
227258
break;
228259
}
@@ -245,7 +276,6 @@ std::optional<Column> SortedAggregator<aggregation_operator, closed_boundary>::g
245276

246277
template<AggregationOperator aggregation_operator, ResampleBoundary closed_boundary>
247278
std::optional<Column> SortedAggregator<aggregation_operator, closed_boundary>::aggregate(
248-
const std::vector<std::shared_ptr<Column>>& input_index_columns,
249279
const std::vector<std::optional<ColumnWithStrings>>& input_agg_columns, const ResampleMapping& mapping,
250280
StringPool& string_pool
251281
) const {
@@ -304,47 +334,50 @@ std::optional<Column> SortedAggregator<aggregation_operator, closed_boundary>::a
304334
continue;
305335
}
306336
const auto& agg_column = *opt_input_agg_column;
307-
const auto& input_index_column = input_index_columns[col_idx];
308337
details::visit_type(
309338
agg_column.column_->type().data_type(),
310339
[&, &agg_column = agg_column](auto input_type_desc_tag) {
311340
using input_type_info = ScalarTypeInfo<decltype(input_type_desc_tag)>;
312341
if constexpr (is_aggregation_allowed<input_type_info, output_type_info>(
313342
aggregation_operator
314343
)) {
315-
schema::check<ErrorCode::E_UNSUPPORTED_COLUMN_TYPE>(
316-
!agg_column.column_->is_sparse() &&
317-
agg_column.column_->row_count() == input_index_column->row_count(),
318-
"Not implemented yet: Cannot aggregate sparse column '{}' during "
319-
"resampling.",
320-
get_input_column_name().value
321-
);
322344
auto agg_data = agg_column.column_->data();
323-
auto col_it = agg_data.template cbegin<
324-
typename input_type_info::TDT,
325-
IteratorType::ENUMERATED,
326-
IteratorDensity::DENSE>();
327-
const auto col_end = agg_data.template cend<
328-
typename input_type_info::TDT,
329-
IteratorType::ENUMERATED,
330-
IteratorDensity::DENSE>();
331-
for (; col_it != col_end && output_it != output_end; ++col_it) {
332-
const auto idx = static_cast<size_t>(col_it->idx());
333-
// After advance_output, the next bucket may not include this column.
334-
if (col_idx < start_col_idx) {
335-
break;
336-
}
337-
// Skip rows before the bucket's start (e.g., right-closed bucket excluding
338-
// its leftmost edge, or date_range trimming).
339-
if (col_idx == start_col_idx && idx < start_col_offset) {
340-
continue;
341-
}
342-
push_to_aggregator<input_type_info::data_type>(
343-
bucket_aggregator, col_it->value(), agg_column
344-
);
345-
if (col_idx == end_col_idx && idx + 1 == end_col_offset) {
346-
advance_output();
345+
const auto run_iter = [&]<IteratorDensity input_density>() {
346+
auto col_it = agg_data.template cbegin<
347+
typename input_type_info::TDT,
348+
IteratorType::ENUMERATED,
349+
input_density>();
350+
const auto col_end = agg_data.template cend<
351+
typename input_type_info::TDT,
352+
IteratorType::ENUMERATED,
353+
input_density>();
354+
for (; col_it != col_end && output_it != output_end; ++col_it) {
355+
const auto idx = static_cast<size_t>(col_it->idx());
356+
// Finalise any buckets whose exclusive end falls at or before this row.
357+
// Driven by logical idx so it works the same for sparse columns where the
358+
// last present row before end_col_offset may be < end_col_offset - 1.
359+
while (output_it != output_end &&
360+
(col_idx > end_col_idx ||
361+
(col_idx == end_col_idx && idx >= end_col_offset))) {
362+
advance_output();
363+
}
364+
if (output_it == output_end || col_idx < start_col_idx) {
365+
break;
366+
}
367+
// Skip rows before the bucket's start (e.g., right-closed bucket excluding
368+
// its leftmost edge, or date_range trimming).
369+
if (col_idx == start_col_idx && idx < start_col_offset) {
370+
continue;
371+
}
372+
push_to_aggregator<input_type_info::data_type>(
373+
bucket_aggregator, col_it->value(), agg_column
374+
);
347375
}
376+
};
377+
if (agg_column.column_->is_sparse()) {
378+
run_iter.template operator()<IteratorDensity::SPARSE>();
379+
} else {
380+
run_iter.template operator()<IteratorDensity::DENSE>();
348381
}
349382
}
350383
}

cpp/arcticdb/processing/sorted_aggregation.hpp

Lines changed: 1 addition & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -41,11 +41,10 @@ struct ISortedAggregator {
4141
[[nodiscard]] ColumnName get_input_column_name() const { return folly::poly_call<0>(*this); };
4242
[[nodiscard]] ColumnName get_output_column_name() const { return folly::poly_call<1>(*this); };
4343
[[nodiscard]] std::optional<Column> aggregate(
44-
const std::vector<std::shared_ptr<Column>>& input_index_columns,
4544
const std::vector<std::optional<ColumnWithStrings>>& input_agg_columns, const ResampleMapping& mapping,
4645
StringPool& string_pool
4746
) const {
48-
return folly::poly_call<2>(*this, input_index_columns, input_agg_columns, mapping, string_pool);
47+
return folly::poly_call<2>(*this, input_agg_columns, mapping, string_pool);
4948
}
5049
void check_aggregator_supported_with_data_type(DataType data_type) const {
5150
folly::poly_call<3>(*this, data_type);
@@ -398,7 +397,6 @@ class SortedAggregator {
398397
[[nodiscard]] ColumnName get_output_column_name() const { return output_column_name_; }
399398

400399
[[nodiscard]] std::optional<Column> aggregate(
401-
const std::vector<std::shared_ptr<Column>>& input_index_columns,
402400
const std::vector<std::optional<ColumnWithStrings>>& input_agg_columns, const ResampleMapping& mapping,
403401
StringPool& string_pool
404402
) const;

python/tests/unit/arcticdb/version_store/test_arrow_sparse.py

Lines changed: 0 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -754,10 +754,6 @@ def test_named_aggs(self, group_col):
754754
_check_query_result(self.lib, self.sym, q, expected, count_columns=count_columns, check_row_order=False)
755755

756756

757-
@pytest.mark.xfail(
758-
reason="Resample rejects sparse columns. (monday ref: 11679866800)",
759-
raises=Exception,
760-
)
761757
class TestSparseArrowResample:
762758
sym = "test_sparse_resample"
763759

@@ -920,10 +916,6 @@ def test_concat_with_index(self):
920916
expected = pl.concat([pl.from_arrow(t1), pl.from_arrow(t2)])
921917
polars_assert_frame_equal(received, expected)
922918

923-
@pytest.mark.xfail(
924-
reason="Resample rejects sparse columns: sorted_aggregation.cpp 'Cannot aggregate column as it is sparse'",
925-
raises=Exception,
926-
)
927919
def test_concat_with_resample(self):
928920
dates1 = pd.date_range("2025-01-01", periods=6, freq="h")
929921
dates2 = pd.date_range("2025-01-01T06:00:00", periods=6, freq="h")

0 commit comments

Comments
 (0)