Skip to content

Commit 19cd1e9

Browse files
committed
Use SIMD for stdev, further lock/modernization work
1 parent d7ad0f8 commit 19cd1e9

14 files changed

Lines changed: 475 additions & 1143 deletions

File tree

Sources/Core/Microsoft.StreamProcessing/Aggregates/ListAggregateBase.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ namespace Microsoft.StreamProcessing.Aggregates
1010
{
1111
internal abstract class ListAggregateBase<T, R> : IAggregate<T, List<T>, R>
1212
{
13-
private static readonly Expression<Func<List<T>>> init = () => new List<T>();
13+
private static readonly Expression<Func<List<T>>> init = () => new();
1414
public Expression<Func<List<T>>> InitialState() => init;
1515

1616
public Expression<Func<List<T>, long, T, List<T>>> Accumulate()

Sources/Core/Microsoft.StreamProcessing/Aggregates/StatisticalAggregate.cs

Lines changed: 40 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,19 @@
33
// Licensed under the MIT License
44
// *********************************************************************
55
using System;
6+
using System.Buffers;
67
using System.Collections.Generic;
7-
using System.Linq;
88
using System.Linq.Expressions;
9+
using System.Numerics.Tensors;
10+
using System.Runtime.InteropServices;
911

1012
namespace Microsoft.StreamProcessing.Aggregates
1113
{
1214
internal abstract class StatisticalAggregate : ListAggregateBase<double, double?>
1315
{
16+
/// <summary>At or below this size, a simple scalar loop avoids renting a scratch buffer for the second pass.</summary>
17+
internal const int VarianceScratchThreshold = 64;
18+
1419
protected static double? ComputeStdev(List<double> valueList, bool useAsPopulation)
1520
{
1621
var variance = ComputeVariance(valueList, useAsPopulation);
@@ -22,19 +27,43 @@ internal abstract class StatisticalAggregate : ListAggregateBase<double, double?
2227
if (list == null || list.Count == 0) return null;
2328
if (list.Count == 1) return useAsPopulation ? 0.0 : (double?)null;
2429

25-
// compute mean
26-
// instead of dividing the sum of elements we divide each element to avoid a potential overflow
27-
double mean = list.Sum(element => element / list.Count);
30+
int n = list.Count;
31+
var divisor = useAsPopulation ? n : n - 1;
32+
ReadOnlySpan<double> span = CollectionsMarshal.AsSpan(list);
2833

29-
// for the population variance the divisor is n, for the sample variance the divisor is n - 1
30-
var divisor = useAsPopulation ? list.Count : list.Count - 1;
34+
// TensorPrimitives.Sum is SIMD-accelerated on supported hardware.
35+
// Mean is computed as Sum/n rather than Sum(x/n) per element; this is faster and
36+
// typically as accurate; for pathological magnitudes, per-element scaling avoids
37+
// intermediate overflow in the sum (trade-off: rare edge case vs. hot-path cost).
38+
double mean = TensorPrimitives.Sum(span) / n;
3139

32-
// compute variance
33-
// instead of dividing the sum of differences we divide each difference to try to avoid a potential overflow
34-
double variance = list.Select(element => element - mean).Sum(difference => (difference * difference) / divisor);
40+
double variance;
41+
if (n <= VarianceScratchThreshold)
42+
{
43+
variance = 0.0;
44+
for (int i = 0; i < n; i++)
45+
{
46+
double d = span[i] - mean;
47+
variance += (d * d) / divisor;
48+
}
49+
}
50+
else
51+
{
52+
double[] rented = ArrayPool<double>.Shared.Rent(n);
53+
try
54+
{
55+
Span<double> scratch = rented.AsSpan(0, n);
56+
TensorPrimitives.Subtract(span, mean, scratch);
57+
TensorPrimitives.Multiply(scratch, scratch, scratch);
58+
variance = TensorPrimitives.Sum(scratch) / divisor;
59+
}
60+
finally
61+
{
62+
ArrayPool<double>.Shared.Return(rented);
63+
}
64+
}
3565

36-
// difference or variance can still overflow
37-
return double.IsInfinity(variance) ? null : (double?)variance;
66+
return double.IsInfinity(variance) ? null : variance;
3867
}
3968
}
4069

Sources/Core/Microsoft.StreamProcessing/Collections/EndPointHeap.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,7 @@ private void Grow()
253253

254254
// Calculate new capacity to be twice as large as before.
255255
this.capacity <<= 1;
256-
if (this.capacity < 0 || this.capacity > MaxCapacity)
256+
if (this.capacity is < 0 or > MaxCapacity)
257257
{
258258
// Handle case of capacity going larger than MaxCapacity,
259259
// or overflowing (which would make it appear negative).

Sources/Core/Microsoft.StreamProcessing/Collections/SafeConcurrentDictionary.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ public TValue GetOrAddUnlessNull(CacheKey key, Func<CacheKey, TValue> valueFacto
9393

9494
/// <summary>
9595
/// Clears all entries from the dictionary and the per-key lock table.
96-
/// Marked internal (not private) so that test code can clear the codegen cache
96+
/// Marked internal (not private) so that test code can clear generated-type caches
9797
/// (e.g. EquiJoinStreamable.cachedPipes) to ensure deterministic test behavior
9898
/// without relying on reflection.
9999
/// </summary>

0 commit comments

Comments
 (0)