Files
romm/backend/tests/handler/filesystem/test_roms_handler.py
2026-06-18 19:59:00 -04:00

1652 lines
61 KiB
Python

import os
import shutil
import tempfile
from pathlib import Path
from unittest.mock import Mock, patch
import pytest
from hypothesis import assume, given
from hypothesis import strategies as st
from tests._zipfile_shim import reload_zipfile
from config.config_manager import LIBRARY_BASE_PATH, Config
from handler.filesystem.base_handler import (
LANGUAGES_BY_SHORTCODE,
REGIONS_BY_SHORTCODE,
)
from handler.filesystem.roms_handler import (
FileHash,
FSRomsHandler,
)
from models.platform import Platform
from models.rom import Rom, RomFile, RomFileCategory
from utils.archives import extract_chd_hash
class TestFSRomsHandler:
"""Test suite for FSRomsHandler class"""
@pytest.fixture
def handler(self):
    """Provide a freshly constructed ``FSRomsHandler`` for each test."""
    roms_handler = FSRomsHandler()
    return roms_handler
@pytest.fixture
def config(self):
    """Provide a ``Config`` carrying one sample rule for each exclusion list."""
    settings = {
        "EXCLUDED_PLATFORMS": [],
        "EXCLUDED_SINGLE_EXT": ["tmp"],
        "EXCLUDED_SINGLE_FILES": ["excluded_test.tmp"],
        "EXCLUDED_MULTI_FILES": ["excluded_multi"],
        "EXCLUDED_MULTI_PARTS_EXT": ["tmp"],
        "EXCLUDED_MULTI_PARTS_FILES": ["excluded_part.bin"],
        "PLATFORMS_BINDING": {},
        "PLATFORMS_VERSIONS": {},
        "ROMS_FOLDER_NAME": "roms",
        "FIRMWARE_FOLDER_NAME": "bios",
    }
    return Config(**settings)
@pytest.fixture
def platform(self):
    """Provide the Nintendo 64 ``Platform`` shared by the ROM fixtures."""
    n64_platform = Platform(name="Nintendo 64", slug="n64", fs_slug="n64")
    return n64_platform
@pytest.fixture
def rom_single(self, platform: Platform):
    """Provide a ``Rom`` describing one ``.z64`` file stored directly in ``n64/roms``."""
    attributes = {
        "id": 1,
        "fs_name": "Paper Mario (USA).z64",
        "fs_path": "n64/roms",
        "fs_extension": "z64",
        "platform": platform,
        "full_path": "n64/roms/Paper Mario (USA).z64",
    }
    return Rom(**attributes)
@pytest.fixture
def rom_single_nested(self, platform: Platform):
    """Provide a folder-style ``Rom`` with a base file and a file in a ``translation`` sub-folder."""
    rom_folder = "n64/roms/Sonic (EU) [T]"

    base_file = RomFile(
        id=1,
        file_name="Sonic (EU) [T].n64",
        file_path="n64/roms/Sonic (EU) [T]",
    )
    translation_file = RomFile(
        id=2,
        file_name="Sonic (EU) [T-En].z64",
        file_path="n64/roms/Sonic (EU) [T]/translation",
    )

    return Rom(
        id=3,
        fs_name="Sonic (EU) [T]",
        fs_path="n64/roms",
        fs_extension="",
        platform=platform,
        full_path=rom_folder,
        files=[base_file, translation_file],
    )
@pytest.fixture
def rom_multi(self, platform: Platform):
    """Provide a ``Rom`` with no extension that lists two ``[Part N]`` files."""
    part_one = RomFile(
        id=1,
        file_name="Super Mario 64 (J) (Rev A) [Part 1].z64",
        file_path="n64/roms",
    )
    part_two = RomFile(
        id=2,
        file_name="Super Mario 64 (J) (Rev A) [Part 2].z64",
        file_path="n64/roms",
    )

    return Rom(
        id=2,
        fs_name="Super Mario 64 (J) (Rev A)",
        fs_path="n64/roms",
        fs_extension="",
        platform=platform,
        files=[part_one, part_two],
    )
def test_init_uses_library_base_path(self, handler: FSRomsHandler):
    """The handler's ``base_path`` equals the resolved ``LIBRARY_BASE_PATH``."""
    expected_base = Path(LIBRARY_BASE_PATH).resolve()
    assert handler.base_path == expected_base
def test_get_roms_fs_structure_structure_b(self, handler: FSRomsHandler):
    """With Structure B enabled the ROM path is ``{platform}/roms``."""
    platform_slug = "n64"
    structure_b_config = Config(
        EXCLUDED_PLATFORMS=[],
        EXCLUDED_SINGLE_EXT=[],
        EXCLUDED_SINGLE_FILES=[],
        EXCLUDED_MULTI_FILES=[],
        EXCLUDED_MULTI_PARTS_EXT=[],
        EXCLUDED_MULTI_PARTS_FILES=[],
        PLATFORMS_BINDING={},
        PLATFORMS_VERSIONS={},
        ROMS_FOLDER_NAME="roms",
        FIRMWARE_FOLDER_NAME="bios",
    )
    structure_b_config.has_structure_path_b = True

    with pytest.MonkeyPatch.context() as patcher:
        patcher.setattr(
            "handler.filesystem.roms_handler.cm.get_config",
            lambda: structure_b_config,
        )
        structure = handler.get_roms_fs_structure(platform_slug)
        assert structure == f"{platform_slug}/roms"
def test_get_roms_fs_structure_structure_a(self, handler: FSRomsHandler):
    """With Structure B disabled the ROM path is ``roms/{platform}``."""
    platform_slug = "n64"
    structure_a_config = Config(
        EXCLUDED_PLATFORMS=[],
        EXCLUDED_SINGLE_EXT=[],
        EXCLUDED_SINGLE_FILES=[],
        EXCLUDED_MULTI_FILES=[],
        EXCLUDED_MULTI_PARTS_EXT=[],
        EXCLUDED_MULTI_PARTS_FILES=[],
        PLATFORMS_BINDING={},
        PLATFORMS_VERSIONS={},
        ROMS_FOLDER_NAME="roms",
        FIRMWARE_FOLDER_NAME="bios",
    )
    structure_a_config.has_structure_path_b = False

    with pytest.MonkeyPatch.context() as patcher:
        patcher.setattr(
            "handler.filesystem.roms_handler.cm.get_config",
            lambda: structure_a_config,
        )
        structure = handler.get_roms_fs_structure(platform_slug)
        assert structure == f"roms/{platform_slug}"
def test_parse_tags_regions_and_languages(self, handler: FSRomsHandler):
    """One file name yields its region, revision, languages and free-form tag."""
    tags = handler.parse_tags("Zelda (USA) (Rev 1) [En,Fr] [Test].n64")

    assert "USA" in tags.regions
    assert tags.revision == "1"
    assert tags.version == ""
    for language in ("English", "French"):
        assert language in tags.languages
    assert "Test" in tags.other_tags
def test_parse_tags_complex_tags(self, handler: FSRomsHandler):
    """A name mixing parenthesised lists, ``Reg-`` prefixes and bracket tags parses fully."""
    tags = handler.parse_tags(
        "Game (Europe) (En,De,Fr,Es,It) (Rev A) [Reg-PAL] [Beta].rom"
    )

    for region in ("Europe", "PAL"):
        assert region in tags.regions
    assert tags.revision == "A"
    assert tags.version == ""
    for language in ("English", "German", "French", "Spanish", "Italian"):
        assert language in tags.languages
    assert "Beta" in tags.other_tags
def test_parse_tags_no_tags(self, handler: FSRomsHandler):
    """A name without any tags produces empty lists and empty strings."""
    tags = handler.parse_tags("Simple Game.rom")

    # List-valued fields
    assert tags.regions == []
    assert tags.languages == []
    assert tags.other_tags == []
    # String-valued fields
    assert tags.revision == ""
    assert tags.version == ""
