Skip to content

dataenginex.core

Schemas, validators, medallion architecture, and pipeline configuration.

dataenginex.core

Core framework — schemas, validators, medallion architecture, pipeline config, quality.

Public API::

from dataenginex.core import (
    # Medallion
    MedallionArchitecture, DataLayer, StorageFormat, LayerConfiguration,
    StorageBackend, LocalParquetStorage, BigQueryStorage, DualStorage, DataLineage,
    # Pipeline
    PipelineConfig, PipelineMetrics,
    # Quality
    QualityGate, QualityStore, QualityResult, QualityDimension,
    # Schemas
    JobPosting, JobSourceEnum, UserProfile,
    ErrorDetail, ErrorResponse, RootResponse, HealthResponse,
    StartupResponse, ComponentStatus, ReadinessResponse,
    EchoRequest, EchoResponse,
    DataQualityReport, PipelineExecutionMetadata,
    # Validators
    SchemaValidator, DataQualityChecks, DataHash,
    QualityScorer, ValidationReport,
)

BigQueryStorage

Bases: StorageBackend

BigQuery cloud storage implementation.

Source code in packages/dataenginex/src/dataenginex/core/medallion_architecture.py
class BigQueryStorage(StorageBackend):
    """BigQuery cloud storage implementation.

    NOTE(review): read/write/delete are stubs — the google-cloud-bigquery
    calls are only sketched in comments and no client is constructed yet.
    """

    def __init__(self, project_id: str, location: str = "US"):
        self.project_id = project_id
        self.location = location
        # Lazy %-style logging args (consistent with list_objects/exists below)
        # avoid formatting the message when the log level is disabled.
        logger.info("Initialized BigQuery storage for project %s", project_id)

    def write(
        self,
        data: Any,
        path: str,
        format: StorageFormat = StorageFormat.BIGQUERY,
    ) -> bool:
        """
        Write data to BigQuery table.

        Path format: "dataset.table"

        Args:
            data: Data to write (dataframe or dict records)
            path: BigQuery path as "dataset.table"
            format: Storage format

        Returns:
            True if successful, False otherwise
        """
        if format != StorageFormat.BIGQUERY:
            logger.error("BigQueryStorage should use BIGQUERY format, got %s", format)
            return False

        try:
            logger.info("Writing data to BigQuery %s", path)
            # Implementation: Use google.cloud.bigquery to write
            # client.load_table_from_dataframe(df, path).result()
            return True
        except Exception as e:
            logger.error("Failed to write to BigQuery %s: %s", path, e)
            return False

    def read(self, path: str, format: StorageFormat = StorageFormat.BIGQUERY) -> Any:
        """
        Read data from BigQuery table.

        Args:
            path: BigQuery path as "dataset.table"
            format: Storage format

        Returns:
            Data read from table, or None if failed
        """
        try:
            logger.info("Reading data from BigQuery %s", path)
            # Implementation: Use google.cloud.bigquery to read
            # df = client.query(
            #     f"SELECT * FROM `{self.project_id}.{path}`"
            # ).to_dataframe()
            return None
        except Exception as e:
            logger.error("Failed to read from BigQuery %s: %s", path, e)
            return None

    def delete(self, path: str) -> bool:
        """Delete BigQuery table."""
        try:
            logger.info("Deleting BigQuery table %s", path)
            # Implementation: Use google.cloud.bigquery to delete
            # client.delete_table(f"{self.project_id}.{path}")
            return True
        except Exception as e:
            logger.error("Failed to delete from BigQuery %s: %s", path, e)
            return False

    def list_objects(self, prefix: str = "") -> list[str]:
        """List BigQuery tables matching *prefix* (stub)."""
        logger.info("Listing BigQuery tables with prefix %s", prefix)
        return []

    def exists(self, path: str) -> bool:
        """Check if BigQuery table exists (stub)."""
        logger.info("Checking BigQuery table existence: %s", path)
        return False

write(data, path, format=StorageFormat.BIGQUERY)

Write data to BigQuery table.

Path format: "dataset.table"

Parameters:

Name Type Description Default
data Any

Data to write (dataframe or dict records)

required
path str

BigQuery path as "dataset.table"

required
format StorageFormat

Storage format

BIGQUERY

Returns:

Type Description
bool

True if successful, False otherwise

Source code in packages/dataenginex/src/dataenginex/core/medallion_architecture.py
def write(
    self,
    data: Any,
    path: str,
    format: StorageFormat = StorageFormat.BIGQUERY,
) -> bool:
    """
    Write data to BigQuery table.

    Path format: "dataset.table"

    Args:
        data: Data to write (dataframe or dict records)
        path: BigQuery path as "dataset.table"
        format: Storage format

    Returns:
        True if successful, False otherwise
    """
    if format != StorageFormat.BIGQUERY:
        # Lazy %-style args: message only formatted if the record is emitted.
        logger.error("BigQueryStorage should use BIGQUERY format, got %s", format)
        return False

    try:
        logger.info("Writing data to BigQuery %s", path)
        # Implementation: Use google.cloud.bigquery to write
        # client.load_table_from_dataframe(df, path).result()
        return True
    except Exception as e:
        logger.error("Failed to write to BigQuery %s: %s", path, e)
        return False

read(path, format=StorageFormat.BIGQUERY)

Read data from BigQuery table.

Parameters:

Name Type Description Default
path str

BigQuery path as "dataset.table"

required
format StorageFormat

Storage format

BIGQUERY

Returns:

Type Description
Any

Data read from table, or None if failed

Source code in packages/dataenginex/src/dataenginex/core/medallion_architecture.py
def read(self, path: str, format: StorageFormat = StorageFormat.BIGQUERY) -> Any:
    """
    Read data from BigQuery table.

    Args:
        path: BigQuery path as "dataset.table"
        format: Storage format

    Returns:
        Data read from table, or None if failed
    """
    try:
        # Lazy %-style args: message only formatted if the record is emitted.
        logger.info("Reading data from BigQuery %s", path)
        # Implementation: Use google.cloud.bigquery to read
        # df = client.query(
        #     f"SELECT * FROM `{self.project_id}.{path}`"
        # ).to_dataframe()
        return None
    except Exception as e:
        logger.error("Failed to read from BigQuery %s: %s", path, e)
        return None

delete(path)

Delete BigQuery table.

Source code in packages/dataenginex/src/dataenginex/core/medallion_architecture.py
def delete(self, path: str) -> bool:
    """Delete BigQuery table.

    Returns True on success, False on failure (stub: always succeeds).
    """
    try:
        # Lazy %-style args: message only formatted if the record is emitted.
        logger.info("Deleting BigQuery table %s", path)
        # Implementation: Use google.cloud.bigquery to delete
        # client.delete_table(f"{self.project_id}.{path}")
        return True
    except Exception as e:
        logger.error("Failed to delete from BigQuery %s: %s", path, e)
        return False

list_objects(prefix='')

List BigQuery tables matching prefix (stub).

Source code in packages/dataenginex/src/dataenginex/core/medallion_architecture.py
def list_objects(self, prefix: str = "") -> list[str]:
    """List BigQuery tables matching *prefix* (stub).

    NOTE(review): always returns an empty list — no BigQuery client is
    wired up yet.
    """
    logger.info("Listing BigQuery tables with prefix %s", prefix)
    return []

exists(path)

Check if BigQuery table exists (stub).

Source code in packages/dataenginex/src/dataenginex/core/medallion_architecture.py
def exists(self, path: str) -> bool:
    """Check if BigQuery table exists (stub).

    NOTE(review): always returns ``False`` until a real client is added.
    """
    logger.info("Checking BigQuery table existence: %s", path)
    return False

DataLayer

Bases: StrEnum

Medallion architecture layers

Source code in packages/dataenginex/src/dataenginex/core/medallion_architecture.py
class DataLayer(StrEnum):
    """Medallion architecture layers."""

    BRONZE = "bronze"  # raw, unprocessed data from all sources
    SILVER = "silver"  # cleaned, deduplicated, validated data
    GOLD = "gold"  # enriched, aggregated data ready for ML and APIs

DataLineage

Tracks data lineage through the medallion layers.

Source code in packages/dataenginex/src/dataenginex/core/medallion_architecture.py
class DataLineage:
    """Tracks data lineage through the medallion layers."""

    def __init__(self) -> None:
        self.lineage: dict[str, dict[str, Any]] = {}

    def record_bronze_ingestion(self, source: str, record_count: int, timestamp: str) -> str:
        """Record data entry into Bronze layer."""
        lineage_id = f"bronze_{source}_{timestamp}"
        self.lineage[lineage_id] = {
            "layer": "bronze",
            "source": source,
            "record_count": record_count,
            "timestamp": timestamp,
            "status": "raw",
        }
        return lineage_id

    def record_silver_transformation(
        self, lineage_id: str, processed_count: int, quality_score: float
    ) -> str:
        """Record data transformation in Silver layer."""
        silver_id = f"{lineage_id}_silver"
        self.lineage[silver_id] = {
            "layer": "silver",
            "parent": lineage_id,
            "processed_count": processed_count,
            "quality_score": quality_score,
            "status": "cleaned",
        }
        return silver_id

    def record_gold_enrichment(
        self, lineage_id: str, enriched_count: int, embedding_model: str
    ) -> str:
        """Record data enrichment in Gold layer."""
        gold_id = f"{lineage_id}_gold"
        self.lineage[gold_id] = {
            "layer": "gold",
            "parent": lineage_id,
            "enriched_count": enriched_count,
            "embedding_model": embedding_model,
            "status": "enriched",
        }
        return gold_id

    def get_lineage(self, lineage_id: str) -> dict[str, Any] | None:
        """Get lineage information for a record."""
        return self.lineage.get(lineage_id)

record_bronze_ingestion(source, record_count, timestamp)

Record data entry into Bronze layer.

Source code in packages/dataenginex/src/dataenginex/core/medallion_architecture.py
def record_bronze_ingestion(self, source: str, record_count: int, timestamp: str) -> str:
    """Register a raw ingestion event in the Bronze layer and return its ID."""
    bronze_id = f"bronze_{source}_{timestamp}"
    self.lineage[bronze_id] = dict(
        layer="bronze",
        source=source,
        record_count=record_count,
        timestamp=timestamp,
        status="raw",
    )
    return bronze_id

record_silver_transformation(lineage_id, processed_count, quality_score)

Record data transformation in Silver layer.

Source code in packages/dataenginex/src/dataenginex/core/medallion_architecture.py
def record_silver_transformation(
    self, lineage_id: str, processed_count: int, quality_score: float
) -> str:
    """Register a Silver-layer transformation derived from *lineage_id*."""
    silver_id = f"{lineage_id}_silver"
    self.lineage[silver_id] = dict(
        layer="silver",
        parent=lineage_id,
        processed_count=processed_count,
        quality_score=quality_score,
        status="cleaned",
    )
    return silver_id

record_gold_enrichment(lineage_id, enriched_count, embedding_model)

Record data enrichment in Gold layer.

Source code in packages/dataenginex/src/dataenginex/core/medallion_architecture.py
def record_gold_enrichment(
    self, lineage_id: str, enriched_count: int, embedding_model: str
) -> str:
    """Register a Gold-layer enrichment derived from *lineage_id*."""
    gold_id = f"{lineage_id}_gold"
    self.lineage[gold_id] = dict(
        layer="gold",
        parent=lineage_id,
        enriched_count=enriched_count,
        embedding_model=embedding_model,
        status="enriched",
    )
    return gold_id

