Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions changelog_entry.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
- bump: patch
changes:
fixed:
- Fail closed when auth is enabled without Auth0 configuration and require auth on calculate_demo.
1 change: 1 addition & 0 deletions policyengine_household_api/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ def readiness_check():


@app.route("/<country_id>/calculate_demo", methods=["POST"])
@require_auth_if_enabled()
@limiter.limit("1 per second")
def calculate_demo(country_id):
return get_calculate(country_id)
Expand Down
56 changes: 50 additions & 6 deletions policyengine_household_api/decorators/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,45 @@

from typing import Optional, Any, Callable
from authlib.integrations.flask_oauth2 import ResourceProtector
from authlib.oauth2.rfc6750 import BearerTokenValidator
from ..auth.validation import Auth0JWTBearerTokenValidator
from ..utils.config_loader import get_config, get_config_value
from ..utils.config_loader import get_config_value


class AuthConfigurationError(RuntimeError):
"""Raised when authentication is enabled but required config is missing."""


class StaticBearerToken:
"""Minimal token object for test-only bearer token validation."""

def __init__(self, token_string: str, scope: str = ""):
self.token_string = token_string
self.scope = scope

def is_expired(self) -> bool:
return False

def is_revoked(self) -> bool:
return False

def get_scope(self) -> str:
return self.scope


class StaticBearerTokenValidator(BearerTokenValidator):
"""Accept a single configured bearer token for test environments."""

def __init__(self, expected_token: str):
super().__init__()
self.expected_token = expected_token

def authenticate_token(
self, token_string: Optional[str]
) -> Optional[StaticBearerToken]:
if token_string == self.expected_token:
return StaticBearerToken(token_string)
return None


class NoOpDecorator:
Expand Down Expand Up @@ -63,14 +100,22 @@ def _setup_authentication(self) -> None:
"""
# Check if Auth0 is explicitly enabled via configuration
self._auth_enabled = get_config_value("auth.enabled", False)
app_environment = get_config_value("app.environment", "")
auth0_test_token = get_config_value("auth.auth0.test_token", "")

# Get Auth0 configuration values
auth0_address = get_config_value("auth.auth0.address", "")
auth0_audience = get_config_value("auth.auth0.audience", "")

# Initialize the appropriate decorator
if self._auth_enabled:
if auth0_address and auth0_audience:
if app_environment == "test_with_auth" and auth0_test_token:
resource_protector = ResourceProtector()
resource_protector.register_token_validator(
StaticBearerTokenValidator(auth0_test_token)
)
self._decorator = resource_protector
elif auth0_address and auth0_audience:
# Set up real Auth0 authentication
resource_protector = ResourceProtector()
validator = Auth0JWTBearerTokenValidator(
Expand All @@ -79,10 +124,9 @@ def _setup_authentication(self) -> None:
resource_protector.register_token_validator(validator)
self._decorator = resource_protector
else:
# Auth was requested but configuration is missing
print("Warning: Auth enabled but Auth0 configuration missing")
self._auth_enabled = False
self._decorator = NoOpDecorator()
raise AuthConfigurationError(
"Auth enabled but Auth0 configuration missing"
)
else:
# Authentication is disabled
self._decorator = NoOpDecorator()
Expand Down
54 changes: 47 additions & 7 deletions tests/unit/decorators/test_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,19 @@
Unit tests for the conditional authentication decorator.
"""

