Skip to content

dataenginex.middleware

Logging, metrics, tracing, and request handling middleware.

dataenginex.middleware

Middleware — logging, metrics, tracing, and request handling.

Public API::

from dataenginex.middleware import (
    configure_logging, get_logger, APP_VERSION,
    get_metrics, PrometheusMetricsMiddleware,
    RequestLoggingMiddleware,
    configure_tracing, instrument_fastapi, get_tracer,
)

PrometheusMetricsMiddleware

Bases: BaseHTTPMiddleware

Middleware to collect Prometheus metrics for HTTP requests.

Source code in packages/dataenginex/src/dataenginex/middleware/metrics_middleware.py
class PrometheusMetricsMiddleware(BaseHTTPMiddleware):
    """Middleware to collect Prometheus metrics for HTTP requests.

    Every request except ``/metrics`` updates the in-flight, request-count,
    exception-count and duration metrics, all labelled with the value of the
    ``ENVIRONMENT`` environment variable (lower-cased, default ``"dev"``).
    """

    async def dispatch(self, request: Request, call_next: Callable[..., Any]) -> Response:
        """Process request and collect metrics.

        Args:
            request: Incoming HTTP request.
            call_next: Callable awaited with ``request`` to obtain the response.

        Returns:
            The response returned by ``call_next``.

        Raises:
            Exception: Anything raised by ``call_next`` is counted (as an
                exception and as a ``"500"`` request) and then re-raised.
        """
        # Skip metrics collection for the /metrics endpoint itself
        if request.url.path == "/metrics":
            return cast(Response, await call_next(request))

        method = request.method
        # NOTE(review): the raw URL path is used as a label value; confirm that
        # path parameters cannot blow up label cardinality in this deployment.
        path = request.url.path
        environment = os.getenv("ENVIRONMENT", "dev").lower()

        # Track in-flight requests
        http_requests_in_flight.labels(environment=environment).inc()

        # Use a monotonic clock: wall-clock time (time.time()) can jump when the
        # system clock is adjusted, which would yield negative or skewed durations.
        start_time = time.perf_counter()

        try:
            # Process request
            response = await call_next(request)
            status = response.status_code

            # Record request metrics
            http_requests_total.labels(
                method=method,
                endpoint=path,
                status=str(status),
                environment=environment,
            ).inc()

            return cast(Response, response)

        except Exception as exc:
            # Track exceptions
            http_exceptions_total.labels(
                exception_type=type(exc).__name__,
                environment=environment,
            ).inc()
            # No response exists on this path, so the request is counted as a 500
            http_requests_total.labels(
                method=method,
                endpoint=path,
                status="500",
                environment=environment,
            ).inc()
            raise

        finally:
            # Record duration (runs for both the success and the failure path)
            duration = time.perf_counter() - start_time
            http_request_duration_seconds.labels(
                method=method,
                endpoint=path,
                environment=environment,
            ).observe(duration)

            # Decrement in-flight counter
            http_requests_in_flight.labels(environment=environment).dec()

dispatch(request, call_next) async

Process request and collect metrics.

Source code in packages/dataenginex/src/dataenginex/middleware/metrics_middleware.py
async def dispatch(self, request: Request, call_next: Callable[..., Any]) -> Response:
    """Process request and collect metrics.

    Args:
        request: Incoming HTTP request.
        call_next: Callable awaited with ``request`` to obtain the response.

    Returns:
        The response returned by ``call_next``.

    Raises:
        Exception: Anything raised by ``call_next`` is counted (as an
            exception and as a ``"500"`` request) and then re-raised.
    """
    # Skip metrics collection for the /metrics endpoint itself
    if request.url.path == "/metrics":
        return cast(Response, await call_next(request))

    method = request.method
    # NOTE(review): the raw URL path is used as a label value; confirm that
    # path parameters cannot blow up label cardinality in this deployment.
    path = request.url.path
    environment = os.getenv("ENVIRONMENT", "dev").lower()

    # Track in-flight requests
    http_requests_in_flight.labels(environment=environment).inc()

    # Use a monotonic clock: wall-clock time (time.time()) can jump when the
    # system clock is adjusted, which would yield negative or skewed durations.
    start_time = time.perf_counter()

    try:
        # Process request
        response = await call_next(request)
        status = response.status_code

        # Record request metrics
        http_requests_total.labels(
            method=method,
            endpoint=path,
            status=str(status),
            environment=environment,
        ).inc()

        return cast(Response, response)

    except Exception as exc:
        # Track exceptions
        http_exceptions_total.labels(
            exception_type=type(exc).__name__,
            environment=environment,
        ).inc()
        # No response exists on this path, so the request is counted as a 500
        http_requests_total.labels(
            method=method,
            endpoint=path,
            status="500",
            environment=environment,
        ).inc()
        raise

    finally:
        # Record duration (runs for both the success and the failure path)
        duration = time.perf_counter() - start_time
        http_request_duration_seconds.labels(
            method=method,
            endpoint=path,
            environment=environment,
        ).observe(duration)

        # Decrement in-flight counter
        http_requests_in_flight.labels(environment=environment).dec()

