Skip to content

dataenginex.core

Core framework — schemas, validators, medallion architecture, quality.

Domain-specific symbols should live in the application package (e.g. myapp.core).

Public API::

from dataenginex.core import (
    # Medallion
    MedallionArchitecture, DataLayer, StorageFormat, LayerConfiguration,
    StorageBackend, LocalParquetStorage, BigQueryStorage, DualStorage,
    DataLineage,
    # Quality
    QualityGate, QualityStore, QualityResult, QualityDimension,
    # Schemas (generic API)
    ErrorDetail, ErrorResponse, RootResponse, HealthResponse,
    StartupResponse, ComponentStatus, ReadinessResponse,
    EchoRequest, EchoResponse,
    # Validators (generic)
    DataQualityChecks, ValidationReport,
)

BigQueryStorage

Bases: StorageBackend

BigQuery cloud storage — re-exported from :mod:dataenginex.lakehouse.storage.

This is a backwards-compatibility shim. The real implementation lives in dataenginex.lakehouse.storage.BigQueryStorage.

Source code in src/dataenginex/core/medallion_architecture.py
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
class BigQueryStorage(StorageBackend):
    """BigQuery cloud storage — re-exported from :mod:`dataenginex.lakehouse.storage`.

    This is a backwards-compatibility shim.  The real implementation
    lives in ``dataenginex.lakehouse.storage.BigQueryStorage``.
    """

    def __new__(cls, *args: Any, **kwargs: Any) -> BigQueryStorage:  # noqa: ARG003
        # Deferred import so the lakehouse package (and its cloud extras)
        # is only required when a BigQueryStorage is actually constructed.
        from dataenginex.lakehouse.storage import (
            BigQueryStorage as _RealBQ,
        )

        # Intentionally returns an instance of a *different* class: callers
        # constructing the shim transparently receive the real backend.
        # Because the result is not an instance of ``cls``, Python does NOT
        # invoke ``__init__`` below.
        return _RealBQ(*args, **kwargs)  # type: ignore[return-value]

    def __init__(self, project_id: str, location: str = "US") -> None:
        # __init__ is needed for type checker / IDE hints but never runs
        # because __new__ returns a different type.
        pass  # pragma: no cover

    def write(self, data: Any, path: str, format: StorageFormat = StorageFormat.BIGQUERY) -> bool:
        # Unreachable stub: exists only to satisfy the StorageBackend ABC.
        raise AssertionError  # pragma: no cover

    def read(self, path: str, format: StorageFormat = StorageFormat.BIGQUERY) -> Any:
        # Unreachable stub (see write).
        raise AssertionError  # pragma: no cover

    def delete(self, path: str) -> bool:
        # Unreachable stub (see write).
        raise AssertionError  # pragma: no cover

    def list_objects(self, prefix: str = "") -> list[str]:
        # Unreachable stub (see write).
        raise AssertionError  # pragma: no cover

    def exists(self, path: str) -> bool:
        # Unreachable stub (see write).
        raise AssertionError  # pragma: no cover

DataLayer

Bases: StrEnum

Medallion architecture layers

Source code in src/dataenginex/core/medallion_architecture.py
52
53
54
55
56
57
class DataLayer(StrEnum):
    """Medallion architecture layers (bronze → silver → gold)."""

    BRONZE = "bronze"  # raw, as-ingested data
    SILVER = "silver"  # cleaned / validated data
    GOLD = "gold"  # enriched, serving-ready data

DataLineage

Tracks data lineage through the medallion layers.

Source code in src/dataenginex/core/medallion_architecture.py
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
class DataLineage:
    """Tracks data lineage through the medallion layers."""

    def __init__(self) -> None:
        self.lineage: dict[str, dict[str, Any]] = {}

    def record_bronze_ingestion(self, source: str, record_count: int, timestamp: str) -> str:
        """Record data entry into Bronze layer."""
        lineage_id = f"bronze_{source}_{timestamp}"
        self.lineage[lineage_id] = {
            "layer": "bronze",
            "source": source,
            "record_count": record_count,
            "timestamp": timestamp,
            "status": "raw",
        }
        return lineage_id

    def record_silver_transformation(
        self, lineage_id: str, processed_count: int, quality_score: float
    ) -> str:
        """Record data transformation in Silver layer."""
        silver_id = f"{lineage_id}_silver"
        self.lineage[silver_id] = {
            "layer": "silver",
            "parent": lineage_id,
            "processed_count": processed_count,
            "quality_score": quality_score,
            "status": "cleaned",
        }
        return silver_id

    def record_gold_enrichment(
        self, lineage_id: str, enriched_count: int, embedding_model: str
    ) -> str:
        """Record data enrichment in Gold layer."""
        gold_id = f"{lineage_id}_gold"
        self.lineage[gold_id] = {
            "layer": "gold",
            "parent": lineage_id,
            "enriched_count": enriched_count,
            "embedding_model": embedding_model,
            "status": "enriched",
        }
        return gold_id

    def get_lineage(self, lineage_id: str) -> dict[str, Any] | None:
        """Get lineage information for a record."""
        return self.lineage.get(lineage_id)

record_bronze_ingestion(source, record_count, timestamp)

Record data entry into Bronze layer.

Source code in src/dataenginex/core/medallion_architecture.py
443
444
445
446
447
448
449
450
451
452
453
def record_bronze_ingestion(self, source: str, record_count: int, timestamp: str) -> str:
    """Record data entry into Bronze layer and return its lineage id."""
    entry = {
        "layer": "bronze",
        "source": source,
        "record_count": record_count,
        "timestamp": timestamp,
        "status": "raw",
    }
    bronze_id = f"bronze_{source}_{timestamp}"
    self.lineage[bronze_id] = entry
    return bronze_id

record_silver_transformation(lineage_id, processed_count, quality_score)

Record data transformation in Silver layer.

Source code in src/dataenginex/core/medallion_architecture.py
455
456
457
458
459
460
461
462
463
464
465
466
467
def record_silver_transformation(
    self, lineage_id: str, processed_count: int, quality_score: float
) -> str:
    """Record data transformation in Silver layer and return its lineage id."""
    entry = {
        "layer": "silver",
        "parent": lineage_id,
        "processed_count": processed_count,
        "quality_score": quality_score,
        "status": "cleaned",
    }
    silver_id = f"{lineage_id}_silver"
    self.lineage[silver_id] = entry
    return silver_id

record_gold_enrichment(lineage_id, enriched_count, embedding_model)

Record data enrichment in Gold layer.

Source code in src/dataenginex/core/medallion_architecture.py
469
470
471
472
473
474
475
476
477
478
479
480
481
def record_gold_enrichment(
    self, lineage_id: str, enriched_count: int, embedding_model: str
) -> str:
    """Record data enrichment in Gold layer and return its lineage id."""
    entry = {
        "layer": "gold",
        "parent": lineage_id,
        "enriched_count": enriched_count,
        "embedding_model": embedding_model,
        "status": "enriched",
    }
    gold_id = f"{lineage_id}_gold"
    self.lineage[gold_id] = entry
    return gold_id

get_lineage(lineage_id)

Get lineage information for a record.

Source code in src/dataenginex/core/medallion_architecture.py
483
484
485
def get_lineage(self, lineage_id: str) -> dict[str, Any] | None:
    """Get lineage information for a record."""
    return self.lineage.get(lineage_id)

DualStorage

Manages dual storage strategy: local Parquet + BigQuery.

Pattern:
- Development/Testing: write to local Parquet only.
- Production/Cloud: write to both local Parquet (backup) and BigQuery (primary).