def test_parse_tags_version(self, handler: FSRomsHandler):
    """Version tags are recognised in several spellings."""
    tags = handler.parse_tags("stardew_valley(v1.5.6.1988831614)(53038).exe")
    assert tags.version == "1.5.6.1988831614"
    assert "53038" in tags.other_tags
    assert tags.regions == []
    assert tags.revision == ""
    assert tags.languages == []

    # Each spelling below is expected to yield the same version string
    version_spellings = (
        "My Game (Version 1.2.3).rom",
        "My Game (Ver-1.2.3).rom",
        "My Game (v_1.2.3).rom",
        "My Game (v 1.2.3).rom",
    )
    for name in version_spellings:
        tags = handler.parse_tags(name)
        assert tags.version == "1.2.3"
def test_exclude_multi_roms_filters_excluded(self, handler: FSRomsHandler, config):
    """A name listed in ``EXCLUDED_MULTI_FILES`` is dropped from the result."""
    candidates = ["Game1", "excluded_multi", "Game2", "Game3"]

    with pytest.MonkeyPatch.context() as patcher:
        patcher.setattr(
            "handler.filesystem.roms_handler.cm.get_config", lambda: config
        )
        remaining = handler.exclude_multi_roms(candidates)
        assert remaining == ["Game1", "Game2", "Game3"]
def test_exclude_multi_roms_no_exclusions(self, handler: FSRomsHandler):
    """With every exclusion list empty the input comes back unchanged."""
    candidates = ["Game1", "Game2", "Game3"]
    empty_rules = Config(
        EXCLUDED_PLATFORMS=[],
        EXCLUDED_SINGLE_EXT=[],
        EXCLUDED_SINGLE_FILES=[],
        EXCLUDED_MULTI_FILES=[],
        EXCLUDED_MULTI_PARTS_EXT=[],
        EXCLUDED_MULTI_PARTS_FILES=[],
        PLATFORMS_BINDING={},
        PLATFORMS_VERSIONS={},
        ROMS_FOLDER_NAME="roms",
        FIRMWARE_FOLDER_NAME="bios",
    )

    with pytest.MonkeyPatch.context() as patcher:
        patcher.setattr(
            "handler.filesystem.roms_handler.cm.get_config", lambda: empty_rules
        )
        remaining = handler.exclude_multi_roms(candidates)
        assert remaining == candidates
def test_exclude_multi_roms_case_insensitive(self, handler: FSRomsHandler, config):
    """A lower-case exclusion rule removes a differently cased folder name."""
    config.EXCLUDED_MULTI_FILES = ["manuals"]
    candidates = ["Game1", "Manuals", "Game2"]

    with pytest.MonkeyPatch.context() as patcher:
        patcher.setattr(
            "handler.filesystem.roms_handler.cm.get_config", lambda: config
        )
        remaining = handler.exclude_multi_roms(candidates)
        assert remaining == ["Game1", "Game2"]
def test_exclude_multi_roms_ignores_whitespace(
    self, handler: FSRomsHandler, config
):
    """An exclusion rule padded with spaces still removes the matching name."""
    config.EXCLUDED_MULTI_FILES = [" covers "]
    candidates = ["Game1", "covers", "Game2"]

    with pytest.MonkeyPatch.context() as patcher:
        patcher.setattr(
            "handler.filesystem.roms_handler.cm.get_config", lambda: config
        )
        remaining = handler.exclude_multi_roms(candidates)
        assert remaining == ["Game1", "Game2"]
def test_exclude_multi_roms_wildcard_patterns(self, handler: FSRomsHandler, config):
    """A padded wildcard rule removes every name it matches, whatever the case."""
    config.EXCLUDED_MULTI_FILES = [" manuals* "]
    candidates = ["Game1", "Manuals", "manuals-fr", "Game2"]

    with pytest.MonkeyPatch.context() as patcher:
        patcher.setattr(
            "handler.filesystem.roms_handler.cm.get_config", lambda: config
        )
        remaining = handler.exclude_multi_roms(candidates)
        assert remaining == ["Game1", "Game2"]
def test_build_rom_file_single_file(self, rom_single: Rom, handler: FSRomsHandler):
    """``_build_rom_file`` copies name, path and hashes onto the resulting ``RomFile``."""
    hashes = {
        "crc_hash": "ABCD1234",
        "md5_hash": "def456",
        "sha1_hash": "789ghi",
        "chd_sha1_hash": "654321",
    }
    directory = Path(rom_single.fs_path)
    name = rom_single.fs_name

    built = handler._build_rom_file(rom_single, directory, name, FileHash(hashes))

    assert isinstance(built, RomFile)
    assert built.file_name == name
    assert built.file_path == str(directory)
    # The supplied hashes are carried over as-is
    assert built.crc_hash == "ABCD1234"
    assert built.md5_hash == "def456"
    assert built.sha1_hash == "789ghi"
    # Size and timestamp come from the file on disk
    assert built.file_size_bytes > 0
    assert built.last_modified is not None
    # This path matches no category
    assert built.category is None
def test_build_rom_file_with_category(self, rom_multi: Rom, handler: FSRomsHandler):
    """Test _build_rom_file with category detection.

    A file placed in a ``dlc`` sub-folder of the ROM path is expected to be
    categorised as ``RomFileCategory.DLC``. The test writes a temporary file
    under the handler's base path and removes everything it created afterwards.
    """
    # Test with DLC category
    rom_path = Path(rom_multi.fs_path, "dlc")
    file_name = "test_dlc.n64"
    file_hash = FileHash(
        {
            "crc_hash": "12345678",
            "md5_hash": "abcdef",
            "sha1_hash": "123456",
            "chd_sha1_hash": "654321",
        }
    )

    dlc_dir = handler.base_path / rom_path
    # Remember whether this test creates the folder, so cleanup only removes
    # what the test itself added to the shared fixture tree.
    created_dir = not dlc_dir.exists()
    os.makedirs(dlc_dir, exist_ok=True)
    test_file = dlc_dir / file_name

    try:
        # Create the test file
        test_file.write_text("Test DLC content")

        rom_file = handler._build_rom_file(
            rom_multi, rom_path, file_name, file_hash
        )

        assert rom_file.category == RomFileCategory.DLC
        assert rom_file.file_name == file_name
        assert rom_file.file_size_bytes > 0
    finally:
        # Clean up the file, then the folder if we created it and it is empty;
        # a leftover ``dlc`` directory would show up in later directory listings.
        test_file.unlink(missing_ok=True)
        if created_dir and dlc_dir.is_dir() and not any(dlc_dir.iterdir()):
            dlc_dir.rmdir()
@pytest.mark.asyncio
async def test_get_roms(self, handler: FSRomsHandler, platform, config):
    """Test get_roms with actual files in the filesystem.

    Checks that the listing contains both single-file and multi-file ROMs,
    includes the known fixture names, and omits the excluded file.
    """
    with pytest.MonkeyPatch.context() as m:
        m.setattr("handler.filesystem.roms_handler.cm.get_config", lambda: config)
        m.setattr("os.path.exists", lambda x: False)  # Normal structure

        result = await handler.get_roms(platform)

        assert isinstance(result, list)
        assert len(result) > 0

        # Check that we have both single and multi ROMs.
        # Single-file ROMs are the entries flagged ``flat``; filtering on
        # ``not r["flat"]`` would select every *other* entry and let this
        # check pass even when no single-file ROM was returned.
        single_roms = [r for r in result if r["flat"]]
        multi_roms = [r for r in result if r["nested"]]
        assert len(single_roms) > 0
        assert len(multi_roms) > 0

        # Check specific files exist
        rom_names = [r["fs_name"] for r in result]
        assert "Paper Mario (USA).z64" in rom_names
        assert "Super Mario 64 (J) (Rev A)" in rom_names
        assert "Zelda (USA) (Rev 1) [En,Fr] [Test].n64" in rom_names

        # Check excluded files are not present
        assert "excluded_test.tmp" not in rom_names
@pytest.mark.asyncio
async def test_get_rom_files_single_rom(
    self, handler: FSRomsHandler, rom_single, config
):
    """A single-file ROM yields one ``RomFile`` and the expected aggregate hashes."""
    with pytest.MonkeyPatch.context() as patcher:
        patcher.setattr(
            "handler.filesystem.roms_handler.cm.get_config", lambda: config
        )
        patcher.setattr("os.path.exists", lambda x: False)  # Normal structure

        parsed = await handler.get_rom_files(rom_single)

        assert len(parsed.rom_files) == 1
        only_file = parsed.rom_files[0]
        assert isinstance(only_file, RomFile)
        assert only_file.file_name == "Paper Mario (USA).z64"
        assert only_file.file_path == "n64/roms"
        assert only_file.file_size_bytes > 0

        # Aggregate hashes of the fixture file
        assert parsed.crc_hash == "efb5af2e"
        assert parsed.md5_hash == "0f343b0931126a20f133d67c2b018a3b"
        assert parsed.sha1_hash == "60cacbf3d72e1e7834203da608037b1bf83b40e8"
