-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathmain.py
More file actions
302 lines (248 loc) · 10.2 KB
/
main.py
File metadata and controls
302 lines (248 loc) · 10.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
"""Main FastAPI application entry point."""
from contextlib import asynccontextmanager
from typing import Annotated
from fastapi import Depends, FastAPI
from fastapi.middleware.cors import CORSMiddleware
from util import dev_routes
from graph.dependencies import get_age_graph_client
from graph.infrastructure.age_client import AgeGraphClient
from graph.presentation import routes as graph_routes
from iam.presentation import router as iam_router
from management.presentation import management_router
from infrastructure.database.dependencies import (
close_database_engines,
init_database_engines,
)
from infrastructure.dependencies import get_age_connection_pool
from infrastructure.logging import configure_logging
from infrastructure.settings import (
get_cors_settings,
get_database_settings,
get_oidc_settings,
get_outbox_worker_settings,
get_spicedb_settings,
)
from infrastructure.version import __version__
from iam.infrastructure.outbox import IAMEventTranslator
from management.infrastructure.outbox import ManagementEventTranslator
from infrastructure.outbox.composite import CompositeEventHandler
from infrastructure.outbox.spicedb_handler import SpiceDBEventHandler
from infrastructure.outbox.event_sources.postgres_notify import (
PostgresNotifyEventSource,
)
from infrastructure.outbox.worker import OutboxWorker
from shared_kernel.authorization.spicedb.client import SpiceDBClient
from shared_kernel.outbox.observability import (
DefaultEventSourceProbe,
DefaultOutboxWorkerProbe,
)
from query.presentation.mcp import mcp_http_app_inner, query_mcp_app
# Configure structlog before any loggers are created.
# This runs at import time, ahead of the lifespan definition and app
# construction that follow in this module.
configure_logging()
@asynccontextmanager
async def kartograph_lifespan(app: FastAPI):
    """Run application startup and shutdown around the serving phase.

    Startup, in order:
    - initializes the database engines on ``app``
    - creates one SpiceDB client shared by tenant bootstrap and the
      outbox worker
    - ensures the default tenant and its workspace exist (only when
      ``app.state.write_sessionmaker`` is present)
    - starts the outbox worker when it is enabled in settings and stores
      it on ``app.state.outbox_worker``
    - enters the MCP app lifespan, unless this app instance already did

    Shutdown, in order:
    - stops the outbox worker if one is stored on ``app.state``
    - closes the database engines
    - closes the AGE connection pool and clears its cache so the next
      startup builds a fresh pool

    Shutdown runs in ``finally`` blocks: once the database engines have
    been initialized, cleanup happens even if a later startup step or
    the serving phase raises.

    Engines are created here (within the running event loop) to ensure
    proper async context for database connections.

    State is tracked per-app instance (``app.state._mcp_initialized``) to
    maintain test isolation when multiple app instances are created.

    Args:
        app: The FastAPI application whose ``state`` holds the session
            maker, the outbox worker and the MCP initialization flag.

    Yields:
        Nothing; control is handed to the server while the app runs.
    """
    # Function-scope import, in line with the local imports used below.
    from urllib.parse import quote

    # Initialize MCP state tracking on this app instance
    if not hasattr(app.state, "_mcp_initialized"):
        app.state._mcp_initialized = False

    # Startup: initialize database engines
    init_database_engines(app)

    try:
        # Startup: create shared SpiceDB client for bootstrap and outbox worker
        spicedb_settings = get_spicedb_settings()
        authz = SpiceDBClient(
            endpoint=spicedb_settings.endpoint,
            preshared_key=spicedb_settings.preshared_key.get_secret_value(),
            use_tls=spicedb_settings.use_tls,
            cert_path=spicedb_settings.cert_path,
        )

        # Startup: ensure default tenant and root workspace exist
        # (single-tenant mode)
        if hasattr(app.state, "write_sessionmaker"):
            from iam.application.services import TenantBootstrapService
            from iam.infrastructure.tenant_repository import TenantRepository
            from iam.infrastructure.workspace_repository import WorkspaceRepository
            from infrastructure.observability.startup_probe import (
                DefaultStartupProbe,
            )
            from infrastructure.outbox.repository import OutboxRepository
            from infrastructure.settings import get_iam_settings

            iam_settings = get_iam_settings()
            startup_probe = DefaultStartupProbe()

            async with app.state.write_sessionmaker() as session:
                outbox = OutboxRepository(session=session)
                tenant_repo = TenantRepository(session=session, outbox=outbox)
                workspace_repo = WorkspaceRepository(
                    session=session, authz=authz, outbox=outbox
                )
                bootstrap_service = TenantBootstrapService(
                    tenant_repository=tenant_repo,
                    workspace_repository=workspace_repo,
                    session=session,
                    probe=startup_probe,
                )

                # Resolve workspace name (use setting or fall back to
                # tenant name)
                workspace_name = (
                    iam_settings.default_workspace_name
                    or iam_settings.default_tenant_name
                )

                await bootstrap_service.ensure_default_tenant_with_workspace(
                    tenant_name=iam_settings.default_tenant_name,
                    workspace_name=workspace_name,
                )

        # Startup: start outbox worker if enabled
        outbox_settings = get_outbox_worker_settings()
        if outbox_settings.enabled and hasattr(app.state, "write_sessionmaker"):
            db_settings = get_database_settings()

            # Build database URL for LISTEN. Credentials are percent-encoded
            # so reserved characters ("@", ":", "/", "#", "%", ...) in the
            # username or password cannot corrupt the URL structure.
            db_user = quote(str(db_settings.username), safe="")
            db_password = quote(db_settings.password.get_secret_value(), safe="")
            db_url = (
                f"postgresql://{db_user}:"
                f"{db_password}@"
                f"{db_settings.host}:{db_settings.port}/{db_settings.database}"
            )

            # Create observability probe
            probe = DefaultOutboxWorkerProbe()

            # Build composite handler with registered bounded context handlers
            handler = CompositeEventHandler(probe=probe)

            # Register SpiceDB handler wrapping the IAM translator
            spicedb_handler = SpiceDBEventHandler(
                translator=IAMEventTranslator(),
                authz=authz,
            )
            handler.register(spicedb_handler, handler_name="iam")

            # Register SpiceDB handler wrapping the Management translator
            management_spicedb_handler = SpiceDBEventHandler(
                translator=ManagementEventTranslator(),
                authz=authz,
            )
            handler.register(management_spicedb_handler, handler_name="management")

            # Create event source for real-time NOTIFY processing
            event_source = PostgresNotifyEventSource(
                db_url=db_url,
                channel="outbox_events",
                probe=DefaultEventSourceProbe(),
            )

            worker = OutboxWorker(
                session_factory=app.state.write_sessionmaker,
                handler=handler,
                probe=probe,
                event_source=event_source,
                poll_interval_seconds=outbox_settings.poll_interval_seconds,
                batch_size=outbox_settings.batch_size,
                max_retries=outbox_settings.max_retries,
            )
            await worker.start()
            app.state.outbox_worker = worker

        # MCP lifespan - skip if already initialized (e.g., in tests with
        # multiple lifespans)
        if not app.state._mcp_initialized:
            async with mcp_http_app_inner.lifespan(app):
                app.state._mcp_initialized = True
                yield
        else:
            # MCP already initialized in previous lifespan cycle
            yield
    finally:
        try:
            # Shutdown: stop outbox worker
            if hasattr(app.state, "outbox_worker"):
                await app.state.outbox_worker.stop()
        finally:
            # Shutdown: close database engines (even if stopping the worker
            # raised, so connections are not leaked)
            await close_database_engines(app)

            # Shutdown: close AGE connection pool and clear cache for next
            # startup
            try:
                pool = get_age_connection_pool()
                pool.close_all()
                # Clear lru_cache so next startup creates a fresh pool
                get_age_connection_pool.cache_clear()
            except Exception:
                # Deliberate best-effort: the pool may not be initialized,
                # and a failure here must not mask the shutdown path.
                pass
