-
Notifications
You must be signed in to change notification settings - Fork 30
Expand file tree
/
Copy pathfutures_ws_examples.py
More file actions
133 lines (106 loc) · 4.54 KB
/
futures_ws_examples.py
File metadata and controls
133 lines (106 loc) · 4.54 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# !/usr/bin/env python3
# -*- mode: python; coding: utf-8 -*-
#
# Copyright (C) 2023 Benjamin Thomas Schwertfeger
# All rights reserved.
# https://github.com/btschwertfeger
#
"""
Module that provides an example usage for the Kraken Futures websocket client.
"""
from __future__ import annotations
import asyncio
import logging
import os
import time
from kraken.futures import FuturesWSClient
logging.basicConfig(
format="%(asctime)s %(module)s,line: %(lineno)d %(levelname)8s | %(message)s",
datefmt="%Y/%m/%d %H:%M:%S",
level=logging.INFO,
)
logging.getLogger("requests").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)
LOG: logging.Logger = logging.getLogger(__name__)
clients = []
# Custom client
class Client(FuturesWSClient):
"""Can be used to create a custom trading strategy"""
async def on_message(self: Client, message: list | dict) -> None:
"""Receives the websocket messages"""
LOG.info(message)
# … apply your trading strategy in this class
# … you can also combine this with the Futures REST clients
async def main() -> None:
"""Create a client and subscribe to channels/feeds"""
key = os.getenv("FUTURES_API_KEY")
secret = os.getenv("FUTURES_SECRET_KEY")
try:
# _____Public_Websocket_Feeds___________________
client = Client()
clients.append(client)
await client.start()
# print(client.get_available_public_subscription_feeds())
products = ["PI_XBTUSD", "PF_SOLUSD"]
# subscribe to a public websocket feed
await client.subscribe(feed="ticker", products=products)
await client.subscribe(feed="book", products=products)
# await client.subscribe(feed='trade', products=products)
# await client.subscribe(feed='ticker_lite', products=products)
# await client.subscribe(feed='heartbeat')
# time.sleep(2)
# unsubscribe from a websocket feed
time.sleep(2) # in case subscribe is not done yet
# await client.unsubscribe(feed='ticker', products=['PI_XBTUSD'])
await client.unsubscribe(feed="ticker", products=["PF_XBTUSD"])
await client.unsubscribe(feed="book", products=products)
# ...
# _____Private_Websocket_Feeds_________________
if key and secret:
client_auth = Client(key=key, secret=secret)
clients.append(client_auth)
await client_auth.start()
# print(client_auth.get_available_private_subscription_feeds())
# subscribe to a private/authenticated websocket feed
await client_auth.subscribe(feed="fills")
await client_auth.subscribe(feed="open_positions")
# await client_auth.subscribe(feed='open_orders')
# await client_auth.subscribe(feed='open_orders_verbose')
# await client_auth.subscribe(feed='deposits_withdrawals')
# await client_auth.subscribe(feed='account_balances_and_margins')
# await client_auth.subscribe(feed='balances')
# await client_auth.subscribe(feed='account_log')
# await client_auth.subscribe(feed='notifications_auth')
# authenticated clients can also subscribe to public feeds
# await client_auth.subscribe(feed='ticker', products=['PI_XBTUSD', 'PF_ETHUSD'])
# time.sleep(1)
# unsubscribe from a private/authenticated websocket feed
await client_auth.unsubscribe(feed="fills")
await client_auth.unsubscribe(feed="open_positions")
# ...
while not client.exception_occur: # and not client_auth.exception_occur:
await asyncio.sleep(6)
finally:
# Close the sessions properly.
for _client in clients:
await _client.close()
if __name__ == "__main__":
asyncio.run(main())
# the websocket client will send {'event': 'asyncio.CancelledError'} via on_message
# so you can handle the behavior/next actions individually within you strategy
# ============================================================
# Alternative - as ContextManager:
# from kraken.futures import KrakenFuturesWSClient
# import asyncio
# async def on_message(message):
# print(message)
# async def main() -> None:
# async with KrakenFuturesWSClient(callback=on_message) as session:
# await session.subscribe(feed="ticker", products=["PF_XBTUSD"])
# while True:
# await asyncio.sleep(6)
# if __name__ == "__main__":
# try:
# asyncio.run(main())
# except KeyboardInterrupt:
# pass