core.variance

Per-byte variance computation and classification.

Shared by ConsensusVector (engine) and DifferentialAlgorithm (algorithms). Uses numpy for vectorized computation (10-30x faster than pure Python loops).

class ByteClass[source]

Bases: IntEnum

Byte-position classification based on cross-run variance.

INVARIANT = 0
STRUCTURAL = 1
POINTER = 2
KEY_CANDIDATE = 3
__new__(value)

compute_variance(byte_buffers, min_size, chunk_bytes=CHUNK_BYTES)[source]

Compute per-byte-position variance across multiple byte buffers.

Uses a chunked two-pass estimator: the byte range [0, min_size) is divided into slabs of chunk_bytes bytes; each slab is stacked into an (N, chunk_len) float32 matrix and reduced via np.var(axis=0). Output is bit-identical to a single-call np.var over a fully-stacked matrix because the per-column reduction is column-local.

Parameters:
  • byte_buffers (List[bytes]) – List of raw byte sequences (each at least min_size long).

  • min_size (int) – Number of byte positions to analyze.

  • chunk_bytes (int) – Width of the per-slab reduction. Tests may override.

Returns:

numpy.ndarray (float32) of length min_size with per-byte variance.

Return type:

numpy.ndarray
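The chunked reduction described above can be sketched roughly as follows. This is an illustrative stand-in, not the library's implementation: the function name chunked_variance and the default slab width of 4096 are assumptions (the real CHUNK_BYTES value is not stated here).

```python
import numpy as np

def chunked_variance(byte_buffers, min_size, chunk_bytes=4096):
    """Hypothetical stand-in for compute_variance; the default width is assumed."""
    out = np.empty(min_size, dtype=np.float32)
    for start in range(0, min_size, chunk_bytes):
        end = min(start + chunk_bytes, min_size)
        # Stack this slab from every buffer into an (N, chunk_len) float32 matrix.
        slab = np.stack([
            np.frombuffer(buf, dtype=np.uint8, count=end - start, offset=start)
            for buf in byte_buffers
        ]).astype(np.float32)
        # Each column is reduced independently, so slab boundaries do not
        # affect the per-position result.
        out[start:end] = slab.var(axis=0)
    return out

buffers = [bytes([0, 10, 7, 1]), bytes([0, 20, 7, 3]), bytes([0, 30, 7, 5])]
variance = chunked_variance(buffers, min_size=4, chunk_bytes=2)

# Reference: one np.var call over the fully stacked matrix.
full = np.stack(
    [np.frombuffer(b, dtype=np.uint8) for b in buffers]
).astype(np.float32).var(axis=0)
```

Only one slab of shape (N, chunk_len) is held as float32 at a time, which is the point of chunking when min_size is large.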

class WelfordVariance[source]

Bases: object

Online (Welford’s recurrence) per-byte variance accumulator.

Maintains running mean and sum-of-squared-deviations vectors of length size that are updated one dump at a time. Intended for incremental / live-capture workflows where dumps arrive sequentially and the full set cannot be held in memory at once.

Numerically equivalent to np.var(..., axis=0, ddof=0) applied to the batch of all added dumps.
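A minimal sketch of the vectorized Welford update, for illustration only. The class name WelfordSketch and the float64 accumulator dtype are assumptions; the actual class may store its state differently.

```python
import numpy as np

class WelfordSketch:
    """Illustrative per-byte Welford accumulator (not the library class)."""

    def __init__(self, size):
        self.n = 0
        self.mean = np.zeros(size, dtype=np.float64)  # running mean per position
        self.m2 = np.zeros(size, dtype=np.float64)    # sum of squared deviations

    def add_dump(self, buf):
        # Interpret the first `size` bytes of the dump as one observation vector.
        x = np.frombuffer(buf, dtype=np.uint8, count=self.mean.size).astype(np.float64)
        self.n += 1
        delta = x - self.mean
        self.mean += delta / self.n
        # Uses the deviation from the old mean times the deviation from the new mean.
        self.m2 += delta * (x - self.mean)

    def variance(self):
        # Population variance (ddof=0); zeros before any dump has been added.
        if self.n == 0:
            return np.zeros_like(self.m2)
        return self.m2 / self.n

dumps = [bytes([1, 2, 3]), bytes([4, 2, 9]), bytes([7, 2, 0])]
acc = WelfordSketch(3)
for d in dumps:
    acc.add_dump(d)

# Batch reference computed over all dumps at once.
batch = np.var(
    np.stack([np.frombuffer(d, dtype=np.uint8) for d in dumps]).astype(np.float64),
    axis=0,
)
```

Each call touches only the two state vectors and the incoming dump, so memory use is independent of the number of dumps.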

__init__(size)[source]
Parameters:

size (int)

Return type:

None

property num_dumps: int
property size: int
add_dump(buf)[source]

Fold one dump into the running accumulators.

Parameters:

buf (bytes)

Return type:

None

variance()[source]

Return the current population variance (ddof=0). Returns zeros if no dumps have been added (n == 0).

Return type:

numpy.ndarray

reset()[source]
Return type:

None

state_arrays()[source]

Return (mean, m2, n) — used by the CLI to persist state to disk.

Return type:

tuple[numpy.ndarray, numpy.ndarray, int]

classmethod from_state(mean, m2, n)[source]

Rebuild from persisted arrays (CLI round-trip).

Parameters:
  • mean (numpy.ndarray)

  • m2 (numpy.ndarray)

  • n (int)

Return type:

WelfordVariance

classify_variance(variance)[source]

Classify every byte position by its variance value.

Parameters:

variance (numpy.ndarray | array) – Per-byte variance values (numpy array or stdlib array).

Returns:

numpy.ndarray (uint8) of ByteClass integer codes.

Return type:

numpy.ndarray
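One way such a threshold classification could look is sketched below. The cut-off values are purely hypothetical (the real thresholds are not documented here), and the sketch assumes that class codes increase with variance, as the ByteClass ordering suggests.

```python
from array import array
import numpy as np

# HYPOTHETICAL cut-offs, chosen only for illustration:
# variance below 1e-6 -> 0 (INVARIANT), below 100 -> 1 (STRUCTURAL),
# below 3000 -> 2 (POINTER), otherwise 3 (KEY_CANDIDATE).
ASSUMED_BINS = np.array([1e-6, 100.0, 3000.0])

def classify_sketch(variance):
    """Map variance values to integer class codes using the assumed bins."""
    values = np.asarray(variance, dtype=np.float64)  # also accepts a stdlib array
    return np.digitize(values, ASSUMED_BINS).astype(np.uint8)

codes = classify_sketch(np.array([0.0, 50.0, 500.0, 5000.0]))
codes_from_array = classify_sketch(array("d", [0.0, 5000.0]))
```

np.digitize returns, for each value, the index of the bin it falls into, so the result lines up with the integer codes of ByteClass under the stated assumption.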

find_contiguous_runs(classifications, target)[source]

Return (start, end) pairs for contiguous runs of target ByteClass.

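Parameters:
  • classifications – Per-byte classification codes.

  • target (ByteClass) – Class whose contiguous runs are returned.

Return type: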

List[Tuple[int, int]]
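Run detection of this kind is commonly done by looking for edges in a boolean mask. The sketch below assumes half-open (start, end) pairs, with end being one past the last matching position; whether the library uses inclusive or exclusive ends is not stated here. The function name contiguous_runs is hypothetical.

```python
import numpy as np

def contiguous_runs(classifications, target):
    """Return half-open (start, end) index pairs where classifications == target."""
    mask = np.asarray(classifications) == int(target)
    # Pad with False on both sides so every run has a rising and a falling edge.
    padded = np.concatenate(([False], mask, [False]))
    edges = np.flatnonzero(padded[1:] != padded[:-1])
    # Edges alternate: run start, run end, run start, run end, ...
    starts, ends = edges[0::2], edges[1::2]
    return [(int(s), int(e)) for s, e in zip(starts, ends)]

runs = contiguous_runs(np.array([0, 3, 3, 1, 3], dtype=np.uint8), target=3)
no_runs = contiguous_runs(np.array([0, 1, 2], dtype=np.uint8), target=3)
```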

count_classifications(classifications)[source]

Count occurrences of each classification label.

Returns dict with human-readable string keys for backward compatibility.

Return type:

Dict[str, int]
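A rough sketch of counting class codes into a string-keyed dict. The enum is re-declared locally to keep the example self-contained, and the lowercase key format is an assumption; the exact keys the library returns are not shown here.

```python
from enum import IntEnum
import numpy as np

class ByteClass(IntEnum):
    # Local copy of the documented enum values, for a self-contained example.
    INVARIANT = 0
    STRUCTURAL = 1
    POINTER = 2
    KEY_CANDIDATE = 3

def count_sketch(classifications):
    """Count each class code; keys are lowercase enum names (assumed format)."""
    codes = np.asarray(classifications, dtype=np.uint8)
    # minlength guarantees one slot per class even if a class never occurs.
    counts = np.bincount(codes, minlength=len(ByteClass))
    return {cls.name.lower(): int(counts[cls.value]) for cls in ByteClass}

summary = count_sketch([0, 0, 3, 1, 3, 3])
```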

PhaseNormalizer - map raw dump phase names to canonical lifecycle stages.

TLS libraries use different naming conventions for lifecycle events (e.g., “abort”, “shutdown”, “cleanup”). This module normalizes those raw names into a consistent set of canonical phases based on timestamp ordering rather than name matching.

Canonical phases (in display order):

  • pre/post_key_update - TLS 1.3 key update events
  • pre/post_handshake_end - First lifecycle event after handshake
  • pre/post_second_event - Second lifecycle event (if present)
  • pre/post_cleanup - Final cleanup phase

class PhaseMapping[source]

Bases: object

Maps a raw dump phase to its canonical lifecycle stage.

raw_phase: str
canonical_phase: str
timestamp: str
dump_file: DumpFile
__init__(raw_phase, canonical_phase, timestamp, dump_file)
Parameters:
  • raw_phase (str)

  • canonical_phase (str)

  • timestamp (str)

  • dump_file (DumpFile)

Return type:

None

class PhaseNormalizer[source]

Bases: object

Normalize raw dump phase names to canonical lifecycle stages.

Uses timestamp ordering (not name matching) to assign canonical roles to each phase pair.

Usage:

normalizer = PhaseNormalizer()
mappings = normalizer.normalize_run(run_directory)
for raw, mapping in mappings.items():
    print(f"{raw} -> {mapping.canonical_phase}")

KEY_UPDATE_NAMES = {'client_key_update', 'server_key_update'}
CLEANUP_NAMES = {'cleanup'}
normalize_run(run)[source]

Normalize all dump phases in a run to canonical lifecycle stages.

Sorts dumps by timestamp, groups them into pre/post pairs by phase_name, then classifies each pair as key_update, cleanup (last pair wins), or generic (the first generic pair maps to handshake_end, the second to second_event).

Parameters:

run (RunDirectory)

Return type:

Dict[str, PhaseMapping]
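The ordering-based assignment can be sketched as below. This is a simplified illustration under several assumptions: dumps are modeled as hypothetical (timestamp, prefix, phase_name) tuples rather than DumpFile objects, timestamps are strings that sort chronologically, raw keys take the form prefix_phase_name, the result maps to plain strings instead of PhaseMapping instances, and the "last pair wins" tie-break for cleanup is not modeled.

```python
KEY_UPDATE_NAMES = {"client_key_update", "server_key_update"}
CLEANUP_NAMES = {"cleanup"}
GENERIC_ROLES = ["handshake_end", "second_event"]

def normalize_sketch(dumps):
    """dumps: iterable of (timestamp, prefix, phase_name) tuples (hypothetical shape)."""
    ordered = sorted(dumps, key=lambda d: d[0])
    # Phase names in order of first appearance by timestamp.
    names = list(dict.fromkeys(name for _, _, name in ordered))
    roles = {}
    generic_seen = 0
    for name in names:
        if name in KEY_UPDATE_NAMES:
            roles[name] = "key_update"
        elif name in CLEANUP_NAMES:
            roles[name] = "cleanup"
        elif generic_seen < len(GENERIC_ROLES):
            # Generic phases get their role from position, not from their name.
            roles[name] = GENERIC_ROLES[generic_seen]
            generic_seen += 1
    return {
        f"{prefix}_{name}": f"{prefix}_{roles[name]}"
        for _, prefix, name in ordered
        if name in roles
    }

dumps = [
    ("2024-01-01T00:00:03", "pre", "cleanup"),
    ("2024-01-01T00:00:01", "pre", "abort"),
    ("2024-01-01T00:00:02", "post", "abort"),
    ("2024-01-01T00:00:04", "post", "cleanup"),
]
mapping = normalize_sketch(dumps)
```

Because generic roles are assigned by position, a library that calls its first lifecycle event "abort" and one that calls it "shutdown" both end up with pre/post_handshake_end.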

available_canonical_phases(runs)[source]

Return canonical phases present across runs, in display order.

Parameters:

runs (List[RunDirectory])

Return type:

List[str]

static get_canonical_display(canonical)[source]

Return a human-readable label (e.g. ‘Pre Handshake End’).

Parameters:

canonical (str)

Return type:

str
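A label of the documented form can be produced with a plain string transformation. The sketch below is one possible approach, not necessarily what the method does internally; the function name canonical_display is hypothetical.

```python
def canonical_display(canonical):
    """Turn a canonical phase id such as 'pre_handshake_end' into a title-cased label."""
    return canonical.replace("_", " ").title()

label = canonical_display("pre_handshake_end")
```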