diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ChangelogInfo.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ChangelogContext.java similarity index 89% rename from sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ChangelogInfo.java rename to sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ChangelogContext.java index 04a6d055f56b7..d6ea072b96342 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ChangelogInfo.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ChangelogContext.java @@ -20,16 +20,18 @@ import java.util.Objects; import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; /** * Encapsulates the parameters of a Change Data Capture (CDC) query, passed from the * parser / DataFrame API to the catalog's - * {@link TableCatalog#loadChangelog(Identifier, ChangelogInfo)} method. + * {@link TableCatalog#loadChangelog(Identifier, ChangelogContext, CaseInsensitiveStringMap)} + * method. * * @since 4.2.0 */ @Evolving -public class ChangelogInfo { +public class ChangelogContext { /** * Deduplication modes controlling how Spark post-processes raw change data. @@ -47,7 +49,7 @@ public enum DeduplicationMode { private final DeduplicationMode deduplicationMode; private final boolean computeUpdates; - public ChangelogInfo( + public ChangelogContext( ChangelogRange range, DeduplicationMode deduplicationMode, boolean computeUpdates) { @@ -68,7 +70,7 @@ public ChangelogInfo( @Override public boolean equals(Object o) { if (this == o) return true; - if (!(o instanceof ChangelogInfo that)) return false; + if (!(o instanceof ChangelogContext that)) return false; return computeUpdates == that.computeUpdates && Objects.equals(range, that.range) && deduplicationMode == that.deduplicationMode; @@ -81,7 +83,7 @@ public int hashCode() { @Override public String toString() { - return "ChangelogInfo{range=" + range + + return "ChangelogContext{range=" + range + ", deduplicationMode=" + deduplicationMode + ", computeUpdates=" + computeUpdates + "}"; } diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java index a6f51342aef59..f64c34ee0e071 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java @@ -25,6 +25,7 @@ import org.apache.spark.sql.errors.QueryCompilationErrors; import org.apache.spark.sql.errors.QueryExecutionErrors; import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; import java.util.ArrayList; import java.util.Map; @@ -195,22 +196,25 @@ default Table loadTable(Identifier ident, long timestamp) throws NoSuchTableExce /** * Load a {@link Changelog} for the given table, representing the row-level changes within the - * range specified by {@code changelogInfo}. + * range specified by {@code context}. *
* The default implementation throws an analysis exception indicating that the catalog does * not support CDC. Catalogs that support CDC must override this method. * * @param ident a table identifier - * @param changelogInfo the CDC query parameters (range, deduplication mode, etc.) + * @param context the CDC query context (range, deduplication mode, etc.) + * @param options all options passed to the changelog query, including the CDC-recognized + * keys (range, deduplication mode, etc.) that are also parsed into {@code context} * @return a Changelog instance for the requested table and range * @throws NoSuchTableException If the table doesn't exist * * @since 4.2.0 */ - default Changelog loadChangelog(Identifier ident, ChangelogInfo changelogInfo) - throws NoSuchTableException { - throw new UnsupportedOperationException( - name() + " does not support Change Data Capture (CDC)"); + default Changelog loadChangelog( + Identifier ident, + ChangelogContext context, + CaseInsensitiveStringMap options) throws NoSuchTableException { + throw new UnsupportedOperationException(name() + " does not support Change Data Capture (CDC)"); } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index d7d06303b7819..af659894f24d5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1135,8 +1135,8 @@ class Analyzer( val timeTravelSpec = TimeTravelSpec.create(timestamp, version, conf.sessionLocalTimeZone) resolveRelation(u, timeTravelSpec).getOrElse(r) - case r @ RelationChanges(u: UnresolvedRelation, changelogInfo) => - relationResolution.resolveChangelog(u, changelogInfo).getOrElse(r) + case r @ RelationChanges(u: UnresolvedRelation, ctx) => + relationResolution.resolveChangelog(u, ctx).getOrElse(r) case u @ UnresolvedTable(identifier, cmd, suggestAlternative) => lookupTableOrView(identifier).map { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ChangelogInfoUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ChangelogContextUtils.scala similarity index 87% rename from sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ChangelogInfoUtils.scala rename to sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ChangelogContextUtils.scala index fb7ae01843d6d..2b679d955f523 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ChangelogInfoUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ChangelogContextUtils.scala @@ -21,16 +21,16 @@ import java.lang.{Long => JLong} import java.util.{Locale, Optional => JOptional} import org.apache.spark.sql.catalyst.expressions.{Cast, Literal} -import org.apache.spark.sql.connector.catalog.ChangelogInfo +import org.apache.spark.sql.connector.catalog.ChangelogContext import org.apache.spark.sql.connector.catalog.ChangelogRange.{TimestampRange, UnboundedRange, VersionRange} import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.types.TimestampType import org.apache.spark.sql.util.CaseInsensitiveStringMap /** - * Utility methods for constructing [[ChangelogInfo]] from DataFrame API options. + * Utility methods for constructing [[ChangelogContext]] from DataFrame API options. */ -object ChangelogInfoUtils { +object ChangelogContextUtils { private val STARTING_VERSION = "startingVersion" private val ENDING_VERSION = "endingVersion" @@ -42,12 +42,12 @@ object ChangelogInfoUtils { private val COMPUTE_UPDATES = "computeUpdates" /** - * Build a [[ChangelogInfo]] from the options specified via `.option()` calls on + * Build a [[ChangelogContext]] from the options specified via `.option()` calls on * `DataFrameReader` or `DataStreamReader`. */ def fromOptions( options: CaseInsensitiveStringMap, - sessionLocalTimeZone: String): ChangelogInfo = { + sessionLocalTimeZone: String): ChangelogContext = { val startVersion = Option(options.get(STARTING_VERSION)) val endVersion = Option(options.get(ENDING_VERSION)) val startTimestamp = Option(options.get(STARTING_TIMESTAMP)) @@ -59,9 +59,9 @@ object ChangelogInfoUtils { val deduplicationModeStr = Option(options.get(DEDUPLICATION_MODE)) .getOrElse("dropCarryovers").toLowerCase(Locale.ROOT) val deduplicationMode = deduplicationModeStr match { - case "none" => ChangelogInfo.DeduplicationMode.NONE - case "dropcarryovers" => ChangelogInfo.DeduplicationMode.DROP_CARRYOVERS - case "netchanges" => ChangelogInfo.DeduplicationMode.NET_CHANGES + case "none" => ChangelogContext.DeduplicationMode.NONE + case "dropcarryovers" => ChangelogContext.DeduplicationMode.DROP_CARRYOVERS + case "netchanges" => ChangelogContext.DeduplicationMode.NET_CHANGES case other => throw QueryCompilationErrors.invalidCdcOptionInvalidDeduplicationMode(other) } @@ -98,7 +98,7 @@ object ChangelogInfoUtils { new UnboundedRange() } - new ChangelogInfo(range, deduplicationMode, computeUpdates) + new ChangelogContext(range, deduplicationMode, computeUpdates) } private def parseTimestamp(timestampStr: String, sessionLocalTimeZone: String): Long = { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationChanges.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationChanges.scala index 2b4ba58d1745c..84f82ffc1f2ae 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationChanges.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationChanges.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.analysis import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.trees.TreePattern.{RELATION_CHANGES, TreePattern} -import org.apache.spark.sql.connector.catalog.ChangelogInfo +import org.apache.spark.sql.connector.catalog.ChangelogContext /** * A logical node used to query Change Data Capture (CDC) changes for a table relation. @@ -33,10 +33,10 @@ import org.apache.spark.sql.connector.catalog.ChangelogInfo * [[UnresolvedLeafNode]]). Tree traversals like `transformUp` will not visit `relation`. * * @param relation the table relation (typically an [[UnresolvedRelation]]) - * @param changelogInfo the CDC query parameters (range, deduplication mode, etc.) + * @param changelogContext the CDC query context (range, deduplication mode, etc.) */ case class RelationChanges( relation: LogicalPlan, - changelogInfo: ChangelogInfo) extends UnresolvedLeafNode { + changelogContext: ChangelogContext) extends UnresolvedLeafNode { override val nodePatterns: Seq[TreePattern] = Seq(RELATION_CHANGES) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala index 8769d1c4e4ffa..55a7ad10790ea 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala @@ -33,7 +33,7 @@ import org.apache.spark.sql.connector.catalog.{ CatalogManager, CatalogPlugin, CatalogV2Util, - ChangelogInfo, + ChangelogContext, Identifier, LookupCatalog, MetadataTable, @@ -330,19 +330,17 @@ class RelationResolution( * Resolve a CDC (CHANGES) query: look up the catalog, call loadChangelog(), wrap in * ChangelogTable, and return a DataSourceV2Relation. */ - def resolveChangelog( - u: UnresolvedRelation, - changelogInfo: ChangelogInfo): Option[LogicalPlan] = { + def resolveChangelog(u: UnresolvedRelation, ctx: ChangelogContext): Option[LogicalPlan] = { expandIdentifier(u.multipartIdentifier) match { case CatalogAndIdentifier(catalog, ident) => val tableCatalog = catalog.asTableCatalog val changelog = try { - tableCatalog.loadChangelog(ident, changelogInfo) + tableCatalog.loadChangelog(ident, ctx, u.options) } catch { case _: UnsupportedOperationException => throw QueryCompilationErrors.cdcNotSupportedError(tableCatalog.name()) } - val changelogTable = ChangelogTable(changelog, changelogInfo) + val changelogTable = ChangelogTable(changelog, ctx) val relation = if (u.isStreaming) { StreamingRelationV2( None, changelogTable.name, changelogTable, u.options, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala index 7db9f6fa405bb..3a7b26c710aa9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala @@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2 -import org.apache.spark.sql.connector.catalog.{Changelog, ChangelogInfo} +import org.apache.spark.sql.connector.catalog.{Changelog, ChangelogContext} import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.datasources.v2.{ChangelogTable, DataSourceV2Relation} import org.apache.spark.sql.streaming.{OutputMode, StatefulProcessor} @@ -123,7 +123,7 @@ object ResolveChangelogTable extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp { case rel @ DataSourceV2Relation(table: ChangelogTable, _, _, _, _, _) if !table.resolved => val changelog = table.changelog - val req = evaluateRequirements(changelog, table.changelogInfo) + val req = evaluateRequirements(changelog, table.changelogContext) val resolvedRel = rel.copy(table = table.copy(resolved = true)) var updatedRel: LogicalPlan = resolvedRel @@ -140,14 +140,14 @@ object ResolveChangelogTable extends Rule[LogicalPlan] { val rowIdExprs = V2ExpressionUtils.resolveRefs[NamedExpression](changelog.rowId().toSeq, resolvedRel) updatedRel = injectNetChangeComputation( - updatedRel, rowIdExprs, table.changelogInfo.computeUpdates()) + updatedRel, rowIdExprs, table.changelogContext.computeUpdates()) } updatedRel case rel @ StreamingRelationV2(_, _, table: ChangelogTable, _, _, _, _, _, _) if !table.resolved => val changelog = table.changelog - val req = evaluateRequirements(changelog, table.changelogInfo) + val req = evaluateRequirements(changelog, table.changelogContext) val resolvedRel = rel.copy(table = table.copy(resolved = true)) var updatedRel: LogicalPlan = resolvedRel if (req.requiresCarryOverRemoval || req.requiresUpdateDetection) { @@ -164,7 +164,7 @@ object ResolveChangelogTable extends Rule[LogicalPlan] { // output, so name-based resolution against `updatedRel` recovers the right // attributes regardless of any preceding wrapping. updatedRel = addStreamingNetChangeComputation( - updatedRel, changelog, table.changelogInfo.computeUpdates()) + updatedRel, changelog, table.changelogContext.computeUpdates()) } updatedRel } @@ -175,7 +175,7 @@ object ResolveChangelogTable extends Rule[LogicalPlan] { /** * Captures which post-processing passes a CDC query requires, derived from the - * user-provided [[ChangelogInfo]] options and the connector-declared [[Changelog]] + * user-provided [[ChangelogContext]] options and the connector-declared [[Changelog]] * capability flags. */ private case class PostProcessingRequirements( @@ -194,14 +194,14 @@ object ResolveChangelogTable extends Rule[LogicalPlan] { */ private def evaluateRequirements( changelog: Changelog, - options: ChangelogInfo): PostProcessingRequirements = { + options: ChangelogContext): PostProcessingRequirements = { val requiresCarryOverRemoval = - options.deduplicationMode() != ChangelogInfo.DeduplicationMode.NONE && + options.deduplicationMode() != ChangelogContext.DeduplicationMode.NONE && changelog.containsCarryoverRows() val requiresUpdateDetection = options.computeUpdates() && changelog.representsUpdateAsDeleteAndInsert() val requiresNetChanges = - options.deduplicationMode() == ChangelogInfo.DeduplicationMode.NET_CHANGES && + options.deduplicationMode() == ChangelogContext.DeduplicationMode.NET_CHANGES && changelog.containsIntermediateChanges() // If carry-overs are surfaced and update detection is enabled without carry-over @@ -209,7 +209,7 @@ object ResolveChangelogTable extends Rule[LogicalPlan] { // results. Hence we throw. if (requiresUpdateDetection && changelog.containsCarryoverRows() && - options.deduplicationMode() == ChangelogInfo.DeduplicationMode.NONE) { + options.deduplicationMode() == ChangelogContext.DeduplicationMode.NONE) { throw QueryCompilationErrors.cdcUpdateDetectionRequiresCarryOverRemoval( changelog.name()) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 9a1593cababb8..ff69be1956c6b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -47,7 +47,7 @@ import org.apache.spark.sql.catalyst.trees.TreePattern.PARAMETER import org.apache.spark.sql.catalyst.types.DataTypeUtils import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, CollationFactory, DateTimeUtils, EvaluateUnresolvedInlineTable, IntervalUtils} import org.apache.spark.sql.catalyst.util.DateTimeUtils.{convertSpecialDate, convertSpecialTimestamp, convertSpecialTimestampNTZ, getZoneId, stringToDate, stringToTime, stringToTimestamp, stringToTimestampWithoutTimeZone} -import org.apache.spark.sql.connector.catalog.{CatalogV2Util, ChangelogInfo, PathElement, SupportsNamespaces, TableCatalog, TableWritePrivilege} +import org.apache.spark.sql.connector.catalog.{CatalogV2Util, ChangelogContext, PathElement, SupportsNamespaces, TableCatalog, TableWritePrivilege} import org.apache.spark.sql.connector.catalog.ChangelogRange.{TimestampRange, UnboundedRange, VersionRange} import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition import org.apache.spark.sql.connector.expressions.{ApplyTransform, BucketTransform, DaysTransform, Expression => V2Expression, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, YearsTransform} @@ -2649,17 +2649,17 @@ class AstBuilder extends DataTypeAstBuilder withOrigin(ctx) { val relation = createUnresolvedRelation(ctx.identifierReference, Option(ctx.optionsClause)) val options = resolveOptions(Option(ctx.optionsClause)) - val changelogInfo = buildChangelogInfo(ctx.changesClause, options) - val result = RelationChanges(relation, changelogInfo) + val changelogContext = buildChangelogContext(ctx.changesClause, options) + val result = RelationChanges(relation, changelogContext) mayApplyAliasPlan(ctx.tableAlias, result) } /** - * Build a [[ChangelogInfo]] from a batch changesClause context and optional WITH options. + * Build a [[ChangelogContext]] from a batch changesClause context and optional WITH options. */ - private def buildChangelogInfo( + private def buildChangelogContext( ctx: ChangesClauseContext, - options: CaseInsensitiveStringMap): ChangelogInfo = { + options: CaseInsensitiveStringMap): ChangelogContext = { val startExclusive = ctx.startExclusive != null val endExclusive = ctx.endExclusive != null val startInclusive = !startExclusive @@ -2704,16 +2704,16 @@ class AstBuilder extends DataTypeAstBuilder } val (deduplicationMode, computeUpdates) = resolveChangelogOptions(options) - new ChangelogInfo(range, deduplicationMode, computeUpdates) + new ChangelogContext(range, deduplicationMode, computeUpdates) } /** - * Build a [[ChangelogInfo]] from a streaming streamChangesClause context and optional + * Build a [[ChangelogContext]] from a streaming streamChangesClause context and optional * WITH options. */ - private def buildStreamChangelogInfo( + private def buildStreamChangelogContext( ctx: StreamChangesClauseContext, - options: CaseInsensitiveStringMap): ChangelogInfo = { + options: CaseInsensitiveStringMap): ChangelogContext = { val startExclusive = ctx.startExclusive != null val startInclusive = !startExclusive @@ -2744,7 +2744,7 @@ class AstBuilder extends DataTypeAstBuilder } val (deduplicationMode, computeUpdates) = resolveChangelogOptions(options) - new ChangelogInfo(range, deduplicationMode, computeUpdates) + new ChangelogContext(range, deduplicationMode, computeUpdates) } /** @@ -2752,14 +2752,13 @@ class AstBuilder extends DataTypeAstBuilder * Defaults: DROP_CARRYOVERS for deduplicationMode, false for computeUpdates. */ private def resolveChangelogOptions( - options: CaseInsensitiveStringMap) - : (ChangelogInfo.DeduplicationMode, Boolean) = { + options: CaseInsensitiveStringMap): (ChangelogContext.DeduplicationMode, Boolean) = { val deduplicationModeStr = Option(options.get("deduplicationMode")) .getOrElse("dropCarryovers").toLowerCase(Locale.ROOT) val deduplicationMode = deduplicationModeStr match { - case "none" => ChangelogInfo.DeduplicationMode.NONE - case "dropcarryovers" => ChangelogInfo.DeduplicationMode.DROP_CARRYOVERS - case "netchanges" => ChangelogInfo.DeduplicationMode.NET_CHANGES + case "none" => ChangelogContext.DeduplicationMode.NONE + case "dropcarryovers" => ChangelogContext.DeduplicationMode.DROP_CARRYOVERS + case "netchanges" => ChangelogContext.DeduplicationMode.NET_CHANGES case other => throw QueryCompilationErrors.invalidCdcOptionInvalidDeduplicationMode(other) } @@ -2925,8 +2924,8 @@ class AstBuilder extends DataTypeAstBuilder case Some(changesCtx) => // Streaming CDC: wrap in RelationChanges and NamedStreamingRelation val options = resolveOptions(Option(ctx.optionsClause)) - val changelogInfo = buildStreamChangelogInfo(changesCtx, options) - val result = RelationChanges(relation, changelogInfo) + val changelogContext = buildStreamChangelogContext(changesCtx, options) + val result = RelationChanges(relation, changelogContext) val table = mayApplyAliasPlan(ctx.tableAlias, result) val tableWithWatermark = table.optionalMap(ctx.watermarkClause)(withWatermark) val sourceNameOpt = extractSourceName(ctx.identifiedByClause) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ChangelogTable.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ChangelogTable.scala index 0a341fe906873..ec45f1f373177 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ChangelogTable.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ChangelogTable.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.v2 import java.util.{EnumSet => JEnumSet, Set => JSet} -import org.apache.spark.sql.connector.catalog.{Changelog, ChangelogInfo, Column, SupportsRead, Table, TableCapability} +import org.apache.spark.sql.connector.catalog.{Changelog, ChangelogContext, Column, SupportsRead, Table, TableCapability} import org.apache.spark.sql.connector.catalog.TableCapability.{BATCH_READ, MICRO_BATCH_READ} import org.apache.spark.sql.connector.read.ScanBuilder import org.apache.spark.sql.errors.QueryCompilationErrors @@ -35,7 +35,7 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap */ case class ChangelogTable( changelog: Changelog, - changelogInfo: ChangelogInfo, + changelogContext: ChangelogContext, resolved: Boolean = false) extends Table with SupportsRead { // Validate that the connector returned a schema with the required CDC metadata columns diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ChangelogInfoUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ChangelogContextUtilsSuite.scala similarity index 80% rename from sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ChangelogInfoUtilsSuite.scala rename to sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ChangelogContextUtilsSuite.scala index 312754fa24dd9..93bab0009d67e 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ChangelogInfoUtilsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ChangelogContextUtilsSuite.scala @@ -22,11 +22,11 @@ import scala.jdk.CollectionConverters._ import org.apache.spark.SparkFunSuite import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.plans.SQLHelper -import org.apache.spark.sql.connector.catalog.{ChangelogInfo, ChangelogRange} +import org.apache.spark.sql.connector.catalog.{ChangelogContext, ChangelogRange} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.util.CaseInsensitiveStringMap -class ChangelogInfoUtilsSuite extends SparkFunSuite with SQLHelper { +class ChangelogContextUtilsSuite extends SparkFunSuite with SQLHelper { private val testTimeZone = "UTC" @@ -35,7 +35,7 @@ class ChangelogInfoUtilsSuite extends SparkFunSuite with SQLHelper { } test("version range with both start and end") { - val info = ChangelogInfoUtils.fromOptions( + val info = ChangelogContextUtils.fromOptions( makeOptions("startingVersion" -> "1", "endingVersion" -> "5"), testTimeZone) val range = info.range().asInstanceOf[ChangelogRange.VersionRange] assert(range.startingVersion() == "1") @@ -45,7 +45,7 @@ class ChangelogInfoUtilsSuite extends SparkFunSuite with SQLHelper { } test("version range with only start") { - val info = ChangelogInfoUtils.fromOptions( + val info = ChangelogContextUtils.fromOptions( makeOptions("startingVersion" -> "10"), testTimeZone) val range = info.range().asInstanceOf[ChangelogRange.VersionRange] assert(range.startingVersion() == "10") @@ -55,14 +55,14 @@ class ChangelogInfoUtilsSuite extends SparkFunSuite with SQLHelper { test("version range - endingVersion without startingVersion throws") { checkError( intercept[AnalysisException] { - ChangelogInfoUtils.fromOptions( + ChangelogContextUtils.fromOptions( makeOptions("endingVersion" -> "5"), testTimeZone) }, condition = "INVALID_CDC_OPTION.MISSING_STARTING_VERSION") } test("timestamp range with both start and end") { - val info = ChangelogInfoUtils.fromOptions( + val info = ChangelogContextUtils.fromOptions( makeOptions("startingTimestamp" -> "2026-01-01", "endingTimestamp" -> "2026-02-01"), testTimeZone) val range = info.range().asInstanceOf[ChangelogRange.TimestampRange] @@ -72,7 +72,7 @@ class ChangelogInfoUtilsSuite extends SparkFunSuite with SQLHelper { } test("timestamp range with only start") { - val info = ChangelogInfoUtils.fromOptions( + val info = ChangelogContextUtils.fromOptions( makeOptions("startingTimestamp" -> "2026-01-01"), testTimeZone) val range = info.range().asInstanceOf[ChangelogRange.TimestampRange] assert(!range.endingTimestamp().isPresent) @@ -81,7 +81,7 @@ class ChangelogInfoUtilsSuite extends SparkFunSuite with SQLHelper { test("timestamp range - endingTimestamp without startingTimestamp throws") { checkError( intercept[AnalysisException] { - ChangelogInfoUtils.fromOptions( + ChangelogContextUtils.fromOptions( makeOptions("endingTimestamp" -> "2026-02-01"), testTimeZone) }, condition = "INVALID_CDC_OPTION.MISSING_STARTING_TIMESTAMP") @@ -90,7 +90,7 @@ class ChangelogInfoUtilsSuite extends SparkFunSuite with SQLHelper { test("cannot mix version and timestamp range") { checkError( intercept[AnalysisException] { - ChangelogInfoUtils.fromOptions( + ChangelogContextUtils.fromOptions( makeOptions("startingVersion" -> "1", "startingTimestamp" -> "2026-01-01"), testTimeZone) }, @@ -98,37 +98,37 @@ class ChangelogInfoUtilsSuite extends SparkFunSuite with SQLHelper { } test("unbounded range when no version or timestamp specified") { - val info = ChangelogInfoUtils.fromOptions(makeOptions(), testTimeZone) + val info = ChangelogContextUtils.fromOptions(makeOptions(), testTimeZone) assert(info.range().isInstanceOf[ChangelogRange.UnboundedRange]) } test("deduplication mode - none") { - val info = ChangelogInfoUtils.fromOptions( + val info = ChangelogContextUtils.fromOptions( makeOptions("deduplicationMode" -> "none"), testTimeZone) - assert(info.deduplicationMode() == ChangelogInfo.DeduplicationMode.NONE) + assert(info.deduplicationMode() == ChangelogContext.DeduplicationMode.NONE) } test("deduplication mode - dropCarryovers (default)") { - val info = ChangelogInfoUtils.fromOptions(makeOptions(), testTimeZone) - assert(info.deduplicationMode() == ChangelogInfo.DeduplicationMode.DROP_CARRYOVERS) + val info = ChangelogContextUtils.fromOptions(makeOptions(), testTimeZone) + assert(info.deduplicationMode() == ChangelogContext.DeduplicationMode.DROP_CARRYOVERS) } test("deduplication mode - netChanges") { - val info = ChangelogInfoUtils.fromOptions( + val info = ChangelogContextUtils.fromOptions( makeOptions("deduplicationMode" -> "netChanges"), testTimeZone) - assert(info.deduplicationMode() == ChangelogInfo.DeduplicationMode.NET_CHANGES) + assert(info.deduplicationMode() == ChangelogContext.DeduplicationMode.NET_CHANGES) } test("deduplication mode - case insensitive") { - val info = ChangelogInfoUtils.fromOptions( + val info = ChangelogContextUtils.fromOptions( makeOptions("deduplicationMode" -> "DROPCARRYOVERS"), testTimeZone) - assert(info.deduplicationMode() == ChangelogInfo.DeduplicationMode.DROP_CARRYOVERS) + assert(info.deduplicationMode() == ChangelogContext.DeduplicationMode.DROP_CARRYOVERS) } test("deduplication mode - invalid value throws") { checkError( intercept[AnalysisException] { - ChangelogInfoUtils.fromOptions( + ChangelogContextUtils.fromOptions( makeOptions("deduplicationMode" -> "invalid"), testTimeZone) }, condition = "INVALID_CDC_OPTION.INVALID_DEDUPLICATION_MODE", @@ -136,18 +136,18 @@ class ChangelogInfoUtilsSuite extends SparkFunSuite with SQLHelper { } test("computeUpdates option") { - val info = ChangelogInfoUtils.fromOptions( + val info = ChangelogContextUtils.fromOptions( makeOptions("computeUpdates" -> "true"), testTimeZone) assert(info.computeUpdates()) } test("computeUpdates defaults to false") { - val info = ChangelogInfoUtils.fromOptions(makeOptions(), testTimeZone) + val info = ChangelogContextUtils.fromOptions(makeOptions(), testTimeZone) assert(!info.computeUpdates()) } test("bound inclusivity options") { - val info = ChangelogInfoUtils.fromOptions( + val info = ChangelogContextUtils.fromOptions( makeOptions( "startingVersion" -> "1", "endingVersion" -> "5", @@ -162,7 +162,7 @@ class ChangelogInfoUtilsSuite extends SparkFunSuite with SQLHelper { test("invalid timestamp throws") { checkError( intercept[AnalysisException] { - ChangelogInfoUtils.fromOptions( + ChangelogContextUtils.fromOptions( makeOptions("startingTimestamp" -> "not-a-timestamp"), testTimeZone) }, condition = "INVALID_CDC_OPTION.INVALID_TIMESTAMP", @@ -177,7 +177,7 @@ class ChangelogInfoUtilsSuite extends SparkFunSuite with SQLHelper { // = 2026-01-01 08:00:00 UTC = expectedUtcMicros + 8h val expectedPstMicros = 1767254400000000L - val utcInfo = ChangelogInfoUtils.fromOptions( + val utcInfo = ChangelogContextUtils.fromOptions( makeOptions("startingTimestamp" -> tsStr), "UTC") val utcRange = utcInfo.range().asInstanceOf[ChangelogRange.TimestampRange] @@ -185,7 +185,7 @@ class ChangelogInfoUtilsSuite extends SparkFunSuite with SQLHelper { withSQLConf( SQLConf.SESSION_LOCAL_TIMEZONE.key -> "America/Los_Angeles") { - val laInfo = ChangelogInfoUtils.fromOptions( + val laInfo = ChangelogContextUtils.fromOptions( makeOptions("startingTimestamp" -> tsStr), SQLConf.get.sessionLocalTimeZone) val laRange = diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala index c0495a47ce698..5370d1ee8d313 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.util.{EvaluateUnresolvedInlineTable, IntervalUtils} -import org.apache.spark.sql.connector.catalog.{ChangelogInfo, ChangelogRange} +import org.apache.spark.sql.connector.catalog.{ChangelogContext, ChangelogRange} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{Decimal, DecimalType, IntegerType, LongType, StringType} import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -2194,14 +2194,14 @@ class PlanParserSuite extends AnalysisTest { endInclusive: Boolean = true): RelationChanges = { RelationChanges( UnresolvedRelation(Seq("a", "b", "c")), - new ChangelogInfo( + new ChangelogContext( new ChangelogRange.VersionRange( startVersion, endVersion.map(java.util.Optional.of[String]) .getOrElse(java.util.Optional.empty[String]), startInclusive, endInclusive), - ChangelogInfo.DeduplicationMode.DROP_CARRYOVERS, + ChangelogContext.DeduplicationMode.DROP_CARRYOVERS, false)) } @@ -2262,7 +2262,7 @@ class PlanParserSuite extends AnalysisTest { case rc: RelationChanges => rc case sa: SubqueryAlias => sa.child.asInstanceOf[RelationChanges] } - changes.changelogInfo.range().asInstanceOf[ChangelogRange.TimestampRange] + changes.changelogContext.range().asInstanceOf[ChangelogRange.TimestampRange] } // Basic timestamp range @@ -2300,54 +2300,54 @@ class PlanParserSuite extends AnalysisTest { } test("CHANGES clause - with options") { - def assertChangelogInfo(sql: String): ChangelogInfo = { + def assertChangelogContext(sql: String): ChangelogContext = { val plan = parsePlan(sql) val project = plan.asInstanceOf[Project] val changes = project.child match { case rc: RelationChanges => rc case sa: SubqueryAlias => sa.child.asInstanceOf[RelationChanges] } - changes.changelogInfo + changes.changelogContext } // Default: DROP_CARRYOVERS and computeUpdates = false - val info1 = assertChangelogInfo( + val info1 = assertChangelogContext( "SELECT * FROM a.b.c CHANGES FROM VERSION 10 TO VERSION 20") - assert(info1.deduplicationMode() == ChangelogInfo.DeduplicationMode.DROP_CARRYOVERS) + assert(info1.deduplicationMode() == ChangelogContext.DeduplicationMode.DROP_CARRYOVERS) assert(!info1.computeUpdates()) // deduplicationMode = none - val info2 = assertChangelogInfo( + val info2 = assertChangelogContext( "SELECT * FROM a.b.c CHANGES FROM VERSION 10 TO VERSION 20 " + "WITH (deduplicationMode = 'none')") - assert(info2.deduplicationMode() == ChangelogInfo.DeduplicationMode.NONE) + assert(info2.deduplicationMode() == ChangelogContext.DeduplicationMode.NONE) assert(!info2.computeUpdates()) // deduplicationMode = netChanges - val info3 = assertChangelogInfo( + val info3 = assertChangelogContext( "SELECT * FROM a.b.c CHANGES FROM VERSION 10 TO VERSION 20 " + "WITH (deduplicationMode = 'netChanges')") - assert(info3.deduplicationMode() == ChangelogInfo.DeduplicationMode.NET_CHANGES) + assert(info3.deduplicationMode() == ChangelogContext.DeduplicationMode.NET_CHANGES) // computeUpdates = true - val info4 = assertChangelogInfo( + val info4 = assertChangelogContext( "SELECT * FROM a.b.c CHANGES FROM VERSION 10 TO VERSION 20 " + "WITH (computeUpdates = 'true')") - assert(info4.deduplicationMode() == ChangelogInfo.DeduplicationMode.DROP_CARRYOVERS) + assert(info4.deduplicationMode() == ChangelogContext.DeduplicationMode.DROP_CARRYOVERS) assert(info4.computeUpdates()) // Both options together - val info5 = assertChangelogInfo( + val info5 = assertChangelogContext( "SELECT * FROM a.b.c CHANGES FROM VERSION 10 TO VERSION 20 " + "WITH (deduplicationMode = 'none', computeUpdates = 'true')") - assert(info5.deduplicationMode() == ChangelogInfo.DeduplicationMode.NONE) + assert(info5.deduplicationMode() == ChangelogContext.DeduplicationMode.NONE) assert(info5.computeUpdates()) // Case-insensitive deduplicationMode value - val info6 = assertChangelogInfo( + val info6 = assertChangelogContext( "SELECT * FROM a.b.c CHANGES FROM VERSION 10 TO VERSION 20 " + "WITH (deduplicationMode = 'DROPCARRYOVERS')") - assert(info6.deduplicationMode() == ChangelogInfo.DeduplicationMode.DROP_CARRYOVERS) + assert(info6.deduplicationMode() == ChangelogContext.DeduplicationMode.DROP_CARRYOVERS) } test("CHANGES clause - invalid deduplicationMode") { @@ -2378,10 +2378,10 @@ class PlanParserSuite extends AnalysisTest { Project(Seq(UnresolvedStar(None)), RelationChanges( UnresolvedRelation(Seq("my_table")), - new ChangelogInfo( + new ChangelogContext( new ChangelogRange.VersionRange( "1", java.util.Optional.empty[String], true, true), - ChangelogInfo.DeduplicationMode.DROP_CARRYOVERS, + ChangelogContext.DeduplicationMode.DROP_CARRYOVERS, false)))) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/StreamRelationParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/StreamRelationParserSuite.scala index 880431b189a7d..b28f046c91079 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/StreamRelationParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/StreamRelationParserSuite.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.AliasIdentifier import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, NamedStreamingRelation, RelationChanges, UnresolvedRelation, UnresolvedStar, UnresolvedTableValuedFunction} import org.apache.spark.sql.catalyst.plans.logical.{Project, SubqueryAlias} import org.apache.spark.sql.catalyst.streaming.{Unassigned, UserProvided} -import org.apache.spark.sql.connector.catalog.{ChangelogInfo, ChangelogRange} +import org.apache.spark.sql.connector.catalog.{ChangelogContext, ChangelogRange} import org.apache.spark.sql.util.CaseInsensitiveStringMap class StreamRelationParserSuite extends AnalysisTest { @@ -594,17 +594,17 @@ class StreamRelationParserSuite extends AnalysisTest { val plan = parsePlan("SELECT * FROM STREAM t CHANGES") val relationChanges = plan.collect { case rc: RelationChanges => rc } assert(relationChanges.size == 1) - assert(relationChanges.head.changelogInfo.range().isInstanceOf[ChangelogRange.UnboundedRange]) - assert(relationChanges.head.changelogInfo.deduplicationMode() == - ChangelogInfo.DeduplicationMode.DROP_CARRYOVERS) - assert(!relationChanges.head.changelogInfo.computeUpdates()) + assert(relationChanges.head.changelogContext.range().isInstanceOf[ChangelogRange.UnboundedRange]) + assert(relationChanges.head.changelogContext.deduplicationMode() == + ChangelogContext.DeduplicationMode.DROP_CARRYOVERS) + assert(!relationChanges.head.changelogContext.computeUpdates()) } test("STREAM t CHANGES FROM VERSION") { val plan = parsePlan("SELECT * FROM STREAM t CHANGES FROM VERSION 1") val relationChanges = plan.collect { case rc: RelationChanges => rc } assert(relationChanges.size == 1) - val range = relationChanges.head.changelogInfo.range() + val range = relationChanges.head.changelogContext.range() .asInstanceOf[ChangelogRange.VersionRange] assert(range.startingVersion() == "1") assert(!range.endingVersion().isPresent) @@ -615,7 +615,7 @@ class StreamRelationParserSuite extends AnalysisTest { val plan = parsePlan("SELECT * FROM STREAM t CHANGES FROM VERSION 5 EXCLUSIVE") val relationChanges = plan.collect { case rc: RelationChanges => rc } assert(relationChanges.size == 1) - val range = relationChanges.head.changelogInfo.range() + val range = relationChanges.head.changelogContext.range() .asInstanceOf[ChangelogRange.VersionRange] assert(range.startingVersion() == "5") assert(!range.startingBoundInclusive()) @@ -625,7 +625,7 @@ class StreamRelationParserSuite extends AnalysisTest { val plan = parsePlan("SELECT * FROM STREAM t CHANGES FROM TIMESTAMP '2026-01-01'") val relationChanges = plan.collect { case rc: RelationChanges => rc } assert(relationChanges.size == 1) - assert(relationChanges.head.changelogInfo.range() + assert(relationChanges.head.changelogContext.range() .isInstanceOf[ChangelogRange.TimestampRange]) } @@ -647,9 +647,9 @@ class StreamRelationParserSuite extends AnalysisTest { "WITH (deduplicationMode = 'none', computeUpdates = 'true')") val relationChanges = plan.collect { case rc: RelationChanges => rc } assert(relationChanges.size == 1) - assert(relationChanges.head.changelogInfo.deduplicationMode() == - ChangelogInfo.DeduplicationMode.NONE) - assert(relationChanges.head.changelogInfo.computeUpdates()) + assert(relationChanges.head.changelogContext.deduplicationMode() == + ChangelogContext.DeduplicationMode.NONE) + assert(relationChanges.head.changelogContext.computeUpdates()) } test("STREAM t CHANGES - error: subquery in timestamp") { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryChangelogCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryChangelogCatalog.scala index 2b19bac8d62e5..0c1def1ac55c2 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryChangelogCatalog.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryChangelogCatalog.scala @@ -40,10 +40,13 @@ class InMemoryChangelogCatalog extends InMemoryCatalog { private val changeData: mutable.Map[String, mutable.ArrayBuffer[InternalRow]] = mutable.Map.empty - // Stores the most recent ChangelogInfo passed to loadChangelog(), so tests can verify - // that the parser/DataFrame API correctly constructed and forwarded it. - private var _lastChangelogInfo: Option[ChangelogInfo] = None - def lastChangelogInfo: Option[ChangelogInfo] = _lastChangelogInfo + // Stores the most recent ChangelogContext and options passed to loadChangelog(), so tests + // can verify that the parser/DataFrame API correctly constructed and forwarded them. + private var _lastChangelogContext: Option[ChangelogContext] = None + def lastChangelogContext: Option[ChangelogContext] = _lastChangelogContext + + private var _lastOptions: Option[CaseInsensitiveStringMap] = None + def lastOptions: Option[CaseInsensitiveStringMap] = _lastOptions // Per-table overrides for Changelog properties (carry-over rows, intermediate changes, // update representation, row identity). Tests can set these to exercise post-processing. @@ -63,8 +66,10 @@ class InMemoryChangelogCatalog extends InMemoryCatalog { override def loadChangelog( ident: Identifier, - changelogInfo: ChangelogInfo): Changelog = { - _lastChangelogInfo = Some(changelogInfo) + changelogContext: ChangelogContext, + options: CaseInsensitiveStringMap): Changelog = { + _lastChangelogContext = Some(changelogContext) + _lastOptions = Some(options) if (!tableExists(ident)) { throw new NoSuchTableException(ident.asMultipartIdentifier) } @@ -74,7 +79,7 @@ class InMemoryChangelogCatalog extends InMemoryCatalog { val numDataCols = table.columns.length // _commit_version is at index numDataCols + 1 (after _change_type) val commitVersionIdx = numDataCols + 1 - val filtered = filterByRange(allRows.toSeq, commitVersionIdx, changelogInfo.range()) + val filtered = filterByRange(allRows.toSeq, commitVersionIdx, changelogContext.range()) val props = changelogProperties.getOrElse(ident.toString, ChangelogProperties()) new InMemoryChangelog( table.name + "_changelog", table.columns, filtered, props) diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index dff80cb242687..db78dc1744ec9 100644 --- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -43,7 +43,7 @@ import org.apache.spark.internal.LogKeys.{DATAFRAME_ID, SESSION_ID} import org.apache.spark.resource.{ExecutorResourceRequest, ResourceProfile, TaskResourceProfile, TaskResourceRequest} import org.apache.spark.sql.{AnalysisException, Column, Encoders, ForeachWriter, Row} import org.apache.spark.sql.catalyst.{expressions, AliasIdentifier, FunctionIdentifier, InternalRow, QueryPlanningTracker} -import org.apache.spark.sql.catalyst.analysis.{ChangelogInfoUtils, FunctionRegistry, GlobalTempView, LocalTempView, MultiAlias, RelationChanges, UnresolvedAlias, UnresolvedAttribute, UnresolvedDataFrameStar, UnresolvedDeserializer, UnresolvedExtractValue, UnresolvedFunction, UnresolvedOrdinal, UnresolvedPlanId, UnresolvedRegex, UnresolvedRelation, UnresolvedStar, UnresolvedStarWithColumns, UnresolvedStarWithColumnsRenames, UnresolvedSubqueryColumnAliases, UnresolvedTableValuedFunction, UnresolvedTranspose} +import org.apache.spark.sql.catalyst.analysis.{ChangelogContextUtils, FunctionRegistry, GlobalTempView, LocalTempView, MultiAlias, RelationChanges, UnresolvedAlias, UnresolvedAttribute, UnresolvedDataFrameStar, UnresolvedDeserializer, UnresolvedExtractValue, UnresolvedFunction, UnresolvedOrdinal, UnresolvedPlanId, UnresolvedRegex, UnresolvedRelation, UnresolvedStar, UnresolvedStarWithColumns, UnresolvedStarWithColumnsRenames, UnresolvedSubqueryColumnAliases, UnresolvedTableValuedFunction, UnresolvedTranspose} import org.apache.spark.sql.catalyst.encoders.{encoderFor, AgnosticEncoder, ExpressionEncoder, RowEncoder} import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{ProductEncoder, RowEncoder => AgnosticRowEncoder, StringEncoder, UnboundRowEncoder} import org.apache.spark.sql.catalyst.expressions._ @@ -1741,10 +1741,10 @@ class SparkConnectPlanner( val tableName = rel.getUnparsedIdentifier val options = new CaseInsensitiveStringMap(rel.getOptionsMap) val timeZone = session.sessionState.conf.sessionLocalTimeZone - val changelogInfo = ChangelogInfoUtils.fromOptions(options, timeZone) + val ctx = ChangelogContextUtils.fromOptions(options, timeZone) val ident = parser.parseMultipartIdentifier(tableName) val relation = UnresolvedRelation(ident, options, isStreaming = rel.getIsStreaming) - RelationChanges(relation, changelogInfo) + RelationChanges(relation, ctx) } private def transformParse(rel: proto.Parse): LogicalPlan = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameReader.scala index d0d6bf1e8ec0d..3dbdf05305164 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameReader.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql import org.apache.spark.sql.Encoders import org.apache.spark.sql.catalyst.DataSourceOptions import org.apache.spark.sql.catalyst.analysis.{RelationChanges, UnresolvedRelation} -import org.apache.spark.sql.catalyst.analysis.ChangelogInfoUtils +import org.apache.spark.sql.catalyst.analysis.ChangelogContextUtils import org.apache.spark.sql.catalyst.csv.{CSVHeaderChecker, CSVOptions, UnivocityParser} import org.apache.spark.sql.catalyst.expressions.ExprUtils import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JSONOptions} @@ -328,10 +328,10 @@ class DataFrameReader private[sql](sparkSession: SparkSession) val multipartIdentifier = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName) val options = new CaseInsensitiveStringMap(extraOptions.toMap.asJava) - val changelogInfo = ChangelogInfoUtils.fromOptions( + val changelogContext = ChangelogContextUtils.fromOptions( options, sparkSession.sessionState.conf.sessionLocalTimeZone) val relation = UnresolvedRelation(multipartIdentifier, options) - Dataset.ofRows(sparkSession, RelationChanges(relation, changelogInfo)) + Dataset.ofRows(sparkSession, RelationChanges(relation, changelogContext)) } /** @inheritdoc */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataStreamReader.scala index ef7cebdb2a192..c8df93768808e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataStreamReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataStreamReader.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.classic import scala.jdk.CollectionConverters._ import org.apache.spark.annotation.{Evolving, Experimental} -import org.apache.spark.sql.catalyst.analysis.{ChangelogInfoUtils, NamedStreamingRelation, RelationChanges, UnresolvedRelation} +import org.apache.spark.sql.catalyst.analysis.{ChangelogContextUtils, NamedStreamingRelation, RelationChanges, UnresolvedRelation} import org.apache.spark.sql.catalyst.plans.logical.UnresolvedDataSource import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CharVarcharUtils} import org.apache.spark.sql.classic.ClassicConversions._ @@ -118,10 +118,10 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) assertNoSpecifiedSchema("changes") val identifier = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName) val options = new CaseInsensitiveStringMap(extraOptions.toMap.asJava) - val changelogInfo = ChangelogInfoUtils.fromOptions( + val changelogContext = ChangelogContextUtils.fromOptions( options, sparkSession.sessionState.conf.sessionLocalTimeZone) val unresolved = UnresolvedRelation(identifier, options, isStreaming = true) - val changes = RelationChanges(unresolved, changelogInfo) + val changes = RelationChanges(unresolved, changelogContext) val plan = NamedStreamingRelation.withUserProvidedName(changes, userProvidedSourceName) Dataset.ofRows(sparkSession, plan) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogEndToEndSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogEndToEndSuite.scala index 3a7281f94d882..bb40cd9874d21 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogEndToEndSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogEndToEndSuite.scala @@ -389,14 +389,14 @@ class ChangelogEndToEndSuite extends SharedSparkSession { // DataFrame API spark.read.option("startingVersion", "1").changes(fullTableName).collect() - val info1 = catalog.lastChangelogInfo.get - assert(info1.deduplicationMode() === ChangelogInfo.DeduplicationMode.DROP_CARRYOVERS) + val info1 = catalog.lastChangelogContext.get + assert(info1.deduplicationMode() === ChangelogContext.DeduplicationMode.DROP_CARRYOVERS) assert(info1.computeUpdates() === false) // SQL (no WITH clause = defaults) sql(s"SELECT * FROM $fullTableName CHANGES FROM VERSION 1").collect() - val info2 = catalog.lastChangelogInfo.get - assert(info2.deduplicationMode() === ChangelogInfo.DeduplicationMode.DROP_CARRYOVERS) + val info2 = catalog.lastChangelogContext.get + assert(info2.deduplicationMode() === ChangelogContext.DeduplicationMode.DROP_CARRYOVERS) assert(info2.computeUpdates() === false) } @@ -410,14 +410,14 @@ class ChangelogEndToEndSuite extends SharedSparkSession { .option("deduplicationMode", "none") .changes(fullTableName) .collect() - assert(catalog.lastChangelogInfo.get.deduplicationMode() === - ChangelogInfo.DeduplicationMode.NONE) + assert(catalog.lastChangelogContext.get.deduplicationMode() === + ChangelogContext.DeduplicationMode.NONE) // SQL sql(s"SELECT * FROM $fullTableName CHANGES FROM VERSION 1 " + "WITH (deduplicationMode = 'none')").collect() - assert(catalog.lastChangelogInfo.get.deduplicationMode() === - ChangelogInfo.DeduplicationMode.NONE) + assert(catalog.lastChangelogContext.get.deduplicationMode() === + ChangelogContext.DeduplicationMode.NONE) } test("changes() passes computeUpdates to catalog") { @@ -430,12 +430,12 @@ class ChangelogEndToEndSuite extends SharedSparkSession { .option("computeUpdates", "true") .changes(fullTableName) .collect() - assert(catalog.lastChangelogInfo.get.computeUpdates() === true) + assert(catalog.lastChangelogContext.get.computeUpdates() === true) // SQL sql(s"SELECT * FROM $fullTableName CHANGES FROM VERSION 1 " + "WITH (computeUpdates = 'true')").collect() - assert(catalog.lastChangelogInfo.get.computeUpdates() === true) + assert(catalog.lastChangelogContext.get.computeUpdates() === true) } // ---------- Batch: timestamp range ---------- @@ -450,14 +450,14 @@ class ChangelogEndToEndSuite extends SharedSparkSession { .option("endingTimestamp", "2024-12-31 23:59:59") .changes(fullTableName) .collect() - assert(catalog.lastChangelogInfo.get.range() + assert(catalog.lastChangelogContext.get.range() .isInstanceOf[ChangelogRange.TimestampRange]) // SQL sql(s"SELECT * FROM $fullTableName " + "CHANGES FROM TIMESTAMP '2024-01-01 00:00:00' " + "TO TIMESTAMP '2024-12-31 23:59:59'").collect() - assert(catalog.lastChangelogInfo.get.range() + assert(catalog.lastChangelogContext.get.range() .isInstanceOf[ChangelogRange.TimestampRange]) } @@ -599,7 +599,7 @@ class ChangelogEndToEndSuite extends SharedSparkSession { .format("memory").queryName("cdc_stream_opts_df").start() try { q1.processAllAvailable() - assert(catalog.lastChangelogInfo.get.computeUpdates() === true) + assert(catalog.lastChangelogContext.get.computeUpdates() === true) } finally { q1.stop() } @@ -612,7 +612,7 @@ class ChangelogEndToEndSuite extends SharedSparkSession { .format("memory").queryName("cdc_stream_opts_sql").start() try { q2.processAllAvailable() - assert(catalog.lastChangelogInfo.get.computeUpdates() === true) + assert(catalog.lastChangelogContext.get.computeUpdates() === true) } finally { q2.stop() } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogResolutionSuite.scala index 0d8f573cc4483..26ff7d43db088 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogResolutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogResolutionSuite.scala @@ -196,18 +196,33 @@ class ChangelogResolutionSuite extends SharedSparkSession { parameters = Map("relationId" -> "`x`")) } - test("CHANGES clause passes changelogInfo to catalog") { + test("CHANGES clause passes changelogContext to catalog") { sql(s"SELECT * FROM $cdcCatalogName.test_table CHANGES FROM VERSION 1 TO VERSION 5") val cat = spark.sessionState.catalogManager .catalog(cdcCatalogName) .asInstanceOf[InMemoryChangelogCatalog] - val info = cat.lastChangelogInfo + val info = cat.lastChangelogContext assert(info.isDefined) val range = info.get.range().asInstanceOf[ChangelogRange.VersionRange] assert(range.startingVersion() == "1") assert(range.endingVersion().get() == "5") } + test("user-defined options are forwarded to loadChangelog") { + val cat = spark.sessionState.catalogManager + .catalog(cdcCatalogName) + .asInstanceOf[InMemoryChangelogCatalog] + + spark.read + .option("startingVersion", "1") + .option("customOption", "customValue") + .changes(s"$cdcCatalogName.test_table") + + val opts = cat.lastOptions + assert(opts.isDefined) + assert(opts.get.get("customOption") == "customValue") + } + // =========================================================================== // Streaming post-processing // =========================================================================== @@ -306,9 +321,9 @@ class ChangelogResolutionSuite extends SharedSparkSession { // Generic changelog schema validation // =========================================================================== - private def stubInfo(): ChangelogInfo = new ChangelogInfo( + private def stubInfo(): ChangelogContext = new ChangelogContext( new ChangelogRange.VersionRange("1", java.util.Optional.of("2"), true, true), - ChangelogInfo.DeduplicationMode.DROP_CARRYOVERS, + ChangelogContext.DeduplicationMode.DROP_CARRYOVERS, false) private def cl(name: String, cols: (String, org.apache.spark.sql.types.DataType)*)