-
Notifications
You must be signed in to change notification settings - Fork 11
Expand file tree
/
Copy pathtrading_simulator_with_algorithmic_traders.py
More file actions
4556 lines (4210 loc) · 210 KB
/
Copy pathtrading_simulator_with_algorithmic_traders.py
File metadata and controls
4556 lines (4210 loc) · 210 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
Trading Simulator in Python With Algorithmic Traders
Converted from Jupyter Notebook to a standalone Python 3.11+ script.
All functionality from the notebook has been retained and made runnable.
Features included:
- Order book, matching engine, FIX server integration (simplefix)
- Live market data via yfinance with broadcast to subscribers
- Market maker, synthetic liquidity provider
- Algorithmic traders: Momentum, EMA-based, Swing, Sentiment-analysis
- News fetching via NewsAPI for sentiment trader
- Backtesting harness
- CLI to run live or backtest modes
Notes:
- Ensure dependencies are installed: pandas, numpy, yfinance, simplefix, tensorflow, newsapi-python, statistics (stdlib), etc.
- The sentiment model file and NewsAPI key are required to enable the sentiment trader.
"""
from __future__ import annotations
from collections import deque
from bisect import insort # noqa: F401 (kept for parity with notebook; not used directly)
import argparse
import logging
import os
import json
import random
import socket
import threading
import time
import uuid
import math
from typing import Any, Deque, Dict, List, Optional, Tuple, Callable
import sys
import importlib
from dataclasses import dataclass, field
from typing import Optional
import numpy as np
import pandas as pd
import requests
try:
import simplefix # type: ignore
SIMPLEFIX_AVAILABLE = True
except Exception: # pragma: no cover - optional at runtime
simplefix = None # type: ignore
SIMPLEFIX_AVAILABLE = False
try:
import yahooquery as yq # type: ignore
YQ_AVAILABLE = True
except Exception: # pragma: no cover - optional at runtime
yq = None # type: ignore
YQ_AVAILABLE = False
# Optional fallback: yfinance
try:
import yfinance as yf # type: ignore
YF_AVAILABLE = True
except Exception:
YF_AVAILABLE = False
try:
import tensorflow as tf # type: ignore
from tensorflow.keras.models import load_model # type: ignore
TF_AVAILABLE = True
except Exception: # pragma: no cover - optional at runtime
tf = None # type: ignore
def load_model(*args, **kwargs): # type: ignore
raise RuntimeError("TensorFlow is not available; install tensorflow to use SentimentAnalysisTrader")
TF_AVAILABLE = False
try:
from newsapi import NewsApiClient # type: ignore
NEWSAPI_AVAILABLE = True
except Exception: # pragma: no cover - optional at runtime
NEWSAPI_AVAILABLE = False
# Database (SQLAlchemy) for persistence
try:
from sqlalchemy import Column, String, Float, Integer, DateTime, Text, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker
SQLA_AVAILABLE = True
except Exception:
SQLA_AVAILABLE = False
# Optional experiment tooling
try:
import optuna # type: ignore
OPTUNA_AVAILABLE = True
except Exception:
OPTUNA_AVAILABLE = False
try:
import mlflow # type: ignore
MLFLOW_AVAILABLE = True
except Exception:
MLFLOW_AVAILABLE = False
# Optional event backends
try:
import redis # type: ignore
REDIS_AVAILABLE = True
except Exception:
REDIS_AVAILABLE = False
try:
from confluent_kafka import Producer # type: ignore
KAFKA_AVAILABLE = True
except Exception:
KAFKA_AVAILABLE = False
# --------------------------------------------------------------------------------------
# Logging
# --------------------------------------------------------------------------------------
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# --------------------------------------------------------------------------------------
# Event Bus (Redis/Kafka optional publishers)
# --------------------------------------------------------------------------------------
class EventBus:
def __init__(self) -> None:
self._publishers: List[Callable[[str, Dict[str, Any]], None]] = []
def add_publisher(self, fn: Callable[[str, Dict[str, Any]], None]) -> None:
self._publishers.append(fn)
def publish(self, event_type: str, payload: Dict[str, Any]) -> None:
for fn in list(self._publishers):
try:
fn(event_type, payload)
except Exception:
logging.debug("Event publish failed", exc_info=True)
def make_redis_publisher(url: str, channel: str) -> Callable[[str, Dict[str, Any]], None]:
if not REDIS_AVAILABLE:
raise RuntimeError("redis is not available")
client = redis.from_url(url) # type: ignore
ch = str(channel)
def _pub(evt: str, data: Dict[str, Any]) -> None:
try:
client.publish(ch, json.dumps({"event": evt, "data": data}, default=str)) # type: ignore
except Exception:
logging.debug("Redis publish failed", exc_info=True)
return _pub
def make_kafka_publisher(bootstrap_servers: str, topic: str) -> Callable[[str, Dict[str, Any]], None]:
if not KAFKA_AVAILABLE:
raise RuntimeError("confluent_kafka is not available")
producer = Producer({'bootstrap.servers': bootstrap_servers}) # type: ignore
tp = str(topic)
def delivery_err(err, msg): # type: ignore
if err is not None:
logging.debug("Kafka delivery failed: %s", err)
def _pub(evt: str, data: Dict[str, Any]) -> None:
try:
producer.produce(tp, json.dumps({"event": evt, "data": data}, default=str), callback=delivery_err)
producer.poll(0)
except Exception:
logging.debug("Kafka publish failed", exc_info=True)
return _pub
# --------------------------------------------------------------------------------------
# FIX tags and mappings (retained from notebook)
# --------------------------------------------------------------------------------------
FIX_TAGS: Dict[int, str] = {
8: "BeginString",
35: "MsgType",
11: "ClOrdID",
54: "Side",
55: "Symbol",
44: "Price",
38: "OrderQty",
10: "Checksum",
41: "OrigClOrdID",
}
SIDE_MAPPING: Dict[str, str] = {
'1': 'Buy',
'2': 'Sell',
}
MSG_TYPE_MAPPING: Dict[str, str] = {
'D': 'NewOrderSingle',
'F': 'OrderCancelRequest',
}
# --------------------------------------------------------------------------------------
# Yahoo data fetch utilities (yahooquery) with retry + optional on-disk cache
# --------------------------------------------------------------------------------------
TICK_SIZE: Dict[str, float] = {}
LOT_SIZE: Dict[str, int] = {}
DECIMAL_PRECISION: Dict[str, int] = {}
INSTRUMENTS: Dict[str, Dict[str, Any]] = {}
def _tick_size_for_symbol(symbol: str) -> float:
return float(TICK_SIZE.get(symbol, 0.01))
def _normalize_price_to_tick(price: float, symbol: str) -> float:
tick = _tick_size_for_symbol(symbol)
if tick <= 0:
res = float(price)
else:
res = float(round(float(price) / tick) * tick)
# Apply instrument decimal precision if configured
prec = int(DECIMAL_PRECISION.get(symbol, 0))
if prec > 0:
return round(res, prec)
return res
def _lot_size_for_symbol(symbol: str) -> int:
return int(LOT_SIZE.get(symbol, 1))
def _normalize_quantity_to_lot(quantity: int, symbol: str) -> int:
lot = _lot_size_for_symbol(symbol)
if lot <= 1:
return int(quantity)
return int(quantity // lot * lot)
def _is_market_open(symbol: str, now_utc: Optional[pd.Timestamp] = None) -> bool:
cfg = INSTRUMENTS.get(symbol)
if not cfg:
return True
try:
tz = cfg.get('tz') or 'America/New_York'
open_str = cfg.get('open') or '09:30'
close_str = cfg.get('close') or '16:00'
holidays = set(cfg.get('holidays') or [])
now_utc = now_utc or pd.Timestamp.now(tz='UTC')
local = now_utc.tz_convert(tz)
date_key = local.strftime('%Y-%m-%d')
if date_key in holidays:
return False
t_open = pd.to_datetime(f"{date_key} {open_str}").tz_localize(tz)
t_close = pd.to_datetime(f"{date_key} {close_str}").tz_localize(tz)
return bool(t_open <= local <= t_close)
except Exception:
return True
def _ensure_cache_dir() -> str:
cache_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), ".cache")
try:
os.makedirs(cache_dir, exist_ok=True)
except Exception:
pass
return cache_dir
def _build_yf_session() -> requests.Session:
"""Create a shared requests session with caching and rate limiting if available."""
# Default plain session
session: requests.Session
try:
from requests_cache import CachedSession # type: ignore
try:
from requests_ratelimiter import LimiterSession # type: ignore
from pyrate_limiter import Duration, RequestRate, Limiter # type: ignore
class CachedLimiterSession(CachedSession, LimiterSession):
pass
session = CachedLimiterSession(
limiter=Limiter(RequestRate(2, Duration.SECOND * 5)),
backend="sqlite",
cache_name=os.path.join(_ensure_cache_dir(), "yfinance_cache"),
allowable_codes=(200,),
stale_if_error=True,
)
except Exception:
# Fallback to cache only
session = CachedSession(
cache_name=os.path.join(_ensure_cache_dir(), "yfinance_cache"),
backend="sqlite",
allowable_codes=(200,),
stale_if_error=True,
)
except Exception:
session = requests.Session()
# Set a UA header to align with yfinance behavior; set timeouts per request in calls
session.headers.update({
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36"
})
# Optional proxy support via environment variables HTTPS_PROXY/HTTP_PROXY already honored by requests
return session
_HTTP_SESSION: requests.Session = _build_yf_session()
def _cache_path_for_history(symbol: str, start: Optional[str], end: Optional[str], interval: Optional[str], period: Optional[str]) -> Tuple[str, str]:
cache_dir = _ensure_cache_dir()
name_parts = [symbol]
if start:
name_parts.append(start)
if end:
name_parts.append(end)
if interval:
name_parts.append(interval)
if period:
name_parts.append(period)
base = "yq_hist_" + "_".join(name_parts)
parquet_path = os.path.join(cache_dir, f"{base}.parquet")
csv_path = os.path.join(cache_dir, f"{base}.csv")
return parquet_path, csv_path
def _save_cache_df(df: pd.DataFrame, parquet_path: str, csv_path: str) -> None:
try:
# Try parquet first for efficiency
df.to_parquet(parquet_path, index=True)
except Exception:
try:
df.to_csv(csv_path, index=True)
except Exception:
pass
def _load_cache_df(parquet_path: str, csv_path: str) -> Optional[pd.DataFrame]:
if os.path.exists(parquet_path):
try:
return pd.read_parquet(parquet_path)
except Exception:
pass
if os.path.exists(csv_path):
try:
return pd.read_csv(csv_path, index_col=0, parse_dates=True)
except Exception:
pass
return None
def _normalize_yq_history_df(df: pd.DataFrame, symbol: str) -> pd.DataFrame:
if df is None or df.empty:
return df
# Handle MultiIndex (symbol, date) or symbol column
if isinstance(df.index, pd.MultiIndex) and 'symbol' in df.index.names:
try:
df = df.xs(symbol, level='symbol', drop_level=True)
except Exception:
pass
if 'symbol' in df.columns:
try:
df = df[df['symbol'] == symbol].drop(columns=['symbol'])
except Exception:
pass
# Rename columns to expected case
rename_map = {
'open': 'Open',
'high': 'High',
'low': 'Low',
'close': 'Close',
'adjclose': 'Adj Close',
'adj_close': 'Adj Close',
'volume': 'Volume',
}
actual_rename: Dict[str, str] = {}
for col in df.columns:
low = col.lower()
if low in rename_map:
actual_rename[col] = rename_map[low]
if actual_rename:
df = df.rename(columns=actual_rename)
return df
def _yq_history_with_retry(symbol: str, *, period: Optional[str] = None, interval: Optional[str] = None,
start: Optional[str] = None, end: Optional[str] = None,
max_retries: int = 5, base_backoff: float = 1.5) -> pd.DataFrame:
last_exc: Optional[Exception] = None
# Primary: yahooquery (if available)
if YQ_AVAILABLE:
for attempt in range(max_retries):
try:
if period is not None and interval is not None:
data = yq.Ticker(symbol, session=_HTTP_SESSION).history(period=period, interval=interval)
else:
# Date range daily
data = yq.Ticker(symbol, session=_HTTP_SESSION).history(start=start, end=end, interval='1d')
if data is not None and not data.empty:
data = _normalize_yq_history_df(data, symbol)
if data is not None and not data.empty:
return data
except Exception as exc:
last_exc = exc
logging.warning("yahooquery fetch failed (attempt %s/%s): %s", attempt + 1, max_retries, exc)
# Backoff with jitter
sleep_s = base_backoff ** attempt + random.uniform(0, 0.5)
time.sleep(sleep_s)
logging.error("yahooquery fetch failed or empty after %s attempts; falling back to yfinance if available", max_retries)
# Fallback: yfinance
if YF_AVAILABLE:
try:
if period is not None and interval is not None:
df = yf.Ticker(symbol).history(period=period, interval=interval)
else:
df = yf.download(symbol, start=start, end=end, interval='1d', progress=False)
if df is not None and not df.empty:
# Ensure DataFrame has expected columns and index
if 'Close' not in df.columns and 'close' in [c.lower() for c in df.columns]:
# yfinance generally returns capitalized columns, but guard just in case
rename_map = {c: c.capitalize() for c in df.columns}
df = df.rename(columns=rename_map)
return df
except Exception as exc:
last_exc = exc
logging.error("yfinance fallback failed: %s", exc)
# Neither provider succeeded
if last_exc:
raise last_exc
raise RuntimeError("Market data fetch returned empty data after retries (yahooquery/yfinance)")
# --------------------------------------------------------------------------------------
# Order and OrderBook
# --------------------------------------------------------------------------------------
@dataclass
class Order:
id: str
price: float
quantity: int
side: str # 'buy' or 'sell'
type: str # 'limit' or 'market'
symbol: str
tif: str = 'GTC' # 'GTC', 'IOC', 'FOK'
post_only: bool = False
timestamp: float = field(default_factory=lambda: time.time())
owner_id: str = "default"
expires_at: Optional[pd.Timestamp] = None
auction_only: bool = False
auction_phase: Optional[str] = None # 'open' or 'close'
class OrderBook:
"""In-memory limit order book for bids and asks with basic operations."""
def __init__(self) -> None:
self.bids: Dict[float, Deque[Order]] = {}
self.asks: Dict[float, Deque[Order]] = {}
self.order_map: Dict[str, Order] = {}
self._bid_prices: List[float] = [] # descending
self._ask_prices: List[float] = [] # ascending
def add_order(self, order: Order) -> None:
# Enforce tick size normalization
normalized_price = _normalize_price_to_tick(order.price, order.symbol)
order.price = normalized_price
if order.side == 'buy':
if order.price not in self.bids:
self.bids[order.price] = deque()
# insert into sorted bid prices (desc)
import bisect
pos = bisect.bisect_left([-p for p in self._bid_prices], -order.price)
self._bid_prices.insert(pos, order.price)
self.bids[order.price].append(order)
else:
if order.price not in self.asks:
self.asks[order.price] = deque()
# insert into sorted ask prices (asc)
import bisect
pos = bisect.bisect_left(self._ask_prices, order.price)
self._ask_prices.insert(pos, order.price)
self.asks[order.price].append(order)
self.order_map[order.id] = order
def remove_order(self, order_id: str) -> None:
if order_id not in self.order_map:
return
order = self.order_map[order_id]
if order.side == 'buy':
queue = self.bids.get(order.price)
if queue is not None and order in queue:
queue.remove(order)
if not queue:
del self.bids[order.price]
# remove from price list
try:
self._bid_prices.remove(order.price)
except ValueError:
pass
else:
queue = self.asks.get(order.price)
if queue is not None and order in queue:
queue.remove(order)
if not queue:
del self.asks[order.price]
try:
self._ask_prices.remove(order.price)
except ValueError:
pass
del self.order_map[order_id]
def cancel_order(self, order_id: str) -> None:
if order_id in self.order_map:
self.remove_order(order_id)
logging.info(f"Order {order_id} has been cancelled.")
else:
logging.warning(f"Order {order_id} not found for cancellation.")
def cancel_orders_by_owner(self, owner_id: str) -> int:
to_cancel: List[str] = []
for oid, order in list(self.order_map.items()):
try:
if getattr(order, 'owner_id', None) == owner_id:
to_cancel.append(oid)
except Exception:
pass
count = 0
for oid in to_cancel:
try:
self.cancel_order(oid)
count += 1
except Exception:
pass
return count
def modify_order(self, order_id: str, new_quantity: Optional[int] = None, new_price: Optional[float] = None) -> None:
if order_id not in self.order_map:
logging.warning(f"Order {order_id} not found for modification.")
return
order = self.order_map[order_id]
self.remove_order(order_id)
if new_quantity is not None:
order.quantity = int(new_quantity)
if new_price is not None:
order.price = float(new_price)
self.add_order(order)
logging.info(f"Order {order_id} has been modified.")
def get_best_bid(self) -> Optional[float]:
# Return highest bid price, cleaning up any stale price levels
while self._bid_prices:
top = self._bid_prices[0]
if top in self.bids and self.bids.get(top):
return top
# stale level with no queue; remove
try:
self._bid_prices.pop(0)
except Exception:
break
return None
def get_best_ask(self) -> Optional[float]:
# Return lowest ask price, cleaning up any stale price levels
while self._ask_prices:
top = self._ask_prices[0]
if top in self.asks and self.asks.get(top):
return top
# stale level with no queue; remove
try:
self._ask_prices.pop(0)
except Exception:
break
return None
def bids_to_dataframe(self) -> pd.DataFrame:
rows: List[Dict[str, Any]] = []
for price in sorted(self.bids.keys(), reverse=True):
for order in self.bids[price]:
rows.append({'Order ID': order.id, 'Price': order.price, 'Quantity': order.quantity})
return pd.DataFrame(rows)
def asks_to_dataframe(self) -> pd.DataFrame:
rows: List[Dict[str, Any]] = []
for price in sorted(self.asks.keys()):
for order in self.asks[price]:
rows.append({'Order ID': order.id, 'Price': order.price, 'Quantity': order.quantity})
return pd.DataFrame(rows)
def display_order_book(self) -> None:
logging.info("Order Book:")
bids_df = self.bids_to_dataframe()
asks_df = self.asks_to_dataframe()
logging.info("Bid Side:\n%s", bids_df.to_string(index=False) if not bids_df.empty else "<empty>")
logging.info("Ask Side:\n%s", asks_df.to_string(index=False) if not asks_df.empty else "<empty>")
# --------------------------------------------------------------------------------------
# Matching Engine
# --------------------------------------------------------------------------------------
class MatchingEngine:
"""Naive matching engine to cross incoming orders against resting book."""
def __init__(self, order_book: OrderBook) -> None:
self.order_book = order_book
self._lock = threading.RLock()
self._trade_subscribers: List[Callable[["Execution"], None]] = []
self.risk_manager: Optional[RiskManager] = None # type: ignore[name-defined]
# Latency/slippage modeling (used by backtests)
self.latency_ms: int = 0
self.slippage_bps_per_100_shares: float = 0.0
self._current_time: Optional[pd.Timestamp] = None
self._delayed_orders: List[Tuple[pd.Timestamp, Order]] = []
# Price band protection
self.price_band_bps: float = 0.0
self.band_reference: str = 'mid' # 'mid' or 'last'
self._last_trade_price_by_symbol: Dict[str, float] = {}
# Auction state
self.auction_mode: Optional[str] = None # None/'open'/'close'
self._auction_orders: List[Order] = []
# Fees/Rebates (bps)
self.taker_fee_bps: float = 0.0
self.maker_rebate_bps: float = 0.0
# Optional event logger
self.event_logger: Optional[EventLogger] = None # type: ignore[name-defined]
self.audit_logger: Optional[AuditLogger] = None # type: ignore[name-defined]
# Halted symbols
self._halted: set[str] = set()
# Optional submission queue for concurrency model
self.use_queue: bool = False
self.queue_max: int = 10000
self._order_queue: Deque[Order] = deque()
self._queue_cond = threading.Condition(self._lock)
self._loop_running: bool = False
self._loop_thread: Optional[threading.Thread] = None
# Snapshotting
self.snapshot_interval_sec: int = 0
self.snapshot_dir: Optional[str] = None
self._snapshot_thread: Optional[threading.Thread] = None
self._snapshot_running: bool = False
# Track last execution per symbol for adverse selection
self._last_exec_by_symbol: Dict[str, "Execution"] = {}
def subscribe_trades(self, callback: Callable[["Execution"], None]) -> None:
self._trade_subscribers.append(callback)
def _emit_trade(self, execution: "Execution") -> None:
# Track last trade price per symbol for banding
try:
self._last_trade_price_by_symbol[execution.symbol] = float(execution.price)
except Exception:
pass
# Event log with best bid/ask snapshot for TCA
try:
if self.event_logger is not None:
bb = self.order_book.get_best_bid()
ba = self.order_book.get_best_ask()
self.event_logger.log_execution(execution, best_bid=bb, best_ask=ba)
if self.audit_logger is not None:
self.audit_logger.log_execution(execution)
except Exception:
pass
for cb in list(self._trade_subscribers):
try:
cb(execution)
except Exception as exc:
logging.exception("Trade subscriber error: %s", exc)
# TCA logging: slippage vs mid and last trade
try:
mid = self._mid_price()
last = self.get_last_trade_price(execution.symbol)
slip_mid = None if mid is None else ((execution.price - mid) / mid * 10000.0 if execution.side == 'buy' else (mid - execution.price) / mid * 10000.0)
slip_last = None if last is None or last == 0 else ((execution.price - last) / last * 10000.0 if execution.side == 'buy' else (last - execution.price) / last * 10000.0)
if hasattr(self, 'tca_logger') and getattr(self, 'tca_logger') is not None:
self.tca_logger.log_tca(execution.timestamp, execution.symbol, execution.side, float(execution.price), mid, last, slip_mid, slip_last) # type: ignore[attr-defined]
# Adverse selection: compare prior exec price to current exec price for same symbol and log outcome
try:
prev = self._last_exec_by_symbol.get(execution.symbol)
if prev is not None:
next_price = float(execution.price)
adverse = bool((prev.side == 'buy' and next_price < float(prev.price)) or (prev.side == 'sell' and next_price > float(prev.price)))
self.tca_logger.log_tca_adv(prev.timestamp, prev.symbol, prev.side, float(prev.price), next_price, adverse) # type: ignore[attr-defined]
except Exception:
pass
# Update last exec for symbol
self._last_exec_by_symbol[execution.symbol] = execution
except Exception:
pass
def _reference_price(self, symbol: str) -> Optional[float]:
if self.band_reference == 'last':
return self._last_trade_price_by_symbol.get(symbol)
# default to mid
best_bid = self.order_book.get_best_bid()
best_ask = self.order_book.get_best_ask()
if best_bid is not None and best_ask is not None:
return float((best_bid + best_ask) / 2.0)
return self._last_trade_price_by_symbol.get(symbol)
def _mid_price(self) -> Optional[float]:
bb = self.order_book.get_best_bid()
ba = self.order_book.get_best_ask()
if bb is None or ba is None:
return None
return float((bb + ba) / 2.0)
def _within_price_band(self, order: Order) -> bool:
if self.price_band_bps <= 0 or order.type != 'limit':
return True
ref = self._reference_price(order.symbol)
if ref is None or ref <= 0:
return True
dev_bps = abs(order.price - ref) / ref * 10000.0
return dev_bps <= self.price_band_bps
def get_last_trade_price(self, symbol: str) -> Optional[float]:
return self._last_trade_price_by_symbol.get(symbol)
def _available_depth(self, side: str, limit_price: float) -> int:
# Returns total opposing quantity available up to limit price for sweeping
total = 0
if side == 'buy':
# consume asks up to limit_price
for px in list(self.order_book._ask_prices):
if px is None or px > limit_price:
break
q = self.order_book.asks.get(px)
if not q:
continue
for o in q:
total += int(getattr(o, 'quantity', 0))
else:
# consume bids down to limit_price
for px in list(self.order_book._bid_prices):
if px is None or px < limit_price:
break
q = self.order_book.bids.get(px)
if not q:
continue
for o in q:
total += int(getattr(o, 'quantity', 0))
return total
def set_time(self, current_time: pd.Timestamp) -> None:
self._current_time = current_time
def process_delayed_orders(self, upto_time: pd.Timestamp) -> None:
with self._lock:
if not self._delayed_orders:
return
self._delayed_orders.sort(key=lambda x: x[0])
ready: List[Tuple[pd.Timestamp, Order]] = []
while self._delayed_orders and self._delayed_orders[0][0] <= upto_time:
ready.append(self._delayed_orders.pop(0))
for _, order in ready:
self._direct_match(order)
def match_order(self, incoming_order: Order) -> None:
with self._lock:
# Halt protection
if incoming_order.symbol in self._halted and self.auction_mode is None:
logging.warning("Order %s rejected: trading halted for %s", incoming_order.id, incoming_order.symbol)
return
# Pre-trade risk check
if self.risk_manager is not None and not self.risk_manager.allow_order(incoming_order):
logging.warning("Order %s rejected by risk manager", incoming_order.id)
return
# Session check
if not _is_market_open(incoming_order.symbol, self._current_time or pd.Timestamp.now(tz='UTC')):
logging.warning("Order %s rejected: market closed for %s", incoming_order.id, incoming_order.symbol)
return
# Auction routing
if self.auction_mode is not None:
# Only accept orders allowed for current phase
if incoming_order.auction_only and (incoming_order.auction_phase in (None, self.auction_mode)):
self._auction_orders.append(incoming_order)
return
# During auction, regular orders rest but not execute until uncross, unless they are auction_only which go to pool
if not incoming_order.auction_only:
# Allow resting for limit; hold market orders until uncross
if incoming_order.type == 'limit':
self.order_book.add_order(incoming_order)
else:
# buffer market orders into auction pool
incoming_order.auction_only = True
incoming_order.auction_phase = self.auction_mode
self._auction_orders.append(incoming_order)
return
# Log accepted new order event
try:
if self.event_logger is not None:
self.event_logger.log_new_order(incoming_order)
if self.audit_logger is not None:
self.audit_logger.log_new_order(incoming_order)
except Exception:
pass
# Latency: enqueue if configured and current time known
if self.latency_ms > 0 and self._current_time is not None:
available_at = self._current_time + pd.Timedelta(milliseconds=int(self.latency_ms))
self._delayed_orders.append((available_at, incoming_order))
return
# Direct matching path
self._direct_match(incoming_order)
def submit_order(self, incoming_order: Order) -> None:
if not self.use_queue:
self.match_order(incoming_order)
return
with self._lock:
if len(self._order_queue) >= self.queue_max:
logging.warning("Order queue full; rejecting order %s", incoming_order.id)
return
self._order_queue.append(incoming_order)
self._queue_cond.notify()
def start_loop(self) -> None:
if not self.use_queue:
return
with self._lock:
if self._loop_running:
return
self._loop_running = True
self._loop_thread = threading.Thread(target=self._run_loop, daemon=True)
self._loop_thread.start()
def stop_loop(self) -> None:
with self._lock:
self._loop_running = False
self._queue_cond.notify_all()
if self._loop_thread is not None:
try:
self._loop_thread.join(timeout=3)
except Exception:
pass
self._loop_thread = None
def _run_loop(self) -> None:
while True:
with self._lock:
if not self._loop_running:
break
while self._loop_running and not self._order_queue:
self._queue_cond.wait(timeout=0.5)
if not self._loop_running:
return
if not self._order_queue:
continue
order = self._order_queue.popleft()
# Process outside of lock to avoid blocking producers
try:
self.match_order(order)
except Exception as exc:
logging.exception("Engine loop error: %s", exc)
def halt(self, symbol: str) -> None:
with self._lock:
self._halted.add(symbol)
def resume(self, symbol: str) -> None:
with self._lock:
self._halted.discard(symbol)
def depth_snapshot(self, top_n: int = 5) -> Dict[str, List[Tuple[float, int]]]:
with self._lock:
bids: List[Tuple[float, int]] = []
asks: List[Tuple[float, int]] = []
for px in list(self.order_book._bid_prices)[:top_n]:
q = self.order_book.bids.get(px)
if q:
bids.append((px, sum(o.quantity for o in q)))
for px in list(self.order_book._ask_prices)[:top_n]:
q = self.order_book.asks.get(px)
if q:
asks.append((px, sum(o.quantity for o in q)))
return {"bids": bids, "asks": asks}
def _snapshot_once(self) -> None:
try:
if not self.snapshot_dir:
return
os.makedirs(self.snapshot_dir, exist_ok=True)
snap = {"bids": [], "asks": []}
with self._lock:
for px in self.order_book._bid_prices:
q = self.order_book.bids.get(px)
if not q:
continue
snap["bids"].append({"price": px, "orders": [
{"id": o.id, "qty": o.quantity, "owner": getattr(o, 'owner_id', ''), "symbol": getattr(o, 'symbol', ''), "side": "buy"} for o in q
]})
for px in self.order_book._ask_prices:
q = self.order_book.asks.get(px)
if not q:
continue
snap["asks"].append({"price": px, "orders": [
{"id": o.id, "qty": o.quantity, "owner": getattr(o, 'owner_id', ''), "symbol": getattr(o, 'symbol', ''), "side": "sell"} for o in q
]})
path = os.path.join(self.snapshot_dir, f"snapshot_{int(time.time())}.json")
with open(path, 'w', encoding='utf-8') as f:
json.dump(snap, f)
except Exception as exc:
logging.debug("Snapshot failed: %s", exc)
def snapshot_now(self) -> None:
self._snapshot_once()
def _snapshot_loop(self) -> None:
while True:
with self._lock:
if not self._snapshot_running or self.snapshot_interval_sec <= 0:
break
interval = self.snapshot_interval_sec
try:
self._snapshot_once()
except Exception:
pass
time.sleep(max(1, int(interval)))
def start_snapshotting(self, interval_sec: int, out_dir: str) -> None:
with self._lock:
self.snapshot_interval_sec = max(0, int(interval_sec))
self.snapshot_dir = out_dir
if self.snapshot_interval_sec <= 0:
return
if self._snapshot_running:
return
self._snapshot_running = True
self._snapshot_thread = threading.Thread(target=self._snapshot_loop, daemon=True)
self._snapshot_thread.start()
def stop_snapshotting(self) -> None:
with self._lock:
self._snapshot_running = False
if self._snapshot_thread is not None:
try:
self._snapshot_thread.join(timeout=3)
except Exception:
pass
self._snapshot_thread = None
def load_snapshot_file(self, path: str) -> None:
"""Load an order book snapshot JSON and rebuild book state.
Orders are reconstructed as resting limit orders using stored symbol/side/price/qty.
"""
try:
with open(path, 'r', encoding='utf-8') as f:
snap = json.load(f)
except Exception as exc:
raise RuntimeError(f"Failed to load snapshot: {exc}")
with self._lock:
# Clear existing book
self.order_book.bids.clear()
self.order_book.asks.clear()
self.order_book.order_map.clear()
self.order_book._bid_prices.clear()
self.order_book._ask_prices.clear()
# Rebuild from snapshot
for side_key in ("bids", "asks"):
levels = snap.get(side_key, []) or []
for lvl in levels:
px = float(lvl.get("price"))
for od in (lvl.get("orders") or []):
try:
oid = str(od.get("id") or uuid.uuid4().hex)
qty = int(od.get("qty") or 0)
if qty <= 0:
continue
sym = str(od.get("symbol") or '')
side = str(od.get("side") or ("buy" if side_key == "bids" else "sell"))
owner = str(od.get("owner") or '')
o = Order(id=oid, price=px, quantity=qty, side=side, type='limit', symbol=sym, owner_id=owner)
self.order_book.add_order(o)
except Exception:
continue
def start_auction(self, phase: str) -> None:
with self._lock:
self.auction_mode = phase
self._auction_orders.clear()
def uncross_auction(self) -> None:
with self._lock:
if self.auction_mode is None:
return
# Build indicative book from resting regular limits and auction pool
temp_bids: Dict[float, int] = {}
temp_asks: Dict[float, int] = {}
def add_side(dst: Dict[float, int], px: float, qty: int):
dst[px] = dst.get(px, 0) + qty
# Include resting limit orders
for px, q in self.order_book.bids.items():
size = sum(o.quantity for o in q if o.type == 'limit')
if size > 0:
add_side(temp_bids, px, size)
for px, q in self.order_book.asks.items():
size = sum(o.quantity for o in q if o.type == 'limit')
if size > 0:
add_side(temp_asks, px, size)
# Include auction-only orders as aggressive: market treated as crossing all; limits as normal
for ao in self._auction_orders:
if ao.side == 'buy':
px = math.inf if ao.type == 'market' else ao.price
add_side(temp_bids, px, ao.quantity)
else:
px = 0.0 if ao.type == 'market' else ao.price
add_side(temp_asks, px, ao.quantity)
# Determine single-price cross maximizing matched volume