"""MSL file reader with mmap-backed, endianness-aware parsing."""
import logging
import mmap
import struct
from pathlib import Path
from typing import Iterator, List, Optional, Tuple
from uuid import UUID
from .block_iter import merge_continuations as _merge_cont
from .compress import decompress
from .decoders import (decode_connection_table, decode_connectivity_table,
decode_end_of_capture, decode_handle_table,
decode_import_provenance, decode_key_hint,
decode_memory_region, decode_module_entry,
decode_module_list_index, decode_process_identity,
decode_process_table, decode_related_dump,
decode_vas_map)
from .decoders_ext import (decode_environment_block, decode_file_descriptor,
decode_network_connection, decode_security_token,
decode_system_context, decode_thread_context)
from .enums import (BLOCK_HEADER_SIZE, BLOCK_MAGIC, BlockType, Endianness,
FILE_HEADER_SIZE, FILE_MAGIC)
from .types import (MslBlockHeader, MslConnectionTable, MslConnectivityTable,
MslEncryptedError, MslEndOfCapture, MslFileHeader,
MslHandleTable, MslImportProvenance, MslKeyHint,
MslMemoryRegion, MslModuleEntry, MslModuleListIndex,
MslParseError, MslProcessIdentity, MslProcessTable,
MslRelatedDump, MslVasMap)
# Module-level logger; the channel name is fixed explicitly rather than
# derived from ``__name__``.
logger = logging.getLogger("memdiver.msl.reader")
# --- MSL reader ---
class MslReader:
    """Memory-mapped MSL file reader (context manager).

    Typical use::

        with MslReader(path) as reader:
            regions = reader.collect_regions()

    ``open()`` maps the file read-only and parses the file header;
    ``close()`` releases the mapping and the file handle and clears every
    per-block-type cache.  The ``collect_*`` methods decode all blocks of
    one type and cache the resulting list until ``close()``.
    """

    # Names of the per-block-type result caches.  Every name listed here is
    # initialised to ``None`` in ``__init__`` and reset in ``close()``.
    _CACHE_ATTRS = (
        "_regions_cache", "_hints_cache", "_modules_cache",
        "_process_identity_cache", "_vas_cache",
        "_related_dumps_cache", "_end_of_capture_cache",
        "_import_provenance_cache",
        # New spec-defined table decoders (Phase MSL-Decoders-02)
        "_module_list_index_cache", "_processes_cache",
        "_connections_cache", "_handles_cache",
        "_connectivity_tables_cache",
        # Ext decoders (wired but speculative layouts; see decoders_ext.py)
        "_thread_contexts_cache", "_file_descriptors_cache",
        "_network_connections_cache", "_environment_blocks_cache",
        "_security_tokens_cache", "_system_context_cache",
    )

    def __init__(self, path: Path):
        """Create a reader for *path* without touching the file yet.

        Args:
            path: Location of the MSL file.  A plain string is accepted and
                converted to :class:`~pathlib.Path`.
        """
        self.path = path if isinstance(path, Path) else Path(path)
        self._file = None
        self._mmap: Optional[mmap.mmap] = None
        self._file_header: Optional[MslFileHeader] = None
        # struct byte-order prefix; replaced once the file header is parsed.
        self._byte_order: str = "<"
        for attr in self._CACHE_ATTRS:
            setattr(self, attr, None)

    def open(self) -> None:
        """Open and memory-map the file, then parse its file header.

        Calling ``open()`` on an already-open reader releases the previous
        handle and mapping first.  If anything fails part-way (file too
        small, mmap failure, bad or encrypted header) the partially acquired
        resources are released before the exception propagates.

        Raises:
            MslParseError: File shorter than ``FILE_HEADER_SIZE`` or the
                header is malformed.
            MslEncryptedError: The header marks the file as encrypted.
            OSError: The file cannot be opened or mapped.
        """
        if self._file is not None or self._mmap is not None:
            self.close()
        # Never expose a header that belongs to a previous mapping.
        self._file_header = None
        try:
            self._file = open(self.path, "rb")
            size = self.path.stat().st_size
            if size < FILE_HEADER_SIZE:
                raise MslParseError(f"File too small: {size} bytes")
            self._mmap = mmap.mmap(
                self._file.fileno(), 0, access=mmap.ACCESS_READ
            )
            self._file_header = self._parse_file_header()
        except BaseException:
            # __exit__ is not invoked when __enter__ raises, so clean up
            # here to avoid leaking the descriptor and the mapping.
            self.close()
            raise

    def close(self) -> None:
        """Drop all caches and release the mapping and file handle.

        Safe to call repeatedly or on a reader that was never opened.  The
        parsed file header is left in place.
        """
        for attr in self._CACHE_ATTRS:
            setattr(self, attr, None)
        mapped, self._mmap = self._mmap, None
        handle, self._file = self._file, None
        try:
            if mapped is not None:
                mapped.close()
        finally:
            # Close the file even when closing the mapping raised.
            if handle is not None:
                handle.close()

    def __enter__(self):
        """Open the reader and return it."""
        self.open()
        return self

    def __exit__(self, *args):
        """Close the reader; exceptions from the ``with`` body propagate."""
        self.close()

    @property
    def file_header(self) -> MslFileHeader:
        """Parsed file header.

        Raises:
            MslParseError: No header has been parsed yet.
        """
        if self._file_header is None:
            raise MslParseError("Reader not opened")
        return self._file_header

    def _require_mmap(self) -> mmap.mmap:
        """Return the active mapping or raise ``MslParseError``."""
        buf = self._mmap
        if buf is None:
            raise MslParseError("Reader not opened")
        return buf

    def _parse_file_header(self) -> MslFileHeader:
        """Decode the fixed-layout file header at the start of the mapping.

        Sets ``self._byte_order`` from the endianness byte as a side effect,
        so every later multi-byte read uses the file's byte order.

        Raises:
            MslParseError: Wrong magic or unknown endianness marker.
            MslEncryptedError: The decoded header reports encryption.
        """
        buf = self._mmap
        if buf[0:8] != FILE_MAGIC:
            raise MslParseError(f"Bad magic: {bytes(buf[0:8])!r}")
        endianness = buf[8]
        if endianness not in (Endianness.LITTLE, Endianness.BIG):
            raise MslParseError(f"Invalid endianness: 0x{endianness:02X}")
        self._byte_order = "<" if endianness == Endianness.LITTLE else ">"
        bo = self._byte_order
        header_size = buf[9]
        version = struct.unpack_from(f"{bo}H", buf, 0x0A)[0]
        flags = struct.unpack_from(f"{bo}I", buf, 0x0C)[0]
        cap_bitmap = struct.unpack_from(f"{bo}Q", buf, 0x10)[0]
        dump_uuid = UUID(bytes=bytes(buf[0x18:0x28]))
        timestamp_ns = struct.unpack_from(f"{bo}Q", buf, 0x28)[0]
        os_type = struct.unpack_from(f"{bo}H", buf, 0x30)[0]
        arch_type = struct.unpack_from(f"{bo}H", buf, 0x32)[0]
        pid = struct.unpack_from(f"{bo}I", buf, 0x34)[0]
        clock_source = buf[0x38]
        hdr = MslFileHeader(
            endianness=endianness, header_size=header_size,
            # The 16-bit version packs major in the high byte, minor in the low.
            version_major=(version >> 8) & 0xFF,
            version_minor=version & 0xFF,
            flags=flags, cap_bitmap=cap_bitmap,
            dump_uuid=dump_uuid, timestamp_ns=timestamp_ns,
            os_type=os_type, arch_type=arch_type,
            pid=pid, clock_source=clock_source,
        )
        if hdr.encrypted:
            raise MslEncryptedError(
                "Encrypted MSL files not supported. Decrypt first."
            )
        return hdr

    def _parse_block_header(self, offset: int) -> MslBlockHeader:
        """Decode the block header that starts at *offset*.

        Raises:
            MslParseError: Reader not opened, fewer than
                ``BLOCK_HEADER_SIZE`` bytes remain at *offset*, or the block
                magic does not match.
        """
        buf = self._require_mmap()
        # Guard up front so a short tail surfaces as MslParseError instead of
        # struct.error / ValueError from the field decoders below.
        if offset < 0 or offset + BLOCK_HEADER_SIZE > len(buf):
            raise MslParseError(f"Truncated block header at 0x{offset:X}")
        if buf[offset:offset + 4] != BLOCK_MAGIC:
            raise MslParseError(f"Bad block magic at 0x{offset:X}")
        bo = self._byte_order
        return MslBlockHeader(
            block_type=struct.unpack_from(f"{bo}H", buf, offset + 4)[0],
            flags=struct.unpack_from(f"{bo}H", buf, offset + 6)[0],
            block_length=struct.unpack_from(f"{bo}I", buf, offset + 8)[0],
            payload_version=struct.unpack_from(f"{bo}H", buf, offset + 0x0C)[0],
            block_uuid=UUID(bytes=bytes(buf[offset + 0x10:offset + 0x20])),
            parent_uuid=UUID(bytes=bytes(buf[offset + 0x20:offset + 0x30])),
            prev_hash=bytes(buf[offset + 0x30:offset + 0x50]),
            file_offset=offset,
            payload_offset=offset + BLOCK_HEADER_SIZE,
        )

    def _iter_raw_blocks(self) -> Iterator[Tuple[MslBlockHeader, bytes]]:
        """Iterate all blocks without continuation merging.

        Starts right after the file header and yields ``(header, payload)``
        for each block, decompressing the payload when the header reports
        compression.  Iteration ends quietly at the first offset that does
        not carry the block magic, and with a warning at a block whose
        declared length runs past the end of the file.

        Raises:
            MslParseError: Reader not opened, or a block declares a length
                smaller than ``BLOCK_HEADER_SIZE``.
        """
        offset = self.file_header.header_size
        buf = self._require_mmap()
        file_size = len(buf)
        while offset + BLOCK_HEADER_SIZE <= file_size:
            if buf[offset:offset + 4] != BLOCK_MAGIC:
                break
            hdr = self._parse_block_header(offset)
            if hdr.block_length < BLOCK_HEADER_SIZE:
                # A length below the header size would stall or rewind the walk.
                raise MslParseError(f"Invalid block length at 0x{offset:X}")
            end = offset + hdr.block_length
            if end > file_size:
                logger.warning("Truncated block at 0x%X", offset)
                break
            raw = bytes(buf[hdr.payload_offset:end])
            if hdr.compressed:
                raw = decompress(raw, hdr.comp_algo)
            yield hdr, raw
            offset = end

    def iter_blocks(self, merge_cont: bool = True) -> Iterator[Tuple[MslBlockHeader, bytes]]:
        """Iterate blocks; merges continuation blocks when *merge_cont* is True."""
        raw = self._iter_raw_blocks()
        return _merge_cont(raw) if merge_cont else raw

    def read_bytes(self, offset: int, length: int) -> bytes:
        """Read raw bytes from the mmap at given offset.

        The read is clamped to the end of the mapping, so fewer than
        *length* bytes may come back.  Returns ``b""`` when the reader is
        not open, when *offset* is negative, or when *length* is not
        positive (negative values would otherwise be interpreted as
        end-relative slice indices and return unrelated data).
        """
        buf = self._mmap
        if buf is None or offset < 0 or length <= 0:
            return b""
        end = min(offset + length, len(buf))
        return bytes(buf[offset:end])

    def read_block_payload(self, hdr: MslBlockHeader) -> bytes:
        """Read and decompress a block's payload bytes.

        The payload span is clamped to the end of the mapping.  Returns
        ``b""`` when the reader is not open.
        """
        if self._mmap is None:
            return b""
        end = min(hdr.file_offset + hdr.block_length, len(self._mmap))
        raw = bytes(self._mmap[hdr.payload_offset:end])
        if hdr.compressed:
            raw = decompress(raw, hdr.comp_algo)
        return raw

    def _collect(self, block_type, decoder, cache_attr):
        """Decode every block of *block_type*, caching the list on first use.

        Args:
            block_type: Value compared against each header's ``block_type``.
            decoder: Callable invoked as ``decoder(header, payload, byte_order)``.
            cache_attr: Name of the instance attribute holding the cache.

        Returns:
            The cached list itself (not a copy).
        """
        cached = getattr(self, cache_attr)
        if cached is None:
            cached = [decoder(h, p, self._byte_order)
                      for h, p in self.iter_blocks() if h.block_type == block_type]
            setattr(self, cache_attr, cached)
        return cached

    def collect_regions(self) -> List[MslMemoryRegion]:
        """Return decoded MEMORY_REGION blocks (cached after the first call)."""
        return self._collect(BlockType.MEMORY_REGION, decode_memory_region, "_regions_cache")

    def collect_key_hints(self) -> List[MslKeyHint]:
        """Return decoded KEY_HINT blocks (cached after the first call)."""
        return self._collect(BlockType.KEY_HINT, decode_key_hint, "_hints_cache")

    def collect_modules(self) -> List[MslModuleEntry]:
        """Return decoded MODULE_ENTRY blocks (cached after the first call)."""
        return self._collect(BlockType.MODULE_ENTRY, decode_module_entry, "_modules_cache")

    def collect_process_identity(self) -> List[MslProcessIdentity]:
        """Return decoded PROCESS_IDENTITY blocks (cached after the first call)."""
        return self._collect(BlockType.PROCESS_IDENTITY, decode_process_identity, "_process_identity_cache")

    def collect_vas_map(self) -> List[MslVasMap]:
        """Return decoded VAS_MAP blocks (cached after the first call)."""
        return self._collect(BlockType.VAS_MAP, decode_vas_map, "_vas_cache")

    def collect_related_dumps(self) -> List[MslRelatedDump]:
        """Return decoded RELATED_DUMP blocks (cached after the first call).

        Fills the ``_related_dumps_cache`` slot that is declared in
        ``_CACHE_ATTRS`` using the imported ``decode_related_dump``.
        """
        # NOTE(review): enum member name assumed to follow the same
        # decode_<name> -> BlockType.<NAME> pattern as every other collector;
        # confirm against enums.BlockType.
        return self._collect(BlockType.RELATED_DUMP, decode_related_dump, "_related_dumps_cache")

    def collect_end_of_capture(self) -> List[MslEndOfCapture]:
        """Return decoded END_OF_CAPTURE blocks (cached after the first call)."""
        return self._collect(BlockType.END_OF_CAPTURE, decode_end_of_capture, "_end_of_capture_cache")

    def collect_import_provenance(self) -> List[MslImportProvenance]:
        """Return decoded IMPORT_PROVENANCE blocks (cached after the first call)."""
        return self._collect(BlockType.IMPORT_PROVENANCE, decode_import_provenance, "_import_provenance_cache")

    # -- New spec-defined table block collectors --

    def collect_module_list_index(self) -> List[MslModuleListIndex]:
        """Return decoded MODULE_LIST_INDEX blocks (cached after the first call)."""
        return self._collect(BlockType.MODULE_LIST_INDEX, decode_module_list_index, "_module_list_index_cache")

    def collect_processes(self) -> List[MslProcessTable]:
        """Return decoded PROCESS_TABLE blocks (cached after the first call)."""
        return self._collect(BlockType.PROCESS_TABLE, decode_process_table, "_processes_cache")

    def collect_connections(self) -> List[MslConnectionTable]:
        """Return decoded CONNECTION_TABLE blocks (cached after the first call)."""
        return self._collect(BlockType.CONNECTION_TABLE, decode_connection_table, "_connections_cache")

    def collect_handles(self) -> List[MslHandleTable]:
        """Return decoded HANDLE_TABLE blocks (cached after the first call)."""
        return self._collect(BlockType.HANDLE_TABLE, decode_handle_table, "_handles_cache")

    def collect_connectivity_tables(self) -> List[MslConnectivityTable]:
        """Return decoded CONNECTIVITY_TABLE blocks (cached after the first call)."""
        return self._collect(
            BlockType.CONNECTIVITY_TABLE,
            decode_connectivity_table,
            "_connectivity_tables_cache",
        )

    # -- Ext decoder collectors (speculative layouts; see decoders_ext.py) --

    def collect_thread_contexts(self) -> list:
        """Return decoded THREAD_CONTEXT blocks (cached after the first call)."""
        return self._collect(BlockType.THREAD_CONTEXT, decode_thread_context, "_thread_contexts_cache")

    def collect_file_descriptors(self) -> list:
        """Return decoded FILE_DESCRIPTOR blocks (cached after the first call)."""
        return self._collect(BlockType.FILE_DESCRIPTOR, decode_file_descriptor, "_file_descriptors_cache")

    def collect_network_connections(self) -> list:
        """Return decoded NETWORK_CONNECTION blocks (cached after the first call)."""
        return self._collect(BlockType.NETWORK_CONNECTION, decode_network_connection, "_network_connections_cache")

    def collect_environment_blocks(self) -> list:
        """Return decoded ENVIRONMENT_BLOCK blocks (cached after the first call)."""
        return self._collect(BlockType.ENVIRONMENT_BLOCK, decode_environment_block, "_environment_blocks_cache")

    def collect_security_tokens(self) -> list:
        """Return decoded SECURITY_TOKEN blocks (cached after the first call)."""
        return self._collect(BlockType.SECURITY_TOKEN, decode_security_token, "_security_tokens_cache")

    def collect_system_context(self) -> list:
        """Collect SYSTEM_CONTEXT (0x0050) blocks per spec §6.2 Table 20."""
        return self._collect(BlockType.SYSTEM_CONTEXT, decode_system_context, "_system_context_cache")