Skip to content

Commit 09e2ea0

Browse files
feat: adding max_batch_operations config
1 parent d825052 commit 09e2ea0

2 files changed

Lines changed: 48 additions & 5 deletions

File tree

src/aws_durable_execution_sdk_python/state.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,12 @@ class CheckpointBatcherConfig:
4545
Attributes:
4646
max_batch_size_bytes: Maximum batch size in bytes (default: 750KB)
4747
max_batch_time_seconds: Maximum time to wait before flushing batch (default: 1.0 second)
48-
max_batch_operations: Maximum number of operations per batch (default: unlimited)
48+
max_batch_operations: Maximum number of operations per batch (default: 250)
4949
"""
5050

51-
max_batch_size_bytes: int = 750 * 1024 # 750KB - private readonly MAX_PAYLOAD_SIZE
52-
max_batch_time_seconds: float = 1.0 # 1 second default
53-
max_batch_operations: int | float = float("inf") # No operation limit by default
51+
max_batch_size_bytes: int = 750 * 1024 # 750KB
52+
max_batch_time_seconds: float = 1.0
53+
max_batch_operations: int = 250
5454

5555

5656
@dataclass(frozen=True)

tests/state_test.py

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -779,7 +779,7 @@ def test_checkpoint_batcher_config_default_values():
779779

780780
assert config.max_batch_size_bytes == 750 * 1024 # 750KB
781781
assert config.max_batch_time_seconds == 1.0
782-
assert config.max_batch_operations == float("inf")
782+
assert config.max_batch_operations == 250
783783

784784

785785
def test_checkpoint_batcher_config_custom_values():
@@ -803,6 +803,49 @@ def test_checkpoint_batcher_config_immutable():
803803
config.max_batch_size_bytes = 1000
804804

805805

806+
def test_checkpoint_batch_respects_default_max_items_limit():
807+
"""Test that batch collection respects the default MAX_ITEMS_IN_BATCH (250) limit.
808+
809+
This ensures consistency across all Durable Execution SDK implementations.
810+
"""
811+
mock_lambda_client = Mock(spec=LambdaClient)
812+
813+
# Use default config (max_batch_operations=250)
814+
config = CheckpointBatcherConfig(
815+
max_batch_size_bytes=10 * 1024 * 1024, # 10MB - large enough to not trigger size limit
816+
max_batch_time_seconds=10.0,
817+
)
818+
819+
state = ExecutionState(
820+
durable_execution_arn="test_arn",
821+
initial_checkpoint_token="token123", # noqa: S106
822+
operations={},
823+
service_client=mock_lambda_client,
824+
batcher_config=config,
825+
)
826+
827+
# Enqueue 300 small operations (exceeds MAX_ITEMS_IN_BATCH of 250)
828+
for i in range(300):
829+
operation_update = OperationUpdate(
830+
operation_id=f"op_{i}",
831+
operation_type=OperationType.STEP,
832+
action=OperationAction.START,
833+
)
834+
state._checkpoint_queue.put(QueuedOperation(operation_update, None))
835+
836+
# Collect first batch
837+
batch1 = state._collect_checkpoint_batch()
838+
839+
# First batch should have exactly 250 items
840+
assert len(batch1) == 250
841+
842+
# Collect second batch
843+
batch2 = state._collect_checkpoint_batch()
844+
845+
# Second batch should have remaining 50 items
846+
assert len(batch2) == 50
847+
848+
806849
def test_calculate_operation_size_with_operation():
807850
"""Test _calculate_operation_size with a real operation."""
808851
operation_update = OperationUpdate(

0 commit comments

Comments
 (0)