-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapp.py
More file actions
2608 lines (2177 loc) · 99.8 KB
/
Copy pathapp.py
File metadata and controls
2608 lines (2177 loc) · 99.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""Flask application – Aircraft Finder.
Features:
- Flask-Login session auth for the web admin panel
- Bearer-token (API key) auth for the JSON REST API
- Role-based access: admin, manager, viewer
- Scoped CRUD: users see only their assigned museums/countries
- Full CRUD on aircraft, museums, and exhibit links
- Public read-only search + proximity endpoints
- International museum support (optional coordinates)
- Structured logging: auth, changes, access
"""
import csv
import io
import json
import re
import secrets
from datetime import datetime, timedelta, timezone
from functools import wraps
from urllib.parse import urlparse, urljoin
from flask import (
Flask, render_template, request, jsonify,
redirect, url_for, flash, abort, g, session,
)
from flask_login import (
LoginManager, login_user, logout_user,
login_required, current_user,
)
from flask_wtf.csrf import CSRFProtect, CSRFError
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
from sqlalchemy import or_, and_, func
from sqlalchemy.orm import joinedload, selectinload, contains_eager
from models import (
db, User, ApiKey,
Museum, Aircraft, AircraftAlias, AircraftMuseum, ZipCode, haversine,
UserMuseumAssignment, UserCountryAssignment,
AircraftTemplate, AircraftTemplateAlias,
)
from geocoder import resolve_location
from config import Config
from logger import auth_log, change_log, access_log
# ══════════════════════════════════════════════
# App factory
# ══════════════════════════════════════════════
class _BearerAwareCSRF(CSRFProtect):
    """CSRF protection that steps aside for Bearer-authenticated requests.

    A request carrying ``Authorization: Bearer ...`` is not validated for a
    CSRF token: browsers never attach that header on their own, so holding
    the token is itself the authorization. Every other request (cookie /
    session traffic, including admin-UI AJAX that sends ``X-CSRFToken``)
    goes through the normal Flask-WTF check.

    The decision is made per request inside ``protect``. An earlier version
    marked the view function itself as exempt after the first Bearer call,
    which leaked the exemption to every later caller.

    NOTE(review): the skip keys on the header's *presence*, not on the
    token being valid — downstream auth should not fall back to the session
    cookie when a Bearer header was sent.
    """

    def protect(self):
        """Run the standard CSRF check unless a Bearer header is present."""
        authorization = request.headers.get("Authorization", "")
        if not authorization.startswith("Bearer "):
            return super().protect()
        return None
# Extension singletons: created unbound here, attached to the app inside
# create_app() via their init_app() calls.
csrf = _BearerAwareCSRF()
# Rate limiter keyed on the client address; no global limits — individual
# routes opt in with @limiter.limit(...).
limiter = Limiter(key_func=get_remote_address, default_limits=[])
# Placeholder value that create_app() compares against Config.SECRET_KEY to
# detect an unconfigured deployment.
_DEFAULT_SECRET_KEY = "change-me-in-production"
def create_app():
    """Build and configure the Flask application.

    Loads ``Config``, refuses to start with the placeholder SECRET_KEY
    outside debug mode, binds SQLAlchemy, CSRF protection and the rate
    limiter, registers the CSRF error handler, and sets up Flask-Login.

    Returns:
        The configured ``Flask`` instance.

    Raises:
        RuntimeError: if ``Config.SECRET_KEY`` is still the placeholder and
            ``Config.SERVER_DEBUG`` is falsy.
    """
    app = Flask(__name__)
    app.config.from_object(Config)

    # Sessions are signed with SECRET_KEY — a publicly known value lets
    # anyone forge a session cookie and impersonate any user.
    if Config.SECRET_KEY == _DEFAULT_SECRET_KEY:
        if not Config.SERVER_DEBUG:
            raise RuntimeError(
                "SECRET_KEY is the default placeholder. Set the SECRET_KEY env var "
                "(or [app] secret_key in web.config) before running outside debug mode."
            )
        # Dev mode: still loud about it.
        import logging as _logging
        _logging.warning("SECRET_KEY is the default placeholder. OK in debug, never in prod.")

    db.init_app(app)
    csrf.init_app(app)
    limiter.init_app(app)

    @app.errorhandler(CSRFError)
    def _handle_csrf_error(e):
        return jsonify({"error": "CSRF validation failed. Reload the page and try again."}), 400

    login_manager = LoginManager()
    login_manager.login_view = "login_page"
    login_manager.login_message_category = "warning"
    login_manager.init_app(app)

    @login_manager.user_loader
    def load_user(user_id):
        # A user loader must answer "no such user" with None rather than
        # raise; a stale or malformed ID in the session/remember cookie
        # should log the visitor out, not produce a 500.
        try:
            numeric_id = int(user_id)
        except (TypeError, ValueError):
            return None
        return User.query.get(numeric_id)

    return app
# Module-level application instance; the @app.route / @app.before_request /
# @app.after_request decorators below register against it at import time.
app = create_app()
# ══════════════════════════════════════════════
# Request logging middleware
# ══════════════════════════════════════════════
@app.before_request
def _log_request():
    """Record the request start time and the acting username on ``flask.g``.

    ``g.log_user`` is the session user's username when one is logged in,
    otherwise the literal ``"anonymous"``.
    """
    g.request_start = datetime.now(timezone.utc)
    g.log_user = current_user.username if current_user.is_authenticated else "anonymous"
@app.after_request
def _log_response(response):
    """Write one line per request to the access or change log.

    GET requests go to ``access_log``; POST/PUT/PATCH/DELETE go to
    ``change_log``. Other methods are not logged. The response is returned
    unmodified.
    """
    verb = request.method
    who = getattr(g, "log_user", "anonymous")
    entry = f"{request.remote_addr} {who} {verb} {request.path} → {response.status_code}"

    if verb == "GET":
        access_log.info(entry)
    elif verb in ("POST", "PUT", "PATCH", "DELETE"):
        change_log.info(entry)
    return response
# ══════════════════════════════════════════════
# Session timeout
# ══════════════════════════════════════════════
# Endpoints that must keep working even when an idle session has just been
# timed out — otherwise we can't render the login page or process the logout.
_TIMEOUT_EXEMPT_ENDPOINTS = {
"login_page", "logout", "register_page", "static",
"desktop_only_page",
}
def _idle_timeout_for(user):
    """Return the idle-timeout (seconds) configured for ``user``'s role.

    Admin takes precedence over manager; anyone else gets the viewer limit.
    """
    if user.is_admin:
        limit = Config.SESSION_IDLE_TIMEOUT_ADMIN
    elif user.is_manager:
        limit = Config.SESSION_IDLE_TIMEOUT_MANAGER
    else:
        limit = Config.SESSION_IDLE_TIMEOUT_VIEWER
    return limit
def _full_logout():
    """Log the user out, wipe the session, and keep the remember-cookie
    deletion signal alive.

    ``logout_user()`` leaves ``_remember`` (and possibly
    ``_remember_seconds``) in the session so Flask-Login can expire the
    remember-me cookie on the response. A plain ``session.clear()`` right
    after would erase that signal, the long-lived cookie would survive, and
    the user would be silently re-authenticated on the next request. So the
    two keys are saved across the clear and restored afterwards.

    Shared by the /logout route and the session-timeout hook so both paths
    behave the same.
    """
    logout_user()

    # Snapshot the Flask-Login bookkeeping keys before wiping everything.
    carried_over = {
        key: session.get(key) for key in ("_remember", "_remember_seconds")
    }
    session.clear()

    for key, value in carried_over.items():
        if value is not None:
            session[key] = value
@app.before_request
def _enforce_session_timeout():
    """Sign out an authenticated user whose session is idle or too old.

    Runs before every request. Two independent limits apply:

    - Absolute: more than ``Config.SESSION_ABSOLUTE_TIMEOUT`` seconds since
      ``session["login_time"]``.
    - Idle: more than the role's idle limit since
      ``session["last_activity"]``.

    When either trips, the user is fully logged out, the event is written
    to ``auth_log``, a warning is flashed, and a redirect to the login page
    is returned. Otherwise ``last_activity`` is refreshed and ``None`` is
    returned so the request proceeds.
    """
    # These endpoints must keep working right after a session expired.
    if request.endpoint in _TIMEOUT_EXEMPT_ENDPOINTS:
        return None

    if not current_user.is_authenticated:
        # Nothing to enforce for anonymous visitors; just keep the clock fresh.
        session["last_activity"] = datetime.now(timezone.utc).isoformat()
        return None

    now = datetime.now(timezone.utc)
    stamp = now.isoformat()
    login_time_iso = session.get("login_time")
    last_activity_iso = session.get("last_activity")

    # Sessions issued before the clocks existed: seed both and let it through.
    if not (login_time_iso and last_activity_iso):
        session["login_time"] = stamp
        session["last_activity"] = stamp
        return None

    def _expire(event, message):
        # Grab the name first — current_user turns anonymous on logout.
        username = current_user.username
        _full_logout()
        auth_log.info(f"{event} user={username} ip={request.remote_addr}")
        flash(message, "warning")
        return redirect(url_for("login_page"))

    session_age = (now - datetime.fromisoformat(login_time_iso)).total_seconds()
    idle_for = (now - datetime.fromisoformat(last_activity_iso)).total_seconds()

    # Absolute limit is checked first, regardless of activity.
    if session_age > Config.SESSION_ABSOLUTE_TIMEOUT:
        return _expire(
            "SESSION_TIMEOUT_ABSOLUTE",
            "Your session has expired. Please sign in again.",
        )

    # Per-role idle limit.
    if idle_for > _idle_timeout_for(current_user):
        return _expire(
            "SESSION_TIMEOUT_IDLE",
            "You have been signed out due to inactivity.",
        )

    # Still valid: this request counts as activity.
    session["last_activity"] = stamp
    return None
# ══════════════════════════════════════════════
# Password policy
# ══════════════════════════════════════════════
def _validate_password_strength(password):
"""Validate ``password`` against the configured policy.
Returns ``None`` if valid; otherwise returns a human-readable error
string suitable for showing to the user.
"""
if password is None:
return "Password is required."
if len(password) < Config.PASSWORD_MIN_LENGTH:
return f"Password must be at least {Config.PASSWORD_MIN_LENGTH} characters."
if Config.PASSWORD_REQUIRE_MIXED:
has_letter = any(c.isalpha() for c in password)
has_digit = any(c.isdigit() for c in password)
if not (has_letter and has_digit):
return "Password must contain at least one letter and one digit."
return None
# ══════════════════════════════════════════════
# Security headers + safe-redirect helper
# ══════════════════════════════════════════════
# Per-request CSP nonce. Generated in _generate_csp_nonce (before_request)
# and exposed to templates via the _csp_nonce context processor. Inline
# <script nonce="{{ csp_nonce }}"> tags execute; anything injected via XSS
# will not have the nonce and will be blocked by the browser.
def _build_csp(nonce):
"""Construct the Content-Security-Policy header value for THIS request.
Built per-request because the script-src directive embeds a fresh nonce
each time. The other directives are static.
Notes on the rules:
default-src 'self' only load resources from our origin
unless explicitly allowed below
script-src our origin + cdnjs (jQuery, Three.js,
Font Awesome) + a per-request nonce
that legitimate inline <script> tags
carry. NO 'unsafe-inline' — that's the
whole point of the nonce.
style-src still has 'unsafe-inline' because the
templates have many inline `style=`
attributes and inline <style> blocks.
Lower XSS risk than script.
font-src Google Fonts and Font Awesome.
img-src 'self' data: https: museum/site images from any HTTPS origin.
connect-src 'self' + jsDelivr (world-borders GeoJSON).
Tight allowlist — not 'https:' — so an
XSS payload can't exfiltrate to any host.
frame-ancestors 'none' no one can iframe us (clickjacking).
base-uri 'self' block <base href="evil"> tricks.
form-action 'self' POSTs only go to us.
object-src 'none' <object>/<embed> are blocked entirely.
"""
return (
f"default-src 'self'; "
f"script-src 'nonce-{nonce}' 'self' https://cdnjs.cloudflare.com; "
f"style-src 'self' 'unsafe-inline' https://cdnjs.cloudflare.com https://fonts.googleapis.com; "
f"font-src 'self' https://cdnjs.cloudflare.com https://fonts.gstatic.com data:; "
f"img-src 'self' data: https:; "
f"connect-src 'self' https://cdn.jsdelivr.net; "
f"frame-ancestors 'none'; "
f"base-uri 'self'; "
f"form-action 'self'; "
f"object-src 'none'"
)
@app.before_request
def _generate_csp_nonce():
    """Create this request's CSP nonce and park it on ``flask.g``.

    Keeping it on ``g`` lets the template context processor and the
    after_request header hook read the same value. ``token_urlsafe(16)``
    draws 16 random bytes and encodes them URL-safe base64.
    """
    fresh_nonce = secrets.token_urlsafe(16)
    g.csp_nonce = fresh_nonce
@app.context_processor
def _csp_nonce():
    """Inject ``csp_nonce`` into every Jinja template context.

    Templates use it as ``<script nonce="{{ csp_nonce }}">``. Falls back to
    an empty string when no nonce was generated for this request.
    """
    nonce = getattr(g, "csp_nonce", "")
    return {"csp_nonce": nonce}
@app.after_request
def _add_security_headers(response):
    """Attach the standard security headers to ``response``.

    Does nothing when ``Config.SECURITY_HEADERS_ENABLED`` is falsy. Every
    header is set with ``setdefault`` so a value already placed on the
    response by a view is left alone.
    """
    if not Config.SECURITY_HEADERS_ENABLED:
        return response

    headers = response.headers

    fixed_headers = (
        # Never allow framing (clickjacking / UI redress).
        ("X-Frame-Options", "DENY"),
        # Stop MIME-type sniffing.
        ("X-Content-Type-Options", "nosniff"),
        # Don't leak full URLs to third-party origins.
        ("Referrer-Policy", "strict-origin-when-cross-origin"),
        # Switch off browser APIs the app doesn't use.
        (
            "Permissions-Policy",
            "geolocation=(), camera=(), microphone=(), payment=(), usb=()",
        ),
    )
    for header_name, header_value in fixed_headers:
        headers.setdefault(header_name, header_value)

    # CSP is rebuilt per request because script-src carries the nonce.
    csp_value = _build_csp(getattr(g, "csp_nonce", ""))
    headers.setdefault("Content-Security-Policy", csp_value)

    # HSTS only on HTTPS. A TLS-terminating reverse proxy is recognised via
    # X-Forwarded-Proto. max-age is 15552000 s (180 days).
    # NOTE(review): no `preload` token is sent, so this policy is not
    # submitted for browser preload lists as written.
    over_tls = request.is_secure
    if not over_tls:
        over_tls = request.headers.get("X-Forwarded-Proto", "").lower() == "https"
    if over_tls:
        headers.setdefault(
            "Strict-Transport-Security",
            "max-age=15552000; includeSubDomains",
        )

    return response
def _safe_next_url(target):
"""Return ``target`` if it's a same-origin URL, else None.
Mitigates open-redirect via ``?next=https://evil.com``. Allows relative
paths (most common case) and absolute URLs whose netloc matches this
request's host. Anything else gets dropped — caller falls back to a
safe default.
"""
if not target:
return None
# urljoin resolves a relative target against the current URL; absolute
# targets pass through unchanged. urlparse then lets us inspect them.
test_url = urlparse(urljoin(request.host_url, target))
ref_url = urlparse(request.host_url)
same_scheme = test_url.scheme in ("http", "https")
same_host = test_url.netloc == ref_url.netloc
if same_scheme and same_host:
# Return the original (so a caller-supplied relative URL stays
# relative when redirected — preserves clean URLs).
return target
return None
# ══════════════════════════════════════════════
# Mobile dispatch
# ══════════════════════════════════════════════
# A small regex covering the User-Agent strings of phones and small tablets.
# Intentionally conservative — better to render desktop for an ambiguous UA
# (an unknown bot, e.g.) than to ship a half-broken mobile layout. Users on a
# desktop browser in mobile-emulation mode trip this regex too, which is fine
# for testing.
_MOBILE_UA_RE = re.compile(
r"(iPhone|iPod|Android.*Mobile|BlackBerry|IEMobile|Opera Mini|"
r"Mobile Safari|Windows Phone|webOS)",
re.IGNORECASE,
)
def _is_mobile_request():
    """Decide whether the current request should get the mobile layout.

    A session-level override wins over User-Agent sniffing:
    ``?desktop=1`` on any URL sets ``session["force_desktop"]`` and
    ``?desktop=0`` removes it. While the flag is set this returns ``False``
    no matter what the User-Agent says. Without the flag, the result is
    whether the User-Agent matches ``_MOBILE_UA_RE``.
    """
    desktop_param = request.args.get("desktop")
    if desktop_param == "1":
        session["force_desktop"] = True
    elif desktop_param == "0":
        session.pop("force_desktop", None)

    if session.get("force_desktop"):
        return False

    user_agent = request.headers.get("User-Agent", "")
    return _MOBILE_UA_RE.search(user_agent) is not None
@app.before_request
def _detect_mobile():
    """Compute the mobile flag once per request and store it on ``g.is_mobile``."""
    looks_mobile = _is_mobile_request()
    g.is_mobile = looks_mobile
@app.context_processor
def _inject_mobile_flag():
    """Expose ``is_mobile`` to every Jinja template (``False`` if unset)."""
    flag = getattr(g, "is_mobile", False)
    return {"is_mobile": flag}
def mobile_render(name, **context):
    """Render ``mobile/<name>`` for phone clients and ``<name>`` otherwise.

    Public-facing routes use this; admin routes use ``@no_mobile`` instead.

    Args:
        name: template filename relative to the templates root.
        **context: passed straight through to ``render_template``.
    """
    on_phone = getattr(g, "is_mobile", False)
    template_name = f"mobile/{name}" if on_phone else name
    return render_template(template_name, **context)
def no_mobile(fn):
    """Decorator that sends mobile callers to the desktop-only page.

    Intended for admin routes. When ``g.is_mobile`` is truthy the view is
    not called and a redirect to ``desktop_only_page`` is returned;
    otherwise the wrapped view runs normally.
    """
    @wraps(fn)
    def desktop_gate(*args, **kwargs):
        if not getattr(g, "is_mobile", False):
            return fn(*args, **kwargs)
        return redirect(url_for("desktop_only_page"))

    return desktop_gate
@app.route("/desktop-only")
def desktop_only_page():
    """Landing page for phones that were redirected away from an admin route."""
    template_name = "mobile/desktop_only.html"
    return render_template(template_name)
# ══════════════════════════════════════════════
# API-key authentication decorator
# ══════════════════════════════════════════════
# Minimum age, in seconds, an ApiKey.last_used value must reach before
# _get_api_user() rewrites (and commits) it again.
_LAST_USED_THROTTLE_SECONDS = 60
def _get_api_user():
    """Resolve the caller from an ``Authorization: Bearer <key>`` header.

    Returns:
        ``(api_key.user, api_key)`` when the header carries a key that
        ``ApiKey.lookup`` recognises, otherwise ``(None, None)``.

    Side effect: refreshes ``api_key.last_used`` (and commits) when the
    stored value is missing or older than ``_LAST_USED_THROTTLE_SECONDS``,
    so busy keys don't trigger a write on every request.
    """
    header = request.headers.get("Authorization", "")
    if not header.startswith("Bearer "):
        return None, None

    api_key = ApiKey.lookup(header[7:].strip())
    if not api_key:
        return None, None

    now = datetime.now(timezone.utc)
    last_used = api_key.last_used
    # A stored naive timestamp is interpreted as UTC so the subtraction works.
    if last_used is not None and last_used.tzinfo is None:
        last_used = last_used.replace(tzinfo=timezone.utc)

    is_stale = (
        last_used is None
        or (now - last_used).total_seconds() > _LAST_USED_THROTTLE_SECONDS
    )
    if is_stale:
        api_key.last_used = now
        db.session.commit()

    return api_key.user, api_key
def api_auth_required(min_permission="read"):
    """Decorator factory: require an API key (or web session) with at least
    ``min_permission``.

    Permission order is ``read < readwrite < admin``. An unknown
    ``min_permission`` or an unknown key permission is treated as level 0.

    Authentication sources, in order:
      1. ``Authorization: Bearer <api_key>`` — the key's own permission
         level applies.
      2. A logged-in web session — but only when NO Bearer header was sent.
         Session roles map to levels: data-admin -> 2, manager -> 1,
         anyone else -> 0.

    A Bearer header that does not resolve to a key is rejected with 401 and
    never falls back to the session. The CSRF layer skips validation for
    any request that carries a Bearer header, so letting such a request
    authenticate through the cookie would bypass CSRF protection.

    Responses produced by the decorator itself:
      - 401 JSON when no credentials, or an invalid key, is supplied;
      - 403 JSON when the resolved level is below ``min_permission``.

    On success the resolved user is stored on ``g.api_user`` for downstream
    scope checks and the wrapped view is called.
    """
    levels = {"read": 0, "readwrite": 1, "admin": 2}
    required_level = levels.get(min_permission, 0)

    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            user, api_key = _get_api_user()
            bearer_presented = request.headers.get("Authorization", "").startswith("Bearer ")

            # Bearer header sent but no matching key: fail closed instead
            # of quietly switching to cookie auth.
            if api_key is None and bearer_presented:
                return jsonify({"error": "Invalid API key."}), 401

            # Fallback: logged-in web session
            if user is None and current_user.is_authenticated:
                user = current_user
                # 'admin' here means "may perform admin-level data
                # operations", which is_data_admin covers. User-management
                # endpoints use the stricter @admin_required instead.
                if user.is_data_admin:
                    perm_level = 2
                elif user.is_manager:
                    perm_level = 1
                else:
                    perm_level = 0
            elif api_key:
                perm_level = levels.get(api_key.permissions, 0)
            else:
                return jsonify({"error": "Authentication required. Supply 'Authorization: Bearer <api_key>' header."}), 401

            if perm_level < required_level:
                return jsonify({"error": f"Insufficient permissions. Requires '{min_permission}'."}), 403

            # Store the resolved user on g for scope checks
            g.api_user = user
            return fn(*args, **kwargs)

        return wrapper

    return decorator
def admin_required(fn):
    """Decorator: allow only logged-in users whose ``is_admin`` is truthy.

    Web-session only. ``login_required`` handles unauthenticated callers;
    authenticated non-admins get a 403.
    """
    @wraps(fn)
    @login_required
    def admin_gate(*args, **kwargs):
        if current_user.is_admin:
            return fn(*args, **kwargs)
        abort(403)

    return admin_gate
def _get_effective_user():
    """Return the acting user: ``g.api_user`` if set, else the session user.

    Yields ``None`` when ``g.api_user`` is absent and nobody is logged in.
    """
    # Computed up front (not lazily) to mirror getattr's eager default.
    session_user = current_user if current_user.is_authenticated else None
    return getattr(g, "api_user", session_user)
def _increment_contribution():
    """Add one to the effective user's ``contribution_count``.

    No-op when there is no effective user. A missing/``None`` count is
    treated as zero. The caller is responsible for committing.
    """
    actor = _get_effective_user()
    if not actor:
        return
    previous = actor.contribution_count or 0
    actor.contribution_count = previous + 1
def _user_can_write_museum(museum_id):
    """Tell whether the effective user may modify the museum with ``museum_id``.

    ``False`` when nobody is authenticated or the museum does not exist;
    ``True`` for admins without a lookup; otherwise whatever
    ``user.can_access_museum(museum)`` returns.
    """
    actor = _get_effective_user()
    if not actor:
        return False
    if actor.is_admin:
        return True

    museum = Museum.query.get(museum_id)
    if museum:
        return actor.can_access_museum(museum)
    return False
# ══════════════════════════════════════════════
# Web page routes (public)
# ══════════════════════════════════════════════
@app.route("/")
def index():
    """Home page (mobile or desktop variant)."""
    template_name = "index.html"
    return mobile_render(template_name)
@app.route("/aircraft")
def aircraft_page():
    """Public aircraft list page (mobile or desktop variant)."""
    template_name = "aircraft.html"
    return mobile_render(template_name)
@app.route("/museums")
def museums_page():
    """Public museum list page (mobile or desktop variant)."""
    template_name = "museums.html"
    return mobile_render(template_name)
@app.route("/aircraft/<int:aircraft_id>")
def aircraft_detail_page(aircraft_id):
    """Detail view for one aircraft.

    Desktop clients are redirected to the list page with a ``focus`` query
    parameter (the list page shows detail in place). Mobile clients get the
    dedicated detail template.
    """
    if not getattr(g, "is_mobile", False):
        list_url = url_for("aircraft_page")
        return redirect(list_url + f"?focus={aircraft_id}")
    return render_template("mobile/aircraft_detail.html", aircraft_id=aircraft_id)
@app.route("/museums/<int:museum_id>")
def museum_detail_page(museum_id):
    """Detail view for one museum.

    Desktop clients are redirected to the museum list with ``?focus=<id>``;
    mobile clients get the dedicated detail template.
    """
    if not getattr(g, "is_mobile", False):
        list_url = url_for("museums_page")
        return redirect(list_url + f"?focus={museum_id}")
    return render_template("mobile/museum_detail.html", museum_id=museum_id)
# ══════════════════════════════════════════════
# Auth: login / logout / register
# ══════════════════════════════════════════════
# Rate-limit login by username AND by IP. The username key blocks an
# attacker spreading attempts across a botnet against one account, while
# the IP key blocks a single client trying many usernames.
def _login_username_key():
    """Rate-limit key for login attempts: the submitted username
    (trimmed, lower-cased), or the client address when none was submitted."""
    submitted = (request.form.get("username") or "").strip().lower()
    return submitted or get_remote_address()
@app.route("/login", methods=["GET", "POST"])
@limiter.limit("5 per minute", methods=["POST"])
@limiter.limit("10 per hour", key_func=_login_username_key, methods=["POST"])
def login_page():
    """Render the login form (GET) or process a login attempt (POST).

    POST flow:
      1. A locked account is refused before the password is checked.
      2. Valid credentials on an active account: record the login, reset
         the failure counter, rebuild the session, stamp the timeout
         clocks, and redirect to a validated ``?next=`` or the admin page.
      3. Anything else: bump the failure counter (possibly locking the
         account), log it, and re-render the form with a generic error.

    Already-authenticated visitors are sent straight to the admin page.
    """
    if current_user.is_authenticated:
        return redirect(url_for("admin_page"))

    if request.method != "POST":
        return mobile_render("login.html")

    username = request.form.get("username", "").strip()
    password = request.form.get("password", "")
    user = User.query.filter_by(username=username).first()

    # Lockout comes first: a locked-out caller learns nothing about the
    # password, and a legitimate user sees the lockout notice instead of
    # extending it with more failed attempts.
    if user and user.is_locked:
        remaining = user.lockout_seconds_remaining()
        mins = max(1, remaining // 60)
        auth_log.warning(
            f"LOGIN_BLOCKED_LOCKED user={username} ip={request.remote_addr} "
            f"remaining={remaining}s"
        )
        flash(f"Account temporarily locked. Try again in {mins} minute(s).", "error")
        return mobile_render("login.html")

    authenticated = user and user.check_password(password) and user.is_active
    if not authenticated:
        # Failed attempt: count it against a known account, maybe lock.
        if user:
            locked_now = user.register_failed_login(
                Config.LOGIN_LOCKOUT_MAX_ATTEMPTS,
                Config.LOGIN_LOCKOUT_DURATION,
            )
            db.session.commit()
            if locked_now:
                auth_log.warning(
                    f"LOGIN_LOCKOUT user={username} ip={request.remote_addr} "
                    f"after {user.failed_login_count} failed attempts"
                )
        auth_log.warning(f"LOGIN_FAILED user={username} ip={request.remote_addr}")
        flash("Invalid username or password.", "error")
        return mobile_render("login.html")

    # ── Successful login ──
    user.last_login = datetime.now(timezone.utc)
    user.last_login_ip = request.headers.get("X-Forwarded-For", request.remote_addr)
    user.reset_failed_logins()
    db.session.commit()

    # Drop whatever was in the pre-login session before marking the user
    # authenticated, so nothing planted earlier is carried into the
    # authenticated session.
    session.clear()
    login_user(user, remember=True)

    # Start both timeout clocks for the session-timeout hook.
    stamped_at = datetime.now(timezone.utc).isoformat()
    session["login_time"] = stamped_at
    session["last_activity"] = stamped_at

    auth_log.info(f"LOGIN_SUCCESS user={username} ip={user.last_login_ip}")

    # Only follow ?next= when it stays on this host; else go to /admin.
    destination = _safe_next_url(request.args.get("next"))
    if not destination:
        destination = url_for("admin_page")
    return redirect(destination)
@app.route("/logout")
@login_required
def logout():
    """Sign the current user out and send them to the login page.

    Stamps ``last_logout`` on the user's row, writes a LOGOUT line to the
    auth log, tears the session down via ``_full_logout`` (the same helper
    the timeout hook uses), then flashes a confirmation.
    """
    # Read identity first — current_user is anonymous once logout runs.
    username = current_user.username
    account = User.query.get(current_user.id)
    if account:
        account.last_logout = datetime.now(timezone.utc)
        db.session.commit()

    auth_log.info(f"LOGOUT user={username} ip={request.remote_addr}")

    _full_logout()

    # Flash messages live in the session, so this has to follow the wipe.
    # Landing on /login makes the signed-out state unambiguous.
    flash("You have been signed out.", "info")
    return redirect(url_for("login_page"))
@app.route("/register", methods=["GET", "POST"])
@limiter.limit("3 per minute", methods=["POST"])
def register_page():
    """Render the sign-up form (GET) or create an account (POST).

    Validation, in order: username and password present, passwords match,
    password meets the strength policy, username not already taken. The
    first failure is flashed and the form is re-rendered.

    On success the account is created (the very first account gets the
    ``admin`` role, all later ones ``viewer``), the user is logged in, and
    the browser is redirected to the admin page.

    The post-registration login mirrors ``login_page``: the pre-login
    session is cleared before the user is marked authenticated, and the
    ``login_time`` / ``last_activity`` clocks used by the session-timeout
    hook are stamped immediately.

    NOTE(review): the "username taken" check and the insert are not atomic;
    presumably a unique constraint backs this up — confirm, and consider
    handling the resulting integrity error.
    """
    if current_user.is_authenticated:
        return redirect(url_for("admin_page"))

    if request.method == "POST":
        username = request.form.get("username", "").strip()
        email = request.form.get("email", "").strip() or None
        password = request.form.get("password", "")
        password2 = request.form.get("password2", "")

        # Skip the policy check for an empty password; the "required"
        # message below covers that case.
        password_error = _validate_password_strength(password) if password else None

        if not username or not password:
            flash("Username and password are required.", "error")
        elif password != password2:
            flash("Passwords do not match.", "error")
        elif password_error:
            flash(password_error, "error")
        elif User.query.filter_by(username=username).first():
            flash("Username already taken.", "error")
        else:
            # First user is automatically admin
            is_first = User.query.count() == 0
            role = "admin" if is_first else "viewer"
            user = User(username=username, email=email, role=role)
            user.set_password(password)
            db.session.add(user)
            db.session.commit()

            # Same hygiene as login_page: start the authenticated session
            # from a clean slate, then start both timeout clocks.
            session.clear()
            login_user(user)
            now_iso = datetime.now(timezone.utc).isoformat()
            session["login_time"] = now_iso
            session["last_activity"] = now_iso

            auth_log.info(f"REGISTER user={username} role={role} ip={request.remote_addr}")
            flash("Account created!" + (" You are the first user, so you have admin rights." if is_first else ""), "success")
            return redirect(url_for("admin_page"))

    return mobile_render("register.html")
# ══════════════════════════════════════════════
# User account page (session-protected)
# ══════════════════════════════════════════════
@app.route("/account")
@login_required
def account_page():
    """Logged-in user's account page (mobile or desktop variant)."""
    template_name = "account.html"
    return mobile_render(template_name)
