-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy path_main.py
More file actions
62 lines (50 loc) · 1.8 KB
/
_main.py
File metadata and controls
62 lines (50 loc) · 1.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
"""Generate typed SQL function wrappers from DuckDB introspection."""
from __future__ import annotations
from typing import TYPE_CHECKING
import polars as pl
from polars.exceptions import ComputeError
from pyochain import Iter
from rich import print
from rich.text import Text
from ._query import run_qry
from ._schemas import TableSchema
from ._sections import FunctionInfo, build_file
if TYPE_CHECKING:
from pathlib import Path
def run_pipeline(
    caller: Path, source: Path, *, profile: bool = False, regenerate: bool
) -> str:
    """Turn the function catalog into the generated file content.

    Args:
        caller: Forwarded unchanged to ``build_file`` as its extra argument.
        source: Path handed to ``_try_scan`` to obtain the catalog.
        profile: When true, the query is passed through ``_inspect`` before
            it is collected.
        regenerate: Forwarded to ``_try_scan``.

    Returns:
        The value produced by ``build_file`` for the collected functions.
    """
    query = run_qry(_try_scan(source, regenerate=regenerate))
    if profile:
        query = _inspect(query)

    # Each collected row is unpacked positionally into a FunctionInfo.
    rows = query.collect().map_rows(
        lambda row: FunctionInfo(*row),  # pyright: ignore[reportAny]
        return_dtype=pl.Object,
    )
    functions = Iter[FunctionInfo](rows.to_series()).collect()

    def _report(seq) -> None:  # noqa: ANN001 - pyochain sequence type
        print(Text(f"Generated {seq.len()} functions", style="yellow"))

    return functions.inspect(_report).into(build_file, caller)
def _try_scan(source: Path, *, regenerate: bool) -> pl.LazyFrame:
    """Return the DuckDB function catalog as a lazy frame, cached at ``source``.

    Args:
        source: Parquet file used as the on-disk cache.
        regenerate: When true, an existing cache file is ignored and rebuilt.

    Returns:
        A lazy scan of ``source`` when it exists and ``regenerate`` is false;
        otherwise a lazy view of the freshly introspected catalog, which is
        also written to ``source``.
    """
    if source.exists() and not regenerate:
        return pl.scan_parquet(source)

    # Imported after the cache check, so duckdb is only needed on a rebuild.
    import duckdb

    # The context manager closes the connection even when an extension fails
    # to install or load; ``.pl()`` materialises the data before the close.
    with duckdb.connect() as conn:
        for extension in ("spatial", "delta"):
            conn.install_extension(extension)
            conn.load_extension(extension)
        df = conn.table_function("duckdb_functions").pl().cast(TableSchema)

    source.parent.mkdir(parents=True, exist_ok=True)
    df.write_parquet(source)
    return df.lazy()
def _inspect(lf: pl.LazyFrame) -> pl.LazyFrame:
    """Print the ten longest-running profile entries, then return ``lf`` as is.

    Profiling is best-effort: a ``ComputeError`` is reported on the console
    instead of being raised, so the caller's pipeline continues either way.

    Args:
        lf: The lazy query to profile. It is never modified.

    Returns:
        The same ``lf`` that was passed in.
    """
    try:
        # ``profile()`` returns a pair; the second item holds the timings.
        timings = lf.profile()[1]
        durations = timings.with_columns(
            pl.col("end").sub(pl.col("start")).alias("duration")
        )
        durations.sort("duration", descending=True).show(
            10, fmt_str_lengths=100
        )
    except ComputeError as err:
        # Say why no profile was shown rather than failing silently.
        print(Text(f"Profiling skipped: {err}", style="red"))
    return lf