Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 169 additions & 0 deletions python/pyfory/meta/meta_compressor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import zlib
from abc import ABC, abstractmethod
from typing import Optional


class MetaCompressor(ABC):
"""
An interface used to compress class metadata such as field names and types.
The implementation of this interface should be thread safe.
"""

@abstractmethod
def compress(self, data: bytes, offset: int = 0, size: Optional[int] = None) -> bytes:
"""
Compress the given data.

Args:
data: The data to compress
offset: Starting offset in the data
size: Size of data to compress (if None, uses len(data) - offset)

Returns:
Compressed data as bytes
"""
pass

@abstractmethod
def decompress(self, data: bytes, offset: int = 0, size: Optional[int] = None) -> bytes:
"""
Decompress the given data.

Args:
data: The compressed data to decompress
offset: Starting offset in the data
size: Size of data to decompress (if None, uses len(data) - offset)

Returns:
Decompressed data as bytes
"""
pass


class DeflaterMetaCompressor(MetaCompressor):
"""
A meta compressor based on zlib compression algorithm (equivalent to Java's Deflater).
This implementation is thread safe.
"""

def compress(self, data: bytes, offset: int = 0, size: Optional[int] = None) -> bytes:
"""
Compress the given data using zlib.

Args:
data: The data to compress
offset: Starting offset in the data
size: Size of data to compress (if None, uses len(data) - offset)

Returns:
Compressed data as bytes
"""
if size is None:
size = len(data) - offset

if size <= 0:
return b""

# Use zlib.compress which is equivalent to Java's Deflater
return zlib.compress(data[offset : offset + size])

def decompress(self, data: bytes, offset: int = 0, size: Optional[int] = None) -> bytes:
"""
Decompress the given data using zlib.

Args:
data: The compressed data to decompress
offset: Starting offset in the data
size: Size of data to decompress (if None, uses len(data) - offset)

Returns:
Decompressed data as bytes
"""
if size is None:
size = len(data) - offset

if size <= 0:
return b""

# Use zlib.decompress which is equivalent to Java's Inflater
return zlib.decompress(data[offset : offset + size])

def __hash__(self) -> int:
"""Return hash code based on class type."""
return hash(DeflaterMetaCompressor)

def __eq__(self, other) -> bool:
"""Check equality based on class type."""
if self is other:
return True
return other is not None and isinstance(other, DeflaterMetaCompressor)


def check_meta_compressor(compressor: MetaCompressor) -> MetaCompressor:
"""
Check whether MetaCompressor implements `__eq__/__hash__` method. If not implemented,
return TypeEqualMetaCompressor instead which compare equality by the compressor type
for better serializer compile cache.

Args:
compressor: The compressor to check

Returns:
The compressor or a TypeEqualMetaCompressor wrapper
"""
# Check if the compressor has custom __eq__ and __hash__ methods
# by comparing with the default object methods
if compressor.__class__.__eq__ == object.__eq__ or compressor.__class__.__hash__ == object.__hash__:
return TypeEqualMetaCompressor(compressor)
return compressor


class TypeEqualMetaCompressor(MetaCompressor):
"""
A MetaCompressor wrapper which compare equality by the compressor type for better
serializer compile cache.
"""

def __init__(self, compressor: MetaCompressor):
"""
Initialize with the wrapped compressor.

Args:
compressor: The compressor to wrap
"""
self.compressor = compressor

def compress(self, data: bytes, offset: int = 0, size: Optional[int] = None) -> bytes:
"""Delegate compression to the wrapped compressor."""
return self.compressor.compress(data, offset, size)

def decompress(self, data: bytes, offset: int = 0, size: Optional[int] = None) -> bytes:
"""Delegate decompression to the wrapped compressor."""
return self.compressor.decompress(data, offset, size)

def __eq__(self, other) -> bool:
"""Check equality based on compressor class type."""
if other is None or not isinstance(other, TypeEqualMetaCompressor):
return False
return self.compressor.__class__ == other.compressor.__class__

def __hash__(self) -> int:
"""Return hash code based on compressor class type."""
return hash(self.compressor.__class__)
160 changes: 160 additions & 0 deletions python/pyfory/meta/test_meta_compressor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import pytest
import threading
from pyfory.meta.meta_compressor import MetaCompressor, DeflaterMetaCompressor, TypeEqualMetaCompressor, check_meta_compressor


