1- from dataclasses import dataclass
2- from typing import AsyncIterator , Type , Dict
3- from mcp .server .fastmcp import FastMCP
41import os
5- from contextlib import asynccontextmanager
62from abc import ABC , abstractmethod
7- from PyPDF2 import PdfReader as PyPdfReader
3+ from collections .abc import AsyncIterator
4+ from contextlib import asynccontextmanager
5+ from dataclasses import dataclass
6+ from typing import override
7+
88from docx import Document as DocxDocument
9+ from mcp .server .fastmcp import FastMCP
910from openpyxl import load_workbook
11+ from PyPDF2 import PdfReader as PyPdfReader
1012
# Directory where documents are stored. Read once at import time from the
# DOCUMENT_DIRECTORY environment variable; falls back to "./documents".
DOCUMENT_DIRECTORY = os.getenv("DOCUMENT_DIRECTORY", "./documents")
1315
16+
@dataclass
class AppContext:
    """Application context for lifecycle management."""

    # Directory the documents are read from.
    # NOTE(review): presumably the same value as DOCUMENT_DIRECTORY -- confirm
    # where the lifespan handler constructs this context.
    document_directory: str
1822
23+
# Initialize the MCP server (lifespan added below)
mcp = FastMCP("Document Reader")
2126
27+
2228@asynccontextmanager
2329async def app_lifespan (server : FastMCP ) -> AsyncIterator [AppContext ]:
2430 """Manage application lifecycle with type-safe context"""
@@ -30,15 +36,17 @@ async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]:
3036 # Cleanup (if needed)
3137 pass
3238
39+
# Assign lifespan to server.
# NOTE(review): this sets an attribute on an already-constructed server, and
# the type checker does not know a `lifespan` attribute (hence the ignore).
# FastMCP takes `lifespan` as a constructor argument, so this assignment
# presumably does not register the handler -- confirm, and consider passing
# `lifespan=app_lifespan` to FastMCP(...) instead.
mcp.lifespan = app_lifespan  # type: ignore[reportAttributeAccessIssue]
3542
3643
3744# ------------------------- Document Reader Architecture -------------------------
3845
46+
3947class DocumentReader (ABC ):
4048 """Abstract base class for document readers"""
41-
49+
4250 @abstractmethod
4351 def read (self , file_path : str ) -> str :
4452 """Read and extract text from a document"""
@@ -47,29 +55,30 @@ def read(self, file_path: str) -> str:
4755
4856class DocxReader (DocumentReader ):
4957 """DOCX document reader implementation"""
50-
58+
59+ @override
5160 def read (self , file_path : str ) -> str :
5261 """Read and extract text from DOCX file"""
5362 try :
5463 doc = DocxDocument (file_path )
5564 text = []
56-
65+
5766 # Extract paragraph text
5867 for paragraph in doc .paragraphs :
5968 if paragraph .text :
6069 text .append (paragraph .text )
61-
70+
6271 # Extract table content
6372 for table in doc .tables :
6473 for row in table .rows :
6574 row_text = []
6675 for cell in row .cells :
67- cell_text = ' ' .join ([p .text for p in cell .paragraphs ]).strip ()
76+ cell_text = " " .join ([p .text for p in cell .paragraphs ]).strip ()
6877 if cell_text :
6978 row_text .append (cell_text )
7079 if row_text :
71- text .append (' \t ' .join (row_text ))
72-
80+ text .append (" \t " .join (row_text ))
81+
7382 extracted_text = "\n " .join (text )
7483 return extracted_text if extracted_text else "No text found in the DOCX."
7584 except Exception as e :
@@ -78,20 +87,21 @@ def read(self, file_path: str) -> str:
7887
7988class PdfReader (DocumentReader ):
8089 """PDF document reader implementation"""
81-
90+
91+ @override
8292 def read (self , file_path : str ) -> str :
8393 """Read and extract text from PDF file"""
8494 try :
85- with open (file_path , 'rb' ) as file :
95+ with open (file_path , "rb" ) as file :
8696 pdf_reader = PyPdfReader (file )
8797 text = []
88-
98+
8999 # Extract text from each page
90100 for page in pdf_reader .pages :
91101 page_text = page .extract_text ()
92102 if page_text :
93103 text .append (page_text .strip ())
94-
104+
95105 extracted_text = "\n \n " .join (text )
96106 return extracted_text if extracted_text else "No text found in the PDF."
97107 except Exception as e :
@@ -100,74 +110,78 @@ def read(self, file_path: str) -> str:
100110
class TxtReader(DocumentReader):
    """TXT document reader implementation"""

    @override
    def read(self, file_path: str) -> str:
        """Read and extract text from TXT file with encoding handling.

        Each candidate encoding is tried in priority order and the first one
        that decodes the whole file wins.

        :param file_path: Path of the text file to read
        :return: The decoded content; "No text found in the TXT file." when
            the file decodes to an empty string; or an "Error reading TXT: ..."
            message when the file cannot be opened or decoded
        """
        # Supported encodings in priority order
        encodings = ["utf-8", "gbk", "gb2312", "ansi", "latin-1"]

        for encoding in encodings:
            try:
                with open(file_path, "r", encoding=encoding) as f:
                    text = f.read()
            except (UnicodeDecodeError, LookupError):
                # UnicodeDecodeError: wrong guess, move on to the next one.
                # LookupError: the codec name is not registered on this
                # platform ("ansi" is a Windows-only alias); skip it so the
                # remaining fallbacks are still tried instead of aborting.
                continue
            except Exception as e:
                return f"Error reading TXT: {str(e)}"
            return text if text else "No text found in the TXT file."

        return "Error reading TXT: Could not decode file with any supported encoding."
