Merge pull request #2678 from sftwninja/fix/use-chd-v5-internal-hash

fix: Use internal SHA1 hash if CHD file is v5
This commit is contained in:
Georges-Antoine Assi
2025-11-17 10:38:48 -05:00
committed by GitHub
2 changed files with 647 additions and 2 deletions

View File

@@ -9,7 +9,7 @@ import zipfile
import zlib
from collections.abc import Callable, Iterator
from pathlib import Path
from typing import IO, Any, Final, Literal, TypedDict
from typing import IO, Any, Final, Literal, TypedDict, cast
import magic
import zipfile_inflate64 # trunk-ignore(ruff/F401): Patches zipfile to support Enhanced Deflate
@@ -58,6 +58,18 @@ COMPRESSED_FILE_EXTENSIONS = frozenset(
)
)
# CHD (Compressed Hunks of Data) v5 format constants
# See: https://github.com/mamedev/mame/blob/master/src/lib/util/chd.h
CHD_SIGNATURE: Final = b"MComprHD"
CHD_SIGNATURE_LENGTH: Final = 8
CHD_MIN_HEADER_LENGTH: Final = 16 # Minimum to read signature and version
CHD_V5_HEADER_LENGTH: Final = 124 # Total v5 header size
CHD_VERSION_OFFSET: Final = 12 # Bytes offset for version field
CHD_VERSION_LENGTH: Final = 4 # Version is a uint32
CHD_V5_SHA1_OFFSET: Final = 84 # Combined raw+meta SHA1 offset in v5
CHD_V5_SHA1_LENGTH: Final = 20 # SHA1 is 20 bytes
CHD_V5_VERSION: Final = 5 # CHD v5 identifier
NON_HASHABLE_PLATFORMS = frozenset(
(
UPS.AMAZON_ALEXA,
@@ -182,6 +194,92 @@ def read_bz2_file(file_path: Path) -> Iterator[bytes]:
yield chunk
def extract_chd_hash(file_path: Path) -> str | None:
"""
Extract the embedded SHA1 hash from a CHD (Compressed Hunks of Data) v5 file header.
Only CHD v5 files are supported, matching MAMERedump's database.
CHD v5 files store the combined raw+meta SHA1 hash in the header.
This hash is what ROM databases use for CHD identification, since it includes
metadata like CD track layouts which are essential for proper disc image
identification.
For reference, check out "chd.h" in the MAME source tree.
---------------------------------- Why? ----------------------------------
CHDMAN does not produce nor guarantee stable, byte-for-byte identical
outputs for a given disc image. (Including HD images.)
For this reason, the CHD format embeds the original source data hash in
its header, allowing different CHD files to be verified as equivalent
even when their compressed representations differ.
--------------------------------------------------------------------------
Args:
file_path: Path to the CHD file
Returns:
SHA1 hash as hex string, or None if file is not a valid CHD v5 file or parsing fails
"""
try:
with open(file_path, "rb") as f:
# Read the v5 header and extract the embedded SHA1
header = f.read(CHD_V5_HEADER_LENGTH)
# Check for "MComprHD" signature
if (
len(header) < CHD_MIN_HEADER_LENGTH
or header[:CHD_SIGNATURE_LENGTH] != CHD_SIGNATURE
):
return None
# Extract and verify version (big-endian uint32)
version_end = CHD_VERSION_OFFSET + CHD_VERSION_LENGTH
version = int.from_bytes(header[CHD_VERSION_OFFSET:version_end], "big")
# Only support v5 CHD files
if version != CHD_V5_VERSION:
return None
# Extract combined raw+meta SHA1 from v5 header
sha1_end = CHD_V5_SHA1_OFFSET + CHD_V5_SHA1_LENGTH
if len(header) < sha1_end:
return None
sha1_bytes = header[CHD_V5_SHA1_OFFSET:sha1_end]
return sha1_bytes.hex()
except OSError:
return None
class CHDHashWrapper:
"""
Wrapper class that mimics hashlib hash objects but returns a pre-computed hash.
This class provides a hashlib-compatible interface for pre-computed hashes
extracted from CHD v5 file headers. It implements the same methods and attributes
as hashlib hash objects (digest(), hexdigest(), update(), and name).
"""
def __init__(self, hash_hex: str, name: str):
self.hash_hex = hash_hex
self.name = name
# Store the digest as bytes
self._digest = bytes.fromhex(hash_hex)
def hexdigest(self) -> str:
"""Return the hash as a hexadecimal string."""
return self.hash_hex
def digest(self) -> bytes:
"""Return the hash as bytes."""
return self._digest
def update(self, data: bytes | bytearray) -> None:
"""No-op update method for compatibility with hashlib interface."""
pass
def category_matches(category: str, path_parts: list[str]):
return category in path_parts or f"{category}s" in path_parts
@@ -504,6 +602,17 @@ class FSRomsHandler(FSHandler):
for chunk in read_bz2_file(file_path):
update_hashes(chunk)
elif extension == ".chd" or file_type == "application/x-mame-chd":
chd_hash = extract_chd_hash(file_path)
if chd_hash:
sha1_h = cast(Any, CHDHashWrapper(chd_hash, name="sha1"))
rom_sha1_h = cast(Any, CHDHashWrapper(chd_hash, name="sha1"))
else:
# Not a valid v5 CHD, treat as basic file
# This ensures CRC32 and MD5 are still calculated for non-v5 CHDs
for chunk in read_basic_file(file_path):
update_hashes(chunk)
else:
for chunk in read_basic_file(file_path):
update_hashes(chunk)

View File

@@ -1,11 +1,17 @@
import os
import shutil
from pathlib import Path
from unittest.mock import Mock
import pytest
from config.config_manager import LIBRARY_BASE_PATH, Config
from handler.filesystem.roms_handler import FileHash, FSRomsHandler
from handler.filesystem.roms_handler import (
CHDHashWrapper,
FileHash,
FSRomsHandler,
extract_chd_hash,
)
from models.platform import Platform
from models.rom import Rom, RomFile, RomFileCategory
@@ -623,3 +629,533 @@ class TestFSRomsHandler:
assert (
rom_sha1 != translation_rom_file.sha1_hash
), "Main ROM hash should not include translation file"
@pytest.mark.asyncio
async def test_get_rom_files_with_chd_v5_uses_internal_hash(
self, handler: FSRomsHandler, platform, tmp_path
):
"""Test that a CHD v5 file uses its internal hash and skips other hashing.
This integration test verifies the complete CHD v5 hashing logic:
1. For valid CHD v5 files, the embedded SHA1 hash from the file header is used
2. CRC32 and MD5 hashes are NOT calculated from file contents
3. The file is not double-processed by read_basic_file
4. This prevents regressions in the if/elif archive type chain
"""
# Create a mock CHD v5 file in a temporary directory
chd_file = tmp_path / "test.chd"
header = bytearray(124)
header[0:8] = b"MComprHD"
header[12:16] = int(5).to_bytes(4, "big")
internal_sha1 = "0123456789abcdef0123456789abcdef01234567"
header[84:104] = bytes.fromhex(internal_sha1)
chd_file.write_bytes(
header + b"This is extra file data to ensure file is not empty"
)
# Set up handler and rom object to point to the mock file
roms_path = tmp_path / platform.fs_slug / "roms"
roms_path.mkdir(parents=True)
shutil.copy(chd_file, roms_path / "test.chd")
# Create a new handler instance with temp base path
test_handler = FSRomsHandler()
test_handler.base_path = tmp_path
rom = Rom(
id=1,
fs_name="test.chd",
fs_path=str(roms_path.relative_to(tmp_path)),
platform=platform,
)
# Run the hashing process
rom_files, crc_hash, md5_hash, sha1_hash, _ = await test_handler.get_rom_files(
rom
)
# Assert that only SHA1 is populated, and it's from the header
assert len(rom_files) == 1
assert sha1_hash == internal_sha1, "SHA1 should be from CHD v5 header"
assert rom_files[0].sha1_hash == internal_sha1
# CRC32 and MD5 should be empty/zero (not calculated)
assert crc_hash == "", f"CRC hash should be empty, got: {crc_hash}"
assert md5_hash == "", f"MD5 hash should be empty, got: {md5_hash}"
assert rom_files[0].crc_hash == ""
assert rom_files[0].md5_hash == ""
@pytest.mark.asyncio
async def test_get_rom_files_with_non_v5_chd_fallback_to_std_hashing(
self, handler: FSRomsHandler, platform, tmp_path
):
"""Test that non-v5 CHD files fall back to standard file hashing.
This ensures backward compatibility: if a .chd file is not version 5
or doesn't have a valid v5 header, it should be treated as a regular
file and all hashes (CRC32, MD5, SHA1) are calculated from content.
"""
# Create a CHD v4 file (should not use internal hash logic)
chd_file = tmp_path / "old_format.chd"
header = bytearray(124)
header[0:8] = b"MComprHD"
header[12:16] = int(4).to_bytes(4, "big") # Version 4, not 5
# Add some content
content = header + b"This is CHD v4 data that should be hashed as a normal file"
chd_file.write_bytes(content)
# Set up handler and rom object
roms_path = tmp_path / platform.fs_slug / "roms"
roms_path.mkdir(parents=True)
shutil.copy(chd_file, roms_path / "old_format.chd")
test_handler = FSRomsHandler()
test_handler.base_path = tmp_path
rom = Rom(
id=1,
fs_name="old_format.chd",
fs_path=str(roms_path.relative_to(tmp_path)),
platform=platform,
)
# Run the hashing process
rom_files, crc_hash, md5_hash, sha1_hash, _ = await test_handler.get_rom_files(
rom
)
# All hashes should be populated (calculated from file content)
assert len(rom_files) == 1
assert crc_hash != "", "CRC hash should be calculated for non-v5 CHD"
assert md5_hash != "", "MD5 hash should be calculated for non-v5 CHD"
assert sha1_hash != "", "SHA1 hash should be calculated for non-v5 CHD"
# Verify they're actual hash values (not from an internal header)
assert rom_files[0].crc_hash == crc_hash
assert rom_files[0].md5_hash == md5_hash
assert rom_files[0].sha1_hash == sha1_hash
class TestExtractCHDHash:
"""Test suite for extract_chd_hash function"""
def test_extract_chd_hash_v5_valid(self, tmp_path):
"""Test extracting hash from a valid CHD v5 file"""
chd_file = tmp_path / "test_v5.chd"
# CHD v5 header structure (124 bytes minimum):
# Bytes 0-7: "MComprHD" magic signature
# Bytes 12-15: Version (5 in big-endian)
# Bytes 84-103: SHA1 hash (20 bytes)
header = bytearray(124)
header[0:8] = b"MComprHD"
header[12:16] = int(5).to_bytes(4, "big")
# Use a test SHA1 hash
header[84:104] = bytes.fromhex("0123456789abcdef0123456789abcdef01234567")
chd_file.write_bytes(header)
result = extract_chd_hash(chd_file)
assert result is not None
assert isinstance(result, str)
assert len(result) == 40 # SHA1 hex is 40 characters
assert result == "0123456789abcdef0123456789abcdef01234567"
def test_extract_chd_hash_v1_rejected(self, tmp_path):
"""Test that CHD v1 files are rejected"""
chd_file = tmp_path / "test_v1.chd"
header = bytearray(124)
header[0:8] = b"MComprHD"
header[12:16] = int(1).to_bytes(4, "big") # Version 1
chd_file.write_bytes(header)
result = extract_chd_hash(chd_file)
assert result is None
def test_extract_chd_hash_v2_rejected(self, tmp_path):
"""Test that CHD v2 files are rejected"""
chd_file = tmp_path / "test_v2.chd"
header = bytearray(124)
header[0:8] = b"MComprHD"
header[12:16] = int(2).to_bytes(4, "big") # Version 2
chd_file.write_bytes(header)
result = extract_chd_hash(chd_file)
assert result is None
def test_extract_chd_hash_v3_rejected(self, tmp_path):
"""Test that CHD v3 files are rejected"""
chd_file = tmp_path / "test_v3.chd"
header = bytearray(124)
header[0:8] = b"MComprHD"
header[12:16] = int(3).to_bytes(4, "big") # Version 3
chd_file.write_bytes(header)
result = extract_chd_hash(chd_file)
assert result is None
def test_extract_chd_hash_v4_rejected(self, tmp_path):
"""Test that CHD v4 files are rejected"""
chd_file = tmp_path / "test_v4.chd"
header = bytearray(124)
header[0:8] = b"MComprHD"
header[12:16] = int(4).to_bytes(4, "big") # Version 4
chd_file.write_bytes(header)
result = extract_chd_hash(chd_file)
assert result is None
def test_extract_chd_hash_invalid_magic(self, tmp_path):
"""Test that files without CHD magic signature are rejected"""
chd_file = tmp_path / "invalid_magic.bin"
header = bytearray(124)
header[0:8] = b"BadMagic" # Not "MComprHD"
header[12:16] = int(5).to_bytes(4, "big")
chd_file.write_bytes(header)
result = extract_chd_hash(chd_file)
assert result is None
def test_extract_chd_hash_truncated_header(self, tmp_path):
"""Test that CHD v5 file with truncated header is rejected"""
chd_file = tmp_path / "truncated.chd"
# Only write 100 bytes instead of required 124
header = bytearray(100)
header[0:8] = b"MComprHD"
header[12:16] = int(5).to_bytes(4, "big")
chd_file.write_bytes(header)
result = extract_chd_hash(chd_file)
assert result is None
def test_extract_chd_hash_nonexistent_file(self, tmp_path):
"""Test that non-existent files are handled gracefully"""
nonexistent = tmp_path / "does_not_exist.chd"
result = extract_chd_hash(nonexistent)
assert result is None
def test_extract_chd_hash_empty_file(self, tmp_path):
"""Test that empty files are rejected"""
chd_file = tmp_path / "empty.chd"
chd_file.write_bytes(b"")
result = extract_chd_hash(chd_file)
assert result is None
def test_extract_chd_hash_sha1_format(self, tmp_path):
"""Test that SHA1 hash is correctly formatted as hex"""
chd_file = tmp_path / "test_format.chd"
header = bytearray(124)
header[0:8] = b"MComprHD"
header[12:16] = int(5).to_bytes(4, "big")
# Use a known SHA1 value
test_sha1 = bytes.fromhex("356a192b7913b04c54574d18c28d46e6395428ab")
header[84:104] = test_sha1
chd_file.write_bytes(header)
result = extract_chd_hash(chd_file)
assert result == "356a192b7913b04c54574d18c28d46e6395428ab"
# Verify it's lowercase hex
assert result == result.lower()
# Verify it's 40 characters (SHA1 is 20 bytes = 40 hex chars)
assert len(result) == 40
def test_extract_chd_hash_with_wrapper(self, tmp_path):
"""Test that extracted hash integrates properly with CHDHashWrapper"""
chd_file = tmp_path / "test_wrapper.chd"
header = bytearray(124)
header[0:8] = b"MComprHD"
header[12:16] = int(5).to_bytes(4, "big")
test_sha1 = bytes.fromhex("0123456789abcdef0123456789abcdef01234567")
header[84:104] = test_sha1
chd_file.write_bytes(header)
extracted_hash = extract_chd_hash(chd_file)
assert extracted_hash is not None
# Should be usable with CHDHashWrapper
wrapper = CHDHashWrapper(extracted_hash, "sha1")
assert wrapper.hexdigest() == extracted_hash
assert len(wrapper.digest()) == 20
# Verify digest bytes match the original
assert wrapper.digest() == test_sha1
def test_extract_chd_hash_unknown_version(self, tmp_path):
"""Test that unknown CHD versions are rejected"""
chd_file = tmp_path / "test_unknown.chd"
header = bytearray(124)
header[0:8] = b"MComprHD"
header[12:16] = int(99).to_bytes(4, "big") # Unknown version
chd_file.write_bytes(header)
result = extract_chd_hash(chd_file)
assert result is None
def test_extract_chd_hash_multiple_different_hashes(self, tmp_path):
"""Test that different SHA1 hashes are correctly extracted"""
test_cases = [
"0000000000000000000000000000000000000000",
"ffffffffffffffffffffffffffffffffffffffff",
"356a192b7913b04c54574d18c28d46e6395428ab",
"da39a3ee5e6b4b0d3255bfef95601890afd80709",
]
for i, test_hash in enumerate(test_cases):
chd_file = tmp_path / f"test_hash_{i}.chd"
header = bytearray(124)
header[0:8] = b"MComprHD"
header[12:16] = int(5).to_bytes(4, "big")
header[84:104] = bytes.fromhex(test_hash)
chd_file.write_bytes(header)
result = extract_chd_hash(chd_file)
assert result == test_hash, f"Hash mismatch for test case {i}"
def test_extract_chd_hash_version_boundary_cases(self, tmp_path):
"""Test version checking at boundaries (0, 1, 4, 5, 6)"""
test_versions = [
(0, None), # Version 0 should return None
(1, None), # Version 1 should return None
(4, None), # Version 4 should return None
(5, "0123456789abcdef0123456789abcdef01234567"), # Version 5 should work
(6, None), # Version 6 should return None
]
for version, expected in test_versions:
chd_file = tmp_path / f"test_v{version}.chd"
header = bytearray(124)
header[0:8] = b"MComprHD"
header[12:16] = int(version).to_bytes(4, "big")
header[84:104] = bytes.fromhex("0123456789abcdef0123456789abcdef01234567")
chd_file.write_bytes(header)
result = extract_chd_hash(chd_file)
if expected is None:
assert result is None, f"Version {version} should return None"
else:
assert result == expected, f"Version {version} should return {expected}"
def test_extract_chd_hash_file_too_short_for_magic(self, tmp_path):
"""Test file that's too short to even contain magic + version"""
chd_file = tmp_path / "too_short.chd"
# Only 8 bytes - has magic but no version
header = bytearray(8)
header[0:8] = b"MComprHD"
chd_file.write_bytes(header)
result = extract_chd_hash(chd_file)
assert result is None
def test_extract_chd_hash_permission_error(self, tmp_path):
"""Test graceful handling of permission errors"""
chd_file = tmp_path / "no_read_permission.chd"
header = bytearray(124)
header[0:8] = b"MComprHD"
header[12:16] = int(5).to_bytes(4, "big")
chd_file.write_bytes(header)
# Remove read permissions
chd_file.chmod(0o000)
try:
result = extract_chd_hash(chd_file)
assert result is None
finally:
# Restore permissions for cleanup
chd_file.chmod(0o644)
def test_extract_chd_hash_real_header(self, tmp_path):
"""Test extracting hash from real Pebble Beach Golf Links CHD v5 header
This uses the actual 128-byte header from:
Pebble Beach Golf Links - Stadler ni Chousen (Japan).chd
Header bytes (hex):
00000000: 4d43 6f6d 7072 4844 0000 007c 0000 0005 MComprHD...|....
00000010: 6364 6c7a 6364 7a6c 6364 666c 0000 0000 cdlzcdzlcdfl....
00000020: 0000 0000 1a97 4e00 0000 0000 1119 b3d0 ......N.........
00000030: 0000 0000 0000 007c 0000 4c80 0000 0990 .......|..L.....
00000040: 8389 486c 34df 316d 1fd3 3997 a3ef ce8c ..Hl4.1m..9.....
00000050: e9c9 6008 0167 fc76 f9e4 312e 6ab4 8fe9 ..`..g.v..1.j...
00000060: 80d2 ce5b 23f7 75c2 0000 0000 0000 0000 ...[#.u.........
00000070: 0000 0000 0000 0000 0000 0000 4348 5432 ............CHT2
The SHA1 hash (combined raw+meta) at bytes 84-103 is:
0167 fc76 f9e4 312e 6ab4 8fe9 80d2 ce5b 23f7 75c2
"""
chd_file = tmp_path / "Pebble Beach.chd"
# Real 128-byte header from the file
real_header = bytes.fromhex(
"4d43 6f6d 7072 4844 0000 007c 0000 0005 "
"6364 6c7a 6364 7a6c 6364 666c 0000 0000 "
"0000 0000 1a97 4e00 0000 0000 1119 b3d0 "
"0000 0000 0000 007c 0000 4c80 0000 0990 "
"8389 486c 34df 316d 1fd3 3997 a3ef ce8c "
"e9c9 6008 0167 fc76 f9e4 312e 6ab4 8fe9 "
"80d2 ce5b 23f7 75c2 0000 0000 0000 0000 "
"0000 0000 0000 0000 0000 0000 4348 5432"
)
chd_file.write_bytes(real_header)
result = extract_chd_hash(chd_file)
# Expected SHA1 from the header at bytes 84-103 (20 bytes, as per chd.h)
expected_sha1 = "0167fc76f9e4312e6ab48fe980d2ce5b23f775c2"
assert result is not None
assert result == expected_sha1
assert len(result) == 40
# Verify it matches what's in the header
assert bytes.fromhex(result) == real_header[84:104]
def test_extract_chd_hash_with_extra_metadata(self, tmp_path):
"""Test CHD v5 file with additional metadata beyond header
Real CHD files often have map data and metadata after the 124-byte header.
The hash extraction should work correctly regardless of file size.
"""
chd_file = tmp_path / "test_with_metadata.chd"
header = bytearray(124)
header[0:8] = b"MComprHD"
header[12:16] = int(5).to_bytes(4, "big")
test_sha1 = bytes.fromhex("0167fc76f9e4312e6ab48fe980d2ce5b23f775c2")
header[84:104] = test_sha1
# Write header plus extra data (simulating map and metadata)
extra_data = b"MAP_DATACOMPRESSED_DATA_GOES_HERE" * 100
chd_file.write_bytes(header + extra_data)
result = extract_chd_hash(chd_file)
assert result is not None
assert result == "0167fc76f9e4312e6ab48fe980d2ce5b23f775c2"
assert bytes.fromhex(result) == test_sha1
def test_extract_chd_hash_off_by_one_header_sizes(self, tmp_path):
"""Test boundary conditions around minimum required header size (104 bytes)"""
test_cases = [
(103, None), # 103 bytes - not enough for SHA1 region
(
104,
"0167fc76f9e4312e6ab48fe980d2ce5b23f775c2",
), # 104 bytes - exactly enough
(123, "0167fc76f9e4312e6ab48fe980d2ce5b23f775c2"), # 123 bytes
(124, "0167fc76f9e4312e6ab48fe980d2ce5b23f775c2"), # Full v5 header
(125, "0167fc76f9e4312e6ab48fe980d2ce5b23f775c2"), # Extra byte
]
for size, expected in test_cases:
chd_file = tmp_path / f"test_size_{size}.chd"
header = bytearray(size)
header[0:8] = b"MComprHD"
header[12:16] = int(5).to_bytes(4, "big")
if size >= 104:
header[84:104] = bytes.fromhex(
"0167fc76f9e4312e6ab48fe980d2ce5b23f775c2"
)
chd_file.write_bytes(header)
result = extract_chd_hash(chd_file)
assert (
result == expected
), f"Failed for size {size}: got {result}, expected {expected}"
def test_extract_chd_hash_corrupted_header_data(self, tmp_path):
"""Test handling of corrupted/invalid data in header fields"""
chd_file = tmp_path / "corrupted_header.chd"
header = bytearray(124)
header[0:8] = b"MComprHD"
# Corrupt the version field with invalid bytes
header[12:16] = b"\xff\xff\xff\xff" # This will be read as 4294967295
chd_file.write_bytes(header)
result = extract_chd_hash(chd_file)
# Should return None because version is not 5
assert result is None
def test_extract_chd_hash_zero_sha1(self, tmp_path):
"""Test handling of all-zero SHA1 hash (edge case but valid)"""
chd_file = tmp_path / "zero_hash.chd"
header = bytearray(124)
header[0:8] = b"MComprHD"
header[12:16] = int(5).to_bytes(4, "big")
# All-zero hash
header[84:104] = b"\x00" * 20
chd_file.write_bytes(header)
result = extract_chd_hash(chd_file)
assert result == "0" * 40
assert len(result) == 40
def test_extract_chd_hash_max_sha1(self, tmp_path):
"""Test handling of maximum SHA1 hash (all 0xFF - edge case but valid)"""
chd_file = tmp_path / "max_hash.chd"
header = bytearray(124)
header[0:8] = b"MComprHD"
header[12:16] = int(5).to_bytes(4, "big")
# All-FF hash
header[84:104] = b"\xff" * 20
chd_file.write_bytes(header)
result = extract_chd_hash(chd_file)
assert result == "f" * 40
assert len(result) == 40