# ══════════════════════════════════════════════
# Admin panel (session-protected)
# ══════════════════════════════════════════════
@app.route("/admin")
@no_mobile
@login_required
def admin_page():
    """Admin dashboard. Desktop-only and login-gated."""
    template_name = "admin.html"
    return render_template(template_name)
@app.route("/admin/aircraft")
@no_mobile
@login_required
def admin_aircraft_page():
    """Admin aircraft management page. Desktop-only and login-gated."""
    template_name = "admin_aircraft.html"
    return render_template(template_name)
@app.route("/admin/aircraft/new")
@no_mobile
@login_required
def admin_aircraft_new_page():
    """Admin "new aircraft" form page. Desktop-only and login-gated."""
    template_name = "admin_aircraft_new.html"
    return render_template(template_name)
@app.route("/admin/museums")
@no_mobile
@login_required
def admin_museums_page():
    """Admin museum management page. Desktop-only and login-gated."""
    template_name = "admin_museums.html"
    return render_template(template_name)
@app.route("/admin/museums/new")
@no_mobile
@login_required
def admin_museums_new_page():
    """Admin "new museum" form page. Desktop-only and login-gated."""
    template_name = "admin_museums_new.html"
    return render_template(template_name)
@app.route("/admin/exhibits")
@no_mobile
@login_required
def admin_exhibits_page():
    """Cross-cutting table of every aircraft-museum link.

    Links are created from the aircraft and museum edit modals; this page
    is the flat overview they cannot provide — all exhibits in one sortable
    table with duplicate detection and quick unlink/status edits. Its data
    comes from GET /api/v1/exhibits. Desktop-only and login-gated.
    """
    template_name = "admin_exhibits.html"
    return render_template(template_name)
