mirror of
https://github.com/rommapp/romm.git
synced 2026-06-30 15:55:54 +00:00
- Introduced a new fixture `multi_file_rom` in `conftest.py` to create a ROM with multiple files for testing purposes. - Updated `test_manual.py` and `test_soundtrack.py` to utilize the new fixture for testing manual uploads and soundtrack functionalities. - Enhanced the audio tag extraction logic in `audio_tags.py` to handle oversized audio files and added a function to check allowed audio file extensions. - Modified `nginx.py` to support inline file serving for audio files, allowing for better streaming capabilities. - Improved error handling in the Vue components for soundtrack management, including user feedback for playback errors and metadata loading issues. - Refactored the soundtrack player store to use local storage for volume and mute settings, simplifying state management. - Added new localization strings for soundtrack player actions and error messages in both English (US and GB) locale files.
1533 lines
49 KiB
Python
1533 lines
49 KiB
Python
import functools
|
|
from collections.abc import Iterable, Sequence
|
|
from datetime import datetime
|
|
from typing import Any
|
|
|
|
from sqlalchemy import (
|
|
Integer,
|
|
Row,
|
|
String,
|
|
Text,
|
|
and_,
|
|
case,
|
|
cast,
|
|
delete,
|
|
false,
|
|
func,
|
|
literal,
|
|
not_,
|
|
or_,
|
|
select,
|
|
text,
|
|
update,
|
|
)
|
|
from sqlalchemy.orm import (
|
|
Query,
|
|
QueryableAttribute,
|
|
Session,
|
|
joinedload,
|
|
load_only,
|
|
noload,
|
|
selectinload,
|
|
)
|
|
from sqlalchemy.sql.elements import ColumnElement
|
|
from sqlalchemy.sql.selectable import Select
|
|
|
|
from config import ROMM_DB_DRIVER
|
|
from decorators.database import begin_session
|
|
from handler.metadata.base_handler import UniversalPlatformSlug as UPS
|
|
from models.assets import Save, Screenshot, State
|
|
from models.platform import Platform
|
|
from models.rom import Rom, RomFile, RomFileCategory, RomMetadata, RomNote, RomUser
|
|
from utils.database import (
|
|
json_array_contains_all,
|
|
json_array_contains_any,
|
|
json_array_contains_value,
|
|
)
|
|
|
|
from .base_handler import DBBaseHandler
|
|
|
|
EJS_SUPPORTED_PLATFORMS = [
|
|
UPS._3DO,
|
|
UPS.AMIGA,
|
|
UPS.AMIGA_CD,
|
|
UPS.AMIGA_CD32,
|
|
UPS.ARCADE,
|
|
UPS.NEOGEOAES,
|
|
UPS.NEOGEOMVS,
|
|
UPS.ATARI2600,
|
|
UPS.ATARI5200,
|
|
UPS.ATARI7800,
|
|
UPS.C_PLUS_4,
|
|
UPS.CPET,
|
|
UPS.C64,
|
|
UPS.C128,
|
|
UPS.COLECOVISION,
|
|
UPS.JAGUAR,
|
|
UPS.LYNX,
|
|
UPS.DOS,
|
|
UPS.NEO_GEO_POCKET,
|
|
UPS.NEO_GEO_POCKET_COLOR,
|
|
UPS.NES,
|
|
UPS.FAMICOM,
|
|
UPS.FDS,
|
|
UPS.N64,
|
|
UPS.N64DD,
|
|
UPS.NDS,
|
|
UPS.NINTENDO_DSI,
|
|
UPS.GB,
|
|
UPS.GBA,
|
|
UPS.GBC,
|
|
UPS.PC_FX,
|
|
UPS.PHILIPS_CD_I,
|
|
UPS.PSX,
|
|
UPS.PSP,
|
|
UPS.SEGACD,
|
|
UPS.SEGA32,
|
|
UPS.GENESIS,
|
|
UPS.SMS,
|
|
UPS.GAMEGEAR,
|
|
UPS.SATURN,
|
|
UPS.SNES,
|
|
UPS.SFAM,
|
|
UPS.TG16,
|
|
UPS.VIC_20,
|
|
UPS.VIRTUALBOY,
|
|
UPS.WONDERSWAN,
|
|
UPS.WONDERSWAN_COLOR,
|
|
]
|
|
|
|
RUFFLE_SUPPORTED_PLATFORMS = [
|
|
UPS.BROWSER,
|
|
]
|
|
|
|
STRIP_ARTICLES_REGEX = r"^(the|a|an)\s+"
|
|
|
|
|
|
def _create_metadata_id_case(
|
|
prefix: str,
|
|
id_column: ColumnElement,
|
|
platform_id_column: ColumnElement,
|
|
):
|
|
return case(
|
|
(
|
|
id_column.isnot(None),
|
|
func.concat(
|
|
f"{prefix}-",
|
|
platform_id_column,
|
|
"-",
|
|
id_column,
|
|
),
|
|
),
|
|
else_=None,
|
|
)
|
|
|
|
|
|
def with_details(func):
|
|
@functools.wraps(func)
|
|
def wrapper(*args, **kwargs):
|
|
kwargs["query"] = select(Rom).options(
|
|
# Ensure platform is loaded for main ROM objects
|
|
selectinload(Rom.platform),
|
|
selectinload(Rom.saves).options(
|
|
noload(Save.rom),
|
|
noload(Save.user),
|
|
),
|
|
selectinload(Rom.states).options(
|
|
noload(State.rom),
|
|
noload(State.user),
|
|
),
|
|
selectinload(Rom.screenshots).options(
|
|
noload(Screenshot.rom),
|
|
),
|
|
selectinload(Rom.rom_users).options(noload(RomUser.rom)),
|
|
selectinload(Rom.metadatum).options(noload(RomMetadata.rom)),
|
|
selectinload(Rom.files).options(
|
|
joinedload(RomFile.rom).load_only(Rom.fs_path, Rom.fs_name)
|
|
),
|
|
selectinload(Rom.sibling_roms).options(
|
|
noload(Rom.platform), noload(Rom.metadatum)
|
|
),
|
|
selectinload(Rom.collections),
|
|
selectinload(Rom.notes),
|
|
)
|
|
return func(*args, **kwargs)
|
|
|
|
return wrapper
|
|
|
|
|
|
class DBRomsHandler(DBBaseHandler):
|
|
@begin_session
|
|
@with_details
|
|
def add_rom(
|
|
self,
|
|
rom: Rom,
|
|
query: Query = None, # type: ignore
|
|
session: Session = None, # type: ignore
|
|
) -> Rom:
|
|
rom = session.merge(rom)
|
|
session.flush()
|
|
|
|
return session.scalar(query.filter_by(id=rom.id).limit(1))
|
|
|
|
@begin_session
|
|
@with_details
|
|
def get_rom(
|
|
self,
|
|
id: int,
|
|
*,
|
|
query: Query = None, # type: ignore
|
|
session: Session = None, # type: ignore
|
|
) -> Rom | None:
|
|
return session.scalar(query.filter_by(id=id).limit(1))
|
|
|
|
@begin_session
|
|
@with_details
|
|
def get_roms_by_ids(
|
|
self,
|
|
ids: list[int],
|
|
*,
|
|
query: Query = None, # type: ignore
|
|
session: Session = None, # type: ignore
|
|
) -> Sequence[Rom]:
|
|
"""Get multiple ROMs by their IDs."""
|
|
if not ids:
|
|
return []
|
|
return session.scalars(query.filter(Rom.id.in_(ids))).all()
|
|
|
|
def filter_by_platform_id(self, query: Query, platform_id: int):
|
|
return query.filter(Rom.platform_id == platform_id)
|
|
|
|
def _filter_by_platform_ids(
|
|
self, query: Query, platform_ids: Sequence[int]
|
|
) -> Query:
|
|
return query.filter(Rom.platform_id.in_(platform_ids))
|
|
|
|
def _filter_by_collection_id(
|
|
self, query: Query, session: Session, collection_id: int
|
|
):
|
|
from . import db_collection_handler
|
|
|
|
collection = db_collection_handler.get_collection(collection_id)
|
|
|
|
if collection:
|
|
return query.filter(Rom.id.in_(collection.rom_ids))
|
|
return query
|
|
|
|
def _filter_by_virtual_collection_id(
|
|
self, query: Query, session: Session, virtual_collection_id: str
|
|
):
|
|
from . import db_collection_handler
|
|
|
|
v_collection = db_collection_handler.get_virtual_collection(
|
|
virtual_collection_id
|
|
)
|
|
|
|
if v_collection:
|
|
return query.filter(Rom.id.in_(v_collection.rom_ids))
|
|
return query
|
|
|
|
def _filter_by_smart_collection_id(
|
|
self, query: Query, session: Session, smart_collection_id: int, user_id: int
|
|
):
|
|
from . import db_collection_handler
|
|
|
|
smart_collection = db_collection_handler.get_smart_collection(
|
|
smart_collection_id
|
|
)
|
|
|
|
if smart_collection:
|
|
# Ensure the latest ROMs are loaded
|
|
smart_collection = smart_collection.update_properties(user_id)
|
|
return query.filter(Rom.id.in_(smart_collection.rom_ids))
|
|
return query
|
|
|
|
def _filter_by_search_term(self, query: Query, search_term: str):
|
|
terms = [term.strip() for term in search_term.split("|")]
|
|
conditions = [
|
|
condition
|
|
for term in terms
|
|
for condition in (
|
|
Rom.fs_name.ilike(f"%{term}%"),
|
|
Rom.name.ilike(f"%{term}%"),
|
|
)
|
|
if term
|
|
]
|
|
|
|
return query.filter(or_(*conditions))
|
|
|
|
def _filter_by_matched(self, query: Query, value: bool) -> Query:
|
|
"""Filter based on whether the rom is matched to a metadata provider.
|
|
|
|
Args:
|
|
value: True for matched ROMs, False for unmatched ROMs
|
|
"""
|
|
predicate = or_(
|
|
Rom.igdb_id.isnot(None),
|
|
Rom.moby_id.isnot(None),
|
|
Rom.ss_id.isnot(None),
|
|
Rom.ra_id.isnot(None),
|
|
Rom.launchbox_id.isnot(None),
|
|
Rom.hasheous_id.isnot(None),
|
|
Rom.tgdb_id.isnot(None),
|
|
Rom.flashpoint_id.isnot(None),
|
|
)
|
|
if not value:
|
|
predicate = not_(predicate)
|
|
return query.filter(predicate)
|
|
|
|
def _filter_by_favorite(
|
|
self, query: Query, session: Session, value: bool, user_id: int | None
|
|
) -> Query:
|
|
"""Filter based on whether the rom is in the user's favorites collection."""
|
|
if not user_id:
|
|
return query
|
|
|
|
from . import db_collection_handler
|
|
|
|
favorites_collection = db_collection_handler.get_favorite_collection(user_id)
|
|
if favorites_collection:
|
|
predicate = Rom.id.in_(favorites_collection.rom_ids)
|
|
if not value:
|
|
predicate = not_(predicate)
|
|
return query.filter(predicate)
|
|
|
|
# If no favorites collection exists, return the original query if non-favorites
|
|
# were requested, or an empty query if favorites were requested.
|
|
if not value:
|
|
return query
|
|
return query.filter(false())
|
|
|
|
def _filter_by_duplicate(self, query: Query, value: bool) -> Query:
|
|
"""Filter based on whether the rom has duplicates."""
|
|
predicate = Rom.sibling_roms.any()
|
|
if not value:
|
|
predicate = not_(predicate)
|
|
return query.filter(predicate)
|
|
|
|
def _filter_by_playable(self, query: Query, value: bool) -> Query:
|
|
"""Filter based on whether the rom is playable on supported platforms."""
|
|
predicate = or_(
|
|
Platform.slug.in_(EJS_SUPPORTED_PLATFORMS),
|
|
Platform.slug.in_(RUFFLE_SUPPORTED_PLATFORMS),
|
|
)
|
|
if not value:
|
|
predicate = not_(predicate)
|
|
return query.join(Platform).filter(predicate)
|
|
|
|
def _filter_by_last_played(
|
|
self, query: Query, value: bool, user_id: int | None = None
|
|
) -> Query:
|
|
"""Filter based on whether the rom has a last played value for the user."""
|
|
if not user_id:
|
|
return query
|
|
|
|
has_last_played = (
|
|
RomUser.last_played.is_(None)
|
|
if not value
|
|
else RomUser.last_played.isnot(None)
|
|
)
|
|
return query.filter(has_last_played)
|
|
|
|
def _filter_by_has_ra(self, query: Query, value: bool) -> Query:
|
|
predicate = Rom.ra_id.isnot(None)
|
|
if not value:
|
|
predicate = not_(predicate)
|
|
return query.filter(predicate)
|
|
|
|
def _filter_by_missing_from_fs(self, query: Query, value: bool) -> Query:
|
|
predicate = Rom.missing_from_fs.isnot(False)
|
|
if not value:
|
|
predicate = not_(predicate)
|
|
return query.filter(predicate)
|
|
|
|
def _filter_by_verified(self, query: Query, value: bool) -> Query:
|
|
keys_to_check = [
|
|
"tosec_match",
|
|
"mame_arcade_match",
|
|
"mame_mess_match",
|
|
"nointro_match",
|
|
"redump_match",
|
|
"whdload_match",
|
|
"ra_match",
|
|
"fbneo_match",
|
|
"puredos_match",
|
|
]
|
|
|
|
if ROMM_DB_DRIVER == "postgresql":
|
|
conditions = " OR ".join(
|
|
f"(hasheous_metadata->>'{key}')::boolean" for key in keys_to_check
|
|
)
|
|
predicate = text(f"({conditions})")
|
|
if not value:
|
|
predicate = text(f"NOT ({conditions})")
|
|
return query.filter(predicate)
|
|
else:
|
|
predicate = or_(
|
|
*(Rom.hasheous_metadata[key].as_boolean() for key in keys_to_check)
|
|
)
|
|
if not value:
|
|
predicate = not_(predicate)
|
|
return query.filter(predicate)
|
|
|
|
def _filter_by_genres(
|
|
self,
|
|
query: Query,
|
|
*,
|
|
session: Session,
|
|
values: Sequence[str],
|
|
match_all: bool = False,
|
|
match_none: bool = False,
|
|
) -> Query:
|
|
op = json_array_contains_all if match_all else json_array_contains_any
|
|
condition = op(RomMetadata.genres, values, session=session)
|
|
return query.filter(~condition) if match_none else query.filter(condition)
|
|
|
|
def _filter_by_franchises(
|
|
self,
|
|
query: Query,
|
|
*,
|
|
session: Session,
|
|
values: Sequence[str],
|
|
match_all: bool = False,
|
|
match_none: bool = False,
|
|
) -> Query:
|
|
op = json_array_contains_all if match_all else json_array_contains_any
|
|
condition = op(RomMetadata.franchises, values, session=session)
|
|
return query.filter(~condition) if match_none else query.filter(condition)
|
|
|
|
def _filter_by_collections(
|
|
self,
|
|
query: Query,
|
|
*,
|
|
session: Session,
|
|
values: Sequence[str],
|
|
match_all: bool = False,
|
|
match_none: bool = False,
|
|
) -> Query:
|
|
op = json_array_contains_all if match_all else json_array_contains_any
|
|
condition = op(RomMetadata.collections, values, session=session)
|
|
return query.filter(~condition) if match_none else query.filter(condition)
|
|
|
|
def _filter_by_companies(
|
|
self,
|
|
query: Query,
|
|
*,
|
|
session: Session,
|
|
values: Sequence[str],
|
|
match_all: bool = False,
|
|
match_none: bool = False,
|
|
) -> Query:
|
|
op = json_array_contains_all if match_all else json_array_contains_any
|
|
condition = op(RomMetadata.companies, values, session=session)
|
|
return query.filter(~condition) if match_none else query.filter(condition)
|
|
|
|
def _filter_by_age_ratings(
|
|
self,
|
|
query: Query,
|
|
*,
|
|
session: Session,
|
|
values: Sequence[str],
|
|
match_all: bool = False,
|
|
match_none: bool = False,
|
|
) -> Query:
|
|
op = json_array_contains_all if match_all else json_array_contains_any
|
|
condition = op(RomMetadata.age_ratings, values, session=session)
|
|
return query.filter(~condition) if match_none else query.filter(condition)
|
|
|
|
def _filter_by_status(
|
|
self,
|
|
query: Query,
|
|
*,
|
|
session: Session,
|
|
values: Sequence[str],
|
|
match_all: bool = False,
|
|
match_none: bool = False,
|
|
):
|
|
if not values:
|
|
return query
|
|
|
|
status_filters = []
|
|
for selected_status in values:
|
|
if selected_status == "now_playing":
|
|
status_filters.append(RomUser.now_playing.is_(True))
|
|
elif selected_status == "backlogged":
|
|
status_filters.append(RomUser.backlogged.is_(True))
|
|
elif selected_status == "hidden":
|
|
status_filters.append(RomUser.hidden.is_(True))
|
|
else:
|
|
status_filters.append(RomUser.status == selected_status)
|
|
|
|
comb = and_ if match_all else or_
|
|
condition = comb(*status_filters)
|
|
|
|
# Apply negation if match_none, otherwise apply condition
|
|
query = query.filter(~condition) if match_none else query.filter(condition)
|
|
|
|
# Don't apply the hidden filter is hidden is set
|
|
if "hidden" in values:
|
|
return query
|
|
|
|
return query.filter(or_(RomUser.hidden.is_(False), RomUser.hidden.is_(None)))
|
|
|
|
def _filter_by_regions(
|
|
self,
|
|
query: Query,
|
|
*,
|
|
session: Session,
|
|
values: Sequence[str],
|
|
match_all: bool = False,
|
|
match_none: bool = False,
|
|
) -> Query:
|
|
op = json_array_contains_all if match_all else json_array_contains_any
|
|
condition = op(Rom.regions, values, session=session)
|
|
return query.filter(~condition) if match_none else query.filter(condition)
|
|
|
|
def _filter_by_languages(
|
|
self,
|
|
query: Query,
|
|
*,
|
|
session: Session,
|
|
values: Sequence[str],
|
|
match_all: bool = False,
|
|
match_none: bool = False,
|
|
) -> Query:
|
|
op = json_array_contains_all if match_all else json_array_contains_any
|
|
condition = op(Rom.languages, values, session=session)
|
|
return query.filter(~condition) if match_none else query.filter(condition)
|
|
|
|
def _filter_by_player_counts(
|
|
self,
|
|
query: Query,
|
|
*,
|
|
session: Session,
|
|
values: Sequence[str],
|
|
match_all: bool = False,
|
|
match_none: bool = False,
|
|
) -> Query:
|
|
condition = RomMetadata.player_count.in_(values)
|
|
if match_none:
|
|
return query.filter(not_(condition))
|
|
return query.filter(condition)
|
|
|
|
@begin_session
|
|
def filter_roms(
|
|
self,
|
|
query: Query,
|
|
platform_ids: Sequence[int] | None = None,
|
|
collection_id: int | None = None,
|
|
virtual_collection_id: str | None = None,
|
|
smart_collection_id: int | None = None,
|
|
search_term: str | None = None,
|
|
matched: bool | None = None,
|
|
favorite: bool | None = None,
|
|
duplicate: bool | None = None,
|
|
last_played: bool | None = None,
|
|
playable: bool | None = None,
|
|
has_ra: bool | None = None,
|
|
missing: bool | None = None,
|
|
verified: bool | None = None,
|
|
group_by_meta_id: bool = False,
|
|
genres: Sequence[str] | None = None,
|
|
franchises: Sequence[str] | None = None,
|
|
collections: Sequence[str] | None = None,
|
|
companies: Sequence[str] | None = None,
|
|
age_ratings: Sequence[str] | None = None,
|
|
statuses: Sequence[str] | None = None,
|
|
regions: Sequence[str] | None = None,
|
|
languages: Sequence[str] | None = None,
|
|
player_counts: Sequence[str] | None = None,
|
|
# Logic operators for multi-value filters
|
|
genres_logic: str = "any",
|
|
franchises_logic: str = "any",
|
|
collections_logic: str = "any",
|
|
companies_logic: str = "any",
|
|
age_ratings_logic: str = "any",
|
|
regions_logic: str = "any",
|
|
languages_logic: str = "any",
|
|
statuses_logic: str = "any",
|
|
player_counts_logic: str = "any",
|
|
user_id: int | None = None,
|
|
updated_after: datetime | None = None,
|
|
session: Session = None, # type: ignore
|
|
) -> Query[Rom]:
|
|
from handler.scan_handler import MetadataSource
|
|
|
|
query = query.options(
|
|
# Ensure platform is loaded for main ROM objects
|
|
selectinload(Rom.platform),
|
|
# Display properties for the current user (last_played)
|
|
selectinload(Rom.rom_users).options(noload(RomUser.rom)),
|
|
# Sort table by metadata (first_release_date)
|
|
selectinload(Rom.metadatum).options(noload(RomMetadata.rom)),
|
|
# Required for multi-file ROM actions and 3DS QR code
|
|
selectinload(Rom.files).options(
|
|
joinedload(RomFile.rom).load_only(Rom.fs_path, Rom.fs_name)
|
|
),
|
|
# Show sibling rom badges on cards
|
|
selectinload(Rom.sibling_roms).options(
|
|
noload(Rom.platform), noload(Rom.metadatum)
|
|
),
|
|
# Show notes indicator on cards
|
|
selectinload(Rom.notes),
|
|
)
|
|
|
|
# Handle platform filtering - platform filtering always uses OR logic since ROMs belong to only one platform
|
|
if platform_ids:
|
|
query = self._filter_by_platform_ids(query, platform_ids)
|
|
|
|
if collection_id:
|
|
query = self._filter_by_collection_id(query, session, collection_id)
|
|
|
|
if virtual_collection_id:
|
|
query = self._filter_by_virtual_collection_id(
|
|
query, session, virtual_collection_id
|
|
)
|
|
|
|
if smart_collection_id and user_id:
|
|
query = self._filter_by_smart_collection_id(
|
|
query, session, smart_collection_id, user_id
|
|
)
|
|
|
|
if search_term:
|
|
query = self._filter_by_search_term(query, search_term)
|
|
|
|
if matched is not None:
|
|
query = self._filter_by_matched(query, value=matched)
|
|
|
|
if favorite is not None:
|
|
query = self._filter_by_favorite(
|
|
query, session=session, value=favorite, user_id=user_id
|
|
)
|
|
|
|
if duplicate is not None:
|
|
query = self._filter_by_duplicate(query, value=duplicate)
|
|
|
|
if last_played is not None:
|
|
query = self._filter_by_last_played(
|
|
query, value=last_played, user_id=user_id
|
|
)
|
|
|
|
if playable is not None:
|
|
query = self._filter_by_playable(query, value=playable)
|
|
|
|
if has_ra is not None:
|
|
query = self._filter_by_has_ra(query, value=has_ra)
|
|
|
|
if missing is not None:
|
|
query = self._filter_by_missing_from_fs(query, value=missing)
|
|
|
|
if verified is not None:
|
|
query = self._filter_by_verified(query, value=verified)
|
|
|
|
if updated_after:
|
|
query = query.filter(Rom.updated_at > updated_after)
|
|
|
|
# BEWARE YE WHO ENTERS HERE 💀
|
|
if group_by_meta_id:
|
|
# Convert NULL is_main_sibling to 0 (false) so it sorts after true values
|
|
is_main_sibling_order = (
|
|
func.coalesce(cast(RomUser.is_main_sibling, Integer), 0).desc()
|
|
if user_id
|
|
else literal(1)
|
|
)
|
|
|
|
# Create a subquery that identifies the primary ROM in each group
|
|
# Priority order: is_main_sibling (desc), then by fs_name_no_ext (asc)
|
|
base_subquery = query.subquery()
|
|
group_subquery = (
|
|
select(base_subquery.c.id)
|
|
.select_from(base_subquery)
|
|
.with_only_columns(
|
|
base_subquery.c.id,
|
|
base_subquery.c.fs_name_no_ext,
|
|
base_subquery.c.platform_id,
|
|
base_subquery.c.igdb_id,
|
|
base_subquery.c.ss_id,
|
|
base_subquery.c.moby_id,
|
|
base_subquery.c.ra_id,
|
|
base_subquery.c.hasheous_id,
|
|
base_subquery.c.launchbox_id,
|
|
base_subquery.c.tgdb_id,
|
|
base_subquery.c.flashpoint_id,
|
|
)
|
|
.outerjoin(
|
|
RomUser,
|
|
and_(
|
|
base_subquery.c.id == RomUser.rom_id, RomUser.user_id == user_id
|
|
),
|
|
)
|
|
.add_columns(
|
|
func.row_number()
|
|
.over(
|
|
partition_by=func.coalesce(
|
|
_create_metadata_id_case(
|
|
MetadataSource.IGDB,
|
|
base_subquery.c.igdb_id,
|
|
base_subquery.c.platform_id,
|
|
),
|
|
_create_metadata_id_case(
|
|
MetadataSource.SS,
|
|
base_subquery.c.ss_id,
|
|
base_subquery.c.platform_id,
|
|
),
|
|
_create_metadata_id_case(
|
|
MetadataSource.MOBY,
|
|
base_subquery.c.moby_id,
|
|
base_subquery.c.platform_id,
|
|
),
|
|
_create_metadata_id_case(
|
|
MetadataSource.RA,
|
|
base_subquery.c.ra_id,
|
|
base_subquery.c.platform_id,
|
|
),
|
|
_create_metadata_id_case(
|
|
MetadataSource.HASHEOUS,
|
|
base_subquery.c.hasheous_id,
|
|
base_subquery.c.platform_id,
|
|
),
|
|
_create_metadata_id_case(
|
|
MetadataSource.LAUNCHBOX,
|
|
base_subquery.c.launchbox_id,
|
|
base_subquery.c.platform_id,
|
|
),
|
|
_create_metadata_id_case(
|
|
MetadataSource.TGDB,
|
|
base_subquery.c.tgdb_id,
|
|
base_subquery.c.platform_id,
|
|
),
|
|
_create_metadata_id_case(
|
|
MetadataSource.FLASHPOINT,
|
|
base_subquery.c.flashpoint_id,
|
|
base_subquery.c.platform_id,
|
|
),
|
|
_create_metadata_id_case(
|
|
"romm",
|
|
base_subquery.c.id,
|
|
base_subquery.c.platform_id,
|
|
),
|
|
),
|
|
order_by=[
|
|
is_main_sibling_order,
|
|
base_subquery.c.fs_name_no_ext.asc(),
|
|
],
|
|
)
|
|
.label("row_num"),
|
|
)
|
|
.subquery()
|
|
)
|
|
|
|
# Add a filter to the original query to only include the primary ROM from each group
|
|
query = query.filter(
|
|
Rom.id.in_(
|
|
session.query(group_subquery.c.id).filter(
|
|
group_subquery.c.row_num == 1
|
|
)
|
|
)
|
|
)
|
|
|
|
# Optimize JOINs - only join tables when needed
|
|
needs_metadata_join = any(
|
|
[genres, franchises, collections, companies, age_ratings, player_counts]
|
|
)
|
|
|
|
if needs_metadata_join:
|
|
query = query.outerjoin(RomMetadata)
|
|
|
|
# Apply metadata and rom-level filters efficiently
|
|
filters_to_apply = [
|
|
(genres, genres_logic, self._filter_by_genres),
|
|
(franchises, franchises_logic, self._filter_by_franchises),
|
|
(collections, collections_logic, self._filter_by_collections),
|
|
(companies, companies_logic, self._filter_by_companies),
|
|
(age_ratings, age_ratings_logic, self._filter_by_age_ratings),
|
|
(regions, regions_logic, self._filter_by_regions),
|
|
(languages, languages_logic, self._filter_by_languages),
|
|
(player_counts, player_counts_logic, self._filter_by_player_counts),
|
|
]
|
|
|
|
for values, logic, filter_func in filters_to_apply:
|
|
if values:
|
|
query = filter_func(
|
|
query,
|
|
session=session,
|
|
values=values,
|
|
match_all=(logic == "all"),
|
|
match_none=(logic == "none"),
|
|
)
|
|
|
|
# The RomUser table is already joined if user_id is set
|
|
if statuses and user_id:
|
|
query = self._filter_by_status(
|
|
query,
|
|
session=session,
|
|
values=statuses,
|
|
match_all=(statuses_logic == "all"),
|
|
match_none=(statuses_logic == "none"),
|
|
)
|
|
elif user_id:
|
|
query = query.filter(
|
|
or_(RomUser.hidden.is_(False), RomUser.hidden.is_(None))
|
|
)
|
|
|
|
return query
|
|
|
|
@begin_session
|
|
def get_roms_query(
|
|
self,
|
|
*,
|
|
order_by: str = "name",
|
|
order_dir: str = "asc",
|
|
user_id: int | None = None,
|
|
session: Session = None, # type: ignore
|
|
) -> tuple[Query[Rom], Any]:
|
|
query = select(Rom)
|
|
|
|
if user_id:
|
|
query = query.outerjoin(
|
|
RomUser, and_(RomUser.rom_id == Rom.id, RomUser.user_id == user_id)
|
|
)
|
|
|
|
if user_id and hasattr(RomUser, order_by) and not hasattr(Rom, order_by):
|
|
order_attr = getattr(RomUser, order_by)
|
|
query = query.filter(RomUser.user_id == user_id)
|
|
elif hasattr(RomMetadata, order_by) and not hasattr(Rom, order_by):
|
|
order_attr = getattr(RomMetadata, order_by)
|
|
query = query.outerjoin(RomMetadata, RomMetadata.rom_id == Rom.id)
|
|
elif hasattr(Rom, order_by):
|
|
order_attr = getattr(Rom, order_by)
|
|
else:
|
|
order_attr = Rom.name
|
|
|
|
order_attr_column = order_attr
|
|
|
|
# Ignore case when the order attribute is a number
|
|
if isinstance(order_attr.type, (String, Text)):
|
|
# Remove any leading articles
|
|
order_attr = func.trim(
|
|
func.lower(order_attr).regexp_replace(STRIP_ARTICLES_REGEX, "")
|
|
)
|
|
|
|
# Pad numbers with leading zeros to ensure natural sorting
|
|
order_attr = order_attr.regexp_replace(
|
|
r"(\d+)", r"00000000000\1"
|
|
).regexp_replace(r"0*(\d{12})", r"\1")
|
|
|
|
if order_dir.lower() == "desc":
|
|
order_attr = order_attr.desc()
|
|
else:
|
|
order_attr = order_attr.asc()
|
|
|
|
return query.order_by(order_attr), order_attr_column # type: ignore
|
|
|
|
@begin_session
|
|
def get_roms_scalar(
|
|
self,
|
|
*,
|
|
only_fields: Sequence[QueryableAttribute] | None = None,
|
|
session: Session = None, # type: ignore
|
|
**kwargs,
|
|
) -> Sequence[Rom]:
|
|
query, _ = self.get_roms_query(
|
|
order_by=kwargs.get("order_by", "name"),
|
|
order_dir=kwargs.get("order_dir", "asc"),
|
|
user_id=kwargs.get("user_id", None),
|
|
)
|
|
|
|
if only_fields:
|
|
query = query.options(load_only(*only_fields))
|
|
|
|
roms = self.filter_roms(
|
|
query=query,
|
|
platform_ids=kwargs.get("platform_ids", None),
|
|
collection_id=kwargs.get("collection_id", None),
|
|
virtual_collection_id=kwargs.get("virtual_collection_id", None),
|
|
search_term=kwargs.get("search_term", None),
|
|
matched=kwargs.get("matched", None),
|
|
favorite=kwargs.get("favorite", None),
|
|
duplicate=kwargs.get("duplicate", None),
|
|
last_played=kwargs.get("last_played", None),
|
|
playable=kwargs.get("playable", None),
|
|
has_ra=kwargs.get("has_ra", None),
|
|
missing=kwargs.get("missing", None),
|
|
verified=kwargs.get("verified", None),
|
|
genres=kwargs.get("genres", None),
|
|
franchises=kwargs.get("franchises", None),
|
|
collections=kwargs.get("collections", None),
|
|
companies=kwargs.get("companies", None),
|
|
age_ratings=kwargs.get("age_ratings", None),
|
|
statuses=kwargs.get("statuses", None),
|
|
regions=kwargs.get("regions", None),
|
|
languages=kwargs.get("languages", None),
|
|
player_counts=kwargs.get("player_counts", None),
|
|
# Logic operators for multi-value filters
|
|
genres_logic=kwargs.get("genres_logic", "any"),
|
|
franchises_logic=kwargs.get("franchises_logic", "any"),
|
|
collections_logic=kwargs.get("collections_logic", "any"),
|
|
companies_logic=kwargs.get("companies_logic", "any"),
|
|
age_ratings_logic=kwargs.get("age_ratings_logic", "any"),
|
|
regions_logic=kwargs.get("regions_logic", "any"),
|
|
languages_logic=kwargs.get("languages_logic", "any"),
|
|
statuses_logic=kwargs.get("statuses_logic", "any"),
|
|
player_counts_logic=kwargs.get("player_counts_logic", "any"),
|
|
user_id=kwargs.get("user_id", None),
|
|
group_by_meta_id=kwargs.get("group_by_meta_id", False),
|
|
)
|
|
return session.scalars(roms).all()
|
|
|
|
@begin_session
|
|
def with_char_index(
|
|
self,
|
|
query: Query,
|
|
order_by_attr: Any,
|
|
session: Session = None, # type: ignore
|
|
) -> list[Row[tuple[str, int]]]:
|
|
if isinstance(order_by_attr.type, (String, Text)):
|
|
# Remove any leading articles
|
|
order_by_attr = func.trim(
|
|
func.lower(order_by_attr).regexp_replace(STRIP_ARTICLES_REGEX, "")
|
|
)
|
|
else:
|
|
order_by_attr = func.trim(
|
|
func.lower(Rom.name).regexp_replace(STRIP_ARTICLES_REGEX, "")
|
|
)
|
|
|
|
# Pad numbers with leading zeros to ensure natural sorting
|
|
order_by_attr = order_by_attr.regexp_replace(
|
|
r"(\d+)", r"00000000000\1"
|
|
).regexp_replace(r"0*(\d{12})", r"\1")
|
|
|
|
# Get the row number and first letter for each item
|
|
subquery = (
|
|
query.with_only_columns(Rom.id, Rom.name) # type: ignore
|
|
.add_columns( # type: ignore
|
|
func.substring(
|
|
order_by_attr,
|
|
1,
|
|
1,
|
|
).label("letter"),
|
|
func.row_number().over(order_by=order_by_attr).label("position"),
|
|
)
|
|
.subquery()
|
|
)
|
|
|
|
# Get the minimum position for each letter
|
|
return (
|
|
session.query(
|
|
subquery.c.letter, func.min(subquery.c.position - 1).label("position")
|
|
)
|
|
.group_by(subquery.c.letter)
|
|
.order_by(subquery.c.letter)
|
|
.all()
|
|
)
|
|
|
|
@begin_session
|
|
def get_roms_by_fs_name(
|
|
self,
|
|
platform_id: int,
|
|
fs_names: Iterable[str],
|
|
session: Session = None, # type: ignore
|
|
) -> dict[str, Rom]:
|
|
"""Retrieve a dictionary of roms by their filesystem names.
|
|
|
|
Eager-loads only `platform` (used downstream by the scan loop via
|
|
`rom.platform_slug` / `rom.platform.fs_slug`) — deliberately
|
|
avoiding `with_details`, whose full relationship eager-load is
|
|
wasted work for the scan-skip decision on large platforms.
|
|
"""
|
|
roms = (
|
|
session.scalars(
|
|
select(Rom)
|
|
.options(
|
|
selectinload(Rom.platform),
|
|
selectinload(Rom.files).options(
|
|
joinedload(RomFile.rom).load_only(Rom.fs_path, Rom.fs_name)
|
|
),
|
|
)
|
|
.where(
|
|
and_(
|
|
Rom.platform_id == platform_id,
|
|
Rom.fs_name.in_(fs_names),
|
|
)
|
|
)
|
|
)
|
|
.unique()
|
|
.all()
|
|
)
|
|
|
|
return {rom.fs_name: rom for rom in roms}
|
|
|
|
@begin_session
|
|
def update_rom(
|
|
self,
|
|
id: int,
|
|
data: dict,
|
|
session: Session = None, # type: ignore
|
|
) -> Rom:
|
|
session.execute(
|
|
update(Rom)
|
|
.where(Rom.id == id)
|
|
.values(**data)
|
|
.execution_options(synchronize_session="evaluate")
|
|
)
|
|
return session.query(Rom).filter_by(id=id).one()
|
|
|
|
@begin_session
|
|
def delete_rom(
|
|
self,
|
|
id: int,
|
|
session: Session = None, # type: ignore
|
|
) -> None:
|
|
session.execute(
|
|
delete(Rom)
|
|
.where(Rom.id == id)
|
|
.execution_options(synchronize_session="evaluate")
|
|
)
|
|
|
|
@begin_session
|
|
def bulk_mark_present(
|
|
self,
|
|
platform_id: int,
|
|
rom_ids: list[int],
|
|
session: Session = None, # type: ignore
|
|
) -> None:
|
|
"""Bulk set missing_from_fs=False for a list of ROM IDs."""
|
|
if not rom_ids:
|
|
return
|
|
|
|
for i in range(0, len(rom_ids), 1000):
|
|
chunk = rom_ids[i : i + 1000]
|
|
session.execute(
|
|
update(Rom)
|
|
.where(
|
|
and_(
|
|
Rom.platform_id == platform_id,
|
|
Rom.id.in_(chunk),
|
|
)
|
|
)
|
|
.values(missing_from_fs=False)
|
|
.execution_options(synchronize_session="evaluate")
|
|
)
|
|
|
|
@begin_session
|
|
def mark_missing_roms(
|
|
self,
|
|
platform_id: int,
|
|
fs_roms_to_keep: list[str],
|
|
session: Session = None, # type: ignore
|
|
) -> Sequence[Rom]:
|
|
"""Sync `missing_from_fs` for a platform against the keep-list.
|
|
|
|
Reads the rows once and writes only those whose state actually
|
|
changes, so a re-scan of an unchanged platform issues no updates.
|
|
"""
|
|
keep_set = set(fs_roms_to_keep)
|
|
rows = session.execute(
|
|
select(Rom.id, Rom.fs_name, Rom.missing_from_fs).where(
|
|
Rom.platform_id == platform_id
|
|
)
|
|
).all()
|
|
|
|
flips: dict[bool, list[int]] = {True: [], False: []}
|
|
for rom_id, fs_name, was_missing in rows:
|
|
is_missing = fs_name not in keep_set
|
|
if is_missing != was_missing:
|
|
flips[is_missing].append(rom_id)
|
|
|
|
for desired, ids in flips.items():
|
|
for i in range(0, len(ids), 1000):
|
|
session.execute(
|
|
update(Rom)
|
|
.where(Rom.id.in_(ids[i : i + 1000]))
|
|
.values(missing_from_fs=desired)
|
|
.execution_options(synchronize_session="evaluate")
|
|
)
|
|
|
|
return (
|
|
session.scalars(
|
|
select(Rom)
|
|
.options(load_only(Rom.id, Rom.fs_name))
|
|
.where(
|
|
and_(
|
|
Rom.platform_id == platform_id,
|
|
Rom.missing_from_fs.is_(True),
|
|
)
|
|
)
|
|
.order_by(Rom.fs_name.asc())
|
|
)
|
|
.unique()
|
|
.all()
|
|
)
|
|
|
|
@begin_session
|
|
def add_rom_user(
|
|
self,
|
|
rom_id: int,
|
|
user_id: int,
|
|
session: Session = None, # type: ignore
|
|
) -> RomUser:
|
|
rom_user = session.merge(RomUser(rom_id=rom_id, user_id=user_id))
|
|
session.flush()
|
|
return rom_user
|
|
|
|
@begin_session
|
|
def get_rom_user(
|
|
self,
|
|
rom_id: int,
|
|
user_id: int,
|
|
session: Session = None, # type: ignore
|
|
) -> RomUser | None:
|
|
return session.scalar(
|
|
select(RomUser).filter_by(rom_id=rom_id, user_id=user_id).limit(1)
|
|
)
|
|
|
|
@begin_session
|
|
def get_rom_user_by_id(
|
|
self,
|
|
id: int,
|
|
session: Session = None, # type: ignore
|
|
) -> RomUser | None:
|
|
return session.scalar(select(RomUser).filter_by(id=id).limit(1))
|
|
|
|
@begin_session
|
|
def update_rom_user(
|
|
self,
|
|
id: int,
|
|
data: dict,
|
|
session: Session = None, # type: ignore
|
|
) -> RomUser | None:
|
|
session.execute(
|
|
update(RomUser)
|
|
.where(RomUser.id == id)
|
|
.values(**data)
|
|
.execution_options(synchronize_session="evaluate")
|
|
)
|
|
|
|
rom_user = session.query(RomUser).filter_by(id=id).one_or_none()
|
|
if not rom_user:
|
|
return None
|
|
|
|
if not data.get("is_main_sibling", False):
|
|
return rom_user
|
|
|
|
rom = self.get_rom(rom_user.rom_id)
|
|
if not rom:
|
|
return rom_user
|
|
|
|
session.execute(
|
|
update(RomUser)
|
|
.where(
|
|
and_(
|
|
RomUser.rom_id.in_(r.id for r in rom.sibling_roms),
|
|
RomUser.user_id == rom_user.user_id,
|
|
)
|
|
)
|
|
.values(is_main_sibling=False)
|
|
)
|
|
|
|
return rom_user
|
|
|
|
@begin_session
|
|
def add_rom_file(
|
|
self,
|
|
rom_file: RomFile,
|
|
session: Session = None, # type: ignore
|
|
) -> RomFile:
|
|
merged = session.merge(rom_file)
|
|
session.flush()
|
|
return merged
|
|
|
|
@begin_session
|
|
def get_rom_file_by_id(
|
|
self,
|
|
id: int,
|
|
session: Session = None, # type: ignore
|
|
) -> RomFile | None:
|
|
return session.scalar(select(RomFile).filter_by(id=id).limit(1))
|
|
|
|
@begin_session
|
|
def get_rom_file_by_path(
|
|
self,
|
|
rom_id: int,
|
|
file_path: str,
|
|
file_name: str,
|
|
session: Session = None, # type: ignore
|
|
) -> RomFile | None:
|
|
return session.scalar(
|
|
select(RomFile)
|
|
.filter_by(rom_id=rom_id, file_path=file_path, file_name=file_name)
|
|
.limit(1)
|
|
)
|
|
|
|
@begin_session
|
|
def get_rom_files_by_category(
|
|
self,
|
|
rom_id: int,
|
|
category: RomFileCategory,
|
|
session: Session = None, # type: ignore
|
|
) -> Sequence[RomFile]:
|
|
"""Return the ROM's files for a single category, ordered by file_name."""
|
|
return (
|
|
session.scalars(
|
|
select(RomFile)
|
|
.filter_by(rom_id=rom_id, category=category)
|
|
.order_by(RomFile.file_name.asc())
|
|
)
|
|
.unique()
|
|
.all()
|
|
)
|
|
|
|
@begin_session
|
|
def update_rom_file(
|
|
self,
|
|
id: int,
|
|
data: dict,
|
|
session: Session = None, # type: ignore
|
|
) -> RomFile | None:
|
|
session.execute(
|
|
update(RomFile)
|
|
.where(RomFile.id == id)
|
|
.values(**data)
|
|
.execution_options(synchronize_session="evaluate")
|
|
)
|
|
|
|
return session.query(RomFile).filter_by(id=id).one_or_none()
|
|
|
|
@begin_session
|
|
def purge_rom_files(
|
|
self,
|
|
rom_id: int,
|
|
session: Session = None, # type: ignore
|
|
) -> Sequence[RomFile]:
|
|
purged_rom_files = (
|
|
session.scalars(select(RomFile).filter_by(rom_id=rom_id)).unique().all()
|
|
)
|
|
session.execute(
|
|
delete(RomFile)
|
|
.where(RomFile.rom_id == rom_id)
|
|
.execution_options(synchronize_session="evaluate")
|
|
)
|
|
return purged_rom_files
|
|
|
|
@begin_session
|
|
def delete_rom_file(
|
|
self,
|
|
id: int,
|
|
session: Session = None, # type: ignore
|
|
) -> None:
|
|
session.execute(
|
|
delete(RomFile)
|
|
.where(RomFile.id == id)
|
|
.execution_options(synchronize_session="evaluate")
|
|
)
|
|
|
|
# Note management methods
|
|
@begin_session
|
|
def get_rom_notes(
|
|
self,
|
|
rom_id: int,
|
|
user_id: int,
|
|
public_only: bool = False,
|
|
search: str | None = "",
|
|
tags: list[str] | None = None,
|
|
only_fields: Sequence[QueryableAttribute] | None = None,
|
|
session: Session = None, # type: ignore
|
|
) -> Sequence[RomNote]:
|
|
query = session.query(RomNote).filter(RomNote.rom_id == rom_id)
|
|
|
|
if public_only:
|
|
query = query.filter(RomNote.is_public)
|
|
else:
|
|
# Include user's own notes (private + public) and public notes from others
|
|
query = query.filter(or_(RomNote.user_id == user_id, RomNote.is_public))
|
|
|
|
if search:
|
|
query = query.filter(
|
|
or_(RomNote.title.contains(search), RomNote.content.contains(search))
|
|
)
|
|
|
|
if tags:
|
|
for tag in tags:
|
|
query = query.filter(
|
|
json_array_contains_value(RomNote.tags, tag, session=session)
|
|
)
|
|
|
|
if only_fields:
|
|
query = query.options(load_only(*only_fields))
|
|
|
|
return query.order_by(RomNote.updated_at.desc()).all()
|
|
|
|
@begin_session
|
|
def create_rom_note(
|
|
self,
|
|
rom_id: int,
|
|
user_id: int,
|
|
title: str,
|
|
content: str = "",
|
|
is_public: bool = False,
|
|
tags: list[str] | None = None,
|
|
session: Session = None, # type: ignore
|
|
) -> dict:
|
|
note = RomNote(
|
|
rom_id=rom_id,
|
|
user_id=user_id,
|
|
title=title,
|
|
content=content,
|
|
is_public=is_public,
|
|
tags=tags or [],
|
|
)
|
|
session.add(note)
|
|
session.flush() # To get the ID
|
|
|
|
# Return dict to avoid detached instance issues
|
|
return {
|
|
"id": note.id,
|
|
"title": note.title,
|
|
"content": note.content,
|
|
"is_public": note.is_public,
|
|
"tags": note.tags,
|
|
"created_at": note.created_at,
|
|
"updated_at": note.updated_at,
|
|
"rom_id": note.rom_id,
|
|
"user_id": note.user_id,
|
|
}
|
|
|
|
@begin_session
|
|
def update_rom_note(
|
|
self,
|
|
note_id: int,
|
|
user_id: int,
|
|
session: Session = None, # type: ignore
|
|
**fields,
|
|
) -> dict | None:
|
|
note = (
|
|
session.query(RomNote)
|
|
.filter(RomNote.id == note_id, RomNote.user_id == user_id)
|
|
.first()
|
|
)
|
|
|
|
if not note:
|
|
return None
|
|
|
|
for field, value in fields.items():
|
|
if hasattr(note, field):
|
|
setattr(note, field, value)
|
|
|
|
session.flush() # Ensure changes are committed
|
|
|
|
# Return dict to avoid detached instance issues
|
|
return {
|
|
"id": note.id,
|
|
"title": note.title,
|
|
"content": note.content,
|
|
"is_public": note.is_public,
|
|
"tags": note.tags,
|
|
"created_at": note.created_at,
|
|
"updated_at": note.updated_at,
|
|
"rom_id": note.rom_id,
|
|
"user_id": note.user_id,
|
|
}
|
|
|
|
@begin_session
|
|
def delete_rom_note(
|
|
self,
|
|
note_id: int,
|
|
user_id: int,
|
|
session: Session = None, # type: ignore
|
|
) -> bool:
|
|
result = session.execute(
|
|
delete(RomNote).where(
|
|
and_(RomNote.id == note_id, RomNote.user_id == user_id)
|
|
)
|
|
)
|
|
return result.rowcount > 0
|
|
|
|
@begin_session
|
|
@with_details
|
|
def get_rom_by_metadata_id(
|
|
self,
|
|
igdb_id: int | None = None,
|
|
moby_id: int | None = None,
|
|
ss_id: int | None = None,
|
|
ra_id: int | None = None,
|
|
launchbox_id: int | None = None,
|
|
hasheous_id: int | None = None,
|
|
tgdb_id: int | None = None,
|
|
flashpoint_id: str | None = None,
|
|
hltb_id: int | None = None,
|
|
*,
|
|
query: Query = None, # type: ignore
|
|
session: Session = None, # type: ignore
|
|
) -> Rom | None:
|
|
"""
|
|
Get a ROM by any metadata ID.
|
|
|
|
Returns the first ROM that matches any of the provided metadata IDs.
|
|
"""
|
|
# Build filters for non-nil IDs
|
|
filters = [
|
|
column == value
|
|
for value, column in [
|
|
(igdb_id, Rom.igdb_id),
|
|
(moby_id, Rom.moby_id),
|
|
(ss_id, Rom.ss_id),
|
|
(ra_id, Rom.ra_id),
|
|
(launchbox_id, Rom.launchbox_id),
|
|
(hasheous_id, Rom.hasheous_id),
|
|
(tgdb_id, Rom.tgdb_id),
|
|
(flashpoint_id, Rom.flashpoint_id),
|
|
(hltb_id, Rom.hltb_id),
|
|
]
|
|
if value is not None
|
|
]
|
|
|
|
if not filters:
|
|
return None
|
|
|
|
# Return the first ROM matching any of the provided metadata IDs
|
|
return session.scalar(query.filter(or_(*filters)).limit(1))
|
|
|
|
@begin_session
|
|
@with_details
|
|
def get_rom_by_hash(
|
|
self,
|
|
crc_hash: str | None = None,
|
|
md5_hash: str | None = None,
|
|
sha1_hash: str | None = None,
|
|
ra_hash: str | None = None,
|
|
*,
|
|
query: Query = None, # type: ignore
|
|
session: Session = None, # type: ignore
|
|
) -> Rom | None:
|
|
"""
|
|
Get a ROM by calculated hash value.
|
|
|
|
Returns the first ROM that matches any of the provided hash values.
|
|
"""
|
|
# Build filters for non-nil IDs
|
|
filters = [
|
|
column == value
|
|
for value, column in [
|
|
(crc_hash, Rom.crc_hash),
|
|
(md5_hash, Rom.md5_hash),
|
|
(sha1_hash, Rom.sha1_hash),
|
|
(ra_hash, Rom.ra_hash),
|
|
(crc_hash, RomFile.crc_hash),
|
|
(md5_hash, RomFile.md5_hash),
|
|
(sha1_hash, RomFile.sha1_hash),
|
|
(ra_hash, RomFile.ra_hash),
|
|
]
|
|
if value is not None
|
|
]
|
|
|
|
if not filters:
|
|
return None
|
|
|
|
# Return the first ROM matching any of the provided hash values
|
|
return session.scalar(query.outerjoin(Rom.files).filter(or_(*filters)).limit(1))
|
|
|
|
def _collect_filter_values(
|
|
self,
|
|
session: Session,
|
|
statement: Select,
|
|
) -> dict:
|
|
genres = set()
|
|
franchises = set()
|
|
collections = set()
|
|
companies = set()
|
|
game_modes = set()
|
|
age_ratings = set()
|
|
player_counts = set()
|
|
regions = set()
|
|
languages = set()
|
|
platforms = set()
|
|
|
|
for row in session.execute(statement):
|
|
g, f, cl, co, gm, ar, pc, rg, lg, pid = row
|
|
if g:
|
|
genres.update(g)
|
|
if f:
|
|
franchises.update(f)
|
|
if cl:
|
|
collections.update(cl)
|
|
if co:
|
|
companies.update(co)
|
|
if gm:
|
|
game_modes.update(gm)
|
|
if ar:
|
|
age_ratings.update(ar)
|
|
if pc:
|
|
player_counts.add(pc)
|
|
if rg:
|
|
regions.update(rg)
|
|
if lg:
|
|
languages.update(lg)
|
|
platforms.add(pid)
|
|
|
|
return {
|
|
"genres": sorted(genres),
|
|
"franchises": sorted(franchises),
|
|
"collections": sorted(collections),
|
|
"companies": sorted(companies),
|
|
"game_modes": sorted(game_modes),
|
|
"age_ratings": sorted(age_ratings),
|
|
"player_counts": sorted(player_counts),
|
|
"regions": sorted(regions),
|
|
"languages": sorted(languages),
|
|
"platforms": sorted(platforms),
|
|
}
|
|
|
|
@begin_session
|
|
def with_filter_values(
|
|
self,
|
|
query: Query,
|
|
session: Session = None, # type: ignore
|
|
) -> dict:
|
|
"""
|
|
Returns the list of filters given the current subset of ROMs in the query
|
|
"""
|
|
ids_subq = query.with_only_columns(Rom.id).scalar_subquery() # type: ignore
|
|
|
|
statement = (
|
|
select(
|
|
RomMetadata.genres,
|
|
RomMetadata.franchises,
|
|
RomMetadata.collections,
|
|
RomMetadata.companies,
|
|
RomMetadata.game_modes,
|
|
RomMetadata.age_ratings,
|
|
RomMetadata.player_count,
|
|
Rom.regions,
|
|
Rom.languages,
|
|
Rom.platform_id,
|
|
)
|
|
.select_from(Rom)
|
|
.join(RomMetadata, Rom.id == RomMetadata.rom_id)
|
|
.where(Rom.id.in_(ids_subq))
|
|
)
|
|
|
|
return self._collect_filter_values(session, statement)
|
|
|
|
@begin_session
|
|
def get_rom_filters(
|
|
self,
|
|
session: Session = None, # type: ignore
|
|
) -> dict:
|
|
"""
|
|
Returns all filter values across all ROM metadata
|
|
"""
|
|
statement = select(
|
|
RomMetadata.genres,
|
|
RomMetadata.franchises,
|
|
RomMetadata.collections,
|
|
RomMetadata.companies,
|
|
RomMetadata.game_modes,
|
|
RomMetadata.age_ratings,
|
|
RomMetadata.player_count,
|
|
Rom.regions,
|
|
Rom.languages,
|
|
Rom.platform_id,
|
|
)
|
|
|
|
return self._collect_filter_values(session, statement)
|