Use a concurrency limiter for ScreenScraper, honoring account threads

ScreenScraper enforces a per-account *thread* (concurrency) cap rather than
a request rate. Requests can take several seconds, so spacing out request
starts at 1/s could still leave multiple requests overlapping in flight,
exceeding the cap and getting rejected with ScreenScraper's custom HTTP codes.

- Add ConcurrencyLimiter: a runtime-resizable, loop-agnostic limiter that
  bounds simultaneous in-flight operations (held for the whole request via
  async context manager), instead of spacing request starts.
- Switch the ScreenScraper service from the req/s RateLimiter to a module-level
  ConcurrencyLimiter defaulting to a single thread.
- Recognize contributor/donor accounts: parse `ssuser.maxthreads` from each
  response and raise the concurrency allowance to match, so supporters scrape
  with their full thread count instead of the conservative default.

Adds unit tests for the limiter (blocking, wake-on-release, runtime resize)
and for the ScreenScraper slot-holding and thread-allowance updates.

https://claude.ai/code/session_01133QQuWvq8Zm25DZMP9PVr
This commit is contained in:
Claude
2026-06-06 16:02:13 +00:00
parent 8cc79f9e5c
commit cd41422660
6 changed files with 312 additions and 46 deletions

View File

@@ -14,15 +14,40 @@ from config import SCREENSCRAPER_PASSWORD, SCREENSCRAPER_USER
from logger.logger import log
from utils import get_version
from utils.context import ctx_aiohttp_session
from utils.rate_limiter import RateLimiter
from utils.rate_limiter import ConcurrencyLimiter
SS_DEV_ID: Final = base64.b64decode("enVyZGkxNQ==").decode()
SS_DEV_PASSWORD: Final = base64.b64decode("eFRKd29PRmpPUUc=").decode()
LOGIN_ERROR_CHECK: Final = "Erreur de login"
# ScreenScraper throttles the free tier to roughly one request per second.
SS_MAX_REQUESTS_PER_SECOND: Final[float] = 1
_rate_limiter = RateLimiter(SS_MAX_REQUESTS_PER_SECOND)
# ScreenScraper enforces a per-account *thread* (concurrency) cap rather than a
# request rate. Because a request can take several seconds, spacing out request
# starts is not enough -- overlapping requests would exceed the cap and get
# rejected. We instead bound simultaneous in-flight requests. The free/anonymous
# tier allows a single thread; contributors and donors get more, which we learn
# from `ssuser.maxthreads` in each response and apply via the limiter.
SS_DEFAULT_MAX_THREADS: Final[int] = 1
_concurrency_limiter = ConcurrencyLimiter(SS_DEFAULT_MAX_THREADS)
def _update_thread_allowance(response: dict) -> None:
"""Raise (or lower) the concurrency cap to the account's advertised threads.
ScreenScraper reports the per-account thread allowance in
``response.ssuser.maxthreads`` (higher for contributors/donors). Honouring it
lets supporters scrape with their full concurrency instead of the
conservative default.
"""
try:
max_threads = int(response["response"]["ssuser"]["maxthreads"])
except (AttributeError, KeyError, TypeError, ValueError):
return
if max_threads < 1 or max_threads == _concurrency_limiter.max_concurrency:
return
log.info("ScreenScraper: setting thread allowance to %d", max_threads)
_concurrency_limiter.set_max_concurrency(max_threads)
async def auth_middleware(
@@ -62,22 +87,24 @@ class ScreenScraperService:
request_timeout,
)
try:
await _rate_limiter.acquire()
res = await aiohttp_session.get(
url,
headers={"user-agent": f"RomM/{get_version()}"},
middlewares=(auth_middleware,),
timeout=ClientTimeout(total=request_timeout),
)
res.raise_for_status()
res_text = await res.text()
if LOGIN_ERROR_CHECK in res_text:
log.error("Invalid ScreenScraper credentials")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid ScreenScraper credentials",
async with _concurrency_limiter:
res = await aiohttp_session.get(
url,
headers={"user-agent": f"RomM/{get_version()}"},
middlewares=(auth_middleware,),
timeout=ClientTimeout(total=request_timeout),
)
return await res.json()
res.raise_for_status()
res_text = await res.text()
if LOGIN_ERROR_CHECK in res_text:
log.error("Invalid ScreenScraper credentials")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid ScreenScraper credentials",
)
data = await res.json()
_update_thread_allowance(data)
return data
except aiohttp.ServerTimeoutError:
# Retry the request once if it times out
pass
@@ -131,22 +158,24 @@ class ScreenScraperService:
url,
request_timeout,
)
await _rate_limiter.acquire()
res = await aiohttp_session.get(
url,
headers={"user-agent": f"RomM/{get_version()}"},
middlewares=(auth_middleware,),
timeout=ClientTimeout(total=request_timeout),
)
res.raise_for_status()
res_text = await res.text()
if LOGIN_ERROR_CHECK in res_text:
log.error("Invalid ScreenScraper credentials")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid ScreenScraper credentials",
async with _concurrency_limiter:
res = await aiohttp_session.get(
url,
headers={"user-agent": f"RomM/{get_version()}"},
middlewares=(auth_middleware,),
timeout=ClientTimeout(total=request_timeout),
)
return await res.json()
res.raise_for_status()
res_text = await res.text()
if LOGIN_ERROR_CHECK in res_text:
log.error("Invalid ScreenScraper credentials")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid ScreenScraper credentials",
)
data = await res.json()
_update_thread_allowance(data)
return data
except (aiohttp.ClientResponseError, aiohttp.ServerTimeoutError) as err:
if isinstance(err, aiohttp.ClientResponseError):
if err.status == http.HTTPStatus.UNAUTHORIZED:

