-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmicrosoft_proxies.py
More file actions
210 lines (177 loc) · 7.34 KB
/
microsoft_proxies.py
File metadata and controls
210 lines (177 loc) · 7.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
# microsoft_proxies.py
"""
A Python utility to maintain a local database of Microsoft Azure Service Tag IP prefixes and ASNs,
and to verify IP addresses against that database.
Usage:
# Install dependencies
pip install -r requirements.txt
# Update or create the database
python microsoft_proxies.py --update
# Export CSV files
python microsoft_proxies.py --export
# Check IP addresses
python microsoft_proxies.py --check-ip <ip1> <ip2> ...
"""
import requests
import re
import sqlite3
import ipaddress
from ipwhois import IPWhois
import argparse
import csv
import os
# Microsoft download page for Azure Service Tags
DOWNLOAD_PAGE = "https://www.microsoft.com/en-us/download/details.aspx?id=56519"
def fetch_json_url():
"""
Scrape the Microsoft download page to find the URL of the latest
ServiceTags_Public JSON file—even if it’s buried inside JavaScript.
"""
# 1) Send a realistic browser User-Agent so Microsoft returns the full page
headers = {
"User-Agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/115.0.0.0 Safari/537.36"
)
}
resp = requests.get(DOWNLOAD_PAGE, headers=headers)
resp.raise_for_status()
# 2) Match any URL that looks like:
# https://download.microsoft.com/download/.../ServiceTags_Public_YYYYMMDD.json
pattern = (
r"https://download\.microsoft\.com/download/[0-9A-Fa-f\-/]+/"
r"ServiceTags_Public_[0-9]{8}\.json"
)
match = re.search(pattern, resp.text)
# 3) If we didn’t find it, raise an error
if not match:
raise RuntimeError("Could not find JSON download link on Microsoft page.")
# 4) Return the full JSON URL
return match.group(0)
def update_database(db_path="microsoft_services.db"):
"""
Download the latest ServiceTags JSON, parse prefixes, discover ASNs in parallel,
and populate a local SQLite database.
"""
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
if os.path.exists(db_path):
os.remove(db_path)
conn = sqlite3.connect(db_path)
c = conn.cursor()
c.execute("CREATE TABLE tags (tag TEXT, prefix TEXT)")
c.execute("CREATE TABLE asns (asn INTEGER UNIQUE)")
print("Downloading ServiceTags JSON...")
json_url = fetch_json_url()
resp = requests.get(json_url)
data = resp.json()
print("Inserting tags and prefixes into database...")
for entry in data.get('values', []):
tag = entry.get('name')
for prefix in entry.get('properties', {}).get('addressPrefixes', []):
c.execute("INSERT INTO tags VALUES (?, ?)", (tag, prefix))
prefixes = {row[0] for row in c.execute("SELECT DISTINCT prefix FROM tags")}
print(f"Performing parallel ASN lookups for {len(prefixes)} prefixes...")
def lookup_asn(prefix):
ip = prefix.split('/')[0]
try:
obj = IPWhois(ip)
res = obj.lookup_rdap(asn_methods=['dns', 'whois'])
asn = int(res.get('asn', 0))
return asn if asn else None
except Exception:
return None
asn_set = set()
with ThreadPoolExecutor(max_workers=20) as executor: # Reduced to 20 to avoid rate-limiting (IP banning)
future_to_prefix = {executor.submit(lookup_asn, p): p for p in prefixes}
for i, future in enumerate(as_completed(future_to_prefix), 1):
prefix = future_to_prefix[future]
try:
asn = future.result()
if asn:
asn_set.add(asn)
print(f"[{i}/{len(prefixes)}] Processed {prefix}", end="\r")
except Exception as e:
print(f"[{i}/{len(prefixes)}] Error processing {prefix}: {e}", end="\r")
print("\nASN lookups complete. Writing to database...")
for asn in asn_set:
c.execute("INSERT OR IGNORE INTO asns VALUES(?)", (asn,))
conn.commit()
conn.close()
print("Database updated successfully.")
def export_csv(db_path="microsoft_services.db"):
"""
Export tags/prefixes and ASNs to separate CSV files.
"""
if not os.path.exists(db_path):
print(f"Error: Database '{db_path}' not found. Please run with --update first.")
return False
conn = sqlite3.connect(db_path)
c = conn.cursor()
with open("microsoft_trusted_prefixes.csv", "w", newline="") as f:
writer = csv.writer(f)
writer.writerow(["tag", "prefix"])
for tag, prefix in c.execute("SELECT tag, prefix FROM tags"):
writer.writerow([tag, prefix])
with open("microsoft_asns.csv", "w", newline="") as f:
writer = csv.writer(f)
writer.writerow(["asn"])
for (asn,) in c.execute("SELECT asn FROM asns"):
writer.writerow([asn])
conn.close()
return True
def check_ips(ips, db_path="microsoft_services.db"):
"""
Check a list of IP addresses against the database and return results.
"""
if not os.path.exists(db_path):
print(f"Error: Database '{db_path}' not found. Please run with --update first.")
return []
conn = sqlite3.connect(db_path)
c = conn.cursor()
results = []
for ip_str in ips:
info = {"ip": ip_str, "trusted": False, "details": []}
try:
ip_obj = ipaddress.ip_address(ip_str)
for tag, prefix in c.execute("SELECT tag, prefix FROM tags"):
if ip_obj in ipaddress.ip_network(prefix):
info["trusted"] = True
info["details"].append(f"{prefix} ({tag})")
break
if not info["trusted"]:
rdap = IPWhois(ip_str).lookup_rdap(asn_methods=['dns','whois'])
asn = int(rdap.get('asn', 0))
# Check if ASN exists in database efficiently
if asn:
c.execute("SELECT 1 FROM asns WHERE asn=?", (asn,))
if c.fetchone():
info["trusted"] = True
info["details"].append(f"ASN {asn}")
except Exception as e:
info["details"].append(f"Error: {e}")
results.append(info)
conn.close()
return results
def main():
parser = argparse.ArgumentParser(description="Manage and query Microsoft proxy database")
parser.add_argument("--update", action="store_true", help="Fetch latest Service Tags & update database")
parser.add_argument("--export", action="store_true", help="Export prefixes and ASNs to CSV files")
parser.add_argument("--check-ip", nargs="+", help="Check one or more IP addresses")
args = parser.parse_args()
if args.update:
update_database()
print("Database updated.")
if args.export:
if export_csv():
print("CSV files generated: microsoft_trusted_prefixes.csv, microsoft_asns.csv")
if args.check_ip:
results = check_ips(args.check_ip)
for r in results:
status = "Trusted ✅" if r["trusted"] else "Untrusted ❌"
print(f"{r['ip']}: {status}")
if r["details"]:
print(" Details:", ", ".join(r["details"]))
if __name__ == "__main__":
main()