Skip to content

Commit 5d015da

Browse files
authored
Merge pull request #314 from aaronsb/feature/fuse-ontology-lifecycle
feat(fuse): write-back cache, ingest directory, and document deletion
2 parents 4508936 + e4db81b commit 5d015da

17 files changed

Lines changed: 2725 additions & 78 deletions

File tree

Makefile

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@
66
.PHONY: help test test-unit test-api test-program test-verbose test-list lint lint-queries \
77
coverage coverage-verbose coverage-staleness \
88
todos todos-verbose todos-age slopscan \
9-
docs docs-cli docs-mcp docs-site \
9+
build-cli build-fuse install-cli install-fuse test-cli test-fuse \
10+
docs docs-cli docs-mcp docs-fuse docs-site \
1011
publish publish-status \
1112
diagnose status logs rebuild-api rebuild-web rebuild-all \
1213
build-operator push-operator \
@@ -81,16 +82,44 @@ todos-age: ## Include git blame age per marker
8182
slopscan: ## SLOPSCAN 9000: detect agent antipatterns
8283
@python3 $(SCRIPTS)/lint/todo_inventory.py --slop -v
8384

85+
##@ User Tools — Build
86+
87+
build-cli: ## Build CLI + MCP (TypeScript)
88+
@cd cli && npm install && npm run build
89+
90+
build-fuse: ## Build FUSE driver (Python wheel)
91+
@cd fuse && python3 -m build
92+
93+
##@ User Tools — Install
94+
95+
install-cli: build-cli ## Install kg CLI + MCP server to ~/.local
96+
@cd cli && npm install -g --prefix "$(HOME)/.local"
97+
98+
install-fuse: ## Install kg-fuse via pipx
99+
@command -v pipx >/dev/null || { echo "Error: pipx not found. Install with: python3 -m pip install --user pipx"; exit 1; }
100+
@cd fuse && pipx install --force .
101+
102+
##@ User Tools — Test
103+
104+
test-cli: ## Run CLI test suite
105+
@cd cli && npm test
106+
107+
test-fuse: ## Run FUSE driver test suite
108+
@cd fuse && { [ -x .venv/bin/python ] && .venv/bin/python -m pytest tests/ -x -q -o "addopts=" || ~/.local/share/pipx/venvs/kg-fuse/bin/python -m pytest tests/ -x -q -o "addopts=" || python3 -m pytest tests/ -x -q -o "addopts="; }
109+
84110
##@ Documentation
85111

86-
docs: docs-cli docs-mcp ## Generate all reference docs (CLI + MCP)
112+
docs: docs-cli docs-mcp docs-fuse ## Generate all reference docs
87113

88114
docs-cli: ## Generate CLI command reference
89115
@cd cli && npm run docs:cli
90116

91117
docs-mcp: ## Generate MCP server tool reference
92118
@cd cli && npm run docs:mcp
93119

120+
docs-fuse: ## Generate FUSE driver API reference (markdown)
121+
@python3 fuse/scripts/generate-fuse-docs.py
122+
94123
docs-site: ## Build documentation site (MkDocs)
95124
@./site/scripts/docs build
96125

api/app/lib/age_client/base.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,31 @@ def _parse_agtype(self, agtype_value: Any) -> Any:
299299
# If not JSON, return as-is (might be a simple value)
300300
return agtype_value
301301

302+
def refresh_epoch(self):
303+
"""Recompute graph_change_counter after a mutation.
304+
305+
Must be called after any operation that changes graph object counts
306+
(create/delete concepts, sources, edges, ontologies, documents).
307+
The counter is a snapshot sum of all object counts — FUSE and other
308+
clients poll it to detect staleness and invalidate caches.
309+
310+
This is a graph integrity requirement: every mutation path must call
311+
this, regardless of whether the caller is FUSE, CLI, curl, or the
312+
web UI. Without it, caches serve stale data until the next periodic
313+
refresh.
314+
"""
315+
conn = None
316+
try:
317+
conn = self.pool.getconn()
318+
with conn.cursor() as cur:
319+
cur.execute("SELECT refresh_graph_metrics()")
320+
conn.commit()
321+
except Exception as e:
322+
logger.warning(f"Failed to refresh graph epoch: {e}")
323+
finally:
324+
if conn:
325+
self.pool.putconn(conn)
326+
302327
def close(self):
303328
"""Close all database connections."""
304329
if hasattr(self, 'pool'):

