Skip to content

dataenginex.api

FastAPI components — auth, health checks, error handling, pagination, rate limiting.

dataenginex.api

Reusable API components — auth, health, errors, pagination, rate limiting, quality.

Public API::

from dataenginex.api import (
    HealthChecker, HealthStatus, ComponentHealth,
    APIHTTPException, BadRequestError, NotFoundError, ServiceUnavailableError,
    PaginatedResponse, paginate,
    AuthMiddleware, AuthUser, create_token, decode_token,
    RateLimiter, RateLimitMiddleware,
    get_quality_store, set_quality_store,
)

AuthMiddleware

Bases: BaseHTTPMiddleware

Starlette middleware that enforces JWT auth when enabled.

When DEX_AUTH_ENABLED is "true" (case-insensitive), every request to a non-public path must carry a valid Authorization: Bearer <token> header. The decoded claims are stored on request.state.auth_user.

Source code in packages/dataenginex/src/dataenginex/api/auth.py
class AuthMiddleware(BaseHTTPMiddleware):
    """Starlette middleware that enforces JWT auth when enabled.

    When ``DEX_AUTH_ENABLED`` is ``"true"`` (case-insensitive), every request
    to a non-public path must carry a valid ``Authorization: Bearer <token>``
    header. The decoded claims are stored on ``request.state.auth_user``.
    """

    async def dispatch(self, request: Request, call_next: RequestResponseEndpoint) -> Response:
        """Validate the bearer token and attach ``auth_user`` to request state."""
        enabled = os.getenv("DEX_AUTH_ENABLED", "false").lower() == "true"
        if not enabled:
            return await call_next(request)

        # Skip public endpoints
        if request.url.path in _PUBLIC_PATHS:
            return await call_next(request)

        secret = os.getenv("DEX_JWT_SECRET", "")
        if not secret:
            logger.error("DEX_AUTH_ENABLED=true but DEX_JWT_SECRET is not set")
            return JSONResponse(
                status_code=500,
                content={"error": "auth_config_error", "message": "Auth secret not configured"},
            )

        auth_header = request.headers.get("Authorization", "")
        if not auth_header.startswith("Bearer "):
            return JSONResponse(
                status_code=401,
                content={"error": "unauthorized", "message": "Missing bearer token"},
            )

        token = auth_header[7:]
        try:
            claims = decode_token(token, secret)
        except ValueError:
            logger.exception("JWT validation failed")
            return JSONResponse(
                status_code=401,
                content={
                    "error": "unauthorized",
                    "message": "Invalid or expired authentication token",
                },
            )

        request.state.auth_user = AuthUser(
            sub=claims.get("sub", "anonymous"),
            roles=claims.get("roles", []),
            claims=claims,
        )
        return await call_next(request)

dispatch(request, call_next) async

Validate the bearer token and attach auth_user to request state.

Source code in packages/dataenginex/src/dataenginex/api/auth.py
async def dispatch(self, request: Request, call_next: RequestResponseEndpoint) -> Response:
    """Validate the bearer token and attach ``auth_user`` to request state."""
    enabled = os.getenv("DEX_AUTH_ENABLED", "false").lower() == "true"
    if not enabled:
        return await call_next(request)

    # Skip public endpoints
    if request.url.path in _PUBLIC_PATHS:
        return await call_next(request)

    secret = os.getenv("DEX_JWT_SECRET", "")
    if not secret:
        logger.error("DEX_AUTH_ENABLED=true but DEX_JWT_SECRET is not set")
        return JSONResponse(
            status_code=500,
            content={"error": "auth_config_error", "message": "Auth secret not configured"},
        )

    auth_header = request.headers.get("Authorization", "")
    if not auth_header.startswith("Bearer "):
        return JSONResponse(
            status_code=401,
            content={"error": "unauthorized", "message": "Missing bearer token"},
        )

    token = auth_header[7:]
    try:
        claims = decode_token(token, secret)
    except ValueError:
        logger.exception("JWT validation failed")
        return JSONResponse(
            status_code=401,
            content={
                "error": "unauthorized",
                "message": "Invalid or expired authentication token",
            },
        )

    request.state.auth_user = AuthUser(
        sub=claims.get("sub", "anonymous"),
        roles=claims.get("roles", []),
        claims=claims,
    )
    return await call_next(request)

