Skip to content

Commit 944b549

Browse files
SessionHero01CopilotThomasSession
committed
Fix batch API executor (#2003)
* Fixes batch api executor * More fixes * Batch main loop hardening * Fix incorrect use of withTimeoutOrNull * Apply suggestions from code review Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Co-authored-by: ThomasSession <thomas.r@getsession.org>
1 parent e3466c5 commit 944b549

2 files changed

Lines changed: 90 additions & 60 deletions

File tree

app/src/main/java/org/thoughtcrime/securesms/api/batch/BatchApiExecutor.kt

Lines changed: 84 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,19 @@ import kotlinx.coroutines.CancellationException
44
import kotlinx.coroutines.CoroutineScope
55
import kotlinx.coroutines.ExperimentalCoroutinesApi
66
import kotlinx.coroutines.channels.Channel
7+
import kotlinx.coroutines.channels.ClosedReceiveChannelException
78
import kotlinx.coroutines.channels.SendChannel
9+
import kotlinx.coroutines.delay
10+
import kotlinx.coroutines.isActive
811
import kotlinx.coroutines.launch
9-
import kotlinx.coroutines.withTimeoutOrNull
12+
import kotlinx.coroutines.selects.onTimeout
13+
import kotlinx.coroutines.selects.select
1014
import org.session.libsignal.utilities.Log
1115
import org.thoughtcrime.securesms.api.ApiExecutor
1216
import org.thoughtcrime.securesms.api.ApiExecutorContext
13-
import java.time.Instant
17+
import kotlin.time.ComparableTimeMark
18+
import kotlin.time.Duration.Companion.milliseconds
19+
import kotlin.time.TimeSource
1420

1521
/**
1622
* An [ApiExecutor] that batches requests together based on a [Batcher.batchKey].
@@ -37,73 +43,87 @@ class BatchApiExecutor<Req, Res, T>(
3743
batchCommandSender = channel
3844

3945
scope.launch {
46+
val timeSource: TimeSource.WithComparableMarks = TimeSource.Monotonic
47+
4048
val pendingRequests = linkedMapOf<Any, BatchInfo<Req, Res>>()
41-
var nextDeadline: Instant? = null
42-
43-
while (true) {
44-
val command: BatchCommand<Req, Res>? = if (nextDeadline == null) {
45-
channel.receive()
46-
} else {
47-
val now = Instant.now()
48-
if (nextDeadline > now) {
49-
withTimeoutOrNull(nextDeadline.toEpochMilli() - now.toEpochMilli()) {
50-
channel.receive()
51-
}
49+
var nextDeadline: ComparableTimeMark? = null
50+
51+
while (isActive) {
52+
try {
53+
val command: BatchCommand<Req, Res>? = if (nextDeadline == null) {
54+
channel.receive()
5255
} else {
53-
// Deadline already reached
54-
null
56+
val now = timeSource.markNow()
57+
if (nextDeadline > now) {
58+
select {
59+
channel.onReceive { it }
60+
onTimeout(nextDeadline - now) { null }
61+
}
62+
} else {
63+
// Deadline already reached
64+
null
65+
}
5566
}
56-
}
5767

58-
var calculateNextDeadline = false
68+
var calculateNextDeadline = false
5969

60-
when (command) {
61-
is BatchCommand.Send<Req, Res> -> {
62-
val existingBatch = pendingRequests[command.batchKey]
63-
if (existingBatch == null) {
64-
pendingRequests[command.batchKey] = BatchInfo(
65-
requests = arrayListOf(command),
66-
deadline = Instant.now().plusMillis(100L),
67-
)
70+
when (command) {
71+
is BatchCommand.Send<Req, Res> -> {
72+
val existingBatch = pendingRequests[command.batchKey]
73+
if (existingBatch == null) {
74+
pendingRequests[command.batchKey] = BatchInfo(
75+
requests = arrayListOf(command),
76+
deadline = timeSource.markNow() + 100.milliseconds
77+
)
6878

69-
calculateNextDeadline = true
70-
} else {
71-
existingBatch.requests.add(command)
79+
calculateNextDeadline = true
80+
} else {
81+
existingBatch.requests.add(command)
82+
}
7283
}
73-
}
7484

75-
is BatchCommand.Cancel<Req, Res> -> {
76-
val existingBatch = pendingRequests[command.batchKey]
77-
if (existingBatch != null) {
78-
existingBatch.requests.removeIf { it.req == command.req }
79-
if (existingBatch.requests.isEmpty()) {
80-
pendingRequests.remove(command.batchKey)
81-
calculateNextDeadline = true
85+
is BatchCommand.Cancel<Req, Res> -> {
86+
val existingBatch = pendingRequests[command.batchKey]
87+
if (existingBatch != null) {
88+
existingBatch.requests.removeIf { it.req == command.req }
89+
if (existingBatch.requests.isEmpty()) {
90+
pendingRequests.remove(command.batchKey)
91+
calculateNextDeadline = true
92+
}
8293
}
8394
}
84-
}
8595

86-
null -> {
87-
// Deadline reached: it will be the first batch in the queue
88-
executeBatch(pendingRequests.removeFirst())
89-
calculateNextDeadline = true
96+
null -> {
97+
// Deadline reached: it will be the first batch in the queue
98+
executeBatch(pendingRequests.removeFirst())
99+
calculateNextDeadline = true
100+
}
90101
}
91-
}
92102

93-
if (calculateNextDeadline) {
94-
nextDeadline = if (pendingRequests.isEmpty()) {
95-
null
96-
} else {
97-
pendingRequests.values.first().deadline
103+
if (calculateNextDeadline) {
104+
nextDeadline = if (pendingRequests.isEmpty()) {
105+
null
106+
} else {
107+
pendingRequests.values.first().deadline
108+
}
98109
}
110+
} catch (e: CancellationException) {
111+
Log.i(TAG, "Main loop cancelled")
112+
throw e
113+
} catch (e: ClosedReceiveChannelException) {
114+
Log.e(TAG, "Channel no longer open. Stopping batch handling", e)
115+
break
116+
} catch (e: Throwable) {
117+
Log.e(TAG, "Error in main loop. Retrying in 1s", e)
118+
delay(1000)
99119
}
100120
}
101121
}
102122
}
103123

104124
private class BatchInfo<Req, Res>(
105125
val requests: ArrayList<BatchCommand.Send<Req, Res>>,
106-
val deadline: Instant,
126+
val deadline: ComparableTimeMark,
107127
) {
108128
init {
109129
check(requests.isNotEmpty()) {
@@ -123,8 +143,11 @@ class BatchApiExecutor<Req, Res, T>(
123143
}
124144

125145
if (transformed.isFailure) {
146+
Log.d(TAG, "Transformation of request failed", transformed.exceptionOrNull())
126147
// Notify individual request of failure
127-
r.callback.send(Result.failure(transformed.exceptionOrNull()!!))
148+
if (!r.callback.trySend(Result.failure(transformed.exceptionOrNull()!!)).isSuccess) {
149+
Log.w(TAG, "Unable to send transform result back")
150+
}
128151
continue
129152
}
130153

@@ -150,22 +173,27 @@ class BatchApiExecutor<Req, Res, T>(
150173
response = resp
151174
)
152175

153-
check(responses.size == batch.requests.size) {
154-
"Batch response size ${responses.size} does not match request size ${batch.requests.size}"
176+
check(responses.size == requestsToSend.size) {
177+
"Batch response size ${responses.size} does not match request size ${requestsToSend.size}"
155178
}
156179

157-
for (i in batch.requests.indices) {
158-
val request = batch.requests[i]
180+
for (i in requestsToSend.indices) {
181+
val (request, _) = requestsToSend[i]
159182
val response = responses[i]
160-
request.callback.send(response)
183+
184+
if (!request.callback.trySend(response).isSuccess) {
185+
Log.w(TAG, "Unable to send result back to individual request")
186+
}
161187
}
162188

163189
} catch (e: Throwable) {
164190
if (e is CancellationException) throw e
165191

166192
// Notify all requests of the failure
167193
for (request in requestsToSend) {
168-
request.first.callback.send(Result.failure(e))
194+
if (!request.first.callback.trySend(Result.failure(e)).isSuccess) {
195+
Log.w(TAG, "Unable to send failure back to individual request")
196+
}
169197
}
170198
}
171199
}
@@ -182,7 +210,7 @@ class BatchApiExecutor<Req, Res, T>(
182210
val batchKey = batcher.batchKey(req)
183211
?: return actualExecutor.send(ctx, req)
184212

185-
val callback = Channel<Result<*>>(1)
213+
val callback = Channel<Result<*>>(capacity = 1)
186214
batchCommandSender.send(BatchCommand.Send(
187215
ctx = ctx,
188216
batchKey = batchKey,

app/src/main/java/org/thoughtcrime/securesms/logging/PersistentLogger.kt

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,13 @@ import kotlinx.coroutines.flow.MutableSharedFlow
1111
import kotlinx.coroutines.flow.first
1212
import kotlinx.coroutines.launch
1313
import kotlinx.coroutines.runBlocking
14+
1415
import kotlinx.coroutines.withTimeoutOrNull
1516
import org.session.libsignal.utilities.Log.Logger
1617
import org.thoughtcrime.securesms.dependencies.ManagerScope
1718
import org.thoughtcrime.securesms.dependencies.OnAppStartupComponent
1819
import java.io.File
20+
1921
import java.text.SimpleDateFormat
2022
import java.util.Locale
2123
import java.util.regex.Pattern
@@ -25,6 +27,7 @@ import javax.inject.Inject
2527
import javax.inject.Singleton
2628
import kotlin.time.Duration.Companion.milliseconds
2729

30+
2831
/**
2932
* A [Logger] that writes logs to encrypted files in the app's cache directory.
3033
*/
@@ -159,10 +162,9 @@ class PersistentLogger @Inject constructor(
159162
private suspend fun ReceiveChannel<LogEntry>.receiveBulkLogs(out: MutableList<LogEntry>) {
160163
out += receive()
161164

162-
withTimeoutOrNull(500.milliseconds) {
163-
repeat(15) {
164-
out += receiveCatching().getOrNull() ?: return@repeat
165-
}
165+
// We may have many items cached in the channel, try to receive up to 15 items
166+
repeat(15) {
167+
out += tryReceive().getOrNull() ?: return@repeat
166168
}
167169
}
168170

0 commit comments

Comments
 (0)