Skip to content

Commit b061135

Browse files
Merge pull request #151 from OpenSPP/worktree-perf+phase9-concurrency
perf: increase job concurrency, add channel routing and identity keys
2 parents 1f4203c + 832c497 commit b061135

13 files changed

Lines changed: 411 additions & 32 deletions

spp_programs/README.rst

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,18 @@ Dependencies
254254
Changelog
255255
=========
256256

257+
19.0.2.0.10
258+
~~~~~~~~~~~
259+
260+
- Increase parallel-safe channel limits (cycle, eligibility_manager,
261+
program_manager) from 1 to 4
262+
- Add serial ``entitlement_approval`` channel (limit=1) for fund balance
263+
safety
264+
- Add serial ``statistics_refresh`` channel (limit=1) to prevent
265+
concurrent refresh storms
266+
- Add ``identity_key`` to async job dispatchers to prevent duplicate
267+
submission on double-click
268+
257269
19.0.2.0.9
258270
~~~~~~~~~~
259271

spp_programs/__manifest__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
"name": "OpenSPP Programs",
55
"summary": "Manage programs, cycles, beneficiary enrollment, entitlements (cash and in-kind), payments, and fund tracking for social protection.",
66
"category": "OpenSPP/Core",
7-
"version": "19.0.2.0.9",
7+
"version": "19.0.2.0.10",
88
"sequence": 1,
99
"author": "OpenSPP.org",
1010
"website": "https://github.com/OpenSPP/OpenSPP2",

spp_programs/data/queue_data.xml

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,26 @@
11
<odoo>
22
<record id="limit_cycle" model="queue.limit">
33
<field name="name">cycle</field>
4-
<field name="limit">1</field>
4+
<field name="limit">4</field>
55
<field name="rate_limit">0</field>
66
</record>
77
<record id="limit_eligibility_manager" model="queue.limit">
88
<field name="name">eligibility_manager</field>
9-
<field name="limit">1</field>
9+
<field name="limit">4</field>
1010
<field name="rate_limit">0</field>
1111
</record>
1212
<record id="limit_program_manager" model="queue.limit">
1313
<field name="name">program_manager</field>
14+
<field name="limit">4</field>
15+
<field name="rate_limit">0</field>
16+
</record>
17+
<record id="limit_entitlement_approval" model="queue.limit">
18+
<field name="name">entitlement_approval</field>
19+
<field name="limit">1</field>
20+
<field name="rate_limit">0</field>
21+
</record>
22+
<record id="limit_statistics_refresh" model="queue.limit">
23+
<field name="name">statistics_refresh</field>
1424
<field name="limit">1</field>
1525
<field name="rate_limit">0</field>
1626
</record>

spp_programs/models/managers/cycle_manager_base.py

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -527,9 +527,14 @@ def _check_eligibility_async(self, cycle, beneficiaries_count):
527527

528528
jobs = []
529529
for min_id, max_id in id_ranges:
530-
jobs.append(self.delayable(channel="cycle")._check_eligibility(cycle, min_id=min_id, max_id=max_id))
530+
jobs.append(
531+
self.delayable(
532+
channel="cycle",
533+
identity_key=f"check_elig_{cycle.id}_{min_id}",
534+
)._check_eligibility(cycle, min_id=min_id, max_id=max_id)
535+
)
531536
main_job = group(*jobs)
532-
main_job.on_done(self.delayable(channel="cycle").mark_check_eligibility_as_done(cycle))
537+
main_job.on_done(self.delayable(channel="statistics_refresh").mark_check_eligibility_as_done(cycle))
533538
main_job.delay()
534539

