Skip to content

Commit 8017ff4

Browse files
Dev-iL and claude committed
Various performance improvements inspired by asyncpg
D1 — Delete global STMTS_CACHE; add per-connection caches:
- Delete src/statement/cache.rs (process-global RwLock<HashMap> was incorrect for cross-connection Statement reuse and a serialization point)
- PoolConnection: unchanged — deadpool's prepare_cached already correct
- SingleConnection: gains DashMap<String, Statement> per-connection cache; prepare() consults/inserts it on every prepared query
- dashmap = "6" added to Cargo.toml

D2 — execute_many: build statement once, single GIL pass:
- StatementBuilder::build() called once per execute_many call, not per row
- Remaining rows reuse extracted Vec<Type> in a single GIL pass
- run_pipelined_batch: accepts pre-built Statement, no redundant prepare
- TODO(bind-execute-many) marker left citing asyncpg coreproto.pyx:1022-1092

D3 — COPY records path: 512 KiB BytesMut streaming encoder:
- Replace BinaryCopyInWriter per-row flush (4 KiB) with hand-rolled encoder flushing at 512 KiB (COPY_BUFFER_SIZE = 524288, matches asyncpg's value)
- Single streaming pass: open copy_in before GIL, encode+flush per row
- Eliminates intermediate Vec<Vec<Py<PyAny>>> materialization

D4 — Cache COPY column-type introspection per (schema, table, columns):
- Both PoolConnection and SingleConnection gain CopyTypeCache (DashMap)
- copy_records_to_table checks cache before issuing PREPARE+DEALLOCATE

D5 — Record pyclass + additive records() method:
- New #[pyclass] Record: Vec<Py<PyAny>> + Arc<RecordDesc> (shared col map)
- Implements __getitem__ (int/str/slice), __len__, __iter__, __repr__, get(), keys(), values(), items() — matches asyncpg Record surface
- QueryResult::records() returns Vec<Record>; result() unchanged (additive)
- Type stubs in python/psqlpy/_internal/__init__.pyi updated

D6 — Micro-wins:
- T3#7: is_exact_instance dispatch in from_python.rs (GILOnceCell-cached PyTypeObject pointers for UUID + Decimal replace string name comparison)
- T3#8: ParametersBuilder::prepare early-returns before Python::with_gil when params are None or empty
- T3#10: per-row scratch Vec cleared between rows in COPY encoder

Tests: 17 new pytest tests (test_record.py + test_copy_records.py extensions)
Lint: ruff D205/PLR2004 suppressed for test files in pyproject.toml

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 1d77435 commit 8017ff4

19 files changed

Lines changed: 965 additions & 306 deletions

File tree

Cargo.lock

Lines changed: 31 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,3 +64,4 @@ futures-channel = "0.3.31"
6464
futures = "0.3.31"
6565
regex = "1.11.1"
6666
once_cell = "1.20.3"
67+
dashmap = "6"

pyproject.toml

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -104,10 +104,12 @@ ignore = [
104104
[tool.ruff.lint.per-file-ignores]
105105
"python/psqlpy/*" = ["PYI021"]
106106
"python/tests/*" = [
107-
"S101", # Use of assert detected
108-
"S608", # Possible SQL injection vector through string-based query construction
109-
"D103", # Missing docstring in public function
110-
"S311", # Standard pseudo-random generators are not suitable for security/cryptographic purposes
107+
"S101", # Use of assert detected
108+
"S608", # Possible SQL injection via string-based query construction
109+
"D103", # Missing docstring in public function
110+
"S311", # Standard pseudo-random generators not suitable for security
111+
"PLR2004", # Magic value in comparison (common in test assertions)
112+
"D205", # 1 blank line required between summary and description
111113
]
112114
"python/psqlpy/_internal/exceptions.pyi" = [
113115
"D205",

python/psqlpy/_internal/__init__.pyi

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,33 @@ _RowFactoryRV = TypeVar(
1717

1818
ParamsT: TypeAlias = Sequence[Any] | Mapping[str, Any] | None
1919

20+
class Record:
    """Row object with eagerly decoded column values, asyncpg-compatible.

    A value can be read by position (`row[0]`), by column name
    (`row["col"]`), or by slice.  The row is also iterable and offers the
    dict-like accessors `get`, `keys`, `values` and `items`.
    """

    def __len__(self) -> int:
        """Return the number of items in the row."""
        ...

    @typing.overload
    def __getitem__(self, key: int) -> Any: ...

    @typing.overload
    def __getitem__(self, key: str) -> Any: ...

    @typing.overload
    def __getitem__(self, key: slice) -> list[Any]: ...

    def __iter__(self) -> typing.Iterator[Any]:
        """Iterate over the row's items."""
        ...

    def get(self, key: str, default: Any = None) -> Any:
        """Look up a column by name, falling back to `default` if it is absent."""
        ...

    def keys(self) -> list[str]:
        """Return the column names, in order."""
        ...

    def values(self) -> list[Any]:
        """Return the column values, in order."""
        ...

    def items(self) -> list[tuple[str, Any]]:
        """Return (column_name, value) pairs, in order."""
        ...
46+
2047
class QueryResult:
2148
"""Result."""
2249

@@ -107,6 +134,15 @@ class QueryResult:
107134
List of type that return passed `row_factory`.
108135
"""
109136

137+
def records(self: Self) -> list[Record]:
    """Return the result set as a list of `Record` instances.

    Records coming from the same result set share their column metadata:
    unlike `result()`, the column-name lookup table is shared rather than
    re-created for every row.  Each record supports positional and
    by-name indexing as well as dict-like access.
    """
145+
110146
class SingleQueryResult:
111147
"""Single result."""
112148

python/tests/test_copy_records.py

Lines changed: 140 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import typing
2-
from datetime import datetime, timezone
2+
import uuid
3+
from datetime import date, datetime, timezone
4+
from decimal import Decimal
35

46
import pytest
57
from psqlpy import ConnectionPool
@@ -172,3 +174,140 @@ async def test_copy_records_to_table_uses_schema_qualifier(
172174
finally:
173175
async with psql_pool.acquire() as connection:
174176
await connection.execute(f"DROP SCHEMA IF EXISTS {schema} CASCADE")
177+
178+
179+
async def test_copy_records_heterogeneous_types(
    psql_pool: ConnectionPool,
) -> None:
    """Round-trip one row of mixed column types through copy_records_to_table.

    Characterization test: covers int, float, text, bytea, UUID, numeric,
    date, timestamp, NULL, and array column types (AC-3.4).
    """
    target: typing.Final = "copy_records_hetero"

    # Expected values are named once so the inserted record and the
    # assertions below cannot drift apart.
    sample_uuid = uuid.uuid4()
    sample_numeric = Decimal("12345.6789")
    sample_date = date(2024, 6, 1)
    sample_ts = datetime(2024, 6, 1, 12, 0, 0, tzinfo=timezone.utc)

    async with psql_pool.acquire() as connection:
        await connection.execute(f"DROP TABLE IF EXISTS {target}")
        await connection.execute(
            f"""
            CREATE TABLE {target} (
                col_int INTEGER,
                col_float DOUBLE PRECISION,
                col_text TEXT,
                col_bytea BYTEA,
                col_uuid UUID,
                col_numeric NUMERIC,
                col_date DATE,
                col_ts TIMESTAMPTZ,
                col_null TEXT,
                col_arr INTEGER[]
            )
            """,
        )

    try:
        records = [
            (
                42,
                3.14,
                "hello",
                b"\x00\x01\x02",
                sample_uuid,
                sample_numeric,
                sample_date,
                sample_ts,
                None,
                [1, 2, 3],
            ),
        ]

        async with psql_pool.acquire() as connection:
            inserted = await connection.copy_records_to_table(
                table_name=target,
                records=records,
            )

        assert inserted == 1

        async with psql_pool.acquire() as connection:
            result = await connection.execute(f"SELECT * FROM {target}")
            row = result.result()[0]
            assert row["col_int"] == 42
            assert abs(row["col_float"] - 3.14) < 1e-9
            assert row["col_text"] == "hello"
            assert bytes(row["col_bytea"]) == b"\x00\x01\x02"
            # The UUID column is compared against its string form.
            assert row["col_uuid"] == str(sample_uuid)
            assert row["col_numeric"] == sample_numeric
            assert row["col_date"] == sample_date
            # Previously inserted but never verified.  Aware datetimes compare
            # by instant, so the session time zone does not affect equality.
            assert row["col_ts"] == sample_ts
            assert row["col_null"] is None
            assert row["col_arr"] == [1, 2, 3]
    finally:
        # Always drop the table so a failed assertion does not leak state.
        async with psql_pool.acquire() as connection:
            await connection.execute(f"DROP TABLE IF EXISTS {target}")
246+
247+
248+
async def test_copy_records_introspection_cache(
    psql_pool: ConnectionPool,
) -> None:
    """Check that COPY column-type introspection is cached between calls.

    A second call to copy_records_to_table against the same table should
    not issue a new column-type introspection PREPARE (AC-4.3).
    """
    target: typing.Final = "copy_records_cache_test"
    records = [(1, "first"), (2, "second")]
    first_batch, second_batch = records[:1], records[1:]

    async with psql_pool.acquire() as connection:
        await connection.execute(f"DROP TABLE IF EXISTS {target}")
        await connection.execute(
            f"CREATE TABLE {target} (id INTEGER, label TEXT)",
        )

    introspect_pattern = f"%{target}%WHERE false%"

    async def introspection_calls() -> typing.Any:
        """Sum the pg_stat_statements call counts matching the pattern."""
        async with psql_pool.acquire() as connection:
            res = await connection.execute(
                "SELECT COALESCE(SUM(calls), 0) AS n FROM pg_stat_statements "
                "WHERE query ILIKE $1",
                parameters=[introspect_pattern],
            )
            return res.result()[0]["n"]

    # Snapshot the count up front.  pg_stat_statements may not be
    # available; in that case the count comparison is skipped.
    try:
        pre_calls = await introspection_calls()
    except Exception:  # noqa: BLE001
        pre_calls = None

    try:
        async with psql_pool.acquire() as connection:
            # The first call populates the cache ...
            await connection.copy_records_to_table(
                table_name=target,
                records=first_batch,
            )
            # ... and the second, on the same connection, must hit it.
            await connection.copy_records_to_table(
                table_name=target,
                records=second_batch,
            )

        # Both rows must have been written correctly.
        async with psql_pool.acquire() as connection:
            result = await connection.execute(
                f"SELECT id, label FROM {target} ORDER BY id",
            )
            rows = [(r["id"], r["label"]) for r in result.result()]
            assert rows == records

        # At most one introspection PREPARE should have been issued
        # (cache hit on the second call).
        if pre_calls is not None:
            issued = await introspection_calls() - pre_calls
            assert issued <= 1, (
                f"Expected at most 1 introspection call, got {issued}"
            )
    finally:
        async with psql_pool.acquire() as connection:
            await connection.execute(f"DROP TABLE IF EXISTS {target}")

0 commit comments

Comments
 (0)