Skip to content

Commit 9c680b3

Browse files
author
Ghislain Fourny
committed
This should now work
1 parent f33206c commit 9c680b3

4 files changed

Lines changed: 24 additions & 13 deletions

File tree

pyproject.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,12 @@ classifiers = [
2222
"Programming Language :: Python :: 3.11",
2323
]
2424

25+
[tool.setuptools.packages.find]
26+
where = ["src"]
27+
28+
[tool.setuptools.package-data]
29+
jsoniq = ["jars/*.jar"]
30+
2531
[project.urls]
2632
Homepage = "https://rumbledb.org"
2733
Documentation = "https://rumble.readthedocs.io/en/latest/"
26.1 MB
Binary file not shown.

src/jsoniq/sequence.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,23 @@
1+
from pyspark import RDD
12
from pyspark.sql import SparkSession
23
import json
34

45
class SequenceOfItems:
    """Python-side handle on the result sequence of a RumbleDB query.

    Holds the sequence object returned by the query call together with the
    SparkContext that is needed to expose that sequence as a PySpark RDD.
    """

    def __init__(self, sequence, sparkcontext):
        """Store the underlying sequence and the SparkContext.

        Args:
            sequence: Sequence object returned by the query engine. It must
                provide ``getAsList()``, ``getAsStringRDD()`` and ``next()``.
            sparkcontext: SparkContext passed to ``RDD(...)`` when the
                sequence is exposed as an RDD.
        """
        self._jsequence = sequence
        self._sparkcontext = sparkcontext

    def getAsJSONList(self):
        """Return every item of the sequence as a decoded JSON value.

        Each item is serialized with ``serializeAsJSON()`` and then parsed
        with ``json.loads``, so the result is a list of plain Python values.
        """
        return [
            json.loads(item.serializeAsJSON())
            for item in self._jsequence.getAsList()
        ]

    def getAsJSONRDD(self):
        """Return the sequence as a PySpark RDD of decoded JSON values.

        The string RDD obtained from the underlying sequence is wrapped in a
        PySpark ``RDD`` bound to the stored SparkContext, and every element
        is parsed with ``json.loads``.

        No sample of the data is taken or printed here: inspecting the RDD is
        left to the caller, so calling this method does not write to stdout.
        """
        jrdd = self._jsequence.getAsStringRDD()
        # NOTE(review): RDD() is built with PySpark's default deserializer.
        # If getAsStringRDD() yields plain Java strings, an explicit
        # UTF8Deserializer may be required here -- TODO confirm against the
        # RumbleDB Java API.
        string_rdd = RDD(jrdd, self._sparkcontext)
        return string_rdd.map(json.loads)

    def nextJSON(self):
        """Return the JSON serialization of the sequence's next item.

        Unlike ``getAsJSONList``, the serialized form is returned as-is and
        is not passed through ``json.loads``.
        """
        return self._jsequence.next().serializeAsJSON()
1323

src/jsoniq/session.py

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,57 +1,53 @@
11
from pyspark.sql import SparkSession
22
from .sequence import SequenceOfItems
3+
import importlib.resources as pkg_resources
4+
5+
# Locate the RumbleDB JAR bundled in the ``jsoniq.jars`` package and expose it
# as a file URI suitable for the ``spark.jars`` setting.
# NOTE(review): jar_path_str is used after this ``with`` block has exited. If
# the package is ever loaded from a zip archive, the extracted temporary file
# may be cleaned up on exit -- confirm the package is always installed unpacked.
with pkg_resources.path("jsoniq.jars", "rumbledb-1.24.0.jar") as jar_path:
    print("Found Rumble JAR at:", jar_path)
    # Path.as_uri() builds a well-formed ``file:///...`` URI: it handles
    # Windows drive letters and percent-encodes characters such as spaces,
    # which plain "file://" + str(path) concatenation does not.
    jar_path_str = jar_path.absolute().as_uri()
38

49
class MetaRumbleSession(type):
    """Metaclass that forwards unknown class-level attributes to SparkSession."""

    def __getattr__(cls, item):
        """Resolve a class attribute that normal lookup did not find.

        ``builder`` maps to the class's own ``_builder`` attribute; every
        other name is looked up on ``SparkSession``.
        """
        if item != "builder":
            return getattr(SparkSession, item)
        return cls._builder
1115

