Skip to content

dataenginex.warehouse

SQL-like transforms, persistent lineage tracking, and warehouse utilities.

Public API::

from dataenginex.warehouse import (
    Transform, TransformPipeline, TransformResult,
    RenameFieldsTransform, DropNullsTransform,
    CastTypesTransform, AddTimestampTransform, FilterTransform,
    LineageEvent, PersistentLineage,
)

LineageEvent dataclass

A single lineage event describing a data operation.

Attributes:

Name Type Description
event_id str

Auto-generated unique identifier (12-char hex).

parent_id str | None

ID of the upstream event that produced the input.

operation str

Type of operation ("ingest", "transform", "enrich", "export").

layer str

Medallion layer ("bronze", "silver", "gold").

source str

Where data came from.

destination str

Where data was written.

input_count int

Number of input records.

output_count int

Number of output records.

error_count int

Number of records that errored.

quality_score float | None

Quality score of the output (0.0–1.0).

pipeline_name str

Name of the owning pipeline.

step_name str

Name of the transform step.

metadata dict[str, Any]

Free-form context dict.

timestamp datetime

When the event occurred.

Source code in packages/dataenginex/src/dataenginex/warehouse/lineage.py
@dataclass
class LineageEvent:
    """A single lineage event describing a data operation.

    Attributes:
        event_id: Auto-generated unique identifier (12-char hex).
        parent_id: ID of the upstream event that produced the input.
        operation: Type of operation (``"ingest"``, ``"transform"``, ``"enrich"``, ``"export"``).
        layer: Medallion layer (``"bronze"``, ``"silver"``, ``"gold"``).
        source: Where data came from.
        destination: Where data was written.
        input_count: Number of input records.
        output_count: Number of output records.
        error_count: Number of records that errored.
        quality_score: Quality score of the output (0.0–1.0).
        pipeline_name: Name of the owning pipeline.
        step_name: Name of the transform step.
        metadata: Free-form context dict.
        timestamp: When the event occurred.
    """

    event_id: str = field(default_factory=lambda: uuid.uuid4().hex[:12])
    parent_id: str | None = None

    # What happened
    operation: str = ""  # "ingest", "transform", "enrich", "export"
    layer: str = ""  # "bronze", "silver", "gold"
    source: str = ""
    destination: str = ""

    # Counts
    input_count: int = 0
    output_count: int = 0
    error_count: int = 0
    quality_score: float | None = None

    # Context
    pipeline_name: str = ""
    step_name: str = ""
    metadata: dict[str, Any] = field(default_factory=dict)
    timestamp: datetime = field(default_factory=lambda: datetime.now(tz=UTC))

    def to_dict(self) -> dict[str, Any]:
        """Serialize the lineage event to a plain dictionary."""
        d = asdict(self)
        d["timestamp"] = self.timestamp.isoformat()
        return d

to_dict()

Serialize the lineage event to a plain dictionary.

Source code in packages/dataenginex/src/dataenginex/warehouse/lineage.py
def to_dict(self) -> dict[str, Any]:
    """Return a plain-dict view of the event, timestamp as an ISO-8601 string."""
    return {**asdict(self), "timestamp": self.timestamp.isoformat()}

PersistentLineage

JSON-file-backed lineage store.

Example::

