Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .github/workflows/master-windows.yml
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,12 @@ jobs:
use_oidc: true
directory: coverage-reports
fail_ci_if_error: false

- name: Upload coverage to Datadog
if: always()
continue-on-error: true
uses: DataDog/coverage-upload-github-action@9bbbf86d16f7db1b14c5b885e61cf0d96053686a # v1.0.0
with:
api_key: ${{ secrets.DD_API_KEY }}
files: coverage-reports
format: cobertura
9 changes: 9 additions & 0 deletions .github/workflows/master.yml
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,12 @@ jobs:
use_oidc: true
directory: coverage-reports
fail_ci_if_error: false

- name: Upload coverage to Datadog
if: always()
continue-on-error: true
uses: DataDog/coverage-upload-github-action@9bbbf86d16f7db1b14c5b885e61cf0d96053686a # v1.0.0
with:
api_key: ${{ secrets.DD_API_KEY }}
files: coverage-reports
format: cobertura
9 changes: 9 additions & 0 deletions .github/workflows/pr-all-windows.yml
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,12 @@ jobs:
use_oidc: true
directory: coverage-reports
fail_ci_if_error: false

- name: Upload coverage to Datadog
if: always()
continue-on-error: true
uses: DataDog/coverage-upload-github-action@9bbbf86d16f7db1b14c5b885e61cf0d96053686a # v1.0.0
with:
api_key: ${{ secrets.DD_API_KEY }}
files: coverage-reports
format: cobertura
9 changes: 9 additions & 0 deletions .github/workflows/pr-all.yml
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,12 @@ jobs:
use_oidc: true
directory: coverage-reports
fail_ci_if_error: false

- name: Upload coverage to Datadog
if: always()
continue-on-error: true
uses: DataDog/coverage-upload-github-action@9bbbf86d16f7db1b14c5b885e61cf0d96053686a # v1.0.0
with:
api_key: ${{ secrets.DD_API_KEY }}
files: coverage-reports
format: cobertura
9 changes: 9 additions & 0 deletions .github/workflows/pr-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,15 @@ jobs:
directory: coverage-reports
fail_ci_if_error: false

- name: Upload coverage to Datadog
if: always()
continue-on-error: true
uses: DataDog/coverage-upload-github-action@9bbbf86d16f7db1b14c5b885e61cf0d96053686a # v1.0.0
with:
api_key: ${{ secrets.DD_API_KEY }}
files: coverage-reports
format: cobertura

check:
needs:
- test
Expand Down
12 changes: 12 additions & 0 deletions .github/workflows/test-fips-e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -140,3 +140,15 @@ jobs:
use_oidc: true
files: "${{ inputs.target || 'tls' }}/coverage.xml"
flags: "${{ inputs.target || 'tls' }}"

- name: Upload coverage to Datadog
if: >
!github.event.repository.private &&
always()
continue-on-error: true
uses: DataDog/coverage-upload-github-action@9bbbf86d16f7db1b14c5b885e61cf0d96053686a # v1.0.0
with:
api_key: ${{ secrets.DD_API_KEY }}
files: "${{ inputs.target || 'tls' }}/coverage.xml"
format: cobertura
flags: "${{ inputs.target || 'tls' }}"
2 changes: 2 additions & 0 deletions code-coverage.datadog.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
schema-version: v1
carryforward: true
1 change: 1 addition & 0 deletions kafka_actions/changelog.d/23224.fixed
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix Avro deserialization for schemas with logical types (decimal, uuid, date, time, timestamp) that caused "Object of type ... is not JSON serializable" errors.
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@
"""Message deserialization for Kafka messages."""

import base64
import datetime
import decimal
import hashlib
import json
import uuid
from io import BytesIO

from bson import decode as bson_decode
Expand All @@ -17,6 +20,25 @@
SCHEMA_REGISTRY_MAGIC_BYTE = 0x00


class _AvroJSONEncoder(json.JSONEncoder):
"""JSON encoder that handles types returned by fastavro for Avro logical types."""

def default(self, obj):
if isinstance(obj, decimal.Decimal):
return str(obj)
if isinstance(obj, datetime.datetime):
return obj.isoformat()
if isinstance(obj, datetime.date):
return obj.isoformat()
if isinstance(obj, datetime.time):
return obj.isoformat()
if isinstance(obj, uuid.UUID):
return str(obj)
if isinstance(obj, bytes):
return base64.b64encode(obj).decode('ascii')
return super().default(obj)


def _read_varint(data):
shift = 0
result = 0
Expand Down Expand Up @@ -305,7 +327,7 @@ def _deserialize_avro(self, message: bytes, schema) -> str:
f"Read {bytes_read} bytes, but message has {total_bytes} bytes."
)

return json.dumps(data)
return json.dumps(data, cls=_AvroJSONEncoder)
except Exception as e:
raise ValueError(f"Failed to deserialize Avro message: {e}")

Expand Down
45 changes: 45 additions & 0 deletions kafka_actions/tests/test_message_deserializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,51 @@ def test_avro_explicit_schema_registry_configuration(self):
assert deserialized_msg_sr.value['title'] == 'The Go Programming Language'
assert deserialized_msg_sr.value_schema_id == 350

def test_avro_logical_types_decimal_timestamp_uuid(self):
"""Test that Avro messages with all logical types deserialize correctly."""
log = MagicMock()
deserializer = MessageDeserializer(log)

avro_schema = json.dumps(
{
"type": "record",
"name": "Payment",
"fields": [
{"name": "id", "type": "long"},
{
"name": "amount",
"type": {"type": "bytes", "logicalType": "decimal", "precision": 18, "scale": 4},
},
{"name": "created_at", "type": {"type": "long", "logicalType": "timestamp-millis"}},
{"name": "due_date", "type": {"type": "int", "logicalType": "date"}},
{"name": "due_time", "type": {"type": "int", "logicalType": "time-millis"}},
{"name": "tx_id", "type": {"type": "string", "logicalType": "uuid"}},
{"name": "memo", "type": "string"},
],
}
)

# Payment: id=42, amount=99.9500, created_at=2024-06-15T12:00:00Z, due_date=2024-07-01,
# due_time=14:30:00, tx_id=550e8400-e29b-41d4-a716-446655440000, memo="Test payment"
avro_message = (
b'T\x06\x0f@L\x80\xa8\xa6\xbc\x83d\x82\xb7\x02'
b'\x80\x89\xe41'
b'H550e8400-e29b-41d4-a716-446655440000\x18Test payment'
)

result_str, schema_id = deserializer.deserialize_message(avro_message, 'avro', avro_schema, False)
assert result_str is not None
assert schema_id is None

result = json.loads(result_str)
assert result['id'] == 42
assert result['amount'] == '99.9500'
assert '2024-06-15' in result['created_at']
assert result['due_date'] == '2024-07-01'
assert result['due_time'] == '14:30:00'
assert result['tx_id'] == '550e8400-e29b-41d4-a716-446655440000'
assert result['memo'] == 'Test payment'

def test_protobuf_explicit_schema_registry_configuration(self):
"""Test that explicit Protobuf schema registry configuration is enforced."""
log = MagicMock()
Expand Down
Loading