harden media ownership checks

This commit is contained in:
FuzzyGrim
2026-05-09 16:15:14 +02:00
parent 724a66df22
commit 02b5a797a7
5 changed files with 173 additions and 29 deletions

View File

@@ -4,8 +4,9 @@ from urllib.parse import parse_qsl, urlencode, urljoin, urlparse
from django.apps import apps
from django.conf import settings
from django.contrib import messages
from django.core.exceptions import ObjectDoesNotExist
from django.db.models import Q
from django.http import HttpResponseRedirect
from django.http import Http404, HttpResponseRedirect
from django.shortcuts import redirect
from django.utils import timezone
from django.utils.encoding import iri_to_uri
@@ -17,6 +18,25 @@ YEAR_ONLY_PARTS = 1
YEAR_MONTH_PARTS = 2
def get_owned_media_or_404(request, media_type, instance_id, *, prefetch=False):
"""Return media owned by the current user or raise 404."""
try:
if prefetch:
return BasicMedia.objects.get_media_prefetch(
request.user,
media_type,
instance_id,
)
return BasicMedia.objects.get_media(
request.user,
media_type,
instance_id,
)
except ObjectDoesNotExist as exc:
msg = "Media not found"
raise Http404(msg) from exc
def get_configured_app_url():
"""Return the configured public application origin, if one is available."""
for url in getattr(settings, "URLS", []):

View File

