Skip to content

Commit 4e4d835

Browse files
authored
feat: Support gzip compression for DynamoDB environment documents (#6816)
1 parent 800ac8d commit 4e4d835

17 files changed

Lines changed: 611 additions & 40 deletions

File tree

api/app/settings/common.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -986,21 +986,21 @@
986986
DEFAULT_ORG_STORE_TRAITS_VALUE = env.bool("DEFAULT_ORG_STORE_TRAITS_VALUE", True)
987987

988988
# DynamoDB table name for storing environment
989-
ENVIRONMENTS_TABLE_NAME_DYNAMO = env.str("ENVIRONMENTS_TABLE_NAME_DYNAMO", None)
989+
ENVIRONMENTS_TABLE_NAME_DYNAMO = env.str("ENVIRONMENTS_TABLE_NAME_DYNAMO", "")
990990

991991
# V2 was created to improve storage over overrides data.
992-
ENVIRONMENTS_V2_TABLE_NAME_DYNAMO = env.str("ENVIRONMENTS_V2_TABLE_NAME_DYNAMO", None)
992+
ENVIRONMENTS_V2_TABLE_NAME_DYNAMO = env.str("ENVIRONMENTS_V2_TABLE_NAME_DYNAMO", "")
993993

994994
# DynamoDB table name for storing identities
995-
IDENTITIES_TABLE_NAME_DYNAMO = env.str("IDENTITIES_TABLE_NAME_DYNAMO", None)
995+
IDENTITIES_TABLE_NAME_DYNAMO = env.str("IDENTITIES_TABLE_NAME_DYNAMO", "")
996996

997997
# DynamoDB table name for storing environment api keys
998998
ENVIRONMENTS_API_KEY_TABLE_NAME_DYNAMO = env.str(
999-
"ENVIRONMENTS_API_KEY_TABLE_NAME_DYNAMO", None
999+
"ENVIRONMENTS_API_KEY_TABLE_NAME_DYNAMO", ""
10001000
)
10011001

10021002
# DynamoDB table name for storing project metadata(currently only used for identity migration)
1003-
PROJECT_METADATA_TABLE_NAME_DYNAMO = env.str("PROJECT_METADATA_TABLE_NAME_DYNAMO", None)
1003+
PROJECT_METADATA_TABLE_NAME_DYNAMO = env.str("PROJECT_METADATA_TABLE_NAME_DYNAMO", "")
10041004

10051005
# Front end environment variables
10061006
API_URL = env("API_URL", default="/api/v1/")

api/environments/dynamodb/constants.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,30 @@
88

99
DYNAMODB_MAX_BATCH_WRITE_ITEM_COUNT = 25
1010
IDENTITIES_PAGINATION_LIMIT = 1000
11+
12+
# DynamoDB max item size is 400 KB (409,600 bytes).
13+
DOCUMENT_SIZE_HISTOGRAM_BUCKETS = (
14+
1_000,
15+
5_000,
16+
10_000,
17+
25_000,
18+
50_000,
19+
100_000,
20+
200_000,
21+
300_000,
22+
409_600,
23+
)
24+
25+
COMPRESSION_RATIO_HISTOGRAM_BUCKETS = (
26+
0.05,
27+
0.1,
28+
0.15,
29+
0.2,
30+
0.25,
31+
0.3,
32+
0.4,
33+
0.5,
34+
0.6,
35+
0.8,
36+
1.0,
37+
)

api/environments/dynamodb/utils.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,33 @@
1+
import json
2+
from collections.abc import Mapping
3+
from decimal import Decimal
4+
from typing import Any
5+
6+
7+
def estimate_document_size(document: Mapping[str, Any]) -> int:
8+
"""Estimate the size of a DynamoDB item in bytes.
9+
10+
Uses compact JSON serialization with type-aware encoding
11+
for Decimal (numeric) and bytes (binary) values.
12+
"""
13+
return len(
14+
json.dumps(document, default=_json_default, separators=(",", ":")).encode()
15+
)
16+
17+
18+
def _json_default(obj: object) -> Any:
19+
if isinstance(obj, bytes):
20+
# Binary values: use raw byte length as a placeholder string
21+
# to approximate DynamoDB Binary (B) type storage.
22+
return "0" * len(obj)
23+
if isinstance(obj, Decimal):
24+
# Convert to native numeric types for compact JSON serialization.
25+
if obj == obj.to_integral_value():
26+
return int(obj)
27+
return float(obj)
28+
return str(obj)
29+
30+
131
def get_environments_v2_identity_override_document_key(
232
feature_id: int | None = None,
333
identity_uuid: str | None = None,

api/environments/dynamodb/wrappers/base.py

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,13 @@
1717

1818
DynamoDBOutput = QueryOutputTableTypeDef | ScanOutputTableTypeDef
1919

20-
P = typing.ParamSpec("P")
21-
2220
# Avoid `decimal.Rounded` when reading large numbers
2321
# See https://github.com/boto/boto3/issues/2500
2422
boto3.dynamodb.types.DYNAMODB_CONTEXT = Context(prec=100)
2523

2624

2725
class BaseDynamoWrapper:
28-
table_name: str = None # type: ignore[assignment]
26+
table_name: str = ""
2927

3028
def __init__(self) -> None:
3129
self._table: typing.Optional["Table"] = None
@@ -39,29 +37,30 @@ def table(self) -> typing.Optional["Table"]:
3937
def get_table_name(self) -> str:
4038
return self.table_name
4139

42-
def get_table(self) -> typing.Optional["Table"]: # type: ignore[return]
40+
def get_table(self) -> "Table | None":
4341
if table_name := self.get_table_name():
4442
return boto3.resource("dynamodb", config=Config(tcp_keepalive=True)).Table(
4543
table_name
4644
)
45+
return None
4746

4847
@property
4948
def is_enabled(self) -> bool:
5049
return self.table is not None
5150

52-
def _iter_all_items( # type: ignore[valid-type]
51+
def _iter_all_items(
5352
self,
54-
response_getter_method: "typing.Callable[[P], DynamoDBOutput]", # type: ignore[valid-type]
55-
**kwargs: "P.kwargs",
53+
response_getter_method: "typing.Callable[..., DynamoDBOutput]",
54+
**kwargs: typing.Any,
5655
) -> typing.Generator[dict[str, "TableAttributeValueTypeDef"], None, None]:
5756
response_getter = partial(response_getter_method, **kwargs)
5857
set_context(
5958
"dynamodb",
60-
{"table_name": self.table_name, **kwargs},
59+
{"table_name": self.get_table_name(), **kwargs},
6160
)
6261

6362
while True:
64-
query_response = response_getter() # type: ignore[call-arg]
63+
query_response = response_getter()
6564

6665
for item in query_response["Items"]:
6766
yield item
@@ -73,17 +72,19 @@ def _iter_all_items( # type: ignore[valid-type]
7372
response_getter.keywords["ExclusiveStartKey"] = last_evaluated_key
7473
set_context(
7574
"dynamodb",
76-
{"table_name": self.table_name, **response_getter.keywords},
75+
{"table_name": self.get_table_name(), **response_getter.keywords},
7776
)
7877

7978
def scan_iter_all_items(
8079
self,
8180
**kwargs: typing.Any,
8281
) -> typing.Generator[dict[str, "TableAttributeValueTypeDef"], None, None]:
83-
return self._iter_all_items(self.table.scan, **kwargs) # type: ignore[arg-type,union-attr]
82+
assert self.table is not None
83+
return self._iter_all_items(self.table.scan, **kwargs)
8484

8585
def query_iter_all_items(
8686
self,
8787
**kwargs: typing.Any,
8888
) -> typing.Generator[dict[str, "TableAttributeValueTypeDef"], None, None]:
89-
return self._iter_all_items(self.table.query, **kwargs) # type: ignore[arg-type,union-attr]
89+
assert self.table is not None
90+
return self._iter_all_items(self.table.query, **kwargs)

api/environments/dynamodb/wrappers/environment_api_key_wrapper.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414

1515
class DynamoEnvironmentAPIKeyWrapper(BaseDynamoWrapper):
16-
table_name = settings.ENVIRONMENTS_API_KEY_TABLE_NAME_DYNAMO # type: ignore[assignment]
16+
table_name = settings.ENVIRONMENTS_API_KEY_TABLE_NAME_DYNAMO
1717

1818
def write_api_key(self, api_key: "EnvironmentAPIKey"): # type: ignore[no-untyped-def]
1919
self.write_api_keys([api_key])

api/environments/dynamodb/wrappers/environment_wrapper.py

Lines changed: 84 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
1+
import abc
12
import typing
23
from typing import Any, Iterable
34

5+
import structlog
46
from boto3.dynamodb.conditions import Key
57
from django.conf import settings
68
from django.core.exceptions import ObjectDoesNotExist
9+
from django.db.models import prefetch_related_objects
710

811
from environments.dynamodb.constants import (
912
DYNAMODB_MAX_BATCH_WRITE_ITEM_COUNT,
@@ -12,9 +15,17 @@
1215
)
1316
from environments.dynamodb.types import IdentityOverridesV2Changeset
1417
from environments.dynamodb.utils import (
18+
estimate_document_size,
1519
get_environments_v2_identity_override_document_key,
1620
)
21+
from environments.metrics import (
22+
flagsmith_dynamo_environment_document_compression_ratio,
23+
flagsmith_dynamo_environment_document_size_bytes,
24+
)
25+
from integrations.flagsmith.client import get_client
1726
from util.mappers import (
27+
map_environment_to_compressed_environment_document,
28+
map_environment_to_compressed_environment_v2_document,
1829
map_environment_to_environment_document,
1930
map_environment_to_environment_v2_document,
2031
map_identity_override_to_identity_override_document,
@@ -27,26 +38,82 @@
2738
from mypy_boto3_dynamodb.type_defs import QueryInputRequestTypeDef
2839

2940
from environments.models import Environment
41+
from util.dataclasses import CompressedEnvironmentDocument
42+
43+
logger = structlog.get_logger("dynamodb")
3044

3145

32-
class BaseDynamoEnvironmentWrapper(BaseDynamoWrapper):
46+
class BaseDynamoEnvironmentWrapper(BaseDynamoWrapper, abc.ABC):
3347
def write_environment(self, environment: "Environment") -> None:
3448
self.write_environments([environment])
3549

3650
def write_environments(self, environments: Iterable["Environment"]) -> None:
37-
raise NotImplementedError()
51+
self._write_environments(environments)
52+
53+
@abc.abstractmethod
54+
def _map_environment_document(
55+
self,
56+
environment: "Environment",
57+
) -> dict[str, Any]: ...
58+
59+
@abc.abstractmethod
60+
def _map_compressed_environment_document(
61+
self,
62+
environment: "Environment",
63+
) -> "CompressedEnvironmentDocument": ...
64+
65+
def _write_environments(self, environments: Iterable["Environment"]) -> None:
66+
flagsmith_client = get_client("local", local_eval=True)
67+
prefetch_related_objects(
68+
environments,
69+
"project__organisation",
70+
"project__organisation__subscription",
71+
)
72+
73+
assert self.table
74+
with self.table.batch_writer() as writer:
75+
for environment in environments:
76+
organisation = environment.project.organisation
77+
if flagsmith_client.get_identity_flags(
78+
organisation.flagsmith_identifier,
79+
traits=organisation.flagsmith_on_flagsmith_api_traits,
80+
).is_feature_enabled("compress_dynamo_documents"):
81+
result = self._map_compressed_environment_document(environment)
82+
writer.put_item(Item=result.document)
83+
84+
flagsmith_dynamo_environment_document_size_bytes.labels(
85+
table=self.get_table_name(),
86+
compressed="true",
87+
).observe(result.compressed_size_bytes)
88+
flagsmith_dynamo_environment_document_compression_ratio.labels(
89+
table=self.get_table_name(),
90+
).observe(result.compression_ratio)
91+
logger.info(
92+
"environment-document-compressed",
93+
environment_id=environment.id,
94+
environment_api_key=environment.api_key,
95+
)
96+
else:
97+
item = self._map_environment_document(environment)
98+
writer.put_item(Item=item)
99+
100+
flagsmith_dynamo_environment_document_size_bytes.labels(
101+
table=self.get_table_name(),
102+
compressed="false",
103+
).observe(estimate_document_size(item))
38104

39105

40106
class DynamoEnvironmentWrapper(BaseDynamoEnvironmentWrapper):
41107
def get_table_name(self) -> str | None: # type: ignore[override]
42108
return settings.ENVIRONMENTS_TABLE_NAME_DYNAMO
43109

44-
def write_environments(self, environments: Iterable["Environment"]): # type: ignore[no-untyped-def]
45-
with self.table.batch_writer() as writer: # type: ignore[union-attr]
46-
for environment in environments:
47-
writer.put_item(
48-
Item=map_environment_to_environment_document(environment),
49-
)
110+
def _map_environment_document(self, environment: "Environment") -> dict[str, Any]:
111+
return map_environment_to_environment_document(environment)
112+
113+
def _map_compressed_environment_document(
114+
self, environment: "Environment"
115+
) -> "CompressedEnvironmentDocument":
116+
return map_environment_to_compressed_environment_document(environment)
50117

51118
def get_item(self, api_key: str) -> dict: # type: ignore[type-arg]
52119
try:
@@ -59,7 +126,7 @@ def delete_environment(self, api_key: str) -> None:
59126

60127

61128
class DynamoEnvironmentV2Wrapper(BaseDynamoEnvironmentWrapper):
62-
def get_table_name(self) -> str | None: # type: ignore[override]
129+
def get_table_name(self) -> str:
63130
return settings.ENVIRONMENTS_V2_TABLE_NAME_DYNAMO
64131

65132
def get_identity_overrides_by_environment_id(
@@ -122,12 +189,14 @@ def update_identity_overrides(
122189
),
123190
)
124191

125-
def write_environments(self, environments: Iterable["Environment"]) -> None:
126-
with self.table.batch_writer() as writer: # type: ignore[union-attr]
127-
for environment in environments:
128-
writer.put_item(
129-
Item=map_environment_to_environment_v2_document(environment),
130-
)
192+
def _map_environment_document(self, environment: "Environment") -> dict[str, Any]:
193+
return map_environment_to_environment_v2_document(environment)
194+
195+
def _map_compressed_environment_document(
196+
self,
197+
environment: "Environment",
198+
) -> "CompressedEnvironmentDocument":
199+
return map_environment_to_compressed_environment_v2_document(environment)
131200

132201
def delete_environment(self, environment_id: int): # type: ignore[no-untyped-def]
133202
environment_id = str(environment_id) # type: ignore[assignment]

api/environments/metrics.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
import prometheus_client
22

3+
from environments.dynamodb.constants import (
4+
COMPRESSION_RATIO_HISTOGRAM_BUCKETS,
5+
DOCUMENT_SIZE_HISTOGRAM_BUCKETS,
6+
)
7+
38
CACHE_HIT = "CACHE_HIT"
49
CACHE_MISS = "CACHE_MISS"
510

@@ -8,3 +13,17 @@
813
"Results of cache retrieval for environment document. `result` label is either `hit` or `miss`.",
914
["result"],
1015
)
16+
17+
flagsmith_dynamo_environment_document_size_bytes = prometheus_client.Histogram(
18+
"flagsmith_dynamo_environment_document_size_bytes",
19+
"Size of environment documents written to DynamoDB.",
20+
["table", "compressed"],
21+
buckets=DOCUMENT_SIZE_HISTOGRAM_BUCKETS,
22+
)
23+
24+
flagsmith_dynamo_environment_document_compression_ratio = prometheus_client.Histogram(
25+
"flagsmith_dynamo_environment_document_compression_ratio",
26+
"Compression ratio (compressed_size / uncompressed_size) of environment documents.",
27+
["table"],
28+
buckets=COMPRESSION_RATIO_HISTOGRAM_BUCKETS,
29+
)

0 commit comments

Comments
 (0)