Skip to content

dataenginex.api

Reusable API components — auth, health, errors, pagination, rate limiting.

Public API::

from dataenginex.api import (
    HealthChecker, HealthStatus, ComponentHealth,
    APIHTTPException, BadRequestError, NotFoundError, ServiceUnavailableError,
    PaginatedResponse, paginate,
    AuthMiddleware, AuthUser, create_token, decode_token,
    RateLimiter, RateLimitMiddleware,
)

Routers are defined in application packages that build on dataenginex.

Requires the [api] extra::

pip install dataenginex[api]

AuthMiddleware

Bases: BaseHTTPMiddleware

Starlette middleware that enforces JWT auth when enabled.

When DEX_AUTH_ENABLED is "true" (case-insensitive), every request to a non-public path must carry a valid Authorization: Bearer <token> header. The decoded claims are stored on request.state.auth_user.

Source code in src/dataenginex/api/auth.py
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
class AuthMiddleware(BaseHTTPMiddleware):
    """Starlette middleware that enforces JWT auth when enabled.

    When ``DEX_AUTH_ENABLED`` is ``"true"`` (case-insensitive), every request
    to a non-public path must carry a valid ``Authorization: Bearer <token>``
    header. The decoded claims are stored on ``request.state.auth_user``.
    """

    async def dispatch(self, request: Request, call_next: RequestResponseEndpoint) -> Response:
        """Validate the bearer token and attach ``auth_user`` to request state."""
        enabled = os.getenv("DEX_AUTH_ENABLED", "false").lower() == "true"
        if not enabled:
            return await call_next(request)

        # Skip public endpoints
        if request.url.path in _PUBLIC_PATHS:
            return await call_next(request)

        secret = os.getenv("DEX_JWT_SECRET", "")
        if not secret:
            logger.error("DEX_AUTH_ENABLED=true but DEX_JWT_SECRET is not set")
            return JSONResponse(
                status_code=500,
                content={"error": "auth_config_error", "message": "Auth secret not configured"},
            )

        auth_header = request.headers.get("Authorization", "")
        if not auth_header.startswith("Bearer "):
            return JSONResponse(
                status_code=401,
                content={"error": "unauthorized", "message": "Missing bearer token"},
            )

        token = auth_header[7:]
        try:
            claims = decode_token(token, secret)
        except ValueError:
            logger.exception("JWT validation failed")
            return JSONResponse(
                status_code=401,
                content={
                    "error": "unauthorized",
                    "message": "Invalid or expired authentication token",
                },
            )

        request.state.auth_user = AuthUser(
            sub=claims.get("sub", "anonymous"),
            roles=claims.get("roles", []),
            claims=claims,
        )
        return await call_next(request)

dispatch(request, call_next) async

Validate the bearer token and attach auth_user to request state.

Source code in src/dataenginex/api/auth.py
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
async def dispatch(self, request: Request, call_next: RequestResponseEndpoint) -> Response:
    """Validate the bearer token and attach ``auth_user`` to request state."""
    enabled = os.getenv("DEX_AUTH_ENABLED", "false").lower() == "true"
    if not enabled:
        return await call_next(request)

    # Skip public endpoints
    if request.url.path in _PUBLIC_PATHS:
        return await call_next(request)

    secret = os.getenv("DEX_JWT_SECRET", "")
    if not secret:
        logger.error("DEX_AUTH_ENABLED=true but DEX_JWT_SECRET is not set")
        return JSONResponse(
            status_code=500,
            content={"error": "auth_config_error", "message": "Auth secret not configured"},
        )

    auth_header = request.headers.get("Authorization", "")
    if not auth_header.startswith("Bearer "):
        return JSONResponse(
            status_code=401,
            content={"error": "unauthorized", "message": "Missing bearer token"},
        )

    token = auth_header[7:]
    try:
        claims = decode_token(token, secret)
    except ValueError:
        logger.exception("JWT validation failed")
        return JSONResponse(
            status_code=401,
            content={
                "error": "unauthorized",
                "message": "Invalid or expired authentication token",
            },
        )

    request.state.auth_user = AuthUser(
        sub=claims.get("sub", "anonymous"),
        roles=claims.get("roles", []),
        claims=claims,
    )
    return await call_next(request)

