Skip to content

Commit 4f67003

Browse files
authored
fix(DB replication): Prevent replica lag issues in SDK views (#6009)
1 parent 642527f commit 4f67003

17 files changed

Lines changed: 663 additions & 518 deletions

File tree

api/app/routers.py

Lines changed: 17 additions & 138 deletions
Original file line numberDiff line numberDiff line change
@@ -1,161 +1,40 @@
1-
import logging
2-
import random
3-
from enum import Enum
1+
from typing import Any, Literal
42

5-
from django.conf import settings
6-
from django.core.cache import cache
7-
from django.db import connections
3+
from django.db.models import Model
84

9-
from .exceptions import ImproperlyConfiguredError
10-
11-
logger = logging.getLogger(__name__)
12-
13-
CONNECTION_CHECK_CACHE_TTL = 2
14-
15-
16-
class ReplicaReadStrategy(Enum):
17-
DISTRIBUTED = "DISTRIBUTED"
18-
SEQUENTIAL = "SEQUENTIAL"
19-
20-
21-
def connection_check(database: str) -> bool:
22-
try:
23-
conn = connections.create_connection(database)
24-
conn.connect()
25-
usable = conn.is_usable()
26-
if not usable:
27-
logger.warning(
28-
f"Unable to access database {database} during connection check"
29-
)
30-
except Exception:
31-
usable = False
32-
logger.error(
33-
"Encountered exception during connection",
34-
exc_info=True,
35-
)
36-
37-
if usable:
38-
cache.set(
39-
f"db_connection_active.{database}", "online", CONNECTION_CHECK_CACHE_TTL
40-
)
41-
else:
42-
cache.set(
43-
f"db_connection_active.{database}", "offline", CONNECTION_CHECK_CACHE_TTL
44-
)
45-
46-
return usable
47-
48-
49-
class PrimaryReplicaRouter:
50-
def db_for_read(self, model, **hints): # type: ignore[no-untyped-def]
51-
if settings.NUM_DB_REPLICAS == 0:
52-
return "default"
53-
54-
replicas = [f"replica_{i}" for i in range(1, settings.NUM_DB_REPLICAS + 1)]
55-
replica = self._get_replica(replicas)
56-
if replica:
57-
# This return is the most likely as replicas should be
58-
# online and properly functioning.
59-
return replica
60-
61-
# Since no replicas are available, fall back to the cross
62-
# region replicas which have worse availability.
63-
cross_region_replicas = [
64-
f"cross_region_replica_{i}"
65-
for i in range(1, settings.NUM_CROSS_REGION_DB_REPLICAS + 1)
66-
]
67-
68-
cross_region_replica = self._get_replica(cross_region_replicas)
69-
if cross_region_replica:
70-
return cross_region_replica
71-
72-
# No available replicas, so fallback to the default.
73-
logger.warning(
74-
"Unable to serve any available replicas, falling back to default database"
75-
)
76-
return "default"
77-
78-
def db_for_write(self, model, **hints): # type: ignore[no-untyped-def]
79-
return "default"
80-
81-
def allow_relation(self, obj1, obj2, **hints): # type: ignore[no-untyped-def]
82-
"""
83-
Relations between objects are allowed if both objects are
84-
in the primary/replica pool.
85-
"""
86-
db_set = {
87-
"default",
88-
*[f"replica_{i}" for i in range(1, settings.NUM_DB_REPLICAS + 1)],
89-
*[
90-
f"cross_region_replica_{i}"
91-
for i in range(1, settings.NUM_CROSS_REGION_DB_REPLICAS + 1)
92-
],
93-
}
94-
if obj1._state.db in db_set and obj2._state.db in db_set:
95-
return True
96-
return None
97-
98-
def allow_migrate(self, db, app_label, model_name=None, **hints): # type: ignore[no-untyped-def]
99-
return db == "default"
100-
101-
def _get_replica(self, replicas: list[str]) -> None | str: # type: ignore[return]
102-
while replicas:
103-
if settings.REPLICA_READ_STRATEGY == ReplicaReadStrategy.DISTRIBUTED:
104-
database = random.choice(replicas)
105-
elif settings.REPLICA_READ_STRATEGY == ReplicaReadStrategy.SEQUENTIAL:
106-
database = replicas[0]
107-
else:
108-
raise ImproperlyConfiguredError(
109-
f"Unknown REPLICA_READ_STRATEGY {settings.REPLICA_READ_STRATEGY}"
110-
)
111-
112-
replicas.remove(database)
113-
db_cache = cache.get(f"db_connection_active.{database}")
114-
if db_cache == "online":
115-
return database
116-
if db_cache == "offline":
117-
continue
118-
119-
if connection_check(database):
120-
return database
5+
AnalyticsDatabaseName = Literal["analytics"]
1216

1227

1238
class AnalyticsRouter:
1249
route_app_labels = ["app_analytics"]
12510

126-
def db_for_read(self, model, **hints): # type: ignore[no-untyped-def]
127-
"""
128-
Attempts to read analytics models go to 'analytics' database.
129-
"""
11+
def db_for_read(
12+
self, model: type[Model], **hints: Any
13+
) -> AnalyticsDatabaseName | None:
14+
"""Route read queries to the 'analytics' database"""
13015
if model._meta.app_label in self.route_app_labels:
13116
return "analytics"
13217
return None
13318

134-
def db_for_write(self, model, **hints): # type: ignore[no-untyped-def]
135-
"""
136-
Attempts to write analytics models go to 'analytics' database.
137-
"""
19+
def db_for_write(
20+
self, model: type[Model], **hints: Any
21+
) -> AnalyticsDatabaseName | None:
22+
"""Route write queries to the 'analytics' database"""
13823
if model._meta.app_label in self.route_app_labels:
13924
return "analytics"
14025
return None
14126

142-
def allow_relation(self, obj1, obj2, **hints): # type: ignore[no-untyped-def]
143-
"""
144-
Relations between objects are allowed if both objects are
145-
in the analytics database.
146-
"""
27+
def allow_relation(self, obj1: Model, obj2: Model, **hints: Any) -> bool | None:
28+
"""Allow relations between analytics models"""
14729
if (
14830
obj1._meta.app_label in self.route_app_labels
14931
and obj2._meta.app_label in self.route_app_labels
15032
):
15133
return True
15234
return None
15335

154-
def allow_migrate(self, db, app_label, model_name=None, **hints): # type: ignore[no-untyped-def]
155-
"""
156-
Make sure the analytics app only appears in the 'analytics' database
157-
"""
158-
if app_label in self.route_app_labels:
159-
if db != "default":
160-
return db == "analytics"
36+
def allow_migrate(self, db: str, app_label: str, **hints: Any) -> bool | None:
37+
"""Ensure the analytics database only gets analytics models"""
38+
if db == "analytics":
39+
return app_label in self.route_app_labels
16140
return None

api/app/settings/common.py

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,13 @@
2020
import django_stubs_ext
2121
import prometheus_client
2222
import pytz
23+
from common.core import ReplicaReadStrategy
2324
from corsheaders.defaults import default_headers # type: ignore[import-untyped]
2425
from django.core.exceptions import ImproperlyConfigured
2526
from django.core.management.utils import get_random_secret_key
2627
from environs import Env
2728
from task_processor.task_run_method import TaskRunMethod
2829

29-
from app.routers import ReplicaReadStrategy
3030
from app.utils import get_numbered_env_vars_with_prefix
3131
from app_analytics.constants import TRACK_HEADERS
3232
from environments.enums import EnvironmentDocumentCacheMode
@@ -172,9 +172,7 @@
172172
db_conn_max_age = env.int("DJANGO_DB_CONN_MAX_AGE", 60)
173173
DJANGO_DB_CONN_MAX_AGE = 0 if db_conn_max_age == -1 else db_conn_max_age
174174

175-
DATABASE_ROUTERS = ["app.routers.PrimaryReplicaRouter"]
176-
NUM_DB_REPLICAS = 0
177-
NUM_CROSS_REGION_DB_REPLICAS = 0
175+
DATABASE_ROUTERS: list[str] = []
178176
# Allows collectstatic to run without a database, mainly for Docker builds to collectstatic at build time
179177
if "DATABASE_URL" in os.environ:
180178
DATABASES = {
@@ -193,7 +191,6 @@
193191
if not os.getenv("REPLICA_DATABASE_URL_0")
194192
else get_numbered_env_vars_with_prefix("REPLICA_DATABASE_URL_")
195193
)
196-
NUM_DB_REPLICAS = len(REPLICA_DATABASE_URLS)
197194

198195
# Cross region replica databases are used as fallbacks if the
199196
# primary replica set becomes unavailable.
@@ -210,7 +207,6 @@
210207
if not os.getenv("CROSS_REGION_REPLICA_DATABASE_URL_0")
211208
else get_numbered_env_vars_with_prefix("CROSS_REGION_REPLICA_DATABASE_URL_")
212209
)
213-
NUM_CROSS_REGION_DB_REPLICAS = len(CROSS_REGION_REPLICA_DATABASE_URLS)
214210

