#!/usr/bin/env python3
"""BULK-OLAP head-to-head: GFQL 4 engines vs kuzu on REAL graphs.

Answers "is bulk OLAP better with GFQL (cudf / polars-gpu)?" The seeded CSR index
is O(degree) and wins tiny work; this bench deliberately AVOIDS that path and
measures the BULK regime instead — large-frontier multi-hop + full-graph
aggregation, i.e. the scan/join work where columnar GPU throughput should pay off
and the index does NOT help. We run g0.chain (NO resident index -> engine traversal,
the honest bulk path) so every engine does the same materialized join work.

Tasks (all bulk, all materialized on both sides):
  BULK1   1-hop forward from K seeds (edge semijoin, frontier=K)
  BULK2   2-hop forward from K seeds (edge-edge join, frontier blows up)
  DEGALL  full-graph out-degree aggregation (group_by over ALL edges; pure OLAP)
K frontier sweep: 1k, 10k, 100k seeds. cudf/polars-gpu should overtake pandas as K
(hence work) grows; kuzu is the WCOJ/optimizer peer for the multi-hop join.

Trust: GFQL rows reported per engine (engine parity is separately guaranteed by the
conformance suite); kuzu rows reported alongside with a semantic note. Timing is the
deliverable — rows are the honesty check that each system did real work.

Env: PARQUET=/path/edges.parquet KS=1000,10000,100000 ENGINES=pandas,polars,cudf,polars-gpu
     SYSTEMS=gfql,kuzu REPS=10 WARM=2 OUT=/tmp/bulk.jsonl SEED=0 TMPDIR_BENCH=/tmp/bulkbench
"""
from __future__ import annotations

import contextlib
import json
import os
import shutil
import statistics
import tempfile
import time
import warnings

import graphistry
import numpy as np
import pandas as pd
from graphistry.compute.ast import e_forward, n
| 33 | +def _sync(engine): |
| 34 | + if engine in ("cudf", "polars-gpu"): |
| 35 | + try: |
| 36 | + import cupy as cp # type: ignore |
| 37 | + cp.cuda.runtime.deviceSynchronize() |
| 38 | + except Exception: |
| 39 | + pass |
| 40 | + |
| 41 | + |
def timeit(fn, reps, engine="cpu", warmup=2):
    """Return the median wall-clock duration of ``fn()`` in milliseconds.

    ``fn`` is first called ``warmup`` times untimed, then ``reps`` times timed
    with ``time.perf_counter``. ``_sync(engine)`` runs after every call and is
    inside the timed window, so the sample covers the call plus the sync.

    Args:
        fn: zero-argument callable; its return value is discarded.
        reps: number of timed calls, must be >= 1.
        engine: engine label forwarded to ``_sync``.
        warmup: number of untimed calls made before measuring.

    Returns:
        Median of the ``reps`` samples, in milliseconds.

    Raises:
        ValueError: if ``reps`` is less than 1 (checked before any call to
            ``fn`` so no work is wasted on warmups).
    """
    if reps < 1:
        raise ValueError(f"reps must be >= 1, got {reps}")
    for _ in range(warmup):
        fn()
        _sync(engine)
    samples_ms = []
    for _ in range(reps):
        start = time.perf_counter()
        fn()
        _sync(engine)
        samples_ms.append((time.perf_counter() - start) * 1e3)
    # statistics.median sorts internally; no explicit sort is needed.
    return statistics.median(samples_ms)
def load_graph():
    """Load the edge list named by the ``PARQUET`` env var and derive nodes.

    Returns:
        A ``(ndf, edf, nodes)`` tuple:
        ``edf`` is the parquet file as a pandas DataFrame with ``src`` and
        ``dst`` cast to int64 (any other columns are kept as read);
        ``nodes`` is the ``np.unique`` array of every id appearing in ``src``
        or ``dst``; ``ndf`` is a one-column DataFrame ``{"id": nodes}``.

    Raises:
        KeyError: if ``PARQUET`` is not set, or the file lacks a ``src`` or
            ``dst`` column.
    """
    try:
        path = os.environ["PARQUET"]
    except KeyError:
        raise KeyError(
            "PARQUET environment variable is not set; point it at an edges "
            "parquet file with 'src' and 'dst' columns"
        ) from None
    edf = pd.read_parquet(path)
    missing = [col for col in ("src", "dst") if col not in edf.columns]
    if missing:
        raise KeyError(f"{path}: missing required edge column(s) {missing}")
    edf = edf.astype({"src": np.int64, "dst": np.int64})
    endpoints = np.concatenate([edf["src"].to_numpy(), edf["dst"].to_numpy()])
    nodes = np.unique(endpoints)
    ndf = pd.DataFrame({"id": nodes})
    return ndf, edf, nodes
