From 7bf14f4acbfa2da164f4e5ca09e98ffe79c540b0 Mon Sep 17 00:00:00 2001 From: zurdi zurdo Date: Tue, 18 Apr 2023 16:55:18 +0200 Subject: [PATCH] exclude multi rom part with extension fixed --- backend/src/config/config_loader.py | 48 ++++---- backend/src/handler/db_handler.py | 5 +- backend/src/utils/fs.py | 170 ++++++++++++++-------------- 3 files changed, 117 insertions(+), 106 deletions(-) diff --git a/backend/src/config/config_loader.py b/backend/src/config/config_loader.py index efc0be00d..5a991bc14 100644 --- a/backend/src/config/config_loader.py +++ b/backend/src/config/config_loader.py @@ -2,30 +2,36 @@ import os import sys from urllib.parse import quote_plus -from config import ROMM_DB_DRIVER, SUPPORTED_DB_DRIVERS, SQLITE_DB_BASE_PATH +from config import ROMM_DB_DRIVER, SUPPORTED_DB_DRIVERS, SQLITE_DB_BASE_PATH, user_config from logger.logger import log -def get_db_engine(): - if ROMM_DB_DRIVER in SUPPORTED_DB_DRIVERS: +class ConfigLoader: - if ROMM_DB_DRIVER == 'mariadb': - DB_HOST: str = os.getenv('DB_HOST') - try: - DB_PORT: int = int(os.getenv('DB_PORT')) - except TypeError: - log.critical(f"DB_PORT variable not set properly") - sys.exit(3) - DB_USER: str = os.getenv('DB_USER') - DB_PASSWD: str = os.getenv('DB_PASSWD') - DB_NAME: str = os.getenv('DB_NAME', 'romm') - - return f"mariadb+mariadbconnector://{DB_USER}:%s@{DB_HOST}:{DB_PORT}/{DB_NAME}" % quote_plus(DB_PASSWD) + def __init__(self): + pass - elif ROMM_DB_DRIVER == 'sqlite': - if not os.path.exists(SQLITE_DB_BASE_PATH): os.makedirs(SQLITE_DB_BASE_PATH) - return f"sqlite:////{SQLITE_DB_BASE_PATH}/romm.db" - else: - log.critical(f"{ROMM_DB_DRIVER} database not supported") - sys.exit(3) + def get_db_engine(self): + if ROMM_DB_DRIVER in SUPPORTED_DB_DRIVERS: + + if ROMM_DB_DRIVER == 'mariadb': + DB_HOST: str = os.getenv('DB_HOST') + try: + DB_PORT: int = int(os.getenv('DB_PORT')) + except TypeError: + log.critical(f"DB_PORT variable not set properly") + sys.exit(3) + DB_USER: str = os.getenv('DB_USER') + DB_PASSWD: str = os.getenv('DB_PASSWD') + DB_NAME: str = os.getenv('DB_NAME', 'romm') + + return f"mariadb+mariadbconnector://{DB_USER}:%s@{DB_HOST}:{DB_PORT}/{DB_NAME}" % quote_plus(DB_PASSWD) + + elif ROMM_DB_DRIVER == 'sqlite': + if not os.path.exists(SQLITE_DB_BASE_PATH): os.makedirs(SQLITE_DB_BASE_PATH) + return f"sqlite:////{SQLITE_DB_BASE_PATH}/romm.db" + + else: + log.critical(f"{ROMM_DB_DRIVER} database not supported") + sys.exit(3) diff --git a/backend/src/handler/db_handler.py b/backend/src/handler/db_handler.py index 483a7c4f5..9ddff8bdc 100644 --- a/backend/src/handler/db_handler.py +++ b/backend/src/handler/db_handler.py @@ -6,7 +6,7 @@ from sqlalchemy.orm import sessionmaker from sqlalchemy import select from sqlalchemy.exc import ProgrammingError -from config.config_loader import get_db_engine +from config.config_loader import ConfigLoader from models.platform import Platform from models.rom import Rom @@ -14,7 +14,8 @@ from models.rom import Rom class DBHandler: def __init__(self) -> None: - self.engine = create_engine(get_db_engine(), pool_pre_ping=True) + cl = ConfigLoader() + self.engine = create_engine(cl.get_db_engine(), pool_pre_ping=True) self.session = sessionmaker(bind=self.engine, expire_on_commit=False) diff --git a/backend/src/utils/fs.py b/backend/src/utils/fs.py index ddd099f68..426878178 100644 --- a/backend/src/utils/fs.py +++ b/backend/src/utils/fs.py @@ -11,7 +11,71 @@ from handler import dbh from logger.logger import log -# ========= Defaults utils ========= +# ========= Resources utils ========= +def _cover_exists(p_slug: str, file_name: str, size: str) -> bool: + """Check if rom cover exists in filesystem + + Args: + p_slug: short name of the platform + file_name: name of rom file + size: size of the cover -> big as 'l' | small as 's' + Returns + True if cover exists in filesystem else False + """ + logo_path: str = f"{RESOURCES_BASE_PATH}/{p_slug}/{file_name}_{size}.png" + return True if os.path.exists(logo_path) else False + + +def _store_cover(p_slug: str, file_name: str, url_cover: str, size: str) -> None: + """Store roms resources in filesystem + + Args: + p_slug: short name of the platform + file_name: name of rom file + url_cover: url to get the cover + size: size of the cover -> big as 'l' | small as 's' + """ + cover_file: str = f"{file_name}_{size}.png" + cover_path: str = f"{RESOURCES_BASE_PATH}/{p_slug}/" + sizes: dict = {'l': 'big', 's': 'small'} + res = requests.get(url_cover.replace('t_thumb', f't_cover_{sizes[size]}'), stream=True) + if res.status_code == 200: + Path(cover_path).mkdir(parents=True, exist_ok=True) + with open(f"{cover_path}/{cover_file}", 'wb') as f: + shutil.copyfileobj(res.raw, f) + log.info(f"{file_name} {sizes[size]} cover downloaded successfully!") + else: + log.error(f"{file_name} {sizes[size]} cover couldn't be downloaded") + + +def _get_cover_path(p_slug: str, file_name: str, size: str) -> str: + """Returns rom cover filesystem path adapted to frontend folder structure + + Args: + p_slug: short name of the platform + file_name: name of rom file + size: size of the cover -> big as 'l' | small as 's' + """ + return f"{RESOURCES_BASE_PATH}/{p_slug}/{file_name}_{size}.png" + + +def get_cover_details(overwrite: bool, p_slug: str, file_name: str, url_cover: str) -> tuple: + path_cover_s: str = DEFAULT_PATH_COVER_S + path_cover_l: str = DEFAULT_PATH_COVER_L + has_cover: int = 0 + if (overwrite or not _cover_exists(p_slug, file_name, 's')) and url_cover: + _store_cover(p_slug, file_name, url_cover, 's') + if _cover_exists(p_slug, file_name, 's'): + path_cover_s = _get_cover_path(p_slug, file_name, 's') + + if (overwrite or not _cover_exists(p_slug, file_name, 'l')) and url_cover: + _store_cover(p_slug, file_name, url_cover, 'l') + if _cover_exists(p_slug, file_name, 'l'): + path_cover_l = _get_cover_path(p_slug, file_name, 'l') + has_cover = 1 + return {'path_cover_s': path_cover_s, 'path_cover_l': path_cover_l, 'has_cover': has_cover} + + def store_default_resources() -> None: """Store default cover resources in the filesystem""" defaul_covers: dict = [{'url': DEFAULT_URL_COVER_L, 'size': 'l'}, {'url': DEFAULT_URL_COVER_S, 'size': 's'}] @@ -50,16 +114,24 @@ def _get_roms_structure(p_slug) -> tuple: def _exclude_files(files, type) -> list[str]: - try: - excluded_extensions: list = [] - excluded_extensions = user_config['exclude']['roms'][f'{type}_file']['extensions'] - except (TypeError, KeyError): - pass - try: - excluded_names: list = [] - excluded_names = user_config['exclude']['roms'][f'{type}_file']['names'] - except (TypeError, KeyError): - pass + if type == 'single': + try: + excluded_extensions = user_config['exclude']['roms'][f'{type}_file']['extensions'] + except (TypeError, KeyError): + excluded_extensions: list = [] + try: + excluded_names = user_config['exclude']['roms'][f'{type}_file']['names'] + except (TypeError, KeyError): + excluded_names: list = [] + elif type == 'multi': + try: + excluded_extensions = user_config['exclude']['roms'][f'{type}_file']['parts']['extensions'] + except (TypeError, KeyError): + excluded_extensions: list = [] + try: + excluded_names = user_config['exclude']['roms'][f'{type}_file']['parts']['names'] + except (TypeError, KeyError): + excluded_names: list = [] filtered_files: list = [] for rom in files: try: @@ -113,19 +185,15 @@ def _get_rom_files(multi: bool, rom: str, roms_path: str) -> list[str]: return [] if not multi else _exclude_files(list(os.walk(f"{roms_path}/{rom}"))[0][2], 'multi') -def _convert_size_human_readable(size, decimals=2) -> tuple: - for unit in ['B', 'KB', 'MB', 'GB', 'TB', 'PB']: - if size < 1024.0 or unit == 'PB': break - size /= 1024.0 - return round(size, 2), unit - - def _get_file_size(multi: bool, rom: str, files: list, roms_path:str) -> str: files: list = [f"{roms_path}/{rom}"] if not multi else [f"{roms_path}/{rom}/{file}" for file in files] total_size: int = 0 for file in files: total_size += os.stat(file).st_size - return _convert_size_human_readable(total_size) + for unit in ['B', 'KB', 'MB', 'GB', 'TB', 'PB']: + if total_size < 1024.0 or unit == 'PB': break + total_size /= 1024.0 + return round(total_size, 2), unit def get_roms(p_slug: str, full_scan: bool, only_amount: bool = False) -> list[dict]: @@ -193,67 +261,3 @@ def remove_rom(p_slug: str, file_name: str) -> None: shutil.rmtree(f"{rom_path}/{file_name}") except FileNotFoundError: log.error(f"{rom_path}/{file_name} not found in filesystem") - - -def _cover_exists(p_slug: str, file_name: str, size: str) -> bool: - """Check if rom cover exists in filesystem - - Args: - p_slug: short name of the platform - file_name: name of rom file - size: size of the cover -> big as 'l' | small as 's' - Returns - True if cover exists in filesystem else False - """ - logo_path: str = f"{RESOURCES_BASE_PATH}/{p_slug}/{file_name}_{size}.png" - return True if os.path.exists(logo_path) else False - - -def _store_cover(p_slug: str, file_name: str, url_cover: str, size: str) -> None: - """Store roms resources in filesystem - - Args: - p_slug: short name of the platform - file_name: name of rom file - url_cover: url to get the cover - size: size of the cover -> big as 'l' | small as 's' - """ - cover_file: str = f"{file_name}_{size}.png" - cover_path: str = f"{RESOURCES_BASE_PATH}/{p_slug}/" - sizes: dict = {'l': 'big', 's': 'small'} - res = requests.get(url_cover.replace('t_thumb', f't_cover_{sizes[size]}'), stream=True) - if res.status_code == 200: - Path(cover_path).mkdir(parents=True, exist_ok=True) - with open(f"{cover_path}/{cover_file}", 'wb') as f: - shutil.copyfileobj(res.raw, f) - log.info(f"{file_name} {sizes[size]} cover downloaded successfully!") - else: - log.error(f"{file_name} {sizes[size]} cover couldn't be downloaded") - - -def _get_cover_path(p_slug: str, file_name: str, size: str) -> str: - """Returns rom cover filesystem path adapted to frontend folder structure - - Args: - p_slug: short name of the platform - file_name: name of rom file - size: size of the cover -> big as 'l' | small as 's' - """ - return f"{RESOURCES_BASE_PATH}/{p_slug}/{file_name}_{size}.png" - - -def get_cover_details(overwrite: bool, p_slug: str, file_name: str, url_cover: str) -> tuple: - path_cover_s: str = DEFAULT_PATH_COVER_S - path_cover_l: str = DEFAULT_PATH_COVER_L - has_cover: int = 0 - if (overwrite or not _cover_exists(p_slug, file_name, 's')) and url_cover: - _store_cover(p_slug, file_name, url_cover, 's') - if _cover_exists(p_slug, file_name, 's'): - path_cover_s = _get_cover_path(p_slug, file_name, 's') - - if (overwrite or not _cover_exists(p_slug, file_name, 'l')) and url_cover: - _store_cover(p_slug, file_name, url_cover, 'l') - if _cover_exists(p_slug, file_name, 'l'): - path_cover_l = _get_cover_path(p_slug, file_name, 'l') - has_cover = 1 - return {'path_cover_s': path_cover_s, 'path_cover_l': path_cover_l, 'has_cover': has_cover}