From 5bb10dacd1995bbbb1ec6c1f6d269abd427126cf Mon Sep 17 00:00:00 2001 From: nendo Date: Fri, 29 May 2026 17:38:49 +0900 Subject: [PATCH] SaveSync: paginate recompute task scan by primary key get_all_saves() materialized every Save row across all users into a single .all() list. On instances with very large libraries that's a real RAM ceiling and pins every row for the lifetime of the recompute run. Replace it with get_saves_after_id(after_id, limit) and have the recompute task drive keyset pagination in PAGE_SIZE-row chunks. SQLAlchemy streaming via .execution_options(yield_per=...) is incompatible with the per-call session lifetime that @begin_session enforces (the session exits before the consumer iterates), so keyset paging from the caller is the cleanest fit. Behavior is unchanged: same row coverage, same idempotency, same counters. Memory usage drops from O(all saves) to O(PAGE_SIZE). --- backend/handler/database/saves_handler.py | 16 +++- .../manual/recompute_save_content_hashes.py | 87 +++++++++++-------- .../handler/database/test_saves_handler.py | 60 +++++++++++++ 3 files changed, 123 insertions(+), 40 deletions(-) diff --git a/backend/handler/database/saves_handler.py b/backend/handler/database/saves_handler.py index 2e2030780..f9577e25e 100644 --- a/backend/handler/database/saves_handler.py +++ b/backend/handler/database/saves_handler.py @@ -196,10 +196,18 @@ class DBSavesHandler(DBBaseHandler): ) @begin_session - def get_all_saves( + def get_saves_after_id( self, + after_id: int, + limit: int, session: Session = None, # type: ignore ) -> Sequence[Save]: - """Every Save row across all users, ordered by id. Used by the - recompute_save_content_hashes maintenance task.""" - return session.scalars(select(Save).order_by(asc(Save.id))).all() + """Page Save rows by primary key. Returns up to ``limit`` rows with + ``id > after_id``, ordered by id. Used by the + recompute_save_content_hashes maintenance task to walk every row in + bounded-memory batches: streaming via ``yield_per`` is incompatible + with the per-call session lifetime that ``@begin_session`` enforces, + so the caller drives pagination with this method instead.""" + return session.scalars( + select(Save).where(Save.id > after_id).order_by(asc(Save.id)).limit(limit) + ).all() diff --git a/backend/tasks/manual/recompute_save_content_hashes.py b/backend/tasks/manual/recompute_save_content_hashes.py index b0d33032a..beae3d395 100644 --- a/backend/tasks/manual/recompute_save_content_hashes.py +++ b/backend/tasks/manual/recompute_save_content_hashes.py @@ -23,6 +23,7 @@ from tasks.tasks import Task, TaskType, update_job_meta from utils.context import initialize_context META_FLUSH_EVERY = 100 +PAGE_SIZE = 1000 @dataclass @@ -69,44 +70,58 @@ class RecomputeSaveContentHashesTask(Task): log.info(f"Starting {self.title} task...") stats = RecomputeSaveHashesStats() - saves = db_save_handler.get_all_saves() - for save in saves: - stats.saves_scanned += 1 + # Keyset-paginate by primary key instead of loading every Save row + # at once. On instances with very large save libraries the full + # table can be hundreds of thousands of rows; .all() would pull + # them all into the worker's RAM and pin them for the whole run. + last_id = 0 + while True: + batch = db_save_handler.get_saves_after_id( + after_id=last_id, limit=PAGE_SIZE + ) + if not batch: + break + + for save in batch: + stats.saves_scanned += 1 + last_id = save.id + + relative_path = f"{save.file_path}/{save.file_name}" + try: + new_hash = await fs_asset_handler.compute_content_hash( + relative_path + ) + except Exception as e: + log.warning( + f"Failed to compute content_hash for save {save.id} " + f"({relative_path}): {e}" + ) + stats.errors += 1 + self._maybe_flush(stats) + continue + + if new_hash is None: + stats.saves_missing_fs += 1 + self._maybe_flush(stats) + continue + + if new_hash == save.content_hash: + stats.saves_unchanged += 1 + self._maybe_flush(stats) + continue + + try: + db_save_handler.update_save(save.id, {"content_hash": new_hash}) + stats.saves_updated += 1 + log.debug( + f"Rewrote content_hash for save {save.id} " + f"({relative_path}): {save.content_hash} -> {new_hash}" + ) + except Exception as e: + log.warning(f"Failed to update save {save.id}: {e}") + stats.errors += 1 - relative_path = f"{save.file_path}/{save.file_name}" - try: - new_hash = await fs_asset_handler.compute_content_hash(relative_path) - except Exception as e: - log.warning( - f"Failed to compute content_hash for save {save.id} " - f"({relative_path}): {e}" - ) - stats.errors += 1 self._maybe_flush(stats) - continue - - if new_hash is None: - stats.saves_missing_fs += 1 - self._maybe_flush(stats) - continue - - if new_hash == save.content_hash: - stats.saves_unchanged += 1 - self._maybe_flush(stats) - continue - - try: - db_save_handler.update_save(save.id, {"content_hash": new_hash}) - stats.saves_updated += 1 - log.debug( - f"Rewrote content_hash for save {save.id} " - f"({relative_path}): {save.content_hash} -> {new_hash}" - ) - except Exception as e: - log.warning(f"Failed to update save {save.id}: {e}") - stats.errors += 1 - - self._maybe_flush(stats) stats.flush() log.info( diff --git a/backend/tests/handler/database/test_saves_handler.py b/backend/tests/handler/database/test_saves_handler.py index 6275a16b9..1c7d4f411 100644 --- a/backend/tests/handler/database/test_saves_handler.py +++ b/backend/tests/handler/database/test_saves_handler.py @@ -444,6 +444,66 @@ class TestDBSavesHandlerGetSaveByContentHash: assert result.id == created.id +class TestDBSavesHandlerGetSavesAfterId: + """Cover keyset pagination used by the recompute_save_content_hashes + maintenance task. Per-page bounded reads avoid materializing the full + saves table on instances with very large libraries.""" + + def test_paginates_in_order_and_terminates(self, admin_user: User, rom: Rom): + created_ids = [] + for i in range(5): + created = db_save_handler.add_save( + Save( + rom_id=rom.id, + user_id=admin_user.id, + file_name=f"page_{i}.sav", + file_name_no_tags=f"page_{i}", + file_name_no_ext=f"page_{i}", + file_extension="sav", + emulator="test_emu", + file_path=f"{rom.platform_slug}/saves", + file_size_bytes=100, + slot=f"slot_{i}", + ) + ) + created_ids.append(created.id) + + seen: list[int] = [] + last_id = 0 + while True: + batch = db_save_handler.get_saves_after_id(after_id=last_id, limit=2) + if not batch: + break + for s in batch: + seen.append(s.id) + last_id = s.id + + for cid in created_ids: + assert cid in seen + # IDs returned monotonically by primary key + assert seen == sorted(seen) + + def test_after_id_excludes_anchor_row(self, admin_user: User, rom: Rom): + created = db_save_handler.add_save( + Save( + rom_id=rom.id, + user_id=admin_user.id, + file_name="anchor.sav", + file_name_no_tags="anchor", + file_name_no_ext="anchor", + file_extension="sav", + emulator="test_emu", + file_path=f"{rom.platform_slug}/saves", + file_size_bytes=100, + slot="anchor_slot", + ) + ) + + batch = db_save_handler.get_saves_after_id(after_id=created.id, limit=10) + + assert all(s.id > created.id for s in batch) + + class TestDBSavesHandlerSummary: def test_get_saves_summary_basic(self, admin_user: User, rom: Rom): from datetime import datetime, timedelta, timezone