Merge branch 'master' into copilot/support-sortname-tag-es-de

This commit is contained in:
Georges-Antoine Assi
2026-06-18 09:04:17 -04:00
755 changed files with 120532 additions and 5823 deletions

View File

@@ -192,6 +192,22 @@ async def test_hybrid_auth_backend_invalid_scheme():
assert result is None
@pytest.mark.parametrize("authorization_header", ["Bearer ", "Foo", "a b c"])
async def test_hybrid_auth_backend_malformed_authorization_header(
authorization_header: str,
):
class MockConnection(HTTPConnection):
def __init__(self):
self.scope: dict[str, dict] = {"session": {}}
self._headers = {"Authorization": authorization_header}
backend = HybridAuthBackend()
conn = MockConnection()
result = await backend.authenticate(conn)
assert result is None
async def test_hybrid_auth_backend_with_refresh_token(editor_user: User):
refresh_token = oauth_handler.create_refresh_token(
data={

View File

@@ -259,7 +259,7 @@ interactions:
code: 200
message: OK
- request:
body: '{"mD5":"7de64234ee20788b9d74d2fdb3462aed","shA1":"77693a00418a9d8971b7a005f2001d997e359bff","crc":"d56d1c89"}'
body: '[{"mD5":"7de64234ee20788b9d74d2fdb3462aed","shA1":"77693a00418a9d8971b7a005f2001d997e359bff","crc":"d56d1c89"}]'
headers:
accept:
- "*/*"
@@ -743,7 +743,7 @@ interactions:
code: 200
message: OK
- request:
body: '{"mD5":"7de64234ee20788b9d74d2fdb3462aed","shA1":"77693a00418a9d8971b7a005f2001d997e359bff","crc":"d56d1c89"}'
body: '[{"mD5":"7de64234ee20788b9d74d2fdb3462aed","shA1":"77693a00418a9d8971b7a005f2001d997e359bff","crc":"d56d1c89"}]'
headers:
accept:
- "*/*"

View File

@@ -0,0 +1,24 @@
from handler.database import db_collection_handler
from models.collection import SmartCollection
from models.user import User
def test_get_smart_collection_roms_normalizes_legacy_selected_status_lists(
admin_user: User, mocker
):
get_roms_scalar = mocker.patch("handler.database.db_rom_handler.get_roms_scalar")
smart_collection = SmartCollection(
name="Finished games",
description="",
user_id=admin_user.id,
filter_criteria={"selected_status": ["finished", "completed_100"]},
)
db_collection_handler.get_smart_collection_roms(
smart_collection, user_id=admin_user.id
)
assert get_roms_scalar.call_args.kwargs["statuses"] == [
"finished",
"completed_100",
]

View File

@@ -66,6 +66,86 @@ class TestGetSyncsForDeviceAndSaves:
assert result == []
class TestGetSyncsForSaves:
def test_returns_syncs_across_devices_with_name(
self, admin_user: User, rom: Rom, save: Save
):
dev_a = db_device_handler.add_device(
Device(id="multi-dev-a", user_id=admin_user.id, name="Device A")
)
dev_b = db_device_handler.add_device(
Device(id="multi-dev-b", user_id=admin_user.id, name="Device B")
)
db_device_save_sync_handler.upsert_sync(dev_a.id, save.id)
db_device_save_sync_handler.upsert_sync(dev_b.id, save.id)
result = db_device_save_sync_handler.get_syncs_for_saves([save.id])
assert set(result.keys()) == {save.id}
by_device = {sync.device_id: name for sync, name in result[save.id]}
assert by_device == {dev_a.id: "Device A", dev_b.id: "Device B"}
def test_filters_to_requested_saves(self, admin_user: User, rom: Rom):
device = db_device_handler.add_device(
Device(id="multi-dev-c", user_id=admin_user.id, name="Device C")
)
saves = []
for i in range(3):
s = db_save_handler.add_save(
Save(
rom_id=rom.id,
user_id=admin_user.id,
file_name=f"multi_{i}.sav",
file_name_no_tags=f"multi_{i}",
file_name_no_ext=f"multi_{i}",
file_extension="sav",
emulator="emu",
file_path=f"{rom.platform_slug}/saves",
file_size_bytes=100,
)
)
saves.append(s)
db_device_save_sync_handler.upsert_sync(device.id, s.id)
result = db_device_save_sync_handler.get_syncs_for_saves(
[saves[0].id, saves[2].id]
)
assert set(result.keys()) == {saves[0].id, saves[2].id}
def test_empty_save_ids_returns_empty(self, admin_user: User):
result = db_device_save_sync_handler.get_syncs_for_saves([])
assert result == {}
class TestOriginDeviceCascade:
def test_origin_device_id_nulled_on_device_delete(self, admin_user: User, rom: Rom):
device = db_device_handler.add_device(
Device(id="origin-dev", user_id=admin_user.id, name="Origin")
)
save = db_save_handler.add_save(
Save(
rom_id=rom.id,
user_id=admin_user.id,
file_name="origin_cascade.sav",
file_name_no_tags="origin_cascade",
file_name_no_ext="origin_cascade",
file_extension="sav",
emulator="emu",
file_path=f"{rom.platform_slug}/saves",
file_size_bytes=100,
origin_device_id=device.id,
)
)
assert save.origin_device_id == device.id
db_device_handler.delete_device(device.id, admin_user.id)
refreshed = db_save_handler.get_save(user_id=admin_user.id, id=save.id)
assert refreshed is not None
assert refreshed.origin_device_id is None
class TestUpsertSync:
def test_creates_new_sync(self, admin_user: User, rom: Rom, save: Save):
device = db_device_handler.add_device(

View File

@@ -0,0 +1,227 @@
"""Unit tests for the Redis-backed, versioned filter/char-index cache.
`with_filter_values` and `with_char_index` memoise their (expensive) results
in Redis under a key that embeds a global version number. Bumping that version
(via `invalidate_filter_values_cache`) is what makes every previously-cached
entry stale at once, and the per-version "keys set" is what lets the bump
actually delete the old entries instead of leaking them until TTL.
These tests pin down that machinery:
1. storing a value also registers its key in the version set,
2. a cache hit returns the exact same shape as the cache miss that filled it,
3. `invalidate_filter_values_cache()` deletes the prior version's keys.
"""
import json
from typing import cast
import pytest
from sqlalchemy import select
from sqlalchemy.orm import Query
from tests.conftest import session as session_factory
from handler.database import db_rom_handler
from handler.database.roms_handler import (
ROM_FILTERS_CACHE_VERSION_KEY,
_filter_values_cache_keys_key,
_filter_values_cache_version,
_store_versioned_cache,
)
from handler.redis_handler import sync_cache
from models.rom import Rom
@pytest.fixture(autouse=True)
def _flush_cache():
"""Start every test from an empty cache (version resets to "0").
The autouse `clear_database` fixture bumps the version key on its way in;
flushing afterwards keeps these tests independent of that counter.
"""
sync_cache.flushall()
yield
sync_cache.flushall()
def _decode_members(raw_members: set) -> set[str]:
return {m.decode() if isinstance(m, bytes) else m for m in raw_members}
def _set_rom_genres(rom_id: int, genres: list[str]) -> None:
"""Drive the `roms_metadata` view by writing the source `igdb_metadata`.
`roms_metadata` is a DB view derived from `roms.igdb_metadata` (etc.), so
metadata is seeded by writing that JSON column, not by inserting into the
view.
"""
with session_factory.begin() as s:
rom: Rom | None = s.get(Rom, rom_id)
assert rom is not None
metadata = dict(rom.igdb_metadata or {})
metadata["genres"] = genres
rom.igdb_metadata = metadata
@pytest.fixture
def rom_with_metadata(rom: Rom) -> Rom:
"""Give the shared `rom` a genre so filter values are non-empty."""
_set_rom_genres(rom.id, ["RPG"])
return rom
class TestStoreVersionedCache:
def test_stores_value_and_registers_key_in_version_set(self):
version = _filter_values_cache_version()
assert version == "0"
redis_key = f"filter_values:probe:v{version}"
result = {"genres": ["RPG"], "platforms": [1]}
_store_versioned_cache(redis_key, version, result)
# The value itself round-trips as JSON under the key.
cached_result = sync_cache.get(redis_key)
assert cached_result is not None
assert json.loads(cached_result) == result
# ...and the key is recorded in this version's keys-set so a later
# invalidation can find and delete it.
members = _decode_members(
sync_cache.smembers(_filter_values_cache_keys_key(version))
)
assert redis_key in members
def test_skips_write_when_version_advanced_mid_flight(self):
"""If the version moved on while we were computing, don't write back.
This guards the race where an invalidation lands between reading the
version and storing the result: a stale entry must not resurrect under
a key the next invalidation no longer knows about.
"""
stale_version = _filter_values_cache_version() # "0"
sync_cache.set(ROM_FILTERS_CACHE_VERSION_KEY, "9")
redis_key = f"filter_values:probe:v{stale_version}"
_store_versioned_cache(redis_key, stale_version, {"genres": []})
assert sync_cache.get(redis_key) is None
assert (
sync_cache.smembers(_filter_values_cache_keys_key(stale_version)) == set()
)
class TestCacheHitMatchesMiss:
def test_with_filter_values_hit_matches_miss(self, rom_with_metadata: Rom):
query = cast(Query[Rom], select(Rom))
cache_key = "all:test-filters"
miss = db_rom_handler.with_filter_values(query=query, cache_key=cache_key)
# The miss must have populated the cache under the current version.
version = _filter_values_cache_version()
redis_key = f"filter_values:{cache_key}:v{version}"
assert sync_cache.get(redis_key) is not None
assert miss["genres"] == ["RPG"]
# Mutating the DB after the miss must NOT change a subsequent hit:
# proves the second call is served from cache, byte-for-byte identical.
_set_rom_genres(rom_with_metadata.id, ["RPG", "Action"])
hit = db_rom_handler.with_filter_values(query=query, cache_key=cache_key)
assert hit == miss
assert hit.keys() == miss.keys()
def test_with_filter_values_without_cache_key_does_not_cache(
self, rom_with_metadata: Rom
):
query = cast(Query[Rom], select(Rom))
result = db_rom_handler.with_filter_values(query=query)
# Nothing written, and a sane shape is still returned.
assert sync_cache.keys("filter_values:*") == []
assert result["genres"] == ["RPG"]
def test_with_char_index_hit_matches_miss(self, rom: Rom):
query = cast(Query[Rom], select(Rom))
cache_key = "all:test-charindex"
miss = db_rom_handler.with_char_index(
query=query, order_by_attr=Rom.name, cache_key=cache_key
)
version = _filter_values_cache_version()
redis_key = f"char_index:{cache_key}:v{version}"
assert sync_cache.get(redis_key) is not None
# "test_rom" -> first letter "t" at position 0.
assert dict(miss) == {"t": 0}
# Add a ROM under a new letter; a cache hit must ignore it.
with session_factory.begin() as s:
s.add(
Rom(
platform_id=rom.platform_id,
name="Another",
slug="another-slug",
fs_name="another.zip",
fs_name_no_tags="another",
fs_name_no_ext="another",
fs_extension="zip",
fs_path=rom.fs_path,
)
)
hit = db_rom_handler.with_char_index(
query=query, order_by_attr=Rom.name, cache_key=cache_key
)
# Same consumed shape as the miss (the endpoint folds this into a dict).
assert dict(hit) == dict(miss) == {"t": 0}
class TestInvalidateFilterValuesCache:
def test_deletes_prior_version_keys_and_set(self, rom_with_metadata: Rom):
query = cast(Query[Rom], select(Rom))
# Populate both caches under the current version.
db_rom_handler.with_filter_values(query=query, cache_key="all:filters")
db_rom_handler.with_char_index(
query=query, order_by_attr=Rom.name, cache_key="all:charindex"
)
old_version = _filter_values_cache_version()
old_keys_set = _filter_values_cache_keys_key(old_version)
old_keys = _decode_members(sync_cache.smembers(old_keys_set))
assert old_keys # something was registered
assert all(sync_cache.get(key) is not None for key in old_keys)
db_rom_handler.invalidate_filter_values_cache()
# Version advanced...
new_version = _filter_values_cache_version()
assert int(new_version) == int(old_version) + 1
# ...the prior version's value keys are gone...
assert all(sync_cache.get(key) is None for key in old_keys)
# ...and so is the bookkeeping set itself.
assert sync_cache.smembers(old_keys_set) == set()
def test_recomputes_under_new_version_after_invalidation(
self, rom_with_metadata: Rom
):
query = cast(Query[Rom], select(Rom))
cache_key = "all:filters"
db_rom_handler.with_filter_values(query=query, cache_key=cache_key)
# Change the data, then invalidate: the next call must reflect the change
# because the old cached entry no longer resolves under the new version.
_set_rom_genres(rom_with_metadata.id, ["RPG", "Action"])
db_rom_handler.invalidate_filter_values_cache()
fresh = db_rom_handler.with_filter_values(query=query, cache_key=cache_key)
assert fresh["genres"] == ["Action", "RPG"]
new_version = _filter_values_cache_version()
assert sync_cache.get(f"filter_values:{cache_key}:v{new_version}") is not None

View File

@@ -0,0 +1,33 @@
"""Unit tests for DBRomsHandler's derived-column bookkeeping.
Bulk `update()` bypasses the ORM `@validates` hooks, so `update_rom` keeps
the columns derived from `name` / `fs_name` in sync explicitly.
"""
from handler.database import db_rom_handler
from models.rom import Rom
class TestUpdateRomDerivedColumns:
def test_update_name_resyncs_name_sort_key(self, rom: Rom):
updated = db_rom_handler.update_rom(rom.id, {"name": "The New Name 2"})
assert updated.name == "The New Name 2"
assert updated.name_sort_key == "new name 000000000002"
def test_update_fs_name_resyncs_all_parts(self, rom: Rom):
updated = db_rom_handler.update_rom(rom.id, {"fs_name": "Sonic (Europe).md"})
assert updated.fs_name == "Sonic (Europe).md"
assert updated.fs_name_no_tags == "Sonic"
assert updated.fs_name_no_ext == "Sonic (Europe)"
# The extension is resynced too — the rename endpoint used to omit it.
assert updated.fs_extension == "md"
def test_update_unrelated_field_leaves_derived_columns(self, rom: Rom):
updated = db_rom_handler.update_rom(rom.id, {"summary": "just a summary"})
assert updated.summary == "just a summary"
assert updated.fs_name_no_tags == "test_rom"
assert updated.fs_extension == "zip"
assert updated.name_sort_key == "test_rom"

View File

@@ -101,6 +101,7 @@ class TestFSPlatformsHandler:
self, handler: FSPlatformsHandler, config
):
"""Test get_platforms_directory with Structure B ({platform}/roms)"""
config.has_structure_path_a = False
config.has_structure_path_b = True
with patch(
"handler.filesystem.platforms_handler.cm.get_config", return_value=config
@@ -203,7 +204,7 @@ class TestFSPlatformsHandler:
self, handler: FSPlatformsHandler, config
):
"""Test that get_platforms calls list_directories with correct path"""
config.has_structure_path_b = False
config.has_structure_path_a = True
with patch(
"handler.filesystem.platforms_handler.cm.get_config", return_value=config
):
@@ -217,6 +218,8 @@ class TestFSPlatformsHandler:
self, handler: FSPlatformsHandler, config
):
"""Test that get_platforms calls list_directories with empty path for normal structure"""
config.has_structure_path_a = False
config.has_structure_path_b = True
with patch(
"handler.filesystem.platforms_handler.cm.get_config", return_value=config
):
@@ -226,6 +229,48 @@ class TestFSPlatformsHandler:
await handler.get_platforms()
mock_list.assert_called_once_with(path="")
async def test_get_platforms_bootstraps_structure_a_when_none_detected(
self, handler: FSPlatformsHandler, config
):
"""When no structure exists, get_platforms creates Structure A (roms folder)
and returns an empty list instead of raising."""
config.has_structure_path_a = False
config.has_structure_path_b = False
with patch(
"handler.filesystem.platforms_handler.cm.get_config", return_value=config
):
with patch.object(
handler, "list_directories", side_effect=FileNotFoundError
):
with patch.object(handler, "create_library_structure") as mock_create:
result = await handler.get_platforms()
assert result == []
mock_create.assert_called_once()
async def test_get_platforms_returns_empty_when_bootstrap_fails(
self, handler: FSPlatformsHandler, config
):
"""If creating the default structure fails, get_platforms still returns an
empty list rather than propagating the error (so the heartbeat stays healthy).
"""
config.has_structure_path_a = False
config.has_structure_path_b = False
with patch(
"handler.filesystem.platforms_handler.cm.get_config", return_value=config
):
with patch.object(
handler, "list_directories", side_effect=FileNotFoundError
):
with patch.object(
handler,
"create_library_structure",
side_effect=PermissionError("read-only filesystem"),
):
result = await handler.get_platforms()
assert result == []
def test_integration_with_base_handler_methods(self, handler: FSPlatformsHandler):
"""Test that FSPlatformsHandler properly inherits from FSHandler"""
# Test that handler has base methods
@@ -315,23 +360,20 @@ class TestFSPlatformsHandler:
self, handler: FSPlatformsHandler, config
):
"""Test detect_library_structure detects Structure A (roms/{platform})"""
roms_path = f"{LIBRARY_BASE_PATH}/{config.ROMS_FOLDER_NAME}"
config.has_structure_path_a = True
config.has_structure_path_b = False
with patch(
"handler.filesystem.platforms_handler.cm.get_config", return_value=config
):
with patch("os.path.exists") as mock_exists:
mock_exists.return_value = True
result = handler.detect_library_structure()
assert result == LibraryStructure.A
mock_exists.assert_called_once_with(roms_path)
result = handler.detect_library_structure()
assert result == LibraryStructure.A
def test_detect_library_structure_structure_b(
self, handler: FSPlatformsHandler, config
):
"""Test detect_library_structure detects Structure B ({platform}/roms)"""
config.has_structure_path_a = False
config.has_structure_path_b = True
with patch(
@@ -340,39 +382,40 @@ class TestFSPlatformsHandler:
result = handler.detect_library_structure()
assert result == LibraryStructure.B
def test_detect_library_structure_b_takes_priority_over_a(
def test_detect_library_structure_a_takes_priority_over_b(
self, handler: FSPlatformsHandler, config
):
"""Structure B is reported even when the top-level roms folder exists."""
"""Structure A is reported when the top-level roms folder exists, even
when Structure B directories are also present."""
config.has_structure_path_a = True
config.has_structure_path_b = True
with patch(
"handler.filesystem.platforms_handler.cm.get_config", return_value=config
):
with patch("os.path.exists", return_value=True):
result = handler.detect_library_structure()
assert result == LibraryStructure.B
result = handler.detect_library_structure()
assert result == LibraryStructure.A
def test_detect_library_structure_none(self, handler: FSPlatformsHandler, config):
"""Test detect_library_structure returns None when no structure detected"""
config.has_structure_path_a = False
config.has_structure_path_b = False
with patch(
"handler.filesystem.platforms_handler.cm.get_config", return_value=config
):
with patch("os.path.exists", return_value=False):
result = handler.detect_library_structure()
assert result is None
result = handler.detect_library_structure()
assert result is None
def test_detect_library_structure_empty_library(
self, handler: FSPlatformsHandler, config
):
"""Test detect_library_structure with empty library directory"""
config.has_structure_path_a = False
config.has_structure_path_b = False
with patch(
"handler.filesystem.platforms_handler.cm.get_config", return_value=config
):
with patch("os.path.exists", return_value=False):
result = handler.detect_library_structure()
assert result is None
result = handler.detect_library_structure()
assert result is None

View File

@@ -5,13 +5,30 @@ from unittest.mock import AsyncMock, patch
import pytest
from adapters.services.igdb_types import GameType
from handler.metadata.igdb_handler import IGDBHandler
from handler.metadata.igdb_handler import (
FAMICOM_IGDB_ID,
NES_IGDB_ID,
SNES_IGDB_ID,
SUPER_FAMICOM_IGDB_ID,
IGDBHandler,
_build_platforms_where,
_platform_igdb_ids_with_twin,
)
GENESIS_IGDB_ID = 29
def _make_game(game_id: int, name: str) -> dict:
"""Build a minimal IGDB Game dict for testing."""
def _make_game(
game_id: int,
name: str,
alternative_names: list[str] | None = None,
game_localizations: list[str] | None = None,
) -> dict:
"""Build a minimal IGDB Game dict for testing.
``alternative_names`` and ``game_localizations`` accept plain title strings
and are wrapped into the ``{"name": ...}`` shape IGDB returns.
"""
return {
"id": game_id,
"name": name,
@@ -24,7 +41,7 @@ def _make_game(game_id: int, name: str) -> dict:
"cover": None,
"screenshots": [],
"platforms": [{"id": GENESIS_IGDB_ID, "name": "Sega Mega Drive/Genesis"}],
"alternative_names": [],
"alternative_names": [{"name": n} for n in (alternative_names or [])],
"genres": [],
"franchise": None,
"franchises": [],
@@ -41,7 +58,7 @@ def _make_game(game_id: int, name: str) -> dict:
"videos": [],
"age_ratings": [],
"multiplayer_modes": [],
"game_localizations": [],
"game_localizations": [{"name": n} for n in (game_localizations or [])],
}
@@ -155,3 +172,488 @@ class TestSearchRomGameTypeFilter:
f"Expected Ecco: The Tides of Time (id=5379), got {result.get('name')} (id={result.get('id')}). "
"The expanded search must consider ALL results, not just the first."
)
class TestSearchRomLocalizedNames:
"""Tests for matching ROMs by localized / alternative titles.
Regression coverage for issue #3435: a ROM whose No-Intro / ReDump filename
uses a localized title (e.g. ``007 - Die Welt Ist Nicht Genug (Germany)``)
must match the IGDB game that lists that title in ``alternative_names`` or
``game_localizations``, not only the primary English ``name``.
"""
# James Bond 007: The World Is Not Enough, IGDB id 158962, has the German
# alternative name "007 - Die Welt Ist Nicht Genug" (issue #3435).
ENGLISH_NAME = "James Bond 007: The World Is Not Enough"
GERMAN_TITLE = "007 - Die Welt Ist Nicht Genug"
GAME_ID = 158962
@pytest.mark.asyncio
async def test_alt_name_match_in_primary_games_search(self):
"""A localized filename must match when IGDB returns the game on the
primary games-endpoint pass and the term only matches an alternative
name, not the primary English name."""
handler = IGDBHandler()
game = _make_game(
self.GAME_ID,
self.ENGLISH_NAME,
alternative_names=[self.GERMAN_TITLE],
)
async def mock_list_games(
search_term=None, fields=None, where=None, limit=None
):
# Primary games-endpoint pass returns the game (IGDB's fuzzy search
# surfaces it via the alt name), but the term won't match the
# English primary name on its own.
return [game]
with (
patch(
"handler.metadata.igdb_handler.IGDBHandler.is_enabled",
return_value=True,
),
patch.object(
handler.igdb_service,
"list_games",
side_effect=mock_list_games,
),
patch.object(
handler.igdb_service,
"search",
new_callable=AsyncMock,
return_value=[],
),
):
result = await handler._search_rom(
"007 die welt ist nicht genug", GENESIS_IGDB_ID
)
assert result is not None
assert result["id"] == self.GAME_ID, (
f"Expected {self.ENGLISH_NAME} (id={self.GAME_ID}) via its German "
f"alternative name, got {result.get('name')} (id={result.get('id')})"
)
@pytest.mark.asyncio
async def test_alt_name_match_in_expanded_search(self):
"""A localized filename must match when the game is only discovered via
the expanded ``/search`` alternative_name query and the term matches an
alternative name rather than the primary English name."""
handler = IGDBHandler()
game = _make_game(
self.GAME_ID,
self.ENGLISH_NAME,
alternative_names=[self.GERMAN_TITLE],
)
async def mock_list_games(
search_term=None, fields=None, where=None, limit=None
):
# Expanded game-details lookup (id filter) returns the full game.
if where and where.startswith("("):
return [game]
# Primary pass returns nothing useful.
return []
expanded_results = [{"game": {"id": self.GAME_ID}, "name": self.ENGLISH_NAME}]
with (
patch(
"handler.metadata.igdb_handler.IGDBHandler.is_enabled",
return_value=True,
),
patch.object(
handler.igdb_service,
"list_games",
side_effect=mock_list_games,
),
patch.object(
handler.igdb_service,
"search",
new_callable=AsyncMock,
return_value=expanded_results,
),
):
result = await handler._search_rom(
"007 die welt ist nicht genug", GENESIS_IGDB_ID
)
assert result is not None
assert result["id"] == self.GAME_ID, (
f"Expected {self.ENGLISH_NAME} (id={self.GAME_ID}) via its German "
f"alternative name, got {result.get('name')} (id={result.get('id')})"
)
@pytest.mark.asyncio
async def test_localization_name_match_in_expanded_search(self):
"""A localized filename must match when the matching title lives in
``game_localizations`` rather than ``alternative_names``."""
handler = IGDBHandler()
game = _make_game(
self.GAME_ID,
self.ENGLISH_NAME,
game_localizations=[self.GERMAN_TITLE],
)
async def mock_list_games(
search_term=None, fields=None, where=None, limit=None
):
if where and where.startswith("("):
return [game]
return []
expanded_results = [{"game": {"id": self.GAME_ID}, "name": self.ENGLISH_NAME}]
with (
patch(
"handler.metadata.igdb_handler.IGDBHandler.is_enabled",
return_value=True,
),
patch.object(
handler.igdb_service,
"list_games",
side_effect=mock_list_games,
),
patch.object(
handler.igdb_service,
"search",
new_callable=AsyncMock,
return_value=expanded_results,
),
):
result = await handler._search_rom(
"007 die welt ist nicht genug", GENESIS_IGDB_ID
)
assert result is not None
assert result["id"] == self.GAME_ID
@pytest.mark.asyncio
async def test_primary_english_name_still_matches(self):
"""Indexing alternative titles must not regress matching by the primary
English name."""
handler = IGDBHandler()
game = _make_game(
self.GAME_ID,
self.ENGLISH_NAME,
alternative_names=[self.GERMAN_TITLE],
)
async def mock_list_games(
search_term=None, fields=None, where=None, limit=None
):
return [game]
with (
patch(
"handler.metadata.igdb_handler.IGDBHandler.is_enabled",
return_value=True,
),
patch.object(
handler.igdb_service,
"list_games",
side_effect=mock_list_games,
),
patch.object(
handler.igdb_service,
"search",
new_callable=AsyncMock,
return_value=[],
),
):
result = await handler._search_rom(
"james bond 007 the world is not enough", GENESIS_IGDB_ID
)
assert result is not None
assert result["id"] == self.GAME_ID
@pytest.mark.asyncio
async def test_primary_name_wins_over_other_games_alt_name(self):
"""When a search term equals one game's primary name and another game's
alternative name, the primary-name owner must win (alt titles fill in
only names not already claimed by a primary name)."""
handler = IGDBHandler()
primary = _make_game(100, "Contra")
# A different, higher-id game that lists "Contra" as an alt title.
other = _make_game(200, "Probotector", alternative_names=["Contra"])
async def mock_list_games(
search_term=None, fields=None, where=None, limit=None
):
return [other, primary]
with (
patch(
"handler.metadata.igdb_handler.IGDBHandler.is_enabled",
return_value=True,
),
patch.object(
handler.igdb_service,
"list_games",
side_effect=mock_list_games,
),
patch.object(
handler.igdb_service,
"search",
new_callable=AsyncMock,
return_value=[],
),
):
result = await handler._search_rom("contra", GENESIS_IGDB_ID)
assert result is not None
assert result["id"] == 100, (
"Expected the game whose primary name is 'Contra' (id=100), not the "
f"game that merely lists it as an alternative name; got id={result.get('id')}"
)
class TestRegionalTwinPlatformHelpers:
"""Unit tests for the regional-twin platform helpers (issue #3462).
IGDB files a console and its regional twin (SNES/Super Famicom,
NES/Famicom) under separate platform ids, so a search must include both to
match region-exclusive titles.
"""
def test_twin_pairs_are_bidirectional(self):
"""Each console resolves to its regional twin in both directions."""
assert _platform_igdb_ids_with_twin(SNES_IGDB_ID) == [
SNES_IGDB_ID,
SUPER_FAMICOM_IGDB_ID,
]
assert _platform_igdb_ids_with_twin(SUPER_FAMICOM_IGDB_ID) == [
SUPER_FAMICOM_IGDB_ID,
SNES_IGDB_ID,
]
assert _platform_igdb_ids_with_twin(NES_IGDB_ID) == [
NES_IGDB_ID,
FAMICOM_IGDB_ID,
]
assert _platform_igdb_ids_with_twin(FAMICOM_IGDB_ID) == [
FAMICOM_IGDB_ID,
NES_IGDB_ID,
]
def test_non_twin_platform_has_no_twin(self):
"""A platform without a regional twin resolves to itself only."""
assert _platform_igdb_ids_with_twin(GENESIS_IGDB_ID) == [GENESIS_IGDB_ID]
def test_build_where_single_platform_is_unparenthesized(self):
"""A non-twin platform keeps the original single-clause shape."""
assert (
_build_platforms_where(GENESIS_IGDB_ID) == f"platforms=[{GENESIS_IGDB_ID}]"
)
assert (
_build_platforms_where(GENESIS_IGDB_ID, field="game.platforms")
== f"game.platforms=[{GENESIS_IGDB_ID}]"
)
def test_build_where_twin_platform_is_an_or_group(self):
"""A twin platform produces a parenthesized OR of both platform ids."""
assert (
_build_platforms_where(SNES_IGDB_ID)
== f"(platforms=[{SNES_IGDB_ID}] | platforms=[{SUPER_FAMICOM_IGDB_ID}])"
)
assert (
_build_platforms_where(NES_IGDB_ID, field="game.platforms")
== f"(game.platforms=[{NES_IGDB_ID}] | game.platforms=[{FAMICOM_IGDB_ID}])"
)
class TestSearchRomRegionalTwinPlatforms:
"""Tests that IGDB search includes a platform's regional twin (issue #3462).
A Japan-only Super Famicom title (e.g. *Rudra no Hihou*) lives only under
IGDB's Super Famicom platform, so an ``snes`` scan that filtered to the SNES
platform alone would silently drop it. The search must query both twins.
"""
@pytest.mark.asyncio
async def test_snes_search_matches_super_famicom_only_game(self):
"""A Super-Famicom-only game must match when scanned from ``snes``."""
handler = IGDBHandler()
# Rudra no Hihou is catalogued under Super Famicom (58) only.
rudra = _make_game(829, "Rudra no Hihou")
captured_wheres: list[str] = []
async def mock_list_games(
search_term=None, fields=None, where=None, limit=None
):
captured_wheres.append(where or "")
# IGDB only surfaces the game when the Super Famicom platform is
# part of the filter.
if where and f"platforms=[{SUPER_FAMICOM_IGDB_ID}]" in where:
return [rudra]
return []
with (
patch(
"handler.metadata.igdb_handler.IGDBHandler.is_enabled",
return_value=True,
),
patch.object(
handler.igdb_service,
"list_games",
side_effect=mock_list_games,
),
patch.object(
handler.igdb_service,
"search",
new_callable=AsyncMock,
return_value=[],
),
):
result = await handler._search_rom(
"rudra no hihou", SNES_IGDB_ID, with_game_type=True
)
assert result is not None
assert result["id"] == 829, (
"Expected Rudra no Hihou (id=829) to match from an snes scan via the "
f"Super Famicom platform; got {result.get('name')} (id={result.get('id')})"
)
# The primary platform filter must mention both twins.
assert any(
f"platforms=[{SNES_IGDB_ID}]" in w
and f"platforms=[{SUPER_FAMICOM_IGDB_ID}]" in w
for w in captured_wheres
), f"Expected SNES + Super Famicom in the platform filter, got: {captured_wheres}"
@pytest.mark.asyncio
async def test_nes_search_matches_famicom_only_game(self):
"""A Famicom-only game must match when scanned from ``nes``."""
handler = IGDBHandler()
famicom_only = _make_game(1234, "Famicom Mukashi Banashi")
captured_wheres: list[str] = []
async def mock_list_games(
search_term=None, fields=None, where=None, limit=None
):
captured_wheres.append(where or "")
if where and f"platforms=[{FAMICOM_IGDB_ID}]" in where:
return [famicom_only]
return []
with (
patch(
"handler.metadata.igdb_handler.IGDBHandler.is_enabled",
return_value=True,
),
patch.object(
handler.igdb_service,
"list_games",
side_effect=mock_list_games,
),
patch.object(
handler.igdb_service,
"search",
new_callable=AsyncMock,
return_value=[],
),
):
result = await handler._search_rom(
"famicom mukashi banashi", NES_IGDB_ID, with_game_type=True
)
assert result is not None
assert result["id"] == 1234
assert any(
f"platforms=[{NES_IGDB_ID}]" in w and f"platforms=[{FAMICOM_IGDB_ID}]" in w
for w in captured_wheres
), f"Expected NES + Famicom in the platform filter, got: {captured_wheres}"
@pytest.mark.asyncio
async def test_super_famicom_search_matches_snes_only_game(self):
"""A Western-only SNES game must match when scanned from ``sfam``."""
handler = IGDBHandler()
snes_only = _make_game(4321, "EarthBound")
captured_wheres: list[str] = []
async def mock_list_games(
search_term=None, fields=None, where=None, limit=None
):
captured_wheres.append(where or "")
if where and f"platforms=[{SNES_IGDB_ID}]" in where:
return [snes_only]
return []
with (
patch(
"handler.metadata.igdb_handler.IGDBHandler.is_enabled",
return_value=True,
),
patch.object(
handler.igdb_service,
"list_games",
side_effect=mock_list_games,
),
patch.object(
handler.igdb_service,
"search",
new_callable=AsyncMock,
return_value=[],
),
):
result = await handler._search_rom(
"earthbound", SUPER_FAMICOM_IGDB_ID, with_game_type=True
)
assert result is not None
assert result["id"] == 4321
@pytest.mark.asyncio
async def test_non_twin_platform_filter_is_single_platform(self):
"""A platform without a twin must keep querying only its own id."""
handler = IGDBHandler()
game = _make_game(1799, "Ecco the Dolphin")
captured_wheres: list[str] = []
async def mock_list_games(
search_term=None, fields=None, where=None, limit=None
):
captured_wheres.append(where or "")
return [game]
with (
patch(
"handler.metadata.igdb_handler.IGDBHandler.is_enabled",
return_value=True,
),
patch.object(
handler.igdb_service,
"list_games",
side_effect=mock_list_games,
),
patch.object(
handler.igdb_service,
"search",
new_callable=AsyncMock,
return_value=[],
),
):
result = await handler._search_rom(
"ecco the dolphin", GENESIS_IGDB_ID, with_game_type=True
)
assert result is not None
assert result["id"] == 1799
# No twin → no OR group, just the single platform clause.
assert all(
" | " not in w for w in captured_wheres
), f"Non-twin platform should not OR a twin platform, got: {captured_wheres}"
assert any(
f"platforms=[{GENESIS_IGDB_ID}]" in w for w in captured_wheres
), captured_wheres

View File

@@ -667,6 +667,18 @@ class TestAddSsAuthToUrl:
assert download_query.get("systemeid") == ["1"]
class TestGetPlatform:
"""Tests for SSHandler.get_platform — the slug → ScreenScraper system map."""
def test_unmapped_platform_returns_none_ss_id(self):
"""A slug with no ScreenScraper mapping yields ss_id=None (lookup skipped)."""
handler = SSHandler()
platform = handler.get_platform("not-a-real-platform")
assert platform["ss_id"] is None
assert platform["slug"] == "not-a-real-platform"
class TestGetRomType:
def _file(self, ext: str, top_level: bool = True) -> MagicMock:
f = MagicMock()
@@ -699,6 +711,103 @@ class TestLookupRom:
f.file_name = "bios.bin"
return f
def _make_unhashed_file(
self, file_name: str = "Adventure Island II (USA).nes"
) -> MagicMock:
"""A top-level file with no hashes, as produced for NON_HASHABLE_PLATFORMS
or when SKIP_HASH_CALCULATION is enabled."""
f = MagicMock()
f.file_size_bytes = 131072
f.is_top_level = True
f.file_extension = "nes"
f.md5_hash = ""
f.sha1_hash = ""
f.crc_hash = ""
f.file_name = file_name
f.archive_members = None
return f
@pytest.mark.asyncio
async def test_no_hash_still_attempts_jeuinfos_by_filename(self):
"""A file with no hashes must still reach jeuInfos using the filename
(romnom) + platform (systemeid), instead of bailing out and degrading to
the weaker jeuRecherche name search."""
handler = SSHandler()
mock_file = self._make_unhashed_file("Adventure Island II (USA).nes")
captured = {}
async def capture(**kwargs):
captured.update(kwargs)
return None
with patch.object(handler.ss_service, "get_game_info", side_effect=capture):
result, is_not_game = await handler.lookup_rom(
MagicMock(platform_slug="nes"), 3, [mock_file]
)
assert captured, "get_game_info should be called even without hashes"
assert captured.get("rom_name") == "Adventure Island II (USA).nes"
assert captured.get("system_id") == 3
assert not captured.get("md5")
assert not captured.get("sha1")
assert not captured.get("crc")
assert result["ss_id"] is None
assert is_not_game is False
@pytest.mark.asyncio
async def test_no_hash_match_builds_game(self):
"""When jeuInfos matches an un-hashed file by filename, the game is built
and returned (the romnom matcher bridges number-style differences such as
'Adventure Island II' -> 'Adventure Island 2')."""
config = _make_config(region_priority=["us"])
game = {
"id": "1234",
"noms": [{"region": "us", "text": "Adventure Island 2"}],
"medias": [],
"synopsis": [],
"dates": [],
"genres": [],
"familles": [],
"modes": [],
"joueurs": {},
"note": {},
}
handler = SSHandler()
mock_file = self._make_unhashed_file("Adventure Island II (USA).nes")
rom = MagicMock(platform_slug="nes", platform_id=1, id=100, regions=["USA"])
with (
patch("handler.metadata.ss_handler.cm.get_config", return_value=config),
patch.object(handler.ss_service, "get_game_info", return_value=game),
):
result, is_not_game = await handler.lookup_rom(rom, 3, [mock_file])
assert result["ss_id"] == 1234
assert result["name"] == "Adventure Island 2"
assert is_not_game is False
@pytest.mark.asyncio
async def test_no_hash_no_filename_skips_lookup(self):
"""With neither a hash nor a filename there is nothing to match on, so the
lookup is skipped without spending an API call."""
handler = SSHandler()
mock_file = self._make_unhashed_file(file_name="")
called = False
async def capture(**kwargs):
nonlocal called
called = True
return None
with patch.object(handler.ss_service, "get_game_info", side_effect=capture):
result, is_not_game = await handler.lookup_rom(
MagicMock(platform_slug="nes"), 3, [mock_file]
)
assert called is False
assert result["ss_id"] is None
assert is_not_game is False
@pytest.mark.asyncio
async def test_returns_notgame_flag_on_notgame_field(self):
notgame_response = {
@@ -819,3 +928,97 @@ class TestLookupRom:
)
assert result["ss_id"] is None
assert is_not_game is True
class TestSearchTermEncoding:
"""Regression tests for issue #3467: the SS name-search term must be
URL-encoded exactly once.
The handler must pass the *raw* (un-percent-encoded) term to the service
layer, which percent-encodes it a single time when building the request URL
via ``with_query(...)``. Pre-encoding the term in the handler caused a
second round of encoding (``%2B`` -> ``%252B``), so ScreenScraper searched
for literal gibberish and returned no match for any title containing a
character that has to be URL-encoded (``+``, ``&``, an apostrophe, ...).
"""
@pytest.mark.asyncio
@pytest.mark.parametrize(
("search_term", "literal", "double_encoded"),
[
("super mario 3d world + bowsers fury", "+", "%2B"),
("sonic & knuckles", "&", "%26"),
("marvel's spider-man", "'", "%27"),
],
)
async def test_search_rom_passes_unencoded_term_to_service(
self, search_term, literal, double_encoded
):
"""``_search_rom`` hands the service a term that is not pre-encoded."""
handler = SSHandler()
captured: dict = {}
async def capture(**kwargs):
captured.update(kwargs)
return []
with patch.object(handler.ss_service, "search_games", side_effect=capture):
await handler._search_rom(search_term, 225)
term = captured["term"]
assert literal in term
assert double_encoded not in term
@pytest.mark.asyncio
async def test_search_rom_still_transliterates_unicode(self):
"""Unidecode is still applied so accented titles match ScreenScraper."""
handler = SSHandler()
captured: dict = {}
async def capture(**kwargs):
captured.update(kwargs)
return []
with patch.object(handler.ss_service, "search_games", side_effect=capture):
await handler._search_rom("Pokémon Snap", 14)
assert captured["term"] == "Pokemon Snap"
@pytest.mark.asyncio
async def test_get_matched_roms_by_name_passes_unencoded_term(self):
"""``get_matched_roms_by_name`` also avoids pre-encoding the term."""
handler = SSHandler()
captured: dict = {}
async def capture(**kwargs):
captured.update(kwargs)
return []
with (
patch("handler.metadata.ss_handler.SCREENSCRAPER_USER", "user1"),
patch("handler.metadata.ss_handler.SCREENSCRAPER_PASSWORD", "pw1"),
patch.object(handler.ss_service, "search_games", side_effect=capture),
):
await handler.get_matched_roms_by_name(MagicMock(), "sonic & knuckles", 1)
term = captured["term"]
assert "&" in term
assert "%26" not in term
@pytest.mark.asyncio
async def test_search_rom_url_single_encodes_plus(self):
"""End-to-end through the real service: a ``+`` is encoded exactly once
in the request URL (``%2B``), never doubly (``%252B``)."""
handler = SSHandler()
captured: dict = {}
async def capture_request(url, *args, **kwargs):
captured["url"] = url
return {"response": {"jeux": []}}
with patch.object(handler.ss_service, "_request", side_effect=capture_request):
await handler._search_rom("super mario 3d world + bowsers fury", 225)
url = captured["url"]
assert "%2B" in url
assert "%252B" not in url

View File

@@ -3,6 +3,7 @@ from datetime import datetime, timezone
import pytest
from sqlalchemy.exc import IntegrityError
from config import ROMM_DB_DRIVER
from handler.auth import auth_handler
from handler.database import (
db_platform_handler,
@@ -73,6 +74,44 @@ def test_roms(rom: Rom, platform: Platform):
assert len(roms) == 1
def test_multi_file_rom_backref_survives_session_close(multi_file_rom: Rom):
"""Multi-file ROM downloads read `file.rom.full_path` after the handler
session closes. The detail loaders eager-load `Rom.files` but not the
reverse `RomFile.rom` relationship, so without the backref being populated
this raises `DetachedInstanceError` (a 500 on the download endpoint).
"""
folder_path = f"{multi_file_rom.fs_path}/{multi_file_rom.fs_name}"
# `multi_file_rom` is returned by `get_rom`, with the session already closed.
assert len(multi_file_rom.files) == 2
for file in multi_file_rom.files:
# All of these dereference `file.rom` on a now-detached instance.
assert file.rom.full_path == folder_path
assert file.is_top_level
assert file.file_name_for_download() == file.file_name
# `get_roms_by_ids` (used by the bulk zip download) must behave the same.
by_ids = db_rom_handler.get_roms_by_ids([multi_file_rom.id])
assert len(by_ids) == 1
for file in by_ids[0].files:
assert file.rom.full_path == folder_path
def test_rom_files_for_rom_id_loads_backref(multi_file_rom: Rom):
"""The scan/metadata-matching fallback fetches a ROM's files on demand and
reads `RomFile.is_top_level` -> `RomFile.rom.full_path`. The backref must be
eager-loaded so it survives the handler session closing.
"""
folder_path = f"{multi_file_rom.fs_path}/{multi_file_rom.fs_name}"
files = db_rom_handler.rom_files_for_rom_id(multi_file_rom.id)
assert len(files) == 2
for file in files:
# Both dereference `file.rom` on a now-detached instance.
assert file.rom.full_path == folder_path
assert file.is_top_level
def test_filter_last_played(rom: Rom, platform: Platform, admin_user: User):
second_rom = db_rom_handler.add_rom(
Rom(
@@ -181,6 +220,59 @@ def test_filter_by_search_term_with_multiple_terms(platform: Platform):
assert actual_rom_ids_single == expected_rom_ids_single
def test_filter_by_search_term_multi_word_and_ranking(platform: Platform):
def _add(name: str) -> Rom:
fs = name.replace(" ", "_")
return db_rom_handler.add_rom(
Rom(
platform_id=platform.id,
name=name,
slug=name.lower().replace(" ", "-"),
fs_name=f"{fs}.zip",
fs_name_no_tags=fs,
fs_name_no_ext=fs,
fs_extension="zip",
fs_path=f"{platform.slug}/roms",
)
)
ff = _add("Final Fantasy")
ff7 = _add("Final Fantasy VII")
fantasy_final = _add("Fantasy Final") # both words, reversed order
_add("Final Combat") # only "final"
_add("Angelique - Voice Fantasy") # only "fantasy"
_add("Super Mario World") # neither word
results = db_rom_handler.get_roms_scalar(search_term="final fantasy")
result_ids = [r.id for r in results]
# Only titles containing BOTH words appear (AND semantics).
assert set(result_ids) == {ff.id, ff7.id, fantasy_final.id}
# Relevance ordering uses MATCH ... AGAINST, which only runs on
# MySQL/MariaDB; PostgreSQL falls back to name ordering, so the
# phrase-ranking assertions only hold on those drivers.
if ROMM_DB_DRIVER in ("mariadb", "mysql"):
# Exact-order phrase matches rank above the reversed-order match.
assert result_ids.index(ff.id) < result_ids.index(fantasy_final.id)
assert result_ids.index(ff7.id) < result_ids.index(fantasy_final.id)
# The relevance ORDER BY must also survive the group_by_meta_id subquery
# wrapping used by the gallery (each ROM here is its own group).
grouped = db_rom_handler.get_roms_scalar(
search_term="final fantasy", group_by_meta_id=True
)
assert {r.id for r in grouped} == {ff.id, ff7.id, fantasy_final.id}
# An explicit sort takes priority over relevance: ordering by name asc puts
# "Fantasy Final" first (relevance is only the tiebreaker here).
explicit = db_rom_handler.get_roms_scalar(
search_term="final fantasy", order_by="name", order_dir="asc"
)
explicit_ids = [r.id for r in explicit]
assert explicit_ids.index(fantasy_final.id) < explicit_ids.index(ff.id)
def test_sibling_roms_empty_fs_name_no_tags_not_matched(platform: Platform):
"""ROMs with empty fs_name_no_tags should NOT be matched as siblings.

View File

@@ -309,3 +309,70 @@ async def test_scan_rom_unmatched_skips_ra_when_id_and_metadata_exist(
mock_get_rom.assert_not_called()
# Existing ra_id should be preserved
assert result.ra_id == 2774
def _top_level_rom_file(**kwargs) -> RomFile:
"""Build a RomFile whose `is_top_level` cached_property is pre-seeded to
True, so it passes lookup_rom's filtering without a persisted rom."""
file = RomFile(file_path="n64/Game", **kwargs)
file.__dict__["is_top_level"] = True
return file
@patch.object(meta_hasheous_handler, "_request", new_callable=AsyncMock)
@patch.object(meta_hasheous_handler, "is_enabled", return_value=True)
async def test_lookup_rom_sends_all_top_level_file_hashes(
mock_is_enabled, mock_request
):
"""lookup_rom must send the hashes of every top-level file as a list,
using chd_sha1_hash (and only it) for files that have one, and skipping
files with no hashes or zero size."""
mock_request.return_value = {}
files = [
_top_level_rom_file(
file_name="disc1.bin",
file_size_bytes=100,
md5_hash="md5one",
sha1_hash="sha1one",
crc_hash="crcone",
),
# CHD file: only chd_sha1_hash should be sent, raw md5/crc ignored.
_top_level_rom_file(
file_name="disc2.chd",
file_size_bytes=200,
md5_hash="ignoredmd5",
crc_hash="ignoredcrc",
chd_sha1_hash="chdsha1",
),
# Zero-size file: must be filtered out entirely.
_top_level_rom_file(
file_name="empty.bin",
file_size_bytes=0,
md5_hash="zeromd5",
),
# No hashes at all: must be skipped.
_top_level_rom_file(file_name="nohash.bin", file_size_bytes=50),
]
result = await meta_hasheous_handler.lookup_rom("n64", files)
assert result["hasheous_id"] is None
mock_request.assert_called_once()
sent_data = mock_request.call_args.kwargs["data"]
assert sent_data == [
{"mD5": "md5one", "shA1": "sha1one", "crc": "crcone"},
{"shA1": "chdsha1"},
]
@patch.object(meta_hasheous_handler, "_request", new_callable=AsyncMock)
@patch.object(meta_hasheous_handler, "is_enabled", return_value=True)
async def test_lookup_rom_skips_request_when_no_hashes(mock_is_enabled, mock_request):
"""lookup_rom must not hit the API when no file has any usable hash."""
files = [_top_level_rom_file(file_name="nohash.bin", file_size_bytes=50)]
result = await meta_hasheous_handler.lookup_rom("n64", files)
assert result["hasheous_id"] is None
mock_request.assert_not_called()