Skip to content

Commit 6ddad43

Browse files
authored
RANGER-5476: fix potential deadlock in PolicyRefresher.stopRefresher() while retrying HTTP request (#832)
1 parent 72faddd commit 6ddad43

2 files changed

Lines changed: 207 additions & 0 deletions

File tree

agents-common/src/main/java/org/apache/ranger/plugin/util/RangerRESTClient.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -667,6 +667,8 @@ protected boolean shouldRetry(String currentUrl, int index, int retryAttemptCoun
667667
Thread.sleep(retryIntervalMs);
668668
} catch (InterruptedException excp) {
669669
LOG.error("Failed while waiting to retry", excp);
670+
Thread.currentThread().interrupt();
671+
return false;
670672
}
671673
} else if (isLastUrl) {
672674
LOG.error("Failed to communicate with all Ranger Admin's URL's : [ {} ]", configuredURLs);
Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.ranger.plugin.policyengine;
21+
22+
import com.sun.net.httpserver.HttpServer;
23+
import org.apache.hadoop.conf.Configuration;
24+
import org.apache.ranger.plugin.util.RangerRESTClient;
25+
import org.junit.jupiter.api.AfterEach;
26+
import org.junit.jupiter.api.Test;
27+
import org.junit.jupiter.api.Timeout;
28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
30+
31+
import java.net.InetSocketAddress;
32+
import java.util.concurrent.CountDownLatch;
33+
import java.util.concurrent.TimeUnit;
34+
import java.util.concurrent.atomic.AtomicBoolean;
35+
36+
import static org.junit.jupiter.api.Assertions.assertFalse;
37+
import static org.junit.jupiter.api.Assertions.assertTrue;
38+
import static org.junit.jupiter.api.Assertions.fail;
39+
40+
public class TestRangerRESTClientDeadlock {
41+
private static final Logger LOG = LoggerFactory.getLogger(TestRangerRESTClientDeadlock.class);
42+
private HttpServer httpServer;
43+
private RangerRESTClient restClient;
44+
45+
private void setupServerAndClient(CountDownLatch serverLatch) throws Exception {
46+
httpServer = HttpServer.create(new InetSocketAddress(0), 0);
47+
httpServer.createContext("/", exchange -> {
48+
LOG.info("Server: Received request, returning 503...");
49+
serverLatch.countDown(); // Signal the test thread
50+
exchange.sendResponseHeaders(503, -1);
51+
exchange.close();
52+
});
53+
httpServer.start();
54+
String serverUrl = "http://localhost:" + httpServer.getAddress().getPort();
55+
Configuration conf = new Configuration();
56+
restClient = new RangerRESTClient(serverUrl, null, conf);
57+
restClient.setMaxRetryAttempts(10);
58+
restClient.setRetryIntervalMs(1000);
59+
}
60+
61+
@AfterEach
62+
public void tearDown() {
63+
if (httpServer != null) {
64+
httpServer.stop(0);
65+
}
66+
}
67+
68+
@Test
69+
@Timeout(value = 30, unit = TimeUnit.SECONDS)
70+
public void testDeadlockWhenStoppingDuringRetry() throws Exception {
71+
CountDownLatch serverReceivedRequest = new CountDownLatch(1);
72+
CountDownLatch retryStarted = new CountDownLatch(1);
73+
CountDownLatch joinCompleted = new CountDownLatch(1);
74+
AtomicBoolean deadlockDetected = new AtomicBoolean(false);
75+
76+
setupServerAndClient(serverReceivedRequest);
77+
Thread refresherThread = getRefresherThread(retryStarted);
78+
79+
assertTrue(retryStarted.await(5, TimeUnit.SECONDS), "Retry should have started");
80+
assertTrue(serverReceivedRequest.await(10, TimeUnit.SECONDS), "Server should have received a request");
81+
82+
refresherThread.interrupt();
83+
84+
Thread joinThread = getJoinThread(refresherThread, joinCompleted);
85+
// Wait for join to complete (with timeout to detect deadlock)
86+
boolean joined = joinCompleted.await(15, TimeUnit.SECONDS);
87+
88+
if (!joined) {
89+
deadlockDetected.set(true);
90+
LOG.error("DEADLOCK DETECTED: join() did not complete within 15 seconds!");
91+
LOG.error("This confirms that RangerRESTClient ignores InterruptedException during retry.");
92+
joinThread.interrupt();
93+
refresherThread.interrupt();
94+
fail("Deadlock detected: Thread.join() did not complete within timeout. " +
95+
"This reproduces the issue where RangerRESTClient ignores InterruptedException " +
96+
"during retry, causing PolicyRefresher.stopRefresher() to hang indefinitely.");
97+
}
98+
assertFalse(deadlockDetected.get(), "No deadlock should occur.");
99+
LOG.info("<== Test completed - no deadlock occurred.");
100+
}
101+
102+
private static Thread getJoinThread(Thread refresherThread, CountDownLatch joinCompleted) {
103+
Thread joinThread = new Thread(() -> {
104+
try {
105+
long startTime = System.currentTimeMillis();
106+
refresherThread.join();
107+
long duration = System.currentTimeMillis() - startTime;
108+
LOG.info("MainThread: join() completed in {} ms", duration);
109+
joinCompleted.countDown();
110+
} catch (InterruptedException e) {
111+
LOG.error("MainThread: join() was interrupted", e);
112+
}
113+
});
114+
joinThread.start();
115+
return joinThread;
116+
}
117+
118+
private Thread getRefresherThread(CountDownLatch retryStarted) {
119+
Thread refresherThread = new Thread(() -> {
120+
try {
121+
LOG.info("RefresherThread: Starting GET request (will trigger retries)...");
122+
retryStarted.countDown();
123+
restClient.get("/service/policies/download", null);
124+
LOG.info("RefresherThread: GET request completed (unexpected)");
125+
} catch (Exception e) {
126+
LOG.info("RefresherThread: Exited with exception: {}", e.getMessage());
127+
}
128+
}, "RefresherThread");
129+
refresherThread.start();
130+
return refresherThread;
131+
}
132+
133+
@Test
134+
@Timeout(value = 30, unit = TimeUnit.SECONDS)
135+
public void testRetryLogicWorksNormally() throws Exception {
136+
LOG.info("==> Starting test to verify normal retry behavior");
137+
CountDownLatch serverReceivedRequest = new CountDownLatch(1);
138+
setupServerAndClient(serverReceivedRequest);
139+
restClient.setMaxRetryAttempts(3);
140+
restClient.setRetryIntervalMs(1000);
141+
long startTime = System.currentTimeMillis();
142+
try {
143+
restClient.get("/service/policies/download", null);
144+
fail("Expected exception due to 503 errors");
145+
} catch (Exception e) {
146+
long duration = System.currentTimeMillis() - startTime;
147+
LOG.info("Request failed after {}ms (expected): {}", duration, e.getMessage());
148+
assertTrue(duration >= 3000, "Should have retried with delays");
149+
}
150+
LOG.info("<== Test passed - retry logic works normally");
151+
}
152+
153+
@Test
154+
@Timeout(value = 30, unit = TimeUnit.SECONDS)
155+
public void testInterruptWorksWithMultipleURLs() throws Exception {
156+
LOG.info("==> Starting test with multiple URLs");
157+
CountDownLatch serverReceivedRequest = new CountDownLatch(1);
158+
setupServerAndClient(serverReceivedRequest);
159+
// Create a second server
160+
HttpServer httpServer2 = HttpServer.create(new InetSocketAddress(0), 0);
161+
httpServer2.createContext("/", exchange -> {
162+
LOG.info("Server2: Received request, returning 503...");
163+
exchange.sendResponseHeaders(503, -1);
164+
exchange.close();
165+
});
166+
httpServer2.start();
167+
168+
try {
169+
String serverUrl1 = "http://localhost:" + httpServer.getAddress().getPort();
170+
String serverUrl2 = "http://localhost:" + httpServer2.getAddress().getPort();
171+
String multiUrl = serverUrl1 + "," + serverUrl2;
172+
173+
Configuration conf = new Configuration();
174+
RangerRESTClient multiUrlClient = new RangerRESTClient(multiUrl, null, conf);
175+
multiUrlClient.setMaxRetryAttempts(5);
176+
multiUrlClient.setRetryIntervalMs(1000);
177+
178+
CountDownLatch started = new CountDownLatch(1);
179+
Thread testThread = new Thread(() -> {
180+
try {
181+
started.countDown();
182+
multiUrlClient.get("/test", null);
183+
} catch (Exception e) {
184+
LOG.info("Multi-URL thread exited: {}", e.getMessage());
185+
}
186+
});
187+
188+
testThread.start();
189+
assertTrue(started.await(5, TimeUnit.SECONDS));
190+
assertTrue(serverReceivedRequest.await(10, TimeUnit.SECONDS), "Server should have received a request");
191+
192+
long startTime = System.currentTimeMillis();
193+
testThread.interrupt();
194+
testThread.join(5000);
195+
long duration = System.currentTimeMillis() - startTime;
196+
197+
assertFalse(testThread.isAlive(), "Thread should exit even with multiple URLs");
198+
assertTrue(duration < 5000, "Should exit quickly: " + duration + "ms");
199+
200+
LOG.info("<== Test passed - interrupt works with multiple URLs");
201+
} finally {
202+
httpServer2.stop(0);
203+
}
204+
}
205+
}

0 commit comments

Comments
 (0)