get_lineage(lineage_id)

Get lineage information for a record.

Source code in packages/dataenginex/src/dataenginex/core/medallion_architecture.py
def get_lineage(self, lineage_id: str) -> dict[str, Any] | None:
    """Get lineage information for a record."""
    return self.lineage.get(lineage_id)

DualStorage

Manages dual storage strategy: local Parquet + BigQuery.

Pattern: - Development/Testing: Write to local Parquet - Production/Cloud: Write to both local (backup) and BigQuery (primary)

Source code in packages/dataenginex/src/dataenginex/core/medallion_architecture.py
class DualStorage:
    """
    Manages dual storage strategy: local Parquet + BigQuery.

    Pattern:
    - Development/Testing: Write to local Parquet
    - Production/Cloud: Write to both local (backup) and BigQuery (primary)
    """

    def __init__(
        self,
        local_base_path: str = "data",
        bigquery_project: str | None = None,
        enable_bigquery: bool = False,
    ):
        # Local Parquet is always available; BigQuery mirroring is opt-in and
        # requires both the flag and a project ID.
        self.local_storage = LocalParquetStorage(local_base_path)
        self.bigquery_storage = None
        if not (enable_bigquery and bigquery_project):
            logger.info("Storage mode: Local Parquet only")
        else:
            self.bigquery_storage = BigQueryStorage(bigquery_project)
            logger.info("Dual storage enabled: Local Parquet + BigQuery")

    def _write_layer(self, layer: str, data: Any, key: str, timestamp: str) -> bool:
        """
        Write *data* into a medallion layer.

        The local Parquet write is authoritative; when BigQuery mirroring is
        enabled, a successful local write is also pushed to BigQuery.

        Args:
            layer: Layer name (bronze, silver, gold)
            data: Data to write
            key: Source or entity type identifier
            timestamp: Timestamp string for partitioning

        Returns:
            True if local write succeeded
        """
        wrote_locally = self.local_storage.write(data, f"{layer}/{key}/{timestamp}")
        if wrote_locally and self.bigquery_storage:
            # BigQuery identifiers cannot contain '-' or ':'.
            sanitized = timestamp.replace("-", "_").replace(":", "_")
            self.bigquery_storage.write(data, f"careerdex_{layer}.{key}_{sanitized}")
        return wrote_locally

    def _read_layer(self, layer: str, key: str, timestamp: str) -> Any:
        """Read data back from a medallion layer (local storage only)."""
        return self.local_storage.read(f"{layer}/{key}/{timestamp}")

    def write_bronze(self, data: Any, source: str, timestamp: str) -> bool:
        """Store raw data under bronze/{source}/{timestamp}."""
        return self._write_layer("bronze", data, source, timestamp)

    def write_silver(self, data: Any, entity_type: str, timestamp: str) -> bool:
        """Store cleaned data under silver/{entity_type}/{timestamp}."""
        return self._write_layer("silver", data, entity_type, timestamp)

    def write_gold(self, data: Any, entity_type: str, timestamp: str) -> bool:
        """Store enriched data under gold/{entity_type}/{timestamp}."""
        return self._write_layer("gold", data, entity_type, timestamp)

    def read_bronze(self, source: str, timestamp: str) -> Any:
        """Fetch raw data from the Bronze layer."""
        return self._read_layer("bronze", source, timestamp)

    def read_silver(self, entity_type: str, timestamp: str) -> Any:
        """Fetch cleaned data from the Silver layer."""
        return self._read_layer("silver", entity_type, timestamp)

    def read_gold(self, entity_type: str, timestamp: str) -> Any:
        """Fetch enriched data from the Gold layer."""
        return self._read_layer("gold", entity_type, timestamp)

write_bronze(data, source, timestamp)

Write to Bronze layer — path: bronze/{source}/{timestamp}.

Source code in packages/dataenginex/src/dataenginex/core/medallion_architecture.py
def write_bronze(self, data: Any, source: str, timestamp: str) -> bool:
    """Store raw *data* under ``bronze/{source}/{timestamp}``."""
    result = self._write_layer("bronze", data, source, timestamp)
    return result

write_silver(data, entity_type, timestamp)

Write to Silver layer — path: silver/{entity_type}/{timestamp}.

Source code in packages/dataenginex/src/dataenginex/core/medallion_architecture.py
def write_silver(self, data: Any, entity_type: str, timestamp: str) -> bool:
    """Store cleaned *data* under ``silver/{entity_type}/{timestamp}``."""
    result = self._write_layer("silver", data, entity_type, timestamp)
    return result

write_gold(data, entity_type, timestamp)

Write to Gold layer — path: gold/{entity_type}/{timestamp}.

Source code in packages/dataenginex/src/dataenginex/core/medallion_architecture.py
def write_gold(self, data: Any, entity_type: str, timestamp: str) -> bool:
    """Store enriched *data* under ``gold/{entity_type}/{timestamp}``."""
    result = self._write_layer("gold", data, entity_type, timestamp)
    return result

read_bronze(source, timestamp)

Read from Bronze layer.

Source code in packages/dataenginex/src/dataenginex/core/medallion_architecture.py
def read_bronze(self, source: str, timestamp: str) -> Any:
    """Fetch raw data back from the Bronze layer."""
    payload = self._read_layer("bronze", source, timestamp)
    return payload

read_silver(entity_type, timestamp)

Read from Silver layer.

Source code in packages/dataenginex/src/dataenginex/core/medallion_architecture.py
def read_silver(self, entity_type: str, timestamp: str) -> Any:
    """Fetch cleaned data back from the Silver layer."""
    payload = self._read_layer("silver", entity_type, timestamp)
    return payload

read_gold(entity_type, timestamp)

Read from Gold layer.

Source code in packages/dataenginex/src/dataenginex/core/medallion_architecture.py
def read_gold(self, entity_type: str, timestamp: str) -> Any:
    """Fetch enriched data back from the Gold layer."""
    payload = self._read_layer("gold", entity_type, timestamp)
    return payload

LayerConfiguration dataclass

Configuration for a medallion layer.

Source code in packages/dataenginex/src/dataenginex/core/medallion_architecture.py
@dataclass
class LayerConfiguration:
    """Configuration for a medallion layer."""

    layer_name: str  # e.g. "bronze", "silver", "gold"
    description: str
    purpose: str
    storage_format: StorageFormat
    local_path: str  # filesystem root for this layer
    bigquery_dataset: str  # BigQuery dataset name for this layer
    retention_days: int | None  # None means indefinite retention
    schema_validation: bool
    quality_threshold: float  # minimum acceptable quality score in [0, 1]
    compression: str = "snappy"

    def __post_init__(self) -> None:
        """Validate configuration.

        Raises:
            ValueError: If ``quality_threshold`` lies outside [0, 1].
        """
        # Include the offending value so misconfigurations are easy to trace.
        if not 0 <= self.quality_threshold <= 1:
            raise ValueError(
                f"quality_threshold must be between 0 and 1, got {self.quality_threshold}"
            )

__post_init__()

Validate configuration.

Source code in packages/dataenginex/src/dataenginex/core/medallion_architecture.py
def __post_init__(self) -> None:
    """Validate configuration.

    Raises:
        ValueError: If ``quality_threshold`` lies outside [0, 1].
    """
    # Include the offending value so misconfigurations are easy to trace.
    if not 0 <= self.quality_threshold <= 1:
        raise ValueError(
            f"quality_threshold must be between 0 and 1, got {self.quality_threshold}"
        )

LocalParquetStorage

Bases: StorageBackend

Local Parquet file storage implementation.

Source code in packages/dataenginex/src/dataenginex/core/medallion_architecture.py
class LocalParquetStorage(StorageBackend):
    """Local Parquet file storage implementation.

    NOTE(review): read/write/delete are stubs — the pyarrow/pandas calls
    are only sketched in comments.
    """

    def __init__(self, base_path: str = "data"):
        self.base_path = base_path
        # Lazy %-style logging args (consistent with list_objects below)
        # avoid formatting the message when the log level is disabled.
        logger.info("Initialized local Parquet storage at %s", base_path)

    def write(
        self,
        data: Any,
        path: str,
        format: StorageFormat = StorageFormat.PARQUET,
    ) -> bool:
        """
        Write data to local Parquet file.

        Args:
            data: Data to write (dict, list, or dataframe)
            path: Relative path from base_path
            format: Storage format (must be PARQUET for this backend)

        Returns:
            True if successful, False otherwise
        """
        if format != StorageFormat.PARQUET:
            logger.error("LocalParquetStorage only supports PARQUET format, got %s", format)
            return False

        try:
            full_path = f"{self.base_path}/{path}"
            logger.info("Writing data to %s", full_path)
            # Implementation: Use pyarrow.parquet to write
            # df.to_parquet(full_path, compression='snappy', index=False)
            return True
        except Exception as e:
            logger.error("Failed to write to %s: %s", path, e)
            return False

    def read(self, path: str, format: StorageFormat = StorageFormat.PARQUET) -> Any:
        """
        Read data from local Parquet file.

        Args:
            path: Relative path from base_path
            format: Storage format

        Returns:
            Data read from file, or None if failed
        """
        try:
            full_path = f"{self.base_path}/{path}"
            logger.info("Reading data from %s", full_path)
            # Implementation: Use pyarrow.parquet to read
            # df = pd.read_parquet(full_path)
            return None
        except Exception as e:
            logger.error("Failed to read from %s: %s", path, e)
            return None

    def delete(self, path: str) -> bool:
        """Delete Parquet file."""
        try:
            full_path = f"{self.base_path}/{path}"
            logger.info("Deleting %s", full_path)
            # Implementation: Use os.remove or shutil.rmtree
            return True
        except Exception as e:
            logger.error("Failed to delete %s: %s", path, e)
            return False

    def list_objects(self, prefix: str = "") -> list[str]:
        """List files under *prefix* relative to *base_path*."""
        target = Path(self.base_path) / prefix
        if not target.exists():
            return []
        return [str(p.relative_to(self.base_path)) for p in target.rglob("*") if p.is_file()]

    def exists(self, path: str) -> bool:
        """Return ``True`` if *path* exists on disk."""
        return (Path(self.base_path) / path).exists()

write(data, path, format=StorageFormat.PARQUET)

Write data to local Parquet file.

Parameters:

Name Type Description Default
data Any

Data to write (dict, list, or dataframe)

required
path str

Relative path from base_path

required
format StorageFormat

Storage format (must be PARQUET for this backend)

PARQUET

Returns:

Type Description
bool

True if successful, False otherwise

Source code in packages/dataenginex/src/dataenginex/core/medallion_architecture.py
def write(
    self,
    data: Any,
    path: str,
    format: StorageFormat = StorageFormat.PARQUET,
) -> bool:
    """
    Write data to local Parquet file.

    Args:
        data: Data to write (dict, list, or dataframe)
        path: Relative path from base_path
        format: Storage format (must be PARQUET for this backend)

    Returns:
        True if successful, False otherwise
    """
    if format != StorageFormat.PARQUET:
        # Lazy %-style args: message only formatted if the record is emitted.
        logger.error("LocalParquetStorage only supports PARQUET format, got %s", format)
        return False

    try:
        full_path = f"{self.base_path}/{path}"
        logger.info("Writing data to %s", full_path)
        # Implementation: Use pyarrow.parquet to write
        # df.to_parquet(full_path, compression='snappy', index=False)
        return True
    except Exception as e:
        logger.error("Failed to write to %s: %s", path, e)
        return False

