run watchdog as a native task

This commit is contained in:
Georges-Antoine Assi
2025-08-06 23:25:47 -04:00
parent e156158a3e
commit 7e0cb2272d
3 changed files with 77 additions and 99 deletions

View File

@@ -1,4 +1,5 @@
import os
import sys
from datetime import timedelta
import sentry_sdk
@@ -23,99 +24,74 @@ from watchdog.events import (
DirDeletedEvent,
FileCreatedEvent,
FileDeletedEvent,
FileSystemEvent,
FileSystemEventHandler,
FileSystemMovedEvent,
)
from watchdog.observers import Observer
sentry_sdk.init(
dsn=SENTRY_DSN,
release=f"romm@{get_version()}",
)
path = (
cm.get_config().HIGH_PRIO_STRUCTURE_PATH
if os.path.exists(cm.get_config().HIGH_PRIO_STRUCTURE_PATH)
else LIBRARY_BASE_PATH
)
structure_level = 2 if os.path.exists(cm.get_config().HIGH_PRIO_STRUCTURE_PATH) else 1
valid_events = [
DirCreatedEvent.event_type,
DirDeletedEvent.event_type,
FileCreatedEvent.event_type,
FileDeletedEvent.event_type,
FileSystemMovedEvent.event_type,
]
class EventHandler(FileSystemEventHandler):
"""Filesystem event handler"""
def on_any_event(src_path: str, _dest_path: str, event_type: str, object: str):
if event_type not in valid_events:
return
def on_any_event(self, event: FileSystemEvent) -> None:
"""Catch-all event handler.
if not ENABLE_RESCAN_ON_FILESYSTEM_CHANGE:
return
Args:
event: The event object representing the file system event.
"""
if not ENABLE_RESCAN_ON_FILESYSTEM_CHANGE:
return None
src_path = os.fsdecode(src_path)
src_path = os.fsdecode(event.src_path)
event_src = src_path.split(LIBRARY_BASE_PATH)[-1]
fs_slug = event_src.split("/")[structure_level]
db_platform = db_platform_handler.get_platform_by_fs_slug(fs_slug)
# Ignore .DS_Store files
if src_path.endswith(".DS_Store"):
return None
log.info(f"Filesystem event: {event_type} {event_src} {fs_slug} {db_platform}")
event_src = src_path.split(path)[-1]
fs_slug = event_src.split("/")[1]
db_platform = db_platform_handler.get_platform_by_fs_slug(fs_slug)
# Skip if a scan is already scheduled
for job in tasks_scheduler.get_jobs():
if isinstance(job, Job):
if job.func_name == "endpoints.sockets.scan.scan_platforms":
if job.args[0] == []:
log.info("Full rescan already scheduled")
return
log.info(f"Filesystem event: {event.event_type} {event_src}")
if db_platform and db_platform.id in job.args[0]:
log.info(f"Scan already scheduled for {hl(fs_slug)}")
return
# Skip if a scan is already scheduled
for job in tasks_scheduler.get_jobs():
if isinstance(job, Job):
if job.func_name == "endpoints.sockets.scan.scan_platforms":
if job.args[0] == []:
log.info("Full rescan already scheduled")
return None
time_delta = timedelta(minutes=RESCAN_ON_FILESYSTEM_CHANGE_DELAY)
rescan_in_msg = f"rescanning in {hl(str(RESCAN_ON_FILESYSTEM_CHANGE_DELAY), color=CYAN)} minutes."
if db_platform and db_platform.id in job.args[0]:
log.info(f"Scan already scheduled for {hl(fs_slug)}")
return None
time_delta = timedelta(minutes=RESCAN_ON_FILESYSTEM_CHANGE_DELAY)
rescan_in_msg = f"rescanning in {hl(str(RESCAN_ON_FILESYSTEM_CHANGE_DELAY), color=CYAN)} minutes."
# Any change to a platform directory should trigger a full rescan
if event.is_directory and event_src.count("/") == 1:
log.info(f"Platform directory changed, {rescan_in_msg}")
tasks_scheduler.enqueue_in(time_delta, scan_platforms, [])
elif db_platform:
# Otherwise trigger a rescan for the specific platform
log.info(f"Change detected in {hl(fs_slug)} folder, {rescan_in_msg}")
tasks_scheduler.enqueue_in(
time_delta,
scan_platforms,
[db_platform.id],
scan_type=ScanType.QUICK,
)
# Any change to a platform directory should trigger a full rescan
if object == "directory" and event_src.count("/") == 1:
log.info(f"Platform directory changed, {rescan_in_msg}")
tasks_scheduler.enqueue_in(time_delta, scan_platforms, [])
elif db_platform:
# Otherwise trigger a rescan for the specific platform
log.info(f"Change detected in {hl(fs_slug)} folder, {rescan_in_msg}")
tasks_scheduler.enqueue_in(
time_delta,
scan_platforms,
[db_platform.id],
scan_type=ScanType.QUICK,
)
if __name__ == "__main__":
observer = Observer()
observer.schedule(
EventHandler(),
path,
recursive=True,
event_filter=[
DirCreatedEvent,
DirDeletedEvent,
FileCreatedEvent,
FileDeletedEvent,
FileSystemMovedEvent,
],
)
observer.start()
watch_src_path = sys.argv[1]
watch_dest_path = sys.argv[2]
watch_event_type = sys.argv[3]
watch_object = sys.argv[4]
log.info(f"Watching {hl(path)} for changes")
try:
while observer.is_alive():
observer.join(1)
finally:
observer.stop()
observer.join()
on_any_event(watch_src_path, watch_dest_path, watch_event_type, watch_object)

