Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions drift/instrumentation/redis/e2e-tests/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,46 @@ services:
timeout: 5s
retries: 5

redis-node-1:
image: redis:7-alpine
command: redis-server --cluster-enabled yes --cluster-config-file nodes.conf --cluster-node-timeout 5000 --cluster-announce-hostname redis-node-1 --appendonly yes
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 5s
timeout: 5s
retries: 5

redis-node-2:
image: redis:7-alpine
command: redis-server --cluster-enabled yes --cluster-config-file nodes.conf --cluster-node-timeout 5000 --cluster-announce-hostname redis-node-2 --appendonly yes
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 5s
timeout: 5s
retries: 5

redis-node-3:
image: redis:7-alpine
command: redis-server --cluster-enabled yes --cluster-config-file nodes.conf --cluster-node-timeout 5000 --cluster-announce-hostname redis-node-3 --appendonly yes
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 5s
timeout: 5s
retries: 5

redis-cluster-init:
image: redis:7-alpine
depends_on:
redis-node-1:
condition: service_healthy
redis-node-2:
condition: service_healthy
redis-node-3:
condition: service_healthy
command: >
sh -c "redis-cli --cluster create redis-node-1:6379 redis-node-2:6379 redis-node-3:6379 --cluster-replicas 0 --cluster-yes"
restart: "no"

app:
build:
context: ../../../..
Expand All @@ -21,10 +61,14 @@ services:
depends_on:
redis:
condition: service_healthy
redis-cluster-init:
condition: service_completed_successfully
environment:
- PORT=8000
- REDIS_HOST=redis
- REDIS_PORT=6379
- REDIS_CLUSTER_HOST=redis-node-1
- REDIS_CLUSTER_PORT=6379
- TUSK_ANALYTICS_DISABLED=1
- PYTHONUNBUFFERED=1
- TUSK_USE_RUST_CORE=${TUSK_USE_RUST_CORE:-0}
Expand Down
96 changes: 95 additions & 1 deletion drift/instrumentation/redis/e2e-tests/src/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,27 @@

app = Flask(__name__)

# Initialize Redis client (standalone)
redis_client = redis.Redis(
host=os.getenv("REDIS_HOST", "redis"), port=int(os.getenv("REDIS_PORT", "6379")), db=0, decode_responses=True
)

# Lazy-initialized RedisCluster client.
# Initialized on first use so that cluster connection failures don't prevent
# the rest of the app from starting (avoids breaking non-cluster tests).
_cluster_client = None


def get_cluster_client():
    """Return the shared RedisCluster client, creating it on first call.

    The client is built lazily so a cluster connection failure cannot
    prevent the rest of the app from starting (keeps the non-cluster
    tests working even when the cluster is down).
    """
    global _cluster_client
    if _cluster_client is None:
        cluster_host = os.getenv("REDIS_CLUSTER_HOST", "redis-node-1")
        cluster_port = int(os.getenv("REDIS_CLUSTER_PORT", "6379"))
        _cluster_client = redis.RedisCluster(
            host=cluster_host,
            port=cluster_port,
            decode_responses=True,
        )
    return _cluster_client


@app.route("/health")
def health():
Expand Down Expand Up @@ -245,6 +261,84 @@ def test_transaction_watch():
return jsonify({"error": str(e)}), 500


@app.route("/test/cluster-set-get", methods=["GET"])
def test_cluster_set_get():
    """Exercise SET/GET through RedisCluster.

    RedisCluster.execute_command is a separate method from
    Redis.execute_command, so this hits the cluster-specific code path.
    """
    try:
        cluster = get_cluster_client()
        keys = ("test:cluster:key1", "test:cluster:key2")
        cluster.set(keys[0], "cluster_value1")
        cluster.set(keys[1], "cluster_value2")
        first = cluster.get(keys[0])
        second = cluster.get(keys[1])
        cluster.delete(*keys)
        return jsonify({"success": True, "val1": first, "val2": second})
    except Exception as e:
        return jsonify({"error": str(e)}), 500


