|
| 1 | +from http import HTTPStatus |
| 2 | + |
| 3 | +from flask import request |
| 4 | +from flask_restx import Resource |
| 5 | + |
| 6 | +from mindsdb.api.http.utils import http_error |
| 7 | +from mindsdb.api.http.namespaces.configs.integrations import ns_conf |
| 8 | +from mindsdb.api.mysql.mysql_proxy.classes.fake_mysql_proxy import FakeMysqlProxy |
| 9 | +from mindsdb.integrations.libs.passthrough import PassthroughProtocol |
| 10 | +from mindsdb.integrations.libs.passthrough_types import ( |
| 11 | + ALLOWED_METHODS, |
| 12 | + FORBIDDEN_REQUEST_HEADERS, |
| 13 | + PassthroughError, |
| 14 | + PassthroughNotSupportedError, |
| 15 | + PassthroughRequest, |
| 16 | + PassthroughResponse, |
| 17 | + PassthroughValidationError, |
| 18 | +) |
| 19 | +from mindsdb.interfaces.database.integrations import integration_controller |
| 20 | +from mindsdb.metrics.metrics import api_endpoint_metrics |
| 21 | +from mindsdb.utilities import log |
| 22 | + |
| 23 | +logger = log.getLogger(__name__) |
| 24 | + |
| 25 | + |
| 26 | +def _handler_supports_passthrough(handler_module) -> bool: |
| 27 | + handler_cls = getattr(handler_module, "Handler", None) |
| 28 | + if handler_cls is None: |
| 29 | + return False |
| 30 | + # issubclass is the right check for Protocol when classes define the |
| 31 | + # methods as real methods (not just dynamic attrs); runtime_checkable |
| 32 | + # Protocols support issubclass in that mode. |
| 33 | + try: |
| 34 | + return issubclass(handler_cls, PassthroughProtocol) |
| 35 | + except TypeError: |
| 36 | + return False |
| 37 | + |
| 38 | + |
| 39 | +def _get_passthrough_handler(name: str): |
| 40 | + """Look up the datasource's handler and verify it satisfies the contract.""" |
| 41 | + proxy = FakeMysqlProxy() |
| 42 | + handler = proxy.session.integration_controller.get_data_handler(name) |
| 43 | + if not isinstance(handler, PassthroughProtocol): |
| 44 | + raise PassthroughNotSupportedError(f"datasource '{name}' does not support REST passthrough") |
| 45 | + return handler |
| 46 | + |
| 47 | + |
| 48 | +def _parse_passthrough_request(payload: dict) -> PassthroughRequest: |
| 49 | + if not isinstance(payload, dict): |
| 50 | + raise PassthroughValidationError("request body must be a JSON object") |
| 51 | + |
| 52 | + method = payload.get("method") |
| 53 | + path = payload.get("path") |
| 54 | + if not isinstance(method, str) or method.upper() not in ALLOWED_METHODS: |
| 55 | + raise PassthroughValidationError(f"'method' must be one of {sorted(ALLOWED_METHODS)}") |
| 56 | + if not isinstance(path, str) or not path.startswith("/"): |
| 57 | + raise PassthroughValidationError("'path' must be a string starting with '/'") |
| 58 | + |
| 59 | + headers = payload.get("headers") or {} |
| 60 | + if not isinstance(headers, dict): |
| 61 | + raise PassthroughValidationError("'headers' must be an object") |
| 62 | + for name in headers: |
| 63 | + if not isinstance(name, str): |
| 64 | + raise PassthroughValidationError("header names must be strings") |
| 65 | + if name.lower() in FORBIDDEN_REQUEST_HEADERS or name.lower().startswith("proxy-"): |
| 66 | + raise PassthroughValidationError(f"header '{name}' is not allowed in passthrough requests") |
| 67 | + |
| 68 | + query = payload.get("query") or {} |
| 69 | + if not isinstance(query, dict): |
| 70 | + raise PassthroughValidationError("'query' must be an object") |
| 71 | + |
| 72 | + return PassthroughRequest( |
| 73 | + method=method.upper(), |
| 74 | + path=path, |
| 75 | + query={str(k): str(v) for k, v in query.items()}, |
| 76 | + headers={str(k): str(v) for k, v in headers.items()}, |
| 77 | + body=payload.get("body"), |
| 78 | + ) |
| 79 | + |
| 80 | + |
| 81 | +def _serialize_response(resp: PassthroughResponse) -> dict: |
| 82 | + return { |
| 83 | + "status_code": resp.status_code, |
| 84 | + "headers": resp.headers, |
| 85 | + "body": resp.body, |
| 86 | + "content_type": resp.content_type, |
| 87 | + } |
| 88 | + |
| 89 | + |
| 90 | +def _passthrough_error_response(err: PassthroughError): |
| 91 | + return { |
| 92 | + "error_code": err.error_code, |
| 93 | + "message": str(err), |
| 94 | + }, err.http_status |
| 95 | + |
| 96 | + |
| 97 | +@ns_conf.route("/<name>/passthrough") |
| 98 | +@ns_conf.param("name", "Datasource name") |
| 99 | +class Passthrough(Resource): |
| 100 | + @ns_conf.doc("passthrough") |
| 101 | + @api_endpoint_metrics("POST", "/integrations/passthrough") |
| 102 | + def post(self, name: str): |
| 103 | + payload = request.json or {} |
| 104 | + try: |
| 105 | + req = _parse_passthrough_request(payload) |
| 106 | + handler = _get_passthrough_handler(name) |
| 107 | + response = handler.api_passthrough(req) |
| 108 | + except PassthroughError as e: |
| 109 | + return _passthrough_error_response(e) |
| 110 | + except Exception as e: # noqa: BLE001 |
| 111 | + logger.exception("passthrough failed for datasource %s", name) |
| 112 | + return http_error(HTTPStatus.INTERNAL_SERVER_ERROR, "PassthroughError", str(e)) |
| 113 | + |
| 114 | + return _serialize_response(response), 200 |
| 115 | + |
| 116 | + |
| 117 | +@ns_conf.route("/<name>/passthrough/test") |
| 118 | +@ns_conf.param("name", "Datasource name") |
| 119 | +class PassthroughTest(Resource): |
| 120 | + @ns_conf.doc("passthrough_test") |
| 121 | + @api_endpoint_metrics("POST", "/integrations/passthrough/test") |
| 122 | + def post(self, name: str): |
| 123 | + try: |
| 124 | + handler = _get_passthrough_handler(name) |
| 125 | + except PassthroughError as e: |
| 126 | + return _passthrough_error_response(e) |
| 127 | + except Exception as e: # noqa: BLE001 |
| 128 | + logger.exception("passthrough test lookup failed for datasource %s", name) |
| 129 | + return http_error(HTTPStatus.INTERNAL_SERVER_ERROR, "PassthroughError", str(e)) |
| 130 | + |
| 131 | + result = handler.test_passthrough() |
| 132 | + return result, 200 |
| 133 | + |
| 134 | + |
| 135 | +@ns_conf.route("/capabilities") |
| 136 | +class Capabilities(Resource): |
| 137 | + """Return structured passthrough capabilities per handler. |
| 138 | +
|
| 139 | + The new ``handlers`` dict is the canonical shape callers should migrate |
| 140 | + to. The legacy flat ``bearer_passthrough`` list is still populated for |
| 141 | + backward compat — Minds can migrate on its own timeline. |
| 142 | + """ |
| 143 | + |
| 144 | + @ns_conf.doc("integration_capabilities") |
| 145 | + @api_endpoint_metrics("GET", "/integrations/capabilities") |
| 146 | + def get(self): |
| 147 | + handlers: dict[str, dict] = {} |
| 148 | + bearer_engines: list[str] = [] |
| 149 | + handler_modules = getattr(integration_controller, "handler_modules", {}) or {} |
| 150 | + for engine, module in handler_modules.items(): |
| 151 | + try: |
| 152 | + if not _handler_supports_passthrough(module): |
| 153 | + continue |
| 154 | + handler_cls = getattr(module, "Handler", None) |
| 155 | + # Read the declarative auth mode off the handler class. Default |
| 156 | + # to "bearer" so protocol-only handlers that don't inherit the |
| 157 | + # mixin still land in a sensible bucket. |
| 158 | + auth_mode = getattr(handler_cls, "_auth_mode", "bearer") |
| 159 | + handlers[engine] = { |
| 160 | + "auth_modes": [auth_mode], |
| 161 | + "operations": ["passthrough"], |
| 162 | + } |
| 163 | + if auth_mode == "bearer": |
| 164 | + bearer_engines.append(engine) |
| 165 | + except Exception: |
| 166 | + # A broken handler module should not break the capabilities endpoint. |
| 167 | + logger.debug("skipping handler %s during capability probe", engine, exc_info=True) |
| 168 | + bearer_engines.sort() |
| 169 | + return { |
| 170 | + "handlers": handlers, |
| 171 | + # TODO: remove in v2 once Minds has migrated to the `handlers` |
| 172 | + # structured shape. Keep backward-compat for now. |
| 173 | + "bearer_passthrough": bearer_engines, |
| 174 | + }, 200 |
0 commit comments