feature/repair-persistent-queues

This commit is contained in:
AlvinRamoutar
2025-12-31 04:25:51 -05:00
parent 179452b4f4
commit d799a4a8eb
3 changed files with 72 additions and 6 deletions

View File

@@ -16,7 +16,7 @@ COPY pyproject.toml uv.lock docker-entrypoint.sh ./
# Install dependencies
RUN sed -i 's/\r$//g' docker-entrypoint.sh && \
chmod +x docker-entrypoint.sh && \
apk add --update ffmpeg aria2 coreutils shadow su-exec curl tini deno && \
apk add --update ffmpeg aria2 coreutils shadow su-exec curl tini deno gdbm-tools sqlite && \
apk add --update --virtual .build-deps gcc g++ musl-dev uv && \
UV_PROJECT_ENVIRONMENT=/usr/local uv sync --frozen --no-dev --compile-bytecode && \
apk del .build-deps && \

View File

@@ -270,8 +270,9 @@ MeTube development relies on code contributions by the community. The program as
Make sure you have Node.js 22+ and Python 3.13 installed.
```bash
cd metube/ui
# install Angular and build the UI
cd ui
curl -fsSL https://get.pnpm.io/install.sh | sh -
pnpm install
pnpm run build
# install python dependencies

View File

@@ -8,6 +8,8 @@ import multiprocessing
import logging
import re
import types
import dbm
import subprocess
import yt_dlp.networking.impersonate
from dl_formats import get_format, get_opts, AUDIO_FORMATS
@@ -193,13 +195,16 @@ class Download:
await self.notifier.updated(self.info)
class PersistentQueue:
def __init__(self, path):
def __init__(self, name, path):
self.identifier = name
pdir = os.path.dirname(path)
if not os.path.isdir(pdir):
os.mkdir(pdir)
with shelve.open(path, 'c'):
pass
self.path = path
self.repair()
self.dict = OrderedDict()
def load(self):
@@ -238,13 +243,73 @@ class PersistentQueue:
def empty(self):
return not bool(self.dict)
def repair(self):
# check DB format
type_check = subprocess.run(
["file", self.path],
capture_output=True,
text=True
)
db_type = type_check.stdout.lower()
if "gnu dbm" in db_type:
# perform gdbm repair
log_prefix = f"PersistentQueue:{self.identifier} repair (dbm/file)"
log.debug(f"{log_prefix} started")
try:
result = subprocess.run(
["gdbmtool", self.path],
input="recover verbose summary\n",
text=True,
capture_output=True,
cwd=os.getcwd()
)
log.debug(f"{log_prefix}{result.stdout}")
if result.stderr:
log.debug(f"{log_prefix} failed: {result.stderr}")
except FileNotFoundError:
log.debug(f"{log_prefix} failed: 'gdbmtool' was not found")
# perform null key cleanup
log_prefix = f"PersistentQueue:{self.identifier} repair (null keys)"
log.debug(f"{log_prefix} started")
deleted = 0
try:
with dbm.open((self.path), "w") as db:
for key in list(db.keys()):
if key and all(b == 0x00 for b in key):
log.debug(f"{log_prefix} deleting key of length {len(key)} (all NUL bytes)")
del db[key]
deleted += 1
log.debug(f"{log_prefix} done - deleted {deleted} key(s)")
except dbm.error:
log.debug(f"{log_prefix} failed: db type is dbm.gnu, but the module is not available")
elif "sqlite" in db_type:
# perform sqlite3 recovery
log_prefix = f"PersistentQueue:{self.identifier} repair (sqlite3/file)"
log.debug(f"{log_prefix} started")
try:
result = subprocess.run(
f"sqlite3 {self.path} '.recover' | sqlite3 {self.path}",
capture_output=True,
text=True,
shell=True
)
if result.stderr:
log.debug(f"{log_prefix} failed: {result.stderr}")
else:
log.debug(f"{log_prefix}{result.stdout or " was successful, no output"}")
except FileNotFoundError:
log.debug(f"{log_prefix} failed: 'sqlite3' was not found")
class DownloadQueue:
def __init__(self, config, notifier):
self.config = config
self.notifier = notifier
self.queue = PersistentQueue(self.config.STATE_DIR + '/queue')
self.done = PersistentQueue(self.config.STATE_DIR + '/completed')
self.pending = PersistentQueue(self.config.STATE_DIR + '/pending')
self.queue = PersistentQueue("queue", self.config.STATE_DIR + '/queue')
self.done = PersistentQueue("completed", self.config.STATE_DIR + '/completed')
self.pending = PersistentQueue("pending", self.config.STATE_DIR + '/pending')
self.active_downloads = set()
self.semaphore = None
# For sequential mode, use an asyncio lock to ensure one-at-a-time execution.