@pytest.fixture
def compressor():
"""Fixture providing a DeflaterMetaCompressor instance."""
return DeflaterMetaCompressor()


@pytest.fixture
def test_data():
"""Fixture providing test data for compression tests."""
return b"This is some test data that should be compressed and decompressed correctly"


class TestDeflaterMetaCompressor:
def test_compress_decompress(self, compressor, test_data):
"""Test that compression and decompression work correctly."""
compressed = compressor.compress(test_data)
decompressed = compressor.decompress(compressed)

assert decompressed == test_data
assert len(compressed) < len(test_data)

def test_compress_decompress_with_offset(self, compressor, test_data):
"""Test compression and decompression with offset."""
offset = 5
size = 20
compressed = compressor.compress(test_data, offset, size)
decompressed = compressor.decompress(compressed)

assert decompressed == test_data[offset : offset + size]

def test_compress_decompress_empty_data(self, compressor):
"""Test compression and decompression with empty data."""
empty_data = b""
compressed = compressor.compress(empty_data)
decompressed = compressor.decompress(compressed)

assert decompressed == empty_data

def test_compress_decompress_small_data(self, compressor):
"""Test compression and decompression with small data."""
small_data = b"abc"
compressed = compressor.compress(small_data)
decompressed = compressor.decompress(compressed)

assert decompressed == small_data

def test_equality_and_hash(self, compressor):
"""Test equality and hash methods."""
compressor1 = DeflaterMetaCompressor()
compressor2 = DeflaterMetaCompressor()

# Test equality
assert compressor1 == compressor2
assert compressor1 == compressor1 # Same instance

# Test hash
assert hash(compressor1) == hash(compressor2)
assert hash(compressor1) == hash(DeflaterMetaCompressor)

def test_thread_safety(self):
"""Test that the compressor is thread safe (basic test)."""
results = []
errors = []

def compress_worker():
try:
compressor = DeflaterMetaCompressor()
for i in range(100):
data = f"Test data {i}".encode("utf-8")
compressed = compressor.compress(data)
decompressed = compressor.decompress(compressed)
if decompressed != data:
errors.append(f"Mismatch at iteration {i}")
results.append("success")
except Exception as e:
errors.append(str(e))

threads = []
for _ in range(4):
thread = threading.Thread(target=compress_worker)
threads.append(thread)
thread.start()

for thread in threads:
thread.join()

assert len(results) == 4
assert len(errors) == 0


class TestTypeEqualMetaCompressor:
def test_wrapper_functionality(self):
"""Test that TypeEqualMetaCompressor correctly wraps another compressor."""
original_compressor = DeflaterMetaCompressor()
wrapped_compressor = TypeEqualMetaCompressor(original_compressor)

test_data = b"Test data for wrapper"
compressed = wrapped_compressor.compress(test_data)
decompressed = wrapped_compressor.decompress(compressed)

assert decompressed == test_data

def test_equality_by_type(self):
"""Test that TypeEqualMetaCompressor compares equality by type."""
compressor1 = DeflaterMetaCompressor()
wrapped1 = TypeEqualMetaCompressor(compressor1)
wrapped2 = TypeEqualMetaCompressor(compressor1)

# Should be equal because they wrap the same type
assert wrapped1 == wrapped2
assert hash(wrapped1) == hash(wrapped2)


class TestCheckMetaCompressor:
def test_check_meta_compressor_with_proper_implementation(self):
"""Test check_meta_compressor with a compressor that has proper __eq__/__hash__."""
compressor = DeflaterMetaCompressor()
result = check_meta_compressor(compressor)

# Should return the original compressor since it has proper __eq__/__hash__
assert result is compressor

def test_check_meta_compressor_without_proper_implementation(self):
"""Test check_meta_compressor with a compressor that lacks proper __eq__/__hash__."""

class SimpleCompressor(MetaCompressor):
def compress(self, data, offset=0, size=None):
return data

def decompress(self, data, offset=0, size=None):
return data

compressor = SimpleCompressor()
result = check_meta_compressor(compressor)

# Should return a TypeEqualMetaCompressor wrapper
assert isinstance(result, TypeEqualMetaCompressor)
assert result.compressor is compressor
Loading