1- import Base: == , fetch
1+ import Base: == , fetch, length, isempty, size
22
33export DArray, DVector, DMatrix, DVecOrMat, Blocks, AutoBlocks
44export distribute
@@ -83,7 +83,8 @@ isempty(a::ArrayDomain) = length(a) == 0
8383The domain of an array is an ArrayDomain.
8484"""
8585domain (x:: AbstractArray ) = ArrayDomain ([1 : l for l in size (x)])
86-
86+ # Scalar / non-array values (e.g. for Chunk of immediate data)
87+ domain (x:: Any ) = ArrayDomain (())
8788
8889abstract type ArrayOp{T, N} <: AbstractArray{T, N} end
8990Base. IndexStyle (:: Type{<:ArrayOp} ) = IndexCartesian ()
@@ -176,46 +177,28 @@ domainchunks(d::DArray) = d.subdomains
176177size (x:: DArray ) = size (domain (x))
177178stage (ctx, c:: DArray ) = c
178179
179- @warn " Dispatch uniform on acceleration" maxlog= 1
180- @warn " Take D.concat into account" maxlog= 1
181- function Base. collect (D:: DArray{T,N} ; tree= false , copyto= false , uniform:: Bool = true ) where {T,N}
182- if isempty (D. chunks)
183- return Array {eltype(D)} (undef, size (D)... )
180+ function Base. collect (d:: DArray{T,N} ; tree= false , copyto= false ) where {T,N}
181+ a = fetch (d)
182+ if isempty (d. chunks)
183+ return Array {eltype(d)} (undef, size (d)... )
184184 end
185185
186- # Return a scalar, as required by Julia's array interface
187- if ndims (D) == 0
188- return fetch (D. chunks[1 ]; unwrap= true )
186+ if ndims (d) == 0
187+ return fetch (a. chunks[1 ])
189188 end
190189
191- if uniform
192- @assert D. concat === cat " FIXME: Handle non-cat"
193- A = Array {eltype(D)} (undef, size (D)... )
194- DA = view (A, D. partitioning; space= CPURAMMemorySpace ())
195-
196- # Perform the equivalent of `copyto!(DA, D)`, but force local updates
197- # FIXME : Be more parallel?
198- for idx in eachindex (DA. chunks)
199- dest = fetch (DA. chunks[idx]; move_value= false , unwrap= true , uniform= true ):: AbstractArray
200- src = fetch (D. chunks[idx]; move_value= true , unwrap= true , uniform= true ):: AbstractArray
201- copyto! (dest, src)
202- end
190+ if copyto
191+ C = Array {T,N} (undef, size (a))
192+ DC = view (C, Blocks (size (a)... ))
193+ copyto! (DC, a)
194+ return C
195+ end
203196
204- return A
197+ dimcatfuncs = [(x... ) -> d. concat (x... , dims= i) for i in 1 : ndims (d)]
198+ if tree
199+ collect (fetch (treereduce_nd (map (x -> ((args... ,) -> Dagger. @spawn x (args... )) , dimcatfuncs), a. chunks)))
205200 else
206- if copyto
207- C = Array {T,N} (undef, size (D))
208- DC = view (C, Blocks (size (D)... ))
209- copyto! (DC, D)
210- return C
211- end
212-
213- dimcatfuncs = [(x... ) -> D. concat (x... , dims= i) for i in 1 : ndims (D)]
214- if tree
215- collect (fetch (treereduce_nd (map (x -> ((args... ,) -> Dagger. @spawn x (args... )) , dimcatfuncs), D. chunks)))
216- else
217- treereduce_nd (dimcatfuncs, asyncmap (fetch, D. chunks))
218- end
201+ collect (treereduce_nd (dimcatfuncs, asyncmap (fetch, a. chunks)))
219202 end
220203end
221204Array {T,N} (A:: DArray{S,N} ) where {T,N,S} = convert (Array{T,N}, collect (A))
@@ -339,8 +322,8 @@ function Base.isequal(x::ArrayOp, y::ArrayOp)
339322 x === y
340323end
341324
342- Base. similar (:: DArray{T,N} where T, :: Type{S} , dims:: Dims{N} ) where {S,N} =
343- DArray {S,N} (undef, dims)
325+ Base. similar (D :: DArray{T,N} where T, :: Type{S} , dims:: Dims{N} ) where {S,N} =
326+ DArray {S,N} (undef, D . partitioning, dims)
344327
345328Base. copy (x:: DArray{T,N,B,F} ) where {T,N,B,F} =
346329 map (identity, x):: DArray{T,N,B,F}
@@ -406,18 +389,23 @@ function lookup_parts(A::DArray, ps::AbstractArray, subdmns::DomainBlocks{N}, d:
406389end
407390
408391"""
409- Base.fetch(A ::DArray; unwrap::Bool=false, kwargs...) -> DArray
392+ Base.fetch(c ::DArray)
410393
411- Returns a new `DArray` with the same data as `A`, but where all values are
412- fully computed.
394+ If a `DArray` tree has a `Thunk` in it, make the whole thing a big thunk.
413395"""
414- function Base. fetch (A:: DArray{T} ; unwrap:: Bool = false , kwargs... ) where T
415- if any (unwrappable, chunks (A))
416- tasks = map (t-> unwrappable (t) ? fetch (t; unwrap, kwargs... ) : t, chunks (A))
417- B = DArray (T, A. domain, A. subdomains, tasks, A. partitioning, A. concat)
418- return B
396+ function Base. fetch (c:: DArray{T} ) where T
397+ if any (istask, chunks (c))
398+ thunks = chunks (c)
399+ sz = size (thunks)
400+ dmn = domain (c)
401+ dmnchunks = domainchunks (c)
402+ return fetch (Dagger. spawn (Options (meta= true ), thunks... ) do results...
403+ t = eltype (fetch (results[1 ]))
404+ DArray (t, dmn, dmnchunks, reshape (Any[results... ], sz),
405+ c. partitioning, c. concat)
406+ end )
419407 else
420- return A
408+ return c
421409 end
422410end
423411
@@ -518,6 +506,7 @@ auto_blocks(A::AbstractArray{T,N}) where {T,N} = auto_blocks(size(A))
518506
519507const AssignmentType{N} = Union{Symbol, AbstractArray{<: Int , N}, AbstractArray{<: Processor , N}}
520508
509+ distribute (A:: AbstractArray , assignment:: AssignmentType = :arbitrary ) = distribute (A, AutoBlocks (), assignment)
521510function distribute (A:: AbstractArray{T,N} , dist:: Blocks{N} , assignment:: AssignmentType{N} = :arbitrary ) where {T,N}
522511 procgrid = nothing
523512 availprocs = collect (Dagger. compatible_processors ())
@@ -558,10 +547,8 @@ function distribute(A::AbstractArray{T,N}, dist::Blocks{N}, assignment::Assignme
558547 procgrid = assignment
559548 end
560549
561- return _distribute ( current_acceleration () , A, dist, procgrid)
550+ return _to_darray ( Distribute (dist , A, procgrid) )
562551end
563- _distribute (:: DistributedAcceleration , A:: AbstractArray{T,N} , dist:: Blocks{N} , procgrid) where {T,N} =
564- _to_darray (Distribute (dist, A, procgrid))
565552
566553distribute (A:: AbstractArray , :: AutoBlocks , assignment:: AssignmentType = :arbitrary ) = distribute (A, auto_blocks (A), assignment)
567554function distribute (x:: AbstractArray{T,N} , n:: NTuple{N} , assignment:: AssignmentType{N} = :arbitrary ) where {T,N}
@@ -570,6 +557,7 @@ function distribute(x::AbstractArray{T,N}, n::NTuple{N}, assignment::AssignmentT
570557end
571558distribute (x:: AbstractVector , n:: Int , assignment:: AssignmentType{1} = :arbitrary ) = distribute (x, (n,), assignment)
572559
560+
573561DVector (A:: AbstractVector{T} , part:: Blocks{1} , assignment:: AssignmentType{1} = :arbitrary ) where T = distribute (A, part, assignment)
574562DMatrix (A:: AbstractMatrix{T} , part:: Blocks{2} , assignment:: AssignmentType{2} = :arbitrary ) where T = distribute (A, part, assignment)
575563DArray (A:: AbstractArray{T,N} , part:: Blocks{N} , assignment:: AssignmentType{N} = :arbitrary ) where {T,N} = distribute (A, part, assignment)
@@ -582,26 +570,29 @@ DVector(A::AbstractVector{T}, ::AutoBlocks, assignment::AssignmentType{1} = :arb
582570DMatrix (A:: AbstractMatrix{T} , :: AutoBlocks , assignment:: AssignmentType{2} = :arbitrary ) where T = DMatrix (A, auto_blocks (A), assignment)
583571DArray (A:: AbstractArray , :: AutoBlocks , assignment:: AssignmentType = :arbitrary ) = DArray (A, auto_blocks (A), assignment)
584572
585- @warn " Add assignment to undef initializer" maxlog= 1
586- function DArray {T,N} (:: UndefInitializer , dims:: NTuple{N,Int} ) where {T,N}
587- dist = auto_blocks (dims)
588- return DArray {T,N} (undef, dist, dims... )
589- end
590- function DArray {T,N} (:: UndefInitializer , dist:: Blocks{N} , dims:: NTuple{N,Int} ) where {T,N}
591- domain = ArrayDomain (ntuple (i-> 1 : dims[i], N))
573+ struct AllocateUndef{S} end
574+ (:: AllocateUndef{S} )(T, dims:: Dims{N} ) where {S,N} = Array {S,N} (undef, dims)
575+ function DArray {T,N} (:: UndefInitializer , dist:: Blocks{N} , dims:: NTuple{N,Int} ; assignment:: AssignmentType{N} = :arbitrary ) where {T,N}
576+ domain = ArrayDomain (map (x-> 1 : x, dims))
592577 subdomains = partition (dist, domain)
593- tasks = Array {DTask,N} (undef, size (subdomains)... )
594- Dagger. spawn_datadeps () do
595- for (i, x) in enumerate (subdomains)
596- tasks[i] = Dagger. @spawn allocate_array_undef (T, size (x))
597- end
598- end
599- return DArray (T, domain, subdomains, tasks, dist)
600- end
601- DArray {T,N} (:: UndefInitializer , dims:: Vararg{Int,N} ) where {T,N} =
602- DArray {T,N} (undef, auto_blocks ((dims... ,)), (dims... ,))
603- DArray {T,N} (:: UndefInitializer , dist:: Blocks{N} , dims:: Vararg{Int,N} ) where {T,N} =
604- DArray {T,N} (undef, dist, (dims... ,))
578+ a = AllocateArray (T, AllocateUndef {T} (), false , domain, subdomains, dist, assignment)
579+ return _to_darray (a)
580+ end
581+ DArray {T,N} (:: UndefInitializer , dist:: Blocks{N} , dims:: Vararg{Int,N} ; assignment:: AssignmentType{N} = :arbitrary ) where {T,N} =
582+ DArray {T,N} (undef, dist, (dims... ,); assignment)
583+ DArray {T,N} (:: UndefInitializer , dims:: NTuple{N,Int} ; assignment:: AssignmentType{N} = :arbitrary ) where {T,N} =
584+ DArray {T,N} (undef, auto_blocks (dims), dims; assignment)
585+ DArray {T,N} (:: UndefInitializer , dims:: Vararg{Int,N} ; assignment:: AssignmentType{N} = :arbitrary ) where {T,N} =
586+ DArray {T,N} (undef, auto_blocks ((dims... ,)), (dims... ,); assignment)
587+
588+ DArray {T} (:: UndefInitializer , dist:: Blocks{N} , dims:: NTuple{N,Int} ; assignment:: AssignmentType{N} = :arbitrary ) where {T,N} =
589+ DArray {T,N} (undef, dist, dims; assignment)
590+ DArray {T} (:: UndefInitializer , dist:: Blocks{N} , dims:: Vararg{Int,N} ; assignment:: AssignmentType{N} = :arbitrary ) where {T,N} =
591+ DArray {T,N} (undef, dist, (dims... ,); assignment)
592+ DArray {T} (:: UndefInitializer , dims:: NTuple{N,Int} ; assignment:: AssignmentType{N} = :arbitrary ) where {T,N} =
593+ DArray {T,N} (undef, auto_blocks (dims), dims; assignment)
594+ DArray {T} (:: UndefInitializer , dims:: Vararg{Int,N} ; assignment:: AssignmentType{N} = :arbitrary ) where {T,N} =
595+ DArray {T,N} (undef, auto_blocks ((dims... ,)), (dims... ,); assignment)
605596
606597function Base.:(== )(x:: ArrayOp{T,N} , y:: AbstractArray{S,N} ) where {T,S,N}
607598 collect (x) == y
622613mapchunk (f, chunk) = tochunk (f (poolget (chunk. handle)))
623614function mapchunks (f, d:: DArray{T,N,F} ) where {T,N,F}
624615 chunks = map (d. chunks) do chunk
625- owner = get_parent (chunk. processor). pid
616+ owner = root_worker_id (chunk. processor)
626617 remotecall_fetch (mapchunk, owner, f, chunk)
627618 end
628619 DArray {T,N,F} (d. domain, d. subdomains, chunks, d. concat)
0 commit comments