Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.google.bigtable.repackaged.com.google.common.base.Preconditions;
import com.google.bigtable.repackaged.com.google.common.collect.ImmutableMap;
import com.google.cloud.bigtable.batch.common.CloudBigtableServiceImpl;
import com.google.cloud.bigtable.beam.CloudBigtableScanConfiguration.ScanType;
import com.google.cloud.bigtable.hbase.BigtableFixedProtoScan;
import com.google.cloud.bigtable.hbase.BigtableOptionsFactory;
import java.io.IOException;
Expand Down Expand Up @@ -145,8 +146,7 @@ public class CloudBigtableIO {
abstract static class AbstractSource extends BoundedSource<Result> {

protected static final Logger SOURCE_LOG = LoggerFactory.getLogger(AbstractSource.class);
protected static final long SIZED_BASED_MAX_SPLIT_COUNT = 4_000;
static final long COUNT_MAX_SPLIT_COUNT = 15_360;
private static final long COUNT_MAX_SPLIT_COUNT = 5_000_000;

/** Configuration for a Cloud Bigtable connection, a table, and an optional scan. */
private final CloudBigtableScanConfiguration configuration;
Expand All @@ -165,16 +165,16 @@ public Coder<Result> getOutputCoder() {
// TODO: Move the splitting logic to bigtable-hbase, and separate concerns between beam needs
// and Cloud Bigtable logic.
protected List<SourceWithKeys> getSplits(long desiredBundleSizeBytes) throws Exception {
desiredBundleSizeBytes =
Math.max(
calculateEstimatedSizeBytes(null) / SIZED_BASED_MAX_SPLIT_COUNT,
desiredBundleSizeBytes);
CloudBigtableScanConfiguration conf = getConfiguration();
byte[] scanStartKey = conf.getStartRow();
byte[] scanEndKey = conf.getStopRow();
List<SourceWithKeys> splits = new ArrayList<>();
byte[] startKey = HConstants.EMPTY_START_ROW;
long lastOffset = 0;

byte[] currentStartKey = null;
long accumulatedSize = 0;

for (KeyOffset response : getSampleRowKeys()) {
byte[] endKey = response.getKey().toByteArray();
// Avoid empty regions.
Expand All @@ -183,34 +183,63 @@ protected List<SourceWithKeys> getSplits(long desiredBundleSizeBytes) throws Exc
}

long offset = response.getOffsetBytes();
// Get all the start/end key ranges that match the user supplied Scan. See
long tabletSize = offset - lastOffset;

// Get all the start/end key ranges that match the user supplied Scan. See
// https://github.com/apache/hbase/blob/master/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java#L298
// for original logic.
if (isWithinRange(scanStartKey, scanEndKey, startKey, endKey)) {
byte[] splitStart = null;
byte[] splitStop = null;
if (scanStartKey.length == 0 || Bytes.compareTo(startKey, scanStartKey) >= 0) {
splitStart = startKey;
} else {
splitStart = scanStartKey;
}

if ((scanEndKey.length == 0 || Bytes.compareTo(endKey, scanEndKey) <= 0)
&& endKey.length > 0) {
splitStop = endKey;
byte[] splitStart =
(scanStartKey.length == 0 || Bytes.compareTo(startKey, scanStartKey) >= 0)
? startKey
: scanStartKey;
byte[] splitStop =
((scanEndKey.length == 0 || Bytes.compareTo(endKey, scanEndKey) <= 0)
&& endKey.length > 0)
? endKey
: scanEndKey;

// Merging tablets that are smaller than desired bundle size
if (tabletSize >= desiredBundleSizeBytes) {
// If the current tablet size is already > bundle size, add previously accumulated
// tablets
// if the currentStartKey is not null.
if (currentStartKey != null) {
splits.add(createSourceWithKeys(currentStartKey, splitStart, accumulatedSize));
currentStartKey = null;
accumulatedSize = 0;
}
// and also add the current tablet to the splits
splits.add(createSourceWithKeys(splitStart, splitStop, tabletSize));
} else {
splitStop = scanEndKey;
if (currentStartKey == null) {
currentStartKey = splitStart;
}
accumulatedSize += tabletSize;
if (accumulatedSize >= desiredBundleSizeBytes) {
splits.add(createSourceWithKeys(currentStartKey, splitStop, accumulatedSize));
currentStartKey = null;
accumulatedSize = 0;
}
}
splits.addAll(split(offset - lastOffset, desiredBundleSizeBytes, splitStart, splitStop));
}
lastOffset = offset;
startKey = endKey;
}

// Create one last region if the last region doesn't reach the end or there are no regions.
byte[] endKey = HConstants.EMPTY_END_ROW;
if (!Bytes.equals(startKey, endKey) && scanEndKey.length == 0) {
splits.add(createSourceWithKeys(startKey, endKey, 0));
if (currentStartKey != null) {
splits.add(createSourceWithKeys(currentStartKey, endKey, accumulatedSize));
currentStartKey = null;
} else {
splits.add(createSourceWithKeys(startKey, endKey, 0));
}
} else if (currentStartKey != null) {
splits.add(createSourceWithKeys(currentStartKey, scanEndKey, accumulatedSize));
}

List<SourceWithKeys> result = reduceSplits(splits);

// Randomize the list, since the default behavior would lead to multiple workers hitting the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.google.bigtable.repackaged.com.google.cloud.bigtable.data.v2.internal.ByteStringComparator;
import com.google.bigtable.repackaged.com.google.cloud.bigtable.data.v2.models.KeyOffset;
import com.google.bigtable.repackaged.com.google.protobuf.ByteString;
import com.google.cloud.bigtable.beam.CloudBigtableIO.AbstractSource;
import com.google.cloud.bigtable.beam.CloudBigtableIO.Source;
import com.google.cloud.bigtable.beam.CloudBigtableIO.SourceWithKeys;
import java.util.ArrayList;
Expand Down Expand Up @@ -112,7 +111,7 @@ public void testSourceToString() throws Exception {
@Test
public void testSampleRowKeys() throws Exception {
List<KeyOffset> sampleRowKeys = new ArrayList<>();
int count = (int) (AbstractSource.COUNT_MAX_SPLIT_COUNT * 3 - 5);
int count = 5;
byte[][] keys = Bytes.split("A".getBytes(), "Z".getBytes(), count - 2);
long tabletSize = 2L * 1024L * 1024L * 1024L;
long boundary = 0;
Expand All @@ -129,7 +128,7 @@ public void testSampleRowKeys() throws Exception {
}
Source source = (Source) CloudBigtableIO.read(scanConfig);
source.setSampleRowKeys(sampleRowKeys);
List<SourceWithKeys> splits = source.getSplits(20000);
List<SourceWithKeys> splits = source.getSplits(tabletSize * 2);
Collections.sort(
splits,
new Comparator<SourceWithKeys>() {
Expand All @@ -140,7 +139,6 @@ public int compare(SourceWithKeys o1, SourceWithKeys o2) {
ByteString.copyFrom(o2.getConfiguration().getStartRow()));
}
});
Assert.assertTrue(splits.size() <= AbstractSource.COUNT_MAX_SPLIT_COUNT);
Iterator<SourceWithKeys> iter = splits.iterator();
SourceWithKeys last = iter.next();
while (iter.hasNext()) {
Expand All @@ -159,7 +157,108 @@ public int compare(SourceWithKeys o1, SourceWithKeys o2) {
Assert.assertTrue(current.getEstimatedSize() >= tabletSize);
last = current;
}
// check first and last
}

@Test
public void testMergeSmallTablets() throws Exception {
  // Five contiguous 10MB tablets with end keys "A".."E"; KeyOffset offsets are cumulative.
  final long tenMb = 10 * 1024 * 1024;
  List<KeyOffset> sampleRowKeys = new ArrayList<>();
  String[] tabletEnds = {"A", "B", "C", "D", "E"};
  for (int i = 0; i < tabletEnds.length; i++) {
    sampleRowKeys.add(KeyOffset.create(ByteString.copyFromUtf8(tabletEnds[i]), tenMb * (i + 1)));
  }

  Source source = (Source) CloudBigtableIO.read(scanConfig);
  source.setSampleRowKeys(sampleRowKeys);

  // With a 25MB bundle target, the 10MB tablets should be coalesced until the target is met.
  long desiredSize = 25 * 1024 * 1024;
  List<SourceWithKeys> splits = source.getSplits(desiredSize);

  splits.sort(
      (a, b) ->
          ByteStringComparator.INSTANCE.compare(
              ByteString.copyFrom(a.getConfiguration().getStartRow()),
              ByteString.copyFrom(b.getConfiguration().getStartRow())));

  // Accumulation reaches 30MB at "C" and flushes the first split; the remaining 20MB runs to
  // the end of the table:
  //   split 1: ["", "C")  (30MB)
  //   split 2: ["C", "")  (20MB)
  Assert.assertEquals(2, splits.size());
  CloudBigtableScanConfiguration firstConf = splits.get(0).getConfiguration();
  CloudBigtableScanConfiguration secondConf = splits.get(1).getConfiguration();
  Assert.assertEquals("", Bytes.toStringBinary(firstConf.getStartRow()));
  Assert.assertEquals("C", Bytes.toStringBinary(firstConf.getStopRow()));
  Assert.assertEquals("C", Bytes.toStringBinary(secondConf.getStartRow()));
  Assert.assertEquals("", Bytes.toStringBinary(secondConf.getStopRow()));
}

@Test
public void testRespectScanRange() throws Exception {
  // Five contiguous 10MB tablets with end keys "A".."E"; KeyOffset offsets are cumulative.
  final long tenMb = 10 * 1024 * 1024;
  List<KeyOffset> sampleRowKeys = new ArrayList<>();
  String[] tabletEnds = {"A", "B", "C", "D", "E"};
  for (int i = 0; i < tabletEnds.length; i++) {
    sampleRowKeys.add(KeyOffset.create(ByteString.copyFromUtf8(tabletEnds[i]), tenMb * (i + 1)));
  }

  // Restrict the scan to the key range ["B", "D").
  CloudBigtableScanConfiguration customScanConfig =
      scanConfig.toBuilder().withKeys("B".getBytes(), "D".getBytes()).build();

  Source source = (Source) CloudBigtableIO.read(customScanConfig);
  source.setSampleRowKeys(sampleRowKeys);

  List<SourceWithKeys> splits = source.getSplits(25 * 1024 * 1024);

  splits.sort(
      (a, b) ->
          ByteStringComparator.INSTANCE.compare(
              ByteString.copyFrom(a.getConfiguration().getStartRow()),
              ByteString.copyFrom(b.getConfiguration().getStartRow())));

  // Only tablets ["B","C") and ["C","D") overlap the scan. Together they hold 20MB — below the
  // 25MB target — so they must be merged into a single ["B","D") split. This also guards against
  // the accumulated-but-unflushed remainder being dropped when the scan range ends before the
  // bundle-size threshold is crossed.
  Assert.assertEquals(1, splits.size());
  CloudBigtableScanConfiguration conf = splits.get(0).getConfiguration();
  Assert.assertEquals("B", Bytes.toStringBinary(conf.getStartRow()));
  Assert.assertEquals("D", Bytes.toStringBinary(conf.getStopRow()));
}

@Test
public void testLargeTabletsAsIs() throws Exception {
  // A single 100MB tablet ending at "A" — far larger than the 25MB bundle target below.
  final long hundredMb = 100 * 1024 * 1024;
  List<KeyOffset> sampleRowKeys = new ArrayList<>();
  sampleRowKeys.add(KeyOffset.create(ByteString.copyFromUtf8("A"), hundredMb));

  Source source = (Source) CloudBigtableIO.read(scanConfig);
  source.setSampleRowKeys(sampleRowKeys);

  List<SourceWithKeys> splits = source.getSplits(25 * 1024 * 1024);

  splits.sort(
      (a, b) ->
          Bytes.compareTo(
              a.getConfiguration().getStartRow(), b.getConfiguration().getStartRow()));

  // An oversized tablet is emitted as-is (not merged), plus the implicit trailing region from
  // "A" to the end of the table:
  //   split 1: ["", "A")  carrying the full 100MB estimate
  //   split 2: ["A", "")  the remainder of the key space
  Assert.assertEquals(2, splits.size());
  CloudBigtableScanConfiguration firstConf = splits.get(0).getConfiguration();
  CloudBigtableScanConfiguration secondConf = splits.get(1).getConfiguration();
  Assert.assertEquals("", Bytes.toStringBinary(firstConf.getStartRow()));
  Assert.assertEquals("A", Bytes.toStringBinary(firstConf.getStopRow()));
  Assert.assertEquals("A", Bytes.toStringBinary(secondConf.getStartRow()));
  Assert.assertEquals("", Bytes.toStringBinary(secondConf.getStopRow()));
  Assert.assertEquals(hundredMb, splits.get(0).getEstimatedSize());
}

@Test
Expand Down
Loading