View File

@@ -1,4 +1,16 @@
from typing import Literal, TypedDict
from typing import Literal, NotRequired, TypedDict
# Per-account limits returned in `response.ssuser`. Contributors/donors get a
# higher `maxthreads` allowance. All values arrive as strings.
# Reference: https://api.screenscraper.fr/webapi2.php
class SSUser(TypedDict):
id: NotRequired[str]
niveau: NotRequired[str]
maxthreads: NotRequired[str]
maxrequestspermin: NotRequired[str]
maxrequestsperday: NotRequired[str]
requeststoday: NotRequired[str]
class SSText(TypedDict):

View File

@@ -5,24 +5,34 @@ import aiohttp
import pytest
import pytest_asyncio
from adapters.services.screenscraper import SS_DEFAULT_MAX_THREADS
from utils.rate_limiter import ConcurrencyLimiter
@pytest.fixture(autouse=True)
def _disable_metadata_rate_limiters(monkeypatch):
"""Neutralize the pre-emptive rate limiters during tests.
Each metadata service spaces its requests via a module-level ``RateLimiter``
whose ``acquire`` sleeps to stay under the provider's req/s cap. Letting it
run would add real delays and inject extra ``asyncio.sleep`` calls that
interfere with retry assertions, so we replace ``acquire`` with a no-op.
Each req/s-limited service spaces its requests via a module-level
``RateLimiter`` whose ``acquire`` sleeps to stay under the provider's cap.
Letting it run would add real delays and inject extra ``asyncio.sleep`` calls
that interfere with retry assertions, so we replace ``acquire`` with a no-op.
"""
for module in (
"adapters.services.igdb",
"adapters.services.mobygames",
"adapters.services.retroachievements",
"adapters.services.screenscraper",
):
monkeypatch.setattr(f"{module}._rate_limiter.acquire", AsyncMock())
# ScreenScraper uses a concurrency limiter whose capacity mutates at runtime
# from `ssuser.maxthreads`. Swap in a fresh instance so each test starts from
# the default allowance with a clean (per-loop) waiter queue.
monkeypatch.setattr(
"adapters.services.screenscraper._concurrency_limiter",
ConcurrencyLimiter(SS_DEFAULT_MAX_THREADS),
)
@pytest_asyncio.fixture
async def mock_ctx_aiohttp_session():

View File

