diff --git a/spp_cel_vocabulary/README.rst b/spp_cel_vocabulary/README.rst
index 94a6d768..7f30a0d3 100644
--- a/spp_cel_vocabulary/README.rst
+++ b/spp_cel_vocabulary/README.rst
@@ -10,9 +10,9 @@ OpenSPP CEL Vocabulary Integration
!! source digest: sha256:187208d6a52a5dda86e67b1024f417772f64d60f710b523e275e30c8adbdeb0c
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
-.. |badge1| image:: https://img.shields.io/badge/maturity-Alpha-red.png
+.. |badge1| image:: https://img.shields.io/badge/maturity-Beta-yellow.png
:target: https://odoo-community.org/page/development-status
- :alt: Alpha
+ :alt: Beta
.. |badge2| image:: https://img.shields.io/badge/license-LGPL--3-blue.png
:target: http://www.gnu.org/licenses/lgpl-3.0-standalone.html
:alt: License: LGPL-3
@@ -124,10 +124,6 @@ Dependencies
``spp_cel_domain``, ``spp_vocabulary``
-.. IMPORTANT::
- This is an alpha version, the data model and design can change at any time without warning.
- Only for development or testing purpose, do not use in production.
-
**Table of contents**
.. contents::
diff --git a/spp_cel_vocabulary/__init__.py b/spp_cel_vocabulary/__init__.py
index c77867a3..5eeeb990 100644
--- a/spp_cel_vocabulary/__init__.py
+++ b/spp_cel_vocabulary/__init__.py
@@ -29,7 +29,7 @@ def _ensure_concept_groups(env):
standard_groups = [
{
"name": "feminine_gender",
- "display_name": "Feminine Gender",
+ "label": "Feminine Gender",
"cel_function": "is_female",
"target_field": "gender_id",
"description": (
@@ -40,7 +40,7 @@ def _ensure_concept_groups(env):
},
{
"name": "masculine_gender",
- "display_name": "Masculine Gender",
+ "label": "Masculine Gender",
"cel_function": "is_male",
"target_field": "gender_id",
"description": (
@@ -51,7 +51,7 @@ def _ensure_concept_groups(env):
},
{
"name": "head_of_household",
- "display_name": "Head of Household",
+ "label": "Head of Household",
"cel_function": "is_head",
"description": (
"Relationship types indicating head of household role. "
@@ -61,7 +61,7 @@ def _ensure_concept_groups(env):
},
{
"name": "pregnant_eligible",
- "display_name": "Pregnant/Eligible",
+ "label": "Pregnant/Eligible",
"cel_function": "is_pregnant",
"target_field": "pregnancy_status_id",
"description": (
@@ -72,7 +72,7 @@ def _ensure_concept_groups(env):
},
{
"name": "climate_hazards",
- "display_name": "Climate-related Hazards",
+ "label": "Climate-related Hazards",
"cel_function": None,
"description": (
"Hazard types related to climate and weather events "
@@ -82,7 +82,7 @@ def _ensure_concept_groups(env):
},
{
"name": "geophysical_hazards",
- "display_name": "Geophysical Hazards",
+ "label": "Geophysical Hazards",
"cel_function": None,
"description": (
"Hazard types related to earth processes "
@@ -92,7 +92,7 @@ def _ensure_concept_groups(env):
},
{
"name": "children",
- "display_name": "Children",
+ "label": "Children",
"cel_function": None,
"description": (
"Age group codes representing children (typically under 18 years). "
@@ -101,7 +101,7 @@ def _ensure_concept_groups(env):
},
{
"name": "adults",
- "display_name": "Adults",
+ "label": "Adults",
"cel_function": None,
"description": (
"Age group codes representing adults. Add codes from your age group vocabulary if applicable."
@@ -109,7 +109,7 @@ def _ensure_concept_groups(env):
},
{
"name": "elderly",
- "display_name": "Elderly/Senior Citizens",
+ "label": "Elderly/Senior Citizens",
"cel_function": None,
"description": (
"Age group codes representing elderly or senior citizens. "
@@ -118,7 +118,7 @@ def _ensure_concept_groups(env):
},
{
"name": "persons_with_disability",
- "display_name": "Persons with Disability",
+ "label": "Persons with Disability",
"cel_function": None,
"description": (
"Disability type codes. "
diff --git a/spp_cel_vocabulary/__manifest__.py b/spp_cel_vocabulary/__manifest__.py
index e229158f..3c8e39e2 100644
--- a/spp_cel_vocabulary/__manifest__.py
+++ b/spp_cel_vocabulary/__manifest__.py
@@ -7,7 +7,7 @@
"author": "OpenSPP.org",
"website": "https://github.com/OpenSPP/OpenSPP2",
"license": "LGPL-3",
- "development_status": "Alpha",
+ "development_status": "Beta",
"depends": [
"spp_cel_domain",
"spp_vocabulary",
diff --git a/spp_cel_vocabulary/models/cel_vocabulary_functions.py b/spp_cel_vocabulary/models/cel_vocabulary_functions.py
index bb6d0970..e182fe4f 100644
--- a/spp_cel_vocabulary/models/cel_vocabulary_functions.py
+++ b/spp_cel_vocabulary/models/cel_vocabulary_functions.py
@@ -89,9 +89,9 @@ def register_vocabulary_functions(self):
# Register with CEL function registry
if registry.register(name, marked):
count += 1
- _logger.debug(f"[CEL Vocabulary] Registered function '{name}'")
+ _logger.debug("[CEL Vocabulary] Registered function '%s'", name)
- _logger.info(f"[CEL Vocabulary] Registered {count} vocabulary function(s)")
+ _logger.info("[CEL Vocabulary] Registered %d vocabulary function(s)", count)
return count
@@ -110,9 +110,9 @@ def unregister_vocabulary_functions(self):
for name in vocab_funcs.VOCABULARY_FUNCTIONS.keys():
if registry.unregister(name):
count += 1
- _logger.debug(f"[CEL Vocabulary] Unregistered function '{name}'")
+ _logger.debug("[CEL Vocabulary] Unregistered function '%s'", name)
- _logger.info(f"[CEL Vocabulary] Unregistered {count} vocabulary function(s)")
+ _logger.info("[CEL Vocabulary] Unregistered %d vocabulary function(s)", count)
return count
diff --git a/spp_cel_vocabulary/models/cel_vocabulary_translator.py b/spp_cel_vocabulary/models/cel_vocabulary_translator.py
index 1ff9409d..bd639221 100644
--- a/spp_cel_vocabulary/models/cel_vocabulary_translator.py
+++ b/spp_cel_vocabulary/models/cel_vocabulary_translator.py
@@ -33,9 +33,6 @@ class CelVocabularyTranslator(models.AbstractModel):
"is_male": "masculine_gender",
"is_head": "head_of_household",
"is_pregnant": "pregnant_eligible",
- "is_caregiver": "caregiver",
- "is_mother": "mother",
- "is_father": "father",
}
def _to_plan(self, model: str, node: Any, cfg: dict[str, Any], ctx: dict[str, Any]): # noqa: C901
@@ -61,7 +58,7 @@ def _to_plan(self, model: str, node: Any, cfg: dict[str, Any], ctx: dict[str, An
# Handle semantic helpers: is_female(field), is_male(field), etc.
if func_name in self.SEMANTIC_HELPERS and len(node.args) >= 1:
- _logger.debug(f"[CEL Vocabulary] Handling semantic helper {func_name} for model {model}")
+ _logger.debug("[CEL Vocabulary] Handling semantic helper %s for model %s", func_name, model)
return self._handle_semantic_helper(model, node, cfg, ctx, func_name)
# Handle comparisons with code() calls
@@ -105,7 +102,7 @@ def _handle_in_group(self, model: str, node: P.Call, cfg: dict[str, Any], ctx: d
field_name, field_model = self._resolve_field(model, field_node, cfg, ctx)
field_name = self._normalize_field_name(field_model or model, field_name)
except Exception as e:
- _logger.warning(f"[CEL Vocabulary] Failed to resolve field in in_group(): {e}")
+ _logger.warning("[CEL Vocabulary] Failed to resolve field in in_group(): %s", e)
return (
LeafDomain(model, [("id", "=", 0)]),
"in_group() [FIELD RESOLUTION ERROR]",
@@ -115,7 +112,7 @@ def _handle_in_group(self, model: str, node: P.Call, cfg: dict[str, Any], ctx: d
try:
group_name = self._eval_literal(node.args[1], ctx)
except Exception as e:
- _logger.warning(f"[CEL Vocabulary] Failed to evaluate group name in in_group(): {e}")
+ _logger.warning("[CEL Vocabulary] Failed to evaluate group name in in_group(): %s", e)
return (
LeafDomain(field_model or model, [("id", "=", 0)]),
f"in_group({field_name}, ?) [EVAL ERROR]",
@@ -132,7 +129,7 @@ def _handle_in_group(self, model: str, node: P.Call, cfg: dict[str, Any], ctx: d
group = self.env["spp.vocabulary.concept.group"].search([("name", "=", group_name)], limit=1)
if not group:
- _logger.warning(f"[CEL Vocabulary] Concept group '{group_name}' not found, returning empty domain")
+ _logger.warning("[CEL Vocabulary] Concept group '%s' not found, returning empty domain", group_name)
# Return domain that matches nothing
return (
LeafDomain(field_model or model, [("id", "=", 0)]),
@@ -143,7 +140,7 @@ def _handle_in_group(self, model: str, node: P.Call, cfg: dict[str, Any], ctx: d
uri_list = group.get_code_uris()
if not uri_list:
- _logger.warning(f"[CEL Vocabulary] Concept group '{group_name}' has no codes")
+ _logger.warning("[CEL Vocabulary] Concept group '%s' has no codes", group_name)
return (
LeafDomain(field_model or model, [("id", "=", 0)]),
f"in_group({field_name}, '{group_name}') [EMPTY GROUP]",
@@ -188,7 +185,7 @@ def _handle_semantic_helper(
"""
group_name = self.SEMANTIC_HELPERS.get(func_name)
if not group_name:
- _logger.warning(f"[CEL Vocabulary] Unknown semantic helper: {func_name}")
+ _logger.warning("[CEL Vocabulary] Unknown semantic helper: %s", func_name)
return (
LeafDomain(model, [("id", "=", 0)]),
f"{func_name}() [UNKNOWN HELPER]",
@@ -200,7 +197,7 @@ def _handle_semantic_helper(
field_name, field_model = self._resolve_field(model, field_node, cfg, ctx)
field_name = self._normalize_field_name(field_model or model, field_name)
except Exception as e:
- _logger.warning(f"[CEL Vocabulary] Failed to resolve field in {func_name}(): {e}")
+ _logger.warning("[CEL Vocabulary] Failed to resolve field in %s(): %s", func_name, e)
return (
LeafDomain(model, [("id", "=", 0)]),
f"{func_name}() [FIELD RESOLUTION ERROR]",
@@ -211,7 +208,9 @@ def _handle_semantic_helper(
if not group:
_logger.warning(
- f"[CEL Vocabulary] Concept group '{group_name}' not found for {func_name}(), returning empty domain"
+ "[CEL Vocabulary] Concept group '%s' not found for %s(), returning empty domain",
+ group_name,
+ func_name,
)
return (
LeafDomain(field_model or model, [("id", "=", 0)]),
@@ -222,7 +221,7 @@ def _handle_semantic_helper(
uri_list = group.get_code_uris()
if not uri_list:
- _logger.warning(f"[CEL Vocabulary] Concept group '{group_name}' has no codes for {func_name}()")
+ _logger.warning("[CEL Vocabulary] Concept group '%s' has no codes for %s()", group_name, func_name)
return (
LeafDomain(field_model or model, [("id", "=", 0)]),
f"{func_name}({field_name}) [GROUP EMPTY]",
@@ -264,7 +263,7 @@ def _handle_code_eq(self, model: str, node: P.Call, cfg: dict[str, Any], ctx: di
field_name, field_model = self._resolve_field(model, field_node, cfg, ctx)
field_name = self._normalize_field_name(field_model or model, field_name)
except Exception as e:
- _logger.warning(f"[CEL Vocabulary] Failed to resolve field in code_eq(): {e}")
+ _logger.warning("[CEL Vocabulary] Failed to resolve field in code_eq(): %s", e)
return (
LeafDomain(model, [("id", "=", 0)]),
"code_eq() [FIELD RESOLUTION ERROR]",
@@ -274,7 +273,7 @@ def _handle_code_eq(self, model: str, node: P.Call, cfg: dict[str, Any], ctx: di
try:
identifier = self._eval_literal(node.args[1], ctx)
except Exception as e:
- _logger.warning(f"[CEL Vocabulary] Failed to evaluate identifier in code_eq(): {e}")
+ _logger.warning("[CEL Vocabulary] Failed to evaluate identifier in code_eq(): %s", e)
return (
LeafDomain(field_model or model, [("id", "=", 0)]),
f"code_eq({field_name}, ?) [EVAL ERROR]",
@@ -290,7 +289,8 @@ def _handle_code_eq(self, model: str, node: P.Call, cfg: dict[str, Any], ctx: di
if not target_code:
_logger.warning(
- f"[CEL Vocabulary] Could not resolve code identifier '{identifier}', returning empty domain"
+ "[CEL Vocabulary] Could not resolve code identifier '%s', returning empty domain",
+ identifier,
)
return (
LeafDomain(field_model or model, [("id", "=", 0)]),
@@ -351,7 +351,7 @@ def _handle_code_comparison(
# Only support equality/inequality for code comparisons
if op not in ("=", "!="):
- _logger.warning(f"[CEL Vocabulary] code() comparisons only support == and !=, got {node.op}")
+ _logger.warning("[CEL Vocabulary] code() comparisons only support == and !=, got %s", node.op)
return super()._to_plan(model, node, cfg, ctx)
# Resolve the field
@@ -359,7 +359,7 @@ def _handle_code_comparison(
field_name, field_model = self._resolve_field(model, field_node, cfg, ctx)
field_name = self._normalize_field_name(field_model or model, field_name)
except Exception as e:
- _logger.warning(f"[CEL Vocabulary] Failed to resolve field in code comparison: {e}")
+ _logger.warning("[CEL Vocabulary] Failed to resolve field in code comparison: %s", e)
return (
LeafDomain(model, [("id", "=", 0)]),
"code() comparison [FIELD RESOLUTION ERROR]",
@@ -369,7 +369,7 @@ def _handle_code_comparison(
try:
identifier = self._eval_literal(code_node.args[0], ctx) if code_node.args else None
except Exception as e:
- _logger.warning(f"[CEL Vocabulary] Failed to evaluate code identifier: {e}")
+ _logger.warning("[CEL Vocabulary] Failed to evaluate code identifier: %s", e)
identifier = None
if not identifier:
@@ -386,7 +386,7 @@ def _handle_code_comparison(
target_code = self.env["spp.vocabulary.code"].resolve_alias(identifier)
if not target_code:
- _logger.warning(f"[CEL Vocabulary] Could not resolve code '{identifier}'")
+ _logger.warning("[CEL Vocabulary] Could not resolve code '%s'", identifier)
return (
LeafDomain(field_model or model, [("id", "=", 0)]),
f"{field_name} {op} code('{identifier}') [NOT FOUND]",
diff --git a/spp_cel_vocabulary/services/cel_vocabulary_functions.py b/spp_cel_vocabulary/services/cel_vocabulary_functions.py
index e2ba663f..d824aced 100644
--- a/spp_cel_vocabulary/services/cel_vocabulary_functions.py
+++ b/spp_cel_vocabulary/services/cel_vocabulary_functions.py
@@ -79,7 +79,7 @@ def in_group(env, code_value, group_name):
uri_set = ConceptGroup._get_group_uris_cached(group_name)
if uri_set is None:
- _logger.warning(f"[CEL Vocabulary] Concept group '{group_name}' not found")
+ _logger.warning("[CEL Vocabulary] Concept group '%s' not found", group_name)
return False
if not uri_set:
diff --git a/spp_cel_vocabulary/static/description/index.html b/spp_cel_vocabulary/static/description/index.html
index 9d0d487f..6fd1f7f0 100644
--- a/spp_cel_vocabulary/static/description/index.html
+++ b/spp_cel_vocabulary/static/description/index.html
@@ -369,7 +369,7 @@
OpenSPP CEL Vocabulary Integration
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
!! source digest: sha256:187208d6a52a5dda86e67b1024f417772f64d60f710b523e275e30c8adbdeb0c
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! -->
-

+

Integrates vocabulary-aware functions into the CEL (Common Expression
Language) expression engine for eligibility rules. Extends the CEL
translator to resolve vocabulary codes by URI or alias and translate
@@ -490,11 +490,6 @@
Extension Points
Dependencies
spp_cel_domain, spp_vocabulary
-
-
Important
-
This is an alpha version, the data model and design can change at any time without warning.
-Only for development or testing purpose, do not use in production.
-
Table of contents
diff --git a/spp_cel_vocabulary/tests/__init__.py b/spp_cel_vocabulary/tests/__init__.py
index 2c397a16..7eb6f309 100644
--- a/spp_cel_vocabulary/tests/__init__.py
+++ b/spp_cel_vocabulary/tests/__init__.py
@@ -1,5 +1,6 @@
# Part of OpenSPP. See LICENSE file for full copyright and licensing details.
from . import test_cel_vocabulary
+from . import test_init_and_coverage
from . import test_vocabulary_cache
from . import test_vocabulary_in_exists
diff --git a/spp_cel_vocabulary/tests/test_init_and_coverage.py b/spp_cel_vocabulary/tests/test_init_and_coverage.py
new file mode 100644
index 00000000..eed5a2c7
--- /dev/null
+++ b/spp_cel_vocabulary/tests/test_init_and_coverage.py
@@ -0,0 +1,906 @@
+# Part of OpenSPP. See LICENSE file for full copyright and licensing details.
+"""
+Additional tests for spp_cel_vocabulary to improve coverage to 95%+.
+
+Covers gaps in:
+- _ensure_concept_groups() from __init__.py
+- CelVocabularyFunctions._ensure_registered()
+- head() function edge cases
+- VocabularyCache.clear()
+- Translator error paths
+- code() function edge cases
+"""
+
+from unittest.mock import patch
+
+from odoo.tests import TransactionCase, tagged
+
+from odoo.addons.spp_cel_domain.tests.common import CELTestDataMixin
+
+
+@tagged("post_install", "-at_install")
+class TestEnsureConceptGroups(TransactionCase):
+ """Test _ensure_concept_groups() from __init__.py."""
+
+ def test_all_10_standard_groups_exist(self):
+ """Test that all 10 standard concept groups exist after module install."""
+ ConceptGroup = self.env["spp.vocabulary.concept.group"]
+
+ expected_groups = [
+ "feminine_gender",
+ "masculine_gender",
+ "head_of_household",
+ "pregnant_eligible",
+ "climate_hazards",
+ "geophysical_hazards",
+ "children",
+ "adults",
+ "elderly",
+ "persons_with_disability",
+ ]
+
+ for group_name in expected_groups:
+ group = ConceptGroup.search([("name", "=", group_name)], limit=1)
+ self.assertTrue(
+ group,
+ f"Standard concept group '{group_name}' should exist after module install",
+ )
+
+ def test_ensure_concept_groups_idempotent(self):
+ """Test that calling _ensure_concept_groups again doesn't create duplicates."""
+ from .. import _ensure_concept_groups
+
+ ConceptGroup = self.env["spp.vocabulary.concept.group"]
+
+ # Count groups before
+ count_before = ConceptGroup.search_count([])
+
+ # Call again - should be idempotent
+ _ensure_concept_groups(self.env)
+
+ # Count groups after
+ count_after = ConceptGroup.search_count([])
+
+ self.assertEqual(
+ count_before,
+ count_after,
+ "Calling _ensure_concept_groups again should not create duplicate groups",
+ )
+
+ def test_ensure_concept_groups_creates_missing(self):
+ """Test that _ensure_concept_groups creates groups that are missing."""
+ from .. import _ensure_concept_groups
+
+ ConceptGroup = self.env["spp.vocabulary.concept.group"]
+
+ # Delete one group to simulate missing
+ elderly_group = ConceptGroup.search([("name", "=", "elderly")], limit=1)
+ if elderly_group:
+ elderly_group.unlink()
+
+ # Verify it's gone
+ self.assertFalse(ConceptGroup.search([("name", "=", "elderly")], limit=1))
+
+ # Re-run
+ _ensure_concept_groups(self.env)
+
+ # Should be recreated
+ self.assertTrue(
+ ConceptGroup.search([("name", "=", "elderly")], limit=1),
+ "elderly group should be recreated by _ensure_concept_groups",
+ )
+
+ def test_concept_groups_with_cel_function(self):
+ """Test that concept groups with cel_function attribute are created correctly."""
+ ConceptGroup = self.env["spp.vocabulary.concept.group"]
+
+ # Groups that should have cel_function set
+ groups_with_functions = {
+ "feminine_gender": "is_female",
+ "masculine_gender": "is_male",
+ "head_of_household": "is_head",
+ "pregnant_eligible": "is_pregnant",
+ }
+
+ for group_name, expected_func in groups_with_functions.items():
+ group = ConceptGroup.search([("name", "=", group_name)], limit=1)
+ self.assertTrue(group, f"Group '{group_name}' should exist")
+ self.assertEqual(
+ group.cel_function,
+ expected_func,
+ f"Group '{group_name}' should have cel_function='{expected_func}'",
+ )
+
+ def test_concept_groups_without_cel_function(self):
+ """Test that concept groups without cel_function have it unset."""
+ ConceptGroup = self.env["spp.vocabulary.concept.group"]
+
+ groups_without_functions = [
+ "climate_hazards",
+ "geophysical_hazards",
+ "children",
+ "adults",
+ "elderly",
+ "persons_with_disability",
+ ]
+
+ for group_name in groups_without_functions:
+ group = ConceptGroup.search([("name", "=", group_name)], limit=1)
+ self.assertTrue(group, f"Group '{group_name}' should exist")
+ self.assertFalse(
+ group.cel_function,
+ f"Group '{group_name}' should not have cel_function set",
+ )
+
+
+@tagged("post_install", "-at_install")
+class TestEnsureRegistered(TransactionCase):
+ """Test CelVocabularyFunctions._ensure_registered() lazy mechanism."""
+
+ def test_ensure_registered_registers_when_needed(self):
+ """Test that _ensure_registered registers functions when _needs_registration is True."""
+ vocab_funcs_model = self.env["spp.cel.vocabulary.functions"]
+
+ # Force _needs_registration to True
+ vocab_funcs_model.__class__._needs_registration = True
+
+ # Call _ensure_registered
+ vocab_funcs_model._ensure_registered()
+
+ # Should now be False
+ self.assertFalse(
+ vocab_funcs_model.__class__._needs_registration,
+ "_needs_registration should be False after _ensure_registered",
+ )
+
+ # Functions should be registered
+ registry = self.env["spp.cel.function.registry"]
+ self.assertTrue(registry.is_registered("code"))
+ self.assertTrue(registry.is_registered("in_group"))
+
+ def test_ensure_registered_skips_when_not_needed(self):
+ """Test that _ensure_registered skips if already registered."""
+ vocab_funcs_model = self.env["spp.cel.vocabulary.functions"]
+
+ # First ensure it's registered
+ vocab_funcs_model._ensure_registered()
+ self.assertFalse(vocab_funcs_model.__class__._needs_registration)
+
+ # Calling again should not re-register (flag already False)
+ with patch.object(type(vocab_funcs_model), "register_vocabulary_functions") as mock_register:
+ vocab_funcs_model._ensure_registered()
+ mock_register.assert_not_called()
+
+ def test_register_hook_sets_flag(self):
+ """Test that _register_hook sets _needs_registration to True."""
+ vocab_funcs_model = self.env["spp.cel.vocabulary.functions"]
+
+ # Clear the flag
+ vocab_funcs_model.__class__._needs_registration = False
+
+ # Call _register_hook
+ vocab_funcs_model._register_hook()
+
+ # Flag should be set
+ self.assertTrue(
+ vocab_funcs_model.__class__._needs_registration,
+ "_register_hook should set _needs_registration to True",
+ )
+
+
+@tagged("post_install", "-at_install")
+class TestHeadFunction(TransactionCase, CELTestDataMixin):
+ """Test head() function edge cases."""
+
+ @classmethod
+ def setUpClass(cls):
+ super().setUpClass()
+ cls._test_id = cls._get_unique_test_id()
+
+ # Create the group membership type vocabulary if it doesn't exist
+ Vocabulary = cls.env["spp.vocabulary"]
+ existing_vocab = Vocabulary.search(
+ [("namespace_uri", "=", "urn:openspp:vocab:group-membership-type")],
+ limit=1,
+ )
+ if existing_vocab:
+ cls.membership_vocab = existing_vocab
+ else:
+ cls.membership_vocab = Vocabulary.create(
+ {
+ "name": "Group Membership Type",
+ "namespace_uri": "urn:openspp:vocab:group-membership-type",
+ }
+ )
+
+ # Get or create the 'head' code
+ VocabCode = cls.env["spp.vocabulary.code"]
+ cls.head_code = VocabCode.sudo().get_code("urn:openspp:vocab:group-membership-type", "head")
+ if not cls.head_code:
+ cls.head_code = VocabCode.create(
+ {
+ "vocabulary_id": cls.membership_vocab.id,
+ "code": "head",
+ "display": "Head",
+ }
+ )
+
+ # Create a group
+ cls.group_partner = cls._create_test_partner(
+ name=f"Test Group {cls._test_id}",
+ is_registrant=True,
+ is_group=True,
+ )
+
+ # Create an individual
+ cls.individual_partner = cls._create_test_partner(
+ name=f"Test Individual {cls._test_id}",
+ is_registrant=True,
+ is_group=False,
+ )
+
+ # Create another individual (non-head)
+ cls.individual_nonhead = cls._create_test_partner(
+ name=f"Test NonHead {cls._test_id}",
+ is_registrant=True,
+ is_group=False,
+ )
+
+ # Create membership with head type
+ cls.membership_head = cls.env["spp.group.membership"].create(
+ {
+ "group": cls.group_partner.id,
+ "individual": cls.individual_partner.id,
+ "membership_type_ids": [(6, 0, [cls.head_code.id])],
+ }
+ )
+
+ # Create membership without head type
+ cls.membership_nonhead = cls.env["spp.group.membership"].create(
+ {
+ "group": cls.group_partner.id,
+ "individual": cls.individual_nonhead.id,
+ }
+ )
+
+ def test_head_code_not_found(self):
+ """Test head() when head vocabulary code is missing."""
+ from ..services.cel_vocabulary_functions import head
+
+ # Patch get_code to return empty recordset
+ with patch.object(
+ type(self.env["spp.vocabulary.code"]),
+ "get_code",
+ return_value=self.env["spp.vocabulary.code"].browse(),
+ ):
+ result = head(self.env, self.individual_partner)
+ self.assertFalse(result)
+
+ def test_head_with_membership_parameter(self):
+ """Test head() with _membership parameter."""
+ from ..services.cel_vocabulary_functions import head
+
+ # Pass membership directly
+ result = head(self.env, self.individual_partner, _membership=self.membership_head)
+ self.assertTrue(result)
+
+ # Non-head membership
+ result = head(self.env, self.individual_nonhead, _membership=self.membership_nonhead)
+ self.assertFalse(result)
+
+ def test_head_with_group_parameter(self):
+ """Test head() with _group parameter and active membership."""
+ from ..services.cel_vocabulary_functions import head
+
+ # Should find head membership through group context
+ result = head(self.env, self.individual_partner, _group=self.group_partner)
+ self.assertTrue(result)
+
+ # Non-head through group context
+ result = head(self.env, self.individual_nonhead, _group=self.group_partner)
+ self.assertFalse(result)
+
+ def test_head_fallback_individual_membership_ids(self):
+ """Test head() fallback via individual_membership_ids."""
+ from ..services.cel_vocabulary_functions import head
+
+ # No _membership or _group passed, should fall back to individual_membership_ids
+ result = head(self.env, self.individual_partner)
+ self.assertTrue(result)
+
+ # Non-head individual
+ result = head(self.env, self.individual_nonhead)
+ self.assertFalse(result)
+
+ def test_head_with_empty_member(self):
+ """Test head() with empty/falsy member."""
+ from ..services.cel_vocabulary_functions import head
+
+ result = head(self.env, None)
+ self.assertFalse(result)
+
+ result = head(self.env, self.env["res.partner"].browse())
+ self.assertFalse(result)
+
+ def test_head_with_group_no_membership(self):
+ """Test head() with _group parameter but member has no membership in that group."""
+ from ..services.cel_vocabulary_functions import head
+
+ # Create a different group
+ other_group = self._create_test_partner(
+ name=f"Other Group {self._test_id}",
+ is_registrant=True,
+ is_group=True,
+ )
+
+ # Create a new individual that is NOT a member of any group
+ lone_individual = self._create_test_partner(
+ name=f"Lone Individual {self._test_id}",
+ is_registrant=True,
+ is_group=False,
+ )
+
+ # lone_individual is not a member of other_group
+ result = head(self.env, lone_individual, _group=other_group)
+ self.assertFalse(result)
+
+
+@tagged("post_install", "-at_install")
+class TestVocabularyCacheClear(TransactionCase, CELTestDataMixin):
+ """Test VocabularyCache.clear() method."""
+
+ @classmethod
+ def setUpClass(cls):
+ super().setUpClass()
+ cls._test_id = cls._get_unique_test_id()
+
+ cls.gender_vocab = cls._create_test_vocabulary(
+ name=f"Gender {cls._test_id}",
+ namespace_uri=f"urn:test:vocab:gender:{cls._test_id}",
+ )
+
+ cls.code_female = cls._create_test_vocabulary_code(
+ vocabulary=cls.gender_vocab,
+ code=f"F_{cls._test_id}",
+ display="Female",
+ )
+
+ ConceptGroup = cls.env["spp.vocabulary.concept.group"]
+ existing_feminine = ConceptGroup.search([("name", "=", "feminine_gender")], limit=1)
+ if existing_feminine:
+ existing_feminine.write({"code_ids": [(4, cls.code_female.id)]})
+ else:
+ cls._create_test_concept_group(
+ name="feminine_gender",
+ display_name="Feminine Gender",
+ codes=[cls.code_female],
+ )
+
+ def test_clear_resets_cache(self):
+ """Test that clear() resets the cache."""
+ from ..services.vocabulary_cache import VocabularyCache
+
+ cache = VocabularyCache(self.env)
+
+ # Populate cache
+ cache.check_membership(self.code_female, "feminine_gender")
+ cache.check_membership(self.code_female, "nonexistent_group")
+
+ # Verify cache has data
+ self.assertGreater(len(cache._group_uri_cache), 0)
+
+ # Clear
+ cache.clear()
+
+ # Verify cache is empty
+ self.assertEqual(len(cache._group_uri_cache), 0)
+
+ def test_clear_resets_stats(self):
+ """Test that clear() resets stats."""
+ from ..services.vocabulary_cache import VocabularyCache
+
+ cache = VocabularyCache(self.env)
+
+ # Populate cache
+ cache.check_membership(self.code_female, "feminine_gender")
+ cache.check_membership(self.code_female, "nonexistent_group")
+
+ stats_before = cache.stats
+ self.assertGreater(stats_before["cached_groups"], 0)
+
+ # Clear
+ cache.clear()
+
+ stats_after = cache.stats
+ self.assertEqual(stats_after["cached_groups"], 0)
+ self.assertEqual(stats_after["missing_groups"], 0)
+
+ def test_cache_works_after_clear(self):
+ """Test that cache works correctly after being cleared."""
+ from ..services.vocabulary_cache import VocabularyCache
+
+ cache = VocabularyCache(self.env)
+
+ # Populate, clear, then re-populate
+ cache.check_membership(self.code_female, "feminine_gender")
+ cache.clear()
+
+ # Should work fine after clear
+ result = cache.check_membership(self.code_female, "feminine_gender")
+ self.assertTrue(result)
+ self.assertIn("feminine_gender", cache._group_uri_cache)
+
+
+@tagged("post_install", "-at_install")
+class TestTranslatorErrorPaths(TransactionCase, CELTestDataMixin):
+ """Test translator error paths in cel_vocabulary_translator.py."""
+
+ @classmethod
+ def setUpClass(cls):
+ super().setUpClass()
+ cls._test_id = cls._get_unique_test_id()
+
+ # Create test vocabulary and codes
+ cls.gender_vocab = cls._create_test_vocabulary(
+ name=f"Gender {cls._test_id}",
+ namespace_uri=f"urn:test:vocab:gender:{cls._test_id}",
+ )
+
+ cls.code_female = cls._create_test_vocabulary_code(
+ vocabulary=cls.gender_vocab,
+ code=f"F_{cls._test_id}",
+ display="Female",
+ )
+
+ # Create an empty concept group (no codes)
+ cls.empty_group = cls._create_test_concept_group(
+ name=f"empty_group_{cls._test_id}",
+ display_name="Empty Group",
+ )
+
+ cls.translator = cls.env["spp.cel.translator"]
+
+ def test_semantic_helper_unknown_name(self):
+ """Test _handle_semantic_helper() with unknown helper name."""
+ from odoo.addons.spp_cel_domain.services import cel_parser as P
+
+ # Create a fake Call node for an unknown semantic helper
+ func_ident = P.Ident("unknown_helper")
+ field_arg = P.Attr(P.Ident("r"), "gender_id")
+ call_node = P.Call(func_ident, [field_arg])
+
+ plan, explain = self.translator._handle_semantic_helper("res.partner", call_node, {}, {}, "unknown_helper")
+
+ self.assertIn("UNKNOWN HELPER", explain)
+
+ def test_in_group_empty_uri_list(self):
+ """Test _handle_in_group() when group has empty URI list."""
+ expr = f'in_group(r.gender_id, "{self.empty_group.name}")'
+ cfg = {"symbols": {"r": {"model": "res.partner"}}}
+
+ plan, explain = self.translator.translate("res.partner", expr, cfg)
+
+ self.assertIn("EMPTY GROUP", explain.upper())
+
+ def test_code_comparison_unsupported_gt_operator(self):
+ """Test _handle_code_comparison() with > operator falls back to parent."""
+ # Using r.gender_id > code("Female") should fall back since > is not supported
+ expr = f'r.gender_id > code("F_{self._test_id}")'
+ cfg = {"symbols": {"r": {"model": "res.partner"}}}
+
+ # This should not raise, it should fall back to parent translator
+ try:
+ plan, explain = self.translator.translate("res.partner", expr, cfg)
+ # If it succeeds, that's fine (parent handled it)
+ except Exception: # pylint: disable=except-pass
+ # Parent translator may also reject this, which is fine
+ pass # nosemgrep
+
+ def test_code_comparison_unsupported_lt_operator(self):
+ """Test _handle_code_comparison() with < operator falls back to parent."""
+ expr = f'r.gender_id < code("F_{self._test_id}")'
+ cfg = {"symbols": {"r": {"model": "res.partner"}}}
+
+ try:
+ plan, explain = self.translator.translate("res.partner", expr, cfg)
+ except Exception: # pylint: disable=except-pass
+ # Parent translator may reject this, which is fine
+ pass # nosemgrep
+
+ def test_in_group_empty_group_name(self):
+ """Test in_group() with empty group name."""
+ expr = 'in_group(r.gender_id, "")'
+ cfg = {"symbols": {"r": {"model": "res.partner"}}}
+
+ plan, explain = self.translator.translate("res.partner", expr, cfg)
+
+ self.assertIn("INVALID", explain.upper())
+
+ def test_semantic_helper_group_not_found(self):
+ """Test semantic helper when its concept group doesn't exist."""
+ from odoo.addons.spp_cel_domain.services import cel_parser as P
+
+ # Directly call _handle_semantic_helper with a helper whose group we'll make missing
+ func_ident = P.Ident("is_pregnant")
+ field_arg = P.Attr(P.Ident("r"), "gender_id")
+ call_node = P.Call(func_ident, [field_arg])
+
+ cfg = {"symbols": {"r": {"model": "res.partner"}}}
+
+ # Delete pregnant_eligible group temporarily
+ ConceptGroup = self.env["spp.vocabulary.concept.group"]
+ pregnant_group = ConceptGroup.search([("name", "=", "pregnant_eligible")], limit=1)
+ if pregnant_group:
+ pregnant_group.unlink()
+
+ plan, explain = self.translator._handle_semantic_helper("res.partner", call_node, cfg, {}, "is_pregnant")
+
+ self.assertIn("NOT FOUND", explain.upper())
+
+ def test_code_eq_nonexistent_code_via_translator(self):
+ """Test code_eq() translation with nonexistent code identifier."""
+ expr = 'code_eq(r.gender_id, "nonexistent_code_xyz_12345")'
+ cfg = {"symbols": {"r": {"model": "res.partner"}}}
+
+ plan, explain = self.translator.translate("res.partner", expr, cfg)
+
+ self.assertIn("NOT FOUND", explain.upper())
+
+
+@tagged("post_install", "-at_install")
+class TestCodeFunctionEdgeCases(TransactionCase):
+ """Test code() function with various falsy non-string values."""
+
+ def test_code_with_zero(self):
+ """Test code() with 0."""
+ from ..services.cel_vocabulary_functions import code
+
+ result = code(self.env, 0)
+ self.assertFalse(result)
+
+ def test_code_with_false(self):
+ """Test code() with False."""
+ from ..services.cel_vocabulary_functions import code
+
+ result = code(self.env, False)
+ self.assertFalse(result)
+
+ def test_code_with_empty_list(self):
+ """Test code() with empty list."""
+ from ..services.cel_vocabulary_functions import code
+
+ result = code(self.env, [])
+ self.assertFalse(result)
+
+ def test_code_with_empty_dict(self):
+ """Test code() with empty dict."""
+ from ..services.cel_vocabulary_functions import code
+
+ result = code(self.env, {})
+ self.assertFalse(result)
+
+ def test_code_with_none(self):
+ """Test code() with None."""
+ from ..services.cel_vocabulary_functions import code
+
+ result = code(self.env, None)
+ self.assertFalse(result)
+
+
+@tagged("post_install", "-at_install")
+class TestPostInitHook(TransactionCase):
+ """Test post_init_hook from __init__.py."""
+
+ def test_post_init_hook(self):
+ """Test that post_init_hook runs without errors."""
+ from .. import post_init_hook
+
+ # Should run without raising
+ post_init_hook(self.env)
+
+ # Verify concept groups exist
+ ConceptGroup = self.env["spp.vocabulary.concept.group"]
+ self.assertTrue(ConceptGroup.search([("name", "=", "feminine_gender")], limit=1))
+
+ # Verify functions are registered
+ registry = self.env["spp.cel.function.registry"]
+ self.assertTrue(registry.is_registered("code"))
+
+ def test_post_init_hook_idempotent(self):
+ """Test that post_init_hook can be called multiple times safely."""
+ from .. import post_init_hook
+
+ ConceptGroup = self.env["spp.vocabulary.concept.group"]
+ count_before = ConceptGroup.search_count([])
+
+ # Call twice
+ post_init_hook(self.env)
+ post_init_hook(self.env)
+
+ count_after = ConceptGroup.search_count([])
+ self.assertEqual(count_before, count_after)
+
+
+@tagged("post_install", "-at_install")
+class TestGetFunctionHelp(TransactionCase):
+ """Test get_function_help edge cases."""
+
+ def test_get_function_help_nonexistent(self):
+ """Test get_function_help with nonexistent function name."""
+ vocab_funcs = self.env["spp.cel.vocabulary.functions"]
+ help_text = vocab_funcs.get_function_help("nonexistent_function_xyz")
+ self.assertEqual(help_text, "")
+
+ def test_get_function_help_head(self):
+ """Test get_function_help for head function."""
+ vocab_funcs = self.env["spp.cel.vocabulary.functions"]
+ help_text = vocab_funcs.get_function_help("head")
+ self.assertIn("head", help_text.lower())
+
+ def test_get_function_help_in_group(self):
+ """Test get_function_help for in_group function."""
+ vocab_funcs = self.env["spp.cel.vocabulary.functions"]
+ help_text = vocab_funcs.get_function_help("in_group")
+ self.assertIn("group", help_text.lower())
+
+
+@tagged("post_install", "-at_install")
+class TestCreateEnvMarker(TransactionCase):
+ """Test _create_env_marker method."""
+
+ def test_create_env_marker_sets_flag(self):
+ """Test that _create_env_marker sets _cel_needs_env on the function."""
+ vocab_funcs = self.env["spp.cel.vocabulary.functions"]
+
+ def dummy_func(env, x):
+ return x
+
+ marked = vocab_funcs._create_env_marker(dummy_func)
+
+ self.assertTrue(getattr(marked, "_cel_needs_env", False))
+ self.assertIs(marked, dummy_func) # Should return same function object
+
+
+@tagged("post_install", "-at_install")
+class TestTranslatorChangedLines(TransactionCase, CELTestDataMixin):
+ """Test translator changed lines for codecov patch coverage.
+
+ Targets the specific _logger.warning/debug lines that were changed
+ from f-strings to lazy % formatting.
+ """
+
+ @classmethod
+ def setUpClass(cls):
+ super().setUpClass()
+ cls._test_id = cls._get_unique_test_id()
+
+ cls.gender_vocab = cls._create_test_vocabulary(
+ name=f"Gender TL {cls._test_id}",
+ namespace_uri=f"urn:test:vocab:gender-tl:{cls._test_id}",
+ )
+
+ cls.code_female = cls._create_test_vocabulary_code(
+ vocabulary=cls.gender_vocab,
+ code=f"F_TL_{cls._test_id}",
+ display="Female",
+ )
+
+ # Create concept group with codes for successful paths
+ ConceptGroup = cls.env["spp.vocabulary.concept.group"]
+ cls.test_group = ConceptGroup.search([("name", "=", "feminine_gender")], limit=1)
+ if cls.test_group:
+ cls.test_group.write({"code_ids": [(4, cls.code_female.id)]})
+ else:
+ cls.test_group = cls._create_test_concept_group(
+ name="feminine_gender",
+ display_name="Feminine Gender",
+ codes=[cls.code_female],
+ )
+
+ cls.translator = cls.env["spp.cel.translator"]
+
+ def test_successful_semantic_helper_debug_log(self):
+ """Test successful is_female() call covers debug log line in _to_plan."""
+ # This exercises line 61: _logger.debug("...%s...%s", func_name, model)
+ expr = "is_female(r.gender_id)"
+ cfg = {"symbols": {"r": {"model": "res.partner"}}}
+
+ plan, explain = self.translator.translate("res.partner", expr, cfg)
+ # Should succeed (covers the debug log line in _to_plan)
+ self.assertIn("feminine_gender", explain.lower())
+
+ def test_in_group_field_resolution_error(self):
+ """Test in_group() field resolution error via patch."""
+ from odoo.addons.spp_cel_domain.services import cel_parser as P
+
+ func_ident = P.Ident("in_group")
+ field_arg = P.Attr(P.Ident("r"), "gender_id")
+ group_arg = P.Literal("feminine_gender")
+ call_node = P.Call(func_ident, [field_arg, group_arg])
+ cfg = {"symbols": {"r": {"model": "res.partner"}}}
+
+ with patch.object(
+ type(self.translator),
+ "_resolve_field",
+ side_effect=ValueError("test field error"),
+ ):
+ plan, explain = self.translator._handle_in_group("res.partner", call_node, cfg, {})
+ self.assertIn("FIELD RESOLUTION ERROR", explain)
+
+ def test_in_group_group_name_eval_error(self):
+ """Test in_group() group name eval error via patch."""
+ from odoo.addons.spp_cel_domain.services import cel_parser as P
+
+ func_ident = P.Ident("in_group")
+ field_arg = P.Attr(P.Ident("r"), "gender_id")
+ group_arg = P.Literal("feminine_gender")
+ call_node = P.Call(func_ident, [field_arg, group_arg])
+ cfg = {"symbols": {"r": {"model": "res.partner"}}}
+
+ with patch.object(
+ type(self.translator),
+ "_eval_literal",
+ side_effect=ValueError("test eval error"),
+ ):
+ plan, explain = self.translator._handle_in_group("res.partner", call_node, cfg, {})
+ self.assertIn("EVAL ERROR", explain)
+
+ def test_in_group_group_not_found(self):
+ """Test in_group() with nonexistent group covers warning log."""
+ expr = 'in_group(r.gender_id, "nonexistent_group_xyz_99")'
+ cfg = {"symbols": {"r": {"model": "res.partner"}}}
+
+ plan, explain = self.translator.translate("res.partner", expr, cfg)
+ self.assertIn("GROUP NOT FOUND", explain)
+
+ def test_semantic_helper_field_resolution_error(self):
+ """Test is_female() field resolution error via patch."""
+ from odoo.addons.spp_cel_domain.services import cel_parser as P
+
+ func_ident = P.Ident("is_female")
+ field_arg = P.Attr(P.Ident("r"), "gender_id")
+ call_node = P.Call(func_ident, [field_arg])
+ cfg = {"symbols": {"r": {"model": "res.partner"}}}
+
+ with patch.object(
+ type(self.translator),
+ "_resolve_field",
+ side_effect=ValueError("test field error"),
+ ):
+ plan, explain = self.translator._handle_semantic_helper("res.partner", call_node, cfg, {}, "is_female")
+ self.assertIn("FIELD RESOLUTION ERROR", explain)
+
+ def test_semantic_helper_empty_group(self):
+ """Test semantic helper when concept group has no codes covers warning log."""
+ from odoo.addons.spp_cel_domain.services import cel_parser as P
+
+ # Create empty concept group
+ ConceptGroup = self.env["spp.vocabulary.concept.group"]
+ empty_group = ConceptGroup.create(
+ {
+ "name": f"empty_sem_{self._test_id}",
+ "label": "Empty Semantic",
+ }
+ )
+
+ func_ident = P.Ident("is_female")
+ field_arg = P.Attr(P.Ident("r"), "gender_id")
+ call_node = P.Call(func_ident, [field_arg])
+ cfg = {"symbols": {"r": {"model": "res.partner"}}}
+
+ # Temporarily change SEMANTIC_HELPERS to point to our empty group
+ original = self.translator.SEMANTIC_HELPERS.copy()
+ try:
+ self.translator.SEMANTIC_HELPERS["is_female"] = empty_group.name
+ plan, explain = self.translator._handle_semantic_helper("res.partner", call_node, cfg, {}, "is_female")
+ self.assertIn("GROUP EMPTY", explain)
+ finally:
+ self.translator.SEMANTIC_HELPERS.update(original)
+
+ empty_group.unlink()
+
+ def test_code_eq_field_resolution_error(self):
+ """Test code_eq() field resolution error via patch."""
+ from odoo.addons.spp_cel_domain.services import cel_parser as P
+
+ func_ident = P.Ident("code_eq")
+ field_arg = P.Attr(P.Ident("r"), "gender_id")
+ identifier = P.Literal("some_code")
+ call_node = P.Call(func_ident, [field_arg, identifier])
+ cfg = {"symbols": {"r": {"model": "res.partner"}}}
+
+ with patch.object(
+ type(self.translator),
+ "_resolve_field",
+ side_effect=ValueError("test field error"),
+ ):
+ plan, explain = self.translator._handle_code_eq("res.partner", call_node, cfg, {})
+ self.assertIn("FIELD RESOLUTION ERROR", explain)
+
+ def test_code_eq_identifier_eval_error(self):
+ """Test code_eq() identifier eval error via patch."""
+ from odoo.addons.spp_cel_domain.services import cel_parser as P
+
+ func_ident = P.Ident("code_eq")
+ field_arg = P.Attr(P.Ident("r"), "gender_id")
+ identifier = P.Literal("some_code")
+ call_node = P.Call(func_ident, [field_arg, identifier])
+ cfg = {"symbols": {"r": {"model": "res.partner"}}}
+
+ # _handle_code_eq calls _eval_literal for the identifier (args[1]).
+ # Patch it to always raise - field resolution uses _resolve_field, not _eval_literal.
+ with patch.object(
+ type(self.translator),
+ "_eval_literal",
+ side_effect=ValueError("test eval error"),
+ ):
+ plan, explain = self.translator._handle_code_eq("res.partner", call_node, cfg, {})
+ self.assertIn("EVAL ERROR", explain)
+
+ def test_code_comparison_field_resolution_error(self):
+ """Test code() comparison field resolution error via patch."""
+ from odoo.addons.spp_cel_domain.services import cel_parser as P
+
+ field_arg = P.Attr(P.Ident("r"), "gender_id")
+ code_call = P.Call(P.Ident("code"), [P.Literal("female")])
+ compare_node = P.Compare("EQ", field_arg, code_call)
+ cfg = {"symbols": {"r": {"model": "res.partner"}}}
+
+ with patch.object(
+ type(self.translator),
+ "_resolve_field",
+ side_effect=ValueError("test field error"),
+ ):
+ plan, explain = self.translator._handle_code_comparison(
+ "res.partner", compare_node, cfg, {}, left_is_code=False
+ )
+ self.assertIn("FIELD RESOLUTION ERROR", explain)
+
+ def test_code_comparison_eval_error(self):
+ """Test code() comparison eval error via patch."""
+ from odoo.addons.spp_cel_domain.services import cel_parser as P
+
+ field_arg = P.Attr(P.Ident("r"), "gender_id")
+ code_call = P.Call(P.Ident("code"), [P.Literal("female")])
+ compare_node = P.Compare("EQ", field_arg, code_call)
+ cfg = {"symbols": {"r": {"model": "res.partner"}}}
+
+ with patch.object(
+ type(self.translator),
+ "_eval_literal",
+ side_effect=ValueError("test eval error"),
+ ):
+ plan, explain = self.translator._handle_code_comparison(
+ "res.partner", compare_node, cfg, {}, left_is_code=False
+ )
+ # identifier becomes None after eval error, triggering empty check
+ self.assertIn("code(EMPTY)", explain)
+
+ def test_code_comparison_code_not_found(self):
+ """Test code() comparison with nonexistent code covers warning log."""
+ from odoo.addons.spp_cel_domain.services import cel_parser as P
+
+ field_arg = P.Attr(P.Ident("r"), "gender_id")
+ code_call = P.Call(P.Ident("code"), [P.Literal("nonexistent_xyz_99")])
+ compare_node = P.Compare("EQ", field_arg, code_call)
+ cfg = {"symbols": {"r": {"model": "res.partner"}}}
+
+ plan, explain = self.translator._handle_code_comparison(
+ "res.partner", compare_node, cfg, {}, left_is_code=False
+ )
+ self.assertIn("NOT FOUND", explain)
+
+ def test_code_comparison_empty_identifier(self):
+ """Test code() comparison with empty args covers warning log."""
+ from odoo.addons.spp_cel_domain.services import cel_parser as P
+
+ field_arg = P.Attr(P.Ident("r"), "gender_id")
+ # code() with NO arguments
+ code_call = P.Call(P.Ident("code"), [])
+ compare_node = P.Compare("EQ", field_arg, code_call)
+ cfg = {"symbols": {"r": {"model": "res.partner"}}}
+
+ plan, explain = self.translator._handle_code_comparison(
+ "res.partner", compare_node, cfg, {}, left_is_code=False
+ )
+ self.assertIn("code(EMPTY)", explain)
diff --git a/spp_gis_indicators/README.rst b/spp_gis_indicators/README.rst
index 308e7008..e6a9006d 100644
--- a/spp_gis_indicators/README.rst
+++ b/spp_gis_indicators/README.rst
@@ -10,9 +10,9 @@ OpenSPP GIS Indicators
!! source digest: sha256:3e0d3c935187c1430aaa36bc854f98fde0d3ffa19bf8bafb7b342ebb706da310
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
-.. |badge1| image:: https://img.shields.io/badge/maturity-Alpha-red.png
+.. |badge1| image:: https://img.shields.io/badge/maturity-Beta-yellow.png
:target: https://odoo-community.org/page/development-status
- :alt: Alpha
+ :alt: Beta
.. |badge2| image:: https://img.shields.io/badge/license-LGPL--3-blue.png
:target: http://www.gnu.org/licenses/lgpl-3.0-standalone.html
:alt: License: LGPL-3
@@ -46,19 +46,12 @@ Key Capabilities
Key Models
~~~~~~~~~~
-+-----------------------------+----------------------------------------+
-| Model | Description |
-+=============================+========================================+
-| ``spp.gis.indicator.layer`` | Configuration linking a CEL variable |
-| | to color scale and classification |
-| | settings |
-+-----------------------------+----------------------------------------+
-| ``spp.gis.color.scale`` | Color scheme definition with JSON |
-| | array of hex colors |
-+-----------------------------+----------------------------------------+
-| ``spp.gis.data.layer`` | Extended with ``choropleth`` geo |
-| | representation option |
-+-----------------------------+----------------------------------------+
+- **spp.gis.indicator.layer** -- Configuration linking a CEL variable to
+ color scale and classification settings
+- **spp.gis.color.scale** -- Color scheme definition with JSON array of
+ hex colors
+- **spp.gis.data.layer** -- Extended with choropleth geo representation
+ option
Configuration
~~~~~~~~~~~~~
@@ -83,13 +76,9 @@ UI Location
Security
~~~~~~~~
-================================== =============================
-Group Access
-================================== =============================
-``spp_security.group_spp_user`` Read
-``spp_security.group_spp_manager`` Read/write/create (no delete)
-``spp_security.group_spp_admin`` Full CRUD
-================================== =============================
+- **spp_security.group_spp_user** -- Read
+- **spp_security.group_spp_manager** -- Read/write/create (no delete)
+- **spp_security.group_spp_admin** -- Full CRUD
Extension Points
~~~~~~~~~~~~~~~~
@@ -105,11 +94,7 @@ Extension Points
Dependencies
~~~~~~~~~~~~
-``spp_gis``, ``spp_hxl_area``
-
-.. IMPORTANT::
- This is an alpha version, the data model and design can change at any time without warning.
- Only for development or testing purpose, do not use in production.
+``spp_gis``, ``spp_hxl_area``, ``spp_registry``, ``spp_security``
**Table of contents**
diff --git a/spp_gis_indicators/__manifest__.py b/spp_gis_indicators/__manifest__.py
index de00df2b..d9e41c6c 100644
--- a/spp_gis_indicators/__manifest__.py
+++ b/spp_gis_indicators/__manifest__.py
@@ -8,11 +8,12 @@
"author": "OpenSPP.org",
"website": "https://github.com/OpenSPP/OpenSPP2",
"license": "LGPL-3",
- "development_status": "Alpha",
+ "development_status": "Beta",
"depends": [
"spp_gis",
"spp_hxl_area",
"spp_registry",
+ "spp_security",
],
"data": [
"security/ir.model.access.csv",
diff --git a/spp_gis_indicators/models/data_layer.py b/spp_gis_indicators/models/data_layer.py
index c0d2525f..4e2ce4f3 100644
--- a/spp_gis_indicators/models/data_layer.py
+++ b/spp_gis_indicators/models/data_layer.py
@@ -1,26 +1,61 @@
# Part of OpenSPP. See LICENSE file for full copyright and licensing details.
+import json
import logging
-from odoo import fields, models
+from odoo import _, api, models
+from odoo.exceptions import ValidationError
+from odoo.fields import Many2one
_logger = logging.getLogger(__name__)
class GisDataLayerIndicator(models.Model):
- """Extend GIS Data Layer to support choropleth visualization."""
+ """Extend GIS Data Layer to support indicator-based choropleth visualization."""
_inherit = "spp.gis.data.layer"
- geo_repr = fields.Selection(
- selection_add=[
- ("choropleth", "Choropleth (Data-driven colors)"),
- ],
- ondelete={"choropleth": "set default"},
- )
-
- indicator_layer_id = fields.Many2one(
+ indicator_layer_id = Many2one(
"spp.gis.indicator.layer",
string="Indicator Configuration",
help="Configure which indicator to visualize as choropleth",
)
+
+ @api.constrains("geo_repr", "choropleth_field_id", "indicator_layer_id")
+ def _check_choropleth_config(self):
+ """Validate choropleth layers have required configuration.
+
+ Overrides base constraint to accept EITHER choropleth_field_id
+ OR indicator_layer_id for choropleth layers.
+ """
+ for rec in self:
+ if rec.geo_repr == "choropleth" and not rec.choropleth_field_id and not rec.indicator_layer_id:
+ raise ValidationError(_("Choropleth layers require a Value Field or Indicator Configuration."))
+
+ def _get_choropleth_config(self):
+ """Return choropleth configuration dictionary for frontend.
+
+ Overrides base to return indicator-based config when indicator_layer_id
+ is set, falling back to the base field-based config otherwise.
+ """
+ self.ensure_one()
+ if self.geo_repr != "choropleth":
+ return None
+
+ if self.indicator_layer_id:
+ breaks = json.loads(self.indicator_layer_id.break_values or "[]")
+ colors = (
+ self.indicator_layer_id.color_scale_id.get_colors() if self.indicator_layer_id.color_scale_id else []
+ )
+ return {
+ "type": "indicator",
+ "color_ramp": colors,
+ "break_values": breaks,
+ "classification": self.indicator_layer_id.classification_method,
+ "class_count": self.indicator_layer_id.num_classes,
+ "show_legend": True,
+ "legend_title": self.indicator_layer_id.name,
+ "legend_html": self.indicator_layer_id.legend_html,
+ }
+
+ return super()._get_choropleth_config()
diff --git a/spp_gis_indicators/models/indicator_layer.py b/spp_gis_indicators/models/indicator_layer.py
index 7f850885..a40a1e40 100644
--- a/spp_gis_indicators/models/indicator_layer.py
+++ b/spp_gis_indicators/models/indicator_layer.py
@@ -387,6 +387,8 @@ def get_feature_colors(self, area_ids):
num_colors = len(colors)
for ind in indicators:
+ if not ind.value and ind.value != 0:
+ continue
# Determine which class this value falls into
class_idx = 0
for i, break_val in enumerate(breaks):
diff --git a/spp_gis_indicators/readme/DESCRIPTION.md b/spp_gis_indicators/readme/DESCRIPTION.md
index 649299cc..87c6e3fc 100644
--- a/spp_gis_indicators/readme/DESCRIPTION.md
+++ b/spp_gis_indicators/readme/DESCRIPTION.md
@@ -12,11 +12,9 @@ Choropleth visualization for area-level indicators on GIS maps. Maps indicator v
### Key Models
-| Model | Description |
-| -------------------------- | ------------------------------------------------------------------------------- |
-| `spp.gis.indicator.layer` | Configuration linking a CEL variable to color scale and classification settings |
-| `spp.gis.color.scale` | Color scheme definition with JSON array of hex colors |
-| `spp.gis.data.layer` | Extended with `choropleth` geo representation option |
+- **spp.gis.indicator.layer** -- Configuration linking a CEL variable to color scale and classification settings
+- **spp.gis.color.scale** -- Color scheme definition with JSON array of hex colors
+- **spp.gis.data.layer** -- Extended with choropleth geo representation option
### Configuration
@@ -35,11 +33,9 @@ After installing:
### Security
-| Group | Access |
-| -------------------------------- | ----------------------------- |
-| `spp_security.group_spp_user` | Read |
-| `spp_security.group_spp_manager` | Read/write/create (no delete) |
-| `spp_security.group_spp_admin` | Full CRUD |
+- **spp_security.group_spp_user** -- Read
+- **spp_security.group_spp_manager** -- Read/write/create (no delete)
+- **spp_security.group_spp_admin** -- Full CRUD
### Extension Points
@@ -49,4 +45,4 @@ After installing:
### Dependencies
-`spp_gis`, `spp_hxl_area`
+`spp_gis`, `spp_hxl_area`, `spp_registry`, `spp_security`
diff --git a/spp_gis_indicators/static/description/index.html b/spp_gis_indicators/static/description/index.html
index 6bbb451c..ff1a9f99 100644
--- a/spp_gis_indicators/static/description/index.html
+++ b/spp_gis_indicators/static/description/index.html
@@ -369,7 +369,7 @@ OpenSPP GIS Indicators
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
!! source digest: sha256:3e0d3c935187c1430aaa36bc854f98fde0d3ffa19bf8bafb7b342ebb706da310
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! -->
-

+

Choropleth visualization for area-level indicators on GIS maps. Maps
indicator values from CEL variables to colors using configurable
classification methods and ColorBrewer-based color scales. Supports
@@ -393,32 +393,14 @@
Key Capabilities
Key Models
-
-
-
-
-
-
-| Model |
-Description |
-
-
-
-| spp.gis.indicator.layer |
-Configuration linking a CEL variable
-to color scale and classification
-settings |
-
-| spp.gis.color.scale |
-Color scheme definition with JSON
-array of hex colors |
-
-| spp.gis.data.layer |
-Extended with choropleth geo
-representation option |
-
-
-
+
+- spp.gis.indicator.layer – Configuration linking a CEL variable to
+color scale and classification settings
+- spp.gis.color.scale – Color scheme definition with JSON array of
+hex colors
+- spp.gis.data.layer – Extended with choropleth geo representation
+option
+
Configuration
@@ -443,28 +425,11 @@ UI Location
Security
-
-
-
-
-
-
-| Group |
-Access |
-
-
-
-| spp_security.group_spp_user |
-Read |
-
-| spp_security.group_spp_manager |
-Read/write/create (no delete) |
-
-| spp_security.group_spp_admin |
-Full CRUD |
-
-
-
+
+- spp_security.group_spp_user – Read
+- spp_security.group_spp_manager – Read/write/create (no delete)
+- spp_security.group_spp_admin – Full CRUD
+
Extension Points
@@ -480,12 +445,7 @@ Extension Points
Dependencies
-
spp_gis, spp_hxl_area
-
-
Important
-
This is an alpha version, the data model and design can change at any time without warning.
-Only for development or testing purpose, do not use in production.
-
+
spp_gis, spp_hxl_area, spp_registry, spp_security
Table of contents
diff --git a/spp_gis_indicators/tests/__init__.py b/spp_gis_indicators/tests/__init__.py
index 721eb799..e324fe3a 100644
--- a/spp_gis_indicators/tests/__init__.py
+++ b/spp_gis_indicators/tests/__init__.py
@@ -1,5 +1,6 @@
# Part of OpenSPP. See LICENSE file for full copyright and licensing details.
from . import test_color_scale
+from . import test_coverage_gaps
from . import test_data_layer
from . import test_indicator_layer
diff --git a/spp_gis_indicators/tests/test_coverage_gaps.py b/spp_gis_indicators/tests/test_coverage_gaps.py
new file mode 100644
index 00000000..eb5cbc2a
--- /dev/null
+++ b/spp_gis_indicators/tests/test_coverage_gaps.py
@@ -0,0 +1,1084 @@
+# Part of OpenSPP. See LICENSE file for full copyright and licensing details.
+
+import json
+
+from odoo.exceptions import ValidationError
+from odoo.tests import tagged
+from odoo.tests.common import TransactionCase
+
+
+@tagged("post_install", "-at_install")
+class TestQuantileBreaksEdgeCases(TransactionCase):
+ """Test _compute_quantile_breaks edge cases."""
+
+ @classmethod
+ def setUpClass(cls):
+ super().setUpClass()
+
+ cls.IndicatorLayer = cls.env["spp.gis.indicator.layer"]
+
+ cls.color_scale = cls.env["spp.gis.color.scale"].create(
+ {
+ "name": "Test Blues",
+ "scale_type": "sequential",
+ "colors_json": json.dumps(["#f7fbff", "#c6dbef", "#6baed6", "#2171b5", "#08306b"]),
+ }
+ )
+
+ cls.variable = cls.env["spp.cel.variable"].create(
+ {
+ "name": "test_quantile_edge",
+ "cel_accessor": "test_quantile_edge",
+ "label": "Test Quantile Edge",
+ "value_type": "number",
+ "source_type": "constant",
+ }
+ )
+
+ def test_quantile_breaks_empty_values(self):
+ """Test _compute_quantile_breaks with empty values list."""
+ layer = self.IndicatorLayer.create(
+ {
+ "name": "Test Layer",
+ "variable_id": self.variable.id,
+ "color_scale_id": self.color_scale.id,
+ }
+ )
+ result = layer._compute_quantile_breaks([], 3)
+ self.assertEqual(result, [])
+
+ def test_quantile_breaks_num_classes_less_than_2(self):
+ """Test _compute_quantile_breaks with num_classes < 2."""
+ layer = self.IndicatorLayer.create(
+ {
+ "name": "Test Layer",
+ "variable_id": self.variable.id,
+ "color_scale_id": self.color_scale.id,
+ }
+ )
+ result = layer._compute_quantile_breaks([10, 20, 30], 1)
+ self.assertEqual(result, [])
+
+ def test_quantile_breaks_unique_values_less_than_num_classes(self):
+ """Test _compute_quantile_breaks when unique values < num_classes returns unique_values[:-1]."""
+ layer = self.IndicatorLayer.create(
+ {
+ "name": "Test Layer",
+ "variable_id": self.variable.id,
+ "color_scale_id": self.color_scale.id,
+ }
+ )
+ # 2 unique values but requesting 5 classes
+ result = layer._compute_quantile_breaks([10, 10, 20, 20], 5)
+ # unique values are [10, 20], so [:-1] = [10]
+ self.assertEqual(result, [10])
+
+ def test_quantile_breaks_three_unique_for_five_classes(self):
+ """Test _compute_quantile_breaks when 3 unique values but 5 classes requested."""
+ layer = self.IndicatorLayer.create(
+ {
+ "name": "Test Layer",
+ "variable_id": self.variable.id,
+ "color_scale_id": self.color_scale.id,
+ }
+ )
+ result = layer._compute_quantile_breaks([10, 20, 30], 5)
+ # unique values [10, 20, 30], 3 < 5, so returns [10, 20]
+ self.assertEqual(result, [10, 20])
+
+
+@tagged("post_install", "-at_install")
+class TestLegendHtmlEdgeCases(TransactionCase):
+ """Test _compute_legend_html edge cases."""
+
+ @classmethod
+ def setUpClass(cls):
+ super().setUpClass()
+
+ cls.color_scale = cls.env["spp.gis.color.scale"].create(
+ {
+ "name": "Test Blues",
+ "scale_type": "sequential",
+ "colors_json": json.dumps(["#f7fbff", "#c6dbef", "#6baed6", "#2171b5", "#08306b"]),
+ }
+ )
+
+ cls.variable = cls.env["spp.cel.variable"].create(
+ {
+ "name": "test_legend_edge",
+ "cel_accessor": "test_legend_edge",
+ "label": "Test Legend Edge",
+ "value_type": "number",
+ "source_type": "constant",
+ }
+ )
+
+ def test_legend_html_empty_break_values(self):
+ """Test legend HTML is empty when break_values is empty."""
+ # Create a layer with no indicator data so break_values is empty
+ var_empty = self.env["spp.cel.variable"].create(
+ {
+ "name": "test_legend_empty",
+ "cel_accessor": "test_legend_empty",
+ "label": "Test Legend Empty",
+ "value_type": "number",
+ "source_type": "constant",
+ }
+ )
+ layer = self.env["spp.gis.indicator.layer"].create(
+ {
+ "name": "Empty Breaks Layer",
+ "variable_id": var_empty.id,
+ "period_key": "2099-01",
+ "color_scale_id": self.color_scale.id,
+ }
+ )
+ # No indicators exist for this variable/period, so break_values should be empty
+ self.assertEqual(layer.break_values, "")
+ self.assertEqual(layer.legend_html, "")
+
+ def test_legend_html_with_valid_data(self):
+ """Test legend HTML is generated when color_scale_id and break_values are present."""
+ area1 = self.env["spp.area"].create(
+ {
+ "draft_name": "Legend Test Area 1",
+ "code": "LTA1",
+ }
+ )
+ area2 = self.env["spp.area"].create(
+ {
+ "draft_name": "Legend Test Area 2",
+ "code": "LTA2",
+ }
+ )
+ self.env["spp.hxl.area.indicator"].create(
+ {
+ "area_id": area1.id,
+ "variable_id": self.variable.id,
+ "period_key": "2024-12",
+ "value": 50.0,
+ }
+ )
+ self.env["spp.hxl.area.indicator"].create(
+ {
+ "area_id": area2.id,
+ "variable_id": self.variable.id,
+ "period_key": "2024-12",
+ "value": 100.0,
+ }
+ )
+ layer = self.env["spp.gis.indicator.layer"].create(
+ {
+ "name": "With Color Scale Layer",
+ "variable_id": self.variable.id,
+ "period_key": "2024-12",
+ "color_scale_id": self.color_scale.id,
+ "num_classes": 2,
+ }
+ )
+ # Should have legend content since we have data and color scale
+ self.assertTrue(layer.break_values)
+ self.assertTrue(layer.legend_html)
+ self.assertIn("gis-choropleth-legend", layer.legend_html)
+
+
+@tagged("post_install", "-at_install")
+class TestGetFeatureColorsEdgeCases(TransactionCase):
+ """Test get_feature_colors edge cases."""
+
+ @classmethod
+ def setUpClass(cls):
+ super().setUpClass()
+
+ cls.color_scale = cls.env["spp.gis.color.scale"].create(
+ {
+ "name": "Test Blues",
+ "scale_type": "sequential",
+ "colors_json": json.dumps(["#f7fbff", "#c6dbef", "#6baed6", "#2171b5", "#08306b"]),
+ }
+ )
+
+ cls.variable = cls.env["spp.cel.variable"].create(
+ {
+ "name": "test_feature_edge",
+ "cel_accessor": "test_feature_edge",
+ "label": "Test Feature Edge",
+ "value_type": "number",
+ "source_type": "constant",
+ }
+ )
+
+ cls.area1 = cls.env["spp.area"].create(
+ {
+ "draft_name": "Feature Area 1",
+ "code": "FA1",
+ }
+ )
+ cls.area2 = cls.env["spp.area"].create(
+ {
+ "draft_name": "Feature Area 2",
+ "code": "FA2",
+ }
+ )
+ cls.area3 = cls.env["spp.area"].create(
+ {
+ "draft_name": "Feature Area 3",
+ "code": "FA3",
+ }
+ )
+ cls.area4 = cls.env["spp.area"].create(
+ {
+ "draft_name": "Feature Area 4",
+ "code": "FA4",
+ }
+ )
+
+ def test_get_feature_colors_no_indicators(self):
+ """Test get_feature_colors returns empty dict when no indicators exist."""
+ var_empty = self.env["spp.cel.variable"].create(
+ {
+ "name": "test_fc_empty",
+ "cel_accessor": "test_fc_empty",
+ "label": "Test FC Empty",
+ "value_type": "number",
+ "source_type": "constant",
+ }
+ )
+ layer = self.env["spp.gis.indicator.layer"].create(
+ {
+ "name": "No Indicators Layer",
+ "variable_id": var_empty.id,
+ "period_key": "2099-01",
+ "color_scale_id": self.color_scale.id,
+ }
+ )
+ result = layer.get_feature_colors([self.area1.id, self.area2.id])
+ self.assertEqual(result, {})
+
+ def test_get_feature_colors_empty_break_values(self):
+ """Test get_feature_colors when break_values is truly empty string returns empty dict."""
+ var_nodata = self.env["spp.cel.variable"].create(
+ {
+ "name": "test_fc_nodata",
+ "cel_accessor": "test_fc_nodata",
+ "label": "Test FC No Data",
+ "value_type": "number",
+ "source_type": "constant",
+ }
+ )
+ # No indicators at all => break_values will be empty string
+ layer = self.env["spp.gis.indicator.layer"].create(
+ {
+ "name": "Empty Breaks Layer",
+ "variable_id": var_nodata.id,
+ "period_key": "2099-01",
+ "color_scale_id": self.color_scale.id,
+ "classification_method": "equal_interval",
+ "num_classes": 3,
+ }
+ )
+ # No data => break_values is empty string
+ self.assertEqual(layer.break_values, "")
+ result = layer.get_feature_colors([self.area1.id, self.area2.id])
+ self.assertEqual(result, {})
+
+ def test_get_feature_colors_all_same_values(self):
+ """Test get_feature_colors when all values are the same (breaks is [])."""
+ var_single = self.env["spp.cel.variable"].create(
+ {
+ "name": "test_fc_single",
+ "cel_accessor": "test_fc_single",
+ "label": "Test FC Single",
+ "value_type": "number",
+ "source_type": "constant",
+ }
+ )
+ for area in [self.area1, self.area2]:
+ self.env["spp.hxl.area.indicator"].create(
+ {
+ "area_id": area.id,
+ "variable_id": var_single.id,
+ "period_key": "2024-12",
+ "value": 100.0,
+ }
+ )
+ layer = self.env["spp.gis.indicator.layer"].create(
+ {
+ "name": "Same Values Layer",
+ "variable_id": var_single.id,
+ "period_key": "2024-12",
+ "color_scale_id": self.color_scale.id,
+ "classification_method": "equal_interval",
+ "num_classes": 3,
+ }
+ )
+ # All values equal => breaks is "[]", which is truthy
+ self.assertEqual(layer.break_values, "[]")
+ # With 0 breaks => 1 class, all areas get the first color
+ result = layer.get_feature_colors([self.area1.id, self.area2.id])
+ self.assertEqual(len(result), 2)
+ # All should have the same color (first color)
+ colors = list(result.values())
+ self.assertEqual(colors[0], colors[1])
+
+ def test_get_feature_colors_missing_area_not_in_result(self):
+ """Test that areas without indicators are not in the result."""
+ self.env["spp.hxl.area.indicator"].create(
+ {
+ "area_id": self.area1.id,
+ "variable_id": self.variable.id,
+ "period_key": "2024-06",
+ "value": 50.0,
+ }
+ )
+ self.env["spp.hxl.area.indicator"].create(
+ {
+ "area_id": self.area2.id,
+ "variable_id": self.variable.id,
+ "period_key": "2024-06",
+ "value": 200.0,
+ }
+ )
+ # area3 has no indicator for this period/variable
+
+ layer = self.env["spp.gis.indicator.layer"].create(
+ {
+ "name": "Missing Area Layer",
+ "variable_id": self.variable.id,
+ "period_key": "2024-06",
+ "color_scale_id": self.color_scale.id,
+ "classification_method": "quantile",
+ "num_classes": 2,
+ }
+ )
+
+ area_ids = [self.area1.id, self.area2.id, self.area3.id]
+ result = layer.get_feature_colors(area_ids)
+ # area3 has no indicator => not in result
+ self.assertNotIn(self.area3.id, result)
+ # area1 and area2 should have colors
+ self.assertIn(self.area1.id, result)
+ self.assertIn(self.area2.id, result)
+
+ def test_get_feature_colors_zero_value_not_skipped(self):
+ """Test that indicators with value=0 are NOT skipped."""
+ self.env["spp.hxl.area.indicator"].create(
+ {
+ "area_id": self.area1.id,
+ "variable_id": self.variable.id,
+ "period_key": "2024-07",
+ "value": 0.0,
+ }
+ )
+ self.env["spp.hxl.area.indicator"].create(
+ {
+ "area_id": self.area2.id,
+ "variable_id": self.variable.id,
+ "period_key": "2024-07",
+ "value": 100.0,
+ }
+ )
+
+ layer = self.env["spp.gis.indicator.layer"].create(
+ {
+ "name": "Zero Value Layer",
+ "variable_id": self.variable.id,
+ "period_key": "2024-07",
+ "color_scale_id": self.color_scale.id,
+ "classification_method": "quantile",
+ "num_classes": 2,
+ }
+ )
+
+ area_ids = [self.area1.id, self.area2.id]
+ result = layer.get_feature_colors(area_ids)
+ # value=0 should NOT be skipped
+ self.assertIn(self.area1.id, result)
+ self.assertIn(self.area2.id, result)
+
+ def test_get_feature_colors_boundary_values(self):
+ """Test boundary value classification in get_feature_colors."""
+ # Create 4 indicators with known values
+ for area, val in [(self.area1, 10.0), (self.area2, 50.0), (self.area3, 90.0), (self.area4, 100.0)]:
+ self.env["spp.hxl.area.indicator"].create(
+ {
+ "area_id": area.id,
+ "variable_id": self.variable.id,
+ "period_key": "2024-08",
+ "value": val,
+ }
+ )
+
+ layer = self.env["spp.gis.indicator.layer"].create(
+ {
+ "name": "Boundary Layer",
+ "variable_id": self.variable.id,
+ "period_key": "2024-08",
+ "color_scale_id": self.color_scale.id,
+ "classification_method": "manual",
+ "manual_breaks": "50,90",
+ }
+ )
+
+ area_ids = [self.area1.id, self.area2.id, self.area3.id, self.area4.id]
+ result = layer.get_feature_colors(area_ids)
+ self.assertEqual(len(result), 4)
+
+ # area1 (10) is < 50 => class 0 => first color
+ # area2 (50) is >= 50 but < 90 => class 1
+ # area3 (90) is >= 90 => class 2
+ # area4 (100) is >= 90 => class 2
+ # Verify different classes get different colors (at least area1 vs area3/area4)
+ self.assertNotEqual(result[self.area1.id], result[self.area4.id])
+ # area3 and area4 should be same class => same color
+ self.assertEqual(result[self.area3.id], result[self.area4.id])
+
+
+@tagged("post_install", "-at_install")
+class TestGetIndicatorValuesEdgeCases(TransactionCase):
+ """Test _get_indicator_values edge cases."""
+
+ @classmethod
+ def setUpClass(cls):
+ super().setUpClass()
+
+ cls.color_scale = cls.env["spp.gis.color.scale"].create(
+ {
+ "name": "Test Blues",
+ "scale_type": "sequential",
+ "colors_json": json.dumps(["#f7fbff", "#c6dbef", "#6baed6", "#2171b5", "#08306b"]),
+ }
+ )
+
+ cls.variable = cls.env["spp.cel.variable"].create(
+ {
+ "name": "test_indicator_vals",
+ "cel_accessor": "test_indicator_vals",
+ "label": "Test Indicator Vals",
+ "value_type": "number",
+ "source_type": "constant",
+ }
+ )
+
+ cls.area1 = cls.env["spp.area"].create(
+ {
+ "draft_name": "Indicator Val Area 1",
+ "code": "IVA1",
+ }
+ )
+
+ def test_get_indicator_values_with_incident_filter(self):
+ """Test _get_indicator_values with incident_id filter."""
+ # Create a hazard incident
+ hazard_cat = self.env["spp.hazard.category"].create(
+ {
+ "name": "Test Hazard Category",
+ "code": "THC",
+ }
+ )
+ incident = self.env["spp.hazard.incident"].create(
+ {
+ "name": "Test Incident",
+ "code": "TI001",
+ "category_id": hazard_cat.id,
+ "start_date": "2024-01-01",
+ }
+ )
+
+ # Create indicator with incident
+ self.env["spp.hxl.area.indicator"].create(
+ {
+ "area_id": self.area1.id,
+ "variable_id": self.variable.id,
+ "period_key": "2024-12",
+ "value": 999.0,
+ "incident_id": incident.id,
+ }
+ )
+
+ # Create indicator without incident
+ self.env["spp.hxl.area.indicator"].create(
+ {
+ "area_id": self.area1.id,
+ "variable_id": self.variable.id,
+ "period_key": "2024-12",
+ "value": 111.0,
+ }
+ )
+
+ # Layer WITH incident filter
+ layer_with_incident = self.env["spp.gis.indicator.layer"].create(
+ {
+ "name": "Incident Layer",
+ "variable_id": self.variable.id,
+ "period_key": "2024-12",
+ "color_scale_id": self.color_scale.id,
+ "incident_id": incident.id,
+ }
+ )
+ values = layer_with_incident._get_indicator_values()
+ self.assertEqual(len(values), 1)
+ self.assertIn(999.0, values)
+ self.assertNotIn(111.0, values)
+
+ def test_get_indicator_values_empty_period_key(self):
+ """Test _get_indicator_values with empty period_key matches all periods."""
+ self.env["spp.hxl.area.indicator"].create(
+ {
+ "area_id": self.area1.id,
+ "variable_id": self.variable.id,
+ "period_key": "2024-01",
+ "value": 10.0,
+ }
+ )
+
+ # Layer with empty period_key: the domain won't include period_key filter
+ layer = self.env["spp.gis.indicator.layer"].create(
+ {
+ "name": "No Period Layer",
+ "variable_id": self.variable.id,
+ "period_key": "",
+ "color_scale_id": self.color_scale.id,
+ }
+ )
+ values = layer._get_indicator_values()
+ # Should find the indicator since no period_key filter
+ self.assertIn(10.0, values)
+
+
+@tagged("post_install", "-at_install")
+class TestComputeBreakValuesEdgeCases(TransactionCase):
+ """Test _compute_break_values edge cases."""
+
+ @classmethod
+ def setUpClass(cls):
+ super().setUpClass()
+
+ cls.color_scale = cls.env["spp.gis.color.scale"].create(
+ {
+ "name": "Test Blues",
+ "scale_type": "sequential",
+ "colors_json": json.dumps(["#f7fbff", "#c6dbef", "#6baed6", "#2171b5", "#08306b"]),
+ }
+ )
+
+ cls.variable = cls.env["spp.cel.variable"].create(
+ {
+ "name": "test_break_edge",
+ "cel_accessor": "test_break_edge",
+ "label": "Test Break Edge",
+ "value_type": "number",
+ "source_type": "constant",
+ }
+ )
+
+ def test_break_values_no_indicator_data(self):
+ """Test _compute_break_values with no indicator data results in empty break_values."""
+ layer = self.env["spp.gis.indicator.layer"].create(
+ {
+ "name": "No Data Layer",
+ "variable_id": self.variable.id,
+ "period_key": "2099-12",
+ "color_scale_id": self.color_scale.id,
+ "classification_method": "quantile",
+ }
+ )
+ self.assertEqual(layer.break_values, "")
+
+ def test_break_values_no_variable(self):
+ """Test _compute_break_values when variable_id triggers empty."""
+ # After creation, we check the logic. variable_id is required,
+ # but the compute explicitly checks for it.
+ layer = self.env["spp.gis.indicator.layer"].create(
+ {
+ "name": "Break No Var Layer",
+ "variable_id": self.variable.id,
+ "period_key": "2099-12",
+ "color_scale_id": self.color_scale.id,
+ }
+ )
+ # Layer with no data -> empty break_values
+ self.assertEqual(layer.break_values, "")
+
+ def test_break_values_unknown_classification_fallback(self):
+ """Test _compute_break_values with data falls back to empty breaks for unknown method."""
+ # Create indicators so values list is non-empty (use different areas to avoid unique constraint)
+ area1 = self.env["spp.area"].create(
+ {
+ "draft_name": "Break Fallback Area 1",
+ "code": "BFA1",
+ }
+ )
+ area2 = self.env["spp.area"].create(
+ {
+ "draft_name": "Break Fallback Area 2",
+ "code": "BFA2",
+ }
+ )
+ self.env["spp.hxl.area.indicator"].create(
+ {
+ "area_id": area1.id,
+ "variable_id": self.variable.id,
+ "period_key": "2024-12",
+ "value": 50.0,
+ }
+ )
+ self.env["spp.hxl.area.indicator"].create(
+ {
+ "area_id": area2.id,
+ "variable_id": self.variable.id,
+ "period_key": "2024-12",
+ "value": 100.0,
+ }
+ )
+ layer = self.env["spp.gis.indicator.layer"].create(
+ {
+ "name": "Fallback Layer",
+ "variable_id": self.variable.id,
+ "period_key": "2024-12",
+ "color_scale_id": self.color_scale.id,
+ "classification_method": "quantile",
+ "num_classes": 2,
+ }
+ )
+ # Forcefully set classification_method to an unsupported value via SQL
+ # to test the else branch in _compute_break_values
+ self.env.cr.execute(
+ "UPDATE spp_gis_indicator_layer SET classification_method = %s WHERE id = %s",
+ ("unknown_method", layer.id),
+ )
+ layer.invalidate_recordset()
+ # Re-trigger compute
+ layer._compute_break_values()
+ self.assertEqual(layer.break_values, "[]")
+
+
+@tagged("post_install", "-at_install")
+class TestGetColorForValueEdgeCases(TransactionCase):
+ """Test GisColorScale.get_color_for_value edge cases."""
+
+ @classmethod
+ def setUpClass(cls):
+ super().setUpClass()
+
+ cls.diverging_colors = ["#d73027", "#fc8d59", "#ffffbf", "#91bfdb", "#4575b4"]
+ cls.diverging_scale = cls.env["spp.gis.color.scale"].create(
+ {
+ "name": "Diverging Test",
+ "scale_type": "diverging",
+ "colors_json": json.dumps(cls.diverging_colors),
+ }
+ )
+
+ cls.sequential_colors = ["#f7fbff", "#c6dbef", "#6baed6", "#2171b5", "#08306b"]
+ cls.sequential_scale = cls.env["spp.gis.color.scale"].create(
+ {
+ "name": "Sequential Test",
+ "scale_type": "sequential",
+ "colors_json": json.dumps(cls.sequential_colors),
+ }
+ )
+
+ def test_diverging_min_equals_max(self):
+ """Test diverging scale when min == max returns center color."""
+ color = self.diverging_scale.get_color_for_value(50, 50, 50)
+ center_idx = len(self.diverging_colors) // 2
+ self.assertEqual(color, self.diverging_colors[center_idx])
+
+ def test_diverging_center_equals_min(self):
+ """Test diverging scale when center == min_val."""
+ # center=0, min_val=0, max_val=100, value below center is clamped
+ # value < center but center == min_val => normalized = 0
+ color = self.diverging_scale.get_color_for_value(0, 0, 100, center=0)
+ # value == min_val => colors[0]
+ self.assertEqual(color, self.diverging_colors[0])
+
+ def test_diverging_center_equals_max(self):
+ """Test diverging scale when max_val == center."""
+ # center=100, min_val=0, max_val=100
+ # value > center is impossible since max_val == center
+ # value at center should map to upper half with normalized=1
+ color = self.diverging_scale.get_color_for_value(50, 0, 100, center=100)
+ # 50 < center (100), so lower half: normalized = (50 - 0) / (100 - 0) = 0.5
+ self.assertIn(color, self.diverging_colors)
+
+ def test_sequential_exact_boundary(self):
+ """Test sequential scale at exact boundary values."""
+ # Test at 0 (min)
+ color_min = self.sequential_scale.get_color_for_value(0, 0, 100)
+ self.assertEqual(color_min, self.sequential_colors[0])
+
+ # Test at 100 (max)
+ color_max = self.sequential_scale.get_color_for_value(100, 0, 100)
+ self.assertEqual(color_max, self.sequential_colors[-1])
+
+ # Test at exactly 25% (first quarter)
+ color_quarter = self.sequential_scale.get_color_for_value(25, 0, 100)
+ self.assertIn(color_quarter, self.sequential_colors)
+
+ def test_sequential_value_slightly_above_min(self):
+ """Test sequential scale with value just above min."""
+ color = self.sequential_scale.get_color_for_value(1, 0, 100)
+ # Should be first or second color
+ self.assertIn(color, self.sequential_colors[:2])
+
+
+@tagged("post_install", "-at_install")
+class TestGetChoroplethConfigEdgeCases(TransactionCase):
+ """Test GisDataLayerIndicator._get_choropleth_config edge cases."""
+
+ @classmethod
+ def setUpClass(cls):
+ super().setUpClass()
+
+ cls.DataLayer = cls.env["spp.gis.data.layer"]
+
+ cls.color_scale = cls.env["spp.gis.color.scale"].create(
+ {
+ "name": "Test Blues",
+ "scale_type": "sequential",
+ "colors_json": json.dumps(["#f7fbff", "#c6dbef", "#6baed6", "#2171b5", "#08306b"]),
+ }
+ )
+
+ cls.variable = cls.env["spp.cel.variable"].create(
+ {
+ "name": "test_choropleth_cfg",
+ "cel_accessor": "test_choropleth_cfg",
+ "label": "Test Choropleth Config",
+ "value_type": "number",
+ "source_type": "constant",
+ }
+ )
+
+ cls.indicator_layer = cls.env["spp.gis.indicator.layer"].create(
+ {
+ "name": "Test Indicator Config",
+ "variable_id": cls.variable.id,
+ "color_scale_id": cls.color_scale.id,
+ "classification_method": "quantile",
+ "num_classes": 5,
+ }
+ )
+
+ # Find a geo field for testing
+ geo_field = cls.env["ir.model.fields"].search(
+ [
+ ("model", "=", "spp.area"),
+ ("ttype", "=", "geo_polygon"),
+ ],
+ limit=1,
+ )
+ cls.geo_field_id = geo_field.id if geo_field else False
+
+ cls.gis_view = cls.env["ir.ui.view"].search(
+ [("type", "=", "spp_gis")],
+ limit=1,
+ )
+
+ def _skip_if_no_gis_prereqs(self):
+ """Skip test if GIS prerequisites are missing."""
+ if not self.gis_view:
+ self.skipTest("No GIS view available for testing")
+ if not self.geo_field_id:
+ self.skipTest("No geo_polygon field available for testing")
+
+ def test_choropleth_config_indicator_no_color_scale_colors(self):
+ """Test _get_choropleth_config when indicator has color_scale but empty colors."""
+ self._skip_if_no_gis_prereqs()
+
+ # Create a color scale and then clear its colors
+ empty_color_scale = self.env["spp.gis.color.scale"].create(
+ {
+ "name": "Temp Scale",
+ "scale_type": "sequential",
+ "colors_json": json.dumps(["#000", "#fff"]),
+ }
+ )
+ indicator_layer = self.env["spp.gis.indicator.layer"].create(
+ {
+ "name": "No Colors Indicator",
+ "variable_id": self.variable.id,
+ "color_scale_id": empty_color_scale.id,
+ }
+ )
+
+ layer = self.DataLayer.create(
+ {
+ "name": "Config Test Layer",
+ "view_id": self.gis_view.id,
+ "geo_field_id": self.geo_field_id,
+ "geo_repr": "choropleth",
+ "indicator_layer_id": indicator_layer.id,
+ }
+ )
+
+ config = layer._get_choropleth_config()
+ self.assertIsNotNone(config)
+ self.assertEqual(config["type"], "indicator")
+ # color_ramp should have colors from the scale
+ self.assertEqual(config["color_ramp"], ["#000", "#fff"])
+
+ def test_choropleth_config_with_empty_break_values(self):
+ """Test _get_choropleth_config when break_values is empty."""
+ self._skip_if_no_gis_prereqs()
+
+ # indicator_layer has no data, so break_values is ""
+ layer = self.DataLayer.create(
+ {
+ "name": "Empty Breaks Config",
+ "view_id": self.gis_view.id,
+ "geo_field_id": self.geo_field_id,
+ "geo_repr": "choropleth",
+ "indicator_layer_id": self.indicator_layer.id,
+ }
+ )
+
+ config = layer._get_choropleth_config()
+ self.assertIsNotNone(config)
+ self.assertEqual(config["type"], "indicator")
+ # break_values is empty (""), json.loads("" or "[]") => []
+ self.assertEqual(config["break_values"], [])
+
+ def test_choropleth_config_non_choropleth_returns_none(self):
+ """Test that _get_choropleth_config returns None for non-choropleth layers."""
+ self._skip_if_no_gis_prereqs()
+
+ layer = self.DataLayer.create(
+ {
+ "name": "Basic Layer Config",
+ "view_id": self.gis_view.id,
+ "geo_field_id": self.geo_field_id,
+ "geo_repr": "basic",
+ }
+ )
+
+ config = layer._get_choropleth_config()
+ self.assertIsNone(config)
+
+
+@tagged("post_install", "-at_install")
+class TestCheckColorsJsonEdgeCases(TransactionCase):
+ """Test GisColorScale._check_colors_json edge cases."""
+
+ @classmethod
+ def setUpClass(cls):
+ super().setUpClass()
+
+ cls.ColorScale = cls.env["spp.gis.color.scale"]
+
+ def test_empty_string_colors_json(self):
+ """Test _check_colors_json with empty string passes (skipped by constraint)."""
+ scale = self.ColorScale.create(
+ {
+ "name": "Empty Test",
+ "scale_type": "sequential",
+ "colors_json": json.dumps(["#000", "#fff"]),
+ }
+ )
+ # Setting colors_json to empty string should pass the constraint
+ # (the constraint has `if not rec.colors_json: continue`)
+ scale.colors_json = ""
+ # No error raised means constraint passes for empty
+
+ def test_short_hex_format_accepted(self):
+ """Test that short hex format (#fff) is accepted by constraint."""
+ scale = self.ColorScale.create(
+ {
+ "name": "Short Hex Test",
+ "scale_type": "sequential",
+ "colors_json": json.dumps(["#fff", "#000", "#abc"]),
+ }
+ )
+ colors = scale.get_colors()
+ self.assertEqual(colors, ["#fff", "#000", "#abc"])
+
+ def test_mixed_short_long_hex(self):
+ """Test that mix of short and long hex formats is accepted."""
+ scale = self.ColorScale.create(
+ {
+ "name": "Mixed Hex Test",
+ "scale_type": "sequential",
+ "colors_json": json.dumps(["#fff", "#ff0000"]),
+ }
+ )
+ colors = scale.get_colors()
+ self.assertEqual(len(colors), 2)
+
+ def test_invalid_length_hex_rejected(self):
+ """Test that hex colors with wrong length are rejected."""
+ with self.assertRaises(ValidationError) as ctx:
+ self.ColorScale.create(
+ {
+ "name": "Bad Length",
+ "scale_type": "sequential",
+ "colors_json": json.dumps(["#ff00", "#ffffff"]),
+ }
+ )
+ self.assertIn("Invalid hex color format", str(ctx.exception))
+
+
+@tagged("post_install", "-at_install")
+class TestDataLayerMethodsDirect(TransactionCase):
+ """Test data_layer.py methods using new() to avoid GIS prereq dependencies."""
+
+ @classmethod
+ def setUpClass(cls):
+ super().setUpClass()
+
+ cls.DataLayer = cls.env["spp.gis.data.layer"]
+ cls.ColorScale = cls.env["spp.gis.color.scale"]
+ cls.IndicatorLayer = cls.env["spp.gis.indicator.layer"]
+
+ cls.color_scale = cls.ColorScale.create(
+ {
+ "name": "Direct Test Blues",
+ "scale_type": "sequential",
+ "colors_json": json.dumps(["#f7fbff", "#c6dbef", "#6baed6", "#2171b5", "#08306b"]),
+ }
+ )
+
+ cls.variable = cls.env["spp.cel.variable"].create(
+ {
+ "name": "test_direct_dl",
+ "cel_accessor": "test_direct_dl",
+ "label": "Test Direct DL",
+ "value_type": "number",
+ "source_type": "constant",
+ }
+ )
+
+ cls.indicator_layer = cls.IndicatorLayer.create(
+ {
+ "name": "Direct Test Indicator",
+ "variable_id": cls.variable.id,
+ "color_scale_id": cls.color_scale.id,
+ "classification_method": "quantile",
+ "num_classes": 5,
+ }
+ )
+
+ def test_get_choropleth_config_non_choropleth_new(self):
+ """Test _get_choropleth_config returns None for non-choropleth using new()."""
+ rec = self.DataLayer.new({"geo_repr": "basic"})
+ config = rec._get_choropleth_config()
+ self.assertIsNone(config)
+
+ def test_get_choropleth_config_indicator_new(self):
+ """Test _get_choropleth_config with indicator_layer_id using new()."""
+ rec = self.DataLayer.new(
+ {
+ "geo_repr": "choropleth",
+ "indicator_layer_id": self.indicator_layer.id,
+ }
+ )
+ config = rec._get_choropleth_config()
+ self.assertIsNotNone(config)
+ self.assertEqual(config["type"], "indicator")
+ self.assertEqual(config["classification"], "quantile")
+ self.assertEqual(config["class_count"], 5)
+ self.assertTrue(config["show_legend"])
+ self.assertEqual(config["legend_title"], "Direct Test Indicator")
+ self.assertIsInstance(config["color_ramp"], list)
+ self.assertIsInstance(config["break_values"], list)
+ self.assertIn("legend_html", config)
+
+ def test_get_choropleth_config_super_fallback_new(self):
+ """Test _get_choropleth_config falls back to super() when no indicator."""
+ rec = self.DataLayer.new({"geo_repr": "choropleth"})
+ # No indicator_layer_id → falls through to super()._get_choropleth_config()
+ # super() returns None since no choropleth_field_id either
+ config = rec._get_choropleth_config()
+ self.assertIsNone(config)
+
+ def test_check_choropleth_config_valid_indicator(self):
+ """Test _check_choropleth_config passes with indicator_layer_id."""
+ rec = self.DataLayer.new(
+ {
+ "geo_repr": "choropleth",
+ "indicator_layer_id": self.indicator_layer.id,
+ }
+ )
+ # Should not raise
+ rec._check_choropleth_config()
+
+ def test_check_choropleth_config_valid_basic(self):
+ """Test _check_choropleth_config passes for non-choropleth."""
+ rec = self.DataLayer.new({"geo_repr": "basic"})
+ # Should not raise
+ rec._check_choropleth_config()
+
+ def test_check_choropleth_config_invalid(self):
+ """Test _check_choropleth_config raises for choropleth without config."""
+ rec = self.DataLayer.new({"geo_repr": "choropleth"})
+ with self.assertRaises(ValidationError):
+ rec._check_choropleth_config()
+
+
+@tagged("post_install", "-at_install")
+class TestGetFeatureColorsNoneValue(TransactionCase):
+ """Test get_feature_colors with None/False indicator values."""
+
+ @classmethod
+ def setUpClass(cls):
+ super().setUpClass()
+
+ cls.color_scale = cls.env["spp.gis.color.scale"].create(
+ {
+ "name": "Test None Val",
+ "scale_type": "sequential",
+ "colors_json": json.dumps(["#f7fbff", "#6baed6", "#08306b"]),
+ }
+ )
+
+ cls.variable = cls.env["spp.cel.variable"].create(
+ {
+ "name": "test_none_val",
+ "cel_accessor": "test_none_val",
+ "label": "Test None Val",
+ "value_type": "number",
+ "source_type": "constant",
+ }
+ )
+
+ cls.area1 = cls.env["spp.area"].create({"draft_name": "None Val Area 1", "code": "NVA1"})
+ cls.area2 = cls.env["spp.area"].create({"draft_name": "None Val Area 2", "code": "NVA2"})
+ cls.area3 = cls.env["spp.area"].create({"draft_name": "None Val Area 3", "code": "NVA3"})
+
+ def test_get_feature_colors_with_zero_and_positive(self):
+ """Test get_feature_colors includes both zero and positive values."""
+ self.env["spp.hxl.area.indicator"].create(
+ {
+ "area_id": self.area1.id,
+ "variable_id": self.variable.id,
+ "period_key": "2024-none",
+ "value": 50.0,
+ }
+ )
+ self.env["spp.hxl.area.indicator"].create(
+ {
+ "area_id": self.area2.id,
+ "variable_id": self.variable.id,
+ "period_key": "2024-none",
+ "value": 0.0,
+ }
+ )
+
+ layer = self.env["spp.gis.indicator.layer"].create(
+ {
+ "name": "Zero Pos Test Layer",
+ "variable_id": self.variable.id,
+ "period_key": "2024-none",
+ "color_scale_id": self.color_scale.id,
+ "classification_method": "manual",
+ "manual_breaks": "25",
+ }
+ )
+
+ area_ids = [self.area1.id, self.area2.id, self.area3.id]
+ result = layer.get_feature_colors(area_ids)
+ # area1 (50.0) and area2 (0.0) should have colors
+ self.assertIn(self.area1.id, result)
+ self.assertIn(self.area2.id, result)
+ # area3 has no indicator data
+ self.assertNotIn(self.area3.id, result)
diff --git a/spp_gis_indicators/tests/test_data_layer.py b/spp_gis_indicators/tests/test_data_layer.py
index 760c8a4e..d4f18b7d 100644
--- a/spp_gis_indicators/tests/test_data_layer.py
+++ b/spp_gis_indicators/tests/test_data_layer.py
@@ -2,6 +2,7 @@
import json
+from odoo.exceptions import ValidationError
from odoo.tests import tagged
from odoo.tests.common import TransactionCase
@@ -51,7 +52,7 @@ def setUpClass(cls):
}
)
- # Find or create a geo field for testing
+ # Find a geo field for testing
geo_field = cls.env["ir.model.fields"].search(
[
("model", "=", "spp.area"),
@@ -60,11 +61,20 @@ def setUpClass(cls):
limit=1,
)
- if not geo_field:
- # Create a mock data layer without geo_field
- cls.geo_field_id = False
- else:
- cls.geo_field_id = geo_field.id
+ cls.geo_field_id = geo_field.id if geo_field else False
+
+ # Find a GIS view for testing
+ cls.gis_view = cls.env["ir.ui.view"].search(
+ [("type", "=", "spp_gis")],
+ limit=1,
+ )
+
+ def _skip_if_no_gis_prereqs(self):
+ """Skip test if GIS prerequisites are missing."""
+ if not self.gis_view:
+ self.skipTest("No GIS view available for testing")
+ if not self.geo_field_id:
+ self.skipTest("No geo_polygon field available for testing")
def test_geo_repr_choropleth_option(self):
"""Test that choropleth is available as geo_repr option."""
@@ -78,22 +88,14 @@ def test_geo_repr_choropleth_option(self):
def test_create_data_layer_with_choropleth(self):
"""Test creating a data layer with choropleth representation."""
- # Get any available GIS view
- gis_view = self.env["ir.ui.view"].search(
- [
- ("type", "=", "spp_gis"),
- ],
- limit=1,
- )
-
- if not gis_view:
- self.skipTest("No GIS view available for testing")
+ self._skip_if_no_gis_prereqs()
# Try to create data layer
layer = self.DataLayer.create(
{
"name": "Test Choropleth Layer",
- "view_id": gis_view.id,
+ "view_id": self.gis_view.id,
+ "geo_field_id": self.geo_field_id,
"geo_repr": "choropleth",
"indicator_layer_id": self.indicator_layer.id,
}
@@ -111,21 +113,14 @@ def test_indicator_layer_field_exists(self):
def test_data_layer_without_indicator(self):
"""Test that data layer can be created without indicator."""
- gis_view = self.env["ir.ui.view"].search(
- [
- ("type", "=", "spp_gis"),
- ],
- limit=1,
- )
-
- if not gis_view:
- self.skipTest("No GIS view available for testing")
+ self._skip_if_no_gis_prereqs()
# Create without indicator_layer_id
layer = self.DataLayer.create(
{
"name": "Non-Choropleth Layer",
- "view_id": gis_view.id,
+ "view_id": self.gis_view.id,
+ "geo_field_id": self.geo_field_id,
"geo_repr": "basic",
}
)
@@ -135,20 +130,13 @@ def test_data_layer_without_indicator(self):
def test_change_geo_repr_to_choropleth(self):
"""Test changing geo_repr to choropleth after creation."""
- gis_view = self.env["ir.ui.view"].search(
- [
- ("type", "=", "spp_gis"),
- ],
- limit=1,
- )
-
- if not gis_view:
- self.skipTest("No GIS view available for testing")
+ self._skip_if_no_gis_prereqs()
layer = self.DataLayer.create(
{
"name": "Changeable Layer",
- "view_id": gis_view.id,
+ "view_id": self.gis_view.id,
+ "geo_field_id": self.geo_field_id,
}
)
@@ -162,3 +150,53 @@ def test_change_geo_repr_to_choropleth(self):
self.assertEqual(layer.geo_repr, "choropleth")
self.assertEqual(layer.indicator_layer_id, self.indicator_layer)
+
+ def test_choropleth_requires_config(self):
+ """Test that choropleth without any config raises ValidationError."""
+ self._skip_if_no_gis_prereqs()
+
+ with self.assertRaises(ValidationError):
+ self.DataLayer.create(
+ {
+ "name": "Invalid Choropleth",
+ "view_id": self.gis_view.id,
+ "geo_field_id": self.geo_field_id,
+ "geo_repr": "choropleth",
+ }
+ )
+
+ def test_get_choropleth_config_indicator(self):
+ """Test _get_choropleth_config returns indicator config."""
+ self._skip_if_no_gis_prereqs()
+
+ layer = self.DataLayer.create(
+ {
+ "name": "Indicator Choropleth",
+ "view_id": self.gis_view.id,
+ "geo_field_id": self.geo_field_id,
+ "geo_repr": "choropleth",
+ "indicator_layer_id": self.indicator_layer.id,
+ }
+ )
+
+ config = layer._get_choropleth_config()
+ self.assertIsNotNone(config)
+ self.assertEqual(config["type"], "indicator")
+ self.assertEqual(config["classification"], "quantile")
+ self.assertEqual(config["class_count"], 5)
+
+ def test_get_choropleth_config_basic(self):
+ """Test _get_choropleth_config returns None for basic layers."""
+ self._skip_if_no_gis_prereqs()
+
+ layer = self.DataLayer.create(
+ {
+ "name": "Basic Layer",
+ "view_id": self.gis_view.id,
+ "geo_field_id": self.geo_field_id,
+ "geo_repr": "basic",
+ }
+ )
+
+ config = layer._get_choropleth_config()
+ self.assertIsNone(config)
diff --git a/spp_gis_indicators/views/color_scale_views.xml b/spp_gis_indicators/views/color_scale_views.xml
index 0459f03b..07bc3f08 100644
--- a/spp_gis_indicators/views/color_scale_views.xml
+++ b/spp_gis_indicators/views/color_scale_views.xml
@@ -94,7 +94,7 @@
Color Scales
gis-color-scales
spp.gis.color.scale
- tree,form
+ list,form
{'search_default_active': 1}
diff --git a/spp_gis_indicators/views/indicator_layer_views.xml b/spp_gis_indicators/views/indicator_layer_views.xml
index 107e7dab..cbb66749 100644
--- a/spp_gis_indicators/views/indicator_layer_views.xml
+++ b/spp_gis_indicators/views/indicator_layer_views.xml
@@ -141,7 +141,7 @@
Indicator Layers
gis-indicator-layers
spp.gis.indicator.layer
- tree,form
+ list,form
{'search_default_active': 1}