@app.route("/admin/templates")
@no_mobile
@login_required
def admin_templates_page():
    """Admin aircraft-template management page. Desktop-only and login-gated."""
    template_name = "admin_templates.html"
    return render_template(template_name)
@app.route("/admin/import")
@no_mobile
@login_required
def admin_import_page():
    """Bulk-import landing page.

    This view only requires a login; the API endpoints behind the form
    enforce the admin-data role gate themselves.
    """
    template_name = "admin_import.html"
    return render_template(template_name)
@app.route("/admin/users")
@no_mobile
@admin_required
def admin_users_page():
    """Serve the user-management page (``users.html``).

    Registered at ``/admin/users``. Unlike the neighbouring admin views,
    which use ``login_required``, this one is wrapped by ``admin_required``
    (together with ``no_mobile``).
    """
    template_name = "users.html"
    return render_template(template_name)
@app.route("/admin/api-keys")
@no_mobile
@login_required
def api_keys_page():
    """Serve the API-keys page (``api_keys.html``).

    Registered at ``/admin/api-keys`` and wrapped by the ``no_mobile`` and
    ``login_required`` decorators.
    """
    template_name = "api_keys.html"
    return render_template(template_name)
# ══════════════════════════════════════════════
# Public: contributions leaderboard
# ══════════════════════════════════════════════
@app.route("/contributors")
def contributors_page():
    """Serve the public contributions leaderboard page.

    Registered at ``/contributors`` with no auth decorator; the page is
    produced by ``mobile_render`` from ``contributors.html``.
    """
    template_name = "contributors.html"
    return mobile_render(template_name)
