feat: instrument RedisCluster and ClusterPipeline for record/replay #91
Generated 30 tests - 30 passed
Go to Tusk to view more details or incorporate tests ↗
test_redis_instrumentation.py (RedisInstrumentation._deserialize_value, RedisInstrumentation._serialize_value, RedisInstrumentation._get_pipeline_commands, RedisInstrumentation._traced_cluster_pipeline_execute, RedisInstrumentation._traced_async_cluster_pipeline_execute) - 30 generated, 30 ✓
Full test file
File path: tests/unit/test_redis_instrumentation.py
"""Unit tests for RedisInstrumentation helper methods."""
from __future__ import annotations
import asyncio
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from drift.core.types import TuskDriftMode
from drift.instrumentation.redis.instrumentation import RedisInstrumentation
class TestRedisInstrumentationHelpers:
"""Tests for RedisInstrumentation helper methods."""
@pytest.fixture
def instrumentation(self):
"""Create RedisInstrumentation instance without patching Redis."""
return RedisInstrumentation(enabled=False)
class TestDeserializeValue(TestRedisInstrumentationHelpers):
"""Tests for _deserialize_value."""
def test_string_passthrough(self, instrumentation):
"""Strings should be returned unchanged."""
result = instrumentation._deserialize_value("hello")
assert result == "hello"
def test_none_passthrough(self, instrumentation):
"""None should be returned unchanged."""
result = instrumentation._deserialize_value(None)
assert result is None
def test_integer_float_converted_to_int(self, instrumentation):
"""Float values with no fractional part (e.g. 42.0) should be converted to int.
Redis INCR returns an integer; JSON round-trip via float would lose the type.
"""
result = instrumentation._deserialize_value(42.0)
assert result == 42
assert isinstance(result, int)
def test_non_integer_float_returned_as_float(self, instrumentation):
"""Non-integer floats should be returned unchanged."""
result = instrumentation._deserialize_value(3.14)
assert result == 3.14
assert isinstance(result, float)
def test_float_wrapper_dict_returns_float(self, instrumentation):
"""{'__float__': True, 'value': v} wrapper should deserialize to float(v)."""
result = instrumentation._deserialize_value({"__float__": True, "value": 1.5})
assert result == 1.5
assert isinstance(result, float)
def test_float_wrapper_with_integer_value_returns_float_not_int(self, instrumentation):
"""__float__ wrapper should always return float, even when value is a whole number."""
result = instrumentation._deserialize_value({"__float__": True, "value": 10.0})
assert isinstance(result, float)
assert result == 10.0
def test_bytes_utf8_wrapper_returns_bytes(self, instrumentation):
"""{'__bytes__': True, 'encoding': 'utf8', 'value': str} should return bytes."""
result = instrumentation._deserialize_value({"__bytes__": True, "encoding": "utf8", "value": "hello"})
assert result == b"hello"
assert isinstance(result, bytes)
def test_list_deserializes_elements(self, instrumentation):
"""Lists should have each element deserialized recursively."""
result = instrumentation._deserialize_value([42.0, "text", None])
# 42.0 is an integer float — should become int
assert result == [42, "text", None]
assert isinstance(result[0], int)
class TestSerializeValue(TestRedisInstrumentationHelpers):
"""Tests for _serialize_value."""
def test_float_serialized_to_wrapper(self, instrumentation):
"""Floats should be wrapped in {'__float__': True, 'value': ...}."""
result = instrumentation._serialize_value(3.14)
assert result == {"__float__": True, "value": 3.14}
def test_float_zero_serialized_to_wrapper(self, instrumentation):
"""Float zero should also be wrapped."""
result = instrumentation._serialize_value(0.0)
assert result == {"__float__": True, "value": 0.0}
def test_int_returned_as_is(self, instrumentation):
"""Integers should be returned as-is (no wrapping)."""
result = instrumentation._serialize_value(42)
assert result == 42
assert isinstance(result, int)
def test_bool_returned_as_is(self, instrumentation):
"""Bools should be returned as-is."""
assert instrumentation._serialize_value(True) is True
assert instrumentation._serialize_value(False) is False
def test_string_returned_as_is(self, instrumentation):
"""Strings should be returned unchanged."""
result = instrumentation._serialize_value("hello")
assert result == "hello"
def test_none_returned_as_is(self, instrumentation):
"""None should be returned unchanged."""
result = instrumentation._serialize_value(None)
assert result is None
def test_bytes_utf8_serialized_to_wrapper(self, instrumentation):
"""Bytes that decode as UTF-8 should be wrapped in __bytes__ dict."""
result = instrumentation._serialize_value(b"hello")
assert result == {"__bytes__": True, "encoding": "utf8", "value": "hello"}
def test_list_with_float_serialized_recursively(self, instrumentation):
"""Lists containing floats should have their elements serialized."""
result = instrumentation._serialize_value([1.5, "x", 99])
assert result == [{"__float__": True, "value": 1.5}, "x", 99]
def test_serialize_deserialize_float_roundtrip(self, instrumentation):
"""Serializing then deserializing a float should recover the original value."""
original = 2.71828
serialized = instrumentation._serialize_value(original)
deserialized = instrumentation._deserialize_value(serialized)
assert deserialized == original
assert isinstance(deserialized, float)
class TestGetPipelineCommands(TestRedisInstrumentationHelpers):
"""Tests for _get_pipeline_commands."""
def test_returns_command_stack_attribute(self, instrumentation):
"""Should return command_stack when pipeline has that attribute (standard Pipeline)."""
commands = ["cmd1", "cmd2"]
pipeline = SimpleNamespace(command_stack=commands)
result = instrumentation._get_pipeline_commands(pipeline)
assert result == commands
def test_returns_execution_strategy_private_command_queue(self, instrumentation):
"""Should return _execution_strategy._command_queue for sync ClusterPipeline."""
commands = ["SET key val", "GET key"]
strategy = SimpleNamespace(_command_queue=commands)
pipeline = SimpleNamespace(_execution_strategy=strategy)
result = instrumentation._get_pipeline_commands(pipeline)
assert result == commands
def test_returns_execution_strategy_public_command_queue_fallback(self, instrumentation):
"""Should return _execution_strategy.command_queue when _command_queue is absent."""
commands = ["SET", "GET"]
strategy = SimpleNamespace(command_queue=commands)
pipeline = SimpleNamespace(_execution_strategy=strategy)
result = instrumentation._get_pipeline_commands(pipeline)
assert result == commands
def test_returns_direct_command_queue_for_async_cluster(self, instrumentation):
"""Should return _command_queue directly for async ClusterPipeline."""
commands = ["HSET", "HGET"]
pipeline = SimpleNamespace(_command_queue=commands)
result = instrumentation._get_pipeline_commands(pipeline)
assert result == commands
def test_returns_empty_list_when_no_known_attributes(self, instrumentation):
"""Should return empty list when pipeline has none of the known command storage attributes."""
pipeline = SimpleNamespace()
result = instrumentation._get_pipeline_commands(pipeline)
assert result == []
def test_execution_strategy_takes_priority_over_command_stack(self, instrumentation):
"""_execution_strategy._command_queue must take priority over command_stack (redis-py #3703)."""
real_commands = ["SET k v"]
fake_empty = []
strategy = SimpleNamespace(_command_queue=real_commands)
# ClusterPipeline has both attrs but command_stack is always empty
pipeline = SimpleNamespace(_execution_strategy=strategy, command_stack=fake_empty)
result = instrumentation._get_pipeline_commands(pipeline)
assert result is real_commands
class TestTracedClusterPipelineExecute(TestRedisInstrumentationHelpers):
"""Tests for _traced_cluster_pipeline_execute."""
def _make_sdk(self, mode: TuskDriftMode) -> MagicMock:
sdk = MagicMock()
sdk.mode = mode
return sdk
def test_disabled_mode_calls_original_execute(self, instrumentation):
"""In DISABLED mode, _traced_cluster_pipeline_execute should call original_execute directly."""
pipeline = SimpleNamespace(command_stack=[])
original_execute = MagicMock(return_value=["result"])
sdk = self._make_sdk(TuskDriftMode.DISABLED)
result = instrumentation._traced_cluster_pipeline_execute(pipeline, original_execute, sdk, args=(), kwargs={})
original_execute.assert_called_once_with(pipeline)
assert result == ["result"]
def test_replay_mode_uses_handle_replay_mode(self, instrumentation):
"""In REPLAY mode, should route to handle_replay_mode."""
commands = ["SET k v", "GET k"]
pipeline = SimpleNamespace(command_stack=commands)
original_execute = MagicMock()
sdk = self._make_sdk(TuskDriftMode.REPLAY)
with patch(
"drift.instrumentation.redis.instrumentation.handle_replay_mode",
return_value=["mocked_result"],
) as mock_handle_replay:
result = instrumentation._traced_cluster_pipeline_execute(
pipeline, original_execute, sdk, args=(), kwargs={}
)
mock_handle_replay.assert_called_once()
assert result == ["mocked_result"]
original_execute.assert_not_called()
def test_record_mode_uses_handle_record_mode(self, instrumentation):
"""In RECORD mode, should route to handle_record_mode."""
commands = ["SET k v"]
pipeline = SimpleNamespace(command_stack=commands)
original_execute = MagicMock()
sdk = self._make_sdk(TuskDriftMode.RECORD)
with patch(
"drift.instrumentation.redis.instrumentation.handle_record_mode",
return_value=["recorded"],
) as mock_handle_record:
result = instrumentation._traced_cluster_pipeline_execute(
pipeline, original_execute, sdk, args=(), kwargs={}
)
mock_handle_record.assert_called_once()
assert result == ["recorded"]
def test_command_queue_snapshotted_before_execution(self, instrumentation):
"""Command queue must be snapshotted before original_execute runs.
ClusterPipeline.execute() clears the queue in its finally block, so
_traced_cluster_pipeline_execute must capture it beforehand.
"""
commands_before = ["SET k v", "INCR counter"]
# Simulate ClusterPipeline resetting queue during execute
pipeline = MagicMock()
pipeline._execution_strategy = SimpleNamespace(_command_queue=commands_before[:])
def side_effect_execute(p, *a, **kw):
# Simulate ClusterPipeline clearing queue in finally
pipeline._execution_strategy._command_queue.clear()
return ["ok", 1]
original_execute = MagicMock(side_effect=side_effect_execute)
sdk = self._make_sdk(TuskDriftMode.RECORD)
captured_command_stack = None
def fake_record_pipeline_execute(p, orig, s, a, kw, cmd_str, cmd_stack, is_pre):
nonlocal captured_command_stack
captured_command_stack = cmd_stack
return ["ok", 1]
with patch.object(instrumentation, "_record_pipeline_execute", side_effect=fake_record_pipeline_execute):
with patch(
"drift.instrumentation.redis.instrumentation.handle_record_mode",
side_effect=lambda original_function_call, record_mode_handler, span_kind: record_mode_handler(False),
):
instrumentation._traced_cluster_pipeline_execute(pipeline, original_execute, sdk, args=(), kwargs={})
# The snapshot should contain the original commands, not the cleared list
assert captured_command_stack == commands_before
class TestTracedAsyncClusterPipelineExecute(TestRedisInstrumentationHelpers):
"""Tests for _traced_async_cluster_pipeline_execute."""
def _make_sdk(self, mode: TuskDriftMode) -> MagicMock:
sdk = MagicMock()
sdk.mode = mode
return sdk
def test_disabled_mode_calls_original_execute(self, instrumentation):
"""In DISABLED mode, should await original_execute directly."""
pipeline = SimpleNamespace(command_stack=[])
original_execute = AsyncMock(return_value=["async_result"])
sdk = self._make_sdk(TuskDriftMode.DISABLED)
result = asyncio.get_event_loop().run_until_complete(
instrumentation._traced_async_cluster_pipeline_execute(pipeline, original_execute, sdk, args=(), kwargs={})
)
original_execute.assert_called_once_with(pipeline)
assert result == ["async_result"]
def test_replay_mode_uses_handle_replay_mode(self, instrumentation):
"""In REPLAY mode, should route to handle_replay_mode."""
commands = ["HSET h k v"]
pipeline = SimpleNamespace(command_stack=commands)
original_execute = AsyncMock()
sdk = self._make_sdk(TuskDriftMode.REPLAY)
with patch(
"drift.instrumentation.redis.instrumentation.handle_replay_mode",
return_value=["async_mocked"],
) as mock_handle_replay:
result = asyncio.get_event_loop().run_until_complete(
instrumentation._traced_async_cluster_pipeline_execute(
pipeline, original_execute, sdk, args=(), kwargs={}
)
)
mock_handle_replay.assert_called_once()
assert result == ["async_mocked"]
original_execute.assert_not_called()
def test_record_mode_delegates_to_record_async_pipeline_execute(self, instrumentation):
"""In RECORD mode, should delegate to _record_async_pipeline_execute."""
commands = ["SET k v"]
pipeline = SimpleNamespace(command_stack=commands)
original_execute = AsyncMock()
sdk = self._make_sdk(TuskDriftMode.RECORD)
with patch.object(
instrumentation,
"_record_async_pipeline_execute",
new_callable=AsyncMock,
return_value=["async_recorded"],
) as mock_record_async:
result = asyncio.get_event_loop().run_until_complete(
instrumentation._traced_async_cluster_pipeline_execute(
pipeline, original_execute, sdk, args=(), kwargs={}
)
)
mock_record_async.assert_called_once()
assert result == ["async_recorded"]1. [High value] RedisInstrumentation._deserialize_value should return strings unchanged ✓
Details
- Verifies that string inputs are not modified during deserialization.
- Prevents unintended data transformation and maintains data integrity.
def test_string_passthrough(self, instrumentation):
"""Strings should be returned unchanged."""
result = instrumentation._deserialize_value("hello")
assert result == "hello"Test execution raw output
Exit code: 0
Stdout:
.............................. [100%]
30 passed in 0.19s
2. [High value] RedisInstrumentation._deserialize_value should return None unchanged when input is None ✓
Details
- Ensures that
Nonevalues are not altered during deserialization. - Prevents unintended conversion of null Redis responses, preserving data integrity.
def test_none_passthrough(self, instrumentation):
"""None should be returned unchanged."""
result = instrumentation._deserialize_value(None)
assert result is NoneTest execution raw output
Exit code: 0
Stdout:
.............................. [100%]
30 passed in 0.19s
3. [High value] RedisInstrumentation._deserialize_value should convert integer floats to int ✓
Details
- Prevents type mismatches by ensuring values like 42.0 are returned as int, not float.
- Aligns deserialization behavior with Redis's native integer return for commands like INCR.
- Avoids issues caused by JSON round-trip converting integers to floats.
def test_integer_float_converted_to_int(self, instrumentation):
"""Float values with no fractional part (e.g. 42.0) should be converted to int.
Redis INCR returns an integer; JSON round-trip via float would lose the type.
"""
result = instrumentation._deserialize_value(42.0)
assert result == 42
assert isinstance(result, int)Test execution raw output
Exit code: 0
Stdout:
.............................. [100%]
30 passed in 0.19s
4. [High value] RedisInstrumentation._deserialize_value should return non-integer floats unchanged as float ✓
Details
- Ensures that non-integer float values are not altered during deserialization.
- Prevents unintended type conversion or rounding of float values.
- Maintains data integrity for float values stored in Redis.
def test_non_integer_float_returned_as_float(self, instrumentation):
"""Non-integer floats should be returned unchanged."""
result = instrumentation._deserialize_value(3.14)
assert result == 3.14
assert isinstance(result, float)Test execution raw output
Exit code: 0
Stdout:
.............................. [100%]
30 passed in 0.19s
5. [High value] RedisInstrumentation._deserialize_value should deserialize float wrapper dict to float value ✓
Details
- Ensures that custom float wrappers in the form {'float': True, 'value': v} are correctly converted to float values.
- Validates accurate replay of Redis responses involving float values.
- Prevents potential data type mismatches during deserialization.
def test_float_wrapper_dict_returns_float(self, instrumentation):
"""{'__float__': True, 'value': v} wrapper should deserialize to float(v)."""
result = instrumentation._deserialize_value({"__float__": True, "value": 1.5})
assert result == 1.5
assert isinstance(result, float)Test execution raw output
Exit code: 0
Stdout:
.............................. [100%]
30 passed in 0.19s
6. [High value] RedisInstrumentation._deserialize_value should always return float for __float__ wrapper even if value is a whole number ✓
Details
- Verifies that values wrapped with
__float__are consistently deserialized asfloat, notint. - Prevents unintended type conversion and ensures type fidelity for downstream consumers.
def test_float_wrapper_with_integer_value_returns_float_not_int(self, instrumentation):
"""__float__ wrapper should always return float, even when value is a whole number."""
result = instrumentation._deserialize_value({"__float__": True, "value": 10.0})
assert isinstance(result, float)
assert result == 10.0Test execution raw output
Exit code: 0
Stdout:
.............................. [100%]
30 passed in 0.19s
7. [High value] RedisInstrumentation._deserialize_value should return bytes for bytes wrapper with utf8 encoding ✓
Details
- Ensures that dictionaries with
{'__bytes__': True, 'encoding': 'utf8', 'value': str}are properly deserialized to bytes. - Critical for correct handling of binary data stored as encoded strings in Redis.
def test_bytes_utf8_wrapper_returns_bytes(self, instrumentation):
"""{'__bytes__': True, 'encoding': 'utf8', 'value': str} should return bytes."""
result = instrumentation._deserialize_value({"__bytes__": True, "encoding": "utf8", "value": "hello"})
assert result == b"hello"
assert isinstance(result, bytes)Test execution raw output
Exit code: 0
Stdout:
.............................. [100%]
30 passed in 0.19s
8. [High value] RedisInstrumentation._deserialize_value should recursively deserialize elements in a list ✓
Details
- Verifies that each element in a list is deserialized individually, maintaining correct types.
- Ensures nested data structures are handled properly during deserialization.
- Prevents type inconsistencies when lists contain mixed or convertible types.
def test_list_deserializes_elements(self, instrumentation):
"""Lists should have each element deserialized recursively."""
result = instrumentation._deserialize_value([42.0, "text", None])
# 42.0 is an integer float — should become int
assert result == [42, "text", None]
assert isinstance(result[0], int)Test execution raw output
Exit code: 0
Stdout:
.............................. [100%]
30 passed in 0.19s
9. [High value] RedisInstrumentation._get_pipeline_commands should return command_stack when present ✓
Details
- Verifies that the function correctly extracts commands from the
command_stackattribute of a standard Pipeline object. - Ensures instrumentation relies on accurate command retrieval for standard Pipeline usage.
def test_returns_command_stack_attribute(self, instrumentation):
"""Should return command_stack when pipeline has that attribute (standard Pipeline)."""
commands = ["cmd1", "cmd2"]
pipeline = SimpleNamespace(command_stack=commands)
result = instrumentation._get_pipeline_commands(pipeline)
assert result == commandsTest execution raw output
Exit code: 0
Stdout:
.............................. [100%]
30 passed in 0.19s
10. [High value] RedisInstrumentation._get_pipeline_commands should return _execution_strategy._command_queue for sync ClusterPipeline ✓
Details
- Ensures command extraction logic works for synchronous cluster pipelines.
- Critical for accurate record/replay of Redis cluster operations.
- Verifies access to private command queue attribute in pipeline object.
def test_returns_execution_strategy_private_command_queue(self, instrumentation):
"""Should return _execution_strategy._command_queue for sync ClusterPipeline."""
commands = ["SET key val", "GET key"]
strategy = SimpleNamespace(_command_queue=commands)
pipeline = SimpleNamespace(_execution_strategy=strategy)
result = instrumentation._get_pipeline_commands(pipeline)
assert result == commandsTest execution raw output
Exit code: 0
Stdout:
.............................. [100%]
30 passed in 0.19s
11. [High value] RedisInstrumentation._get_pipeline_commands should return _execution_strategy.command_queue when _command_queue is absent ✓
Details
- Validates fallback mechanism for command extraction from pipeline objects
- Ensures compatibility with pipeline implementations lacking
_command_queueattribute - Prevents errors by confirming alternative attribute access is handled correctly
def test_returns_execution_strategy_public_command_queue_fallback(self, instrumentation):
"""Should return _execution_strategy.command_queue when _command_queue is absent."""
commands = ["SET", "GET"]
strategy = SimpleNamespace(command_queue=commands)
pipeline = SimpleNamespace(_execution_strategy=strategy)
result = instrumentation._get_pipeline_commands(pipeline)
assert result == commandsTest execution raw output
Exit code: 0
Stdout:
.............................. [100%]
30 passed in 0.19s
12. [High value] RedisInstrumentation._get_pipeline_commands should return _command_queue directly for async ClusterPipeline ✓
Details
- Validates that async
ClusterPipelineobjects are handled correctly by returning their_command_queueattribute. - Ensures compatibility with async Redis instrumentation workflows.
- Prevents potential issues with command extraction in async pipeline scenarios.
def test_returns_direct_command_queue_for_async_cluster(self, instrumentation):
"""Should return _command_queue directly for async ClusterPipeline."""
commands = ["HSET", "HGET"]
pipeline = SimpleNamespace(_command_queue=commands)
result = instrumentation._get_pipeline_commands(pipeline)
assert result == commandsTest execution raw output
Exit code: 0
Stdout:
.............................. [100%]
30 passed in 0.19s
13. [High value] RedisInstrumentation._get_pipeline_commands should return empty list when no known command storage attributes are present ✓
Details
- Ensures method handles pipelines without expected command storage attributes gracefully.
- Prevents potential errors or exceptions in edge cases where pipeline structure is unexpected.
- Confirms robustness of command extraction logic.
def test_returns_empty_list_when_no_known_attributes(self, instrumentation):
"""Should return empty list when pipeline has none of the known command storage attributes."""
pipeline = SimpleNamespace()
result = instrumentation._get_pipeline_commands(pipeline)
assert result == []Test execution raw output
Exit code: 0
Stdout:
.............................. [100%]
30 passed in 0.19s
14. [High value] RedisInstrumentation._get_pipeline_commands should prioritize _execution_strategy._command_queue over command_stack ✓
Details
- Verifies that
_get_pipeline_commandsreturns commands from_execution_strategy._command_queuewhen both_execution_strategy._command_queueandcommand_stackare present. - Ensures compatibility with redis-py ClusterPipeline where
command_stackmay be empty but_execution_strategy._command_queuecontains the actual commands. - Prevents instrumentation errors by guaranteeing the correct command source is used for accurate tracing.
def test_execution_strategy_takes_priority_over_command_stack(self, instrumentation):
"""_execution_strategy._command_queue must take priority over command_stack (redis-py #3703)."""
real_commands = ["SET k v"]
fake_empty = []
strategy = SimpleNamespace(_command_queue=real_commands)
# ClusterPipeline has both attrs but command_stack is always empty
pipeline = SimpleNamespace(_execution_strategy=strategy, command_stack=fake_empty)
result = instrumentation._get_pipeline_commands(pipeline)
assert result is real_commandsTest execution raw output
Exit code: 0
Stdout:
.............................. [100%]
30 passed in 0.19s
15. [High value] RedisInstrumentation._serialize_value should wrap floats in {'__float__': True, 'value': ...} ✓
Details
- Ensures float values are serialized with a wrapper for type preservation.
- Prevents loss of float type information during record/replay cycles.
- Critical for accurate deserialization and data integrity.
def test_float_serialized_to_wrapper(self, instrumentation):
"""Floats should be wrapped in {'__float__': True, 'value': ...}."""
result = instrumentation._serialize_value(3.14)
assert result == {"__float__": True, "value": 3.14}Test execution raw output
Exit code: 0
Stdout:
.............................. [100%]
30 passed in 0.19s
16. [High value] RedisInstrumentation._serialize_value should wrap float zero in float wrapper object ✓
Details
- Ensures that
0.0is serialized as a dictionary with__float__andvaluekeys. - Prevents inconsistencies or edge case errors when handling float zero values in Redis instrumentation.
def test_float_zero_serialized_to_wrapper(self, instrumentation):
"""Float zero should also be wrapped."""
result = instrumentation._serialize_value(0.0)
assert result == {"__float__": True, "value": 0.0}Test execution raw output
Exit code: 0
Stdout:
.............................. [100%]
30 passed in 0.19s
17. [High value] RedisInstrumentation._serialize_value should return integers as-is without wrapping ✓
Details
- Verifies that integer values are not altered or wrapped during serialization.
- Maintains type integrity and avoids unnecessary processing for integer inputs.
- Ensures efficient handling of numeric data in Redis instrumentation.
def test_int_returned_as_is(self, instrumentation):
"""Integers should be returned as-is (no wrapping)."""
result = instrumentation._serialize_value(42)
assert result == 42
assert isinstance(result, int)Test execution raw output
Exit code: 0
Stdout:
.............................. [100%]
30 passed in 0.19s
18. [High value] RedisInstrumentation._serialize_value should return bools as-is ✓
Details
- Ensures boolean values are not converted or serialized to other types.
- Maintains logical correctness when handling boolean data in Redis instrumentation.
def test_bool_returned_as_is(self, instrumentation):
"""Bools should be returned as-is."""
assert instrumentation._serialize_value(True) is True
assert instrumentation._serialize_value(False) is FalseTest execution raw output
Exit code: 0
Stdout:
.............................. [100%]
30 passed in 0.19s
19. [High value] RedisInstrumentation._serialize_value should return strings unchanged ✓
Details
- Verifies that string inputs are not altered during serialization.
- Prevents unintended data transformation for string values.
- Maintains data integrity when handling string types.
def test_string_returned_as_is(self, instrumentation):
"""Strings should be returned unchanged."""
result = instrumentation._serialize_value("hello")
assert result == "hello"Test execution raw output
Exit code: 0
Stdout:
.............................. [100%]
30 passed in 0.19s
20. [High value] RedisInstrumentation._serialize_value should return None unchanged ✓
Details
- Ensures that
Nonevalues are not altered during serialization. - Validates correct handling of null values, which is important for downstream logic.
def test_none_returned_as_is(self, instrumentation):
"""None should be returned unchanged."""
result = instrumentation._serialize_value(None)
assert result is NoneTest execution raw output
Exit code: 0
Stdout:
.............................. [100%]
30 passed in 0.19s
21. [High value] RedisInstrumentation._serialize_value should wrap UTF-8 bytes in __bytes__ dict ✓
Details
- Ensures UTF-8 encoded bytes are serialized with encoding and type information.
- Prevents loss of binary data fidelity during replay or deserialization.
- Validates that the output format is consistent for bytes input.
def test_bytes_utf8_serialized_to_wrapper(self, instrumentation):
"""Bytes that decode as UTF-8 should be wrapped in __bytes__ dict."""
result = instrumentation._serialize_value(b"hello")
assert result == {"__bytes__": True, "encoding": "utf8", "value": "hello"}Test execution raw output
Exit code: 0
Stdout:
.............................. [100%]
30 passed in 0.19s
22. [High value] RedisInstrumentation._serialize_value should recursively serialize list elements, including floats ✓
Details
- Verifies that lists are processed element-wise for serialization.
- Ensures that float values within lists are wrapped with the expected serialization format.
- Confirms that mixed-type lists retain correct serialization for each element.
def test_list_with_float_serialized_recursively(self, instrumentation):
"""Lists containing floats should have their elements serialized."""
result = instrumentation._serialize_value([1.5, "x", 99])
assert result == [{"__float__": True, "value": 1.5}, "x", 99]Test execution raw output
Exit code: 0
Stdout:
.............................. [100%]
30 passed in 0.19s
23. [High value] RedisInstrumentation._serialize_value and _deserialize_value should roundtrip float values ✓
Details
- Ensures that float values are accurately preserved through serialization and deserialization.
- Critical for maintaining data integrity during record/replay operations.
- Verifies that the deserialized value matches the original and retains its type.
def test_serialize_deserialize_float_roundtrip(self, instrumentation):
"""Serializing then deserializing a float should recover the original value."""
original = 2.71828
serialized = instrumentation._serialize_value(original)
deserialized = instrumentation._deserialize_value(serialized)
assert deserialized == original
assert isinstance(deserialized, float)Test execution raw output
Exit code: 0
Stdout:
.............................. [100%]
30 passed in 0.19s
24. [High value] RedisInstrumentation._traced_async_cluster_pipeline_execute should await original_execute directly in DISABLED mode ✓
Details
- Ensures that instrumentation does not alter async execution when mode is DISABLED.
- Verifies that the original async pipeline execution is preserved without additional tracing or wrapping.
- Confirms that the function awaits and returns the result from
original_executeas expected.
def test_disabled_mode_calls_original_execute(self, instrumentation):
"""In DISABLED mode, should await original_execute directly."""
pipeline = SimpleNamespace(command_stack=[])
original_execute = AsyncMock(return_value=["async_result"])
sdk = self._make_sdk(TuskDriftMode.DISABLED)
result = asyncio.get_event_loop().run_until_complete(
instrumentation._traced_async_cluster_pipeline_execute(pipeline, original_execute, sdk, args=(), kwargs={})
)
original_execute.assert_called_once_with(pipeline)
assert result == ["async_result"]Test execution raw output
Exit code: 0
Stdout:
.............................. [100%]
30 passed in 0.19s
25. [High value] RedisInstrumentation._traced_async_cluster_pipeline_execute should route to handle_replay_mode in REPLAY mode ✓
Details
- Verifies async replay mode prevents real Redis calls by routing to
handle_replay_mode. - Ensures that
original_executeis not called when in REPLAY mode. - Confirms correct handling of command stack during replay.
def test_replay_mode_uses_handle_replay_mode(self, instrumentation):
"""In REPLAY mode, should route to handle_replay_mode."""
commands = ["HSET h k v"]
pipeline = SimpleNamespace(command_stack=commands)
original_execute = AsyncMock()
sdk = self._make_sdk(TuskDriftMode.REPLAY)
with patch(
"drift.instrumentation.redis.instrumentation.handle_replay_mode",
return_value=["async_mocked"],
) as mock_handle_replay:
result = asyncio.get_event_loop().run_until_complete(
instrumentation._traced_async_cluster_pipeline_execute(
pipeline, original_execute, sdk, args=(), kwargs={}
)
)
mock_handle_replay.assert_called_once()
assert result == ["async_mocked"]
original_execute.assert_not_called()Test execution raw output
Exit code: 0
Stdout:
.............................. [100%]
30 passed in 0.19s
26. [High value] RedisInstrumentation._traced_async_cluster_pipeline_execute should delegate to _record_async_pipeline_execute in RECORD mode ✓
Details
- Ensures async pipeline commands are properly recorded for replay when in RECORD mode.
- Verifies correct delegation to
_record_async_pipeline_executeto maintain expected instrumentation behavior. - Prevents potential issues with missing or incorrect command recording in async cluster pipelines.
def test_record_mode_delegates_to_record_async_pipeline_execute(self, instrumentation):
"""In RECORD mode, should delegate to _record_async_pipeline_execute."""
commands = ["SET k v"]
pipeline = SimpleNamespace(command_stack=commands)
original_execute = AsyncMock()
sdk = self._make_sdk(TuskDriftMode.RECORD)
with patch.object(
instrumentation,
"_record_async_pipeline_execute",
new_callable=AsyncMock,
return_value=["async_recorded"],
) as mock_record_async:
result = asyncio.get_event_loop().run_until_complete(
instrumentation._traced_async_cluster_pipeline_execute(
pipeline, original_execute, sdk, args=(), kwargs={}
)
)
mock_record_async.assert_called_once()
assert result == ["async_recorded"]Test execution raw output
Exit code: 0
Stdout:
.............................. [100%]
30 passed in 0.19s
27. [High value] RedisInstrumentation._traced_cluster_pipeline_execute should call original_execute directly in DISABLED mode ✓
Details
- Ensures that instrumentation does not alter or wrap the execution when mode is DISABLED.
- Verifies that the original pipeline execution logic is preserved without additional tracing or modification.
def test_disabled_mode_calls_original_execute(self, instrumentation):
"""In DISABLED mode, _traced_cluster_pipeline_execute should call original_execute directly."""
pipeline = SimpleNamespace(command_stack=[])
original_execute = MagicMock(return_value=["result"])
sdk = self._make_sdk(TuskDriftMode.DISABLED)
result = instrumentation._traced_cluster_pipeline_execute(pipeline, original_execute, sdk, args=(), kwargs={})
original_execute.assert_called_once_with(pipeline)
assert result == ["result"]Test execution raw output
Exit code: 0
Stdout:
.............................. [100%]
30 passed in 0.19s
28. [High value] RedisInstrumentation._traced_cluster_pipeline_execute should route to handle_replay_mode in REPLAY mode ✓
Details
- Verifies that in REPLAY mode, the function avoids real Redis calls.
- Ensures that
handle_replay_modeis invoked instead of executing the pipeline. - Prevents unintended side effects during replay by not calling the original execute method.
def test_replay_mode_uses_handle_replay_mode(self, instrumentation):
"""In REPLAY mode, should route to handle_replay_mode."""
commands = ["SET k v", "GET k"]
pipeline = SimpleNamespace(command_stack=commands)
original_execute = MagicMock()
sdk = self._make_sdk(TuskDriftMode.REPLAY)
with patch(
"drift.instrumentation.redis.instrumentation.handle_replay_mode",
return_value=["mocked_result"],
) as mock_handle_replay:
result = instrumentation._traced_cluster_pipeline_execute(
pipeline, original_execute, sdk, args=(), kwargs={}
)
mock_handle_replay.assert_called_once()
assert result == ["mocked_result"]
original_execute.assert_not_called()Test execution raw output
Exit code: 0
Stdout:
.............................. [100%]
30 passed in 0.19s
29. [High value] _traced_cluster_pipeline_execute should route to handle_record_mode in RECORD mode ✓
Details
- Ensures that when in RECORD mode, the function delegates command handling to
handle_record_mode. - Validates that commands are properly recorded for later replay, maintaining expected instrumentation behavior.
def test_record_mode_uses_handle_record_mode(self, instrumentation):
"""In RECORD mode, should route to handle_record_mode."""
commands = ["SET k v"]
pipeline = SimpleNamespace(command_stack=commands)
original_execute = MagicMock()
sdk = self._make_sdk(TuskDriftMode.RECORD)
with patch(
"drift.instrumentation.redis.instrumentation.handle_record_mode",
return_value=["recorded"],
) as mock_handle_record:
result = instrumentation._traced_cluster_pipeline_execute(
pipeline, original_execute, sdk, args=(), kwargs={}
)
mock_handle_record.assert_called_once()
assert result == ["recorded"]Test execution raw output
Exit code: 0
Stdout:
.............................. [100%]
30 passed in 0.19s
30. [High value] RedisInstrumentation._traced_cluster_pipeline_execute should snapshot command queue before original_execute runs ✓
Details
- Verifies that the command queue is captured before being cleared by the original execute method.
- Prevents loss of command data necessary for accurate record/replay of cluster pipeline operations.
- Ensures the snapshot contains the intended commands, not an empty or cleared list.
def test_command_queue_snapshotted_before_execution(self, instrumentation):
"""Command queue must be snapshotted before original_execute runs.
ClusterPipeline.execute() clears the queue in its finally block, so
_traced_cluster_pipeline_execute must capture it beforehand.
"""
commands_before = ["SET k v", "INCR counter"]
# Simulate ClusterPipeline resetting queue during execute
pipeline = MagicMock()
pipeline._execution_strategy = SimpleNamespace(_command_queue=commands_before[:])
def side_effect_execute(p, *a, **kw):
# Simulate ClusterPipeline clearing queue in finally
pipeline._execution_strategy._command_queue.clear()
return ["ok", 1]
original_execute = MagicMock(side_effect=side_effect_execute)
sdk = self._make_sdk(TuskDriftMode.RECORD)
captured_command_stack = None
def fake_record_pipeline_execute(p, orig, s, a, kw, cmd_str, cmd_stack, is_pre):
nonlocal captured_command_stack
captured_command_stack = cmd_stack
return ["ok", 1]
with patch.object(instrumentation, "_record_pipeline_execute", side_effect=fake_record_pipeline_execute):
with patch(
"drift.instrumentation.redis.instrumentation.handle_record_mode",
side_effect=lambda original_function_call, record_mode_handler, span_kind: record_mode_handler(False),
):
instrumentation._traced_cluster_pipeline_execute(pipeline, original_execute, sdk, args=(), kwargs={})
# The snapshot should contain the original commands, not the cleared list
assert captured_command_stack == commands_beforeTest execution raw output
Exit code: 0
Stdout:
.............................. [100%]
30 passed in 0.19s
Legend:
✓ - test passed (your code handles the scenario)
✗ - test failed (your code does not handle the scenario)