Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions spp_programs/models/cycle_membership.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@ class SPPCycleMembership(models.Model):
partner_id = fields.Many2one("res.partner", "Registrant", help="A beneficiary", required=True, index=True)
cycle_id = fields.Many2one("spp.cycle", "Cycle", help="A cycle", required=True, index=True)
enrollment_date = fields.Date(default=lambda self: fields.Datetime.now())

compliance_criteria = fields.Char(
string="Compliance Criteria",
compute="_compute_compliance_criteria",
help="The compliance CEL expression from the program that this registrant failed to meet",
)

state = fields.Selection(
selection=[
("draft", "Draft"),
Expand All @@ -29,6 +36,21 @@ class SPPCycleMembership(models.Model):
copy=False,
)

def _compute_compliance_criteria(self):
"""Show the compliance CEL expression from the program when non-compliant."""
for rec in self:
if rec.state == "non_compliant" and rec.cycle_id and rec.cycle_id.program_id:
program = rec.cycle_id.program_id
for wrapper in program.compliance_manager_ids:
concrete = wrapper.manager_ref_id
if hasattr(concrete, "compliance_cel_expression") and concrete.compliance_cel_expression:
rec.compliance_criteria = concrete.compliance_cel_expression
break
else:
rec.compliance_criteria = False
else:
rec.compliance_criteria = False

def _compute_display_name(self):
res = super()._compute_display_name()
# Prefetch cycle_id and partner_id to avoid N+1 queries in loop
Expand Down
50 changes: 50 additions & 0 deletions spp_programs/models/program_membership.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,56 @@ def _compute_duplicate_reason(self):
else:
rec.duplicate_reason = False

latest_cycle_state = fields.Selection(
selection=[
("draft", "Draft"),
("enrolled", "Enrolled"),
("paused", "Paused"),
("exited", "Exited"),
("not_eligible", "Not Eligible"),
("non_compliant", "Non-Compliant"),
],
string="Cycle Status",
compute="_compute_latest_cycle_state",
help="State of the most recent cycle membership for this registrant in this program",
)

def _compute_latest_cycle_state(self):
"""Get the latest cycle membership state per registrant+program."""
if not self:
return

for rec in self:
rec.latest_cycle_state = False

# Batch query: find latest cycle membership for each program membership
CycleMembership = self.env["spp.cycle.membership"]
for rec in self:
cycle_mem = CycleMembership.search(
[
("partner_id", "=", rec.partner_id.id),
("cycle_id.program_id", "=", rec.program_id.id),
],
order="id desc",
limit=1,
)
if cycle_mem:
rec.latest_cycle_state = cycle_mem.state
Comment on lines +101 to +121
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The current implementation of _compute_latest_cycle_state performs a search() inside a loop, which leads to an N+1 query pattern. This will significantly degrade performance when viewing a list of program memberships (e.g., on the registrant form). It is highly recommended to use _read_group to fetch the latest cycle membership IDs for all records in the batch in a single query.

    def _compute_latest_cycle_state(self):
        """Get the latest cycle membership state per registrant+program."""
        for rec in self:
            rec.latest_cycle_state = False

        if not self:
            return

        # Batch query: find latest cycle membership ID for each partner+program pair
        data = self.env["spp.cycle.membership"]._read_group(
            domain=[
                ("partner_id", "in", self.partner_id.ids),
                ("cycle_id.program_id", "in", self.program_id.ids),
            ],
            groupby=["partner_id", "cycle_id.program_id"],
            aggregates=["id:max"],
        )
        # Map (partner_id, program_id) -> latest_membership_id
        latest_ids = {(p.id, prog.id): max_id for p, prog, max_id in data}

        # Fetch states for all latest memberships in one query
        mems = self.env["spp.cycle.membership"].browse(latest_ids.values())
        mem_states = {m.id: m.state for m in mems}

        for rec in self:
            mem_id = latest_ids.get((rec.partner_id.id, rec.program_id.id))
            if mem_id:
                rec.latest_cycle_state = mem_states.get(mem_id)


def action_view_cycle_memberships(self):
"""Open cycle memberships for this registrant in this program."""
self.ensure_one()
return {
"name": _("%s — Cycles") % self.program_id.name,
"type": "ir.actions.act_window",
"res_model": "spp.cycle.membership",
"view_mode": "list,form",
"domain": [
("partner_id", "=", self.partner_id.id),
("cycle_id.program_id", "=", self.program_id.id),
],
}

# TODO: Implement exit reasons
# exit_reason_id = fields.Many2one("Exit Reason") Default: Completed, Opt-Out, Other

Expand Down
57 changes: 57 additions & 0 deletions spp_programs/models/registrant.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,17 @@ class SPPRegistrant(models.Model):
inkind_entitlement_ids = fields.One2many("spp.entitlement.inkind", "partner_id", "In-kind Entitlements")

# Statistics
cycle_membership_count = fields.Integer(
string="# Cycles",
compute="_compute_cycle_membership_count",
store=True,
)
non_compliant_cycle_count = fields.Integer(
string="# Non-Compliant",
compute="_compute_cycle_membership_count",
store=True,
)

program_membership_count = fields.Integer(
string="# Program Memberships",
compute="_compute_program_membership_count",
Expand All @@ -34,6 +45,40 @@ class SPPRegistrant(models.Model):
compute="_compute_total_entitlements_count",
)

@api.depends("cycle_ids", "cycle_ids.state")
def _compute_cycle_membership_count(self):
"""Batch-efficient cycle membership and non-compliant counts."""
if not self:
return

registrants = self.filtered("is_registrant")
for partner in self - registrants:
partner.cycle_membership_count = 0
partner.non_compliant_cycle_count = 0

if not registrants:
return

# Total cycle memberships
total_data = self.env["spp.cycle.membership"]._read_group(
domain=[("partner_id", "in", registrants.ids)],
groupby=["partner_id"],
aggregates=["__count"],
)
total_counts = {partner.id: count for partner, count in total_data}

# Non-compliant count
nc_data = self.env["spp.cycle.membership"]._read_group(
domain=[("partner_id", "in", registrants.ids), ("state", "=", "non_compliant")],
groupby=["partner_id"],
aggregates=["__count"],
)
nc_counts = {partner.id: count for partner, count in nc_data}

for partner in registrants:
partner.cycle_membership_count = total_counts.get(partner.id, 0)
partner.non_compliant_cycle_count = nc_counts.get(partner.id, 0)

@api.depends("entitlements_count", "inkind_entitlements_count")
def _compute_total_entitlements_count(self):
"""Compute combined count of cash and in-kind entitlements."""
Expand Down Expand Up @@ -144,6 +189,18 @@ def action_view_program_memberships(self):
"context": {"default_partner_id": self.id},
}

