Skip to content

Commit bf82bf3

Browse files
authored
Fix a few type issues in multithreaded parsing (#944)
* Fix a few type issues in multithreaded parsing. Fixes #939.

  There ended up being a couple of issues here. In the originally posted issue, the "hang" was just due to a very slow code path where our `nonstandardtype` call was not correctly ignoring `NeedsTypeDetection`, which is obviously the most common type for columns when no column types are provided. The "hang" was due to the 60_000 columns all calling `nonstandardtype`, then `tupcat`, which is a pretty expensive dynamic tuple-type creation call. Fixing `nonstandardtype` to correctly ignore `NeedsTypeDetection` speeds up the case dramatically and avoids the `tupcat` call.

  The other issue, found once that "hang" was avoided, was that the new multithreaded detection code was ignoring the `typemap` keyword argument. This also fixes that case by piping `typemap` through to the `findrowstarts!` family of functions. Note that the new `downcast=true` keyword argument can be used instead of the original use of `typemap`.

* Fix 32-bit.
1 parent b747df8 commit bf82bf3

4 files changed

Lines changed: 15 additions & 6 deletions

File tree

src/context.jl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -614,7 +614,7 @@ end
614614
limit = Int(limit)
615615
limitposguess = ceil(Int, (limit / (origrowsguess * 0.8)) * len)
616616
newlen = [0, limitposguess, min(limitposguess * 2, len)]
617-
findrowstarts!(buf, options, newlen, ncols, columns, stringtype, downcast, 5)
617+
findrowstarts!(buf, options, newlen, ncols, columns, stringtype, typemap, downcast, 5)
618618
len = newlen[2] - 1
619619
origrowsguess = limit
620620
debug && println("limiting, adjusting len to $len")
@@ -625,7 +625,7 @@ end
625625
chunkpositions[i + 1] = i == 0 ? datapos : i == ntasks ? len : (datapos + chunksize * i)
626626
end
627627
debug && println("initial byte positions before adjusting for start of rows: $chunkpositions")
628-
avgbytesperrow, successfullychunked = findrowstarts!(buf, options, chunkpositions, ncols, columns, stringtype, downcast, rows_to_check)
628+
avgbytesperrow, successfullychunked = findrowstarts!(buf, options, chunkpositions, ncols, columns, stringtype, typemap, downcast, rows_to_check)
629629
if successfullychunked
630630
origbytesperrow = ((len - datapos) / origrowsguess)
631631
weightedavgbytesperrow = ceil(Int, avgbytesperrow * ((ntasks - 1) / ntasks) + origbytesperrow * (1 / ntasks))

src/detection.jl

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -336,7 +336,7 @@ ColumnProperties(T) = ColumnProperties(T, 0x00)
336336
end
337337
end
338338

339-
function findchunkrowstart(ranges, i, buf, opts, downcast, ncols, rows_to_check, columns, origcoltypes, columnlock, @nospecialize(stringtype), totalbytes, totalrows, succeeded)
339+
function findchunkrowstart(ranges, i, buf, opts, typemap, downcast, ncols, rows_to_check, columns, origcoltypes, columnlock, @nospecialize(stringtype), totalbytes, totalrows, succeeded)
340340
pos = ranges[i]
341341
len = ranges[i + 1]
342342
while pos <= len
@@ -405,7 +405,7 @@ function findchunkrowstart(ranges, i, buf, opts, downcast, ncols, rows_to_check,
405405
if type === stringtype
406406
type = pickstringtype(stringtype, cp.maxstringsize)
407407
end
408-
col.type = type
408+
col.type = get(typemap, type, type)
409409
end
410410
end
411411
end
@@ -458,7 +458,7 @@ end
458458
# right # of expected columns then we move on to the next file chunk byte position. If we fail, we start over
459459
# at the byte position, assuming we were in a quoted field (and encountered a newline inside the quoted
460460
# field the first time through)
461-
function findrowstarts!(buf, opts, ranges, ncols, columns, @nospecialize(stringtype), downcast, rows_to_check=5)
461+
function findrowstarts!(buf, opts, ranges, ncols, columns, @nospecialize(stringtype), typemap, downcast, rows_to_check=5)
462462
totalbytes = Threads.Atomic{Int}(0)
463463
totalrows = Threads.Atomic{Int}(0)
464464
succeeded = Threads.Atomic{Bool}(true)
@@ -467,7 +467,7 @@ function findrowstarts!(buf, opts, ranges, ncols, columns, @nospecialize(stringt
467467
origcoltypes = Type[col.type for col in columns]
468468
@sync for i = 2:(length(ranges) - 1)
469469
Threads.@spawn begin
470-
findchunkrowstart(ranges, i, buf, opts, downcast, ncols, rows_to_check, columns, origcoltypes, lock, stringtype, totalbytes, totalrows, succeeded)
470+
findchunkrowstart(ranges, i, buf, opts, typemap, downcast, ncols, rows_to_check, columns, origcoltypes, lock, stringtype, totalbytes, totalrows, succeeded)
471471
end
472472
end
473473
return totalbytes[] / totalrows[], succeeded[]

src/utils.jl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ end
7272
@inline function nonstandardtype(T)
7373
T = nonmissingtype(T)
7474
if T === Union{} ||
75+
T === NeedsTypeDetection ||
7576
T isa StringTypes ||
7677
isinttype(T) ||
7778
T === Float16 ||

test/basics.jl

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -735,4 +735,12 @@ f = CSV.File(IOBuffer(data); select=[2], type=Int32)
735735
@test length(f) == 2
736736
@test length(f.names) == 1
737737

738+
# 939
739+
row = join((i == 1 ? string(i + 10000000000) : i == 60_000 ? "0\n" : rand(("-1", "0", "1")) for i = 1:60_000), " ")
740+
data = repeat(row, 271);
741+
f = CSV.File(IOBuffer(data); header=false, types=Dict(1 => String), typemap=Dict(Int => Int8));
742+
@test f.types == [i == 1 ? String : Int8 for i = 1:60_000]
743+
f = CSV.File(IOBuffer(data); header=false, types=Dict(1 => String), downcast=true);
744+
@test f.types == [i == 1 ? String : Int8 for i = 1:60_000]
745+
738746
end

0 commit comments

Comments
 (0)