Source code for harvester.metadata_store

"""MetadataStore - aggregate metadata across runs using Polars."""

import logging
from typing import Any, Dict, List, Optional

logger = logging.getLogger("memdiver.harvester.metadata_store")

# Polars is an optional dependency. HAS_POLARS records whether the import
# succeeded; when it did not, the name `pl` is never bound, so any code that
# touches `pl` must check HAS_POLARS first.
try:
    import polars as pl
    HAS_POLARS = True
except ImportError:
    HAS_POLARS = False

from core.models import RunDirectory


class MetadataStore:
    """Aggregate and query metadata across multiple runs.

    Each registered run is stored as a flat dict. Lookups
    (:meth:`get_runs_for_library`, :meth:`filter_by`) are plain Python scans
    over those dicts. When Polars is installed, :meth:`summary` aggregates
    through a DataFrame built lazily from the records and cached until the
    next :meth:`add_run` call.
    """

    def __init__(self):
        """Create an empty store with no cached DataFrame."""
        self._records: List[Dict[str, Any]] = []
        # Lazily-built Polars DataFrame; None means "not built / stale".
        self._df: Optional[object] = None

    def add_run(
        self,
        run: "RunDirectory",
        sidecar: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Register a run directory with optional sidecar metadata.

        Args:
            run: Object exposing ``library``, ``tls_version``, ``run_number``,
                ``dumps``, ``secrets`` and ``path``.
            sidecar: Optional extra metadata. Only values that are ``str``,
                ``int``, ``float`` or ``bool`` are kept; each is stored under
                the key ``meta_<name>``. Other values are silently skipped.
        """
        record = {
            "library": run.library,
            "tls_version": run.tls_version,
            "run_number": run.run_number,
            "num_dumps": len(run.dumps),
            "num_secrets": len(run.secrets),
            "path": str(run.path),
        }
        if sidecar:
            for k, v in sidecar.items():
                # Keep only scalar values so every record stays a flat row.
                if isinstance(v, (str, int, float, bool)):
                    record[f"meta_{k}"] = v
        self._records.append(record)
        self._df = None  # Invalidate cache

    def _ensure_df(self):
        """Build the Polars DataFrame if not cached.

        Leaves ``self._df`` as ``None`` when Polars is unavailable or no
        records have been registered.
        """
        if self._df is not None:
            return
        if not HAS_POLARS or not self._records:
            return
        self._df = pl.DataFrame(self._records)

    def get_runs_for_library(self, library: str) -> List[Dict]:
        """Get all run records for a specific library.

        Returns a new list; the record dicts inside it are the stored ones.
        """
        return [r for r in self._records if r["library"] == library]

    def summary(self) -> Dict[str, Any]:
        """Summary statistics across all registered runs.

        Returns:
            ``{"total_runs": 0}`` when the store is empty. Otherwise a dict
            with ``total_runs``, ``libraries``, ``total_dumps`` and
            ``total_secrets``.

        Note:
            ``libraries`` is a list of distinct library names (first-seen
            order) when Polars is not installed, and the *count* of distinct
            libraries when it is. This difference is kept for backward
            compatibility with existing callers.
        """
        if not self._records:
            return {"total_runs": 0}
        if not HAS_POLARS:
            # Pure-Python fallback: report the same totals the Polars path
            # does so the result's keys do not depend on an optional package.
            return {
                "total_runs": len(self._records),
                # dict.fromkeys de-duplicates while keeping a stable order.
                "libraries": list(
                    dict.fromkeys(r["library"] for r in self._records)
                ),
                "total_dumps": sum(r["num_dumps"] for r in self._records),
                "total_secrets": sum(r["num_secrets"] for r in self._records),
            }
        self._ensure_df()
        return {
            "total_runs": len(self._records),
            "libraries": self._df["library"].n_unique(),
            "total_dumps": self._df["num_dumps"].sum(),
            "total_secrets": self._df["num_secrets"].sum(),
        }

    def filter_by(self, **kwargs) -> List[Dict]:
        """Filter records by field values.

        Args:
            **kwargs: ``field=value`` pairs; a record matches when every
                named field equals the given value. A field missing from a
                record compares as ``None``.

        Returns:
            A new list of matching records. With no criteria, a shallow copy
            of all records is returned, never the internal list itself.
        """
        # Copy first so that calling filter_by() without criteria does not
        # hand out the store's internal list for callers to mutate.
        results = list(self._records)
        for key, value in kwargs.items():
            results = [r for r in results if r.get(key) == value]
        return results