Files
romm/backend/endpoints/sockets/logs.py
Georges-Antoine Assi 0460133992 Secure activity identity, cut Redis churn, remove v1 activity
Backend:
- Resolve the acting user from the authenticated socket session on
  connect instead of trusting the client-supplied user_id, so a client
  can no longer spoof a "now playing" session for another user. Only
  rom_id/device_id come from the payload.
- Emit activity:update/clear through the already-initialised socket
  server instead of opening (and leaking) a fresh AsyncRedisManager per
  REST heartbeat.
- Collapse get_all_active's per-key GET into a single MGET.
- Drop the pure pass-through _build_activity_entry helper.

Frontend:
- Remove all activity emits from the v1 EmulatorJS Player; the v2 shell
  is the single driver of the activity lifecycle.
- Remove activity from the v1 UI entirely (Activity view, ActivityBtn,
  ActivePlayers on game details, navigation, and the now-v2-only route).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-21 11:12:18 -04:00

164 lines
6.7 KiB
Python

"""Real-time backend log streaming over Socket.IO (admin only).

Pieces:

- ``connect`` handler on the main socket server: resolves the session user,
  records their id in the socket session so activity events can trust it, and,
  if they are an admin (and the logs viewer is enabled), joins them to the
  ``admin`` room. It never rejects a connection, so the existing scan/sync
  sockets keep working for everyone.
- ``start_log_forwarder``: a single background task (Redis-lock guarded) that
  subscribes to the ``romm:logs`` pub/sub channel — fed by ``LogStreamHandler``
  in every backend process — and relays each line to the ``admin`` room.
- ``get_recent_logs``: reads the capped ring buffer for backfill on view open.
"""
import asyncio
import uuid
from typing import Any, Final
import socketio # type: ignore
from config import DISABLE_LOGS_VIEWER, REDIS_URL
from endpoints.sockets.activity import store_authenticated_user
from handler.database import db_user_handler
from handler.redis_handler import async_cache
from handler.socket_handler import socket_handler
from logger.log_stream_handler import LOG_BUFFER_KEY, LOG_CHANNEL
from logger.logger import log
from models.user import Role
from utils import json_module
from utils.auth import get_session_from_environ
# Socket.IO room that admin sockets join and that log lines are emitted to.
ADMIN_ROOM: Final[str] = "admin"
# Redis key whose value identifies the worker currently running the forwarder.
FORWARDER_LOCK_KEY: Final[str] = "romm:logs:forwarder"
# Expiry applied to FORWARDER_LOCK_KEY, in seconds.
FORWARDER_LOCK_TTL: Final[int] = 30
@socket_handler.socket_server.on("connect")  # type: ignore
async def connect(sid: str, environ: dict[str, Any], auth: Any = None) -> None:
    """Bind a newly connected socket to its authenticated RomM user, if any.

    The user is derived from the server-side session in ``environ`` (never from
    client payload). When the session was issued by ``romm:auth`` and names an
    existing, enabled user, that user's id is recorded for ``sid`` through
    ``store_authenticated_user``. Admins are also placed in ``ADMIN_ROOM``
    unless ``DISABLE_LOGS_VIEWER`` is set.

    Always returns ``None`` so the connection is accepted in every case; any
    error during resolution is logged instead of raised. ``auth`` is accepted to
    match python-socketio's connect-handler signature and is not used here.
    """
    try:
        session = await get_session_from_environ(environ)
        issued_by_romm = session.get("iss") == "romm:auth"
        # Only look at the subject claim for tokens RomM itself issued.
        username = session.get("sub") if issued_by_romm else None
        if not username:
            return

        user = db_user_handler.get_user_by_username(username)
        if not (user and user.enabled):
            return

        await store_authenticated_user(sid, user.id)
        if not DISABLE_LOGS_VIEWER and user.role == Role.ADMIN:
            await socket_handler.socket_server.enter_room(sid, ADMIN_ROOM)
    except Exception:  # noqa: BLE001 - never let auth resolution refuse a socket
        log.exception("Failed to resolve user on socket connect")
async def get_recent_logs(limit: int) -> list[dict[str, Any]]:
    """Return up to *limit* of the most recent buffered log lines, oldest first.

    Args:
        limit: Maximum number of lines to return. Values ``<= 0`` yield an
            empty list without querying Redis.

    Returns:
        The decoded log entries in chronological order. Entries that fail to
        decode are skipped.
    """
    if limit <= 0:
        # LRANGE's stop index is inclusive, so clamping to "0..0" would still
        # return one entry for a non-positive limit; short-circuit instead.
        return []

    raw = await async_cache.lrange(LOG_BUFFER_KEY, 0, limit - 1)
    entries: list[dict[str, Any]] = []
    for item in raw:
        try:
            entries.append(json_module.loads(item))
        except Exception:  # noqa: BLE001 - skip malformed entry # nosec B112
            continue
    # The buffer is newest-first (LPUSH); callers want oldest-first.
    entries.reverse()
    return entries
