Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
c8b7bad
fix cursor-stream
sohankshirsagar Jan 13, 2026
1bdc5ca
refactor psycopg instrumentations to use mock utils
sohankshirsagar Jan 13, 2026
132b556
instrument ServerCursor
sohankshirsagar Jan 13, 2026
6e568a1
add psycopg cursor.copy() instrumentation for COPY operations
sohankshirsagar Jan 13, 2026
8feb8a7
fix multiple queries on same cursor
sohankshirsagar Jan 13, 2026
8723755
fix psycopg pipeline mode: defer result capture until sync()
sohankshirsagar Jan 14, 2026
da7329d
fix cursor iteration
sohankshirsagar Jan 14, 2026
8421029
fix executemany with returning=True instrumentation for psycopg
sohankshirsagar Jan 14, 2026
9c4a03e
fix cursor.scroll() support in psycopg replay mode
sohankshirsagar Jan 14, 2026
3f801cf
fix cursor.rownumber property returning null during REPLAY mode
sohankshirsagar Jan 14, 2026
b7c8502
fix psycopg cursor.statusmessage capture and replay
sohankshirsagar Jan 14, 2026
15a32a8
fix nextset() iteration for executemany with returning=True
sohankshirsagar Jan 14, 2026
02a5b45
fix cursor.scroll() position tracking in RECORD mode
sohankshirsagar Jan 14, 2026
acbb31b
refactor
sohankshirsagar Jan 14, 2026
35b3765
resolve cursor reuse hang in RECORD mode
sohankshirsagar Jan 14, 2026
d22ed0a
Fix UUID parameter serialization mismatch in psycopg REPLAY mode
sohankshirsagar Jan 14, 2026
765fd9b
fix bytea serialization to preserve binary data during record/replay
sohankshirsagar Jan 14, 2026
c95337f
Fix kwargs_row row factory handling in psycopg instrumentation
sohankshirsagar Jan 14, 2026
9a407ab
check number of tests replayed in e2e tests
sohankshirsagar Jan 14, 2026
be8de29
Fix scalar_row factory handling in psycopg instrumentation
sohankshirsagar Jan 15, 2026
0a2a1c9
handle replaying uuid properly
sohankshirsagar Jan 15, 2026
f6d4d32
Fix binary format hang by deferring result capture until fetch
sohankshirsagar Jan 15, 2026
8ad42ac
Enable null-values test for psycopg instrumentation
sohankshirsagar Jan 15, 2026
0526956
Clean up null-values test comments
sohankshirsagar Jan 15, 2026
2449d40
Enable transaction context manager test for psycopg instrumentation
sohankshirsagar Jan 15, 2026
a9ab04f
Clean up transaction context manager test comments
sohankshirsagar Jan 15, 2026
7dbf9eb
fix logging OOM issue during e2e tests
sohankshirsagar Jan 15, 2026
5fc28b2
Fix cursor.set_result() not mocked in REPLAY mode for executemany wit…
sohankshirsagar Jan 15, 2026
6903b87
Refactor psycopg instrumentation to reduce code duplication
sohankshirsagar Jan 15, 2026
f04b88b
Fix Decimal and timedelta serialization for consistent RECORD/REPLAY …
sohankshirsagar Jan 15, 2026
0170747
Refactor psycopg instrumentation: extract common helper methods
sohankshirsagar Jan 15, 2026
66453ad
Fix inet/cidr network type serialization for REPLAY mode
sohankshirsagar Jan 15, 2026
3418673
Fix psycopg Range type serialization for REPLAY mode
sohankshirsagar Jan 15, 2026
08f5ce7
fix format + lint issues
sohankshirsagar Jan 15, 2026
e5101ed
fix type errors
sohankshirsagar Jan 15, 2026
70e04b7
format
sohankshirsagar Jan 16, 2026
26f92f7
try catch record mode
sohankshirsagar Jan 16, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions drift/instrumentation/httpx/instrumentation.py
Original file line number Diff line number Diff line change
Expand Up @@ -806,6 +806,7 @@ def _try_get_mock_from_request_sync(
input_value=input_value,
kind=SpanKind.CLIENT,
input_schema_merges=input_schema_merges,
is_pre_app_start=not sdk.app_ready,
)

if not mock_response_output or not mock_response_output.found:
Expand Down
174 changes: 174 additions & 0 deletions drift/instrumentation/psycopg/e2e-tests/src/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,180 @@ def db_transaction():
except Exception as e:
return jsonify({"error": str(e)}), 500

@app.route("/test/cursor-stream")
def test_cursor_stream():
    """Test cursor.stream() - generator-based result streaming.

    Verifies that the instrumentation captures queries whose results are
    consumed lazily through psycopg's streaming generator interface.
    """
    try:
        with psycopg.connect(get_conn_string()) as conn, conn.cursor() as cur:
            # Consume the stream fully; each yielded item is one row tuple.
            stream = cur.stream("SELECT id, name, email FROM users ORDER BY id LIMIT 5")
            results = [
                {"id": uid, "name": uname, "email": uemail}
                for uid, uname, uemail in stream
            ]
            return jsonify({"count": len(results), "data": results})
    except Exception as e:
        return jsonify({"error": str(e)}), 500

@app.route("/test/server-cursor")
def test_server_cursor():
    """Test ServerCursor (named cursor) - server-side cursor.

    Exercises named cursors, which psycopg implements with DECLARE CURSOR
    on the database server, to confirm the instrumentation captures them.
    """
    try:
        with psycopg.connect(get_conn_string()) as conn:
            # Supplying name= makes psycopg open a server-side cursor.
            with conn.cursor(name="test_server_cursor") as cur:
                cur.execute("SELECT id, name, email FROM users ORDER BY id LIMIT 5")
                rows = cur.fetchall()
                if cur.description:
                    columns = [desc[0] for desc in cur.description]
                else:
                    # Fallback column labels if no description is available.
                    columns = ["id", "name", "email"]
                results = [dict(zip(columns, row, strict=False)) for row in rows]
                return jsonify({"count": len(results), "data": results})
    except Exception as e:
        return jsonify({"error": str(e)}), 500

