Support array_filter on arrays of structs #62

@aalexandrov

  • Status: Draft
  • Date: January 2025

TL;DR

When we initially rolled out the STRUCT feature, we decided to add support for STRUCT functions on a "need to" basis.

One of our newest customers relies heavily on complex struct types for their data model.
As part of their onboarding process, they are asking us to add STRUCT support to some of the missing primitives.
The current feature request (FIR-51585) is to handle structs in array_filter, but I can envision similar requests in the future for other functions (for example array_sort).

To move forward, we first need to decide how to approach adding STRUCT support to functions that not only operate on, but also produce structs or arrays of structs.

Background

One of the design goals of adding STRUCT support was to integrate well with existing optimizer features (such as column pruning). The current approach to achieve this is to treat STRUCT columns as a planner-level concept and eliminate them as part of the optimization pipeline. We achieve this by pulling struct(...) and struct_array(...) calls from the leaves up towards the root of the logical query plan using a family of StructPullUpThrough<Op>Rule rules. After this transform, the STRUCT value is ideally constructed only once at the plan root, and all other optimizer features (such as the Project<Op>TransposeRule rules that implement column pruning) can continue working without further modifications.

Detailed design

We are currently discussing two solutions.

Solution 2 works for array_filter, and can also be applied to array_sort, which AFAICT are the only two array_~ functions that don’t support STRUCT values.

Solution 1 is more general, but is at odds with some optimizer invariants that are baked into the current design and implementation of the STRUCT type, and will require more work both in the optimizer and planner compared to Solution 2.

Solution 1: Runtime Implementation

The first solution is to have a runtime implementation of array_filter that can operate on arrays of structs. This is also aligned with the strategy that we last agreed on.

Pros

  1. Simple filters should work out of the box.
  2. If we resolve the conceptual limitations (see the Cons section below), we can apply the same strategy for all other runtime functions.

Cons

  1. This introduces a new STRUCT constructor.

This might seem benign at first, but it has far-reaching consequences for the optimizer. Currently, the only way to create STRUCT values is through either a struct or a struct_array call. Consequently, the optimizer assumes that the constituent fields of a struct expression can be extracted by traversing a tree of struct / struct_array calls to its leaves. This assumption is baked into various places:

  • Most of the functionality in StructPullUpUtils, and consequently in all StructPullUp~Rule rules.
  • Reduction rules that involve STRUCT expressions.
  • Producing a projection of the physical columns when inserting into tables with STRUCT columns.
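The assumption can be illustrated with a small sketch. The expression classes, the `returns_struct` flag, and `extract_fields` below are hypothetical names invented for this example and do not mirror the actual optimizer code. The point is only that field extraction succeeds for pure constructor trees and has nothing to traverse once a STRUCT value comes out of a runtime function.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple, Union


@dataclass(frozen=True)
class Column:
    name: str


@dataclass(frozen=True)
class Call:
    fn: str
    args: Tuple["Expr", ...]
    returns_struct: bool = False  # hypothetical type flag used only in this sketch


Expr = Union[Column, Call]

STRUCT_CONSTRUCTORS = {"struct", "struct_array"}


def extract_fields(expr: Expr) -> Optional[List[Expr]]:
    """Return the leaf field expressions, or None if a STRUCT is not built by constructors."""
    if isinstance(expr, Call) and expr.fn in STRUCT_CONSTRUCTORS:
        fields: List[Expr] = []
        for arg in expr.args:
            sub = extract_fields(arg)
            if sub is None:
                return None
            fields.extend(sub)
        return fields
    if isinstance(expr, Call) and expr.returns_struct:
        # A STRUCT produced by a runtime function is opaque: no constructor tree to walk.
        return None
    return [expr]  # non-STRUCT leaf


plain = Call("struct_array", (Column("entries.key"), Column("entries.value")))
# The lambda argument of array_filter is omitted for brevity.
filtered = Call("array_filter", (plain,), returns_struct=True)

plain_fields = extract_fields(plain)
filtered_fields = extract_fields(filtered)
```

Here `plain_fields` holds the two column leaves, while `filtered_fields` is `None`, which is the case Solution 1 would introduce.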

We can fix the StructPullUp~Rule rules (and consequently unblock column pruning) by implementing the approach outlined in APPENDIX A: Column pruning with SOLUTION 1. However, this approach comes with some caveats, most notably that we can only support array_filter calls that are not nested inside lambdas.

Solution 2: Planner Rewrite

The second solution is to introduce a new runtime primitive

template <typename T>
array<T> array_select(array<bool> pos_mask, array<T> arr);

that accepts two same-sized arrays pos_mask and arr, and produces an output array where arr[i] is retained iff pos_mask[i] == true.
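A minimal Python sketch of these semantics, with arrays modelled as plain lists. Raising on a size mismatch and the reference `array_filter` are assumptions made for illustration; the real primitive would be a runtime function, and NULL handling is not covered here.

```python
from typing import Callable, List, TypeVar

T = TypeVar("T")


def array_select(pos_mask: List[bool], arr: List[T]) -> List[T]:
    """Keep arr[i] iff pos_mask[i] is true. Both inputs must have the same size."""
    if len(pos_mask) != len(arr):
        # Assumption: a size mismatch is an error rather than a silent truncation.
        raise ValueError("pos_mask and arr must have the same length")
    return [x for keep, x in zip(pos_mask, arr) if keep]


def array_filter(pred: Callable[[T], bool], arr: List[T]) -> List[T]:
    """Reference semantics of array_filter, used only for comparison."""
    return [x for x in arr if pred(x)]


arr = [3, 8, 1, 9]
mask = [x > 2 for x in arr]  # plays the role of array_map(<lambda>, arr)
selected = array_select(mask, arr)
```

Computing the mask with the filter predicate and then selecting gives the same result as filtering directly, which is the property the planner rewrite relies on.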

With this, we can rewrite array_filter as follows:

<Operator> array_filter(<lambda>, struct_array(<arr_1>, ...,<arr_n>))
  ...

// => rewrite as array_map + array_select

<Operator> array_select(pos_mask, struct_array(a_1, ..., a_n))
  Project ..., a_1 := <arr_1>, ..., a_n := <arr_n>, pos_mask := array_map(<lambda>, struct_array(a_1, ...,a_n))
    ...

The rewritten form is more suitable for term reduction along the lines of what is done today. For example, we can normalize the STRUCT-producing expression by pulling up the struct_array(...) through the array_select(...), and then rely on the regular StructPullUpThrough<Operator>Rule to lift the resulting struct_array(a_1, ..., a_n) above the <Operator> if needed.

// => pull up struct_array(...) through array_select(...)

<Operator> struct_array(array_select(pos_mask, a_1), ..., array_select(pos_mask, a_n))
  Project ..., a_1 := <arr_1>, ..., a_n := <arr_n>, pos_mask := array_map(<lambda>, struct_array(a_1, ..., a_n))

// => remove unused a_i from pos_mask definition (here, assuming that <lambda> uses only a_1 and a_3)

<Operator> struct_array(array_select(pos_mask, a_1), ..., array_select(pos_mask, a_n))
  Project ..., a_1 := <arr_1>, ..., a_n := <arr_n>, pos_mask := array_map(<lambda>, struct_array(a_1, a_3))

// => Pull up struct through <Operator>

Project ..., struct_array(a_1, ..., a_n)
  <Operator> ...
    Project a_1 := array_select(pos_mask, a_1), ..., a_n := array_select(pos_mask, a_n)
      Project ..., a_1 := <arr_1>, ..., a_n := <arr_n>, pos_mask := array_map(<lambda>, struct_array(<arr_1>, <arr_3>))
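The pull-up step is valid because selecting by position commutes with zipping the field arrays. The sketch below checks this on a small example, modelling struct values as tuples; `struct_array` and `array_select` here are illustrative stand-ins, not the runtime implementations.

```python
from typing import Any, List, Tuple


def array_select(pos_mask: List[bool], arr: List[Any]) -> List[Any]:
    """Keep arr[i] iff pos_mask[i] is true."""
    return [x for keep, x in zip(pos_mask, arr) if keep]


def struct_array(*columns: List[Any]) -> List[Tuple[Any, ...]]:
    """Zip equally sized field arrays into one array of struct values (tuples here)."""
    return list(zip(*columns))


keys = ["a", "b", "c"]
values = [1, 2, 3]
junk = ["x", "y", "z"]

# The mask only needs the fields that the lambda reads (here: keys).
pos_mask = [k != "b" for k in keys]

# Before the pull-up: select on the array of structs.
before = array_select(pos_mask, struct_array(keys, values, junk))

# After the pull-up: select on each field array, then build the structs.
after = struct_array(
    array_select(pos_mask, keys),
    array_select(pos_mask, values),
    array_select(pos_mask, junk),
)
```

Both forms produce the same array of structs, and in the second form each field array can be pruned independently.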

Pros

  1. Allows pulling up struct_array constructors through array_filter calls.
  2. STRUCT field pruning and other orthogonal optimizer features should continue working out of the box.

Cons

  1. It is not clear if this solution generalizes to other functions that produce STRUCT values or arrays of STRUCT values.
  2. Expressions such as pos_mask and arr_i are referenced more than once in the final result. To avoid duplicate computation (and inconsistent results in case these expressions are non-deterministic), we have two options:
    1. Introduce a Projection or a Map under the current operator that binds these values.
    2. Introduce a let ... in primitive.

Both strategies for avoiding duplicate computation will require some extra work. The upside is that we can reuse that work to solve the random() issue that was recently discussed. If we go with the first strategy (binding the values in a Projection or a Map), we can only support array_filter calls that are not nested inside lambdas.
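To make the inconsistency concrete, the sketch below re-evaluates the mask once per field versus binding it once. The counter-based predicate is a deterministic stand-in for a lambda that calls random(), chosen so that the outcome is reproducible; it is an assumption of this example, not something taken from the engine.

```python
from itertools import count
from typing import Any, Callable, List


def array_select(pos_mask: List[bool], arr: List[Any]) -> List[Any]:
    """Keep arr[i] iff pos_mask[i] is true."""
    return [x for keep, x in zip(pos_mask, arr) if keep]


def make_unstable_predicate() -> Callable[[Any], bool]:
    """Stand-in for a lambda that calls random(): the result depends on call order."""
    calls = count()
    return lambda _elem: next(calls) % 2 == 0


keys = ["a", "b", "c"]
values = [1, 2, 3]

# Inlined: the mask expression is evaluated again for every field.
pred = make_unstable_predicate()
keys_inlined = array_select([pred(k) for k in keys], keys)
values_inlined = array_select([pred(k) for k in keys], values)

# Bound once: every field is selected with the same mask.
pred = make_unstable_predicate()
pos_mask = [pred(k) for k in keys]
keys_bound = array_select(pos_mask, keys)
values_bound = array_select(pos_mask, values)
```

In the inlined variant the two field arrays end up with different lengths, so they no longer describe the same set of struct values. Binding the mask once keeps them aligned.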

Test plan

For both solutions, we can use SQL and planner tests to cover the new feature.

In addition, for Solution 2, we can use fuzzing to check that we don’t produce plans that cannot be fully supported by the runtime.

Measuring success

Ideally, the solution here will both (a) resolve the current customer pain, and (b) allow us to implement other STRUCT-producing functions in the future without having to worry about their interplay with existing optimizer infrastructure.

Alternatives considered

TODO: move one of the two alternatives here once we decide what to do.

Appendix A: Column pruning with SOLUTION 1

Consider a query that has one of the following expressions

-- expression 1
array_transform(e -> e.value, array_filter(e -> lower(e.key) = 'mykey', entries))

-- expression 2
(array_filter(e -> lower(e.key) = 'mykey', entries)[0]).value

Where entries is an array of STRUCT(key TEXT, value TEXT, junk TEXT) values. The goal is to prune the column representing the junk field.

With the runtime function, we can attempt to achieve this column pruning as follows (steps already supported on master are marked with ✅ and those that need to be implemented with ❌):

// ✅ 0. Initial plan contains an arbitrary <Operator> with the given expression

[i] <Operator> ..., array_transform(e -> e.value, array_filter(e -> lower(e.key) = 'mykey', entries))
  ...
    [1] ScanTable entries

// ✅ 1. => After ScanOnFlattenedColumnsRule

[i] <Operator> ..., array_transform(e -> e.value, array_filter(e -> lower(e.key) = 'mykey', entries))
  ...
    [2] Projection entries := struct_array(entries.key, entries.value, entries.junk)
      [1] ScanTable entries.key, entries.value, entries.junk

// ✅ 2. => After pulling up the struct_array(...) call

[i] <Operator> ..., array_transform(e -> e.value, array_filter(e -> lower(e.key) = 'mykey', struct_array(entries.key, entries.value, entries.junk)))
  ...
    [1] ScanTable entries.key, entries.value, entries.junk

// ❌ 3. => Putting the array_filter(...) and its field projections into separate Projection nodes under <Operator>

[i] <Operator> ..., array_transform(e -> e.value, struct_array(keys_f, values_f, junk_f))
  [j] Projection keys_f := project(entries_f, 'key'), values_f := project(entries_f, 'value'), junk_f := project(entries_f, 'junk')
    [k] Projection entries_f := array_filter(e -> lower(e.key) = 'mykey', struct_array(entries.key, entries.value, entries.junk))
      ...
        [1] ScanTable entries.key, entries.value, entries.junk

// ✅ 4. => Reduce scalar expression in [i] <Operator>:
//         Either:
//         array_transform(e -> e.value, struct_array(keys_f, values_f, junk_f)) =>
//         array_transform(key, value, junk -> value, keys_f, values_f, junk_f) =>
//         array_transform(value -> value, values_f) =>
//         values_f
//
//         Or:
//         (struct_array(keys_f, values_f, junk_f)[0]).value =>
//         (struct(keys_f[0], values_f[0], junk_f[0])).value =>
//         values_f[0]

[i] <Operator> ..., values_f
  [j] Projection keys_f := project(entries_f, 'key'), values_f := project(entries_f, 'value'), junk_f := project(entries_f, 'junk')
    [k] Projection entries_f := array_filter(e -> lower(e.key) = 'mykey', struct_array(entries.key, entries.value, entries.junk))
      ...
        [1] ScanTable entries.key, entries.value, entries.junk

// ✅ 5. => Prune unused entries_f projections

[i] <Operator> ..., values_f
  [j] Projection values_f := project(entries_f, 'value')
    [k] Projection entries_f := array_filter(e -> lower(e.key) = 'mykey', struct_array(entries.key, entries.value, entries.junk))
      ...
        [1] ScanTable entries.key, entries.value, entries.junk

// ❌ 6. => Prune unused entries_f projections from the array_filter(...) argument

[i] <Operator> ..., values_f
  [j] Projection values_f := project(entries_f, 'value')
    [k] Projection entries_f := array_filter(e -> lower(e.key) = 'mykey', struct_array(entries.key, entries.value))
      ...
        [1] ScanTable entries.key, entries.value, entries.junk

// ✅ 7. => Prune unused ScanTable columns

[i] <Operator> ..., values_f
  [j] Projection values_f := project(entries_f, 'value')
    [k] Projection entries_f := array_filter(e -> lower(e.key) = 'mykey', struct_array(entries.key, entries.value))
      ...
        [1] ScanTable entries.key, entries.value
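The scalar reductions in step 4 can be sanity-checked with a small sketch. Structs are modelled as named tuples, and `struct_array` and `array_transform` are illustrative Python stand-ins for the SQL functions of the same name. The check assumes that all field arrays have the same length.

```python
from collections import namedtuple

Entry = namedtuple("Entry", ["key", "value", "junk"])


def struct_array(keys, values, junk):
    """Zip three equally sized field arrays into an array of Entry structs."""
    return [Entry(k, v, j) for k, v, j in zip(keys, values, junk)]


def array_transform(fn, arr):
    """Apply fn to every element of arr."""
    return [fn(e) for e in arr]


keys_f = ["mykey", "mykey"]
values_f = ["v1", "v2"]
junk_f = ["j1", "j2"]

# Expression 1: the lambda reads only `value`, so the result is just values_f.
expr1 = array_transform(lambda e: e.value, struct_array(keys_f, values_f, junk_f))

# Expression 2: indexing and then projecting equals projecting and then indexing.
expr2 = struct_array(keys_f, values_f, junk_f)[0].value
```

Both expressions collapse to a reference to `values_f`, which is what lets the later steps drop the other field projections.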

The following aspects need to be called out when making a design choice:

  • This doesn’t work out of the box with the current implementation of some of our rules. For example, ProjectMergeRule will just remove the project(entries_f, '<path>') entries introduced at [j] Projection.
  • Step 3 can only be done if the array_filter appears in a top-level expression (that is, it is not nested inside a <lambda> body). We also need to add runtime support for a project function that takes a (possibly nested) array of structs value and a full path to a non-struct value and projects the individual column representing this value.
  • Step 6 relies on inspecting the [j] Projection to determine the entries_f paths used downstream from [k] Projection. To determine the full set of paths that can be pruned, though, we have to inspect the <lambda> (and possibly nested lambdas) of the array_filter.
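The path analysis behind step 6 can be sketched as a set union. The function name and the flat string paths below are hypothetical simplifications: the sets of paths are supplied by hand, whereas a real implementation would have to collect them by walking the [j] Projection and the (possibly nested) lambda bodies.

```python
from typing import Iterable, List


def required_paths(
    all_paths: List[str],
    downstream_paths: Iterable[str],
    lambda_paths: Iterable[str],
) -> List[str]:
    """Paths of the array_filter argument that must be kept, in their original order."""
    required = set(downstream_paths) | set(lambda_paths)
    unknown = required - set(all_paths)
    if unknown:
        raise ValueError(f"unknown paths: {sorted(unknown)}")
    return [p for p in all_paths if p in required]


all_paths = ["key", "value", "junk"]
downstream = {"value"}   # <Operator> consumes only values_f
in_lambda = {"key"}      # e -> lower(e.key) = 'mykey'

kept = required_paths(all_paths, downstream, in_lambda)
pruned = [p for p in all_paths if p not in kept]
```

For the running example this keeps `key` and `value` and prunes `junk`. Looking only at the downstream paths would wrongly prune `key` as well, which is why the lambda has to be inspected.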

Appendix B: Current STRUCT support

Legend

  • ❔ unknown
  • ✅ supported through a reduction rule
  • ❌ explicitly prohibited in the function signature

List of functions

  • SqlScalarFunction::AllMatch
  • SqlScalarFunction::Array
  • SqlScalarFunction::ArrayAvg
  • SqlScalarFunction::ArrayConcat
  • SqlScalarFunction::ArrayContains
  • SqlScalarFunction::ArrayCount
  • SqlScalarFunction::ArrayCountDistinct
  • SqlScalarFunction::ArrayCumulativeSum
  • SqlScalarFunction::ArrayDistinctPG
  • SqlScalarFunction::ArrayEnumerate
  • SqlScalarFunction::ArrayExists
  • SqlScalarFunction::ArrayFill
  • SqlScalarFunction::ArrayFilter
  • SqlScalarFunction::ArrayFirst
  • SqlScalarFunction::ArrayFirstIndex
  • SqlScalarFunction::ArrayFlatten
  • SqlScalarFunction::ArrayFlattenLegacy
  • SqlScalarFunction::ArrayIndex
  • SqlScalarFunction::ArrayIntersect
  • SqlScalarFunction::ArrayMap
  • SqlScalarFunction::ArrayMapNullIfNewNull
  • SqlScalarFunction::ArrayMax
  • SqlScalarFunction::ArrayMin
  • SqlScalarFunction::ArrayProduct
  • SqlScalarFunction::ArrayReplaceBackwards
  • SqlScalarFunction::ArrayReverse
  • SqlScalarFunction::ArrayReverseSort
  • SqlScalarFunction::ArrayReverseSplit
  • SqlScalarFunction::ArraySlice2
  • SqlScalarFunction::ArraySort
  • SqlScalarFunction::ArraySortedLowerBound
  • SqlScalarFunction::ArraysOverlap
  • SqlScalarFunction::ArraySplit
  • SqlScalarFunction::ArraySumPg
  • SqlScalarFunction::ArrayToString
  • SqlScalarFunction::ArrayUniq
  • SqlScalarFunction::ArrayZip
  • SqlScalarFunction::IndexOfPG
  • SqlScalarFunction::Length
  • SqlScalarFunction::Range
  • SqlScalarFunction::RuntimeArrayToString