Source code in src/dataenginex/core/medallion_architecture.py
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
class DualStorage:
    """Dual-destination storage: local Parquet always, BigQuery optionally.

    Development/testing writes to local Parquet only; production/cloud
    mirrors every successful local write into BigQuery as well (local
    acts as the backup copy, BigQuery as the primary).
    """

    def __init__(
        self,
        local_base_path: str = "data",
        bigquery_project: str | None = None,
        enable_bigquery: bool = False,
    ) -> None:
        # Local Parquet is unconditional; BigQuery needs both the flag
        # and a project id.
        self.local_storage = LocalParquetStorage(local_base_path)
        self.bigquery_storage = None
        if not (enable_bigquery and bigquery_project):
            logger.info("storage mode: local parquet only")
        else:
            self.bigquery_storage = BigQueryStorage(bigquery_project)
            logger.info("dual storage enabled: local parquet + bigquery")

    def _write_layer(self, layer: str, data: Any, key: str, timestamp: str) -> bool:
        """Write *data* into one medallion layer.

        Args:
            layer: Layer name (bronze, silver, gold)
            data: Data to write
            key: Source or entity type identifier
            timestamp: Timestamp string for partitioning

        Returns:
            True if the local write succeeded (BigQuery mirroring is
            best-effort and does not affect the result).
        """
        ok = self.local_storage.write(data, f"{layer}/{key}/{timestamp}")
        if ok and self.bigquery_storage:
            # BigQuery table names cannot contain '-' or ':'.
            sanitized = timestamp.replace('-', '_').replace(':', '_')
            self.bigquery_storage.write(data, f"{layer}.{key}_{sanitized}")
        return ok

    def _read_layer(self, layer: str, key: str, timestamp: str) -> Any:
        """Read one partition back from a medallion layer (local copy only).

        Args:
            layer: Layer name (bronze, silver, gold)
            key: Source or entity type identifier
            timestamp: Timestamp string for partitioning
        """
        return self.local_storage.read(f"{layer}/{key}/{timestamp}")

    def write_bronze(self, data: Any, source: str, timestamp: str) -> bool:
        """Persist raw data under bronze/{source}/{timestamp}."""
        return self._write_layer("bronze", data, source, timestamp)

    def write_silver(self, data: Any, entity_type: str, timestamp: str) -> bool:
        """Persist cleaned data under silver/{entity_type}/{timestamp}."""
        return self._write_layer("silver", data, entity_type, timestamp)

    def write_gold(self, data: Any, entity_type: str, timestamp: str) -> bool:
        """Persist enriched data under gold/{entity_type}/{timestamp}."""
        return self._write_layer("gold", data, entity_type, timestamp)

    def read_bronze(self, source: str, timestamp: str) -> Any:
        """Read one Bronze-layer partition."""
        return self._read_layer("bronze", source, timestamp)

    def read_silver(self, entity_type: str, timestamp: str) -> Any:
        """Read one Silver-layer partition."""
        return self._read_layer("silver", entity_type, timestamp)

    def read_gold(self, entity_type: str, timestamp: str) -> Any:
        """Read one Gold-layer partition."""
        return self._read_layer("gold", entity_type, timestamp)

write_bronze(data, source, timestamp)

Write to Bronze layer — path: bronze/{source}/{timestamp}.

Source code in src/dataenginex/core/medallion_architecture.py
412
413
414
def write_bronze(self, data: Any, source: str, timestamp: str) -> bool:
    """Persist raw data into the Bronze layer (path bronze/{source}/{timestamp})."""
    return self._write_layer("bronze", data, source, timestamp)

write_silver(data, entity_type, timestamp)

Write to Silver layer — path: silver/{entity_type}/{timestamp}.

Source code in src/dataenginex/core/medallion_architecture.py
416
417
418
def write_silver(self, data: Any, entity_type: str, timestamp: str) -> bool:
    """Persist cleaned data into the Silver layer (path silver/{entity_type}/{timestamp})."""
    return self._write_layer("silver", data, entity_type, timestamp)

write_gold(data, entity_type, timestamp)

Write to Gold layer — path: gold/{entity_type}/{timestamp}.

Source code in src/dataenginex/core/medallion_architecture.py
420
421
422
def write_gold(self, data: Any, entity_type: str, timestamp: str) -> bool:
    """Persist enriched data into the Gold layer (path gold/{entity_type}/{timestamp})."""
    return self._write_layer("gold", data, entity_type, timestamp)

read_bronze(source, timestamp)

Read from Bronze layer.

Source code in src/dataenginex/core/medallion_architecture.py
424
425
426
def read_bronze(self, source: str, timestamp: str) -> Any:
    """Read one Bronze-layer partition back."""
    return self._read_layer("bronze", source, timestamp)

read_silver(entity_type, timestamp)

Read from Silver layer.

Source code in src/dataenginex/core/medallion_architecture.py
428
429
430
def read_silver(self, entity_type: str, timestamp: str) -> Any:
    """Read one Silver-layer partition back."""
    return self._read_layer("silver", entity_type, timestamp)

read_gold(entity_type, timestamp)

Read from Gold layer.

Source code in src/dataenginex/core/medallion_architecture.py
432
433
434
def read_gold(self, entity_type: str, timestamp: str) -> Any:
    """Read one Gold-layer partition back."""
    return self._read_layer("gold", entity_type, timestamp)

LayerConfiguration dataclass

Configuration for a medallion layer.

Source code in src/dataenginex/core/medallion_architecture.py
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
@dataclass
class LayerConfiguration:
    """Per-layer settings for the medallion architecture.

    ``quality_threshold`` is a fraction in [0, 1]; construction fails
    for values outside that range.
    """

    layer_name: str
    description: str
    purpose: str
    storage_format: StorageFormat
    local_path: str
    bigquery_dataset: str
    retention_days: int | None
    schema_validation: bool
    quality_threshold: float
    compression: str = "snappy"

    def __post_init__(self) -> None:
        """Reject quality thresholds outside the inclusive [0, 1] range."""
        if not 0 <= self.quality_threshold <= 1:
            raise ValueError("quality_threshold must be between 0 and 1")

__post_init__()

Validate configuration.

Source code in src/dataenginex/core/medallion_architecture.py
75
76
77
78
def __post_init__(self) -> None:
    """Reject quality thresholds outside the inclusive [0, 1] range."""
    if not 0 <= self.quality_threshold <= 1:
        raise ValueError("quality_threshold must be between 0 and 1")

LocalParquetStorage

Bases: StorageBackend

Local Parquet file storage backed by pyarrow.

Raises RuntimeError if pyarrow is not installed.

