Skip to content

Commit e8057f0

Browse files
feat: instrument RedisCluster and ClusterPipeline for record/replay
1 parent 46a8936 commit e8057f0

4 files changed

Lines changed: 292 additions & 1 deletion

File tree

drift/instrumentation/redis/e2e-tests/docker-compose.yml

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,46 @@ services:
77
timeout: 5s
88
retries: 5
99

10+
redis-node-1:
11+
image: redis:7-alpine
12+
command: redis-server --cluster-enabled yes --cluster-config-file nodes.conf --cluster-node-timeout 5000 --cluster-announce-hostname redis-node-1 --appendonly yes
13+
healthcheck:
14+
test: ["CMD", "redis-cli", "ping"]
15+
interval: 5s
16+
timeout: 5s
17+
retries: 5
18+
19+
redis-node-2:
20+
image: redis:7-alpine
21+
command: redis-server --cluster-enabled yes --cluster-config-file nodes.conf --cluster-node-timeout 5000 --cluster-announce-hostname redis-node-2 --appendonly yes
22+
healthcheck:
23+
test: ["CMD", "redis-cli", "ping"]
24+
interval: 5s
25+
timeout: 5s
26+
retries: 5
27+
28+
redis-node-3:
29+
image: redis:7-alpine
30+
command: redis-server --cluster-enabled yes --cluster-config-file nodes.conf --cluster-node-timeout 5000 --cluster-announce-hostname redis-node-3 --appendonly yes
31+
healthcheck:
32+
test: ["CMD", "redis-cli", "ping"]
33+
interval: 5s
34+
timeout: 5s
35+
retries: 5
36+
37+
redis-cluster-init:
38+
image: redis:7-alpine
39+
depends_on:
40+
redis-node-1:
41+
condition: service_healthy
42+
redis-node-2:
43+
condition: service_healthy
44+
redis-node-3:
45+
condition: service_healthy
46+
command: >
47+
sh -c "redis-cli --cluster create redis-node-1:6379 redis-node-2:6379 redis-node-3:6379 --cluster-replicas 0 --cluster-yes"
48+
restart: "no"
49+
1050
app:
1151
build:
1252
context: ../../../..
@@ -21,10 +61,14 @@ services:
2161
depends_on:
2262
redis:
2363
condition: service_healthy
64+
redis-cluster-init:
65+
condition: service_completed_successfully
2466
environment:
2567
- PORT=8000
2668
- REDIS_HOST=redis
2769
- REDIS_PORT=6379
70+
- REDIS_CLUSTER_HOST=redis-node-1
71+
- REDIS_CLUSTER_PORT=6379
2872
- TUSK_ANALYTICS_DISABLED=1
2973
- PYTHONUNBUFFERED=1
3074
- TUSK_USE_RUST_CORE=${TUSK_USE_RUST_CORE:-0}

drift/instrumentation/redis/e2e-tests/src/app.py

Lines changed: 76 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,27 @@
1515

1616
app = Flask(__name__)
1717

18-
# Initialize Redis client
18+
# Initialize Redis client (standalone)
1919
redis_client = redis.Redis(
2020
host=os.getenv("REDIS_HOST", "redis"), port=int(os.getenv("REDIS_PORT", "6379")), db=0, decode_responses=True
2121
)
2222

23+
# Lazy-initialized RedisCluster client.
24+
# Initialized on first use so that cluster connection failures don't prevent
25+
# the rest of the app from starting (avoids breaking non-cluster tests).
26+
_cluster_client = None
27+
28+
29+
def get_cluster_client():
    """Return the shared RedisCluster client, building it on the first call.

    The client is cached in the module-level ``_cluster_client`` so later
    calls reuse the same instance. Host and port are read from the
    ``REDIS_CLUSTER_HOST`` / ``REDIS_CLUSTER_PORT`` environment variables,
    falling back to ``redis-node-1`` and ``6379``.
    """
    global _cluster_client

    # Fast path: a client has already been created by an earlier call.
    if _cluster_client is not None:
        return _cluster_client

    cluster_host = os.getenv("REDIS_CLUSTER_HOST", "redis-node-1")
    cluster_port = int(os.getenv("REDIS_CLUSTER_PORT", "6379"))
    _cluster_client = redis.RedisCluster(
        host=cluster_host,
        port=cluster_port,
        decode_responses=True,
    )
    return _cluster_client
