|
9 | 9 |
|
10 | 10 | from odoo.exceptions import ValidationError |
11 | 11 | from odoo.tests import tagged |
| 12 | +from odoo.tests.common import TransactionCase |
12 | 13 |
|
13 | 14 | from .common import AnalyticsTestCase |
14 | 15 |
|
@@ -489,3 +490,332 @@ def test_check_scope_allowed_no_rule(self): |
489 | 490 | # Should not raise -- no rule means default allow |
490 | 491 | scope_dict = {"scope_type": "explicit", "explicit_partner_ids": self.registrants[:3].ids} |
491 | 492 | service._check_scope_allowed(scope_dict) |
| 493 | + |
| 494 | + |
| 495 | +@tagged("post_install", "-at_install") |
| 496 | +class TestAnalyticsScopeActions(AnalyticsTestCase): |
| 497 | + """Tests for action_preview_registrants, action_refresh_cache, and _check_area_tags.""" |
| 498 | + |
| 499 | + def test_action_preview_registrants_returns_act_window(self): |
| 500 | + """action_preview_registrants must return an ir.actions.act_window dict.""" |
| 501 | + scope = self.create_scope( |
| 502 | + "explicit", |
| 503 | + explicit_partner_ids=[(6, 0, self.registrants[:5].ids)], |
| 504 | + ) |
| 505 | + action = scope.action_preview_registrants() |
| 506 | + self.assertEqual(action["type"], "ir.actions.act_window") |
| 507 | + self.assertEqual(action["res_model"], "res.partner") |
| 508 | + self.assertIn("list,form", action["view_mode"]) |
| 509 | + |
| 510 | + def test_action_preview_registrants_domain_contains_ids(self): |
| 511 | + """action_preview_registrants domain must restrict to scope's registrant IDs.""" |
| 512 | + partner_ids = self.registrants[:3].ids |
| 513 | + scope = self.create_scope( |
| 514 | + "explicit", |
| 515 | + explicit_partner_ids=[(6, 0, partner_ids)], |
| 516 | + ) |
| 517 | + action = scope.action_preview_registrants() |
| 518 | + domain = action["domain"] |
| 519 | + # The domain should contain an 'id in [...]' filter |
| 520 | + self.assertTrue(any(condition[0] == "id" for condition in domain)) |
| 521 | + id_condition = next(c for c in domain if c[0] == "id") |
| 522 | + self.assertEqual(set(id_condition[2]), set(partner_ids)) |
| 523 | + |
| 524 | + def test_action_preview_registrants_name_includes_scope_name(self): |
| 525 | + """action_preview_registrants name must mention the scope name.""" |
| 526 | + scope = self.create_scope( |
| 527 | + "explicit", |
| 528 | + name="My Preview Scope", |
| 529 | + explicit_partner_ids=[(6, 0, self.registrants[:2].ids)], |
| 530 | + ) |
| 531 | + action = scope.action_preview_registrants() |
| 532 | + self.assertIn("My Preview Scope", action["name"]) |
| 533 | + |
| 534 | + def test_action_preview_registrants_context_disables_create_delete(self): |
| 535 | + """action_preview_registrants context must set create and delete to False.""" |
| 536 | + scope = self.create_scope( |
| 537 | + "explicit", |
| 538 | + explicit_partner_ids=[(6, 0, self.registrants[:2].ids)], |
| 539 | + ) |
| 540 | + action = scope.action_preview_registrants() |
| 541 | + self.assertFalse(action["context"].get("create")) |
| 542 | + self.assertFalse(action["context"].get("delete")) |
| 543 | + |
| 544 | + def test_action_refresh_cache_returns_true(self): |
| 545 | + """action_refresh_cache must return True.""" |
| 546 | + scope = self.create_scope( |
| 547 | + "explicit", |
| 548 | + explicit_partner_ids=[(6, 0, self.registrants[:5].ids)], |
| 549 | + ) |
| 550 | + result = scope.action_refresh_cache() |
| 551 | + self.assertTrue(result) |
| 552 | + |
| 553 | + def test_action_refresh_cache_updates_last_cache_refresh(self): |
| 554 | + """action_refresh_cache must write last_cache_refresh timestamp.""" |
| 555 | + scope = self.create_scope( |
| 556 | + "explicit", |
| 557 | + explicit_partner_ids=[(6, 0, self.registrants[:5].ids)], |
| 558 | + ) |
| 559 | + self.assertFalse(scope.last_cache_refresh) |
| 560 | + scope.action_refresh_cache() |
| 561 | + self.assertTrue(scope.last_cache_refresh) |
| 562 | + |
| 563 | + def test_check_area_tags_raises_when_no_tags(self): |
| 564 | + """_check_area_tags must raise ValidationError when area_tag scope has no tags.""" |
| 565 | + with self.assertRaises(ValidationError): |
| 566 | + self.create_scope("area_tag") |
| 567 | + |
| 568 | + def test_check_area_tags_passes_with_tags(self): |
| 569 | + """_check_area_tags must not raise when area_tag scope has tags.""" |
| 570 | + scope = self.create_scope( |
| 571 | + "area_tag", |
| 572 | + area_tag_ids=[(6, 0, [self.tag_urban.id])], |
| 573 | + ) |
| 574 | + self.assertTrue(scope.id) |
| 575 | + |
| 576 | + def test_check_area_tags_raises_when_tags_cleared(self): |
| 577 | + """_check_area_tags must raise if area_tags are cleared after creation.""" |
| 578 | + scope = self.create_scope( |
| 579 | + "area_tag", |
| 580 | + area_tag_ids=[(6, 0, [self.tag_urban.id])], |
| 581 | + ) |
| 582 | + with self.assertRaises(ValidationError): |
| 583 | + scope.write({"area_tag_ids": [(5, 0, 0)]}) |
| 584 | + |
| 585 | + |
| 586 | +@tagged("post_install", "-at_install") |
| 587 | +class TestAggregationServiceInternals(AnalyticsTestCase): |
| 588 | + """Tests for internal methods of spp.analytics.service.""" |
| 589 | + |
| 590 | + @classmethod |
| 591 | + def setUpClass(cls): |
| 592 | + super().setUpClass() |
| 593 | + cls.service = cls.env["spp.analytics.service"] |
| 594 | + |
| 595 | + def test_get_effective_rule_returns_none_when_no_rule(self): |
| 596 | + """_get_effective_rule must return None (falsy) when no rule matches.""" |
| 597 | + user_no_rule = self.env["res.users"].create( |
| 598 | + { |
| 599 | + "name": "Effective Rule Test User", |
| 600 | + "login": "effective_rule_test_user", |
| 601 | + "email": "effectiverule@test.com", |
| 602 | + } |
| 603 | + ) |
| 604 | + service = self.service.with_user(user_no_rule) |
| 605 | + rule = service._get_effective_rule() |
| 606 | + self.assertFalse(rule) |
| 607 | + |
| 608 | + def test_get_effective_rule_returns_user_rule(self): |
| 609 | + """_get_effective_rule must return the matching rule for the current user.""" |
| 610 | + test_user = self.env["res.users"].create( |
| 611 | + { |
| 612 | + "name": "Rule Test User", |
| 613 | + "login": "rule_test_user2", |
| 614 | + "email": "ruletest2@test.com", |
| 615 | + } |
| 616 | + ) |
| 617 | + created_rule = self.env["spp.analytics.access.rule"].create( |
| 618 | + { |
| 619 | + "name": "Effective Rule For User", |
| 620 | + "access_level": "individual", |
| 621 | + "user_id": test_user.id, |
| 622 | + } |
| 623 | + ) |
| 624 | + service = self.service.with_user(test_user) |
| 625 | + rule = service._get_effective_rule() |
| 626 | + self.assertTrue(rule) |
| 627 | + self.assertEqual(rule.id, created_rule.id) |
| 628 | + |
| 629 | + def test_access_level_from_rule_returns_rule_level(self): |
| 630 | + """_access_level_from_rule must return the rule's access_level when rule present.""" |
| 631 | + rule = self.create_access_rule("individual", user_id=self.env.user.id, group_id=False) |
| 632 | + level = self.service._access_level_from_rule(rule) |
| 633 | + self.assertEqual(level, "individual") |
| 634 | + |
| 635 | + def test_access_level_from_rule_defaults_to_aggregate_when_none(self): |
| 636 | + """_access_level_from_rule must return 'aggregate' when rule is None/falsy.""" |
| 637 | + level = self.service._access_level_from_rule(None) |
| 638 | + self.assertEqual(level, "aggregate") |
| 639 | + |
| 640 | + def test_k_threshold_from_rule_returns_rule_threshold(self): |
| 641 | + """_k_threshold_from_rule must return minimum_k_anonymity from rule.""" |
| 642 | + rule = self.create_access_rule( |
| 643 | + "aggregate", |
| 644 | + user_id=self.env.user.id, |
| 645 | + group_id=False, |
| 646 | + minimum_k_anonymity=7, |
| 647 | + ) |
| 648 | + threshold = self.service._k_threshold_from_rule(rule) |
| 649 | + self.assertEqual(threshold, 7) |
| 650 | + |
| 651 | + def test_k_threshold_from_rule_uses_privacy_default_when_none(self): |
| 652 | + """_k_threshold_from_rule must fall back to privacy service default when no rule.""" |
| 653 | + threshold = self.service._k_threshold_from_rule(None) |
| 654 | + privacy_default = self.env["spp.metric.privacy"].DEFAULT_K_THRESHOLD |
| 655 | + self.assertEqual(threshold, privacy_default) |
| 656 | + |
| 657 | + def test_compute_single_statistic_delegates_to_registry(self): |
| 658 | + """_compute_single_statistic must delegate to indicator registry.""" |
| 659 | + ids = self.registrants[:5].ids |
| 660 | + result = self.service._compute_single_statistic("count", ids) |
| 661 | + self.assertEqual(result, 5) |
| 662 | + |
| 663 | + def test_compute_single_statistic_returns_none_for_unknown(self): |
| 664 | + """_compute_single_statistic must return None for an unknown statistic.""" |
| 665 | + result = self.service._compute_single_statistic("unknown_stat_xyz", self.registrants[:3].ids) |
| 666 | + self.assertIsNone(result) |
| 667 | + |
| 668 | + def test_compute_breakdown_returns_dict(self): |
| 669 | + """_compute_breakdown must return a dict with breakdown cells.""" |
| 670 | + ids = self.registrants.ids |
| 671 | + result = self.service._compute_breakdown(ids, ["registrant_type"], None, None) |
| 672 | + self.assertIsInstance(result, dict) |
| 673 | + self.assertGreater(len(result), 0) |
| 674 | + |
| 675 | + def test_compute_breakdown_with_statistics(self): |
| 676 | + """_compute_breakdown with statistics must include stats per cell.""" |
| 677 | + ids = self.registrants.ids |
| 678 | + result = self.service._compute_breakdown(ids, ["registrant_type"], ["count"], None) |
| 679 | + self.assertIsInstance(result, dict) |
| 680 | + # Each cell should have a count key |
| 681 | + for _cell_key, cell in result.items(): |
| 682 | + self.assertIn("count", cell) |
| 683 | + |
| 684 | + |
| 685 | +@tagged("post_install", "-at_install") |
| 686 | +class TestScopeResolverCelMethods(AnalyticsTestCase): |
| 687 | + """Tests for CEL resolution methods in spp.analytics.scope.resolver.""" |
| 688 | + |
| 689 | + @classmethod |
| 690 | + def setUpClass(cls): |
| 691 | + super().setUpClass() |
| 692 | + cls.resolver = cls.env["spp.analytics.scope.resolver"] |
| 693 | + |
| 694 | + def test_resolve_cel_record_without_executor_returns_empty(self): |
| 695 | + """_resolve_cel with a CEL scope record returns empty when executor unavailable.""" |
| 696 | + scope = self.create_scope( |
| 697 | + "cel", |
| 698 | + cel_expression="r.is_group == false", |
| 699 | + cel_profile="registry_individuals", |
| 700 | + ) |
| 701 | + # When spp.cel.executor is not installed the resolver logs an error and returns [] |
| 702 | + if self.env.get("spp.cel.executor") is None: |
| 703 | + result = self.resolver._resolve_cel(scope) |
| 704 | + self.assertEqual(result, []) |
| 705 | + else: |
| 706 | + # Module IS installed; just check it returns a list |
| 707 | + result = self.resolver._resolve_cel(scope) |
| 708 | + self.assertIsInstance(result, list) |
| 709 | + |
| 710 | + def test_resolve_cel_record_empty_expression_returns_empty(self): |
| 711 | + """_resolve_cel_expression with empty string must return empty list.""" |
| 712 | + result = self.resolver._resolve_cel_expression("", "registry_individuals") |
| 713 | + self.assertEqual(result, []) |
| 714 | + |
| 715 | + def test_resolve_cel_inline_without_executor_returns_empty(self): |
| 716 | + """_resolve_cel_inline with an inline CEL scope returns empty when executor unavailable.""" |
| 717 | + scope_dict = { |
| 718 | + "scope_type": "cel", |
| 719 | + "cel_expression": "r.is_group == false", |
| 720 | + "cel_profile": "registry_individuals", |
| 721 | + } |
| 722 | + if self.env.get("spp.cel.executor") is None: |
| 723 | + result = self.resolver._resolve_cel_inline(scope_dict) |
| 724 | + self.assertEqual(result, []) |
| 725 | + else: |
| 726 | + result = self.resolver._resolve_cel_inline(scope_dict) |
| 727 | + self.assertIsInstance(result, list) |
| 728 | + |
| 729 | + def test_resolve_cel_inline_missing_expression_returns_empty(self): |
| 730 | + """_resolve_cel_inline with missing cel_expression returns empty list.""" |
| 731 | + scope_dict = { |
| 732 | + "scope_type": "cel", |
| 733 | + # No cel_expression key |
| 734 | + } |
| 735 | + result = self.resolver._resolve_cel_inline(scope_dict) |
| 736 | + self.assertEqual(result, []) |
| 737 | + |
| 738 | + def test_resolve_dispatches_to_resolve_cel(self): |
| 739 | + """resolve() on a CEL record scope must call the CEL resolver, not raise.""" |
| 740 | + scope = self.create_scope( |
| 741 | + "cel", |
| 742 | + cel_expression="r.is_group == false", |
| 743 | + ) |
| 744 | + # Should not raise; returns a list (possibly empty if executor missing) |
| 745 | + result = self.resolver.resolve(scope) |
| 746 | + self.assertIsInstance(result, list) |
| 747 | + |
| 748 | + |
| 749 | +@tagged("post_install", "-at_install") |
| 750 | +class TestIndicatorRegistryInternals(TransactionCase): |
| 751 | + """Tests for internal dispatch methods of spp.analytics.indicator.registry.""" |
| 752 | + |
| 753 | + @classmethod |
| 754 | + def setUpClass(cls): |
| 755 | + super().setUpClass() |
| 756 | + cls.indicator_registry = cls.env["spp.analytics.indicator.registry"] |
| 757 | + cls.registrants = cls.env["res.partner"] |
| 758 | + for i in range(10): |
| 759 | + cls.registrants |= cls.env["res.partner"].create( |
| 760 | + { |
| 761 | + "name": f"Registry Internals Test {i}", |
| 762 | + "is_registrant": True, |
| 763 | + } |
| 764 | + ) |
| 765 | + |
| 766 | + def test_get_builtin_returns_method_for_count(self): |
| 767 | + """_get_builtin('count') must return a callable.""" |
| 768 | + method = self.indicator_registry._get_builtin("count") |
| 769 | + self.assertIsNotNone(method) |
| 770 | + self.assertTrue(callable(method)) |
| 771 | + |
| 772 | + def test_get_builtin_returns_method_for_gini(self): |
| 773 | + """_get_builtin('gini') must return a callable.""" |
| 774 | + method = self.indicator_registry._get_builtin("gini") |
| 775 | + self.assertIsNotNone(method) |
| 776 | + self.assertTrue(callable(method)) |
| 777 | + |
| 778 | + def test_get_builtin_returns_method_for_gini_coefficient_alias(self): |
| 779 | + """_get_builtin('gini_coefficient') must return a callable (alias).""" |
| 780 | + method = self.indicator_registry._get_builtin("gini_coefficient") |
| 781 | + self.assertIsNotNone(method) |
| 782 | + self.assertTrue(callable(method)) |
| 783 | + |
| 784 | + def test_get_builtin_returns_none_for_unknown(self): |
| 785 | + """_get_builtin must return None for an unregistered statistic name.""" |
| 786 | + method = self.indicator_registry._get_builtin("no_such_stat") |
| 787 | + self.assertIsNone(method) |
| 788 | + |
| 789 | + def test_get_builtin_count_callable_returns_correct_count(self): |
| 790 | + """The callable returned by _get_builtin('count') must compute correctly.""" |
| 791 | + method = self.indicator_registry._get_builtin("count") |
| 792 | + result = method(self.registrants[:7].ids) |
| 793 | + self.assertEqual(result, 7) |
| 794 | + |
| 795 | + def test_try_statistic_model_returns_none_when_model_absent(self): |
| 796 | + """_try_statistic_model returns None when spp.indicator is not installed.""" |
| 797 | + if self.env.get("spp.indicator") is not None: |
| 798 | + self.skipTest("spp_indicator is installed; testing absence is not possible") |
| 799 | + result = self.indicator_registry._try_statistic_model("some_stat", self.registrants.ids) |
| 800 | + self.assertIsNone(result) |
| 801 | + |
| 802 | + def test_try_statistic_model_returns_none_for_missing_stat(self): |
| 803 | + """_try_statistic_model returns None when named statistic record does not exist.""" |
| 804 | + if self.env.get("spp.indicator") is None: |
| 805 | + self.skipTest("spp_indicator not installed") |
| 806 | + result = self.indicator_registry._try_statistic_model("nonexistent_stat_abc", self.registrants.ids) |
| 807 | + self.assertIsNone(result) |
| 808 | + |
| 809 | + def test_try_variable_model_returns_none_when_model_absent(self): |
| 810 | + """_try_variable_model returns None when spp.cel.variable is not installed.""" |
| 811 | + if self.env.get("spp.cel.variable") is not None: |
| 812 | + self.skipTest("spp_cel is installed; testing absence is not possible") |
| 813 | + result = self.indicator_registry._try_variable_model("some_var", self.registrants.ids) |
| 814 | + self.assertIsNone(result) |
| 815 | + |
| 816 | + def test_try_variable_model_returns_none_for_missing_variable(self): |
| 817 | + """_try_variable_model returns None when named variable record does not exist.""" |
| 818 | + if self.env.get("spp.cel.variable") is None: |
| 819 | + self.skipTest("spp_cel not installed") |
| 820 | + result = self.indicator_registry._try_variable_model("nonexistent_variable_xyz", self.registrants.ids) |
| 821 | + self.assertIsNone(result) |
0 commit comments