-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathfetch.py
More file actions
155 lines (131 loc) · 6.95 KB
/
fetch.py
File metadata and controls
155 lines (131 loc) · 6.95 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
import pandas as pd
import requests
from datetime import datetime, timedelta
from io import StringIO
import os
# Endpoints and headers used to download the NSE listed-equity CSV.
_NSE_HOME_URL = "https://www.nseindia.com"
_NSE_EQUITY_LIST_URL = "https://nsearchives.nseindia.com/content/equities/EQUITY_L.csv"
_NSE_HEADERS = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'}

# Company-name patterns whose rows are dropped from the NSE result.
_SUSPICIOUS_NAME_PATTERNS = ['RNBDENIMS']
# Symbol regex whose rows are dropped from the NSE result ("RE and SME shares").
_RE_SME_SYMBOL_REGEX = '-RE|-SM|RE1'

# Last-resort symbol list used when both NSE and MongoDB are unavailable.
_FALLBACK_SYMBOLS = [
    'SWIGGY', 'BLACKBUCK', 'STALLION', 'BHARATSE',
    'NATCAPSUQ', 'MOSCHIP', 'TRAVELFOOD', 'OCCLLTD', 'GARUDA',
    'CEWATER', 'RACLGEAR', 'ORCHASP', 'OSWALPUMPS', 'IGIL',
    'VIKRAN', 'AFCONS', 'MOBIKWIK', 'MASTERTR', 'JAINREC',
    'DRAGARWQ', 'KOTIC', 'SCANSTL', 'IWP', 'NOVARTIND',
]


def _download_nse_equity_list():
    """Download NSE's EQUITY_L.csv and return it parsed as a DataFrame."""
    # The context manager closes the session (and its pooled connections)
    # even when a request fails, so retries do not leak sockets.
    with requests.Session() as session:
        session.headers.update(_NSE_HEADERS)
        # Establish the session against the home page before requesting the
        # archive file (presumably to pick up cookies -- TODO confirm).
        session.get(_NSE_HOME_URL, timeout=15)
        resp = session.get(_NSE_EQUITY_LIST_URL, timeout=45)
        resp.raise_for_status()
        csv_text = resp.text
    print("[OK] NSE API connection successful")
    df = pd.read_csv(StringIO(csv_text))
    print(f"[Info] NSE EQUITY_L returned {len(df)} records")
    return df


def _locate_listing_columns(columns):
    """Return ``(date_col, symbol_col, name_col)`` matched by header keywords.

    Each entry is ``None`` when no header matches; if several headers match
    the same role, the last one wins.
    """
    date_col = symbol_col = name_col = None
    for col in columns:
        col_upper = str(col).upper()
        if 'DATE' in col_upper and 'LISTING' in col_upper:
            date_col = col
        elif 'SYMBOL' in col_upper:
            symbol_col = col
        elif 'NAME' in col_upper and 'COMPANY' in col_upper:
            name_col = col
    return date_col, symbol_col, name_col


def _extract_recent_ipos(df, years_back):
    """Filter the NSE equity list down to listings newer than the cutoff.

    Returns a DataFrame with ``symbol``, ``company`` and ``listing_date``
    (``YYYY-MM-DD`` strings) columns. Raises ``ValueError`` when the listing
    date or symbol column cannot be identified.
    """
    date_col, symbol_col, name_col = _locate_listing_columns(df.columns)
    if date_col is None or symbol_col is None:
        print("[Warning] NSE API: Could not find required columns")
        raise ValueError("Column mapping failed")

    df = df.copy()  # do not mutate the caller's frame
    # Unparseable dates become NaT and are excluded by the comparison below.
    df[date_col] = pd.to_datetime(df[date_col], errors='coerce')
    cutoff = datetime.now() - timedelta(days=365 * years_back)
    recent = df[df[date_col] > cutoff]

    if name_col is not None:
        suspicious = recent[name_col].str.contains(
            '|'.join(_SUSPICIOUS_NAME_PATTERNS), case=False, na=False
        )
        recent = recent[~suspicious]

    re_sme = recent[symbol_col].str.contains(_RE_SME_SYMBOL_REGEX, case=False, na=False)
    recent = recent[~re_sme]

    symbols = recent[symbol_col].tolist()
    # Without a company-name column the symbol doubles as the company name.
    companies = recent[name_col].tolist() if name_col is not None else symbols
    dates = recent[date_col].dt.strftime('%Y-%m-%d').tolist()
    print(f"[OK] NSE API: Found {len(symbols)} recent IPOs")
    return pd.DataFrame({
        'symbol': symbols,
        'company': companies,
        'listing_date': dates,
    })


def _upsert_ipos(df_symbols):
    """Upsert every row of ``df_symbols`` through ``db.upsert_ipo``."""
    # Imported lazily so a missing/broken ``db`` module only affects callers
    # that actually try to write.
    from db import upsert_ipo
    for symbol, company, listing_date in zip(
        df_symbols['symbol'], df_symbols['company'], df_symbols['listing_date']
    ):
        upsert_ipo(symbol=symbol, listing_date=listing_date, name=company)


