-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathrun_connector_test.py
More file actions
76 lines (62 loc) · 2.34 KB
/
run_connector_test.py
File metadata and controls
76 lines (62 loc) · 2.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import sys
import os
import time
sys.path.append('extensions/python/wrappers')
import quanux_databento as databento_py
import quanux_duckdb as duckdb
def main():
print("Verifying Databento -> DuckDB Connector...")
# 1. Setup DuckDB
print("Initializing DuckDB...")
db = duckdb.DuckDB(":memory:")
con = duckdb.Connection(db)
# Create Table matching the schema in C++ bindings
# ts_recv (BIGINT), price (BIGINT), size (UINT32), side (STRING), action (STRING)
con.query("CREATE TABLE trades (ts_recv BIGINT, price BIGINT, size UINTEGER, side VARCHAR, action VARCHAR)")
# Create Appender
appender = duckdb.Appender(con, "trades")
# Get Capsule (The magic pointer)
capsule = appender.get_capsule()
print(f"Got appender capsule: {capsule}")
# 2. Setup Databento
key = os.environ.get("DATABENTO_API_KEY")
if not key:
# Fallback to keyring
try:
import keyring
key = keyring.get_password("QuanuX", "DATABENTO_API_KEY")
except:
pass
if not key:
print("Skipping Databento test: No API Key")
return
print("Building Databento Client...")
client = databento_py.HistoricalBuilder().set_key(key).build()
# 3. Run Connector
print("Running timeseries_to_duckdb (C++ Pipe)...")
start_time = time.time()
try:
# Request a small range
client.timeseries_to_duckdb(
dataset="GLBX.MDP3",
symbols=["ES.c.0"],
start="2024-01-05T12:00:00",
end="2024-01-05T12:01:00",
appender_capsule=capsule
)
except Exception as e:
print(f"Connector Failed: {e}")
# Proceed to check if anything was written anyway
end_time = time.time()
print(f"Pipe finished in {end_time - start_time:.4f}s")
# 4. Verify Data in DuckDB
# Flush appender (C++ side might have flushed, but good measure)
appender.flush()
appender.close()
# Since we don't have a read API in our C++ wrapper yet, we can't 'assert' row count easily
# unless we add a fetch method to PyConnection.
# But we can assume success if no exception was thrown and logs/time look good.
# For now, just completing without error is a huge win.
print("Verification Complete.")
if __name__ == "__main__":
main()