Skip to content

Commit 204a991

Browse files
committed
add single pass SF calculator for all SF types
1 parent 1d5ba24 commit 204a991

8 files changed

Lines changed: 663 additions & 3 deletions

ext/StructureFunctionsDistributedExt.jl

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,4 +163,72 @@ function SFC.parallel_calculate_structure_function(
163163
)
164164
end
165165

166+
"""
167+
SFC._dispatch_single_pass(::DistributedBackend, x, u, distance_bins; kwargs...)
168+
169+
Calculates single-pass structure functions utilizing multi-process Distributed execution.
170+
Uses a process-parallelized loop via `@distributed (+)` and reduces results element-wise.
171+
"""
172+
function SFC._dispatch_single_pass(
173+
::SF.DistributedBackend,
174+
x::AbstractMatrix{FT1},
175+
u::AbstractMatrix{FT2},
176+
distance_bins::AbstractVector{FT3};
177+
kwargs...
178+
) where {FT1 <: Number, FT2 <: Number, FT3 <: Number}
179+
OT = promote_type(float(FT1), float(FT2))
180+
n_bins = length(distance_bins) - 1
181+
n_points = size(x, 2)
182+
183+
# We distribute the outer points loop.
184+
# The reduction operator `+` works element-wise on the returned Float64 matrix of shape (16, n_bins).
185+
combined_reduced = Distributed.@distributed (+) for i in 1:n_points
186+
local_combined = zeros(Float64, 16, n_bins)
187+
x_i = SA.SVector{2, FT1}(x[1, i], x[2, i])
188+
u_i = SA.SVector{2, FT2}(u[1, i], u[2, i])
189+
190+
for j in (i+1):n_points
191+
x_j = SA.SVector{2, FT1}(x[1, j], x[2, j])
192+
193+
dx = SFH.δr(x_i, x_j)
194+
r = LA.norm(dx)
195+
196+
bin_idx = SFH.digitize(r, distance_bins)
197+
198+
if 1 <= bin_idx <= n_bins
199+
u_j = SA.SVector{2, FT2}(u[1, j], u[2, j])
200+
du = u_j - u_i
201+
202+
rh = SFH.(x_i, x_j)
203+
nh = SFH.(rh)
204+
205+
du_L = LA.dot(du, rh)
206+
du_T = LA.dot(du, nh)
207+
208+
du_L2 = du_L * du_L
209+
du_T2 = du_T * du_T
210+
211+
local_combined[1, bin_idx] += du_L2 + du_T2
212+
local_combined[2, bin_idx] += du_L2
213+
local_combined[3, bin_idx] += du_T2
214+
local_combined[4, bin_idx] += du_L * (du_L2 + du_T2)
215+
local_combined[5, bin_idx] += du_L * du_L2
216+
local_combined[6, bin_idx] += du_L2 * du_T
217+
local_combined[7, bin_idx] += du_L * du_T2
218+
local_combined[8, bin_idx] += du_T * du_T2
219+
220+
for t in 9:16
221+
local_combined[t, bin_idx] += 1.0
222+
end
223+
end
224+
end
225+
local_combined
226+
end
227+
228+
sums = OT.(combined_reduced[1:8, :])
229+
counts = Int64.(combined_reduced[9:16, :])
230+
231+
return SFC.postprocess_single_pass_results(sums, counts, distance_bins)
232+
end
233+
166234
end

