# metube/app/state_store.py (Python, 157 lines, 5.1 KiB)

from __future__ import annotations
import base64
import collections.abc
import json
import logging
import os
import shelve
import tempfile
import time
from datetime import datetime
from typing import Any, Optional
# Default ``schema_version`` stamped into payloads written by AtomicJsonStore.
STATE_SCHEMA_VERSION: int = 2

# Keys of the single-entry wrapper dicts that to_json_compatible() emits for
# values JSON has no native form for (bytes and datetime respectively).
_BYTES_MARKER: str = "__metube_bytes__"
_DATETIME_MARKER: str = "__metube_datetime__"

# Module-wide logger; the name is a fixed string rather than ``__name__``.
log = logging.getLogger("state_store")
def to_json_compatible(value: Any) -> Any:
    """Recursively convert *value* into a structure ``json.dump`` accepts.

    Conversion rules, checked in this order:

    * ``None``, ``bool``, ``int``, ``float`` and ``str`` are returned as-is.
    * ``bytes`` becomes ``{_BYTES_MARKER: <base64 text>}``.
    * ``datetime`` becomes ``{_DATETIME_MARKER: <isoformat text>}``.
    * Any mapping becomes a ``dict`` whose keys are passed through ``str()``
      and whose values are converted recursively.
    * Any other iterable (list, tuple, set, frozenset, generator, ...)
      becomes a ``list`` of recursively converted items.

    Raises:
        TypeError: if *value* matches none of the rules above.
    """
    passthrough_types = (bool, int, float, str)
    if value is None or isinstance(value, passthrough_types):
        return value

    if isinstance(value, bytes):
        encoded = base64.b64encode(value)
        return {_BYTES_MARKER: encoded.decode("ascii")}

    if isinstance(value, datetime):
        return {_DATETIME_MARKER: value.isoformat()}

    if isinstance(value, collections.abc.Mapping):
        converted = {}
        for key, item in value.items():
            name = str(key)
            converted[name] = to_json_compatible(item)
        return converted

    # list, tuple, set and frozenset are iterables themselves, so a single
    # Iterable check covers them together with every other iterable type.
    if isinstance(value, collections.abc.Iterable):
        return [to_json_compatible(item) for item in value]

    raise TypeError(f"Value of type {type(value).__name__} is not JSON serializable")
def from_json_compatible(value: Any) -> Any:
    """Undo ``to_json_compatible`` on a structure loaded from JSON.

    Lists and dicts are walked recursively. A dict whose *only* key is
    ``_BYTES_MARKER`` is decoded from base64 back to ``bytes``; a dict whose
    only key is ``_DATETIME_MARKER`` is parsed with
    ``datetime.fromisoformat``. Every other value is returned unchanged.
    """
    if isinstance(value, dict):
        key_set = set(value.keys())
        if key_set == {_BYTES_MARKER}:
            encoded = value[_BYTES_MARKER].encode("ascii")
            return base64.b64decode(encoded)
        if key_set == {_DATETIME_MARKER}:
            return datetime.fromisoformat(value[_DATETIME_MARKER])
        return {key: from_json_compatible(item) for key, item in value.items()}

    if isinstance(value, list):
        return [from_json_compatible(item) for item in value]

    return value
def read_legacy_shelf(path: str) -> Optional[list[tuple[Any, Any]]]:
    """Return all ``(key, value)`` pairs stored in a legacy ``shelve`` file.

    Args:
        path: Base filename that was originally passed to ``shelve.open``.

    Returns:
        A list of the shelf's items, or ``None`` when no shelf file is found
        or the shelf cannot be opened/read (the failure is logged as a
        warning rather than raised).

    Note:
        ``shelve`` unpickles stored values, so this should only be pointed
        at files the application wrote itself.
    """
    # Depending on the dbm backend, shelve stores the database either at
    # ``path`` itself or under a suffixed name (ndbm: ``.db`` or
    # ``.pag``/``.dir``; dbm.dumb: ``.dat``/``.dir``). Checking only the bare
    # path would report such a shelf as missing.
    base = os.fsdecode(path)
    shelf_suffixes = ("", ".db", ".dat", ".dir", ".pag")
    if not any(os.path.exists(base + suffix) for suffix in shelf_suffixes):
        return None
    try:
        with shelve.open(path, "r") as shelf:
            return list(shelf.items())
    except Exception as exc:
        # Deliberately broad: backend errors and unpickling failures all mean
        # "the legacy data is unusable", which callers see as None.
        log.warning("Could not read legacy shelf at %s: %s", path, exc)
        return None
