import as background tasks with celery

This commit is contained in:
FuzzyGrim
2024-04-25 11:40:43 +02:00
parent b586c49566
commit 69e52b6ca7
15 changed files with 268 additions and 138 deletions

View File

@@ -6,9 +6,8 @@ ENV PYTHONUNBUFFERED=1
COPY ./requirements.txt /requirements.txt
RUN apt-get update \
&& apt-get install -y --no-install-recommends g++ gosu \
&& pip install --no-cache-dir -r /requirements.txt \
&& apt-get -y autoremove --purge g++ \
&& pip install --upgrade --no-cache-dir supervisor==4.2.5 \
&& apt-get clean -y \
&& rm -rf /var/lib/apt/lists/*
@@ -18,6 +17,7 @@ ENV PYTHONUNBUFFERED=1
WORKDIR /yamtrack
COPY ./entrypoint.sh /entrypoint.sh
COPY ./supervisord.conf /etc/supervisord.conf
RUN chmod +x /entrypoint.sh && \
# create user abc for later PUID/PGID mapping

View File

@@ -46,14 +46,14 @@ Alternatively, if you need a PostgreSQL database, you can use the `docker-compos
| IGDB_ID | String | IGDB API key for games, a default key is provided but it's recommended to get your own as it has a low rate limit. |
| IGDB_SECRET | String | IGDB API secret for games, a default value is provided but it's recommended to get your own as it has a low rate limit. |
| IGDB_NSFW | Bool | Default to false, set to true to include adult content in game searches |
| REDIS_URL | String | Redis is recommended for better performance |
| REDIS_URL | String | Default to redis://localhost:6379, Redis is needed for processing background tasks, set this to your redis server url. |
| SECRET | String | [Secret key](https://docs.djangoproject.com/en/stable/ref/settings/#secret-key) used for cryptographic signing, should be a random string |
| ALLOWED_HOSTS | List | Host/domain names that this Django site can serve, set this to your domain name if exposing to the public |
| REGISTRATION | Bool | Default to true, set to false to disable user registration |
| DEBUG | Bool | Default to false, set to true for debugging |
| PUID | Int | User ID for the app, default to 1000 |
| PGID | Int | Group ID for the app, default to 1000 |
| TZ | String | Timezone, default to UTC |
| TZ | String | Timezone, default to UTC |
| WEB_CONCURRENCY | Int | Number of webserver processes, default to 1 but it's recommended to have a value of [(2 x num cores) + 1](https://docs.gunicorn.org/en/latest/design.html#how-many-workers) |
### Environment variables for PostgreSQL
@@ -75,6 +75,12 @@ git clone https://github.com/FuzzyGrim/Yamtrack.git
cd Yamtrack
```
Install Redis or spin up a bare redis container:
```bash
docker run -d --name redis -p 6379:6379 --restart unless-stopped redis:7-alpine
```
Create a `.env` file in the root directory and add the following variables.
```bash
@@ -90,8 +96,9 @@ Then run the following commands.
```bash
python -m pip install -U -r requirements_dev.txt
python src/manage.py migrate
python src/manage.py runserver
cd src
python manage.py migrate
python manage.py runserver & celery --app config worker -l DEBUG
```
Go to: http://localhost:8000

View File

@@ -12,4 +12,4 @@ usermod -o -u "$PUID" abc
chown -R abc:abc db
chown -R abc:abc staticfiles
exec gosu abc:abc gunicorn --bind :8000 --timeout 120 --preload config.wsgi:application
exec /usr/local/bin/supervisord -c /etc/supervisord.conf

View File

@@ -1,5 +1,7 @@
celery==5.4.0
crispy-bootstrap5==2024.2
Django==5.0.4
django-celery-results==2.5.1
django-crispy-forms==2.1
django_debug_toolbar==4.3.0
django-model-utils==4.5.0

View File

@@ -0,0 +1,5 @@
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from config.celery import app as celery_app
__all__ = ("celery_app",)

13
src/config/celery.py Normal file
View File

@@ -0,0 +1,13 @@
import os
from celery import Celery
# Set the default Django settings module for the 'celery' program.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings")
app = Celery("yamtrack")
app.config_from_object("django.conf:settings", namespace="CELERY")
# Load task modules from all registered Django apps.
app.autodiscover_tasks()

View File

@@ -41,6 +41,7 @@ INSTALLED_APPS = [
"crispy_forms",
"crispy_bootstrap5",
"debug_toolbar",
"django_celery_results",
"simple_history",
]
@@ -212,7 +213,7 @@ TZ = zoneinfo.ZoneInfo(TIME_ZONE)
IMG_NONE = "https://www.themoviedb.org/assets/2/v4/glyphicons/basic/glyphicons-basic-38-picture-grey-c2ebdbb057f2a7614185931650f8cee23fa137b93812ccb132b9df511df1cfac.svg"
REQUEST_TIMEOUT = 10 # seconds
REQUEST_TIMEOUT = 30 # seconds
TMDB_API = config("TMDB_API", default="61572be02f0a068658828f6396aacf60")
TMDB_NSFW = config("TMDB_NSFW", default=False, cast=bool)
@@ -240,3 +241,25 @@ DEBUG_TOOLBAR_CONFIG = {
"bootstrap5/",
),
}
# Celery settings
CELERY_BROKER_URL = REDIS_URL
CELERY_TIMEZONE = TIME_ZONE
CELERY_WORKER_HIJACK_ROOT_LOGGER = False
CELERY_WORKER_CONCURRENCY = 1
CELERY_WORKER_MAX_TASKS_PER_CHILD = 1
CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = True
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = 600 # 10 minutes
CELERY_RESULT_EXTENDED = True
CELERY_RESULT_BACKEND = "django-db"
CELERY_CACHE_BACKEND = "default"
# https://docs.celeryq.dev/en/stable/userguide/configuration.html#task-serializer
CELERY_TASK_SERIALIZER = "pickle"
# https://docs.celeryq.dev/en/stable/userguide/configuration.html#std-setting-accept_content
CELERY_ACCEPT_CONTENT = ["application/json", "application/x-python-serialize"]

View File

@@ -0,0 +1,16 @@
from simple_history.utils import bulk_create_with_history
def bulk_chunk_import(media_list, model):
"""Bulk import media in chunks.
Fixes bulk_create_with_history limit
https://github.com/jazzband/django-simple-history/issues/1216#issuecomment-1903240831
"""
chunk_size = 500
for i in range(0, len(media_list), chunk_size):
bulk_create_with_history(
media_list[i : i + chunk_size],
model,
ignore_conflicts=True,
)

View File

@@ -4,7 +4,8 @@ import logging
from app.models import Anime, Manga
from app.providers import services
from django.apps import apps
from simple_history.utils import bulk_create_with_history
from integrations import helpers
logger = logging.getLogger(__name__)
@@ -88,8 +89,16 @@ def importer(username, user):
params={"query": query, "variables": variables},
)
# returns media that couldn't be added
return add_media_list(query, warning_message="", user=user)
num_anime_before = Anime.objects.filter(user=user).count()
num_manga_before = Manga.objects.filter(user=user).count()
# media that couldn't be added
warning_message = add_media_list(query, warning_message="", user=user)
num_anime_imported = Anime.objects.filter(user=user).count() - num_anime_before
num_manga_imported = Manga.objects.filter(user=user).count() - num_manga_before
return num_anime_imported, num_manga_imported, warning_message
def add_media_list(query, warning_message, user):
@@ -139,8 +148,8 @@ def add_media_list(query, warning_message, user):
bulk_media[media_type].append(instance)
bulk_create_with_history(bulk_media["anime"], Anime, ignore_conflicts=True)
bulk_create_with_history(bulk_media["manga"], Manga, ignore_conflicts=True)
helpers.bulk_chunk_import(bulk_media["anime"], Anime)
helpers.bulk_chunk_import(bulk_media["manga"], Manga)
return warning_message

View File

@@ -4,7 +4,8 @@ from app.models import Anime, Manga
from app.providers import services
from django.apps import apps
from django.conf import settings
from simple_history.utils import bulk_create_with_history
from integrations import helpers
logger = logging.getLogger(__name__)
@@ -31,8 +32,15 @@ def importer(username, user):
mangas = get_whole_response(manga_url, manga_params)
bulk_add_manga = add_media_list(mangas, "manga", user)
bulk_create_with_history(bulk_add_anime, Anime, ignore_conflicts=True)
bulk_create_with_history(bulk_add_manga, Manga, ignore_conflicts=True)
num_anime_before = Anime.objects.filter(user=user).count()
helpers.bulk_chunk_import(bulk_add_anime, Anime)
num_manga_before = Manga.objects.filter(user=user).count()
helpers.bulk_chunk_import(bulk_add_manga, Manga)
num_anime_imported = Anime.objects.filter(user=user).count() - num_anime_before
num_manga_imported = Manga.objects.filter(user=user).count() - num_manga_before
return num_anime_imported, num_manga_imported
def get_whole_response(url, params):

View File

@@ -1,4 +1,3 @@
import contextlib
import csv
import datetime
import logging
@@ -6,7 +5,7 @@ import logging
from app import forms
from app.providers import services
from django.apps import apps
from django.db import IntegrityError
from django.db.utils import IntegrityError
logger = logging.getLogger(__name__)
@@ -18,12 +17,15 @@ def importer(file, user, status):
logger.info("Importing from TMDB")
num_tv_imported = 0
num_movie_imported = 0
for row in reader:
media_type = row["Type"]
episode_number = row["Episode Number"]
media_id = row["TMDb ID"]
# if movie or tv show (not episode), currently cant import episodes
# if movie or tv show (not episode), currently not importing episodes
if media_type == "movie" or (media_type == "tv" and episode_number == ""):
media_metadata = services.get_media_metadata(media_type, media_id)
@@ -32,15 +34,23 @@ def importer(file, user, status):
if form.is_valid():
# ignore if already in database
with contextlib.suppress(IntegrityError):
try:
# not using bulk_create because need of custom save method
form.save()
except IntegrityError:
pass
else:
if media_type == "tv":
num_tv_imported += 1
else:
num_movie_imported += 1
else:
error_message = (
f"{media_metadata['title']} ({media_type}): Import failed."
)
logger.error(error_message)
return num_tv_imported, num_movie_imported
def create_instance(media_metadata, user):

View File

@@ -5,7 +5,8 @@ from app import forms
from app.forms import EpisodeForm
from app.models import TV, Episode, Season
from django.apps import apps
from simple_history.utils import bulk_create_with_history
from integrations import helpers
logger = logging.getLogger(__name__)
@@ -27,6 +28,8 @@ def importer(file, user):
"game": [],
}
imported_counts = {}
for row in reader:
media_type = row["media_type"]
if media_type == "episode":
@@ -45,14 +48,15 @@ def importer(file, user):
else:
add_bulk_media(row, user, bulk_media)
for media_type in ["anime", "manga", "movie", "tv", "game"]:
for media_type in ["anime", "manga", "movie", "game", "tv"]:
if bulk_media[media_type]:
model = apps.get_model(app_label="app", model_name=media_type)
bulk_create_with_history(
bulk_media[media_type],
model,
ignore_conflicts=True,
)
num_objects_before = model.objects.filter(user=user).count()
helpers.bulk_chunk_import(bulk_media[media_type], model)
num_objects_after = model.objects.filter(user=user).count()
imported_counts[media_type] = num_objects_after - num_objects_before
if bulk_media["season"]:
# Extract unique media IDs from the seasons
@@ -68,11 +72,11 @@ def importer(file, user):
for season in bulk_media["season"]:
season.related_tv = tv_mapping[season.media_id]
bulk_create_with_history(
bulk_media["season"],
Season,
ignore_conflicts=True,
)
num_seasons_before = Season.objects.filter(user=user).count()
helpers.bulk_chunk_import(bulk_media["season"], Season)
num_seasons_after = Season.objects.filter(user=user).count()
imported_counts["season"] = num_seasons_after - num_seasons_before
if bulk_media["episodes"]:
# Extract unique media IDs and season numbers from the episodes
@@ -99,11 +103,14 @@ def importer(file, user):
episode["instance"].related_season = season_mapping[season_key]
episode_instances = [episode["instance"] for episode in bulk_media["episodes"]]
bulk_create_with_history(
episode_instances,
Episode,
ignore_conflicts=True,
)
num_episodes_before = Episode.objects.filter(related_season__user=user).count()
helpers.bulk_chunk_import(episode_instances, Episode)
num_episodes_after = Episode.objects.filter(related_season__user=user).count()
imported_counts["episode"] = num_episodes_after - num_episodes_before
return imported_counts
def add_bulk_media(row, user, bulk_media):

89
src/integrations/tasks.py Normal file
View File

@@ -0,0 +1,89 @@
import requests
from celery import shared_task
from integrations.imports import anilist, mal, tmdb, yamtrack
@shared_task(name="Import from MyAnimeList")
def import_mal(username, user):
"""Celery task for importing anime and manga data from MyAnimeList."""
try:
num_anime_imported, num_manga_imported = mal.importer(username, user)
except requests.exceptions.HTTPError as error:
if error.response.status_code == requests.codes.not_found:
msg = f"User {username} not found."
raise ValueError(msg) from error
raise # re-raise for other errors
else:
return f"Imported {num_anime_imported} anime and {num_manga_imported} manga."
@shared_task(name="Import from TMDB")
def import_tmdb(file, user, status):
"""Celery task for importing TMDB tv shows and movies."""
try:
num_tv_imported, num_movie_imported = tmdb.importer(file, user, status)
except UnicodeDecodeError as error:
msg = "Invalid file format. Please upload a CSV file."
raise ValueError(msg) from error
except KeyError as error:
msg = "Error parsing TMDB CSV file."
raise ValueError(msg) from error
else:
return f"Imported {num_tv_imported} TV shows and {num_movie_imported} movies."
@shared_task(name="Import from AniList")
def import_anilist(username, user):
"""Celery task for importing anime and manga data from AniList."""
try:
num_anime_imported, num_manga_imported, warning_message = anilist.importer(
username,
user,
)
except requests.exceptions.HTTPError as error:
error_message = error.response.json()["errors"][0].get("message")
if error_message == "User not found":
msg = f"User {username} not found."
raise ValueError(msg) from error
if error_message == "Private User":
msg = f"User {username} is private."
raise ValueError(msg) from error
raise # re-raise for other errors
else:
message = f"Imported {num_anime_imported} anime and {num_manga_imported} manga."
if warning_message:
title = "\n \nCouldn't import the following Anime or Manga: \n"
message += title + warning_message
return message
@shared_task(name="Import from Yamtrack")
def import_yamtrack(file, user):
"""Celery task for importing media data from Yamtrack."""
try:
imported_counts = yamtrack.importer(file, user)
except UnicodeDecodeError as error:
msg = "Invalid file format. Please upload a CSV file."
raise ValueError(msg) from error
except KeyError as error:
msg = "Error parsing Yamtrack CSV file."
raise ValueError(msg) from error
else:
media_type_str = {
"anime": "anime",
"manga": "manga",
"movie": "movies",
"game": "games",
"tv": "TV shows",
"season": "seasons",
"episode": "episodes",
}
imported_summary_list = [
f"{count} {media_type_str[media_type]}"
for media_type, count in imported_counts.items()
]
imported_summary = (
", ".join(imported_summary_list[:-1]) + " and " + imported_summary_list[-1]
)
return f"Imported {imported_summary}."

View File

@@ -1,17 +1,14 @@
"""Contains views for importing and exporting media data from various sources."""
import logging
import re
from functools import wraps
import requests
from django.contrib import messages
from django.http import HttpResponse
from django.shortcuts import redirect
from django.views.decorators.http import require_GET, require_POST
from integrations import exports
from integrations.imports import anilist, mal, tmdb, yamtrack
from integrations import exports, tasks
logger = logging.getLogger(__name__)
@@ -35,25 +32,8 @@ def check_demo(view):
def import_mal(request):
"""View for importing anime and manga data from MyAnimeList."""
username = request.POST["mal"]
# only alphanumeric, hyphen, and underscore characters are allowed
if not re.match("^[A-Za-z0-9_-]*$", username):
msg = f"Invalid username format: {username}"
messages.error(request, msg)
return redirect("profile")
try:
mal.importer(username, request.user)
messages.success(request, "Your MyAnimeList has been imported!")
except requests.exceptions.HTTPError as error:
if error.response.status_code == requests.codes.not_found:
messages.error(
request,
f"User {request.POST['mal']} not found in MyAnimeList.",
)
else:
raise # re-raise for other errors
tasks.import_mal.delay(username, request.user)
messages.success(request, "MyAnimeList import task started in the background.")
return redirect("profile")
@@ -61,26 +41,12 @@ def import_mal(request):
@require_POST
def import_tmdb_ratings(request):
"""View for importing TMDB movie and TV ratings."""
try:
tmdb.importer(
request.FILES["tmdb_ratings"],
request.user,
status="Completed",
)
messages.success(request, "Your TMDB ratings have been imported!")
except UnicodeDecodeError: # when the file is not a CSV file
messages.error(
request,
"Couldn't import your TMDB ratings. Make sure the file is a CSV file.",
)
logger.exception("Error reading TMDB ratings file.")
except KeyError: # error parsing csv
messages.error(
request,
"Something went wrong while parsing your TMDB ratings.",
)
logger.exception("Error parsing TMDB ratings CSV file.")
tasks.import_tmdb.delay(
request.FILES["tmdb_ratings"],
request.user,
"Completed",
)
messages.success(request, "TMDB ratings import task started in the background.")
return redirect("profile")
@@ -88,26 +54,12 @@ def import_tmdb_ratings(request):
@require_POST
def import_tmdb_watchlist(request):
"""View for importing TMDB movie and TV watchlist."""
try:
tmdb.importer(
request.FILES["tmdb_watchlist"],
request.user,
status="Planning",
)
messages.success(request, "Your TMDB watchlist has been imported!")
except UnicodeDecodeError: # when the file is not a CSV file
messages.error(
request,
"Couldn't import your TMDB watchlist. Make sure the file is a CSV file.",
)
logger.exception("Error reading TMDB watchlist file.")
except KeyError: # error parsing csv
messages.error(
request,
"Something went wrong while parsing your TMDB watchlist.",
)
logger.exception("Error parsing TMDB watchlist CSV file.")
tasks.import_tmdb.delay(
request.FILES["tmdb_watchlist"],
request.user,
"Planning",
)
messages.success(request, "TMDB watchlist import task started in the background.")
return redirect("profile")
@@ -116,28 +68,8 @@ def import_tmdb_watchlist(request):
def import_anilist(request):
"""View for importing anime and manga data from AniList."""
username = request.POST["anilist"]
if not username.isalnum():
msg = f"Username must be alphanumeric: {username}"
messages.error(request, msg)
return redirect("profile")
try:
warning_message = anilist.importer(username, request.user)
if warning_message:
title = "Couldn't import the following Anime or Manga: \n"
messages.warning(request, title + warning_message)
else:
messages.success(request, "Your AniList has been imported!")
except requests.exceptions.HTTPError as error:
if error.response.json()["errors"][0].get("message") == "User not found":
messages.error(
request,
f"User {request.POST['anilist']} not found in AniList.",
)
else:
raise # re-raise for other errors
tasks.import_anilist.delay(username, request.user)
messages.success(request, "AniList import task started in the background.")
return redirect("profile")
@@ -145,22 +77,8 @@ def import_anilist(request):
@require_POST
def import_yamtrack(request):
"""View for importing anime and manga data from Yamtrack CSV."""
try:
yamtrack.importer(request.FILES["yamtrack_csv"], request.user)
messages.success(request, "Your Yamtrack CSV file has been imported!")
except UnicodeDecodeError: # when the file is not a CSV file
messages.error(
request,
"Couldn't import your Yamtrack CSV. Make sure the file is a CSV file.",
)
logger.exception("Error reading Yamtrack file.")
except KeyError: # error parsing csv
messages.error(
request,
"Something went wrong while parsing your Yamtrack CSV.",
)
logger.exception("Error parsing Yamtrack CSV file.")
tasks.import_yamtrack.delay(request.FILES["yamtrack_csv"], request.user)
messages.success(request, "Yamtrack import task started in the background.")
return redirect("profile")

23
supervisord.conf Normal file
View File

@@ -0,0 +1,23 @@
[supervisord]
nodaemon=true
user=root
[program:gunicorn]
command=gunicorn --bind :8000 --preload config.wsgi:application
user=abc
priority=1
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0
[program:celery]
command=celery --app config worker --loglevel INFO --without-mingle --without-gossip
user=abc
stopasgroup=true
stopwaitsecs=60
priority=5
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0