import pytest
from unittest.mock import Mock
from policyengine_household_api.decorators.auth import (
NoOpDecorator,
ConditionalAuthDecorator,
AuthConfigurationError,
create_auth_decorator,
StaticBearerTokenValidator,
)
from tests.fixtures.decorators.auth import (
AUTH0_CONFIG_DATA,
auth_enabled_environment,
auth_test_environment,
auth_disabled_environment,
auth_enabled_missing_config_environment,
auth_backward_compat_environment,
Expand Down Expand Up @@ -66,6 +70,29 @@ def test__given_multiple_functions__each_passes_through_unchanged(self):
class TestConditionalAuthDecoratorWithAuthEnabled:
"""Test ConditionalAuthDecorator with authentication enabled."""

def test__given_test_auth_environment__uses_static_token_validator(
self,
auth_test_environment,
mock_resource_protector,
mock_auth0_validator,
):
_, mock_protector_instance = mock_resource_protector
mock_validator_class, _ = mock_auth0_validator

decorator = ConditionalAuthDecorator()

mock_validator_class.assert_not_called()
registered_validator = (
mock_protector_instance.register_token_validator.call_args[0][0]
)
assert isinstance(registered_validator, StaticBearerTokenValidator)
assert registered_validator.expected_token == "test-jwt-token"
assert decorator.get_decorator() is mock_protector_instance
assert decorator.is_enabled is True

auth_test_environment.assert_any_call("app.environment", "")
auth_test_environment.assert_any_call("auth.auth0.test_token", "")

def test__given_auth_enabled_with_valid_config__auth0_is_configured(
self,
auth_enabled_environment,
Expand Down Expand Up @@ -97,26 +124,26 @@ def test__given_auth_enabled_with_valid_config__auth0_is_configured(
auth_enabled_environment.assert_any_call("auth.auth0.address", "")
auth_enabled_environment.assert_any_call("auth.auth0.audience", "")

def test__given_auth_enabled_missing_config__falls_back_to_noop(
def test__given_auth_enabled_missing_config__raises_configuration_error(
self,
auth_enabled_missing_config_environment,
mock_resource_protector,
mock_auth0_validator,
):
"""Test fallback to NoOp when auth is enabled but config is missing."""
"""Test auth fails closed when auth is enabled but config is missing."""
mock_protector_class, _ = mock_resource_protector
mock_validator_class, _ = mock_auth0_validator

decorator = ConditionalAuthDecorator()
with pytest.raises(
AuthConfigurationError,
match="Auth enabled but Auth0 configuration missing",
):
ConditionalAuthDecorator()

# Verify Auth0 components were not created
mock_validator_class.assert_not_called()
mock_protector_class.assert_not_called()

# Verify we get a NoOpDecorator
assert isinstance(decorator.get_decorator(), NoOpDecorator)
assert decorator.is_enabled is False

# Verify configuration was checked
auth_enabled_missing_config_environment.assert_any_call(
"auth.enabled", False
Expand Down Expand Up @@ -182,3 +209,16 @@ def test__given_auth_disabled__returns_noop_decorator(
decorator = create_auth_decorator()

assert isinstance(decorator, NoOpDecorator)

def test__given_auth_enabled_missing_config__raises_configuration_error(
self,
auth_enabled_missing_config_environment,
mock_resource_protector,
mock_auth0_validator,
):
"""Test that factory raises when auth is enabled but misconfigured."""
with pytest.raises(
AuthConfigurationError,
match="Auth enabled but Auth0 configuration missing",
):
create_auth_decorator()
46 changes: 46 additions & 0 deletions tests/unit/endpoints/test_calculate_demo_auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
"""Regression tests for calculate_demo authentication."""

import importlib
from contextlib import contextmanager
from unittest.mock import patch

import policyengine_household_api.api as household_api
import policyengine_household_api.decorators.auth as auth_module


@contextmanager
def _auth_enabled_app():
config_values = {
"app.environment": "test_with_auth",
"auth.enabled": True,
"auth.auth0.address": "test-tenant.auth0.com",
"auth.auth0.audience": "https://test-api-identifier",
"auth.auth0.test_token": "test-jwt-token",
}

def get_config_value(path: str, default=None):
return config_values.get(path, default)

with patch(
"policyengine_household_api.utils.config_loader.get_config_value",
side_effect=get_config_value,
):
reloaded_auth_module = importlib.reload(auth_module)
reloaded_api_module = importlib.reload(household_api)
try:
yield reloaded_api_module
finally:
importlib.reload(reloaded_auth_module)
importlib.reload(reloaded_api_module)


def test_calculate_demo_requires_auth_when_auth_is_enabled():
with _auth_enabled_app() as api_module:
client = api_module.app.test_client()

response = client.post(
"/us/calculate_demo",
headers={"Content-Type": "application/json"},
)

assert response.status_code == 401
Loading