Skip to content

dataenginex.data

Data layer — connectors, transforms, quality gates, pipelines.

Public API::

from dataenginex.data import (
    # New registry-based API
    connector_registry, transform_registry,
    DuckDBConnector, CsvConnector,
    PipelineRunner, PipelineResult,
    QualityResult, check_quality,
    # Legacy API
    DataConnector, RestConnector, FileConnector,
    ConnectorStatus, FetchResult,
    DataProfiler, ProfileReport, ColumnProfile,
    SchemaRegistry, SchemaVersion,
)

CsvConnector

Bases: BaseConnector

CSV connector backed by DuckDB CSV reader.

Parameters:

Name Type Description Default
path str

Directory containing CSV files.

'.'
default_file str | None

Default file to read (for conformance test).

None
Source code in src/dataenginex/data/connectors/csv.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
@connector_registry.decorator("csv")
class CsvConnector(BaseConnector):
    """Connector that reads and writes CSV files through DuckDB's CSV engine.

    Args:
        path: Directory containing CSV files.
        default_file: Default file to read (for conformance test).
    """

    def __init__(self, path: str = ".", default_file: str | None = None, **kwargs: Any) -> None:
        # Extra kwargs are accepted (registry passes config through) but unused.
        self._path = Path(path)
        self._default_file = default_file
        self._conn: duckdb.DuckDBPyConnection | None = None

    def connect(self) -> None:
        # An in-memory DuckDB session suffices: CSV files are scanned ad hoc.
        self._conn = duckdb.connect(":memory:")
        logger.debug("csv connector ready", path=str(self._path))

    def disconnect(self) -> None:
        if self._conn is None:
            return
        self._conn.close()
        self._conn = None

    def read(
        self,
        *,
        table: str | None = None,
        default: Any = None,
        **kwargs: Any,
    ) -> list[dict[str, Any]]:
        """Read a CSV file and return its rows as column->value dicts."""
        if self._conn is None:
            msg = "Not connected — call connect() first"
            raise RuntimeError(msg)

        target = table or self._default_file
        if target is None:
            msg = "No table/file specified"
            raise ValueError(msg)

        filepath = self._path / target
        if not filepath.exists():
            # A caller-supplied fallback short-circuits the missing-file error.
            if default is not None:
                return list(default)
            msg = f"CSV file not found: {filepath}"
            raise FileNotFoundError(msg)

        # Single quotes are doubled so the path is safe inside the SQL literal.
        safe_path = str(filepath).replace("'", "''")
        cursor = self._conn.execute(f"SELECT * FROM read_csv_auto('{safe_path}')")
        names = [col[0] for col in cursor.description]
        return [dict(zip(names, values, strict=True)) for values in cursor.fetchall()]

    def write(self, data: Any, *, table: str = "output.csv", **kwargs: Any) -> None:
        """Write a list of dicts or a pyarrow Table to a CSV file."""
        if self._conn is None:
            msg = "Not connected — call connect() first"
            raise RuntimeError(msg)
        import pyarrow as pa
        import pyarrow.csv as pcsv

        if isinstance(data, pa.Table):
            arrow_table = data
        elif isinstance(data, list):
            arrow_table = pa.Table.from_pylist(data)
        else:
            msg = f"Unsupported data type: {type(data)}"
            raise TypeError(msg)

        destination = self._path / table
        pcsv.write_csv(arrow_table, destination)
        logger.info("csv written", path=str(destination), rows=len(arrow_table))

    def health_check(self) -> bool:
        """Healthy when connected and the base directory still exists."""
        if self._conn is None:
            return False
        return self._path.exists()

DuckDBConnector

Bases: BaseConnector

DuckDB-backed connector.

Parameters:

Name Type Description Default
database str

Path to DuckDB file. ":memory:" for in-memory.

':memory:'
Source code in src/dataenginex/data/connectors/duckdb.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
@connector_registry.decorator("duckdb", is_default=True)
class DuckDBConnector(BaseConnector):
    """DuckDB-backed connector.

    Registered as the registry's default connector.

    Args:
        database: Path to DuckDB file. ":memory:" for in-memory.
    """

    def __init__(self, database: str = ":memory:", **kwargs: Any) -> None:
        # Connection opens lazily in connect(); extra kwargs are ignored.
        self._database = database
        self._conn: duckdb.DuckDBPyConnection | None = None

    def connect(self) -> None:
        """Open the DuckDB connection; no-op if already connected."""
        if self._conn is not None:
            return
        self._conn = duckdb.connect(self._database)
        logger.debug("duckdb connected", database=self._database)

    def disconnect(self) -> None:
        """Close the connection if open; safe to call repeatedly."""
        if self._conn is not None:
            self._conn.close()
            self._conn = None
            logger.debug("duckdb disconnected", database=self._database)

    def read(self, *, table: str, default: Any = None, **kwargs: Any) -> list[dict[str, Any]]:
        """Return all rows of *table* as dicts.

        When the table does not exist, returns ``list(default)`` if a default
        was supplied, otherwise re-raises the CatalogException.
        """
        if self._conn is None:
            raise RuntimeError(_NOT_CONNECTED)
        try:
            # Table name interpolated directly; callers are trusted (S608).
            result = self._conn.execute(f"SELECT * FROM {table}")  # noqa: S608
            columns: list[str] = [desc[0] for desc in result.description]
            return [dict(zip(columns, row, strict=True)) for row in result.fetchall()]
        except duckdb.CatalogException:
            if default is not None:
                return list(default)
            raise

    def write(self, data: Any, *, table: str, **kwargs: Any) -> None:
        """Create *table* from a list of dicts or a pyarrow Table.

        An empty list is a no-op. An existing table is left untouched
        (CREATE TABLE IF NOT EXISTS) — this does not append or overwrite.
        """
        if self._conn is None:
            raise RuntimeError(_NOT_CONNECTED)
        import pyarrow as pa

        if isinstance(data, list):
            if len(data) == 0:
                return
            tbl = pa.Table.from_pylist(data)
        elif isinstance(data, pa.Table):
            tbl = data
        else:
            msg = f"Unsupported data type: {type(data)}"
            raise TypeError(msg)

        # "FROM tbl" resolves the local Python variable via DuckDB's
        # replacement scan — do NOT rename `tbl`.
        self._conn.execute(f"CREATE TABLE IF NOT EXISTS {table} AS SELECT * FROM tbl")  # noqa: S608
        logger.info("data written", table=table, rows=len(tbl))

    def health_check(self) -> bool:
        """Return True when the connection is open and can run a query."""
        if self._conn is None:
            return False
        try:
            self._conn.execute("SELECT 1")
            return True
        except Exception:
            return False

    def execute(self, sql: str) -> list[dict[str, Any]]:
        """Execute raw SQL and return results as list of dicts."""
        if self._conn is None:
            raise RuntimeError(_NOT_CONNECTED)
        result = self._conn.execute(sql)
        columns = [desc[0] for desc in result.description]
        return [dict(zip(columns, row, strict=True)) for row in result.fetchall()]

    @property
    def connection(self) -> duckdb.DuckDBPyConnection:
        """Direct access to the DuckDB connection for advanced use."""
        if self._conn is None:
            raise RuntimeError(_NOT_CONNECTED)
        return self._conn

connection property

Direct access to the DuckDB connection for advanced use.

execute(sql)

Execute raw SQL and return results as list of dicts.

