-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathphi_forest_V3.py
More file actions
2768 lines (2469 loc) · 141 KB
/
phi_forest_V3.py
File metadata and controls
2768 lines (2469 loc) · 141 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestRegressor, ExtraTreesRegressor, GradientBoostingRegressor, VotingRegressor
from sklearn.metrics import r2_score, mean_squared_error, mean_absolute_error
from sklearn.exceptions import NotFittedError
from sklearn.model_selection import train_test_split, cross_val_score, GridSearchCV
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from datetime import date
import os
import joblib
import math
import sys
import json
import time
import argparse
import matplotlib.pyplot as plt
import yfinance as yf
# === Configuration defaults ===
# Module-level defaults. Change them here or pass overrides via the CLI to
# control the symbol and the download range.
DEFAULT_ASSET_SYMBOL = "GBPJPY=X"
# NOTE(review): the examples in the two comments below are lowercase while the
# defaults are uppercase ("5D", "1H"); "1H" is also a key of INTRADAY_DEFAULTS.
# Confirm the consumer normalises case before the values reach the downloader.
DEFAULT_DOWNLOAD_PERIOD = "5D" # e.g., '2y', '1y', '6mo'
DEFAULT_DOWNLOAD_INTERVAL = "1H" # e.g., '1wk', '1d', '1h'
# Feature/dataset persistence settings
FEATURE_VERSION = "1" # bump if feature engineering changes; part of every cache file name
WINDOW_SIZE = 10 # rolling window length (rows) used for features
# Intraday default settings used when running in intraday mode.
# Maps a timeframe label to the interval/period pair to download.
INTRADAY_DEFAULTS = {
    '1H': {'interval': '60m', 'period': '90d'}, # More data for better models
    '30M': {'interval': '30m', 'period': '60d'}, # Increased data range
    '5M': {'interval': '5m', 'period': '30d'}, # Much more 5M data
    '1M': {'interval': '1m', 'period': '7d'}, # Add 1-minute resolution
}
# Rolling cap on stored/training samples; the oldest samples are dropped first.
DEFAULT_MAX_TRAIN_SAMPLES = 8000 # Begin decay after ~8000 samples (rolling cap)
# Half-life, in samples, of the exponential recency weighting applied when fitting.
DEFAULT_RECENCY_HALF_LIFE = 250
DEFAULT_USE_RECENCY_WEIGHTING = True
# Fibonacci tier configuration
DEFAULT_MAX_FIB_MULTIPLIER = 100.0 # Explore tiers up to +/- this multiple
DEFAULT_INCLUDE_NEGATIVE_FIB_TIERS = True # Include tiers above the high (negative ratios)
DEFAULT_MAX_FIB_LEVELS = 1000 # Safety cap on total tier count - increased to 1000 for thorough exploration
def save_config(symbol, period, interval, path='config.json'):
    """Write the symbol/period/interval selection to ``path`` as JSON.

    An existing file is overwritten. A confirmation line is printed once the
    file has been written. Returns ``None``.
    """
    payload = dict(symbol=symbol, period=period, interval=interval)
    with open(path, 'w') as handle:
        json.dump(payload, handle, indent=2)
    print(f"Saved config to {path}")
def load_config(path='config.json'):
    """Return the JSON document stored at ``path``.

    Returns ``None`` when the file does not exist or cannot be read or
    parsed; no exception escapes for those cases.
    """
    if not os.path.exists(path):
        return None
    try:
        with open(path, 'r') as handle:
            loaded = json.load(handle)
    except Exception:
        return None
    return loaded
class PhiForestEngineWithVolumeTiers:
def __init__(self, symbol=None):
    """Set up the engine for one asset symbol.

    Creates the on-disk directory layout, restores a persisted model,
    hyperparameters and accumulated training samples when they exist, and
    otherwise builds a fresh ``RandomForestRegressor`` from the default
    hyperparameters below.

    Args:
        symbol: Ticker to work on. Falls back to ``DEFAULT_ASSET_SYMBOL``
            when falsy. The value is upper-cased.
    """
    self.asset_symbol = (symbol or DEFAULT_ASSET_SYMBOL).upper()
    print(f"[INIT] Using asset: {self.asset_symbol}")
    self.model_version = "v1.0"
    self.hyperparam_history = []
    # Start with small, manageable hyperparameters - grow slowly with evolution
    self.hyperparams = {
        'n_estimators': 100,  # Start small, grow with evolution
        'random_state': 42,
        'n_jobs': -1,  # Use all CPU cores
        'oob_score': True,
        'min_samples_leaf': 1,  # More granular splits (higher compute)
        'max_depth': 10,  # Start shallow, grow with evolution
        'max_features': 'sqrt',  # More feature evaluation
        'min_impurity_decrease': 1e-9,  # More sensitive splits
        'bootstrap': True,  # Enable bootstrap sampling
        'max_samples': None,  # Use all samples for each tree
    }
    # Meta flags (not passed to sklearn)
    self.actual_price = True
    self.prediction_price = True
    self.setup_directories()
    # Load model and hyperparams if present. load_model() may replace
    # self.hyperparams with the persisted dictionary.
    self.model, loaded_version = self.load_model()
    # If the hyperparams file contained meta flags they are now in
    # self.hyperparams; pull them out so they are never passed to sklearn.
    self.actual_price = self.hyperparams.pop('actual_price', self.actual_price)
    self.prediction_price = self.hyperparams.pop('prediction_price', self.prediction_price)
    # Compare against None explicitly: truthiness of an sklearn ensemble goes
    # through __len__ (number of fitted estimators), which raises on an
    # unfitted estimator and is not what "no model was loaded" means.
    if self.model is None:
        # Filter hyperparams to valid RandomForest parameters
        allowed = RandomForestRegressor().get_params().keys()
        model_kwargs = {k: v for k, v in self.hyperparams.items() if k in allowed}
        self.model = RandomForestRegressor(**model_kwargs)
        print("Created new model with initial hyperparameters")
    else:
        self.model_version = loaded_version
        print(f"Loaded model version {self.model_version}")
    # Compatibility of a loaded model with the current feature set is checked
    # lazily on first use; the flag is always initialised here so later reads
    # never hit a missing attribute.
    self._needs_retraining = False
    self.history = []
    self.metrics_log = []
    # Small rolling history for EMC values to form adaptive thresholds
    self._emc_history = []
    # Whether to use phi-aware hyperparameter evolution (can be toggled later)
    self.phi_evolve_enabled = True
    self.X_train = []
    self.y_train = []
    # Track whether we've achieved perfect accuracy in the most recent cycle
    self.perfect_accuracy_achieved = False
    # Extensive Fibonacci tier controls
    self.max_fib_multiplier = DEFAULT_MAX_FIB_MULTIPLIER
    self.include_negative_fib_tiers = DEFAULT_INCLUDE_NEGATIVE_FIB_TIERS
    self.max_fib_levels = DEFAULT_MAX_FIB_LEVELS
    self._tiers_info_logged = False
    # Load accumulated training samples if they exist
    self.load_training_samples()
    # Clean samples to ensure shape consistency
    self.clean_training_samples()
    self.save_fingerprint()
    # Optional runtime context for dataset caching (set from CLI)
    self.data_interval = None
    self.data_period = None
    self.use_training_cache = True
    self.rebuild_training_cache = False
    # Disable adaptive scaling by default - let trees grow freely
    self.use_adaptive_scaling = False