AuthUser dataclass

Resolved identity from a valid JWT.

Source code in src/dataenginex/api/auth.py
104
105
106
107
108
109
110
@dataclass
class AuthUser:
    """Resolved identity from a valid JWT."""

    sub: str
    roles: list[str]
    claims: dict[str, Any]

APIHTTPException

Bases: HTTPException

Base HTTP exception with error code and details.

Source code in src/dataenginex/api/errors.py
17
18
19
20
21
22
23
24
25
26
27
28
29
class APIHTTPException(HTTPException):
    """Base HTTP exception with error code and details."""

    def __init__(
        self,
        status_code: int,
        message: str,
        code: str = "api_error",
        details: list[ErrorDetail] | None = None,
    ) -> None:
        self.code = code
        self.details = details
        super().__init__(status_code=status_code, detail=message)

BadRequestError

Bases: APIHTTPException

Raised for 400 validation or malformed requests.

Source code in src/dataenginex/api/errors.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
class BadRequestError(APIHTTPException):
    """Raised for 400 validation or malformed requests."""

    def __init__(
        self,
        message: str = "Bad request",
        details: list[ErrorDetail] | None = None,
    ) -> None:
        super().__init__(
            status_code=status.HTTP_400_BAD_REQUEST,
            message=message,
            code="bad_request",
            details=details,
        )

NotFoundError

Bases: APIHTTPException

Raised for 404 not found errors.

Source code in src/dataenginex/api/errors.py
48
49
50
51
52
53
54
55
56
57
58
59
60
61
class NotFoundError(APIHTTPException):
    """Raised for 404 not found errors."""

    def __init__(
        self,
        message: str = "Resource not found",
        details: list[ErrorDetail] | None = None,
    ) -> None:
        super().__init__(
            status_code=status.HTTP_404_NOT_FOUND,
            message=message,
            code="not_found",
            details=details,
        )

ServiceUnavailableError

Bases: APIHTTPException

Raised when a dependency is unavailable.

Source code in src/dataenginex/api/errors.py
64
65
66
67
68
69
70
71
72
73
74
75
76
77
class ServiceUnavailableError(APIHTTPException):
    """Raised when a dependency is unavailable."""

    def __init__(
        self,
        message: str = "Service unavailable",
        details: list[ErrorDetail] | None = None,
    ) -> None:
        super().__init__(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            message=message,
            code="service_unavailable",
            details=details,
        )

ComponentHealth dataclass

Health status of a single dependency component.

Attributes:

Name Type Description
name str

Component identifier (e.g. "database", "cache").

status HealthStatus

Current health status.

message str | None

Optional human-readable message.

duration_ms float | None

Time taken for the health check in milliseconds.

Source code in src/dataenginex/api/health.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
@dataclass(frozen=True)
class ComponentHealth:
    """Health status of a single dependency component.

    Attributes:
        name: Component identifier (e.g. ``"database"``, ``"cache"``).
        status: Current health status.
        message: Optional human-readable message.
        duration_ms: Time taken for the health check in milliseconds.
    """

    name: str
    status: HealthStatus
    message: str | None = None
    duration_ms: float | None = None

    def to_dict(self) -> dict[str, object | None]:
        """Serialize to a plain dictionary suitable for JSON responses."""
        return {
            "name": self.name,
            "status": self.status.value,
            "message": self.message,
            "duration_ms": self.duration_ms,
        }

to_dict()

Serialize to a plain dictionary suitable for JSON responses.

Source code in src/dataenginex/api/health.py
45
46
47
48
49
50
51
52
def to_dict(self) -> dict[str, object | None]:
    """Serialize to a plain dictionary suitable for JSON responses."""
    return {
        "name": self.name,
        "status": self.status.value,
        "message": self.message,
        "duration_ms": self.duration_ms,
    }

HealthChecker

Runs health checks for DEX dependencies.