@app.route("/api/v1/contributors")
def api_contributors():
    """Public endpoint returning the contributors leaderboard as JSON.

    Only users with a positive ``contribution_count`` are included, ordered
    from most to fewest contributions.

    Returns:
        A JSON array of objects with the keys ``username``, ``role`` and
        ``contributions``.
    """
    # Select just the three needed columns so full User rows (and their
    # relationships) are never loaded for serialization.
    leaderboard = db.session.query(
        User.username, User.role, User.contribution_count
    )
    leaderboard = leaderboard.filter(User.contribution_count > 0)
    leaderboard = leaderboard.order_by(User.contribution_count.desc())

    payload = []
    for username, role, count in leaderboard.all():
        payload.append(
            {"username": username, "role": role, "contributions": count}
        )
    return jsonify(payload)
# ══════════════════════════════════════════════
# API: Public read-only (no auth needed)
# ══════════════════════════════════════════════
# Sortable-column whitelists. Only columns listed here can be sorted via
# ?sort_by=...; anything else is silently ignored and the endpoint falls back
# to its default ORDER BY. This stops "?sort_by=password_hash" style probes
# in case someone tries to sort by a column we never meant to expose.
def _lazy_aircraft_column(attr_name):
    """Return a zero-arg callable that looks up ``Aircraft.<attr_name>`` when called."""
    return lambda: getattr(Aircraft, attr_name)


