Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -727,6 +727,24 @@ public boolean checkForTimeout(SQLException exception) {
return false;
}

@Override
public boolean supportsApproximateCounts() {
return true;
}

@Override
public String getApproximateCountQuery(String tableName, ImmutableList<String> partitionColumns) {
return addWhereClause("EXPLAIN SELECT * FROM " + tableName, partitionColumns);
}

@Override
public long parseApproximateCount(ResultSet rs) throws SQLException {
if (rs.next()) {
return rs.getLong("rows");
}
return -1L;
}

/**
* Get Query that returns order of collation. The query must return all the characters in the
* character set with the columns listed in {@link CollationsOrderQueryColumns}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,30 @@ public boolean checkForTimeout(SQLException exception) {
&& TIMEOUT_SQL_STATES.contains(exception.getSQLState().toUpperCase());
}

@Override
public boolean supportsApproximateCounts() {
return true;
}

@Override
public String getApproximateCountQuery(String tableName, ImmutableList<String> partitionColumns) {
return addWhereClause("EXPLAIN SELECT * FROM " + tableName, partitionColumns);
}

@Override
public long parseApproximateCount(ResultSet rs) throws SQLException {
if (rs.next()) {
String explainPlan = rs.getString(1);
// PostgreSQL default EXPLAIN format contains "rows=N"
java.util.regex.Pattern pattern = java.util.regex.Pattern.compile("rows=(\\d+)");
java.util.regex.Matcher matcher = pattern.matcher(explainPlan);
if (matcher.find()) {
return Long.parseLong(matcher.group(1));
}
}
return -1L;
}

/**
* Ref <a href="https://www.db-fiddle.com/f/sJyGyFpqfnoxYFpEXPxR1/0"></a> Get Query that returns
* order of collation. The query must return all the characters in the character set with the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,36 @@ String getCountQuery(
default Duration extractBoundaryDuration(ResultSet rs, int index) throws SQLException {
return BoundaryExtractorFactory.parseTimeStringToDuration(rs.getString(index));
}

/**
* Get query for the prepared statement to estimate count (e.g., via EXPLAIN) of a given range.
*
* @param tableName name of the table to read.
* @param partitionColumns partition columns.
* @return Query Statement for execution plan estimation.
*/
default String getApproximateCountQuery(
String tableName, ImmutableList<String> partitionColumns) {
throw new UnsupportedOperationException("Approximate counts not supported by this dialect.");
}

/**
* Parse the estimated row count from the execution plan (EXPLAIN) result set.
*
* @param rs result set from executing {@link #getApproximateCountQuery}.
* @return estimated row count, or -1L if parsing fails or format is unrecognized.
* @throws SQLException if a database error occurs.
*/
default long parseApproximateCount(ResultSet rs) throws SQLException {
throw new UnsupportedOperationException("Approximate count parsing not implemented.");
}