Source code in src/dataenginex/core/medallion_architecture.py
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
class LocalParquetStorage(StorageBackend):
    """Local Parquet file storage backed by pyarrow.

    Raises ``RuntimeError`` if pyarrow is not installed.
    """

    def __init__(self, base_path: str = "data") -> None:
        # base_path is the root directory every relative *path* is joined to.
        self.base_path = base_path
        logger.info("local parquet storage initialised", path=base_path)

    @staticmethod
    def _require_pyarrow() -> Any:  # noqa: ANN401
        """Import and return the ``pyarrow.parquet`` module.

        Raises:
            RuntimeError: If pyarrow is not installed.
        """
        try:
            import pyarrow.parquet as pq  # noqa: PLC0415

            return pq
        except ImportError as exc:
            msg = (
                "pyarrow is required for LocalParquetStorage. "
                "Install it with: uv pip install pyarrow"
            )
            raise RuntimeError(msg) from exc

    def write(
        self,
        data: Any,
        path: str,
        format: StorageFormat = StorageFormat.PARQUET,
    ) -> bool:
        """Write data to a local Parquet file.

        Args:
            data: List of dicts, or a pyarrow Table.
            path: Relative path from base_path.
            format: Must be ``PARQUET``.

        Returns:
            True on success.

        Raises:
            ValueError: If format is not PARQUET.
            RuntimeError: If pyarrow is not installed.
        """
        if format != StorageFormat.PARQUET:
            msg = f"LocalParquetStorage only supports PARQUET format, got {format}"
            raise ValueError(msg)

        pq = self._require_pyarrow()
        # pa itself is needed for Table construction and ArrowInvalid below.
        import pyarrow as pa  # noqa: PLC0415

        full_path = Path(self.base_path) / path
        # Create parent directories on demand so callers can use deep paths.
        full_path.parent.mkdir(parents=True, exist_ok=True)

        try:
            if isinstance(data, list):
                table = pa.Table.from_pylist(data)
            elif hasattr(data, "schema"):
                # Already a pyarrow Table
                table = data
            else:
                msg = f"Unsupported data type: {type(data).__name__}"
                raise TypeError(msg)

            pq.write_table(table, str(full_path), compression="snappy")
            logger.info("wrote rows", count=len(table), path=str(full_path))
            return True
        except (TypeError, pa.ArrowInvalid) as exc:
            # NOTE: the TypeError raised above for unsupported input is caught
            # here too, so bad input logs and returns False rather than raising.
            logger.error("write failed", path=path, exc=str(exc))
            return False

    def read(
        self,
        path: str,
        format: StorageFormat = StorageFormat.PARQUET,
    ) -> list[dict[str, Any]] | None:
        """Read data from a local Parquet file.

        Returns:
            List of record dicts, or None if the file doesn't exist.
        """
        pq = self._require_pyarrow()
        full_path = Path(self.base_path) / path

        if not full_path.exists():
            logger.warning("file not found", path=str(full_path))
            return None

        try:
            table = pq.read_table(str(full_path))
            records: list[dict[str, Any]] = table.to_pylist()
            logger.info("read rows", count=len(records), path=str(full_path))
            return records
        except Exception as exc:
            # NOTE(review): broad Exception catch — any read failure (corrupt
            # file, permissions, …) degrades to None instead of raising.
            logger.error("read failed", path=path, exc=str(exc))
            return None

    def delete(self, path: str) -> bool:
        """Delete a Parquet file from disk."""
        full_path = Path(self.base_path) / path
        if not full_path.exists():
            logger.warning("cannot delete — file not found", path=str(full_path))
            return False
        try:
            full_path.unlink()
            logger.info("deleted file", path=str(full_path))
            return True
        except OSError as exc:
            logger.error("delete failed", path=path, exc=str(exc))
            return False

    def list_objects(self, prefix: str = "") -> list[str]:
        """List files under *prefix* relative to *base_path*."""
        target = Path(self.base_path) / prefix
        if not target.exists():
            return []
        # rglob("*") walks recursively; directories are filtered out.
        return [str(p.relative_to(self.base_path)) for p in target.rglob("*") if p.is_file()]

    def exists(self, path: str) -> bool:
        """Return ``True`` if *path* exists on disk."""
        return (Path(self.base_path) / path).exists()

write(data, path, format=StorageFormat.PARQUET)

Write data to a local Parquet file.

Parameters:

Name Type Description Default
data Any

List of dicts, or a pyarrow Table.

required
path str

Relative path from base_path.

required
format StorageFormat

Must be PARQUET.

PARQUET

Returns:

Type Description
bool

True on success.

Raises:

Type Description
ValueError

If format is not PARQUET.

RuntimeError

If pyarrow is not installed.

Source code in src/dataenginex/core/medallion_architecture.py
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
def write(
    self,
    data: Any,
    path: str,
    format: StorageFormat = StorageFormat.PARQUET,
) -> bool:
    """Write data to a local Parquet file.

    Args:
        data: List of dicts, or a pyarrow Table.
        path: Relative path from base_path.
        format: Must be ``PARQUET``.

    Returns:
        True on success.

    Raises:
        ValueError: If format is not PARQUET.
        RuntimeError: If pyarrow is not installed.
    """
    if format != StorageFormat.PARQUET:
        msg = f"LocalParquetStorage only supports PARQUET format, got {format}"
        raise ValueError(msg)

    pq = self._require_pyarrow()
    # pa itself is needed for Table construction and ArrowInvalid below.
    import pyarrow as pa  # noqa: PLC0415

    full_path = Path(self.base_path) / path
    # Create parent directories on demand so callers can use deep paths.
    full_path.parent.mkdir(parents=True, exist_ok=True)

    try:
        if isinstance(data, list):
            table = pa.Table.from_pylist(data)
        elif hasattr(data, "schema"):
            # Already a pyarrow Table
            table = data
        else:
            msg = f"Unsupported data type: {type(data).__name__}"
            raise TypeError(msg)

        pq.write_table(table, str(full_path), compression="snappy")
        logger.info("wrote rows", count=len(table), path=str(full_path))
        return True
    except (TypeError, pa.ArrowInvalid) as exc:
        # NOTE: the TypeError raised above for unsupported input is caught
        # here too, so bad input logs and returns False rather than raising.
        logger.error("write failed", path=path, exc=str(exc))
        return False

read(path, format=StorageFormat.PARQUET)

Read data from a local Parquet file.

Returns:

Type Description
list[dict[str, Any]] | None

List of record dicts, or None if the file doesn't exist.

Source code in src/dataenginex/core/medallion_architecture.py
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
def read(
    self,
    path: str,
    format: StorageFormat = StorageFormat.PARQUET,
) -> list[dict[str, Any]] | None:
    """Read data from a local Parquet file.

    Returns:
        List of record dicts, or None if the file doesn't exist.
    """
    pq = self._require_pyarrow()
    full_path = Path(self.base_path) / path

    if not full_path.exists():
        logger.warning("file not found", path=str(full_path))
        return None

    try:
        table = pq.read_table(str(full_path))
        records: list[dict[str, Any]] = table.to_pylist()
        logger.info("read rows", count=len(records), path=str(full_path))
        return records
    except Exception as exc:
        # NOTE(review): broad Exception catch — any read failure (corrupt
        # file, permissions, …) degrades to None instead of raising.
        logger.error("read failed", path=path, exc=str(exc))
        return None

delete(path)

Delete a Parquet file from disk.

Source code in src/dataenginex/core/medallion_architecture.py
293
294
295
296
297
298
299
300
301
302
303
304
305
def delete(self, path: str) -> bool:
    """Remove the Parquet file at *path*; ``True`` when it was deleted."""
    target = Path(self.base_path) / path
    if not target.exists():
        logger.warning("cannot delete — file not found", path=str(target))
        return False
    try:
        target.unlink()
    except OSError as exc:
        logger.error("delete failed", path=path, exc=str(exc))
        return False
    logger.info("deleted file", path=str(target))
    return True

list_objects(prefix='')

List files under prefix relative to base_path.

Source code in src/dataenginex/core/medallion_architecture.py
307
308
309
310
311
312
def list_objects(self, prefix: str = "") -> list[str]:
    """List files under *prefix* relative to *base_path*."""
    target = Path(self.base_path) / prefix
    if not target.exists():
        return []
    return [str(p.relative_to(self.base_path)) for p in target.rglob("*") if p.is_file()]

exists(path)

Return True if path exists on disk.

Source code in src/dataenginex/core/medallion_architecture.py
314
315
316
def exists(self, path: str) -> bool:
    """Return ``True`` if *path* exists on disk under *base_path*."""
    candidate = Path(self.base_path) / path
    return candidate.exists()

MedallionArchitecture

Manages the three-layer medallion architecture for DEX.

