Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ public class QueryMonitorConfig {

private final boolean _queryKilledMetricEnabled;

// how long to pause query threads (ms) before proceeding with kill; non-positive means disabled
private final long _oomPreQueryKillPauseDurationMs;

// whether to also apply the OOM pause before killing at panic level
private final boolean _oomPanicPreQueryKillPauseEnabled;

private final int _workloadSleepTimeMs;

private final boolean _workloadCostEnforcementEnabled;
Expand Down Expand Up @@ -111,6 +117,14 @@ public QueryMonitorConfig(PinotConfiguration config, long maxHeapSize) {
_queryKilledMetricEnabled = config.getProperty(CommonConstants.Accounting.CONFIG_OF_QUERY_KILLED_METRIC_ENABLED,
CommonConstants.Accounting.DEFAULT_QUERY_KILLED_METRIC_ENABLED);

_oomPreQueryKillPauseDurationMs = config.getProperty(
CommonConstants.Accounting.CONFIG_OF_OOM_PRE_QUERY_KILL_PAUSE_DURATION_MS,
CommonConstants.Accounting.DEFAULT_OOM_PRE_QUERY_KILL_PAUSE_DURATION_MS);

_oomPanicPreQueryKillPauseEnabled = config.getProperty(
CommonConstants.Accounting.CONFIG_OF_OOM_PANIC_ALLOW_PRE_QUERY_KILL_PAUSE,
CommonConstants.Accounting.DEFAULT_OOM_PANIC_PRE_QUERY_KILL_PAUSE_ENABLED);

_workloadSleepTimeMs = config.getProperty(CommonConstants.Accounting.CONFIG_OF_WORKLOAD_SLEEP_TIME_MS,
CommonConstants.Accounting.DEFAULT_WORKLOAD_SLEEP_TIME_MS);

Expand Down Expand Up @@ -257,6 +271,30 @@ public QueryMonitorConfig(PinotConfiguration config, long maxHeapSize) {
_queryKilledMetricEnabled = oldConfig._queryKilledMetricEnabled;
}

if (changedConfigs.contains(CommonConstants.Accounting.CONFIG_OF_OOM_PRE_QUERY_KILL_PAUSE_DURATION_MS)) {
if (clusterConfigs == null || !clusterConfigs.containsKey(
CommonConstants.Accounting.CONFIG_OF_OOM_PRE_QUERY_KILL_PAUSE_DURATION_MS)) {
_oomPreQueryKillPauseDurationMs = CommonConstants.Accounting.DEFAULT_OOM_PRE_QUERY_KILL_PAUSE_DURATION_MS;
} else {
_oomPreQueryKillPauseDurationMs = Long.parseLong(
clusterConfigs.get(CommonConstants.Accounting.CONFIG_OF_OOM_PRE_QUERY_KILL_PAUSE_DURATION_MS));
}
} else {
_oomPreQueryKillPauseDurationMs = oldConfig._oomPreQueryKillPauseDurationMs;
}

if (changedConfigs.contains(CommonConstants.Accounting.CONFIG_OF_OOM_PANIC_ALLOW_PRE_QUERY_KILL_PAUSE)) {
if (clusterConfigs == null || !clusterConfigs.containsKey(
CommonConstants.Accounting.CONFIG_OF_OOM_PANIC_ALLOW_PRE_QUERY_KILL_PAUSE)) {
_oomPanicPreQueryKillPauseEnabled = CommonConstants.Accounting.DEFAULT_OOM_PANIC_PRE_QUERY_KILL_PAUSE_ENABLED;
} else {
_oomPanicPreQueryKillPauseEnabled = Boolean.parseBoolean(
clusterConfigs.get(CommonConstants.Accounting.CONFIG_OF_OOM_PANIC_ALLOW_PRE_QUERY_KILL_PAUSE));
}
} else {
_oomPanicPreQueryKillPauseEnabled = oldConfig._oomPanicPreQueryKillPauseEnabled;
}

if (changedConfigs.contains(CommonConstants.Accounting.CONFIG_OF_WORKLOAD_SLEEP_TIME_MS)) {
if (clusterConfigs == null || !clusterConfigs.containsKey(
CommonConstants.Accounting.CONFIG_OF_WORKLOAD_SLEEP_TIME_MS)) {
Expand Down Expand Up @@ -330,6 +368,18 @@ public boolean isQueryKilledMetricEnabled() {
return _queryKilledMetricEnabled;
}

/**
 * Returns whether the pre-kill OOM pause is enabled. The pause is considered enabled only when the configured
 * pause duration is strictly positive; a zero or negative duration disables it.
 */
public boolean isOomPreQueryKillPauseEnabled() {
return _oomPreQueryKillPauseDurationMs > 0;
}

/**
 * Returns the configured duration, in milliseconds, for which query threads are paused before proceeding with an
 * OOM kill. The raw value is returned as-is and may be non-positive, which means the pause is disabled.
 */
public long getOomPreQueryKillPauseDurationMs() {
return _oomPreQueryKillPauseDurationMs;
}

/**
 * Returns whether the OOM pause should also be applied before killing queries at panic level.
 */
public boolean isOomPanicPreQueryKillPauseEnabled() {
return _oomPanicPreQueryKillPauseEnabled;
}

/**
 * Returns the workload sleep time in milliseconds, as loaded from
 * {@code CommonConstants.Accounting.CONFIG_OF_WORKLOAD_SLEEP_TIME_MS}.
 */
public int getWorkloadSleepTimeMs() {
return _workloadSleepTimeMs;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pinot.core.accounting;

import com.google.common.annotations.VisibleForTesting;
import it.unimi.dsi.fastutil.longs.LongLongMutablePair;
import java.util.Comparator;
import java.util.HashMap;
Expand All @@ -26,6 +27,8 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.pinot.common.metrics.AbstractMetrics;
import org.apache.pinot.common.metrics.BrokerGauge;
import org.apache.pinot.common.metrics.BrokerMeter;
Expand Down Expand Up @@ -75,6 +78,21 @@ enum TriggeringLevel {
private Map<String, QueryResourceTrackerImpl> _queryResourceUsages;
private TriggeringLevel _triggeringLevel = TriggeringLevel.Normal;

// Track previous triggering level to detect transitions into critical
private TriggeringLevel _previousTriggeringLevel = TriggeringLevel.Normal;

// OOM pause state - lock+condition for blocking query threads at critical heap level.
// _pauseLock / _pauseCondition are used to coordinate blocking query threads and signaling them from the watcher
// thread when the pause is cleared.
private final ReentrantLock _pauseLock = new ReentrantLock();
private final Condition _pauseCondition = _pauseLock.newCondition();
// Shared between the watcher thread (writer) and query threads (readers). Volatile enables the fast-path in
// waitIfPaused() to avoid acquiring the lock when no pause is active.
private volatile boolean _pauseActive = false;
// Local state for the watcher thread only (similar to _triggeringLevel) - read/written only from preAggregate and
// postAggregate, never by query threads.
private long _pauseDeadlineMs = 0;

// metrics class
private final AbstractMetrics _metrics;
private final AbstractMetrics.Meter _queryKilledMeter;
Expand Down Expand Up @@ -133,13 +151,33 @@ public void preAggregate(Iterable<ThreadResourceTrackerImpl> threadTrackers) {
QueryMonitorConfig config = _queryMonitorConfig.get();
_sleepTime = config.getNormalSleepTime();
_queryResourceUsages = null;
_previousTriggeringLevel = _triggeringLevel;
collectTriggerMetrics();
evalTriggers();
// Prioritize the panic check, kill ALL QUERIES immediately if triggered
// Prioritize the panic check
if (_triggeringLevel == TriggeringLevel.HeapMemoryPanic) {
killAllQueries(threadTrackers);
boolean pauseOnPanic = config.isOomPreQueryKillPauseEnabled() && config.isOomPanicPreQueryKillPauseEnabled();
if (_pauseActive) {
if (System.currentTimeMillis() >= _pauseDeadlineMs || !pauseOnPanic) {
// Grace period expired, or panic-pause disabled — kill all and clear
killAllQueries(threadTrackers);
Comment thread
yashmayya marked this conversation as resolved.
clearPause();
}
// else: still in grace period and panic-pause is enabled
} else if (pauseOnPanic
&& _previousTriggeringLevel.ordinal() < TriggeringLevel.HeapMemoryCritical.ordinal()) {
// Fresh jump to panic bypassing critical — activate pause before killing
activatePause(config.getOomPreQueryKillPauseDurationMs());
} else {
// Pause-on-panic disabled, or already been through a pause cycle — kill immediately
killAllQueries(threadTrackers);
}
return;
}
// If heap recovered below critical while an OOM pause is active, clear it
if (_pauseActive && _triggeringLevel.ordinal() < TriggeringLevel.HeapMemoryCritical.ordinal()) {
clearPause();
}
// Track aggregated query resource usage if triggered
if (_triggeringLevel.ordinal() > TriggeringLevel.Normal.ordinal()) {
_queryResourceUsages = new HashMap<>();
Expand Down Expand Up @@ -179,7 +217,22 @@ public void postAggregate() {
}
switch (_triggeringLevel) {
case HeapMemoryCritical:
killMostExpensiveQuery();
QueryMonitorConfig config = _queryMonitorConfig.get();
if (_pauseActive) {
// Pause is active - check if grace period expired
if (System.currentTimeMillis() >= _pauseDeadlineMs) {
killMostExpensiveQuery();
Comment thread
yashmayya marked this conversation as resolved.
clearPause();
}
// else: still in grace period, skip kill this cycle
} else if (config.isOomPreQueryKillPauseEnabled()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This only pauses when the previous cycle was below critical. If the config is enabled while the heap is already critical/panic, the watcher still takes the immediate-kill path until usage drops below critical and rises again, so the advertised dynamic toggle does not take effect during an active incident. Please make the pause arm based on the current config/state, not only on the transition edge.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this extreme edge case is worth the additional complexity; arming the pause only on the transition edge is easier to reason about.

&& _previousTriggeringLevel.ordinal() < TriggeringLevel.HeapMemoryCritical.ordinal()) {
// Just transitioned to critical - activate pause instead of killing
activatePause(config.getOomPreQueryKillPauseDurationMs());
} else {
// Pause disabled, or already been through a pause cycle - kill
killMostExpensiveQuery();
}
break;
case CPUTimeBasedKilling:
killCPUTimeExceedQueries();
Expand All @@ -198,8 +251,6 @@ private void collectTriggerMetrics() {
}

private void evalTriggers() {
TriggeringLevel previousTriggeringLevel = _triggeringLevel;

// Compute the new triggering level based on the current heap usage
QueryMonitorConfig config = _queryMonitorConfig.get();
if (_usedBytes > config.getPanicLevel()) {
Expand Down Expand Up @@ -234,7 +285,7 @@ private void evalTriggers() {
}

// Log the triggering level change
if (previousTriggeringLevel != _triggeringLevel) {
if (_previousTriggeringLevel != _triggeringLevel) {
switch (_triggeringLevel) {
case HeapMemoryPanic:
LOGGER.error("Heap used bytes: {} exceeds panic level: {}, killing all queries", _usedBytes,
Expand Down Expand Up @@ -266,6 +317,59 @@ private void evalTriggers() {
}
}

/**
 * Activates an OOM pause and hints the JVM to run garbage collection.
 *
 * <p>Query threads calling {@link #waitIfPaused()} block until {@link #clearPause()} is invoked. The deadline
 * recorded here is not observed by the waiting threads themselves; it is checked by the watcher thread on its
 * subsequent cycles to decide when the grace period is over.
 *
 * @param timeoutMs length of the grace period in milliseconds, measured from the moment of activation
 */
private void activatePause(long timeoutMs) {
  LOGGER.warn("Activating OOM pause for {}ms to allow garbage collection before killing queries. "
      + "Heap used bytes: {}", timeoutMs, _usedBytes);
  long nowMs = System.currentTimeMillis();
  // Saturate rather than overflow: with a very large configured duration, `nowMs + timeoutMs` would wrap to a
  // negative value and the grace period would be treated as already expired on the next watcher cycle.
  _pauseDeadlineMs = timeoutMs > Long.MAX_VALUE - nowMs ? Long.MAX_VALUE : nowMs + timeoutMs;
  // Set the flag only after the deadline has been recorded, so the pause is never visible without its deadline.
  _pauseActive = true;
  // Hint the JVM to run GC while query threads are pausing
  System.gc();
}

/**
* Clears the OOM pause and wakes all blocked query threads.
*/
@VisibleForTesting
void clearPause() {
Comment thread
yashmayya marked this conversation as resolved.
LOGGER.info("Clearing OOM pause. Heap used bytes: {}", _usedBytes);
_pauseLock.lock();
try {
_pauseActive = false;
_pauseCondition.signalAll();
} finally {
_pauseLock.unlock();
}
}

/**
 * Called by query threads at sampling checkpoints. Blocks if an OOM pause is active. Fast-path: single volatile read
 * when no pause is active.
 *
 * <p>On the slow path the thread waits on {@code _pauseCondition} until {@code _pauseActive} becomes {@code false}
 * or the thread is interrupted. If interrupted, the interrupt flag is restored before returning.
 *
 * <p>NOTE(review): the await is untimed, so a blocked thread resumes only when {@link #clearPause()} runs or it is
 * interrupted; it does not look at the pause deadline itself. Confirm the watcher always clears the pause.
 *
 * @return {@code true} if the thread entered the slow path (i.e. a pause was active when called), {@code false} if
 * the fast-path was taken and the thread did not block.
 */
@VisibleForTesting
boolean waitIfPaused() {
// Fast path: no lock is taken when no pause is active
if (!_pauseActive) {
return false;
}
_pauseLock.lock();
try {
// Re-check under the lock and loop, so spurious wake-ups and a pause cleared before locking are both handled
while (_pauseActive) {
_pauseCondition.await();
}
} catch (InterruptedException e) {
// Query was killed during pause - restore interrupt flag so the next checkTermination() call detects it
Thread.currentThread().interrupt();
} finally {
_pauseLock.unlock();
}
return true;
}

private void killAllQueries(Iterable<ThreadResourceTrackerImpl> threadTrackers) {
QueryMonitorConfig config = _queryMonitorConfig.get();
if (!config.isOomKillQueryEnabled()) {
Expand Down Expand Up @@ -394,6 +498,21 @@ private void cleanInactive() {
_inactiveQueries.addAll(_untrackedCpuMemUsage.keySet());
}

/**
 * Returns whether an OOM pause is currently active (a read of the volatile {@code _pauseActive} flag).
 */
@VisibleForTesting
boolean isPauseActive() {
return _pauseActive;
}

/**
 * Returns the current value of {@code _triggeringLevel}.
 */
@VisibleForTesting
TriggeringLevel getTriggeringLevel() {
return _triggeringLevel;
}

/**
 * Returns the triggering level that was in effect before the most recent re-evaluation; it is captured at the
 * start of {@code preAggregate}, before the triggers are evaluated again.
 */
@VisibleForTesting
TriggeringLevel getPreviousTriggeringLevel() {
return _previousTriggeringLevel;
}

/**
 * Returns the current value of {@code _usedBytes}, the heap used bytes figure that is compared against the
 * configured heap thresholds.
 */
public long getHeapUsageBytes() {
return _usedBytes;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ public void sampleUsage() {
}
}

/**
 * Delegates to the query resource aggregator's {@code waitIfPaused()} and returns its result unchanged.
 */
@Override
public boolean waitIfPaused() {
return _queryResourceAggregator.waitIfPaused();
}

@Override
public void clear() {
ThreadResourceTrackerImpl threadTracker = _threadLocalEntry.get();
Expand Down Expand Up @@ -330,6 +335,8 @@ private void logQueryMonitorConfig(QueryMonitorConfig queryMonitorConfig) {
LOGGER.info("_minMemoryFootprintForKill: {}", queryMonitorConfig.getMinMemoryFootprintForKill());
LOGGER.info("_cpuTimeBasedKillingEnabled: {}", queryMonitorConfig.isCpuTimeBasedKillingEnabled());
LOGGER.info("_cpuTimeBasedKillingThresholdNs: {}", queryMonitorConfig.getCpuTimeBasedKillingThresholdNs());
LOGGER.info("_oomPreQueryKillPauseDurationMs: {}", queryMonitorConfig.getOomPreQueryKillPauseDurationMs());
LOGGER.info("_oomPanicPreQueryKillPauseEnabled: {}", queryMonitorConfig.isOomPanicPreQueryKillPauseEnabled());
LOGGER.info("_workloadSleepTimeMs: {}", queryMonitorConfig.getWorkloadSleepTimeMs());
LOGGER.info("_workloadCostEnforcementEnabled: {}", queryMonitorConfig.isWorkloadCostEnforcementEnabled());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ private static String getFullyQualifiedConfigName(String config) {
return CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "." + config;
}

private static final long EXPECTED_OOM_PAUSE_TIMEOUT_MS = 2000;
private static final boolean EXPECTED_OOM_PAUSE_ON_PANIC_ENABLED = true;

@BeforeClass
public void setUp() {
CLUSTER_CONFIGS.put(getFullyQualifiedConfigName(Accounting.CONFIG_OF_OOM_PROTECTION_KILLING_QUERY),
Expand All @@ -76,6 +79,10 @@ public void setUp() {
Double.toString(EXPECTED_MIN_MEMORY_FOOTPRINT_FOR_KILL));
CLUSTER_CONFIGS.put(getFullyQualifiedConfigName(Accounting.CONFIG_OF_QUERY_KILLED_METRIC_ENABLED),
Boolean.toString(EXPECTED_IS_QUERY_KILLED_METRIC_ENABLED));
CLUSTER_CONFIGS.put(getFullyQualifiedConfigName(Accounting.CONFIG_OF_OOM_PRE_QUERY_KILL_PAUSE_DURATION_MS),
Long.toString(EXPECTED_OOM_PAUSE_TIMEOUT_MS));
CLUSTER_CONFIGS.put(getFullyQualifiedConfigName(Accounting.CONFIG_OF_OOM_PANIC_ALLOW_PRE_QUERY_KILL_PAUSE),
Boolean.toString(EXPECTED_OOM_PAUSE_ON_PANIC_ENABLED));
}

@Test
Expand Down Expand Up @@ -219,4 +226,31 @@ void testQueryKilledMetricEnabledConfigChange() {
CLUSTER_CONFIGS);
assertTrue(accountant.getQueryMonitorConfig().isQueryKilledMetricEnabled());
}

/**
 * Verifies that the OOM pre-kill pause duration starts at its default and is updated from the cluster configs when
 * the corresponding key is reported as changed.
 */
@Test
void testOomPauseTimeoutMsConfigChange() {
  PerQueryCPUMemResourceUsageAccountant accountant =
      new PerQueryCPUMemResourceUsageAccountant(new PinotConfiguration(), "test", InstanceType.SERVER);

  // Before any change notification the default must be in effect
  long initialPauseMs = accountant.getQueryMonitorConfig().getOomPreQueryKillPauseDurationMs();
  assertEquals(initialPauseMs, Accounting.DEFAULT_OOM_PRE_QUERY_KILL_PAUSE_DURATION_MS);

  // Notify the watcher task that only the pause-duration key changed
  String changedKey = getFullyQualifiedConfigName(Accounting.CONFIG_OF_OOM_PRE_QUERY_KILL_PAUSE_DURATION_MS);
  accountant.getWatcherTask().onChange(Set.of(changedKey), CLUSTER_CONFIGS);

  long updatedPauseMs = accountant.getQueryMonitorConfig().getOomPreQueryKillPauseDurationMs();
  assertEquals(updatedPauseMs, EXPECTED_OOM_PAUSE_TIMEOUT_MS);
}

/**
 * Verifies that pause-on-panic is off by default and is switched on from the cluster configs when the corresponding
 * key is reported as changed.
 */
@Test
void testOomPauseOnPanicEnabledConfigChange() {
  PerQueryCPUMemResourceUsageAccountant accountant =
      new PerQueryCPUMemResourceUsageAccountant(new PinotConfiguration(), "test", InstanceType.SERVER);

  // Before any change notification the flag must be off
  boolean enabledInitially = accountant.getQueryMonitorConfig().isOomPanicPreQueryKillPauseEnabled();
  assertFalse(enabledInitially);

  // Notify the watcher task that only the pause-on-panic key changed
  String changedKey = getFullyQualifiedConfigName(Accounting.CONFIG_OF_OOM_PANIC_ALLOW_PRE_QUERY_KILL_PAUSE);
  accountant.getWatcherTask().onChange(Set.of(changedKey), CLUSTER_CONFIGS);

  boolean enabledAfterChange = accountant.getQueryMonitorConfig().isOomPanicPreQueryKillPauseEnabled();
  assertTrue(enabledAfterChange);
}
}
Loading
Loading