View File

@@ -198,35 +198,29 @@ start_bin_rq_worker() {
high default low &
}
# function that runs our independent python scripts and creates corresponding PID files,
start_python() {
SCRIPT="${1}"
info_log "Starting ${SCRIPT}"
python3 "${SCRIPT}.py" &
start_bin_watcher() {
info_log "Starting watcher"
watchmedo shell-command \
--patterns='**/*' \
--ignore-patterns='.DS_Store' \
--recursive \
--command='uv run python watcher.py "${watch_src_path}" "${watch_dest_path}" "${watch_event_type}" "${watch_object}"' \
/romm/library &
WATCHER_PID=$!
echo "${WATCHER_PID}" >"/tmp/${SCRIPT}.pid"
echo "${WATCHER_PID}" >/tmp/watcher.pid
}
watchdog_process_pid() {
TYPE=$1
PROCESS=$2
if [[ -f "/tmp/${PROCESS}.pid" ]]; then
# check if the pid we last wrote to our state file is actually active
# Check if the pid we last wrote to our state file is actually active
PID=$(cat "/tmp/${PROCESS}.pid") || true
if [[ ! -d "/proc/${PID}" ]]; then
if [[ ${TYPE} == "bin" ]]; then
start_bin_"${PROCESS}"
elif [[ ${TYPE} == "python" ]]; then
start_python "${PROCESS}"
fi
start_bin_"${PROCESS}"
fi
else
# start process if we dont have a corresponding PID file
if [[ ${TYPE} == "bin" ]]; then
start_bin_"${PROCESS}"
elif [[ ${TYPE} == "python" ]]; then
start_python "${PROCESS}"
fi
# Start process if we dont have a corresponding PID file
start_bin_"${PROCESS}"
fi
}
@@ -267,7 +261,7 @@ rm /tmp/*.pid -f
# Start Valkey server if REDIS_HOST is not set (which would mean user is using an external Redis/Valkey)
if [[ -z ${REDIS_HOST} ]]; then
watchdog_process_pid bin valkey-server
watchdog_process_pid valkey-server
else
warn_log "REDIS_HOST is set, not starting internal valkey-server"
fi
@@ -282,21 +276,21 @@ fi
# main loop
while ! ((exited)); do
watchdog_process_pid bin gunicorn
watchdog_process_pid gunicorn
# only start the scheduler if enabled
if [[ ${ENABLE_SCHEDULED_RESCAN} == "true" || ${ENABLE_SCHEDULED_UPDATE_SWITCH_TITLEDB} == "true" || ${ENABLE_SCHEDULED_UPDATE_LAUNCHBOX_METADATA} == "true" ]]; then
watchdog_process_pid bin rq_scheduler
watchdog_process_pid rq_scheduler
fi
watchdog_process_pid bin rq_worker
watchdog_process_pid rq_worker
# only start the watcher if enabled
if [[ ${ENABLE_RESCAN_ON_FILESYSTEM_CHANGE} == "true" ]]; then
watchdog_process_pid python watcher
watchdog_process_pid watcher
fi
watchdog_process_pid bin nginx
watchdog_process_pid nginx
# check for died processes every 5 seconds
sleep 5

View File

@@ -64,6 +64,14 @@ PYTHONPATH="/app/backend:${PYTHONPATH-}" rq worker \
--url "${REDIS_URL}" \
high default low &
echo "Starting watcher..."
watchmedo shell-command \
--patterns='**/*' \
--ignore-patterns='.DS_Store' \
--recursive \
--command='uv run python watcher.py "${watch_src_path}" "${watch_dest_path}" "${watch_event_type}" "${watch_object}"' \
/app/romm/library &
# Start the frontend dev server
cd /app/frontend
npm run dev &