Skip to content

dataenginex.warehouse

SQL-like transforms, persistent lineage, warehouse utilities.

Public API::

from dataenginex.warehouse import (
    Transform, TransformPipeline, TransformResult,
    RenameFieldsTransform, DropNullsTransform,
    CastTypesTransform, AddTimestampTransform, FilterTransform,
    LineageEvent, PersistentLineage,
)

LineageEvent dataclass

A single lineage event describing a data operation.

Attributes:

Name Type Description
event_id str

Auto-generated unique identifier (12-char hex).

parent_id str | None

ID of the upstream event that produced the input.

operation str

Type of operation ("ingest", "transform", "enrich", "export").

layer str

Medallion layer ("bronze", "silver", "gold").

source str

Where data came from.

destination str

Where data was written.

input_count int

Number of input records.

output_count int

Number of output records.

error_count int

Number of records that errored.

quality_score float | None

Quality score of the output (0.0–1.0).

pipeline_name str

Name of the owning pipeline.

step_name str

Name of the transform step.

metadata dict[str, Any]

Free-form context dict.

timestamp datetime

When the event occurred.

Source code in src/dataenginex/warehouse/lineage.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
@dataclass
class LineageEvent:
    """A single lineage event describing a data operation.

    Attributes:
        event_id: Auto-generated unique identifier (12-char hex).
        parent_id: ID of the upstream event that produced the input.
        operation: Type of operation (``"ingest"``, ``"transform"``, ``"enrich"``, ``"export"``).
        layer: Medallion layer (``"bronze"``, ``"silver"``, ``"gold"``).
        source: Where data came from.
        destination: Where data was written.
        input_count: Number of input records.
        output_count: Number of output records.
        error_count: Number of records that errored.
        quality_score: Quality score of the output (0.0–1.0).
        pipeline_name: Name of the owning pipeline.
        step_name: Name of the transform step.
        metadata: Free-form context dict.
        timestamp: When the event occurred.
    """

    event_id: str = field(default_factory=lambda: uuid.uuid4().hex[:12])
    parent_id: str | None = None

    # What happened
    operation: str = ""  # "ingest", "transform", "enrich", "export"
    layer: str = ""  # "bronze", "silver", "gold"
    source: str = ""
    destination: str = ""

    # Counts
    input_count: int = 0
    output_count: int = 0
    error_count: int = 0
    quality_score: float | None = None

    # Context
    pipeline_name: str = ""
    step_name: str = ""
    metadata: dict[str, Any] = field(default_factory=dict)
    timestamp: datetime = field(default_factory=lambda: datetime.now(tz=UTC))

    def to_dict(self) -> dict[str, Any]:
        """Serialize the lineage event to a plain dictionary."""
        d = asdict(self)
        d["timestamp"] = self.timestamp.isoformat()
        return d

to_dict()

Serialize the lineage event to a plain dictionary.

Source code in src/dataenginex/warehouse/lineage.py
69
70
71
72
73
def to_dict(self) -> dict[str, Any]:
    """Serialize the lineage event to a plain dictionary."""
    d = asdict(self)
    d["timestamp"] = self.timestamp.isoformat()
    return d

PersistentLineage

JSON-file-backed lineage store.

Example::