RequestLoggingMiddleware

Bases: BaseHTTPMiddleware

Middleware to log all HTTP requests with request ID tracking.

Source code in packages/dataenginex/src/dataenginex/middleware/request_logging.py
class RequestLoggingMiddleware(BaseHTTPMiddleware):
    """Middleware to log all HTTP requests with request ID tracking.

    Each request gets a fresh UUID4 request ID that is stored on
    ``request.state``, bound into the structlog context variables for the
    duration of the request, and returned in the ``X-Request-ID`` response header.
    """

    def __init__(self, app: Callable[..., Any]) -> None:
        """Initialize middleware.

        Args:
            app: Application object forwarded to the base-class constructor.
        """
        super().__init__(app)
        self.logger = structlog.get_logger(__name__)

    async def dispatch(self, request: Request, call_next: Callable[..., Any]) -> Response:
        """Process request and add logging context.

        Args:
            request: Incoming HTTP request.
            call_next: Callable awaited with ``request`` to obtain the response.

        Returns:
            The response from ``call_next`` with ``X-Request-ID`` set.

        Raises:
            Exception: Anything raised by ``call_next`` is logged as
                ``request_failed`` and then re-raised.
        """
        # Generate request ID
        request_id = str(uuid.uuid4())
        request.state.request_id = request_id

        # Bind context for this request (clear first so nothing leaks in from
        # whatever was bound before)
        clear_contextvars()
        bind_contextvars(
            request_id=request_id,
            method=request.method,
            path=request.url.path,
            client_host=request.client.host if request.client else None,
        )

        # Log request start.
        # Durations use a monotonic clock: time.time() follows the wall clock and
        # can jump on clock adjustments, producing negative or skewed values.
        start_time = time.perf_counter()
        self.logger.info(
            "request_started",
            query_params=dict(request.query_params),
        )

        try:
            # Process request
            response = await call_next(request)

            # Calculate duration
            duration = time.perf_counter() - start_time

            # Log successful response
            self.logger.info(
                "request_completed",
                status_code=response.status_code,
                duration_seconds=round(duration, 3),
            )

            # Add request ID to response headers
            response.headers["X-Request-ID"] = request_id

            return cast(Response, response)

        except Exception as exc:
            # Log error
            duration = time.perf_counter() - start_time
            self.logger.error(
                "request_failed",
                error=str(exc),
                error_type=type(exc).__name__,
                duration_seconds=round(duration, 3),
                exc_info=True,
            )
            raise

        finally:
            # Clear context
            clear_contextvars()

__init__(app)

Initialize middleware.

Source code in packages/dataenginex/src/dataenginex/middleware/request_logging.py
def __init__(self, app: Callable[..., Any]) -> None:
    """Initialize the middleware and attach a structlog logger.

    Args:
        app: Application object forwarded unchanged to the base-class
            constructor.
    """
    super().__init__(app)
    # The logger is obtained under this module's ``__name__``.
    self.logger = structlog.get_logger(__name__)

dispatch(request, call_next) async

Process request and add logging context.

