Source code for architect.volatility3_exporter

"""Volatility3Exporter - generate self-contained Volatility3 plugins from patterns."""

import logging
from datetime import datetime, timezone
from pathlib import Path
from string import Template
from typing import Optional

from .yara_exporter import YaraExporter

logger = logging.getLogger("memdiver.architect.volatility3_exporter")

_PLUGIN_TEMPLATE = Template('''\
"""MemDiver Volatility3 plugin: $plugin_name
$description
Generated: $timestamp | Pattern: $pattern_name ($pattern_length bytes, $static_ratio static)
Key region: offset +$key_offset, length $key_length
"""
import math
from typing import List
from volatility3.framework import interfaces, renderers
from volatility3.framework.configuration import requirements
from volatility3.framework.layers import scanners

YARA_RULE = r\'\'\'
$yara_rule
\'\'\'
PATTERN_LENGTH = $pattern_length
PATTERN_NAME = "$pattern_name"
KEY_OFFSET = $key_offset
KEY_LENGTH = $key_length
# Regex built from wildcard pattern: static bytes are literal, ?? becomes .
SCAN_REGEX = $scan_regex_repr
# Fallback: longest contiguous static run for BytesScanner.
NEEDLE = bytes.fromhex("$fallback_hex")
NEEDLE_OFFSET = $needle_offset
VTYPES = $vtypes_repr

_COLUMNS = [
    ("KeyOffset", renderers.format_hints.Hex),
    ("KeyHex", str),
    ("KeyEntropy", float),
    ("KeyLength", int),
    ("PatternOffset", renderers.format_hints.Hex),
    ("StaticRatio", float),
]

class $class_name(interfaces.plugins.PluginInterface):
    """Scan process memory for MemDiver pattern: $pattern_name."""
    _required_framework_version = (2, 0, 0)
    _version = (1, 1, 0)

    @classmethod
    def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]:
        return [
            requirements.TranslationLayerRequirement(
                name="primary", description="Memory layer", optional=False),
            requirements.SymbolTableRequirement(
                name="symbols", description="OS kernel symbols", optional=True),
            requirements.IntRequirement(
                name="pid", description="Target process PID",
                optional=True, default=None),
            requirements.BooleanRequirement(
                name="full_scan",
                description="Scan full memory instead of PID-filtered",
                optional=True, default=False),
        ]

    @staticmethod
    def _entropy(data: bytes) -> float:
        if not data:
            return 0.0
        freq = [0] * 256
        for b in data:
            freq[b] += 1
        n = len(data)
        return -sum((c / n) * math.log2(c / n) for c in freq if c > 0)

    def _scan_layer(self, layer_name: str):
        """Scan *layer_name* for the pattern.

        Primary: ``RegExScanner`` with the full wildcard pattern
        converted to a Python bytes regex — encodes both structural
        anchors and wildcard positions natively.

        Fallback: ``BytesScanner`` with the longest static needle,
        adjusted by ``NEEDLE_OFFSET`` to the pattern start, then
        verified against a compiled YARA rule on the read data.

        ``layer.scan()`` in Volatility3 >= 2.x yields plain ``int``
        offsets (not tuples).
        """
        layer = self.context.layers[layer_name]

        # --- Primary: RegExScanner (full structural pattern) ---
        if SCAN_REGEX:
            try:
                seen = set()
                for offset in layer.scan(
                    context=self.context,
                    scanner=scanners.RegExScanner(SCAN_REGEX)):
                    if offset in seen:
                        continue
                    seen.add(offset)
                    try:
                        data = layer.read(offset, PATTERN_LENGTH)
                    except Exception:
                        continue
                    yield offset, data
                return
            except Exception:
                pass  # fall through to BytesScanner

        # --- Fallback: BytesScanner + optional YARA verify ---
        if not NEEDLE:
            return

        yara_rules = None
        try:
            import yara
            yara_rules = yara.compile(source=YARA_RULE)
        except Exception:
            pass

        seen = set()
        for needle_hit in layer.scan(
            context=self.context,
            scanner=scanners.BytesScanner(needle=NEEDLE)):
            pattern_start = needle_hit - NEEDLE_OFFSET
            if pattern_start < 0 or pattern_start in seen:
                continue
            seen.add(pattern_start)
            try:
                data = layer.read(pattern_start, PATTERN_LENGTH)
            except Exception:
                continue
            if yara_rules is not None:
                if not yara_rules.match(data=data):
                    continue
            yield pattern_start, data

    def _generator(self, layer_name: str):
        for pattern_offset, full_data in self._scan_layer(layer_name):
            key_bytes = full_data[KEY_OFFSET:KEY_OFFSET + KEY_LENGTH]
            key_entropy = self._entropy(key_bytes)
            key_vaddr = pattern_offset + KEY_OFFSET
            yield (0, (
                renderers.format_hints.Hex(key_vaddr),
                key_bytes.hex(),
                round(key_entropy, 4),
                KEY_LENGTH,
                renderers.format_hints.Hex(pattern_offset),
                $static_ratio,
            ))

    def _grid(self, layer_name: str):
        return renderers.TreeGrid(_COLUMNS, self._generator(layer_name))

    def _try_pid_scan(self, layer_name, pid):
        """Attempt PID-filtered scan on Linux then Windows."""
        for mod, cls, list_fn, pid_attr, layer_fn, sym_key in [
            ("volatility3.plugins.linux.pslist", "PsList",
             "list_tasks", "pid", "add_process_layer", "vmlinux"),
            ("volatility3.plugins.windows.pslist", "PsList",
             "list_processes", "UniqueProcessId", "add_process_layer",
             "nt_symbols"),
        ]:
            try:
                import importlib
                plugin = importlib.import_module(mod)
                # Resolve symbol table key: prefer OS-specific, fall back
                stab = self.config.get(sym_key) or self.config.get("symbols", "")
                for proc in getattr(getattr(plugin, cls), list_fn)(
                    self.context, layer_name, stab):
                    if getattr(proc, pid_attr) == pid:
                        proc_layer = getattr(proc, layer_fn)()
                        if proc_layer:
                            return self._grid(proc_layer)
            except Exception:
                continue
        return None

    def run(self):
        layer_name = self.config["primary"]
        full_scan = self.config.get("full_scan", False)
        pid = self.config.get("pid", None)

        if full_scan:
            return self._grid(layer_name)

        if pid is not None:
            result = self._try_pid_scan(layer_name, pid)
            if result is not None:
                return result
            # PID scan failed — fall back to full layer scan.
            return self._grid(layer_name)

        # Neither --pid nor --full-scan: default to full scan.
        return self._grid(layer_name)
''')