lineage = PersistentLineage("data/lineage.json")
ev = lineage.record(
    operation="ingest",
    layer="bronze",
    source="linkedin",
    input_count=1250,
    output_count=1250,
)
# later
lineage.record(
    operation="transform",
    layer="silver",
    parent_id=ev.event_id,
    input_count=1250,
    output_count=1200,
    quality_score=0.88,
)
Source code in src/dataenginex/warehouse/lineage.py
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
class PersistentLineage:
    """JSON-file-backed lineage store.

    Events are kept in memory and, when a ``persist_path`` is given,
    mirrored to a JSON file on every ``record()`` call.

    Example::

        lineage = PersistentLineage("data/lineage.json")
        ev = lineage.record(
            operation="ingest",
            layer="bronze",
            source="linkedin",
            input_count=1250,
            output_count=1250,
        )
        # later
        lineage.record(
            operation="transform",
            layer="silver",
            parent_id=ev.event_id,
            input_count=1250,
            output_count=1200,
            quality_score=0.88,
        )
    """

    def __init__(self, persist_path: str | Path | None = None) -> None:
        """Initialize the store, loading prior events from *persist_path* if it exists.

        Args:
            persist_path: Optional JSON file used to persist events across runs.
                When ``None``, the store is in-memory only.
        """
        self._events: list[LineageEvent] = []
        self._persist_path = Path(persist_path) if persist_path else None
        # Serializes record()+save; read methods operate on the list snapshot.
        self._lock = threading.Lock()
        if self._persist_path and self._persist_path.exists():
            self._load()

    # -- public API ----------------------------------------------------------

    def record(self, **kwargs: Any) -> LineageEvent:
        """Create and store a new lineage event.

        Accepts the same keyword arguments as ``LineageEvent``.

        Returns:
            The newly created event (its ``event_id`` can be used as
            ``parent_id`` for downstream events).
        """
        with self._lock:
            event = LineageEvent(**kwargs)
            self._events.append(event)
            # Separators between source -> destination and the two counts;
            # without them the message runs the values together.
            logger.info(
                "Lineage event %s: %s %s -> %s (%d -> %d)",
                event.event_id,
                event.operation,
                event.source,
                event.destination,
                event.input_count,
                event.output_count,
            )
            self._save()
        return event

    def get_event(self, event_id: str) -> LineageEvent | None:
        """Fetch a single event by ID, or ``None`` if not found."""
        for ev in self._events:
            if ev.event_id == event_id:
                return ev
        return None

    def get_children(self, parent_id: str) -> list[LineageEvent]:
        """Return events that have *parent_id* as their parent."""
        return [ev for ev in self._events if ev.parent_id == parent_id]

    def get_chain(self, event_id: str) -> list[LineageEvent]:
        """Walk up from *event_id* to the root and return the full chain.

        The chain is returned root-first (oldest ancestor at index 0).
        """
        chain: list[LineageEvent] = []
        current = self.get_event(event_id)
        while current:
            chain.append(current)
            current = self.get_event(current.parent_id) if current.parent_id else None
        chain.reverse()
        return chain

    def get_by_layer(self, layer: str) -> list[LineageEvent]:
        """Return all events for a given medallion layer."""
        return [ev for ev in self._events if ev.layer == layer]

    def get_by_pipeline(self, pipeline_name: str) -> list[LineageEvent]:
        """Return all events for a given pipeline."""
        return [ev for ev in self._events if ev.pipeline_name == pipeline_name]

    @property
    def all_events(self) -> list[LineageEvent]:
        """Return a shallow copy of all stored lineage events."""
        return list(self._events)

    def summary(self) -> dict[str, Any]:
        """Return high-level lineage statistics (totals by layer and operation)."""
        layers: dict[str, int] = {}
        operations: dict[str, int] = {}
        for ev in self._events:
            layers[ev.layer] = layers.get(ev.layer, 0) + 1
            operations[ev.operation] = operations.get(ev.operation, 0) + 1
        return {
            "total_events": len(self._events),
            "by_layer": layers,
            "by_operation": operations,
        }

    # -- persistence ---------------------------------------------------------

    def _save(self) -> None:
        """Write all events to the persist file as JSON (no-op when in-memory)."""
        if not self._persist_path:
            return
        self._persist_path.parent.mkdir(parents=True, exist_ok=True)
        data = [ev.to_dict() for ev in self._events]
        self._persist_path.write_text(json.dumps(data, indent=2, default=str))

    def _load(self) -> None:
        """Rehydrate events from the persist file; start fresh if it is corrupted."""
        if not self._persist_path or not self._persist_path.exists():
            return
        try:
            raw = json.loads(self._persist_path.read_text())
            for item in raw:
                # to_dict() serializes timestamp as an ISO-8601 string; parse
                # it back instead of discarding it, so reloaded events keep
                # their original time. Fall back to the default ("now") only
                # when the value is missing or malformed.
                ts_raw = item.pop("timestamp", None)
                event = LineageEvent(**item)
                if isinstance(ts_raw, str):
                    try:
                        event.timestamp = datetime.fromisoformat(ts_raw)
                    except ValueError:
                        pass  # keep the default timestamp
                self._events.append(event)
            # %-style lazy formatting, consistent with record() above.
            logger.info(
                "Loaded %d lineage events from %s",
                len(self._events),
                self._persist_path,
            )
        except (json.JSONDecodeError, TypeError, KeyError) as exc:
            logger.warning(
                "Lineage file %s corrupted, starting fresh: %s",
                self._persist_path,
                exc,
            )
            self._events = []