def setup_directories(self):
    """Create the working directories for ``self.asset_symbol``.

    When the symbol is not ``SPY``, a leftover
    ``models/SPY_phi_forest_model.pkl`` file is deleted first. Every
    directory is created with ``exist_ok=True``, so repeated calls are safe.
    """
    symbol = self.asset_symbol
    if symbol != "SPY":
        legacy_model = "models/SPY_phi_forest_model.pkl"
        if os.path.exists(legacy_model):
            os.remove(legacy_model)
            print(f"[CLEANUP] Removed old model: {legacy_model}")
    # Per-symbol trees first, then the shared price roots and their
    # per-symbol children, in the same order as before.
    for root in ("models", "training_data", "metrics_log", "forecasts", "hyperparams"):
        os.makedirs(f"{root}/{symbol}", exist_ok=True)
    price_roots = ("actual_price", "prediction_price")
    for root in price_roots:
        os.makedirs(root, exist_ok=True)
    for root in price_roots:
        os.makedirs(f"{root}/{symbol}", exist_ok=True)
def save_fingerprint(self):
    """Record the symbol and model version under ``symbols_used/``.

    Overwrites ``symbols_used/<symbol>_fingerprint.txt`` with two lines:
    the asset symbol and the current model version.
    """
    os.makedirs("symbols_used", exist_ok=True)
    target = f"symbols_used/{self.asset_symbol}_fingerprint.txt"
    with open(target, "w") as out:
        out.write(f"Symbol: {self.asset_symbol}\nModel Version: {self.model_version}")
def load_model(self):
    """Load the persisted model, hyperparameters and version for this symbol.

    Returns:
        ``(model, version)``. ``model`` is ``None`` and ``version`` is ``""``
        when no model file exists or it could not be loaded. ``version`` is
        ``"unknown"`` when the model loaded but no version file could be read.

    Side effects:
        * Replaces ``self.hyperparams`` with the contents of the persisted
          hyperparameter JSON when that file exists and parses.
        * Deletes the model file when loading it fails (including when it is
          smaller than 1024 bytes, which is treated as corruption).
    """
    path = f"models/{self.asset_symbol}/phi_forest_model.pkl"
    hyper_path = f"hyperparams/{self.asset_symbol}/current_hyperparams.json"
    if not os.path.exists(path):
        return None, ""
    try:
        if os.path.getsize(path) < 1024:
            raise ValueError("Model file too small, likely corrupted")
        # NOTE(security): joblib.load unpickles the file and can execute
        # arbitrary code; only load model files this program wrote itself.
        model = joblib.load(path)
        print(f"Successfully loaded model from {path}")
        if os.path.exists(hyper_path):
            try:
                with open(hyper_path, 'r') as f:
                    self.hyperparams = json.load(f)
                print("Loaded associated hyperparameters")
            except Exception as e:
                print(f"Error loading hyperparameters: {e}. Using current hyperparameters.")
        version_path = f"models/{self.asset_symbol}/model_version.txt"
        version = "unknown"
        if os.path.exists(version_path):
            try:
                with open(version_path, 'r') as f:
                    version = f.read().strip()
            except Exception:
                # Was a bare ``except:``; Exception still covers I/O and
                # decode errors without trapping KeyboardInterrupt/SystemExit.
                print("Could not read version file")
        return model, version
    except Exception as e:
        print(f"Error loading model: {e}")
        # Best effort: drop the unreadable file so the next run starts clean.
        try:
            os.remove(path)
            print(f"Removed corrupted model: {path}")
        except OSError as remove_error:
            print(f"Could not remove corrupted model {path}: {remove_error}")
    return None, ""
def check_feature_compatibility_and_retrain_if_needed(self, df):
    """Check the model's feature count against the current feature code.

    Features are generated for the last ``WINDOW_SIZE`` rows of ``df`` (or
    all of ``df`` when it is shorter) and their width is compared with the
    model's ``n_features_in_``.

    Returns:
        ``True`` when the model is compatible (or has no recorded feature
        count), or when it was successfully retrained from the cached
        dataset belonging to ``df``. ``False`` when there is no model, no
        features could be generated, or the model is incompatible and could
        not be retrained from cache; in the last case, and on any error,
        ``self._needs_retraining`` is set to ``True``.
    """
    if not hasattr(self, 'model') or self.model is None:
        return False
    try:
        # Test feature generation with current dataset
        test_window = df.iloc[-WINDOW_SIZE:] if len(df) >= WINDOW_SIZE else df
        result = self.phi_fold_features_with_tiers(test_window)
        if result is None:
            return False
        feat, _ = result
        expected_features = getattr(self.model, 'n_features_in_', None)
        # NOTE(review): assumes feat is 2-D with the feature axis second;
        # training code elsewhere flattens feat - confirm the two agree.
        actual_features = feat.shape[1]
        if expected_features is not None and expected_features != actual_features:
            print(f"[COMPATIBILITY] Model expects {expected_features} features, current code generates {actual_features}")
            print("[COMPATIBILITY] Retraining model with current feature set...")
            # Try to use cached data first
            cached_result = self.load_cached_training_dataset(df)
            if cached_result is not None:
                print("[COMPATIBILITY] Using cached dataset for retraining")
                # Train from the cache file that belongs to this df; the
                # latest.json pointer may refer to a different dataset.
                npz_path, _meta_path = self._dataset_paths(df)
                if self.train_from_cached_dataset(npz_path):
                    print("[COMPATIBILITY] Successfully retrained from cache")
                    return True
            # Fall back to training from current data
            print("[COMPATIBILITY] No compatible cache found, training from current data")
            self._needs_retraining = True
            return False
    except Exception as e:
        print(f"[COMPATIBILITY] Error during compatibility check: {e}")
        self._needs_retraining = True
        return False
    return True
