Skip to content

Commit fcd8595

Browse files
committed
readded executor queue limit for submit()
submit() now tries to purge if it reaches the limit
1 parent b6c0ce1 commit fcd8595

File tree

2 files changed

+150
-9
lines changed

2 files changed

+150
-9
lines changed

sentry/src/main/java/io/sentry/SentryExecutorService.java

Lines changed: 60 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import io.sentry.util.AutoClosableReentrantLock;
44
import java.util.concurrent.Callable;
5+
import java.util.concurrent.CancellationException;
56
import java.util.concurrent.Future;
67
import java.util.concurrent.RejectedExecutionException;
78
import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -22,6 +23,12 @@ public final class SentryExecutorService implements ISentryExecutorService {
2223
*/
2324
private static final int INITIAL_QUEUE_SIZE = 40;
2425

26+
/**
27+
* By default, the work queue is unbounded so it can grow as much as the memory allows. We want to
28+
* limit it by 271 which would be x8 times growth from the default initial capacity.
29+
*/
30+
private static final int MAX_QUEUE_SIZE = 271;
31+
2532
private final @NotNull ScheduledThreadPoolExecutor executorService;
2633
private final @NotNull AutoClosableReentrantLock lock = new AutoClosableReentrantLock();
2734

@@ -46,16 +53,41 @@ public SentryExecutorService() {
4653
this(new ScheduledThreadPoolExecutor(1, new SentryExecutorServiceThreadFactory()), null);
4754
}
4855

56+
private boolean isQueueAvailable() {
57+
// If limit is reached, purge cancelled tasks from the queue
58+
if (executorService.getQueue().size() >= MAX_QUEUE_SIZE) {
59+
executorService.purge();
60+
}
61+
// Check limit again after purge
62+
return executorService.getQueue().size() < MAX_QUEUE_SIZE;
63+
}
64+
4965
@Override
5066
public @NotNull Future<?> submit(final @NotNull Runnable runnable)
5167
throws RejectedExecutionException {
52-
return executorService.submit(runnable);
68+
if (isQueueAvailable()) {
69+
return executorService.submit(runnable);
70+
}
71+
if (options != null) {
72+
options
73+
.getLogger()
74+
.log(SentryLevel.WARNING, "Task " + runnable + " rejected from " + executorService);
75+
}
76+
return new CancelledFuture<>();
5377
}
5478

5579
@Override
5680
public @NotNull <T> Future<T> submit(final @NotNull Callable<T> callable)
5781
throws RejectedExecutionException {
58-
return executorService.submit(callable);
82+
if (isQueueAvailable()) {
83+
return executorService.submit(callable);
84+
}
85+
if (options != null) {
86+
options
87+
.getLogger()
88+
.log(SentryLevel.WARNING, "Task " + callable + " rejected from " + executorService);
89+
}
90+
return new CancelledFuture<>();
5991
}
6092

6193
@Override
@@ -127,4 +159,30 @@ private static final class SentryExecutorServiceThreadFactory implements ThreadF
127159
return ret;
128160
}
129161
}
162+
private static final class CancelledFuture<T> implements Future<T> {
163+
@Override
164+
public boolean cancel(final boolean mayInterruptIfRunning) {
165+
return true;
166+
}
167+
168+
@Override
169+
public boolean isCancelled() {
170+
return true;
171+
}
172+
173+
@Override
174+
public boolean isDone() {
175+
return true;
176+
}
177+
178+
@Override
179+
public T get() {
180+
throw new CancellationException();
181+
}
182+
183+
@Override
184+
public T get(final long timeout, final @NotNull TimeUnit unit) {
185+
throw new CancellationException();
186+
}
187+
}
130188
}

sentry/src/test/java/io/sentry/SentryExecutorServiceTest.kt

Lines changed: 90 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,13 @@ import org.mockito.kotlin.any
1414
import org.mockito.kotlin.doReturn
1515
import org.mockito.kotlin.mock
1616
import org.mockito.kotlin.never
17+
import org.mockito.kotlin.spy
1718
import org.mockito.kotlin.verify
1819
import org.mockito.kotlin.whenever
20+
import java.util.concurrent.BlockingQueue
21+
import java.util.concurrent.Callable
22+
import java.util.concurrent.CancellationException
23+
import kotlin.test.assertFailsWith
1924

