Files
romm/backend/utils/validation.py
Georges-Antoine Assi 30451d5651 fix(security): move SSRF defense into the HTTP client path
The previous validator did a preflight `socket.getaddrinfo` before each
httpx request. Two problems:

  * DNS rebinding / TOCTOU: httpx re-resolves at connect time, so a
    hostname can answer with a public IP for the validator and a
    private IP for the real request. The preflight check did not
    constrain the connection.
  * Event-loop blocking: `socket.getaddrinfo` is synchronous, and the
    media-download callers are async. Slow resolvers stalled
    unrelated requests.

Replace it with two layers, both wired automatically onto every httpx
client built by `utils.context`:

  1. A request event hook running `validate_url_for_http_request`
     (syntactic checks only: scheme, reserved hostnames, literal IPs,
     internal TLDs). No DNS, no call-site responsibility.
  2. `SSRFProtectedAsyncBackend` / `SSRFProtectedSyncBackend`, custom
     httpcore network backends that resolve the hostname inside
     `connect_tcp`, reject any address in a forbidden range, then
     connect to that *same* validated address. The async variant uses
     `loop.getaddrinfo` so it doesn't block the loop. httpcore calls
     `start_tls(server_hostname=<URL host>)` after `connect_tcp`, so
     TLS SNI and cert verification still use the original hostname
     even though the TCP layer connects by IP.

Drop the explicit `validate_url_for_http_request(...)` calls from
`resources_handler.py` — the event hook covers them. Consolidate the
URL validator and its tests under `utils/ssrf.py` /
`tests/utils/test_ssrf.py` so the SSRF surface lives in one module.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-27 17:58:14 -04:00

118 lines
3.3 KiB
Python

import re
from logger.logger import log
from models.user import TEXT_FIELD_LENGTH
class ValidationError(Exception):
    """Raised when a user-supplied value fails validation.

    Attributes:
        message (str): Human-readable description of the failure; also used
            as the exception's string representation.
        field_name (str): Name of the field that failed validation.
    """

    def __init__(self, message: str, field_name: str = "field") -> None:
        # Initialise the base Exception with the message so str(exc) yields it.
        super().__init__(message)
        self.field_name = field_name
        self.message = message
# Pre-compiled regex patterns for better performance.
# Both patterns end with \Z (absolute end of string) rather than $: in Python,
# `$` also matches just before a trailing newline, so a value such as
# "name\n" would otherwise be accepted by a .match() check.
USERNAME_PATTERN = re.compile(r"^[a-zA-Z0-9_-]+\Z")
EMAIL_PATTERN = re.compile(r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}\Z")
def validate_ascii_only(value: str, field_name: str = "field") -> None:
    """Ensure that a string is made up exclusively of ASCII characters.

    Empty (falsy) values are accepted without further checks.

    Args:
        value (str): The value to validate
        field_name (str): The name of the field for error messages

    Raises:
        ValidationError: If the value contains non-ASCII characters
    """
    # Nothing to report when the value is empty or every code point is <= 127.
    if not value or value.isascii():
        return

    error_text = f"{field_name} must contain only ASCII characters"
    log.error(f"Validation failed: {error_text}")
    raise ValidationError(error_text, field_name)
def validate_username(username: str) -> None:
    """Validate username format and content.

    Checks are applied in order: non-empty (after stripping whitespace),
    ASCII-only, minimum length of 3, maximum length of TEXT_FIELD_LENGTH,
    and finally the allowed-character pattern. Every failure is logged
    before the exception is raised.

    Args:
        username (str): The username to validate

    Raises:
        ValidationError: If the username is invalid
    """
    if not username or not username.strip():
        msg = "Username cannot be empty"
        log.error(msg)
        raise ValidationError(msg, "Username")

    validate_ascii_only(username, "Username")

    if len(username) < 3:
        msg = "Username must be at least 3 characters long"
        log.error(msg)
        raise ValidationError(msg, "Username")

    if len(username) > TEXT_FIELD_LENGTH:
        msg = "Username must be no more than 255 characters long"
        log.error(msg)
        raise ValidationError(msg, "Username")

    # fullmatch() requires the pattern to consume the entire string, so a
    # trailing newline cannot slip past an end-of-line anchor the way it can
    # with match().
    if not USERNAME_PATTERN.fullmatch(username):
        msg = "Username can only contain letters, numbers, underscores, and hyphens"
        log.error(f"Validation failed: {msg} for username: {username}")
        raise ValidationError(msg, "Username")
def validate_password(password: str) -> None:
    """Check that a password is present, ASCII-only and of acceptable length.

    The checks run in a fixed order: non-empty (after stripping whitespace),
    ASCII-only, at least 6 characters, and at most TEXT_FIELD_LENGTH
    characters. A failing check is logged and then raised.

    Args:
        password (str): The password to validate

    Raises:
        ValidationError: If the password is invalid
    """
    failure = None

    if not (password and password.strip()):
        failure = "Password cannot be empty"
    else:
        # Raises on its own if a non-ASCII character is present.
        validate_ascii_only(password, "Password")

        size = len(password)
        if size < 6:
            failure = "Password must be at least 6 characters long"
        elif size > TEXT_FIELD_LENGTH:
            failure = "Password must be no more than 255 characters long"

    if failure is not None:
        log.error(failure)
        raise ValidationError(failure, "Password")
def validate_email(email: str) -> None:
    """Validate email format and content.

    An empty (falsy) email is accepted without any checks. Otherwise the
    value must be ASCII-only and match EMAIL_PATTERN in its entirety.

    Args:
        email (str): The email to validate

    Raises:
        ValidationError: If the email is invalid
    """
    if not email:
        return

    validate_ascii_only(email, "Email")

    # fullmatch() requires the pattern to consume the entire string, so a
    # trailing newline cannot slip past an end-of-line anchor the way it can
    # with match().
    if not EMAIL_PATTERN.fullmatch(email):
        msg = "Invalid email format"
        log.error(f"Validation failed: {msg} for email: {email}")
        raise ValidationError(msg, "Email")