@@ -149,12 +149,14 @@ class TestScreenScraperServiceUnit:
mock_response.json.assert_called_once()
@pytest.mark.asyncio
async def test_request_acquires_rate_limiter(self, service, monkeypatch):
"""Test that the request reserves a rate-limiter slot before sending."""
async def test_request_holds_concurrency_slot(self, service, monkeypatch):
"""Test that the request acquires and releases a concurrency slot."""
import adapters.services.screenscraper as ss_module
acquire_mock = AsyncMock()
monkeypatch.setattr(
"adapters.services.screenscraper._rate_limiter.acquire", acquire_mock
)
release_mock = MagicMock()
monkeypatch.setattr(ss_module._concurrency_limiter, "acquire", acquire_mock)
monkeypatch.setattr(ss_module._concurrency_limiter, "release", release_mock)
mock_session = AsyncMock()
mock_response = MagicMock()
@@ -170,6 +172,55 @@ class TestScreenScraperServiceUnit:
await service._request("https://api.screenscraper.fr/api2/jeuInfos.php")
acquire_mock.assert_awaited_once()
release_mock.assert_called_once()
@pytest.mark.asyncio
async def test_request_updates_thread_allowance_from_ssuser(
self, service, monkeypatch
):
"""Test that `ssuser.maxthreads` raises the concurrency cap (donor perk)."""
import adapters.services.screenscraper as ss_module
assert ss_module._concurrency_limiter.max_concurrency == 1
mock_session = AsyncMock()
mock_response = MagicMock()
mock_response.json = AsyncMock(
return_value={"response": {"ssuser": {"maxthreads": "5"}}}
)
mock_response.text = AsyncMock(return_value="{}")
mock_response.raise_for_status.return_value = None
mock_session.get.return_value = mock_response
mock_context = MagicMock()
mock_context.get.return_value = mock_session
with patch("adapters.services.screenscraper.ctx_aiohttp_session", mock_context):
await service._request("https://api.screenscraper.fr/api2/jeuInfos.php")
assert ss_module._concurrency_limiter.max_concurrency == 5
@pytest.mark.asyncio
async def test_request_ignores_invalid_maxthreads(self, service, monkeypatch):
"""Test that a missing or unparsable `maxthreads` leaves the cap untouched."""
import adapters.services.screenscraper as ss_module
mock_session = AsyncMock()
mock_response = MagicMock()
mock_response.json = AsyncMock(
return_value={"response": {"ssuser": {"maxthreads": "not-a-number"}}}
)
mock_response.text = AsyncMock(return_value="{}")
mock_response.raise_for_status.return_value = None
mock_session.get.return_value = mock_response
mock_context = MagicMock()
mock_context.get.return_value = mock_session
with patch("adapters.services.screenscraper.ctx_aiohttp_session", mock_context):
await service._request("https://api.screenscraper.fr/api2/jeuInfos.php")
assert ss_module._concurrency_limiter.max_concurrency == 1
@pytest.mark.asyncio
async def test_request_login_error(self, service):

View File

@@ -2,7 +2,7 @@ import asyncio
import pytest
from utils.rate_limiter import RateLimiter
from utils.rate_limiter import ConcurrencyLimiter, RateLimiter
def _record_sleeps(monkeypatch) -> list[float]:
@@ -55,3 +55,81 @@ class TestRateLimiter:
def test_non_positive_rate_raises(self, rate):
with pytest.raises(ValueError):
RateLimiter(requests_per_second=rate)
class TestConcurrencyLimiter:
@pytest.mark.parametrize("value", [0, -1])
def test_non_positive_capacity_raises(self, value):
with pytest.raises(ValueError):
ConcurrencyLimiter(max_concurrency=value)
async def test_acquire_release_tracks_in_flight(self):
limiter = ConcurrencyLimiter(max_concurrency=2)
await limiter.acquire()
await limiter.acquire()
assert limiter.in_flight == 2
limiter.release()
assert limiter.in_flight == 1
async def test_context_manager_releases(self):
limiter = ConcurrencyLimiter(max_concurrency=1)
async with limiter:
assert limiter.in_flight == 1
assert limiter.in_flight == 0
async def test_context_manager_releases_on_error(self):
limiter = ConcurrencyLimiter(max_concurrency=1)
with pytest.raises(RuntimeError):
async with limiter:
raise RuntimeError("boom")
assert limiter.in_flight == 0
async def test_acquire_blocks_until_slot_freed(self):
limiter = ConcurrencyLimiter(max_concurrency=1)
await limiter.acquire()
waiter = asyncio.ensure_future(limiter.acquire())
await asyncio.sleep(0) # Let the waiter run and block.
assert not waiter.done()
limiter.release()
await asyncio.wait_for(waiter, timeout=1)
assert limiter.in_flight == 1
async def test_set_max_concurrency_wakes_waiters(self):
limiter = ConcurrencyLimiter(max_concurrency=1)
await limiter.acquire()
waiter = asyncio.ensure_future(limiter.acquire())
await asyncio.sleep(0)
assert not waiter.done()
# Opening a second slot should release the blocked acquirer.
limiter.set_max_concurrency(2)
await asyncio.wait_for(waiter, timeout=1)
assert limiter.in_flight == 2
async def test_lowering_capacity_blocks_new_acquirers(self):
limiter = ConcurrencyLimiter(max_concurrency=2)
await limiter.acquire()
limiter.set_max_concurrency(1)
waiter = asyncio.ensure_future(limiter.acquire())
await asyncio.sleep(0)
# Already at the new cap, so the next acquirer must wait.
assert not waiter.done()
limiter.release()
await asyncio.wait_for(waiter, timeout=1)
assert limiter.in_flight == 1
@pytest.mark.parametrize("value", [0, -1])
def test_set_non_positive_capacity_raises(self, value):
limiter = ConcurrencyLimiter(max_concurrency=1)
with pytest.raises(ValueError):
limiter.set_max_concurrency(value)

