@@ -4,13 +4,19 @@ import kotlinx.coroutines.CancellationException
44import kotlinx.coroutines.CoroutineScope
55import kotlinx.coroutines.ExperimentalCoroutinesApi
66import kotlinx.coroutines.channels.Channel
7+ import kotlinx.coroutines.channels.ClosedReceiveChannelException
78import kotlinx.coroutines.channels.SendChannel
9+ import kotlinx.coroutines.delay
10+ import kotlinx.coroutines.isActive
811import kotlinx.coroutines.launch
9- import kotlinx.coroutines.withTimeoutOrNull
12+ import kotlinx.coroutines.selects.onTimeout
13+ import kotlinx.coroutines.selects.select
1014import org.session.libsignal.utilities.Log
1115import org.thoughtcrime.securesms.api.ApiExecutor
1216import org.thoughtcrime.securesms.api.ApiExecutorContext
13- import java.time.Instant
17+ import kotlin.time.ComparableTimeMark
18+ import kotlin.time.Duration.Companion.milliseconds
19+ import kotlin.time.TimeSource
1420
1521/* *
1622 * An [ApiExecutor] that batches requests together based on a [Batcher.batchKey].
@@ -37,73 +43,87 @@ class BatchApiExecutor<Req, Res, T>(
3743 batchCommandSender = channel
3844
3945 scope.launch {
46+ val timeSource: TimeSource .WithComparableMarks = TimeSource .Monotonic
47+
4048 val pendingRequests = linkedMapOf<Any , BatchInfo <Req , Res >>()
41- var nextDeadline: Instant ? = null
42-
43- while (true ) {
44- val command: BatchCommand <Req , Res >? = if (nextDeadline == null ) {
45- channel.receive()
46- } else {
47- val now = Instant .now()
48- if (nextDeadline > now) {
49- withTimeoutOrNull(nextDeadline.toEpochMilli() - now.toEpochMilli()) {
50- channel.receive()
51- }
49+ var nextDeadline: ComparableTimeMark ? = null
50+
51+ while (isActive) {
52+ try {
53+ val command: BatchCommand <Req , Res >? = if (nextDeadline == null ) {
54+ channel.receive()
5255 } else {
53- // Deadline already reached
54- null
56+ val now = timeSource.markNow()
57+ if (nextDeadline > now) {
58+ select {
59+ channel.onReceive { it }
60+ onTimeout(nextDeadline - now) { null }
61+ }
62+ } else {
63+ // Deadline already reached
64+ null
65+ }
5566 }
56- }
5767
58- var calculateNextDeadline = false
68+ var calculateNextDeadline = false
5969
60- when (command) {
61- is BatchCommand .Send <Req , Res > -> {
62- val existingBatch = pendingRequests[command.batchKey]
63- if (existingBatch == null ) {
64- pendingRequests[command.batchKey] = BatchInfo (
65- requests = arrayListOf (command),
66- deadline = Instant .now().plusMillis( 100L ),
67- )
70+ when (command) {
71+ is BatchCommand .Send <Req , Res > -> {
72+ val existingBatch = pendingRequests[command.batchKey]
73+ if (existingBatch == null ) {
74+ pendingRequests[command.batchKey] = BatchInfo (
75+ requests = arrayListOf (command),
76+ deadline = timeSource.markNow() + 100 .milliseconds
77+ )
6878
69- calculateNextDeadline = true
70- } else {
71- existingBatch.requests.add(command)
79+ calculateNextDeadline = true
80+ } else {
81+ existingBatch.requests.add(command)
82+ }
7283 }
73- }
7484
75- is BatchCommand .Cancel <Req , Res > -> {
76- val existingBatch = pendingRequests[command.batchKey]
77- if (existingBatch != null ) {
78- existingBatch.requests.removeIf { it.req == command.req }
79- if (existingBatch.requests.isEmpty()) {
80- pendingRequests.remove(command.batchKey)
81- calculateNextDeadline = true
85+ is BatchCommand .Cancel <Req , Res > -> {
86+ val existingBatch = pendingRequests[command.batchKey]
87+ if (existingBatch != null ) {
88+ existingBatch.requests.removeIf { it.req == command.req }
89+ if (existingBatch.requests.isEmpty()) {
90+ pendingRequests.remove(command.batchKey)
91+ calculateNextDeadline = true
92+ }
8293 }
8394 }
84- }
8595
86- null -> {
87- // Deadline reached: it will be the first batch in the queue
88- executeBatch(pendingRequests.removeFirst())
89- calculateNextDeadline = true
96+ null -> {
97+ // Deadline reached: it will be the first batch in the queue
98+ executeBatch(pendingRequests.removeFirst())
99+ calculateNextDeadline = true
100+ }
90101 }
91- }
92102
93- if (calculateNextDeadline) {
94- nextDeadline = if (pendingRequests.isEmpty()) {
95- null
96- } else {
97- pendingRequests.values.first().deadline
103+ if (calculateNextDeadline) {
104+ nextDeadline = if (pendingRequests.isEmpty()) {
105+ null
106+ } else {
107+ pendingRequests.values.first().deadline
108+ }
98109 }
110+ } catch (e: CancellationException ) {
111+ Log .i(TAG , " Main loop cancelled" )
112+ throw e
113+ } catch (e: ClosedReceiveChannelException ) {
114+ Log .e(TAG , " Channel no longer open. Stopping batch handling" , e)
115+ break
116+ } catch (e: Throwable ) {
117+ Log .e(TAG , " Error in main loop. Retrying in 1s" , e)
118+ delay(1000 )
99119 }
100120 }
101121 }
102122 }
103123
104124 private class BatchInfo <Req , Res >(
105125 val requests : ArrayList <BatchCommand .Send <Req , Res >>,
106- val deadline : Instant ,
126+ val deadline : ComparableTimeMark ,
107127 ) {
108128 init {
109129 check(requests.isNotEmpty()) {
@@ -123,8 +143,11 @@ class BatchApiExecutor<Req, Res, T>(
123143 }
124144
125145 if (transformed.isFailure) {
146+ Log .d(TAG , " Transformation of request failed" , transformed.exceptionOrNull())
126147 // Notify individual request of failure
127- r.callback.send(Result .failure(transformed.exceptionOrNull()!! ))
148+ if (! r.callback.trySend(Result .failure(transformed.exceptionOrNull()!! )).isSuccess) {
149+ Log .w(TAG , " Unable to send transform result back" )
150+ }
128151 continue
129152 }
130153
@@ -150,22 +173,27 @@ class BatchApiExecutor<Req, Res, T>(
150173 response = resp
151174 )
152175
153- check(responses.size == batch.requests .size) {
154- " Batch response size ${responses.size} does not match request size ${batch.requests .size} "
176+ check(responses.size == requestsToSend .size) {
177+ " Batch response size ${responses.size} does not match request size ${requestsToSend .size} "
155178 }
156179
157- for (i in batch.requests .indices) {
158- val request = batch.requests [i]
180+ for (i in requestsToSend .indices) {
181+ val ( request, _) = requestsToSend [i]
159182 val response = responses[i]
160- request.callback.send(response)
183+
184+ if (! request.callback.trySend(response).isSuccess) {
185+ Log .w(TAG , " Unable to send result back to individual request" )
186+ }
161187 }
162188
163189 } catch (e: Throwable ) {
164190 if (e is CancellationException ) throw e
165191
166192 // Notify all requests of the failure
167193 for (request in requestsToSend) {
168- request.first.callback.send(Result .failure(e))
194+ if (! request.first.callback.trySend(Result .failure(e)).isSuccess) {
195+ Log .w(TAG , " Unable to send failure back to individual request" )
196+ }
169197 }
170198 }
171199 }
@@ -182,7 +210,7 @@ class BatchApiExecutor<Req, Res, T>(
182210 val batchKey = batcher.batchKey(req)
183211 ? : return actualExecutor.send(ctx, req)
184212
185- val callback = Channel <Result <* >>(1 )
213+ val callback = Channel <Result <* >>(capacity = 1 )
186214 batchCommandSender.send(BatchCommand .Send (
187215 ctx = ctx,
188216 batchKey = batchKey,
0 commit comments