diff --git a/backend/adapters/services/screenscraper.py b/backend/adapters/services/screenscraper.py index bdba3b98d..e3d6283a1 100644 --- a/backend/adapters/services/screenscraper.py +++ b/backend/adapters/services/screenscraper.py @@ -14,15 +14,40 @@ from config import SCREENSCRAPER_PASSWORD, SCREENSCRAPER_USER from logger.logger import log from utils import get_version from utils.context import ctx_aiohttp_session -from utils.rate_limiter import RateLimiter +from utils.rate_limiter import ConcurrencyLimiter SS_DEV_ID: Final = base64.b64decode("enVyZGkxNQ==").decode() SS_DEV_PASSWORD: Final = base64.b64decode("eFRKd29PRmpPUUc=").decode() LOGIN_ERROR_CHECK: Final = "Erreur de login" -# ScreenScraper throttles the free tier to roughly one request per second. -SS_MAX_REQUESTS_PER_SECOND: Final[float] = 1 -_rate_limiter = RateLimiter(SS_MAX_REQUESTS_PER_SECOND) +# ScreenScraper enforces a per-account *thread* (concurrency) cap rather than a +# request rate. Because a request can take several seconds, spacing out request +# starts is not enough -- overlapping requests would exceed the cap and get +# rejected. We instead bound simultaneous in-flight requests. The free/anonymous +# tier allows a single thread; contributors and donors get more, which we learn +# from `ssuser.maxthreads` in each response and apply via the limiter. +SS_DEFAULT_MAX_THREADS: Final[int] = 1 +_concurrency_limiter = ConcurrencyLimiter(SS_DEFAULT_MAX_THREADS) + + +def _update_thread_allowance(response: dict) -> None: + """Raise (or lower) the concurrency cap to the account's advertised threads. + + ScreenScraper reports the per-account thread allowance in + ``response.ssuser.maxthreads`` (higher for contributors/donors). Honouring it + lets supporters scrape with their full concurrency instead of the + conservative default. + """ + try: + max_threads = int(response["response"]["ssuser"]["maxthreads"]) + except (AttributeError, KeyError, TypeError, ValueError): + return + + if max_threads < 1 or max_threads == _concurrency_limiter.max_concurrency: + return + + log.info("ScreenScraper: setting thread allowance to %d", max_threads) + _concurrency_limiter.set_max_concurrency(max_threads) async def auth_middleware( @@ -62,22 +87,24 @@ class ScreenScraperService: request_timeout, ) try: - await _rate_limiter.acquire() - res = await aiohttp_session.get( - url, - headers={"user-agent": f"RomM/{get_version()}"}, - middlewares=(auth_middleware,), - timeout=ClientTimeout(total=request_timeout), - ) - res.raise_for_status() - res_text = await res.text() - if LOGIN_ERROR_CHECK in res_text: - log.error("Invalid ScreenScraper credentials") - raise HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, - detail="Invalid ScreenScraper credentials", + async with _concurrency_limiter: + res = await aiohttp_session.get( + url, + headers={"user-agent": f"RomM/{get_version()}"}, + middlewares=(auth_middleware,), + timeout=ClientTimeout(total=request_timeout), ) - return await res.json() + res.raise_for_status() + res_text = await res.text() + if LOGIN_ERROR_CHECK in res_text: + log.error("Invalid ScreenScraper credentials") + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Invalid ScreenScraper credentials", + ) + data = await res.json() + _update_thread_allowance(data) + return data except aiohttp.ServerTimeoutError: # Retry the request once if it times out pass @@ -131,22 +158,24 @@ class ScreenScraperService: url, request_timeout, ) - await _rate_limiter.acquire() - res = await aiohttp_session.get( - url, - headers={"user-agent": f"RomM/{get_version()}"}, - middlewares=(auth_middleware,), - timeout=ClientTimeout(total=request_timeout), - ) - res.raise_for_status() - res_text = await res.text() - if LOGIN_ERROR_CHECK in res_text: - log.error("Invalid ScreenScraper credentials") - raise HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, - detail="Invalid ScreenScraper credentials", + async with _concurrency_limiter: + res = await aiohttp_session.get( + url, + headers={"user-agent": f"RomM/{get_version()}"}, + middlewares=(auth_middleware,), + timeout=ClientTimeout(total=request_timeout), ) - return await res.json() + res.raise_for_status() + res_text = await res.text() + if LOGIN_ERROR_CHECK in res_text: + log.error("Invalid ScreenScraper credentials") + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Invalid ScreenScraper credentials", + ) + data = await res.json() + _update_thread_allowance(data) + return data except (aiohttp.ClientResponseError, aiohttp.ServerTimeoutError) as err: if isinstance(err, aiohttp.ClientResponseError): if err.status == http.HTTPStatus.UNAUTHORIZED: diff --git a/backend/adapters/services/screenscraper_types.py b/backend/adapters/services/screenscraper_types.py index f9aff5421..91491debc 100644 --- a/backend/adapters/services/screenscraper_types.py +++ b/backend/adapters/services/screenscraper_types.py @@ -1,4 +1,16 @@ -from typing import Literal, TypedDict +from typing import Literal, NotRequired, TypedDict + + +# Per-account limits returned in `response.ssuser`. Contributors/donors get a +# higher `maxthreads` allowance. All values arrive as strings. +# Reference: https://api.screenscraper.fr/webapi2.php +class SSUser(TypedDict): + id: NotRequired[str] + niveau: NotRequired[str] + maxthreads: NotRequired[str] + maxrequestspermin: NotRequired[str] + maxrequestsperday: NotRequired[str] + requeststoday: NotRequired[str] class SSText(TypedDict): diff --git a/backend/tests/adapters/services/conftest.py b/backend/tests/adapters/services/conftest.py index 11b4fcac3..9f98d00a3 100644 --- a/backend/tests/adapters/services/conftest.py +++ b/backend/tests/adapters/services/conftest.py @@ -5,24 +5,34 @@ import aiohttp import pytest import pytest_asyncio +from adapters.services.screenscraper import SS_DEFAULT_MAX_THREADS +from utils.rate_limiter import ConcurrencyLimiter + @pytest.fixture(autouse=True) def _disable_metadata_rate_limiters(monkeypatch): """Neutralize the pre-emptive rate limiters during tests. - Each metadata service spaces its requests via a module-level ``RateLimiter`` - whose ``acquire`` sleeps to stay under the provider's req/s cap. Letting it - run would add real delays and inject extra ``asyncio.sleep`` calls that - interfere with retry assertions, so we replace ``acquire`` with a no-op. + Each req/s-limited service spaces its requests via a module-level + ``RateLimiter`` whose ``acquire`` sleeps to stay under the provider's cap. + Letting it run would add real delays and inject extra ``asyncio.sleep`` calls + that interfere with retry assertions, so we replace ``acquire`` with a no-op. """ for module in ( "adapters.services.igdb", "adapters.services.mobygames", "adapters.services.retroachievements", - "adapters.services.screenscraper", ): monkeypatch.setattr(f"{module}._rate_limiter.acquire", AsyncMock()) + # ScreenScraper uses a concurrency limiter whose capacity mutates at runtime + # from `ssuser.maxthreads`. Swap in a fresh instance so each test starts from + # the default allowance with a clean (per-loop) waiter queue. + monkeypatch.setattr( + "adapters.services.screenscraper._concurrency_limiter", + ConcurrencyLimiter(SS_DEFAULT_MAX_THREADS), + ) + @pytest_asyncio.fixture async def mock_ctx_aiohttp_session(): diff --git a/backend/tests/adapters/services/test_screenscraper.py b/backend/tests/adapters/services/test_screenscraper.py index 7e97fa316..99cc0f57a 100644 --- a/backend/tests/adapters/services/test_screenscraper.py +++ b/backend/tests/adapters/services/test_screenscraper.py @@ -149,12 +149,14 @@ class TestScreenScraperServiceUnit: mock_response.json.assert_called_once() @pytest.mark.asyncio - async def test_request_acquires_rate_limiter(self, service, monkeypatch): - """Test that the request reserves a rate-limiter slot before sending.""" + async def test_request_holds_concurrency_slot(self, service, monkeypatch): + """Test that the request acquires and releases a concurrency slot.""" + import adapters.services.screenscraper as ss_module + acquire_mock = AsyncMock() - monkeypatch.setattr( - "adapters.services.screenscraper._rate_limiter.acquire", acquire_mock - ) + release_mock = MagicMock() + monkeypatch.setattr(ss_module._concurrency_limiter, "acquire", acquire_mock) + monkeypatch.setattr(ss_module._concurrency_limiter, "release", release_mock) mock_session = AsyncMock() mock_response = MagicMock() @@ -170,6 +172,55 @@ class TestScreenScraperServiceUnit: await service._request("https://api.screenscraper.fr/api2/jeuInfos.php") acquire_mock.assert_awaited_once() + release_mock.assert_called_once() + + @pytest.mark.asyncio + async def test_request_updates_thread_allowance_from_ssuser( + self, service, monkeypatch + ): + """Test that `ssuser.maxthreads` raises the concurrency cap (donor perk).""" + import adapters.services.screenscraper as ss_module + + assert ss_module._concurrency_limiter.max_concurrency == 1 + + mock_session = AsyncMock() + mock_response = MagicMock() + mock_response.json = AsyncMock( + return_value={"response": {"ssuser": {"maxthreads": "5"}}} + ) + mock_response.text = AsyncMock(return_value="{}") + mock_response.raise_for_status.return_value = None + mock_session.get.return_value = mock_response + + mock_context = MagicMock() + mock_context.get.return_value = mock_session + + with patch("adapters.services.screenscraper.ctx_aiohttp_session", mock_context): + await service._request("https://api.screenscraper.fr/api2/jeuInfos.php") + + assert ss_module._concurrency_limiter.max_concurrency == 5 + + @pytest.mark.asyncio + async def test_request_ignores_invalid_maxthreads(self, service, monkeypatch): + """Test that a missing or unparsable `maxthreads` leaves the cap untouched.""" + import adapters.services.screenscraper as ss_module + + mock_session = AsyncMock() + mock_response = MagicMock() + mock_response.json = AsyncMock( + return_value={"response": {"ssuser": {"maxthreads": "not-a-number"}}} + ) + mock_response.text = AsyncMock(return_value="{}") + mock_response.raise_for_status.return_value = None + mock_session.get.return_value = mock_response + + mock_context = MagicMock() + mock_context.get.return_value = mock_session + + with patch("adapters.services.screenscraper.ctx_aiohttp_session", mock_context): + await service._request("https://api.screenscraper.fr/api2/jeuInfos.php") + + assert ss_module._concurrency_limiter.max_concurrency == 1 @pytest.mark.asyncio async def test_request_login_error(self, service): diff --git a/backend/tests/utils/test_rate_limiter.py b/backend/tests/utils/test_rate_limiter.py index 584ade24d..852cee77f 100644 --- a/backend/tests/utils/test_rate_limiter.py +++ b/backend/tests/utils/test_rate_limiter.py @@ -2,7 +2,7 @@ import asyncio import pytest -from utils.rate_limiter import RateLimiter +from utils.rate_limiter import ConcurrencyLimiter, RateLimiter def _record_sleeps(monkeypatch) -> list[float]: @@ -55,3 +55,81 @@ class TestRateLimiter: def test_non_positive_rate_raises(self, rate): with pytest.raises(ValueError): RateLimiter(requests_per_second=rate) + + +class TestConcurrencyLimiter: + @pytest.mark.parametrize("value", [0, -1]) + def test_non_positive_capacity_raises(self, value): + with pytest.raises(ValueError): + ConcurrencyLimiter(max_concurrency=value) + + async def test_acquire_release_tracks_in_flight(self): + limiter = ConcurrencyLimiter(max_concurrency=2) + + await limiter.acquire() + await limiter.acquire() + assert limiter.in_flight == 2 + + limiter.release() + assert limiter.in_flight == 1 + + async def test_context_manager_releases(self): + limiter = ConcurrencyLimiter(max_concurrency=1) + + async with limiter: + assert limiter.in_flight == 1 + assert limiter.in_flight == 0 + + async def test_context_manager_releases_on_error(self): + limiter = ConcurrencyLimiter(max_concurrency=1) + + with pytest.raises(RuntimeError): + async with limiter: + raise RuntimeError("boom") + + assert limiter.in_flight == 0 + + async def test_acquire_blocks_until_slot_freed(self): + limiter = ConcurrencyLimiter(max_concurrency=1) + await limiter.acquire() + + waiter = asyncio.ensure_future(limiter.acquire()) + await asyncio.sleep(0) # Let the waiter run and block. + assert not waiter.done() + + limiter.release() + await asyncio.wait_for(waiter, timeout=1) + assert limiter.in_flight == 1 + + async def test_set_max_concurrency_wakes_waiters(self): + limiter = ConcurrencyLimiter(max_concurrency=1) + await limiter.acquire() + + waiter = asyncio.ensure_future(limiter.acquire()) + await asyncio.sleep(0) + assert not waiter.done() + + # Opening a second slot should release the blocked acquirer. + limiter.set_max_concurrency(2) + await asyncio.wait_for(waiter, timeout=1) + assert limiter.in_flight == 2 + + async def test_lowering_capacity_blocks_new_acquirers(self): + limiter = ConcurrencyLimiter(max_concurrency=2) + await limiter.acquire() + limiter.set_max_concurrency(1) + + waiter = asyncio.ensure_future(limiter.acquire()) + await asyncio.sleep(0) + # Already at the new cap, so the next acquirer must wait. + assert not waiter.done() + + limiter.release() + await asyncio.wait_for(waiter, timeout=1) + assert limiter.in_flight == 1 + + @pytest.mark.parametrize("value", [0, -1]) + def test_set_non_positive_capacity_raises(self, value): + limiter = ConcurrencyLimiter(max_concurrency=1) + with pytest.raises(ValueError): + limiter.set_max_concurrency(value) diff --git a/backend/utils/rate_limiter.py b/backend/utils/rate_limiter.py index 2b41f0d9b..f17db7fdc 100644 --- a/backend/utils/rate_limiter.py +++ b/backend/utils/rate_limiter.py @@ -1,4 +1,5 @@ import asyncio +from collections import deque class RateLimiter: @@ -26,3 +27,88 @@ class RateLimiter: delay = slot - now if delay > 0: await asyncio.sleep(delay) + + +class ConcurrencyLimiter: + """Caps the number of in-flight operations, with a runtime-adjustable capacity. + + Unlike :class:`RateLimiter`, which spaces out *when* requests start, this + bounds how many run *simultaneously*. It suits APIs that enforce a per-account + thread/connection cap (e.g. ScreenScraper) rather than a call rate: because a + slot is held for the whole request, slow responses can never cause overlapping + requests to exceed the cap. + + The capacity can be raised or lowered at runtime via + :meth:`set_max_concurrency` -- for instance once an API response reveals the + account's allowance. Use it as an async context manager so the slot is always + released, even if the wrapped request raises:: + + async with limiter: + await do_request() + + The implementation is loop-agnostic: waiter futures are created against the + running loop on acquisition, so a single shared instance works across the + distinct event loops used by, e.g., the test suite. + """ + + def __init__(self, max_concurrency: int) -> None: + if max_concurrency < 1: + raise ValueError("max_concurrency must be at least 1") + self._max_concurrency = max_concurrency + self._in_flight = 0 + self._waiters: deque[asyncio.Future[None]] = deque() + + @property + def max_concurrency(self) -> int: + return self._max_concurrency + + @property + def in_flight(self) -> int: + return self._in_flight + + def set_max_concurrency(self, max_concurrency: int) -> None: + if max_concurrency < 1: + raise ValueError("max_concurrency must be at least 1") + previous = self._max_concurrency + self._max_concurrency = max_concurrency + # Wake one waiter per newly opened slot; each re-checks capacity itself. + for _ in range(max(0, max_concurrency - previous)): + self._wake_next() + + async def acquire(self) -> None: + # Re-check on every wake-up: another coroutine may have taken the slot, + # or the capacity may have been lowered while we waited. + while self._in_flight >= self._max_concurrency: + loop = asyncio.get_running_loop() + waiter = loop.create_future() + self._waiters.append(waiter) + try: + try: + await waiter + finally: + self._waiters.remove(waiter) + except asyncio.CancelledError: + # We were granted a slot but cancelled before using it; pass the + # grant on so a waiting peer is not stranded. + if not waiter.cancelled(): + self._wake_next() + raise + self._in_flight += 1 + + def release(self) -> None: + if self._in_flight > 0: + self._in_flight -= 1 + self._wake_next() + + def _wake_next(self) -> None: + for waiter in self._waiters: + if not waiter.done(): + waiter.set_result(None) + return + + async def __aenter__(self) -> "ConcurrencyLimiter": + await self.acquire() + return self + + async def __aexit__(self, *exc_info: object) -> None: + self.release()