Skip to content

Commit a5d280b

Browse files
patch immediate_execute_command
1 parent 0f0d37c commit a5d280b

3 files changed

Lines changed: 136 additions & 0 deletions

File tree

drift/instrumentation/redis/e2e-tests/src/app.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,43 @@ def test_binary_data():
207207
except Exception as e:
208208
return jsonify({"error": str(e)}), 500
209209

210+
@app.route("/test/transaction-watch", methods=["GET"])
211+
def test_transaction_watch():
212+
"""Test transaction with WATCH pattern.
213+
214+
This tests whether WATCH/MULTI/EXEC transaction pattern works correctly.
215+
"""
216+
try:
217+
# Set initial value
218+
redis_client.set("test:watch:counter", "10")
219+
220+
# Start a watched transaction
221+
pipe = redis_client.pipeline(transaction=True)
222+
pipe.watch("test:watch:counter")
223+
224+
# Get current value (this happens outside the transaction)
225+
current = int(redis_client.get("test:watch:counter"))
226+
227+
# Start the transaction
228+
pipe.multi()
229+
pipe.set("test:watch:counter", str(current + 5))
230+
pipe.get("test:watch:counter")
231+
232+
# Execute
233+
results = pipe.execute()
234+
235+
# Clean up
236+
redis_client.delete("test:watch:counter")
237+
238+
return jsonify({
239+
"success": True,
240+
"initial_value": 10,
241+
"expected_final": 15,
242+
"results": results
243+
})
244+
except Exception as e:
245+
return jsonify({"error": str(e)}), 500
246+
210247

211248
if __name__ == "__main__":
212249
sdk.mark_app_as_ready()

drift/instrumentation/redis/e2e-tests/src/test_requests.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,4 +61,6 @@ def make_request(method, endpoint, **kwargs):
6161
# Binary data handling
6262
make_request("GET", "/test/binary-data")
6363

64+
make_request("GET", "/test/transaction-watch")
65+
6466
print("\nAll requests completed successfully")

drift/instrumentation/redis/instrumentation.py

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,29 @@ def patched_pipeline_execute(pipeline_self, *args, **kwargs):
108108

109109
Pipeline.execute = patched_pipeline_execute
110110
logger.debug("redis.client.Pipeline.execute instrumented")
111+
112+
# Patch Pipeline.immediate_execute_command for WATCH and other immediate commands
113+
if hasattr(Pipeline, "immediate_execute_command"):
114+
original_immediate = Pipeline.immediate_execute_command
115+
self._original_pipeline_immediate_execute = original_immediate
116+
117+
def patched_pipeline_immediate_execute(pipeline_self, *args, **kwargs):
118+
"""Patched Pipeline.immediate_execute_command method."""
119+
sdk = TuskDrift.get_instance()
120+
121+
if sdk.mode == TuskDriftMode.DISABLED:
122+
return original_immediate(pipeline_self, *args, **kwargs)
123+
124+
return instrumentation._traced_pipeline_immediate_execute(
125+
pipeline_self,
126+
original_immediate,
127+
sdk,
128+
args,
129+
kwargs,
130+
)
131+
132+
Pipeline.immediate_execute_command = patched_pipeline_immediate_execute
133+
logger.debug("redis.client.Pipeline.immediate_execute_command instrumented")
111134
except ImportError:
112135
logger.debug("redis.client.Pipeline not available")
113136

@@ -162,6 +185,28 @@ async def patched_async_pipeline_execute(pipeline_self, *args, **kwargs):
162185

163186
AsyncPipeline.execute = patched_async_pipeline_execute
164187
logger.debug("redis.asyncio.client.Pipeline.execute instrumented")
188+
189+
# Patch async Pipeline.immediate_execute_command for WATCH and other immediate commands
190+
if hasattr(AsyncPipeline, "immediate_execute_command"):
191+
original_async_immediate = AsyncPipeline.immediate_execute_command
192+
193+
async def patched_async_pipeline_immediate_execute(pipeline_self, *args, **kwargs):
194+
"""Patched async Pipeline.immediate_execute_command method."""
195+
sdk = TuskDrift.get_instance()
196+
197+
if sdk.mode == TuskDriftMode.DISABLED:
198+
return await original_async_immediate(pipeline_self, *args, **kwargs)
199+
200+
return await instrumentation._traced_async_pipeline_immediate_execute(
201+
pipeline_self,
202+
original_async_immediate,
203+
sdk,
204+
args,
205+
kwargs,
206+
)
207+
208+
AsyncPipeline.immediate_execute_command = patched_async_pipeline_immediate_execute
209+
logger.debug("redis.asyncio.client.Pipeline.immediate_execute_command instrumented")
165210
except ImportError:
166211
logger.debug("redis.asyncio.client.Pipeline not available")
167212
except ImportError:
@@ -196,6 +241,58 @@ def original_call():
196241
span_kind=OTelSpanKind.CLIENT,
197242
)
198243

244+
def _traced_pipeline_immediate_execute(
245+
self, pipeline: Any, original_execute: Any, sdk: TuskDrift, args: tuple, kwargs: dict
246+
) -> Any:
247+
"""Traced Pipeline.immediate_execute_command method for WATCH and other immediate commands."""
248+
if sdk.mode == TuskDriftMode.DISABLED:
249+
return original_execute(pipeline, *args, **kwargs)
250+
251+
command_name = args[0] if args else "UNKNOWN"
252+
command_str = self._format_command(args)
253+
254+
def original_call():
255+
return original_execute(pipeline, *args, **kwargs)
256+
257+
if sdk.mode == TuskDriftMode.REPLAY:
258+
return handle_replay_mode(
259+
replay_mode_handler=lambda: self._replay_execute_command(sdk, command_name, command_str, args),
260+
no_op_request_handler=lambda: self._get_default_response(command_name),
261+
is_server_request=False,
262+
)
263+
264+
# RECORD mode
265+
return handle_record_mode(
266+
original_function_call=original_call,
267+
record_mode_handler=lambda is_pre_app_start: self._record_execute_command(
268+
pipeline, original_execute, sdk, args, kwargs, command_name, command_str, is_pre_app_start
269+
),
270+
span_kind=OTelSpanKind.CLIENT,
271+
)
272+
273+
async def _traced_async_pipeline_immediate_execute(
274+
self, pipeline: Any, original_execute: Any, sdk: TuskDrift, args: tuple, kwargs: dict
275+
) -> Any:
276+
"""Traced async Pipeline.immediate_execute_command method for WATCH and other immediate commands."""
277+
if sdk.mode == TuskDriftMode.DISABLED:
278+
return await original_execute(pipeline, *args, **kwargs)
279+
280+
command_name = args[0] if args else "UNKNOWN"
281+
command_str = self._format_command(args)
282+
283+
# For REPLAY mode, use sync mocking (mocks are retrieved synchronously)
284+
if sdk.mode == TuskDriftMode.REPLAY:
285+
return handle_replay_mode(
286+
replay_mode_handler=lambda: self._replay_execute_command(sdk, command_name, command_str, args),
287+
no_op_request_handler=lambda: self._get_default_response(command_name),
288+
is_server_request=False,
289+
)
290+
291+
# RECORD mode with async execution
292+
return await self._record_async_execute_command(
293+
pipeline, original_execute, sdk, args, kwargs, command_name, command_str
294+
)
295+
199296
def _replay_execute_command(self, sdk: TuskDrift, command_name: str, command_str: str, args: tuple) -> Any:
200297
"""Handle REPLAY mode for execute_command."""
201298
span_name = f"redis.{command_name}"

0 commit comments

Comments
 (0)