Skip to content

Commit c95fccb

Browse files
fix: replace OFFSET pagination with keyset pagination in identity migration (#6877)
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent 0e0e649 commit c95fccb

File tree

4 files changed

+121
-87
lines changed

4 files changed

+121
-87
lines changed

api/environments/dynamodb/migrator.py

Lines changed: 50 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from collections.abc import Iterator
2+
13
from django.db.models import Prefetch
24

35
from edge_api.identities.events import send_migration_event
@@ -6,7 +8,6 @@
68
from features.models import FeatureState
79
from features.multivariate.models import MultivariateFeatureStateValue
810
from projects.models import Project
9-
from util.queryset import iterator_with_prefetch
1011

1112
from .types import DynamoProjectMetadata, ProjectIdentityMigrationStatus
1213
from .wrappers import (
@@ -17,6 +18,53 @@
1718

1819

1920
class IdentityMigrator:
21+
@staticmethod
22+
def iter_identities_in_chunks(
23+
project_id: int, chunk_size: int = 2000
24+
) -> Iterator[Identity]:
25+
"""
26+
Yield identities in fixed-size chunks using keyset pagination.
27+
28+
We don't use Django's built-in QuerySet.iterator() here because
29+
it uses server-side cursors (DECLARE/FETCH), which plan the
30+
entire result set upfront and can hit statement_timeout on
31+
large tables.
32+
33+
Each chunk issues a WHERE pk > last_pk ORDER BY pk LIMIT N query,
34+
which is O(1) via pk index seek regardless of table size.
35+
"""
36+
identities_qs = (
37+
Identity.objects.filter(environment__project__id=project_id)
38+
.select_related("environment")
39+
.prefetch_related(
40+
"identity_traits",
41+
Prefetch(
42+
"identity_features",
43+
queryset=FeatureState.objects.select_related(
44+
"feature", "feature_state_value"
45+
),
46+
),
47+
Prefetch(
48+
"identity_features__multivariate_feature_state_values",
49+
queryset=MultivariateFeatureStateValue.objects.select_related(
50+
"multivariate_feature_option"
51+
),
52+
),
53+
)
54+
)
55+
queryset = identities_qs.order_by("pk")
56+
last_pk = None
57+
58+
while True:
59+
chunk_qs = (
60+
queryset.filter(pk__gt=last_pk) if last_pk is not None else queryset
61+
)
62+
chunk = list(chunk_qs[:chunk_size])
63+
if not chunk:
64+
break
65+
yield from chunk
66+
last_pk = chunk[-1].pk
67+
2068
def __init__(self, project_id): # type: ignore[no-untyped-def]
2169
self.project_metadata = DynamoProjectMetadata.get_or_new(project_id)
2270

@@ -63,24 +111,5 @@ def migrate(self): # type: ignore[no-untyped-def]
63111
identity_wrapper = DynamoIdentityWrapper(
64112
environment_wrapper=environment_wrapper
65113
)
66-
identities = (
67-
Identity.objects.filter(environment__project__id=project_id)
68-
.select_related("environment")
69-
.prefetch_related(
70-
"identity_traits",
71-
Prefetch(
72-
"identity_features",
73-
queryset=FeatureState.objects.select_related(
74-
"feature", "feature_state_value"
75-
),
76-
),
77-
Prefetch(
78-
"identity_features__multivariate_feature_state_values",
79-
queryset=MultivariateFeatureStateValue.objects.select_related(
80-
"multivariate_feature_option"
81-
),
82-
),
83-
)
84-
)
85-
identity_wrapper.write_identities(iterator_with_prefetch(identities)) # type: ignore[no-untyped-call]
114+
identity_wrapper.write_identities(self.iter_identities_in_chunks(project_id))
86115
self.project_metadata.finish_identity_migration() # type: ignore[no-untyped-call]

api/tests/unit/environments/dynamodb/test_unit_migrator.py

Lines changed: 71 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
from pytest_django.asserts import assertQuerySetEqual as assert_queryset_equal
2+
from pytest_django.fixtures import DjangoAssertNumQueries
23

34
from environments.dynamodb.migrator import IdentityMigrator
45
from environments.dynamodb.types import (
56
DynamoProjectMetadata,
67
ProjectIdentityMigrationStatus,
78
)
89
from environments.identities.models import Identity
10+
from environments.identities.traits.models import Trait
911
from environments.models import Environment, EnvironmentAPIKey
12+
from projects.models import Project
1013

1114

1215
def test_migrate_calls_internal_methods_with_correct_arguments( # type: ignore[no-untyped-def]
@@ -48,7 +51,7 @@ def test_migrate_calls_internal_methods_with_correct_arguments( # type: ignore[
4851
assert kwargs == {}
4952

5053
assert_queryset_equal(
51-
args[0], Identity.objects.filter(environment__project__id=project.id)
54+
list(args[0]), Identity.objects.filter(environment__project__id=project.id)
5255
)
5356
# and
5457
args, kwargs = mocked_environment_wrapper.return_value.write_environments.call_args
@@ -168,3 +171,70 @@ def test_get_migration_status_returns_correct_migraion_status_for_in_progress_mi
168171
# Then
169172
assert status == ProjectIdentityMigrationStatus.MIGRATION_IN_PROGRESS
170173
mocked_project_metadata.get_or_new.assert_called_with(project_id)
174+
175+
176+
def test_iter_identities_in_chunks__multiple_chunks__yields_all_in_pk_order(
177+
project: Project,
178+
environment: Environment,
179+
) -> None:
180+
# Given
181+
identities = [
182+
Identity.objects.create(identifier=f"identity_{i}", environment=environment)
183+
for i in range(5)
184+
]
185+
for identity in identities:
186+
Trait.objects.create(
187+
identity=identity,
188+
trait_key="key",
189+
value_type="unicode",
190+
string_value="val",
191+
)
192+
193+
# When
194+
result = list(IdentityMigrator.iter_identities_in_chunks(project.id, chunk_size=2))
195+
196+
# Then
197+
expected_pks = sorted(i.pk for i in identities)
198+
assert [i.pk for i in result] == expected_pks
199+
assert len(result) == 5
200+
201+
202+
def test_iter_identities_in_chunks__preserves_prefetch_related(
203+
project: Project,
204+
environment: Environment,
205+
django_assert_num_queries: DjangoAssertNumQueries,
206+
) -> None:
207+
# Given
208+
identities = [
209+
Identity.objects.create(identifier=f"identity_{i}", environment=environment)
210+
for i in range(3)
211+
]
212+
for identity in identities:
213+
Trait.objects.create(
214+
identity=identity,
215+
trait_key="key",
216+
value_type="unicode",
217+
string_value="val",
218+
)
219+
220+
result = list(IdentityMigrator.iter_identities_in_chunks(project.id))
221+
222+
# When — accessing prefetched relations should cause no additional queries.
223+
with django_assert_num_queries(0):
224+
for identity in result:
225+
list(identity.identity_traits.all())
226+
227+
228+
def test_iter_identities_in_chunks__empty_queryset__yields_nothing(
229+
project: Project,
230+
environment: Environment,
231+
django_assert_num_queries: DjangoAssertNumQueries,
232+
) -> None:
233+
# Given — no identities created.
234+
235+
# When
236+
with django_assert_num_queries(1):
237+
result = list(IdentityMigrator.iter_identities_in_chunks(project.id))
238+
239+
# Then
240+
assert result == []

api/tests/unit/util/test_queryset.py

Lines changed: 0 additions & 49 deletions
This file was deleted.

api/util/queryset.py

Lines changed: 0 additions & 16 deletions
This file was deleted.

0 commit comments

Comments
 (0)