mirror of
https://github.com/FuzzyGrim/Yamtrack.git
synced 2026-03-03 02:27:01 +00:00
simkl import (#245)
* add extra logging for debug * add default simkl keys * better log levels * better mal api rate limit handler * add debug logging for celery * ignore tv without episode data in simkl import * add api 404 handler in simkl import * standardize import result message * allow personal keys for simkl import * update readme * add simkl import tests * add info on where to get SIMKL keys
This commit is contained in:
@@ -21,7 +21,7 @@ You can try the app at [yamtrack.fuzzygrim.com](https://yamtrack.fuzzygrim.com)
|
||||
- Keep up with your upcoming media with a calendar.
|
||||
- Easy deployment with Docker via docker-compose with SQLite or PostgreSQL.
|
||||
- Multi-users functionality allowing individual accounts with personalized tracking.
|
||||
- Import from [Trakt](https://trakt.tv/), [MyAnimeList](https://myanimelist.net/), [The Movie Database](https://www.themoviedb.org/), [AniList](https://anilist.co/) and [Kitsu](https://kitsu.app/).
|
||||
- Import from [Trakt](https://trakt.tv/), [Simkl](https://simkl.com/), [MyAnimeList](https://myanimelist.net/), [The Movie Database](https://www.themoviedb.org/), [AniList](https://anilist.co/) and [Kitsu](https://kitsu.app/).
|
||||
- Export all your tracked media to a CSV file and import it back.
|
||||
|
||||
## Installing with Docker
|
||||
@@ -49,6 +49,8 @@ Alternatively, if you need a PostgreSQL database, you can use the `docker-compos
|
||||
| IGDB_ID | String | IGDB API key for games, a default key is provided but it's recommended to get your own as it has a low rate limit. |
|
||||
| IGDB_SECRET | String | IGDB API secret for games, a default value is provided but it's recommended to get your own as it has a low rate limit. |
|
||||
| IGDB_NSFW | Bool | Default to false, set to true to include adult content in game searches |
|
||||
| SIMKL_ID | String | Simkl API key for importing media, a default key is provided but you can get one at [Simkl Developer](https://simkl.com/settings/developer/new/custom-search/) |
|
||||
| SIMKL_SECRET | String | Simkl API secret for importing media, a default secret is provided but you can get one at [Simkl Developer](https://simkl.com/settings/developer/new/custom-search/) |
|
||||
| REDIS_URL | String | Default to redis://localhost:6379, Redis is needed for processing background tasks, set this to your redis server url. |
|
||||
| SECRET | String | [Secret key](https://docs.djangoproject.com/en/stable/ref/settings/#secret-key) used for cryptographic signing, should be a random string |
|
||||
| ALLOWED_HOSTS | List | Host/domain names that this Django site can serve, set this to your domain name if exposing to the public |
|
||||
|
||||
@@ -26,10 +26,14 @@ def get_redis_connection():
|
||||
redis_pool = get_redis_connection()
|
||||
|
||||
session = LimiterSession(
|
||||
per_second=10,
|
||||
per_second=5,
|
||||
bucket_class=RedisBucket,
|
||||
bucket_kwargs={"redis_pool": redis_pool, "bucket_name": "api"},
|
||||
)
|
||||
session.mount(
|
||||
"https://api.myanimelist.net/v2",
|
||||
LimiterAdapter(per_minute=30),
|
||||
)
|
||||
session.mount(
|
||||
"https://graphql.anilist.co",
|
||||
LimiterAdapter(per_minute=85),
|
||||
|
||||
@@ -148,13 +148,13 @@ LOGGING = {
|
||||
"disable_existing_loggers": False,
|
||||
"loggers": {
|
||||
"requests_ratelimiter": {
|
||||
"level": "INFO",
|
||||
"level": "DEBUG" if DEBUG else "INFO",
|
||||
},
|
||||
"psycopg": {
|
||||
"level": "INFO",
|
||||
"level": "DEBUG" if DEBUG else "INFO",
|
||||
},
|
||||
"urllib3": {
|
||||
"level": "INFO",
|
||||
"level": "DEBUG" if DEBUG else "INFO",
|
||||
},
|
||||
},
|
||||
"formatters": {
|
||||
@@ -230,8 +230,14 @@ IGDB_ID = config("IGDB_ID", default="8wqmm7x1n2xxtnz94lb8mthadhtgrt")
|
||||
IGDB_SECRET = config("IGDB_SECRET", default="ovbq0hwscv58hu46yxn50hovt4j8kj")
|
||||
IGDB_NSFW = config("IGDB_NSFW", default=False, cast=bool)
|
||||
|
||||
SIMKL_ID = config("SIMKL_ID", default="")
|
||||
SIMKL_SECRET = config("SIMKL_SECRET", default="")
|
||||
SIMKL_ID = config(
|
||||
"SIMKL_ID",
|
||||
default="f1df351ddbace7e2c52f0010efdeb1fd59d379d9cdfb88e9a847c68af410db0e",
|
||||
)
|
||||
SIMKL_SECRET = config(
|
||||
"SIMKL_SECRET",
|
||||
default="9bb254894a598894bee14f61eafdcdca47622ab346632f951ed7220a3de289b5",
|
||||
)
|
||||
|
||||
REGISTRATION = config("REGISTRATION", default=True, cast=bool)
|
||||
|
||||
@@ -268,7 +274,7 @@ CELERY_WORKER_MAX_TASKS_PER_CHILD = 1
|
||||
CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = True
|
||||
|
||||
CELERY_TASK_TRACK_STARTED = True
|
||||
CELERY_TASK_TIME_LIMIT = 60 * 60 * 6 # 6 hours
|
||||
CELERY_TASK_TIME_LIMIT = 60 * 60 * 6 # 6 hours
|
||||
|
||||
CELERY_RESULT_EXTENDED = True
|
||||
CELERY_RESULT_BACKEND = "django-db"
|
||||
|
||||
@@ -89,20 +89,20 @@ def importer(username, user):
|
||||
params={"query": query, "variables": variables},
|
||||
)
|
||||
|
||||
anime_imported, anime_warning = import_media(
|
||||
anime_imported, anime_warnings = import_media(
|
||||
response["data"]["anime"],
|
||||
"anime",
|
||||
user,
|
||||
)
|
||||
|
||||
manga_imported, manga_warning = import_media(
|
||||
manga_imported, manga_warnings = import_media(
|
||||
response["data"]["manga"],
|
||||
"manga",
|
||||
user,
|
||||
)
|
||||
|
||||
warning_message = anime_warning + manga_warning
|
||||
return anime_imported, manga_imported, warning_message
|
||||
warning_messages = anime_warnings + manga_warnings
|
||||
return anime_imported, manga_imported, "\n".join(warning_messages)
|
||||
|
||||
|
||||
def import_media(media_data, media_type, user):
|
||||
@@ -110,15 +110,15 @@ def import_media(media_data, media_type, user):
|
||||
logger.info("Importing %s from Anilist", media_type)
|
||||
|
||||
bulk_media = []
|
||||
warning_message = ""
|
||||
warnings = []
|
||||
for status_list in media_data["lists"]:
|
||||
if not status_list["isCustomList"]:
|
||||
bulk_media, warning_message = process_status_list(
|
||||
bulk_media, warnings = process_status_list(
|
||||
bulk_media,
|
||||
status_list,
|
||||
media_type,
|
||||
user,
|
||||
warning_message,
|
||||
warnings,
|
||||
)
|
||||
|
||||
model = apps.get_model(app_label="app", model_name=media_type)
|
||||
@@ -129,15 +129,15 @@ def import_media(media_data, media_type, user):
|
||||
|
||||
logger.info("Imported %s %s", num_imported, media_type)
|
||||
|
||||
return num_imported, warning_message
|
||||
return num_imported, warnings
|
||||
|
||||
|
||||
def process_status_list(bulk_media, status_list, media_type, user, warning_message):
|
||||
def process_status_list(bulk_media, status_list, media_type, user, warnings):
|
||||
"""Process each status list."""
|
||||
for content in status_list["entries"]:
|
||||
if content["media"]["idMal"] is None:
|
||||
warning_message += (
|
||||
f"No matching MAL ID for {content['media']['title']['userPreferred']}\n"
|
||||
warnings.append(
|
||||
f"{content['media']['title']['userPreferred']}: No matching MAL ID.",
|
||||
)
|
||||
else:
|
||||
if content["status"] == "CURRENT":
|
||||
@@ -170,7 +170,7 @@ def process_status_list(bulk_media, status_list, media_type, user, warning_messa
|
||||
)
|
||||
bulk_media.append(instance)
|
||||
|
||||
return bulk_media, warning_message
|
||||
return bulk_media, warnings
|
||||
|
||||
|
||||
def get_date(date):
|
||||
|
||||
@@ -19,13 +19,13 @@ KITSU_PAGE_LIMIT = 500
|
||||
def import_by_user_id(kitsu_id, user):
|
||||
"""Import anime and manga ratings from Kitsu by user ID."""
|
||||
anime_response = get_media_response(kitsu_id, "anime")
|
||||
num_anime_imported, anime_warning = importer(anime_response, "anime", user)
|
||||
num_anime_imported, anime_warnings = importer(anime_response, "anime", user)
|
||||
|
||||
manga_response = get_media_response(kitsu_id, "manga")
|
||||
num_manga_imported, manga_warning = importer(manga_response, "manga", user)
|
||||
|
||||
warning_message = anime_warning + manga_warning
|
||||
return num_anime_imported, num_manga_imported, warning_message
|
||||
warning_messages = anime_warnings + manga_warning
|
||||
return num_anime_imported, num_manga_imported, "\n".join(warning_messages)
|
||||
|
||||
|
||||
def import_by_username(kitsu_username, user):
|
||||
@@ -94,8 +94,7 @@ def importer(response, media_type, user):
|
||||
}
|
||||
|
||||
bulk_data = []
|
||||
warning_message = ""
|
||||
num_imported = 0
|
||||
warnings = []
|
||||
|
||||
current_file_dir = Path(__file__).resolve().parent
|
||||
json_file_path = current_file_dir / "data" / "kitsu-mu-mapping.json"
|
||||
@@ -112,15 +111,18 @@ def importer(response, media_type, user):
|
||||
user,
|
||||
)
|
||||
except ValueError as e:
|
||||
warning_message += f"{e}\n"
|
||||
warnings.append(str(e))
|
||||
else:
|
||||
bulk_data.append(instance)
|
||||
num_imported += 1
|
||||
|
||||
num_before = model.objects.filter(user=user).count()
|
||||
helpers.bulk_chunk_import(bulk_data, model, user)
|
||||
num_after = model.objects.filter(user=user).count()
|
||||
num_imported = num_after - num_before
|
||||
|
||||
logger.info("Imported %s %s", num_imported, media_type)
|
||||
|
||||
return num_imported, warning_message
|
||||
return num_imported, warnings
|
||||
|
||||
|
||||
def process_entry( # noqa: PLR0913
|
||||
@@ -219,7 +221,7 @@ def create_or_get_item(media_type, kitsu_metadata, mapping_lookup, kitsu_mu_mapp
|
||||
|
||||
if not media_id:
|
||||
media_title = kitsu_metadata["attributes"]["canonicalTitle"]
|
||||
msg = f"Couldn't find a matching ID for {media_title}."
|
||||
msg = f"{media_title}: No valid external ID found."
|
||||
raise ValueError(msg)
|
||||
|
||||
image_url = get_image_url(kitsu_metadata)
|
||||
|
||||
@@ -11,15 +11,11 @@ logger = logging.getLogger(__name__)
|
||||
SIMKL_API_BASE_URL = "https://api.simkl.com"
|
||||
|
||||
|
||||
def get_token(domain, scheme, code):
|
||||
def get_token(request):
|
||||
"""View for getting the SIMKL OAuth2 token."""
|
||||
simkl_id = settings.SIMKL_ID
|
||||
simkl_secret = settings.SIMKL_SECRET
|
||||
|
||||
if not simkl_id or not simkl_secret:
|
||||
msg = "SIMKL_ID and SIMKL_SECRET not set."
|
||||
raise ValueError(msg)
|
||||
|
||||
domain = request.get_host()
|
||||
scheme = request.scheme
|
||||
code = request.GET["code"]
|
||||
url = f"{SIMKL_API_BASE_URL}/oauth/token"
|
||||
|
||||
headers = {
|
||||
@@ -27,28 +23,45 @@ def get_token(domain, scheme, code):
|
||||
}
|
||||
|
||||
params = {
|
||||
"client_id": simkl_id,
|
||||
"client_secret": simkl_secret,
|
||||
"client_id": settings.SIMKL_ID,
|
||||
"client_secret": settings.SIMKL_SECRET,
|
||||
"code": code,
|
||||
"grant_type": "authorization_code",
|
||||
"redirect_uri": f"{scheme}://{domain}",
|
||||
}
|
||||
|
||||
request = app.providers.services.api_request(
|
||||
"SIMKL",
|
||||
"POST",
|
||||
url,
|
||||
headers=headers,
|
||||
params=params,
|
||||
)
|
||||
try:
|
||||
request = app.providers.services.api_request(
|
||||
"SIMKL",
|
||||
"POST",
|
||||
url,
|
||||
headers=headers,
|
||||
params=params,
|
||||
)
|
||||
except requests.exceptions.HTTPError as error:
|
||||
if error.response.status_code == requests.codes.unauthorized:
|
||||
msg = "Invalid SIMKL secret key."
|
||||
raise ValueError(msg) from error
|
||||
raise
|
||||
|
||||
return request["access_token"]
|
||||
|
||||
|
||||
def importer(domain, scheme, code, user):
|
||||
def importer(token, user):
|
||||
"""Import tv shows, movies and anime from SIMKL."""
|
||||
token = get_token(domain, scheme, code)
|
||||
data = get_user_list(token)
|
||||
|
||||
tv_count, tv_warnings = process_tv_list(data["shows"], user)
|
||||
movie_count, movie_warnings = process_movie_list(data["movies"], user)
|
||||
anime_count, anime_warnings = process_anime_list(data["anime"], user)
|
||||
|
||||
warning_messages = tv_warnings + movie_warnings + anime_warnings
|
||||
|
||||
return tv_count, movie_count, anime_count, "\n".join(warning_messages)
|
||||
|
||||
|
||||
def get_user_list(token):
|
||||
"""Get the user's list from SIMKL."""
|
||||
url = f"{SIMKL_API_BASE_URL}/sync/all-items/"
|
||||
headers = {
|
||||
"Authorization": f"Bearer: {token}",
|
||||
@@ -59,7 +72,7 @@ def importer(domain, scheme, code, user):
|
||||
"episode_watched_at": "yes",
|
||||
}
|
||||
|
||||
data = app.providers.services.api_request(
|
||||
return app.providers.services.api_request(
|
||||
"SIMKL",
|
||||
"GET",
|
||||
url,
|
||||
@@ -67,23 +80,37 @@ def importer(domain, scheme, code, user):
|
||||
params=params,
|
||||
)
|
||||
|
||||
tv_count = process_tv_list(data["shows"], user)
|
||||
movie_count = process_movie_list(data["movies"], user)
|
||||
anime_count, anime_warnings = process_anime_list(data["anime"], user)
|
||||
|
||||
return tv_count, movie_count, anime_count, "\n".join(anime_warnings)
|
||||
|
||||
|
||||
def process_tv_list(tv_list, user):
|
||||
"""Process TV list from SIMKL and add to database."""
|
||||
logger.info("Processing tv shows")
|
||||
warnings = []
|
||||
tv_count = 0
|
||||
|
||||
for tv in tv_list:
|
||||
title = tv["show"]["title"]
|
||||
msg = f"Processing {title}"
|
||||
logger.debug(msg)
|
||||
tmdb_id = tv["show"]["ids"]["tmdb"]
|
||||
tv_status = get_status(tv["status"])
|
||||
|
||||
season_numbers = [season["number"] for season in tv["seasons"]]
|
||||
metadata = app.providers.tmdb.tv_with_seasons(tmdb_id, season_numbers)
|
||||
try:
|
||||
season_numbers = [season["number"] for season in tv["seasons"]]
|
||||
except KeyError:
|
||||
warnings.append(
|
||||
f"{title}: It doesn't have data on episodes viewed.",
|
||||
)
|
||||
continue
|
||||
|
||||
try:
|
||||
metadata = app.providers.tmdb.tv_with_seasons(tmdb_id, season_numbers)
|
||||
except requests.exceptions.HTTPError as error:
|
||||
if error.response.status_code == requests.codes.not_found:
|
||||
warnings.append(
|
||||
f"{title}: Couldn't fetch metadata from TMDB ({tmdb_id})",
|
||||
)
|
||||
continue
|
||||
raise
|
||||
|
||||
tv_item, _ = app.models.Item.objects.get_or_create(
|
||||
media_id=tmdb_id,
|
||||
@@ -138,14 +165,7 @@ def process_tv_list(tv_list, user):
|
||||
)
|
||||
|
||||
for episode in episodes:
|
||||
ep_img = None
|
||||
for episode_metadata in metadata[f"season/{season_number}"]["episodes"]:
|
||||
if episode_metadata["episode_number"] == episode["number"]:
|
||||
ep_img = episode_metadata["still_path"]
|
||||
break
|
||||
|
||||
if not ep_img:
|
||||
ep_img = settings.IMG_NONE
|
||||
ep_img = get_episode_image(episode, season_number, metadata)
|
||||
|
||||
episode_item, _ = app.models.Item.objects.get_or_create(
|
||||
media_id=tmdb_id,
|
||||
@@ -167,18 +187,41 @@ def process_tv_list(tv_list, user):
|
||||
},
|
||||
)
|
||||
logger.info("Finished processing tv shows")
|
||||
return tv_count
|
||||
return tv_count, warnings
|
||||
|
||||
|
||||
def get_episode_image(episode, season_number, metadata):
|
||||
"""Get the image for the episode."""
|
||||
for episode_metadata in metadata[f"season/{season_number}"]["episodes"]:
|
||||
if episode_metadata["episode_number"] == episode["number"]:
|
||||
return episode_metadata["still_path"]
|
||||
return settings.IMG_NONE
|
||||
|
||||
|
||||
def process_movie_list(movie_list, user):
|
||||
"""Process movie list from SIMKL and add to database."""
|
||||
logger.info("Processing movies")
|
||||
warnings = []
|
||||
movie_count = 0
|
||||
|
||||
for movie in movie_list:
|
||||
title = movie["movie"]["title"]
|
||||
|
||||
msg = f"Processing {title}"
|
||||
logger.debug(msg)
|
||||
|
||||
tmdb_id = movie["movie"]["ids"]["tmdb"]
|
||||
movie_status = get_status(movie["status"])
|
||||
|
||||
metadata = app.providers.tmdb.movie(tmdb_id)
|
||||
try:
|
||||
metadata = app.providers.tmdb.movie(tmdb_id)
|
||||
except requests.exceptions.HTTPError as error:
|
||||
if error.response.status_code == requests.codes.not_found:
|
||||
warnings.append(
|
||||
f"{title}: Couldn't fetch metadata from TMDB ({tmdb_id})",
|
||||
)
|
||||
continue
|
||||
raise
|
||||
|
||||
movie_item, _ = app.models.Item.objects.get_or_create(
|
||||
media_id=tmdb_id,
|
||||
@@ -206,7 +249,7 @@ def process_movie_list(movie_list, user):
|
||||
|
||||
logger.info("Finished processing movies")
|
||||
|
||||
return movie_count
|
||||
return movie_count, warnings
|
||||
|
||||
|
||||
def process_anime_list(anime_list, user):
|
||||
@@ -216,6 +259,10 @@ def process_anime_list(anime_list, user):
|
||||
anime_count = 0
|
||||
|
||||
for anime in anime_list:
|
||||
title = anime["show"]["title"]
|
||||
msg = f"Processing {title}"
|
||||
logger.debug(msg)
|
||||
|
||||
mal_id = anime["show"]["ids"]["mal"]
|
||||
anime_status = get_status(anime["status"])
|
||||
|
||||
@@ -224,9 +271,10 @@ def process_anime_list(anime_list, user):
|
||||
except requests.exceptions.HTTPError as error:
|
||||
if error.response.status_code == requests.codes.not_found:
|
||||
warnings.append(
|
||||
f"Anime: {anime['show']['title']} with MAL ID {mal_id} not found.",
|
||||
f"{title}: Couldn't fetch metadata from TMDB ({mal_id})",
|
||||
)
|
||||
continue
|
||||
raise
|
||||
|
||||
anime_item, _ = app.models.Item.objects.get_or_create(
|
||||
media_id=mal_id,
|
||||
|
||||
@@ -64,13 +64,13 @@ def importer(username, user):
|
||||
"ratings",
|
||||
)
|
||||
|
||||
msgs = shows_msg + movies_msg + watchlist_msg + ratings_msg
|
||||
warning_messages = shows_msg + movies_msg + watchlist_msg + ratings_msg
|
||||
return (
|
||||
shows_num,
|
||||
movies_num,
|
||||
watchlist_num,
|
||||
ratings_num,
|
||||
"\n".join(msgs),
|
||||
"\n".join(warning_messages),
|
||||
)
|
||||
|
||||
|
||||
@@ -482,13 +482,13 @@ def get_metadata(fetch_func, source, title, *args, **kwargs):
|
||||
return fetch_func(*args, **kwargs)
|
||||
except requests.exceptions.HTTPError as e:
|
||||
if e.response.status_code == requests.codes.not_found:
|
||||
logger.warning("%s ID %s not found for %s", source, args[0], title)
|
||||
msg = f"Couldn't find {source} metadata for {title} with ID {args[0]}"
|
||||
msg = f"{title}: Couldn't fetch metadata from {source} ({args[0]})"
|
||||
logger.warning(msg)
|
||||
raise ValueError(msg) from e
|
||||
raise # Re-raise other HTTP errors
|
||||
except KeyError as e:
|
||||
logger.warning("%s ID %s incomplete metadata for %s", source, args[0], title)
|
||||
msg = f"Incomplete {source} metadata for {title} with ID {args[0]}"
|
||||
msg = f"{title}: Couldn't parse incomplete metadata from {source} ({args[0]})"
|
||||
logger.warning(msg)
|
||||
raise ValueError(msg) from e
|
||||
|
||||
|
||||
|
||||
@@ -14,7 +14,7 @@ def import_trakt(username, user):
|
||||
num_movie_imported,
|
||||
num_watchlist_imported,
|
||||
num_ratings_imported,
|
||||
msg,
|
||||
warning_message,
|
||||
) = trakt.importer(username, user)
|
||||
info_message = (
|
||||
f"Imported {num_tv_imported} TV shows, "
|
||||
@@ -22,16 +22,19 @@ def import_trakt(username, user):
|
||||
f"{num_watchlist_imported} watchlist items, "
|
||||
f"and {num_ratings_imported} ratings."
|
||||
)
|
||||
if msg:
|
||||
return f"{info_message} {ERROR_TITLE} {msg}"
|
||||
if warning_message:
|
||||
return f"{info_message} {ERROR_TITLE} {warning_message}"
|
||||
return info_message
|
||||
|
||||
|
||||
@shared_task(name="Import from SIMKL")
|
||||
def import_simkl(domain, scheme, code, user):
|
||||
def import_simkl(token, user):
|
||||
"""Celery task for importing anime and manga data from SIMKL."""
|
||||
num_tv_imported, num_movie_imported, num_anime_imported, msg = simkl.importer(
|
||||
domain, scheme, code, user,
|
||||
num_tv_imported, num_movie_imported, num_anime_imported, warning_message = (
|
||||
simkl.importer(
|
||||
token,
|
||||
user,
|
||||
)
|
||||
)
|
||||
|
||||
info_message = (
|
||||
@@ -39,8 +42,8 @@ def import_simkl(domain, scheme, code, user):
|
||||
f"{num_movie_imported} movies, "
|
||||
f"and {num_anime_imported} anime."
|
||||
)
|
||||
if msg:
|
||||
return f"{info_message} {ERROR_TITLE} {msg}"
|
||||
if warning_message:
|
||||
return f"{info_message} {ERROR_TITLE} {warning_message}"
|
||||
return info_message
|
||||
|
||||
|
||||
|
||||
@@ -9,7 +9,7 @@ from django.contrib.auth import get_user_model
|
||||
from django.test import TestCase
|
||||
|
||||
from app.models import TV, Anime, Episode, Item, Manga, Movie, Season
|
||||
from integrations.imports import anilist, kitsu, mal, tmdb, trakt, yamtrack
|
||||
from integrations.imports import anilist, kitsu, mal, simkl, tmdb, trakt, yamtrack
|
||||
|
||||
mock_path = Path(__file__).resolve().parent / "mock_data"
|
||||
app_mock_path = (
|
||||
@@ -351,3 +351,101 @@ class ImportTrakt(TestCase):
|
||||
"""Test getting date from Trakt."""
|
||||
self.assertEqual(trakt.get_date("2023-01-01T00:00:00.000Z"), date(2023, 1, 1))
|
||||
self.assertIsNone(trakt.get_date(None))
|
||||
|
||||
|
||||
class ImportSimkl(TestCase):
|
||||
"""Test importing media from SIMKL."""
|
||||
|
||||
def setUp(self):
|
||||
"""Create user for the tests."""
|
||||
credentials = {"username": "test", "password": "12345"}
|
||||
self.user = get_user_model().objects.create_user(**credentials)
|
||||
|
||||
@patch("integrations.imports.simkl.get_user_list")
|
||||
def test_importer(
|
||||
self,
|
||||
user_list,
|
||||
):
|
||||
"""Test importing media from SIMKL."""
|
||||
# Mock API response
|
||||
user_list.return_value = {
|
||||
"shows": [
|
||||
{
|
||||
"show": {"title": "Breaking Bad", "ids": {"tmdb": 1396}},
|
||||
"status": "watching",
|
||||
"user_rating": 8,
|
||||
"seasons": [
|
||||
{
|
||||
"number": 1,
|
||||
"episodes": [
|
||||
{"number": 1, "watched_at": "2023-01-01T00:00:00Z"},
|
||||
{"number": 2, "watched_at": "2023-01-02T00:00:00Z"},
|
||||
],
|
||||
},
|
||||
],
|
||||
},
|
||||
],
|
||||
"movies": [
|
||||
{
|
||||
"movie": {"title": "Perfect Blue", "ids": {"tmdb": 10494}},
|
||||
"status": "completed",
|
||||
"user_rating": 9,
|
||||
"last_watched_at": "2023-02-01T00:00:00Z",
|
||||
},
|
||||
],
|
||||
"anime": [
|
||||
{
|
||||
"show": {"title": "Example Anime", "ids": {"mal": 1}},
|
||||
"status": "plantowatch",
|
||||
"user_rating": 7,
|
||||
"watched_episodes_count": 0,
|
||||
"last_watched_at": None,
|
||||
},
|
||||
],
|
||||
}
|
||||
|
||||
tv_count, movie_count, anime_count, warnings = simkl.importer(
|
||||
"token",
|
||||
self.user,
|
||||
)
|
||||
|
||||
# Check the results
|
||||
self.assertEqual(tv_count, 1)
|
||||
self.assertEqual(movie_count, 1)
|
||||
self.assertEqual(anime_count, 1)
|
||||
self.assertEqual(warnings, "")
|
||||
|
||||
# Check TV show
|
||||
tv_item = Item.objects.get(media_type="tv")
|
||||
self.assertEqual(tv_item.title, "Breaking Bad")
|
||||
tv_obj = TV.objects.get(item=tv_item)
|
||||
self.assertEqual(tv_obj.status, "In progress")
|
||||
self.assertEqual(tv_obj.score, 8)
|
||||
|
||||
# Check Movie
|
||||
movie_item = Item.objects.get(media_type="movie")
|
||||
self.assertEqual(movie_item.title, "Perfect Blue")
|
||||
movie_obj = Movie.objects.get(item=movie_item)
|
||||
self.assertEqual(movie_obj.status, "Completed")
|
||||
self.assertEqual(movie_obj.score, 9)
|
||||
|
||||
# Check Anime
|
||||
anime_item = Item.objects.get(media_type="anime")
|
||||
self.assertEqual(anime_item.title, "Cowboy Bebop")
|
||||
anime_obj = Anime.objects.get(item=anime_item)
|
||||
self.assertEqual(anime_obj.status, "Planning")
|
||||
self.assertEqual(anime_obj.score, 7)
|
||||
|
||||
def test_get_status(self):
|
||||
"""Test mapping SIMKL status to internal status."""
|
||||
self.assertEqual(simkl.get_status("completed"), "Completed")
|
||||
self.assertEqual(simkl.get_status("watching"), "In progress")
|
||||
self.assertEqual(simkl.get_status("plantowatch"), "Planning")
|
||||
self.assertEqual(simkl.get_status("hold"), "Paused")
|
||||
self.assertEqual(simkl.get_status("dropped"), "Dropped")
|
||||
self.assertEqual(simkl.get_status("unknown"), "In progress") # Default case
|
||||
|
||||
def test_get_date(self):
|
||||
"""Test getting date from SIMKL."""
|
||||
self.assertEqual(simkl.get_date("2023-01-01T00:00:00Z"), date(2023, 1, 1))
|
||||
self.assertIsNone(simkl.get_date(None))
|
||||
|
||||
@@ -10,6 +10,7 @@ from django.shortcuts import redirect
|
||||
from django.views.decorators.http import require_GET, require_POST
|
||||
|
||||
from integrations import exports, tasks
|
||||
from integrations.imports import simkl
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -38,11 +39,8 @@ def simkl_oauth(request):
|
||||
@require_GET
|
||||
def import_simkl(request):
|
||||
"""View for getting the SIMKL OAuth2 token."""
|
||||
domain = request.get_host()
|
||||
scheme = request.scheme
|
||||
code = request.GET["code"]
|
||||
user = request.user
|
||||
tasks.import_simkl.delay(domain, scheme, code, user)
|
||||
token = simkl.get_token(request)
|
||||
tasks.import_simkl.delay(token, request.user)
|
||||
messages.success(request, "SIMKL import task queued.")
|
||||
return redirect("profile")
|
||||
|
||||
|
||||
@@ -12,7 +12,8 @@ stderr_logfile=/dev/stderr
|
||||
stderr_logfile_maxbytes=0
|
||||
|
||||
[program:celery]
|
||||
command=celery --app config worker --loglevel INFO --without-mingle --without-gossip
|
||||
command=bash -c 'if [ "$DEBUG" = "True" ]; then LOGLEVEL=DEBUG; else LOGLEVEL=INFO; fi; celery --app config worker --loglevel $LOGLEVEL --without-mingle --without-gossip'
|
||||
environment=DEBUG=%(ENV_DEBUG)s
|
||||
user=abc
|
||||
stopasgroup=true
|
||||
stopwaitsecs=60
|
||||
@@ -23,11 +24,12 @@ stderr_logfile=/dev/stderr
|
||||
stderr_logfile_maxbytes=0
|
||||
|
||||
[program:celery-beat]
|
||||
command=celery --app config beat -s ./db/celerybeat-schedule --loglevel INFO
|
||||
command=bash -c 'if [ "$DEBUG" = "True" ]; then LOGLEVEL=DEBUG; else LOGLEVEL=INFO; fi; celery --app config beat -s ./db/celerybeat-schedule --loglevel $LOGLEVEL'
|
||||
environment=DEBUG=%(ENV_DEBUG)s
|
||||
user=abc
|
||||
stopasgroup=true
|
||||
priority=10
|
||||
stdout_logfile=/dev/stdout
|
||||
stdout_logfile_maxbytes=0
|
||||
stderr_logfile=/dev/stderr
|
||||
stderr_logfile_maxbytes=0
|
||||
stderr_logfile_maxbytes=0
|
||||
Reference in New Issue
Block a user