all_events property

Return a shallow copy of all stored lineage events.

record(**kwargs)

Create and store a new lineage event.

Accepts the same keyword arguments as LineageEvent.

Source code in src/dataenginex/warehouse/lineage.py
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
def record(self, **kwargs: Any) -> LineageEvent:
    """Create and store a new lineage event.

    Accepts the same keyword arguments as ``LineageEvent``.

    Returns:
        The newly created event.
    """
    with self._lock:
        event = LineageEvent(**kwargs)
        self._events.append(event)
        # Separators between source -> destination and the two counts;
        # without them the message runs the values together.
        logger.info(
            "Lineage event %s: %s %s -> %s (%d -> %d)",
            event.event_id,
            event.operation,
            event.source,
            event.destination,
            event.input_count,
            event.output_count,
        )
        self._save()
    return event

get_event(event_id)

Fetch a single event by ID.

Source code in src/dataenginex/warehouse/lineage.py
129
130
131
132
133
134
def get_event(self, event_id: str) -> LineageEvent | None:
    """Fetch a single event by ID."""
    for ev in self._events:
        if ev.event_id == event_id:
            return ev
    return None

get_children(parent_id)

Return events that have parent_id as their parent.

Source code in src/dataenginex/warehouse/lineage.py
136
137
138
def get_children(self, parent_id: str) -> list[LineageEvent]:
    """Return events that have *parent_id* as their parent."""
    return [ev for ev in self._events if ev.parent_id == parent_id]

get_chain(event_id)

Walk up from event_id to the root and return the full chain.

Source code in src/dataenginex/warehouse/lineage.py
140
141
142
143
144
145
146
147
148
def get_chain(self, event_id: str) -> list[LineageEvent]:
    """Walk up from *event_id* to the root and return the full chain."""
    chain: list[LineageEvent] = []
    current = self.get_event(event_id)
    while current:
        chain.append(current)
        current = self.get_event(current.parent_id) if current.parent_id else None
    chain.reverse()
    return chain

get_by_layer(layer)

Return all events for a given medallion layer.

Source code in src/dataenginex/warehouse/lineage.py
150
151
152
def get_by_layer(self, layer: str) -> list[LineageEvent]:
    """Return all events for a given medallion layer."""
    return [ev for ev in self._events if ev.layer == layer]

get_by_pipeline(pipeline_name)

Return all events for a given pipeline.

Source code in src/dataenginex/warehouse/lineage.py
154
155
156
def get_by_pipeline(self, pipeline_name: str) -> list[LineageEvent]:
    """Return all events for a given pipeline."""
    return [ev for ev in self._events if ev.pipeline_name == pipeline_name]

summary()

Return high-level lineage statistics.

Source code in src/dataenginex/warehouse/lineage.py
163
164
165
166
167
168
169
170
171
172
173
174
def summary(self) -> dict[str, Any]:
    """Return high-level lineage statistics."""
    layers: dict[str, int] = {}
    operations: dict[str, int] = {}
    for ev in self._events:
        layers[ev.layer] = layers.get(ev.layer, 0) + 1
        operations[ev.operation] = operations.get(ev.operation, 0) + 1
    return {
        "total_events": len(self._events),
        "by_layer": layers,
        "by_operation": operations,
    }

AddTimestampTransform

Bases: Transform

Add a processing timestamp field to every record.

Source code in src/dataenginex/warehouse/transforms.py
145
146
147
148
149
150
151
152
153
154
155
156
class AddTimestampTransform(Transform):
    """Stamp every record with a processing timestamp."""

    def __init__(self, field_name: str = "processed_at") -> None:
        """Remember which field name receives the timestamp."""
        super().__init__(name="add_timestamp", description="Add processing timestamp")
        self.field_name = field_name

    def apply(self, record: dict[str, Any]) -> dict[str, Any]:
        """Return a copy of *record* with the current UTC time as ISO-8601."""
        return {**record, self.field_name: datetime.now(tz=UTC).isoformat()}

apply(record)

Add an ISO-8601 processing timestamp to record.