Source code in src/dataenginex/api/health.py
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
class HealthChecker:
    """Runs health checks for DEX dependencies."""

    def __init__(self, timeout_seconds: float = 1.0) -> None:
        self.timeout_seconds = timeout_seconds

    async def check_all(self) -> list[ComponentHealth]:
        """Run all dependency health checks concurrently."""
        return [
            await self.check_database(),
            await self.check_cache(),
            await self.check_external_api(),
        ]

    def overall_status(self, components: list[ComponentHealth]) -> HealthStatus:
        """Derive an aggregate status from individual component results."""
        if any(c.status == HealthStatus.UNHEALTHY for c in components):
            return HealthStatus.UNHEALTHY
        if any(c.status == HealthStatus.DEGRADED for c in components):
            return HealthStatus.DEGRADED
        return HealthStatus.HEALTHY

    async def check_database(self) -> ComponentHealth:
        """Check database connectivity via TCP probe."""
        host = os.getenv("DEX_DB_HOST")
        port = os.getenv("DEX_DB_PORT")
        if not host or not port:
            return ComponentHealth(
                name="database",
                status=HealthStatus.SKIPPED,
                message="database not configured",
            )

        start = time.perf_counter()
        ok, message = await self._tcp_check(host, int(port))
        duration_ms = (time.perf_counter() - start) * 1000
        return ComponentHealth(
            name="database",
            status=HealthStatus.HEALTHY if ok else HealthStatus.UNHEALTHY,
            message=message,
            duration_ms=round(duration_ms, 2),
        )

    async def check_cache(self) -> ComponentHealth:
        """Check cache connectivity via TCP probe."""
        host = os.getenv("DEX_CACHE_HOST")
        port = os.getenv("DEX_CACHE_PORT")
        if not host or not port:
            return ComponentHealth(
                name="cache",
                status=HealthStatus.SKIPPED,
                message="cache not configured",
            )

        start = time.perf_counter()
        ok, message = await self._tcp_check(host, int(port))
        duration_ms = (time.perf_counter() - start) * 1000
        return ComponentHealth(
            name="cache",
            status=HealthStatus.HEALTHY if ok else HealthStatus.UNHEALTHY,
            message=message,
            duration_ms=round(duration_ms, 2),
        )

    async def check_external_api(self) -> ComponentHealth:
        """Check external API reachability via HTTP GET."""
        url = os.getenv("DEX_EXTERNAL_API_URL")
        if not url:
            return ComponentHealth(
                name="external_api",
                status=HealthStatus.SKIPPED,
                message="external API not configured",
            )

        start = time.perf_counter()
        timeout = httpx.Timeout(self.timeout_seconds)
        try:
            async with httpx.AsyncClient(timeout=timeout) as client:
                response = await client.get(url)
            ok = response.status_code < 500
            message = f"status_code={response.status_code}"
        except httpx.HTTPError as exc:
            ok = False
            message = f"error={exc.__class__.__name__}"
        duration_ms = (time.perf_counter() - start) * 1000
        return ComponentHealth(
            name="external_api",
            status=HealthStatus.HEALTHY if ok else HealthStatus.UNHEALTHY,
            message=message,
            duration_ms=round(duration_ms, 2),
        )

    async def _tcp_check(self, host: str, port: int) -> tuple[bool, str]:
        try:
            await asyncio.wait_for(
                asyncio.open_connection(host, port), timeout=self.timeout_seconds
            )
            return True, "reachable"
        except (TimeoutError, ConnectionRefusedError, OSError) as exc:
            return False, f"error={exc.__class__.__name__}"

check_all() async

Run all dependency health checks concurrently.

Source code in src/dataenginex/api/health.py
61
62
63
64
65
66
67
async def check_all(self) -> list[ComponentHealth]:
    """Run all dependency health checks concurrently."""
    return [
        await self.check_database(),
        await self.check_cache(),
        await self.check_external_api(),
    ]

overall_status(components)

Derive an aggregate status from individual component results.

Source code in src/dataenginex/api/health.py
69
70
71
72
73
74
75
def overall_status(self, components: list[ComponentHealth]) -> HealthStatus:
    """Derive an aggregate status from individual component results."""
    if any(c.status == HealthStatus.UNHEALTHY for c in components):
        return HealthStatus.UNHEALTHY
    if any(c.status == HealthStatus.DEGRADED for c in components):
        return HealthStatus.DEGRADED
    return HealthStatus.HEALTHY

check_database() async

Check database connectivity via TCP probe.

Source code in src/dataenginex/api/health.py
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
async def check_database(self) -> ComponentHealth:
    """Check database connectivity via TCP probe."""
    host = os.getenv("DEX_DB_HOST")
    port = os.getenv("DEX_DB_PORT")
    if not host or not port:
        return ComponentHealth(
            name="database",
            status=HealthStatus.SKIPPED,
            message="database not configured",
        )

    start = time.perf_counter()
    ok, message = await self._tcp_check(host, int(port))
    duration_ms = (time.perf_counter() - start) * 1000
    return ComponentHealth(
        name="database",
        status=HealthStatus.HEALTHY if ok else HealthStatus.UNHEALTHY,
        message=message,
        duration_ms=round(duration_ms, 2),
    )

check_cache() async

Check cache connectivity via TCP probe.

Source code in src/dataenginex/api/health.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
async def check_cache(self) -> ComponentHealth:
    """Check cache connectivity via TCP probe."""
    host = os.getenv("DEX_CACHE_HOST")
    port = os.getenv("DEX_CACHE_PORT")
    if not host or not port:
        return ComponentHealth(
            name="cache",
            status=HealthStatus.SKIPPED,
            message="cache not configured",
        )

    start = time.perf_counter()
    ok, message = await self._tcp_check(host, int(port))
    duration_ms = (time.perf_counter() - start) * 1000
    return ComponentHealth(
        name="cache",
        status=HealthStatus.HEALTHY if ok else HealthStatus.UNHEALTHY,
        message=message,
        duration_ms=round(duration_ms, 2),
    )

check_external_api() async

Check external API reachability via HTTP GET.

Source code in src/dataenginex/api/health.py
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
async def check_external_api(self) -> ComponentHealth:
    """Check external API reachability via HTTP GET."""
    url = os.getenv("DEX_EXTERNAL_API_URL")
    if not url:
        return ComponentHealth(
            name="external_api",
            status=HealthStatus.SKIPPED,
            message="external API not configured",
        )

    start = time.perf_counter()
    timeout = httpx.Timeout(self.timeout_seconds)
    try:
        async with httpx.AsyncClient(timeout=timeout) as client:
            response = await client.get(url)
        ok = response.status_code < 500
        message = f"status_code={response.status_code}"
    except httpx.HTTPError as exc:
        ok = False
        message = f"error={exc.__class__.__name__}"
    duration_ms = (time.perf_counter() - start) * 1000
    return ComponentHealth(
        name="external_api",
        status=HealthStatus.HEALTHY if ok else HealthStatus.UNHEALTHY,
        message=message,
        duration_ms=round(duration_ms, 2),
    )

HealthStatus

Bases: StrEnum

Supported health statuses.

Source code in src/dataenginex/api/health.py
20
21
22
23
24
25
26
class HealthStatus(StrEnum):
    """Supported health statuses."""

    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"
    SKIPPED = "skipped"

PaginatedResponse

Bases: BaseModel

Generic paginated response wrapper.

Source code in src/dataenginex/api/pagination.py
43
44
45
46
47
class PaginatedResponse(BaseModel):
    """Generic paginated response wrapper."""

    data: list[Any] = Field(default_factory=list)
    pagination: PaginationMeta

PaginationMeta

Bases: BaseModel

Pagination metadata returned alongside results.

Source code in src/dataenginex/api/pagination.py
33
34
35
36
37
38
39
40
class PaginationMeta(BaseModel):
    """Pagination metadata returned alongside results."""

    total: int = Field(description="Total number of items")
    limit: int = Field(description="Page size")
    has_next: bool = Field(description="Whether more items exist")
    next_cursor: str | None = Field(None, description="Opaque cursor for next page")
    has_previous: bool = Field(default=False, description="Whether previous items exist")

RateLimiter

In-memory token-bucket rate limiter.

Parameters

requests_per_minute: Sustained request rate per client. burst: Maximum instantaneous burst size.