def gfql_trav(g0, seed_ids, hops, engine):
    """Run a seeded forward traversal of ``hops`` edge steps via ``g0.chain``.

    The chain is one node matcher restricting ``id`` to ``seed_ids`` (the
    frontier filter) followed by ``hops`` forward-edge steps, executed with the
    requested ``engine``. Whatever ``g0.chain`` returns is passed back as-is.

    Per the original author's note, the chain API is used because it is the
    GFQL surface that covers all four engines, whereas generic ``hop()`` is
    pandas/cudf only (not verifiable from this file alone).
    """
    steps = [n({"id": seed_ids})]
    steps.extend(e_forward() for _ in range(hops))
    return g0.chain(steps, engine=engine)
def run_gfql(ndf, edf, nodes, ks, engines, reps, warm, outf, seed):
    """Benchmark GFQL on every requested engine and report one record per task.

    For each engine: build a graph from ``ndf``/``edf``, run one untimed 1-hop
    traversal as setup, then time BULK1 (1 hop) and BULK2 (2 hops) for every
    ``k`` in ``ks``, and finally time DEGALL via ``degall``. Each successful
    measurement is printed and, when ``outf`` is given, appended to it as one
    JSON line and flushed immediately. A failure in any step is printed and
    that step is skipped; the function itself does not raise for task errors.

    Args:
        ndf: node DataFrame with an ``id`` column.
        edf: edge DataFrame with ``src`` and ``dst`` columns.
        nodes: array of node ids that seeds are sampled from.
        ks: requested seed-set sizes; each is clamped to ``len(nodes)``.
        engines: engine labels to run.
        reps: timed repetitions per task (forwarded to ``timeit``).
        warm: warmup repetitions per task (forwarded to ``timeit``).
        outf: writable text file for JSONL records, or ``None``.
        seed: RNG seed for seed-set sampling.
    """
    N, E = len(ndf), len(edf)
    rng = np.random.default_rng(seed)
    # One sample per k, drawn in ks order so the sequence is reproducible.
    seed_sets = {k: rng.choice(nodes, size=min(k, len(nodes)), replace=False).tolist() for k in ks}

    def _emit(rec):
        # Flush per record so partial results survive a later crash.
        if outf:
            outf.write(json.dumps(rec) + "\n")
            outf.flush()

    for engine in engines:
        try:
            g0 = graphistry.nodes(ndf, "id").edges(edf, "src", "dst")
            # Untimed first traversal so one-time engine setup is not measured.
            # NOTE(review): g0 keeps the frames it was built with, so whether
            # per-call conversion/H2D is really excluded from the timed calls
            # depends on graphistry internals — confirm.
            if ks:  # without seed sets there is nothing to traverse; DEGALL can still run
                gfql_trav(g0, seed_sets[ks[0]], 1, engine)
        except Exception as ex:
            print(f" gfql {engine}: SETUP FAILED {type(ex).__name__}: {ex}")
            continue

        # Frontier sweep: BULK1 (1-hop) + BULK2 (2-hop).
        for k in ks:
            sids = seed_sets[k]
            for task, hops in (("BULK1", 1), ("BULK2", 2)):
                try:
                    res = gfql_trav(g0, sids, hops, engine)
                    rows = int(res._edges.shape[0])
                    nn = int(res._nodes.shape[0])
                    ms = timeit(lambda: gfql_trav(g0, sids, hops, engine), reps, engine, warm)
                except Exception as ex:
                    print(f" gfql {engine} {task} k={k} FAILED: {type(ex).__name__}: {ex}")
                    continue
                # `seeds` is the actual frontier size; it is smaller than k
                # when the graph has fewer than k nodes.
                rec = dict(system="gfql", engine=engine, task=task, k=k, hops=hops,
                           seeds=len(sids), n=N, edges=E, warm_ms=ms, e_rows=rows, n_rows=nn)
                print(f" gfql {engine:11} {task} k={k:>7} {ms:10.3f}ms e_rows={rows:>10} n_rows={nn:>9}")
                _emit(rec)

        # DEGALL: full-graph out-degree aggregation (pure columnar OLAP, no traversal).
        try:
            ms, rows = degall(edf, engine, reps, warm)
            rec = dict(system="gfql", engine=engine, task="DEGALL", k=None, hops=0,
                       n=N, edges=E, warm_ms=ms, e_rows=rows, n_rows=rows)
            print(f" gfql {engine:11} DEGALL{'':>13} {ms:10.3f}ms groups={rows:>10}")
            _emit(rec)
        except Exception as ex:
            print(f" gfql {engine} DEGALL FAILED: {type(ex).__name__}: {ex}")