Source code in src/dataenginex/data/connectors/duckdb.py
85
86
87
88
89
90
91
def execute(self, sql: str) -> list[dict[str, Any]]:
    """Run *sql* directly and return every row as a column->value dict."""
    if self._conn is None:
        raise RuntimeError(_NOT_CONNECTED)
    cursor = self._conn.execute(sql)
    names = [entry[0] for entry in cursor.description]
    return [dict(zip(names, values, strict=True)) for values in cursor.fetchall()]

ConnectorStatus

Bases: StrEnum

Lifecycle states for a data connector.

Source code in src/dataenginex/data/connectors/legacy.py
39
40
41
42
43
44
45
46
class ConnectorStatus(StrEnum):
    """Lifecycle states for a data connector."""

    IDLE = "idle"  # constructed; connect() not yet called
    CONNECTED = "connected"  # connect() succeeded; ready to fetch
    FETCHING = "fetching"  # a fetch() call is in progress
    ERROR = "error"  # last operation failed (details in logs)
    CLOSED = "closed"  # close() called; connector released its resources

DataConnector

Bases: ABC

Base class for all data connectors in DEX.

Subclass and implement connect, fetch, and close.

Source code in src/dataenginex/data/connectors/legacy.py
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
class DataConnector(ABC):
    """Base class for all data connectors in DEX.

    Subclass and implement ``connect``, ``fetch``, and ``close``.
    Instances track their lifecycle in ``status`` and support
    ``async with`` for automatic connect/close.
    """

    def __init__(self, name: str, source_type: str) -> None:
        # Human-readable label plus a machine tag (e.g. "file", "rest_api").
        self.name = name
        self.source_type = source_type
        self.status = ConnectorStatus.IDLE
        # Set by _mark_connected(); None until the first successful connect.
        self._connected_at: datetime | None = None

    # -- lifecycle -----------------------------------------------------------

    @abstractmethod
    async def connect(self) -> bool:
        """Establish connection to the data source."""
        ...

    @abstractmethod
    async def fetch(
        self,
        *,
        limit: int | None = None,
        offset: int = 0,
        filters: dict[str, Any] | None = None,
    ) -> FetchResult:
        """Retrieve records from the data source."""
        ...

    @abstractmethod
    async def close(self) -> None:
        """Release resources held by the connector."""
        ...

    # -- helpers -------------------------------------------------------------

    def _mark_connected(self) -> None:
        """Record a successful connection: status, timestamp, and log entry."""
        self.status = ConnectorStatus.CONNECTED
        self._connected_at = datetime.now(tz=UTC)
        logger.info("connector connected", name=self.name)

    def _mark_error(self, message: str) -> None:
        """Flag the connector as failed and log the reason."""
        self.status = ConnectorStatus.ERROR
        logger.error("connector error", name=self.name, message=message)

    # -- context manager -----------------------------------------------------

    async def __aenter__(self) -> DataConnector:
        # NOTE(review): connect() signals failure by returning False, not by
        # raising, so the `async with` body still runs after a failed connect
        # — confirm callers check `status` before fetching.
        await self.connect()
        return self

    async def __aexit__(self, *_exc: object) -> None:
        await self.close()

connect() abstractmethod async

Establish connection to the data source.

Source code in src/dataenginex/data/connectors/legacy.py
94
95
96
97
@abstractmethod
async def connect(self) -> bool:
    """Establish connection to the data source.

    Returns:
        True if the connection was established, False otherwise.
    """
    ...

fetch(*, limit=None, offset=0, filters=None) abstractmethod async

Retrieve records from the data source.

Source code in src/dataenginex/data/connectors/legacy.py
 99
100
101
102
103
104
105
106
107
108
@abstractmethod
async def fetch(
    self,
    *,
    limit: int | None = None,
    offset: int = 0,
    filters: dict[str, Any] | None = None,
) -> FetchResult:
    """Retrieve records from the data source.

    Args:
        limit: Maximum number of records to return (None = no limit).
        offset: Number of records to skip.
        filters: Field/value pairs used to narrow the result set.

    Returns:
        FetchResult holding the records plus fetch metadata.
    """
    ...

close() abstractmethod async

Release resources held by the connector.

Source code in src/dataenginex/data/connectors/legacy.py
110
111
112
113
@abstractmethod
async def close(self) -> None:
    """Release resources held by the connector.

    Implementations should leave the connector in the CLOSED state.
    """
    ...

FetchResult dataclass

Outcome of a single fetch invocation.

Attributes:

Name Type Description
records list[dict[str, Any]]

Retrieved data records.

record_count int

Number of records returned.

source str

Name of the data source.

fetched_at datetime

Timestamp of the fetch.

duration_ms float

Fetch duration in milliseconds.

errors list[str]

Error messages (empty on success).

Source code in src/dataenginex/data/connectors/legacy.py
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
@dataclass
class FetchResult:
    """Outcome of a single ``fetch`` invocation.

    Attributes:
        records: Retrieved data records.
        record_count: Number of records returned.
        source: Name of the data source.
        fetched_at: Timestamp of the fetch.
        duration_ms: Fetch duration in milliseconds.
        errors: Error messages (empty on success).
    """

    records: list[dict[str, Any]]
    record_count: int
    source: str
    fetched_at: datetime = field(default_factory=lambda: datetime.now(tz=UTC))
    duration_ms: float = 0.0
    errors: list[str] = field(default_factory=list)

    @property
    def success(self) -> bool:
        """Return ``True`` if the fetch completed without errors."""
        return len(self.errors) == 0

success property

Return True if the fetch completed without errors.

FileConnector

Bases: DataConnector

Reads records from local JSON, JSONL, or CSV files.

Parameters

- name: Human-readable connector label.
- path: File system path to the data file.
- file_format: One of "json", "jsonl", "csv".
- encoding: Text encoding (default utf-8).

