Skip to content

Commit 35b3765

Browse files
resolve cursor reuse hang in RECORD mode
1 parent acbb31b commit 35b3765

3 files changed

Lines changed: 151 additions & 17 deletions

File tree

drift/instrumentation/psycopg/e2e-tests/src/app.py

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -506,6 +506,133 @@ def test_server_cursor_scroll():
506506
except Exception as e:
507507
return jsonify({"error": str(e)}), 500
508508

509+
@app.route("/test/cursor-reuse")
510+
def test_cursor_reuse():
511+
"""Test reusing cursor for multiple queries.
512+
513+
Tests whether the instrumentation correctly handles reusing a cursor for multiple execute() calls.
514+
"""
515+
try:
516+
with psycopg.connect(get_conn_string()) as conn, conn.cursor() as cur:
517+
# First query
518+
cur.execute("SELECT id, name FROM users WHERE id = 1")
519+
row1 = cur.fetchone()
520+
521+
# Second query on same cursor
522+
cur.execute("SELECT id, name FROM users WHERE id = 2")
523+
row2 = cur.fetchone()
524+
525+
# Third query
526+
cur.execute("SELECT COUNT(*) FROM users")
527+
count = cur.fetchone()[0]
528+
529+
return jsonify({
530+
"row1": {"id": row1[0], "name": row1[1]} if row1 else None,
531+
"row2": {"id": row2[0], "name": row2[1]} if row2 else None,
532+
"count": count
533+
})
534+
except Exception as e:
535+
return jsonify({"error": str(e)}), 500
536+
537+
@app.route("/test/sql-composed")
538+
def test_sql_composed():
539+
"""Test psycopg.sql.SQL() composed queries."""
540+
try:
541+
from psycopg import sql
542+
543+
with psycopg.connect(get_conn_string()) as conn, conn.cursor() as cur:
544+
table = sql.Identifier("users")
545+
columns = sql.SQL(", ").join([
546+
sql.Identifier("id"),
547+
sql.Identifier("name"),
548+
sql.Identifier("email")
549+
])
550+
551+
query = sql.SQL("SELECT {} FROM {} ORDER BY id LIMIT 3").format(columns, table)
552+
cur.execute(query)
553+
rows = cur.fetchall()
554+
555+
return jsonify({
556+
"count": len(rows),
557+
"data": [{"id": r[0], "name": r[1], "email": r[2]} for r in rows]
558+
})
559+
except Exception as e:
560+
return jsonify({"error": str(e)}), 500
561+
562+
563+
564+
565+
# ===== BUG HUNTING TEST ENDPOINTS =====
566+
# These endpoints expose confirmed bugs in the psycopg instrumentation
567+
# Endpoints that passed tests have been removed
568+
569+
@app.route("/test/binary-uuid")
570+
def test_binary_uuid():
571+
"""Test binary UUID data type.
572+
573+
BUG HYPOTHESIS: UUID types may not serialize/deserialize correctly
574+
during RECORD/REPLAY because they are binary.
575+
"""
576+
try:
577+
import uuid
578+
579+
with psycopg.connect(get_conn_string()) as conn, conn.cursor() as cur:
580+
# Create a temp table with UUID column
581+
cur.execute("CREATE TEMP TABLE uuid_test (id UUID PRIMARY KEY, name TEXT)")
582+
583+
# Insert a UUID
584+
test_uuid = uuid.uuid4()
585+
cur.execute(
586+
"INSERT INTO uuid_test (id, name) VALUES (%s, %s) RETURNING id, name",
587+
(test_uuid, "UUID Test")
588+
)
589+
inserted = cur.fetchone()
590+
591+
# Query it back
592+
cur.execute("SELECT id, name FROM uuid_test WHERE id = %s", (test_uuid,))
593+
queried = cur.fetchone()
594+
595+
conn.commit()
596+
597+
return jsonify({
598+
"inserted_uuid": str(inserted[0]) if inserted else None,
599+
"queried_uuid": str(queried[0]) if queried else None,
600+
"match": str(inserted[0]) == str(queried[0]) if inserted and queried else False
601+
})
602+
except Exception as e:
603+
return jsonify({"error": str(e)}), 500
604+
605+
606+
@app.route("/test/binary-bytea")
607+
def test_binary_bytea():
608+
"""Test binary bytea data type.
609+
610+
BUG HYPOTHESIS: Binary data (bytea) may not serialize/deserialize
611+
correctly during RECORD/REPLAY.
612+
"""
613+
try:
614+
with psycopg.connect(get_conn_string()) as conn, conn.cursor() as cur:
615+
# Create a temp table with bytea column
616+
cur.execute("CREATE TEMP TABLE bytea_test (id SERIAL, data BYTEA)")
617+
618+
# Insert binary data
619+
test_data = b'\x00\x01\x02\x03\xff\xfe\xfd'
620+
cur.execute(
621+
"INSERT INTO bytea_test (data) VALUES (%s) RETURNING id, data",
622+
(test_data,)
623+
)
624+
inserted = cur.fetchone()
625+
626+
conn.commit()
627+
628+
# Convert bytes to hex for JSON serialization
629+
return jsonify({
630+
"inserted_id": inserted[0] if inserted else None,
631+
"data_hex": inserted[1].hex() if inserted and inserted[1] else None,
632+
"data_length": len(inserted[1]) if inserted and inserted[1] else 0
633+
})
634+
except Exception as e:
635+
return jsonify({"error": str(e)}), 500
509636