Source code in src/dataenginex/warehouse/transforms.py
152
153
154
155
156
def apply(self, record: dict[str, Any]) -> dict[str, Any]:
    """Add an ISO-8601 processing timestamp to *record*."""
    out = dict(record)
    out[self.field_name] = datetime.now(tz=UTC).isoformat()
    return out

CastTypesTransform

Bases: Transform

Cast fields to target types (int, float, str, bool).

Source code in src/dataenginex/warehouse/transforms.py
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
class CastTypesTransform(Transform):
    """Cast fields to target types (``int``, ``float``, ``str``, ``bool``)."""

    # Maps the type name used in the config to the actual caster.
    _CASTERS: dict[str, type] = {"int": int, "float": float, "str": str, "bool": bool}

    def __init__(self, type_map: dict[str, str]) -> None:
        """Store the field-name -> type-name mapping to apply."""
        super().__init__(name="cast_types", description="Cast field types")
        self.type_map = type_map

    def apply(self, record: dict[str, Any]) -> dict[str, Any]:
        """Return a copy of *record* with configured fields cast.

        Fields that are absent, ``None``, mapped to an unknown type name,
        or whose values fail to cast are left untouched.
        """
        out = dict(record)
        for name, type_name in self.type_map.items():
            value = out.get(name)
            caster = self._CASTERS.get(type_name)
            if value is None or caster is None:
                continue
            with contextlib.suppress(ValueError, TypeError):
                out[name] = caster(value)
        return out

apply(record)

Cast specified fields to their target types.

Source code in src/dataenginex/warehouse/transforms.py
133
134
135
136
137
138
139
140
141
142
def apply(self, record: dict[str, Any]) -> dict[str, Any]:
    """Cast specified fields to their target types."""
    out = dict(record)
    for field_name, target in self.type_map.items():
        if field_name in out and out[field_name] is not None:
            caster = self._CASTERS.get(target)
            if caster:
                with contextlib.suppress(ValueError, TypeError):
                    out[field_name] = caster(out[field_name])
    return out

DropNullsTransform

Bases: Transform

Drop records that have None in any of the specified fields.

Source code in src/dataenginex/warehouse/transforms.py
109
110
111
112
113
114
115
116
117
118
119
120
121
class DropNullsTransform(Transform):
    """Drop records that have *None* in any of the specified fields."""

    def __init__(self, required_fields: list[str]) -> None:
        """Store the list of fields that must be non-null."""
        super().__init__(name="drop_nulls", description="Drop records with null required fields")
        self.required_fields = required_fields

    def apply(self, record: dict[str, Any]) -> dict[str, Any] | None:
        """Return ``None`` if any required field is null (or absent), else the record."""
        if any(record.get(name) is None for name in self.required_fields):
            return None
        return record

apply(record)

Return None if any required field is null, else the record.

Source code in src/dataenginex/warehouse/transforms.py
116
117
118
119
120
121
def apply(self, record: dict[str, Any]) -> dict[str, Any] | None:
    """Return ``None`` if any required field is null, else the record."""
    for f in self.required_fields:
        if record.get(f) is None:
            return None
    return record

FilterTransform

Bases: Transform

Keep only records that match a predicate expression.

predicate receives a record dict and returns True to keep it.

Source code in src/dataenginex/warehouse/transforms.py
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
class FilterTransform(Transform):
    """Keep only records that match a predicate expression.

    ``predicate`` receives a record dict and returns ``True`` to keep it.
    """

    def __init__(self, name: str, predicate: Any) -> None:
        """Store the filter *name* and the keep/drop *predicate*."""
        super().__init__(name=name, description="Filter records by predicate")
        self._predicate = predicate

    def apply(self, record: dict[str, Any]) -> dict[str, Any] | None:
        """Return *record* when the predicate accepts it, otherwise ``None``."""
        return record if self._predicate(record) else None

apply(record)

Keep record if the predicate returns True, else drop it.

Source code in src/dataenginex/warehouse/transforms.py
169
170
171
172
173
def apply(self, record: dict[str, Any]) -> dict[str, Any] | None:
    """Keep *record* if the predicate returns ``True``, else drop it."""
    if self._predicate(record):
        return record
    return None

RenameFieldsTransform

Bases: Transform

Rename keys in a record according to a mapping.

