Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions spp_cel_domain/models/cel_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,7 @@ def compile_and_preview(
model: str,
expr: str,
limit: int = 50,
offset: int = 0,
fields: list[str] | None = None,
materialize_sql: bool = False,
) -> dict[str, Any]:
Expand Down Expand Up @@ -465,15 +466,23 @@ def compile_and_preview(
preview_ids = [] # Count-only mode
elif limit == 0:
# Use the default from the method signature (50)
preview_recordset = self.env[model].search(final_domain, limit=50)
preview_recordset = self.env[model].search(final_domain, limit=50, offset=offset)
preview_ids = preview_recordset.ids
else:
preview_recordset = self.env[model].search(final_domain, limit=limit)
preview_recordset = self.env[model].search(final_domain, limit=limit, offset=offset)
preview_ids = preview_recordset.ids

# Read full record data if fields requested (for JSON-safe preview)
if preview_ids and fields:
preview_data = preview_recordset.read(fields)
# Replace phone_number_ids with phone for reading, then enrich
read_fields = [f for f in fields if f != "phone_number_ids"]
preview_data = preview_recordset.read(read_fields)
if "phone_number_ids" in fields:
for rec_data in preview_data:
rec_id = rec_data["id"]
partner = preview_recordset.filtered(lambda r, rid=rec_id: r.id == rid)
phones = partner.phone_number_ids.filtered(lambda p: not p.disabled).mapped("phone_no")
rec_data["phone_numbers"] = phones
# Enrich explanation with metrics info if any
metrics_section = ""
if metrics_info:
Expand Down
7 changes: 7 additions & 0 deletions spp_cel_domain/models/cel_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,19 @@ class CELService(models.AbstractModel):
_name = "spp.cel.service"
_description = "CEL Expression Service"

@api.model
def check_search_access(self):
"""Check if the current user has CEL search access."""
return self.env.user.has_group("spp_cel_registry_search.group_cel_search_user")

@api.model
def compile_expression(
self,
expression,
profile,
base_domain=None,
limit=0,
offset=0,
fields=None,
materialize_sql=False,
):
Expand Down Expand Up @@ -109,6 +115,7 @@ def compile_expression(
root_model,
expanded_expression,
limit=limit,
offset=offset,
fields=fields,
materialize_sql=materialize_sql,
)
Expand Down
78 changes: 78 additions & 0 deletions spp_cel_domain/tests/test_cel_service.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Part of OpenSPP. See LICENSE file for full copyright and licensing details.
"""Adversarial tests for CEL Service - designed to BREAK the service facade."""

from odoo import fields
from odoo.tests import TransactionCase, tagged


Expand Down Expand Up @@ -521,3 +522,80 @@ def test_output_type_string(self):
"""Output type 'string' should be accepted."""
result = self.service.validate_formula_expression('"hello"', "registry_groups", output_type="string")
self.assertTrue(result["valid"], f"Error: {result.get('error')}")

# --- Offset / Pagination Tests ---

def test_compile_expression_offset_returns_different_records(self):
"""compile_expression with offset should return different records."""
for i in range(5):
self.env["res.partner"].create({"name": f"OffsetSvcTest{i}", "is_registrant": True, "is_group": False})
result1 = self.service.compile_expression(
'r.name == "OffsetSvcTest0" || r.name == "OffsetSvcTest1" || '
'r.name == "OffsetSvcTest2" || r.name == "OffsetSvcTest3" || '
'r.name == "OffsetSvcTest4"',
"registry_individuals",
limit=2,
offset=0,
fields=["id", "name"],
)
result2 = self.service.compile_expression(
'r.name == "OffsetSvcTest0" || r.name == "OffsetSvcTest1" || '
'r.name == "OffsetSvcTest2" || r.name == "OffsetSvcTest3" || '
'r.name == "OffsetSvcTest4"',
"registry_individuals",
limit=2,
offset=2,
fields=["id", "name"],
)
self.assertTrue(result1["valid"], f"Error: {result1.get('error')}")
self.assertTrue(result2["valid"], f"Error: {result2.get('error')}")
ids1 = {r["id"] for r in result1.get("preview_records", [])}
ids2 = {r["id"] for r in result2.get("preview_records", [])}
self.assertFalse(ids1 & ids2, "Offset pages should not overlap")

def test_compile_expression_offset_beyond_results(self):
"""compile_expression with offset beyond total should return empty."""
result = self.service.compile_expression("true", "registry_individuals", limit=10, offset=999999, fields=["id"])
self.assertTrue(result["valid"])
self.assertEqual(len(result.get("preview_records", [])), 0)

def test_compile_expression_phone_number_enrichment(self):
"""compile_expression with phone_number_ids field should enrich results."""
partner = self.env["res.partner"].create({"name": "PhoneSvcTest99", "is_registrant": True, "is_group": False})
if "spp.phone.number" in self.env:
self.env["spp.phone.number"].create({"partner_id": partner.id, "phone_no": "+639171111111"})
self.env["spp.phone.number"].create({"partner_id": partner.id, "phone_no": "+639172222222"})
result = self.service.compile_expression(
'r.name == "PhoneSvcTest99"',
"registry_individuals",
limit=10,
fields=["id", "name", "phone_number_ids"],
)
self.assertTrue(result["valid"])
records = result.get("preview_records", [])
self.assertEqual(len(records), 1)
if "spp.phone.number" in self.env:
self.assertIn("phone_numbers", records[0])
self.assertEqual(len(records[0]["phone_numbers"]), 2)

def test_compile_expression_phone_excludes_disabled(self):
"""Phone enrichment should exclude disabled phone numbers."""
partner = self.env["res.partner"].create(
{"name": "PhoneDisabledTest99", "is_registrant": True, "is_group": False}
)
if "spp.phone.number" in self.env:
self.env["spp.phone.number"].create({"partner_id": partner.id, "phone_no": "+639173333333"})
disabled_phone = self.env["spp.phone.number"].create(
{"partner_id": partner.id, "phone_no": "+639174444444"}
)
disabled_phone.disabled = fields.Datetime.now()
result = self.service.compile_expression(
'r.name == "PhoneDisabledTest99"',
"registry_individuals",
limit=10,
fields=["id", "phone_number_ids"],
)
records = result.get("preview_records", [])
if "spp.phone.number" in self.env and records:
self.assertEqual(len(records[0]["phone_numbers"]), 1)
self.assertEqual(records[0]["phone_numbers"], ["+639173333333"])
Loading
Loading