Skip to content

dataenginex.data

Data connectors, profiling, and schema registry.

dataenginex.data

Data connectors, profiling, and schema registry.

Public API::

from dataenginex.data import (
    DataConnector, RestConnector, FileConnector,
    ConnectorStatus, FetchResult,
    DataProfiler, ProfileReport, ColumnProfile,
    SchemaRegistry, SchemaVersion,
)

ConnectorStatus

Bases: StrEnum

Lifecycle states for a data connector.

Source code in packages/dataenginex/src/dataenginex/data/connectors.py
class ConnectorStatus(StrEnum):
    """Lifecycle states for a data connector."""

    IDLE = "idle"
    CONNECTED = "connected"
    FETCHING = "fetching"
    ERROR = "error"
    CLOSED = "closed"

DataConnector

Bases: ABC

Base class for all data connectors in DEX.

Subclass and implement connect, fetch, and close.

Source code in packages/dataenginex/src/dataenginex/data/connectors.py
class DataConnector(ABC):
    """Abstract base for every DEX data connector.

    Concrete subclasses supply the three lifecycle hooks —
    ``connect``, ``fetch`` and ``close`` — while this base tracks the
    connector's status, remembers when it connected, and makes every
    connector usable as an async context manager.
    """

    def __init__(self, name: str, source_type: str) -> None:
        # Timestamp is set only by _mark_connected(); None until then.
        self._connected_at: datetime | None = None
        self.status = ConnectorStatus.IDLE
        self.source_type = source_type
        self.name = name

    # -- lifecycle -----------------------------------------------------------

    @abstractmethod
    async def connect(self) -> bool:
        """Establish connection to the data source."""
        ...

    @abstractmethod
    async def fetch(
        self,
        *,
        limit: int | None = None,
        offset: int = 0,
        filters: dict[str, Any] | None = None,
    ) -> FetchResult:
        """Retrieve records from the data source."""
        ...

    @abstractmethod
    async def close(self) -> None:
        """Release resources held by the connector."""
        ...

    # -- helpers -------------------------------------------------------------

    def _mark_connected(self) -> None:
        """Flip the connector into CONNECTED state and stamp the time."""
        self._connected_at = datetime.now(tz=UTC)
        self.status = ConnectorStatus.CONNECTED
        logger.info("Connector %s connected", self.name)

    def _mark_error(self, message: str) -> None:
        """Record the ERROR state and log *message*."""
        self.status = ConnectorStatus.ERROR
        logger.error("Connector %s error: %s", self.name, message)

    # -- context manager -----------------------------------------------------

    async def __aenter__(self) -> DataConnector:
        """Connect on entry and hand back the connector itself."""
        await self.connect()
        return self

    async def __aexit__(self, *_exc: object) -> None:
        """Always close on exit, even when the body raised."""
        await self.close()

connect() abstractmethod async

Establish connection to the data source.

Source code in packages/dataenginex/src/dataenginex/data/connectors.py
# Abstract lifecycle hook: a concrete connector opens its data source here
# and reports success or failure as a bool.
@abstractmethod
async def connect(self) -> bool:
    """Establish connection to the data source."""
    ...

fetch(*, limit=None, offset=0, filters=None) abstractmethod async

Retrieve records from the data source.

Source code in packages/dataenginex/src/dataenginex/data/connectors.py
# Abstract fetch hook. All arguments are keyword-only; ``limit``/``offset``
# support pagination and ``filters`` narrows the result set (exact semantics
# are connector-specific).
@abstractmethod
async def fetch(
    self,
    *,
    limit: int | None = None,
    offset: int = 0,
    filters: dict[str, Any] | None = None,
) -> FetchResult:
    """Retrieve records from the data source."""
    ...

close() abstractmethod async

Release resources held by the connector.

Source code in packages/dataenginex/src/dataenginex/data/connectors.py
# Abstract teardown hook: release whatever resources the connector holds.
@abstractmethod
async def close(self) -> None:
    """Release resources held by the connector."""
    ...

FetchResult dataclass

Outcome of a single fetch invocation.

Attributes:

Name Type Description
records list[dict[str, Any]]

Retrieved data records.

record_count int

Number of records returned.

source str

Name of the data source.

fetched_at datetime

Timestamp of the fetch.

duration_ms float

Fetch duration in milliseconds.

errors list[str]

Error messages (empty on success).

Source code in packages/dataenginex/src/dataenginex/data/connectors.py
@dataclass
class FetchResult:
    """Outcome of a single ``fetch`` invocation.

    Attributes:
        records: Retrieved data records.
        record_count: Number of records returned.
        source: Name of the data source.
        fetched_at: Timestamp of the fetch.
        duration_ms: Fetch duration in milliseconds.
        errors: Error messages (empty on success).
    """

    records: list[dict[str, Any]]
    record_count: int
    source: str
    fetched_at: datetime = field(default_factory=lambda: datetime.now(tz=UTC))
    duration_ms: float = 0.0
    errors: list[str] = field(default_factory=list)

    @property
    def success(self) -> bool:
        """Return ``True`` if the fetch completed without errors."""
        return len(self.errors) == 0