Source code in src/dataenginex/core/medallion_architecture.py
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
class MedallionArchitecture:
    """Static registry of the three medallion layer configurations for DEX."""

    # Bronze: raw landing zone — no quality bar, short retention.
    BRONZE_CONFIG = LayerConfiguration(
        layer_name=DataLayer.BRONZE.value,
        description="Raw, unprocessed data from all sources",
        purpose="Preserve original data for historical reproducibility",
        storage_format=StorageFormat.PARQUET,
        local_path="data/bronze",
        bigquery_dataset="bronze",
        retention_days=90,
        schema_validation=False,
        quality_threshold=0.0,  # raw data is accepted unconditionally
        compression="snappy",
    )

    # Silver: cleaned and validated — moderate quality bar, 1-year retention.
    SILVER_CONFIG = LayerConfiguration(
        layer_name=DataLayer.SILVER.value,
        description="Cleaned, deduplicated, validated data",
        purpose="High-quality data ready for analytics and ML",
        storage_format=StorageFormat.PARQUET,
        local_path="data/silver",
        bigquery_dataset="silver",
        retention_days=365,
        schema_validation=True,
        quality_threshold=0.75,  # records must score >= 75%
        compression="snappy",
    )

    # Gold: enriched serving layer — strict quality bar, kept forever.
    GOLD_CONFIG = LayerConfiguration(
        layer_name=DataLayer.GOLD.value,
        description="Enriched, aggregated data ready for ML and APIs",
        purpose="Serve AI models and customer-facing APIs",
        storage_format=StorageFormat.PARQUET,
        local_path="data/gold",
        bigquery_dataset="gold",
        retention_days=None,  # retained indefinitely
        schema_validation=True,
        quality_threshold=0.90,  # records must score >= 90%
        compression="snappy",
    )

    @classmethod
    def get_layer_config(cls, layer: DataLayer) -> LayerConfiguration | None:
        """Return the configuration for *layer*, or ``None`` if unknown."""
        if layer == DataLayer.BRONZE:
            return cls.BRONZE_CONFIG
        if layer == DataLayer.SILVER:
            return cls.SILVER_CONFIG
        if layer == DataLayer.GOLD:
            return cls.GOLD_CONFIG
        return None

    @classmethod
    def get_all_layers(cls) -> list[LayerConfiguration]:
        """Return all layer configurations in bronze → silver → gold order."""
        ordered = (cls.BRONZE_CONFIG, cls.SILVER_CONFIG, cls.GOLD_CONFIG)
        return list(ordered)

get_layer_config(layer) classmethod

Get configuration for a specific layer.

Source code in src/dataenginex/core/medallion_architecture.py
126
127
128
129
130
131
132
133
134
@classmethod
def get_layer_config(cls, layer: DataLayer) -> LayerConfiguration | None:
    """Return the configuration for *layer*, or ``None`` if unknown."""
    if layer == DataLayer.BRONZE:
        return cls.BRONZE_CONFIG
    if layer == DataLayer.SILVER:
        return cls.SILVER_CONFIG
    if layer == DataLayer.GOLD:
        return cls.GOLD_CONFIG
    return None

get_all_layers() classmethod

Get configurations for all layers in order.

Source code in src/dataenginex/core/medallion_architecture.py
136
137
138
139
@classmethod
def get_all_layers(cls) -> list[LayerConfiguration]:
    """Return all layer configurations in bronze → silver → gold order."""
    ordered = (cls.BRONZE_CONFIG, cls.SILVER_CONFIG, cls.GOLD_CONFIG)
    return list(ordered)

StorageBackend

Bases: ABC

Abstract storage backend interface.

All lakehouse storage implementations must subclass this and provide concrete write, read, delete, list_objects, and exists methods. The interface accepts a StorageFormat hint so backends can choose serialisation.

Built-in implementations
  • LocalParquetStorage — local Parquet files (this module)
  • BigQueryStorage — Google BigQuery tables (this module)
  • JsonStorage — JSON files (dataenginex.lakehouse.storage)
  • ParquetStorage — pyarrow-backed Parquet (dataenginex.lakehouse.storage)
  • S3Storage — AWS S3 object storage (dataenginex.lakehouse.storage)
  • GCSStorage — Google Cloud Storage (dataenginex.lakehouse.storage)
Source code in src/dataenginex/core/medallion_architecture.py
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
class StorageBackend(ABC):
    """Abstract interface every lakehouse storage backend implements.

    Subclasses must supply concrete ``write``, ``read``, ``delete``,
    ``list_objects``, and ``exists`` methods.  Read/write calls carry a
    ``StorageFormat`` hint so each backend can pick its serialisation.

    Built-in implementations:
        - ``LocalParquetStorage`` — local Parquet files (this module)
        - ``BigQueryStorage`` — Google BigQuery tables (this module)
        - ``JsonStorage`` — JSON files (``dataenginex.lakehouse.storage``)
        - ``ParquetStorage`` — pyarrow-backed Parquet (``dataenginex.lakehouse.storage``)
        - ``S3Storage`` — AWS S3 object storage (``dataenginex.lakehouse.storage``)
        - ``GCSStorage`` — Google Cloud Storage (``dataenginex.lakehouse.storage``)
    """

    @abstractmethod
    def write(self, data: Any, path: str, format: StorageFormat) -> bool:
        """Store *data* at *path* using *format*; ``True`` on success, ``False`` on failure."""
        ...

    @abstractmethod
    def read(self, path: str, format: StorageFormat) -> Any:
        """Load the data stored at *path*; ``None`` on failure."""
        ...

    @abstractmethod
    def delete(self, path: str) -> bool:
        """Remove the resource at *path*; ``True`` when deletion succeeded."""
        ...

    @abstractmethod
    def list_objects(self, prefix: str = "") -> list[str]:
        """Enumerate object paths under *prefix*.

        Returns relative paths; an empty list on failure or when nothing
        matches.
        """
        ...

    @abstractmethod
    def exists(self, path: str) -> bool:
        """``True`` when *path* is present in the backend."""
        ...

write(data, path, format) abstractmethod

Write data to path in the given format.

Returns True on success, False on failure.

Source code in src/dataenginex/core/medallion_architecture.py
159
160
161
162
163
164
165
@abstractmethod
def write(self, data: Any, path: str, format: StorageFormat) -> bool:
    """Store *data* at *path* using *format*; ``True`` on success, ``False`` on failure."""
    ...

read(path, format) abstractmethod

Read data from path. Returns None on failure.

Source code in src/dataenginex/core/medallion_architecture.py
167
168
169
170
@abstractmethod
def read(self, path: str, format: StorageFormat) -> Any:
    """Load the data stored at *path*; ``None`` on failure."""
    ...

delete(path) abstractmethod

Delete the resource at path. Returns True on success.

Source code in src/dataenginex/core/medallion_architecture.py
172
173
174
175
@abstractmethod
def delete(self, path: str) -> bool:
    """Remove the resource at *path*; ``True`` when deletion succeeded."""
    ...

list_objects(prefix='') abstractmethod

List object paths under prefix.

Returns a list of relative paths. Empty list on failure or when no objects match.

Source code in src/dataenginex/core/medallion_architecture.py
177
178
179
180
181
182
183
184
@abstractmethod
def list_objects(self, prefix: str = "") -> list[str]:
    """Enumerate object paths under *prefix*.

    Returns relative paths; an empty list on failure or when nothing
    matches.
    """
    ...

exists(path) abstractmethod

Return True if path exists in the backend.

Source code in src/dataenginex/core/medallion_architecture.py
186
187
188
189
@abstractmethod
def exists(self, path: str) -> bool:
    """``True`` when *path* is present in the backend."""
    ...

StorageFormat

Bases: StrEnum

Supported storage formats

Source code in src/dataenginex/core/medallion_architecture.py
43
44
45
46
47
48
49
class StorageFormat(StrEnum):
    """Supported storage formats."""

    PARQUET = "parquet"  # used by LocalParquetStorage (its only accepted format)
    DELTA = "delta"  # declared; no backend in this module uses it
    ICEBERG = "iceberg"  # declared; no backend in this module uses it
    BIGQUERY = "bigquery"  # default format hint for BigQueryStorage methods

QualityDimension

Bases: StrEnum

Named quality dimensions tracked by the quality framework.

