Skip to content

Commit 0027c7b

Browse files
committed
perf: replace OFFSET pagination with ID-range batching in async jobs
OFFSET N causes PostgreSQL to scan and discard N rows, making later batches progressively slower. This replaces all async job dispatchers with NTILE-based ID-range batching that uses WHERE id BETWEEN min_id AND max_id, which is O(1) via the primary key index.
1 parent 7f5517d commit 0027c7b

9 files changed

Lines changed: 542 additions & 28 deletions

File tree

spp_programs/__manifest__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
"name": "OpenSPP Programs",
55
"summary": "Manage programs, cycles, beneficiary enrollment, entitlements (cash and in-kind), payments, and fund tracking for social protection.",
66
"category": "OpenSPP/Core",
7-
"version": "19.0.2.0.6",
7+
"version": "19.0.2.0.7",
88
"sequence": 1,
99
"author": "OpenSPP.org",
1010
"website": "https://github.com/OpenSPP/OpenSPP2",

spp_programs/models/cycle.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -614,7 +614,9 @@ def _get_beneficiaries_domain(self, states=None):
614614
return domain
615615

616616
@api.model
617-
def get_beneficiaries(self, state, offset=0, limit=None, order=None, count=False, last_id=None):
617+
def get_beneficiaries(
618+
self, state, offset=0, limit=None, order=None, count=False, last_id=None, min_id=None, max_id=None
619+
):
618620
"""
619621
Get beneficiaries by state with pagination support.
620622
@@ -624,9 +626,12 @@ def get_beneficiaries(self, state, offset=0, limit=None, order=None, count=False
624626
:param order: Sort order
625627
:param count: If True, return count instead of records
626628
:param last_id: For cursor-based pagination - ID of last record from previous batch (more efficient)
629+
:param min_id: For ID-range pagination - minimum record ID (inclusive)
630+
:param max_id: For ID-range pagination - maximum record ID (inclusive)
627631
:return: Recordset or count
628632
629-
Note: For large datasets, use cursor-based pagination with last_id parameter instead of offset.
633+
Note: For large datasets, prefer min_id/max_id (ID-range) or last_id (cursor)
634+
pagination over offset-based pagination.
630635
"""
631636
if isinstance(state, str):
632637
state = [state]
@@ -635,7 +640,12 @@ def get_beneficiaries(self, state, offset=0, limit=None, order=None, count=False
635640
if count:
636641
return self.env["spp.cycle.membership"].search_count(domain, limit=limit)
637642

638-
# Use cursor-based pagination if last_id is provided (more efficient)
643+
# ID-range pagination (best for parallel job dispatch)
644+
if min_id is not None and max_id is not None:
645+
domain = domain + [("id", ">=", min_id), ("id", "<=", max_id)]
646+
return self.env["spp.cycle.membership"].search(domain, order=order or "id")
647+
648+
# Cursor-based pagination (good for sequential iteration)
639649
if last_id is not None:
640650
domain = domain + [("id", ">", last_id)]
641651
return self.env["spp.cycle.membership"].search(domain, limit=limit, order=order or "id")

spp_programs/models/managers/cycle_manager.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,17 +23,19 @@ def mark_prepare_entitlement_as_done(self, cycle, msg):
2323
cycle._compute_total_entitlements_count()
2424
return
2525

26-
def _prepare_entitlements(self, cycle, offset=0, limit=None, do_count=False):
26+
def _prepare_entitlements(self, cycle, offset=0, limit=None, min_id=None, max_id=None, do_count=False):
2727
"""Prepare Entitlements
2828
Get the beneficiaries and generate their entitlements.
2929
3030
:param cycle: The cycle
31-
:param offset: Optional integer value for the ORM search offset
32-
:param limit: Optional integer value for the ORM search limit
31+
:param offset: Optional integer value for the ORM search offset (deprecated, use min_id/max_id)
32+
:param limit: Optional integer value for the ORM search limit (deprecated, use min_id/max_id)
33+
:param min_id: Minimum record ID for ID-range pagination (inclusive)
34+
:param max_id: Maximum record ID for ID-range pagination (inclusive)
3335
:param do_count: Boolean - set to False to not run compute function
3436
:return:
3537
"""
36-
super()._prepare_entitlements(cycle, offset, limit, do_count)
38+
super()._prepare_entitlements(cycle, offset, limit, min_id=min_id, max_id=max_id, do_count=do_count)
3739
if do_count:
3840
# Update Statistics
3941
cycle._compute_inkind_entitlements_count()

spp_programs/models/managers/cycle_manager_base.py

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from odoo.addons.job_worker.delay import group
1212

1313
from .. import constants
14+
from .pagination_utils import compute_id_ranges
1415

1516
_logger = logging.getLogger(__name__)
1617

@@ -515,21 +516,32 @@ def _check_eligibility_async(self, cycle, beneficiaries_count):
515516
cycle.message_post(body=_("Eligibility check of %s beneficiaries started.", beneficiaries_count))
516517
cycle.write({"is_locked": True, "locked_reason": "Eligibility check of beneficiaries"})
517518

519+
states = ("draft", "enrolled", "not_eligible")
520+
id_ranges = compute_id_ranges(
521+
self.env.cr,
522+
"spp_cycle_membership",
523+
"cycle_id = %s AND state IN %s",
524+
(cycle.id, states),
525+
self.MAX_ROW_JOB_QUEUE,
526+
)
527+
518528
jobs = []
519-
for i in range(0, beneficiaries_count, self.MAX_ROW_JOB_QUEUE):
520-
jobs.append(
521-
self.delayable(channel="cycle")._check_eligibility(cycle, offset=i, limit=self.MAX_ROW_JOB_QUEUE)
522-
)
529+
for min_id, max_id in id_ranges:
530+
jobs.append(self.delayable(channel="cycle")._check_eligibility(cycle, min_id=min_id, max_id=max_id))
523531
main_job = group(*jobs)
524532
main_job.on_done(self.delayable(channel="cycle").mark_check_eligibility_as_done(cycle))
525533
main_job.delay()
526534

527-
def _check_eligibility(self, cycle, beneficiaries=None, offset=0, limit=None, do_count=False):
535+
def _check_eligibility(
536+
self, cycle, beneficiaries=None, offset=0, limit=None, min_id=None, max_id=None, do_count=False
537+
):
528538
if beneficiaries is None:
529539
beneficiaries = cycle.get_beneficiaries(
530540
["draft", "enrolled", "not_eligible"],
531541
offset=offset,
532542
limit=limit,
543+
min_id=min_id,
544+
max_id=max_id,
533545
order="id",
534546
)
535547

@@ -585,26 +597,38 @@ def _prepare_entitlements_async(self, cycle, beneficiaries_count):
585597
}
586598
)
587599