1216
class RumbleSession(object, metaclass=MetaRumbleSession):
1317
def __init__(self, spark_session: SparkSession):
    """Wrap a SparkSession and create the matching Rumble object.

    Args:
        spark_session: Session to wrap. Its ``_jvm`` handle is used to
            instantiate ``org.rumbledb.api.Rumble``, which receives the
            session's ``_jsparkSession``.
    """
    self._sparksession = spark_session
    rumble_class = spark_session._jvm.org.rumbledb.api.Rumble
    self._jrumblesession = rumble_class(spark_session._jsparkSession)
1720

1821
class Builder:
    """Fluent wrapper around ``SparkSession.builder`` that creates RumbleSession objects.

    ``appName``, ``master`` and ``config`` update the wrapped Spark builder
    and return this wrapper so calls can be chained. Any other attribute is
    looked up directly on the wrapped Spark builder.
    """

    def __init__(self):
        """Start from ``SparkSession.builder`` with ``spark.jars`` set to the bundled JAR."""
        self._sparkbuilder = SparkSession.builder.config("spark.jars", jar_path_str)

    def getOrCreate(self):
        """Return a RumbleSession wrapping the Spark builder's ``getOrCreate()`` result."""
        return RumbleSession(self._sparkbuilder.getOrCreate())

    def appName(self, name):
        """Set the application name on the wrapped builder and return ``self``."""
        self._sparkbuilder = self._sparkbuilder.appName(name)
        return self

    def master(self, url):
        """Set the master URL on the wrapped builder and return ``self``."""
        self._sparkbuilder = self._sparkbuilder.master(url)
        return self

    def config(self, key=None, value=None, conf=None, **kwargs):
        """Forward a configuration call to the wrapped builder and return ``self``.

        A single method handles both call shapes. Python has no overloading,
        so two separate ``config`` definitions would leave only the last one
        callable.

        Args:
            key: Option name, or a configuration object passed positionally
                as the only argument.
            value: Option value for ``key``.
            conf: Configuration object to apply as a whole.
            **kwargs: Extra keyword arguments passed through unchanged.
        """
        lone_positional = key is not None and value is None and conf is None
        if lone_positional and not isinstance(key, str):
            # config(conf_object): a lone non-string argument is a whole
            # configuration, so hand it over as ``conf`` rather than ``key``.
            key, conf = None, key
        self._sparkbuilder = self._sparkbuilder.config(
            key=key, value=value, conf=conf, **kwargs
        )
        return self

    def __getattr__(self, name):
        """Look up any other attribute on the wrapped Spark builder.

        The attribute is returned as-is, so a method obtained this way
        returns whatever the Spark builder returns, not this wrapper.
        """
        if name == "_sparkbuilder":
            # Not set yet (instance created without __init__): fail cleanly
            # instead of re-entering __getattr__ without bound.
            raise AttributeError(name)
        return getattr(self._sparkbuilder, name)
5047

5148
_builder = Builder()
5249

5350
def bindDataFrameAsVariable(self, name: str, df):
54-
print(f"Binding DataFrame as variable: {name}");
5551
conf = self._jrumblesession.getConfiguration();
5652
if not name.startswith("$"):
5753
raise ValueError("Variable name must start with a dollar symbol ('$').")
@@ -61,8 +57,7 @@ def bindDataFrameAsVariable(self, name: str, df):
6157

6258
def jsoniq(self, str):
    """Run a query and return its result wrapped in a SequenceOfItems.

    Args:
        str: Query text handed to the Rumble session's ``runQuery``. The
            name shadows the builtin inside this method; it is kept because
            callers may pass it by keyword.

    Returns:
        A SequenceOfItems built from the query result and the wrapped
        session's ``sparkContext``.
    """
    return SequenceOfItems(
        self._jrumblesession.runQuery(str),
        self._sparksession.sparkContext,
    )
6561

6662
def __getattr__(self, item):
    """Look up an attribute on the wrapped SparkSession.

    Called only when normal attribute lookup fails, so anything defined on
    this object takes precedence over the wrapped session.

    Raises:
        AttributeError: If the wrapped session lacks ``item``, or if the
            wrapped session itself has not been set on this instance yet.
    """
    if item == "_sparksession":
        # Not set yet (e.g. instance created without __init__): fail cleanly
        # instead of re-entering __getattr__ without bound.
        raise AttributeError(item)
    return getattr(self._sparksession, item)

0 commit comments

Comments
 (0)