|
4 | 4 | import java.time.Instant; |
5 | 5 | import java.util.Optional; |
6 | 6 | import java.util.concurrent.*; |
| 7 | +import java.util.concurrent.atomic.AtomicReference; |
7 | 8 |
|
8 | 9 | /** Implements a {@link SlotSupplier} based on resource usage for a particular slot type. */ |
9 | 10 | public class ResourceBasedSlotSupplier<SI extends SlotInfo> implements SlotSupplier<SI> { |
@@ -190,35 +191,62 @@ public SlotSupplierFuture reserveSlot(SlotReserveContext<SI> ctx) throws Excepti |
190 | 191 | } |
191 | 192 |
|
192 | 193 | private SlotSupplierFuture scheduleSlotAcquisition(SlotReserveContext<SI> ctx) { |
| 194 | + CompletableFuture<SlotPermit> resultFuture = new CompletableFuture<>(); |
| 195 | + AtomicReference<ScheduledFuture<?>> taskRef = new AtomicReference<>(); |
| 196 | + |
| 197 | + Runnable pollingTask = |
| 198 | + new Runnable() { |
| 199 | + @Override |
| 200 | + public void run() { |
| 201 | + if (resultFuture.isDone()) { |
| 202 | + return; // Already completed or cancelled |
| 203 | + } |
| 204 | + |
| 205 | + try { |
| 206 | + Optional<SlotPermit> permit = tryReserveSlot(ctx); |
| 207 | + if (permit.isPresent()) { |
| 208 | + resultFuture.complete(permit.get()); |
| 209 | + } else { |
| 210 | + // Calculate delay respecting ramp throttle on each retry |
| 211 | + Duration mustWaitFor; |
| 212 | + try { |
| 213 | + mustWaitFor = options.getRampThrottle().minus(timeSinceLastSlotIssued()); |
| 214 | + } catch (ArithmeticException e) { |
| 215 | + mustWaitFor = Duration.ZERO; |
| 216 | + } |
| 217 | + |
| 218 | + // Use at least 10ms to avoid tight spinning, but respect ramp throttle if longer |
| 219 | + long delayMs = Math.max(10, mustWaitFor.toMillis()); |
| 220 | + taskRef.set(scheduler.schedule(this, delayMs, TimeUnit.MILLISECONDS)); |
| 221 | + } |
| 222 | + } catch (Exception e) { |
| 223 | + resultFuture.completeExceptionally(e); |
| 224 | + } |
| 225 | + } |
| 226 | + }; |
| 227 | + |
| 228 | + // Calculate initial delay based on ramp throttle |
193 | 229 | Duration mustWaitFor; |
194 | 230 | try { |
195 | 231 | mustWaitFor = options.getRampThrottle().minus(timeSinceLastSlotIssued()); |
196 | 232 | } catch (ArithmeticException e) { |
197 | 233 | mustWaitFor = Duration.ZERO; |
198 | 234 | } |
199 | 235 |
|
200 | | - CompletableFuture<Void> permitFuture; |
201 | | - if (mustWaitFor.compareTo(Duration.ZERO) > 0) { |
202 | | - permitFuture = |
203 | | - CompletableFuture.supplyAsync(() -> null, delayedExecutor(mustWaitFor.toMillis())); |
204 | | - } else { |
205 | | - permitFuture = CompletableFuture.completedFuture(null); |
206 | | - } |
| 236 | + long initialDelayMs = Math.max(0, mustWaitFor.toMillis()); |
| 237 | + |
| 238 | + // Schedule the initial attempt |
| 239 | + taskRef.set(scheduler.schedule(pollingTask, initialDelayMs, TimeUnit.MILLISECONDS)); |
207 | 240 |
|
208 | | - // After the delay, try to reserve the slot |
209 | 241 | return SlotSupplierFuture.fromCompletableFuture( |
210 | | - permitFuture.thenCompose( |
211 | | - ignored -> { |
212 | | - Optional<SlotPermit> permit = tryReserveSlot(ctx); |
213 | | - // If we couldn't get a slot this time, delay for a short period and try again |
214 | | - return permit |
215 | | - .map(CompletableFuture::completedFuture) |
216 | | - .orElseGet( |
217 | | - () -> |
218 | | - CompletableFuture.supplyAsync(() -> null, delayedExecutor(10)) |
219 | | - .thenCompose(ig -> scheduleSlotAcquisition(ctx))); |
220 | | - }), |
221 | | - () -> permitFuture.cancel(true)); |
| 242 | + resultFuture, |
| 243 | + () -> { |
| 244 | + // Cancel the scheduled task when aborting |
| 245 | + ScheduledFuture<?> task = taskRef.get(); |
| 246 | + if (task != null) { |
| 247 | + task.cancel(true); |
| 248 | + } |
| 249 | + }); |
222 | 250 | } |
223 | 251 |
|
224 | 252 | @Override |
@@ -248,11 +276,6 @@ private Duration timeSinceLastSlotIssued() { |
248 | 276 | return Duration.between(lastSlotIssuedAt, Instant.now()); |
249 | 277 | } |
250 | 278 |
|
251 | | - // Polyfill for Java 9 delayedExecutor |
252 | | - private Executor delayedExecutor(long delay) { |
253 | | - return r -> scheduler.schedule(() -> scheduler.execute(r), delay, TimeUnit.MILLISECONDS); |
254 | | - } |
255 | | - |
256 | 279 | private static ScheduledExecutorService getDefaultScheduler() { |
257 | 280 | synchronized (ResourceBasedSlotSupplier.class) { |
258 | 281 | if (defaultScheduler == null) { |
|
0 commit comments