Skip to content

Commit d483887

Browse files
authored
Merge pull request #15 from rhel-lightspeed/chore/structlog-and-lock-cleanup
chore: adopt structlog and add asyncio.Lock to engine singleton
2 parents 1259372 + a05e58d commit d483887

5 files changed

Lines changed: 75 additions & 71 deletions

File tree

CHANGELOG.md

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
2424

2525
### Changed
2626

27-
- Deferred heavy `docs2db-api` imports (torch, transformers) in `engine.py` to reduce module load time
28-
- Migrated startup health check and engine shutdown to FastMCP lifespan hook for proper async lifecycle management
29-
- Simplified `__main__.py` — removed manual `asyncio.run()` calls for health check and cleanup
27+
- Deferred heavy `docs2db-api` imports (torch, transformers) in `engine.py` to reduce module load time (RSPEED-3047)
28+
- Migrated startup health check and engine shutdown to FastMCP lifespan hook for proper async lifecycle management (RSPEED-3047)
29+
- Simplified `__main__.py` — removed manual `asyncio.run()` calls for health check and cleanup (RSPEED-3047)
30+
- Adopted structlog throughout `engine.py`, `server.py`, `search_documents.py` — consistent keyword-arg logging, no f-strings
31+
- Added `asyncio.Lock` to `get_engine()` singleton to guard against concurrent initialization
32+
- `shutdown_engine()` now awaits `engine.close()` before clearing the singleton reference
3033
- Upgraded fastmcp from 2.x to 3.x (`>=3.3.1, <4`)
3134
- Removed standalone `mcp` dependency (pulled in transitively by fastmcp 3.x)
3235
- Updated smoke tests to match FastMCP 3.x decorator behavior (`@mcp.tool` now returns the original function)

src/docs2db_mcp/__main__.py

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,20 +4,20 @@
44
import os
55
import sys
66

7+
import structlog
8+
79

810
# Must read transport before importing modules that configure logging
911
transport = os.environ.get("DOCS2DB_MCP_TRANSPORT", "sse")
1012

