Skip to content
This repository was archived by the owner on Oct 8, 2020. It is now read-only.

Commit 66143b5

Browse files
committed
Factored out sparqlify code
1 parent 4cf3a5d commit 66143b5

2 files changed

Lines changed: 71 additions & 44 deletions

File tree

sansa-query-spark-parent/sansa-query-spark-server/src/main/scala/net/sansa_stack/query/spark/server/MainSansaSparqlServer.scala

Lines changed: 1 addition & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ import java.nio.file.Files
4242

4343

4444
object MainSansaSparqlServer
45-
extends LazyLogging
4645
{
4746

4847
def main(args: Array[String]): Unit = {
@@ -73,7 +72,6 @@ object MainSansaSparqlServer
7372
.getOrCreate()
7473

7574

76-
7775
val triplesString =
7876
"""<http://dbpedia.org/resource/Guy_de_Maupassant> <http://xmlns.com/foaf/0.1/givenName> "Guy De" .
7977
|<http://dbpedia.org/resource/Guy_de_Maupassant> <http://dbpedia.org/ontology/influenced> <http://dbpedia.org/resource/Tobias_Wolff> .
@@ -89,48 +87,7 @@ object MainSansaSparqlServer
8987
//val map = graphRdd.partitionGraphByPredicates
9088
val partitions = RdfPartitionUtilsSpark.partitionGraph(graphRdd)
9189

92-
93-
val config = new Config()
94-
val loggerCount = new LoggerCount(logger.underlying)
95-
96-
97-
val backendConfig = new SqlBackendConfig(new DatatypeToStringCast(), new SqlEscaperBase("", "")) //new SqlEscaperBacktick())
98-
val sqlEscaper = backendConfig.getSqlEscaper()
99-
val typeSerializer = backendConfig.getTypeSerializer()
100-
101-
102-
val ers = SparqlifyUtils.createDefaultExprRewriteSystem()
103-
val mappingOps = SparqlifyUtils.createDefaultMappingOps(ers)
104-
105-
106-
val candidateViewSelector = new CandidateViewSelectorSparqlify(mappingOps, new ViewDefinitionNormalizerImpl());
107-
108-
val views = partitions.map {
109-
case (p, rdd) =>
110-
//
111-
println("Processing: " + p)
112-
113-
val vd = SparqlifyUtils2.createViewDefinition(p);
114-
val tableName = vd.getRelation match {
115-
case o: SqlOpTable => o.getTableName
116-
case _ => throw new RuntimeException("Table name required - instead got: " + vd)
117-
}
118-
119-
val scalaSchema = p.layout.schema
120-
val sparkSchema = ScalaReflection.schemaFor(scalaSchema).dataType.asInstanceOf[StructType]
121-
val df = sparkSession.createDataFrame(rdd, sparkSchema)
122-
123-
df.createOrReplaceTempView(tableName)
124-
config.getViewDefinitions.add(vd)
125-
}
126-
127-
val basicTableInfoProvider = new BasicTableInfoProviderSpark(sparkSession)
128-
129-
val rewriter = SparqlifyUtils.createDefaultSparqlSqlStringRewriter(basicTableInfoProvider, null, config, typeSerializer, sqlEscaper)
130-
//val rewrite = rewriter.rewrite(QueryFactory.create("Select * { <http://dbpedia.org/resource/Guy_de_Maupassant> ?p ?o }"))
131-
132-
// val rewrite = rewriter.rewrite(QueryFactory.create("Select * { ?s <http://xmlns.com/foaf/0.1/givenName> ?o ; <http://dbpedia.org/ontology/deathPlace> ?d }"))
133-
90+
val rewriter = SparqlifyUtils3.createSparqlSqlRewriter(sparkSession, partitions)
13491

13592
val qef = new QueryExecutionFactorySparqlifySpark(sparkSession, rewriter)
13693

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package net.sansa_stack.query.spark.server
2+
3+
import org.aksw.sparqlify.backend.postgres.DatatypeToStringCast
4+
import org.aksw.sparqlify.core.sql.common.serialization.SqlEscaperBase
5+
import org.aksw.sparqlify.core.algorithms.ViewDefinitionNormalizerImpl
6+
import org.aksw.sparqlify.core.algorithms.CandidateViewSelectorSparqlify
7+
import net.sansa_stack.rdf.partition.sparqlify.SparqlifyUtils2
8+
import org.apache.spark.sql.catalyst.ScalaReflection
9+
import net.sansa_stack.rdf.spark.sparqlify.BasicTableInfoProviderSpark
10+
import org.aksw.sparqlify.util.SqlBackendConfig
11+
import org.aksw.sparqlify.algebra.sql.nodes.SqlOpTable
12+
import org.apache.spark.sql.types.StructType
13+
import org.aksw.sparqlify.config.syntax.Config
14+
import org.aksw.sparqlify.util.SparqlifyUtils
15+
import org.aksw.sparqlify.validation.LoggerCount
16+
import net.sansa_stack.rdf.partition.core.RdfPartitionDefault
17+
import org.apache.spark.sql.SparkSession
18+
import com.typesafe.scalalogging.LazyLogging
19+
import org.apache.spark.rdd.RDD
20+
import org.apache.spark.sql.Row
21+
import org.aksw.sparqlify.core.interfaces.SparqlSqlStringRewriter
22+
23+
object SparqlifyUtils3
24+
extends LazyLogging
25+
{
26+
def createSparqlSqlRewriter(sparkSession: SparkSession, partitions: Map[RdfPartitionDefault, RDD[Row]]): SparqlSqlStringRewriter = {
27+
val config = new Config()
28+
val loggerCount = new LoggerCount(logger.underlying)
29+
30+
31+
val backendConfig = new SqlBackendConfig(new DatatypeToStringCast(), new SqlEscaperBase("", "")) //new SqlEscaperBacktick())
32+
val sqlEscaper = backendConfig.getSqlEscaper()
33+
val typeSerializer = backendConfig.getTypeSerializer()
34+
35+
36+
val ers = SparqlifyUtils.createDefaultExprRewriteSystem()
37+
val mappingOps = SparqlifyUtils.createDefaultMappingOps(ers)
38+
39+
40+
val candidateViewSelector = new CandidateViewSelectorSparqlify(mappingOps, new ViewDefinitionNormalizerImpl());
41+
42+
val views = partitions.map {
43+
case (p, rdd) =>
44+
//
45+
println("Processing: " + p)
46+
47+
val vd = SparqlifyUtils2.createViewDefinition(p);
48+
val tableName = vd.getRelation match {
49+
case o: SqlOpTable => o.getTableName
50+
case _ => throw new RuntimeException("Table name required - instead got: " + vd)
51+
}
52+
53+
val scalaSchema = p.layout.schema
54+
val sparkSchema = ScalaReflection.schemaFor(scalaSchema).dataType.asInstanceOf[StructType]
55+
val df = sparkSession.createDataFrame(rdd, sparkSchema)
56+
57+
df.createOrReplaceTempView(tableName)
58+
config.getViewDefinitions.add(vd)
59+
}
60+
61+
val basicTableInfoProvider = new BasicTableInfoProviderSpark(sparkSession)
62+
63+
val rewriter = SparqlifyUtils.createDefaultSparqlSqlStringRewriter(basicTableInfoProvider, null, config, typeSerializer, sqlEscaper)
64+
//val rewrite = rewriter.rewrite(QueryFactory.create("Select * { <http://dbpedia.org/resource/Guy_de_Maupassant> ?p ?o }"))
65+
66+
// val rewrite = rewriter.rewrite(QueryFactory.create("Select * { ?s <http://xmlns.com/foaf/0.1/givenName> ?o ; <http://dbpedia.org/ontology/deathPlace> ?d }"))
67+
rewriter
68+
}
69+
70+
}

0 commit comments

Comments
 (0)