Source code in src/dataenginex/api/rate_limit.py
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
class RateLimiter:
    """In-memory token-bucket rate limiter.

    Parameters
    ----------
    requests_per_minute:
        Sustained request rate per client.
    burst:
        Maximum instantaneous burst size.
    """

    def __init__(self, requests_per_minute: int = 60, burst: int = 10) -> None:
        self.rpm = requests_per_minute
        self.burst = burst
        self._buckets: dict[str, _Bucket] = {}

    def allow(self, client_id: str) -> bool:
        """Return ``True`` if *client_id* has remaining tokens; consume one token."""
        bucket = self._buckets.get(client_id)
        if bucket is None:
            bucket = _Bucket(
                tokens=float(self.burst),
                last_refill=time.monotonic(),
                capacity=float(self.burst),
                refill_rate=self.rpm / 60.0,
            )
            self._buckets[client_id] = bucket
        return bucket.consume()

    def get_stats(self) -> dict[str, Any]:
        """Return current rate-limiter configuration and client count."""
        return {
            "rpm": self.rpm,
            "burst": self.burst,
            "active_clients": len(self._buckets),
        }

    def cleanup(self, max_age_seconds: float = 300.0) -> int:
        """Evict buckets idle for more than *max_age_seconds*."""
        now = time.monotonic()
        stale = [k for k, b in self._buckets.items() if now - b.last_refill > max_age_seconds]
        for k in stale:
            del self._buckets[k]
        return len(stale)

allow(client_id)

Return True if client_id has remaining tokens; consume one token.

Source code in src/dataenginex/api/rate_limit.py
67
68
69
70
71
72
73
74
75
76
77
78
def allow(self, client_id: str) -> bool:
    """Return ``True`` if *client_id* has remaining tokens; consume one token."""
    bucket = self._buckets.get(client_id)
    if bucket is None:
        bucket = _Bucket(
            tokens=float(self.burst),
            last_refill=time.monotonic(),
            capacity=float(self.burst),
            refill_rate=self.rpm / 60.0,
        )
        self._buckets[client_id] = bucket
    return bucket.consume()

get_stats()

Return current rate-limiter configuration and client count.

Source code in src/dataenginex/api/rate_limit.py
80
81
82
83
84
85
86
def get_stats(self) -> dict[str, Any]:
    """Return current rate-limiter configuration and client count."""
    return {
        "rpm": self.rpm,
        "burst": self.burst,
        "active_clients": len(self._buckets),
    }

cleanup(max_age_seconds=300.0)

Evict buckets idle for more than max_age_seconds.

Source code in src/dataenginex/api/rate_limit.py
88
89
90
91
92
93
94
def cleanup(self, max_age_seconds: float = 300.0) -> int:
    """Evict buckets idle for more than *max_age_seconds*."""
    now = time.monotonic()
    stale = [k for k, b in self._buckets.items() if now - b.last_refill > max_age_seconds]
    for k in stale:
        del self._buckets[k]
    return len(stale)

RateLimitMiddleware

Bases: BaseHTTPMiddleware

Starlette middleware applying per-IP rate limiting.

Source code in src/dataenginex/api/rate_limit.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
class RateLimitMiddleware(BaseHTTPMiddleware):
    """Starlette middleware applying per-IP rate limiting."""

    def __init__(self, app: Any) -> None:  # noqa: ANN401
        super().__init__(app)
        rpm = int(os.getenv("DEX_RATE_LIMIT_RPM", "60"))
        burst = int(os.getenv("DEX_RATE_LIMIT_BURST", "10"))
        self._limiter = RateLimiter(requests_per_minute=rpm, burst=burst)
        self._enabled = os.getenv("DEX_RATE_LIMIT_ENABLED", "false").lower() == "true"

    async def dispatch(self, request: Request, call_next: RequestResponseEndpoint) -> Response:
        """Enforce per-IP rate limits; return 429 when tokens are exhausted."""
        if not self._enabled:
            return await call_next(request)

        if request.url.path in _EXEMPT_PATHS:
            return await call_next(request)

        client_ip = request.client.host if request.client else "unknown"
        if not self._limiter.allow(client_ip):
            logger.warning("rate limit exceeded", client_ip=client_ip)
            return JSONResponse(
                status_code=429,
                content={
                    "error": "rate_limit_exceeded",
                    "message": "Too many requests — please slow down",
                },
                headers={"Retry-After": "60"},
            )

        return await call_next(request)

dispatch(request, call_next) async

Enforce per-IP rate limits; return 429 when tokens are exhausted.

