mirror of
https://github.com/rommapp/romm.git
synced 2026-06-29 23:35:47 +00:00
exclude multi rom part with extension fixed
This commit is contained in:
@@ -2,30 +2,36 @@ import os
|
||||
import sys
|
||||
from urllib.parse import quote_plus
|
||||
|
||||
from config import ROMM_DB_DRIVER, SUPPORTED_DB_DRIVERS, SQLITE_DB_BASE_PATH
|
||||
from config import ROMM_DB_DRIVER, SUPPORTED_DB_DRIVERS, SQLITE_DB_BASE_PATH, user_config
|
||||
from logger.logger import log
|
||||
|
||||
|
||||
def get_db_engine():
|
||||
if ROMM_DB_DRIVER in SUPPORTED_DB_DRIVERS:
|
||||
class ConfigLoader:
|
||||
|
||||
if ROMM_DB_DRIVER == 'mariadb':
|
||||
DB_HOST: str = os.getenv('DB_HOST')
|
||||
try:
|
||||
DB_PORT: int = int(os.getenv('DB_PORT'))
|
||||
except TypeError:
|
||||
log.critical(f"DB_PORT variable not set properly")
|
||||
sys.exit(3)
|
||||
DB_USER: str = os.getenv('DB_USER')
|
||||
DB_PASSWD: str = os.getenv('DB_PASSWD')
|
||||
DB_NAME: str = os.getenv('DB_NAME', 'romm')
|
||||
|
||||
return f"mariadb+mariadbconnector://{DB_USER}:%s@{DB_HOST}:{DB_PORT}/{DB_NAME}" % quote_plus(DB_PASSWD)
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
elif ROMM_DB_DRIVER == 'sqlite':
|
||||
if not os.path.exists(SQLITE_DB_BASE_PATH): os.makedirs(SQLITE_DB_BASE_PATH)
|
||||
return f"sqlite:////{SQLITE_DB_BASE_PATH}/romm.db"
|
||||
|
||||
else:
|
||||
log.critical(f"{ROMM_DB_DRIVER} database not supported")
|
||||
sys.exit(3)
|
||||
def get_db_engine(self):
|
||||
if ROMM_DB_DRIVER in SUPPORTED_DB_DRIVERS:
|
||||
|
||||
if ROMM_DB_DRIVER == 'mariadb':
|
||||
DB_HOST: str = os.getenv('DB_HOST')
|
||||
try:
|
||||
DB_PORT: int = int(os.getenv('DB_PORT'))
|
||||
except TypeError:
|
||||
log.critical(f"DB_PORT variable not set properly")
|
||||
sys.exit(3)
|
||||
DB_USER: str = os.getenv('DB_USER')
|
||||
DB_PASSWD: str = os.getenv('DB_PASSWD')
|
||||
DB_NAME: str = os.getenv('DB_NAME', 'romm')
|
||||
|
||||
return f"mariadb+mariadbconnector://{DB_USER}:%s@{DB_HOST}:{DB_PORT}/{DB_NAME}" % quote_plus(DB_PASSWD)
|
||||
|
||||
elif ROMM_DB_DRIVER == 'sqlite':
|
||||
if not os.path.exists(SQLITE_DB_BASE_PATH): os.makedirs(SQLITE_DB_BASE_PATH)
|
||||
return f"sqlite:////{SQLITE_DB_BASE_PATH}/romm.db"
|
||||
|
||||
else:
|
||||
log.critical(f"{ROMM_DB_DRIVER} database not supported")
|
||||
sys.exit(3)
|
||||
|
||||
@@ -6,7 +6,7 @@ from sqlalchemy.orm import sessionmaker
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.exc import ProgrammingError
|
||||
|
||||
from config.config_loader import get_db_engine
|
||||
from config.config_loader import ConfigLoader
|
||||
from models.platform import Platform
|
||||
from models.rom import Rom
|
||||
|
||||
@@ -14,7 +14,8 @@ from models.rom import Rom
|
||||
class DBHandler:
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.engine = create_engine(get_db_engine(), pool_pre_ping=True)
|
||||
cl = ConfigLoader()
|
||||
self.engine = create_engine(cl.get_db_engine(), pool_pre_ping=True)
|
||||
self.session = sessionmaker(bind=self.engine, expire_on_commit=False)
|
||||
|
||||
|
||||
|
||||
@@ -11,7 +11,71 @@ from handler import dbh
|
||||
from logger.logger import log
|
||||
|
||||
|
||||
# ========= Defaults utils =========
|
||||
# ========= Resources utils =========
|
||||
def _cover_exists(p_slug: str, file_name: str, size: str) -> bool:
|
||||
"""Check if rom cover exists in filesystem
|
||||
|
||||
Args:
|
||||
p_slug: short name of the platform
|
||||
file_name: name of rom file
|
||||
size: size of the cover -> big as 'l' | small as 's'
|
||||
Returns
|
||||
True if cover exists in filesystem else False
|
||||
"""
|
||||
logo_path: str = f"{RESOURCES_BASE_PATH}/{p_slug}/{file_name}_{size}.png"
|
||||
return True if os.path.exists(logo_path) else False
|
||||
|
||||
|
||||
def _store_cover(p_slug: str, file_name: str, url_cover: str, size: str) -> None:
|
||||
"""Store roms resources in filesystem
|
||||
|
||||
Args:
|
||||
p_slug: short name of the platform
|
||||
file_name: name of rom file
|
||||
url_cover: url to get the cover
|
||||
size: size of the cover -> big as 'l' | small as 's'
|
||||
"""
|
||||
cover_file: str = f"{file_name}_{size}.png"
|
||||
cover_path: str = f"{RESOURCES_BASE_PATH}/{p_slug}/"
|
||||
sizes: dict = {'l': 'big', 's': 'small'}
|
||||
res = requests.get(url_cover.replace('t_thumb', f't_cover_{sizes[size]}'), stream=True)
|
||||
if res.status_code == 200:
|
||||
Path(cover_path).mkdir(parents=True, exist_ok=True)
|
||||
with open(f"{cover_path}/{cover_file}", 'wb') as f:
|
||||
shutil.copyfileobj(res.raw, f)
|
||||
log.info(f"{file_name} {sizes[size]} cover downloaded successfully!")
|
||||
else:
|
||||
log.error(f"{file_name} {sizes[size]} cover couldn't be downloaded")
|
||||
|
||||
|
||||
def _get_cover_path(p_slug: str, file_name: str, size: str) -> str:
|
||||
"""Returns rom cover filesystem path adapted to frontend folder structure
|
||||
|
||||
Args:
|
||||
p_slug: short name of the platform
|
||||
file_name: name of rom file
|
||||
size: size of the cover -> big as 'l' | small as 's'
|
||||
"""
|
||||
return f"{RESOURCES_BASE_PATH}/{p_slug}/{file_name}_{size}.png"
|
||||
|
||||
|
||||
def get_cover_details(overwrite: bool, p_slug: str, file_name: str, url_cover: str) -> tuple:
|
||||
path_cover_s: str = DEFAULT_PATH_COVER_S
|
||||
path_cover_l: str = DEFAULT_PATH_COVER_L
|
||||
has_cover: int = 0
|
||||
if (overwrite or not _cover_exists(p_slug, file_name, 's')) and url_cover:
|
||||
_store_cover(p_slug, file_name, url_cover, 's')
|
||||
if _cover_exists(p_slug, file_name, 's'):
|
||||
path_cover_s = _get_cover_path(p_slug, file_name, 's')
|
||||
|
||||
if (overwrite or not _cover_exists(p_slug, file_name, 'l')) and url_cover:
|
||||
_store_cover(p_slug, file_name, url_cover, 'l')
|
||||
if _cover_exists(p_slug, file_name, 'l'):
|
||||
path_cover_l = _get_cover_path(p_slug, file_name, 'l')
|
||||
has_cover = 1
|
||||
return {'path_cover_s': path_cover_s, 'path_cover_l': path_cover_l, 'has_cover': has_cover}
|
||||
|
||||
|
||||
def store_default_resources() -> None:
|
||||
"""Store default cover resources in the filesystem"""
|
||||
defaul_covers: dict = [{'url': DEFAULT_URL_COVER_L, 'size': 'l'}, {'url': DEFAULT_URL_COVER_S, 'size': 's'}]
|
||||
@@ -50,16 +114,24 @@ def _get_roms_structure(p_slug) -> tuple:
|
||||
|
||||
|
||||
def _exclude_files(files, type) -> list[str]:
|
||||
try:
|
||||
excluded_extensions: list = []
|
||||
excluded_extensions = user_config['exclude']['roms'][f'{type}_file']['extensions']
|
||||
except (TypeError, KeyError):
|
||||
pass
|
||||
try:
|
||||
excluded_names: list = []
|
||||
excluded_names = user_config['exclude']['roms'][f'{type}_file']['names']
|
||||
except (TypeError, KeyError):
|
||||
pass
|
||||
if type == 'single':
|
||||
try:
|
||||
excluded_extensions = user_config['exclude']['roms'][f'{type}_file']['extensions']
|
||||
except (TypeError, KeyError):
|
||||
excluded_extensions: list = []
|
||||
try:
|
||||
excluded_names = user_config['exclude']['roms'][f'{type}_file']['names']
|
||||
except (TypeError, KeyError):
|
||||
excluded_names: list = []
|
||||
elif type == 'multi':
|
||||
try:
|
||||
excluded_extensions = user_config['exclude']['roms'][f'{type}_file']['parts']['extensions']
|
||||
except (TypeError, KeyError):
|
||||
excluded_extensions: list = []
|
||||
try:
|
||||
excluded_names = user_config['exclude']['roms'][f'{type}_file']['parts']['names']
|
||||
except (TypeError, KeyError):
|
||||
excluded_names: list = []
|
||||
filtered_files: list = []
|
||||
for rom in files:
|
||||
try:
|
||||
@@ -113,19 +185,15 @@ def _get_rom_files(multi: bool, rom: str, roms_path: str) -> list[str]:
|
||||
return [] if not multi else _exclude_files(list(os.walk(f"{roms_path}/{rom}"))[0][2], 'multi')
|
||||
|
||||
|
||||
def _convert_size_human_readable(size, decimals=2) -> tuple:
|
||||
for unit in ['B', 'KB', 'MB', 'GB', 'TB', 'PB']:
|
||||
if size < 1024.0 or unit == 'PB': break
|
||||
size /= 1024.0
|
||||
return round(size, 2), unit
|
||||
|
||||
|
||||
def _get_file_size(multi: bool, rom: str, files: list, roms_path:str) -> str:
|
||||
files: list = [f"{roms_path}/{rom}"] if not multi else [f"{roms_path}/{rom}/{file}" for file in files]
|
||||
total_size: int = 0
|
||||
for file in files:
|
||||
total_size += os.stat(file).st_size
|
||||
return _convert_size_human_readable(total_size)
|
||||
for unit in ['B', 'KB', 'MB', 'GB', 'TB', 'PB']:
|
||||
if total_size < 1024.0 or unit == 'PB': break
|
||||
total_size /= 1024.0
|
||||
return round(total_size, 2), unit
|
||||
|
||||
|
||||
def get_roms(p_slug: str, full_scan: bool, only_amount: bool = False) -> list[dict]:
|
||||
@@ -193,67 +261,3 @@ def remove_rom(p_slug: str, file_name: str) -> None:
|
||||
shutil.rmtree(f"{rom_path}/{file_name}")
|
||||
except FileNotFoundError:
|
||||
log.error(f"{rom_path}/{file_name} not found in filesystem")
|
||||
|
||||
|
||||
def _cover_exists(p_slug: str, file_name: str, size: str) -> bool:
|
||||
"""Check if rom cover exists in filesystem
|
||||
|
||||
Args:
|
||||
p_slug: short name of the platform
|
||||
file_name: name of rom file
|
||||
size: size of the cover -> big as 'l' | small as 's'
|
||||
Returns
|
||||
True if cover exists in filesystem else False
|
||||
"""
|
||||
logo_path: str = f"{RESOURCES_BASE_PATH}/{p_slug}/{file_name}_{size}.png"
|
||||
return True if os.path.exists(logo_path) else False
|
||||
|
||||
|
||||
def _store_cover(p_slug: str, file_name: str, url_cover: str, size: str) -> None:
|
||||
"""Store roms resources in filesystem
|
||||
|
||||
Args:
|
||||
p_slug: short name of the platform
|
||||
file_name: name of rom file
|
||||
url_cover: url to get the cover
|
||||
size: size of the cover -> big as 'l' | small as 's'
|
||||
"""
|
||||
cover_file: str = f"{file_name}_{size}.png"
|
||||
cover_path: str = f"{RESOURCES_BASE_PATH}/{p_slug}/"
|
||||
sizes: dict = {'l': 'big', 's': 'small'}
|
||||
res = requests.get(url_cover.replace('t_thumb', f't_cover_{sizes[size]}'), stream=True)
|
||||
if res.status_code == 200:
|
||||
Path(cover_path).mkdir(parents=True, exist_ok=True)
|
||||
with open(f"{cover_path}/{cover_file}", 'wb') as f:
|
||||
shutil.copyfileobj(res.raw, f)
|
||||
log.info(f"{file_name} {sizes[size]} cover downloaded successfully!")
|
||||
else:
|
||||
log.error(f"{file_name} {sizes[size]} cover couldn't be downloaded")
|
||||
|
||||
|
||||
def _get_cover_path(p_slug: str, file_name: str, size: str) -> str:
|
||||
"""Returns rom cover filesystem path adapted to frontend folder structure
|
||||
|
||||
Args:
|
||||
p_slug: short name of the platform
|
||||
file_name: name of rom file
|
||||
size: size of the cover -> big as 'l' | small as 's'
|
||||
"""
|
||||
return f"{RESOURCES_BASE_PATH}/{p_slug}/{file_name}_{size}.png"
|
||||
|
||||
|
||||
def get_cover_details(overwrite: bool, p_slug: str, file_name: str, url_cover: str) -> tuple:
|
||||
path_cover_s: str = DEFAULT_PATH_COVER_S
|
||||
path_cover_l: str = DEFAULT_PATH_COVER_L
|
||||
has_cover: int = 0
|
||||
if (overwrite or not _cover_exists(p_slug, file_name, 's')) and url_cover:
|
||||
_store_cover(p_slug, file_name, url_cover, 's')
|
||||
if _cover_exists(p_slug, file_name, 's'):
|
||||
path_cover_s = _get_cover_path(p_slug, file_name, 's')
|
||||
|
||||
if (overwrite or not _cover_exists(p_slug, file_name, 'l')) and url_cover:
|
||||
_store_cover(p_slug, file_name, url_cover, 'l')
|
||||
if _cover_exists(p_slug, file_name, 'l'):
|
||||
path_cover_l = _get_cover_path(p_slug, file_name, 'l')
|
||||
has_cover = 1
|
||||
return {'path_cover_s': path_cover_s, 'path_cover_l': path_cover_l, 'has_cover': has_cover}
|
||||
|
||||
Reference in New Issue
Block a user