Skip to content

Commit 73451f2

Browse files
committed
Faster Uniformization
1 parent d058de6 commit 73451f2

14 files changed

Lines changed: 1414 additions & 67 deletions

File tree

v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/uniformsplitter/UniformSplitterDBAdapter.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,4 +83,36 @@ String getCountQuery(
8383
default Duration extractBoundaryDuration(ResultSet rs, int index) throws SQLException {
8484
return BoundaryExtractorFactory.parseTimeStringToDuration(rs.getString(index));
8585
}
86+
87+
/**
88+
* Get query for the prepared statement to estimate count (e.g., via EXPLAIN) of a given range.
89+
*
90+
* @param tableName name of the table to read.
91+
* @param partitionColumns partition columns.
92+
* @return Query Statement for execution plan estimation.
93+
*/
94+
default String getApproximateCountQuery(
95+
String tableName, ImmutableList<String> partitionColumns) {
96+
throw new UnsupportedOperationException("Approximate counts not supported by this dialect.");
97+
}
98+
99+
/**
100+
* Parse the estimated row count from the execution plan (EXPLAIN) result set.
101+
*
102+
* @param rs result set from executing {@link #getApproximateCountQuery}.
103+
* @return estimated row count.
104+
* @throws SQLException if parsing fails or format is unrecognized.
105+
*/
106+
default long parseApproximateCount(ResultSet rs) throws SQLException {
107+
throw new UnsupportedOperationException("Approximate count parsing not implemented.");
108+
}
109+
110+
/**
111+
* Check if this database dialect supports approximate execution plan counting (EXPLAIN).
112+
*
113+
* @return true if approximate counts are supported.
114+
*/
115+
default boolean supportsApproximateCounts() {
116+
return false;
117+
}
86118
}

v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/uniformsplitter/range/Range.java

