-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathPersistentWorker.kt
More file actions
125 lines (119 loc) · 3.97 KB
/
PersistentWorker.kt
File metadata and controls
125 lines (119 loc) · 3.97 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package com.rules.android.lint.worker
import io.reactivex.rxjava3.core.BackpressureStrategy
import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.core.Scheduler
import io.reactivex.rxjava3.schedulers.Schedulers
import java.io.BufferedOutputStream
import java.io.ByteArrayOutputStream
import java.io.IOException
import java.io.PrintStream
/**
 * [Worker] implementation that keeps reading work requests, executes them on an RxJava
 * [Scheduler] and writes one response per request until the request stream terminates.
 */
internal class PersistentWorker(
    /**
     * WorkerIO instance wrapping the standard output streams. Its system streams are
     * redirected before any request is read, and it is closed when processing ends.
     */
    private val workerIO: WorkerIO,
    /**
     * RxJava Scheduler used to read work requests, execute them and observe the responses.
     */
    private val scheduler: Scheduler,
    /**
     * GC scheduler that is given the chance to run a garbage collection
     * (via `maybePerformGc()`) after each request has been turned into a response.
     */
    private val persistentWorkerCpuTimeBasedGcScheduler: PersistentWorkerCpuTimeBasedGcScheduler,
    /**
     * Message processor used to read incoming work requests and write outgoing work responses.
     */
    private val workRequestProcessor: Worker.WorkerMessageProcessor,
    /**
     * Callback that performs the actual work: it receives the request arguments together with
     * a print stream for its log output and returns the exit code for the response.
     */
    private val workerWorkRequestCallback: Worker.WorkRequestCallback,
) : Worker {
    /**
     * Convenience constructor wiring up the defaults: a fresh [WorkerIO], the RxJava IO
     * scheduler, a fresh GC scheduler and a JSON message processor over `System.in` /
     * `System.out`.
     *
     * @param workerMessageProcessor callback invoked to execute each work request
     */
    constructor(
        workerMessageProcessor: Worker.WorkRequestCallback,
    ) : this(
        workerIO = WorkerIO(),
        scheduler = Schedulers.io(),
        persistentWorkerCpuTimeBasedGcScheduler = PersistentWorkerCpuTimeBasedGcScheduler(),
        workRequestProcessor =
            WorkerJsonMessageProcessor(
                System.`in`,
                System.out,
            ),
        workerWorkRequestCallback = workerMessageProcessor,
    )

    /**
     * Initiate the worker and begin processing work requests.
     *
     * Blocks the calling thread until the request stream terminates.
     *
     * @return always `0`
     */
    override fun processRequests(): Int =
        workerIO.use { io ->
            // Start by redirecting the system streams so that nothing
            // corrupts the streams that the worker uses
            io.redirectSystemStreams()

            // Process requests as they come in using RxJava
            Flowable
                .create(
                    { emitter ->
                        while (!emitter.isCancelled) {
                            try {
                                val request: WorkRequest = workRequestProcessor.readWorkRequest()
                                emitter.onNext(request)
                            } catch (e: IOException) {
                                emitter.onError(e)
                                // onError is terminal: stop reading right away instead of
                                // spinning on a broken stream until the emitter is disposed.
                                break
                            }
                        }
                    },
                    BackpressureStrategy.BUFFER,
                ).subscribeOn(scheduler)
                .parallel()
                .runOn(scheduler)
                // Execute the work and map the result to a work response
                .map { request -> respondToRequest(request) }
                // Run the garbage collector periodically so that we are a good responsible worker
                .doOnNext { persistentWorkerCpuTimeBasedGcScheduler.maybePerformGc() }
                .doOnError { it.printStackTrace() }
                .sequential()
                .observeOn(scheduler)
                .blockingSubscribe { response ->
                    workRequestProcessor.writeWorkResponse(response)
                }
            0
        }

    /**
     * Executes a single work request and packages the outcome as a [WorkResponse].
     *
     * Everything the callback writes to its print stream is captured and returned, trimmed,
     * as the response output. Any [Exception] (including a missing or empty argument list)
     * is reported by writing its stack trace to that output and using exit code `1`.
     *
     * @param request the work request to execute
     * @return the response carrying the exit code, the captured output and the request id
     */
    private fun respondToRequest(request: WorkRequest): WorkResponse =
        ByteArrayOutputStream().use { baos ->
            // The print stream must wrap `baos` itself, otherwise whatever the callback logs
            // is written to a throw-away buffer and the response output is always empty.
            val printStream = PrintStream(BufferedOutputStream(baos))
            val exitCode: Int =
                try {
                    // Sanity check the work request arguments
                    val arguments =
                        requireNotNull(request.arguments) {
                            "Request with id ${request.requestId} " +
                                "does not have arguments!"
                        }
                    require(arguments.isNotEmpty()) {
                        "Request with id ${request.requestId} " +
                            "does not have arguments!"
                    }
                    workerWorkRequestCallback.processWorkRequest(arguments, printStream)
                } catch (e: Exception) {
                    e.printStackTrace(printStream)
                    1
                } finally {
                    // Push buffered bytes through to `baos` before it is read below
                    printStream.flush()
                }

            WorkResponse(
                exitCode = exitCode,
                output = baos.toString().trim(),
                requestId = request.requestId,
            )
        }
}