@pytest.mark.asyncio
async def test_get_rom_files_multi_rom(
    self, handler: FSRomsHandler, rom_multi, config
):
    """A multi-part ROM yields a ``RomFile`` with size and timestamp for every part."""
    expected_parts = (
        "Super Mario 64 (J) (Rev A) [Part 1].z64",
        "Super Mario 64 (J) (Rev A) [Part 2].z64",
    )

    with pytest.MonkeyPatch.context() as patcher:
        patcher.setattr(
            "handler.filesystem.roms_handler.cm.get_config", lambda: config
        )
        patcher.setattr("os.path.exists", lambda x: False)  # Normal structure

        parsed = await handler.get_rom_files(rom_multi)

        assert len(parsed.rom_files) >= 2  # Should have multiple parts

        found_names = [part.file_name for part in parsed.rom_files]
        for part_name in expected_parts:
            assert part_name in found_names

        for part in parsed.rom_files:
            assert isinstance(part, RomFile)
            assert part.file_size_bytes > 0
            assert part.last_modified is not None
@pytest.mark.asyncio
async def test_get_rom_files_multi_rom_multi_dot_exclusion(
    self, handler: FSRomsHandler, rom_multi
):
    """Multi-dot filenames in a multi-part dir are excluded by simple or compound ext rules."""
    extra_file = (
        handler.base_path / "n64/roms/Super Mario 64 (J) (Rev A)/game.n64.hash.txt"
    )
    extra_file.write_text("hash data")

    try:
        # First the last single extension "txt", then the compound "hash.txt"
        for excluded_ext in ("txt", "hash.txt"):
            rules = Config(
                EXCLUDED_PLATFORMS=[],
                EXCLUDED_SINGLE_EXT=[],
                EXCLUDED_SINGLE_FILES=[],
                EXCLUDED_MULTI_FILES=[],
                EXCLUDED_MULTI_PARTS_EXT=[excluded_ext],
                EXCLUDED_MULTI_PARTS_FILES=[],
                PLATFORMS_BINDING={},
                PLATFORMS_VERSIONS={},
                ROMS_FOLDER_NAME="roms",
                FIRMWARE_FOLDER_NAME="bios",
            )

            with pytest.MonkeyPatch.context() as patcher:
                patcher.setattr(
                    "handler.filesystem.roms_handler.cm.get_config",
                    lambda: rules,
                )
                parsed = await handler.get_rom_files(rom_multi)

            found_names = [part.file_name for part in parsed.rom_files]
            assert "game.n64.hash.txt" not in found_names
            assert "Super Mario 64 (J) (Rev A) [Part 1].z64" in found_names
    finally:
        extra_file.unlink(missing_ok=True)
async def test_rename_fs_rom_same_name(self, handler: FSRomsHandler):
    """Renaming a ROM to its current name completes without raising."""
    unchanged_name = "test_rom.n64"

    # Should not raise any exception
    await handler.rename_fs_rom(unchanged_name, "test_rom.n64", "n64/roms")
async def test_rename_fs_rom_different_name_target_exists(
    self, handler: FSRomsHandler
):
    """Renaming onto an existing file raises ``RomAlreadyExistsException``."""
    from exceptions.fs_exceptions import RomAlreadyExistsException

    source_name = "Paper Mario (USA).z64"
    occupied_name = "test_game.n64"  # This file exists

    with pytest.raises(RomAlreadyExistsException):
        await handler.rename_fs_rom(source_name, occupied_name, "n64/roms")
async def test_rename_fs_rom_successful_rename(self, handler: FSRomsHandler):
    """Test successful ROM file rename.

    Creates a temporary file, renames it through the handler, and checks
    that the old name is gone and the new name holds the original content.
    """
    old_name = "test_rename.n64"
    new_name = "renamed_rom.n64"
    fs_path = "n64/roms"

    old_path = handler.base_path / fs_path / old_name
    new_path = handler.base_path / fs_path / new_name

    try:
        # Create a test file to rename
        old_path.write_text("Test ROM content")

        await handler.rename_fs_rom(old_name, new_name, fs_path)

        # Check that old file is gone and new file exists
        assert not old_path.exists()
        assert new_path.exists()
        assert new_path.read_text() == "Test ROM content"
    finally:
        # Clean up both names: if the rename failed, the source file is still
        # on disk and would otherwise leak into the shared fixture tree.
        old_path.unlink(missing_ok=True)
        new_path.unlink(missing_ok=True)
def test_integration_with_base_handler_methods(self, handler: FSRomsHandler):
    """The handler exposes the filesystem helper methods the tests rely on."""
    expected_methods = (
        "validate_path",
        "list_files",
        "list_directories",
        "file_exists",
        "move_file_or_folder",
        "stream_file",
        "exclude_single_files",
    )
    for method_name in expected_methods:
        assert hasattr(handler, method_name)
async def test_exclude_single_files_integration(
    self, handler: FSRomsHandler, config
):
    """``exclude_single_files`` drops the excluded fixture file and keeps a real ROM."""
    excluded_name = "excluded_test.tmp"
    kept_name = "Paper Mario (USA).z64"

    with pytest.MonkeyPatch.context() as patcher:
        patcher.setattr(
            "handler.filesystem.roms_handler.cm.get_config", lambda: config
        )

        # The raw listing still contains the .tmp file
        listed = await handler.list_files(path="n64/roms")
        assert excluded_name in listed
        assert kept_name in listed

        # Filtering removes it and keeps the ROM
        kept = handler.exclude_single_files(listed)
        assert excluded_name not in kept
        assert kept_name in kept
async def test_file_operations_with_actual_structure(self, handler: FSRomsHandler):
    """Listing and existence checks work against the on-disk ROM fixture tree."""
    roms_dir = "n64/roms"

    # Listing returns at least one file and one directory
    files = await handler.list_files(roms_dir)
    assert len(files) > 0
    directories = await handler.list_directories(roms_dir)
    assert len(directories) > 0

    # Existence checks for two known files and one missing file
    assert await handler.file_exists("n64/roms/Paper Mario (USA).z64")
    assert await handler.file_exists("n64/roms/test_game.n64")
    assert not await handler.file_exists("n64/roms/nonexistent.rom")
async def test_stream_file_with_actual_roms(self, handler: FSRomsHandler):
    """Streaming fixture ROMs returns their non-empty contents."""
    async with await handler.stream_file("n64/roms/Paper Mario (USA).z64") as stream:
        payload = await stream.read()
        assert len(payload) > 0

    async with await handler.stream_file("n64/roms/test_game.n64") as stream:
        payload = await stream.read()
        assert len(payload) > 0
        assert b"Test N64 ROM" in payload
def test_tag_parsing_edge_cases(self, handler: FSRomsHandler):
    """Comma-separated tags and ``Reg-`` / ``Rev-`` prefixes are parsed correctly."""
    # Comma-separated regions and languages
    comma_tags = handler.parse_tags("Game (USA,Europe) [En,Fr,De].rom")
    for region in ("USA", "Europe"):
        assert region in comma_tags.regions
    for language in ("English", "French", "German"):
        assert language in comma_tags.languages
    assert comma_tags.version == ""

    # "Reg-" prefix
    region_tags = handler.parse_tags("Game [Reg-NTSC].rom")
    assert "NTSC" in region_tags.regions
    assert region_tags.version == ""

    # "Rev-" prefix
    revision_tags = handler.parse_tags("Game [Rev-B].rom")
    assert revision_tags.revision == "B"
    assert revision_tags.version == ""
