"""DumpIngestor - load dump files with optional metadata."""
import logging
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from core.discovery import RunDiscovery, DatasetScanner, DatasetInfo
from core.dump_io import DumpReader
from core.models import RunDirectory
logger = logging.getLogger("memdiver.harvester.ingestor")
[docs]
class DumpIngestor:
"""Load and organize dump files for analysis.
Provides structured access to the dataset tree with optional
sidecar metadata enrichment.
"""
[docs]
def __init__(self, root: Path, keylog_filename: str = "keylog.csv"):
self.root = root
self.keylog_filename = keylog_filename
self._dataset_info: Optional[DatasetInfo] = None
[docs]
def scan(self) -> DatasetInfo:
"""Perform a fast scan of the dataset tree."""
scanner = DatasetScanner(self.root, self.keylog_filename)
self._dataset_info = scanner.fast_scan()
logger.info(
"Scanned dataset: %d TLS versions, %d total runs",
len(self._dataset_info.tls_versions),
self._dataset_info.total_runs,
)
return self._dataset_info
@property
def dataset_info(self) -> Optional[DatasetInfo]:
return self._dataset_info
[docs]
def load_library_runs(
self,
tls_version: str,
scenario: str,
library: str,
max_runs: int = 10,
template=None,
) -> List[RunDirectory]:
"""Load run directories for a specific library."""
lib_dir = self.root / f"TLS{tls_version}" / scenario / library
if not lib_dir.is_dir():
logger.warning("Library directory not found: %s", lib_dir)
return []
runs = RunDiscovery.discover_library_runs(
lib_dir, max_runs=max_runs,
keylog_filename=self.keylog_filename, template=template,
)
logger.info("Loaded %d runs for %s/%s/%s", len(runs), tls_version, scenario, library)
return runs
[docs]
def load_dump_data(self, dump_path: Path) -> bytes:
"""Load raw dump data from a file."""
with DumpReader(dump_path) as reader:
return reader.read_all()
[docs]
def get_dump_paths_for_phase(
self,
runs: List[RunDirectory],
phase: str,
) -> List[Path]:
"""Collect all dump file paths for a given phase across runs."""
paths = []
for run in runs:
dump = run.get_dump_for_phase(phase)
if dump:
paths.append(dump.path)
return paths
[docs]
def list_libraries(self, tls_version: str, scenario: str) -> List[str]:
"""List available libraries for a version/scenario."""
if self._dataset_info is None:
self.scan()
libs = self._dataset_info.libraries.get(scenario, set())
return sorted(libs)
[docs]
def list_scenarios(self, tls_version: str) -> List[str]:
"""List available scenarios for a TLS version."""
if self._dataset_info is None:
self.scan()
return self._dataset_info.scenarios.get(tls_version, [])