Skip to content

fix

0fca713
Select commit
Loading
Failed to load commit list.
Merged

feat: instrument RedisCluster and ClusterPipeline for record/replay #91

fix
0fca713
Select commit
Loading
Failed to load commit list.
tusk-dev / Tusk Tester succeeded Apr 11, 2026 in 4m 5s

Generated 30 tests - 30 passed

Go to Tusk to view more details or incorporate tests ↗

test_redis_instrumentation.py (RedisInstrumentation._deserialize_value, RedisInstrumentation._serialize_value, RedisInstrumentation._get_pipeline_commands, RedisInstrumentation._traced_cluster_pipeline_execute, RedisInstrumentation._traced_async_cluster_pipeline_execute) - 30 generated, 30 ✓

Full test file

File path: tests/unit/test_redis_instrumentation.py

"""Unit tests for RedisInstrumentation helper methods."""

from __future__ import annotations

import asyncio
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock, patch

import pytest

from drift.core.types import TuskDriftMode
from drift.instrumentation.redis.instrumentation import RedisInstrumentation


class TestRedisInstrumentationHelpers:
    """Tests for RedisInstrumentation helper methods."""

    @pytest.fixture
    def instrumentation(self):
        """Create RedisInstrumentation instance without patching Redis."""
        return RedisInstrumentation(enabled=False)


class TestDeserializeValue(TestRedisInstrumentationHelpers):
    """Tests for _deserialize_value."""

    def test_string_passthrough(self, instrumentation):
        """Strings should be returned unchanged."""
        result = instrumentation._deserialize_value("hello")
        assert result == "hello"

    def test_none_passthrough(self, instrumentation):
        """None should be returned unchanged."""
        result = instrumentation._deserialize_value(None)
        assert result is None

    def test_integer_float_converted_to_int(self, instrumentation):
        """Float values with no fractional part (e.g. 42.0) should be converted to int.

        Redis INCR returns an integer; JSON round-trip via float would lose the type.
        """
        result = instrumentation._deserialize_value(42.0)
        assert result == 42
        assert isinstance(result, int)

    def test_non_integer_float_returned_as_float(self, instrumentation):
        """Non-integer floats should be returned unchanged."""
        result = instrumentation._deserialize_value(3.14)
        assert result == 3.14
        assert isinstance(result, float)

    def test_float_wrapper_dict_returns_float(self, instrumentation):
        """{'__float__': True, 'value': v} wrapper should deserialize to float(v)."""
        result = instrumentation._deserialize_value({"__float__": True, "value": 1.5})
        assert result == 1.5
        assert isinstance(result, float)

    def test_float_wrapper_with_integer_value_returns_float_not_int(self, instrumentation):
        """__float__ wrapper should always return float, even when value is a whole number."""
        result = instrumentation._deserialize_value({"__float__": True, "value": 10.0})
        assert isinstance(result, float)
        assert result == 10.0

    def test_bytes_utf8_wrapper_returns_bytes(self, instrumentation):
        """{'__bytes__': True, 'encoding': 'utf8', 'value': str} should return bytes."""
        result = instrumentation._deserialize_value({"__bytes__": True, "encoding": "utf8", "value": "hello"})
        assert result == b"hello"
        assert isinstance(result, bytes)

    def test_list_deserializes_elements(self, instrumentation):
        """Lists should have each element deserialized recursively."""
        result = instrumentation._deserialize_value([42.0, "text", None])
        # 42.0 is an integer float — should become int
        assert result == [42, "text", None]
        assert isinstance(result[0], int)


class TestSerializeValue(TestRedisInstrumentationHelpers):
    """Tests for _serialize_value."""

    def test_float_serialized_to_wrapper(self, instrumentation):
        """Floats should be wrapped in {'__float__': True, 'value': ...}."""
        result = instrumentation._serialize_value(3.14)
        assert result == {"__float__": True, "value": 3.14}

    def test_float_zero_serialized_to_wrapper(self, instrumentation):
        """Float zero should also be wrapped."""
        result = instrumentation._serialize_value(0.0)
        assert result == {"__float__": True, "value": 0.0}

    def test_int_returned_as_is(self, instrumentation):
        """Integers should be returned as-is (no wrapping)."""
        result = instrumentation._serialize_value(42)
        assert result == 42
        assert isinstance(result, int)

    def test_bool_returned_as_is(self, instrumentation):
        """Bools should be returned as-is."""
        assert instrumentation._serialize_value(True) is True
        assert instrumentation._serialize_value(False) is False

    def test_string_returned_as_is(self, instrumentation):
        """Strings should be returned unchanged."""
        result = instrumentation._serialize_value("hello")
        assert result == "hello"

    def test_none_returned_as_is(self, instrumentation):
        """None should be returned unchanged."""
        result = instrumentation._serialize_value(None)
        assert result is None

    def test_bytes_utf8_serialized_to_wrapper(self, instrumentation):
        """Bytes that decode as UTF-8 should be wrapped in __bytes__ dict."""
        result = instrumentation._serialize_value(b"hello")
        assert result == {"__bytes__": True, "encoding": "utf8", "value": "hello"}

    def test_list_with_float_serialized_recursively(self, instrumentation):
        """Lists containing floats should have their elements serialized."""
        result = instrumentation._serialize_value([1.5, "x", 99])
        assert result == [{"__float__": True, "value": 1.5}, "x", 99]

    def test_serialize_deserialize_float_roundtrip(self, instrumentation):
        """Serializing then deserializing a float should recover the original value."""
        original = 2.71828
        serialized = instrumentation._serialize_value(original)
        deserialized = instrumentation._deserialize_value(serialized)
        assert deserialized == original
        assert isinstance(deserialized, float)


