-
Notifications
You must be signed in to change notification settings - Fork 192
Expand file tree
/
Copy pathenv.py
More file actions
217 lines (174 loc) · 7.76 KB
/
env.py
File metadata and controls
217 lines (174 loc) · 7.76 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
"""Alembic environment configuration."""
import asyncio
import os
from logging.config import fileConfig
# Allow nested event loops (needed for pytest-asyncio and other async contexts)
# Note: nest_asyncio doesn't work with uvloop or Python 3.14+, so we handle those cases separately
import sys
if sys.version_info < (3, 14):
try:
import nest_asyncio
nest_asyncio.apply()
except (ImportError, ValueError):
# nest_asyncio not available or can't patch this loop type (e.g., uvloop)
pass
# For Python 3.14+, we rely on the thread-based fallback in run_migrations_online()
from sqlalchemy import engine_from_config, pool
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
from alembic import context
from basic_memory.config import ConfigManager
# Trigger: only set test env when actually running under pytest
# Why: alembic/env.py is imported during normal operations (MCP server startup, migrations)
# but we only want test behavior during actual test runs
# Outcome: prevents is_test_env from returning True in production, enabling watch service
if os.getenv("PYTEST_CURRENT_TEST") is not None:
os.environ["BASIC_MEMORY_ENV"] = "test"
# Import after setting environment variable # noqa: E402
from basic_memory.models import Base # noqa: E402
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Load app config - this will read environment variables (BASIC_MEMORY_DATABASE_BACKEND, etc.)
# due to Pydantic's env_prefix="BASIC_MEMORY_" setting
app_config = ConfigManager().config
# Set the SQLAlchemy URL based on database backend configuration
# If the URL is already set in config (e.g., from run_migrations), use that
# Otherwise, get it from app config
# Note: alembic.ini has a placeholder URL "driver://user:pass@localhost/dbname" that we need to override
current_url = config.get_main_option("sqlalchemy.url")
if not current_url or current_url == "driver://user:pass@localhost/dbname":
from basic_memory.db import DatabaseType
sqlalchemy_url = DatabaseType.get_db_url(
app_config.database_path, DatabaseType.FILESYSTEM, app_config
)
config.set_main_option("sqlalchemy.url", sqlalchemy_url)
# Interpret the config file for Python logging.
if config.config_file_name is not None:
fileConfig(config.config_file_name)
# add your model's MetaData object here
# for 'autogenerate' support
target_metadata = Base.metadata
# Add this function to tell Alembic what to include/exclude
def include_object(obj, name, type_, reflected, compare_to):
# Ignore SQLite FTS tables
if type_ == "table" and name.startswith("search_index"):
return False
return True
def run_migrations_offline() -> None:
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
include_object=include_object,
render_as_batch=True,
)
with context.begin_transaction():
context.run_migrations()
def do_run_migrations(connection):
"""Execute migrations with the given connection."""
context.configure(
connection=connection,
target_metadata=target_metadata,
include_object=include_object,
render_as_batch=True,
compare_type=True,
)
with context.begin_transaction():
context.run_migrations()
async def run_async_migrations(connectable):
"""Run migrations asynchronously with AsyncEngine."""
async with connectable.connect() as connection:
await connection.run_sync(do_run_migrations)
await connectable.dispose()
def _run_async_migrations_with_asyncio_run(connectable) -> None:
"""Run async migrations with asyncio.run while closing failed coroutines.
Trigger: asyncio.run() may reject execution when another event loop is already active.
Why: Python raises before awaiting the coroutine, which otherwise leaks a
RuntimeWarning about an un-awaited coroutine.
Outcome: close the pending coroutine before bubbling the RuntimeError to the
fallback path.
"""
migration_coro = run_async_migrations(connectable)
try:
asyncio.run(migration_coro)
except RuntimeError:
migration_coro.close()
raise
def _run_async_migrations_in_thread(connectable) -> None:
"""Run async migrations in a dedicated thread with its own event loop."""
import concurrent.futures
def run_in_thread():
"""Run async migrations in a new event loop in a separate thread."""
new_loop = asyncio.new_event_loop()
asyncio.set_event_loop(new_loop)
try:
new_loop.run_until_complete(run_async_migrations(connectable))
finally:
new_loop.close()
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(run_in_thread)
future.result() # Wait for completion and re-raise any exceptions
def _run_async_engine_migrations(connectable) -> None:
"""Run async-engine migrations with a running-loop fallback."""
try:
_run_async_migrations_with_asyncio_run(connectable)
except RuntimeError as e:
if "cannot be called from a running event loop" in str(e):
# We're in a running event loop (likely uvloop or Python 3.14+ tests).
# Switch to a dedicated thread so Alembic can finish without nesting loops.
_run_async_migrations_in_thread(connectable)
else:
raise
def run_migrations_online() -> None:
"""Run migrations in 'online' mode.
Supports both sync engines (SQLite) and async engines (PostgreSQL with asyncpg).
"""
# Check if a connection/engine was provided (e.g., from run_migrations)
connectable = context.config.attributes.get("connection", None)
if connectable is None:
# No connection provided, create engine from config
url = context.config.get_main_option("sqlalchemy.url")
# Check if it's an async URL (sqlite+aiosqlite or postgresql+asyncpg)
if url and ("+asyncpg" in url or "+aiosqlite" in url):
# Create async engine for asyncpg or aiosqlite
connectable = create_async_engine(
url,
poolclass=pool.NullPool,
future=True,
)
else:
# Create sync engine for regular sqlite or postgresql
connectable = engine_from_config(
context.config.get_section(context.config.config_ini_section, {}),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
# Handle async engines (PostgreSQL with asyncpg)
if isinstance(connectable, AsyncEngine):
# Trigger: async engines need Alembic work to cross the sync/async boundary.
# Why: most callers can use asyncio.run(), but running-loop contexts need a thread fallback.
# Outcome: migrations complete without leaking un-awaited coroutines.
_run_async_engine_migrations(connectable)
else:
# Handle sync engines (SQLite) or sync connections
if hasattr(connectable, "connect"):
# It's an engine, get a connection
with connectable.connect() as connection:
do_run_migrations(connection)
else:
# It's already a connection
do_run_migrations(connectable)
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()