Skip to content

Commit 711e632

Browse files
authored
Implement a fingerprinting mechanism to track compaction states in a more efficient manner (#18844)
* metadata store bits part 1 * annotate segments with compaction fingerprint before persist * Add ability to generate compaction state fingerprint * add fingerprint to task context and make legacy last compaction state storage configurable * update embedded tests for compaction supervisors to flex fingerprints * checkpoint with persisting compaction states * add duty to clean up unused compaction states * take fingerprints into account in CompactionStatus * Add and improve tests * get rid of some todo comments * fix checkstyle * cleanup some more TODO * Add some docs * update web console * make cache size configurable and fix some spelling * fixup use of deprecated builder * fix checkstyle * fix coordinator compactsegments duty and respond to self review comments * fix spellchecker * predates is a word * improve some javadocs * simplify some test assertions based on review * better naming * controller impl cleanup * For compaction supervisors, take persisting pending compaction states out of hot path * use Configs.valueOrDefault helper in data segment * Refactor where fingerprinting happens and how the object mapper is wired up * refactor CompactionStateManager into an interface with a persisted and heap impl * remove fingerprinting support from the coordinator compact segments duty * Move on heap compaction state manager to test sources * CompactionStateManager is now overlord only * Refactor how the compaction state fingerprint cache is wired up * prettify * small changes after self-review * Cleanup CompactionStateCache per review * compactionstatemanager to compactionstatestorage plus refactor * Add compaction state added and deleted metrics * improve queries for compaction state cache sync * clean up doc wording * Miscl. 
cleanup from review * some metadata store code cleanup * refactor id out of the compaction states table as it is superfluous * Some CompactionStatus cleanup * Migrate the location of creating a compaction state from config * More refactoring per review * refactor to remove duplicate fingerprint generator code * Do some consolidation of fingerprint related classes to clean up code * minor cleanup * fix forbidden api use * Improvements and cleanup to the fingerprint and state persist + cache * Refactor where in the code compaction fingerprints are generated * Formalize unique constraint exception check in sqlmetadataconnector and db specific impls * some naming cleanup * Migrate the compaction state cleanup duty to the overlord * Blow up the compaction supervisor scheduler if incremental caching is disabled * add some strict input sanitization in upserting compaction fingerprints * cleanup test class * Add pending flag to compaction state to prevent potentially destructive early cleanup * Refactor database naming to use indexingState instead of compactionState * Refactor naming to IndexingState for the metadata cleanup duty * refresh some docs * fixup tests * Refactoring name of CompactionStateCache to IndexingStateCache * Rename CompactionStateStorage to IndexingStateStorage * Refactor compactionStateFingerprint out of the code in favor of indexingStateFingerprint * Refactor FingerprintMapper name to remove compaction for indexing state * refactorings after self review * fixup a few things post merge with master * Cleanup and refactor after code review round Batch marking of indexing states as active to avoid chained updates where only one is needed Build segments table missing columns error column by column refactor how we are configuring and executing the ol metadata cleanup duties. 
fix missed naming refactor Improve readability of upsertIndexingState Fixup SqlIndexingStateStorage constructor drop default impl of isUniqueConstraintViolation Refactor how the deterministic mapper is handled for reindexing * cleanup * use effective state for dimspec and indexspec for reindexing fingerprinting * Only call into running checks if there are unknown states to check
1 parent b1aa660 commit 711e632

108 files changed

Lines changed: 5269 additions & 168 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,10 @@
2121

2222
import com.google.common.collect.ImmutableList;
2323
import org.apache.druid.client.DataSourcesSnapshot;
24+
import org.apache.druid.jackson.DefaultObjectMapper;
2425
import org.apache.druid.java.util.common.DateTimes;
26+
import org.apache.druid.segment.metadata.DefaultIndexingStateFingerprintMapper;
27+
import org.apache.druid.segment.metadata.NoopIndexingStateCache;
2528
import org.apache.druid.server.compaction.CompactionCandidateSearchPolicy;
2629
import org.apache.druid.server.compaction.CompactionSegmentIterator;
2730
import org.apache.druid.server.compaction.NewestSegmentFirstPolicy;
@@ -135,7 +138,8 @@ public void measureNewestSegmentFirstPolicy(Blackhole blackhole)
135138
policy,
136139
compactionConfigs,
137140
dataSources,
138-
Collections.emptyMap()
141+
Collections.emptyMap(),
142+
new DefaultIndexingStateFingerprintMapper(new NoopIndexingStateCache(), new DefaultObjectMapper())
139143
);
140144
for (int i = 0; i < numCompactionTaskSlots && iterator.hasNext(); i++) {
141145
blackhole.consume(iterator.next());

docs/api-reference/automatic-compaction-api.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -889,6 +889,7 @@ This includes the following fields:
889889
|`compactionPolicy`|Policy to choose intervals for compaction. Currently, the only supported policy is [Newest segment first](#compaction-policy-newestsegmentfirst).|Newest segment first|
890890
|`useSupervisors`|Whether compaction should be run on Overlord using supervisors instead of Coordinator duties.|false|
891891
|`engine`|Engine used for running compaction tasks, unless overridden in the datasource-level compaction config. Possible values are `native` and `msq`. `msq` engine can be used for compaction only if `useSupervisors` is `true`.|`native`|
892+
|`storeCompactionStatePerSegment`|**This configuration only takes effect if `useSupervisors` is `true`.** Whether to persist the full compaction state in segment metadata. When `true` (default), compaction state is stored in both the segment metadata and the indexing states table. This is historically how Druid has worked. When `false`, only a fingerprint reference is stored in the segment metadata, reducing storage overhead in the segments table. The actual compaction state is stored in the indexing states table and can be referenced with the aforementioned fingerprint. Eventually this configuration will be removed and all compaction will use the fingerprint method only. This configuration exists for operators to opt into this future pattern early. **WARNING: if you set this to false and then compact data, rolling back to a Druid version that predates indexing state fingerprinting (< Druid 37) will result in missing compaction states and trigger compaction on segments that may already be compacted.**|`true`|
892893

893894
#### Compaction policy `newestSegmentFirst`
894895

docs/configuration/index.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -389,6 +389,7 @@ These properties specify the JDBC connection and other configuration around the
389389
|`druid.metadata.storage.tables.segments`|The table to use to look for segments.|`druid_segments`|
390390
|`druid.metadata.storage.tables.rules`|The table to use to look for segment load/drop rules.|`druid_rules`|
391391
|`druid.metadata.storage.tables.config`|The table to use to look for configs.|`druid_config`|
392+
|`druid.metadata.storage.tables.indexingStates`|The table that stores indexing state payloads and fingerprints.|`druid_indexingStates`|
392393
|`druid.metadata.storage.tables.tasks`|Used by the indexing service to store tasks.|`druid_tasks`|
393394
|`druid.metadata.storage.tables.taskLog`|Used by the indexing service to store task logs.|`druid_tasklogs`|
394395
|`druid.metadata.storage.tables.taskLock`|Used by the indexing service to store task locks.|`druid_tasklocks`|

docs/data-management/automatic-compaction.md

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -241,12 +241,18 @@ You can run automatic compaction using compaction supervisors on the Overlord ra
241241
* Can use either the native compaction engine or the [MSQ task engine](#use-msq-for-auto-compaction)
242242
* More reactive and submits tasks as soon as a compaction slot is available
243243
* Tracked compaction task status to avoid re-compacting an interval repeatedly
244+
* Uses new Indexing State Fingerprinting mechanisms to store less data per segment in metadata storage
244245

245246

246-
To use compaction supervisors, update the [compaction dynamic config](../api-reference/automatic-compaction-api.md#update-cluster-level-compaction-config) and set:
247+
To use compaction supervisors, the following configuration requirements must be met:
247248

248-
* `useSupervisors` to `true` so that compaction tasks can be run as supervisor tasks
249-
* `engine` to `msq` to use the MSQ task engine as the compaction engine or to `native` (default value) to use the native engine.
249+
* You must be using incremental segment metadata caching:
250+
* `druid.manager.segments.useIncrementalCache` set to `always` or `ifSynced` in your Overlord and Coordinator runtime properties.
251+
* See [Segment metadata caching](../configuration/index.md#metadata-retrieval) for full configuration documentation.
252+
253+
* update the [compaction dynamic config](../api-reference/automatic-compaction-api.md#update-cluster-level-compaction-config) and set:
254+
* `useSupervisors` to `true` so that compaction tasks can be run as supervisor tasks
255+
* `engine` to `msq` to use the MSQ task engine as the compaction engine or to `native` (default value) to use the native engine.
250256

251257
Compaction supervisors use the same syntax as auto-compaction using Coordinator duties with one key difference: you submit the auto-compaction as a supervisor spec. In the spec, set the `type` to `autocompact` and include the auto-compaction config in the `spec`.
252258

docs/operations/clean-metadata-store.md

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ The metadata store includes the following:
3434
- Compaction configuration records
3535
- Datasource records created by supervisors
3636
- Indexer task logs
37+
- Indexing State records
3738

3839
When you delete some entities from Apache Druid, records related to the entity may remain in the metadata store.
3940
If you have a high datasource churn rate, meaning you frequently create and delete many short-lived datasources or other related entities like compaction configuration or rules, the leftover records can fill your metadata store and cause performance issues.
@@ -59,7 +60,7 @@ If you have compliance requirements to keep audit records and you enable automat
5960
## Configure automated metadata cleanup
6061

6162
You can configure cleanup for each entity separately, as described in this section.
62-
Define the properties in the `coordinator/runtime.properties` file.
63+
Unless otherwise specified, define the properties in the `coordinator/runtime.properties` file.
6364

6465
The cleanup of one entity may depend on the cleanup of another entity as follows:
6566
- You have to configure a [kill task for segment records](#segment-records-and-segments-in-deep-storage-kill-task) before you can configure automated cleanup for [rules](#rules-records) or [compaction configuration](#compaction-configuration-records).
@@ -131,6 +132,27 @@ Compaction configuration cleanup uses the following configuration:
131132
If you already have an extremely large compaction configuration, you may not be able to delete compaction configuration due to size limits with the audit log. In this case you can set `druid.audit.manager.maxPayloadSizeBytes` and `druid.audit.manager.skipNullField` to avoid the auditing issue. See [Audit logging](../configuration/index.md#audit-logging).
132133
:::
133134

135+
### Indexing State Records
136+
137+
:::info
138+
Indexing State Records are cleaned up by the Overlord. Therefore, this section should be configured in the `overlord/runtime.properties` file.
139+
:::
140+
141+
:::info
142+
Indexing State Records are only created if you are using automatic compaction supervisors.
143+
:::
144+
145+
Indexing State records become eligible for deletion in the following scenarios:
146+
- When no `used` segments have an `indexing_state_fingerprint` that is equal to the `fingerprint` of the record.
147+
- When a record has `pending` state set to `true`
148+
149+
Indexing State cleanup uses the following configuration:
150+
- `druid.overlord.kill.indexingStates.on`: When `true`, enables cleanup for indexing state records.
151+
- `druid.overlord.kill.indexingStates.period`: Defines the frequency in [ISO 8601 format](https://en.wikipedia.org/wiki/ISO_8601#Durations) for the cleanup job to check for and delete eligible indexing state records. Defaults to `P1D`.
152+
- `druid.overlord.kill.indexingStates.durationToRetain`: Defines the retention period in [ISO 8601 format](https://en.wikipedia.org/wiki/ISO_8601#Durations) after indexing state records are marked as `used=false` before they become eligible for deletion. Defaults to `P7D`.
153+
- `druid.overlord.kill.indexingStates.pendingDurationToRetain`: Defines the retention period in [ISO 8601 format](https://en.wikipedia.org/wiki/ISO_8601#Durations) after creation before pending indexing state records become eligible for deletion. Defaults to `P7D`.
154+
- It is recommended that this value be greater than the maximum expected duration of compaction tasks to avoid pending records being deleted prematurely.
155+
134156
### Datasource records created by supervisors
135157

136158
Datasource records created by supervisors become eligible for deletion when the supervisor is terminated or does not exist in the `druid_supervisors` table and the `durationToRetain` time has passed since their creation.
@@ -160,7 +182,9 @@ For more detail, see [Task logging](../configuration/index.md#task-logging).
160182
## Disable automated metadata cleanup
161183

162184
Druid automatically cleans up metadata records, excluding compaction configuration records and indexer task logs.
163-
To disable automated metadata cleanup, set the following properties in the `coordinator/runtime.properties` file:
185+
To disable automated metadata cleanup:
186+
187+
Set the following properties in the `coordinator/runtime.properties` file:
164188

165189
```properties
166190
# Keep unused segments
@@ -178,11 +202,19 @@ druid.coordinator.kill.rule.on=false
178202
# Keep datasource records created by supervisors
179203
druid.coordinator.kill.datasource.on=false
180204
```
205+
Set the following properties in the `overlord/runtime.properties` file:
206+
207+
```properties
208+
# Keep indexing state records
209+
druid.overlord.kill.indexingStates.on=false
210+
```
181211

182212
## Example configuration for automated metadata cleanup
183213

184214
Consider a scenario where you have scripts to create and delete hundreds of datasources and related entities a day. You do not want to fill your metadata store with leftover records. The datasources and related entities tend to persist for only one or two days. Therefore, you want to run a cleanup job that identifies and removes leftover records that are at least four days old after a seven day buffer period in case you want to recover the data. The exception is for audit logs, which you need to retain for 30 days:
185215

216+
Coordinator configuration (`coordinator/runtime.properties`):
217+
186218
```properties
187219
...
188220
# Schedule the metadata management store task for every hour:
@@ -226,6 +258,18 @@ druid.coordinator.kill.datasource.durationToRetain=P4D
226258
...
227259
```
228260

261+
Overlord configuration - if using automatic compaction supervisors (`overlord/runtime.properties`):
262+
263+
```properties
264+
...
265+
# Poll every day to delete pending or unreferenced indexing state records > 4 days old
266+
druid.overlord.kill.indexingStates.on=true
267+
druid.overlord.kill.indexingStates.period=P1D
268+
druid.overlord.kill.indexingStates.durationToRetain=P4D
269+
druid.overlord.kill.indexingStates.pendingDurationToRetain=P4D
270+
...
271+
```
272+
229273
## Learn more
230274
See the following topics for more information:
231275
- [Metadata management](../configuration/index.md#metadata-management) for metadata store configuration reference.

embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/AutoCompactionTest.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1508,7 +1508,7 @@ public void testAutoCompactionDutyWithDimensionsSpec(CompactionEngine engine) th
15081508
@ParameterizedTest(name = "useSupervisors={0}")
15091509
public void testAutoCompactionDutyWithFilter(boolean useSupervisors) throws Exception
15101510
{
1511-
updateClusterConfig(new ClusterCompactionConfig(0.5, 10, null, useSupervisors, null));
1511+
updateClusterConfig(new ClusterCompactionConfig(0.5, 10, null, useSupervisors, null, true));
15121512

15131513
loadData(INDEX_TASK);
15141514
try (final Closeable ignored = unloader(fullDatasourceName)) {
@@ -1552,7 +1552,7 @@ public void testAutoCompactionDutyWithFilter(boolean useSupervisors) throws Exce
15521552
@ParameterizedTest(name = "useSupervisors={0}")
15531553
public void testAutoCompationDutyWithMetricsSpec(boolean useSupervisors) throws Exception
15541554
{
1555-
updateClusterConfig(new ClusterCompactionConfig(0.5, 10, null, useSupervisors, null));
1555+
updateClusterConfig(new ClusterCompactionConfig(0.5, 10, null, useSupervisors, null, true));
15561556

15571557
loadData(INDEX_TASK);
15581558
try (final Closeable ignored = unloader(fullDatasourceName)) {
@@ -1854,7 +1854,7 @@ private void forceTriggerAutoCompaction(
18541854
).collect(Collectors.toList())
18551855
);
18561856
updateClusterConfig(
1857-
new ClusterCompactionConfig(0.5, intervals.size(), policy, true, null)
1857+
new ClusterCompactionConfig(0.5, intervals.size(), policy, true, null, true)
18581858
);
18591859

18601860
// Wait for scheduler to pick up the compaction job
@@ -1864,7 +1864,7 @@ private void forceTriggerAutoCompaction(
18641864

18651865
// Disable all compaction
18661866
updateClusterConfig(
1867-
new ClusterCompactionConfig(0.5, intervals.size(), COMPACT_NOTHING_POLICY, true, null)
1867+
new ClusterCompactionConfig(0.5, intervals.size(), COMPACT_NOTHING_POLICY, true, null, true)
18681868
);
18691869
} else {
18701870
forceTriggerAutoCompaction(numExpectedSegmentsAfterCompaction);
@@ -1956,7 +1956,8 @@ private void updateCompactionTaskSlot(double compactionTaskSlotRatio, int maxCom
19561956
maxCompactionTaskSlots,
19571957
oldConfig.getCompactionPolicy(),
19581958
oldConfig.isUseSupervisors(),
1959-
oldConfig.getEngine()
1959+
oldConfig.getEngine(),
1960+
oldConfig.isStoreCompactionStatePerSegment()
19601961
)
19611962
);
19621963

0 commit comments

Comments
 (0)