Files
romm/backend/handler/auth/hybrid_auth.py
nendo 519abc1645 Add device authorization flow for TV-app-style pairing (RFC 8628)
Implements RFC 8628-style device authorization so clients
(argosy-launcher, grout) can pair by display instead of manually
copying tokens. Device posts to an open /api/auth/device/init with
its identifier and requested scopes; the server returns device_code
+ user_code + QR URL. User scans QR, lands at /pair/device, approves
(optionally editing name/scopes/expiry); the device's next poll on
/api/auth/device/token returns a ClientToken bound 1:1 to a newly-
created (or deduped) Device record. Downstream endpoints
(/play-sessions, /sync/negotiate) infer device_id from the bound
token so the client doesn't have to ship it on every call.

- Migrations 0080/0081: devices.client_device_identifier (unique
  per user) and client_tokens.device_id FK (ON DELETE SET NULL)
- Five new endpoints under /api/auth/device (init/pending/approve/
  deny/token) with Redis-backed state, per-IP rate limits, and
  RFC-compliant error codes (authorization_pending, slow_down,
  expired_token, access_denied)
- HybridAuthBackend surfaces bound device_id on request.state and
  bumps devices.last_seen with a 5-minute debounce
- /api/users/me returns current_device_id for bound tokens so a
  device can identify itself from its token alone
- Frontend approval screen at /pair/device with editable scopes/
  name/expiry (defaults to Never), 3s auto-close countdown
- ClientApiTokens settings list shows bound-device chip
- 20 i18n keys added to all 17 locales; generated models updated
- 52 new tests across 13 classes; full suite 1334 passed

Planning and review assisted by Claude Code.
2026-06-18 05:24:32 +09:00

110 lines
4.1 KiB
Python

from datetime import datetime, timezone
from fastapi.security.http import HTTPBasic
from starlette.authentication import AuthCredentials, AuthenticationBackend
from starlette.requests import HTTPConnection
from config import KIOSK_MODE
from handler.auth import auth_handler, oauth_handler
from handler.database import (
db_client_token_handler,
db_device_handler,
db_user_handler,
)
from models.user import User
from utils.datetime import to_utc
from .constants import READ_SCOPES
class HybridAuthBackend(AuthenticationBackend):
    """Authentication backend that tries several credential sources in turn.

    The sources are checked in this order: an existing session, an HTTP
    Basic ``Authorization`` header, a Bearer ``Authorization`` header
    (either an ``rmm_``-prefixed client API token or an OAuth JWT), and
    finally the kiosk-mode user when ``KIOSK_MODE`` is truthy.
    """

    async def authenticate(
        self, conn: HTTPConnection
    ) -> tuple[AuthCredentials, User] | None:
        """Resolve the user and granted scopes for ``conn``.

        Args:
            conn: The incoming HTTP connection whose session and headers
                are inspected.

        Returns:
            An ``(AuthCredentials, User)`` pair when one of the credential
            sources authenticates the connection, otherwise ``None``.
        """
        # A user resolved from the session takes precedence over any header.
        session_user = await auth_handler.get_current_active_user_from_session(conn)
        if session_user:
            session_user.set_last_active()
            return (AuthCredentials(session_user.oauth_scopes), session_user)

        if "Authorization" in conn.headers:
            header_parts = conn.headers["Authorization"].split()
            # Anything other than exactly "<scheme> <credential>" is rejected.
            if len(header_parts) != 2:
                return None
            raw_scheme, token = header_parts
            scheme = raw_scheme.lower()

            # HTTP Basic: username/password checked by the auth handler.
            if scheme == "basic":
                credentials = await HTTPBasic()(conn)  # type: ignore[arg-type]
                if not credentials:
                    return None

                basic_user = auth_handler.authenticate_user(
                    credentials.username, credentials.password
                )
                if basic_user is None or not basic_user.enabled:
                    return None

                basic_user.set_last_active()
                return (AuthCredentials(basic_user.oauth_scopes), basic_user)

            if scheme == "bearer":
                # Client API tokens use the rmm_ prefix
                if token.startswith("rmm_"):
                    token_hash = auth_handler.hash_client_token(token)
                    client_token = db_client_token_handler.get_token_by_hash(
                        token_hash
                    )
                    if client_token is None:
                        return None

                    # A falsy expires_at skips the expiry check entirely.
                    expires_at = client_token.expires_at
                    if expires_at and to_utc(expires_at) < datetime.now(timezone.utc):
                        return None

                    token_owner = db_user_handler.get_user(client_token.user_id)
                    if token_owner is None or not token_owner.enabled:
                        return None

                    # Grant only scopes held by both the token and its owner.
                    granted_scopes = list(
                        set(client_token.scopes.split())
                        & set(token_owner.oauth_scopes)
                    )

                    db_client_token_handler.update_last_used(client_token.id)
                    token_owner.set_last_active()

                    # Expose the token id and its bound device id (if any)
                    # on the connection state.
                    bound_device_id = client_token.device_id
                    conn.state.client_token_id = client_token.id
                    conn.state.device_id = bound_device_id
                    if bound_device_id:
                        db_device_handler.update_last_seen_debounced(bound_device_id)

                    return (AuthCredentials(granted_scopes), token_owner)

                # OAuth JWT bearer tokens
                (
                    jwt_user,
                    claims,
                ) = await oauth_handler.get_current_active_user_from_bearer_token(
                    token
                )
                if jwt_user is None or claims is None:
                    return None

                # Only access tokens can request resources
                if claims.get("type") != "access":
                    return None

                # Only grant access to resources with overlapping scopes
                claimed_scopes = set(claims.get("scopes", "").split(" "))
                overlapping_scopes = list(claimed_scopes & set(jwt_user.oauth_scopes))

                jwt_user.set_last_active()
                return (AuthCredentials(overlapping_scopes), jwt_user)

        # No header, or a scheme other than basic/bearer: only kiosk mode
        # can still authenticate the connection.
        if not KIOSK_MODE:
            return None

        kiosk_user = User.kiosk_mode_user()
        return (AuthCredentials(READ_SCOPES), kiosk_user)