mirror of
https://github.com/rommapp/romm.git
synced 2026-06-28 06:46:00 +00:00
[ROMM-2338] Consistent pending job fetch in watcher
This commit is contained in:
@@ -7,7 +7,8 @@ from typing import cast
|
||||
|
||||
import sentry_sdk
|
||||
from opentelemetry import trace
|
||||
from rq.job import Job
|
||||
from rq import Worker
|
||||
from rq.job import Job, JobStatus
|
||||
|
||||
from config import (
|
||||
ENABLE_RESCAN_ON_FILESYSTEM_CHANGE,
|
||||
@@ -32,6 +33,7 @@ from handler.metadata import (
|
||||
meta_ss_handler,
|
||||
meta_tgdb_handler,
|
||||
)
|
||||
from handler.redis_handler import low_prio_queue, redis_client
|
||||
from handler.scan_handler import MetadataSource, ScanType
|
||||
from logger.formatter import CYAN
|
||||
from logger.formatter import highlight as hl
|
||||
@@ -67,17 +69,60 @@ VALID_EVENTS = frozenset(
|
||||
Change = tuple[EventType, str]
|
||||
|
||||
|
||||
def get_pending_scan_jobs() -> list[Job]:
|
||||
"""Get all pending scan jobs (scheduled, queued, or running) for scan_platforms function.
|
||||
|
||||
Returns:
|
||||
list[Job]: List of pending scan jobs that are not completed or failed
|
||||
"""
|
||||
pending_jobs = []
|
||||
|
||||
# Get jobs from the scheduler (delayed/scheduled jobs)
|
||||
scheduled_jobs = tasks_scheduler.get_jobs()
|
||||
for job in scheduled_jobs:
|
||||
if (
|
||||
isinstance(job, Job)
|
||||
and job.func_name == "endpoints.sockets.scan.scan_platforms"
|
||||
and job.get_status()
|
||||
in [JobStatus.SCHEDULED, JobStatus.QUEUED, JobStatus.STARTED]
|
||||
):
|
||||
pending_jobs.append(job)
|
||||
|
||||
# Get jobs from the queue (immediate jobs)
|
||||
queue_jobs = low_prio_queue.get_jobs()
|
||||
for job in queue_jobs:
|
||||
if (
|
||||
isinstance(job, Job)
|
||||
and job.func_name == "endpoints.sockets.scan.scan_platforms"
|
||||
and job.get_status() in [JobStatus.QUEUED, JobStatus.STARTED]
|
||||
):
|
||||
pending_jobs.append(job)
|
||||
|
||||
# Get currently running jobs from workers
|
||||
workers = Worker.all(connection=redis_client)
|
||||
for worker in workers:
|
||||
current_job = worker.get_current_job()
|
||||
if (
|
||||
current_job
|
||||
and current_job.func_name == "endpoints.sockets.scan.scan_platforms"
|
||||
and current_job.get_status() == JobStatus.STARTED
|
||||
):
|
||||
pending_jobs.append(current_job)
|
||||
|
||||
return pending_jobs
|
||||
|
||||
|
||||
def process_changes(changes: Sequence[Change]) -> None:
|
||||
if not ENABLE_RESCAN_ON_FILESYSTEM_CHANGE:
|
||||
return
|
||||
|
||||
# Filter for valid events.
|
||||
# Filter for valid events
|
||||
changes = [change for change in changes if change[0] in VALID_EVENTS]
|
||||
if not changes:
|
||||
return
|
||||
|
||||
with tracer.start_as_current_span("process_changes"):
|
||||
# Find affected platform slugs.
|
||||
# Find affected platform slugs
|
||||
fs_slugs: set[str] = set()
|
||||
changes_platform_directory = False
|
||||
for change in changes:
|
||||
@@ -101,7 +146,7 @@ def process_changes(changes: Sequence[Change]) -> None:
|
||||
log.info("No valid filesystem slugs found in changes, exiting...")
|
||||
return
|
||||
|
||||
# Check whether any metadata source is enabled.
|
||||
# Check whether any metadata source is enabled
|
||||
source_mapping: dict[str, bool] = {
|
||||
MetadataSource.IGDB: meta_igdb_handler.is_enabled(),
|
||||
MetadataSource.SS: meta_ss_handler.is_enabled(),
|
||||
@@ -119,23 +164,21 @@ def process_changes(changes: Sequence[Change]) -> None:
|
||||
log.warning("No metadata sources enabled, skipping rescan")
|
||||
return
|
||||
|
||||
# Get currently scheduled jobs for the scan_platforms function.
|
||||
already_scheduled_jobs = [
|
||||
job
|
||||
for job in tasks_scheduler.get_jobs()
|
||||
if isinstance(job, Job)
|
||||
and job.func_name == "endpoints.sockets.scan.scan_platforms"
|
||||
]
|
||||
# Get currently pending scan jobs (scheduled, queued, or running)
|
||||
pending_jobs = get_pending_scan_jobs()
|
||||
|
||||
# If a full rescan is already scheduled, skip further processing.
|
||||
if any(job.args[0] == [] for job in already_scheduled_jobs):
|
||||
log.info("Full rescan already scheduled")
|
||||
# If a full rescan is already scheduled, skip further processing
|
||||
full_rescan_jobs = [
|
||||
job for job in pending_jobs if job.args and job.args[0] == []
|
||||
]
|
||||
if full_rescan_jobs:
|
||||
log.info(f"Full rescan already scheduled ({len(full_rescan_jobs)} job(s))")
|
||||
return
|
||||
|
||||
time_delta = timedelta(minutes=RESCAN_ON_FILESYSTEM_CHANGE_DELAY)
|
||||
rescan_in_msg = f"rescanning in {hl(str(RESCAN_ON_FILESYSTEM_CHANGE_DELAY), color=CYAN)} minutes."
|
||||
|
||||
# Any change to a platform directory should trigger a full rescan.
|
||||
# Any change to a platform directory should trigger a full rescan
|
||||
if changes_platform_directory:
|
||||
log.info(f"Platform directory changed, {rescan_in_msg}")
|
||||
tasks_scheduler.enqueue_in(
|
||||
@@ -153,16 +196,23 @@ def process_changes(changes: Sequence[Change]) -> None:
|
||||
)
|
||||
return
|
||||
|
||||
# Otherwise, process each platform slug.
|
||||
# Otherwise, process each platform slug
|
||||
for fs_slug in fs_slugs:
|
||||
# TODO: Query platforms from the database in bulk.
|
||||
# TODO: Query platforms from the database in bulk
|
||||
db_platform = db_platform_handler.get_platform_by_fs_slug(fs_slug)
|
||||
if not db_platform:
|
||||
continue
|
||||
|
||||
# Skip if a scan is already scheduled for this platform.
|
||||
if any(db_platform.id in job.args[0] for job in already_scheduled_jobs):
|
||||
log.info(f"Scan already scheduled for {hl(fs_slug)}")
|
||||
# Skip if a scan is already scheduled for this platform
|
||||
platform_scan_jobs = [
|
||||
job
|
||||
for job in pending_jobs
|
||||
if job.args and db_platform.id in job.args[0]
|
||||
]
|
||||
if platform_scan_jobs:
|
||||
log.info(
|
||||
f"Scan already scheduled for {hl(fs_slug)} ({len(platform_scan_jobs)} job(s))"
|
||||
)
|
||||
continue
|
||||
|
||||
log.info(f"Change detected in {hl(fs_slug)} folder, {rescan_in_msg}")
|
||||
|
||||
Reference in New Issue
Block a user