# Whitelist of Aircraft attributes that may be named in ?sort_by=.
# Each value is a zero-arg callable, so the Aircraft attribute is only
# resolved when the sort is actually applied.
_AIRCRAFT_SORT_COLUMNS = {
    attr_name: _lazy_aircraft_column(attr_name)
    for attr_name in (
        "id",
        "model",
        "variant",
        "model_name",
        "aircraft_name",
        "tail_number",
        "manufacturer",
        "aircraft_type",
        "military_civilian",
        "role_type",
        "year_built",
        "full_designation",
    )
}
def _lazy_museum_column(attr_name):
    """Return a zero-arg callable that looks up ``Museum.<attr_name>`` when called."""
    return lambda: getattr(Museum, attr_name)


# Whitelist of Museum attributes that may be named in ?sort_by=.
# Each value is a zero-arg callable, so the Museum attribute is only
# resolved when the sort is actually applied.
_MUSEUM_SORT_COLUMNS = {
    attr_name: _lazy_museum_column(attr_name)
    for attr_name in (
        "id",
        "name",
        "city",
        "state_province",
        "country",
        "region",
    )
}
def _apply_sort(query, column_map, default_order):
    """Order ``query`` according to the request's ``sort_by``/``sort_dir`` args.

    Args:
        query: Query object to order.
        column_map: Whitelist mapping of sortable names to zero-arg
            callables that return the column expression. The callables are
            invoked only when their column is requested, so building the map
            never touches the model at module import time.
        default_order: Iterable of ORDER BY expressions used when ``sort_by``
            is missing or not in ``column_map``.

    Returns:
        ``query`` with an ORDER BY applied. Any ``sort_dir`` other than
        ``"desc"`` (case-insensitive) sorts ascending.
    """
    params = request.args
    requested_name = (params.get("sort_by") or "").strip()
    direction = (params.get("sort_dir") or "asc").strip().lower()

    resolve_column = column_map.get(requested_name)
    if resolve_column is not None:
        column = resolve_column()
        if direction == "desc":
            ordering = column.desc()
        else:
            ordering = column.asc()
        return query.order_by(ordering)

    # Unknown or absent sort key: keep the endpoint's own default ordering.
    return query.order_by(*default_order)
