Files
romm/backend/tests/utils/test_rate_limiter.py
Claude cd41422660 Use a concurrency limiter for ScreenScraper, honoring account threads
ScreenScraper enforces a per-account *thread* (concurrency) cap rather than
a request rate. Requests can take several seconds, so spacing out request
starts at 1/s could still leave multiple requests overlapping in flight,
exceeding the cap and getting rejected with ScreenScraper's custom HTTP codes.

- Add ConcurrencyLimiter: a runtime-resizable, loop-agnostic limiter that
  bounds simultaneous in-flight operations (held for the whole request via
  async context manager), instead of spacing request starts.
- Switch the ScreenScraper service from the req/s RateLimiter to a module-level
  ConcurrencyLimiter defaulting to a single thread.
- Recognize contributor/donor accounts: parse `ssuser.maxthreads` from each
  response and raise the concurrency allowance to match, so supporters scrape
  with their full thread count instead of the conservative default.

Adds unit tests for the limiter (blocking, wake-on-release, runtime resize)
and for the ScreenScraper slot-holding and thread-allowance updates.

https://claude.ai/code/session_01133QQuWvq8Zm25DZMP9PVr
2026-06-06 16:02:13 +00:00

136 lines
4.3 KiB
Python

import asyncio
import pytest
from utils.rate_limiter import ConcurrencyLimiter, RateLimiter
def _record_sleeps(monkeypatch) -> list[float]:
"""Replace asyncio.sleep with a no-op that records requested delays."""
sleeps: list[float] = []
async def fake_sleep(delay: float) -> None:
sleeps.append(delay)
monkeypatch.setattr(asyncio, "sleep", fake_sleep)
return sleeps
class TestRateLimiter:
async def test_first_acquire_does_not_sleep(self, monkeypatch):
sleeps = _record_sleeps(monkeypatch)
limiter = RateLimiter(requests_per_second=4)
await limiter.acquire()
assert sleeps == []
async def test_slots_advance_by_interval(self, monkeypatch):
sleeps = _record_sleeps(monkeypatch)
limiter = RateLimiter(requests_per_second=4)
for _ in range(5):
await limiter.acquire()
# First grant is immediate; the next four are spaced one interval apart.
assert len(sleeps) == 4
for index, delay in enumerate(sleeps, start=1):
assert delay == pytest.approx(index * 0.25, abs=0.05)
async def test_concurrent_callers_are_spaced(self):
rate = 50
interval = 1 / rate
count = 5
limiter = RateLimiter(requests_per_second=rate)
loop = asyncio.get_running_loop()
start = loop.time()
await asyncio.gather(*(limiter.acquire() for _ in range(count)))
elapsed = loop.time() - start
assert elapsed >= (count - 1) * interval
assert elapsed < (count - 1) * interval + 0.5
@pytest.mark.parametrize("rate", [0, -1, -0.5])
def test_non_positive_rate_raises(self, rate):
with pytest.raises(ValueError):
RateLimiter(requests_per_second=rate)
class TestConcurrencyLimiter:
@pytest.mark.parametrize("value", [0, -1])
def test_non_positive_capacity_raises(self, value):
with pytest.raises(ValueError):
ConcurrencyLimiter(max_concurrency=value)
async def test_acquire_release_tracks_in_flight(self):
limiter = ConcurrencyLimiter(max_concurrency=2)
await limiter.acquire()
await limiter.acquire()
assert limiter.in_flight == 2
limiter.release()
assert limiter.in_flight == 1
async def test_context_manager_releases(self):
limiter = ConcurrencyLimiter(max_concurrency=1)
async with limiter:
assert limiter.in_flight == 1
assert limiter.in_flight == 0
async def test_context_manager_releases_on_error(self):
limiter = ConcurrencyLimiter(max_concurrency=1)
with pytest.raises(RuntimeError):
async with limiter:
raise RuntimeError("boom")
assert limiter.in_flight == 0
async def test_acquire_blocks_until_slot_freed(self):
limiter = ConcurrencyLimiter(max_concurrency=1)
await limiter.acquire()
waiter = asyncio.ensure_future(limiter.acquire())
await asyncio.sleep(0) # Let the waiter run and block.
assert not waiter.done()
limiter.release()
await asyncio.wait_for(waiter, timeout=1)
assert limiter.in_flight == 1
async def test_set_max_concurrency_wakes_waiters(self):
limiter = ConcurrencyLimiter(max_concurrency=1)
await limiter.acquire()
waiter = asyncio.ensure_future(limiter.acquire())
await asyncio.sleep(0)
assert not waiter.done()
# Opening a second slot should release the blocked acquirer.
limiter.set_max_concurrency(2)
await asyncio.wait_for(waiter, timeout=1)
assert limiter.in_flight == 2
async def test_lowering_capacity_blocks_new_acquirers(self):
limiter = ConcurrencyLimiter(max_concurrency=2)
await limiter.acquire()
limiter.set_max_concurrency(1)
waiter = asyncio.ensure_future(limiter.acquire())
await asyncio.sleep(0)
# Already at the new cap, so the next acquirer must wait.
assert not waiter.done()
limiter.release()
await asyncio.wait_for(waiter, timeout=1)
assert limiter.in_flight == 1
@pytest.mark.parametrize("value", [0, -1])
def test_set_non_positive_capacity_raises(self, value):
limiter = ConcurrencyLimiter(max_concurrency=1)
with pytest.raises(ValueError):
limiter.set_max_concurrency(value)