class TestGetPipelineCommands(TestRedisInstrumentationHelpers):
    """Tests for _get_pipeline_commands."""

    def test_returns_command_stack_attribute(self, instrumentation):
        """Should return command_stack when pipeline has that attribute (standard Pipeline)."""
        commands = ["cmd1", "cmd2"]
        pipeline = SimpleNamespace(command_stack=commands)
        result = instrumentation._get_pipeline_commands(pipeline)
        assert result == commands

    def test_returns_execution_strategy_private_command_queue(self, instrumentation):
        """Should return _execution_strategy._command_queue for sync ClusterPipeline."""
        commands = ["SET key val", "GET key"]
        strategy = SimpleNamespace(_command_queue=commands)
        pipeline = SimpleNamespace(_execution_strategy=strategy)
        result = instrumentation._get_pipeline_commands(pipeline)
        assert result == commands

    def test_returns_execution_strategy_public_command_queue_fallback(self, instrumentation):
        """Should return _execution_strategy.command_queue when _command_queue is absent."""
        commands = ["SET", "GET"]
        strategy = SimpleNamespace(command_queue=commands)
        pipeline = SimpleNamespace(_execution_strategy=strategy)
        result = instrumentation._get_pipeline_commands(pipeline)
        assert result == commands

    def test_returns_direct_command_queue_for_async_cluster(self, instrumentation):
        """Should return _command_queue directly for async ClusterPipeline."""
        commands = ["HSET", "HGET"]
        pipeline = SimpleNamespace(_command_queue=commands)
        result = instrumentation._get_pipeline_commands(pipeline)
        assert result == commands

    def test_returns_empty_list_when_no_known_attributes(self, instrumentation):
        """Should return empty list when pipeline has none of the known command storage attributes."""
        pipeline = SimpleNamespace()
        result = instrumentation._get_pipeline_commands(pipeline)
        assert result == []

    def test_execution_strategy_takes_priority_over_command_stack(self, instrumentation):
        """_execution_strategy._command_queue must take priority over command_stack (redis-py #3703)."""
        real_commands = ["SET k v"]
        fake_empty = []
        strategy = SimpleNamespace(_command_queue=real_commands)
        # ClusterPipeline has both attrs but command_stack is always empty
        pipeline = SimpleNamespace(_execution_strategy=strategy, command_stack=fake_empty)
        result = instrumentation._get_pipeline_commands(pipeline)
        assert result is real_commands