def test_platform_specific_behavior(self, handler: FSRomsHandler, config):
    """Under Structure B the ROM path is derived from each platform's ``fs_slug``."""

    def make_platform(fs_slug: str, slug: str) -> Mock:
        # Mock platform exposing only the two slugs this test sets
        mocked = Mock(spec=Platform)
        mocked.fs_slug = fs_slug
        mocked.slug = slug
        return mocked

    # Create mock platforms - one hashable, one non-hashable
    hashable_platform = make_platform("gba", "gba")
    non_hashable_platform = make_platform("n64", "nintendo-64")

    config.has_structure_path_b = True

    with pytest.MonkeyPatch.context() as patcher:
        patcher.setattr(
            "handler.filesystem.roms_handler.cm.get_config", lambda: config
        )
        gba_structure = handler.get_roms_fs_structure(hashable_platform.fs_slug)
        n64_structure = handler.get_roms_fs_structure(
            non_hashable_platform.fs_slug
        )

        assert gba_structure == f"{hashable_platform.fs_slug}/roms"
        assert n64_structure == f"{non_hashable_platform.fs_slug}/roms"
async def test_multi_rom_directory_handling(self, handler: FSRomsHandler, config):
    """Multi-ROM fixture folders are listed and survive the exclusion filter."""
    expected_dirs = ("Super Mario 64 (J) (Rev A)", "Test Multi Rom [USA]")

    with pytest.MonkeyPatch.context() as patcher:
        patcher.setattr(
            "handler.filesystem.roms_handler.cm.get_config", lambda: config
        )

        # The raw listing includes our multi-ROM directories
        listed = await handler.list_directories("n64/roms")
        for folder in expected_dirs:
            assert folder in listed

        # After exclusion, normal directories should remain
        kept = handler.exclude_multi_roms(listed)
        for folder in expected_dirs:
            assert folder in kept
def test_rom_fs_structure_consistency(self, handler: FSRomsHandler, config):
    """Toggling ``has_structure_path_b`` switches between the two path layouts."""
    platform_slug = "gba"

    with pytest.MonkeyPatch.context() as patcher:
        patcher.setattr(
            "handler.filesystem.roms_handler.cm.get_config", lambda: config
        )

        # Structure B: {platform}/roms
        config.has_structure_path_b = True
        assert handler.get_roms_fs_structure(platform_slug) == f"{platform_slug}/roms"

        # Structure A: roms/{platform}
        config.has_structure_path_b = False
        assert handler.get_roms_fs_structure(platform_slug) == f"roms/{platform_slug}"
def test_actual_file_hash_calculation(self, handler: FSRomsHandler):
    """``_calculate_rom_hashes`` matches stdlib CRC32 / MD5 / SHA1 for a known payload."""
    import binascii
    import hashlib

    payload = b"Test ROM content for hashing"
    hash_target = handler.base_path / "n64/roms/hash_test.n64"
    hash_target.write_bytes(payload)

    try:
        # Reference values computed directly from the payload
        reference_crc = binascii.crc32(payload)
        reference_md5 = hashlib.md5(payload, usedforsecurity=False).hexdigest()
        reference_sha1 = hashlib.sha1(payload, usedforsecurity=False).hexdigest()

        # Only the 1st, 3rd and 5th items of the 6-tuple are checked here
        crc_value, _, md5_state, _, sha1_state, _ = handler._calculate_rom_hashes(
            hash_target,
            0,
            hashlib.md5(usedforsecurity=False),
            hashlib.sha1(usedforsecurity=False),
        )

        assert crc_value == reference_crc
        assert md5_state.hexdigest() == reference_md5
        assert sha1_state.hexdigest() == reference_sha1
    finally:
        # Clean up
        hash_target.unlink(missing_ok=True)
async def test_compressed_file_handling(self, handler: FSRomsHandler):
    """The PSX fixture ZIP is listed and can be streamed."""
    archive_name = "PaRappa the Rapper.zip"

    # The ZIP appears in the directory listing
    listed = await handler.list_files("psx/roms")
    assert archive_name in listed

    # Verify we can stream the compressed file
    async with await handler.stream_file("psx/roms/PaRappa the Rapper.zip") as stream:
        payload = await stream.read()
        assert len(payload) > 0
async def test_top_level_files_only_in_main_hash(
    self, handler: FSRomsHandler, rom_single_nested
):
    """Only top-level files contribute to the main ROM hash calculation."""
    parsed = await handler.get_rom_files(rom_single_nested)

    # Verify we have multiple files (base game + translation)
    assert len(parsed.rom_files) == 2

    # Index by file name; a later duplicate would win, as in a plain loop
    files_by_name = {entry.file_name: entry for entry in parsed.rom_files}
    base_game_rom_file = files_by_name.get("Sonic (EU) [T].n64")
    translation_rom_file = files_by_name.get("Sonic (EU) [T-En].z64")

    assert base_game_rom_file is not None, "Base game file not found"
    assert translation_rom_file is not None, "Translation file not found"

    # Verify file categories
    assert base_game_rom_file.category is None
    assert translation_rom_file.category == RomFileCategory.TRANSLATION

    # The main ROM hashes equal the base game's hashes and differ from the
    # translation's, showing the translation is left out of the main hash.
    assert (
        parsed.md5_hash == base_game_rom_file.md5_hash
    ), "Main ROM hash should include base game file"
    assert (
        parsed.md5_hash != translation_rom_file.md5_hash
    ), "Main ROM hash should not include translation file"
    assert (
        parsed.sha1_hash == base_game_rom_file.sha1_hash
    ), "Main ROM hash should include base game file"
    assert (
        parsed.sha1_hash != translation_rom_file.sha1_hash
    ), "Main ROM hash should not include translation file"
@pytest.mark.asyncio
async def test_get_rom_files_with_chd_v5_uses_internal_hash(
    self, handler: FSRomsHandler, platform, tmp_path
):
    """A CHD v5 file stores its header SHA1 in ``chd_sha1_hash``.

    The raw-file hashes (CRC32, MD5, SHA1) are expected to be populated as
    for any other file, while the SHA1 embedded in the mock v5 header is
    expected to surface separately on the ``RomFile`` as ``chd_sha1_hash``.
    """
    internal_sha1 = "0123456789abcdef0123456789abcdef01234567"

    # Build a mock CHD v5 header: magic tag, version 5, embedded SHA1
    chd_header = bytearray(124)
    chd_header[0:8] = b"MComprHD"
    chd_header[12:16] = (5).to_bytes(4, "big")
    chd_header[84:104] = bytes.fromhex(internal_sha1)

    source_chd = tmp_path / "test.chd"
    source_chd.write_bytes(
        chd_header + b"This is extra file data to ensure file is not empty"
    )

    # Place a copy where the handler looks for this platform's ROMs
    platform_roms_dir = tmp_path / platform.fs_slug / "roms"
    platform_roms_dir.mkdir(parents=True)
    shutil.copy(source_chd, platform_roms_dir / "test.chd")

    # Use a dedicated handler rooted at the temporary directory
    tmp_handler = FSRomsHandler()
    tmp_handler.base_path = tmp_path

    chd_rom = Rom(
        id=1,
        fs_name="test.chd",
        fs_extension="chd",
        fs_path=str(platform_roms_dir.relative_to(tmp_path)),
        platform=platform,
    )

    # Run the hashing process
    parsed = await tmp_handler.get_rom_files(chd_rom)

    # All three raw-file hashes should be populated
    assert len(parsed.rom_files) == 1
    assert parsed.crc_hash != "", "CRC should be computed from raw bytes"
    assert parsed.md5_hash != "", "MD5 should be computed from raw bytes"
    assert parsed.sha1_hash != "", "SHA1 should be computed from raw bytes"

    # Raw file SHA1 is NOT the header SHA1
    assert parsed.sha1_hash != internal_sha1

    # Header SHA1 stored separately in chd_sha1_hash
    assert parsed.rom_files[0].chd_sha1_hash == internal_sha1
@staticmethod
def _setup_archive_rom(
    tmp_path: Path, platform: Platform, fs_name: str, fs_extension: str, data: bytes
) -> tuple[FSRomsHandler, Rom]:
    """Write ``data`` as a ROM file under ``tmp_path`` and return a handler and ``Rom`` for it.

    The file is created at ``<tmp_path>/<platform.fs_slug>/roms/<fs_name>``.
    The returned handler has its ``base_path`` set to ``tmp_path`` and the
    returned ``Rom`` points at the file via a path relative to ``tmp_path``.
    """
    platform_roms_dir = tmp_path / platform.fs_slug / "roms"
    platform_roms_dir.mkdir(parents=True, exist_ok=True)

    archive_path = platform_roms_dir / fs_name
    archive_path.write_bytes(data)

    archive_handler = FSRomsHandler()
    archive_handler.base_path = tmp_path

    relative_dir = str(platform_roms_dir.relative_to(tmp_path))
    archive_rom = Rom(
        id=1,
        fs_name=fs_name,
        fs_extension=fs_extension,
        fs_path=relative_dir,
        platform=platform,
    )
    return archive_handler, archive_rom
