Skip to content

Commit 0ebd42d

Browse files
patch async pipeline execute
1 parent 13ec035 commit 0ebd42d

1 file changed

Lines changed: 104 additions & 0 deletions

File tree

drift/instrumentation/redis/instrumentation.py

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,33 @@ async def patched_async_execute_command(redis_self, *args, **kwargs):
137137

138138
async_redis_class.execute_command = patched_async_execute_command
139139
logger.debug("redis.asyncio.Redis.execute_command instrumented")
140+
141+
# Patch async Pipeline.execute
142+
try:
143+
from redis.asyncio.client import Pipeline as AsyncPipeline
144+
145+
if hasattr(AsyncPipeline, "execute"):
146+
original_async_pipeline_execute = AsyncPipeline.execute
147+
148+
async def patched_async_pipeline_execute(pipeline_self, *args, **kwargs):
149+
"""Patched async Pipeline.execute method."""
150+
sdk = TuskDrift.get_instance()
151+
152+
if sdk.mode == TuskDriftMode.DISABLED:
153+
return await original_async_pipeline_execute(pipeline_self, *args, **kwargs)
154+
155+
return await instrumentation._traced_async_pipeline_execute(
156+
pipeline_self,
157+
original_async_pipeline_execute,
158+
sdk,
159+
args,
160+
kwargs,
161+
)
162+
163+
AsyncPipeline.execute = patched_async_pipeline_execute
164+
logger.debug("redis.asyncio.client.Pipeline.execute instrumented")
165+
except ImportError:
166+
logger.debug("redis.asyncio.client.Pipeline not available")
140167
except ImportError:
141168
logger.debug("redis.asyncio not available")
142169

@@ -372,6 +399,83 @@ def original_call():
372399
span_kind=OTelSpanKind.CLIENT,
373400
)
374401

402+
async def _traced_async_pipeline_execute(
403+
self, pipeline: Any, original_execute: Any, sdk: TuskDrift, args: tuple, kwargs: dict
404+
) -> Any:
405+
"""Traced async Pipeline.execute method."""
406+
if sdk.mode == TuskDriftMode.DISABLED:
407+
return await original_execute(pipeline, *args, **kwargs)
408+
409+
# Get commands from pipeline
410+
command_stack = self._get_pipeline_commands(pipeline)
411+
command_str = self._format_pipeline_commands(command_stack)
412+
413+
if sdk.mode == TuskDriftMode.REPLAY:
414+
return handle_replay_mode(
415+
replay_mode_handler=lambda: self._replay_pipeline_execute(sdk, command_str, command_stack),
416+
no_op_request_handler=lambda: [], # Empty list for pipeline
417+
is_server_request=False,
418+
)
419+
420+
# RECORD mode with async execution
421+
return await self._record_async_pipeline_execute(
422+
pipeline, original_execute, sdk, args, kwargs, command_str, command_stack
423+
)
424+
425+
async def _record_async_pipeline_execute(
426+
self,
427+
pipeline: Any,
428+
original_execute: Any,
429+
sdk: TuskDrift,
430+
args: tuple,
431+
kwargs: dict,
432+
command_str: str,
433+
command_stack: list,
434+
) -> Any:
435+
"""Handle async RECORD mode for pipeline execute."""
436+
is_pre_app_start = not sdk.app_ready
437+
span_name = "redis.pipeline"
438+
439+
# Create span using SpanUtils
440+
span_info = SpanUtils.create_span(
441+
CreateSpanOptions(
442+
name=span_name,
443+
kind=OTelSpanKind.CLIENT,
444+
attributes={
445+
TdSpanAttributes.NAME: span_name,
446+
TdSpanAttributes.PACKAGE_NAME: "redis",
447+
TdSpanAttributes.INSTRUMENTATION_NAME: "RedisInstrumentation",
448+
TdSpanAttributes.SUBMODULE_NAME: "pipeline",
449+
TdSpanAttributes.PACKAGE_TYPE: PackageType.REDIS.name,
450+
TdSpanAttributes.IS_PRE_APP_START: is_pre_app_start,
451+
},
452+
is_pre_app_start=is_pre_app_start,
453+
)
454+
)
455+
456+
if not span_info:
457+
# Fallback to original call if span creation fails
458+
return await original_execute(pipeline, *args, **kwargs)
459+
460+
error = None
461+
result = None
462+
463+
with SpanUtils.with_span(span_info):
464+
try:
465+
result = await original_execute(pipeline, *args, **kwargs)
466+
return result
467+
except Exception as e:
468+
error = e
469+
raise
470+
finally:
471+
self._finalize_pipeline_span(
472+
span_info.span,
473+
command_str,
474+
command_stack,
475+
result if error is None else None,
476+
error,
477+
)
478+
375479
def _replay_pipeline_execute(self, sdk: TuskDrift, command_str: str, command_stack: list) -> Any:
376480
"""Handle REPLAY mode for pipeline execute."""
377481
span_name = "redis.pipeline"

0 commit comments

Comments
 (0)