feat: add SQLCommenter observability to database drivers (fixes #2899) #2913
Changes from all commits
1c2d286
ff2d6e2
10b26f7
c2ea6d7
381e0bc
4102ec5
d520d1a
|
Contributor
Changes in this file should be reversed. It is redundant for this sqlcommenter. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -31,18 +31,16 @@ An editor configured to use the Cloud Spanner MCP server can use its AI capabili | |
|
|
||
| 2. Add the required inputs for your [instance](https://docs.cloud.google.com/spanner/docs/instances) in the configuration pop-up, then click "Save". You can update this configuration at any time in the "Configure" tab. | ||
|
|
||
| You'll now be able to see all enabled tools in the "Tools" tab. | ||
|
|
||
| > [!NOTE] | ||
| > If you encounter issues with Windows Defender blocking the execution, you may need to configure an allowlist. See [Configure exclusions for Microsoft Defender Antivirus](https://learn.microsoft.com/en-us/microsoft-365/security/defender-endpoint/configure-exclusions-microsoft-defender-antivirus?view=o365-worldwide) for more details. | ||
|
|
||
| ## Usage | ||
|
|
||
| Once configured, the MCP server will automatically provide Cloud Spanner capabilities to your AI assistant. You can: | ||
|
|
||
| * "Execute a DML query to update customer names." | ||
| * "List all tables in the `my-database`." | ||
| * "Execute a DQL query to select data from `orders` table." | ||
| - "Execute a DML query to update customer names." | ||
| - "List all tables in the `my-database`." | ||
| - "Execute a DQL query to select data from `orders` table." | ||
|
Comment on lines +41 to +43
Contributor
Let's keep to |
||
|
|
||
| ## Server Capabilities | ||
|
|
||
|
|
||
|
Contributor
This doc should not be included here. We can add sqlcommenter under |
| Original file line number | Diff line number | Diff line change | ||||||
|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,70 @@ | ||||||||
| # SQLCommenter Integration Overview | ||||||||
|
|
||||||||
| > [!NOTE] | ||||||||
| > This README follows the same clean formatting conventions as other documentation files (e.g., `SPANNER_README.md`). | ||||||||
|
|
||||||||
| The **SQLCommenter** observability layer has been added to every SQL-capable database source driver in the GenAI Toolbox. It automatically injects a comment containing metadata (tool name, controller, driver, and the OpenTelemetry `traceparent`) into every SQL statement executed by the toolbox. | ||||||||
|
|
||||||||
| ## What is injected? | ||||||||
| ```sql | ||||||||
| /*action='myTool',controller='myAgent',db_driver='postgres',traceparent='00-...-...-01'*/ SELECT ... | ||||||||
| ``` | ||||||||
| The comment is generated by `internal/sqlcommenter/commenter.go` and attached via the `RunSQL` method of each source. | ||||||||
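A minimal sketch of how such a comment could be rendered and prepended. The helper names `encode`, `buildComment`, and `prependComment` are hypothetical and are not taken from `internal/sqlcommenter/commenter.go`, whose contents this diff does not show. It assumes sorted keys and percent-encoded, single-quoted values, so that a value can never terminate the comment early.

```go
package main

import (
	"fmt"
	"net/url"
	"sort"
	"strings"
)

// encode percent-encodes a key or value. url.QueryEscape turns a space into
// "+", so that is rewritten to "%20" to keep the output percent-encoded only.
func encode(s string) string {
	return strings.ReplaceAll(url.QueryEscape(s), "+", "%20")
}

// buildComment (hypothetical helper) renders non-empty tags as a block
// comment, with keys sorted so the output is stable.
func buildComment(tags map[string]string) string {
	keys := make([]string, 0, len(tags))
	for k, v := range tags {
		if v != "" {
			keys = append(keys, k)
		}
	}
	if len(keys) == 0 {
		return ""
	}
	sort.Strings(keys)
	pairs := make([]string, 0, len(keys))
	for _, k := range keys {
		pairs = append(pairs, fmt.Sprintf("%s='%s'", encode(k), encode(tags[k])))
	}
	return "/*" + strings.Join(pairs, ",") + "*/"
}

// prependComment places the comment before the statement, as in the example
// above. With no usable tags the statement is returned unchanged.
func prependComment(sql string, tags map[string]string) string {
	c := buildComment(tags)
	if c == "" {
		return sql
	}
	return c + " " + sql
}

func main() {
	fmt.Println(prependComment("SELECT 1", map[string]string{
		"action":     "myTool",
		"controller": "myAgent",
		"db_driver":  "postgres",
	}))
}
```

Encoding `*` and `/` inside values matters here: an agent name containing `*/` would otherwise close the comment and let the rest of the value be parsed as SQL.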
|
|
||||||||
| ## Covered Drivers | ||||||||
| The following drivers have been instrumented with SQLCommenter. BigQuery receives the metadata as job labels instead of a SQL comment, while Cassandra and Couchbase receive it as a comment in their own query languages (CQL and N1QL). | ||||||||
|
|
||||||||
| | Driver / Service | Source file | Injection Mechanism | | ||||||||
| |------------------|-------------|-------------------| | ||||||||
| | **AlloyDB Postgres** | `internal/sources/alloydbpg/alloydb_pg.go` | SQL Comment (`/* ... */`) | | ||||||||
| | **BigQuery** | `internal/sources/bigquery/bigquery.go` | **Job Labels** (Metadata Tags) | | ||||||||
| | **Cassandra (CQL)** | `internal/sources/cassandra/cassandra.go` | CQL Comment (`/* ... */`) | | ||||||||
| | **ClickHouse** | `internal/sources/clickhouse/clickhouse.go` | SQL Comment (`/* ... */`) | | ||||||||
| | **CloudSQL MSSQL** | `internal/sources/cloudsqlmssql/cloud_sql_mssql.go` | SQL Comment (`/* ... */`) | | ||||||||
| | **CloudSQL MySQL** | `internal/sources/cloudsqlmysql/cloud_sql_mysql.go` | SQL Comment (`/* ... */`) | | ||||||||
| | **CloudSQL Postgres** | `internal/sources/cloudsqlpg/cloud_sql_pg.go` | SQL Comment (`/* ... */`) | | ||||||||
| | **CockroachDB** | `internal/sources/cockroachdb/cockroachdb.go` | SQL Comment (`/* ... */`) | | ||||||||
| | **Couchbase (N1QL)** | `internal/sources/couchbase/couchbase.go`<br>`internal/tools/couchbase/couchbase.go` | N1QL Comment (`/* ... */`) | | ||||||||
|
|
||||||||
|
Contributor
Suggested change
|
||||||||
| | **Firebird** | `internal/sources/firebird/firebird.go` | SQL Comment (`/* ... */`) | | ||||||||
| | **MindsDB** | `internal/sources/mindsdb/mindsdb.go` | SQL Comment (`/* ... */`) | | ||||||||
| | **MSSQL** | `internal/sources/mssql/mssql.go` | SQL Comment (`/* ... */`) | | ||||||||
| | **MySQL** | `internal/sources/mysql/mysql.go` | SQL Comment (`/* ... */`) | | ||||||||
| | **OceanBase** | `internal/sources/oceanbase/oceanbase.go` | SQL Comment (`/* ... */`) | | ||||||||
| | **Oracle** | `internal/sources/oracle/oracle.go` | SQL Comment (`/* ... */`) | | ||||||||
| | **Postgres** | `internal/sources/postgres/postgres.go` | SQL Comment (`/* ... */`) | | ||||||||
| | **SingleStore** | `internal/sources/singlestore/singlestore.go` | SQL Comment (`/* ... */`) | | ||||||||
| | **Snowflake** | `internal/sources/snowflake/snowflake.go` | SQL Comment (`/* ... */`) | | ||||||||
| | **Spanner** | `internal/sources/spanner/spanner.go` | SQL Comment (`/* ... */`) | | ||||||||
| | **SQLite** | `internal/sources/sqlite/sqlite.go` | SQL Comment (`/* ... */`) | | ||||||||
| | **TiDB** | `internal/sources/tidb/tidb.go` | SQL Comment (`/* ... */`) | | ||||||||
| | **Trino** | `internal/sources/trino/trino.go` | SQL Comment (`/* ... */`) | | ||||||||
| | **YugabyteDB** | `internal/sources/yugabytedb/yugabytedb.go` | SQL Comment (`/* ... */`) | | ||||||||
|
|
||||||||
| ## Non-SQL Drivers (Exclusions) | ||||||||
| Drivers like **MongoDB, Firestore, Redis, Bigtable, and Dgraph** are currently excluded from SQLCommenter injection because they do not support standard leading block comments at the protocol level. For these drivers, observability relies on standard OpenTelemetry traces instead. | ||||||||
|
|
||||||||
| ## How it works | ||||||||
| 1. **Context injection** – each `RunSQL` method calls `sqlcommenter.WithDBDriver` to store the driver name in the request context. | ||||||||
| 2. **Comment generation** – `sqlcommenter.AppendComment` reads the context (tool name, controller, driver, traceparent) and prepends the comment to the SQL statement. | ||||||||
| 3. **BigQuery job labels** – for BigQuery, the same metadata is attached to job labels as key-value pairs (e.g., `controller=mcp-toolbox`). | ||||||||
| 4. **Session lifecycle management** – the `sseManager` in `internal/server/mcp.go` now accepts an `onRemove` callback that deregisters the agent identity when a session times out or is closed, so stored identities do not accumulate for sessions that no longer exist. | ||||||||
|
|
||||||||
| ## Configuration & Identity Extraction | ||||||||
| The SQLCommenter metadata handles identity tracking using a strict "Automatic First, Static Second" priority sequence. | ||||||||
|
|
||||||||
| ### The `controller` Tag Priority | ||||||||
| The `controller` tag maps the query directly to the identity of the AI Agent executing the action. | ||||||||
|
|
||||||||
| 1. **Automatic MCP Extraction (Primary):** When an agent connects via the Model Context Protocol (MCP), the server captures `clientInfo.name` from the initial handshake (e.g., "Mosa" or "Claude Desktop") and injects it into the request context for every database query on that session. **Zero manual configuration is required.** | ||||||||
| 2. **Environment Variable Fallback:** If the toolbox is deployed *without* an MCP client (such as a raw Dockerized API handling direct REST `curl` requests), there is no handshake and the automatic extraction returns an empty name. In these non-MCP architectures, operators can set the `TOOLBOX_AGENT_NAME` environment variable to attribute queries to a static, named identity. | ||||||||
|
|
||||||||
| ### Other Environment Overrides | ||||||||
| - **`TOOLBOX_APP_NAME`**: Overrides the `application` tag value. Represents the overall framework running the queries. (Default: `mcp-toolbox`) | ||||||||
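The priority rules above reduce to two small lookups. The sketch below is a reading of the documented behaviour, not the PR's code: `resolveController` and `resolveApplication` are hypothetical names, and `getenv` is injected so the logic can be exercised without touching the process environment.

```go
package main

import (
	"fmt"
	"os"
)

// resolveController prefers the name captured from the MCP handshake and
// falls back to TOOLBOX_AGENT_NAME only when no handshake name is available.
func resolveController(handshakeName string, getenv func(string) string) string {
	if handshakeName != "" {
		return handshakeName
	}
	return getenv("TOOLBOX_AGENT_NAME")
}

// resolveApplication returns TOOLBOX_APP_NAME when set, otherwise the
// documented default "mcp-toolbox".
func resolveApplication(getenv func(string) string) string {
	if v := getenv("TOOLBOX_APP_NAME"); v != "" {
		return v
	}
	return "mcp-toolbox"
}

func main() {
	fmt.Println(resolveController("", os.Getenv), resolveApplication(os.Getenv))
}
```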
|
|
||||||||
| ## Verification | ||||||||
| - Unit tests in `internal/sqlcommenter/commenter_test.go` confirm the comment is added for Postgres, MySQL, and SQLite. | ||||||||
| - Build verification (`go build ./...`) ensures no breaking signature changes were introduced. | ||||||||
|
|
||||||||
| --- | ||||||||
| *Last updated on 2026‑04‑10 by Antigravity* | ||||||||
|
Comment on lines +68 to +70
Contributor
Suggested change
|
||||||||
|
Contributor
These changes should be reversed. We did not update the llamaindex sdk repo name~ |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -36,6 +36,7 @@ import ( | |
| mcputil "github.com/googleapis/mcp-toolbox/internal/server/mcp/util" | ||
| v20241105 "github.com/googleapis/mcp-toolbox/internal/server/mcp/v20241105" | ||
| v20250326 "github.com/googleapis/mcp-toolbox/internal/server/mcp/v20250326" | ||
| "github.com/googleapis/mcp-toolbox/internal/sqlcommenter" | ||
| "github.com/googleapis/mcp-toolbox/internal/util" | ||
| "go.opentelemetry.io/otel" | ||
| "go.opentelemetry.io/otel/attribute" | ||
|
|
@@ -56,6 +57,7 @@ type sseSession struct { | |
| type sseManager struct { | ||
| mu sync.Mutex | ||
| sseSessions map[string]*sseSession | ||
| onRemove func(id string) | ||
|
Contributor
What is this for? |
||
| } | ||
|
|
||
| func (m *sseManager) get(id string) (*sseSession, bool) { | ||
|
|
@@ -66,17 +68,21 @@ func (m *sseManager) get(id string) (*sseSession, bool) { | |
| // Be defensive: a nil session entry should be treated as unavailable. | ||
| if ok && session == nil { | ||
| delete(m.sseSessions, id) | ||
| if m.onRemove != nil { | ||
| m.onRemove(id) | ||
| } | ||
| } | ||
| return nil, false | ||
| } | ||
| session.lastActive = time.Now() | ||
| return session, true | ||
| } | ||
|
|
||
| func newSseManager(ctx context.Context) *sseManager { | ||
| func newSseManager(ctx context.Context, onRemove func(id string)) *sseManager { | ||
| sseM := &sseManager{ | ||
| mu: sync.Mutex{}, | ||
| sseSessions: make(map[string]*sseSession), | ||
| onRemove: onRemove, | ||
| } | ||
| go sseM.cleanupRoutine(ctx) | ||
| return sseM | ||
|
|
@@ -93,6 +99,9 @@ func (m *sseManager) remove(id string) { | |
| m.mu.Lock() | ||
| delete(m.sseSessions, id) | ||
| m.mu.Unlock() | ||
| if m.onRemove != nil { | ||
| m.onRemove(id) | ||
| } | ||
| } | ||
|
|
||
| func (m *sseManager) cleanupRoutine(ctx context.Context) { | ||
|
|
@@ -112,6 +121,9 @@ func (m *sseManager) cleanupRoutine(ctx context.Context) { | |
| for id, sess := range m.sseSessions { | ||
| if now.Sub(sess.lastActive) > timeout { | ||
| delete(m.sseSessions, id) | ||
| if m.onRemove != nil { | ||
| m.onRemove(id) | ||
| } | ||
| } | ||
| } | ||
| }() | ||
|
|
@@ -248,7 +260,8 @@ func (s *stdioSession) readInputStream(ctx context.Context) error { | |
|
|
||
| var v string | ||
| var res any | ||
| v, res, err = processMcpMessage(msgCtx, []byte(line), s.server, s.protocol, "", "", nil, "") | ||
| // stdio is a single persistent connection; use "stdio" as the session ID. | ||
| v, res, err = processMcpMessage(msgCtx, []byte(line), s.server, s.protocol, "stdio", "", "", nil, "") | ||
| if err != nil { | ||
| // errors during the processing of message will generate a valid MCP Error response. | ||
| // server can continue to run. | ||
|
|
@@ -536,7 +549,22 @@ func httpHandler(s *Server, w http.ResponseWriter, r *http.Request) { | |
|
|
||
| networkProtocolVersion := fmt.Sprintf("%d.%d", r.ProtoMajor, r.ProtoMinor) | ||
|
|
||
| v, res, err := processMcpMessage(ctx, body, s, protocolVersion, toolsetName, promptsetName, r.Header, networkProtocolVersion) | ||
| // Determine the effective session ID for clientName correlation. | ||
| // - SSE (v20241105): URL param ?sessionId=<uuid> present on every request. | ||
| // - v20250326+: Mcp-Session-Id header sent by the client after initialize. | ||
| // - New sessions (initialize): neither is set yet, so we pre-generate the | ||
| // UUID here so it can be stored during initialize AND returned in the | ||
| // response header in the same call — avoiding a one-request lag. | ||
| mcpSessionID := headerSessionId | ||
| if mcpSessionID == "" { | ||
| mcpSessionID = paramSessionId | ||
| } | ||
| if mcpSessionID == "" { | ||
| // New session: pre-generate so initialize can store clientName by it. | ||
| mcpSessionID = uuid.New().String() | ||
| } | ||
|
|
||
| v, res, err := processMcpMessage(ctx, body, s, protocolVersion, mcpSessionID, toolsetName, promptsetName, r.Header, networkProtocolVersion) | ||
| if err != nil { | ||
| s.logger.DebugContext(ctx, fmt.Errorf("error processing message: %w", err).Error()) | ||
| } | ||
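The session ID precedence in this hunk (header, then URL parameter, then a freshly generated ID) can be isolated into a small function, which makes it easy to test. This is a sketch under assumptions: `resolveSessionID` and `randomID` are hypothetical names, and `randomID` stands in for `uuid.New().String()` so the example depends only on the standard library.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// resolveSessionID applies the precedence from the hunk above: the
// Mcp-Session-Id header value, then the sessionId URL parameter, then a newly
// generated ID for a session that is just being initialized.
func resolveSessionID(headerID, paramID string, newID func() string) (id string, generated bool) {
	if headerID != "" {
		return headerID, false
	}
	if paramID != "" {
		return paramID, false
	}
	return newID(), true
}

// randomID is a stand-in for uuid.New().String(); it returns 32 hex characters.
func randomID() string {
	b := make([]byte, 16)
	if _, err := rand.Read(b); err != nil {
		panic(err)
	}
	return hex.EncodeToString(b)
}

func main() {
	id, generated := resolveSessionID("", "", randomID)
	fmt.Println(len(id), generated)
}
```

Returning the `generated` flag lets a caller tell a new session apart from one the client already knows about, for example to decide whether the ID still needs to be sent back in a response header.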
|
|
@@ -549,9 +577,10 @@ func httpHandler(s *Server, w http.ResponseWriter, r *http.Request) { | |
| return | ||
| } | ||
|
|
||
| // for v20250326, add the `Mcp-Session-Id` header | ||
| // for v20250326, add the `Mcp-Session-Id` header using the pre-generated ID | ||
| // so the client can echo it back on subsequent requests for correlation. | ||
| if v == v20250326.PROTOCOL_VERSION { | ||
| sessionId = uuid.New().String() | ||
| sessionId = mcpSessionID | ||
| w.Header().Set("Mcp-Session-Id", sessionId) | ||
| } | ||
|
|
||
|
|
@@ -585,8 +614,13 @@ func httpHandler(s *Server, w http.ResponseWriter, r *http.Request) { | |
| render.JSON(w, r, res) | ||
| } | ||
|
|
||
| // processMcpMessage process the messages received from clients | ||
| func processMcpMessage(ctx context.Context, body []byte, s *Server, protocolVersion string, toolsetName string, promptsetName string, header http.Header, networkProtocolVersion string) (string, any, error) { | ||
| // processMcpMessage processes the messages received from clients. | ||
| // sessionID is an opaque string that uniquely identifies the MCP session | ||
| // (SSE session UUID, Mcp-Session-Id header value, or "stdio"). It is used | ||
| // to correlate the clientInfo.name from the initialize handshake with | ||
| // subsequent tools/call requests so that each agent's queries are | ||
| // automatically tagged with the correct controller name. | ||
| func processMcpMessage(ctx context.Context, body []byte, s *Server, protocolVersion string, sessionID string, toolsetName string, promptsetName string, header http.Header, networkProtocolVersion string) (string, any, error) { | ||
| operationStart := time.Now() | ||
|
|
||
| logger, err := util.LoggerFromContext(ctx) | ||
|
|
@@ -718,7 +752,7 @@ func processMcpMessage(ctx context.Context, body []byte, s *Server, protocolVers | |
| // Process the method | ||
| switch baseMessage.Method { | ||
| case mcputil.INITIALIZE: | ||
| result, version, err := mcp.InitializeResponse(ctx, baseMessage.Id, body, s.version) | ||
| result, version, clientName, err := mcp.InitializeResponse(ctx, baseMessage.Id, body, s.version) | ||
| if err != nil { | ||
| span.SetStatus(codes.Error, err.Error()) | ||
| if rpcErr, ok := result.(jsonrpc.JSONRPCError); ok { | ||
|
|
@@ -727,9 +761,21 @@ func processMcpMessage(ctx context.Context, body []byte, s *Server, protocolVers | |
| } | ||
| return "", result, err | ||
| } | ||
| // Persist the agent name so tools/call requests on the same session | ||
| // can inject it automatically as the SQLCommenter "controller" tag. | ||
| if clientName != "" && sessionID != "" { | ||
| s.mcpClientNames.Store(sessionID, clientName) | ||
|
Contributor
not sure if I like the idea of storing sessionIDs especially for streamableHTTP when MCP is trying to be stateless. |
||
| } | ||
| span.SetAttributes(attribute.String("mcp.protocol.version", version)) | ||
| return version, result, err | ||
| default: | ||
| // Inject the agent name captured during initialize into the context so | ||
| // AppendComment / JobLabels can tag queries with the correct controller. | ||
| if sessionID != "" { | ||
| if v, ok := s.mcpClientNames.Load(sessionID); ok { | ||
| ctx = sqlcommenter.WithAgentName(ctx, v.(string)) | ||
| } | ||
| } | ||
| toolset, ok := s.ResourceMgr.GetToolset(toolsetName) | ||
| if !ok { | ||
| err := fmt.Errorf("toolset does not exist") | ||
|
|
||
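The store-on-`initialize`, load-on-later-requests pattern in this hunk can be sketched in isolation. The `clientNames` type and its methods are hypothetical; the diff only shows that the server has an `mcpClientNames` field used with `Store` and `Load`, which is assumed here to be a `sync.Map`. One difference from the hunk: the lookup uses a checked type assertion, so an unexpected stored value yields "not found" instead of a panic.

```go
package main

import (
	"fmt"
	"sync"
)

// clientNames maps session ID -> clientInfo.name. Each entry is written once
// during initialize and then read on every later request for that session.
type clientNames struct{ m sync.Map }

// remember records the name for a session; empty IDs or names are ignored.
func (c *clientNames) remember(sessionID, name string) {
	if sessionID == "" || name == "" {
		return
	}
	c.m.Store(sessionID, name)
}

// lookup returns the stored name, if any.
func (c *clientNames) lookup(sessionID string) (string, bool) {
	v, ok := c.m.Load(sessionID)
	if !ok {
		return "", false
	}
	name, ok := v.(string) // checked assertion: no panic on an unexpected type
	return name, ok
}

// forget drops the entry, e.g. from a session-removal callback.
func (c *clientNames) forget(sessionID string) { c.m.Delete(sessionID) }

func main() {
	var names clientNames
	names.remember("stdio", "Claude Desktop")
	name, ok := names.lookup("stdio")
	fmt.Println(name, ok)
}
```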
Please reverse, this change is redundant~