@app.route("/test/cluster-incr", methods=["GET"])
def test_cluster_incr():
    """Exercise INCR through RedisCluster.

    A simple write operation going through the cluster's
    execute_command path.
    """
    try:
        cluster = get_cluster_client()
        counter_key = "test:cluster:counter"
        cluster.set(counter_key, "0")
        for _ in range(3):
            cluster.incr(counter_key)
        raw = cluster.get(counter_key)
        cluster.delete(counter_key)
        return jsonify({"success": True, "value": int(raw)})
    except Exception as e:
        return jsonify({"error": str(e)}), 500


@app.route("/test/cluster-pipeline", methods=["GET"])
def test_cluster_pipeline():
    """Exercise a RedisCluster pipeline (ClusterPipeline).

    ClusterPipeline.execute_command is its own code path. Every key
    carries the {cp} hash tag so they all map to one slot, which cluster
    pipelines require.
    """
    try:
        cluster = get_cluster_client()
        pipe = cluster.pipeline()
        for key, value in (("{cp}:key1", "pipe_val1"), ("{cp}:key2", "pipe_val2")):
            pipe.set(key, value)
        pipe.get("{cp}:key1")
        pipe.get("{cp}:key2")
        results = pipe.execute()
        cluster.delete("{cp}:key1", "{cp}:key2")
        return jsonify({"success": True, "results": results})
    except Exception as e:
        return jsonify({"error": str(e)}), 500

@app.route("/test/cluster-pipeline-transaction", methods=["GET"])
def test_cluster_pipeline_transaction():
    """Exercise ClusterPipeline in transaction mode.

    Uses TransactionStrategy internally; the {tx} hash tag keeps every
    key on the same slot, as transactions require.
    """
    try:
        cluster = get_cluster_client()
        pipe = cluster.pipeline(transaction=True)
        for key, value in (("{tx}:key1", "txval1"), ("{tx}:key2", "txval2")):
            pipe.set(key, value)
        pipe.get("{tx}:key1")
        pipe.get("{tx}:key2")
        results = pipe.execute()
        cluster.delete("{tx}:key1", "{tx}:key2")
        return jsonify({"success": True, "results": results})
    except Exception as e:
        return jsonify({"error": str(e)}), 500


if __name__ == "__main__":
    # NOTE(review): mark_app_as_ready() appears to flip the Tusk SDK out of its
    # pre-app-start state before serving -- confirm against the sdk docs.
    sdk.mark_app_as_ready()
    # Port 8000 matches the PORT env var set in the e2e docker-compose file.
    app.run(host="0.0.0.0", port=8000, debug=False)
8 changes: 8 additions & 0 deletions drift/instrumentation/redis/e2e-tests/src/test_requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,12 @@

make_request("GET", "/test/transaction-watch")

# RedisCluster operations
make_request("GET", "/test/cluster-set-get")
make_request("GET", "/test/cluster-incr")
make_request("GET", "/test/cluster-pipeline")

# RedisCluster pipeline in transaction mode
make_request("GET", "/test/cluster-pipeline-transaction")

