mirror of
https://github.com/rommapp/romm.git
synced 2026-06-27 22:35:57 +00:00
Backend: - Resolve the acting user from the authenticated socket session on connect instead of trusting the client-supplied user_id, so a client can no longer spoof a "now playing" session for another user. Only rom_id/device_id come from the payload. - Emit activity:update/clear through the already-initialised socket server instead of opening (and leaking) a fresh AsyncRedisManager per REST heartbeat. - Collapse get_all_active's per-key GET into a single MGET. - Drop the pure pass-through _build_activity_entry helper. Frontend: - Remove all activity emits from the v1 EmulatorJS Player; the v2 shell is the single driver of the activity lifecycle. - Remove activity from the v1 UI entirely (Activity view, ActivityBtn, ActivePlayers on game details, navigation, and the now-v2-only route). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
164 lines
6.7 KiB
Python
164 lines
6.7 KiB
Python
"""Real-time backend log streaming over Socket.IO (admin only).
|
|
|
|
Pieces:
|
|
- ``connect`` handler on the main socket server: resolves the session user and,
|
|
if they are an admin, joins them to the ``admin`` room. It never rejects a
|
|
connection, so the existing scan/sync sockets keep working for everyone.
|
|
- ``start_log_forwarder``: a single background task (Redis-lock guarded) that
|
|
subscribes to the ``romm:logs`` pub/sub channel — fed by ``LogStreamHandler``
|
|
in every backend process — and relays each line to the ``admin`` room.
|
|
- ``get_recent_logs``: reads the capped ring buffer for backfill on view open.
|
|
"""
|
|
|
|
import asyncio
|
|
import uuid
|
|
from typing import Any, Final
|
|
|
|
import socketio # type: ignore
|
|
|
|
from config import DISABLE_LOGS_VIEWER, REDIS_URL
|
|
from endpoints.sockets.activity import store_authenticated_user
|
|
from handler.database import db_user_handler
|
|
from handler.redis_handler import async_cache
|
|
from handler.socket_handler import socket_handler
|
|
from logger.log_stream_handler import LOG_BUFFER_KEY, LOG_CHANNEL
|
|
from logger.logger import log
|
|
from models.user import Role
|
|
from utils import json_module
|
|
from utils.auth import get_session_from_environ
|
|
|
|
ADMIN_ROOM: Final = "admin"
|
|
FORWARDER_LOCK_KEY: Final = "romm:logs:forwarder"
|
|
FORWARDER_LOCK_TTL: Final = 30 # seconds
|
|
|
|
|
|
@socket_handler.socket_server.on("connect") # type: ignore
|
|
async def connect(sid: str, environ: dict[str, Any], auth: Any = None) -> None:
|
|
"""Resolve the authenticated user on socket connect.
|
|
|
|
Stores the user id in the socket session so activity events can trust the
|
|
server-resolved identity instead of a client-supplied ``user_id``, and joins
|
|
admin users to the log-streaming room. Always returns ``None`` (accepts the
|
|
connection) — only identity storage and room membership are gated, so the
|
|
existing scan/sync sockets keep working for everyone.
|
|
"""
|
|
try:
|
|
session = await get_session_from_environ(environ)
|
|
if session.get("iss") != "romm:auth":
|
|
return
|
|
|
|
username = session.get("sub")
|
|
if not username:
|
|
return
|
|
|
|
user = db_user_handler.get_user_by_username(username)
|
|
if not user or not user.enabled:
|
|
return
|
|
|
|
await store_authenticated_user(sid, user.id)
|
|
|
|
if not DISABLE_LOGS_VIEWER and user.role == Role.ADMIN:
|
|
await socket_handler.socket_server.enter_room(sid, ADMIN_ROOM)
|
|
except Exception: # noqa: BLE001 - never let auth resolution refuse a socket
|
|
log.exception("Failed to resolve user on socket connect")
|
|
|
|
|
|
async def get_recent_logs(limit: int) -> list[dict[str, Any]]:
|
|
"""Return the most recent buffered log lines in chronological order."""
|
|
raw = await async_cache.lrange(LOG_BUFFER_KEY, 0, max(0, limit - 1))
|
|
entries: list[dict[str, Any]] = []
|
|
for item in raw:
|
|
try:
|
|
entries.append(json_module.loads(item))
|
|
except Exception: # noqa: BLE001 - skip malformed entry # nosec B112
|
|
continue
|
|
# The buffer is newest-first (LPUSH); callers want oldest-first.
|
|
entries.reverse()
|
|
return entries
|
|
|
|
|
|
async def start_log_forwarder() -> None:
|
|
"""Relay log lines from Redis pub/sub to admin Socket.IO clients.
|
|
|
|
Guarded by a Redis lock so that, when more than one web worker is running
|
|
(WEB_SERVER_CONCURRENCY > 1), exactly one forwards and clients don't receive
|
|
duplicate lines.
|
|
"""
|
|
if DISABLE_LOGS_VIEWER:
|
|
return
|
|
|
|
lock_id = str(uuid.uuid4())
|
|
pubsub = None
|
|
# Write-only manager for emitting — the same proven path scan/sync use. It
|
|
# publishes the room emit to Redis; the main socket server's read-side
|
|
# manager resolves `admin` room membership and delivers. Created inside the
|
|
# running loop (like sync.py) rather than at import time.
|
|
socket_manager = socketio.AsyncRedisManager(REDIS_URL, write_only=True)
|
|
try:
|
|
while True:
|
|
got_lock = await async_cache.set(
|
|
FORWARDER_LOCK_KEY, lock_id, nx=True, ex=FORWARDER_LOCK_TTL
|
|
)
|
|
if not got_lock:
|
|
# Another worker owns the forwarder; check back periodically in
|
|
# case it dies and the lock expires.
|
|
await asyncio.sleep(FORWARDER_LOCK_TTL / 2)
|
|
continue
|
|
|
|
pubsub = async_cache.pubsub()
|
|
await pubsub.subscribe(LOG_CHANNEL)
|
|
log.info("Log stream forwarder started")
|
|
try:
|
|
while True:
|
|
# Heartbeat the lock, but only while we still own it. If
|
|
# another worker took it (after a stall / Redis hiccup),
|
|
# stop forwarding to avoid duplicate lines — the outer loop
|
|
# will then contend for the lock again.
|
|
if await async_cache.get(FORWARDER_LOCK_KEY) != lock_id:
|
|
break
|
|
await async_cache.set(
|
|
FORWARDER_LOCK_KEY, lock_id, ex=FORWARDER_LOCK_TTL
|
|
)
|
|
message = await pubsub.get_message(
|
|
ignore_subscribe_messages=True,
|
|
timeout=FORWARDER_LOCK_TTL / 3,
|
|
)
|
|
if not message:
|
|
continue
|
|
data = message.get("data")
|
|
if not data:
|
|
continue
|
|
# One bad message or emit must not kill the forwarder —
|
|
# swallow per-line and keep relaying.
|
|
try:
|
|
payload = json_module.loads(data)
|
|
await socket_manager.emit(
|
|
"logs:entry", payload, room=ADMIN_ROOM
|
|
)
|
|
except Exception: # noqa: BLE001 - keep forwarding # nosec B112
|
|
continue
|
|
finally:
|
|
await pubsub.unsubscribe(LOG_CHANNEL)
|
|
await pubsub.aclose() # type: ignore[attr-defined]
|
|
pubsub = None
|
|
except asyncio.CancelledError:
|
|
if pubsub is not None:
|
|
await pubsub.aclose() # type: ignore[attr-defined]
|
|
# Release the lock on shutdown so a restart (e.g. uvicorn --reload)
|
|
# resumes forwarding immediately instead of waiting out the TTL.
|
|
await _release_lock(lock_id)
|
|
raise
|
|
except Exception: # noqa: BLE001 - never let the forwarder kill the app
|
|
log.exception("Log forwarder crashed")
|
|
await _release_lock(lock_id)
|
|
|
|
|
|
async def _release_lock(lock_id: str) -> None:
|
|
"""Delete the forwarder lock only if we still own it."""
|
|
try:
|
|
current = await async_cache.get(FORWARDER_LOCK_KEY)
|
|
if current == lock_id:
|
|
await async_cache.delete(FORWARDER_LOCK_KEY)
|
|
except Exception: # noqa: BLE001 - best-effort cleanup # nosec B110
|
|
pass
|