215211
# DISTRIBUTED spreads the load out across replicas while
216212
# SEQUENTIAL only falls back once the first replica connection is faulty
@@ -272,7 +268,7 @@
272268
TASK_PROCESSOR_DATABASE_PORT = env("TASK_PROCESSOR_DATABASE_PORT", default="")
273269
TASK_PROCESSOR_DATABASE_NAME = env("TASK_PROCESSOR_DATABASE_NAME", default="")
274270

275-
if TASK_PROCESSOR_DATABASE_URL or TASK_PROCESSOR_DATABASE_NAME: # pragma: no cover
271+
if TASK_PROCESSOR_DATABASE_URL or TASK_PROCESSOR_DATABASE_NAME:
276272
if TASK_PROCESSOR_DATABASE_URL:
277273
DATABASES["task_processor"] = dj_database_url.parse(
278274
TASK_PROCESSOR_DATABASE_URL,
@@ -685,7 +681,7 @@
685681
LOGGING["loggers"][""]["handlers"].append("azure")
686682

687683
ENABLE_DB_LOGGING = env.bool("DJANGO_ENABLE_DB_LOGGING", default=False)
688-
if ENABLE_DB_LOGGING: # pragma: no cover
684+
if ENABLE_DB_LOGGING:
689685
if not DEBUG:
690686
warnings.warn("Setting DEBUG=True to ensure DB logging functions correctly.")
691687
DEBUG = True
@@ -777,7 +773,7 @@
777773
warnings.warn(
778774
"Ignoring CACHE_ENVIRONMENT_DOCUMENT_SECONDS variable "
779775
'since CACHE_ENVIRONMENT_DOCUMENT_MODE == "PERSISTENT"'
780-
) # pragma: no cover
776+
)
781777