View File

@@ -1,4 +1,5 @@
import asyncio
from collections import deque
class RateLimiter:
@@ -26,3 +27,88 @@ class RateLimiter:
delay = slot - now
if delay > 0:
await asyncio.sleep(delay)
class ConcurrencyLimiter:
"""Caps the number of in-flight operations, with a runtime-adjustable capacity.
Unlike :class:`RateLimiter`, which spaces out *when* requests start, this
bounds how many run *simultaneously*. It suits APIs that enforce a per-account
thread/connection cap (e.g. ScreenScraper) rather than a call rate: because a
slot is held for the whole request, slow responses can never cause overlapping
requests to exceed the cap.
The capacity can be raised or lowered at runtime via
:meth:`set_max_concurrency` -- for instance once an API response reveals the
account's allowance. Use it as an async context manager so the slot is always
released, even if the wrapped request raises::
async with limiter:
await do_request()
The implementation is loop-agnostic: waiter futures are created against the
running loop on acquisition, so a single shared instance works across the
distinct event loops used by, e.g., the test suite.
"""
def __init__(self, max_concurrency: int) -> None:
if max_concurrency < 1:
raise ValueError("max_concurrency must be at least 1")
self._max_concurrency = max_concurrency
self._in_flight = 0
self._waiters: deque[asyncio.Future[None]] = deque()
@property
def max_concurrency(self) -> int:
return self._max_concurrency
@property
def in_flight(self) -> int:
return self._in_flight
def set_max_concurrency(self, max_concurrency: int) -> None:
if max_concurrency < 1:
raise ValueError("max_concurrency must be at least 1")
previous = self._max_concurrency
self._max_concurrency = max_concurrency
# Wake one waiter per newly opened slot; each re-checks capacity itself.
for _ in range(max(0, max_concurrency - previous)):
self._wake_next()
async def acquire(self) -> None:
# Re-check on every wake-up: another coroutine may have taken the slot,
# or the capacity may have been lowered while we waited.
while self._in_flight >= self._max_concurrency:
loop = asyncio.get_running_loop()
waiter = loop.create_future()
self._waiters.append(waiter)
try:
try:
await waiter
finally:
self._waiters.remove(waiter)
except asyncio.CancelledError:
# We were granted a slot but cancelled before using it; pass the
# grant on so a waiting peer is not stranded.
if not waiter.cancelled():
self._wake_next()
raise
self._in_flight += 1
def release(self) -> None:
if self._in_flight > 0:
self._in_flight -= 1
self._wake_next()
def _wake_next(self) -> None:
for waiter in self._waiters:
if not waiter.done():
waiter.set_result(None)
return
async def __aenter__(self) -> "ConcurrencyLimiter":
await self.acquire()
return self
async def __aexit__(self, *exc_info: object) -> None:
self.release()