@@ -14,21 +14,59 @@ import kotlin.time.Duration
1414/* *
1515 * Expermiental multiplatform coroutine-based bouncer: safe way to call some suspend code
1616 * after a timeout. It is thread-safe (where multithreaded) and coroutine-safe.
17+ *
18+ * Note that creating a bouncer will not invoke its callback until the corresponding pulse call.
19+ *
20+ * @param initialTimeout default timeout for [pulse], could be changed at runtime assigning to [timeout].
21+ * @param initialMaxTimeout maximum timeout between calls, systemm will invoke callback when it expires even if there
22+ * will be [pulse] calls in between. Could be changed with [maxTimeout]
23+ * @param callback what to invoke.
1724 */
18- class AsyncBouncer (var timeout : Duration , callback : suspend () -> Unit ) {
25+ class AsyncBouncer (
26+ initialTimeout : Duration ,
27+ initialMaxTimeout : Duration = initialTimeout,
28+ callback : suspend () -> Unit ,
29+ ) {
1930
31+ private var lastCallAt: Instant ? = null
2032 private var callAt: Instant ? = null
2133 private val access = Mutex ()
2234 private val pulseChannel = Channel <Int >(0 , onBufferOverflow = BufferOverflow .DROP_OLDEST )
2335
36+ /* *
37+ * Default time between call to [pulse] and invocation. Assigning calls [pulse].
38+ */
39+ var timeout: Duration = initialTimeout
40+ set(value) {
41+ field = value
42+ pulse()
43+ }
44+
45+ /* *
46+ * Maximim time between invocations: even if [pulse] is being called more often, invocations will happen at this
47+ * rate. Assigning it calls [pulse]
48+ */
49+ var maxTimeout: Duration = initialMaxTimeout
50+ set(value) {
51+ field = value
52+ pulse()
53+ }
54+
2455 /* *
2556 * Cause a block to be scheduled after [timeout] or right now from now even it is already being executing.
57+ * To change effective [timeout], assign a value to it _prior to call pulse_.
2658 */
27- suspend fun pulse (now : Boolean = false) {
28- access.withReentrantLock {
29- checkNotClosed()
30- callAt = if (now) Now () else Now () + timeout
31- pulseChannel.send(1 )
59+ fun pulse (now : Boolean = false) {
60+ checkNotClosed()
61+ globalLaunch {
62+ access.withReentrantLock {
63+ callAt = if (now) Now () else Now () + timeout
64+ lastCallAt?.let {
65+ val limitTime = it + maxTimeout
66+ if (callAt!! > limitTime) callAt = limitTime
67+ }
68+ pulseChannel.send(1 )
69+ }
3270 }
3371 }
3472
@@ -89,8 +127,8 @@ class AsyncBouncer(var timeout: Duration, callback: suspend () -> Unit) {
89127 try {
90128 callback()
91129 callAt = null
92- }
93- catch (t: Throwable ) {
130+ lastCallAt = Now ()
131+ } catch (t: Throwable ) {
94132 // we can't use logging here as logger uses us ;)
95133 println (" unexpected error in AsyncBouncer: $t " )
96134 t.printStackTrace()
0 commit comments