mirror of
https://github.com/rommapp/romm.git
synced 2026-06-28 06:46:00 +00:00
SaveSync: dedupe bootstrap recompute job across API restarts
Pass a deterministic job_id and unique=True to low_prio_queue.enqueue so that a restart while a previous bootstrap recompute is still queued or running turns the second enqueue into a no-op. Without this, every API restart that still found at least one row with a NULL content hash would push another duplicate job onto the low-priority queue; RQ would happily run both back-to-back, redoing the same scans and content-hash reads against the filesystem. RQ raises rq.exceptions.DuplicateJobError when unique=True hits an existing job ID. Swallow it with a log line, and let other enqueue failures fall through to the generic exception path so they still get logged with a traceback.
This commit is contained in:
@@ -4,6 +4,7 @@ import asyncio
|
||||
|
||||
import sentry_sdk
|
||||
from opentelemetry import trace
|
||||
from rq.exceptions import DuplicateJobError
|
||||
|
||||
from config import (
|
||||
ENABLE_SCHEDULED_CONVERT_IMAGES_TO_WEBP,
|
||||
@@ -47,6 +48,8 @@ from utils.context import initialize_context
|
||||
|
||||
tracer = trace.get_tracer(__name__)
|
||||
|
||||
# Fixed (deterministic) RQ job ID for the startup content-hash backfill.
# It is passed together with unique=True when enqueueing, so an API restart
# while an earlier recompute is still queued or running does not add a
# duplicate job to the low-priority queue.
RECOMPUTE_SAVE_HASHES_JOB_ID = "recompute_save_content_hashes:bootstrap"
|
||||
|
||||
|
||||
def _enqueue_recompute_save_hashes_if_needed() -> None:
|
||||
"""Backfill content_hash for saves uploaded before the path-resolution
|
||||
@@ -71,6 +74,8 @@ def _enqueue_recompute_save_hashes_if_needed() -> None:
|
||||
try:
|
||||
low_prio_queue.enqueue(
|
||||
recompute_save_content_hashes_task.run,
|
||||
job_id=RECOMPUTE_SAVE_HASHES_JOB_ID,
|
||||
unique=True,
|
||||
job_timeout=TASK_TIMEOUT,
|
||||
meta={
|
||||
"task_name": recompute_save_content_hashes_task.title,
|
||||
@@ -81,6 +86,11 @@ def _enqueue_recompute_save_hashes_if_needed() -> None:
|
||||
f"Enqueued recompute_save_content_hashes ({missing} saves with NULL content_hash); "
|
||||
"running on low-priority worker"
|
||||
)
|
||||
except DuplicateJobError:
|
||||
log.info(
|
||||
"recompute_save_content_hashes already queued or running from a "
|
||||
"previous restart; skipping enqueue"
|
||||
)
|
||||
except Exception:
|
||||
log.exception(
|
||||
"Failed to enqueue recompute_save_content_hashes; admins can run it manually"
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
"""Tests for startup-time auto-enqueue of the recompute task."""
|
||||
|
||||
import startup
|
||||
from rq.exceptions import DuplicateJobError
|
||||
|
||||
|
||||
def test_enqueue_recompute_skips_when_no_missing_hashes(mocker):
|
||||
@@ -38,6 +39,25 @@ def test_enqueue_recompute_fires_when_missing_hashes_present(mocker):
|
||||
# Job timeout must be passed; otherwise long-running recomputes get killed
|
||||
# by RQ's default short timeout on very large libraries.
|
||||
assert kwargs["job_timeout"] == startup.TASK_TIMEOUT
|
||||
# Deterministic job_id + unique=True prevent duplicate enqueues across
|
||||
# API process restarts while a previous recompute is still running.
|
||||
assert kwargs["job_id"] == startup.RECOMPUTE_SAVE_HASHES_JOB_ID
|
||||
assert kwargs["unique"] is True
|
||||
|
||||
|
||||
def test_enqueue_recompute_silently_skips_duplicate(mocker):
    """A DuplicateJobError from RQ (job already queued/running) is logged at
    info level and swallowed, not propagated.

    The function under test also has a generic ``except Exception`` handler
    that swallows errors, so "did not raise" alone cannot tell the dedicated
    duplicate branch apart from the generic failure path. The log assertions
    below pin the duplicate branch specifically.
    """
    mocker.patch.object(
        startup.db_save_handler, "count_saves_missing_content_hash", return_value=10
    )
    enqueue = mocker.patch.object(
        startup.low_prio_queue,
        "enqueue",
        side_effect=DuplicateJobError("already enqueued"),
    )
    mock_log = mocker.patch.object(startup, "log")

    # Must not raise: startup should continue even when the job already exists.
    startup._enqueue_recompute_save_hashes_if_needed()

    enqueue.assert_called_once()
    # A duplicate is an expected condition, not a failure: no traceback log.
    mock_log.exception.assert_not_called()
    # The dedicated "already queued" info message must have been emitted.
    assert any(
        call.args and "skipping enqueue" in str(call.args[0])
        for call in mock_log.info.call_args_list
    )
|
||||
|
||||
|
||||
def test_enqueue_recompute_swallows_count_error(mocker):
|
||||
|
||||
Reference in New Issue
Block a user