# ===== Training dataset caching helpers =====
def _cache_dir(self):
    """Return ``training_data/<symbol>``, creating the directory if needed."""
    cache_path = f"training_data/{self.asset_symbol}"
    os.makedirs(cache_path, exist_ok=True)
    return cache_path
def _sanitize(self, s: str) -> str:
    """Make ``s`` safe for use inside a file name.

    ``None`` becomes an empty string. Any other value is converted with
    ``str()``; every character that is neither alphanumeric nor one of
    ``-``/``_`` is replaced by ``-``.
    """
    if s is None:
        return ""
    pieces = []
    for ch in str(s):
        keep = ch.isalnum() or ch in "-_"
        pieces.append(ch if keep else '-')
    return ''.join(pieces)
def _dataset_key(self, df: pd.DataFrame) -> str:
    """Build the cache-file stem that identifies ``df``.

    The key combines the feature version, the sanitized interval and period
    (``unknown`` when unset), the first and last index values formatted as
    ``YYYYmmddHHMMSS`` and the row count. When either index value cannot be
    converted to a timestamp, the literal placeholders ``start`` and ``end``
    are used for both.
    """
    def _stamp(position):
        return pd.to_datetime(df.index[position]).strftime('%Y%m%d%H%M%S')

    try:
        first, last = _stamp(0), _stamp(-1)
    except Exception:
        first, last = 'start', 'end'
    interval_tag = self._sanitize(self.data_interval or 'unknown')
    period_tag = self._sanitize(self.data_period or 'unknown')
    return f"feat_v{FEATURE_VERSION}_{interval_tag}_{period_tag}_{first}_{last}_n{len(df)}"
def _dataset_paths(self, df: pd.DataFrame):
    """Return ``(npz_path, json_path)`` of the cache files belonging to ``df``.

    Both paths share one stem: the dataset key inside the cache directory.
    """
    key = self._dataset_key(df)
    stem = os.path.join(self._cache_dir(), key)
    return stem + ".npz", stem + ".json"
def clear_training_cache(self):
    """Delete the cached training datasets of this symbol.

    Removes ``latest.json`` and every regular file in the cache directory
    whose name starts with ``feat_v<FEATURE_VERSION>_`` and ends in ``.npz``
    or ``.json``. Files written under a different feature version do not
    match and are kept. Directories are ignored.

    Returns ``True`` when the directory was scanned (even if individual
    removals failed) and ``False`` when the scan itself raised.
    """
    cache_dir = self._cache_dir()
    version_prefix = f"feat_v{FEATURE_VERSION}_"

    def _is_cache_file(name):
        if name == 'latest.json':
            return True
        return name.startswith(version_prefix) and name.endswith(('.npz', '.json'))

    deleted, untouched = [], []
    try:
        for entry in os.listdir(cache_dir):
            full_path = os.path.join(cache_dir, entry)
            if not os.path.isfile(full_path):
                continue
            if not _is_cache_file(entry):
                untouched.append(entry)
                continue
            try:
                os.remove(full_path)
                deleted.append(entry)
            except Exception as e:
                print(f"[CACHE] Failed to remove {entry}: {e}")
        print(f"[CACHE] Removed {len(deleted)} cache files from {cache_dir}")
        if deleted:
            # List at most ten names, then summarise the remainder.
            for name in deleted[:10]:
                print(f" - {name}")
            if len(deleted) > 10:
                print(f" ... and {len(deleted)-10} more")
        if untouched:
            print(f"[CACHE] Kept {len(untouched)} non-cache files")
        return True
    except Exception as e:
        print(f"[CACHE] Error clearing cache: {e}")
        return False
def orbital_cannon_reset(self):
    """Purge both caches of this symbol and report the outcome.

    Calls ``clear_training_cache()`` and then ``clear_samples_cache()``;
    both are always invoked. Returns ``True`` only when both reported
    success.
    """
    print("\n===== ORBITAL CANNON RESET ENGAGED =====")
    outcomes = (self.clear_training_cache(), self.clear_samples_cache())
    overall = all(outcomes)
    if not overall:
        print("[ORBITAL CANNON] Completed with warnings for symbol:", self.asset_symbol)
    else:
        print("[ORBITAL CANNON] All caches purged for symbol:", self.asset_symbol)
    print("===== ORBITAL CANNON RESET COMPLETE =====\n")
    return overall
