mirror of
https://github.com/rommapp/romm.git
synced 2026-07-01 08:16:21 +00:00
code formatted with black
This commit is contained in:
@@ -9,40 +9,58 @@ from models.platform import Platform
|
||||
from models.rom import Rom
|
||||
|
||||
|
||||
async def scan(_sid: str, platforms: str, complete_rescan: bool=True, sm=None):
|
||||
async def scan(_sid: str, platforms: str, complete_rescan: bool = True, sm=None):
|
||||
"""Scan platforms and roms and write them in database."""
|
||||
|
||||
log.info(emoji.emojize(":magnifying_glass_tilted_right: Scanning "))
|
||||
fs.store_default_resources()
|
||||
|
||||
try: # Scanning platforms
|
||||
try: # Scanning platforms
|
||||
fs_platforms: list[str] = fs.get_platforms()
|
||||
except PlatformsNotFoundException as e:
|
||||
log.error(e); await sm.emit('scan:done_ko', e.message); return
|
||||
|
||||
platforms: list[str] = json.loads(platforms) if len(json.loads(platforms)) > 0 else fs_platforms
|
||||
log.error(e)
|
||||
await sm.emit("scan:done_ko", e.message)
|
||||
return
|
||||
|
||||
platforms: list[str] = (
|
||||
json.loads(platforms) if len(json.loads(platforms)) > 0 else fs_platforms
|
||||
)
|
||||
log.info(f"Platforms to be scanned: {', '.join(platforms)}")
|
||||
for platform in platforms:
|
||||
log.info(emoji.emojize(f":video_game: {platform} {COLORS['reset']}"))
|
||||
try:
|
||||
scanned_platform: Platform = fastapi.scan_platform(platform)
|
||||
except RomsNotFoundException as e:
|
||||
log.error(e); continue
|
||||
await sm.emit('scan:scanning_platform', [scanned_platform.name, scanned_platform.slug]); await sm.emit('') # Workaround to emit in real-time
|
||||
if platform != str(scanned_platform): log.info(f"Identified as {COLORS['blue']}{scanned_platform}{COLORS['reset']}")
|
||||
log.error(e)
|
||||
continue
|
||||
await sm.emit(
|
||||
"scan:scanning_platform", [scanned_platform.name, scanned_platform.slug]
|
||||
)
|
||||
await sm.emit("") # Workaround to emit in real-time
|
||||
if platform != str(scanned_platform):
|
||||
log.info(
|
||||
f"Identified as {COLORS['blue']}{scanned_platform}{COLORS['reset']}"
|
||||
)
|
||||
dbh.add_platform(scanned_platform)
|
||||
|
||||
# Scanning roms
|
||||
fs_roms: list[str] = fs.get_roms(scanned_platform.fs_slug)
|
||||
for rom in fs_roms:
|
||||
rom_id: int = dbh.rom_exists(scanned_platform.slug, rom['file_name'])
|
||||
if rom_id and not complete_rescan: continue
|
||||
await sm.emit('scan:scanning_rom', rom['file_name']); await sm.emit('') # Workaround to emit in real-time
|
||||
rom_id: int = dbh.rom_exists(scanned_platform.slug, rom["file_name"])
|
||||
if rom_id and not complete_rescan:
|
||||
continue
|
||||
await sm.emit("scan:scanning_rom", rom["file_name"])
|
||||
await sm.emit("") # Workaround to emit in real-time
|
||||
log.info(f"Scanning {COLORS['orange']}{rom['file_name']}{COLORS['reset']}")
|
||||
if rom['multi']: [log.info(f"\t - {COLORS['orange_i']}{file}{COLORS['reset']}") for file in rom['files']]
|
||||
if rom["multi"]:
|
||||
[
|
||||
log.info(f"\t - {COLORS['orange_i']}{file}{COLORS['reset']}")
|
||||
for file in rom["files"]
|
||||
]
|
||||
scanned_rom: Rom = fastapi.scan_rom(scanned_platform, rom)
|
||||
if rom_id: scanned_rom.id = rom_id
|
||||
if rom_id:
|
||||
scanned_rom.id = rom_id
|
||||
dbh.add_rom(scanned_rom)
|
||||
dbh.purge_roms(scanned_platform.slug, [rom['file_name'] for rom in fs_roms])
|
||||
dbh.purge_roms(scanned_platform.slug, [rom["file_name"] for rom in fs_roms])
|
||||
dbh.purge_platforms(fs_platforms)
|
||||
await sm.emit('scan:done')
|
||||
await sm.emit("scan:done")
|
||||
|
||||
@@ -4,17 +4,30 @@ import datetime
|
||||
import requests
|
||||
from pathlib import Path
|
||||
|
||||
from config import LIBRARY_BASE_PATH, HIGH_PRIO_STRUCTURE_PATH, \
|
||||
RESOURCES_BASE_PATH, DEFAULT_URL_COVER_L, DEFAULT_PATH_COVER_L, DEFAULT_URL_COVER_S, DEFAULT_PATH_COVER_S
|
||||
from config import (
|
||||
LIBRARY_BASE_PATH,
|
||||
HIGH_PRIO_STRUCTURE_PATH,
|
||||
RESOURCES_BASE_PATH,
|
||||
DEFAULT_URL_COVER_L,
|
||||
DEFAULT_PATH_COVER_L,
|
||||
DEFAULT_URL_COVER_S,
|
||||
DEFAULT_PATH_COVER_S,
|
||||
)
|
||||
from config.config_loader import ConfigLoader
|
||||
from utils.exceptions import PlatformsNotFoundException, RomsNotFoundException, RomNotFoundError, RomAlreadyExistsException
|
||||
from utils.exceptions import (
|
||||
PlatformsNotFoundException,
|
||||
RomsNotFoundException,
|
||||
RomNotFoundError,
|
||||
RomAlreadyExistsException,
|
||||
)
|
||||
|
||||
cl = ConfigLoader()
|
||||
|
||||
|
||||
# ========= Resources utils =========
|
||||
def _cover_exists(p_slug: str, file_name: str, size: str) -> bool:
|
||||
"""Check if rom cover exists in filesystem
|
||||
|
||||
|
||||
Args:
|
||||
p_slug: short name of the platform
|
||||
file_name: name of rom file
|
||||
@@ -28,7 +41,7 @@ def _cover_exists(p_slug: str, file_name: str, size: str) -> bool:
|
||||
|
||||
def _store_cover(p_slug: str, file_name: str, url_cover: str, size: str) -> None:
|
||||
"""Store roms resources in filesystem
|
||||
|
||||
|
||||
Args:
|
||||
p_slug: short name of the platform
|
||||
file_name: name of rom file
|
||||
@@ -37,16 +50,16 @@ def _store_cover(p_slug: str, file_name: str, url_cover: str, size: str) -> None
|
||||
"""
|
||||
cover_file: str = f"{size}.png"
|
||||
cover_path: str = f"{RESOURCES_BASE_PATH}/{p_slug}/{file_name}/cover"
|
||||
res = requests.get(url_cover.replace('t_thumb', f't_cover_{size}'), stream=True)
|
||||
res = requests.get(url_cover.replace("t_thumb", f"t_cover_{size}"), stream=True)
|
||||
if res.status_code == 200:
|
||||
Path(cover_path).mkdir(parents=True, exist_ok=True)
|
||||
with open(f"{cover_path}/{cover_file}", 'wb') as f:
|
||||
with open(f"{cover_path}/{cover_file}", "wb") as f:
|
||||
shutil.copyfileobj(res.raw, f)
|
||||
|
||||
|
||||
def _get_cover_path(p_slug: str, file_name: str, size: str) -> str:
|
||||
"""Returns rom cover filesystem path adapted to frontend folder structure
|
||||
|
||||
|
||||
Args:
|
||||
p_slug: short name of the platform
|
||||
file_name: name of rom file
|
||||
@@ -57,17 +70,31 @@ def _get_cover_path(p_slug: str, file_name: str, size: str) -> str:
|
||||
|
||||
def get_cover(overwrite: bool, p_slug: str, file_name: str, url_cover: str) -> tuple:
|
||||
# Cover small
|
||||
if (overwrite or not _cover_exists(p_slug, file_name, 'small')) and url_cover: _store_cover(p_slug, file_name, url_cover, 'small')
|
||||
path_cover_s = _get_cover_path(p_slug, file_name, 'small') if _cover_exists(p_slug, file_name, 'small') else DEFAULT_PATH_COVER_S
|
||||
if (overwrite or not _cover_exists(p_slug, file_name, "small")) and url_cover:
|
||||
_store_cover(p_slug, file_name, url_cover, "small")
|
||||
path_cover_s = (
|
||||
_get_cover_path(p_slug, file_name, "small")
|
||||
if _cover_exists(p_slug, file_name, "small")
|
||||
else DEFAULT_PATH_COVER_S
|
||||
)
|
||||
# Cover big
|
||||
if (overwrite or not _cover_exists(p_slug, file_name, 'big')) and url_cover: _store_cover(p_slug, file_name, url_cover, 'big')
|
||||
(path_cover_l, has_cover) = (_get_cover_path(p_slug, file_name, 'big'), 1) if _cover_exists(p_slug, file_name, 'big') else (DEFAULT_PATH_COVER_L, 0)
|
||||
return {'path_cover_s': path_cover_s, 'path_cover_l': path_cover_l, 'has_cover': has_cover}
|
||||
if (overwrite or not _cover_exists(p_slug, file_name, "big")) and url_cover:
|
||||
_store_cover(p_slug, file_name, url_cover, "big")
|
||||
(path_cover_l, has_cover) = (
|
||||
(_get_cover_path(p_slug, file_name, "big"), 1)
|
||||
if _cover_exists(p_slug, file_name, "big")
|
||||
else (DEFAULT_PATH_COVER_L, 0)
|
||||
)
|
||||
return {
|
||||
"path_cover_s": path_cover_s,
|
||||
"path_cover_l": path_cover_l,
|
||||
"has_cover": has_cover,
|
||||
}
|
||||
|
||||
|
||||
def _store_screenshot(p_slug: str, file_name: str, url: str, idx: int) -> None:
|
||||
"""Store roms resources in filesystem
|
||||
|
||||
|
||||
Args:
|
||||
p_slug: short name of the platform
|
||||
file_name: name of rom file
|
||||
@@ -78,13 +105,13 @@ def _store_screenshot(p_slug: str, file_name: str, url: str, idx: int) -> None:
|
||||
res = requests.get(url, stream=True)
|
||||
if res.status_code == 200:
|
||||
Path(screenshot_path).mkdir(parents=True, exist_ok=True)
|
||||
with open(f"{screenshot_path}/{screenshot_file}", 'wb') as f:
|
||||
with open(f"{screenshot_path}/{screenshot_file}", "wb") as f:
|
||||
shutil.copyfileobj(res.raw, f)
|
||||
|
||||
|
||||
def _get_screenshot_path(p_slug: str, file_name: str, idx: str) -> str:
|
||||
"""Returns rom cover filesystem path adapted to frontend folder structure
|
||||
|
||||
|
||||
Args:
|
||||
p_slug: short name of the platform
|
||||
file_name: name of rom file
|
||||
@@ -98,30 +125,41 @@ def get_screenshots(p_slug: str, file_name: str, url_screenshots: list) -> tuple
|
||||
for idx, url in enumerate(url_screenshots):
|
||||
_store_screenshot(p_slug, file_name, url, idx)
|
||||
path_screenshots.append(_get_screenshot_path(p_slug, file_name, idx))
|
||||
return {'path_screenshots': path_screenshots}
|
||||
return {"path_screenshots": path_screenshots}
|
||||
|
||||
|
||||
def store_default_resources() -> None:
|
||||
"""Store default cover resources in the filesystem"""
|
||||
defaul_covers: dict = [{'url': DEFAULT_URL_COVER_L, 'size': 'big'}, {'url': DEFAULT_URL_COVER_S, 'size': 'small'}]
|
||||
defaul_covers: dict = [
|
||||
{"url": DEFAULT_URL_COVER_L, "size": "big"},
|
||||
{"url": DEFAULT_URL_COVER_S, "size": "small"},
|
||||
]
|
||||
for cover in defaul_covers:
|
||||
if not _cover_exists('default', 'default', cover['size']):
|
||||
_store_cover('default', 'default', cover['url'], cover['size'])
|
||||
if not _cover_exists("default", "default", cover["size"]):
|
||||
_store_cover("default", "default", cover["url"], cover["size"])
|
||||
|
||||
|
||||
# ========= Platforms utils =========
|
||||
def _exclude_platforms(platforms) -> None:
|
||||
[platforms.remove(excluded) for excluded in cl.config['EXCLUDED_PLATFORMS'] if excluded in platforms]
|
||||
[
|
||||
platforms.remove(excluded)
|
||||
for excluded in cl.config["EXCLUDED_PLATFORMS"]
|
||||
if excluded in platforms
|
||||
]
|
||||
|
||||
|
||||
def get_platforms() -> list[str]:
|
||||
"""Gets all filesystem platforms
|
||||
|
||||
|
||||
Returns list with all the filesystem platforms found in the LIBRARY_BASE_PATH.
|
||||
Automatically exclude folders defined in user config.
|
||||
"""
|
||||
try:
|
||||
platforms: list[str] = list(os.walk(HIGH_PRIO_STRUCTURE_PATH))[0][1] if os.path.exists(HIGH_PRIO_STRUCTURE_PATH) else list(os.walk(LIBRARY_BASE_PATH))[0][1]
|
||||
platforms: list[str] = (
|
||||
list(os.walk(HIGH_PRIO_STRUCTURE_PATH))[0][1]
|
||||
if os.path.exists(HIGH_PRIO_STRUCTURE_PATH)
|
||||
else list(os.walk(LIBRARY_BASE_PATH))[0][1]
|
||||
)
|
||||
_exclude_platforms(platforms)
|
||||
except IndexError:
|
||||
raise PlatformsNotFoundException
|
||||
@@ -130,22 +168,26 @@ def get_platforms() -> list[str]:
|
||||
|
||||
# ========= Roms utils =========
|
||||
def get_roms_structure(p_slug: str) -> tuple:
|
||||
return f"roms/{p_slug}" if os.path.exists(HIGH_PRIO_STRUCTURE_PATH) else f"{p_slug}/roms"
|
||||
return (
|
||||
f"roms/{p_slug}"
|
||||
if os.path.exists(HIGH_PRIO_STRUCTURE_PATH)
|
||||
else f"{p_slug}/roms"
|
||||
)
|
||||
|
||||
|
||||
def _exclude_files(files, type) -> list[str]:
|
||||
excluded_extensions = cl.config[f'EXCLUDED_{type.upper()}_EXT']
|
||||
excluded_names = cl.config[f'EXCLUDED_{type.upper()}_FILES']
|
||||
excluded_extensions = cl.config[f"EXCLUDED_{type.upper()}_EXT"]
|
||||
excluded_names = cl.config[f"EXCLUDED_{type.upper()}_FILES"]
|
||||
filtered_files: list = []
|
||||
for file in files:
|
||||
if file.split('.')[-1] in excluded_extensions or file in excluded_names:
|
||||
if file.split(".")[-1] in excluded_extensions or file in excluded_names:
|
||||
filtered_files.append(file)
|
||||
files = [f for f in files if f not in filtered_files]
|
||||
return files
|
||||
|
||||
|
||||
def _exclude_multi_roms(roms) -> list[str]:
|
||||
excluded_names = cl.config['EXCLUDED_MULTI_FILES']
|
||||
excluded_names = cl.config["EXCLUDED_MULTI_FILES"]
|
||||
filtered_files: list = []
|
||||
for rom in roms:
|
||||
if rom in excluded_names:
|
||||
@@ -157,7 +199,10 @@ def _exclude_multi_roms(roms) -> list[str]:
|
||||
def get_rom_files(rom: str, roms_path: str) -> list[str]:
|
||||
rom_files: list = []
|
||||
for path, _, files in os.walk(f"{roms_path}/{rom}"):
|
||||
[rom_files.append(f"{Path(path, f)}".replace(f"{roms_path}/{rom}/", '')) for f in _exclude_files(files, 'multi_parts')]
|
||||
[
|
||||
rom_files.append(f"{Path(path, f)}".replace(f"{roms_path}/{rom}/", ""))
|
||||
for f in _exclude_files(files, "multi_parts")
|
||||
]
|
||||
return rom_files
|
||||
|
||||
|
||||
@@ -178,26 +223,38 @@ def get_roms(p_slug: str) -> list[dict] or int:
|
||||
fs_multi_roms: list[str] = list(os.walk(roms_file_path))[0][1]
|
||||
except IndexError:
|
||||
raise RomsNotFoundException(p_slug)
|
||||
fs_roms: list[dict] = [{'multi': False, 'file_name': rom} for rom in _exclude_files(fs_single_roms, 'single')] + \
|
||||
[{'multi': True, 'file_name': rom} for rom in _exclude_multi_roms(fs_multi_roms)]
|
||||
[rom.update({'files': get_rom_files(rom['file_name'], roms_file_path)}) for rom in fs_roms]
|
||||
fs_roms: list[dict] = [
|
||||
{"multi": False, "file_name": rom}
|
||||
for rom in _exclude_files(fs_single_roms, "single")
|
||||
] + [
|
||||
{"multi": True, "file_name": rom} for rom in _exclude_multi_roms(fs_multi_roms)
|
||||
]
|
||||
[
|
||||
rom.update({"files": get_rom_files(rom["file_name"], roms_file_path)})
|
||||
for rom in fs_roms
|
||||
]
|
||||
return fs_roms
|
||||
|
||||
|
||||
def get_rom_size(multi: bool, rom: str, files: list, roms_path:str) -> str:
|
||||
files: list = [f"{LIBRARY_BASE_PATH}/{roms_path}/{rom}"] if not multi else [f"{LIBRARY_BASE_PATH}/{roms_path}/{rom}/{file}" for file in files]
|
||||
def get_rom_size(multi: bool, rom: str, files: list, roms_path: str) -> str:
|
||||
files: list = (
|
||||
[f"{LIBRARY_BASE_PATH}/{roms_path}/{rom}"]
|
||||
if not multi
|
||||
else [f"{LIBRARY_BASE_PATH}/{roms_path}/{rom}/{file}" for file in files]
|
||||
)
|
||||
total_size: int = 0
|
||||
for file in files:
|
||||
total_size += os.stat(file).st_size
|
||||
for unit in ['B', 'KB', 'MB', 'GB', 'TB', 'PB']:
|
||||
if total_size < 1024.0 or unit == 'PB': break
|
||||
for unit in ["B", "KB", "MB", "GB", "TB", "PB"]:
|
||||
if total_size < 1024.0 or unit == "PB":
|
||||
break
|
||||
total_size /= 1024.0
|
||||
return round(total_size, 2), unit
|
||||
|
||||
|
||||
def _rom_exists(p_slug: str, file_name: str) -> bool:
|
||||
"""Check if rom exists in filesystem
|
||||
|
||||
|
||||
Args:
|
||||
p_slug: short name of the platform
|
||||
file_name: rom file_name
|
||||
@@ -205,7 +262,9 @@ def _rom_exists(p_slug: str, file_name: str) -> bool:
|
||||
True if rom exists in filesystem else False
|
||||
"""
|
||||
rom_path = get_roms_structure(p_slug)
|
||||
exists: bool = True if os.path.exists(f"{LIBRARY_BASE_PATH}/{rom_path}/{file_name}") else False
|
||||
exists: bool = (
|
||||
True if os.path.exists(f"{LIBRARY_BASE_PATH}/{rom_path}/{file_name}") else False
|
||||
)
|
||||
return exists
|
||||
|
||||
|
||||
@@ -214,8 +273,11 @@ def rename_rom(p_slug: str, old_name: str, new_name: str) -> None:
|
||||
rom_path = get_roms_structure(p_slug)
|
||||
if _rom_exists(p_slug, new_name):
|
||||
raise RomAlreadyExistsException(new_name)
|
||||
os.rename(f"{LIBRARY_BASE_PATH}/{rom_path}/{old_name}", f"{LIBRARY_BASE_PATH}/{rom_path}/{new_name}")
|
||||
|
||||
os.rename(
|
||||
f"{LIBRARY_BASE_PATH}/{rom_path}/{old_name}",
|
||||
f"{LIBRARY_BASE_PATH}/{rom_path}/{new_name}",
|
||||
)
|
||||
|
||||
|
||||
def remove_rom(p_slug: str, file_name: str) -> None:
|
||||
rom_path = get_roms_structure(p_slug)
|
||||
|
||||
Reference in New Issue
Block a user