Skip to content

Commit fc98d5c

Browse files
kazantsev-maksimKazantsev Maksim
andauthored
feat: Implement Spark bitmap_bucket_number function (#20288)
## Which issue does this PR close? N/A ## Rationale for this change Add new function: https://spark.apache.org/docs/latest/api/sql/index.html#bitmap_bucket_number ## What changes are included in this PR? - Implementation - Unit Tests - SLT tests ## Are these changes tested? Yes, tests added as part of this PR. ## Are there any user-facing changes? No, these are new function. --------- Co-authored-by: Kazantsev Maksim <mn.kazantsev@gmail.com>
1 parent 7f99947 commit fc98d5c

3 files changed

Lines changed: 278 additions & 1 deletion

File tree

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use arrow::array::{ArrayRef, AsArray, Int64Array};
19+
use arrow::datatypes::Field;
20+
use arrow::datatypes::{DataType, FieldRef, Int8Type, Int16Type, Int32Type, Int64Type};
21+
use datafusion::logical_expr::{ColumnarValue, Signature, TypeSignature, Volatility};
22+
use datafusion_common::utils::take_function_args;
23+
use datafusion_common::{Result, internal_err};
24+
use datafusion_expr::{ScalarFunctionArgs, ScalarUDFImpl};
25+
use datafusion_functions::utils::make_scalar_function;
26+
use std::any::Any;
27+
use std::sync::Arc;
28+
29+
/// Spark-compatible `bitmap_bucket_number` expression
30+
/// <https://spark.apache.org/docs/latest/api/sql/index.html#bitmap_bucket_number>
31+
#[derive(Debug, PartialEq, Eq, Hash)]
32+
pub struct BitmapBucketNumber {
33+
signature: Signature,
34+
}
35+
36+
impl Default for BitmapBucketNumber {
37+
fn default() -> Self {
38+
Self::new()
39+
}
40+
}
41+
42+
impl BitmapBucketNumber {
43+
pub fn new() -> Self {
44+
Self {
45+
signature: Signature::one_of(
46+
vec![
47+
TypeSignature::Exact(vec![DataType::Int8]),
48+
TypeSignature::Exact(vec![DataType::Int16]),
49+
TypeSignature::Exact(vec![DataType::Int32]),
50+
TypeSignature::Exact(vec![DataType::Int64]),
51+
],
52+
Volatility::Immutable,
53+
),
54+
}
55+
}
56+
}
57+
58+
impl ScalarUDFImpl for BitmapBucketNumber {
59+
fn as_any(&self) -> &dyn Any {
60+
self
61+
}
62+
63+
fn name(&self) -> &str {
64+
"bitmap_bucket_number"
65+
}
66+
67+
fn signature(&self) -> &Signature {
68+
&self.signature
69+
}
70+
71+
fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
72+
internal_err!("return_field_from_args should be used instead")
73+
}
74+
75+
fn return_field_from_args(
76+
&self,
77+
args: datafusion_expr::ReturnFieldArgs,
78+
) -> Result<FieldRef> {
79+
Ok(Arc::new(Field::new(
80+
self.name(),
81+
DataType::Int64,
82+
args.arg_fields[0].is_nullable(),
83+
)))
84+
}
85+
86+
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
87+
make_scalar_function(bitmap_bucket_number_inner, vec![])(&args.args)
88+
}
89+
}
90+
91+
pub fn bitmap_bucket_number_inner(arg: &[ArrayRef]) -> Result<ArrayRef> {
92+
let [array] = take_function_args("bitmap_bucket_number", arg)?;
93+
match &array.data_type() {
94+
DataType::Int8 => {
95+
let result: Int64Array = array
96+
.as_primitive::<Int8Type>()
97+
.iter()
98+
.map(|opt| opt.map(|value| bitmap_bucket_number(value.into())))
99+
.collect();
100+
Ok(Arc::new(result))
101+
}
102+
DataType::Int16 => {
103+
let result: Int64Array = array
104+
.as_primitive::<Int16Type>()
105+
.iter()
106+
.map(|opt| opt.map(|value| bitmap_bucket_number(value.into())))
107+
.collect();
108+
Ok(Arc::new(result))
109+
}
110+
DataType::Int32 => {
111+
let result: Int64Array = array
112+
.as_primitive::<Int32Type>()
113+
.iter()
114+
.map(|opt| opt.map(|value| bitmap_bucket_number(value.into())))
115+
.collect();
116+
Ok(Arc::new(result))
117+
}
118+
DataType::Int64 => {
119+
let result: Int64Array = array
120+
.as_primitive::<Int64Type>()
121+
.iter()
122+
.map(|opt| opt.map(bitmap_bucket_number))
123+
.collect();
124+
Ok(Arc::new(result))
125+
}
126+
data_type => {
127+
internal_err!("bitmap_bucket_number does not support {data_type}")
128+
}
129+
}
130+
}
131+
132+
const NUM_BYTES: i64 = 4 * 1024;
133+
const NUM_BITS: i64 = NUM_BYTES * 8;
134+
135+
fn bitmap_bucket_number(value: i64) -> i64 {
136+
if value > 0 {
137+
1 + (value - 1) / NUM_BITS
138+
} else {
139+
value / NUM_BITS
140+
}
141+
}