# The ASGI application; startup and shutdown are delegated to
# kartograph_lifespan.
app = FastAPI(
    title="Kartograph API",
    description="Enterprise-Ready Bi-Temporal Knowledge Graphs as a Service",
    version=__version__,
    lifespan=kartograph_lifespan,
)

# CORS middleware is installed only when the CORS settings report it enabled.
cors_settings = get_cors_settings()
if cors_settings.is_enabled:
    app.add_middleware(
        CORSMiddleware,
        allow_origins=cors_settings.origins,
        allow_credentials=cors_settings.allow_credentials,
        allow_methods=cors_settings.allow_methods,
        allow_headers=cors_settings.allow_headers,
        expose_headers=cors_settings.expose_headers,
        max_age=cors_settings.max_age,
    )

# The MCP query app is served as a mounted sub-application under /query.
app.mount(path="/query", app=query_mcp_app)

# Register routers in a fixed order: Graph, IAM, Management, then the dev
# utility routes (kept last so they are easy to remove for production).
for _bounded_context_router in (
    graph_routes.router,
    iam_router,
    management_router,
    dev_routes.router,
):
    app.include_router(_bounded_context_router)
# Log OIDC configuration at startup
def _log_oidc_config() -> None:
    """Emit the OIDC settings through the OIDC config probe, best-effort.

    Any exception raised while loading the settings or while logging them
    is swallowed, so a missing or incomplete OIDC configuration does not
    prevent the module from loading. The probe import itself is outside
    the guarded section.
    """
    from iam.application.observability import DefaultOIDCConfigProbe

    try:
        loaded_settings = get_oidc_settings()
        DefaultOIDCConfigProbe.log_settings(loaded_settings)
    except Exception:
        # OIDC settings may fail to load if client_secret is not configured;
        # logging them is optional, so give up quietly.
        return