ext/StructureFunctionsGPUExt.jl

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -453,5 +453,110 @@ function SFSA.gpu_calculate_spectrum(
453453
return Array(coeffs_dev), ks_phys
454454
end
455455

456+
# ---------------------------------------------------------------------------
457+
# Single-Pass GPU Kernel
458+
# ---------------------------------------------------------------------------
459+
460+
KA.@kernel function _sf_single_pass_kernel!(
461+
output_sums, # Matrix{FT} of size (8, N_bins-1)
462+
output_counts, # Matrix{FT} of size (8, N_bins-1)
463+
@Const(x_mat), # Matrix{FT} of size (2, N_points)
464+
@Const(u_mat), # Matrix{FT} of size (2, N_points)
465+
@Const(distance_bins), # monotone bin edges, length N_bins
466+
N_points::Int,
467+
N_bins::Int,
468+
)
469+
I = @index(Global, NTuple)
470+
i = I[1]
471+
j = I[2]
472+
473+
if i < j
474+
# Static arrays on stack (using 2D coordinates)
475+
X1 = SA.SVector{2}(x_mat[1, i], x_mat[2, i])
476+
X2 = SA.SVector{2}(x_mat[1, j], x_mat[2, j])
477+
U1 = SA.SVector{2}(u_mat[1, i], u_mat[2, i])
478+
U2 = SA.SVector{2}(u_mat[1, j], u_mat[2, j])
479+
480+
dX = X2 - X1
481+
dist = sqrt(dX[1]^2 + dX[2]^2)
482+
483+
bin = SFH.digitize(dist, distance_bins)
484+
485+
if 1 <= bin < N_bins
486+
dU = U2 - U1
487+
= dX / dist
488+
= SA.SVector{2, eltype(x_mat)}(r̂[2], -r̂[1])
489+
490+
du_L = SA.dot(dU, r̂)
491+
du_T = SA.dot(dU, n̂)
492+
493+
du_L2 = du_L * du_L
494+
du_T2 = du_T * du_T
495+
496+
# Atomically accumulate the 8 structure functions
497+
@atomic output_sums[1, bin] += du_L2 + du_T2
498+
@atomic output_sums[2, bin] += du_L2
499+
@atomic output_sums[3, bin] += du_T2
500+
@atomic output_sums[4, bin] += du_L * (du_L2 + du_T2)
501+
@atomic output_sums[5, bin] += du_L * du_L2
502+
@atomic output_sums[6, bin] += du_L2 * du_T
503+
@atomic output_sums[7, bin] += du_L * du_T2
504+
@atomic output_sums[8, bin] += du_T * du_T2
505+
506+
for t in 1:8
507+
@atomic output_counts[t, bin] += one(eltype(output_counts))
508+
end
509+
end
510+
end
511+
end
512+
513+
"""
514+
SFC._dispatch_single_pass(::GPUBackend, x, u, distance_bins; workgroup_size=64, kwargs...)
515+
516+
Calculates single-pass structure functions utilizing GPU-accelerated computing.
517+
"""
518+
function SFC._dispatch_single_pass(
519+
gpu_backend::SF.GPUBackend,
520+
x::AbstractMatrix{FT1},
521+
u::AbstractMatrix{FT2},
522+
distance_bins::AbstractVector{FT3};
523+
workgroup_size::Int = 64,
524+
kwargs...
525+
) where {FT1 <: Number, FT2 <: Number, FT3 <: Number}
526+
backend = gpu_backend.backend
527+
FT = promote_type(float(FT1), float(FT2))
528+
N_dims, N_points = size(x)
529+
n_bins = length(distance_bins) - 1
530+
531+
if N_dims != 2
532+
error("GPUExt: single-pass calculation only supports 2D coordinates (got N_dims=$N_dims)")
533+
end
534+
535+
# Allocate device arrays
536+
x_dev = KA.allocate(backend, FT, 2, N_points)
537+
u_dev = KA.allocate(backend, FT, 2, N_points)
538+
bins_dev = KA.allocate(backend, FT, length(distance_bins))
539+
out_sums_dev = KA.zeros(backend, FT, 8, n_bins)
540+
out_cnts_dev = KA.zeros(backend, FT, 8, n_bins)
541+
542+
copyto!(x_dev, collect(x))
543+
copyto!(u_dev, collect(u))
544+
copyto!(bins_dev, collect(distance_bins))
545+
546+
kernel! = _sf_single_pass_kernel!(backend, workgroup_size)
547+
kernel!(
548+
out_sums_dev, out_cnts_dev,
549+
x_dev, u_dev,
550+
bins_dev,
551+
N_points, length(distance_bins);
552+
ndrange = (N_points, N_points)
553+
)
554+
KA.synchronize(backend)
555+
556+
sums = Array(out_sums_dev)
557+
counts = Int64.(Array(out_cnts_dev))
558+
559+
return SFC.postprocess_single_pass_results(sums, counts, distance_bins)
560+
end
456561

457562
end # module GPUExt

ext/StructureFunctionsOhMyThreadsExt.jl

Lines changed: 90 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,14 @@ module StructureFunctionsOhMyThreadsExt
22

33
using Distances: Distances as DI
44
using OhMyThreads: OhMyThreads as OMT
5+
using StaticArrays: StaticArrays as SA
6+
using LinearAlgebra: LinearAlgebra as LA
57
using StructureFunctions:
68
StructureFunctions as SF,
79
Calculations as SFC,
810
StructureFunctionObjects as SFO,
9-
StructureFunctionTypes as SFT
11+
StructureFunctionTypes as SFT,
12+
HelperFunctions as SFH
1013

1114
"""
1215
SFC.threaded_calculate_structure_function(sf_type, x_vecs, u_vecs, distance_bins, ::Val{RSAC}; ...)
@@ -174,4 +177,90 @@ function SFC.threaded_calculate_structure_function(
174177
return SF.StructureFunction(structure_function_type, distance_bins, output_div)
175178
end
176179

180+
"""
181+
SFC._dispatch_single_pass(::ThreadedBackend, x, u, distance_bins; thread_sums=nothing, thread_counts=nothing)
182+
183+
Calculates single-pass structure functions utilizing multi-threaded CPU execution with OhMyThreads.jl.
184+
Chunks the loop over points to balance work across tasks, utilizing a pre-allocated thread-local
185+
reduction matrix to prevent memory allocation and write hazards.
186+
"""
187+
function SFC._dispatch_single_pass(
188+
::SF.ThreadedBackend,
189+
x::AbstractMatrix{FT1},
190+
u::AbstractMatrix{FT2},
191+
distance_bins::AbstractVector{FT3};
192+
thread_sums = nothing,
193+
thread_counts = nothing,
194+
kwargs...
195+
) where {FT1 <: Number, FT2 <: Number, FT3 <: Number}
196+
OT = promote_type(float(FT1), float(FT2))
197+
n_bins = length(distance_bins) - 1
198+
n_threads = Threads.nthreads()
199+
200+
# Check/allocate thread-local reduction heaps
201+
ts = isnothing(thread_sums) ? zeros(OT, 8, n_bins, n_threads) : thread_sums
202+
tc = isnothing(thread_counts) ? zeros(Int64, 8, n_bins, n_threads) : thread_counts
203+
204+
fill!(ts, zero(OT))
205+
fill!(tc, 0)
206+
207+
n_points = size(x, 2)
208+
209+
OMT.tforeach(1:n_points) do i
210+
tid = Threads.threadid()
211+
x_i = SA.SVector{2, FT1}(x[1, i], x[2, i])
212+
u_i = SA.SVector{2, FT2}(u[1, i], u[2, i])
213+
214+
for j in (i+1):n_points
215+
x_j = SA.SVector{2, FT1}(x[1, j], x[2, j])
216+
217+
dx = SFH.δr(x_i, x_j)
218+
r = LA.norm(dx)
219+
220+
bin_idx = SFH.digitize(r, distance_bins)
221+
222+
if 1 <= bin_idx <= n_bins
223+
u_j = SA.SVector{2, FT2}(u[1, j], u[2, j])
224+
du = u_j - u_i
225+
226+
rh = SFH.(x_i, x_j)
227+
nh = SFH.(rh)
228+
229+
du_L = LA.dot(du, rh)
230+
du_T = LA.dot(du, nh)
231+
232+
du_L2 = du_L * du_L
233+
du_T2 = du_T * du_T
234+
235+
@inbounds ts[1, bin_idx, tid] += du_L2 + du_T2
236+
@inbounds ts[2, bin_idx, tid] += du_L2
237+
@inbounds ts[3, bin_idx, tid] += du_T2
238+
@inbounds ts[4, bin_idx, tid] += du_L * (du_L2 + du_T2)
239+
@inbounds ts[5, bin_idx, tid] += du_L * du_L2
240+
@inbounds ts[6, bin_idx, tid] += du_L2 * du_T
241+
@inbounds ts[7, bin_idx, tid] += du_L * du_T2
242+
@inbounds ts[8, bin_idx, tid] += du_T * du_T2
243+
244+
for t in 1:8
245+
@inbounds tc[t, bin_idx, tid] += 1
246+
end
247+
end
248+
end
249+
end
250+
251+
# Reduce thread-local slices
252+
sums = zeros(OT, 8, n_bins)
253+
counts = zeros(Int64, 8, n_bins)
254+
for tid in 1:n_threads
255+
for k in 1:n_bins
256+
for t in 1:8
257+
sums[t, k] += ts[t, k, tid]
258+
counts[t, k] += tc[t, k, tid]
259+
end
260+
end
261+
end
262+
263+
return SFC.postprocess_single_pass_results(sums, counts, distance_bins)
264+
end
265+
177266
end

0 commit comments

Comments
 (0)