def _build_aircraft_filter(q):
    """Build an OR filter matching aircraft whose text fields or aliases contain ``q``.

    ``q`` is matched as a literal, case-insensitive substring. LIKE
    metacharacters in the user's text (``%`` and ``_``) are escaped, so a
    search for ``F_4`` no longer matches ``F-4`` and a bare ``%`` no longer
    matches every row.

    Args:
        q: Raw search text as typed by the user.

    Returns:
        A SQLAlchemy ``or_()`` clause over tail number, model name, aircraft
        name, model, variant, full designation, manufacturer, and alias.
    """
    # Escape the escape character first so the escapes added for "%" and "_"
    # are not themselves doubled afterwards.
    escape_char = "/"
    literal = (
        q.replace(escape_char, escape_char * 2)
        .replace("%", escape_char + "%")
        .replace("_", escape_char + "_")
    )
    like = f"%{literal}%"

    def contains(column):
        # Every comparison must carry the same ESCAPE clause as the pattern.
        return column.ilike(like, escape=escape_char)

    # Subquery: aircraft IDs that match via aliases. Use scalar_subquery() so
    # SQLAlchemy 2.x doesn't emit a coercion warning when this is fed into
    # in_(); the older .subquery() form is deprecated for that use case.
    alias_ids = db.session.query(AircraftAlias.aircraft_id).filter(
        contains(AircraftAlias.alias)
    ).scalar_subquery()

    return or_(
        contains(Aircraft.tail_number),
        contains(Aircraft.model_name),
        contains(Aircraft.aircraft_name),
        contains(Aircraft.model),
        contains(Aircraft.variant),
        # Use the STORED generated column (indexed as idx_full_desig) instead
        # of computing CONCAT(model, '-', variant) per row.
        contains(Aircraft.full_designation),
        contains(Aircraft.manufacturer),
        Aircraft.id.in_(alias_ids),
    )