def degall(edf, engine, reps, warm):
    """Time a full-graph out-degree aggregation (group by ``src``, count rows).

    The pandas edge frame is converted to the engine's frame type once, outside
    the timed region; only the group-by itself is timed.

    Args:
        edf: pandas edge DataFrame with a ``src`` column.
        engine: one of ``"pandas"``, ``"cudf"``, ``"polars"``, ``"polars-gpu"``.
        reps: timed repetitions (forwarded to ``timeit``).
        warm: warmup repetitions (forwarded to ``timeit``).

    Returns:
        ``(ms, rows)``: the timing reported by ``timeit`` and the number of
        groups (rows of the aggregation result).

    Raises:
        ValueError: if ``engine`` is not one of the supported labels; raised
            before any engine library is imported.
    """
    if engine not in ("pandas", "cudf", "polars", "polars-gpu"):
        raise ValueError(f"unsupported engine for DEGALL: {engine!r}")

    if engine == "pandas":
        frame = edf

        def fn():
            return frame.groupby("src").size()

    elif engine == "cudf":
        import cudf

        frame = cudf.from_pandas(edf)

        def fn():
            return frame.groupby("src").size()

    elif engine == "polars":
        import polars as pl

        frame = pl.from_pandas(edf)

        def fn():
            return frame.group_by("src").len()

    else:  # "polars-gpu"
        import polars as pl

        frame = pl.from_pandas(edf)
        # NOTE(review): with raise_on_fail=False polars may fall back to its CPU
        # engine when the GPU engine cannot run the query, in which case this
        # row is not a GPU measurement — confirm this is intended.
        gpu = pl.GPUEngine(executor="in-memory", raise_on_fail=False)

        def fn():
            return frame.lazy().group_by("src").len().collect(engine=gpu)

    # One untimed run supplies the row count reported next to the timing.
    rows = int(fn().shape[0])
    ms = timeit(fn, reps, engine, warm)
    return ms, rows
def run_kuzu(ndf, edf, nodes, ks, reps, warm, outf, seed, tmpdir):
    """Benchmark kuzu on the same tasks as the GFQL run and report the results.

    Loads the graph into a throwaway on-disk kuzu database under ``tmpdir``,
    then times BULK1/BULK2 for every ``k`` in ``ks`` and DEGALL, each
    materialized with ``get_as_df()``. Each successful measurement is printed
    and, when ``outf`` is given, appended as one JSON line and flushed. If kuzu
    cannot be imported a notice is printed and the function returns. The
    temporary database directory is removed on every exit path.

    Args:
        ndf: node DataFrame with an ``id`` column.
        edf: edge DataFrame with ``src`` and ``dst`` columns.
        nodes: array of node ids that seeds are sampled from.
        ks: requested seed-set sizes; each is clamped to ``len(nodes)``.
        reps: timed repetitions per task (forwarded to ``timeit``).
        warm: warmup repetitions per task (forwarded to ``timeit``).
        outf: writable text file for JSONL records, or ``None``.
        seed: RNG seed; same sampling calls as the GFQL run, so the same
            ``seed``/``ks``/``nodes`` give the same seed sets.
        tmpdir: directory in which the temporary database dir is created.
    """
    try:
        import kuzu
    except Exception:
        print(" kuzu: NOT AVAILABLE (pip install kuzu)")
        return
    rng = np.random.default_rng(seed)
    seed_sets = {k: rng.choice(nodes, size=min(k, len(nodes)), replace=False).tolist() for k in ks}

    def _emit(rec):
        # Flush per record so partial results survive a later crash.
        if outf:
            outf.write(json.dumps(rec) + "\n")
            outf.flush()

    dbp = tempfile.mkdtemp(dir=tmpdir)
    try:
        db = kuzu.Database(os.path.join(dbp, "kz"))
        conn = kuzu.Connection(db)
        conn.execute("CREATE NODE TABLE N(id INT64, PRIMARY KEY(id))")
        conn.execute("CREATE REL TABLE E(FROM N TO N)")
        np_path = os.path.join(dbp, "n.parquet")
        ep_path = os.path.join(dbp, "e.parquet")
        # Write exactly the columns the tables declare, endpoints in FROM,TO
        # order and without an index column, so extra edge attributes or a
        # different column order in the input cannot break or skew the COPY.
        ndf[["id"]].to_parquet(np_path, index=False)
        edf[["src", "dst"]].rename(columns={"src": "from", "dst": "to"}).to_parquet(ep_path, index=False)
        t0 = time.perf_counter()
        conn.execute(f'COPY N FROM "{np_path}"')
        conn.execute(f'COPY E FROM "{ep_path}"')
        load_ms = (time.perf_counter() - t0) * 1e3
        print(f" kuzu load: {load_ms:.0f}ms")

        # BULK1/BULK2: every b.id reached from the K seeds, duplicates kept
        # (no DISTINCT in the query), materialized columnar via get_as_df.
        q1 = conn.prepare("MATCH (a:N)-[:E]->(b:N) WHERE a.id IN $seeds RETURN b.id")
        q2 = conn.prepare("MATCH (a:N)-[:E]->()-[:E]->(b:N) WHERE a.id IN $seeds RETURN b.id")
        for k in ks:
            s = seed_sets[k]
            for task, hops, stmt in (("BULK1", 1, q1), ("BULK2", 2, q2)):
                try:
                    rows = len(conn.execute(stmt, {"seeds": s}).get_as_df())
                    ms = timeit(lambda: conn.execute(stmt, {"seeds": s}).get_as_df(), reps, "kuzu", warm)
                except Exception as ex:
                    print(f" kuzu {task} k={k} FAILED: {type(ex).__name__}: {ex}")
                    continue
                # hops/seeds mirror the GFQL records; `seeds` is the actual
                # frontier size (smaller than k on graphs with fewer nodes).
                rec = dict(system="kuzu", engine="kuzu", task=task, k=k, hops=hops, seeds=len(s),
                           n=len(ndf), edges=len(edf), warm_ms=ms, e_rows=rows, n_rows=rows,
                           load_ms=load_ms)
                print(f" kuzu {'':11} {task} k={k:>7} {ms:10.3f}ms rows={rows:>10} (b.id, not-distinct)")
                _emit(rec)

        # DEGALL: full out-degree aggregation. timeit performs the warmup, as
        # it does for the BULK tasks, so no separate warmup loop is run here.
        try:
            qd = "MATCH (a:N)-[:E]->() RETURN a.id, count(*) AS deg"
            rows = len(conn.execute(qd).get_as_df())
            ms = timeit(lambda: conn.execute(qd).get_as_df(), reps, "kuzu", warm)
            rec = dict(system="kuzu", engine="kuzu", task="DEGALL", k=None, hops=0,
                       n=len(ndf), edges=len(edf), warm_ms=ms, e_rows=rows, n_rows=rows,
                       load_ms=load_ms)
            print(f" kuzu {'':11} DEGALL{'':>13} {ms:10.3f}ms groups={rows:>10}")
            _emit(rec)
        except Exception as ex:
            print(f" kuzu DEGALL FAILED: {type(ex).__name__}: {ex}")
    finally:
        # Always drop the throwaway database, including when setup/load raised.
        shutil.rmtree(dbp, ignore_errors=True)
