Skip to content

Commit 22a1c1a

Browse files
tconley1428 and claude authored
Fix deep recursion in ResourceBasedSlotSupplier causing futures to never resolve (#2779)
* Fix deep recursion in ResourceBasedSlotSupplier causing futures to never resolve

  Replace the recursive CompletableFuture chaining in scheduleSlotAcquisition with an iterative polling approach using a ScheduledExecutorService. The previous implementation created deep future chains that could cause stack overflow issues and prevent futures from completing when resources became available.

  Changes:
  - Replace thenCompose() chaining with a scheduled polling task
  - Add proper cancellation handling with AtomicReference<ScheduledFuture<?>>
  - Maintain ramp throttle timing behavior
  - Add a minimum 10ms delay to prevent tight spinning
  - Preserve all existing API contracts and behavior

  🤖 Generated with [Claude Code](https://claude.ai/code)

  Co-Authored-By: Claude <noreply@anthropic.com>

* Add end to end test

* Remove unnecessary tests

---------

Co-authored-by: Claude <noreply@anthropic.com>
1 parent 08bfad1 commit 22a1c1a

File tree

2 files changed

+355
-20
lines changed

2 files changed

+355
-20
lines changed

temporal-sdk/src/main/java/io/temporal/worker/tuning/ResourceBasedSlotSupplier.java

Lines changed: 38 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import java.time.Instant;
55
import java.util.Optional;
66
import java.util.concurrent.*;
7+
import java.util.concurrent.atomic.AtomicReference;
78

89
/** Implements a {@link SlotSupplier} based on resource usage for a particular slot type. */
910
public class ResourceBasedSlotSupplier<SI extends SlotInfo> implements SlotSupplier<SI> {
@@ -190,35 +191,52 @@ public SlotSupplierFuture reserveSlot(SlotReserveContext<SI> ctx) throws Excepti
190191
}
191192

192193
private SlotSupplierFuture scheduleSlotAcquisition(SlotReserveContext<SI> ctx) {
194+
CompletableFuture<SlotPermit> resultFuture = new CompletableFuture<>();
195+
AtomicReference<ScheduledFuture<?>> taskRef = new AtomicReference<>();
196+
197+
Runnable pollingTask =
198+
new Runnable() {
199+
@Override
200+
public void run() {
201+
if (resultFuture.isDone()) {
202+
return; // Already completed or cancelled
203+
}
204+
205+
try {
206+
Optional<SlotPermit> permit = tryReserveSlot(ctx);
207+
if (permit.isPresent()) {
208+
resultFuture.complete(permit.get());
209+
} else {
210+
taskRef.set(scheduler.schedule(this, 10, TimeUnit.MILLISECONDS));
211+
}
212+
} catch (Exception e) {
213+
resultFuture.completeExceptionally(e);
214+
}
215+
}
216+
};
217+
218+
// Calculate initial delay based on ramp throttle
193219
Duration mustWaitFor;
194220
try {
195221
mustWaitFor = options.getRampThrottle().minus(timeSinceLastSlotIssued());
196222
} catch (ArithmeticException e) {
197223
mustWaitFor = Duration.ZERO;
198224
}
199225

200-
CompletableFuture<Void> permitFuture;
201-
if (mustWaitFor.compareTo(Duration.ZERO) > 0) {
202-
permitFuture =
203-
CompletableFuture.supplyAsync(() -> null, delayedExecutor(mustWaitFor.toMillis()));
204-
} else {
205-
permitFuture = CompletableFuture.completedFuture(null);
206-
}
226+
long initialDelayMs = Math.max(0, mustWaitFor.toMillis());
227+
228+
// Schedule the initial attempt
229+
taskRef.set(scheduler.schedule(pollingTask, initialDelayMs, TimeUnit.MILLISECONDS));
207230

208-
// After the delay, try to reserve the slot
209231
return SlotSupplierFuture.fromCompletableFuture(
210-
permitFuture.thenCompose(
211-
ignored -> {
212-
Optional<SlotPermit> permit = tryReserveSlot(ctx);
213-
// If we couldn't get a slot this time, delay for a short period and try again
214-
return permit
215-
.map(CompletableFuture::completedFuture)
216-
.orElseGet(
217-
() ->
218-
CompletableFuture.supplyAsync(() -> null, delayedExecutor(10))
219-
.thenCompose(ig -> scheduleSlotAcquisition(ctx)));
220-
}),
221-
() -> permitFuture.cancel(true));
232+
resultFuture,
233+
() -> {
234+
// Cancel the scheduled task when aborting
235+
ScheduledFuture<?> task = taskRef.get();
236+
if (task != null) {
237+
task.cancel(true);
238+
}
239+
});
222240
}
223241

224242
@Override
Lines changed: 317 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,317 @@
1+
package io.temporal.worker.tuning;
2+
3+
import static org.junit.Assert.*;
4+
5+
import com.uber.m3.tally.NoopScope;
6+
import io.temporal.activity.ActivityInterface;
7+
import io.temporal.activity.ActivityMethod;
8+
import io.temporal.activity.ActivityOptions;
9+
import io.temporal.client.WorkflowClient;
10+
import io.temporal.client.WorkflowClientOptions;
11+
import io.temporal.client.WorkflowOptions;
12+
import io.temporal.internal.worker.SlotReservationData;
13+
import io.temporal.internal.worker.TrackingSlotSupplier;
14+
import io.temporal.serviceclient.WorkflowServiceStubs;
15+
import io.temporal.serviceclient.WorkflowServiceStubsOptions;
16+
import io.temporal.testing.internal.ExternalServiceTestConfigurator;
17+
import io.temporal.testing.internal.SDKTestWorkflowRule;
18+
import io.temporal.worker.Worker;
19+
import io.temporal.worker.WorkerFactory;
20+
import io.temporal.worker.WorkerOptions;
21+
import io.temporal.workflow.Workflow;
22+
import io.temporal.workflow.WorkflowInterface;
23+
import io.temporal.workflow.WorkflowMethod;
24+
import java.time.Duration;
25+
import java.util.concurrent.*;
26+
import java.util.concurrent.atomic.AtomicBoolean;
27+
import java.util.concurrent.atomic.AtomicInteger;
28+
import org.junit.Assume;
29+
import org.junit.Test;
30+
31+
/**
32+
* Test to demonstrate the recursion issue in ResourceBasedSlotSupplier.scheduleSlotAcquisition()
33+
* where a long chain of futures is created when resources remain unavailable.
34+
*/
35+
public class ResourceBasedSlotSupplierNonRecursiveTest {
36+
37+
static class TestResourceController extends ResourceBasedController {
38+
private final AtomicBoolean allow;
39+
private final AtomicInteger pidDecisionCallCount;
40+
41+
public TestResourceController(AtomicBoolean allow) {
42+
super(
43+
ResourceBasedControllerOptions.newBuilder(0.5, 0.5)
44+
.setMemoryPGain(1)
45+
.setMemoryIGain(0)
46+
.setMemoryDGain(0)
47+
.setMemoryOutputThreshold(0)
48+
.setCpuPGain(1)
49+
.setCpuIGain(0)
50+
.setCpuDGain(0)
51+
.setCpuOutputThreshold(0)
52+
.build(),
53+
new JVMSystemResourceInfo());
54+
this.allow = allow;
55+
this.pidDecisionCallCount = new AtomicInteger(0);
56+
}
57+
58+
@Override
59+
boolean pidDecision() {
60+
pidDecisionCallCount.incrementAndGet();
61+
return allow.get();
62+
}
63+
64+
public int getPidDecisionCallCount() {
65+
return pidDecisionCallCount.get();
66+
}
67+
}
68+
69+
/**
70+
* This test demonstrates the severe bug: even when resources become available, the future may
71+
* never resolve due to the extremely deep recursive chain. This is the actual reported bug.
72+
*/
73+
@Test(timeout = 90000)
74+
public void testFutureNeverResolvesEvenAfterResourcesBecomeAvailable() throws Exception {
75+
AtomicBoolean allow = new AtomicBoolean(false);
76+
TestResourceController controller = new TestResourceController(allow);
77+
78+
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
79+
80+
ResourceBasedSlotSupplier<ActivitySlotInfo> supplier =
81+
ResourceBasedSlotSupplier.createForActivity(
82+
controller,
83+
ResourceBasedSlotOptions.newBuilder()
84+
.setMinimumSlots(0)
85+
.setMaximumSlots(10)
86+
.setRampThrottle(Duration.ZERO)
87+
.build(),
88+
scheduler);
89+
90+
TrackingSlotSupplier<ActivitySlotInfo> tracking =
91+
new TrackingSlotSupplier<>(supplier, new NoopScope());
92+
93+
// Reserve first slot
94+
SlotSupplierFuture firstFuture =
95+
tracking.reserveSlot(new SlotReservationData("tq", "id1", "bid1"));
96+
firstFuture.get(1, TimeUnit.SECONDS);
97+
98+
// Try to reserve second slot - will build up recursive chain
99+
SlotSupplierFuture secondFuture =
100+
tracking.reserveSlot(new SlotReservationData("tq", "id2", "bid2"));
101+
102+
// Build up a very deep chain - 30 seconds should create ~3000 recursive calls
103+
System.out.println("Building deep recursive chain for 30 seconds...");
104+
Thread.sleep(30000);
105+
106+
int recursiveAttempts = controller.getPidDecisionCallCount();
107+
System.out.println("Built chain with " + recursiveAttempts + " recursive attempts");
108+
109+
// Now ALLOW resources - this is the critical test
110+
System.out.println("Allowing resources NOW...");
111+
allow.set(true);
112+
113+
// Try to get the result - this should complete quickly, but might NEVER complete
114+
// if the chain is too deep
115+
try {
116+
System.out.println("Waiting for future to complete (30 second timeout)...");
117+
long startTime = System.currentTimeMillis();
118+
SlotPermit permit = secondFuture.get(30, TimeUnit.SECONDS);
119+
long duration = System.currentTimeMillis() - startTime;
120+
121+
if (permit != null) {
122+
System.out.println(
123+
"SUCCESS: Future completed in " + duration + "ms after allowing resources");
124+
} else {
125+
fail("Future completed but returned null permit");
126+
}
127+
} catch (TimeoutException e) {
128+
// This is the BUG - even though resources are now available, the future never completes
129+
System.out.println(
130+
"BUG REPRODUCED: Future did NOT complete even after 30 seconds of resources being"
131+
+ " available!");
132+
System.out.println(
133+
"The recursive chain of "
134+
+ recursiveAttempts
135+
+ " futures is too deep and never resolves");
136+
fail(
137+
"Future never completed even after resources became available - this is the reported"
138+
+ " bug!");
139+
} finally {
140+
SlotPermit ignored = secondFuture.abortReservation();
141+
scheduler.shutdownNow();
142+
}
143+
}
144+
145+
/** Workflow contract used by the end-to-end test: one method that takes and returns a string. */
@WorkflowInterface
public interface TestWorkflow {
  /**
   * Runs the workflow.
   *
   * @param input value handed to the workflow
   * @return the workflow's string result
   */
  @WorkflowMethod
  String execute(String input);
}
150+
151+
/** Activity contract used by the end-to-end test: one method that takes and returns a string. */
@ActivityInterface
public interface TestActivity {
  /**
   * Processes the given input.
   *
   * @param input value to process
   * @return the processed string
   */
  @ActivityMethod
  String process(String input);
}
156+
157+
public static class TestWorkflowImpl implements TestWorkflow {
158+
private final TestActivity activity =
159+
Workflow.newActivityStub(
160+
TestActivity.class,
161+
ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(30)).build());
162+
163+
@Override
164+
public String execute(String input) {
165+
// Execute just one activity to keep test timing reasonable
166+
return activity.process(input);
167+
}
168+
}
169+
170+
public static class TestActivityImpl implements TestActivity {
171+
@Override
172+
public String process(String input) {
173+
try {
174+
// Short activity duration to keep test fast
175+
Thread.sleep(100);
176+
} catch (InterruptedException e) {
177+
Thread.currentThread().interrupt();
178+
}
179+
return "Processed: " + input;
180+
}
181+
}
182+
183+
@Test
184+
public void testEndToEndWorkerWithResourceStarvationRecovery() throws Exception {
185+
Assume.assumeTrue(
186+
"This test requires a real Temporal server to reproduce the recursion bug",
187+
SDKTestWorkflowRule.useExternalService);
188+
189+
System.out.println(
190+
"=== End-to-End Worker Test with Resource Starvation Recovery (External Service) ===");
191+
192+
// Create connections to real Temporal server
193+
WorkflowServiceStubs service =
194+
WorkflowServiceStubs.newServiceStubs(
195+
WorkflowServiceStubsOptions.newBuilder()
196+
.setTarget(ExternalServiceTestConfigurator.getTemporalServiceAddress())
197+
.build());
198+
WorkflowClient client =
199+
WorkflowClient.newInstance(
200+
service, WorkflowClientOptions.newBuilder().setNamespace("default").build());
201+
WorkerFactory workerFactory = WorkerFactory.newInstance(client);
202+
203+
// Create our own resource controller that we can control
204+
AtomicBoolean allowResources = new AtomicBoolean(false);
205+
TestResourceController resourceController = new TestResourceController(allowResources);
206+
207+
// Create a custom WorkerTuner that uses our controlled resource controller
208+
WorkerTuner customTuner =
209+
new WorkerTuner() {
210+
@Override
211+
public SlotSupplier<WorkflowSlotInfo> getWorkflowTaskSlotSupplier() {
212+
return ResourceBasedSlotSupplier.createForWorkflow(
213+
resourceController,
214+
ResourceBasedSlotOptions.newBuilder()
215+
.setMinimumSlots(-1) // Use negative to bypass default fallback
216+
.setMaximumSlots(2) // Very restrictive limit to force queueing
217+
.setRampThrottle(Duration.ZERO) // No delay = maximum recursion depth
218+
.build());
219+
}
220+
221+
@Override
222+
public SlotSupplier<ActivitySlotInfo> getActivityTaskSlotSupplier() {
223+
return ResourceBasedSlotSupplier.createForActivity(
224+
resourceController,
225+
ResourceBasedSlotOptions.newBuilder()
226+
.setMinimumSlots(-1) // Use negative to bypass default fallback
227+
.setMaximumSlots(2) // Very restrictive limit to force queueing
228+
.setRampThrottle(Duration.ZERO) // No delay = maximum recursion depth
229+
.build());
230+
}
231+
232+
@Override
233+
public SlotSupplier<LocalActivitySlotInfo> getLocalActivitySlotSupplier() {
234+
return ResourceBasedSlotSupplier.createForLocalActivity(
235+
resourceController,
236+
ResourceBasedSlotOptions.newBuilder()
237+
.setMinimumSlots(-1) // Use negative to bypass default fallback
238+
.setMaximumSlots(2) // Very restrictive limit to force queueing
239+
.setRampThrottle(Duration.ZERO) // No delay = maximum recursion depth
240+
.build());
241+
}
242+
243+
@Override
244+
public SlotSupplier<NexusSlotInfo> getNexusSlotSupplier() {
245+
return ResourceBasedSlotSupplier.createForNexus(
246+
resourceController,
247+
ResourceBasedSlotOptions.newBuilder()
248+
.setMinimumSlots(-1) // Use negative to bypass default fallback
249+
.setMaximumSlots(2) // Very restrictive limit to force queueing
250+
.setRampThrottle(Duration.ZERO) // No delay = maximum recursion depth
251+
.build());
252+
}
253+
};
254+
255+
// Create worker with our custom tuner
256+
String taskQueue = "test-task-queue-" + System.currentTimeMillis();
257+
Worker worker =
258+
workerFactory.newWorker(
259+
taskQueue, WorkerOptions.newBuilder().setWorkerTuner(customTuner).build());
260+
261+
worker.registerWorkflowImplementationTypes(TestWorkflowImpl.class);
262+
worker.registerActivitiesImplementations(new TestActivityImpl());
263+
264+
workerFactory.start();
265+
266+
try {
267+
// Start multiple workflows to create enough concurrent slot pressure
268+
TestWorkflow workflow =
269+
client.newWorkflowStub(
270+
TestWorkflow.class,
271+
WorkflowOptions.newBuilder()
272+
.setTaskQueue(taskQueue)
273+
.setWorkflowId("test-workflow-" + System.currentTimeMillis())
274+
.build());
275+
276+
// Start workflow execution in background
277+
CompletableFuture<String> workflowResult =
278+
CompletableFuture.supplyAsync(
279+
() -> {
280+
try {
281+
return workflow.execute("test-input");
282+
} catch (Exception e) {
283+
throw new RuntimeException(e);
284+
}
285+
});
286+
287+
// Let the system build up resource starvation for much longer to create extreme deep
288+
// recursion
289+
// We need VERY long starvation to build recursive chains deep enough to cause the bug
290+
long starvationStart = System.currentTimeMillis();
291+
System.out.println(
292+
"Building resource starvation for 30 seconds to create deep recursion... Started at: "
293+
+ starvationStart);
294+
Thread.sleep(30000);
295+
296+
int pidCallsBefore = resourceController.getPidDecisionCallCount();
297+
System.out.println("PID decisions during starvation: " + pidCallsBefore);
298+
System.out.println("Allow resources flag during starvation: " + allowResources.get());
299+
300+
// The recursive bug should have built up thousands of nested futures by now
301+
302+
// Now allow resources - workflow should complete quickly
303+
long allowTime = System.currentTimeMillis();
304+
System.out.println(
305+
"Allowing resources NOW at " + allowTime + " - workflow should complete quickly...");
306+
allowResources.set(true);
307+
System.out.println("Allow resources flag after setting: " + allowResources.get());
308+
309+
assertEquals("Processed: test-input", workflowResult.get());
310+
311+
} finally {
312+
workerFactory.shutdownNow();
313+
workerFactory.awaitTermination(1, TimeUnit.MINUTES);
314+
service.shutdownNow();
315+
}
316+
}
317+
}

0 commit comments

Comments
 (0)