-
Notifications
You must be signed in to change notification settings - Fork 183
Expand file tree
/
Copy pathknowledge.py
More file actions
311 lines (264 loc) · 11.8 KB
/
knowledge.py
File metadata and controls
311 lines (264 loc) · 11.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
"""Knowledge graph models."""
import uuid
from datetime import datetime
from basic_memory.utils import ensure_timezone_aware
from typing import Optional
from sqlalchemy import (
BigInteger,
CheckConstraint,
Integer,
String,
Text,
ForeignKey,
UniqueConstraint,
DateTime,
Index,
JSON,
Float,
text,
)
from sqlalchemy.orm import Mapped, mapped_column, relationship
from basic_memory.models.base import Base
from basic_memory.utils import generate_permalink
class Entity(Base):
    """Core node of the knowledge graph.

    Entities represent semantic nodes maintained by the AI layer. Each entity:

    - has a numeric primary key plus a string ``external_id`` that defaults
      to a freshly generated UUID4 and is meant for API references
    - maps to a file on disk through ``file_path``
    - keeps a checksum (plus mtime/size) for change detection
    - tracks both source-file and semantic properties
    - belongs to exactly one project (``project_id`` is non-nullable)
    """

    __tablename__ = "entity"
    __table_args__ = (
        # Plain lookup indexes
        Index("ix_note_type", "note_type"),
        Index("ix_entity_title", "title"),
        Index("ix_entity_external_id", "external_id", unique=True),
        Index("ix_entity_created_at", "created_at"),  # timeline queries
        Index("ix_entity_updated_at", "updated_at"),  # timeline queries
        Index("ix_entity_project_id", "project_id"),  # project filtering
        # Per-project uniqueness. On SQLite the permalink index is partial:
        # it only covers markdown rows that actually have a permalink.
        Index(
            "uix_entity_permalink_project",
            "permalink",
            "project_id",
            unique=True,
            sqlite_where=text("content_type = 'text/markdown' AND permalink IS NOT NULL"),
        ),
        Index(
            "uix_entity_file_path_project",
            "file_path",
            "project_id",
            unique=True,
        ),
    )

    # --- Core identity ---------------------------------------------------
    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    # External UUID for API references - stable identifier that won't change
    external_id: Mapped[str] = mapped_column(
        String,
        unique=True,
        default=lambda: str(uuid.uuid4()),
    )
    title: Mapped[str] = mapped_column(String)
    note_type: Mapped[str] = mapped_column(String)
    entity_metadata: Mapped[Optional[dict]] = mapped_column(JSON, nullable=True)
    content_type: Mapped[str] = mapped_column(String)

    # --- Project reference -----------------------------------------------
    project_id: Mapped[int] = mapped_column(
        Integer,
        ForeignKey("project.id"),
        nullable=False,
    )

    # --- File location ---------------------------------------------------
    # Normalized path for URIs - required for markdown files only
    permalink: Mapped[Optional[str]] = mapped_column(String, nullable=True, index=True)
    # Actual filesystem relative path
    file_path: Mapped[str] = mapped_column(String, index=True)
    # Checksum of the file
    checksum: Mapped[Optional[str]] = mapped_column(String, nullable=True)

    # --- File metadata for sync ------------------------------------------
    # mtime: file modification timestamp (Unix epoch float) for change detection
    mtime: Mapped[Optional[float]] = mapped_column(Float, nullable=True)
    # size: file size in bytes for quick change detection
    size: Mapped[Optional[int]] = mapped_column(Integer, nullable=True)

    # --- Tracking ---------------------------------------------------------
    # Both timestamps default to the current local time as an aware datetime;
    # updated_at is additionally refreshed by the ORM on every UPDATE.
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        default=lambda: datetime.now().astimezone(),
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        default=lambda: datetime.now().astimezone(),
        onupdate=lambda: datetime.now().astimezone(),
    )
    # Who created this entity (cloud user_profile_id UUID, null for local/CLI usage)
    created_by: Mapped[Optional[str]] = mapped_column(String, nullable=True, default=None)
    # Who last modified this entity (cloud user_profile_id UUID, null for local/CLI usage)
    last_updated_by: Mapped[Optional[str]] = mapped_column(String, nullable=True, default=None)

    # --- Relationships ----------------------------------------------------
    project = relationship("Project", back_populates="entities")
    observations = relationship(
        "Observation",
        back_populates="entity",
        cascade="all, delete-orphan",
    )
    # Relation rows where this entity is the source (Relation.from_id).
    outgoing_relations = relationship(
        "Relation",
        back_populates="from_entity",
        foreign_keys="[Relation.from_id]",
        cascade="all, delete-orphan",
    )
    # Relation rows where this entity is the target (Relation.to_id).
    incoming_relations = relationship(
        "Relation",
        back_populates="to_entity",
        foreign_keys="[Relation.to_id]",
        cascade="all, delete-orphan",
    )
    # One-to-one (uselist=False) link to the note_content row.
    note_content = relationship(
        "NoteContent",
        back_populates="entity",
        cascade="all, delete-orphan",
        uselist=False,
    )

    @property
    def relations(self):
        """Return incoming relations followed by outgoing relations as one list."""
        return [*self.incoming_relations, *self.outgoing_relations]

    @property
    def is_markdown(self):
        """Return True when ``content_type`` is exactly ``text/markdown``."""
        return self.content_type == "text/markdown"

    def __getattribute__(self, name):
        """Return the attribute, making the timestamp fields timezone-aware.

        Only ``created_at`` and ``updated_at`` are touched, and only when the
        stored value is a ``datetime``; everything else is returned unchanged.
        """
        value = super().__getattribute__(name)
        if name not in ("created_at", "updated_at"):
            return value
        if not isinstance(value, datetime):
            return value
        return ensure_timezone_aware(value)

    def __repr__(self) -> str:
        return (
            f"Entity(id={self.id}, external_id='{self.external_id}', "
            f"name='{self.title}', type='{self.note_type}', "
            f"checksum='{self.checksum}')"
        )
