Merge pull request #3384 from Spinnich/pr/screenscraper-api-and-redaction

fix(screenscraper): improve API handling, KO scrape data, and metadata sanitization
This commit is contained in:
Georges-Antoine Assi
2026-05-20 09:11:36 -04:00
committed by GitHub
5 changed files with 349 additions and 14 deletions

View File

@@ -85,10 +85,34 @@ class ScreenScraperService:
) from exc
except aiohttp.ClientResponseError as err:
if err.status == http.HTTPStatus.TOO_MANY_REQUESTS:
# Retry after 2 seconds if rate limit hit
log.warning("ScreenScraper: rate limit hit, retrying after 2s")
await asyncio.sleep(2)
elif err.status == 426:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="ScreenScraper has blacklisted this application version. Please update RomM.",
) from err
elif err.status == 430:
raise HTTPException(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
detail="ScreenScraper daily scrape quota exhausted. Try again tomorrow.",
) from err
elif err.status == 431:
raise HTTPException(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
detail="ScreenScraper daily unrecognized-ROM quota exhausted. Try again tomorrow.",
) from err
elif err.status == 423:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="ScreenScraper API is currently offline.",
) from err
elif err.status == http.HTTPStatus.UNAUTHORIZED:
log.warning(
"ScreenScraper API is temporarily unavailable (server CPU >60%)"
)
return {}
else:
# Log the error and return an empty dict if the request fails with a different code
log.error(err)
return {}
except json.JSONDecodeError as exc:
@@ -117,11 +141,32 @@ class ScreenScraperService:
)
return await res.json()
except (aiohttp.ClientResponseError, aiohttp.ServerTimeoutError) as err:
if (
isinstance(err, aiohttp.ClientResponseError)
and err.status == http.HTTPStatus.UNAUTHORIZED
):
return {}
if isinstance(err, aiohttp.ClientResponseError):
if err.status == http.HTTPStatus.UNAUTHORIZED:
log.warning(
"ScreenScraper API is temporarily unavailable (server CPU >60%)"
)
return {}
elif err.status == 426:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="ScreenScraper has blacklisted this application version. Please update RomM.",
) from err
elif err.status == 430:
raise HTTPException(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
detail="ScreenScraper daily scrape quota exhausted. Try again tomorrow.",
) from err
elif err.status == 431:
raise HTTPException(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
detail="ScreenScraper daily unrecognized-ROM quota exhausted. Try again tomorrow.",
) from err
elif err.status == 423:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="ScreenScraper API is currently offline.",
) from err
log.error(err)
return {}

View File