/**
* Check if this database dialect supports approximate execution plan counting (EXPLAIN).
*
* @return true if approximate counts are supported.
*/
default boolean supportsApproximateCounts() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,15 @@ public abstract class Range implements Serializable, Comparable<Range> {
*/
public abstract long count();

/**
* Approximate count (e.g., from EXPLAIN query) of a given range. Defaults to {@link
* Range#INDETERMINATE_COUNT}. If not explicitly assigned, defaults to whatever {@link #count()}
* is set to.
*
* @return approximate count of rows represented by the range.
*/
public abstract long approxCount();

/**
* Height for this range. The leaf child always has a height of 0. Defaults to 0.
*
Expand Down Expand Up @@ -103,6 +112,7 @@ public TableIdentifier tableIdentifier() {
public static <T extends Serializable> Builder builder() {
return new AutoValue_Range.Builder()
.setCount(INDETERMINATE_COUNT)
.setApproxCount(INDETERMINATE_COUNT)
.setHeight(0L)
.setIsFirst(false)
.setIsLast(false);
Expand All @@ -129,6 +139,22 @@ public Range withCount(long count, @Nullable ProcessContext processContext) {
return this.toBuilder().setCount(count).build();
}

/**
* Return a cloned Range with both factual count and approximate count parameters set.
*
* @param count count of the range
* @param approxCount approximate count of the range
* @param processContext process context
* @return range with count and approxCount.
*/
public Range withCounts(long count, long approxCount, @Nullable ProcessContext processContext) {
if (hasChildRange()) {
return withChildRange(
childRange().withCounts(count, approxCount, processContext), processContext);
}
return this.toBuilder().setCount(count).setApproxCount(approxCount).build();
}

/**
* Add Counts ensuring that uncounted ranges don't lead to overflow. It is assumed that the total
* rows moved by the migration job is less than {@link Long#MAX_VALUE}
Expand Down Expand Up @@ -181,6 +207,7 @@ public Range withChildRange(Range childRange, @Nullable ProcessContext processCo
return this.toBuilder()
.setChildRange(childRange)
.setCount(childRange.count())
.setApproxCount(childRange.approxCount())
.setHeight(childRange.height() + 1)
.build();
}
Expand Down Expand Up @@ -228,12 +255,14 @@ public Pair<Range, Range> split(@Nullable ProcessContext processContext) {
this.toBuilder()
.setBoundary(boundaries.getLeft())
.setCount(INDETERMINATE_COUNT)
.setApproxCount(INDETERMINATE_COUNT)
.setIsFirst(isFirst())
.setIsLast(false)
.build(),
this.toBuilder()
.setBoundary(boundaries.getRight())
.setCount(INDETERMINATE_COUNT)
.setApproxCount(INDETERMINATE_COUNT)
.setIsFirst(false)
.setIsLast(isLast())
.build());
Expand Down Expand Up @@ -284,12 +313,14 @@ public Range mergeRange(Range other, @Nullable ProcessContext processContext) {
return this.toBuilder()
.setBoundary(this.boundary().merge(other.boundary()))
.setCount(addCount(this.count(), other.count()))
.setApproxCount(addCount(this.approxCount(), other.approxCount()))
.setIsLast(other.isLast())
.build();
} else {
return this.toBuilder()
.setBoundary(this.boundary().merge(other.boundary()))
.setCount(addCount(this.count(), other.count()))
.setApproxCount(addCount(this.approxCount(), other.approxCount()))
.setIsFirst(other.isFirst())
.build();
}
Expand All @@ -308,7 +339,7 @@ public boolean equals(Object obj) {
return false;
}
Range that = (Range) obj;
if (this.count() != that.count()) {
if (this.count() != that.count() || this.approxCount() != that.approxCount()) {
return false;
}
return Objects.equal(this.childRange(), that.childRange());
Expand Down Expand Up @@ -362,6 +393,10 @@ public int compareTo(Range other) {
}
}
result = Long.valueOf(this.count()).compareTo(other.count());
if (result != 0) {
return result;
}
result = Long.valueOf(this.approxCount()).compareTo(other.approxCount());

Preconditions.checkState(
result != 0,
Expand All @@ -382,6 +417,8 @@ public abstract static class Builder {

public abstract Builder setCount(long value);

public abstract Builder setApproxCount(long value);

protected abstract Builder setHeight(long value);

public abstract Builder setIsFirst(boolean value);
Expand Down Expand Up @@ -432,11 +469,19 @@ public <T extends Serializable> Builder setTableIdentifier(TableIdentifier table
return this;
}

abstract Range autoBuild();

/**
* Build {@link Range}.
* Build {@link Range}, auto-defaulting {@code approxCount} to {@code count} if left unassigned.
*
* @return range.
*/
public abstract Range build();
public Range build() {
Range range = autoBuild();
if (range.approxCount() == INDETERMINATE_COUNT && range.count() != INDETERMINATE_COUNT) {
range = range.toBuilder().setApproxCount(range.count()).build();
}
return range;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,13 @@ public TableSplitSpecification build() {
}

if (!splitStagesCount().isPresent()) {
setSplitStagesCount(
long baseStages =
logToBaseTwo(maxPartitionsHint)
+ partitionColumns().size()
+ 1 /* For initial counts */);
+ 1 /* For initial counts */;
// We add 1/4th more stages to the total count to accommodate for the initial phase
// of the pipeline where only approximate counts are performed.
setSplitStagesCount(baseStages + baseStages / 4);
}

if (initialRange() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@
* <li><b>Multi-Column Splitting</b>: When a range on a single column becomes too large but cannot
* be split further (e.g., all rows have the same value for that column), it triggers the
* addition of the next configured partition column.
* <li><b>Multi-Round Adaptive Memory Halving</b>: For extremely large ranges (hotspots), multiple
* rounds of memory splitting are performed in a single stage to accelerate convergence.
* </ul>
*/
@AutoValue
Expand Down Expand Up @@ -179,21 +181,43 @@ private void processTable(TableIdentifier tableId, List<Range> ranges, ProcessCo
mean = Math.max(1, totalCount / maxPartitions);

for (Range range : ranges) {
if (range.isUncounted()
|| range.count()
> ((1 + TableSplitSpecification.SPLITTER_MAX_RELATIVE_DEVIATION) * mean)) {
// if `count` is not indeterminate, approxCount is always same as count.
// (we don't trigger the approximate count query if the actual query does not time out and in
// such a case, we copy the actual count into the approximate count.)
long effectiveCount =
range.approxCount() != Range.INDETERMINATE_COUNT ? range.approxCount() : range.count();
// In the first 20% of split stages (which is 25% of the original base stages),
// we only perform approximate counts.
// After that, if an actual count query times out (becoming indeterminate),
// we treat it as an empirical signal of a large range and force a split.
boolean isUncountable =
range.count() == Range.INDETERMINATE_COUNT && stageIdx() >= tableMaxSplitHeight / 5;

if (effectiveCount > ((1 + TableSplitSpecification.SPLITTER_MAX_RELATIVE_DEVIATION) * mean)
|| isUncountable) {

if (stageIdx() == 0) {
// For the first stage, we have an initial split without the counts.
c.output(TO_COUNT_TAG, range);
} else if (range.isSplittable(c)) {
Pair<Range, Range> splitPair = range.split(c);
int splitRounds = 1;
if (effectiveCount != Range.INDETERMINATE_COUNT && mean > 0) {
double ratio = (double) effectiveCount / mean;
// Beyond 16, we want to avoid aggressive splitting, so we cap at 4 round split.
if (ratio >= 16.0) {
splitRounds = 4;
} else if (ratio >= 8.0) {
splitRounds = 3;
} else if (ratio >= 4.0) {
splitRounds = 2;
}
}
logger.debug(
"Counting range {} and {} for stage {}.",
splitPair.getLeft(),
splitPair.getRight(),
"Performing {} rounds of memory splitting for range {} at stage {}.",
splitRounds,
range,
stageIdx());
c.output(TO_COUNT_TAG, splitPair.getLeft());
c.output(TO_COUNT_TAG, splitPair.getRight());
multiRoundSplit(range, splitRounds, c);
} else {
if (range.height() + 1 < tableSplitSpecification.partitionColumns().size()) {
PartitionColumn newColumn =
Expand All @@ -218,6 +242,16 @@ private void processTable(TableIdentifier tableId, List<Range> ranges, ProcessCo
}
}

private void multiRoundSplit(Range range, int roundsLeft, ProcessContext c) {
if (roundsLeft <= 0 || !range.isSplittable(c)) {
c.output(TO_COUNT_TAG, range);
return;
}
Pair<Range, Range> splitPair = range.split(c);
multiRoundSplit(splitPair.getLeft(), roundsLeft - 1, c);
multiRoundSplit(splitPair.getRight(), roundsLeft - 1, c);
}

public static Builder builder() {
return new AutoValue_RangeClassifierDoFn.Builder();
}
Expand Down
Loading
Loading