Originally posted as a question on Stackoverflow with some more details: (Continuous trigger fails on Spark 3, but works on Spark 2.4.5), below is the summary:
Spark structured streaming from Kinesis as a source was working fine with Spark 2.4.5, after an upgrade to Spark 3.0.0 (also tested with 3.0.1) it started failing with error:
Continuous processing does not support StreamingRelation operations.;; kinesis org.apache.spark.sql.AnalysisException: Continuous processing does not support StreamingRelation operations.;; kinesis at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.throwError(UnsupportedOperationChecker.scala:431) at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.$anonfun$checkForContinuous$1(UnsupportedOperationChecker.scala:408) at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.$anonfun$checkForContinuous$1$adapted(UnsupportedOperationChecker.scala:390) at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:177) at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForContinuous(UnsupportedOperationChecker.scala:390) at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:290) at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:359) at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366) at com.niceic.dl.kinesispuller.RunnerErr$.main(RunnerErrorSpark3.scala:50) at com.niceic.dl.kinesispuller.Spark3ErrorTest.$anonfun$new$2(test.scala:32) at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85) at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83) at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104) at org.scalatest.Transformer.apply(Transformer.scala:22) at org.scalatest.Transformer.apply(Transformer.scala:20) at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:190) at org.scalatest.TestSuite.withFixture(TestSuite.scala:196) at org.scalatest.TestSuite.withFixture$(TestSuite.scala:195) at org.scalatest.funsuite.AnyFunSuite.withFixture(AnyFunSuite.scala:1563) at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:188) at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:200) at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306) at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:200) at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:182) at org.scalatest.funsuite.AnyFunSuite.runTest(AnyFunSuite.scala:1563) at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:233) at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413) at scala.collection.immutable.List.foreach(List.scala:431) at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401) at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396) at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475) at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:233) at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:232) at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1563) at org.scalatest.Suite.run(Suite.scala:1112) at org.scalatest.Suite.run$(Suite.scala:1094) at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1563) at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:237) at org.scalatest.SuperEngine.runImpl(Engine.scala:535) at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:237) at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:236) at com.niceic.dl.kinesispuller.Spark3ErrorTest.org$scalatest$BeforeAndAfterAllConfigMap$$super$run(test.scala:24) at org.scalatest.BeforeAndAfterAllConfigMap.liftedTree1$1(BeforeAndAfterAllConfigMap.scala:248) at org.scalatest.BeforeAndAfterAllConfigMap.run(BeforeAndAfterAllConfigMap.scala:245) at org.scalatest.BeforeAndAfterAllConfigMap.run$(BeforeAndAfterAllConfigMap.scala:242) at com.niceic.dl.kinesispuller.Spark3ErrorTest.run(test.scala:24) at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45) at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1320) at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1314) at scala.collection.immutable.List.foreach(List.scala:431) at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1314) at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:993) at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:971) at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1480) at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:971) at org.scalatest.tools.Runner$.run(Runner.scala:798) at org.scalatest.tools.Runner.run(Runner.scala) at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2or3(ScalaTestRunner.java:41) at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28)
Is it a breaking change bug in Spark or was the kinises-sql adapter supposed to be fixed to pick up the change?
Originally posted as a question on Stackoverflow with some more details: (Continuous trigger fails on Spark 3, but works on Spark 2.4.5), below is the summary:
Spark structured streaming from Kinesis as a source was working fine with Spark 2.4.5, after an upgrade to Spark 3.0.0 (also tested with 3.0.1) it started failing with error:
Continuous processing does not support StreamingRelation operations.;; kinesis org.apache.spark.sql.AnalysisException: Continuous processing does not support StreamingRelation operations.;; kinesis at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.throwError(UnsupportedOperationChecker.scala:431) at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.$anonfun$checkForContinuous$1(UnsupportedOperationChecker.scala:408) at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.$anonfun$checkForContinuous$1$adapted(UnsupportedOperationChecker.scala:390) at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:177) at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForContinuous(UnsupportedOperationChecker.scala:390) at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:290) at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:359) at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366) at com.niceic.dl.kinesispuller.RunnerErr$.main(RunnerErrorSpark3.scala:50) at com.niceic.dl.kinesispuller.Spark3ErrorTest.$anonfun$new$2(test.scala:32) at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85) at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83) at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104) at org.scalatest.Transformer.apply(Transformer.scala:22) at org.scalatest.Transformer.apply(Transformer.scala:20) at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:190) at org.scalatest.TestSuite.withFixture(TestSuite.scala:196) at org.scalatest.TestSuite.withFixture$(TestSuite.scala:195) at org.scalatest.funsuite.AnyFunSuite.withFixture(AnyFunSuite.scala:1563) at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:188) at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:200) at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306) at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:200) at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:182) at org.scalatest.funsuite.AnyFunSuite.runTest(AnyFunSuite.scala:1563) at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:233) at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413) at scala.collection.immutable.List.foreach(List.scala:431) at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401) at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396) at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475) at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:233) at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:232) at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1563) at org.scalatest.Suite.run(Suite.scala:1112) at org.scalatest.Suite.run$(Suite.scala:1094) at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1563) at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:237) at org.scalatest.SuperEngine.runImpl(Engine.scala:535) at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:237) at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:236) at com.niceic.dl.kinesispuller.Spark3ErrorTest.org$scalatest$BeforeAndAfterAllConfigMap$$super$run(test.scala:24) at org.scalatest.BeforeAndAfterAllConfigMap.liftedTree1$1(BeforeAndAfterAllConfigMap.scala:248) at org.scalatest.BeforeAndAfterAllConfigMap.run(BeforeAndAfterAllConfigMap.scala:245) at org.scalatest.BeforeAndAfterAllConfigMap.run$(BeforeAndAfterAllConfigMap.scala:242) at com.niceic.dl.kinesispuller.Spark3ErrorTest.run(test.scala:24) at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45) at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1320) at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1314) at scala.collection.immutable.List.foreach(List.scala:431) at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1314) at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:993) at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:971) at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1480) at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:971) at org.scalatest.tools.Runner$.run(Runner.scala:798) at org.scalatest.tools.Runner.run(Runner.scala) at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2or3(ScalaTestRunner.java:41) at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28)Is it a breaking change bug in Spark or was the kinises-sql adapter supposed to be fixed to pick up the change?