-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathgrpc_streaming.py
More file actions
153 lines (112 loc) · 3.79 KB
/
grpc_streaming.py
File metadata and controls
153 lines (112 loc) · 3.79 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
#!/usr/bin/env python3
"""
gRPC Streaming Example — High-performance real-time market data via gRPC.
This example demonstrates:
- Subscribing to trades, book updates, blocks, and the L2 order book
- Automatic reconnection handling
- Graceful shutdown
gRPC offers lower latency than WebSocket for high-frequency data.
Requirements:
pip install hyperliquid-sdk[grpc]
Usage:
export ENDPOINT="https://YOUR-ENDPOINT.hype-mainnet.quiknode.pro/YOUR-TOKEN"
python grpc_streaming.py
"""
import os
import signal
import sys
from hyperliquid_sdk import HyperliquidSDK
# Resolve the endpoint. An explicit command-line argument wins; otherwise the
# ENDPOINT environment variable is used, then QUICKNODE_ENDPOINT.
ENDPOINT = (
    sys.argv[1]
    if len(sys.argv) > 1
    else (os.environ.get("ENDPOINT") or os.environ.get("QUICKNODE_ENDPOINT"))
)

# Validate after resolution so that an empty argument (for example an unset
# "$VAR" expanded by the shell) is rejected exactly like a missing
# environment variable. The usage text goes to stderr because the script
# exits with a failure status.
if not ENDPOINT:
    usage_lines = (
        "Hyperliquid gRPC Streaming Example",
        "=" * 50,
        "",
        "Usage:",
        " export ENDPOINT='https://YOUR-ENDPOINT.quiknode.pro/TOKEN'",
        " python grpc_streaming.py",
        "",
        "Or pass the endpoint as the first argument:",
        " python grpc_streaming.py 'https://YOUR-ENDPOINT.quiknode.pro/TOKEN'",
    )
    print("\n".join(usage_lines), file=sys.stderr)
    sys.exit(1)
def on_trade(data):
    """Print a one-line summary of an incoming trade.

    Args:
        data: Mapping describing the trade. The keys read are ``coin``,
            ``px``, ``sz`` and ``side``; any of them may be absent.

    A side of ``"B"`` is shown as BUY and any other present side as SELL.
    Missing fields, and a price that cannot be converted to a float, are
    shown as ``?`` instead of being guessed or raising.
    """
    coin = data.get("coin", "?")
    sz = data.get("sz", "?")

    # Only label the trade BUY/SELL when a side was actually supplied; an
    # absent side must not be presented as a sell.
    side_code = data.get("side")
    if side_code is None:
        side = "?"
    elif side_code == "B":
        side = "BUY"
    else:
        side = "SELL"

    # The price may be absent, None or non-numeric. An exception raised here
    # would propagate to whoever invokes the callback, so fall back to a
    # placeholder instead.
    try:
        price = f"${float(data.get('px')):,.2f}"
    except (TypeError, ValueError):
        price = "$?"

    print(f"[TRADE] {coin}: {side} {sz} @ {price}")
def on_book_update(data):
    """Print the best bid and best ask from a book update.

    Args:
        data: Mapping with optional ``coin``, ``bids`` and ``asks`` keys.
            The first entry of each side is read through its ``price`` key.

    Nothing is printed unless both sides contain at least one level.
    """
    symbol = data.get("coin", "?")
    bid_levels = data.get("bids", [])
    ask_levels = data.get("asks", [])

    # A one-sided or empty book has no spread to show.
    if not (bid_levels and ask_levels):
        return

    # The first entry of each side is treated as the best level.
    top_bid = float(bid_levels[0].get("price", 0))
    top_ask = float(ask_levels[0].get("price", 0))
    print(f"[BOOK] {symbol}: Bid ${top_bid:,.2f} | Ask ${top_ask:,.2f}")