Source code in packages/dataenginex/src/dataenginex/middleware/request_logging.py
async def dispatch(self, request: Request, call_next: Callable[..., Any]) -> Response:
    """Process request and add logging context.

    Args:
        request: Incoming HTTP request.
        call_next: Callable awaited with ``request`` to obtain the response.

    Returns:
        The response from ``call_next`` with ``X-Request-ID`` set.

    Raises:
        Exception: Anything raised by ``call_next`` is logged as
            ``request_failed`` and then re-raised.
    """
    # Generate request ID
    request_id = str(uuid.uuid4())
    request.state.request_id = request_id

    # Bind context for this request (clear first so nothing leaks in from
    # whatever was bound before)
    clear_contextvars()
    bind_contextvars(
        request_id=request_id,
        method=request.method,
        path=request.url.path,
        client_host=request.client.host if request.client else None,
    )

    # Log request start.
    # Durations use a monotonic clock: time.time() follows the wall clock and
    # can jump on clock adjustments, producing negative or skewed values.
    start_time = time.perf_counter()
    self.logger.info(
        "request_started",
        query_params=dict(request.query_params),
    )

    try:
        # Process request
        response = await call_next(request)

        # Calculate duration
        duration = time.perf_counter() - start_time

        # Log successful response
        self.logger.info(
            "request_completed",
            status_code=response.status_code,
            duration_seconds=round(duration, 3),
        )

        # Add request ID to response headers
        response.headers["X-Request-ID"] = request_id

        return cast(Response, response)

    except Exception as exc:
        # Log error
        duration = time.perf_counter() - start_time
        self.logger.error(
            "request_failed",
            error=str(exc),
            error_type=type(exc).__name__,
            duration_seconds=round(duration, 3),
            exc_info=True,
        )
        raise

    finally:
        # Clear context
        clear_contextvars()

configure_logging(log_level='INFO', json_logs=True)

Configure loguru + structlog for the application.

Parameters:

Name Type Description Default
log_level str

Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)

'INFO'
json_logs bool

If True, output JSON logs; otherwise use coloured console

True
Source code in packages/dataenginex/src/dataenginex/middleware/logging_config.py
def configure_logging(log_level: str = "INFO", json_logs: bool = True) -> None:
    """Set up loguru, stdlib logging interception and structlog in one call.

    Args:
        log_level: Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
        json_logs: If True, output JSON logs; otherwise use coloured console
    """
    # -- loguru sink ---------------------------------------------------------
    # Remove the handlers loguru currently holds before adding our own sink.
    _loguru_logger.remove()

    # Normalise the level name once; it is reused for loguru and structlog.
    level_name = log_level.upper()

    if not json_logs:
        console_format = (
            "<green>{time:YYYY-MM-DD HH:mm:ss}</green> | "
            "<level>{level: <8}</level> | "
            "<cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - "
            "<level>{message}</level>"
        )
        _loguru_logger.add(
            sys.stdout,
            level=level_name,
            format=console_format,
            colorize=True,
        )
    else:
        # serialize=True makes loguru emit JSON records
        _loguru_logger.add(
            sys.stdout,
            level=level_name,
            serialize=True,
            backtrace=False,
            diagnose=False,
        )

    # -- Intercept stdlib logging → loguru -----------------------------------
    intercept_handler = _InterceptHandler()
    logging.basicConfig(handlers=[intercept_handler], level=0, force=True)

    # -- structlog (for FastAPI middleware / structured context) --------------
    # Processors shared by both output modes, in execution order.
    shared_processors: list[Processor] = [
        structlog.contextvars.merge_contextvars,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        add_app_context,
        structlog.processors.StackInfoRenderer(),
    ]

    # The tail of the chain depends on the requested output format.
    if json_logs:
        final_processors: list[Processor] = [
            structlog.processors.format_exc_info,
            structlog.processors.JSONRenderer(),
        ]
    else:
        final_processors = [structlog.dev.ConsoleRenderer()]

    # Look the level up as an attribute of the stdlib ``logging`` module.
    minimum_level = getattr(logging, level_name)

    structlog.configure(
        processors=[*shared_processors, *final_processors],
        wrapper_class=structlog.make_filtering_bound_logger(minimum_level),
        context_class=dict,
        logger_factory=structlog.stdlib.LoggerFactory(),
        cache_logger_on_first_use=True,
    )

get_logger(name)

Get a configured structlog logger instance.

Parameters:

Name Type Description Default
name str

Logger name (typically __name__ of the calling module)

required

Returns:

Type Description
BoundLogger

Configured structlog logger

Source code in packages/dataenginex/src/dataenginex/middleware/logging_config.py
def get_logger(name: str) -> structlog.stdlib.BoundLogger:
    """Return the structlog logger obtained for ``name``.

    Args:
        name: Logger name (typically ``__name__`` of the calling module)

    Returns:
        The logger from ``structlog.get_logger``, typed as a stdlib
        ``BoundLogger``
    """
    bound_logger = structlog.get_logger(name)
    # cast() only informs the type checker; the object is returned as-is.
    return cast(structlog.stdlib.BoundLogger, bound_logger)

get_metrics()

