|
53 | 53 | public class ContainerHealthSchemaManager { |
54 | 54 | private static final Logger LOG = |
55 | 55 | LoggerFactory.getLogger(ContainerHealthSchemaManager.class); |
56 | | - private static final int BATCH_INSERT_CHUNK_SIZE = 1000; |
| 56 | + private static final int BATCH_INSERT_CHUNK_SIZE = 5_000; |
57 | 57 |
|
58 | 58 | /** |
59 | 59 | * Maximum number of container IDs to include in a single |
@@ -205,28 +205,75 @@ public void replaceUnhealthyContainerRecordsAtomically( |
205 | 205 |
|
206 | 206 | private int deleteScmStatesForContainers(DSLContext dslContext, |
207 | 207 | List<Long> containerIds) { |
| 208 | + if (containerIds.isEmpty()) { |
| 209 | + return 0; |
| 210 | + } |
| 211 | + |
| 212 | + List<Long> sortedIds = containerIds.stream() |
| 213 | + .distinct() |
| 214 | + .sorted() |
| 215 | + .collect(Collectors.toList()); |
| 216 | + |
208 | 217 | int totalDeleted = 0; |
| 218 | + List<Long> inClauseBatch = new ArrayList<>(MAX_IN_CLAUSE_CHUNK_SIZE); |
| 219 | + |
| 220 | + for (int i = 0; i < sortedIds.size(); ) { |
| 221 | + int rangeStart = i; |
| 222 | + while (i + 1 < sortedIds.size() |
| 223 | + && sortedIds.get(i + 1) == sortedIds.get(i) + 1) { |
| 224 | + i++; |
| 225 | + } |
| 226 | + |
| 227 | + if (i > rangeStart) { |
| 228 | + if (!inClauseBatch.isEmpty()) { |
| 229 | + totalDeleted += deleteScmStatesForContainerIdInClause(dslContext, inClauseBatch); |
| 230 | + inClauseBatch.clear(); |
| 231 | + } |
| 232 | + totalDeleted += deleteScmStatesForContainerIdRange(dslContext, |
| 233 | + sortedIds.get(rangeStart), sortedIds.get(i)); |
| 234 | + } else { |
| 235 | + inClauseBatch.add(sortedIds.get(i)); |
| 236 | + if (inClauseBatch.size() >= MAX_IN_CLAUSE_CHUNK_SIZE) { |
| 237 | + totalDeleted += deleteScmStatesForContainerIdInClause(dslContext, inClauseBatch); |
| 238 | + inClauseBatch.clear(); |
| 239 | + } |
| 240 | + } |
| 241 | + i++; |
| 242 | + } |
209 | 243 |
|
210 | | - for (int from = 0; from < containerIds.size(); from += MAX_IN_CLAUSE_CHUNK_SIZE) { |
211 | | - int to = Math.min(from + MAX_IN_CLAUSE_CHUNK_SIZE, containerIds.size()); |
212 | | - List<Long> chunk = containerIds.subList(from, to); |
213 | | - |
214 | | - int deleted = dslContext.deleteFrom(UNHEALTHY_CONTAINERS) |
215 | | - .where(UNHEALTHY_CONTAINERS.CONTAINER_ID.in(chunk)) |
216 | | - .and(UNHEALTHY_CONTAINERS.CONTAINER_STATE.in( |
217 | | - UnHealthyContainerStates.MISSING.toString(), |
218 | | - UnHealthyContainerStates.EMPTY_MISSING.toString(), |
219 | | - UnHealthyContainerStates.UNDER_REPLICATED.toString(), |
220 | | - UnHealthyContainerStates.OVER_REPLICATED.toString(), |
221 | | - UnHealthyContainerStates.MIS_REPLICATED.toString(), |
222 | | - UnHealthyContainerStates.NEGATIVE_SIZE.toString(), |
223 | | - UnHealthyContainerStates.REPLICA_MISMATCH.toString())) |
224 | | - .execute(); |
225 | | - totalDeleted += deleted; |
| 244 | + if (!inClauseBatch.isEmpty()) { |
| 245 | + totalDeleted += deleteScmStatesForContainerIdInClause(dslContext, inClauseBatch); |
226 | 246 | } |
227 | 247 | return totalDeleted; |
228 | 248 | } |
229 | 249 |
|
| 250 | + private int deleteScmStatesForContainerIdRange(DSLContext dslContext, |
| 251 | + long startContainerId, long endContainerId) { |
| 252 | + return dslContext.deleteFrom(UNHEALTHY_CONTAINERS) |
| 253 | + .where(UNHEALTHY_CONTAINERS.CONTAINER_ID.between(startContainerId, endContainerId)) |
| 254 | + .and(scmGeneratedStateFilter()) |
| 255 | + .execute(); |
| 256 | + } |
| 257 | + |
| 258 | + private int deleteScmStatesForContainerIdInClause(DSLContext dslContext, |
| 259 | + List<Long> containerIds) { |
| 260 | + return dslContext.deleteFrom(UNHEALTHY_CONTAINERS) |
| 261 | + .where(UNHEALTHY_CONTAINERS.CONTAINER_ID.in(containerIds)) |
| 262 | + .and(scmGeneratedStateFilter()) |
| 263 | + .execute(); |
| 264 | + } |
| 265 | + |
| 266 | + private org.jooq.Condition scmGeneratedStateFilter() { |
| 267 | + return UNHEALTHY_CONTAINERS.CONTAINER_STATE.in( |
| 268 | + UnHealthyContainerStates.MISSING.toString(), |
| 269 | + UnHealthyContainerStates.EMPTY_MISSING.toString(), |
| 270 | + UnHealthyContainerStates.UNDER_REPLICATED.toString(), |
| 271 | + UnHealthyContainerStates.OVER_REPLICATED.toString(), |
| 272 | + UnHealthyContainerStates.MIS_REPLICATED.toString(), |
| 273 | + UnHealthyContainerStates.NEGATIVE_SIZE.toString(), |
| 274 | + UnHealthyContainerStates.REPLICA_MISMATCH.toString()); |
| 275 | + } |
| 276 | + |
230 | 277 | /** |
231 | 278 | * Returns previous in-state-since timestamps for tracked unhealthy states. |
232 | 279 | * The key is a stable containerId + state tuple. |
|
0 commit comments