[SPARK-56505][SQL][TESTS] Add SessionQueryTest to replace SharedSparkSession #56190
Closed
fwc wants to merge 73 commits into apache:master from fwc:sharedsparksession-refactor-mostly-nonbreaking (+1,381 −482)
Commits
756e5f3
Add classic.SparkSessionProvider
3b2c7cd
Move test.SharedSparkSessionBase functionality to sql.SharedSparkSession
67dd5f0
[API CHANGE]: Move doThreadPreAudit, doThreadPostAudit to sql.SharedS…
49778ad
Rename sql.SharedSparkSession to sql.SparkSessionBinder to prevent sh…
3a6caaa
Deprecate test.SharedSparkSession
cbda4b5
Add connect.SparkSession{Provider,Binder}, connect.QueryTest and demo
284c012
Add classic.SparkSessionBinder with usage demonstration
af8915d
fixup: fix compile error
d407555
Restructure so that SparkSessionBinder implements QueryTest, address …
56b9281
fixup
442ff43
Have SharedSparkSession as empty alias of classic.SparkSessionBinder
36e2940
fixup
2603ca7
partial refactor of connect/classic test
8d5f248
rest of refactor
749120e
Add example suite
0b427b4
Minimize sql.SparkSessionBinder stuff
6e81376
WIP
9b0b938
WIP: partially refactor DSv2IncrementallyConstructedQueryTests.scala
82185e2
Remove extra checkAnswer helpers to have on thing to override
473f4ff
Add SessionQueryTest::sessionType
33c369a
Fix DSv2ExternalMutationTestBase by replacing 'isConnect' with sessio…
c847392
Remove unused imports
a9aede7
WIP
3ab9157
reset connect session in beforeEach/afterEach
772a8c6
Shutdown SparkConnectServer at end of afterAll, don't silence shutdow…
f7ab7e4
fix server session access in DataSourceV2DataFrameConnectSuite
3db84f3
Extract CheckError into CheckErrorHelper
8ded0b2
smash
02308de
add, fix scaladoc
cd8f516
Update, extend deprecation annotations
292dee4
Add missing newline at EOF
2958be0
Remove unused import
4e0335e
Merge branch 'master' into sharedsparksession-refactor-mostly-nonbrea…
88292e4
fixup: CheckErrorHelper
bcb97fd
Only use SessionQueryTestBase in DSv2ExternalMutationTestBase
6ca5822
Add SQLConfHelper to SessionQueryTestBase
a89b8b0
Fix QueryTest::checkAnswer
fc5b673
Catch analysis-time failure in CheckAnswerHelper
5bc0c8d
CheckAnswerHelper: limit df.queryExec access to classic dfs
3a41ba1
fix grammar
9946e07
fix capitalization
ec566ff
docstring fix
a2872d2
Don't add hive.SessionQueryTest (yet)
53a5b2c
Add docstring for connect.SessionQueryTest::isDfSorted
ef19ed4
Document that SparkSessionBinderBase is temporary
02e8c28
use precomputed analyzedDF instead of df in checkAnswer
8e83141
make connect.SparkSessionBinder::classicSession private
a9acd76
fixup: rename
13da2a0
add example testcase with conf stuff
03f4912
SessionQueryTestBase declares 'withConf' instead of extending SQLConf…
ca341c6
add connect isDfSorted and connect.DataFrame::explainString
9d0f4b3
fixup! add connect isDfSorted and connect.DataFrame::explainString
cd7183a
fixup! add connect isDfSorted and connect.DataFrame::explainString
895ff8d
deprecate rarely used method in SharedSparkSession
deac517
WIP: provide isDfSorted override for connect
783681f
fixup! add connect isDfSorted and connect.DataFrame::explainString
c009270
Don't deprecate SharedSparkSession[Base] yet
176a528
Don't refactor DSv2 classic/connect tests (yet)
7743581
fixup! SessionQueryTestBase declares 'withConf' instead of extending …
c8eac69
Hint towards connect.SessionQueryTest in isDfSorted base case
fbd27bc
fix formatting
ef86a04
update comment
dd2cb7a
more comments in ExampleSessionAgnosticSuite
d10f7ca
Further justify withConf
016d4c0
connect isDfSorted: Search for [PhotonSort]Sort in explainString
7e74a2a
fixup! connect isDfSorted: Search for [PhotonSort]Sort in explainString
d7cbc3b
Address some review issues
ionagamed 5b81142
Address some review issues
ionagamed 17364bb
Split out RowComparisonUtils into a separate object
ionagamed 639734b
Move the example suite into a separate file
ionagamed 9a17691
scalafmt some changed files
ionagamed aafee7e
Merge branch 'master' of https://github.com/apache/spark into shareds…
ionagamed 700fa4f
refactor sessionType -> isConnect
ionagamed
206 changes: 206 additions & 0 deletions
core/src/test/scala/org/apache/spark/CheckErrorHelper.scala
@@ -0,0 +1,206 @@

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark

import scala.collection.mutable.ListBuffer
import scala.jdk.CollectionConverters._

import org.scalatest.Suite

trait CheckErrorHelper { self: Suite =>

  case class ExpectedContext(
      contextType: QueryContextType,
      objectType: String,
      objectName: String,
      startIndex: Int,
      stopIndex: Int,
      fragment: String,
      callSitePattern: String
  )

  object ExpectedContext {
    def apply(fragment: String, start: Int, stop: Int): ExpectedContext = {
      ExpectedContext("", "", start, stop, fragment)
    }

    // Check the fragment only. This is only used when the fragment is distinguished within
    // the query text
    def apply(fragment: String): ExpectedContext = {
      ExpectedContext("", "", -1, -1, fragment)
    }

    def apply(
        objectType: String,
        objectName: String,
        startIndex: Int,
        stopIndex: Int,
        fragment: String): ExpectedContext = {
      new ExpectedContext(QueryContextType.SQL, objectType, objectName, startIndex, stopIndex,
        fragment, "")
    }

    def apply(fragment: String, callSitePattern: String): ExpectedContext = {
      new ExpectedContext(QueryContextType.DataFrame, "", "", -1, -1, fragment, callSitePattern)
    }
  }

  /**
   * Parameter keys that are omitted from comparison when absent from the expected map.
   * For each error condition, the set lists keys that are removed from the actual
   * exception parameters before comparison with the expected map.
   * Test suites may override this to add or change ignorable parameters per condition.
   */
  protected def checkErrorIgnorableParameters: Map[String, Set[String]] = Map(
    "TABLE_OR_VIEW_NOT_FOUND" -> Set("searchPath")
  )

  /**
   * Checks an exception with an error condition against expected results.
   * @param exception The exception to check
   * @param condition The expected error condition identifying the error
   * @param sqlState Optional the expected SQLSTATE, not verified if not supplied
   * @param parameters A map of parameter names and values. The names are as defined
   *                   in the error-classes file.
   * @param matchPVals Optionally treat the parameters value as regular expression pattern.
   *                   false if not supplied.
   */
  protected def checkError(
      exception: SparkThrowable,
      condition: String,
      sqlState: Option[String] = None,
      parameters: Map[String, String] = Map.empty,
      matchPVals: Boolean = false,
      queryContext: Array[ExpectedContext] = Array.empty): Unit = {
    val mismatches = new ListBuffer[String]

    if (exception.getCondition != condition) {
      mismatches += s"condition: expected '$condition' but got '${exception.getCondition}'"
    }
    sqlState.foreach { state =>
      if (exception.getSqlState != state) {
        mismatches += s"sqlState: expected '$state' but got '${exception.getSqlState}'"
      }
    }

    val actualParameters = exception.getMessageParameters.asScala
    val ignorable = checkErrorIgnorableParameters.getOrElse(condition, Set.empty[String])
    val actualParametersToCompare = actualParameters.filter { case (k, _) =>
      !ignorable.contains(k) || parameters.contains(k)
    }
    if (matchPVals) {
      if (actualParametersToCompare.size != parameters.size) {
        mismatches += s"parameters size: expected ${parameters.size} but got" +
          s" ${actualParametersToCompare.size}"
      }
      actualParametersToCompare.foreach { case (key, actualVal) =>
        parameters.get(key) match {
          case None =>
            mismatches += s"parameters: unexpected key '$key' with value '$actualVal'"
          case Some(pattern) if !actualVal.matches(pattern) =>
            mismatches += s"parameters['$key']: value '$actualVal' does not match pattern" +
              s" '$pattern'"
          case _ =>
        }
      }
      parameters.keys.filterNot(actualParametersToCompare.contains).foreach { key =>
        mismatches += s"parameters: missing expected key '$key'"
      }
    } else if (actualParametersToCompare != parameters) {
      mismatches += s"parameters: expected $parameters but got $actualParametersToCompare"
    }

    val actualQueryContext = exception.getQueryContext()
    if (actualQueryContext.length != queryContext.length) {
      mismatches += s"queryContext.length: expected ${queryContext.length}" +
        s" but got ${actualQueryContext.length}"
    }
    actualQueryContext.zip(queryContext).zipWithIndex.foreach {
      case ((actual, expected), idx) =>
        if (actual.contextType() != expected.contextType) {
          mismatches += s"queryContext[$idx].contextType: expected ${expected.contextType}" +
            s" but got ${actual.contextType()}"
        }
        if (actual.contextType() == QueryContextType.SQL) {
          if (actual.objectType() != expected.objectType) {
            mismatches += s"queryContext[$idx].objectType: expected '${expected.objectType}'" +
              s" but got '${actual.objectType()}'"
          }
          if (actual.objectName() != expected.objectName) {
            mismatches += s"queryContext[$idx].objectName: expected '${expected.objectName}'" +
              s" but got '${actual.objectName()}'"
          }
          // If startIndex and stopIndex are -1, it means we simply want to check the
          // fragment of the query context. This should be the case when the fragment is
          // distinguished within the query text.
          if (expected.startIndex != -1 && actual.startIndex() != expected.startIndex) {
            mismatches += s"queryContext[$idx].startIndex: expected ${expected.startIndex}" +
              s" but got ${actual.startIndex()}"
          }
          if (expected.stopIndex != -1 && actual.stopIndex() != expected.stopIndex) {
            mismatches += s"queryContext[$idx].stopIndex: expected ${expected.stopIndex}" +
              s" but got ${actual.stopIndex()}"
          }
          if (actual.fragment() != expected.fragment) {
            mismatches += s"queryContext[$idx].fragment: expected '${expected.fragment}'" +
              s" but got '${actual.fragment()}'"
          }
        } else if (actual.contextType() == QueryContextType.DataFrame) {
          if (actual.fragment() != expected.fragment) {
            mismatches += s"queryContext[$idx].fragment: expected '${expected.fragment}'" +
              s" but got '${actual.fragment()}'"
          }
          if (expected.callSitePattern.nonEmpty &&
            !actual.callSite().matches(expected.callSitePattern)) {
            mismatches += s"queryContext[$idx].callSite: '${actual.callSite()}'" +
              s" does not match pattern '${expected.callSitePattern}'"
          }
        }
    }

    if (mismatches.nonEmpty) {
      val sb = new StringBuilder
      sb.append(s"checkError found ${mismatches.size} mismatch(es).\n\n")
      sb.append("=== Actual Exception State ===\n")
      sb.append(s" condition: ${exception.getCondition}\n")
      sb.append(s" sqlState: ${exception.getSqlState}\n")
      sb.append(s" parameters:\n")
      if (actualParameters.isEmpty) {
        sb.append(" (empty)\n")
      } else {
        actualParameters.foreach { case (k, v) => sb.append(s" $k -> $v\n") }
      }
      actualQueryContext.zipWithIndex.foreach { case (ctx, idx) =>
        sb.append(s" queryContext[$idx] (${ctx.contextType()}):\n")
        if (ctx.contextType() == QueryContextType.SQL) {
          sb.append(s" objectType: ${ctx.objectType()}\n")
          sb.append(s" objectName: ${ctx.objectName()}\n")
          sb.append(s" startIndex: ${ctx.startIndex()}\n")
          sb.append(s" stopIndex: ${ctx.stopIndex()}\n")
          sb.append(s" fragment: ${ctx.fragment()}\n")
        } else if (ctx.contextType() == QueryContextType.DataFrame) {
          sb.append(s" fragment: ${ctx.fragment()}\n")
          sb.append(s" callSite: ${ctx.callSite()}\n")
        }
      }
      sb.append("\n=== Mismatches ===\n")
      mismatches.foreach(m => sb.append(s" $m\n"))
      fail(sb.toString())
    }
  }
}
```
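The parameter comparison in `checkError` is the subtlest part of this file, so here is a minimal standalone sketch of it, assuming plain Scala maps in place of a real `SparkThrowable`. The names `ParameterComparisonSketch` and `compareParameters` are hypothetical and are not part of the PR. The redundant size check from the diff is left out for brevity. The sketch tries to show two behaviours: an ignorable key such as `searchPath` is dropped from the actual parameters only when the test did not list it, and with `matchPVals = true` each expected value is treated as a regular expression that must match the whole actual value.

```scala
// Standalone illustration only; no Spark dependency.
object ParameterComparisonSketch {
  // Same default as checkErrorIgnorableParameters in the diff.
  val ignorableByCondition: Map[String, Set[String]] =
    Map("TABLE_OR_VIEW_NOT_FOUND" -> Set("searchPath"))

  /** Hypothetical helper: returns mismatch messages, empty when everything agrees. */
  def compareParameters(
      condition: String,
      actual: Map[String, String],
      expected: Map[String, String],
      matchPVals: Boolean = false): Seq[String] = {
    val ignorable = ignorableByCondition.getOrElse(condition, Set.empty[String])
    // An ignorable key is dropped only when the expected map does not mention it.
    val toCompare = actual.filter { case (k, _) =>
      !ignorable.contains(k) || expected.contains(k)
    }
    if (matchPVals) {
      val unexpectedOrUnmatched = toCompare.toSeq.flatMap { case (key, value) =>
        expected.get(key) match {
          case None => Some(s"unexpected key '$key' with value '$value'")
          // String.matches requires the pattern to match the entire value.
          case Some(pattern) if !value.matches(pattern) =>
            Some(s"'$key': value '$value' does not match pattern '$pattern'")
          case _ => None
        }
      }
      val missing = expected.keys.toSeq.filterNot(toCompare.contains).map { key =>
        s"missing expected key '$key'"
      }
      unexpectedOrUnmatched ++ missing
    } else if (toCompare != expected) {
      Seq(s"expected $expected but got $toCompare")
    } else {
      Seq.empty
    }
  }
}
```

Under these assumptions, `compareParameters("TABLE_OR_VIEW_NOT_FOUND", actual, Map("relationName" -> "`t`"))` reports no mismatch even when `actual` also carries a `searchPath` entry. Adding `searchPath` to the expected map brings the key back into the comparison.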