AuthUser dataclass

Resolved identity from a valid JWT.

Source code in packages/dataenginex/src/dataenginex/api/auth.py
@dataclass
class AuthUser:
    """Resolved identity from a valid JWT."""

    sub: str
    roles: list[str]
    claims: dict[str, Any]

APIHTTPException

Bases: HTTPException

Base HTTP exception with error code and details.

Source code in packages/dataenginex/src/dataenginex/api/errors.py
class APIHTTPException(HTTPException):
    """Base HTTP exception with error code and details."""

    def __init__(
        self,
        status_code: int,
        message: str,
        code: str = "api_error",
        details: list[ErrorDetail] | None = None,
    ) -> None:
        self.code = code
        self.details = details
        super().__init__(status_code=status_code, detail=message)

BadRequestError

Bases: APIHTTPException

Raised for 400 validation or malformed requests.

Source code in packages/dataenginex/src/dataenginex/api/errors.py
class BadRequestError(APIHTTPException):
    """Raised for 400 validation or malformed requests."""

    def __init__(
        self,
        message: str = "Bad request",
        details: list[ErrorDetail] | None = None,
    ) -> None:
        super().__init__(
            status_code=status.HTTP_400_BAD_REQUEST,
            message=message,
            code="bad_request",
            details=details,
        )

NotFoundError

Bases: APIHTTPException

Raised for 404 not found errors.

Source code in packages/dataenginex/src/dataenginex/api/errors.py
class NotFoundError(APIHTTPException):
    """Raised for 404 not found errors."""

    def __init__(
        self,
        message: str = "Resource not found",
        details: list[ErrorDetail] | None = None,
    ) -> None:
        super().__init__(
            status_code=status.HTTP_404_NOT_FOUND,
            message=message,
            code="not_found",
            details=details,
        )

ServiceUnavailableError

Bases: APIHTTPException

Raised when a dependency is unavailable.

Source code in packages/dataenginex/src/dataenginex/api/errors.py
class ServiceUnavailableError(APIHTTPException):
    """Raised when a dependency is unavailable."""

    def __init__(
        self,
        message: str = "Service unavailable",
        details: list[ErrorDetail] | None = None,
    ) -> None:
        super().__init__(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            message=message,
            code="service_unavailable",
            details=details,
        )

ComponentHealth dataclass

Health status of a single dependency component.

Attributes:

Name Type Description
name str

Component identifier (e.g. "database", "cache").

status HealthStatus

Current health status.

message str | None

Optional human-readable message.

duration_ms float | None

Time taken for the health check in milliseconds.

Source code in packages/dataenginex/src/dataenginex/api/health.py
@dataclass(frozen=True)
class ComponentHealth:
    """Health status of a single dependency component.

    Attributes:
        name: Component identifier (e.g. ``"database"``, ``"cache"``).
        status: Current health status.
        message: Optional human-readable message.
        duration_ms: Time taken for the health check in milliseconds.
    """

    name: str
    status: HealthStatus
    message: str | None = None
    duration_ms: float | None = None

    def to_dict(self) -> dict[str, object | None]:
        """Serialize to a plain dictionary suitable for JSON responses."""
        return {
            "name": self.name,
            "status": self.status.value,
            "message": self.message,
            "duration_ms": self.duration_ms,
        }

to_dict()

Serialize to a plain dictionary suitable for JSON responses.

Source code in packages/dataenginex/src/dataenginex/api/health.py
def to_dict(self) -> dict[str, object | None]:
    """Serialize to a plain dictionary suitable for JSON responses."""
    return {
        "name": self.name,
        "status": self.status.value,
        "message": self.message,
        "duration_ms": self.duration_ms,
    }

HealthChecker

Runs health checks for DEX dependencies.

