Skip to content

Commit cb78cec

Browse files
committed
fix: update DISTINCT queries to avoid database context issues and improve async loader reset function
1 parent 8747a68 commit cb78cec

7 files changed

Lines changed: 159 additions & 36 deletions

bindings/python/examples/05_csv_import_graph.py

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,9 @@ def create_all_vertices(
300300
def _create_users(self, total_users: int) -> tuple[int, BenchmarkStats]:
301301
"""Create User vertices.
302302
303-
Note: Uses SELECT DISTINCT (not paginated - efficient pagination difficult)
303+
Note: Uses a DISTINCT subquery. The direct DISTINCT/ORDER BY form can
304+
resolve against the wrong database context when the target graph DB is
305+
open at the same time as the source document DB.
304306
"""
305307
print(f"Creating {total_users:,} User vertices...")
306308
stats = BenchmarkStats()
@@ -317,7 +319,10 @@ def _create_users(self, total_users: int) -> tuple[int, BenchmarkStats]:
317319
with arcadedb.open_database(
318320
str(self.data_loader.source_db_path)
319321
) as source_db:
320-
query = "SELECT DISTINCT userId FROM Rating ORDER BY userId"
322+
query = (
323+
"SELECT userId FROM (SELECT DISTINCT userId FROM Rating) "
324+
"ORDER BY userId"
325+
)
321326
for record in source_db.query("sql", query):
322327
user_id = record.get("userId")
323328
async_exec.command(
@@ -344,7 +349,10 @@ def _create_users(self, total_users: int) -> tuple[int, BenchmarkStats]:
344349
with arcadedb.open_database(
345350
str(self.data_loader.source_db_path)
346351
) as source_db:
347-
query = "SELECT DISTINCT userId FROM Rating ORDER BY userId"
352+
query = (
353+
"SELECT userId FROM (SELECT DISTINCT userId FROM Rating) "
354+
"ORDER BY userId"
355+
)
348356
batch_user_ids = []
349357

350358
for record in source_db.query("sql", query):
@@ -380,7 +388,10 @@ def _create_users(self, total_users: int) -> tuple[int, BenchmarkStats]:
380388
with arcadedb.open_database(
381389
str(self.data_loader.source_db_path)
382390
) as source_db:
383-
query = "SELECT DISTINCT userId FROM Rating ORDER BY userId"
391+
query = (
392+
"SELECT userId FROM (SELECT DISTINCT userId FROM Rating) "
393+
"ORDER BY userId"
394+
)
384395
batch_user_ids = []
385396

386397
for record in source_db.query("sql", query):
@@ -1406,8 +1417,8 @@ def create_schema(db: Any, create_indexes: bool = True):
14061417
schema_commands = [
14071418
"CREATE VERTEX TYPE User",
14081419
"CREATE VERTEX TYPE Movie",
1409-
"CREATE EDGE TYPE RATED UNIDIRECTIONAL",
1410-
"CREATE EDGE TYPE TAGGED UNIDIRECTIONAL",
1420+
"CREATE EDGE TYPE RATED",
1421+
"CREATE EDGE TYPE TAGGED",
14111422
"CREATE PROPERTY User.userId INTEGER",
14121423
"CREATE PROPERTY Movie.movieId INTEGER",
14131424
"CREATE PROPERTY Movie.title STRING",
@@ -2389,6 +2400,8 @@ def main():
23892400
print("❌ Step 5: Some validations or queries failed!")
23902401
print()
23912402

2403+
validation_passed = validation_passed_before
2404+
23922405
# Step 6: Export Database (Optional)
23932406
export_filename = None
23942407
export_time = 0.0
@@ -2555,6 +2568,7 @@ def main():
25552568

25562569
except Exception as e:
25572570
print(f" ❌ Roundtrip validation failed: {e}")
2571+
validation_passed = False
25582572
print()
25592573
# Try to clean up if exists
25602574
if roundtrip_db_path.exists():

bindings/python/examples/07_stackoverflow_tables_oltp.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -653,10 +653,11 @@ def configure_arcadedb_async_loader(db, batch_size: int, parallelism: int = 1):
653653
return async_exec
654654

655655

656-
def reset_arcadedb_async_loader(db):
657-
db.async_executor().wait_completion()
656+
def reset_arcadedb_async_loader(db, async_exec):
657+
async_exec.wait_completion()
658+
async_exec.close()
658659
db.set_read_your_writes(True)
659-
db.async_executor().set_transaction_use_wal(True)
660+
async_exec.set_transaction_use_wal(True)
660661

661662

662663
def insert_batch_sqlite(conn, table: Dict[str, Any], rows: List[Dict[str, Any]]):
@@ -792,7 +793,7 @@ def on_error(exc: Exception):
792793

793794
return id_pools, next_ids, time.time() - start
794795
finally:
795-
reset_arcadedb_async_loader(db)
796+
reset_arcadedb_async_loader(db, async_exec)
796797

797798

798799
def load_tables(

bindings/python/examples/08_stackoverflow_tables_olap.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -627,10 +627,11 @@ def configure_arcadedb_async_loader(db, batch_size: int, parallelism: int = 1):
627627
return async_exec
628628

629629

630-
def reset_arcadedb_async_loader(db):
631-
db.async_executor().wait_completion()
630+
def reset_arcadedb_async_loader(db, async_exec):
631+
async_exec.wait_completion()
632+
async_exec.close()
632633
db.set_read_your_writes(True)
633-
db.async_executor().set_transaction_use_wal(True)
634+
async_exec.set_transaction_use_wal(True)
634635

635636

636637
def sqlite_type(field_type: str) -> str:
@@ -1587,7 +1588,7 @@ def on_error(exc: Exception):
15871588
print(f" {count:,} rows in {elapsed:.2f}s")
15881589
load_total = time.time() - load_start
15891590
finally:
1590-
reset_arcadedb_async_loader(db)
1591+
reset_arcadedb_async_loader(db, async_exec)
15911592

15921593
load_counts_start = time.time()
15931594
table_counts_after_load = count_table_rows_arcadedb(db)

bindings/python/examples/13_stackoverflow_hybrid_queries.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -495,10 +495,11 @@ def configure_arcadedb_async_loader(db, batch_size: int, parallelism: int = 1):
495495
return async_exec
496496

497497

498-
def reset_arcadedb_async_loader(db):
499-
db.async_executor().wait_completion()
498+
def reset_arcadedb_async_loader(db, async_exec):
499+
async_exec.wait_completion()
500+
async_exec.close()
500501
db.set_read_your_writes(True)
501-
db.async_executor().set_transaction_use_wal(True)
502+
async_exec.set_transaction_use_wal(True)
502503

503504

504505
def load_table_arcadedb_async(
@@ -1721,7 +1722,7 @@ def on_error(exc: Exception):
17211722
)
17221723
load_time = time.time() - load_start
17231724
finally:
1724-
reset_arcadedb_async_loader(db)
1725+
reset_arcadedb_async_loader(db, async_exec)
17251726

17261727
index_time = create_indexes_with_retry(
17271728
db,

bindings/python/examples/16_import_database_vs_transactional_graph_ingest.py

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,10 @@
1111
1212
Goal: compare graph ingest speed for equivalent synthetic data shape.
1313
14-
Observed benchmark result (2026-03-19):
14+
Observed benchmark result (2026-03-24):
1515
For:
16-
- vertices=2,000,000
17-
- edges=2,000,000
16+
- vertices=5,000,000
17+
- edges=5,000,000
1818
- vertex-int-props=10
1919
- vertex-str-props=10
2020
- edge-int-props=10
@@ -24,13 +24,15 @@
2424
- heap-size=8g
2525
2626
Measured ingest times:
27-
- Transactional (`single-threaded`): 253.615s
28-
- GraphBatch (`single-threaded`, `--parallel 1`): 177.150s
29-
- GraphBatch (`4 threads`, `--parallel 4`): 187.681s
30-
- GraphBatch (`8 threads`, `--parallel 8`): 134.836s
31-
- Async SQL (`single-threaded`, `--async-parallel 1`): 230.192s
32-
- IMPORT DATABASE (`single-threaded`, `--parallel 1`): 444.357s
33-
- IMPORT DATABASE (`4 threads`, `--parallel 4`): 336.206s
27+
- Transactional (`single-threaded`, `1 thread`): 575.078s
28+
- Async SQL (`single-threaded`, `--async-parallel 1`): 701.080s
29+
- GraphBatch (`single-threaded`, `--parallel 1`): 507.983s
30+
- GraphBatch (`4 threads`, `--parallel 4`): 359.672s
31+
- IMPORT DATABASE (`single-threaded`, `--parallel 1`): 453.481s
32+
- IMPORT DATABASE (`4 threads`, `--parallel 4`): 275.325s
33+
34+
Logical parity:
35+
- All four methods produced the same final graph output
3436
3537
Known limitation:
3638
Current `IMPORT DATABASE` behavior can vary by import path and data shape. In some

bindings/python/tests/README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ For detailed test documentation, examples, and best practices, see the **[Testin
77
## Quick Stats
88

99
- **27 test files** in the current suite
10-
-**Current package**: 277 passed
10+
-**Current package**: 279 passed
1111
- Package includes all ArcadeDB features (SQL, OpenCypher, Studio)
1212

1313
## Running Tests
@@ -39,7 +39,7 @@ pytest -k "transaction" -v
3939
| `test_server_patterns.py` | 6 | Embedded, server-managed, HTTP performance |
4040
| `test_import_database.py` | 13 | SQL `IMPORT DATABASE`, CSV/XML/Neo4j and restore flows |
4141
| `test_docs_examples.py` | 7 grouped tests | Validates runnable Python snippets from installation, quickstart, query, and graph docs |
42-
| `test_cypher.py` | 1 | OpenCypher query language |
42+
| `test_cypher.py` | 18 | OpenCypher query language, path modes, and planner regressions |
4343

4444
## Documentation Links
4545

bindings/python/tests/test_cypher.py

Lines changed: 110 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,21 @@ def _seed_graph(db) -> None:
3939
)
4040

4141

42+
def _seed_path_mode_graph(db) -> None:
43+
db.command("sql", "CREATE VERTEX TYPE Node")
44+
db.command("sql", "CREATE EDGE TYPE LINK")
45+
46+
with db.transaction():
47+
db.command(
48+
"opencypher",
49+
"CREATE (a:Node {name: 'A'})-[:LINK]->(b:Node {name: 'B'})"
50+
"-[:LINK]->(c:Node {name: 'C'})"
51+
"-[:LINK]->(d:Node {name: 'D'})"
52+
"-[:LINK]->(a),"
53+
"(a)-[:LINK]->(e:Node {name: 'E'})",
54+
)
55+
56+
4257
def test_opencypher_basic_match(temp_db_path):
4358
"""Test basic OpenCypher MATCH/WHERE."""
4459
with arcadedb.create_database(temp_db_path) as db:
@@ -87,19 +102,108 @@ def test_opencypher_variable_length_path(temp_db_path):
87102
assert names == ["Bob", "Charlie", "David"]
88103

89104

105+
def test_opencypher_trail_is_default_path_mode(temp_db_path):
106+
"""Test that TRAIL remains the default path mode for variable-length traversals."""
107+
with arcadedb.create_database(temp_db_path) as db:
108+
_ensure_opencypher(db)
109+
_seed_path_mode_graph(db)
110+
111+
trail_names = [
112+
record.get("name")
113+
for record in db.query(
114+
"opencypher",
115+
"MATCH TRAIL (a:Node {name: 'A'})-[:LINK*1..5]->(b) "
116+
"RETURN b.name AS name",
117+
)
118+
]
119+
default_names = [
120+
record.get("name")
121+
for record in db.query(
122+
"opencypher",
123+
"MATCH (a:Node {name: 'A'})-[:LINK*1..5]->(b) " "RETURN b.name AS name",
124+
)
125+
]
126+
127+
assert len(default_names) == len(trail_names)
128+
assert "A" in trail_names
129+
130+
131+
def test_opencypher_acyclic_blocks_vertex_revisit(temp_db_path):
132+
"""Test that ACYCLIC traversal does not revisit vertices on a cycle."""
133+
with arcadedb.create_database(temp_db_path) as db:
134+
_ensure_opencypher(db)
135+
_seed_path_mode_graph(db)
136+
137+
names = [
138+
record.get("name")
139+
for record in db.query(
140+
"opencypher",
141+
"MATCH ACYCLIC (a:Node {name: 'A'})-[:LINK*1..5]->(b) "
142+
"RETURN b.name AS name",
143+
)
144+
]
145+
146+
assert "A" not in names
147+
assert {"B", "C", "D", "E"}.issubset(set(names))
148+
149+
150+
def test_opencypher_walk_produces_more_results_than_trail(temp_db_path):
151+
"""Test that WALK allows more variable-length matches than TRAIL on a cycle."""
152+
with arcadedb.create_database(temp_db_path) as db:
153+
_ensure_opencypher(db)
154+
_seed_path_mode_graph(db)
155+
156+
walk_count = len(
157+
list(
158+
db.query(
159+
"opencypher",
160+
"MATCH WALK (a:Node {name: 'A'})-[:LINK*1..6]->(b) RETURN b",
161+
)
162+
)
163+
)
164+
trail_count = len(
165+
list(
166+
db.query(
167+
"opencypher",
168+
"MATCH TRAIL (a:Node {name: 'A'})-[:LINK*1..6]->(b) RETURN b",
169+
)
170+
)
171+
)
172+
173+
assert walk_count > trail_count
174+
175+
176+
def test_opencypher_walk_requires_max_hops(temp_db_path):
177+
"""Test that WALK requires an explicit maximum hop bound."""
178+
with arcadedb.create_database(temp_db_path) as db:
179+
_ensure_opencypher(db)
180+
_seed_path_mode_graph(db)
181+
182+
with pytest.raises(Exception, match="WALK"):
183+
list(
184+
db.query(
185+
"opencypher",
186+
"MATCH WALK (a:Node {name: 'A'})-[:LINK*]->(b) RETURN b",
187+
)
188+
)
189+
190+
90191
def test_opencypher_aggregation(temp_db_path):
91192
"""Test aggregation and ordering."""
92193
with arcadedb.create_database(temp_db_path) as db:
93194
_ensure_opencypher(db)
94195
_seed_graph(db)
95196

96-
result = db.query(
97-
"opencypher",
98-
"MATCH (p:Person)-[:WORKS_FOR]->(c:Company) "
99-
"WITH c, count(p) as employees "
100-
"RETURN c.name as company, employees ORDER BY employees DESC",
197+
rows = list(
198+
db.query(
199+
"opencypher",
200+
"MATCH (p:Person)-[:WORKS_FOR]->(c:Company) "
201+
"WITH c, count(p) as employees "
202+
"RETURN c.name as company, employees ORDER BY employees DESC",
203+
)
101204
)
102-
row = next(result)
205+
206+
row = rows[0]
103207

104208
assert row.get("company") == "Acme"
105209
assert row.get("employees") == 2

0 commit comments

Comments
 (0)