Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ and this project adheres to

- Drop support for Python 3.8

### Changed

- Reload Basic Auth credentials file after change

### Fixed

- Fix type of `statement.result.score.scaled` from `int` to `Decimal`
Expand Down
49 changes: 37 additions & 12 deletions src/ralph/api/auth/basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import logging
import os
from functools import lru_cache
from pathlib import Path
from threading import Lock
from typing import Any, Iterator, List, Optional
Expand Down Expand Up @@ -75,7 +74,21 @@ def ensure_unique_username(self) -> Any:
return self


@lru_cache()
def basic_auth_file_cache_key(auth_file_path: os.PathLike = settings.AUTH_FILE):
"""Key used by cachetools to index cached user credentials results."""
if not os.path.exists(auth_file_path):
return None
return (
auth_file_path,
os.path.getmtime(auth_file_path),
)


@cached(
TTLCache(maxsize=settings.AUTH_CACHE_MAX_SIZE, ttl=settings.AUTH_CACHE_TTL),
lock=Lock(),
key=basic_auth_file_cache_key,
)
def get_stored_credentials(auth_file: os.PathLike) -> ServerUsersCredentials:
"""Helper to read the credentials/scopes file.

Expand All @@ -99,20 +112,31 @@ def get_stored_credentials(auth_file: os.PathLike) -> ServerUsersCredentials:
return ServerUsersCredentials.model_validate_json(f.read())


def basic_auth_user_cache_key(
credentials: Optional[HTTPBasicCredentials] = Depends(security),
auth_file_path: os.PathLike = settings.AUTH_FILE,
):
"""Key used by cachetools to index cached user credentials results."""
if credentials is None:
return None
if not os.path.exists(auth_file_path):
return None
return (
auth_file_path,
os.path.getmtime(auth_file_path),
credentials.username,
credentials.password,
)


@cached(
TTLCache(maxsize=settings.AUTH_CACHE_MAX_SIZE, ttl=settings.AUTH_CACHE_TTL),
lock=Lock(),
key=lambda credentials: (
(
credentials.username,
credentials.password,
)
if credentials is not None
else None
),
key=basic_auth_user_cache_key,
)
def get_basic_auth_user(
credentials: Optional[HTTPBasicCredentials] = Depends(security),
auth_file_path: os.PathLike = settings.AUTH_FILE,
) -> AuthenticatedUser:
"""Check valid auth parameters.

Expand All @@ -121,22 +145,23 @@ def get_basic_auth_user(

Args:
credentials (iterator): auth parameters from the Authorization header
auth_file_path (os.PathLike): path to credentials file

Raises:
HTTPException
"""
if not credentials:
logger.debug("No credentials were found for Basic auth")
return None

try:
user = next(
filter(
lambda u: u.username == credentials.username,
get_stored_credentials(settings.AUTH_FILE),
get_stored_credentials(auth_file_path),
)
)
hashed_password = user.hash

except StopIteration:
# next() gets the first item in the enumerable; if there is none, it
# raises a StopIteration error as it is out of bounds.
Expand Down
73 changes: 70 additions & 3 deletions tests/api/auth/test_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import base64
import json
import os

import bcrypt
import pytest
Expand All @@ -12,7 +13,6 @@
ServerUsersCredentials,
UserCredentials,
get_basic_auth_user,
get_stored_credentials,
)
from ralph.api.auth.user import AuthenticatedUser, UserScopes
from ralph.conf import AuthBackend, Settings, settings
Expand All @@ -31,6 +31,25 @@
]
)

STORED_CREDENTIALS_MORE = json.dumps(
[
{
"username": "ralph",
"hash": bcrypt.hashpw(b"admin", bcrypt.gensalt()).decode("UTF-8"),
"scopes": ["statements/read/mine", "statements/write"],
"agent": {"mbox": "mailto:ralph@example.com"},
"target": "custom_target",
},
{
"username": "foo",
"hash": bcrypt.hashpw(b"bar", bcrypt.gensalt()).decode("UTF-8"),
"scopes": ["statements/read/mine", "statements/write"],
"agent": {"mbox": "mailto:foo@example.com"},
"target": "custom_target",
},
]
)


def test_api_auth_basic_model_serveruserscredentials():
"""Test api.auth ServerUsersCredentials model."""
Expand Down Expand Up @@ -101,16 +120,16 @@ def test_api_auth_basic_caching_credentials(fs):

auth_file_path = settings.APP_DIR / "auth.json"
fs.create_file(auth_file_path, contents=STORED_CREDENTIALS)
auth_file_modified_time = os.path.getmtime(auth_file_path)
get_basic_auth_user.cache_clear()
get_stored_credentials.cache_clear()

credentials = HTTPBasicCredentials(username="ralph", password="admin")

# Call function as in a first request with these credentials
get_basic_auth_user(credentials=credentials)

assert get_basic_auth_user.cache.popitem() == (
("ralph", "admin"),
(auth_file_path, auth_file_modified_time, "ralph", "admin"),
AuthenticatedUser(
agent={"mbox": "mailto:ralph@example.com"},
scopes=UserScopes(["statements/read/mine", "statements/write"]),
Expand All @@ -119,6 +138,54 @@ def test_api_auth_basic_caching_credentials(fs):
)


def test_api_auth_basic_new_credentials(fs):
"""Test the authentication with new credentials and without clearing cache"""

auth_file_path = settings.APP_DIR / "auth.json"
fs.create_file(auth_file_path, contents=STORED_CREDENTIALS)
get_basic_auth_user.cache_clear()

credentials = HTTPBasicCredentials(username="ralph", password="admin")
# Try to authenticate with known user, create cache entry
get_basic_auth_user(credentials)
# Add a new user to credentials file
# In order to test this, we do NOT clear cache
auth_file_mtime = os.path.getmtime(auth_file_path)
# FIX: Force update of modification time.
# It does not seem to be updated by setting the content of the pyfakefs file
os.remove(auth_file_path)
fs.create_file(auth_file_path, contents=STORED_CREDENTIALS_MORE)
assert os.path.getmtime(auth_file_path) > auth_file_mtime
credentials_new = HTTPBasicCredentials(username="foo", password="bar")

# Try to authenticate with new user, should succeed
get_basic_auth_user(credentials_new)


def test_api_auth_basic_deleted_credentials(fs):
"""Test the authentication with deleted credentials and without clearing cache."""

auth_file_path = settings.APP_DIR / "auth.json"
fs.create_file(auth_file_path, contents=STORED_CREDENTIALS_MORE)
get_basic_auth_user.cache_clear()

credentials = HTTPBasicCredentials(username="foo", password="bar")
# Try to authenticate with known user, create cache entry
get_basic_auth_user(credentials)

# In order to test this, we do NOT clear cache
auth_file_mtime = os.path.getmtime(auth_file_path)
# FIX: Force update of modification time.
# It does not seem to be updated by setting the content of the pyfakefs file
os.remove(auth_file_path)
fs.create_file(auth_file_path, contents=STORED_CREDENTIALS)
assert os.path.getmtime(auth_file_path) > auth_file_mtime

# Try to authenticate with a deleted user, should fail
with pytest.raises(HTTPException):
get_basic_auth_user(credentials)


def test_api_auth_basic_with_wrong_password(fs):
"""Test the authentication with a wrong password."""

Expand Down