Skip to content

Commit 085c1de

Browse files
refactored psycopg and psycopg2
1 parent 08afc47 commit 085c1de

2 files changed

Lines changed: 353 additions & 329 deletions

File tree

drift/instrumentation/psycopg/instrumentation.py

Lines changed: 171 additions & 127 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,17 @@
66
from types import ModuleType
77
from typing import Any
88

9-
from opentelemetry import context as otel_context
109
from opentelemetry import trace
1110
from opentelemetry.trace import SpanKind as OTelSpanKind
12-
from opentelemetry.trace import Status, set_span_in_context
11+
from opentelemetry.trace import Status
1312
from opentelemetry.trace import StatusCode as OTelStatusCode
1413

1514
from ...core.communication.types import MockRequestInput
1615
from ...core.drift_sdk import TuskDrift
1716
from ...core.json_schema_helper import JsonSchemaHelper
17+
from ...core.mode_utils import handle_record_mode, handle_replay_mode
1818
from ...core.tracing import TdSpanAttributes
19+
from ...core.tracing.span_utils import CreateSpanOptions, SpanUtils
1920
from ...core.types import (
2021
CleanSpanData,
2122
Duration,
@@ -325,9 +326,31 @@ def _traced_execute(
325326

326327
query_str = self._query_to_string(query, cursor)
327328

328-
# Create OpenTelemetry span
329-
tracer = sdk.get_tracer()
330-
span = tracer.start_span(
329+
if sdk.mode == TuskDriftMode.REPLAY:
330+
return handle_replay_mode(
331+
replay_mode_handler=lambda: self._replay_execute(cursor, sdk, query_str, params),
332+
no_op_request_handler=lambda: self._noop_execute(cursor),
333+
is_server_request=False,
334+
)
335+
336+
# RECORD mode
337+
return handle_record_mode(
338+
original_function_call=lambda: original_execute(query, params, **kwargs),
339+
record_mode_handler=lambda is_pre_app_start: self._record_execute(
340+
cursor, original_execute, sdk, query, query_str, params, is_pre_app_start, kwargs
341+
),
342+
span_kind=OTelSpanKind.CLIENT,
343+
)
344+
345+
def _noop_execute(self, cursor: Any) -> Any:
346+
"""Handle background requests in REPLAY mode - return cursor with empty mock data."""
347+
cursor._mock_rows = [] # pyright: ignore
348+
cursor._mock_index = 0 # pyright: ignore
349+
return cursor
350+
351+
def _replay_execute(self, cursor: Any, sdk: TuskDrift, query_str: str, params: Any) -> Any:
352+
"""Handle REPLAY mode for execute - fetch mock from CLI."""
353+
span_info = SpanUtils.create_span(CreateSpanOptions(
331354
name="psycopg.query",
332355
kind=OTelSpanKind.CLIENT,
333356
attributes={
@@ -338,80 +361,110 @@ def _traced_execute(
338361
TdSpanAttributes.PACKAGE_TYPE: PackageType.PG.name,
339362
TdSpanAttributes.IS_PRE_APP_START: not sdk.app_ready,
340363
},
341-
)
364+
is_pre_app_start=not sdk.app_ready,
365+
))
342366

343-
ctx = otel_context.get_current()
344-
ctx_with_span = set_span_in_context(span, ctx)
345-
token = otel_context.attach(ctx_with_span)
367+
if not span_info:
368+
raise RuntimeError("Error creating span in replay mode")
346369

347-
try:
348-
span_context = span.get_span_context()
349-
trace_id = format(span_context.trace_id, "032x")
350-
span_id = format(span_context.span_id, "016x")
370+
with SpanUtils.with_span(span_info):
371+
mock_result = self._try_get_mock(
372+
sdk, query_str, params,
373+
span_info.trace_id, span_info.span_id, span_info.parent_span_id
374+
)
351375

352-
parent_span = trace.get_current_span(ctx)
353-
parent_span_id = None
354-
if parent_span and parent_span.is_recording():
355-
parent_ctx = parent_span.get_span_context()
356-
parent_span_id = format(parent_ctx.span_id, "016x")
376+
if mock_result is None:
377+
is_pre_app_start = not sdk.app_ready
378+
raise RuntimeError(
379+
f"[Tusk REPLAY] No mock found for psycopg execute query. "
380+
f"This {'pre-app-start ' if is_pre_app_start else ''}query was not recorded during the trace capture. "
381+
f"Query: {query_str[:100]}..."
382+
)
357383

358-
if sdk.mode == TuskDriftMode.REPLAY:
359-
# Handle background requests (app ready + no parent span)
360-
if sdk.app_ready and not parent_span_id:
361-
cursor._mock_rows = [] # pyright: ignore
362-
cursor._mock_index = 0 # pyright: ignore
363-
return cursor
364-
365-
mock_result = self._try_get_mock(sdk, query_str, params, trace_id, span_id, parent_span_id)
366-
367-
if mock_result is None:
368-
is_pre_app_start = not sdk.app_ready
369-
raise RuntimeError(
370-
f"[Tusk REPLAY] No mock found for psycopg execute query. "
371-
f"This {'pre-app-start ' if is_pre_app_start else ''}query was not recorded during the trace capture. "
372-
f"Query: {query_str[:100]}..."
373-
)
384+
self._mock_execute_with_data(cursor, mock_result)
385+
span_info.span.end()
386+
return cursor
374387

375-
self._mock_execute_with_data(cursor, mock_result)
376-
return cursor
388+
def _record_execute(
389+
self,
390+
cursor: Any,
391+
original_execute: Any,
392+
sdk: TuskDrift,
393+
query: str,
394+
query_str: str,
395+
params: Any,
396+
is_pre_app_start: bool,
397+
kwargs: dict,
398+
) -> Any:
399+
"""Handle RECORD mode for execute - create span and execute query."""
400+
span_info = SpanUtils.create_span(CreateSpanOptions(
401+
name="psycopg.query",
402+
kind=OTelSpanKind.CLIENT,
403+
attributes={
404+
TdSpanAttributes.NAME: "psycopg.query",
405+
TdSpanAttributes.PACKAGE_NAME: "psycopg",
406+
TdSpanAttributes.INSTRUMENTATION_NAME: "PsycopgInstrumentation",
407+
TdSpanAttributes.SUBMODULE_NAME: "query",
408+
TdSpanAttributes.PACKAGE_TYPE: PackageType.PG.name,
409+
TdSpanAttributes.IS_PRE_APP_START: is_pre_app_start,
410+
},
411+
is_pre_app_start=is_pre_app_start,
412+
))
413+
414+
if not span_info:
415+
# Fallback to original call if span creation fails
416+
return original_execute(query, params, **kwargs)
377417

378-
# RECORD mode
379-
time.time()
380-
error = None
418+
error = None
419+
result = None
381420

421+
with SpanUtils.with_span(span_info):
382422
try:
383423
result = original_execute(query, params, **kwargs)
384424
return result
385425
except Exception as e:
386426
error = e
387427
raise
388428
finally:
389-
if sdk.mode == TuskDriftMode.RECORD:
390-
self._finalize_query_span(
391-
span,
392-
cursor,
393-
query_str,
394-
params,
395-
error,
396-
)
397-
finally:
398-
otel_context.detach(token)
399-
span.end()
429+
self._finalize_query_span(
430+
span_info.span,
431+
cursor,
432+
query_str,
433+
params,
434+
error,
435+
)
436+
span_info.span.end()
400437

401438
def _traced_executemany(
402439
self, cursor: Any, original_executemany: Any, sdk: TuskDrift, query: str, params_seq, **kwargs
403440
) -> Any:
404441
"""Traced cursor.executemany method."""
405-
# Pass through if SDK is disabled
406442
if sdk.mode == TuskDriftMode.DISABLED:
407443
return original_executemany(query, params_seq, **kwargs)
408444

409-
# Convert query to string if it's a Composed SQL object
410445
query_str = self._query_to_string(query, cursor)
446+
# Convert to list BEFORE executing to avoid iterator exhaustion
447+
params_list = list(params_seq)
448+
449+
if sdk.mode == TuskDriftMode.REPLAY:
450+
return handle_replay_mode(
451+
replay_mode_handler=lambda: self._replay_executemany(cursor, sdk, query_str, params_list),
452+
no_op_request_handler=lambda: self._noop_execute(cursor),
453+
is_server_request=False,
454+
)
455+
456+
# RECORD mode
457+
return handle_record_mode(
458+
original_function_call=lambda: original_executemany(query, params_list, **kwargs),
459+
record_mode_handler=lambda is_pre_app_start: self._record_executemany(
460+
cursor, original_executemany, sdk, query, query_str, params_list, is_pre_app_start, kwargs
461+
),
462+
span_kind=OTelSpanKind.CLIENT,
463+
)
411464

412-
# Create OpenTelemetry span
413-
tracer = sdk.get_tracer()
414-
span = tracer.start_span(
465+
def _replay_executemany(self, cursor: Any, sdk: TuskDrift, query_str: str, params_list: list) -> Any:
466+
"""Handle REPLAY mode for executemany - fetch mock from CLI."""
467+
span_info = SpanUtils.create_span(CreateSpanOptions(
415468
name="psycopg.query",
416469
kind=OTelSpanKind.CLIENT,
417470
attributes={
@@ -422,91 +475,82 @@ def _traced_executemany(
422475
TdSpanAttributes.PACKAGE_TYPE: PackageType.PG.name,
423476
TdSpanAttributes.IS_PRE_APP_START: not sdk.app_ready,
424477
},
425-
)
478+
is_pre_app_start=not sdk.app_ready,
479+
))
426480

427-
ctx = otel_context.get_current()
428-
ctx_with_span = set_span_in_context(span, ctx)
429-
token = otel_context.attach(ctx_with_span)
481+
if not span_info:
482+
raise RuntimeError("Error creating span in replay mode")
430483

431-
try:
432-
span_context = span.get_span_context()
433-
trace_id = format(span_context.trace_id, "032x")
434-
span_id = format(span_context.span_id, "016x")
435-
436-
parent_span = trace.get_current_span(ctx)
437-
parent_span_id = None
438-
if parent_span and parent_span.is_recording():
439-
parent_ctx = parent_span.get_span_context()
440-
parent_span_id = format(parent_ctx.span_id, "016x")
441-
442-
# For executemany, we'll treat each parameter set as a batch
443-
# REPLAY mode: Mock ALL queries (including pre-app-start)
444-
if sdk.mode == TuskDriftMode.REPLAY:
445-
# Handle background requests: App is ready + no parent span
446-
# These are background jobs/health checks that run AFTER app startup
447-
# They were never recorded, so return empty result
448-
if sdk.app_ready and not parent_span_id:
449-
logger.debug("Background executemany request (app ready, no parent) - returning empty result")
450-
# Return the cursor
451-
cursor._mock_rows = [] # pyright: ignore
452-
cursor._mock_index = 0 # pyright: ignore
453-
return cursor
454-
455-
# For all other queries (pre-app-start OR within a request trace), get mock
456-
# Convert params_seq to list for serialization
457-
# Wrap in {"_batch": ...} to match the recording format
458-
params_list = list(params_seq)
459-
mock_result = self._try_get_mock(
460-
sdk,
461-
query_str,
462-
{"_batch": params_list},
463-
trace_id,
464-
span_id,
465-
parent_span_id,
484+
with SpanUtils.with_span(span_info):
485+
mock_result = self._try_get_mock(
486+
sdk, query_str, {"_batch": params_list},
487+
span_info.trace_id, span_info.span_id, span_info.parent_span_id
488+
)
489+
490+
if mock_result is None:
491+
is_pre_app_start = not sdk.app_ready
492+
logger.error(
493+
f"No mock found for {'pre-app-start ' if is_pre_app_start else ''}psycopg executemany query in REPLAY mode: {query_str[:100]}"
494+
)
495+
raise RuntimeError(
496+
f"[Tusk REPLAY] No mock found for psycopg executemany query. "
497+
f"This {'pre-app-start ' if is_pre_app_start else ''}query was not recorded during the trace capture. "
498+
f"Query: {query_str[:100]}..."
466499
)
467-
if mock_result is None:
468-
# In REPLAY mode, we MUST have a mock for ALL queries
469-
is_pre_app_start = not sdk.app_ready
470-
logger.error(
471-
f"No mock found for {'pre-app-start ' if is_pre_app_start else ''}psycopg executemany query in REPLAY mode: {query_str[:100]}"
472-
)
473-
raise RuntimeError(
474-
f"[Tusk REPLAY] No mock found for psycopg executemany query. "
475-
f"This {'pre-app-start ' if is_pre_app_start else ''}query was not recorded during the trace capture. "
476-
f"Query: {query_str[:100]}..."
477-
)
478500

479-
# Mock execute by setting cursor internal state
480-
self._mock_execute_with_data(cursor, mock_result)
481-
return cursor # psycopg3 executemany() returns cursor
501+
self._mock_execute_with_data(cursor, mock_result)
502+
span_info.span.end()
503+
return cursor
482504

483-
# RECORD mode: Execute real query and record span
484-
time.time()
485-
error = None
486-
# Convert to list BEFORE executing to avoid iterator exhaustion
487-
params_list = list(params_seq)
505+
def _record_executemany(
506+
self,
507+
cursor: Any,
508+
original_executemany: Any,
509+
sdk: TuskDrift,
510+
query: str,
511+
query_str: str,
512+
params_list: list,
513+
is_pre_app_start: bool,
514+
kwargs: dict,
515+
) -> Any:
516+
"""Handle RECORD mode for executemany - create span and execute query."""
517+
span_info = SpanUtils.create_span(CreateSpanOptions(
518+
name="psycopg.query",
519+
kind=OTelSpanKind.CLIENT,
520+
attributes={
521+
TdSpanAttributes.NAME: "psycopg.query",
522+
TdSpanAttributes.PACKAGE_NAME: "psycopg",
523+
TdSpanAttributes.INSTRUMENTATION_NAME: "PsycopgInstrumentation",
524+
TdSpanAttributes.SUBMODULE_NAME: "query",
525+
TdSpanAttributes.PACKAGE_TYPE: PackageType.PG.name,
526+
TdSpanAttributes.IS_PRE_APP_START: is_pre_app_start,
527+
},
528+
is_pre_app_start=is_pre_app_start,
529+
))
530+
531+
if not span_info:
532+
# Fallback to original call if span creation fails
533+
return original_executemany(query, params_list, **kwargs)
534+
535+
error = None
536+
result = None
488537

538+
with SpanUtils.with_span(span_info):
489539
try:
490540
result = original_executemany(query, params_list, **kwargs)
491541
return result
492542
except Exception as e:
493543
error = e
494544
raise
495545
finally:
496-
# Always create span in RECORD mode (including pre-app-start queries)
497-
# Pre-app-start queries are marked with is_pre_app_start=true flag
498-
if sdk.mode == TuskDriftMode.RECORD:
499-
self._finalize_query_span(
500-
span,
501-
cursor,
502-
query_str,
503-
{"_batch": params_list},
504-
error,
505-
)
506-
finally:
507-
# Reset only span context (trace context is owned by parent)
508-
otel_context.detach(token)
509-
span.end()
546+
self._finalize_query_span(
547+
span_info.span,
548+
cursor,
549+
query_str,
550+
{"_batch": params_list},
551+
error,
552+
)
553+
span_info.span.end()
510554

511555
def _query_to_string(self, query: Any, cursor: Any) -> str:
512556
"""Convert query to string."""

0 commit comments

Comments
 (0)