Source code for engine.pipeline

"""AnalysisPipeline - orchestrate the analysis workflow."""

import logging
from pathlib import Path
from typing import Dict, List, Optional

from core.discovery import RunDiscovery
from core.input_schemas import AnalyzeRequest
from core.keylog import KeylogParser
from core.keylog_templates import get_template
from core.models import CryptoSecret
from core.phase_normalizer import PhaseNormalizer
from .consensus import ConsensusVector
from .correlator import SearchCorrelator
from .derived_keys import DerivedKeyExpander
from .diff_store import DiffStore
from .results import AnalysisResult, LibraryReport, SecretHit

try:
    from .project_db import ProjectDB
    _HAS_PROJECT_DB = True
except ImportError:
    _HAS_PROJECT_DB = False

logger = logging.getLogger("memdiver.engine.pipeline")


def _resolve_dump(run, phase: str, normalize: bool):
    """Get the dump for *phase*, falling back to canonical matching."""
    dump = run.get_dump_for_phase(phase)
    if dump is None and normalize:
        dump = next((d for d in run.dumps if d.canonical_or_raw == phase), None)
    return dump


[docs] class AnalysisPipeline: """Orchestrate the full analysis workflow. Flow: Load dumps -> Build ConsensusVector -> Expand derived keys -> Search (mmap) -> Feed DiffStore (Polars) -> Package results. """
[docs] def __init__(self, project_db=None, auto_persist: bool = True): self.consensus = ConsensusVector() self.correlator = SearchCorrelator() self.expander = DerivedKeyExpander() self.diff_store = DiffStore() self._normalizer = PhaseNormalizer() self._project_db = project_db self._auto_persist = auto_persist
[docs] def analyze_library( self, library_dir: Path, phase: str, protocol_version: str, keylog_filename: str = "keylog.csv", max_runs: int = 10, expand_keys: bool = True, template=None, normalize: bool = False, algorithms: Optional[List[str]] = None, align_candidates: bool = False, verify_decryption: bool = False, ) -> LibraryReport: """Run full analysis on one library at one phase.""" runs = RunDiscovery.discover_library_runs( library_dir, max_runs=max_runs, keylog_filename=keylog_filename, template=template, ) if not runs: logger.warning("No runs found in %s", library_dir) return LibraryReport( library=library_dir.name, protocol_version=protocol_version, phase=phase, num_runs=0, ) library_name = runs[0].library logger.info("Analyzing %s: %d runs, phase=%s", library_name, len(runs), phase) # Apply phase normalization if requested if normalize: for run in runs: mappings = self._normalizer.normalize_run(run) for raw_phase, mapping in mappings.items(): mapping.dump_file.canonical_phase = mapping.canonical_phase # Resolve dump for each run (with optional canonical fallback) run_dumps = [] for run in runs: dump = _resolve_dump(run, phase, normalize) if dump: run_dumps.append((run, dump)) if runs and not run_dumps: logger.warning("Phase '%s' not found in any of %d runs for %s", phase, len(runs), library_name) # Build consensus matrix (DumpSource-aware for MSL ASLR alignment) from core.dump_source import open_dump _sources = [] if len(run_dumps) >= 2: try: for _, _d in run_dumps: _src = open_dump(_d.path) _src.open() _sources.append(_src) self.consensus.build_from_sources(_sources) finally: for _src in _sources: _src.close() self.correlator = SearchCorrelator(self.consensus) # Resolve secrets secrets = runs[0].secrets if runs else [] # Expand derived keys all_secrets = list(secrets) if expand_keys: derived = self.expander.expand_secrets(secrets) all_secrets.extend(derived) # Determine output phase name (canonical if normalizing) phase_out = phase if normalize and run_dumps: phase_out = run_dumps[0][1].canonical_or_raw # Search each dump (skip exact_match if not in requested algorithms) all_hits: List[SecretHit] = [] run_exact_match = algorithms is None or "exact_match" in algorithms if run_exact_match: for run, dump in run_dumps: hits = self.correlator.search_all( dump.path, all_secrets, library=library_name, phase=phase_out, run_id=run.run_number, ) all_hits.extend(hits) # Feed to DiffStore self.diff_store.ingest_hits(all_hits) # Build report report = LibraryReport( library=library_name, protocol_version=protocol_version, phase=phase_out, num_runs=len(runs), hits=all_hits, static_regions=self.consensus.get_static_regions() if self.consensus.size > 0 else [], metadata={ "consensus": self.consensus.classification_counts() if self.consensus.size > 0 else {}, "diff_summary": self.diff_store.summary_stats(), "total_secrets": len(all_secrets), "derived_count": len(all_secrets) - len(secrets), }, ) # Optional: alignment-filtered candidates if align_candidates and self.consensus.size > 0: aligned = self.consensus.get_aligned_candidates() report.metadata["aligned_candidates"] = [ {"start": r.start, "end": r.end, "length": r.length, "mean_variance": r.mean_variance} for r in aligned ] if verify_decryption and all_hits and secrets: self._verify_hits(all_hits, secrets) logger.info("Report: %d hits across %d runs", len(all_hits), len(runs)) # Persist findings to ProjectDB if available and auto_persist enabled if self._auto_persist and self._project_db and getattr(self._project_db, '_available', False): try: self._persist_report(report, all_hits, library_dir) except Exception as e: logger.warning("ProjectDB persistence failed: %s", e) return report
def _verify_hits(self, all_hits, secrets): """Mark hits whose bytes decrypt a known ciphertext under the first matching secret.""" try: from .verification import AesCbcVerifier, VERIFICATION_IV, VERIFICATION_PLAINTEXT except ImportError: logger.debug("cryptography not available, skipping verification") return from collections import defaultdict from core.dump_source import open_dump verifier = AesCbcVerifier() key_len = verifier.key_length secret = next((s for s in secrets if len(s.secret_value) == key_len), None) if secret is None: return ciphertext = verifier.create_ciphertext( secret.secret_value, VERIFICATION_PLAINTEXT, VERIFICATION_IV, ) hits_by_path = defaultdict(list) for hit in all_hits: if hit.length == key_len: hits_by_path[str(hit.dump_path)].append(hit) for path_str, path_hits in hits_by_path.items(): try: with open_dump(Path(path_str)) as source: for hit in path_hits: candidate = source.read_range(hit.offset, key_len) hit.verified = verifier.verify( candidate, ciphertext, VERIFICATION_IV, VERIFICATION_PLAINTEXT, ) except FileNotFoundError: logger.debug("dump missing during verification: %s", path_str) def _persist_report(self, report, hits, library_dir): """Write analysis results to ProjectDB.""" db = self._project_db project_name = f"{report.library}_{report.protocol_version}" pid = db.create_project(project_name, description=str(library_dir)) rid = db.start_run(pid, config={ "phase": report.phase, "num_runs": report.num_runs, "protocol_version": report.protocol_version, }) for hit in hits: db.add_finding( rid, finding_type=hit.secret_type, offset=hit.offset, length=hit.length, value_hex=hit.value_hex if hasattr(hit, 'value_hex') else None, confidence=1.0, ) db.finish_run(rid)
[docs] def run(self, request: AnalyzeRequest) -> AnalysisResult: """Run analysis across multiple libraries.""" template = request.template or get_template(request.template_name) result = AnalysisResult() for lib_dir in request.library_dirs: report = self.analyze_library( lib_dir, request.phase, request.protocol_version, keylog_filename=request.keylog_filename, max_runs=request.max_runs, expand_keys=request.expand_keys, template=template, normalize=request.normalize, algorithms=request.algorithms, ) result.libraries.append(report) result.metadata = {"diff_summary": self.diff_store.summary_stats()} return result