Source code in packages/dataenginex/src/dataenginex/api/health.py
class HealthChecker:
    """Runs health checks for DEX dependencies."""

    def __init__(self, timeout_seconds: float = 1.0) -> None:
        self.timeout_seconds = timeout_seconds

    async def check_all(self) -> list[ComponentHealth]:
        """Run all dependency health checks concurrently."""
        return [
            await self.check_database(),
            await self.check_cache(),
            await self.check_external_api(),
        ]

    def overall_status(self, components: list[ComponentHealth]) -> HealthStatus:
        """Derive an aggregate status from individual component results."""
        if any(c.status == HealthStatus.UNHEALTHY for c in components):
            return HealthStatus.UNHEALTHY
        if any(c.status == HealthStatus.DEGRADED for c in components):
            return HealthStatus.DEGRADED
        return HealthStatus.HEALTHY

    async def check_database(self) -> ComponentHealth:
        """Check database connectivity via TCP probe."""
        host = os.getenv("DEX_DB_HOST")
        port = os.getenv("DEX_DB_PORT")
        if not host or not port:
            return ComponentHealth(
                name="database",
                status=HealthStatus.SKIPPED,
                message="database not configured",
            )

        start = time.perf_counter()
        ok, message = await self._tcp_check(host, int(port))
        duration_ms = (time.perf_counter() - start) * 1000
        return ComponentHealth(
            name="database",
            status=HealthStatus.HEALTHY if ok else HealthStatus.UNHEALTHY,
            message=message,
            duration_ms=round(duration_ms, 2),
        )

    async def check_cache(self) -> ComponentHealth:
        """Check cache connectivity via TCP probe."""
        host = os.getenv("DEX_CACHE_HOST")
        port = os.getenv("DEX_CACHE_PORT")
        if not host or not port:
            return ComponentHealth(
                name="cache",
                status=HealthStatus.SKIPPED,
                message="cache not configured",
            )

        start = time.perf_counter()
        ok, message = await self._tcp_check(host, int(port))
        duration_ms = (time.perf_counter() - start) * 1000
        return ComponentHealth(
            name="cache",
            status=HealthStatus.HEALTHY if ok else HealthStatus.UNHEALTHY,
            message=message,
            duration_ms=round(duration_ms, 2),
        )

    async def check_external_api(self) -> ComponentHealth:
        """Check external API reachability via HTTP GET."""
        url = os.getenv("DEX_EXTERNAL_API_URL")
        if not url:
            return ComponentHealth(
                name="external_api",
                status=HealthStatus.SKIPPED,
                message="external API not configured",
            )

        start = time.perf_counter()
        timeout = httpx.Timeout(self.timeout_seconds)
        try:
            async with httpx.AsyncClient(timeout=timeout) as client:
                response = await client.get(url)
            ok = response.status_code < 500
            message = f"status_code={response.status_code}"
        except httpx.HTTPError as exc:
            ok = False
            message = f"error={exc.__class__.__name__}"
        duration_ms = (time.perf_counter() - start) * 1000
        return ComponentHealth(
            name="external_api",
            status=HealthStatus.HEALTHY if ok else HealthStatus.UNHEALTHY,
            message=message,
            duration_ms=round(duration_ms, 2),
        )

    async def _tcp_check(self, host: str, port: int) -> tuple[bool, str]:
        try:
            await asyncio.wait_for(
                asyncio.open_connection(host, port), timeout=self.timeout_seconds
            )
            return True, "reachable"
        except (TimeoutError, ConnectionRefusedError, OSError) as exc:
            return False, f"error={exc.__class__.__name__}"

check_all() async

Run all dependency health checks concurrently.

Source code in packages/dataenginex/src/dataenginex/api/health.py
async def check_all(self) -> list[ComponentHealth]:
    """Run all dependency health checks concurrently."""
    return [
        await self.check_database(),
        await self.check_cache(),
        await self.check_external_api(),
    ]

overall_status(components)

Derive an aggregate status from individual component results.

Source code in packages/dataenginex/src/dataenginex/api/health.py
def overall_status(self, components: list[ComponentHealth]) -> HealthStatus:
    """Derive an aggregate status from individual component results."""
    if any(c.status == HealthStatus.UNHEALTHY for c in components):
        return HealthStatus.UNHEALTHY
    if any(c.status == HealthStatus.DEGRADED for c in components):
        return HealthStatus.DEGRADED
    return HealthStatus.HEALTHY

check_database() async

Check database connectivity via TCP probe.