class TestTracedClusterPipelineExecute(TestRedisInstrumentationHelpers):
    """Tests for _traced_cluster_pipeline_execute."""

    def _make_sdk(self, mode: TuskDriftMode) -> MagicMock:
        sdk = MagicMock()
        sdk.mode = mode
        return sdk

    def test_disabled_mode_calls_original_execute(self, instrumentation):
        """In DISABLED mode, _traced_cluster_pipeline_execute should call original_execute directly."""
        pipeline = SimpleNamespace(command_stack=[])
        original_execute = MagicMock(return_value=["result"])
        sdk = self._make_sdk(TuskDriftMode.DISABLED)

        result = instrumentation._traced_cluster_pipeline_execute(pipeline, original_execute, sdk, args=(), kwargs={})

        original_execute.assert_called_once_with(pipeline)
        assert result == ["result"]

    def test_replay_mode_uses_handle_replay_mode(self, instrumentation):
        """In REPLAY mode, should route to handle_replay_mode."""
        commands = ["SET k v", "GET k"]
        pipeline = SimpleNamespace(command_stack=commands)
        original_execute = MagicMock()
        sdk = self._make_sdk(TuskDriftMode.REPLAY)

        with patch(
            "drift.instrumentation.redis.instrumentation.handle_replay_mode",
            return_value=["mocked_result"],
        ) as mock_handle_replay:
            result = instrumentation._traced_cluster_pipeline_execute(
                pipeline, original_execute, sdk, args=(), kwargs={}
            )

        mock_handle_replay.assert_called_once()
        assert result == ["mocked_result"]
        original_execute.assert_not_called()

    def test_record_mode_uses_handle_record_mode(self, instrumentation):
        """In RECORD mode, should route to handle_record_mode."""
        commands = ["SET k v"]
        pipeline = SimpleNamespace(command_stack=commands)
        original_execute = MagicMock()
        sdk = self._make_sdk(TuskDriftMode.RECORD)

        with patch(
            "drift.instrumentation.redis.instrumentation.handle_record_mode",
            return_value=["recorded"],
        ) as mock_handle_record:
            result = instrumentation._traced_cluster_pipeline_execute(
                pipeline, original_execute, sdk, args=(), kwargs={}
            )

        mock_handle_record.assert_called_once()
        assert result == ["recorded"]

    def test_command_queue_snapshotted_before_execution(self, instrumentation):
        """Command queue must be snapshotted before original_execute runs.

        ClusterPipeline.execute() clears the queue in its finally block, so
        _traced_cluster_pipeline_execute must capture it beforehand.
        """
        commands_before = ["SET k v", "INCR counter"]
        # Simulate ClusterPipeline resetting queue during execute
        pipeline = MagicMock()
        pipeline._execution_strategy = SimpleNamespace(_command_queue=commands_before[:])

        def side_effect_execute(p, *a, **kw):
            # Simulate ClusterPipeline clearing queue in finally
            pipeline._execution_strategy._command_queue.clear()
            return ["ok", 1]

        original_execute = MagicMock(side_effect=side_effect_execute)
        sdk = self._make_sdk(TuskDriftMode.RECORD)

        captured_command_stack = None

        def fake_record_pipeline_execute(p, orig, s, a, kw, cmd_str, cmd_stack, is_pre):
            nonlocal captured_command_stack
            captured_command_stack = cmd_stack
            return ["ok", 1]

        with patch.object(instrumentation, "_record_pipeline_execute", side_effect=fake_record_pipeline_execute):
            with patch(
                "drift.instrumentation.redis.instrumentation.handle_record_mode",
                side_effect=lambda original_function_call, record_mode_handler, span_kind: record_mode_handler(False),
            ):
                instrumentation._traced_cluster_pipeline_execute(pipeline, original_execute, sdk, args=(), kwargs={})

        # The snapshot should contain the original commands, not the cleared list
        assert captured_command_stack == commands_before


class TestTracedAsyncClusterPipelineExecute(TestRedisInstrumentationHelpers):
    """Tests for _traced_async_cluster_pipeline_execute."""

    def _make_sdk(self, mode: TuskDriftMode) -> MagicMock:
        sdk = MagicMock()
        sdk.mode = mode
        return sdk

    def test_disabled_mode_calls_original_execute(self, instrumentation):
        """In DISABLED mode, should await original_execute directly."""
        pipeline = SimpleNamespace(command_stack=[])
        original_execute = AsyncMock(return_value=["async_result"])
        sdk = self._make_sdk(TuskDriftMode.DISABLED)

        result = asyncio.get_event_loop().run_until_complete(
            instrumentation._traced_async_cluster_pipeline_execute(pipeline, original_execute, sdk, args=(), kwargs={})
        )

        original_execute.assert_called_once_with(pipeline)
        assert result == ["async_result"]

    def test_replay_mode_uses_handle_replay_mode(self, instrumentation):
        """In REPLAY mode, should route to handle_replay_mode."""
        commands = ["HSET h k v"]
        pipeline = SimpleNamespace(command_stack=commands)
        original_execute = AsyncMock()
        sdk = self._make_sdk(TuskDriftMode.REPLAY)

        with patch(
            "drift.instrumentation.redis.instrumentation.handle_replay_mode",
            return_value=["async_mocked"],
        ) as mock_handle_replay:
            result = asyncio.get_event_loop().run_until_complete(
                instrumentation._traced_async_cluster_pipeline_execute(
                    pipeline, original_execute, sdk, args=(), kwargs={}
                )
            )

        mock_handle_replay.assert_called_once()
        assert result == ["async_mocked"]
        original_execute.assert_not_called()

    def test_record_mode_delegates_to_record_async_pipeline_execute(self, instrumentation):
        """In RECORD mode, should delegate to _record_async_pipeline_execute."""
        commands = ["SET k v"]
        pipeline = SimpleNamespace(command_stack=commands)
        original_execute = AsyncMock()
        sdk = self._make_sdk(TuskDriftMode.RECORD)

        with patch.object(
            instrumentation,
            "_record_async_pipeline_execute",
            new_callable=AsyncMock,
            return_value=["async_recorded"],
        ) as mock_record_async:
            result = asyncio.get_event_loop().run_until_complete(
                instrumentation._traced_async_cluster_pipeline_execute(
                    pipeline, original_execute, sdk, args=(), kwargs={}
                )
            )

        mock_record_async.assert_called_once()
        assert result == ["async_recorded"]

1. [High value] RedisInstrumentation._deserialize_value should return strings unchanged ✓

Details
  • Verifies that string inputs are not modified during deserialization.
  • Prevents unintended data transformation and maintains data integrity.
    def test_string_passthrough(self, instrumentation):
        """Strings should be returned unchanged."""
        result = instrumentation._deserialize_value("hello")
        assert result == "hello"
