Update endpoints that use form data

This commit is contained in:
Georges-Antoine Assi
2026-02-18 16:10:45 -05:00
parent 10f2e1377b
commit cef0ab0422
9 changed files with 377 additions and 130 deletions

View File

@@ -3,6 +3,7 @@ from datetime import datetime
from io import BytesIO
from typing import Annotated
from fastapi import File, Form
from fastapi import Path as PathVar
from fastapi import Query, Request, UploadFile, status
@@ -42,7 +43,14 @@ async def add_collection(
request: Request,
is_public: bool | None = None,
is_favorite: bool | None = None,
artwork: UploadFile | None = None,
artwork: UploadFile | None = File(
default=None, description="Collection artwork file."
),
name: str = Form(default=""),
description: str = Form(default=""),
url_cover: str = Form(
default="", description="Remote URL to fetch and use as cover artwork."
),
) -> CollectionSchema:
"""Create collection endpoint
@@ -52,12 +60,10 @@ async def add_collection(
Returns:
CollectionSchema: Just created collection
"""
data = await request.form()
cleaned_data = {
"name": data.get("name", ""),
"description": data.get("description", ""),
"url_cover": data.get("url_cover", ""),
"name": name,
"description": description,
"url_cover": url_cover,
"is_public": is_public or False,
"is_favorite": is_favorite or False,
"user_id": request.user.id,
@@ -104,7 +110,14 @@ async def add_collection(
@protected_route(router.post, "/smart", [Scope.COLLECTIONS_WRITE])
async def add_smart_collection(
request: Request, is_public: bool | None = None
request: Request,
is_public: bool | None = None,
name: str = Form(default=""),
description: str = Form(default=""),
filter_criteria: str = Form(
default="{}",
description="Smart collection filters as a JSON string.",
),
) -> SmartCollectionSchema:
"""Create smart collection endpoint
@@ -114,19 +127,16 @@ async def add_smart_collection(
Returns:
SmartCollectionSchema: Just created smart collection
"""
data = await request.form()
# Parse filter criteria from JSON string
try:
filter_criteria = json.loads(str(data.get("filter_criteria", "{}")))
parsed_filter_criteria = json.loads(filter_criteria)
except json.JSONDecodeError as e:
raise ValueError("Invalid JSON for filter_criteria field") from e
cleaned_data = {
"name": str(data.get("name", "")),
"description": str(data.get("description", "")),
"filter_criteria": filter_criteria,
"name": name,
"description": description,
"filter_criteria": parsed_filter_criteria,
"is_public": is_public if is_public is not None else False,
"user_id": request.user.id,
}
@@ -358,7 +368,16 @@ async def update_collection(
id: int,
remove_cover: bool = False,
is_public: bool | None = None,
artwork: UploadFile | None = None,
artwork: UploadFile | None = File(
default=None, description="Collection artwork file."
),
rom_ids: str = Form(
...,
description="Collection ROM IDs as a JSON array string (e.g. [1,2,3]).",
),
name: str | None = Form(default=None),
description: str | None = Form(default=None),
url_cover: str | None = Form(default=None, description="Updated remote cover URL."),
) -> CollectionSchema:
"""Update collection endpoint
@@ -368,9 +387,6 @@ async def update_collection(
Returns:
CollectionSchema: Updated collection
"""
data = await request.form()
collection = db_collection_handler.get_collection(id)
if not collection:
raise CollectionNotFoundInDatabaseException(id)
@@ -382,13 +398,15 @@ async def update_collection(
raise CollectionNotFoundInDatabaseException(id)
try:
rom_ids = json.loads(data["rom_ids"]) # type: ignore
parsed_rom_ids = json.loads(rom_ids)
except json.JSONDecodeError as e:
raise ValueError("Invalid list for rom_ids field in update collection") from e
cleaned_data = {
"name": data.get("name", collection.name),
"description": data.get("description", collection.description),
"name": name if name is not None else collection.name,
"description": (
description if description is not None else collection.description
),
"is_public": is_public if is_public is not None else collection.is_public,
"user_id": request.user.id,
}
@@ -415,26 +433,28 @@ async def update_collection(
}
)
else:
if data.get(
"url_cover", ""
) != collection.url_cover or not fs_resource_handler.cover_exists(
collection, CoverSize.BIG
current_url_cover = (
url_cover if url_cover is not None else collection.url_cover
)
if (
current_url_cover != collection.url_cover
or not fs_resource_handler.cover_exists(collection, CoverSize.BIG)
):
path_cover_s, path_cover_l = await fs_resource_handler.get_cover(
entity=collection,
overwrite=True,
url_cover=data.get("url_cover", ""), # type: ignore
url_cover=current_url_cover,
)
cleaned_data.update(
{
"url_cover": data.get("url_cover", collection.url_cover),
"url_cover": current_url_cover,
"path_cover_s": path_cover_s,
"path_cover_l": path_cover_l,
}
)
updated_collection = db_collection_handler.update_collection(
id, cleaned_data, rom_ids
id, cleaned_data, parsed_rom_ids
)
return CollectionSchema.model_validate(updated_collection)
@@ -445,6 +465,12 @@ async def update_smart_collection(
request: Request,
id: int,
is_public: bool | None = None,
name: str | None = Form(default=None),
description: str | None = Form(default=None),
filter_criteria: str | None = Form(
default=None,
description="Updated smart collection filters as a JSON string.",
),
) -> SmartCollectionSchema:
"""Update smart collection endpoint
@@ -455,9 +481,6 @@ async def update_smart_collection(
Returns:
SmartCollectionSchema: Updated smart collection
"""
data = await request.form()
smart_collection = db_collection_handler.get_smart_collection(id)
if not smart_collection:
raise CollectionNotFoundInDatabaseException(id)
@@ -466,17 +489,19 @@ async def update_smart_collection(
raise CollectionPermissionError(id)
# Parse filter criteria if provided
filter_criteria = smart_collection.filter_criteria
if "filter_criteria" in data:
parsed_filter_criteria = smart_collection.filter_criteria
if filter_criteria is not None:
try:
filter_criteria = json.loads(str(data["filter_criteria"]))
parsed_filter_criteria = json.loads(filter_criteria)
except json.JSONDecodeError as e:
raise ValueError("Invalid JSON for filter_criteria field") from e
cleaned_data = {
"name": str(data.get("name", smart_collection.name)),
"description": str(data.get("description", smart_collection.description)),
"filter_criteria": filter_criteria,
"name": name if name is not None else smart_collection.name,
"description": (
description if description is not None else smart_collection.description
),
"filter_criteria": parsed_filter_criteria,
"is_public": is_public if is_public is not None else smart_collection.is_public,
"user_id": request.user.id,
}

View File

@@ -1,4 +1,5 @@
from fastapi import HTTPException, Request, status
from pydantic import BaseModel
from config.config_manager import config_manager as cm
from decorators.auth import protected_route
@@ -14,6 +15,16 @@ router = APIRouter(
)
class PlatformBindingPayload(BaseModel):
fs_slug: str
slug: str
class ExclusionPayload(BaseModel):
exclusion_value: str
exclusion_type: str
@router.get("")
def get_config() -> ConfigResponse:
"""Get config endpoint
@@ -52,12 +63,13 @@ def get_config() -> ConfigResponse:
@protected_route(router.post, "/system/platforms", [Scope.PLATFORMS_WRITE])
async def add_platform_binding(request: Request) -> None:
async def add_platform_binding(
request: Request, payload: PlatformBindingPayload
) -> None:
"""Add platform binding to the configuration"""
data = await request.json()
fs_slug = data["fs_slug"]
slug = data["slug"]
fs_slug = payload.fs_slug
slug = payload.slug
try:
cm.add_platform_binding(fs_slug, slug)
@@ -82,12 +94,13 @@ async def delete_platform_binding(request: Request, fs_slug: str) -> None:
@protected_route(router.post, "/system/versions", [Scope.PLATFORMS_WRITE])
async def add_platform_version(request: Request) -> None:
async def add_platform_version(
request: Request, payload: PlatformBindingPayload
) -> None:
"""Add platform version to the configuration"""
data = await request.json()
fs_slug = data["fs_slug"]
slug = data["slug"]
fs_slug = payload.fs_slug
slug = payload.slug
try:
cm.add_platform_version(fs_slug, slug)
@@ -112,12 +125,11 @@ async def delete_platform_version(request: Request, fs_slug: str) -> None:
@protected_route(router.post, "/exclude", [Scope.PLATFORMS_WRITE])
async def add_exclusion(request: Request) -> None:
async def add_exclusion(request: Request, payload: ExclusionPayload) -> None:
"""Add platform exclusion to the configuration"""
data = await request.json()
exclusion_value = data["exclusion_value"]
exclusion_type = data["exclusion_type"]
exclusion_value = payload.exclusion_value
exclusion_type = payload.exclusion_type
try:
cm.add_exclusion(exclusion_type, exclusion_value)
except ConfigNotWritableException as exc:

View File

@@ -12,7 +12,9 @@ import pydash
from anyio import Path, open_file
from fastapi import (
Body,
Depends,
File,
Form,
Header,
HTTPException,
)
@@ -23,11 +25,10 @@ from fastapi import (
UploadFile,
status,
)
from fastapi.datastructures import FormData
from fastapi.responses import Response
from fastapi_pagination.ext.sqlalchemy import paginate
from fastapi_pagination.limit_offset import LimitOffsetPage, LimitOffsetParams
from pydantic import BaseModel
from pydantic import BaseModel, Field
from starlette.requests import ClientDisconnect
from starlette.responses import FileResponse
from streaming_form_data import StreamingFormDataParser
@@ -86,8 +87,117 @@ def safe_int_or_none(value: Any) -> int | None:
return safe_int(value)
def parse_raw_metadata(data: FormData, form_key: str) -> dict | None:
raw_json = data.get(form_key, None)
class RomUpdateForm(BaseModel):
igdb_id: str | None = Field(default=None, description="IGDB game ID.")
sgdb_id: str | None = Field(default=None, description="SteamGridDB game ID.")
moby_id: str | None = Field(default=None, description="MobyGames game ID.")
ss_id: str | None = Field(default=None, description="ScreenScraper game ID.")
ra_id: str | None = Field(default=None, description="RetroAchievements game ID.")
launchbox_id: str | None = Field(default=None, description="LaunchBox game ID.")
hasheous_id: str | None = Field(default=None, description="Hasheous game ID.")
tgdb_id: str | None = Field(default=None, description="TheGamesDB game ID.")
flashpoint_id: str | None = Field(default=None, description="Flashpoint game ID.")
hltb_id: str | None = Field(default=None, description="HowLongToBeat game ID.")
raw_igdb_metadata: str | None = Field(
default=None, description="Raw IGDB metadata as JSON string."
)
raw_moby_metadata: str | None = Field(
default=None, description="Raw MobyGames metadata as JSON string."
)
raw_ss_metadata: str | None = Field(
default=None, description="Raw ScreenScraper metadata as JSON string."
)
raw_launchbox_metadata: str | None = Field(
default=None, description="Raw LaunchBox metadata as JSON string."
)
raw_hasheous_metadata: str | None = Field(
default=None, description="Raw Hasheous metadata as JSON string."
)
raw_flashpoint_metadata: str | None = Field(
default=None, description="Raw Flashpoint metadata as JSON string."
)
raw_hltb_metadata: str | None = Field(
default=None, description="Raw HowLongToBeat metadata as JSON string."
)
raw_manual_metadata: str | None = Field(
default=None, description="Raw manual metadata as JSON string."
)
name: str | None = None
summary: str | None = None
fs_name: str | None = None
url_cover: str | None = None
url_manual: str | None = None
class RomUserUpdatePayload(BaseModel):
data: dict[str, Any] = Field(default_factory=dict)
update_last_played: bool = False
remove_last_played: bool = False
async def parse_rom_update_form(
request: Request,
igdb_id: str | None = Form(default=None),
sgdb_id: str | None = Form(default=None),
moby_id: str | None = Form(default=None),
ss_id: str | None = Form(default=None),
ra_id: str | None = Form(default=None),
launchbox_id: str | None = Form(default=None),
hasheous_id: str | None = Form(default=None),
tgdb_id: str | None = Form(default=None),
flashpoint_id: str | None = Form(default=None),
hltb_id: str | None = Form(default=None),
raw_igdb_metadata: str | None = Form(default=None),
raw_moby_metadata: str | None = Form(default=None),
raw_ss_metadata: str | None = Form(default=None),
raw_launchbox_metadata: str | None = Form(default=None),
raw_hasheous_metadata: str | None = Form(default=None),
raw_flashpoint_metadata: str | None = Form(default=None),
raw_hltb_metadata: str | None = Form(default=None),
raw_manual_metadata: str | None = Form(default=None),
name: str | None = Form(default=None),
summary: str | None = Form(default=None),
fs_name: str | None = Form(default=None),
url_cover: str | None = Form(default=None),
url_manual: str | None = Form(default=None),
) -> RomUpdateForm:
# Preserve "field was provided" behavior used by update logic.
form_keys = set((await request.form()).keys())
field_values = {
"igdb_id": igdb_id,
"sgdb_id": sgdb_id,
"moby_id": moby_id,
"ss_id": ss_id,
"ra_id": ra_id,
"launchbox_id": launchbox_id,
"hasheous_id": hasheous_id,
"tgdb_id": tgdb_id,
"flashpoint_id": flashpoint_id,
"hltb_id": hltb_id,
"raw_igdb_metadata": raw_igdb_metadata,
"raw_moby_metadata": raw_moby_metadata,
"raw_ss_metadata": raw_ss_metadata,
"raw_launchbox_metadata": raw_launchbox_metadata,
"raw_hasheous_metadata": raw_hasheous_metadata,
"raw_flashpoint_metadata": raw_flashpoint_metadata,
"raw_hltb_metadata": raw_hltb_metadata,
"raw_manual_metadata": raw_manual_metadata,
"name": name,
"summary": summary,
"fs_name": fs_name,
"url_cover": url_cover,
"url_manual": url_manual,
}
return RomUpdateForm.model_validate(
{field: value for field, value in field_values.items() if field in form_keys}
)
def parse_raw_metadata(form_data: RomUpdateForm, form_key: str) -> dict | None:
if form_key not in form_data.model_fields_set:
return None
raw_json = getattr(form_data, form_key, None)
if not raw_json or str(raw_json).strip() == "":
return None
@@ -988,6 +1098,7 @@ async def get_rom_content(
async def update_rom(
request: Request,
id: Annotated[int, PathVar(description="Rom internal id.", ge=1)],
form_data: Annotated[RomUpdateForm, Depends(parse_rom_update_form)],
artwork: Annotated[
UploadFile | None,
File(description="Custom artwork to set as cover."),
@@ -1002,8 +1113,6 @@ async def update_rom(
] = False,
) -> DetailedRomSchema:
"""Update a rom."""
data = await request.form()
rom = db_rom_handler.get_rom(id)
if not rom:
@@ -1051,50 +1160,69 @@ async def update_rom(
return DetailedRomSchema.from_orm_with_request(rom, request)
provided_fields = form_data.model_fields_set
cleaned_data: dict[str, Any] = {
"igdb_id": (
safe_int_or_none(data["igdb_id"]) if "igdb_id" in data else rom.igdb_id
safe_int_or_none(form_data.igdb_id)
if "igdb_id" in provided_fields
else rom.igdb_id
),
"sgdb_id": (
safe_int_or_none(data["sgdb_id"]) if "sgdb_id" in data else rom.sgdb_id
safe_int_or_none(form_data.sgdb_id)
if "sgdb_id" in provided_fields
else rom.sgdb_id
),
"moby_id": (
safe_int_or_none(data["moby_id"]) if "moby_id" in data else rom.moby_id
safe_int_or_none(form_data.moby_id)
if "moby_id" in provided_fields
else rom.moby_id
),
"ss_id": (
safe_int_or_none(form_data.ss_id)
if "ss_id" in provided_fields
else rom.ss_id
),
"ra_id": (
safe_int_or_none(form_data.ra_id)
if "ra_id" in provided_fields
else rom.ra_id
),
"ss_id": safe_int_or_none(data["ss_id"]) if "ss_id" in data else rom.ss_id,
"ra_id": safe_int_or_none(data["ra_id"]) if "ra_id" in data else rom.ra_id,
"launchbox_id": (
safe_int_or_none(data["launchbox_id"])
if "launchbox_id" in data
safe_int_or_none(form_data.launchbox_id)
if "launchbox_id" in provided_fields
else rom.launchbox_id
),
"hasheous_id": (
safe_int_or_none(data["hasheous_id"])
if "hasheous_id" in data
safe_int_or_none(form_data.hasheous_id)
if "hasheous_id" in provided_fields
else rom.hasheous_id
),
"tgdb_id": (
safe_int_or_none(data["tgdb_id"]) if "tgdb_id" in data else rom.tgdb_id
safe_int_or_none(form_data.tgdb_id)
if "tgdb_id" in provided_fields
else rom.tgdb_id
),
"flashpoint_id": (
data["flashpoint_id"] or None
if "flashpoint_id" in data
form_data.flashpoint_id or None
if "flashpoint_id" in provided_fields
else rom.flashpoint_id
),
"hltb_id": (
safe_int_or_none(data["hltb_id"]) if "hltb_id" in data else rom.hltb_id
safe_int_or_none(form_data.hltb_id)
if "hltb_id" in provided_fields
else rom.hltb_id
),
}
# Add raw metadata parsing
raw_igdb_metadata = parse_raw_metadata(data, "raw_igdb_metadata")
raw_moby_metadata = parse_raw_metadata(data, "raw_moby_metadata")
raw_ss_metadata = parse_raw_metadata(data, "raw_ss_metadata")
raw_launchbox_metadata = parse_raw_metadata(data, "raw_launchbox_metadata")
raw_hasheous_metadata = parse_raw_metadata(data, "raw_hasheous_metadata")
raw_flashpoint_metadata = parse_raw_metadata(data, "raw_flashpoint_metadata")
raw_hltb_metadata = parse_raw_metadata(data, "raw_hltb_metadata")
raw_manual_metadata = parse_raw_metadata(data, "raw_manual_metadata")
raw_igdb_metadata = parse_raw_metadata(form_data, "raw_igdb_metadata")
raw_moby_metadata = parse_raw_metadata(form_data, "raw_moby_metadata")
raw_ss_metadata = parse_raw_metadata(form_data, "raw_ss_metadata")
raw_launchbox_metadata = parse_raw_metadata(form_data, "raw_launchbox_metadata")
raw_hasheous_metadata = parse_raw_metadata(form_data, "raw_hasheous_metadata")
raw_flashpoint_metadata = parse_raw_metadata(form_data, "raw_flashpoint_metadata")
raw_hltb_metadata = parse_raw_metadata(form_data, "raw_hltb_metadata")
raw_manual_metadata = parse_raw_metadata(form_data, "raw_manual_metadata")
if cleaned_data["igdb_id"] and raw_igdb_metadata is not None:
cleaned_data["igdb_metadata"] = raw_igdb_metadata
if cleaned_data["moby_id"] and raw_moby_metadata is not None:
@@ -1175,12 +1303,14 @@ async def update_rom(
cleaned_data.update(
{
"name": data.get("name", rom.name),
"summary": data.get("summary", rom.summary),
"name": form_data.name if "name" in provided_fields else rom.name,
"summary": (
form_data.summary if "summary" in provided_fields else rom.summary
),
}
)
new_fs_name = str(data.get("fs_name") or rom.fs_name)
new_fs_name = str(form_data.fs_name or rom.fs_name)
new_fs_name = sanitize_filename(new_fs_name)
cleaned_data.update(
{
@@ -1212,7 +1342,9 @@ async def update_rom(
}
)
else:
url_cover = data.get("url_cover", rom.url_cover)
url_cover = (
form_data.url_cover if "url_cover" in provided_fields else rom.url_cover
)
path_cover_s, path_cover_l = await fs_resource_handler.get_cover(
entity=rom,
overwrite=url_cover != rom.url_cover,
@@ -1226,7 +1358,9 @@ async def update_rom(
}
)
url_manual = data.get("url_manual", rom.url_manual)
url_manual = (
form_data.url_manual if "url_manual" in provided_fields else rom.url_manual
)
path_manual = await fs_resource_handler.get_manual(
rom=rom,
overwrite=url_manual != rom.url_manual,
@@ -1519,20 +1653,10 @@ async def delete_roms(
async def update_rom_user(
request: Request,
id: Annotated[int, PathVar(description="Rom internal id.", ge=1)],
update_last_played: Annotated[
bool,
Body(description="Whether to update the last played date."),
] = False,
remove_last_played: Annotated[
bool,
Body(description="Whether to remove the last played date."),
] = False,
payload: Annotated[RomUserUpdatePayload, Body()],
) -> RomUserSchema:
"""Update rom data associated to the current user."""
# TODO: Migrate to native FastAPI body parsing.
data = await request.json()
rom_user_data = data.get("data", {})
rom_user_data = payload.data
rom = db_rom_handler.get_rom(id)
@@ -1560,9 +1684,9 @@ async def update_rom_user(
if field in rom_user_data
}
if update_last_played:
if payload.update_last_played:
cleaned_data.update({"last_played": datetime.now(timezone.utc)})
elif remove_last_played:
elif payload.remove_last_played:
cleaned_data.update({"last_played": None})
rom_user = db_rom_handler.update_rom_user(db_rom_user.id, cleaned_data)

View File

@@ -3,7 +3,7 @@ import re
from datetime import datetime, timezone
from typing import Annotated
from fastapi import Body, HTTPException, Request, UploadFile, status
from fastapi import Body, File, HTTPException, Request, UploadFile, status
from fastapi.responses import FileResponse
from decorators.auth import protected_route
@@ -110,26 +110,28 @@ async def add_save(
overwrite: bool = False,
autocleanup: bool = False,
autocleanup_limit: int = 10,
saveFile: UploadFile | None = File(
default=None, description="Save file to upload."
),
screenshotFile: UploadFile | None = File(
default=None, description="Screenshot file associated with this save."
),
) -> SaveSchema:
"""Upload a save file for a ROM."""
device = _resolve_device(
device_id, request.user.id, request.auth.scopes, Scope.DEVICES_WRITE
)
data = await request.form()
rom = db_rom_handler.get_rom(rom_id)
if not rom:
raise RomNotFoundInDatabaseException(rom_id)
if "saveFile" not in data:
if not saveFile:
log.error("No save file provided")
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail="No save file provided"
)
saveFile: UploadFile = data["saveFile"] # type: ignore
if not saveFile.filename:
log.error("Save file has no filename")
raise HTTPException(
@@ -250,7 +252,6 @@ async def add_save(
except FileNotFoundError:
log.warning(f"Could not delete old save file: {old_save.full_path}")
screenshotFile: UploadFile | None = data.get("screenshotFile", None) # type: ignore
if screenshotFile and screenshotFile.filename:
screenshots_path = fs_asset_handler.build_screenshots_file_path(
user=request.user, platform_fs_slug=rom.platform_slug, rom_id=rom.id
@@ -446,9 +447,17 @@ def confirm_download(
@protected_route(router.put, "/{id}", [Scope.ASSETS_WRITE])
async def update_save(request: Request, id: int) -> SaveSchema:
async def update_save(
request: Request,
id: int,
saveFile: UploadFile | None = File(
default=None, description="Updated save file content."
),
screenshotFile: UploadFile | None = File(
default=None, description="Updated screenshot file."
),
) -> SaveSchema:
"""Update a save file."""
data = await request.form()
db_save = db_save_handler.get_save(user_id=request.user.id, id=id)
if not db_save:
@@ -456,14 +465,12 @@ async def update_save(request: Request, id: int) -> SaveSchema:
log.error(error)
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=error)
if "saveFile" in data:
saveFile: UploadFile = data["saveFile"] # type: ignore
if saveFile:
await fs_asset_handler.write_file(file=saveFile, path=db_save.file_path)
db_save = db_save_handler.update_save(
db_save.id, {"file_size_bytes": saveFile.size}
)
screenshotFile: UploadFile | None = data.get("screenshotFile", None) # type: ignore
if screenshotFile and screenshotFile.filename:
screenshots_path = fs_asset_handler.build_screenshots_file_path(
user=request.user,

View File

@@ -1,4 +1,4 @@
from fastapi import HTTPException, Request, UploadFile, status
from fastapi import File, HTTPException, Request, UploadFile, status
from decorators.auth import protected_route
from endpoints.responses.assets import ScreenshotSchema
@@ -22,9 +22,10 @@ router = APIRouter(
async def add_screenshot(
request: Request,
rom_id: int,
screenshotFile: UploadFile | None = File(
default=None, description="Screenshot file to upload."
),
) -> ScreenshotSchema:
data = await request.form()
rom = db_rom_handler.get_rom(id=rom_id)
if not rom:
raise RomNotFoundInDatabaseException(rom_id)
@@ -36,14 +37,12 @@ async def add_screenshot(
user=request.user, platform_fs_slug=rom.platform_slug, rom_id=rom.id
)
if "screenshotFile" not in data:
if not screenshotFile:
log.error("No screenshot file provided")
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="No screenshot file provided",
)
screenshotFile: UploadFile = data["screenshotFile"] # type: ignore
if not screenshotFile.filename:
log.error("Screenshot file has no filename")
raise HTTPException(

View File

@@ -1,7 +1,7 @@
from datetime import datetime, timezone
from typing import Annotated
from fastapi import Body, HTTPException, Request, UploadFile, status
from fastapi import Body, File, HTTPException, Request, UploadFile, status
from decorators.auth import protected_route
from endpoints.responses.assets import StateSchema
@@ -27,9 +27,13 @@ async def add_state(
request: Request,
rom_id: int,
emulator: str | None = None,
stateFile: UploadFile | None = File(
default=None, description="State file to upload."
),
screenshotFile: UploadFile | None = File(
default=None, description="Screenshot file associated with this state."
),
) -> StateSchema:
data = await request.form()
rom = db_rom_handler.get_rom(rom_id)
if not rom:
raise RomNotFoundInDatabaseException(rom_id)
@@ -43,14 +47,12 @@ async def add_state(
emulator=emulator,
)
if "stateFile" not in data:
if not stateFile:
log.error("No state file provided")
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail="No state file provided"
)
stateFile: UploadFile = data["stateFile"] # type: ignore
if not stateFile.filename:
log.error("State file has no filename")
raise HTTPException(
@@ -95,7 +97,6 @@ async def add_state(
scanned_state.emulator = emulator
db_state = db_state_handler.add_state(state=scanned_state)
screenshotFile: UploadFile | None = data.get("screenshotFile", None) # type: ignore
if screenshotFile and screenshotFile.filename:
screenshots_path = fs_asset_handler.build_screenshots_file_path(
user=request.user, platform_fs_slug=rom.platform_slug, rom_id=rom.id
@@ -187,23 +188,27 @@ def get_state(request: Request, id: int) -> StateSchema:
@protected_route(router.put, "/{id}", [Scope.ASSETS_WRITE])
async def update_state(request: Request, id: int) -> StateSchema:
data = await request.form()
async def update_state(
request: Request,
id: int,
stateFile: UploadFile | None = File(
default=None, description="Updated state file content."
),
screenshotFile: UploadFile | None = File(
default=None, description="Updated screenshot file."
),
) -> StateSchema:
db_state = db_state_handler.get_state(user_id=request.user.id, id=id)
if not db_state:
error = f"State with ID {id} not found"
log.error(error)
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=error)
if "stateFile" in data:
stateFile: UploadFile = data["stateFile"] # type: ignore
if stateFile:
await fs_asset_handler.write_file(file=stateFile, path=db_state.file_path)
db_state = db_state_handler.update_state(
db_state.id, {"file_size_bytes": stateFile.size}
)
screenshotFile: UploadFile | None = data.get("screenshotFile", None) # type: ignore
if screenshotFile and screenshotFile.filename:
screenshots_path = fs_asset_handler.build_screenshots_file_path(
user=request.user,

View File

@@ -1,3 +1,5 @@
from unittest.mock import patch
import pytest
from fastapi import status
from fastapi.testclient import TestClient
@@ -8,6 +10,7 @@ from config.config_manager import (
DEFAULT_EXCLUDED_EXTENSIONS,
DEFAULT_EXCLUDED_FILES,
)
from config.config_manager import config_manager as cm
@pytest.fixture
@@ -33,3 +36,39 @@ def test_config(client):
assert config.get("EXCLUDED_MULTI_PARTS_FILES") == DEFAULT_EXCLUDED_FILES
assert config.get("PLATFORMS_BINDING") == {}
assert not config.get("SKIP_HASH_CALCULATION")
def test_add_platform_binding_payload_shape(client, access_token: str):
with patch.object(cm, "add_platform_binding") as add_platform_binding:
response = client.post(
"/api/config/system/platforms",
headers={"Authorization": f"Bearer {access_token}"},
json={"fs_slug": "n64", "slug": "nintendo-64"},
)
assert response.status_code == status.HTTP_200_OK
add_platform_binding.assert_called_once_with("n64", "nintendo-64")
def test_add_platform_version_payload_shape(client, access_token: str):
with patch.object(cm, "add_platform_version") as add_platform_version:
response = client.post(
"/api/config/system/versions",
headers={"Authorization": f"Bearer {access_token}"},
json={"fs_slug": "n64", "slug": "1.0"},
)
assert response.status_code == status.HTTP_200_OK
add_platform_version.assert_called_once_with("n64", "1.0")
def test_add_exclusion_payload_shape(client, access_token: str):
with patch.object(cm, "add_exclusion") as add_exclusion:
response = client.post(
"/api/config/exclude",
headers={"Authorization": f"Bearer {access_token}"},
json={"exclusion_type": "single_files", "exclusion_value": "README.txt"},
)
assert response.status_code == status.HTTP_200_OK
add_exclusion.assert_called_once_with("single_files", "README.txt")

View File

@@ -123,6 +123,41 @@ def test_delete_roms(client: TestClient, access_token: str, rom: Rom):
assert body["successful_items"] == 1
def test_update_rom_user_props_with_data_envelope(
client: TestClient, access_token: str, rom: Rom
):
response = client.put(
f"/api/roms/{rom.id}/props",
headers={"Authorization": f"Bearer {access_token}"},
json={"data": {"backlogged": True, "rating": 7}},
)
assert response.status_code == status.HTTP_200_OK
body = response.json()
assert body["backlogged"] is True
assert body["rating"] == 7
def test_update_rom_user_props_last_played_flags(
client: TestClient, access_token: str, rom: Rom
):
mark_played_response = client.put(
f"/api/roms/{rom.id}/props",
headers={"Authorization": f"Bearer {access_token}"},
json={"data": {}, "update_last_played": True},
)
assert mark_played_response.status_code == status.HTTP_200_OK
assert mark_played_response.json()["last_played"] is not None
clear_played_response = client.put(
f"/api/roms/{rom.id}/props",
headers={"Authorization": f"Bearer {access_token}"},
json={"data": {}, "remove_last_played": True},
)
assert clear_played_response.status_code == status.HTTP_200_OK
assert clear_played_response.json()["last_played"] is None
class TestUpdateMetadataIDs:
@patch.object(
IGDBHandler, "get_rom_by_id", return_value=IGDBRom(igdb_id=MOCK_IGDB_ID)

View File

@@ -71,10 +71,11 @@
# versions:
# naomi: arcade
# The folder name where your roms are located
# filesystem:
# roms_folder: 'roms' # For example if your folder structure is /home/user/library/roms_folder
# skip_hash_calculation: false # Skip file hash calculations on low power devices (eg. Raspberry PI)
# # Custom games folder name only (eg. `retro_games/`)
# roms_folder: 'roms'
# # Skip file hash calculations on low power devices (eg. Raspberry PI)
# skip_hash_calculation: false
# scan:
# # Metadata priority during scans