Lines changed: 48 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,15 @@ public abstract class Range implements Serializable, Comparable<Range> {
4949
*/
5050
public abstract long count();
5151

52+
/**
53+
* Approximate count (e.g., from an EXPLAIN query) of a given range. If not explicitly assigned, it
54+
* defaults to {@link #count()} when that is known, and to {@link Range#INDETERMINATE_COUNT}
55+
* otherwise.
56+
*
57+
* @return approximate count of rows represented by the range.
58+
*/
59+
public abstract long approxCount();
60+
5261
/**
5362
* Height for this range. The leaf child always has a height of 0. Defaults to 0.
5463
*
@@ -103,6 +112,7 @@ public TableIdentifier tableIdentifier() {
103112
public static <T extends Serializable> Builder builder() {
104113
return new AutoValue_Range.Builder()
105114
.setCount(INDETERMINATE_COUNT)
115+
.setApproxCount(INDETERMINATE_COUNT)
106116
.setHeight(0L)
107117
.setIsFirst(false)
108118
.setIsLast(false);
@@ -129,6 +139,22 @@ public Range withCount(long count, @Nullable ProcessContext processContext) {
129139
return this.toBuilder().setCount(count).build();
130140
}
131141

142+
/**
143+
* Return a cloned Range with both factual count and approximate count parameters set.
144+
*
145+
* @param count count of the range
146+
* @param approxCount approximate count of the range
147+
* @param processContext process context
148+
* @return range with count and approxCount.
149+
*/
150+
public Range withCounts(long count, long approxCount, @Nullable ProcessContext processContext) {
151+
if (hasChildRange()) {
152+
return withChildRange(
153+
childRange().withCounts(count, approxCount, processContext), processContext);
154+
}
155+
return this.toBuilder().setCount(count).setApproxCount(approxCount).build();
156+
}
157+
132158
/**
133159
* Add Counts ensuring that uncounted ranges don't lead to overflow. It is assumed that the total
134160
* rows moved by the migration job is less than {@link Long#MAX_VALUE}
@@ -181,6 +207,7 @@ public Range withChildRange(Range childRange, @Nullable ProcessContext processCo
181207
return this.toBuilder()
182208
.setChildRange(childRange)
183209
.setCount(childRange.count())
210+
.setApproxCount(childRange.approxCount())
184211
.setHeight(childRange.height() + 1)
185212
.build();
186213
}
@@ -228,12 +255,14 @@ public Pair<Range, Range> split(@Nullable ProcessContext processContext) {
228255
this.toBuilder()
229256
.setBoundary(boundaries.getLeft())
230257
.setCount(INDETERMINATE_COUNT)
258+
.setApproxCount(INDETERMINATE_COUNT)
231259
.setIsFirst(isFirst())
232260
.setIsLast(false)
233261
.build(),
234262
this.toBuilder()
235263
.setBoundary(boundaries.getRight())
236264
.setCount(INDETERMINATE_COUNT)
265+
.setApproxCount(INDETERMINATE_COUNT)
237266
.setIsFirst(false)
238267
.setIsLast(isLast())
239268
.build());
@@ -284,12 +313,14 @@ public Range mergeRange(Range other, @Nullable ProcessContext processContext) {
284313
return this.toBuilder()
285314
.setBoundary(this.boundary().merge(other.boundary()))
286315
.setCount(addCount(this.count(), other.count()))
316+
.setApproxCount(addCount(this.approxCount(), other.approxCount()))
287317
.setIsLast(other.isLast())
288318
.build();
289319
} else {
290320
return this.toBuilder()
291321
.setBoundary(this.boundary().merge(other.boundary()))
292322
.setCount(addCount(this.count(), other.count()))
323+
.setApproxCount(addCount(this.approxCount(), other.approxCount()))
293324
.setIsFirst(other.isFirst())
294325
.build();
295326
}
@@ -308,7 +339,7 @@ public boolean equals(Object obj) {
308339
return false;
309340
}
310341
Range that = (Range) obj;
311-
if (this.count() != that.count()) {
342+
if (this.count() != that.count() || this.approxCount() != that.approxCount()) {
312343
return false;
313344
}
314345
return Objects.equal(this.childRange(), that.childRange());
@@ -362,6 +393,10 @@ public int compareTo(Range other) {
362393
}
363394
}
364395
result = Long.valueOf(this.count()).compareTo(other.count());
396+
if (result != 0) {
397+
return result;
398+
}
399+
result = Long.valueOf(this.approxCount()).compareTo(other.approxCount());
365400

366401
Preconditions.checkState(
367402
result != 0,
@@ -382,6 +417,8 @@ public abstract static class Builder {
382417

383418
public abstract Builder setCount(long value);
384419

420+
public abstract Builder setApproxCount(long value);
421+
385422
protected abstract Builder setHeight(long value);
386423

387424
public abstract Builder setIsFirst(boolean value);
@@ -432,11 +469,19 @@ public <T extends Serializable> Builder setTableIdentifier(TableIdentifier table
432469
return this;
433470
}
434471

472+
abstract Range autoBuild();
473+
435474
/**
436-
* Build {@link Range}.
475+
* Build {@link Range}, auto-defaulting {@code approxCount} to {@code count} if left unassigned.
437476
*
438477
* @return range.
439478
*/
440-
public abstract Range build();
479+
public Range build() {
480+
Range range = autoBuild();
481+
if (range.approxCount() == INDETERMINATE_COUNT && range.count() != INDETERMINATE_COUNT) {
482+
range = range.toBuilder().setApproxCount(range.count()).build();
483+
}
484+
return range;
485+
}
441486
}
442487
}

v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/uniformsplitter/range/TableSplitSpecification.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -150,10 +150,13 @@ public TableSplitSpecification build() {
150150
}
151151

152152
if (!splitStagesCount().isPresent()) {
153-
setSplitStagesCount(
153+
long baseStages =
154154
logToBaseTwo(maxPartitionsHint)
155155
+ partitionColumns().size()
156-
+ 1 /* For initial counts */);
156+
+ 1 /* For initial counts */;
157+
// We add one quarter more stages to the base stage count to accommodate the initial phase
158+
// of the pipeline where only approximate counts are performed.
159+
setSplitStagesCount(baseStages + baseStages / 4);
157160
}
158161

