refactor(bgg): align provider with existing patterns

- Use services.api_request with new response_format="xml" option
- Rename metadata() to boardgame() for consistency
- Add helper functions for parsing XML metadata
- Add BGG rate limiter (2 req/s) in services.py
- Remove 202 handling (not used by search/thing endpoints)
- Fix boardgame detail page not showing most data #1133
- Fix boardgame metadata refresh not working #1134
- Use COLORS constant for lime theme in config
- Add lime CSS color for calendar page
- Fix date_key: "publish_date" → "year"
This commit is contained in:
FuzzyGrim
2026-01-28 21:07:45 +01:00
parent 072bd41922
commit 2fe051d92a
4 changed files with 262 additions and 251 deletions

View File

@@ -14,6 +14,7 @@ COLORS = {
"yellow": {"text": "text-yellow-400", "hex": "#eab308"},
"fuchsia": {"text": "text-fuchsia-400", "hex": "#d946ef"},
"cyan": {"text": "text-cyan-400", "hex": "#06b6d4"},
"lime": {"text": "text-lime-400", "hex": "#84cc16"},
}
# --- Central Configuration Dictionary ---
@@ -161,8 +162,8 @@ MEDIA_TYPE_CONFIG = {
"sample_query": "Catan",
"unicode_icon": "🎲",
"verb": ("play", "played"),
"text_color": "text-lime-400",
"stats_color": "#84cc16",
"text_color": COLORS["lime"]["text"],
"stats_color": COLORS["lime"]["hex"],
"svg_icon": """
<rect width="18" height="18" x="3" y="3" rx="2" ry="2"/>
<circle cx="8" cy="8" r="2"/>
@@ -170,7 +171,7 @@ MEDIA_TYPE_CONFIG = {
<circle cx="16" cy="16" r="2"/>
<path d="M8 16v-2"/>""",
"unit": ("#", "Play"),
"date_key": "publish_date",
"date_key": "year",
},
}

View File

