Skip to content

Commit b018bfb

Browse files
fix(API Usage): Update subscription cache from database-backed usage (#7024)
Co-authored-by: Matthew Elwell <matthew.elwell@flagsmith.com>
1 parent 02618c6 commit b018bfb

File tree

10 files changed

+338
-38
lines changed

10 files changed

+338
-38
lines changed

api/app_analytics/analytics_db_service.py

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1+
from collections import defaultdict
12
from datetime import datetime, timedelta
23

34
import structlog
4-
from common.core.utils import using_database_replica
5+
from common.core.utils import is_saas, using_database_replica
56
from dateutil.relativedelta import relativedelta
67
from django.conf import settings
78
from django.db.models import Q, Sum
@@ -122,6 +123,41 @@ def get_usage_data_from_local_db(
122123
return map_annotated_api_usage_buckets_to_usage_data(qs)
123124

124125

126+
def get_top_organisations_from_local_db(
127+
date_start: datetime,
128+
) -> dict[int, int]:
129+
"""
130+
Return a mapping of organisation ID to total API call count from the
131+
Postgres analytics database, for all organisations with usage since
132+
``date_start``. Non-SaaS deployments only.
133+
"""
134+
if is_saas():
135+
raise RuntimeError("Must not run in SaaS mode")
136+
137+
environment_id_to_organisation_id: dict[int, int] = dict(
138+
using_database_replica(Environment.objects).values_list(
139+
"id", "project__organisation_id"
140+
)
141+
)
142+
143+
usage_per_environment = (
144+
APIUsageBucket.objects.filter(
145+
created_at__gte=date_start,
146+
bucket_size=constants.ANALYTICS_READ_BUCKET_SIZE,
147+
)
148+
.values("environment_id")
149+
.annotate(total=Sum("total_count"))
150+
)
151+
152+
calls_per_organisation: defaultdict[int, int] = defaultdict(int)
153+
for row in usage_per_environment:
154+
organisation_id = environment_id_to_organisation_id.get(row["environment_id"])
155+
if organisation_id is not None:
156+
calls_per_organisation[organisation_id] += row["total"]
157+
158+
return dict(calls_per_organisation)
159+
160+
125161
def get_total_events_count(
126162
organisation: Organisation,
127163
date_start: datetime | None = None,

api/organisations/subscription_info_cache.py

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from django.conf import settings
55
from django.utils import timezone
66

7+
from app_analytics.analytics_db_service import get_top_organisations_from_local_db
78
from app_analytics.influxdb_wrapper import get_top_organisations
89

910
from .chargebee import get_subscription_metadata_from_id # type: ignore[attr-defined]
@@ -15,7 +16,7 @@
1516
]
1617

1718

18-
def update_caches(update_cache_entities: typing.Tuple[SubscriptionCacheEntity, ...]): # type: ignore[no-untyped-def]
19+
def update_caches(*update_cache_entities: SubscriptionCacheEntity) -> None:
1920
"""
2021
Update the cache objects for an update_cache_entity in the database.
2122
"""
@@ -32,8 +33,14 @@ def update_caches(update_cache_entities: typing.Tuple[SubscriptionCacheEntity, .
3233
for org in organisations
3334
}
3435

35-
if SubscriptionCacheEntity.INFLUX in update_cache_entities:
36-
_update_caches_with_influx_data(organisation_info_cache_dict)
36+
if (
37+
SubscriptionCacheEntity.API_USAGE in update_cache_entities
38+
# NOTE: SubscriptionCacheEntity.INFLUX is superseded, but must live
39+
# forever for the sake of task processor continuity during version updates.
40+
# TODO: https://github.com/Flagsmith/flagsmith/pull/7024
41+
or SubscriptionCacheEntity.INFLUX in update_cache_entities
42+
):
43+
_update_caches_with_api_usage_data(organisation_info_cache_dict)
3744

3845
if SubscriptionCacheEntity.CHARGEBEE in update_cache_entities:
3946
_update_caches_with_chargebee_data(organisations, organisation_info_cache_dict)
@@ -64,14 +71,17 @@ def update_caches(update_cache_entities: typing.Tuple[SubscriptionCacheEntity, .
6471
)
6572

6673

67-
def _update_caches_with_influx_data(
74+
def _update_caches_with_api_usage_data(
6875
organisation_info_cache_dict: OrganisationSubscriptionInformationCacheDict,
6976
) -> None:
7077
"""
7178
Mutates the provided organisation_info_cache_dict in place to add information about the organisation's
72-
influx usage.
79+
API usage, sourced from either Postgres or InfluxDB.
7380
"""
74-
if not settings.INFLUXDB_TOKEN:
81+
use_postgres = settings.USE_POSTGRES_FOR_ANALYTICS
82+
use_influx = bool(settings.INFLUXDB_TOKEN)
83+
84+
if not use_postgres and not use_influx:
7585
return
7686

7787
for _date_start, limit in (("-30d", ""), ("-7d", ""), ("-24h", "100")):
@@ -85,7 +95,10 @@ def _update_caches_with_influx_data(
8595
else:
8696
assert False, "Expecting either days (d) or hours (h)" # pragma: no cover
8797

88-
org_calls = get_top_organisations(date_start, limit)
98+
if use_postgres:
99+
org_calls = get_top_organisations_from_local_db(date_start)
100+
else:
101+
org_calls = get_top_organisations(date_start, limit)
89102

90103
covered_orgs = set()
91104

api/organisations/subscriptions/constants.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,8 @@
5151

5252

5353
class SubscriptionCacheEntity(Enum):
54-
INFLUX = "INFLUX"
54+
INFLUX = "INFLUX" # Deprecated — use API_USAGE.
55+
API_USAGE = "API_USAGE"
5556
CHARGEBEE = "CHARGEBEE"
5657

5758

api/organisations/tasks.py

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -82,32 +82,24 @@ def send_org_subscription_cancelled_alert(
8282
)
8383

8484

85-
@register_recurring_task(
86-
run_every=timedelta(hours=6),
87-
)
88-
def update_organisation_subscription_information_influx_cache_recurring(): # type: ignore[no-untyped-def]
89-
"""
90-
We're redefining the task function here to register a recurring task
91-
since the decorators don't stack correctly. (TODO)
92-
"""
93-
update_organisation_subscription_information_influx_cache() # pragma: no cover
85+
@register_recurring_task(run_every=timedelta(hours=6))
86+
def update_organisation_subscription_information_cache_recurring() -> None:
87+
update_organisation_subscription_information_cache()
9488

9589

9690
@register_task_handler()
97-
def update_organisation_subscription_information_influx_cache(): # type: ignore[no-untyped-def]
98-
subscription_info_cache.update_caches((SubscriptionCacheEntity.INFLUX,))
91+
def update_organisation_subscription_information_api_usage_cache() -> None:
92+
subscription_info_cache.update_caches(SubscriptionCacheEntity.API_USAGE)
9993

10094

10195
@register_task_handler(timeout=timedelta(minutes=5))
10296
def update_organisation_subscription_information_cache() -> None:
10397
subscription_info_cache.update_caches(
104-
(SubscriptionCacheEntity.CHARGEBEE, SubscriptionCacheEntity.INFLUX)
98+
SubscriptionCacheEntity.CHARGEBEE, SubscriptionCacheEntity.API_USAGE
10599
)
106100

107101

108-
@register_recurring_task(
109-
run_every=timedelta(hours=12),
110-
)
102+
@register_recurring_task(run_every=timedelta(hours=12))
111103
def finish_subscription_cancellation() -> None:
112104
now = timezone.now()
113105
previously = now + timedelta(hours=-24)

api/sales_dashboard/urls.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@
4949
),
5050
path(
5151
"update-organisation-subscription-information-influx-cache",
52-
views.trigger_update_organisation_subscription_information_influx_cache,
52+
views.trigger_update_organisation_subscription_information_api_usage_cache,
5353
name="update-organisation-subscription-information-influx-cache",
5454
),
5555
path(

api/sales_dashboard/views.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@
3939
UserOrganisation,
4040
)
4141
from organisations.tasks import (
42+
update_organisation_subscription_information_api_usage_cache,
4243
update_organisation_subscription_information_cache,
43-
update_organisation_subscription_information_influx_cache,
4444
)
4545
from projects.models import Project
4646
from users.models import FFAdminUser
@@ -352,8 +352,8 @@ def download_org_data(request, organisation_id): # type: ignore[no-untyped-def]
352352

353353

354354
@staff_member_required() # type: ignore[misc]
355-
def trigger_update_organisation_subscription_information_influx_cache(request): # type: ignore[no-untyped-def]
356-
update_organisation_subscription_information_influx_cache.delay()
355+
def trigger_update_organisation_subscription_information_api_usage_cache(request): # type: ignore[no-untyped-def]
356+
update_organisation_subscription_information_api_usage_cache.delay()
357357
return HttpResponseRedirect(reverse("sales_dashboard:index"))
358358

359359

api/tests/unit/app_analytics/test_analytics_db_service.py

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from app_analytics.analytics_db_service import (
1010
get_feature_evaluation_data,
1111
get_feature_evaluation_data_from_local_db,
12+
get_top_organisations_from_local_db,
1213
get_total_events_count,
1314
get_usage_data,
1415
get_usage_data_from_local_db,
@@ -731,3 +732,83 @@ def test_get_usage_data__previous_billing_period__passes_correct_date_range(
731732
date_stop=datetime(2022, 12, 30, 9, 9, 47, 325132, tzinfo=UTC),
732733
labels_filter=None,
733734
)
735+
736+
737+
@pytest.mark.use_analytics_db
738+
@pytest.mark.freeze_time("2023-01-19T09:09:47.325132+00:00")
739+
def test_get_top_organisations_from_local_db__with_data__returns_correct_mapping(
740+
organisation: Organisation,
741+
environment: Environment,
742+
settings: SettingsWrapper,
743+
) -> None:
744+
# Given
745+
now = timezone.now()
746+
read_bucket_size = 15
747+
settings.ANALYTICS_BUCKET_SIZE = read_bucket_size
748+
date_start = now - timedelta(days=30)
749+
750+
for i in range(3):
751+
APIUsageBucket.objects.create(
752+
environment_id=environment.id,
753+
resource=Resource.FLAGS,
754+
total_count=100,
755+
bucket_size=read_bucket_size,
756+
created_at=now - timedelta(days=i),
757+
)
758+
759+
# When
760+
result = get_top_organisations_from_local_db(date_start)
761+
762+
# Then
763+
assert result == {organisation.id: 300}
764+
765+
766+
@pytest.mark.use_analytics_db
767+
@pytest.mark.freeze_time("2023-01-19T09:09:47.325132+00:00")
768+
def test_get_top_organisations_from_local_db__buckets_before_date_start__excluded(
769+
organisation: Organisation,
770+
environment: Environment,
771+
settings: SettingsWrapper,
772+
) -> None:
773+
# Given
774+
now = timezone.now()
775+
read_bucket_size = 15
776+
settings.ANALYTICS_BUCKET_SIZE = read_bucket_size
777+
date_start = now - timedelta(days=7)
778+
779+
# Bucket within range
780+
APIUsageBucket.objects.create(
781+
environment_id=environment.id,
782+
resource=Resource.FLAGS,
783+
total_count=50,
784+
bucket_size=read_bucket_size,
785+
created_at=now - timedelta(days=3),
786+
)
787+
# Bucket outside range (before date_start)
788+
APIUsageBucket.objects.create(
789+
environment_id=environment.id,
790+
resource=Resource.FLAGS,
791+
total_count=200,
792+
bucket_size=read_bucket_size,
793+
created_at=now - timedelta(days=10),
794+
)
795+
796+
# When
797+
result = get_top_organisations_from_local_db(date_start)
798+
799+
# Then
800+
assert result == {organisation.id: 50}
801+
802+
803+
def test_get_top_organisations_from_local_db__saas_mode__raises_runtime_error(
804+
mocker: MockerFixture,
805+
) -> None:
806+
# Given
807+
mocker.patch(
808+
"app_analytics.analytics_db_service.is_saas",
809+
return_value=True,
810+
)
811+
812+
# When / Then
813+
with pytest.raises(RuntimeError, match="Must not run in SaaS mode"):
814+
get_top_organisations_from_local_db(timezone.now() - timedelta(days=30))

0 commit comments

Comments
 (0)