Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 53 additions & 25 deletions src/basic_memory/alembic/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@


# Add this function to tell Alembic what to include/exclude
def include_object(object, name, type_, reflected, compare_to):
def include_object(obj, name, type_, reflected, compare_to):
# Ignore SQLite FTS tables
if type_ == "table" and name.startswith("search_index"):
return False
Expand Down Expand Up @@ -118,6 +118,54 @@ async def run_async_migrations(connectable):
await connectable.dispose()


def _run_async_migrations_with_asyncio_run(connectable) -> None:
    """Drive run_async_migrations via asyncio.run, disposing of it on failure.

    asyncio.run() raises RuntimeError *before* awaiting anything when it is
    invoked from inside an already-running event loop. In that situation the
    coroutine object would be garbage-collected un-awaited and Python would
    emit a "coroutine was never awaited" RuntimeWarning. Closing it explicitly
    keeps the warning quiet while still propagating the RuntimeError so the
    caller can fall back to another strategy.
    """
    pending = run_async_migrations(connectable)
    try:
        asyncio.run(pending)
    except RuntimeError:
        # The coroutine was never scheduled; dispose of it before re-raising.
        pending.close()
        raise


def _run_async_migrations_in_thread(connectable) -> None:
    """Execute async migrations on a worker thread that owns a fresh event loop.

    Used when the calling thread already has a running loop, so nesting
    asyncio.run() is impossible; the worker thread has no loop of its own
    and can therefore drive the coroutine to completion safely.
    """
    import concurrent.futures

    def _worker():
        """Spin up a private event loop, run the migrations, then tear it down."""
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            loop.run_until_complete(run_async_migrations(connectable))
        finally:
            loop.close()

    with concurrent.futures.ThreadPoolExecutor() as pool:
        # .result() blocks until the thread finishes and re-raises any
        # exception raised inside the worker.
        pool.submit(_worker).result()


def _run_async_engine_migrations(connectable) -> None:
    """Run migrations for an async engine, falling back to a thread when needed.

    First attempts the straightforward asyncio.run() path; if that fails
    because a loop is already running (uvloop, or Python 3.14+ test runners),
    the work is handed off to a dedicated thread with its own event loop.
    Any other RuntimeError is propagated unchanged.
    """
    try:
        _run_async_migrations_with_asyncio_run(connectable)
    except RuntimeError as exc:
        if "cannot be called from a running event loop" not in str(exc):
            raise
        # Already inside an active event loop — run the migrations on a
        # separate thread so Alembic can finish without nesting loops.
        _run_async_migrations_in_thread(connectable)


def run_migrations_online() -> None:
"""Run migrations in 'online' mode.

Expand Down Expand Up @@ -148,30 +196,10 @@ def run_migrations_online() -> None:

# Handle async engines (PostgreSQL with asyncpg)
if isinstance(connectable, AsyncEngine):
# Try to run async migrations
# nest_asyncio allows asyncio.run() from within event loops, but doesn't work with uvloop
try:
asyncio.run(run_async_migrations(connectable))
except RuntimeError as e:
if "cannot be called from a running event loop" in str(e):
# We're in a running event loop (likely uvloop) - need to use a different approach
# Create a new thread to run the async migrations
import concurrent.futures

def run_in_thread():
"""Run async migrations in a new event loop in a separate thread."""
new_loop = asyncio.new_event_loop()
asyncio.set_event_loop(new_loop)
try:
new_loop.run_until_complete(run_async_migrations(connectable))
finally:
new_loop.close()

with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(run_in_thread)
future.result() # Wait for completion and re-raise any exceptions
else:
raise
# Trigger: async engines need Alembic work to cross the sync/async boundary.
# Why: most callers can use asyncio.run(), but running-loop contexts need a thread fallback.
# Outcome: migrations complete without leaking un-awaited coroutines.
_run_async_engine_migrations(connectable)
else:
# Handle sync engines (SQLite) or sync connections
if hasattr(connectable, "connect"):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
"""Add note_content table