782778
USER_THROTTLE_CACHE_NAME = "user-throttle"
783779
USER_THROTTLE_CACHE_BACKEND = env.str(
@@ -1046,7 +1042,7 @@
10461042
# Additional functionality for using SAML in Flagsmith SaaS
10471043
SAML_INSTALLED = importlib.util.find_spec("saml") is not None
10481044

1049-
if SAML_INSTALLED: # pragma: no cover
1045+
if SAML_INSTALLED:
10501046
SAML_REQUESTS_CACHE_LOCATION = "saml_requests_cache"
10511047
CACHES[SAML_REQUESTS_CACHE_LOCATION] = {
10521048
"BACKEND": "django.core.cache.backends.db.DatabaseCache",
@@ -1074,7 +1070,7 @@
10741070
importlib.util.find_spec("release_pipelines_logic") is not None
10751071
)
10761072

1077-
if RELEASE_PIPELINES_LOGIC_INSTALLED: # pragma: no cover
1073+
if RELEASE_PIPELINES_LOGIC_INSTALLED:
10781074
INSTALLED_APPS.append("release_pipelines_logic")
10791075

10801076

@@ -1283,7 +1279,7 @@
12831279
LDAP_AUTH_URL = env.str("LDAP_AUTH_URL", None)
12841280
LDAP_ENABLED = LDAP_INSTALLED and LDAP_AUTH_URL
12851281

1286-
if LDAP_ENABLED: # pragma: no cover
1282+
if LDAP_ENABLED:
12871283
AUTHENTICATION_BACKENDS.insert(0, "django_python3_ldap.auth.LDAPBackend")
12881284
INSTALLED_APPS.append("flagsmith_ldap")
12891285

@@ -1369,7 +1365,7 @@
13691365
)
13701366

