From ec50f75d77a6c2219c87cecb126adb5abfca8ca3 Mon Sep 17 00:00:00 2001 From: nendo Date: Fri, 29 May 2026 17:35:32 +0900 Subject: [PATCH] SaveSync: dedupe bootstrap recompute job across API restarts Pass a deterministic job_id and unique=True to low_prio_queue.enqueue so a restart while a previous bootstrap recompute is still queued or running no-ops the second enqueue. Without this, every API restart with a NULL-hash row left would push another duplicate job onto the low-priority queue; RQ would happily run both back-to-back, redoing the same scans and content-hash reads against the filesystem. RQ raises rq.exceptions.DuplicateJobError when unique=True hits an existing job ID. Swallow it with a log line and let other enqueue failures fall through to the generic exception path so they still get logged with a traceback. --- backend/startup.py | 10 ++++++++++ backend/tests/test_startup.py | 20 ++++++++++++++++++++ 2 files changed, 30 insertions(+) diff --git a/backend/startup.py b/backend/startup.py index a1c4a80e9..f5266d180 100644 --- a/backend/startup.py +++ b/backend/startup.py @@ -4,6 +4,7 @@ import asyncio import sentry_sdk from opentelemetry import trace +from rq.exceptions import DuplicateJobError from config import ( ENABLE_SCHEDULED_CONVERT_IMAGES_TO_WEBP, @@ -47,6 +48,8 @@ from utils.context import initialize_context tracer = trace.get_tracer(__name__) +RECOMPUTE_SAVE_HASHES_JOB_ID = "recompute_save_content_hashes:bootstrap" + def _enqueue_recompute_save_hashes_if_needed() -> None: """Backfill content_hash for saves uploaded before the path-resolution @@ -71,6 +74,8 @@ def _enqueue_recompute_save_hashes_if_needed() -> None: try: low_prio_queue.enqueue( recompute_save_content_hashes_task.run, + job_id=RECOMPUTE_SAVE_HASHES_JOB_ID, + unique=True, job_timeout=TASK_TIMEOUT, meta={ "task_name": recompute_save_content_hashes_task.title, @@ -81,6 +86,11 @@ def _enqueue_recompute_save_hashes_if_needed() -> None: f"Enqueued recompute_save_content_hashes ({missing} saves with NULL content_hash); " "running on low-priority worker" ) + except DuplicateJobError: + log.info( + "recompute_save_content_hashes already queued or running from a " + "previous restart; skipping enqueue" + ) except Exception: log.exception( "Failed to enqueue recompute_save_content_hashes; admins can run it manually" diff --git a/backend/tests/test_startup.py b/backend/tests/test_startup.py index 9a4afffff..741dcd338 100644 --- a/backend/tests/test_startup.py +++ b/backend/tests/test_startup.py @@ -1,6 +1,7 @@ """Tests for startup-time auto-enqueue of the recompute task.""" import startup +from rq.exceptions import DuplicateJobError def test_enqueue_recompute_skips_when_no_missing_hashes(mocker): @@ -38,6 +39,25 @@ def test_enqueue_recompute_fires_when_missing_hashes_present(mocker): # Job timeout must be passed; otherwise long-running recomputes get killed # by RQ's default short timeout on very large libraries. assert kwargs["job_timeout"] == startup.TASK_TIMEOUT + # Deterministic job_id + unique=True prevent duplicate enqueues across + # API process restarts while a previous recompute is still running. + assert kwargs["job_id"] == startup.RECOMPUTE_SAVE_HASHES_JOB_ID + assert kwargs["unique"] is True + + +def test_enqueue_recompute_silently_skips_duplicate(mocker): + """A DuplicateJobError from RQ (job already queued/running) is logged and + swallowed, not propagated.""" + mocker.patch.object( + startup.db_save_handler, "count_saves_missing_content_hash", return_value=10 + ) + mocker.patch.object( + startup.low_prio_queue, + "enqueue", + side_effect=DuplicateJobError("already enqueued"), + ) + + startup._enqueue_recompute_save_hashes_if_needed() def test_enqueue_recompute_swallows_count_error(mocker):