class AtomicJsonStore:
    """Store a single JSON object at ``path``, replacing it via a temp file.

    Each saved payload carries ``schema_version`` and ``kind`` keys.
    ``load`` only accepts files that parse as a JSON object whose ``kind``
    equals this store's ``kind``; anything else is moved aside with
    ``quarantine_invalid_file``.
    """

    def __init__(self, path: str, *, kind: str, schema_version: int = STATE_SCHEMA_VERSION):
        """Remember where and how to persist state; performs no I/O.

        Args:
            path: Location of the JSON state file.
            kind: Tag written into every payload and required on load.
            schema_version: Version number written into every payload.
        """
        self.path = path
        self.kind = kind
        self.schema_version = schema_version

    def _ensure_parent(self) -> None:
        """Create the directory containing ``self.path`` if it is missing."""
        parent = os.path.dirname(self.path)
        # An empty dirname means the file lives in the current directory.
        if parent and not os.path.isdir(parent):
            os.makedirs(parent, exist_ok=True)

    def _build_payload(self, data: dict[str, Any]) -> dict[str, Any]:
        """Return *data* merged on top of the ``schema_version``/``kind`` header.

        Keys in *data* are applied last, so a ``schema_version`` or ``kind``
        entry in *data* replaces the store's own value.
        """
        payload = {
            "schema_version": self.schema_version,
            "kind": self.kind,
        }
        payload.update(data)
        return payload

    def load(self) -> Optional[dict[str, Any]]:
        """Read and validate the state file.

        Returns:
            The decoded payload dict, or ``None`` when the file does not
            exist or could not be read/validated. In the latter case the
            file is handed to ``quarantine_invalid_file`` first.

        ``schema_version`` is not checked here; only the top-level type and
        ``kind`` are validated.
        """
        if not os.path.exists(self.path):
            return None
        try:
            with open(self.path, encoding="utf-8") as f:
                payload = json.load(f)
            if not isinstance(payload, dict):
                raise ValueError("State file must contain a JSON object")
            if payload.get("kind") != self.kind:
                raise ValueError(
                    f"State file kind mismatch: expected {self.kind}, got {payload.get('kind')}"
                )
            return payload
        except Exception as exc:
            # Any failure (I/O, decoding, validation) is treated as an
            # unusable file: move it aside and report "no state".
            self.quarantine_invalid_file(exc)
            return None

    def save(self, data: dict[str, Any]) -> None:
        """Write *data* (plus header keys) to ``self.path`` via a temp file.

        The payload is written to a temporary file in the target directory,
        flushed and fsynced, then moved over ``self.path`` with
        ``os.replace``; finally the directory itself is fsynced on a
        best-effort basis.

        Raises:
            Whatever the write raises (for example ``TypeError`` from
            ``json.dump`` or ``OSError`` from the filesystem). The temporary
            file is removed before the error propagates.
        """
        self._ensure_parent()
        payload = self._build_payload(data)
        parent = os.path.dirname(self.path) or "."
        # Same directory as the target so os.replace never crosses filesystems.
        fd, tmp_path = tempfile.mkstemp(
            prefix=f".{os.path.basename(self.path)}.",
            suffix=".tmp",
            dir=parent,
            text=True,
        )
        try:
            with os.fdopen(fd, "w", encoding="utf-8") as f:
                json.dump(payload, f, ensure_ascii=False, separators=(",", ":"))
                f.write("\n")
                f.flush()
                os.fsync(f.fileno())
            os.replace(tmp_path, self.path)
            self._fsync_directory(parent)
        except BaseException:
            # BaseException rather than Exception: an interrupt such as
            # KeyboardInterrupt must not leave a stray temp file next to the
            # state file. The original error is always re-raised.
            try:
                os.remove(tmp_path)
            except OSError:
                # Already moved into place or never fully created.
                pass
            raise

    def quarantine_invalid_file(self, exc: Exception) -> None:
        """Rename the state file to ``<path>.invalid.<timestamp>`` and log why.

        Args:
            exc: The error that made the file unusable; only used in the log
                message.

        Does nothing when the file no longer exists. A failed rename is
        logged and otherwise ignored.
        """
        if not os.path.exists(self.path):
            return
        # NOTE(review): the timestamp has one-second resolution and
        # os.replace overwrites, so two quarantines within the same second
        # keep only the later file.
        ts = time.strftime("%Y%m%d%H%M%S")
        backup_path = f"{self.path}.invalid.{ts}"
        try:
            os.replace(self.path, backup_path)
            log.warning(
                "State file at %s was invalid (%s); moved it to %s",
                self.path,
                exc,
                backup_path,
            )
        except OSError as move_exc:
            log.warning(
                "State file at %s was invalid (%s) and could not be moved aside: %s",
                self.path,
                exc,
                move_exc,
            )

    @staticmethod
    def _fsync_directory(path: str) -> None:
        """Best-effort fsync of the directory at *path*.

        Failure to open or sync the directory is silently ignored.
        """
        try:
            flags = os.O_RDONLY
            # O_DIRECTORY is not defined on every platform.
            if hasattr(os, "O_DIRECTORY"):
                flags |= os.O_DIRECTORY
            fd = os.open(path, flags)
        except OSError:
            return
        try:
            os.fsync(fd)
        except OSError:
            pass
        finally:
            os.close(fd)