Revision ID: l5g6h7i8j9k0
Revises: k4e5f6g7h8i9
Create Date: 2026-04-04 12:00:00.000000

"""

from typing import Sequence, Union

import sqlalchemy as sa
from alembic import op


# revision identifiers, used by Alembic.
revision: str = "l5g6h7i8j9k0"
down_revision: Union[str, None] = "k4e5f6g7h8i9"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Create note_content for materialized note content and sync state."""
    # One row per entity (entity_id is the primary key). Identity columns
    # (project_id, external_id, file_path) are mirrored here so hot note
    # reads can avoid joining back to the entity table.
    op.create_table(
        "note_content",
        sa.Column("entity_id", sa.Integer(), nullable=False),
        sa.Column("project_id", sa.Integer(), nullable=False),
        sa.Column("external_id", sa.String(), nullable=False),
        sa.Column("file_path", sa.String(), nullable=False),
        # Database-side materialized content and its version/checksum.
        sa.Column("markdown_content", sa.Text(), nullable=False),
        sa.Column("db_version", sa.BigInteger(), nullable=False),
        sa.Column("db_checksum", sa.String(), nullable=False),
        # On-disk file state; nullable because the file may not be written yet.
        sa.Column("file_version", sa.BigInteger(), nullable=True),
        sa.Column("file_checksum", sa.String(), nullable=True),
        sa.Column("file_write_status", sa.String(), nullable=False),
        sa.Column("last_source", sa.String(), nullable=True),
        sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False),
        sa.Column("file_updated_at", sa.DateTime(timezone=True), nullable=True),
        # Diagnostics for the most recent failed/attempted materialization.
        sa.Column("last_materialization_error", sa.Text(), nullable=True),
        sa.Column("last_materialization_attempt_at", sa.DateTime(timezone=True), nullable=True),
        # Restrict file_write_status to the known sync-state machine values.
        sa.CheckConstraint(
            "file_write_status IN ("
            "'pending', "
            "'writing', "
            "'synced', "
            "'failed', "
            "'external_change_detected'"
            ")",
            name="ck_note_content_file_write_status",
        ),
        # Rows are owned by their entity/project; cascade keeps them in sync.
        sa.ForeignKeyConstraint(["entity_id"], ["entity.id"], ondelete="CASCADE"),
        sa.ForeignKeyConstraint(["project_id"], ["project.id"], ondelete="CASCADE"),
        sa.PrimaryKeyConstraint("entity_id"),
    )
    op.create_index("ix_note_content_project_id", "note_content", ["project_id"], unique=False)
    op.create_index("ix_note_content_file_path", "note_content", ["file_path"], unique=False)
    op.create_index("ix_note_content_external_id", "note_content", ["external_id"], unique=True)


def downgrade() -> None:
    """Drop note_content and its supporting indexes."""
    # Indexes are removed first (reverse of creation order), then the table.
    for index_name in (
        "ix_note_content_external_id",
        "ix_note_content_file_path",
        "ix_note_content_project_id",
    ):
        op.drop_index(index_name, table_name="note_content")
    op.drop_table("note_content")
3 changes: 2 additions & 1 deletion src/basic_memory/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@

import basic_memory
from basic_memory.models.base import Base
from basic_memory.models.knowledge import Entity, Observation, Relation
from basic_memory.models.knowledge import Entity, NoteContent, Observation, Relation
from basic_memory.models.project import Project