@pytest.mark.asyncio
async def test_get_rom_files_zip_composite_hash_sorted_order(
    self, platform: Platform, tmp_path: Path
):
    """Zip member bytes are hashed in ASCII path order regardless of insertion order."""
    import hashlib
    import io
    import zipfile

    def md5_hex(payload: bytes) -> str:
        return hashlib.md5(payload, usedforsecurity=False).hexdigest()

    def sha1_hex(payload: bytes) -> str:
        return hashlib.sha1(payload, usedforsecurity=False).hexdigest()

    contents = {
        "a.bin": b"AAA content for first file",
        "b.bin": b"BBB content for second file",
        "c.bin": b"CCC content for third file",
    }

    reload_zipfile()
    archive_buffer = io.BytesIO()
    with zipfile.ZipFile(archive_buffer, "w") as archive:
        # Insert in reverse to ensure sorting is what governs order
        for member_name in ("c.bin", "b.bin", "a.bin"):
            archive.writestr(member_name, contents[member_name])

    test_handler, rom = self._setup_archive_rom(
        tmp_path, platform, "game.zip", "zip", archive_buffer.getvalue()
    )
    parsed = await test_handler.get_rom_files(rom)

    # The composite hash is over the member bytes joined in sorted-name order
    sorted_names = sorted(contents)
    joined = b"".join(contents[member_name] for member_name in sorted_names)
    assert parsed.md5_hash == md5_hex(joined)
    assert parsed.sha1_hash == sha1_hex(joined)

    # Only one RomFile (the archive itself) is surfaced, not one per member.
    # Per-member hashes are stored on `archive_members`.
    assert len(parsed.rom_files) == 1
    archive_rom_file = parsed.rom_files[0]
    assert archive_rom_file.file_name == "game.zip"
    assert archive_rom_file.md5_hash == parsed.md5_hash
    # full_path resolves to a file that actually exists on disk
    assert (Path(test_handler.base_path) / archive_rom_file.full_path).is_file()
    assert archive_rom_file.archive_members is not None

    # ASCII-sorted ordering, and each member has the right size + hashes
    assert [m["name"] for m in archive_rom_file.archive_members] == sorted(contents)
    for member in archive_rom_file.archive_members:
        member_bytes = contents[member["name"]]
        assert member["size"] == len(member_bytes)
        assert member["md5_hash"] == md5_hex(member_bytes)
        assert member["sha1_hash"] == sha1_hex(member_bytes)
@pytest.mark.asyncio
async def test_get_rom_files_zip_ordering_invariant(
    self, platform: Platform, tmp_path: Path
):
    """Two zips with the same members in different insertion order hash identically."""
    import io
    import zipfile

    members = [("a.bin", b"AAA"), ("b.bin", b"BBB"), ("c.bin", b"CCC")]

    reload_zipfile()

    def build_zip(order: list[tuple[str, bytes]]) -> bytes:
        # Serialise the members into an in-memory zip in the order given
        archive_buffer = io.BytesIO()
        with zipfile.ZipFile(archive_buffer, "w") as archive:
            for member_name, member_bytes in order:
                archive.writestr(member_name, member_bytes)
        return archive_buffer.getvalue()

    forward_bytes = build_zip(members)
    reverse_bytes = build_zip(list(reversed(members)))

    forward_handler, forward_rom = self._setup_archive_rom(
        tmp_path, platform, "forward.zip", "zip", forward_bytes
    )
    reverse_handler, reverse_rom = self._setup_archive_rom(
        tmp_path, platform, "reverse.zip", "zip", reverse_bytes
    )

    forward_parsed = await forward_handler.get_rom_files(forward_rom)
    reverse_parsed = await reverse_handler.get_rom_files(reverse_rom)

    assert forward_parsed.md5_hash == reverse_parsed.md5_hash
    assert forward_parsed.sha1_hash == reverse_parsed.sha1_hash
    assert forward_parsed.crc_hash == reverse_parsed.crc_hash
@pytest.mark.asyncio
async def test_get_rom_files_tar_gz_composite_hash_compound_ext(
    self, platform: Platform, tmp_path: Path
):
    """Compound .tar.gz extension routes through the tar reader and composites members."""
    import hashlib
    import io
    import tarfile

    members = {"a.bin": b"first member bytes", "b.bin": b"second member bytes"}

    archive_buffer = io.BytesIO()
    with tarfile.open(fileobj=archive_buffer, mode="w:gz") as archive:
        for member_name in ("b.bin", "a.bin"):  # reverse insertion
            member_bytes = members[member_name]
            entry = tarfile.TarInfo(name=member_name)
            entry.size = len(member_bytes)
            archive.addfile(entry, io.BytesIO(member_bytes))

    test_handler, rom = self._setup_archive_rom(
        tmp_path, platform, "game.tar.gz", "tar.gz", archive_buffer.getvalue()
    )
    parsed = await test_handler.get_rom_files(rom)

    # The expected composite is the member bytes joined in sorted-name order
    joined = b"".join(members[member_name] for member_name in sorted(members))
    expected_md5 = hashlib.md5(joined, usedforsecurity=False).hexdigest()
    assert parsed.md5_hash == expected_md5
    assert len(parsed.rom_files) == 1
    assert parsed.rom_files[0].file_name == "game.tar.gz"
@pytest.mark.asyncio
async def test_get_rom_files_malformed_zip_falls_back_to_raw_bytes(
    self, platform: Platform, tmp_path: Path
):
    """A file with .zip extension that is not a valid zip falls back to raw-file hashing."""
    import hashlib

    junk = b"Definitely not a zip file. Just arbitrary bytes for fallback."
    expected_md5 = hashlib.md5(junk, usedforsecurity=False).hexdigest()
    expected_sha1 = hashlib.sha1(junk, usedforsecurity=False).hexdigest()

    test_handler, rom = self._setup_archive_rom(
        tmp_path, platform, "fake.zip", "zip", junk
    )
    parsed = await test_handler.get_rom_files(rom)

    # On a malformed zip, the reader yields nothing and the fallback path
    # hashes the archive file itself; read_zip_file's BadZipFile guard
    # routes that to raw-byte hashing.
    assert parsed.md5_hash == expected_md5
    assert parsed.sha1_hash == expected_sha1

    assert len(parsed.rom_files) == 1
    fallback_file = parsed.rom_files[0]
    assert fallback_file.file_name == "fake.zip"
    assert fallback_file.archive_members is None
@pytest.mark.asyncio
async def test_get_rom_files_zip_with_only_excluded_entries_falls_back(
    self, platform: Platform, tmp_path: Path
):
    """A zip whose entries are all default-excluded hashes the archive's raw bytes."""
    import hashlib
    import io
    import zipfile

    reload_zipfile()
    archive_buffer = io.BytesIO()
    with zipfile.ZipFile(archive_buffer, "w") as archive:
        archive.writestr("foo.tmp", b"X" * 256)  # excluded by extension
        archive.writestr(".DS_Store", b"Y" * 8)  # excluded by name
    zip_bytes = archive_buffer.getvalue()

    test_handler, rom = self._setup_archive_rom(
        tmp_path, platform, "only_excluded.zip", "zip", zip_bytes
    )
    parsed = await test_handler.get_rom_files(rom)

    # Hashes are those of the archive file itself
    expected_md5 = hashlib.md5(zip_bytes, usedforsecurity=False).hexdigest()
    expected_sha1 = hashlib.sha1(zip_bytes, usedforsecurity=False).hexdigest()
    assert parsed.md5_hash == expected_md5
    assert parsed.sha1_hash == expected_sha1

    assert len(parsed.rom_files) == 1
    fallback_file = parsed.rom_files[0]
    assert fallback_file.file_name == "only_excluded.zip"
    assert fallback_file.archive_members is None