13711367
FEATURE_VALUE_LIMIT = env.int("FEATURE_VALUE_LIMIT", default=20_000)
1372-
if not 0 <= FEATURE_VALUE_LIMIT <= 2000000: # pragma: no cover
1368+
if not 0 <= FEATURE_VALUE_LIMIT <= 2000000:
13731369
raise ImproperlyConfigured(
13741370
"FEATURE_VALUE_LIMIT must be between 0 and 2,000,000 (2MB)."
13751371
)
@@ -1453,7 +1449,7 @@
14531449

14541450
LICENSING_INSTALLED = importlib.util.find_spec("licensing") is not None
14551451

1456-
if LICENSING_INSTALLED: # pragma: no cover
1452+
if LICENSING_INSTALLED:
14571453
INSTALLED_APPS.append("licensing")
14581454

14591455
PROMETHEUS_ENABLED = env.bool("PROMETHEUS_ENABLED", False)

api/app_analytics/analytics_db_service.py

Lines changed: 17 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from datetime import date, datetime, timedelta
22

33
import structlog
4+
from common.core.utils import using_database_replica
45
from dateutil.relativedelta import relativedelta
56
from django.conf import settings
67
from django.db.models import Q, Sum
@@ -38,9 +39,11 @@ def get_usage_data(
3839
period: PeriodType | None = None,
3940
labels_filter: Labels | None = None,
4041
) -> list[UsageData]:
41-
sub_cache = OrganisationSubscriptionInformationCache.objects.filter(
42-
organisation=organisation
43-
).first()
42+
sub_cache = (
43+
using_database_replica(OrganisationSubscriptionInformationCache.objects)
44+
.filter(organisation=organisation)
45+
.first()
46+
)
4447

4548
date_start, date_stop = _get_start_date_and_stop_date_for_subscribed_organisation(
4649
sub_cache=sub_cache,
@@ -92,12 +95,12 @@ def get_usage_data_from_local_db(
9295
bucket_size=constants.ANALYTICS_READ_BUCKET_SIZE,
9396
)
9497
if project_id:
95-
environment_ids = Environment.objects.filter(project_id=project_id).values_list(
96-
"id", flat=True
98+
# Evaluate the queryset because the analytics database has no environments table
99+
environment_ids = list(
100+
using_database_replica(Environment.objects)
101+
.filter(project_id=project_id)
102+
.values_list("id", flat=True)
97103
)
98-
# evaluate the queryset because analytics db does not have
99-
# access to environment/project table
100-
environment_ids = list(environment_ids)
101104
qs = qs.filter(environment_id__in=environment_ids)
102105

103106
if environment_id:
@@ -198,17 +201,12 @@ def get_feature_evaluation_data_from_local_db(
198201

199202

200203
def _get_environment_ids_for_org(organisation: Organisation) -> list[int]:
201-
# We need to do this to prevent Django from generating a query that
202-
# references the environments and projects tables,
203-
# as they do not exist in the analytics database.
204-
return [
205-
*Environment.objects.filter(
206-
project__organisation=organisation,
207-
).values_list(
208-
"id",
209-
flat=True,
210-
)
211-
]
204+
# Evaluate the queryset because the analytics database has no environments table
205+
return list(
206+
using_database_replica(Environment.objects)
207+
.filter(project__organisation=organisation)
208+
.values_list("id", flat=True)
209+
)
212210

213211

214212
def _get_start_date_and_stop_date_for_subscribed_organisation(

api/app_analytics/serializers.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from typing import TYPE_CHECKING, Any, get_args
22

3+
from common.core.utils import using_database_replica
34
from django.conf import settings
45
from rest_framework import serializers
56

@@ -84,11 +85,13 @@ class SDKAnalyticsFlagsSerializer(serializers.Serializer): # type: ignore[type-
8485
def validate(self, attrs: dict[str, Any]) -> dict[str, Any]:
8586
request = self.context["request"]
8687
environment_feature_names = set(
87-
FeatureState.objects.filter(
88+
using_database_replica(FeatureState.objects)
89+
.filter(
8890
environment=request.environment,
8991
feature_segment=None,
9092
identity=None,
91-
).values_list("feature__name", flat=True)
93+
)
94+
.values_list("feature__name", flat=True)
9295
)
9396
return {
9497
"evaluations": [

0 commit comments

Comments
 (0)