diff --git a/backend/startup.py b/backend/startup.py index a1c4a80e9..f5266d180 100644 --- a/backend/startup.py +++ b/backend/startup.py @@ -4,6 +4,7 @@ import asyncio import sentry_sdk from opentelemetry import trace +from rq.exceptions import DuplicateJobError from config import ( ENABLE_SCHEDULED_CONVERT_IMAGES_TO_WEBP, @@ -47,6 +48,8 @@ from utils.context import initialize_context tracer = trace.get_tracer(__name__) +RECOMPUTE_SAVE_HASHES_JOB_ID = "recompute_save_content_hashes:bootstrap" + def _enqueue_recompute_save_hashes_if_needed() -> None: """Backfill content_hash for saves uploaded before the path-resolution @@ -71,6 +74,8 @@ def _enqueue_recompute_save_hashes_if_needed() -> None: try: low_prio_queue.enqueue( recompute_save_content_hashes_task.run, + job_id=RECOMPUTE_SAVE_HASHES_JOB_ID, + unique=True, job_timeout=TASK_TIMEOUT, meta={ "task_name": recompute_save_content_hashes_task.title, @@ -81,6 +86,11 @@ def _enqueue_recompute_save_hashes_if_needed() -> None: f"Enqueued recompute_save_content_hashes ({missing} saves with NULL content_hash); " "running on low-priority worker" ) + except DuplicateJobError: + log.info( + "recompute_save_content_hashes already queued or running from a " + "previous restart; skipping enqueue" + ) except Exception: log.exception( "Failed to enqueue recompute_save_content_hashes; admins can run it manually" diff --git a/backend/tests/test_startup.py b/backend/tests/test_startup.py index 9a4afffff..741dcd338 100644 --- a/backend/tests/test_startup.py +++ b/backend/tests/test_startup.py @@ -1,6 +1,7 @@ """Tests for startup-time auto-enqueue of the recompute task.""" import startup +from rq.exceptions import DuplicateJobError def test_enqueue_recompute_skips_when_no_missing_hashes(mocker): @@ -38,6 +39,25 @@ def test_enqueue_recompute_fires_when_missing_hashes_present(mocker): # Job timeout must be passed; otherwise long-running recomputes get killed # by RQ's default short timeout on very large libraries. assert kwargs["job_timeout"] == startup.TASK_TIMEOUT + # Deterministic job_id + unique=True prevent duplicate enqueues across + # API process restarts while a previous recompute is still running. + assert kwargs["job_id"] == startup.RECOMPUTE_SAVE_HASHES_JOB_ID + assert kwargs["unique"] is True + + +def test_enqueue_recompute_silently_skips_duplicate(mocker): + """A DuplicateJobError from RQ (job already queued/running) is logged and + swallowed, not propagated.""" + mocker.patch.object( + startup.db_save_handler, "count_saves_missing_content_hash", return_value=10 + ) + mocker.patch.object( + startup.low_prio_queue, + "enqueue", + side_effect=DuplicateJobError("already enqueued"), + ) + + startup._enqueue_recompute_save_hashes_if_needed() def test_enqueue_recompute_swallows_count_error(mocker):