Skip to content

Commit 77ecc4c

Browse files
xiangfu0claude
andauthored
Add time-range based segment reload to controller API (#17627)
* Add time-range segment reload API - Add controller endpoint to reload segments in a time range - Select segments via metadata and dispatch per instance - Add unit tests for time range reload behavior Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * Unify reload job ID handling: use single shared job ID for instanceToSegmentsMap path The existing `instanceToSegmentsMap` reload path was creating per-instance ZK entries with per-instance Helix message IDs, while the new time-range reload path used a single shared UUID across all servers. Unify them by generating one UUID per logical reload operation in reloadSegmentsForTable and storing a single ZK entry, consistent with the time-range path. Remove the now-unused 3-arg reloadSegments overload in PinotHelixResourceManager. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 1bf528e commit 77ecc4c

11 files changed

Lines changed: 856 additions & 74 deletions

File tree

pinot-common/src/main/java/org/apache/pinot/common/messages/SegmentReloadMessage.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,19 @@ public SegmentReloadMessage(String tableNameWithType, boolean forceDownload) {
5656
* @param forceDownload whether to download segments from deep store when reloading.
5757
*/
5858
public SegmentReloadMessage(String tableNameWithType, @Nullable List<String> segmentNames, boolean forceDownload) {
59+
this(tableNameWithType, segmentNames, forceDownload, null);
60+
}
61+
62+
/**
63+
* This msg asks server to reload a list of specified segments with an explicit reload job id.
64+
*
65+
* @param tableNameWithType the table where the segments are from.
66+
* @param segmentNames a list of specified segments to reload, or null for all segments.
67+
* @param forceDownload whether to download segments from deep store when reloading.
68+
* @param reloadJobId reload job id to associate with the reload operation
69+
*/
70+
public SegmentReloadMessage(String tableNameWithType, @Nullable List<String> segmentNames, boolean forceDownload,
71+
@Nullable String reloadJobId) {
5972
super(MessageType.USER_DEFINE_MSG, UUID.randomUUID().toString());
6073
setResourceName(tableNameWithType);
6174
setMsgSubType(RELOAD_SEGMENT_MSG_SUB_TYPE);
@@ -66,6 +79,9 @@ public SegmentReloadMessage(String tableNameWithType, @Nullable List<String> seg
6679

6780
// Persisting the msg ID as the reload job ID.
6881
znRecord.setSimpleField(RELOAD_JOB_ID_KEY, getMsgId());
82+
if (reloadJobId != null) {
83+
znRecord.setSimpleField(RELOAD_JOB_ID_KEY, reloadJobId);
84+
}
6985

7086
znRecord.setBooleanField(FORCE_DOWNLOAD_KEY, forceDownload);
7187
if (CollectionUtils.isNotEmpty(segmentNames)) {

pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/PinotControllerJobMetadataDto.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ public class PinotControllerJobMetadataDto {
4141
private int _messageCount;
4242
private String _segmentName;
4343
private String _instanceName;
44+
private String _instanceToSegmentsMap;
4445

4546
public String getJobId() {
4647
return _jobId;
@@ -106,4 +107,14 @@ public PinotControllerJobMetadataDto setInstanceName(@Nullable String instanceNa
106107
_instanceName = instanceName;
107108
return this;
108109
}
110+
111+
@Nullable
112+
public String getInstanceToSegmentsMap() {
113+
return _instanceToSegmentsMap;
114+
}
115+
116+
public PinotControllerJobMetadataDto setInstanceToSegmentsMap(@Nullable String instanceToSegmentsMap) {
117+
_instanceToSegmentsMap = instanceToSegmentsMap;
118+
return this;
119+
}
109120
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pinot.common.messages;
20+
21+
import java.util.List;
22+
import org.apache.helix.model.Message;
23+
import org.testng.annotations.Test;
24+
25+
import static org.testng.Assert.assertEquals;
26+
import static org.testng.Assert.assertFalse;
27+
import static org.testng.Assert.assertThrows;
28+
import static org.testng.Assert.assertTrue;
29+
30+
public class SegmentReloadMessageTest {
31+
32+
@Test
33+
public void testReloadJobIdDefaultToMessageId() {
34+
SegmentReloadMessage message = new SegmentReloadMessage("testTable_OFFLINE", false);
35+
36+
assertFalse(message.shouldForceDownload());
37+
assertEquals(message.getReloadJobId(), message.getMsgId());
38+
}
39+
40+
@Test
41+
public void testReloadJobIdOverrideAndSegments() {
42+
List<String> segments = List.of("seg_1", "seg_2");
43+
SegmentReloadMessage message = new SegmentReloadMessage("testTable_OFFLINE", segments, true, "job-123");
44+
45+
assertTrue(message.shouldForceDownload());
46+
assertEquals(message.getSegmentList(), segments);
47+
assertEquals(message.getReloadJobId(), "job-123");
48+
}
49+
50+
@Test
51+
public void testInvalidSubtypeThrows() {
52+
Message rawMessage = new Message(Message.MessageType.USER_DEFINE_MSG, "message-id");
53+
rawMessage.setMsgSubType("INVALID_SUBTYPE");
54+
55+
assertThrows(IllegalArgumentException.class, () -> new SegmentReloadMessage(rawMessage));
56+
}
57+
}

pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableReloadResource.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@
6262
* POST requests:
6363
* <ul>
6464
* <li>"/segments/{tableName}/{segmentName}/reload": reload a specific segment</li>
65-
* <li>"/segments/{tableName}/reload": reload all segments in a table</li>
65+
* <li>"/segments/{tableName}/reload": reload all segments in a table (optionally within a time range)</li>
6666
* </ul>
6767
* </li>
6868
* <li>
@@ -128,7 +128,11 @@ public SuccessResponse reloadSegment(
128128
@Authenticate(AccessType.UPDATE)
129129
@Produces(MediaType.APPLICATION_JSON)
130130
@ApiOperation(value = "Reload all segments in a table",
131-
notes = "Reloads all segments for the specified table. Supports filtering by type, instance, or custom mapping.")
131+
notes = "Reloads all segments for the specified table. Supports filtering by type, instance, "
132+
+ "custom mapping, or time range. Time range params are in milliseconds and the range is "
133+
+ "[startTimestamp, endTimestamp). Either timestamp can be omitted to make that side of the "
134+
+ "range unbounded. When using time range parameters, do not specify targetInstance or "
135+
+ "instanceToSegmentsMap; these options are mutually exclusive with time range filters.")
132136
@ApiResponses(value = {
133137
@ApiResponse(code = 200, message = "Reload jobs submitted successfully"),
134138
@ApiResponse(code = 404, message = "No segments found")
@@ -140,13 +144,19 @@ public SuccessResponse reloadAllSegments(
140144
String tableTypeStr,
141145
@ApiParam(value = "Force server to re-download segments from deep store", defaultValue = "false")
142146
@QueryParam("forceDownload") @DefaultValue("false") boolean forceDownload,
147+
@ApiParam(value = "Start timestamp (inclusive) in milliseconds. Omit for an unbounded lower bound.")
148+
@QueryParam("startTimestamp") String startTimestampStr,
149+
@ApiParam(value = "End timestamp (exclusive) in milliseconds. Omit for an unbounded upper bound.")
150+
@QueryParam("endTimestamp") String endTimestampStr,
151+
@ApiParam(value = "Whether to exclude segments overlapping the time range", defaultValue = "false")
152+
@QueryParam("excludeOverlapping") @DefaultValue("false") boolean excludeOverlapping,
143153
@ApiParam(value = "Target specific server instance") @QueryParam("targetInstance") @Nullable
144154
String targetInstance,
145155
@ApiParam(value = "JSON map of instance to segment lists (overrides targetInstance)")
146156
@QueryParam("instanceToSegmentsMap") @Nullable String instanceToSegmentsMapInJson, @Context HttpHeaders headers)
147157
throws IOException {
148158
return _service.reloadAllSegments(tableName, tableTypeStr, forceDownload, targetInstance,
149-
instanceToSegmentsMapInJson, headers);
159+
instanceToSegmentsMapInJson, startTimestampStr, endTimestampStr, excludeOverlapping, headers);
150160
}
151161

152162
@GET

pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2608,6 +2608,23 @@ public Map<String, Map<String, String>> getAllJobs(Set<ControllerJobType> jobTyp
26082608
*/
26092609
public boolean addNewReloadSegmentJob(String tableNameWithType, String segmentNames, @Nullable String instanceName,
26102610
String jobId, long jobSubmissionTimeMs, int numMessagesSent) {
2611+
return addNewReloadSegmentJob(tableNameWithType, segmentNames, instanceName, jobId, jobSubmissionTimeMs,
2612+
numMessagesSent, null);
2613+
}
2614+
2615+
/**
2616+
* Adds a new reload segment job metadata into ZK
2617+
* @param tableNameWithType Table for which job is to be added
2618+
* @param segmentNames Name of the segments being reloaded, separated by comma
2619+
* @param instanceName Name of the instance doing the segment reloading, optional.
2620+
* @param jobId job's UUID
2621+
* @param jobSubmissionTimeMs time at which the job was submitted
2622+
* @param numMessagesSent number of messages that were sent to servers. Saved as metadata
2623+
* @param instanceToSegmentsMapJson exact instance-to-segments mapping targeted by the job, optional.
2624+
* @return boolean representing success / failure of the ZK write step
2625+
*/
2626+
public boolean addNewReloadSegmentJob(String tableNameWithType, String segmentNames, @Nullable String instanceName,
2627+
String jobId, long jobSubmissionTimeMs, int numMessagesSent, @Nullable String instanceToSegmentsMapJson) {
26112628
Map<String, String> jobMetadata = new HashMap<>();
26122629
jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId);
26132630
jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE, tableNameWithType);
@@ -2618,6 +2635,10 @@ public boolean addNewReloadSegmentJob(String tableNameWithType, String segmentNa
26182635
if (instanceName != null) {
26192636
jobMetadata.put(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_INSTANCE_NAME, instanceName);
26202637
}
2638+
if (instanceToSegmentsMapJson != null) {
2639+
jobMetadata.put(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_INSTANCE_TO_SEGMENTS_MAP,
2640+
instanceToSegmentsMapJson);
2641+
}
26212642
return addControllerJobToZK(jobId, jobMetadata, ControllerJobTypes.RELOAD_SEGMENT);
26222643
}
26232644

@@ -2976,16 +2997,17 @@ public void refreshSegment(String tableNameWithType, SegmentMetadata segmentMeta
29762997
sendSegmentRefreshMessage(tableNameWithType, segmentName, true, true);
29772998
}
29782999

2979-
public Map<String, Pair<Integer, String>> reloadSegments(String tableNameWithType, boolean forceDownload,
2980-
Map<String, List<String>> instanceToSegmentsMap) {
3000+
public Map<String, Integer> reloadSegments(String tableNameWithType, boolean forceDownload,
3001+
Map<String, List<String>> instanceToSegmentsMap, String reloadJobId) {
29813002
LOGGER.info("Sending reload messages for table: {} with forceDownload: {}, and instanceToSegmentsMap: {}",
29823003
tableNameWithType, forceDownload, instanceToSegmentsMap);
29833004

29843005
ClusterMessagingService messagingService = _helixZkManager.getMessagingService();
2985-
Map<String, Pair<Integer, String>> instanceMsgInfoMap = new HashMap<>();
3006+
Map<String, Integer> instanceMsgInfoMap = new HashMap<>();
29863007
for (Map.Entry<String, List<String>> entry : instanceToSegmentsMap.entrySet()) {
29873008
String targetInstance = entry.getKey();
2988-
SegmentReloadMessage message = new SegmentReloadMessage(tableNameWithType, entry.getValue(), forceDownload);
3009+
SegmentReloadMessage message =
3010+
new SegmentReloadMessage(tableNameWithType, entry.getValue(), forceDownload, reloadJobId);
29893011
int numMessagesSent =
29903012
MessagingServiceUtils.send(messagingService, message, tableNameWithType, null, targetInstance);
29913013
if (numMessagesSent > 0) {
@@ -2994,7 +3016,7 @@ public Map<String, Pair<Integer, String>> reloadSegments(String tableNameWithTyp
29943016
} else {
29953017
LOGGER.warn("No reload message sent to instance: {} for table: {}", targetInstance, tableNameWithType);
29963018
}
2997-
instanceMsgInfoMap.put(targetInstance, Pair.of(numMessagesSent, message.getMsgId()));
3019+
instanceMsgInfoMap.put(targetInstance, numMessagesSent);
29983020
}
29993021
return instanceMsgInfoMap;
30003022
}
@@ -3014,7 +3036,7 @@ public Pair<Integer, String> reloadAllSegments(String tableNameWithType, boolean
30143036
LOGGER.warn("No reload message sent for table: {}", tableNameWithType);
30153037
}
30163038

3017-
return Pair.of(numMessagesSent, message.getMsgId());
3039+
return Pair.of(numMessagesSent, message.getReloadJobId());
30183040
}
30193041

30203042
public Pair<Integer, String> reloadSegment(String tableNameWithType, String segmentName, boolean forceDownload,
@@ -3033,7 +3055,7 @@ public Pair<Integer, String> reloadSegment(String tableNameWithType, String segm
30333055
LOGGER.warn("No reload message sent for segment: {} in table: {}", segmentName, tableNameWithType);
30343056
}
30353057

3036-
return Pair.of(numMessagesSent, message.getMsgId());
3058+
return Pair.of(numMessagesSent, message.getReloadJobId());
30373059
}
30383060

30393061
/**

0 commit comments

Comments
 (0)