Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions drift/core/communication/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,8 @@ def _python_to_value(value: Any) -> Any:
from betterproto.lib.google.protobuf import ListValue, Value

if value is None:
from betterproto.lib.google.protobuf import NullValue

return Value(null_value=NullValue.NULL_VALUE) # type: ignore[arg-type]
# betterproto 2.0.0b7 uses integer 0 for null value (NullValue.NULL_VALUE doesn't exist)
return Value(null_value=0) # type: ignore[arg-type]
elif isinstance(value, bool):
return Value(bool_value=value)
elif isinstance(value, (int, float)):
Expand Down
5 changes: 2 additions & 3 deletions drift/core/span_serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,8 @@ def _value_to_proto(value: Any) -> ProtoValue:
proto_value = ProtoValue()

if value is None:
from betterproto.lib.google.protobuf import NullValue

proto_value.null_value = NullValue.NULL_VALUE # type: ignore[assignment]
# betterproto 2.0.0b7 uses integer 0 for null value (NullValue.NULL_VALUE doesn't exist)
proto_value.null_value = 0 # type: ignore[assignment]
elif isinstance(value, bool):
proto_value.bool_value = value
elif isinstance(value, (int, float)):
Expand Down
5 changes: 2 additions & 3 deletions drift/core/tracing/adapters/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,9 +239,8 @@ def _dict_to_struct(data: dict[str, Any]) -> Struct:
def value_to_proto(val: Any) -> Value:
"""Convert a Python value to protobuf Value."""
if val is None:
from betterproto.lib.google.protobuf import NullValue

return Value(null_value=NullValue.NULL_VALUE) # type: ignore[arg-type]
# betterproto 2.0.0b7 uses integer 0 for null value (NullValue.NULL_VALUE doesn't exist)
return Value(null_value=0) # type: ignore[arg-type]
elif isinstance(val, bool):
return Value(bool_value=val)
elif isinstance(val, (int, float)):
Expand Down
156 changes: 156 additions & 0 deletions drift/instrumentation/redis/e2e-tests/src/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,162 @@ def redis_keys(pattern):
return jsonify({"error": str(e)}), 500


@app.route("/test/mget-mset", methods=["GET"])
def test_mget_mset():
"""Test MGET/MSET - multiple key operations."""
try:
# MSET multiple keys
redis_client.mset({"test:mset:key1": "value1", "test:mset:key2": "value2", "test:mset:key3": "value3"})
# MGET multiple keys
result = redis_client.mget(["test:mset:key1", "test:mset:key2", "test:mset:key3", "test:mset:nonexistent"])
# Clean up
redis_client.delete("test:mset:key1", "test:mset:key2", "test:mset:key3")
return jsonify({"success": True, "result": result})
except Exception as e:
return jsonify({"error": str(e)}), 500


@app.route("/test/pipeline-basic", methods=["GET"])
def test_pipeline_basic():
"""Test basic pipeline operations."""
try:
pipe = redis_client.pipeline()
pipe.set("test:pipe:key1", "value1")
pipe.set("test:pipe:key2", "value2")
pipe.get("test:pipe:key1")
pipe.get("test:pipe:key2")
results = pipe.execute()
# Clean up
redis_client.delete("test:pipe:key1", "test:pipe:key2")
return jsonify({"success": True, "results": results})
except Exception as e:
return jsonify({"error": str(e)}), 500


@app.route("/test/pipeline-no-transaction", methods=["GET"])
def test_pipeline_no_transaction():
"""Test pipeline with transaction=False."""
try:
pipe = redis_client.pipeline(transaction=False)
pipe.set("test:pipe:notx:key1", "value1")
pipe.incr("test:pipe:notx:counter")
pipe.get("test:pipe:notx:key1")
results = pipe.execute()
# Clean up
redis_client.delete("test:pipe:notx:key1", "test:pipe:notx:counter")
return jsonify({"success": True, "results": results})
except Exception as e:
return jsonify({"error": str(e)}), 500


@app.route("/test/async-pipeline", methods=["GET"])
def test_async_pipeline():
"""Test async pipeline operations using asyncio."""
import asyncio