print_request_summary()
180 changes: 178 additions & 2 deletions drift/instrumentation/redis/instrumentation.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,110 @@ async def patched_async_pipeline_immediate_execute(pipeline_self, *args, **kwarg
except ImportError:
logger.debug("redis.asyncio not available")

# Patch RedisCluster.execute_command (separate class, NOT a subclass of Redis)
try:
from redis.cluster import ClusterPipeline as SyncClusterPipeline
from redis.cluster import RedisCluster

if hasattr(RedisCluster, "execute_command"):
original_cluster_execute = RedisCluster.execute_command
instrumentation = self

def patched_cluster_execute_command(cluster_self, *args, **kwargs):
"""Patched RedisCluster.execute_command method."""
sdk = TuskDrift.get_instance()

if sdk.mode == TuskDriftMode.DISABLED:
return original_cluster_execute(cluster_self, *args, **kwargs)

return instrumentation._traced_execute_command(
cluster_self,
original_cluster_execute,
sdk,
args,
kwargs,
)

RedisCluster.execute_command = patched_cluster_execute_command
logger.debug("redis.cluster.RedisCluster.execute_command instrumented")

# Patch ClusterPipeline.execute
if hasattr(SyncClusterPipeline, "execute"):
original_cluster_pipeline_execute = SyncClusterPipeline.execute
instrumentation = self

def patched_cluster_pipeline_execute(pipeline_self, *args, **kwargs):
"""Patched ClusterPipeline.execute method."""
sdk = TuskDrift.get_instance()

if sdk.mode == TuskDriftMode.DISABLED:
return original_cluster_pipeline_execute(pipeline_self, *args, **kwargs)

return instrumentation._traced_cluster_pipeline_execute(
pipeline_self,
original_cluster_pipeline_execute,
sdk,
args,
kwargs,
)

SyncClusterPipeline.execute = patched_cluster_pipeline_execute
logger.debug("redis.cluster.ClusterPipeline.execute instrumented")
except ImportError:
logger.debug("redis.cluster not available")

# Patch async RedisCluster.execute_command
try:
from redis.asyncio.cluster import ClusterPipeline as AsyncClusterPipeline
from redis.asyncio.cluster import RedisCluster as AsyncRedisCluster

if hasattr(AsyncRedisCluster, "execute_command"):
original_async_cluster_execute = AsyncRedisCluster.execute_command
instrumentation = self

async def patched_async_cluster_execute_command(cluster_self, *args, **kwargs):
"""Patched async RedisCluster.execute_command method."""
sdk = TuskDrift.get_instance()

if sdk.mode == TuskDriftMode.DISABLED:
return await original_async_cluster_execute(cluster_self, *args, **kwargs)

return await instrumentation._traced_async_execute_command(
cluster_self,
original_async_cluster_execute,
sdk,
args,
kwargs,
)

AsyncRedisCluster.execute_command = patched_async_cluster_execute_command
logger.debug("redis.asyncio.cluster.RedisCluster.execute_command instrumented")

# Patch async ClusterPipeline.execute
if hasattr(AsyncClusterPipeline, "execute"):
original_async_cluster_pipeline_execute = AsyncClusterPipeline.execute
instrumentation = self

async def patched_async_cluster_pipeline_execute(pipeline_self, *args, **kwargs):
"""Patched async ClusterPipeline.execute method."""
sdk = TuskDrift.get_instance()

if sdk.mode == TuskDriftMode.DISABLED:
return await original_async_cluster_pipeline_execute(pipeline_self, *args, **kwargs)

return await instrumentation._traced_async_cluster_pipeline_execute(
pipeline_self,
original_async_cluster_pipeline_execute,
sdk,
args,
kwargs,
)

AsyncClusterPipeline.execute = patched_async_cluster_pipeline_execute
logger.debug("redis.asyncio.cluster.ClusterPipeline.execute instrumented")
except ImportError:
logger.debug("redis.asyncio.cluster not available")

def _traced_execute_command(
self, redis_client: Any, original_execute: Any, sdk: TuskDrift, args: tuple, kwargs: dict
) -> Any:
Expand Down Expand Up @@ -519,6 +623,59 @@ async def _traced_async_pipeline_execute(
pipeline, original_execute, sdk, args, kwargs, command_str, command_stack
)

def _traced_cluster_pipeline_execute(
    self, pipeline: Any, original_execute: Any, sdk: TuskDrift, args: tuple, kwargs: dict
) -> Any:
    """Traced ClusterPipeline.execute method.

    The queued commands are snapshotted before calling original_execute
    because ClusterPipeline.execute resets the queue in its finally block.
    """
    if sdk.mode == TuskDriftMode.DISABLED:
        return original_execute(pipeline, *args, **kwargs)

    # Snapshot first -- the queue is gone after execute() runs.
    queued_commands = list(self._get_pipeline_commands(pipeline))
    rendered_commands = self._format_pipeline_commands(queued_commands)

    if sdk.mode == TuskDriftMode.REPLAY:
        return handle_replay_mode(
            replay_mode_handler=lambda: self._replay_pipeline_execute(sdk, rendered_commands, queued_commands),
            no_op_request_handler=lambda: [],
            is_server_request=False,
        )

    def run_original():
        return original_execute(pipeline, *args, **kwargs)

    return handle_record_mode(
        original_function_call=run_original,
        record_mode_handler=lambda is_pre_app_start: self._record_pipeline_execute(
            pipeline, original_execute, sdk, args, kwargs, rendered_commands, queued_commands, is_pre_app_start
        ),
        span_kind=OTelSpanKind.CLIENT,
    )

async def _traced_async_cluster_pipeline_execute(
    self, pipeline: Any, original_execute: Any, sdk: TuskDrift, args: tuple, kwargs: dict
) -> Any:
    """Traced async ClusterPipeline.execute method.

    Snapshots the queued commands up front, mirroring the sync variant,
    since execute() clears the queue once it runs.
    """
    if sdk.mode == TuskDriftMode.DISABLED:
        return await original_execute(pipeline, *args, **kwargs)

    queued_commands = list(self._get_pipeline_commands(pipeline))
    rendered_commands = self._format_pipeline_commands(queued_commands)

    if sdk.mode == TuskDriftMode.REPLAY:
        return handle_replay_mode(
            replay_mode_handler=lambda: self._replay_pipeline_execute(sdk, rendered_commands, queued_commands),
            no_op_request_handler=lambda: [],
            is_server_request=False,
        )

    # Any remaining mode: run the real pipeline and record the exchange.
    return await self._record_async_pipeline_execute(
        pipeline, original_execute, sdk, args, kwargs, rendered_commands, queued_commands
    )

async def _record_async_pipeline_execute(
self,
pipeline: Any,
Expand Down Expand Up @@ -693,11 +850,28 @@ def _format_command(self, args: tuple) -> str:
return " ".join(parts)

def _get_pipeline_commands(self, pipeline: Any) -> list:
    """Extract the queued commands from a pipeline object.

    ClusterPipeline has an always-empty ``command_stack`` attribute while
    the real commands live in ``_execution_strategy._command_queue``, so
    ``_execution_strategy`` is checked first to avoid returning the empty
    list.
    """
    try:
        # Sync ClusterPipeline stores commands on its execution strategy.
        # Must be checked before command_stack because ClusterPipeline also
        # has a command_stack attr that is always empty (redis-py #3703).
        if hasattr(pipeline, "_execution_strategy"):
            strategy = pipeline._execution_strategy
            if hasattr(strategy, "_command_queue"):
                return strategy._command_queue
            if hasattr(strategy, "command_queue"):
                return strategy.command_queue
        # Async ClusterPipeline stores commands in _command_queue directly.
        if hasattr(pipeline, "_command_queue"):
            return pipeline._command_queue
        # Plain (non-cluster) pipelines.
        if hasattr(pipeline, "command_stack"):
            return pipeline.command_stack
        if hasattr(pipeline, "_command_stack"):
            return pipeline._command_stack
    except AttributeError:
        pass
Expand Down Expand Up @@ -948,6 +1122,8 @@ def _deserialize_value(self, value: Any) -> Any:
return {k: self._deserialize_value(v) for k, v in value.items()}
elif isinstance(value, list):
return [self._deserialize_value(v) for v in value]
elif isinstance(value, float) and value.is_integer():
Comment thread
sohankshirsagar marked this conversation as resolved.
return int(value)
Comment thread
sohankshirsagar marked this conversation as resolved.
return value

def _deserialize_response(self, mock_data: dict[str, Any]) -> Any:
Expand Down
Loading