Files
romm/backend/endpoints/tasks.py
nendo edb5d15420 Fix save-sync hash drift, archival save leak, and dedupe scoping
Cleanup pass on save-sync addressing three independent failure modes
that interact in production data: content_hash drift between client
and server, null-slot archival saves leaking into sync flows, and
content-hash dedupe collapsing legitimately-distinct slots.

Bug fixes
- compute_content_hash dispatched on zipfile.is_zipfile(relative_path),
  which silently returned False whenever the process's CWD wasn't
  ASSETS_BASE_PATH. Every zip save fell through to the raw-MD5 branch,
  persisting hashes that disagreed with clients computing the intended
  per-entry zip-hash. Resolve to a full path before the dispatch.
- _build_negotiate_plan, sync_push_pull_task, and sync_watcher all
  treated null-slot saves as sync-eligible. Null-slot saves represent
  web-UI / archival uploads; including them in negotiate plans matched
  them against device pushes by filename and overwrote archival data.
  Filter null-slot saves at all three call sites.
- get_save_by_content_hash matched on (rom_id, user_id, content_hash)
  only, so identical bytes uploaded to different slots collapsed into
  one record. Scope the lookup by slot when provided, so that cloning a
  save to a new slot creates a distinct row per slot.
- get_save_by_filename matched on (rom_id, user_id, file_name) only.
  When two uploads to different slots happened in the same wall-clock
  second (the datetime tag is per-second), the second upload UPDATED
  the first record's slot instead of creating a distinct row. Scope
  the filename lookup by slot too.

One-shot recovery
- New recompute_save_content_hashes manual task walks every Save row,
  recomputes via the fixed dispatch, and updates rows whose values
  differ. Idempotent; safe to re-run.
- On startup, the backend counts Save rows whose content_hash IS NULL
  and, if any exist, enqueues the recompute task on the low-priority
  RQ queue. The API process moves on, and the worker handles the
  recompute out-of-band. Subsequent restarts find zero NULL hashes
  and skip the task. Admins can also trigger the task manually.

Test infrastructure
- Added tests/_zipfile_shim.reload_zipfile() mirroring the pattern
  from utils/zip_cache.py for the same zipfile-inflate64 + CPython
  3.13.5 incompatibility. Test fixtures that build ZIPs call it
  immediately before opening the archive.
2026-05-29 17:00:01 +09:00

408 lines
12 KiB
Python

from datetime import datetime, timezone
from typing import Any, TypedDict
from fastapi import Body, HTTPException, Request
from rq import Worker
from rq.exceptions import NoSuchJobError
from rq.job import Job, JobStatus
from rq.registry import FailedJobRegistry, FinishedJobRegistry
from config import (
ENABLE_RESCAN_ON_FILESYSTEM_CHANGE,
RESCAN_ON_FILESYSTEM_CHANGE_DELAY,
TASK_RESULT_TTL,
TASK_TIMEOUT,
)
from decorators.auth import protected_route
from endpoints.responses import (
CleanupTaskStatusResponse,
ConversionTaskStatusResponse,
GenericTaskStatusResponse,
ScanTaskStatusResponse,
SyncTaskStatusResponse,
TaskExecutionResponse,
TaskStatusResponse,
UpdateTaskStatusResponse,
WatcherTaskStatusResponse,
)
from endpoints.responses.tasks import GroupedTasksDict, TaskInfo
from handler.auth.constants import Scope
from handler.redis_handler import (
default_queue,
get_job_func_name,
high_prio_queue,
low_prio_queue,
redis_client,
)
from tasks.manual.cleanup_missing_roms import cleanup_missing_roms_task
from tasks.manual.cleanup_orphaned_resources import cleanup_orphaned_resources_task
from tasks.manual.recompute_save_content_hashes import (
recompute_save_content_hashes_task,
)
from tasks.manual.sync_folder_scan import sync_folder_scan_task
from tasks.scheduled.convert_images_to_webp import convert_images_to_webp_task
from tasks.scheduled.scan_library import scan_library_task
from tasks.scheduled.update_launchbox_metadata import update_launchbox_metadata_task
from tasks.scheduled.update_switch_titledb import update_switch_titledb_task
from tasks.tasks import (
Task,
TaskType,
)
from utils.router import APIRouter
# All endpoints in this module are mounted under /tasks and grouped under the
# "tasks" tag.
router = APIRouter(prefix="/tasks", tags=["tasks"])
# Registry entry: the public name used to address the task (e.g. in
# /tasks/run/{task_name}), its TaskType category, and the Task instance.
ScheduledTask = TypedDict(
    "ScheduledTask",
    {
        "name": str,
        "type": TaskType,
        "task": Task,
    },
)


