|
| 1 | +"""Tests for SQLAlchemy v2 compatibility. |
| 2 | +
|
| 3 | +These tests verify that the database layer works correctly with |
| 4 | +SQLAlchemy v2, specifically: |
| 5 | +- The _ResultProxy wrapper provides fetchone()/fetchall() on eagerly |
| 6 | + fetched results. |
| 7 | +- The remote (non-local) query path uses connection-based execution |
| 8 | + instead of the removed engine.execute(). |
| 9 | +- Row objects returned from the remote path support dict-like access |
| 10 | + (dict(row) and row["key"]). |
| 11 | +""" |
| 12 | + |
| 13 | +import pytest |
| 14 | +import sqlalchemy |
| 15 | + |
| 16 | +from policyengine_api.data.data import _ResultProxy, PolicyEngineDatabase |
| 17 | + |
| 18 | + |
| 19 | +class TestSQLAlchemyVersion: |
| 20 | + """Verify that SQLAlchemy v2 is installed.""" |
| 21 | + |
| 22 | + def test_sqlalchemy_version_is_v2(self): |
| 23 | + major = int(sqlalchemy.__version__.split(".")[0]) |
| 24 | + assert ( |
| 25 | + major >= 2 |
| 26 | + ), f"Expected SQLAlchemy v2+, got {sqlalchemy.__version__}" |
| 27 | + |
| 28 | + |
| 29 | +class TestResultProxy: |
| 30 | + """Test the _ResultProxy wrapper that bridges SQLAlchemy v2 |
| 31 | + connection-scoped results with the existing query() API.""" |
| 32 | + |
| 33 | + def test_fetchone_returns_dict_like_rows(self): |
| 34 | + """Rows returned by fetchone() should support dict() and |
| 35 | + key-based access.""" |
| 36 | + engine = sqlalchemy.create_engine("sqlite://") |
| 37 | + with engine.connect() as conn: |
| 38 | + conn.exec_driver_sql( |
| 39 | + "CREATE TABLE test (id INTEGER PRIMARY KEY, name TEXT)" |
| 40 | + ) |
| 41 | + conn.exec_driver_sql("INSERT INTO test VALUES (1, 'hello')") |
| 42 | + result = conn.exec_driver_sql("SELECT * FROM test") |
| 43 | + proxy = _ResultProxy(result) |
| 44 | + |
| 45 | + row = proxy.fetchone() |
| 46 | + assert row is not None |
| 47 | + assert dict(row) == {"id": 1, "name": "hello"} |
| 48 | + assert row["id"] == 1 |
| 49 | + assert row["name"] == "hello" |
| 50 | + |
| 51 | + def test_fetchone_returns_none_when_exhausted(self): |
| 52 | + engine = sqlalchemy.create_engine("sqlite://") |
| 53 | + with engine.connect() as conn: |
| 54 | + conn.exec_driver_sql("CREATE TABLE test (id INTEGER PRIMARY KEY)") |
| 55 | + result = conn.exec_driver_sql("SELECT * FROM test") |
| 56 | + proxy = _ResultProxy(result) |
| 57 | + |
| 58 | + assert proxy.fetchone() is None |
| 59 | + |
| 60 | + def test_fetchall_returns_all_rows(self): |
| 61 | + engine = sqlalchemy.create_engine("sqlite://") |
| 62 | + with engine.connect() as conn: |
| 63 | + conn.exec_driver_sql( |
| 64 | + "CREATE TABLE test (id INTEGER PRIMARY KEY, val TEXT)" |
| 65 | + ) |
| 66 | + conn.exec_driver_sql("INSERT INTO test VALUES (1, 'a')") |
| 67 | + conn.exec_driver_sql("INSERT INTO test VALUES (2, 'b')") |
| 68 | + conn.exec_driver_sql("INSERT INTO test VALUES (3, 'c')") |
| 69 | + result = conn.exec_driver_sql("SELECT * FROM test") |
| 70 | + proxy = _ResultProxy(result) |
| 71 | + |
| 72 | + rows = proxy.fetchall() |
| 73 | + assert len(rows) == 3 |
| 74 | + assert dict(rows[0]) == {"id": 1, "val": "a"} |
| 75 | + assert dict(rows[2]) == {"id": 3, "val": "c"} |
| 76 | + |
| 77 | + def test_fetchone_then_fetchall_respects_cursor_position(self): |
| 78 | + engine = sqlalchemy.create_engine("sqlite://") |
| 79 | + with engine.connect() as conn: |
| 80 | + conn.exec_driver_sql("CREATE TABLE test (id INTEGER PRIMARY KEY)") |
| 81 | + conn.exec_driver_sql("INSERT INTO test VALUES (1)") |
| 82 | + conn.exec_driver_sql("INSERT INTO test VALUES (2)") |
| 83 | + conn.exec_driver_sql("INSERT INTO test VALUES (3)") |
| 84 | + result = conn.exec_driver_sql("SELECT * FROM test") |
| 85 | + proxy = _ResultProxy(result) |
| 86 | + |
| 87 | + first = proxy.fetchone() |
| 88 | + assert dict(first) == {"id": 1} |
| 89 | + remaining = proxy.fetchall() |
| 90 | + assert len(remaining) == 2 |
| 91 | + assert dict(remaining[0]) == {"id": 2} |
| 92 | + |
| 93 | + def test_result_proxy_for_insert_statement(self): |
| 94 | + """INSERT statements produce no rows; _ResultProxy should |
| 95 | + handle this gracefully.""" |
| 96 | + engine = sqlalchemy.create_engine("sqlite://") |
| 97 | + with engine.connect() as conn: |
| 98 | + conn.exec_driver_sql("CREATE TABLE test (id INTEGER PRIMARY KEY)") |
| 99 | + result = conn.exec_driver_sql("INSERT INTO test VALUES (1)") |
| 100 | + proxy = _ResultProxy(result) |
| 101 | + |
| 102 | + assert proxy.fetchone() is None |
| 103 | + assert proxy.fetchall() == [] |
| 104 | + |
| 105 | + |
| 106 | +class TestRemoteQueryPath: |
| 107 | + """Test the non-local query path that uses SQLAlchemy engine |
| 108 | + with connection-based execution (v2 pattern).""" |
| 109 | + |
| 110 | + def _make_remote_db(self): |
| 111 | + """Create a PolicyEngineDatabase-like object that uses |
| 112 | + a SQLAlchemy engine (the 'remote' path) but backed by |
| 113 | + in-memory SQLite for testing.""" |
| 114 | + db = PolicyEngineDatabase.__new__(PolicyEngineDatabase) |
| 115 | + db.local = False |
| 116 | + db.pool = sqlalchemy.create_engine("sqlite://") |
| 117 | + # Initialize schema using the remote path |
| 118 | + with db.pool.connect() as conn: |
| 119 | + conn.exec_driver_sql( |
| 120 | + "CREATE TABLE test_table " |
| 121 | + "(id INTEGER PRIMARY KEY, name TEXT, value REAL)" |
| 122 | + ) |
| 123 | + conn.commit() |
| 124 | + return db |
| 125 | + |
| 126 | + def test_remote_insert_and_select(self): |
| 127 | + """Test INSERT then SELECT through the remote query path.""" |
| 128 | + db = self._make_remote_db() |
| 129 | + |
| 130 | + # Note: remote path converts ? to %s for MySQL, but SQLite |
| 131 | + # uses ? natively. Since exec_driver_sql passes to the DBAPI |
| 132 | + # driver directly and SQLite's driver uses ?, we need to |
| 133 | + # test with the actual query() method which does the conversion. |
| 134 | + # For SQLite DBAPI, ? is the native marker. |
| 135 | + |
| 136 | + # Use exec_driver_sql directly to bypass ?->%s conversion |
| 137 | + # (which would break SQLite) |
| 138 | + db._execute_remote( |
| 139 | + [ |
| 140 | + "INSERT INTO test_table (id, name, value) VALUES (?, ?, ?)", |
| 141 | + (1, "test", 3.14), |
| 142 | + ] |
| 143 | + ) |
| 144 | + |
| 145 | + result = db._execute_remote( |
| 146 | + ["SELECT * FROM test_table WHERE id = ?", (1,)] |
| 147 | + ) |
| 148 | + row = result.fetchone() |
| 149 | + assert row is not None |
| 150 | + assert row["id"] == 1 |
| 151 | + assert row["name"] == "test" |
| 152 | + assert row["value"] == 3.14 |
| 153 | + assert dict(row) == {"id": 1, "name": "test", "value": 3.14} |
| 154 | + |
| 155 | + def test_remote_select_no_results(self): |
| 156 | + db = self._make_remote_db() |
| 157 | + result = db._execute_remote( |
| 158 | + ["SELECT * FROM test_table WHERE id = ?", (999,)] |
| 159 | + ) |
| 160 | + assert result.fetchone() is None |
| 161 | + |
| 162 | + def test_remote_update(self): |
| 163 | + db = self._make_remote_db() |
| 164 | + db._execute_remote( |
| 165 | + [ |
| 166 | + "INSERT INTO test_table (id, name, value) VALUES (?, ?, ?)", |
| 167 | + (1, "original", 1.0), |
| 168 | + ] |
| 169 | + ) |
| 170 | + db._execute_remote( |
| 171 | + [ |
| 172 | + "UPDATE test_table SET name = ? WHERE id = ?", |
| 173 | + ("updated", 1), |
| 174 | + ] |
| 175 | + ) |
| 176 | + result = db._execute_remote( |
| 177 | + ["SELECT * FROM test_table WHERE id = ?", (1,)] |
| 178 | + ) |
| 179 | + row = result.fetchone() |
| 180 | + assert row["name"] == "updated" |
| 181 | + |
| 182 | + def test_remote_delete(self): |
| 183 | + db = self._make_remote_db() |
| 184 | + db._execute_remote( |
| 185 | + [ |
| 186 | + "INSERT INTO test_table (id, name, value) VALUES (?, ?, ?)", |
| 187 | + (1, "to_delete", 0.0), |
| 188 | + ] |
| 189 | + ) |
| 190 | + db._execute_remote(["DELETE FROM test_table WHERE id = ?", (1,)]) |
| 191 | + result = db._execute_remote( |
| 192 | + ["SELECT * FROM test_table WHERE id = ?", (1,)] |
| 193 | + ) |
| 194 | + assert result.fetchone() is None |
0 commit comments