Test execution raw output

Exit code: 0

Stdout:

..............................                                           [100%]
30 passed in 0.19s

2. [High value] RedisInstrumentation._deserialize_value should return None unchanged when input is None ✓

Details
  • Ensures that None values are not altered during deserialization.
  • Prevents unintended conversion of null Redis responses, preserving data integrity.
    def test_none_passthrough(self, instrumentation):
        """None should be returned unchanged."""
        result = instrumentation._deserialize_value(None)
        assert result is None
Test execution raw output

Exit code: 0

Stdout:

..............................                                           [100%]
30 passed in 0.19s

3. [High value] RedisInstrumentation._deserialize_value should convert integer floats to int ✓

Details
  • Prevents type mismatches by ensuring values like 42.0 are returned as int, not float.
  • Aligns deserialization behavior with Redis's native integer return for commands like INCR.
  • Avoids issues caused by JSON round-trip converting integers to floats.
    def test_integer_float_converted_to_int(self, instrumentation):
        """Float values with no fractional part (e.g. 42.0) should be converted to int.

        Redis INCR returns an integer; JSON round-trip via float would lose the type.
        """
        result = instrumentation._deserialize_value(42.0)
        assert result == 42
        assert isinstance(result, int)
Test execution raw output

Exit code: 0

Stdout:

..............................                                           [100%]
30 passed in 0.19s

4. [High value] RedisInstrumentation._deserialize_value should return non-integer floats unchanged as float ✓

Details
  • Ensures that non-integer float values are not altered during deserialization.
  • Prevents unintended type conversion or rounding of float values.
  • Maintains data integrity for float values stored in Redis.
    def test_non_integer_float_returned_as_float(self, instrumentation):
        """Non-integer floats should be returned unchanged."""
        result = instrumentation._deserialize_value(3.14)
        assert result == 3.14
        assert isinstance(result, float)
Test execution raw output

Exit code: 0

Stdout:

..............................                                           [100%]
30 passed in 0.19s

5. [High value] RedisInstrumentation._deserialize_value should deserialize float wrapper dict to float value ✓

Details
  • Ensures that custom float wrappers in the form {'float': True, 'value': v} are correctly converted to float values.
  • Validates accurate replay of Redis responses involving float values.
  • Prevents potential data type mismatches during deserialization.
    def test_float_wrapper_dict_returns_float(self, instrumentation):
        """{'__float__': True, 'value': v} wrapper should deserialize to float(v)."""
        result = instrumentation._deserialize_value({"__float__": True, "value": 1.5})
        assert result == 1.5
        assert isinstance(result, float)
Test execution raw output

Exit code: 0

Stdout:

..............................                                           [100%]
30 passed in 0.19s

6. [High value] RedisInstrumentation._deserialize_value should always return float for __float__ wrapper even if value is a whole number ✓

Details
  • Verifies that values wrapped with __float__ are consistently deserialized as float, not int.
  • Prevents unintended type conversion and ensures type fidelity for downstream consumers.
    def test_float_wrapper_with_integer_value_returns_float_not_int(self, instrumentation):
        """__float__ wrapper should always return float, even when value is a whole number."""
        result = instrumentation._deserialize_value({"__float__": True, "value": 10.0})
        assert isinstance(result, float)
        assert result == 10.0
Test execution raw output

Exit code: 0

Stdout:

..............................                                           [100%]
30 passed in 0.19s

7. [High value] RedisInstrumentation._deserialize_value should return bytes for bytes wrapper with utf8 encoding ✓

Details
  • Ensures that dictionaries with {'__bytes__': True, 'encoding': 'utf8', 'value': str} are properly deserialized to bytes.
  • Critical for correct handling of binary data stored as encoded strings in Redis.
    def test_bytes_utf8_wrapper_returns_bytes(self, instrumentation):
        """{'__bytes__': True, 'encoding': 'utf8', 'value': str} should return bytes."""
        result = instrumentation._deserialize_value({"__bytes__": True, "encoding": "utf8", "value": "hello"})
        assert result == b"hello"
        assert isinstance(result, bytes)
Test execution raw output

Exit code: 0

Stdout:

..............................                                           [100%]
30 passed in 0.19s

8. [High value] RedisInstrumentation._deserialize_value should recursively deserialize elements in a list ✓

Details
  • Verifies that each element in a list is deserialized individually, maintaining correct types.
  • Ensures nested data structures are handled properly during deserialization.
  • Prevents type inconsistencies when lists contain mixed or convertible types.
    def test_list_deserializes_elements(self, instrumentation):
        """Lists should have each element deserialized recursively."""
        result = instrumentation._deserialize_value([42.0, "text", None])
        # 42.0 is an integer float — should become int
        assert result == [42, "text", None]
        assert isinstance(result[0], int)
Test execution raw output

Exit code: 0

Stdout:

