-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Add OOM pause mechanism to pause query threads before killing on critical heap #18163
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
f6ccfeb
5fe7c1a
677e123
f902705
ac5345e
0f63522
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,6 +18,7 @@ | |
| */ | ||
| package org.apache.pinot.core.accounting; | ||
|
|
||
| import com.google.common.annotations.VisibleForTesting; | ||
| import it.unimi.dsi.fastutil.longs.LongLongMutablePair; | ||
| import java.util.Comparator; | ||
| import java.util.HashMap; | ||
|
|
@@ -26,6 +27,8 @@ | |
| import java.util.Set; | ||
| import java.util.concurrent.ConcurrentHashMap; | ||
| import java.util.concurrent.atomic.AtomicReference; | ||
| import java.util.concurrent.locks.Condition; | ||
| import java.util.concurrent.locks.ReentrantLock; | ||
| import org.apache.pinot.common.metrics.AbstractMetrics; | ||
| import org.apache.pinot.common.metrics.BrokerGauge; | ||
| import org.apache.pinot.common.metrics.BrokerMeter; | ||
|
|
@@ -75,6 +78,21 @@ enum TriggeringLevel { | |
| private Map<String, QueryResourceTrackerImpl> _queryResourceUsages; | ||
| private TriggeringLevel _triggeringLevel = TriggeringLevel.Normal; | ||
|
|
||
| // Track previous triggering level to detect transitions into critical | ||
| private TriggeringLevel _previousTriggeringLevel = TriggeringLevel.Normal; | ||
|
|
||
| // OOM pause state - lock+condition for blocking query threads at critical heap level. | ||
| // _pauseLock / _pauseCondition are used to coordinate blocking query threads and signaling them from the watcher | ||
| // thread when the pause is cleared. | ||
| private final ReentrantLock _pauseLock = new ReentrantLock(); | ||
| private final Condition _pauseCondition = _pauseLock.newCondition(); | ||
| // Shared between the watcher thread (writer) and query threads (readers). Volatile enables the fast-path in | ||
| // waitIfPaused() to avoid acquiring the lock when no pause is active. | ||
| private volatile boolean _pauseActive = false; | ||
| // Local state for the watcher thread only (similar to _triggeringLevel) - read/written only from preAggregate and | ||
| // postAggregate, never by query threads. | ||
| private long _pauseDeadlineMs = 0; | ||
|
|
||
| // metrics class | ||
| private final AbstractMetrics _metrics; | ||
| private final AbstractMetrics.Meter _queryKilledMeter; | ||
|
|
@@ -133,13 +151,33 @@ public void preAggregate(Iterable<ThreadResourceTrackerImpl> threadTrackers) { | |
| QueryMonitorConfig config = _queryMonitorConfig.get(); | ||
| _sleepTime = config.getNormalSleepTime(); | ||
| _queryResourceUsages = null; | ||
| _previousTriggeringLevel = _triggeringLevel; | ||
| collectTriggerMetrics(); | ||
| evalTriggers(); | ||
| // Prioritize the panic check, kill ALL QUERIES immediately if triggered | ||
| // Prioritize the panic check | ||
| if (_triggeringLevel == TriggeringLevel.HeapMemoryPanic) { | ||
| killAllQueries(threadTrackers); | ||
| boolean pauseOnPanic = config.isOomPreQueryKillPauseEnabled() && config.isOomPanicPreQueryKillPauseEnabled(); | ||
| if (_pauseActive) { | ||
| if (System.currentTimeMillis() >= _pauseDeadlineMs || !pauseOnPanic) { | ||
| // Grace period expired, or panic-pause disabled — kill all and clear | ||
| killAllQueries(threadTrackers); | ||
| clearPause(); | ||
| } | ||
| // else: still in grace period and panic-pause is enabled | ||
| } else if (pauseOnPanic | ||
| && _previousTriggeringLevel.ordinal() < TriggeringLevel.HeapMemoryCritical.ordinal()) { | ||
| // Fresh jump to panic bypassing critical — activate pause before killing | ||
| activatePause(config.getOomPreQueryKillPauseDurationMs()); | ||
| } else { | ||
| // Pause-on-panic disabled, or already been through a pause cycle — kill immediately | ||
| killAllQueries(threadTrackers); | ||
| } | ||
| return; | ||
| } | ||
| // If heap recovered below critical while an OOM pause is active, clear it | ||
| if (_pauseActive && _triggeringLevel.ordinal() < TriggeringLevel.HeapMemoryCritical.ordinal()) { | ||
| clearPause(); | ||
| } | ||
| // Track aggregated query resource usage if triggered | ||
| if (_triggeringLevel.ordinal() > TriggeringLevel.Normal.ordinal()) { | ||
| _queryResourceUsages = new HashMap<>(); | ||
|
|
@@ -179,7 +217,22 @@ public void postAggregate() { | |
| } | ||
| switch (_triggeringLevel) { | ||
| case HeapMemoryCritical: | ||
| killMostExpensiveQuery(); | ||
| QueryMonitorConfig config = _queryMonitorConfig.get(); | ||
| if (_pauseActive) { | ||
| // Pause is active - check if grace period expired | ||
| if (System.currentTimeMillis() >= _pauseDeadlineMs) { | ||
| killMostExpensiveQuery(); | ||
|
yashmayya marked this conversation as resolved.
|
||
| clearPause(); | ||
| } | ||
| // else: still in grace period, skip kill this cycle | ||
| } else if (config.isOomPreQueryKillPauseEnabled() | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This only pauses when the previous cycle was below critical. If the config is enabled while the heap is already critical/panic, the watcher still takes the immediate-kill path until usage drops below critical and rises again, so the advertised dynamic toggle does not take effect during an active incident. Please make the pause arm based on the current config/state, not only on the transition edge.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think this extreme edge case is worth introducing additional complexity for IMO, it's easier to reason about this way. |
||
| && _previousTriggeringLevel.ordinal() < TriggeringLevel.HeapMemoryCritical.ordinal()) { | ||
| // Just transitioned to critical - activate pause instead of killing | ||
| activatePause(config.getOomPreQueryKillPauseDurationMs()); | ||
| } else { | ||
| // Pause disabled, or already been through a pause cycle - kill | ||
| killMostExpensiveQuery(); | ||
| } | ||
| break; | ||
| case CPUTimeBasedKilling: | ||
| killCPUTimeExceedQueries(); | ||
|
|
@@ -198,8 +251,6 @@ private void collectTriggerMetrics() { | |
| } | ||
|
|
||
| private void evalTriggers() { | ||
| TriggeringLevel previousTriggeringLevel = _triggeringLevel; | ||
|
|
||
| // Compute the new triggering level based on the current heap usage | ||
| QueryMonitorConfig config = _queryMonitorConfig.get(); | ||
| if (_usedBytes > config.getPanicLevel()) { | ||
|
|
@@ -234,7 +285,7 @@ private void evalTriggers() { | |
| } | ||
|
|
||
| // Log the triggering level change | ||
| if (previousTriggeringLevel != _triggeringLevel) { | ||
| if (_previousTriggeringLevel != _triggeringLevel) { | ||
| switch (_triggeringLevel) { | ||
| case HeapMemoryPanic: | ||
| LOGGER.error("Heap used bytes: {} exceeds panic level: {}, killing all queries", _usedBytes, | ||
|
|
@@ -266,6 +317,59 @@ private void evalTriggers() { | |
| } | ||
| } | ||
|
|
||
| /** | ||
| * Activates an OOM pause. Query threads calling {@link #waitIfPaused()} will block until the pause is cleared or the | ||
| * deadline expires. Also hints the JVM to run garbage collection. | ||
| */ | ||
| private void activatePause(long timeoutMs) { | ||
| LOGGER.warn("Activating OOM pause for {}ms to allow garbage collection before killing queries. " | ||
| + "Heap used bytes: {}", timeoutMs, _usedBytes); | ||
| _pauseDeadlineMs = System.currentTimeMillis() + timeoutMs; | ||
| _pauseActive = true; | ||
| // Hint the JVM to run GC while query threads are pausing | ||
| System.gc(); | ||
| } | ||
|
|
||
| /** | ||
| * Clears the OOM pause and wakes all blocked query threads. | ||
| */ | ||
| @VisibleForTesting | ||
| void clearPause() { | ||
|
yashmayya marked this conversation as resolved.
|
||
| LOGGER.info("Clearing OOM pause. Heap used bytes: {}", _usedBytes); | ||
| _pauseLock.lock(); | ||
| try { | ||
| _pauseActive = false; | ||
| _pauseCondition.signalAll(); | ||
| } finally { | ||
| _pauseLock.unlock(); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Called by query threads at sampling checkpoints. Blocks if an OOM pause is active. Fast-path: single volatile read | ||
| * when no pause is active. | ||
| * @return {@code true} if the thread entered the slow path (i.e. a pause was active when called), {@code false} if | ||
| * the fast-path was taken and the thread did not block. | ||
| */ | ||
| @VisibleForTesting | ||
| boolean waitIfPaused() { | ||
| if (!_pauseActive) { | ||
| return false; | ||
| } | ||
| _pauseLock.lock(); | ||
| try { | ||
| while (_pauseActive) { | ||
| _pauseCondition.await(); | ||
| } | ||
| } catch (InterruptedException e) { | ||
| // Query was killed during pause - restore interrupt flag so the next checkTermination() call detects it | ||
| Thread.currentThread().interrupt(); | ||
| } finally { | ||
| _pauseLock.unlock(); | ||
| } | ||
| return true; | ||
| } | ||
|
|
||
| private void killAllQueries(Iterable<ThreadResourceTrackerImpl> threadTrackers) { | ||
| QueryMonitorConfig config = _queryMonitorConfig.get(); | ||
| if (!config.isOomKillQueryEnabled()) { | ||
|
|
@@ -394,6 +498,21 @@ private void cleanInactive() { | |
| _inactiveQueries.addAll(_untrackedCpuMemUsage.keySet()); | ||
| } | ||
|
|
||
| @VisibleForTesting | ||
| boolean isPauseActive() { | ||
| return _pauseActive; | ||
| } | ||
|
|
||
| @VisibleForTesting | ||
| TriggeringLevel getTriggeringLevel() { | ||
| return _triggeringLevel; | ||
| } | ||
|
|
||
| @VisibleForTesting | ||
| TriggeringLevel getPreviousTriggeringLevel() { | ||
| return _previousTriggeringLevel; | ||
| } | ||
|
|
||
| public long getHeapUsageBytes() { | ||
| return _usedBytes; | ||
| } | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.