@@ -1,25 +1,12 @@
"""BoardGameGeek (BGG) API provider for board game metadata.
IMPORTANT: As of November 2024, BGG requires API registration and authorization tokens.
To use this provider:
1. Register for API access at: https://boardgamegeek.com/using_the_xml_api
2. Obtain your Bearer token from BGG
3. Set BGG_API_TOKEN environment variable with your token
API Documentation: https://boardgamegeek.com/wiki/page/BGG_XML_API2
Registration & Authorization: https://boardgamegeek.com/using_the_xml_api
API Terms: https://boardgamegeek.com/wiki/page/XML_API_Terms_of_Use
Rate Limiting: BGG recommends waiting 5 seconds between requests
to avoid 500/503 errors.
"""
import logging
import time
import requests
from defusedxml import ElementTree
from django.conf import settings
from django.core.cache import cache
@@ -29,107 +16,114 @@ from app.providers import services
logger = logging.getLogger(__name__)
base_url = "https://boardgamegeek.com/xmlapi2"
# BGG's /thing endpoint has a max of 20 IDs per request
RESULTS_PER_PAGE = 20
# Rate limiting: Disabled - BGG handles rate limiting on their end
# If you encounter 429/500/503 errors, enable by setting MIN_REQUEST_INTERVAL > 0
MIN_REQUEST_INTERVAL = 0 # seconds
_rate_limit_state = {"last_request_time": 0}
# HTTP status codes
HTTP_UNAUTHORIZED = 401
HTTP_ACCEPTED = 202
def _rate_limit():
"""Ensure minimum time between BGG API requests."""
current_time = time.time()
time_since_last = current_time - _rate_limit_state["last_request_time"]
if time_since_last < MIN_REQUEST_INTERVAL:
sleep_time = MIN_REQUEST_INTERVAL - time_since_last
time.sleep(sleep_time)
_rate_limit_state["last_request_time"] = time.time()
def _bgg_request(endpoint, params=None):
"""Make a rate-limited request to BGG API.
Args:
endpoint: API endpoint (e.g., 'search', 'thing')
params: Query parameters dict
Returns:
ElementTree root element
Raises:
ProviderAPIError: On API errors
"""
_rate_limit()
url = f"{base_url}/{endpoint}"
headers = {
"User-Agent": "Yamtrack/1.0 (https://github.com/FuzzyGrim/Yamtrack)",
}
bgg_token = getattr(settings, "BGG_API_TOKEN", None)
if bgg_token:
headers["Authorization"] = f"Bearer {bgg_token}"
try:
response = requests.get(url, params=params, headers=headers, timeout=10)
if response.status_code == HTTP_UNAUTHORIZED:
logger.error(
"BGG API requires authorization. Register at "
"https://boardgamegeek.com/using_the_xml_api "
"and set BGG_API_TOKEN in your environment.",
)
response.raise_for_status()
if response.status_code == HTTP_ACCEPTED:
logger.info("BGG queued request, retrying...")
time.sleep(2)
return _bgg_request(endpoint, params)
except requests.exceptions.HTTPError as error:
raise services.ProviderAPIError(Sources.BGG.value, error) from error
except ElementTree.ParseError as error:
logger.exception("Failed to parse BGG XML response")
def handle_error(error):
"""Handle BGG API errors."""
if error.response.status_code == requests.codes.unauthorized:
raise services.ProviderAPIError(
Sources.BGG.value,
error,
"Invalid XML response from BGG",
) from error
else:
return ElementTree.fromstring(response.text)
"BGG API requires authorization",
)
raise services.ProviderAPIError(Sources.BGG.value, error)
def _fetch_thumbnails(page_ids):
"""Fetch thumbnail images for a list of game IDs.
def search(query, page):
"""Search for board games on BoardGameGeek."""
cache_key = (
f"search_{Sources.BGG.value}_{MediaTypes.BOARDGAME.value}_{query}_{page}"
)
data = cache.get(cache_key)
Args:
page_ids: List of BGG game IDs
if data is None:
# Cache search results separately so page changes don't re-query BGG
search_results_cache_key = (
f"search_results_{Sources.BGG.value}_{MediaTypes.BOARDGAME.value}_{query}"
)
all_results = cache.get(search_results_cache_key)
Returns:
Dict mapping game_id to image URL
"""
if not page_ids:
if all_results is None:
try:
root = services.api_request(
Sources.BGG.value,
"GET",
f"{base_url}/search",
params={"query": query, "type": "boardgame"},
headers={"Authorization": f"Bearer {settings.BGG_API_TOKEN}"},
response_format="xml",
)
except requests.exceptions.HTTPError as error:
handle_error(error)
# Parse all results (BGG returns all at once, no server-side pagination)
all_results = []
for item in root.findall(".//item"):
game_id = item.get("id")
name_elem = item.find("name")
if name_elem is not None and game_id:
all_results.append(
{
"id": game_id,
"name": name_elem.get("value", "Unknown"),
}
)
cache.set(search_results_cache_key, all_results)
# Client-side pagination
total_results = len(all_results)
start_idx = (page - 1) * RESULTS_PER_PAGE
end_idx = start_idx + RESULTS_PER_PAGE
page_results = all_results[start_idx:end_idx]
# Fetch thumbnails for this page
thumbnails = _fetch_thumbnails([r["id"] for r in page_results])
results = [
{
"media_id": r["id"],
"source": Sources.BGG.value,
"media_type": MediaTypes.BOARDGAME.value,
"title": r["name"],
"image": thumbnails.get(r["id"], settings.IMG_NONE),
}
for r in page_results
]
data = helpers.format_search_response(
page,
RESULTS_PER_PAGE,
total_results,
results,
)
cache.set(cache_key, data)
return data
def _fetch_thumbnails(game_ids):
"""Fetch thumbnail images for a list of game IDs."""
if not game_ids:
return {}
try:
thing_params = {
"id": ",".join(page_ids),
# Don't filter by type - allows BGG to return expansions/accessories too
}
thing_root = _bgg_request("thing", thing_params)
root = services.api_request(
Sources.BGG.value,
"GET",
f"{base_url}/thing",
params={"id": ",".join(game_ids)},
headers={"Authorization": f"Bearer {settings.BGG_API_TOKEN}"},
response_format="xml",
)
thumbnails = {}
for item in thing_root.findall(".//item"):
for item in root.findall(".//item"):
game_id = item.get("id")
# Try thumbnail first, fall back to full image if no thumbnail
thumbnail_elem = item.find("thumbnail")
if thumbnail_elem is not None and thumbnail_elem.text:
thumbnails[game_id] = thumbnail_elem.text
@@ -137,178 +131,165 @@ def _fetch_thumbnails(page_ids):
image_elem = item.find("image")
if image_elem is not None and image_elem.text:
thumbnails[game_id] = image_elem.text
except services.ProviderAPIError:
logger.warning("Failed to fetch thumbnails from BGG")
except (requests.exceptions.HTTPError, services.ProviderAPIError):
logger.exception("Failed to fetch thumbnails from BGG")
return {}
else:
return thumbnails
def _build_search_results(page_ids, game_names, thumbnails):
"""Build search result list from game data.
def boardgame(media_id):
"""Return the metadata for the selected board game from BGG."""
cache_key = f"{Sources.BGG.value}_{MediaTypes.BOARDGAME.value}_{media_id}"
data = cache.get(cache_key)
Args:
page_ids: List of game IDs for current page
game_names: Dict mapping game_id to name
thumbnails: Dict mapping game_id to image URL
if data is None:
try:
root = services.api_request(
Sources.BGG.value,
"GET",
f"{base_url}/thing",
params={"id": media_id, "stats": "1"},
headers={"Authorization": f"Bearer {settings.BGG_API_TOKEN}"},
response_format="xml",
)
except requests.exceptions.HTTPError as error:
handle_error(error)
Returns:
List of search result dicts
"""
return [
{
"media_id": game_id,
item = root.find(".//item")
if item is None:
services.raise_not_found_error(Sources.BGG.value, media_id, "boardgame")
data = {
"media_id": media_id,
"source": Sources.BGG.value,
"source_url": f"https://boardgamegeek.com/boardgame/{media_id}",
"media_type": MediaTypes.BOARDGAME.value,
"title": game_names[game_id],
"image": thumbnails.get(game_id, settings.IMG_NONE),
"title": get_title(item),
"max_progress": None,
"image": get_image(item),
"synopsis": get_description(item),
"genres": get_categories(item),
"score": get_score(item),
"score_count": get_score_count(item),
"details": {
"year": get_year(item),
"players": get_players(item),
"playtime": get_playtime(item),
"min_age": get_min_age(item),
"designers": get_designers(item),
"publishers": get_publishers(item),
},
}
for game_id in page_ids
]
def search(query, page=1):
"""Search for board games on BoardGameGeek.
Args:
query: Search term
page: Page number for client-side pagination
Returns:
Formatted search response with results
"""
# Cache all game IDs separately from page-specific results
ids_cache_key = f"bgg_search_ids_{query.lower()}"
game_data = cache.get(ids_cache_key)
if not game_data:
# First search or cache expired - fetch from BGG
params = {
"query": query,
"type": "boardgame",
}
root = _bgg_request("search", params)
game_ids = []
game_names = {}
for item in root.findall(".//item"):
game_id = item.get("id")
name_elem = item.find("name")
if name_elem is not None and game_id:
game_ids.append(game_id)
game_names[game_id] = name_elem.get("value", "Unknown")
game_data = {"ids": game_ids, "names": game_names}
cache.set(ids_cache_key, game_data, 60 * 60 * 24)
game_ids = game_data["ids"]
game_names = game_data["names"]
page_cache_key = f"bgg_search_page_{query.lower()}_p{page}"
cached_page = cache.get(page_cache_key)
if cached_page:
return cached_page
total_results = len(game_ids)
start_idx = (page - 1) * RESULTS_PER_PAGE
end_idx = start_idx + RESULTS_PER_PAGE
page_ids = game_ids[start_idx:end_idx]
# Fetch thumbnails only for current page
thumbnails = _fetch_thumbnails(page_ids)
results = _build_search_results(page_ids, game_names, thumbnails)
data = helpers.format_search_response(
page=page,
per_page=RESULTS_PER_PAGE,
total_results=total_results,
results=results,
)
cache.set(page_cache_key, data, 60 * 60 * 24)
cache.set(cache_key, data)
return data
def metadata(media_id):
"""Get detailed metadata for a board game.
Args:
media_id: BGG thing ID
Returns:
Dict with game details
"""
cache_key = f"bgg_metadata_{media_id}"
cached = cache.get(cache_key)
if cached:
return cached
params = {
"id": media_id,
"stats": "1",
}
root = _bgg_request("thing", params)
item = root.find(".//item")
if item is None:
raise services.ProviderAPIError(
Sources.BGG.value,
None,
f"Game not found: {media_id}",
)
# Extract primary name
def get_title(item):
"""Return the primary name of the game."""
name_elem = item.find(".//name[@type='primary']")
title = name_elem.get("value", "Unknown") if name_elem is not None else "Unknown"
return name_elem.get("value", "Unknown") if name_elem is not None else "Unknown"
# Extract image
def get_image(item):
"""Return the image URL."""
image_elem = item.find("image")
image = image_elem.text if image_elem is not None else settings.IMG_NONE
if image_elem is not None and image_elem.text:
return image_elem.text
return settings.IMG_NONE
# Extract description
def get_description(item):
"""Return the description."""
desc_elem = item.find("description")
description = desc_elem.text if desc_elem is not None else ""
if desc_elem is not None and desc_elem.text:
return desc_elem.text
return "No synopsis available"
# Extract year
def get_year(item):
"""Return the year published."""
year_elem = item.find("yearpublished")
year = year_elem.get("value", "") if year_elem is not None else ""
return year_elem.get("value") if year_elem is not None else None
# Extract player counts
def get_players(item):
"""Return the player count range."""
minplayers_elem = item.find("minplayers")
maxplayers_elem = item.find("maxplayers")
minplayers = minplayers_elem.get("value", "") if minplayers_elem is not None else ""
maxplayers = maxplayers_elem.get("value", "") if maxplayers_elem is not None else ""
minplayers = minplayers_elem.get("value") if minplayers_elem is not None else None
maxplayers = maxplayers_elem.get("value") if maxplayers_elem is not None else None
# Extract playtime
if minplayers and maxplayers:
if minplayers == maxplayers:
return f"{minplayers} players"
return f"{minplayers}-{maxplayers} players"
return None
def get_playtime(item):
"""Return the playing time."""
playtime_elem = item.find("playingtime")
playtime = playtime_elem.get("value", "") if playtime_elem is not None else ""
playtime = playtime_elem.get("value") if playtime_elem is not None else None
return f"{playtime} min" if playtime else None
# Extract minimum age
def get_min_age(item):
"""Return the minimum age."""
minage_elem = item.find("minage")
minage = minage_elem.get("value", "") if minage_elem is not None else ""
minage = minage_elem.get("value") if minage_elem is not None else None
return f"{minage}+" if minage else None
# Extract BGG rating
def get_score(item):
"""Return the average rating."""
avg_rating_elem = item.find(".//statistics/ratings/average")
avg_rating = avg_rating_elem.get("value", "") if avg_rating_elem is not None else ""
if avg_rating_elem is not None:
try:
return round(float(avg_rating_elem.get("value", 0)), 1)
except ValueError:
return None
return None
result = {
"media_id": media_id,
"source": Sources.BGG.value,
"media_type": MediaTypes.BOARDGAME.value,
"title": title,
"image": image,
"description": description,
"year": year,
"players": f"{minplayers}-{maxplayers}" if minplayers and maxplayers else "",
"playtime": f"{playtime} min" if playtime else "",
"age": f"{minage}+" if minage else "",
"bgg_rating": avg_rating,
# Board games don't have max progress - tracks plays instead
"max_progress": None,
"related": {},
}
cache.set(cache_key, result, 60 * 60 * 24 * 7)
return result
def get_score_count(item):
"""Return the number of ratings."""
usersrated_elem = item.find(".//statistics/ratings/usersrated")
if usersrated_elem is not None:
try:
return int(usersrated_elem.get("value", 0))
except ValueError:
return None
return None
def get_categories(item):
"""Return the list of categories."""
categories = [
link.get("value")
for link in item.findall(".//link[@type='boardgamecategory']")
if link.get("value")
]
return categories if categories else None
def get_designers(item):
"""Return the list of designers."""
designers = [
link.get("value")
for link in item.findall(".//link[@type='boardgamedesigner']")
if link.get("value")
]
return ", ".join(designers) if designers else None
def get_publishers(item):
"""Return the first few publishers."""
publishers = [
link.get("value")
for link in item.findall(".//link[@type='boardgamepublisher']")[:3]
if link.get("value")
]
return ", ".join(publishers) if publishers else None

View File

@@ -2,6 +2,7 @@ import logging
import time
import requests
from defusedxml import ElementTree
from django.conf import settings
from pyrate_limiter import RedisBucket
from redis import ConnectionPool
@@ -75,6 +76,10 @@ session.mount(
"https://api.hardcover.app/v1/graphql",
LimiterAdapter(per_minute=50),
)
session.mount(
"https://boardgamegeek.com/xmlapi2",
LimiterAdapter(per_second=2),
)
class ProviderAPIError(Exception):
@@ -127,8 +132,29 @@ def raise_not_found_error(provider, media_id, media_type="item"):
raise ProviderAPIError(provider, mock_error, error_msg)
def api_request(provider, method, url, params=None, data=None, headers=None):
"""Make a request to the API and return the response as a dictionary."""
def api_request(
provider,
method,
url,
params=None,
data=None,
headers=None,
response_format="json",
):
"""Make a request to the API and return the response.
Args:
provider: Provider identifier for error messages
method: HTTP method ("GET" or "POST")
url: Request URL
params: Query params for GET, JSON body for POST
data: Raw data for POST
headers: Request headers
response_format: "json" (default) or "xml" for XML parsing
Returns:
Parsed JSON dict or ElementTree for XML
"""
try:
request_kwargs = {
"url": url,
@@ -146,6 +172,9 @@ def api_request(provider, method, url, params=None, data=None, headers=None):
response = request_func(**request_kwargs)
response.raise_for_status()
if response_format == "xml":
return ElementTree.fromstring(response.text)
return response.json()
except requests.exceptions.HTTPError as error:
@@ -165,6 +194,7 @@ def api_request(provider, method, url, params=None, data=None, headers=None):
params=params,
data=data,
headers=headers,
response_format=response_format,
)
raise error from None
@@ -208,7 +238,7 @@ def get_media_metadata(
if source == Sources.HARDCOVER.value
else openlibrary.book(media_id),
MediaTypes.COMIC.value: lambda: comicvine.comic(media_id),
MediaTypes.BOARDGAME.value: lambda: bgg.metadata(media_id),
MediaTypes.BOARDGAME.value: lambda: bgg.boardgame(media_id),
}
return metadata_retrievers[media_type]()

View File

@@ -24,6 +24,7 @@
--color-amber-700: oklch(55.5% 0.163 48.998);
--color-yellow-400: oklch(85.2% 0.199 91.936);
--color-yellow-600: oklch(68.1% 0.162 75.834);
--color-lime-400: oklch(84.1% 0.238 128.85);
--color-green-400: oklch(79.2% 0.209 151.711);
--color-emerald-300: oklch(84.5% 0.143 164.978);
--color-emerald-400: oklch(76.5% 0.177 163.223);
@@ -1632,6 +1633,9 @@
.text-indigo-600 {
color: var(--color-indigo-600);
}
.text-lime-400 {
color: var(--color-lime-400);
}
.text-orange-400 {
color: var(--color-orange-400);
}
@@ -2282,11 +2286,6 @@
--tw-ring-color: var(--color-indigo-200);
}
}
.focus\:ring-indigo-300 {
&:focus {
--tw-ring-color: var(--color-indigo-300);
}
}
.focus\:ring-indigo-400 {
&:focus {
--tw-ring-color: var(--color-indigo-400);