@@ -57,7 +57,8 @@ class HandshakeRestartHandler(
5757 val knownIds = tunnelConfigs.map { it.id }.toSet()
5858
5959 // Cancel jobs for tunnels that left activeTunnels
60- // Skip tunnels we're actively restarting (they leave temporarily during stop/start)
60+ // Skip tunnels we're actively restarting (they leave temporarily during
61+ // stop/start)
6162 // BUT always cancel if the tunnel was deleted from DB
6263 (jobs.keys - activeIds).forEach { id ->
6364 if (restarting[id] != true || id !in knownIds) {
@@ -72,18 +73,15 @@ class HandshakeRestartHandler(
7273 activeIds.forEach { id ->
7374 if (jobs.containsKey(id)) return @forEach
7475 val config = tunnelConfigs.find { it.id == id } ? : return @forEach
75- jobs[id] = applicationScope.launch(ioDispatcher) {
76- runRestartLoop(id, config)
77- }
76+ jobs[id] =
77+ applicationScope.launch(ioDispatcher) { runRestartLoop(id, config) }
7878 }
7979 }
8080 }
8181 }
8282 }
8383
84- /* *
85- * Top-level loop for a single tunnel. Restarts whenever monitoring settings change.
86- */
84+ /* * Top-level loop for a single tunnel. Restarts whenever monitoring settings change. */
8785 private suspend fun runRestartLoop (tunnelId : Int , config : TunnelConfig ) {
8886 monitoringSettingsRepository.flow.collectLatest { settings ->
8987 if (! settings.isAutoRestartActive()) {
@@ -102,8 +100,8 @@ class HandshakeRestartHandler(
102100 }
103101
104102 /* *
105- * Monitoring state machine for one tunnel with fixed settings snapshot.
106- * Exits when max attempts are reached or the coroutine is cancelled.
103+ * Monitoring state machine for one tunnel with fixed settings snapshot. Exits when max attempts
104+ * are reached or the coroutine is cancelled.
107105 */
108106 private suspend fun monitorTunnel (
109107 tunnelId : Int ,
@@ -124,7 +122,9 @@ class HandshakeRestartHandler(
124122 attempt++
125123
126124 if (attempt > maxAttempts) {
127- Timber .d(" HandshakeRestartHandler: tunnel $tunnelId gave up after $maxAttempts attempts" )
125+ Timber .d(
126+ " HandshakeRestartHandler: tunnel $tunnelId gave up after $maxAttempts attempts"
127+ )
128128 updateProgress(
129129 tunnelId,
130130 TunnelRestartProgress (
@@ -165,7 +165,9 @@ class HandshakeRestartHandler(
165165 }
166166
167167 // RESTARTING
168- Timber .d(" HandshakeRestartHandler: restarting tunnel $tunnelId (attempt $attempt /$maxAttempts )" )
168+ Timber .d(
169+ " HandshakeRestartHandler: restarting tunnel $tunnelId (attempt $attempt /$maxAttempts )"
170+ )
169171 updateProgress(
170172 tunnelId,
171173 TunnelRestartProgress (
@@ -179,13 +181,17 @@ class HandshakeRestartHandler(
179181 )
180182 mutex.withLock { restarting[tunnelId] = true }
181183 runCatching { stopTunnel(tunnelId) }
182- .onFailure { Timber .e(it, " HandshakeRestartHandler: stop failed for tunnel $tunnelId " ) }
184+ .onFailure {
185+ Timber .e(it, " HandshakeRestartHandler: stop failed for tunnel $tunnelId " )
186+ }
183187 delay(RESTART_SETTLE_DELAY_MS )
184188
185189 // Abort if another tunnel took over while we were stopped (e.g. auto-tunnel switched)
186190 val currentActive = activeTunnels.value
187191 if (currentActive.isNotEmpty() && ! currentActive.containsKey(tunnelId)) {
188- Timber .d(" HandshakeRestartHandler: tunnel $tunnelId superseded by another tunnel, aborting restart" )
192+ Timber .d(
193+ " HandshakeRestartHandler: tunnel $tunnelId superseded by another tunnel, aborting restart"
194+ )
189195 mutex.withLock { restarting[tunnelId] = false }
190196 updateProgress(tunnelId, null )
191197 return
@@ -194,12 +200,15 @@ class HandshakeRestartHandler(
194200 // Fetch fresh config in case tunnel was modified since handler started
195201 val freshConfig = tunnelsRepository.getById(tunnelId) ? : config
196202 runCatching { startTunnel(freshConfig) }
197- .onFailure { Timber .e(it, " HandshakeRestartHandler: start failed for tunnel $tunnelId " ) }
203+ .onFailure {
204+ Timber .e(it, " HandshakeRestartHandler: start failed for tunnel $tunnelId " )
205+ }
198206
199207 // Wait for tunnel to come back UP (30s safety timeout)
200- val cameUp = withTimeoutOrNull(TUNNEL_UP_TIMEOUT_MS ) {
201- activeTunnels.mapNotNull { it[tunnelId] }.first { it.status is TunnelStatus .Up }
202- }
208+ val cameUp =
209+ withTimeoutOrNull(TUNNEL_UP_TIMEOUT_MS ) {
210+ activeTunnels.mapNotNull { it[tunnelId] }.first { it.status is TunnelStatus .Up }
211+ }
203212 mutex.withLock { restarting[tunnelId] = false }
204213 totalRestarts++
205214
@@ -224,31 +233,38 @@ class HandshakeRestartHandler(
224233 ),
225234 )
226235
227- val timeout = settings.tunnelPingTimeoutSeconds?.toMillis()
228- ? : (settings.tunnelPingAttempts * 2000L )
236+ val timeout =
237+ settings.tunnelPingTimeoutSeconds?.toMillis()
238+ ? : (settings.tunnelPingAttempts * 2000L )
229239 val pingResult = runCatching {
230240 networkUtils.pingWithStats(pingTarget, settings.tunnelPingAttempts, timeout)
231241 }
232242 val recovered = pingResult.getOrNull()?.isReachable == true
233243
234244 if (recovered) {
235- Timber .d(" HandshakeRestartHandler: tunnel $tunnelId recovered after attempt $attempt " )
245+ Timber .d(
246+ " HandshakeRestartHandler: tunnel $tunnelId recovered after attempt $attempt "
247+ )
236248 emitMessage(config.name, BackendMessage .ConnectionRestored )
237249 // Reset attempt counter but keep totalRestarts visible
238250 attempt = 0
239- updateProgress(
240- tunnelId,
241- TunnelRestartProgress (totalRestarts = totalRestarts),
242- )
251+ updateProgress(tunnelId, TunnelRestartProgress (totalRestarts = totalRestarts))
243252 pingTarget = awaitPingFailures(tunnelId, settings)
244253 continue
245254 }
246255
247256 // COOLDOWN — only if there are remaining attempts
248257 if (attempt < maxAttempts) {
249- val cooldownMs = computeCooldown(settings.restartCooldownSeconds, attempt, settings.isBackoffEnabled) * 1000L
258+ val cooldownMs =
259+ computeCooldown(
260+ settings.restartCooldownSeconds,
261+ attempt,
262+ settings.isBackoffEnabled,
263+ ) * 1000L
250264 val cooldownEnd = System .currentTimeMillis() + cooldownMs
251- Timber .d(" HandshakeRestartHandler: cooldown ${cooldownMs / 1000 } s before next restart for tunnel $tunnelId " )
265+ Timber .d(
266+ " HandshakeRestartHandler: cooldown ${cooldownMs / 1000 } s before next restart for tunnel $tunnelId "
267+ )
252268 updateProgress(
253269 tunnelId,
254270 TunnelRestartProgress (
@@ -264,22 +280,28 @@ class HandshakeRestartHandler(
264280 // If cooldown is longer than ping interval, race against periodic ping recovery
265281 val pingIntervalMs = settings.tunnelPingIntervalSeconds.toMillis()
266282 if (cooldownMs > pingIntervalMs) {
267- val recoveredDuringCooldown = withTimeoutOrNull(cooldownMs) {
268- activeTunnels
269- .mapNotNull { it[tunnelId]?.pingStates }
270- .distinctUntilChanged()
271- .first { pingStates ->
272- pingStates.values.isNotEmpty() &&
273- pingStates.values.all { it.isReachable }
274- }
275- true
276- } != null
283+ val recoveredDuringCooldown =
284+ withTimeoutOrNull(cooldownMs) {
285+ activeTunnels
286+ .mapNotNull { it[tunnelId]?.pingStates }
287+ .distinctUntilChanged()
288+ .first { pingStates ->
289+ pingStates.values.isNotEmpty() &&
290+ pingStates.values.all { it.isReachable }
291+ }
292+ true
293+ } != null
277294
278295 if (recoveredDuringCooldown) {
279- Timber .d(" HandshakeRestartHandler: tunnel $tunnelId recovered during cooldown" )
296+ Timber .d(
297+ " HandshakeRestartHandler: tunnel $tunnelId recovered during cooldown"
298+ )
280299 emitMessage(config.name, BackendMessage .ConnectionRestored )
281300 attempt = 0
282- updateProgress(tunnelId, TunnelRestartProgress (totalRestarts = totalRestarts))
301+ updateProgress(
302+ tunnelId,
303+ TunnelRestartProgress (totalRestarts = totalRestarts),
304+ )
283305 pingTarget = awaitPingFailures(tunnelId, settings)
284306 continue
285307 }
@@ -292,26 +314,24 @@ class HandshakeRestartHandler(
292314 }
293315
294316 /* *
295- * Suspends until [MonitoringSettings.pingFailuresBeforeRestart] consecutive ping cycles
296- * all report unreachable. Returns the ping target of the failing peer.
317+ * Suspends until [MonitoringSettings.pingFailuresBeforeRestart] consecutive ping cycles all
318+ * report unreachable. Returns the ping target of the failing peer.
297319 */
298- private suspend fun awaitPingFailures (
299- tunnelId : Int ,
300- settings : MonitoringSettings ,
301- ): String {
320+ private suspend fun awaitPingFailures (tunnelId : Int , settings : MonitoringSettings ): String {
302321 var consecutive = 0
303322 var target = CLOUDFLARE_IPV4_IP
304323
305324 activeTunnels
306325 .mapNotNull { it[tunnelId]?.pingStates }
307326 .distinctUntilChanged()
308327 .first { pingStates ->
309- val allFailing = pingStates.values.isNotEmpty() &&
310- pingStates.values.all { ! it.isReachable }
328+ val allFailing =
329+ pingStates.values.isNotEmpty() && pingStates.values. all { ! it.isReachable }
311330 if (allFailing) {
312- pingStates.values.firstOrNull { ! it.isReachable }?.pingTarget?.let {
313- target = it
314- }
331+ pingStates.values
332+ .firstOrNull { ! it.isReachable }
333+ ?.pingTarget
334+ ?.let { target = it }
315335 consecutive++
316336 } else {
317337 consecutive = 0
@@ -326,7 +346,8 @@ class HandshakeRestartHandler(
326346 isRestartOnHandshakeTimeoutEnabled && isPingEnabled
327347
328348 private fun computeCooldown (baseSec : Int , attempt : Int , isBackoff : Boolean ): Long =
329- if (isBackoff) baseSec.toLong() * (1L shl (attempt - 1 ).coerceAtMost(MAX_BACKOFF_SHIFT )) else baseSec.toLong()
349+ if (isBackoff) baseSec.toLong() * (1L shl (attempt - 1 ).coerceAtMost(MAX_BACKOFF_SHIFT ))
350+ else baseSec.toLong()
330351
331352 companion object {
332353 private const val TUNNEL_UP_TIMEOUT_MS = 30_000L
0 commit comments