def _persist_discovered_ipos(df_symbols):
    """Best-effort MongoDB write of freshly discovered IPOs; never raises."""
    try:
        from db import ensure_indexes
        ensure_indexes()
        _upsert_ipos(df_symbols)
        print(f"[MongoDB] Upserted {len(df_symbols)} IPO records")
    except Exception as db_e:
        # The old message claimed a CSV write had succeeded, but no CSV is
        # written any more, so the message now only reports the DB failure.
        print(f"[Warning] [MongoDB] IPO write FAILED: {db_e}")
        try:
            from db import db_metrics
            db_metrics["failures"] = db_metrics.get("failures", 0) + 1
        except Exception:
            # Failure accounting is optional; the fetched data is still returned.
            pass


def _load_ipos_from_mongo():
    """Return stored IPO records as a DataFrame, or ``None`` if there are none."""
    from db import ipos_col
    if ipos_col is None:
        return None
    # NOTE(review): records are written with upsert_ipo(name=...) but read back
    # under the "company" key -- confirm the stored field name matches.
    docs = list(ipos_col.find({}, {"_id": 0, "symbol": 1, "company": 1, "listing_date": 1}))
    return pd.DataFrame(docs) if docs else None


def _build_static_fallback():
    """Build the hard-coded fallback frame and try to store it in MongoDB."""
    # The listing date is a placeholder (today's date), not the real one.
    today = datetime.now().strftime('%Y-%m-%d')
    df_symbols = pd.DataFrame({
        'symbol': list(_FALLBACK_SYMBOLS),
        'company': [f"{sym} Ltd" for sym in _FALLBACK_SYMBOLS],
        'listing_date': [today] * len(_FALLBACK_SYMBOLS),
    })
    try:
        _upsert_ipos(df_symbols)
    except Exception as db_e:
        # Still best-effort, but no longer silent (and no longer swallowing
        # KeyboardInterrupt/SystemExit as the bare ``except:`` did).
        print(f"[Warning] [MongoDB] Fallback IPO write failed: {db_e}")
    print(f"[Info] Created minimal fallback with {len(_FALLBACK_SYMBOLS)} symbols")
    return df_symbols


def fetch_recent_ipo_symbols(years_back: int = 3, max_attempts: int = 3,
                             retry_delay: float = 5) -> "pd.DataFrame | None":
    """Fetch recently listed symbols, falling back through three sources.

    1. Download the NSE equity list (retried up to ``max_attempts`` times),
       keep listings newer than ``years_back`` years, and upsert them into
       MongoDB on a best-effort basis.
    2. If every NSE attempt fails, return whatever IPO records MongoDB holds
       (not filtered by ``years_back``).
    3. If that also yields nothing, return a hard-coded symbol list whose
       listing dates are set to today.

    Args:
        years_back: Size of the look-back window in 365-day years.
        max_attempts: Number of NSE download attempts before falling back.
        retry_delay: Seconds to sleep between failed NSE attempts.

    Returns:
        A DataFrame with ``symbol``, ``company`` and ``listing_date`` columns
        for sources 1 and 3; for source 2, the stored fields among those three.
        ``None`` if an unexpected error escapes all fallbacks.
    """
    import time

    try:
        print(f"[*] Fetching recent IPO symbols for last {years_back} year(s)...")

        # Method 1: NSE download with retry.
        for attempt in range(1, max_attempts + 1):
            try:
                print(f"[Attempt {attempt}/{max_attempts}] Fetching NSE equity list...")
                equity_list = _download_nse_equity_list()
                df_symbols = _extract_recent_ipos(equity_list, years_back)
                _persist_discovered_ipos(df_symbols)
                return df_symbols
            except Exception as e:
                print(f"[Warning] NSE API attempt {attempt} failed: {e}")
                if attempt >= max_attempts:
                    print("[Error] All NSE API attempts failed")
                    break
                print(f"[Info] Retrying in {retry_delay} seconds...")
                time.sleep(retry_delay)

        # Method 2: previously stored MongoDB records.
        print("[Info] Falling back to MongoDB records...")
        try:
            df_symbols = _load_ipos_from_mongo()
            if df_symbols is not None:
                print(f"[Info] MongoDB fallback: {len(df_symbols)} symbols")
                return df_symbols
            print("[Error] No valid MongoDB records found")
        except Exception as db_fallback_error:
            print(f"[Warning] MongoDB fallback failed: {db_fallback_error}")

        # Method 3: hard-coded minimal list.
        print("[Info] Creating minimal fallback data...")
        return _build_static_fallback()
    except Exception as e:
        # Callers receive None rather than an exception on total failure.
        print(f"[Error] fetch_recent_ipo_symbols failed: {e}")
        return None