mirror of
https://github.com/rommapp/romm.git
synced 2026-06-28 14:56:01 +00:00
@@ -3,7 +3,6 @@ import sys
|
||||
from pathlib import Path
|
||||
from typing import Final
|
||||
from urllib.parse import quote_plus
|
||||
|
||||
import pydash
|
||||
import yaml
|
||||
from config import (
|
||||
@@ -69,7 +68,7 @@ class ConfigManager:
|
||||
with open(config_file, "w") as file:
|
||||
file.write("")
|
||||
try:
|
||||
self.read_config()
|
||||
self.get_config()
|
||||
except ConfigNotReadableException as e:
|
||||
log.critical(e.message)
|
||||
sys.exit(5)
|
||||
@@ -96,9 +95,7 @@ class ConfigManager:
|
||||
|
||||
# DEPRECATED
|
||||
if ROMM_DB_DRIVER == "sqlite":
|
||||
log.critical(
|
||||
"Sqlite is not supported anymore, migrate to mariaDB"
|
||||
)
|
||||
log.critical("Sqlite is not supported anymore, migrate to mariaDB")
|
||||
sys.exit(6)
|
||||
# DEPRECATED
|
||||
|
||||
@@ -200,7 +197,7 @@ class ConfigManager:
|
||||
)
|
||||
sys.exit(3)
|
||||
|
||||
def read_config(self) -> None:
|
||||
def get_config(self) -> None:
|
||||
try:
|
||||
with open(self.config_file) as config_file:
|
||||
self._raw_config = yaml.load(config_file, Loader=SafeLoader) or {}
|
||||
@@ -209,10 +206,37 @@ class ConfigManager:
|
||||
except PermissionError:
|
||||
self._raw_config = {}
|
||||
raise ConfigNotReadableException
|
||||
|
||||
self._parse_config()
|
||||
self._validate_config()
|
||||
|
||||
def update_config(self) -> None:
|
||||
return self.config
|
||||
|
||||
def update_config_file(self) -> None:
|
||||
self._raw_config = {
|
||||
"exclude": {
|
||||
"platforms": self.config.EXCLUDED_PLATFORMS,
|
||||
"roms": {
|
||||
"single_file": {
|
||||
"extensions": self.config.EXCLUDED_SINGLE_EXT,
|
||||
"names": self.config.EXCLUDED_SINGLE_FILES,
|
||||
},
|
||||
"multi_file": {
|
||||
"names": self.config.EXCLUDED_MULTI_FILES,
|
||||
"parts": {
|
||||
"extensions": self.config.EXCLUDED_MULTI_PARTS_EXT,
|
||||
"names": self.config.EXCLUDED_MULTI_PARTS_FILES,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
"filesystem": {"roms_folder": self.config.ROMS_FOLDER_NAME},
|
||||
"system": {
|
||||
"platforms": self.config.PLATFORMS_BINDING,
|
||||
"versions": self.config.PLATFORMS_VERSIONS,
|
||||
},
|
||||
}
|
||||
|
||||
try:
|
||||
with open(self.config_file, "w") as config_file:
|
||||
yaml.dump(self._raw_config, config_file)
|
||||
@@ -221,66 +245,65 @@ class ConfigManager:
|
||||
except PermissionError:
|
||||
self._raw_config = {}
|
||||
raise ConfigNotWritableException
|
||||
finally:
|
||||
self._parse_config()
|
||||
|
||||
def add_binding(self, fs_slug: str, slug: str) -> None:
|
||||
try:
|
||||
_ = self._raw_config["system"]
|
||||
except KeyError:
|
||||
self._raw_config = {"system": {"platforms": {}}}
|
||||
try:
|
||||
_ = self._raw_config["system"]["platforms"]
|
||||
except KeyError:
|
||||
self._raw_config["system"]["platforms"] = {}
|
||||
self._raw_config["system"]["platforms"][fs_slug] = slug
|
||||
self.update_config()
|
||||
def add_platform_binding(self, fs_slug: str, slug: str) -> None:
|
||||
platform_bindings = self.config.PLATFORMS_BINDING
|
||||
if fs_slug in platform_bindings:
|
||||
log.warn(f"Binding for {fs_slug} already exists")
|
||||
return
|
||||
|
||||
platform_bindings[fs_slug] = slug
|
||||
self.config.PLATFORMS_BINDING = platform_bindings
|
||||
self.update_config_file()
|
||||
|
||||
def remove_platform_binding(self, fs_slug: str) -> None:
|
||||
platform_bindings = self.config.PLATFORMS_BINDING
|
||||
|
||||
def remove_binding(self, fs_slug: str) -> None:
|
||||
try:
|
||||
del self._raw_config["system"]["platforms"][fs_slug]
|
||||
del platform_bindings[fs_slug]
|
||||
except KeyError:
|
||||
pass
|
||||
self.update_config()
|
||||
|
||||
def add_version(self, fs_slug: str, slug: str) -> None:
|
||||
try:
|
||||
_ = self._raw_config["system"]
|
||||
except KeyError:
|
||||
self._raw_config = {"system": {"versions": {}}}
|
||||
try:
|
||||
_ = self._raw_config["system"]["versions"]
|
||||
except KeyError:
|
||||
self._raw_config["system"]["versions"] = {}
|
||||
self._raw_config["system"]["versions"][fs_slug] = slug
|
||||
self.update_config()
|
||||
self.config.PLATFORMS_BINDING = platform_bindings
|
||||
self.update_config_file()
|
||||
|
||||
def add_platform_version(self, fs_slug: str, slug: str) -> None:
|
||||
platform_versions = self.config.PLATFORMS_VERSIONS
|
||||
if fs_slug in platform_versions:
|
||||
log.warn(f"Version for {fs_slug} already exists")
|
||||
return
|
||||
|
||||
platform_versions[fs_slug] = slug
|
||||
self.config.PLATFORMS_VERSIONS = platform_versions
|
||||
self.update_config_file()
|
||||
|
||||
def remove_platform_version(self, fs_slug: str) -> None:
|
||||
platform_versions = self.config.PLATFORMS_VERSIONS
|
||||
|
||||
def remove_version(self, fs_slug: str) -> None:
|
||||
try:
|
||||
del self._raw_config["system"]["versions"][fs_slug]
|
||||
del platform_versions[fs_slug]
|
||||
except KeyError:
|
||||
pass
|
||||
self.update_config()
|
||||
|
||||
# def _get_exclude_path(self, exclude):
|
||||
# exclude_base = self._raw_config["exclude"]
|
||||
# exclusions = {
|
||||
# "platforms": exclude_base["platforms"],
|
||||
# "single_ext": exclude_base["roms"]["single_file"]["extensions"],
|
||||
# "single_file": exclude_base["roms"]["single_file"]["names"],
|
||||
# "multi_file": exclude_base["roms"]["multi_file"]["names"],
|
||||
# "multi_part_ext": exclude_base["roms"]["multi_file"]["parts"]["extensions"],
|
||||
# "multi_part_file": exclude_base["roms"]["multi_file"]["parts"]["names"],
|
||||
# }
|
||||
# return exclusions[exclude]
|
||||
self.config.PLATFORMS_VERSIONS = platform_versions
|
||||
self.update_config_file()
|
||||
|
||||
# def add_exclusion(self, exclude: str, exclusion: str):
|
||||
# config = self._get_exclude_path(exclude)
|
||||
# config.append(exclusion)
|
||||
def add_exclusion(self, config_key: str, exclusion: str):
|
||||
config_item = self.config.__getattribute__(config_key)
|
||||
config_item.append(exclusion)
|
||||
self.config.__setattr__(config_key, config_item)
|
||||
self.update_config_file()
|
||||
|
||||
# def remove_exclusion(self, exclude: str, exclusion: str):
|
||||
# config = self._get_exclude_path(exclude)
|
||||
# config.remove(exclusion)
|
||||
def remove_exclusion(self, config_key: str, exclusion: str):
|
||||
config_item = self.config.__getattribute__(config_key)
|
||||
|
||||
try:
|
||||
config_item.remove(exclusion)
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
self.config.__setattr__(config_key, config_item)
|
||||
self.update_config_file()
|
||||
|
||||
|
||||
config_manager = ConfigManager()
|
||||
|
||||
@@ -21,15 +21,13 @@ def get_config() -> ConfigResponse:
|
||||
"""
|
||||
|
||||
try:
|
||||
cm.read_config()
|
||||
return cm.get_config().__dict__
|
||||
except ConfigNotReadableException as e:
|
||||
log.critical(e.message)
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=e.message
|
||||
)
|
||||
|
||||
return cm.config.__dict__
|
||||
|
||||
|
||||
@protected_route(router.post, "/config/system/platforms", ["platforms.write"])
|
||||
async def add_platform_binding(request: Request) -> MessageResponse:
|
||||
@@ -40,7 +38,7 @@ async def add_platform_binding(request: Request) -> MessageResponse:
|
||||
slug = data["slug"]
|
||||
|
||||
try:
|
||||
cm.add_binding(fs_slug, slug)
|
||||
cm.add_platform_binding(fs_slug, slug)
|
||||
except ConfigNotWritableException as e:
|
||||
log.critical(e.message)
|
||||
raise HTTPException(
|
||||
@@ -57,7 +55,7 @@ async def delete_platform_binding(request: Request, fs_slug: str) -> MessageResp
|
||||
"""Delete platform binding from the configuration"""
|
||||
|
||||
try:
|
||||
cm.remove_binding(fs_slug)
|
||||
cm.remove_platform_binding(fs_slug)
|
||||
except ConfigNotWritableException as e:
|
||||
log.critical(e.message)
|
||||
raise HTTPException(
|
||||
@@ -76,7 +74,7 @@ async def add_platform_version(request: Request) -> MessageResponse:
|
||||
slug = data["slug"]
|
||||
|
||||
try:
|
||||
cm.add_version(fs_slug, slug)
|
||||
cm.add_platform_version(fs_slug, slug)
|
||||
except ConfigNotWritableException as e:
|
||||
log.critical(e.message)
|
||||
raise HTTPException(
|
||||
@@ -93,7 +91,7 @@ async def delete_platform_version(request: Request, fs_slug: str) -> MessageResp
|
||||
"""Delete platform version from the configuration"""
|
||||
|
||||
try:
|
||||
cm.remove_version(fs_slug)
|
||||
cm.remove_platform_version(fs_slug)
|
||||
except ConfigNotWritableException as e:
|
||||
log.critical(e.message)
|
||||
raise HTTPException(
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import os
|
||||
from datetime import datetime
|
||||
from stat import S_IFREG
|
||||
from typing import Annotated, Optional
|
||||
@@ -24,7 +25,7 @@ from handler import (
|
||||
)
|
||||
from handler.fs_handler import CoverSize
|
||||
from logger.logger import log
|
||||
from stream_zip import ZIP_64, stream_zip # type: ignore[import]
|
||||
from stream_zip import ZIP_AUTO, stream_zip # type: ignore[import]
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
@@ -130,13 +131,14 @@ def get_rom(request: Request, id: int) -> RomSchema:
|
||||
return RomSchema.from_orm_with_request(db_rom_handler.get_roms(id), request)
|
||||
|
||||
|
||||
@protected_route(router.head, "/roms/{id}/content", ["roms.read"])
|
||||
def head_rom_content(request: Request, id: int):
|
||||
@protected_route(router.head, "/roms/{id}/content/{file_name}", ["roms.read"])
|
||||
def head_rom_content(request: Request, id: int, file_name: str):
|
||||
"""Head rom content endpoint
|
||||
|
||||
Args:
|
||||
request (Request): Fastapi Request object
|
||||
id (int): Rom internal id
|
||||
file_name (str): Required due to a bug in emulatorjs
|
||||
|
||||
Returns:
|
||||
FileResponse: Returns the response with headers
|
||||
@@ -147,7 +149,7 @@ def head_rom_content(request: Request, id: int):
|
||||
|
||||
return FileResponse(
|
||||
path=rom_path if not rom.multi else f"{rom_path}/{rom.files[0]}",
|
||||
filename=rom.file_name,
|
||||
filename=file_name,
|
||||
headers={
|
||||
"Content-Disposition": f"attachment; filename={rom.name}.zip",
|
||||
"Content-Type": "application/zip",
|
||||
@@ -156,9 +158,12 @@ def head_rom_content(request: Request, id: int):
|
||||
)
|
||||
|
||||
|
||||
@protected_route(router.get, "/roms/{id}/content", ["roms.read"])
|
||||
@protected_route(router.get, "/roms/{id}/content/{file_name}", ["roms.read"])
|
||||
def get_rom_content(
|
||||
request: Request, id: int, files: Annotated[list[str] | None, Query()] = None
|
||||
request: Request,
|
||||
id: int,
|
||||
file_name: str,
|
||||
files: Annotated[list[str] | None, Query()] = None,
|
||||
):
|
||||
"""Download rom endpoint (one single file or multiple zipped files for multi-part roms)
|
||||
|
||||
@@ -178,28 +183,35 @@ def get_rom_content(
|
||||
rom_path = f"{LIBRARY_BASE_PATH}/{rom.full_path}"
|
||||
|
||||
if not rom.multi:
|
||||
return FileResponse(path=rom_path, filename=rom.file_name)
|
||||
return FileResponse(path=rom_path, filename=file_name)
|
||||
|
||||
# Builds a generator of tuples for each member file
|
||||
def local_files():
|
||||
def contents(file_name):
|
||||
def contents(f):
|
||||
try:
|
||||
with open(f"{rom_path}/{file_name}", "rb") as f:
|
||||
with open(f"{rom_path}/{f}", "rb") as f:
|
||||
while chunk := f.read(65536):
|
||||
yield chunk
|
||||
except FileNotFoundError:
|
||||
log.error(f"File {rom_path}/{file_name} not found!")
|
||||
log.error(f"File {rom_path}/{f} not found!")
|
||||
|
||||
m3u_file = [str.encode(f"{rom.files[i]}\n") for i in range(len(rom.files))]
|
||||
return [
|
||||
(file_name, datetime.now(), S_IFREG | 0o600, ZIP_64, contents(file_name))
|
||||
for file_name in rom.files
|
||||
] + [
|
||||
(
|
||||
f"{rom.file_name}.m3u",
|
||||
f,
|
||||
datetime.now(),
|
||||
S_IFREG | 0o600,
|
||||
ZIP_64,
|
||||
[str.encode(f"{rom.files[i]}\n") for i in range(len(rom.files))],
|
||||
ZIP_AUTO(os.path.getsize(f"{rom_path}/{f}")),
|
||||
contents(f),
|
||||
)
|
||||
for f in rom.files
|
||||
] + [
|
||||
(
|
||||
f"{file_name}.m3u",
|
||||
datetime.now(),
|
||||
S_IFREG | 0o600,
|
||||
ZIP_AUTO(sum([len(f) for f in m3u_file])),
|
||||
m3u_file,
|
||||
)
|
||||
]
|
||||
|
||||
@@ -209,7 +221,7 @@ def get_rom_content(
|
||||
return CustomStreamingResponse(
|
||||
zipped_chunks,
|
||||
media_type="application/zip",
|
||||
headers={"Content-Disposition": f"attachment; filename={rom.file_name}.zip"},
|
||||
headers={"Content-Disposition": f"attachment; filename={file_name}.zip"},
|
||||
emit_body={"id": rom.id},
|
||||
)
|
||||
|
||||
|
||||
@@ -43,86 +43,93 @@ async def scan_platforms(
|
||||
|
||||
sm = _get_socket_manager()
|
||||
|
||||
# Scanning file system
|
||||
try:
|
||||
fs_platforms: list[str] = fs_platform_handler.get_platforms()
|
||||
except FolderStructureNotMatchException as e:
|
||||
log.error(e)
|
||||
await sm.emit("scan:done_ko", e.message)
|
||||
return
|
||||
|
||||
platform_list = [
|
||||
db_platform_handler.get_platforms(s).fs_slug for s in platform_ids
|
||||
] or fs_platforms
|
||||
|
||||
if len(platform_list) == 0:
|
||||
log.warn(
|
||||
"⚠️ No platforms found, verify that the folder structure is right and the volume is mounted correctly "
|
||||
)
|
||||
else:
|
||||
log.info(f"Found {len(platform_list)} platforms in file system ")
|
||||
|
||||
for platform_slug in platform_list:
|
||||
platform = db_platform_handler.get_platform_by_fs_slug(platform_slug)
|
||||
scanned_platform = scan_platform(platform_slug, fs_platforms)
|
||||
|
||||
if platform:
|
||||
scanned_platform.id = platform.id
|
||||
|
||||
platform = db_platform_handler.add_platform(scanned_platform)
|
||||
|
||||
await sm.emit(
|
||||
"scan:scanning_platform",
|
||||
PlatformSchema.model_validate(platform).model_dump(),
|
||||
)
|
||||
|
||||
# Scanning roms
|
||||
# Scanning file system
|
||||
try:
|
||||
fs_roms = fs_rom_handler.get_roms(platform)
|
||||
except RomsNotFoundException as e:
|
||||
fs_platforms: list[str] = fs_platform_handler.get_platforms()
|
||||
except FolderStructureNotMatchException as e:
|
||||
log.error(e)
|
||||
continue
|
||||
await sm.emit("scan:done_ko", e.message)
|
||||
return
|
||||
|
||||
if len(fs_roms) == 0:
|
||||
log.warning(
|
||||
" ⚠️ No roms found, verify that the folder structure is correct"
|
||||
platform_list = [
|
||||
db_platform_handler.get_platforms(s).fs_slug for s in platform_ids
|
||||
] or fs_platforms
|
||||
|
||||
if len(platform_list) == 0:
|
||||
log.warn(
|
||||
"⚠️ No platforms found, verify that the folder structure is right and the volume is mounted correctly "
|
||||
)
|
||||
else:
|
||||
log.info(f" {len(fs_roms)} roms found")
|
||||
log.info(f"Found {len(platform_list)} platforms in file system ")
|
||||
|
||||
for fs_rom in fs_roms:
|
||||
rom = db_rom_handler.get_rom_by_filename(platform.id, fs_rom["file_name"])
|
||||
if (
|
||||
not rom
|
||||
or rom.id in selected_roms
|
||||
or complete_rescan
|
||||
or (rescan_unidentified and not rom.igdb_id)
|
||||
):
|
||||
scanned_rom = await scan_rom(platform, fs_rom)
|
||||
if rom:
|
||||
scanned_rom.id = rom.id
|
||||
for platform_slug in platform_list:
|
||||
platform = db_platform_handler.get_platform_by_fs_slug(platform_slug)
|
||||
scanned_platform = scan_platform(platform_slug, fs_platforms)
|
||||
|
||||
scanned_rom.platform_id = platform.id
|
||||
_added_rom = db_rom_handler.add_rom(scanned_rom)
|
||||
rom = db_rom_handler.get_roms(_added_rom.id)
|
||||
if platform:
|
||||
scanned_platform.id = platform.id
|
||||
|
||||
await sm.emit(
|
||||
"scan:scanning_rom",
|
||||
{
|
||||
"platform_name": platform.name,
|
||||
"platform_slug": platform.slug,
|
||||
**RomSchema.model_validate(rom).model_dump(),
|
||||
},
|
||||
)
|
||||
platform = db_platform_handler.add_platform(scanned_platform)
|
||||
|
||||
db_rom_handler.purge_roms(
|
||||
platform.id, [rom["file_name"] for rom in fs_roms]
|
||||
await sm.emit(
|
||||
"scan:scanning_platform",
|
||||
PlatformSchema.model_validate(platform).model_dump(),
|
||||
)
|
||||
db_platform_handler.purge_platforms(fs_platforms)
|
||||
|
||||
log.info(emoji.emojize(":check_mark: Scan completed "))
|
||||
# Scanning roms
|
||||
try:
|
||||
fs_roms = fs_rom_handler.get_roms(platform)
|
||||
except RomsNotFoundException as e:
|
||||
log.error(e)
|
||||
continue
|
||||
|
||||
await sm.emit("scan:done", {})
|
||||
if len(fs_roms) == 0:
|
||||
log.warning(
|
||||
" ⚠️ No roms found, verify that the folder structure is correct"
|
||||
)
|
||||
else:
|
||||
log.info(f" {len(fs_roms)} roms found")
|
||||
|
||||
for fs_rom in fs_roms:
|
||||
rom = db_rom_handler.get_rom_by_filename(
|
||||
platform.id, fs_rom["file_name"]
|
||||
)
|
||||
if (
|
||||
not rom
|
||||
or rom.id in selected_roms
|
||||
or complete_rescan
|
||||
or (rescan_unidentified and not rom.igdb_id)
|
||||
):
|
||||
scanned_rom = await scan_rom(platform, fs_rom)
|
||||
if rom:
|
||||
scanned_rom.id = rom.id
|
||||
|
||||
scanned_rom.platform_id = platform.id
|
||||
_added_rom = db_rom_handler.add_rom(scanned_rom)
|
||||
rom = db_rom_handler.get_roms(_added_rom.id)
|
||||
|
||||
await sm.emit(
|
||||
"scan:scanning_rom",
|
||||
{
|
||||
"platform_name": platform.name,
|
||||
"platform_slug": platform.slug,
|
||||
**RomSchema.model_validate(rom).model_dump(),
|
||||
},
|
||||
)
|
||||
|
||||
db_rom_handler.purge_roms(
|
||||
platform.id, [rom["file_name"] for rom in fs_roms]
|
||||
)
|
||||
db_platform_handler.purge_platforms(fs_platforms)
|
||||
|
||||
log.info(emoji.emojize(":check_mark: Scan completed "))
|
||||
|
||||
await sm.emit("scan:done", {})
|
||||
except Exception as e:
|
||||
log.error(e)
|
||||
# Catch all exceptions and emit error to the client
|
||||
await sm.emit("scan:done_ko", str(e))
|
||||
|
||||
|
||||
@socket_handler.socket_server.on("scan")
|
||||
|
||||
@@ -45,7 +45,7 @@ def platforms_webrcade_feed(request: Request) -> WebrcadeFeedSchema:
|
||||
"type": WEBRCADE_SLUG_TO_TYPE_MAP.get(p.slug, p.slug),
|
||||
"thumbnail": f"{ROMM_HOST}/assets/romm/resources/{rom.path_cover_s}",
|
||||
"background": f"{ROMM_HOST}/assets/romm/resources/{rom.path_cover_l}",
|
||||
"props": {"rom": f"{ROMM_HOST}/api/roms/{rom.id}/content"},
|
||||
"props": {"rom": f"{ROMM_HOST}/api/roms/{rom.id}/content/{rom.file_name}"},
|
||||
}
|
||||
for rom in session.scalars(db_rom_handler.get_roms(platform_id=p.id)).all()
|
||||
],
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import os
|
||||
|
||||
from config import LIBRARY_BASE_PATH
|
||||
from config.config_manager import config_manager as cm
|
||||
from config.config_manager import config_manager as cm, Config
|
||||
from exceptions.fs_exceptions import FolderStructureNotMatchException
|
||||
from handler.fs_handler import FSHandler
|
||||
|
||||
@@ -10,11 +10,11 @@ class FSPlatformsHandler(FSHandler):
|
||||
def __init__(self) -> None:
|
||||
pass
|
||||
|
||||
def _exclude_platforms(self, platforms: list):
|
||||
def _exclude_platforms(self, config: Config, platforms: list):
|
||||
return [
|
||||
platform
|
||||
for platform in platforms
|
||||
if platform not in cm.config.EXCLUDED_PLATFORMS
|
||||
if platform not in config.EXCLUDED_PLATFORMS
|
||||
]
|
||||
|
||||
def get_platforms(self) -> list[str]:
|
||||
@@ -23,12 +23,14 @@ class FSPlatformsHandler(FSHandler):
|
||||
Returns list with all the filesystem platforms found in the LIBRARY_BASE_PATH.
|
||||
Automatically exclude folders defined in user config.
|
||||
"""
|
||||
cnfg = cm.get_config()
|
||||
|
||||
try:
|
||||
platforms: list[str] = (
|
||||
list(os.walk(cm.config.HIGH_PRIO_STRUCTURE_PATH))[0][1]
|
||||
if os.path.exists(cm.config.HIGH_PRIO_STRUCTURE_PATH)
|
||||
list(os.walk(cnfg.HIGH_PRIO_STRUCTURE_PATH))[0][1]
|
||||
if os.path.exists(cnfg.HIGH_PRIO_STRUCTURE_PATH)
|
||||
else list(os.walk(LIBRARY_BASE_PATH))[0][1]
|
||||
)
|
||||
return self._exclude_platforms(platforms)
|
||||
return self._exclude_platforms(cnfg, platforms)
|
||||
except IndexError as exc:
|
||||
raise FolderStructureNotMatchException from exc
|
||||
|
||||
@@ -23,10 +23,11 @@ class FSRomsHandler(FSHandler):
|
||||
pass
|
||||
|
||||
def get_fs_structure(self, fs_slug: str):
|
||||
cnfg = cm.get_config()
|
||||
return (
|
||||
f"{cm.config.ROMS_FOLDER_NAME}/{fs_slug}"
|
||||
if os.path.exists(cm.config.HIGH_PRIO_STRUCTURE_PATH)
|
||||
else f"{fs_slug}/{cm.config.ROMS_FOLDER_NAME}"
|
||||
f"{cnfg.ROMS_FOLDER_NAME}/{fs_slug}"
|
||||
if os.path.exists(cnfg.HIGH_PRIO_STRUCTURE_PATH)
|
||||
else f"{fs_slug}/{cnfg.ROMS_FOLDER_NAME}"
|
||||
)
|
||||
|
||||
def remove_file(self, file_name: str, file_path: str):
|
||||
@@ -81,8 +82,9 @@ class FSRomsHandler(FSHandler):
|
||||
return regs, rev, langs, other_tags
|
||||
|
||||
def _exclude_files(self, files, filetype) -> list[str]:
|
||||
excluded_extensions = getattr(cm.config, f"EXCLUDED_{filetype.upper()}_EXT")
|
||||
excluded_names = getattr(cm.config, f"EXCLUDED_{filetype.upper()}_FILES")
|
||||
cnfg = cm.get_config()
|
||||
excluded_extensions = getattr(cnfg, f"EXCLUDED_{filetype.upper()}_EXT")
|
||||
excluded_names = getattr(cnfg, f"EXCLUDED_{filetype.upper()}_FILES")
|
||||
excluded_files: list = []
|
||||
|
||||
for file_name in files:
|
||||
@@ -105,7 +107,7 @@ class FSRomsHandler(FSHandler):
|
||||
return [f for f in files if f not in excluded_files]
|
||||
|
||||
def _exclude_multi_roms(self, roms) -> list[str]:
|
||||
excluded_names = cm.config.EXCLUDED_MULTI_FILES
|
||||
excluded_names = cm.get_config().EXCLUDED_MULTI_FILES
|
||||
filtered_files: list = []
|
||||
|
||||
for rom in roms:
|
||||
|
||||
@@ -117,9 +117,9 @@ def test_rom_size():
|
||||
def test_exclude_files():
|
||||
from config.config_manager import config_manager as cm
|
||||
|
||||
cm.config.EXCLUDED_SINGLE_FILES = ["Super Mario 64 (J) (Rev A) [Part 1].z64"]
|
||||
cm.add_exclusion("EXCLUDED_SINGLE_FILES", "Super Mario 64 (J) (Rev A) [Part 1].z64")
|
||||
|
||||
patch("utils.fs.config", cm.config)
|
||||
patch("utils.fs.config", cm.get_config())
|
||||
|
||||
filtered_files = fs_rom_handler._exclude_files(
|
||||
files=[
|
||||
@@ -131,7 +131,7 @@ def test_exclude_files():
|
||||
|
||||
assert len(filtered_files) == 1
|
||||
|
||||
cm.config.EXCLUDED_SINGLE_EXT = ["z64"]
|
||||
cm.add_exclusion("EXCLUDED_SINGLE_EXT", "z64")
|
||||
|
||||
filtered_files = fs_rom_handler._exclude_files(
|
||||
files=[
|
||||
@@ -143,7 +143,7 @@ def test_exclude_files():
|
||||
|
||||
assert len(filtered_files) == 0
|
||||
|
||||
cm.config.EXCLUDED_SINGLE_FILES = ["*.z64"]
|
||||
cm.add_exclusion("EXCLUDED_SINGLE_FILES", "*.z64")
|
||||
|
||||
filtered_files = fs_rom_handler._exclude_files(
|
||||
files=[
|
||||
@@ -155,7 +155,7 @@ def test_exclude_files():
|
||||
|
||||
assert len(filtered_files) == 0
|
||||
|
||||
cm.config.EXCLUDED_SINGLE_FILES = ["_.*"]
|
||||
cm.add_exclusion("EXCLUDED_SINGLE_FILES", "_.*")
|
||||
|
||||
filtered_files = fs_rom_handler._exclude_files(
|
||||
files=[
|
||||
|
||||
@@ -316,13 +316,13 @@ class IGDBHandler:
|
||||
with open(MAME_XML_FILE, "r") as index_xml:
|
||||
mame_index = xmltodict.parse(index_xml.read())
|
||||
except FileNotFoundError:
|
||||
log.warning("Fetching the MAME XML file from HyperspinFE...")
|
||||
log.warning("Fetching the MAME XML file from Github...")
|
||||
await update_mame_xml_task.run(force=True)
|
||||
try:
|
||||
with open(MAME_XML_FILE, "r") as index_xml:
|
||||
mame_index = xmltodict.parse(index_xml.read())
|
||||
except FileNotFoundError:
|
||||
log.error("Could not fetch the MAME XML file from HyperspinFE")
|
||||
log.error("Could not fetch the MAME XML file from Github")
|
||||
finally:
|
||||
index_entry = [
|
||||
game
|
||||
|
||||
@@ -15,12 +15,12 @@ from models.platform import Platform
|
||||
from models.rom import Rom
|
||||
from models.user import User
|
||||
|
||||
SWAPPED_PLATFORM_BINDINGS = dict((v, k) for k, v in cm.config.PLATFORMS_BINDING.items())
|
||||
|
||||
|
||||
def _get_main_platform_igdb_id(platform: Platform):
|
||||
if platform.fs_slug in cm.config.PLATFORMS_VERSIONS.keys():
|
||||
main_platform_slug = cm.config.PLATFORMS_VERSIONS[platform.fs_slug]
|
||||
cnfg = cm.get_config()
|
||||
|
||||
if platform.fs_slug in cnfg.PLATFORMS_VERSIONS.keys():
|
||||
main_platform_slug = cnfg.PLATFORMS_VERSIONS[platform.fs_slug]
|
||||
main_platform = db_platform_handler.get_platform_by_fs_slug(main_platform_slug)
|
||||
if main_platform:
|
||||
main_platform_igdb_id = main_platform.igdb_id
|
||||
@@ -49,19 +49,22 @@ def scan_platform(fs_slug: str, fs_platforms) -> Platform:
|
||||
platform_attrs: dict[str, Any] = {}
|
||||
platform_attrs["fs_slug"] = fs_slug
|
||||
|
||||
cnfg = cm.get_config()
|
||||
swapped_platform_bindings = dict((v, k) for k, v in cnfg.PLATFORMS_BINDING.items())
|
||||
|
||||
# Sometimes users change the name of the folder, so we try to match it with the config
|
||||
if fs_slug not in fs_platforms:
|
||||
log.warning(
|
||||
f" {fs_slug} not found in file system, trying to match via config..."
|
||||
)
|
||||
if fs_slug in SWAPPED_PLATFORM_BINDINGS.keys():
|
||||
if fs_slug in swapped_platform_bindings.keys():
|
||||
platform = db_platform_handler.get_platform_by_fs_slug(fs_slug)
|
||||
if platform:
|
||||
platform_attrs["fs_slug"] = SWAPPED_PLATFORM_BINDINGS[platform.slug]
|
||||
platform_attrs["fs_slug"] = swapped_platform_bindings[platform.slug]
|
||||
|
||||
try:
|
||||
if fs_slug in cm.config.PLATFORMS_BINDING.keys():
|
||||
platform_attrs["slug"] = cm.config.PLATFORMS_BINDING[fs_slug]
|
||||
if fs_slug in cnfg.PLATFORMS_BINDING.keys():
|
||||
platform_attrs["slug"] = cnfg.PLATFORMS_BINDING[fs_slug]
|
||||
else:
|
||||
platform_attrs["slug"] = fs_slug
|
||||
except (KeyError, TypeError, AttributeError):
|
||||
|
||||
@@ -15,8 +15,8 @@ from watchdog.events import FileSystemEventHandler
|
||||
from watchdog.observers import Observer
|
||||
|
||||
path = (
|
||||
cm.config.HIGH_PRIO_STRUCTURE_PATH
|
||||
if os.path.exists(cm.config.HIGH_PRIO_STRUCTURE_PATH)
|
||||
cm.get_config().HIGH_PRIO_STRUCTURE_PATH
|
||||
if os.path.exists(cm.get_config().HIGH_PRIO_STRUCTURE_PATH)
|
||||
else LIBRARY_BASE_PATH
|
||||
)
|
||||
|
||||
|
||||
@@ -5,7 +5,6 @@ import storeConfig from "@/stores/config";
|
||||
import storeHeartbeat from "@/stores/heartbeat";
|
||||
import socket from "@/services/socket";
|
||||
import { onBeforeMount } from "vue";
|
||||
import cookie from "js-cookie";
|
||||
import storeGalleryFilter from "@/stores/galleryFilter";
|
||||
import storeRoms, { type Rom } from "@/stores/roms";
|
||||
import storeScanning from "@/stores/scanning";
|
||||
@@ -80,7 +79,7 @@ socket.on("scan:done_ko", (msg) => {
|
||||
scanningStore.set(false);
|
||||
|
||||
emitter?.emit("snackbarShow", {
|
||||
msg: `Scan couldn't be completed. Something went wrong: ${msg}`,
|
||||
msg: `Scan failed: ${msg}`,
|
||||
icon: "mdi-close-circle",
|
||||
color: "red",
|
||||
});
|
||||
@@ -101,8 +100,6 @@ onBeforeMount(() => {
|
||||
api.get("/config").then(({ data: configData }) => {
|
||||
configStore.set(configData);
|
||||
});
|
||||
// Set CSRF token for all requests
|
||||
api.defaults.headers.common["x-csrftoken"] = cookie.get("csrftoken");
|
||||
});
|
||||
</script>
|
||||
|
||||
|
||||
@@ -1,9 +1,16 @@
|
||||
import axios from "axios";
|
||||
import cookie from "js-cookie";
|
||||
|
||||
import router from "@/plugins/router";
|
||||
|
||||
import axios from "axios";
|
||||
|
||||
const api = axios.create({ baseURL: "/api", timeout: 120000 });
|
||||
|
||||
// Set CSRF header for all requests
|
||||
api.interceptors.request.use((config) => {
|
||||
config.headers["x-csrftoken"] = cookie.get("csrftoken");
|
||||
return config;
|
||||
});
|
||||
|
||||
api.interceptors.response.use(
|
||||
(response) => response,
|
||||
(error) => {
|
||||
|
||||
@@ -125,7 +125,7 @@ async function downloadRom({
|
||||
});
|
||||
|
||||
const a = document.createElement("a");
|
||||
a.href = `/api/roms/${rom.id}/content?${files_params}`;
|
||||
a.href = `/api/roms/${rom.id}/content/${rom.file_name}?${files_params}`;
|
||||
a.download = `${rom.name}.zip`;
|
||||
a.click();
|
||||
|
||||
|
||||
@@ -56,7 +56,7 @@ window.EJS_core =
|
||||
props.rom.platform_slug.toLowerCase() as EJSPlatformSlug
|
||||
];
|
||||
window.EJS_gameID = props.rom.id;
|
||||
window.EJS_gameUrl = `/api/roms/${props.rom.id}/content`;
|
||||
window.EJS_gameUrl = `/api/roms/${props.rom.id}/content/${props.rom.file_name}`;
|
||||
window.EJS_player = "#game";
|
||||
window.EJS_pathtodata = "/assets/emulatorjs/";
|
||||
window.EJS_color = "#A453FF";
|
||||
|
||||
Reference in New Issue
Block a user