lineage = PersistentLineage("data/lineage.json")
ev = lineage.record(
    operation="ingest",
    layer="bronze",
    source="linkedin",
    input_count=1250,
    output_count=1250,
)
# later
lineage.record(
    operation="transform",
    layer="silver",
    parent_id=ev.event_id,
    input_count=1250,
    output_count=1200,
    quality_score=0.88,
)
Source code in packages/dataenginex/src/dataenginex/warehouse/lineage.py
class PersistentLineage:
    """JSON-file-backed lineage store.

    Events live in memory and, when *persist_path* is given, are mirrored to
    a JSON file after every ``record`` call and reloaded on construction.

    Example::

        lineage = PersistentLineage("data/lineage.json")
        ev = lineage.record(
            operation="ingest",
            layer="bronze",
            source="linkedin",
            input_count=1250,
            output_count=1250,
        )
        # later
        lineage.record(
            operation="transform",
            layer="silver",
            parent_id=ev.event_id,
            input_count=1250,
            output_count=1200,
            quality_score=0.88,
        )
    """

    def __init__(self, persist_path: str | Path | None = None) -> None:
        """Create a store; reload any previously persisted events from *persist_path*."""
        self._events: list[LineageEvent] = []
        self._persist_path = Path(persist_path) if persist_path else None
        if self._persist_path and self._persist_path.exists():
            self._load()

    # -- public API ----------------------------------------------------------

    def record(self, **kwargs: Any) -> LineageEvent:
        """Create, store, and persist a new lineage event.

        Accepts the same keyword arguments as ``LineageEvent``.
        """
        event = LineageEvent(**kwargs)
        self._events.append(event)
        # Separators added to the template: "%s%s (%d%d)" rendered
        # source/destination and the two counts fused together (e.g. "12501200").
        logger.info(
            "Lineage event %s: %s %s -> %s (%d -> %d)",
            event.event_id,
            event.operation,
            event.source,
            event.destination,
            event.input_count,
            event.output_count,
        )
        self._save()
        return event

    def get_event(self, event_id: str) -> LineageEvent | None:
        """Fetch a single event by ID, or ``None`` if no event matches."""
        for ev in self._events:
            if ev.event_id == event_id:
                return ev
        return None

    def get_children(self, parent_id: str) -> list[LineageEvent]:
        """Return events that have *parent_id* as their parent."""
        return [ev for ev in self._events if ev.parent_id == parent_id]

    def get_chain(self, event_id: str) -> list[LineageEvent]:
        """Walk up from *event_id* to the root and return the full chain, root first."""
        chain: list[LineageEvent] = []
        current = self.get_event(event_id)
        while current:
            chain.append(current)
            current = self.get_event(current.parent_id) if current.parent_id else None
        chain.reverse()
        return chain

    def get_by_layer(self, layer: str) -> list[LineageEvent]:
        """Return all events for a given medallion layer."""
        return [ev for ev in self._events if ev.layer == layer]

    def get_by_pipeline(self, pipeline_name: str) -> list[LineageEvent]:
        """Return all events for a given pipeline."""
        return [ev for ev in self._events if ev.pipeline_name == pipeline_name]

    @property
    def all_events(self) -> list[LineageEvent]:
        """Return a shallow copy of all stored lineage events."""
        return list(self._events)

    def summary(self) -> dict[str, Any]:
        """Return high-level lineage statistics (totals by layer and operation)."""
        layers: dict[str, int] = {}
        operations: dict[str, int] = {}
        for ev in self._events:
            layers[ev.layer] = layers.get(ev.layer, 0) + 1
            operations[ev.operation] = operations.get(ev.operation, 0) + 1
        return {
            "total_events": len(self._events),
            "by_layer": layers,
            "by_operation": operations,
        }

    # -- persistence ---------------------------------------------------------

    def _save(self) -> None:
        """Write all events to the JSON file (no-op without a persist path)."""
        if not self._persist_path:
            return
        self._persist_path.parent.mkdir(parents=True, exist_ok=True)
        data = [ev.to_dict() for ev in self._events]
        self._persist_path.write_text(json.dumps(data, indent=2, default=str))

    def _load(self) -> None:
        """Rehydrate events from the JSON file, preserving their original timestamps."""
        if not self._persist_path or not self._persist_path.exists():
            return
        raw = json.loads(self._persist_path.read_text())
        for item in raw:
            ts = item.pop("timestamp", None)
            event = LineageEvent(**item)
            # Restore the persisted event time instead of discarding it
            # (re-stamping with "now" on every reload corrupted history);
            # fall back to the auto-set timestamp only if parsing fails.
            if ts:
                try:
                    event.timestamp = datetime.fromisoformat(ts)
                except (ValueError, TypeError):
                    pass
            self._events.append(event)
        logger.info("Loaded %d lineage events from %s", len(self._events), self._persist_path)

all_events property

Return a shallow copy of all stored lineage events.

record(**kwargs)

Create and store a new lineage event.

Accepts the same keyword arguments as LineageEvent.