async def start_log_forwarder() -> None:
    """Relay log lines from Redis pub/sub to admin Socket.IO clients.

    Guarded by a Redis lock so that, when more than one web worker is running
    (WEB_SERVER_CONCURRENCY > 1), exactly one forwards and clients don't receive
    duplicate lines. Workers that don't hold the lock retry acquiring it every
    ``FORWARDER_LOCK_TTL / 2`` seconds, so forwarding resumes if the owner dies
    and its lock expires.

    Unexpected errors (for example a dropped Redis connection) are logged, the
    lock is released and acquisition is retried after the same delay, rather
    than ending log forwarding for the rest of the process lifetime. On
    cancellation the lock is released and ``CancelledError`` propagates.
    """
    if DISABLE_LOGS_VIEWER:
        return

    lock_id = str(uuid.uuid4())
    retry_delay = FORWARDER_LOCK_TTL / 2
    # Write-only manager for emitting — the same proven path scan/sync use. It
    # publishes the room emit to Redis; the main socket server's read-side
    # manager resolves `admin` room membership and delivers. Created inside the
    # running loop (like sync.py) rather than at import time.
    socket_manager = socketio.AsyncRedisManager(REDIS_URL, write_only=True)
    try:
        while True:
            try:
                got_lock = await async_cache.set(
                    FORWARDER_LOCK_KEY, lock_id, nx=True, ex=FORWARDER_LOCK_TTL
                )
                if got_lock:
                    await _forward_while_owner(socket_manager, lock_id)
                    # Ownership was lost; contend for the lock again right away.
                    continue
            except Exception:  # noqa: BLE001 - never let the forwarder kill the app
                log.exception("Log forwarder crashed")
                await _release_lock(lock_id)
            # Either another worker owns the forwarder or we just recovered
            # from an error: check back periodically in case the lock frees up.
            await asyncio.sleep(retry_delay)
    except asyncio.CancelledError:
        # Release the lock on shutdown so a restart (e.g. uvicorn --reload)
        # resumes forwarding immediately instead of waiting out the TTL.
        await _release_lock(lock_id)
        raise


async def _forward_while_owner(socket_manager: Any, lock_id: str) -> None:
    """Relay ``LOG_CHANNEL`` messages to ``ADMIN_ROOM`` while *lock_id* holds the lock.

    The lock is verified and refreshed at most every ``FORWARDER_LOCK_TTL / 3``
    seconds instead of once per relayed line. Returns as soon as
    ``FORWARDER_LOCK_KEY`` no longer holds *lock_id*. Redis errors propagate to
    the caller; the pub/sub connection is always closed on exit.
    """
    loop = asyncio.get_running_loop()
    heartbeat_interval = FORWARDER_LOCK_TTL / 3
    next_heartbeat = loop.time()  # refresh on the first iteration
    pubsub = async_cache.pubsub()
    try:
        await pubsub.subscribe(LOG_CHANNEL)
        log.info("Log stream forwarder started")
        while True:
            now = loop.time()
            if now >= next_heartbeat:
                # Heartbeat the lock, but only while we still own it. If
                # another worker took it, stop forwarding to avoid duplicate
                # lines. Losing the lock by expiry requires a full TTL without a
                # refresh — longer than the heartbeat interval — so this check
                # always runs right after such a gap.
                if await async_cache.get(FORWARDER_LOCK_KEY) != lock_id:
                    return
                await async_cache.set(
                    FORWARDER_LOCK_KEY, lock_id, ex=FORWARDER_LOCK_TTL
                )
                next_heartbeat = now + heartbeat_interval

            # Block for new lines no longer than the time left until the next
            # heartbeat is due.
            message = await pubsub.get_message(
                ignore_subscribe_messages=True,
                timeout=max(0.0, next_heartbeat - loop.time()),
            )
            data = message.get("data") if message else None
            if not data:
                continue
            # One bad message or emit must not kill the forwarder —
            # swallow per-line and keep relaying.
            try:
                payload = json_module.loads(data)
                await socket_manager.emit("logs:entry", payload, room=ADMIN_ROOM)
            except Exception:  # noqa: BLE001 - keep forwarding # nosec B112
                continue
    finally:
        await _close_pubsub(pubsub)


async def _close_pubsub(pubsub: Any) -> None:
    """Unsubscribe *pubsub* from ``LOG_CHANNEL`` (best-effort), then close it.

    The unsubscribe may fail when the connection is already broken; the close
    still runs so the connection is not leaked.
    """
    try:
        await pubsub.unsubscribe(LOG_CHANNEL)
    except Exception:  # noqa: BLE001 - connection may already be gone # nosec B110
        pass
    finally:
        await pubsub.aclose()  # type: ignore[attr-defined]
async def _release_lock(lock_id: str) -> None:
    """Best-effort removal of the forwarder lock, only while *lock_id* owns it.

    Any error (e.g. Redis being unreachable) is swallowed: the lock is written
    with an expiry, so failing to delete it only delays the next acquisition.
    """
    try:
        owner = await async_cache.get(FORWARDER_LOCK_KEY)
        if owner != lock_id:
            # Someone else holds it (or it already expired) — leave it alone.
            return
        await async_cache.delete(FORWARDER_LOCK_KEY)
    except Exception:  # noqa: BLE001 - best-effort cleanup # nosec B110
        return