Skip to content

Commit e08d8cc

Browse files
author
Ghislain Fourny
committed
Add implementation
1 parent 2595af6 commit e08d8cc

2 files changed

Lines changed: 70 additions & 0 deletions

File tree

src/jsoniq/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from jsoniq.session import RumbleSession
2+
3+
__all__ = ["RumbleSession"]

src/jsoniq/session.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
from pyspark.sql import SparkSession
2+
3+
class MetaRumbleSession(type):
4+
def __getattr__(cls, item):
5+
print(f"Dynamically handled: {item}")
6+
if item == "builder":
7+
return cls._builder
8+
else:
9+
return getattr(SparkSession, item)
10+
11+
class RumbleSession(object, metaclass=MetaRumbleSession):
12+
def __init__(self, spark_session: SparkSession):
13+
print("Initializing RumbleSession object");
14+
self._sparksession = spark_session
15+
self._jrumblesession = spark_session._jvm.org.rumbledb.api.Rumble(spark_session._jsparkSession)
16+
17+
class Builder:
18+
def __init__(self):
19+
self._sparkbuilder = SparkSession.builder.config("spark.jars", "file:///Users/ghislain/Code/rumble/target/rumbledb-1.24.0-jar-with-dependencies.jar")
20+
21+
def getOrCreate(self):
22+
print("getOrCreate called");
23+
return RumbleSession(self._sparkbuilder.getOrCreate())
24+
25+
def appName(self, name):
26+
print(f"Setting app name: {name}");
27+
self._sparkbuilder = self._sparkbuilder.appName(name);
28+
return self;
29+
30+
def master(self, url):
31+
print(f"Setting master URL: {url}");
32+
self._sparkbuilder = self._sparkbuilder.master(url);
33+
return self;
34+
35+
def config(self, key, value):
36+
print(f"Setting config: {key} = {value}");
37+
self._sparkbuilder = self._sparkbuilder.config(key, value);
38+
return self;
39+
40+
def config(self, conf):
41+
print(f"Setting config: {conf}");
42+
self._sparkbuilder = self._sparkbuilder.config(conf);
43+
return self;
44+
45+
def __getattr__(self, name):
46+
print(f"Calling attribute: {name}");
47+
res = getattr(self._sparkbuilder, name);
48+
return res;
49+
50+
_builder = Builder()
51+
52+
def bindDataFrameAsVariable(self, name: str, df):
53+
print(f"Binding DataFrame as variable: {name}");
54+
conf = self._jrumblesession.getConfiguration();
55+
if not name.startswith("$"):
56+
raise ValueError("Variable name must start with a dollar symbol ('$').")
57+
name = name[1:]
58+
conf.setExternalVariableValue(name, df._jdf);
59+
return self;
60+
61+
def jsoniq(self, str):
62+
sequence = self._jrumblesession.runQuery(str);
63+
return sequence;
64+
65+
def __getattr__(self, item):
66+
print(f"Accessing attribute: {item}")
67+
return getattr(self._sparksession, item)

0 commit comments

Comments
 (0)