1113
if transport == "sse":
12-
# stdout not used by MCP protocol, logging can go there
1314
logging.basicConfig(
1415
level=logging.INFO,
1516
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
1617
stream=sys.stderr,
1718
)
1819
else:
1920
# stdout is reserved for MCP protocol, must redirect all logging to stderr
20-
# set CRITICAL level before importing docs2db-api to minimize startup logs
2121
os.environ.setdefault("DOCS2DB_LOG_LEVEL", "CRITICAL")
2222
logging.basicConfig(
2323
level=logging.INFO,
@@ -30,27 +30,28 @@
3030
from docs2db_mcp.server import mcp # noqa: E402
3131

3232

33-
logger = logging.getLogger(__name__)
33+
logger = structlog.get_logger(__name__)
3434

3535

3636
def main() -> None:
3737
"""Run the MCP server."""
38-
logger.info("Starting docs2db MCP server on %s:%s", CONFIG.host, CONFIG.port)
39-
logger.info("Transport: %s", CONFIG.transport)
40-
logger.info("Database: %s:%s/%s", CONFIG.db_host, CONFIG.db_port, CONFIG.db_database)
4138
logger.info(
42-
"RAG settings: threshold=%s, max_chunks=%s, reranking=%s",
43-
CONFIG.rag_similarity_threshold,
44-
CONFIG.rag_max_chunks,
45-
CONFIG.rag_enable_reranking,
39+
"Starting docs2db MCP server",
40+
host=CONFIG.host,
41+
port=CONFIG.port,
42+
transport=CONFIG.transport,
43+
database=f"{CONFIG.db_host}:{CONFIG.db_port}/{CONFIG.db_database}",
44+
rag_threshold=CONFIG.rag_similarity_threshold,
45+
rag_max_chunks=CONFIG.rag_max_chunks,
46+
rag_reranking=CONFIG.rag_enable_reranking,
4647
)
4748

4849
try:
4950
mcp.run(transport=CONFIG.transport, **CONFIG.transport_kwargs)
5051
except KeyboardInterrupt:
5152
logger.info("Received keyboard interrupt")
5253
except Exception as e:
53-
logger.error("Server error: %s", e, exc_info=True)
54+
logger.error("Server error", error=str(e), exc_info=True)
5455
sys.exit(1)
5556

5657

src/docs2db_mcp/engine.py

Lines changed: 48 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -6,20 +6,23 @@
66

77
from __future__ import annotations
88

9-
import logging
9+
import asyncio
1010

1111
from typing import TYPE_CHECKING
1212

13+
import structlog
14+
1315
from docs2db_mcp.config import CONFIG
1416

1517

1618
if TYPE_CHECKING:
1719
from docs2db_api.rag.engine import UniversalRAGEngine
1820

1921

20-
logger = logging.getLogger(__name__)
22+
logger = structlog.get_logger(__name__)
2123

2224
_engine: UniversalRAGEngine | None = None
25+
_engine_lock = asyncio.Lock()
2326

2427

2528
async def get_engine() -> UniversalRAGEngine:
@@ -37,41 +40,39 @@ async def get_engine() -> UniversalRAGEngine:
3740
"""
3841
global _engine
3942

40-
if _engine is None:
41-
from docs2db_api.rag.engine import RAGConfig
42-
from docs2db_api.rag.engine import UniversalRAGEngine
43-
44-
logger.info("Initializing UniversalRAGEngine")
45-
46-
# Configure database connection (dict format)
47-
db_config = {
48-
"host": CONFIG.db_host,
49-
"port": str(CONFIG.db_port),
50-
"database": CONFIG.db_database,
51-
"user": CONFIG.db_user,
52-
"password": CONFIG.db_password,
53-
}
54-
55-
# Configure RAG (no LLM config - refinement disabled)
56-
rag_config = RAGConfig()
57-
rag_config.similarity_threshold = CONFIG.rag_similarity_threshold
58-
rag_config.max_chunks = CONFIG.rag_max_chunks
59-
rag_config.enable_reranking = CONFIG.rag_enable_reranking
60-
rag_config.enable_question_refinement = False # Disabled for tool calling
61-
62-
# Create and initialize engine
63-
_engine = UniversalRAGEngine(
64-
config=rag_config,
65-
db_config=db_config,
66-
)
67-
68-
try:
69-
await _engine.start()
70-
logger.info("UniversalRAGEngine initialized successfully")
71-
except Exception as e:
72-
logger.error("Failed to initialize RAG engine: %s", e)
73-
_engine = None
74-
raise
43+
async with _engine_lock:
44+
if _engine is None:
45+
from docs2db_api.rag.engine import RAGConfig
46+
from docs2db_api.rag.engine import UniversalRAGEngine
47+
48+
logger.info("Initializing UniversalRAGEngine")
49+
50+
db_config = {
51+
"host": CONFIG.db_host,
52+
"port": str(CONFIG.db_port),
53+
"database": CONFIG.db_database,
54+
"user": CONFIG.db_user,
55+
"password": CONFIG.db_password,
56+
}
57+
58+
rag_config = RAGConfig()
59+
rag_config.similarity_threshold = CONFIG.rag_similarity_threshold
60+
rag_config.max_chunks = CONFIG.rag_max_chunks
61+
rag_config.enable_reranking = CONFIG.rag_enable_reranking
62+
rag_config.enable_question_refinement = False
63+
64+
_engine = UniversalRAGEngine(
65+
config=rag_config,
66+
db_config=db_config,
67+
)
68+
69+
try:
70+
await _engine.start()
71+
logger.info("UniversalRAGEngine initialized successfully")
72+
except Exception as e:
73+
logger.error("Failed to initialize RAG engine", error=str(e))
74+
_engine = None
75+
raise
7576

7677
return _engine
7778

@@ -80,11 +81,13 @@ async def shutdown_engine() -> None:
8081
"""Shutdown the RAG engine and cleanup resources."""
8182
global _engine
8283

83-
if _engine is not None:
84-
logger.info("Shutting down UniversalRAGEngine")
85-
# UniversalRAGEngine doesn't have a stop() method
86-
# Just set to None to allow garbage collection
87-
_engine = None
84+
async with _engine_lock:
85+
if _engine is not None:
86+
logger.info("Shutting down UniversalRAGEngine")
87+
try:
88+
await _engine.close()
89+
finally:
90+
_engine = None
8891

8992

9093
async def health_check() -> None:
@@ -96,27 +99,23 @@ async def health_check() -> None:
9699
logger.info("Performing startup health check...")
97100

98101
try:
99-
# Initialize engine and connect to database
100102
engine = await get_engine()
101103
logger.info("Database connection established")
102104

103-
# Perform a simple test query to verify the system works
104-
test_query = "test"
105105
result = await engine.search_documents(
106-
query=test_query,
106+
query="test",
107107
max_chunks=1,
108108
similarity_threshold=0.0,
109109
enable_reranking=False,
110110
)
111111

112-
# Verify we got a valid response
113112
if result is None:
114113
msg = "Health check query returned None"
115114
raise Exception(msg)
116115

117-
logger.info("Test query successful (returned %d documents)", len(result.documents))
116+
logger.info("Test query successful", document_count=len(result.documents))
118117
logger.info("Health check passed - system is ready")
119118

120119
except Exception as e:
121-
logger.error("Health check failed: %s", e, exc_info=True)
120+
logger.error("Health check failed", error=str(e), exc_info=True)
122121
raise Exception(f"Startup health check failed - cannot connect to database or perform queries: {e}") from e

src/docs2db_mcp/server.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,20 @@
11
"""FastMCP server instance and initialization."""
22

3-
import logging
43
import sys
54

65
from collections.abc import AsyncIterator
76
from contextlib import asynccontextmanager
87

8+
import structlog
9+
910
from fastmcp import FastMCP
1011

1112
from docs2db_mcp.config import CONFIG
1213
from docs2db_mcp.engine import health_check
1314
from docs2db_mcp.engine import shutdown_engine
1415

1516

16-
logger = logging.getLogger(__name__)
17+
logger = structlog.get_logger(__name__)
1718

1819

1920
@asynccontextmanager

src/docs2db_mcp/tools/search_documents.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,16 @@
11
"""search_documents tool implementation."""
22

3-
import logging
4-
53
from typing import Any
64

5+
import structlog
6+
77
from mcp.types import ToolAnnotations
88

99
from docs2db_mcp.engine import get_engine
1010
from docs2db_mcp.server import mcp
1111

1212

13-
logger = logging.getLogger(__name__)
13+
logger = structlog.get_logger(__name__)
1414

1515

1616
@mcp.tool(
@@ -46,7 +46,7 @@ async def search_documents(
4646
- query_used: The original query (refinement disabled)
4747
- num_results: Number of results returned
4848
"""
49-
logger.info(f"Searching for: {query!r} (max_chunks={max_chunks})")
49+
logger.info("Searching", query_length=len(query), max_chunks=max_chunks)
5050

5151
try:
5252
engine = await get_engine()
@@ -73,7 +73,7 @@ async def search_documents(
7373
for doc in result.documents
7474
]
7575

76-
logger.info(f"Found {len(chunks)} results")
76+
logger.info("Search complete", num_results=len(chunks))
7777

7878
return {
7979
"chunks": chunks,
@@ -82,7 +82,7 @@ async def search_documents(
8282
}
8383

8484
except Exception as e:
85-
logger.error(f"Search failed: {e}", exc_info=True)
85+
logger.error("Search failed", error=str(e), exc_info=True)
8686
return {
8787
"chunks": [],
8888
"query_used": query,

0 commit comments

Comments
 (0)