def prepare_and_save_training_dataset(self, df: pd.DataFrame):
    """Build full training pairs (X,y) for df and persist to training_data/<symbol>/.

    Each sample is the flattened feature output for a WINDOW_SIZE-row window
    ending at row ``i``; its target is the ``Close`` value of row ``i + 1``.

    Returns (npz_path, meta_path). Returns (None, None) when ``df`` is missing
    or has fewer than ``WINDOW_SIZE + 2`` rows, when no features could be
    built, or when saving fails. An existing cache file is reused (nothing is
    recomputed) unless ``self.rebuild_training_cache`` is set or
    ``self.use_training_cache`` is off.
    """
    if df is None or len(df) < WINDOW_SIZE + 2:
        print(f"[CACHE] Not enough rows to prepare dataset (got {len(df) if df is not None else 0})")
        return None, None
    npz_path, meta_path = self._dataset_paths(df)
    if os.path.exists(npz_path) and not self.rebuild_training_cache and self.use_training_cache:
        print(f"[CACHE] Training dataset already exists: {npz_path}")
        return npz_path, meta_path
    X = []
    y = []
    # Running sum of the second value returned by the feature helper;
    # stored in the metadata as 'emc_energy'.
    emc_total_energy = 0.0
    # Stop one row early so that row i + 1 always exists as the target.
    for i in range(WINDOW_SIZE, len(df) - 1):
        # Inclusive window of WINDOW_SIZE rows ending at row i.
        window = df.iloc[i-(WINDOW_SIZE-1):i+1]
        result = self.phi_fold_features_with_tiers(window)
        # Windows for which the helper returns None are skipped.
        if result is not None:
            feat, emc_pi = result
            X.append(feat.flatten())
            y.append(df['Close'].iloc[i + 1].item())
            try:
                emc_total_energy += float(emc_pi)
            except Exception:
                # Values that cannot be converted to float are left out of the sum.
                pass
    if not X:
        print("[CACHE] No features constructed; skipping save.")
        return None, None
    X = np.array(X)
    y = np.array(y)
    # enforce rolling cap consistent with training (keep the newest samples)
    max_samples = getattr(self, 'max_train_samples', DEFAULT_MAX_TRAIN_SAMPLES)
    if len(X) > max_samples:
        X = X[-max_samples:]
        y = y[-max_samples:]
    # Save compressed arrays
    try:
        np.savez_compressed(npz_path, X=X, y=y)
        # 'rows' is the size of the input frame; 'n_samples' is the number of
        # samples actually stored after skipping and capping.
        meta = {
            'symbol': self.asset_symbol,
            'interval': self.data_interval,
            'period': self.data_period,
            'feature_version': FEATURE_VERSION,
            'rows': len(df),
            'n_samples': int(len(X)),
            'window': WINDOW_SIZE,
            'start': str(df.index[0]),
            'end': str(df.index[-1]),
            'emc_energy': float(emc_total_energy),
            'created_at': pd.Timestamp.now().isoformat()
        }
        with open(meta_path, 'w') as f:
            json.dump(meta, f, indent=2)
        # also write/update a "latest" pointer file for convenience
        latest_ptr = os.path.join(self._cache_dir(), 'latest.json')
        with open(latest_ptr, 'w') as f:
            json.dump({'npz_path': npz_path, 'meta_path': meta_path, 'meta': meta}, f, indent=2)
        print(f"[CACHE] Saved training dataset: {npz_path}")
        return npz_path, meta_path
    except Exception as e:
        print(f"[CACHE] Failed to save training dataset: {e}")
        return None, None
def load_cached_training_dataset(self, df: pd.DataFrame):
    """Load cached (X,y) arrays for the given df range if available.

    Returns:
        ``(X, y)`` read from the ``.npz`` cache file that belongs to ``df``,
        or ``None`` when caching is disabled, the file does not exist, or it
        cannot be read.
    """
    if not self.use_training_cache:
        return None
    npz_path, _meta_path = self._dataset_paths(df)
    if not os.path.exists(npz_path):
        return None
    try:
        # np.load returns an NpzFile that keeps the archive open; the context
        # manager closes it once both arrays have been read into memory.
        with np.load(npz_path) as data:
            X = data['X']
            y = data['y']
        print(f"[CACHE] Loaded training dataset from {npz_path} (n={len(X)})")
        return X, y
    except Exception as e:
        print(f"[CACHE] Failed to load dataset: {e}")
        return None
def train_from_cached_dataset(self, npz_path: str = None):
    """Train and save the main model using a previously cached dataset.

    Steps: load ``X``/``y`` from the ``.npz`` file, fit on the oldest ~90%
    and print validation metrics for the newest ~10% (time-ordered split),
    refit on the full dataset, append the samples to the accumulated
    training buffers, and save the model.

    Args:
        npz_path: Cache file to train from. If ``None``, the path stored in
            the cache directory's ``latest.json`` pointer is used.

    Returns:
        ``True`` when the model was trained and saved. ``False`` when no
        usable dataset was found or any step raised.
    """

    def _recency_weights(n_rows):
        """Return exponential-decay sample weights (newest row = 1.0), or None when disabled."""
        try:
            if not getattr(self, 'use_recency_weighting', DEFAULT_USE_RECENCY_WEIGHTING):
                return None
            half_life = getattr(self, 'recency_half_life', DEFAULT_RECENCY_HALF_LIFE)
            if half_life and half_life > 0 and n_rows > 0:
                idx = np.arange(n_rows)
                return 0.5 ** ((n_rows - 1 - idx) / float(half_life))
        except Exception:
            # Weighting is optional; fall back to an unweighted fit.
            pass
        return None

    def _fit(features, targets):
        """Fit self.model, passing recency weights when they are enabled."""
        weights = _recency_weights(len(features))
        if weights is not None:
            self.model.fit(features, targets, sample_weight=weights)
        else:
            self.model.fit(features, targets)

    try:
        if npz_path is None:
            latest_ptr = os.path.join(self._cache_dir(), 'latest.json')
            if not os.path.exists(latest_ptr):
                print("[CACHE] No latest cached dataset found.")
                return False
            with open(latest_ptr, 'r') as f:
                info = json.load(f)
            npz_path = info.get('npz_path')
        # A pointer file without 'npz_path' yields None; report it as missing
        # instead of letting os.path.exists(None) raise.
        if not npz_path or not os.path.exists(npz_path):
            print(f"[CACHE] Cached dataset not found: {npz_path}")
            return False
        # Close the NpzFile as soon as both arrays are in memory.
        with np.load(npz_path) as data:
            X = data['X']
            y = data['y']
        if len(X) == 0:
            print("[CACHE] Empty cached dataset.")
            return False
        # Scale hyperparameters based on cached dataset size
        scaled = self.scale_hyperparams_for_data_size(len(X))
        if scaled:
            print(f"[CACHE-ADAPTIVE] Hyperparameters automatically scaled for cached dataset size ({len(X)} samples)")
        # validation split (time-ordered)
        val_size = max(1, int(len(X) * 0.1))
        if val_size >= len(X):
            val_size = 1
        train_size = len(X) - val_size
        # With a single sample there is nothing left to train on for the
        # validation pass; go straight to the full fit.
        if train_size > 0:
            _fit(X[:train_size], y[:train_size])
            X_val, y_val = X[train_size:], y[train_size:]
            # Metrics are informational only; a failure must not abort training.
            try:
                y_val_pred = self.model.predict(X_val)
                r2 = r2_score(y_val, y_val_pred) if len(y_val) >= 2 else float('nan')
                mae = mean_absolute_error(y_val, y_val_pred)
                mse = mean_squared_error(y_val, y_val_pred)
                acc = np.mean(np.isclose(y_val, y_val_pred, rtol=0.00001))
                print(f"[CACHE-TRAIN] R²: {r2:.6f}, MAE: {mae:.6f}, MSE: {mse:.6f}, ACC: {acc*100:.4f}%")
            except Exception as e:
                print(f"[CACHE-TRAIN] Validation metrics unavailable: {e}")
        # final fit on full data
        _fit(X, y)
        # accumulate X_train/y_train samples (don't replace, extend)
        try:
            self.X_train.extend(list(X))
            self.y_train.extend(list(y))
            # Apply rolling decay to accumulated samples
            self.apply_sample_decay()
            print(f"[CACHE-TRAIN] Total accumulated samples: {len(self.X_train)}")
        except Exception as e:
            print(f"[CACHE-TRAIN] Could not accumulate samples: {e}")
        self.save_model()
        print("[CACHE-TRAIN] Model trained and saved from cached dataset.")
        return True
    except Exception as e:
        print(f"[CACHE-TRAIN] Failed to train from cache: {e}")
        return False