Source code in src/dataenginex/data/connectors/legacy.py
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
class FileConnector(DataConnector):
    """Reads records from local JSON, JSONL, or CSV files.

    Parameters
    ----------
    name:
        Human-readable connector label.
    path:
        File system path to the data file.
    file_format:
        One of ``"json"``, ``"jsonl"``, ``"csv"``.
    encoding:
        Text encoding (default ``utf-8``).
    """

    SUPPORTED_FORMATS = {"json", "jsonl", "csv"}

    def __init__(
        self,
        name: str,
        path: str | Path,
        file_format: str = "json",
        encoding: str = "utf-8",
    ) -> None:
        super().__init__(name=name, source_type="file")
        self.path = Path(path)
        if file_format not in self.SUPPORTED_FORMATS:
            raise ValueError(
                f"Unsupported format {file_format!r}; choose from {self.SUPPORTED_FORMATS}"
            )
        self.file_format = file_format
        self.encoding = encoding
        # Parsed records, cached after the first fetch (cleared by close()).
        self._data: list[dict[str, Any]] | None = None

    async def connect(self) -> bool:
        """Verify the configured file exists and is readable."""
        if not self.path.exists():
            self._mark_error(f"File not found: {self.path}")
            return False
        self._mark_connected()
        return True

    async def fetch(
        self,
        *,
        limit: int | None = None,
        offset: int = 0,
        filters: dict[str, Any] | None = None,
    ) -> FetchResult:
        """Read and parse records from the local file.

        Filters are applied *before* offset/limit so pagination walks the
        matching records — the same semantics a filtering server would give.
        Errors are captured in the returned FetchResult rather than raised.
        """
        self.status = ConnectorStatus.FETCHING
        start = time.perf_counter()

        try:
            if self._data is None:
                self._data = self._load()

            records = self._data
            # Bug fix: limit/offset used to be applied before filters, which
            # could under-fill a page when both were supplied together.
            if filters:
                records = [r for r in records if all(r.get(k) == v for k, v in filters.items())]

            records = records[offset:]
            if limit is not None:
                records = records[:limit]

            duration = (time.perf_counter() - start) * 1000
            self.status = ConnectorStatus.CONNECTED
            return FetchResult(
                records=records,
                record_count=len(records),
                source=self.name,
                duration_ms=round(duration, 2),
            )
        except Exception as exc:
            duration = (time.perf_counter() - start) * 1000
            self._mark_error(str(exc))
            return FetchResult(
                records=[],
                record_count=0,
                source=self.name,
                duration_ms=round(duration, 2),
                errors=[str(exc)],
            )

    async def close(self) -> None:
        """Release cached file data."""
        self._data = None
        self.status = ConnectorStatus.CLOSED

    # -- private loaders -----------------------------------------------------

    def _load(self) -> list[dict[str, Any]]:
        """Parse the configured file into a list of record dicts."""
        text = self.path.read_text(encoding=self.encoding)
        if self.file_format == "json":
            data = json.loads(text)
            # A single top-level JSON object is wrapped so callers always
            # receive a list.
            return data if isinstance(data, list) else [data]
        if self.file_format == "jsonl":
            return [json.loads(line) for line in text.splitlines() if line.strip()]
        # csv — the only remaining supported format
        return self._load_csv(text)

    @staticmethod
    def _load_csv(text: str) -> list[dict[str, Any]]:
        """Parse CSV text into dict rows keyed by the header line."""
        reader = csv.DictReader(io.StringIO(text))
        return [dict(row) for row in reader]

connect() async

Verify the configured file exists and is readable.

Source code in src/dataenginex/data/connectors/legacy.py
295
296
297
298
299
300
301
async def connect(self) -> bool:
    """Verify the configured file exists and is readable.

    Returns:
        True when the file exists; False (with ERROR status) otherwise.
    """
    # NOTE(review): only existence is checked here, not actual readability
    # — a permission error would surface later, during fetch().
    if not self.path.exists():
        self._mark_error(f"File not found: {self.path}")
        return False
    self._mark_connected()
    return True

fetch(*, limit=None, offset=0, filters=None) async

Read and parse records from the local file.

Source code in src/dataenginex/data/connectors/legacy.py
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
async def fetch(
    self,
    *,
    limit: int | None = None,
    offset: int = 0,
    filters: dict[str, Any] | None = None,
) -> FetchResult:
    """Read and parse records from the local file.

    Errors are captured in the returned FetchResult rather than raised.
    """
    self.status = ConnectorStatus.FETCHING
    start = time.perf_counter()

    try:
        # File contents are parsed once and cached for subsequent fetches.
        if self._data is None:
            self._data = self._load()

        # NOTE(review): offset/limit are applied before filters, so a
        # filtered page can contain fewer than `limit` matches — confirm
        # this ordering is intended.
        records = self._data[offset:]
        if limit is not None:
            records = records[:limit]

        if filters:
            records = [r for r in records if all(r.get(k) == v for k, v in filters.items())]

        duration = (time.perf_counter() - start) * 1000
        self.status = ConnectorStatus.CONNECTED
        return FetchResult(
            records=records,
            record_count=len(records),
            source=self.name,
            duration_ms=round(duration, 2),
        )
    except Exception as exc:
        duration = (time.perf_counter() - start) * 1000
        self._mark_error(str(exc))
        return FetchResult(
            records=[],
            record_count=0,
            source=self.name,
            duration_ms=round(duration, 2),
            errors=[str(exc)],
        )

close() async

Release cached file data.

Source code in src/dataenginex/data/connectors/legacy.py
344
345
346
347
async def close(self) -> None:
    """Release cached file data."""
    # Dropping the cache lets a later fetch (after reconnect) re-read the file.
    self._data = None
    self.status = ConnectorStatus.CLOSED

RestConnector

Bases: DataConnector

Fetches records from an HTTP/REST API endpoint.

Parameters