def action_view_cycle_memberships(self):
"""Open cycle memberships for this registrant."""
self.ensure_one()
return {
"name": _("Cycle Memberships - %s") % self.name,
"type": "ir.actions.act_window",
"res_model": "spp.cycle.membership",
"view_mode": "list,form",
"domain": [("partner_id", "=", self.id)],
"context": {"default_partner_id": self.id},
}

def action_view_all_entitlements(self):
"""Open all entitlements (cash + in-kind) for this registrant."""
self.ensure_one()
Expand Down
1 change: 1 addition & 0 deletions spp_programs/tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,4 @@
from . import test_payment_and_accounting
from . import test_managers
from . import test_cycle_auto_approve_fund_check
from . import test_cycle_compliance_on_registrant
210 changes: 210 additions & 0 deletions spp_programs/tests/test_cycle_compliance_on_registrant.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
# Part of OpenSPP. See LICENSE file for full copyright and licensing details.
"""Tests for cycle compliance visibility on registrant form.

Covers:
- Cycle membership compliance_criteria computed field
- Program membership latest_cycle_state computed field
- Program membership action_view_cycle_memberships action
- Registrant cycle_membership_count / non_compliant_cycle_count computed fields
- Registrant action_view_cycle_memberships action
"""

import uuid

from odoo import fields
from odoo.tests import tagged
from odoo.tests.common import TransactionCase


