
Commit 11bbff9

author
Josh Bendson
committed
refactor: Eliminate code duplication in hybrid auth functions
- Extract common logic into the internal function _hybrid_auth_impl()
- get_current_user_hybrid() and get_current_admin_user_hybrid() are now thin wrappers
- Reduces duplication by 11 lines while keeping the same functionality
- The admin check is parameterized via the require_admin boolean
1 parent 593d86c commit 11bbff9

1 file changed

Lines changed: 74 additions & 85 deletions


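--- a/src/code_indexer/server/auth/dependencies.py
+++ b/src/code_indexer/server/auth/dependencies.py
@@ -501,19 +501,18 @@ async def get_current_user_for_mcp(request: Request) -> User:
     )
 
 
-async def get_current_user_hybrid(
+async def _hybrid_auth_impl(
     request: Request,
-    credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
+    credentials: Optional[HTTPAuthorizationCredentials],
+    require_admin: bool = False,
 ) -> User:
     """
-    Get current user supporting both session-based and token-based authentication.
-
-    This function tries session-based authentication first (for web UI),
-    then falls back to token-based authentication (for API clients).
+    Internal implementation for hybrid authentication.
 
     Args:
         request: FastAPI Request object
         credentials: Optional bearer token credentials
+        require_admin: If True, require admin role
 
     Returns:
         Authenticated User object
@@ -525,65 +524,107 @@ async def get_current_user_hybrid(
     import logging
 
     logger = logging.getLogger(__name__)
+    auth_type = "admin" if require_admin else "user"
 
     # Try session-based auth first (for web UI)
     session_manager = get_session_manager()
     session_cookie_value = request.cookies.get(SESSION_COOKIE_NAME)
 
     logger.info(
-        f"User hybrid auth: session_cookie={'present' if session_cookie_value else 'absent'}"
+        f"Hybrid auth ({auth_type}): session_cookie={'present' if session_cookie_value else 'absent'}"
     )
 
     if session_cookie_value:
         session = session_manager.get_session(request)
         logger.info(
-            f"User hybrid auth: session={'valid' if session else 'invalid'}, username={session.username if session else None}"
+            f"Hybrid auth ({auth_type}): session={'valid' if session else 'invalid'}, "
+            f"username={session.username if session else None}, "
+            f"role={session.role if session else None}"
         )
+
+        # Check admin requirement for session auth
         if session:
-            # Create User object from session
-            if not user_manager:
-                logger.error("User hybrid auth: user_manager not initialized")
+            if require_admin and session.role != "admin":
+                logger.debug(f"Hybrid auth ({auth_type}): Session valid but not admin")
+            else:
+                # Create User object from session
+                if not user_manager:
+                    logger.error(f"Hybrid auth ({auth_type}): user_manager not initialized")
+                    raise HTTPException(
+                        status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+                        detail="User manager not initialized",
+                    )
+                user = user_manager.get_user(session.username)
+                logger.debug(
+                    f"Hybrid auth ({auth_type}): user lookup for {session.username}: {user is not None}"
+                )
+                if user:
+                    logger.info(
+                        f"Hybrid auth ({auth_type}): Session auth SUCCESS for {session.username}"
+                    )
+                    return user
+                # Session is valid but user not found - this shouldn't happen
+                logger.error(
+                    f"Hybrid auth ({auth_type}): User {session.username} not found in database"
+                )
                 raise HTTPException(
                     status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
-                    detail="User manager not initialized",
+                    detail=f"User '{session.username}' not found in user database",
                 )
-            user = user_manager.get_user(session.username)
-            logger.debug(
-                f"User hybrid auth: user lookup for {session.username}: {user is not None}"
-            )
-            if user:
-                logger.info(
-                    f"User hybrid auth: Session auth SUCCESS for {session.username}"
-                )
-                return user
-            # Session is valid but user not found - this shouldn't happen
-            logger.error(
-                f"User hybrid auth: User {session.username} not found in database"
-            )
-            raise HTTPException(
-                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
-                detail=f"User '{session.username}' not found in user database",
-            )
-        logger.debug(f"User hybrid auth: Session invalid")
+        else:
+            logger.debug(f"Hybrid auth ({auth_type}): Session invalid")
 
     # Fall back to token-based auth only if no session cookie exists
     if not session_cookie_value and credentials:
         try:
             current_user = get_current_user(request, credentials)
-            logger.info(f"User hybrid auth: Token auth SUCCESS for {current_user.username}")
+
+            # Check admin requirement for token auth
+            if require_admin and not current_user.has_permission("manage_users"):
+                raise HTTPException(
+                    status_code=status.HTTP_403_FORBIDDEN,
+                    detail="Admin access required",
+                )
+
+            logger.info(
+                f"Hybrid auth ({auth_type}): Token auth SUCCESS for {current_user.username}"
+            )
             return current_user
         except HTTPException:
             raise
 
     # No valid authentication found
-    logger.warning("User hybrid auth: No valid authentication found")
+    logger.warning(f"Hybrid auth ({auth_type}): No valid authentication found")
     raise HTTPException(
         status_code=status.HTTP_401_UNAUTHORIZED,
         detail="Authentication required",
         headers={"WWW-Authenticate": _build_www_authenticate_header()},
     )
 
 
+async def get_current_user_hybrid(
+    request: Request,
+    credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
+) -> User:
+    """
+    Get current user supporting both session-based and token-based authentication.
+
+    This function tries session-based authentication first (for web UI),
+    then falls back to token-based authentication (for API clients).
+
+    Args:
+        request: FastAPI Request object
+        credentials: Optional bearer token credentials
+
+    Returns:
+        Authenticated User object
+
+    Raises:
+        HTTPException: If authentication fails
+    """
+    return await _hybrid_auth_impl(request, credentials, require_admin=False)
+
+
 async def get_current_admin_user_hybrid(
     request: Request,
     credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
@@ -604,56 +645,4 @@ async def get_current_admin_user_hybrid(
     Raises:
         HTTPException: If not authenticated or not admin
     """
-    # Try session-based auth first (for web UI)
-    from code_indexer.server.web.auth import get_session_manager, SESSION_COOKIE_NAME
-    import logging
-    logger = logging.getLogger(__name__)
-
-    session_manager = get_session_manager()
-    session_cookie_value = request.cookies.get(SESSION_COOKIE_NAME)
-
-    logger.info(f"Hybrid auth: session_cookie={session_cookie_value[:20] + '...' if session_cookie_value else None}")
-
-    if session_cookie_value:
-        session = session_manager.get_session(request)
-        logger.info(f"Hybrid auth: session={session}, role={getattr(session, 'role', None) if session else None}")
-        if session and session.role == "admin":
-            # Create User object from session
-            if not user_manager:
-                logger.error("Hybrid auth: user_manager not initialized")
-                raise HTTPException(
-                    status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
-                    detail="User manager not initialized",
-                )
-            user = user_manager.get_user(session.username)
-            logger.debug(f"Hybrid auth: user lookup for {session.username}: {user is not None}")
-            if user:
-                logger.info(f"Hybrid auth: Session auth SUCCESS for {session.username}")
-                return user
-            # Session is valid but user not found - this shouldn't happen
-            logger.error(f"Hybrid auth: User {session.username} not found in database")
-            raise HTTPException(
-                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
-                detail=f"User '{session.username}' not found in user database",
-            )
-        logger.debug(f"Hybrid auth: Session invalid or not admin")
-
-    # Fall back to token-based auth only if no session cookie exists
-    if not session_cookie_value and credentials:
-        try:
-            current_user = get_current_user(request, credentials)
-            if not current_user.has_permission("manage_users"):
-                raise HTTPException(
-                    status_code=status.HTTP_403_FORBIDDEN,
-                    detail="Admin access required",
-                )
-            return current_user
-        except HTTPException:
-            raise
-
-    # No valid authentication found
-    raise HTTPException(
-        status_code=status.HTTP_401_UNAUTHORIZED,
-        detail="Authentication required",
-        headers={"WWW-Authenticate": _build_www_authenticate_header()},
-    )
+    return await _hybrid_auth_impl(request, credentials, require_admin=True)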
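Review bot: In the second hunk, `_hybrid_auth_impl` applies two different admin criteria: the session branch gates on `session.role != "admin"` (the role carried in the session record), while the token branch gates on `current_user.has_permission("manage_users")` evaluated on the User just loaded. The session branch then returns `user` from `user_manager.get_user(session.username)` without re-checking that user's current role, so if the session captures the role at login time a later demotion would not take effect until the session expires — whether the session is a snapshot cannot be confirmed from this hunk. Checking the freshly loaded user the same way the token path does, `if require_admin and not user.has_permission("manage_users"):` followed by `raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin access required")` just before `return user`, would make the two paths agree and turn the session's stored role into a pre-filter rather than the decision. A related, pre-existing asymmetry: a valid but non-admin session falls through to the final 401 "Authentication required" with a WWW-Authenticate header, whereas a non-admin token gets 403, so a 403 in the `Session valid but not admin` branch would describe the situation accurately. `_hybrid_auth_impl` starts in the first hunk and its import of `get_session_manager` and `SESSION_COOKIE_NAME` lies in the gap between the first two hunks.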
