Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -151,11 +151,18 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
rightProjections.forall(_.isInstanceOf[AttributeReference]) &&
// Cross joins are not supported because they increase the amount of data.
condition.isDefined &&
// Do not push down join if either side has a pushed sample, because
// the merged scan builder would silently discard it.
// Do not push down join if either side has a pushed sample with
// fraction < 1, because the merged scan builder would silently
// discard it and change the result. At fraction = 1 without
// replacement the sample is a no-op on the result set, so dropping
// it is safe. With replacement (Poisson sampling), even fraction 1
// can emit each row 0, 1, 2, ... times, so it is not a no-op.
// TODO(SPARK-56504): Extend SupportsPushDownJoin to accept pushed
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like this is @stanyao's TODO JIRA; can we make this PR target that and remove the TODO?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I see — this is separate?

// samples so sources supporting both can handle the composition.
leftHolder.pushedSample.isEmpty && rightHolder.pushedSample.isEmpty &&
leftHolder.pushedSample.forall(s =>
!s.withReplacement && s.upperBound - s.lowerBound >= 1.0) &&
rightHolder.pushedSample.forall(s =>
!s.withReplacement && s.upperBound - s.lowerBound >= 1.0) &&
lBuilder.isOtherSideCompatibleForJoin(rBuilder) =>
// Process left and right columns in original order
val (leftSideRequiredColumnsWithAliases, rightSideRequiredColumnsWithAliases) =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,11 @@ class DataSourceV2TableSampleSuite extends DatasourceV2SQLBase
val dfNoSample = sql(s"SELECT * FROM $t1 JOIN $t2 ON $t1.id = $t2.id")
checkJoinPushed(dfNoSample)

// With SYSTEM sample on one side: join pushdown should be skipped
// With a SYSTEM sample (fraction < 1) on one side: join pushdown
// should be skipped because the merged scan builder would silently
// discard the sample.
val dfWithSample = sql(
s"SELECT * FROM $t1 TABLESAMPLE SYSTEM (100 PERCENT) " +
s"SELECT * FROM $t1 TABLESAMPLE SYSTEM (50 PERCENT) " +
s"JOIN $t2 ON $t1.id = $t2.id")
checkJoinNotPushed(dfWithSample)
// The sample should still be pushed down though
Expand All @@ -185,6 +187,58 @@ class DataSourceV2TableSampleSuite extends DatasourceV2SQLBase
}
}

test("SPARK-55978: 100% SYSTEM sample does not block join pushdown") {
  // Dedicated catalog for this test, backed by InMemoryTableWithJoinAndSampleCatalog.
  val catalogName = "testjoinsample100"
  registerCatalog(catalogName, classOf[InMemoryTableWithJoinAndSampleCatalog])
  val sampledTable = s"$catalogName.ns.t1"
  val otherTable = s"$catalogName.ns.t2"
  // Both tables share the same two-column schema and are created before the
  // guarded section, in t1-then-t2 order.
  Seq(sampledTable, otherTable).foreach { table =>
    sql(s"CREATE TABLE $table (id bigint, data string) USING _")
  }
  try {
    // Overlapping ids (2 and 3) so the equi-join on id has matching rows.
    sql(s"INSERT INTO $sampledTable VALUES (1, 'a'), (2, 'b'), (3, 'c')")
    sql(s"INSERT INTO $otherTable VALUES (2, 'x'), (3, 'y'), (4, 'z')")
    withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") {
      // A 100 PERCENT sample keeps every row, so losing it inside the merged
      // scan builder cannot change the result set. The sample guard in
      // V2ScanRelationPushDown therefore lets the join be pushed down.
      val query =
        s"SELECT * FROM $sampledTable TABLESAMPLE SYSTEM (100 PERCENT) " +
          s"JOIN $otherTable ON $sampledTable.id = $otherTable.id"
      checkJoinPushed(sql(query))
    }
  } finally {
    // Always clean up, in the same t1-then-t2 order as creation.
    Seq(sampledTable, otherTable).foreach { table =>
      sql(s"DROP TABLE IF EXISTS $table")
    }
  }
}

test("SPARK-55978: with-replacement sample blocks join pushdown even at fraction 1") {
  // Dedicated catalog for this test, backed by InMemoryTableWithJoinAndSampleCatalog.
  val catalogName = "testjoinsamplerepl"
  registerCatalog(catalogName, classOf[InMemoryTableWithJoinAndSampleCatalog])
  val sampledTable = s"$catalogName.ns.t1"
  val otherTable = s"$catalogName.ns.t2"
  // Both tables share the same two-column schema and are created before the
  // guarded section, in t1-then-t2 order.
  Seq(sampledTable, otherTable).foreach { table =>
    sql(s"CREATE TABLE $table (id bigint, data string) USING _")
  }
  try {
    // Overlapping ids (2 and 3) so the equi-join on id has matching rows.
    sql(s"INSERT INTO $sampledTable VALUES (1, 'a'), (2, 'b'), (3, 'c')")
    sql(s"INSERT INTO $otherTable VALUES (2, 'x'), (3, 'y'), (4, 'z')")
    withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") {
      // SQL TABLESAMPLE never produces withReplacement = true, so the sample
      // is built through the DataFrame API instead. With replacement (Poisson
      // sampling) a row may be emitted 0, 1, 2, ... times even at fraction 1,
      // which means the sample is not a no-op and the join must stay unpushed.
      val sampledLeft = spark.table(sampledTable).sample(withReplacement = true, fraction = 1.0)
      val joined = sampledLeft.join(spark.table(otherTable), "id")
      checkJoinNotPushed(joined)
      // The sample itself is still expected to be pushed to the source.
      checkSamplePushed(joined, pushed = true)
    }
  } finally {
    // Always clean up, in the same t1-then-t2 order as creation.
    Seq(sampledTable, otherTable).foreach { table =>
      sql(s"DROP TABLE IF EXISTS $table")
    }
  }
}

test("SPARK-55978: legacy connector with only 4-arg pushTableSample - BERNOULLI pushes down") {
val legacyCatalog = "testlegacysample"
registerCatalog(legacyCatalog, classOf[InMemoryTableWithLegacyTableSampleCatalog])
Expand Down