add tests

This commit is contained in:
Georges-Antoine Assi
2026-03-14 23:35:04 -04:00
parent f13f929d7d
commit b3fbbf59fb
10 changed files with 1233 additions and 12 deletions

View File

@@ -8,6 +8,9 @@ Create Date: 2026-03-14 00:00:00.000000
import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects.postgresql import ENUM
from utils.database import is_postgresql
revision = "0073_sync_sessions"
down_revision = "0072_client_tokens"
@@ -16,6 +19,28 @@ depends_on = None
def upgrade() -> None:
connection = op.get_bind()
if is_postgresql(connection):
rom_user_status_enum = ENUM(
"PENDING",
"IN_PROGRESS",
"COMPLETED",
"FAILED",
"CANCELLED",
name="syncsessionstatus",
create_type=False,
)
rom_user_status_enum.create(connection, checkfirst=False)
else:
rom_user_status_enum = sa.Enum(
"PENDING",
"IN_PROGRESS",
"COMPLETED",
"FAILED",
"CANCELLED",
name="syncsessionstatus",
)
op.create_table(
"sync_sessions",
sa.Column("id", sa.Integer(), autoincrement=True, nullable=False),
@@ -24,15 +49,15 @@ def upgrade() -> None:
sa.Column(
"status",
sa.Enum(
"pending",
"in_progress",
"completed",
"failed",
"cancelled",
"PENDING",
"IN_PROGRESS",
"COMPLETED",
"FAILED",
"CANCELLED",
name="syncsessionstatus",
),
nullable=False,
server_default="pending",
server_default="PENDING",
),
sa.Column(
"initiated_at",
@@ -80,4 +105,5 @@ def downgrade() -> None:
op.drop_index("ix_sync_sessions_status", table_name="sync_sessions")
op.drop_index("ix_sync_sessions_user_id", table_name="sync_sessions")
op.drop_index("ix_sync_sessions_device_id", table_name="sync_sessions")
op.drop_table("sync_sessions")

View File

@@ -15,11 +15,11 @@ if TYPE_CHECKING:
class SyncSessionStatus(enum.StrEnum):
PENDING = "pending"
IN_PROGRESS = "in_progress"
COMPLETED = "completed"
FAILED = "failed"
CANCELLED = "cancelled"
PENDING = "PENDING"
IN_PROGRESS = "IN_PROGRESS"
COMPLETED = "COMPLETED"
FAILED = "FAILED"
CANCELLED = "CANCELLED"
class SyncSession(BaseModel):
@@ -35,7 +35,9 @@ class SyncSession(BaseModel):
)
status: Mapped[SyncSessionStatus] = mapped_column(
Enum(SyncSessionStatus), default=SyncSessionStatus.PENDING, index=True
Enum(SyncSessionStatus),
default=SyncSessionStatus.PENDING,
index=True,
)
initiated_at: Mapped[datetime] = mapped_column(TIMESTAMP(timezone=True))
completed_at: Mapped[datetime | None] = mapped_column(

View File

@@ -23,6 +23,7 @@ from models.device import Device
from models.device_save_sync import DeviceSaveSync
from models.platform import Platform
from models.rom import Rom
from models.sync_session import SyncSession
from models.user import Role, User
engine = create_engine(ConfigManager.get_db_engine(), pool_pre_ping=True)
@@ -38,6 +39,7 @@ def setup_database():
def clear_database():
with session.begin() as s:
s.query(ClientToken).delete(synchronize_session="evaluate")
s.query(SyncSession).delete(synchronize_session="evaluate")
s.query(DeviceSaveSync).delete(synchronize_session="evaluate")
s.query(Device).delete(synchronize_session="evaluate")
s.query(Save).delete(synchronize_session="evaluate")

View File

@@ -0,0 +1,423 @@
"""Tests for sync endpoints."""
from datetime import datetime, timedelta, timezone
from unittest import mock
import pytest
from fastapi import status
from fastapi.testclient import TestClient
from main import app
from config import OAUTH_ACCESS_TOKEN_EXPIRE_SECONDS
from handler.auth import oauth_handler
from handler.database import (
db_device_handler,
db_device_save_sync_handler,
db_save_handler,
db_sync_session_handler,
)
from handler.redis_handler import sync_cache
from models.assets import Save
from models.device import Device, SyncMode
from models.platform import Platform
from models.rom import Rom
from models.sync_session import SyncSession, SyncSessionStatus
from models.user import User
@pytest.fixture
def client():
with TestClient(app) as client:
yield client
@pytest.fixture(autouse=True)
def clear_cache():
yield
sync_cache.flushall()
@pytest.fixture
def editor_access_token(editor_user: User):
return oauth_handler.create_access_token(
data={
"sub": editor_user.username,
"iss": "romm:oauth",
"scopes": " ".join(editor_user.oauth_scopes),
},
expires_delta=timedelta(seconds=OAUTH_ACCESS_TOKEN_EXPIRE_SECONDS),
)
class TestSyncNegotiate:
def test_negotiate_new_client_save(
self, client, access_token: str, admin_user: User, rom: Rom
):
"""Client has a save the server doesn't -> upload."""
device = db_device_handler.add_device(
Device(id="neg-dev-1", user_id=admin_user.id, sync_enabled=True)
)
response = client.post(
"/api/sync/negotiate",
json={
"device_id": device.id,
"saves": [
{
"rom_id": rom.id,
"file_name": "new_save.sav",
"updated_at": "2026-01-10T00:00:00Z",
"file_size_bytes": 1024,
}
],
},
headers={"Authorization": f"Bearer {access_token}"},
)
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert data["total_upload"] == 1
assert data["operations"][0]["action"] == "upload"
def test_negotiate_server_has_save_client_doesnt(
self, client, access_token: str, admin_user: User, save: Save
):
"""Server has a save the client doesn't mention -> download."""
device = db_device_handler.add_device(
Device(id="neg-dev-2", user_id=admin_user.id, sync_enabled=True)
)
response = client.post(
"/api/sync/negotiate",
json={
"device_id": device.id,
"saves": [],
},
headers={"Authorization": f"Bearer {access_token}"},
)
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert data["total_download"] >= 1
def test_negotiate_identical_hashes(
self, client, access_token: str, admin_user: User, rom: Rom, save: Save
):
"""Matching hash -> no_op."""
device = db_device_handler.add_device(
Device(id="neg-dev-3", user_id=admin_user.id, sync_enabled=True)
)
response = client.post(
"/api/sync/negotiate",
json={
"device_id": device.id,
"saves": [
{
"rom_id": save.rom_id,
"file_name": save.file_name,
"content_hash": save.content_hash,
"updated_at": save.updated_at.isoformat(),
"file_size_bytes": 100,
}
],
},
headers={"Authorization": f"Bearer {access_token}"},
)
assert response.status_code == status.HTTP_200_OK
data = response.json()
# If save has a hash, should be no_op; otherwise download/upload by timestamp
assert "session_id" in data
def test_negotiate_device_not_found(self, client, access_token: str):
response = client.post(
"/api/sync/negotiate",
json={"device_id": "nonexistent", "saves": []},
headers={"Authorization": f"Bearer {access_token}"},
)
assert response.status_code == status.HTTP_404_NOT_FOUND
def test_negotiate_sync_disabled(self, client, access_token: str, admin_user: User):
device = db_device_handler.add_device(
Device(id="neg-dev-disabled", user_id=admin_user.id, sync_enabled=False)
)
response = client.post(
"/api/sync/negotiate",
json={"device_id": device.id, "saves": []},
headers={"Authorization": f"Bearer {access_token}"},
)
assert response.status_code == status.HTTP_400_BAD_REQUEST
def test_negotiate_creates_session(
self, client, access_token: str, admin_user: User
):
device = db_device_handler.add_device(
Device(id="neg-dev-session", user_id=admin_user.id, sync_enabled=True)
)
response = client.post(
"/api/sync/negotiate",
json={"device_id": device.id, "saves": []},
headers={"Authorization": f"Bearer {access_token}"},
)
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert "session_id" in data
assert data["session_id"] > 0
class TestSyncSessions:
def test_complete_session(self, client, access_token: str, admin_user: User):
device = db_device_handler.add_device(
Device(id="session-dev-1", user_id=admin_user.id)
)
sync_session = db_sync_session_handler.create_session(
device_id=device.id, user_id=admin_user.id
)
response = client.post(
f"/api/sync/sessions/{sync_session.id}/complete",
json={"operations_completed": 5, "operations_failed": 1},
headers={"Authorization": f"Bearer {access_token}"},
)
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert data["status"] == "COMPLETED"
assert data["operations_completed"] == 5
assert data["operations_failed"] == 1
def test_complete_session_not_found(self, client, access_token: str):
response = client.post(
"/api/sync/sessions/99999/complete",
json={"operations_completed": 0, "operations_failed": 0},
headers={"Authorization": f"Bearer {access_token}"},
)
assert response.status_code == status.HTTP_404_NOT_FOUND
def test_complete_already_completed_session(
self, client, access_token: str, admin_user: User
):
device = db_device_handler.add_device(
Device(id="session-dev-completed", user_id=admin_user.id)
)
sync_session = db_sync_session_handler.create_session(
device_id=device.id, user_id=admin_user.id
)
db_sync_session_handler.complete_session(session_id=sync_session.id)
response = client.post(
f"/api/sync/sessions/{sync_session.id}/complete",
json={"operations_completed": 0, "operations_failed": 0},
headers={"Authorization": f"Bearer {access_token}"},
)
assert response.status_code == status.HTTP_400_BAD_REQUEST
def test_list_sessions(self, client, access_token: str, admin_user: User):
device = db_device_handler.add_device(
Device(id="session-dev-list", user_id=admin_user.id)
)
db_sync_session_handler.create_session(
device_id=device.id, user_id=admin_user.id
)
db_sync_session_handler.create_session(
device_id=device.id, user_id=admin_user.id
)
response = client.get(
"/api/sync/sessions",
headers={"Authorization": f"Bearer {access_token}"},
)
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert len(data) == 2
def test_list_sessions_filter_by_device(
self, client, access_token: str, admin_user: User
):
dev_a = db_device_handler.add_device(
Device(id="session-dev-a", user_id=admin_user.id)
)
dev_b = db_device_handler.add_device(
Device(id="session-dev-b", user_id=admin_user.id)
)
db_sync_session_handler.create_session(
device_id=dev_a.id, user_id=admin_user.id
)
db_sync_session_handler.create_session(
device_id=dev_b.id, user_id=admin_user.id
)
response = client.get(
f"/api/sync/sessions?device_id={dev_a.id}",
headers={"Authorization": f"Bearer {access_token}"},
)
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert len(data) == 1
assert data[0]["device_id"] == dev_a.id
def test_get_session(self, client, access_token: str, admin_user: User):
device = db_device_handler.add_device(
Device(id="session-dev-get", user_id=admin_user.id)
)
sync_session = db_sync_session_handler.create_session(
device_id=device.id, user_id=admin_user.id
)
response = client.get(
f"/api/sync/sessions/{sync_session.id}",
headers={"Authorization": f"Bearer {access_token}"},
)
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert data["id"] == sync_session.id
assert data["device_id"] == device.id
def test_get_session_not_found(self, client, access_token: str):
response = client.get(
"/api/sync/sessions/99999",
headers={"Authorization": f"Bearer {access_token}"},
)
assert response.status_code == status.HTTP_404_NOT_FOUND
class TestSyncUserIsolation:
def test_cannot_negotiate_with_other_users_device(
self, client, editor_access_token: str, admin_user: User
):
device = db_device_handler.add_device(
Device(id="admin-sync-dev", user_id=admin_user.id, sync_enabled=True)
)
response = client.post(
"/api/sync/negotiate",
json={"device_id": device.id, "saves": []},
headers={"Authorization": f"Bearer {editor_access_token}"},
)
assert response.status_code == status.HTTP_404_NOT_FOUND
def test_cannot_complete_other_users_session(
self, client, editor_access_token: str, admin_user: User
):
device = db_device_handler.add_device(
Device(id="admin-session-dev", user_id=admin_user.id)
)
sync_session = db_sync_session_handler.create_session(
device_id=device.id, user_id=admin_user.id
)
response = client.post(
f"/api/sync/sessions/{sync_session.id}/complete",
json={"operations_completed": 0, "operations_failed": 0},
headers={"Authorization": f"Bearer {editor_access_token}"},
)
assert response.status_code == status.HTTP_404_NOT_FOUND
def test_sessions_only_return_own(
self,
client,
access_token: str,
editor_access_token: str,
admin_user: User,
editor_user: User,
):
admin_dev = db_device_handler.add_device(
Device(id="admin-iso-dev", user_id=admin_user.id)
)
editor_dev = db_device_handler.add_device(
Device(id="editor-iso-dev", user_id=editor_user.id)
)
db_sync_session_handler.create_session(
device_id=admin_dev.id, user_id=admin_user.id
)
db_sync_session_handler.create_session(
device_id=editor_dev.id, user_id=editor_user.id
)
admin_resp = client.get(
"/api/sync/sessions",
headers={"Authorization": f"Bearer {access_token}"},
)
editor_resp = client.get(
"/api/sync/sessions",
headers={"Authorization": f"Bearer {editor_access_token}"},
)
assert len(admin_resp.json()) == 1
assert admin_resp.json()[0]["device_id"] == admin_dev.id
assert len(editor_resp.json()) == 1
assert editor_resp.json()[0]["device_id"] == editor_dev.id
class TestPushPullTrigger:
def test_trigger_push_pull(self, client, access_token: str, admin_user: User):
device = db_device_handler.add_device(
Device(
id="pp-dev-1",
user_id=admin_user.id,
sync_mode=SyncMode.PUSH_PULL,
sync_enabled=True,
)
)
with mock.patch("endpoints.sync.high_prio_queue") as mock_queue:
response = client.post(
f"/api/sync/devices/{device.id}/push-pull",
headers={"Authorization": f"Bearer {access_token}"},
)
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert data["device_id"] == device.id
assert data["status"] == "PENDING"
mock_queue.enqueue.assert_called_once()
def test_trigger_push_pull_wrong_mode(
self, client, access_token: str, admin_user: User
):
device = db_device_handler.add_device(
Device(
id="pp-dev-wrong-mode",
user_id=admin_user.id,
sync_mode=SyncMode.API,
sync_enabled=True,
)
)
response = client.post(
f"/api/sync/devices/{device.id}/push-pull",
headers={"Authorization": f"Bearer {access_token}"},
)
assert response.status_code == status.HTTP_400_BAD_REQUEST
def test_trigger_push_pull_sync_disabled(
self, client, access_token: str, admin_user: User
):
device = db_device_handler.add_device(
Device(
id="pp-dev-disabled",
user_id=admin_user.id,
sync_mode=SyncMode.PUSH_PULL,
sync_enabled=False,
)
)
response = client.post(
f"/api/sync/devices/{device.id}/push-pull",
headers={"Authorization": f"Bearer {access_token}"},
)
assert response.status_code == status.HTTP_400_BAD_REQUEST
def test_trigger_push_pull_device_not_found(self, client, access_token: str):
response = client.post(
"/api/sync/devices/nonexistent/push-pull",
headers={"Authorization": f"Bearer {access_token}"},
)
assert response.status_code == status.HTTP_404_NOT_FOUND

View File

@@ -0,0 +1,191 @@
from datetime import datetime, timedelta, timezone
from handler.database import (
db_device_handler,
db_device_save_sync_handler,
db_save_handler,
)
from models.assets import Save
from models.device import Device
from models.rom import Rom
from models.user import User
class TestGetSync:
def test_get_existing_sync(self, admin_user: User, rom: Rom, save: Save):
device = db_device_handler.add_device(
Device(id="sync-dev-1", user_id=admin_user.id)
)
db_device_save_sync_handler.upsert_sync(device.id, save.id)
result = db_device_save_sync_handler.get_sync(device.id, save.id)
assert result is not None
assert result.device_id == device.id
assert result.save_id == save.id
def test_get_nonexistent_sync(self, admin_user: User):
result = db_device_save_sync_handler.get_sync("no-device", 999)
assert result is None
class TestGetSyncsForDeviceAndSaves:
def test_returns_matching_syncs(self, admin_user: User, rom: Rom):
device = db_device_handler.add_device(
Device(id="bulk-dev-1", user_id=admin_user.id)
)
saves = []
for i in range(3):
s = db_save_handler.add_save(
Save(
rom_id=rom.id,
user_id=admin_user.id,
file_name=f"bulk_{i}.sav",
file_name_no_tags=f"bulk_{i}",
file_name_no_ext=f"bulk_{i}",
file_extension="sav",
emulator="emu",
file_path=f"{rom.platform_slug}/saves",
file_size_bytes=100,
)
)
saves.append(s)
db_device_save_sync_handler.upsert_sync(device.id, s.id)
result = db_device_save_sync_handler.get_syncs_for_device_and_saves(
device.id, [saves[0].id, saves[2].id]
)
assert len(result) == 2
ids = {r.save_id for r in result}
assert saves[0].id in ids
assert saves[2].id in ids
def test_empty_save_ids_returns_empty(self, admin_user: User):
result = db_device_save_sync_handler.get_syncs_for_device_and_saves(
"any-dev", []
)
assert result == []
class TestUpsertSync:
def test_creates_new_sync(self, admin_user: User, rom: Rom, save: Save):
device = db_device_handler.add_device(
Device(id="upsert-dev-1", user_id=admin_user.id)
)
result = db_device_save_sync_handler.upsert_sync(device.id, save.id)
assert result.device_id == device.id
assert result.save_id == save.id
assert result.is_untracked is False
assert result.last_synced_at is not None
def test_updates_existing_sync(self, admin_user: User, rom: Rom, save: Save):
device = db_device_handler.add_device(
Device(id="upsert-dev-2", user_id=admin_user.id)
)
earlier = datetime(2020, 1, 1, tzinfo=timezone.utc)
db_device_save_sync_handler.upsert_sync(device.id, save.id, synced_at=earlier)
later = datetime(2025, 6, 1, tzinfo=timezone.utc)
result = db_device_save_sync_handler.upsert_sync(
device.id, save.id, synced_at=later
)
assert result.last_synced_at == later
assert result.is_untracked is False
def test_upsert_clears_untracked(self, admin_user: User, rom: Rom, save: Save):
device = db_device_handler.add_device(
Device(id="upsert-dev-3", user_id=admin_user.id)
)
db_device_save_sync_handler.set_untracked(device.id, save.id, True)
sync = db_device_save_sync_handler.get_sync(device.id, save.id)
assert sync is not None
assert sync.is_untracked is True
db_device_save_sync_handler.upsert_sync(device.id, save.id)
sync = db_device_save_sync_handler.get_sync(device.id, save.id)
assert sync is not None
assert sync.is_untracked is False
def test_upsert_with_custom_timestamp(self, admin_user: User, rom: Rom, save: Save):
device = db_device_handler.add_device(
Device(id="upsert-dev-4", user_id=admin_user.id)
)
ts = datetime(2024, 3, 15, 12, 0, 0, tzinfo=timezone.utc)
result = db_device_save_sync_handler.upsert_sync(
device.id, save.id, synced_at=ts
)
assert result.last_synced_at == ts
class TestSetUntracked:
def test_set_untracked_on_existing(self, admin_user: User, rom: Rom, save: Save):
device = db_device_handler.add_device(
Device(id="untrack-dev-1", user_id=admin_user.id)
)
db_device_save_sync_handler.upsert_sync(device.id, save.id)
result = db_device_save_sync_handler.set_untracked(device.id, save.id, True)
assert result is not None
assert result.is_untracked is True
def test_set_tracked_on_existing(self, admin_user: User, rom: Rom, save: Save):
device = db_device_handler.add_device(
Device(id="untrack-dev-2", user_id=admin_user.id)
)
db_device_save_sync_handler.upsert_sync(device.id, save.id)
db_device_save_sync_handler.set_untracked(device.id, save.id, True)
result = db_device_save_sync_handler.set_untracked(device.id, save.id, False)
assert result is not None
assert result.is_untracked is False
def test_set_untracked_creates_new_record(
self, admin_user: User, rom: Rom, save: Save
):
device = db_device_handler.add_device(
Device(id="untrack-dev-3", user_id=admin_user.id)
)
result = db_device_save_sync_handler.set_untracked(device.id, save.id, True)
assert result is not None
assert result.is_untracked is True
assert result.device_id == device.id
def test_set_tracked_nonexistent_returns_none(self, admin_user: User):
device = db_device_handler.add_device(
Device(id="untrack-dev-4", user_id=admin_user.id)
)
result = db_device_save_sync_handler.set_untracked(device.id, 999999, False)
assert result is None
class TestDeleteSyncsForDevice:
def test_deletes_all_syncs(self, admin_user: User, rom: Rom):
device = db_device_handler.add_device(
Device(id="del-dev-1", user_id=admin_user.id)
)
for i in range(3):
s = db_save_handler.add_save(
Save(
rom_id=rom.id,
user_id=admin_user.id,
file_name=f"del_{i}.sav",
file_name_no_tags=f"del_{i}",
file_name_no_ext=f"del_{i}",
file_extension="sav",
emulator="emu",
file_path=f"{rom.platform_slug}/saves",
file_size_bytes=100,
)
)
db_device_save_sync_handler.upsert_sync(device.id, s.id)
db_device_save_sync_handler.delete_syncs_for_device(device.id)
result = db_device_save_sync_handler.get_syncs_for_device_and_saves(
device.id, [1, 2, 3]
)
assert len(result) == 0
def test_delete_nonexistent_device_no_error(self):
db_device_save_sync_handler.delete_syncs_for_device("nonexistent-device")

View File

@@ -0,0 +1,221 @@
from handler.database import db_device_handler, db_sync_session_handler
from models.device import Device
from models.sync_session import SyncSessionStatus
from models.user import User
class TestCreateSession:
def test_creates_pending_session(self, admin_user: User):
device = db_device_handler.add_device(
Device(id="sess-dev-1", user_id=admin_user.id)
)
session = db_sync_session_handler.create_session(device.id, admin_user.id)
assert session.id is not None
assert session.device_id == device.id
assert session.user_id == admin_user.id
assert session.status == SyncSessionStatus.PENDING
assert session.initiated_at is not None
assert session.completed_at is None
class TestGetSession:
def test_get_existing_session(self, admin_user: User):
device = db_device_handler.add_device(
Device(id="get-dev-1", user_id=admin_user.id)
)
created = db_sync_session_handler.create_session(device.id, admin_user.id)
result = db_sync_session_handler.get_session(created.id, admin_user.id)
assert result is not None
assert result.id == created.id
def test_get_session_wrong_user(self, admin_user: User, editor_user: User):
device = db_device_handler.add_device(
Device(id="get-dev-2", user_id=admin_user.id)
)
created = db_sync_session_handler.create_session(device.id, admin_user.id)
result = db_sync_session_handler.get_session(created.id, editor_user.id)
assert result is None
def test_get_nonexistent_session(self, admin_user: User):
result = db_sync_session_handler.get_session(999999, admin_user.id)
assert result is None
class TestGetActiveSession:
def test_returns_pending_session(self, admin_user: User):
device = db_device_handler.add_device(
Device(id="active-dev-1", user_id=admin_user.id)
)
created = db_sync_session_handler.create_session(device.id, admin_user.id)
result = db_sync_session_handler.get_active_session(device.id, admin_user.id)
assert result is not None
assert result.id == created.id
def test_returns_in_progress_session(self, admin_user: User):
device = db_device_handler.add_device(
Device(id="active-dev-2", user_id=admin_user.id)
)
created = db_sync_session_handler.create_session(device.id, admin_user.id)
db_sync_session_handler.update_session(
created.id, {"status": SyncSessionStatus.IN_PROGRESS}
)
result = db_sync_session_handler.get_active_session(device.id, admin_user.id)
assert result is not None
assert result.status == SyncSessionStatus.IN_PROGRESS
def test_does_not_return_completed_session(self, admin_user: User):
device = db_device_handler.add_device(
Device(id="active-dev-3", user_id=admin_user.id)
)
created = db_sync_session_handler.create_session(device.id, admin_user.id)
db_sync_session_handler.complete_session(created.id)
result = db_sync_session_handler.get_active_session(device.id, admin_user.id)
assert result is None
def test_no_active_session(self, admin_user: User):
result = db_sync_session_handler.get_active_session(
"nonexistent", admin_user.id
)
assert result is None
class TestCompleteSession:
def test_marks_completed_with_counts(self, admin_user: User):
device = db_device_handler.add_device(
Device(id="comp-dev-1", user_id=admin_user.id)
)
created = db_sync_session_handler.create_session(device.id, admin_user.id)
result = db_sync_session_handler.complete_session(
created.id, operations_completed=10, operations_failed=2
)
assert result.status == SyncSessionStatus.COMPLETED
assert result.operations_completed == 10
assert result.operations_failed == 2
assert result.completed_at is not None
class TestFailSession:
def test_marks_failed_with_error(self, admin_user: User):
device = db_device_handler.add_device(
Device(id="fail-dev-1", user_id=admin_user.id)
)
created = db_sync_session_handler.create_session(device.id, admin_user.id)
result = db_sync_session_handler.fail_session(
created.id, error_message="Connection lost"
)
assert result.status == SyncSessionStatus.FAILED
assert result.error_message == "Connection lost"
assert result.completed_at is not None
def test_marks_failed_without_error(self, admin_user: User):
device = db_device_handler.add_device(
Device(id="fail-dev-2", user_id=admin_user.id)
)
created = db_sync_session_handler.create_session(device.id, admin_user.id)
result = db_sync_session_handler.fail_session(created.id)
assert result.status == SyncSessionStatus.FAILED
assert result.error_message is None
class TestCancelActiveSessions:
def test_cancels_active_sessions(self, admin_user: User):
device = db_device_handler.add_device(
Device(id="cancel-dev-1", user_id=admin_user.id)
)
db_sync_session_handler.create_session(device.id, admin_user.id)
s2 = db_sync_session_handler.create_session(device.id, admin_user.id)
db_sync_session_handler.update_session(
s2.id, {"status": SyncSessionStatus.IN_PROGRESS}
)
count = db_sync_session_handler.cancel_active_sessions(device.id, admin_user.id)
assert count == 2
active = db_sync_session_handler.get_active_session(device.id, admin_user.id)
assert active is None
def test_does_not_cancel_completed(self, admin_user: User):
device = db_device_handler.add_device(
Device(id="cancel-dev-2", user_id=admin_user.id)
)
created = db_sync_session_handler.create_session(device.id, admin_user.id)
db_sync_session_handler.complete_session(created.id)
count = db_sync_session_handler.cancel_active_sessions(device.id, admin_user.id)
assert count == 0
class TestGetSessions:
def test_list_sessions_for_user(self, admin_user: User):
device = db_device_handler.add_device(
Device(id="list-dev-1", user_id=admin_user.id)
)
for _ in range(3):
db_sync_session_handler.create_session(device.id, admin_user.id)
sessions = db_sync_session_handler.get_sessions(admin_user.id)
assert len(sessions) == 3
def test_filter_by_device(self, admin_user: User):
d1 = db_device_handler.add_device(
Device(id="list-dev-2a", user_id=admin_user.id)
)
d2 = db_device_handler.add_device(
Device(id="list-dev-2b", user_id=admin_user.id)
)
db_sync_session_handler.create_session(d1.id, admin_user.id)
db_sync_session_handler.create_session(d2.id, admin_user.id)
db_sync_session_handler.create_session(d2.id, admin_user.id)
sessions = db_sync_session_handler.get_sessions(admin_user.id, device_id=d2.id)
assert len(sessions) == 2
assert all(s.device_id == d2.id for s in sessions)
def test_filter_by_status(self, admin_user: User):
device = db_device_handler.add_device(
Device(id="list-dev-3", user_id=admin_user.id)
)
s1 = db_sync_session_handler.create_session(device.id, admin_user.id)
db_sync_session_handler.create_session(device.id, admin_user.id)
db_sync_session_handler.complete_session(s1.id)
sessions = db_sync_session_handler.get_sessions(
admin_user.id, status=SyncSessionStatus.COMPLETED
)
assert len(sessions) == 1
assert sessions[0].status == SyncSessionStatus.COMPLETED
def test_respects_limit(self, admin_user: User):
device = db_device_handler.add_device(
Device(id="list-dev-4", user_id=admin_user.id)
)
for _ in range(5):
db_sync_session_handler.create_session(device.id, admin_user.id)
sessions = db_sync_session_handler.get_sessions(admin_user.id, limit=2)
assert len(sessions) == 2
def test_user_isolation(self, admin_user: User, editor_user: User):
d1 = db_device_handler.add_device(Device(id="iso-dev-1", user_id=admin_user.id))
d2 = db_device_handler.add_device(
Device(id="iso-dev-2", user_id=editor_user.id)
)
db_sync_session_handler.create_session(d1.id, admin_user.id)
db_sync_session_handler.create_session(d2.id, editor_user.id)
admin_sessions = db_sync_session_handler.get_sessions(admin_user.id)
assert len(admin_sessions) == 1
assert admin_sessions[0].user_id == admin_user.id
editor_sessions = db_sync_session_handler.get_sessions(editor_user.id)
assert len(editor_sessions) == 1
assert editor_sessions[0].user_id == editor_user.id

View File

@@ -0,0 +1,142 @@
"""Tests for filesystem sync handler."""
import os
import shutil
import tempfile
from pathlib import Path
import pytest
from handler.filesystem.sync_handler import FSSyncHandler
class TestFSSyncHandler:
@pytest.fixture
def temp_dir(self):
temp_dir = tempfile.mkdtemp()
yield temp_dir
shutil.rmtree(temp_dir, ignore_errors=True)
@pytest.fixture
def handler(self):
return FSSyncHandler.__new__(FSSyncHandler)
@pytest.fixture(autouse=True)
def patch_base_path(self, handler: FSSyncHandler, temp_dir):
handler.base_path = Path(temp_dir)
def test_build_incoming_path(self, handler: FSSyncHandler):
path = handler.build_incoming_path("device-1")
assert "device-1" in path
assert "incoming" in path
def test_build_incoming_path_with_platform(self, handler):
path = handler.build_incoming_path("device-1", "gba")
assert "device-1" in path
assert "incoming" in path
assert "gba" in path
def test_build_outgoing_path(self, handler: FSSyncHandler):
path = handler.build_outgoing_path("device-1")
assert "device-1" in path
assert "outgoing" in path
def test_build_outgoing_path_with_platform(self, handler: FSSyncHandler):
path = handler.build_outgoing_path("device-1", "snes")
assert "device-1" in path
assert "outgoing" in path
assert "snes" in path
def test_build_conflicts_path(self, handler: FSSyncHandler):
path = handler.build_conflicts_path("device-1", "gba")
assert "device-1" in path
assert "conflicts" in path
assert "gba" in path
def test_ensure_device_directories(self, handler: FSSyncHandler, temp_dir):
handler.ensure_device_directories("test-device")
incoming = handler.build_incoming_path("test-device")
outgoing = handler.build_outgoing_path("test-device")
assert os.path.isdir(incoming)
assert os.path.isdir(outgoing)
def test_list_incoming_files_empty(self, handler: FSSyncHandler):
result = handler.list_incoming_files("nonexistent-device")
assert result == []
def test_list_incoming_files(self, handler: FSSyncHandler, temp_dir):
# Set up: create incoming/platform/file structure
handler.ensure_device_directories("dev-1")
incoming_path = handler.build_incoming_path("dev-1", "gba")
os.makedirs(incoming_path, exist_ok=True)
test_file = os.path.join(incoming_path, "save.sav")
with open(test_file, "wb") as f:
f.write(b"test save content")
result = handler.list_incoming_files("dev-1")
assert len(result) == 1
assert result[0]["platform_slug"] == "gba"
assert result[0]["file_name"] == "save.sav"
assert result[0]["file_size"] == 17
def test_compute_file_hash(self, handler: FSSyncHandler, temp_dir):
test_file = os.path.join(temp_dir, "test.bin")
with open(test_file, "wb") as f:
f.write(b"hello world")
hash1 = handler.compute_file_hash(test_file)
hash2 = handler.compute_file_hash(test_file)
assert hash1 == hash2
assert len(hash1) == 32 # MD5 hex length
def test_compute_file_hash_different_content(
self, handler: FSSyncHandler, temp_dir
):
file_a = os.path.join(temp_dir, "a.bin")
file_b = os.path.join(temp_dir, "b.bin")
with open(file_a, "wb") as f:
f.write(b"content a")
with open(file_b, "wb") as f:
f.write(b"content b")
assert handler.compute_file_hash(file_a) != handler.compute_file_hash(file_b)
def test_write_outgoing_file(self, handler: FSSyncHandler, temp_dir):
path = handler.write_outgoing_file(
device_id="dev-1",
platform_slug="gba",
file_name="save.sav",
data=b"outgoing save data",
)
assert os.path.isfile(path)
with open(path, "rb") as f:
assert f.read() == b"outgoing save data"
def test_remove_incoming_file(self, handler: FSSyncHandler, temp_dir):
handler.ensure_device_directories("dev-1")
incoming = handler.build_incoming_path("dev-1", "gba")
os.makedirs(incoming, exist_ok=True)
test_file = os.path.join(incoming, "to_remove.sav")
with open(test_file, "wb") as f:
f.write(b"data")
assert os.path.exists(test_file)
handler.remove_incoming_file(test_file)
assert not os.path.exists(test_file)
def test_remove_incoming_file_outside_base_raises(
self, handler: FSSyncHandler, temp_dir
):
outside_file = os.path.join(tempfile.gettempdir(), "outside.txt")
with open(outside_file, "w") as f:
f.write("should not be deleted")
with pytest.raises(ValueError, match="outside the sync base directory"):
handler.remove_incoming_file(outside_file)
# Cleanup
os.unlink(outside_file)
def test_remove_incoming_file_nonexistent(self, handler: FSSyncHandler):
# Should not raise for nonexistent files
handler.remove_incoming_file("/nonexistent/path/file.sav")

View File

View File

@@ -0,0 +1,145 @@
"""Tests for sync comparison algorithm."""
from datetime import datetime, timezone
import pytest
from handler.sync.comparison import SyncComparisonResult, compare_save_state
class TestCompareIdenticalHashes:
def test_identical_hashes_returns_no_op(self):
result = compare_save_state(
client_hash="abc123",
client_updated_at=datetime(2026, 1, 1, tzinfo=timezone.utc),
server_hash="abc123",
server_updated_at=datetime(2026, 1, 2, tzinfo=timezone.utc),
device_last_synced_at=None,
)
assert result.action == "no_op"
assert "identical" in result.reason.lower()
def test_identical_hashes_with_sync_history(self):
result = compare_save_state(
client_hash="abc123",
client_updated_at=datetime(2026, 1, 5, tzinfo=timezone.utc),
server_hash="abc123",
server_updated_at=datetime(2026, 1, 1, tzinfo=timezone.utc),
device_last_synced_at=datetime(2026, 1, 1, tzinfo=timezone.utc),
)
assert result.action == "no_op"
class TestCompareWithSyncHistory:
"""Tests where device has synced before (device_last_synced_at is set)."""
def test_client_changed_returns_upload(self):
result = compare_save_state(
client_hash="new_hash",
client_updated_at=datetime(2026, 1, 10, tzinfo=timezone.utc),
server_hash="old_hash",
server_updated_at=datetime(2026, 1, 1, tzinfo=timezone.utc),
device_last_synced_at=datetime(2026, 1, 5, tzinfo=timezone.utc),
)
assert result.action == "upload"
def test_server_changed_returns_download(self):
result = compare_save_state(
client_hash="old_hash",
client_updated_at=datetime(2026, 1, 1, tzinfo=timezone.utc),
server_hash="new_hash",
server_updated_at=datetime(2026, 1, 10, tzinfo=timezone.utc),
device_last_synced_at=datetime(2026, 1, 5, tzinfo=timezone.utc),
)
assert result.action == "download"
def test_both_changed_returns_conflict(self):
result = compare_save_state(
client_hash="client_new",
client_updated_at=datetime(2026, 1, 10, tzinfo=timezone.utc),
server_hash="server_new",
server_updated_at=datetime(2026, 1, 10, tzinfo=timezone.utc),
device_last_synced_at=datetime(2026, 1, 5, tzinfo=timezone.utc),
)
assert result.action == "conflict"
def test_neither_changed_returns_no_op(self):
result = compare_save_state(
client_hash="different_hash",
client_updated_at=datetime(2026, 1, 1, tzinfo=timezone.utc),
server_hash="other_hash",
server_updated_at=datetime(2026, 1, 1, tzinfo=timezone.utc),
device_last_synced_at=datetime(2026, 1, 5, tzinfo=timezone.utc),
)
assert result.action == "no_op"
class TestCompareWithoutSyncHistory:
"""Tests where device has never synced (device_last_synced_at is None)."""
def test_client_newer_returns_upload(self):
result = compare_save_state(
client_hash="client_hash",
client_updated_at=datetime(2026, 1, 10, tzinfo=timezone.utc),
server_hash="server_hash",
server_updated_at=datetime(2026, 1, 1, tzinfo=timezone.utc),
device_last_synced_at=None,
)
assert result.action == "upload"
def test_server_newer_returns_download(self):
result = compare_save_state(
client_hash="client_hash",
client_updated_at=datetime(2026, 1, 1, tzinfo=timezone.utc),
server_hash="server_hash",
server_updated_at=datetime(2026, 1, 10, tzinfo=timezone.utc),
device_last_synced_at=None,
)
assert result.action == "download"
def test_same_timestamp_different_hashes_returns_conflict(self):
ts = datetime(2026, 1, 5, tzinfo=timezone.utc)
result = compare_save_state(
client_hash="hash_a",
client_updated_at=ts,
server_hash="hash_b",
server_updated_at=ts,
device_last_synced_at=None,
)
assert result.action == "conflict"
def test_same_timestamp_same_hashes_returns_no_op(self):
ts = datetime(2026, 1, 5, tzinfo=timezone.utc)
result = compare_save_state(
client_hash="same",
client_updated_at=ts,
server_hash="same",
server_updated_at=ts,
device_last_synced_at=None,
)
assert result.action == "no_op"
def test_same_timestamp_none_hashes_returns_no_op(self):
ts = datetime(2026, 1, 5, tzinfo=timezone.utc)
result = compare_save_state(
client_hash=None,
client_updated_at=ts,
server_hash=None,
server_updated_at=ts,
device_last_synced_at=None,
)
assert result.action == "no_op"
class TestCompareReturnType:
def test_returns_named_tuple(self):
result = compare_save_state(
client_hash="a",
client_updated_at=datetime(2026, 1, 1, tzinfo=timezone.utc),
server_hash="a",
server_updated_at=datetime(2026, 1, 1, tzinfo=timezone.utc),
device_last_synced_at=None,
)
assert isinstance(result, SyncComparisonResult)
assert isinstance(result.action, str)
assert isinstance(result.reason, str)

View File

@@ -0,0 +1,69 @@
"""Tests for SyncPushPullTask initialization and configuration."""
from unittest.mock import MagicMock, patch
import pytest
from tasks.sync_push_pull_task import SyncPushPullTask, sync_push_pull_task
from tasks.tasks import PeriodicTask, TaskType
class TestSyncPushPullTaskInit:
@pytest.fixture
def task(self):
return SyncPushPullTask()
def test_init(self, task: SyncPushPullTask):
assert task.title == "Push-Pull Sync"
assert task.description == "Sync saves with devices via SSH/SFTP"
assert task.task_type == TaskType.SYNC
assert task.func == "tasks.sync_push_pull_task.run_push_pull_sync"
def test_is_periodic_task(self, task: SyncPushPullTask):
assert isinstance(task, PeriodicTask)
def test_module_singleton_exists(self):
assert sync_push_pull_task is not None
assert isinstance(sync_push_pull_task, SyncPushPullTask)
def test_cron_string_set(self, task: SyncPushPullTask):
assert task.cron_string is not None
class TestRunPushPullSync:
@patch("tasks.sync_push_pull_task.ENABLE_SYNC_PUSH_PULL", False)
async def test_run_disabled_returns_disabled(self):
from tasks.sync_push_pull_task import run_push_pull_sync
result = await run_push_pull_sync()
assert result["status"] == "disabled"
@patch("tasks.sync_push_pull_task.ENABLE_SYNC_PUSH_PULL", True)
@patch("tasks.sync_push_pull_task.db_device_handler")
async def test_run_no_devices(self, mock_device_handler):
from tasks.sync_push_pull_task import run_push_pull_sync
mock_device_handler.get_all_devices_by_sync_mode.return_value = []
result = await run_push_pull_sync()
assert result["status"] == "no_devices"
@patch("tasks.sync_push_pull_task.ENABLE_SYNC_PUSH_PULL", True)
@patch("tasks.sync_push_pull_task.db_device_handler")
async def test_run_device_not_found(self, mock_device_handler):
from tasks.sync_push_pull_task import run_push_pull_sync
mock_device_handler.get_device_by_id.return_value = None
result = await run_push_pull_sync(device_id="nonexistent")
assert result["status"] == "error"
assert "not found" in result["message"]
@patch("tasks.sync_push_pull_task.ENABLE_SYNC_PUSH_PULL", False)
async def test_run_force_override(self):
from tasks.sync_push_pull_task import run_push_pull_sync
with patch("tasks.sync_push_pull_task.db_device_handler") as mock_handler:
mock_handler.get_device_by_id.return_value = None
result = await run_push_pull_sync(device_id="test", force=True)
assert result["status"] == "error" # Device not found, but didn't skip