def save_model(self):
    """Persist the model together with its companion files.

    Writes, in order: the model pickle, the hyperparameter JSON, the model
    version text file, the accumulated training samples (through
    ``save_training_samples``) and a metadata JSON holding the sample count,
    model path, timestamp, expected feature count and feature version.
    A failure while writing the metadata is ignored.
    """
    model_path = f"models/{self.asset_symbol}/phi_forest_model.pkl"
    hyperparams_path = f"hyperparams/{self.asset_symbol}/current_hyperparams.json"
    version_file_path = f"models/{self.asset_symbol}/model_version.txt"
    joblib.dump(self.model, model_path)
    with open(hyperparams_path, 'w') as hp_file:
        json.dump(self.hyperparams, hp_file)
    with open(version_file_path, 'w') as version_file:
        version_file.write(self.model_version)
    # Save accumulated training samples
    self.save_training_samples()
    # Metadata for the main model; purely informational, hence best effort.
    try:
        meta = {
            'sample_count': len(getattr(self, 'X_train', [])),
            'model_path': model_path,
            'timestamp': pd.Timestamp.now().isoformat(),
            # Stored so a later run can compare against the feature code.
            'expected_features': getattr(self.model, 'n_features_in_', None),
            'feature_version': FEATURE_VERSION,
        }
        with open(f"models/{self.asset_symbol}/phi_forest_model_meta.json", 'w') as meta_file:
            json.dump(meta, meta_file, indent=2)
    except Exception:
        pass
def save_training_samples(self):
    """Write the accumulated training samples to disk.

    Does nothing when the buffers are missing or empty. Otherwise the
    newest ``max_train_samples`` samples (default
    ``DEFAULT_MAX_TRAIN_SAMPLES``) are written to
    ``models/<symbol>/accumulated_training_samples.npz``; when trimming
    happens the in-memory buffers are trimmed too. Errors are printed, not
    raised.
    """
    try:
        has_buffers = hasattr(self, 'X_train') and hasattr(self, 'y_train')
        if not has_buffers:
            return
        if not self.X_train or not self.y_train:
            return
        target_dir = f"models/{self.asset_symbol}"
        os.makedirs(target_dir, exist_ok=True)
        samples_path = f"{target_dir}/accumulated_training_samples.npz"
        features = np.array(self.X_train)
        targets = np.array(self.y_train)
        # Rolling cap: only the most recent samples are persisted.
        cap = getattr(self, 'max_train_samples', DEFAULT_MAX_TRAIN_SAMPLES)
        if len(features) > cap:
            features = features[-cap:]
            targets = targets[-cap:]
            # Keep memory and disk in agreement.
            self.X_train = list(features)
            self.y_train = list(targets)
        np.savez_compressed(samples_path, X=features, y=targets)
        print(f"[PERSIST] Saved {len(features)} accumulated training samples to {samples_path}")
    except Exception as e:
        print(f"[PERSIST] Failed to save training samples: {e}")
def apply_sample_decay(self):
    """Trim the training buffers to the newest ``max_train_samples`` entries.

    The cap defaults to ``DEFAULT_MAX_TRAIN_SAMPLES``. Nothing changes while
    the buffer is at or below the cap; beyond it the oldest samples are
    dropped. Errors are printed, not raised.
    """
    try:
        cap = getattr(self, 'max_train_samples', DEFAULT_MAX_TRAIN_SAMPLES)
        if len(self.X_train) <= cap:
            return
        self.X_train = self.X_train[-cap:]
        self.y_train = self.y_train[-cap:]
        print(f"[DECAY] Applied sample decay, kept {len(self.X_train)} most recent samples")
    except Exception as e:
        print(f"[DECAY] Error applying sample decay: {e}")
def accuracy_rounded_to_dimes(self, y_true, y_pred):
    """Fraction of predictions that match the actual value at 0.10 resolution.

    Both inputs are rounded to the nearest 0.10. A pair counts as correct
    only when the rounded values are equal AND the raw prediction does not
    exceed the raw actual value (any overshoot scores zero). Pairs with a
    non-finite member are ignored.

    Returns a float in [0, 1]; 0.0 for empty input, when no finite pair
    exists, or when anything raises (for example mismatched shapes).
    """
    try:
        actual = np.asarray(y_true, dtype=float)
        predicted = np.asarray(y_pred, dtype=float)
        if actual.size == 0 or predicted.size == 0:
            return 0.0
        usable = np.isfinite(actual) & np.isfinite(predicted)
        if not np.any(usable):
            return 0.0
        actual, predicted = actual[usable], predicted[usable]
        same_dime = (np.round(actual * 10.0) / 10.0) == (np.round(predicted * 10.0) / 10.0)
        # Overshoot penalty: a prediction above the actual value never counts.
        within_actual = ~(predicted > actual)
        return float(np.mean(same_dime & within_actual))
    except Exception:
        return 0.0
def clear_samples_cache(self):
    """Remove the persisted training samples and empty the in-memory buffers.

    Returns ``True`` after the buffers were reset, even when the file was
    absent or could not be removed (the removal failure is printed).
    Returns ``False`` only when an unexpected error escaped.
    """
    try:
        samples_path = f"models/{self.asset_symbol}/accumulated_training_samples.npz"
        removed = False
        if os.path.exists(samples_path):
            try:
                os.remove(samples_path)
            except Exception as e:
                print(f"[PERSIST] Failed to remove samples file: {e}")
            else:
                removed = True
        # Reset in-memory buffers whether or not a file was removed.
        self.X_train, self.y_train = [], []
        if not removed:
            print(f"[PERSIST] No accumulated samples file to remove at: {samples_path}")
        else:
            print(f"[PERSIST] Removed accumulated samples file: {samples_path}")
        print("[PERSIST] In-memory training samples reset to 0")
        return True
    except Exception as e:
        print(f"[PERSIST] Error clearing samples cache: {e}")
        return False
