mirror of
https://github.com/rommapp/romm.git
synced 2026-06-28 06:46:00 +00:00
Finish scanning for saves/states
This commit is contained in:
@@ -1,8 +1,8 @@
|
||||
"""empty message
|
||||
|
||||
Revision ID: 0012_rom_assets
|
||||
Revision ID: 0012_asset_files
|
||||
Revises: 0011_drop_has_cover
|
||||
Create Date: 2023-11-12 23:51:15.578857
|
||||
Create Date: 2023-11-13 12:25:47.355434
|
||||
|
||||
"""
|
||||
from alembic import op
|
||||
@@ -10,7 +10,7 @@ import sqlalchemy as sa
|
||||
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = "0012_rom_assets"
|
||||
revision = "0012_asset_files"
|
||||
down_revision = "0011_drop_has_cover"
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
@@ -19,56 +19,82 @@ depends_on = None
|
||||
def upgrade() -> None:
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
op.create_table(
|
||||
"saves",
|
||||
"bios",
|
||||
sa.Column("platform_slug", sa.String(length=50), nullable=False),
|
||||
sa.Column("id", sa.Integer(), autoincrement=True, nullable=False),
|
||||
sa.Column("rom_id", sa.Integer(), nullable=True),
|
||||
sa.Column("file_name", sa.String(length=450), nullable=False),
|
||||
sa.Column("file_name_no_tags", sa.String(length=450), nullable=False),
|
||||
sa.Column("file_extension", sa.String(length=10), nullable=False),
|
||||
sa.Column("file_path", sa.String(length=1000), nullable=False),
|
||||
sa.Column("file_size_bytes", sa.Integer(), nullable=False),
|
||||
sa.ForeignKeyConstraint(
|
||||
["rom_id"],
|
||||
["roms.id"],
|
||||
["platform_slug"], ["platforms.slug"], ondelete="CASCADE"
|
||||
),
|
||||
sa.PrimaryKeyConstraint("id"),
|
||||
)
|
||||
op.create_table(
|
||||
"saves",
|
||||
sa.Column("rom_id", sa.Integer(), nullable=False),
|
||||
sa.Column("platform_slug", sa.String(length=50), nullable=False),
|
||||
sa.Column("id", sa.Integer(), autoincrement=True, nullable=False),
|
||||
sa.Column("file_name", sa.String(length=450), nullable=False),
|
||||
sa.Column("file_name_no_tags", sa.String(length=450), nullable=False),
|
||||
sa.Column("file_extension", sa.String(length=10), nullable=False),
|
||||
sa.Column("file_path", sa.String(length=1000), nullable=False),
|
||||
sa.Column("file_size_bytes", sa.Integer(), nullable=False),
|
||||
sa.ForeignKeyConstraint(
|
||||
["platform_slug"], ["platforms.slug"], ondelete="CASCADE"
|
||||
),
|
||||
sa.ForeignKeyConstraint(["rom_id"], ["roms.id"], ondelete="CASCADE"),
|
||||
sa.PrimaryKeyConstraint("id"),
|
||||
)
|
||||
op.create_table(
|
||||
"screenshots",
|
||||
sa.Column("rom_id", sa.Integer(), nullable=False),
|
||||
sa.Column("id", sa.Integer(), autoincrement=True, nullable=False),
|
||||
sa.Column("rom_id", sa.Integer(), nullable=True),
|
||||
sa.Column("file_name", sa.String(length=450), nullable=False),
|
||||
sa.Column("file_name_no_tags", sa.String(length=450), nullable=False),
|
||||
sa.Column("file_extension", sa.String(length=10), nullable=False),
|
||||
sa.Column("file_path", sa.String(length=1000), nullable=False),
|
||||
sa.Column("file_size_bytes", sa.Integer(), nullable=False),
|
||||
sa.ForeignKeyConstraint(
|
||||
["rom_id"],
|
||||
["roms.id"],
|
||||
),
|
||||
sa.ForeignKeyConstraint(["rom_id"], ["roms.id"], ondelete="CASCADE"),
|
||||
sa.PrimaryKeyConstraint("id"),
|
||||
)
|
||||
op.create_table(
|
||||
"states",
|
||||
sa.Column("rom_id", sa.Integer(), nullable=False),
|
||||
sa.Column("platform_slug", sa.String(length=50), nullable=False),
|
||||
sa.Column("id", sa.Integer(), autoincrement=True, nullable=False),
|
||||
sa.Column("rom_id", sa.Integer(), nullable=True),
|
||||
sa.Column("file_name", sa.String(length=450), nullable=False),
|
||||
sa.Column("file_name_no_tags", sa.String(length=450), nullable=False),
|
||||
sa.Column("file_extension", sa.String(length=10), nullable=False),
|
||||
sa.Column("file_path", sa.String(length=1000), nullable=False),
|
||||
sa.Column("file_size_bytes", sa.Integer(), nullable=False),
|
||||
sa.ForeignKeyConstraint(
|
||||
["rom_id"],
|
||||
["roms.id"],
|
||||
["platform_slug"], ["platforms.slug"], ondelete="CASCADE"
|
||||
),
|
||||
sa.ForeignKeyConstraint(["rom_id"], ["roms.id"], ondelete="CASCADE"),
|
||||
sa.PrimaryKeyConstraint("id"),
|
||||
)
|
||||
with op.batch_alter_table("roms", schema=None) as batch_op:
|
||||
batch_op.drop_constraint("fk_platform_roms", type_="foreignkey")
|
||||
batch_op.create_foreign_key(
|
||||
None, "platforms", ["platform_slug"], ["slug"], ondelete="CASCADE"
|
||||
)
|
||||
|
||||
# ### end Alembic commands ###
|
||||
|
||||
|
||||
def downgrade() -> None:
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
with op.batch_alter_table("roms", schema=None) as batch_op:
|
||||
batch_op.drop_constraint(None, type_="foreignkey")
|
||||
batch_op.create_foreign_key(
|
||||
"fk_platform_roms", "platforms", ["platform_slug"], ["slug"]
|
||||
)
|
||||
|
||||
op.drop_table("states")
|
||||
op.drop_table("screenshots")
|
||||
op.drop_table("saves")
|
||||
op.drop_table("bios")
|
||||
# ### end Alembic commands ###
|
||||
@@ -10,10 +10,15 @@ DEV_PORT: Final = int(os.environ.get("VITE_BACKEND_DEV_PORT", "5000"))
|
||||
DEV_HOST: Final = "0.0.0.0"
|
||||
|
||||
# PATHS
|
||||
ROMM_BASE_PATH: Final = os.environ.get("ROMM_BASE_PATH", "/romm")
|
||||
ROMS_FOLDER_NAME: Final = os.environ.get("ROMS_FOLDER_NAME", "roms")
|
||||
SAVES_FOLDER_NAME: Final = os.environ.get("SAVES_FOLDER_NAME", "saves")
|
||||
STATES_FOLDER_NAME: Final = os.environ.get("STATES_FOLDER_NAME", "states")
|
||||
SCREENSHOTS_FOLDER_NAME: Final = os.environ.get(
|
||||
"SCREENSHOTS_FOLDER_NAME", "screenshots"
|
||||
)
|
||||
BIOS_FOLDER_NAME: Final = os.environ.get("BIOS_FOLDER_NAME", "bios")
|
||||
|
||||
ROMM_BASE_PATH: Final = os.environ.get("ROMM_BASE_PATH", "/romm")
|
||||
LIBRARY_BASE_PATH: Final = f"{ROMM_BASE_PATH}/library"
|
||||
HIGH_PRIO_STRUCTURE_PATH: Final = f"{LIBRARY_BASE_PATH}/{ROMS_FOLDER_NAME}"
|
||||
|
||||
|
||||
@@ -30,8 +30,8 @@ from utils.fs import (
|
||||
build_artwork_path,
|
||||
build_upload_roms_path,
|
||||
rename_rom,
|
||||
get_cover,
|
||||
get_screenshots,
|
||||
get_rom_cover,
|
||||
get_rom_screenshots,
|
||||
remove_rom,
|
||||
)
|
||||
|
||||
@@ -240,7 +240,7 @@ async def update_rom(
|
||||
cleaned_data["file_name"] = fs_safe_file_name
|
||||
cleaned_data["file_name_no_tags"] = get_file_name_with_no_tags(fs_safe_file_name)
|
||||
cleaned_data.update(
|
||||
get_cover(
|
||||
get_rom_cover(
|
||||
overwrite=True,
|
||||
fs_slug=db_rom.platform_slug,
|
||||
rom_name=cleaned_data["name"],
|
||||
@@ -249,7 +249,7 @@ async def update_rom(
|
||||
)
|
||||
|
||||
cleaned_data.update(
|
||||
get_screenshots(
|
||||
get_rom_screenshots(
|
||||
fs_slug=db_rom.platform_slug,
|
||||
rom_name=cleaned_data["name"],
|
||||
url_screenshots=cleaned_data.get("url_screenshots", []),
|
||||
|
||||
@@ -4,10 +4,22 @@ import socketio # type: ignore
|
||||
from logger.logger import log
|
||||
from exceptions.fs_exceptions import PlatformsNotFoundException, RomsNotFoundException
|
||||
from handler import dbh
|
||||
from utils import get_file_name_with_no_tags
|
||||
from utils.fastapi import scan_platform, scan_rom, scan_save, scan_state
|
||||
from utils.fastapi import (
|
||||
scan_platform,
|
||||
scan_rom,
|
||||
scan_save,
|
||||
scan_state,
|
||||
scan_bios,
|
||||
scan_screenshot,
|
||||
)
|
||||
from utils.socket import socket_server
|
||||
from utils.fs import get_platforms, get_roms, store_default_resources, get_assets
|
||||
from utils.fs import (
|
||||
get_platforms,
|
||||
get_roms,
|
||||
store_default_resources,
|
||||
get_assets,
|
||||
get_screenshots,
|
||||
)
|
||||
from utils.redis import high_prio_queue, redis_url
|
||||
from endpoints.platform import PlatformSchema
|
||||
from endpoints.rom import RomSchema
|
||||
@@ -76,39 +88,74 @@ async def scan_platforms(
|
||||
},
|
||||
)
|
||||
|
||||
# Scanning assets
|
||||
fs_assets = get_assets(scanned_platform.fs_slug)
|
||||
for fs_save in fs_assets["saves"]:
|
||||
scanned_save = await scan_save(
|
||||
|
||||
# Scanning saves
|
||||
log.info("\t · Saves")
|
||||
for fs_save_filename in fs_assets["saves"]:
|
||||
save = dbh.get_save_by_filename(scanned_platform.slug, fs_save_filename)
|
||||
if save:
|
||||
continue
|
||||
|
||||
scanned_save = scan_save(
|
||||
scanned_platform,
|
||||
fs_save,
|
||||
)
|
||||
|
||||
file_name_no_tags = get_file_name_with_no_tags(scanned_save.file_name)
|
||||
rom = dbh.get_rom_by_filename_no_tags(
|
||||
scanned_platform.slug, file_name_no_tags
|
||||
fs_save_filename,
|
||||
)
|
||||
scanned_save.platform_slug = scanned_platform.slug
|
||||
|
||||
rom = dbh.get_rom_by_filename_no_tags(scanned_save.file_name_no_tags)
|
||||
if rom:
|
||||
scanned_save.rom_id = rom.id
|
||||
dbh.add_save(scanned_save)
|
||||
|
||||
dbh.add_save(scanned_save)
|
||||
# Scanning states
|
||||
log.info("\t · States")
|
||||
for fs_state_filename in fs_assets["states"]:
|
||||
state = dbh.get_state_by_filename(scanned_platform.slug, fs_state_filename)
|
||||
if state:
|
||||
continue
|
||||
|
||||
for state in fs_assets["states"]:
|
||||
scanned_state = await scan_state(scanned_platform, state)
|
||||
|
||||
file_name_no_tags = get_file_name_with_no_tags(scanned_state.file_name)
|
||||
rom = dbh.get_rom_by_filename_no_tags(
|
||||
scanned_platform.slug, file_name_no_tags
|
||||
)
|
||||
scanned_state = scan_state(scanned_platform, fs_state_filename)
|
||||
scanned_state.platform_slug = scanned_platform.slug
|
||||
|
||||
rom = dbh.get_rom_by_filename_no_tags(scanned_state.file_name_no_tags)
|
||||
if rom:
|
||||
scanned_state.rom_id = rom.id
|
||||
dbh.add_state(scanned_state)
|
||||
|
||||
dbh.add_state(scanned_state)
|
||||
# Scanning bios
|
||||
log.info("\t · Firmware")
|
||||
for fs_bios_filename in fs_assets["bios"]:
|
||||
bios = dbh.get_bios_by_filename(scanned_platform.slug, fs_bios_filename)
|
||||
if bios:
|
||||
continue
|
||||
|
||||
scanned_bios = scan_bios(scanned_platform, fs_bios_filename)
|
||||
scanned_bios.platform_slug = scanned_platform.slug
|
||||
dbh.add_bios(scanned_bios)
|
||||
|
||||
dbh.purge_saves(scanned_platform.slug, fs_assets["saves"])
|
||||
dbh.purge_states(scanned_platform.slug, fs_assets["states"])
|
||||
dbh.purge_bios(scanned_platform.slug, fs_assets["bios"])
|
||||
dbh.purge_roms(scanned_platform.slug, [rom["file_name"] for rom in fs_roms])
|
||||
|
||||
# Scanning screenshots
|
||||
log.info("\t · Screenshots")
|
||||
fs_screenshots = get_screenshots()
|
||||
for fs_screenshot_filename in fs_screenshots:
|
||||
screenshot = dbh.get_screenshot_by_filename(fs_screenshot_filename)
|
||||
if screenshot:
|
||||
continue
|
||||
|
||||
scanned_screenshot = scan_screenshot(fs_screenshot_filename)
|
||||
|
||||
rom = dbh.get_rom_by_filename_no_tags(scanned_screenshot.file_name_no_tags)
|
||||
if rom:
|
||||
scanned_screenshot.rom_id = rom.id
|
||||
dbh.add_screenshot(scanned_screenshot)
|
||||
|
||||
dbh.purge_platforms(fs_platforms)
|
||||
dbh.purge_screenshots(fs_screenshots)
|
||||
|
||||
await sm.emit("scan:done", {})
|
||||
|
||||
|
||||
@@ -7,7 +7,7 @@ from sqlalchemy.exc import ProgrammingError
|
||||
|
||||
from logger.logger import log
|
||||
from config.config_loader import ConfigLoader
|
||||
from models import Platform, Rom, User, Role, Save, State
|
||||
from models import Platform, Rom, User, Role, Save, State, Screenshot, Bios
|
||||
|
||||
|
||||
class DBHandler:
|
||||
@@ -48,6 +48,21 @@ class DBHandler:
|
||||
|
||||
@begin_session
|
||||
def purge_platforms(self, platforms: list[str], session: Session = None):
|
||||
session.execute(
|
||||
delete(Save)
|
||||
.where(Save.platform_slug.not_in(platforms))
|
||||
.execution_options(synchronize_session="evaluate")
|
||||
)
|
||||
session.execute(
|
||||
delete(State)
|
||||
.where(State.platform_slug.not_in(platforms))
|
||||
.execution_options(synchronize_session="evaluate")
|
||||
)
|
||||
session.execute(
|
||||
delete(Bios)
|
||||
.where(Bios.platform_slug.not_in(platforms))
|
||||
.execution_options(synchronize_session="evaluate")
|
||||
)
|
||||
return session.execute(
|
||||
delete(Platform)
|
||||
.where(or_(Platform.slug.not_in(platforms), Platform.slug.is_(None)))
|
||||
@@ -118,32 +133,128 @@ class DBHandler:
|
||||
|
||||
@begin_session
|
||||
def get_rom_by_filename_no_tags(
|
||||
self, platform_slug: str, file_name_no_tags: str, session: Session = None
|
||||
self, file_name_no_tags: str, session: Session = None
|
||||
):
|
||||
return session.scalars(
|
||||
select(Rom)
|
||||
.filter_by(platform_slug=platform_slug, file_name_no_tags=file_name_no_tags)
|
||||
.limit(1)
|
||||
select(Rom).filter_by(file_name_no_tags=file_name_no_tags).limit(1)
|
||||
).first()
|
||||
|
||||
|
||||
# ========= Saves =========
|
||||
@begin_session
|
||||
def add_save(self, save: Rom, session: Session = None):
|
||||
return session.merge(save)
|
||||
|
||||
|
||||
@begin_session
|
||||
def get_save(self, id, session: Session = None):
|
||||
return session.get(Save, id)
|
||||
|
||||
|
||||
@begin_session
|
||||
def get_save_by_filename(
|
||||
self, platform_slug: str, file_name: str, session: Session = None
|
||||
):
|
||||
return session.scalars(
|
||||
select(Save)
|
||||
.filter_by(platform_slug=platform_slug, file_name=file_name)
|
||||
.limit(1)
|
||||
).first()
|
||||
|
||||
@begin_session
|
||||
def purge_saves(
|
||||
self, platform_slug: str, saves: list[str], session: Session = None
|
||||
):
|
||||
return session.execute(
|
||||
delete(Save)
|
||||
.where(
|
||||
and_(Save.platform_slug == platform_slug, Save.file_name.not_in(saves))
|
||||
)
|
||||
.execution_options(synchronize_session="evaluate")
|
||||
)
|
||||
|
||||
# ========= States =========
|
||||
@begin_session
|
||||
def add_state(self, state: Rom, session: Session = None):
|
||||
return session.merge(state)
|
||||
|
||||
|
||||
@begin_session
|
||||
def get_state(self, id, session: Session = None):
|
||||
return session.get(State, id)
|
||||
|
||||
@begin_session
|
||||
def get_state_by_filename(
|
||||
self, platform_slug: str, file_name: str, session: Session = None
|
||||
):
|
||||
return session.scalars(
|
||||
select(State)
|
||||
.filter_by(platform_slug=platform_slug, file_name=file_name)
|
||||
.limit(1)
|
||||
).first()
|
||||
|
||||
@begin_session
|
||||
def purge_states(
|
||||
self, platform_slug: str, states: list[str], session: Session = None
|
||||
):
|
||||
return session.execute(
|
||||
delete(State)
|
||||
.where(
|
||||
and_(
|
||||
State.platform_slug == platform_slug, State.file_name.not_in(states)
|
||||
)
|
||||
)
|
||||
.execution_options(synchronize_session="evaluate")
|
||||
)
|
||||
|
||||
# ========= Bios =========
|
||||
@begin_session
|
||||
def add_bios(self, bios: Rom, session: Session = None):
|
||||
return session.merge(bios)
|
||||
|
||||
@begin_session
|
||||
def get_bios(self, id, session: Session = None):
|
||||
return session.get(Bios, id)
|
||||
|
||||
@begin_session
|
||||
def get_bios_by_filename(
|
||||
self, platform_slug: str, file_name: str, session: Session = None
|
||||
):
|
||||
return session.scalars(
|
||||
select(Bios)
|
||||
.filter_by(platform_slug=platform_slug, file_name=file_name)
|
||||
.limit(1)
|
||||
).first()
|
||||
|
||||
@begin_session
|
||||
def purge_bios(self, platform_slug: str, bios: list[str], session: Session = None):
|
||||
return session.execute(
|
||||
delete(Bios)
|
||||
.where(
|
||||
and_(Bios.platform_slug == platform_slug, Bios.file_name.not_in(bios))
|
||||
)
|
||||
.execution_options(synchronize_session="evaluate")
|
||||
)
|
||||
|
||||
# ========= Screenshots =========
|
||||
@begin_session
|
||||
def add_screenshot(self, screenshot: Rom, session: Session = None):
|
||||
return session.merge(screenshot)
|
||||
|
||||
@begin_session
|
||||
def get_screenshot(self, id, session: Session = None):
|
||||
return session.get(Screenshot, id)
|
||||
|
||||
@begin_session
|
||||
def get_screenshot_by_filename(self, file_name: str, session: Session = None):
|
||||
return session.scalars(
|
||||
select(Screenshot).filter_by(file_name=file_name).limit(1)
|
||||
).first()
|
||||
|
||||
@begin_session
|
||||
def purge_screenshots(self, screenshots: list[str], session: Session = None):
|
||||
return session.execute(
|
||||
delete(Screenshot)
|
||||
.where(Screenshot.file_name.not_in(screenshots))
|
||||
.execution_options(synchronize_session="evaluate")
|
||||
)
|
||||
|
||||
# ========= Users =========
|
||||
@begin_session
|
||||
def add_user(self, user: User, session: Session = None):
|
||||
|
||||
@@ -13,7 +13,7 @@ from typing import Final
|
||||
from typing_extensions import TypedDict
|
||||
|
||||
from config import IGDB_CLIENT_ID, IGDB_CLIENT_SECRET, DEFAULT_URL_COVER_L
|
||||
from utils import get_file_name_with_no_tags as get_search_term
|
||||
from utils import get_file_name_with_no_tags
|
||||
from logger.logger import log
|
||||
from utils.cache import cache
|
||||
from tasks.update_switch_titledb import update_switch_titledb_task
|
||||
@@ -180,7 +180,7 @@ class IGDBHandler:
|
||||
|
||||
@check_twitch_token
|
||||
async def get_rom(self, file_name: str, platform_idgb_id: int) -> IGDBRomType:
|
||||
search_term = get_search_term(file_name)
|
||||
search_term = get_file_name_with_no_tags(file_name)
|
||||
|
||||
# Patch support for PS2 OPL flename format
|
||||
match = re.match(PS2_OPL_REGEX, file_name)
|
||||
|
||||
@@ -57,7 +57,7 @@ def test_roms(rom):
|
||||
roms = session.scalars(dbh.get_roms(rom.platform_slug)).all()
|
||||
assert len(roms) == 1
|
||||
|
||||
dbh.purge_roms(rom_2.platform_slug, [rom_2.slug])
|
||||
dbh.purge_roms(rom_2.platform_slug, [rom_2.id])
|
||||
|
||||
with dbh.session.begin() as session:
|
||||
roms = session.scalars(dbh.get_roms(rom.platform_slug)).all()
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
from .platform import Platform # noqa[401]
|
||||
from .rom import Rom # noqa[401]
|
||||
from .user import User, Role # noqa[401]
|
||||
from .asset import Save, State, Screenshot # noqa[401]
|
||||
from .asset import Save, State, Screenshot, Bios # noqa[401]
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
from sqlalchemy import Integer, Column, ForeignKey, String
|
||||
from sqlalchemy.orm import relationship, Mapped
|
||||
from sqlalchemy.orm import relationship, Mapped, backref
|
||||
from functools import cached_property
|
||||
|
||||
from config import FRONT_LIBRARY_PATH
|
||||
@@ -7,12 +7,9 @@ from .base import BaseModel
|
||||
|
||||
|
||||
class BaseAsset(BaseModel):
|
||||
from .rom import Rom
|
||||
|
||||
__abstract__ = True
|
||||
|
||||
id = Column(Integer(), primary_key=True, autoincrement=True)
|
||||
rom_id = Column(Integer(), ForeignKey("roms.id"), nullable=True)
|
||||
|
||||
file_name = Column(String(length=450), nullable=False)
|
||||
file_name_no_tags = Column(String(length=450), nullable=False)
|
||||
@@ -34,15 +31,22 @@ class Save(BaseAsset):
|
||||
|
||||
__tablename__ = "saves"
|
||||
|
||||
rom: Mapped[Rom] = relationship("Rom", lazy="joined", innerjoin=True)
|
||||
rom_id = Column(Integer(), ForeignKey("roms.id", ondelete='CASCADE'), nullable=False)
|
||||
rom: Mapped[Rom] = relationship("Rom", lazy="joined", innerjoin=True, backref=backref('saves', passive_deletes=True))
|
||||
|
||||
platform_slug = Column(String(length=50), ForeignKey("platforms.slug", ondelete='CASCADE'), nullable=False)
|
||||
platform = relationship("Platform", lazy="joined", innerjoin=True, backref=backref('saves', passive_deletes=True))
|
||||
|
||||
class State(BaseAsset):
|
||||
from .rom import Rom
|
||||
|
||||
__tablename__ = "states"
|
||||
|
||||
rom: Mapped[Rom] = relationship("Rom", lazy="joined", innerjoin=True)
|
||||
rom_id = Column(Integer(), ForeignKey("roms.id", ondelete='CASCADE'), nullable=False)
|
||||
rom: Mapped[Rom] = relationship("Rom", lazy="joined", innerjoin=True, backref=backref('states', passive_deletes=True))
|
||||
|
||||
platform_slug = Column(String(length=50), ForeignKey("platforms.slug", ondelete='CASCADE'), nullable=False)
|
||||
platform = relationship("Platform", lazy="joined", innerjoin=True, backref=backref('states', passive_deletes=True))
|
||||
|
||||
|
||||
class Screenshot(BaseAsset):
|
||||
@@ -50,4 +54,14 @@ class Screenshot(BaseAsset):
|
||||
|
||||
__tablename__ = "screenshots"
|
||||
|
||||
rom: Mapped[Rom] = relationship("Rom", lazy="joined", innerjoin=True)
|
||||
rom_id = Column(Integer(), ForeignKey("roms.id", ondelete='CASCADE'), nullable=False)
|
||||
rom: Mapped[Rom] = relationship("Rom", lazy="joined", innerjoin=True, backref=backref('screenshots', passive_deletes=True))
|
||||
|
||||
|
||||
class Bios(BaseAsset):
|
||||
from .platform import Platform
|
||||
|
||||
__tablename__ = "bios"
|
||||
|
||||
platform_slug = Column(String(length=50), ForeignKey("platforms.slug", ondelete='CASCADE'), nullable=False)
|
||||
platform: Mapped[Platform] = relationship("Platform", lazy="joined", innerjoin=True, backref=backref('bios', passive_deletes=True))
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import re
|
||||
from sqlalchemy import Integer, Column, String, Text, Boolean, Float, JSON, ForeignKey
|
||||
from sqlalchemy.orm import relationship, Mapped
|
||||
from sqlalchemy.orm import relationship, Mapped, backref
|
||||
from functools import cached_property
|
||||
|
||||
from config import DEFAULT_PATH_COVER_S, DEFAULT_PATH_COVER_L, FRONT_LIBRARY_PATH
|
||||
@@ -29,10 +29,15 @@ class Rom(BaseModel):
|
||||
|
||||
# Foreign key to platform
|
||||
platform_slug = Column(
|
||||
String(length=50), ForeignKey("platforms.slug"), nullable=False
|
||||
String(length=50),
|
||||
ForeignKey("platforms.slug", ondelete="CASCADE"),
|
||||
nullable=False,
|
||||
)
|
||||
platform: Mapped[Platform] = relationship( # noqa
|
||||
"Platform", lazy="joined", innerjoin=True
|
||||
"Platform",
|
||||
lazy="joined",
|
||||
innerjoin=True,
|
||||
backref=backref("roms", passive_deletes=True),
|
||||
)
|
||||
|
||||
### DEPRECATED ###
|
||||
|
||||
@@ -105,8 +105,7 @@ def get_file_name_with_no_extension(file_name: str) -> str:
|
||||
|
||||
def get_file_name_with_no_tags(file_name: str) -> str:
|
||||
file_name_no_extension = get_file_name_with_no_extension(file_name)
|
||||
return re.sub(TAG_REGEX, "", file_name_no_extension).strip()
|
||||
|
||||
return re.split(TAG_REGEX, file_name_no_extension)[0].strip()
|
||||
|
||||
def get_file_extension(file_name: str) -> str:
|
||||
return re.search(EXTENSION_REGEX, file_name).group(1)
|
||||
|
||||
@@ -3,9 +3,14 @@ from typing import Any
|
||||
|
||||
from handler import igdbh
|
||||
from utils import fs, parse_tags, get_file_extension, get_file_name_with_no_tags
|
||||
from config import SAVES_FOLDER_NAME, STATES_FOLDER_NAME
|
||||
from config import (
|
||||
SAVES_FOLDER_NAME,
|
||||
STATES_FOLDER_NAME,
|
||||
BIOS_FOLDER_NAME,
|
||||
SCREENSHOTS_FOLDER_NAME,
|
||||
)
|
||||
from config.config_loader import config
|
||||
from models import Platform, Rom, Save, State
|
||||
from models import Platform, Rom, Save, State, Bios, Screenshot
|
||||
from logger.logger import log
|
||||
|
||||
|
||||
@@ -100,7 +105,7 @@ async def scan_rom(
|
||||
|
||||
# Update properties from IGDB
|
||||
rom_attrs.update(
|
||||
fs.get_cover(
|
||||
fs.get_rom_cover(
|
||||
overwrite=overwrite,
|
||||
fs_slug=platform.slug,
|
||||
rom_name=rom_attrs["name"],
|
||||
@@ -108,7 +113,7 @@ async def scan_rom(
|
||||
)
|
||||
)
|
||||
rom_attrs.update(
|
||||
fs.get_screenshots(
|
||||
fs.get_rom_screenshots(
|
||||
fs_slug=platform.slug,
|
||||
rom_name=rom_attrs["name"],
|
||||
url_screenshots=rom_attrs["url_screenshots"],
|
||||
@@ -118,33 +123,34 @@ async def scan_rom(
|
||||
return Rom(**rom_attrs)
|
||||
|
||||
|
||||
async def scan_save(platform: Platform, file_name: str) -> Save:
|
||||
def _scan_asset(file_name: str, path: str):
|
||||
log.info(f"\t\t · {file_name}")
|
||||
|
||||
file_size = fs.get_fs_file_size(file_name=file_name, asset_path=path)
|
||||
|
||||
return {
|
||||
"file_path": path,
|
||||
"file_name": file_name,
|
||||
"file_name_no_tags": get_file_name_with_no_tags(file_name),
|
||||
"file_extension": get_file_extension(file_name),
|
||||
"file_size_bytes": file_size,
|
||||
}
|
||||
|
||||
|
||||
def scan_save(platform: Platform, file_name: str) -> Save:
|
||||
saves_path = fs.get_fs_structure(platform.fs_slug, folder=SAVES_FOLDER_NAME)
|
||||
|
||||
log.info(f"\t · {file_name}")
|
||||
|
||||
file_size = fs.get_fs_file_size(file_name=file_name, asset_path=saves_path)
|
||||
|
||||
return Save(
|
||||
file_path=saves_path,
|
||||
file_name=file_name,
|
||||
file_name_no_tags=get_file_name_with_no_tags(file_name),
|
||||
file_extension=get_file_extension(file_name),
|
||||
file_size_bytes=file_size,
|
||||
)
|
||||
return Save(**_scan_asset(file_name, saves_path))
|
||||
|
||||
|
||||
async def scan_state(platform: Platform, file_name: str) -> State:
|
||||
def scan_state(platform: Platform, file_name: str) -> State:
|
||||
states_path = fs.get_fs_structure(platform.fs_slug, folder=STATES_FOLDER_NAME)
|
||||
return State(**_scan_asset(file_name, states_path))
|
||||
|
||||
log.info(f"\t · {file_name}")
|
||||
|
||||
file_size = fs.get_fs_file_size(file_name=file_name, asset_path=states_path)
|
||||
def scan_bios(platform: Platform, file_name: str) -> State:
|
||||
bios_path = fs.get_fs_structure(platform.fs_slug, folder=BIOS_FOLDER_NAME)
|
||||
return Bios(**_scan_asset(file_name, bios_path))
|
||||
|
||||
return State(
|
||||
file_path=states_path,
|
||||
file_name=file_name,
|
||||
file_name_no_tags=get_file_name_with_no_tags(file_name),
|
||||
file_extension=get_file_extension(file_name),
|
||||
file_size_bytes=file_size,
|
||||
)
|
||||
|
||||
def scan_screenshot(file_name: str) -> State:
|
||||
return Screenshot(**_scan_asset(file_name, SCREENSHOTS_FOLDER_NAME))
|
||||
|
||||
@@ -22,6 +22,8 @@ from config import (
|
||||
DEFAULT_HEIGHT_COVER_S,
|
||||
SAVES_FOLDER_NAME,
|
||||
STATES_FOLDER_NAME,
|
||||
SCREENSHOTS_FOLDER_NAME,
|
||||
BIOS_FOLDER_NAME,
|
||||
)
|
||||
from config.config_loader import config
|
||||
from exceptions.fs_exceptions import (
|
||||
@@ -113,7 +115,7 @@ def _get_cover_path(fs_slug: str, rom_name: str, size: CoverSize):
|
||||
return f"{fs_slug}/{rom_name}/cover/{size.value}.png?timestamp={strtime}"
|
||||
|
||||
|
||||
def get_cover(
|
||||
def get_rom_cover(
|
||||
overwrite: bool, fs_slug: str, rom_name: str, url_cover: str = ""
|
||||
) -> dict:
|
||||
q_rom_name = quote(rom_name)
|
||||
@@ -171,13 +173,14 @@ def _get_screenshot_path(fs_slug: str, rom_name: str, idx: str):
|
||||
return f"{fs_slug}/{rom_name}/screenshots/{idx}.jpg"
|
||||
|
||||
|
||||
def get_screenshots(fs_slug: str, rom_name: str, url_screenshots: list) -> dict:
|
||||
def get_rom_screenshots(fs_slug: str, rom_name: str, url_screenshots: list) -> dict:
|
||||
q_rom_name = quote(rom_name)
|
||||
|
||||
path_screenshots: list[str] = []
|
||||
for idx, url in enumerate(url_screenshots):
|
||||
_store_screenshot(fs_slug, rom_name, url, idx)
|
||||
path_screenshots.append(_get_screenshot_path(fs_slug, q_rom_name, str(idx)))
|
||||
|
||||
return {"path_screenshots": path_screenshots}
|
||||
|
||||
|
||||
@@ -303,6 +306,7 @@ def get_assets(fs_slug: str):
|
||||
|
||||
fs_saves: list[str] = []
|
||||
fs_states: list[str] = []
|
||||
fs_bios: list[str] = []
|
||||
|
||||
try:
|
||||
fs_saves = list(os.walk(saves_file_path))[0][2]
|
||||
@@ -317,12 +321,32 @@ def get_assets(fs_slug: str):
|
||||
except IndexError:
|
||||
pass
|
||||
|
||||
bios_path = get_fs_structure(fs_slug, folder=BIOS_FOLDER_NAME)
|
||||
bios_file_path = f"{LIBRARY_BASE_PATH}/{bios_path}"
|
||||
|
||||
try:
|
||||
fs_bios = list(os.walk(bios_file_path))[0][2]
|
||||
except IndexError:
|
||||
pass
|
||||
|
||||
return {
|
||||
"saves": fs_saves,
|
||||
"states": fs_states,
|
||||
"bios": fs_bios,
|
||||
}
|
||||
|
||||
|
||||
def get_screenshots():
|
||||
screenshots_path = f"{LIBRARY_BASE_PATH}/{SCREENSHOTS_FOLDER_NAME}"
|
||||
|
||||
try:
|
||||
fs_screenshots = list(os.walk(screenshots_path))[0][2]
|
||||
except IndexError:
|
||||
pass
|
||||
|
||||
return fs_screenshots
|
||||
|
||||
|
||||
def get_rom_file_size(
|
||||
roms_path: str, file_name: str, multi: bool, multi_files: list = []
|
||||
):
|
||||
|
||||
@@ -1,12 +1,12 @@
|
||||
import pytest
|
||||
|
||||
from ..fs import (
|
||||
get_cover,
|
||||
get_rom_cover,
|
||||
get_platforms,
|
||||
get_roms_structure,
|
||||
get_fs_structure,
|
||||
get_roms,
|
||||
get_rom_file_size,
|
||||
# get_screenshots # TODO: write test
|
||||
# get_rom_screenshots # TODO: write test
|
||||
# store_default_resources # TODO: write test
|
||||
# get_rom_files, # TODO: write test
|
||||
# rename_rom, # TODO: write test
|
||||
@@ -23,9 +23,9 @@ from config import (
|
||||
|
||||
|
||||
@pytest.mark.vcr
|
||||
def test_get_cover():
|
||||
def test_get_rom_cover():
|
||||
# Game: Paper Mario (USA).z64
|
||||
cover = get_cover(
|
||||
cover = get_rom_cover(
|
||||
overwrite=False,
|
||||
fs_slug="n64",
|
||||
rom_name="Paper Mario",
|
||||
@@ -35,7 +35,7 @@ def test_get_cover():
|
||||
assert DEFAULT_PATH_COVER_L in cover["path_cover_l"]
|
||||
|
||||
# Game: Paper Mario (USA).z64
|
||||
cover = get_cover(
|
||||
cover = get_rom_cover(
|
||||
overwrite=True,
|
||||
fs_slug="n64",
|
||||
rom_name="Paper Mario",
|
||||
@@ -46,7 +46,7 @@ def test_get_cover():
|
||||
assert "n64/Paper%20Mario/cover/big.png" in cover["path_cover_l"]
|
||||
|
||||
# Game: Super Mario 64 (J) (Rev A)
|
||||
cover = get_cover(
|
||||
cover = get_rom_cover(
|
||||
overwrite=False,
|
||||
fs_slug="n64",
|
||||
rom_name="Super Mario 64",
|
||||
@@ -57,7 +57,7 @@ def test_get_cover():
|
||||
assert "n64/Super%20Mario%2064/cover/big.png" in cover["path_cover_l"]
|
||||
|
||||
# Game: Disney's Kim Possible: What's the Switch?.zip
|
||||
cover = get_cover(
|
||||
cover = get_rom_cover(
|
||||
overwrite=False,
|
||||
fs_slug="ps2",
|
||||
rom_name="Disney's Kim Possible: What's the Switch?",
|
||||
@@ -74,7 +74,7 @@ def test_get_cover():
|
||||
)
|
||||
|
||||
# Game: Fake Game.xyz
|
||||
cover = get_cover(
|
||||
cover = get_rom_cover(
|
||||
overwrite=False,
|
||||
fs_slug="n64",
|
||||
rom_name="Fake Game",
|
||||
@@ -91,8 +91,8 @@ def test_get_platforms():
|
||||
assert "psx" in platforms
|
||||
|
||||
|
||||
def test_get_roms_structure():
|
||||
roms_structure = get_roms_structure(fs_slug="n64")
|
||||
def test_get_fs_structure():
|
||||
roms_structure = get_fs_structure(fs_slug="n64")
|
||||
|
||||
assert roms_structure == "n64/roms"
|
||||
|
||||
@@ -110,7 +110,7 @@ def test_get_roms():
|
||||
|
||||
def test_rom_size():
|
||||
rom_size = get_rom_file_size(
|
||||
roms_path=get_roms_structure(fs_slug="n64"),
|
||||
roms_path=get_fs_structure(fs_slug="n64"),
|
||||
file_name="Paper Mario (USA).z64",
|
||||
multi=False,
|
||||
)
|
||||
@@ -118,7 +118,7 @@ def test_rom_size():
|
||||
assert rom_size == (1.0, "KB")
|
||||
|
||||
rom_size = get_rom_file_size(
|
||||
roms_path=get_roms_structure(fs_slug="n64"),
|
||||
roms_path=get_fs_structure(fs_slug="n64"),
|
||||
file_name="Super Mario 64 (J) (Rev A)",
|
||||
multi=True,
|
||||
multi_files=[
|
||||
|
||||
@@ -102,17 +102,19 @@ onBeforeMount(async () => {
|
||||
>
|
||||
<v-tabs v-model="tab" slider-color="romm-accent-1" rounded="0">
|
||||
<v-tab value="details" rounded="0">Details</v-tab>
|
||||
<v-tab value="saves" rounded="0" disabled
|
||||
>Saves<span class="text-caption text-truncate ml-1"
|
||||
>[coming soon]</span
|
||||
></v-tab
|
||||
>
|
||||
<v-tab value="saves" rounded="0" disabled>
|
||||
Saves
|
||||
</v-tab>
|
||||
<v-tab value="states" rounded="0" disabled>
|
||||
States
|
||||
</v-tab>
|
||||
<v-tab
|
||||
v-if="rom.path_screenshots.length > 0"
|
||||
value="screenshots"
|
||||
rounded="0"
|
||||
>Screenshots</v-tab
|
||||
>
|
||||
Screenshots
|
||||
</v-tab>
|
||||
</v-tabs>
|
||||
</v-row>
|
||||
<v-row no-gutters>
|
||||
|
||||
Reference in New Issue
Block a user