159162
if (initialRange() != null) {

v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/uniformsplitter/transforms/RangeClassifierDoFn.java

Lines changed: 43 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@
5353
* <li><b>Multi-Column Splitting</b>: When a range on a single column becomes too large but cannot
5454
* be split further (e.g., all rows have the same value for that column), it triggers the
5555
* addition of the next configured partition column.
56+
* <li><b>Multi-Round Adaptive Memory Halving</b>: For extremely large ranges (hotspots), multiple
57+
* rounds of in-memory splitting are performed within a single stage to accelerate convergence.
5658
* </ul>
5759
*/
5860
@AutoValue
@@ -179,21 +181,43 @@ private void processTable(TableIdentifier tableId, List<Range> ranges, ProcessCo
179181
mean = Math.max(1, totalCount / maxPartitions);
180182

181183
for (Range range : ranges) {
182-
if (range.isUncounted()
183-
|| range.count()
184-
> ((1 + TableSplitSpecification.SPLITTER_MAX_RELATIVE_DEVIATION) * mean)) {
184+
// If `count` is not indeterminate, approxCount is always the same as count.
185+
// (We don't trigger the approximate count query if the actual query does not time out; in
186+
// that case, the actual count is copied into the approximate count.)
187+
long effectiveCount =
188+
range.approxCount() != Range.INDETERMINATE_COUNT ? range.approxCount() : range.count();
189+
// In the first 20% of split stages (which is 25% of the original base stages),
190+
// we only perform approximate counts.
191+
// After that, if an actual count query times out (becoming indeterminate),
192+
// we treat it as an empirical signal of a large range and force a split.
193+
boolean isUncountable =
194+
range.count() == Range.INDETERMINATE_COUNT && stageIdx() >= tableMaxSplitHeight / 5;
195+
196+
if (effectiveCount > ((1 + TableSplitSpecification.SPLITTER_MAX_RELATIVE_DEVIATION) * mean)
197+
|| isUncountable) {
198+
185199
if (stageIdx() == 0) {
186200
// For the first stage, we have an initial split without the counts.
187201
c.output(TO_COUNT_TAG, range);
188202
} else if (range.isSplittable(c)) {
189-
Pair<Range, Range> splitPair = range.split(c);
203+
int splitRounds = 1;
204+
if (effectiveCount != Range.INDETERMINATE_COUNT && mean > 0) {
205+
double ratio = (double) effectiveCount / mean;
206+
// Cap at 4 rounds (at most 16 sub-ranges): ratios beyond 16 do not split any more aggressively.
207+
if (ratio >= 16.0) {
208+
splitRounds = 4;
209+
} else if (ratio >= 8.0) {
210+
splitRounds = 3;
211+
} else if (ratio >= 4.0) {
212+
splitRounds = 2;
213+
}
214+
}
190215
logger.debug(
191-
"Counting range {} and {} for stage {}.",
192-
splitPair.getLeft(),
193-
splitPair.getRight(),
216+
"Performing {} rounds of memory splitting for range {} at stage {}.",
217+
splitRounds,
218+
range,
194219
stageIdx());
195-
c.output(TO_COUNT_TAG, splitPair.getLeft());
196-
c.output(TO_COUNT_TAG, splitPair.getRight());
220+
multiRoundSplit(range, splitRounds, c);
197221
} else {
198222
if (range.height() + 1 < tableSplitSpecification.partitionColumns().size()) {
199223
PartitionColumn newColumn =
@@ -218,6 +242,16 @@ private void processTable(TableIdentifier tableId, List<Range> ranges, ProcessCo
218242
}
219243
}
220244

245+
private void multiRoundSplit(Range range, int roundsLeft, ProcessContext c) {
246+
if (roundsLeft <= 0 || !range.isSplittable(c)) {
247+
c.output(TO_COUNT_TAG, range);
248+
return;
249+
}
250+
Pair<Range, Range> splitPair = range.split(c);
251+
multiRoundSplit(splitPair.getLeft(), roundsLeft - 1, c);
252+
multiRoundSplit(splitPair.getRight(), roundsLeft - 1, c);
253+
}
254+
221255
public static Builder builder() {
222256
return new AutoValue_RangeClassifierDoFn.Builder();
223257
}

0 commit comments

Comments
 (0)