Source code in src/dataenginex/core/quality.py
51
52
53
54
55
56
57
58
class QualityDimension(StrEnum):
    """Named quality dimensions tracked by the quality framework."""

    COMPLETENESS = "completeness"  # required fields present and non-null
    ACCURACY = "accuracy"  # per-record score from the injected scorer (0.0 when none)
    CONSISTENCY = "consistency"  # proxied by the profiler's completeness/null-rate
    TIMELINESS = "timeliness"  # placeholder in QualityGate (always 1.0 there)
    UNIQUENESS = "uniqueness"  # duplicate detection on the configured key

QualityGate

Orchestrates quality checks at medallion layer transitions.

The gate is domain-agnostic: callers inject a scoring function, required fields, and a uniqueness key to customise evaluation for their data model. When none are supplied the gate still computes completeness, uniqueness, and consistency but accuracy defaults to 0.0.

Parameters:

Name Type Description Default
store QualityStore | None

Optional QualityStore to persist results automatically.

None
profiler DataProfiler | None

Optional DataProfiler instance (created if omitted).

None
scorer Callable[[dict[str, Any]], float] | None

Optional callable (record) -> float returning a per-record quality score (0–1). Injected by the application.

None
required_fields set[str] | None

Field names required for the completeness check. If None, completeness defaults to 1.0 (no check).

None
uniqueness_key str

Record key used for uniqueness checks (default "id").

'id'
Source code in src/dataenginex/core/quality.py
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
class QualityGate:
    """Runs quality checks when data transitions between medallion layers.

    The gate is **domain-agnostic**: applications inject a scoring
    function, a set of required fields, and a uniqueness key to tailor
    evaluation to their data model.  Without any injection the gate still
    measures completeness, uniqueness, and consistency, while accuracy
    defaults to ``0.0``.

    Args:
        store: Optional ``QualityStore`` that receives every result.
        profiler: Optional ``DataProfiler``; a fresh one is created when
            omitted.
        scorer: Optional callable ``(record) -> float`` producing a
            per-record quality score in the range 0–1.  Injected by the
            application.
        required_fields: Field names required for the completeness check.
            ``None`` disables the check (completeness is then ``1.0``).
        uniqueness_key: Record key used to detect duplicates
            (default ``"id"``).
    """

    def __init__(
        self,
        store: QualityStore | None = None,
        profiler: DataProfiler | None = None,
        *,
        scorer: Callable[[dict[str, Any]], float] | None = None,
        required_fields: set[str] | None = None,
        uniqueness_key: str = "id",
    ) -> None:
        self._store = store
        self._profiler = profiler or DataProfiler()
        self._scorer = scorer
        self._required_fields = required_fields
        self._uniqueness_key = uniqueness_key

    @property
    def store(self) -> QualityStore | None:
        """Return the attached quality store, if any."""
        return self._store

    def _evaluate_records(
        self,
        records: list[dict[str, Any]],
        effective_fields: set[str] | None,
    ) -> tuple[list[float], list[float], int, int]:
        """Score each record for completeness, accuracy, and uniqueness.

        Returns:
            Tuple of (completeness_scores, quality_scores, valid_count,
            unique_count).
        """
        completeness: list[float] = []
        accuracy: list[float] = []
        n_valid = 0
        seen_keys: set[str] = set()
        n_unique = 0

        for record in records:
            # Completeness: run the field check, or treat the record as
            # trivially complete when no fields are required.
            if effective_fields:
                complete, _ = DataQualityChecks.check_completeness(
                    record,
                    effective_fields,
                )
            else:
                complete = True
            completeness.append(1.0 if complete else 0.0)
            if complete:
                n_valid += 1

            # Accuracy: delegate to the injected scorer (0.0 when absent).
            accuracy.append(self._scorer(record) if self._scorer else 0.0)

            # Uniqueness: when the configured key is missing, fall back to
            # the dict's identity — such records each count as unique.
            key = str(record.get(self._uniqueness_key, id(record)))
            if key not in seen_keys:
                seen_keys.add(key)
                n_unique += 1

        return completeness, accuracy, n_valid, n_unique

    def evaluate(
        self,
        records: list[dict[str, Any]],
        layer: DataLayer,
        *,
        required_fields: set[str] | None = None,
        dataset_name: str = "batch",
    ) -> QualityResult:
        """Evaluate a batch of records against a layer's quality threshold.

        Steps performed:
        1. Profile the dataset (``DataProfiler``).
        2. Check completeness for each record (``DataQualityChecks``).
        3. Score each record via the injected ``scorer`` (accuracy).
        4. Check uniqueness across the batch.
        5. Compute per-dimension averages and overall score.
        6. Compare overall score to the layer's ``quality_threshold``.

        Args:
            records: List of record dicts to evaluate.
            layer: Target ``DataLayer`` (bronze / silver / gold).
            required_fields: Override per-call required fields.
                Falls back to constructor ``required_fields``.
            dataset_name: Name passed to the profiler.

        Returns:
            A ``QualityResult`` indicating pass/fail and detailed scores.
        """
        config = MedallionArchitecture.get_layer_config(layer)
        # Unknown layers (no config) get a 0.0 threshold and always pass.
        threshold = config.quality_threshold if config else 0.0

        if not records:
            # Empty batches pass trivially with zeroed dimensions.
            empty = QualityResult(
                passed=True,
                layer=layer.value,
                quality_score=0.0,
                threshold=threshold,
                record_count=0,
                valid_count=0,
                dimensions={d.value: 0.0 for d in QualityDimension},
            )
            if self._store:
                self._store.record(empty)
            return empty

        # 1. Profile the batch.
        profile = self._profiler.profile(records, dataset_name)

        # 2. Required fields: call-site override wins over constructor.
        effective_fields = required_fields or self._required_fields

        # 3. Per-record scoring.
        completeness, accuracy, n_valid, n_unique = self._evaluate_records(
            records, effective_fields
        )

        total = len(records)
        dimensions = {
            QualityDimension.COMPLETENESS.value: sum(completeness) / total,
            QualityDimension.ACCURACY.value: (
                sum(accuracy) / total if self._scorer else 0.0
            ),
            # Consistency proxied via the profiler's null-rate measure.
            QualityDimension.CONSISTENCY.value: profile.completeness,
            QualityDimension.UNIQUENESS.value: n_unique / total,
            # Timeliness: placeholder — real timestamp analysis is left to
            # subclasses or caller post-processing.
            QualityDimension.TIMELINESS.value: 1.0,
        }

        overall = sum(dimensions.values()) / len(dimensions)
        passed = overall >= threshold

        result = QualityResult(
            passed=passed,
            layer=layer.value,
            quality_score=overall,
            threshold=threshold,
            record_count=total,
            valid_count=n_valid,
            dimensions=dimensions,
            profile=profile,
        )

        if self._store:
            self._store.record(result)

        logger.info(
            "Quality gate evaluated",
            layer=layer.value,
            score=round(overall, 4),
            threshold=threshold,
            passed=passed,
            records=total,
        )

        return result

store property

Return the attached quality store, if any.

evaluate(records, layer, *, required_fields=None, dataset_name='batch')

Evaluate a batch of records against a layer's quality threshold.

Steps performed: 1. Profile the dataset (DataProfiler). 2. Check completeness for each record (DataQualityChecks). 3. Score each record via the injected scorer (accuracy). 4. Check uniqueness across the batch. 5. Compute per-dimension averages and overall score. 6. Compare overall score to the layer's quality_threshold.

Parameters:

Name Type Description Default
records list[dict[str, Any]]

List of record dicts to evaluate.

required
layer DataLayer

Target DataLayer (bronze / silver / gold).

required
required_fields set[str] | None

Override per-call required fields. Falls back to constructor required_fields.

None
dataset_name str

Name passed to the profiler.

'batch'

Returns:

Type Description
QualityResult

A QualityResult indicating pass/fail and detailed scores.