Source code in packages/dataenginex/src/dataenginex/api/health.py
async def check_database(self) -> ComponentHealth:
    """Check database connectivity via TCP probe."""
    host = os.getenv("DEX_DB_HOST")
    port = os.getenv("DEX_DB_PORT")
    if not host or not port:
        return ComponentHealth(
            name="database",
            status=HealthStatus.SKIPPED,
            message="database not configured",
        )

    start = time.perf_counter()
    ok, message = await self._tcp_check(host, int(port))
    duration_ms = (time.perf_counter() - start) * 1000
    return ComponentHealth(
        name="database",
        status=HealthStatus.HEALTHY if ok else HealthStatus.UNHEALTHY,
        message=message,
        duration_ms=round(duration_ms, 2),
    )

check_cache() async

Check cache connectivity via TCP probe.

Source code in packages/dataenginex/src/dataenginex/api/health.py
async def check_cache(self) -> ComponentHealth:
    """Check cache connectivity via TCP probe."""
    host = os.getenv("DEX_CACHE_HOST")
    port = os.getenv("DEX_CACHE_PORT")
    if not host or not port:
        return ComponentHealth(
            name="cache",
            status=HealthStatus.SKIPPED,
            message="cache not configured",
        )

    start = time.perf_counter()
    ok, message = await self._tcp_check(host, int(port))
    duration_ms = (time.perf_counter() - start) * 1000
    return ComponentHealth(
        name="cache",
        status=HealthStatus.HEALTHY if ok else HealthStatus.UNHEALTHY,
        message=message,
        duration_ms=round(duration_ms, 2),
    )

check_external_api() async

Check external API reachability via HTTP GET.

Source code in packages/dataenginex/src/dataenginex/api/health.py
async def check_external_api(self) -> ComponentHealth:
    """Check external API reachability via HTTP GET."""
    url = os.getenv("DEX_EXTERNAL_API_URL")
    if not url:
        return ComponentHealth(
            name="external_api",
            status=HealthStatus.SKIPPED,
            message="external API not configured",
        )

    start = time.perf_counter()
    timeout = httpx.Timeout(self.timeout_seconds)
    try:
        async with httpx.AsyncClient(timeout=timeout) as client:
            response = await client.get(url)
        ok = response.status_code < 500
        message = f"status_code={response.status_code}"
    except httpx.HTTPError as exc:
        ok = False
        message = f"error={exc.__class__.__name__}"
    duration_ms = (time.perf_counter() - start) * 1000
    return ComponentHealth(
        name="external_api",
        status=HealthStatus.HEALTHY if ok else HealthStatus.UNHEALTHY,
        message=message,
        duration_ms=round(duration_ms, 2),
    )

HealthStatus

Bases: StrEnum

Supported health statuses.

Source code in packages/dataenginex/src/dataenginex/api/health.py
class HealthStatus(StrEnum):
    """Supported health statuses."""

    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"
    SKIPPED = "skipped"

PaginatedResponse

Bases: BaseModel

Generic paginated response wrapper.

Source code in packages/dataenginex/src/dataenginex/api/pagination.py
class PaginatedResponse(BaseModel):
    """Generic paginated response wrapper."""

    data: list[Any] = Field(default_factory=list)
    pagination: PaginationMeta

PaginationMeta

Bases: BaseModel

Pagination metadata returned alongside results.

Source code in packages/dataenginex/src/dataenginex/api/pagination.py
class PaginationMeta(BaseModel):
    """Pagination metadata returned alongside results."""

    total: int = Field(description="Total number of items")
    limit: int = Field(description="Page size")
    has_next: bool = Field(description="Whether more items exist")
    next_cursor: str | None = Field(None, description="Opaque cursor for next page")
    has_previous: bool = Field(default=False, description="Whether previous items exist")

RateLimiter

In-memory token-bucket rate limiter.

Parameters

requests_per_minute: Sustained request rate per client. burst: Maximum instantaneous burst size.

