Skip to content

Commit 6aa070b

Browse files
authored
Wire crypto scalar functions with analytics-backend-datafusion (opensearch-project#21611)
Signed-off-by: Tanik Pansuriya <panbhai@amazon.com>
1 parent 0ee9f72 commit 6aa070b

12 files changed

Lines changed: 1129 additions & 2 deletions

File tree

sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/ScalarFunction.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,13 @@ public enum ScalarFunction {
9090
TONUMBER(Category.STRING, SqlKind.OTHER_FUNCTION),
9191
STRCMP(Category.STRING, SqlKind.OTHER_FUNCTION),
9292

93+
// ── Cryptographic hash ─────────────────────────────────────────────
94+
// md5(x), sha1(x), sha2(x, bitLen) with bitLen ∈ {224,256,384,512}, crc32(x).
95+
MD5(Category.STRING, SqlKind.OTHER_FUNCTION),
96+
SHA1(Category.STRING, SqlKind.OTHER_FUNCTION),
97+
SHA2(Category.STRING, SqlKind.OTHER_FUNCTION),
98+
CRC32(Category.STRING, SqlKind.OTHER_FUNCTION),
99+
93100
// ── Math ─────────────────────────────────────────────────────────
94101
PLUS(Category.MATH, SqlKind.PLUS),
95102
MINUS(Category.MATH, SqlKind.MINUS),

sandbox/plugins/analytics-backend-datafusion/rust/Cargo.toml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,13 @@ jsonpath-rust = "=0.7.5"
7171
# mvfind UDF — regex matching against stringified array elements
7272
regex = "=1.12.3"
7373

74+
# Cryptographic hash UDFs. `sha1` is a dedicated crate (RFC 3174); DataFusion
75+
# only exposes SHA-2 variants via its `digest` UDF. `crc32fast` is a SIMD-accelerated
76+
# IEEE 802.3 CRC-32 implementation used by `crc32`; also pulled in via workspace
77+
# for parquet metadata (`crc32fast = "1.4"`) so adding it here is a zero-cost alias.
78+
sha1 = "0.11"
79+
crc32fast = { workspace = true }
80+
7481
[dev-dependencies]
7582
criterion = { workspace = true }
7683
tempfile = { workspace = true }
Lines changed: 208 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
//! crc32(input): CRC-32 (IEEE 802.3 polynomial) of the UTF-8 byte stream of `input`.
10+
11+
use std::any::Any;
12+
use std::hash::{Hash, Hasher};
13+
use std::sync::Arc;
14+
15+
use datafusion::arrow::array::{Array, ArrayRef, AsArray, Int64Array};
16+
use datafusion::arrow::datatypes::DataType;
17+
use datafusion::common::{exec_err, Result, ScalarValue};
18+
use datafusion::execution::context::SessionContext;
19+
use datafusion::logical_expr::{
20+
ColumnarValue, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, TypeSignature, Volatility,
21+
};
22+
23+
/// Registers the `crc32` scalar UDF on the supplied session context.
pub fn register_all(ctx: &SessionContext) {
    let crc32_udf = ScalarUDF::from(Crc32Udf::new());
    ctx.register_udf(crc32_udf);
}
26+
27+
/// `crc32(varchar)` → `bigint`. Non-negative integer in the range [0, 2^32 - 1].
28+
#[derive(Debug)]
29+
pub struct Crc32Udf {
30+
signature: Signature,
31+
}
32+
33+
impl Crc32Udf {
34+
pub fn new() -> Self {
35+
Self {
36+
signature: Signature::one_of(
37+
vec![
38+
TypeSignature::Exact(vec![DataType::Utf8]),
39+
TypeSignature::Exact(vec![DataType::LargeUtf8]),
40+
],
41+
Volatility::Immutable,
42+
),
43+
}
44+
}
45+
}
46+
47+
impl Default for Crc32Udf {
48+
fn default() -> Self {
49+
Self::new()
50+
}
51+
}
52+
53+
// `ScalarUDFImpl` requires DynEq + DynHash. All instances are functionally identical
54+
// (no parameterization), so equality is trivial.
55+
impl PartialEq for Crc32Udf {
56+
fn eq(&self, _: &Self) -> bool {
57+
true
58+
}
59+
}
60+
impl Eq for Crc32Udf {}
61+
impl Hash for Crc32Udf {
62+
fn hash<H: Hasher>(&self, state: &mut H) {
63+
"crc32".hash(state);
64+
}
65+
}
66+
67+
impl ScalarUDFImpl for Crc32Udf {
68+
fn as_any(&self) -> &dyn Any {
69+
self
70+
}
71+
72+
fn name(&self) -> &str {
73+
"crc32"
74+
}
75+
76+
fn signature(&self) -> &Signature {
77+
&self.signature
78+
}
79+
80+
fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
81+
Ok(DataType::Int64)
82+
}
83+
84+
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
85+
if args.args.len() != 1 {
86+
return exec_err!("crc32 expects exactly 1 argument, got {}", args.args.len());
87+
}
88+
match &args.args[0] {
89+
ColumnarValue::Scalar(ScalarValue::Utf8(opt))
90+
| ColumnarValue::Scalar(ScalarValue::LargeUtf8(opt)) => Ok(ColumnarValue::Scalar(
91+
ScalarValue::Int64(opt.as_ref().map(|s| hash_u32_to_i64(s.as_bytes()))),
92+
)),
93+
ColumnarValue::Scalar(other) => exec_err!("crc32: expected Utf8 input, got {other:?}"),
94+
ColumnarValue::Array(arr) => {
95+
let out: Int64Array = match arr.data_type() {
96+
DataType::Utf8 => arr
97+
.as_string::<i32>()
98+
.iter()
99+
.map(|opt| opt.map(|s| hash_u32_to_i64(s.as_bytes())))
100+
.collect(),
101+
DataType::LargeUtf8 => arr
102+
.as_string::<i64>()
103+
.iter()
104+
.map(|opt| opt.map(|s| hash_u32_to_i64(s.as_bytes())))
105+
.collect(),
106+
other => {
107+
return exec_err!("crc32: expected Utf8 array, got {other:?}");
108+
}
109+
};
110+
Ok(ColumnarValue::Array(Arc::new(out) as ArrayRef))
111+
}
112+
}
113+
}
114+
}
115+
116+
/// IEEE 802.3 CRC-32, zero-extended to i64 so the value stays non-negative.
117+
fn hash_u32_to_i64(value: &[u8]) -> i64 {
118+
crc32fast::hash(value) as i64
119+
}
120+
121+
#[cfg(test)]
mod tests {
    use super::*;
    use datafusion::arrow::array::StringArray;
    use datafusion::arrow::datatypes::{Field, Int64Type};