Source code in src/dataenginex/core/quality.py
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
def evaluate(
    self,
    records: list[dict[str, Any]],
    layer: DataLayer,
    *,
    required_fields: set[str] | None = None,
    dataset_name: str = "batch",
) -> QualityResult:
    """Evaluate a batch of records against a layer's quality threshold.

    Steps performed:
    1. Profile the dataset (``DataProfiler``).
    2. Check completeness for each record (``DataQualityChecks``).
    3. Score each record via the injected ``scorer`` (accuracy).
    4. Check uniqueness across the batch.
    5. Compute per-dimension averages and overall score.
    6. Compare overall score to the layer's ``quality_threshold``.

    Args:
        records: List of record dicts to evaluate.
        layer: Target ``DataLayer`` (bronze / silver / gold).
        required_fields: Override per-call required fields.
            Falls back to constructor ``required_fields``.
        dataset_name: Name passed to the profiler.

    Returns:
        A ``QualityResult`` indicating pass/fail and detailed scores.
    """
    config = MedallionArchitecture.get_layer_config(layer)
    # Unknown layers (no config) get a 0.0 threshold and always pass.
    threshold = config.quality_threshold if config else 0.0

    if not records:
        # Empty batches pass trivially with zeroed dimensions.
        result = QualityResult(
            passed=True,
            layer=layer.value,
            quality_score=0.0,
            threshold=threshold,
            record_count=0,
            valid_count=0,
            dimensions={d.value: 0.0 for d in QualityDimension},
        )
        if self._store:
            self._store.record(result)
        return result

    # 1. Profile
    profile = self._profiler.profile(records, dataset_name)

    # 2. Resolve required fields — call-site > constructor > skip
    # NOTE(review): an explicitly passed empty set is falsy and falls back
    # to the constructor value — confirm that is intended.
    effective_fields = required_fields or self._required_fields

    # 3. Per-record evaluation
    completeness_scores, quality_scores, valid_count, unique_count = self._evaluate_records(
        records, effective_fields
    )

    total = len(records)
    dim_completeness = sum(completeness_scores) / total
    dim_accuracy = sum(quality_scores) / total if self._scorer else 0.0
    dim_uniqueness = unique_count / total
    dim_consistency = profile.completeness  # proxy via null-rate
    # Timeliness: placeholder — requires timestamp analysis.
    # Callers needing real timeliness should subclass or post-process.
    dim_timeliness = 1.0

    dimensions = {
        QualityDimension.COMPLETENESS.value: dim_completeness,
        QualityDimension.ACCURACY.value: dim_accuracy,
        QualityDimension.CONSISTENCY.value: dim_consistency,
        QualityDimension.UNIQUENESS.value: dim_uniqueness,
        QualityDimension.TIMELINESS.value: dim_timeliness,
    }

    # Overall score is the unweighted mean of the five dimensions.
    overall = sum(dimensions.values()) / len(dimensions)
    passed = overall >= threshold

    result = QualityResult(
        passed=passed,
        layer=layer.value,
        quality_score=overall,
        threshold=threshold,
        record_count=total,
        valid_count=valid_count,
        dimensions=dimensions,
        profile=profile,
    )

    if self._store:
        self._store.record(result)

    logger.info(
        "Quality gate evaluated",
        layer=layer.value,
        score=round(overall, 4),
        threshold=threshold,
        passed=passed,
        records=total,
    )

    return result

QualityResult dataclass

Immutable result of evaluating a batch through a QualityGate.

Attributes:

Name Type Description
passed bool

Whether the batch met the layer's quality threshold.

layer str

Target medallion layer that was evaluated.

quality_score float

Overall quality score (0.0–1.0).

threshold float

Layer threshold the batch was compared against.

record_count int

Number of records in the batch.

valid_count int

Number of records that passed schema validation.

dimensions dict[str, float]

Per-dimension scores.

profile ProfileReport | None

Optional ProfileReport produced during evaluation.

evaluated_at datetime

Timestamp of the evaluation.

Source code in src/dataenginex/core/quality.py
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
@dataclass(frozen=True)
class QualityResult:
    """Immutable result of evaluating a batch through a ``QualityGate``.

    Attributes:
        passed: Whether the batch met the layer's quality threshold.
        layer: Target medallion layer that was evaluated.
        quality_score: Overall quality score (0.0–1.0).
        threshold: Layer threshold the batch was compared against.
        record_count: Number of records in the batch.
        valid_count: Number of records that passed schema validation.
        dimensions: Per-dimension scores.
        profile: Optional ``ProfileReport`` produced during evaluation.
        evaluated_at: Timestamp of the evaluation.
    """

    passed: bool
    layer: str
    quality_score: float
    threshold: float
    record_count: int
    valid_count: int
    dimensions: dict[str, float]
    profile: ProfileReport | None = None
    evaluated_at: datetime = field(default_factory=lambda: datetime.now(tz=UTC))

    def to_dict(self) -> dict[str, Any]:
        """Serialise the result to a plain dictionary."""
        return {
            "passed": self.passed,
            "layer": self.layer,
            "quality_score": round(self.quality_score, 4),
            "threshold": self.threshold,
            "record_count": self.record_count,
            "valid_count": self.valid_count,
            "dimensions": {k: round(v, 4) for k, v in self.dimensions.items()},
            "evaluated_at": self.evaluated_at.isoformat(),
        }

to_dict()

Serialise the result to a plain dictionary.

Source code in src/dataenginex/core/quality.py
87
88
89
90
91
92
93
94
95
96
97
98
def to_dict(self) -> dict[str, Any]:
    """Serialise the result to a plain dictionary."""
    return {
        "passed": self.passed,
        "layer": self.layer,
        # Scores are rounded to 4 decimal places for stable payloads.
        "quality_score": round(self.quality_score, 4),
        "threshold": self.threshold,
        "record_count": self.record_count,
        "valid_count": self.valid_count,
        "dimensions": {k: round(v, 4) for k, v in self.dimensions.items()},
        "evaluated_at": self.evaluated_at.isoformat(),
    }

QualityStore

In-memory store that accumulates quality metrics per medallion layer.

Each call to record() appends a snapshot. summary() returns the latest-known scores across all layers, suitable for the /api/v1/data/quality endpoint.

Source code in src/dataenginex/core/quality.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
class QualityStore:
    """In-memory accumulator of quality metrics per medallion layer.

    Every call to :meth:`record` appends a snapshot; :meth:`summary`
    reports the latest-known scores across all layers in the shape the
    ``/api/v1/data/quality`` endpoint expects.
    """

    def __init__(self) -> None:
        # NOTE(review): buckets are keyed by DataLayer members while
        # ``record`` keys by the plain string ``result.layer`` — these only
        # line up if DataLayer behaves as a string enum; confirm.
        self._history: dict[str, list[QualityResult]] = {
            DataLayer.BRONZE: [],
            DataLayer.SILVER: [],
            DataLayer.GOLD: [],
        }

    def record(self, result: QualityResult) -> None:
        """Persist a quality result for its layer."""
        # Layers not pre-seeded in __init__ get a bucket lazily.
        bucket = self._history.setdefault(result.layer, [])
        bucket.append(result)
        logger.info(
            "Quality result recorded",
            layer=result.layer,
            score=round(result.quality_score, 4),
            passed=result.passed,
        )

    def latest(self, layer: str) -> QualityResult | None:
        """Return the most recent result for *layer*, or ``None``."""
        snapshots = self._history.get(layer, [])
        if not snapshots:
            return None
        return snapshots[-1]

    def summary(self) -> dict[str, Any]:
        """Return a quality summary across all layers.

        The shape matches the ``/api/v1/data/quality`` response contract.
        """
        layer_scores: dict[str, float] = {}
        dim_values: dict[str, list[float]] = {}

        for layer in (DataLayer.BRONZE, DataLayer.SILVER, DataLayer.GOLD):
            newest = self.latest(layer)
            if newest is None:
                # No data yet for this layer — report a zero score.
                layer_scores[layer] = 0.0
                continue
            layer_scores[layer] = round(newest.quality_score, 4)
            for name, value in newest.dimensions.items():
                dim_values.setdefault(name, []).append(value)

        # Average each dimension over the layers that reported it.
        avg_dimensions = {
            name: round(sum(values) / len(values), 4) if values else 0.0
            for name, values in dim_values.items()
        }

        # Guarantee every standard dimension appears in the payload.
        for dim in QualityDimension:
            avg_dimensions.setdefault(dim.value, 0.0)

        if layer_scores:
            overall = round(sum(layer_scores.values()) / len(layer_scores), 4)
        else:
            overall = 0.0

        return {
            "overall_score": overall,
            "dimensions": avg_dimensions,
            "layer_scores": layer_scores,
        }

    def history(self, layer: str, limit: int = 10) -> list[dict[str, Any]]:
        """Return the last *limit* results for *layer* as dicts."""
        return [item.to_dict() for item in self._history.get(layer, [])[-limit:]]