600+
id_ranges = compute_id_ranges(
601+
self.env.cr,
602+
"spp_cycle_membership",
603+
"cycle_id = %s AND state IN %s",
604+
(cycle.id, ("enrolled",)),
605+
self.MAX_ROW_JOB_QUEUE,
606+
)
607+
588608
jobs = []
589-
for i in range(0, beneficiaries_count, self.MAX_ROW_JOB_QUEUE):
590-
jobs.append(self.delayable(channel="cycle")._prepare_entitlements(cycle, i, self.MAX_ROW_JOB_QUEUE))
609+
for min_id, max_id in id_ranges:
610+
jobs.append(self.delayable(channel="cycle")._prepare_entitlements(cycle, min_id=min_id, max_id=max_id))
591611
main_job = group(*jobs)
592612
main_job.on_done(
593613
self.delayable(channel="cycle").mark_prepare_entitlement_as_done(cycle, _("Entitlement Ready."))
594614
)
595615
main_job.delay()
596616

597-
def _prepare_entitlements(self, cycle, offset=0, limit=None, do_count=False):
617+
def _prepare_entitlements(self, cycle, offset=0, limit=None, min_id=None, max_id=None, do_count=False):
598618
"""Prepare Entitlements
599619
Get the beneficiaries and generate their entitlements.
600620
601621
:param cycle: The cycle
602-
:param offset: Optional integer value for the ORM search offset
603-
:param limit: Optional integer value for the ORM search limit
622+
:param offset: Optional integer value for the ORM search offset (deprecated, use min_id/max_id)
623+
:param limit: Optional integer value for the ORM search limit (deprecated, use min_id/max_id)
624+
:param min_id: Minimum record ID for ID-range pagination (inclusive)
625+
:param max_id: Maximum record ID for ID-range pagination (inclusive)
604626
:param do_count: Boolean - set to False to not run compute function
605627
:return:
606628
"""
607-
beneficiaries = cycle.get_beneficiaries(["enrolled"], offset=offset, limit=limit, order="id")
629+
beneficiaries = cycle.get_beneficiaries(
630+
["enrolled"], offset=offset, limit=limit, min_id=min_id, max_id=max_id, order="id"
631+
)
608632
ent_manager = self.program_id.get_manager(constants.MANAGER_ENTITLEMENT)
609633
if not ent_manager:
610634
raise UserError(_("No Entitlement Manager defined."))
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
# Part of OpenSPP. See LICENSE file for full copyright and licensing details.
2+
"""Keyset pagination utilities for async job dispatch.
3+
4+
OFFSET-based pagination causes PostgreSQL to scan and discard N rows for
5+
OFFSET N, making later batches progressively slower (O(N) per batch).
6+
7+
This module provides ID-range batching using the NTILE window function,
8+
which pre-computes (min_id, max_id) boundaries in a single query. Each
9+
job then uses WHERE id BETWEEN min_id AND max_id, which is O(1) via the
10+
primary key index regardless of batch position.
11+
"""
12+
13+
import math
14+
15+
16+
def compute_id_ranges(cr, table, where_clause, params, batch_size):
17+
"""Compute ID-range boundaries for parallel job dispatch.
18+
19+
Uses PostgreSQL's NTILE window function to split matching rows into
20+
roughly equal-sized buckets, then returns the (min_id, max_id) of each.
21+
22+
:param cr: Database cursor
23+
:param table: Table name (e.g. 'spp_program_membership')
24+
:param where_clause: SQL WHERE clause without 'WHERE' keyword
25+
(e.g. 'program_id = %s AND state IN %s')
26+
:param params: Tuple of parameters for the WHERE clause
27+
:param batch_size: Target number of rows per batch
28+
:return: List of (min_id, max_id) tuples, ordered by min_id
29+
"""
30+
# Get total count to calculate number of batches
31+
cr.execute(
32+
f"SELECT COUNT(*) FROM {table} WHERE {where_clause}", # noqa: S608 # nosec B608
33+
params,
34+
)
35+
total = cr.fetchone()[0]
36+
if total == 0:
37+
return []
38+
39+
num_batches = math.ceil(total / batch_size)
40+
if num_batches <= 1:
41+
cr.execute(
42+
f"SELECT MIN(id), MAX(id) FROM {table} WHERE {where_clause}", # noqa: S608 # nosec B608
43+
params,
44+
)
45+
row = cr.fetchone()
46+
return [(row[0], row[1])]
47+
48+
# Use NTILE to split rows into equal-sized buckets, then get
49+
# the min/max ID of each bucket as the range boundaries.
50+
cr.execute(
51+
f"""
52+
SELECT MIN(id) AS min_id, MAX(id) AS max_id
53+
FROM (
54+
SELECT id, NTILE(%s) OVER (ORDER BY id) AS tile
55+
FROM {table}
56+
WHERE {where_clause}
57+
) sub
58+
GROUP BY tile
59+
ORDER BY min_id
60+
""", # noqa: S608 # nosec B608
61+
(num_batches, *params),
62+
)
63+
return cr.fetchall()