..............................                                           [100%]
30 passed in 0.19s

9. [High value] RedisInstrumentation._get_pipeline_commands should return command_stack when present ✓

Details
  • Verifies that the function correctly extracts commands from the command_stack attribute of a standard Pipeline object.
  • Ensures instrumentation relies on accurate command retrieval for standard Pipeline usage.
    def test_returns_command_stack_attribute(self, instrumentation):
        """Should return command_stack when pipeline has that attribute (standard Pipeline)."""
        commands = ["cmd1", "cmd2"]
        pipeline = SimpleNamespace(command_stack=commands)
        result = instrumentation._get_pipeline_commands(pipeline)
        assert result == commands
Test execution raw output

Exit code: 0

Stdout:

..............................                                           [100%]
30 passed in 0.19s

10. [High value] RedisInstrumentation._get_pipeline_commands should return _execution_strategy._command_queue for sync ClusterPipeline

Details
  • Ensures command extraction logic works for synchronous cluster pipelines.
  • Critical for accurate record/replay of Redis cluster operations.
  • Verifies access to private command queue attribute in pipeline object.
    def test_returns_execution_strategy_private_command_queue(self, instrumentation):
        """Should return _execution_strategy._command_queue for sync ClusterPipeline."""
        commands = ["SET key val", "GET key"]
        strategy = SimpleNamespace(_command_queue=commands)
        pipeline = SimpleNamespace(_execution_strategy=strategy)
        result = instrumentation._get_pipeline_commands(pipeline)
        assert result == commands
Test execution raw output

Exit code: 0

Stdout:

..............................                                           [100%]
30 passed in 0.19s

11. [High value] RedisInstrumentation._get_pipeline_commands should return _execution_strategy.command_queue when _command_queue is absent ✓

Details
  • Validates fallback mechanism for command extraction from pipeline objects
  • Ensures compatibility with pipeline implementations lacking _command_queue attribute
  • Prevents errors by confirming alternative attribute access is handled correctly
    def test_returns_execution_strategy_public_command_queue_fallback(self, instrumentation):
        """Should return _execution_strategy.command_queue when _command_queue is absent."""
        commands = ["SET", "GET"]
        strategy = SimpleNamespace(command_queue=commands)
        pipeline = SimpleNamespace(_execution_strategy=strategy)
        result = instrumentation._get_pipeline_commands(pipeline)
        assert result == commands
Test execution raw output

Exit code: 0

Stdout:

..............................                                           [100%]
30 passed in 0.19s

12. [High value] RedisInstrumentation._get_pipeline_commands should return _command_queue directly for async ClusterPipeline

Details
  • Validates that async ClusterPipeline objects are handled correctly by returning their _command_queue attribute.
  • Ensures compatibility with async Redis instrumentation workflows.
  • Prevents potential issues with command extraction in async pipeline scenarios.
    def test_returns_direct_command_queue_for_async_cluster(self, instrumentation):
        """Should return _command_queue directly for async ClusterPipeline."""
        commands = ["HSET", "HGET"]
        pipeline = SimpleNamespace(_command_queue=commands)
        result = instrumentation._get_pipeline_commands(pipeline)
        assert result == commands
Test execution raw output

Exit code: 0

Stdout:

..............................                                           [100%]
30 passed in 0.19s

13. [High value] RedisInstrumentation._get_pipeline_commands should return empty list when no known command storage attributes are present ✓

Details
  • Ensures method handles pipelines without expected command storage attributes gracefully.
  • Prevents potential errors or exceptions in edge cases where pipeline structure is unexpected.
  • Confirms robustness of command extraction logic.
    def test_returns_empty_list_when_no_known_attributes(self, instrumentation):
        """Should return empty list when pipeline has none of the known command storage attributes."""
        pipeline = SimpleNamespace()
        result = instrumentation._get_pipeline_commands(pipeline)
        assert result == []
Test execution raw output

Exit code: 0

Stdout:

..............................                                           [100%]
30 passed in 0.19s

14. [High value] RedisInstrumentation._get_pipeline_commands should prioritize _execution_strategy._command_queue over command_stack

Details
  • Verifies that _get_pipeline_commands returns commands from _execution_strategy._command_queue when both _execution_strategy._command_queue and command_stack are present.
  • Ensures compatibility with redis-py ClusterPipeline where command_stack may be empty but _execution_strategy._command_queue contains the actual commands.
  • Prevents instrumentation errors by guaranteeing the correct command source is used for accurate tracing.
    def test_execution_strategy_takes_priority_over_command_stack(self, instrumentation):
        """_execution_strategy._command_queue must take priority over command_stack (redis-py #3703)."""
        real_commands = ["SET k v"]
        fake_empty = []
        strategy = SimpleNamespace(_command_queue=real_commands)
        # ClusterPipeline has both attrs but command_stack is always empty
        pipeline = SimpleNamespace(_execution_strategy=strategy, command_stack=fake_empty)
        result = instrumentation._get_pipeline_commands(pipeline)
        assert result is real_commands
