From cd41422660e03a1e9657e1601a2a2bfad6808014 Mon Sep 17 00:00:00 2001 From: Claude Date: Sat, 6 Jun 2026 16:02:13 +0000 Subject: [PATCH] Use a concurrency limiter for ScreenScraper, honoring account threads ScreenScraper enforces a per-account *thread* (concurrency) cap rather than a request rate. Requests can take several seconds, so spacing out request starts at 1/s could still leave multiple requests overlapping in flight, exceeding the cap and getting rejected with ScreenScraper's custom HTTP codes. - Add ConcurrencyLimiter: a runtime-resizable, loop-agnostic limiter that bounds simultaneous in-flight operations (held for the whole request via async context manager), instead of spacing request starts. - Switch the ScreenScraper service from the req/s RateLimiter to a module-level ConcurrencyLimiter defaulting to a single thread. - Recognize contributor/donor accounts: parse `ssuser.maxthreads` from each response and raise the concurrency allowance to match, so supporters scrape with their full thread count instead of the conservative default. Adds unit tests for the limiter (blocking, wake-on-release, runtime resize) and for the ScreenScraper slot-holding and thread-allowance updates. https://claude.ai/code/session_01133QQuWvq8Zm25DZMP9PVr --- backend/adapters/services/screenscraper.py | 97 ++++++++++++------- .../adapters/services/screenscraper_types.py | 14 ++- backend/tests/adapters/services/conftest.py | 20 +++- .../adapters/services/test_screenscraper.py | 61 +++++++++++- backend/tests/utils/test_rate_limiter.py | 80 ++++++++++++++- backend/utils/rate_limiter.py | 86 ++++++++++++++++ 6 files changed, 312 insertions(+), 46 deletions(-) diff --git a/backend/adapters/services/screenscraper.py b/backend/adapters/services/screenscraper.py index bdba3b98d..e3d6283a1 100644 --- a/backend/adapters/services/screenscraper.py +++ b/backend/adapters/services/screenscraper.py @@ -14,15 +14,40 @@ from config import SCREENSCRAPER_PASSWORD, SCREENSCRAPER_USER from logger.logger import log from utils import get_version from utils.context import ctx_aiohttp_session -from utils.rate_limiter import RateLimiter +from utils.rate_limiter import ConcurrencyLimiter SS_DEV_ID: Final = base64.b64decode("enVyZGkxNQ==").decode() SS_DEV_PASSWORD: Final = base64.b64decode("eFRKd29PRmpPUUc=").decode() LOGIN_ERROR_CHECK: Final = "Erreur de login" -# ScreenScraper throttles the free tier to roughly one request per second. -SS_MAX_REQUESTS_PER_SECOND: Final[float] = 1 -_rate_limiter = RateLimiter(SS_MAX_REQUESTS_PER_SECOND) +# ScreenScraper enforces a per-account *thread* (concurrency) cap rather than a +# request rate. Because a request can take several seconds, spacing out request +# starts is not enough -- overlapping requests would exceed the cap and get +# rejected. We instead bound simultaneous in-flight requests. The free/anonymous +# tier allows a single thread; contributors and donors get more, which we learn +# from `ssuser.maxthreads` in each response and apply via the limiter. +SS_DEFAULT_MAX_THREADS: Final[int] = 1 +_concurrency_limiter = ConcurrencyLimiter(SS_DEFAULT_MAX_THREADS) + + +def _update_thread_allowance(response: dict) -> None: + """Raise (or lower) the concurrency cap to the account's advertised threads. + + ScreenScraper reports the per-account thread allowance in + ``response.ssuser.maxthreads`` (higher for contributors/donors). Honouring it + lets supporters scrape with their full concurrency instead of the + conservative default. + """ + try: + max_threads = int(response["response"]["ssuser"]["maxthreads"]) + except (AttributeError, KeyError, TypeError, ValueError): + return + + if max_threads < 1 or max_threads == _concurrency_limiter.max_concurrency: + return + + log.info("ScreenScraper: setting thread allowance to %d", max_threads) + _concurrency_limiter.set_max_concurrency(max_threads) async def auth_middleware( @@ -62,22 +87,24 @@ class ScreenScraperService: request_timeout, ) try: - await _rate_limiter.acquire() - res = await aiohttp_session.get( - url, - headers={"user-agent": f"RomM/{get_version()}"}, - middlewares=(auth_middleware,), - timeout=ClientTimeout(total=request_timeout), - ) - res.raise_for_status() - res_text = await res.text() - if LOGIN_ERROR_CHECK in res_text: - log.error("Invalid ScreenScraper credentials") - raise HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, - detail="Invalid ScreenScraper credentials", + async with _concurrency_limiter: + res = await aiohttp_session.get( + url, + headers={"user-agent": f"RomM/{get_version()}"}, + middlewares=(auth_middleware,), + timeout=ClientTimeout(total=request_timeout), ) - return await res.json() + res.raise_for_status() + res_text = await res.text() + if LOGIN_ERROR_CHECK in res_text: + log.error("Invalid ScreenScraper credentials") + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Invalid ScreenScraper credentials", + ) + data = await res.json() + _update_thread_allowance(data) + return data except aiohttp.ServerTimeoutError: # Retry the request once if it times out pass @@ -131,22 +158,24 @@ class ScreenScraperService: url, request_timeout, ) - await _rate_limiter.acquire() - res = await aiohttp_session.get( - url, - headers={"user-agent": f"RomM/{get_version()}"}, - middlewares=(auth_middleware,), - timeout=ClientTimeout(total=request_timeout), - ) - res.raise_for_status() - res_text = await res.text() - if LOGIN_ERROR_CHECK in res_text: - log.error("Invalid ScreenScraper credentials") - raise HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, - detail="Invalid ScreenScraper credentials", + async with _concurrency_limiter: + res = await aiohttp_session.get( + url, + headers={"user-agent": f"RomM/{get_version()}"}, + middlewares=(auth_middleware,), + timeout=ClientTimeout(total=request_timeout), ) - return await res.json() + res.raise_for_status() + res_text = await res.text() + if LOGIN_ERROR_CHECK in res_text: + log.error("Invalid ScreenScraper credentials") + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Invalid ScreenScraper credentials", + ) + data = await res.json() + _update_thread_allowance(data) + return data except (aiohttp.ClientResponseError, aiohttp.ServerTimeoutError) as err: if isinstance(err, aiohttp.ClientResponseError): if err.status == http.HTTPStatus.UNAUTHORIZED: diff --git a/backend/adapters/services/screenscraper_types.py b/backend/adapters/services/screenscraper_types.py index f9aff5421..91491debc 100644 --- a/backend/adapters/services/screenscraper_types.py +++ b/backend/adapters/services/screenscraper_types.py @@ -1,4 +1,16 @@ -from typing import Literal, TypedDict +from typing import Literal, NotRequired, TypedDict + + +# Per-account limits returned in `response.ssuser`. Contributors/donors get a +# higher `maxthreads` allowance. All values arrive as strings. +# Reference: https://api.screenscraper.fr/webapi2.php +class SSUser(TypedDict): + id: NotRequired[str] + niveau: NotRequired[str] + maxthreads: NotRequired[str] + maxrequestspermin: NotRequired[str] + maxrequestsperday: NotRequired[str] + requeststoday: NotRequired[str] class SSText(TypedDict): diff --git a/backend/tests/adapters/services/conftest.py b/backend/tests/adapters/services/conftest.py index 11b4fcac3..9f98d00a3 100644 --- a/backend/tests/adapters/services/conftest.py +++ b/backend/tests/adapters/services/conftest.py @@ -5,24 +5,34 @@ import aiohttp import pytest import pytest_asyncio +from adapters.services.screenscraper import SS_DEFAULT_MAX_THREADS +from utils.rate_limiter import ConcurrencyLimiter + @pytest.fixture(autouse=True) def _disable_metadata_rate_limiters(monkeypatch): """Neutralize the pre-emptive rate limiters during tests. - Each metadata service spaces its requests via a module-level ``RateLimiter`` - whose ``acquire`` sleeps to stay under the provider's req/s cap. Letting it - run would add real delays and inject extra ``asyncio.sleep`` calls that - interfere with retry assertions, so we replace ``acquire`` with a no-op. + Each req/s-limited service spaces its requests via a module-level + ``RateLimiter`` whose ``acquire`` sleeps to stay under the provider's cap. + Letting it run would add real delays and inject extra ``asyncio.sleep`` calls + that interfere with retry assertions, so we replace ``acquire`` with a no-op. """ for module in ( "adapters.services.igdb", "adapters.services.mobygames", "adapters.services.retroachievements", - "adapters.services.screenscraper", ): monkeypatch.setattr(f"{module}._rate_limiter.acquire", AsyncMock()) + # ScreenScraper uses a concurrency limiter whose capacity mutates at runtime + # from `ssuser.maxthreads`. Swap in a fresh instance so each test starts from + # the default allowance with a clean (per-loop) waiter queue. + monkeypatch.setattr( + "adapters.services.screenscraper._concurrency_limiter", + ConcurrencyLimiter(SS_DEFAULT_MAX_THREADS), + ) + @pytest_asyncio.fixture async def mock_ctx_aiohttp_session(): diff --git a/backend/tests/adapters/services/test_screenscraper.py b/backend/tests/adapters/services/test_screenscraper.py index 7e97fa316..99cc0f57a 100644 --- a/backend/tests/adapters/services/test_screenscraper.py +++ b/backend/tests/adapters/services/test_screenscraper.py @@ -149,12 +149,14 @@ class TestScreenScraperServiceUnit: mock_response.json.assert_called_once() @pytest.mark.asyncio - async def test_request_acquires_rate_limiter(self, service, monkeypatch): - """Test that the request reserves a rate-limiter slot before sending.""" + async def test_request_holds_concurrency_slot(self, service, monkeypatch): + """Test that the request acquires and releases a concurrency slot.""" + import adapters.services.screenscraper as ss_module + acquire_mock = AsyncMock() - monkeypatch.setattr( - "adapters.services.screenscraper._rate_limiter.acquire", acquire_mock - ) + release_mock = MagicMock() + monkeypatch.setattr(ss_module._concurrency_limiter, "acquire", acquire_mock) + monkeypatch.setattr(ss_module._concurrency_limiter, "release", release_mock) mock_session = AsyncMock() mock_response = MagicMock() @@ -170,6 +172,55 @@ class TestScreenScraperServiceUnit: await service._request("https://api.screenscraper.fr/api2/jeuInfos.php") acquire_mock.assert_awaited_once() + release_mock.assert_called_once() + + @pytest.mark.asyncio + async def test_request_updates_thread_allowance_from_ssuser( + self, service, monkeypatch + ): + """Test that `ssuser.maxthreads` raises the concurrency cap (donor perk).""" + import adapters.services.screenscraper as ss_module + + assert ss_module._concurrency_limiter.max_concurrency == 1 + + mock_session = AsyncMock() + mock_response = MagicMock() + mock_response.json = AsyncMock( + return_value={"response": {"ssuser": {"maxthreads": "5"}}} + ) + mock_response.text = AsyncMock(return_value="{}") + mock_response.raise_for_status.return_value = None + mock_session.get.return_value = mock_response + + mock_context = MagicMock() + mock_context.get.return_value = mock_session + + with patch("adapters.services.screenscraper.ctx_aiohttp_session", mock_context): + await service._request("https://api.screenscraper.fr/api2/jeuInfos.php") + + assert ss_module._concurrency_limiter.max_concurrency == 5 + + @pytest.mark.asyncio + async def test_request_ignores_invalid_maxthreads(self, service, monkeypatch): + """Test that a missing or unparsable `maxthreads` leaves the cap untouched.""" + import adapters.services.screenscraper as ss_module + + mock_session = AsyncMock() + mock_response = MagicMock() + mock_response.json = AsyncMock( + return_value={"response": {"ssuser": {"maxthreads": "not-a-number"}}} + ) + mock_response.text = AsyncMock(return_value="{}") + mock_response.raise_for_status.return_value = None + mock_session.get.return_value = mock_response + + mock_context = MagicMock() + mock_context.get.return_value = mock_session + + with patch("adapters.services.screenscraper.ctx_aiohttp_session", mock_context): + await service._request("https://api.screenscraper.fr/api2/jeuInfos.php") + + assert ss_module._concurrency_limiter.max_concurrency == 1 @pytest.mark.asyncio async def test_request_login_error(self, service): diff --git a/backend/tests/utils/test_rate_limiter.py b/backend/tests/utils/test_rate_limiter.py index 584ade24d..852cee77f 100644 --- a/backend/tests/utils/test_rate_limiter.py +++ b/backend/tests/utils/test_rate_limiter.py @@ -2,7 +2,7 @@ import asyncio import pytest -from utils.rate_limiter import RateLimiter +from utils.rate_limiter import ConcurrencyLimiter, RateLimiter def _record_sleeps(monkeypatch) -> list[float]: @@ -55,3 +55,81 @@ class TestRateLimiter: def test_non_positive_rate_raises(self, rate): with pytest.raises(ValueError): RateLimiter(requests_per_second=rate) + + +class TestConcurrencyLimiter: + @pytest.mark.parametrize("value", [0, -1]) + def test_non_positive_capacity_raises(self, value): + with pytest.raises(ValueError): + ConcurrencyLimiter(max_concurrency=value) + + async def test_acquire_release_tracks_in_flight(self): + limiter = ConcurrencyLimiter(max_concurrency=2) + + await limiter.acquire() + await limiter.acquire() + assert limiter.in_flight == 2 + + limiter.release() + assert limiter.in_flight == 1 + + async def test_context_manager_releases(self): + limiter = ConcurrencyLimiter(max_concurrency=1) + + async with limiter: + assert limiter.in_flight == 1 + assert limiter.in_flight == 0 + + async def test_context_manager_releases_on_error(self): + limiter = ConcurrencyLimiter(max_concurrency=1) + + with pytest.raises(RuntimeError): + async with limiter: + raise RuntimeError("boom") + + assert limiter.in_flight == 0 + + async def test_acquire_blocks_until_slot_freed(self): + limiter = ConcurrencyLimiter(max_concurrency=1) + await limiter.acquire() + + waiter = asyncio.ensure_future(limiter.acquire()) + await asyncio.sleep(0) # Let the waiter run and block. + assert not waiter.done() + + limiter.release() + await asyncio.wait_for(waiter, timeout=1) + assert limiter.in_flight == 1 + + async def test_set_max_concurrency_wakes_waiters(self): + limiter = ConcurrencyLimiter(max_concurrency=1) + await limiter.acquire() + + waiter = asyncio.ensure_future(limiter.acquire()) + await asyncio.sleep(0) + assert not waiter.done() + + # Opening a second slot should release the blocked acquirer. + limiter.set_max_concurrency(2) + await asyncio.wait_for(waiter, timeout=1) + assert limiter.in_flight == 2 + + async def test_lowering_capacity_blocks_new_acquirers(self): + limiter = ConcurrencyLimiter(max_concurrency=2) + await limiter.acquire() + limiter.set_max_concurrency(1) + + waiter = asyncio.ensure_future(limiter.acquire()) + await asyncio.sleep(0) + # Already at the new cap, so the next acquirer must wait. + assert not waiter.done() + + limiter.release() + await asyncio.wait_for(waiter, timeout=1) + assert limiter.in_flight == 1 + + @pytest.mark.parametrize("value", [0, -1]) + def test_set_non_positive_capacity_raises(self, value): + limiter = ConcurrencyLimiter(max_concurrency=1) + with pytest.raises(ValueError): + limiter.set_max_concurrency(value) diff --git a/backend/utils/rate_limiter.py b/backend/utils/rate_limiter.py index 2b41f0d9b..f17db7fdc 100644 --- a/backend/utils/rate_limiter.py +++ b/backend/utils/rate_limiter.py @@ -1,4 +1,5 @@ import asyncio +from collections import deque class RateLimiter: @@ -26,3 +27,88 @@ class RateLimiter: delay = slot - now if delay > 0: await asyncio.sleep(delay) + + +class ConcurrencyLimiter: + """Caps the number of in-flight operations, with a runtime-adjustable capacity. + + Unlike :class:`RateLimiter`, which spaces out *when* requests start, this + bounds how many run *simultaneously*. It suits APIs that enforce a per-account + thread/connection cap (e.g. ScreenScraper) rather than a call rate: because a + slot is held for the whole request, slow responses can never cause overlapping + requests to exceed the cap. + + The capacity can be raised or lowered at runtime via + :meth:`set_max_concurrency` -- for instance once an API response reveals the + account's allowance. Use it as an async context manager so the slot is always + released, even if the wrapped request raises:: + + async with limiter: + await do_request() + + The implementation is loop-agnostic: waiter futures are created against the + running loop on acquisition, so a single shared instance works across the + distinct event loops used by, e.g., the test suite. + """ + + def __init__(self, max_concurrency: int) -> None: + if max_concurrency < 1: + raise ValueError("max_concurrency must be at least 1") + self._max_concurrency = max_concurrency + self._in_flight = 0 + self._waiters: deque[asyncio.Future[None]] = deque() + + @property + def max_concurrency(self) -> int: + return self._max_concurrency + + @property + def in_flight(self) -> int: + return self._in_flight + + def set_max_concurrency(self, max_concurrency: int) -> None: + if max_concurrency < 1: + raise ValueError("max_concurrency must be at least 1") + previous = self._max_concurrency + self._max_concurrency = max_concurrency + # Wake one waiter per newly opened slot; each re-checks capacity itself. + for _ in range(max(0, max_concurrency - previous)): + self._wake_next() + + async def acquire(self) -> None: + # Re-check on every wake-up: another coroutine may have taken the slot, + # or the capacity may have been lowered while we waited. + while self._in_flight >= self._max_concurrency: + loop = asyncio.get_running_loop() + waiter = loop.create_future() + self._waiters.append(waiter) + try: + try: + await waiter + finally: + self._waiters.remove(waiter) + except asyncio.CancelledError: + # We were granted a slot but cancelled before using it; pass the + # grant on so a waiting peer is not stranded. + if not waiter.cancelled(): + self._wake_next() + raise + self._in_flight += 1 + + def release(self) -> None: + if self._in_flight > 0: + self._in_flight -= 1 + self._wake_next() + + def _wake_next(self) -> None: + for waiter in self._waiters: + if not waiter.done(): + waiter.set_result(None) + return + + async def __aenter__(self) -> "ConcurrencyLimiter": + await self.acquire() + return self + + async def __aexit__(self, *exc_info: object) -> None: + self.release()