Skip to content

Commit d85dee9

Browse files
authored
Track threads & futures spawned in integration tests to make sure logs are in chronological order (#4293)
1 parent 1ef4ff7 commit d85dee9

5 files changed

Lines changed: 236 additions & 42 deletions

File tree

modules/integration/src/test/scala/scala/cli/integration/BspSuite.scala

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -126,20 +126,24 @@ trait BspSuite { this: ScalaCliSuite =>
126126
val stdErrPathOpt: Option[os.ProcessOutput] = stdErrOpt.map(path => root / path)
127127
val stderr: os.ProcessOutput = stdErrPathOpt.getOrElse(os.Inherit)
128128

129-
val proc = os.proc(TestUtil.cli, "bsp", bspOptions ++ extraOptionsOverride, args)
130-
.spawn(cwd = root, stderr = stderr, env = bspEnvs)
129+
val proc = trackSubprocess(
130+
os.proc(TestUtil.cli, "bsp", bspOptions ++ extraOptionsOverride, args)
131+
.spawn(cwd = root, stderr = stderr, env = bspEnvs)
132+
)
131133
var remoteServer
132134
: b.BuildServer & b.ScalaBuildServer & b.JavaBuildServer & b.JvmBuildServer &
133135
TestBspClient.WrappedSourcesBuildServer = null
134136

135137
val bspServerExited = Promise[Unit]()
136-
val t = new Thread("bsp-server-watcher") {
137-
setDaemon(true)
138-
override def run() = {
139-
proc.join()
140-
bspServerExited.success(())
138+
val t = trackThread(
139+
new Thread("bsp-server-watcher") {
140+
setDaemon(true)
141+
override def run() = {
142+
proc.join()
143+
bspServerExited.success(())
144+
}
141145
}
142-
}
146+
)
143147
t.start()
144148

145149
def whileBspServerIsRunning[T](f: Future[T]): Future[T] = {
@@ -153,8 +157,9 @@ trait BspSuite { this: ScalaCliSuite =>
153157
}
154158

155159
try {
156-
val (localClient, remoteServer0, _) =
160+
val (localClient, remoteServer0, listeningFuture) =
157161
TestBspClient.connect(proc.stdout, proc.stdin, pool)
162+
trackFuture(listeningFuture)
158163
remoteServer = remoteServer0
159164
val initRes: b.InitializeBuildResult = Await.result(
160165
whileBspServerIsRunning(
@@ -179,6 +184,7 @@ trait BspSuite { this: ScalaCliSuite =>
179184
proc.destroy()
180185
proc.join(2.seconds.toMillis)
181186
proc.destroy(shutdownGracePeriod = 0)
187+
proc.join()
182188
}
183189
}
184190

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package scala.cli.integration
2+
3+
import java.util.concurrent.{ConcurrentLinkedQueue, TimeUnit}
4+
5+
import scala.concurrent.duration.FiniteDuration
6+
import scala.concurrent.{Await, Future}
7+
import scala.util.control.NonFatal
8+
9+
private[integration] class ResourceTracker {
10+
private val subprocesses = new ConcurrentLinkedQueue[os.SubProcess]
11+
private val threads = new ConcurrentLinkedQueue[Thread]
12+
private val futures = new ConcurrentLinkedQueue[Future[?]]
13+
protected val drainTimeout: FiniteDuration = FiniteDuration(5, TimeUnit.SECONDS)
14+
private val threadJoinTimeoutMillis = drainTimeout.toMillis
15+
16+
def trackSubprocess[P <: os.SubProcess](proc: P): P = {
17+
subprocesses.add(proc)
18+
proc
19+
}
20+
21+
def trackThread[T <: Thread](thread: T): T = {
22+
threads.add(thread)
23+
thread
24+
}
25+
26+
def trackFuture[F <: Future[?]](future: F): F = {
27+
futures.add(future)
28+
future
29+
}
30+
31+
def clear(): Unit = {
32+
subprocesses.clear()
33+
threads.clear()
34+
futures.clear()
35+
}
36+
37+
def drain(): Unit = {
38+
drainAll(subprocesses, drainSubprocess)
39+
drainAll(threads, drainThread)
40+
drainAll(futures, drainFuture)
41+
}
42+
43+
private def drainAll[A](queue: ConcurrentLinkedQueue[A], drain: A => Unit): Unit = {
44+
var next = queue.poll()
45+
while next != null do
46+
drain(next)
47+
next = queue.poll()
48+
}
49+
50+
protected def drainSubprocess(proc: os.SubProcess): Unit =
51+
try
52+
if proc.isAlive() then
53+
proc.destroy()
54+
proc.join(drainTimeout.toMillis)
55+
if proc.isAlive() then
56+
proc.destroy(shutdownGracePeriod = 0)
57+
proc.join(drainTimeout.toMillis)
58+
catch case NonFatal(_) => ()
59+
60+
protected def drainThread(thread: Thread): Unit =
61+
try thread.join(threadJoinTimeoutMillis)
62+
catch case NonFatal(_) => ()
63+
64+
protected def drainFuture(future: Future[?]): Unit =
65+
try Await.ready(future, drainTimeout)
66+
catch case NonFatal(_) => ()
67+
}
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
package scala.cli.integration
2+
3+
import com.eed3si9n.expecty.Expecty.expect
4+
5+
import java.util.concurrent.ConcurrentLinkedQueue
6+
7+
import scala.concurrent.{Future, Promise}
8+
import scala.jdk.CollectionConverters.*
9+
import scala.util.Properties
10+
11+
class ResourceTrackerTests extends munit.FunSuite {
12+
13+
private final class RecordingResourceTracker(events: ConcurrentLinkedQueue[String])
14+
extends ResourceTracker {
15+
override protected def drainSubprocess(proc: os.SubProcess): Unit =
16+
events.add("subprocess")
17+
18+
override protected def drainThread(thread: Thread): Unit =
19+
events.add("thread")
20+
21+
override protected def drainFuture(future: Future[?]): Unit =
22+
events.add("future")
23+
}
24+
25+
private final class NoOpResourceTracker extends ResourceTracker {
26+
override protected def drainSubprocess(proc: os.SubProcess): Unit = ()
27+
28+
override protected def drainThread(thread: Thread): Unit = ()
29+
30+
override protected def drainFuture(future: Future[?]): Unit = ()
31+
}
32+
33+
private def eventsToList(events: ConcurrentLinkedQueue[String]): List[String] =
34+
events.iterator().asScala.toList
35+
36+
private def shortLivedSubprocess(): os.SubProcess =
37+
if Properties.isWin then os.proc("cmd", "/c", "exit", "0").spawn()
38+
else os.proc("true").spawn()
39+
40+
test("drain order is subprocess -> thread -> future") {
41+
val events = new ConcurrentLinkedQueue[String]
42+
val tracker = new RecordingResourceTracker(events)
43+
44+
val future1 = Promise[Unit]().future
45+
val future2 = Promise[Unit]().future
46+
val thread = new Thread(() => ())
47+
val proc1 = shortLivedSubprocess()
48+
val proc2 = shortLivedSubprocess()
49+
50+
tracker.trackFuture(future1)
51+
tracker.trackSubprocess(proc1)
52+
tracker.trackThread(thread)
53+
tracker.trackFuture(future2)
54+
tracker.trackSubprocess(proc2)
55+
56+
tracker.drain()
57+
58+
expect(
59+
eventsToList(events) == List("subprocess", "subprocess", "thread", "future", "future")
60+
)
61+
}
62+
63+
test("drain consumes tracked resources (idempotent on empty)") {
64+
val events = new ConcurrentLinkedQueue[String]
65+
val tracker = new RecordingResourceTracker(events)
66+
67+
tracker.trackFuture(Promise[Unit]().future)
68+
tracker.trackSubprocess(shortLivedSubprocess())
69+
tracker.drain()
70+
val afterFirstDrain = eventsToList(events)
71+
72+
tracker.drain()
73+
74+
expect(eventsToList(events) == afterFirstDrain)
75+
}
76+
77+
test("clear discards without invoking drain hooks") {
78+
val events = new ConcurrentLinkedQueue[String]
79+
val tracker = new RecordingResourceTracker(events)
80+
81+
tracker.trackFuture(Promise[Unit]().future)
82+
tracker.trackSubprocess(shortLivedSubprocess())
83+
tracker.trackThread(new Thread(() => ()))
84+
tracker.clear()
85+
tracker.drain()
86+
87+
expect(eventsToList(events) == Nil)
88+
}
89+
90+
test("track* returns the tracked instance") {
91+
val tracker = new NoOpResourceTracker
92+
val thread = new Thread(() => ())
93+
val future = Promise[Unit]().future
94+
95+
expect(tracker.trackThread(thread) == thread)
96+
expect(tracker.trackFuture(future) == future)
97+
}
98+
}

modules/integration/src/test/scala/scala/cli/integration/ScalaCliSuite.scala

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,24 @@ package scala.cli.integration
22

33
import java.util.concurrent.TimeUnit
44

5+
import scala.concurrent.Future
56
import scala.concurrent.duration.{Duration, FiniteDuration}
67
import scala.util.Properties
78

89
abstract class ScalaCliSuite extends munit.FunSuite {
10+
given scalaCliSuite: ScalaCliSuite = this
11+
12+
private val resourceTracker = new ResourceTracker
13+
14+
private[integration] def trackSubprocess[P <: os.SubProcess](proc: P): P =
15+
resourceTracker.trackSubprocess(proc)
16+
17+
private[integration] def trackThread[T <: Thread](thread: T): T =
18+
resourceTracker.trackThread(thread)
19+
20+
private[integration] def trackFuture[F <: Future[?]](future: F): F =
21+
resourceTracker.trackFuture(future)
22+
923
implicit class BeforeEachOpts(munitContext: BeforeEach) {
1024
def locationAbsolutePath: os.Path = os.Path(munitContext.test.location.path)
1125
}
@@ -17,13 +31,17 @@ abstract class ScalaCliSuite extends munit.FunSuite {
1731
def apply(): Unit = ()
1832

1933
override def beforeEach(context: BeforeEach): Unit = {
34+
resourceTracker.clear()
2035
val fileName = context.locationAbsolutePath.baseName
2136
System.err.println(
2237
s">==== ${Console.CYAN}Running '${context.test.name}' from $fileName${Console.RESET}"
2338
)
2439
}
2540

2641
override def afterEach(context: AfterEach): Unit = {
42+
resourceTracker.drain()
43+
System.out.flush()
44+
System.err.flush()
2745
val fileName = context.locationAbsolutePath.baseName
2846
System.err.println(
2947
s"X==== ${Console.CYAN}Finishing '${context.test.name}' from $fileName${Console.RESET}"

modules/integration/src/test/scala/scala/cli/integration/TestUtil.scala

Lines changed: 38 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -199,33 +199,35 @@ object TestUtil {
199199
user: String,
200200
password: String,
201201
realm: String
202-
)(f: (String, Int) => T): T = {
202+
)(f: (String, Int) => T)(using suite: ScalaCliSuite): T = {
203203
val host = "127.0.0.1"
204204
val port = {
205205
val s = new ServerSocket(0)
206206
try s.getLocalPort()
207207
finally s.close()
208208
}
209-
val proc = os.proc(
210-
cs,
211-
"launch",
212-
"io.get-coursier:http-server_2.12:1.0.1",
213-
"--",
214-
"--user",
215-
user,
216-
"--password",
217-
password,
218-
"--realm",
219-
realm,
220-
"--directory",
221-
".",
222-
"--host",
223-
host,
224-
"--port",
225-
port,
226-
"-v"
209+
val proc = suite.trackSubprocess(
210+
os.proc(
211+
cs,
212+
"launch",
213+
"io.get-coursier:http-server_2.12:1.0.1",
214+
"--",
215+
"--user",
216+
user,
217+
"--password",
218+
password,
219+
"--realm",
220+
realm,
221+
"--directory",
222+
".",
223+
"--host",
224+
host,
225+
"--port",
226+
port,
227+
"-v"
228+
)
229+
.spawn(cwd = path, mergeErrIntoOut = true)
227230
)
228-
.spawn(cwd = path, mergeErrIntoOut = true)
229231
try {
230232

231233
// a timeout around this would be great…
@@ -245,19 +247,21 @@ object TestUtil {
245247
System.err.println(s"Waiting $waitFor")
246248
Thread.sleep(waitFor.toMillis)
247249

248-
val t = new Thread("test-http-server-output") {
249-
setDaemon(true)
250-
override def run(): Unit = {
251-
var line = ""
252-
while (
253-
proc.isAlive() && {
254-
line = proc.stdout.readLine()
255-
line != null
256-
}
257-
)
258-
System.err.println(line)
250+
val t = suite.trackThread(
251+
new Thread("test-http-server-output") {
252+
setDaemon(true)
253+
override def run(): Unit = {
254+
var line = ""
255+
while (
256+
proc.isAlive() && {
257+
line = proc.stdout.readLine()
258+
line != null
259+
}
260+
)
261+
System.err.println(line)
262+
}
259263
}
260-
}
264+
)
261265
t.start()
262266
f(host, port)
263267
}
@@ -376,7 +380,8 @@ object TestUtil {
376380
threadName: String = UUID.randomUUID().toString,
377381
poolSize: Int = 2,
378382
timeout: Duration = 90.seconds
379-
)(f: (os.SubProcess, Duration, ExecutionContext) => Unit): Unit =
383+
)(f: (os.SubProcess, Duration, ExecutionContext) => Unit)(using suite: ScalaCliSuite): Unit =
384+
suite.trackSubprocess(proc)
380385
try withThreadPool(threadName, poolSize) { pool =>
381386
f(proc, timeout, ExecutionContext.fromExecutorService(pool))
382387
}

0 commit comments

Comments
 (0)