@app.route("/test/copy-to")
def test_copy_to():
    """Test cursor.copy() with COPY TO - bulk data export.

    Confirms the instrumentation captures COPY protocol operations.
    """
    try:
        with psycopg.connect(get_conn_string()) as conn, conn.cursor() as cur:
            exported = []
            with cur.copy("COPY (SELECT id, name, email FROM users ORDER BY id LIMIT 5) TO STDOUT") as copy:
                for chunk in copy:
                    # psycopg may yield memoryview or bytes; normalize first.
                    raw = bytes(chunk) if isinstance(chunk, memoryview) else chunk
                    exported.append(raw.decode('utf-8').strip())
            return jsonify({"count": len(exported), "data": exported})
    except Exception as e:
        return jsonify({"error": str(e)}), 500

@app.route("/test/multiple-queries")
def test_multiple_queries():
    """Test multiple queries in same connection.

    Runs three sequential statements on one cursor to verify that each
    is captured and replayed independently.
    """
    try:
        with psycopg.connect(get_conn_string()) as conn, conn.cursor() as cur:
            def scalar(sql):
                # Execute a single-value query and return its first column.
                cur.execute(sql)
                return cur.fetchone()[0]

            count = scalar("SELECT COUNT(*) FROM users")
            max_id = scalar("SELECT MAX(id) FROM users")
            min_id = scalar("SELECT MIN(id) FROM users")

            return jsonify({"count": count, "max_id": max_id, "min_id": min_id})
    except Exception as e:
        return jsonify({"error": str(e)}), 500

@app.route("/test/pipeline-mode")
def test_pipeline_mode():
    """Test pipeline mode - batched operations.

    Pipeline mode queues multiple statements before waiting on results;
    this checks the instrumentation handles deferred result delivery.
    """
    try:
        with psycopg.connect(get_conn_string()) as conn:
            with conn.pipeline() as pipeline:
                rows_cur = conn.execute("SELECT id, name FROM users ORDER BY id LIMIT 3")
                count_cur = conn.execute("SELECT COUNT(*) FROM users")
                # Flush the pipeline so both cursors have results available.
                pipeline.sync()

                fetched = rows_cur.fetchall()
                total = count_cur.fetchone()[0]

            payload = {
                "rows": [{"id": row_id, "name": row_name} for row_id, row_name in fetched],
                "count": total,
            }
            return jsonify(payload)
    except Exception as e:
        return jsonify({"error": str(e)}), 500

@app.route("/test/dict-row-factory")
def test_dict_row_factory():
    """Test dict_row row factory.

    Checks that the instrumentation correctly handles row factories that
    return dictionaries instead of tuples.
    """
    try:
        from psycopg.rows import dict_row

        with psycopg.connect(get_conn_string(), row_factory=dict_row) as conn:
            with conn.cursor() as cur:
                cur.execute("SELECT id, name, email FROM users ORDER BY id LIMIT 3")
                rows = cur.fetchall()

        # Rows are already dicts, so they serialize to JSON directly.
        return jsonify({"count": len(rows), "data": rows})
    except Exception as e:
        return jsonify({"error": str(e)}), 500


@app.route("/test/namedtuple-row-factory")
def test_namedtuple_row_factory():
    """Test namedtuple_row row factory.

    Checks that the instrumentation correctly handles namedtuple-producing
    row factories.
    """
    try:
        from psycopg.rows import namedtuple_row

        with psycopg.connect(get_conn_string(), row_factory=namedtuple_row) as conn:
            with conn.cursor() as cur:
                cur.execute("SELECT id, name, email FROM users ORDER BY id LIMIT 3")
                rows = cur.fetchall()

        # Named tuples aren't directly JSON-serializable; convert to dicts.
        data = [{"id": rec.id, "name": rec.name, "email": rec.email} for rec in rows]
        return jsonify({"count": len(data), "data": data})
    except Exception as e:
        return jsonify({"error": str(e)}), 500

@app.route("/test/cursor-iteration")
def test_cursor_iteration():
    """Test direct cursor iteration (for row in cursor).

    Verifies that iterating the cursor object itself yields rows correctly
    under instrumentation, without an explicit fetchall().
    """
    try:
        with psycopg.connect(get_conn_string()) as conn, conn.cursor() as cur:
            cur.execute("SELECT id, name FROM users ORDER BY id LIMIT 5")
            # The cursor is itself iterable; consume it row by row.
            results = [{"id": uid, "name": uname} for uid, uname in cur]
            return jsonify({"count": len(results), "data": results})
    except Exception as e:
        return jsonify({"error": str(e)}), 500


if __name__ == "__main__":
sdk.mark_app_as_ready()
Expand Down
16 changes: 16 additions & 0 deletions drift/instrumentation/psycopg/e2e-tests/src/test_requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,20 @@ def make_request(method, endpoint, **kwargs):
if user_id:
make_request("DELETE", f"/db/delete/{user_id}")

# Exercise each psycopg instrumentation test endpoint in turn.
for endpoint in (
    "/test/cursor-stream",
    "/test/server-cursor",
    "/test/copy-to",
    "/test/multiple-queries",
    "/test/pipeline-mode",
    "/test/dict-row-factory",
    "/test/namedtuple-row-factory",
    "/test/cursor-iteration",
):
    make_request("GET", endpoint)

print("\nAll requests completed successfully")
Loading
Loading