Test execution raw output

Exit code: 0

Stdout:

..............................                                           [100%]
30 passed in 0.19s

15. [High value] RedisInstrumentation._serialize_value should wrap floats in {'__float__': True, 'value': ...}

Details
  • Ensures float values are serialized with a wrapper for type preservation.
  • Prevents loss of float type information during record/replay cycles.
  • Critical for accurate deserialization and data integrity.
    def test_float_serialized_to_wrapper(self, instrumentation):
        """Floats should be wrapped in {'__float__': True, 'value': ...}."""
        result = instrumentation._serialize_value(3.14)
        assert result == {"__float__": True, "value": 3.14}
Test execution raw output

Exit code: 0

Stdout:

..............................                                           [100%]
30 passed in 0.19s

16. [High value] RedisInstrumentation._serialize_value should wrap float zero in float wrapper object ✓

Details
  • Ensures that 0.0 is serialized as a dictionary with __float__ and value keys.
  • Prevents inconsistencies or edge case errors when handling float zero values in Redis instrumentation.
    def test_float_zero_serialized_to_wrapper(self, instrumentation):
        """Float zero should also be wrapped."""
        result = instrumentation._serialize_value(0.0)
        assert result == {"__float__": True, "value": 0.0}
Test execution raw output

Exit code: 0

Stdout:

..............................                                           [100%]
30 passed in 0.19s

17. [High value] RedisInstrumentation._serialize_value should return integers as-is without wrapping ✓

Details
  • Verifies that integer values are not altered or wrapped during serialization.
  • Maintains type integrity and avoids unnecessary processing for integer inputs.
  • Ensures efficient handling of numeric data in Redis instrumentation.
    def test_int_returned_as_is(self, instrumentation):
        """Integers should be returned as-is (no wrapping)."""
        result = instrumentation._serialize_value(42)
        assert result == 42
        assert isinstance(result, int)
Test execution raw output

Exit code: 0

Stdout:

..............................                                           [100%]
30 passed in 0.19s

18. [High value] RedisInstrumentation._serialize_value should return bools as-is ✓

Details
  • Ensures boolean values are not converted or serialized to other types.
  • Maintains logical correctness when handling boolean data in Redis instrumentation.
    def test_bool_returned_as_is(self, instrumentation):
        """Bools should be returned as-is."""
        assert instrumentation._serialize_value(True) is True
        assert instrumentation._serialize_value(False) is False
Test execution raw output

Exit code: 0

Stdout:

..............................                                           [100%]
30 passed in 0.19s

19. [High value] RedisInstrumentation._serialize_value should return strings unchanged ✓

Details
  • Verifies that string inputs are not altered during serialization.
  • Prevents unintended data transformation for string values.
  • Maintains data integrity when handling string types.
    def test_string_returned_as_is(self, instrumentation):
        """Strings should be returned unchanged."""
        result = instrumentation._serialize_value("hello")
        assert result == "hello"
Test execution raw output

Exit code: 0

Stdout:

..............................                                           [100%]
30 passed in 0.19s

20. [High value] RedisInstrumentation._serialize_value should return None unchanged ✓

Details
  • Ensures that None values are not altered during serialization.
  • Validates correct handling of null values, which is important for downstream logic.
    def test_none_returned_as_is(self, instrumentation):
        """None should be returned unchanged."""
        result = instrumentation._serialize_value(None)
        assert result is None
Test execution raw output

Exit code: 0

Stdout:

..............................                                           [100%]
30 passed in 0.19s

21. [High value] RedisInstrumentation._serialize_value should wrap UTF-8 bytes in __bytes__ dict ✓

Details
  • Ensures UTF-8 encoded bytes are serialized with encoding and type information.
  • Prevents loss of binary data fidelity during replay or deserialization.
  • Validates that the output format is consistent for bytes input.
    def test_bytes_utf8_serialized_to_wrapper(self, instrumentation):
        """Bytes that decode as UTF-8 should be wrapped in __bytes__ dict."""
        result = instrumentation._serialize_value(b"hello")
        assert result == {"__bytes__": True, "encoding": "utf8", "value": "hello"}
Test execution raw output

Exit code: 0

Stdout:

..............................                                           [100%]
30 passed in 0.19s

22. [High value] RedisInstrumentation._serialize_value should recursively serialize list elements, including floats ✓

Details
  • Verifies that lists are processed element-wise for serialization.
  • Ensures that float values within lists are wrapped with the expected serialization format.
  • Confirms that mixed-type lists retain correct serialization for each element.
    def test_list_with_float_serialized_recursively(self, instrumentation):
        """Lists containing floats should have their elements serialized."""
        result = instrumentation._serialize_value([1.5, "x", 99])
        assert result == [{"__float__": True, "value": 1.5}, "x", 99]
Test execution raw output

Exit code: 0

Stdout:

..............................                                           [100%]
30 passed in 0.19s