class NoteContent(Base):
    """Materialized markdown content and sync state for a note entity.

    ``entity_id`` is both the primary key and a foreign key to ``entity.id``,
    so there is at most one row per entity. The row stores the markdown text
    with its database-side version/checksum, alongside the version, checksum
    and status of the file-write attempts.
    """

    __tablename__ = "note_content"
    __table_args__ = (
        # Restrict file_write_status to the known lifecycle states.
        CheckConstraint(
            "file_write_status IN ('pending', 'writing', 'synced', "
            "'failed', 'external_change_detected')",
            name="ck_note_content_file_write_status",
        ),
        Index("ix_note_content_project_id", "project_id"),
        Index("ix_note_content_file_path", "file_path"),
        Index("ix_note_content_external_id", "external_id", unique=True),
    )

    # --- Core identity mirrored from entity for hot note reads ------------
    entity_id: Mapped[int] = mapped_column(
        Integer, ForeignKey("entity.id", ondelete="CASCADE"), primary_key=True
    )
    project_id: Mapped[int] = mapped_column(
        Integer, ForeignKey("project.id", ondelete="CASCADE"), nullable=False
    )
    external_id: Mapped[str] = mapped_column(String, nullable=False)
    file_path: Mapped[str] = mapped_column(String, nullable=False)

    # --- Materialized content version tracked in the tenant database ------
    markdown_content: Mapped[str] = mapped_column(Text, nullable=False)
    db_version: Mapped[int] = mapped_column(BigInteger, nullable=False)
    db_checksum: Mapped[str] = mapped_column(String, nullable=False)

    # --- File materialization state for the latest write attempts ---------
    file_version: Mapped[Optional[int]] = mapped_column(BigInteger, nullable=True)
    file_checksum: Mapped[Optional[str]] = mapped_column(String, nullable=True)
    # New rows start out as "pending".
    file_write_status: Mapped[str] = mapped_column(
        String,
        nullable=False,
        default="pending",
    )
    last_source: Mapped[Optional[str]] = mapped_column(String, nullable=True)
    # Set to the current local time (aware) on insert and on every update.
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        default=lambda: datetime.now().astimezone(),
        onupdate=lambda: datetime.now().astimezone(),
    )
    file_updated_at: Mapped[Optional[datetime]] = mapped_column(
        DateTime(timezone=True), nullable=True
    )
    last_materialization_error: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
    last_materialization_attempt_at: Mapped[Optional[datetime]] = mapped_column(
        DateTime(timezone=True), nullable=True
    )

    # Back-reference to the owning Entity (Entity.note_content).
    entity = relationship("Entity", back_populates="note_content")

    def __repr__(self) -> str:  # pragma: no cover
        identity = f"entity_id={self.entity_id}, external_id='{self.external_id}'"
        state = f"file_path='{self.file_path}', file_write_status='{self.file_write_status}'"
        return f"NoteContent({identity}, {state})"
