Skip to content

Commit 4e6f930

Browse files
authored
fix: fix broker segment metadata cache refresh (#19625)
Brokers can maintain a schema cache via segment metadata queries. Currently, if any of these queries times out, the remaining queries are aborted until the next refresh. If you have a huge datasource delta (think 500k+ segments being scanned), such a query can fail or time out and cause broker schema discovery for other, unrelated datasources to fail. Without centralized schema through the Coordinator, there is no intra-datasource atomicity guarantee w.r.t. schema discovery (it is just ASAP), so this change decouples that error dependency and instead emits a metric per datasource when failures occur. It introduces the `segment/schemaCache/refresh/failed` metric with a `dataSource` dimension, emitted when a refresh fails. Alternatively, the failures could be aggregated and emitted once at the end of the refresh. Also open to keeping this as only a warning/error log.
1 parent 60e3487 commit 4e6f930

4 files changed

Lines changed: 156 additions & 1 deletion

File tree

docs/operations/metrics.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ Most metric values reset each emission period, as specified in `druid.monitoring
7474
|`segment/metadataCache/sync/time`|Time taken to poll segment metadata from the Coordinator and update the segment metadata cache. This metric is emitted only if [metadata cache](../configuration/index.md#sql) is enabled on the Broker.||Depends on the number of segments.|
7575
|`segment/schemaCache/refresh/count`|Number of segments refreshed in broker segment schema cache.|`dataSource`||
7676
|`segment/schemaCache/refresh/time`|Time taken to refresh segments in broker segment schema cache.|`dataSource`||
77+
|`segment/schemaCache/refresh/failed`|Number of failed dataSource schema refreshes in the broker segment schema cache (for example, because a segment metadata query timed out). Emitted with a value of 1 for each dataSource whose refresh fails, and only when a refresh fails; the failed dataSource is skipped for the current refresh cycle and retried later, while other dataSources are unaffected. Recurring emission for the same dataSource may indicate that it is missing from the SQL schema.|`dataSource`||
7778
|`segment/schemaCache/poll/count`|Number of coordinator polls to fetch datasource schema.|||
7879
|`segment/schemaCache/poll/failed`|Number of failed coordinator polls to fetch datasource schema.|||
7980
|`metadatacache/schemaPoll/time`|Time taken for coordinator polls to fetch datasource schema.|||
@@ -475,6 +476,7 @@ These metrics are emitted by the Druid Coordinator in every run of the correspon
475476
|`segment/schemaCache/refreshSkipped/count`|Number of segments for which schema refresh was skipped due to presence of segment metadata in datasource polled from coordinator.|`dataSource`||
476477
|`segment/schemaCache/dataSource/removed`|Emitted when a datasource is removed from the Broker cache due to segments being marked as unused.|`dataSource`||
477478
|`segment/schemaCache/refresh/time`|Time taken to refresh segments in coordinator segment schema cache.|`dataSource`||
479+
|`segment/schemaCache/refresh/failed`|Number of failed dataSource schema refreshes in the coordinator segment schema cache (for example, because a segment metadata query timed out). Emitted with a value of 1 for each dataSource whose refresh fails, and only when a refresh fails; the failed dataSource is skipped for the current refresh cycle and retried later, while other dataSources are unaffected. Recurring emission for the same dataSource may indicate that it is missing from the SQL schema.|`dataSource`||
478480
|`segment/schemaCache/backfill/count`|Number of segments for which schema was back filled in the database.|`dataSource`||
479481
|`segment/schemaCache/rowSignature/changed`|Emitted when the cached row signature on the Broker's segment metadata cache for a datasource changes, indicating schema evolution or some form of flapping.|`dataSource`||
480482
|`segment/schemaCache/rowSignature/column/count`|Number of columns in the row signature on the Broker's segment metadata cache for a datasource when it's initialized or updated.|`dataSource`||

server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
4646
import org.apache.druid.query.DruidMetrics;
4747
import org.apache.druid.query.QueryContexts;
48+
import org.apache.druid.query.QueryInterruptedException;
4849
import org.apache.druid.query.TableDataSource;
4950
import org.apache.druid.query.metadata.metadata.AllColumnIncluderator;
5051
import org.apache.druid.query.metadata.metadata.ColumnAnalysis;
@@ -717,13 +718,43 @@ public Set<SegmentId> refreshSegments(final Set<SegmentId> segments) throws IOEx
717718
.add(segmentId);
718719
}
719720

721+
// Refresh each dataSource independently so one failure (e.g. a metadata query timeout) does not
722+
// abort the cycle and starve the rest.
720723
for (Map.Entry<String, TreeSet<SegmentId>> entry : segmentMap.entrySet()) {
721-
updatedSegmentIds.addAll(refreshSegmentsForDataSource(entry.getKey(), entry.getValue()));
724+
final String dataSource = entry.getKey();
725+
try {
726+
updatedSegmentIds.addAll(refreshSegmentsForDataSource(dataSource, entry.getValue()));
727+
}
728+
catch (QueryInterruptedException e) {
729+
// QueryInterruptedException also wraps ordinary query failures, not just interruption
730+
// Don't emit failures for interrupted exceptions (shutdown signal, etc.)
731+
if (e.getCause() instanceof InterruptedException) {
732+
Thread.currentThread().interrupt();
733+
throw e;
734+
}
735+
recordDataSourceRefreshFailure(dataSource, e);
736+
}
737+
catch (Exception e) {
738+
recordDataSourceRefreshFailure(dataSource, e);
739+
}
722740
}
723741

724742
return updatedSegmentIds;
725743
}
726744

745+
/**
746+
* Records a refresh failure for one dataSource: logs a warning carrying the exception and emits
* {@link Metric#REFRESH_FAILED} with value 1 and the dataSource name as a dimension. The refresh loop
* that calls this keeps going with the remaining dataSources, so the failed one is skipped this cycle.
*
* @param dataSource name of the dataSource whose schema refresh failed
* @param e          the exception that caused the refresh to fail; attached to the warning log
747+
*/
748+
private void recordDataSourceRefreshFailure(String dataSource, Exception e)
749+
{
750+
log.warn(e, "Failed to refresh segment schema for dataSource[%s]; skipping it this cycle.", dataSource);
751+
emitMetric(
752+
Metric.REFRESH_FAILED,
753+
1,
754+
new ServiceMetricEvent.Builder().setDimension(DruidMetrics.DATASOURCE, dataSource)
755+
);
756+
}
757+
727758
private long recomputeIsRealtime(ImmutableSet<DruidServerMetadata> servers)
728759
{
729760
if (servers.isEmpty()) {

server/src/main/java/org/apache/druid/segment/metadata/Metric.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ public class Metric
5252
public static final String REFRESHED_SEGMENTS = PREFIX + "refresh/count";
5353
public static final String REFRESH_SKIPPED_TOMBSTONES = PREFIX + "refresh/tombstone/count";
5454
public static final String REFRESH_DURATION_MILLIS = PREFIX + "refresh/time";
55+
public static final String REFRESH_FAILED = PREFIX + "refresh/failed";
5556
public static final String DATASOURCE_REMOVED = PREFIX + "dataSource/removed";
5657
public static final String SCHEMA_ROW_SIGNATURE_CHANGED = PREFIX + "rowSignature/changed";
5758
public static final String SCHEMA_ROW_SIGNATURE_COLUMN_COUNT = PREFIX + "rowSignature/column/count";

server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.druid.java.util.common.Pair;
3838
import org.apache.druid.java.util.common.StringUtils;
3939
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
40+
import org.apache.druid.java.util.common.guava.Sequence;
4041
import org.apache.druid.java.util.common.guava.Sequences;
4142
import org.apache.druid.java.util.emitter.EmittingLogger;
4243
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
@@ -47,6 +48,8 @@
4748
import org.apache.druid.metadata.TestDerbyConnector;
4849
import org.apache.druid.query.DruidMetrics;
4950
import org.apache.druid.query.QueryContexts;
51+
import org.apache.druid.query.QueryInterruptedException;
52+
import org.apache.druid.query.QueryTimeoutException;
5053
import org.apache.druid.query.TableDataSource;
5154
import org.apache.druid.query.aggregation.CountAggregatorFactory;
5255
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
@@ -112,6 +115,7 @@
112115
import java.util.Set;
113116
import java.util.concurrent.CountDownLatch;
114117
import java.util.concurrent.TimeUnit;
118+
import java.util.concurrent.atomic.AtomicReference;
115119
import java.util.stream.Collectors;
116120

117121
public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetadataCacheTestBase
@@ -2455,4 +2459,121 @@ private void verifyFoo2DSSchema(CoordinatorSegmentMetadataCache schema)
24552459
Assert.assertEquals("m1", columnNames.get(2));
24562460
Assert.assertEquals(ColumnType.LONG, fooRowSignature.getColumnType(columnNames.get(2)).get());
24572461
}
2462+
2463+
/**
2464+
* A failure refreshing one dataSource (e.g. a SegmentMetadataQuery timeout) must not abort the whole
2465+
* refresh cycle: other dataSources are still refreshed, and a {@code segment/schemaCache/refresh/failed}
2466+
* metric is emitted for the failing dataSource.
* <p>
* The failure is injected by overriding {@code runSegmentMetadataQuery} to throw a
* {@link QueryTimeoutException} whenever the first requested segment belongs to {@code DATASOURCE1}.
2467+
*/
2468+
@Test
2469+
public void testRefreshFailureForOneDatasourceIsIsolated() throws InterruptedException, IOException
2470+
{
2471+
final StubServiceEmitter emitter = new StubServiceEmitter("test", "test");
2472+
final CoordinatorSegmentMetadataCache schema = new CoordinatorSegmentMetadataCache(
2473+
getQueryLifecycleFactory(walker),
2474+
serverView,
2475+
SEGMENT_CACHE_CONFIG_DEFAULT,
2476+
new NoopEscalator(),
2477+
new InternalQueryConfig(),
2478+
emitter,
2479+
segmentSchemaCache,
2480+
backFillQueue,
2481+
segmentsMetadataManager,
2482+
segmentsMetadataManagerConfigSupplier
2483+
)
2484+
{
2485+
@Override
2486+
public Sequence<SegmentAnalysis> runSegmentMetadataQuery(Iterable<SegmentId> segments)
2487+
{
2488+
// Simulate a metadata query that times out for DATASOURCE1 but succeeds for everything else.
// Only the first segment is inspected to decide which dataSource the query is for.
2489+
final SegmentId first = segments.iterator().next();
2490+
if (DATASOURCE1.equals(first.getDataSource())) {
2491+
throw new QueryTimeoutException("test-induced timeout for " + DATASOURCE1);
2492+
}
2493+
return super.runSegmentMetadataQuery(segments);
2494+
}
2495+
};
2496+
2497+
schema.onLeaderStart();
2498+
schema.awaitInitialization();
2499+
2500+
final Set<SegmentId> allSegmentIds = schema.getSegmentMetadataSnapshot().keySet();
2501+
// NOTE(review): presumably discards metrics recorded during initialization so that the assertions
// below only see the explicit refresh - confirm against StubServiceEmitter.flush().
emitter.flush();
2502+
2503+
// Must not propagate the DATASOURCE1 failure.
2504+
final Set<SegmentId> refreshed = schema.refreshSegments(allSegmentIds);
2505+
2506+
// The healthy dataSources were still refreshed despite DATASOURCE1 failing.
2507+
Assert.assertTrue(
2508+
"expected a refreshed segment from " + DATASOURCE2,
2509+
refreshed.stream().anyMatch(id -> DATASOURCE2.equals(id.getDataSource()))
2510+
);
2511+
Assert.assertTrue(
2512+
"expected a refreshed segment from " + SOME_DATASOURCE,
2513+
refreshed.stream().anyMatch(id -> SOME_DATASOURCE.equals(id.getDataSource()))
2514+
);
2515+
// The failing dataSource produced no refreshed segments.
2516+
Assert.assertTrue(
2517+
"expected no refreshed segments from the failing " + DATASOURCE1,
2518+
refreshed.stream().noneMatch(id -> DATASOURCE1.equals(id.getDataSource()))
2519+
);
2520+
2521+
// A failure metric was emitted, dimensioned by the failing dataSource.
2522+
final List<Number> failures = emitter.getMetricValues(
2523+
Metric.REFRESH_FAILED,
2524+
ImmutableMap.of(DruidMetrics.DATASOURCE, DATASOURCE1)
2525+
);
2526+
Assert.assertFalse("expected a refresh/failed metric for " + DATASOURCE1, failures.isEmpty());
2527+
// Each failure is expected to be reported with a metric value of 1.
Assert.assertEquals(1, failures.get(0).intValue());
2528+
}
2529+
2530+
/**
* Exercises both outcomes of a {@link QueryInterruptedException} thrown while refreshing a dataSource.
* When its cause is an {@link InterruptedException}, {@code refreshSegments} must rethrow it, leave the
* calling thread's interrupt flag set, and emit no {@code segment/schemaCache/refresh/failed} metric.
* When its cause is any other exception, {@code refreshSegments} must not throw, must not set the
* interrupt flag, and must emit the metric with value 1 for the failing dataSource.
*/
@Test
2531+
public void testLocalInterruptionPropagatesButWrappedQueryFailureIsIsolated() throws IOException
2532+
{
2533+
final StubServiceEmitter emitter = new StubServiceEmitter("test", "test");
2534+
// Holds the cause to wrap; swapped between the two cases below without rebuilding the cache.
final AtomicReference<Throwable> causeRef = new AtomicReference<>();
2535+
final CoordinatorSegmentMetadataCache schema = new CoordinatorSegmentMetadataCache(
2536+
getQueryLifecycleFactory(walker),
2537+
serverView,
2538+
SEGMENT_CACHE_CONFIG_DEFAULT,
2539+
new NoopEscalator(),
2540+
new InternalQueryConfig(),
2541+
emitter,
2542+
segmentSchemaCache,
2543+
backFillQueue,
2544+
segmentsMetadataManager,
2545+
segmentsMetadataManagerConfigSupplier
2546+
)
2547+
{
2548+
@Override
2549+
public Sequence<SegmentAnalysis> runSegmentMetadataQuery(Iterable<SegmentId> segments)
2550+
{
// Every metadata query fails, wrapping whatever cause the current case has configured.
2551+
throw new QueryInterruptedException(causeRef.get());
2552+
}
2553+
};
2554+
2555+
final Set<SegmentId> segments = ImmutableSet.of(
2556+
SegmentId.of(DATASOURCE1, Intervals.of("2000/2001"), "v1", 0)
2557+
);
2558+
2559+
// Genuine local interruption: propagate, restore the interrupt flag, record no failure.
2560+
causeRef.set(new InterruptedException("test interrupt"));
2561+
Assert.assertThrows(QueryInterruptedException.class, () -> schema.refreshSegments(segments));
2562+
Assert.assertTrue("interrupt flag should be restored", Thread.interrupted()); // also clears it for the next case
2563+
Assert.assertTrue(
2564+
"local interruption must not emit a refresh/failed metric",
2565+
emitter.getMetricEvents(Metric.REFRESH_FAILED).isEmpty()
2566+
);
2567+
2568+
// Wrapped ordinary failure (no InterruptedException cause): isolate it - no propagation, failure recorded.
2569+
causeRef.set(new RuntimeException("wrapped query failure"));
2570+
schema.refreshSegments(segments); // must not throw
2571+
Assert.assertFalse("interrupt flag must not be set for a wrapped failure", Thread.currentThread().isInterrupted());
2572+
final List<Number> failures = emitter.getMetricValues(
2573+
Metric.REFRESH_FAILED,
2574+
ImmutableMap.of(DruidMetrics.DATASOURCE, DATASOURCE1)
2575+
);
2576+
Assert.assertFalse("a wrapped query failure should emit refresh/failed", failures.isEmpty());
2577+
Assert.assertEquals(1, failures.get(0).intValue());
2578+
}
24582579
}

0 commit comments

Comments
 (0)