__all__ = [
"Base",
"Entity",
"NoteContent",
"Observation",
"Relation",
"Project",
Expand Down
76 changes: 76 additions & 0 deletions src/basic_memory/models/knowledge.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from typing import Optional

from sqlalchemy import (
BigInteger,
CheckConstraint,
Integer,
String,
Text,
Expand Down Expand Up @@ -116,6 +118,12 @@ class Entity(Base):
foreign_keys="[Relation.to_id]",
cascade="all, delete-orphan",
)
note_content = relationship(
"NoteContent",
back_populates="entity",
cascade="all, delete-orphan",
uselist=False,
)

@property
def relations(self):
Expand All @@ -141,6 +149,74 @@ def __repr__(self) -> str:
return f"Entity(id={self.id}, external_id='{self.external_id}', name='{self.title}', type='{self.note_type}', checksum='{self.checksum}')"


class NoteContent(Base):
    """Materialized markdown content and sync state for a note entity.

    One row per entity (entity_id is the primary key). Mirrors identity
    columns from the entity row so content reads do not need a join, and
    tracks both the database-side content version and the state of the
    on-disk file materialization.
    """

    __tablename__ = "note_content"
    __table_args__ = (
        # NOTE(review): keep this value list in lockstep with the matching
        # ck_note_content_file_write_status constraint in the migration.
        CheckConstraint(
            "file_write_status IN ("
            "'pending', "
            "'writing', "
            "'synced', "
            "'failed', "
            "'external_change_detected'"
            ")",
            name="ck_note_content_file_write_status",
        ),
        Index("ix_note_content_project_id", "project_id"),
        Index("ix_note_content_file_path", "file_path"),
        # external_id uniquely identifies a note across the project.
        Index("ix_note_content_external_id", "external_id", unique=True),
    )

    # Core identity mirrored from entity for hot note reads
    entity_id: Mapped[int] = mapped_column(
        Integer,
        ForeignKey("entity.id", ondelete="CASCADE"),
        primary_key=True,
    )
    project_id: Mapped[int] = mapped_column(
        Integer,
        ForeignKey("project.id", ondelete="CASCADE"),
        nullable=False,
    )
    external_id: Mapped[str] = mapped_column(String, nullable=False)
    file_path: Mapped[str] = mapped_column(String, nullable=False)

    # Materialized content version tracked in the tenant database
    markdown_content: Mapped[str] = mapped_column(Text, nullable=False)
    db_version: Mapped[int] = mapped_column(BigInteger, nullable=False)
    db_checksum: Mapped[str] = mapped_column(String, nullable=False)

    # File materialization state tracked against the latest write attempts;
    # file_version/file_checksum stay NULL until the file has been written.
    file_version: Mapped[Optional[int]] = mapped_column(BigInteger, nullable=True)
    file_checksum: Mapped[Optional[str]] = mapped_column(String, nullable=True)
    # New rows start as 'pending' until a write is attempted.
    file_write_status: Mapped[str] = mapped_column(String, nullable=False, default="pending")
    last_source: Mapped[Optional[str]] = mapped_column(String, nullable=True)
    # Timezone-aware local timestamps, refreshed on every update.
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        default=lambda: datetime.now().astimezone(),
        onupdate=lambda: datetime.now().astimezone(),
    )
    file_updated_at: Mapped[Optional[datetime]] = mapped_column(
        DateTime(timezone=True),
        nullable=True,
    )
    # Diagnostics for the most recent failed/attempted materialization.
    last_materialization_error: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
    last_materialization_attempt_at: Mapped[Optional[datetime]] = mapped_column(
        DateTime(timezone=True),
        nullable=True,
    )

    # One-to-one back-reference to the owning Entity row.
    entity = relationship("Entity", back_populates="note_content")

    def __repr__(self) -> str:  # pragma: no cover
        return (
            f"NoteContent(entity_id={self.entity_id}, external_id='{self.external_id}', "
            f"file_path='{self.file_path}', file_write_status='{self.file_write_status}')"
        )


class Observation(Base):
"""An observation about an entity.

Expand Down
2 changes: 2 additions & 0 deletions src/basic_memory/repository/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
from .entity_repository import EntityRepository
from .note_content_repository import NoteContentRepository
from .observation_repository import ObservationRepository
from .project_repository import ProjectRepository
from .relation_repository import RelationRepository

__all__ = [
"EntityRepository",
"NoteContentRepository",
"ObservationRepository",
"ProjectRepository",
"RelationRepository",
Expand Down
Loading
Loading