-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathupdate_github_routes.py
More file actions
335 lines (269 loc) · 12 KB
/
update_github_routes.py
File metadata and controls
335 lines (269 loc) · 12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
#!/usr/bin/env python3
"""
Pritunl GitHub Routes Manager
This script:
1. Stops the Pritunl server (if running)
2. Gets all routes for the specified server
3. Removes existing GitHub routes (comments starting with "github-")
4. Fetches GitHub CIDR ranges from https://api.github.com/meta
5. Adds new routes for GitHub "web" and "git" ranges
6. Starts the Pritunl server
Usage:
python update_github_routes.py --server-id <server_id> --base-url <pritunl_url> --api-token <token> --api-secret <secret>
"""
import requests
import time
import uuid
import hmac
import hashlib
import base64
import json
import sys
import argparse
import logging
import os
from typing import Dict, List, Any, Optional
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class PritunlAPIClient:
"""Pritunl API client for managing servers and routes"""
def __init__(self, base_url: str, api_token: str, api_secret: str):
self.base_url = base_url.rstrip('/')
self.api_token = api_token
self.api_secret = api_secret
def _auth_request(self, method: str, path: str, headers: Optional[Dict] = None, data: Optional[str] = None) -> requests.Response:
"""Make authenticated request to Pritunl API"""
auth_timestamp = str(int(time.time()))
logger.debug(f"Client UTC timestamp: {auth_timestamp} ({time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(int(auth_timestamp)))})")
auth_nonce = uuid.uuid4().hex
auth_string = '&'.join([
self.api_token,
auth_timestamp,
auth_nonce,
method.upper(),
path
])
# Create signature
if sys.version_info[0] < 3:
auth_signature = base64.b64encode(hmac.new(
self.api_secret, auth_string, hashlib.sha256).digest())
else:
auth_signature = base64.b64encode(hmac.new(
self.api_secret.encode(),
auth_string.encode(),
hashlib.sha256
).digest())
# Prepare headers
auth_headers = {
'Auth-Token': self.api_token,
'Auth-Timestamp': auth_timestamp,
'Auth-Nonce': auth_nonce,
'Auth-Signature': auth_signature,
}
if headers:
auth_headers.update(headers)
# Make request
response = getattr(requests, method.lower())(
self.base_url + path,
headers=auth_headers,
data=data
)
return response
def get_server_status(self, server_id: str) -> Dict[str, Any]:
"""Get server status"""
logger.info(f"Getting server status for {server_id}")
response = self._auth_request('GET', f'/server/{server_id}')
response.raise_for_status()
return response.json()
def stop_server(self, server_id: str) -> None:
"""Stop the Pritunl server"""
logger.info(f"Stopping server {server_id}")
response = self._auth_request('PUT', f'/server/{server_id}/operation/stop')
if response.status_code not in [200, 202]:
raise Exception(f"Failed to stop server: {response.status_code} - {response.text}")
# Wait for server to stop
self._wait_for_server_status(server_id, 'offline')
def start_server(self, server_id: str) -> None:
"""Start the Pritunl server"""
logger.info(f"Starting server {server_id}")
response = self._auth_request('PUT', f'/server/{server_id}/operation/start')
if response.status_code not in [200, 202]:
raise Exception(f"Failed to start server: {response.status_code} - {response.text}")
# Wait for server to start
self._wait_for_server_status(server_id, 'online')
def _wait_for_server_status(self, server_id: str, target_status: str, timeout: int = 60) -> None:
"""Wait for server to reach target status"""
start_time = time.time()
while time.time() - start_time < timeout:
server_info = self.get_server_status(server_id)
current_status = server_info.get('status', 'unknown')
logger.info(f"Server status: {current_status} (waiting for {target_status})")
if current_status == target_status:
logger.info(f"Server reached {target_status} status")
return
time.sleep(2)
raise Exception(f"Timeout waiting for server to reach {target_status} status")
def get_routes(self, server_id: str) -> List[Dict[str, Any]]:
"""Get all routes for the server (per-route API)"""
logger.info(f"Getting routes for server {server_id}")
response = self._auth_request('GET', f'/server/{server_id}/route')
response.raise_for_status()
return response.json()
def delete_route(self, server_id: str, route_id: str) -> None:
"""Delete a specific route (per-route API)"""
logger.info(f"Deleting route {route_id}")
response = self._auth_request('DELETE', f'/server/{server_id}/route/{route_id}')
if response.status_code not in [200, 204]:
logger.warning(f"Failed to delete route {route_id}: {response.status_code} - {response.text}")
def add_route(self, server_id: str, network: str, comment: str) -> None:
"""Add a single route to the server (per-route API)"""
logger.info(f"Adding route {network} ({comment}) to server {server_id}")
data = {"server": server_id, "network": network, "comment": comment, "nat": True}
response = self._auth_request(
'POST',
f'/server/{server_id}/route',
headers={'Content-Type': 'application/json'},
data=json.dumps(data)
)
if response.status_code not in [200, 201]:
logger.warning(f"Failed to add route {network}: {response.status_code} - {response.text}")
else:
logger.info(f"Successfully added route {network}")
class GitHubCIDRManager:
"""Manager for fetching and processing GitHub CIDR ranges"""
@staticmethod
def fetch_github_cidrs() -> Dict[str, List[str]]:
"""Fetch GitHub CIDR ranges from GitHub's meta API"""
logger.info("Fetching GitHub CIDR ranges from https://api.github.com/meta")
try:
response = requests.get('https://api.github.com/meta', timeout=30)
response.raise_for_status()
data = response.json()
# Extract web and git CIDR ranges
web_cidrs = data.get('web', [])
git_cidrs = data.get('git', [])
logger.info(f"Found {len(web_cidrs)} web CIDRs and {len(git_cidrs)} git CIDRs")
return {
'web': web_cidrs,
'git': git_cidrs
}
except Exception as e:
raise Exception(f"Failed to fetch GitHub CIDR ranges: {e}")
def filter_github_routes(routes: List[Dict[str, Any]]) -> tuple:
"""
Filter routes into GitHub routes and non-GitHub routes
Returns:
tuple: (github_routes, other_routes)
"""
github_routes = []
other_routes = []
for route in routes:
comment = route.get('comment', '') or ''
if comment.startswith('github-'):
github_routes.append(route)
else:
other_routes.append(route)
return github_routes, other_routes
def create_github_routes(github_cidrs: Dict[str, List[str]]) -> List[Dict[str, Any]]:
"""
Create Pritunl route objects from GitHub CIDR ranges
Args:
github_cidrs: Dictionary containing 'web' and 'git' CIDR lists
Returns:
List of route objects for Pritunl API
"""
routes = []
# Add web routes
for cidr in github_cidrs['web']:
routes.append({
'network': cidr,
'comment': 'github-web',
'nat': True
})
# Add git routes
for cidr in github_cidrs['git']:
routes.append({
'network': cidr,
'comment': 'github-git',
'nat': True
})
return routes
def main():
"""Main function"""
parser = argparse.ArgumentParser(description='Update GitHub routes in Pritunl')
parser.add_argument('--server-id', required=True, help='Pritunl server ID')
parser.add_argument('--base-url', required=True, help='Pritunl base URL (e.g., https://pritunl.example.com)')
parser.add_argument('--api-token', required=True, help='Pritunl API token')
parser.add_argument('--api-secret', required=True, help='Pritunl API secret')
parser.add_argument('--dry-run', action='store_true', help='Show what would be done without making changes')
args = parser.parse_args()
try:
# Initialize API client
client = PritunlAPIClient(args.base_url, args.api_token, args.api_secret)
# Check initial server status
server_info = client.get_server_status(args.server_id)
initial_status = server_info.get('status', 'unknown')
logger.info(f"Initial server status: {initial_status}")
# Stop server if it's running
server_was_online = initial_status == 'online'
if server_was_online and not args.dry_run:
client.stop_server(args.server_id)
elif server_was_online:
logger.info("DRY RUN: Would stop server")
# Get current routes
current_routes = client.get_routes(args.server_id)
logger.info(f"Found {len(current_routes)} existing routes")
# Filter GitHub routes
github_routes, other_routes = filter_github_routes(current_routes)
logger.info(f"Found {len(github_routes)} GitHub routes to remove")
# Delete existing GitHub routes
if github_routes and not args.dry_run:
logger.info("Removing existing GitHub routes...")
for route in github_routes:
route_id = route.get('id')
if route_id:
client.delete_route(args.server_id, route_id)
elif github_routes:
logger.info(f"DRY RUN: Would remove {len(github_routes)} GitHub routes")
# Fetch new GitHub CIDR ranges
github_cidrs = GitHubCIDRManager.fetch_github_cidrs()
# Create new GitHub routes
new_routes = create_github_routes(github_cidrs)
logger.info(f"Created {len(new_routes)} new GitHub routes")
# Add new GitHub routes (one by one)
if new_routes and not args.dry_run:
for route in new_routes:
client.add_route(args.server_id, route['network'], route['comment'])
elif new_routes:
logger.info(f"DRY RUN: Would add {len(new_routes)} new GitHub routes")
for route in new_routes[:5]: # Show first 5 as example
logger.info(f" - {route['network']} ({route['comment']})")
if len(new_routes) > 5:
logger.info(f" ... and {len(new_routes) - 5} more")
# Start server if it was originally online
if server_was_online and not args.dry_run:
client.start_server(args.server_id)
elif server_was_online:
logger.info("DRY RUN: Would start server")
logger.info("GitHub routes update completed successfully!")
# Summary
logger.info("Summary:")
logger.info(f" - Removed {len(github_routes)} old GitHub routes")
logger.info(f" - Added {len(new_routes)} new GitHub routes")
logger.info(f" - {len(github_cidrs['web'])} web routes")
logger.info(f" - {len(github_cidrs['git'])} git routes")
except Exception as e:
logger.error(f"Error: {e}")
sys.exit(1)
if __name__ == '__main__':
# Set UTC timezone
os.environ['TZ'] = 'UTC'
try:
time.tzset()
except AttributeError:
pass # Windows does not support tzset
main()