mirror of
https://github.com/rommapp/romm.git
synced 2026-06-28 06:46:00 +00:00
426 lines
16 KiB
Python
426 lines
16 KiB
Python
from datetime import UTC, datetime
|
|
from pathlib import Path
|
|
|
|
from fastapi import Request
|
|
|
|
from handler.database import db_platform_handler, db_rom_handler
|
|
from handler.filesystem import fs_platform_handler, fs_resource_handler
|
|
from handler.metadata.base_handler import UniversalPlatformSlug as UPS
|
|
from logger.logger import log
|
|
from models.platform import Platform
|
|
from models.rom import Rom
|
|
from utils.filesystem import link_or_copy_file
|
|
|
|
# Map RomM platform slugs to canonical Pegasus (collection name, shortname) pairs.
# Source: https://www.pegasus-frontend.org/docs/user-guide/meta-files/
# Only platforms present in the official Pegasus "Common platform names" table are listed.
#
# Several slugs intentionally share one pair (e.g. NES/FAMICOM, SNES/SFAM,
# NEOGEOAES/NEOGEOMVS), so those platforms are exported under the same
# collection name and shortname.
SLUG_TO_PEGASUS: dict[UPS, tuple[str, str]] = {
    # Atari
    UPS.ATARI2600: ("Atari 2600", "atari2600"),
    UPS.ATARI5200: ("Atari 5200", "atari5200"),
    UPS.ATARI7800: ("Atari 7800", "atari7800"),
    UPS.ATARI800: ("Atari 800", "atari800"),
    UPS.ATARI_JAGUAR_CD: ("Atari Jaguar CD", "atarijaguarcd"),
    UPS.JAGUAR: ("Atari Jaguar", "atarijaguar"),
    UPS.LYNX: ("Atari Lynx", "atarilynx"),
    UPS.ATARI_ST: ("Atari ST", "atarist"),
    UPS.ATARI_XEGS: ("Atari XE", "atarixe"),
    # Nintendo handhelds
    UPS.GB: ("Game Boy", "gb"),
    UPS.GBC: ("Game Boy Color", "gbc"),
    UPS.GBA: ("Game Boy Advance", "gba"),
    UPS.NDS: ("Nintendo DS", "nds"),
    UPS.N3DS: ("Nintendo 3DS", "3ds"),
    UPS.G_AND_W: ("Nintendo Game-and-Watch", "gameandwatch"),
    UPS.VIRTUALBOY: ("Nintendo VirtualBoy", "virtualboy"),
    # Nintendo home consoles
    UPS.NES: ("Nintendo Entertainment System", "nes"),
    UPS.FAMICOM: ("Nintendo Entertainment System", "nes"),
    UPS.FDS: ("Famicom Disk System", "fds"),
    UPS.SNES: ("Super Nintendo Entertainment System", "snes"),
    UPS.SFAM: ("Super Nintendo Entertainment System", "snes"),
    UPS.N64: ("Nintendo 64", "n64"),
    UPS.NGC: ("Nintendo GameCube", "gc"),
    UPS.WII: ("Nintendo Wii", "wii"),
    UPS.WIIU: ("Nintendo WiiU", "wiiu"),
    UPS.SWITCH: ("Nintendo Switch", "switch"),
    # Sega
    UPS.SG1000: ("SEGA SG-1000", "sg1000"),
    UPS.SMS: ("Sega Master System", "mastersystem"),
    UPS.GENESIS: ("Sega Genesis", "genesis"),
    UPS.SEGACD: ("SEGA CD", "segacd"),
    UPS.SEGA32: ("SEGA 32X", "sega32x"),
    # NOTE(review): reuses the "sega32x" shortname of SEGA32 above while using
    # a different collection name — confirm this is intended.
    UPS.SEGACD32: ("SEGA CD 32X", "sega32x"),
    UPS.SATURN: ("Sega Saturn", "saturn"),
    UPS.DC: ("Sega Dreamcast", "dreamcast"),
    UPS.GAMEGEAR: ("SEGA GameGear", "gamegear"),
    # Sony
    UPS.PSX: ("PlayStation", "psx"),
    UPS.PS2: ("PlayStation 2", "ps2"),
    UPS.PS3: ("PlayStation 3", "ps3"),
    UPS.PSP: ("PlayStation Portable", "psp"),
    UPS.PSVITA: ("PlayStation Vita", "psvita"),
    # Microsoft
    UPS.XBOX: ("Xbox", "xbox"),
    UPS.XBOX360: ("Xbox 360", "xbox360"),
    # NEC / PC Engine
    UPS.TG16: ("TurboGrafx 16", "turbografx16"),
    # NOTE(review): the CD add-on maps to the "pcengine" shortname rather than
    # a CD-specific one — confirm against the Pegasus platform table.
    UPS.TURBOGRAFX_CD: ("PC Engine CD", "pcengine"),
    UPS.PC_FX: ("PC-FX", "pcfx"),
    # SNK Neo Geo
    UPS.NEOGEOAES: ("Neo Geo", "neogeo"),
    UPS.NEOGEOMVS: ("Neo Geo", "neogeo"),
    UPS.NEO_GEO_CD: ("Neo Geo CD", "neogeocd"),
    UPS.NEO_GEO_POCKET: ("Neo Geo Pocket", "ngp"),
    UPS.NEO_GEO_POCKET_COLOR: ("Neo Geo Pocket Color", "ngpc"),
    # Commodore / Amiga
    UPS.AMIGA: ("Amiga", "amiga"),
    UPS.AMIGA_CD32: ("Amiga CD32", "amigacd32"),
    UPS.COMMODORE_CDTV: ("Amiga CDTV", "amigacdtv"),
    UPS.C64: ("Commodore 64", "c64"),
    # Amstrad / Sharp / other home computers
    UPS.ACPC: ("Amstrad CPC", "amstradcpc"),
    UPS.SHARP_X68000: ("Sharp X68000", "x68000"),
    UPS.MSX: ("MSX", "msx"),
    UPS.DOS: ("DOS", "dos"),
    UPS.PC_BOOTER: ("PC", "pc"),
    UPS.LINUX: ("Linux", "linux"),
    UPS.MAC: ("Macintosh", "macintosh"),
    UPS.ANDROID: ("Android", "android"),
    UPS.WIN: ("Windows", "windows"),
    # Arcade
    UPS.ARCADE: ("Arcade", "arcade"),
    # Other consoles / platforms
    UPS._3DO: ("3DO", "3do"),
    UPS.APPLEII: ("Apple II", "apple2"),
    UPS.COLECOVISION: ("ColecoVision", "colecovision"),
    UPS.INTELLIVISION: ("Intellivision", "intellivision"),
    UPS.ODYSSEY_2: ("Odyssey 2", "odyssey2"),
    UPS.VECTREX: ("Vectrex", "vectrex"),
    UPS.SUPERGRAFX: ("SuperGrafx", "supergrafx"),
    UPS.SAM_COUPE: ("SAM coupe", "samcoupe"),
    UPS.SCUMMVM: ("Scumm VM", "scummvm"),
    UPS.TIC_80: ("TIC80", "tic80"),
    UPS.DRAGON_32_SLASH_64: ("Dragon 32", "dragon32"),
    # PC-88 / PC-98
    UPS.PC_8800_SERIES: ("PC 88", "pc88"),
    UPS.PC_9800_SERIES: ("PC 98", "pc98"),
    # WonderSwan
    UPS.WONDERSWAN: ("WonderSwan", "wonderswan"),
    # NOTE(review): the SwanCrystal slug is the one mapped to the colour
    # collection here — confirm no separate WonderSwan Color slug is expected.
    UPS.SWANCRYSTAL: ("WonderSwan/Color", "wonderswancolor"),
    # ZX Spectrum / ZX81
    UPS.ZXS: ("ZX Spectrum", "zxspectrum"),
    UPS.ZX81: ("ZX81", "zx81"),
    # Steam
    UPS.STEAM: ("Steam", "steam"),
}
|
|
|
|
# Map Pegasus asset keys to subdirectory names inside assets/
# The keys are the suffixes written after "assets." in each game entry; the
# values are the folder names used under "<platform dir>/assets/" when media
# files are copied during a local export. Keys missing from this table fall
# back to using the asset key itself as the folder name.
ASSET_DIRS: dict[str, str] = {
    "box_front": "covers",
    "box_back": "backcovers",
    "box_full": "boxes",
    "screenshot": "screenshots",
    "titlescreen": "titlescreens",
    "video": "videos",
    "marquee": "marquees",
    "cartridge": "cartridges",
    "logo": "logos",
    "background": "backgrounds",
    "bezel": "bezels",
}
|
|
|
|
|
|
class PegasusExporter:
|
|
"""Export RomM collections to Pegasus Frontend metadata.pegasus.txt format"""
|
|
|
|
def __init__(self, local_export: bool = False):
|
|
self.local_export = local_export
|
|
|
|
@staticmethod
|
|
def _resolve_collection(platform: Platform) -> tuple[str, str]:
|
|
"""Return (collection_name, shortname) for a platform."""
|
|
if platform.slug in SLUG_TO_PEGASUS:
|
|
return SLUG_TO_PEGASUS[UPS(platform.slug)]
|
|
|
|
return (platform.custom_name or platform.name, platform.slug)
|
|
|
|
def _format_release_date(self, timestamp: int) -> str:
|
|
"""Format release date to YYYY-MM-DD format"""
|
|
return datetime.fromtimestamp(timestamp / 1000, tz=UTC).strftime("%Y-%m-%d")
|
|
|
|
def _format_rating(self, average_rating: float) -> str:
|
|
"""Format rating as percentage (0-100%). Input is on 0-10 scale."""
|
|
clamped_rating = max(0, min(100, average_rating))
|
|
return f"{int(clamped_rating)}%"
|
|
|
|
def _escape_multiline(self, text: str) -> str:
|
|
"""Indent continuation lines for multi-line values in Pegasus format"""
|
|
lines = text.strip().splitlines()
|
|
if len(lines) <= 1:
|
|
return text.strip()
|
|
return (
|
|
lines[0]
|
|
+ "\n"
|
|
+ "\n".join(f" {line}" if line.strip() else " ." for line in lines[1:])
|
|
)
|
|
|
|
def _collect_assets(self, rom: Rom) -> dict[str, Path]:
|
|
"""Collect available media assets for a ROM using model properties.
|
|
|
|
Returns a dict mapping Pegasus asset key to the absolute source file path.
|
|
"""
|
|
assets: dict[str, Path] = {}
|
|
|
|
if rom.path_cover_l:
|
|
assets["box_front"] = fs_resource_handler.validate_path(rom.path_cover_l)
|
|
|
|
if rom.path_screenshots:
|
|
assets["screenshot"] = fs_resource_handler.validate_path(
|
|
rom.path_screenshots[0]
|
|
)
|
|
|
|
if rom.path_video:
|
|
assets["video"] = fs_resource_handler.validate_path(rom.path_video)
|
|
|
|
# Extended media from screenscraper / gamelist metadata
|
|
ss = rom.ss_metadata or {}
|
|
gl = rom.gamelist_metadata or {}
|
|
|
|
extended: dict[str, list[str]] = {
|
|
"box_full": [ss.get("box3d_path", ""), gl.get("box3d_path", "")],
|
|
"box_back": [ss.get("box2d_back_path", ""), gl.get("box2d_back_path", "")],
|
|
"logo": [ss.get("logo_path", "")],
|
|
"marquee": [gl.get("marquee_path", "")],
|
|
"cartridge": [ss.get("physical_path", ""), gl.get("physical_path", "")],
|
|
"background": [ss.get("fanart_path", ""), gl.get("fanart_path", "")],
|
|
"titlescreen": [
|
|
ss.get("title_screen_path", ""),
|
|
gl.get("title_screen_path", ""),
|
|
],
|
|
"bezel": [ss.get("bezel_path", "")],
|
|
}
|
|
|
|
for pegasus_key, candidates in extended.items():
|
|
if pegasus_key in assets:
|
|
continue
|
|
for candidate in candidates:
|
|
if candidate:
|
|
assets[pegasus_key] = fs_resource_handler.validate_path(candidate)
|
|
break
|
|
|
|
return assets
|
|
|
|
def _create_game_entry(
|
|
self,
|
|
rom: Rom,
|
|
request: Request | None,
|
|
exported_assets: dict[str, str] | None = None,
|
|
) -> str:
|
|
"""Create a game entry for a ROM in Pegasus metadata format"""
|
|
lines: list[str] = []
|
|
|
|
# Game title (required)
|
|
lines.append(f"game: {rom.name or rom.fs_name}")
|
|
|
|
# File path
|
|
if self.local_export:
|
|
lines.append(f"file: {rom.fs_name}")
|
|
else:
|
|
if request is None:
|
|
raise ValueError(
|
|
"Request object must be provided for non-local exports"
|
|
)
|
|
lines.append(
|
|
f"file: {request.url_for('get_rom_content', id=rom.id, file_name=rom.fs_name)}"
|
|
)
|
|
|
|
# Sort title (use fs_name_no_tags if different from name)
|
|
if rom.name and rom.fs_name_no_tags and rom.name != rom.fs_name_no_tags:
|
|
lines.append(f"sort-by: {rom.fs_name_no_tags}")
|
|
|
|
# Developers and publishers
|
|
if rom.metadatum and rom.metadatum.companies:
|
|
if len(rom.metadatum.companies) > 0:
|
|
lines.append(f"developer: {rom.metadatum.companies[0]}")
|
|
if len(rom.metadatum.companies) > 1:
|
|
lines.append(f"publisher: {rom.metadatum.companies[1]}")
|
|
|
|
# Genres
|
|
if rom.metadatum and rom.metadatum.genres:
|
|
for genre in rom.metadatum.genres:
|
|
lines.append(f"genre: {genre}")
|
|
|
|
# Tags (rom tags like region, language info)
|
|
if rom.tags:
|
|
for tag in rom.tags:
|
|
lines.append(f"tag: {tag}")
|
|
|
|
# Player count
|
|
if rom.metadatum and rom.metadatum.player_count:
|
|
lines.append(f"players: {rom.metadatum.player_count}")
|
|
|
|
# Summary / description
|
|
if rom.summary:
|
|
lines.append(f"description: {self._escape_multiline(rom.summary)}")
|
|
|
|
# Release date
|
|
if rom.metadatum and rom.metadatum.first_release_date is not None:
|
|
lines.append(
|
|
f"release: {self._format_release_date(rom.metadatum.first_release_date)}"
|
|
)
|
|
|
|
# Rating
|
|
if rom.metadatum and rom.metadatum.average_rating is not None:
|
|
lines.append(f"rating: {self._format_rating(rom.metadatum.average_rating)}")
|
|
|
|
# Asset references (relative paths to asset files)
|
|
if exported_assets:
|
|
for asset_key, rel_path in exported_assets.items():
|
|
lines.append(f"assets.{asset_key}: {rel_path}")
|
|
|
|
# RomM-specific extensions (x-* fields)
|
|
if rom.regions:
|
|
lines.append(f"x-region: {', '.join(rom.regions)}")
|
|
|
|
if rom.languages:
|
|
lines.append(f"x-language: {', '.join(rom.languages)}")
|
|
|
|
lines.append(f"x-romm-id: {rom.id}")
|
|
|
|
return "\n".join(lines)
|
|
|
|
def _copy_asset(self, source: Path, dest: Path) -> bool:
|
|
"""Place ``source`` at ``dest`` via hardlink (same filesystem) or copy
|
|
(otherwise). Returns True on success."""
|
|
dest.parent.mkdir(parents=True, exist_ok=True)
|
|
if dest.exists():
|
|
return True
|
|
|
|
try:
|
|
link_or_copy_file(source, dest)
|
|
return True
|
|
except OSError as e:
|
|
log.warning(f"Failed to copy {source} -> {dest}: {e}")
|
|
return False
|
|
|
|
def export_platform_to_pegasus(
|
|
self, platform_id: int, request: Request | None
|
|
) -> str:
|
|
"""Export a platform's ROMs to metadata.pegasus.txt format
|
|
|
|
Args:
|
|
platform_id: Platform ID to export
|
|
request: FastAPI request object for URL generation
|
|
|
|
Returns:
|
|
String content in Pegasus metadata format
|
|
"""
|
|
platform = db_platform_handler.get_platform(platform_id)
|
|
if not platform:
|
|
raise ValueError(f"Platform with ID {platform_id} not found")
|
|
|
|
roms = db_rom_handler.get_roms_scalar(platform_ids=[platform_id])
|
|
|
|
lines: list[str] = []
|
|
|
|
# Collection header
|
|
collection_name, shortname = self._resolve_collection(platform)
|
|
lines.append(f"collection: {collection_name}")
|
|
lines.append(f"shortname: {shortname}")
|
|
lines.append("")
|
|
|
|
# Game entries
|
|
game_count = 0
|
|
for rom in roms:
|
|
if not rom.missing_from_fs:
|
|
if game_count > 0:
|
|
lines.append("")
|
|
lines.append(self._create_game_entry(rom, request=request))
|
|
game_count += 1
|
|
|
|
log.info(f"Exported {game_count} ROMs for platform {platform.name}")
|
|
return "\n".join(lines) + "\n"
|
|
|
|
async def export_platform_to_file(
|
|
self,
|
|
platform_id: int,
|
|
request: Request | None,
|
|
) -> bool:
|
|
"""Export platform ROMs to metadata.pegasus.txt file in the platform's directory,
|
|
including media assets copied into a local assets/ folder.
|
|
|
|
Args:
|
|
platform_id: Platform ID to export
|
|
request: FastAPI request object for URL generation
|
|
|
|
Returns:
|
|
True if successful, False otherwise
|
|
"""
|
|
try:
|
|
platform = db_platform_handler.get_platform(platform_id)
|
|
if not platform:
|
|
log.error(f"Platform with ID {platform_id} not found")
|
|
return False
|
|
|
|
platform_fs_structure = fs_platform_handler.get_platform_fs_structure(
|
|
platform.fs_slug
|
|
)
|
|
platform_dir = fs_platform_handler.base_path / platform_fs_structure
|
|
|
|
roms = db_rom_handler.get_roms_scalar(platform_ids=[platform_id])
|
|
|
|
lines: list[str] = []
|
|
|
|
# Collection header
|
|
collection_name, shortname = self._resolve_collection(platform)
|
|
lines.append(f"collection: {collection_name}")
|
|
lines.append(f"shortname: {shortname}")
|
|
lines.append("")
|
|
|
|
game_count = 0
|
|
for rom in roms:
|
|
if rom.missing_from_fs:
|
|
continue
|
|
|
|
exported_assets: dict[str, str] = {}
|
|
|
|
if self.local_export:
|
|
assets = self._collect_assets(rom)
|
|
|
|
for asset_key, source_path in assets.items():
|
|
subdir = ASSET_DIRS.get(asset_key, asset_key)
|
|
dest_name = f"{rom.fs_name_no_ext}{source_path.suffix}"
|
|
dest_path = platform_dir / "assets" / subdir / dest_name
|
|
|
|
if self._copy_asset(source_path, dest_path):
|
|
exported_assets[asset_key] = f"assets/{subdir}/{dest_name}"
|
|
|
|
if game_count > 0:
|
|
lines.append("")
|
|
|
|
lines.append(
|
|
self._create_game_entry(
|
|
rom,
|
|
request=request,
|
|
exported_assets=exported_assets if exported_assets else None,
|
|
)
|
|
)
|
|
game_count += 1
|
|
|
|
content = "\n".join(lines) + "\n"
|
|
await fs_platform_handler.write_file(
|
|
content.encode("utf-8"),
|
|
platform_fs_structure,
|
|
"metadata.pegasus.txt",
|
|
)
|
|
|
|
log.info(
|
|
f"Exported metadata.pegasus.txt with {game_count} ROMs for platform {platform.name}"
|
|
)
|
|
return True
|
|
except Exception as e:
|
|
log.error(
|
|
f"Failed to export metadata.pegasus.txt for platform {platform_id}: {e}"
|
|
)
|
|
return False
|