-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdatabases.py
More file actions
276 lines (248 loc) · 8.74 KB
/
databases.py
File metadata and controls
276 lines (248 loc) · 8.74 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
"""Marimo UI for managed Hotdata databases (create + parquet table loads)."""
from __future__ import annotations
import os
import tempfile
import marimo as mo
from hotdata_runtime import (
DEFAULT_SCHEMA,
HotdataClient,
LoadManagedTableResult,
ManagedDatabase,
)
from hotdata_marimo._options import empty_dropdown
def _parse_table_names(text: str) -> list[str]:
return [line.strip() for line in text.splitlines() if line.strip()]
def _upload_parquet_bytes(client: HotdataClient, contents: bytes) -> str:
with tempfile.NamedTemporaryFile(suffix=".parquet", delete=False) as tmp:
tmp.write(contents)
path = tmp.name
try:
return client.upload_parquet(path)
finally:
os.unlink(path)
def databases_panel(client: HotdataClient):
"""Table of managed databases in the workspace."""
dbs = client.list_managed_databases()
if not dbs:
return mo.vstack(
[
mo.md("### Managed databases"),
mo.md("_No managed databases yet._"),
mo.md(
"Create one below, or with the CLI: "
"`hotdata databases create --name <name> --table <table>`."
),
],
gap=1,
)
rows: list[dict[str, object]] = [
{"description": db.description or db.id, "id": db.id, "sql_prefix": f"{db.id}.{{schema}}.{{table}}"}
for db in dbs
]
return mo.vstack(
[
mo.md("### Managed databases"),
mo.ui.table(
rows,
label="Managed databases",
pagination=True,
page_size=min(10, len(rows)),
selection=None,
max_height=240,
),
mo.md("_Query as `database.schema.table` in SQL._"),
],
gap=1,
)
class ManagedDatabaseWriter:
"""Create managed databases and load parquet files into declared tables.
Instantiate in one cell and use ``.tab_ui`` in another (see package README).
"""
def __init__(
self,
client: HotdataClient,
*,
default_schema: str = DEFAULT_SCHEMA,
) -> None:
self._client = client
self._default_schema = default_schema
self._last_create_n: int | None = None
self._last_load_n: int | None = None
self._create_result: ManagedDatabase | None = None
self._load_result: LoadManagedTableResult | None = None
self._create_error: str | None = None
self._load_error: str | None = None
self._show_create_success = False
self._show_load_success = False
self.name = mo.ui.text("", label="Database name", full_width=True)
self.schema = mo.ui.text(default_schema, label="Schema", full_width=True)
self.tables = mo.ui.text_area(
"",
label="Tables to declare (one per line)",
full_width=True,
)
self.create = mo.ui.button(
value=0,
on_click=lambda n: n + 1,
label="Create database",
kind="success",
)
self._rebuild_database_pick()
self.table = mo.ui.text("", label="Table name", full_width=True)
self.file = mo.ui.file(
filetypes=[".parquet"],
label="Parquet file",
kind="area",
)
self.load = mo.ui.button(
value=0,
on_click=lambda n: n + 1,
label="Load table",
kind="success",
)
def _rebuild_database_pick(self) -> None:
current = getattr(getattr(self, "database", None), "value", None)
dbs = self._client.list_managed_databases()
if not dbs:
self.database = empty_dropdown(
label="Database",
message="(create one first)",
)
return
options = {db.description or db.id: db.id for db in dbs}
# current holds the previously selected database ID (.value returns the dict value).
# mo.ui.dropdown validates value= against option keys (labels), not values.
default_key = next(iter(options))
selected_key = next((k for k, v in options.items() if v == current), default_key)
self.database = mo.ui.dropdown(
options=options,
label="Database",
full_width=True,
value=selected_key,
)
def _maybe_create(self) -> None:
create_n = self.create.value
if create_n == 0 or create_n == self._last_create_n:
return
self._last_create_n = create_n
self._create_error = None
self._create_result = None
self._show_create_success = False
self._show_load_success = False
db_name = self.name.value.strip()
if not db_name:
self._create_error = "Enter a database name."
return
schema = self.schema.value.strip() or self._default_schema
tables = _parse_table_names(self.tables.value)
try:
self._create_result = self._client.create_managed_database(
description=db_name,
schema=schema,
tables=tables or None,
)
self._rebuild_database_pick()
self._show_create_success = True
except (RuntimeError, ValueError, KeyError) as e:
self._create_error = str(e)
def _maybe_load(self) -> None:
load_n = self.load.value
if load_n == 0 or load_n == self._last_load_n:
return
self._last_load_n = load_n
self._load_error = None
self._load_result = None
self._show_load_success = False
database = self.database.value
table = self.table.value.strip()
if not database:
self._load_error = "Choose or create a database first."
return
if not table:
self._load_error = "Enter a table name."
return
uploads = self.file.value
if not uploads:
self._load_error = "Choose a parquet file to upload."
return
schema = self.schema.value.strip() or self._default_schema
try:
upload_id = _upload_parquet_bytes(self._client, uploads[0].contents)
self._load_result = self._client.load_managed_table(
database,
table,
schema=schema,
upload_id=upload_id,
)
self._show_load_success = True
self._show_create_success = False
except (RuntimeError, ValueError, KeyError, OSError) as e:
self._load_error = str(e)
@property
def result_panel(self):
_ = self.create.value
_ = self.load.value
self._maybe_create()
self._maybe_load()
if self._create_error:
return mo.callout(mo.md(self._create_error), kind="danger")
if self._show_create_success and self._create_result is not None:
db = self._create_result
return mo.callout(
mo.md(
f"Created **{db.description or db.id}** (`{db.id}`). "
"Load parquet into a declared table below."
),
kind="success",
)
if self._load_error:
return mo.callout(mo.md(self._load_error), kind="danger")
if self._show_load_success and self._load_result is not None:
loaded = self._load_result
return mo.callout(
mo.md(
f"Loaded **{loaded.full_name}** · **{loaded.row_count}** rows."
),
kind="success",
)
return mo.md("_Create a database or load a parquet table to see results here._")
@property
def ui(self):
_ = self.create.value
_ = self.load.value
_ = self.database.value
return mo.vstack(
[
mo.md("### Create database"),
self.name,
self.schema,
self.tables,
self.create,
mo.md("### Load parquet table"),
self.database,
self.table,
self.file,
self.load,
],
gap=1,
)
@property
def tab_ui(self):
_ = self.create.value
_ = self.load.value
if hasattr(self.database, "value"):
_ = self.database.value
return mo.vstack(
[
databases_panel(self._client),
self.ui,
self.result_panel,
],
gap=2,
)
def managed_database_writer(
client: HotdataClient,
*,
default_schema: str = DEFAULT_SCHEMA,
) -> ManagedDatabaseWriter:
return ManagedDatabaseWriter(client, default_schema=default_schema)