Source code for core.phase_normalizer

"""PhaseNormalizer - map raw dump phase names to canonical lifecycle stages.

TLS libraries use different naming conventions for lifecycle events (e.g.,
"abort", "shutdown", "cleanup"). This module normalizes those raw names into
a consistent set of canonical phases based on timestamp ordering rather than
name matching.

Canonical phases (in display order):
    pre/post_key_update      - TLS 1.3 key update events
    pre/post_handshake_end   - First lifecycle event after handshake
    pre/post_second_event    - Second lifecycle event (if present)
    pre/post_cleanup         - Final cleanup phase
"""

import logging
from dataclasses import dataclass
from typing import Dict, List, Tuple

from .models import DumpFile, RunDirectory

logger = logging.getLogger("memdiver.phase_normalizer")

CANONICAL_PHASE_ORDER: List[str] = [
    "pre_key_update", "post_key_update",
    "pre_handshake_end", "post_handshake_end",
    "pre_second_event", "post_second_event",
    "pre_cleanup", "post_cleanup",
]

_GENERIC_SUFFIXES: List[str] = ["handshake_end", "second_event"]


[docs] @dataclass class PhaseMapping: """Maps a raw dump phase to its canonical lifecycle stage.""" raw_phase: str canonical_phase: str timestamp: str dump_file: DumpFile
def _group_into_pairs( sorted_dumps: List[DumpFile], ) -> List[Tuple[str, List[DumpFile]]]: """Group sorted dumps by phase_name, preserving first-appearance order.""" groups: Dict[str, List[DumpFile]] = {} for dump in sorted_dumps: groups.setdefault(dump.phase_name, []).append(dump) return list(groups.items()) def _generic_suffix(index: int) -> str: """Return the canonical suffix for the *index*-th generic pair.""" if index < len(_GENERIC_SUFFIXES): return _GENERIC_SUFFIXES[index] return f"event_{index + 1}" def _emit_mappings( dumps: List[DumpFile], suffix: str, result: Dict[str, PhaseMapping], ) -> None: """Write PhaseMapping entries for every dump in *dumps*.""" for dump in dumps: result[dump.full_phase] = PhaseMapping( raw_phase=dump.full_phase, canonical_phase=f"{dump.phase_prefix}_{suffix}", timestamp=dump.timestamp, dump_file=dump, )
[docs] class PhaseNormalizer: """Normalize raw dump phase names to canonical lifecycle stages. Uses timestamp ordering (not name matching) to assign canonical roles to each phase pair. Usage:: normalizer = PhaseNormalizer() mappings = normalizer.normalize_run(run_directory) for raw, mapping in mappings.items(): print(f"{raw} -> {mapping.canonical_phase}") """ KEY_UPDATE_NAMES = {"server_key_update", "client_key_update"} CLEANUP_NAMES = {"cleanup"}
[docs] def normalize_run(self, run: RunDirectory) -> Dict[str, PhaseMapping]: """Normalize all dump phases in a run to canonical lifecycle stages. Sorts dumps by timestamp, groups into pre/post pairs by phase_name, then classifies: key_update, cleanup (last pair wins), or generic (first -> handshake_end, second -> second_event). """ if not run.dumps: return {} sorted_dumps = sorted(run.dumps, key=lambda d: d.timestamp) pairs = _group_into_pairs(sorted_dumps) return self._classify_pairs(pairs)
[docs] def available_canonical_phases(self, runs: List[RunDirectory]) -> List[str]: """Return canonical phases present across *runs*, in display order.""" seen: set = set() for run in runs: for mapping in self.normalize_run(run).values(): seen.add(mapping.canonical_phase) return [phase for phase in CANONICAL_PHASE_ORDER if phase in seen]
[docs] @staticmethod def get_canonical_display(canonical: str) -> str: """Return a human-readable label (e.g. 'Pre Handshake End').""" return canonical.replace("_", " ").title()
# ------------------------------------------------------------------ # Private helpers # ------------------------------------------------------------------ def _classify_pairs( self, pairs: List[Tuple[str, List[DumpFile]]], ) -> Dict[str, PhaseMapping]: """Classify grouped phase pairs into canonical categories.""" result: Dict[str, PhaseMapping] = {} generic_pairs: List[Tuple[str, List[DumpFile]]] = [] last_cleanup_dumps = None for phase_name, dumps in pairs: if phase_name in self.KEY_UPDATE_NAMES: _emit_mappings(dumps, "key_update", result) elif phase_name in self.CLEANUP_NAMES: last_cleanup_dumps = dumps else: generic_pairs.append((phase_name, dumps)) for idx, (_phase_name, dumps) in enumerate(generic_pairs): _emit_mappings(dumps, _generic_suffix(idx), result) if last_cleanup_dumps is not None: _emit_mappings(last_cleanup_dumps, "cleanup", result) return result