Skip to content

Commit c44fd79

Browse files
authored
[Auto reset 2/3]Introduce topic 'inactive' status (#16692)
* Introduce topic 'inactive' status * Expose the topic pause through API and add UT for PartitionGroupMetadataFetcher * Fix style * Fix idealstate update * Change variable naming
1 parent 0e7393a commit c44fd79

9 files changed

Lines changed: 438 additions & 39 deletions

File tree

pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,15 @@
2929
import io.swagger.annotations.Authorization;
3030
import io.swagger.annotations.SecurityDefinition;
3131
import io.swagger.annotations.SwaggerDefinition;
32+
import java.util.Arrays;
3233
import java.util.HashMap;
3334
import java.util.HashSet;
35+
import java.util.List;
3436
import java.util.Map;
3537
import java.util.Set;
3638
import java.util.UUID;
3739
import java.util.concurrent.Executor;
40+
import java.util.stream.Collectors;
3841
import javax.inject.Inject;
3942
import javax.ws.rs.DefaultValue;
4043
import javax.ws.rs.GET;
@@ -123,6 +126,38 @@ public Response pauseConsumption(
123126
}
124127
}
125128

129+
@POST
130+
@Path("/tables/{tableName}/pauseTopicConsumption")
131+
@Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = Actions.Table.PAUSE_CONSUMPTION)
132+
@Produces(MediaType.APPLICATION_JSON)
133+
@ApiOperation(value = "Pause consumption of some topics of a realtime table", notes = "Pause the consumption of "
134+
+ "some topics of a realtime table.")
135+
public Response pauseTopicConsumption(
136+
@ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
137+
@ApiParam(value = "Comma separated list of index of the topics", required = true) @QueryParam("topicIndices")
138+
String topicIndices,
139+
@Context HttpHeaders headers) {
140+
tableName = DatabaseUtils.translateTableName(tableName, headers);
141+
String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName);
142+
validateTable(tableNameWithType);
143+
List<Integer> topicIndexList;
144+
try {
145+
topicIndexList = Arrays.stream(topicIndices.split(","))
146+
.map(String::trim)
147+
.map(idx -> Integer.parseInt(idx))
148+
.collect(Collectors.toList());
149+
} catch (NumberFormatException nfe) {
150+
throw new ControllerApplicationException(LOGGER, "topicIndices should be a comma separated list of integers",
151+
Response.Status.BAD_REQUEST, nfe);
152+
}
153+
try {
154+
return Response.ok(_pinotLLCRealtimeSegmentManager.pauseTopicsConsumption(tableNameWithType, topicIndexList))
155+
.build();
156+
} catch (Exception e) {
157+
throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e);
158+
}
159+
}
160+
126161
@POST
127162
@Path("/tables/{tableName}/resumeConsumption")
128163
@Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = Actions.Table.RESUME_CONSUMPTION)
@@ -160,6 +195,39 @@ public Response resumeConsumption(
160195
}
161196
}
162197

198+
@POST
199+
@Path("/tables/{tableName}/resumeTopicConsumption")
200+
@Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = Actions.Table.RESUME_CONSUMPTION)
201+
@Produces(MediaType.APPLICATION_JSON)
202+
@ApiOperation(value = "Resume consumption of some topics of a realtime table", notes =
203+
"Resume the consumption for some topics of a realtime table. There are two independent pause mechanism, "
204+
+ "table pause and topic pause. The topics is resumed only if both table and topics are resumed.")
205+
public Response resumeTopicConsumption(
206+
@ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
207+
@ApiParam(value = "Comma separated list of index of the topics", required = true) @QueryParam("topicIndices")
208+
String topicIndices,
209+
@Context HttpHeaders headers) {
210+
tableName = DatabaseUtils.translateTableName(tableName, headers);
211+
String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName);
212+
validateTable(tableNameWithType);
213+
List<Integer> topicIndexList;
214+
try {
215+
topicIndexList = Arrays.stream(topicIndices.split(","))
216+
.map(String::trim)
217+
.map(idx -> Integer.parseInt(idx))
218+
.collect(Collectors.toList());
219+
} catch (NumberFormatException nfe) {
220+
throw new ControllerApplicationException(LOGGER, "topicIndices should be a comma separated list of integers",
221+
Response.Status.BAD_REQUEST, nfe);
222+
}
223+
try {
224+
return Response.ok(_pinotLLCRealtimeSegmentManager.resumeTopicsConsumption(
225+
tableNameWithType, topicIndexList)).build();
226+
} catch (Exception e) {
227+
throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e);
228+
}
229+
}
230+
163231
@POST
164232
@Path("/tables/{tableName}/forceCommit")
165233
@Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = Actions.Table.FORCE_COMMIT)

pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -479,7 +479,7 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon
479479
if (tableType == TableType.REALTIME && tableConfig != null) {
480480
List<StreamConfig> streamConfigs = IngestionConfigUtils.getStreamConfigs(tableConfig);
481481
new MissingConsumingSegmentFinder(tableNameWithType, propertyStore, _controllerMetrics,
482-
streamConfigs).findAndEmitMetrics(idealState);
482+
streamConfigs, idealState).findAndEmitMetrics(idealState);
483483
}
484484
}
485485

pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,12 +87,14 @@ public static IdealState buildEmptyIdealStateFor(String tableNameWithType, int n
8787
* partition groups.
8888
* The size of this list is equal to the number of partition groups,
8989
* and is created using the latest segment zk metadata.
90+
* @param pausedTopicIndices List of inactive topic indices. Index is the index of the topic in the streamConfigMaps.
9091
* @param forceGetOffsetFromStream - details in PinotLLCRealtimeSegmentManager.fetchPartitionGroupIdToSmallestOffset
9192
*/
9293
public static List<PartitionGroupMetadata> getPartitionGroupMetadataList(List<StreamConfig> streamConfigs,
93-
List<PartitionGroupConsumptionStatus> partitionGroupConsumptionStatusList, boolean forceGetOffsetFromStream) {
94+
List<PartitionGroupConsumptionStatus> partitionGroupConsumptionStatusList, List<Integer> pausedTopicIndices,
95+
boolean forceGetOffsetFromStream) {
9496
PartitionGroupMetadataFetcher partitionGroupMetadataFetcher = new PartitionGroupMetadataFetcher(
95-
streamConfigs, partitionGroupConsumptionStatusList, forceGetOffsetFromStream);
97+
streamConfigs, partitionGroupConsumptionStatusList, pausedTopicIndices, forceGetOffsetFromStream);
9698
try {
9799
DEFAULT_IDEALSTATE_UPDATE_RETRY_POLICY.attempt(partitionGroupMetadataFetcher);
98100
return partitionGroupMetadataFetcher.getPartitionGroupMetadataList();

pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.google.common.base.Preconditions;
2323
import java.time.Duration;
2424
import java.time.Instant;
25+
import java.util.ArrayList;
2526
import java.util.Collections;
2627
import java.util.HashMap;
2728
import java.util.List;
@@ -38,6 +39,7 @@
3839
import org.apache.pinot.common.metrics.ControllerMetrics;
3940
import org.apache.pinot.common.utils.LLCSegmentName;
4041
import org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder;
42+
import org.apache.pinot.spi.config.table.PauseState;
4143
import org.apache.pinot.spi.stream.OffsetCriteria;
4244
import org.apache.pinot.spi.stream.StreamConfig;
4345
import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
@@ -67,7 +69,7 @@ public class MissingConsumingSegmentFinder {
6769
private ControllerMetrics _controllerMetrics;
6870

6971
public MissingConsumingSegmentFinder(String realtimeTableName, ZkHelixPropertyStore<ZNRecord> propertyStore,
70-
ControllerMetrics controllerMetrics, List<StreamConfig> streamConfigs) {
72+
ControllerMetrics controllerMetrics, List<StreamConfig> streamConfigs, IdealState idealState) {
7173
_realtimeTableName = realtimeTableName;
7274
_controllerMetrics = controllerMetrics;
7375
_segmentMetadataFetcher = new SegmentMetadataFetcher(propertyStore, controllerMetrics);
@@ -81,7 +83,9 @@ public MissingConsumingSegmentFinder(String realtimeTableName, ZkHelixPropertySt
8183
return streamConfig;
8284
});
8385
try {
84-
PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfigs, Collections.emptyList(), false)
86+
PauseState pauseState = PinotLLCRealtimeSegmentManager.extractTablePauseState(idealState);
87+
PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfigs, Collections.emptyList(),
88+
pauseState == null ? new ArrayList<>() : pauseState.getIndexOfInactiveTopics(), false)
8589
.forEach(metadata -> {
8690
_partitionGroupIdToLargestStreamOffsetMap.put(metadata.getPartitionGroupId(), metadata.getStartOffset());
8791
});

0 commit comments

Comments
 (0)