23. [High value] RedisInstrumentation._serialize_value and _deserialize_value should roundtrip float values ✓

Details
  • Ensures that float values are accurately preserved through serialization and deserialization.
  • Critical for maintaining data integrity during record/replay operations.
  • Verifies that the deserialized value matches the original and retains its type.
    def test_serialize_deserialize_float_roundtrip(self, instrumentation):
        """Serializing then deserializing a float should recover the original value."""
        original = 2.71828
        serialized = instrumentation._serialize_value(original)
        deserialized = instrumentation._deserialize_value(serialized)
        assert deserialized == original
        assert isinstance(deserialized, float)
Test execution raw output

Exit code: 0

Stdout:

..............................                                           [100%]
30 passed in 0.19s

24. [High value] RedisInstrumentation._traced_async_cluster_pipeline_execute should await original_execute directly in DISABLED mode ✓

Details
  • Ensures that instrumentation does not alter async execution when mode is DISABLED.
  • Verifies that the original async pipeline execution is preserved without additional tracing or wrapping.
  • Confirms that the function awaits and returns the result from original_execute as expected.
    def test_disabled_mode_calls_original_execute(self, instrumentation):
        """In DISABLED mode, should await original_execute directly."""
        pipeline = SimpleNamespace(command_stack=[])
        original_execute = AsyncMock(return_value=["async_result"])
        sdk = self._make_sdk(TuskDriftMode.DISABLED)

        result = asyncio.get_event_loop().run_until_complete(
            instrumentation._traced_async_cluster_pipeline_execute(pipeline, original_execute, sdk, args=(), kwargs={})
        )

        original_execute.assert_called_once_with(pipeline)
        assert result == ["async_result"]
Test execution raw output

Exit code: 0

Stdout:

..............................                                           [100%]
30 passed in 0.19s

25. [High value] RedisInstrumentation._traced_async_cluster_pipeline_execute should route to handle_replay_mode in REPLAY mode ✓

Details
  • Verifies async replay mode prevents real Redis calls by routing to handle_replay_mode.
  • Ensures that original_execute is not called when in REPLAY mode.
  • Confirms correct handling of command stack during replay.
    def test_replay_mode_uses_handle_replay_mode(self, instrumentation):
        """In REPLAY mode, should route to handle_replay_mode."""
        commands = ["HSET h k v"]
        pipeline = SimpleNamespace(command_stack=commands)
        original_execute = AsyncMock()
        sdk = self._make_sdk(TuskDriftMode.REPLAY)

        with patch(
            "drift.instrumentation.redis.instrumentation.handle_replay_mode",
            return_value=["async_mocked"],
        ) as mock_handle_replay:
            result = asyncio.get_event_loop().run_until_complete(
                instrumentation._traced_async_cluster_pipeline_execute(
                    pipeline, original_execute, sdk, args=(), kwargs={}
                )
            )

        mock_handle_replay.assert_called_once()
        assert result == ["async_mocked"]
        original_execute.assert_not_called()
Test execution raw output

Exit code: 0

Stdout:

..............................                                           [100%]
30 passed in 0.19s

26. [High value] RedisInstrumentation._traced_async_cluster_pipeline_execute should delegate to _record_async_pipeline_execute in RECORD mode ✓

Details
  • Ensures async pipeline commands are properly recorded for replay when in RECORD mode.
  • Verifies correct delegation to _record_async_pipeline_execute to maintain expected instrumentation behavior.
  • Prevents potential issues with missing or incorrect command recording in async cluster pipelines.
    def test_record_mode_delegates_to_record_async_pipeline_execute(self, instrumentation):
        """In RECORD mode, should delegate to _record_async_pipeline_execute."""
        commands = ["SET k v"]
        pipeline = SimpleNamespace(command_stack=commands)
        original_execute = AsyncMock()
        sdk = self._make_sdk(TuskDriftMode.RECORD)

        with patch.object(
            instrumentation,
            "_record_async_pipeline_execute",
            new_callable=AsyncMock,
            return_value=["async_recorded"],
        ) as mock_record_async:
            result = asyncio.get_event_loop().run_until_complete(
                instrumentation._traced_async_cluster_pipeline_execute(
                    pipeline, original_execute, sdk, args=(), kwargs={}
                )
            )

        mock_record_async.assert_called_once()
        assert result == ["async_recorded"]
Test execution raw output

Exit code: 0

Stdout:

..............................                                           [100%]
30 passed in 0.19s

27. [High value] RedisInstrumentation._traced_cluster_pipeline_execute should call original_execute directly in DISABLED mode ✓

Details
  • Ensures that instrumentation does not alter or wrap the execution when mode is DISABLED.
  • Verifies that the original pipeline execution logic is preserved without additional tracing or modification.
    def test_disabled_mode_calls_original_execute(self, instrumentation):
        """In DISABLED mode, _traced_cluster_pipeline_execute should call original_execute directly."""
        pipeline = SimpleNamespace(command_stack=[])
        original_execute = MagicMock(return_value=["result"])
        sdk = self._make_sdk(TuskDriftMode.DISABLED)

        result = instrumentation._traced_cluster_pipeline_execute(pipeline, original_execute, sdk, args=(), kwargs={})

        original_execute.assert_called_once_with(pipeline)
        assert result == ["result"]