    /// Fresh UDF instance for each test.
    fn udf() -> Crc32Udf {
        Crc32Udf::new()
    }

    /// Invokes the UDF on a single columnar input covering `number_rows` rows.
    fn invoke(input: ColumnarValue, number_rows: usize) -> Result<ColumnarValue> {
        let crc32 = udf();
        let return_field = Arc::new(Field::new(crc32.name(), DataType::Int64, true));
        let arg_fields = vec![Arc::new(Field::new("v", DataType::Utf8, true))];
        crc32.invoke_with_args(ScalarFunctionArgs {
            args: vec![input],
            arg_fields,
            number_rows,
            return_field,
            config_options: Arc::new(Default::default()),
        })
    }

    /// Invokes the UDF on one scalar value.
    fn invoke_scalar(value: ScalarValue) -> Result<ColumnarValue> {
        invoke(ColumnarValue::Scalar(value), 1)
    }

    /// Convenience wrapper: checksum of a non-null string literal.
    fn crc32_of(text: &str) -> Option<i64> {
        as_i64(invoke_scalar(ScalarValue::Utf8(Some(text.to_string()))).unwrap())
    }

    /// Unwraps an `Int64` scalar result, panicking on any other shape.
    fn as_i64(v: ColumnarValue) -> Option<i64> {
        match v {
            ColumnarValue::Scalar(ScalarValue::Int64(opt)) => opt,
            other => panic!("expected Int64 scalar, got {other:?}"),
        }
    }