read(path, format=StorageFormat.PARQUET)

Read data from local Parquet file.

Parameters:

Name Type Description Default
path str

Relative path from base_path

required
format StorageFormat

Storage format

PARQUET

Returns:

Type Description
Any

Data read from file, or None if failed

Source code in packages/dataenginex/src/dataenginex/core/medallion_architecture.py
def read(self, path: str, format: StorageFormat = StorageFormat.PARQUET) -> Any:
    """
    Read data from local Parquet file.

    Args:
        path: Relative path from base_path
        format: Storage format

    Returns:
        Data read from file, or None if failed
    """
    try:
        full_path = f"{self.base_path}/{path}"
        # Lazy %-style args: message only formatted if the record is emitted.
        logger.info("Reading data from %s", full_path)
        # Implementation: Use pyarrow.parquet to read
        # df = pd.read_parquet(full_path)
        return None
    except Exception as e:
        logger.error("Failed to read from %s: %s", path, e)
        return None

delete(path)

Delete Parquet file.

Source code in packages/dataenginex/src/dataenginex/core/medallion_architecture.py
def delete(self, path: str) -> bool:
    """Delete Parquet file.

    Returns True on success, False on failure (stub: always succeeds).
    """
    try:
        full_path = f"{self.base_path}/{path}"
        # Lazy %-style args: message only formatted if the record is emitted.
        logger.info("Deleting %s", full_path)
        # Implementation: Use os.remove or shutil.rmtree
        return True
    except Exception as e:
        logger.error("Failed to delete %s: %s", path, e)
        return False

list_objects(prefix='')

List files under prefix relative to base_path.

Source code in packages/dataenginex/src/dataenginex/core/medallion_architecture.py
def list_objects(self, prefix: str = "") -> list[str]:
    """Enumerate files under *prefix*, as paths relative to *base_path*."""
    root = Path(self.base_path) / prefix
    if not root.exists():
        return []
    files = (entry for entry in root.rglob("*") if entry.is_file())
    return [str(entry.relative_to(self.base_path)) for entry in files]

exists(path)

Return True if path exists on disk.

Source code in packages/dataenginex/src/dataenginex/core/medallion_architecture.py
def exists(self, path: str) -> bool:
    """Report whether *path* (relative to *base_path*) exists on disk."""
    candidate = Path(self.base_path) / path
    return candidate.exists()

MedallionArchitecture

Manages the three-layer medallion architecture for DEX.

Source code in packages/dataenginex/src/dataenginex/core/medallion_architecture.py
class MedallionArchitecture:
    """Manages the three-layer medallion architecture for DEX.

    Holds the static Bronze/Silver/Gold layer configurations and exposes
    classmethod accessors; no instance state is needed.
    """

    # Bronze Layer Configuration — raw data, no quality gate, 90-day retention.
    BRONZE_CONFIG = LayerConfiguration(
        layer_name=DataLayer.BRONZE.value,
        description="Raw, unprocessed data from all sources",
        purpose="Preserve original data for historical reproducibility",
        storage_format=StorageFormat.PARQUET,
        local_path="data/bronze",
        bigquery_dataset="careerdex_bronze",
        retention_days=90,
        schema_validation=False,
        quality_threshold=0.0,  # No quality threshold for raw data
        compression="snappy",
    )

    # Silver Layer Configuration — validated data, 75% quality gate, 1-year retention.
    SILVER_CONFIG = LayerConfiguration(
        layer_name=DataLayer.SILVER.value,
        description="Cleaned, deduplicated, validated data",
        purpose="High-quality data ready for analytics and ML",
        storage_format=StorageFormat.PARQUET,
        local_path="data/silver",
        bigquery_dataset="careerdex_silver",
        retention_days=365,
        schema_validation=True,
        quality_threshold=0.75,  # >= 75% quality score
        compression="snappy",
    )

    # Gold Layer Configuration — serving data, 90% quality gate, kept forever.
    GOLD_CONFIG = LayerConfiguration(
        layer_name=DataLayer.GOLD.value,
        description="Enriched, aggregated data ready for ML and APIs",
        purpose="Serve AI models and customer-facing APIs",
        storage_format=StorageFormat.PARQUET,
        local_path="data/gold",
        bigquery_dataset="careerdex_gold",
        retention_days=None,  # Indefinite retention
        schema_validation=True,
        quality_threshold=0.90,  # >= 90% quality score
        compression="snappy",
    )

    @classmethod
    def get_layer_config(cls, layer: DataLayer) -> LayerConfiguration | None:
        """Get configuration for a specific layer, or ``None`` if unknown."""
        configs = {
            DataLayer.BRONZE: cls.BRONZE_CONFIG,
            DataLayer.SILVER: cls.SILVER_CONFIG,
            DataLayer.GOLD: cls.GOLD_CONFIG,
        }
        return configs.get(layer)

    @classmethod
    def get_all_layers(cls) -> list[LayerConfiguration]:
        """Get configurations for all layers in order (Bronze, Silver, Gold)."""
        return [cls.BRONZE_CONFIG, cls.SILVER_CONFIG, cls.GOLD_CONFIG]

get_layer_config(layer) classmethod

Get configuration for a specific layer.

Source code in packages/dataenginex/src/dataenginex/core/medallion_architecture.py
@classmethod
def get_layer_config(cls, layer: DataLayer) -> LayerConfiguration | None:
    """Return the configuration matching *layer*, or ``None`` if unknown."""
    if layer == DataLayer.BRONZE:
        return cls.BRONZE_CONFIG
    if layer == DataLayer.SILVER:
        return cls.SILVER_CONFIG
    if layer == DataLayer.GOLD:
        return cls.GOLD_CONFIG
    return None

get_all_layers() classmethod

Get configurations for all layers in order.

Source code in packages/dataenginex/src/dataenginex/core/medallion_architecture.py
@classmethod
def get_all_layers(cls) -> list[LayerConfiguration]:
    """Return Bronze, Silver, and Gold configurations in pipeline order."""
    ordered = (cls.BRONZE_CONFIG, cls.SILVER_CONFIG, cls.GOLD_CONFIG)
    return list(ordered)

StorageBackend

Bases: ABC

Abstract storage backend interface.

All lakehouse storage implementations must subclass this and provide concrete write, read, delete, list_objects, and exists methods. The interface accepts a StorageFormat hint so backends can choose serialisation.

Built-in implementations
  • LocalParquetStorage — local Parquet files (this module)
  • BigQueryStorage — Google BigQuery tables (this module)
  • JsonStorage — JSON files (dataenginex.lakehouse.storage)
  • ParquetStorage — pyarrow-backed Parquet (dataenginex.lakehouse.storage)
  • S3Storage — AWS S3 object storage (dataenginex.lakehouse.storage)
  • GCSStorage — Google Cloud Storage (dataenginex.lakehouse.storage)
Source code in packages/dataenginex/src/dataenginex/core/medallion_architecture.py
class StorageBackend(ABC):
    """Abstract storage backend interface.

    All lakehouse storage implementations must subclass this and provide
    concrete ``write``, ``read``, ``delete``, ``list_objects``, and
    ``exists`` methods.  The interface accepts a ``StorageFormat`` hint
    so backends can choose serialisation.

    Implementations follow a no-raise contract: failures are reported via
    the return value (``False`` / ``None`` / ``[]``), not exceptions.

    Built-in implementations:
        - ``LocalParquetStorage`` — local Parquet files (this module)
        - ``BigQueryStorage`` — Google BigQuery tables (this module)
        - ``JsonStorage`` — JSON files (``dataenginex.lakehouse.storage``)
        - ``ParquetStorage`` — pyarrow-backed Parquet (``dataenginex.lakehouse.storage``)
        - ``S3Storage`` — AWS S3 object storage (``dataenginex.lakehouse.storage``)
        - ``GCSStorage`` — Google Cloud Storage (``dataenginex.lakehouse.storage``)
    """

    @abstractmethod
    def write(self, data: Any, path: str, format: StorageFormat) -> bool:
        """Write *data* to *path* in the given format.

        Returns ``True`` on success, ``False`` on failure.
        """
        ...

    @abstractmethod
    def read(self, path: str, format: StorageFormat) -> Any:
        """Read data from *path*.  Returns ``None`` on failure."""
        ...

    @abstractmethod
    def delete(self, path: str) -> bool:
        """Delete the resource at *path*.  Returns ``True`` on success."""
        ...

    @abstractmethod
    def list_objects(self, prefix: str = "") -> list[str]:
        """List object paths under *prefix*.

        Returns a list of relative paths.  Empty list on failure or when
        no objects match.
        """
        ...

    @abstractmethod
    def exists(self, path: str) -> bool:
        """Return ``True`` if *path* exists in the backend."""
        ...

write(data, path, format) abstractmethod

Write data to path in the given format.

Returns True on success, False on failure.

Source code in packages/dataenginex/src/dataenginex/core/medallion_architecture.py
@abstractmethod
def write(self, data: Any, path: str, format: StorageFormat) -> bool:
    """Write *data* to *path* in the given format.

    Args:
        data: Payload to persist; accepted types are backend-specific.
        path: Backend-specific destination (file path, "dataset.table", ...).
        format: Serialisation hint for the backend.

    Returns ``True`` on success, ``False`` on failure.
    """
    ...

read(path, format) abstractmethod

Read data from path. Returns None on failure.

Source code in packages/dataenginex/src/dataenginex/core/medallion_architecture.py
@abstractmethod
def read(self, path: str, format: StorageFormat) -> Any:
    """Read data from *path* using *format* as a serialisation hint.

    Returns ``None`` on failure rather than raising.
    """
    ...

delete(path) abstractmethod

Delete the resource at path. Returns True on success.

Source code in packages/dataenginex/src/dataenginex/core/medallion_architecture.py
@abstractmethod
def delete(self, path: str) -> bool:
    """Delete the resource at *path*.

    Returns ``True`` on success, ``False`` on failure (no exceptions).
    """
    ...

list_objects(prefix='') abstractmethod

List object paths under prefix.

Returns a list of relative paths. Empty list on failure or when no objects match.

Source code in packages/dataenginex/src/dataenginex/core/medallion_architecture.py
@abstractmethod
def list_objects(self, prefix: str = "") -> list[str]:
    """List object paths under *prefix*.

    Args:
        prefix: Backend-specific path prefix; empty string lists everything.

    Returns a list of relative paths.  Empty list on failure or when
    no objects match.
    """
    ...

exists(path) abstractmethod

Return True if path exists in the backend.

Source code in packages/dataenginex/src/dataenginex/core/medallion_architecture.py
@abstractmethod
def exists(self, path: str) -> bool:
    """Return ``True`` if *path* exists in the backend, ``False`` otherwise."""
    ...

StorageFormat

Bases: StrEnum

Supported storage formats

Source code in packages/dataenginex/src/dataenginex/core/medallion_architecture.py
class StorageFormat(StrEnum):
    """Supported storage formats."""

    PARQUET = "parquet"
    DELTA = "delta"
    ICEBERG = "iceberg"
    BIGQUERY = "bigquery"  # table-based rather than file-based

PipelineConfig

Configuration for DEX data pipelines.

