diff --git a/pkg/mcp/register.go b/pkg/mcp/register.go index 641d567ff..cec3acb4c 100644 --- a/pkg/mcp/register.go +++ b/pkg/mcp/register.go @@ -100,7 +100,7 @@ func RegisterTools(server *Server) { }) server.RegisterTool(&common.ToolDef{ - Name: "get_service_detail", + Name: "get_service_distribution", Description: "获取服务详情,包括服务的提供者或消费者应用列表", InputSchema: common.InputSchema{ Type: "object", @@ -128,6 +128,34 @@ func RegisterTools(server *Server) { }, }, }, + Handler: tools.GetServiceDistribution, + }) + + server.RegisterTool(&common.ToolDef{ + Name: "get_service_detail", + Description: "获取服务详情,包括语言和方法列表", + InputSchema: common.InputSchema{ + Type: "object", + Required: []string{"serviceName"}, + Properties: map[string]common.PropertyDef{ + "serviceName": { + Type: "string", + Description: "服务名称", + }, + "group": { + Type: "string", + Description: "服务分组", + }, + "version": { + Type: "string", + Description: "服务版本", + }, + "mesh": { + Type: "string", + Description: "网格名称,默认使用第一个 discovery 配置的 id", + }, + }, + }, Handler: tools.GetServiceDetail, }) diff --git a/pkg/mcp/register_test.go b/pkg/mcp/register_test.go new file mode 100644 index 000000000..c9806f737 --- /dev/null +++ b/pkg/mcp/register_test.go @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package mcp + +import "testing" + +func TestRegisterServiceDetailTools(t *testing.T) { + server := NewServer("test", "dev") + RegisterTools(server) + + detail, ok := server.tools["get_service_detail"] + if !ok { + t.Fatal("Tool 'get_service_detail' not registered") + } + if detail.Handler == nil { + t.Fatal("Tool 'get_service_detail' handler is nil") + } + if len(detail.InputSchema.Required) != 1 || detail.InputSchema.Required[0] != "serviceName" { + t.Fatalf("Expected serviceName to be required, got %v", detail.InputSchema.Required) + } + for _, prop := range []string{"serviceName", "version", "group", "mesh"} { + if _, ok := detail.InputSchema.Properties[prop]; !ok { + t.Fatalf("get_service_detail missing property %q", prop) + } + } + if _, ok := detail.InputSchema.Properties["side"]; ok { + t.Fatal("get_service_detail should not expose side") + } + + distribution, ok := server.tools["get_service_distribution"] + if !ok { + t.Fatal("Tool 'get_service_distribution' not registered") + } + if distribution.Handler == nil { + t.Fatal("Tool 'get_service_distribution' handler is nil") + } + for _, prop := range []string{"serviceName", "version", "group", "side", "mesh"} { + if _, ok := distribution.InputSchema.Properties[prop]; !ok { + t.Fatalf("get_service_distribution missing property %q", prop) + } + } +} diff --git a/pkg/mcp/tools/detail_tools_test.go b/pkg/mcp/tools/detail_tools_test.go new file mode 100644 index 000000000..8aeb1411b --- /dev/null +++ b/pkg/mcp/tools/detail_tools_test.go @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package tools + +import ( + "context" + "encoding/json" + "testing" + + meshapi "github.com/apache/dubbo-admin/api/mesh/v1alpha1" + "github.com/apache/dubbo-admin/pkg/config/app" + discoverycfg "github.com/apache/dubbo-admin/pkg/config/discovery" + enginecfg "github.com/apache/dubbo-admin/pkg/config/engine" + consolectx "github.com/apache/dubbo-admin/pkg/console/context" + "github.com/apache/dubbo-admin/pkg/console/counter" + "github.com/apache/dubbo-admin/pkg/core/lock" + "github.com/apache/dubbo-admin/pkg/core/manager" + meshresource "github.com/apache/dubbo-admin/pkg/core/resource/apis/mesh/v1alpha1" + coremodel "github.com/apache/dubbo-admin/pkg/core/resource/model" + "github.com/apache/dubbo-admin/pkg/core/store/index" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func TestGetServiceDetailMissingServiceName(t *testing.T) { + result, err := GetServiceDetail(newToolTestContext(nil), map[string]any{}) + if err != nil { + t.Fatalf("GetServiceDetail returned unexpected error: %v", err) + } + if !result.IsError { + t.Fatal("Expected error result") + } + if got := result.Content[0].Text; got != "required parameter 'serviceName' is missing" { + t.Fatalf("Expected missing parameter error, got %q", got) + } +} + +func TestGetServiceDetailSuccess(t *testing.T) { + const ( + mesh = "mesh1" + serviceName = "org.apache.demo.DemoService" + version = "1.0.0" + group = "demo" + ) + serviceKey := coremodel.BuildResourceKey(mesh, meshresource.BuildServiceIdentityKey(serviceName, version, group)) + resource := &meshresource.ServiceResource{ + ObjectMeta: metav1.ObjectMeta{Name: meshresource.BuildServiceIdentityKey(serviceName, version, group)}, + Mesh: mesh, + Spec: &meshapi.Service{ + Name: serviceName, + Version: version, + Group: group, + Language: "java", + Methods: []string{"sayHello"}, + }, + } + ctx := newToolTestContext(map[coremodel.ResourceKind]map[string]coremodel.Resource{ + meshresource.ServiceKind: { + serviceKey: resource, + }, + }) + + result, err := GetServiceDetail(ctx, map[string]any{ + "serviceName": serviceName, + "version": version, + "group": group, + "mesh": mesh, + }) + if err != nil { + t.Fatalf("GetServiceDetail returned unexpected error: %v", err) + } + if result.IsError { + t.Fatalf("Expected success result, got %q", result.Content[0].Text) + } + + var payload struct { + Language string `json:"language"` + Methods []string `json:"methods"` + } + if err := json.Unmarshal([]byte(result.Content[0].Text), &payload); err != nil { + t.Fatalf("Failed to unmarshal result: %v", err) + } + if payload.Language != "java" { + t.Fatalf("Expected language java, got %q", payload.Language) + } + if len(payload.Methods) != 1 || payload.Methods[0] != "sayHello" { + t.Fatalf("Expected methods [sayHello], got %v", payload.Methods) + } +} + +func TestGetServiceDistributionSuccessWithEmptyDistribution(t *testing.T) { + result, err := GetServiceDistribution(newToolTestContext(nil), map[string]any{"serviceName": "missing"}) + if err != nil { + t.Fatalf("get_service_distribution handler returned unexpected error: %v", err) + } + if result.IsError { + t.Fatalf("Expected success result with empty distribution, got %q", result.Content[0].Text) + } + + var payload struct { + ServiceName string `json:"serviceName"` + Distribution []any `json:"distribution"` + TotalApps int `json:"totalApps"` + } + if err := json.Unmarshal([]byte(result.Content[0].Text), &payload); err != nil { + t.Fatalf("Failed to unmarshal result: %v", err) + } + if payload.ServiceName != "missing" { + t.Fatalf("Expected serviceName missing, got %q", payload.ServiceName) + } + if len(payload.Distribution) != 0 || payload.TotalApps != 0 { + t.Fatalf("Expected empty distribution, got distribution=%v totalApps=%d", payload.Distribution, payload.TotalApps) + } +} + +func newToolTestContext(resources map[coremodel.ResourceKind]map[string]coremodel.Resource) consolectx.Context { + return &toolTestContext{ + config: app.AdminConfig{ + Discovery: []*discoverycfg.Config{{ID: "mesh1"}}, + Engine: &enginecfg.Config{Name: "engine1"}, + }, + resourceManager: &toolTestResourceManager{resources: resources}, + } +} + +type toolTestContext struct { + config app.AdminConfig + resourceManager manager.ResourceManager +} + +func (c *toolTestContext) ResourceManager() manager.ResourceManager { + return c.resourceManager +} + +func (c *toolTestContext) CounterManager() counter.CounterManager { + return nil +} + +func (c *toolTestContext) Config() app.AdminConfig { + return c.config +} + +func (c *toolTestContext) AppContext() context.Context { + return context.Background() +} + +func (c *toolTestContext) LockManager() lock.Lock { + return nil +} + +type toolTestResourceManager struct { + resources map[coremodel.ResourceKind]map[string]coremodel.Resource +} + +func (m *toolTestResourceManager) GetByKey(rk coremodel.ResourceKind, key string) (coremodel.Resource, bool, error) { + byKind := m.resources[rk] + if byKind == nil { + return nil, false, nil + } + resource, ok := byKind[key] + return resource, ok, nil +} + +func (m *toolTestResourceManager) GetByKeys(rk coremodel.ResourceKind, keys []string) ([]coremodel.Resource, error) { + byKind := m.resources[rk] + result := make([]coremodel.Resource, 0, len(keys)) + for _, key := range keys { + if resource, ok := byKind[key]; ok { + result = append(result, resource) + } + } + return result, nil +} + +func (m *toolTestResourceManager) ListByIndexes(coremodel.ResourceKind, []index.IndexCondition) ([]coremodel.Resource, error) { + return nil, nil +} + +func (m *toolTestResourceManager) PageListByIndexes(coremodel.ResourceKind, []index.IndexCondition, coremodel.PageReq) (*coremodel.PageData[coremodel.Resource], error) { + return coremodel.NewPageData[coremodel.Resource](0, 0, 0, nil), nil +} + +func (m *toolTestResourceManager) Add(coremodel.Resource) error { + return nil +} + +func (m *toolTestResourceManager) Update(coremodel.Resource) error { + return nil +} + +func (m *toolTestResourceManager) Upsert(coremodel.Resource) error { + return nil +} + +func (m *toolTestResourceManager) DeleteByKey(coremodel.ResourceKind, string, string) error { + return nil +} diff --git a/pkg/mcp/tools/search.go b/pkg/mcp/tools/search.go index f8093945f..2b4e27862 100644 --- a/pkg/mcp/tools/search.go +++ b/pkg/mcp/tools/search.go @@ -120,11 +120,11 @@ func (e *appNameSearchExecutor) execute(ctx consolectx.Context, keyword, mesh st func (e *appNameSearchExecutor) buildResult(pagedResult *model.SearchPaginationResult, keyword string, pageSize, pageNumber int) map[string]any { apps := extractGlobalApplications(pagedResult) return map[string]any{ - "keyword": keyword, - "pageSize": pageSize, - "pageNumber": pageNumber, + "keyword": keyword, + "pageSize": pageSize, + "pageNumber": pageNumber, "applications": apps, - "totalCount": len(apps), + "totalCount": len(apps), } } diff --git a/pkg/mcp/tools/service.go b/pkg/mcp/tools/service.go index 7bca8b220..354656ebd 100644 --- a/pkg/mcp/tools/service.go +++ b/pkg/mcp/tools/service.go @@ -18,6 +18,8 @@ package tools import ( + "fmt" + consolectx "github.com/apache/dubbo-admin/pkg/console/context" "github.com/apache/dubbo-admin/pkg/console/model" "github.com/apache/dubbo-admin/pkg/console/service" @@ -46,8 +48,8 @@ func SearchServices(ctx consolectx.Context, args map[string]any) (*common.ToolRe return buildServiceSearchResult(result, keywords, mesh, pageSize, pageNumber) } -// GetServiceDetail 获取服务详情 -func GetServiceDetail(ctx consolectx.Context, args map[string]any) (*common.ToolResult, error) { +// GetServiceDistribution 获取服务关联的应用分布 +func GetServiceDistribution(ctx consolectx.Context, args map[string]any) (*common.ToolResult, error) { helper := common.NewArgsHelper(args) serviceName := helper.GetString("serviceName", "") @@ -156,14 +158,38 @@ func extractServices(result *model.SearchPaginationResult) ([]any, int) { resultSlice := make([]any, 0, len(services)) for _, svc := range services { - if svc != nil { - resultSlice = append(resultSlice, map[string]any{ - "serviceName": svc.ServiceName, - "version": svc.Version, - "group": svc.Group, - "consumerAppName": svc.ConsumerAppName, - }) + if svc == nil { + continue } + resultSlice = append(resultSlice, map[string]any{ + "serviceName": svc.ServiceName, + "version": svc.Version, + "group": svc.Group, + "consumerAppName": svc.ConsumerAppName, + }) } return resultSlice, int(result.PageInfo.Total) } + +// GetServiceDetail 获取服务详情 +func GetServiceDetail(ctx consolectx.Context, args map[string]any) (*common.ToolResult, error) { + helper := common.NewArgsHelper(args) + serviceName := helper.GetString("serviceName", "") + if serviceName == "" { + return common.ErrorResult(fmt.Errorf("required parameter 'serviceName' is missing")), nil + } + + req := &model.ServiceDetailReq{ + ServiceName: serviceName, + Version: helper.GetString("version", ""), + Group: helper.GetString("group", ""), + Mesh: common.GetMeshArg(ctx, args), + } + + detail, err := service.GetServiceDetail(ctx, req) + if err != nil { + return common.ErrorResult(err), nil + } + + return common.JsonResult(detail) +} diff --git a/pkg/mcp/transport/http/sse.go b/pkg/mcp/transport/http/sse.go index 84a0687f9..1c836cd3f 100644 --- a/pkg/mcp/transport/http/sse.go +++ b/pkg/mcp/transport/http/sse.go @@ -30,9 +30,9 @@ import ( // SSETransport Server-Sent Events传输层 type SSETransport struct { - server *mcp.Server - clients map[*SSEClient]bool - mu sync.RWMutex + server *mcp.Server + clients map[*SSEClient]bool + mu sync.RWMutex broadcast chan []byte } @@ -58,8 +58,8 @@ func NewSSEClient(id string) *SSEClient { // NewSSETransport 创建SSE传输层 func NewSSETransport(server *mcp.Server) *SSETransport { return &SSETransport{ - server: server, - clients: make(map[*SSEClient]bool), + server: server, + clients: make(map[*SSEClient]bool), broadcast: make(chan []byte, 256), } } @@ -83,7 +83,11 @@ func (t *SSETransport) HandleSSE(w http.ResponseWriter, r *http.Request) { t.sendToClient(client, t.sseEvent("connected", "SSE connection established")) // 等待断开连接 - <-client.ctx.Done() + select { + case <-client.ctx.Done(): + case <-r.Context().Done(): + client.done() + } t.mu.Lock() delete(t.clients, client)