Skip to content

Commit 436e5ad

Browse files
author
gaoxin
committed
Offset auto reset
1 parent 75fb18b commit 436e5ad

23 files changed

Lines changed: 1376 additions & 157 deletions

File tree

pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java

Lines changed: 66 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929

3030

3131
public class LLCSegmentName implements Comparable<LLCSegmentName> {
32-
private static final String SEPARATOR = "__";
32+
public static final String SEPARATOR = "__";
3333
private static final String DATE_FORMAT = "yyyyMMdd'T'HHmm'Z'";
3434
private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormat.forPattern(DATE_FORMAT).withZoneUTC();
3535

@@ -38,25 +38,59 @@ public class LLCSegmentName implements Comparable<LLCSegmentName> {
3838
private final int _sequenceNumber;
3939
private final String _creationTime;
4040
private final String _segmentName;
41+
private final String _topicName;
4142

4243
public LLCSegmentName(String segmentName) {
4344
String[] parts = StringUtils.splitByWholeSeparator(segmentName, SEPARATOR);
44-
Preconditions.checkArgument(parts.length == 4, "Invalid LLC segment name: %s", segmentName);
45+
Preconditions.checkArgument(
46+
parts.length >= 4 && parts.length <= 5, "Invalid LLC segment name: %s", segmentName);
4547
_tableName = parts[0];
46-
_partitionGroupId = Integer.parseInt(parts[1]);
47-
_sequenceNumber = Integer.parseInt(parts[2]);
48-
_creationTime = parts[3];
48+
if (parts.length == 4) {
49+
_topicName = "";
50+
_partitionGroupId = Integer.parseInt(parts[1]);
51+
_sequenceNumber = Integer.parseInt(parts[2]);
52+
_creationTime = parts[3];
53+
} else {
54+
_topicName = parts[1];
55+
_partitionGroupId = Integer.parseInt(parts[2]);
56+
_sequenceNumber = Integer.parseInt(parts[3]);
57+
_creationTime = parts[4];
58+
}
4959
_segmentName = segmentName;
5060
}
5161

5262
public LLCSegmentName(String tableName, int partitionGroupId, int sequenceNumber, long msSinceEpoch) {
63+
this(tableName, "", partitionGroupId, sequenceNumber, msSinceEpoch);
64+
}
65+
66+
public LLCSegmentName(
67+
String tableName, String topicName, int partitionGroupId, int sequenceNumber, long msSinceEpoch) {
5368
Preconditions.checkArgument(!tableName.contains(SEPARATOR), "Illegal table name: %s", tableName);
69+
Preconditions.checkArgument(topicName == null || !topicName.contains(SEPARATOR),
70+
"Illegal topic name: %s", tableName);
5471
_tableName = tableName;
72+
_topicName = topicName;
5573
_partitionGroupId = partitionGroupId;
5674
_sequenceNumber = sequenceNumber;
5775
// ISO8601 date: 20160120T1234Z
5876
_creationTime = DATE_FORMATTER.print(msSinceEpoch);
59-
_segmentName = tableName + SEPARATOR + partitionGroupId + SEPARATOR + sequenceNumber + SEPARATOR + _creationTime;
77+
if ("".equals(topicName)) {
78+
_segmentName = tableName + SEPARATOR + partitionGroupId + SEPARATOR + sequenceNumber + SEPARATOR + _creationTime;
79+
} else {
80+
_segmentName =
81+
tableName + SEPARATOR + topicName + SEPARATOR + partitionGroupId + SEPARATOR + sequenceNumber + SEPARATOR
82+
+ _creationTime;
83+
}
84+
}
85+
86+
private LLCSegmentName(String tableName, int partitionGroupId, int sequenceNumber, String creationTime,
87+
String segmentName) {
88+
_tableName = tableName;
89+
_topicName = "";
90+
_partitionGroupId = partitionGroupId;
91+
_sequenceNumber = sequenceNumber;
92+
_creationTime = creationTime;
93+
_segmentName = segmentName;
6094
}
6195

6296
/**
@@ -65,6 +99,10 @@ public LLCSegmentName(String tableName, int partitionGroupId, int sequenceNumber
6599
*/
66100
@Nullable
67101
public static LLCSegmentName of(String segmentName) {
102+
String[] parts = StringUtils.splitByWholeSeparator(segmentName, SEPARATOR);
103+
if (parts.length < 4 || parts.length > 5) {
104+
return null;
105+
}
68106
try {
69107
return new LLCSegmentName(segmentName);
70108
} catch (Exception e) {
@@ -76,13 +114,7 @@ public static LLCSegmentName of(String segmentName) {
76114
* Returns whether the given segment name represents an LLC segment.
77115
*/
78116
public static boolean isLLCSegment(String segmentName) {
79-
int numSeparators = 0;
80-
int index = 0;
81-
while ((index = segmentName.indexOf(SEPARATOR, index)) != -1) {
82-
numSeparators++;
83-
index += 2; // SEPARATOR.length()
84-
}
85-
return numSeparators == 3;
117+
return of(segmentName) != null;
86118
}
87119

88120
@Deprecated
@@ -94,17 +126,34 @@ public static boolean isLowLevelConsumerSegmentName(String segmentName) {
94126
* Returns the sequence number of the given segment name.
95127
*/
96128
public static int getSequenceNumber(String segmentName) {
97-
return Integer.parseInt(StringUtils.splitByWholeSeparator(segmentName, SEPARATOR)[2]);
129+
String[] parts = StringUtils.splitByWholeSeparator(segmentName, SEPARATOR);
130+
if (parts.length == 4) {
131+
return Integer.parseInt(parts[2]);
132+
} else {
133+
return Integer.parseInt(parts[3]);
134+
}
98135
}
99136

100137
public String getTableName() {
101138
return _tableName;
102139
}
103140

141+
public String getTopicName() {
142+
return _topicName;
143+
}
144+
104145
public int getPartitionGroupId() {
105146
return _partitionGroupId;
106147
}
107148

149+
public String getPartitionGroupInfo() {
150+
if (_topicName.isEmpty()) {
151+
return String.valueOf(_partitionGroupId);
152+
} else {
153+
return _topicName + SEPARATOR + _partitionGroupId;
154+
}
155+
}
156+
108157
public int getSequenceNumber() {
109158
return _sequenceNumber;
110159
}
@@ -127,6 +176,9 @@ public String getSegmentName() {
127176
public int compareTo(LLCSegmentName other) {
128177
Preconditions.checkArgument(_tableName.equals(other._tableName),
129178
"Cannot compare segment names from different table: %s, %s", _segmentName, other.getSegmentName());
179+
if (!_topicName.equals(other._topicName)) {
180+
return StringUtils.compare(_topicName, other._topicName);
181+
}
130182
if (_partitionGroupId != other._partitionGroupId) {
131183
return Integer.compare(_partitionGroupId, other._partitionGroupId);
132184
}

pinot-common/src/test/java/org/apache/pinot/common/utils/LLCSegmentNameTest.java

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,15 @@ public void testSegmentNameBuilder() {
4242
assertEquals(llcSegmentName.getPartitionGroupId(), 0);
4343
assertEquals(llcSegmentName.getSequenceNumber(), 1);
4444

45+
LLCSegmentName llcSegmentNameWithTopicName = new LLCSegmentName("myTable", "myTopic", 0, 1, 1465508537069L);
46+
String segmentNameWithTopicName = llcSegmentNameWithTopicName.getSegmentName();
47+
assertEquals(segmentNameWithTopicName, "myTable__myTopic__0__1__20160609T2142Z");
48+
assertTrue(LLCSegmentName.isLLCSegment(segmentName));
49+
assertEquals(llcSegmentNameWithTopicName.getTableName(), "myTable");
50+
assertEquals(llcSegmentNameWithTopicName.getTopicName(), "myTopic");
51+
assertEquals(llcSegmentNameWithTopicName.getPartitionGroupId(), 0);
52+
assertEquals(llcSegmentNameWithTopicName.getSequenceNumber(), 1);
53+
4554
// Invalid segment name
4655
assertFalse(LLCSegmentName.isLLCSegment("a__abc__1__3__4__54__g__gg___h"));
4756
}
@@ -95,4 +104,61 @@ public void testLLCSegmentName() {
95104
Arrays.sort(testSorted);
96105
Assert.assertEquals(testSorted, new LLCSegmentName[]{segName5, segName1, segName6, segName3, segName4});
97106
}
107+
108+
@Test
109+
public void testLLCSegmentNameWithTopicName() {
110+
String tableName = "myTable";
111+
String topicName = "myTopic";
112+
final int partitionGroupId = 4;
113+
final int sequenceNumber = 27;
114+
final long msSinceEpoch = 1466200248000L;
115+
final String creationTime = "20160617T2150Z";
116+
final long creationTimeInMs = 1466200200000L;
117+
final String segmentName = "myTable__myTopic__4__27__" + creationTime;
118+
119+
LLCSegmentName segName1 = new LLCSegmentName(tableName, topicName, partitionGroupId, sequenceNumber, msSinceEpoch);
120+
Assert.assertEquals(segName1.getSegmentName(), segmentName);
121+
Assert.assertEquals(segName1.getPartitionGroupId(), partitionGroupId);
122+
Assert.assertEquals(segName1.getCreationTime(), creationTime);
123+
Assert.assertEquals(segName1.getCreationTimeMs(), creationTimeInMs);
124+
Assert.assertEquals(segName1.getSequenceNumber(), sequenceNumber);
125+
Assert.assertEquals(segName1.getTableName(), tableName);
126+
Assert.assertEquals(segName1.getTopicName(), topicName);
127+
128+
LLCSegmentName segName2 = new LLCSegmentName(segmentName);
129+
Assert.assertEquals(segName2.getSegmentName(), segmentName);
130+
Assert.assertEquals(segName2.getPartitionGroupId(), partitionGroupId);
131+
Assert.assertEquals(segName2.getCreationTime(), creationTime);
132+
Assert.assertEquals(segName2.getCreationTimeMs(), creationTimeInMs);
133+
Assert.assertEquals(segName2.getSequenceNumber(), sequenceNumber);
134+
Assert.assertEquals(segName2.getTableName(), tableName);
135+
Assert.assertEquals(segName2.getTopicName(), topicName);
136+
137+
Assert.assertEquals(segName1, segName2);
138+
139+
LLCSegmentName segName3 =
140+
new LLCSegmentName(tableName, topicName, partitionGroupId + 1, sequenceNumber - 1, msSinceEpoch);
141+
Assert.assertTrue(segName1.compareTo(segName3) < 0);
142+
LLCSegmentName segName4 =
143+
new LLCSegmentName(tableName, topicName, partitionGroupId + 1, sequenceNumber + 1, msSinceEpoch);
144+
Assert.assertTrue(segName1.compareTo(segName4) < 0);
145+
LLCSegmentName segName5 =
146+
new LLCSegmentName(tableName, topicName, partitionGroupId - 1, sequenceNumber + 1, msSinceEpoch);
147+
Assert.assertTrue(segName1.compareTo(segName5) > 0);
148+
LLCSegmentName segName6 =
149+
new LLCSegmentName(tableName, topicName, partitionGroupId, sequenceNumber + 1, msSinceEpoch);
150+
Assert.assertTrue(segName1.compareTo(segName6) < 0);
151+
152+
LLCSegmentName segName7 =
153+
new LLCSegmentName(tableName + "NotGood", topicName, partitionGroupId, sequenceNumber + 1, msSinceEpoch);
154+
try {
155+
segName1.compareTo(segName7);
156+
Assert.fail("Not failing when comparing " + segName1.getSegmentName() + " and " + segName7.getSegmentName());
157+
} catch (Exception e) {
158+
// expected
159+
}
160+
LLCSegmentName[] testSorted = new LLCSegmentName[]{segName3, segName1, segName4, segName5, segName6};
161+
Arrays.sort(testSorted);
162+
Assert.assertEquals(testSorted, new LLCSegmentName[]{segName5, segName1, segName6, segName3, segName4});
163+
}
98164
}

pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@
118118
import org.apache.pinot.controller.validation.BrokerResourceValidationManager;
119119
import org.apache.pinot.controller.validation.DiskUtilizationChecker;
120120
import org.apache.pinot.controller.validation.OfflineSegmentIntervalChecker;
121+
import org.apache.pinot.controller.validation.RealtimeOffsetAutoResetManager;
121122
import org.apache.pinot.controller.validation.RealtimeSegmentValidationManager;
122123
import org.apache.pinot.controller.validation.ResourceUtilizationChecker;
123124
import org.apache.pinot.controller.validation.ResourceUtilizationManager;
@@ -190,6 +191,7 @@ public abstract class BaseControllerStarter implements ServiceStartable {
190191
// Can only be constructed after resource manager getting started
191192
protected OfflineSegmentIntervalChecker _offlineSegmentIntervalChecker;
192193
protected RealtimeSegmentValidationManager _realtimeSegmentValidationManager;
194+
protected RealtimeOffsetAutoResetManager _realtimeOffsetAutoResetManager;
193195
protected BrokerResourceValidationManager _brokerResourceValidationManager;
194196
protected SegmentRelocator _segmentRelocator;
195197
protected RetentionManager _retentionManager;
@@ -385,6 +387,10 @@ public RealtimeSegmentValidationManager getRealtimeSegmentValidationManager() {
385387
return _realtimeSegmentValidationManager;
386388
}
387389

390+
public RealtimeOffsetAutoResetManager getRealtimeOffsetAutoResetManager() {
391+
return _realtimeOffsetAutoResetManager;
392+
}
393+
388394
public BrokerResourceValidationManager getBrokerResourceValidationManager() {
389395
return _brokerResourceValidationManager;
390396
}
@@ -880,6 +886,9 @@ protected List<PeriodicTask> setupControllerPeriodicTasks() {
880886
_pinotLLCRealtimeSegmentManager, _validationMetrics, _controllerMetrics, _storageQuotaChecker,
881887
_resourceUtilizationManager);
882888
periodicTasks.add(_realtimeSegmentValidationManager);
889+
_realtimeOffsetAutoResetManager =
890+
new RealtimeOffsetAutoResetManager(_config, _helixResourceManager, _leadControllerManager,
891+
_pinotLLCRealtimeSegmentManager, _controllerMetrics);
883892
_brokerResourceValidationManager =
884893
new BrokerResourceValidationManager(_config, _helixResourceManager, _leadControllerManager, _controllerMetrics);
885894
periodicTasks.add(_brokerResourceValidationManager);

pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,10 @@ public static class ControllerPeriodicTasksConf {
114114
"controller.realtime.segment.validation.frequencyPeriod";
115115
public static final String REALTIME_SEGMENT_VALIDATION_INITIAL_DELAY_IN_SECONDS =
116116
"controller.realtime.segment.validation.initialDelayInSeconds";
117+
public static final String REALTIME_OFFSET_AUTO_RESET_FREQUENCY_PERIOD =
118+
"controller.realtime.offsetAutoReset.frequencyPeriod";
119+
public static final String REALTIME_OFFSET_AUTO_RESET_INITIAL_DELAY_IN_SECONDS =
120+
"controller.realtime.offsetAutoReset.initialDelayInSeconds";
117121
// Deprecated as of 0.8.0
118122
@Deprecated
119123
public static final String DEPRECATED_BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS =
@@ -272,6 +276,7 @@ public static long getRandomInitialDelayInSeconds() {
272276
public static final int DEFAULT_RETENTION_MANAGER_FREQUENCY_IN_SECONDS = 6 * 60 * 60; // 6 Hours.
273277
public static final int DEFAULT_OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS = 24 * 60 * 60; // 24 Hours.
274278
public static final int DEFAULT_REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour.
279+
public static final int DEFAULT_OFFSET_AUTO_RESET_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour.
275280
public static final int DEFAULT_BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour.
276281
public static final int DEFAULT_STATUS_CHECKER_FREQUENCY_IN_SECONDS = 5 * 60; // 5 minutes
277282
public static final int DEFAULT_REBALANCE_CHECKER_FREQUENCY_IN_SECONDS = 5 * 60; // 5 minutes
@@ -674,6 +679,16 @@ public void setRealtimeSegmentValidationFrequencyInSeconds(int validationFrequen
674679
Integer.toString(validationFrequencyInSeconds));
675680
}
676681

682+
public int getRealtimeOffsetAutoResetFrequencyInSeconds() {
683+
return getProperty(ControllerPeriodicTasksConf.REALTIME_OFFSET_AUTO_RESET_FREQUENCY_PERIOD,
684+
ControllerPeriodicTasksConf.DEFAULT_OFFSET_AUTO_RESET_FREQUENCY_IN_SECONDS);
685+
}
686+
687+
public void setRealtimeOffsetAutoResetFrequencyInSeconds(int offsetAutoResetFrequencyInSeconds) {
688+
setProperty(ControllerPeriodicTasksConf.REALTIME_OFFSET_AUTO_RESET_FREQUENCY_PERIOD,
689+
Integer.toString(offsetAutoResetFrequencyInSeconds));
690+
}
691+
677692
/**
678693
* Return <code>controller.broker.resource.validation.frequencyPeriod</code> or
679694
* <code>controller.broker.resource.validation.frequencyInSeconds</code> or the default broker resource validation
@@ -1153,6 +1168,11 @@ public long getRealtimeSegmentValidationManagerInitialDelaySeconds() {
11531168
getPeriodicTaskInitialDelayInSeconds());
11541169
}
11551170

1171+
public long getRealtimeOffsetAutoResetInitialDelaySeconds() {
1172+
return getProperty(ControllerPeriodicTasksConf.REALTIME_OFFSET_AUTO_RESET_INITIAL_DELAY_IN_SECONDS,
1173+
getPeriodicTaskInitialDelayInSeconds());
1174+
}
1175+
11561176
public boolean isDeepStoreRetryUploadLLCSegmentEnabled() {
11571177
return getProperty(ControllerPeriodicTasksConf.ENABLE_DEEP_STORE_RETRY_UPLOAD_LLC_SEGMENT, false);
11581178
}

pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,13 @@ private Constants() {
5454
public static final String PERIODIC_TASK_TAG = "PeriodicTask";
5555
public static final String UPSERT_RESOURCE_TAG = "Upsert";
5656
public static final String QUERY_WORKLOAD_TAG = "QueryWorkload";
57+
public static final String RESET_OFFSET_FROM = "ResetOffsetFrom";
58+
public static final String RESET_OFFSET_TO = "ResetOffsetTo";
59+
public static final String RESET_OFFSET_TOPIC_NAME = "ResetOffsetTopicName";
60+
public static final String RESET_OFFSET_TOPIC_PARTITION = "ResetOffsetTopicPartition";
5761

5862
public static final String REALTIME_SEGMENT_VALIDATION_MANAGER = "RealtimeSegmentValidationManager";
63+
public static final String REALTIME_OFFSET_AUTO_RESET_MANAGER = "RealtimeOffsetAutoResetManager";
5964

6065
public static TableType validateTableType(String tableTypeStr) {
6166
if (StringUtils.isBlank(tableTypeStr)) {

pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2106,6 +2106,7 @@ public void updateTableConfig(TableConfig tableConfig)
21062106
throws IOException {
21072107
validateTableTenantConfig(tableConfig);
21082108
validateTableTaskMinionInstanceTagConfig(tableConfig);
2109+
// TODO: make sure external table update would not remove the ephemeral backfill topics
21092110
setExistingTableConfig(tableConfig);
21102111
}
21112112

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pinot.controller.helix.core.periodictask;
20+
21+
import java.util.Collection;
22+
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
23+
import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
24+
import org.apache.pinot.spi.stream.StreamConfig;
25+
26+
27+
public abstract class RealtimeOffsetAutoResetHandler {
28+
protected PinotLLCRealtimeSegmentManager _llcRealtimeSegmentManager;
29+
protected PinotHelixResourceManager _pinotHelixResourceManager;
30+
31+
public RealtimeOffsetAutoResetHandler(PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager,
32+
PinotHelixResourceManager pinotHelixResourceManager) {
33+
_llcRealtimeSegmentManager = llcRealtimeSegmentManager;
34+
_pinotHelixResourceManager = pinotHelixResourceManager;
35+
}
36+
37+
/**
38+
* Trigger the job to backfill the skipped interval due to offset auto reset.
39+
* It is expected to backfill the [fromOffset, toOffset) interval.
40+
* @return if successfully started the backfill job
41+
*/
42+
public abstract boolean triggerBackfillJob(
43+
String tableNameWithType, StreamConfig streamConfig, String topicName, int partitionId, long fromOffset,
44+
long toOffset);
45+
46+
/**
47+
* Ensure all topics under the table are being backfilled. It is the caller's responsibility to figure out what
48+
* topic set it should check.
49+
*/
50+
public abstract void ensureBackfillJobsRunning(String tableNameWithType, Collection<String> topicNames);
51+
52+
/**
53+
* Return the Collection of completed and cleaned up topicNames from the input.
54+
*/
55+
public abstract Collection<String> cleanupCompletedBackfillJobs(
56+
String tableNameWithType, Collection<String> topicNames);
57+
58+
public abstract void close();
59+
}

0 commit comments

Comments
 (0)