Source code in packages/dataenginex/src/dataenginex/core/pipeline_config.py
class PipelineConfig:
    """Configuration for DEX data pipelines.

    Holds scheduling constants and per-source ingestion targets.  The
    aggregate expectations (``EXPECTED_JOBS_PER_CYCLE`` and
    ``EXPECTED_JOBS_TOTAL``) are derived from ``CAREERDEX_JOB_SOURCES``
    so they cannot drift when a source is added, removed, or retuned.
    """

    # Pipeline execution constants
    EXECUTION_SCHEDULE = "0 */3 * * *"  # Every 3 hours (00:00, 03:00, 06:00, etc.)
    EXPECTED_CYCLE_TIME_MINUTES = 45  # Expected runtime: 45 minutes per cycle
    TIMEOUT_MINUTES = 120  # Kill pipeline if running >2 hours

    # Job ingestion sources and target volumes
    CAREERDEX_JOB_SOURCES = {
        "linkedin": {
            "type": "rest_api",
            "expected_daily_jobs": 10000,
            "cycles_per_day": 8,  # 24 hours / 3 hours
            "expected_cycle_jobs": 1250,
            "timeout_seconds": 900,
        },
        "indeed": {
            "type": "rest_api",
            "expected_daily_jobs": 50000,
            "cycles_per_day": 8,
            "expected_cycle_jobs": 6250,
            "timeout_seconds": 1200,
        },
        "glassdoor": {
            "type": "rest_api",
            "expected_daily_jobs": 20000,
            "cycles_per_day": 8,
            "expected_cycle_jobs": 2500,
            "timeout_seconds": 1000,
        },
        "company_career_pages": {
            "type": "scraper",
            "expected_daily_jobs": 30000,
            "cycles_per_day": 8,
            "expected_cycle_jobs": 3750,
            "timeout_seconds": 1500,
        },
    }

    # Aggregate targets, derived from the per-source table above rather than
    # duplicated as literals.  (Inside a class body only the outermost
    # iterable of a generator expression may reference class-level names,
    # which is exactly the case here.)

    # Total expected jobs per cycle: 13,750 posts (1250 + 6250 + 2500 + 3750)
    EXPECTED_JOBS_PER_CYCLE: int = sum(
        src["expected_cycle_jobs"] for src in CAREERDEX_JOB_SOURCES.values()
    )

    # Total expected jobs per day: ~110K (10000 + 50000 + 20000 + 30000)
    EXPECTED_JOBS_TOTAL: int = sum(
        src["expected_daily_jobs"] for src in CAREERDEX_JOB_SOURCES.values()
    )

PipelineMetrics

Metrics tracking for pipeline monitoring.

Source code in packages/dataenginex/src/dataenginex/core/pipeline_config.py
class PipelineMetrics:
    """Metrics tracking for pipeline monitoring.

    ``METRICS`` is a declarative registry: each entry maps a metric name
    to its description, metric type (counter / gauge / histogram), unit,
    and optional ``target`` / ``target_max`` thresholds.
    """

    METRICS = {
        "jobs_fetched": {
            "description": "Total jobs fetched from all sources",
            "type": "counter",
            "unit": "count",
        },
        "jobs_ingested": {
            "description": "Jobs successfully ingested into system",
            "type": "counter",
            "unit": "count",
        },
        "jobs_deduplicated": {
            "description": "Duplicate jobs detected and marked",
            "type": "counter",
            "unit": "count",
        },
        "jobs_enriched": {
            "description": "Jobs with embeddings and enrichments",
            "type": "counter",
            "unit": "count",
        },
        "data_quality_score": {
            "description": "Average quality score of ingested jobs",
            "type": "gauge",
            "unit": "percentage",
            "target": 85,  # Target 85%+
        },
        "pipeline_duration": {
            "description": "Total execution time for pipeline",
            "type": "histogram",
            "unit": "seconds",
            "target_max": 2700,  # 45 minute target
        },
        "bronze_to_silver_loss": {
            "description": "Percentage of data lost in cleaning",
            "type": "gauge",
            "unit": "percentage",
            "target_max": 5,  # Max 5% loss acceptable
        },
        "silver_to_gold_loss": {
            "description": "Percentage of data lost in enrichment",
            "type": "gauge",
            "unit": "percentage",
            "target_max": 2,  # Max 2% loss acceptable
        },
    }

QualityDimension

Bases: StrEnum

Named quality dimensions tracked by the quality framework.

Source code in packages/dataenginex/src/dataenginex/core/quality.py
class QualityDimension(StrEnum):
    """Named quality dimensions tracked by the quality framework."""

    COMPLETENESS = "completeness"  # required fields are populated
    ACCURACY = "accuracy"  # per-record quality score average
    CONSISTENCY = "consistency"  # batch-level null-rate profile
    TIMELINESS = "timeliness"  # freshness (currently a placeholder in QualityGate)
    UNIQUENESS = "uniqueness"  # duplicate job_id detection

QualityGate

Orchestrates quality checks at medallion layer transitions.

Combines DataProfiler, DataQualityChecks, and QualityScorer to produce a single pass/fail QualityResult for a batch of records.

Parameters:

Name Type Description Default
store QualityStore | None

Optional QualityStore to persist results automatically.

None
profiler DataProfiler | None

Optional DataProfiler instance (created if omitted).

None
Source code in packages/dataenginex/src/dataenginex/core/quality.py
class QualityGate:
    """Orchestrates quality checks at medallion layer transitions.

    Combines ``DataProfiler``, ``DataQualityChecks``, and ``QualityScorer``
    to produce a single pass/fail ``QualityResult`` for a batch of records.

    Args:
        store: Optional ``QualityStore`` to persist results automatically.
        profiler: Optional ``DataProfiler`` instance (created if omitted).
    """

    def __init__(
        self,
        store: QualityStore | None = None,
        profiler: DataProfiler | None = None,
    ) -> None:
        self._store = store
        self._profiler = profiler or DataProfiler()

    @property
    def store(self) -> QualityStore | None:
        """Return the attached quality store, if any."""
        return self._store

    def evaluate(
        self,
        records: list[dict[str, Any]],
        layer: DataLayer,
        *,
        required_fields: set[str] | None = None,
        dataset_name: str = "batch",
    ) -> QualityResult:
        """Evaluate a batch of records against a layer's quality threshold.

        Steps performed:
        1. Profile the dataset (``DataProfiler``).
        2. Check completeness for each record (``DataQualityChecks``).
        3. Score each record (``QualityScorer``).
        4. Check uniqueness across the batch.
        5. Compute per-dimension averages and overall score.
        6. Compare overall score to the layer's ``quality_threshold``.

        Args:
            records: List of record dicts to evaluate.
            layer: Target ``DataLayer`` (bronze / silver / gold).
            required_fields: Field names required for completeness check.
                Falls back to ``{"job_id", "source", "company_name", "job_title"}``.
            dataset_name: Name passed to the profiler.

        Returns:
            A ``QualityResult`` indicating pass/fail and detailed scores.
        """
        config = MedallionArchitecture.get_layer_config(layer)
        # Missing layer config → threshold 0.0, so any score passes.
        threshold = config.quality_threshold if config else 0.0

        if not records:
            # An empty batch trivially passes; it is still recorded so the
            # store shows the gate ran.
            result = QualityResult(
                passed=True,
                layer=layer.value,
                quality_score=0.0,
                threshold=threshold,
                record_count=0,
                valid_count=0,
                dimensions={d.value: 0.0 for d in QualityDimension},
            )
            if self._store:
                self._store.record(result)
            return result

        # 1. Profile
        profile = self._profiler.profile(records, dataset_name)

        # 2. Required-field defaults
        if required_fields is None:
            required_fields = {"job_id", "source", "company_name", "job_title"}

        # 3. Per-record evaluation
        completeness_scores: list[float] = []
        quality_scores: list[float] = []
        valid_count = 0
        seen_ids: set[str] = set()
        unique_count = 0

        for rec in records:
            # Completeness
            ok, _missing = DataQualityChecks.check_completeness(rec, required_fields)
            completeness_scores.append(1.0 if ok else 0.0)
            if ok:
                valid_count += 1

            # Quality score
            quality_scores.append(QualityScorer.score_job_posting(rec))

            # Uniqueness — the id(rec) fallback keeps records without a
            # job_id from colliding with one another.
            rid = str(rec.get("job_id", id(rec)))
            is_unique, _ = DataQualityChecks.check_uniqueness_job_id(rid, seen_ids)
            if is_unique:
                unique_count += 1
                seen_ids.add(rid)

        total = len(records)
        dim_completeness = sum(completeness_scores) / total
        dim_accuracy = sum(quality_scores) / total
        dim_uniqueness = unique_count / total
        dim_consistency = profile.completeness  # proxy via null-rate
        dim_timeliness = 1.0  # placeholder — requires timestamp analysis

        dimensions = {
            QualityDimension.COMPLETENESS.value: dim_completeness,
            QualityDimension.ACCURACY.value: dim_accuracy,
            QualityDimension.CONSISTENCY.value: dim_consistency,
            QualityDimension.UNIQUENESS.value: dim_uniqueness,
            QualityDimension.TIMELINESS.value: dim_timeliness,
        }

        # Overall score is the unweighted mean of the five dimension scores.
        overall = sum(dimensions.values()) / len(dimensions)
        passed = overall >= threshold

        result = QualityResult(
            passed=passed,
            layer=layer.value,
            quality_score=overall,
            threshold=threshold,
            record_count=total,
            valid_count=valid_count,
            dimensions=dimensions,
            profile=profile,
        )

        if self._store:
            self._store.record(result)

        logger.info(
            "Quality gate evaluated",
            layer=layer.value,
            score=round(overall, 4),
            threshold=threshold,
            passed=passed,
            records=total,
        )

        return result

store property

Return the attached quality store, if any.

evaluate(records, layer, *, required_fields=None, dataset_name='batch')

Evaluate a batch of records against a layer's quality threshold.

Steps performed: 1. Profile the dataset (DataProfiler). 2. Check completeness for each record (DataQualityChecks). 3. Score each record (QualityScorer). 4. Check uniqueness across the batch. 5. Compute per-dimension averages and overall score. 6. Compare overall score to the layer's quality_threshold.

Parameters:

Name Type Description Default
records list[dict[str, Any]]

List of record dicts to evaluate.

required
layer DataLayer

Target DataLayer (bronze / silver / gold).

required
required_fields set[str] | None

Field names required for completeness check. Falls back to {"job_id", "source", "company_name", "job_title"}.

None
dataset_name str

Name passed to the profiler.

'batch'

Returns:

Type Description
QualityResult

A QualityResult indicating pass/fail and detailed scores.

