Skip to content

Commit eae5060

Browse files
logicalplan: fix correctness issues in distributed optimizer (#702)
Fix isBinaryExpressionWithDistributableMatching to check all partition labels instead of only the first one. The loop had a return statement inside the body, causing it to exit on the first iteration. With multiple partition labels (e.g. region + datacenter), only one arbitrary label was checked, potentially allowing incorrect distribution.

Move LIMIT_RATIO from the mathematically distributive aggregation list to the non-distributive group. Applying limit_ratio remotely and then locally results in double-application (e.g. 0.5 * 0.5 = 0.25 ratio instead of the intended 0.5). It can still be pushed as-is when partition labels are preserved.

Add label_join handling alongside label_replace in both isDistributive and preservesPartitionLabels. Previously, label_join targeting a partition label (e.g. label_join(metric, "region", ...)) was not detected, allowing incorrect distribution or partition label loss.

Signed-off-by: Michael Hoffmann <mhoffmann@cloudflare.com>
1 parent 2ae49e5 commit eae5060

2 files changed

Lines changed: 113 additions & 8 deletions

File tree

logicalplan/distribute.go

Lines changed: 8 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -656,7 +656,7 @@ func preservesPartitionLabels(expr Node, partitionLabels map[string]struct{}) bo
656656
return preservesPartitionLabels(e.LHS, partitionLabels) &&
657657
preservesPartitionLabels(e.RHS, partitionLabels)
658658
case *FunctionCall:
659-
if e.Func.Name == "label_replace" {
659+
if e.Func.Name == "label_replace" || e.Func.Name == "label_join" {
660660
if _, ok := partitionLabels[UnsafeUnwrapString(e.Args[1])]; ok {
661661
return false
662662
}
@@ -705,18 +705,19 @@ func (m DistributedExecutionOptimizer) isDistributive(expr *Node, engineLabels m
705705
// Mathematically distributive: can be split into local_agg(remote_agg(X))
706706
// regardless of partition labels.
707707
case parser.SUM, parser.MIN, parser.MAX, parser.GROUP, parser.COUNT,
708-
parser.TOPK, parser.BOTTOMK, parser.LIMITK, parser.LIMIT_RATIO:
708+
parser.TOPK, parser.BOTTOMK, parser.LIMITK:
709709
// Non-distributive: can only be pushed as-is when they preserve
710710
// partition labels (each engine computes over disjoint data).
711-
case parser.AVG, parser.QUANTILE, parser.STDDEV, parser.STDVAR, parser.COUNT_VALUES:
711+
case parser.AVG, parser.QUANTILE, parser.STDDEV, parser.STDVAR,
712+
parser.COUNT_VALUES, parser.LIMIT_RATIO:
712713
if !preservesPartitionLabels(e, engineLabels) {
713714
return false
714715
}
715716
default:
716717
return false
717718
}
718719
case *FunctionCall:
719-
if e.Func.Name == "label_replace" {
720+
if e.Func.Name == "label_replace" || e.Func.Name == "label_join" {
720721
targetLabel := UnsafeUnwrapString(e.Args[1])
721722
if _, ok := engineLabels[targetLabel]; ok {
722723
warns.Add(RewrittenExternalLabelWarning)
@@ -783,11 +784,10 @@ func isBinaryExpressionWithDistributableMatching(expr *Binary, engineLabels map[
783784
// changes match cardinality semantics. Each partition only sees one value for
784785
// that label, so what's many-to-many globally may become one-to-one per partition,
785786
// producing results instead of errors (or vice versa).
786-
return !inInclude && inMatching == expr.VectorMatching.On
787+
if inInclude || inMatching != expr.VectorMatching.On {
788+
return false
789+
}
787790
}
788-
// At this point, partition labels are in the matching set (either via on() or
789-
// by not being in ignoring()). This means or/unless can be safely distributed
790-
// because the matching ensures series are paired by partition.
791791
return true
792792
}
793793

logicalplan/distribute_test.go

Lines changed: 105 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -356,6 +356,20 @@ label_replace(max by (location) (dedup(
356356
)), "region", "$1", "location", "(.*)")`,
357357
expectWarn: true,
358358
},
359+
{
360+
name: "label join targeting non-partition label distributes",
361+
expr: `label_join(http_requests_total, "zone", ",", "pod")`,
362+
expected: `
363+
dedup(
364+
remote(label_join(http_requests_total, "zone", ",", "pod")),
365+
remote(label_join(http_requests_total, "zone", ",", "pod")))`,
366+
},
367+
{
368+
name: "label join targeting partition label does not distribute",
369+
expr: `max by (location) (label_join(http_requests_total, "region", ",", "pod"))`,
370+
expected: `max by (location) (label_join(dedup(remote(http_requests_total), remote(http_requests_total)), "region", ",", "pod"))`,
371+
expectWarn: true,
372+
},
359373
{
360374
name: "binary operation in the operand path",
361375
expr: `max by (pod) (metric_a / metric_b)`,
@@ -1056,6 +1070,87 @@ sum by (pod) (dedup(
10561070
}
10571071
}
10581072

1073+
func TestDistributedExecutionMultiplePartitionLabels(t *testing.T) {
1074+
// Engines partitioned by both region and datacenter.
1075+
engines := []api.RemoteEngine{
1076+
newEngineMock(math.MinInt64, math.MaxInt64, []labels.Labels{labels.FromStrings("region", "east", "datacenter", "dc1")}),
1077+
newEngineMock(math.MinInt64, math.MaxInt64, []labels.Labels{labels.FromStrings("region", "west", "datacenter", "dc2")}),
1078+
}
1079+
optimizers := []Optimizer{
1080+
DistributedExecutionOptimizer{Endpoints: api.NewStaticEndpoints(engines)},
1081+
}
1082+
1083+
cases := []struct {
1084+
name string
1085+
expr string
1086+
expected string
1087+
}{
1088+
{
1089+
name: "on() must include all partition labels to distribute",
1090+
expr: `metric_a + on (region) metric_b`,
1091+
expected: `
1092+
dedup(remote(metric_a), remote(metric_a))
1093+
+ on (region)
1094+
dedup(remote(metric_b), remote(metric_b))`,
1095+
},
1096+
{
1097+
name: "on() with all partition labels distributes the binary",
1098+
expr: `metric_a + on (region, datacenter) metric_b`,
1099+
expected: `
1100+
dedup(
1101+
remote(metric_a + on (region, datacenter) metric_b),
1102+
remote(metric_a + on (region, datacenter) metric_b))`,
1103+
},
1104+
{
1105+
name: "ignoring() must not include any partition label to distribute",
1106+
expr: `metric_a + ignoring (region) metric_b`,
1107+
expected: `
1108+
dedup(remote(metric_a), remote(metric_a))
1109+
+ ignoring (region)
1110+
dedup(remote(metric_b), remote(metric_b))`,
1111+
},
1112+
{
1113+
name: "ignoring() non-partition label distributes the binary",
1114+
expr: `metric_a + ignoring (pod) metric_b`,
1115+
expected: `
1116+
dedup(
1117+
remote(metric_a + ignoring (pod) metric_b),
1118+
remote(metric_a + ignoring (pod) metric_b))`,
1119+
},
1120+
{
1121+
name: "sum must include all partition labels to preserve",
1122+
expr: `sum by (region) (metric_a)`,
1123+
expected: `
1124+
sum by (region) (
1125+
dedup(
1126+
remote(sum by (datacenter, region) (metric_a)),
1127+
remote(sum by (datacenter, region) (metric_a))))`,
1128+
},
1129+
{
1130+
name: "sum by all partition labels preserves",
1131+
expr: `max(sum by (region, datacenter) (metric_a))`,
1132+
expected: `
1133+
max(
1134+
dedup(
1135+
remote(max by (datacenter, region) (sum by (region, datacenter) (metric_a))),
1136+
remote(max by (datacenter, region) (sum by (region, datacenter) (metric_a)))))`,
1137+
},
1138+
}
1139+
1140+
for _, tcase := range cases {
1141+
t.Run(tcase.name, func(t *testing.T) {
1142+
expr, err := parser.ParseExpr(tcase.expr)
1143+
testutil.Ok(t, err)
1144+
1145+
plan, err := NewFromAST(expr, &query.Options{Start: time.Unix(0, 0), End: time.Unix(0, 0)}, PlanOptions{})
1146+
testutil.Ok(t, err)
1147+
optimizedPlan, _ := plan.Optimize(optimizers)
1148+
expectedPlan := cleanUp(replacements, tcase.expected)
1149+
testutil.Equals(t, expectedPlan, renderExprTree(optimizedPlan.Root()))
1150+
})
1151+
}
1152+
}
1153+
10591154
func TestDistributedExecutionClonesNodes(t *testing.T) {
10601155
var (
10611156
start = time.Unix(0, 0)
@@ -1243,6 +1338,16 @@ func TestPreservesPartitionLabels(t *testing.T) {
12431338
expr: `label_replace(metric, "zone", "$1", "pod", "(.*)")`,
12441339
expected: true,
12451340
},
1341+
{
1342+
name: "label_join targeting partition label does not preserve",
1343+
expr: `label_join(metric, "region", ",", "pod")`,
1344+
expected: false,
1345+
},
1346+
{
1347+
name: "label_join targeting non-partition label preserves",
1348+
expr: `label_join(metric, "zone", ",", "pod")`,
1349+
expected: true,
1350+
},
12461351
{
12471352
name: "rate preserves",
12481353
expr: `rate(metric[5m])`,

0 commit comments

Comments
 (0)