@@ -670,7 +670,7 @@ class MediaManager(models.Manager):
queryset = self._apply_prefetch_related(queryset, media_type)
self.annotate_max_progress(queryset, media_type)
return queryset[0]
return queryset.get()
def _get_media_params(
self,

View File

@@ -23,7 +23,11 @@ class CreateMedia(TestCase):
def setUp(self):
"""Create a user and log in."""
self.credentials = {"username": "test", "password": "12345"}
self.external_credentials = {"username": "test2", "password": "12345"}
self.user = get_user_model().objects.create_user(**self.credentials)
self.external_user = get_user_model().objects.create_user(
**self.external_credentials
)
self.client.login(**self.credentials)
@override_settings(MEDIA_ROOT=("create_media"))
@@ -129,7 +133,11 @@ class EditMedia(TestCase):
def setUp(self):
"""Create a user and log in."""
self.credentials = {"username": "test", "password": "12345"}
self.external_credentials = {"username": "test2", "password": "12345"}
self.user = get_user_model().objects.create_user(**self.credentials)
self.external_user = get_user_model().objects.create_user(
**self.external_credentials
)
self.client.login(**self.credentials)
def test_edit_movie_score(self):
@@ -167,6 +175,75 @@ class EditMedia(TestCase):
)
self.assertEqual(Movie.objects.get(item__media_id="10494").score, 10)
def test_cannot_edit_another_users_media(self):
"""Test users cannot edit another user's media by instance ID."""
item = Item.objects.create(
media_id="10494",
source=Sources.TMDB.value,
media_type=MediaTypes.MOVIE.value,
title="Perfect Blue",
image="http://example.com/image.jpg",
)
movie = Movie.objects.create(
item=item,
user=self.external_user,
score=9,
progress=0,
status=Status.PLANNING.value,
notes="Nice",
)
response = self.client.post(
reverse("media_save"),
{
"instance_id": movie.id,
"media_id": "10494",
"source": Sources.TMDB.value,
"media_type": MediaTypes.MOVIE.value,
"score": 10,
"progress": 0,
"status": Status.PLANNING.value,
"notes": "Changed",
},
)
self.assertEqual(response.status_code, 404)
movie.refresh_from_db()
self.assertEqual(movie.score, 9)
self.assertEqual(movie.notes, "Nice")
def test_cannot_update_another_users_media_score(self):
"""Test users cannot update another user's score by instance ID."""
item = Item.objects.create(
media_id="10494",
source=Sources.TMDB.value,
media_type=MediaTypes.MOVIE.value,
title="Perfect Blue",
image="http://example.com/image.jpg",
)
movie = Movie.objects.create(
item=item,
user=self.external_user,
score=9,
progress=0,
status=Status.PLANNING.value,
)
response = self.client.post(
reverse(
"update_media_score",
kwargs={
"media_type": MediaTypes.MOVIE.value,
"instance_id": movie.id,
},
),
{"score": 10},
)
self.assertEqual(response.status_code, 404)
movie.refresh_from_db()
self.assertEqual(movie.score, 9)
class DeleteMedia(TestCase):
"""Test the deletion of media objects through views."""
@@ -174,7 +251,11 @@ class DeleteMedia(TestCase):
def setUp(self):
"""Create a user and log in."""
self.credentials = {"username": "test", "password": "12345"}
self.external_credentials = {"username": "test2", "password": "12345"}
self.user = get_user_model().objects.create_user(**self.credentials)
self.external_user = get_user_model().objects.create_user(
**self.external_credentials
)
self.client.login(**self.credentials)
self.item_season = Item.objects.create(
@@ -236,6 +317,33 @@ class DeleteMedia(TestCase):
0,
)
def test_cannot_delete_another_users_media(self):
"""Test users cannot delete another user's media by instance ID."""
item = Item.objects.create(
media_id="10494",
source=Sources.TMDB.value,
media_type=MediaTypes.MOVIE.value,
title="Perfect Blue",
image="http://example.com/image.jpg",
)
movie = Movie.objects.create(
item=item,
user=self.external_user,
progress=0,
status=Status.PLANNING.value,
)
response = self.client.post(
reverse("media_delete"),
data={
"instance_id": movie.id,
"media_type": MediaTypes.MOVIE.value,
},
)
self.assertEqual(response.status_code, 404)
self.assertTrue(Movie.objects.filter(id=movie.id).exists())
def test_unwatch_episode(self):
"""Test unwatching of an episode through views."""
self.client.post(

View File

@@ -112,7 +112,11 @@ class ProgressEditAnime(TestCase):
def setUp(self):
"""Prepare the database with an anime."""
self.credentials = {"username": "test", "password": "12345"}
self.external_credentials = {"username": "test2", "password": "12345"}
self.user = get_user_model().objects.create_user(**self.credentials)
self.external_user = get_user_model().objects.create_user(
**self.external_credentials
)
self.client.login(**self.credentials)
self.item = Item.objects.create(
@@ -163,6 +167,38 @@ class ProgressEditAnime(TestCase):
self.assertEqual(Anime.objects.get(item__media_id="1").progress, 1)
def test_cannot_edit_another_users_progress(self):
"""Test users cannot edit another user's media progress by instance ID."""
item = Item.objects.create(
media_id="2",
source=Sources.MAL.value,
media_type=MediaTypes.ANIME.value,
title="Samurai Champloo",
image="http://example.com/image.jpg",
)
anime = Anime(
item=item,
user=self.external_user,
status=Status.IN_PROGRESS.value,
progress=2,
)
Anime.save_base(anime)
response = self.client.post(
reverse(
"progress_edit",
kwargs={
"media_type": MediaTypes.ANIME.value,
"instance_id": anime.id,
},
),
{"operation": "increase"},
)
self.assertEqual(response.status_code, 404)
anime.refresh_from_db()
self.assertEqual(anime.progress, 2)
class ProgressEditPersistentMessages(TestCase):
"""Test HTMX progress edits that create persistent user messages."""

View File

@@ -100,10 +100,8 @@ def progress_edit(request, media_type, instance_id):
"""Increase or decrease the progress of a media item from home page."""
operation = request.POST["operation"]
media = BasicMedia.objects.get_media_prefetch(
request.user,
media_type,
instance_id,
media = helpers.get_owned_media_or_404(
request, media_type, instance_id, prefetch=True
)
if operation == "increase":
@@ -375,11 +373,7 @@ def season_details(request, source, media_id, title, season_number): # noqa: AR
@require_POST
def update_media_score(request, media_type, instance_id):
"""Update the user's score for a media item."""
media = BasicMedia.objects.get_media(
request.user,
media_type,
instance_id,
)
media = helpers.get_owned_media_or_404(request, media_type, instance_id)
score = float(request.POST.get("score"))
media.score = score
@@ -584,11 +578,7 @@ def media_save(request):
instance_id = request.POST.get("instance_id")
if instance_id:
instance = BasicMedia.objects.get_media(
request.user,
media_type,
instance_id,
)
instance = helpers.get_owned_media_or_404(request, media_type, instance_id)
else:
metadata = services.get_media_metadata(
media_type,
@@ -632,19 +622,9 @@ def media_delete(request):
"""Delete media data from the database."""
instance_id = request.POST["instance_id"]
media_type = request.POST["media_type"]
model = apps.get_model(app_label="app", model_name=media_type)
try:
media = BasicMedia.objects.get_media(
request.user,
media_type,
instance_id,
)
media.delete()
logger.info("%s deleted successfully.", media)
except model.DoesNotExist:
logger.warning("The %s was already deleted before.", media_type)
media = helpers.get_owned_media_or_404(request, media_type, instance_id)
media.delete()
logger.info("%s deleted successfully.", media)
return helpers.redirect_back(request)