success property

Return True if the fetch completed without errors.

FileConnector

Bases: DataConnector

Reads records from local JSON, JSONL, or CSV files.

Parameters

- name: Human-readable connector label.
- path: File system path to the data file.
- file_format: One of "json", "jsonl", "csv".
- encoding: Text encoding (default "utf-8").

Source code in packages/dataenginex/src/dataenginex/data/connectors.py
class FileConnector(DataConnector):
    """Reads records from local JSON, JSONL, or CSV files.

    Parameters
    ----------
    name:
        Human-readable connector label.
    path:
        File system path to the data file.
    file_format:
        One of ``"json"``, ``"jsonl"``, ``"csv"``.
    encoding:
        Text encoding (default ``utf-8``).
    """

    #: File formats this connector knows how to parse.
    SUPPORTED_FORMATS = {"json", "jsonl", "csv"}

    def __init__(
        self,
        name: str,
        path: str | Path,
        file_format: str = "json",
        encoding: str = "utf-8",
    ) -> None:
        super().__init__(name=name, source_type="file")
        self.path = Path(path)
        # Reject unknown formats eagerly so misconfiguration fails at
        # construction time instead of on the first fetch.
        if file_format not in self.SUPPORTED_FORMATS:
            raise ValueError(
                f"Unsupported format {file_format!r}; choose from {self.SUPPORTED_FORMATS}"
            )
        self.file_format = file_format
        self.encoding = encoding
        # Parsed file contents, cached after the first successful load.
        self._data: list[dict[str, Any]] | None = None

    async def connect(self) -> bool:
        """Check that the configured file exists.

        Returns ``True`` and marks the connector connected when the path
        exists; otherwise records the error state and returns ``False``.
        (Only existence is checked, not read permission.)
        """
        if not self.path.exists():
            self._mark_error(f"File not found: {self.path}")
            return False
        self._mark_connected()
        return True

    async def fetch(
        self,
        *,
        limit: int | None = None,
        offset: int = 0,
        filters: dict[str, Any] | None = None,
    ) -> FetchResult:
        """Read and parse records from the local file.

        Args:
            limit: Maximum number of records to return (``None`` = all).
            offset: Number of records to skip from the start.
            filters: Exact-match key/value constraints.

        Returns:
            A ``FetchResult``; on failure ``records`` is empty and
            ``errors`` carries the exception message.
        """
        self.status = ConnectorStatus.FETCHING
        start = time.perf_counter()

        try:
            if self._data is None:
                self._data = self._load()

            records = self._data
            # Fix: apply filters BEFORE pagination so offset/limit operate
            # on the filtered record set. Previously the slice was taken
            # first, which made filtered pages return too few rows and
            # skip matching records.
            if filters:
                records = [r for r in records if all(r.get(k) == v for k, v in filters.items())]

            records = records[offset:]
            if limit is not None:
                records = records[:limit]

            duration = (time.perf_counter() - start) * 1000
            self.status = ConnectorStatus.CONNECTED
            return FetchResult(
                records=records,
                record_count=len(records),
                source=self.name,
                duration_ms=round(duration, 2),
            )
        except Exception as exc:
            # Failures are captured in the result object, not raised.
            duration = (time.perf_counter() - start) * 1000
            self._mark_error(str(exc))
            return FetchResult(
                records=[],
                record_count=0,
                source=self.name,
                duration_ms=round(duration, 2),
                errors=[str(exc)],
            )

    async def close(self) -> None:
        """Release cached file data and mark the connector closed."""
        self._data = None
        self.status = ConnectorStatus.CLOSED

    # -- private loaders -----------------------------------------------------

    def _load(self) -> list[dict[str, Any]]:
        """Read the whole file and parse it according to ``file_format``."""
        text = self.path.read_text(encoding=self.encoding)
        if self.file_format == "json":
            data = json.loads(text)
            # A top-level JSON object is treated as a single record.
            return data if isinstance(data, list) else [data]
        if self.file_format == "jsonl":
            # One JSON document per non-blank line.
            return [json.loads(line) for line in text.splitlines() if line.strip()]
        # csv: header row becomes the dict keys.
        return self._load_csv(text)

    @staticmethod
    def _load_csv(text: str) -> list[dict[str, Any]]:
        """Parse CSV text (first row = header) into a list of dicts."""
        reader = csv.DictReader(io.StringIO(text))
        return [dict(row) for row in reader]