535540
def _check_eligibility(
@@ -607,10 +612,17 @@ def _prepare_entitlements_async(self, cycle, beneficiaries_count):
607612

608613
jobs = []
609614
for min_id, max_id in id_ranges:
610-
jobs.append(self.delayable(channel="cycle")._prepare_entitlements(cycle, min_id=min_id, max_id=max_id))
615+
jobs.append(
616+
self.delayable(
617+
channel="cycle",
618+
identity_key=f"prepare_ent_{cycle.id}_{min_id}",
619+
)._prepare_entitlements(cycle, min_id=min_id, max_id=max_id)
620+
)
611621
main_job = group(*jobs)
612622
main_job.on_done(
613-
self.delayable(channel="cycle").mark_prepare_entitlement_as_done(cycle, _("Entitlement Ready."))
623+
self.delayable(channel="statistics_refresh").mark_prepare_entitlement_as_done(
624+
cycle, _("Entitlement Ready.")
625+
)
614626
)
615627
main_job.delay()
616628

@@ -844,15 +856,20 @@ def _add_beneficiaries_async(self, cycle, beneficiaries, state):
844856
jobs = []
845857
for i in range(0, beneficiaries_count, self.MAX_ROW_JOB_QUEUE):
846858
jobs.append(
847-
self.delayable(channel="cycle")._add_beneficiaries(
859+
self.delayable(
860+
channel="cycle",
861+
identity_key=f"add_benef_{cycle.id}_{i}",
862+
)._add_beneficiaries(
848863
cycle,
849864
beneficiaries[i : i + self.MAX_ROW_JOB_QUEUE],
850865
state,
851866
)
852867
)
853868

854869
main_job = group(*jobs)
855-
main_job.on_done(self.delayable(channel="cycle").mark_import_as_done(cycle, _("Beneficiary import finished.")))
870+
main_job.on_done(
871+
self.delayable(channel="statistics_refresh").mark_import_as_done(cycle, _("Beneficiary import finished."))
872+
)
856873
main_job.delay()
857874

858875
def _add_beneficiaries(self, cycle, beneficiaries, state="draft", do_count=False):

spp_programs/models/managers/eligibility_manager.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -155,12 +155,13 @@ def _import_registrants_async(self, new_beneficiaries, state="draft"):
155155
jobs = []
156156
for i in range(0, len(new_beneficiaries), 10000):
157157
jobs.append(
158-
self.delayable(channel="eligibility_manager")._import_registrants(
159-
new_beneficiaries[i : i + 10000], state
160-
)
158+
self.delayable(
159+
channel="eligibility_manager",
160+
identity_key=f"import_reg_{program.id}_{i}",
161+
)._import_registrants(new_beneficiaries[i : i + 10000], state)
161162
)
162163
main_job = group(*jobs)
163-
main_job.on_done(self.delayable(channel="eligibility_manager").mark_import_as_done())
164+
main_job.on_done(self.delayable(channel="statistics_refresh").mark_import_as_done())
164165
main_job.delay()
165166

166167
def mark_import_as_done(self):

spp_programs/models/managers/entitlement_manager_base.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,9 @@ def _set_pending_validation_entitlements_async(self, cycle, entitlements):
8989
jobs = []
9090
for i in range(0, entitlements_count, self.MAX_ROW_JOB_QUEUE):
9191
jobs.append(
92-
self.delayable()._set_pending_validation_entitlements(entitlements[i : i + self.MAX_ROW_JOB_QUEUE])
92+
self.delayable(channel="entitlement_approval")._set_pending_validation_entitlements(
93+
entitlements[i : i + self.MAX_ROW_JOB_QUEUE]
94+
)
9395
)
9496
main_job = group(*jobs)
9597
main_job.on_done(self.delayable().mark_job_as_done(cycle, _("Entitlements Set to Pending Validation.")))
@@ -137,7 +139,11 @@ def _validate_entitlements_async(self, cycle, entitlements, entitlements_count):
137139

138140
jobs = []
139141
for i in range(0, entitlements_count, self.MAX_ROW_JOB_QUEUE):
140-
jobs.append(self.delayable()._validate_entitlements(entitlements[i : i + self.MAX_ROW_JOB_QUEUE]))
142+
jobs.append(
143+
self.delayable(channel="entitlement_approval")._validate_entitlements(
144+
entitlements[i : i + self.MAX_ROW_JOB_QUEUE]
145+
)
146+
)
141147
main_job = group(*jobs)
142148
main_job.on_done(self.delayable().mark_job_as_done(cycle, _("Entitlements Validated and Approved.")))
143149
main_job.delay()
@@ -197,7 +203,11 @@ def _cancel_entitlements_async(self, cycle, entitlements, entitlements_count):
197203

198204
jobs = []
199205
for i in range(0, entitlements_count, self.MAX_ROW_JOB_QUEUE):
200-
jobs.append(self.delayable()._cancel_entitlements(entitlements[i : i + self.MAX_ROW_JOB_QUEUE]))
206+
jobs.append(
207+
self.delayable(channel="entitlement_approval")._cancel_entitlements(
208+
entitlements[i : i + self.MAX_ROW_JOB_QUEUE]
209+
)
210+
)
201211
main_job = group(*jobs)
202212
main_job.on_done(self.delayable().mark_job_as_done(cycle, _("Entitlements Cancelled.")))
203213
main_job.delay()

spp_programs/models/managers/entitlement_manager_cash.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,11 @@ def _validate_entitlements_async(self, cycle, entitlements, entitlements_count):
319319
jobs = []
320320
for i in range(0, entitlements_count, self.MAX_ROW_JOB_QUEUE):
321321
# Needs to override
322-
jobs.append(self.delayable()._validate_entitlements(cycle, entitlements[i : i + self.MAX_ROW_JOB_QUEUE]))
322+
jobs.append(
323+
self.delayable(channel="entitlement_approval")._validate_entitlements(
324+
cycle, entitlements[i : i + self.MAX_ROW_JOB_QUEUE]
325+
)
326+
)
323327
main_job = group(*jobs)
324328
main_job.on_done(self.delayable().mark_job_as_done(cycle, _("Entitlements Validated and Approved.")))
325329
main_job.delay()

spp_programs/models/managers/entitlement_manager_inkind.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,9 @@ def _set_pending_validation_entitlements_async(self, cycle, entitlements_count):
216216
jobs = []
217217
for i in range(0, entitlements_count, self.MAX_ROW_JOB_QUEUE):
218218
jobs.append(
219-
self.delayable()._set_pending_validation_entitlements(cycle, offset=i, limit=self.MAX_ROW_JOB_QUEUE)
219+
self.delayable(channel="entitlement_approval")._set_pending_validation_entitlements(
220+
cycle, offset=i, limit=self.MAX_ROW_JOB_QUEUE
221+
)
220222
)
221223
main_job = group(*jobs)
222224
main_job.on_done(self.delayable().mark_job_as_done(cycle, _("Entitlements Set to Pending Validation.")))
@@ -315,7 +317,11 @@ def _validate_entitlements_async(self, cycle, entitlements_count):
315317

316318
jobs = []
317319
for i in range(0, entitlements_count, self.MAX_ROW_JOB_QUEUE):
318-
jobs.append(self.delayable()._validate_entitlements(cycle, offset=i, limit=self.MAX_ROW_JOB_QUEUE))
320+
jobs.append(
321+
self.delayable(channel="entitlement_approval")._validate_entitlements(
322+
cycle, offset=i, limit=self.MAX_ROW_JOB_QUEUE
323+
)
324+
)
319325
main_job = group(*jobs)
320326
main_job.on_done(self.delayable().mark_job_as_done(cycle, _("Entitlements Validated and Approved.")))
321327
main_job.delay()

spp_programs/models/managers/program_manager.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -199,12 +199,13 @@ def _enroll_eligible_registrants_async(self, states, members_count):
199199
jobs = []
200200
for min_id, max_id in id_ranges:
201201
jobs.append(
202-
self.delayable(channel="program_manager")._enroll_eligible_registrants(
203-
states, min_id=min_id, max_id=max_id
204-
)
202+
self.delayable(
203+
channel="program_manager",
204+
identity_key=f"enroll_eligible_{program.id}_{min_id}",
205+
)._enroll_eligible_registrants(states, min_id=min_id, max_id=max_id)
205206
)
206207
main_job = group(*jobs)
207-
main_job.on_done(self.delayable(channel="program_manager").mark_enroll_eligible_as_done())
208+
main_job.on_done(self.delayable(channel="statistics_refresh").mark_enroll_eligible_as_done())
208209
main_job.delay()
209210

210211
def _enroll_eligible_registrants(self, states, offset=0, limit=None, min_id=None, max_id=None, do_count=False):

spp_programs/readme/HISTORY.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,10 @@
1+
### 19.0.2.0.10
2+
3+
- Increase parallel-safe channel limits (cycle, eligibility_manager, program_manager) from 1 to 4
4+
- Add serial `entitlement_approval` channel (limit=1) for fund balance safety
5+
- Add serial `statistics_refresh` channel (limit=1) to prevent concurrent refresh storms
6+
- Add `identity_key` to async job dispatchers to prevent duplicate submission on double-click
7+
18
### 19.0.2.0.9
29

310
- Add context flags (`skip_registrant_statistics`, `skip_program_statistics`) to suppress expensive computed field recomputation during bulk operations

0 commit comments

Comments
 (0)