510637
if __name__ == "__main__":
511638
sdk.mark_app_as_ready()

drift/instrumentation/psycopg/e2e-tests/src/test_requests.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,4 +92,18 @@ def make_request(method, endpoint, **kwargs):
9292

9393
make_request("GET", "/test/cursor-scroll")
9494

95+
make_request("GET", "/test/cursor-reuse")
96+
97+
make_request("GET", "/test/sql-composed")
98+
99+
# ===== BUG HUNTING TEST ENDPOINTS =====
100+
# These tests expose confirmed bugs in the psycopg instrumentation
101+
# See BUG_TRACKING.md for detailed information about each bug
102+
print("\n--- Bug Hunting Tests (REPLAY mode bugs - pass RECORD but fail REPLAY) ---\n")
103+
104+
# Bug 8: UUID parameter serialization issue during REPLAY
105+
make_request("GET", "/test/binary-uuid")
106+
# Bug 9: bytea data deserialization returns string instead of bytes
107+
make_request("GET", "/test/binary-bytea")
108+
95109
print("\nAll requests completed successfully")

drift/instrumentation/psycopg/instrumentation.py

Lines changed: 10 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -418,17 +418,16 @@ def _record_execute(
418418
) -> Any:
419419
"""Handle RECORD mode for execute - create span and execute query."""
420420
# Reset cursor state from any previous execute() on this cursor.
421-
# This ensures fetch methods work correctly for multiple queries on the same cursor.
422-
# We must restore original fetch methods so _finalize_query_span can call the real
423-
# psycopg fetchall() method, not our patched version from a previous query.
424-
if hasattr(cursor, '_tusk_original_fetchone'):
425-
cursor.fetchone = cursor._tusk_original_fetchone
426-
cursor.fetchmany = cursor._tusk_original_fetchmany
427-
cursor.fetchall = cursor._tusk_original_fetchall
421+
# Delete instance attribute overrides to expose original class methods.
422+
# This is safer than saving/restoring bound methods which can become stale.
423+
if hasattr(cursor, '_tusk_patched'):
424+
# Remove patched instance attributes to expose class methods
425+
for attr in ('fetchone', 'fetchmany', 'fetchall', 'scroll'):
426+
if attr in cursor.__dict__:
427+
delattr(cursor, attr)
428428
cursor._tusk_rows = None
429429
cursor._tusk_index = 0
430-
if hasattr(cursor, '_tusk_original_scroll'):
431-
cursor.scroll = cursor._tusk_original_scroll
430+
del cursor._tusk_patched
432431

433432
span_info = SpanUtils.create_span(
434433
CreateSpanOptions(
@@ -1635,18 +1634,12 @@ def patched_scroll(value: int, mode: str = "relative") -> None:
16351634

16361635
cursor._tusk_index = newpos # pyright: ignore[reportAttributeAccessIssue]
16371636

1638-
# Save original fetch methods before patching (only if not already saved)
1639-
# These will be restored at the start of the next execute() call
1640-
if not hasattr(cursor, '_tusk_original_fetchone'):
1641-
cursor._tusk_original_fetchone = cursor.fetchone # pyright: ignore[reportAttributeAccessIssue]
1642-
cursor._tusk_original_fetchmany = cursor.fetchmany # pyright: ignore[reportAttributeAccessIssue]
1643-
cursor._tusk_original_fetchall = cursor.fetchall # pyright: ignore[reportAttributeAccessIssue]
1644-
cursor._tusk_original_scroll = cursor.scroll # pyright: ignore[reportAttributeAccessIssue]
1645-
1637+
# Patch fetch methods with our versions that return stored rows
16461638
cursor.fetchone = patched_fetchone # pyright: ignore[reportAttributeAccessIssue]
16471639
cursor.fetchmany = patched_fetchmany # pyright: ignore[reportAttributeAccessIssue]
16481640
cursor.fetchall = patched_fetchall # pyright: ignore[reportAttributeAccessIssue]
16491641
cursor.scroll = patched_scroll # pyright: ignore[reportAttributeAccessIssue]
1642+
cursor._tusk_patched = True # pyright: ignore[reportAttributeAccessIssue]
16501643

16511644
except Exception as fetch_error:
16521645
logger.debug(f"Could not fetch rows (query might not return rows): {fetch_error}")

0 commit comments

Comments
 (0)