11#!/usr/bin/env python3
22"""Graph lane: OLTP (point/1-hop reads + transactional node writes) and OLAP (traversals).
33
4- Backends: kuzu , arcadedb. Graph: (User)-[:POSTED]->(Post), (Post)-[:ANSWERS]->(Post).
5- Loads from normalized parquet; edges filtered to existing endpoints (tiny caps tables at 10k).
6- Cypher queries are shared (ints embedded, no param-dialect issues). Prints RESULT {json}.
4+ Backends: ladybug , arcadedb. Graph: (User)-[:POSTED]->(Post), (Post)-[:ANSWERS]->(Post).
5+ (LadybugDB is the maintained continuation of Kùzu; package `real_ladybug`, Kùzu-compatible API.)
6+ Records lifecycle phase timings, on-disk size, and full per-op latency stats. Prints RESULT {json}.
77"""
88import argparse
99import json
1010import os
1111import random
12- import statistics as st
13- import tempfile
1412import time
1513
1614import pandas as pd
1715
16+ import bench_common as bc
17+
1818
1919def load_graph (data_dir , limit ):
2020 posts = pd .read_parquet (os .path .join (data_dir , "posts.parquet" ), columns = ["id" ])
@@ -40,25 +40,29 @@ def load_graph(data_dir, limit):
4040]
4141
4242
43- def be_kuzu (users , posts , posted , answers , workload ):
44- import kuzu
45- db = kuzu .Database (tempfile .mkdtemp (prefix = "gb_kuzu_" ) + "/db" )
46- conn = kuzu .Connection (db )
47- conn .execute ("CREATE NODE TABLE User(id INT64, PRIMARY KEY(id))" )
48- conn .execute ("CREATE NODE TABLE Post(id INT64, PRIMARY KEY(id))" )
49- conn .execute ("CREATE REL TABLE POSTED(FROM User TO Post)" )
50- conn .execute ("CREATE REL TABLE ANSWERS(FROM Post TO Post)" )
51- d = tempfile .mkdtemp (prefix = "gb_kuzu_data_" )
43+ def be_ladybug (users , posts , posted , answers , workload ):
44+ import tempfile
45+ with bc .timed () as t_imp :
46+ import real_ladybug as lb # maintained continuation of Kùzu; Kùzu-compatible API
47+ path = tempfile .mkdtemp (prefix = "gb_ladybug_" ) + "/db.lbug"
48+ with bc .timed () as t_open :
49+ db = lb .Database (path )
50+ conn = lb .Connection (db )
51+ with bc .timed () as t_schema :
52+ conn .execute ("CREATE NODE TABLE User(id INT64, PRIMARY KEY(id))" )
53+ conn .execute ("CREATE NODE TABLE Post(id INT64, PRIMARY KEY(id))" )
54+ conn .execute ("CREATE REL TABLE POSTED(FROM User TO Post)" )
55+ conn .execute ("CREATE REL TABLE ANSWERS(FROM Post TO Post)" )
56+ d = tempfile .mkdtemp (prefix = "gb_ladybug_data_" )
5257 pd .DataFrame ({"id" : users }).to_parquet (f"{ d } /u.parquet" )
5358 pd .DataFrame ({"id" : posts }).to_parquet (f"{ d } /p.parquet" )
5459 pd .DataFrame (posted , columns = ["f" , "t" ]).to_parquet (f"{ d } /posted.parquet" )
5560 pd .DataFrame (answers , columns = ["f" , "t" ]).to_parquet (f"{ d } /answers.parquet" )
56- t0 = time .time ()
57- conn .execute (f"COPY User FROM '{ d } /u.parquet'" )
58- conn .execute (f"COPY Post FROM '{ d } /p.parquet'" )
59- conn .execute (f"COPY POSTED FROM '{ d } /posted.parquet'" )
60- conn .execute (f"COPY ANSWERS FROM '{ d } /answers.parquet'" )
61- load_s = time .time () - t0
61+ with bc .timed () as t_ing :
62+ conn .execute (f"COPY User FROM '{ d } /u.parquet'" )
63+ conn .execute (f"COPY Post FROM '{ d } /p.parquet'" )
64+ conn .execute (f"COPY POSTED FROM '{ d } /posted.parquet'" )
65+ conn .execute (f"COPY ANSWERS FROM '{ d } /answers.parquet'" )
6266
6367 def query (q ):
6468 r = conn .execute (q )
@@ -67,64 +71,70 @@ def query(q):
6771 r .get_next (); n += 1
6872 return n
6973
70- def write (q ):
71- conn .execute (q )
72-
73- return dict (load_s = load_s , query = query , write = write , close = lambda : None ,
74- version = kuzu .__version__ )
74+ return dict (import_s = t_imp .s , jvm_init_s = 0.0 , open_s = t_open .s , schema_s = t_schema .s ,
75+ ingest_s = t_ing .s , index_build_s = 0.0 , gav_build_s = 0.0 , db_path = path ,
76+ query = query , write = lambda q : conn .execute (q ),
77+ close = lambda : None , version = lb .__version__ )
7578
7679
7780GAV_NAME = "gbOlap"
7881
7982
8083def be_arcadedb (users , posts , posted , answers , workload ):
81- import arcadedb_embedded as arcadedb
82- ctx = arcadedb .create_database (tempfile .mkdtemp (prefix = "gb_arcadedb_" ) + "/db" ,
83- jvm_kwargs = {"heap_size" : os .environ .get ("ARCADEDB_HEAP" , "4g" )})
84- db = ctx .__enter__ ()
85- for v in ("User" , "Post" ):
86- db .command ("sql" , f"CREATE VERTEX TYPE { v } " )
87- db .command ("sql" , f"CREATE PROPERTY { v } .id LONG" )
88- db .command ("sql" , f"CREATE INDEX ON { v } (id) UNIQUE_HASH" ) # fast point lookups (ex 09)
89- db .command ("sql" , "CREATE EDGE TYPE POSTED" )
90- db .command ("sql" , "CREATE EDGE TYPE ANSWERS" )
84+ import tempfile
85+ with bc .timed () as t_imp :
86+ import arcadedb_embedded as arcadedb
87+ from arcadedb_embedded import jvm
88+ path = tempfile .mkdtemp (prefix = "gb_arcadedb_" ) + "/db"
89+ heap = os .environ .get ("ARCADEDB_HEAP" , "4g" )
90+ with bc .timed () as t_jvm :
91+ jvm .start_jvm (heap_size = heap ) # heap must match (else medium OOMs)
92+ with bc .timed () as t_open :
93+ ctx = arcadedb .create_database (path , jvm_kwargs = {"heap_size" : heap })
94+ db = ctx .__enter__ ()
95+ with bc .timed () as t_schema :
96+ for v in ("User" , "Post" ):
97+ db .command ("sql" , f"CREATE VERTEX TYPE { v } " )
98+ db .command ("sql" , f"CREATE PROPERTY { v } .id LONG" )
99+ db .command ("sql" , f"CREATE INDEX ON { v } (id) UNIQUE_HASH" ) # point lookups (ex 09)
100+ db .command ("sql" , "CREATE EDGE TYPE POSTED" )
101+ db .command ("sql" , "CREATE EDGE TYPE ANSWERS" )
91102
92103 pf = db .async_executor ().get_parallel_level () > 1
93- t0 = time .time ()
94- # vertices + edges via the tuned graph batch loader (ex 09), not per-row SQL
95- for vtype , ids in (("User" , users ), ("Post" , posts )):
96- with db .graph_batch (batch_size = max (1 , len (ids )), expected_edge_count = 0 ,
97- bidirectional = False , commit_every = max (1 , len (ids )),
98- use_wal = False , parallel_flush = pf ) as b :
99- b .create_vertices (vtype , [{"id" : i } for i in ids ])
100- urid = {int (r ["id" ]): r ["rid" ] for r in
101- db .query ("sql" , "SELECT id, @rid as rid FROM User" ).to_list ()}
102- prid = {int (r ["id" ]): r ["rid" ] for r in
103- db .query ("sql" , "SELECT id, @rid as rid FROM Post" ).to_list ()}
104- for etype , edges , frm , to in (("POSTED" , posted , urid , prid ),
105- ("ANSWERS" , answers , prid , prid )):
106- with db .graph_batch (batch_size = max (1 , len (edges )), expected_edge_count = max (1 , len (edges )),
107- bidirectional = False , commit_every = max (1 , len (edges )),
108- use_wal = False , parallel_flush = pf ) as b :
109- for a , c in edges :
110- b .new_edge (frm [a ], etype , to [c ])
111- load_s = time .time () - t0
104+ with bc .timed () as t_ing :
105+ for vtype , ids in (("User" , users ), ("Post" , posts )):
106+ with db .graph_batch (batch_size = max (1 , len (ids )), expected_edge_count = 0 ,
107+ bidirectional = False , commit_every = max (1 , len (ids )),
108+ use_wal = False , parallel_flush = pf ) as b :
109+ b .create_vertices (vtype , [{"id" : i } for i in ids ])
110+ urid = {int (r ["id" ]): r ["rid" ] for r in
111+ db .query ("sql" , "SELECT id, @rid as rid FROM User" ).to_list ()}
112+ prid = {int (r ["id" ]): r ["rid" ] for r in
113+ db .query ("sql" , "SELECT id, @rid as rid FROM Post" ).to_list ()}
114+ for etype , edges , frm , to in (("POSTED" , posted , urid , prid ),
115+ ("ANSWERS" , answers , prid , prid )):
116+ with db .graph_batch (batch_size = max (1 , len (edges )), expected_edge_count = max (1 , len (edges )),
117+ bidirectional = False , commit_every = max (1 , len (edges )),
118+ use_wal = False , parallel_flush = pf ) as b :
119+ for a , c in edges :
120+ b .new_edge (frm [a ], etype , to [c ])
112121
113122 gav_build_s = 0.0
114- if workload == "olap" : # GAV accelerates the SAME OpenCypher queries (ex 10)
115- g0 = time .time ()
116- db .command ("sql" , f"CREATE GRAPH ANALYTICAL VIEW { GAV_NAME } "
117- "VERTEX TYPES (User, Post) EDGE TYPES (POSTED, ANSWERS) "
118- "PROPERTIES (id) UPDATE MODE OFF" )
119- while True :
120- row = db .query ("sql" , "SELECT FROM schema:graphAnalyticalViews WHERE name = ?" ,
121- GAV_NAME ).first ()
122- if row is not None and row .get ("status" ) == "READY" :
123- break
124- if time .time () - g0 > 1800 :
125- raise RuntimeError ("GAV did not reach READY" )
126- time .sleep (0.25 )
127- gav_build_s = time .time () - g0
123+ with bc .timed () as t_idx :
124+ if workload == "olap" : # GAV accelerates the SAME OpenCypher queries (ex 10)
125+ g0 = time .time ()
126+ db .command ("sql" , f"CREATE GRAPH ANALYTICAL VIEW { GAV_NAME } "
127+ "VERTEX TYPES (User, Post) EDGE TYPES (POSTED, ANSWERS) "
128+ "PROPERTIES (id) UPDATE MODE OFF" )
129+ while True :
130+ row = db .query ("sql" , "SELECT FROM schema:graphAnalyticalViews WHERE name = ?" ,
131+ GAV_NAME ).first ()
132+ if row is not None and row .get ("status" ) == "READY" :
133+ break
134+ if time .time () - g0 > 1800 :
135+ raise RuntimeError ("GAV did not reach READY" )
136+ time .sleep (0.25 )
137+ gav_build_s = time .time () - g0
128138
129139 def query (q ):
130140 return len (db .query ("opencypher" , q ).to_list ())
@@ -136,12 +146,13 @@ def write(q):
136146 except Exception :
137147 db .rollback ()
138148
139- return dict (load_s = load_s , gav_build_s = gav_build_s , query = query , write = write ,
140- close = lambda : ctx .__exit__ (None , None , None ),
149+ return dict (import_s = t_imp .s , jvm_init_s = t_jvm .s , open_s = t_open .s , schema_s = t_schema .s ,
150+ ingest_s = t_ing .s , index_build_s = t_idx .s , gav_build_s = gav_build_s , db_path = path ,
151+ query = query , write = write , close = lambda : ctx .__exit__ (None , None , None ),
141152 version = getattr (arcadedb , "__version__" , "?" ))
142153
143154
144- BACKENDS = {"kuzu " : be_kuzu , "arcadedb" : be_arcadedb }
155+ BACKENDS = {"ladybug " : be_ladybug , "arcadedb" : be_arcadedb }
145156
146157
147158def run_oltp (be , users , posts , n_ops , seed = 0 ):
@@ -161,15 +172,25 @@ def run_oltp(be, users, posts, n_ops, seed=0):
161172 lat [k ].append ((time .time () - s ) * 1000 )
162173 total = time .time () - t0
163174 out = {"oltp_total_s" : round (total , 3 ), "oltp_ops_per_s" : round (n_ops / total , 1 )}
175+ alllat = []
164176 for k , v in lat .items ():
165- if v :
166- out [ f" { k } _p50_ms" ] = round ( st . median ( v ), 3 )
167- return out
177+ out . update ( bc . latstats ( k , v )); alllat . extend ( v )
178+ out . update ( bc . latstats ( "oltp" , alllat ) )
179+ return out , lat
168180
169181
170- def run_olap (be , reps = 3 ):
171- per = [round (min (_time (be , q ) for _ in range (reps )), 3 ) for q in OLAP ]
172- return {"olap_query_ms" : per , "olap_total_ms" : round (sum (per ), 3 )}
def run_olap(be, reps=7):
    """Run every query in ``OLAP`` ``reps`` times and summarise the timings.

    Args:
        be: backend handle, passed through unchanged to ``_time``.
        reps: number of timed repetitions per query.

    Returns:
        A ``(metrics, samples_by_query)`` tuple. ``metrics`` holds the
        per-query mean and std lists, the sum of the per-query means, and
        whatever ``bc.latstats("olap", ...)`` reports over all samples pooled
        together. ``samples_by_query`` maps ``"olap_q<i>"`` to the list of
        values returned by ``_time`` for that query.
    """
    means = []
    stds = []
    samples_by_query = {}

    for position, cypher in enumerate(OLAP):
        label = f"olap_q{position}"

        timings = []
        for _ in range(reps):
            timings.append(_time(be, cypher))
        samples_by_query[label] = timings

        # latstats prefixes its keys with the label; only mean/std are
        # surfaced per query here.
        stats = bc.latstats(label, timings)
        means.append(round(stats[label + "_mean_ms"], 3))
        stds.append(round(stats[label + "_std_ms"], 3))

    # Pool every sample (in query order) for the lane-wide statistics.
    pooled = []
    for timings in samples_by_query.values():
        pooled.extend(timings)

    metrics = {
        "olap_query_ms": means,
        "olap_query_std_ms": stds,
        "olap_total_ms": round(sum(means), 3),
    }
    metrics.update(bc.latstats("olap", pooled))
    return metrics, samples_by_query
173194
174195
175196def _time (be , q ):
@@ -187,14 +208,28 @@ def main():
187208
188209 users , posts , posted , answers = load_graph (args .data_dir , args .limit )
189210 be = BACKENDS [args .backend ](users , posts , posted , answers , args .workload )
211+ ingest_s = be ["ingest_s" ]
190212 res = {"backend" : args .backend , "lib_version" : be ["version" ], "lane" : "graph" ,
191213 "workload" : args .workload , "n_users" : len (users ), "n_posts" : len (posts ),
192- "n_posted" : len (posted ), "n_answers" : len (answers ), "load_s" : round (be ["load_s" ], 3 )}
214+ "n_posted" : len (posted ), "n_answers" : len (answers ),
215+ "import_s" : round (be ["import_s" ], 4 ), "jvm_init_s" : round (be ["jvm_init_s" ], 4 ),
216+ "open_s" : round (be ["open_s" ], 4 ), "schema_s" : round (be ["schema_s" ], 4 ),
217+ "ingest_s" : round (ingest_s , 3 ), "index_build_s" : round (be ["index_build_s" ], 3 ),
218+ "load_s" : round (ingest_s , 3 ), # continuity
219+ "ingest_edges_per_s" : round ((len (posted ) + len (answers )) / ingest_s , 1 ) if ingest_s else None }
193220 if be .get ("gav_build_s" ):
194221 res ["gav_build_s" ] = round (be ["gav_build_s" ], 3 )
195222 res ["gav" ] = True
196- res .update (run_oltp (be , users , posts , args .ops ) if args .workload == "oltp" else run_olap (be ))
197- be ["close" ]()
223+ if args .workload == "oltp" :
224+ m , raw = run_oltp (be , users , posts , args .ops )
225+ else :
226+ m , raw = run_olap (be )
227+ res .update (m )
228+ with bc .timed () as t_close :
229+ be ["close" ]()
230+ res ["close_s" ] = round (t_close .s , 4 )
231+ res ["db_size_mb" ] = bc .dir_size_mb (be ["db_path" ])
232+ bc .dump_latencies (os .environ .get ("RUN_LABEL" ), raw )
198233 print ("RESULT " + json .dumps (res ))
199234
200235
0 commit comments