mirror of
https://github.com/rommapp/romm.git
synced 2026-06-28 06:46:00 +00:00
model and scanning
This commit is contained in:
@@ -10,6 +10,7 @@ from models.assets import Save, Screenshot, State # noqa
|
||||
from models.rom import Rom # noqa
|
||||
from models.platform import Platform # noqa
|
||||
from models.user import User # noqa
|
||||
from models.firmware import Firmware # noqa
|
||||
|
||||
# this is the Alembic Config object, which provides
|
||||
# access to the values within the .ini file in use.
|
||||
|
||||
@@ -2,40 +2,40 @@
|
||||
|
||||
Revision ID: 0017_firmware
|
||||
Revises: 0016_user_last_login_active
|
||||
Create Date: 2024-04-10 13:50:39.208700
|
||||
Create Date: 2024-05-01 14:55:51.122514
|
||||
|
||||
"""
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '0017_firmware'
|
||||
down_revision = '0016_user_last_login_active'
|
||||
revision = "0017_firmware"
|
||||
down_revision = "0016_user_last_login_active"
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
|
||||
def upgrade() -> None:
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
op.create_table('firmwares',
|
||||
sa.Column('platform_id', sa.Integer(), nullable=False),
|
||||
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
|
||||
sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.text('now()'), nullable=False),
|
||||
sa.Column('updated_at', sa.DateTime(timezone=True), server_default=sa.text('now()'), nullable=False),
|
||||
sa.Column('file_name', sa.String(length=450), nullable=False),
|
||||
sa.Column('file_name_no_tags', sa.String(length=450), nullable=False),
|
||||
sa.Column('file_name_no_ext', sa.String(length=450), nullable=False),
|
||||
sa.Column('file_extension', sa.String(length=100), nullable=False),
|
||||
sa.Column('file_path', sa.String(length=1000), nullable=False),
|
||||
sa.Column('file_size_bytes', sa.BigInteger(), nullable=False),
|
||||
sa.ForeignKeyConstraint(['platform_id'], ['platforms.id'], ondelete='CASCADE'),
|
||||
sa.PrimaryKeyConstraint('id')
|
||||
op.create_table(
|
||||
"firmware",
|
||||
sa.Column("id", sa.Integer(), autoincrement=True, nullable=False),
|
||||
sa.Column("platform_id", sa.Integer(), nullable=False),
|
||||
sa.Column("file_name", sa.String(length=450), nullable=False),
|
||||
sa.Column("file_name_no_tags", sa.String(length=450), nullable=False),
|
||||
sa.Column("file_name_no_ext", sa.String(length=450), nullable=False),
|
||||
sa.Column("file_extension", sa.String(length=100), nullable=False),
|
||||
sa.Column("file_path", sa.String(length=1000), nullable=False),
|
||||
sa.Column("file_size_bytes", sa.BigInteger(), nullable=False),
|
||||
sa.ForeignKeyConstraint(["platform_id"], ["platforms.id"], ondelete="CASCADE"),
|
||||
sa.PrimaryKeyConstraint("id"),
|
||||
)
|
||||
# ### end Alembic commands ###
|
||||
|
||||
|
||||
def downgrade() -> None:
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
op.drop_table('firmwares')
|
||||
op.drop_table("firmware")
|
||||
# ### end Alembic commands ###
|
||||
|
||||
@@ -37,6 +37,7 @@ class Config:
|
||||
PLATFORMS_BINDING: dict[str, str]
|
||||
PLATFORMS_VERSIONS: dict[str, str]
|
||||
ROMS_FOLDER_NAME: str
|
||||
FIRMWARE_FOLDER_NAME: str
|
||||
HIGH_PRIO_STRUCTURE_PATH: str
|
||||
|
||||
def __init__(self, **entries):
|
||||
@@ -127,6 +128,9 @@ class ConfigManager:
|
||||
ROMS_FOLDER_NAME=pydash.get(
|
||||
self._raw_config, "filesystem.roms_folder", "roms"
|
||||
),
|
||||
FIRMWARE_FOLDER_NAME=pydash.get(
|
||||
self._raw_config, "filesystem.firmware_folder", "bios"
|
||||
),
|
||||
)
|
||||
|
||||
def _validate_config(self):
|
||||
@@ -197,6 +201,18 @@ class ConfigManager:
|
||||
)
|
||||
sys.exit(3)
|
||||
|
||||
if not isinstance(self.config.FIRMWARE_FOLDER_NAME, str):
|
||||
log.critical(
|
||||
"Invalid config.yml: filesystem.firmware_folder must be a string"
|
||||
)
|
||||
sys.exit(3)
|
||||
|
||||
if self.config.FIRMWARE_FOLDER_NAME == "":
|
||||
log.critical(
|
||||
"Invalid config.yml: filesystem.firmware_folder cannot be an empty string"
|
||||
)
|
||||
sys.exit(3)
|
||||
|
||||
def get_config(self) -> None:
|
||||
try:
|
||||
with open(self.config_file) as config_file:
|
||||
|
||||
@@ -28,6 +28,4 @@ system:
|
||||
|
||||
filesystem:
|
||||
roms_folder: 'ROMS'
|
||||
saves_folder: 'SAVES'
|
||||
states_folder: 'STATES'
|
||||
screenshots_folder: 'SCREENSHOTS'
|
||||
firmware_folder: 'BIOS'
|
||||
|
||||
@@ -18,6 +18,7 @@ def test_config_loader():
|
||||
assert loader.config.PLATFORMS_BINDING == {"gc": "ngc"}
|
||||
assert loader.config.PLATFORMS_VERSIONS == {"naomi": "arcade"}
|
||||
assert loader.config.ROMS_FOLDER_NAME == "ROMS"
|
||||
assert loader.config.FIRMWARE_FOLDER_NAME == "BIOS"
|
||||
|
||||
|
||||
def test_empty_config_loader():
|
||||
@@ -32,3 +33,4 @@ def test_empty_config_loader():
|
||||
assert loader.config.PLATFORMS_BINDING == {}
|
||||
assert loader.config.PLATFORMS_VERSIONS == {}
|
||||
assert loader.config.ROMS_FOLDER_NAME == "roms"
|
||||
assert loader.config.FIRMWARE_FOLDER_NAME == "bios"
|
||||
|
||||
@@ -11,4 +11,5 @@ class ConfigResponse(TypedDict):
|
||||
PLATFORMS_BINDING: dict[str, str]
|
||||
PLATFORMS_VERSIONS: dict[str, str]
|
||||
ROMS_FOLDER_NAME: str
|
||||
FIRMWARE_FOLDER_NAME: str
|
||||
HIGH_PRIO_STRUCTURE_PATH: str
|
||||
|
||||
21
backend/endpoints/responses/firmware.py
Normal file
21
backend/endpoints/responses/firmware.py
Normal file
@@ -0,0 +1,21 @@
|
||||
from pydantic import BaseModel
|
||||
|
||||
|
||||
class FirmwareSchema(BaseModel):
|
||||
id: int
|
||||
|
||||
platform_id: int
|
||||
platform_slug: str
|
||||
platform_name: str
|
||||
|
||||
file_name: str
|
||||
file_name_no_tags: str
|
||||
file_name_no_ext: str
|
||||
file_extension: str
|
||||
file_path: str
|
||||
file_size_bytes: int
|
||||
|
||||
full_path: str
|
||||
|
||||
class Config:
|
||||
from_attributes = True
|
||||
@@ -2,17 +2,21 @@ import emoji
|
||||
import socketio # type: ignore
|
||||
from rq import Worker
|
||||
from rq.job import Job
|
||||
from endpoints.platform import PlatformSchema
|
||||
from endpoints.rom import RomSchema
|
||||
from endpoints.responses.platform import PlatformSchema
|
||||
from endpoints.responses.rom import RomSchema
|
||||
from endpoints.responses.firmware import FirmwareSchema
|
||||
from exceptions.fs_exceptions import (
|
||||
FolderStructureNotMatchException,
|
||||
RomsNotFoundException,
|
||||
FirmwareNotFoundException,
|
||||
)
|
||||
from handler import (
|
||||
db_platform_handler,
|
||||
db_rom_handler,
|
||||
db_firmware_handler,
|
||||
fs_platform_handler,
|
||||
fs_rom_handler,
|
||||
fs_firmware_handler,
|
||||
socket_handler,
|
||||
)
|
||||
from config import SCAN_TIMEOUT
|
||||
@@ -20,6 +24,7 @@ from handler.redis_handler import high_prio_queue, redis_url, redis_client
|
||||
from handler.scan_handler import (
|
||||
scan_platform,
|
||||
scan_rom,
|
||||
scan_firmware,
|
||||
ScanType,
|
||||
)
|
||||
from handler.metadata_handler.igdb_handler import IGDB_API_ENABLED
|
||||
@@ -35,6 +40,8 @@ class ScanStats:
|
||||
self.scanned_roms = 0
|
||||
self.added_roms = 0
|
||||
self.metadata_roms = 0
|
||||
self.scanned_firmware = 0
|
||||
self.added_firmware = 0
|
||||
|
||||
|
||||
def _get_socket_manager():
|
||||
@@ -110,6 +117,45 @@ async def scan_platforms(
|
||||
PlatformSchema.model_validate(platform).model_dump(),
|
||||
)
|
||||
|
||||
# Scanning firmware
|
||||
try:
|
||||
fs_firmware = fs_firmware_handler.get_firmware(platform)
|
||||
except FirmwareNotFoundException:
|
||||
continue
|
||||
|
||||
if len(fs_firmware) == 0:
|
||||
log.warning(
|
||||
" ⚠️ No firmware found, skipping firmware scan for this platform"
|
||||
)
|
||||
else:
|
||||
log.info(f" {len(fs_firmware)} firmware files found")
|
||||
|
||||
for fs_fw in fs_firmware:
|
||||
firmware = db_firmware_handler.get_firmware_by_filename(
|
||||
platform.id, fs_fw
|
||||
)
|
||||
|
||||
scanned_firmware = scan_firmware(
|
||||
platform=platform,
|
||||
file_name=fs_fw,
|
||||
firmware=firmware,
|
||||
)
|
||||
|
||||
scan_stats.scanned_firmware += 1
|
||||
scan_stats.added_firmware += 1 if not firmware else 0
|
||||
|
||||
_added_firmware = db_firmware_handler.add_firmware(scanned_firmware)
|
||||
firmware = db_firmware_handler.get_firmware(_added_firmware.id)
|
||||
|
||||
await sm.emit(
|
||||
"scan:scanning_firmware",
|
||||
{
|
||||
"platform_name": platform.name,
|
||||
"platform_slug": platform.slug,
|
||||
**FirmwareSchema.model_validate(firmware).model_dump(),
|
||||
},
|
||||
)
|
||||
|
||||
# Scanning roms
|
||||
try:
|
||||
fs_roms = fs_rom_handler.get_roms(platform)
|
||||
@@ -178,6 +224,7 @@ async def scan_platforms(
|
||||
db_rom_handler.purge_roms(
|
||||
platform.id, [rom["file_name"] for rom in fs_roms]
|
||||
)
|
||||
db_firmware_handler.purge_firmware(platform.id, [fw for fw in fs_firmware])
|
||||
db_platform_handler.purge_platforms(fs_platforms)
|
||||
|
||||
log.info(emoji.emojize(":check_mark: Scan completed "))
|
||||
|
||||
@@ -18,3 +18,4 @@ def test_config():
|
||||
assert config.get('EXCLUDED_MULTI_PARTS_FILES') == []
|
||||
assert config.get('PLATFORMS_BINDING') == {}
|
||||
assert config.get('ROMS_FOLDER_NAME') == 'roms'
|
||||
assert config.get('FIRMWARE_FOLDER_NAME') == 'bios'
|
||||
|
||||
@@ -44,3 +44,19 @@ class RomAlreadyExistsException(Exception):
|
||||
|
||||
def __repr__(self):
|
||||
return self.message
|
||||
|
||||
class FirmwareNotFoundException(Exception):
|
||||
def __init__(self, platform: str):
|
||||
self.message = f"Firmware not found for platform {platform}. {folder_struct_msg}"
|
||||
super().__init__(self.message)
|
||||
|
||||
def __repr__(self):
|
||||
return self.message
|
||||
|
||||
class FirmwareAlreadyExistsException(Exception):
|
||||
def __init__(self, firmware_name: str):
|
||||
self.message = f"Can't rename: {firmware_name} already exists"
|
||||
super().__init__(self.message)
|
||||
|
||||
def __repr__(self):
|
||||
return self.message
|
||||
|
||||
@@ -6,10 +6,12 @@ from handler.db_handler.db_states_handler import DBStatesHandler
|
||||
from handler.db_handler.db_users_handler import DBUsersHandler
|
||||
from handler.db_handler.db_stats_handler import DBStatsHandler
|
||||
from handler.db_handler.db_screenshots_handler import DBScreenshotsHandler
|
||||
from handler.db_handler.db_firmware_handler import DBFirmwareHandler
|
||||
from handler.fs_handler.fs_assets_handler import FSAssetsHandler
|
||||
from handler.fs_handler.fs_platforms_handler import FSPlatformsHandler
|
||||
from handler.fs_handler.fs_resources_handler import FSResourceHandler
|
||||
from handler.fs_handler.fs_roms_handler import FSRomsHandler
|
||||
from handler.fs_handler.fs_firmware_handler import FSFirmwareHandler
|
||||
from handler.gh_handler import GHHandler
|
||||
from handler.metadata_handler.igdb_handler import IGDBHandler
|
||||
from handler.metadata_handler.moby_handler import MobyGamesHandler
|
||||
@@ -30,7 +32,9 @@ db_state_handler = DBStatesHandler()
|
||||
db_screenshot_handler = DBScreenshotsHandler()
|
||||
db_user_handler = DBUsersHandler()
|
||||
db_stats_handler = DBStatsHandler()
|
||||
db_firmware_handler = DBFirmwareHandler()
|
||||
fs_platform_handler = FSPlatformsHandler()
|
||||
fs_rom_handler = FSRomsHandler()
|
||||
fs_asset_handler = FSAssetsHandler()
|
||||
fs_resource_handler = FSResourceHandler()
|
||||
fs_firmware_handler = FSFirmwareHandler()
|
||||
|
||||
63
backend/handler/db_handler/db_firmware_handler.py
Normal file
63
backend/handler/db_handler/db_firmware_handler.py
Normal file
@@ -0,0 +1,63 @@
|
||||
from decorators.database import begin_session
|
||||
from handler.db_handler import DBHandler
|
||||
from models.firmware import Firmware
|
||||
from sqlalchemy import update, delete, and_
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
|
||||
class DBFirmwareHandler(DBHandler):
|
||||
@begin_session
|
||||
def add_firmware(self, firmware: Firmware, session: Session = None):
|
||||
return session.merge(firmware)
|
||||
|
||||
@begin_session
|
||||
def get_firmware(
|
||||
self, id: int = None, platform_id: int = None, session: Session = None
|
||||
):
|
||||
return (
|
||||
session.get(Firmware, id)
|
||||
if id
|
||||
else session.query(Firmware).filter_by(platform_id=platform_id).all()
|
||||
)
|
||||
|
||||
@begin_session
|
||||
def get_firmware_by_filename(
|
||||
self, platform_id: int, file_name: str, session: Session = None
|
||||
):
|
||||
return (
|
||||
session.query(Firmware)
|
||||
.filter_by(platform_id=platform_id, file_name=file_name)
|
||||
.first()
|
||||
)
|
||||
|
||||
@begin_session
|
||||
def update_firmware(self, id: int, data: dict, session: Session = None):
|
||||
return session.execute(
|
||||
update(Firmware)
|
||||
.where(Firmware.id == id)
|
||||
.values(**data)
|
||||
.execution_options(synchronize_session="evaluate")
|
||||
)
|
||||
|
||||
@begin_session
|
||||
def delete_firmware(self, id: int, session: Session = None):
|
||||
return session.execute(
|
||||
delete(Firmware)
|
||||
.where(Firmware.id == id)
|
||||
.execution_options(synchronize_session="evaluate")
|
||||
)
|
||||
|
||||
@begin_session
|
||||
def purge_firmware(
|
||||
self, platform_id: int, firmware: list[str], session: Session = None
|
||||
):
|
||||
return session.execute(
|
||||
delete(Firmware)
|
||||
.where(
|
||||
and_(
|
||||
Firmware.platform_id == platform_id,
|
||||
Firmware.file_name.not_in(firmware),
|
||||
)
|
||||
)
|
||||
.execution_options(synchronize_session="evaluate")
|
||||
)
|
||||
@@ -87,13 +87,21 @@ class FSHandler(ABC):
|
||||
def __init__(self) -> None:
|
||||
pass
|
||||
|
||||
def get_fs_structure(self, fs_slug: str) -> str:
|
||||
def get_roms_fs_structure(self, fs_slug: str) -> str:
|
||||
cnfg = cm.get_config()
|
||||
return (
|
||||
f"{cnfg.ROMS_FOLDER_NAME}/{fs_slug}"
|
||||
if os.path.exists(cnfg.HIGH_PRIO_STRUCTURE_PATH)
|
||||
else f"{fs_slug}/{cnfg.ROMS_FOLDER_NAME}"
|
||||
)
|
||||
|
||||
def get_firmware_fs_structure(self, fs_slug: str) -> str:
|
||||
cnfg = cm.get_config()
|
||||
return (
|
||||
f"{cnfg.FIRMWARE_FOLDER_NAME}/{fs_slug}"
|
||||
if os.path.exists(cnfg.HIGH_PRIO_STRUCTURE_PATH)
|
||||
else f"{fs_slug}/{cnfg.FIRMWARE_FOLDER_NAME}"
|
||||
)
|
||||
|
||||
def get_file_name_with_no_extension(self, file_name: str) -> str:
|
||||
return re.sub(EXTENSION_REGEX, "", file_name).strip()
|
||||
|
||||
60
backend/handler/fs_handler/fs_firmware_handler.py
Normal file
60
backend/handler/fs_handler/fs_firmware_handler.py
Normal file
@@ -0,0 +1,60 @@
|
||||
import os
|
||||
import shutil
|
||||
|
||||
from handler.fs_handler import FSHandler
|
||||
from exceptions.fs_exceptions import (
|
||||
FirmwareNotFoundException,
|
||||
FirmwareAlreadyExistsException,
|
||||
)
|
||||
from config import LIBRARY_BASE_PATH
|
||||
from models.platform import Platform
|
||||
|
||||
|
||||
class FSFirmwareHandler(FSHandler):
|
||||
def __init__(self) -> None:
|
||||
pass
|
||||
|
||||
def remove_file(self, file_name: str, file_path: str):
|
||||
try:
|
||||
os.remove(f"{LIBRARY_BASE_PATH}/{file_path}/{file_name}")
|
||||
except IsADirectoryError:
|
||||
shutil.rmtree(f"{LIBRARY_BASE_PATH}/{file_path}/{file_name}")
|
||||
|
||||
def get_firmware(self, platform: Platform):
|
||||
"""Gets all filesystem firmware for a platform
|
||||
|
||||
Args:
|
||||
platform: platform where firmware belong
|
||||
Returns:
|
||||
list with all the filesystem firmware for a platform found in the LIBRARY_BASE_PATH
|
||||
"""
|
||||
firmware_path = self.get_firmware_fs_structure(platform.fs_slug)
|
||||
firmware_file_path = f"{LIBRARY_BASE_PATH}/{firmware_path}"
|
||||
|
||||
try:
|
||||
fs_firmware_files: list[str] = list(os.walk(firmware_file_path))[0][2]
|
||||
except IndexError as exc:
|
||||
raise FirmwareNotFoundException(platform.fs_slug) from exc
|
||||
|
||||
return fs_firmware_files
|
||||
|
||||
def get_firmware_file_size(self, firmware_path: str, file_name: str):
|
||||
files = [f"{LIBRARY_BASE_PATH}/{firmware_path}/{file_name}"]
|
||||
return sum([os.stat(file).st_size for file in files])
|
||||
|
||||
def file_exists(self, path: str, file_name: str):
|
||||
return bool(os.path.exists(f"{LIBRARY_BASE_PATH}/{path}/{file_name}"))
|
||||
|
||||
def rename_file(self, old_name: str, new_name: str, file_path: str):
|
||||
if new_name != old_name:
|
||||
if self.file_exists(path=file_path, file_name=new_name):
|
||||
raise FirmwareAlreadyExistsException(new_name)
|
||||
|
||||
os.rename(
|
||||
f"{LIBRARY_BASE_PATH}/{file_path}/{old_name}",
|
||||
f"{LIBRARY_BASE_PATH}/{file_path}/{new_name}",
|
||||
)
|
||||
|
||||
def build_upload_file_path(self, fs_slug: str):
|
||||
file_path = self.get_firmware_fs_structure(fs_slug)
|
||||
return f"{LIBRARY_BASE_PATH}/{file_path}"
|
||||
@@ -125,7 +125,7 @@ class FSRomsHandler(FSHandler):
|
||||
Returns:
|
||||
list with all the filesystem roms for a platform found in the LIBRARY_BASE_PATH
|
||||
"""
|
||||
roms_path = self.get_fs_structure(platform.fs_slug)
|
||||
roms_path = self.get_roms_fs_structure(platform.fs_slug)
|
||||
roms_file_path = f"{LIBRARY_BASE_PATH}/{roms_path}"
|
||||
|
||||
try:
|
||||
@@ -189,5 +189,5 @@ class FSRomsHandler(FSHandler):
|
||||
)
|
||||
|
||||
def build_upload_file_path(self, fs_slug: str):
|
||||
file_path = self.get_fs_structure(fs_slug)
|
||||
file_path = self.get_roms_fs_structure(fs_slug)
|
||||
return f"{LIBRARY_BASE_PATH}/{file_path}"
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
import pytest
|
||||
from unittest.mock import patch
|
||||
|
||||
from handler import fs_resource_handler, fs_platform_handler, fs_rom_handler
|
||||
from models.platform import Platform
|
||||
@@ -74,8 +73,8 @@ def test_get_platforms():
|
||||
assert "psx" in platforms
|
||||
|
||||
|
||||
def test_get_fs_structure():
|
||||
roms_structure = fs_rom_handler.get_fs_structure(fs_slug="n64")
|
||||
def test_get_roms_fs_structure():
|
||||
roms_structure = fs_rom_handler.get_roms_fs_structure(fs_slug="n64")
|
||||
|
||||
assert roms_structure == "n64/roms"
|
||||
|
||||
@@ -94,7 +93,7 @@ def test_get_roms():
|
||||
|
||||
def test_rom_size():
|
||||
rom_size = fs_rom_handler.get_rom_file_size(
|
||||
roms_path=fs_rom_handler.get_fs_structure(fs_slug="n64"),
|
||||
roms_path=fs_rom_handler.get_roms_fs_structure(fs_slug="n64"),
|
||||
file_name="Paper Mario (USA).z64",
|
||||
multi=False,
|
||||
)
|
||||
@@ -102,7 +101,7 @@ def test_rom_size():
|
||||
assert rom_size == 1024
|
||||
|
||||
rom_size = fs_rom_handler.get_rom_file_size(
|
||||
roms_path=fs_rom_handler.get_fs_structure(fs_slug="n64"),
|
||||
roms_path=fs_rom_handler.get_roms_fs_structure(fs_slug="n64"),
|
||||
file_name="Super Mario 64 (J) (Rev A)",
|
||||
multi=True,
|
||||
multi_files=[
|
||||
|
||||
@@ -7,6 +7,7 @@ from handler import (
|
||||
fs_asset_handler,
|
||||
fs_resource_handler,
|
||||
fs_rom_handler,
|
||||
fs_firmware_handler,
|
||||
igdb_handler,
|
||||
moby_handler,
|
||||
)
|
||||
@@ -15,6 +16,7 @@ from models.assets import Save, Screenshot, State
|
||||
from models.platform import Platform
|
||||
from models.rom import Rom
|
||||
from models.user import User
|
||||
from models.firmware import Firmware
|
||||
|
||||
|
||||
class ScanType(Enum):
|
||||
@@ -83,7 +85,7 @@ def scan_platform(
|
||||
platform_attrs["slug"] = fs_slug
|
||||
|
||||
igdb_platform = igdb_handler.get_platform(platform_attrs["slug"])
|
||||
moby_platform = moby_handler.get_platform(platform_attrs["slug"])
|
||||
moby_platform = moby_handler.get_platform(platform_attrs["slug"])
|
||||
|
||||
platform_attrs["name"] = platform_attrs["slug"].replace("-", " ").title()
|
||||
platform_attrs.update({**moby_platform, **igdb_platform}) # Reverse order
|
||||
@@ -98,6 +100,46 @@ def scan_platform(
|
||||
return Platform(**platform_attrs)
|
||||
|
||||
|
||||
def scan_firmware(
|
||||
platform: Platform,
|
||||
file_name: str,
|
||||
firmware: Firmware | None = None,
|
||||
) -> Firmware:
|
||||
firmware_path = fs_firmware_handler.get_firmware_fs_structure(platform.fs_slug)
|
||||
|
||||
log.info(f"\t · {file_name}")
|
||||
|
||||
# Set default properties
|
||||
firmware_attrs = {
|
||||
"id": firmware.id if firmware else None,
|
||||
"platform_id": platform.id,
|
||||
}
|
||||
|
||||
file_size = fs_firmware_handler.get_firmware_file_size(
|
||||
firmware_path=firmware_path,
|
||||
file_name=file_name,
|
||||
)
|
||||
|
||||
firmware_attrs.update(
|
||||
{
|
||||
"file_path": firmware_path,
|
||||
"file_name": file_name,
|
||||
"file_name_no_tags": fs_firmware_handler.get_file_name_with_no_tags(
|
||||
file_name
|
||||
),
|
||||
"file_name_no_ext": fs_firmware_handler.get_file_name_with_no_extension(
|
||||
file_name
|
||||
),
|
||||
"file_extension": fs_firmware_handler.parse_file_extension(
|
||||
file_name
|
||||
),
|
||||
"file_size_bytes": file_size,
|
||||
}
|
||||
)
|
||||
|
||||
return Firmware(**firmware_attrs)
|
||||
|
||||
|
||||
async def scan_rom(
|
||||
platform: Platform,
|
||||
rom_attrs: dict,
|
||||
@@ -105,7 +147,7 @@ async def scan_rom(
|
||||
rom: Rom | None = None,
|
||||
metadata_sources: list[str] = ["igdb", "moby"],
|
||||
) -> Rom:
|
||||
roms_path = fs_rom_handler.get_fs_structure(platform.fs_slug)
|
||||
roms_path = fs_rom_handler.get_roms_fs_structure(platform.fs_slug)
|
||||
|
||||
log.info(f"\t · {rom_attrs['file_name']}")
|
||||
|
||||
@@ -222,8 +264,17 @@ async def scan_rom(
|
||||
if (
|
||||
not rom
|
||||
or scan_type == ScanType.COMPLETE
|
||||
or (scan_type == ScanType.PARTIAL and rom and (not rom.igdb_id or not rom.moby_id))
|
||||
or (scan_type == ScanType.UNIDENTIFIED and rom and not rom.igdb_id and not rom.moby_id)
|
||||
or (
|
||||
scan_type == ScanType.PARTIAL
|
||||
and rom
|
||||
and (not rom.igdb_id or not rom.moby_id)
|
||||
)
|
||||
or (
|
||||
scan_type == ScanType.UNIDENTIFIED
|
||||
and rom
|
||||
and not rom.igdb_id
|
||||
and not rom.moby_id
|
||||
)
|
||||
):
|
||||
rom_attrs.update(
|
||||
fs_resource_handler.get_rom_cover(
|
||||
|
||||
@@ -33,23 +33,6 @@ class BaseAsset(BaseModel):
|
||||
@cached_property
|
||||
def download_path(self) -> str:
|
||||
return f"/api/raw/assets/{self.full_path}?timestamp={self.updated_at}"
|
||||
|
||||
|
||||
class PlatformAsset(BaseAsset):
|
||||
__abstract__ = True
|
||||
|
||||
platform_id = Column(
|
||||
Integer(), ForeignKey("platforms.id", ondelete="CASCADE"), nullable=False
|
||||
)
|
||||
|
||||
|
||||
class Firmware(PlatformAsset):
|
||||
# Represents a BIOS or firmware file
|
||||
|
||||
__tablename__ = "firmwares"
|
||||
__table_args__ = {"extend_existing": True}
|
||||
|
||||
platform = relationship("Platform", lazy="selectin", back_populates="firmwares")
|
||||
|
||||
|
||||
class RomAsset(BaseAsset):
|
||||
|
||||
48
backend/models/firmware.py
Normal file
48
backend/models/firmware.py
Normal file
@@ -0,0 +1,48 @@
|
||||
from functools import cached_property
|
||||
from sqlalchemy.orm import relationship
|
||||
from sqlalchemy import (
|
||||
Column,
|
||||
ForeignKey,
|
||||
Integer,
|
||||
String,
|
||||
BigInteger,
|
||||
)
|
||||
|
||||
from models.base import BaseModel
|
||||
|
||||
|
||||
class Firmware(BaseModel):
|
||||
__tablename__ = "firmware"
|
||||
|
||||
id = Column(Integer(), primary_key=True, autoincrement=True)
|
||||
platform_id = Column(
|
||||
Integer(), ForeignKey("platforms.id", ondelete="CASCADE"), nullable=False
|
||||
)
|
||||
|
||||
file_name = Column(String(length=450), nullable=False)
|
||||
file_name_no_tags = Column(String(length=450), nullable=False)
|
||||
file_name_no_ext = Column(String(length=450), nullable=False)
|
||||
file_extension = Column(String(length=100), nullable=False)
|
||||
file_path = Column(String(length=1000), nullable=False)
|
||||
file_size_bytes = Column(BigInteger(), default=0, nullable=False)
|
||||
|
||||
platform = relationship("Platform", lazy="selectin", back_populates="firmware")
|
||||
|
||||
@property
|
||||
def platform_slug(self) -> str:
|
||||
return self.platform.slug
|
||||
|
||||
@property
|
||||
def platform_fs_slug(self) -> str:
|
||||
return self.platform.fs_slug
|
||||
|
||||
@property
|
||||
def platform_name(self) -> str:
|
||||
return self.platform.name
|
||||
|
||||
@cached_property
|
||||
def full_path(self) -> str:
|
||||
return f"{self.file_path}/{self.file_name}"
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return self.file_name
|
||||
@@ -1,5 +1,6 @@
|
||||
from models.base import BaseModel
|
||||
from models.rom import Rom
|
||||
from models.firmware import Firmware
|
||||
from sqlalchemy import Column, Integer, String
|
||||
from sqlalchemy.orm import Mapped, relationship
|
||||
|
||||
@@ -19,6 +20,9 @@ class Platform(BaseModel):
|
||||
roms: Mapped[set[Rom]] = relationship(
|
||||
"Rom", lazy="selectin", back_populates="platform"
|
||||
)
|
||||
firmware: Mapped[set[Firmware]] = relationship(
|
||||
"Firmware", lazy="selectin", back_populates="platform"
|
||||
)
|
||||
|
||||
@property
|
||||
def rom_count(self) -> int:
|
||||
|
||||
@@ -13,6 +13,7 @@ export type ConfigResponse = {
|
||||
PLATFORMS_BINDING: Record<string, string>;
|
||||
PLATFORMS_VERSIONS: Record<string, string>;
|
||||
ROMS_FOLDER_NAME: string;
|
||||
FIRMWARE_FOLDER_NAME: string;
|
||||
HIGH_PRIO_STRUCTURE_PATH: string;
|
||||
};
|
||||
|
||||
|
||||
Reference in New Issue
Block a user