Source code in packages/dataenginex/src/dataenginex/core/quality.py
def evaluate(
    self,
    records: list[dict[str, Any]],
    layer: DataLayer,
    *,
    required_fields: set[str] | None = None,
    dataset_name: str = "batch",
) -> QualityResult:
    """Evaluate a batch of records against a layer's quality threshold.

    Steps performed:
    1. Profile the dataset (``DataProfiler``).
    2. Check completeness for each record (``DataQualityChecks``).
    3. Score each record (``QualityScorer``).
    4. Check uniqueness across the batch.
    5. Compute per-dimension averages and overall score.
    6. Compare overall score to the layer's ``quality_threshold``.

    Args:
        records: List of record dicts to evaluate.
        layer: Target ``DataLayer`` (bronze / silver / gold).
        required_fields: Field names required for completeness check.
            Falls back to ``{"job_id", "source", "company_name", "job_title"}``.
        dataset_name: Name passed to the profiler.

    Returns:
        A ``QualityResult`` indicating pass/fail and detailed scores.
    """
    config = MedallionArchitecture.get_layer_config(layer)
    # Missing layer config → threshold 0.0, so any score passes.
    threshold = config.quality_threshold if config else 0.0

    if not records:
        # An empty batch trivially passes; it is still recorded so the
        # store shows the gate ran.
        result = QualityResult(
            passed=True,
            layer=layer.value,
            quality_score=0.0,
            threshold=threshold,
            record_count=0,
            valid_count=0,
            dimensions={d.value: 0.0 for d in QualityDimension},
        )
        if self._store:
            self._store.record(result)
        return result

    # 1. Profile
    profile = self._profiler.profile(records, dataset_name)

    # 2. Required-field defaults
    if required_fields is None:
        required_fields = {"job_id", "source", "company_name", "job_title"}

    # 3. Per-record evaluation
    completeness_scores: list[float] = []
    quality_scores: list[float] = []
    valid_count = 0
    seen_ids: set[str] = set()
    unique_count = 0

    for rec in records:
        # Completeness
        ok, _missing = DataQualityChecks.check_completeness(rec, required_fields)
        completeness_scores.append(1.0 if ok else 0.0)
        if ok:
            valid_count += 1

        # Quality score
        quality_scores.append(QualityScorer.score_job_posting(rec))

        # Uniqueness — the id(rec) fallback keeps records without a
        # job_id from colliding with one another.
        rid = str(rec.get("job_id", id(rec)))
        is_unique, _ = DataQualityChecks.check_uniqueness_job_id(rid, seen_ids)
        if is_unique:
            unique_count += 1
            seen_ids.add(rid)

    total = len(records)
    dim_completeness = sum(completeness_scores) / total
    dim_accuracy = sum(quality_scores) / total
    dim_uniqueness = unique_count / total
    dim_consistency = profile.completeness  # proxy via null-rate
    dim_timeliness = 1.0  # placeholder — requires timestamp analysis

    dimensions = {
        QualityDimension.COMPLETENESS.value: dim_completeness,
        QualityDimension.ACCURACY.value: dim_accuracy,
        QualityDimension.CONSISTENCY.value: dim_consistency,
        QualityDimension.UNIQUENESS.value: dim_uniqueness,
        QualityDimension.TIMELINESS.value: dim_timeliness,
    }

    # Overall score is the unweighted mean of the five dimension scores.
    overall = sum(dimensions.values()) / len(dimensions)
    passed = overall >= threshold

    result = QualityResult(
        passed=passed,
        layer=layer.value,
        quality_score=overall,
        threshold=threshold,
        record_count=total,
        valid_count=valid_count,
        dimensions=dimensions,
        profile=profile,
    )

    if self._store:
        self._store.record(result)

    logger.info(
        "Quality gate evaluated",
        layer=layer.value,
        score=round(overall, 4),
        threshold=threshold,
        passed=passed,
        records=total,
    )

    return result

QualityResult dataclass

Immutable result of evaluating a batch through a QualityGate.

Attributes:

Name Type Description
passed bool

Whether the batch met the layer's quality threshold.

layer str

Target medallion layer that was evaluated.

quality_score float

Overall quality score (0.0–1.0).

threshold float

Layer threshold the batch was compared against.

record_count int

Number of records in the batch.

valid_count int

Number of records that passed schema validation.

dimensions dict[str, float]

Per-dimension scores.

profile ProfileReport | None

Optional ProfileReport produced during evaluation.

evaluated_at datetime

Timestamp of the evaluation.

Source code in packages/dataenginex/src/dataenginex/core/quality.py
@dataclass(frozen=True)
class QualityResult:
    """Immutable result of evaluating a batch through a ``QualityGate``.

    Attributes:
        passed: Whether the batch met the layer's quality threshold.
        layer: Target medallion layer that was evaluated.
        quality_score: Overall quality score (0.0–1.0).
        threshold: Layer threshold the batch was compared against.
        record_count: Number of records in the batch.
        valid_count: Number of records that passed schema validation.
        dimensions: Per-dimension scores.
        profile: Optional ``ProfileReport`` produced during evaluation.
        evaluated_at: Timestamp of the evaluation.
    """

    passed: bool
    layer: str
    quality_score: float
    threshold: float
    record_count: int
    valid_count: int
    dimensions: dict[str, float]
    profile: ProfileReport | None = None
    # Timezone-aware UTC timestamp captured at construction time.
    evaluated_at: datetime = field(default_factory=lambda: datetime.now(tz=UTC))

    def to_dict(self) -> dict[str, Any]:
        """Serialise the result to a plain dictionary.

        Scores are rounded to four decimal places; ``profile`` is omitted
        from the serialised form.
        """
        return {
            "passed": self.passed,
            "layer": self.layer,
            "quality_score": round(self.quality_score, 4),
            "threshold": self.threshold,
            "record_count": self.record_count,
            "valid_count": self.valid_count,
            "dimensions": {k: round(v, 4) for k, v in self.dimensions.items()},
            "evaluated_at": self.evaluated_at.isoformat(),
        }

to_dict()

Serialise the result to a plain dictionary.

Source code in packages/dataenginex/src/dataenginex/core/quality.py
def to_dict(self) -> dict[str, Any]:
    """Return the result as a JSON-serialisable dictionary.

    Scores are rounded to four decimal places and the timestamp is
    rendered in ISO-8601 form; the profile is not included.
    """
    rounded_dims = {name: round(score, 4) for name, score in self.dimensions.items()}
    payload: dict[str, Any] = {
        "passed": self.passed,
        "layer": self.layer,
        "quality_score": round(self.quality_score, 4),
        "threshold": self.threshold,
        "record_count": self.record_count,
        "valid_count": self.valid_count,
        "dimensions": rounded_dims,
        "evaluated_at": self.evaluated_at.isoformat(),
    }
    return payload

QualityStore

In-memory store that accumulates quality metrics per medallion layer.

Each call to record() appends a snapshot. summary() returns the latest-known scores across all layers, suitable for the /api/v1/data/quality endpoint.

Source code in packages/dataenginex/src/dataenginex/core/quality.py
class QualityStore:
    """In-memory store that accumulates quality metrics per medallion layer.

    Each call to ``record`` appends a snapshot; ``summary`` reports the
    most recent scores across all layers in the shape expected by the
    ``/api/v1/data/quality`` endpoint.
    """

    def __init__(self) -> None:
        # Pre-seed the three medallion layers so summary() always reports them.
        self._history: dict[str, list[QualityResult]] = {
            DataLayer.BRONZE: [],
            DataLayer.SILVER: [],
            DataLayer.GOLD: [],
        }

    def record(self, result: QualityResult) -> None:
        """Persist a quality result for its layer."""
        bucket = self._history.setdefault(result.layer, [])
        bucket.append(result)
        logger.info(
            "Quality result recorded",
            layer=result.layer,
            score=round(result.quality_score, 4),
            passed=result.passed,
        )

    def latest(self, layer: str) -> QualityResult | None:
        """Return the most recent result for *layer*, or ``None``."""
        if entries := self._history.get(layer, []):
            return entries[-1]
        return None

    def summary(self) -> dict[str, Any]:
        """Return a quality summary across all layers.

        The shape matches the ``/api/v1/data/quality`` response contract.
        """
        layer_scores: dict[str, float] = {}
        dim_values: dict[str, list[float]] = {}

        for name in (DataLayer.BRONZE, DataLayer.SILVER, DataLayer.GOLD):
            snapshot = self.latest(name)
            if snapshot is None:
                # No data yet for this layer — report a zero score.
                layer_scores[name] = 0.0
                continue
            layer_scores[name] = round(snapshot.quality_score, 4)
            for dim, score in snapshot.dimensions.items():
                dim_values.setdefault(dim, []).append(score)

        # Average each dimension across the layers that reported it.
        avg_dimensions: dict[str, float] = {
            dim: round(sum(scores) / len(scores), 4) if scores else 0.0
            for dim, scores in dim_values.items()
        }
        # The standard dimensions always appear, even when no data exists.
        for dim in QualityDimension:
            avg_dimensions.setdefault(dim.value, 0.0)

        if layer_scores:
            overall = round(sum(layer_scores.values()) / len(layer_scores), 4)
        else:
            overall = 0.0

        return {
            "overall_score": overall,
            "dimensions": avg_dimensions,
            "layer_scores": layer_scores,
        }

    def history(self, layer: str, limit: int = 10) -> list[dict[str, Any]]:
        """Return the last *limit* results for *layer* as dicts."""
        return [entry.to_dict() for entry in self._history.get(layer, [])[-limit:]]

record(result)

Persist a quality result for its layer.

Source code in packages/dataenginex/src/dataenginex/core/quality.py
def record(self, result: QualityResult) -> None:
    """Persist a quality result for its layer."""
    bucket = self._history.setdefault(result.layer, [])
    bucket.append(result)
    logger.info(
        "Quality result recorded",
        layer=result.layer,
        score=round(result.quality_score, 4),
        passed=result.passed,
    )

latest(layer)

Return the most recent result for layer, or None.

Source code in packages/dataenginex/src/dataenginex/core/quality.py
def latest(self, layer: str) -> QualityResult | None:
    """Return the most recent result for *layer*, or ``None``."""
    if entries := self._history.get(layer, []):
        return entries[-1]
    return None

summary()

Return a quality summary across all layers.

The shape matches the /api/v1/data/quality response contract.

Source code in packages/dataenginex/src/dataenginex/core/quality.py
def summary(self) -> dict[str, Any]:
    """Return a quality summary across all layers.

    The shape matches the ``/api/v1/data/quality`` response contract.
    """
    layer_scores: dict[str, float] = {}
    dim_values: dict[str, list[float]] = {}

    for name in (DataLayer.BRONZE, DataLayer.SILVER, DataLayer.GOLD):
        snapshot = self.latest(name)
        if snapshot is None:
            # No data yet for this layer — report a zero score.
            layer_scores[name] = 0.0
            continue
        layer_scores[name] = round(snapshot.quality_score, 4)
        for dim, score in snapshot.dimensions.items():
            dim_values.setdefault(dim, []).append(score)

    # Average each dimension across the layers that reported it.
    avg_dimensions: dict[str, float] = {
        dim: round(sum(scores) / len(scores), 4) if scores else 0.0
        for dim, scores in dim_values.items()
    }
    # The standard dimensions always appear, even when no data exists.
    for dim in QualityDimension:
        avg_dimensions.setdefault(dim.value, 0.0)

    if layer_scores:
        overall = round(sum(layer_scores.values()) / len(layer_scores), 4)
    else:
        overall = 0.0

    return {
        "overall_score": overall,
        "dimensions": avg_dimensions,
        "layer_scores": layer_scores,
    }

history(layer, limit=10)

Return the last limit results for layer as dicts.

Source code in packages/dataenginex/src/dataenginex/core/quality.py
def history(self, layer: str, limit: int = 10) -> list[dict[str, Any]]:
    """Return the last *limit* results for *layer* as dicts."""
    return [entry.to_dict() for entry in self._history.get(layer, [])[-limit:]]

ComponentStatus

Bases: BaseModel

Health status of a single dependency component.

