Skip to content

dataenginex.plugins

DataEngineX plugin system — discovery, registration, and lifecycle.

Plugins register themselves under the "dataenginex.plugins" entry-point group in their pyproject.toml. At runtime, call discover() to find and instantiate all installed plugins.

Example pyproject.toml::

[project.entry-points."dataenginex.plugins"]
myplugin = "mypackage.plugin:MyPlugin"

Usage::

from dataenginex.plugins import discover, PluginRegistry

registry = PluginRegistry()
for plugin in discover():
    registry.register(plugin)

DataEngineXPlugin

Bases: ABC

Interface that every DataEngineX plugin must implement.

Attributes:

Name Type Description
name str

Short identifier (e.g. "myplugin").

version str

SemVer string (e.g. "0.5.0").

Source code in src/dataenginex/plugins/registry.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
class DataEngineXPlugin(abc.ABC):
    """Abstract contract that every DataEngineX plugin has to fulfil.

    Subclasses must supply ``name``, ``version`` and ``health_check``;
    ``get_metrics`` and ``register_routes`` are optional hooks that ship
    with default implementations.

    Attributes:
        name: Short identifier (e.g. ``"myplugin"``).
        version: SemVer string (e.g. ``"0.5.0"``).
    """

    @property
    @abc.abstractmethod
    def name(self) -> str:
        """Name that uniquely identifies this plugin."""

    @property
    @abc.abstractmethod
    def version(self) -> str:
        """Version string of this plugin."""

    @abc.abstractmethod
    def health_check(self) -> dict[str, Any]:
        """Report the plugin's current health.

        Returns:
            A dict carrying at least
            ``{"status": "healthy"|"degraded"|"unhealthy"}``.
        """

    def get_metrics(self) -> dict[str, Any]:
        """Expose plugin-specific metrics.

        Subclasses may override this to publish custom
        Prometheus-compatible metrics.  The base implementation reports
        nothing and hands back a new empty dict.
        """
        no_metrics: dict[str, Any] = {}
        return no_metrics

    def register_routes(self, app: Any) -> None:
        """Attach plugin-specific routes to the FastAPI ``app``.

        Subclasses that expose HTTP endpoints should override this.
        The base implementation does nothing.
        """
        return  # noqa: B027  — intentionally optional, not abstract

name abstractmethod property

Unique plugin name.

version abstractmethod property

Plugin version string.

health_check() abstractmethod

Return health status.

Returns:

Type Description
dict[str, Any]

Dict with at least {"status": "healthy"|"degraded"|"unhealthy"}.

Source code in src/dataenginex/plugins/registry.py
48
49
50
51
52
53
54
@abc.abstractmethod
def health_check(self) -> dict[str, Any]:
    """Report the plugin's current health.

    Returns:
        A dict carrying at least
        ``{"status": "healthy"|"degraded"|"unhealthy"}``.
    """

get_metrics()

Return plugin-specific metrics.

Override this to expose custom Prometheus-compatible metrics. Default returns an empty dict.

Source code in src/dataenginex/plugins/registry.py
56
57
58
59
60
61
62
def get_metrics(self) -> dict[str, Any]:
    """Expose plugin-specific metrics.

    Subclasses may override this to publish custom
    Prometheus-compatible metrics.  The base implementation reports
    nothing and hands back a new empty dict.
    """
    no_metrics: dict[str, Any] = {}
    return no_metrics

register_routes(app)

Mount plugin-specific routes onto the FastAPI app.

Override this if the plugin exposes HTTP endpoints. Default is a no-op.

Source code in src/dataenginex/plugins/registry.py
64
65
66
67
68
69
70
def register_routes(self, app: Any) -> None:
    """Attach plugin-specific routes to the FastAPI ``app``.

    Subclasses that expose HTTP endpoints should override this.
    The base implementation does nothing.
    """
    return  # noqa: B027  — intentionally optional, not abstract

PluginRegistry

Manages discovered plugins and provides lookup + health aggregation.

Usage::

registry = PluginRegistry()
for p in discover():
    registry.register(p)

registry.get("myplugin")           # single lookup
registry.all()                     # all registered plugins
registry.health_check_all()        # aggregated health
Source code in src/dataenginex/plugins/registry.py
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
class PluginRegistry:
    """Holds plugin instances keyed by name and aggregates their health.

    Usage::

        registry = PluginRegistry()
        for p in discover():
            registry.register(p)

        registry.get("myplugin")           # single lookup
        registry.all()                     # all registered plugins
        registry.health_check_all()        # aggregated health
    """

    def __init__(self) -> None:
        # Insertion-ordered mapping: plugin name -> plugin instance.
        self._plugins: dict[str, DataEngineXPlugin] = {}

    def register(self, plugin: DataEngineXPlugin) -> None:
        """Add ``plugin`` to the registry under its ``name``.

        Raises:
            ValueError: If a plugin with the same name is already registered.
        """
        key = plugin.name
        if key in self._plugins:
            message = f"plugin already registered: {key}"
            raise ValueError(message)
        self._plugins[key] = plugin
        logger.info("registered plugin", name=key, version=plugin.version)

    def get(self, name: str) -> DataEngineXPlugin | None:
        """Fetch the plugin registered as ``name``, or ``None`` if absent."""
        try:
            return self._plugins[name]
        except KeyError:
            return None

    def all(self) -> list[DataEngineXPlugin]:
        """List every registered plugin, in the order they were registered."""
        return [*self._plugins.values()]

    def health_check_all(self) -> dict[str, dict[str, Any]]:
        """Collect a health report from each registered plugin.

        A plugin whose ``health_check`` raises is logged and reported as
        ``"unhealthy"`` with the error text; the remaining plugins are
        still checked.

        Returns:
            ``{name: health_dict}`` for each plugin.
        """
        report: dict[str, dict[str, Any]] = {}
        for plugin_name, instance in self._plugins.items():
            try:
                status = instance.health_check()
            except Exception as err:
                logger.error("health check failed", plugin=plugin_name, exc=str(err))
                status = {"status": "unhealthy", "error": str(err)}
            report[plugin_name] = status
        return report

    @property
    def count(self) -> int:
        """How many plugins are currently registered."""
        return len(self._plugins)