Source code in packages/dataenginex/src/dataenginex/api/rate_limit.py
class RateLimiter:
    """In-memory token-bucket rate limiter.

    Parameters
    ----------
    requests_per_minute:
        Sustained request rate per client.
    burst:
        Maximum instantaneous burst size.
    """

    def __init__(self, requests_per_minute: int = 60, burst: int = 10) -> None:
        self.rpm = requests_per_minute
        self.burst = burst
        self._buckets: dict[str, _Bucket] = {}

    def allow(self, client_id: str) -> bool:
        """Return ``True`` if *client_id* has remaining tokens; consume one token."""
        bucket = self._buckets.get(client_id)
        if bucket is None:
            bucket = _Bucket(
                tokens=float(self.burst),
                last_refill=time.monotonic(),
                capacity=float(self.burst),
                refill_rate=self.rpm / 60.0,
            )
            self._buckets[client_id] = bucket
        return bucket.consume()

    def get_stats(self) -> dict[str, Any]:
        """Return current rate-limiter configuration and client count."""
        return {
            "rpm": self.rpm,
            "burst": self.burst,
            "active_clients": len(self._buckets),
        }

    def cleanup(self, max_age_seconds: float = 300.0) -> int:
        """Evict buckets idle for more than *max_age_seconds*."""
        now = time.monotonic()
        stale = [k for k, b in self._buckets.items() if now - b.last_refill > max_age_seconds]
        for k in stale:
            del self._buckets[k]
        return len(stale)

allow(client_id)

Return True if client_id has remaining tokens; consume one token.

Source code in packages/dataenginex/src/dataenginex/api/rate_limit.py
def allow(self, client_id: str) -> bool:
    """Return ``True`` if *client_id* has remaining tokens; consume one token."""
    bucket = self._buckets.get(client_id)
    if bucket is None:
        bucket = _Bucket(
            tokens=float(self.burst),
            last_refill=time.monotonic(),
            capacity=float(self.burst),
            refill_rate=self.rpm / 60.0,
        )
        self._buckets[client_id] = bucket
    return bucket.consume()

get_stats()

Return current rate-limiter configuration and client count.

Source code in packages/dataenginex/src/dataenginex/api/rate_limit.py
def get_stats(self) -> dict[str, Any]:
    """Return current rate-limiter configuration and client count."""
    return {
        "rpm": self.rpm,
        "burst": self.burst,
        "active_clients": len(self._buckets),
    }

cleanup(max_age_seconds=300.0)

Evict buckets idle for more than max_age_seconds.

Source code in packages/dataenginex/src/dataenginex/api/rate_limit.py
def cleanup(self, max_age_seconds: float = 300.0) -> int:
    """Evict buckets idle for more than *max_age_seconds*."""
    now = time.monotonic()
    stale = [k for k, b in self._buckets.items() if now - b.last_refill > max_age_seconds]
    for k in stale:
        del self._buckets[k]
    return len(stale)

RateLimitMiddleware

Bases: BaseHTTPMiddleware

Starlette middleware applying per-IP rate limiting.

Source code in packages/dataenginex/src/dataenginex/api/rate_limit.py
class RateLimitMiddleware(BaseHTTPMiddleware):
    """Starlette middleware applying per-IP rate limiting."""

    def __init__(self, app: Any) -> None:  # noqa: ANN401
        super().__init__(app)
        rpm = int(os.getenv("DEX_RATE_LIMIT_RPM", "60"))
        burst = int(os.getenv("DEX_RATE_LIMIT_BURST", "10"))
        self._limiter = RateLimiter(requests_per_minute=rpm, burst=burst)
        self._enabled = os.getenv("DEX_RATE_LIMIT_ENABLED", "false").lower() == "true"

    async def dispatch(self, request: Request, call_next: RequestResponseEndpoint) -> Response:
        """Enforce per-IP rate limits; return 429 when tokens are exhausted."""
        if not self._enabled:
            return await call_next(request)

        if request.url.path in _EXEMPT_PATHS:
            return await call_next(request)

        client_ip = request.client.host if request.client else "unknown"
        if not self._limiter.allow(client_ip):
            logger.warning("Rate limit exceeded for %s", client_ip)
            return JSONResponse(
                status_code=429,
                content={
                    "error": "rate_limit_exceeded",
                    "message": "Too many requests — please slow down",
                },
                headers={"Retry-After": "60"},
            )

        return await call_next(request)

dispatch(request, call_next) async

Enforce per-IP rate limits; return 429 when tokens are exhausted.

Source code in packages/dataenginex/src/dataenginex/api/rate_limit.py
async def dispatch(self, request: Request, call_next: RequestResponseEndpoint) -> Response:
    """Enforce per-IP rate limits; return 429 when tokens are exhausted."""
    if not self._enabled:
        return await call_next(request)

    if request.url.path in _EXEMPT_PATHS:
        return await call_next(request)

    client_ip = request.client.host if request.client else "unknown"
    if not self._limiter.allow(client_ip):
        logger.warning("Rate limit exceeded for %s", client_ip)
        return JSONResponse(
            status_code=429,
            content={
                "error": "rate_limit_exceeded",
                "message": "Too many requests — please slow down",
            },
            headers={"Retry-After": "60"},
        )

    return await call_next(request)

