Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 37 additions & 1 deletion api/app_analytics/analytics_db_service.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from collections import defaultdict
from datetime import datetime, timedelta

import structlog
from common.core.utils import using_database_replica
from common.core.utils import is_saas, using_database_replica
from dateutil.relativedelta import relativedelta
from django.conf import settings
from django.db.models import Q, Sum
Expand Down Expand Up @@ -122,6 +123,41 @@ def get_usage_data_from_local_db(
return map_annotated_api_usage_buckets_to_usage_data(qs)


def get_top_organisations_from_local_db(
    date_start: datetime,
) -> dict[int, int]:
    """
    Aggregate API call counts per organisation from the local (Postgres)
    analytics store, covering every bucket created at or after ``date_start``.

    Intended for self-hosted deployments only; raises ``RuntimeError`` when
    running in SaaS mode.
    """
    if is_saas():
        raise RuntimeError("Must not run in SaaS mode")

    # Map each environment to its owning organisation, reading from the
    # replica to keep load off the primary database.
    env_to_org: dict[int, int] = dict(
        using_database_replica(Environment.objects).values_list(
            "id", "project__organisation_id"
        )
    )

    buckets = (
        APIUsageBucket.objects.filter(
            created_at__gte=date_start,
            bucket_size=constants.ANALYTICS_READ_BUCKET_SIZE,
        )
        .values("environment_id")
        .annotate(total=Sum("total_count"))
    )

    totals: dict[int, int] = {}
    for bucket in buckets:
        org_id = env_to_org.get(bucket["environment_id"])
        if org_id is None:
            # Environment no longer maps to an organisation; skip its usage
            # rather than mis-attribute it.
            continue
        totals[org_id] = totals.get(org_id, 0) + bucket["total"]

    return totals


def get_total_events_count(
organisation: Organisation,
date_start: datetime | None = None,
Expand Down
27 changes: 20 additions & 7 deletions api/organisations/subscription_info_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from django.conf import settings
from django.utils import timezone

from app_analytics.analytics_db_service import get_top_organisations_from_local_db
from app_analytics.influxdb_wrapper import get_top_organisations

from .chargebee import get_subscription_metadata_from_id # type: ignore[attr-defined]
Expand All @@ -15,7 +16,7 @@
]


def update_caches(update_cache_entities: typing.Tuple[SubscriptionCacheEntity, ...]): # type: ignore[no-untyped-def]
def update_caches(*update_cache_entities: SubscriptionCacheEntity) -> None:
"""
Update the cache objects for an update_cache_entity in the database.
"""
Expand All @@ -32,8 +33,14 @@ def update_caches(update_cache_entities: typing.Tuple[SubscriptionCacheEntity, .
for org in organisations
}

if SubscriptionCacheEntity.INFLUX in update_cache_entities:
_update_caches_with_influx_data(organisation_info_cache_dict)
if (
SubscriptionCacheEntity.API_USAGE in update_cache_entities
# NOTE: SubscriptionCacheEntity.INFLUX is superseded, but must live
# forever for the sake of task processor continuity during version updates.
# TODO: https://github.com/Flagsmith/flagsmith/pull/7024
or SubscriptionCacheEntity.INFLUX in update_cache_entities
Comment thread
emyller marked this conversation as resolved.
):
_update_caches_with_api_usage_data(organisation_info_cache_dict)

if SubscriptionCacheEntity.CHARGEBEE in update_cache_entities:
_update_caches_with_chargebee_data(organisations, organisation_info_cache_dict)
Expand Down Expand Up @@ -64,14 +71,17 @@ def update_caches(update_cache_entities: typing.Tuple[SubscriptionCacheEntity, .
)


def _update_caches_with_influx_data(
def _update_caches_with_api_usage_data(
organisation_info_cache_dict: OrganisationSubscriptionInformationCacheDict,
) -> None:
"""
Mutates the provided organisation_info_cache_dict in place to add information about the organisation's
influx usage.
API usage, sourced from either Postgres or InfluxDB.
"""
if not settings.INFLUXDB_TOKEN:
use_postgres = settings.USE_POSTGRES_FOR_ANALYTICS
use_influx = bool(settings.INFLUXDB_TOKEN)

if not use_postgres and not use_influx:
return

for _date_start, limit in (("-30d", ""), ("-7d", ""), ("-24h", "100")):
Expand All @@ -85,7 +95,10 @@ def _update_caches_with_influx_data(
else:
assert False, "Expecting either days (d) or hours (h)" # pragma: no cover

org_calls = get_top_organisations(date_start, limit)
if use_postgres:
org_calls = get_top_organisations_from_local_db(date_start)
else:
org_calls = get_top_organisations(date_start, limit)

covered_orgs = set()

Expand Down
3 changes: 2 additions & 1 deletion api/organisations/subscriptions/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@


class SubscriptionCacheEntity(Enum):
    """Cache families refreshed by ``subscription_info_cache.update_caches``."""

    INFLUX = "INFLUX"  # Deprecated — use API_USAGE.
    API_USAGE = "API_USAGE"
    CHARGEBEE = "CHARGEBEE"


Expand Down
22 changes: 7 additions & 15 deletions api/organisations/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,32 +82,24 @@ def send_org_subscription_cancelled_alert(
)


@register_recurring_task(
run_every=timedelta(hours=6),
)
def update_organisation_subscription_information_influx_cache_recurring(): # type: ignore[no-untyped-def]
"""
We're redefining the task function here to register a recurring task
since the decorators don't stack correctly. (TODO)
"""
update_organisation_subscription_information_influx_cache() # pragma: no cover
@register_recurring_task(run_every=timedelta(hours=6))
def update_organisation_subscription_information_cache_recurring() -> None:
    # Thin wrapper: the recurring-task and task-handler decorators don't
    # stack, so the schedule simply re-invokes the registered task every 6h.
    update_organisation_subscription_information_cache()


@register_task_handler()
def update_organisation_subscription_information_api_usage_cache() -> None:
    """Enqueueable task: refresh organisations' API usage cache entries."""
    # Resolves the interleaved old/new diff lines to the added (API_USAGE)
    # side; the INFLUX-named variant is superseded.
    subscription_info_cache.update_caches(SubscriptionCacheEntity.API_USAGE)


@register_task_handler(timeout=timedelta(minutes=5))
def update_organisation_subscription_information_cache() -> None:
    """Refresh both the Chargebee and API usage caches for all organisations."""
    # The diff residue left both the old tuple argument and the new varargs
    # call (a syntax error if taken literally); keep the added form.
    subscription_info_cache.update_caches(
        SubscriptionCacheEntity.CHARGEBEE, SubscriptionCacheEntity.API_USAGE
    )


@register_recurring_task(
run_every=timedelta(hours=12),
)
@register_recurring_task(run_every=timedelta(hours=12))
def finish_subscription_cancellation() -> None:
now = timezone.now()
previously = now + timedelta(hours=-24)
Expand Down
2 changes: 1 addition & 1 deletion api/sales_dashboard/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
),
path(
"update-organisation-subscription-information-influx-cache",
views.trigger_update_organisation_subscription_information_influx_cache,
views.trigger_update_organisation_subscription_information_api_usage_cache,
name="update-organisation-subscription-information-influx-cache",
),
path(
Expand Down
6 changes: 3 additions & 3 deletions api/sales_dashboard/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@
UserOrganisation,
)
from organisations.tasks import (
update_organisation_subscription_information_api_usage_cache,
update_organisation_subscription_information_cache,
update_organisation_subscription_information_influx_cache,
)
from projects.models import Project
from users.models import FFAdminUser
Expand Down Expand Up @@ -352,8 +352,8 @@ def download_org_data(request, organisation_id): # type: ignore[no-untyped-def]


@staff_member_required()  # type: ignore[misc]
def trigger_update_organisation_subscription_information_api_usage_cache(request):  # type: ignore[no-untyped-def]
    """Staff-only view: enqueue the API usage cache refresh task, then
    redirect back to the sales dashboard index."""
    update_organisation_subscription_information_api_usage_cache.delay()
    return HttpResponseRedirect(reverse("sales_dashboard:index"))


Expand Down
81 changes: 81 additions & 0 deletions api/tests/unit/app_analytics/test_analytics_db_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from app_analytics.analytics_db_service import (
get_feature_evaluation_data,
get_feature_evaluation_data_from_local_db,
get_top_organisations_from_local_db,
get_total_events_count,
get_usage_data,
get_usage_data_from_local_db,
Expand Down Expand Up @@ -731,3 +732,83 @@ def test_get_usage_data__previous_billing_period__passes_correct_date_range(
date_stop=datetime(2022, 12, 30, 9, 9, 47, 325132, tzinfo=UTC),
labels_filter=None,
)


@pytest.mark.use_analytics_db
@pytest.mark.freeze_time("2023-01-19T09:09:47.325132+00:00")
def test_get_top_organisations_from_local_db__with_data__returns_correct_mapping(
    organisation: Organisation,
    environment: Environment,
    settings: SettingsWrapper,
) -> None:
    # Given - three 100-call buckets within the last 30 days for one environment
    bucket_size = 15
    settings.ANALYTICS_BUCKET_SIZE = bucket_size
    now = timezone.now()

    for days_ago in (0, 1, 2):
        APIUsageBucket.objects.create(
            environment_id=environment.id,
            resource=Resource.FLAGS,
            total_count=100,
            bucket_size=bucket_size,
            created_at=now - timedelta(days=days_ago),
        )

    # When
    result = get_top_organisations_from_local_db(now - timedelta(days=30))

    # Then - all three buckets are summed under the single organisation
    assert result == {organisation.id: 300}


@pytest.mark.use_analytics_db
@pytest.mark.freeze_time("2023-01-19T09:09:47.325132+00:00")
def test_get_top_organisations_from_local_db__buckets_before_date_start__excluded(
    organisation: Organisation,
    environment: Environment,
    settings: SettingsWrapper,
) -> None:
    # Given - one bucket inside the 7-day window and one older bucket outside it
    bucket_size = 15
    settings.ANALYTICS_BUCKET_SIZE = bucket_size
    now = timezone.now()
    window_start = now - timedelta(days=7)

    for total_count, days_ago in ((50, 3), (200, 10)):
        APIUsageBucket.objects.create(
            environment_id=environment.id,
            resource=Resource.FLAGS,
            total_count=total_count,
            bucket_size=bucket_size,
            created_at=now - timedelta(days=days_ago),
        )

    # When
    result = get_top_organisations_from_local_db(window_start)

    # Then - only the in-window bucket contributes to the total
    assert result == {organisation.id: 50}


def test_get_top_organisations_from_local_db__saas_mode__raises_runtime_error(
    mocker: MockerFixture,
) -> None:
    # Given - the service module believes it is running in SaaS mode
    mocker.patch(
        "app_analytics.analytics_db_service.is_saas",
        return_value=True,
    )

    # When / Then - the local-DB aggregation refuses to run under SaaS
    with pytest.raises(RuntimeError, match="Must not run in SaaS mode"):
        get_top_organisations_from_local_db(timezone.now() - timedelta(days=30))
Loading
Loading