def on_l2_book(data):
    """Print how many top-of-book levels an L2 update carries.

    Args:
        data: Mapping with optional ``coin``, ``bids`` and ``asks`` keys.

    Each side is cut to its first three entries before counting, so the
    reported numbers never exceed three.
    """
    depth = 3  # number of leading levels kept per side
    symbol = data.get("coin", "?")
    bid_count = len(data.get("bids", [])[:depth])
    ask_count = len(data.get("asks", [])[:depth])
    print(f"[L2] {symbol}: {bid_count} bid levels, {ask_count} ask levels")
def on_block(data):
    """Print the number of a received block.

    Args:
        data: Mapping whose ``block_number`` entry is shown; ``?`` is
            printed when the key is absent.
    """
    print(f"[BLOCK] #{data.get('block_number', '?')}")
def on_state_change(state):
    """Print the new connection state.

    Args:
        state: Object whose ``value`` attribute is the text to display.
    """
    label = state.value
    print(f"[STATE] {label}")
def on_reconnect(attempt: int):
    """Print which reconnection attempt is in progress.

    Args:
        attempt: Attempt counter to display.
    """
    message = f"[RECONNECT] Attempt {attempt}"
    print(message)
def on_error(error):
    """Print an error reported by the stream.

    Args:
        error: Value to display; it is rendered with its default string
            formatting.
    """
    message = f"[ERROR] {error}"
    print(message)
def on_close():
    """Print a notice that the stream has closed."""
    notice = "[CLOSED] gRPC stream stopped"
    print(notice)
def on_connect():
    """Print a notice that the stream is connected."""
    notice = "[CONNECTED] gRPC stream ready"
    print(notice)
def _redact_endpoint(url):
    """Return *url* reduced to ``scheme://host/...`` for display.

    The endpoint form shown in this script's usage text carries the access
    token in the URL path, so the path, query and any ``user:password@``
    prefix are dropped before the value is printed.
    """
    from urllib.parse import urlsplit

    try:
        parts = urlsplit(url)
    except ValueError:
        # urlsplit rejects some malformed hosts; never echo the raw value.
        return "<endpoint hidden>"

    host = parts.netloc.rpartition("@")[2]  # strip credentials, keep host[:port]
    if not (parts.scheme and host):
        return "<endpoint hidden>"
    return f"{parts.scheme}://{host}/..."


def main():
    """Subscribe to the example streams and run until interrupted.

    Creates the SDK from ``ENDPOINT``, attaches the module's callbacks to the
    gRPC stream, subscribes to BTC/ETH trades, BTC book updates, the ETH L2
    book and blocks, installs a SIGINT handler that stops the stream, and
    then runs the stream in the foreground.
    """
    print("Hyperliquid gRPC Streaming Example")
    print("=" * 50)
    # Show only scheme and host: the URL path holds the access token, which
    # must not end up in terminal scrollback or captured logs.
    print(f"Endpoint: {_redact_endpoint(ENDPOINT)}")
    print()

    # Create SDK and get gRPC stream
    sdk = HyperliquidSDK(ENDPOINT)
    stream = sdk.grpc

    # Configure callbacks
    stream.on_error = on_error
    stream.on_close = on_close
    stream.on_connect = on_connect
    stream.on_state_change = on_state_change
    stream.on_reconnect = on_reconnect

    # Subscribe to BTC and ETH trades
    stream.trades(["BTC", "ETH"], on_trade)
    print("Subscribed to: BTC, ETH trades")

    # Subscribe to book updates
    stream.book_updates(["BTC"], on_book_update)
    print("Subscribed to: BTC book updates")

    # Subscribe to L2 order book
    stream.l2_book("ETH", on_l2_book)
    print("Subscribed to: ETH L2 order book")

    # Subscribe to blocks
    stream.blocks(on_block)
    print("Subscribed to: blocks")

    # Handle Ctrl+C gracefully
    def signal_handler(sig, frame):
        print("\nShutting down gracefully...")
        stream.stop()
        sys.exit(0)

    signal.signal(signal.SIGINT, signal_handler)

    print()
    print("Streaming... Press Ctrl+C to stop")
    print("-" * 50)

    # Run the stream (blocking)
    # Use stream.start() for non-blocking background mode
    stream.run()


if __name__ == "__main__":
    main()