diff --git a/backend/handler/database/play_sessions_handler.py b/backend/handler/database/play_sessions_handler.py index 0f0b59b03..ac956674c 100644 --- a/backend/handler/database/play_sessions_handler.py +++ b/backend/handler/database/play_sessions_handler.py @@ -22,7 +22,7 @@ class DBPlaySessionsHandler(DBBaseHandler): return play_sessions @begin_session - def find_existing( + def find_existing_sesssion( self, user_id: int, keys: list[tuple[str | None, int | None, datetime]], diff --git a/backend/handler/play_session_handler.py b/backend/handler/play_session_handler.py index 106fbb35d..bc64333c6 100644 --- a/backend/handler/play_session_handler.py +++ b/backend/handler/play_session_handler.py @@ -1,10 +1,11 @@ from datetime import datetime, timedelta, timezone from typing import Literal, NotRequired, TypedDict +from pydash import compact + from handler.database import db_device_handler, db_play_session_handler, db_rom_handler from logger.logger import log from models.play_session import PlaySession -from models.rom import Rom from utils.datetime import to_utc @@ -29,6 +30,28 @@ class PlaySessionEntry(TypedDict): duration_ms: int +def _resolve_device(device_id: str | None, user_id: int) -> str | None: + if device_id is None: + return None + device = db_device_handler.get_device(device_id=device_id, user_id=user_id) + return device_id if device is not None else None + + +def _update_rom_user_last_played( + rom_user_updates: dict[int, datetime], user_id: int +) -> None: + for rom_id, latest_end_time in rom_user_updates.items(): + rom_user = db_rom_handler.get_rom_user(rom_id=rom_id, user_id=user_id) + if not rom_user: + rom_user = db_rom_handler.add_rom_user(rom_id=rom_id, user_id=user_id) + + current = to_utc(rom_user.last_played) if rom_user.last_played else None + if current is None or latest_end_time > current: + db_rom_handler.update_rom_user( + rom_user.id, {"last_played": latest_end_time} + ) + + def ingest_play_sessions( *, user_id: int, @@ -39,25 +62,19 @@ def ingest_play_sessions( max_future_minutes: int = 5, ) -> PlaySessionIngestSummary: """Core play session ingestion logic shared by the standalone endpoint and sync complete.""" - now = datetime.now(timezone.utc) - max_future = now + timedelta(minutes=max_future_minutes) + max_future = datetime.now(timezone.utc) + timedelta(minutes=max_future_minutes) + resolved_device_id = _resolve_device(device_id, user_id) - # Resolve device - resolved_device_id = None - if device_id is not None: - device = db_device_handler.get_device(device_id=device_id, user_id=user_id) - if device is not None: - resolved_device_id = device_id + # Bulk-resolve all referenced rom IDs in one query + candidate_rom_ids = {e["rom_id"] for e in entries} + valid_rom_ids: set[int] = set() + if candidate_rom_ids: + found_roms = db_rom_handler.get_roms_by_ids(compact(candidate_rom_ids)) + valid_rom_ids = {r.id for r in found_roms} - rom_cache: dict[int, Rom | None] = {} + # Phase 1: Validate and resolve each entry results: list[PlaySessionIngestResult] = [] - created_count = 0 - skipped_count = 0 - rom_user_updates: dict[int, datetime] = {} - - resolved: list[tuple[int, int | None]] = [] - candidate_keys: list[tuple[str | None, int | None, datetime]] = [] - skipped_indices: set[int] = set() + valid: list[tuple[int, int | None, PlaySessionEntry]] = [] for idx, item in enumerate(entries): if item["end_time"] > max_future: @@ -68,44 +85,29 @@ def ingest_play_sessions( "detail": "end_time is too far in the future", } ) - skipped_count += 1 - skipped_indices.add(idx) - resolved.append((idx, None)) - candidate_keys.append((None, None, item["start_time"])) continue rom_id = item.get("rom_id") - resolved_rom_id = None - if rom_id is not None: - if rom_id not in rom_cache: - rom_cache[rom_id] = db_rom_handler.get_rom(id=rom_id) - rom = rom_cache[rom_id] - if rom is not None: - resolved_rom_id = rom_id + resolved_rom_id = rom_id if rom_id in valid_rom_ids else None + valid.append((idx, resolved_rom_id, item)) - resolved.append((idx, resolved_rom_id)) - candidate_keys.append( - (resolved_device_id, resolved_rom_id, to_utc(item["start_time"])) - ) - - existing_keys = db_play_session_handler.find_existing( - user_id=user_id, - keys=[k for i, k in enumerate(candidate_keys) if i not in skipped_indices], + # Phase 2: Batch dedup check + dedup_keys = [ + (resolved_device_id, rom_id, to_utc(item["start_time"])) + for _, rom_id, item in valid + ] + existing_keys = db_play_session_handler.find_existing_sesssion( + user_id=user_id, keys=dedup_keys ) seen_keys: set[tuple[str | None, int | None, datetime]] = set() to_insert: list[tuple[int, int | None, PlaySession]] = [] - for (idx, resolved_rom_id), item in zip(resolved, entries, strict=False): - if idx in skipped_indices: - continue - - dedup_key = (resolved_device_id, resolved_rom_id, to_utc(item["start_time"])) - if dedup_key in seen_keys or dedup_key in existing_keys: + for (idx, resolved_rom_id, item), key in zip(valid, dedup_keys, strict=True): + if key in seen_keys or key in existing_keys: results.append({"index": idx, "status": "duplicate"}) - skipped_count += 1 continue - seen_keys.add(dedup_key) + seen_keys.add(key) to_insert.append( ( @@ -124,35 +126,29 @@ def ingest_play_sessions( ) ) + # Phase 3: Bulk insert if to_insert: db_play_session_handler.add_sessions([ps for _, _, ps in to_insert]) - for idx, resolved_rom_id, ps in to_insert: - results.append({"index": idx, "status": "created", "id": ps.id}) - created_count += 1 - if resolved_rom_id is not None: - existing = rom_user_updates.get(resolved_rom_id) - if existing is None or ps.end_time > existing: - rom_user_updates[resolved_rom_id] = ps.end_time - for rom_id, latest_end_time in rom_user_updates.items(): - rom_user = db_rom_handler.get_rom_user(rom_id=rom_id, user_id=user_id) - if not rom_user: - rom_user = db_rom_handler.add_rom_user(rom_id=rom_id, user_id=user_id) + rom_user_updates: dict[int, datetime] = {} + for idx, resolved_rom_id, ps in to_insert: + results.append({"index": idx, "status": "created", "id": ps.id}) + if resolved_rom_id is not None: + prev = rom_user_updates.get(resolved_rom_id) + if prev is None or ps.end_time > prev: + rom_user_updates[resolved_rom_id] = ps.end_time - update_data: dict = {} - current_last_played = ( - to_utc(rom_user.last_played) if rom_user.last_played else None - ) - if current_last_played is None or latest_end_time > current_last_played: - update_data["last_played"] = latest_end_time - if update_data: - db_rom_handler.update_rom_user(rom_user.id, update_data) + # Phase 4: Side effects + _update_rom_user_last_played(rom_user_updates, user_id) if resolved_device_id is not None: db_device_handler.update_last_seen( device_id=resolved_device_id, user_id=user_id ) + created_count = len(to_insert) + skipped_count = len(entries) - created_count + log.info( f"Ingested {created_count} play sessions for user {username}" f" ({skipped_count} skipped)" diff --git a/frontend/src/__generated__/index.ts b/frontend/src/__generated__/index.ts index 4e52fdbc2..352bc0682 100644 --- a/frontend/src/__generated__/index.ts +++ b/frontend/src/__generated__/index.ts @@ -127,10 +127,12 @@ export type { SSAgeRating } from './models/SSAgeRating'; export type { StateSchema } from './models/StateSchema'; export type { StatsReturn } from './models/StatsReturn'; export type { SyncCompletePayload } from './models/SyncCompletePayload'; +export type { SyncCompleteResponse } from './models/SyncCompleteResponse'; export type { SyncMode } from './models/SyncMode'; export type { SyncNegotiatePayload } from './models/SyncNegotiatePayload'; export type { SyncNegotiateResponse } from './models/SyncNegotiateResponse'; export type { SyncOperationSchema } from './models/SyncOperationSchema'; +export type { SyncPlaySessionEntry } from './models/SyncPlaySessionEntry'; export type { SyncSessionSchema } from './models/SyncSessionSchema'; export type { SyncTaskMeta } from './models/SyncTaskMeta'; export type { SyncTaskStatusResponse } from './models/SyncTaskStatusResponse'; diff --git a/frontend/src/__generated__/models/PlaySessionSchema.ts b/frontend/src/__generated__/models/PlaySessionSchema.ts index 54d56da7e..c21143fd2 100644 --- a/frontend/src/__generated__/models/PlaySessionSchema.ts +++ b/frontend/src/__generated__/models/PlaySessionSchema.ts @@ -7,6 +7,7 @@ export type PlaySessionSchema = { user_id: number; device_id: (string | null); rom_id: (number | null); + sync_session_id: (number | null); save_slot: (string | null); start_time: string; end_time: string; diff --git a/frontend/src/__generated__/models/SyncCompletePayload.ts b/frontend/src/__generated__/models/SyncCompletePayload.ts index f43b15bf6..19322ecf6 100644 --- a/frontend/src/__generated__/models/SyncCompletePayload.ts +++ b/frontend/src/__generated__/models/SyncCompletePayload.ts @@ -2,8 +2,10 @@ /* istanbul ignore file */ /* tslint:disable */ /* eslint-disable */ +import type { SyncPlaySessionEntry } from './SyncPlaySessionEntry'; export type SyncCompletePayload = { operations_completed?: number; operations_failed?: number; + play_sessions?: (Array | null); }; diff --git a/frontend/src/__generated__/models/SyncCompleteResponse.ts b/frontend/src/__generated__/models/SyncCompleteResponse.ts new file mode 100644 index 000000000..a913e94ae --- /dev/null +++ b/frontend/src/__generated__/models/SyncCompleteResponse.ts @@ -0,0 +1,11 @@ +/* generated using openapi-typescript-codegen -- do not edit */ +/* istanbul ignore file */ +/* tslint:disable */ +/* eslint-disable */ +import type { PlaySessionIngestResponse } from './PlaySessionIngestResponse'; +import type { SyncSessionSchema } from './SyncSessionSchema'; +export type SyncCompleteResponse = { + session: SyncSessionSchema; + play_session_ingest?: (PlaySessionIngestResponse | null); +}; + diff --git a/frontend/src/__generated__/models/SyncPlaySessionEntry.ts b/frontend/src/__generated__/models/SyncPlaySessionEntry.ts new file mode 100644 index 000000000..655f0478e --- /dev/null +++ b/frontend/src/__generated__/models/SyncPlaySessionEntry.ts @@ -0,0 +1,12 @@ +/* generated using openapi-typescript-codegen -- do not edit */ +/* istanbul ignore file */ +/* tslint:disable */ +/* eslint-disable */ +export type SyncPlaySessionEntry = { + rom_id?: (number | null); + save_slot?: (string | null); + start_time: string; + end_time: string; + duration_ms: number; +}; +