-
Notifications
You must be signed in to change notification settings - Fork 678
Expand file tree
/
Copy pathrolling_window_ticker.py
More file actions
45 lines (35 loc) · 1.24 KB
/
rolling_window_ticker.py
File metadata and controls
45 lines (35 loc) · 1.24 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import asyncio
import os
import logging
from binance_sdk_spot.spot import (
Spot,
SPOT_WS_STREAMS_PROD_URL,
ConfigurationWebSocketStreams,
)
from binance_sdk_spot.websocket_streams.models import RollingWindowTickerWindowSizeEnum
# Emit INFO-level (and above) records through the root logger.
logging.basicConfig(level=logging.INFO)

# Stream endpoint: the STREAM_URL environment variable takes precedence;
# otherwise fall back to the production URL constant imported from the SDK.
configuration_ws_streams = ConfigurationWebSocketStreams(
    stream_url=os.environ.get("STREAM_URL", SPOT_WS_STREAMS_PROD_URL),
)

# Spot client built with the WebSocket Streams configuration above.
client = Spot(config_ws_streams=configuration_ws_streams)
async def rolling_window_ticker(symbol="bnbusdt", window_size=None, listen_seconds=5):
    """Subscribe to a rolling window ticker stream, print messages, then clean up.

    Opens a WebSocket Streams connection through the module-level ``client``,
    subscribes to the rolling window ticker stream, prints every message
    delivered to the ``"message"`` handler for ``listen_seconds`` seconds,
    and then unsubscribes. The connection, if one was created, is closed in
    the ``finally`` clause. Any ``Exception`` raised along the way is logged
    (with traceback) instead of being propagated.

    Args:
        symbol: Symbol passed to the stream subscription. Defaults to
            ``"bnbusdt"``.
        window_size: Window size value passed to the stream subscription.
            When ``None``, the value of
            ``RollingWindowTickerWindowSizeEnum["WINDOW_SIZE_1h"]`` is used.
        listen_seconds: Seconds to wait between subscribing and
            unsubscribing. Defaults to ``5``.
    """
    connection = None
    try:
        # Resolve the default inside the ``try`` so a failed enum lookup is
        # logged like any other error rather than escaping the function.
        if window_size is None:
            window_size = RollingWindowTickerWindowSizeEnum["WINDOW_SIZE_1h"].value

        connection = await client.websocket_streams.create_connection()

        stream = await connection.rolling_window_ticker(
            symbol=symbol,
            window_size=window_size,
        )
        stream.on("message", lambda data: print(f"{data}"))

        # Keep the subscription open for the requested time, then leave the
        # stream explicitly before the connection is torn down.
        await asyncio.sleep(listen_seconds)
        await stream.unsubscribe()
    except Exception as e:
        # Lazy %-formatting keeps the original message text; exc_info adds
        # the traceback so the failure site is not lost.
        logging.error("rolling_window_ticker() error: %s", e, exc_info=True)
    finally:
        # Identity check: close whenever a connection object was obtained,
        # regardless of how that object evaluates in a boolean context.
        if connection is not None:
            await connection.close_connection(close_session=True)
# Script entry point: run the example coroutine to completion when this file
# is executed directly (not when it is imported).
if __name__ == "__main__":
    example_coro = rolling_window_ticker()
    asyncio.run(example_coro)