Source code in packages/dataenginex/src/dataenginex/warehouse/lineage.py
def record(self, **kwargs: Any) -> LineageEvent:
    """Create, store, and persist a new lineage event.

    Accepts the same keyword arguments as ``LineageEvent``.
    """
    event = LineageEvent(**kwargs)
    self._events.append(event)
    # Separators added to the template: "%s%s (%d%d)" rendered
    # source/destination and the two counts fused together (e.g. "12501200").
    logger.info(
        "Lineage event %s: %s %s -> %s (%d -> %d)",
        event.event_id,
        event.operation,
        event.source,
        event.destination,
        event.input_count,
        event.output_count,
    )
    self._save()
    return event

get_event(event_id)

Fetch a single event by ID.

Source code in packages/dataenginex/src/dataenginex/warehouse/lineage.py
def get_event(self, event_id: str) -> LineageEvent | None:
    """Look up a stored event by its unique ID; ``None`` if absent."""
    matches = (ev for ev in self._events if ev.event_id == event_id)
    return next(matches, None)

get_children(parent_id)

Return events that have parent_id as their parent.

Source code in packages/dataenginex/src/dataenginex/warehouse/lineage.py
def get_children(self, parent_id: str) -> list[LineageEvent]:
    """Return every stored event whose parent is *parent_id*."""
    children: list[LineageEvent] = []
    for candidate in self._events:
        if candidate.parent_id == parent_id:
            children.append(candidate)
    return children

get_chain(event_id)

Walk up from event_id to the root and return the full chain.

Source code in packages/dataenginex/src/dataenginex/warehouse/lineage.py
def get_chain(self, event_id: str) -> list[LineageEvent]:
    """Return the lineage chain ending at *event_id*, ordered root first."""
    lineage: list[LineageEvent] = []
    node = self.get_event(event_id)
    while node is not None:
        lineage.append(node)
        node = self.get_event(node.parent_id) if node.parent_id else None
    # Collected leaf-to-root while walking up; flip to root-first order.
    return lineage[::-1]

get_by_layer(layer)

Return all events for a given medallion layer.

Source code in packages/dataenginex/src/dataenginex/warehouse/lineage.py
def get_by_layer(self, layer: str) -> list[LineageEvent]:
    """Return all events recorded against medallion layer *layer*."""
    matched: list[LineageEvent] = []
    for event in self._events:
        if event.layer == layer:
            matched.append(event)
    return matched

get_by_pipeline(pipeline_name)

Return all events for a given pipeline.

Source code in packages/dataenginex/src/dataenginex/warehouse/lineage.py
def get_by_pipeline(self, pipeline_name: str) -> list[LineageEvent]:
    """Return every event owned by pipeline *pipeline_name*."""
    return [event for event in self._events if event.pipeline_name == pipeline_name]

summary()

Return high-level lineage statistics.

Source code in packages/dataenginex/src/dataenginex/warehouse/lineage.py
def summary(self) -> dict[str, Any]:
    """Aggregate event counts: total, per medallion layer, per operation."""
    by_layer: dict[str, int] = {}
    by_operation: dict[str, int] = {}
    for event in self._events:
        by_layer[event.layer] = 1 + by_layer.get(event.layer, 0)
        by_operation[event.operation] = 1 + by_operation.get(event.operation, 0)
    return {
        "total_events": len(self._events),
        "by_layer": by_layer,
        "by_operation": by_operation,
    }

AddTimestampTransform

Bases: Transform

Add a processing timestamp field to every record.

Source code in packages/dataenginex/src/dataenginex/warehouse/transforms.py
class AddTimestampTransform(Transform):
    """Stamp each record with the time it was processed.

    The timestamp is written as an ISO-8601 UTC string under ``field_name``.
    """

    def __init__(self, field_name: str = "processed_at") -> None:
        super().__init__(name="add_timestamp", description="Add processing timestamp")
        self.field_name = field_name

    def apply(self, record: dict[str, Any]) -> dict[str, Any]:
        """Return a copy of *record* carrying the processing timestamp."""
        stamped = dict(record)
        stamped[self.field_name] = datetime.now(tz=UTC).isoformat()
        return stamped

apply(record)

Add an ISO-8601 processing timestamp to record.