spp_programs/models/managers/program_manager.py

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from odoo.addons.job_worker.delay import group
99

1010
from ..programs import SPPProgram
11+
from .pagination_utils import compute_id_ranges
1112

1213
_logger = logging.getLogger(__name__)
1314

@@ -184,28 +185,43 @@ def _enroll_eligible_registrants_async(self, states, members_count):
184185
program.message_post(body=_("Eligibility check of %s beneficiaries started.", members_count))
185186
program.write({"is_locked": True, "locked_reason": "Eligibility check of beneficiaries"})
186187

188+
if isinstance(states, str):
189+
states = [states]
190+
191+
id_ranges = compute_id_ranges(
192+
self.env.cr,
193+
"spp_program_membership",
194+
"program_id = %s AND state IN %s",
195+
(program.id, tuple(states)),
196+
self.MAX_ROW_JOB_QUEUE,
197+
)
198+
187199
jobs = []
188-
for i in range(0, members_count, self.MAX_ROW_JOB_QUEUE):
200+
for min_id, max_id in id_ranges:
189201
jobs.append(
190202
self.delayable(channel="program_manager")._enroll_eligible_registrants(
191-
states, i, self.MAX_ROW_JOB_QUEUE
203+
states, min_id=min_id, max_id=max_id
192204
)
193205
)
194206
main_job = group(*jobs)
195207
main_job.on_done(self.delayable(channel="program_manager").mark_enroll_eligible_as_done())
196208
main_job.delay()
197209