record(result)

Persist a quality result for its layer.

Source code in src/dataenginex/core/quality.py
116
117
118
119
120
121
122
123
124
125
126
127
def record(self, result: QualityResult) -> None:
    """Persist a quality result for its layer."""
    layer = result.layer
    # Layers not pre-seeded in __init__ get a bucket lazily.
    if layer not in self._history:
        self._history[layer] = []
    self._history[layer].append(result)
    logger.info(
        "Quality result recorded",
        layer=layer,
        score=round(result.quality_score, 4),
        passed=result.passed,
    )

latest(layer)

Return the most recent result for layer, or None.

Source code in src/dataenginex/core/quality.py
129
130
131
132
def latest(self, layer: str) -> QualityResult | None:
    """Return the most recent result for *layer*, or ``None``.

    ``None`` is returned both for unknown layers and for layers with
    no recorded results yet.
    """
    results = self._history.get(layer, [])
    return results[-1] if results else None

summary()

Return a quality summary across all layers.

The shape matches the /api/v1/data/quality response contract.

Source code in src/dataenginex/core/quality.py
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
def summary(self) -> dict[str, Any]:
    """Return a quality summary across all layers.

    The shape matches the ``/api/v1/data/quality`` response contract.
    """
    # Latest snapshot per layer; dimension values pooled for averaging.
    layer_scores: dict[str, float] = {}
    all_dimensions: dict[str, list[float]] = {}

    for layer_name in (DataLayer.BRONZE, DataLayer.SILVER, DataLayer.GOLD):
        latest = self.latest(layer_name)
        if latest is None:
            # No data yet for this layer — report a zero score.
            layer_scores[layer_name] = 0.0
            continue
        layer_scores[layer_name] = round(latest.quality_score, 4)
        for dim, val in latest.dimensions.items():
            all_dimensions.setdefault(dim, []).append(val)

    # Average each dimension across layers that have data
    avg_dimensions: dict[str, float] = {}
    for dim, vals in all_dimensions.items():
        avg_dimensions[dim] = round(sum(vals) / len(vals), 4) if vals else 0.0

    # Ensure all standard dimensions are present
    for dim in QualityDimension:
        avg_dimensions.setdefault(dim.value, 0.0)

    overall = round(sum(layer_scores.values()) / len(layer_scores), 4) if layer_scores else 0.0

    return {
        "overall_score": overall,
        "dimensions": avg_dimensions,
        "layer_scores": layer_scores,
    }

history(layer, limit=10)

Return the last limit results for layer as dicts.

Source code in src/dataenginex/core/quality.py
168
169
170
171
def history(self, layer: str, limit: int = 10) -> list[dict[str, Any]]:
    """Return the last *limit* results for *layer* as dicts.

    Unknown layers yield an empty list.
    """
    results = self._history.get(layer, [])
    return [r.to_dict() for r in results[-limit:]]

ComponentStatus

Bases: BaseModel

Health status of a single dependency component.

Source code in src/dataenginex/core/schemas.py
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
class ComponentStatus(BaseModel):
    """Health status of a single dependency component."""

    name: str  # component identifier, e.g. "database"
    status: str  # e.g. "healthy" or "skipped" (see examples)
    message: str | None = None  # optional human-readable detail
    duration_ms: float | None = None  # check duration in milliseconds, if measured

    model_config = {
        "json_schema_extra": {
            "examples": [
                {
                    "name": "database",
                    "status": "healthy",
                    "message": "reachable",
                    "duration_ms": 12.5,
                }
            ]
        }
    }

EchoRequest

Bases: BaseModel

Request body for the /echo debug endpoint.

Source code in src/dataenginex/core/schemas.py
153
154
155
156
157
158
159
class EchoRequest(BaseModel):
    """Request body for the ``/echo`` debug endpoint."""

    message: str = Field(min_length=1)  # must be non-empty
    count: int = Field(default=1, ge=1, le=10)  # number of repetitions, clamped to 1-10

    model_config = {"json_schema_extra": {"examples": [{"message": "hello", "count": 2}]}}

EchoResponse

Bases: BaseModel

Response body for the /echo debug endpoint.

Source code in src/dataenginex/core/schemas.py
162
163
164
165
166
167
168
169
170
171
172
173
class EchoResponse(BaseModel):
    """Response body for the ``/echo`` debug endpoint."""

    message: str  # the original message echoed back
    count: int  # number of repetitions requested
    echo: list[str]  # the message repeated ``count`` times (see example)

    model_config = {
        "json_schema_extra": {
            "examples": [{"message": "hello", "count": 2, "echo": ["hello", "hello"]}]
        }
    }

ErrorDetail

Bases: BaseModel

Detail of a single validation or processing error.

Source code in src/dataenginex/core/schemas.py
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
class ErrorDetail(BaseModel):
    """Detail of a single validation or processing error."""

    field: str | None = None  # offending field name, when known
    message: str  # human-readable error description
    type: str | None = None  # machine-readable error code, e.g. "string_too_short"

    model_config = {
        "json_schema_extra": {
            "examples": [
                {
                    "field": "message",
                    "message": "String should have at least 1 character",
                    "type": "string_too_short",
                }
            ]
        }
    }

ErrorResponse

Bases: BaseModel

Standard error response returned by all API error handlers.

Source code in src/dataenginex/core/schemas.py
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
class ErrorResponse(BaseModel):
    """Standard error response returned by all API error handlers."""

    error: str  # machine-readable category, e.g. "validation_error"
    message: str  # human-readable summary
    request_id: str | None = None  # correlates the error with a request, when available
    details: list[ErrorDetail] | None = None  # per-field details for validation errors

    model_config = {
        "json_schema_extra": {
            "examples": [
                {
                    "error": "validation_error",
                    "message": "Request validation failed",
                    "request_id": "1f9b6b1c-5b90-4c6c-8d2a-1f28f6f0b5a1",
                    "details": [
                        {
                            "field": "message",
                            "message": "String should have at least 1 character",
                            "type": "string_too_short",
                        }
                    ],
                }
            ]
        }
    }

HealthResponse

Bases: BaseModel

Response body for the /health liveness probe.

Source code in src/dataenginex/core/schemas.py
36
37
38
39
40
41
class HealthResponse(BaseModel):
    """Response body for the ``/health`` liveness probe."""

    status: str  # e.g. "alive" (see example)

    model_config = {"json_schema_extra": {"examples": [{"status": "alive"}]}}

ReadinessResponse

Bases: BaseModel

Response body for the /ready readiness probe.

Source code in src/dataenginex/core/schemas.py
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
class ReadinessResponse(BaseModel):
    """Response body for the ``/ready`` readiness probe."""

    status: str  # overall readiness, e.g. "ready"
    components: list[ComponentStatus]  # per-dependency health breakdown

    model_config = {
        "json_schema_extra": {
            "examples": [
                {
                    "status": "ready",
                    "components": [
                        {
                            "name": "database",
                            "status": "healthy",
                            "message": "reachable",
                            "duration_ms": 12.5,
                        },
                        {
                            "name": "cache",
                            "status": "skipped",
                            "message": "cache not configured",
                            "duration_ms": None,
                        },
                    ],
                }
            ]
        }
    }

