Skip to content

Commit fdcfaf4

Browse files
authored
Merge pull request #1462 from VirajSalaka/sync-fix-mcp-2
Make MCP Proxies sync based on Eventing Mechanism
2 parents d8a58e7 + 35be1b3 commit fdcfaf4

14 files changed

Lines changed: 1252 additions & 195 deletions

File tree

common/eventhub/types.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ const (
3636
EventTypeLLMProxy EventType = "LLM_PROXY"
3737
// EventTypeLLMTemplate represents an LLM template change event
3838
EventTypeLLMTemplate EventType = "LLM_TEMPLATE"
39+
// EventTypeMCPProxy represents an MCP proxy change event
40+
EventTypeMCPProxy EventType = "MCP_PROXY"
3941

4042
// EmptyEventData is the canonical JSON payload for events that do not
4143
// require additional data beyond the top-level event fields.

gateway/gateway-controller/cmd/controller/main.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,18 @@ func main() {
189189
}
190190
log.Info("Loaded API keys", slog.Int("count", apiKeyXDSManager.GetAPIKeyCount()))
191191

192+
// MCP proxies are stored in their source form and need to be rehydrated into
193+
// the derived RestAPI representation before startup snapshot and policy work.
194+
for _, storedCfg := range configStore.GetAllByKind(string(api.Mcp)) {
195+
if err := utils.HydrateStoredMCPConfig(storedCfg); err != nil {
196+
log.Error("Failed to hydrate stored MCP proxy configuration",
197+
slog.String("id", storedCfg.UUID),
198+
slog.String("handle", storedCfg.Handle),
199+
slog.Any("error", err))
200+
os.Exit(1)
201+
}
202+
}
203+
192204
// Initialize xDS snapshot manager with router config
193205
snapshotManager := xds.NewSnapshotManager(configStore, log, &cfg.Router, db, cfg)
194206

@@ -292,8 +304,9 @@ func main() {
292304
loadedAPIs := configStore.GetAll()
293305
derivedCount := 0
294306
for _, apiConfig := range loadedAPIs {
295-
// Derive policy configuration from API
296-
if apiConfig.Kind == "RestApi" || apiConfig.Kind == "WebSubApi" {
307+
// MCP proxies hydrate into RestAPI form, so they can participate in the
308+
// same policy bootstrapping path as the other routed artifacts.
309+
if apiConfig.Kind == "RestApi" || apiConfig.Kind == "WebSubApi" || apiConfig.Kind == "Mcp" {
297310
storedPolicy := policybuilder.DerivePolicyFromAPIConfig(apiConfig, &cfg.Router, cfg, policyDefinitions)
298311
if storedPolicy != nil {
299312
if err := policyStore.Set(storedPolicy); err != nil {

gateway/gateway-controller/pkg/api/handlers/handlers.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ func NewAPIServer(
113113
parser := config.NewParser()
114114
httpClient := &http.Client{Timeout: 10 * time.Second}
115115
routerConfig := &systemConfig.Router
116+
mcpDeploymentService := utils.NewMCPDeploymentService(store, db, snapshotManager, policyManager, policyValidator)
116117

117118
server := &APIServer{
118119
store: store,
@@ -124,7 +125,7 @@ func NewAPIServer(
124125
validator: validator,
125126
logger: logger,
126127
deploymentService: deploymentService,
127-
mcpDeploymentService: utils.NewMCPDeploymentService(store, db, snapshotManager, policyManager, policyValidator),
128+
mcpDeploymentService: mcpDeploymentService,
128129
llmDeploymentService: utils.NewLLMDeploymentService(store, db, snapshotManager, lazyResourceManager, templateDefinitions,
129130
deploymentService, routerConfig, policyVersionResolver, policyValidator),
130131
apiKeyService: apiKeyService,
@@ -138,6 +139,7 @@ func NewAPIServer(
138139
}
139140
if eventHub != nil {
140141
server.llmDeploymentService.SetEventHub(eventHub, systemConfig.Controller.Server.GatewayID)
142+
server.mcpDeploymentService.SetEventHub(eventHub, systemConfig.Controller.Server.GatewayID)
141143
}
142144

143145
// Create RestAPI service and handler
@@ -238,6 +240,9 @@ func (s *APIServer) SearchDeployments(c *gin.Context, kind string) {
238240
}
239241

240242
configs := s.store.GetAllByKind(kind)
243+
if kind == string(api.Mcp) && s.mcpDeploymentService != nil {
244+
configs = s.mcpDeploymentService.ListMCPProxies()
245+
}
241246

242247
// Filter based on kind to return appropriate response format
243248
if kind == string(api.Mcp) {
@@ -1294,7 +1299,7 @@ func (s *APIServer) CreateMCPProxy(c *gin.Context) {
12941299
}
12951300

12961301
// Build and add policy config derived from API configuration if policies are present
1297-
if s.policyManager != nil {
1302+
if s.policyManager != nil && s.eventHub == nil {
12981303
storedPolicy := s.buildStoredPolicyFromAPI(cfg)
12991304
if storedPolicy != nil {
13001305
if err := s.policyManager.AddPolicy(storedPolicy); err != nil {
@@ -1315,7 +1320,7 @@ func (s *APIServer) ListMCPProxies(c *gin.Context, params api.ListMCPProxiesPara
13151320
s.SearchDeployments(c, string(api.Mcp))
13161321
return
13171322
}
1318-
configs := s.store.GetAllByKind(string(api.Mcp))
1323+
configs := s.mcpDeploymentService.ListMCPProxies()
13191324

13201325
items := make([]api.MCPProxyListItem, len(configs))
13211326
for i, cfg := range configs {
@@ -1472,7 +1477,7 @@ func (s *APIServer) UpdateMCPProxy(c *gin.Context, id string) {
14721477
slog.String("handle", handle))
14731478

14741479
// Rebuild and update derived policy configuration
1475-
if s.policyManager != nil {
1480+
if s.policyManager != nil && s.eventHub == nil {
14761481
storedPolicy := s.buildStoredPolicyFromAPI(updated)
14771482
if storedPolicy != nil {
14781483
if err := s.policyManager.AddPolicy(storedPolicy); err != nil {

gateway/gateway-controller/pkg/api/handlers/handlers_test.go

Lines changed: 143 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -756,6 +756,7 @@ func createTestAPIServerWithDB(db storage.Storage) *APIServer {
756756

757757
deploymentService := utils.NewAPIDeploymentService(store, db, nil, validator, routerCfg)
758758
server.deploymentService = deploymentService
759+
server.mcpDeploymentService = utils.NewMCPDeploymentService(store, db, nil, nil, nil)
759760
server.llmDeploymentService = utils.NewLLMDeploymentService(
760761
store,
761762
db,
@@ -905,6 +906,66 @@ func createTestRestAPIRequestBody(t *testing.T, handle, displayName, version, co
905906
return body
906907
}
907908

909+
func createTestMCPRequestBody(t *testing.T, handle, displayName, version, contextPath string) []byte {
910+
t.Helper()
911+
upstreamURL := "http://backend.example.com"
912+
913+
mcpConfig := api.MCPProxyConfiguration{
914+
ApiVersion: api.MCPProxyConfigurationApiVersionGatewayApiPlatformWso2Comv1alpha1,
915+
Kind: api.Mcp,
916+
Metadata: api.Metadata{
917+
Name: handle,
918+
},
919+
Spec: api.MCPProxyConfigData{
920+
DisplayName: displayName,
921+
Version: version,
922+
Context: stringPtr(contextPath),
923+
Upstream: api.MCPProxyConfigData_Upstream{
924+
Url: &upstreamURL,
925+
},
926+
},
927+
}
928+
929+
body, err := json.Marshal(mcpConfig)
930+
require.NoError(t, err)
931+
return body
932+
}
933+
934+
func createTestMCPStoredConfig(t *testing.T, id, handle, displayName, version, contextPath string, desiredState models.DesiredState) *models.StoredConfig {
935+
t.Helper()
936+
upstreamURL := "http://backend.example.com"
937+
938+
cfg := &models.StoredConfig{
939+
UUID: id,
940+
Kind: string(api.Mcp),
941+
Handle: handle,
942+
DisplayName: displayName,
943+
Version: version,
944+
SourceConfiguration: api.MCPProxyConfiguration{
945+
ApiVersion: api.MCPProxyConfigurationApiVersionGatewayApiPlatformWso2Comv1alpha1,
946+
Kind: api.Mcp,
947+
Metadata: api.Metadata{
948+
Name: handle,
949+
},
950+
Spec: api.MCPProxyConfigData{
951+
DisplayName: displayName,
952+
Version: version,
953+
Context: stringPtr(contextPath),
954+
Upstream: api.MCPProxyConfigData_Upstream{
955+
Url: &upstreamURL,
956+
},
957+
},
958+
},
959+
DesiredState: desiredState,
960+
Origin: models.OriginGatewayAPI,
961+
CreatedAt: time.Now(),
962+
UpdatedAt: time.Now(),
963+
}
964+
965+
require.NoError(t, utils.HydrateStoredMCPConfig(cfg))
966+
return cfg
967+
}
968+
908969
func attachTestEventHub(server *APIServer, hub eventhub.EventHub, gatewayID string) {
909970
server.eventHub = hub
910971
server.gatewayID = gatewayID
@@ -930,6 +991,9 @@ func attachTestEventHub(server *APIServer, hub eventhub.EventHub, gatewayID stri
930991
if server.llmDeploymentService != nil {
931992
server.llmDeploymentService.SetEventHub(hub, gatewayID)
932993
}
994+
if server.mcpDeploymentService != nil {
995+
server.mcpDeploymentService.SetEventHub(hub, gatewayID)
996+
}
933997
}
934998

935999
func seedAPIForAPIKeyHandlerTests(t *testing.T, server *APIServer, handle string) *models.StoredConfig {
@@ -1634,10 +1698,22 @@ func TestDeleteLLMProxyNotFound(t *testing.T) {
16341698
t.Skip("Skipping test that requires full deployment service setup")
16351699
}
16361700

1637-
// TestListMCPProxies tests listing MCP proxies
1638-
// Note: This test requires full deployment service setup
16391701
func TestListMCPProxies(t *testing.T) {
1640-
t.Skip("Skipping test that requires full deployment service setup")
1702+
server := createTestAPIServer()
1703+
mockDB := server.db.(*MockStorage)
1704+
1705+
cfg := createTestMCPStoredConfig(t, "0000-mcp-id-0000-000000000000", "test-mcp", "Test MCP", "v1.0.0", "/mcp", models.StateDeployed)
1706+
require.NoError(t, mockDB.SaveConfig(cfg))
1707+
1708+
c, w := createTestContext("GET", "/mcp-proxies", nil)
1709+
server.ListMCPProxies(c, api.ListMCPProxiesParams{})
1710+
1711+
assert.Equal(t, http.StatusOK, w.Code)
1712+
1713+
var response map[string]any
1714+
require.NoError(t, json.Unmarshal(w.Body.Bytes(), &response))
1715+
assert.Equal(t, "success", response["status"])
1716+
assert.Equal(t, float64(1), response["count"])
16411717
}
16421718

16431719
// TestListMCPProxiesWithFilters tests listing MCP proxies with filters
@@ -1670,6 +1746,67 @@ func TestDeleteMCPProxyNotFound(t *testing.T) {
16701746
t.Skip("Skipping test that requires full deployment service setup")
16711747
}
16721748

1749+
func TestCreateMCPProxyWithDBAndEventHub(t *testing.T) {
1750+
server := createTestAPIServer()
1751+
mockDB := server.db.(*MockStorage)
1752+
mockHub := &mockEventHub{}
1753+
attachTestEventHub(server, mockHub, "test-gateway")
1754+
1755+
body := createTestMCPRequestBody(t, "test-mcp", "Test MCP", "v1.0.0", "/mcp")
1756+
c, w := createTestContextWithHeader("POST", "/mcp-proxies", body, map[string]string{
1757+
"Content-Type": "application/json",
1758+
})
1759+
c.Set(middleware.CorrelationIDKey, "corr-id-create-mcp")
1760+
1761+
server.CreateMCPProxy(c)
1762+
1763+
assert.Equal(t, http.StatusCreated, w.Code)
1764+
1765+
cfg, err := mockDB.GetConfigByKindAndHandle(string(api.Mcp), "test-mcp")
1766+
require.NoError(t, err)
1767+
require.Len(t, mockHub.publishedEvents, 1)
1768+
assert.Equal(t, "test-gateway", mockHub.publishedEvents[0].gatewayID)
1769+
assert.Equal(t, eventhub.EventTypeMCPProxy, mockHub.publishedEvents[0].event.EventType)
1770+
assert.Equal(t, "CREATE", mockHub.publishedEvents[0].event.Action)
1771+
assert.Equal(t, cfg.UUID, mockHub.publishedEvents[0].event.EntityID)
1772+
assert.Equal(t, "corr-id-create-mcp", mockHub.publishedEvents[0].event.EventID)
1773+
assert.Equal(t, eventhub.EmptyEventData, mockHub.publishedEvents[0].event.EventData)
1774+
1775+
_, err = server.store.Get(cfg.UUID)
1776+
require.Error(t, err)
1777+
}
1778+
1779+
func TestDeleteMCPProxyWithDBAndEventHub(t *testing.T) {
1780+
server := createTestAPIServer()
1781+
mockDB := server.db.(*MockStorage)
1782+
mockHub := &mockEventHub{}
1783+
attachTestEventHub(server, mockHub, "test-gateway")
1784+
1785+
cfg := createTestMCPStoredConfig(t, "0000-mcp-delete-id-0000-000000000000", "test-mcp", "Test MCP", "v1.0.0", "/mcp", models.StateDeployed)
1786+
require.NoError(t, mockDB.SaveConfig(cfg))
1787+
require.NoError(t, server.store.Add(cfg))
1788+
1789+
c, w := createTestContext("DELETE", "/mcp-proxies/test-mcp", nil)
1790+
c.Set(middleware.CorrelationIDKey, "corr-id-delete-mcp")
1791+
1792+
server.DeleteMCPProxy(c, "test-mcp")
1793+
1794+
assert.Equal(t, http.StatusOK, w.Code)
1795+
require.Len(t, mockHub.publishedEvents, 1)
1796+
assert.Equal(t, "test-gateway", mockHub.publishedEvents[0].gatewayID)
1797+
assert.Equal(t, eventhub.EventTypeMCPProxy, mockHub.publishedEvents[0].event.EventType)
1798+
assert.Equal(t, "DELETE", mockHub.publishedEvents[0].event.Action)
1799+
assert.Equal(t, cfg.UUID, mockHub.publishedEvents[0].event.EntityID)
1800+
assert.Equal(t, "corr-id-delete-mcp", mockHub.publishedEvents[0].event.EventID)
1801+
assert.Equal(t, eventhub.EmptyEventData, mockHub.publishedEvents[0].event.EventData)
1802+
1803+
_, err := mockDB.GetConfig(cfg.UUID)
1804+
require.Error(t, err)
1805+
1806+
_, err = server.store.Get(cfg.UUID)
1807+
require.NoError(t, err)
1808+
}
1809+
16731810
// TestGenerateAPIKeyNoAuth tests CreateAPIKey without authentication
16741811
func TestGenerateAPIKeyNoAuth(t *testing.T) {
16751812
server := createTestAPIServer()
@@ -3027,6 +3164,7 @@ func TestBuildStoredPolicyFromAPIWebSubApiWithPolicies(t *testing.T) {
30273164
// Test ListMCPProxies with stored configs that have unmarshal issues
30283165
func TestListMCPProxiesUnmarshalError(t *testing.T) {
30293166
server := createTestAPIServer()
3167+
mockDB := server.db.(*MockStorage)
30303168

30313169
// Add MCP config, then replace SourceConfiguration with something that can't be marshaled to JSON
30323170
cfg := &models.StoredConfig{
@@ -3048,8 +3186,8 @@ func TestListMCPProxiesUnmarshalError(t *testing.T) {
30483186
CreatedAt: time.Now(),
30493187
UpdatedAt: time.Now(),
30503188
}
3051-
_ = server.store.Add(cfg)
3052-
// Mutate SourceConfiguration to something that can't be JSON marshaled
3189+
require.NoError(t, mockDB.SaveConfig(cfg))
3190+
// Mutate the DB-backed object to something that can't be JSON marshaled.
30533191
cfg.SourceConfiguration = make(chan int)
30543192

30553193
c, w := createTestContext("GET", "/mcp-proxies", nil)

0 commit comments

Comments
 (0)