Source code in packages/dataenginex/src/dataenginex/warehouse/transforms.py
def apply(self, record: dict[str, Any]) -> dict[str, Any]:
    """Add an ISO-8601 processing timestamp to *record*."""
    out = dict(record)
    out[self.field_name] = datetime.now(tz=UTC).isoformat()
    return out

CastTypesTransform

Bases: Transform

Cast fields to target types (int, float, str, bool).

Source code in packages/dataenginex/src/dataenginex/warehouse/transforms.py
class CastTypesTransform(Transform):
    """Coerce selected fields to ``int``, ``float``, ``str``, or ``bool``.

    Fields that are missing, ``None``, or fail to cast are left untouched.

    NOTE(review): ``bool("false")`` is ``True`` — any non-empty string casts
    to ``True``; confirm that is the intended behavior for "bool" targets.
    """

    _CASTERS: dict[str, type] = {"int": int, "float": float, "str": str, "bool": bool}

    def __init__(self, type_map: dict[str, str]) -> None:
        super().__init__(name="cast_types", description="Cast field types")
        self.type_map = type_map

    def apply(self, record: dict[str, Any]) -> dict[str, Any]:
        """Return a copy of *record* with configured fields cast; failures are ignored."""
        converted = dict(record)
        for key, type_name in self.type_map.items():
            caster = self._CASTERS.get(type_name)
            # Skip unknown target types and fields that are absent or null.
            if caster is None or converted.get(key) is None:
                continue
            with contextlib.suppress(ValueError, TypeError):
                converted[key] = caster(converted[key])
        return converted

apply(record)

Cast specified fields to their target types.

