Skip to content
This repository was archived by the owner on Oct 8, 2020. It is now read-only.
This repository was archived by the owner on Oct 8, 2020. It is now read-only.

Sansa query (Spark)program fails when integrated with Spark Job Server #7

@dileepvikram

Description

@dileepvikram

Hello Team,
I am trying to run one basic sansa query example and I am able to do it. But when I tried to integrate the same with spark job server it is getting failed with the exception as below

org.apache.spark.SparkException: Job 2 cancelled because SparkContext was shut down
at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:820)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:818)
at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:818)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1732)
at org.apache.spark.util.EventLoop.stop(EventLoop.scala:83)
at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1651)
at org.apache.spark.SparkContext$$anonfun$stop$8.apply$mcV$sp(SparkContext.scala:1923)

.Below are the things I am doing
1)Extend a class SparkSessionJob and overriding methods in it
2)runJob method has the sansa code in it for querying
3)Build and upload the jar into spark job server(SJS) using an API
4)Trigger spark job using another API

The code snippet I am using are the below .

object SansaQueryExample extends SparkSessionJob {
  override type JobData = Seq[String]
  override type JobOutput = collection.Map[String, Long]

  override def validate(sparkSession: SparkSession, runtime: JobEnvironment, config: Config):
  JobData Or Every[ValidationProblem] = {
    Try(config.getString("input.string").split(" ").toSeq)
      .map(words => Good(words))
      .getOrElse(Bad(One(SingleProblem("No input.string param"))))
  }
  override def runJob(sparkSession: SparkSession, runtime: JobEnvironment, data: JobData): JobOutput = {

    val input =  "hdfs_location_of_input .NT file";
    val sparqlQuery: String = "SELECT * WHERE {?s ?p ?o} LIMIT 10"
    val lang = Lang.NTRIPLES
    val graphRdd = sparkSession.rdf(lang)(input)
    println(graphRdd.collect().foreach(println))
    val result = graphRdd.sparql(sparqlQuery)

    result.write.format("csv").mode("overwrite").save("output_hdfs_location_to_write")
    sparkSession.sparkContext.parallelize(data).countByValue
  }

I am using sansa 0.7.1 and spark job server 0.8.0 and spark 2.4.4

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions