Skip to content

Commit f3a2975

Browse files
committed
Merge PR #546 from hrshjswniii/bugfix/Persistent-Storage-Leak
2 parents a751354 + e26787a commit f3a2975

24 files changed

Lines changed: 1869 additions & 689 deletions

backend/app/database.py

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -189,13 +189,7 @@ def _migrate_schema():
189189
("documents", "drive_file_id", "ALTER TABLE documents ADD COLUMN drive_file_id VARCHAR(255)"),
190190
("documents", "drive_folder_id", "ALTER TABLE documents ADD COLUMN drive_folder_id VARCHAR(255)"),
191191
("documents", "drive_synced_at", "ALTER TABLE documents ADD COLUMN drive_synced_at TIMESTAMP"),
192-
("documents", "processing_progress", "ALTER TABLE documents ADD COLUMN processing_progress INTEGER DEFAULT 0"),
193-
("documents", "processing_stage", "ALTER TABLE documents ADD COLUMN processing_stage VARCHAR(20) DEFAULT 'queued'"),
194-
("documents", "retry_count", "ALTER TABLE documents ADD COLUMN retry_count INTEGER DEFAULT 0"),
195-
("documents", "last_error_traceback", "ALTER TABLE documents ADD COLUMN last_error_traceback TEXT"),
196-
("documents", "processing_started_at", "ALTER TABLE documents ADD COLUMN processing_started_at TIMESTAMP"),
197-
("documents", "completed_at", "ALTER TABLE documents ADD COLUMN completed_at TIMESTAMP"),
198-
("documents", "extracted_urls", "ALTER TABLE documents ADD COLUMN extracted_urls TEXT"),
192+
("documents", "workspace_id", "ALTER TABLE documents ADD COLUMN workspace_id VARCHAR(36)"),
199193
]
200194
for table, column, ddl in docs_migrations:
201195
if column not in existing_docs_columns:

backend/app/main.py

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,70 @@
4040
settings = get_settings()
4141

4242

43+
def _purge_expired_documents(retention_days: int = 30) -> int:
    """Purge documents that have been idle for more than ``retention_days`` days.

    A document is considered expired when ``last_accessed_at`` is older than
    the cutoff, or when it was never accessed and ``uploaded_at`` is older
    than the cutoff. For each expired document the uploaded file, its vector
    chunks and its knowledge graph are removed on a best-effort basis, then
    the database row is deleted and committed.

    This function is synchronous on purpose: it is meant to be run in a
    worker thread so the blocking DB / filesystem calls do not stall the
    event loop.

    Args:
        retention_days: Inactivity window, in days, after which a document
            is purged.

    Returns:
        The number of documents whose database record was actually deleted.
    """
    # Imports are kept local, as in the original job (presumably to avoid
    # import cycles at module load time -- confirm before hoisting).
    from datetime import datetime, timedelta, timezone

    from sqlalchemy import and_, or_

    from app.database import SessionLocal
    from app.models import Document
    from app.rag.vectorstore import delete_document_chunks

    cutoff = datetime.now(timezone.utc) - timedelta(days=retention_days)
    purged = 0

    db = SessionLocal()
    try:
        expired_docs = db.query(Document).filter(
            or_(
                Document.last_accessed_at < cutoff,
                and_(
                    Document.last_accessed_at.is_(None),
                    Document.uploaded_at < cutoff,
                ),
            )
        ).all()

        for doc in expired_docs:
            # Snapshot the identifiers: after a commit/rollback the ORM
            # instance is expired and must not be needed for logging.
            doc_id = doc.id
            user_id = doc.user_id
            logger.info(f"Auto-cleanup: Purging document {doc.id} ('{doc.original_name}') due to inactivity since {doc.last_accessed_at or doc.uploaded_at}")

            # Delete physical file. Building the path lives inside the try
            # so a malformed row cannot abort the whole batch; str() guards
            # against user_id being a non-string id object (assumption:
            # upload directories are named after str(user_id) -- confirm).
            filepath = None
            try:
                filepath = os.path.join(settings.UPLOAD_DIR, str(user_id), doc.filename)
                os.remove(filepath)
            except FileNotFoundError:
                # Already gone: nothing to clean up.
                pass
            except Exception as e:
                logger.warning(f"Auto-cleanup: Failed to delete physical file {filepath}: {e}")

            # Delete vectors
            try:
                delete_document_chunks(document_id=doc_id, user_id=user_id)
            except Exception as e:
                logger.warning(f"Auto-cleanup: Error deleting vectors for document {doc_id}: {e}")

            # Delete knowledge graph (import stays inside the try so a
            # missing/broken graph module only produces a warning).
            try:
                from app.rag.graph_builder import delete_graph
                delete_graph(user_id=user_id, document_id=doc_id)
            except Exception as e:
                logger.warning(f"Auto-cleanup: Error deleting graph for document {doc_id}: {e}")

            # Delete database record. Commit per document so that a failure
            # on one row does not leave every other row pointing at files
            # that were already removed.
            try:
                db.delete(doc)
                db.commit()
            except Exception as exc:
                db.rollback()
                logger.error(f"Auto-cleanup: Failed to delete database record for document {doc_id}: {exc}", exc_info=True)
                continue
            purged += 1

        if purged:
            logger.info(f"Auto-cleanup: Purged {purged} documents.")
    except Exception as exc:
        logger.error(f"Auto-cleanup job encountered error: {exc}", exc_info=True)
    finally:
        db.close()

    return purged


async def document_cleanup_job(retention_days: int = 30, interval_seconds: float = 86400):
    """Background loop to periodically purge documents not accessed recently.

    Runs forever: each iteration purges expired documents in a worker thread
    (so blocking I/O never stalls the event loop), logs any failure, then
    sleeps for ``interval_seconds``.

    Args:
        retention_days: Inactivity window in days (default 30).
        interval_seconds: Delay between runs (default 86400, i.e. 24 hours).
    """
    import asyncio

    logger.info("Starting document cleanup background job loop")
    while True:
        try:
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(None, _purge_expired_documents, retention_days)
        except asyncio.CancelledError:
            # Never swallow cancellation, otherwise shutdown would hang.
            raise
        except Exception as e:
            logger.error(f"Error in document cleanup background loop: {e}", exc_info=True)

        # Run every 24 hours (86400 seconds) by default
        await asyncio.sleep(interval_seconds)
105+
106+
43107
@asynccontextmanager
44108
async def lifespan(app: FastAPI):
45109
"""Application startup/shutdown lifecycle."""

backend/app/models.py

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ class User(Base):
171171
cascade="all, delete-orphan",
172172
)
173173
workspace_memberships = relationship(
174-
"WorkspaceMember",
174+
"WorkspaceMembership",
175175
back_populates="user",
176176
cascade="all, delete-orphan",
177177
)
@@ -197,6 +197,32 @@ class ApiKey(Base):
197197
user = relationship("User", back_populates="api_keys")
198198

199199

200+
class Workspace(Base):
    """ORM model for a workspace, stored in the ``workspaces`` table."""

    __tablename__ = "workspaces"

    # 36-character string primary key, filled in by generate_uuid on insert.
    id = Column(
        String(36),
        primary_key=True,
        default=generate_uuid,
    )
    name = Column(String(100), nullable=False)
    # Defaults to the current UTC time when the row is created.
    created_at = Column(
        DateTime,
        default=lambda: datetime.now(timezone.utc),
    )

    # Relationships
    # Membership rows are cascaded ("all, delete-orphan") with the workspace.
    memberships = relationship(
        "WorkspaceMembership",
        back_populates="workspace",
        cascade="all, delete-orphan",
    )
    # Documents only reference the workspace; no cascade is configured here.
    documents = relationship(
        "Document",
        back_populates="workspace",
    )
210+
211+
212+
class WorkspaceMembership(Base):
    """ORM model linking a user to a workspace with a role.

    Rows live in the ``workspace_memberships`` table.
    """

    __tablename__ = "workspace_memberships"

    id = Column(
        String(36),
        primary_key=True,
        default=generate_uuid,
    )
    # Foreign key to workspaces.id; indexed and required.
    workspace_id = Column(
        String(36),
        ForeignKey("workspaces.id"),
        nullable=False,
        index=True,
    )
    # Foreign key to users.id; indexed and required.
    user_id = Column(
        GUID,
        ForeignKey("users.id"),
        nullable=False,
        index=True,
    )
    # "admin" | "member"
    role = Column(
        String(20),
        default="member",
        nullable=False,
    )
    # Defaults to the current UTC time when the row is created.
    joined_at = Column(
        DateTime,
        default=lambda: datetime.now(timezone.utc),
    )

    # Relationships
    workspace = relationship(
        "Workspace",
        back_populates="memberships",
    )
    user = relationship(
        "User",
        back_populates="workspace_memberships",
    )
224+
225+
200226
class WorkspaceInvitation(Base):
201227
__tablename__ = "workspace_invitations"
202228

@@ -348,6 +374,7 @@ class Document(Base):
348374
drive_synced_at = Column(DateTime, nullable=True)
349375
is_deleted = Column(Boolean, default=False, nullable=False, index=True)
350376
deleted_at = Column(DateTime, nullable=True)
377+
workspace_id = Column(String(36), ForeignKey("workspaces.id"), nullable=True, index=True)
351378
processing_progress = Column(Integer, default=0)
352379
processing_stage = Column(String(20), default="queued")
353380
retry_count = Column(Integer, default=0)
@@ -358,6 +385,7 @@ class Document(Base):
358385

359386
# Relationships
360387
owner = relationship("User", back_populates="documents")
388+
workspace = relationship("Workspace", back_populates="documents")
361389
messages = relationship(
362390
"ChatMessage",
363391
back_populates="document",

backend/app/rag/agent.py

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,11 +65,18 @@ def get_agent_executor(
6565
hf_token: Optional[str] = None,
6666
top_k: Optional[int] = None,
6767
chat_history: Optional[List[Dict[str, str]]] = None,
68+
workspace: Optional[str] = None,
6869
):
6970
"""Initialize the LangChain ReAct agent executor."""
7071

7172
# Initialize tools
72-
pdf_tool = PDFSearchTool(user_id=user_id, document_id=document_id, document_ids=document_ids, top_k=top_k)
73+
pdf_tool = PDFSearchTool(
74+
user_id=user_id,
75+
document_id=document_id,
76+
document_ids=document_ids,
77+
workspace=workspace,
78+
top_k=top_k,
79+
)
7380
tools = [pdf_tool, MathTool(), WebSearchTool()]
7481

7582
# Initialize LLM
@@ -140,6 +147,7 @@ def generate_answer(
140147
hf_token: Optional[str] = None,
141148
top_k: Optional[int] = None,
142149
chat_history: Optional[List[Dict[str, str]]] = None,
150+
workspace: Optional[str] = None,
143151
) -> Dict[str, Any]:
144152
"""
145153
Agentic generation: retrieve via tools → reason → generate answer.
@@ -165,7 +173,15 @@ def generate_answer(
165173

166174
# ── Run Agent ────────────────────────────────────
167175
try:
168-
executor, pdf_tool, formatted_history = get_agent_executor(user_id, document_id, document_ids, hf_token, top_k, chat_history)
176+
executor, pdf_tool, formatted_history = get_agent_executor(
177+
user_id=user_id,
178+
document_id=document_id,
179+
document_ids=document_ids,
180+
hf_token=hf_token,
181+
top_k=top_k,
182+
chat_history=chat_history,
183+
workspace=workspace,
184+
)
169185
result = executor.invoke({"input": question, "chat_history": formatted_history})
170186

171187
raw_answer = result.get("output", "")
@@ -214,6 +230,7 @@ def generate_answer_stream(
214230
hf_token: Optional[str] = None,
215231
top_k: Optional[int] = None,
216232
chat_history: Optional[List[Dict[str, str]]] = None,
233+
workspace: Optional[str] = None,
217234
) -> Generator[str, None, None]:
218235
"""
219236
Streaming Agentic pipeline.
@@ -239,7 +256,15 @@ def generate_answer_stream(
239256

240257
# ── Run Agent ────────────────────────────────────
241258
try:
242-
executor, pdf_tool, formatted_history = get_agent_executor(user_id, document_id, document_ids, hf_token, top_k, chat_history)
259+
executor, pdf_tool, formatted_history = get_agent_executor(
260+
user_id=user_id,
261+
document_id=document_id,
262+
document_ids=document_ids,
263+
hf_token=hf_token,
264+
top_k=top_k,
265+
chat_history=chat_history,
266+
workspace=workspace,
267+
)
243268

244269
sources_sent = False
245270

backend/app/rag/bm25.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -169,8 +169,13 @@ def query_bm25(
169169

170170
user_dir = get_bm25_dir(user_id)
171171
all_results = []
172-
173-
for path in glob.glob(os.path.join(user_dir, "*.json")):
172+
173+
for path in glob.glob(os.path.join(user_dir, "*.pkl")):
174+
# Filter by document_ids if provided
175+
if document_ids is not None:
176+
doc_id = os.path.basename(path).rsplit(".", 1)[0]
177+
if doc_id not in document_ids:
178+
continue
174179
results = _query_single_index(path, tokenized_query, top_k)
175180
all_results.extend(results)
176181

backend/app/rag/graph_retriever.py

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,21 @@
1818
settings = get_settings()
1919

2020

21-
def _candidate_graphs(user_id: str, document_id: Optional[str]) -> Iterable[nx.Graph]:
21+
def _candidate_graphs(
22+
user_id: str,
23+
document_id: Optional[str],
24+
document_ids: Optional[List[str]] = None,
25+
) -> Iterable[nx.Graph]:
2226
if document_id:
2327
graph = load_graph(user_id, document_id)
2428
return [graph] if graph is not None else []
29+
elif document_ids:
30+
graphs = []
31+
for doc_id in document_ids:
32+
graph = load_graph(user_id, doc_id)
33+
if graph is not None:
34+
graphs.append(graph)
35+
return graphs
2536

2637
graphs = []
2738
for path in iter_graph_paths(user_id):
@@ -67,12 +78,17 @@ def get_entity_context(
6778
query: str,
6879
user_id: str,
6980
document_id: Optional[str] = None,
81+
document_ids: Optional[List[str]] = None,
7082
) -> str:
7183
"""Return compact graph relationship context relevant to the query."""
7284
relationships: Dict[Tuple[str, str], Dict[str, object]] = {}
7385

7486
try:
75-
graphs = _candidate_graphs(user_id=user_id, document_id=document_id)
87+
graphs = _candidate_graphs(
88+
user_id=user_id,
89+
document_id=document_id,
90+
document_ids=document_ids,
91+
)
7692
for graph in graphs:
7793
matched_nodes = _match_query_nodes(graph, query)
7894

0 commit comments

Comments
 (0)