2025
class SentryExecutorServiceTest {
2126
@Test
@@ -105,6 +110,86 @@ class SentryExecutorServiceTest {
105110
whenever(executor.isShutdown).thenReturn(false)
106111
assertFalse(sentryExecutor.isClosed)
107112
}
113+
@Test
114+
fun `SentryExecutorService submit runnable returns cancelled future when queue size exceeds limit`() {
115+
val queue = mock<BlockingQueue<Runnable>>()
116+
whenever(queue.size).thenReturn(272) // Above MAX_QUEUE_SIZE (271)
117+
118+
val executor = mock<ScheduledThreadPoolExecutor> { on { getQueue() } doReturn queue }
119+
120+
val options = mock<SentryOptions>()
121+
val logger = mock<ILogger>()
122+
whenever(options.logger).thenReturn(logger)
123+
124+
val sentryExecutor = SentryExecutorService(executor, options)
125+
val future = sentryExecutor.submit {}
126+
127+
assertTrue(future.isCancelled)
128+
assertTrue(future.isDone)
129+
assertFailsWith<CancellationException> { future.get() }
130+
verify(executor, never()).submit(any<Runnable>())
131+
verify(logger).log(any<SentryLevel>(), any<String>())
132+
}
133+
134+
@Test
135+
fun `SentryExecutorService submit runnable accepts when queue size is within limit`() {
136+
val queue = mock<BlockingQueue<Runnable>>()
137+
whenever(queue.size).thenReturn(270) // Below MAX_QUEUE_SIZE (271)
138+
139+
val executor = mock<ScheduledThreadPoolExecutor> { on { getQueue() } doReturn queue }
140+
141+
val sentryExecutor = SentryExecutorService(executor, null)
142+
sentryExecutor.submit {}
143+
144+
verify(executor).submit(any<Runnable>())
145+
}
146+
147+
@Test
148+
fun `SentryExecutorService submit callable returns cancelled future when queue size exceeds limit`() {
149+
val queue = mock<BlockingQueue<Runnable>>()
150+
whenever(queue.size).thenReturn(272) // Above MAX_QUEUE_SIZE (271)
151+
152+
val executor = mock<ScheduledThreadPoolExecutor> { on { getQueue() } doReturn queue }
153+
154+
val options = mock<SentryOptions>()
155+
val logger = mock<ILogger>()
156+
whenever(options.logger).thenReturn(logger)
157+
158+
val sentryExecutor = SentryExecutorService(executor, options)
159+
val future = sentryExecutor.submit(Callable { "result" })
160+
161+
assertTrue(future.isCancelled)
162+
assertTrue(future.isDone)
163+
assertFailsWith<CancellationException> { future.get() }
164+
verify(executor, never()).submit(any<Callable<String>>())
165+
verify(logger).log(any<SentryLevel>(), any<String>())
166+
}
167+
168+
@Test
169+
fun `SentryExecutorService submit callable accepts when queue size is within limit`() {
170+
val queue = mock<BlockingQueue<Runnable>>()
171+
whenever(queue.size).thenReturn(270) // Below MAX_QUEUE_SIZE (271)
172+
173+
val executor = mock<ScheduledThreadPoolExecutor> { on { getQueue() } doReturn queue }
174+
175+
val sentryExecutor = SentryExecutorService(executor, null)
176+
sentryExecutor.submit(Callable { "result" })
177+
178+
verify(executor).submit(any<Callable<String>>())
179+
}
180+
181+
@Test
182+
fun `SentryExecutorService schedule accepts when queue size is within limit`() {
183+
val queue = mock<BlockingQueue<Runnable>>()
184+
whenever(queue.size).thenReturn(270) // Below MAX_QUEUE_SIZE (271)
185+
186+
val executor = mock<ScheduledThreadPoolExecutor> { on { getQueue() } doReturn queue }
187+
188+
val sentryExecutor = SentryExecutorService(executor, null)
189+
sentryExecutor.schedule({}, 1000L)
190+
191+
verify(executor).schedule(any<Runnable>(), any(), any())
192+
}
108193

109194
@Test
110195
fun `SentryExecutorService prewarm schedules dummy tasks and clears queue`() {
@@ -122,13 +207,11 @@ class SentryExecutorServiceTest {
122207
}
123208

124209
@Test
125-
fun `SentryExecutorService runs any number of job`() {
126-
val sentryExecutor = SentryExecutorService()
127-
var called = false
128-
// Post 1k jobs after 1 day, to test new jobs are accepted
210+
fun `SentryExecutorService schedules any number of job`() {
211+
val executor = ScheduledThreadPoolExecutor(1)
212+
val sentryExecutor = SentryExecutorService(executor, null)
213+
// Post 1k jobs after 1 day, to test they are all accepted
129214
repeat(1000) { sentryExecutor.schedule({}, TimeUnit.DAYS.toMillis(1)) }
130-
sentryExecutor.submit { called = true }
131-
await.until { called }
132-
assertTrue(called)
215+
assertEquals(1000, executor.queue.size)
133216
}
134217
}

0 commit comments

Comments
 (0)