class ManualTask(ScheduledTask):
    """Registry entry for the ``manual_tasks`` list.

    Carries exactly the same fields as ScheduledTask; the distinct name only
    marks which registry list an entry belongs to.
    """
# Tasks listed under the "scheduled" group of GET /tasks. Each entry's Task
# carries its own cron_string; entries can also be run via /tasks/run/{name}
# when the task is enabled and allows manual runs.
scheduled_tasks: list[ScheduledTask] = [
    ScheduledTask(
        name="scan_library",
        type=TaskType.SCAN,
        task=scan_library_task,
    ),
    ScheduledTask(
        name="update_launchbox_metadata",
        type=TaskType.UPDATE,
        task=update_launchbox_metadata_task,
    ),
    ScheduledTask(
        name="update_switch_titledb",
        type=TaskType.UPDATE,
        task=update_switch_titledb_task,
    ),
    ScheduledTask(
        name="convert_images_to_webp",
        type=TaskType.CONVERSION,
        task=convert_images_to_webp_task,
    ),
]
# Tasks listed under the "manual" group of GET /tasks; run via
# /tasks/run/{name} when the task is enabled and allows manual runs.
manual_tasks: list[ManualTask] = [
    ManualTask(
        name="cleanup_orphaned_resources",
        type=TaskType.CLEANUP,
        task=cleanup_orphaned_resources_task,
    ),
    ManualTask(
        name="cleanup_missing_roms",
        type=TaskType.CLEANUP,
        task=cleanup_missing_roms_task,
    ),
    ManualTask(
        name="sync_folder_scan",
        type=TaskType.SYNC,
        task=sync_folder_scan_task,
    ),
    ManualTask(
        name="recompute_save_content_hashes",
        type=TaskType.CLEANUP,
        task=recompute_save_content_hashes_task,
    ),
]
def _build_task_info(name: str, task: Task) -> TaskInfo:
    """Describe a registered task for the task listing endpoint.

    Args:
        name: Registry name of the task, reported as ``name``.
        task: Task instance whose metadata is copied into the result.

    Returns:
        TaskInfo holding the task's type, title, description, enabled and
        manual_run flags, and cron string (empty string when the task's
        cron_string is falsy).
    """
    info_fields = {
        "name": name,
        "type": task.task_type,
        "title": task.title,
        "description": task.description,
        "enabled": task.enabled,
        "manual_run": task.manual_run,
        "cron_string": task.cron_string or "",
    }
    return TaskInfo(**info_fields)