120131
121132
class ExcelReader(DocumentReader):
    """Excel document reader implementation

    NOTE(review): the reader factory in this file also maps ".xls" to this
    class; openpyxl documents support for the OOXML formats only, so legacy
    .xls files presumably come back as an "Error reading Excel: ..." string --
    confirm.
    """

    @override
    def read(self, file_path: str) -> str:
        """Read and extract text from Excel file.

        Every sheet is emitted as a "=== Sheet: <name> ===" header followed by
        one tab-separated line per non-empty row and a blank separator line.

        :param file_path: Path of the workbook to read
        :return: The extracted text, "No text found in the Excel file." when
            nothing was extracted, or an "Error reading Excel: ..." message
            when loading or reading the workbook fails
        """
        try:
            wb = load_workbook(file_path, read_only=True)
            try:
                text = []

                # Extract text from all sheets
                for sheet_name in wb.sheetnames:
                    sheet = wb[sheet_name]
                    text.append(f"=== Sheet: {sheet_name} ===")

                    # Extract cell content; empty cells become "" so columns
                    # stay aligned within a row.
                    for row in sheet.iter_rows(values_only=True):
                        row_text = [str(cell) if cell is not None else "" for cell in row]
                        if any(row_text):  # Only add non-empty rows
                            text.append("\t".join(row_text))

                    text.append("")  # Add blank line between sheets
            finally:
                # A read-only workbook keeps the file open until closed, so
                # release it even when iterating a sheet raises.
                wb.close()

            extracted_text = "\n".join(text)
            return (
                extracted_text if extracted_text else "No text found in the Excel file."
            )
        except Exception as e:
            return f"Error reading Excel: {str(e)}"
149163
150164
151165class DocumentReaderFactory :
152166 """Factory for creating document readers based on file extension"""
153-
167+
154168 # Mapping of file extensions to reader classes
155- _readers : Dict [str , Type [DocumentReader ]] = {
156- ' .txt' : TxtReader ,
157- ' .docx' : DocxReader ,
158- ' .pdf' : PdfReader ,
159- ' .xlsx' : ExcelReader ,
160- ' .xls' : ExcelReader
169+ _readers : dict [str , type [DocumentReader ]] = {
170+ " .txt" : TxtReader ,
171+ " .docx" : DocxReader ,
172+ " .pdf" : PdfReader ,
173+ " .xlsx" : ExcelReader ,
174+ " .xls" : ExcelReader ,
161175 }
162-
176+
163177 @classmethod
164178 def get_reader (cls , file_path : str ) -> DocumentReader :
165179 """Get appropriate reader for the given file"""
166180 _ , ext = os .path .splitext (file_path .lower ())
167181 if ext not in cls ._readers :
168182 raise ValueError (f"Unsupported document type: { ext } " )
169183 return cls ._readers [ext ]()
170-
184+
171185 @classmethod
172186 def is_supported (cls , file_path : str ) -> bool :
173187 """Check if the file type is supported"""
@@ -177,36 +191,34 @@ def is_supported(cls, file_path: str) -> bool:
177191
178192# ------------------------- Tool Functions -------------------------
179193
def _get_document_path(ctx: object, filename: str) -> str:
    """Get full document path from context or environment.

    :param ctx: Object that may carry a ``document_directory`` attribute
    :param filename: Name of the document file, joined onto the directory
    :return: ``filename`` joined to the context's ``document_directory``, or
        to DOCUMENT_DIRECTORY when the context has no such attribute
    """
    # NOTE(review): filename is joined without validation, so a value with
    # ".." segments or an absolute path resolves outside the document
    # directory (os.path.join discards doc_dir for an absolute filename).
    # Consider rejecting such names in the caller, which can report the error.
    try:
        doc_dir = getattr(ctx, "document_directory", DOCUMENT_DIRECTORY)
    except Exception:
        # Best effort: any failure while reading the attribute falls back to
        # the module-level default.
        doc_dir = DOCUMENT_DIRECTORY
    return os.path.join(doc_dir, filename)
187202
188203
189-
190-
191-
192204@mcp .tool ()
193- def read_document (ctx , filename : str ) -> str :
205+ def read_document (ctx : object , filename : str ) -> str :
194206 """
195207 Reads and extracts text from a specified document file.
196208 Supports multiple document types: TXT, DOCX, PDF, Excel (XLSX, XLS).
197-
209+
198210 :param ctx: FastMCP context
199211 :param filename: Name of the document file to read
200212 :return: Extracted text from the document
201213 """
202214 doc_path = _get_document_path (ctx , filename )
203-
215+
204216 if not os .path .exists (doc_path ):
205217 return f"Error: File '{ filename } ' not found at { doc_path } ."
206-
218+
207219 if not DocumentReaderFactory .is_supported (doc_path ):
208220 return f"Error: Unsupported document type for file '{ filename } '."
209-
221+
210222 try :
211223 reader = DocumentReaderFactory .get_reader (doc_path )
212224 return reader .read (doc_path )
@@ -220,4 +232,4 @@ def main():
220232
221233
# Call main() only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
0 commit comments