11import os
22from abc import ABC , abstractmethod
3- from collections .abc import AsyncIterator
4- from contextlib import asynccontextmanager
5- from dataclasses import dataclass
3+ from pathlib import Path
64
75from docx import Document as DocxDocument
86from mcp .server .fastmcp import FastMCP
97from openpyxl import load_workbook
108from pypdf import PdfReader as PyPdfReader
119from typing_extensions import override
1210
# Location of the document store; the DOCUMENT_DIRECTORY environment
# variable overrides the "./documents" default.
DOCUMENT_DIRECTORY = os.environ.get("DOCUMENT_DIRECTORY", "./documents")
15-
16-
@dataclass
class AppContext:
    """State object produced by the server lifespan for lifecycle management."""

    # Directory in which the server's documents are stored.
    document_directory: str
22-
23-
24- # Initialize the MCP server (lifespan added below)
2511mcp = FastMCP ("Document Reader" )
2612
2713
@asynccontextmanager
async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]:
    """Create the document directory, then yield the application context.

    :param server: The FastMCP server instance (not used by this function)
    :return: Async context manager yielding an ``AppContext`` that carries
        the configured document directory
    """
    # Startup: make sure the document directory exists before it is used.
    os.makedirs(DOCUMENT_DIRECTORY, exist_ok=True)

    # No teardown work is required once the context exits.
    yield AppContext(document_directory=DOCUMENT_DIRECTORY)
38-
39-
40- # Assign lifespan to server
41- mcp .lifespan = app_lifespan # type: ignore[reportAttributeAccessIssue]
42-
43-
44- # ------------------------- Document Reader Architecture -------------------------
45-
46-
4714class DocumentReader (ABC ):
4815 """Abstract base class for document readers"""
4916
@@ -63,12 +30,10 @@ def read(self, file_path: str) -> str:
6330 doc = DocxDocument (file_path )
6431 text = []
6532
66- # Extract paragraph text
6733 for paragraph in doc .paragraphs :
6834 if paragraph .text :
6935 text .append (paragraph .text )
7036
71- # Extract table content
7237 for table in doc .tables :
7338 for row in table .rows :
7439 row_text = []
@@ -96,7 +61,6 @@ def read(self, file_path: str) -> str:
9661 pdf_reader = PyPdfReader (file )
9762 text = []
9863
99- # Extract text from each page
10064 for page in pdf_reader .pages :
10165 page_text = page .extract_text ()
10266 if page_text :
@@ -114,7 +78,6 @@ class TxtReader(DocumentReader):
11478 @override
11579 def read (self , file_path : str ) -> str :
11680 """Read and extract text from TXT file with encoding handling"""
117- # Supported encodings in priority order
11881 encodings = ["utf-8" , "gbk" , "gb2312" , "latin-1" ]
11982
12083 for encoding in encodings :
@@ -140,18 +103,16 @@ def read(self, file_path: str) -> str:
140103 wb = load_workbook (file_path , read_only = True )
141104 text = []
142105
143- # Extract text from all sheets
144106 for sheet_name in wb .sheetnames :
145107 sheet = wb [sheet_name ]
146108 text .append (f"=== Sheet: { sheet_name } ===" )
147109
148- # Extract cell content
149110 for row in sheet .iter_rows (values_only = True ):
150111 row_text = [str (cell ) if cell is not None else "" for cell in row ]
151- if any (row_text ): # Only add non-empty rows
112+ if any (row_text ):
152113 text .append ("\t " .join (row_text ))
153114
154- text .append ("" ) # Add blank line between sheets
115+ text .append ("" )
155116
156117 extracted_text = "\n " .join (text )
157118 wb .close ()
@@ -165,7 +126,6 @@ def read(self, file_path: str) -> str:
165126class DocumentReaderFactory :
166127 """Factory for creating document readers based on file extension"""
167128
168- # Mapping of file extensions to reader classes
169129 _readers : dict [str , type [DocumentReader ]] = {
170130 ".txt" : TxtReader ,
171131 ".docx" : DocxReader ,
@@ -189,69 +149,31 @@ def is_supported(cls, file_path: str) -> bool:
189149 return ext in cls ._readers
190150
191151
192- # ------------------------- Tool Functions -------------------------
193-
194-
def _get_document_path(ctx: object, filename: str) -> str:
    """Build the path of a document, guarding against path traversal.

    Only the final component of ``filename`` is used, and the
    symlink-resolved result must still lie inside the document directory.

    Args:
        ctx: Context object; its ``document_directory`` attribute is used
            when present, otherwise the module-level ``DOCUMENT_DIRECTORY``.
        filename: Name of the requested file.

    Returns:
        str: The document directory joined with the base name of ``filename``.

    Raises:
        ValueError: If the resolved path lies outside the document directory.
    """
    base_dir = getattr(ctx, "document_directory", DOCUMENT_DIRECTORY)

    # Keep only the last path component so directory parts of the request
    # (including "../" segments) are discarded.
    candidate = os.path.join(base_dir, os.path.basename(filename))

    # Compare fully resolved paths so symlinks cannot escape the directory.
    resolved_candidate = os.path.realpath(candidate)
    resolved_base = os.path.realpath(base_dir)

    inside_base = resolved_candidate.startswith(resolved_base + os.sep)
    if inside_base or resolved_candidate == resolved_base:
        return candidate

    raise ValueError("Access denied: path outside document directory")
224-
225-
@mcp.tool()
def read_document(filename: str) -> str:
    """
    Reads and extracts text from a specified document file.
    Supports multiple document types: TXT, DOCX, PDF, Excel (XLSX, XLS).

    No directory restriction is applied: any path the server process can
    read is accepted, so expose this tool only to trusted clients.

    :param filename: Path to the document file to read
        (supports absolute or relative paths)
    :return: Extracted text from the document, or a message starting with
        "Error" when the file is missing, is not a regular file, has an
        unsupported type, or cannot be read
    """
    file_path = Path(filename)

    if not file_path.exists():
        return f"Error: File '{filename}' not found."

    # exists() is also true for directories; reject them here so they do not
    # reach a reader and come back as a raw OS error message.
    if not file_path.is_file():
        return f"Error: '{filename}' is not a file."

    # The reader API works on string paths; convert once.
    path_str = str(file_path)

    if not DocumentReaderFactory.is_supported(path_str):
        return f"Error: Unsupported document type for file '{filename}'."

    try:
        reader = DocumentReaderFactory.get_reader(path_str)
        return reader.read(path_str)
    except Exception as e:
        # Tool boundary: report the failure as text instead of raising.
        return f"Error reading document: {e}"
252175
253176
254- # Run the MCP server
def main() -> None:
    """Entry point: run the MCP server."""
    mcp.run()
257179
0 commit comments