connect() async

Verify the configured file exists and is readable.

Source code in packages/dataenginex/src/dataenginex/data/connectors.py
async def connect(self) -> bool:
    """Verify the configured file exists and is readable."""
    # NOTE(review): only existence is checked here; readability/permissions
    # are not verified despite the docstring's wording.
    if not self.path.exists():
        self._mark_error(f"File not found: {self.path}")
        return False
    self._mark_connected()
    return True

fetch(*, limit=None, offset=0, filters=None) async

Read and parse records from the local file.

Source code in packages/dataenginex/src/dataenginex/data/connectors.py
async def fetch(
    self,
    *,
    limit: int | None = None,
    offset: int = 0,
    filters: dict[str, Any] | None = None,
) -> FetchResult:
    """Read and parse records from the local file."""
    self.status = ConnectorStatus.FETCHING
    start = time.perf_counter()

    try:
        # File contents are parsed once and cached on the instance.
        if self._data is None:
            self._data = self._load()

        records = self._data[offset:]
        if limit is not None:
            records = records[:limit]

        # NOTE(review): filters are applied AFTER offset/limit, so
        # pagination operates on the unfiltered list — confirm intended.
        if filters:
            records = [r for r in records if all(r.get(k) == v for k, v in filters.items())]

        duration = (time.perf_counter() - start) * 1000
        self.status = ConnectorStatus.CONNECTED
        return FetchResult(
            records=records,
            record_count=len(records),
            source=self.name,
            duration_ms=round(duration, 2),
        )
    except Exception as exc:
        # Any parse/IO failure is captured in the result, not raised.
        duration = (time.perf_counter() - start) * 1000
        self._mark_error(str(exc))
        return FetchResult(
            records=[],
            record_count=0,
            source=self.name,
            duration_ms=round(duration, 2),
            errors=[str(exc)],
        )

close() async

Release cached file data.

Source code in packages/dataenginex/src/dataenginex/data/connectors.py
async def close(self) -> None:
    """Release cached file data."""
    # Drop the cached records and mark the connector closed.
    self._data = None
    self.status = ConnectorStatus.CLOSED

RestConnector

Bases: DataConnector

Fetches records from an HTTP/REST API endpoint.

Parameters

