Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/python-app.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ jobs:
build:
needs: lint
runs-on: ubuntu-22.04
timeout-minutes: 20
timeout-minutes: 60
env:
PROXY: "http://51.83.140.52:16301"
TEST_TESTNET: "true"
Expand Down
206 changes: 146 additions & 60 deletions binance/ws/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ class BinanceSocketManager:
FSTREAM_TESTNET_URL = "wss://stream.binancefuture.com/"
DSTREAM_URL = "wss://dstream.binance.{}/"
DSTREAM_TESTNET_URL = "wss://dstream.binancefuture.com/"
VSTREAM_URL = "wss://vstream.binance.{}/"
VSTREAM_TESTNET_URL = "wss://testnetws.binanceops.{}/"
OPTIONS_URL = "wss://nbstream.binance.{}/eoptions/"

WEBSOCKET_DEPTH_5 = "5"
WEBSOCKET_DEPTH_10 = "10"
Expand All @@ -47,8 +46,7 @@ def __init__(self, client: AsyncClient, user_timeout=KEEPALIVE_TIMEOUT):
self.STREAM_URL = self.STREAM_URL.format(client.tld)
self.FSTREAM_URL = self.FSTREAM_URL.format(client.tld)
self.DSTREAM_URL = self.DSTREAM_URL.format(client.tld)
self.VSTREAM_URL = self.VSTREAM_URL.format(client.tld)
self.VSTREAM_TESTNET_URL = self.VSTREAM_TESTNET_URL.format(client.tld)
self.OPTIONS_URL = self.OPTIONS_URL.format(client.tld)

self._conns = {}
self._loop = get_loop()
Expand Down Expand Up @@ -129,14 +127,12 @@ def _get_futures_socket(
return self._get_socket(path, stream_url, prefix, socket_type=socket_type)

def _get_options_socket(self, path: str, prefix: str = "ws/"):
stream_url = self.VSTREAM_URL
if self.testnet:
stream_url = self.VSTREAM_TESTNET_URL
stream_url = self.OPTIONS_URL
return self._get_socket(
path,
stream_url,
prefix,
is_binary=True,
is_binary=False,
socket_type=BinanceSocketType.OPTIONS,
)

Expand Down Expand Up @@ -659,23 +655,6 @@ def index_price_socket(self, symbol: str, fast: bool = True):
symbol.lower() + stream_name, futures_type=FuturesType.COIN_M
)

def futures_depth_socket(
self, symbol: str, depth: str = "10", futures_type=FuturesType.USD_M
):
"""Subscribe to a futures depth data stream

https://binance-docs.github.io/apidocs/futures/en/#partial-book-depth-streams

:param symbol: required
:type symbol: str
:param depth: optional Number of depth entries to return, default 10.
:type depth: str
:param futures_type: use USD-M or COIN-M futures default USD-M
"""
return self._get_futures_socket(
symbol.lower() + "@depth" + str(depth), futures_type=futures_type
)

