Skip to content

Commit b31ca2d

Browse files
committed
added script for fetching ripple data
1 parent 2fc45d9 commit b31ca2d

1 file changed

Lines changed: 350 additions & 0 deletions

File tree

Lines changed: 350 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,350 @@
1+
"""
2+
Ripple (XRP Ledger) snapshot extraction.
3+
4+
This module fetches monthly account state snapshots from xrplcluster.com,
5+
stores them in SQLite, and exports CSVs. Balances are stored in drops (int).
6+
"""
7+
8+
import requests
9+
import sqlite3
10+
import csv
11+
import os
12+
import logging
13+
import time
14+
from datetime import datetime, timezone, timedelta
15+
16+
17+
# ----------------------------------------------------------------------
18+
# Configuration
19+
# ----------------------------------------------------------------------
20+
RPC_URL = "https://xrplcluster.com"
21+
OUTPUT_DIR = "/mnt/output"
22+
os.makedirs(OUTPUT_DIR, exist_ok=True)
23+
DB_FILE = os.path.join(OUTPUT_DIR, "balances.db")
24+
25+
26+
logging.basicConfig(
27+
filename=os.path.join(OUTPUT_DIR, "run.log"),
28+
level=logging.INFO,
29+
format="%(asctime)s %(message)s"
30+
)
31+
32+
session = requests.Session()
33+
34+
# ----------------------------------------------------------------------
35+
# RPC utilities
36+
# ----------------------------------------------------------------------
37+
38+
def rpc_call(payload, retries: int = 3, delay: float = 0.4) -> dict:
39+
"""Perform an RPC call with retries."""
40+
for attempt in range(retries):
41+
try:
42+
resp = session.post(RPC_URL, json=payload, timeout=60)
43+
resp.raise_for_status()
44+
result = resp.json()
45+
time.sleep(delay)
46+
return result
47+
except Exception as e:
48+
logging.warning(f"RPC call failed (attempt {attempt+1}): {e}")
49+
time.sleep(1)
50+
raise RuntimeError("RPC call failed after retries")
51+
52+
53+
def get_complete_ledger_range() -> tuple[int, int]:
54+
"""
55+
Returns (min_index, max_index) from server_info.complete_ledgers.
56+
"""
57+
result = rpc_call({"method": "server_info", "params": [{}]})
58+
complete = result["result"]["info"]["complete_ledgers"]
59+
# Parse ranges and take overall min/max
60+
min_idx, max_idx = None, None
61+
for part in complete.split(","):
62+
a, b = part.split("-")
63+
a, b = int(a), int(b)
64+
min_idx = a if min_idx is None else min(min_idx, a)
65+
max_idx = b if max_idx is None else max(max_idx, b)
66+
return min_idx, max_idx
67+
68+
def get_ledger_close_time(ledger_index: int) -> datetime:
69+
"""Return the UTC close time of a given ledger index."""
70+
result = rpc_call({
71+
"method": "ledger",
72+
"params": [{
73+
"ledger_index": ledger_index,
74+
"transactions": False,
75+
"accounts": False,
76+
"full": False,
77+
"expand": False
78+
}]
79+
})
80+
ledger = result["result"]["ledger"]
81+
82+
ct_human = ledger.get("close_time_human")
83+
if ct_human and ct_human.endswith(" UTC"):
84+
clean = ct_human.replace(" UTC", "")
85+
# If fractional seconds are present, strip them off
86+
if "." in clean:
87+
clean = clean.split(".")[0]
88+
return datetime.strptime(clean, "%Y-%b-%d %H:%M:%S").replace(tzinfo=timezone.utc)
89+
90+
# Fallback: use ripple epoch seconds
91+
ct_ripple = ledger.get("close_time")
92+
if isinstance(ct_ripple, int):
93+
ripple_epoch = datetime(2000, 1, 1, tzinfo=timezone.utc)
94+
return ripple_epoch + timedelta(seconds=ct_ripple)
95+
96+
raise ValueError("close_time not available in ledger response")
97+
98+
# ----------------------------------------------------------------------
99+
# Database utilities
100+
# ----------------------------------------------------------------------
101+
102+
def init_db(year, month):
103+
"""Initialize SQLite tables for a given snapshot month."""
104+
conn = sqlite3.connect(DB_FILE)
105+
cur = conn.cursor()
106+
table_name = f"accounts_{year}_{month:02d}"
107+
cur.execute(f"""
108+
CREATE TABLE IF NOT EXISTS {table_name} (
109+
account TEXT PRIMARY KEY,
110+
balance TEXT
111+
)
112+
""")
113+
# Progress table
114+
cur.execute("""
115+
CREATE TABLE IF NOT EXISTS progress (
116+
year INTEGER,
117+
month INTEGER,
118+
ledger_index INTEGER,
119+
last_marker TEXT,
120+
PRIMARY KEY (year, month)
121+
)
122+
""")
123+
conn.commit()
124+
return conn, table_name
125+
126+
def get_exact_snapshot_ledger_index(year: int, month: int) -> int:
127+
"""
128+
Find the first validated ledger whose close_time >= first day of next month (UTC).
129+
This is the canonical anchor for the prior month's end-of-month snapshot.
130+
"""
131+
# Target: first day of next month at 00:00:00 UTC
132+
next_month_dt = datetime(year, month, 1, tzinfo=timezone.utc) + timedelta(days=32)
133+
target = datetime(next_month_dt.year, next_month_dt.month, 1, tzinfo=timezone.utc)
134+
135+
lo, hi = get_complete_ledger_range()
136+
137+
ans = None
138+
while lo <= hi:
139+
mid = (lo + hi) // 2
140+
ct = get_ledger_close_time(mid)
141+
if ct >= target:
142+
ans = mid
143+
hi = mid - 1
144+
else:
145+
lo = mid + 1
146+
147+
if ans is None:
148+
raise RuntimeError("No ledger found at or after the target time within complete range")
149+
return ans
150+
151+
152+
153+
def save_progress(conn, year, month, ledger_index, marker):
154+
cur = conn.cursor()
155+
cur.execute("""
156+
INSERT INTO progress (year, month, ledger_index, last_marker)
157+
VALUES (?, ?, ?, ?)
158+
ON CONFLICT(year, month) DO UPDATE
159+
SET ledger_index=excluded.ledger_index,
160+
last_marker=excluded.last_marker
161+
""", (year, month, ledger_index, marker))
162+
conn.commit()
163+
164+
def load_progress(conn, year, month):
165+
cur = conn.cursor()
166+
cur.execute("SELECT ledger_index, last_marker FROM progress WHERE year=? AND month=?", (year, month))
167+
row = cur.fetchone()
168+
return row if row else (None, None)
169+
170+
def upsert_balance(conn, table_name, account, balance):
171+
"""Insert or update account balance in the month’s table."""
172+
cur = conn.cursor()
173+
cur.execute(f"""
174+
INSERT INTO {table_name} (account, balance)
175+
VALUES (?, ?)
176+
ON CONFLICT(account) DO UPDATE SET balance=excluded.balance
177+
""", (account, balance))
178+
179+
# ----------------------------------------------------------------------
180+
# Snapshots
181+
# ----------------------------------------------------------------------
182+
183+
def fetch_snapshot(ledger_index, conn, table_name, year, month):
184+
payload = {
185+
"method": "ledger_data",
186+
"params": [{
187+
"ledger_index": ledger_index,
188+
"type": "account",
189+
"limit": 1000
190+
}]
191+
}
192+
193+
# Resume from last marker if available
194+
last_ledger, last_marker = load_progress(conn, year, month)
195+
if last_marker:
196+
payload["params"][0]["marker"] = last_marker
197+
logging.info(f"Resuming from marker {last_marker}")
198+
199+
total_accounts = 0
200+
201+
while True:
202+
result = rpc_call(payload)
203+
state = result.get("result", {}).get("state", [])
204+
for entry in state:
205+
if entry["LedgerEntryType"] == "AccountRoot":
206+
account = entry["Account"]
207+
balance_drops = int(entry["Balance"])
208+
#balance_xrp = f"{balance_drops / 1_000_000:.6f}"
209+
#upsert_balance(conn, table_name, account, balance_xrp)
210+
upsert_balance(conn, table_name, account, balance_drops)
211+
total_accounts += 1
212+
if total_accounts % 1000 == 0:
213+
logging.info(f"{year} - {month} Total accounts saved: {total_accounts}")
214+
215+
# Commit after each page
216+
conn.commit()
217+
218+
marker = result.get("result", {}).get("marker")
219+
save_progress(conn, year, month, ledger_index, marker)
220+
221+
if not marker:
222+
break
223+
payload["params"][0]["marker"] = marker
224+
225+
logging.info(f"Snapshot complete. Final total accounts saved: {total_accounts}")
226+
227+
228+
def export_month(year: int, month: int):
229+
230+
# Resolve exact anchor: first validated ledger of the next month (UTC)
231+
ledger_index = get_exact_snapshot_ledger_index(year, month)
232+
close_time = get_ledger_close_time(ledger_index)
233+
234+
logging.info(f"Fetching snapshot for {year}-{month:02d} at ledger {ledger_index} (close_time {close_time.isoformat()})")
235+
236+
# Compute the first day of the next month
237+
snapshot_date = datetime(year, month, 1)
238+
next_month_date = snapshot_date.replace(day=28) + timedelta(days=4)
239+
next_month_date = next_month_date.replace(day=1)
240+
241+
filename = os.path.join(OUTPUT_DIR, f"ripple_{next_month_date.strftime('%Y-%m-%d')}_raw_data.csv")
242+
conn, table_name = init_db(year, month)
243+
ensure_metadata_table(conn)
244+
record_snapshot_metadata(conn, year, month, ledger_index, close_time)
245+
246+
try:
247+
fetch_snapshot(ledger_index, conn, table_name, year, month)
248+
except Exception as e:
249+
logging.error(f"Error fetching ledger {ledger_index}: {e}")
250+
conn.close()
251+
return
252+
253+
# Export final balances to CSV
254+
cur = conn.cursor()
255+
cur.execute(f"SELECT account, balance FROM {table_name}")
256+
rows = cur.fetchall()
257+
with open(filename, "w", newline="") as f:
258+
writer = csv.writer(f)
259+
for row in rows:
260+
writer.writerow(row)
261+
262+
logging.info(f"Finished. Saved {len(rows)} accounts to {filename}")
263+
conn.close()
264+
265+
def export_range(start_year: int, start_month: int, end_year: int, end_month: int):
266+
"""
267+
Export multiple months in sequence, from (start_year, start_month)
268+
through (end_year, end_month), inclusive.
269+
"""
270+
year, month = start_year, start_month
271+
while (year < end_year) or (year == end_year and month <= end_month):
272+
logging.info(f"Starting export for {year}-{month:02d}")
273+
export_month(year, month)
274+
# increment month
275+
if month == 12:
276+
year += 1
277+
month = 1
278+
else:
279+
month += 1
280+
281+
282+
def export_csv_from_sqlite(year: int, month: int):
283+
"""
284+
Export account balances for a given year and month from SQLite to CSV.
285+
Reads from table accounts_YYYY_MM and writes to /mnt/output/xrp_MM_YYYY.csv
286+
"""
287+
table_name = f"accounts_{year}_{month:02d}"
288+
csv_filename = f"xrp_{month:02d}_{year}.csv"
289+
csv_path = os.path.join(OUTPUT_DIR, csv_filename)
290+
291+
try:
292+
conn = sqlite3.connect(DB_FILE)
293+
cur = conn.cursor()
294+
295+
# Check if table exists
296+
cur.execute("SELECT name FROM sqlite_master WHERE type='table' AND name=?", (table_name,))
297+
if not cur.fetchone():
298+
logging.error(f"Table {table_name} does not exist in {DB_FILE}")
299+
conn.close()
300+
return
301+
302+
# Query all rows
303+
cur.execute(f"SELECT account, balance FROM {table_name}")
304+
rows = cur.fetchall()
305+
306+
# Write to CSV
307+
with open(csv_path, "w", newline="") as f:
308+
writer = csv.writer(f)
309+
writer.writerow(["account", "balance"])
310+
writer.writerows(rows)
311+
312+
logging.info(f"Successfully exported {len(rows)} rows to {csv_path}")
313+
conn.close()
314+
except Exception as e:
315+
logging.error(f"Failed to export CSV for {year}-{month:02d}: {e}")
316+
317+
318+
def ensure_metadata_table(conn):
319+
cur = conn.cursor()
320+
cur.execute("""
321+
CREATE TABLE IF NOT EXISTS snapshot_metadata (
322+
year INTEGER,
323+
month INTEGER,
324+
ledger_index INTEGER,
325+
close_time TEXT,
326+
PRIMARY KEY (year, month)
327+
)
328+
""")
329+
conn.commit()
330+
331+
def record_snapshot_metadata(conn, year: int, month: int, ledger_index: int, close_time: datetime):
332+
cur = conn.cursor()
333+
cur.execute("""
334+
INSERT INTO snapshot_metadata (year, month, ledger_index, close_time)
335+
VALUES (?, ?, ?, ?)
336+
ON CONFLICT(year, month) DO UPDATE
337+
SET ledger_index = excluded.ledger_index,
338+
close_time = excluded.close_time
339+
""", (year, month, ledger_index, close_time.isoformat()))
340+
conn.commit()
341+
logging.info(f"Recorded metadata: {year}-{month:02d} → ledger {ledger_index}, close_time {close_time.isoformat()}")
342+
343+
344+
345+
# ----------------------------------------------------------------------
346+
# Entry point
347+
# ----------------------------------------------------------------------
348+
349+
if __name__ == "__main__":
350+
export_month(2025, 10) # Export October 2025 snapshot

0 commit comments

Comments
 (0)