Source code for core.kdf_registry

"""Auto-discovery registry for KDF (Key Derivation Function) plugins.

Mirrors the algorithm plugin pattern in algorithms/registry.py.  Discovers
BaseKDF subclasses from all ``core/kdf_*.py`` modules via importlib.
"""

import importlib
import logging
import pkgutil
from pathlib import Path
from typing import Dict, List, Optional

from core.kdf_base import BaseKDF

logger = logging.getLogger("memdiver.kdf_registry")


[docs] class KDFRegistry: """Discover and manage KDF plugins from ``core/kdf_*.py`` modules."""
[docs] def __init__(self): self._kdfs: Dict[str, BaseKDF] = {}
[docs] def discover(self) -> None: """Walk ``core/kdf_*.py`` modules and register BaseKDF subclasses.""" core_dir = Path(__file__).parent for py_file in sorted(core_dir.glob("kdf_*.py")): mod_name = f"core.{py_file.stem}" if mod_name == "core.kdf_base" or mod_name == "core.kdf_registry": continue try: mod = importlib.import_module(mod_name) except ImportError as exc: if "No module named" in str(exc): logger.debug("Optional KDF module not found: %s", mod_name) else: logger.warning("Failed to import KDF module %s: %s", mod_name, exc) continue for attr_name in dir(mod): attr = getattr(mod, attr_name) if ( isinstance(attr, type) and issubclass(attr, BaseKDF) and attr is not BaseKDF and getattr(attr, "name", "") ): instance = attr() self._kdfs[instance.name] = instance logger.debug("Registered KDF: %s", instance.name)
[docs] def get(self, name: str) -> Optional[BaseKDF]: """Return KDF plugin by name, or None.""" return self._kdfs.get(name)
[docs] def get_for_protocol( self, protocol: str, version: str ) -> Optional[BaseKDF]: """Return the first KDF matching *protocol* and *version*.""" for kdf in self._kdfs.values(): if kdf.protocol == protocol and version in kdf.versions: return kdf return None
[docs] def list_all(self) -> List[BaseKDF]: """Return all registered KDF plugins.""" return list(self._kdfs.values())
_registry: Optional[KDFRegistry] = None
[docs] def get_kdf_registry() -> KDFRegistry: """Return the lazily-initialised global KDF registry.""" global _registry if _registry is None: _registry = KDFRegistry() _registry.discover() return _registry