@pytest.mark.asyncio
async def test_get_rom_files_empty_zip_falls_back_to_raw_bytes(
    self, platform: Platform, tmp_path: Path
):
    """A zip with zero entries hashes the archive's raw bytes."""
    import hashlib
    import io
    import zipfile

    reload_zipfile()
    archive_buffer = io.BytesIO()
    # Opening and closing the archive without writing produces an empty zip
    with zipfile.ZipFile(archive_buffer, "w"):
        pass
    zip_bytes = archive_buffer.getvalue()

    test_handler, rom = self._setup_archive_rom(
        tmp_path, platform, "empty.zip", "zip", zip_bytes
    )
    parsed = await test_handler.get_rom_files(rom)

    # Hashes are those of the archive file itself
    expected_md5 = hashlib.md5(zip_bytes, usedforsecurity=False).hexdigest()
    expected_sha1 = hashlib.sha1(zip_bytes, usedforsecurity=False).hexdigest()
    assert parsed.md5_hash == expected_md5
    assert parsed.sha1_hash == expected_sha1

    assert len(parsed.rom_files) == 1
    fallback_file = parsed.rom_files[0]
    assert fallback_file.file_name == "empty.zip"
    assert fallback_file.archive_members is None
@pytest.mark.asyncio
async def test_get_rom_files_with_non_v5_chd_fallback_to_std_hashing(
    self, handler: FSRomsHandler, platform, tmp_path
):
    """Test that non-v5 CHD files fall back to standard file hashing.

    This ensures backward compatibility: if a .chd file is not version 5
    or doesn't have a valid v5 header, it should be treated as a regular
    file and all hashes (CRC32, MD5, SHA1) are calculated from content.

    The ``handler`` fixture is requested but not used; the test builds its own
    handler rooted at ``tmp_path``.
    """
    file_name = "old_format.chd"

    # Build a CHD v4 file (should not use internal hash logic): valid magic,
    # version field set to 4 rather than 5, followed by some content.
    v4_header = bytearray(124)
    v4_header[0:8] = b"MComprHD"
    v4_header[12:16] = int(4).to_bytes(4, "big")
    payload = v4_header + b"This is CHD v4 data that should be hashed as a normal file"
    staged_file = tmp_path / file_name
    staged_file.write_bytes(payload)

    # Lay the file out under <tmp>/<platform slug>/roms and point a fresh
    # handler at the temporary library root.
    roms_dir = tmp_path / platform.fs_slug / "roms"
    roms_dir.mkdir(parents=True)
    shutil.copy(staged_file, roms_dir / file_name)

    test_handler = FSRomsHandler()
    test_handler.base_path = tmp_path
    rom = Rom(
        id=1,
        fs_name=file_name,
        fs_extension="chd",
        fs_path=str(roms_dir.relative_to(tmp_path)),
        platform=platform,
    )

    # Run the hashing process
    parsed_rom_files = await test_handler.get_rom_files(rom)

    # All hashes should be populated (calculated from file content)
    assert len(parsed_rom_files.rom_files) == 1
    hash_checks = (
        ("crc_hash", "CRC hash should be calculated for non-v5 CHD"),
        ("md5_hash", "MD5 hash should be calculated for non-v5 CHD"),
        ("sha1_hash", "SHA1 hash should be calculated for non-v5 CHD"),
    )
    for attribute, failure_message in hash_checks:
        assert getattr(parsed_rom_files, attribute) != "", failure_message

    # Verify they're actual hash values (not from an internal header): the
    # single file's hashes must equal the rom-level hashes.
    first_file = parsed_rom_files.rom_files[0]
    for attribute, _ in hash_checks:
        assert getattr(first_file, attribute) == getattr(parsed_rom_files, attribute)
@pytest.mark.asyncio
async def test_get_rom_files_archive_computes_ra_hash_for_cartridge_platform(
    self, tmp_path: Path
):
    """RA hash is computed for cartridge-platform archives (buffer hashing supported).

    A one-member zip is set up for a Game Boy Advance platform and the RA
    hasher service is patched to return a fixed value; the test checks that
    this value ends up on the parsed result.
    """
    import io
    import zipfile

    # ``reload_zipfile`` comes from the module-level import, as in the sibling
    # archive tests; no function-scope re-import is needed.
    cartridge_platform = Platform(
        name="Game Boy Advance", slug="gba", fs_slug="gba"
    )
    reload_zipfile()

    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w") as zf:
        zf.writestr("game.gba", b"fake GBA ROM content")

    test_handler, rom = self._setup_archive_rom(
        tmp_path, cartridge_platform, "game.zip", "zip", buf.getvalue()
    )

    with patch(
        "adapters.services.rahasher.RAHasherService.calculate_hash",
        return_value="abcdef1234567890abcdef1234567890",
    ):
        parsed = await test_handler.get_rom_files(rom)

    assert parsed.ra_hash == "abcdef1234567890abcdef1234567890"
@pytest.mark.asyncio
async def test_get_rom_files_archive_skips_ra_hash_for_disc_platform(
    self, tmp_path: Path
):
    """A disc-platform archive ends up with an empty RA hash.

    The RA hasher service is patched to return ``""``. The test checks that the
    service is invoked exactly once and that the empty value is what lands on
    the parsed result.

    NOTE(review): the test name says the hash is "skipped" (buffer hashing
    unsupported), yet the hasher call is asserted to happen once - presumably
    the skip is decided inside the service; confirm against the handler.
    """
    import io
    import zipfile

    # ``reload_zipfile`` comes from the module-level import, as in the sibling
    # archive tests; no function-scope re-import is needed.
    disc_platform = Platform(name="PlayStation", slug="psx", fs_slug="psx")
    reload_zipfile()

    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w") as zf:
        zf.writestr("game.bin", b"fake PSX disc content")

    test_handler, rom = self._setup_archive_rom(
        tmp_path, disc_platform, "game.zip", "zip", buf.getvalue()
    )

    with patch(
        "adapters.services.rahasher.RAHasherService.calculate_hash",
        return_value="",
    ) as mock_calculate:
        parsed = await test_handler.get_rom_files(rom)

    mock_calculate.assert_called_once()
    assert parsed.ra_hash == ""
