Skip to content

Commit 27d55cb

Browse files
feat(ft.aggregate): Add FIRST_VALUE reducer function with BY clause support
- Add FIRST_VALUE reducer with three variants: simple mode (1 arg), BY clause mode (3 args), and BY clause with ASC/DESC (4 args) - Update ft.aggregate.md documentation with FIRST_VALUE syntax and behavior descriptions - Implement FIRST_VALUE in ft_aggregate_exec.cc with support for sorted and unsorted grouping modes - Add comprehensive test coverage in ft_aggregate_exec_test.cc including BY clause variants, case-insensitive keywords, edge cases with nil values, and error conditions - Update ft.aggregate.json command schema to reflect new reducer function - Add integration tests in test_non_vector.py with multiple test cases for deterministic BY-clause behavior - Regenerate compatibility test answers in aggregate-answers.pickle.gz - Simple mode (non-deterministic) intentionally excluded from compatibility tests due to order-dependent results between Redis and Valkey implementations Signed-off-by: Riley Des <riley.desserre@improving.com>
1 parent 1b85d1a commit 27d55cb

7 files changed

Lines changed: 659 additions & 0 deletions

File tree

docs/commands/ft.aggregate.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,3 +92,6 @@ The following reducer functions are available. The reducer functions that take a
9292
| MAX 1 <expression> | The largest numerical values of the expression. |
9393
| AVG 1 <expression> | The numerical average of the values of the expression. |
9494
| STDDEV 1 <expression> | The standard deviation the values of the expression. |
95+
| FIRST_VALUE 1 <expression> | The first value of the expression encountered in the group. Order depends on record retrieval order. Use only when order does not matter. |
96+
| FIRST_VALUE 3 <expression> BY <expression> | The value of the first expression from the record with the smallest comparison expression (ascending). Ties broken by first-encountered order. |
97+
| FIRST_VALUE 4 <expression> BY <expression> ASC\|DESC | The value of the first expression from the record with the minimum (ASC) or maximum (DESC) comparison expression. Ties broken by first-encountered order. Invalid keyword arguments (e.g., wrong BY token or unrecognised direction) produce a parse-time error. |
127 KB
Binary file not shown.

integration/compatibility/generate.py

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -468,3 +468,124 @@ def test_search_sortby(self, key_type, dialect):
468468
for limit in ["LIMIT 0 5", "LIMIT 2 3", ""]:
469469
self.check(dialect, f"ft.search {key_type}_idx1 * SORTBY {sort_key} {direction} {return_keys} {limit} {wsk}")
470470

