Skip to content

Commit 3cbf0be

Browse files
authored
fix: fixing splitting logic (#4554)
* fix: increasing max splits to 1 million
  Change-Id: I305c6b2182300f4ffdce8b8de523a5c0487d41ec
* fix: improve splitting logic
  Change-Id: I68bbe7fd1b1026c13f9bf1c2f28b43a99280971e
* update
  Change-Id: I06e3cdec7bc246ec27e6a83240175af5143e61fe
* update
  Change-Id: I90db995cac0cf27de9520be459b06e834b23dae0
* add comment back
  Change-Id: I09b61e2c740739fd26279b79e3d4f9eafb4f637c
1 parent bb23792 commit 3cbf0be

File tree

2 files changed

+154
-26
lines changed

2 files changed

+154
-26
lines changed

bigtable-dataflow-parent/bigtable-hbase-beam/src/main/java/com/google/cloud/bigtable/beam/CloudBigtableIO.java

Lines changed: 50 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.google.bigtable.repackaged.com.google.common.base.Preconditions;
2727
import com.google.bigtable.repackaged.com.google.common.collect.ImmutableMap;
2828
import com.google.cloud.bigtable.batch.common.CloudBigtableServiceImpl;
29+
import com.google.cloud.bigtable.beam.CloudBigtableScanConfiguration.ScanType;
2930
import com.google.cloud.bigtable.hbase.BigtableFixedProtoScan;
3031
import com.google.cloud.bigtable.hbase.BigtableOptionsFactory;
3132
import java.io.IOException;
@@ -145,8 +146,7 @@ public class CloudBigtableIO {
145146
abstract static class AbstractSource extends BoundedSource<Result> {
146147

147148
protected static final Logger SOURCE_LOG = LoggerFactory.getLogger(AbstractSource.class);
148-
protected static final long SIZED_BASED_MAX_SPLIT_COUNT = 4_000;
149-
static final long COUNT_MAX_SPLIT_COUNT = 15_360;
149+
private static final long COUNT_MAX_SPLIT_COUNT = 5_000_000;
150150

151151
/** Configuration for a Cloud Bigtable connection, a table, and an optional scan. */
152152
private final CloudBigtableScanConfiguration configuration;
@@ -165,16 +165,16 @@ public Coder<Result> getOutputCoder() {
165165
// TODO: Move the splitting logic to bigtable-hbase, and separate concerns between beam needs
166166
// and Cloud Bigtable logic.
167167
protected List<SourceWithKeys> getSplits(long desiredBundleSizeBytes) throws Exception {
168-
desiredBundleSizeBytes =
169-
Math.max(
170-
calculateEstimatedSizeBytes(null) / SIZED_BASED_MAX_SPLIT_COUNT,
171-
desiredBundleSizeBytes);
172168
CloudBigtableScanConfiguration conf = getConfiguration();
173169
byte[] scanStartKey = conf.getStartRow();
174170
byte[] scanEndKey = conf.getStopRow();
175171
List<SourceWithKeys> splits = new ArrayList<>();
176172
byte[] startKey = HConstants.EMPTY_START_ROW;
177173
long lastOffset = 0;
174+
175+
byte[] currentStartKey = null;
176+
long accumulatedSize = 0;
177+
178178
for (KeyOffset response : getSampleRowKeys()) {
179179
byte[] endKey = response.getKey().toByteArray();
180180
// Avoid empty regions.
@@ -183,34 +183,63 @@ protected List<SourceWithKeys> getSplits(long desiredBundleSizeBytes) throws Exc
183183
}
184184

185185
long offset = response.getOffsetBytes();
186-
// Get all the start/end key ranges that match the user supplied Scan. See
186+
long tabletSize = offset - lastOffset;
187+
188+
// Get all the start/end key ranges that match the user supplied Scan. See
187189
// https://github.com/apache/hbase/blob/master/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java#L298
188190
// for original logic.
189191
if (isWithinRange(scanStartKey, scanEndKey, startKey, endKey)) {
190-
byte[] splitStart = null;
191-
byte[] splitStop = null;
192-
if (scanStartKey.length == 0 || Bytes.compareTo(startKey, scanStartKey) >= 0) {
193-
splitStart = startKey;
194-
} else {
195-
splitStart = scanStartKey;
196-
}
197-
198-
if ((scanEndKey.length == 0 || Bytes.compareTo(endKey, scanEndKey) <= 0)
199-
&& endKey.length > 0) {
200-
splitStop = endKey;
192+
byte[] splitStart =
193+
(scanStartKey.length == 0 || Bytes.compareTo(startKey, scanStartKey) >= 0)
194+
? startKey
195+
: scanStartKey;
196+
byte[] splitStop =
197+
((scanEndKey.length == 0 || Bytes.compareTo(endKey, scanEndKey) <= 0)
198+
&& endKey.length > 0)
199+
? endKey
200+
: scanEndKey;
201+
202+
// Merging tablets that are smaller than desired bundle size
203+
if (tabletSize >= desiredBundleSizeBytes) {
204+
// If the current tablet alone is already >= the desired bundle size, first flush any
205+
// previously accumulated smaller tablets (currentStartKey != null) as their own split,
206+
// ending where this tablet's split starts.
207+
if (currentStartKey != null) {
208+
splits.add(createSourceWithKeys(currentStartKey, splitStart, accumulatedSize));
209+
currentStartKey = null;
210+
accumulatedSize = 0;
211+
}
212+
// and also add the current tablet to the splits
213+
splits.add(createSourceWithKeys(splitStart, splitStop, tabletSize));
201214
} else {
202-
splitStop = scanEndKey;
215+
if (currentStartKey == null) {
216+
currentStartKey = splitStart;
217+
}
218+
accumulatedSize += tabletSize;
219+
if (accumulatedSize >= desiredBundleSizeBytes) {
220+
splits.add(createSourceWithKeys(currentStartKey, splitStop, accumulatedSize));
221+
currentStartKey = null;
222+
accumulatedSize = 0;
223+
}
203224
}
204-
splits.addAll(split(offset - lastOffset, desiredBundleSizeBytes, splitStart, splitStop));
205225
}
206226
lastOffset = offset;
207227
startKey = endKey;
208228
}
229+
209230
// Create one last region if the last region doesn't reach the end or there are no regions.
210231
byte[] endKey = HConstants.EMPTY_END_ROW;
211232
if (!Bytes.equals(startKey, endKey) && scanEndKey.length == 0) {
212-
splits.add(createSourceWithKeys(startKey, endKey, 0));
233+
if (currentStartKey != null) {
234+
splits.add(createSourceWithKeys(currentStartKey, endKey, accumulatedSize));
235+
currentStartKey = null;
236+
} else {
237+
splits.add(createSourceWithKeys(startKey, endKey, 0));
238+
}
239+
} else if (currentStartKey != null) {
240+
splits.add(createSourceWithKeys(currentStartKey, scanEndKey, accumulatedSize));
213241
}
242+
214243
List<SourceWithKeys> result = reduceSplits(splits);
215244

216245
// Randomize the list, since the default behavior would lead to multiple workers hitting the

bigtable-dataflow-parent/bigtable-hbase-beam/src/test/java/com/google/cloud/bigtable/beam/CloudBigtableIOTest.java

Lines changed: 104 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import com.google.bigtable.repackaged.com.google.cloud.bigtable.data.v2.internal.ByteStringComparator;
2424
import com.google.bigtable.repackaged.com.google.cloud.bigtable.data.v2.models.KeyOffset;
2525
import com.google.bigtable.repackaged.com.google.protobuf.ByteString;
26-
import com.google.cloud.bigtable.beam.CloudBigtableIO.AbstractSource;
2726
import com.google.cloud.bigtable.beam.CloudBigtableIO.Source;
2827
import com.google.cloud.bigtable.beam.CloudBigtableIO.SourceWithKeys;
2928
import java.util.ArrayList;
@@ -112,7 +111,7 @@ public void testSourceToString() throws Exception {
112111
@Test
113112
public void testSampleRowKeys() throws Exception {
114113
List<KeyOffset> sampleRowKeys = new ArrayList<>();
115-
int count = (int) (AbstractSource.COUNT_MAX_SPLIT_COUNT * 3 - 5);
114+
int count = 5;
116115
byte[][] keys = Bytes.split("A".getBytes(), "Z".getBytes(), count - 2);
117116
long tabletSize = 2L * 1024L * 1024L * 1024L;
118117
long boundary = 0;
@@ -129,7 +128,7 @@ public void testSampleRowKeys() throws Exception {
129128
}
130129
Source source = (Source) CloudBigtableIO.read(scanConfig);
131130
source.setSampleRowKeys(sampleRowKeys);
132-
List<SourceWithKeys> splits = source.getSplits(20000);
131+
List<SourceWithKeys> splits = source.getSplits(tabletSize * 2);
133132
Collections.sort(
134133
splits,
135134
new Comparator<SourceWithKeys>() {
@@ -140,7 +139,6 @@ public int compare(SourceWithKeys o1, SourceWithKeys o2) {
140139
ByteString.copyFrom(o2.getConfiguration().getStartRow()));
141140
}
142141
});
143-
Assert.assertTrue(splits.size() <= AbstractSource.COUNT_MAX_SPLIT_COUNT);
144142
Iterator<SourceWithKeys> iter = splits.iterator();
145143
SourceWithKeys last = iter.next();
146144
while (iter.hasNext()) {
@@ -159,7 +157,108 @@ public int compare(SourceWithKeys o1, SourceWithKeys o2) {
159157
Assert.assertTrue(current.getEstimatedSize() >= tabletSize);
160158
last = current;
161159
}
162-
// check first and last
160+
}
161+
162+
@Test
163+
public void testMergeSmallTablets() throws Exception {
164+
List<KeyOffset> sampleRowKeys = new ArrayList<>();
165+
long tabletSize = 10 * 1024 * 1024; // 10MB
166+
// Tablets:
167+
// "" to "A" (10MB)
168+
// "A" to "B" (10MB)
169+
// "B" to "C" (10MB)
170+
// "C" to "D" (10MB)
171+
// "D" to "E" (10MB)
172+
sampleRowKeys.add(KeyOffset.create(ByteString.copyFromUtf8("A"), tabletSize));
173+
sampleRowKeys.add(KeyOffset.create(ByteString.copyFromUtf8("B"), tabletSize * 2));
174+
sampleRowKeys.add(KeyOffset.create(ByteString.copyFromUtf8("C"), tabletSize * 3));
175+
sampleRowKeys.add(KeyOffset.create(ByteString.copyFromUtf8("D"), tabletSize * 4));
176+
sampleRowKeys.add(KeyOffset.create(ByteString.copyFromUtf8("E"), tabletSize * 5));
177+
178+
Source source = (Source) CloudBigtableIO.read(scanConfig);
179+
source.setSampleRowKeys(sampleRowKeys);
180+
181+
// desired = 25MB
182+
long desiredSize = 25 * 1024 * 1024;
183+
List<SourceWithKeys> splits = source.getSplits(desiredSize);
184+
185+
Collections.sort(
186+
splits,
187+
(o1, o2) ->
188+
ByteStringComparator.INSTANCE.compare(
189+
ByteString.copyFrom(o1.getConfiguration().getStartRow()),
190+
ByteString.copyFrom(o2.getConfiguration().getStartRow())));
191+
192+
// Expecting:
193+
// Split 1: "" to "C" (30MB)
194+
// Split 2: "C" to "" (20MB)
195+
Assert.assertEquals(2, splits.size());
196+
Assert.assertEquals("", Bytes.toStringBinary(splits.get(0).getConfiguration().getStartRow()));
197+
Assert.assertEquals("C", Bytes.toStringBinary(splits.get(0).getConfiguration().getStopRow()));
198+
Assert.assertEquals("C", Bytes.toStringBinary(splits.get(1).getConfiguration().getStartRow()));
199+
Assert.assertEquals("", Bytes.toStringBinary(splits.get(1).getConfiguration().getStopRow()));
200+
}
201+
202+
@Test
203+
public void testRespectScanRange() throws Exception {
204+
List<KeyOffset> sampleRowKeys = new ArrayList<>();
205+
long tabletSize = 10 * 1024 * 1024;
206+
sampleRowKeys.add(KeyOffset.create(ByteString.copyFromUtf8("A"), tabletSize));
207+
sampleRowKeys.add(KeyOffset.create(ByteString.copyFromUtf8("B"), tabletSize * 2));
208+
sampleRowKeys.add(KeyOffset.create(ByteString.copyFromUtf8("C"), tabletSize * 3));
209+
sampleRowKeys.add(KeyOffset.create(ByteString.copyFromUtf8("D"), tabletSize * 4));
210+
sampleRowKeys.add(KeyOffset.create(ByteString.copyFromUtf8("E"), tabletSize * 5));
211+
212+
// Scan from "B" to "D"
213+
CloudBigtableScanConfiguration customScanConfig =
214+
scanConfig.toBuilder().withKeys("B".getBytes(), "D".getBytes()).build();
215+
216+
Source source = (Source) CloudBigtableIO.read(customScanConfig);
217+
source.setSampleRowKeys(sampleRowKeys);
218+
219+
List<SourceWithKeys> splits = source.getSplits(25 * 1024 * 1024);
220+
221+
Collections.sort(
222+
splits,
223+
(o1, o2) ->
224+
ByteStringComparator.INSTANCE.compare(
225+
ByteString.copyFrom(o1.getConfiguration().getStartRow()),
226+
ByteString.copyFrom(o2.getConfiguration().getStartRow())));
227+
228+
// Tablet 3 ("B" to "C") and Tablet 4 ("C" to "D") are within range.
229+
// They are merged into one split of "B" to "D" (20MB).
230+
// Note: the merged range stays below desiredBundleSize, so it is never emitted inside the
231+
// loop; it only appears if getSplits flushes the pending accumulated range at the scan's
232+
// stop row. This test checks that the scan range is respected; if it fails, the trailing
233+
// piece was dropped and the end-of-scan flush should be investigated.
234+
Assert.assertEquals(1, splits.size());
235+
Assert.assertEquals("B", Bytes.toStringBinary(splits.get(0).getConfiguration().getStartRow()));
236+
Assert.assertEquals("D", Bytes.toStringBinary(splits.get(0).getConfiguration().getStopRow()));
237+
}
238+
239+
@Test
240+
public void testLargeTabletsAsIs() throws Exception {
241+
List<KeyOffset> sampleRowKeys = new ArrayList<>();
242+
long tabletSize = 100 * 1024 * 1024; // 100MB
243+
sampleRowKeys.add(KeyOffset.create(ByteString.copyFromUtf8("A"), tabletSize));
244+
245+
Source source = (Source) CloudBigtableIO.read(scanConfig);
246+
source.setSampleRowKeys(sampleRowKeys);
247+
248+
List<SourceWithKeys> splits = source.getSplits(25 * 1024 * 1024); // desired 25MB
249+
250+
Collections.sort(
251+
splits,
252+
(o1, o2) ->
253+
Bytes.compareTo(
254+
o1.getConfiguration().getStartRow(), o2.getConfiguration().getStartRow()));
255+
256+
Assert.assertEquals(2, splits.size()); // The tablet + trailing region to end of table
257+
Assert.assertEquals("", Bytes.toStringBinary(splits.get(0).getConfiguration().getStartRow()));
258+
Assert.assertEquals("A", Bytes.toStringBinary(splits.get(0).getConfiguration().getStopRow()));
259+
Assert.assertEquals("A", Bytes.toStringBinary(splits.get(1).getConfiguration().getStartRow()));
260+
Assert.assertEquals("", Bytes.toStringBinary(splits.get(1).getConfiguration().getStopRow()));
261+
Assert.assertEquals(tabletSize, splits.get(0).getEstimatedSize());
163262
}
164263

165264
@Test

0 commit comments

Comments
 (0)