class TestExtractCHDHash:
    """Test suite for extract_chd_hash function

    Each test writes a synthetic (or captured) CHD header to a file under
    ``tmp_path`` and checks the string returned by ``extract_chd_hash``: the
    40-character hex of header bytes 84-103 for an accepted version-5 header,
    or ``""`` for everything these tests expect to be rejected.

    Plain 124-byte headers are built with the module-level
    ``_chd_header_bytes(version, sha1)`` helper; tests that need an unusual
    size or magic construct their bytes explicitly.
    """

    def test_extract_chd_hash_v5_valid(self, tmp_path):
        """Test extracting hash from a valid CHD v5 file"""
        chd_file = tmp_path / "test_v5.chd"
        # CHD v5 header structure (124 bytes minimum):
        #   Bytes 0-7:    "MComprHD" magic signature
        #   Bytes 12-15:  Version (5 in big-endian)
        #   Bytes 84-103: SHA1 hash (20 bytes)
        expected = "0123456789abcdef0123456789abcdef01234567"
        chd_file.write_bytes(_chd_header_bytes(5, bytes.fromhex(expected)))

        result = extract_chd_hash(chd_file)

        assert result
        assert isinstance(result, str)
        assert len(result) == 40  # SHA1 hex is 40 characters
        assert result == expected

    def test_extract_chd_hash_v1_rejected(self, tmp_path):
        """Test that CHD v1 files are rejected"""
        chd_file = tmp_path / "test_v1.chd"
        chd_file.write_bytes(_chd_header_bytes(1, bytes(20)))  # Version 1

        assert extract_chd_hash(chd_file) == ""

    def test_extract_chd_hash_v2_rejected(self, tmp_path):
        """Test that CHD v2 files are rejected"""
        chd_file = tmp_path / "test_v2.chd"
        chd_file.write_bytes(_chd_header_bytes(2, bytes(20)))  # Version 2

        assert extract_chd_hash(chd_file) == ""

    def test_extract_chd_hash_v3_rejected(self, tmp_path):
        """Test that CHD v3 files are rejected"""
        chd_file = tmp_path / "test_v3.chd"
        chd_file.write_bytes(_chd_header_bytes(3, bytes(20)))  # Version 3

        assert extract_chd_hash(chd_file) == ""

    def test_extract_chd_hash_v4_rejected(self, tmp_path):
        """Test that CHD v4 files are rejected"""
        chd_file = tmp_path / "test_v4.chd"
        chd_file.write_bytes(_chd_header_bytes(4, bytes(20)))  # Version 4

        assert extract_chd_hash(chd_file) == ""

    def test_extract_chd_hash_invalid_magic(self, tmp_path):
        """Test that files without CHD magic signature are rejected"""
        chd_file = tmp_path / "invalid_magic.bin"
        header = bytearray(_chd_header_bytes(5, bytes(20)))
        header[0:8] = b"BadMagic"  # Not "MComprHD"
        chd_file.write_bytes(header)

        assert extract_chd_hash(chd_file) == ""

    def test_extract_chd_hash_truncated_header(self, tmp_path):
        """Test that CHD v5 file with truncated header is rejected"""
        chd_file = tmp_path / "truncated.chd"
        # Only write 100 bytes instead of required 124
        chd_file.write_bytes(_chd_header_bytes(5, bytes(20))[:100])

        assert extract_chd_hash(chd_file) == ""

    def test_extract_chd_hash_nonexistent_file(self, tmp_path):
        """Test that non-existent files are handled gracefully"""
        nonexistent = tmp_path / "does_not_exist.chd"

        assert extract_chd_hash(nonexistent) == ""

    def test_extract_chd_hash_empty_file(self, tmp_path):
        """Test that empty files are rejected"""
        chd_file = tmp_path / "empty.chd"
        chd_file.write_bytes(b"")

        assert extract_chd_hash(chd_file) == ""

    def test_extract_chd_hash_sha1_format(self, tmp_path):
        """Test that SHA1 hash is correctly formatted as hex"""
        chd_file = tmp_path / "test_format.chd"
        # Use a known SHA1 value
        test_sha1 = bytes.fromhex("356a192b7913b04c54574d18c28d46e6395428ab")
        chd_file.write_bytes(_chd_header_bytes(5, test_sha1))

        result = extract_chd_hash(chd_file)

        assert result == "356a192b7913b04c54574d18c28d46e6395428ab"
        # Verify it's lowercase hex
        assert result == result.lower()
        # Verify it's 40 characters (SHA1 is 20 bytes = 40 hex chars)
        assert len(result) == 40

    def test_extract_chd_hash_unknown_version(self, tmp_path):
        """Test that unknown CHD versions are rejected"""
        chd_file = tmp_path / "test_unknown.chd"
        chd_file.write_bytes(_chd_header_bytes(99, bytes(20)))  # Unknown version

        assert extract_chd_hash(chd_file) == ""

    def test_extract_chd_hash_multiple_different_hashes(self, tmp_path):
        """Test that different SHA1 hashes are correctly extracted"""
        test_cases = [
            "0000000000000000000000000000000000000000",
            "ffffffffffffffffffffffffffffffffffffffff",
            "356a192b7913b04c54574d18c28d46e6395428ab",
            "da39a3ee5e6b4b0d3255bfef95601890afd80709",
        ]
        for i, test_hash in enumerate(test_cases):
            chd_file = tmp_path / f"test_hash_{i}.chd"
            chd_file.write_bytes(_chd_header_bytes(5, bytes.fromhex(test_hash)))

            result = extract_chd_hash(chd_file)

            assert result == test_hash, f"Hash mismatch for test case {i}"

    def test_extract_chd_hash_version_boundary_cases(self, tmp_path):
        """Test version checking at boundaries (0, 1, 4, 5, 6)"""
        embedded_sha1 = bytes.fromhex("0123456789abcdef0123456789abcdef01234567")
        test_versions = [
            (0, ""),  # Version 0 should return ""
            (1, ""),  # Version 1 should return ""
            (4, ""),  # Version 4 should return ""
            (5, "0123456789abcdef0123456789abcdef01234567"),  # Version 5 should work
            (6, ""),  # Version 6 should return ""
        ]
        for version, expected in test_versions:
            chd_file = tmp_path / f"test_v{version}.chd"
            # The SHA1 region is populated for every version so a rejection can
            # only come from the version check.
            chd_file.write_bytes(_chd_header_bytes(version, embedded_sha1))

            result = extract_chd_hash(chd_file)

            assert result == expected, f"Version {version} should return {expected!r}"

    def test_extract_chd_hash_file_too_short_for_magic(self, tmp_path):
        """Test file that's too short to even contain magic + version"""
        chd_file = tmp_path / "too_short.chd"
        # Only 8 bytes - has magic but no version
        chd_file.write_bytes(b"MComprHD")

        assert extract_chd_hash(chd_file) == ""

    def test_extract_chd_hash_permission_error(self, tmp_path):
        """Test graceful handling of permission errors

        Skipped when removing the mode bits does not actually make the file
        unreadable for the current user (for example when running as root),
        because the read would then succeed and the test could not exercise
        the permission-error path.
        """
        chd_file = tmp_path / "no_read_permission.chd"
        chd_file.write_bytes(_chd_header_bytes(5, bytes(20)))

        # Remove read permissions
        chd_file.chmod(0o000)
        try:
            if os.access(chd_file, os.R_OK):
                pytest.skip(
                    "file is still readable after chmod(0o000); "
                    "cannot simulate a permission error here"
                )
            result = extract_chd_hash(chd_file)
            assert result == ""
        finally:
            # Restore permissions for cleanup (also runs when skipping)
            chd_file.chmod(0o644)

    def test_extract_chd_hash_real_header(self, tmp_path):
        """Test extracting hash from real Pebble Beach Golf Links CHD v5 header

        This uses the actual 128-byte header from:
        Pebble Beach Golf Links - Stadler ni Chousen (Japan).chd

        Header bytes (hex):
        00000000: 4d43 6f6d 7072 4844 0000 007c 0000 0005 MComprHD...|....
        00000010: 6364 6c7a 6364 7a6c 6364 666c 0000 0000 cdlzcdzlcdfl....
        00000020: 0000 0000 1a97 4e00 0000 0000 1119 b3d0 ......N.........
        00000030: 0000 0000 0000 007c 0000 4c80 0000 0990 .......|..L.....
        00000040: 8389 486c 34df 316d 1fd3 3997 a3ef ce8c ..Hl4.1m..9.....
        00000050: e9c9 6008 0167 fc76 f9e4 312e 6ab4 8fe9 ..`..g.v..1.j...
        00000060: 80d2 ce5b 23f7 75c2 0000 0000 0000 0000 ...[#.u.........
        00000070: 0000 0000 0000 0000 0000 0000 4348 5432 ............CHT2

        The SHA1 hash (combined raw+meta) at bytes 84-103 is:
        0167 fc76 f9e4 312e 6ab4 8fe9 80d2 ce5b 23f7 75c2
        """
        chd_file = tmp_path / "Pebble Beach.chd"
        # Real 128-byte header from the file
        real_header = bytes.fromhex(
            "4d43 6f6d 7072 4844 0000 007c 0000 0005 "
            "6364 6c7a 6364 7a6c 6364 666c 0000 0000 "
            "0000 0000 1a97 4e00 0000 0000 1119 b3d0 "
            "0000 0000 0000 007c 0000 4c80 0000 0990 "
            "8389 486c 34df 316d 1fd3 3997 a3ef ce8c "
            "e9c9 6008 0167 fc76 f9e4 312e 6ab4 8fe9 "
            "80d2 ce5b 23f7 75c2 0000 0000 0000 0000 "
            "0000 0000 0000 0000 0000 0000 4348 5432"
        )
        chd_file.write_bytes(real_header)

        result = extract_chd_hash(chd_file)

        # Expected SHA1 from the header at bytes 84-103 (20 bytes, as per chd.h)
        expected_sha1 = "0167fc76f9e4312e6ab48fe980d2ce5b23f775c2"
        assert result
        assert result == expected_sha1
        assert len(result) == 40
        # Verify it matches what's in the header
        assert bytes.fromhex(result) == real_header[84:104]

    def test_extract_chd_hash_with_extra_metadata(self, tmp_path):
        """Test CHD v5 file with additional metadata beyond header

        Real CHD files often have map data and metadata after the 124-byte header.
        The hash extraction should work correctly regardless of file size.
        """
        chd_file = tmp_path / "test_with_metadata.chd"
        test_sha1 = bytes.fromhex("0167fc76f9e4312e6ab48fe980d2ce5b23f775c2")
        # Write header plus extra data (simulating map and metadata)
        extra_data = b"MAP_DATACOMPRESSED_DATA_GOES_HERE" * 100
        chd_file.write_bytes(_chd_header_bytes(5, test_sha1) + extra_data)

        result = extract_chd_hash(chd_file)

        assert result
        assert result == "0167fc76f9e4312e6ab48fe980d2ce5b23f775c2"
        assert bytes.fromhex(result) == test_sha1

    def test_extract_chd_hash_off_by_one_header_sizes(self, tmp_path):
        """Test boundary conditions around minimum required header size (104 bytes)"""
        sha1_hex = "0167fc76f9e4312e6ab48fe980d2ce5b23f775c2"
        test_cases = [
            (103, ""),  # 103 bytes - not enough for SHA1 region
            (104, sha1_hex),  # 104 bytes - exactly enough
            (123, sha1_hex),  # 123 bytes
            (124, sha1_hex),  # Full v5 header
            (125, sha1_hex),  # Extra byte
        ]
        for size, expected in test_cases:
            chd_file = tmp_path / f"test_size_{size}.chd"
            # Built by hand because the file length itself is under test.
            header = bytearray(size)
            header[0:8] = b"MComprHD"
            header[12:16] = int(5).to_bytes(4, "big")
            if size >= 104:
                header[84:104] = bytes.fromhex(sha1_hex)
            chd_file.write_bytes(header)

            result = extract_chd_hash(chd_file)

            assert (
                result == expected
            ), f"Failed for size {size}: got {result}, expected {expected}"

    def test_extract_chd_hash_corrupted_header_data(self, tmp_path):
        """Test handling of corrupted/invalid data in header fields"""
        chd_file = tmp_path / "corrupted_header.chd"
        # Corrupt the version field: 0xFFFFFFFF is written as b"\xff\xff\xff\xff"
        # and will be read back as 4294967295
        chd_file.write_bytes(_chd_header_bytes(0xFFFFFFFF, bytes(20)))

        result = extract_chd_hash(chd_file)

        # Should return empty string because version is not 5
        assert result == ""

    def test_extract_chd_hash_zero_sha1(self, tmp_path):
        """Test handling of all-zero SHA1 hash (edge case but valid)"""
        chd_file = tmp_path / "zero_hash.chd"
        # All-zero hash
        chd_file.write_bytes(_chd_header_bytes(5, b"\x00" * 20))

        result = extract_chd_hash(chd_file)

        assert result
        assert result == "0" * 40
        assert len(result) == 40

    def test_extract_chd_hash_max_sha1(self, tmp_path):
        """Test handling of maximum SHA1 hash (all 0xFF - edge case but valid)"""
        chd_file = tmp_path / "max_hash.chd"
        # All-FF hash
        chd_file.write_bytes(_chd_header_bytes(5, b"\xff" * 20))

        result = extract_chd_hash(chd_file)

        assert result
        assert result == "f" * 40
        assert len(result) == 40
