Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions astrbot/cli/commands/cmd_conf.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import hashlib
import json
import zoneinfo
from collections.abc import Callable
from typing import Any

import click

from astrbot.core.utils.auth_password import (
hash_dashboard_password,
validate_dashboard_password,
)

from ..utils import check_astrbot_root, get_astrbot_root


Expand Down Expand Up @@ -39,9 +43,11 @@ def _validate_dashboard_username(value: str) -> str:

def _validate_dashboard_password(value: str) -> str:
"""Validate Dashboard password"""
if not value:
raise click.ClickException("Password cannot be empty")
return hashlib.md5(value.encode()).hexdigest()
try:
validate_dashboard_password(value)
except ValueError as e:
raise click.ClickException(str(e))
return hash_dashboard_password(value)


def _validate_timezone(value: str) -> str:
Expand Down
10 changes: 10 additions & 0 deletions astrbot/core/config/astrbot_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
import os

from astrbot.core.utils.astrbot_path import get_astrbot_data_path
from astrbot.core.utils.auth_password import (
normalize_dashboard_password_hash,
)

from .default import DEFAULT_CONFIG, DEFAULT_VALUE_MAP

Expand Down Expand Up @@ -59,6 +62,13 @@ def __init__(

# 检查配置完整性,并插入
has_new = self.check_config_integrity(default_config, conf)
if (
"dashboard" in conf
and isinstance(conf["dashboard"], dict)
and not conf["dashboard"].get("password")
):
conf["dashboard"]["password"] = normalize_dashboard_password_hash("")
has_new = True
self.update(conf)
if has_new:
self.save_config()
Expand Down
2 changes: 1 addition & 1 deletion astrbot/core/config/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@
"dashboard": {
"enable": True,
"username": "astrbot",
"password": "77b90590a8945a7d36c963981a307dc9",
"password": "",
"jwt_secret": "",
"host": "0.0.0.0",
"port": 6185,
Expand Down
167 changes: 167 additions & 0 deletions astrbot/core/utils/auth_password.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
"""Utilities for dashboard password hashing and verification."""

import hashlib
import hmac
import re
import secrets
from typing import Any

_PBKDF2_ITERATIONS = 600_000
_PBKDF2_SALT_BYTES = 16
_PBKDF2_ALGORITHM = "pbkdf2_sha256"
_PBKDF2_FORMAT = f"{_PBKDF2_ALGORITHM}$"
_LEGACY_MD5_LENGTH = 32
_DASHBOARD_PASSWORD_MIN_LENGTH = 12
DEFAULT_DASHBOARD_PASSWORD = "astrbot"


def hash_dashboard_password(raw_password: str) -> str:
"""Return a salted hash for dashboard password using PBKDF2-HMAC-SHA256."""
if not isinstance(raw_password, str) or raw_password == "":
raise ValueError("Password cannot be empty")

salt = secrets.token_hex(_PBKDF2_SALT_BYTES)
digest = hashlib.pbkdf2_hmac(
"sha256",
raw_password.encode("utf-8"),
bytes.fromhex(salt),
_PBKDF2_ITERATIONS,
).hex()
return f"{_PBKDF2_FORMAT}{_PBKDF2_ITERATIONS}${salt}${digest}"


def validate_dashboard_password(raw_password: str) -> None:
"""Validate whether dashboard password meets the minimal complexity policy."""
if not isinstance(raw_password, str) or raw_password == "":
raise ValueError("Password cannot be empty")
if len(raw_password) < _DASHBOARD_PASSWORD_MIN_LENGTH:
raise ValueError(
f"Password must be at least {_DASHBOARD_PASSWORD_MIN_LENGTH} characters long"
)

if not re.search(r"[A-Z]", raw_password):
raise ValueError("Password must include at least one uppercase letter")
if not re.search(r"[a-z]", raw_password):
raise ValueError("Password must include at least one lowercase letter")
if not re.search(r"\d", raw_password):
raise ValueError("Password must include at least one digit")


def normalize_dashboard_password_hash(stored_password: str) -> str:
"""Ensure dashboard password has a value, fallback to default dashboard password hash."""
if not stored_password:
return hash_dashboard_password(DEFAULT_DASHBOARD_PASSWORD)
return stored_password


def _is_legacy_md5_hash(stored: str) -> bool:
return (
isinstance(stored, str)
and len(stored) == _LEGACY_MD5_LENGTH
and all(c in "0123456789abcdefABCDEF" for c in stored)
)


def _is_pbkdf2_hash(stored: str) -> bool:
return isinstance(stored, str) and stored.startswith(_PBKDF2_FORMAT)


def get_dashboard_login_challenge(stored_hash: str) -> dict[str, Any]:
"""Return the public challenge parameters needed for proof-based login."""
if _is_legacy_md5_hash(stored_hash):
return {"algorithm": "legacy_md5"}

if _is_pbkdf2_hash(stored_hash):
parts: list[str] = stored_hash.split("$")
if len(parts) != 4:
raise ValueError("Invalid dashboard password hash")
_, iterations_s, salt, _ = parts
return {
"algorithm": _PBKDF2_ALGORITHM,
"iterations": int(iterations_s),
"salt": salt,
}

raise ValueError("Unsupported dashboard password hash")


def verify_dashboard_login_proof(
stored_hash: str, challenge_nonce: str, proof: str
) -> bool:
"""Verify an HMAC-SHA256 login proof generated from the stored password secret."""
if (
not isinstance(stored_hash, str)
or not isinstance(challenge_nonce, str)
or not isinstance(proof, str)
):
return False

proof_key: bytes
if _is_legacy_md5_hash(stored_hash):
proof_key = stored_hash.lower().encode("utf-8")
elif _is_pbkdf2_hash(stored_hash):
parts: list[str] = stored_hash.split("$")
if len(parts) != 4:
return False
_, _, _, digest = parts
try:
proof_key = bytes.fromhex(digest)
except ValueError:
return False
else:
return False

expected = hmac.new(
proof_key,
challenge_nonce.encode("utf-8"),
hashlib.sha256,
).hexdigest()
return hmac.compare_digest(expected.lower(), proof.lower())


def verify_dashboard_password(stored_hash: str, candidate_password: str) -> bool:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (complexity): Consider refactoring verify_dashboard_password into a small dispatcher plus per-scheme helpers so each hash type’s detection, parsing, and verification are clearly separated.

verify_dashboard_password is carrying multiple responsibilities (scheme detection, parsing, and verification) and mixing semantics in a single branch. You can keep all behavior but make it easier to read and test by introducing a small dispatcher + per-scheme helpers.

For example:

from enum import Enum, auto

class _HashScheme(Enum):
    LEGACY_MD5 = auto()
    PBKDF2 = auto()
    UNKNOWN = auto()


def _get_hash_scheme(stored_hash: str) -> _HashScheme:
    if _is_legacy_md5_hash(stored_hash):
        return _HashScheme.LEGACY_MD5
    if _is_pbkdf2_hash(stored_hash):
        return _HashScheme.PBKDF2
    return _HashScheme.UNKNOWN

Then split the verification logic into named functions so the compatibility behavior is explicit:

def _verify_legacy_md5_hash(stored_hash: str, candidate_password: str) -> bool:
    # Keep compatibility with existing md5-based deployments:
    # new clients send plain password, old clients may send md5 of it.
    candidate_md5 = hashlib.md5(candidate_password.encode("utf-8")).hexdigest()
    stored = stored_hash.lower()
    return (
        hmac.compare_digest(stored, candidate_md5.lower())
        or hmac.compare_digest(stored, candidate_password.lower())
    )


def _verify_pbkdf2_hash(stored_hash: str, candidate_password: str) -> bool:
    parts: list[str] = stored_hash.split("$")
    if len(parts) != 4:
        return False
    _, iterations_s, salt, digest = parts
    try:
        iterations = int(iterations_s)
        stored_key = bytes.fromhex(digest)
        salt_bytes = bytes.fromhex(salt)
    except (TypeError, ValueError):
        return False

    candidate_key = hashlib.pbkdf2_hmac(
        "sha256",
        candidate_password.encode("utf-8"),
        salt_bytes,
        iterations,
    )
    return hmac.compare_digest(stored_key, candidate_key)

verify_dashboard_password then becomes a straightforward dispatcher:

def verify_dashboard_password(stored_hash: str, candidate_password: str) -> bool:
    if not isinstance(stored_hash, str) or not isinstance(candidate_password, str):
        return False

    scheme = _get_hash_scheme(stored_hash)
    if scheme is _HashScheme.LEGACY_MD5:
        return _verify_legacy_md5_hash(stored_hash, candidate_password)
    if scheme is _HashScheme.PBKDF2:
        return _verify_pbkdf2_hash(stored_hash, candidate_password)
    return False

This keeps all existing behavior (including the legacy MD5 dual comparison and default-password checks) but makes each piece self-contained and easier to reason about and test individually.

"""Verify password against legacy md5 or new PBKDF2-SHA256 format."""
if not isinstance(stored_hash, str) or not isinstance(candidate_password, str):
return False

if _is_legacy_md5_hash(stored_hash):
# Keep compatibility with existing md5-based deployments:
# challenge-based clients send proof, fallback clients may send plain password or md5.
candidate_md5 = hashlib.md5(candidate_password.encode("utf-8")).hexdigest()
return hmac.compare_digest(
stored_hash.lower(), candidate_md5.lower()
) or hmac.compare_digest(
stored_hash.lower(),
candidate_password.lower(),
)

if _is_pbkdf2_hash(stored_hash):
parts: list[str] = stored_hash.split("$")
if len(parts) != 4:
return False
_, iterations_s, salt, digest = parts
try:
iterations = int(iterations_s)
stored_key = bytes.fromhex(digest)
salt_bytes = bytes.fromhex(salt)
except (TypeError, ValueError):
return False
candidate_key = hashlib.pbkdf2_hmac(
"sha256",
candidate_password.encode("utf-8"),
salt_bytes,
iterations,
)
return hmac.compare_digest(stored_key, candidate_key)

return False


def is_default_dashboard_password(stored_hash: str) -> bool:
"""Check whether the password still equals the built-in default value."""
return verify_dashboard_password(stored_hash, DEFAULT_DASHBOARD_PASSWORD)


def is_legacy_dashboard_password(stored_hash: str) -> bool:
"""Check whether the password is still stored with legacy MD5."""
return _is_legacy_md5_hash(stored_hash)
Loading
Loading