count property

Number of registered plugins.

register(plugin)

Register a plugin instance.

Raises:

Type Description
ValueError

If a plugin with the same name is already registered.

Source code in src/dataenginex/plugins/registry.py
132
133
134
135
136
137
138
139
140
141
142
def register(self, plugin: DataEngineXPlugin) -> None:
    """Add ``plugin`` to the registry under its ``name``.

    Raises:
        ValueError: If a plugin with the same name is already registered.
    """
    key = plugin.name
    if key in self._plugins:
        message = f"plugin already registered: {key}"
        raise ValueError(message)
    self._plugins[key] = plugin
    logger.info("registered plugin", name=key, version=plugin.version)

get(name)

Look up a plugin by name. Returns None if not found.

Source code in src/dataenginex/plugins/registry.py
144
145
146
def get(self, name: str) -> DataEngineXPlugin | None:
    """Fetch the plugin registered as ``name``, or ``None`` if absent."""
    try:
        return self._plugins[name]
    except KeyError:
        return None

all()

Return all registered plugins in registration order.

Source code in src/dataenginex/plugins/registry.py
148
149
150
def all(self) -> list[DataEngineXPlugin]:
    """List every registered plugin, in the order they were registered."""
    return [*self._plugins.values()]

health_check_all()

Run health checks across all registered plugins.

Returns:

Type Description
dict[str, dict[str, Any]]

{name: health_dict} for each plugin.

Source code in src/dataenginex/plugins/registry.py
152
153
154
155
156
157
158
159
160
161
162
163
164
165
def health_check_all(self) -> dict[str, dict[str, Any]]:
    """Collect a health report from each registered plugin.

    A plugin whose ``health_check`` raises is logged and reported as
    ``"unhealthy"`` with the error text; the remaining plugins are
    still checked.

    Returns:
        ``{name: health_dict}`` for each plugin.
    """
    report: dict[str, dict[str, Any]] = {}
    for plugin_name, instance in self._plugins.items():
        try:
            status = instance.health_check()
        except Exception as err:
            logger.error("health check failed", plugin=plugin_name, exc=str(err))
            status = {"status": "unhealthy", "error": str(err)}
        report[plugin_name] = status
    return report

discover()

Discover and instantiate all installed DataEngineX plugins.

Scans entry_points(group="dataenginex.plugins"), loads each entry point to obtain a plugin class, then instantiates it.

Returns:

Type Description
list[DataEngineXPlugin]

List of plugin instances. Broken plugins are logged and skipped.

Source code in src/dataenginex/plugins/registry.py
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
def discover() -> list[DataEngineXPlugin]:
    """Find every installed DataEngineX plugin and instantiate it.

    Iterates over ``entry_points(group=_ENTRY_POINT_GROUP)``, loads each
    entry point to obtain a plugin class and instantiates that class with
    no arguments.  Objects that are not ``DataEngineXPlugin`` instances
    are skipped with a warning; any exception raised while loading or
    instantiating is logged and that entry point is skipped.

    Returns:
        List of plugin instances.  Broken plugins are logged and skipped.
    """
    found: list[DataEngineXPlugin] = []

    for entry in entry_points(group=_ENTRY_POINT_GROUP):
        try:
            candidate = entry.load()()
            if isinstance(candidate, DataEngineXPlugin):
                found.append(candidate)
                logger.info(
                    "discovered plugin",
                    name=candidate.name,
                    version=candidate.version,
                )
            else:
                logger.warning(
                    "entry point does not implement DataEngineXPlugin — skipped",
                    plugin=entry.name,
                )
        except Exception as err:
            # One broken plugin must not prevent the others from loading.
            logger.error("failed to load plugin", plugin=entry.name, exc=str(err))

    return found

get_package_version(package_name)

Return the installed version of package_name, or "0.0.0" if not found.

Source code in src/dataenginex/plugins/registry.py
15
16
17
18
19
20
def get_package_version(package_name: str) -> str:
    """Look up the installed version of *package_name*.

    Returns:
        The version as a string, or ``"0.0.0"`` when the package is not
        found.
    """
    try:
        installed = _pkg_version(package_name)
    except PackageNotFoundError:
        return "0.0.0"
    return str(installed)