Source code in src/dataenginex/warehouse/transforms.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
class RenameFieldsTransform(Transform):
    """Rename keys in a record according to a mapping."""

    def __init__(self, mapping: dict[str, str]) -> None:
        """Store the old-key -> new-key *mapping*."""
        super().__init__(name="rename_fields", description="Rename record fields")
        self.mapping = mapping

    def apply(self, record: dict[str, Any]) -> dict[str, Any]:
        """Return a copy of *record* with mapped keys renamed.

        Keys not present in the record are skipped; a renamed key
        overwrites any existing key of the same name.
        """
        renamed = dict(record)
        for src, dst in self.mapping.items():
            if src in renamed:
                renamed[dst] = renamed.pop(src)
        return renamed

apply(record)

Rename keys in record according to the configured mapping.

Source code in src/dataenginex/warehouse/transforms.py
100
101
102
103
104
105
106
def apply(self, record: dict[str, Any]) -> dict[str, Any]:
    """Rename keys in *record* according to the configured mapping."""
    out = dict(record)
    for old_key, new_key in self.mapping.items():
        if old_key in out:
            out[new_key] = out.pop(old_key)
    return out

Transform

Bases: ABC

Base class for a single data transform step.

Subclass and implement apply which receives one record and returns either a transformed record or None to drop it.

Source code in src/dataenginex/warehouse/transforms.py
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
class Transform(ABC):
    """Abstract base class for a single data transform step.

    Concrete subclasses implement ``apply``, which receives one record
    and returns either a transformed record or *None* to drop it.
    """

    def __init__(self, name: str, description: str = "") -> None:
        """Store the step's *name* and optional human-readable *description*."""
        self.name = name
        self.description = description

    @abstractmethod
    def apply(self, record: dict[str, Any]) -> dict[str, Any] | None:
        """Transform *record* in place or return a new dict.

        Return *None* to drop the record from the pipeline.
        """
        ...

apply(record) abstractmethod

Transform record in place or return a new dict.

Return None to drop the record from the pipeline.

Source code in src/dataenginex/warehouse/transforms.py
79
80
81
82
83
84
85
@abstractmethod
def apply(self, record: dict[str, Any]) -> dict[str, Any] | None:
    """Transform *record* in place or return a new dict.

    Return *None* to drop the record from the pipeline.
    """
    ...

TransformPipeline

Execute an ordered chain of Transform steps over a batch.

Example::

pipeline = TransformPipeline("bronze_to_silver")
pipeline.add(DropNullsTransform(["event_id", "user_name"]))
pipeline.add(CastTypesTransform({"amount": "float"}))
result = pipeline.run(records)
Source code in src/dataenginex/warehouse/transforms.py
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
class TransformPipeline:
    """Execute an ordered chain of ``Transform`` steps over a batch.

    Example::

        pipeline = TransformPipeline("bronze_to_silver")
        pipeline.add(DropNullsTransform(["event_id", "user_name"]))
        pipeline.add(CastTypesTransform({"amount": "float"}))
        result = pipeline.run(records)
    """

    def __init__(self, name: str) -> None:
        """Create an empty pipeline identified by *name*."""
        self.name = name
        self._steps: list[Transform] = []

    def add(self, transform: Transform) -> TransformPipeline:
        """Append a transform step to the pipeline."""
        self._steps.append(transform)
        return self  # fluent API

    def run(self, records: list[dict[str, Any]]) -> TransformResult:
        """Execute all steps in order and return the aggregated result.

        A step raising on a record logs a warning and drops that record;
        such records are counted in ``error_count`` (previously they were
        silently folded into the dropped total only).
        """
        start = time.perf_counter()
        current = list(records)
        step_metrics: list[dict[str, Any]] = []
        total_errors = 0

        for step in self._steps:
            step_start = time.perf_counter()
            before_count = len(current)
            output: list[dict[str, Any]] = []
            step_errors = 0

            for rec in current:
                try:
                    result = step.apply(rec)
                except Exception as exc:
                    # Count the failure so TransformResult.error_count is
                    # meaningful, then continue with the remaining records.
                    step_errors += 1
                    logger.warning(
                        "Transform %s failed on record: %s",
                        step.name,
                        exc,
                    )
                else:
                    if result is not None:
                        output.append(result)

            total_errors += step_errors
            step_duration = (time.perf_counter() - step_start) * 1000
            step_metrics.append(
                {
                    "step": step.name,
                    "input_count": before_count,
                    "output_count": len(output),
                    "dropped": before_count - len(output),
                    "errors": step_errors,
                    "duration_ms": round(step_duration, 2),
                }
            )
            current = output

        total_duration = (time.perf_counter() - start) * 1000
        return TransformResult(
            input_count=len(records),
            output_count=len(current),
            dropped_count=len(records) - len(current),
            error_count=total_errors,
            duration_ms=round(total_duration, 2),
            records=current,
            step_metrics=step_metrics,
        )

