mirror of
https://github.com/rommapp/romm.git
synced 2026-06-27 22:35:57 +00:00
fix: handle malformed authorization header in hybrid auth backend
Co-authored-by: zurdi15 <34356590+zurdi15@users.noreply.github.com>
This commit is contained in:
committed by
GitHub
parent
27725c26b3
commit
a2775ca2b8
@@ -25,7 +25,11 @@ class HybridAuthBackend(AuthenticationBackend):
|
||||
|
||||
# Check if Authorization header exists
|
||||
if "Authorization" in conn.headers:
|
||||
scheme, token = conn.headers["Authorization"].split()
|
||||
auth_header_parts = conn.headers["Authorization"].split()
|
||||
if len(auth_header_parts) != 2:
|
||||
return None
|
||||
|
||||
scheme, token = auth_header_parts
|
||||
|
||||
# Check if basic auth header is valid
|
||||
if scheme.lower() == "basic":
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
from fastapi import status
|
||||
|
||||
from exceptions.fs_exceptions import PlatformAlreadyExistsException
|
||||
@@ -51,6 +52,14 @@ def test_heartbeat(client):
|
||||
assert isinstance(oidc["RP_INITIATED_LOGOUT"], bool)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("authorization_header", ["Bearer ", "Foo", "a b c"])
|
||||
def test_heartbeat_with_malformed_authorization_header(client, authorization_header: str):
|
||||
response = client.get(
|
||||
"/api/heartbeat", headers={"Authorization": authorization_header}
|
||||
)
|
||||
assert response.status_code == status.HTTP_200_OK
|
||||
|
||||
|
||||
def test_heartbeat_metadata(client):
|
||||
response = client.get("/api/heartbeat/metadata/launchbox")
|
||||
assert response.status_code == status.HTTP_200_OK
|
||||
|
||||
@@ -192,6 +192,22 @@ async def test_hybrid_auth_backend_invalid_scheme():
|
||||
assert result is None
|
||||
|
||||
|
||||
@pytest.mark.parametrize("authorization_header", ["Bearer ", "Foo", "a b c"])
|
||||
async def test_hybrid_auth_backend_malformed_authorization_header(
|
||||
authorization_header: str,
|
||||
):
|
||||
class MockConnection(HTTPConnection):
|
||||
def __init__(self):
|
||||
self.scope: dict[str, dict] = {"session": {}}
|
||||
self._headers = {"Authorization": authorization_header}
|
||||
|
||||
backend = HybridAuthBackend()
|
||||
conn = MockConnection()
|
||||
|
||||
result = await backend.authenticate(conn)
|
||||
assert result is None
|
||||
|
||||
|
||||
async def test_hybrid_auth_backend_with_refresh_token(editor_user: User):
|
||||
refresh_token = oauth_handler.create_refresh_token(
|
||||
data={
|
||||
|
||||
Reference in New Issue
Block a user