471+
472+
# test_first_value_simple_mode is intentionally omitted.
473+
# FIRST_VALUE without a BY clause is non-deterministic: the order of
474+
# records within a group depends on retrieval order, which differs between
475+
# Redis and Valkey implementations. Compatibility testing requires
476+
# deterministic results, so only BY-clause (sorted) mode is tested here.
477+
478+
def test_first_value_by_clause(self, key_type, dialect):
479+
"""Test FIRST_VALUE with BY clause - sorted mode."""
480+
self.setup_data("sortable numbers", key_type)
481+
482+
# (value_field, group_field, load_fields, by_field, order)
483+
# order=None means default (3-arg form, no explicit ASC/DESC)
484+
cases = [
485+
("@n1", "@n2", "3 @__key @n1 @n2", "@n1", "ASC"),
486+
("@n1", "@n2", "3 @__key @n1 @n2", "@n1", "DESC"),
487+
("@n1", "@n2", "3 @__key @n1 @n2", "@n1", None), # default order
488+
("@t1", "@t2", "3 @__key @t1 @t2", "@t1", "ASC"),
489+
("@t1", "@t2", "3 @__key @t1 @t2", "@t1", "DESC"),
490+
("@t1", "@n2", "4 @__key @t1 @n1 @n2", "@n1", "ASC"), # cross-field
491+
("@n1", "@n2", "3 @__key @n1 @n2", "@n2", "ASC"), # tie-breaking
492+
]
493+
for val, group, load, by, order in cases:
494+
if order is None:
495+
nargs, order_clause = "3", ""
496+
else:
497+
nargs, order_clause = "4", f" {order}"
498+
alias = f"first_{val[1:]}_{by[1:]}_{order or 'default'}"
499+
self.check(dialect,
500+
f"ft.aggregate {key_type}_idx1 * "
501+
f"load {load} "
502+
f"groupby 1 {group} "
503+
f"reduce first_value {nargs} {val} BY {by}{order_clause} as {alias}"
504+
)
505+
506+
def test_first_value_keyword_case(self, key_type, dialect):
507+
"""Test FIRST_VALUE with case-insensitive keywords."""
508+
self.setup_data("sortable numbers", key_type)
509+
510+
# 3-arg form: vary BY keyword case only
511+
for by_kw in ["by", "BY", "By"]:
512+
self.check(dialect,
513+
f"ft.aggregate {key_type}_idx1 * "
514+
f"load 3 @__key @n1 @n2 "
515+
f"groupby 1 @n2 "
516+
f"reduce first_value 3 @n1 {by_kw} @n1 as first_{by_kw}"
517+
)
518+
519+
# 4-arg form: vary order keyword case (ASC and DESC variants)
520+
for order_kw in ["asc", "ASC", "Asc", "desc", "DESC", "Desc"]:
521+
self.check(dialect,
522+
f"ft.aggregate {key_type}_idx1 * "
523+
f"load 3 @__key @n1 @n2 "
524+
f"groupby 1 @n2 "
525+
f"reduce first_value 4 @n1 BY @n1 {order_kw} as first_{order_kw}"
526+
)
527+
528+
def test_first_value_edge_cases(self, key_type, dialect):
529+
"""Test FIRST_VALUE with edge cases like nil values."""
530+
self.setup_data("hard numbers", key_type)
531+
532+
# nil values in comparison field, both directions
533+
for order in ["ASC", "DESC"]:
534+
self.check(dialect,
535+
f"ft.aggregate {key_type}_idx1 * "
536+
f"load 3 @__key @n1 @n2 "
537+
f"groupby 1 @n2 "
538+
f"reduce first_value 4 @n1 BY @n1 {order} as first_nil_{order.lower()}"
539+
)
540+
541+
# NOTE: Simple mode test removed due to non-deterministic ordering.
542+
# When FIRST_VALUE is used without a BY clause, the order of values
543+
# within each group is undefined, leading to inconsistent results.
544+
545+
# Switch to sortable numbers for duplicate comparison values
546+
self.client.execute_command("FLUSHALL SYNC")
547+
time.sleep(0.5)
548+
self.setup_data("sortable numbers", key_type)
549+
550+
# Test with duplicate comparison values (tie-breaking)
551+
self.check(dialect,
552+
f"ft.aggregate {key_type}_idx1 * "
553+
f"load 3 @__key @n1 @n2 "
554+
f"groupby 1 @n2 "
555+
f"reduce first_value 4 @n1 BY @n2 ASC as first_dup_tie"
556+
)
557+
558+
def test_first_value_errors(self, key_type, dialect):
559+
"""Test FIRST_VALUE error conditions."""
560+
self.setup_data("sortable numbers", key_type)
561+
562+
# Test nargs=0 (too few arguments) - this will be caught by parser
563+
# Note: This may not be testable via compatibility tests if parser rejects it
564+
565+
# Test nargs=2 (incomplete BY clause)
566+
self.check(dialect,
567+
f"ft.aggregate {key_type}_idx1 * "
568+
f"load 3 @__key @n1 @n2 "
569+
f"groupby 1 @n2 "
570+
f"reduce first_value 2 @n1 @n2 as first_error_nargs2"
571+
)
572+
573+
# Test nargs=5 (too many arguments) - this will be caught by parser
574+
# Note: This may not be testable via compatibility tests if parser rejects it
575+
576+
# Test invalid BY keyword (e.g., NOTBY)
577+
self.check(dialect,
578+
f"ft.aggregate {key_type}_idx1 * "
579+
f"load 3 @__key @n1 @n2 "
580+
f"groupby 1 @n2 "
581+
f"reduce first_value 3 @n1 NOTBY @n2 as first_error_notby"
582+
)
583+
584+
# Test invalid sort order (not ASC/DESC)
585+
self.check(dialect,
586+
f"ft.aggregate {key_type}_idx1 * "
587+
f"load 3 @__key @n1 @n2 "
588+
f"groupby 1 @n2 "
589+
f"reduce first_value 4 @n1 BY @n2 INVALID as first_error_invalid"
590+
)
591+

integration/test_non_vector.py

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -524,6 +524,157 @@ def validate_aggregate_complex_queries(client: Valkey):
524524
assert result[1][1] == b'406'
525525
assert result[1][3] == b'4060'
526526