def main():
    """Script entry point: read the configuration from the environment and run.

    Environment variables read here (defaults in parentheses):
        KS            comma-separated seed-set sizes (1000,10000,100000)
        ENGINES       comma-separated GFQL engines (pandas,polars,cudf,polars-gpu)
        SYSTEMS       comma-separated systems to run (gfql,kuzu)
        REPS / WARM   timed / warmup repetitions (10 / 2)
        SEED          RNG seed for seed sampling (0)
        TMPDIR_BENCH  parent directory for the kuzu database (/tmp/bulkbench)
        OUT           JSONL file to append records to (unset -> no file output)
    The graph itself is loaded by ``load_graph``.

    Comma-separated values tolerate surrounding whitespace and empty items.
    All settings are parsed before the graph is loaded so a malformed value
    fails fast. The output file, if any, is closed even when a run raises.
    """
    def _csv_env(name, default):
        # Split on commas, trim each item, drop empties ("a, b," -> ["a", "b"]).
        raw = os.environ.get(name, default)
        return [item.strip() for item in raw.split(",") if item.strip()]

    ks = [int(item) for item in _csv_env("KS", "1000,10000,100000")]
    engines = _csv_env("ENGINES", "pandas,polars,cudf,polars-gpu")
    systems = _csv_env("SYSTEMS", "gfql,kuzu")
    reps = int(os.environ.get("REPS", "10"))
    warm = int(os.environ.get("WARM", "2"))
    seed = int(os.environ.get("SEED", "0"))
    tmpdir = os.environ.get("TMPDIR_BENCH", "/tmp/bulkbench")
    out_path = os.environ.get("OUT")

    ndf, edf, nodes = load_graph()
    print(f"===== graph: {len(ndf):,} nodes {len(edf):,} edges =====")
    os.makedirs(tmpdir, exist_ok=True)

    # nullcontext() yields None, which the runners treat as "no file output".
    sink = open(out_path, "a", encoding="utf-8") if out_path else contextlib.nullcontext()
    with sink as outf:
        if "gfql" in systems:
            run_gfql(ndf, edf, nodes, ks, engines, reps, warm, outf, seed)
        if "kuzu" in systems:
            run_kuzu(ndf, edf, nodes, ks, reps, warm, outf, seed, tmpdir)
# Run the benchmark only when executed as a script, not when imported.
if __name__ == "__main__":
    main()