Replace SQL TruncDate with Python day-binning to avoid the SQLite “string or blob too big” error

This commit is contained in:
FuzzyGrim
2025-07-12 14:34:35 +02:00
parent 0d938c087d
commit b3437d6812
2 changed files with 54 additions and 118 deletions

View File

@@ -9,11 +9,9 @@ from dateutil.relativedelta import relativedelta
from django.apps import apps
from django.db import models
from django.db.models import (
Count,
Prefetch,
Q,
)
from django.db.models.functions import TruncDate
from django.utils import timezone
from app import media_type_config
@@ -514,36 +512,33 @@ def get_level(count):
def get_filtered_historical_data(start_date, end_date, user):
    """Return per-day counts of a user's history records.

    Every historical model reported by
    ``BasicMedia.objects.get_historical_models()`` is queried for rows whose
    ``history_user_id`` matches ``user``. The rows' ``history_date`` values
    are converted to the current Django timezone and counted per calendar
    day. Binning happens in Python rather than via a SQL ``TruncDate``
    aggregation, which is what triggered SQLite's "string or blob too big"
    error.

    Args:
        start_date: Inclusive lower bound applied to ``history_date``.
            Skipped when falsy.
        end_date: Inclusive upper bound applied to ``history_date``.
            Skipped when falsy.
        user: Value matched against ``history_user_id``.

    Returns:
        A list of ``{"date": datetime.date, "count": int}`` dicts, one per
        local day that has at least one record, merged across all
        historical models. Entries follow first-seen order, not date order.
    """
    local_tz = timezone.get_current_timezone()
    day_buckets = defaultdict(int)

    for model_name in BasicMedia.objects.get_historical_models():
        model = apps.get_model("app", model_name)
        qs = model.objects.filter(history_user_id=user)

        # NOTE(review): the bounds are compared against the full timestamp
        # (not its date part), so an end_date at midnight leaves out the rest
        # of that day - confirm callers pass an end-of-day value.
        if start_date:
            qs = qs.filter(history_date__gte=start_date)
        if end_date:
            qs = qs.filter(history_date__lte=end_date)

        # Only the timestamp column is needed; stream it in chunks so memory
        # use stays flat regardless of how much history the user has.
        timestamps = qs.values_list("history_date", flat=True)
        for ts in timestamps.iterator(chunk_size=2_000):
            local_day = timezone.localtime(ts, local_tz).date()
            day_buckets[local_day] += 1

    combined_data = [
        {"date": day, "count": count} for day, count in day_buckets.items()
    ]

    logger.info("%s - built historical data (%s rows)", user, len(combined_data))
    return combined_data

View File

@@ -2,7 +2,6 @@ import datetime
from unittest.mock import MagicMock, patch
from django.contrib.auth import get_user_model
from django.db.models import Count
from django.test import TestCase
from app import statistics
@@ -819,109 +818,51 @@ class StatisticsTests(TestCase):
@patch("app.statistics.BasicMedia.objects.get_historical_models")
@patch("app.statistics.apps.get_model")
def test_get_filtered_historical_data(self, mock_get_model, mock_get_hist_models):
    """Test the get_filtered_historical_data function.

    Two fake historical models stream timestamps; the function must bin
    them per day across both models and apply the user and date-range
    filters to each model's queryset.
    """
    # Local import: the function under test reads the active timezone, so
    # pin it to keep the expected day buckets independent of TIME_ZONE.
    from django.utils import timezone

    start = datetime.datetime(2025, 1, 1, tzinfo=datetime.UTC)
    end = datetime.datetime(2025, 3, 31, tzinfo=datetime.UTC)

    mock_get_hist_models.return_value = ["historicalmodel1", "historicalmodel2"]

    def build_fake_model(timestamps):
        """Return a mock model whose queryset streams ``timestamps``."""
        qs = MagicMock()
        # Every chained filter() returns the same mock so calls are recorded
        # in one place regardless of how many filters are applied.
        qs.filter.return_value = qs
        qs.values_list.return_value.iterator.return_value = timestamps
        model = MagicMock()
        model.objects = qs
        return model

    model1_ts = [
        datetime.datetime(2025, 1, 5, 12, tzinfo=datetime.UTC),
        datetime.datetime(2025, 1, 5, 18, tzinfo=datetime.UTC),
        datetime.datetime(2025, 1, 10, 9, tzinfo=datetime.UTC),
        datetime.datetime(2025, 1, 10, 10, tzinfo=datetime.UTC),
        datetime.datetime(2025, 1, 10, 11, tzinfo=datetime.UTC),
    ]
    model2_ts = [
        datetime.datetime(2025, 2, 15, 8, tzinfo=datetime.UTC),
        datetime.datetime(2025, 3, 20, 17, tzinfo=datetime.UTC),
        datetime.datetime(2025, 3, 20, 18, tzinfo=datetime.UTC),
        datetime.datetime(2025, 3, 20, 19, tzinfo=datetime.UTC),
        datetime.datetime(2025, 3, 20, 20, tzinfo=datetime.UTC),
    ]

    fake_model1 = build_fake_model(model1_ts)
    fake_model2 = build_fake_model(model2_ts)
    mock_get_model.side_effect = lambda _, name: (
        fake_model1 if name == "historicalmodel1" else fake_model2
    )

    with timezone.override(datetime.UTC):
        result = statistics.get_filtered_historical_data(start, end, self.user)

    expected = [
        {"date": datetime.date(2025, 1, 5), "count": 2},
        {"date": datetime.date(2025, 1, 10), "count": 3},
        {"date": datetime.date(2025, 2, 15), "count": 1},
        {"date": datetime.date(2025, 3, 20), "count": 4},
    ]
    self.assertCountEqual(result, expected)

    # The user and date-range filters must reach every historical model,
    # and only the timestamp column should be fetched.
    for fake_model in (fake_model1, fake_model2):
        manager = fake_model.objects
        manager.filter.assert_any_call(history_user_id=self.user)
        manager.filter.assert_any_call(history_date__gte=start)
        manager.filter.assert_any_call(history_date__lte=end)
        manager.values_list.assert_called_once_with("history_date", flat=True)
def test_calculate_day_of_week_stats(self):
"""Test the calculate_day_of_week_stats function."""