diff --git a/README.md b/README.md index 8e4a1868b07f..84f36d795553 100644 --- a/README.md +++ b/README.md @@ -576,7 +576,7 @@ For more detailed instructions on using the Toolbox Core SDK, see the [project's README][toolbox-llamaindex-readme]. [toolbox-llamaindex]: https://pypi.org/project/toolbox-llamaindex/ - [toolbox-llamaindex-readme]: https://github.com/googleapis/genai-toolbox-llamaindex-python/blob/main/README.md + [toolbox-llamaindex-readme]: https://github.com/googleapis/mcp-toolbox-llamaindex-python/blob/main/README.md diff --git a/docs/SPANNER_README.md b/docs/SPANNER_README.md index 646699bf849a..5444e11e5f33 100644 --- a/docs/SPANNER_README.md +++ b/docs/SPANNER_README.md @@ -31,8 +31,6 @@ An editor configured to use the Cloud Spanner MCP server can use its AI capabili 2. Add the required inputs for your [instance](https://docs.cloud.google.com/spanner/docs/instances) in the configuration pop-up, then click "Save". You can update this configuration at any time in the "Configure" tab. -You'll now be able to see all enabled tools in the "Tools" tab. - > [!NOTE] > If you encounter issues with Windows Defender blocking the execution, you may need to configure an allowlist. See [Configure exclusions for Microsoft Defender Antivirus](https://learn.microsoft.com/en-us/microsoft-365/security/defender-endpoint/configure-exclusions-microsoft-defender-antivirus?view=o365-worldwide) for more details. @@ -40,9 +38,9 @@ You'll now be able to see all enabled tools in the "Tools" tab. Once configured, the MCP server will automatically provide Cloud Spanner capabilities to your AI assistant. You can: -* "Execute a DML query to update customer names." -* "List all tables in the `my-database`." -* "Execute a DQL query to select data from `orders` table." +- "Execute a DML query to update customer names." +- "List all tables in the `my-database`." +- "Execute a DQL query to select data from `orders` table." ## Server Capabilities diff --git a/docs/SQLCOMMENTER_README.md b/docs/SQLCOMMENTER_README.md new file mode 100644 index 000000000000..c5fb3f46bdf8 --- /dev/null +++ b/docs/SQLCOMMENTER_README.md @@ -0,0 +1,70 @@ +# SQLCommenter Integration Overview + +> [!NOTE] +> This README follows the same clean formatting conventions as other documentation files (e.g., `SPANNER_README.md`). + +The **SQLCommenter** observability layer has been added to every SQL-capable database source driver in the GenAI Toolbox. It automatically injects a comment containing useful metadata (tool name, controller, driver, OpenTelemetry trace) into every SQL statement executed by the toolbox. + +## What is injected? +```sql +/*action='myTool',controller='myAgent',db_driver='postgres',traceparent='00-...-...-01'*/ SELECT ... +``` +The comment is generated by `internal/sqlcommenter/commenter.go` and attached via the `RunSQL` method of each source. + +## Covered Drivers +The following drivers have been instrumented with SQLCommenter. Note that for NoSQL and BigQuery, the implementation adapts to the database's native observation capabilities. + +| Driver / Service | Source file | Injection Mechanism | +|------------------|-------------|-------------------| +| **AlloyDB Postgres** | `internal/sources/alloydbpg/alloydb_pg.go` | SQL Comment (`/* ... */`) | +| **BigQuery** | `internal/sources/bigquery/bigquery.go` | **Job Labels** (Metadata Tags) | +| **Cassandra (CQL)** | `internal/sources/cassandra/cassandra.go` | CQL Comment (`/* ... */`) | +| **ClickHouse** | `internal/sources/clickhouse/clickhouse.go` | SQL Comment (`/* ... */`) | +| **CloudSQL MSSQL** | `internal/sources/cloudsqlmssql/cloud_sql_mssql.go` | SQL Comment (`/* ... */`) | +| **CloudSQL MySQL** | `internal/sources/cloudsqlmysql/cloud_sql_mysql.go` | SQL Comment (`/* ... */`) | +| **CloudSQL Postgres** | `internal/sources/cloudsqlpg/cloud_sql_pg.go` | SQL Comment (`/* ... */`) | +| **CockroachDB** | `internal/sources/cockroachdb/cockroachdb.go` | SQL Comment (`/* ... */`) | +| **Couchbase (N1QL)** | `internal/sources/couchbase/couchbase.go`
`internal/tools/couchbase/couchbase.go` | N1QL Comment (`/* ... */`) | + +| **Firebird** | `internal/sources/firebird/firebird.go` | SQL Comment (`/* ... */`) | +| **MindsDB** | `internal/sources/mindsdb/mindsdb.go` | SQL Comment (`/* ... */`) | +| **MSSQL** | `internal/sources/mssql/mssql.go` | SQL Comment (`/* ... */`) | +| **MySQL** | `internal/sources/mysql/mysql.go` | SQL Comment (`/* ... */`) | +| **OceanBase** | `internal/sources/oceanbase/oceanbase.go` | SQL Comment (`/* ... */`) | +| **Oracle** | `internal/sources/oracle/oracle.go` | SQL Comment (`/* ... */`) | +| **Postgres** | `internal/sources/postgres/postgres.go` | SQL Comment (`/* ... */`) | +| **SingleStore** | `internal/sources/singlestore/singlestore.go` | SQL Comment (`/* ... */`) | +| **Snowflake** | `internal/sources/snowflake/snowflake.go` | SQL Comment (`/* ... */`) | +| **Spanner** | `internal/sources/spanner/spanner.go` | SQL Comment (`/* ... */`) | +| **SQLite** | `internal/sources/sqlite/sqlite.go` | SQL Comment (`/* ... */`) | +| **TiDB** | `internal/sources/tidb/tidb.go` | SQL Comment (`/* ... */`) | +| **Trino** | `internal/sources/trino/trino.go` | SQL Comment (`/* ... */`) | +| **YugabyteDB** | `internal/sources/yugabytedb/yugabytedb.go` | SQL Comment (`/* ... */`) | + +## Non-SQL Drivers (Exclusions) +Drivers like **MongoDB, Firestore, Redis, Bigtable, and Dgraph** are currently excluded from SQLCommenter injection because they do not support standard leading block comments at the protocol level. For these drivers, observability is provided safely via standard OpenTelemetry traces. + +## How it works +1. **Context injection** – each `RunSQL` method calls `sqlcommenter.WithDBDriver` to store the driver name in the request context. +2. **Comment generation** – `sqlcommenter.AppendComment` reads the context (tool name, controller, driver, traceparent) and prepends the comment to the SQL statement. +3. **BigQuery job labels** – for BigQuery, the same metadata is attached to job labels as key-value pairs (e.g., `controller=mcp-toolbox`). +4. **Session Lifecycle Management** – The `sseManager` in `internal/server/mcp.go` now implements an `onRemove` callback to safely deregister agent identities when a session times out or is closed, preventing memory leaks in high-velocity environments. + +## Configuration & Identity Extraction +The SQLCommenter metadata handles identity tracking using a strict "Automatic First, Static Second" priority sequence. + +### The `controller` Tag Priority +The `controller` tag maps the query directly to the identity of the AI Agent executing the action. + +1. **Automatic MCP Extraction (Primary):** When an agent connects via the Model Context Protocol (MCP), the server natively traps the `clientInfo.name` payload from the initial handshake (e.g., "Mosa" or "Claude Desktop"). This is automatically injected into the context mapping for every database trace. **Zero manual configuration is required.** +2. **Environment Variable Fallback:** If the toolbox is deployed *without* an MCP client (such as a raw Dockerized API handling direct REST `curl` requests), the automatic extraction will understandably return empty. In these non-MCP architectures, operators can supply the `TOOLBOX_AGENT_NAME` environment variable to statically route queries to a named identity. + +### Other Environment Overrides +- **`TOOLBOX_APP_NAME`**: Overrides the `application` tag value. Represents the overall framework running the queries. (Default: `mcp-toolbox`) + +## Verification +- Unit tests in `internal/sqlcommenter/commenter_test.go` confirm the comment is added for Postgres, MySQL, and SQLite. +- Build verification (`go build ./...`) ensures no breaking signature changes were introduced. + +--- +*Last updated on 2026‑04‑10 by Antigravity* diff --git a/docs/en/documentation/introduction/_index.md b/docs/en/documentation/introduction/_index.md index abf71f8bc11a..da155088aee7 100644 --- a/docs/en/documentation/introduction/_index.md +++ b/docs/en/documentation/introduction/_index.md @@ -281,7 +281,7 @@ For more detailed instructions on using the Toolbox LangChain SDK, see the {{% tab header="Llamaindex" lang="en" %}} Once you've installed the [Toolbox Llamaindex -SDK](https://github.com/googleapis/genai-toolbox-llamaindex-python), you can load +SDK](https://github.com/googleapis/mcp-toolbox-llamaindex-python), you can load tools: {{< highlight python >}} @@ -296,7 +296,7 @@ async with ToolboxClient("http://127.0.0.1:5000") as client: {{< /highlight >}} For more detailed instructions on using the Toolbox Llamaindex SDK, see the -[README](https://github.com/googleapis/genai-toolbox-llamaindex-python/blob/main/README.md). +[README](https://github.com/googleapis/mcp-toolbox-llamaindex-python/blob/main/README.md). {{% /tab %}} {{< /tabpane >}} diff --git a/internal/server/api.go b/internal/server/api.go index 5384734ed3e5..a6b2fd43b743 100644 --- a/internal/server/api.go +++ b/internal/server/api.go @@ -23,6 +23,7 @@ import ( "github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5/middleware" "github.com/go-chi/render" + "github.com/googleapis/mcp-toolbox/internal/sqlcommenter" "github.com/googleapis/mcp-toolbox/internal/tools" "github.com/googleapis/mcp-toolbox/internal/util" "github.com/googleapis/mcp-toolbox/internal/util/parameters" @@ -234,6 +235,9 @@ func toolInvokeHandler(s *Server, w http.ResponseWriter, r *http.Request) { return } + // Inject tool name into context for SQLCommenter + ctx = sqlcommenter.WithToolName(ctx, toolName) + res, err := tool.Invoke(ctx, s.ResourceMgr, params, accessToken) // Determine what error to return to the users. diff --git a/internal/server/common_test.go b/internal/server/common_test.go index 2694566a6835..951ac781a47c 100644 --- a/internal/server/common_test.go +++ b/internal/server/common_test.go @@ -148,7 +148,7 @@ func setUpServer(t *testing.T, router string, tools map[string]tools.Tool, tools t.Fatalf("unable to create custom metrics: %s", err) } - sseManager := newSseManager(ctx) + sseManager := newSseManager(ctx, nil) resourceManager := resources.NewResourceManager(nil, nil, nil, tools, toolsets, prompts, promptsets) diff --git a/internal/server/mcp.go b/internal/server/mcp.go index ffcb5e57ad95..9fed7fee5a15 100644 --- a/internal/server/mcp.go +++ b/internal/server/mcp.go @@ -36,6 +36,7 @@ import ( mcputil "github.com/googleapis/mcp-toolbox/internal/server/mcp/util" v20241105 "github.com/googleapis/mcp-toolbox/internal/server/mcp/v20241105" v20250326 "github.com/googleapis/mcp-toolbox/internal/server/mcp/v20250326" + "github.com/googleapis/mcp-toolbox/internal/sqlcommenter" "github.com/googleapis/mcp-toolbox/internal/util" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" @@ -56,6 +57,7 @@ type sseSession struct { type sseManager struct { mu sync.Mutex sseSessions map[string]*sseSession + onRemove func(id string) } func (m *sseManager) get(id string) (*sseSession, bool) { @@ -66,6 +68,9 @@ func (m *sseManager) get(id string) (*sseSession, bool) { // Be defensive: a nil session entry should be treated as unavailable. if ok && session == nil { delete(m.sseSessions, id) + if m.onRemove != nil { + m.onRemove(id) + } } return nil, false } @@ -73,10 +78,11 @@ func (m *sseManager) get(id string) (*sseSession, bool) { return session, true } -func newSseManager(ctx context.Context) *sseManager { +func newSseManager(ctx context.Context, onRemove func(id string)) *sseManager { sseM := &sseManager{ mu: sync.Mutex{}, sseSessions: make(map[string]*sseSession), + onRemove: onRemove, } go sseM.cleanupRoutine(ctx) return sseM @@ -93,6 +99,9 @@ func (m *sseManager) remove(id string) { m.mu.Lock() delete(m.sseSessions, id) m.mu.Unlock() + if m.onRemove != nil { + m.onRemove(id) + } } func (m *sseManager) cleanupRoutine(ctx context.Context) { @@ -112,6 +121,9 @@ func (m *sseManager) cleanupRoutine(ctx context.Context) { for id, sess := range m.sseSessions { if now.Sub(sess.lastActive) > timeout { delete(m.sseSessions, id) + if m.onRemove != nil { + m.onRemove(id) + } } } }() @@ -248,7 +260,8 @@ func (s *stdioSession) readInputStream(ctx context.Context) error { var v string var res any - v, res, err = processMcpMessage(msgCtx, []byte(line), s.server, s.protocol, "", "", nil, "") + // stdio is a single persistent connection; use "stdio" as the session ID. + v, res, err = processMcpMessage(msgCtx, []byte(line), s.server, s.protocol, "stdio", "", "", nil, "") if err != nil { // errors during the processing of message will generate a valid MCP Error response. // server can continue to run. @@ -536,7 +549,22 @@ func httpHandler(s *Server, w http.ResponseWriter, r *http.Request) { networkProtocolVersion := fmt.Sprintf("%d.%d", r.ProtoMajor, r.ProtoMinor) - v, res, err := processMcpMessage(ctx, body, s, protocolVersion, toolsetName, promptsetName, r.Header, networkProtocolVersion) + // Determine the effective session ID for clientName correlation. + // - SSE (v20241105): URL param ?sessionId= present on every request. + // - v20250326+: Mcp-Session-Id header sent by the client after initialize. + // - New sessions (initialize): neither is set yet, so we pre-generate the + // UUID here so it can be stored during initialize AND returned in the + // response header in the same call — avoiding a one-request lag. + mcpSessionID := headerSessionId + if mcpSessionID == "" { + mcpSessionID = paramSessionId + } + if mcpSessionID == "" { + // New session: pre-generate so initialize can store clientName by it. + mcpSessionID = uuid.New().String() + } + + v, res, err := processMcpMessage(ctx, body, s, protocolVersion, mcpSessionID, toolsetName, promptsetName, r.Header, networkProtocolVersion) if err != nil { s.logger.DebugContext(ctx, fmt.Errorf("error processing message: %w", err).Error()) } @@ -549,9 +577,10 @@ func httpHandler(s *Server, w http.ResponseWriter, r *http.Request) { return } - // for v20250326, add the `Mcp-Session-Id` header + // for v20250326, add the `Mcp-Session-Id` header using the pre-generated ID + // so the client can echo it back on subsequent requests for correlation. if v == v20250326.PROTOCOL_VERSION { - sessionId = uuid.New().String() + sessionId = mcpSessionID w.Header().Set("Mcp-Session-Id", sessionId) } @@ -585,8 +614,13 @@ func httpHandler(s *Server, w http.ResponseWriter, r *http.Request) { render.JSON(w, r, res) } -// processMcpMessage process the messages received from clients -func processMcpMessage(ctx context.Context, body []byte, s *Server, protocolVersion string, toolsetName string, promptsetName string, header http.Header, networkProtocolVersion string) (string, any, error) { +// processMcpMessage process the messages received from clients. +// sessionID is an opaque string that uniquely identifies the MCP session +// (SSE session UUID, Mcp-Session-Id header value, or "stdio"). It is used +// to correlate the clientInfo.name from the initialize handshake with +// subsequent tools/call requests so that each agent's queries are +// automatically tagged with the correct controller name. +func processMcpMessage(ctx context.Context, body []byte, s *Server, protocolVersion string, sessionID string, toolsetName string, promptsetName string, header http.Header, networkProtocolVersion string) (string, any, error) { operationStart := time.Now() logger, err := util.LoggerFromContext(ctx) @@ -718,7 +752,7 @@ func processMcpMessage(ctx context.Context, body []byte, s *Server, protocolVers // Process the method switch baseMessage.Method { case mcputil.INITIALIZE: - result, version, err := mcp.InitializeResponse(ctx, baseMessage.Id, body, s.version) + result, version, clientName, err := mcp.InitializeResponse(ctx, baseMessage.Id, body, s.version) if err != nil { span.SetStatus(codes.Error, err.Error()) if rpcErr, ok := result.(jsonrpc.JSONRPCError); ok { @@ -727,9 +761,21 @@ func processMcpMessage(ctx context.Context, body []byte, s *Server, protocolVers } return "", result, err } + // Persist the agent name so tools/call requests on the same session + // can inject it automatically as the SQLCommenter "controller" tag. + if clientName != "" && sessionID != "" { + s.mcpClientNames.Store(sessionID, clientName) + } span.SetAttributes(attribute.String("mcp.protocol.version", version)) return version, result, err default: + // Inject the agent name captured during initialize into the context so + // AppendComment / JobLabels can tag queries with the correct controller. + if sessionID != "" { + if v, ok := s.mcpClientNames.Load(sessionID); ok { + ctx = sqlcommenter.WithAgentName(ctx, v.(string)) + } + } toolset, ok := s.ResourceMgr.GetToolset(toolsetName) if !ok { err := fmt.Errorf("toolset does not exist") diff --git a/internal/server/mcp/mcp.go b/internal/server/mcp/mcp.go index a47feaf22a5b..5a9452fd8f8c 100644 --- a/internal/server/mcp/mcp.go +++ b/internal/server/mcp/mcp.go @@ -47,13 +47,19 @@ var SUPPORTED_PROTOCOL_VERSIONS = []string{ // InitializeResponse runs capability negotiation and protocol version agreement. // This is the Initialization phase of the lifecycle for MCP client-server connections. // Always start with the latest protocol version supported. -func InitializeResponse(ctx context.Context, id jsonrpc.RequestId, body []byte, toolboxVersion string) (any, string, error) { +// Returns (response, protocolVersion, clientName, error). +// clientName is taken from clientInfo.name in the initialize request and +// identifies the MCP agent (e.g. "sales-agent"). It is empty when the client +// does not send clientInfo. +func InitializeResponse(ctx context.Context, id jsonrpc.RequestId, body []byte, toolboxVersion string) (any, string, string, error) { var req mcputil.InitializeRequest if err := json.Unmarshal(body, &req); err != nil { err = fmt.Errorf("invalid mcp initialize request: %w", err) - return jsonrpc.NewError(id, jsonrpc.INVALID_REQUEST, err.Error(), nil), "", err + return jsonrpc.NewError(id, jsonrpc.INVALID_REQUEST, err.Error(), nil), "", "", err } + clientName := req.Params.ClientInfo.Name + var protocolVersion string v := req.Params.ProtocolVersion if slices.Contains(SUPPORTED_PROTOCOL_VERSIONS, v) { @@ -87,7 +93,7 @@ func InitializeResponse(ctx context.Context, id jsonrpc.RequestId, body []byte, Result: result, } - return res, protocolVersion, nil + return res, protocolVersion, clientName, nil } // NotificationHandler process notifications request. It MUST NOT send a response. diff --git a/internal/server/mcp/v20241105/method.go b/internal/server/mcp/v20241105/method.go index efcbab26f64c..fdbe4f58fbbe 100644 --- a/internal/server/mcp/v20241105/method.go +++ b/internal/server/mcp/v20241105/method.go @@ -26,6 +26,7 @@ import ( "github.com/googleapis/mcp-toolbox/internal/prompts" "github.com/googleapis/mcp-toolbox/internal/server/mcp/jsonrpc" "github.com/googleapis/mcp-toolbox/internal/server/resources" + "github.com/googleapis/mcp-toolbox/internal/sqlcommenter" "github.com/googleapis/mcp-toolbox/internal/tools" "github.com/googleapis/mcp-toolbox/internal/util" "github.com/googleapis/mcp-toolbox/internal/util/parameters" @@ -223,6 +224,8 @@ func toolsCallHandler(ctx context.Context, id jsonrpc.RequestId, resourceMgr *re // run tool invocation and generate response. executionStart := time.Now() + // Inject tool name into context for SQLCommenter + ctx = sqlcommenter.WithToolName(ctx, toolName) results, err := tool.Invoke(ctx, resourceMgr, params, accessToken) executionDuration := time.Since(executionStart).Seconds() diff --git a/internal/server/mcp/v20250326/method.go b/internal/server/mcp/v20250326/method.go index 1d4292f38467..92782ac898f9 100644 --- a/internal/server/mcp/v20250326/method.go +++ b/internal/server/mcp/v20250326/method.go @@ -26,6 +26,7 @@ import ( "github.com/googleapis/mcp-toolbox/internal/prompts" "github.com/googleapis/mcp-toolbox/internal/server/mcp/jsonrpc" "github.com/googleapis/mcp-toolbox/internal/server/resources" + "github.com/googleapis/mcp-toolbox/internal/sqlcommenter" "github.com/googleapis/mcp-toolbox/internal/tools" "github.com/googleapis/mcp-toolbox/internal/util" "github.com/googleapis/mcp-toolbox/internal/util/parameters" @@ -223,6 +224,8 @@ func toolsCallHandler(ctx context.Context, id jsonrpc.RequestId, resourceMgr *re // run tool invocation and generate response. executionStart := time.Now() + // Inject tool name into context for SQLCommenter + ctx = sqlcommenter.WithToolName(ctx, toolName) results, err := tool.Invoke(ctx, resourceMgr, params, accessToken) executionDuration := time.Since(executionStart).Seconds() diff --git a/internal/server/mcp/v20250618/method.go b/internal/server/mcp/v20250618/method.go index 529bd90e3870..71521afa150d 100644 --- a/internal/server/mcp/v20250618/method.go +++ b/internal/server/mcp/v20250618/method.go @@ -26,6 +26,7 @@ import ( "github.com/googleapis/mcp-toolbox/internal/prompts" "github.com/googleapis/mcp-toolbox/internal/server/mcp/jsonrpc" "github.com/googleapis/mcp-toolbox/internal/server/resources" + "github.com/googleapis/mcp-toolbox/internal/sqlcommenter" "github.com/googleapis/mcp-toolbox/internal/tools" "github.com/googleapis/mcp-toolbox/internal/util" "github.com/googleapis/mcp-toolbox/internal/util/parameters" @@ -216,6 +217,8 @@ func toolsCallHandler(ctx context.Context, id jsonrpc.RequestId, resourceMgr *re // run tool invocation and generate response. executionStart := time.Now() + // Inject tool name into context for SQLCommenter + ctx = sqlcommenter.WithToolName(ctx, toolName) results, err := tool.Invoke(ctx, resourceMgr, params, accessToken) executionDuration := time.Since(executionStart).Seconds() diff --git a/internal/server/mcp/v20251125/method.go b/internal/server/mcp/v20251125/method.go index 68fd5c3bfd57..5139c2139597 100644 --- a/internal/server/mcp/v20251125/method.go +++ b/internal/server/mcp/v20251125/method.go @@ -26,6 +26,7 @@ import ( "github.com/googleapis/mcp-toolbox/internal/prompts" "github.com/googleapis/mcp-toolbox/internal/server/mcp/jsonrpc" "github.com/googleapis/mcp-toolbox/internal/server/resources" + "github.com/googleapis/mcp-toolbox/internal/sqlcommenter" "github.com/googleapis/mcp-toolbox/internal/tools" "github.com/googleapis/mcp-toolbox/internal/util" "github.com/googleapis/mcp-toolbox/internal/util/parameters" @@ -216,6 +217,8 @@ func toolsCallHandler(ctx context.Context, id jsonrpc.RequestId, resourceMgr *re // run tool invocation and generate response. executionStart := time.Now() + // Inject tool name into context for SQLCommenter + ctx = sqlcommenter.WithToolName(ctx, toolName) results, err := tool.Invoke(ctx, resourceMgr, params, accessToken) executionDuration := time.Since(executionStart).Seconds() diff --git a/internal/server/mcp_test.go b/internal/server/mcp_test.go index b5d4a5b1acac..93e052cd9595 100644 --- a/internal/server/mcp_test.go +++ b/internal/server/mcp_test.go @@ -1122,7 +1122,7 @@ func TestStdioSession(t *testing.T) { t.Fatalf("unable to create custom metrics: %s", err) } - sseManager := newSseManager(ctx) + sseManager := newSseManager(ctx, nil) resourceManager := resources.NewResourceManager(nil, nil, nil, toolsMap, toolsets, promptsMap, promptsets) @@ -1173,7 +1173,7 @@ func TestSseManagerGetNonExistentSession(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - m := newSseManager(ctx) + m := newSseManager(ctx, nil) // Must not panic when session ID doesn't exist in the map. session, ok := m.get("non-existent-id") @@ -1189,7 +1189,7 @@ func TestSseManagerGetNilSessionValue(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - m := newSseManager(ctx) + m := newSseManager(ctx, nil) m.sseSessions["nil-session-id"] = nil session, ok := m.get("nil-session-id") diff --git a/internal/server/server.go b/internal/server/server.go index f0ccad606515..81db46205ae2 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -26,6 +26,7 @@ import ( "slices" "strconv" "strings" + "sync" "time" "github.com/go-chi/chi/v5" @@ -59,6 +60,11 @@ type Server struct { sseManager *sseManager ResourceMgr *resources.ResourceManager mcpPrmFile string + // mcpClientNames maps MCP session ID → clientInfo.name captured during the + // initialize handshake. It allows tools/call requests (which arrive as + // separate HTTP requests) to inject the originating agent name as the + // SQLCommenter "controller" tag without requiring a static env var. + mcpClientNames sync.Map } func InitializeConfigs(ctx context.Context, cfg ServerConfig) ( @@ -373,8 +379,6 @@ func NewServer(ctx context.Context, cfg ServerConfig) (*Server, error) { addr := net.JoinHostPort(cfg.Address, strconv.Itoa(cfg.Port)) srv := &http.Server{Addr: addr, Handler: r} - sseManager := newSseManager(ctx) - resourceManager := resources.NewResourceManager(sourcesMap, authServicesMap, embeddingModelsMap, toolsMap, toolsetsMap, promptsMap, promptsetsMap) s := &Server{ @@ -383,12 +387,16 @@ func NewServer(ctx context.Context, cfg ServerConfig) (*Server, error) { root: r, logger: l, instrumentation: instrumentation, - sseManager: sseManager, ResourceMgr: resourceManager, toolboxUrl: cfg.ToolboxUrl, mcpPrmFile: cfg.McpPrmFile, } + sseManager := newSseManager(ctx, func(id string) { + s.mcpClientNames.Delete(id) + }) + s.sseManager = sseManager + // cors if slices.Contains(cfg.AllowedOrigins, "*") { s.logger.WarnContext(ctx, "wildcard (`*`) allows all origin to access the resource and is not secure. Use it with cautious for public, non-sensitive data, or during local development. Recommended to use `--allowed-origins` flag") @@ -464,7 +472,7 @@ func NewServer(ctx context.Context, cfg ServerConfig) (*Server, error) { } r.Mount("/mcp", mcpR) - if cfg.EnableAPI { + if cfg.EnableAPI || cfg.UI { apiR, err := apiRouter(s) if err != nil { return nil, err diff --git a/internal/sources/alloydbpg/alloydb_pg.go b/internal/sources/alloydbpg/alloydb_pg.go index c7617ae824d7..ecae5faff3d4 100644 --- a/internal/sources/alloydbpg/alloydb_pg.go +++ b/internal/sources/alloydbpg/alloydb_pg.go @@ -24,7 +24,6 @@ import ( "github.com/goccy/go-yaml" "github.com/googleapis/mcp-toolbox/internal/sources" "github.com/googleapis/mcp-toolbox/internal/util" - "github.com/googleapis/mcp-toolbox/internal/util/orderedmap" "github.com/jackc/pgx/v5/pgxpool" "go.opentelemetry.io/otel/trace" ) @@ -103,30 +102,7 @@ func (s *Source) PostgresPool() *pgxpool.Pool { } func (s *Source) RunSQL(ctx context.Context, statement string, params []any) (any, error) { - results, err := s.Pool.Query(ctx, statement, params...) - if err != nil { - return nil, fmt.Errorf("unable to execute query: %w", err) - } - defer results.Close() - - fields := results.FieldDescriptions() - var out []any - for results.Next() { - v, err := results.Values() - if err != nil { - return nil, fmt.Errorf("unable to parse row: %w", err) - } - row := orderedmap.Row{} - for i, f := range fields { - row.Add(f.Name, v[i]) - } - out = append(out, row) - } - // this will catch actual query execution errors - if err := results.Err(); err != nil { - return nil, fmt.Errorf("unable to execute query: %w", err) - } - return out, nil + return sources.RunSQLWithPgxQueryer(ctx, s.PostgresPool(), statement, params, "pgx") } func getOpts(ipType, userAgent string, useIAM bool) ([]alloydbconn.Option, error) { diff --git a/internal/sources/bigquery/bigquery.go b/internal/sources/bigquery/bigquery.go index 73e11fdbd2da..a87f2989a908 100644 --- a/internal/sources/bigquery/bigquery.go +++ b/internal/sources/bigquery/bigquery.go @@ -28,6 +28,7 @@ import ( dataplexapi "cloud.google.com/go/dataplex/apiv1" "github.com/goccy/go-yaml" "github.com/googleapis/mcp-toolbox/internal/sources" + "github.com/googleapis/mcp-toolbox/internal/sqlcommenter" "github.com/googleapis/mcp-toolbox/internal/tools" "github.com/googleapis/mcp-toolbox/internal/util" "github.com/googleapis/mcp-toolbox/internal/util/orderedmap" @@ -573,6 +574,12 @@ func (s *Source) RunSQL(ctx context.Context, bqClient *bigqueryapi.Client, state query.ConnectionProperties = connProps } + // Attach SQLCommenter-derived labels so queries are visible in BigQuery + // job history grouped by tool name, application, and framework. + if labels := sqlcommenter.JobLabels(ctx); len(labels) > 0 { + query.Labels = labels + } + // This block handles SELECT statements, which return a row set. // We iterate through the results, convert each row into a map of // column names to values, and return the collection of rows. diff --git a/internal/sources/cassandra/cassandra.go b/internal/sources/cassandra/cassandra.go index 9e82c22f8e56..ae27e6eddddb 100644 --- a/internal/sources/cassandra/cassandra.go +++ b/internal/sources/cassandra/cassandra.go @@ -21,6 +21,7 @@ import ( gocql "github.com/apache/cassandra-gocql-driver/v2" "github.com/goccy/go-yaml" "github.com/googleapis/mcp-toolbox/internal/sources" + "github.com/googleapis/mcp-toolbox/internal/sqlcommenter" "github.com/googleapis/mcp-toolbox/internal/util/parameters" "go.opentelemetry.io/otel/trace" ) @@ -95,6 +96,11 @@ func (s *Source) SourceType() string { } func (s *Source) RunSQL(ctx context.Context, statement string, params parameters.ParamValues) (any, error) { + // Inject the database driver into the context for SQLCommenter + ctx = sqlcommenter.WithDBDriver(ctx, "cassandra") + // Decorate the statement with SQLCommenter metadata from the context + statement = sqlcommenter.AppendComment(ctx, statement) + sliceParams := params.AsSlice() iter := s.CassandraSession().Query(statement, sliceParams...).IterContext(ctx) diff --git a/internal/sources/clickhouse/clickhouse.go b/internal/sources/clickhouse/clickhouse.go index 7389c4814ab9..46e79e8af2e6 100644 --- a/internal/sources/clickhouse/clickhouse.go +++ b/internal/sources/clickhouse/clickhouse.go @@ -24,6 +24,7 @@ import ( _ "github.com/ClickHouse/clickhouse-go/v2" "github.com/goccy/go-yaml" "github.com/googleapis/mcp-toolbox/internal/sources" + "github.com/googleapis/mcp-toolbox/internal/sqlcommenter" "github.com/googleapis/mcp-toolbox/internal/util/parameters" "go.opentelemetry.io/otel/trace" ) @@ -101,6 +102,11 @@ func (s *Source) ClickHousePool() *sql.DB { } func (s *Source) RunSQL(ctx context.Context, statement string, params parameters.ParamValues) (any, error) { + // Inject the database driver into the context for SQLCommenter + ctx = sqlcommenter.WithDBDriver(ctx, "clickhouse") + // Decorate the statement with SQLCommenter metadata from the context + statement = sqlcommenter.AppendComment(ctx, statement) + var sliceParams []any if params != nil { sliceParams = params.AsSlice() diff --git a/internal/sources/cloudsqlmssql/cloud_sql_mssql.go b/internal/sources/cloudsqlmssql/cloud_sql_mssql.go index 8a3605265acf..d461c0fbd5b7 100644 --- a/internal/sources/cloudsqlmssql/cloud_sql_mssql.go +++ b/internal/sources/cloudsqlmssql/cloud_sql_mssql.go @@ -24,6 +24,7 @@ import ( "cloud.google.com/go/cloudsqlconn/sqlserver/mssql" "github.com/goccy/go-yaml" "github.com/googleapis/mcp-toolbox/internal/sources" + "github.com/googleapis/mcp-toolbox/internal/sqlcommenter" "github.com/googleapis/mcp-toolbox/internal/util" "github.com/googleapis/mcp-toolbox/internal/util/orderedmap" "go.opentelemetry.io/otel/trace" @@ -108,6 +109,11 @@ func (s *Source) MSSQLDB() *sql.DB { } func (s *Source) RunSQL(ctx context.Context, statement string, params []any) (any, error) { + // Inject the database driver into the context for SQLCommenter + ctx = sqlcommenter.WithDBDriver(ctx, "mssql") + // Decorate the statement with SQLCommenter metadata from the context + statement = sqlcommenter.AppendComment(ctx, statement) + results, err := s.MSSQLDB().QueryContext(ctx, statement, params...) if err != nil { return nil, fmt.Errorf("unable to execute query: %w", err) diff --git a/internal/sources/cloudsqlmysql/cloud_sql_mysql.go b/internal/sources/cloudsqlmysql/cloud_sql_mysql.go index cce65db37497..6924bc7a2d67 100644 --- a/internal/sources/cloudsqlmysql/cloud_sql_mysql.go +++ b/internal/sources/cloudsqlmysql/cloud_sql_mysql.go @@ -24,6 +24,7 @@ import ( "cloud.google.com/go/cloudsqlconn/mysql/mysql" "github.com/goccy/go-yaml" "github.com/googleapis/mcp-toolbox/internal/sources" + "github.com/googleapis/mcp-toolbox/internal/sqlcommenter" "github.com/googleapis/mcp-toolbox/internal/tools/mysql/mysqlcommon" "github.com/googleapis/mcp-toolbox/internal/util" "github.com/googleapis/mcp-toolbox/internal/util/orderedmap" @@ -107,6 +108,11 @@ func (s *Source) MySQLDatabase() string { } func (s *Source) RunSQL(ctx context.Context, statement string, params []any) (any, error) { + // Inject the database driver into the context for SQLCommenter + ctx = sqlcommenter.WithDBDriver(ctx, "mysql") + // Decorate the statement with SQLCommenter metadata from the context + statement = sqlcommenter.AppendComment(ctx, statement) + results, err := s.MySQLPool().QueryContext(ctx, statement, params...) if err != nil { return nil, fmt.Errorf("unable to execute query: %w", err) diff --git a/internal/sources/cloudsqlpg/cloud_sql_pg.go b/internal/sources/cloudsqlpg/cloud_sql_pg.go index 818e2271cb29..1c531e28d724 100644 --- a/internal/sources/cloudsqlpg/cloud_sql_pg.go +++ b/internal/sources/cloudsqlpg/cloud_sql_pg.go @@ -23,7 +23,6 @@ import ( "github.com/goccy/go-yaml" "github.com/googleapis/mcp-toolbox/internal/sources" "github.com/googleapis/mcp-toolbox/internal/util" - "github.com/googleapis/mcp-toolbox/internal/util/orderedmap" "github.com/jackc/pgx/v5/pgxpool" "go.opentelemetry.io/otel/trace" ) @@ -109,30 +108,7 @@ func (s *Source) PostgresPool() *pgxpool.Pool { } func (s *Source) RunSQL(ctx context.Context, statement string, params []any) (any, error) { - results, err := s.PostgresPool().Query(ctx, statement, params...) - if err != nil { - return nil, fmt.Errorf("unable to execute query: %w", err) - } - defer results.Close() - - fields := results.FieldDescriptions() - var out []any - for results.Next() { - values, err := results.Values() - if err != nil { - return nil, fmt.Errorf("unable to parse row: %w", err) - } - row := orderedmap.Row{} - for i, f := range fields { - row.Add(f.Name, values[i]) - } - out = append(out, row) - } - // this will catch actual query execution errors - if err := results.Err(); err != nil { - return nil, fmt.Errorf("unable to execute query: %w", err) - } - return out, nil + return sources.RunSQLWithPgxQueryer(ctx, s.PostgresPool(), statement, params, "pgx") } func getConnectionConfig(ctx context.Context, user, pass, dbname string) (string, bool, error) { diff --git a/internal/sources/cockroachdb/cockroachdb.go b/internal/sources/cockroachdb/cockroachdb.go index 0838b13e7401..cb66783bcdd1 100644 --- a/internal/sources/cockroachdb/cockroachdb.go +++ b/internal/sources/cockroachdb/cockroachdb.go @@ -25,7 +25,7 @@ import ( "strings" "time" - crdbpgx "github.com/cockroachdb/cockroach-go/v2/crdb/crdbpgxv5" + "github.com/cockroachdb/cockroach-go/v2/crdb/crdbpgxv5" "github.com/goccy/go-yaml" "github.com/googleapis/mcp-toolbox/internal/sources" "github.com/googleapis/mcp-toolbox/internal/util" @@ -144,6 +144,10 @@ func (s *Source) ExecuteTxWithRetry(ctx context.Context, fn func(pgx.Tx) error) return crdbpgx.ExecuteTx(ctx, s.Pool, pgx.TxOptions{}, fn) } +func (s *Source) RunSQL(ctx context.Context, statement string, params []any) (any, error) { + return sources.RunSQLWithPgxQueryer(ctx, s, statement, params, "pgx") +} + // Query executes a query using the connection pool with MCP security enforcement. // For read-only queries, connection-level retry is sufficient. // For write operations requiring transaction retry, use ExecuteTxWithRetry directly. diff --git a/internal/sources/couchbase/couchbase.go b/internal/sources/couchbase/couchbase.go index 60c0fdbbcb3d..37db4aa4f129 100644 --- a/internal/sources/couchbase/couchbase.go +++ b/internal/sources/couchbase/couchbase.go @@ -25,6 +25,7 @@ import ( tlsutil "github.com/couchbase/tools-common/http/tls" "github.com/goccy/go-yaml" "github.com/googleapis/mcp-toolbox/internal/sources" + "github.com/googleapis/mcp-toolbox/internal/sqlcommenter" "github.com/googleapis/mcp-toolbox/internal/util/parameters" "go.opentelemetry.io/otel/trace" ) @@ -112,7 +113,12 @@ func (s *Source) CouchbaseQueryScanConsistency() uint { return s.QueryScanConsistency } -func (s *Source) RunSQL(statement string, params parameters.ParamValues) (any, error) { +func (s *Source) RunSQL(ctx context.Context, statement string, params parameters.ParamValues) (any, error) { + // Inject the database driver into the context for SQLCommenter + ctx = sqlcommenter.WithDBDriver(ctx, "couchbase") + // Decorate the statement with SQLCommenter metadata from the context + statement = sqlcommenter.AppendComment(ctx, statement) + results, err := s.CouchbaseScope().Query(statement, &gocb.QueryOptions{ ScanConsistency: gocb.QueryScanConsistency(s.CouchbaseQueryScanConsistency()), NamedParameters: params.AsMap(), diff --git a/internal/sources/dgraph/dgraph.go b/internal/sources/dgraph/dgraph.go index 983d4959ab4d..42ce713ba93b 100644 --- a/internal/sources/dgraph/dgraph.go +++ b/internal/sources/dgraph/dgraph.go @@ -115,7 +115,9 @@ func (s *Source) DgraphClient() *DgraphClient { return s.Client } -func (s *Source) RunSQL(statement string, params parameters.ParamValues, isQuery bool, timeout string) (any, error) { +func (s *Source) RunSQL(ctx context.Context, statement string, params parameters.ParamValues, isQuery bool, timeout string) (any, error) { + // Dgraph does not support SQL block comments, so we explicitly skip injecting SQLCommenter metadata here. + paramsMap := params.AsMapWithDollarPrefix() resp, err := s.DgraphClient().ExecuteQuery(statement, paramsMap, isQuery, timeout) if err != nil { diff --git a/internal/sources/firebird/firebird.go b/internal/sources/firebird/firebird.go index 9429e3d0ce01..3a065c1ae8c0 100644 --- a/internal/sources/firebird/firebird.go +++ b/internal/sources/firebird/firebird.go @@ -22,9 +22,9 @@ import ( "github.com/goccy/go-yaml" _ "github.com/nakagami/firebirdsql" - "go.opentelemetry.io/otel/trace" - "github.com/googleapis/mcp-toolbox/internal/sources" + "github.com/googleapis/mcp-toolbox/internal/sqlcommenter" + "go.opentelemetry.io/otel/trace" ) const SourceType string = "firebird" @@ -97,6 +97,11 @@ func (s *Source) FirebirdDB() *sql.DB { } func (s *Source) RunSQL(ctx context.Context, statement string, params []any) (any, error) { + // Inject the database driver into the context for SQLCommenter + ctx = sqlcommenter.WithDBDriver(ctx, "firebird") + // Decorate the statement with SQLCommenter metadata from the context + statement = sqlcommenter.AppendComment(ctx, statement) + rows, err := s.FirebirdDB().QueryContext(ctx, statement, params...) if err != nil { return nil, fmt.Errorf("unable to execute query: %w", err) diff --git a/internal/sources/mindsdb/mindsdb.go b/internal/sources/mindsdb/mindsdb.go index f0abefb972ce..d746853b1dc7 100644 --- a/internal/sources/mindsdb/mindsdb.go +++ b/internal/sources/mindsdb/mindsdb.go @@ -23,6 +23,7 @@ import ( _ "github.com/go-sql-driver/mysql" "github.com/goccy/go-yaml" "github.com/googleapis/mcp-toolbox/internal/sources" + "github.com/googleapis/mcp-toolbox/internal/sqlcommenter" "github.com/googleapis/mcp-toolbox/internal/tools/mysql/mysqlcommon" "go.opentelemetry.io/otel/trace" ) @@ -103,6 +104,11 @@ func (s *Source) MySQLPool() *sql.DB { } func (s *Source) RunSQL(ctx context.Context, statement string, params []any) (any, error) { + // Inject the database driver into the context for SQLCommenter + ctx = sqlcommenter.WithDBDriver(ctx, "mysql") + // Decorate the statement with SQLCommenter metadata from the context + statement = sqlcommenter.AppendComment(ctx, statement) + // MindsDB now supports MySQL prepared statements natively results, err := s.MindsDBPool().QueryContext(ctx, statement, params...) if err != nil { diff --git a/internal/sources/mssql/mssql.go b/internal/sources/mssql/mssql.go index 06c33af691fb..1e39c1c84b83 100644 --- a/internal/sources/mssql/mssql.go +++ b/internal/sources/mssql/mssql.go @@ -22,6 +22,7 @@ import ( "github.com/goccy/go-yaml" "github.com/googleapis/mcp-toolbox/internal/sources" + "github.com/googleapis/mcp-toolbox/internal/sqlcommenter" "github.com/googleapis/mcp-toolbox/internal/util" "github.com/googleapis/mcp-toolbox/internal/util/orderedmap" _ "github.com/microsoft/go-mssqldb" @@ -106,6 +107,11 @@ func (s *Source) MSSQLDB() *sql.DB { } func (s *Source) RunSQL(ctx context.Context, statement string, params []any) (any, error) { + // Inject the database driver into the context for SQLCommenter + ctx = sqlcommenter.WithDBDriver(ctx, "mssql") + // Decorate the statement with SQLCommenter metadata from the context + statement = sqlcommenter.AppendComment(ctx, statement) + results, err := s.MSSQLDB().QueryContext(ctx, statement, params...) if err != nil { return nil, fmt.Errorf("unable to execute query: %w", err) diff --git a/internal/sources/mysql/mysql.go b/internal/sources/mysql/mysql.go index 477c9a982826..ea23dd26bce6 100644 --- a/internal/sources/mysql/mysql.go +++ b/internal/sources/mysql/mysql.go @@ -23,6 +23,7 @@ import ( driver "github.com/go-sql-driver/mysql" "github.com/goccy/go-yaml" "github.com/googleapis/mcp-toolbox/internal/sources" + "github.com/googleapis/mcp-toolbox/internal/sqlcommenter" "github.com/googleapis/mcp-toolbox/internal/tools/mysql/mysqlcommon" "github.com/googleapis/mcp-toolbox/internal/util" "github.com/googleapis/mcp-toolbox/internal/util/orderedmap" @@ -106,6 +107,11 @@ func (s *Source) MySQLDatabase() string { } func (s *Source) RunSQL(ctx context.Context, statement string, params []any) (any, error) { + // Inject the database driver into the context for SQLCommenter + ctx = sqlcommenter.WithDBDriver(ctx, "mysql") + // Decorate the statement with SQLCommenter metadata from the context + statement = sqlcommenter.AppendComment(ctx, statement) + results, err := s.MySQLPool().QueryContext(ctx, statement, params...) if err != nil { return nil, fmt.Errorf("unable to execute query: %w", err) diff --git a/internal/sources/oceanbase/oceanbase.go b/internal/sources/oceanbase/oceanbase.go index fbeb03d42524..bd49e591338b 100644 --- a/internal/sources/oceanbase/oceanbase.go +++ b/internal/sources/oceanbase/oceanbase.go @@ -23,6 +23,7 @@ import ( _ "github.com/go-sql-driver/mysql" "github.com/goccy/go-yaml" "github.com/googleapis/mcp-toolbox/internal/sources" + "github.com/googleapis/mcp-toolbox/internal/sqlcommenter" "github.com/googleapis/mcp-toolbox/internal/tools/mysql/mysqlcommon" "go.opentelemetry.io/otel/trace" ) @@ -99,6 +100,11 @@ func (s *Source) OceanBasePool() *sql.DB { } func (s *Source) RunSQL(ctx context.Context, statement string, params []any) (any, error) { + // Inject the database driver into the context for SQLCommenter + ctx = sqlcommenter.WithDBDriver(ctx, "mysql") + // Decorate the statement with SQLCommenter metadata from the context + statement = sqlcommenter.AppendComment(ctx, statement) + results, err := s.OceanBasePool().QueryContext(ctx, statement, params...) if err != nil { return nil, fmt.Errorf("unable to execute query: %w", err) diff --git a/internal/sources/oracle/oci_cgo.go b/internal/sources/oracle/oci_cgo.go new file mode 100644 index 000000000000..1976b20f1a8b --- /dev/null +++ b/internal/sources/oracle/oci_cgo.go @@ -0,0 +1,21 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build cgo + +package oracle + +import ( + _ "github.com/godror/godror" // OCI driver (requires CGO) +) diff --git a/internal/sources/oracle/oci_pure.go b/internal/sources/oracle/oci_pure.go new file mode 100644 index 000000000000..a7f12916e73a --- /dev/null +++ b/internal/sources/oracle/oci_pure.go @@ -0,0 +1,19 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package oracle + +import ( + _ "github.com/sijms/go-ora/v2" // Pure Go driver (always included) +) diff --git a/internal/sources/oracle/oracle.go b/internal/sources/oracle/oracle.go index 30d1332a7fb5..58f312cf5061 100644 --- a/internal/sources/oracle/oracle.go +++ b/internal/sources/oracle/oracle.go @@ -15,6 +15,7 @@ import ( _ "github.com/sijms/go-ora/v2" // Pure Go driver "github.com/googleapis/mcp-toolbox/internal/sources" + "github.com/googleapis/mcp-toolbox/internal/sqlcommenter" "github.com/googleapis/mcp-toolbox/internal/util" "go.opentelemetry.io/otel/trace" ) @@ -138,6 +139,11 @@ func (s *Source) OracleDB() *sql.DB { } func (s *Source) RunSQL(ctx context.Context, statement string, params []any, readOnly bool) (any, error) { + // Inject the database driver into the context for SQLCommenter + ctx = sqlcommenter.WithDBDriver(ctx, "oracle") + // Decorate the statement with SQLCommenter metadata from the context + statement = sqlcommenter.AppendComment(ctx, statement) + if !readOnly { result, err := s.OracleDB().ExecContext(ctx, statement, params...) if err != nil { diff --git a/internal/sources/pgx_util.go b/internal/sources/pgx_util.go new file mode 100644 index 000000000000..b831ddc22c70 --- /dev/null +++ b/internal/sources/pgx_util.go @@ -0,0 +1,63 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sources + +import ( + "context" + "fmt" + + "github.com/googleapis/mcp-toolbox/internal/sqlcommenter" + "github.com/googleapis/mcp-toolbox/internal/util/orderedmap" + "github.com/jackc/pgx/v5" +) + +// PgxQueryer abstracts connection pools and wrapper classes that can execute native Pgx queries. +type PgxQueryer interface { + Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error) +} + +// RunSQLWithPgxQueryer executes a standard SQL statement with SQLCommenter telemetry +// across any driver natively supporting the pgx execution interface. +func RunSQLWithPgxQueryer(ctx context.Context, queryer PgxQueryer, statement string, params []any, driver string) (any, error) { + // Inject the database driver into the context for SQLCommenter + ctx = sqlcommenter.WithDBDriver(ctx, driver) + // Decorate the statement with SQLCommenter metadata from the context + statement = sqlcommenter.AppendComment(ctx, statement) + + results, err := queryer.Query(ctx, statement, params...) + if err != nil { + return nil, fmt.Errorf("unable to execute query: %w", err) + } + defer results.Close() + + fields := results.FieldDescriptions() + var out []any + for results.Next() { + values, err := results.Values() + if err != nil { + return nil, fmt.Errorf("unable to parse row: %w", err) + } + row := orderedmap.Row{} + for i, f := range fields { + row.Add(f.Name, values[i]) + } + out = append(out, row) + } + // this will catch actual query execution errors + if err := results.Err(); err != nil { + return nil, fmt.Errorf("unable to execute query: %w", err) + } + return out, nil +} diff --git a/internal/sources/postgres/postgres.go b/internal/sources/postgres/postgres.go index 86c7235ac53f..eb3ed6527adb 100644 --- a/internal/sources/postgres/postgres.go +++ b/internal/sources/postgres/postgres.go @@ -23,7 +23,6 @@ import ( "github.com/goccy/go-yaml" "github.com/googleapis/mcp-toolbox/internal/sources" "github.com/googleapis/mcp-toolbox/internal/util" - "github.com/googleapis/mcp-toolbox/internal/util/orderedmap" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" "go.opentelemetry.io/otel/trace" @@ -102,30 +101,7 @@ func (s *Source) PostgresPool() *pgxpool.Pool { } func (s *Source) RunSQL(ctx context.Context, statement string, params []any) (any, error) { - results, err := s.PostgresPool().Query(ctx, statement, params...) - if err != nil { - return nil, fmt.Errorf("unable to execute query: %w", err) - } - defer results.Close() - - fields := results.FieldDescriptions() - var out []any - for results.Next() { - values, err := results.Values() - if err != nil { - return nil, fmt.Errorf("unable to parse row: %w", err) - } - row := orderedmap.Row{} - for i, f := range fields { - row.Add(f.Name, values[i]) - } - out = append(out, row) - } - // this will catch actual query execution errors - if err := results.Err(); err != nil { - return nil, fmt.Errorf("unable to execute query: %w", err) - } - return out, nil + return sources.RunSQLWithPgxQueryer(ctx, s.PostgresPool(), statement, params, "pgx") } func initPostgresConnectionPool(ctx context.Context, tracer trace.Tracer, name, host, port, user, pass, dbname string, queryParams map[string]string, queryExecMode string) (*pgxpool.Pool, error) { diff --git a/internal/sources/singlestore/singlestore.go b/internal/sources/singlestore/singlestore.go index 971808d908d6..9b7889d4d52a 100644 --- a/internal/sources/singlestore/singlestore.go +++ b/internal/sources/singlestore/singlestore.go @@ -24,6 +24,7 @@ import ( "github.com/go-sql-driver/mysql" "github.com/goccy/go-yaml" "github.com/googleapis/mcp-toolbox/internal/sources" + "github.com/googleapis/mcp-toolbox/internal/sqlcommenter" "github.com/googleapis/mcp-toolbox/internal/tools/mysql/mysqlcommon" "go.opentelemetry.io/otel/trace" ) @@ -108,6 +109,11 @@ func (s *Source) SingleStorePool() *sql.DB { } func (s *Source) RunSQL(ctx context.Context, statement string, params []any) (any, error) { + // Inject the database driver into the context for SQLCommenter + ctx = sqlcommenter.WithDBDriver(ctx, "mysql") + // Decorate the statement with SQLCommenter metadata from the context + statement = sqlcommenter.AppendComment(ctx, statement) + results, err := s.SingleStorePool().QueryContext(ctx, statement, params...) if err != nil { return nil, fmt.Errorf("unable to execute query: %w", err) diff --git a/internal/sources/snowflake/snowflake.go b/internal/sources/snowflake/snowflake.go index 408bbc1a3759..1635316449cb 100644 --- a/internal/sources/snowflake/snowflake.go +++ b/internal/sources/snowflake/snowflake.go @@ -20,6 +20,7 @@ import ( "github.com/goccy/go-yaml" "github.com/googleapis/mcp-toolbox/internal/sources" + "github.com/googleapis/mcp-toolbox/internal/sqlcommenter" "github.com/jmoiron/sqlx" _ "github.com/snowflakedb/gosnowflake" "go.opentelemetry.io/otel/trace" @@ -98,6 +99,11 @@ func (s *Source) SnowflakeDB() *sqlx.DB { } func (s *Source) RunSQL(ctx context.Context, statement string, params []any) (any, error) { + // Inject the database driver into the context for SQLCommenter + ctx = sqlcommenter.WithDBDriver(ctx, "snowflake") + // Decorate the statement with SQLCommenter metadata from the context + statement = sqlcommenter.AppendComment(ctx, statement) + rows, err := s.DB.QueryxContext(ctx, statement, params...) if err != nil { return nil, fmt.Errorf("unable to execute query: %w", err) diff --git a/internal/sources/spanner/spanner.go b/internal/sources/spanner/spanner.go index 37ea52fcd0ae..d7a72eb434ac 100644 --- a/internal/sources/spanner/spanner.go +++ b/internal/sources/spanner/spanner.go @@ -22,6 +22,7 @@ import ( "cloud.google.com/go/spanner" "github.com/goccy/go-yaml" "github.com/googleapis/mcp-toolbox/internal/sources" + "github.com/googleapis/mcp-toolbox/internal/sqlcommenter" "github.com/googleapis/mcp-toolbox/internal/util" "github.com/googleapis/mcp-toolbox/internal/util/orderedmap" "go.opentelemetry.io/otel/trace" @@ -138,6 +139,11 @@ func processRows(iter *spanner.RowIterator) ([]any, error) { } func (s *Source) RunSQL(ctx context.Context, readOnly bool, statement string, params map[string]any) (any, error) { + // Inject the database driver into the context for SQLCommenter + ctx = sqlcommenter.WithDBDriver(ctx, "spanner") + // Decorate the statement with SQLCommenter metadata from the context + statement = sqlcommenter.AppendComment(ctx, statement) + var results []any var err error var opErr error diff --git a/internal/sources/sqlite/sqlite.go b/internal/sources/sqlite/sqlite.go index 8c15ceed45b8..d9591b0c17ea 100644 --- a/internal/sources/sqlite/sqlite.go +++ b/internal/sources/sqlite/sqlite.go @@ -22,6 +22,7 @@ import ( "github.com/goccy/go-yaml" "github.com/googleapis/mcp-toolbox/internal/sources" + "github.com/googleapis/mcp-toolbox/internal/sqlcommenter" "github.com/googleapis/mcp-toolbox/internal/util/orderedmap" "go.opentelemetry.io/otel/trace" _ "modernc.org/sqlite" // Pure Go SQLite driver @@ -94,6 +95,11 @@ func (s *Source) SQLiteDB() *sql.DB { } func (s *Source) RunSQL(ctx context.Context, statement string, params []any) (any, error) { + // Inject the database driver into the context for SQLCommenter + ctx = sqlcommenter.WithDBDriver(ctx, "sqlite") + // Decorate the statement with SQLCommenter metadata from the context + statement = sqlcommenter.AppendComment(ctx, statement) + // Execute the SQL query with parameters rows, err := s.SQLiteDB().QueryContext(ctx, statement, params...) if err != nil { diff --git a/internal/sources/tidb/tidb.go b/internal/sources/tidb/tidb.go index 3ab2b5cc2dc4..84063493d9a7 100644 --- a/internal/sources/tidb/tidb.go +++ b/internal/sources/tidb/tidb.go @@ -24,6 +24,7 @@ import ( _ "github.com/go-sql-driver/mysql" "github.com/goccy/go-yaml" "github.com/googleapis/mcp-toolbox/internal/sources" + "github.com/googleapis/mcp-toolbox/internal/sqlcommenter" "go.opentelemetry.io/otel/trace" ) @@ -106,6 +107,11 @@ func (s *Source) TiDBPool() *sql.DB { } func (s *Source) RunSQL(ctx context.Context, statement string, params []any) (any, error) { + // Inject the database driver into the context for SQLCommenter + ctx = sqlcommenter.WithDBDriver(ctx, "tidb") + // Decorate the statement with SQLCommenter metadata from the context + statement = sqlcommenter.AppendComment(ctx, statement) + results, err := s.TiDBPool().QueryContext(ctx, statement, params...) if err != nil { return nil, fmt.Errorf("unable to execute query: %w", err) diff --git a/internal/sources/trino/trino.go b/internal/sources/trino/trino.go index 722bc2d214cf..e4f6e0149111 100644 --- a/internal/sources/trino/trino.go +++ b/internal/sources/trino/trino.go @@ -25,6 +25,7 @@ import ( "github.com/goccy/go-yaml" "github.com/googleapis/mcp-toolbox/internal/sources" + "github.com/googleapis/mcp-toolbox/internal/sqlcommenter" "github.com/googleapis/mcp-toolbox/internal/util" trinogo "github.com/trinodb/trino-go-client/trino" "go.opentelemetry.io/otel/trace" @@ -109,6 +110,11 @@ func (s *Source) TrinoDB() *sql.DB { } func (s *Source) RunSQL(ctx context.Context, statement string, params []any) (any, error) { + // Inject the database driver into the context for SQLCommenter + ctx = sqlcommenter.WithDBDriver(ctx, "trino") + // Decorate the statement with SQLCommenter metadata from the context + statement = sqlcommenter.AppendComment(ctx, statement) + results, err := s.TrinoDB().QueryContext(ctx, statement, params...) if err != nil { return nil, fmt.Errorf("unable to execute query: %w", err) diff --git a/internal/sources/yugabytedb/yugabytedb.go b/internal/sources/yugabytedb/yugabytedb.go index 10d1471723a8..57dce1c63b54 100644 --- a/internal/sources/yugabytedb/yugabytedb.go +++ b/internal/sources/yugabytedb/yugabytedb.go @@ -20,6 +20,7 @@ import ( "github.com/goccy/go-yaml" "github.com/googleapis/mcp-toolbox/internal/sources" + "github.com/googleapis/mcp-toolbox/internal/sqlcommenter" "github.com/yugabyte/pgx/v5/pgxpool" "go.opentelemetry.io/otel/trace" ) @@ -100,6 +101,11 @@ func (s *Source) YugabyteDBPool() *pgxpool.Pool { } func (s *Source) RunSQL(ctx context.Context, statement string, params []any) (any, error) { + // Inject the database driver into the context for SQLCommenter + ctx = sqlcommenter.WithDBDriver(ctx, "pgx") + // Decorate the statement with SQLCommenter metadata from the context + statement = sqlcommenter.AppendComment(ctx, statement) + results, err := s.YugabyteDBPool().Query(ctx, statement, params...) if err != nil { return nil, fmt.Errorf("unable to execute query: %w", err) @@ -120,7 +126,6 @@ func (s *Source) RunSQL(ctx context.Context, statement string, params []any) (an out = append(out, vMap) } - // this will catch actual query execution errors if err := results.Err(); err != nil { return nil, fmt.Errorf("unable to execute query: %w", err) } diff --git a/internal/sqlcommenter/README.md b/internal/sqlcommenter/README.md new file mode 100644 index 000000000000..7a48fb54dbcd --- /dev/null +++ b/internal/sqlcommenter/README.md @@ -0,0 +1,116 @@ +# sqlcommenter + +Internal package that annotates outgoing SQL queries and BigQuery jobs with +observability metadata, following the [SQLCommenter specification][spec]. + +The annotations make every query visible in **Cloud SQL Insights** grouped by +the MCP tool that issued it, the service that owns the toolbox instance, and +the framework version. + +--- + +## How it works + +### Postgres / Cloud SQL Postgres / AlloyDB / MySQL / Cloud SQL MySQL + +A trailing comment is appended to the SQL string before it reaches the driver: + +```sql +SELECT * FROM orders WHERE id = $1 +/*action='get-orders',application='my-service',controller='sales-agent',db_driver='pgx',framework='mcp-toolbox'*/ +``` + +Cloud SQL Insights indexes these keys and lets you filter and group queries by +`action` (tool name), `controller` (agent name), `application` (service name), +`db_driver`, and `framework` in the Insights dashboard. + +### BigQuery + +BigQuery ignores SQL comments for observability purposes, so labels are attached +to the **QueryJobConfig** instead. They are visible in `INFORMATION_SCHEMA.JOBS` +and the BigQuery console under job details: + +``` +labels.action = "get-orders" +labels.application = "my-service" +labels.framework = "mcp-toolbox" +``` + +--- + +## Controller tag — how the agent name is resolved + +The `controller` tag identifies which AI agent issued the query. It is +resolved automatically with this priority (highest → lowest): + +1. **Dynamic — MCP `clientInfo.name`** (automatic, zero config) + When an MCP client connects it sends an `initialize` message containing + `clientInfo: { name: "sales-agent", version: "1.0" }`. The toolbox server + captures this name per session and injects it into the context before + every `tools/call`. Multiple agents connecting to the same toolbox + instance each get their own `controller` tag with no configuration. + +2. **Static — `TOOLBOX_AGENT_NAME` env var** (fallback for non-MCP callers) + Set this when all callers share a single agent identity, or when calling + the REST API directly rather than via MCP. + +3. **Tool name** (last resort) + If neither of the above is set, `controller` mirrors `action` (the tool + name), so at least the query is still distinguishable. + +## Configuration + +| Environment variable | Purpose | Default | +|---|---|---| +| `TOOLBOX_APP_NAME` | The `application` tag — identifies the service running the toolbox (e.g. `order-service`). | `mcp-toolbox` | +| `TOOLBOX_AGENT_NAME` | Static fallback for `controller` when MCP `clientInfo.name` is not available. | *(tool name)* | + +--- + +## Package API + +```go +// --- Tool layer (called in each tool's Invoke) --- +ctx = sqlcommenter.WithToolName(ctx, t.Name) // sets "action" +ctx = sqlcommenter.WithDBDriver(ctx, "pgx") // sets "db_driver"; "mysql" for MySQL + +// Append a SQLCommenter comment (Postgres / MySQL path). +stmt := sqlcommenter.AppendComment(ctx, sql) + +// Build BigQuery job labels (BigQuery source RunSQL path). +query.Labels = sqlcommenter.JobLabels(ctx) + +// --- Server layer (called automatically; no manual wiring needed) --- +ctx = sqlcommenter.WithAgentName(ctx, clientInfo.Name) // injected per session +``` + +--- + +| `internal/sources/pgx_util.go` | **Centralized Handler** — Executes SQL with SQLCommenter telemetry for all Postgres-based drivers | +| `internal/server/mcp.go` | Stores `clientName` per session on `initialize`; handles **onRemove** session cleanup to prevent leaks | +| `internal/server/server.go` | Initializes `mcpClientNames` and lifecycle callbacks for memory safety | + +Cloud SQL Postgres, Cloud SQL MySQL, and AlloyDB are covered automatically — +they reuse the same `postgres-execute-sql` / `mysql-execute-sql` tool types, +only backed by different source drivers. + +--- + +## Comment format + +Keys and values are **percent-encoded** (RFC 3986 via `net/url.QueryEscape`) +and emitted in **lexicographic key order**, matching the SQLCommenter spec. +A trailing semicolon in the original SQL is preserved — the comment is placed +before it so the statement remains syntactically valid. + +Keys emitted per driver: + +| Key | Value | Postgres/MySQL | BigQuery | +|---|---|:---:|:---:| +| `action` | Tool name (e.g. `get-orders`) | ✓ comment | ✓ label | +| `application` | `$TOOLBOX_APP_NAME` | ✓ comment | ✓ label | +| `controller` | Agent name (auto from MCP) → `$TOOLBOX_AGENT_NAME` → tool name | ✓ comment | — | +| `db_driver` | `pgx` or `mysql` | ✓ comment | — | +| `framework` | `mcp-toolbox` | ✓ comment | ✓ label | + +[spec]: https://google.github.io/sqlcommenter/ diff --git a/internal/sqlcommenter/commenter.go b/internal/sqlcommenter/commenter.go new file mode 100644 index 000000000000..9351e919a389 --- /dev/null +++ b/internal/sqlcommenter/commenter.go @@ -0,0 +1,228 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package sqlcommenter appends SQLCommenter-formatted comments to SQL statements +// so that Cloud SQL Insights can group and filter queries by agent tool, service, +// and framework. For BigQuery it exposes helpers to build Job Labels instead, +// because BigQuery surfaces observability data through job metadata rather than +// SQL comments. +// +// SQLCommenter spec: https://google.github.io/sqlcommenter/ +// Cloud SQL Insights recognised keys: action, application, controller, +// db_driver, framework, route. +package sqlcommenter + +import ( + "context" + "fmt" + "net/url" + "os" + "sort" + "strings" + + "go.opentelemetry.io/otel/trace" +) + +// Context key type — unexported to avoid collisions with other packages. +type contextKey int + +const ( + keyToolName contextKey = iota + keyDBDriver // e.g. "pgx" or "mysql" + keyAgentName // MCP clientInfo.name captured at initialize time +) + +// WithToolName stores the MCP tool name in the context so that AppendComment +// can retrieve it without callers having to thread it through every call. +func WithToolName(ctx context.Context, name string) context.Context { + return context.WithValue(ctx, keyToolName, name) +} + +// WithDBDriver stores the database driver identifier (e.g. "pgx", "mysql") in +// the context. +func WithDBDriver(ctx context.Context, driver string) context.Context { + return context.WithValue(ctx, keyDBDriver, driver) +} + +// WithAgentName stores the MCP agent name in the context. It is set +// automatically by the server layer from clientInfo.name in the MCP +// initialize handshake, so individual tools do not need to call this. +// It is used as the "controller" tag, overriding TOOLBOX_AGENT_NAME. +func WithAgentName(ctx context.Context, name string) context.Context { + return context.WithValue(ctx, keyAgentName, name) +} + +// appName returns the value of TOOLBOX_APP_NAME, falling back to "mcp-toolbox" +// when the variable is not set. +func appName() string { + if v := os.Getenv("TOOLBOX_APP_NAME"); v != "" { + return v + } + return "mcp-toolbox" +} + +// agentName returns the value of TOOLBOX_AGENT_NAME, which is used as the +// "controller" tag in SQLCommenter comments. This identifies the AI agent +// (e.g. "sales-agent", "support-bot") rather than the individual tool action. +// Falls back to an empty string when unset, which causes the tag to be omitted. +func agentName() string { + return os.Getenv("TOOLBOX_AGENT_NAME") +} + +// encode percent-encodes a value according to the SQLCommenter spec (RFC 3986). +func encode(v string) string { + return strings.ReplaceAll(url.QueryEscape(v), "+", "%20") +} + +// AppendComment appends a SQLCommenter comment to sql. The comment carries the +// tool name (action), application name (application), database driver +// (db_driver), and framework. +// +// If the SQL statement already ends with a semicolon the comment is inserted +// before it so that the statement remains syntactically valid. +// +// Example output: +// +// SELECT * FROM orders WHERE id = $1 +// /*action='get-orders',application='my-service',db_driver='pgx',framework='mcp-toolbox'*/ +func AppendComment(ctx context.Context, sql string) string { + tags := map[string]string{ + "framework": "mcp-toolbox", + "application": appName(), + } + + if v, ok := ctx.Value(keyToolName).(string); ok && v != "" { + tags["action"] = v + } + // controller resolution priority (highest → lowest): + // 1. Dynamic agent name from MCP clientInfo.name (set by server layer) + // 2. Static TOOLBOX_AGENT_NAME env var + // 3. Tool name (action) as fallback + switch { + case ctx.Value(keyAgentName) != nil: + if v := ctx.Value(keyAgentName).(string); v != "" { + tags["controller"] = v + } + case agentName() != "": + tags["controller"] = agentName() + default: + if v, ok := ctx.Value(keyToolName).(string); ok && v != "" { + tags["controller"] = v + } + } + if v, ok := ctx.Value(keyDBDriver).(string); ok && v != "" { + tags["db_driver"] = v + } + + // Add traceparent for OpenTelemetry distributed tracing correlation + if spanCtx := trace.SpanContextFromContext(ctx); spanCtx.IsValid() { + tags["traceparent"] = fmt.Sprintf("00-%s-%s-%s", + spanCtx.TraceID().String(), + spanCtx.SpanID().String(), + spanCtx.TraceFlags().String(), + ) + } + + // Build key=value pairs in deterministic (sorted) order. + keys := make([]string, 0, len(tags)) + for k := range tags { + keys = append(keys, k) + } + sort.Strings(keys) + + parts := make([]string, 0, len(tags)) + for _, k := range keys { + parts = append(parts, fmt.Sprintf("%s='%s'", encode(k), encode(tags[k]))) + } + + comment := "/*" + strings.Join(parts, ",") + "*/" + + // Preserve a trailing semicolon — place the comment before it. + trimmed := strings.TrimRight(sql, " \t\r\n") + if strings.HasSuffix(trimmed, ";") { + return trimmed[:len(trimmed)-1] + "\n" + comment + ";" + } + return sql + "\n" + comment +} + + +// JobLabels returns a map of BigQuery Job Labels built from the context values +// set by WithToolName / WithDBDriver, plus fixed framework/application keys. +// Pass the result to query.Labels before calling query.Run(). +// +// Key resolution matches AppendComment to ensure consistent observability. +func JobLabels(ctx context.Context) map[string]string { + labels := map[string]string{ + "framework": "mcp-toolbox", + "application": sanitizeLabelValue(appName()), + "db_driver": "bigquery", + } + + if v, ok := ctx.Value(keyToolName).(string); ok && v != "" { + labels["action"] = sanitizeLabelValue(v) + } + + // controller resolution priority (highest → lowest): + // 1. Dynamic agent name from MCP clientInfo.name (set by server layer) + // 2. Static TOOLBOX_AGENT_NAME env var + // 3. Tool name (action) as fallback + switch { + case ctx.Value(keyAgentName) != nil: + if v := ctx.Value(keyAgentName).(string); v != "" { + labels["controller"] = sanitizeLabelValue(v) + } + case agentName() != "": + labels["controller"] = sanitizeLabelValue(agentName()) + default: + if v, ok := ctx.Value(keyToolName).(string); ok && v != "" { + labels["controller"] = sanitizeLabelValue(v) + } + } + + // Add traceparent for BigQuery Job Label correlation + if spanCtx := trace.SpanContextFromContext(ctx); spanCtx.IsValid() { + labels["traceparent"] = sanitizeLabelValue(fmt.Sprintf("00-%s-%s-%s", + spanCtx.TraceID().String(), + spanCtx.SpanID().String(), + spanCtx.TraceFlags().String(), + )) + } + + return labels +} + +// sanitizeLabelValue converts a string to a valid BigQuery label value. +// BigQuery label values must be lowercase letters, numbers, hyphens, and +// underscores; at most 63 characters. +func sanitizeLabelValue(v string) string { + v = strings.ToLower(v) + var b strings.Builder + for _, r := range v { + switch { + case r >= 'a' && r <= 'z': + b.WriteRune(r) + case r >= '0' && r <= '9': + b.WriteRune(r) + case r == '-' || r == '_': + b.WriteRune(r) + default: + b.WriteRune('-') + } + } + s := b.String() + if len(s) > 63 { + s = s[:63] + } + return s +} diff --git a/internal/sqlcommenter/commenter_test.go b/internal/sqlcommenter/commenter_test.go new file mode 100644 index 000000000000..7f7f1b34a31c --- /dev/null +++ b/internal/sqlcommenter/commenter_test.go @@ -0,0 +1,72 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sqlcommenter + +import ( + "context" + "strings" + "testing" +) + +func testContext() context.Context { + ctx := context.Background() + ctx = WithToolName(ctx, "testTool") + ctx = WithAgentName(ctx, "testAgent") + return ctx +} + +func TestAppendComment_Postgres(t *testing.T) { + ctx := testContext() + ctx = WithDBDriver(ctx, "postgres") + + sql := "SELECT * FROM users WHERE id = 1" + got := AppendComment(ctx, sql) + + // The query should start with the original sql and end with the comment block, + // or the comment block should be prepended before the semicolon/end. + // We'll just verify the comment string is present. + expectedComment := "/*action='testTool',application='mcp-toolbox',controller='testAgent',db_driver='postgres',framework='mcp-toolbox'*/" + + if !strings.Contains(got, expectedComment) { + t.Fatalf("expected comment %s not found in got=%s", expectedComment, got) + } +} + +func TestAppendComment_NoSQL_Cassandra(t *testing.T) { + ctx := testContext() + ctx = WithDBDriver(ctx, "cassandra") + + sql := "SELECT * FROM keyspace.table" + got := AppendComment(ctx, sql) + + expectedComment := "/*action='testTool',application='mcp-toolbox',controller='testAgent',db_driver='cassandra',framework='mcp-toolbox'*/" + + if !strings.Contains(got, expectedComment) { + t.Fatalf("expected comment %s not found in got=%s", expectedComment, got) + } +} + +func TestAppendComment_NoMetadata(t *testing.T) { + // Need to check what happens when only appName and framework are present (they are defaults). + ctx := context.Background() + sql := "SELECT 1" + got := AppendComment(ctx, sql) + + if !strings.Contains(got, "/*application='mcp-toolbox',framework='mcp-toolbox'*/") { + t.Fatalf("expected basic default comment, got=%s", got) + } +} + + diff --git a/internal/tools/bigquery/bigqueryexecutesql/bigqueryexecutesql.go b/internal/tools/bigquery/bigqueryexecutesql/bigqueryexecutesql.go index 4fdc89908835..c7c1089b5008 100644 --- a/internal/tools/bigquery/bigqueryexecutesql/bigqueryexecutesql.go +++ b/internal/tools/bigquery/bigqueryexecutesql/bigqueryexecutesql.go @@ -26,6 +26,7 @@ import ( "github.com/googleapis/mcp-toolbox/internal/embeddingmodels" "github.com/googleapis/mcp-toolbox/internal/sources" bigqueryds "github.com/googleapis/mcp-toolbox/internal/sources/bigquery" + "github.com/googleapis/mcp-toolbox/internal/sqlcommenter" "github.com/googleapis/mcp-toolbox/internal/tools" bqutil "github.com/googleapis/mcp-toolbox/internal/tools/bigquery/bigquerycommon" "github.com/googleapis/mcp-toolbox/internal/util" @@ -282,6 +283,11 @@ func (t Tool) Invoke(ctx context.Context, resourceMgr tools.SourceProvider, para return nil, util.NewClientServerError("error getting logger", http.StatusInternalServerError, err) } logger.DebugContext(ctx, fmt.Sprintf("executing `%s` tool query: %s", resourceType, sql)) + + // Embed the tool name in the context so RunSQL can attach it as a BigQuery + // Job Label for Cloud SQL Insights / BigQuery observability. + ctx = sqlcommenter.WithToolName(ctx, t.Name) + resp, err := source.RunSQL(ctx, bqClient, sql, statementType, nil, connProps) if err != nil { return nil, util.NewClientServerError("error running sql", http.StatusInternalServerError, err) diff --git a/internal/tools/couchbase/couchbase.go b/internal/tools/couchbase/couchbase.go index 3a6e9b4b11e4..81e7c031fc8a 100644 --- a/internal/tools/couchbase/couchbase.go +++ b/internal/tools/couchbase/couchbase.go @@ -46,7 +46,7 @@ func newConfig(ctx context.Context, name string, decoder *yaml.Decoder) (tools.T type compatibleSource interface { CouchbaseScope() *gocb.Scope - RunSQL(string, parameters.ParamValues) (any, error) + RunSQL(context.Context, string, parameters.ParamValues) (any, error) } type Config struct { @@ -114,7 +114,7 @@ func (t Tool) Invoke(ctx context.Context, resourceMgr tools.SourceProvider, para return nil, util.NewAgentError("unable to extract standard params", err) } - resp, err := source.RunSQL(newStatement, newParams) + resp, err := source.RunSQL(ctx, newStatement, newParams) if err != nil { return nil, util.ProcessGeneralError(err) } diff --git a/internal/tools/dgraph/dgraph.go b/internal/tools/dgraph/dgraph.go index 8d2aa8dd70d1..fae1b5597615 100644 --- a/internal/tools/dgraph/dgraph.go +++ b/internal/tools/dgraph/dgraph.go @@ -46,7 +46,7 @@ func newConfig(ctx context.Context, name string, decoder *yaml.Decoder) (tools.T type compatibleSource interface { DgraphClient() *dgraph.DgraphClient - RunSQL(string, parameters.ParamValues, bool, string) (any, error) + RunSQL(context.Context, string, parameters.ParamValues, bool, string) (any, error) } type Config struct { @@ -100,7 +100,7 @@ func (t Tool) Invoke(ctx context.Context, resourceMgr tools.SourceProvider, para if err != nil { return nil, util.NewClientServerError("source used is not compatible with the tool", http.StatusInternalServerError, err) } - resp, err := source.RunSQL(t.Statement, params, t.IsQuery, t.Timeout) + resp, err := source.RunSQL(ctx, t.Statement, params, t.IsQuery, t.Timeout) if err != nil { return nil, util.ProcessGeneralError(err) } diff --git a/internal/tools/mysql/mysqlexecutesql/mysqlexecutesql.go b/internal/tools/mysql/mysqlexecutesql/mysqlexecutesql.go index 18fae0c2d4f3..decfc0c32cda 100644 --- a/internal/tools/mysql/mysqlexecutesql/mysqlexecutesql.go +++ b/internal/tools/mysql/mysqlexecutesql/mysqlexecutesql.go @@ -110,6 +110,7 @@ func (t Tool) Invoke(ctx context.Context, resourceMgr tools.SourceProvider, para return nil, util.NewClientServerError("error getting logger", http.StatusInternalServerError, err) } logger.DebugContext(ctx, fmt.Sprintf("executing `%s` tool query: %s", resourceType, sqlStr)) + resp, err := source.RunSQL(ctx, sqlStr, nil) if err != nil { return nil, util.ProcessGeneralError(err)