Source code in packages/dataenginex/src/dataenginex/core/schemas.py
class ComponentStatus(BaseModel):
    """Health status of a single dependency component."""

    name: str  # component identifier, e.g. "database"
    status: str  # health state string, e.g. "healthy"
    message: str | None = None  # optional human-readable detail
    duration_ms: float | None = None  # how long the check took, when measured

    model_config = {
        "json_schema_extra": {
            "examples": [
                {
                    "name": "database",
                    "status": "healthy",
                    "message": "reachable",
                    "duration_ms": 12.5,
                }
            ]
        }
    }

DataQualityReport

Bases: BaseModel

Data quality check results (Issue #33)

Source code in packages/dataenginex/src/dataenginex/core/schemas.py
class DataQualityReport(BaseModel):
    """Data quality check results (Issue #33)."""

    execution_id: str = Field(...)  # pipeline execution this report belongs to
    check_timestamp: datetime = Field(...)
    dataset_name: str = Field(...)
    total_records: int = Field(default=0, ge=0)
    passed_records: int = Field(default=0, ge=0)
    failed_records: int = Field(default=0, ge=0)
    # NOTE: percentage scale (0-100), unlike JobPosting.quality_score (0-1).
    quality_score: float = Field(default=0.0, ge=0, le=100)
    check_results: dict[str, Any] = Field(default_factory=dict)  # per-check outcomes
    issues: list[str] = Field(default_factory=list)  # human-readable findings

EchoRequest

Bases: BaseModel

Request body for the /echo debug endpoint.

Source code in packages/dataenginex/src/dataenginex/core/schemas.py
class EchoRequest(BaseModel):
    """Request body for the ``/echo`` debug endpoint."""

    message: str = Field(min_length=1)  # must be non-empty
    count: int = Field(default=1, ge=1, le=10)  # number of echoes, capped at 10

    model_config = {"json_schema_extra": {"examples": [{"message": "hello", "count": 2}]}}

EchoResponse

Bases: BaseModel

Response body for the /echo debug endpoint.

Source code in packages/dataenginex/src/dataenginex/core/schemas.py
class EchoResponse(BaseModel):
    """Response body for the ``/echo`` debug endpoint."""

    message: str  # the original message, echoed back
    count: int  # requested repetition count
    echo: list[str]  # message repeated count times (per the example below)

    model_config = {
        "json_schema_extra": {
            "examples": [{"message": "hello", "count": 2, "echo": ["hello", "hello"]}]
        }
    }

ErrorDetail

Bases: BaseModel

Detail of a single validation or processing error.

Source code in packages/dataenginex/src/dataenginex/core/schemas.py
class ErrorDetail(BaseModel):
    """Detail of a single validation or processing error."""

    field: str | None = None  # offending field, when known
    message: str  # human-readable description of the problem
    type: str | None = None  # machine-readable error code, e.g. "string_too_short"

    model_config = {
        "json_schema_extra": {
            "examples": [
                {
                    "field": "message",
                    "message": "String should have at least 1 character",
                    "type": "string_too_short",
                }
            ]
        }
    }

ErrorResponse

Bases: BaseModel

Standard error response returned by all API error handlers.

Source code in packages/dataenginex/src/dataenginex/core/schemas.py
class ErrorResponse(BaseModel):
    """Standard error response returned by all API error handlers."""

    error: str  # machine-readable error category, e.g. "validation_error"
    message: str  # human-readable summary
    request_id: str | None = None  # correlation id for tracing, when available
    details: list[ErrorDetail] | None = None  # per-field breakdown, when applicable

    model_config = {
        "json_schema_extra": {
            "examples": [
                {
                    "error": "validation_error",
                    "message": "Request validation failed",
                    "request_id": "1f9b6b1c-5b90-4c6c-8d2a-1f28f6f0b5a1",
                    "details": [
                        {
                            "field": "message",
                            "message": "String should have at least 1 character",
                            "type": "string_too_short",
                        }
                    ],
                }
            ]
        }
    }

HealthResponse

Bases: BaseModel

Response body for the /health liveness probe.

Source code in packages/dataenginex/src/dataenginex/core/schemas.py
class HealthResponse(BaseModel):
    """Response body for the ``/health`` liveness probe."""

    status: str  # "alive" when the process is up (per the example)

    model_config = {"json_schema_extra": {"examples": [{"status": "alive"}]}}

JobPosting

Bases: BaseModel

Core job posting schema for CareerDEX (Silver layer)

Source code in packages/dataenginex/src/dataenginex/core/schemas.py
class JobPosting(BaseModel):
    """Core job posting schema for CareerDEX (Silver layer).

    ``job_id`` is unique across all sources; ``dex_hash`` supports
    content-based deduplication.
    """

    job_id: str = Field(..., description="Unique job identifier across all sources")
    source: JobSourceEnum = Field(..., description="Data source of job posting")
    source_job_id: str = Field(..., description="Original ID from source system")

    # Company info
    company_name: str = Field(..., min_length=1, max_length=255)
    company_industry: str | None = Field(None, max_length=100)
    company_size: str | None = Field(None, description="e.g., '1-50', '51-200', '200-1000'")

    # Position details
    job_title: str = Field(..., min_length=3, max_length=255)
    job_description: str = Field(..., min_length=10)
    required_skills: list[str] = Field(default_factory=list)
    preferred_skills: list[str] = Field(default_factory=list)
    experience_level: str | None = Field(
        None, description="entry_level, mid_level, senior, executive"
    )
    years_experience_required: int | None = Field(None, ge=0)

    # Employment
    location: JobLocation = Field(...)
    employment_type: str = Field(..., description="full_time, part_time, contract, temporary")

    # Compensation
    benefits: JobBenefits | None = Field(default=None)

    # Metadata
    posted_date: datetime = Field(..., description="When job was posted on source")
    last_modified_date: datetime = Field(...)
    expiration_date: datetime | None = Field(None)

    # Tracking (DEX-internal fields, populated at ingestion time)
    dex_ingestion_date: datetime = Field(default_factory=lambda: datetime.now(tz=UTC))
    dex_hash: str = Field(..., description="Content hash for deduplication")
    dex_dedup_id: str | None = Field(None, description="Deduplication group ID")
    # NOTE: 0-1 scale, unlike DataQualityReport.quality_score (0-100).
    quality_score: float = Field(
        default=0.0, ge=0, le=1.0, description="Job quality/relevance score"
    )

JobSourceEnum

Bases: StrEnum

Supported external data sources (project-specific enumeration)

Source code in packages/dataenginex/src/dataenginex/core/schemas.py
class JobSourceEnum(StrEnum):
    """Supported external data sources (project-specific enumeration).

    Values mirror the keys of ``PipelineConfig.CAREERDEX_JOB_SOURCES``.
    """

    LINKEDIN = "linkedin"
    INDEED = "indeed"
    GLASSDOOR = "glassdoor"
    COMPANY_CAREER_PAGES = "company_career_pages"

PipelineExecutionMetadata

Bases: BaseModel

Tracks pipeline execution for lineage and troubleshooting (Issue #34)

Source code in packages/dataenginex/src/dataenginex/core/schemas.py
class PipelineExecutionMetadata(BaseModel):
    """Tracks pipeline execution for lineage and troubleshooting (Issue #34)."""

    pipeline_name: str = Field(...)
    execution_id: str = Field(..., description="Unique execution identifier")
    execution_start_time: datetime = Field(...)
    execution_end_time: datetime | None = Field(None)  # None while still running
    status: str = Field(..., description="running, succeeded, failed, partial")
    source_count: int = Field(default=0, ge=0)  # records read from sources
    processed_count: int = Field(default=0, ge=0)
    ingested_count: int = Field(default=0, ge=0)
    failed_count: int = Field(default=0, ge=0)
    error_message: str | None = Field(None)  # populated on failure
    layer: str = Field(..., description="bronze, silver, or gold")

ReadinessResponse

Bases: BaseModel

Response body for the /ready readiness probe.

Source code in packages/dataenginex/src/dataenginex/core/schemas.py
class ReadinessResponse(BaseModel):
    """Response body for the ``/ready`` readiness probe."""

    status: str  # overall readiness, e.g. "ready"
    components: list[ComponentStatus]  # per-dependency check results

    model_config = {
        "json_schema_extra": {
            "examples": [
                {
                    "status": "ready",
                    "components": [
                        {
                            "name": "database",
                            "status": "healthy",
                            "message": "reachable",
                            "duration_ms": 12.5,
                        },
                        {
                            "name": "cache",
                            "status": "skipped",
                            "message": "cache not configured",
                            "duration_ms": None,
                        },
                    ],
                }
            ]
        }
    }

RootResponse

Bases: BaseModel

Response body for the root / endpoint.

Source code in packages/dataenginex/src/dataenginex/core/schemas.py
class RootResponse(BaseModel):
    """Response body for the root ``/`` endpoint."""

    # Human-readable service banner
    message: str
    # Service version string
    version: str

    # Example payload surfaced in the generated OpenAPI docs
    model_config = {
        "json_schema_extra": {"examples": [{"message": "DataEngineX API", "version": "0.1.0"}]}
    }

StartupResponse

Bases: BaseModel

Response body for the /startup probe.

Source code in packages/dataenginex/src/dataenginex/core/schemas.py
class StartupResponse(BaseModel):
    """Response body for the ``/startup`` probe."""

    # Startup verdict (see "started" in the example below)
    status: str

    # Example payload surfaced in the generated OpenAPI docs
    model_config = {"json_schema_extra": {"examples": [{"status": "started"}]}}

UserProfile

Bases: BaseModel

CareerDEX user profile schema

Source code in packages/dataenginex/src/dataenginex/core/schemas.py
class UserProfile(BaseModel):
    """CareerDEX user profile schema"""

    # Identity
    user_id: str = Field(..., description="Unique user identifier")
    email: EmailStr = Field(...)  # validated e-mail address (pydantic EmailStr)
    first_name: str = Field(..., max_length=100)
    last_name: str = Field(..., max_length=100)

    # Professional info
    current_title: str | None = Field(None, max_length=255)
    current_company: str | None = Field(None, max_length=255)
    years_experience: int | None = Field(None, ge=0)
    skills: list[str] = Field(default_factory=list)

    # Education
    education: str | None = Field(None, description="Highest education level")
    # NOTE(review): grouped under "Education" but reads like a job-search
    # preference; the section comment may be stale — confirm.
    preferred_locations: list[str] = Field(default_factory=list, description="City, country codes")

    # Preferences
    preferred_job_titles: list[str] = Field(default_factory=list)
    # Arbitrary keys mapped to amounts; key schema is not enforced by this model
    salary_expectations: dict[str, float] | None = Field(None)
    willing_to_relocate: bool = Field(False)

    # Engagement
    created_date: datetime = Field(...)
    last_activity_date: datetime = Field(...)
    # 0-100 measure of how filled-in the profile is
    profile_completion_percentage: float = Field(default=0.0, ge=0, le=100)

DataHash

Generates content hashes for deduplication (DEX requirement).

Source code in packages/dataenginex/src/dataenginex/core/validators.py
class DataHash:
    """Generates content hashes for deduplication (DEX requirement)."""

    @staticmethod
    def generate_job_hash(job_id: str, source: str, company_name: str, job_title: str) -> str:
        """
        Build the SHA256 deduplication hash for a job posting.

        The identity of a posting is the colon-joined combination of
        source, company, title and source job ID.

        Args:
            job_id: Source job ID
            source: Job source (linkedin, indeed, etc.)
            company_name: Company name
            job_title: Job title

        Returns:
            SHA256 hash of content
        """
        identity = ":".join((source, company_name, job_title, job_id))
        return hashlib.sha256(identity.encode()).hexdigest()

    @staticmethod
    def generate_user_hash(email: str, first_name: str, last_name: str) -> str:
        """
        Build the SHA256 deduplication hash for a user profile.

        Names are lowercased so that casing differences do not produce
        distinct hashes; the email is used verbatim.

        Returns:
            SHA256 hash of user identity
        """
        identity = ":".join((email, first_name.lower(), last_name.lower()))
        return hashlib.sha256(identity.encode()).hexdigest()

generate_job_hash(job_id, source, company_name, job_title) staticmethod

Generate a hash for job posting content to identify duplicates. Uses job_id + source + company + title as content identifier.

Parameters:

Name Type Description Default
job_id str

Source job ID

required
source str

Job source (linkedin, indeed, etc.)

required
company_name str

Company name

required
job_title str

Job title

required

Returns:

Type Description
str

SHA256 hash of content

Source code in packages/dataenginex/src/dataenginex/core/validators.py
@staticmethod
def generate_job_hash(job_id: str, source: str, company_name: str, job_title: str) -> str:
    """
    Generate a hash for job posting content to identify duplicates.
    Uses job_id + source + company + title as content identifier.

    Args:
        job_id: Source job ID
        source: Job source (linkedin, indeed, etc.)
        company_name: Company name
        job_title: Job title

    Returns:
        SHA256 hash of content
    """
    content = f"{source}:{company_name}:{job_title}:{job_id}"
    return hashlib.sha256(content.encode()).hexdigest()

generate_user_hash(email, first_name, last_name) staticmethod

Generate a hash for user profile to identify duplicates.

Returns:

Type Description
str

SHA256 hash of user identity

Source code in packages/dataenginex/src/dataenginex/core/validators.py
@staticmethod
def generate_user_hash(email: str, first_name: str, last_name: str) -> str:
    """
    Generate a hash for user profile to identify duplicates.

    Returns:
        SHA256 hash of user identity
    """
    content = f"{email}:{first_name.lower()}:{last_name.lower()}"
    return hashlib.sha256(content.encode()).hexdigest()

DataQualityChecks

Implements data quality checks across multiple dimensions.

Source code in packages/dataenginex/src/dataenginex/core/validators.py
class DataQualityChecks:
    """Implements data quality checks across multiple dimensions."""

    @staticmethod
    def check_completeness(
        record: dict[str, Any], required_fields: set[str]
    ) -> tuple[bool, list[str]]:
        """
        Check that all required fields are present and non-null.

        Args:
            record: Data record to check
            required_fields: Set of field names that must be present

        Returns:
            Tuple of (is_complete, missing_fields)
        """
        missing = []
        for field in required_fields:
            if field not in record or record[field] is None:
                missing.append(field)

        return len(missing) == 0, missing

    @staticmethod
    def check_accuracy_salary(
        salary_min: float | None, salary_max: float | None
    ) -> tuple[bool, list[str]]:
        """
        Check salary range accuracy and reasonableness.

        Returns:
            Tuple of (is_accurate, issues)
        """
        issues = []

        if salary_min is not None and salary_max is not None:
            if salary_min > salary_max:
                issues.append(f"Salary min ({salary_min}) > max ({salary_max})")
            if salary_max > 500000:  # Reasonable upper bound for most positions
                issues.append(f"Salary max ({salary_max}) exceeds reasonable threshold")
            if salary_min < 15000:  # Below US minimum wage equivalent
                issues.append(f"Salary min ({salary_min}) below reasonable threshold")

        return len(issues) == 0, issues

    @staticmethod
    def check_consistency_dates(
        posted_date: datetime,
        last_modified_date: datetime,
        expiration_date: datetime | None = None,
    ) -> tuple[bool, list[str]]:
        """
        Check temporal consistency of dates.

        Returns:
            Tuple of (is_consistent, issues)
        """
        issues = []

        if posted_date > last_modified_date:
            issues.append("Posted date is after last modified date")

        if expiration_date and expiration_date < posted_date:
            issues.append("Expiration date is before posted date")

        if posted_date > datetime.now(tz=UTC):
            issues.append("Posted date is in the future")

        return len(issues) == 0, issues

    @staticmethod
    def check_uniqueness_job_id(current_id: str, seen_ids: set[str]) -> tuple[bool, str]:
        """
        Check if job ID is unique in the batch.

        Returns:
            Tuple of (is_unique, issue_message)
        """
        if current_id in seen_ids:
            return False, f"Duplicate job ID: {current_id}"
        return True, ""

    @staticmethod
    def check_validity_location(
        country: str,
        city: str,
        latitude: float | None = None,
        longitude: float | None = None,
    ) -> tuple[bool, list[str]]:
        """
        Check location validity.

        Returns:
            Tuple of (is_valid, issues)
        """
        issues = []

        if len(country) != 2:
            issues.append(f"Country code should be 2 chars, got: {country}")

        if not city or len(city) < 2:
            issues.append("City name is too short")

        if latitude is not None and not (-90 <= latitude <= 90):
            issues.append(f"Latitude out of range: {latitude}")

        if longitude is not None and not (-180 <= longitude <= 180):
            issues.append(f"Longitude out of range: {longitude}")

        return len(issues) == 0, issues

check_completeness(record, required_fields) staticmethod

Check that all required fields are present and non-null.

Parameters:

Name Type Description Default
record dict[str, Any]

Data record to check

required
required_fields set[str]

Set of field names that must be present

required

Returns:

Type Description
tuple[bool, list[str]]

Tuple of (is_complete, missing_fields)

Source code in packages/dataenginex/src/dataenginex/core/validators.py
@staticmethod
def check_completeness(
    record: dict[str, Any], required_fields: set[str]
) -> tuple[bool, list[str]]:
    """
    Check that all required fields are present and non-null.

    Args:
        record: Data record to check
        required_fields: Set of field names that must be present

    Returns:
        Tuple of (is_complete, missing_fields)
    """
    missing = []
    for field in required_fields:
        if field not in record or record[field] is None:
            missing.append(field)

    return len(missing) == 0, missing

check_accuracy_salary(salary_min, salary_max) staticmethod

Check salary range accuracy and reasonableness.

Returns:

Type Description
tuple[bool, list[str]]

Tuple of (is_accurate, issues)

Source code in packages/dataenginex/src/dataenginex/core/validators.py
@staticmethod
def check_accuracy_salary(
    salary_min: float | None, salary_max: float | None
) -> tuple[bool, list[str]]:
    """
    Check salary range accuracy and reasonableness.

    Returns:
        Tuple of (is_accurate, issues)
    """
    issues = []

    if salary_min is not None and salary_max is not None:
        if salary_min > salary_max:
            issues.append(f"Salary min ({salary_min}) > max ({salary_max})")
        if salary_max > 500000:  # Reasonable upper bound for most positions
            issues.append(f"Salary max ({salary_max}) exceeds reasonable threshold")
        if salary_min < 15000:  # Below US minimum wage equivalent
            issues.append(f"Salary min ({salary_min}) below reasonable threshold")

    return len(issues) == 0, issues

check_consistency_dates(posted_date, last_modified_date, expiration_date=None) staticmethod

Check temporal consistency of dates.

Returns:

Type Description
tuple[bool, list[str]]

Tuple of (is_consistent, issues)

Source code in packages/dataenginex/src/dataenginex/core/validators.py
@staticmethod
def check_consistency_dates(
    posted_date: datetime,
    last_modified_date: datetime,
    expiration_date: datetime | None = None,
) -> tuple[bool, list[str]]:
    """
    Check temporal consistency of dates.

    Returns:
        Tuple of (is_consistent, issues)
    """
    issues = []

    if posted_date > last_modified_date:
        issues.append("Posted date is after last modified date")

    if expiration_date and expiration_date < posted_date:
        issues.append("Expiration date is before posted date")

    if posted_date > datetime.now(tz=UTC):
        issues.append("Posted date is in the future")

    return len(issues) == 0, issues

check_uniqueness_job_id(current_id, seen_ids) staticmethod

Check if job ID is unique in the batch.

Returns:

Type Description
tuple[bool, str]

Tuple of (is_unique, issue_message)

Source code in packages/dataenginex/src/dataenginex/core/validators.py
@staticmethod
def check_uniqueness_job_id(current_id: str, seen_ids: set[str]) -> tuple[bool, str]:
    """
    Check if job ID is unique in the batch.

    Returns:
        Tuple of (is_unique, issue_message)
    """
    if current_id in seen_ids:
        return False, f"Duplicate job ID: {current_id}"
    return True, ""

check_validity_location(country, city, latitude=None, longitude=None) staticmethod

Check location validity.

Returns:

Type Description
tuple[bool, list[str]]

Tuple of (is_valid, issues)

Source code in packages/dataenginex/src/dataenginex/core/validators.py
@staticmethod
def check_validity_location(
    country: str,
    city: str,
    latitude: float | None = None,
    longitude: float | None = None,
) -> tuple[bool, list[str]]:
    """
    Check location validity.

    Returns:
        Tuple of (is_valid, issues)
    """
    issues = []

    if len(country) != 2:
        issues.append(f"Country code should be 2 chars, got: {country}")

    if not city or len(city) < 2:
        issues.append("City name is too short")

    if latitude is not None and not (-90 <= latitude <= 90):
        issues.append(f"Latitude out of range: {latitude}")

    if longitude is not None and not (-180 <= longitude <= 180):
        issues.append(f"Longitude out of range: {longitude}")

    return len(issues) == 0, issues

QualityScorer

Calculates quality scores for data records.

Source code in packages/dataenginex/src/dataenginex/core/validators.py
class QualityScorer:
    """Calculates quality scores for data records.

    Each private ``_score_*`` helper awards a fixed number of points when a
    record carries a particular piece of information; ``score_job_posting``
    sums them and clamps the total to 1.0.
    """

    @staticmethod
    def _score_salary(record: dict[str, Any]) -> float:
        """Award 0.1 when both salary bounds are present, else 0.0."""
        # ``or {}`` tolerates an explicit None stored under "benefits";
        # ``.get("benefits", {})`` would return that None and crash below.
        benefits = record.get("benefits") or {}
        if benefits.get("salary_min") and benefits.get("salary_max"):
            return 0.1
        return 0.0

    @staticmethod
    def _score_location(record: dict[str, Any]) -> float:
        """Award 0.1 when a city is present, else 0.0."""
        location = record.get("location") or {}  # tolerate explicit None
        if location.get("city"):
            return 0.1
        return 0.0

    @staticmethod
    def _score_skills(record: dict[str, Any]) -> float:
        """Award 0.15 when required skills are listed, else 0.0."""
        if record.get("required_skills"):
            return 0.15
        return 0.0

    @staticmethod
    def _score_description(record: dict[str, Any]) -> float:
        """Award 0.2 for a substantive (>200 chars) description, else 0.0."""
        job_desc = record.get("job_description", "")
        if job_desc and len(job_desc) > 200:
            return 0.2
        return 0.0

    @staticmethod
    def _score_dates(record: dict[str, Any]) -> float:
        """Award 0.1 when posted <= last modified, else 0.0."""
        try:
            posted = record.get("posted_date")
            modified = record.get("last_modified_date")
            if posted and modified and posted <= modified:
                return 0.1
        except TypeError:
            # Incomparable values (e.g. naive vs aware datetimes, or str vs
            # datetime) simply earn no points rather than failing the score.
            pass
        return 0.0

    @staticmethod
    def _score_company(record: dict[str, Any]) -> float:
        """Award 0.1 when a company name is present, else 0.0."""
        if record.get("company_name"):
            return 0.1
        return 0.0

    @staticmethod
    def _score_employment(record: dict[str, Any]) -> float:
        """Award 0.1 when an employment type is present, else 0.0."""
        if record.get("employment_type"):
            return 0.1
        return 0.0

    @staticmethod
    def _score_benefits(record: dict[str, Any]) -> float:
        """Award 0.05 when a benefits list is present, else 0.0."""
        if (record.get("benefits") or {}).get("benefits"):  # tolerate explicit None
            return 0.05
        return 0.0

    @staticmethod
    def score_job_posting(record: dict[str, Any]) -> float:
        """
        Calculate quality score for job posting (0-1 scale).

        Scoring criteria:
        - Has salary range: +0.1
        - Has location details: +0.1
        - Has skill requirements: +0.15
        - Has job description (>200 chars): +0.2
        - Has reasonable dates: +0.1
        - Has company info: +0.1
        - Has employment type: +0.1
        - Has benefits listed: +0.05

        Args:
            record: Job posting record

        Returns:
            Quality score (0.0 - 1.0)
        """
        score = (
            QualityScorer._score_salary(record)
            + QualityScorer._score_location(record)
            + QualityScorer._score_skills(record)
            + QualityScorer._score_description(record)
            + QualityScorer._score_dates(record)
            + QualityScorer._score_company(record)
            + QualityScorer._score_employment(record)
            + QualityScorer._score_benefits(record)
        )
        return min(score, 1.0)

    @staticmethod
    def score_user_profile(record: dict[str, Any]) -> float:
        """
        Calculate quality score for user profile (0-1 scale).

        Scoring criteria:
        - Has email: +0.15
        - Has name: +0.1
        - Has professional info: +0.2
        - Has skills: +0.15
        - Has experience: +0.1
        - Has preferences: +0.15
        - Profile completion >50%: +0.15

        Returns:
            Quality score (0.0 - 1.0)
        """
        score = 0.0

        if record.get("email"):
            score += 0.15

        if record.get("first_name") and record.get("last_name"):
            score += 0.1

        if record.get("current_title") or record.get("current_company"):
            score += 0.2

        if record.get("skills"):
            score += 0.15

        # NOTE(review): a truthiness check means 0 years of experience earns
        # no points — presumably intentional; confirm against product rules.
        if record.get("years_experience"):
            score += 0.1

        if record.get("preferred_job_titles") or record.get("preferred_locations"):
            score += 0.15

        if record.get("profile_completion_percentage", 0) > 50:
            score += 0.15

        return min(score, 1.0)

score_job_posting(record) staticmethod

Calculate quality score for job posting (0-1 scale).

Scoring criteria: - Has salary range: +0.1 - Has location details: +0.1 - Has skill requirements: +0.15 - Has job description (>200 chars): +0.2 - Has reasonable dates: +0.1 - Has company info: +0.1 - Has employment type: +0.1 - Has benefits listed: +0.05

Parameters:

Name Type Description Default
record dict[str, Any]

Job posting record

required

Returns:

Type Description
float

Quality score (0.0 - 1.0)

Source code in packages/dataenginex/src/dataenginex/core/validators.py
@staticmethod
def score_job_posting(record: dict[str, Any]) -> float:
    """
    Calculate quality score for job posting (0-1 scale).

    Sums the fixed award from each ``QualityScorer._score_*`` helper and
    clamps the total to 1.0.

    Award table:
    - Salary range: +0.1; location details: +0.1; skills: +0.15;
      description >200 chars: +0.2; consistent dates: +0.1;
      company info: +0.1; employment type: +0.1; benefits: +0.05

    Args:
        record: Job posting record

    Returns:
        Quality score (0.0 - 1.0)
    """
    scorers = (
        QualityScorer._score_salary,
        QualityScorer._score_location,
        QualityScorer._score_skills,
        QualityScorer._score_description,
        QualityScorer._score_dates,
        QualityScorer._score_company,
        QualityScorer._score_employment,
        QualityScorer._score_benefits,
    )
    total = sum((scorer(record) for scorer in scorers), 0.0)
    return min(total, 1.0)

score_user_profile(record) staticmethod

Calculate quality score for user profile (0-1 scale).

Scoring criteria: - Has email: +0.15 - Has name: +0.1 - Has professional info: +0.2 - Has skills: +0.15 - Has experience: +0.1 - Has preferences: +0.15 - Profile completion >50%: +0.15

Returns:

Type Description
float

Quality score (0.0 - 1.0)

Source code in packages/dataenginex/src/dataenginex/core/validators.py
@staticmethod
def score_user_profile(record: dict[str, Any]) -> float:
    """
    Calculate quality score for user profile (0-1 scale).

    Scoring criteria:
    - Has email: +0.15
    - Has name: +0.1
    - Has professional info: +0.2
    - Has skills: +0.15
    - Has experience: +0.1
    - Has preferences: +0.15
    - Profile completion >50%: +0.15

    Returns:
        Quality score (0.0 - 1.0)
    """
    score = 0.0

    if record.get("email"):
        score += 0.15

    if record.get("first_name") and record.get("last_name"):
        score += 0.1

    if record.get("current_title") or record.get("current_company"):
        score += 0.2

    if record.get("skills"):
        score += 0.15

    if record.get("years_experience"):
        score += 0.1

    if record.get("preferred_job_titles") or record.get("preferred_locations"):
        score += 0.15

    if record.get("profile_completion_percentage", 0) > 50:
        score += 0.15

    return min(score, 1.0)

SchemaValidator

Validates that data conforms to DEX schema specifications.

Source code in packages/dataenginex/src/dataenginex/core/validators.py
class SchemaValidator:
    """Validates that data conforms to DEX schema specifications."""

    @staticmethod
    def validate_job_posting(data: Mapping[str, Any]) -> tuple[bool, list[str]]:
        """
        Validate job posting data against JobPosting schema.

        Args:
            data: Dictionary containing job posting data

        Returns:
            Tuple of (is_valid, error_messages)
        """
        try:
            JobPosting(**data)
        except Exception as exc:  # pydantic ValidationError, reported as text
            return False, [str(exc)]
        return True, []

    @staticmethod
    def validate_user_profile(data: dict[str, Any]) -> tuple[bool, list[str]]:
        """Validate user profile data against schema."""
        try:
            UserProfile(**data)
        except Exception as exc:  # pydantic ValidationError, reported as text
            return False, [str(exc)]
        return True, []

validate_job_posting(data) staticmethod

Validate job posting data against JobPosting schema.

Parameters:

Name Type Description Default
data Mapping[str, Any]

Dictionary containing job posting data

required

Returns:

Type Description
tuple[bool, list[str]]

Tuple of (is_valid, error_messages)

Source code in packages/dataenginex/src/dataenginex/core/validators.py
@staticmethod
def validate_job_posting(data: Mapping[str, Any]) -> tuple[bool, list[str]]:
    """
    Validate job posting data against JobPosting schema.

    Args:
        data: Dictionary containing job posting data

    Returns:
        Tuple of (is_valid, error_messages)
    """
    try:
        JobPosting(**data)
    except Exception as exc:  # pydantic ValidationError, reported as text
        return False, [str(exc)]
    return True, []

validate_user_profile(data) staticmethod

Validate user profile data against schema.

Source code in packages/dataenginex/src/dataenginex/core/validators.py
@staticmethod
def validate_user_profile(data: dict[str, Any]) -> tuple[bool, list[str]]:
    """Validate user profile data against the UserProfile schema."""
    try:
        UserProfile(**data)
    except Exception as exc:  # pydantic ValidationError, reported as text
        return False, [str(exc)]
    return True, []

ValidationReport

Generates validation reports for data quality assessment.

Source code in packages/dataenginex/src/dataenginex/core/validators.py
class ValidationReport:
    """Generates validation reports for data quality assessment.

    Usage: call ``mark_valid`` or ``add_error`` once per record (and
    ``add_warning`` as needed), then ``finalize`` for a summary dict.
    """

    def __init__(self) -> None:
        # Kept in sync with valid + invalid counts (previously initialized
        # but never updated, so it was always a stale 0).
        self.total_records = 0
        self.valid_records = 0
        self.invalid_records = 0
        self.errors: list[dict[str, Any]] = []
        self.warnings: list[dict[str, Any]] = []

    def add_error(self, record_id: str, error_type: str, message: str) -> None:
        """Record a validation error and count the record as invalid."""
        self.total_records += 1
        self.invalid_records += 1
        self.errors.append({"record_id": record_id, "type": error_type, "message": message})

    def add_warning(self, record_id: str, warning_type: str, message: str) -> None:
        """Record a validation warning (does not affect validity counts)."""
        self.warnings.append({"record_id": record_id, "type": warning_type, "message": message})

    def mark_valid(self) -> None:
        """Mark a record as valid."""
        self.total_records += 1
        self.valid_records += 1

    def finalize(self) -> dict[str, Any]:
        """Generate final validation report.

        Returns:
            Summary dict with counts, the validity percentage (always a
            float), and the first 100 errors/warnings — truncated so the
            report stays small on pathological batches.
        """
        total = self.valid_records + self.invalid_records
        valid_pct = (self.valid_records / total * 100) if total > 0 else 0.0

        return {
            "total_records": total,
            "valid_records": self.valid_records,
            "invalid_records": self.invalid_records,
            "validity_percentage": valid_pct,
            "error_count": len(self.errors),
            "warning_count": len(self.warnings),
            "errors": self.errors[:100],  # Top 100 errors
            "warnings": self.warnings[:100],  # Top 100 warnings
        }

add_error(record_id, error_type, message)

Record a validation error.

Source code in packages/dataenginex/src/dataenginex/core/validators.py
def add_error(self, record_id: str, error_type: str, message: str) -> None:
    """Record a validation error and count the record as invalid."""
    entry = {"record_id": record_id, "type": error_type, "message": message}
    self.invalid_records += 1
    self.errors.append(entry)

add_warning(record_id, warning_type, message)

Record a validation warning.

Source code in packages/dataenginex/src/dataenginex/core/validators.py
def add_warning(self, record_id: str, warning_type: str, message: str) -> None:
    """Record a validation warning (appended to the warnings list only)."""
    entry = {"record_id": record_id, "type": warning_type, "message": message}
    self.warnings.append(entry)

mark_valid()

Mark a record as valid.

Source code in packages/dataenginex/src/dataenginex/core/validators.py
def mark_valid(self) -> None:
    """Count one record as having passed validation."""
    self.valid_records = self.valid_records + 1

finalize()

Generate final validation report.

Source code in packages/dataenginex/src/dataenginex/core/validators.py
def finalize(self) -> dict[str, Any]:
    """Generate final validation report.

    Returns a summary dict of counts plus the first 100 errors and
    warnings (truncated so the report stays small on large batches).
    """
    counted = self.valid_records + self.invalid_records
    percentage = (self.valid_records / counted * 100) if counted > 0 else 0
    return {
        "total_records": counted,
        "valid_records": self.valid_records,
        "invalid_records": self.invalid_records,
        "validity_percentage": percentage,
        "error_count": len(self.errors),
        "warning_count": len(self.warnings),
        "errors": self.errors[:100],
        "warnings": self.warnings[:100],
    }