import redis.asyncio as aioredis

async def run_async_pipeline():
# Create async Redis client
async_client = aioredis.Redis(
host=os.getenv("REDIS_HOST", "redis"),
port=int(os.getenv("REDIS_PORT", "6379")),
db=0,
decode_responses=True,
)

try:
# Create async pipeline
pipe = async_client.pipeline()
pipe.set("test:async:pipe:key1", "async_value1")
pipe.set("test:async:pipe:key2", "async_value2")
pipe.get("test:async:pipe:key1")
pipe.get("test:async:pipe:key2")
results = await pipe.execute()

# Clean up
await async_client.delete("test:async:pipe:key1", "test:async:pipe:key2")

return results
finally:
await async_client.aclose()

try:
results = asyncio.run(run_async_pipeline())
return jsonify({"success": True, "results": results})
except Exception as e:
return jsonify({"error": str(e)}), 500


@app.route("/test/binary-data", methods=["GET"])
def test_binary_data():
"""Test binary data that cannot be decoded as UTF-8."""
try:
# Create a Redis client without decode_responses for binary data
binary_client = redis.Redis(
host=os.getenv("REDIS_HOST", "redis"),
port=int(os.getenv("REDIS_PORT", "6379")),
db=0,
decode_responses=False,
)

# Binary data that cannot be decoded as UTF-8
binary_value = bytes([0x80, 0x81, 0x82, 0xFF, 0xFE, 0xFD])

# Set binary data
binary_client.set("test:binary:key", binary_value)

# Get binary data back
retrieved = binary_client.get("test:binary:key")

# Clean up
binary_client.delete("test:binary:key")

return jsonify(
{
"success": True,
"original_hex": binary_value.hex(),
"retrieved_hex": retrieved.hex() if retrieved else None,
"match": binary_value == retrieved,
}
)
except Exception as e:
return jsonify({"error": str(e)}), 500


@app.route("/test/transaction-watch", methods=["GET"])
def test_transaction_watch():
"""Test transaction with WATCH pattern.

This tests whether WATCH/MULTI/EXEC transaction pattern works correctly.
"""
try:
# Set initial value
redis_client.set("test:watch:counter", "10")

# Start a watched transaction
pipe = redis_client.pipeline(transaction=True)
pipe.watch("test:watch:counter")

# Get current value (this happens outside the transaction)
current = int(redis_client.get("test:watch:counter"))

# Start the transaction
pipe.multi()
pipe.set("test:watch:counter", str(current + 5))
pipe.get("test:watch:counter")

# Execute
results = pipe.execute()

# Clean up
redis_client.delete("test:watch:counter")

return jsonify({"success": True, "initial_value": 10, "expected_final": 15, "results": results})
except Exception as e:
return jsonify({"error": str(e)}), 500


if __name__ == "__main__":
sdk.mark_app_as_ready()
app.run(host="0.0.0.0", port=8000, debug=False)
17 changes: 15 additions & 2 deletions drift/instrumentation/redis/e2e-tests/src/test_requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ def make_request(method, endpoint, **kwargs):
# Get operations
make_request("GET", "/redis/get/test_key")
make_request("GET", "/redis/get/test_key_expiry")
# TODO: figure out why this test fails during replay
# make_request("GET", "/redis/get/nonexistent_key")
make_request("GET", "/redis/get/nonexistent_key")

# Increment operations
make_request("POST", "/redis/incr/counter")
Expand All @@ -49,4 +48,18 @@ def make_request(method, endpoint, **kwargs):
make_request("DELETE", "/redis/delete/test_key")
make_request("DELETE", "/redis/delete/counter")

make_request("GET", "/test/mget-mset")

# Pipeline operations
make_request("GET", "/test/pipeline-basic")
make_request("GET", "/test/pipeline-no-transaction")

# Async Pipeline operations
make_request("GET", "/test/async-pipeline")

# Binary data handling
make_request("GET", "/test/binary-data")

make_request("GET", "/test/transaction-watch")

print("\nAll requests completed successfully")
Loading