Skip to content

Commit a4c7fd4

Browse files
authored
GEOMESA-3565 FSDS - Add hash partition scheme (#3535)
1 parent 1e98028 commit a4c7fd4

11 files changed

Lines changed: 635 additions & 6 deletions

File tree

docs/user/datastores/filter_functions.rst

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,11 @@ GeoMesa provides several custom CQL filter functions, which can be used for filt
77
Filter functions can be created through GeoTool's standard methods, i.e. ``ECQL.toFilter`` or directly through
88
``FilterFactory2.function``.
99

10+
bucketHash
11+
----------
12+
13+
Returns the hash of an attribute, modulo a number of buckets.
14+
1015
currentDate
1116
-----------
1217

@@ -21,6 +26,11 @@ dateToLong
2126
Converts a ``java.util.Date`` to a ``Long`` representing milliseconds since the epoch, as returned by
2227
``date.getTime()``.
2328

29+
murmurHash
30+
----------
31+
32+
Returns the hash of an attribute.
33+
2434
jsonPath
2535
--------
2636

docs/user/filesystem/partition_schemes.rst

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,17 @@ The following options are supported:
4343
* ``bits`` - The number of bits to use for the curve, which defines the area of each partition. For example, 2 bits would
4444
create ``2 ^ 2`` (4) regions, while 3 bits would create ``2 ^ 3`` (8) regions.
4545

46-
Attribute Schemes
47-
-----------------
46+
Attribute Scheme
47+
----------------
4848

49-
Attribute schemes partition data based on a lexicoded attribute value. The name must be:
49+
The attribute scheme partitions data based on a lexicoded attribute value. The name must be:
5050

5151
* ``attribute``
5252

53+
The following option is required:
54+
55+
* ``attribute`` - The name of the attribute used to partition
56+
5357
The following options are supported:
5458

5559
* ``default`` - A default value to use if the attribute is null
@@ -63,3 +67,20 @@ The following additional options are supported to bucket the partition values, d
6367
``divisor``. For example, with ``divisor=10``, ``100``, ``109``, etc will all be truncated to ``100``.
6468
* ``scale`` - For fractional type attributes (e.g. floats and doubles), the number of digits to keep to the right of the decimal
6569
place. For example, with ``scale=2``, ``100.001``, ``100.009``, etc will all be truncated to ``100.00``
70+
71+
The attribute scheme supports the following attribute types: ``String``, ``Integer``, ``Long``, ``Float`` and ``Double``.
72+
73+
Hash Scheme
74+
-----------
75+
76+
The hash scheme partitions data into buckets based on an attribute value. The name must be:
77+
78+
* ``hash``
79+
80+
The following options are required:
81+
82+
* ``attribute`` - The name of the attribute used to partition
83+
* ``buckets`` - The number of buckets used to partition
84+
85+
The hash scheme supports the following attribute types:
86+
``String``, ``Integer``, ``Long``, ``Float``, ``Double``, ``Date``, ``Bytes``, and ``UUID``.
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
1+
org.locationtech.geomesa.filter.function.BucketHashFunction
12
org.locationtech.geomesa.filter.function.Convert2ViewerFunction
23
org.locationtech.geomesa.filter.function.CurrentDateFunction
34
org.locationtech.geomesa.filter.function.DateToLong
45
org.locationtech.geomesa.filter.function.FastProperty
6+
org.locationtech.geomesa.filter.function.MurmurHashFunction
57
org.locationtech.geomesa.filter.function.ProxyIdFunction
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/***********************************************************************
2+
* Copyright (c) 2013-2025 General Atomics Integrated Intelligence, Inc.
3+
* All rights reserved. This program and the accompanying materials
4+
* are made available under the terms of the Apache License, Version 2.0
5+
* which accompanies this distribution and is available at
6+
* https://www.apache.org/licenses/LICENSE-2.0
7+
***********************************************************************/
8+
9+
package org.locationtech.geomesa.filter.function
10+
11+
import org.geotools.api.filter.expression.Expression
12+
import org.geotools.filter.FunctionExpressionImpl
13+
import org.geotools.filter.capability.FunctionNameImpl
14+
import org.geotools.filter.capability.FunctionNameImpl._
15+
16+
import java.util.{Date, UUID}
17+
18+
class BucketHashFunction extends FunctionExpressionImpl(BucketHashFunction.Name) {
19+
20+
import org.locationtech.geomesa.filter.function.MurmurHashFunction._
21+
22+
private var modulo: Int = _
23+
24+
override def setParameters(params: java.util.List[Expression]): Unit = {
25+
super.setParameters(params)
26+
modulo = getExpression(1).evaluate(null).asInstanceOf[Number].intValue()
27+
}
28+
29+
override def evaluate(o: AnyRef): AnyRef = {
30+
if (o == null) {
31+
return null
32+
}
33+
val value = getExpression(0).evaluate(o)
34+
if (value == null) {
35+
return null
36+
}
37+
val hash = value match {
38+
case v: String => StringHashing(v)
39+
case v: Integer => IntegerHashing(v)
40+
case v: java.lang.Long => LongHashing(v)
41+
case v: java.lang.Float => FloatHashing(v)
42+
case v: java.lang.Double => DoubleHashing(v)
43+
case v: Date => DateHashing(v)
44+
case v: Array[Byte] => ByteHashing(v)
45+
case v: UUID => UUIDHashing(v)
46+
case _ => StringHashing(o.toString)
47+
}
48+
Int.box((hash & Int.MaxValue) % modulo)
49+
}
50+
}
51+
52+
object BucketHashFunction {
53+
54+
val Name = new FunctionNameImpl(
55+
"bucketHash",
56+
classOf[java.lang.Integer],
57+
parameter("value", classOf[AnyRef]),
58+
parameter("modulo", classOf[Number]),
59+
)
60+
}
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/***********************************************************************
2+
* Copyright (c) 2013-2025 General Atomics Integrated Intelligence, Inc.
3+
* All rights reserved. This program and the accompanying materials
4+
* are made available under the terms of the Apache License, Version 2.0
5+
* which accompanies this distribution and is available at
6+
* https://www.apache.org/licenses/LICENSE-2.0
7+
***********************************************************************/
8+
9+
package org.locationtech.geomesa.filter.function
10+
11+
import org.apache.commons.codec.digest.MurmurHash3
12+
import org.geotools.filter.FunctionExpressionImpl
13+
import org.geotools.filter.capability.FunctionNameImpl
14+
import org.geotools.filter.capability.FunctionNameImpl._
15+
16+
import java.nio.charset.StandardCharsets
17+
import java.util.{Date, UUID}
18+
import scala.reflect.ClassTag
19+
20+
/**
21+
* Hash function, follows the Apache Icebergs hash specification - https://iceberg.apache.org/spec/#appendix-b-32-bit-hash-requirements
22+
*/
23+
class MurmurHashFunction extends FunctionExpressionImpl(MurmurHashFunction.Name) {
24+
25+
import org.locationtech.geomesa.filter.function.MurmurHashFunction._
26+
27+
override def evaluate(o: AnyRef): AnyRef = {
28+
if (o == null) {
29+
return null
30+
}
31+
val value = getExpression(0).evaluate(o)
32+
if (value == null) {
33+
return null
34+
}
35+
val hash = value match {
36+
case v: String => StringHashing(v)
37+
case v: Integer => IntegerHashing(v)
38+
case v: java.lang.Long => LongHashing(v)
39+
case v: java.lang.Float => FloatHashing(v)
40+
case v: java.lang.Double => DoubleHashing(v)
41+
case v: Date => DateHashing(v)
42+
case v: Array[Byte] => ByteHashing(v)
43+
case v: UUID => UUIDHashing(v)
44+
case _ => StringHashing(o.toString)
45+
}
46+
Int.box(hash)
47+
}
48+
}
49+
50+
object MurmurHashFunction {
51+
52+
val Name = new FunctionNameImpl(
53+
"murmurHash",
54+
classOf[java.lang.Integer],
55+
parameter("value", classOf[AnyRef])
56+
)
57+
58+
val Hashers: Seq[Hashing[_ <: AnyRef]] = Seq(
59+
StringHashing,
60+
IntegerHashing,
61+
LongHashing,
62+
FloatHashing,
63+
DoubleHashing,
64+
DateHashing,
65+
ByteHashing,
66+
UUIDHashing,
67+
)
68+
69+
abstract class Hashing[T: ClassTag] extends (T => Int) {
70+
val binding: Class[T] = implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]]
71+
}
72+
73+
object StringHashing extends Hashing[String] {
74+
// string hashBytes(utf8Bytes(v))
75+
override def apply(v1: String): Int = {
76+
val bytes = v1.getBytes(StandardCharsets.UTF_8)
77+
MurmurHash3.hash32x86(bytes, 0, bytes.length, 0)
78+
}
79+
}
80+
81+
object IntegerHashing extends Hashing[Integer]{
82+
// int hashLong(long(v))
83+
override def apply(v1: Integer): Int = LongHashing(v1.toLong)
84+
}
85+
86+
object LongHashing extends Hashing[java.lang.Long]{
87+
// long hashBytes(littleEndianBytes(v))
88+
override def apply(v1: java.lang.Long): Int = MurmurHash3.hash32(java.lang.Long.reverseBytes(v1.toLong), 0)
89+
}
90+
91+
object FloatHashing extends Hashing[java.lang.Float]{
92+
// float hashLong(doubleToLongBits(double(v))
93+
override def apply(v1: java.lang.Float): Int = LongHashing(java.lang.Double.doubleToLongBits(v1.toDouble))
94+
}
95+
96+
object DoubleHashing extends Hashing[java.lang.Double]{
97+
// double hashLong(doubleToLongBits(v))
98+
override def apply(v1: java.lang.Double): Int = LongHashing(java.lang.Double.doubleToLongBits(v1.toDouble))
99+
}
100+
101+
object DateHashing extends Hashing[Date]{
102+
// timestamp hashLong(microsecsFromUnixEpoch(v))
103+
override def apply(v1: Date): Int = LongHashing(v1.getTime * 1000)
104+
}
105+
106+
object ByteHashing extends Hashing[Array[Byte]]{
107+
// binary hashBytes(v)
108+
override def apply(v1: Array[Byte]): Int = MurmurHash3.hash32x86(v1, 0, v1.length, 0)
109+
}
110+
111+
object UUIDHashing extends Hashing[UUID]{
112+
// uuid hashBytes(uuidBytes(v)
113+
override def apply(v1: UUID): Int = MurmurHash3.hash32(v1.getMostSignificantBits, v1.getLeastSignificantBits, 0)
114+
}
115+
}
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/***********************************************************************
2+
* Copyright (c) 2013-2025 General Atomics Integrated Intelligence, Inc.
3+
* All rights reserved. This program and the accompanying materials
4+
* are made available under the terms of the Apache License, Version 2.0
5+
* which accompanies this distribution and is available at
6+
* https://www.apache.org/licenses/LICENSE-2.0
7+
***********************************************************************/
8+
9+
10+
package org.locationtech.geomesa.filter.function
11+
12+
import org.geotools.feature.simple.SimpleFeatureImpl
13+
import org.geotools.filter.identity.FeatureIdImpl
14+
import org.locationtech.geomesa.filter.FilterHelper
15+
import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes
16+
import org.specs2.mutable.SpecificationWithJUnit
17+
18+
class BucketHashFunctionTest extends SpecificationWithJUnit {
19+
20+
import FilterHelper.ff
21+
22+
import scala.collection.JavaConverters._
23+
24+
val sft = SimpleFeatureTypes.createType("test",
25+
"name:String,age:Int,time:Long,weight:Float,precision:Double,dtg:Date,bytes:Bytes,uuid:UUID")
26+
27+
val nullValues = Seq.fill[AnyRef](sft.getAttributeCount)(null).asJava
28+
29+
"BucketHashFunction" should {
30+
"hash strings" in {
31+
// string hashBytes(utf8Bytes(v)) iceberg → 1210000089
32+
val sf = new SimpleFeatureImpl(nullValues, sft, new FeatureIdImpl("1"))
33+
sf.setAttribute("name", "iceberg")
34+
ff.function("bucketHash", ff.property("name"), ff.literal(8)).evaluate(sf) mustEqual Int.box(1210000089 % 8)
35+
}
36+
"hash ints" in {
37+
// int hashLong(long(v)) [1] 34 → 2017239379
38+
val sf = new SimpleFeatureImpl(nullValues, sft, new FeatureIdImpl("1"))
39+
sf.setAttribute("age", "34")
40+
ff.function("bucketHash", ff.property("age"), ff.literal(8)).evaluate(sf) mustEqual Int.box(2017239379 % 8)
41+
}
42+
"hash longs" in {
43+
// long hashBytes(littleEndianBytes(v)) 34L → 2017239379
44+
val sf = new SimpleFeatureImpl(nullValues, sft, new FeatureIdImpl("1"))
45+
sf.setAttribute("time", "34")
46+
ff.function("bucketHash", ff.property("time"), ff.literal(8)).evaluate(sf) mustEqual Int.box(2017239379 % 8)
47+
}
48+
"hash floats" in {
49+
// float hashLong(doubleToLongBits(double(v)) [5] 1.0F → -142385009, 0.0F → 1669671676, -0.0F → 1669671676
50+
val sf = new SimpleFeatureImpl(nullValues, sft, new FeatureIdImpl("1"))
51+
sf.setAttribute("weight", "1")
52+
ff.function("bucketHash", ff.property("weight"), ff.literal(8)).evaluate(sf) mustEqual Int.box((-142385009 & Int.MaxValue) % 8)
53+
}
54+
"hash doubles" in {
55+
// double hashLong(doubleToLongBits(v)) [5] 1.0D → -142385009, 0.0D → 1669671676, -0.0D → 1669671676
56+
val sf = new SimpleFeatureImpl(nullValues, sft, new FeatureIdImpl("1"))
57+
sf.setAttribute("precision", "1")
58+
ff.function("bucketHash", ff.property("precision"), ff.literal(8)).evaluate(sf) mustEqual Int.box((-142385009 & Int.MaxValue) % 8)
59+
}
60+
"hash dates" in {
61+
// timestamp hashLong(microsecsFromUnixEpoch(v)) 2017-11-16T22:31:08 → -2047944441, 2017-11-16T22:31:08.000001 → -1207196810
62+
val sf = new SimpleFeatureImpl(nullValues, sft, new FeatureIdImpl("1"))
63+
sf.setAttribute("dtg", "2017-11-16T22:31:08")
64+
ff.function("bucketHash", ff.property("dtg"), ff.literal(8)).evaluate(sf) mustEqual Int.box((-2047944441 & Int.MaxValue) % 8)
65+
}
66+
"hash byte arrays" in {
67+
// binary hashBytes(v) 00 01 02 03 → -188683207
68+
val sf = new SimpleFeatureImpl(nullValues, sft, new FeatureIdImpl("1"))
69+
sf.setAttribute("bytes", Array[Byte](0, 1, 2, 3))
70+
ff.function("bucketHash", ff.property("bytes"), ff.literal(8)).evaluate(sf) mustEqual Int.box((-188683207 & Int.MaxValue) % 8)
71+
}
72+
"hash uuids" in {
73+
// uuid hashBytes(uuidBytes(v)) [4] f79c3e09-677c-4bbd-a479-3f349cb785e7 → 1488055340
74+
val sf = new SimpleFeatureImpl(nullValues, sft, new FeatureIdImpl("1"))
75+
sf.setAttribute("uuid", "f79c3e09-677c-4bbd-a479-3f349cb785e7")
76+
ff.function("bucketHash", ff.property("uuid"), ff.literal(8)).evaluate(sf) mustEqual Int.box(1488055340 % 8)
77+
}
78+
}
79+
}

0 commit comments

Comments
 (0)