@@ -1,4 +1,4 @@
import base64
import html
import re
from datetime import datetime
from typing import Final, NotRequired, TypedDict
@@ -30,8 +30,6 @@ from .base_handler import (
strip_sensitive_query_params,
)
SS_DEV_ID: Final = base64.b64decode("enVyZGkxNQ==").decode()
SS_DEV_PASSWORD: Final = base64.b64decode("eFRKd29PRmpPUUc=").decode()
SENSITIVE_KEYS = {"ssid", "sspassword"}
@@ -82,6 +80,10 @@ ARCADES_SS_IDS: Final = [ARCADE_SS_ID, CPS1_SS_ID, CPS2_SS_ID, CPS3_SS_ID]
# Regex to detect ScreenScraper ID tags in filenames like (ssfr-12345)
SS_TAG_REGEX = re.compile(r"\(ssfr-(\d+)\)", re.IGNORECASE)
NOTGAME_NAME_PREFIX: Final = "ZZZ(NOTGAME)"
_ISO_EXTENSIONS: Final = frozenset({"iso", "cue", "chd", "gdi", "cdi", "bin"})
ACCEPTABLE_FILE_EXTENSIONS_BY_PLATFORM_SLUG = {
UPS.DC: ["cue", "chd", "gdi", "cdi"],
UPS.SEGACD: ["cue", "chd", "bin"],
@@ -150,6 +152,23 @@ class SSRom(BaseRom):
ss_metadata: NotRequired[SSMetadata]
def _is_notgame(game: SSGame) -> bool:
if game.get("notgame") == "true":
return True
return any(
name.get("text", "").upper().startswith(NOTGAME_NAME_PREFIX)
for name in game.get("noms", [])
)
def _get_rom_type(file: RomFile) -> str:
if not file.is_top_level:
return "dossier"
if file.file_extension.lower() in _ISO_EXTENSIONS:
return "iso"
return "rom"
def extract_media_from_ss_game(rom: Rom, game: SSGame) -> SSMetadataMedia:
preferred_media_types = get_preferred_media_types()
@@ -482,8 +501,8 @@ def build_ss_game(rom: Rom, game: SSGame) -> SSRom:
ss_id = int(game["id"]) if game.get("id") is not None else None
game_rom: SSRom = {
"ss_id": ss_id,
"name": res_name.replace(" : ", ": "), # Normalize colons
"summary": res_summary,
"name": html.unescape(res_name.replace(" : ", ": ")), # Normalize colons
"summary": html.unescape(res_summary),
"url_cover": str(url_cover) if url_cover else "",
"url_manual": str(url_manual) if url_manual else "",
"url_screenshots": url_screenshots,
@@ -534,6 +553,11 @@ class SSHandler(MetadataHandler):
games_by_name: dict[str, SSGame] = {}
for rom in roms:
if _is_notgame(rom):
log.warning(
"ScreenScraper: Received notgame entry in search results, ignoring"
)
continue
for name in rom.get("noms", []):
if name["text"] not in games_by_name or int(rom["id"]) < int(
games_by_name[name["text"]]["id"]
@@ -612,10 +636,18 @@ class SSHandler(MetadataHandler):
sha1=sha1_hash,
crc=crc_hash,
rom_size_bytes=fs_size_bytes,
rom_name=first_file.file_name,
rom_type=_get_rom_type(first_file),
)
if not res:
return SSRom(ss_id=None)
if _is_notgame(res):
log.warning(
"ScreenScraper: Received notgame entry from hash lookup, ignoring"
)
return SSRom(ss_id=None)
return build_ss_game(rom, res)
async def get_rom(self, rom: Rom, file_name: str, platform_ss_id: int) -> SSRom:
@@ -645,6 +677,9 @@ class SSHandler(MetadataHandler):
search_term = fs_rom_handler.get_file_name_with_no_tags(file_name)
fallback_rom = SSRom(ss_id=None)
if not search_term:
return fallback_rom
# Support for PS2 OPL filename format
match = PS2_OPL_REGEX.match(file_name)
if platform_ss_id == PS2_SS_ID and match:
@@ -766,7 +801,7 @@ class SSHandler(MetadataHandler):
return [
build_ss_game(rom, game)
for game in matched_games
if _is_ss_region(game) and game.get("id")
if not _is_notgame(game) and _is_ss_region(game) and game.get("id")
]

View File

@@ -1,4 +1,5 @@
import logging
import re
from pprint import pformat
from colorama import Fore, Style, init
@@ -55,6 +56,30 @@ LOGGING_CONFIG = {
}
_pattern_cache: re.Pattern[str] | None = None
def _credential_pattern() -> re.Pattern[str] | None:
global _pattern_cache
if _pattern_cache is not None:
return _pattern_cache
try:
# Late import: base_handler indirectly imports this module via
# logger.logger, so referencing it at module top would be a
# circular import.
from handler.metadata.base_handler import SENSITIVE_KEYS
except ImportError:
# base_handler is mid-load (a few boot-time log lines fire before
# it finishes). Skip redaction for this record; the next call
# after boot will populate the cache.
return None
_pattern_cache = re.compile(
rf"({'|'.join(re.escape(k) for k in SENSITIVE_KEYS)})=[^&\s\"]*",
re.IGNORECASE,
)
return _pattern_cache
def should_strip_ansi() -> bool:
"""Determine if ANSI escape codes should be stripped."""
# Check if an explicit environment variable is set to control color behavior
@@ -116,7 +141,9 @@ class Formatter(logging.Formatter):
}
log_fmt = formats.get(record.levelno)
formatter = logging.Formatter(fmt=log_fmt, datefmt="%Y-%m-%d %H:%M:%S")
return formatter.format(record)
output = formatter.format(record)
pattern = _credential_pattern()
return pattern.sub(r"\1=***", output) if pattern else output
def highlight(msg: str = "", color=YELLOW) -> str:

View File

@@ -300,6 +300,94 @@ class TestScreenScraperServiceUnit:
assert result == {}
@pytest.mark.asyncio
async def test_request_blacklisted_raises_403(self, service):
"""Test that HTTP 426 (blacklisted version) raises HTTP 403."""
mock_session = AsyncMock()
mock_session.get.side_effect = aiohttp.ClientResponseError(
request_info=MagicMock(), history=(), status=426
)
mock_context = MagicMock()
mock_context.get.return_value = mock_session
with patch("adapters.services.screenscraper.ctx_aiohttp_session", mock_context):
with pytest.raises(HTTPException) as exc_info:
await service._request("https://api.screenscraper.fr/api2/jeuInfos.php")
assert exc_info.value.status_code == status.HTTP_403_FORBIDDEN
assert "blacklisted" in exc_info.value.detail
@pytest.mark.asyncio
async def test_request_daily_quota_exhausted_raises_429(self, service):
"""Test that HTTP 430 (daily scrape quota) raises HTTP 429."""
mock_session = AsyncMock()
mock_session.get.side_effect = aiohttp.ClientResponseError(
request_info=MagicMock(), history=(), status=430
)
mock_context = MagicMock()
mock_context.get.return_value = mock_session
with patch("adapters.services.screenscraper.ctx_aiohttp_session", mock_context):
with pytest.raises(HTTPException) as exc_info:
await service._request("https://api.screenscraper.fr/api2/jeuInfos.php")
assert exc_info.value.status_code == status.HTTP_429_TOO_MANY_REQUESTS
assert "daily scrape quota" in exc_info.value.detail
@pytest.mark.asyncio
async def test_request_unrecognized_rom_quota_exhausted_raises_429(self, service):
"""Test that HTTP 431 (unrecognized ROM quota) raises HTTP 429."""
mock_session = AsyncMock()
mock_session.get.side_effect = aiohttp.ClientResponseError(
request_info=MagicMock(), history=(), status=431
)
mock_context = MagicMock()
mock_context.get.return_value = mock_session
with patch("adapters.services.screenscraper.ctx_aiohttp_session", mock_context):
with pytest.raises(HTTPException) as exc_info:
await service._request("https://api.screenscraper.fr/api2/jeuInfos.php")
assert exc_info.value.status_code == status.HTTP_429_TOO_MANY_REQUESTS
assert "unrecognized" in exc_info.value.detail
@pytest.mark.asyncio
async def test_request_api_offline_raises_503(self, service):
"""Test that HTTP 423 (API offline) raises HTTP 503."""
mock_session = AsyncMock()
mock_session.get.side_effect = aiohttp.ClientResponseError(
request_info=MagicMock(), history=(), status=423
)
mock_context = MagicMock()
mock_context.get.return_value = mock_session
with patch("adapters.services.screenscraper.ctx_aiohttp_session", mock_context):
with pytest.raises(HTTPException) as exc_info:
await service._request("https://api.screenscraper.fr/api2/jeuInfos.php")
assert exc_info.value.status_code == status.HTTP_503_SERVICE_UNAVAILABLE
assert "offline" in exc_info.value.detail
@pytest.mark.asyncio
async def test_request_cpu_throttle_returns_empty_dict(self, service):
"""Test that HTTP 401 (server CPU throttle) returns empty dict without retrying."""
mock_session = AsyncMock()
mock_session.get.side_effect = aiohttp.ClientResponseError(
request_info=MagicMock(),
history=(),
status=http.HTTPStatus.UNAUTHORIZED,
)
mock_context = MagicMock()
mock_context.get.return_value = mock_session
with patch("adapters.services.screenscraper.ctx_aiohttp_session", mock_context):
result = await service._request(
"https://api.screenscraper.fr/api2/jeuInfos.php"
)
assert result == {}
assert mock_session.get.call_count == 1
@pytest.mark.asyncio
async def test_get_game_info_with_crc(self, service):
"""Test get_game_info with CRC parameter."""

View File

@@ -2,10 +2,13 @@
from typing import cast
from unittest.mock import MagicMock, patch
from urllib.parse import parse_qs, urlparse
from adapters.services.screenscraper_types import SSGame
from config.config_manager import Config, MetadataMediaType
from handler.metadata.ss_handler import (
_get_rom_type,
_is_notgame,
extract_media_from_ss_game,
get_preferred_regions,
)
@@ -160,3 +163,140 @@ class TestExtractMediaFromSsGame:
assert result["box2d_url"] is not None
assert "box-2D(us)" in result["box2d_url"]
class TestIsNotgame:
def _game(self, notgame: str = "false", names: list[str] | None = None) -> SSGame:
return cast(
SSGame,
{
"notgame": notgame,
"noms": [
{"region": "ss", "text": n} for n in (names or ["Clean Game"])
],
},
)
def test_notgame_field_true(self):
assert _is_notgame(self._game(notgame="true")) is True
def test_notgame_field_false_clean_name(self):
assert _is_notgame(self._game(notgame="false")) is False
def test_zzz_notgame_lowercase_name(self):
assert _is_notgame(self._game(names=["ZZZ(notgame)"])) is True
def test_zzz_notgame_long_form(self):
assert (
_is_notgame(self._game(names=["ZZZ(NOTGAME):Fichier Annexes - Non Jeux"]))
is True
)
def test_zzz_prefix_only_no_match(self):
assert _is_notgame(self._game(names=["ZZZ Game Title"])) is False
def test_missing_notgame_field_clean_name(self):
game = cast(SSGame, {"noms": [{"region": "ss", "text": "Normal Game"}]})
assert _is_notgame(game) is False
class TestExtractMediaSensitiveKeyStripping:
def test_media_url_credentials_stripped(self):
config = _make_config()
rom = MagicMock()
rom.platform_id = 1
rom.id = 100
full_url = "https://screenscraper.fr/img.png?ssid=user&sspassword=pass&devid=dev&devpassword=devpass&other=keep"
game = cast(
SSGame,
{
"medias": [
{
"type": "box-2D",
"parent": "jeu",
"region": "us",
"url": full_url,
"crc": "",
"md5": "",
"sha1": "",
"size": "0",
"format": "png",
}
]
},
)
with (
patch("handler.metadata.ss_handler.cm.get_config", return_value=config),
patch(
"handler.metadata.ss_handler.fs_resource_handler.get_media_resources_path",
return_value="roms/1/100/box2d",
),
):
result = extract_media_from_ss_game(rom, game)
# ssid/sspassword must be stripped before storage so user creds don't
# end up in the DB. Dev creds and other params must be preserved so
# subsequent media fetches still resolve.
stored_url = result["box2d_url"]
assert stored_url is not None
query = parse_qs(urlparse(stored_url).query)
assert "ssid" not in query
assert "sspassword" not in query
assert query.get("devid") == ["dev"]
assert query.get("devpassword") == ["devpass"]
assert query.get("other") == ["keep"]
def test_clean_url_unchanged(self):
config = _make_config()
rom = MagicMock()
rom.platform_id = 1
rom.id = 100
clean_url = "https://screenscraper.fr/img.png?format=png"
game = cast(
SSGame,
{
"medias": [
{
"type": "box-2D",
"parent": "jeu",
"region": "us",
"url": clean_url,
"crc": "",
"md5": "",
"sha1": "",
"size": "0",
"format": "png",
}
]
},
)
with (
patch("handler.metadata.ss_handler.cm.get_config", return_value=config),
patch(
"handler.metadata.ss_handler.fs_resource_handler.get_media_resources_path",
return_value="roms/1/100/box2d",
),
):
result = extract_media_from_ss_game(rom, game)
assert result["box2d_url"] == clean_url
class TestGetRomType:
def _file(self, ext: str, top_level: bool = True) -> MagicMock:
f = MagicMock()
f.file_extension = ext
f.is_top_level = top_level
return f
def test_iso_extension(self):
assert _get_rom_type(self._file("iso")) == "iso"
def test_chd_extension(self):
assert _get_rom_type(self._file("chd")) == "iso"
def test_rom_extension(self):
assert _get_rom_type(self._file("nes")) == "rom"
def test_folder_based_rom(self):
assert _get_rom_type(self._file("bin", top_level=False)) == "dossier"