add(transform)

Append a transform step to the pipeline.

Source code in src/dataenginex/warehouse/transforms.py
196
197
198
199
def add(self, transform: Transform) -> TransformPipeline:
    """Append a transform step to the pipeline."""
    self._steps.append(transform)
    return self  # fluent API

run(records)

Execute all steps in order and return the aggregated result.

Source code in src/dataenginex/warehouse/transforms.py
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
def run(self, records: list[dict[str, Any]]) -> TransformResult:
    """Execute all steps in order and return the aggregated result.

    A step raising on a record logs a warning and drops that record;
    such records are counted in ``error_count`` (previously they were
    silently folded into the dropped total only).
    """
    start = time.perf_counter()
    current = list(records)
    step_metrics: list[dict[str, Any]] = []
    total_errors = 0

    for step in self._steps:
        step_start = time.perf_counter()
        before_count = len(current)
        output: list[dict[str, Any]] = []
        step_errors = 0

        for rec in current:
            try:
                result = step.apply(rec)
            except Exception as exc:
                # Count the failure so TransformResult.error_count is
                # meaningful, then continue with the remaining records.
                step_errors += 1
                logger.warning(
                    "Transform %s failed on record: %s",
                    step.name,
                    exc,
                )
            else:
                if result is not None:
                    output.append(result)

        total_errors += step_errors
        step_duration = (time.perf_counter() - step_start) * 1000
        step_metrics.append(
            {
                "step": step.name,
                "input_count": before_count,
                "output_count": len(output),
                "dropped": before_count - len(output),
                "errors": step_errors,
                "duration_ms": round(step_duration, 2),
            }
        )
        current = output

    total_duration = (time.perf_counter() - start) * 1000
    return TransformResult(
        input_count=len(records),
        output_count=len(current),
        dropped_count=len(records) - len(current),
        error_count=total_errors,
        duration_ms=round(total_duration, 2),
        records=current,
        step_metrics=step_metrics,
    )

TransformResult dataclass

Outcome of running a pipeline over a batch of records.

Attributes:

Name Type Description
input_count int

Number of records fed into the pipeline.

output_count int

Number of records that passed all transform steps.

dropped_count int

Records dropped during transformation.

error_count int

Records that caused transform errors.

duration_ms float

Total pipeline execution time in milliseconds.

records list[dict[str, Any]]

Transformed output records.

step_metrics list[dict[str, Any]]

Per-step metrics (input/output/dropped/duration).

completed_at datetime

Timestamp of pipeline completion.

Source code in src/dataenginex/warehouse/transforms.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
@dataclass
class TransformResult:
    """Outcome of running a pipeline over a batch of records.

    Attributes:
        input_count: Number of records fed into the pipeline.
        output_count: Number of records that passed all transform steps.
        dropped_count: Records dropped during transformation.
        error_count: Records that caused transform errors.
        duration_ms: Total pipeline execution time in milliseconds.
        records: Transformed output records.
        step_metrics: Per-step metrics (input/output/dropped/duration).
        completed_at: Timestamp of pipeline completion.
    """

    input_count: int
    output_count: int
    dropped_count: int = 0
    error_count: int = 0
    duration_ms: float = 0.0
    records: list[dict[str, Any]] = field(default_factory=list)
    step_metrics: list[dict[str, Any]] = field(default_factory=list)
    completed_at: datetime = field(default_factory=lambda: datetime.now(tz=UTC))

    @property
    def success_rate(self) -> float:
        """Fraction of input records that made it to the output."""
        return self.output_count / self.input_count if self.input_count else 0.0

success_rate property

Fraction of input records that made it to the output.