create_token(payload, secret, ttl=3600)

Create a HS256 JWT token.

Parameters

payload: Claims dict (e.g. {"sub": "user123", "roles": ["admin"]}). secret: HMAC shared secret. ttl: Time-to-live in seconds (default 1 hour).

Source code in packages/dataenginex/src/dataenginex/api/auth.py
def create_token(payload: dict[str, Any], secret: str, ttl: int = 3600) -> str:
    """Create a HS256 JWT token.

    Parameters
    ----------
    payload:
        Claims dict (e.g. ``{"sub": "user123", "roles": ["admin"]}``).
    secret:
        HMAC shared secret.
    ttl:
        Time-to-live in seconds (default 1 hour).
    """
    header = {"alg": "HS256", "typ": "JWT"}
    now = int(time.time())
    payload = {**payload, "iat": now, "exp": now + ttl}

    segments = [
        _b64url_encode(json.dumps(header).encode()),
        _b64url_encode(json.dumps(payload, default=str).encode()),
    ]
    signing_input = f"{segments[0]}.{segments[1]}"
    signature = hmac.new(secret.encode(), signing_input.encode(), sha256).digest()
    segments.append(_b64url_encode(signature))
    return ".".join(segments)

decode_token(token, secret)

Decode and verify a HS256 JWT token. Raises ValueError on failure.

Source code in packages/dataenginex/src/dataenginex/api/auth.py
def decode_token(token: str, secret: str) -> dict[str, Any]:
    """Decode and verify a HS256 JWT token.  Raises ``ValueError`` on failure."""
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("Malformed JWT")

    signing_input = f"{parts[0]}.{parts[1]}"
    expected_sig = hmac.new(secret.encode(), signing_input.encode(), sha256).digest()
    actual_sig = _b64url_decode(parts[2])

    if not hmac.compare_digest(expected_sig, actual_sig):
        raise ValueError("Invalid JWT signature")

    payload: dict[str, Any] = json.loads(_b64url_decode(parts[1]))
    exp = payload.get("exp")
    if exp is not None and int(exp) < int(time.time()):
        raise ValueError("Token expired")

    return payload

paginate(items, *, cursor=None, limit=20, max_limit=100)

Slice items and return a PaginatedResponse.

Parameters

items: Full list of items to paginate. cursor: Opaque cursor from a previous response (or None for the first page). limit: Number of items per page. max_limit: Hard ceiling on limit to prevent abuse.

Source code in packages/dataenginex/src/dataenginex/api/pagination.py
def paginate(
    items: list[Any],
    *,
    cursor: str | None = None,
    limit: int = 20,
    max_limit: int = 100,
) -> PaginatedResponse:
    """Slice *items* and return a ``PaginatedResponse``.

    Parameters
    ----------
    items:
        Full list of items to paginate.
    cursor:
        Opaque cursor from a previous response (or *None* for the first page).
    limit:
        Number of items per page.
    max_limit:
        Hard ceiling on *limit* to prevent abuse.
    """
    limit = min(max(1, limit), max_limit)
    offset = decode_cursor(cursor) if cursor else 0
    total = len(items)

    page = items[offset : offset + limit]
    has_next = (offset + limit) < total
    next_cursor = encode_cursor(offset + limit) if has_next else None
    has_previous = offset > 0

    return PaginatedResponse(
        data=page,
        pagination=PaginationMeta(
            total=total,
            limit=limit,
            has_next=has_next,
            next_cursor=next_cursor,
            has_previous=has_previous,
        ),
    )

get_quality_store()

Return the active quality store.

Source code in packages/dataenginex/src/dataenginex/api/routers/v1.py
def get_quality_store() -> QualityStore:
    """Return the active quality store."""
    return _quality_store

set_quality_store(store)

Replace the module-level quality store (call at app startup).

Source code in packages/dataenginex/src/dataenginex/api/routers/v1.py
def set_quality_store(store: QualityStore) -> None:
    """Replace the module-level quality store (call at app startup)."""
    global _quality_store  # noqa: PLW0603
    _quality_store = store