38+
2339

2440
@app.route("/health")
2541
def health():
@@ -245,6 +261,65 @@ def test_transaction_watch():
245261
return jsonify({"error": str(e)}), 500
246262

247263

264+
@app.route("/test/cluster-set-get", methods=["GET"])
def test_cluster_set_get():
    """Exercise SET, GET and DELETE through the RedisCluster client.

    RedisCluster.execute_command is a separate method from
    Redis.execute_command, so this route covers the cluster code path.

    Returns a JSON body with both values read back, or a 500 response
    carrying the error message if any step raises.
    """
    first_key = "test:cluster:key1"
    second_key = "test:cluster:key2"
    try:
        client = get_cluster_client()
        # Write both keys before reading either one back.
        client.set(first_key, "cluster_value1")
        client.set(second_key, "cluster_value2")
        first_value = client.get(first_key)
        second_value = client.get(second_key)
        # Clean up so repeated runs start from the same state.
        client.delete(first_key, second_key)
        return jsonify({"success": True, "val1": first_value, "val2": second_value})
    except Exception as e:
        return jsonify({"error": str(e)}), 500
280+
281+
282+
@app.route("/test/cluster-incr", methods=["GET"])
def test_cluster_incr():
    """Exercise INCR through the RedisCluster client.

    Sets a counter to "0", increments it three times, reads it back and
    deletes it. Returns the final value as an integer in a JSON body, or a
    500 response carrying the error message if any step raises.
    """
    counter_key = "test:cluster:counter"
    try:
        client = get_cluster_client()
        client.set(counter_key, "0")
        # Three separate INCR commands, issued one after another.
        for _ in range(3):
            client.incr(counter_key)
        final_value = client.get(counter_key)
        client.delete(counter_key)
        return jsonify({"success": True, "value": int(final_value)})
    except Exception as e:
        return jsonify({"error": str(e)}), 500
299+
300+
301+
@app.route("/test/cluster-pipeline", methods=["GET"])
def test_cluster_pipeline():
    """Exercise a pipeline created from the RedisCluster client.

    ClusterPipeline has its own execution path, distinct from the
    standalone pipeline. Every key carries the {cp} hash tag so that all
    of them map to the same slot.

    Returns the pipeline results in a JSON body, or a 500 response
    carrying the error message if any step raises.
    """
    first_key = "{cp}:key1"
    second_key = "{cp}:key2"
    try:
        client = get_cluster_client()
        batch = client.pipeline()
        # Queue two writes followed by two reads; nothing runs until execute().
        batch.set(first_key, "pipe_val1")
        batch.set(second_key, "pipe_val2")
        batch.get(first_key)
        batch.get(second_key)
        outcome = batch.execute()
        # Cleanup goes through the cluster client, not the pipeline.
        client.delete(first_key, second_key)
        return jsonify({"success": True, "results": outcome})
    except Exception as e:
        return jsonify({"error": str(e)}), 500
321+
322+
248323
if __name__ == "__main__":
249324
sdk.mark_app_as_ready()
250325
app.run(host="0.0.0.0", port=8000, debug=False)

drift/instrumentation/redis/e2e-tests/src/test_requests.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,4 +44,9 @@
4444

4545
make_request("GET", "/test/transaction-watch")
4646

47+
# RedisCluster operations
48+
make_request("GET", "/test/cluster-set-get")
49+
make_request("GET", "/test/cluster-incr")
50+
make_request("GET", "/test/cluster-pipeline")
51+
4752
print_request_summary()

