mirror of
https://github.com/rommapp/romm.git
synced 2026-06-28 06:46:00 +00:00
Batch session inserts into single transaction
Replace per-item add_session with add_sessions using add_all. No fallback on IntegrityError -- duplicate concurrent submissions are the client's responsibility.
This commit is contained in:
@@ -2,7 +2,6 @@ from datetime import datetime, timedelta, timezone
|
||||
|
||||
from fastapi import HTTPException, Request, status
|
||||
from pydantic import BaseModel, Field, model_validator
|
||||
from sqlalchemy.exc import IntegrityError
|
||||
|
||||
from decorators.auth import protected_route
|
||||
from endpoints.responses.play_session import (
|
||||
@@ -121,6 +120,7 @@ def ingest_play_sessions(
|
||||
)
|
||||
|
||||
seen_keys: set[tuple[str | None, int | None, datetime]] = set()
|
||||
to_insert: list[tuple[int, int | None, PlaySession]] = []
|
||||
|
||||
for (idx, resolved_rom_id), item in zip(resolved, payload.sessions, strict=False):
|
||||
if idx in skipped_indices:
|
||||
@@ -138,40 +138,33 @@ def ingest_play_sessions(
|
||||
continue
|
||||
seen_keys.add(dedup_key)
|
||||
|
||||
play_session = PlaySession(
|
||||
user_id=user_id,
|
||||
device_id=resolved_device_id,
|
||||
rom_id=resolved_rom_id,
|
||||
save_slot=item.save_slot,
|
||||
start_time=item.start_time,
|
||||
end_time=item.end_time,
|
||||
duration_ms=item.duration_ms,
|
||||
to_insert.append(
|
||||
(
|
||||
idx,
|
||||
resolved_rom_id,
|
||||
PlaySession(
|
||||
user_id=user_id,
|
||||
device_id=resolved_device_id,
|
||||
rom_id=resolved_rom_id,
|
||||
save_slot=item.save_slot,
|
||||
start_time=item.start_time,
|
||||
end_time=item.end_time,
|
||||
duration_ms=item.duration_ms,
|
||||
),
|
||||
)
|
||||
)
|
||||
|
||||
try:
|
||||
created = db_play_session_handler.add_session(play_session)
|
||||
if to_insert:
|
||||
db_play_session_handler.add_sessions([ps for _, _, ps in to_insert])
|
||||
for idx, resolved_rom_id, ps in to_insert:
|
||||
results.append(
|
||||
PlaySessionIngestResult(
|
||||
index=idx,
|
||||
status="created",
|
||||
id=created.id,
|
||||
)
|
||||
PlaySessionIngestResult(index=idx, status="created", id=ps.id)
|
||||
)
|
||||
created_count += 1
|
||||
|
||||
if resolved_rom_id is not None:
|
||||
existing = rom_user_updates.get(resolved_rom_id)
|
||||
if existing is None or item.end_time > existing:
|
||||
rom_user_updates[resolved_rom_id] = item.end_time
|
||||
|
||||
except IntegrityError:
|
||||
results.append(
|
||||
PlaySessionIngestResult(
|
||||
index=idx,
|
||||
status="duplicate",
|
||||
)
|
||||
)
|
||||
skipped_count += 1
|
||||
if existing is None or ps.end_time > existing:
|
||||
rom_user_updates[resolved_rom_id] = ps.end_time
|
||||
|
||||
for rom_id, latest_end_time in rom_user_updates.items():
|
||||
rom_user = db_rom_handler.get_rom_user(rom_id=rom_id, user_id=user_id)
|
||||
|
||||
@@ -12,14 +12,14 @@ from .base_handler import DBBaseHandler
|
||||
|
||||
class DBPlaySessionsHandler(DBBaseHandler):
|
||||
@begin_session
|
||||
def add_session(
|
||||
def add_sessions(
|
||||
self,
|
||||
play_session: PlaySession,
|
||||
play_sessions: list[PlaySession],
|
||||
session: Session = None, # type: ignore
|
||||
) -> PlaySession:
|
||||
session.add(play_session)
|
||||
) -> list[PlaySession]:
|
||||
session.add_all(play_sessions)
|
||||
session.flush()
|
||||
return play_session
|
||||
return play_sessions
|
||||
|
||||
@begin_session
|
||||
def find_existing(
|
||||
|
||||
Reference in New Issue
Block a user