# Sets of every mapped region / language name; the parse_tags property tests
# assert that parsed results are subsets of these.
KNOWN_REGION_NAMES = frozenset(REGIONS_BY_SHORTCODE.values())
KNOWN_LANGUAGE_NAMES = frozenset(LANGUAGES_BY_SHORTCODE.values())
# Hypothesis strategies that draw one known shortcode each. The keys are sorted
# so the sequence handed to sampled_from has a stable order.
region_code = st.sampled_from(sorted(REGIONS_BY_SHORTCODE))
language_code = st.sampled_from(sorted(LANGUAGES_BY_SHORTCODE))
class TestParseTagsProperties:
    """Property-based tests for FSRomsHandler.parse_tags."""

    # One handler instance shared by every generated example.
    handler = FSRomsHandler()

    @given(st.text())
    def test_never_raises_on_arbitrary_input(self, fs_name: str):
        """Parsing any text completes without raising."""
        self.handler.parse_tags(fs_name)

    @given(st.text())
    def test_is_deterministic(self, fs_name: str):
        """Parsing the same name twice yields equal results."""
        first_pass = self.handler.parse_tags(fs_name)
        second_pass = self.handler.parse_tags(fs_name)
        assert first_pass == second_pass

    @given(st.lists(region_code), st.lists(language_code))
    def test_known_codes_map_to_known_names(self, regions, languages):
        """Known shortcodes in parentheses resolve to their mapped names.

        The file name is ``Game`` followed by one ``(code)`` tag per region
        code, then one per language code, then ``.rom``.
        """
        tag_suffix = "".join(f"({code})" for code in [*regions, *languages])
        fs_name = f"Game{tag_suffix}.rom"

        parsed = self.handler.parse_tags(fs_name)

        # Nothing outside the known name sets may be reported.
        assert set(parsed.regions) <= KNOWN_REGION_NAMES
        assert set(parsed.languages) <= KNOWN_LANGUAGE_NAMES

        # Every supplied code resolves to its mapped full name.
        wanted_regions = {REGIONS_BY_SHORTCODE[code] for code in regions}
        assert wanted_regions <= set(parsed.regions)
        wanted_languages = {LANGUAGES_BY_SHORTCODE[code] for code in languages}
        assert wanted_languages <= set(parsed.languages)
def _chd_header_bytes(version: int, sha1: bytes) -> bytes:
"""Build a 124-byte CHD header with the given version and combined SHA1."""
header = bytearray(124)
header[0:8] = b"MComprHD"
header[12:16] = version.to_bytes(4, "big")
header[84:104] = sha1
return bytes(header)
def _run_extract(data: bytes) -> str:
    """Write ``data`` to a temporary ``.chd`` file and return its extracted hash.

    The temporary file is removed afterwards, whether or not writing or
    extraction raised.
    """
    descriptor, temp_name = tempfile.mkstemp(suffix=".chd")
    try:
        # Closing the handle before extraction ensures the bytes are flushed.
        with os.fdopen(descriptor, "wb") as sink:
            sink.write(data)
        chd_path = Path(temp_name)
        return extract_chd_hash(chd_path)
    finally:
        os.unlink(temp_name)
class TestExtractCHDHashProperties:
    """Property-based tests for extract_chd_hash."""

    @given(sha1=st.binary(min_size=20, max_size=20))
    def test_valid_v5_returns_embedded_sha1(self, sha1: bytes):
        """A version-5 header yields the hex of whatever SHA1 it embeds."""
        header = _chd_header_bytes(5, sha1)
        result = _run_extract(header)
        assert result == sha1.hex()
        assert len(result) == 40

    @given(
        version=st.integers(min_value=0, max_value=2**32 - 1),
        sha1=st.binary(min_size=20, max_size=20),
    )
    def test_non_v5_version_returns_empty(self, version: int, sha1: bytes):
        """Any 32-bit version other than 5 yields an empty string."""
        assume(version != 5)
        header = _chd_header_bytes(version, sha1)
        assert _run_extract(header) == ""

    @given(data=st.binary(min_size=16))
    def test_wrong_signature_returns_empty(self, data: bytes):
        """Data whose first eight bytes are not the CHD magic yields ""."""
        signature = data[:8]
        assume(signature != b"MComprHD")
        assert _run_extract(data) == ""

    @given(data=st.binary(max_size=15))
    def test_too_short_for_signature_returns_empty(self, data: bytes):
        """Inputs of at most 15 bytes yield an empty string."""
        result = _run_extract(data)
        assert result == ""

    @given(
        sha1=st.binary(min_size=20, max_size=20),
        truncate_len=st.integers(min_value=16, max_value=103),
    )
    def test_truncated_before_sha1_returns_empty(self, sha1: bytes, truncate_len: int):
        """A v5 header cut to 16-103 bytes (SHA1 region incomplete) yields ""."""
        full_header = _chd_header_bytes(5, sha1)
        truncated = full_header[:truncate_len]
        assert _run_extract(truncated) == ""

    @given(data=st.binary())
    def test_never_raises_on_arbitrary_bytes(self, data: bytes):
        """Extraction completes without raising on arbitrary bytes."""
        _run_extract(data)