Handle WebSub delta updates without full undeploy/redeploy #1921
senthuran16 wants to merge 24 commits into
Actionable comments posted: 7
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@event-gateway/gateway-runtime/internal/connectors/receiver/websub/delta.go`:
- Around line 41-67: ApplyBindingDelta mutates e.channel.Channels concurrently
with HubHandler and WebhookReceiverHandler reading it, causing a data race; add
a receiver-level sync.RWMutex to the receiver struct and use it to protect all
accesses to e.channel.Channels: wrap mutations in ApplyBindingDelta (the
delete(e.channel.Channels, ...) loop and the e.channel.Channels[...] =
kafkaTopic assignments) with mu.Lock()/mu.Unlock(), and wrap reads in HubHandler
and WebhookReceiverHandler with mu.RLock()/mu.RUnlock(); alternatively, if you
prefer snapshots, have ApplyBindingDelta produce an immutable copy of the
channel map and ensure handlers always read from that atomic snapshot reference
(use the mutex or atomic.Value to swap snapshots) so no handler reads the live
map while it is being modified.
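The first option above (a receiver-level `sync.RWMutex`) could look roughly like the following minimal sketch. The `channelTable` type and its method names are hypothetical stand-ins for the receiver and `e.channel.Channels`, not the actual code in `delta.go`.

```go
package main

import (
	"fmt"
	"sync"
)

// channelTable is a hypothetical stand-in for the receiver's channel map
// (e.channel.Channels in the review). It maps channel name to Kafka topic.
type channelTable struct {
	mu       sync.RWMutex
	channels map[string]string
}

func newChannelTable(initial map[string]string) *channelTable {
	tbl := &channelTable{channels: make(map[string]string, len(initial))}
	for name, topic := range initial {
		tbl.channels[name] = topic
	}
	return tbl
}

// applyDelta mutates the map under the write lock, as ApplyBindingDelta would.
func (c *channelTable) applyDelta(removed []string, added map[string]string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	for _, name := range removed {
		delete(c.channels, name)
	}
	for name, topic := range added {
		c.channels[name] = topic
	}
}

// lookup is what a request handler would call. It only takes the read lock,
// so concurrent handlers do not block each other.
func (c *channelTable) lookup(name string) (string, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	topic, ok := c.channels[name]
	return topic, ok
}

func main() {
	tbl := newChannelTable(map[string]string{"orders": "topic-orders"})

	// Readers run concurrently with the delta below without racing on the map.
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			tbl.lookup("orders")
		}()
	}
	tbl.applyDelta([]string{"orders"}, map[string]string{"payments": "topic-payments"})
	wg.Wait()

	topic, ok := tbl.lookup("payments")
	fmt.Println(topic, ok)
}
```

Running a sketch like this under `go run -race` is one way to confirm that no unguarded access remains.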
In `@event-gateway/gateway-runtime/internal/runtime/runtime.go`:
- Around line 928-963: The code currently ensures topics and applies the binding
delta (brokerDriver.EnsureTopics, updater.ApplyBindingDelta,
brokerDriver.DeleteTopics) before constructing the new policy chains, so if
r.buildWebSubApiPolicyChains fails the system state has already changed; move
the call to r.buildWebSubApiPolicyChains(newWSB, vhost) (and capture subKey,
inKey, outKey, chChainKeys, err) to execute and validate before calling
brokerDriver.EnsureTopics or updater.ApplyBindingDelta, then proceed with
EnsureTopics/ApplyBindingDelta/DeleteTopics only after the chains are
successfully built (or alternatively implement an explicit rollback path that
reverts ApplyBindingDelta/DeleteTopics if buildWebSubApiPolicyChains fails).
- Around line 906-957: Snapshot the necessary state under r.mu (receiver from
r.activeReceivers[oldWSB.Name], assert it implements webSubBindingUpdater,
brokerDriver from r.activeBrokerDrivers[oldWSB.Name], oldBinding :=
r.hub.GetBinding(oldWSB.Name), and the addedChannels/removedChannels computed
from webSubChannelTopicMap/diffChannelTopics), then release r.mu before calling
any blocking external methods (brokerDriver.EnsureTopics,
updater.ApplyBindingDelta, brokerDriver.DeleteTopics); after those calls
complete reacquire r.mu to validate current runtime state and commit the updated
binding state (or handle errors) so the long-running
EnsureTopics/ApplyBindingDelta/DeleteTopics do not execute while holding r.mu.
- Around line 980-981: Replace the direct call to
binding.WebSubApiSubscriptionTopic(...) with the instance helper
r.webSubSubscriptionSyncTopic(...) when building the entry for r.bindingTopics
so the stored topic matches what AddWebSubApiBinding recorded; specifically,
update the line that sets r.bindingTopics[newWSB.Name] =
webSubTopicList(newChannels, internalSubTopic) to pass
r.webSubSubscriptionSyncTopic(newWSB.Name, newWSB.Version) (instead of
binding.WebSubApiSubscriptionTopic(...)) so the webSubTopicList uses the
configured subscription-sync topic; ensure you keep the same arguments
(newWSB.Name, newWSB.Version) and the webSubTopicList usage unchanged.
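The ordering asked for in the first two `runtime.go` findings (build the policy chains first, snapshot state under `r.mu`, run the blocking broker calls unlocked, then reacquire the lock to commit) can be sketched as follows. The `registry` type, `updateBinding`, and both callbacks are hypothetical simplifications, and the revalidation step is deliberately minimal.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errBindingNotFound = errors.New("binding not found")

// registry is a hypothetical, simplified stand-in for the state guarded by r.mu.
type registry struct {
	mu       sync.Mutex
	bindings map[string][]string // binding name -> channel names
}

// updateBinding sketches the ordering: validate, snapshot, unlock, apply, commit.
func (r *registry) updateBinding(
	name string,
	newChannels []string,
	buildChains func([]string) error, // stand-in for building the policy chains
	applyExternal func(added, removed []string) error, // stand-in for the broker/receiver calls
) error {
	// 1. Do the side-effect-free work that can fail before touching anything.
	if err := buildChains(newChannels); err != nil {
		return fmt.Errorf("build policy chains: %w", err)
	}

	// 2. Snapshot the current state under the lock, then release it.
	r.mu.Lock()
	old, ok := r.bindings[name]
	oldCopy := append([]string(nil), old...)
	r.mu.Unlock()
	if !ok {
		return errBindingNotFound
	}
	added, removed := diffChannels(oldCopy, newChannels)

	// 3. Run the potentially blocking external calls without holding the lock.
	if err := applyExternal(added, removed); err != nil {
		return fmt.Errorf("apply delta: %w", err)
	}

	// 4. Reacquire the lock, revalidate, and commit the new binding state.
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, still := r.bindings[name]; !still {
		return errBindingNotFound
	}
	r.bindings[name] = append([]string(nil), newChannels...)
	return nil
}

// diffChannels returns the channels present only in newCh and only in oldCh.
func diffChannels(oldCh, newCh []string) (added, removed []string) {
	oldSet := make(map[string]bool, len(oldCh))
	for _, c := range oldCh {
		oldSet[c] = true
	}
	newSet := make(map[string]bool, len(newCh))
	for _, c := range newCh {
		newSet[c] = true
		if !oldSet[c] {
			added = append(added, c)
		}
	}
	for _, c := range oldCh {
		if !newSet[c] {
			removed = append(removed, c)
		}
	}
	return added, removed
}

func main() {
	reg := &registry{bindings: map[string][]string{"orders-api": {"created", "cancelled"}}}
	err := reg.updateBinding("orders-api", []string{"created", "shipped"},
		func([]string) error { return nil },
		func(added, removed []string) error {
			fmt.Println("added:", added, "removed:", removed)
			return nil
		})
	fmt.Println("err:", err)
}
```

A real implementation would likely need a stronger check in step 4 (for example, comparing a version counter) to detect a concurrent update to the same binding; this sketch only detects removal.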
In `@gateway/gateway-controller/pkg/api/handlers/websub_api_handler.go`:
- Around line 200-205: The handler sends a 404 JSON with c.JSON when
errors.Is(err, websubapi.ErrNotFound) is true but does not return, so execution
falls through into mapRenderError/generic error handling and attempts to write a
second response; update the branch in websub_api_handler.go (the block that
checks websubapi.ErrNotFound) to immediately return after calling c.JSON to stop
further processing and avoid duplicate responses.
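The missing early return can be illustrated with the standard library alone. This is a sketch using `net/http` rather than the handler's actual web framework; `errNotFound`, `errBackend`, `writeError`, and `recordStatus` are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// Hypothetical sentinel errors; errNotFound stands in for websubapi.ErrNotFound.
var (
	errNotFound = errors.New("websub api not found")
	errBackend  = errors.New("backend unavailable")
)

// writeError maps an error to exactly one HTTP response.
func writeError(w http.ResponseWriter, err error) {
	if errors.Is(err, errNotFound) {
		http.Error(w, "not found", http.StatusNotFound)
		return // without this return, the generic branch below would write a second response
	}
	http.Error(w, "internal error", http.StatusInternalServerError)
}

// recordStatus runs writeError against an in-memory recorder and reports
// the status code and body that a client would see.
func recordStatus(err error) (int, string) {
	rec := httptest.NewRecorder()
	writeError(rec, err)
	return rec.Code, rec.Body.String()
}

func main() {
	// errors.Is also matches wrapped errors, so the 404 branch still applies here.
	code, _ := recordStatus(fmt.Errorf("lookup: %w", errNotFound))
	fmt.Println(code) // prints 404
}
```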
In `@gateway/gateway-controller/pkg/service/websubapi/service.go`:
- Around line 155-188: RenderSpec currently mutates existing (via
templateengine.RenderSpec(existing,...)) and you then call
s.db.UpdateConfig(existing), which can persist resolved secret values; instead
call templateengine.RenderSpec on a deep copy of existing (create a copy of the
config struct before rendering), use that copy to obtain renderedConfig and to
run s.validator.Validate and s.validateArtifactConflicts, and only persist the
original unresolved existing (after updating non-secret fields like DisplayName,
Version, DesiredState, UpdatedAt, DeployedAt, CPSyncStatus) with
s.db.UpdateConfig; keep all secret resolution and renderedConfig usage confined
to the copied object so plaintext secrets are never written back to storage.
- Around line 115-120: The code allows an empty apiConfig.Metadata.Name which
later leads to persisting a blank handle; change the logic around the
HandleMismatchError check to first validate a non-empty mismatch (if
apiConfig.Metadata.Name != "" && apiConfig.Metadata.Name != params.Handle return
HandleMismatchError) and then, if apiConfig.Metadata.Name == "", set
apiConfig.Metadata.Name = params.Handle before assigning into
existing.Configuration and existing.SourceConfiguration; apply the same fix at
the second occurrence (around the code referenced at lines 152-153) so the
persisted source config always has the correct default handle while still
rejecting explicit mismatches.
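The handle check described in the last finding (reject only an explicit, non-empty mismatch; otherwise default to the path handle) reduces to a small pure function. `resolveHandle` and `errHandleMismatch` are hypothetical names used only for this sketch.

```go
package main

import (
	"errors"
	"fmt"
)

// errHandleMismatch is a hypothetical stand-in for HandleMismatchError.
var errHandleMismatch = errors.New("metadata name does not match handle")

// resolveHandle returns the handle to persist. An empty metadata name
// defaults to the path handle; a different non-empty name is rejected.
func resolveHandle(metadataName, pathHandle string) (string, error) {
	if metadataName != "" && metadataName != pathHandle {
		return "", errHandleMismatch
	}
	return pathHandle, nil
}

func main() {
	for _, name := range []string{"", "orders", "payments"} {
		handle, err := resolveHandle(name, "orders")
		fmt.Printf("name=%q handle=%q err=%v\n", name, handle, err)
	}
}
```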
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 532e0778-3d9b-4f9c-a1c0-9c833f93614f
📒 Files selected for processing (9)
- event-gateway/gateway-runtime/internal/connectors/receiver/websub/delta.go
- event-gateway/gateway-runtime/internal/runtime/runtime.go
- event-gateway/gateway-runtime/internal/xdsclient/handler.go
- event-gateway/gateway-runtime/internal/xdsclient/handler_test.go
- gateway/gateway-controller/pkg/api/handlers/handlers.go
- gateway/gateway-controller/pkg/api/handlers/handlers_test.go
- gateway/gateway-controller/pkg/api/handlers/websub_api_handler.go
- gateway/gateway-controller/pkg/service/websubapi/errors.go
- gateway/gateway-controller/pkg/service/websubapi/service.go
…l.Channels with concurrent request handlers
Fix Coderabbit Suggestions : WebSub delta updates without full undeploy/redeploy
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
event-gateway/gateway-runtime/internal/connectors/receiver/websub/connector.go (1)
140-174: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win
Unguarded reads of `e.channel.Channels` outside HTTP handlers create a race condition.
Verification confirms that `ApplyBindingDelta` correctly acquires `channelMu.Lock()` around all mutations (lines 62–70 in delta.go). However, the following reads lack synchronization and will race with concurrent `ApplyBindingDelta` calls:
- `Start()` ranges over `e.channel.Channels` at lines 143 and 169 without holding `channelMu.RLock()`.
- `reconcileSubscriptions()` builds the `ownedChannels` map at lines 197–200 without the lock.
- The reconciler callback at line 214 accesses `e.channel.Channels[sub.Topic]` from a background goroutine without holding `channelMu.RLock()`.
Since `ApplyBindingDelta` is called asynchronously via `UpdateWebSubApiBinding` (runtime.go line 950), the reconciler callback and writer can execute concurrently, causing a data race. Guard all reads with `channelMu.RLock()` to prevent this.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@event-gateway/gateway-runtime/internal/connectors/receiver/websub/connector.go` around lines 140 - 174, Reads of e.channel.Channels in Start(), reconcileSubscriptions(), and the reconciler callback are not synchronized and can race with ApplyBindingDelta which uses channelMu.Lock(); fix by wrapping all reads with channelMu.RLock()/RUnlock() (use defer for unlock) — specifically, acquire channelMu.RLock() around the loop in WebSubReceiver.Start() that ranges e.channel.Channels, around the ownedChannels map construction inside reconcileSubscriptions(), and inside the reconciler callback before accessing e.channel.Channels[sub.Topic]; keep the lock scope minimal and mirror the existing channelMu naming used by ApplyBindingDelta.
♻️ Duplicate comments (1)
gateway/gateway-controller/pkg/service/websubapi/service.go (1)
159-161: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win
Use a deep copy for render/validate isolation, not a shallow struct copy.
`renderedExisting := *existing` is shallow. If rendering mutates nested/interface-backed data in place, resolved values can leak back into `existing` and then be persisted by `UpdateConfig(existing)`. Please deep-copy configuration payloads before `RenderSpec`.
Based on learnings: For secret handling during API deployment in the gateway-controller codebase, follow the intended design “resolve at runtime, persist unresolved.” Never persist or write `resolvedCfg` (plaintext secrets) to storage or logs.
Also applies to: 191-191
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@gateway/gateway-controller/pkg/service/websubapi/service.go` around lines 159 - 161, The code uses a shallow copy renderedExisting := *existing before calling templateengine.RenderSpec, which can allow in-place mutations of nested or interface-backed fields to leak plaintext secrets back into existing and be persisted by UpdateConfig; instead create a true deep copy of the configuration payload (e.g., via a canonical deep-clone helper/JSON marshal-unmarshal or an existing Clone/DeepCopy utility) and pass that deep-copied value to templateengine.RenderSpec with s.secretResolver and log so resolved secrets never mutate the original existing object or get passed to UpdateConfig(existing); ensure the same deep-copy fix is applied at the other occurrence around line 191.
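Why a shallow struct copy is not enough can be shown with a small self-contained sketch. The `storedConfig` type, the in-place `render` function, and the JSON round-trip `deepCopy` are all hypothetical; the real code may already have a clone helper that is preferable to a JSON round trip.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// storedConfig is a hypothetical stand-in for the persisted configuration.
type storedConfig struct {
	Name string            `json:"name"`
	Spec map[string]string `json:"spec"`
}

// deepCopy clones the config via a JSON round trip. This assumes every field
// that matters is JSON-serializable.
func deepCopy(in *storedConfig) (*storedConfig, error) {
	raw, err := json.Marshal(in)
	if err != nil {
		return nil, err
	}
	out := new(storedConfig)
	if err := json.Unmarshal(raw, out); err != nil {
		return nil, err
	}
	return out, nil
}

// render resolves placeholders in place, standing in for a renderer that
// mutates the value it is given.
func render(cfg *storedConfig, secrets map[string]string) {
	for key, value := range cfg.Spec {
		if resolved, ok := secrets[value]; ok {
			cfg.Spec[key] = resolved
		}
	}
}

func main() {
	secrets := map[string]string{"$secret{hub}": "s3cr3t"}

	// Shallow copy: the struct is copied, but Spec still points at the same map.
	leaky := &storedConfig{Name: "api", Spec: map[string]string{"hubSecret": "$secret{hub}"}}
	shallow := *leaky
	render(&shallow, secrets)
	fmt.Println("after shallow copy:", leaky.Spec["hubSecret"])

	// Deep copy: rendering the clone leaves the original unresolved.
	safe := &storedConfig{Name: "api", Spec: map[string]string{"hubSecret": "$secret{hub}"}}
	clone, err := deepCopy(safe)
	if err != nil {
		panic(err)
	}
	render(clone, secrets)
	fmt.Println("after deep copy:", safe.Spec["hubSecret"])
}
```

In the shallow case the original now holds the resolved value, which is exactly what would be written back by a later persist call.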
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@gateway/gateway-controller/pkg/service/websubapi/service.go`:
- Around line 108-109: params.Logger is used directly (log := params.Logger)
which can be nil and cause panics when calling log.Error/log.Info; change the
initialization to use a safe fallback (e.g., if params.Logger == nil { log =
s.logger } or fallback to slog.Default()) before any use. Update the three
occurrences around the file (the assignment at "log := params.Logger" and the
similar uses at the blocks mentioned near lines 192-193 and 206-210) to ensure
log is non-nil by defaulting to s.logger (or slog.Default()) so subsequent calls
like log.Error/log.Info are safe.
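A nil-safe logger fallback of the kind suggested above might be sketched like this, assuming Go 1.21+ for `log/slog`. `pickLogger` and `serviceLogger` are hypothetical names; `serviceLogger` plays the role of `s.logger`.

```go
package main

import (
	"io"
	"log/slog"
)

// serviceLogger stands in for the service-level logger (s.logger in the review).
// It discards output here so the sketch stays quiet.
var serviceLogger = slog.New(slog.NewTextHandler(io.Discard, nil))

// pickLogger returns the first non-nil logger: the per-request one, then the
// service one, then the process-wide default.
func pickLogger(requestLogger, fallback *slog.Logger) *slog.Logger {
	if requestLogger != nil {
		return requestLogger
	}
	if fallback != nil {
		return fallback
	}
	return slog.Default()
}

func main() {
	// A nil request logger no longer causes a nil-pointer panic on use.
	log := pickLogger(nil, serviceLogger)
	log.Info("update applied", "handle", "orders")
}
```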
---
Outside diff comments:
In
`@event-gateway/gateway-runtime/internal/connectors/receiver/websub/connector.go`:
- Around line 140-174: Reads of e.channel.Channels in Start(),
reconcileSubscriptions(), and the reconciler callback are not synchronized and
can race with ApplyBindingDelta which uses channelMu.Lock(); fix by wrapping all
reads with channelMu.RLock()/RUnlock() (use defer for unlock) — specifically,
acquire channelMu.RLock() around the loop in WebSubReceiver.Start() that ranges
e.channel.Channels, around the ownedChannels map construction inside
reconcileSubscriptions(), and inside the reconciler callback before accessing
e.channel.Channels[sub.Topic]; keep the lock scope minimal and mirror the
existing channelMu naming used by ApplyBindingDelta.
---
Duplicate comments:
In `@gateway/gateway-controller/pkg/service/websubapi/service.go`:
- Around line 159-161: The code uses a shallow copy renderedExisting :=
*existing before calling templateengine.RenderSpec, which can allow in-place
mutations of nested or interface-backed fields to leak plaintext secrets back
into existing and be persisted by UpdateConfig; instead create a true deep copy
of the configuration payload (e.g., via a canonical deep-clone helper/JSON
marshal-unmarshal or an existing Clone/DeepCopy utility) and pass that
deep-copied value to templateengine.RenderSpec with s.secretResolver and log so
resolved secrets never mutate the original existing object or get passed to
UpdateConfig(existing); ensure the same deep-copy fix is applied at the other
occurrence around line 191.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: c557a3d4-39af-4760-9447-af3126938eec
📒 Files selected for processing (6)
- event-gateway/gateway-runtime/internal/connectors/receiver/websub/connector.go
- event-gateway/gateway-runtime/internal/connectors/receiver/websub/delta.go
- event-gateway/gateway-runtime/internal/connectors/receiver/websub/handler.go
- event-gateway/gateway-runtime/internal/runtime/runtime.go
- gateway/gateway-controller/pkg/api/handlers/websub_api_handler.go
- gateway/gateway-controller/pkg/service/websubapi/service.go
🚧 Files skipped from review as they are similar to previous changes (3)
- gateway/gateway-controller/pkg/api/handlers/websub_api_handler.go
- event-gateway/gateway-runtime/internal/connectors/receiver/websub/delta.go
- event-gateway/gateway-runtime/internal/runtime/runtime.go
…tition Make the compacted topic's partition/replication settings configurable
…ts-1 Fix Integration Test build failure
Actionable comments posted: 1
🧹 Nitpick comments (1)
event-gateway/gateway-runtime/internal/config/config.go (1)
304-309: ⚡ Quick win
Add upper-bound checks in runtime validation for fail-fast behavior.
`validateKafkaConfig` now checks only `> 0`. Adding max checks here (matching connector validation) prevents oversized values from passing config load and failing later in runtime flows.
Proposed patch
    @@
     import (
         "fmt"
         "log/slog"
    +    "math"
         "os"
         "strconv"
         "strings"
    @@
         if kafkaCfg.CompactTopicPartitions <= 0 {
             return fmt.Errorf("kafka.compact_topic_partitions must be a positive integer, got %d", kafkaCfg.CompactTopicPartitions)
         }
    +    if kafkaCfg.CompactTopicPartitions > math.MaxInt32 {
    +        return fmt.Errorf("kafka.compact_topic_partitions must be <= %d, got %d", math.MaxInt32, kafkaCfg.CompactTopicPartitions)
    +    }
         if kafkaCfg.CompactTopicReplicationFactor <= 0 {
             return fmt.Errorf("kafka.compact_topic_replication_factor must be a positive integer, got %d", kafkaCfg.CompactTopicReplicationFactor)
         }
    +    if kafkaCfg.CompactTopicReplicationFactor > math.MaxInt16 {
    +        return fmt.Errorf("kafka.compact_topic_replication_factor must be <= %d, got %d", math.MaxInt16, kafkaCfg.CompactTopicReplicationFactor)
    +    }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@event-gateway/gateway-runtime/internal/config/config.go` around lines 304 - 309, validateKafkaConfig currently only enforces lower bounds for CompactTopicPartitions and CompactTopicReplicationFactor; add matching upper-bound checks (same maxima used by the connector) so oversized values fail fast during config load. Modify the validation in validateKafkaConfig to verify kafkaCfg.CompactTopicPartitions and kafkaCfg.CompactTopicReplicationFactor are within the allowed range (e.g., 1 <= value <= MAX) and return fmt.Errorf with a clear message referencing the field when the value exceeds the maximum; use the existing field names kafkaCfg.CompactTopicPartitions and kafkaCfg.CompactTopicReplicationFactor to locate the checks and mirror the connector's limits.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In
`@event-gateway/gateway-runtime/internal/connectors/receiver/websub/consumer_manager.go`:
- Around line 221-224: The consumerGroupID change widened the hex slice from 16
to 32 characters and will change IDs for all existing callbacks; revert to the
previous ID format or add a compatibility fallback: in consumerGroupID (using
cm.groupPrefix and sha256.Sum256) generate the original ID
(hex.EncodeToString(h[:])[:16]) to preserve existing groups, or implement a
deterministic dual-check/migration path that can look up both the old ID and the
new ID formats so existing consumer state/offsets are not lost during rollout.
---
Nitpick comments:
In `@event-gateway/gateway-runtime/internal/config/config.go`:
- Around line 304-309: validateKafkaConfig currently only enforces lower bounds
for CompactTopicPartitions and CompactTopicReplicationFactor; add matching
upper-bound checks (same maxima used by the connector) so oversized values fail
fast during config load. Modify the validation in validateKafkaConfig to verify
kafkaCfg.CompactTopicPartitions and kafkaCfg.CompactTopicReplicationFactor are
within the allowed range (e.g., 1 <= value <= MAX) and return fmt.Errorf with a
clear message referencing the field when the value exceeds the maximum; use the
existing field names kafkaCfg.CompactTopicPartitions and
kafkaCfg.CompactTopicReplicationFactor to locate the checks and mirror the
connector's limits.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 9d7c6a8b-6d0e-4b6e-9cc3-75cac4fd8a54
📒 Files selected for processing (7)
- event-gateway/gateway-runtime/configs/config.toml
- event-gateway/gateway-runtime/internal/config/config.go
- event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config.go
- event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config_test.go
- event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/endpoint.go
- event-gateway/gateway-runtime/internal/connectors/receiver/websub/consumer_manager.go
- gateway/gateway-controller/pkg/api/handlers/handlers_test.go
✅ Files skipped from review due to trivial changes (1)
- event-gateway/gateway-runtime/configs/config.toml
🚧 Files skipped from review as they are similar to previous changes (1)
- gateway/gateway-controller/pkg/api/handlers/handlers_test.go
     // Format: {prefix}-websub-{sha256(callbackURL)[:32]}
     func (cm *ConsumerManager) consumerGroupID(callbackURL string) string {
         h := sha256.Sum256([]byte(callbackURL))
    -    return cm.groupPrefix + "-websub-" + hex.EncodeToString(h[:])[:16]
    +    return cm.groupPrefix + "-websub-" + hex.EncodeToString(h[:])[:32]
Preserve consumer-group ID compatibility to avoid state reset
Changing the hash slice from 16 to 32 at Line 224 changes consumer group IDs for all existing callbacks. That can break continuity of consumer state/offset tracking after rollout. Please keep the previous ID format (or add an explicit migration/fallback path) so existing groups remain stable across upgrades.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In
`@event-gateway/gateway-runtime/internal/connectors/receiver/websub/consumer_manager.go`
around lines 221 - 224, The consumerGroupID change widened the hex slice from 16
to 32 characters and will change IDs for all existing callbacks; revert to the
previous ID format or add a compatibility fallback: in consumerGroupID (using
cm.groupPrefix and sha256.Sum256) generate the original ID
(hex.EncodeToString(h[:])[:16]) to preserve existing groups, or implement a
deterministic dual-check/migration path that can look up both the old ID and the
new ID formats so existing consumer state/offsets are not lost during rollout.
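One possible shape for the compatibility fallback mentioned above is to compute both ID formats and prefer the legacy one whenever a group already exists under it. This is only a sketch: `groupIDs`, `resolveGroupID`, and the `exists` lookup are hypothetical, and how existing groups are actually discovered depends on the broker client in use.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// groupIDs returns the legacy (16 hex chars) and widened (32 hex chars)
// consumer group IDs for a callback URL.
func groupIDs(prefix, callbackURL string) (legacy, widened string) {
	h := sha256.Sum256([]byte(callbackURL))
	digest := hex.EncodeToString(h[:]) // 64 hex characters
	return prefix + "-websub-" + digest[:16], prefix + "-websub-" + digest[:32]
}

// resolveGroupID keeps using the legacy ID when a group already exists under
// it, so committed offsets are reused; new callbacks get the widened ID.
// exists is a hypothetical lookup supplied by the caller.
func resolveGroupID(prefix, callbackURL string, exists func(groupID string) bool) string {
	legacy, widened := groupIDs(prefix, callbackURL)
	if exists(legacy) {
		return legacy
	}
	return widened
}

func main() {
	legacy, widened := groupIDs("gw", "https://example.com/callback")
	fmt.Println(legacy)
	fmt.Println(widened)

	// Simulate one pre-existing group that was created under the legacy format.
	known := map[string]bool{legacy: true}
	fmt.Println(resolveGroupID("gw", "https://example.com/callback", func(id string) bool { return known[id] }))
}
```

Because both IDs are prefixes of the same digest, the widened ID always starts with the legacy ID, which also makes a prefix-based migration feasible.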
…ts-1 Complete modified test case
This PR will be replaced by:
Hence, closing this PR
Purpose
WebSub binding updates were going through full remove-and-readd flows, which caused unnecessary undeploy behavior, dropped live receiver state, and made channel-only
updates heavier than needed. Resolves N/A.
Goals
Approach
subscription topic
`UpdateWebSubApiBinding(...)` instead of remove-then-add for changed bindings
User stories
WebSub API updates with channel-only changes are applied without unnecessary undeploys, while real structural changes still rebind safely.
Documentation
N/A - runtime/controller behavior fix only; no product documentation update included in this PR.
Automation tests
Security checks
Samples
N/A
Related PRs
#1918
Test environment
Local diff review on Ubuntu 24.04.4 LTS