Skip to content

dataenginex.middleware

Middleware — logging, metrics, and domain-level Prometheus counters.

Public API::

from dataenginex.middleware import configure_logging, get_logger
from dataenginex.middleware import get_metrics
from dataenginex.middleware.domain_metrics import pipeline_runs_total, ai_tokens_total

configure_logging(log_level='INFO', json_logs=True)

Configure structlog for the application.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| log_level | str | Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL) | 'INFO' |
| json_logs | bool | If True, output JSON logs; otherwise use coloured console | True |
Source code in src/dataenginex/middleware/logging_config.py
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
def configure_logging(log_level: str = "INFO", json_logs: bool = True) -> None:
    """Configure structlog for the application.

    Installs ``_InterceptHandler`` as the stdlib root handler and configures
    structlog with a shared processor chain whose final renderer depends on
    ``json_logs``.

    Args:
        log_level: Logging level name (DEBUG, INFO, WARNING, ERROR, CRITICAL),
            matched case-insensitively. Names that do not resolve to an
            integer level fall back to INFO.
        json_logs: If True, output JSON logs; otherwise use coloured console
    """
    # Accept only genuine integer levels. The ``logging`` module also exposes
    # upper-case attributes that are not levels (e.g. ``BASIC_FORMAT``), and
    # those must not be forwarded as a level.
    resolved = getattr(logging, log_level.upper(), None)
    numeric_level = resolved if isinstance(resolved, int) else logging.INFO

    # Route all stdlib logging through our intercept handler at the requested
    # level. ``force=True`` replaces any handlers already attached to the root
    # logger, so calling this function again reconfigures cleanly.
    logging.basicConfig(
        handlers=[_InterceptHandler()],
        level=numeric_level,
        force=True,
    )

    processors: list[Processor] = [
        structlog.contextvars.merge_contextvars,
        structlog.stdlib.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        add_app_context,
        structlog.processors.StackInfoRenderer(),
    ]

    if json_logs:
        # JSON output needs exception info flattened into the event dict
        # before rendering; the console branch adds no such step.
        processors.append(structlog.processors.format_exc_info)
        processors.append(structlog.processors.JSONRenderer())
    else:
        processors.append(structlog.dev.ConsoleRenderer())

    # Use PrintLoggerFactory (direct stdout) — NOT stdlib.LoggerFactory.
    # stdlib.LoggerFactory routes back through logging.Logger which triggers
    # _InterceptHandler again, causing infinite recursion.
    structlog.configure(
        processors=processors,
        wrapper_class=structlog.make_filtering_bound_logger(numeric_level),
        context_class=dict,
        logger_factory=structlog.PrintLoggerFactory(),
        cache_logger_on_first_use=True,
    )

get_logger(name)

Get a configured structlog logger instance.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| name | str | Logger name (typically `__name__` of the calling module) | required |

Returns:

| Type | Description |
| --- | --- |
| BoundLogger | Configured structlog logger |

Source code in src/dataenginex/middleware/logging_config.py
122
123
124
125
126
127
128
129
130
131
def get_logger(name: str) -> structlog.stdlib.BoundLogger:
    """Return a structlog logger for ``name``.

    Args:
        name: Logger name (typically ``__name__`` of the calling module)

    Returns:
        The object produced by ``structlog.get_logger(name)``, typed as a
        ``structlog.stdlib.BoundLogger`` for static checkers.
    """
    # ``cast`` is a no-op at runtime; it only narrows the declared type.
    bound = structlog.get_logger(name)
    return cast(structlog.stdlib.BoundLogger, bound)

get_metrics()

Generate Prometheus metrics in text format.

Returns:

| Type | Description |
| --- | --- |
| tuple[bytes, str] | Tuple of (metrics_data, content_type) |

Source code in src/dataenginex/middleware/metrics.py
62
63
64
65
66
67
68
69
def get_metrics() -> tuple[bytes, str]:
    """Serialise the metrics held in ``REGISTRY`` for a Prometheus scrape.

    Returns:
        A ``(metrics_data, content_type)`` pair: the output of
        ``generate_latest(REGISTRY)`` and the matching ``CONTENT_TYPE_LATEST``
        header value.
    """
    payload = generate_latest(REGISTRY)
    return payload, CONTENT_TYPE_LATEST