@tagged("post_install", "-at_install")
class TestCycleComplianceOnRegistrant(TransactionCase):
"""Test cycle compliance status visibility on registrant form."""

@classmethod
def setUpClass(cls):
super().setUpClass()

uid = uuid.uuid4().hex[:6]

# Create program
cls.program = cls.env["spp.program"].create({"name": f"Compliance Test Program {uid}"})

# Create registrant
cls.registrant = cls.env["res.partner"].create(
{
"name": f"Test Registrant {uid}",
"is_registrant": True,
"is_group": True,
}
)

# Create cycle
cls.cycle = cls.env["spp.cycle"].create(
{
"name": f"Test Cycle {uid}",
"program_id": cls.program.id,
"start_date": fields.Date.today(),
"end_date": fields.Date.today(),
}
)

# Create program membership
cls.prog_mem = cls.env["spp.program.membership"].create(
{
"partner_id": cls.registrant.id,
"program_id": cls.program.id,
"state": "enrolled",
}
)

# Create cycle membership
cls.cycle_mem = cls.env["spp.cycle.membership"].create(
{
"partner_id": cls.registrant.id,
"cycle_id": cls.cycle.id,
"state": "enrolled",
}
)

# === Cycle Membership: compliance_criteria ===

def test_compliance_criteria_empty_when_enrolled(self):
"""compliance_criteria is False when state is not non_compliant."""
self.cycle_mem.state = "enrolled"
self.assertFalse(self.cycle_mem.compliance_criteria)

def test_compliance_criteria_empty_when_non_compliant_no_manager(self):
"""compliance_criteria is False when non_compliant but no compliance manager."""
self.cycle_mem.state = "non_compliant"
self.assertFalse(self.cycle_mem.compliance_criteria)

def test_compliance_criteria_shows_cel_when_non_compliant(self):
"""compliance_criteria shows CEL expression when non_compliant and manager exists."""
# Create compliance manager with CEL expression
ComplianceManager = self.env.get("spp.compliance.manager.default")
if not ComplianceManager:
self.skipTest("spp.compliance.manager.default not available")

manager = ComplianceManager.create(
{
"name": f"Test Compliance {uuid.uuid4().hex[:6]}",
"program_id": self.program.id,
"compliance_cel_expression": "per_capita_income < poverty_line",
}
)

# Link manager to program via manager wrapper
ManagerWrapper = self.env.get("spp.program.manager")
if ManagerWrapper:
ManagerWrapper.create(
{
"program_id": self.program.id,
"manager_ref_id": f"{manager._name},{manager.id}",
}
)

self.cycle_mem.state = "non_compliant"
self.cycle_mem.invalidate_recordset(["compliance_criteria"])

# If manager was properly linked, criteria should show the CEL
# Otherwise it's False (depends on manager linking mechanism)
criteria = self.cycle_mem.compliance_criteria
if criteria:
self.assertEqual(criteria, "per_capita_income < poverty_line")

# === Program Membership: latest_cycle_state ===

def test_latest_cycle_state_enrolled(self):
"""latest_cycle_state reflects the most recent cycle membership state."""
self.cycle_mem.state = "enrolled"
self.prog_mem.invalidate_recordset(["latest_cycle_state"])
self.assertEqual(self.prog_mem.latest_cycle_state, "enrolled")