class Observation(Base):
    """A single observation attached to an entity.

    Observations are atomic facts or notes about an entity. Each row stores
    its text ``content``, a ``category`` (defaulting to ``"note"``), optional
    ``context`` and a JSON list of ``tags``.
    """

    __tablename__ = "observation"
    __table_args__ = (
        Index("ix_observation_entity_id", "entity_id"),  # FK index
        Index("ix_observation_category", "category"),  # category index
    )

    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    project_id: Mapped[int] = mapped_column(
        Integer, ForeignKey("project.id"), index=True
    )
    entity_id: Mapped[int] = mapped_column(
        Integer, ForeignKey("entity.id", ondelete="CASCADE")
    )
    content: Mapped[str] = mapped_column(Text)
    category: Mapped[str] = mapped_column(String, nullable=False, default="note")
    context: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
    # Empty list by default, both on the Python side and as a server default.
    tags: Mapped[Optional[list[str]]] = mapped_column(
        JSON,
        nullable=True,
        default=list,
        server_default="[]",
    )

    # Relationships
    entity = relationship("Entity", back_populates="observations")

    @property
    def permalink(self) -> str:
        """Build a synthetic permalink for this observation.

        The permalink is derived from the owning entity's permalink, the
        category and the content, which is possible because an observation is
        always defined in and owned by a single entity.

        Only the first 200 characters of the content are used, to stay under
        PostgreSQL's btree index limit of 2704 bytes.
        """
        # Slicing is a no-op for content that is already 200 chars or shorter.
        snippet = self.content[:200]
        raw_path = f"{self.entity.permalink}/observations/{self.category}/{snippet}"
        return generate_permalink(raw_path)

    def __repr__(self) -> str:  # pragma: no cover
        return (
            f"Observation(id={self.id}, entity_id={self.entity_id}, "
            f"content='{self.content}')"
        )
class Relation(Base):
    """A directed relation between two entities.

    The source is always an entity (``from_id``). The target is recorded by
    name in ``to_name`` and, when ``to_id`` is set, also as an entity
    reference; ``to_id`` is nullable.
    """

    __tablename__ = "relation"
    __table_args__ = (
        UniqueConstraint(
            "from_id", "to_id", "relation_type", name="uix_relation_from_id_to_id"
        ),
        UniqueConstraint(
            "from_id", "to_name", "relation_type", name="uix_relation_from_id_to_name"
        ),
        Index("ix_relation_type", "relation_type"),
        Index("ix_relation_from_id", "from_id"),  # FK indexes
        Index("ix_relation_to_id", "to_id"),
    )

    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    project_id: Mapped[int] = mapped_column(
        Integer, ForeignKey("project.id"), index=True
    )
    from_id: Mapped[int] = mapped_column(
        Integer, ForeignKey("entity.id", ondelete="CASCADE")
    )
    to_id: Mapped[Optional[int]] = mapped_column(
        Integer,
        ForeignKey("entity.id", ondelete="CASCADE"),
        nullable=True,
    )
    to_name: Mapped[str] = mapped_column(String)
    relation_type: Mapped[str] = mapped_column(String)
    context: Mapped[Optional[str]] = mapped_column(Text, nullable=True)

    # Relationships
    from_entity = relationship(
        "Entity",
        foreign_keys=[from_id],
        back_populates="outgoing_relations",
    )
    to_entity = relationship(
        "Entity",
        foreign_keys=[to_id],
        back_populates="incoming_relations",
    )

    @property
    def permalink(self) -> str:
        """Build the relation permalink showing the semantic connection.

        Format: source/relation_type/target
        Example: "specs/search/implements/features/search-ui"

        Each side uses the entity's permalink, falling back to its
        ``file_path`` when the permalink is empty. When there is no target
        entity, ``to_name`` is used as the target instead.
        """
        source_entity = self.from_entity
        source = source_entity.permalink or source_entity.file_path

        target_entity = self.to_entity
        target = (
            (target_entity.permalink or target_entity.file_path)
            if target_entity
            else self.to_name
        )
        return generate_permalink(f"{source}/{self.relation_type}/{target}")

    def __repr__(self) -> str:  # pragma: no cover
        return (
            f"Relation(id={self.id}, from_id={self.from_id}, to_id={self.to_id}, "
            f"to_name={self.to_name}, type='{self.relation_type}')"
        )