datafusion/spark/src/function/bitmap/mod.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
// under the License.
1717

1818
pub mod bitmap_bit_position;
19+
pub mod bitmap_bucket_number;
1920
pub mod bitmap_count;
2021

2122
use datafusion_expr::ScalarUDF;
@@ -24,6 +25,10 @@ use std::sync::Arc;
2425

2526
make_udf_function!(bitmap_count::BitmapCount, bitmap_count);
2627
make_udf_function!(bitmap_bit_position::BitmapBitPosition, bitmap_bit_position);
28+
make_udf_function!(
29+
bitmap_bucket_number::BitmapBucketNumber,
30+
bitmap_bucket_number
31+
);
2732

2833
pub mod expr_fn {
2934
use datafusion_functions::export_functions;
@@ -38,8 +43,17 @@ pub mod expr_fn {
3843
"Returns the bit position for the given input child expression.",
3944
arg
4045
));
46+
export_functions!((
47+
bitmap_bucket_number,
48+
"Returns the bucket number for the given input child expression.",
49+
arg
50+
));
4151
}
4252

4353
pub fn functions() -> Vec<Arc<ScalarUDF>> {
44-
vec![bitmap_count(), bitmap_bit_position()]
54+
vec![
55+
bitmap_count(),
56+
bitmap_bit_position(),
57+
bitmap_bucket_number(),
58+
]
4559
}
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
19+
query I
20+
SELECT bitmap_bucket_number(arrow_cast(1, 'Int8'));
21+
----
22+
1
23+
24+
query I
25+
SELECT bitmap_bucket_number(arrow_cast(127, 'Int8'));
26+
----
27+
1
28+
29+
query I
30+
SELECT bitmap_bucket_number(arrow_cast(-1, 'Int8'));
31+
----
32+
0
33+
34+
query I
35+
SELECT bitmap_bucket_number(arrow_cast(-64, 'Int8'));
36+
----
37+
0
38+
39+
query I
40+
SELECT bitmap_bucket_number(arrow_cast(-65, 'Int8'));
41+
----
42+
0
43+
44+
query I
45+
SELECT bitmap_bucket_number(arrow_cast(1, 'Int16'));
46+
----
47+
1
48+
49+
query I
50+
SELECT bitmap_bucket_number(arrow_cast(257, 'Int16'));
51+
----
52+
1
53+
54+
query I
55+
SELECT bitmap_bucket_number(arrow_cast(32767, 'Int16'));
56+
----
57+
1
58+
59+
query I
60+
SELECT bitmap_bucket_number(arrow_cast(-1, 'Int16'));
61+
----
62+
0
63+
64+
query I
65+
SELECT bitmap_bucket_number(arrow_cast(-256, 'Int16'));
66+
----
67+
0
68+
69+
query I
70+
SELECT bitmap_bucket_number(arrow_cast(1, 'Int32'));
71+
----
72+
1
73+
74+
query I
75+
SELECT bitmap_bucket_number(arrow_cast(65537, 'Int32'));
76+
----
77+
3
78+
79+
query I
80+
SELECT bitmap_bucket_number(arrow_cast(2147483647, 'Int32'));
81+
----
82+
65536
83+
84+
query I
85+
SELECT bitmap_bucket_number(arrow_cast(-1, 'Int32'));
86+
----
87+
0
88+
89+
query I
90+
SELECT bitmap_bucket_number(arrow_cast(-65536, 'Int32'));
91+
----
92+
-2
93+
94+
query I
95+
SELECT bitmap_bucket_number(arrow_cast(1, 'Int64'));
96+
----
97+
1
98+
99+
query I
100+
SELECT bitmap_bucket_number(arrow_cast(4294967297, 'Int64'));
101+
----
102+
131073
103+
104+
query I
105+
SELECT bitmap_bucket_number(arrow_cast(9223372036854775807, 'Int64'));
106+
----
107+
281474976710656
108+
109+
query I
110+
SELECT bitmap_bucket_number(arrow_cast(-1, 'Int64'));
111+
----
112+
0
113+
114+
query I
115+
SELECT bitmap_bucket_number(arrow_cast(-4294967296, 'Int64'));
116+
----
117+
-131072
118+
119+
query I
120+
SELECT bitmap_bucket_number(arrow_cast(-9223372036854775808, 'Int64'));
121+
----
122+
-281474976710656

0 commit comments

Comments
 (0)