api/app/routes/concepts.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,9 @@ async def create_concept(
8181
outcome="success"
8282
)
8383

84+
# Refresh graph epoch so caches (FUSE, etc.) detect the change
85+
age_client.refresh_epoch()
86+
8487
return result
8588
except ValueError as e:
8689
raise HTTPException(
@@ -253,6 +256,10 @@ async def delete_concept(
253256

254257
try:
255258
await service.delete_concept(concept_id=concept_id, cascade=cascade)
259+
260+
# Refresh graph epoch so caches (FUSE, etc.) detect the change
261+
age_client.refresh_epoch()
262+
256263
return None # 204 No Content
257264
except ValueError as e:
258265
if "not found" in str(e).lower():

api/app/routes/documents.py

Lines changed: 177 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
from ..lib.age_client import AGEClient
2121
from ..lib.garage import get_source_storage, get_image_storage
2222
from ..lib.similarity_calculator import cosine_similarity
23-
from ..dependencies.auth import get_current_active_user
23+
from ..dependencies.auth import get_current_active_user, CurrentUser, require_permission
2424
from ..models.auth import UserInDB
2525

2626
router = APIRouter(prefix="/documents", tags=["documents"])
@@ -107,6 +107,14 @@ class DocumentListResponse(BaseModel):
107107
offset: int
108108

109109

110+
class DocumentDeleteResponse(BaseModel):
111+
"""Response from document deletion."""
112+
document_id: str
113+
deleted: bool
114+
sources_deleted: int
115+
orphaned_concepts_deleted: int
116+
117+
110118
class DocumentConceptItem(BaseModel):
111119
"""Concept extracted from a document."""
112120
concept_id: str
@@ -999,3 +1007,171 @@ async def get_document_concepts_bulk(
9991007
raise HTTPException(status_code=500, detail=f"Failed to get bulk concepts: {str(e)}")
10001008
finally:
10011009
client.close()
1010+
1011+
1012+
# ============================================================================
1013+
# Document Deletion
1014+
# ============================================================================
1015+
1016+
@router.delete("/{document_id}", response_model=DocumentDeleteResponse)
1017+
async def delete_document(
1018+
document_id: str,
1019+
current_user: CurrentUser,
1020+
_: None = Depends(require_permission("sources", "delete")),
1021+
):
1022+
"""Delete a document and cascade-remove orphaned concepts.
1023+
1024+
Deletes all data associated with a single document:
1025+
- Source nodes (chunks) linked via DocumentMeta
1026+
- Instance nodes linked to those sources
1027+
- source_embeddings records
1028+
- Garage storage objects (source documents and images)
1029+
- The DocumentMeta node itself
1030+
- Orphaned Concept nodes (concepts with no remaining sources)
1031+
1032+
Follows the same cascade pattern as DELETE /ontology/{name}
1033+
but scoped to a single document.
1034+
1035+
Authorization: Requires sources:delete permission.
1036+
"""
1037+
from ..services.job_queue import get_job_queue
1038+
1039+
client = AGEClient()
1040+
queue = get_job_queue()
1041+
try:
1042+
# Find the DocumentMeta node and its sources
1043+
doc_meta = client._execute_cypher("""
1044+
MATCH (d:DocumentMeta {document_id: $document_id})
1045+
RETURN d.document_id as document_id,
1046+
d.filename as filename,
1047+
d.ontology as ontology
1048+
""", params={"document_id": document_id}, fetch_one=True)
1049+
1050+
if not doc_meta:
1051+
raise HTTPException(status_code=404, detail=f"Document '{document_id}' not found")
1052+
1053+
ontology_name = doc_meta.get("ontology")
1054+
filename = doc_meta.get("filename", document_id)
1055+
        logger.info(f"Deleting document '{filename}' (id={document_id}) from ontology '{ontology_name}'")
1056+
1057+
# Capture source_ids before deletion
1058+
source_ids_result = client._execute_cypher("""
1059+
MATCH (d:DocumentMeta {document_id: $document_id})-[:HAS_SOURCE]->(s:Source)
1060+
RETURN s.source_id as source_id, s.storage_key as storage_key
1061+
""", params={"document_id": document_id})
1062+
1063+
source_ids = [r["source_id"] for r in (source_ids_result or []) if r.get("source_id")]
1064+
storage_keys = [r["storage_key"] for r in (source_ids_result or []) if r.get("storage_key")]
1065+
1066+
# Clean up Garage storage objects
1067+
try:
1068+
# Delete images (via storage_key on Source nodes)
1069+
if storage_keys:
1070+
image_storage = get_image_storage()
1071+
for key in storage_keys:
1072+
try:
1073+
image_storage.delete(key)
1074+
except Exception as e:
1075+
logger.warning(f"Failed to delete Garage image {key}: {e}")
1076+
1077+
# Delete source documents
1078+
try:
1079+
source_storage = get_source_storage()
1080+
for sid in source_ids:
1081+
try:
1082+
source_storage.delete(sid)
1083+
except Exception as e:
1084+
logger.warning(f"Failed to delete source doc {sid} from Garage: {e}")
1085+
except Exception as e:
1086+
logger.warning(f"Failed to initialize source storage: {e}")
1087+
1088+
except Exception as e:
1089+
logger.warning(f"Failed to initialize Garage for cleanup: {e}")
1090+
1091+
# Delete Instance nodes linked to this document's sources
1092+
client._execute_cypher("""
1093+
MATCH (d:DocumentMeta {document_id: $document_id})-[:HAS_SOURCE]->(s:Source)
1094+
MATCH (i:Instance)-[:FROM_SOURCE]->(s)
1095+
DETACH DELETE i
1096+
""", params={"document_id": document_id})
1097+
1098+
# Delete Source nodes
1099+
result = client._execute_cypher("""
1100+
MATCH (d:DocumentMeta {document_id: $document_id})-[:HAS_SOURCE]->(s:Source)
1101+
DETACH DELETE s
1102+
RETURN count(s) as deleted_count
1103+
""", params={"document_id": document_id}, fetch_one=True)
1104+
1105+
sources_deleted = result["deleted_count"] if result else 0
1106+
1107+
# Delete source_embeddings
1108+
if source_ids:
1109+
conn = None
1110+
try:
1111+
conn = client.pool.getconn()
1112+
with conn.cursor() as cur:
1113+
cur.execute("""
1114+
DELETE FROM kg_api.source_embeddings
1115+
WHERE source_id = ANY(%s)
1116+
""", (source_ids,))
1117+
conn.commit()
1118+
except Exception as e:
1119+
logger.warning(f"Failed to delete source embeddings: {e}")
1120+
finally:
1121+
if conn:
1122+
client.pool.putconn(conn)
1123+
1124+
# Delete the DocumentMeta node
1125+
client._execute_cypher("""
1126+
MATCH (d:DocumentMeta {document_id: $document_id})
1127+
DETACH DELETE d
1128+
""", params={"document_id": document_id})
1129+
1130+
# Clean up orphaned concepts (concepts with no remaining sources)
1131+
orphaned_result = client._execute_cypher("""
1132+
MATCH (c:Concept)
1133+
WHERE NOT EXISTS { MATCH (c)-[:APPEARS]->(:Source) }
1134+
DETACH DELETE c
1135+
RETURN count(c) as orphaned_count
1136+
""", fetch_one=True)
1137+
1138+
orphaned_count = orphaned_result["orphaned_count"] if orphaned_result else 0
1139+
1140+
# Delete job records for this document
1141+
try:
1142+
conn = None
1143+
conn = client.pool.getconn()
1144+
with conn.cursor() as cur:
1145+
cur.execute("""
1146+
DELETE FROM kg_api.jobs
1147+
WHERE source_filename = %s AND ontology = %s
1148+
""", (filename, ontology_name))
1149+
conn.commit()
1150+
except Exception as e:
1151+
logger.warning(f"Failed to delete job records: {e}")
1152+
finally:
1153+
if conn:
1154+
client.pool.putconn(conn)
1155+
1156+
logger.info(
1157+
            f"Deleted document '{filename}': "
1158+
f"{sources_deleted} sources, {orphaned_count} orphaned concepts"
1159+
)
1160+
1161+
# Refresh graph epoch so caches (FUSE, etc.) detect the change
1162+
client.refresh_epoch()
1163+
1164+
return DocumentDeleteResponse(
1165+
document_id=document_id,
1166+
deleted=True,
1167+
sources_deleted=sources_deleted,
1168+
orphaned_concepts_deleted=orphaned_count,
1169+
)
1170+
1171+
except HTTPException:
1172+
raise
1173+
except Exception as e:
1174+
logger.error(f"Failed to delete document: {e}", exc_info=True)
1175+
raise HTTPException(status_code=500, detail=f"Failed to delete document: {str(e)}")
1176+
finally:
1177+
client.close()

api/app/routes/edges.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,9 @@ async def create_edge(
8787
outcome="success"
8888
)
8989

90+
# Refresh graph epoch so caches (FUSE, etc.) detect the change
91+
age_client.refresh_epoch()
92+
9093
return result
9194
except ValueError as e:
9295
raise HTTPException(
@@ -234,6 +237,10 @@ async def delete_edge(
234237
to_concept_id=to_concept_id,
235238
relationship_type=relationship_type
236239
)
240+
241+
# Refresh graph epoch so caches (FUSE, etc.) detect the change
242+
age_client.refresh_epoch()
243+
237244
return None # 204 No Content
238245
except ValueError as e:
239246
if "not found" in str(e).lower():

api/app/routes/ontology.py

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,9 @@ async def create_ontology(
139139
except Exception as e:
140140
logger.warning(f"Failed to generate embedding for new ontology '{request.name}': {e}")
141141

142+
# Refresh graph epoch so caches (FUSE, etc.) detect the change
143+
client.refresh_epoch()
144+
142145
return OntologyNodeResponse(
143146
ontology_id=node.get('ontology_id', ontology_id),
144147
name=request.name,
@@ -927,16 +930,15 @@ async def delete_ontology(
927930
client = get_age_client()
928931
queue = get_job_queue()
929932
try:
930-
# Check if ontology exists
933+
# Check if ontology exists (Ontology node or Source nodes)
931934
if not force:
932935
check = client._execute_cypher("""
933-
MATCH (s:Source {document: $name})
934-
WITH count(s) as source_count
935-
OPTIONAL MATCH (c:Concept)-[:APPEARS]->(s:Source {document: $name})
936-
RETURN source_count, count(DISTINCT c) as concept_count
936+
OPTIONAL MATCH (o:Ontology {name: $name})
937+
OPTIONAL MATCH (s:Source {document: $name})
938+
RETURN count(o) as ontology_node_count, count(s) as source_count
937939
""", params={"name": ontology_name}, fetch_one=True)
938940

939-
if not check or check['source_count'] == 0:
941+
if not check or (check['ontology_node_count'] == 0 and check['source_count'] == 0):
940942
raise HTTPException(
941943
status_code=404,
942944
detail=f"Ontology '{ontology_name}' not found"
@@ -1071,6 +1073,9 @@ async def delete_ontology(
10711073
if jobs_deleted > 0:
10721074
logger.info(f"Deleted {jobs_deleted} job records for ontology '{ontology_name}'")
10731075

1076+
# Refresh graph epoch so caches (FUSE, etc.) detect the change
1077+
client.refresh_epoch()
1078+
10741079
return OntologyDeleteResponse(
10751080
ontology=ontology_name,
10761081
deleted=True,
@@ -1476,6 +1481,9 @@ async def create_ontology_edge(
14761481
source="manual",
14771482
)
14781483

1484+
# Refresh graph epoch so caches (FUSE, etc.) detect the change
1485+
client.refresh_epoch()
1486+
14791487
return OntologyEdge(
14801488
from_ontology=ontology_name,
14811489
to_ontology=request.to_ontology,
@@ -1532,6 +1540,9 @@ async def delete_ontology_edge(
15321540
detail=f"No {edge_type} edge found from '{ontology_name}' to '{to_ontology}'",
15331541
)
15341542

1543+
# Refresh graph epoch so caches (FUSE, etc.) detect the change
1544+
client.refresh_epoch()
1545+
15351546
return {"deleted": deleted, "from_ontology": ontology_name,
15361547
"to_ontology": to_ontology, "edge_type": edge_type}
15371548

0 commit comments

Comments
 (0)