dataenginex.lakehouse

Storage backends, data catalog, and partitioning strategies.

Public API:

from dataenginex.lakehouse import (
    # Storage backends
    StorageBackend,
    ParquetStorage, JsonStorage, S3Storage, GCSStorage,
    get_storage,
    # Catalog
    CatalogEntry, DataCatalog,
    # Partitioning
    PartitionStrategy, DatePartitioner, HashPartitioner,
)

StorageBackend

Bases: ABC

Abstract storage backend interface.

All lakehouse storage implementations must subclass this and provide concrete write, read, delete, list_objects, and exists methods. The interface accepts a StorageFormat hint so backends can choose serialisation.

Built-in implementations
  • LocalParquetStorage: local Parquet files (dataenginex.core.medallion_architecture)
  • BigQueryStorage: Google BigQuery tables (dataenginex.core.medallion_architecture)
  • JsonStorage: JSON files (dataenginex.lakehouse.storage)
  • ParquetStorage: pyarrow-backed Parquet (dataenginex.lakehouse.storage)
  • S3Storage: AWS S3 object storage (dataenginex.lakehouse.storage)
  • GCSStorage: Google Cloud Storage (dataenginex.lakehouse.storage)
Source code in packages/dataenginex/src/dataenginex/core/medallion_architecture.py
class StorageBackend(ABC):
    """Abstract storage backend interface.

    All lakehouse storage implementations must subclass this and provide
    concrete ``write``, ``read``, ``delete``, ``list_objects``, and
    ``exists`` methods.  The interface accepts a ``StorageFormat`` hint
    so backends can choose serialisation.

    Built-in implementations:
        - ``LocalParquetStorage`` — local Parquet files (this module)
        - ``BigQueryStorage`` — Google BigQuery tables (this module)
        - ``JsonStorage`` — JSON files (``dataenginex.lakehouse.storage``)
        - ``ParquetStorage`` — pyarrow-backed Parquet (``dataenginex.lakehouse.storage``)
        - ``S3Storage`` — AWS S3 object storage (``dataenginex.lakehouse.storage``)
        - ``GCSStorage`` — Google Cloud Storage (``dataenginex.lakehouse.storage``)
    """

    @abstractmethod
    def write(self, data: Any, path: str, format: StorageFormat) -> bool:
        """Write *data* to *path* in the given format.

        Returns ``True`` on success, ``False`` on failure.
        """
        ...

    @abstractmethod
    def read(self, path: str, format: StorageFormat) -> Any:
        """Read data from *path*.  Returns ``None`` on failure."""
        ...

    @abstractmethod
    def delete(self, path: str) -> bool:
        """Delete the resource at *path*.  Returns ``True`` on success."""
        ...

    @abstractmethod
    def list_objects(self, prefix: str = "") -> list[str]:
        """List object paths under *prefix*.

        Returns a list of relative paths.  Empty list on failure or when
        no objects match.
        """
        ...

    @abstractmethod
    def exists(self, path: str) -> bool:
        """Return ``True`` if *path* exists in the backend."""
        ...

write(data, path, format) abstractmethod

Write data to path in the given format.

Returns True on success, False on failure.

Source code in packages/dataenginex/src/dataenginex/core/medallion_architecture.py
@abstractmethod
def write(self, data: Any, path: str, format: StorageFormat) -> bool:
    """Write *data* to *path* in the given format.

    Returns ``True`` on success, ``False`` on failure.
    """
    ...

read(path, format) abstractmethod

Read data from path. Returns None on failure.

Source code in packages/dataenginex/src/dataenginex/core/medallion_architecture.py
@abstractmethod
def read(self, path: str, format: StorageFormat) -> Any:
    """Read data from *path*.  Returns ``None`` on failure."""
    ...

delete(path) abstractmethod

Delete the resource at path. Returns True on success.

Source code in packages/dataenginex/src/dataenginex/core/medallion_architecture.py
@abstractmethod
def delete(self, path: str) -> bool:
    """Delete the resource at *path*.  Returns ``True`` on success."""
    ...

list_objects(prefix='') abstractmethod

List object paths under prefix.

Returns a list of relative paths. Empty list on failure or when no objects match.

Source code in packages/dataenginex/src/dataenginex/core/medallion_architecture.py
@abstractmethod
def list_objects(self, prefix: str = "") -> list[str]:
    """List object paths under *prefix*.

    Returns a list of relative paths.  Empty list on failure or when
    no objects match.
    """
    ...

exists(path) abstractmethod

Return True if path exists in the backend.

Source code in packages/dataenginex/src/dataenginex/core/medallion_architecture.py
@abstractmethod
def exists(self, path: str) -> bool:
    """Return ``True`` if *path* exists in the backend."""
    ...
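
As a rough illustration of the interface above, here is a minimal sketch of a custom backend that keeps objects in a dictionary. So that it runs without the library installed, it declares local stand-ins for StorageBackend and StorageFormat that mirror the documented signatures; in real code you would import them instead. InMemoryStorage is a hypothetical name, and the JSON member of StorageFormat is an assumption (only PARQUET appears in this page).

```python
from __future__ import annotations

from abc import ABC, abstractmethod
from enum import Enum
from typing import Any


class StorageFormat(Enum):
    """Local stand-in for the library enum; the JSON member is assumed."""

    PARQUET = "parquet"
    JSON = "json"


class StorageBackend(ABC):
    """Local stand-in mirroring the abstract interface documented above."""

    @abstractmethod
    def write(self, data: Any, path: str, format: StorageFormat) -> bool: ...

    @abstractmethod
    def read(self, path: str, format: StorageFormat) -> Any: ...

    @abstractmethod
    def delete(self, path: str) -> bool: ...

    @abstractmethod
    def list_objects(self, prefix: str = "") -> list[str]: ...

    @abstractmethod
    def exists(self, path: str) -> bool: ...


class InMemoryStorage(StorageBackend):
    """Hypothetical backend that stores objects in a dict (useful in tests)."""

    def __init__(self) -> None:
        self._objects: dict[str, Any] = {}

    def write(self, data: Any, path: str, format: StorageFormat = StorageFormat.JSON) -> bool:
        self._objects[path] = data
        return True

    def read(self, path: str, format: StorageFormat = StorageFormat.JSON) -> Any:
        # Missing paths yield None, matching the documented failure value.
        return self._objects.get(path)

    def delete(self, path: str) -> bool:
        if path in self._objects:
            del self._objects[path]
            return True
        return False

    def list_objects(self, prefix: str = "") -> list[str]:
        return sorted(p for p in self._objects if p.startswith(prefix))

    def exists(self, path: str) -> bool:
        return path in self._objects


store = InMemoryStorage()
store.write([{"id": 1}], "bronze/events", StorageFormat.JSON)
print(store.list_objects("bronze/"))  # ['bronze/events']
```

Unlike JsonStorage.delete, this sketch reports False when the path was already absent; the abstract contract only says that True means success, so either reading seems defensible.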

CatalogEntry dataclass

Metadata about a single dataset in the lakehouse.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| name | str | Unique dataset name. |
| layer | str | Medallion layer ("bronze", "silver", "gold"). |
| format | str | Storage format ("parquet", "json", "delta"). |
| location | str | File path or table reference. |
| record_count | int | Approximate number of records. |
| schema_fields | list[str] | List of column/field names. |
| description | str | Human-readable dataset description. |
| owner | str | Team or user responsible for the dataset. |
| tags | list[str] | Arbitrary labels for discovery. |
| created_at | datetime | When the dataset was first registered. |
| updated_at | datetime | When the entry was last updated. |
| metadata | dict[str, Any] | Free-form context dict. |
| version | int | Auto-incremented version counter. |

Source code in packages/dataenginex/src/dataenginex/lakehouse/catalog.py
@dataclass
class CatalogEntry:
    """Metadata about a single dataset in the lakehouse.

    Attributes:
        name: Unique dataset name.
        layer: Medallion layer (``"bronze"``, ``"silver"``, ``"gold"``).
        format: Storage format (``"parquet"``, ``"json"``, ``"delta"``).
        location: File path or table reference.
        record_count: Approximate number of records.
        schema_fields: List of column/field names.
        description: Human-readable dataset description.
        owner: Team or user responsible for the dataset.
        tags: Arbitrary labels for discovery.
        created_at: When the dataset was first registered.
        updated_at: When the entry was last updated.
        metadata: Free-form context dict.
        version: Auto-incremented version counter.
    """

    name: str
    layer: str  # "bronze", "silver", "gold"
    format: str  # "parquet", "json", "delta"
    location: str  # file path or table ref
    record_count: int = 0
    schema_fields: list[str] = field(default_factory=list)
    description: str = ""
    owner: str = ""
    tags: list[str] = field(default_factory=list)
    created_at: datetime = field(default_factory=lambda: datetime.now(tz=UTC))
    updated_at: datetime = field(default_factory=lambda: datetime.now(tz=UTC))
    metadata: dict[str, Any] = field(default_factory=dict)
    version: int = 1

    def to_dict(self) -> dict[str, Any]:
        """Serialize the catalog entry to a plain dictionary."""
        d = asdict(self)
        d["created_at"] = self.created_at.isoformat()
        d["updated_at"] = self.updated_at.isoformat()
        return d

to_dict()

Serialize the catalog entry to a plain dictionary.

Source code in packages/dataenginex/src/dataenginex/lakehouse/catalog.py
def to_dict(self) -> dict[str, Any]:
    """Serialize the catalog entry to a plain dictionary."""
    d = asdict(self)
    d["created_at"] = self.created_at.isoformat()
    d["updated_at"] = self.updated_at.isoformat()
    return d
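
The effect of to_dict on the timestamp fields can be sketched as follows. Entry is a trimmed, local stand-in for CatalogEntry (only a few fields) so the snippet runs on its own, and entry_from_dict is a hypothetical helper, not part of the library, showing one way to turn the ISO strings back into datetime objects.

```python
from __future__ import annotations

import json
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from typing import Any


def _utc_now() -> datetime:
    return datetime.now(tz=timezone.utc)


@dataclass
class Entry:
    """Trimmed stand-in for CatalogEntry, for illustration only."""

    name: str
    layer: str
    tags: list[str] = field(default_factory=list)
    created_at: datetime = field(default_factory=_utc_now)
    updated_at: datetime = field(default_factory=_utc_now)

    def to_dict(self) -> dict[str, Any]:
        # Same approach as the documented method: asdict, then ISO strings.
        d = asdict(self)
        d["created_at"] = self.created_at.isoformat()
        d["updated_at"] = self.updated_at.isoformat()
        return d


def entry_from_dict(raw: dict[str, Any]) -> Entry:
    """Hypothetical inverse of to_dict: parse the ISO strings back."""
    data = dict(raw)
    for key in ("created_at", "updated_at"):
        if isinstance(data.get(key), str):
            data[key] = datetime.fromisoformat(data[key])
    return Entry(**data)


entry = Entry(name="orders", layer="silver", tags=["sales"])
payload = json.dumps(entry.to_dict())  # JSON-safe: timestamps are strings
restored = entry_from_dict(json.loads(payload))
print(restored == entry)  # True
```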

DataCatalog

In-process data catalog backed by an optional JSON file.

Parameters

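persist_path: When set, catalog entries are written to this JSON file after every register and delete call, and loaded from it on construction if the file already exists. Loading discards the stored created_at and updated_at values, so reloaded entries carry the load time in both fields.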

Source code in packages/dataenginex/src/dataenginex/lakehouse/catalog.py
class DataCatalog:
    """In-process data catalog backed by an optional JSON file.

    Parameters
    ----------
    persist_path:
        When set, catalog entries are persisted to this JSON file.
    """

    def __init__(self, persist_path: str | Path | None = None) -> None:
        self._entries: dict[str, CatalogEntry] = {}
        self._persist_path = Path(persist_path) if persist_path else None
        if self._persist_path and self._persist_path.exists():
            self._load()

    # -- public API ----------------------------------------------------------

    def register(self, entry: CatalogEntry) -> CatalogEntry:
        """Register or update a dataset entry."""
        existing = self._entries.get(entry.name)
        if existing:
            entry.version = existing.version + 1
            entry.created_at = existing.created_at
        entry.updated_at = datetime.now(tz=UTC)
        self._entries[entry.name] = entry
        logger.info(
            "Catalog registered: %s (layer=%s, v%d)",
            entry.name,
            entry.layer,
            entry.version,
        )
        self._save()
        return entry

    def get(self, name: str) -> CatalogEntry | None:
        """Retrieve an entry by name."""
        return self._entries.get(name)

    def search(
        self,
        *,
        layer: str | None = None,
        tags: list[str] | None = None,
        owner: str | None = None,
        name_contains: str | None = None,
    ) -> list[CatalogEntry]:
        """Search entries by criteria."""
        results = list(self._entries.values())
        if layer:
            results = [e for e in results if e.layer == layer]
        if tags:
            tag_set = set(tags)
            results = [e for e in results if tag_set.issubset(set(e.tags))]
        if owner:
            results = [e for e in results if e.owner == owner]
        if name_contains:
            results = [e for e in results if name_contains.lower() in e.name.lower()]
        return results

    def list_all(self) -> list[CatalogEntry]:
        """Return all catalog entries."""
        return list(self._entries.values())

    def delete(self, name: str) -> bool:
        """Remove an entry by name."""
        if name in self._entries:
            del self._entries[name]
            self._save()
            return True
        return False

    def summary(self) -> dict[str, Any]:
        """High-level catalog statistics."""
        layers: dict[str, int] = {}
        formats: dict[str, int] = {}
        for e in self._entries.values():
            layers[e.layer] = layers.get(e.layer, 0) + 1
            formats[e.format] = formats.get(e.format, 0) + 1
        return {
            "total_datasets": len(self._entries),
            "by_layer": layers,
            "by_format": formats,
        }

    # -- persistence ---------------------------------------------------------

    def _save(self) -> None:
        if not self._persist_path:
            return
        self._persist_path.parent.mkdir(parents=True, exist_ok=True)
        data = [e.to_dict() for e in self._entries.values()]
        self._persist_path.write_text(json.dumps(data, indent=2, default=str))

    def _load(self) -> None:
        if not self._persist_path or not self._persist_path.exists():
            return
        raw = json.loads(self._persist_path.read_text())
        for item in raw:
            item.pop("created_at", None)
            item.pop("updated_at", None)
            entry = CatalogEntry(**item)
            self._entries[entry.name] = entry
        logger.info("Loaded %d catalog entries from %s", len(self._entries), self._persist_path)

register(entry)

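Register a dataset entry, or update the existing entry with the same name. On update, version is set to the existing version plus one and the original created_at is kept. updated_at is always set to the current UTC time. The catalog is saved afterwards when persist_path is set. Returns the entry that was passed in, with those fields updated in place.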

Source code in packages/dataenginex/src/dataenginex/lakehouse/catalog.py
def register(self, entry: CatalogEntry) -> CatalogEntry:
    """Register or update a dataset entry."""
    existing = self._entries.get(entry.name)
    if existing:
        entry.version = existing.version + 1
        entry.created_at = existing.created_at
    entry.updated_at = datetime.now(tz=UTC)
    self._entries[entry.name] = entry
    logger.info(
        "Catalog registered: %s (layer=%s, v%d)",
        entry.name,
        entry.layer,
        entry.version,
    )
    self._save()
    return entry
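
The versioning behaviour of register can be sketched in isolation. The snippet below copies the logic shown above onto a plain dict, leaving out logging and persistence; Entry is a trimmed local stand-in for CatalogEntry and the free function register is illustrative only.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime, timezone


def _utc_now() -> datetime:
    return datetime.now(tz=timezone.utc)


@dataclass
class Entry:
    """Trimmed stand-in for CatalogEntry, for illustration only."""

    name: str
    location: str
    version: int = 1
    created_at: datetime = field(default_factory=_utc_now)
    updated_at: datetime = field(default_factory=_utc_now)


def register(entries: dict[str, Entry], entry: Entry) -> Entry:
    """Same steps as DataCatalog.register, minus logging and saving."""
    existing = entries.get(entry.name)
    if existing:
        entry.version = existing.version + 1    # bump the version
        entry.created_at = existing.created_at  # keep first registration time
    entry.updated_at = _utc_now()
    entries[entry.name] = entry
    return entry


catalog: dict[str, Entry] = {}
v1 = register(catalog, Entry(name="orders", location="data/orders_v1"))
v2 = register(catalog, Entry(name="orders", location="data/orders_v2"))
print(v1.version, v2.version)            # 1 2
print(v2.created_at == v1.created_at)    # True
```

Note that any version value set on the incoming entry is overwritten when an entry with the same name already exists.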

get(name)

Retrieve an entry by name.

Source code in packages/dataenginex/src/dataenginex/lakehouse/catalog.py
def get(self, name: str) -> CatalogEntry | None:
    """Retrieve an entry by name."""
    return self._entries.get(name)

search(*, layer=None, tags=None, owner=None, name_contains=None)

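Search entries by keyword criteria. All supplied filters must match: layer and owner are compared exactly, tags requires every listed tag to be present on the entry, and name_contains is a case-insensitive substring match. Filters left as None (or empty) are ignored, so calling search() with no arguments returns every entry.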

Source code in packages/dataenginex/src/dataenginex/lakehouse/catalog.py
def search(
    self,
    *,
    layer: str | None = None,
    tags: list[str] | None = None,
    owner: str | None = None,
    name_contains: str | None = None,
) -> list[CatalogEntry]:
    """Search entries by criteria."""
    results = list(self._entries.values())
    if layer:
        results = [e for e in results if e.layer == layer]
    if tags:
        tag_set = set(tags)
        results = [e for e in results if tag_set.issubset(set(e.tags))]
    if owner:
        results = [e for e in results if e.owner == owner]
    if name_contains:
        results = [e for e in results if name_contains.lower() in e.name.lower()]
    return results
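
The filter semantics of search can be tried out with a small standalone sketch. Entry is a trimmed local stand-in for CatalogEntry, and the free function search repeats the filtering logic shown above over a plain list; the dataset names and owners are made up for the example.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class Entry:
    """Trimmed stand-in for CatalogEntry, for illustration only."""

    name: str
    layer: str
    owner: str = ""
    tags: list[str] = field(default_factory=list)


def search(
    entries: list[Entry],
    *,
    layer: str | None = None,
    tags: list[str] | None = None,
    owner: str | None = None,
    name_contains: str | None = None,
) -> list[Entry]:
    """Apply each supplied filter in turn; all of them must match."""
    results = list(entries)
    if layer:
        results = [e for e in results if e.layer == layer]
    if tags:
        wanted = set(tags)
        results = [e for e in results if wanted.issubset(e.tags)]
    if owner:
        results = [e for e in results if e.owner == owner]
    if name_contains:
        needle = name_contains.lower()
        results = [e for e in results if needle in e.name.lower()]
    return results


entries = [
    Entry("raw_orders", "bronze", "sales-team", ["orders", "pii"]),
    Entry("orders_clean", "silver", "sales-team", ["orders"]),
    Entry("web_events", "bronze", "web-team", ["events"]),
]

print([e.name for e in search(entries, layer="bronze")])
# ['raw_orders', 'web_events']
print([e.name for e in search(entries, tags=["orders", "pii"])])
# ['raw_orders']
print([e.name for e in search(entries, name_contains="ORDERS", owner="sales-team")])
# ['raw_orders', 'orders_clean']
```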

list_all()

Return all catalog entries.

Source code in packages/dataenginex/src/dataenginex/lakehouse/catalog.py
def list_all(self) -> list[CatalogEntry]:
    """Return all catalog entries."""
    return list(self._entries.values())

delete(name)

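Remove an entry by name. Returns True if the entry existed and was removed (the catalog is then saved when persist_path is set), False otherwise.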

Source code in packages/dataenginex/src/dataenginex/lakehouse/catalog.py
def delete(self, name: str) -> bool:
    """Remove an entry by name."""
    if name in self._entries:
        del self._entries[name]
        self._save()
        return True
    return False

summary()

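Return high-level catalog statistics: the total number of datasets plus per-layer and per-format counts, under the keys total_datasets, by_layer, and by_format.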

Source code in packages/dataenginex/src/dataenginex/lakehouse/catalog.py
def summary(self) -> dict[str, Any]:
    """High-level catalog statistics."""
    layers: dict[str, int] = {}
    formats: dict[str, int] = {}
    for e in self._entries.values():
        layers[e.layer] = layers.get(e.layer, 0) + 1
        formats[e.format] = formats.get(e.format, 0) + 1
    return {
        "total_datasets": len(self._entries),
        "by_layer": layers,
        "by_format": formats,
    }

DatePartitioner

Bases: PartitionStrategy

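Partition by a date field using a year=…/month=…/day=… layout.

If the field is missing, is neither a datetime nor an ISO-format string, or cannot be parsed, the current UTC time is used instead. This also applies to plain date objects, which are not datetime instances.

Parameters

date_field: Name of the record field containing a datetime value or an ISO-format date string (default "created_at").
granularity: "day" (default), "month", or "year". Any other value raises ValueError.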

Source code in packages/dataenginex/src/dataenginex/lakehouse/partitioning.py
class DatePartitioner(PartitionStrategy):
    """Partition by a date field using ``year=…/month=…/day=…`` layout.

    Parameters
    ----------
    date_field:
        Name of the record field containing a date/datetime value.
    granularity:
        ``"day"`` (default), ``"month"``, or ``"year"``.
    """

    def __init__(self, date_field: str = "created_at", granularity: str = "day") -> None:
        self.date_field = date_field
        if granularity not in ("day", "month", "year"):
            raise ValueError(f"granularity must be day/month/year, got {granularity!r}")
        self.granularity = granularity

    def partition_key(self, record: dict[str, Any]) -> str:
        """Return a ``year=.../month=.../day=...`` key for *record*."""
        dt = self._extract_date(record)
        parts = [f"year={dt.year}"]
        if self.granularity in ("month", "day"):
            parts.append(f"month={dt.month:02d}")
        if self.granularity == "day":
            parts.append(f"day={dt.day:02d}")
        return "/".join(parts)

    def partition_path(self, record: dict[str, Any], base: str = "") -> str:
        """Return *base* joined with the date partition key."""
        key = self.partition_key(record)
        return f"{base}/{key}" if base else key

    def _extract_date(self, record: dict[str, Any]) -> datetime:
        value = record.get(self.date_field)
        if isinstance(value, datetime):
            return value
        if isinstance(value, str):
            # Try ISO format
            try:
                return datetime.fromisoformat(value)
            except ValueError:
                pass
        # Fallback to now
        return datetime.now(tz=UTC)

partition_key(record)

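Return a year=.../month=.../day=... key for record, truncated to the configured granularity (for example, only year=... when granularity is "year"). Month and day are zero-padded to two digits.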

Source code in packages/dataenginex/src/dataenginex/lakehouse/partitioning.py
def partition_key(self, record: dict[str, Any]) -> str:
    """Return a ``year=.../month=.../day=...`` key for *record*."""
    dt = self._extract_date(record)
    parts = [f"year={dt.year}"]
    if self.granularity in ("month", "day"):
        parts.append(f"month={dt.month:02d}")
    if self.granularity == "day":
        parts.append(f"day={dt.day:02d}")
    return "/".join(parts)

partition_path(record, base='')

Return base joined with the date partition key.

Source code in packages/dataenginex/src/dataenginex/lakehouse/partitioning.py
def partition_path(self, record: dict[str, Any], base: str = "") -> str:
    """Return *base* joined with the date partition key."""
    key = self.partition_key(record)
    return f"{base}/{key}" if base else key
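
To make the key layout concrete, the sketch below repeats the DatePartitioner logic as a single standalone function. date_partition_key is a hypothetical name used only here; with the library installed you would call DatePartitioner(...).partition_key(record) instead.

```python
from __future__ import annotations

from datetime import datetime, timezone
from typing import Any


def date_partition_key(
    record: dict[str, Any],
    date_field: str = "created_at",
    granularity: str = "day",
) -> str:
    """Build a year=/month=/day= key the same way DatePartitioner does."""
    if granularity not in ("day", "month", "year"):
        raise ValueError(f"granularity must be day/month/year, got {granularity!r}")

    value = record.get(date_field)
    dt = None
    if isinstance(value, datetime):
        dt = value
    elif isinstance(value, str):
        try:
            dt = datetime.fromisoformat(value)
        except ValueError:
            dt = None
    if dt is None:
        # Missing or unparsable values fall back to the current UTC time.
        dt = datetime.now(tz=timezone.utc)

    parts = [f"year={dt.year}"]
    if granularity in ("month", "day"):
        parts.append(f"month={dt.month:02d}")
    if granularity == "day":
        parts.append(f"day={dt.day:02d}")
    return "/".join(parts)


print(date_partition_key({"created_at": "2024-03-05T10:30:00"}))
# year=2024/month=03/day=05
print(date_partition_key({"created_at": datetime(2023, 12, 1)}, granularity="month"))
# year=2023/month=12
```

Because of the fallback, a record with a malformed date is silently filed under today's partition, so validating the date field upstream may be worthwhile.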

HashPartitioner

Bases: PartitionStrategy

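Partition by a hash of one or more fields, distributing records across n_buckets.

Parameters

fields: Record fields whose values are hashed. At least one field is required; an empty list raises ValueError.
n_buckets: Number of hash buckets (directories). Defaults to 16; values below 1 are treated as 1.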

Source code in packages/dataenginex/src/dataenginex/lakehouse/partitioning.py
class HashPartitioner(PartitionStrategy):
    """Partition by a hash of one or more fields, distributing across *n_buckets*.

    Parameters
    ----------
    fields:
        Record fields whose values are hashed.
    n_buckets:
        Number of hash buckets (directories).
    """

    def __init__(self, fields: list[str], n_buckets: int = 16) -> None:
        if not fields:
            raise ValueError("At least one field is required for hash partitioning")
        self.fields = fields
        self.n_buckets = max(1, n_buckets)

    def partition_key(self, record: dict[str, Any]) -> str:
        """Return a ``bucket=NNNN`` key based on hashed field values."""
        content = "|".join(str(record.get(f, "")) for f in self.fields)
        digest = hashlib.md5(content.encode()).hexdigest()  # noqa: S324
        bucket = int(digest, 16) % self.n_buckets
        return f"bucket={bucket:04d}"

    def partition_path(self, record: dict[str, Any], base: str = "") -> str:
        """Return *base* joined with the hash-bucket partition key."""
        key = self.partition_key(record)
        return f"{base}/{key}" if base else key

partition_key(record)

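Return a bucket=NNNN key based on hashed field values. The values of the configured fields are converted to strings (missing fields become empty strings), joined with "|", and hashed with MD5; the bucket number is the digest modulo n_buckets, zero-padded to four digits.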

Source code in packages/dataenginex/src/dataenginex/lakehouse/partitioning.py
def partition_key(self, record: dict[str, Any]) -> str:
    """Return a ``bucket=NNNN`` key based on hashed field values."""
    content = "|".join(str(record.get(f, "")) for f in self.fields)
    digest = hashlib.md5(content.encode()).hexdigest()  # noqa: S324
    bucket = int(digest, 16) % self.n_buckets
    return f"bucket={bucket:04d}"

partition_path(record, base='')

Return base joined with the hash-bucket partition key.

Source code in packages/dataenginex/src/dataenginex/lakehouse/partitioning.py
def partition_path(self, record: dict[str, Any], base: str = "") -> str:
    """Return *base* joined with the hash-bucket partition key."""
    key = self.partition_key(record)
    return f"{base}/{key}" if base else key
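
The bucket computation can be reproduced with the standard library alone. The function below is a sketch that repeats the HashPartitioner steps; hash_partition_key is a hypothetical name, and the sample records are made up.

```python
from __future__ import annotations

import hashlib
from typing import Any


def hash_partition_key(record: dict[str, Any], fields: list[str], n_buckets: int = 16) -> str:
    """Map the chosen field values to a stable bucket=NNNN key."""
    if not fields:
        raise ValueError("At least one field is required for hash partitioning")
    n_buckets = max(1, n_buckets)  # same clamping as the constructor

    # Missing fields contribute an empty string to the hashed content.
    content = "|".join(str(record.get(f, "")) for f in fields)
    digest = hashlib.md5(content.encode()).hexdigest()  # not used for security
    bucket = int(digest, 16) % n_buckets
    return f"bucket={bucket:04d}"


a = hash_partition_key({"user_id": 42, "country": "DE"}, ["user_id", "country"])
b = hash_partition_key({"user_id": 42, "country": "DE"}, ["user_id", "country"])
print(a == b)  # True: the same values always land in the same bucket
print(hash_partition_key({"user_id": 42}, ["user_id"], n_buckets=1))  # bucket=0000
```

Since MD5 of the same bytes is the same on every machine and run, the mapping is stable across processes, unlike Python's built-in hash() for strings.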

PartitionStrategy

Bases: ABC

Base class for partitioning strategies.

Source code in packages/dataenginex/src/dataenginex/lakehouse/partitioning.py
class PartitionStrategy(ABC):
    """Base class for partitioning strategies."""

    @abstractmethod
    def partition_key(self, record: dict[str, Any]) -> str:
        """Return the partition path segment for *record*."""
        ...

    @abstractmethod
    def partition_path(self, record: dict[str, Any], base: str = "") -> str:
        """Return the full relative path (base + partition) for *record*."""
        ...

partition_key(record) abstractmethod

Return the partition path segment for record.

Source code in packages/dataenginex/src/dataenginex/lakehouse/partitioning.py
@abstractmethod
def partition_key(self, record: dict[str, Any]) -> str:
    """Return the partition path segment for *record*."""
    ...

partition_path(record, base='') abstractmethod

Return the full relative path (base + partition) for record.

Source code in packages/dataenginex/src/dataenginex/lakehouse/partitioning.py
@abstractmethod
def partition_path(self, record: dict[str, Any], base: str = "") -> str:
    """Return the full relative path (base + partition) for *record*."""
    ...
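
A custom strategy only needs to implement the two abstract methods. The sketch below partitions by the raw value of one field; FieldValuePartitioner is a hypothetical class, and PartitionStrategy is redeclared locally as a stand-in so the snippet runs without the library.

```python
from __future__ import annotations

from abc import ABC, abstractmethod
from typing import Any


class PartitionStrategy(ABC):
    """Local stand-in mirroring the abstract base class documented above."""

    @abstractmethod
    def partition_key(self, record: dict[str, Any]) -> str: ...

    @abstractmethod
    def partition_path(self, record: dict[str, Any], base: str = "") -> str: ...


class FieldValuePartitioner(PartitionStrategy):
    """Hypothetical strategy: one partition per distinct field value."""

    def __init__(self, field_name: str, default: str = "unknown") -> None:
        self.field_name = field_name
        self.default = default

    def partition_key(self, record: dict[str, Any]) -> str:
        value = record.get(self.field_name, self.default)
        return f"{self.field_name}={value}"

    def partition_path(self, record: dict[str, Any], base: str = "") -> str:
        # Same joining rule as the built-in partitioners.
        key = self.partition_key(record)
        return f"{base}/{key}" if base else key


p = FieldValuePartitioner("country")
print(p.partition_key({"country": "US"}))            # country=US
print(p.partition_path({"country": "US"}, "events")) # events/country=US
print(p.partition_key({}))                           # country=unknown
```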

GCSStorage

Bases: StorageBackend

Google Cloud Storage backend.

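Reads and writes JSON-serialised records in a GCS bucket. Requires google-cloud-storage at runtime; if the package is not installed, the constructor logs a warning and every operation fails (returning False, None, or an empty list).

Parameters

bucket: GCS bucket name.
prefix: Key prefix for all objects (default ""). Trailing slashes are stripped.
project: GCP project ID (optional, uses ADC default).
api_endpoint: Custom API endpoint for GCS-compatible services (e.g. fake-gcs-server). When None, the default Google endpoint is used. When set, anonymous credentials are used and project defaults to "test-project".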

Source code in packages/dataenginex/src/dataenginex/lakehouse/storage.py
class GCSStorage(StorageBackend):
    """Google Cloud Storage backend.

    Reads/writes JSON-serialised records to a GCS bucket.  Requires
    ``google-cloud-storage`` at runtime.

    Parameters
    ----------
    bucket:
        GCS bucket name.
    prefix:
        Key prefix for all objects (default ``""``).
    project:
        GCP project ID (optional, uses ADC default).
    api_endpoint:
        Custom API endpoint for GCS-compatible services (e.g.
        ``fake-gcs-server``).  When ``None``, the default Google
        endpoint is used.
    """

    def __init__(
        self,
        bucket: str,
        prefix: str = "",
        project: str | None = None,
        api_endpoint: str | None = None,
    ) -> None:
        self.bucket_name = bucket
        self.prefix = prefix.rstrip("/")

        if not _HAS_GCS:
            logger.warning("google-cloud-storage not installed — GCSStorage operations will fail")
            self._bucket = None
        else:
            from google.auth import credentials as ga_credentials

            if api_endpoint:
                # Use anonymous credentials for local emulators
                anon = ga_credentials.AnonymousCredentials()  # type: ignore[no-untyped-call]
                client = gcs_storage.Client(
                    project=project or "test-project",
                    credentials=anon,
                )
                client._connection.API_BASE_URL = api_endpoint
            else:
                client = gcs_storage.Client(project=project)
            self._bucket = client.bucket(bucket)
            logger.info("GCSStorage initialised: gs://%s/%s", bucket, prefix)

    def _blob_name(self, path: str) -> str:
        return f"{self.prefix}/{path}" if self.prefix else path

    def write(
        self,
        data: Any,
        path: str,
        format: StorageFormat = StorageFormat.PARQUET,
    ) -> bool:
        """Write JSON-serialised data to GCS."""
        if self._bucket is None:
            logger.error("GCSStorage: google-cloud-storage not available")
            return False
        try:
            records = data if isinstance(data, list) else [data]
            body = json.dumps(records, default=str)
            blob = self._bucket.blob(self._blob_name(f"{path}.json"))
            blob.upload_from_string(body, content_type="application/json")
            logger.info(
                "Wrote %d records to gs://%s/%s",
                len(records),
                self.bucket_name,
                blob.name,
            )
            return True
        except Exception as exc:
            logger.error("GCSStorage write failed: %s", exc)
            return False

    def read(self, path: str, format: StorageFormat = StorageFormat.PARQUET) -> Any:
        """Read JSON data from GCS."""
        if self._bucket is None:
            logger.error("GCSStorage: google-cloud-storage not available")
            return None
        try:
            blob = self._bucket.blob(self._blob_name(f"{path}.json"))
            body = blob.download_as_text()
            return json.loads(body)
        except Exception as exc:
            logger.error("GCSStorage read failed: %s", exc)
            return None

    def delete(self, path: str) -> bool:
        """Delete object from GCS."""
        if self._bucket is None:
            logger.error("GCSStorage: google-cloud-storage not available")
            return False
        try:
            blob = self._bucket.blob(self._blob_name(f"{path}.json"))
            blob.delete()
            logger.info("Deleted gs://%s/%s", self.bucket_name, blob.name)
            return True
        except Exception as exc:
            logger.error("GCSStorage delete failed: %s", exc)
            return False

    def list_objects(self, prefix: str = "") -> list[str]:
        """List GCS objects under *prefix*."""
        if self._bucket is None:
            return []
        try:
            full_prefix = self._blob_name(prefix)
            return [blob.name for blob in self._bucket.list_blobs(prefix=full_prefix)]
        except Exception as exc:
            logger.error("GCSStorage list_objects failed: %s", exc)
            return []

    def exists(self, path: str) -> bool:
        """Return ``True`` if *path* exists in GCS."""
        if self._bucket is None:
            return False
        try:
            result: bool = self._bucket.blob(self._blob_name(f"{path}.json")).exists()
            return result
        except Exception:
            return False

write(data, path, format=StorageFormat.PARQUET)

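Write data to GCS as JSON. The format argument is ignored: data is always serialised as a JSON array (a non-list value is wrapped in a one-element list) and uploaded to the object <prefix>/<path>.json. Returns True on success, False on failure.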

Source code in packages/dataenginex/src/dataenginex/lakehouse/storage.py
def write(
    self,
    data: Any,
    path: str,
    format: StorageFormat = StorageFormat.PARQUET,
) -> bool:
    """Write JSON-serialised data to GCS."""
    if self._bucket is None:
        logger.error("GCSStorage: google-cloud-storage not available")
        return False
    try:
        records = data if isinstance(data, list) else [data]
        body = json.dumps(records, default=str)
        blob = self._bucket.blob(self._blob_name(f"{path}.json"))
        blob.upload_from_string(body, content_type="application/json")
        logger.info(
            "Wrote %d records to gs://%s/%s",
            len(records),
            self.bucket_name,
            blob.name,
        )
        return True
    except Exception as exc:
        logger.error("GCSStorage write failed: %s", exc)
        return False

read(path, format=StorageFormat.PARQUET)

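Read JSON data from GCS. The format argument is ignored: the object <prefix>/<path>.json is downloaded and parsed. Returns None on failure.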

Source code in packages/dataenginex/src/dataenginex/lakehouse/storage.py
def read(self, path: str, format: StorageFormat = StorageFormat.PARQUET) -> Any:
    """Read JSON data from GCS."""
    if self._bucket is None:
        logger.error("GCSStorage: google-cloud-storage not available")
        return None
    try:
        blob = self._bucket.blob(self._blob_name(f"{path}.json"))
        body = blob.download_as_text()
        return json.loads(body)
    except Exception as exc:
        logger.error("GCSStorage read failed: %s", exc)
        return None

delete(path)

Delete the object <prefix>/<path>.json from GCS. Returns True on success, False on failure.

Source code in packages/dataenginex/src/dataenginex/lakehouse/storage.py
def delete(self, path: str) -> bool:
    """Delete object from GCS."""
    if self._bucket is None:
        logger.error("GCSStorage: google-cloud-storage not available")
        return False
    try:
        blob = self._bucket.blob(self._blob_name(f"{path}.json"))
        blob.delete()
        logger.info("Deleted gs://%s/%s", self.bucket_name, blob.name)
        return True
    except Exception as exc:
        logger.error("GCSStorage delete failed: %s", exc)
        return False

list_objects(prefix='')

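List GCS objects under prefix. The returned names are full object names, including the storage prefix and the .json suffix, so they cannot be passed unchanged as the path argument of read, delete, or exists.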

Source code in packages/dataenginex/src/dataenginex/lakehouse/storage.py
def list_objects(self, prefix: str = "") -> list[str]:
    """List GCS objects under *prefix*."""
    if self._bucket is None:
        return []
    try:
        full_prefix = self._blob_name(prefix)
        return [blob.name for blob in self._bucket.list_blobs(prefix=full_prefix)]
    except Exception as exc:
        logger.error("GCSStorage list_objects failed: %s", exc)
        return []

exists(path)

Return True if the object <prefix>/<path>.json exists in GCS. Any error raised by the client is swallowed and reported as False.

Source code in packages/dataenginex/src/dataenginex/lakehouse/storage.py
def exists(self, path: str) -> bool:
    """Return ``True`` if *path* exists in GCS."""
    if self._bucket is None:
        return False
    try:
        result: bool = self._bucket.blob(self._blob_name(f"{path}.json")).exists()
        return result
    except Exception:
        return False
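
The mapping between the logical path passed to write, read, delete, and exists and the actual object name can be sketched with two pure functions. gcs_object_name repeats the naming rule visible in the source above; logical_path is a hypothetical helper, not part of the library, for turning a name returned by list_objects back into a path the other methods accept.

```python
def gcs_object_name(prefix: str, path: str) -> str:
    """Object name used for a logical path (prefix + path + '.json')."""
    prefix = prefix.rstrip("/")  # the constructor strips trailing slashes
    name = f"{path}.json"
    return f"{prefix}/{name}" if prefix else name


def logical_path(object_name: str, prefix: str) -> str:
    """Hypothetical inverse: strip the prefix and the '.json' suffix."""
    prefix = prefix.rstrip("/")
    if prefix and object_name.startswith(prefix + "/"):
        object_name = object_name[len(prefix) + 1:]
    if object_name.endswith(".json"):
        object_name = object_name[: -len(".json")]
    return object_name


print(gcs_object_name("lake/", "bronze/events"))       # lake/bronze/events.json
print(gcs_object_name("", "bronze/events"))            # bronze/events.json
print(logical_path("lake/bronze/events.json", "lake")) # bronze/events
```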

JsonStorage

Bases: StorageBackend

Simple JSON-file storage for development and testing.

Each write call serialises data (list of dicts) as a JSON array.

Source code in packages/dataenginex/src/dataenginex/lakehouse/storage.py
class JsonStorage(StorageBackend):
    """Simple JSON-file storage for development and testing.

    Each ``write`` call serialises *data* (list of dicts) as a JSON array.
    """

    def __init__(self, base_path: str = "data") -> None:
        self.base_path = Path(base_path)
        self.base_path.mkdir(parents=True, exist_ok=True)
        logger.info("JsonStorage initialised at %s", self.base_path)

    def write(
        self,
        data: Any,
        path: str,
        format: StorageFormat = StorageFormat.PARQUET,
    ) -> bool:
        """Serialize *data* as JSON and write to *path*."""
        try:
            full = self.base_path / f"{path}.json"
            full.parent.mkdir(parents=True, exist_ok=True)
            records = self._normalise(data)
            full.write_text(json.dumps(records, indent=2, default=str))
            logger.info("Wrote %d records to %s", len(records), full)
            return True
        except Exception as exc:
            logger.error("JsonStorage write failed: %s", exc)
            return False

    def read(self, path: str, format: StorageFormat = StorageFormat.PARQUET) -> Any:
        """Read and deserialize a JSON file at *path*."""
        try:
            full = self.base_path / f"{path}.json"
            if not full.exists():
                logger.warning("File not found: %s", full)
                return None
            return json.loads(full.read_text())
        except Exception as exc:
            logger.error("JsonStorage read failed: %s", exc)
            return None

    def delete(self, path: str) -> bool:
        """Delete the JSON file at *path* if it exists."""
        try:
            full = self.base_path / f"{path}.json"
            if full.exists():
                full.unlink()
                logger.info("Deleted %s", full)
            return True
        except Exception as exc:
            logger.error("JsonStorage delete failed: %s", exc)
            return False

    @staticmethod
    def _normalise(data: Any) -> list[dict[str, Any]]:
        if isinstance(data, list):
            return data
        if isinstance(data, dict):
            return [data]
        return [{"value": data}]

    def list_objects(self, prefix: str = "") -> list[str]:
        """List JSON entries under *prefix*."""
        target = self.base_path / prefix
        if not target.exists():
            return []
        return [
            str(p.relative_to(self.base_path).with_suffix(""))
            for p in target.rglob("*.json")
            if p.is_file()
        ]

    def exists(self, path: str) -> bool:
        """Return ``True`` if *path* has a corresponding JSON file."""
        return (self.base_path / f"{path}.json").exists()

write(data, path, format=StorageFormat.PARQUET)

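Serialize data as JSON and write it to <path>.json under base_path, creating parent directories as needed. The format argument is accepted for interface compatibility and ignored. Returns True on success; any exception is logged and reported as False.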

Source code in packages/dataenginex/src/dataenginex/lakehouse/storage.py
def write(
    self,
    data: Any,
    path: str,
    format: StorageFormat = StorageFormat.PARQUET,
) -> bool:
    """Serialize *data* as JSON and write to *path*."""
    try:
        full = self.base_path / f"{path}.json"
        full.parent.mkdir(parents=True, exist_ok=True)
        records = self._normalise(data)
        full.write_text(json.dumps(records, indent=2, default=str))
        logger.info("Wrote %d records to %s", len(records), full)
        return True
    except Exception as exc:
        logger.error("JsonStorage write failed: %s", exc)
        return False
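
The effect of normalisation and default=str on a round trip can be illustrated with a minimal standalone sketch. It does not import dataenginex; normalise and write_json are hypothetical free functions that follow the same steps as the source above, and the record contents are made up for illustration.

```python
import json
import tempfile
from datetime import date
from pathlib import Path
from typing import Any


def normalise(data: Any) -> list[dict[str, Any]]:
    """Mirror of JsonStorage._normalise: always produce a list of records."""
    if isinstance(data, list):
        return data
    if isinstance(data, dict):
        return [data]
    return [{"value": data}]


def write_json(base_path: Path, path: str, data: Any) -> Path:
    """Hypothetical helper following the same steps as JsonStorage.write."""
    full = base_path / f"{path}.json"
    full.parent.mkdir(parents=True, exist_ok=True)
    # default=str converts values json cannot encode (dates, Paths, ...) to strings.
    full.write_text(json.dumps(normalise(data), indent=2, default=str))
    return full


with tempfile.TemporaryDirectory() as tmp:
    base = Path(tmp)
    target = write_json(base, "bronze/events", {"id": 1, "day": date(2024, 1, 2)})
    round_trip = json.loads(target.read_text())
    relative_name = target.relative_to(base).as_posix()
```

Two consequences are worth keeping in mind: a single dict comes back as a one-element list, and values serialised through default=str come back as plain strings rather than their original types.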

read(path, format=StorageFormat.PARQUET)

Read and deserialize a JSON file at path.

Source code in packages/dataenginex/src/dataenginex/lakehouse/storage.py
def read(self, path: str, format: StorageFormat = StorageFormat.PARQUET) -> Any:
    """Read and deserialize a JSON file at *path*."""
    try:
        full = self.base_path / f"{path}.json"
        if not full.exists():
            logger.warning("File not found: %s", full)
            return None
        return json.loads(full.read_text())
    except Exception as exc:
        logger.error("JsonStorage read failed: %s", exc)
        return None

delete(path)

Delete the JSON file at path if it exists.

Source code in packages/dataenginex/src/dataenginex/lakehouse/storage.py
def delete(self, path: str) -> bool:
    """Delete the JSON file at *path* if it exists."""
    try:
        full = self.base_path / f"{path}.json"
        if full.exists():
            full.unlink()
            logger.info("Deleted %s", full)
        return True
    except Exception as exc:
        logger.error("JsonStorage delete failed: %s", exc)
        return False

list_objects(prefix='')

List JSON entries under prefix.

Source code in packages/dataenginex/src/dataenginex/lakehouse/storage.py
def list_objects(self, prefix: str = "") -> list[str]:
    """List JSON entries under *prefix*."""
    target = self.base_path / prefix
    if not target.exists():
        return []
    return [
        str(p.relative_to(self.base_path).with_suffix(""))
        for p in target.rglob("*.json")
        if p.is_file()
    ]
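
A small sketch of what the returned names look like, assuming a throwaway directory tree. list_json is a hypothetical free-function copy of the traversal above; sorted() and as_posix() are additions made here only so the result is deterministic and platform-independent.

```python
import tempfile
from pathlib import Path


def list_json(base_path: Path, prefix: str = "") -> list[str]:
    """Same traversal as JsonStorage.list_objects, as a free function."""
    target = base_path / prefix
    if not target.exists():
        return []
    return sorted(
        p.relative_to(base_path).with_suffix("").as_posix()
        for p in target.rglob("*.json")
        if p.is_file()
    )


with tempfile.TemporaryDirectory() as tmp:
    base = Path(tmp)
    for name in ("bronze/a.json", "bronze/2024/b.json", "silver/c.json", "bronze/notes.txt"):
        file = base / name
        file.parent.mkdir(parents=True, exist_ok=True)
        file.write_text("[]")

    bronze = list_json(base, "bronze")      # recursive, suffix stripped
    everything = list_json(base)            # empty prefix walks the whole base path
    partial = list_json(base, "bron")       # not a directory, so nothing matches
```

Note that prefix is resolved as a directory under base_path, not as a string prefix, so "bron" does not match "bronze". The names returned can be passed straight back to read, delete, or exists because the .json suffix is already removed.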

exists(path)

Return True if path has a corresponding JSON file.

Source code in packages/dataenginex/src/dataenginex/lakehouse/storage.py
def exists(self, path: str) -> bool:
    """Return ``True`` if *path* has a corresponding JSON file."""
    return (self.base_path / f"{path}.json").exists()

ParquetStorage

Bases: StorageBackend

Parquet file storage backed by pyarrow.

Falls back to JsonStorage when pyarrow is not installed.

Source code in packages/dataenginex/src/dataenginex/lakehouse/storage.py
class ParquetStorage(StorageBackend):
    """Parquet file storage backed by *pyarrow*.

    Falls back to ``JsonStorage`` when *pyarrow* is not installed.
    """

    def __init__(self, base_path: str = "data", compression: str = "snappy") -> None:
        self.base_path = Path(base_path)
        self.base_path.mkdir(parents=True, exist_ok=True)
        self.compression = compression

        if _HAS_PYARROW:
            logger.info("ParquetStorage initialised at %s (pyarrow available)", self.base_path)
        else:
            logger.warning("pyarrow not installed — ParquetStorage will use JSON fallback")
            self._fallback = JsonStorage(str(self.base_path))

    def write(
        self,
        data: Any,
        path: str,
        format: StorageFormat = StorageFormat.PARQUET,
    ) -> bool:
        """Write *data* as a Parquet file at *path*."""
        if not _HAS_PYARROW:
            return self._fallback.write(data, path, format)

        try:
            full = self.base_path / f"{path}.parquet"
            full.parent.mkdir(parents=True, exist_ok=True)
            records = self._to_records(data)
            if not records:
                logger.warning("No records to write to %s", full)
                return False
            table = pa.Table.from_pylist(records)
            pq.write_table(table, str(full), compression=self.compression)
            logger.info("Wrote %d records to %s", len(records), full)
            return True
        except Exception as exc:
            logger.error("ParquetStorage write failed: %s", exc)
            return False

    def read(self, path: str, format: StorageFormat = StorageFormat.PARQUET) -> Any:
        """Read a Parquet file at *path* and return records."""
        if not _HAS_PYARROW:
            return self._fallback.read(path, format)

        try:
            full = self.base_path / f"{path}.parquet"
            if not full.exists():
                logger.warning("Parquet file not found: %s", full)
                return None
            table = pq.read_table(str(full))
            return table.to_pylist()
        except Exception as exc:
            logger.error("ParquetStorage read failed: %s", exc)
            return None

    def delete(self, path: str) -> bool:
        """Delete the Parquet file at *path* if it exists."""
        if not _HAS_PYARROW:
            return self._fallback.delete(path)

        try:
            full = self.base_path / f"{path}.parquet"
            if full.exists():
                full.unlink()
                logger.info("Deleted %s", full)
            return True
        except Exception as exc:
            logger.error("ParquetStorage delete failed: %s", exc)
            return False

    @staticmethod
    def _to_records(data: Any) -> list[dict[str, Any]]:
        if isinstance(data, list):
            return data
        if isinstance(data, dict):
            return [data]
        return []

    def list_objects(self, prefix: str = "") -> list[str]:
        """List Parquet entries under *prefix*."""
        if not _HAS_PYARROW:
            return self._fallback.list_objects(prefix)
        target = self.base_path / prefix
        if not target.exists():
            return []
        return [
            str(p.relative_to(self.base_path).with_suffix(""))
            for p in target.rglob("*.parquet")
            if p.is_file()
        ]

    def exists(self, path: str) -> bool:
        """Return ``True`` if *path* has a corresponding Parquet file."""
        if not _HAS_PYARROW:
            return self._fallback.exists(path)
        return (self.base_path / f"{path}.parquet").exists()
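
One behavioural difference between the two local backends follows from their normalisers and may be easy to miss. The sketch below copies both helpers as free functions (the names json_normalise and parquet_to_records are illustrative only) and compares them on the same inputs.

```python
from typing import Any


def json_normalise(data: Any) -> list[dict[str, Any]]:
    """Copy of JsonStorage._normalise: scalars are wrapped in a record."""
    if isinstance(data, list):
        return data
    if isinstance(data, dict):
        return [data]
    return [{"value": data}]


def parquet_to_records(data: Any) -> list[dict[str, Any]]:
    """Copy of ParquetStorage._to_records: scalars yield no records."""
    if isinstance(data, list):
        return data
    if isinstance(data, dict):
        return [data]
    return []


scalar_as_json = json_normalise(42)
scalar_as_parquet = parquet_to_records(42)

# ParquetStorage.write returns False when the record list is empty,
# so this flag mirrors whether a Parquet write would proceed.
parquet_would_write_scalar = bool(scalar_as_parquet)
parquet_would_write_empty = bool(parquet_to_records([]))
```

With pyarrow installed, writing a scalar or an empty list therefore returns False and creates no file. When the JSON fallback is active, the same scalar is stored as a single {"value": ...} record in a .json file, so results can differ between environments.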

write(data, path, format=StorageFormat.PARQUET)

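Write data as a Parquet file at <path>.parquet under base_path. Returns False, without creating a file, when data yields no records (an empty list, or anything other than a list or dict). Without pyarrow, the call is delegated to the JsonStorage fallback.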

Source code in packages/dataenginex/src/dataenginex/lakehouse/storage.py
def write(
    self,
    data: Any,
    path: str,
    format: StorageFormat = StorageFormat.PARQUET,
) -> bool:
    """Write *data* as a Parquet file at *path*."""
    if not _HAS_PYARROW:
        return self._fallback.write(data, path, format)

    try:
        full = self.base_path / f"{path}.parquet"
        full.parent.mkdir(parents=True, exist_ok=True)
        records = self._to_records(data)
        if not records:
            logger.warning("No records to write to %s", full)
            return False
        table = pa.Table.from_pylist(records)
        pq.write_table(table, str(full), compression=self.compression)
        logger.info("Wrote %d records to %s", len(records), full)
        return True
    except Exception as exc:
        logger.error("ParquetStorage write failed: %s", exc)
        return False

read(path, format=StorageFormat.PARQUET)

Read a Parquet file at path and return records.

Source code in packages/dataenginex/src/dataenginex/lakehouse/storage.py
def read(self, path: str, format: StorageFormat = StorageFormat.PARQUET) -> Any:
    """Read a Parquet file at *path* and return records."""
    if not _HAS_PYARROW:
        return self._fallback.read(path, format)

    try:
        full = self.base_path / f"{path}.parquet"
        if not full.exists():
            logger.warning("Parquet file not found: %s", full)
            return None
        table = pq.read_table(str(full))
        return table.to_pylist()
    except Exception as exc:
        logger.error("ParquetStorage read failed: %s", exc)
        return None

delete(path)

Delete the Parquet file at path if it exists.

Source code in packages/dataenginex/src/dataenginex/lakehouse/storage.py
def delete(self, path: str) -> bool:
    """Delete the Parquet file at *path* if it exists."""
    if not _HAS_PYARROW:
        return self._fallback.delete(path)

    try:
        full = self.base_path / f"{path}.parquet"
        if full.exists():
            full.unlink()
            logger.info("Deleted %s", full)
        return True
    except Exception as exc:
        logger.error("ParquetStorage delete failed: %s", exc)
        return False

list_objects(prefix='')

List Parquet entries under prefix.

Source code in packages/dataenginex/src/dataenginex/lakehouse/storage.py
def list_objects(self, prefix: str = "") -> list[str]:
    """List Parquet entries under *prefix*."""
    if not _HAS_PYARROW:
        return self._fallback.list_objects(prefix)
    target = self.base_path / prefix
    if not target.exists():
        return []
    return [
        str(p.relative_to(self.base_path).with_suffix(""))
        for p in target.rglob("*.parquet")
        if p.is_file()
    ]

exists(path)

Return True if path has a corresponding Parquet file.

Source code in packages/dataenginex/src/dataenginex/lakehouse/storage.py
def exists(self, path: str) -> bool:
    """Return ``True`` if *path* has a corresponding Parquet file."""
    if not _HAS_PYARROW:
        return self._fallback.exists(path)
    return (self.base_path / f"{path}.parquet").exists()

S3Storage

Bases: StorageBackend

AWS S3 object storage backend.

Reads/writes JSON-serialised records to an S3 bucket. Requires boto3 at runtime.

Parameters

  • bucket: S3 bucket name.
  • prefix: Key prefix for all objects (default "").
  • region: AWS region (default "us-east-1").
  • endpoint_url: Custom endpoint for S3-compatible services (e.g. LocalStack). When None, the default AWS endpoint is used.

Source code in packages/dataenginex/src/dataenginex/lakehouse/storage.py
class S3Storage(StorageBackend):
    """AWS S3 object storage backend.

    Reads/writes JSON-serialised records to an S3 bucket.  Requires
    ``boto3`` at runtime.

    Parameters
    ----------
    bucket:
        S3 bucket name.
    prefix:
        Key prefix for all objects (default ``""``).
    region:
        AWS region (default ``"us-east-1"``).
    endpoint_url:
        Custom endpoint for S3-compatible services (e.g. LocalStack).
        When ``None``, the default AWS endpoint is used.
    """

    def __init__(
        self,
        bucket: str,
        prefix: str = "",
        region: str = "us-east-1",
        endpoint_url: str | None = None,
    ) -> None:
        self.bucket = bucket
        self.prefix = prefix.rstrip("/")
        self.region = region
        self.endpoint_url = endpoint_url

        if not _HAS_BOTO3:
            logger.warning("boto3 not installed — S3Storage operations will fail")
            self._client = None
        else:
            client_kwargs: dict[str, Any] = {"region_name": region}
            if endpoint_url:
                client_kwargs["endpoint_url"] = endpoint_url
            self._client = boto3.client("s3", **client_kwargs)
            logger.info("S3Storage initialised: s3://%s/%s", bucket, prefix)

    def _key(self, path: str) -> str:
        return f"{self.prefix}/{path}" if self.prefix else path

    def write(
        self,
        data: Any,
        path: str,
        format: StorageFormat = StorageFormat.PARQUET,
    ) -> bool:
        """Write JSON-serialised data to S3."""
        if self._client is None:
            logger.error("S3Storage: boto3 not available")
            return False
        try:
            records = data if isinstance(data, list) else [data]
            body = json.dumps(records, default=str).encode()
            key = self._key(f"{path}.json")
            self._client.put_object(Bucket=self.bucket, Key=key, Body=body)
            logger.info("Wrote %d records to s3://%s/%s", len(records), self.bucket, key)
            return True
        except Exception as exc:
            logger.error("S3Storage write failed: %s", exc)
            return False

    def read(self, path: str, format: StorageFormat = StorageFormat.PARQUET) -> Any:
        """Read JSON data from S3."""
        if self._client is None:
            logger.error("S3Storage: boto3 not available")
            return None
        try:
            key = self._key(f"{path}.json")
            response = self._client.get_object(Bucket=self.bucket, Key=key)
            body = response["Body"].read().decode()
            return json.loads(body)
        except Exception as exc:
            logger.error("S3Storage read failed: %s", exc)
            return None

    def delete(self, path: str) -> bool:
        """Delete object from S3."""
        if self._client is None:
            logger.error("S3Storage: boto3 not available")
            return False
        try:
            key = self._key(f"{path}.json")
            self._client.delete_object(Bucket=self.bucket, Key=key)
            logger.info("Deleted s3://%s/%s", self.bucket, key)
            return True
        except Exception as exc:
            logger.error("S3Storage delete failed: %s", exc)
            return False

    def list_objects(self, prefix: str = "") -> list[str]:
        """List S3 objects under *prefix*."""
        if self._client is None:
            return []
        try:
            full_prefix = self._key(prefix)
            resp = self._client.list_objects_v2(Bucket=self.bucket, Prefix=full_prefix)
            return [obj["Key"] for obj in resp.get("Contents", [])]
        except Exception as exc:
            logger.error("S3Storage list_objects failed: %s", exc)
            return []

    def exists(self, path: str) -> bool:
        """Return ``True`` if *path* exists in S3."""
        if self._client is None:
            return False
        try:
            self._client.head_object(Bucket=self.bucket, Key=self._key(f"{path}.json"))
            return True
        except Exception:
            return False
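
How object keys are built from the storage prefix can be shown without touching AWS. make_key below is a hypothetical standalone equivalent of the constructor's rstrip("/") combined with _key; the bucket layout in the example is invented.

```python
def make_key(storage_prefix: str, path: str) -> str:
    """Join the storage-level prefix and an object path the way _key does."""
    prefix = storage_prefix.rstrip("/")  # __init__ strips trailing slashes
    return f"{prefix}/{path}" if prefix else path


# write/read/delete/exists append ".json" before building the key.
written = make_key("raw/", "events/2024" + ".json")
unprefixed = make_key("", "events/2024" + ".json")

# list_objects passes its prefix argument through unchanged (no ".json").
listing_prefix = make_key("raw", "")
```

Because an empty listing prefix becomes "raw/" rather than "raw", a sibling key space such as "raw-archive/" is not picked up by list_objects on a storage created with prefix "raw".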

write(data, path, format=StorageFormat.PARQUET)

Write JSON-serialised data to S3.

Source code in packages/dataenginex/src/dataenginex/lakehouse/storage.py
def write(
    self,
    data: Any,
    path: str,
    format: StorageFormat = StorageFormat.PARQUET,
) -> bool:
    """Write JSON-serialised data to S3."""
    if self._client is None:
        logger.error("S3Storage: boto3 not available")
        return False
    try:
        records = data if isinstance(data, list) else [data]
        body = json.dumps(records, default=str).encode()
        key = self._key(f"{path}.json")
        self._client.put_object(Bucket=self.bucket, Key=key, Body=body)
        logger.info("Wrote %d records to s3://%s/%s", len(records), self.bucket, key)
        return True
    except Exception as exc:
        logger.error("S3Storage write failed: %s", exc)
        return False

read(path, format=StorageFormat.PARQUET)

Read JSON data from S3.

Source code in packages/dataenginex/src/dataenginex/lakehouse/storage.py
def read(self, path: str, format: StorageFormat = StorageFormat.PARQUET) -> Any:
    """Read JSON data from S3."""
    if self._client is None:
        logger.error("S3Storage: boto3 not available")
        return None
    try:
        key = self._key(f"{path}.json")
        response = self._client.get_object(Bucket=self.bucket, Key=key)
        body = response["Body"].read().decode()
        return json.loads(body)
    except Exception as exc:
        logger.error("S3Storage read failed: %s", exc)
        return None

delete(path)

Delete object from S3.

Source code in packages/dataenginex/src/dataenginex/lakehouse/storage.py
def delete(self, path: str) -> bool:
    """Delete object from S3."""
    if self._client is None:
        logger.error("S3Storage: boto3 not available")
        return False
    try:
        key = self._key(f"{path}.json")
        self._client.delete_object(Bucket=self.bucket, Key=key)
        logger.info("Deleted s3://%s/%s", self.bucket, key)
        return True
    except Exception as exc:
        logger.error("S3Storage delete failed: %s", exc)
        return False

list_objects(prefix='')

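List S3 objects under prefix. Returns full object keys, with the storage prefix and file extension included. Only a single list_objects_v2 call is made, so at most the first page of results (up to 1,000 keys) is returned.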

Source code in packages/dataenginex/src/dataenginex/lakehouse/storage.py
def list_objects(self, prefix: str = "") -> list[str]:
    """List S3 objects under *prefix*."""
    if self._client is None:
        return []
    try:
        full_prefix = self._key(prefix)
        resp = self._client.list_objects_v2(Bucket=self.bucket, Prefix=full_prefix)
        return [obj["Key"] for obj in resp.get("Contents", [])]
    except Exception as exc:
        logger.error("S3Storage list_objects failed: %s", exc)
        return []
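
If a prefix may hold more than one page of objects, a caller could page through the results itself. The following is a sketch, not part of dataenginex: list_all_keys is a hypothetical helper that uses the boto3 client's get_paginator("list_objects_v2") interface, and FakeS3Client is an offline stand-in included only so the example runs without AWS credentials.

```python
from collections.abc import Iterator
from typing import Any


def list_all_keys(client: Any, bucket: str, prefix: str = "") -> list[str]:
    """Collect every key under *prefix*, following pagination to the end."""
    keys: list[str] = []
    paginator = client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        # Pages without matches carry no "Contents" entry at all.
        keys.extend(obj["Key"] for obj in page.get("Contents", []))
    return keys


class FakePaginator:
    """Offline stand-in that yields pre-built pages of keys."""

    def __init__(self, pages: list[list[str]]) -> None:
        self._pages = pages

    def paginate(self, **kwargs: Any) -> Iterator[dict[str, Any]]:
        prefix = kwargs.get("Prefix", "")
        for page in self._pages:
            matching = [{"Key": key} for key in page if key.startswith(prefix)]
            yield {"Contents": matching} if matching else {}


class FakeS3Client:
    """Minimal object exposing only get_paginator, for illustration."""

    def __init__(self, pages: list[list[str]]) -> None:
        self._pages = pages

    def get_paginator(self, operation_name: str) -> FakePaginator:
        if operation_name != "list_objects_v2":
            raise ValueError(f"unexpected operation: {operation_name}")
        return FakePaginator(self._pages)


client = FakeS3Client([["raw/a.json", "raw/b.json"], ["raw/c.json", "tmp/d.json"]])
all_raw = list_all_keys(client, "my-bucket", "raw/")
nothing = list_all_keys(client, "my-bucket", "missing/")
```

With a real client created by boto3.client("s3"), the same function would be called as list_all_keys(client, bucket, prefix); the bucket name above is a placeholder.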

exists(path)

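Return True if <path>.json exists in S3. Any error raised by head_object, including missing credentials or permissions, is reported as False.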

Source code in packages/dataenginex/src/dataenginex/lakehouse/storage.py
def exists(self, path: str) -> bool:
    """Return ``True`` if *path* exists in S3."""
    if self._client is None:
        return False
    try:
        self._client.head_object(Bucket=self.bucket, Key=self._key(f"{path}.json"))
        return True
    except Exception:
        return False

get_storage(uri, **kwargs)

Create a StorageBackend from a URI scheme.

Supported schemes:

  • file:// (or no scheme) → JsonStorage
  • s3://bucket/prefix → S3Storage
  • gs://bucket/prefix → GCSStorage

Extra kwargs are forwarded to the backend constructor.

Raises

ValueError: If the URI scheme is not supported.

Source code in packages/dataenginex/src/dataenginex/lakehouse/storage.py
def get_storage(uri: str, **kwargs: Any) -> StorageBackend:
    """Create a :class:`StorageBackend` from a URI scheme.

    Supported schemes:

    * ``file://`` (or no scheme) → :class:`JsonStorage`
    * ``s3://bucket/prefix``     → :class:`S3Storage`
    * ``gs://bucket/prefix``     → :class:`GCSStorage`

    Extra *kwargs* are forwarded to the backend constructor.

    Raises
    ------
    ValueError
        If the URI scheme is not supported.
    """
    from urllib.parse import urlparse

    parsed = urlparse(uri)
    if parsed.scheme in ("", "file"):
        path = parsed.path or "data"
        return JsonStorage(base_path=path, **kwargs)
    if parsed.scheme == "s3":
        return S3Storage(
            bucket=parsed.netloc,
            prefix=parsed.path.lstrip("/"),
            **kwargs,
        )
    if parsed.scheme == "gs":
        return GCSStorage(
            bucket=parsed.netloc,
            prefix=parsed.path.lstrip("/"),
            **kwargs,
        )
    msg = f"Unsupported storage URI scheme: {parsed.scheme!r}"
    raise ValueError(msg)
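
How a URI is split into constructor arguments depends entirely on urllib.parse.urlparse. The sketch below reproduces only the dispatch logic: resolve is a hypothetical function that returns the backend name and keyword arguments instead of constructing a backend, and the URIs are made-up examples.

```python
from urllib.parse import urlparse


def resolve(uri: str) -> tuple[str, dict[str, str]]:
    """Return (backend name, constructor kwargs) the way get_storage derives them."""
    parsed = urlparse(uri)
    if parsed.scheme in ("", "file"):
        # Only the path component is used; an empty path falls back to "data".
        return "JsonStorage", {"base_path": parsed.path or "data"}
    if parsed.scheme in ("s3", "gs"):
        name = "S3Storage" if parsed.scheme == "s3" else "GCSStorage"
        return name, {"bucket": parsed.netloc, "prefix": parsed.path.lstrip("/")}
    raise ValueError(f"Unsupported storage URI scheme: {parsed.scheme!r}")


s3 = resolve("s3://my-bucket/raw/events")
absolute_file = resolve("file:///tmp/lake")
bare_path = resolve("data/lake")
empty = resolve("")

# With two slashes, "data" is parsed as the host and silently dropped.
two_slash_file = resolve("file://data/lake")

try:
    resolve("ftp://host/data")
    unsupported_error = None
except ValueError as exc:
    unsupported_error = str(exc)
```

Two points follow from the source above: local URIs always map to JsonStorage (ParquetStorage is never selected by get_storage), and a relative location is safer written without a scheme ("data/lake") than as "file://data/lake".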