-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathsse_server.py
More file actions
283 lines (233 loc) · 10.7 KB
/
sse_server.py
File metadata and controls
283 lines (233 loc) · 10.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
"""
SSE Server Module
Broadcasts messages to connected frontend clients in real-time using Server-Sent Events
"""
from flask import Flask, Response, request
from flask_cors import CORS
import json
import logging
import time
from datetime import datetime
from threading import Thread, Event, Lock
import queue
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class SSEServer:
    """Serve Server-Sent Events (SSE) and fan out broadcast messages.

    Every client connected to ``/stream`` gets its own bounded queue.
    ``broadcast_message`` pushes a message onto each queue, and the per-client
    generator (``_event_stream``) drains its queue into the HTTP response.
    The Flask app runs in a daemon thread started by ``start``.
    """

    def __init__(self, host, port, max_history_messages=100, decode_handler=None,
                 api_keys=None, jwt_secret=None, jwt_cookie_name='datalink_session'):
        """
        Initialize SSE server

        Args:
            host (str): IP address to bind to
            port (int): Port to listen on
            max_history_messages (int): Maximum history messages to keep in backend
                (Frontend decides how many to display). Values <= 0 keep none.
            decode_handler: DecodeHandler instance for ACARS decoding
            api_keys (iterable[str] | None): Static keys accepted via X-API-Key header.
            jwt_secret (str | None): HS256 secret for verifying browser JWT cookies.
            jwt_cookie_name (str): Cookie name carrying the JWT.
        """
        self.host = host
        self.port = port
        self.max_history_messages = max_history_messages
        self.decode_handler = decode_handler
        self.clients = {}  # client_id -> queue.Queue of pending messages
        self.recent_messages = []
        self.app = Flask(__name__)
        CORS(self.app, supports_credentials=True)  # Enable CORS; allow cookies cross-site
        self.server = None
        self.thread = None
        self.running = Event()
        self.client_lock = Lock()  # guards self.clients and self.client_id_counter
        self.client_id_counter = 0
        self.tcp_status_callback = None  # Callback to get TCP connection status

        # Setup Flask routes
        self._setup_routes()

        # Install connection-level auth (X-API-Key header or JWT cookie)
        if api_keys or jwt_secret:
            from auth_helper import make_auth_validator
            self.app.before_request(make_auth_validator(
                api_keys=api_keys,
                jwt_secret=jwt_secret,
                jwt_cookie_name=jwt_cookie_name,
                exempt_paths={'/health'},
            ))

    def _setup_routes(self):
        """Register the /stream, /health and /decode routes on the Flask app."""

        @self.app.route('/stream')
        def stream():
            """SSE endpoint for streaming messages"""
            # Read the peer address here, inside the view: the generator body
            # only starts executing once the response is being iterated, and
            # the request proxy may no longer be usable at that point.
            client_ip = request.remote_addr or 'unknown'
            # CORS headers are added by flask_cors (with supports_credentials=True);
            # do not hardcode Access-Control-* here - '*' would conflict with credentials.
            return Response(
                self._event_stream(client_ip),
                mimetype='text/event-stream',
                headers={
                    'Cache-Control': 'no-cache, no-store, must-revalidate',
                    'Connection': 'keep-alive',
                    'X-Accel-Buffering': 'no',  # Disable nginx buffering
                }
            )

        @self.app.route('/health')
        def health():
            """Health check endpoint"""
            # TCP connection status (if tcp_status_callback is set)
            tcp_status = 'unknown'
            try:
                status_callback = getattr(self, 'tcp_status_callback', None)
                if status_callback:
                    tcp_status = status_callback()
                    logger.debug(f"TCP status from callback: {tcp_status}")
                else:
                    logger.warning("tcp_status_callback not set or None")
            except Exception as e:
                logger.error(f"Error calling tcp_status_callback: {e}", exc_info=True)
                tcp_status = 'error'
            return {
                'status': 'ok',
                # Use the locked accessor so the count is read consistently
                # with concurrent connects/disconnects.
                'clients': self.get_client_count(),
                'recent_messages': len(self.recent_messages),
                'tcp_status': tcp_status
            }

        @self.app.route('/decode', methods=['POST', 'OPTIONS'])
        def decode():
            """Decode ACARS message endpoint"""
            if request.method == 'OPTIONS':
                return '', 200
            try:
                # silent=True yields None instead of raising on a missing or
                # malformed JSON body, so bad input becomes a 400, not a 500.
                data = request.get_json(silent=True)
                if not isinstance(data, dict):
                    return {'error': 'Request body must be a JSON object'}, 400
                label = data.get('label', '')
                text = data.get('text', '')
                if not label or not text:
                    return {'error': 'Label and text required'}, 400
                if not self.decode_handler or not self.decode_handler.initialized:
                    return {'error': 'Decoder not available'}, 503
                if not self.decode_handler.is_decodable(label):
                    return {'decodable': False, 'decoded': None}
                decoded = self.decode_handler.decode_message(label, text)
                return {'decodable': True, 'decoded': decoded}
            except Exception as e:
                logger.error(f"Decode endpoint error: {e}", exc_info=True)
                return {'error': str(e)}, 500

    def _is_registered(self, client_id, client_queue):
        """Return True while ``client_queue`` is still the queue registered for ``client_id``."""
        with self.client_lock:
            return self.clients.get(client_id) is client_queue

    def _event_stream(self, client_ip=None):
        """
        Generator function for SSE stream
        Sends messages to connected client

        Args:
            client_ip (str | None): Peer address used in log lines. When omitted,
                it is read from the Flask request proxy on a best-effort basis
                and falls back to 'unknown'.

        Yields:
            str: SSE frames - ``": heartbeat"`` comments and ``"data: <json>"`` events.
        """
        if client_ip is None:
            # Fallback for callers that do not pass the address explicitly.
            try:
                client_ip = str(request.remote_addr) if request and hasattr(request, 'remote_addr') else 'unknown'
            except (RuntimeError, AttributeError):
                client_ip = 'unknown'

        # Create a queue for this client
        client_queue = queue.Queue(maxsize=100)
        # Initialize client_id to None so the finally block is safe if
        # registration itself fails.
        client_id = None
        try:
            with self.client_lock:
                self.client_id_counter += 1
                client_id = self.client_id_counter
                self.clients[client_id] = client_queue
            logger.info(f"New SSE client connected: {client_ip} (ID: {client_id})")

            # Send a heartbeat FIRST so the browser knows right away that it is connected
            yield ": heartbeat\n\n"

            # Do NOT replay history - live messages only
            logger.info(f"Client {client_id} ready for live messages")
            last_heartbeat = time.time()

            # Keep connection alive and send messages
            while self.running.is_set():
                # broadcast_message() evicts clients whose queue overflowed and
                # stop() clears the registry; in both cases nothing will ever
                # be queued here again, so end the stream.
                if not self._is_registered(client_id, client_queue):
                    logger.info(f"Client {client_id} is no longer registered, closing stream")
                    break
                try:
                    # Get message from queue with timeout
                    message = client_queue.get(timeout=1.0)
                except queue.Empty:
                    # Send heartbeat every 15 seconds to keep connection alive
                    current_time = time.time()
                    if current_time - last_heartbeat >= 15:
                        yield ": heartbeat\n\n"
                        last_heartbeat = current_time
                    continue
                try:
                    payload = json.dumps(message)
                except (TypeError, ValueError) as e:
                    # One unserializable message must not end the stream.
                    logger.error(f"Dropping non-serializable message for client {client_id}: {e}")
                    continue
                yield f"data: {payload}\n\n"
        except GeneratorExit:
            logger.info(f"Client disconnected: {client_ip} (ID: {client_id})")
        except Exception as e:
            logger.error(f"Error in event stream for client {client_ip} (ID: {client_id}): {e}")
        finally:
            # Remove client from the dictionary
            if client_id is not None:
                with self.client_lock:
                    self.clients.pop(client_id, None)

    def start(self):
        """Start SSE server in a separate thread

        Returns:
            bool: True if the server thread is running after the startup wait,
            False if it was already running or failed to start.
        """
        if self.thread and self.thread.is_alive():
            logger.warning("SSE server is already running")
            return False
        try:
            self.running.set()
            self.thread = Thread(target=self._run_server, daemon=True)
            self.thread.start()
            # Wait for the server to start
            time.sleep(1)
            if not self.thread.is_alive():
                # _run_server returned already, so the server is not serving.
                self.running.clear()
                logger.error("SSE server thread exited during startup")
                return False
            logger.info(f"SSE server started on http://{self.host}:{self.port}/stream")
            return True
        except Exception as e:
            self.running.clear()
            logger.error(f"Error starting SSE server: {e}")
            return False

    def _run_server(self):
        """Run SSE server (runs in separate thread)"""
        try:
            # Disable Flask request logging to reduce noise
            log = logging.getLogger('werkzeug')
            log.setLevel(logging.WARNING)
            logger.info("SSE server is running and accepting connections")
            self.app.run(host=self.host, port=self.port, threaded=True)
        except Exception as e:
            logger.error(f"SSE server error: {e}")

    def broadcast_message(self, message_data):
        """
        Broadcast a message to all connected clients

        The message is appended to ``recent_messages`` (bounded by
        ``max_history_messages``) and queued for every client as
        ``{'type': 'message', 'data': message_data}``. Clients whose queue is
        full are unregistered; their stream then closes itself.

        Args:
            message_data (dict): Message data to broadcast
        """
        if not self.running.is_set():
            logger.warning("Server is not running, cannot broadcast message")
            return

        # Add to recent messages, then trim in place. Counting the overflow
        # (rather than slicing with [-limit:]) stays correct when the limit
        # is 0, where [-0:] would keep the whole list.
        self.recent_messages.append(message_data)
        overflow = len(self.recent_messages) - max(self.max_history_messages, 0)
        if overflow > 0:
            del self.recent_messages[:overflow]

        # Prepare message
        message = {
            'type': 'message',
            'data': message_data
        }

        # Broadcast to all connected clients
        with self.client_lock:
            disconnected_clients = []
            for client_id, client_queue in self.clients.items():
                try:
                    # Try to add message to queue without blocking
                    client_queue.put_nowait(message)
                except queue.Full:
                    logger.warning(f"Client {client_id} queue is full, marking for disconnect")
                    disconnected_clients.append(client_id)
                except Exception as e:
                    logger.error(f"Error sending to client {client_id}: {e}")
                    disconnected_clients.append(client_id)
            # Remove disconnected clients (after iteration, never during it)
            for client_id in disconnected_clients:
                self.clients.pop(client_id, None)

    def stop(self):
        """Stop the SSE server

        Clears the running flag (which ends every client stream loop) and the
        client registry, then waits up to 3 seconds for the server thread.

        NOTE(review): nothing here asks the Flask server started by
        ``app.run`` to shut down, so the join may simply time out and the
        daemon thread may keep serving - confirm whether a real shutdown hook
        is needed.
        """
        logger.info("Stopping SSE server...")
        self.running.clear()
        # Clear all client queues
        with self.client_lock:
            self.clients.clear()
        if self.thread and self.thread.is_alive():
            self.thread.join(timeout=3.0)
        logger.info("SSE server stopped")

    def get_client_count(self):
        """Get number of connected clients"""
        with self.client_lock:
            return len(self.clients)