Files
romm/backend/handler/auth/dependencies.py

152 lines
5.2 KiB
Python

"""Fine-grained, DB-backed permission checks for endpoint handlers.
The coarse ``@protected_route(..., [Scope.X])`` gate stays the first line of
defense (and is what client tokens / kiosk rely on). These helpers add the
authoritative per-entity / per-action / ownership / visibility decisions on top,
resolved once per request and cached on ``request.state``.
Typical use inside a handler::
perms = get_permissions(request)
assert_can(perms, PermEntity.ROMS, PermAction.DELETE) # library-wide
assert_can(perms, PermEntity.COLLECTIONS, PermAction.WRITE,
owner_id=collection.user_id) # own-data ok
"""
from __future__ import annotations
from typing import TYPE_CHECKING
from fastapi import HTTPException, Request, status
from exceptions.endpoint_exceptions import (
PlatformNotFoundInDatabaseException,
RomNotFoundInDatabaseException,
)
from handler.auth.permissions import ResolvedPermissions, resolve_permissions
from models.permission import PermAction, PermEntity
if TYPE_CHECKING:
from models.firmware import Firmware
from models.platform import Platform
from models.rom import Rom
def get_permissions(request: Request) -> ResolvedPermissions:
"""Resolve (and cache for the request) the caller's effective permissions."""
cached = getattr(request.state, "permissions", None)
if cached is not None:
return cached
perms = resolve_permissions(request.user)
request.state.permissions = perms
return perms
def can_access(
perms: ResolvedPermissions,
entity: PermEntity,
action: PermAction,
*,
owner_id: int | None = None,
) -> bool:
"""Whether ``perms`` may perform ``action`` on ``entity``.
Admins bypass. A user may always act on a resource they own (``owner_id``
matches). Otherwise a library-wide (non ``own_only``) grant is required.
"""
if perms.is_admin:
return True
if owner_id is not None and owner_id == perms.user_id:
return True
return perms.allows(entity, action, owned=False)
def assert_can(
perms: ResolvedPermissions,
entity: PermEntity,
action: PermAction,
*,
owner_id: int | None = None,
) -> None:
"""Raise 403 unless ``perms`` may perform ``action`` on ``entity``."""
if not can_access(perms, entity, action, owner_id=owner_id):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Insufficient permissions",
)
def require_permission(entity: PermEntity, action: PermAction):
"""FastAPI dependency factory for library-wide gating of a route.
Returns the resolved permissions so the handler can reuse them.
"""
def _dependency(request: Request) -> ResolvedPermissions:
perms = get_permissions(request)
assert_can(perms, entity, action)
return perms
return _dependency
def assert_admin(request: Request) -> ResolvedPermissions:
"""Raise 403 unless the caller is an admin. For permission-management routes."""
perms = get_permissions(request)
if not perms.is_admin:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Administrator privileges required",
)
return perms
# 404-masking helpers: a hidden entity must read as non-existent, not forbidden,
# so its existence isn't leaked. The auth guard skips the check on unauthenticated
# download endpoints, which carry no permission context to resolve.
def assert_rom_visible(
request: Request, rom: Rom, *, not_found_detail: str | None = None
) -> None:
"""Raise 404 (not 403) when the rom is hidden from the caller.
Defaults to the standard ``RomNotFoundInDatabaseException`` message; pass
``not_found_detail`` for endpoints with a bespoke 404 (metadata-id / hash
lookups) so the masked response is indistinguishable from their not-found.
"""
if request.user.is_authenticated and not get_permissions(request).can_see_rom(
rom.id, rom.platform_id
):
if not_found_detail is not None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail=not_found_detail
)
raise RomNotFoundInDatabaseException(rom.id)
def assert_platform_visible(
request: Request, platform: Platform, *, not_found_detail: str | None = None
) -> None:
"""Raise 404 (not 403) when the platform is hidden from the caller.
Pass ``not_found_detail`` to match an endpoint's bespoke 404 message.
"""
if request.user.is_authenticated and not get_permissions(request).can_see_platform(
platform.id
):
if not_found_detail is not None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail=not_found_detail
)
raise PlatformNotFoundInDatabaseException(platform.id)
def assert_firmware_visible(request: Request, firmware: Firmware) -> None:
"""Raise 404 when the firmware's platform is hidden (firmware inherits it)."""
if request.user.is_authenticated and not get_permissions(request).can_see_platform(
firmware.platform_id
):
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Firmware with ID {firmware.id} not found",
)