Test execution raw output

Exit code: 0

Stdout:

..............................                                           [100%]
30 passed in 0.19s

28. [High value] RedisInstrumentation._traced_cluster_pipeline_execute should route to handle_replay_mode in REPLAY mode ✓

Details
  • Verifies that in REPLAY mode, the function avoids real Redis calls.
  • Ensures that handle_replay_mode is invoked instead of executing the pipeline.
  • Prevents unintended side effects during replay by not calling the original execute method.
    def test_replay_mode_uses_handle_replay_mode(self, instrumentation):
        """In REPLAY mode, should route to handle_replay_mode."""
        commands = ["SET k v", "GET k"]
        pipeline = SimpleNamespace(command_stack=commands)
        original_execute = MagicMock()
        sdk = self._make_sdk(TuskDriftMode.REPLAY)

        with patch(
            "drift.instrumentation.redis.instrumentation.handle_replay_mode",
            return_value=["mocked_result"],
        ) as mock_handle_replay:
            result = instrumentation._traced_cluster_pipeline_execute(
                pipeline, original_execute, sdk, args=(), kwargs={}
            )

        mock_handle_replay.assert_called_once()
        assert result == ["mocked_result"]
        original_execute.assert_not_called()
Test execution raw output

Exit code: 0

Stdout:

..............................                                           [100%]
30 passed in 0.19s

29. [High value] _traced_cluster_pipeline_execute should route to handle_record_mode in RECORD mode ✓

Details
  • Ensures that when in RECORD mode, the function delegates command handling to handle_record_mode.
  • Validates that commands are properly recorded for later replay, maintaining expected instrumentation behavior.
    def test_record_mode_uses_handle_record_mode(self, instrumentation):
        """In RECORD mode, should route to handle_record_mode."""
        commands = ["SET k v"]
        pipeline = SimpleNamespace(command_stack=commands)
        original_execute = MagicMock()
        sdk = self._make_sdk(TuskDriftMode.RECORD)

        with patch(
            "drift.instrumentation.redis.instrumentation.handle_record_mode",
            return_value=["recorded"],
        ) as mock_handle_record:
            result = instrumentation._traced_cluster_pipeline_execute(
                pipeline, original_execute, sdk, args=(), kwargs={}
            )

        mock_handle_record.assert_called_once()
        assert result == ["recorded"]
Test execution raw output

Exit code: 0

Stdout:

..............................                                           [100%]
30 passed in 0.19s

30. [High value] RedisInstrumentation._traced_cluster_pipeline_execute should snapshot command queue before original_execute runs ✓

Details
  • Verifies that the command queue is captured before being cleared by the original execute method.
  • Prevents loss of command data necessary for accurate record/replay of cluster pipeline operations.
  • Ensures the snapshot contains the intended commands, not an empty or cleared list.
    def test_command_queue_snapshotted_before_execution(self, instrumentation):
        """Command queue must be snapshotted before original_execute runs.

        ClusterPipeline.execute() clears the queue in its finally block, so
        _traced_cluster_pipeline_execute must capture it beforehand.
        """
        commands_before = ["SET k v", "INCR counter"]
        # Simulate ClusterPipeline resetting queue during execute
        pipeline = MagicMock()
        pipeline._execution_strategy = SimpleNamespace(_command_queue=commands_before[:])

        def side_effect_execute(p, *a, **kw):
            # Simulate ClusterPipeline clearing queue in finally
            pipeline._execution_strategy._command_queue.clear()
            return ["ok", 1]

        original_execute = MagicMock(side_effect=side_effect_execute)
        sdk = self._make_sdk(TuskDriftMode.RECORD)

        captured_command_stack = None

        def fake_record_pipeline_execute(p, orig, s, a, kw, cmd_str, cmd_stack, is_pre):
            nonlocal captured_command_stack
            captured_command_stack = cmd_stack
            return ["ok", 1]

        with patch.object(instrumentation, "_record_pipeline_execute", side_effect=fake_record_pipeline_execute):
            with patch(
                "drift.instrumentation.redis.instrumentation.handle_record_mode",
                side_effect=lambda original_function_call, record_mode_handler, span_kind: record_mode_handler(False),
            ):
                instrumentation._traced_cluster_pipeline_execute(pipeline, original_execute, sdk, args=(), kwargs={})

        # The snapshot should contain the original commands, not the cleared list
        assert captured_command_stack == commands_before
Test execution raw output

Exit code: 0

Stdout:

..............................                                           [100%]
30 passed in 0.19s

Legend:
✓ - test passed (your code handles the scenario)
✗ - test failed (your code does not handle the scenario)