def _build_task_status_response(
    job: Job,
) -> TaskStatusResponse:
    """Translate an RQ job into the status payload matching its task type.

    The job's ``task_type`` meta entry selects the response type. Scan,
    conversion, update and cleanup tasks copy their ``<kind>_stats`` meta
    entry into ``meta``; sync, watcher and generic tasks get an empty
    ``meta``. A job without a ``task_type`` meta entry is reported as a
    generic task.

    Args:
        job: The RQ job to describe.

    Returns:
        TaskStatusResponse: the common job fields (name, id, status and
        ISO-8601 timestamps, or None when unset) plus ``task_type``/``meta``.

    Raises:
        ValueError: if ``task_type`` is not a valid TaskType value, or is a
            TaskType member that has no response mapping below.
    """
    job_meta = job.get_meta()
    raw_task_type = job_meta.get("task_type")
    # Prefer the title stored at enqueue time; fall back to the function name
    # for jobs enqueued without task meta.
    display_name = job_meta.get("task_name") or get_job_func_name(job)

    def _to_iso(moment: datetime | None) -> str | None:
        return moment.isoformat() if moment else None

    created = _to_iso(job.created_at)
    started = _to_iso(job.started_at)
    ended = _to_iso(job.ended_at)
    enqueued = _to_iso(job.enqueued_at)
    shared_fields = {
        "task_name": display_name,
        "task_id": job.id,
        "status": job.get_status(),
        "created_at": created,
        "enqueued_at": enqueued,
        "started_at": started,
        "ended_at": ended,
    }

    if not raw_task_type:
        return GenericTaskStatusResponse(
            task_type=TaskType.GENERIC,
            meta={},
            **shared_fields,  # trunk-ignore(mypy/typeddict-item)
        )

    # Response type per task type, paired with the job-meta key whose value is
    # exposed under ``meta`` (None -> empty meta).
    response_map: dict[TaskType, tuple[Any, str | None]] = {
        TaskType.SCAN: (ScanTaskStatusResponse, "scan_stats"),
        TaskType.CONVERSION: (ConversionTaskStatusResponse, "conversion_stats"),
        TaskType.UPDATE: (UpdateTaskStatusResponse, "update_stats"),
        TaskType.CLEANUP: (CleanupTaskStatusResponse, "cleanup_stats"),
        TaskType.SYNC: (SyncTaskStatusResponse, None),
        TaskType.WATCHER: (WatcherTaskStatusResponse, None),
        TaskType.GENERIC: (GenericTaskStatusResponse, None),
    }

    # TaskType(...) itself raises ValueError for values it does not recognise.
    kind = TaskType(raw_task_type)
    if kind not in response_map:
        raise ValueError(f"Invalid task type: {raw_task_type}")
    response_type, stats_key = response_map[kind]
    response_meta = {} if stats_key is None else {stats_key: job_meta.get(stats_key)}
    response: TaskStatusResponse = response_type(
        task_type=kind,
        meta=response_meta,
        **shared_fields,
    )
    return response
@protected_route(router.get, "", [Scope.TASKS_RUN])
async def list_tasks(request: Request) -> GroupedTasksDict:
    """Describe every registered task, grouped by how it is listed.

    Args:
        request (Request): FastAPI Request object

    Returns:
        GroupedTasksDict: ``scheduled`` and ``manual`` entries built from the
        two task registries, plus a single ``watcher`` entry describing the
        filesystem-change rescan.
    """
    manual_infos = [
        _build_task_info(entry["name"], entry["task"]) for entry in manual_tasks
    ]
    scheduled_infos = [
        _build_task_info(entry["name"], entry["task"]) for entry in scheduled_tasks
    ]
    # The watcher is described inline from config values rather than from a
    # Task registry entry, and it cannot be run manually.
    watcher_info = TaskInfo(
        name="filesystem_watcher",
        type=TaskType.WATCHER,
        title="Rescan on filesystem change",
        description=f"Runs a scan when a change is detected in the library path, with a {RESCAN_ON_FILESYSTEM_CHANGE_DELAY} minute delay",
        enabled=ENABLE_RESCAN_ON_FILESYSTEM_CHANGE,
        manual_run=False,
        cron_string="",
    )
    grouped: GroupedTasksDict = {
        "scheduled": scheduled_infos,
        "manual": manual_infos,
        "watcher": [watcher_info],
    }
    return grouped
def _collect_registry_jobs(
    registries: list[FinishedJobRegistry] | list[FailedJobRegistry],
) -> list[Job]:
    """Fetch every job referenced by the given RQ job registries.

    A registry id whose job can no longer be fetched (Job.fetch raises
    NoSuchJobError) is removed from its registry and skipped.

    Args:
        registries: Registries to read job ids from.

    Returns:
        The fetched jobs, in registry order.
    """
    jobs: list[Job] = []
    for registry in registries:
        for job_id in registry.get_job_ids():
            try:
                job = Job.fetch(job_id, connection=redis_client)
            except NoSuchJobError:
                # Prune the stale id so later listings don't retry it.
                registry.remove(job_id)
                continue
            jobs.append(job)
    return jobs


