feat: Migrate watcher to use watchfiles library

The `watchfiles` library supports event batching, which allows us to
process multiple filesystem changes in a single run.

This change also minimizes database calls made from the watcher
process.
This commit is contained in:
Michael Manganiello
2025-08-07 13:01:45 -03:00
parent 1e6bfffe92
commit 76db0ab98c
5 changed files with 88 additions and 95 deletions

View File

@@ -1,6 +1,9 @@
import enum
import json
import os
import sys
from collections.abc import Sequence
from datetime import timedelta
from typing import cast
import sentry_sdk
from config import (
@@ -26,13 +29,6 @@ from logger.logger import log
from rq.job import Job
from tasks.tasks import tasks_scheduler
from utils import get_version
from watchdog.events import (
DirCreatedEvent,
DirDeletedEvent,
FileCreatedEvent,
FileDeletedEvent,
FileSystemMovedEvent,
)
sentry_sdk.init(
dsn=SENTRY_DSN,
@@ -41,58 +37,60 @@ sentry_sdk.init(
structure_level = 2 if os.path.exists(cm.get_config().HIGH_PRIO_STRUCTURE_PATH) else 1
valid_events = frozenset(
@enum.unique
class EventType(enum.StrEnum):
ADDED = "added"
MODIFIED = "modified"
DELETED = "deleted"
VALID_EVENTS = frozenset(
(
DirCreatedEvent.event_type,
DirDeletedEvent.event_type,
FileCreatedEvent.event_type,
FileDeletedEvent.event_type,
FileSystemMovedEvent.event_type,
EventType.ADDED,
EventType.DELETED,
)
)
# A Change mirrors one entry of the JSON payload watchfiles puts in the
# WATCHFILES_CHANGES environment variable: the first element is the event
# type, the second is the path of the file or directory that changed.
Change = tuple[EventType, str]
def on_any_event(
src_path: str,
_dest_path: str,
event_type: str,
):
if event_type not in valid_events:
return
def process_changes(changes: Sequence[Change]) -> None:
if not ENABLE_RESCAN_ON_FILESYSTEM_CHANGE:
return
src_path = os.fsdecode(src_path)
event_src = src_path.split(LIBRARY_BASE_PATH)[-1]
event_src_parts = event_src.split("/")
if len(event_src_parts) <= structure_level:
log.warning(
f"Filesystem event path '{event_src}' does not have enough segments for structure_level {structure_level}. Skipping event."
)
# Filter for valid events.
changes = [change for change in changes if change[0] in VALID_EVENTS]
if not changes:
return
fs_slug = event_src_parts[structure_level]
db_platform = db_platform_handler.get_platform_by_fs_slug(fs_slug)
# Find affected platform slugs.
fs_slugs: set[str] = set()
changes_platform_directory = False
for change in changes:
event_type, change_path = change
src_path = os.fsdecode(change_path)
event_src = src_path.split(LIBRARY_BASE_PATH)[-1]
event_src_parts = event_src.split("/")
if len(event_src_parts) <= structure_level:
log.warning(
f"Filesystem event path '{event_src}' does not have enough segments for structure_level {structure_level}. Skipping event."
)
continue
log.info(f"Filesystem event: {event_type} {event_src}")
if len(event_src_parts) == structure_level + 1:
changes_platform_directory = True
# Skip if a scan is already scheduled
for job in tasks_scheduler.get_jobs():
if isinstance(job, Job):
if job.func_name == "endpoints.sockets.scan.scan_platforms":
if job.args[0] == []:
log.info("Full rescan already scheduled")
return
log.info(f"Filesystem event: {event_type} {event_src}")
fs_slugs.add(event_src_parts[structure_level])
if db_platform and db_platform.id in job.args[0]:
log.info(f"Scan already scheduled for {hl(fs_slug)}")
return
time_delta = timedelta(minutes=RESCAN_ON_FILESYSTEM_CHANGE_DELAY)
rescan_in_msg = f"rescanning in {hl(str(RESCAN_ON_FILESYSTEM_CHANGE_DELAY), color=CYAN)} minutes."
if not fs_slugs:
log.info("No valid filesystem slugs found in changes, exiting...")
return
# Check whether any metadata source is enabled.
source_mapping: dict[str, bool] = {
MetadataSource.IGDB: IGDB_API_ENABLED,
MetadataSource.SS: SS_API_ENABLED,
@@ -102,14 +100,29 @@ def on_any_event(
MetadataSource.HASHEOUS: HASHEOUS_API_ENABLED,
MetadataSource.SGDB: STEAMGRIDDB_API_ENABLED,
}
metadata_sources = [source for source, flag in source_mapping.items() if flag]
if not metadata_sources:
log.warning("No metadata sources enabled, skipping rescan")
return
# Any change to a platform directory should trigger a full rescan
if len(event_src_parts) == structure_level + 1:
# Get currently scheduled jobs for the scan_platforms function.
already_scheduled_jobs = [
job
for job in tasks_scheduler.get_jobs()
if isinstance(job, Job)
and job.func_name == "endpoints.sockets.scan.scan_platforms"
]
# If a full rescan is already scheduled, skip further processing.
if any(job.args[0] == [] for job in already_scheduled_jobs):
log.info("Full rescan already scheduled")
return
time_delta = timedelta(minutes=RESCAN_ON_FILESYSTEM_CHANGE_DELAY)
rescan_in_msg = f"rescanning in {hl(str(RESCAN_ON_FILESYSTEM_CHANGE_DELAY), color=CYAN)} minutes."
# Any change to a platform directory should trigger a full rescan.
if changes_platform_directory:
log.info(f"Platform directory changed, {rescan_in_msg}")
tasks_scheduler.enqueue_in(
time_delta,
@@ -118,8 +131,20 @@ def on_any_event(
scan_type=ScanType.UNIDENTIFIED,
metadata_sources=metadata_sources,
)
# Otherwise trigger a rescan for the specific platform
elif db_platform:
return
# Otherwise, process each platform slug.
for fs_slug in fs_slugs:
# TODO: Query platforms from the database in bulk.
db_platform = db_platform_handler.get_platform_by_fs_slug(fs_slug)
if not db_platform:
continue
# Skip if a scan is already scheduled for this platform.
if any(db_platform.id in job.args[0] for job in already_scheduled_jobs):
log.info(f"Scan already scheduled for {hl(fs_slug)}")
continue
log.info(f"Change detected in {hl(fs_slug)} folder, {rescan_in_msg}")
tasks_scheduler.enqueue_in(
time_delta,
@@ -131,8 +156,6 @@ def on_any_event(
if __name__ == "__main__":
watch_src_path = sys.argv[1]
watch_dest_path = sys.argv[2]
watch_event_type = sys.argv[3]
on_any_event(watch_src_path, watch_dest_path, watch_event_type)
changes = cast(list[Change], json.loads(os.getenv("WATCHFILES_CHANGES", "[]")))
if changes:
process_changes(changes)

View File

@@ -201,11 +201,9 @@ start_bin_rq_worker() {
start_bin_watcher() {
info_log "Starting watcher"
watchmedo shell-command \
--patterns='**/*' \
--ignore-patterns='.DS_Store' \
--recursive \
--command='uv run python watcher.py "${watch_src_path}" "${watch_dest_path}" "${watch_event_type}" "${watch_object}"' \
watchfiles \
--target-type command \
'uv run python watcher.py' \
/romm/library &
WATCHER_PID=$!
echo "${WATCHER_PID}" >/tmp/watcher.pid

View File

@@ -67,11 +67,9 @@ PYTHONPATH="/app/backend:${PYTHONPATH-}" rq worker \
high default low &
echo "Starting watcher..."
watchmedo shell-command \
--patterns='**/*' \
--ignore-patterns='.DS_Store' \
--recursive \
--command='uv run python watcher.py "${watch_src_path}" "${watch_dest_path}" "${watch_event_type}" "${watch_object}"' \
watchfiles \
--target-type command \
'uv run python watcher.py' \
/app/romm/library &
# Start the frontend dev server

View File

@@ -48,7 +48,7 @@ dependencies = [
"user-agents ~= 2.2",
"uvicorn ~= 0.35",
"uvicorn-worker ~= 0.3",
"watchdog[watchmedo] ~= 6.0",
"watchfiles ~= 1.1",
"yarl ~= 1.14",
"zipfile-inflate64 ~= 0.1",
]

32
uv.lock generated
View File

@@ -1,5 +1,5 @@
version = 1
revision = 2
revision = 3
requires-python = ">=3.13"
resolution-markers = [
"platform_python_implementation != 'PyPy'",
@@ -1723,7 +1723,7 @@ dependencies = [
{ name = "user-agents" },
{ name = "uvicorn" },
{ name = "uvicorn-worker" },
{ name = "watchdog", extra = ["watchmedo"] },
{ name = "watchfiles" },
{ name = "yarl" },
{ name = "zipfile-inflate64" },
]
@@ -1795,7 +1795,7 @@ requires-dist = [
{ name = "user-agents", specifier = "~=2.2" },
{ name = "uvicorn", specifier = "~=0.35" },
{ name = "uvicorn-worker", specifier = "~=0.3" },
{ name = "watchdog", extras = ["watchmedo"], specifier = "~=6.0" },
{ name = "watchfiles", specifier = "~=1.1" },
{ name = "yarl", specifier = "~=1.14" },
{ name = "zipfile-inflate64", specifier = "~=0.1" },
]
@@ -2317,32 +2317,6 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/13/5d/1f15b252890c968d42b348d1e9b0aa12d5bf3e776704178ec37cceccdb63/vcrpy-7.0.0-py2.py3-none-any.whl", hash = "sha256:55791e26c18daa363435054d8b35bd41a4ac441b6676167635d1b37a71dbe124", size = 42321, upload-time = "2024-12-31T00:07:55.277Z" },
]
[[package]]
name = "watchdog"
version = "6.0.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/db/7d/7f3d619e951c88ed75c6037b246ddcf2d322812ee8ea189be89511721d54/watchdog-6.0.0.tar.gz", hash = "sha256:9ddf7c82fda3ae8e24decda1338ede66e1c99883db93711d8fb941eaa2d8c282", size = 131220, upload-time = "2024-11-01T14:07:13.037Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/68/98/b0345cabdce2041a01293ba483333582891a3bd5769b08eceb0d406056ef/watchdog-6.0.0-cp313-cp313-macosx_10_13_universal2.whl", hash = "sha256:490ab2ef84f11129844c23fb14ecf30ef3d8a6abafd3754a6f75ca1e6654136c", size = 96480, upload-time = "2024-11-01T14:06:42.952Z" },
{ url = "https://files.pythonhosted.org/packages/85/83/cdf13902c626b28eedef7ec4f10745c52aad8a8fe7eb04ed7b1f111ca20e/watchdog-6.0.0-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:76aae96b00ae814b181bb25b1b98076d5fc84e8a53cd8885a318b42b6d3a5134", size = 88451, upload-time = "2024-11-01T14:06:45.084Z" },
{ url = "https://files.pythonhosted.org/packages/fe/c4/225c87bae08c8b9ec99030cd48ae9c4eca050a59bf5c2255853e18c87b50/watchdog-6.0.0-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:a175f755fc2279e0b7312c0035d52e27211a5bc39719dd529625b1930917345b", size = 89057, upload-time = "2024-11-01T14:06:47.324Z" },
{ url = "https://files.pythonhosted.org/packages/a9/c7/ca4bf3e518cb57a686b2feb4f55a1892fd9a3dd13f470fca14e00f80ea36/watchdog-6.0.0-py3-none-manylinux2014_aarch64.whl", hash = "sha256:7607498efa04a3542ae3e05e64da8202e58159aa1fa4acddf7678d34a35d4f13", size = 79079, upload-time = "2024-11-01T14:06:59.472Z" },
{ url = "https://files.pythonhosted.org/packages/5c/51/d46dc9332f9a647593c947b4b88e2381c8dfc0942d15b8edc0310fa4abb1/watchdog-6.0.0-py3-none-manylinux2014_armv7l.whl", hash = "sha256:9041567ee8953024c83343288ccc458fd0a2d811d6a0fd68c4c22609e3490379", size = 79078, upload-time = "2024-11-01T14:07:01.431Z" },
{ url = "https://files.pythonhosted.org/packages/d4/57/04edbf5e169cd318d5f07b4766fee38e825d64b6913ca157ca32d1a42267/watchdog-6.0.0-py3-none-manylinux2014_i686.whl", hash = "sha256:82dc3e3143c7e38ec49d61af98d6558288c415eac98486a5c581726e0737c00e", size = 79076, upload-time = "2024-11-01T14:07:02.568Z" },
{ url = "https://files.pythonhosted.org/packages/ab/cc/da8422b300e13cb187d2203f20b9253e91058aaf7db65b74142013478e66/watchdog-6.0.0-py3-none-manylinux2014_ppc64.whl", hash = "sha256:212ac9b8bf1161dc91bd09c048048a95ca3a4c4f5e5d4a7d1b1a7d5752a7f96f", size = 79077, upload-time = "2024-11-01T14:07:03.893Z" },
{ url = "https://files.pythonhosted.org/packages/2c/3b/b8964e04ae1a025c44ba8e4291f86e97fac443bca31de8bd98d3263d2fcf/watchdog-6.0.0-py3-none-manylinux2014_ppc64le.whl", hash = "sha256:e3df4cbb9a450c6d49318f6d14f4bbc80d763fa587ba46ec86f99f9e6876bb26", size = 79078, upload-time = "2024-11-01T14:07:05.189Z" },
{ url = "https://files.pythonhosted.org/packages/62/ae/a696eb424bedff7407801c257d4b1afda455fe40821a2be430e173660e81/watchdog-6.0.0-py3-none-manylinux2014_s390x.whl", hash = "sha256:2cce7cfc2008eb51feb6aab51251fd79b85d9894e98ba847408f662b3395ca3c", size = 79077, upload-time = "2024-11-01T14:07:06.376Z" },
{ url = "https://files.pythonhosted.org/packages/b5/e8/dbf020b4d98251a9860752a094d09a65e1b436ad181faf929983f697048f/watchdog-6.0.0-py3-none-manylinux2014_x86_64.whl", hash = "sha256:20ffe5b202af80ab4266dcd3e91aae72bf2da48c0d33bdb15c66658e685e94e2", size = 79078, upload-time = "2024-11-01T14:07:07.547Z" },
{ url = "https://files.pythonhosted.org/packages/07/f6/d0e5b343768e8bcb4cda79f0f2f55051bf26177ecd5651f84c07567461cf/watchdog-6.0.0-py3-none-win32.whl", hash = "sha256:07df1fdd701c5d4c8e55ef6cf55b8f0120fe1aef7ef39a1c6fc6bc2e606d517a", size = 79065, upload-time = "2024-11-01T14:07:09.525Z" },
{ url = "https://files.pythonhosted.org/packages/db/d9/c495884c6e548fce18a8f40568ff120bc3a4b7b99813081c8ac0c936fa64/watchdog-6.0.0-py3-none-win_amd64.whl", hash = "sha256:cbafb470cf848d93b5d013e2ecb245d4aa1c8fd0504e863ccefa32445359d680", size = 79070, upload-time = "2024-11-01T14:07:10.686Z" },
{ url = "https://files.pythonhosted.org/packages/33/e8/e40370e6d74ddba47f002a32919d91310d6074130fe4e17dabcafc15cbf1/watchdog-6.0.0-py3-none-win_ia64.whl", hash = "sha256:a1914259fa9e1454315171103c6a30961236f508b9b623eae470268bbcc6a22f", size = 79067, upload-time = "2024-11-01T14:07:11.845Z" },
]
[package.optional-dependencies]
watchmedo = [
{ name = "pyyaml" },
]
[[package]]
name = "watchfiles"
version = "1.1.0"