From 02b5a797a7a8eb0f8e92c45f3fcb9d5ef2475bbf Mon Sep 17 00:00:00 2001 From: FuzzyGrim Date: Sat, 9 May 2026 16:15:14 +0200 Subject: [PATCH] harden media ownership checks --- src/app/helpers.py | 22 +++++- src/app/models.py | 2 +- src/app/tests/views/test_crud.py | 108 +++++++++++++++++++++++++++ src/app/tests/views/test_progress.py | 36 +++++++++ src/app/views.py | 34 ++------- 5 files changed, 173 insertions(+), 29 deletions(-) diff --git a/src/app/helpers.py b/src/app/helpers.py index 2b343fc5..cb0b90f3 100644 --- a/src/app/helpers.py +++ b/src/app/helpers.py @@ -4,8 +4,9 @@ from urllib.parse import parse_qsl, urlencode, urljoin, urlparse from django.apps import apps from django.conf import settings from django.contrib import messages +from django.core.exceptions import ObjectDoesNotExist from django.db.models import Q -from django.http import HttpResponseRedirect +from django.http import Http404, HttpResponseRedirect from django.shortcuts import redirect from django.utils import timezone from django.utils.encoding import iri_to_uri @@ -17,6 +18,25 @@ YEAR_ONLY_PARTS = 1 YEAR_MONTH_PARTS = 2 +def get_owned_media_or_404(request, media_type, instance_id, *, prefetch=False): + """Return media owned by the current user or raise 404.""" + try: + if prefetch: + return BasicMedia.objects.get_media_prefetch( + request.user, + media_type, + instance_id, + ) + return BasicMedia.objects.get_media( + request.user, + media_type, + instance_id, + ) + except ObjectDoesNotExist as exc: + msg = "Media not found" + raise Http404(msg) from exc + + def get_configured_app_url(): """Return the configured public application origin, if one is available.""" for url in getattr(settings, "URLS", []): diff --git a/src/app/models.py b/src/app/models.py index 3189319d..180b398d 100644 --- a/src/app/models.py +++ b/src/app/models.py @@ -670,7 +670,7 @@ class MediaManager(models.Manager): queryset = self._apply_prefetch_related(queryset, media_type) self.annotate_max_progress(queryset, media_type) - return queryset[0] + return queryset.get() def _get_media_params( self, diff --git a/src/app/tests/views/test_crud.py b/src/app/tests/views/test_crud.py index 0a57efa6..d5ddda45 100644 --- a/src/app/tests/views/test_crud.py +++ b/src/app/tests/views/test_crud.py @@ -23,7 +23,11 @@ class CreateMedia(TestCase): def setUp(self): """Create a user and log in.""" self.credentials = {"username": "test", "password": "12345"} + self.external_credentials = {"username": "test2", "password": "12345"} self.user = get_user_model().objects.create_user(**self.credentials) + self.external_user = get_user_model().objects.create_user( + **self.external_credentials + ) self.client.login(**self.credentials) @override_settings(MEDIA_ROOT=("create_media")) @@ -129,7 +133,11 @@ class EditMedia(TestCase): def setUp(self): """Create a user and log in.""" self.credentials = {"username": "test", "password": "12345"} + self.external_credentials = {"username": "test2", "password": "12345"} self.user = get_user_model().objects.create_user(**self.credentials) + self.external_user = get_user_model().objects.create_user( + **self.external_credentials + ) self.client.login(**self.credentials) def test_edit_movie_score(self): @@ -167,6 +175,75 @@ class EditMedia(TestCase): ) self.assertEqual(Movie.objects.get(item__media_id="10494").score, 10) + def test_cannot_edit_another_users_media(self): + """Test users cannot edit another user's media by instance ID.""" + item = Item.objects.create( + media_id="10494", + source=Sources.TMDB.value, + media_type=MediaTypes.MOVIE.value, + title="Perfect Blue", + image="http://example.com/image.jpg", + ) + movie = Movie.objects.create( + item=item, + user=self.external_user, + score=9, + progress=0, + status=Status.PLANNING.value, + notes="Nice", + ) + + response = self.client.post( + reverse("media_save"), + { + "instance_id": movie.id, + "media_id": "10494", + "source": Sources.TMDB.value, + "media_type": MediaTypes.MOVIE.value, + "score": 10, + "progress": 0, + "status": Status.PLANNING.value, + "notes": "Changed", + }, + ) + + self.assertEqual(response.status_code, 404) + movie.refresh_from_db() + self.assertEqual(movie.score, 9) + self.assertEqual(movie.notes, "Nice") + + def test_cannot_update_another_users_media_score(self): + """Test users cannot update another user's score by instance ID.""" + item = Item.objects.create( + media_id="10494", + source=Sources.TMDB.value, + media_type=MediaTypes.MOVIE.value, + title="Perfect Blue", + image="http://example.com/image.jpg", + ) + movie = Movie.objects.create( + item=item, + user=self.external_user, + score=9, + progress=0, + status=Status.PLANNING.value, + ) + + response = self.client.post( + reverse( + "update_media_score", + kwargs={ + "media_type": MediaTypes.MOVIE.value, + "instance_id": movie.id, + }, + ), + {"score": 10}, + ) + + self.assertEqual(response.status_code, 404) + movie.refresh_from_db() + self.assertEqual(movie.score, 9) + class DeleteMedia(TestCase): """Test the deletion of media objects through views.""" @@ -174,7 +251,11 @@ class DeleteMedia(TestCase): def setUp(self): """Create a user and log in.""" self.credentials = {"username": "test", "password": "12345"} + self.external_credentials = {"username": "test2", "password": "12345"} self.user = get_user_model().objects.create_user(**self.credentials) + self.external_user = get_user_model().objects.create_user( + **self.external_credentials + ) self.client.login(**self.credentials) self.item_season = Item.objects.create( @@ -236,6 +317,33 @@ class DeleteMedia(TestCase): 0, ) + def test_cannot_delete_another_users_media(self): + """Test users cannot delete another user's media by instance ID.""" + item = Item.objects.create( + media_id="10494", + source=Sources.TMDB.value, + media_type=MediaTypes.MOVIE.value, + title="Perfect Blue", + image="http://example.com/image.jpg", + ) + movie = Movie.objects.create( + item=item, + user=self.external_user, + progress=0, + status=Status.PLANNING.value, + ) + + response = self.client.post( + reverse("media_delete"), + data={ + "instance_id": movie.id, + "media_type": MediaTypes.MOVIE.value, + }, + ) + + self.assertEqual(response.status_code, 404) + self.assertTrue(Movie.objects.filter(id=movie.id).exists()) + def test_unwatch_episode(self): """Test unwatching of an episode through views.""" self.client.post( diff --git a/src/app/tests/views/test_progress.py b/src/app/tests/views/test_progress.py index 5b12f891..4762d131 100644 --- a/src/app/tests/views/test_progress.py +++ b/src/app/tests/views/test_progress.py @@ -112,7 +112,11 @@ class ProgressEditAnime(TestCase): def setUp(self): """Prepare the database with an anime.""" self.credentials = {"username": "test", "password": "12345"} + self.external_credentials = {"username": "test2", "password": "12345"} self.user = get_user_model().objects.create_user(**self.credentials) + self.external_user = get_user_model().objects.create_user( + **self.external_credentials + ) self.client.login(**self.credentials) self.item = Item.objects.create( @@ -163,6 +167,38 @@ class ProgressEditAnime(TestCase): self.assertEqual(Anime.objects.get(item__media_id="1").progress, 1) + def test_cannot_edit_another_users_progress(self): + """Test users cannot edit another user's media progress by instance ID.""" + item = Item.objects.create( + media_id="2", + source=Sources.MAL.value, + media_type=MediaTypes.ANIME.value, + title="Samurai Champloo", + image="http://example.com/image.jpg", + ) + anime = Anime( + item=item, + user=self.external_user, + status=Status.IN_PROGRESS.value, + progress=2, + ) + Anime.save_base(anime) + + response = self.client.post( + reverse( + "progress_edit", + kwargs={ + "media_type": MediaTypes.ANIME.value, + "instance_id": anime.id, + }, + ), + {"operation": "increase"}, + ) + + self.assertEqual(response.status_code, 404) + anime.refresh_from_db() + self.assertEqual(anime.progress, 2) + class ProgressEditPersistentMessages(TestCase): """Test HTMX progress edits that create persistent user messages.""" diff --git a/src/app/views.py b/src/app/views.py index 1c786a4f..0399d293 100644 --- a/src/app/views.py +++ b/src/app/views.py @@ -100,10 +100,8 @@ def progress_edit(request, media_type, instance_id): """Increase or decrease the progress of a media item from home page.""" operation = request.POST["operation"] - media = BasicMedia.objects.get_media_prefetch( - request.user, - media_type, - instance_id, + media = helpers.get_owned_media_or_404( + request, media_type, instance_id, prefetch=True ) if operation == "increase": @@ -375,11 +373,7 @@ def season_details(request, source, media_id, title, season_number): # noqa: AR @require_POST def update_media_score(request, media_type, instance_id): """Update the user's score for a media item.""" - media = BasicMedia.objects.get_media( - request.user, - media_type, - instance_id, - ) + media = helpers.get_owned_media_or_404(request, media_type, instance_id) score = float(request.POST.get("score")) media.score = score @@ -584,11 +578,7 @@ def media_save(request): instance_id = request.POST.get("instance_id") if instance_id: - instance = BasicMedia.objects.get_media( - request.user, - media_type, - instance_id, - ) + instance = helpers.get_owned_media_or_404(request, media_type, instance_id) else: metadata = services.get_media_metadata( media_type, @@ -632,19 +622,9 @@ def media_delete(request): """Delete media data from the database.""" instance_id = request.POST["instance_id"] media_type = request.POST["media_type"] - model = apps.get_model(app_label="app", model_name=media_type) - - try: - media = BasicMedia.objects.get_media( - request.user, - media_type, - instance_id, - ) - media.delete() - logger.info("%s deleted successfully.", media) - - except model.DoesNotExist: - logger.warning("The %s was already deleted before.", media_type) + media = helpers.get_owned_media_or_404(request, media_type, instance_id) + media.delete() + logger.info("%s deleted successfully.", media) return helpers.redirect_back(request)