diff --git a/backend/endpoints/scan.py b/backend/endpoints/scan.py index 3eb5eb94d..e35e73ffa 100644 --- a/backend/endpoints/scan.py +++ b/backend/endpoints/scan.py @@ -9,40 +9,58 @@ from models.platform import Platform from models.rom import Rom -async def scan(_sid: str, platforms: str, complete_rescan: bool=True, sm=None): +async def scan(_sid: str, platforms: str, complete_rescan: bool = True, sm=None): """Scan platforms and roms and write them in database.""" log.info(emoji.emojize(":magnifying_glass_tilted_right: Scanning ")) fs.store_default_resources() - try: # Scanning platforms + try: # Scanning platforms fs_platforms: list[str] = fs.get_platforms() except PlatformsNotFoundException as e: - log.error(e); await sm.emit('scan:done_ko', e.message); return - - platforms: list[str] = json.loads(platforms) if len(json.loads(platforms)) > 0 else fs_platforms + log.error(e) + await sm.emit("scan:done_ko", e.message) + return + + platforms: list[str] = ( + json.loads(platforms) if len(json.loads(platforms)) > 0 else fs_platforms + ) log.info(f"Platforms to be scanned: {', '.join(platforms)}") for platform in platforms: log.info(emoji.emojize(f":video_game: {platform} {COLORS['reset']}")) try: scanned_platform: Platform = fastapi.scan_platform(platform) except RomsNotFoundException as e: - log.error(e); continue - await sm.emit('scan:scanning_platform', [scanned_platform.name, scanned_platform.slug]); await sm.emit('') # Workaround to emit in real-time - if platform != str(scanned_platform): log.info(f"Identified as {COLORS['blue']}{scanned_platform}{COLORS['reset']}") + log.error(e) + continue + await sm.emit( + "scan:scanning_platform", [scanned_platform.name, scanned_platform.slug] + ) + await sm.emit("") # Workaround to emit in real-time + if platform != str(scanned_platform): + log.info( + f"Identified as {COLORS['blue']}{scanned_platform}{COLORS['reset']}" + ) dbh.add_platform(scanned_platform) # Scanning roms fs_roms: list[str] = fs.get_roms(scanned_platform.fs_slug) for rom in fs_roms: - rom_id: int = dbh.rom_exists(scanned_platform.slug, rom['file_name']) - if rom_id and not complete_rescan: continue - await sm.emit('scan:scanning_rom', rom['file_name']); await sm.emit('') # Workaround to emit in real-time + rom_id: int = dbh.rom_exists(scanned_platform.slug, rom["file_name"]) + if rom_id and not complete_rescan: + continue + await sm.emit("scan:scanning_rom", rom["file_name"]) + await sm.emit("") # Workaround to emit in real-time log.info(f"Scanning {COLORS['orange']}{rom['file_name']}{COLORS['reset']}") - if rom['multi']: [log.info(f"\t - {COLORS['orange_i']}{file}{COLORS['reset']}") for file in rom['files']] + if rom["multi"]: + [ + log.info(f"\t - {COLORS['orange_i']}{file}{COLORS['reset']}") + for file in rom["files"] + ] scanned_rom: Rom = fastapi.scan_rom(scanned_platform, rom) - if rom_id: scanned_rom.id = rom_id + if rom_id: + scanned_rom.id = rom_id dbh.add_rom(scanned_rom) - dbh.purge_roms(scanned_platform.slug, [rom['file_name'] for rom in fs_roms]) + dbh.purge_roms(scanned_platform.slug, [rom["file_name"] for rom in fs_roms]) dbh.purge_platforms(fs_platforms) - await sm.emit('scan:done') + await sm.emit("scan:done") diff --git a/backend/utils/fs.py b/backend/utils/fs.py index f296f0558..ab4109821 100644 --- a/backend/utils/fs.py +++ b/backend/utils/fs.py @@ -4,17 +4,30 @@ import datetime import requests from pathlib import Path -from config import LIBRARY_BASE_PATH, HIGH_PRIO_STRUCTURE_PATH, \ - RESOURCES_BASE_PATH, DEFAULT_URL_COVER_L, DEFAULT_PATH_COVER_L, DEFAULT_URL_COVER_S, DEFAULT_PATH_COVER_S +from config import ( + LIBRARY_BASE_PATH, + HIGH_PRIO_STRUCTURE_PATH, + RESOURCES_BASE_PATH, + DEFAULT_URL_COVER_L, + DEFAULT_PATH_COVER_L, + DEFAULT_URL_COVER_S, + DEFAULT_PATH_COVER_S, +) from config.config_loader import ConfigLoader -from utils.exceptions import PlatformsNotFoundException, RomsNotFoundException, RomNotFoundError, RomAlreadyExistsException +from utils.exceptions import ( + PlatformsNotFoundException, + RomsNotFoundException, + RomNotFoundError, + RomAlreadyExistsException, +) + cl = ConfigLoader() # ========= Resources utils ========= def _cover_exists(p_slug: str, file_name: str, size: str) -> bool: """Check if rom cover exists in filesystem - + Args: p_slug: short name of the platform file_name: name of rom file @@ -28,7 +41,7 @@ def _cover_exists(p_slug: str, file_name: str, size: str) -> bool: def _store_cover(p_slug: str, file_name: str, url_cover: str, size: str) -> None: """Store roms resources in filesystem - + Args: p_slug: short name of the platform file_name: name of rom file @@ -37,16 +50,16 @@ def _store_cover(p_slug: str, file_name: str, url_cover: str, size: str) -> None """ cover_file: str = f"{size}.png" cover_path: str = f"{RESOURCES_BASE_PATH}/{p_slug}/{file_name}/cover" - res = requests.get(url_cover.replace('t_thumb', f't_cover_{size}'), stream=True) + res = requests.get(url_cover.replace("t_thumb", f"t_cover_{size}"), stream=True) if res.status_code == 200: Path(cover_path).mkdir(parents=True, exist_ok=True) - with open(f"{cover_path}/{cover_file}", 'wb') as f: + with open(f"{cover_path}/{cover_file}", "wb") as f: shutil.copyfileobj(res.raw, f) def _get_cover_path(p_slug: str, file_name: str, size: str) -> str: """Returns rom cover filesystem path adapted to frontend folder structure - + Args: p_slug: short name of the platform file_name: name of rom file @@ -57,17 +70,31 @@ def _get_cover_path(p_slug: str, file_name: str, size: str) -> str: def get_cover(overwrite: bool, p_slug: str, file_name: str, url_cover: str) -> tuple: # Cover small - if (overwrite or not _cover_exists(p_slug, file_name, 'small')) and url_cover: _store_cover(p_slug, file_name, url_cover, 'small') - path_cover_s = _get_cover_path(p_slug, file_name, 'small') if _cover_exists(p_slug, file_name, 'small') else DEFAULT_PATH_COVER_S + if (overwrite or not _cover_exists(p_slug, file_name, "small")) and url_cover: + _store_cover(p_slug, file_name, url_cover, "small") + path_cover_s = ( + _get_cover_path(p_slug, file_name, "small") + if _cover_exists(p_slug, file_name, "small") + else DEFAULT_PATH_COVER_S + ) # Cover big - if (overwrite or not _cover_exists(p_slug, file_name, 'big')) and url_cover: _store_cover(p_slug, file_name, url_cover, 'big') - (path_cover_l, has_cover) = (_get_cover_path(p_slug, file_name, 'big'), 1) if _cover_exists(p_slug, file_name, 'big') else (DEFAULT_PATH_COVER_L, 0) - return {'path_cover_s': path_cover_s, 'path_cover_l': path_cover_l, 'has_cover': has_cover} + if (overwrite or not _cover_exists(p_slug, file_name, "big")) and url_cover: + _store_cover(p_slug, file_name, url_cover, "big") + (path_cover_l, has_cover) = ( + (_get_cover_path(p_slug, file_name, "big"), 1) + if _cover_exists(p_slug, file_name, "big") + else (DEFAULT_PATH_COVER_L, 0) + ) + return { + "path_cover_s": path_cover_s, + "path_cover_l": path_cover_l, + "has_cover": has_cover, + } def _store_screenshot(p_slug: str, file_name: str, url: str, idx: int) -> None: """Store roms resources in filesystem - + Args: p_slug: short name of the platform file_name: name of rom file @@ -78,13 +105,13 @@ def _store_screenshot(p_slug: str, file_name: str, url: str, idx: int) -> None: res = requests.get(url, stream=True) if res.status_code == 200: Path(screenshot_path).mkdir(parents=True, exist_ok=True) - with open(f"{screenshot_path}/{screenshot_file}", 'wb') as f: + with open(f"{screenshot_path}/{screenshot_file}", "wb") as f: shutil.copyfileobj(res.raw, f) def _get_screenshot_path(p_slug: str, file_name: str, idx: str) -> str: """Returns rom cover filesystem path adapted to frontend folder structure - + Args: p_slug: short name of the platform file_name: name of rom file @@ -98,30 +125,41 @@ def get_screenshots(p_slug: str, file_name: str, url_screenshots: list) -> tuple for idx, url in enumerate(url_screenshots): _store_screenshot(p_slug, file_name, url, idx) path_screenshots.append(_get_screenshot_path(p_slug, file_name, idx)) - return {'path_screenshots': path_screenshots} + return {"path_screenshots": path_screenshots} def store_default_resources() -> None: """Store default cover resources in the filesystem""" - defaul_covers: dict = [{'url': DEFAULT_URL_COVER_L, 'size': 'big'}, {'url': DEFAULT_URL_COVER_S, 'size': 'small'}] + defaul_covers: dict = [ + {"url": DEFAULT_URL_COVER_L, "size": "big"}, + {"url": DEFAULT_URL_COVER_S, "size": "small"}, + ] for cover in defaul_covers: - if not _cover_exists('default', 'default', cover['size']): - _store_cover('default', 'default', cover['url'], cover['size']) + if not _cover_exists("default", "default", cover["size"]): + _store_cover("default", "default", cover["url"], cover["size"]) # ========= Platforms utils ========= def _exclude_platforms(platforms) -> None: - [platforms.remove(excluded) for excluded in cl.config['EXCLUDED_PLATFORMS'] if excluded in platforms] + [ + platforms.remove(excluded) + for excluded in cl.config["EXCLUDED_PLATFORMS"] + if excluded in platforms + ] def get_platforms() -> list[str]: """Gets all filesystem platforms - + Returns list with all the filesystem platforms found in the LIBRARY_BASE_PATH. Automatically exclude folders defined in user config. """ try: - platforms: list[str] = list(os.walk(HIGH_PRIO_STRUCTURE_PATH))[0][1] if os.path.exists(HIGH_PRIO_STRUCTURE_PATH) else list(os.walk(LIBRARY_BASE_PATH))[0][1] + platforms: list[str] = ( + list(os.walk(HIGH_PRIO_STRUCTURE_PATH))[0][1] + if os.path.exists(HIGH_PRIO_STRUCTURE_PATH) + else list(os.walk(LIBRARY_BASE_PATH))[0][1] + ) _exclude_platforms(platforms) except IndexError: raise PlatformsNotFoundException @@ -130,22 +168,26 @@ def get_platforms() -> list[str]: # ========= Roms utils ========= def get_roms_structure(p_slug: str) -> tuple: - return f"roms/{p_slug}" if os.path.exists(HIGH_PRIO_STRUCTURE_PATH) else f"{p_slug}/roms" + return ( + f"roms/{p_slug}" + if os.path.exists(HIGH_PRIO_STRUCTURE_PATH) + else f"{p_slug}/roms" + ) def _exclude_files(files, type) -> list[str]: - excluded_extensions = cl.config[f'EXCLUDED_{type.upper()}_EXT'] - excluded_names = cl.config[f'EXCLUDED_{type.upper()}_FILES'] + excluded_extensions = cl.config[f"EXCLUDED_{type.upper()}_EXT"] + excluded_names = cl.config[f"EXCLUDED_{type.upper()}_FILES"] filtered_files: list = [] for file in files: - if file.split('.')[-1] in excluded_extensions or file in excluded_names: + if file.split(".")[-1] in excluded_extensions or file in excluded_names: filtered_files.append(file) files = [f for f in files if f not in filtered_files] return files def _exclude_multi_roms(roms) -> list[str]: - excluded_names = cl.config['EXCLUDED_MULTI_FILES'] + excluded_names = cl.config["EXCLUDED_MULTI_FILES"] filtered_files: list = [] for rom in roms: if rom in excluded_names: @@ -157,7 +199,10 @@ def _exclude_multi_roms(roms) -> list[str]: def get_rom_files(rom: str, roms_path: str) -> list[str]: rom_files: list = [] for path, _, files in os.walk(f"{roms_path}/{rom}"): - [rom_files.append(f"{Path(path, f)}".replace(f"{roms_path}/{rom}/", '')) for f in _exclude_files(files, 'multi_parts')] + [ + rom_files.append(f"{Path(path, f)}".replace(f"{roms_path}/{rom}/", "")) + for f in _exclude_files(files, "multi_parts") + ] return rom_files @@ -178,26 +223,38 @@ def get_roms(p_slug: str) -> list[dict] or int: fs_multi_roms: list[str] = list(os.walk(roms_file_path))[0][1] except IndexError: raise RomsNotFoundException(p_slug) - fs_roms: list[dict] = [{'multi': False, 'file_name': rom} for rom in _exclude_files(fs_single_roms, 'single')] + \ - [{'multi': True, 'file_name': rom} for rom in _exclude_multi_roms(fs_multi_roms)] - [rom.update({'files': get_rom_files(rom['file_name'], roms_file_path)}) for rom in fs_roms] + fs_roms: list[dict] = [ + {"multi": False, "file_name": rom} + for rom in _exclude_files(fs_single_roms, "single") + ] + [ + {"multi": True, "file_name": rom} for rom in _exclude_multi_roms(fs_multi_roms) + ] + [ + rom.update({"files": get_rom_files(rom["file_name"], roms_file_path)}) + for rom in fs_roms + ] return fs_roms -def get_rom_size(multi: bool, rom: str, files: list, roms_path:str) -> str: - files: list = [f"{LIBRARY_BASE_PATH}/{roms_path}/{rom}"] if not multi else [f"{LIBRARY_BASE_PATH}/{roms_path}/{rom}/{file}" for file in files] +def get_rom_size(multi: bool, rom: str, files: list, roms_path: str) -> str: + files: list = ( + [f"{LIBRARY_BASE_PATH}/{roms_path}/{rom}"] + if not multi + else [f"{LIBRARY_BASE_PATH}/{roms_path}/{rom}/{file}" for file in files] + ) total_size: int = 0 for file in files: total_size += os.stat(file).st_size - for unit in ['B', 'KB', 'MB', 'GB', 'TB', 'PB']: - if total_size < 1024.0 or unit == 'PB': break + for unit in ["B", "KB", "MB", "GB", "TB", "PB"]: + if total_size < 1024.0 or unit == "PB": + break total_size /= 1024.0 return round(total_size, 2), unit def _rom_exists(p_slug: str, file_name: str) -> bool: """Check if rom exists in filesystem - + Args: p_slug: short name of the platform file_name: rom file_name @@ -205,7 +262,9 @@ def _rom_exists(p_slug: str, file_name: str) -> bool: True if rom exists in filesystem else False """ rom_path = get_roms_structure(p_slug) - exists: bool = True if os.path.exists(f"{LIBRARY_BASE_PATH}/{rom_path}/{file_name}") else False + exists: bool = ( + True if os.path.exists(f"{LIBRARY_BASE_PATH}/{rom_path}/{file_name}") else False + ) return exists @@ -214,8 +273,11 @@ def rename_rom(p_slug: str, old_name: str, new_name: str) -> None: rom_path = get_roms_structure(p_slug) if _rom_exists(p_slug, new_name): raise RomAlreadyExistsException(new_name) - os.rename(f"{LIBRARY_BASE_PATH}/{rom_path}/{old_name}", f"{LIBRARY_BASE_PATH}/{rom_path}/{new_name}") - + os.rename( + f"{LIBRARY_BASE_PATH}/{rom_path}/{old_name}", + f"{LIBRARY_BASE_PATH}/{rom_path}/{new_name}", + ) + def remove_rom(p_slug: str, file_name: str) -> None: rom_path = get_roms_structure(p_slug)