- name: Human-readable connector label.
- base_url: Root URL of the API (e.g. https://api.example.com).
- endpoint: Path appended to base_url for fetches (e.g. /v1/jobs).
- headers: Extra HTTP headers (auth tokens, accept types, etc.).
- timeout: HTTP timeout in seconds (default 30).
- records_key: JSON key that contains the result list (e.g. "data"). When None, the root response is expected to be a list.

Source code in packages/dataenginex/src/dataenginex/data/connectors.py
class RestConnector(DataConnector):
    """Fetches records from an HTTP/REST API endpoint.

    Parameters
    ----------
    name:
        Human-readable connector label.
    base_url:
        Root URL of the API (e.g. ``https://api.example.com``).
    endpoint:
        Path appended to *base_url* for fetches (e.g. ``/v1/jobs``).
    headers:
        Extra HTTP headers (auth tokens, accept types, etc.).
    timeout:
        HTTP timeout in seconds (default 30).
    records_key:
        JSON key that contains the result list (e.g. ``"data"``).
        When *None* the root response is expected to be a list.
    """

    def __init__(
        self,
        name: str,
        base_url: str,
        endpoint: str = "/",
        headers: dict[str, str] | None = None,
        timeout: float = 30.0,
        records_key: str | None = "data",
    ) -> None:
        super().__init__(name=name, source_type="rest_api")
        # Normalize away a trailing slash so endpoint joining is predictable.
        self.base_url = base_url.rstrip("/")
        self.endpoint = endpoint
        self.headers = headers or {}
        self.timeout = timeout
        self.records_key = records_key
        # Created in connect(); None means "not connected".
        self._client: httpx.AsyncClient | None = None

    async def connect(self) -> bool:
        """Open an HTTP client session to the configured base URL."""
        try:
            client = httpx.AsyncClient(
                base_url=self.base_url,
                headers=self.headers,
                timeout=httpx.Timeout(self.timeout),
            )
        except Exception as exc:
            self._mark_error(str(exc))
            return False
        self._client = client
        self._mark_connected()
        return True

    async def fetch(
        self,
        *,
        limit: int | None = None,
        offset: int = 0,
        filters: dict[str, Any] | None = None,
    ) -> FetchResult:
        """Fetch records from the REST endpoint with optional pagination."""
        # Guard: fetching before connect() yields an error result, not an
        # exception.
        if self._client is None:
            return FetchResult(
                records=[],
                record_count=0,
                source=self.name,
                errors=["Connector not connected"],
            )

        self.status = ConnectorStatus.FETCHING

        # Filters pass through as query parameters; pagination keys are
        # added only when explicitly requested.
        params: dict[str, Any] = dict(filters or {})
        if limit is not None:
            params["limit"] = limit
        if offset:
            params["offset"] = offset

        begin = time.perf_counter()
        try:
            response = await self._client.get(self.endpoint, params=params)
            response.raise_for_status()
            payload = response.json()

            # Unwrap the record list: prefer ``records_key`` on dict bodies,
            # accept a bare list, otherwise treat the body as one record.
            if isinstance(payload, dict) and self.records_key:
                records = payload.get(self.records_key, [])
            elif isinstance(payload, list):
                records = payload
            else:
                records = [payload]

            elapsed = (time.perf_counter() - begin) * 1000
            self.status = ConnectorStatus.CONNECTED
            return FetchResult(
                records=records,
                record_count=len(records),
                source=self.name,
                duration_ms=round(elapsed, 2),
            )
        except Exception as exc:
            elapsed = (time.perf_counter() - begin) * 1000
            self._mark_error(str(exc))
            return FetchResult(
                records=[],
                record_count=0,
                source=self.name,
                duration_ms=round(elapsed, 2),
                errors=[str(exc)],
            )

    async def close(self) -> None:
        """Close the HTTP client session and release resources."""
        if self._client:
            await self._client.aclose()
            self._client = None
        self.status = ConnectorStatus.CLOSED
        logger.info("RestConnector %s closed", self.name)

connect() async

Open an HTTP client session to the configured base URL.

Source code in packages/dataenginex/src/dataenginex/data/connectors.py
async def connect(self) -> bool:
    """Open an HTTP client session to the configured base URL."""
    try:
        # The client is constructed eagerly; no request is sent here.
        self._client = httpx.AsyncClient(
            base_url=self.base_url,
            headers=self.headers,
            timeout=httpx.Timeout(self.timeout),
        )
        self._mark_connected()
        return True
    except Exception as exc:
        # Construction failure is recorded as an error state.
        self._mark_error(str(exc))
        return False

fetch(*, limit=None, offset=0, filters=None) async

Fetch records from the REST endpoint with optional pagination.

Source code in packages/dataenginex/src/dataenginex/data/connectors.py
async def fetch(
    self,
    *,
    limit: int | None = None,
    offset: int = 0,
    filters: dict[str, Any] | None = None,
) -> FetchResult:
    """Fetch records from the REST endpoint with optional pagination."""
    # Fetching before connect() yields an error result, not an exception.
    if self._client is None:
        return FetchResult(
            records=[],
            record_count=0,
            source=self.name,
            errors=["Connector not connected"],
        )

    self.status = ConnectorStatus.FETCHING
    # Filters are forwarded verbatim as query parameters; pagination keys
    # are added only when requested.
    params: dict[str, Any] = {**(filters or {})}
    if limit is not None:
        params["limit"] = limit
    if offset:
        params["offset"] = offset

    start = time.perf_counter()
    try:
        resp = await self._client.get(self.endpoint, params=params)
        resp.raise_for_status()
        body = resp.json()

        # Unwrap the record list: prefer ``records_key`` on dict bodies,
        # accept a bare list, otherwise treat the body as a single record.
        if self.records_key and isinstance(body, dict):
            records = body.get(self.records_key, [])
        elif isinstance(body, list):
            records = body
        else:
            records = [body]

        duration = (time.perf_counter() - start) * 1000
        self.status = ConnectorStatus.CONNECTED
        return FetchResult(
            records=records,
            record_count=len(records),
            source=self.name,
            duration_ms=round(duration, 2),
        )
    except Exception as exc:
        # HTTP/parse failures are captured in the result, not raised.
        duration = (time.perf_counter() - start) * 1000
        self._mark_error(str(exc))
        return FetchResult(
            records=[],
            record_count=0,
            source=self.name,
            duration_ms=round(duration, 2),
            errors=[str(exc)],
        )

close() async

Close the HTTP client session and release resources.

Source code in packages/dataenginex/src/dataenginex/data/connectors.py
async def close(self) -> None:
    """Close the HTTP client session and release resources."""
    # Close the underlying httpx client once and drop the reference.
    if self._client:
        await self._client.aclose()
        self._client = None
    self.status = ConnectorStatus.CLOSED
    logger.info("RestConnector %s closed", self.name)

ColumnProfile dataclass

Statistics for a single column / field.

Source code in packages/dataenginex/src/dataenginex/data/profiler.py
@dataclass
class ColumnProfile:
    """Per-column statistics gathered by the profiler."""

    name: str
    dtype: str  # one of: "string", "numeric", "boolean", "null", "mixed"
    total_count: int = 0
    null_count: int = 0
    unique_count: int = 0

    # numeric stats (populated when dtype == "numeric")
    min_value: float | None = None
    max_value: float | None = None
    mean_value: float | None = None
    median_value: float | None = None
    stddev_value: float | None = None

    # string stats (populated when dtype == "string")
    min_length: int | None = None
    max_length: int | None = None
    avg_length: float | None = None

    # most frequent values as (value, count) pairs
    top_values: list[tuple[Any, int]] = field(default_factory=list)

    @property
    def null_rate(self) -> float:
        """Fraction of values that are ``None`` (0.0–1.0)."""
        if not self.total_count:
            return 0.0
        return self.null_count / self.total_count

    @property
    def uniqueness(self) -> float:
        """Ratio of unique non-null values to total non-null values."""
        denominator = self.total_count - self.null_count
        if not denominator:
            return 0.0
        return self.unique_count / denominator

null_rate property

Fraction of values that are None (0.0–1.0).

uniqueness property

Ratio of unique non-null values to total non-null values.

DataProfiler

Profile a list of dict records in pure Python (no pandas needed).

Source code in packages/dataenginex/src/dataenginex/data/profiler.py
class DataProfiler:
    """Profile a list of dict records in pure Python (no pandas needed)."""

    def profile(
        self,
        records: list[dict[str, Any]],
        dataset_name: str = "unnamed",
    ) -> ProfileReport:
        """Profile a list of dict records and return column-level statistics."""
        import time

        started = time.perf_counter()

        if not records:
            # Nothing to analyse: return an empty report immediately.
            return ProfileReport(
                dataset_name=dataset_name,
                record_count=0,
                column_count=0,
                columns=[],
            )

        # Every key seen in any record, in first-seen order.
        ordered_keys = list(dict.fromkeys(key for rec in records for key in rec))

        profiles = [self._profile_column(key, records) for key in ordered_keys]
        elapsed_ms = (time.perf_counter() - started) * 1000

        return ProfileReport(
            dataset_name=dataset_name,
            record_count=len(records),
            column_count=len(profiles),
            columns=profiles,
            duration_ms=round(elapsed_ms, 2),
        )

    # ------------------------------------------------------------------

    @staticmethod
    def _profile_column(key: str, records: list[dict[str, Any]]) -> ColumnProfile:
        """Build a ``ColumnProfile`` for one key across all *records*."""
        raw = [rec.get(key) for rec in records]
        present = [v for v in raw if v is not None]

        # Values are stringified so unhashable ones can still be counted.
        frequency: Counter[Any] = Counter(str(v) for v in present)

        profile = ColumnProfile(
            name=key,
            dtype=DataProfiler._infer_dtype(present),
            total_count=len(raw),
            null_count=len(raw) - len(present),
            unique_count=len(frequency),
            top_values=frequency.most_common(5),
        )

        if profile.dtype == "numeric":
            numbers = [float(v) for v in present if DataProfiler._is_numeric(v)]
            if numbers:
                profile.min_value = min(numbers)
                profile.max_value = max(numbers)
                profile.mean_value = statistics.mean(numbers)
                profile.median_value = statistics.median(numbers)
                # Sample stdev needs at least two points.
                profile.stddev_value = statistics.stdev(numbers) if len(numbers) > 1 else 0.0
        elif profile.dtype == "string":
            lengths = [len(str(v)) for v in present]
            if lengths:
                profile.min_length = min(lengths)
                profile.max_length = max(lengths)
                profile.avg_length = statistics.mean(lengths)

        return profile

    @staticmethod
    def _infer_dtype(values: list[Any]) -> str:
        """Classify *values* as one dominant dtype, or ``"mixed"``."""
        if not values:
            return "null"

        def kind_of(item: Any) -> str:
            # bool must be tested before int: bool subclasses int.
            if isinstance(item, bool):
                return "boolean"
            if isinstance(item, (int, float)):
                return "numeric"
            if isinstance(item, str):
                return "string"
            return "mixed"

        kinds = {kind_of(v) for v in values}
        return kinds.pop() if len(kinds) == 1 else "mixed"

    @staticmethod
    def _is_numeric(value: Any) -> bool:
        """Return ``True`` for ints/floats or values parseable as float."""
        if isinstance(value, bool):
            # bools are ints in Python, but never counted as numeric here.
            return False
        if isinstance(value, (int, float)):
            return True
        try:
            float(value)
        except (ValueError, TypeError):
            return False
        return True

profile(records, dataset_name='unnamed')

Profile a list of dict records and return column-level statistics.

Source code in packages/dataenginex/src/dataenginex/data/profiler.py
def profile(
    self,
    records: list[dict[str, Any]],
    dataset_name: str = "unnamed",
) -> ProfileReport:
    """Profile a list of dict records and return column-level statistics."""
    import time

    start = time.perf_counter()

    # Empty input short-circuits to an empty report.
    if not records:
        return ProfileReport(
            dataset_name=dataset_name,
            record_count=0,
            column_count=0,
            columns=[],
        )

    # Discover all keys across all records, preserving first-seen order.
    all_keys: list[str] = []
    seen: set[str] = set()
    for rec in records:
        for k in rec:
            if k not in seen:
                all_keys.append(k)
                seen.add(k)

    columns = [self._profile_column(key, records) for key in all_keys]
    duration = (time.perf_counter() - start) * 1000

    return ProfileReport(
        dataset_name=dataset_name,
        record_count=len(records),
        column_count=len(columns),
        columns=columns,
        duration_ms=round(duration, 2),
    )

ProfileReport dataclass

Aggregated profiling report for a dataset.

Attributes:

Name Type Description
dataset_name str

Name of the profiled dataset.

record_count int

Total number of records profiled.

column_count int

Number of columns/fields discovered.

columns list[ColumnProfile]

Per-column profile statistics.

profiled_at datetime

Timestamp when profiling was performed.

duration_ms float

Time taken for profiling in milliseconds.

Source code in packages/dataenginex/src/dataenginex/data/profiler.py
@dataclass
class ProfileReport:
    """Aggregated profiling report for a dataset.

    Attributes:
        dataset_name: Name of the profiled dataset.
        record_count: Total number of records profiled.
        column_count: Number of columns/fields discovered.
        columns: Per-column profile statistics.
        profiled_at: Timestamp when profiling was performed.
        duration_ms: Time taken for profiling in milliseconds.
    """

    dataset_name: str
    record_count: int
    column_count: int
    columns: list[ColumnProfile]
    profiled_at: datetime = field(default_factory=lambda: datetime.now(tz=UTC))
    duration_ms: float = 0.0

    def to_dict(self) -> dict[str, Any]:
        """Serialize the profile report to a plain dictionary."""
        return {
            "dataset_name": self.dataset_name,
            "record_count": self.record_count,
            "column_count": self.column_count,
            "profiled_at": self.profiled_at.isoformat(),
            "duration_ms": self.duration_ms,
            "columns": [
                {
                    "name": c.name,
                    "dtype": c.dtype,
                    "total_count": c.total_count,
                    "null_count": c.null_count,
                    "null_rate": round(c.null_rate, 4),
                    "unique_count": c.unique_count,
                    "uniqueness": round(c.uniqueness, 4),
                    "min_value": c.min_value,
                    "max_value": c.max_value,
                    "mean_value": round(c.mean_value, 4) if c.mean_value is not None else None,
                    "median_value": c.median_value,
                    "stddev_value": (
                        round(c.stddev_value, 4) if c.stddev_value is not None else None
                    ),
                    "min_length": c.min_length,
                    "max_length": c.max_length,
                    "avg_length": round(c.avg_length, 2) if c.avg_length is not None else None,
                    "top_values": c.top_values[:5],
                }
                for c in self.columns
            ],
        }

    @property
    def completeness(self) -> float:
        """Overall dataset completeness (1 − avg null rate)."""
        if not self.columns:
            return 1.0
        avg_null = statistics.mean(c.null_rate for c in self.columns)
        return round(1.0 - avg_null, 4)

completeness property

Overall dataset completeness (1 − avg null rate).

to_dict()

Serialize the profile report to a plain dictionary.

Source code in packages/dataenginex/src/dataenginex/data/profiler.py
def to_dict(self) -> dict[str, Any]:
    """Serialize the profile report to a plain dictionary."""
    # Derived float metrics (null_rate, uniqueness, mean, stddev,
    # avg_length) are rounded; raw min/max/median pass through unchanged.
    return {
        "dataset_name": self.dataset_name,
        "record_count": self.record_count,
        "column_count": self.column_count,
        "profiled_at": self.profiled_at.isoformat(),
        "duration_ms": self.duration_ms,
        "columns": [
            {
                "name": c.name,
                "dtype": c.dtype,
                "total_count": c.total_count,
                "null_count": c.null_count,
                "null_rate": round(c.null_rate, 4),
                "unique_count": c.unique_count,
                "uniqueness": round(c.uniqueness, 4),
                "min_value": c.min_value,
                "max_value": c.max_value,
                "mean_value": round(c.mean_value, 4) if c.mean_value is not None else None,
                "median_value": c.median_value,
                "stddev_value": (
                    round(c.stddev_value, 4) if c.stddev_value is not None else None
                ),
                "min_length": c.min_length,
                "max_length": c.max_length,
                "avg_length": round(c.avg_length, 2) if c.avg_length is not None else None,
                "top_values": c.top_values[:5],
            }
            for c in self.columns
        ],
    }

SchemaRegistry

In-process schema registry backed by an optional JSON file.

Parameters

- persist_path: If given, schemas are saved to / loaded from this JSON file so they survive across process restarts.

Source code in packages/dataenginex/src/dataenginex/data/registry.py
class SchemaRegistry:
    """In-process schema registry backed by an optional JSON file.

    Parameters
    ----------
    persist_path:
        If given, schemas are saved/loaded from this JSON file so they
        survive across process restarts.
    """

    def __init__(self, persist_path: str | Path | None = None) -> None:
        # schema_name → [SchemaVersion …] (ordered oldest → newest)
        self._schemas: dict[str, list[SchemaVersion]] = {}
        self._persist_path = Path(persist_path) if persist_path else None
        if self._persist_path and self._persist_path.exists():
            self._load()

    # -- public API ----------------------------------------------------------

    def register(self, schema: SchemaVersion) -> SchemaVersion:
        """Register a new schema version.

        Raises:
            ValueError: If the same name/version pair is already registered.
        """
        versions = self._schemas.setdefault(schema.name, [])
        existing = {v.version for v in versions}
        if schema.version in existing:
            raise ValueError(f"Schema {schema.name!r} version {schema.version} already registered")
        versions.append(schema)
        logger.info("Registered schema %s v%s", schema.name, schema.version)
        # Persist immediately so a crash cannot lose the registration.
        self._save()
        return schema

    def get_latest(self, name: str) -> SchemaVersion | None:
        """Return the most recently registered version for *name*."""
        versions = self._schemas.get(name)
        if not versions:
            return None
        # Versions are stored oldest → newest, so the last one is latest.
        return versions[-1]

    def get_version(self, name: str, version: str) -> SchemaVersion | None:
        """Return a specific version, or *None* if not found."""
        for v in self._schemas.get(name, []):
            if v.version == version:
                return v
        return None

    def list_schemas(self) -> list[str]:
        """Return all registered schema names."""
        return list(self._schemas.keys())

    def list_versions(self, name: str) -> list[str]:
        """Return all registered versions for *name* (oldest first)."""
        return [v.version for v in self._schemas.get(name, [])]

    def validate(
        self, name: str, record: dict[str, Any], version: str | None = None
    ) -> tuple[bool, list[str]]:
        """Validate *record* against a schema.

        If *version* is ``None`` the latest version is used; any explicit
        version string (including an empty one) is looked up exactly.

        Returns:
            ``(ok, errors)`` — ``errors`` explains a ``False`` result.
        """
        # Fix: test ``is not None`` rather than truthiness so an explicit
        # empty-string version is looked up (and reported as missing)
        # instead of silently falling back to the latest schema.
        schema = self.get_version(name, version) if version is not None else self.get_latest(name)
        if schema is None:
            return False, [f"Schema {name!r} (version={version}) not found"]
        return schema.validate_record(record)

    # -- persistence ---------------------------------------------------------

    def _save(self) -> None:
        """Write all schemas to the persist file (no-op when not configured)."""
        if not self._persist_path:
            return
        data: dict[str, list[dict[str, Any]]] = {}
        for name, versions in self._schemas.items():
            data[name] = [v.to_dict() for v in versions]
        self._persist_path.parent.mkdir(parents=True, exist_ok=True)
        self._persist_path.write_text(json.dumps(data, indent=2, default=str))

    def _load(self) -> None:
        """Load schemas from the persist file, replacing in-memory entries."""
        if not self._persist_path or not self._persist_path.exists():
            return
        raw = json.loads(self._persist_path.read_text())
        for name, versions in raw.items():
            self._schemas[name] = [
                SchemaVersion(
                    name=v["name"],
                    version=v["version"],
                    fields=v["fields"],
                    required_fields=v.get("required_fields", []),
                    description=v.get("description", ""),
                    metadata=v.get("metadata", {}),
                )
                for v in versions
            ]
        logger.info("Loaded %d schemas from %s", len(self._schemas), self._persist_path)

register(schema)

Register a new schema version. Duplicate versions are rejected.

Source code in packages/dataenginex/src/dataenginex/data/registry.py
def register(self, schema: SchemaVersion) -> SchemaVersion:
    """Add *schema* to the registry.

    Raises:
        ValueError: If this (name, version) pair is already registered.
    """
    bucket = self._schemas.setdefault(schema.name, [])
    if any(existing.version == schema.version for existing in bucket):
        raise ValueError(f"Schema {schema.name!r} version {schema.version} already registered")
    bucket.append(schema)
    logger.info("Registered schema %s v%s", schema.name, schema.version)
    self._save()
    return schema

get_latest(name)

Return the most recently registered version for name.

Source code in packages/dataenginex/src/dataenginex/data/registry.py
def get_latest(self, name: str) -> SchemaVersion | None:
    """Return the newest registered version of *name*, or ``None`` if absent."""
    history = self._schemas.get(name)
    return history[-1] if history else None

get_version(name, version)

Return a specific version, or None if not found.

Source code in packages/dataenginex/src/dataenginex/data/registry.py
def get_version(self, name: str, version: str) -> SchemaVersion | None:
    """Return the schema matching *name* and *version*, or ``None`` if absent."""
    matches = (v for v in self._schemas.get(name, []) if v.version == version)
    return next(matches, None)

list_schemas()

Return all registered schema names.

Source code in packages/dataenginex/src/dataenginex/data/registry.py
def list_schemas(self) -> list[str]:
    """Return the names of every registered schema."""
    return [*self._schemas]

list_versions(name)

Return all registered versions for name (oldest first).

Source code in packages/dataenginex/src/dataenginex/data/registry.py
def list_versions(self, name: str) -> list[str]:
    """Return every version registered under *name*, in registration order."""
    recorded = self._schemas.get(name)
    if not recorded:
        return []
    return [item.version for item in recorded]

validate(name, record, version=None)

Validate record against a schema.

If version is None the latest version is used.

Source code in packages/dataenginex/src/dataenginex/data/registry.py
def validate(
    self, name: str, record: dict[str, Any], version: str | None = None
) -> tuple[bool, list[str]]:
    """Validate *record* against a schema.

    If *version* is ``None`` the latest version is used.
    """
    schema = self.get_version(name, version) if version else self.get_latest(name)
    if schema is None:
        return False, [f"Schema {name!r} (version={version}) not found"]
    return schema.validate_record(record)

SchemaVersion dataclass

An immutable snapshot of a schema at a particular version.

Attributes:

Name Type Description
name str

Schema identifier (e.g. "job_posting").

version str

Semver string (e.g. "1.2.0").

fields dict[str, str]

Mapping of field names to type descriptions.

required_fields list[str]

Fields that must be present in every record.

description str

Human-readable summary of the schema.

created_at datetime

When this version was registered.

metadata dict[str, Any]

Arbitrary additional metadata attached to this schema version.

Source code in packages/dataenginex/src/dataenginex/data/registry.py
@dataclass
class SchemaVersion:
    """An immutable snapshot of a schema at a particular version.

    Attributes:
        name: Schema identifier (e.g. ``"job_posting"``).
        version: Semver string (e.g. ``"1.2.0"``).
        fields: Mapping of field names to type descriptions.
        required_fields: Fields that must be present in every record.
        description: Human-readable summary of the schema.
        created_at: When this version was registered.
        metadata: Extra context dict.
    """

    name: str
    version: str  # semver string, e.g. "1.2.0"
    fields: dict[str, str]  # field_name → type_description
    required_fields: list[str] = field(default_factory=list)
    description: str = ""
    created_at: datetime = field(default_factory=lambda: datetime.now(tz=UTC))
    metadata: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        """Serialize the schema version to a plain dictionary."""
        return {
            "name": self.name,
            "version": self.version,
            "fields": self.fields,
            "required_fields": self.required_fields,
            "description": self.description,
            "created_at": self.created_at.isoformat(),
            "metadata": self.metadata,
        }

    def validate_record(self, record: dict[str, Any]) -> tuple[bool, list[str]]:
        """Check that *record* has all required fields.

        Returns ``(is_valid, errors)`` where *errors* lists the missing
        required fields.
        """
        missing = [f for f in self.required_fields if f not in record]
        return len(missing) == 0, [f"Missing required field: {f}" for f in missing]

to_dict()

Serialize the schema version to a plain dictionary.

Source code in packages/dataenginex/src/dataenginex/data/registry.py
def to_dict(self) -> dict[str, Any]:
    """Serialize the schema version to a plain, JSON-friendly dictionary."""
    serialized = dict(
        name=self.name,
        version=self.version,
        fields=self.fields,
        required_fields=self.required_fields,
        description=self.description,
        # datetimes are not JSON-serializable; store ISO-8601 text.
        created_at=self.created_at.isoformat(),
        metadata=self.metadata,
    )
    return serialized

validate_record(record)

Check that record has all required fields.

Returns (is_valid, errors) where errors lists the missing required fields.

Source code in packages/dataenginex/src/dataenginex/data/registry.py
def validate_record(self, record: dict[str, Any]) -> tuple[bool, list[str]]:
    """Check that *record* has all required fields.

    Returns ``(is_valid, errors)`` where *errors* lists the missing
    required fields.
    """
    absent = [name for name in self.required_fields if name not in record]
    if absent:
        return False, [f"Missing required field: {name}" for name in absent]
    return True, []