-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathParallelTestRunner.jl
More file actions
1053 lines (902 loc) · 34.2 KB
/
Copy pathParallelTestRunner.jl
File metadata and controls
1053 lines (902 loc) · 34.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
module ParallelTestRunner
export runtests, addworkers, addworker
using Malt
using Dates
using Printf: @sprintf
using Base.Filesystem: path_separator
using Statistics
using Scratch
using Serialization
import Test
import Random
import IOCapture
import Test: DefaultTestSet
function anynonpass(ts::Test.AbstractTestSet)
@static if VERSION >= v"1.13.0-DEV.1037"
return Test.anynonpass(ts)
else
Test.get_test_counts(ts)
return ts.anynonpass
end
end
#Always set the max rss so that if tests add large global variables (which they do) we don't make the GC's life too hard
if Sys.WORD_SIZE == 64
const JULIA_TEST_MAXRSS_MB = 3800
else
# Assume that we only have 3.5GB available to a single process, and that a single
# test can take up to 2GB of RSS. This means that we should instruct the test
# framework to restart any worker that comes into a test set with 1.5GB of RSS.
const JULIA_TEST_MAXRSS_MB = 1536
end
const max_worker_rss = JULIA_TEST_MAXRSS_MB * 2^20
# parse some command-line arguments
function extract_flag!(args, flag, default = nothing; typ = typeof(default))
for f in args
if startswith(f, flag)
# Check if it's just `--flag` or if it's `--flag=foo`
if f != flag
val = split(f, '=')[2]
if !(typ === Nothing || typ <: AbstractString)
val = parse(typ, val)
end
else
val = default
end
# Drop this value from our args
filter!(x -> x != f, args)
return (true, val)
end
end
return (false, default)
end
function with_testset(f, testset)
@static if VERSION >= v"1.13.0-DEV.1044"
Test.@with_testset testset f()
else
Test.push_testset(testset)
try
f()
finally
Test.pop_testset()
end
end
return nothing
end
if VERSION >= v"1.13.0-DEV.1044"
using Base.ScopedValues
end
abstract type AbstractTestRecord end
struct TestRecord <: AbstractTestRecord
value::DefaultTestSet
output::String # captured stdout/stderr
# stats
time::Float64
bytes::UInt64
gctime::Float64
rss::UInt64
end
function memory_usage(rec::TestRecord)
return rec.rss
end
function Base.getindex(rec::TestRecord)
return rec.value
end
#
# overridable I/O context for pretty-printing
#
struct TestIOContext
stdout::IO
stderr::IO
color::Bool
lock::ReentrantLock
name_align::Int
elapsed_align::Int
gc_align::Int
percent_align::Int
alloc_align::Int
rss_align::Int
end
function test_IOContext(::Type{TestRecord}, stdout::IO, stderr::IO, lock::ReentrantLock, name_align::Int)
elapsed_align = textwidth("Time (s)")
gc_align = textwidth("GC (s)")
percent_align = textwidth("GC %")
alloc_align = textwidth("Alloc (MB)")
rss_align = textwidth("RSS (MB)")
color = get(stdout, :color, false)
return TestIOContext(
stdout, stderr, color, lock, name_align, elapsed_align, gc_align, percent_align,
alloc_align, rss_align
)
end
function print_header(::Type{TestRecord}, ctx::TestIOContext, testgroupheader, workerheader)
lock(ctx.lock)
try
printstyled(ctx.stdout, " "^(ctx.name_align + textwidth(testgroupheader) - 3), " │ ")
printstyled(ctx.stdout, " │ ──────────────── CPU ──────────────── │\n", color = :white)
printstyled(ctx.stdout, testgroupheader, color = :white)
printstyled(ctx.stdout, lpad(workerheader, ctx.name_align - textwidth(testgroupheader) + 1), " │ ", color = :white)
printstyled(ctx.stdout, "Time (s) │ GC (s) │ GC % │ Alloc (MB) │ RSS (MB) │\n", color = :white)
flush(ctx.stdout)
finally
unlock(ctx.lock)
end
end
function print_test_started(::Type{TestRecord}, wrkr, test, ctx::TestIOContext)
lock(ctx.lock)
try
printstyled(ctx.stdout, test, lpad("($wrkr)", ctx.name_align - textwidth(test) + 1, " "), " │", color = :white)
printstyled(
ctx.stdout,
" "^ctx.elapsed_align, "started at $(now())\n", color = :light_black
)
flush(ctx.stdout)
finally
unlock(ctx.lock)
end
end
function print_test_finished(record::TestRecord, wrkr, test, ctx::TestIOContext)
lock(ctx.lock)
try
printstyled(ctx.stdout, test, color = :white)
printstyled(ctx.stdout, lpad("($wrkr)", ctx.name_align - textwidth(test) + 1, " "), " │ ", color = :white)
time_str = @sprintf("%7.2f", record.time)
printstyled(ctx.stdout, lpad(time_str, ctx.elapsed_align, " "), " │ ", color = :white)
gc_str = @sprintf("%5.2f", record.gctime)
printstyled(ctx.stdout, lpad(gc_str, ctx.gc_align, " "), " │ ", color = :white)
percent_str = @sprintf("%4.1f", 100 * record.gctime / record.time)
printstyled(ctx.stdout, lpad(percent_str, ctx.percent_align, " "), " │ ", color = :white)
alloc_str = @sprintf("%5.2f", record.bytes / 2^20)
printstyled(ctx.stdout, lpad(alloc_str, ctx.alloc_align, " "), " │ ", color = :white)
rss_str = @sprintf("%5.2f", record.rss / 2^20)
printstyled(ctx.stdout, lpad(rss_str, ctx.rss_align, " "), " │\n", color = :white)
flush(ctx.stdout)
finally
unlock(ctx.lock)
end
end
function print_test_failed(record::TestRecord, wrkr, test, ctx::TestIOContext)
lock(ctx.lock)
try
printstyled(ctx.stderr, test, color = :red)
printstyled(
ctx.stderr,
lpad("($wrkr)", ctx.name_align - textwidth(test) + 1, " "), " |"
, color = :red
)
time_str = @sprintf("%7.2f", record.time)
printstyled(ctx.stderr, lpad(time_str, ctx.elapsed_align + 1, " "), " │", color = :red)
failed_str = "failed at $(now())\n"
# 11 -> 3 from " | " 3x and 2 for each " " on either side
fail_align = (11 + ctx.gc_align + ctx.percent_align + ctx.alloc_align + ctx.rss_align - textwidth(failed_str)) ÷ 2 + textwidth(failed_str)
failed_str = lpad(failed_str, fail_align, " ")
printstyled(ctx.stderr, failed_str, color = :red)
# TODO: print other stats?
flush(ctx.stderr)
finally
unlock(ctx.lock)
end
end
function print_test_crashed(::Type{TestRecord}, wrkr, test, ctx::TestIOContext)
lock(ctx.lock)
try
printstyled(ctx.stderr, test, color = :red)
printstyled(
ctx.stderr,
lpad("($wrkr)", ctx.name_align - textwidth(test) + 1, " "), " |",
" "^ctx.elapsed_align, " crashed at $(now())\n", color = :red
)
flush(ctx.stderr)
finally
unlock(ctx.lock)
end
end
#
# entry point
#
"""
WorkerTestSet
A test set wrapper used internally by worker processes.
`Base.DefaultTestSet` detects when it is the top-most and throws
a `TestSetException` containing very little information. By inserting this
wrapper as the top-most test set, we can capture the full results.
"""
mutable struct WorkerTestSet <: Test.AbstractTestSet
const name::String
wrapped_ts::Test.DefaultTestSet
function WorkerTestSet(name::AbstractString)
new(name)
end
end
function Test.record(ts::WorkerTestSet, res)
@assert res isa Test.DefaultTestSet
@assert !isdefined(ts, :wrapped_ts)
ts.wrapped_ts = res
return nothing
end
function Test.finish(ts::WorkerTestSet)
# This testset is just a placeholder so it must be the top-most
@assert Test.get_testset_depth() == 0
@assert isdefined(ts, :wrapped_ts)
# Return the wrapped_ts so that we don't need to handle WorkerTestSet anywhere else
return ts.wrapped_ts
end
function runtest(::Type{TestRecord}, f, name, init_code, color)
function inner()
# generate a temporary module to execute the tests in
mod = @eval(Main, module $(gensym(name)) end)
@eval(mod, import ParallelTestRunner: Test, Random)
@eval(mod, using .Test, .Random)
# Both bindings must be imported since `@testset` can't handle fully-qualified names when VERSION < v"1.11.0-DEV.1518".
@eval(mod, import ParallelTestRunner: WorkerTestSet)
@eval(mod, import Test: DefaultTestSet)
Core.eval(mod, init_code)
data = @eval mod begin
GC.gc(true)
Random.seed!(1)
mktemp() do path, io
stats = redirect_stdio(stdout=io, stderr=io) do
# @testset CustomTestRecord switches the all lower-level testset to our custom testset,
# so we need to have two layers here such that the user-defined testsets are using `DefaultTestSet`.
# This also guarantees our invariant about `WorkerTestSet` containing a single `DefaultTestSet`.
@timed @testset WorkerTestSet "placeholder" begin
@testset DefaultTestSet $name begin
$f
end
end
end
close(io)
output = read(path, String)
(; testset=stats.value, output, stats.time, stats.bytes, stats.gctime)
end
end
# process results
rss = Sys.maxrss()
record = TestRecord(data..., rss)
GC.gc(true)
return record
end
@static if VERSION >= v"1.13.0-DEV.1044"
@with Test.TESTSET_PRINT_ENABLE => false begin
inner()
end
else
old_print_setting = Test.TESTSET_PRINT_ENABLE[]
Test.TESTSET_PRINT_ENABLE[] = false
try
inner()
finally
Test.TESTSET_PRINT_ENABLE[] = old_print_setting
end
end
end
@static if Sys.isapple()
mutable struct VmStatistics64
free_count::UInt32
active_count::UInt32
inactive_count::UInt32
wire_count::UInt32
zero_fill_count::UInt64
reactivations::UInt64
pageins::UInt64
pageouts::UInt64
faults::UInt64
cow_faults::UInt64
lookups::UInt64
hits::UInt64
purges::UInt64
purgeable_count::UInt32
speculative_count::UInt32
decompressions::UInt64
compressions::UInt64
swapins::UInt64
swapouts::UInt64
compressor_page_count::UInt32
throttled_count::UInt32
external_page_count::UInt32
internal_page_count::UInt32
total_uncompressed_pages_in_compressor::UInt64
VmStatistics64() = new(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
end
function available_memory()
vms = Ref{VmStatistics64}(VmStatistics64())
mach_host_self = @ccall mach_host_self()::UInt32
count = UInt32(sizeof(VmStatistics64) ÷ sizeof(Int32))
ref_count = Ref(count)
@ccall host_statistics64(mach_host_self::UInt32, 4::Int64, pointer_from_objref(vms[])::Ptr{Int64}, ref_count::Ref{UInt32})::Int64
page_size = Int(@ccall sysconf(29::UInt32)::UInt32)
return (Int(vms[].free_count) + Int(vms[].inactive_count) + Int(vms[].purgeable_count)) * page_size
end
else
available_memory() = Sys.free_memory()
end
# This is an internal function, not to be used by end users. The keyword
# arguments are only for testing purposes.
"""
default_njobs()
Determine default number of parallel jobs.
"""
function default_njobs(; cpu_threads = Sys.CPU_THREADS, free_memory = available_memory())
jobs = cpu_threads
memory_jobs = Int64(free_memory) ÷ (2 * 2^30)
return max(1, min(jobs, memory_jobs))
end
# Historical test duration database
function get_history_file(mod::Module)
scratch_dir = @get_scratch!("durations")
return joinpath(scratch_dir, "v$(VERSION.major).$(VERSION.minor)", "$(nameof(mod)).jls")
end
function load_test_history(mod::Module)
history_file = get_history_file(mod)
if isfile(history_file)
try
return deserialize(history_file)
catch e
@warn "Failed to load test history from $history_file" exception=e
return Dict{String, Float64}()
end
else
return Dict{String, Float64}()
end
end
function save_test_history(mod::Module, history::Dict{String, Float64})
history_file = get_history_file(mod)
try
mkpath(dirname(history_file))
serialize(history_file, history)
catch e
@warn "Failed to save test history to $history_file" exception=e
end
end
function test_exe()
test_exeflags = Base.julia_cmd()
filter!(test_exeflags.exec) do c
!(startswith(c, "--depwarn") || startswith(c, "--check-bounds"))
end
push!(test_exeflags.exec, "--check-bounds=yes")
push!(test_exeflags.exec, "--startup-file=no")
push!(test_exeflags.exec, "--depwarn=yes")
push!(test_exeflags.exec, "--project=$(Base.active_project())")
return test_exeflags
end
# Map PIDs to logical worker IDs
# Malt doesn't have a global worker ID, and PID make printing ugly
const WORKER_IDS = Dict{Int32, Int32}()
worker_id(wrkr) = WORKER_IDS[wrkr.proc_pid]
"""
addworkers(X; kwargs...)
Add `X` worker processes.
"""
addworkers(X; kwargs...) = [addworker(; kwargs...) for _ in 1:X]
function addworker(; env=Vector{Pair{String, String}}())
exe = test_exe()
exeflags = exe[2:end]
push!(env, "JULIA_NUM_THREADS" => "1")
# Malt already sets OPENBLAS_NUM_THREADS to 1
push!(env, "OPENBLAS_NUM_THREADS" => "1")
wrkr = Malt.Worker(;exeflags, env)
WORKER_IDS[wrkr.proc_pid] = length(WORKER_IDS) + 1
return wrkr
end
"""
runtests(mod::Module, ARGS; RecordType = TestRecord,
test_filter = Returns(true),
custom_tests = Dict(),
init_code = :(),
test_worker = Returns(nothing),
stdout = Base.stdout,
stderr = Base.stderr)
Run Julia tests in parallel across multiple worker processes.
## Arguments
- `mod`: The module calling runtests
- `ARGS`: Command line arguments array, typically from `Base.ARGS`. When you run the tests
with `Pkg.test`, this can be changed with the `test_args` keyword argument.
Several keyword arguments are also supported:
- `RecordType`: Type of test record to use for tracking test results (default: `TestRecord`)
- `test_filter`: Optional function to filter which tests to run (default: run all tests)
- `custom_tests`: Optional dictionary of custom tests, mapping test names to expressions.
- `init_code`: Code use to initialize each test's sandbox module (e.g., import auxiliary
packages, define constants, etc).
- `test_worker`: Optional function that takes a test name and returns a specific worker.
When returning `nothing`, the test will be assigned to any available default worker.
- `stdout` and `stderr`: I/O streams to write to (default: `Base.stdout` and `Base.stderr`)
## Command Line Options
- `--help`: Show usage information and exit
- `--list`: List all available test files and exit
- `--verbose`: Print more detailed information during test execution
- `--quickfail`: Stop the entire test run as soon as any test fails
- `--jobs=N`: Use N worker processes (default: based on CPU threads and available memory)
- `TESTS...`: Filter tests by name, matched using `startswith`
## Behavior
- Automatically discovers all `.jl` files in the test directory (excluding `runtests.jl`)
- Sorts tests by file size (largest first) for load balancing
- Launches worker processes with appropriate Julia flags for testing
- Monitors memory usage and recycles workers that exceed memory limits
- Provides real-time progress output with timing and memory statistics
- Handles interruptions gracefully (Ctrl+C)
- Returns nothing, but throws `Test.FallbackTestSetException` if any tests fail
## Examples
```julia
# Run all tests with default settings
runtests(MyModule, ARGS)
# Run only tests matching "integration"
runtests(MyModule, ["integration"])
# Run with custom filter function
runtests(MyModule, ARGS; test_filter = test -> occursin("unit", test))
# Use custom test record type
runtests(MyModule, ARGS; RecordType = MyCustomTestRecord)
```
## Memory Management
Workers are automatically recycled when they exceed memory limits to prevent out-of-memory
issues during long test runs. The memory limit is set based on system architecture.
"""
function runtests(mod::Module, ARGS; test_filter = Returns(true), RecordType = TestRecord,
custom_tests::Dict{String, Expr}=Dict{String, Expr}(), init_code = :(),
test_worker = Returns(nothing), stdout = Base.stdout, stderr = Base.stderr)
#
# set-up
#
do_help, _ = extract_flag!(ARGS, "--help")
if do_help
println(
"""
Usage: runtests.jl [--help] [--list] [--jobs=N] [TESTS...]
--help Show this text.
--list List all available tests.
--verbose Print more information during testing.
--quickfail Fail the entire run as soon as a single test errored.
--jobs=N Launch `N` processes to perform tests.
Remaining arguments filter the tests that will be executed."""
)
exit(0)
end
set_jobs, jobs = extract_flag!(ARGS, "--jobs"; typ = Int)
do_verbose, _ = extract_flag!(ARGS, "--verbose")
do_quickfail, _ = extract_flag!(ARGS, "--quickfail")
do_list, _ = extract_flag!(ARGS, "--list")
## no options should remain
optlike_args = filter(startswith("-"), ARGS)
if !isempty(optlike_args)
error("Unknown test options `$(join(optlike_args, " "))` (try `--help` for usage instructions)")
end
WORKDIR = pwd()
# choose tests
tests = []
test_runners = Dict()
## custom tests by the user
for (name, runner) in custom_tests
push!(tests, name)
test_runners[name] = runner
end
## files in the test folder
for (rootpath, dirs, files) in walkdir(WORKDIR)
# find Julia files
filter!(files) do file
endswith(file, ".jl") && file !== "runtests.jl"
end
isempty(files) && continue
# strip extension
files = map(files) do file
file[1:(end - 3)]
end
# prepend subdir
subdir = relpath(rootpath, WORKDIR)
if subdir != "."
files = map(files) do file
joinpath(subdir, file)
end
end
# unify path separators
files = map(files) do file
replace(file, path_separator => '/')
end
append!(tests, files)
for file in files
test_runners[file] = quote
include($(joinpath(WORKDIR, file * ".jl")))
end
end
end
## finalize
unique!(tests)
Random.shuffle!(tests)
historical_durations = load_test_history(mod)
sort!(tests, by = x -> -get(historical_durations, x, Inf))
# list tests, if requested
if do_list
println(stdout, "Available tests:")
for test in sort(tests)
println(stdout, " - $test")
end
exit(0)
end
# filter tests
if isempty(ARGS)
filter!(test_filter, tests)
else
# let the user filter
filter!(tests) do test
any(arg -> startswith(test, arg), ARGS)
end
end
# determine parallelism
if !set_jobs
jobs = default_njobs()
end
jobs = clamp(jobs, 1, length(tests))
println(stdout, "Running $jobs tests in parallel. If this is too many, specify the `--jobs=N` argument to the tests, or set the `JULIA_CPU_THREADS` environment variable.")
workers = addworkers(min(jobs, length(tests)))
nworkers = length(workers)
t0 = time()
results = []
running_tests = Dict{String, Tuple{Int, Float64}}() # test => (worker, start_time)
test_lock = ReentrantLock() # to protect crucial access to tests and running_tests
done = false
function stop_work()
if !done
done = true
for task in worker_tasks
task == current_task() && continue
Base.istaskdone(task) && continue
try; schedule(task, InterruptException(); error=true); catch; end
end
end
end
#
# output
#
# pretty print information about gc and mem usage
testgroupheader = "Test"
workerheader = "(Worker)"
name_align = maximum(
[
textwidth(testgroupheader) + textwidth(" ") + textwidth(workerheader);
map(x -> textwidth(x) + 5, tests)
]
)
print_lock = stdout isa Base.LibuvStream ? stdout.lock : ReentrantLock()
if stderr isa Base.LibuvStream
stderr.lock = print_lock
end
io_ctx = test_IOContext(RecordType, stdout, stderr, print_lock, name_align)
print_header(RecordType, io_ctx, testgroupheader, workerheader)
status_lines_visible = Ref(0)
function clear_status()
if status_lines_visible[] > 0
for i in 1:status_lines_visible[]-1
print(io_ctx.stdout, "\033[2K") # Clear entire line
print(io_ctx.stdout, "\033[1A") # Move up one line
end
print(io_ctx.stdout, "\r") # Move to start of line
status_lines_visible[] = 0
end
end
function update_status()
# only draw if we have something to show
isempty(running_tests) && return
completed = length(results)
total = completed + length(tests) + length(running_tests)
# line 1: empty line
line1 = ""
# line 2: running tests
test_list = sort(collect(running_tests), by = x -> x[2][2])
status_parts = map(test_list) do (test, (wrkr, _))
"$test ($wrkr)"
end
line2 = "Running: " * join(status_parts, ", ")
## truncate
max_width = displaysize(io_ctx.stdout)[2]
if length(line2) > max_width
line2 = line2[1:max_width-3] * "..."
end
# line 3: progress + ETA
line3 = "Progress: $completed/$total tests completed"
if completed > 0
# estimate per-test time (slightly pessimistic)
durations_done = [end_time - start_time for (_, _, start_time, end_time) in results]
μ = mean(durations_done)
σ = length(durations_done) > 1 ? std(durations_done) : 0.0
est_per_test = μ + 0.5σ
est_remaining = 0.0
## currently-running
for (test, (_, start_time)) in running_tests
elapsed = time() - start_time
duration = get(historical_durations, test, est_per_test)
est_remaining += max(0.0, duration - elapsed)
end
## yet-to-run
for test in tests
est_remaining += get(historical_durations, test, est_per_test)
end
eta_sec = est_remaining / jobs
eta_mins = round(Int, eta_sec / 60)
line3 *= " | ETA: ~$eta_mins min"
end
# only display the status bar on actual terminals
# (but make sure we cover this code in CI)
if io_ctx.stdout isa Base.TTY
clear_status()
println(io_ctx.stdout, line1)
println(io_ctx.stdout, line2)
print(io_ctx.stdout, line3)
flush(io_ctx.stdout)
status_lines_visible[] = 3
end
end
# Message types for the printer channel
# (:started, test_name, worker_id)
# (:finished, test_name, worker_id, record)
# (:crashed, test_name, worker_id, test_time)
printer_channel = Channel{Tuple}(100)
printer_task = @async begin
last_status_update = Ref(time())
try
while isopen(printer_channel) || isready(printer_channel)
got_message = false
while isready(printer_channel)
# Try to get a message from the channel (with timeout)
msg = take!(printer_channel)
got_message = true
msg_type = msg[1]
if msg_type == :started
test_name, wrkr = msg[2], msg[3]
# Optionally print verbose started message
if do_verbose
clear_status()
print_test_started(RecordType, wrkr, test_name, io_ctx)
end
elseif msg_type == :finished
test_name, wrkr, record = msg[2], msg[3], msg[4]
clear_status()
if anynonpass(record[])
print_test_failed(record, wrkr, test_name, io_ctx)
else
print_test_finished(record, wrkr, test_name, io_ctx)
end
elseif msg_type == :crashed
test_name, wrkr = msg[2], msg[3]
clear_status()
print_test_crashed(RecordType, wrkr, test_name, io_ctx)
end
end
# After a while, display a status line
if !done && time() - t0 >= 5 && (got_message || (time() - last_status_update[] >= 1))
update_status()
last_status_update[] = time()
end
isopen(printer_channel) && sleep(0.1)
end
catch ex
if isa(ex, InterruptException)
# the printer should keep on running,
# but we need to signal other tasks to stop
stop_work()
else
rethrow()
end
isa(ex, InterruptException) || rethrow()
finally
if isempty(tests) && isempty(running_tests)
# XXX: only erase the status if we completed successfully.
# in other cases we'll have printed "caught interrupt"
clear_status()
end
end
end
#
# execution
#
worker_tasks = Task[]
for p in workers
push!(worker_tasks, @async begin
while !done
# if a worker failed, spawn a new one
if !Malt.isrunning(p)
p = addworker()
end
# get a test to run
test, wrkr, test_t0 = Base.@lock test_lock begin
isempty(tests) && break
test = popfirst!(tests)
wrkr = something(test_worker(test), p)
test_t0 = time()
running_tests[test] = (worker_id(wrkr), test_t0)
test, wrkr, test_t0
end
# run the test
put!(printer_channel, (:started, test, worker_id(wrkr)))
result = try
Malt.remote_eval_wait(Main, wrkr, :(import ParallelTestRunner))
Malt.remote_call_fetch(invokelatest, wrkr, runtest, RecordType, test_runners[test], test,
init_code, io_ctx.color)
catch ex
if isa(ex, InterruptException)
# the worker got interrupted, signal other tasks to stop
stop_work()
break
end
ex
end
test_t1 = time()
push!(results, (test, result, test_t0, test_t1))
# act on the results
if result isa AbstractTestRecord
@assert result isa RecordType
put!(printer_channel, (:finished, test, worker_id(wrkr), result))
if memory_usage(result) > max_worker_rss
# the worker has reached the max-rss limit, recycle it
# so future tests start with a smaller working set
Malt.stop(wrkr)
end
else
# One of Malt.TerminatedWorkerException, Malt.RemoteException, or ErrorException
@assert result isa Exception
put!(printer_channel, (:crashed, test, worker_id(wrkr)))
if do_quickfail
stop_work()
end
# the worker encountered some serious failure, recycle it
Malt.stop(wrkr)
end
# get rid of the custom worker
if wrkr != p
Malt.stop(wrkr)
end
delete!(running_tests, test)
end
end)
end
#
# finalization
#
# monitor worker tasks for failure so that each one doesn't need a try/catch + stop_work()
try
while true
if any(istaskfailed, worker_tasks)
println(io_ctx.stderr, "\nCaught an error, stopping...")
break
elseif done || Base.@lock(test_lock, isempty(tests) && isempty(running_tests))
break
end
sleep(1)
end
catch err
# in case the sleep got interrupted
isa(err, InterruptException) || rethrow()
finally
stop_work()
end
# wait for the printer to finish so that all results have been printed
close(printer_channel)
wait(printer_task)
# wait for worker tasks to catch unhandled exceptions
for task in worker_tasks
try
wait(task)
catch err
# unwrap TaskFailedException
while isa(err, TaskFailedException)
err = current_exceptions(err.task)[1].exception
end
isa(err, InterruptException) || rethrow()
end
end
@async rmprocs(; waitfor=0)
# print the output generated by each testset
for (testname, result, start, stop) in results
if isa(result, AbstractTestRecord) && !isempty(result.output)
println(io_ctx.stdout, "\nOutput generated during execution of '$testname':")
lines = collect(eachline(IOBuffer(result.output)))
for (i,line) in enumerate(lines)
prefix = if length(lines) == 1
"["
elseif i == 1
"┌"
elseif i == length(lines)
"└"
else
"│"
end
println(io_ctx.stdout, prefix, " ", line)
end
end
end
# process test results and convert into a testset
function create_testset(name; start=nothing, stop=nothing, kwargs...)
if start === nothing
testset = Test.DefaultTestSet(name; kwargs...)
elseif VERSION >= v"1.13.0-DEV.1297"
testset = Test.DefaultTestSet(name; time_start=start, kwargs...)
elseif VERSION < v"1.13.0-DEV.1037"
testset = Test.DefaultTestSet(name; kwargs...)
testset.time_start = start
else
# no way to set time_start retroactively
testset = Test.DefaultTestSet(name; kwargs...)
end
if stop !== nothing
if VERSION < v"1.13.0-DEV.1037"
testset.time_end = stop
elseif VERSION >= v"1.13.0-DEV.1297"
@atomic testset.time_end = stop
else
# if we can't set the start time, also don't set a stop one
# to avoid negative timings
end
end
return testset
end
t1 = time()
o_ts = create_testset("Overall"; start=t0, stop=t1, verbose=do_verbose)
function collect_results()
with_testset(o_ts) do
completed_tests = Set{String}()
for (testname, result, start, stop) in results
push!(completed_tests, testname)
if result isa AbstractTestRecord
testset = result[]::DefaultTestSet
historical_durations[testname] = stop - start
else
# If this test raised an exception that means the test runner itself had some problem,
# so we may have hit a segfault, deserialization errors or something similar.
# Record this testset as Errored.
# One of Malt.TerminatedWorkerException, Malt.RemoteException, or ErrorException
@assert result isa Exception
testset = create_testset(testname; start, stop)
Test.record(testset, Test.Error(:nontest_error, testname, nothing, Base.ExceptionStack(NamedTuple[(;exception = result, backtrace = [])]), LineNumberNode(1)))
end
with_testset(testset) do
Test.record(o_ts, testset)
end
end
# mark remaining or running tests as interrupted
for test in [tests; collect(keys(running_tests))]