Reusable API components — auth, health, errors, pagination, rate limiting.
Public API::
from dataenginex.api import (
HealthChecker, HealthStatus, ComponentHealth,
APIHTTPException, BadRequestError, NotFoundError, ServiceUnavailableError,
PaginatedResponse, paginate,
AuthMiddleware, AuthUser, create_token, decode_token,
RateLimiter, RateLimitMiddleware,
)
Routers are defined in application packages that build on dataenginex.
Requires the [api] extra::
pip install dataenginex[api]
AuthMiddleware
Bases: BaseHTTPMiddleware
Starlette middleware that enforces JWT auth when enabled.
When DEX_AUTH_ENABLED is "true" (case-insensitive), every request
to a non-public path must carry a valid Authorization: Bearer <token>
header. The decoded claims are stored on request.state.auth_user.
Source code in src/dataenginex/api/auth.py
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182 | class AuthMiddleware(BaseHTTPMiddleware):
"""Starlette middleware that enforces JWT auth when enabled.
When ``DEX_AUTH_ENABLED`` is ``"true"`` (case-insensitive), every request
to a non-public path must carry a valid ``Authorization: Bearer <token>``
header. The decoded claims are stored on ``request.state.auth_user``.
"""
async def dispatch(self, request: Request, call_next: RequestResponseEndpoint) -> Response:
"""Validate the bearer token and attach ``auth_user`` to request state."""
enabled = os.getenv("DEX_AUTH_ENABLED", "false").lower() == "true"
if not enabled:
return await call_next(request)
# Skip public endpoints
if request.url.path in _PUBLIC_PATHS:
return await call_next(request)
secret = os.getenv("DEX_JWT_SECRET", "")
if not secret:
logger.error("DEX_AUTH_ENABLED=true but DEX_JWT_SECRET is not set")
return JSONResponse(
status_code=500,
content={"error": "auth_config_error", "message": "Auth secret not configured"},
)
auth_header = request.headers.get("Authorization", "")
if not auth_header.startswith("Bearer "):
return JSONResponse(
status_code=401,
content={"error": "unauthorized", "message": "Missing bearer token"},
)
token = auth_header[7:]
try:
claims = decode_token(token, secret)
except ValueError:
logger.exception("JWT validation failed")
return JSONResponse(
status_code=401,
content={
"error": "unauthorized",
"message": "Invalid or expired authentication token",
},
)
request.state.auth_user = AuthUser(
sub=claims.get("sub", "anonymous"),
roles=claims.get("roles", []),
claims=claims,
)
return await call_next(request)
|
dispatch(request, call_next)
async
Validate the bearer token and attach auth_user to request state.
Source code in src/dataenginex/api/auth.py
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182 | async def dispatch(self, request: Request, call_next: RequestResponseEndpoint) -> Response:
"""Validate the bearer token and attach ``auth_user`` to request state."""
enabled = os.getenv("DEX_AUTH_ENABLED", "false").lower() == "true"
if not enabled:
return await call_next(request)
# Skip public endpoints
if request.url.path in _PUBLIC_PATHS:
return await call_next(request)
secret = os.getenv("DEX_JWT_SECRET", "")
if not secret:
logger.error("DEX_AUTH_ENABLED=true but DEX_JWT_SECRET is not set")
return JSONResponse(
status_code=500,
content={"error": "auth_config_error", "message": "Auth secret not configured"},
)
auth_header = request.headers.get("Authorization", "")
if not auth_header.startswith("Bearer "):
return JSONResponse(
status_code=401,
content={"error": "unauthorized", "message": "Missing bearer token"},
)
token = auth_header[7:]
try:
claims = decode_token(token, secret)
except ValueError:
logger.exception("JWT validation failed")
return JSONResponse(
status_code=401,
content={
"error": "unauthorized",
"message": "Invalid or expired authentication token",
},
)
request.state.auth_user = AuthUser(
sub=claims.get("sub", "anonymous"),
roles=claims.get("roles", []),
claims=claims,
)
return await call_next(request)
|
AuthUser
dataclass
Resolved identity from a valid JWT.
Source code in src/dataenginex/api/auth.py
104
105
106
107
108
109
110 | @dataclass
class AuthUser:
"""Resolved identity from a valid JWT."""
sub: str
roles: list[str]
claims: dict[str, Any]
|
APIHTTPException
Bases: HTTPException
Base HTTP exception with error code and details.
Source code in src/dataenginex/api/errors.py
17
18
19
20
21
22
23
24
25
26
27
28
29 | class APIHTTPException(HTTPException):
"""Base HTTP exception with error code and details."""
def __init__(
self,
status_code: int,
message: str,
code: str = "api_error",
details: list[ErrorDetail] | None = None,
) -> None:
self.code = code
self.details = details
super().__init__(status_code=status_code, detail=message)
|
BadRequestError
Bases: APIHTTPException
Raised for 400 validation or malformed requests.
Source code in src/dataenginex/api/errors.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45 | class BadRequestError(APIHTTPException):
"""Raised for 400 validation or malformed requests."""
def __init__(
self,
message: str = "Bad request",
details: list[ErrorDetail] | None = None,
) -> None:
super().__init__(
status_code=status.HTTP_400_BAD_REQUEST,
message=message,
code="bad_request",
details=details,
)
|
NotFoundError
Bases: APIHTTPException
Raised for 404 not found errors.
Source code in src/dataenginex/api/errors.py
48
49
50
51
52
53
54
55
56
57
58
59
60
61 | class NotFoundError(APIHTTPException):
"""Raised for 404 not found errors."""
def __init__(
self,
message: str = "Resource not found",
details: list[ErrorDetail] | None = None,
) -> None:
super().__init__(
status_code=status.HTTP_404_NOT_FOUND,
message=message,
code="not_found",
details=details,
)
|
ServiceUnavailableError
Bases: APIHTTPException
Raised when a dependency is unavailable.
Source code in src/dataenginex/api/errors.py
64
65
66
67
68
69
70
71
72
73
74
75
76
77 | class ServiceUnavailableError(APIHTTPException):
"""Raised when a dependency is unavailable."""
def __init__(
self,
message: str = "Service unavailable",
details: list[ErrorDetail] | None = None,
) -> None:
super().__init__(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
message=message,
code="service_unavailable",
details=details,
)
|
ComponentHealth
dataclass
Health status of a single dependency component.
Attributes:
| Name |
Type |
Description |
name |
str
|
Component identifier (e.g. "database", "cache").
|
status |
HealthStatus
|
|
message |
str | None
|
Optional human-readable message.
|
duration_ms |
float | None
|
Time taken for the health check in milliseconds.
|
Source code in src/dataenginex/api/health.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52 | @dataclass(frozen=True)
class ComponentHealth:
"""Health status of a single dependency component.
Attributes:
name: Component identifier (e.g. ``"database"``, ``"cache"``).
status: Current health status.
message: Optional human-readable message.
duration_ms: Time taken for the health check in milliseconds.
"""
name: str
status: HealthStatus
message: str | None = None
duration_ms: float | None = None
def to_dict(self) -> dict[str, object | None]:
"""Serialize to a plain dictionary suitable for JSON responses."""
return {
"name": self.name,
"status": self.status.value,
"message": self.message,
"duration_ms": self.duration_ms,
}
|
to_dict()
Serialize to a plain dictionary suitable for JSON responses.
Source code in src/dataenginex/api/health.py
| def to_dict(self) -> dict[str, object | None]:
"""Serialize to a plain dictionary suitable for JSON responses."""
return {
"name": self.name,
"status": self.status.value,
"message": self.message,
"duration_ms": self.duration_ms,
}
|
HealthChecker
Runs health checks for DEX dependencies.
Source code in src/dataenginex/api/health.py
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154 | class HealthChecker:
"""Runs health checks for DEX dependencies."""
def __init__(self, timeout_seconds: float = 1.0) -> None:
self.timeout_seconds = timeout_seconds
async def check_all(self) -> list[ComponentHealth]:
"""Run all dependency health checks concurrently."""
return [
await self.check_database(),
await self.check_cache(),
await self.check_external_api(),
]
def overall_status(self, components: list[ComponentHealth]) -> HealthStatus:
"""Derive an aggregate status from individual component results."""
if any(c.status == HealthStatus.UNHEALTHY for c in components):
return HealthStatus.UNHEALTHY
if any(c.status == HealthStatus.DEGRADED for c in components):
return HealthStatus.DEGRADED
return HealthStatus.HEALTHY
async def check_database(self) -> ComponentHealth:
"""Check database connectivity via TCP probe."""
host = os.getenv("DEX_DB_HOST")
port = os.getenv("DEX_DB_PORT")
if not host or not port:
return ComponentHealth(
name="database",
status=HealthStatus.SKIPPED,
message="database not configured",
)
start = time.perf_counter()
ok, message = await self._tcp_check(host, int(port))
duration_ms = (time.perf_counter() - start) * 1000
return ComponentHealth(
name="database",
status=HealthStatus.HEALTHY if ok else HealthStatus.UNHEALTHY,
message=message,
duration_ms=round(duration_ms, 2),
)
async def check_cache(self) -> ComponentHealth:
"""Check cache connectivity via TCP probe."""
host = os.getenv("DEX_CACHE_HOST")
port = os.getenv("DEX_CACHE_PORT")
if not host or not port:
return ComponentHealth(
name="cache",
status=HealthStatus.SKIPPED,
message="cache not configured",
)
start = time.perf_counter()
ok, message = await self._tcp_check(host, int(port))
duration_ms = (time.perf_counter() - start) * 1000
return ComponentHealth(
name="cache",
status=HealthStatus.HEALTHY if ok else HealthStatus.UNHEALTHY,
message=message,
duration_ms=round(duration_ms, 2),
)
async def check_external_api(self) -> ComponentHealth:
"""Check external API reachability via HTTP GET."""
url = os.getenv("DEX_EXTERNAL_API_URL")
if not url:
return ComponentHealth(
name="external_api",
status=HealthStatus.SKIPPED,
message="external API not configured",
)
start = time.perf_counter()
timeout = httpx.Timeout(self.timeout_seconds)
try:
async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.get(url)
ok = response.status_code < 500
message = f"status_code={response.status_code}"
except httpx.HTTPError as exc:
ok = False
message = f"error={exc.__class__.__name__}"
duration_ms = (time.perf_counter() - start) * 1000
return ComponentHealth(
name="external_api",
status=HealthStatus.HEALTHY if ok else HealthStatus.UNHEALTHY,
message=message,
duration_ms=round(duration_ms, 2),
)
async def _tcp_check(self, host: str, port: int) -> tuple[bool, str]:
try:
await asyncio.wait_for(
asyncio.open_connection(host, port), timeout=self.timeout_seconds
)
return True, "reachable"
except (TimeoutError, ConnectionRefusedError, OSError) as exc:
return False, f"error={exc.__class__.__name__}"
|
check_all()
async
Run all dependency health checks concurrently.
Source code in src/dataenginex/api/health.py
| async def check_all(self) -> list[ComponentHealth]:
"""Run all dependency health checks concurrently."""
return [
await self.check_database(),
await self.check_cache(),
await self.check_external_api(),
]
|
overall_status(components)
Derive an aggregate status from individual component results.
Source code in src/dataenginex/api/health.py
| def overall_status(self, components: list[ComponentHealth]) -> HealthStatus:
"""Derive an aggregate status from individual component results."""
if any(c.status == HealthStatus.UNHEALTHY for c in components):
return HealthStatus.UNHEALTHY
if any(c.status == HealthStatus.DEGRADED for c in components):
return HealthStatus.DEGRADED
return HealthStatus.HEALTHY
|
check_database()
async
Check database connectivity via TCP probe.
Source code in src/dataenginex/api/health.py
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96 | async def check_database(self) -> ComponentHealth:
"""Check database connectivity via TCP probe."""
host = os.getenv("DEX_DB_HOST")
port = os.getenv("DEX_DB_PORT")
if not host or not port:
return ComponentHealth(
name="database",
status=HealthStatus.SKIPPED,
message="database not configured",
)
start = time.perf_counter()
ok, message = await self._tcp_check(host, int(port))
duration_ms = (time.perf_counter() - start) * 1000
return ComponentHealth(
name="database",
status=HealthStatus.HEALTHY if ok else HealthStatus.UNHEALTHY,
message=message,
duration_ms=round(duration_ms, 2),
)
|
check_cache()
async
Check cache connectivity via TCP probe.
Source code in src/dataenginex/api/health.py
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117 | async def check_cache(self) -> ComponentHealth:
"""Check cache connectivity via TCP probe."""
host = os.getenv("DEX_CACHE_HOST")
port = os.getenv("DEX_CACHE_PORT")
if not host or not port:
return ComponentHealth(
name="cache",
status=HealthStatus.SKIPPED,
message="cache not configured",
)
start = time.perf_counter()
ok, message = await self._tcp_check(host, int(port))
duration_ms = (time.perf_counter() - start) * 1000
return ComponentHealth(
name="cache",
status=HealthStatus.HEALTHY if ok else HealthStatus.UNHEALTHY,
message=message,
duration_ms=round(duration_ms, 2),
)
|
check_external_api()
async
Check external API reachability via HTTP GET.
Source code in src/dataenginex/api/health.py
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145 | async def check_external_api(self) -> ComponentHealth:
"""Check external API reachability via HTTP GET."""
url = os.getenv("DEX_EXTERNAL_API_URL")
if not url:
return ComponentHealth(
name="external_api",
status=HealthStatus.SKIPPED,
message="external API not configured",
)
start = time.perf_counter()
timeout = httpx.Timeout(self.timeout_seconds)
try:
async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.get(url)
ok = response.status_code < 500
message = f"status_code={response.status_code}"
except httpx.HTTPError as exc:
ok = False
message = f"error={exc.__class__.__name__}"
duration_ms = (time.perf_counter() - start) * 1000
return ComponentHealth(
name="external_api",
status=HealthStatus.HEALTHY if ok else HealthStatus.UNHEALTHY,
message=message,
duration_ms=round(duration_ms, 2),
)
|
HealthStatus
Bases: StrEnum
Supported health statuses.
Source code in src/dataenginex/api/health.py
| class HealthStatus(StrEnum):
"""Supported health statuses."""
HEALTHY = "healthy"
DEGRADED = "degraded"
UNHEALTHY = "unhealthy"
SKIPPED = "skipped"
|
PaginatedResponse
Bases: BaseModel
Generic paginated response wrapper.
Source code in src/dataenginex/api/pagination.py
| class PaginatedResponse(BaseModel):
"""Generic paginated response wrapper."""
data: list[Any] = Field(default_factory=list)
pagination: PaginationMeta
|
Bases: BaseModel
Pagination metadata returned alongside results.
Source code in src/dataenginex/api/pagination.py
| class PaginationMeta(BaseModel):
"""Pagination metadata returned alongside results."""
total: int = Field(description="Total number of items")
limit: int = Field(description="Page size")
has_next: bool = Field(description="Whether more items exist")
next_cursor: str | None = Field(None, description="Opaque cursor for next page")
has_previous: bool = Field(default=False, description="Whether previous items exist")
|
RateLimiter
In-memory token-bucket rate limiter.
Parameters
requests_per_minute:
Sustained request rate per client.
burst:
Maximum instantaneous burst size.
Source code in src/dataenginex/api/rate_limit.py
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94 | class RateLimiter:
"""In-memory token-bucket rate limiter.
Parameters
----------
requests_per_minute:
Sustained request rate per client.
burst:
Maximum instantaneous burst size.
"""
def __init__(self, requests_per_minute: int = 60, burst: int = 10) -> None:
self.rpm = requests_per_minute
self.burst = burst
self._buckets: dict[str, _Bucket] = {}
def allow(self, client_id: str) -> bool:
"""Return ``True`` if *client_id* has remaining tokens; consume one token."""
bucket = self._buckets.get(client_id)
if bucket is None:
bucket = _Bucket(
tokens=float(self.burst),
last_refill=time.monotonic(),
capacity=float(self.burst),
refill_rate=self.rpm / 60.0,
)
self._buckets[client_id] = bucket
return bucket.consume()
def get_stats(self) -> dict[str, Any]:
"""Return current rate-limiter configuration and client count."""
return {
"rpm": self.rpm,
"burst": self.burst,
"active_clients": len(self._buckets),
}
def cleanup(self, max_age_seconds: float = 300.0) -> int:
"""Evict buckets idle for more than *max_age_seconds*."""
now = time.monotonic()
stale = [k for k, b in self._buckets.items() if now - b.last_refill > max_age_seconds]
for k in stale:
del self._buckets[k]
return len(stale)
|
allow(client_id)
Return True if client_id has remaining tokens; consume one token.
Source code in src/dataenginex/api/rate_limit.py
67
68
69
70
71
72
73
74
75
76
77
78 | def allow(self, client_id: str) -> bool:
"""Return ``True`` if *client_id* has remaining tokens; consume one token."""
bucket = self._buckets.get(client_id)
if bucket is None:
bucket = _Bucket(
tokens=float(self.burst),
last_refill=time.monotonic(),
capacity=float(self.burst),
refill_rate=self.rpm / 60.0,
)
self._buckets[client_id] = bucket
return bucket.consume()
|
get_stats()
Return current rate-limiter configuration and client count.
Source code in src/dataenginex/api/rate_limit.py
| def get_stats(self) -> dict[str, Any]:
"""Return current rate-limiter configuration and client count."""
return {
"rpm": self.rpm,
"burst": self.burst,
"active_clients": len(self._buckets),
}
|
cleanup(max_age_seconds=300.0)
Evict buckets idle for more than max_age_seconds.
Source code in src/dataenginex/api/rate_limit.py
| def cleanup(self, max_age_seconds: float = 300.0) -> int:
"""Evict buckets idle for more than *max_age_seconds*."""
now = time.monotonic()
stale = [k for k, b in self._buckets.items() if now - b.last_refill > max_age_seconds]
for k in stale:
del self._buckets[k]
return len(stale)
|
RateLimitMiddleware
Bases: BaseHTTPMiddleware
Starlette middleware applying per-IP rate limiting.
Source code in src/dataenginex/api/rate_limit.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131 | class RateLimitMiddleware(BaseHTTPMiddleware):
"""Starlette middleware applying per-IP rate limiting."""
def __init__(self, app: Any) -> None: # noqa: ANN401
super().__init__(app)
rpm = int(os.getenv("DEX_RATE_LIMIT_RPM", "60"))
burst = int(os.getenv("DEX_RATE_LIMIT_BURST", "10"))
self._limiter = RateLimiter(requests_per_minute=rpm, burst=burst)
self._enabled = os.getenv("DEX_RATE_LIMIT_ENABLED", "false").lower() == "true"
async def dispatch(self, request: Request, call_next: RequestResponseEndpoint) -> Response:
"""Enforce per-IP rate limits; return 429 when tokens are exhausted."""
if not self._enabled:
return await call_next(request)
if request.url.path in _EXEMPT_PATHS:
return await call_next(request)
client_ip = request.client.host if request.client else "unknown"
if not self._limiter.allow(client_ip):
logger.warning("rate limit exceeded", client_ip=client_ip)
return JSONResponse(
status_code=429,
content={
"error": "rate_limit_exceeded",
"message": "Too many requests — please slow down",
},
headers={"Retry-After": "60"},
)
return await call_next(request)
|
dispatch(request, call_next)
async
Enforce per-IP rate limits; return 429 when tokens are exhausted.
Source code in src/dataenginex/api/rate_limit.py
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131 | async def dispatch(self, request: Request, call_next: RequestResponseEndpoint) -> Response:
"""Enforce per-IP rate limits; return 429 when tokens are exhausted."""
if not self._enabled:
return await call_next(request)
if request.url.path in _EXEMPT_PATHS:
return await call_next(request)
client_ip = request.client.host if request.client else "unknown"
if not self._limiter.allow(client_ip):
logger.warning("rate limit exceeded", client_ip=client_ip)
return JSONResponse(
status_code=429,
content={
"error": "rate_limit_exceeded",
"message": "Too many requests — please slow down",
},
headers={"Retry-After": "60"},
)
return await call_next(request)
|
create_token(payload, secret, ttl=3600)
Create a HS256 JWT token.
Parameters
payload:
Claims dict (e.g. {"sub": "user123", "roles": ["admin"]}).
secret:
HMAC shared secret.
ttl:
Time-to-live in seconds (default 1 hour).
Source code in src/dataenginex/api/auth.py
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75 | def create_token(payload: dict[str, Any], secret: str, ttl: int = 3600) -> str:
"""Create a HS256 JWT token.
Parameters
----------
payload:
Claims dict (e.g. ``{"sub": "user123", "roles": ["admin"]}``).
secret:
HMAC shared secret.
ttl:
Time-to-live in seconds (default 1 hour).
"""
header = {"alg": "HS256", "typ": "JWT"}
now = int(time.time())
payload = {**payload, "iat": now, "exp": now + ttl}
segments = [
_b64url_encode(json.dumps(header).encode()),
_b64url_encode(json.dumps(payload, default=str).encode()),
]
signing_input = f"{segments[0]}.{segments[1]}"
signature = hmac.new(secret.encode(), signing_input.encode(), sha256).digest()
segments.append(_b64url_encode(signature))
return ".".join(segments)
|
decode_token(token, secret)
Decode and verify a HS256 JWT token. Raises ValueError on failure.
Source code in src/dataenginex/api/auth.py
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96 | def decode_token(token: str, secret: str) -> dict[str, Any]:
"""Decode and verify a HS256 JWT token. Raises ``ValueError`` on failure."""
parts = token.split(".")
if len(parts) != 3:
raise ValueError("Malformed JWT")
signing_input = f"{parts[0]}.{parts[1]}"
expected_sig = hmac.new(secret.encode(), signing_input.encode(), sha256).digest()
actual_sig = _b64url_decode(parts[2])
if not hmac.compare_digest(expected_sig, actual_sig):
raise ValueError("Invalid JWT signature")
payload: dict[str, Any] = json.loads(_b64url_decode(parts[1]))
exp = payload.get("exp")
if exp is not None and int(exp) < int(time.time()):
raise ValueError("Token expired")
return payload
|
paginate(items, *, cursor=None, limit=20, max_limit=100)
Slice items and return a PaginatedResponse.
Parameters
items:
Full list of items to paginate.
cursor:
Opaque cursor from a previous response (or None for the first page).
limit:
Number of items per page.
max_limit:
Hard ceiling on limit to prevent abuse.
Source code in src/dataenginex/api/pagination.py
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114 | def paginate(
items: list[Any],
*,
cursor: str | None = None,
limit: int = 20,
max_limit: int = 100,
) -> PaginatedResponse:
"""Slice *items* and return a ``PaginatedResponse``.
Parameters
----------
items:
Full list of items to paginate.
cursor:
Opaque cursor from a previous response (or *None* for the first page).
limit:
Number of items per page.
max_limit:
Hard ceiling on *limit* to prevent abuse.
"""
limit = min(max(1, limit), max_limit)
if cursor:
try:
offset = decode_cursor(cursor)
except ValueError:
offset = 0 # Reset to first page on invalid cursor
else:
offset = 0
total = len(items)
page = items[offset : offset + limit]
has_next = (offset + limit) < total
next_cursor = encode_cursor(offset + limit) if has_next else None
has_previous = offset > 0
return PaginatedResponse(
data=page,
pagination=PaginationMeta(
total=total,
limit=limit,
has_next=has_next,
next_cursor=next_cursor,
has_previous=has_previous,
),
)
|