Skip to content

Commit c7908d8

Browse files
authored
IGNITE-27685 Implement exponential backoff for durable finish (#7680)
1 parent cf34a31 commit c7908d8

27 files changed

Lines changed: 2339 additions & 128 deletions

File tree

modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java

Lines changed: 0 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@
5151
import java.util.Map;
5252
import java.util.Objects;
5353
import java.util.Optional;
54-
import java.util.concurrent.Callable;
5554
import java.util.concurrent.CancellationException;
5655
import java.util.concurrent.CompletableFuture;
5756
import java.util.concurrent.ConcurrentHashMap;
@@ -60,7 +59,6 @@
6059
import java.util.concurrent.Executor;
6160
import java.util.concurrent.ExecutorService;
6261
import java.util.concurrent.Future;
63-
import java.util.concurrent.ScheduledExecutorService;
6462
import java.util.concurrent.TimeUnit;
6563
import java.util.concurrent.TimeoutException;
6664
import java.util.concurrent.atomic.AtomicReference;
@@ -1341,35 +1339,6 @@ public static Object[] flatArray(Object... arguments) {
13411339
return list.toArray();
13421340
}
13431341

1344-
/**
1345-
* Schedules the provided operation to be retried after the specified delay.
1346-
*
1347-
* @param operation Operation.
1348-
* @param delay Delay.
1349-
* @param unit Time unit of the delay.
1350-
* @param executor Executor to schedule the retry in.
1351-
* @return Future that is completed when the operation is successful or failed with an exception.
1352-
*/
1353-
public static <T> CompletableFuture<T> scheduleRetry(
1354-
Callable<CompletableFuture<T>> operation,
1355-
long delay,
1356-
TimeUnit unit,
1357-
ScheduledExecutorService executor
1358-
) {
1359-
CompletableFuture<T> future = new CompletableFuture<>();
1360-
1361-
executor.schedule(() -> operation.call()
1362-
.whenComplete((res, e) -> {
1363-
if (e == null) {
1364-
future.complete(res);
1365-
} else {
1366-
future.completeExceptionally(e);
1367-
}
1368-
}), delay, unit);
1369-
1370-
return future;
1371-
}
1372-
13731342
private static CompletableFuture<Void> startAsync(ComponentContext componentContext, Stream<? extends IgniteComponent> components) {
13741343
return allOf(components
13751344
.filter(Objects::nonNull)
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.internal.util.retry;
19+
20+
import java.util.concurrent.ThreadLocalRandom;
21+
22+
/**
23+
* A {@link TimeoutStrategy} that increases retry timeouts exponentially on each attempt.
24+
*
25+
* <p>Each call to {@link #next(int)} multiplies the current timeout by {@code backoffCoefficient},
26+
* capping the result at {@link #maxTimeout}. Optionally, random jitter can be applied to spread
27+
* retry attempts across time and avoid thundering herd problems under high concurrency.
28+
*
29+
* <p>When jitter is enabled, the returned timeout is randomized within the range
30+
* {@code [raw / 2, raw * 1.5]}, then capped at {@link #maxTimeout}.
31+
*
32+
* <p>This class is stateless and thread-safe. A single instance can be shared across
33+
* multiple retry contexts.
34+
*/
35+
// TODO: https://issues.apache.org/jira/browse/IGNITE-28481
36+
public class ExponentialBackoffTimeoutStrategy implements TimeoutStrategy {
37+
/**
38+
* Default backoff coefficient applied on each retry step. Doubles the timeout per attempt.
39+
*/
40+
private static final double DEFAULT_BACKOFF_COEFFICIENT = 2.0;
41+
42+
/**
43+
* Multiplier applied to the current timeout on each call to {@link #next(int)}.
44+
* Must be greater than {@code 1.0} to produce a growing sequence.
45+
*/
46+
private final double backoffCoefficient;
47+
48+
/**
49+
* Whether to apply random jitter to the computed timeout.
50+
* When {@code true}, the result is randomized within {@code [raw / 2, raw * 1.5]}.
51+
*/
52+
private final boolean jitter;
53+
54+
/**
55+
* Upper bound for the timeout produced by this strategy, in milliseconds.
56+
* The result of {@link #next(int)} is always capped at this value.
57+
*/
58+
private final int maxTimeout;
59+
60+
/**
61+
* Creates a strategy with default max timeout and backoff coefficient, and no jitter.
62+
*
63+
* @see TimeoutStrategy#DEFAULT_RETRY_TIMEOUT_MS_MAX
64+
* @see #DEFAULT_BACKOFF_COEFFICIENT
65+
*/
66+
public ExponentialBackoffTimeoutStrategy() {
67+
this(DEFAULT_RETRY_TIMEOUT_MS_MAX, DEFAULT_BACKOFF_COEFFICIENT);
68+
}
69+
70+
/**
71+
* Creates a strategy with the given max timeout and backoff coefficient, and no jitter.
72+
*
73+
* @param maxTimeout maximum timeout this strategy may produce, in milliseconds.
74+
* @param backoffCoefficient multiplier applied to the current timeout on each step.
75+
* Must be greater than {@code 1.0}.
76+
*/
77+
public ExponentialBackoffTimeoutStrategy(
78+
int maxTimeout,
79+
double backoffCoefficient
80+
) {
81+
this(maxTimeout, backoffCoefficient, false);
82+
}
83+
84+
/**
85+
* Creates a strategy with the given max timeout, backoff coefficient, and jitter setting.
86+
*
87+
* @param maxTimeout maximum timeout this strategy may produce, in milliseconds.
88+
* @param backoffCoefficient multiplier applied to the current timeout on each step.
89+
* Must be greater than {@code 1.0}.
90+
* @param jitter if {@code true}, random jitter is applied to each computed timeout.
91+
*/
92+
public ExponentialBackoffTimeoutStrategy(
93+
int maxTimeout,
94+
double backoffCoefficient,
95+
boolean jitter
96+
) {
97+
this.maxTimeout = maxTimeout;
98+
this.backoffCoefficient = backoffCoefficient;
99+
this.jitter = jitter;
100+
}
101+
102+
/**
103+
* Computes the next retry timeout by multiplying {@code currentTimeout} by
104+
* {@link #backoffCoefficient}, then capping at {@link #maxTimeout}.
105+
* If jitter is enabled, the result is further randomized.
106+
*
107+
* @param currentTimeout current retry timeout in milliseconds.
108+
* @return next retry timeout in milliseconds, capped at {@link #maxTimeout}.
109+
*/
110+
@Override
111+
public int next(int currentTimeout) {
112+
int jitteredTimeout = jitter ? applyJitter(currentTimeout) : currentTimeout;
113+
114+
return (int) Math.min((jitteredTimeout * backoffCoefficient), maxTimeout);
115+
}
116+
117+
/**
118+
* Applies random jitter to the given timeout value.
119+
*
120+
* <p>The result is uniformly distributed within {@code [raw / 2, raw * 1.5]},
121+
* then capped at {@link #maxTimeout} to ensure the strategy ceiling is never exceeded.
122+
*
123+
* @param raw computed timeout before jitter, in milliseconds.
124+
* @return jittered timeout in milliseconds, capped at {@link #maxTimeout}.
125+
*/
126+
private int applyJitter(int raw) {
127+
int lo = raw / 2;
128+
int hi = raw + lo;
129+
130+
return lo + ThreadLocalRandom.current().nextInt(hi - lo + 1);
131+
}
132+
}
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.internal.util.retry;
19+
20+
import static java.util.Collections.unmodifiableMap;
21+
import static java.util.Optional.of;
22+
import static java.util.Optional.ofNullable;
23+
import static org.apache.ignite.internal.util.retry.TimeoutStrategy.DEFAULT_RETRY_TIMEOUT_MS_MAX;
24+
25+
import java.util.HashMap;
26+
import java.util.Map;
27+
import java.util.Optional;
28+
import java.util.concurrent.ConcurrentHashMap;
29+
import org.jetbrains.annotations.TestOnly;
30+
31+
/**
32+
* A retry context that tracks timeout state independently per key.
33+
*
34+
* <p>Each key maps to its own {@link TimeoutState}, allowing separate backoff progression
35+
* for different retry targets — for example, different replication group IDs or transaction IDs.
36+
* State updates are performed atomically per key using {@link ConcurrentHashMap#compute}.
37+
*
38+
* <p>To prevent unbounded memory growth, the registry is capped at {@link #REGISTRY_SIZE_LIMIT}
39+
* entries. Once the limit is reached, untracked keys receive a fixed {@link #fallbackTimeoutState}
40+
* that always returns {@link TimeoutStrategy#DEFAULT_RETRY_TIMEOUT_MS_MAX}. The limit is a soft cap and may be
41+
* slightly exceeded under concurrent insertions.
42+
*
43+
* <p>This class is thread-safe.
44+
*/
45+
public class KeyBasedRetryContext implements RetryContext {
46+
/**
47+
* Maximum number of keys tracked in {@link #registry}.
48+
* Once the limit is reached, untracked keys receive a fixed {@link #fallbackTimeoutState}.
49+
* Can be slightly exceeded under concurrent insertions.
50+
*/
51+
private static final int REGISTRY_SIZE_LIMIT = 1_000;
52+
53+
/** Strategy used to compute the next timeout from the current one on each advancement. */
54+
private final TimeoutStrategy timeoutStrategy;
55+
56+
/**
57+
* Sentinel state returned for keys that cannot be tracked because the registry is full.
58+
* Initialized with {@link TimeoutStrategy#DEFAULT_RETRY_TIMEOUT_MS_MAX} and attempt {@code -1}
59+
* to distinguish it from legitimately tracked states.
60+
*/
61+
private final TimeoutState fallbackTimeoutState;
62+
63+
/** Per-key timeout state registry. Keys are typically transaction IDs or replication group IDs. */
64+
private final ConcurrentHashMap<String, TimeoutState> registry = new ConcurrentHashMap<>();
65+
66+
/**
67+
* Creates a new context with the given initial timeout and strategy.
68+
*
69+
* @param timeoutStrategy strategy used to compute subsequent timeout values.
70+
*/
71+
public KeyBasedRetryContext(TimeoutStrategy timeoutStrategy) {
72+
this.timeoutStrategy = timeoutStrategy;
73+
74+
this.fallbackTimeoutState = new TimeoutState(DEFAULT_RETRY_TIMEOUT_MS_MAX, -1);
75+
}
76+
77+
/**
78+
* Returns the current {@link TimeoutState} for the given key, if tracked.
79+
*
80+
* <p>Returns an empty {@link Optional} if the key has no recorded state yet.
81+
* If the registry is full and the key is not yet tracked, returns an {@link Optional}
82+
* containing a fallback state initialized to {@link TimeoutStrategy#DEFAULT_RETRY_TIMEOUT_MS_MAX}.
83+
*
84+
* <p>This method does not insert the key into the registry.
85+
*
86+
* @param key the key to look up, typically a transaction ID or replication group ID.
87+
* @return current state for the key, fallback state if registry is full, or empty if not tracked.
88+
*/
89+
@Override
90+
public Optional<TimeoutState> getState(String key) {
91+
if (!registry.containsKey(key) && registry.size() >= REGISTRY_SIZE_LIMIT) {
92+
return of(fallbackTimeoutState);
93+
}
94+
95+
return ofNullable(registry.get(key));
96+
}
97+
98+
/**
99+
* Atomically advances the retry state for the given key and returns the updated state.
100+
*
101+
* <p>The update is performed inside {@link ConcurrentHashMap#compute}, which holds
102+
* an exclusive per-key lock for the duration of the lambda, ensuring that
103+
* {@link TimeoutState#update(TimeoutStrategy)} is never called concurrently on the same instance.
104+
*
105+
* <p>When the registry is full, untracked keys receive the maximum timeout.
106+
* This acts as implicit backpressure: if enough keys are actively retrying to fill
107+
* the registry, the system is under a heavy load and new operations should retry conservatively.
108+
*
109+
* @param key the key to advance state for, typically a transaction ID or replication group ID.
110+
* @return updated {@link TimeoutState} for the key, or {@link #fallbackTimeoutState}
111+
* if the registry is full.
112+
*/
113+
@Override
114+
public TimeoutState updateAndGetState(String key) {
115+
if (!registry.containsKey(key) && registry.size() >= REGISTRY_SIZE_LIMIT) {
116+
return fallbackTimeoutState;
117+
}
118+
119+
return registry.compute(key, (k, state) -> {
120+
if (state == null) {
121+
state = new TimeoutState();
122+
}
123+
124+
state.update(timeoutStrategy);
125+
126+
return state;
127+
});
128+
}
129+
130+
/**
131+
* Removes the retry state for the given key, resetting it as if no retries had occurred.
132+
*
133+
* @param key the key whose state should be removed.
134+
*/
135+
@Override
136+
public void resetState(String key) {
137+
registry.remove(key);
138+
}
139+
140+
/**
141+
* Returns an unmodifiable snapshot of the current registry contents.
142+
*
143+
* <p>The snapshot is a point-in-time copy of the registry map. The returned
144+
* {@link TimeoutState} values are live references — their internal state may
145+
* continue to change concurrently after the snapshot is taken.
146+
*
147+
* <p>This method is intended for testing only and should not be used in production code.
148+
*
149+
* @return unmodifiable copy of the current key-to-state mappings.
150+
*/
151+
@TestOnly
152+
public Map<String, TimeoutState> snapshot() {
153+
return unmodifiableMap(new HashMap<>(registry));
154+
}
155+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.internal.util.retry;
19+
20+
/**
21+
* A {@link TimeoutStrategy} that returns the current timeout unchanged on every call.
22+
*
23+
* <p>Useful when retry backoff is not desired — for example, in tests or when a flat
24+
* retry interval is intentional. The timeout passed to {@link #next(int)} is returned
25+
* as-is, so the retry interval remains constant across all attempts.
26+
*
27+
* <p>This class is stateless and thread-safe.
28+
*/
29+
public class NoopTimeoutStrategy implements TimeoutStrategy {
30+
/**
31+
* Returns {@code currentTimeout} unchanged.
32+
*
33+
* @param currentTimeout current retry timeout in milliseconds.
34+
* @return the same {@code currentTimeout} value, unmodified.
35+
*/
36+
@Override
37+
public int next(int currentTimeout) {
38+
return currentTimeout;
39+
}
40+
}

0 commit comments

Comments
 (0)