@protected_route(router.get, "/status", [Scope.TASKS_RUN])
async def get_tasks_status(request: Request) -> list[TaskStatusResponse]:
    """Get all active, queued, completed, and failed tasks.

    Args:
        request (Request): FastAPI Request object

    Returns:
        list[TaskStatusResponse]: One entry per distinct job, sorted newest
        first by started_at, falling back to enqueued_at, then created_at.
    """
    queues = (low_prio_queue, default_queue, high_prio_queue)
    jobs: list[Job] = []

    # Jobs currently being executed by a worker.
    for worker in Worker.all(connection=redis_client):
        current_job = worker.get_current_job()
        if current_job:
            jobs.append(current_job)

    # Jobs still waiting in the queues. Queue.get_jobs() only returns enqueued
    # jobs; finished and failed jobs are read from the registries below.
    for queue in queues:
        jobs.extend(queue.get_jobs())

    jobs.extend(_collect_registry_jobs([FinishedJobRegistry(queue=q) for q in queues]))
    jobs.extend(_collect_registry_jobs([FailedJobRegistry(queue=q) for q in queues]))

    # The snapshots above are taken one after another, so a job can be seen
    # twice if it changes state in between (e.g. running at the worker scan,
    # finished by the registry scan). Keep one entry per job id, using the
    # most recently observed Job object.
    latest_by_id: dict[str, Job] = {}
    for job in jobs:
        latest_by_id[job.id] = job

    all_tasks: list[TaskStatusResponse] = [
        _build_task_status_response(job) for job in latest_by_id.values()
    ]
    all_tasks.sort(
        key=lambda x: x["started_at"] or x["enqueued_at"] or x["created_at"] or "",
        reverse=True,
    )
    return all_tasks
@protected_route(router.get, "/{task_id}", [Scope.TASKS_RUN])
async def get_task_by_id(request: Request, task_id: str) -> TaskStatusResponse:
    """Get the status of a task by its job ID.

    Args:
        request (Request): FastAPI Request object
        task_id (str): Job ID of the task to retrieve status for

    Returns:
        TaskStatusResponse: Task status information

    Raises:
        HTTPException: 404 when no job with ``task_id`` exists or the id is
            rejected as malformed. Other failures (e.g. Redis connection
            errors) propagate instead of being reported as a missing task.
    """
    try:
        job = Job.fetch(task_id, connection=low_prio_queue.connection)
    except (NoSuchJobError, ValueError) as e:
        # NoSuchJobError: the job does not exist (or has expired).
        # ValueError: NOTE(review) some RQ versions validate job ids and raise
        # ValueError for malformed ones; treat those as "not found" as well.
        raise HTTPException(
            status_code=404,
            detail=f"Task with ID '{task_id}' not found",
        ) from e
    return _build_task_status_response(job)
# Optional JSON body for /run/{task_name}. Hoisted to module level,
# presumably so no call appears in the argument default (e.g. ruff B008).
TASK_KWARGS = Body(default=None)


@protected_route(router.post, "/run/{task_name}", [Scope.TASKS_RUN])
async def run_single_task(
    request: Request,
    task_name: str,
    task_kwargs: dict[str, Any] | None = TASK_KWARGS,
) -> TaskExecutionResponse:
    """Enqueue one registered task on the low-priority queue.

    Args:
        request (Request): FastAPI Request object
        task_name (str): Registry name of the task to run
        task_kwargs (dict | None): Optional keyword arguments forwarded to
            the task's run() method

    Returns:
        TaskExecutionResponse: Title, job id, status and timestamps of the
        enqueued job.

    Raises:
        HTTPException: 404 if ``task_name`` is not registered; 400 if the
            task is disabled or does not allow manual runs.
    """
    # Scheduled entries come last, so they win on a name collision.
    registry = {
        entry["name"]: entry["task"] for entry in manual_tasks + scheduled_tasks
    }
    if task_name not in registry:
        raise HTTPException(
            status_code=404,
            detail=f"Task '{task_name}' not found, available tasks are {', '.join(registry)}",
        )

    task_instance = registry[task_name]
    if not (task_instance.enabled and task_instance.manual_run):
        raise HTTPException(
            status_code=400,
            detail=f"Task '{task_name}' cannot be run",
        )

    # The title and type are stored on the job so the status endpoints can
    # report them.
    job_meta = {
        "task_name": task_instance.title,
        "task_type": task_instance.task_type.value,
    }
    job = low_prio_queue.enqueue(
        task_instance.run,
        kwargs=task_kwargs or {},
        job_timeout=TASK_TIMEOUT,
        result_ttl=TASK_RESULT_TTL,
        meta=job_meta,
    )

    enqueued = job.enqueued_at
    return {
        "task_name": task_instance.title,
        "task_id": job.id,
        "status": job.get_status() or JobStatus.QUEUED,
        "created_at": (job.created_at or datetime.now(timezone.utc)).isoformat(),
        "enqueued_at": enqueued.isoformat() if enqueued else None,
    }