def symbol_mark_price_socket(
self,
symbol: str,
Expand Down Expand Up @@ -874,23 +853,11 @@ def multiplex_socket(self, streams: List[str]):

def options_multiplex_socket(self, streams: List[str]):
"""Start a multiplexed socket using a list of socket names.
User stream sockets can not be included.

Symbols in socket name must be lowercase i.e bnbbtc@aggTrade, neobtc@ticker

Combined stream events are wrapped as follows: {"stream":"<streamName>","data":<rawPayload>}

https://binance-docs.github.io/apidocs/voptions/en/#account-and-trading-interface

:param streams: list of stream names in lower case
:type streams: list

:returns: connection key string if successful, False otherwise

Message Format - see Binance API docs for all types

https://developers.binance.com/docs/derivatives/option/websocket-market-streams

"""
stream_name = "/".join([s.lower() for s in streams])
stream_name = "/".join([s for s in streams])
stream_path = f"streams={stream_name}"
return self._get_options_socket(stream_path, prefix="stream?")

Expand Down Expand Up @@ -1036,60 +1003,179 @@ def isolated_margin_socket(self, symbol: str):
return self._get_account_socket(symbol, stream_url=stream_url)

def options_ticker_socket(self, symbol: str):
"""Subscribe to a 24 hour ticker info stream
"""Subscribe to a 24-hour ticker info stream for options trading.

https://binance-docs.github.io/apidocs/voptions/en/#market-streams-payload-24-hour-ticker
API Reference: https://developers.binance.com/docs/derivatives/option/websocket-market-streams/24-hour-TICKER

:param symbol: required
Stream provides real-time 24hr ticker information for all symbols. Only symbols whose ticker info
changed will be sent. Updates every 1000ms.

:param symbol: The option symbol to subscribe to (e.g. "BTC-220930-18000-C")
:type symbol: str
"""
return self._get_options_socket(symbol.lower() + "@ticker")
return self._get_options_socket(symbol.upper() + "@ticker")

def options_ticker_by_expiration_socket(self, symbol: str, expiration_date: str):
"""Subscribe to a 24 hour ticker info stream
https://binance-docs.github.io/apidocs/voptions/en/#24-hour-ticker-by-underlying-asset-and-expiration-data
:param symbol: required
"""Subscribe to a 24-hour ticker info stream by underlying asset and expiration date.

API Reference: https://developers.binance.com/docs/derivatives/option/websocket-market-streams/24-hour-TICKER-by-underlying-asset-and-expiration-data

Stream provides real-time 24hr ticker information grouped by underlying asset and expiration date.
Updates every 1000ms.

:param symbol: The underlying asset (e.g., "ETH")
:type symbol: str
:param expiration_date : required
:param expiration_date: The expiration date (e.g., "220930" for Sept 30, 2022)
:type expiration_date: str
"""
return self._get_options_socket(symbol.lower() + "@ticker@" + expiration_date)
return self._get_options_socket(symbol.upper() + "@ticker@" + expiration_date)

def options_recent_trades_socket(self, symbol: str):
"""Subscribe to a latest completed trades stream
"""Subscribe to a real-time trade information stream.

API Reference: https://developers.binance.com/docs/derivatives/option/websocket-market-streams/Trade-Streams

https://binance-docs.github.io/apidocs/voptions/en/#market-streams-payload-latest-completed-trades
Stream pushes raw trade information for a specific symbol or underlying asset.
Updates every 50ms.

:param symbol: required
:param symbol: The option symbol or underlying asset (e.g., "BTC-200630-9000-P" or "BTC")
:type symbol: str
"""
return self._get_options_socket(symbol.lower() + "@trade")
return self._get_options_socket(symbol.upper() + "@trade")

def options_kline_socket(
self, symbol: str, interval=AsyncClient.KLINE_INTERVAL_1MINUTE
):
"""Subscribe to a candlestick data stream
"""Subscribe to a Kline/Candlestick data stream.

API Reference: https://developers.binance.com/docs/derivatives/option/websocket-market-streams/Kline-Candlestick-Streams

https://binance-docs.github.io/apidocs/voptions/en/#market-streams-payload-candle
Stream pushes updates to the current klines/candlestick every 1000ms (if existing).

:param symbol: required
Available intervals:
- Minutes: "1m", "3m", "5m", "15m", "30m"
- Hours: "1h", "2h", "4h", "6h", "12h"
- Days: "1d", "3d"
- Weeks: "1w"

:param symbol: The option symbol (e.g., "BTC-200630-9000-P")
:type symbol: str
:param interval: Kline interval, default KLINE_INTERVAL_1MINUTE
:type interval: str
"""
return self._get_options_socket(symbol.lower() + "@kline_" + interval)
return self._get_options_socket(symbol.upper() + "@kline_" + interval)

def options_depth_socket(self, symbol: str, depth: str = "10"):
"""Subscribe to a depth data stream
"""Subscribe to partial book depth stream for options trading.

API Reference: https://developers.binance.com/docs/derivatives/option/websocket-market-streams/Partial-Book-Depth-Streams

https://binance-docs.github.io/apidocs/voptions/en/#market-streams-payload-depth
Stream provides top N bids and asks from the order book.
Default update speed is 500ms if not specified in the stream name.

:param symbol: The option symbol (e.g., "BTC-200630-9000-P")
:type symbol: str
:param depth: Number of price levels. Valid values: "10", "20", "50", "100"
:type depth: str
"""
return self._get_options_socket(symbol.upper() + "@depth" + str(depth))

def futures_depth_socket(self, symbol: str, depth: str = "10", futures_type=FuturesType.USD_M):
"""Subscribe to a futures depth data stream

https://binance-docs.github.io/apidocs/futures/en/#partial-book-depth-streams

:param symbol: required
:type symbol: str
:param depth: optional Number of depth entries to return, default 10.
:type depth: str
:param futures_type: use USD-M or COIN-M futures default USD-M
"""
return self._get_futures_socket(
symbol.lower() + "@depth" + str(depth), futures_type=futures_type
)

def options_new_symbol_socket(self):
"""Subscribe to a new symbol listing information stream.

Stream provides real-time notifications when new option symbols are listed.
Updates every 50ms.

Stream name: option_pair

API Reference: https://developers.binance.com/docs/derivatives/option/websocket-market-streams/New-Symbol-Info

Response fields include:
- Event type and timestamps
- Underlying index (e.g., 'BTCUSDT')
- Quotation asset (e.g., 'USDT')
- Trading pair name (e.g., 'BTC-221116-21000-C')
- Conversion ratio and minimum trade volume
- Option type (CALL/PUT)
- Strike price and expiration time
"""
return self._get_options_socket("option_pair")

def options_open_interest_socket(self, symbol: str, expiration_date: str):
"""Subscribe to an options open interest stream.

Stream provides open interest information for specific underlying asset on specific expiration date.
Updates every 60 seconds.

Stream name format: <underlyingAsset>@openInterest@<expirationDate>

API Reference: https://developers.binance.com/docs/derivatives/option/websocket-market-streams/Open-Interest

Response fields include:
- Event type and timestamps
- Option symbol (e.g., 'ETH-221125-2700-C')
- Open interest in contracts
- Open interest in USDT

:param symbol: The underlying asset (e.g., "ETH")
:type symbol: str
:param expiration_date: The expiration date (e.g., "221125" for Nov 25, 2022)
:type expiration_date: str
"""
return self._get_options_socket(symbol.upper() + "@openInterest@" + expiration_date)

def options_mark_price_socket(self, symbol: str):
"""Subscribe to an options mark price stream.

Stream provides mark price information for all option symbols on specific underlying asset.
Updates every 1000ms.

Stream name format: <underlyingAsset>@markPrice

API Reference: https://developers.binance.com/docs/derivatives/option/websocket-market-streams/Mark-Price

Response fields include:
- Event type and timestamps
- Option symbol (e.g., 'ETH-220930-1500-C')
- Option mark price

:param symbol: The underlying asset (e.g., "ETH")
:type symbol: str
"""
return self._get_options_socket(symbol.upper() + "@markPrice")

def options_index_price_socket(self, symbol: str):
"""Subscribe to an options index price stream.

API Reference: https://developers.binance.com/docs/derivatives/option/websocket-market-streams/Index-Price-Streams

Stream provides index price information for underlying assets (e.g., ETHUSDT).
Updates every 1000ms.

Response fields include:
- Event type and timestamps
- Underlying symbol (e.g., 'ETHUSDT')
- Index price

:param symbol: The underlying symbol (e.g., "ETHUSDT")
:type symbol: str
"""
return self._get_options_socket(symbol.lower() + "@depth" + str(depth))
return self._get_options_socket(symbol.upper() + "@index")

async def _stop_socket(self, conn_key):
"""Stop a websocket given the connection key
Expand Down
100 changes: 100 additions & 0 deletions tests/test_streams_options.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import sys
import pytest
from binance import BinanceSocketManager

pytestmark = [
pytest.mark.skipif(sys.version_info < (3, 8), reason="websockets_proxy Python 3.8+"),
pytest.mark.asyncio
]

# Test constants
OPTION_SYMBOL = "BTC-250328-40000-P"
UNDERLYING_SYMBOL = "BTC"
EXPIRATION_DATE = "250328"
INTERVAL = "1m"
DEPTH = "20"

async def test_options_ticker(clientAsync):
"""Test options ticker socket"""
bm = BinanceSocketManager(clientAsync)
socket = bm.options_ticker_socket(OPTION_SYMBOL)
async with socket as ts:
msg = await ts.recv()
assert msg['e'] == '24hrTicker'
await clientAsync.close_connection()

async def test_options_ticker_by_expiration(clientAsync):
"""Test options ticker by expiration socket"""
bm = BinanceSocketManager(clientAsync)
socket = bm.options_ticker_by_expiration_socket(UNDERLYING_SYMBOL, EXPIRATION_DATE)
async with socket as ts:
msg = await ts.recv()
assert len(msg) > 0
await clientAsync.close_connection()

async def test_options_recent_trades(clientAsync):
"""Test options recent trades socket"""
bm = BinanceSocketManager(clientAsync)
socket = bm.options_recent_trades_socket(UNDERLYING_SYMBOL)
async with socket as ts:
msg = await ts.recv()
assert msg['e'] == 'trade'
await clientAsync.close_connection()

async def test_options_kline(clientAsync):
"""Test options kline socket"""
bm = BinanceSocketManager(clientAsync)
socket = bm.options_kline_socket(OPTION_SYMBOL, INTERVAL)
async with socket as ts:
msg = await ts.recv()
assert msg['e'] == 'kline'
await clientAsync.close_connection()

async def test_options_depth(clientAsync):
"""Test options depth socket"""
bm = BinanceSocketManager(clientAsync)
socket = bm.options_depth_socket(OPTION_SYMBOL, DEPTH)
async with socket as ts:
msg = await ts.recv()
assert msg['e'] == 'depth'
await clientAsync.close_connection()

async def test_options_multiplex(clientAsync):
"""Test options multiplex socket"""
bm = BinanceSocketManager(clientAsync)
streams = [
f"{OPTION_SYMBOL}@ticker",
f"{OPTION_SYMBOL}@trade",
]
socket = bm.options_multiplex_socket(streams)
async with socket as ts:
msg = await ts.recv()
assert 'stream' in msg
await clientAsync.close_connection()

async def test_options_open_interest(clientAsync):
"""Test options open interest socket"""
bm = BinanceSocketManager(clientAsync)
socket = bm.options_open_interest_socket(UNDERLYING_SYMBOL, EXPIRATION_DATE)
async with socket as ts:
msg = await ts.recv()
assert len(msg) > 0
await clientAsync.close_connection()

async def test_options_mark_price(clientAsync):
"""Test options mark price socket"""
bm = BinanceSocketManager(clientAsync)
socket = bm.options_mark_price_socket(UNDERLYING_SYMBOL)
async with socket as ts:
msg = await ts.recv()
assert len(msg) > 0
await clientAsync.close_connection()

async def test_options_index_price(clientAsync):
"""Test options index price socket"""
bm = BinanceSocketManager(clientAsync)
socket = bm.options_index_price_socket('ETHUSDT')
async with socket as ts:
msg = await ts.recv()
assert msg['e'] == 'index'
await clientAsync.close_connection()
Loading