mirror of
https://github.com/rommapp/romm.git
synced 2026-07-01 08:16:21 +00:00
178 lines
5.8 KiB
Python
178 lines
5.8 KiB
Python
from __future__ import annotations
|
|
|
|
from collections.abc import Sequence
|
|
|
|
from sqlalchemy import distinct, func, select
|
|
from sqlalchemy.orm import Session
|
|
from sqlalchemy.sql.selectable import Select
|
|
|
|
from decorators.database import begin_session
|
|
from endpoints.responses.stats import MetadataCoverageItem, RegionBreakdownItem
|
|
from models.assets import Save, Screenshot, State
|
|
from models.rom import METADATA_SOURCE_COLUMNS, Rom, RomFile
|
|
|
|
from .base_handler import DBBaseHandler
|
|
|
|
|
|
def _exclude_hidden(
|
|
query: Select,
|
|
hidden_platform_ids: Sequence[int] | None,
|
|
hidden_rom_ids: Sequence[int] | None,
|
|
) -> Select:
|
|
"""Drop rows for platforms/roms hidden from the caller (admins pass None)."""
|
|
if hidden_platform_ids:
|
|
query = query.where(Rom.platform_id.not_in(hidden_platform_ids))
|
|
if hidden_rom_ids:
|
|
query = query.where(Rom.id.not_in(hidden_rom_ids))
|
|
return query
|
|
|
|
|
|
class DBStatsHandler(DBBaseHandler):
|
|
@begin_session
|
|
def get_platforms_count(
|
|
self,
|
|
hidden_platform_ids: Sequence[int] | None = None,
|
|
hidden_rom_ids: Sequence[int] | None = None,
|
|
session: Session = None, # type: ignore
|
|
) -> int:
|
|
"""Get the number of platforms with any roms."""
|
|
query = _exclude_hidden(
|
|
select(func.count(distinct(Rom.platform_id))).select_from(Rom),
|
|
hidden_platform_ids,
|
|
hidden_rom_ids,
|
|
)
|
|
return session.scalar(query) or 0
|
|
|
|
@begin_session
|
|
def get_roms_count(
|
|
self,
|
|
hidden_platform_ids: Sequence[int] | None = None,
|
|
hidden_rom_ids: Sequence[int] | None = None,
|
|
session: Session = None, # type: ignore
|
|
) -> int:
|
|
query = _exclude_hidden(
|
|
select(func.count()).select_from(Rom),
|
|
hidden_platform_ids,
|
|
hidden_rom_ids,
|
|
)
|
|
return session.scalar(query) or 0
|
|
|
|
@begin_session
|
|
def get_saves_count(
|
|
self,
|
|
session: Session = None, # type: ignore
|
|
) -> int:
|
|
return session.scalar(select(func.count()).select_from(Save)) or 0
|
|
|
|
@begin_session
|
|
def get_states_count(
|
|
self,
|
|
session: Session = None, # type: ignore
|
|
) -> int:
|
|
return session.scalar(select(func.count()).select_from(State)) or 0
|
|
|
|
@begin_session
|
|
def get_screenshots_count(
|
|
self,
|
|
session: Session = None, # type: ignore
|
|
) -> int:
|
|
return session.scalar(select(func.count()).select_from(Screenshot)) or 0
|
|
|
|
@begin_session
|
|
def get_total_filesize(
|
|
self,
|
|
hidden_platform_ids: Sequence[int] | None = None,
|
|
hidden_rom_ids: Sequence[int] | None = None,
|
|
session: Session = None, # type: ignore
|
|
) -> int:
|
|
"""Get the total filesize of all roms in the database, in bytes."""
|
|
query = select(func.sum(RomFile.file_size_bytes)).select_from(RomFile)
|
|
if hidden_platform_ids or hidden_rom_ids:
|
|
query = _exclude_hidden(
|
|
query.join(Rom), hidden_platform_ids, hidden_rom_ids
|
|
)
|
|
return session.scalar(query) or 0
|
|
|
|
@begin_session
|
|
def get_platform_filesize(
|
|
self,
|
|
platform_id: int,
|
|
session: Session = None, # type: ignore
|
|
) -> int:
|
|
"""Get the total filesize of all roms in the database, in bytes."""
|
|
return (
|
|
session.scalar(
|
|
select(func.sum(RomFile.file_size_bytes))
|
|
.select_from(RomFile)
|
|
.join(Rom)
|
|
.filter(Rom.platform_id == platform_id)
|
|
)
|
|
or 0
|
|
)
|
|
|
|
@begin_session
|
|
def get_metadata_coverage_by_platform(
|
|
self,
|
|
hidden_platform_ids: Sequence[int] | None = None,
|
|
hidden_rom_ids: Sequence[int] | None = None,
|
|
session: Session = None, # type: ignore
|
|
) -> dict[int, list[MetadataCoverageItem]]:
|
|
"""Get the count of ROMs matched per metadata source, grouped by platform."""
|
|
rows = session.execute(
|
|
_exclude_hidden(
|
|
select(
|
|
Rom.platform_id,
|
|
*(
|
|
func.count(col).label(key)
|
|
for key, col in METADATA_SOURCE_COLUMNS.items()
|
|
),
|
|
).select_from(Rom),
|
|
hidden_platform_ids,
|
|
hidden_rom_ids,
|
|
).group_by(Rom.platform_id)
|
|
).all()
|
|
|
|
result: dict[int, list[MetadataCoverageItem]] = {}
|
|
for row in rows:
|
|
result[row.platform_id] = [
|
|
MetadataCoverageItem(source=key, matched=getattr(row, key))
|
|
for key in METADATA_SOURCE_COLUMNS
|
|
if getattr(row, key) > 0
|
|
]
|
|
|
|
return result
|
|
|
|
@begin_session
|
|
def get_region_breakdown_by_platform(
|
|
self,
|
|
hidden_platform_ids: Sequence[int] | None = None,
|
|
hidden_rom_ids: Sequence[int] | None = None,
|
|
session: Session = None, # type: ignore
|
|
) -> dict[int, list[RegionBreakdownItem]]:
|
|
"""Get the count of ROMs per region, grouped by platform."""
|
|
rows = session.execute(
|
|
_exclude_hidden(
|
|
select(Rom.platform_id, Rom.regions).where(Rom.regions.is_not(None)),
|
|
hidden_platform_ids,
|
|
hidden_rom_ids,
|
|
)
|
|
).all()
|
|
|
|
counter: dict[int, dict[str, int]] = {}
|
|
for platform_id, regions_list in rows:
|
|
if regions_list:
|
|
if platform_id not in counter:
|
|
counter[platform_id] = {}
|
|
for region in regions_list:
|
|
counter[platform_id][region] = (
|
|
counter[platform_id].get(region, 0) + 1
|
|
)
|
|
|
|
return {
|
|
pid: [
|
|
{"region": r, "count": c}
|
|
for r, c in sorted(regions.items(), key=lambda x: -x[1])
|
|
]
|
|
for pid, regions in counter.items()
|
|
}
|