Generate Prometheus metrics in text format.

Returns:

Type Description
tuple[bytes, str]

Tuple of (metrics_data, content_type)

Source code in packages/dataenginex/src/dataenginex/middleware/metrics.py
def get_metrics() -> tuple[bytes, str]:
    """
    Render the metrics held in ``REGISTRY`` in Prometheus text format.

    Returns:
        Tuple of (metrics_data, content_type), where the content type is
        ``CONTENT_TYPE_LATEST``
    """
    metrics_payload = generate_latest(REGISTRY)
    return metrics_payload, CONTENT_TYPE_LATEST

configure_tracing(service_name=APP_NAME, service_version=APP_VERSION, otlp_endpoint=None, enable_console_export=False)

Configure OpenTelemetry tracing.

Parameters:

Name Type Description Default
service_name str

Name of the service

APP_NAME
service_version str

Version of the service

APP_VERSION
otlp_endpoint str | None

OTLP collector endpoint (e.g., "http://localhost:4317")

None
enable_console_export bool

If True, print spans to console (for debugging)

False

Returns:

Type Description
TracerProvider

Configured TracerProvider

Source code in packages/dataenginex/src/dataenginex/middleware/tracing.py
def configure_tracing(
    service_name: str = APP_NAME,
    service_version: str = APP_VERSION,
    otlp_endpoint: str | None = None,
    enable_console_export: bool = False,
) -> TracerProvider:
    """
    Configure OpenTelemetry tracing.

    Builds a ``TracerProvider`` tagged with the service name, version and the
    ``ENVIRONMENT`` environment variable (default ``"dev"``), attaches the
    requested exporters and installs it as the global tracer provider.

    Args:
        service_name: Name of the service
        service_version: Version of the service
        otlp_endpoint: OTLP collector endpoint (e.g., "http://localhost:4317").
            An ``https://`` scheme selects a TLS connection; any other value
            (``http://`` or no scheme) uses an insecure channel.
        enable_console_export: If True, print spans to console (for debugging)

    Returns:
        Configured TracerProvider
    """
    # Create resource with service information
    resource = Resource.create(
        {
            "service.name": service_name,
            "service.version": service_version,
            "deployment.environment": os.getenv("ENVIRONMENT", "dev"),
        }
    )

    # Create tracer provider
    provider = TracerProvider(resource=resource)

    # Add span processors
    if otlp_endpoint:
        # Export to OTLP collector (Jaeger, Tempo, etc.)
        # Honour the scheme: previously every endpoint was forced onto an
        # insecure channel, silently downgrading https:// collectors to plaintext.
        use_tls = otlp_endpoint.startswith("https://")
        # Strip only a leading scheme for the gRPC endpoint (host:port)
        endpoint = otlp_endpoint.removeprefix("https://").removeprefix("http://")
        otlp_exporter = OTLPSpanExporter(endpoint=endpoint, insecure=not use_tls)
        provider.add_span_processor(BatchSpanProcessor(otlp_exporter))

    if enable_console_export:
        # Export to console for debugging
        console_exporter = ConsoleSpanExporter()
        provider.add_span_processor(BatchSpanProcessor(console_exporter))

    # Set as global tracer provider
    trace.set_tracer_provider(provider)

    return provider

get_tracer(name)

Get a tracer instance.

Parameters:

Name Type Description Default
name str

Tracer name (typically __name__ of the calling module)

required

Returns:

Type Description
Tracer

Tracer instance

Source code in packages/dataenginex/src/dataenginex/middleware/tracing.py
def get_tracer(name: str) -> trace.Tracer:
    """
    Get a tracer instance.

    Thin wrapper that delegates to ``trace.get_tracer``.

    Args:
        name: Tracer name (typically ``__name__`` of the calling module)

    Returns:
        Tracer instance returned by ``trace.get_tracer(name)``
    """
    return trace.get_tracer(name)

instrument_fastapi(app)

Instrument FastAPI application with OpenTelemetry.

Parameters:

Name Type Description Default
app FastAPI

FastAPI application instance

required
Source code in packages/dataenginex/src/dataenginex/middleware/tracing.py
def instrument_fastapi(app: FastAPI) -> None:
    """
    Instrument FastAPI application with OpenTelemetry.

    Delegates to ``FastAPIInstrumentor.instrument_app``; nothing is returned.

    Args:
        app: FastAPI application instance
    """
    FastAPIInstrumentor.instrument_app(app)