Source code in src/dataenginex/api/rate_limit.py
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
async def dispatch(self, request: Request, call_next: RequestResponseEndpoint) -> Response:
    """Enforce per-IP rate limits; return 429 when tokens are exhausted."""
    if not self._enabled:
        return await call_next(request)

    if request.url.path in _EXEMPT_PATHS:
        return await call_next(request)

    client_ip = request.client.host if request.client else "unknown"
    if not self._limiter.allow(client_ip):
        logger.warning("rate limit exceeded", client_ip=client_ip)
        return JSONResponse(
            status_code=429,
            content={
                "error": "rate_limit_exceeded",
                "message": "Too many requests — please slow down",
            },
            headers={"Retry-After": "60"},
        )

    return await call_next(request)

create_token(payload, secret, ttl=3600)

Create a HS256 JWT token.

Parameters

payload: Claims dict (e.g. {"sub": "user123", "roles": ["admin"]}). secret: HMAC shared secret. ttl: Time-to-live in seconds (default 1 hour).

Source code in src/dataenginex/api/auth.py
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
def create_token(payload: dict[str, Any], secret: str, ttl: int = 3600) -> str:
    """Create a HS256 JWT token.

    Parameters
    ----------
    payload:
        Claims dict (e.g. ``{"sub": "user123", "roles": ["admin"]}``).
    secret:
        HMAC shared secret.
    ttl:
        Time-to-live in seconds (default 1 hour).
    """
    header = {"alg": "HS256", "typ": "JWT"}
    now = int(time.time())
    payload = {**payload, "iat": now, "exp": now + ttl}

    segments = [
        _b64url_encode(json.dumps(header).encode()),
        _b64url_encode(json.dumps(payload, default=str).encode()),
    ]
    signing_input = f"{segments[0]}.{segments[1]}"
    signature = hmac.new(secret.encode(), signing_input.encode(), sha256).digest()
    segments.append(_b64url_encode(signature))
    return ".".join(segments)

decode_token(token, secret)

Decode and verify a HS256 JWT token. Raises ValueError on failure.

Source code in src/dataenginex/api/auth.py
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
def decode_token(token: str, secret: str) -> dict[str, Any]:
    """Decode and verify a HS256 JWT token.  Raises ``ValueError`` on failure."""
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("Malformed JWT")

    signing_input = f"{parts[0]}.{parts[1]}"
    expected_sig = hmac.new(secret.encode(), signing_input.encode(), sha256).digest()
    actual_sig = _b64url_decode(parts[2])

    if not hmac.compare_digest(expected_sig, actual_sig):
        raise ValueError("Invalid JWT signature")

    payload: dict[str, Any] = json.loads(_b64url_decode(parts[1]))
    exp = payload.get("exp")
    if exp is not None and int(exp) < int(time.time()):
        raise ValueError("Token expired")

    return payload

paginate(items, *, cursor=None, limit=20, max_limit=100)

Slice items and return a PaginatedResponse.

Parameters

items: Full list of items to paginate. cursor: Opaque cursor from a previous response (or None for the first page). limit: Number of items per page. max_limit: Hard ceiling on limit to prevent abuse.

Source code in src/dataenginex/api/pagination.py
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
def paginate(
    items: list[Any],
    *,
    cursor: str | None = None,
    limit: int = 20,
    max_limit: int = 100,
) -> PaginatedResponse:
    """Slice *items* and return a ``PaginatedResponse``.

    Parameters
    ----------
    items:
        Full list of items to paginate.
    cursor:
        Opaque cursor from a previous response (or *None* for the first page).
    limit:
        Number of items per page.
    max_limit:
        Hard ceiling on *limit* to prevent abuse.
    """
    limit = min(max(1, limit), max_limit)

    if cursor:
        try:
            offset = decode_cursor(cursor)
        except ValueError:
            offset = 0  # Reset to first page on invalid cursor
    else:
        offset = 0
    total = len(items)

    page = items[offset : offset + limit]
    has_next = (offset + limit) < total
    next_cursor = encode_cursor(offset + limit) if has_next else None
    has_previous = offset > 0

    return PaginatedResponse(
        data=page,
        pagination=PaginationMeta(
            total=total,
            limit=limit,
            has_next=has_next,
            next_cursor=next_cursor,
            has_previous=has_previous,
        ),
    )