Files
romm/backend/utils/pegasus_exporter.py
Georges-Antoine Assi b324ea5e37 fix typo
2026-06-23 22:38:43 -04:00

426 lines
16 KiB
Python

from datetime import UTC, datetime
from pathlib import Path
from fastapi import Request
from handler.database import db_platform_handler, db_rom_handler
from handler.filesystem import fs_platform_handler, fs_resource_handler
from handler.metadata.base_handler import UniversalPlatformSlug as UPS
from logger.logger import log
from models.platform import Platform
from models.rom import Rom
from utils.filesystem import link_or_copy_file
# Lookup table: RomM universal platform slug -> (Pegasus collection name,
# Pegasus shortname).
# Reference: https://www.pegasus-frontend.org/docs/user-guide/meta-files/
# Only platforms that appear in the official Pegasus "Common platform names"
# table are listed here. Several RomM slugs may share one Pegasus pair
# (e.g. NES/Famicom).
SLUG_TO_PEGASUS: dict[UPS, tuple[str, str]] = {
    # --- Atari ---
    UPS.ATARI2600: ("Atari 2600", "atari2600"),
    UPS.ATARI5200: ("Atari 5200", "atari5200"),
    UPS.ATARI7800: ("Atari 7800", "atari7800"),
    UPS.ATARI800: ("Atari 800", "atari800"),
    UPS.ATARI_JAGUAR_CD: ("Atari Jaguar CD", "atarijaguarcd"),
    UPS.JAGUAR: ("Atari Jaguar", "atarijaguar"),
    UPS.LYNX: ("Atari Lynx", "atarilynx"),
    UPS.ATARI_ST: ("Atari ST", "atarist"),
    UPS.ATARI_XEGS: ("Atari XE", "atarixe"),
    # --- Nintendo (handheld) ---
    UPS.GB: ("Game Boy", "gb"),
    UPS.GBC: ("Game Boy Color", "gbc"),
    UPS.GBA: ("Game Boy Advance", "gba"),
    UPS.NDS: ("Nintendo DS", "nds"),
    UPS.N3DS: ("Nintendo 3DS", "3ds"),
    UPS.G_AND_W: ("Nintendo Game-and-Watch", "gameandwatch"),
    UPS.VIRTUALBOY: ("Nintendo VirtualBoy", "virtualboy"),
    # --- Nintendo (home consoles) ---
    UPS.NES: ("Nintendo Entertainment System", "nes"),
    UPS.FAMICOM: ("Nintendo Entertainment System", "nes"),
    UPS.FDS: ("Famicom Disk System", "fds"),
    UPS.SNES: ("Super Nintendo Entertainment System", "snes"),
    UPS.SFAM: ("Super Nintendo Entertainment System", "snes"),
    UPS.N64: ("Nintendo 64", "n64"),
    UPS.NGC: ("Nintendo GameCube", "gc"),
    UPS.WII: ("Nintendo Wii", "wii"),
    UPS.WIIU: ("Nintendo WiiU", "wiiu"),
    UPS.SWITCH: ("Nintendo Switch", "switch"),
    # --- Sega ---
    UPS.SG1000: ("SEGA SG-1000", "sg1000"),
    UPS.SMS: ("Sega Master System", "mastersystem"),
    UPS.GENESIS: ("Sega Genesis", "genesis"),
    UPS.SEGACD: ("SEGA CD", "segacd"),
    UPS.SEGA32: ("SEGA 32X", "sega32x"),
    UPS.SEGACD32: ("SEGA CD 32X", "sega32x"),
    UPS.SATURN: ("Sega Saturn", "saturn"),
    UPS.DC: ("Sega Dreamcast", "dreamcast"),
    UPS.GAMEGEAR: ("SEGA GameGear", "gamegear"),
    # --- Sony ---
    UPS.PSX: ("PlayStation", "psx"),
    UPS.PS2: ("PlayStation 2", "ps2"),
    UPS.PS3: ("PlayStation 3", "ps3"),
    UPS.PSP: ("PlayStation Portable", "psp"),
    UPS.PSVITA: ("PlayStation Vita", "psvita"),
    # --- Microsoft ---
    UPS.XBOX: ("Xbox", "xbox"),
    UPS.XBOX360: ("Xbox 360", "xbox360"),
    # --- NEC / PC Engine ---
    UPS.TG16: ("TurboGrafx 16", "turbografx16"),
    UPS.TURBOGRAFX_CD: ("PC Engine CD", "pcengine"),
    UPS.PC_FX: ("PC-FX", "pcfx"),
    # --- SNK Neo Geo ---
    UPS.NEOGEOAES: ("Neo Geo", "neogeo"),
    UPS.NEOGEOMVS: ("Neo Geo", "neogeo"),
    UPS.NEO_GEO_CD: ("Neo Geo CD", "neogeocd"),
    UPS.NEO_GEO_POCKET: ("Neo Geo Pocket", "ngp"),
    UPS.NEO_GEO_POCKET_COLOR: ("Neo Geo Pocket Color", "ngpc"),
    # --- Commodore / Amiga ---
    UPS.AMIGA: ("Amiga", "amiga"),
    UPS.AMIGA_CD32: ("Amiga CD32", "amigacd32"),
    UPS.COMMODORE_CDTV: ("Amiga CDTV", "amigacdtv"),
    UPS.C64: ("Commodore 64", "c64"),
    # --- Amstrad / Sharp / other home computers and PC-like platforms ---
    UPS.ACPC: ("Amstrad CPC", "amstradcpc"),
    UPS.SHARP_X68000: ("Sharp X68000", "x68000"),
    UPS.MSX: ("MSX", "msx"),
    UPS.DOS: ("DOS", "dos"),
    UPS.PC_BOOTER: ("PC", "pc"),
    UPS.LINUX: ("Linux", "linux"),
    UPS.MAC: ("Macintosh", "macintosh"),
    UPS.ANDROID: ("Android", "android"),
    UPS.WIN: ("Windows", "windows"),
    # --- Arcade ---
    UPS.ARCADE: ("Arcade", "arcade"),
    # --- Other consoles / platforms ---
    UPS._3DO: ("3DO", "3do"),
    UPS.APPLEII: ("Apple II", "apple2"),
    UPS.COLECOVISION: ("ColecoVision", "colecovision"),
    UPS.INTELLIVISION: ("Intellivision", "intellivision"),
    UPS.ODYSSEY_2: ("Odyssey 2", "odyssey2"),
    UPS.VECTREX: ("Vectrex", "vectrex"),
    UPS.SUPERGRAFX: ("SuperGrafx", "supergrafx"),
    UPS.SAM_COUPE: ("SAM coupe", "samcoupe"),
    UPS.SCUMMVM: ("Scumm VM", "scummvm"),
    UPS.TIC_80: ("TIC80", "tic80"),
    UPS.DRAGON_32_SLASH_64: ("Dragon 32", "dragon32"),
    # --- NEC PC-88 / PC-98 ---
    UPS.PC_8800_SERIES: ("PC 88", "pc88"),
    UPS.PC_9800_SERIES: ("PC 98", "pc98"),
    # --- Bandai WonderSwan ---
    UPS.WONDERSWAN: ("WonderSwan", "wonderswan"),
    UPS.SWANCRYSTAL: ("WonderSwan/Color", "wonderswancolor"),
    # --- Sinclair ZX Spectrum / ZX81 ---
    UPS.ZXS: ("ZX Spectrum", "zxspectrum"),
    UPS.ZX81: ("ZX81", "zx81"),
    # --- Storefronts ---
    UPS.STEAM: ("Steam", "steam"),
}
# Pegasus asset key -> name of the subdirectory under assets/ that holds
# files of that kind.
ASSET_DIRS: dict[str, str] = {
    "box_front": "covers",
    "box_back": "backcovers",
    "box_full": "boxes",
    "screenshot": "screenshots",
    "titlescreen": "titlescreens",
    "video": "videos",
    "marquee": "marquees",
    "cartridge": "cartridges",
    "logo": "logos",
    "background": "backgrounds",
    "bezel": "bezels",
}
class PegasusExporter:
"""Export RomM collections to Pegasus Frontend metadata.pegasus.txt format"""
def __init__(self, local_export: bool = False):
self.local_export = local_export
@staticmethod
def _resolve_collection(platform: Platform) -> tuple[str, str]:
"""Return (collection_name, shortname) for a platform."""
if platform.slug in SLUG_TO_PEGASUS:
return SLUG_TO_PEGASUS[UPS(platform.slug)]
return (platform.custom_name or platform.name, platform.slug)
def _format_release_date(self, timestamp: int) -> str:
"""Format release date to YYYY-MM-DD format"""
return datetime.fromtimestamp(timestamp / 1000, tz=UTC).strftime("%Y-%m-%d")
def _format_rating(self, average_rating: float) -> str:
"""Format rating as percentage (0-100%). Input is on 0-10 scale."""
clamped_rating = max(0, min(100, average_rating))
return f"{int(clamped_rating)}%"
def _escape_multiline(self, text: str) -> str:
"""Indent continuation lines for multi-line values in Pegasus format"""
lines = text.strip().splitlines()
if len(lines) <= 1:
return text.strip()
return (
lines[0]
+ "\n"
+ "\n".join(f" {line}" if line.strip() else " ." for line in lines[1:])
)
def _collect_assets(self, rom: Rom) -> dict[str, Path]:
"""Collect available media assets for a ROM using model properties.
Returns a dict mapping Pegasus asset key to the absolute source file path.
"""
assets: dict[str, Path] = {}
if rom.path_cover_l:
assets["box_front"] = fs_resource_handler.validate_path(rom.path_cover_l)
if rom.path_screenshots:
assets["screenshot"] = fs_resource_handler.validate_path(
rom.path_screenshots[0]
)
if rom.path_video:
assets["video"] = fs_resource_handler.validate_path(rom.path_video)
# Extended media from screenscraper / gamelist metadata
ss = rom.ss_metadata or {}
gl = rom.gamelist_metadata or {}
extended: dict[str, list[str]] = {
"box_full": [ss.get("box3d_path", ""), gl.get("box3d_path", "")],
"box_back": [ss.get("box2d_back_path", ""), gl.get("box2d_back_path", "")],
"logo": [ss.get("logo_path", "")],
"marquee": [gl.get("marquee_path", "")],
"cartridge": [ss.get("physical_path", ""), gl.get("physical_path", "")],
"background": [ss.get("fanart_path", ""), gl.get("fanart_path", "")],
"titlescreen": [
ss.get("title_screen_path", ""),
gl.get("title_screen_path", ""),
],
"bezel": [ss.get("bezel_path", "")],
}
for pegasus_key, candidates in extended.items():
if pegasus_key in assets:
continue
for candidate in candidates:
if candidate:
assets[pegasus_key] = fs_resource_handler.validate_path(candidate)
break
return assets
def _create_game_entry(
self,
rom: Rom,
request: Request | None,
exported_assets: dict[str, str] | None = None,
) -> str:
"""Create a game entry for a ROM in Pegasus metadata format"""
lines: list[str] = []
# Game title (required)
lines.append(f"game: {rom.name or rom.fs_name}")
# File path
if self.local_export:
lines.append(f"file: {rom.fs_name}")
else:
if request is None:
raise ValueError(
"Request object must be provided for non-local exports"
)
lines.append(
f"file: {request.url_for('get_rom_content', id=rom.id, file_name=rom.fs_name)}"
)
# Sort title (use fs_name_no_tags if different from name)
if rom.name and rom.fs_name_no_tags and rom.name != rom.fs_name_no_tags:
lines.append(f"sort-by: {rom.fs_name_no_tags}")
# Developers and publishers
if rom.metadatum and rom.metadatum.companies:
if len(rom.metadatum.companies) > 0:
lines.append(f"developer: {rom.metadatum.companies[0]}")
if len(rom.metadatum.companies) > 1:
lines.append(f"publisher: {rom.metadatum.companies[1]}")
# Genres
if rom.metadatum and rom.metadatum.genres:
for genre in rom.metadatum.genres:
lines.append(f"genre: {genre}")
# Tags (rom tags like region, language info)
if rom.tags:
for tag in rom.tags:
lines.append(f"tag: {tag}")
# Player count
if rom.metadatum and rom.metadatum.player_count:
lines.append(f"players: {rom.metadatum.player_count}")
# Summary / description
if rom.summary:
lines.append(f"description: {self._escape_multiline(rom.summary)}")
# Release date
if rom.metadatum and rom.metadatum.first_release_date is not None:
lines.append(
f"release: {self._format_release_date(rom.metadatum.first_release_date)}"
)
# Rating
if rom.metadatum and rom.metadatum.average_rating is not None:
lines.append(f"rating: {self._format_rating(rom.metadatum.average_rating)}")
# Asset references (relative paths to asset files)
if exported_assets:
for asset_key, rel_path in exported_assets.items():
lines.append(f"assets.{asset_key}: {rel_path}")
# RomM-specific extensions (x-* fields)
if rom.regions:
lines.append(f"x-region: {', '.join(rom.regions)}")
if rom.languages:
lines.append(f"x-language: {', '.join(rom.languages)}")
lines.append(f"x-romm-id: {rom.id}")
return "\n".join(lines)
def _copy_asset(self, source: Path, dest: Path) -> bool:
"""Place ``source`` at ``dest`` via hardlink (same filesystem) or copy
(otherwise). Returns True on success."""
dest.parent.mkdir(parents=True, exist_ok=True)
if dest.exists():
return True
try:
link_or_copy_file(source, dest)
return True
except OSError as e:
log.warning(f"Failed to copy {source} -> {dest}: {e}")
return False
def export_platform_to_pegasus(
self, platform_id: int, request: Request | None
) -> str:
"""Export a platform's ROMs to metadata.pegasus.txt format
Args:
platform_id: Platform ID to export
request: FastAPI request object for URL generation
Returns:
String content in Pegasus metadata format
"""
platform = db_platform_handler.get_platform(platform_id)
if not platform:
raise ValueError(f"Platform with ID {platform_id} not found")
roms = db_rom_handler.get_roms_scalar(platform_ids=[platform_id])
lines: list[str] = []
# Collection header
collection_name, shortname = self._resolve_collection(platform)
lines.append(f"collection: {collection_name}")
lines.append(f"shortname: {shortname}")
lines.append("")
# Game entries
game_count = 0
for rom in roms:
if not rom.missing_from_fs:
if game_count > 0:
lines.append("")
lines.append(self._create_game_entry(rom, request=request))
game_count += 1
log.info(f"Exported {game_count} ROMs for platform {platform.name}")
return "\n".join(lines) + "\n"
async def export_platform_to_file(
self,
platform_id: int,
request: Request | None,
) -> bool:
"""Export platform ROMs to metadata.pegasus.txt file in the platform's directory,
including media assets copied into a local assets/ folder.
Args:
platform_id: Platform ID to export
request: FastAPI request object for URL generation
Returns:
True if successful, False otherwise
"""
try:
platform = db_platform_handler.get_platform(platform_id)
if not platform:
log.error(f"Platform with ID {platform_id} not found")
return False
platform_fs_structure = fs_platform_handler.get_platform_fs_structure(
platform.fs_slug
)
platform_dir = fs_platform_handler.base_path / platform_fs_structure
roms = db_rom_handler.get_roms_scalar(platform_ids=[platform_id])
lines: list[str] = []
# Collection header
collection_name, shortname = self._resolve_collection(platform)
lines.append(f"collection: {collection_name}")
lines.append(f"shortname: {shortname}")
lines.append("")
game_count = 0
for rom in roms:
if rom.missing_from_fs:
continue
exported_assets: dict[str, str] = {}
if self.local_export:
assets = self._collect_assets(rom)
for asset_key, source_path in assets.items():
subdir = ASSET_DIRS.get(asset_key, asset_key)
dest_name = f"{rom.fs_name_no_ext}{source_path.suffix}"
dest_path = platform_dir / "assets" / subdir / dest_name
if self._copy_asset(source_path, dest_path):
exported_assets[asset_key] = f"assets/{subdir}/{dest_name}"
if game_count > 0:
lines.append("")
lines.append(
self._create_game_entry(
rom,
request=request,
exported_assets=exported_assets if exported_assets else None,
)
)
game_count += 1
content = "\n".join(lines) + "\n"
await fs_platform_handler.write_file(
content.encode("utf-8"),
platform_fs_structure,
"metadata.pegasus.txt",
)
log.info(
f"Exported metadata.pegasus.txt with {game_count} ROMs for platform {platform.name}"
)
return True
except Exception as e:
log.error(
f"Failed to export metadata.pegasus.txt for platform {platform_id}: {e}"
)
return False