198-
def _enroll_eligible_registrants(self, states, offset=0, limit=None, do_count=False):
210+
def _enroll_eligible_registrants(self, states, offset=0, limit=None, min_id=None, max_id=None, do_count=False):
199211
"""Enroll Eligible Registrants
200212
201213
:param states: List of states to be used in domain filter
202-
:param offset: Optional integer value for the ORM search offset
203-
:param limit: Optional integer value for the ORM search limit
214+
:param offset: Optional integer value for the ORM search offset (deprecated, use min_id/max_id)
215+
:param limit: Optional integer value for the ORM search limit (deprecated, use min_id/max_id)
216+
:param min_id: Minimum record ID for ID-range pagination (inclusive)
217+
:param max_id: Maximum record ID for ID-range pagination (inclusive)
204218
:param do_count: Boolean - set to False to not run compute functions
205219
:return: Integer - count of not enrolled members
206220
"""
207221
program = self.program_id
208-
members = program.get_beneficiaries(state=states, offset=offset, limit=limit, order="id")
222+
members = program.get_beneficiaries(
223+
state=states, offset=offset, limit=limit, min_id=min_id, max_id=max_id, order="id"
224+
)
209225

210226
member_before = members
211227

spp_programs/models/programs.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,9 @@ def get_managers(self, kind):
314314
return [el.manager_ref_id for el in managers]
315315

316316
@api.model
317-
def get_beneficiaries(self, state=None, offset=0, limit=None, order=None, count=False, last_id=None):
317+
def get_beneficiaries(
318+
self, state=None, offset=0, limit=None, order=None, count=False, last_id=None, min_id=None, max_id=None
319+
):
318320
"""
319321
Get program beneficiaries with pagination support.
320322
@@ -324,9 +326,12 @@ def get_beneficiaries(self, state=None, offset=0, limit=None, order=None, count=
324326
:param order: Sort order
325327
:param count: If True, return count instead of records
326328
:param last_id: For cursor-based pagination - ID of last record from previous batch (more efficient)
329+
:param min_id: For ID-range pagination - minimum record ID (inclusive)
330+
:param max_id: For ID-range pagination - maximum record ID (inclusive)
327331
:return: Recordset or count
328332
329-
Note: For large datasets, use cursor-based pagination with last_id parameter instead of offset.
333+
Note: For large datasets, prefer min_id/max_id (ID-range) or last_id (cursor)
334+
pagination over offset-based pagination.
330335
"""
331336
self.ensure_one()
332337
if isinstance(state, str):
@@ -337,7 +342,12 @@ def get_beneficiaries(self, state=None, offset=0, limit=None, order=None, count=
337342
if count:
338343
return self.env["spp.program.membership"].search_count(domain, limit=limit)
339344

340-
# Use cursor-based pagination if last_id is provided (more efficient)
345+
# ID-range pagination (best for parallel job dispatch)
346+
if min_id is not None and max_id is not None:
347+
domain = domain + [("id", ">=", min_id), ("id", "<=", max_id)]
348+
return self.env["spp.program.membership"].search(domain, order=order or "id")
349+
350+
# Cursor-based pagination (good for sequential iteration)
341351
if last_id is not None:
342352
domain = domain + [("id", ">", last_id)]
343353
return self.env["spp.program.membership"].search(domain, limit=limit, order=order or "id")

spp_programs/tests/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,3 +32,4 @@
3232
from . import test_payment_and_accounting
3333
from . import test_managers
3434
from . import test_cycle_auto_approve_fund_check
35+
from . import test_keyset_pagination

0 commit comments

Comments
 (0)