    // Standard reference: crc32("") == 0, crc32("a") == 0xE8B7BE43.
    // See https://rosettacode.org/wiki/CRC-32#Rust for the canonical vector.
    #[test]
    fn empty_input_is_zero() {
        assert_eq!(crc32_of(""), Some(0));
    }

    #[test]
    fn single_char_matches_reference_vector() {
        assert_eq!(crc32_of("a"), Some(0xE8B7_BE43));
    }

    #[test]
    fn null_input_yields_null() {
        let result = invoke_scalar(ScalarValue::Utf8(None)).unwrap();
        assert!(as_i64(result).is_none());
    }

    #[test]
    fn result_is_always_non_negative() {
        // 0xFFFFFFFF ≈ 4.29e9; the checksum must come back as a positive,
        // zero-extended i64 and never as a negative sign-extended i32.
        let got = crc32_of("123456789").unwrap();
        assert!(got >= 0, "crc32 must be non-negative, got {got}");
        // Reference vector for "123456789" = 0xCBF43926.
        assert_eq!(got, 0xCBF4_3926);
    }

    #[test]
    fn array_input_preserves_null_mask() {
        let rows = vec![Some("a"), None, Some("")];
        let values: ArrayRef = Arc::new(StringArray::from(rows));
        match invoke(ColumnarValue::Array(values), 3).unwrap() {
            ColumnarValue::Array(arr) => {
                let ints = arr.as_primitive::<Int64Type>();
                assert_eq!(ints.value(0), 0xE8B7_BE43);
                assert!(ints.is_null(1));
                assert_eq!(ints.value(2), 0);
            }
            other => panic!("expected Array, got {other:?}"),
        }
    }
}

sandbox/plugins/analytics-backend-datafusion/rust/src/udf/mod.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ pub(crate) fn coerce_args(
121121
}
122122

123123
pub mod convert_tz;
124+
pub mod crc32;
124125
pub mod date_format;
125126
pub mod extract;
126127
pub mod from_unixtime;
@@ -138,6 +139,7 @@ pub mod mvappend;
138139
pub mod mvfind;
139140
pub mod mvzip;
140141
pub(crate) mod mysql_format;
142+
pub mod sha1;
141143
pub mod str_to_date;
142144
pub mod strftime;
143145
pub mod time_format;
@@ -153,6 +155,7 @@ pub mod tostring;
153155
// and restart the OpenSearch JVM (the loaded dylib is JVM-cached).
154156
pub fn register_all(ctx: &SessionContext) {
155157
convert_tz::register_all(ctx);
158+
crc32::register_all(ctx);
156159
date_format::register_all(ctx);
157160
extract::register_all(ctx);
158161
from_unixtime::register_all(ctx);
@@ -168,13 +171,14 @@ pub fn register_all(ctx: &SessionContext) {
168171
mvappend::register_all(ctx);
169172
mvfind::register_all(ctx);
170173
mvzip::register_all(ctx);
174+
sha1::register_all(ctx);
171175
str_to_date::register_all(ctx);
172176
strftime::register_all(ctx);
173177
time_format::register_all(ctx);
174178
tonumber::register_all(ctx);
175179
tostring::register_all(ctx);
176180
log::info!(
177-
"OpenSearch UDF register_all: convert_tz, date_format, extract, from_unixtime, json_append, json_array_length, json_delete, json_extend, json_extract, json_keys, json_set, makedate, maketime, mvappend, mvfind, mvzip, str_to_date, strftime, time_format, tonumber, tostring registered"
181+
"OpenSearch UDF register_all: convert_tz, crc32, date_format, extract, from_unixtime, json_append, json_array_length, json_delete, json_extend, json_extract, json_keys, json_set, makedate, maketime, mvappend, mvfind, mvzip, sha1, str_to_date, strftime, time_format, tonumber, tostring registered"
178182
);
179183
}
180184

0 commit comments

Comments
 (0)