def _sanitize_class_name(name: str) -> str:
    """Convert to valid Python class name (CamelCase)."""
    sanitized = "".join(c if c.isalnum() else "_" for c in name)
    if sanitized and sanitized[0].isdigit():
        sanitized = "Scan" + sanitized
    return "".join(w.capitalize() for w in sanitized.split("_") if w)


def _longest_static_run(wildcard_pattern: str) -> tuple[str, int]:
    """Extract longest contiguous run of non-wildcard hex bytes.

    Returns ``(hex_string, byte_offset)`` where *byte_offset* is the
    position of the first byte of the run inside the full pattern.
    """
    tokens = wildcard_pattern.split()
    best: list[str] = []
    best_start = 0
    current: list[str] = []
    current_start = 0
    for i, token in enumerate(tokens):
        if "?" in token:
            if len(current) > len(best):
                best = current
                best_start = current_start
            current = []
            current_start = i + 1
        else:
            if not current:
                current_start = i
            current.append(token)
    if len(current) > len(best):
        best = current
        best_start = current_start
    return "".join(best).lower(), best_start


def _wildcard_to_regex(wildcard_pattern: str) -> bytes:
    r"""Convert a YARA-style wildcard hex pattern to a Python bytes regex.

    ``"aa bb ?? cc"`` → ``b'\xaa\xbb.\xcc'``

    Static bytes become literal ``\xNN``; ``??`` wildcards become ``.``
    (match any single byte).  The result is usable with
    ``scanners.RegExScanner`` in Volatility3.
    """
    tokens = wildcard_pattern.split()
    parts: list[bytes] = []
    for token in tokens:
        if "?" in token:
            parts.append(b".")
        else:
            byte_val = int(token, 16)
            # Escape bytes that are regex metacharacters.
            if byte_val in _REGEX_META:
                parts.append(b"\\" + bytes([byte_val]))
            else:
                parts.append(bytes([byte_val]))
    return b"".join(parts)


_REGEX_META = frozenset(b"\\^$.|?*+()[]{}")


[docs] class Volatility3Exporter: """Export byte patterns as self-contained Volatility3 Python plugins."""
[docs] @staticmethod def export( pattern: dict, plugin_name: Optional[str] = None, description: Optional[str] = None, yara_rule: Optional[str] = None, ) -> str: """Export a pattern dict as a Volatility3 plugin Python source. Args: pattern: Pattern dict from PatternGenerator.generate(), optionally enriched with *key_offset*, *key_length*, *vtypes*, and *fields* by ``vol3_emit``. plugin_name: Plugin class name (defaults to CamelCase of pattern name). description: Human-readable description. yara_rule: Pre-built YARA rule string. Generated if not provided. Returns: Complete Python source code for a Volatility3 plugin. """ raw_name = pattern.get("name", "memdiver_pattern") class_name = plugin_name or ("MemDiverScan" + _sanitize_class_name(raw_name)) desc = description or f"Scan for MemDiver pattern: {raw_name}" rule = yara_rule or YaraExporter.export(pattern) fallback_hex, needle_offset = _longest_static_run( pattern.get("wildcard_pattern", ""), ) timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") key_offset = pattern.get("key_offset", 0) key_length = pattern.get("key_length", pattern.get("length", 0)) vtypes = pattern.get("vtypes", {}) wp = pattern.get("wildcard_pattern", "") scan_regex = _wildcard_to_regex(wp) if wp else b"" source = _PLUGIN_TEMPLATE.substitute( plugin_name=raw_name, description=desc, timestamp=timestamp, pattern_name=raw_name, pattern_length=pattern.get("length", 0), static_ratio=pattern.get("static_ratio", 0), yara_rule=rule, fallback_hex=fallback_hex, needle_offset=needle_offset, class_name=class_name, key_offset=key_offset, key_length=key_length, vtypes_repr=repr(vtypes), scan_regex_repr=repr(scan_regex), ) logger.info("Exported Volatility3 plugin: %s (%d bytes)", class_name, pattern.get("length", 0)) return source
[docs] @staticmethod def save(content: str, output_path: Path) -> None: """Write plugin source to a file.""" output_path.write_text(content) logger.info("Saved Volatility3 plugin to %s", output_path)