def get_expected_feature_count(self):
    """Return the length of the flattened feature vector the current code produces.

    Runs the feature helper on the last 10 rows of a synthetic 11-row frame
    (steadily rising Close/High/Low, constant Volume) and measures the
    flattened result. Returns 25 when the helper yields ``None`` or anything
    raises.
    """
    try:
        steps = range(11)
        probe = pd.DataFrame({
            'Close': [100.0 + k for k in steps],
            'High': [101.0 + k for k in steps],
            'Low': [99.0 + k for k in steps],
            'Volume': [1000.0 for _ in steps],
        })
        outcome = self.phi_fold_features_with_tiers(probe.iloc[-10:])
        if outcome is not None:
            features, _ = outcome
            return len(features.flatten())
    except Exception:
        pass
    # Fallback when the count cannot be measured.
    return 25
def load_training_samples(self):
    """Load previously accumulated training samples from disk.

    Reads ``models/<symbol>/accumulated_training_samples.npz`` and keeps
    only the samples whose length equals ``get_expected_feature_count()``.
    The surviving samples replace ``self.X_train`` / ``self.y_train``.

    When the file does not exist the buffers are left untouched. When
    loading fails for any reason both buffers are reset to empty lists.
    """
    # At most this many individual mismatches are printed; a feature-count
    # change would otherwise emit one line per stored sample.
    max_mismatch_reports = 10
    try:
        samples_path = f"models/{self.asset_symbol}/accumulated_training_samples.npz"
        if not os.path.exists(samples_path):
            print(f"[PERSIST] No accumulated training samples found at {samples_path}")
            return
        # Close the NpzFile as soon as both arrays are in memory.
        with np.load(samples_path) as data:
            X = data['X']
            y = data['y']
        # Validate feature shape consistency
        expected_features = self.get_expected_feature_count()
        valid_samples = []
        valid_targets = []
        skipped = 0
        for i, (sample, target) in enumerate(zip(X, y)):
            if len(sample) == expected_features:
                valid_samples.append(sample)
                valid_targets.append(target)
                continue
            skipped += 1
            if skipped <= max_mismatch_reports:
                print(f"[PERSIST] Skipping sample {i}: has {len(sample)} features, expected {expected_features}")
        if skipped > max_mismatch_reports:
            print(f"[PERSIST] ... and {skipped - max_mismatch_reports} more samples skipped")
        self.X_train = valid_samples
        self.y_train = valid_targets
        if len(valid_samples) < len(X):
            print(f"[PERSIST] Filtered {len(X)} -> {len(valid_samples)} samples due to shape mismatch")
        print(f"[PERSIST] Loaded {len(valid_samples)} accumulated training samples from {samples_path}")
    except Exception as e:
        print(f"[PERSIST] Failed to load training samples: {e}")
        # Initialize empty if loading fails
        self.X_train = []
        self.y_train = []
def evolve_hyperparams(self, accuracy, r2, predicted_price=None, phi_signals=None, actual_price=None):
"""Evolve hyperparameters under a strict policy:
- If accuracy < 100%, only INCREASE model complexity (monotonic increases)
- Once 100% is reached, hold complexity and only fine-tune stability knobs
- If it drops below 100% again, resume increasing until back to 100%
- PARENT VOTE: If parent had higher accuracy, keep parent's hyperparameters
"""
prev_params = self.hyperparams.copy()
# Track parent's accuracy and last prediction/actual for comparison
parent_accuracy = getattr(self, '_last_accuracy', 0.0)
parent_pred = getattr(self, '_last_predicted_price', None)
parent_actl = getattr(self, '_last_actual_price', None)
# Overshoot-aware selection policy takes precedence over parent-vote by accuracy
selection_result = 'none' # 'prefer_parent' | 'prefer_child' | 'none'
current_overshoot = None
parent_overshoot = None
try:
if predicted_price is not None and actual_price is not None:
current_overshoot = float(predicted_price) > float(actual_price)
if parent_pred is not None and parent_actl is not None:
parent_overshoot = float(parent_pred) > float(parent_actl)
except Exception:
pass
# If we can compare overshoot statuses, enforce the user's policy
if current_overshoot is not None and parent_overshoot is not None:
print(f"[SELECTION] Overshoot statuses → parent: {parent_overshoot}, child: {current_overshoot}")
if current_overshoot and not parent_overshoot:
selection_result = 'prefer_parent'
print("[SELECTION] Child overshot while parent did not — keeping parent (no evolution)")
elif (not current_overshoot) and parent_overshoot:
selection_result = 'prefer_child'
print("[SELECTION] Parent overshot while child did not — evolving child (bypass parent accuracy vote)")
elif current_overshoot and parent_overshoot:
# Both overshot — TRIGGER TIER SELECTION to find a non-overshooting Fibonacci tier
print("[SELECTION] ⚠ Both parent and child overshot — triggering tier selection")
self._trigger_tier_selection = True # Flag for update() to perform tier selection
# DO NOT prefer child or parent - tier selection will handle it
# Skip normal evolution logic - tier selection will retrain and re-evaluate
selection_result = 'tier_selection'
print("[SELECTION] Will use tier-based training instead of parent/child selection")
# Apply selection result or fall back to parent-vote by accuracy
if selection_result == 'prefer_parent':
# Keep parent; do not evolve
self._last_accuracy = accuracy
# Persist last prediction/actual for next cycle
try:
self._last_predicted_price = float(predicted_price) if predicted_price is not None else None
self._last_actual_price = float(actual_price) if actual_price is not None else None
except Exception:
self._last_predicted_price = predicted_price
self._last_actual_price = actual_price
return False
elif selection_result == 'tier_selection':
# Both overshot - tier selection will handle retraining in update()
# Don't evolve hyperparams, just return False and let tier selection take over
print("[SELECTION] Skipping normal evolution - tier selection will retrain the model")
self._last_accuracy = accuracy
try:
self._last_predicted_price = float(predicted_price) if predicted_price is not None else None
self._last_actual_price = float(actual_price) if actual_price is not None else None
except Exception:
self._last_predicted_price = predicted_price
self._last_actual_price = actual_price
return False
elif selection_result == 'prefer_child':
# Proceed to evolve; bypass parent accuracy vote
self._last_accuracy = accuracy
else:
# PARENT VOTE: If parent had higher accuracy, revert to parent and skip evolution
if parent_accuracy > accuracy:
print(f"[PARENT-VOTE] Parent had higher accuracy ({parent_accuracy*100:.2f}% vs {accuracy*100:.2f}%) - keeping parent hyperparameters")
self._last_accuracy = accuracy
try:
self._last_predicted_price = float(predicted_price) if predicted_price is not None else None
self._last_actual_price = float(actual_price) if actual_price is not None else None
except Exception:
self._last_predicted_price = predicted_price
self._last_actual_price = actual_price
return False
# Store current accuracy as the new parent for next generation
self._last_accuracy = accuracy
# Helper to safely get/set and bound values
def _inc(name, step, max_val=None):
v = self.hyperparams.get(name)
if isinstance(v, (int, float)):
nv = v + step
if max_val is not None:
nv = min(max_val, nv)
self.hyperparams[name] = int(nv) if isinstance(v, int) else float(nv)
return self.hyperparams.get(name)
def _dec(name, step, min_val=None):
    """Lower the numeric hyperparameter ``name`` by ``step``, optionally floored.

    This is a closure: ``self`` is not a parameter and is taken from the
    enclosing method's scope.

    Args:
        name: Key in ``self.hyperparams`` to adjust.
        step: Amount subtracted from the current value.
        min_val: Optional lower bound for the result; ``None`` means no floor.

    Returns:
        The value stored under ``name`` after the call, or ``None`` when
        the key is absent. Non-numeric values (``None``, strings, ...) and
        booleans are left untouched.
    """
    current = self.hyperparams.get(name)
    # bool is a subclass of int, so it must be excluded explicitly;
    # otherwise a flag such as True would be rewritten as the integer 0.
    if isinstance(current, (int, float)) and not isinstance(current, bool):
        lowered = current - step
        if min_val is not None:
            lowered = max(min_val, lowered)
        # Keep the stored type stable: ints stay ints, floats stay floats.
        self.hyperparams[name] = int(lowered) if isinstance(current, int) else float(lowered)
    return self.hyperparams.get(name)
