Skip to content

Commit f5ceab6

Browse files
authored
Merge pull request #1020: [beam-core] add min-initial-splits to BatchLogRead
2 parents 9ec98b0 + acfe092 commit f5ceab6

3 files changed

Lines changed: 16 additions & 5 deletions

File tree

beam/core/src/main/java/cz/o2/proxima/beam/core/direct/io/BatchLogRead.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,7 @@ public void splitRestriction(
250250
List<Partition> partitions = restriction.getPartitions();
251251
partitions.sort(Comparator.comparing(Partition::getMinTimestamp));
252252
int pos = 0;
253-
int reduced = (int) Math.sqrt(partitions.size());
253+
int reduced = Math.max(minInitialSplits, (int) Math.sqrt(partitions.size()));
254254
if (maxInitialSplits > 0 && reduced > maxInitialSplits) {
255255
reduced = maxInitialSplits;
256256
}
@@ -307,6 +307,7 @@ public TypeDescriptor<StreamElement> getOutputTypeDescriptor() {
307307
private final long startStamp;
308308
private final long endStamp;
309309
private final int maxInitialSplits;
310+
private final int minInitialSplits;
310311

311312
@VisibleForTesting
312313
BatchLogRead(
@@ -324,16 +325,24 @@ public TypeDescriptor<StreamElement> getOutputTypeDescriptor() {
324325
this.readerFactory = readerFactory;
325326
this.startStamp = startStamp;
326327
this.endStamp = endStamp;
327-
this.maxInitialSplits = readInitialSplits(cfg);
328+
this.maxInitialSplits = readMaxInitialSplits(cfg);
329+
this.minInitialSplits = readMinInitialSplits(cfg);
328330
}
329331

330-
private int readInitialSplits(Map<String, Object> cfg) {
332+
private int readMaxInitialSplits(Map<String, Object> cfg) {
331333
return Optional.ofNullable(cfg.get("batch.max-initial-splits"))
332334
.map(Object::toString)
333335
.map(Integer::valueOf)
334336
.orElse(-1);
335337
}
336338

339+
private int readMinInitialSplits(Map<String, Object> cfg) {
340+
return Optional.ofNullable(cfg.get("batch.min-initial-splits"))
341+
.map(Object::toString)
342+
.map(Integer::valueOf)
343+
.orElse(-1);
344+
}
345+
337346
@Override
338347
public PCollection<StreamElement> expand(PBegin input) {
339348
long delayMs =

beam/core/src/test/java/cz/o2/proxima/beam/core/BeamDataOperatorTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -450,7 +450,8 @@ public void testBatchLogReadConfigParsing() {
450450
beam.getBatchUpdates(p, armed);
451451
DataAccessor accessor = beam.getAccessorFor(repo.getFamilyByName("gateway-storage-batch"));
452452
Map<String, Object> cfg = ((DirectDataAccessorWrapper) accessor).getCfg();
453-
assertEquals(1, cfg.get("batch.max-initial-splits"));
453+
assertEquals(3, cfg.get("batch.max-initial-splits"));
454+
assertEquals(2, cfg.get("batch.min-initial-splits"));
454455
}
455456

456457
private void terminatePipeline(
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
attributeFamilies {
22
gateway-storage-batch {
33
batch {
4-
max-initial-splits: 1
4+
max-initial-splits: 3
5+
min-initial-splits: 2
56
}
67
}
78
}

0 commit comments

Comments
 (0)