@app.route("/api/v1/aircraft/search")
def api_aircraft_search():
    """Paginated aircraft search over tail number, model, variant, name, manufacturer, or alias.

    Query parameters:
        q: Optional search text; when empty, no filter is applied.
        page: Page number (default 1).
        per_page: Page size (default 20, capped at 100).
        sort_by / sort_dir: Optional ordering, restricted to the
            ``_AIRCRAFT_SORT_COLUMNS`` whitelist.

    Returns:
        JSON object with ``results`` (list of ``to_dict()`` payloads),
        ``total``, ``page`` and ``pages``.
    """
    params = request.args
    search_text = params.get("q", "").strip()
    page_number = params.get("page", 1, type=int)
    page_size = min(params.get("per_page", 20, type=int), 100)

    matches = Aircraft.query
    if search_text:
        matches = matches.filter(_build_aircraft_filter(search_text))

    # Honor ?sort_by=field&sort_dir=asc|desc; default ordering otherwise.
    fallback_order = (Aircraft.model, Aircraft.variant, Aircraft.model_name)
    matches = _apply_sort(
        matches, _AIRCRAFT_SORT_COLUMNS, default_order=fallback_order
    )

    result_page = matches.paginate(
        page=page_number, per_page=page_size, error_out=False
    )
    payload = {
        "results": [aircraft.to_dict() for aircraft in result_page.items],
        "total": result_page.total,
        "page": result_page.page,
        "pages": result_page.pages,
    }
    return jsonify(payload)
@app.route("/api/v1/aircraft/<int:aircraft_id>")
def api_aircraft_detail(aircraft_id):
"""Get a single aircraft with its museum locations.