# Determine if we are at perfect accuracy for this evaluation
is_perfect = (accuracy >= 1.0)
if is_perfect:
self.perfect_accuracy_achieved = True
else:
# If we dropped below perfect, resume growth loop
if getattr(self, 'perfect_accuracy_achieved', False):
print("[EVOLVE] Dropped below 100% — resuming complexity growth loop")
self.perfect_accuracy_achieved = False
# Baseline evolution driven by the policy
if not is_perfect:
# Only increase complexity knobs until we hit 100% - VERY SLOW 1% growth
before = self.hyperparams.copy()
_inc('n_estimators', step= max(1, int(self.hyperparams.get('n_estimators', 100) * 0.01)), max_val=None)
_inc('max_depth', step=1, max_val=None)
# Smaller leaves allow more complex trees
if isinstance(self.hyperparams.get('min_samples_leaf', 1), (int, float)):
self.hyperparams['min_samples_leaf'] = max(1, int(self.hyperparams.get('min_samples_leaf', 2) - 1))
# Encourage considering more features (if numeric)
mf = self.hyperparams.get('max_features')
if isinstance(mf, (int, float)) and mf is not None:
self.hyperparams['max_features'] = min(1.0, float(mf) + 0.01) # Reduced from 0.05 to 0.01
# Reduce impurity threshold to allow finer splits
mid = self.hyperparams.get('min_impurity_decrease', 1e-7)
try:
self.hyperparams['min_impurity_decrease'] = max(1e-12, float(mid) * 0.5)
except Exception:
self.hyperparams['min_impurity_decrease'] = 1e-9
print("[EVOLVE] Very slowly increasing complexity to reach 100% accuracy (1% growth)")
else:
# At perfect accuracy: avoid increasing complexity; fine-tune stability only
# Nudge impurity up a bit to prefer simpler, more stable splits while preserving accuracy
mid = self.hyperparams.get('min_impurity_decrease', 1e-9)
try:
self.hyperparams['min_impurity_decrease'] = min(1e-5, float(mid) * 1.1)
except Exception:
pass
# Optionally nudge max_features slightly toward sqrt if numeric and > 0.8
mf = self.hyperparams.get('max_features')
if isinstance(mf, (int, float)) and mf is not None and mf > 0.8:
self.hyperparams['max_features'] = max(0.7, float(mf) - 0.02)
print("[EVOLVE] Perfect accuracy achieved — holding complexity; light stability tuning only")
# If actual price provided, compute error and use it to influence evolution
pred_error = None
if predicted_price is not None and actual_price is not None:
try:
pred_error = float(predicted_price) - float(actual_price)
abs_err = abs(pred_error)
rel_err = abs_err / float(actual_price) if float(actual_price) != 0 else None
print(f"[EVAL] pred={predicted_price}, actual={actual_price}, abs_err={abs_err}, rel_err={rel_err}")
# If relative error unusually high, increase model complexity conservatively
if rel_err is not None and rel_err > 0.02:
# Very slow 1% increase
self.hyperparams['n_estimators'] = self.hyperparams.get('n_estimators', 100) + max(1, int(self.hyperparams.get('n_estimators', 100) * 0.01))
self.hyperparams['max_depth'] = self.hyperparams.get('max_depth', 10) + 1
print(f"[EVAL] High relative error ({rel_err:.4f}) detected; very slowly increasing complexity (1%).")
except Exception:
pred_error = None
# phi-aware adjustments (respect monotonic growth policy)
if self.phi_evolve_enabled:
if phi_signals is None:
phi_signals = getattr(self, '_last_phi_signals', None)
if phi_signals:
try:
phi_pressure = abs(float(phi_signals.get('phi_pressure', 0.0)))
emc_pi = float(phi_signals.get('EMC_Pi', 0.0))
except Exception:
phi_pressure = 0.0
emc_pi = 0.0
median_emc = None
try:
if hasattr(self, '_emc_history') and len(self._emc_history) > 0:
median_emc = float(np.median(self._emc_history))
except Exception:
median_emc = None
volatile = False
if phi_pressure > 0.6:
volatile = True
elif median_emc and median_emc > 0 and emc_pi > median_emc * 2.5:
volatile = True
if volatile:
if not is_perfect:
# Only allow increases while below 100% - very slow 1% growth
self.hyperparams['n_estimators'] = self.hyperparams.get('n_estimators', 100) + max(1, int(self.hyperparams.get('n_estimators', 100) * 0.01))
self.hyperparams['max_depth'] = self.hyperparams.get('max_depth', 10) + 1
# Don't increase min_samples_leaf beyond current (avoid decreasing complexity)
print(f"[PHI-EVOLVE] Volatile regime (phi_pressure={phi_pressure:.3f}, emc={emc_pi:.3f}) — very slowly increasing (1%)")
else:
# At perfect, keep complexity stable
print(f"[PHI-EVOLVE] Volatile regime but perfect accuracy — holding complexity")
else:
if not is_perfect:
# Stable and below perfect — very slow 1% growth
self.hyperparams['max_depth'] = self.hyperparams.get('max_depth', 10) + 1
self.hyperparams['min_samples_leaf'] = max(1, self.hyperparams.get('min_samples_leaf', 2) - 1)
current_max_features = self.hyperparams.get('max_features', 0.8)
if isinstance(current_max_features, (int, float)) and current_max_features is not None:
self.hyperparams['max_features'] = min(1.0, current_max_features + 0.01) # Reduced from 0.05 to 0.01
print(f"[PHI-EVOLVE] Stable regime — very slowly increasing complexity towards 100% (1%)")
else:
# At perfect — very light, stability-oriented nudges handled above
pass
if prev_params != self.hyperparams:
self.model_version = f"v{self.model_version[1:].split('.')[0]}.{int(time.time()) % 10000}"
allowed = RandomForestRegressor().get_params().keys()
model_kwargs = {k: v for k, v in self.hyperparams.items() if k in allowed}
self.model = RandomForestRegressor(**model_kwargs)
# record evolution event including predicted price and phi_signals
evo_record = {
'timestamp': pd.Timestamp.now().isoformat(),
'from': prev_params,
'to': self.hyperparams.copy(),
'accuracy': accuracy,
'r2': r2,
'predicted_price': float(predicted_price) if predicted_price is not None else None,
'actual_price': float(actual_price) if actual_price is not None else None,
'prediction_error': float(pred_error) if pred_error is not None else None,
'phi_signals': phi_signals,
'model_version': self.model_version
}
try:
self.hyperparam_history.append(evo_record)
except Exception:
pass
# persist to disk
try:
hp_dir = f"hyperparams/{self.asset_symbol}"
os.makedirs(hp_dir, exist_ok=True)
hist_path = f"{hp_dir}/evolution_history.json"
existing = []
if os.path.exists(hist_path):
try:
with open(hist_path, 'r') as hf:
existing = json.load(hf)
except Exception:
existing = []
existing.append(evo_record)
with open(hist_path, 'w') as hf:
json.dump(existing, hf, indent=2)
except Exception as e:
print(f"Could not persist hyperparam evolution: {e}")
print(f"Evolved hyperparameters to version {self.model_version} | predicted_price: {evo_record['predicted_price']}")
# Persist last prediction/actual used for selection into 'parent' for next cycle
try:
self._last_predicted_price = float(predicted_price) if predicted_price is not None else None
self._last_actual_price = float(actual_price) if actual_price is not None else None
except Exception:
self._last_predicted_price = predicted_price
self._last_actual_price = actual_price
return True
# No evolution; still persist last prediction/actual for next cycle
try:
self._last_predicted_price = float(predicted_price) if predicted_price is not None else None
self._last_actual_price = float(actual_price) if actual_price is not None else None
except Exception:
self._last_predicted_price = predicted_price
self._last_actual_price = actual_price
return False
def phi_fold_features_with_tiers(self, df, force_actual_price_tier=False, actual_price=None, exclude_tiers=None):
"""
Generate phi-folded features with Fibonacci tier analysis.
Args:
df: DataFrame with price/volume data
force_actual_price_tier: If True, override volume-based tier selection with closest tier to actual_price
actual_price: The actual price to use for forced tier selection (required if force_actual_price_tier=True)
exclude_tiers: Set of tier indices to exclude from selection (for overshoot elimination loop)
"""
phi = 1.61803398875
if len(df) < 10:
return None
# Normalize DataFrame to handle MultiIndex issues
try:
# If df has MultiIndex columns, flatten them
if hasattr(df.columns, 'nlevels') and df.columns.nlevels > 1:
df = df.copy()
df.columns = df.columns.get_level_values(0) if len(df.columns.levels) > 0 else df.columns
# Ensure we have the required columns
required_cols = ['Close', 'High', 'Low', 'Volume']
for col in required_cols:
if col not in df.columns:
print(f"[ERROR] Missing required column: {col}")
return None
except Exception as e:
print(f"[ERROR] DataFrame structure issue: {e}")
return None
anchor = df.iloc[-2]['Close'].item()
latest = df.iloc[-1]
high = latest['High'].item()
low = latest['Low'].item()
volume = latest['Volume'].item()
delta_high = high - anchor
delta_low = anchor - low
folded_high = delta_high / phi
folded_low = delta_low / phi
non_euclid_high = anchor + folded_high
non_euclid_low = anchor - folded_low
fib_range = non_euclid_high - non_euclid_low
# Dynamically expanded Fibonacci-style ratios across positive and negative ranges
max_mult = float(getattr(self, 'max_fib_multiplier', DEFAULT_MAX_FIB_MULTIPLIER))
include_neg = bool(getattr(self, 'include_negative_fib_tiers', DEFAULT_INCLUDE_NEGATIVE_FIB_TIERS))
max_levels = int(getattr(self, 'max_fib_levels', DEFAULT_MAX_FIB_LEVELS))
# Extended Fibonacci-based ratios using golden ratio (phi) relationships
# FOCUS: High granularity between 0-1 for near-price accuracy, then coarser for extremes
phi = 1.61803398875
base = [0.0]
# FINE GRANULARITY 0.00 to 1.00 - Every 0.01 for precise near-price tiers
fib_classics = []