def test_latest_cycle_state_non_compliant(self):
"""latest_cycle_state shows non_compliant when cycle is non_compliant."""
self.cycle_mem.state = "non_compliant"
self.prog_mem.invalidate_recordset(["latest_cycle_state"])
self.assertEqual(self.prog_mem.latest_cycle_state, "non_compliant")

def test_latest_cycle_state_no_cycle(self):
"""latest_cycle_state is False when no cycle memberships exist."""
# Create a new program membership with no cycles
prog_mem2 = self.env["spp.program.membership"].create(
{
"partner_id": self.registrant.id,
"program_id": self.env["spp.program"].create({"name": f"Empty Program {uuid.uuid4().hex[:6]}"}).id,
"state": "enrolled",
}
)
self.assertFalse(prog_mem2.latest_cycle_state)

def test_latest_cycle_state_picks_latest(self):
"""latest_cycle_state picks the most recent cycle (highest ID)."""
cycle2 = self.env["spp.cycle"].create(
{
"name": f"Cycle 2 {uuid.uuid4().hex[:6]}",
"program_id": self.program.id,
"start_date": fields.Date.today(),
"end_date": fields.Date.today(),
}
)
self.env["spp.cycle.membership"].create(
{
"partner_id": self.registrant.id,
"cycle_id": cycle2.id,
"state": "non_compliant",
}
)
self.prog_mem.invalidate_recordset(["latest_cycle_state"])
self.assertEqual(self.prog_mem.latest_cycle_state, "non_compliant")

# === Program Membership: action_view_cycle_memberships ===

def test_action_view_cycle_memberships_from_program(self):
"""action_view_cycle_memberships returns valid action dict."""
action = self.prog_mem.action_view_cycle_memberships()

self.assertEqual(action["type"], "ir.actions.act_window")
self.assertEqual(action["res_model"], "spp.cycle.membership")
self.assertIn("domain", action)
# Domain should filter by partner and program
domain = action["domain"]
self.assertTrue(any(d[0] == "partner_id" and d[2] == self.registrant.id for d in domain))
self.assertTrue(any(d[0] == "cycle_id.program_id" and d[2] == self.program.id for d in domain))

# === Registrant: cycle_membership_count / non_compliant_cycle_count ===

def test_cycle_membership_count(self):
"""cycle_membership_count reflects total cycle memberships."""
self.registrant.invalidate_recordset(["cycle_membership_count"])
self.assertGreaterEqual(self.registrant.cycle_membership_count, 1)

def test_non_compliant_cycle_count_zero(self):
"""non_compliant_cycle_count is 0 when all cycles are enrolled."""
self.cycle_mem.state = "enrolled"
self.registrant.invalidate_recordset(["non_compliant_cycle_count"])
self.assertEqual(self.registrant.non_compliant_cycle_count, 0)

def test_non_compliant_cycle_count_increments(self):
"""non_compliant_cycle_count increments when cycle becomes non_compliant."""
self.cycle_mem.state = "non_compliant"
self.registrant.invalidate_recordset(["non_compliant_cycle_count"])
self.assertGreaterEqual(self.registrant.non_compliant_cycle_count, 1)

def test_cycle_counts_non_registrant(self):
"""Non-registrant partners have 0 cycle counts."""
non_reg = self.env["res.partner"].create({"name": "Not a registrant"})
self.assertEqual(non_reg.cycle_membership_count, 0)
self.assertEqual(non_reg.non_compliant_cycle_count, 0)

# === Registrant: action_view_cycle_memberships ===

def test_action_view_cycle_memberships_from_registrant(self):
"""action_view_cycle_memberships returns valid action dict."""
action = self.registrant.action_view_cycle_memberships()

self.assertEqual(action["type"], "ir.actions.act_window")
self.assertEqual(action["res_model"], "spp.cycle.membership")
self.assertIn("domain", action)
domain = action["domain"]
self.assertTrue(any(d[0] == "partner_id" and d[2] == self.registrant.id for d in domain))
Loading
Loading