dataenginex.lakehouse

Storage backends, data catalog, and partitioning.

Public API:

from dataenginex.lakehouse import (
    # Storage backends
    StorageBackend,
    ParquetStorage, JsonStorage, S3Storage, GCSStorage,
    get_storage,
    # Catalog
    CatalogEntry, DataCatalog,
    # Partitioning
    PartitionStrategy, DatePartitioner, HashPartitioner,
)

StorageBackend

Bases: ABC

Abstract storage backend interface.

All lakehouse storage implementations must subclass this and provide concrete write, read, delete, list_objects, and exists methods. The interface accepts a StorageFormat hint so backends can choose serialisation.

Built-in implementations
  • LocalParquetStorage — local Parquet files (this module)
  • BigQueryStorage — Google BigQuery tables (this module)
  • JsonStorage — JSON files (dataenginex.lakehouse.storage)
  • ParquetStorage — pyarrow-backed Parquet (dataenginex.lakehouse.storage)
  • S3Storage — AWS S3 object storage (dataenginex.lakehouse.storage)
  • GCSStorage — Google Cloud Storage (dataenginex.lakehouse.storage)
Source code in src/dataenginex/core/medallion_architecture.py
class StorageBackend(ABC):
    """Abstract storage backend interface.

    All lakehouse storage implementations must subclass this and provide
    concrete ``write``, ``read``, ``delete``, ``list_objects``, and
    ``exists`` methods.  The interface accepts a ``StorageFormat`` hint
    so backends can choose serialisation.

    Built-in implementations:
        - ``LocalParquetStorage`` — local Parquet files (this module)
        - ``BigQueryStorage`` — Google BigQuery tables (this module)
        - ``JsonStorage`` — JSON files (``dataenginex.lakehouse.storage``)
        - ``ParquetStorage`` — pyarrow-backed Parquet (``dataenginex.lakehouse.storage``)
        - ``S3Storage`` — AWS S3 object storage (``dataenginex.lakehouse.storage``)
        - ``GCSStorage`` — Google Cloud Storage (``dataenginex.lakehouse.storage``)
    """

    @abstractmethod
    def write(self, data: Any, path: str, format: StorageFormat) -> bool:
        """Write *data* to *path* in the given format.

        Returns ``True`` on success, ``False`` on failure.
        """
        ...

    @abstractmethod
    def read(self, path: str, format: StorageFormat) -> Any:
        """Read data from *path*.  Returns ``None`` on failure."""
        ...

    @abstractmethod
    def delete(self, path: str) -> bool:
        """Delete the resource at *path*.  Returns ``True`` on success."""
        ...

    @abstractmethod
    def list_objects(self, prefix: str = "") -> list[str]:
        """List object paths under *prefix*.

        Returns a list of relative paths.  Empty list on failure or when
        no objects match.
        """
        ...

    @abstractmethod
    def exists(self, path: str) -> bool:
        """Return ``True`` if *path* exists in the backend."""
        ...
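A concrete backend needs only these five methods. Below is a minimal in-memory sketch (hypothetical; the `StorageFormat` enum is a stand-in for the real one, and the ABC base is omitted so the snippet is self-contained):

```python
from enum import Enum
from typing import Any


class StorageFormat(Enum):  # stand-in for the real dataenginex enum
    JSON = "json"
    PARQUET = "parquet"


class InMemoryStorage:
    """Dict-backed backend satisfying the StorageBackend interface."""

    def __init__(self) -> None:
        self._objects: dict[str, Any] = {}

    def write(self, data: Any, path: str, format: StorageFormat) -> bool:
        self._objects[path] = data
        return True  # True on success, per the contract

    def read(self, path: str, format: StorageFormat) -> Any:
        return self._objects.get(path)  # None on a missing path, per the contract

    def delete(self, path: str) -> bool:
        return self._objects.pop(path, None) is not None

    def list_objects(self, prefix: str = "") -> list[str]:
        return [p for p in self._objects if p.startswith(prefix)]

    def exists(self, path: str) -> bool:
        return path in self._objects
```

An in-memory backend like this is also handy as a test double wherever a `StorageBackend` is expected.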

write(data, path, format) abstractmethod

Write data to path in the given format.

Returns True on success, False on failure.

Source code in src/dataenginex/core/medallion_architecture.py
@abstractmethod
def write(self, data: Any, path: str, format: StorageFormat) -> bool:
    """Write *data* to *path* in the given format.

    Returns ``True`` on success, ``False`` on failure.
    """
    ...

read(path, format) abstractmethod

Read data from path. Returns None on failure.

Source code in src/dataenginex/core/medallion_architecture.py
@abstractmethod
def read(self, path: str, format: StorageFormat) -> Any:
    """Read data from *path*.  Returns ``None`` on failure."""
    ...

delete(path) abstractmethod

Delete the resource at path. Returns True on success.

Source code in src/dataenginex/core/medallion_architecture.py
@abstractmethod
def delete(self, path: str) -> bool:
    """Delete the resource at *path*.  Returns ``True`` on success."""
    ...

list_objects(prefix='') abstractmethod

List object paths under prefix.

Returns a list of relative paths. Empty list on failure or when no objects match.

Source code in src/dataenginex/core/medallion_architecture.py
@abstractmethod
def list_objects(self, prefix: str = "") -> list[str]:
    """List object paths under *prefix*.

    Returns a list of relative paths.  Empty list on failure or when
    no objects match.
    """
    ...

exists(path) abstractmethod

Return True if path exists in the backend.

Source code in src/dataenginex/core/medallion_architecture.py
@abstractmethod
def exists(self, path: str) -> bool:
    """Return ``True`` if *path* exists in the backend."""
    ...

CatalogEntry dataclass

Metadata about a single dataset in the lakehouse.

Attributes:

  • name (str): Unique dataset name.
  • layer (str): Medallion layer ("bronze", "silver", "gold").
  • format (str): Storage format ("parquet", "json", "delta").
  • location (str): File path or table reference.
  • record_count (int): Approximate number of records.
  • schema_fields (list[str]): List of column/field names.
  • description (str): Human-readable dataset description.
  • owner (str): Team or user responsible for the dataset.
  • tags (list[str]): Arbitrary labels for discovery.
  • created_at (datetime): When the dataset was first registered.
  • updated_at (datetime): When the entry was last updated.
  • metadata (dict[str, Any]): Free-form context dict.
  • version (int): Auto-incremented version counter.

Source code in src/dataenginex/lakehouse/catalog.py
@dataclass
class CatalogEntry:
    """Metadata about a single dataset in the lakehouse.

    Attributes:
        name: Unique dataset name.
        layer: Medallion layer (``"bronze"``, ``"silver"``, ``"gold"``).
        format: Storage format (``"parquet"``, ``"json"``, ``"delta"``).
        location: File path or table reference.
        record_count: Approximate number of records.
        schema_fields: List of column/field names.
        description: Human-readable dataset description.
        owner: Team or user responsible for the dataset.
        tags: Arbitrary labels for discovery.
        created_at: When the dataset was first registered.
        updated_at: When the entry was last updated.
        metadata: Free-form context dict.
        version: Auto-incremented version counter.
    """

    name: str
    layer: str  # "bronze", "silver", "gold"
    format: str  # "parquet", "json", "delta"
    location: str  # file path or table ref
    record_count: int = 0
    schema_fields: list[str] = field(default_factory=list)
    description: str = ""
    owner: str = ""
    tags: list[str] = field(default_factory=list)
    created_at: datetime = field(default_factory=lambda: datetime.now(tz=UTC))
    updated_at: datetime = field(default_factory=lambda: datetime.now(tz=UTC))
    metadata: dict[str, Any] = field(default_factory=dict)
    version: int = 1

    def to_dict(self) -> dict[str, Any]:
        """Serialize the catalog entry to a plain dictionary."""
        d = asdict(self)
        d["created_at"] = self.created_at.isoformat()
        d["updated_at"] = self.updated_at.isoformat()
        return d

to_dict()

Serialize the catalog entry to a plain dictionary.

Source code in src/dataenginex/lakehouse/catalog.py
def to_dict(self) -> dict[str, Any]:
    """Serialize the catalog entry to a plain dictionary."""
    d = asdict(self)
    d["created_at"] = self.created_at.isoformat()
    d["updated_at"] = self.updated_at.isoformat()
    return d

DataCatalog

In-process data catalog backed by an optional JSON file.

Parameters

persist_path: When set, catalog entries are persisted to this JSON file.

Source code in src/dataenginex/lakehouse/catalog.py
class DataCatalog:
    """In-process data catalog backed by an optional JSON file.

    Parameters
    ----------
    persist_path:
        When set, catalog entries are persisted to this JSON file.
    """

    def __init__(self, persist_path: str | Path | None = None) -> None:
        self._entries: dict[str, CatalogEntry] = {}
        self._persist_path = Path(persist_path) if persist_path else None
        if self._persist_path and self._persist_path.exists():
            self._load()

    # -- public API ----------------------------------------------------------

    def register(self, entry: CatalogEntry) -> CatalogEntry:
        """Register or update a dataset entry."""
        existing = self._entries.get(entry.name)
        if existing:
            entry.version = existing.version + 1
            entry.created_at = existing.created_at
        entry.updated_at = datetime.now(tz=UTC)
        self._entries[entry.name] = entry
        logger.info(
            "Catalog registered: %s (layer=%s, v%d)",
            entry.name,
            entry.layer,
            entry.version,
        )
        self._save()
        return entry

    def get(self, name: str) -> CatalogEntry | None:
        """Retrieve an entry by name."""
        return self._entries.get(name)

    def search(
        self,
        *,
        layer: str | None = None,
        tags: list[str] | None = None,
        owner: str | None = None,
        name_contains: str | None = None,
    ) -> list[CatalogEntry]:
        """Search entries by criteria."""
        results = list(self._entries.values())
        if layer:
            results = [e for e in results if e.layer == layer]
        if tags:
            tag_set = set(tags)
            results = [e for e in results if tag_set.issubset(set(e.tags))]
        if owner:
            results = [e for e in results if e.owner == owner]
        if name_contains:
            results = [e for e in results if name_contains.lower() in e.name.lower()]
        return results

    def list_all(self) -> list[CatalogEntry]:
        """Return all catalog entries."""
        return list(self._entries.values())

    def delete(self, name: str) -> bool:
        """Remove an entry by name."""
        if name in self._entries:
            del self._entries[name]
            self._save()
            return True
        return False

    def summary(self) -> dict[str, Any]:
        """High-level catalog statistics."""
        layers: dict[str, int] = {}
        formats: dict[str, int] = {}
        for e in self._entries.values():
            layers[e.layer] = layers.get(e.layer, 0) + 1
            formats[e.format] = formats.get(e.format, 0) + 1
        return {
            "total_datasets": len(self._entries),
            "by_layer": layers,
            "by_format": formats,
        }

    # -- persistence ---------------------------------------------------------

    def _save(self) -> None:
        if not self._persist_path:
            return
        self._persist_path.parent.mkdir(parents=True, exist_ok=True)
        data = [e.to_dict() for e in self._entries.values()]
        self._persist_path.write_text(json.dumps(data, indent=2, default=str))

    def _load(self) -> None:
        if not self._persist_path or not self._persist_path.exists():
            return
        raw = json.loads(self._persist_path.read_text())
        for item in raw:
            item.pop("created_at", None)
            item.pop("updated_at", None)
            entry = CatalogEntry(**item)
            self._entries[entry.name] = entry
        logger.info(
            "catalog entries loaded", count=len(self._entries), path=str(self._persist_path)
        )
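One subtlety in search(): the tags filter uses subset matching, so an entry matches only when it carries every requested tag (AND semantics, not OR). Reproducing just that check:

```python
requested = {"pii", "daily"}

matching = ["pii", "daily", "finance"]  # carries both requested tags
missing = ["pii", "weekly"]             # lacks "daily", so it is filtered out

assert requested.issubset(set(matching))
assert not requested.issubset(set(missing))
```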

register(entry)

Register or update a dataset entry.

Source code in src/dataenginex/lakehouse/catalog.py
def register(self, entry: CatalogEntry) -> CatalogEntry:
    """Register or update a dataset entry."""
    existing = self._entries.get(entry.name)
    if existing:
        entry.version = existing.version + 1
        entry.created_at = existing.created_at
    entry.updated_at = datetime.now(tz=UTC)
    self._entries[entry.name] = entry
    logger.info(
        "Catalog registered: %s (layer=%s, v%d)",
        entry.name,
        entry.layer,
        entry.version,
    )
    self._save()
    return entry

get(name)

Retrieve an entry by name.

Source code in src/dataenginex/lakehouse/catalog.py
def get(self, name: str) -> CatalogEntry | None:
    """Retrieve an entry by name."""
    return self._entries.get(name)

search(*, layer=None, tags=None, owner=None, name_contains=None)

Search entries by criteria.

Source code in src/dataenginex/lakehouse/catalog.py
def search(
    self,
    *,
    layer: str | None = None,
    tags: list[str] | None = None,
    owner: str | None = None,
    name_contains: str | None = None,
) -> list[CatalogEntry]:
    """Search entries by criteria."""
    results = list(self._entries.values())
    if layer:
        results = [e for e in results if e.layer == layer]
    if tags:
        tag_set = set(tags)
        results = [e for e in results if tag_set.issubset(set(e.tags))]
    if owner:
        results = [e for e in results if e.owner == owner]
    if name_contains:
        results = [e for e in results if name_contains.lower() in e.name.lower()]
    return results

list_all()

Return all catalog entries.

Source code in src/dataenginex/lakehouse/catalog.py
def list_all(self) -> list[CatalogEntry]:
    """Return all catalog entries."""
    return list(self._entries.values())

delete(name)

Remove an entry by name.

Source code in src/dataenginex/lakehouse/catalog.py
def delete(self, name: str) -> bool:
    """Remove an entry by name."""
    if name in self._entries:
        del self._entries[name]
        self._save()
        return True
    return False

summary()

High-level catalog statistics.

Source code in src/dataenginex/lakehouse/catalog.py
def summary(self) -> dict[str, Any]:
    """High-level catalog statistics."""
    layers: dict[str, int] = {}
    formats: dict[str, int] = {}
    for e in self._entries.values():
        layers[e.layer] = layers.get(e.layer, 0) + 1
        formats[e.format] = formats.get(e.format, 0) + 1
    return {
        "total_datasets": len(self._entries),
        "by_layer": layers,
        "by_format": formats,
    }

DatePartitioner

Bases: PartitionStrategy

Partition by a date field using year=…/month=…/day=… layout.

Parameters

date_field: Name of the record field containing a date/datetime value.
granularity: "day" (default), "month", or "year".

Source code in src/dataenginex/lakehouse/partitioning.py
class DatePartitioner(PartitionStrategy):
    """Partition by a date field using ``year=…/month=…/day=…`` layout.

    Parameters
    ----------
    date_field:
        Name of the record field containing a date/datetime value.
    granularity:
        ``"day"`` (default), ``"month"``, or ``"year"``.
    """

    def __init__(self, date_field: str = "created_at", granularity: str = "day") -> None:
        self.date_field = date_field
        if granularity not in ("day", "month", "year"):
            raise ValueError(f"granularity must be day/month/year, got {granularity!r}")
        self.granularity = granularity

    def partition_key(self, record: dict[str, Any]) -> str:
        """Return a ``year=.../month=.../day=...`` key for *record*."""
        dt = self._extract_date(record)
        parts = [f"year={dt.year}"]
        if self.granularity in ("month", "day"):
            parts.append(f"month={dt.month:02d}")
        if self.granularity == "day":
            parts.append(f"day={dt.day:02d}")
        return "/".join(parts)

    def partition_path(self, record: dict[str, Any], base: str = "") -> str:
        """Return *base* joined with the date partition key."""
        key = self.partition_key(record)
        return f"{base}/{key}" if base else key

    def _extract_date(self, record: dict[str, Any]) -> datetime:
        value = record.get(self.date_field)
        if isinstance(value, datetime):
            return value
        if isinstance(value, str):
            # Try ISO format
            try:
                return datetime.fromisoformat(value)
            except ValueError:
                pass
        # Fallback to now
        return datetime.now(tz=UTC)

partition_key(record)

Return a year=.../month=.../day=... key for record.

Source code in src/dataenginex/lakehouse/partitioning.py
def partition_key(self, record: dict[str, Any]) -> str:
    """Return a ``year=.../month=.../day=...`` key for *record*."""
    dt = self._extract_date(record)
    parts = [f"year={dt.year}"]
    if self.granularity in ("month", "day"):
        parts.append(f"month={dt.month:02d}")
    if self.granularity == "day":
        parts.append(f"day={dt.day:02d}")
    return "/".join(parts)

partition_path(record, base='')

Return base joined with the date partition key.

Source code in src/dataenginex/lakehouse/partitioning.py
def partition_path(self, record: dict[str, Any], base: str = "") -> str:
    """Return *base* joined with the date partition key."""
    key = self.partition_key(record)
    return f"{base}/{key}" if base else key

HashPartitioner

Bases: PartitionStrategy

Partition by a hash of one or more fields, distributing across n_buckets.

Parameters

fields: Record fields whose values are hashed.
n_buckets: Number of hash buckets (directories).

Source code in src/dataenginex/lakehouse/partitioning.py
class HashPartitioner(PartitionStrategy):
    """Partition by a hash of one or more fields, distributing across *n_buckets*.

    Parameters
    ----------
    fields:
        Record fields whose values are hashed.
    n_buckets:
        Number of hash buckets (directories).
    """

    def __init__(self, fields: list[str], n_buckets: int = 16) -> None:
        if not fields:
            raise ValueError("At least one field is required for hash partitioning")
        self.fields = fields
        self.n_buckets = max(1, n_buckets)

    def partition_key(self, record: dict[str, Any]) -> str:
        """Return a ``bucket=NNNN`` key based on hashed field values."""
        content = "|".join(str(record.get(f, "")) for f in self.fields)
        digest = hashlib.md5(content.encode()).hexdigest()  # noqa: S324
        bucket = int(digest, 16) % self.n_buckets
        return f"bucket={bucket:04d}"

    def partition_path(self, record: dict[str, Any], base: str = "") -> str:
        """Return *base* joined with the hash-bucket partition key."""
        key = self.partition_key(record)
        return f"{base}/{key}" if base else key
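Bucket assignment is deterministic, so the same field values always route to the same directory. A standalone reproduction of the hashing step (MD5 here is for distribution, not security, mirroring the noqa: S324 in the source):

```python
import hashlib


def bucket_key(values: tuple, n_buckets: int = 16) -> str:
    # join field values, hash, and take the digest modulo the bucket count
    content = "|".join(str(v) for v in values)
    digest = hashlib.md5(content.encode()).hexdigest()
    return f"bucket={int(digest, 16) % n_buckets:04d}"


# repeated calls with the same inputs agree
assert bucket_key(("user_42", "eu")) == bucket_key(("user_42", "eu"))
```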

partition_key(record)

Return a bucket=NNNN key based on hashed field values.

Source code in src/dataenginex/lakehouse/partitioning.py
def partition_key(self, record: dict[str, Any]) -> str:
    """Return a ``bucket=NNNN`` key based on hashed field values."""
    content = "|".join(str(record.get(f, "")) for f in self.fields)
    digest = hashlib.md5(content.encode()).hexdigest()  # noqa: S324
    bucket = int(digest, 16) % self.n_buckets
    return f"bucket={bucket:04d}"

partition_path(record, base='')

Return base joined with the hash-bucket partition key.

Source code in src/dataenginex/lakehouse/partitioning.py
def partition_path(self, record: dict[str, Any], base: str = "") -> str:
    """Return *base* joined with the hash-bucket partition key."""
    key = self.partition_key(record)
    return f"{base}/{key}" if base else key

PartitionStrategy

Bases: ABC

Base class for partitioning strategies.

Source code in src/dataenginex/lakehouse/partitioning.py
class PartitionStrategy(ABC):
    """Base class for partitioning strategies."""

    @abstractmethod
    def partition_key(self, record: dict[str, Any]) -> str:
        """Return the partition path segment for *record*."""
        ...

    @abstractmethod
    def partition_path(self, record: dict[str, Any], base: str = "") -> str:
        """Return the full relative path (base + partition) for *record*."""
        ...
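A custom strategy only has to produce a path segment per record. A hypothetical FieldPartitioner sketch, shown against a local copy of the base class so the snippet stands alone:

```python
from abc import ABC, abstractmethod
from typing import Any


class PartitionStrategy(ABC):  # local copy of the base class above
    @abstractmethod
    def partition_key(self, record: dict[str, Any]) -> str: ...

    @abstractmethod
    def partition_path(self, record: dict[str, Any], base: str = "") -> str: ...


class FieldPartitioner(PartitionStrategy):
    """Hypothetical strategy: partition by the literal value of one field."""

    def __init__(self, field_name: str) -> None:
        self.field_name = field_name

    def partition_key(self, record: dict[str, Any]) -> str:
        return f"{self.field_name}={record.get(self.field_name, 'unknown')}"

    def partition_path(self, record: dict[str, Any], base: str = "") -> str:
        key = self.partition_key(record)
        return f"{base}/{key}" if base else key
```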

partition_key(record) abstractmethod

Return the partition path segment for record.

Source code in src/dataenginex/lakehouse/partitioning.py
@abstractmethod
def partition_key(self, record: dict[str, Any]) -> str:
    """Return the partition path segment for *record*."""
    ...

partition_path(record, base='') abstractmethod

Return the full relative path (base + partition) for record.

Source code in src/dataenginex/lakehouse/partitioning.py
@abstractmethod
def partition_path(self, record: dict[str, Any], base: str = "") -> str:
    """Return the full relative path (base + partition) for *record*."""
    ...

BigQueryStorage

Bases: StorageBackend

Google BigQuery storage backend.

Reads/writes JSON rows to BigQuery tables. Requires google-cloud-bigquery at runtime.

Path convention: dataset.table — the path argument is split on the first "." to derive the dataset and table identifiers.

Parameters

project_id: GCP project ID.
dataset: Default BigQuery dataset for operations when path does not contain a "." separator. Defaults to "dex".
location: BigQuery dataset location (default "US").
client: Optional pre-configured bigquery.Client (useful for tests).

Source code in src/dataenginex/lakehouse/storage.py
class BigQueryStorage(StorageBackend):
    """Google BigQuery storage backend.

    Reads/writes JSON rows to BigQuery tables.  Requires
    ``google-cloud-bigquery`` at runtime.

    Path convention: ``dataset.table`` — the *path* argument is split on
    the first ``"."`` to derive the dataset and table identifiers.

    Parameters
    ----------
    project_id:
        GCP project ID.
    dataset:
        Default BigQuery dataset for operations when *path* does not
        contain a ``"."`` separator.  Defaults to ``"dex"``.
    location:
        BigQuery dataset location (default ``"US"``).
    client:
        Optional pre-configured ``bigquery.Client`` (useful for tests).
    """

    def __init__(
        self,
        project_id: str,
        dataset: str = "dex",
        location: str = "US",
        client: Any = None,
    ) -> None:
        self.project_id = project_id
        self.dataset = dataset
        self.location = location

        if client is not None:
            self._client = client
            logger.info("bigquery storage initialised with injected client", project=project_id)
        elif not _HAS_BIGQUERY:
            logger.warning("google-cloud-bigquery not installed — install dataenginex[cloud]")
            self._client = None
        else:
            self._client = bq_client.Client(project=project_id, location=location)
            logger.info("bigquery storage initialised", project=project_id)

    def _table_ref(self, path: str) -> str:
        """Resolve *path* to ``project.dataset.table``."""
        if "." in path:
            ds, table = path.split(".", 1)
        else:
            ds, table = self.dataset, path
        return f"{self.project_id}.{ds}.{table}"

    def write(
        self,
        data: Any,
        path: str,
        format: StorageFormat = StorageFormat.BIGQUERY,
    ) -> bool:
        """Load *data* (list of dicts) into a BigQuery table."""
        if self._client is None:
            logger.error("bigquery storage: google-cloud-bigquery not available")
            return False
        try:
            records = data if isinstance(data, list) else [data]
            table_ref = self._table_ref(path)
            job_config: Any = None
            try:
                from google.cloud import bigquery as bq  # noqa: PLC0415

                job_config = bq.LoadJobConfig(
                    source_format=bq.SourceFormat.NEWLINE_DELIMITED_JSON,
                    autodetect=True,
                    write_disposition=bq.WriteDisposition.WRITE_APPEND,
                )
            except ImportError:
                pass  # injected client (tests) — job_config left as None
            job = self._client.load_table_from_json(
                records,
                table_ref,
                job_config=job_config,
            )
            job.result()  # block until complete
            logger.info("wrote records to bigquery", count=len(records), table=table_ref)
            return True
        except Exception as exc:
            logger.error("bigquery storage write failed", exc=str(exc))
            return False

    def read(self, path: str, format: StorageFormat = StorageFormat.BIGQUERY) -> Any:
        """Query all rows from a BigQuery table."""
        if self._client is None:
            logger.error("bigquery storage: google-cloud-bigquery not available")
            return None
        try:
            table_ref = self._table_ref(path)
            rows = self._client.list_rows(table_ref)
            return [dict(row) for row in rows]
        except Exception as exc:
            logger.error("bigquery storage read failed", exc=str(exc))
            return None

    def delete(self, path: str) -> bool:
        """Delete a BigQuery table."""
        if self._client is None:
            logger.error("bigquery storage: google-cloud-bigquery not available")
            return False
        try:
            table_ref = self._table_ref(path)
            self._client.delete_table(table_ref, not_found_ok=True)
            logger.info("deleted bigquery table", table=table_ref)
            return True
        except Exception as exc:
            logger.error("bigquery storage delete failed", exc=str(exc))
            return False

    def list_objects(self, prefix: str = "") -> list[str]:
        """List tables in the dataset, optionally filtered by *prefix*."""
        if self._client is None:
            return []
        try:
            dataset_ref = f"{self.project_id}.{self.dataset}"
            tables = self._client.list_tables(dataset_ref)
            names = [t.table_id for t in tables]
            if prefix:
                names = [n for n in names if n.startswith(prefix)]
            return names
        except Exception as exc:
            logger.error("bigquery storage list_objects failed", exc=str(exc))
            return []

    def exists(self, path: str) -> bool:
        """Return ``True`` if the BigQuery table exists."""
        if self._client is None:
            return False
        try:
            table_ref = self._table_ref(path)
            self._client.get_table(table_ref)
            return True
        except Exception as exc:
            error_type = type(exc).__name__
            if error_type == "NotFound":
                return False
            logger.error("bigquery storage exists check failed", exc=str(exc))
            raise
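The dataset.table convention resolves like this (a standalone reproduction of _table_ref; the project and dataset names are placeholders):

```python
def table_ref(path: str, project_id: str = "my-project", dataset: str = "dex") -> str:
    # "dataset.table" paths carry their own dataset; bare table names fall
    # back to the backend's default dataset
    if "." in path:
        ds, table = path.split(".", 1)
    else:
        ds, table = dataset, path
    return f"{project_id}.{ds}.{table}"


print(table_ref("analytics.events"))  # my-project.analytics.events
print(table_ref("events"))            # my-project.dex.events
```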

write(data, path, format=StorageFormat.BIGQUERY)

Load data (list of dicts) into a BigQuery table.

Source code in src/dataenginex/lakehouse/storage.py
def write(
    self,
    data: Any,
    path: str,
    format: StorageFormat = StorageFormat.BIGQUERY,
) -> bool:
    """Load *data* (list of dicts) into a BigQuery table."""
    if self._client is None:
        logger.error("bigquery storage: google-cloud-bigquery not available")
        return False
    try:
        records = data if isinstance(data, list) else [data]
        table_ref = self._table_ref(path)
        job_config: Any = None
        try:
            from google.cloud import bigquery as bq  # noqa: PLC0415

            job_config = bq.LoadJobConfig(
                source_format=bq.SourceFormat.NEWLINE_DELIMITED_JSON,
                autodetect=True,
                write_disposition=bq.WriteDisposition.WRITE_APPEND,
            )
        except ImportError:
            pass  # injected client (tests) — job_config left as None
        job = self._client.load_table_from_json(
            records,
            table_ref,
            job_config=job_config,
        )
        job.result()  # block until complete
        logger.info("wrote records to bigquery", count=len(records), table=table_ref)
        return True
    except Exception as exc:
        logger.error("bigquery storage write failed", exc=str(exc))
        return False

read(path, format=StorageFormat.BIGQUERY)

Query all rows from a BigQuery table.

Source code in src/dataenginex/lakehouse/storage.py
def read(self, path: str, format: StorageFormat = StorageFormat.BIGQUERY) -> Any:
    """Query all rows from a BigQuery table."""
    if self._client is None:
        logger.error("bigquery storage: google-cloud-bigquery not available")
        return None
    try:
        table_ref = self._table_ref(path)
        rows = self._client.list_rows(table_ref)
        return [dict(row) for row in rows]
    except Exception as exc:
        logger.error("bigquery storage read failed", exc=str(exc))
        return None

delete(path)

Delete a BigQuery table.

Source code in src/dataenginex/lakehouse/storage.py
def delete(self, path: str) -> bool:
    """Delete a BigQuery table."""
    if self._client is None:
        logger.error("bigquery storage: google-cloud-bigquery not available")
        return False
    try:
        table_ref = self._table_ref(path)
        self._client.delete_table(table_ref, not_found_ok=True)
        logger.info("deleted bigquery table", table=table_ref)
        return True
    except Exception as exc:
        logger.error("bigquery storage delete failed", exc=str(exc))
        return False

list_objects(prefix='')

List tables in the dataset, optionally filtered by prefix.

Source code in src/dataenginex/lakehouse/storage.py
def list_objects(self, prefix: str = "") -> list[str]:
    """List tables in the dataset, optionally filtered by *prefix*."""
    if self._client is None:
        return []
    try:
        dataset_ref = f"{self.project_id}.{self.dataset}"
        tables = self._client.list_tables(dataset_ref)
        names = [t.table_id for t in tables]
        if prefix:
            names = [n for n in names if n.startswith(prefix)]
        return names
    except Exception as exc:
        logger.error("bigquery storage list_objects failed", exc=str(exc))
        return []

exists(path)

Return True if the BigQuery table exists.

Source code in src/dataenginex/lakehouse/storage.py, lines 528-541
def exists(self, path: str) -> bool:
    """Return ``True`` if the BigQuery table exists."""
    if self._client is None:
        return False
    try:
        table_ref = self._table_ref(path)
        self._client.get_table(table_ref)
        return True
    except Exception as exc:
        error_type = type(exc).__name__
        if error_type == "NotFound":
            return False
        logger.error("bigquery storage exists check failed", exc=str(exc))
        raise
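
The check above matches the exception's type name instead of importing `google.api_core.exceptions` at module scope, which keeps the BigQuery dependency optional. A minimal stdlib sketch of that pattern (`NotFound` and `is_missing_table` here are illustrative stand-ins, not part of the package):

```python
class NotFound(Exception):
    """Stand-in for google.api_core.exceptions.NotFound."""


def is_missing_table(exc: Exception) -> bool:
    # Matching on the type name avoids a hard import of the google client
    # library just to reference its exception class.
    return type(exc).__name__ == "NotFound"


assert is_missing_table(NotFound("table gone"))
assert not is_missing_table(RuntimeError("auth failed"))
```

The trade-off is that any unrelated exception class named `NotFound` would also match, which is acceptable inside a narrowly scoped `except` block.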

GCSStorage

Bases: StorageBackend

Google Cloud Storage backend.

Reads/writes JSON-serialised records to a GCS bucket. Requires google-cloud-storage at runtime.

Parameters

- bucket: GCS bucket name.
- prefix: Key prefix for all objects (default "").
- project: GCP project ID (optional, uses ADC default).
- api_endpoint: Custom API endpoint for GCS-compatible services (e.g. fake-gcs-server). When None, the default Google endpoint is used.

Source code in src/dataenginex/lakehouse/storage.py, lines 544-673
class GCSStorage(StorageBackend):
    """Google Cloud Storage backend.

    Reads/writes JSON-serialised records to a GCS bucket.  Requires
    ``google-cloud-storage`` at runtime.

    Parameters
    ----------
    bucket:
        GCS bucket name.
    prefix:
        Key prefix for all objects (default ``""``).
    project:
        GCP project ID (optional, uses ADC default).
    api_endpoint:
        Custom API endpoint for GCS-compatible services (e.g.
        ``fake-gcs-server``).  When ``None``, the default Google
        endpoint is used.
    """

    def __init__(
        self,
        bucket: str,
        prefix: str = "",
        project: str | None = None,
        api_endpoint: str | None = None,
    ) -> None:
        self.bucket_name = bucket
        self.prefix = prefix.rstrip("/")

        if not _HAS_GCS:
            logger.warning("google-cloud-storage not installed — gcs storage operations will fail")
            self._bucket = None
        else:
            if api_endpoint:
                # Use anonymous credentials + ClientOptions for local emulators
                from google.api_core.client_options import ClientOptions  # noqa: PLC0415
                from google.auth.credentials import (
                    AnonymousCredentials,  # type: ignore[import-untyped]  # noqa: PLC0415, E501
                )

                client = gcs_storage.Client(
                    project=project or "test-project",
                    credentials=AnonymousCredentials(),  # type: ignore[no-untyped-call]
                    client_options=ClientOptions(api_endpoint=api_endpoint),
                )
            else:
                client = gcs_storage.Client(project=project)
            self._bucket = client.bucket(bucket)
            logger.info("gcs storage initialised", uri=f"gs://{bucket}/{prefix}")

    def _blob_name(self, path: str) -> str:
        return f"{self.prefix}/{path}" if self.prefix else path

    def write(
        self,
        data: Any,
        path: str,
        format: StorageFormat = StorageFormat.PARQUET,
    ) -> bool:
        """Write JSON-serialised data to GCS."""
        if self._bucket is None:
            logger.error("gcs storage: google-cloud-storage not available")
            return False
        try:
            records = data if isinstance(data, list) else [data]
            body = json.dumps(records, default=str)
            blob = self._bucket.blob(self._blob_name(f"{path}.json"))
            blob.upload_from_string(body, content_type="application/json")
            logger.info(
                "wrote records to gcs",
                count=len(records),
                uri=f"gs://{self.bucket_name}/{blob.name}",
            )
            return True
        except Exception as exc:
            logger.error("gcs storage write failed", exc=str(exc))
            return False

    def read(self, path: str, format: StorageFormat = StorageFormat.PARQUET) -> Any:
        """Read JSON data from GCS."""
        if self._bucket is None:
            logger.error("gcs storage: google-cloud-storage not available")
            return None
        try:
            blob = self._bucket.blob(self._blob_name(f"{path}.json"))
            body = blob.download_as_text()
            return json.loads(body)
        except Exception as exc:
            logger.error("gcs storage read failed", exc=str(exc))
            return None

    def delete(self, path: str) -> bool:
        """Delete object from GCS."""
        if self._bucket is None:
            logger.error("gcs storage: google-cloud-storage not available")
            return False
        try:
            blob = self._bucket.blob(self._blob_name(f"{path}.json"))
            blob.delete()
            logger.info("deleted gcs object", uri=f"gs://{self.bucket_name}/{blob.name}")
            return True
        except Exception as exc:
            logger.error("gcs storage delete failed", exc=str(exc))
            return False

    def list_objects(self, prefix: str = "") -> list[str]:
        """List GCS objects under *prefix*."""
        if self._bucket is None:
            return []
        try:
            full_prefix = self._blob_name(prefix)
            return [blob.name for blob in self._bucket.list_blobs(prefix=full_prefix)]
        except Exception as exc:
            logger.error("gcs storage list_objects failed", exc=str(exc))
            return []

    def exists(self, path: str) -> bool:
        """Return ``True`` if *path* exists in GCS."""
        if self._bucket is None:
            return False
        try:
            result: bool = self._bucket.blob(self._blob_name(f"{path}.json")).exists()
            return result
        except (AttributeError, TypeError):
            return False
        except Exception as exc:
            # Surface auth/permission errors instead of silently returning False
            logger.error("gcs storage exists check failed", exc=str(exc))
            raise

write(data, path, format=StorageFormat.PARQUET)

Write JSON-serialised data to GCS.

Source code in src/dataenginex/lakehouse/storage.py, lines 598-621
def write(
    self,
    data: Any,
    path: str,
    format: StorageFormat = StorageFormat.PARQUET,
) -> bool:
    """Write JSON-serialised data to GCS."""
    if self._bucket is None:
        logger.error("gcs storage: google-cloud-storage not available")
        return False
    try:
        records = data if isinstance(data, list) else [data]
        body = json.dumps(records, default=str)
        blob = self._bucket.blob(self._blob_name(f"{path}.json"))
        blob.upload_from_string(body, content_type="application/json")
        logger.info(
            "wrote records to gcs",
            count=len(records),
            uri=f"gs://{self.bucket_name}/{blob.name}",
        )
        return True
    except Exception as exc:
        logger.error("gcs storage write failed", exc=str(exc))
        return False

read(path, format=StorageFormat.PARQUET)

Read JSON data from GCS.

Source code in src/dataenginex/lakehouse/storage.py, lines 623-634
def read(self, path: str, format: StorageFormat = StorageFormat.PARQUET) -> Any:
    """Read JSON data from GCS."""
    if self._bucket is None:
        logger.error("gcs storage: google-cloud-storage not available")
        return None
    try:
        blob = self._bucket.blob(self._blob_name(f"{path}.json"))
        body = blob.download_as_text()
        return json.loads(body)
    except Exception as exc:
        logger.error("gcs storage read failed", exc=str(exc))
        return None

delete(path)

Delete object from GCS.

Source code in src/dataenginex/lakehouse/storage.py, lines 636-648
def delete(self, path: str) -> bool:
    """Delete object from GCS."""
    if self._bucket is None:
        logger.error("gcs storage: google-cloud-storage not available")
        return False
    try:
        blob = self._bucket.blob(self._blob_name(f"{path}.json"))
        blob.delete()
        logger.info("deleted gcs object", uri=f"gs://{self.bucket_name}/{blob.name}")
        return True
    except Exception as exc:
        logger.error("gcs storage delete failed", exc=str(exc))
        return False

list_objects(prefix='')

List GCS objects under prefix.

Source code in src/dataenginex/lakehouse/storage.py, lines 650-659
def list_objects(self, prefix: str = "") -> list[str]:
    """List GCS objects under *prefix*."""
    if self._bucket is None:
        return []
    try:
        full_prefix = self._blob_name(prefix)
        return [blob.name for blob in self._bucket.list_blobs(prefix=full_prefix)]
    except Exception as exc:
        logger.error("gcs storage list_objects failed", exc=str(exc))
        return []

exists(path)

Return True if path exists in GCS.

Source code in src/dataenginex/lakehouse/storage.py, lines 661-673
def exists(self, path: str) -> bool:
    """Return ``True`` if *path* exists in GCS."""
    if self._bucket is None:
        return False
    try:
        result: bool = self._bucket.blob(self._blob_name(f"{path}.json")).exists()
        return result
    except (AttributeError, TypeError):
        return False
    except Exception as exc:
        # Surface auth/permission errors instead of silently returning False
        logger.error("gcs storage exists check failed", exc=str(exc))
        raise

JsonStorage

Bases: StorageBackend

Simple JSON-file storage for development and testing.

Each write call serialises data (list of dicts) as a JSON array.
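
The round-trip contract can be sketched with the standard library alone. `DemoJsonStorage` below is a hypothetical stand-in that mirrors the documented behaviour: records land under `base_path` with a `.json` suffix, and a single dict is normalised to a one-element list on write:

```python
import json
from pathlib import Path
from tempfile import TemporaryDirectory


class DemoJsonStorage:
    """Hypothetical stand-in mirroring the documented JsonStorage contract."""

    def __init__(self, base_path: str) -> None:
        self.base_path = Path(base_path)
        self.base_path.mkdir(parents=True, exist_ok=True)

    def write(self, data, path: str) -> bool:
        full = self.base_path / f"{path}.json"
        full.parent.mkdir(parents=True, exist_ok=True)
        records = data if isinstance(data, list) else [data]
        full.write_text(json.dumps(records, indent=2, default=str))
        return True

    def read(self, path: str):
        full = self.base_path / f"{path}.json"
        return json.loads(full.read_text()) if full.exists() else None

    def exists(self, path: str) -> bool:
        return (self.base_path / f"{path}.json").exists()


with TemporaryDirectory() as tmp:
    store = DemoJsonStorage(tmp)
    store.write({"id": 1}, "bronze/events")
    roundtrip = store.read("bronze/events")
    assert roundtrip == [{"id": 1}]  # dict was wrapped into a list
    assert store.exists("bronze/events")
```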

Source code in src/dataenginex/lakehouse/storage.py, lines 52-126
class JsonStorage(StorageBackend):
    """Simple JSON-file storage for development and testing.

    Each ``write`` call serialises *data* (list of dicts) as a JSON array.
    """

    def __init__(self, base_path: str = "data") -> None:
        self.base_path = Path(base_path)
        self.base_path.mkdir(parents=True, exist_ok=True)
        logger.info("json storage initialised", path=str(self.base_path))

    def write(
        self,
        data: Any,
        path: str,
        format: StorageFormat = StorageFormat.PARQUET,
    ) -> bool:
        """Serialize *data* as JSON and write to *path*."""
        try:
            full = self.base_path / f"{path}.json"
            full.parent.mkdir(parents=True, exist_ok=True)
            records = self._normalise(data)
            full.write_text(json.dumps(records, indent=2, default=str))
            logger.info("wrote records", count=len(records), path=str(full))
            return True
        except Exception as exc:
            logger.error("json storage write failed", exc=str(exc))
            return False

    def read(self, path: str, format: StorageFormat = StorageFormat.PARQUET) -> Any:
        """Read and deserialize a JSON file at *path*."""
        try:
            full = self.base_path / f"{path}.json"
            if not full.exists():
                logger.warning("file not found", path=str(full))
                return None
            return json.loads(full.read_text())
        except Exception as exc:
            logger.error("json storage read failed", exc=str(exc))
            return None

    def delete(self, path: str) -> bool:
        """Delete the JSON file at *path* if it exists."""
        try:
            full = self.base_path / f"{path}.json"
            if full.exists():
                full.unlink()
                logger.info("deleted file", path=str(full))
            return True
        except Exception as exc:
            logger.error("json storage delete failed", exc=str(exc))
            return False

    @staticmethod
    def _normalise(data: Any) -> list[dict[str, Any]]:
        if isinstance(data, list):
            return data
        if isinstance(data, dict):
            return [data]
        return [{"value": data}]

    def list_objects(self, prefix: str = "") -> list[str]:
        """List JSON entries under *prefix*."""
        target = self.base_path / prefix
        if not target.exists():
            return []
        return [
            str(p.relative_to(self.base_path).with_suffix(""))
            for p in target.rglob("*.json")
            if p.is_file()
        ]

    def exists(self, path: str) -> bool:
        """Return ``True`` if *path* has a corresponding JSON file."""
        return (self.base_path / f"{path}.json").exists()

write(data, path, format=StorageFormat.PARQUET)

Serialize data as JSON and write to path.

Source code in src/dataenginex/lakehouse/storage.py, lines 63-79
def write(
    self,
    data: Any,
    path: str,
    format: StorageFormat = StorageFormat.PARQUET,
) -> bool:
    """Serialize *data* as JSON and write to *path*."""
    try:
        full = self.base_path / f"{path}.json"
        full.parent.mkdir(parents=True, exist_ok=True)
        records = self._normalise(data)
        full.write_text(json.dumps(records, indent=2, default=str))
        logger.info("wrote records", count=len(records), path=str(full))
        return True
    except Exception as exc:
        logger.error("json storage write failed", exc=str(exc))
        return False

read(path, format=StorageFormat.PARQUET)

Read and deserialize a JSON file at path.

Source code in src/dataenginex/lakehouse/storage.py, lines 81-91
def read(self, path: str, format: StorageFormat = StorageFormat.PARQUET) -> Any:
    """Read and deserialize a JSON file at *path*."""
    try:
        full = self.base_path / f"{path}.json"
        if not full.exists():
            logger.warning("file not found", path=str(full))
            return None
        return json.loads(full.read_text())
    except Exception as exc:
        logger.error("json storage read failed", exc=str(exc))
        return None

delete(path)

Delete the JSON file at path if it exists.

Source code in src/dataenginex/lakehouse/storage.py, lines 93-103
def delete(self, path: str) -> bool:
    """Delete the JSON file at *path* if it exists."""
    try:
        full = self.base_path / f"{path}.json"
        if full.exists():
            full.unlink()
            logger.info("deleted file", path=str(full))
        return True
    except Exception as exc:
        logger.error("json storage delete failed", exc=str(exc))
        return False

list_objects(prefix='')

List JSON entries under prefix.

Source code in src/dataenginex/lakehouse/storage.py, lines 113-122
def list_objects(self, prefix: str = "") -> list[str]:
    """List JSON entries under *prefix*."""
    target = self.base_path / prefix
    if not target.exists():
        return []
    return [
        str(p.relative_to(self.base_path).with_suffix(""))
        for p in target.rglob("*.json")
        if p.is_file()
    ]

exists(path)

Return True if path has a corresponding JSON file.

Source code in src/dataenginex/lakehouse/storage.py, lines 124-126
def exists(self, path: str) -> bool:
    """Return ``True`` if *path* has a corresponding JSON file."""
    return (self.base_path / f"{path}.json").exists()

ParquetStorage

Bases: StorageBackend

Parquet file storage backed by pyarrow.

Falls back to JsonStorage when pyarrow is not installed.
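
The fallback is driven by a module-level feature probe rather than a try/except around every call; a minimal sketch of that pattern (the `_HAS_PYARROW` name matches the source, `choose_backend_name` is illustrative):

```python
import importlib.util

# Probe for the optional dependency once at import time.  find_spec
# returns None when the package is not installed, without importing it.
_HAS_PYARROW = importlib.util.find_spec("pyarrow") is not None


def choose_backend_name() -> str:
    """Return which backend ParquetStorage would effectively use."""
    return "parquet" if _HAS_PYARROW else "json-fallback"
```

Probing once keeps the per-call guards (`if not _HAS_PYARROW: return self._fallback...`) cheap and makes the degraded mode visible in the logs at construction time.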

Source code in src/dataenginex/lakehouse/storage.py, lines 134-232
class ParquetStorage(StorageBackend):
    """Parquet file storage backed by *pyarrow*.

    Falls back to ``JsonStorage`` when *pyarrow* is not installed.
    """

    def __init__(self, base_path: str = "data", compression: str = "snappy") -> None:
        self.base_path = Path(base_path)
        self.base_path.mkdir(parents=True, exist_ok=True)
        self.compression = compression

        if _HAS_PYARROW:
            logger.info("parquet storage initialised", path=str(self.base_path))
        else:
            logger.warning("pyarrow not installed — parquet storage will use json fallback")
            self._fallback = JsonStorage(str(self.base_path))

    def write(
        self,
        data: Any,
        path: str,
        format: StorageFormat = StorageFormat.PARQUET,
    ) -> bool:
        """Write *data* as a Parquet file at *path*."""
        if not _HAS_PYARROW:
            return self._fallback.write(data, path, format)

        try:
            full = self.base_path / f"{path}.parquet"
            full.parent.mkdir(parents=True, exist_ok=True)
            records = self._to_records(data)
            if not records:
                logger.warning("no records to write", path=str(full))
                return False
            table = pa.Table.from_pylist(records)
            pq.write_table(table, str(full), compression=self.compression)
            logger.info("wrote records", count=len(records), path=str(full))
            return True
        except Exception as exc:
            logger.error("parquet storage write failed", exc=str(exc))
            return False

    def read(self, path: str, format: StorageFormat = StorageFormat.PARQUET) -> Any:
        """Read a Parquet file at *path* and return records."""
        if not _HAS_PYARROW:
            return self._fallback.read(path, format)

        try:
            full = self.base_path / f"{path}.parquet"
            if not full.exists():
                logger.warning("parquet file not found", path=str(full))
                return None
            table = pq.read_table(str(full))
            return table.to_pylist()
        except Exception as exc:
            logger.error("parquet storage read failed", exc=str(exc))
            return None

    def delete(self, path: str) -> bool:
        """Delete the Parquet file at *path* if it exists."""
        if not _HAS_PYARROW:
            return self._fallback.delete(path)

        try:
            full = self.base_path / f"{path}.parquet"
            if full.exists():
                full.unlink()
                logger.info("deleted file", path=str(full))
            return True
        except Exception as exc:
            logger.error("parquet storage delete failed", exc=str(exc))
            return False

    @staticmethod
    def _to_records(data: Any) -> list[dict[str, Any]]:
        if isinstance(data, list):
            return data
        if isinstance(data, dict):
            return [data]
        return []

    def list_objects(self, prefix: str = "") -> list[str]:
        """List Parquet entries under *prefix*."""
        if not _HAS_PYARROW:
            return self._fallback.list_objects(prefix)
        target = self.base_path / prefix
        if not target.exists():
            return []
        return [
            str(p.relative_to(self.base_path).with_suffix(""))
            for p in target.rglob("*.parquet")
            if p.is_file()
        ]

    def exists(self, path: str) -> bool:
        """Return ``True`` if *path* has a corresponding Parquet file."""
        if not _HAS_PYARROW:
            return self._fallback.exists(path)
        return (self.base_path / f"{path}.parquet").exists()

write(data, path, format=StorageFormat.PARQUET)

Write data as a Parquet file at path.

Source code in src/dataenginex/lakehouse/storage.py, lines 151-174
def write(
    self,
    data: Any,
    path: str,
    format: StorageFormat = StorageFormat.PARQUET,
) -> bool:
    """Write *data* as a Parquet file at *path*."""
    if not _HAS_PYARROW:
        return self._fallback.write(data, path, format)

    try:
        full = self.base_path / f"{path}.parquet"
        full.parent.mkdir(parents=True, exist_ok=True)
        records = self._to_records(data)
        if not records:
            logger.warning("no records to write", path=str(full))
            return False
        table = pa.Table.from_pylist(records)
        pq.write_table(table, str(full), compression=self.compression)
        logger.info("wrote records", count=len(records), path=str(full))
        return True
    except Exception as exc:
        logger.error("parquet storage write failed", exc=str(exc))
        return False

read(path, format=StorageFormat.PARQUET)

Read a Parquet file at path and return records.

Source code in src/dataenginex/lakehouse/storage.py, lines 176-190
def read(self, path: str, format: StorageFormat = StorageFormat.PARQUET) -> Any:
    """Read a Parquet file at *path* and return records."""
    if not _HAS_PYARROW:
        return self._fallback.read(path, format)

    try:
        full = self.base_path / f"{path}.parquet"
        if not full.exists():
            logger.warning("parquet file not found", path=str(full))
            return None
        table = pq.read_table(str(full))
        return table.to_pylist()
    except Exception as exc:
        logger.error("parquet storage read failed", exc=str(exc))
        return None

delete(path)

Delete the Parquet file at path if it exists.

Source code in src/dataenginex/lakehouse/storage.py, lines 192-205
def delete(self, path: str) -> bool:
    """Delete the Parquet file at *path* if it exists."""
    if not _HAS_PYARROW:
        return self._fallback.delete(path)

    try:
        full = self.base_path / f"{path}.parquet"
        if full.exists():
            full.unlink()
            logger.info("deleted file", path=str(full))
        return True
    except Exception as exc:
        logger.error("parquet storage delete failed", exc=str(exc))
        return False

list_objects(prefix='')

List Parquet entries under prefix.

Source code in src/dataenginex/lakehouse/storage.py, lines 215-226
def list_objects(self, prefix: str = "") -> list[str]:
    """List Parquet entries under *prefix*."""
    if not _HAS_PYARROW:
        return self._fallback.list_objects(prefix)
    target = self.base_path / prefix
    if not target.exists():
        return []
    return [
        str(p.relative_to(self.base_path).with_suffix(""))
        for p in target.rglob("*.parquet")
        if p.is_file()
    ]

exists(path)

Return True if path has a corresponding Parquet file.

Source code in src/dataenginex/lakehouse/storage.py, lines 228-232
def exists(self, path: str) -> bool:
    """Return ``True`` if *path* has a corresponding Parquet file."""
    if not _HAS_PYARROW:
        return self._fallback.exists(path)
    return (self.base_path / f"{path}.parquet").exists()

S3Storage

Bases: StorageBackend

AWS S3 object storage backend.

Reads/writes JSON-serialised records to an S3 bucket. Requires boto3 at runtime.

Parameters

bucket: S3 bucket name. prefix: Key prefix for all objects (default ""). region: AWS region (default "us-east-1"). endpoint_url: Custom endpoint for S3-compatible services (e.g. LocalStack). When None, the default AWS endpoint is used.

Source code in src/dataenginex/lakehouse/storage.py, lines 247-371
class S3Storage(StorageBackend):
    """AWS S3 object storage backend.

    Reads/writes JSON-serialised records to an S3 bucket.  Requires
    ``boto3`` at runtime.

    Parameters
    ----------
    bucket:
        S3 bucket name.
    prefix:
        Key prefix for all objects (default ``""``).
    region:
        AWS region (default ``"us-east-1"``).
    endpoint_url:
        Custom endpoint for S3-compatible services (e.g. LocalStack).
        When ``None``, the default AWS endpoint is used.
    """

    def __init__(
        self,
        bucket: str,
        prefix: str = "",
        region: str = "us-east-1",
        endpoint_url: str | None = None,
    ) -> None:
        self.bucket = bucket
        self.prefix = prefix.rstrip("/")
        self.region = region
        self.endpoint_url = endpoint_url

        if not _HAS_BOTO3:
            logger.warning("boto3 not installed — s3 storage operations will fail")
            self._client = None
        else:
            client_kwargs: dict[str, Any] = {"region_name": region}
            if endpoint_url:
                client_kwargs["endpoint_url"] = endpoint_url
            self._client = boto3.client("s3", **client_kwargs)
            logger.info("s3 storage initialised", uri=f"s3://{bucket}/{prefix}")

    def _key(self, path: str) -> str:
        return f"{self.prefix}/{path}" if self.prefix else path

    def write(
        self,
        data: Any,
        path: str,
        format: StorageFormat = StorageFormat.PARQUET,
    ) -> bool:
        """Write JSON-serialised data to S3."""
        if self._client is None:
            logger.error("s3 storage: boto3 not available")
            return False
        try:
            records = data if isinstance(data, list) else [data]
            body = json.dumps(records, default=str).encode()
            key = self._key(f"{path}.json")
            self._client.put_object(Bucket=self.bucket, Key=key, Body=body)
            logger.info("wrote records to s3", count=len(records), uri=f"s3://{self.bucket}/{key}")
            return True
        except Exception as exc:
            logger.error("s3 storage write failed", exc=str(exc))
            return False

    def read(self, path: str, format: StorageFormat = StorageFormat.PARQUET) -> Any:
        """Read JSON data from S3."""
        if self._client is None:
            logger.error("s3 storage: boto3 not available")
            return None
        try:
            key = self._key(f"{path}.json")
            response = self._client.get_object(Bucket=self.bucket, Key=key)
            body = response["Body"].read().decode()
            return json.loads(body)
        except Exception as exc:
            logger.error("s3 storage read failed", exc=str(exc))
            return None

    def delete(self, path: str) -> bool:
        """Delete object from S3."""
        if self._client is None:
            logger.error("s3 storage: boto3 not available")
            return False
        try:
            key = self._key(f"{path}.json")
            self._client.delete_object(Bucket=self.bucket, Key=key)
            logger.info("deleted s3 object", uri=f"s3://{self.bucket}/{key}")
            return True
        except Exception as exc:
            logger.error("s3 storage delete failed", exc=str(exc))
            return False

    def list_objects(self, prefix: str = "") -> list[str]:
        """List S3 objects under *prefix*."""
        if self._client is None:
            return []
        try:
            full_prefix = self._key(prefix)
            resp = self._client.list_objects_v2(Bucket=self.bucket, Prefix=full_prefix)
            return [obj["Key"] for obj in resp.get("Contents", [])]
        except Exception as exc:
            logger.error("s3 storage list_objects failed", exc=str(exc))
            return []

    def exists(self, path: str) -> bool:
        """Return ``True`` if *path* exists in S3."""
        if self._client is None:
            return False
        try:
            self._client.head_object(Bucket=self.bucket, Key=self._key(f"{path}.json"))
            return True
        except self._client.exceptions.NoSuchKey:
            return False
        except Exception as exc:
            # Surface auth/permission errors instead of silently returning False
            response = getattr(exc, "response", None) or {}
            error_code = response.get("Error", {}).get("Code", "")
            if error_code in ("404", "NoSuchKey"):
                return False
            logger.error("s3 storage exists check failed", exc=str(exc))
            raise

write(data, path, format=StorageFormat.PARQUET)

Write JSON-serialised data to S3.

Source code in src/dataenginex/lakehouse/storage.py, lines 291-310
def write(
    self,
    data: Any,
    path: str,
    format: StorageFormat = StorageFormat.PARQUET,
) -> bool:
    """Write JSON-serialised data to S3."""
    if self._client is None:
        logger.error("s3 storage: boto3 not available")
        return False
    try:
        records = data if isinstance(data, list) else [data]
        body = json.dumps(records, default=str).encode()
        key = self._key(f"{path}.json")
        self._client.put_object(Bucket=self.bucket, Key=key, Body=body)
        logger.info("wrote records to s3", count=len(records), uri=f"s3://{self.bucket}/{key}")
        return True
    except Exception as exc:
        logger.error("s3 storage write failed", exc=str(exc))
        return False

read(path, format=StorageFormat.PARQUET)

Read JSON data from S3.

Source code in src/dataenginex/lakehouse/storage.py, lines 312-324
def read(self, path: str, format: StorageFormat = StorageFormat.PARQUET) -> Any:
    """Read JSON data from S3."""
    if self._client is None:
        logger.error("s3 storage: boto3 not available")
        return None
    try:
        key = self._key(f"{path}.json")
        response = self._client.get_object(Bucket=self.bucket, Key=key)
        body = response["Body"].read().decode()
        return json.loads(body)
    except Exception as exc:
        logger.error("s3 storage read failed", exc=str(exc))
        return None

delete(path)

Delete object from S3.

Source code in src/dataenginex/lakehouse/storage.py, lines 326-338
def delete(self, path: str) -> bool:
    """Delete object from S3."""
    if self._client is None:
        logger.error("s3 storage: boto3 not available")
        return False
    try:
        key = self._key(f"{path}.json")
        self._client.delete_object(Bucket=self.bucket, Key=key)
        logger.info("deleted s3 object", uri=f"s3://{self.bucket}/{key}")
        return True
    except Exception as exc:
        logger.error("s3 storage delete failed", exc=str(exc))
        return False

list_objects(prefix='')

List S3 objects under prefix.

Source code in src/dataenginex/lakehouse/storage.py, lines 340-350
def list_objects(self, prefix: str = "") -> list[str]:
    """List S3 objects under *prefix*."""
    if self._client is None:
        return []
    try:
        full_prefix = self._key(prefix)
        resp = self._client.list_objects_v2(Bucket=self.bucket, Prefix=full_prefix)
        return [obj["Key"] for obj in resp.get("Contents", [])]
    except Exception as exc:
        logger.error("s3 storage list_objects failed", exc=str(exc))
        return []
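Note that `list_objects_v2` filters by raw key prefix, not by directory: the prefix `"raw"` matches `"raw/events.json"` but also `"rawest.json"`. A stdlib sketch of that semantic against an in-memory key set (the keys are made up for illustration):

```python
# Hypothetical in-memory key set standing in for a bucket listing.
keys = ["raw/events.json", "raw/users.json", "rawest.json", "curated/daily.json"]

def list_keys(prefix: str = "") -> list[str]:
    # Same semantics as S3's Prefix filter: plain string prefix match,
    # with no notion of directory boundaries.
    return [k for k in keys if k.startswith(prefix)]

print(list_keys("raw"))   # picks up "rawest.json" as well
print(list_keys("raw/"))  # trailing slash restricts to the "folder"
```

Also worth remembering: a single `list_objects_v2` call returns at most 1000 keys, so large listings need the continuation token or a boto3 paginator.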

exists(path)

Return True if path exists in S3.

Source code in src/dataenginex/lakehouse/storage.py
def exists(self, path: str) -> bool:
    """Return ``True`` if *path* exists in S3."""
    if self._client is None:
        return False
    try:
        self._client.head_object(Bucket=self.bucket, Key=self._key(f"{path}.json"))
        return True
    except self._client.exceptions.NoSuchKey:
        # Raised by get_object-style calls; head_object usually signals a
        # missing key as a generic ClientError with a 404 code instead.
        return False
    except Exception as exc:
        # Surface auth/permission errors instead of silently returning False.
        # botocore's ClientError keeps its details in a ``response`` dict,
        # so the code must be read with ``dict.get``, not ``getattr``.
        error_code = (getattr(exc, "response", None) or {}).get("Error", {}).get("Code", "")
        if error_code in ("404", "NoSuchKey"):
            return False
        logger.error("s3 storage exists check failed", exc=str(exc))
        raise
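The fallback branch inspects botocore's `ClientError` shape, where the details live in a `response` dict attribute rather than in object attributes. A sketch with a stand-in exception class (the class name is illustrative, not boto3's):

```python
class FakeClientError(Exception):
    """Stand-in mimicking botocore's ``ClientError``: a ``response`` dict."""

    def __init__(self, code: str) -> None:
        super().__init__(code)
        self.response = {"Error": {"Code": code, "Message": "stub"}}

def error_code(exc: Exception) -> str:
    # ``response`` is a dict, so drill in with ``dict.get``, not ``getattr``.
    return (getattr(exc, "response", None) or {}).get("Error", {}).get("Code", "")

print(error_code(FakeClientError("404")))     # "404" -> treated as "not found"
print(error_code(ValueError("no response")))  # "" -> re-raised to the caller
```

This is what lets a 404 map to `False` while auth and permission failures propagate.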

get_storage(uri, **kwargs)

Create a StorageBackend from a URI scheme.

Supported schemes:

  • file:// (or no scheme) → JsonStorage
  • s3://bucket/prefix → S3Storage
  • gs://bucket/prefix → GCSStorage
  • bq://project/dataset → BigQueryStorage

Extra kwargs are forwarded to the backend constructor.

Raises

ValueError: if the URI scheme is not supported.

Source code in src/dataenginex/lakehouse/storage.py
def get_storage(uri: str, **kwargs: Any) -> StorageBackend:
    """Create a :class:`StorageBackend` from a URI scheme.

    Supported schemes:

    * ``file://`` (or no scheme)  → :class:`JsonStorage`
    * ``s3://bucket/prefix``      → :class:`S3Storage`
    * ``gs://bucket/prefix``      → :class:`GCSStorage`
    * ``bq://project/dataset``    → :class:`BigQueryStorage`

    Extra *kwargs* are forwarded to the backend constructor.

    Raises
    ------
    ValueError
        If the URI scheme is not supported.
    """
    from urllib.parse import urlparse

    parsed = urlparse(uri)
    if parsed.scheme in ("", "file"):
        path = parsed.path or "data"
        return JsonStorage(base_path=path, **kwargs)
    if parsed.scheme == "s3":
        return S3Storage(
            bucket=parsed.netloc,
            prefix=parsed.path.lstrip("/"),
            **kwargs,
        )
    if parsed.scheme == "gs":
        return GCSStorage(
            bucket=parsed.netloc,
            prefix=parsed.path.lstrip("/"),
            **kwargs,
        )
    if parsed.scheme == "bq":
        return BigQueryStorage(
            project_id=parsed.netloc,
            dataset=parsed.path.lstrip("/") or "dex",
            **kwargs,
        )
    msg = f"Unsupported storage URI scheme: {parsed.scheme!r}"
    raise ValueError(msg)
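The dispatch above leans on `urllib.parse.urlparse` to split a storage URI into the pieces each backend constructor receives: the scheme picks the backend, the netloc becomes the bucket (or project), and the path becomes the prefix (or dataset). A stdlib sketch of just that parsing step, with made-up example URIs:

```python
from urllib.parse import urlparse

def split_storage_uri(uri: str) -> tuple[str, str, str]:
    # Mirrors the parsing in ``get_storage``: (scheme, netloc, stripped path).
    parsed = urlparse(uri)
    return parsed.scheme, parsed.netloc, parsed.path.lstrip("/")

print(split_storage_uri("s3://my-bucket/raw/events"))  # ("s3", "my-bucket", "raw/events")
print(split_storage_uri("gs://analytics/curated"))     # ("gs", "analytics", "curated")
print(split_storage_uri("bq://my-project/warehouse"))  # ("bq", "my-project", "warehouse")
print(split_storage_uri("data/local"))                 # ("", "", "data/local") -> JsonStorage
```

A plain relative path has an empty scheme, which is why it falls through to the `JsonStorage` branch.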