527+
# 17. FIRST_VALUE reducer - simple mode (no BY clause)
528+
# Note: Simple mode is non-deterministic as it depends on retrieval order
529+
# We only verify that valid values are returned, not specific values
530+
result = client.execute_command(
531+
"FT.AGGREGATE", "products", "@price:[1 1000]",
532+
"LOAD", "2", "price", "category",
533+
"GROUPBY", "1", "@category",
534+
"REDUCE", "FIRST_VALUE", "1", "@price", "AS", "first_price"
535+
)
536+
assert result[0] == 2
537+
for i in range(1, len(result)):
538+
row = dict(zip(result[i][::2], result[i][1::2]))
539+
assert b'category' in row
540+
assert b'first_price' in row
541+
first_price = float(row[b'first_price'])
542+
# Verify it's a valid price from the dataset (1-1000)
543+
assert 1.0 <= first_price <= 1000.0
544+
# Verify category matches expected values
545+
if row[b'category'] == b'electronics':
546+
# Electronics has odd prices (1, 3, 5, ..., 999)
547+
assert int(first_price) % 2 == 1
548+
else:
549+
# Books has even prices (2, 4, 6, ..., 1000)
550+
assert int(first_price) % 2 == 0
551+
552+
# 18. FIRST_VALUE reducer - sorted ASC mode (with BY clause)
553+
result = client.execute_command(
554+
"FT.AGGREGATE", "products", "@price:[1 1000]",
555+
"LOAD", "3", "price", "rating", "category",
556+
"GROUPBY", "1", "@category",
557+
"REDUCE", "FIRST_VALUE", "4", "@price", "BY", "@rating", "ASC", "AS", "price_with_min_rating"
558+
)
559+
assert result[0] == 2
560+
for i in range(1, len(result)):
561+
row = dict(zip(result[i][::2], result[i][1::2]))
562+
assert b'category' in row
563+
assert b'price_with_min_rating' in row
564+
price_with_min_rating = float(row[b'price_with_min_rating'])
565+
if row[b'category'] == b'electronics':
566+
# Multiple electronics have the same minimum rating (1.0), so any of their prices is valid
567+
valid_prices_for_min_rating = {1, 101, 201, 301, 401, 501, 601, 701, 801, 901}
568+
assert price_with_min_rating in valid_prices_for_min_rating, \
569+
f"Electronics price_with_min_rating should be one of {valid_prices_for_min_rating}, got {price_with_min_rating}"
570+
else:
571+
# Multiple books have the same minimum rating (2.0), so any of their prices is valid
572+
valid_prices_for_min_rating = {2, 102, 202, 302, 402, 502, 602, 702, 802, 902}
573+
assert price_with_min_rating in valid_prices_for_min_rating, \
574+
f"Books price_with_min_rating should be one of {valid_prices_for_min_rating}, got {price_with_min_rating}"
575+
576+
# 19. FIRST_VALUE reducer - sorted DESC mode (with BY clause)
577+
result = client.execute_command(
578+
"FT.AGGREGATE", "products", "@price:[1 1000]",
579+
"LOAD", "3", "price", "rating", "category",
580+
"GROUPBY", "1", "@category",
581+
"REDUCE", "FIRST_VALUE", "4", "@price", "BY", "@rating", "DESC", "AS", "price_with_max_rating"
582+
)
583+
assert result[0] == 2
584+
for i in range(1, len(result)):
585+
row = dict(zip(result[i][::2], result[i][1::2]))
586+
assert b'category' in row
587+
assert b'price_with_max_rating' in row
588+
price_with_max_rating = float(row[b'price_with_max_rating'])
589+
if row[b'category'] == b'electronics':
590+
# Multiple electronics have the same maximum rating (99.0), so any of their prices is valid
591+
valid_prices_for_max_rating = {99, 199, 299, 399, 499, 599, 699, 799, 899, 999}
592+
assert price_with_max_rating in valid_prices_for_max_rating, \
593+
f"Electronics price_with_max_rating should be one of {valid_prices_for_max_rating}, got {price_with_max_rating}"
594+
else:
595+
# Multiple books have the same maximum rating (100.0), so any of their prices is valid
596+
valid_prices_for_max_rating = {100, 200, 300, 400, 500, 600, 700, 800, 900, 1000}
597+
assert price_with_max_rating in valid_prices_for_max_rating, \
598+
f"Books price_with_max_rating should be one of {valid_prices_for_max_rating}, got {price_with_max_rating}"
599+
600+
# 20. FIRST_VALUE reducer - multiple groups with independent results
601+
# Note: Simple mode (first_price) is non-deterministic
602+
result = client.execute_command(
603+
"FT.AGGREGATE", "products", "@price:[1 1000]",
604+
"LOAD", "3", "price", "rating", "category",
605+
"GROUPBY", "1", "@category",
606+
"REDUCE", "FIRST_VALUE", "1", "@price", "AS", "first_price",
607+
"REDUCE", "FIRST_VALUE", "4", "@price", "BY", "@rating", "ASC", "AS", "price_min_rating",
608+
"REDUCE", "FIRST_VALUE", "4", "@price", "BY", "@rating", "DESC", "AS", "price_max_rating",
609+
"REDUCE", "COUNT", "0", "AS", "count"
610+
)
611+
assert result[0] == 2
612+
613+
electronics_found = False
614+
books_found = False
615+
616+
for i in range(1, len(result)):
617+
row = dict(zip(result[i][::2], result[i][1::2]))
618+
assert b'category' in row
619+
assert b'first_price' in row
620+
assert b'price_min_rating' in row
621+
assert b'price_max_rating' in row
622+
assert b'count' in row
623+
624+
category = row[b'category']
625+
first_price = float(row[b'first_price'])
626+
price_min_rating = float(row[b'price_min_rating'])
627+
price_max_rating = float(row[b'price_max_rating'])
628+
count = int(row[b'count'])
629+
630+
if category == b'electronics':
631+
electronics_found = True
632+
# Simple mode is non-deterministic, just verify valid range
633+
assert 1.0 <= first_price <= 1000.0, f"Electronics first_price out of range: {first_price}"
634+
assert int(first_price) % 2 == 1, f"Electronics should have odd prices, got {first_price}"
635+
# Sorted modes - multiple electronics have the same min/max rating, so any of their prices is valid
636+
valid_prices_for_min_rating = {1, 101, 201, 301, 401, 501, 601, 701, 801, 901}
637+
assert price_min_rating in valid_prices_for_min_rating, \
638+
f"Electronics price_min_rating should be one of {valid_prices_for_min_rating}, got {price_min_rating}"
639+
valid_prices_for_max_rating = {99, 199, 299, 399, 499, 599, 699, 799, 899, 999}
640+
assert price_max_rating in valid_prices_for_max_rating, \
641+
f"Electronics price_max_rating should be one of {valid_prices_for_max_rating}, got {price_max_rating}"
642+
assert count == 500, f"Electronics count should be 500, got {count}"
643+
elif category == b'books':
644+
books_found = True
645+
# Simple mode is non-deterministic, just verify valid range
646+
assert 1.0 <= first_price <= 1000.0, f"Books first_price out of range: {first_price}"
647+
assert int(first_price) % 2 == 0, f"Books should have even prices, got {first_price}"
648+
# Sorted modes - multiple books have the same min/max rating, so any of their prices is valid
649+
valid_prices_for_min_rating = {2, 102, 202, 302, 402, 502, 602, 702, 802, 902}
650+
assert price_min_rating in valid_prices_for_min_rating, \
651+
f"Books price_min_rating should be one of {valid_prices_for_min_rating}, got {price_min_rating}"
652+
valid_prices_for_max_rating = {100, 200, 300, 400, 500, 600, 700, 800, 900, 1000}
653+
assert price_max_rating in valid_prices_for_max_rating, \
654+
f"Books price_max_rating should be one of {valid_prices_for_max_rating}, got {price_max_rating}"
655+
assert count == 500, f"Books count should be 500, got {count}"
656+
else:
657+
raise AssertionError(f"Unexpected category: {category}")
658+
659+
assert electronics_found, "Electronics group not found in results"
660+
assert books_found, "Books group not found in results"
661+
662+
# 21. FIRST_VALUE reducer - numeric field type handling
663+
result = client.execute_command(
664+
"FT.AGGREGATE", "products", "@price:[1 1000]",
665+
"LOAD", "2", "price", "category",
666+
"GROUPBY", "1", "@category",
667+
"REDUCE", "FIRST_VALUE", "4", "@price", "BY", "@price", "ASC", "AS", "min_price"
668+
)
669+
assert result[0] == 2
670+
for i in range(1, len(result)):
671+
row = dict(zip(result[i][::2], result[i][1::2]))
672+
min_price = float(row[b'min_price'])
673+
if row[b'category'] == b'electronics':
674+
assert min_price == 1.0, f"Electronics min_price should be 1.0, got {min_price}"
675+
else:
676+
assert min_price == 2.0, f"Books min_price should be 2.0, got {min_price}"
677+
527678
class TestNonVector(ValkeySearchTestCaseBase):
528679

529680
def test_basic(self):

src/commands/ft.aggregate.json

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,11 @@
263263
"name": "STDDEV",
264264
"type": "pure-token",
265265
"token": "STDDEV"
266+
},
267+
{
268+
"name": "FIRST_VALUE",
269+
"type": "pure-token",
270+
"token": "FIRST_VALUE"
266271
}
267272
]
268273
},

0 commit comments

Comments
 (0)