# Runs once at import time. Failures while loading or logging the OIDC
# settings are swallowed inside the helper.
_log_oidc_config()
def configure_swagger_oauth2(app: FastAPI) -> None:
    """Set the Swagger UI OAuth2 initialization parameters for PKCE.

    The Swagger UI is set up to authenticate via OAuth2/OIDC using the
    Authorization Code flow with PKCE, with the public swagger client id
    taken from the OIDC settings (not the confidential API client).

    Only the Swagger UI init parameters are set here. According to the
    original note on this function, the security scheme itself is
    registered by OAuth2AuthorizationCodeBearer in iam/dependencies.py.

    Args:
        app: The FastAPI application whose ``swagger_ui_init_oauth``
            attribute is assigned.

    Returns:
        None. When the OIDC settings cannot be loaded (e.g., missing
        client_secret), the app is left untouched and no error is raised.
    """
    try:
        settings = get_oidc_settings()
    except Exception:
        # OIDC is not configured: leave Swagger UI without OAuth2 support.
        return
    else:
        # Swagger UI init parameters for the PKCE flow.
        pkce_init_params = {
            "clientId": settings.swagger_client_id,
            "usePkceWithAuthorizationCodeGrant": True,
            "scopes": "openid profile email",
        }
        app.swagger_ui_init_oauth = pkce_init_params
# Configure Swagger OAuth2 if OIDC is available.
# Runs at import time; the call leaves the app unchanged when the OIDC
# settings cannot be loaded.
configure_swagger_oauth2(app)
@app.get("/health")
def health():
    """Report that the API process is up.

    Returns a constant payload and checks no dependencies.
    """
    payload = {"status": "ok"}
    return payload
@app.get("/health/db")
def health_db(
    client: Annotated[AgeGraphClient, Depends(get_age_graph_client)],
) -> dict:
    """Report the health of the graph database connection.

    Args:
        client: Graph client injected through ``get_age_graph_client``.

    Returns:
        On success, a dict with ``status`` ("ok" or "unhealthy", from
        ``client.verify_connection()``), ``connected`` and ``graph_name``.
        If any client call raises, a dict with ``status`` "error",
        ``connected`` False and the exception text under ``error``.
    """
    try:
        verified = client.verify_connection()
        if verified:
            status = "ok"
        else:
            status = "unhealthy"
        report = {
            "status": status,
            "connected": client.is_connected(),
            "graph_name": client.graph_name,
        }
        return report
    except Exception as exc:
        # Failures are reported in the response body instead of propagating.
        failure = {
            "status": "error",
            "connected": False,
            "error": str(exc),
        }
        return failure