some simplification

This commit is contained in:
Georges-Antoine Assi
2026-04-06 21:53:49 -04:00
parent ae03b0b322
commit b2e1e094f7
7 changed files with 88 additions and 64 deletions

View File

@@ -22,7 +22,7 @@ class DBPlaySessionsHandler(DBBaseHandler):
return play_sessions
@begin_session
def find_existing(
def find_existing_sesssion(
self,
user_id: int,
keys: list[tuple[str | None, int | None, datetime]],

View File

@@ -1,10 +1,11 @@
from datetime import datetime, timedelta, timezone
from typing import Literal, NotRequired, TypedDict
from pydash import compact
from handler.database import db_device_handler, db_play_session_handler, db_rom_handler
from logger.logger import log
from models.play_session import PlaySession
from models.rom import Rom
from utils.datetime import to_utc
@@ -29,6 +30,28 @@ class PlaySessionEntry(TypedDict):
duration_ms: int
def _resolve_device(device_id: str | None, user_id: int) -> str | None:
if device_id is None:
return None
device = db_device_handler.get_device(device_id=device_id, user_id=user_id)
return device_id if device is not None else None
def _update_rom_user_last_played(
rom_user_updates: dict[int, datetime], user_id: int
) -> None:
for rom_id, latest_end_time in rom_user_updates.items():
rom_user = db_rom_handler.get_rom_user(rom_id=rom_id, user_id=user_id)
if not rom_user:
rom_user = db_rom_handler.add_rom_user(rom_id=rom_id, user_id=user_id)
current = to_utc(rom_user.last_played) if rom_user.last_played else None
if current is None or latest_end_time > current:
db_rom_handler.update_rom_user(
rom_user.id, {"last_played": latest_end_time}
)
def ingest_play_sessions(
*,
user_id: int,
@@ -39,25 +62,19 @@ def ingest_play_sessions(
max_future_minutes: int = 5,
) -> PlaySessionIngestSummary:
"""Core play session ingestion logic shared by the standalone endpoint and sync complete."""
now = datetime.now(timezone.utc)
max_future = now + timedelta(minutes=max_future_minutes)
max_future = datetime.now(timezone.utc) + timedelta(minutes=max_future_minutes)
resolved_device_id = _resolve_device(device_id, user_id)
# Resolve device
resolved_device_id = None
if device_id is not None:
device = db_device_handler.get_device(device_id=device_id, user_id=user_id)
if device is not None:
resolved_device_id = device_id
# Bulk-resolve all referenced rom IDs in one query
candidate_rom_ids = {e["rom_id"] for e in entries}
valid_rom_ids: set[int] = set()
if candidate_rom_ids:
found_roms = db_rom_handler.get_roms_by_ids(compact(candidate_rom_ids))
valid_rom_ids = {r.id for r in found_roms}
rom_cache: dict[int, Rom | None] = {}
# Phase 1: Validate and resolve each entry
results: list[PlaySessionIngestResult] = []
created_count = 0
skipped_count = 0
rom_user_updates: dict[int, datetime] = {}
resolved: list[tuple[int, int | None]] = []
candidate_keys: list[tuple[str | None, int | None, datetime]] = []
skipped_indices: set[int] = set()
valid: list[tuple[int, int | None, PlaySessionEntry]] = []
for idx, item in enumerate(entries):
if item["end_time"] > max_future:
@@ -68,44 +85,29 @@ def ingest_play_sessions(
"detail": "end_time is too far in the future",
}
)
skipped_count += 1
skipped_indices.add(idx)
resolved.append((idx, None))
candidate_keys.append((None, None, item["start_time"]))
continue
rom_id = item.get("rom_id")
resolved_rom_id = None
if rom_id is not None:
if rom_id not in rom_cache:
rom_cache[rom_id] = db_rom_handler.get_rom(id=rom_id)
rom = rom_cache[rom_id]
if rom is not None:
resolved_rom_id = rom_id
resolved_rom_id = rom_id if rom_id in valid_rom_ids else None
valid.append((idx, resolved_rom_id, item))
resolved.append((idx, resolved_rom_id))
candidate_keys.append(
(resolved_device_id, resolved_rom_id, to_utc(item["start_time"]))
)
existing_keys = db_play_session_handler.find_existing(
user_id=user_id,
keys=[k for i, k in enumerate(candidate_keys) if i not in skipped_indices],
# Phase 2: Batch dedup check
dedup_keys = [
(resolved_device_id, rom_id, to_utc(item["start_time"]))
for _, rom_id, item in valid
]
existing_keys = db_play_session_handler.find_existing_sesssion(
user_id=user_id, keys=dedup_keys
)
seen_keys: set[tuple[str | None, int | None, datetime]] = set()
to_insert: list[tuple[int, int | None, PlaySession]] = []
for (idx, resolved_rom_id), item in zip(resolved, entries, strict=False):
if idx in skipped_indices:
continue
dedup_key = (resolved_device_id, resolved_rom_id, to_utc(item["start_time"]))
if dedup_key in seen_keys or dedup_key in existing_keys:
for (idx, resolved_rom_id, item), key in zip(valid, dedup_keys, strict=True):
if key in seen_keys or key in existing_keys:
results.append({"index": idx, "status": "duplicate"})
skipped_count += 1
continue
seen_keys.add(dedup_key)
seen_keys.add(key)
to_insert.append(
(
@@ -124,35 +126,29 @@ def ingest_play_sessions(
)
)
# Phase 3: Bulk insert
if to_insert:
db_play_session_handler.add_sessions([ps for _, _, ps in to_insert])
for idx, resolved_rom_id, ps in to_insert:
results.append({"index": idx, "status": "created", "id": ps.id})
created_count += 1
if resolved_rom_id is not None:
existing = rom_user_updates.get(resolved_rom_id)
if existing is None or ps.end_time > existing:
rom_user_updates[resolved_rom_id] = ps.end_time
for rom_id, latest_end_time in rom_user_updates.items():
rom_user = db_rom_handler.get_rom_user(rom_id=rom_id, user_id=user_id)
if not rom_user:
rom_user = db_rom_handler.add_rom_user(rom_id=rom_id, user_id=user_id)
rom_user_updates: dict[int, datetime] = {}
for idx, resolved_rom_id, ps in to_insert:
results.append({"index": idx, "status": "created", "id": ps.id})
if resolved_rom_id is not None:
prev = rom_user_updates.get(resolved_rom_id)
if prev is None or ps.end_time > prev:
rom_user_updates[resolved_rom_id] = ps.end_time
update_data: dict = {}
current_last_played = (
to_utc(rom_user.last_played) if rom_user.last_played else None
)
if current_last_played is None or latest_end_time > current_last_played:
update_data["last_played"] = latest_end_time
if update_data:
db_rom_handler.update_rom_user(rom_user.id, update_data)
# Phase 4: Side effects
_update_rom_user_last_played(rom_user_updates, user_id)
if resolved_device_id is not None:
db_device_handler.update_last_seen(
device_id=resolved_device_id, user_id=user_id
)
created_count = len(to_insert)
skipped_count = len(entries) - created_count
log.info(
f"Ingested {created_count} play sessions for user {username}"
f" ({skipped_count} skipped)"

View File

@@ -127,10 +127,12 @@ export type { SSAgeRating } from './models/SSAgeRating';
export type { StateSchema } from './models/StateSchema';
export type { StatsReturn } from './models/StatsReturn';
export type { SyncCompletePayload } from './models/SyncCompletePayload';
export type { SyncCompleteResponse } from './models/SyncCompleteResponse';
export type { SyncMode } from './models/SyncMode';
export type { SyncNegotiatePayload } from './models/SyncNegotiatePayload';
export type { SyncNegotiateResponse } from './models/SyncNegotiateResponse';
export type { SyncOperationSchema } from './models/SyncOperationSchema';
export type { SyncPlaySessionEntry } from './models/SyncPlaySessionEntry';
export type { SyncSessionSchema } from './models/SyncSessionSchema';
export type { SyncTaskMeta } from './models/SyncTaskMeta';
export type { SyncTaskStatusResponse } from './models/SyncTaskStatusResponse';

View File

@@ -7,6 +7,7 @@ export type PlaySessionSchema = {
user_id: number;
device_id: (string | null);
rom_id: (number | null);
sync_session_id: (number | null);
save_slot: (string | null);
start_time: string;
end_time: string;

View File

@@ -2,8 +2,10 @@
/* istanbul ignore file */
/* tslint:disable */
/* eslint-disable */
import type { SyncPlaySessionEntry } from './SyncPlaySessionEntry';
export type SyncCompletePayload = {
operations_completed?: number;
operations_failed?: number;
play_sessions?: (Array<SyncPlaySessionEntry> | null);
};

View File

@@ -0,0 +1,11 @@
/* generated using openapi-typescript-codegen -- do not edit */
/* istanbul ignore file */
/* tslint:disable */
/* eslint-disable */
import type { PlaySessionIngestResponse } from './PlaySessionIngestResponse';
import type { SyncSessionSchema } from './SyncSessionSchema';
export type SyncCompleteResponse = {
session: SyncSessionSchema;
play_session_ingest?: (PlaySessionIngestResponse | null);
};

View File

@@ -0,0 +1,12 @@
/* generated using openapi-typescript-codegen -- do not edit */
/* istanbul ignore file */
/* tslint:disable */
/* eslint-disable */
export type SyncPlaySessionEntry = {
rom_id?: (number | null);
save_slot?: (string | null);
start_time: string;
end_time: string;
duration_ms: number;
};