drift/instrumentation/redis/instrumentation.py

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,110 @@ async def patched_async_pipeline_immediate_execute(pipeline_self, *args, **kwarg
212212
except ImportError:
213213
logger.debug("redis.asyncio not available")
214214

215+
# Patch RedisCluster.execute_command (separate class, NOT a subclass of Redis)
216+
try:
217+
from redis.cluster import ClusterPipeline as SyncClusterPipeline
218+
from redis.cluster import RedisCluster
219+
220+
if hasattr(RedisCluster, "execute_command"):
221+
original_cluster_execute = RedisCluster.execute_command
222+
instrumentation = self
223+
224+
def patched_cluster_execute_command(cluster_self, *args, **kwargs):
225+
"""Patched RedisCluster.execute_command method."""
226+
sdk = TuskDrift.get_instance()
227+
228+
if sdk.mode == TuskDriftMode.DISABLED:
229+
return original_cluster_execute(cluster_self, *args, **kwargs)
230+
231+
return instrumentation._traced_execute_command(
232+
cluster_self,
233+
original_cluster_execute,
234+
sdk,
235+
args,
236+
kwargs,
237+
)
238+
239+
RedisCluster.execute_command = patched_cluster_execute_command
240+
logger.debug("redis.cluster.RedisCluster.execute_command instrumented")
241+
242+
# Patch ClusterPipeline.execute
243+
if hasattr(SyncClusterPipeline, "execute"):
244+
original_cluster_pipeline_execute = SyncClusterPipeline.execute
245+
instrumentation = self
246+
247+
def patched_cluster_pipeline_execute(pipeline_self, *args, **kwargs):
248+
"""Patched ClusterPipeline.execute method."""
249+
sdk = TuskDrift.get_instance()
250+
251+
if sdk.mode == TuskDriftMode.DISABLED:
252+
return original_cluster_pipeline_execute(pipeline_self, *args, **kwargs)
253+
254+
return instrumentation._traced_cluster_pipeline_execute(
255+
pipeline_self,
256+
original_cluster_pipeline_execute,
257+
sdk,
258+
args,
259+
kwargs,
260+
)
261+
262+
SyncClusterPipeline.execute = patched_cluster_pipeline_execute
263+
logger.debug("redis.cluster.ClusterPipeline.execute instrumented")
264+
except ImportError:
265+
logger.debug("redis.cluster not available")
266+
267+
# Patch async RedisCluster.execute_command
268+
try:
269+
from redis.asyncio.cluster import ClusterPipeline as AsyncClusterPipeline
270+
from redis.asyncio.cluster import RedisCluster as AsyncRedisCluster
271+
272+
if hasattr(AsyncRedisCluster, "execute_command"):
273+
original_async_cluster_execute = AsyncRedisCluster.execute_command
274+
instrumentation = self
275+
276+
async def patched_async_cluster_execute_command(cluster_self, *args, **kwargs):
277+
"""Patched async RedisCluster.execute_command method."""
278+
sdk = TuskDrift.get_instance()
279+
280+
if sdk.mode == TuskDriftMode.DISABLED:
281+
return await original_async_cluster_execute(cluster_self, *args, **kwargs)
282+
283+
return await instrumentation._traced_async_execute_command(
284+
cluster_self,
285+
original_async_cluster_execute,
286+
sdk,
287+
args,
288+
kwargs,
289+
)
290+
291+
AsyncRedisCluster.execute_command = patched_async_cluster_execute_command
292+
logger.debug("redis.asyncio.cluster.RedisCluster.execute_command instrumented")
293+
294+
# Patch async ClusterPipeline.execute
295+
if hasattr(AsyncClusterPipeline, "execute"):
296+
original_async_cluster_pipeline_execute = AsyncClusterPipeline.execute
297+
instrumentation = self
298+
299+
async def patched_async_cluster_pipeline_execute(pipeline_self, *args, **kwargs):
300+
"""Patched async ClusterPipeline.execute method."""
301+
sdk = TuskDrift.get_instance()
302+
303+
if sdk.mode == TuskDriftMode.DISABLED:
304+
return await original_async_cluster_pipeline_execute(pipeline_self, *args, **kwargs)
305+
306+
return await instrumentation._traced_async_cluster_pipeline_execute(
307+
pipeline_self,
308+
original_async_cluster_pipeline_execute,
309+
sdk,
310+
args,
311+
kwargs,
312+
)
313+
314+
AsyncClusterPipeline.execute = patched_async_cluster_pipeline_execute
315+
logger.debug("redis.asyncio.cluster.ClusterPipeline.execute instrumented")
316+
except ImportError:
317+
logger.debug("redis.asyncio.cluster not available")
318+
215319
def _traced_execute_command(
216320
self, redis_client: Any, original_execute: Any, sdk: TuskDrift, args: tuple, kwargs: dict
217321
) -> Any:
@@ -519,6 +623,59 @@ async def _traced_async_pipeline_execute(
519623
pipeline, original_execute, sdk, args, kwargs, command_str, command_stack
520624
)
521625

626+
def _traced_cluster_pipeline_execute(
    self, pipeline: Any, original_execute: Any, sdk: TuskDrift, args: tuple, kwargs: dict
) -> Any:
    """Run ClusterPipeline.execute under record/replay tracing.

    The queued commands are copied before anything else happens, because
    ClusterPipeline.execute resets the queue in its finally block and the
    commands would no longer be available afterwards.

    Args:
        pipeline: The ClusterPipeline instance being executed.
        original_execute: The unpatched ClusterPipeline.execute function.
        sdk: The TuskDrift instance whose mode selects the behavior.
        args: Positional arguments forwarded to the original execute.
        kwargs: Keyword arguments forwarded to the original execute.

    Returns:
        Whatever the original execute, the replay handler or the record
        handler returns for the current mode.
    """
    if sdk.mode == TuskDriftMode.DISABLED:
        return original_execute(pipeline, *args, **kwargs)

    # Snapshot first: the copy must not alias the pipeline's own queue.
    queued_commands = list(self._get_pipeline_commands(pipeline))
    formatted_commands = self._format_pipeline_commands(queued_commands)

    if sdk.mode == TuskDriftMode.REPLAY:

        def replay_from_recording():
            return self._replay_pipeline_execute(sdk, formatted_commands, queued_commands)

        def empty_result():
            return []

        return handle_replay_mode(
            replay_mode_handler=replay_from_recording,
            no_op_request_handler=empty_result,
            is_server_request=False,
        )

    def run_untraced():
        return original_execute(pipeline, *args, **kwargs)

    def record(is_pre_app_start):
        return self._record_pipeline_execute(
            pipeline,
            original_execute,
            sdk,
            args,
            kwargs,
            formatted_commands,
            queued_commands,
            is_pre_app_start,
        )

    return handle_record_mode(
        original_function_call=run_untraced,
        record_mode_handler=record,
        span_kind=OTelSpanKind.CLIENT,
    )
657+
658+
async def _traced_async_cluster_pipeline_execute(
    self, pipeline: Any, original_execute: Any, sdk: TuskDrift, args: tuple, kwargs: dict
) -> Any:
    """Run the async ClusterPipeline.execute under record/replay tracing.

    The queued commands are copied before the mode is dispatched, so the
    replay and record paths both work from the same snapshot.

    Args:
        pipeline: The async ClusterPipeline instance being executed.
        original_execute: The unpatched async ClusterPipeline.execute.
        sdk: The TuskDrift instance whose mode selects the behavior.
        args: Positional arguments forwarded to the original execute.
        kwargs: Keyword arguments forwarded to the original execute.

    Returns:
        The awaited original result when disabled, the replay handler's
        return value in replay mode, or the awaited record result otherwise.
    """
    if sdk.mode == TuskDriftMode.DISABLED:
        return await original_execute(pipeline, *args, **kwargs)

    queued_commands = list(self._get_pipeline_commands(pipeline))
    formatted_commands = self._format_pipeline_commands(queued_commands)

    if sdk.mode == TuskDriftMode.REPLAY:

        def replay_from_recording():
            return self._replay_pipeline_execute(sdk, formatted_commands, queued_commands)

        def empty_result():
            return []

        # The replay handler's result is returned as-is, without awaiting.
        return handle_replay_mode(
            replay_mode_handler=replay_from_recording,
            no_op_request_handler=empty_result,
            is_server_request=False,
        )

    recorded = await self._record_async_pipeline_execute(
        pipeline,
        original_execute,
        sdk,
        args,
        kwargs,
        formatted_commands,
        queued_commands,
    )
    return recorded
678+
522679
async def _record_async_pipeline_execute(
523680
self,
524681
pipeline: Any,
@@ -699,6 +856,16 @@ def _get_pipeline_commands(self, pipeline: Any) -> list:
699856
return pipeline.command_stack
700857
elif hasattr(pipeline, "_command_stack"):
701858
return pipeline._command_stack
859+
# ClusterPipeline stores commands in _execution_strategy._command_queue
860+
elif hasattr(pipeline, "_execution_strategy"):
861+
strategy = pipeline._execution_strategy
862+
if hasattr(strategy, "_command_queue"):
863+
return strategy._command_queue
864+
elif hasattr(strategy, "command_queue"):
865+
return strategy.command_queue
866+
# Async ClusterPipeline stores commands in _command_queue directly
867+
elif hasattr(pipeline, "_command_queue"):
868+
return pipeline._command_queue
702869
except AttributeError:
703870
pass
704871
return []

0 commit comments

Comments
 (0)