def get_pending_scan_jobs() -> list[Job]:
    """Return the pending ``scan_platforms`` jobs, each listed at most once.

    Relies on these module-level names: ``tasks_scheduler``,
    ``low_prio_queue``, ``redis_client``, ``Worker``, ``Job`` and
    ``JobStatus``.

    Three sources are inspected, in this order:

    1. ``tasks_scheduler.get_jobs()`` - jobs accepted when their status is
       SCHEDULED, QUEUED or STARTED.
    2. ``low_prio_queue.get_jobs()`` - jobs accepted when their status is
       QUEUED or STARTED.
    3. ``worker.get_current_job()`` for every worker returned by
       ``Worker.all(connection=redis_client)`` - accepted when the status
       is STARTED.

    The sources are read one after another, so a job that moves between
    them while this function runs can be reported by more than one source.
    Jobs are therefore de-duplicated by ``job.id``; callers that log
    ``len()`` of the result get an accurate count.

    Returns:
        list[Job]: Matching jobs in first-seen order, without duplicates.
    """
    scan_func_name = "endpoints.sockets.scan.scan_platforms"

    # Keyed by job id; dicts keep insertion order, so the result order is
    # the order in which jobs were first discovered.
    pending_by_id: dict[str, Job] = {}

    def _collect(job: Job, accepted_statuses: tuple[JobStatus, ...]) -> None:
        """Record ``job`` if it is a not-yet-seen scan job in an accepted status."""
        if job.func_name != scan_func_name:
            return
        if job.id in pending_by_id:
            return
        # The status lookup comes last so it only runs for scan jobs that
        # have not been recorded yet.
        if job.get_status() in accepted_statuses:
            pending_by_id[job.id] = job

    # Jobs from the scheduler (delayed/scheduled jobs)
    for job in tasks_scheduler.get_jobs():
        if isinstance(job, Job):
            _collect(
                job,
                (JobStatus.SCHEDULED, JobStatus.QUEUED, JobStatus.STARTED),
            )

    # Jobs from the queue (immediate jobs)
    for job in low_prio_queue.get_jobs():
        if isinstance(job, Job):
            _collect(job, (JobStatus.QUEUED, JobStatus.STARTED))

    # Jobs currently being executed by workers
    for worker in Worker.all(connection=redis_client):
        current_job = worker.get_current_job()
        if current_job:
            _collect(current_job, (JobStatus.STARTED,))

    return list(pending_by_id.values())
def process_changes(changes: Sequence[Change]) -> None: if not ENABLE_RESCAN_ON_FILESYSTEM_CHANGE: return - # Filter for valid events. + # Filter for valid events changes = [change for change in changes if change[0] in VALID_EVENTS] if not changes: return with tracer.start_as_current_span("process_changes"): - # Find affected platform slugs. + # Find affected platform slugs fs_slugs: set[str] = set() changes_platform_directory = False for change in changes: @@ -101,7 +146,7 @@ def process_changes(changes: Sequence[Change]) -> None: log.info("No valid filesystem slugs found in changes, exiting...") return - # Check whether any metadata source is enabled. + # Check whether any metadata source is enabled source_mapping: dict[str, bool] = { MetadataSource.IGDB: meta_igdb_handler.is_enabled(), MetadataSource.SS: meta_ss_handler.is_enabled(), @@ -119,23 +164,21 @@ def process_changes(changes: Sequence[Change]) -> None: log.warning("No metadata sources enabled, skipping rescan") return - # Get currently scheduled jobs for the scan_platforms function. - already_scheduled_jobs = [ - job - for job in tasks_scheduler.get_jobs() - if isinstance(job, Job) - and job.func_name == "endpoints.sockets.scan.scan_platforms" - ] + # Get currently pending scan jobs (scheduled, queued, or running) + pending_jobs = get_pending_scan_jobs() - # If a full rescan is already scheduled, skip further processing. - if any(job.args[0] == [] for job in already_scheduled_jobs): - log.info("Full rescan already scheduled") + # If a full rescan is already scheduled, skip further processing + full_rescan_jobs = [ + job for job in pending_jobs if job.args and job.args[0] == [] + ] + if full_rescan_jobs: + log.info(f"Full rescan already scheduled ({len(full_rescan_jobs)} job(s))") return time_delta = timedelta(minutes=RESCAN_ON_FILESYSTEM_CHANGE_DELAY) rescan_in_msg = f"rescanning in {hl(str(RESCAN_ON_FILESYSTEM_CHANGE_DELAY), color=CYAN)} minutes." 
- # Any change to a platform directory should trigger a full rescan. + # Any change to a platform directory should trigger a full rescan if changes_platform_directory: log.info(f"Platform directory changed, {rescan_in_msg}") tasks_scheduler.enqueue_in( @@ -153,16 +196,23 @@ def process_changes(changes: Sequence[Change]) -> None: ) return - # Otherwise, process each platform slug. + # Otherwise, process each platform slug for fs_slug in fs_slugs: - # TODO: Query platforms from the database in bulk. + # TODO: Query platforms from the database in bulk db_platform = db_platform_handler.get_platform_by_fs_slug(fs_slug) if not db_platform: continue - # Skip if a scan is already scheduled for this platform. - if any(db_platform.id in job.args[0] for job in already_scheduled_jobs): - log.info(f"Scan already scheduled for {hl(fs_slug)}") + # Skip if a scan is already scheduled for this platform + platform_scan_jobs = [ + job + for job in pending_jobs + if job.args and db_platform.id in job.args[0] + ] + if platform_scan_jobs: + log.info( + f"Scan already scheduled for {hl(fs_slug)} ({len(platform_scan_jobs)} job(s))" + ) continue log.info(f"Change detected in {hl(fs_slug)} folder, {rescan_in_msg}")