complete rest of handler

This commit is contained in:
Georges-Antoine Assi
2025-09-20 19:38:26 -04:00
parent 0176ce3997
commit 7a5185fe22
9 changed files with 58 additions and 62 deletions

View File

@@ -111,6 +111,15 @@ class RetroAchievementsService:
log.error("Error decoding JSON response from ScreenScraper: %s", exc)
return {}
async def get_achievement_of_the_week(self) -> dict:
"""Retrieve the achievement of the week.
Reference: https://api-docs.retroachievements.org/v1/get-achievement-of-the-week.html
"""
url = self.url.joinpath("API_GetAchievementOfTheWeek.php")
response = await self._request(str(url))
return response
async def get_game_extended_details(self, game_id: int) -> RAGameExtendedDetails:
"""Retrieve extended metadata about a game, targeted via its unique ID.

View File

@@ -186,7 +186,7 @@ class SteamGridDBService:
Reference: https://www.steamgriddb.com/api/v2#tag/GAMES/operation/getGameById
"""
url = self.url.joinpath("games", str(game_id))
url = self.url.joinpath("games/id", str(game_id))
response = await self._request(str(url))
if not response or "data" not in response:
return None

View File

@@ -152,7 +152,6 @@ class FlashpointHandler(MetadataHandler):
if not self.is_enabled():
return False
# make a request to the Flashpoint API to check if the API is working
try:
response = await self._request(self.platforms_url, {})
except Exception as e:

View File

@@ -113,6 +113,7 @@ class HasheousHandler(MetadataHandler):
if DEV_MODE
else "https://hasheous.org/api/v1"
)
self.healthcheck_endpoint = f"{self.BASE_URL}/HealthCheck"
self.platform_endpoint = f"{self.BASE_URL}/Lookup/Platforms"
self.games_endpoint = f"{self.BASE_URL}/Lookup/ByHash"
self.proxy_igdb_game_endpoint = f"{self.BASE_URL}/MetadataProxy/IGDB/Game"
@@ -130,19 +131,18 @@ class HasheousHandler(MetadataHandler):
return HASHEOUS_API_ENABLED
async def heartbeat(self) -> bool:
return True
if not self.is_enabled():
return False
# if not self.is_enabled():
# return False
httpx_client = ctx_httpx_client.get()
try:
response = await httpx_client.get(self.healthcheck_endpoint)
response.raise_for_status()
except Exception as e:
log.error("Error checking Hasheous API: %s", e)
return False
# # make a request to the Hasheous API to check if the API is working
# try:
# response = await self._request(self.platform_endpoint, "GET")
# except Exception as e:
# log.error("Error checking Hasheous API: %s", e)
# return False
# return bool(response)
return bool(response)
async def _request(
self,
@@ -185,7 +185,6 @@ class HasheousHandler(MetadataHandler):
# Make the request
res = await httpx_client.request(method, **request_kwargs)
res.raise_for_status()
return res.json()
except httpx.HTTPStatusError as exc:

View File

@@ -171,6 +171,7 @@ class HowLongToBeatHandler(MetadataHandler):
def __init__(self) -> None:
self.base_url = "https://howlongtobeat.com"
self.user_endpoint = f"{self.base_url}/api/user"
self.search_url = f"{self.base_url}/api/seek/28b235595e8e894c"
self.min_similarity_score: Final = 0.85
@@ -179,22 +180,19 @@ class HowLongToBeatHandler(MetadataHandler):
return HLTB_API_ENABLED
async def heartbeat(self) -> bool:
if not self.is_enabled():
return False
httpx_client = ctx_httpx_client.get()
try:
response = await httpx_client.get(self.user_endpoint)
response.raise_for_status()
except Exception as e:
log.error("Error checking HLTB API: %s", e)
return False
return True
# if not self.is_enabled():
# return False
# # make a request to the HLTB API to check if the API is working
# try:
# async with ctx_httpx_client() as client:
# response = await client.get(self.search_url, params={"q": "test"})
# response.raise_for_status()
# except Exception as e:
# log.error("Error checking HLTB API: %s", e)
# return False
# return True
@staticmethod
def extract_hltb_id_from_filename(fs_name: str) -> int | None:
"""Extract HLTB ID from filename tag like (hltb-12345)."""

View File

@@ -313,7 +313,6 @@ class IGDBHandler(MetadataHandler):
if not self.is_enabled():
return False
# make a request to the IGDB API to check if the API is working
try:
roms = await self.igdb_service.list_games(
fields=["id"],

View File

@@ -48,25 +48,23 @@ class PlaymatchHandler(MetadataHandler):
def __init__(self):
self.base_url = "https://playmatch.retrorealm.dev/api"
self.identify_url = f"{self.base_url}/identify/ids"
self.healthcheck_url = f"{self.base_url}/health"
@classmethod
def is_enabled(cls) -> bool:
return PLAYMATCH_API_ENABLED
async def heartbeat(self) -> bool:
return True
if not self.is_enabled():
return False
# if not self.is_enabled():
# return False
try:
response = await self._request(self.healthcheck_url, {})
except Exception as e:
log.error("Error checking Playmatch API: %s", e)
return False
# # make a request to the Playmatch API to check if the API is working
# try:
# response = await self._request(self.identify_url, {"hashes": ["test"]})
# except Exception as e:
# log.error("Error checking Playmatch API: %s", e)
# return False
# return bool(response)
return bool(response)
async def _request(self, url: str, query: dict) -> dict:
"""

View File

@@ -133,19 +133,16 @@ class RAHandler(MetadataHandler):
return bool(RETROACHIEVEMENTS_API_KEY)
async def heartbeat(self) -> bool:
return True
if not self.is_enabled():
return False
# if not self.is_enabled():
# return False
try:
response = await self.ra_service.get_achievement_of_the_week()
except Exception as e:
log.error("Error checking RetroAchievements API: %s", e)
return False
# # make a request to the RetroAchievements API to check if the API is working
# try:
# response = await self.ra_service.get_console_ids()
# except Exception as e:
# log.error("Error checking RetroAchievements API: %s", e)
# return False
# return bool(response)
return bool(response)
@staticmethod
def extract_ra_id_from_filename(fs_name: str) -> int | None:

View File

@@ -35,19 +35,16 @@ class SGDBBaseHandler(MetadataHandler):
return bool(STEAMGRIDDB_API_KEY)
async def heartbeat(self) -> bool:
return True
if not self.is_enabled():
return False
# if not self.is_enabled():
# return False
try:
response = await self.sgdb_service.get_game_by_id(1)
except Exception as e:
log.error("Error checking SteamGridDB API: %s", e)
return False
# # make a request to the SteamGridDB API to check if the API is working
# try:
# response = await self.sgdb_service.get_platforms()
# except Exception as e:
# log.error("Error checking SteamGridDB API: %s", e)
# return False
# return bool(response)
return bool(response)
async def get_rom_by_id(self, sgdb_id: int) -> SGDBRom:
"""Get ROM details by SteamGridDB ID."""