Source code in packages/dataenginex/src/dataenginex/warehouse/transforms.py
def apply(self, record: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of *record* with configured fields cast; failures are ignored."""
    converted = dict(record)
    for key, type_name in self.type_map.items():
        caster = self._CASTERS.get(type_name)
        # Skip unknown target types and fields that are absent or null.
        if caster is None or converted.get(key) is None:
            continue
        with contextlib.suppress(ValueError, TypeError):
            converted[key] = caster(converted[key])
    return converted

DropNullsTransform

Bases: Transform

Drop records that have None in any of the specified fields.

Source code in packages/dataenginex/src/dataenginex/warehouse/transforms.py
class DropNullsTransform(Transform):
    """Reject records whose required fields are missing or ``None``."""

    def __init__(self, required_fields: list[str]) -> None:
        super().__init__(name="drop_nulls", description="Drop records with null required fields")
        self.required_fields = required_fields

    def apply(self, record: dict[str, Any]) -> dict[str, Any] | None:
        """Return *record* unchanged, or ``None`` when any required field is null."""
        if any(record.get(name) is None for name in self.required_fields):
            return None
        return record

apply(record)

Return None if any required field is null, else the record.

Source code in packages/dataenginex/src/dataenginex/warehouse/transforms.py
def apply(self, record: dict[str, Any]) -> dict[str, Any] | None:
    """Return *record* unchanged, or ``None`` when any required field is null."""
    if any(record.get(name) is None for name in self.required_fields):
        return None
    return record

FilterTransform

Bases: Transform

Keep only records that match a predicate expression.

predicate receives a record dict and returns True to keep it.

Source code in packages/dataenginex/src/dataenginex/warehouse/transforms.py
class FilterTransform(Transform):
    """Drop records for which a caller-supplied predicate is falsy.

    ``predicate`` is any callable taking the record dict and returning a
    truthy value to keep the record.
    """

    def __init__(self, name: str, predicate: Any) -> None:
        super().__init__(name=name, description="Filter records by predicate")
        self._predicate = predicate

    def apply(self, record: dict[str, Any]) -> dict[str, Any] | None:
        """Return *record* when the predicate accepts it, otherwise ``None``."""
        return record if self._predicate(record) else None

apply(record)

Keep record if the predicate returns True, else drop it.

Source code in packages/dataenginex/src/dataenginex/warehouse/transforms.py
def apply(self, record: dict[str, Any]) -> dict[str, Any] | None:
    """Return *record* when the predicate accepts it, otherwise ``None``."""
    return record if self._predicate(record) else None

RenameFieldsTransform

Bases: Transform

Rename keys in a record according to a mapping.

Source code in packages/dataenginex/src/dataenginex/warehouse/transforms.py
class RenameFieldsTransform(Transform):
    """Rename record keys according to an old-name to new-name mapping."""

    def __init__(self, mapping: dict[str, str]) -> None:
        super().__init__(name="rename_fields", description="Rename record fields")
        self.mapping = mapping

    def apply(self, record: dict[str, Any]) -> dict[str, Any]:
        """Return a copy of *record* with every mapped key renamed."""
        renamed = dict(record)
        for source_key, target_key in self.mapping.items():
            if source_key in renamed:
                renamed[target_key] = renamed.pop(source_key)
        return renamed

apply(record)

Rename keys in record according to the configured mapping.

Source code in packages/dataenginex/src/dataenginex/warehouse/transforms.py
def apply(self, record: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of *record* with every mapped key renamed."""
    renamed = dict(record)
    for source_key, target_key in self.mapping.items():
        if source_key in renamed:
            renamed[target_key] = renamed.pop(source_key)
    return renamed

Transform

Bases: ABC

Base class for a single data transform step.

Subclass and implement apply which receives one record and returns either a transformed record or None to drop it.

Source code in packages/dataenginex/src/dataenginex/warehouse/transforms.py
class Transform(ABC):
    """Abstract base for one step in a record-transformation chain.

    Concrete subclasses implement :meth:`apply`, which maps a single record
    to a transformed record, or to ``None`` to drop it.
    """

    def __init__(self, name: str, description: str = "") -> None:
        # Human-readable identifiers attached to the step.
        self.name = name
        self.description = description

    @abstractmethod
    def apply(self, record: dict[str, Any]) -> dict[str, Any] | None:
        """Map *record* to its transformed form, or ``None`` to drop it."""
        ...

apply(record) abstractmethod

Transform record in place or return a new dict.

Return None to drop the record from the pipeline.

Source code in packages/dataenginex/src/dataenginex/warehouse/transforms.py
@abstractmethod
def apply(self, record: dict[str, Any]) -> dict[str, Any] | None:
    """Produce the transformed record, or ``None`` to drop it from the batch.

    Subclasses must override; the base implementation is an empty stub.
    """
    ...

TransformPipeline

Execute an ordered chain of Transform steps over a batch.

Example::

pipeline = TransformPipeline("bronze_to_silver")
pipeline.add(DropNullsTransform(["job_id", "company_name"]))
pipeline.add(CastTypesTransform({"salary_min": "float"}))
result = pipeline.run(records)
Source code in packages/dataenginex/src/dataenginex/warehouse/transforms.py
class TransformPipeline:
    """Execute an ordered chain of ``Transform`` steps over a batch.

    Each step sees the surviving output of the previous one. A record may be
    transformed, dropped (the step returns ``None``), or counted as an error
    when the step raises.

    Example::

        pipeline = TransformPipeline("bronze_to_silver")
        pipeline.add(DropNullsTransform(["job_id", "company_name"]))
        pipeline.add(CastTypesTransform({"salary_min": "float"}))
        result = pipeline.run(records)
    """

    def __init__(self, name: str) -> None:
        self.name = name
        self._steps: list[Transform] = []

    def add(self, transform: Transform) -> TransformPipeline:
        """Append a transform step to the pipeline."""
        self._steps.append(transform)
        return self  # fluent API

    def run(self, records: list[dict[str, Any]]) -> TransformResult:
        """Execute all steps in order and return the aggregated result.

        Records that raise inside a step are logged, dropped, and tallied in
        ``TransformResult.error_count`` (and per step under ``"errors"``).
        """
        start = time.perf_counter()
        current = list(records)
        step_metrics: list[dict[str, Any]] = []
        error_count = 0  # previously TransformResult.error_count was never populated

        for step in self._steps:
            step_start = time.perf_counter()
            before_count = len(current)
            output: list[dict[str, Any]] = []
            step_errors = 0

            for rec in current:
                try:
                    result = step.apply(rec)
                except Exception as exc:
                    # A failing record is dropped but now also tracked as an error.
                    step_errors += 1
                    logger.warning(
                        "Transform %s failed on record: %s",
                        step.name,
                        exc,
                    )
                else:
                    if result is not None:
                        output.append(result)

            error_count += step_errors
            step_duration = (time.perf_counter() - step_start) * 1000
            step_metrics.append(
                {
                    "step": step.name,
                    "input_count": before_count,
                    "output_count": len(output),
                    "dropped": before_count - len(output),
                    "errors": step_errors,
                    "duration_ms": round(step_duration, 2),
                }
            )
            current = output

        total_duration = (time.perf_counter() - start) * 1000
        return TransformResult(
            input_count=len(records),
            output_count=len(current),
            dropped_count=len(records) - len(current),
            error_count=error_count,
            duration_ms=round(total_duration, 2),
            records=current,
            step_metrics=step_metrics,
        )

add(transform)

Append a transform step to the pipeline.

Source code in packages/dataenginex/src/dataenginex/warehouse/transforms.py
def add(self, transform: Transform) -> TransformPipeline:
    """Append a transform step to the pipeline."""
    self._steps.append(transform)
    return self  # fluent API

run(records)

Execute all steps in order and return the aggregated result.

Source code in packages/dataenginex/src/dataenginex/warehouse/transforms.py
def run(self, records: list[dict[str, Any]]) -> TransformResult:
    """Execute all steps in order and return the aggregated result.

    Records that raise inside a step are logged, dropped, and tallied in
    ``TransformResult.error_count`` (and per step under ``"errors"``).
    """
    start = time.perf_counter()
    current = list(records)
    step_metrics: list[dict[str, Any]] = []
    error_count = 0  # previously TransformResult.error_count was never populated

    for step in self._steps:
        step_start = time.perf_counter()
        before_count = len(current)
        output: list[dict[str, Any]] = []
        step_errors = 0

        for rec in current:
            try:
                result = step.apply(rec)
            except Exception as exc:
                # A failing record is dropped but now also tracked as an error.
                step_errors += 1
                logger.warning(
                    "Transform %s failed on record: %s",
                    step.name,
                    exc,
                )
            else:
                if result is not None:
                    output.append(result)

        error_count += step_errors
        step_duration = (time.perf_counter() - step_start) * 1000
        step_metrics.append(
            {
                "step": step.name,
                "input_count": before_count,
                "output_count": len(output),
                "dropped": before_count - len(output),
                "errors": step_errors,
                "duration_ms": round(step_duration, 2),
            }
        )
        current = output

    total_duration = (time.perf_counter() - start) * 1000
    return TransformResult(
        input_count=len(records),
        output_count=len(current),
        dropped_count=len(records) - len(current),
        error_count=error_count,
        duration_ms=round(total_duration, 2),
        records=current,
        step_metrics=step_metrics,
    )

TransformResult dataclass

Outcome of running a pipeline over a batch of records.

Attributes:

Name Type Description
input_count int

Number of records fed into the pipeline.

output_count int

Number of records that passed all transform steps.

dropped_count int

Records dropped during transformation.

error_count int

Records that caused transform errors.

duration_ms float

Total pipeline execution time in milliseconds.

records list[dict[str, Any]]

Transformed output records.

step_metrics list[dict[str, Any]]

Per-step metrics (input/output/dropped/duration).

completed_at datetime

Timestamp of pipeline completion.

Source code in packages/dataenginex/src/dataenginex/warehouse/transforms.py
@dataclass
class TransformResult:
    """Outcome of running a pipeline over a batch of records.

    Attributes:
        input_count: Number of records fed into the pipeline.
        output_count: Number of records that passed all transform steps.
        dropped_count: Records dropped during transformation.
        error_count: Records that caused transform errors.
        duration_ms: Total pipeline execution time in milliseconds.
        records: Transformed output records.
        step_metrics: Per-step metrics (input/output/dropped/duration).
        completed_at: Timestamp of pipeline completion.
    """

    input_count: int
    output_count: int
    dropped_count: int = 0
    error_count: int = 0
    duration_ms: float = 0.0
    records: list[dict[str, Any]] = field(default_factory=list)
    step_metrics: list[dict[str, Any]] = field(default_factory=list)
    completed_at: datetime = field(default_factory=lambda: datetime.now(tz=UTC))

    @property
    def success_rate(self) -> float:
        """Fraction of input records that made it to the output."""
        return self.output_count / self.input_count if self.input_count else 0.0

success_rate property

Fraction of input records that made it to the output.