RootResponse

Bases: BaseModel

Response body for the root / endpoint.

Source code in src/dataenginex/core/schemas.py
25
26
27
28
29
30
31
32
33
class RootResponse(BaseModel):
    """Response body for the root ``/`` endpoint."""

    message: str  # service banner, e.g. "DataEngineX API"
    version: str  # application version string

    model_config = {
        "json_schema_extra": {"examples": [{"message": "DataEngineX API", "version": "0.1.0"}]}
    }

StartupResponse

Bases: BaseModel

Response body for the /startup probe.

Source code in src/dataenginex/core/schemas.py
44
45
46
47
48
49
class StartupResponse(BaseModel):
    """Response body for the ``/startup`` probe."""

    status: str  # e.g. "started" (see example)

    model_config = {"json_schema_extra": {"examples": [{"status": "started"}]}}

DataQualityChecks

Generic data quality checks — not tied to any domain schema.

Source code in src/dataenginex/core/validators.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
class DataQualityChecks:
    """Generic data quality checks — not tied to any domain schema."""

    @staticmethod
    def check_completeness(
        record: dict[str, Any],
        required_fields: set[str],
    ) -> tuple[bool, list[str]]:
        """Check that all required fields are present and non-null.

        Args:
            record: Data record to check.
            required_fields: Set of field names that must be present.

        Returns:
            Tuple of (is_complete, missing_fields).
        """
        missing = [
            field for field in required_fields if field not in record or record[field] is None
        ]
        return len(missing) == 0, missing

    @staticmethod
    def check_consistency_dates(
        posted_date: datetime,
        last_modified_date: datetime,
        expiration_date: datetime | None = None,
    ) -> tuple[bool, list[str]]:
        """Check temporal consistency of dates.

        Returns:
            Tuple of (is_consistent, issues).
        """
        issues: list[str] = []

        if posted_date > last_modified_date:
            issues.append("Posted date is after last modified date")

        if expiration_date and expiration_date < posted_date:
            issues.append("Expiration date is before posted date")

        if posted_date > datetime.now(tz=UTC):
            issues.append("Posted date is in the future")

        return len(issues) == 0, issues

check_completeness(record, required_fields) staticmethod

Check that all required fields are present and non-null.

Parameters:

Name Type Description Default
record dict[str, Any]

Data record to check.

required
required_fields set[str]

Set of field names that must be present.

required

Returns:

Type Description
tuple[bool, list[str]]

Tuple of (is_complete, missing_fields).

Source code in src/dataenginex/core/validators.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@staticmethod
def check_completeness(
    record: dict[str, Any],
    required_fields: set[str],
) -> tuple[bool, list[str]]:
    """Check that all required fields are present and non-null.

    Args:
        record: Data record to check.
        required_fields: Set of field names that must be present.

    Returns:
        Tuple of (is_complete, missing_fields).
    """
    # A field counts as missing when absent OR present with a None value.
    missing = [
        field for field in required_fields if field not in record or record[field] is None
    ]
    return len(missing) == 0, missing

check_consistency_dates(posted_date, last_modified_date, expiration_date=None) staticmethod

Check temporal consistency of dates.

Returns:

Type Description
tuple[bool, list[str]]

Tuple of (is_consistent, issues).

Source code in src/dataenginex/core/validators.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
@staticmethod
def check_consistency_dates(
    posted_date: datetime,
    last_modified_date: datetime,
    expiration_date: datetime | None = None,
) -> tuple[bool, list[str]]:
    """Check temporal consistency of dates.

    NOTE(review): comparisons assume timezone-aware datetimes — mixing
    naive and aware values raises ``TypeError``; confirm callers pass
    aware values.

    Returns:
        Tuple of (is_consistent, issues).
    """
    issues: list[str] = []

    if posted_date > last_modified_date:
        issues.append("Posted date is after last modified date")

    if expiration_date and expiration_date < posted_date:
        issues.append("Expiration date is before posted date")

    if posted_date > datetime.now(tz=UTC):
        issues.append("Posted date is in the future")

    return len(issues) == 0, issues

ValidationReport

Generates validation reports for data quality assessment.

Source code in src/dataenginex/core/validators.py
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
class ValidationReport:
    """Generates validation reports for data quality assessment.

    Tracks per-record outcomes (valid / invalid), collects error and
    warning details, and produces a summary dict via :meth:`finalize`.
    """

    def __init__(self) -> None:
        # total_records is kept in sync with valid + invalid as records
        # are recorded (previously it was initialized but never updated,
        # so external readers always saw 0).
        self.total_records = 0
        self.valid_records = 0
        self.invalid_records = 0
        self.errors: list[dict[str, Any]] = []
        self.warnings: list[dict[str, Any]] = []

    def add_error(self, record_id: str, error_type: str, message: str) -> None:
        """Record a validation error (counts the record as invalid)."""
        self.invalid_records += 1
        self.total_records += 1  # fix: keep running total consistent
        self.errors.append({"record_id": record_id, "type": error_type, "message": message})

    def add_warning(self, record_id: str, warning_type: str, message: str) -> None:
        """Record a validation warning (does not affect validity counts)."""
        self.warnings.append({"record_id": record_id, "type": warning_type, "message": message})

    def mark_valid(self) -> None:
        """Mark a record as valid."""
        self.valid_records += 1
        self.total_records += 1  # fix: keep running total consistent

    def finalize(self) -> dict[str, Any]:
        """Generate the final validation report.

        Returns:
            Summary dict with record counts, validity percentage, and up
            to the first 100 errors and warnings (capped payload size).
        """
        total = self.valid_records + self.invalid_records
        # Guard against division by zero when no records were processed.
        valid_pct = (self.valid_records / total * 100) if total > 0 else 0

        return {
            "total_records": total,
            "valid_records": self.valid_records,
            "invalid_records": self.invalid_records,
            "validity_percentage": valid_pct,
            "error_count": len(self.errors),
            "warning_count": len(self.warnings),
            "errors": self.errors[:100],  # Top 100 errors
            "warnings": self.warnings[:100],  # Top 100 warnings
        }

add_error(record_id, error_type, message)

Record a validation error.

Source code in src/dataenginex/core/validators.py
78
79
80
81
def add_error(self, record_id: str, error_type: str, message: str) -> None:
    """Record a validation error."""
    self.invalid_records = self.invalid_records + 1
    entry = {"record_id": record_id, "type": error_type, "message": message}
    self.errors.append(entry)

add_warning(record_id, warning_type, message)

Record a validation warning.

Source code in src/dataenginex/core/validators.py
83
84
85
def add_warning(self, record_id: str, warning_type: str, message: str) -> None:
    """Record a validation warning."""
    entry = {"record_id": record_id, "type": warning_type, "message": message}
    self.warnings.append(entry)

mark_valid()

Mark a record as valid.

Source code in src/dataenginex/core/validators.py
87
88
89
def mark_valid(self) -> None:
    """Mark a record as valid."""
    self.valid_records = self.valid_records + 1

finalize()

Generate final validation report.

Source code in src/dataenginex/core/validators.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
def finalize(self) -> dict[str, Any]:
    """Generate final validation report."""
    processed = self.valid_records + self.invalid_records
    # Avoid division by zero when nothing was processed.
    if processed > 0:
        pct = self.valid_records / processed * 100
    else:
        pct = 0

    report: dict[str, Any] = {
        "total_records": processed,
        "valid_records": self.valid_records,
        "invalid_records": self.invalid_records,
        "validity_percentage": pct,
        "error_count": len(self.errors),
        "warning_count": len(self.warnings),
        "errors": self.errors[:100],  # Top 100 errors
        "warnings": self.warnings[:100],  # Top 100 warnings
    }
    return report