- name: Human-readable connector label.
- base_url: Root URL of the API (e.g. https://api.example.com).
- endpoint: Path appended to base_url for fetches (e.g. /v1/jobs).
- headers: Extra HTTP headers (auth tokens, accept types, etc.).
- timeout: HTTP timeout in seconds (default 30).
- records_key: JSON key that contains the result list (e.g. "data"). When None the root response is expected to be a list.

Source code in src/dataenginex/data/connectors/legacy.py
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
class RestConnector(DataConnector):
    """Fetches records from an HTTP/REST API endpoint.

    Parameters
    ----------
    name:
        Human-readable connector label.
    base_url:
        Root URL of the API (e.g. ``https://api.example.com``).
    endpoint:
        Path appended to *base_url* for fetches (e.g. ``/v1/jobs``).
    headers:
        Extra HTTP headers (auth tokens, accept types, etc.).
    timeout:
        HTTP timeout in seconds (default 30).
    records_key:
        JSON key that contains the result list (e.g. ``"data"``).
        When *None* the root response is expected to be a list.
    """

    def __init__(
        self,
        name: str,
        base_url: str,
        endpoint: str = "/",
        headers: dict[str, str] | None = None,
        timeout: float = 30.0,
        records_key: str | None = "data",
    ) -> None:
        super().__init__(name=name, source_type="rest_api")
        # Trailing slash stripped so base_url + endpoint join cleanly.
        self.base_url = base_url.rstrip("/")
        self.endpoint = endpoint
        self.headers = headers or {}
        self.timeout = timeout
        self.records_key = records_key
        # Created in connect(); None means not connected.
        self._client: httpx.AsyncClient | None = None

    async def connect(self) -> bool:
        """Open an HTTP client session to the configured base URL.

        No request is sent here — only the client is constructed, so an
        unreachable server is not detected until the first fetch.
        """
        try:
            self._client = httpx.AsyncClient(
                base_url=self.base_url,
                headers=self.headers,
                timeout=httpx.Timeout(self.timeout),
            )
            self._mark_connected()
            return True
        except Exception as exc:
            self._mark_error(str(exc))
            return False

    async def fetch(
        self,
        *,
        limit: int | None = None,
        offset: int = 0,
        filters: dict[str, Any] | None = None,
    ) -> FetchResult:
        """Fetch records from the REST endpoint with optional pagination.

        Filters, limit, and offset are forwarded as query parameters — the
        server is expected to apply them. HTTP and parse errors are captured
        in the returned FetchResult rather than raised.
        """
        if self._client is None:
            return FetchResult(
                records=[],
                record_count=0,
                source=self.name,
                errors=["Connector not connected"],
            )

        self.status = ConnectorStatus.FETCHING
        params: dict[str, Any] = {**(filters or {})}
        if limit is not None:
            params["limit"] = limit
        if offset:
            params["offset"] = offset

        start = time.perf_counter()
        try:
            resp = await self._client.get(self.endpoint, params=params)
            resp.raise_for_status()
            body = resp.json()

            # Unwrap {records_key: [...]} envelopes; accept bare lists;
            # wrap any other JSON value as a single record.
            if self.records_key and isinstance(body, dict):
                records = body.get(self.records_key, [])
            elif isinstance(body, list):
                records = body
            else:
                records = [body]

            duration = (time.perf_counter() - start) * 1000
            self.status = ConnectorStatus.CONNECTED
            return FetchResult(
                records=records,
                record_count=len(records),
                source=self.name,
                duration_ms=round(duration, 2),
            )
        except Exception as exc:
            duration = (time.perf_counter() - start) * 1000
            self._mark_error(str(exc))
            return FetchResult(
                records=[],
                record_count=0,
                source=self.name,
                duration_ms=round(duration, 2),
                errors=[str(exc)],
            )

    async def close(self) -> None:
        """Close the HTTP client session and release resources."""
        if self._client:
            await self._client.aclose()
            self._client = None
        self.status = ConnectorStatus.CLOSED
        logger.info("connector closed", name=self.name)

connect() async

Open an HTTP client session to the configured base URL.

Source code in src/dataenginex/data/connectors/legacy.py
178
179
180
181
182
183
184
185
186
187
188
189
190
async def connect(self) -> bool:
    """Open an HTTP client session to the configured base URL.

    Returns:
        True when the client was created; False (with ERROR status) if
        client construction failed. No request is sent at this point.
    """
    try:
        self._client = httpx.AsyncClient(
            base_url=self.base_url,
            headers=self.headers,
            timeout=httpx.Timeout(self.timeout),
        )
        self._mark_connected()
        return True
    except Exception as exc:
        self._mark_error(str(exc))
        return False

fetch(*, limit=None, offset=0, filters=None) async

Fetch records from the REST endpoint with optional pagination.

Source code in src/dataenginex/data/connectors/legacy.py
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
async def fetch(
    self,
    *,
    limit: int | None = None,
    offset: int = 0,
    filters: dict[str, Any] | None = None,
) -> FetchResult:
    """Fetch records from the REST endpoint with optional pagination.

    Errors (HTTP, network, JSON) are captured in the returned FetchResult
    rather than raised.
    """
    if self._client is None:
        return FetchResult(
            records=[],
            record_count=0,
            source=self.name,
            errors=["Connector not connected"],
        )

    self.status = ConnectorStatus.FETCHING
    # Filters and pagination are forwarded as query parameters; the server
    # is expected to apply them.
    params: dict[str, Any] = {**(filters or {})}
    if limit is not None:
        params["limit"] = limit
    if offset:
        params["offset"] = offset

    start = time.perf_counter()
    try:
        resp = await self._client.get(self.endpoint, params=params)
        resp.raise_for_status()
        body = resp.json()

        # Unwrap {records_key: [...]} envelopes; accept bare lists; wrap
        # any other JSON value as a single record.
        if self.records_key and isinstance(body, dict):
            records = body.get(self.records_key, [])
        elif isinstance(body, list):
            records = body
        else:
            records = [body]

        duration = (time.perf_counter() - start) * 1000
        self.status = ConnectorStatus.CONNECTED
        return FetchResult(
            records=records,
            record_count=len(records),
            source=self.name,
            duration_ms=round(duration, 2),
        )
    except Exception as exc:
        duration = (time.perf_counter() - start) * 1000
        self._mark_error(str(exc))
        return FetchResult(
            records=[],
            record_count=0,
            source=self.name,
            duration_ms=round(duration, 2),
            errors=[str(exc)],
        )

close() async

Close the HTTP client session and release resources.

Source code in src/dataenginex/data/connectors/legacy.py
247
248
249
250
251
252
253
async def close(self) -> None:
    """Close the HTTP client session and release resources."""
    # None-ing the client makes a post-close fetch return a clean error
    # instead of using a closed session.
    if self._client:
        await self._client.aclose()
        self._client = None
    self.status = ConnectorStatus.CLOSED
    logger.info("connector closed", name=self.name)

PipelineResult dataclass

Result of a single pipeline execution.

Source code in src/dataenginex/data/pipeline/runner.py
40
41
42
43
44
45
46
47
48
49
50
@dataclass
class PipelineResult:
    """Result of a single pipeline execution."""

    pipeline: str  # pipeline name as given to PipelineRunner.run()
    success: bool  # True when execution (or dry-run validation) succeeded
    rows_input: int = 0  # number of input rows
    rows_output: int = 0  # number of output rows
    steps_completed: int = 0  # how many pipeline steps finished
    dry_run: bool = False  # True when only validation was performed
    error: str | None = None  # failure message; None on success

PipelineRunner

Execute data pipelines defined in dex.yaml.

Parameters:

Name Type Description Default
config DexConfig

Loaded DexConfig.

required
data_dir Path | None

Root directory for lakehouse layer storage.

None
Source code in src/dataenginex/data/pipeline/runner.py
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
class PipelineRunner:
    """Execute data pipelines defined in dex.yaml.

    A run proceeds as: extract (connector -> DuckDB ``bronze`` table),
    transform (registry transforms chained over tables), an optional
    quality gate, then load (final table -> parquet in a lakehouse layer).

    Args:
        config: Loaded DexConfig.
        data_dir: Root directory for lakehouse layer storage.
        project_dir: Project root used to resolve relative source paths.
        lineage: Optional lineage store; when set, extract and load
            steps are recorded against it.
    """

    def __init__(
        self,
        config: DexConfig,
        data_dir: Path | None = None,
        project_dir: Path | None = None,
        lineage: PersistentLineage | None = None,
    ) -> None:
        self._config = config
        self._data_dir = data_dir or Path(".dex/lakehouse")
        self._data_dir.mkdir(parents=True, exist_ok=True)
        self._project_dir = project_dir
        self._lineage = lineage

    def run(self, pipeline_name: str, *, dry_run: bool = False) -> PipelineResult:
        """Run a single pipeline by name.

        Args:
            pipeline_name: Key into ``config.data.pipelines``.
            dry_run: When True, only validate that the pipeline exists.

        Returns:
            PipelineResult describing the outcome.

        Raises:
            KeyError: If the pipeline is not configured.
        """
        pipelines = self._config.data.pipelines
        if pipeline_name not in pipelines:
            available = list(pipelines.keys())
            msg = f"Pipeline '{pipeline_name}' not found. Available: {available}"
            raise KeyError(msg)

        pipeline_config = pipelines[pipeline_name]
        log = logger.bind(pipeline=pipeline_name)
        log.info("pipeline starting", dry_run=dry_run)

        if dry_run:
            log.info("pipeline dry run — validating only")
            return PipelineResult(pipeline=pipeline_name, success=True, dry_run=True)

        # One DuckDB database file per pipeline keeps runs isolated.
        db_path = self._data_dir / f"{pipeline_name}.duckdb"
        conn = duckdb.connect(str(db_path))

        try:
            return self._execute(conn, pipeline_name, pipeline_config, log)
        except (PipelineError, PipelineStepError, KeyError):
            # Domain errors already carry context — let callers see them.
            raise
        except Exception as e:
            # Unexpected failures are reported as a failed result, not raised.
            log.error("pipeline failed", error=str(e))
            return PipelineResult(pipeline=pipeline_name, success=False, error=str(e))
        finally:
            conn.close()

    def _execute(
        self,
        conn: duckdb.DuckDBPyConnection,
        name: str,
        cfg: PipelineConfig,
        log: Any,
    ) -> PipelineResult:
        """Core pipeline execution: extract -> transform -> quality -> load."""
        rows_input = self._extract(conn, name, cfg, log)
        current_table, steps = self._transform(conn, name, cfg, log)
        self._check_quality(conn, name, cfg, current_table, log)
        rows_output = self._load(conn, name, cfg, current_table, log)

        return PipelineResult(
            pipeline=name,
            success=True,
            rows_input=rows_input,
            rows_output=rows_output,
            steps_completed=steps,
        )

    def _extract(
        self,
        conn: duckdb.DuckDBPyConnection,
        name: str,
        cfg: PipelineConfig,
        log: Any,
    ) -> int:
        """Extract source data into DuckDB bronze table. Returns row count.

        Raises:
            PipelineStepError: If the configured source does not exist.
        """
        sources = self._config.data.sources
        if cfg.source not in sources:
            msg = f"Source '{cfg.source}' not found"
            raise PipelineStepError(step="extract", cause=msg, pipeline=name)

        source_config = sources[cfg.source]
        connector_cls = connector_registry.get(source_config.type)

        # Explicit connection kwargs win over config-level path/url shortcuts.
        connector_kwargs: dict[str, Any] = dict(source_config.connection)
        if source_config.path and "path" not in connector_kwargs:
            src_path = source_config.path
            # Resolve relative source paths against the project root.
            if self._project_dir and not Path(src_path).is_absolute():
                src_path = str(self._project_dir / src_path)
            connector_kwargs["path"] = src_path
        if source_config.url and "url" not in connector_kwargs:
            connector_kwargs["url"] = source_config.url

        connector = connector_cls(**connector_kwargs)
        connector.connect()
        try:
            read_table = connector_kwargs.get("default_file", cfg.source)
            raw_data = connector.read(table=read_table)
        finally:
            # Bug fix: always release the connector, even when read() raises.
            connector.disconnect()

        bronze_arrow = pa.Table.from_pylist(raw_data)  # noqa: F841 — referenced by DuckDB SQL
        # DuckDB's replacement scan resolves `bronze_arrow` from the local frame.
        conn.execute("CREATE OR REPLACE TABLE bronze AS SELECT * FROM bronze_arrow")
        log.info("extract complete", source=cfg.source, rows=len(raw_data))
        if self._lineage is not None:
            self._lineage.record(
                operation="ingest",
                layer="bronze",
                source=cfg.source,
                destination=f"bronze/{name}",
                input_count=len(raw_data),
                output_count=len(raw_data),
                pipeline_name=name,
                step_name="extract",
            )
        return len(raw_data)

    def _transform(
        self,
        conn: duckdb.DuckDBPyConnection,
        name: str,
        cfg: PipelineConfig,
        log: Any,
    ) -> tuple[str, int]:
        """Run transform chain. Returns (final_table, steps_completed).

        Each transform receives the previous step's output table and
        returns the name of the table it produced.

        Raises:
            PipelineStepError: If a transform fails validation.
        """
        current_table = "bronze"
        steps_completed = 0

        for i, step_config in enumerate(cfg.transforms):
            kwargs = _build_transform_kwargs(step_config)
            transform_cls = transform_registry.get(step_config.type)
            transform = transform_cls(**kwargs)

            errors = transform.validate()
            if errors:
                msg = f"Transform validation failed: {errors}"
                raise PipelineStepError(step=f"transform-{i}", cause=msg, pipeline=name)

            current_table = transform.apply(conn, current_table)
            steps_completed += 1
            log.info("transform complete", step=i, type=step_config.type)

        return current_table, steps_completed

    def _check_quality(
        self,
        conn: duckdb.DuckDBPyConnection,
        name: str,
        cfg: PipelineConfig,
        table: str,
        log: Any,
    ) -> None:
        """Run quality gate if configured.

        Raises:
            PipelineStepError: If the quality gate fails.
        """
        if not cfg.quality:
            return
        q = cfg.quality
        result = check_quality(
            conn,
            table,
            completeness=q.completeness,
            uniqueness=q.uniqueness,
            custom_sql=q.custom_sql,
        )
        if not result.passed:
            msg = (
                f"Quality gate failed: completeness={result.completeness_score:.2f}, "
                f"uniqueness={result.uniqueness_score:.2f}"
            )
            raise PipelineStepError(step="quality", cause=msg, pipeline=name)
        log.info("quality gate passed")

    def _load(
        self,
        conn: duckdb.DuckDBPyConnection,
        name: str,
        cfg: PipelineConfig,
        table: str,
        log: Any,
    ) -> int:
        """Write final table to lakehouse layer as parquet. Returns row count."""
        count_row = conn.execute(f"SELECT count(*) FROM {table}").fetchone()
        rows = int(count_row[0]) if count_row else 0
        target_layer = "silver"
        if cfg.target:
            target_layer = cfg.target.get("layer", "silver")

        layer_dir = self._data_dir / target_layer
        layer_dir.mkdir(parents=True, exist_ok=True)
        output_path = layer_dir / f"{name}.parquet"
        # NOTE(review): SQL is built by f-string; table/path come from config
        # and the transform registry, not end users — confirm before exposing.
        conn.execute(f"COPY {table} TO '{output_path}' (FORMAT PARQUET)")
        log.info("load complete", layer=target_layer, path=str(output_path), rows=rows)
        if self._lineage is not None:
            self._lineage.record(
                operation="load",
                layer=target_layer,
                source=f"bronze/{name}",
                destination=str(output_path),
                input_count=rows,
                output_count=rows,
                pipeline_name=name,
                step_name="load",
            )
        return rows

    def run_all(self) -> dict[str, PipelineResult]:
        """Run all pipelines in dependency order.

        Stops at the first pipeline that returns a failed result; the
        returned dict contains results for everything that ran.
        """
        if not self._config.data.pipelines:
            return {}

        dep_graph: dict[str, list[str]] = {
            name: list(p.depends_on) for name, p in self._config.data.pipelines.items()
        }
        order = resolve_execution_order(dep_graph)
        results: dict[str, PipelineResult] = {}

        for name in order:
            result = self.run(name)
            results[name] = result
            if not result.success:
                logger.error("pipeline failed — stopping", pipeline=name)
                break

        return results

run(pipeline_name, *, dry_run=False)

Run a single pipeline by name.

Source code in src/dataenginex/data/pipeline/runner.py
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
def run(self, pipeline_name: str, *, dry_run: bool = False) -> PipelineResult:
    """Run a single pipeline by name."""
    pipelines = self._config.data.pipelines
    if pipeline_name not in pipelines:
        msg = (
            f"Pipeline '{pipeline_name}' not found. "
            f"Available: {list(pipelines.keys())}"
        )
        raise KeyError(msg)

    pipeline_config = pipelines[pipeline_name]
    log = logger.bind(pipeline=pipeline_name)
    log.info("pipeline starting", dry_run=dry_run)

    if dry_run:
        log.info("pipeline dry run — validating only")
        return PipelineResult(pipeline=pipeline_name, success=True, dry_run=True)

    # One DuckDB database file per pipeline.
    conn = duckdb.connect(str(self._data_dir / f"{pipeline_name}.duckdb"))
    try:
        return self._execute(conn, pipeline_name, pipeline_config, log)
    except (PipelineError, PipelineStepError, KeyError):
        raise
    except Exception as exc:
        log.error("pipeline failed", error=str(exc))
        return PipelineResult(pipeline=pipeline_name, success=False, error=str(exc))
    finally:
        conn.close()

run_all()

Run all pipelines in dependency order.

Source code in src/dataenginex/data/pipeline/runner.py
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
def run_all(self) -> dict[str, PipelineResult]:
    """Run all pipelines in dependency order."""
    pipelines = self._config.data.pipelines
    if not pipelines:
        return {}

    # Topologically sort pipelines by their declared dependencies.
    graph: dict[str, list[str]] = {
        name: list(cfg.depends_on) for name, cfg in pipelines.items()
    }
    results: dict[str, PipelineResult] = {}

    for name in resolve_execution_order(graph):
        outcome = self.run(name)
        results[name] = outcome
        if not outcome.success:
            logger.error("pipeline failed — stopping", pipeline=name)
            break

    return results

ColumnProfile dataclass

Statistics for a single column / field.

Source code in src/dataenginex/data/profiler.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
@dataclass
class ColumnProfile:
    """Statistics for a single column / field."""

    name: str
    dtype: str  # "string", "numeric", "boolean", "null", "mixed"
    total_count: int = 0
    null_count: int = 0
    unique_count: int = 0

    # numeric stats (populated when dtype == "numeric")
    min_value: float | None = None
    max_value: float | None = None
    mean_value: float | None = None
    median_value: float | None = None
    stddev_value: float | None = None

    # string stats (populated when dtype == "string")
    min_length: int | None = None
    max_length: int | None = None
    avg_length: float | None = None

    # top values
    top_values: list[tuple[Any, int]] = field(default_factory=list)

    @property
    def null_rate(self) -> float:
        """Fraction of values that are ``None`` (0.0–1.0)."""
        return self.null_count / self.total_count if self.total_count else 0.0

    @property
    def uniqueness(self) -> float:
        """Ratio of unique non-null values to total non-null values."""
        non_null = self.total_count - self.null_count
        return self.unique_count / non_null if non_null else 0.0

null_rate property

Fraction of values that are None (0.0–1.0).

uniqueness property

Ratio of unique non-null values to total non-null values.

DataProfiler

Profile a list of dict records in pure Python (no pandas needed).

Source code in src/dataenginex/data/profiler.py
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
class DataProfiler:
    """Profile a list of dict records in pure Python (no pandas needed)."""

    def profile(
        self,
        records: list[dict[str, Any]],
        dataset_name: str = "unnamed",
    ) -> ProfileReport:
        """Profile a list of dict records and return column-level statistics."""
        import time

        started = time.perf_counter()

        if not records:
            # Nothing to profile — return an empty report.
            return ProfileReport(
                dataset_name=dataset_name,
                record_count=0,
                column_count=0,
                columns=[],
            )

        # De-duplicate keys while keeping first-seen order across records.
        discovered: dict[str, None] = {}
        for record in records:
            discovered.update(dict.fromkeys(record))

        columns = [self._profile_column(key, records) for key in discovered]
        elapsed_ms = (time.perf_counter() - started) * 1000

        return ProfileReport(
            dataset_name=dataset_name,
            record_count=len(records),
            column_count=len(columns),
            columns=columns,
            duration_ms=round(elapsed_ms, 2),
        )

    # ------------------------------------------------------------------

    @staticmethod
    def _profile_column(key: str, records: list[dict[str, Any]]) -> ColumnProfile:
        """Build a ColumnProfile for one key across all records."""
        values = [record.get(key) for record in records]
        present = [v for v in values if v is not None]

        # Top values / uniqueness are computed over string representations.
        tally: Counter[Any] = Counter(str(v) for v in present)
        profile = ColumnProfile(
            name=key,
            dtype=DataProfiler._infer_dtype(present),
            total_count=len(values),
            null_count=len(values) - len(present),
            unique_count=len(tally),
            top_values=tally.most_common(5),
        )

        if profile.dtype == "numeric":
            nums = [float(v) for v in present if DataProfiler._is_numeric(v)]
            if nums:
                profile.min_value = min(nums)
                profile.max_value = max(nums)
                profile.mean_value = statistics.mean(nums)
                profile.median_value = statistics.median(nums)
                profile.stddev_value = statistics.stdev(nums) if len(nums) > 1 else 0.0

        elif profile.dtype == "string":
            lengths = [len(str(v)) for v in present]
            if lengths:
                profile.min_length = min(lengths)
                profile.max_length = max(lengths)
                profile.avg_length = statistics.mean(lengths)

        return profile

    @staticmethod
    def _infer_dtype(values: list[Any]) -> str:
        """Classify non-null values; "mixed" when types disagree."""
        if not values:
            return "null"

        def classify(v: Any) -> str:
            # bool first: bool is a subclass of int in Python.
            if isinstance(v, bool):
                return "boolean"
            if isinstance(v, (int, float)):
                return "numeric"
            if isinstance(v, str):
                return "string"
            return "mixed"

        kinds = {classify(v) for v in values}
        return kinds.pop() if len(kinds) == 1 else "mixed"

    @staticmethod
    def _is_numeric(value: Any) -> bool:
        """True when value converts cleanly to float (bools excluded)."""
        if isinstance(value, bool):
            return False
        if isinstance(value, (int, float)):
            return True
        try:
            float(value)
        except (ValueError, TypeError):
            return False
        return True

profile(records, dataset_name='unnamed')

Profile a list of dict records and return column-level statistics.

Source code in src/dataenginex/data/profiler.py
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
def profile(
    self,
    records: list[dict[str, Any]],
    dataset_name: str = "unnamed",
) -> ProfileReport:
    """Profile a list of dict records and return column-level statistics."""
    import time

    started = time.perf_counter()

    if not records:
        # Nothing to profile — return an empty report.
        return ProfileReport(
            dataset_name=dataset_name,
            record_count=0,
            column_count=0,
            columns=[],
        )

    # De-duplicate keys while keeping first-seen order across records.
    discovered: dict[str, None] = {}
    for record in records:
        discovered.update(dict.fromkeys(record))

    columns = [self._profile_column(key, records) for key in discovered]
    elapsed_ms = (time.perf_counter() - started) * 1000

    return ProfileReport(
        dataset_name=dataset_name,
        record_count=len(records),
        column_count=len(columns),
        columns=columns,
        duration_ms=round(elapsed_ms, 2),
    )

ProfileReport dataclass

Aggregated profiling report for a dataset.

Attributes:

Name Type Description
dataset_name str

Name of the profiled dataset.

record_count int

Total number of records profiled.

column_count int

Number of columns/fields discovered.

columns list[ColumnProfile]

Per-column profile statistics.

profiled_at datetime

Timestamp when profiling was performed.

duration_ms float

Time taken for profiling in milliseconds.

Source code in src/dataenginex/data/profiler.py
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
@dataclass
class ProfileReport:
    """Aggregated profiling report for a dataset.

    Attributes:
        dataset_name: Name of the profiled dataset.
        record_count: Total number of records profiled.
        column_count: Number of columns/fields discovered.
        columns: Per-column profile statistics.
        profiled_at: Timestamp when profiling was performed.
        duration_ms: Time taken for profiling in milliseconds.
    """

    dataset_name: str
    record_count: int
    column_count: int
    columns: list[ColumnProfile]
    profiled_at: datetime = field(default_factory=lambda: datetime.now(tz=UTC))
    duration_ms: float = 0.0

    def to_dict(self) -> dict[str, Any]:
        """Serialize the profile report to a plain dictionary."""
        return {
            "dataset_name": self.dataset_name,
            "record_count": self.record_count,
            "column_count": self.column_count,
            "profiled_at": self.profiled_at.isoformat(),
            "duration_ms": self.duration_ms,
            "columns": [
                {
                    "name": c.name,
                    "dtype": c.dtype,
                    "total_count": c.total_count,
                    "null_count": c.null_count,
                    "null_rate": round(c.null_rate, 4),
                    "unique_count": c.unique_count,
                    "uniqueness": round(c.uniqueness, 4),
                    "min_value": c.min_value,
                    "max_value": c.max_value,
                    "mean_value": round(c.mean_value, 4) if c.mean_value is not None else None,
                    "median_value": c.median_value,
                    "stddev_value": (
                        round(c.stddev_value, 4) if c.stddev_value is not None else None
                    ),
                    "min_length": c.min_length,
                    "max_length": c.max_length,
                    "avg_length": round(c.avg_length, 2) if c.avg_length is not None else None,
                    "top_values": c.top_values[:5],
                }
                for c in self.columns
            ],
        }

    @property
    def completeness(self) -> float:
        """Overall dataset completeness (1 − avg null rate)."""
        if not self.columns:
            return 1.0
        avg_null = statistics.mean(c.null_rate for c in self.columns)
        return round(1.0 - avg_null, 4)

completeness property

Overall dataset completeness (1 − avg null rate).

to_dict()

Serialize the profile report to a plain dictionary.

Source code in src/dataenginex/data/profiler.py
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
def to_dict(self) -> dict[str, Any]:
    """Serialize the profile report to a plain dictionary."""
    return {
        "dataset_name": self.dataset_name,
        "record_count": self.record_count,
        "column_count": self.column_count,
        "profiled_at": self.profiled_at.isoformat(),
        "duration_ms": self.duration_ms,
        "columns": [
            {
                "name": c.name,
                "dtype": c.dtype,
                "total_count": c.total_count,
                "null_count": c.null_count,
                "null_rate": round(c.null_rate, 4),
                "unique_count": c.unique_count,
                "uniqueness": round(c.uniqueness, 4),
                "min_value": c.min_value,
                "max_value": c.max_value,
                "mean_value": round(c.mean_value, 4) if c.mean_value is not None else None,
                "median_value": c.median_value,
                "stddev_value": (
                    round(c.stddev_value, 4) if c.stddev_value is not None else None
                ),
                "min_length": c.min_length,
                "max_length": c.max_length,
                "avg_length": round(c.avg_length, 2) if c.avg_length is not None else None,
                "top_values": c.top_values[:5],
            }
            for c in self.columns
        ],
    }

QualityResult dataclass

Result of a quality gate check.

Source code in src/dataenginex/data/quality/gates.py
36
37
38
39
40
41
42
43
44
45
@dataclass
class QualityResult:
    """Result of a quality gate check."""

    passed: bool  # overall verdict across all configured checks
    completeness_score: float = 1.0  # observed non-null fraction (1.0 = fully complete)
    uniqueness_score: float = 1.0  # observed uniqueness for the checked columns
    custom_passed: bool = True  # outcome of the optional custom SQL check
    schema_violations: list[str] = field(default_factory=list)  # schema-check messages, if any
    details: dict[str, Any] = field(default_factory=dict)  # per-check diagnostic data

SchemaRegistry

In-process schema registry backed by an optional JSON file.

Parameters

persist_path: If given, schemas are saved/loaded from this JSON file so they survive across process restarts.

Source code in src/dataenginex/data/registry.py
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
class SchemaRegistry:
    """In-process schema registry backed by an optional JSON file.

    Parameters
    ----------
    persist_path:
        If given, schemas are saved/loaded from this JSON file so they
        survive across process restarts.
    """

    def __init__(self, persist_path: str | Path | None = None) -> None:
        # schema_name → [SchemaVersion …] (ordered oldest → newest)
        self._schemas: dict[str, list[SchemaVersion]] = {}
        self._persist_path = Path(persist_path) if persist_path else None
        if self._persist_path is not None and self._persist_path.exists():
            self._load()

    # -- public API ----------------------------------------------------------

    def register(self, schema: SchemaVersion) -> SchemaVersion:
        """Register a new schema version.  Duplicate versions are rejected."""
        versions = self._schemas.setdefault(schema.name, [])
        if any(v.version == schema.version for v in versions):
            raise ValueError(f"Schema {schema.name!r} version {schema.version} already registered")
        versions.append(schema)
        logger.info("schema registered", name=schema.name, version=schema.version)
        self._save()
        return schema

    def get_latest(self, name: str) -> SchemaVersion | None:
        """Return the most recently registered version for *name*."""
        versions = self._schemas.get(name)
        return versions[-1] if versions else None

    def get_version(self, name: str, version: str) -> SchemaVersion | None:
        """Return a specific version, or *None* if not found."""
        matches = (v for v in self._schemas.get(name, []) if v.version == version)
        return next(matches, None)

    def list_schemas(self) -> list[str]:
        """Return all registered schema names."""
        return [*self._schemas]

    def list_versions(self, name: str) -> list[str]:
        """Return all registered versions for *name* (oldest first)."""
        return [entry.version for entry in self._schemas.get(name, [])]

    def validate(
        self, name: str, record: dict[str, Any], version: str | None = None
    ) -> tuple[bool, list[str]]:
        """Validate *record* against a schema.

        If *version* is ``None`` the latest version is used.
        """
        if version:
            schema = self.get_version(name, version)
        else:
            schema = self.get_latest(name)
        if schema is None:
            return False, [f"Schema {name!r} (version={version}) not found"]
        return schema.validate_record(record)

    # -- persistence ---------------------------------------------------------

    def _save(self) -> None:
        if not self._persist_path:
            return
        payload: dict[str, list[dict[str, Any]]] = {
            name: [version.to_dict() for version in versions]
            for name, versions in self._schemas.items()
        }
        self._persist_path.parent.mkdir(parents=True, exist_ok=True)
        self._persist_path.write_text(json.dumps(payload, indent=2, default=str))

    def _load(self) -> None:
        if not self._persist_path or not self._persist_path.exists():
            return
        raw = json.loads(self._persist_path.read_text())
        for name, entries in raw.items():
            loaded = []
            for entry in entries:
                loaded.append(
                    SchemaVersion(
                        name=entry["name"],
                        version=entry["version"],
                        fields=entry["fields"],
                        required_fields=entry.get("required_fields", []),
                        description=entry.get("description", ""),
                        metadata=entry.get("metadata", {}),
                    )
                )
            self._schemas[name] = loaded
        logger.info("schemas loaded", count=len(self._schemas), path=str(self._persist_path))

register(schema)

Register a new schema version. Duplicate versions are rejected.

Source code in src/dataenginex/data/registry.py
89
90
91
92
93
94
95
96
97
98
def register(self, schema: SchemaVersion) -> SchemaVersion:
    """Register a new schema version.  Duplicate versions are rejected."""
    versions = self._schemas.setdefault(schema.name, [])
    if any(existing.version == schema.version for existing in versions):
        raise ValueError(f"Schema {schema.name!r} version {schema.version} already registered")
    versions.append(schema)
    logger.info("schema registered", name=schema.name, version=schema.version)
    self._save()
    return schema

get_latest(name)

Return the most recently registered version for name.

Source code in src/dataenginex/data/registry.py
100
101
102
103
104
105
def get_latest(self, name: str) -> SchemaVersion | None:
    """Return the most recently registered version for *name*."""
    versions = self._schemas.get(name)
    return versions[-1] if versions else None

get_version(name, version)

Return a specific version, or None if not found.

Source code in src/dataenginex/data/registry.py
107
108
109
110
111
112
def get_version(self, name: str, version: str) -> SchemaVersion | None:
    """Return a specific version, or *None* if not found."""
    matches = (v for v in self._schemas.get(name, []) if v.version == version)
    return next(matches, None)

list_schemas()

Return all registered schema names.

Source code in src/dataenginex/data/registry.py
114
115
116
def list_schemas(self) -> list[str]:
    """Return all registered schema names."""
    return [*self._schemas]

list_versions(name)

Return all registered versions for name (oldest first).

Source code in src/dataenginex/data/registry.py
118
119
120
def list_versions(self, name: str) -> list[str]:
    """Return all registered versions for *name* (oldest first)."""
    return [entry.version for entry in self._schemas.get(name, [])]

validate(name, record, version=None)

Validate record against a schema.

If version is None the latest version is used.

Source code in src/dataenginex/data/registry.py
122
123
124
125
126
127
128
129
130
131
132
def validate(
    self, name: str, record: dict[str, Any], version: str | None = None
) -> tuple[bool, list[str]]:
    """Validate *record* against a schema.

    If *version* is ``None`` the latest version is used.
    """
    if version:
        schema = self.get_version(name, version)
    else:
        schema = self.get_latest(name)
    if schema is None:
        return False, [f"Schema {name!r} (version={version}) not found"]
    return schema.validate_record(record)

SchemaVersion dataclass

An immutable snapshot of a schema at a particular version.

Attributes:

Name Type Description
name str

Schema identifier (e.g. "user_event").

version str

Semver string (e.g. "1.2.0").

fields dict[str, str]

Mapping of field names to type descriptions.

required_fields list[str]

Fields that must be present in every record.

description str

Human-readable summary of the schema.

created_at datetime

When this version was registered.

metadata dict[str, Any]

Extra context dict.

Source code in src/dataenginex/data/registry.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
@dataclass
class SchemaVersion:
    """An immutable snapshot of a schema at a particular version.

    Attributes:
        name: Schema identifier (e.g. ``"user_event"``).
        version: Semver string (e.g. ``"1.2.0"``).
        fields: Mapping of field names to type descriptions.
        required_fields: Fields that must be present in every record.
        description: Human-readable summary of the schema.
        created_at: When this version was registered.
        metadata: Extra context dict.
    """

    name: str
    version: str  # semver string, e.g. "1.2.0"
    fields: dict[str, str]  # field_name → type_description
    required_fields: list[str] = field(default_factory=list)
    description: str = ""
    created_at: datetime = field(default_factory=lambda: datetime.now(tz=UTC))
    metadata: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        """Serialize the schema version to a plain dictionary."""
        return {
            "name": self.name,
            "version": self.version,
            "fields": self.fields,
            "required_fields": self.required_fields,
            "description": self.description,
            "created_at": self.created_at.isoformat(),
            "metadata": self.metadata,
        }

    def validate_record(self, record: dict[str, Any]) -> tuple[bool, list[str]]:
        """Check that *record* has all required fields.

        Returns ``(is_valid, errors)`` where *errors* lists the missing
        required fields.
        """
        missing = [f for f in self.required_fields if f not in record]
        return len(missing) == 0, [f"Missing required field: {f}" for f in missing]

to_dict()

Serialize the schema version to a plain dictionary.

Source code in src/dataenginex/data/registry.py
48
49
50
51
52
53
54
55
56
57
58
def to_dict(self) -> dict[str, Any]:
    """Serialize the schema version to a plain dictionary."""
    payload: dict[str, Any] = {}
    # Plain attributes copy straight through; created_at is rendered
    # as an ISO-8601 string so the dict is JSON-friendly.
    for attr in ("name", "version", "fields", "required_fields", "description"):
        payload[attr] = getattr(self, attr)
    payload["created_at"] = self.created_at.isoformat()
    payload["metadata"] = self.metadata
    return payload

validate_record(record)

Check that record has all required fields.

Returns (is_valid, errors) where errors lists the missing required fields.

Source code in src/dataenginex/data/registry.py
60
61
62
63
64
65
66
67
def validate_record(self, record: dict[str, Any]) -> tuple[bool, list[str]]:
    """Check that *record* has all required fields.

    Returns ``(is_valid, errors)`` where *errors* lists the missing
    required fields.
    """
    errors: list[str] = []
    for required in self.required_fields:
        if required not in record:
            errors.append(f"Missing required field: {required}")
    return not errors, errors

check_quality(conn, table, *, completeness=None, uniqueness=None, schema=None, custom_sql=None)

Run quality checks against a DuckDB table.

Parameters:

Name Type Description Default
conn DuckDBPyConnection

Active DuckDB connection.

required
table str

Table name to check.

required
completeness float | None

Minimum fraction of non-null values (0.0-1.0).

None
uniqueness list[str] | None

Columns that must be unique (no duplicates).

None
schema list[ColumnSpec] | None

Expected column specs (existence, type, nullability). Checked even on empty tables.

None
custom_sql str | None

SQL that must return count > 0 to pass.

None

Returns:

Type Description
QualityResult

QualityResult with pass/fail, scores, and any schema violations.

Source code in src/dataenginex/data/quality/gates.py
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
def check_quality(
    conn: duckdb.DuckDBPyConnection,
    table: str,
    *,
    completeness: float | None = None,
    uniqueness: list[str] | None = None,
    schema: list[ColumnSpec] | None = None,
    custom_sql: str | None = None,
) -> QualityResult:
    """Run quality checks against a DuckDB table.

    Args:
        conn: Active DuckDB connection.
        table: Table name to check.
            NOTE(review): interpolated directly into SQL without quoting —
            callers must not pass untrusted values here.
        completeness: Minimum fraction of non-null values (0.0-1.0).
        uniqueness: Columns that must be unique (no duplicates).
        schema: Expected column specs (existence, type, nullability).
               Checked even on empty tables.
        custom_sql: SQL that must return count > 0 to pass.

    Returns:
        QualityResult with pass/fail, scores, and any schema violations.
    """
    result = QualityResult(passed=True)

    # Schema checks run before the empty-table short-circuit — column
    # existence and type are independent of row count.
    if schema is not None:
        _check_schema(conn, table, schema, result)

    count_row = conn.execute(f"SELECT count(*) FROM {table}").fetchone()
    total_rows: int = int(count_row[0]) if count_row else 0

    if total_rows == 0:
        # Fix: only claim a vacuous pass when the schema checks above did
        # not already fail the result — the old unconditional "passing
        # vacuously" log contradicted a failed QualityResult.
        if result.passed:
            logger.info("quality check: empty table — passing vacuously", table=table)
        else:
            logger.warning(
                "quality check failed: schema violations on empty table", table=table
            )
        return result

    if completeness is not None:
        _check_completeness(conn, table, completeness, total_rows, result)

    if uniqueness is not None:
        _check_uniqueness(conn, table, uniqueness, total_rows, result)

    if custom_sql is not None:
        # Custom SQL passes iff its first column of the first row is > 0;
        # an empty result set counts as a failure.
        custom_row = conn.execute(custom_sql).fetchone()
        custom_result = int(custom_row[0]) if custom_row else 0
        result.custom_passed = custom_result > 0
        if not result.custom_passed:
            result.passed = False
            logger.warning("quality check failed: custom SQL", table=table)

    if result.passed:
        logger.info("quality check passed", table=table)

    return result