Skip to content

Commit 75e053c

Browse files
author
gaoxin
committed
Offset auto reset
1 parent 4abddfc commit 75e053c

22 files changed

Lines changed: 802 additions & 105 deletions

File tree

pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java

Lines changed: 53 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828

2929

3030
public class LLCSegmentName implements Comparable<LLCSegmentName> {
31-
private static final String SEPARATOR = "__";
31+
public static final String SEPARATOR = "__";
3232
private static final String DATE_FORMAT = "yyyyMMdd'T'HHmm'Z'";
3333
private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormat.forPattern(DATE_FORMAT).withZoneUTC();
3434

@@ -37,30 +37,53 @@ public class LLCSegmentName implements Comparable<LLCSegmentName> {
3737
private final int _sequenceNumber;
3838
private final String _creationTime;
3939
private final String _segmentName;
40+
private final String _topicName;
4041

4142
public LLCSegmentName(String segmentName) {
4243
String[] parts = StringUtils.splitByWholeSeparator(segmentName, SEPARATOR);
43-
Preconditions.checkArgument(parts.length == 4, "Invalid LLC segment name: %s", segmentName);
44+
Preconditions.checkArgument(
45+
parts.length >= 4 && parts.length <= 5, "Invalid LLC segment name: %s", segmentName);
4446
_tableName = parts[0];
45-
_partitionGroupId = Integer.parseInt(parts[1]);
46-
_sequenceNumber = Integer.parseInt(parts[2]);
47-
_creationTime = parts[3];
47+
if (parts.length == 4) {
48+
_topicName = "";
49+
_partitionGroupId = Integer.parseInt(parts[1]);
50+
_sequenceNumber = Integer.parseInt(parts[2]);
51+
_creationTime = parts[3];
52+
} else {
53+
_topicName = parts[1];
54+
_partitionGroupId = Integer.parseInt(parts[2]);
55+
_sequenceNumber = Integer.parseInt(parts[3]);
56+
_creationTime = parts[4];
57+
}
4858
_segmentName = segmentName;
4959
}
5060

5161
public LLCSegmentName(String tableName, int partitionGroupId, int sequenceNumber, long msSinceEpoch) {
62+
this(tableName, "", partitionGroupId, sequenceNumber, msSinceEpoch);
63+
}
64+
65+
public LLCSegmentName(
66+
String tableName, String topicName, int partitionGroupId, int sequenceNumber, long msSinceEpoch) {
5267
Preconditions.checkArgument(!tableName.contains(SEPARATOR), "Illegal table name: %s", tableName);
5368
_tableName = tableName;
69+
_topicName = topicName;
5470
_partitionGroupId = partitionGroupId;
5571
_sequenceNumber = sequenceNumber;
5672
// ISO8601 date: 20160120T1234Z
5773
_creationTime = DATE_FORMATTER.print(msSinceEpoch);
58-
_segmentName = tableName + SEPARATOR + partitionGroupId + SEPARATOR + sequenceNumber + SEPARATOR + _creationTime;
74+
if ("".equals(topicName)) {
75+
_segmentName = tableName + SEPARATOR + partitionGroupId + SEPARATOR + sequenceNumber + SEPARATOR + _creationTime;
76+
} else {
77+
_segmentName =
78+
tableName + SEPARATOR + topicName + SEPARATOR + partitionGroupId + SEPARATOR + sequenceNumber + SEPARATOR
79+
+ _creationTime;
80+
}
5981
}
6082

6183
private LLCSegmentName(String tableName, int partitionGroupId, int sequenceNumber, String creationTime,
6284
String segmentName) {
6385
_tableName = tableName;
86+
_topicName = "";
6487
_partitionGroupId = partitionGroupId;
6588
_sequenceNumber = sequenceNumber;
6689
_creationTime = creationTime;
@@ -74,23 +97,17 @@ private LLCSegmentName(String tableName, int partitionGroupId, int sequenceNumbe
7497
@Nullable
7598
public static LLCSegmentName of(String segmentName) {
7699
String[] parts = StringUtils.splitByWholeSeparator(segmentName, SEPARATOR);
77-
if (parts.length != 4) {
100+
if (parts.length < 4 || parts.length > 5) {
78101
return null;
79102
}
80-
return new LLCSegmentName(parts[0], Integer.parseInt(parts[1]), Integer.parseInt(parts[2]), parts[3], segmentName);
103+
return new LLCSegmentName(segmentName);
81104
}
82105

83106
/**
84107
* Returns whether the given segment name represents an LLC segment.
85108
*/
86109
public static boolean isLLCSegment(String segmentName) {
87-
int numSeparators = 0;
88-
int index = 0;
89-
while ((index = segmentName.indexOf(SEPARATOR, index)) != -1) {
90-
numSeparators++;
91-
index += 2; // SEPARATOR.length()
92-
}
93-
return numSeparators == 3;
110+
return of(segmentName) != null;
94111
}
95112

96113
@Deprecated
@@ -102,17 +119,34 @@ public static boolean isLowLevelConsumerSegmentName(String segmentName) {
102119
* Returns the sequence number of the given segment name.
103120
*/
104121
public static int getSequenceNumber(String segmentName) {
105-
return Integer.parseInt(StringUtils.splitByWholeSeparator(segmentName, SEPARATOR)[2]);
122+
String[] parts = StringUtils.splitByWholeSeparator(segmentName, SEPARATOR);
123+
if (parts.length == 4) {
124+
return Integer.parseInt(parts[2]);
125+
} else {
126+
return Integer.parseInt(parts[3]);
127+
}
106128
}
107129

108130
public String getTableName() {
109131
return _tableName;
110132
}
111133

134+
public String getTopicName() {
135+
return _topicName;
136+
}
137+
112138
public int getPartitionGroupId() {
113139
return _partitionGroupId;
114140
}
115141

142+
public String getPartitionGroupInfo() {
143+
if (_topicName.isEmpty()) {
144+
return String.valueOf(_partitionGroupId);
145+
} else {
146+
return _topicName + SEPARATOR + _partitionGroupId;
147+
}
148+
}
149+
116150
public int getSequenceNumber() {
117151
return _sequenceNumber;
118152
}
@@ -134,6 +168,9 @@ public String getSegmentName() {
134168
public int compareTo(LLCSegmentName other) {
135169
Preconditions.checkArgument(_tableName.equals(other._tableName),
136170
"Cannot compare segment names from different table: %s, %s", _segmentName, other.getSegmentName());
171+
if (!_topicName.equals(other._topicName)) {
172+
return StringUtils.compare(_topicName, other._topicName);
173+
}
137174
if (_partitionGroupId != other._partitionGroupId) {
138175
return Integer.compare(_partitionGroupId, other._partitionGroupId);
139176
}

pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@
114114
import org.apache.pinot.controller.validation.BrokerResourceValidationManager;
115115
import org.apache.pinot.controller.validation.DiskUtilizationChecker;
116116
import org.apache.pinot.controller.validation.OfflineSegmentIntervalChecker;
117+
import org.apache.pinot.controller.validation.RealtimeOffsetAutoResetManager;
117118
import org.apache.pinot.controller.validation.RealtimeSegmentValidationManager;
118119
import org.apache.pinot.controller.validation.ResourceUtilizationChecker;
119120
import org.apache.pinot.controller.validation.ResourceUtilizationManager;
@@ -185,6 +186,7 @@ public abstract class BaseControllerStarter implements ServiceStartable {
185186
// Can only be constructed after resource manager getting started
186187
protected OfflineSegmentIntervalChecker _offlineSegmentIntervalChecker;
187188
protected RealtimeSegmentValidationManager _realtimeSegmentValidationManager;
189+
protected RealtimeOffsetAutoResetManager _realtimeOffsetAutoResetManager;
188190
protected BrokerResourceValidationManager _brokerResourceValidationManager;
189191
protected SegmentRelocator _segmentRelocator;
190192
protected RetentionManager _retentionManager;
@@ -373,6 +375,10 @@ public RealtimeSegmentValidationManager getRealtimeSegmentValidationManager() {
373375
return _realtimeSegmentValidationManager;
374376
}
375377

378+
public RealtimeOffsetAutoResetManager getRealtimeOffsetAutoResetManager() {
379+
return _realtimeOffsetAutoResetManager;
380+
}
381+
376382
public BrokerResourceValidationManager getBrokerResourceValidationManager() {
377383
return _brokerResourceValidationManager;
378384
}
@@ -850,6 +856,9 @@ protected List<PeriodicTask> setupControllerPeriodicTasks() {
850856
_pinotLLCRealtimeSegmentManager, _validationMetrics, _controllerMetrics, _storageQuotaChecker,
851857
_resourceUtilizationManager);
852858
periodicTasks.add(_realtimeSegmentValidationManager);
859+
_realtimeOffsetAutoResetManager =
860+
new RealtimeOffsetAutoResetManager(_config, _helixResourceManager, _leadControllerManager,
861+
_pinotLLCRealtimeSegmentManager, _controllerMetrics);
853862
_brokerResourceValidationManager =
854863
new BrokerResourceValidationManager(_config, _helixResourceManager, _leadControllerManager, _controllerMetrics);
855864
periodicTasks.add(_brokerResourceValidationManager);

pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,10 @@ public static class ControllerPeriodicTasksConf {
112112
"controller.realtime.segment.validation.frequencyPeriod";
113113
public static final String REALTIME_SEGMENT_VALIDATION_INITIAL_DELAY_IN_SECONDS =
114114
"controller.realtime.segment.validation.initialDelayInSeconds";
115+
public static final String REALTIME_OFFSET_AUTO_RESET_FREQUENCY_PERIOD =
116+
"controller.realtime.offsetAutoReset.frequencyPeriod";
117+
public static final String REALTIME_OFFSET_AUTO_RESET_INITIAL_DELAY_IN_SECONDS =
118+
"controller.realtime.offsetAutoReset.initialDelayInSeconds";
115119
// Deprecated as of 0.8.0
116120
@Deprecated
117121
public static final String DEPRECATED_BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS =
@@ -268,6 +272,7 @@ public static long getRandomInitialDelayInSeconds() {
268272
public static final int DEFAULT_RETENTION_MANAGER_FREQUENCY_IN_SECONDS = 6 * 60 * 60; // 6 Hours.
269273
public static final int DEFAULT_OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS = 24 * 60 * 60; // 24 Hours.
270274
public static final int DEFAULT_REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour.
275+
public static final int DEFAULT_OFFSET_AUTO_RESET_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour.
271276
public static final int DEFAULT_BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour.
272277
public static final int DEFAULT_STATUS_CHECKER_FREQUENCY_IN_SECONDS = 5 * 60; // 5 minutes
273278
public static final int DEFAULT_REBALANCE_CHECKER_FREQUENCY_IN_SECONDS = 5 * 60; // 5 minutes
@@ -657,6 +662,16 @@ public void setRealtimeSegmentValidationFrequencyInSeconds(int validationFrequen
657662
Integer.toString(validationFrequencyInSeconds));
658663
}
659664

665+
public int getRealtimeOffsetAutoResetFrequencyInSeconds() {
666+
return getProperty(ControllerPeriodicTasksConf.REALTIME_OFFSET_AUTO_RESET_FREQUENCY_PERIOD,
667+
ControllerPeriodicTasksConf.DEFAULT_OFFSET_AUTO_RESET_FREQUENCY_IN_SECONDS);
668+
}
669+
670+
public void setRealtimeOffsetAutoResetFrequencyInSeconds(int offsetAutoResetFrequencyInSeconds) {
671+
setProperty(ControllerPeriodicTasksConf.REALTIME_OFFSET_AUTO_RESET_FREQUENCY_PERIOD,
672+
Integer.toString(offsetAutoResetFrequencyInSeconds));
673+
}
674+
660675
/**
661676
* Return <code>controller.broker.resource.validation.frequencyPeriod</code> or
662677
* <code>controller.broker.resource.validation.frequencyInSeconds</code> or the default broker resource validation
@@ -1107,6 +1122,11 @@ public long getRealtimeSegmentValidationManagerInitialDelaySeconds() {
11071122
getPeriodicTaskInitialDelayInSeconds());
11081123
}
11091124

1125+
public long getRealtimeOffsetAutoResetInitialDelaySeconds() {
1126+
return getProperty(ControllerPeriodicTasksConf.REALTIME_OFFSET_AUTO_RESET_INITIAL_DELAY_IN_SECONDS,
1127+
getPeriodicTaskInitialDelayInSeconds());
1128+
}
1129+
11101130
public boolean isDeepStoreRetryUploadLLCSegmentEnabled() {
11111131
return getProperty(ControllerPeriodicTasksConf.ENABLE_DEEP_STORE_RETRY_UPLOAD_LLC_SEGMENT, false);
11121132
}

pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,13 @@ private Constants() {
5353
public static final String APP_CONFIGS = "AppConfigs";
5454
public static final String PERIODIC_TASK_TAG = "PeriodicTask";
5555
public static final String UPSERT_RESOURCE_TAG = "Upsert";
56+
public static final String RESET_OFFSET_FROM = "ResetOffsetFrom";
57+
public static final String RESET_OFFSET_TO = "ResetOffsetTo";
58+
public static final String RESET_OFFSET_TOPIC_NAME = "ResetOffsetTopicName";
59+
public static final String RESET_OFFSET_TOPIC_PARTITION = "ResetOffsetTopicPartition";
5660

5761
public static final String REALTIME_SEGMENT_VALIDATION_MANAGER = "RealtimeSegmentValidationManager";
62+
public static final String REALTIME_OFFSET_AUTO_RESET_MANAGER = "RealtimeOffsetAutoResetManager";
5863

5964
public static TableType validateTableType(String tableTypeStr) {
6065
if (StringUtils.isBlank(tableTypeStr)) {

pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2038,6 +2038,7 @@ public void updateTableConfig(TableConfig tableConfig)
20382038
throws IOException {
20392039
validateTableTenantConfig(tableConfig);
20402040
validateTableTaskMinionInstanceTagConfig(tableConfig);
2041+
// TODO: make sure external table update would not remove the ephemeral backfill topics
20412042
setExistingTableConfig(tableConfig);
20422043
}
20432044

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pinot.controller.helix.core.periodictask;
20+
21+
import java.util.Collection;
22+
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
23+
import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
24+
import org.apache.pinot.spi.stream.StreamConfig;
25+
26+
27+
public abstract class RealtimeOffsetAutoResetHandler {
28+
protected PinotLLCRealtimeSegmentManager _llcRealtimeSegmentManager;
29+
protected PinotHelixResourceManager _pinotHelixResourceManager;
30+
31+
public RealtimeOffsetAutoResetHandler(PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager,
32+
PinotHelixResourceManager pinotHelixResourceManager) {
33+
_llcRealtimeSegmentManager = llcRealtimeSegmentManager;
34+
_pinotHelixResourceManager = pinotHelixResourceManager;
35+
}
36+
37+
/**
38+
* Trigger the job to backfill the skipped interval due to offset auto reset.
39+
* It is expected to backfill the [fromOffset, toOffset) interval.
40+
* @return if successfully started the backfill job
41+
*/
42+
public abstract boolean triggerBackfillJob(
43+
String tableNameWithType, StreamConfig streamConfig, String topicName, int partitionId, long fromOffset,
44+
long toOffset);
45+
46+
/**
47+
* Ensure all topics under the table are being backfilled. It is the caller's responsibility to figure out what
48+
* topic set it should check.
49+
*/
50+
public abstract void ensureBackfillJobsRunning(String tableNameWithType, Collection<String> topicNames);
51+
52+
/**
53+
* Return the Collection of completed and cleaned up topicNames from the input.
54+
*/
55+
public abstract Collection<String> cleanupCompletedBackfillJobs(
56+
String tableNameWithType, Collection<String> topicNames);
57+
}

0 commit comments

Comments
 (0)