Skip to content

Commit bf0e4ea

Browse files
Merge pull request #1992 from senthuran16/platform-api-for-webbroker-api
Add platform API changes for WebBroker APIs
2 parents 574afd6 + 3e38e6d commit bf0e4ea

31 files changed

Lines changed: 5856 additions & 79 deletions

gateway/gateway-controller/pkg/api/management/generated.go

Lines changed: 14 additions & 14 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

gateway/gateway-controller/pkg/controlplane/client.go

Lines changed: 254 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -939,13 +939,13 @@ func (c *Client) syncAPIKeysForExistingArtifacts(gatewayID string) {
939939
continue
940940
}
941941
if cfg.Kind != models.KindLlmProvider && cfg.Kind != models.KindLlmProxy &&
942-
cfg.Kind != models.KindRestApi && cfg.Kind != models.KindWebSubApi {
942+
cfg.Kind != models.KindRestApi && cfg.Kind != models.KindWebSubApi && cfg.Kind != models.KindWebBrokerApi {
943943
continue
944944
}
945945
artifactUUIDsByKind[cfg.Kind] = append(artifactUUIDsByKind[cfg.Kind], cfg.UUID)
946946
}
947947

948-
for _, kind := range []string{models.KindRestApi, models.KindWebSubApi, models.KindLlmProvider, models.KindLlmProxy} {
948+
for _, kind := range []string{models.KindRestApi, models.KindWebSubApi, models.KindWebBrokerApi, models.KindLlmProvider, models.KindLlmProxy} {
949949
select {
950950
case <-c.ctx.Done():
951951
c.logger.Info("Stopping API key bulk sync due to client context cancellation")
@@ -1301,6 +1301,12 @@ func (c *Client) handleMessage(messageType int, message []byte) {
13011301
c.handleWebSubAPIUndeployedEvent(event)
13021302
case "websub.deleted":
13031303
c.handleWebSubAPIDeletedEvent(event)
1304+
case "webbroker.deployed":
1305+
c.handleWebBrokerAPIDeployedEvent(event)
1306+
case "webbroker.undeployed":
1307+
c.handleWebBrokerAPIUndeployedEvent(event)
1308+
case "webbroker.deleted":
1309+
c.handleWebBrokerAPIDeletedEvent(event)
13041310
case "application.updated":
13051311
c.handleApplicationUpdatedEvent(event)
13061312
default:
@@ -2709,6 +2715,252 @@ func (c *Client) handleWebSubAPIDeletedEvent(event map[string]any) {
27092715
c.performFullAPIDeletion(apiID, apiConfig, deletedEvent.CorrelationID)
27102716
}
27112717

2718+
func (c *Client) handleWebBrokerAPIDeployedEvent(event map[string]any) {
2719+
c.logger.Debug("WebBroker API Deployment Event",
2720+
slog.Any("payload", event["payload"]),
2721+
slog.Any("timestamp", event["timestamp"]),
2722+
slog.Any("correlationId", event["correlationId"]),
2723+
)
2724+
2725+
eventBytes, err := json.Marshal(event)
2726+
if err != nil {
2727+
c.logger.Error("Failed to marshal WebBroker API deployment event for parsing",
2728+
slog.Any("error", err),
2729+
)
2730+
return
2731+
}
2732+
2733+
var deployedEvent WebBrokerAPIDeployedEvent
2734+
if err := json.Unmarshal(eventBytes, &deployedEvent); err != nil {
2735+
c.logger.Error("Failed to parse WebBroker API deployment event",
2736+
slog.Any("error", err),
2737+
)
2738+
return
2739+
}
2740+
2741+
apiID := deployedEvent.Payload.APIID
2742+
if apiID == "" {
2743+
c.logger.Error("API ID is empty in WebBroker API deployment event")
2744+
return
2745+
}
2746+
2747+
c.logger.Info("Processing WebBroker API deployment",
2748+
slog.String("api_id", apiID),
2749+
slog.String("deployment_id", deployedEvent.Payload.DeploymentID),
2750+
slog.String("correlation_id", deployedEvent.CorrelationID),
2751+
)
2752+
2753+
// Fetch WebBroker API definition from control plane
2754+
zipData, err := c.apiUtilsService.FetchWebBrokerAPIDefinition(apiID)
2755+
if err != nil {
2756+
c.logger.Error("Failed to fetch WebBroker API definition",
2757+
slog.String("api_id", apiID),
2758+
slog.Any("error", err),
2759+
)
2760+
c.sendDeploymentAck(deployedEvent.Payload.DeploymentID, apiID, "webbroker", "deploy", "failed",
2761+
deployedEvent.Payload.PerformedAt, "GATEWAY_PROCESSING_ERROR")
2762+
return
2763+
}
2764+
2765+
yamlData, err := c.apiUtilsService.ExtractYAMLFromZip(zipData)
2766+
if err != nil {
2767+
c.logger.Error("Failed to extract YAML from WebBroker API ZIP",
2768+
slog.String("api_id", apiID),
2769+
slog.Any("error", err),
2770+
)
2771+
c.sendDeploymentAck(deployedEvent.Payload.DeploymentID, apiID, "webbroker", "deploy", "failed",
2772+
deployedEvent.Payload.PerformedAt, "GATEWAY_PROCESSING_ERROR")
2773+
return
2774+
}
2775+
2776+
performedAt := deployedEvent.Payload.PerformedAt.Truncate(time.Millisecond)
2777+
if performedAt.IsZero() {
2778+
performedAt = time.Now().Truncate(time.Millisecond)
2779+
}
2780+
result, err := c.apiUtilsService.CreateAPIFromYAML(yamlData, apiID, deployedEvent.Payload.DeploymentID, &performedAt, deployedEvent.CorrelationID, c.deploymentService)
2781+
if err != nil {
2782+
c.logger.Error("Failed to create WebBroker API from YAML",
2783+
slog.String("api_id", apiID),
2784+
slog.Any("error", err),
2785+
)
2786+
c.sendDeploymentAck(deployedEvent.Payload.DeploymentID, apiID, "webbroker", "deploy", "failed",
2787+
deployedEvent.Payload.PerformedAt, "GATEWAY_PROCESSING_ERROR")
2788+
return
2789+
}
2790+
2791+
if result.IsStale {
2792+
c.logger.Debug("Skipped stale WebBroker API deploy event (newer version exists in DB)",
2793+
slog.String("api_id", apiID),
2794+
slog.String("deployment_id", deployedEvent.Payload.DeploymentID),
2795+
)
2796+
return
2797+
}
2798+
2799+
c.sendDeploymentAck(deployedEvent.Payload.DeploymentID, apiID, "webbroker", "deploy", "success",
2800+
deployedEvent.Payload.PerformedAt, "")
2801+
2802+
c.logger.Info("Successfully processed WebBroker API deployment event",
2803+
slog.String("api_id", apiID),
2804+
slog.String("correlation_id", deployedEvent.CorrelationID),
2805+
)
2806+
}
2807+
2808+
func (c *Client) handleWebBrokerAPIUndeployedEvent(event map[string]any) {
2809+
c.logger.Debug("WebBroker API Undeployment Event",
2810+
slog.Any("payload", event["payload"]),
2811+
slog.Any("timestamp", event["timestamp"]),
2812+
slog.Any("correlationId", event["correlationId"]),
2813+
)
2814+
2815+
eventBytes, err := json.Marshal(event)
2816+
if err != nil {
2817+
c.logger.Error("Failed to marshal WebBroker API undeployment event for parsing",
2818+
slog.Any("error", err),
2819+
)
2820+
return
2821+
}
2822+
2823+
var undeployedEvent WebBrokerAPIUndeployedEvent
2824+
if err := json.Unmarshal(eventBytes, &undeployedEvent); err != nil {
2825+
c.logger.Error("Failed to parse WebBroker API undeployment event",
2826+
slog.Any("error", err),
2827+
)
2828+
return
2829+
}
2830+
2831+
apiID := undeployedEvent.Payload.APIID
2832+
if apiID == "" {
2833+
c.logger.Error("API ID is empty in WebBroker API undeployment event")
2834+
return
2835+
}
2836+
2837+
apiConfig, err := c.findAPIConfig(apiID)
2838+
if err != nil {
2839+
if storage.IsNotFoundError(err) {
2840+
c.logger.Warn("WebBroker API configuration not found for undeployment",
2841+
slog.String("api_id", apiID),
2842+
)
2843+
c.sendDeploymentAck(undeployedEvent.Payload.DeploymentID, apiID, "webbroker", "undeploy", "success",
2844+
undeployedEvent.Payload.PerformedAt, "")
2845+
return
2846+
}
2847+
c.logger.Error("Failed to fetch WebBroker API configuration for undeployment",
2848+
slog.String("api_id", apiID),
2849+
slog.String("correlation_id", undeployedEvent.CorrelationID),
2850+
slog.Any("error", err),
2851+
)
2852+
c.sendDeploymentAck(undeployedEvent.Payload.DeploymentID, apiID, "webbroker", "undeploy", "failed",
2853+
undeployedEvent.Payload.PerformedAt, "GATEWAY_PROCESSING_ERROR")
2854+
return
2855+
}
2856+
2857+
if apiConfig.DeploymentID != "" && undeployedEvent.Payload.DeploymentID != "" &&
2858+
apiConfig.DeploymentID != undeployedEvent.Payload.DeploymentID {
2859+
c.logger.Warn("Ignoring stale WebBroker API undeploy event: deployment ID mismatch",
2860+
slog.String("api_id", apiID),
2861+
slog.String("event_deployment_id", undeployedEvent.Payload.DeploymentID),
2862+
slog.String("current_deployment_id", apiConfig.DeploymentID),
2863+
)
2864+
c.sendDeploymentAck(undeployedEvent.Payload.DeploymentID, apiID, "webbroker", "undeploy", "failed",
2865+
undeployedEvent.Payload.PerformedAt, "DEPLOYMENT_ID_MISMATCH")
2866+
return
2867+
}
2868+
2869+
performedAt := undeployedEvent.Payload.PerformedAt.Truncate(time.Millisecond)
2870+
if performedAt.IsZero() {
2871+
performedAt = time.Now().Truncate(time.Millisecond)
2872+
}
2873+
apiConfig.DesiredState = models.StateUndeployed
2874+
apiConfig.DeploymentID = undeployedEvent.Payload.DeploymentID
2875+
apiConfig.DeployedAt = &performedAt
2876+
apiConfig.UpdatedAt = time.Now()
2877+
2878+
affected, err := c.db.UpsertConfig(apiConfig)
2879+
if err != nil {
2880+
c.logger.Error("Failed to upsert config for WebBroker API undeployment",
2881+
slog.String("api_id", apiID),
2882+
slog.Any("error", err),
2883+
)
2884+
c.sendDeploymentAck(undeployedEvent.Payload.DeploymentID, apiID, "webbroker", "undeploy", "failed",
2885+
undeployedEvent.Payload.PerformedAt, "GATEWAY_PROCESSING_ERROR")
2886+
return
2887+
}
2888+
if !affected {
2889+
c.logger.Debug("Skipped stale WebBroker API undeploy event (newer version exists in DB)",
2890+
slog.String("api_id", apiID),
2891+
slog.String("deployment_id", undeployedEvent.Payload.DeploymentID),
2892+
)
2893+
return
2894+
}
2895+
2896+
evt := eventhub.Event{
2897+
EventType: eventhub.EventTypeAPI,
2898+
Action: "UPDATE",
2899+
EntityID: apiID,
2900+
EventID: undeployedEvent.CorrelationID,
2901+
}
2902+
if err := c.eventHub.PublishEvent(c.gatewayID, evt); err != nil {
2903+
c.logger.Error("Failed to publish WebBroker API undeployment event", slog.Any("error", err))
2904+
}
2905+
2906+
c.sendDeploymentAck(undeployedEvent.Payload.DeploymentID, apiID, "webbroker", "undeploy", "success",
2907+
undeployedEvent.Payload.PerformedAt, "")
2908+
2909+
c.logger.Info("Successfully processed WebBroker API undeployment event",
2910+
slog.String("api_id", apiID),
2911+
slog.String("correlation_id", undeployedEvent.CorrelationID),
2912+
)
2913+
}
2914+
2915+
func (c *Client) handleWebBrokerAPIDeletedEvent(event map[string]any) {
2916+
c.logger.Debug("WebBroker API Deleted Event",
2917+
slog.Any("payload", event["payload"]),
2918+
slog.Any("timestamp", event["timestamp"]),
2919+
slog.Any("correlationId", event["correlationId"]),
2920+
)
2921+
2922+
eventBytes, err := json.Marshal(event)
2923+
if err != nil {
2924+
c.logger.Error("Failed to marshal WebBroker API deleted event for parsing",
2925+
slog.Any("error", err),
2926+
)
2927+
return
2928+
}
2929+
2930+
var deletedEvent WebBrokerAPIDeletedEvent
2931+
if err := json.Unmarshal(eventBytes, &deletedEvent); err != nil {
2932+
c.logger.Error("Failed to parse WebBroker API deleted event",
2933+
slog.Any("error", err),
2934+
)
2935+
return
2936+
}
2937+
2938+
apiID := deletedEvent.Payload.APIID
2939+
if apiID == "" {
2940+
c.logger.Error("API ID is empty in WebBroker API deleted event")
2941+
return
2942+
}
2943+
2944+
apiConfig, err := c.findAPIConfig(apiID)
2945+
if err != nil {
2946+
if storage.IsNotFoundError(err) {
2947+
c.logger.Warn("WebBroker API configuration not found for deletion; running orphan cleanup",
2948+
slog.String("api_id", apiID),
2949+
)
2950+
c.cleanupOrphanedResources(apiID, deletedEvent.CorrelationID)
2951+
return
2952+
}
2953+
c.logger.Error("Failed to fetch WebBroker API configuration for deletion",
2954+
slog.String("api_id", apiID),
2955+
slog.String("correlation_id", deletedEvent.CorrelationID),
2956+
slog.Any("error", err),
2957+
)
2958+
return
2959+
}
2960+
2961+
c.performFullAPIDeletion(apiID, apiConfig, deletedEvent.CorrelationID)
2962+
}
2963+
27122964
func (c *Client) handleMCPProxyDeploymentEvent(event map[string]any) {
27132965
c.logger.Debug("MCP Proxy Deployment Event",
27142966
slog.Any("payload", event["payload"]),

0 commit comments

Comments
 (0)