This commit is contained in:
Georges-Antoine Assi
2026-05-18 07:40:43 -04:00
parent 01f0b1d2b5
commit e6d4ede939
4 changed files with 72 additions and 56 deletions

View File

@@ -65,6 +65,7 @@ CHD_VERSION_LENGTH: Final = 4 # Version is a uint32
CHD_V5_SHA1_OFFSET: Final = 84 # Combined raw+meta SHA1 offset in v5
CHD_V5_SHA1_LENGTH: Final = 20 # SHA1 is 20 bytes
CHD_V5_VERSION: Final = 5 # CHD v5 identifier
CHD_MIME_TYPE: Final = "application/x-mame-chd"
NON_HASHABLE_PLATFORMS = frozenset(
(
@@ -98,6 +99,7 @@ NON_HASHABLE_PLATFORMS = frozenset(
)
FILE_READ_CHUNK_SIZE = 1024 * 8
_MIME_DETECTOR = magic.Magic(mime=True)
class FSRom(TypedDict):
@@ -119,8 +121,10 @@ class FileHash(TypedDict):
def is_compressed_file(file_path: str) -> bool:
mime = magic.Magic(mime=True)
file_type = mime.from_file(file_path)
try:
file_type = _MIME_DETECTOR.from_file(file_path)
except magic.MagicException:
file_type = ""
return file_type in COMPRESSED_MIME_TYPES or file_path.lower().endswith(
tuple(COMPRESSED_FILE_EXTENSIONS)
@@ -191,7 +195,18 @@ def read_bz2_file(file_path: Path) -> Iterator[bytes]:
yield chunk
def extract_chd_hash(file_path: Path) -> str | None:
def is_chd_file(file_path: Path) -> bool:
"""Return True if the file is a CHD by extension or libmagic-detected MIME type."""
if file_path.suffix.lower() == ".chd":
return True
try:
return _MIME_DETECTOR.from_file(file_path) == CHD_MIME_TYPE
except (OSError, magic.MagicException):
return False
def extract_chd_hash(file_path: Path) -> str:
"""
Extract the embedded SHA1 hash from a CHD (Compressed Hunks of Data) v5 file header.
@@ -217,7 +232,7 @@ def extract_chd_hash(file_path: Path) -> str | None:
file_path: Path to the CHD file
Returns:
SHA1 hash as hex string, or None if file is not a valid CHD v5 file or parsing fails
SHA1 hash as hex string if valid
"""
try:
with open(file_path, "rb") as f:
@@ -229,7 +244,7 @@ def extract_chd_hash(file_path: Path) -> str | None:
len(header) < CHD_MIN_HEADER_LENGTH
or header[:CHD_SIGNATURE_LENGTH] != CHD_SIGNATURE
):
return None
return ""
# Extract and verify version (big-endian uint32)
version_end = CHD_VERSION_OFFSET + CHD_VERSION_LENGTH
@@ -237,16 +252,16 @@ def extract_chd_hash(file_path: Path) -> str | None:
# Only support v5 CHD files
if version != CHD_V5_VERSION:
return None
return ""
# Extract combined raw+meta SHA1 from v5 header
sha1_end = CHD_V5_SHA1_OFFSET + CHD_V5_SHA1_LENGTH
if len(header) < sha1_end:
return None
return ""
sha1_bytes = header[CHD_V5_SHA1_OFFSET:sha1_end]
return sha1_bytes.hex()
except OSError:
return None
return ""
def category_matches(category: str, path_parts: list[str]):
@@ -388,7 +403,7 @@ class FSRomsHandler(FSHandler):
crc_hash=file_hash["crc_hash"],
md5_hash=file_hash["md5_hash"],
sha1_hash=file_hash["sha1_hash"],
chd_sha1_hash=file_hash["chd_sha1_hash"] or None,
chd_sha1_hash=file_hash["chd_sha1_hash"],
)
async def get_rom_files(
@@ -417,29 +432,28 @@ class FSRomsHandler(FSHandler):
rom_sha1_h = hashlib.sha1(usedforsecurity=False) if calculate_hashes else None
rom_ra_h = ""
rom_dir = Path(abs_fs_path, rom.fs_name)
# Check if rom is a multi-part rom
if await AnyioPath(f"{abs_fs_path}/{rom.fs_name}").is_dir():
# Calculate the RA hash if the platform has a slug that matches a known RA slug
if calculate_hashes:
ra_platform = meta_ra_handler.get_platform(rom.platform_slug)
if ra_platform and ra_platform["ra_id"]:
rom_dir = Path(abs_fs_path, rom.fs_name)
# RAHasher can't process CHD files via the /* wildcard — it expects
# RAHasher can't process CHD files via the /* wildcard and instead expects
# track files (bin/cue/etc.). For CHD-only folders, find the largest
# CHD and pass it directly, matching single-file CHD behaviour.
def _list_chd_by_size() -> list[Path]:
chds = [
f for f in rom_dir.iterdir() if f.suffix.lower() == ".chd"
]
return sorted(
def _largest_chd_file() -> Path | None:
chds = [f for f in rom_dir.iterdir() if is_chd_file(f)]
sorted_chds = sorted(
chds, key=lambda f: f.stat().st_size, reverse=True
)
return sorted_chds[0] if sorted_chds else None
chd_files = await asyncio.to_thread(_list_chd_by_size)
chd_file = await asyncio.to_thread(_largest_chd_file)
ra_path = (
str(chd_files[0])
if chd_files
str(chd_file)
if chd_file and chd_file.is_file()
else f"{abs_fs_path}/{rom.fs_name}/*"
)
rom_ra_h = await RAHasherService().calculate_hash(
@@ -494,9 +508,6 @@ class FSRomsHandler(FSHandler):
md5_h = hashlib.md5(usedforsecurity=False)
sha1_h = hashlib.sha1(usedforsecurity=False)
chd_sha1 = ""
if Path(file_name).suffix.lower() == ".chd":
chd_sha1 = extract_chd_hash(Path(f_path, file_name)) or ""
file_hash = FileHash(
crc_hash=crc32_to_hex(crc_c) if crc_c != DEFAULT_CRC_C else "",
md5_hash=(
@@ -509,7 +520,11 @@ class FSRomsHandler(FSHandler):
if sha1_h.digest() != DEFAULT_SHA1_H_DIGEST
else ""
),
chd_sha1_hash=chd_sha1,
chd_sha1_hash=(
extract_chd_hash(rom_dir)
if is_chd_file(Path(f_path, file_name))
else ""
),
)
else:
file_hash = FileHash(
@@ -552,9 +567,6 @@ class FSRomsHandler(FSHandler):
f"{abs_fs_path}/{rom.fs_name}",
)
chd_sha1 = ""
if Path(rom.fs_name).suffix.lower() == ".chd":
chd_sha1 = extract_chd_hash(Path(abs_fs_path, rom.fs_name)) or ""
file_hash = FileHash(
crc_hash=crc32_to_hex(crc_c) if crc_c != DEFAULT_CRC_C else "",
md5_hash=(
@@ -565,7 +577,11 @@ class FSRomsHandler(FSHandler):
if sha1_h.digest() != DEFAULT_SHA1_H_DIGEST
else ""
),
chd_sha1_hash=chd_sha1,
chd_sha1_hash=(
extract_chd_hash(rom_dir)
if is_chd_file(Path(abs_fs_path, rom.fs_name))
else ""
),
)
rom_files.append(
self._build_rom_file(
@@ -617,7 +633,10 @@ class FSRomsHandler(FSHandler):
extension = Path(file_path).suffix.lower()
mime = magic.Magic(mime=True)
try:
file_type = mime.from_file(file_path)
try:
file_type = mime.from_file(file_path)
except magic.MagicException:
file_type = ""
crc_c = 0
md5_h = hashlib.md5(usedforsecurity=False)

View File

@@ -258,7 +258,7 @@ class TestFSRomsHandler:
"crc_hash": "ABCD1234",
"md5_hash": "def456",
"sha1_hash": "789ghi",
"chd_sha1_hash": "",
"chd_sha1_hash": "654321",
}
)
@@ -284,7 +284,7 @@ class TestFSRomsHandler:
"crc_hash": "12345678",
"md5_hash": "abcdef",
"sha1_hash": "123456",
"chd_sha1_hash": "",
"chd_sha1_hash": "654321",
}
)
@@ -842,7 +842,7 @@ class TestExtractCHDHash:
result = extract_chd_hash(chd_file)
assert result is not None
assert result
assert isinstance(result, str)
assert len(result) == 40 # SHA1 hex is 40 characters
assert result == "0123456789abcdef0123456789abcdef01234567"
@@ -859,7 +859,7 @@ class TestExtractCHDHash:
result = extract_chd_hash(chd_file)
assert result is None
assert result == ""
def test_extract_chd_hash_v2_rejected(self, tmp_path):
"""Test that CHD v2 files are rejected"""
@@ -873,7 +873,7 @@ class TestExtractCHDHash:
result = extract_chd_hash(chd_file)
assert result is None
assert result == ""
def test_extract_chd_hash_v3_rejected(self, tmp_path):
"""Test that CHD v3 files are rejected"""
@@ -887,7 +887,7 @@ class TestExtractCHDHash:
result = extract_chd_hash(chd_file)
assert result is None
assert result == ""
def test_extract_chd_hash_v4_rejected(self, tmp_path):
"""Test that CHD v4 files are rejected"""
@@ -901,7 +901,7 @@ class TestExtractCHDHash:
result = extract_chd_hash(chd_file)
assert result is None
assert result == ""
def test_extract_chd_hash_invalid_magic(self, tmp_path):
"""Test that files without CHD magic signature are rejected"""
@@ -915,7 +915,7 @@ class TestExtractCHDHash:
result = extract_chd_hash(chd_file)
assert result is None
assert result == ""
def test_extract_chd_hash_truncated_header(self, tmp_path):
"""Test that CHD v5 file with truncated header is rejected"""
@@ -930,7 +930,7 @@ class TestExtractCHDHash:
result = extract_chd_hash(chd_file)
assert result is None
assert result == ""
def test_extract_chd_hash_nonexistent_file(self, tmp_path):
"""Test that non-existent files are handled gracefully"""
@@ -938,7 +938,7 @@ class TestExtractCHDHash:
result = extract_chd_hash(nonexistent)
assert result is None
assert result == ""
def test_extract_chd_hash_empty_file(self, tmp_path):
"""Test that empty files are rejected"""
@@ -947,7 +947,7 @@ class TestExtractCHDHash:
result = extract_chd_hash(chd_file)
assert result is None
assert result == ""
def test_extract_chd_hash_sha1_format(self, tmp_path):
"""Test that SHA1 hash is correctly formatted as hex"""
@@ -983,7 +983,7 @@ class TestExtractCHDHash:
result = extract_chd_hash(chd_file)
assert result is None
assert result == ""
def test_extract_chd_hash_multiple_different_hashes(self, tmp_path):
"""Test that different SHA1 hashes are correctly extracted"""
@@ -1011,11 +1011,11 @@ class TestExtractCHDHash:
def test_extract_chd_hash_version_boundary_cases(self, tmp_path):
"""Test version checking at boundaries (0, 1, 4, 5, 6)"""
test_versions = [
(0, None), # Version 0 should return None
(1, None), # Version 1 should return None
(4, None), # Version 4 should return None
(0, ""), # Version 0 should return ""
(1, ""), # Version 1 should return ""
(4, ""), # Version 4 should return ""
(5, "0123456789abcdef0123456789abcdef01234567"), # Version 5 should work
(6, None), # Version 6 should return None
(6, ""), # Version 6 should return ""
]
for version, expected in test_versions:
@@ -1030,10 +1030,7 @@ class TestExtractCHDHash:
result = extract_chd_hash(chd_file)
if expected is None:
assert result is None, f"Version {version} should return None"
else:
assert result == expected, f"Version {version} should return {expected}"
assert result == expected, f"Version {version} should return {expected!r}"
def test_extract_chd_hash_file_too_short_for_magic(self, tmp_path):
"""Test file that's too short to even contain magic + version"""
@@ -1047,7 +1044,7 @@ class TestExtractCHDHash:
result = extract_chd_hash(chd_file)
assert result is None
assert result == ""
def test_extract_chd_hash_permission_error(self, tmp_path):
"""Test graceful handling of permission errors"""
@@ -1064,7 +1061,7 @@ class TestExtractCHDHash:
try:
result = extract_chd_hash(chd_file)
assert result is None
assert result == ""
finally:
# Restore permissions for cleanup
chd_file.chmod(0o644)
@@ -1109,7 +1106,7 @@ class TestExtractCHDHash:
# Expected SHA1 from the header at bytes 84-103 (20 bytes, as per chd.h)
expected_sha1 = "0167fc76f9e4312e6ab48fe980d2ce5b23f775c2"
assert result is not None
assert result
assert result == expected_sha1
assert len(result) == 40
# Verify it matches what's in the header
@@ -1136,14 +1133,14 @@ class TestExtractCHDHash:
result = extract_chd_hash(chd_file)
assert result is not None
assert result
assert result == "0167fc76f9e4312e6ab48fe980d2ce5b23f775c2"
assert bytes.fromhex(result) == test_sha1
def test_extract_chd_hash_off_by_one_header_sizes(self, tmp_path):
"""Test boundary conditions around minimum required header size (104 bytes)"""
test_cases = [
(103, None), # 103 bytes - not enough for SHA1 region
(103, ""), # 103 bytes - not enough for SHA1 region
(
104,
"0167fc76f9e4312e6ab48fe980d2ce5b23f775c2",
@@ -1186,7 +1183,7 @@ class TestExtractCHDHash:
result = extract_chd_hash(chd_file)
# Should return None because version is not 5
assert result is None
assert result == ""
def test_extract_chd_hash_zero_sha1(self, tmp_path):
"""Test handling of all-zero SHA1 hash (edge case but valid)"""

View File

@@ -20,7 +20,7 @@ const romInfo = ref([
{ label: "Size", value: formatBytes(props.rom.fs_size_bytes) },
{ label: "SHA-1", value: props.rom.sha1_hash },
{
label: "Disc SHA-1",
label: "CHD SHA-1",
value: props.rom.has_simple_single_file
? (props.rom.files[0]?.chd_sha1_hash ?? null)
: null,

View File

@@ -21,7 +21,7 @@ const fileInfo = ref([
: null,
},
{
label: "Disc SHA-1",
label: "CHD SHA-1",
value: props.item.chd_sha1_hash
? props.item.chd_sha1_hash.substring(0, 6) +
"..." +