Skip to content

Commit 7e59e80

Browse files
committed
fix(versioning): close owner review correctness gaps
1 parent 10f17fd commit 7e59e80

9 files changed

Lines changed: 175 additions & 25 deletions

File tree

pkg/core/discovery/subscriber/zk_config.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ func processConfigUpsert[T coremodel.Resource](
170170
return err
171171
}
172172
emitter.Send(events.NewResourceChangedEventWithContext(cache.Added, nil, newRuleRes, map[string]string{
173-
"source-registry": sourceRegistryZookeeper,
173+
events.SourceRegistryContextKey: sourceRegistryZookeeper,
174174
}))
175175
return nil
176176
}
@@ -189,7 +189,7 @@ func processConfigUpsert[T coremodel.Resource](
189189
}
190190

191191
emitter.Send(events.NewResourceChangedEventWithContext(cache.Updated, oldMetadataRes, newRuleRes, map[string]string{
192-
"source-registry": sourceRegistryZookeeper,
192+
events.SourceRegistryContextKey: sourceRegistryZookeeper,
193193
}))
194194
return nil
195195
}
@@ -228,7 +228,7 @@ func processConfigDelete[T coremodel.Resource](
228228
return err
229229
}
230230
emitter.Send(events.NewResourceChangedEventWithContext(cache.Deleted, oldRuleRes, nil, map[string]string{
231-
"source-registry": sourceRegistryZookeeper,
231+
events.SourceRegistryContextKey: sourceRegistryZookeeper,
232232
}))
233233
return nil
234234
}

pkg/core/events/eventbus.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,16 @@ import (
2626
"github.com/apache/dubbo-admin/pkg/core/resource/model"
2727
)
2828

29+
const SourceRegistryContextKey = "source-registry"
30+
2931
type Event interface {
3032
// Type returns the type of the event, see definitions in cache.DeltaType
3133
Type() cache.DeltaType
3234
// OldObj returns the old object, nil if old object doesn't exist in store
3335
OldObj() model.Resource
3436
// NewObj returns the new object, nil if event type is in [cache.Deleted]
3537
NewObj() model.Resource
36-
// Context returns the context of the event, if event provider want to pass extra info to the consumer, just use context
38+
// Context returns read-only event metadata. Subscribers must not mutate the returned map.
3739
Context() map[string]string
3840
// String returns the string representation of the event
3941
String() string

pkg/core/versioning/component.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ func (c *component) Init(ctx runtime.BuilderContext) error {
116116
return nil
117117
}
118118

119-
func (c *component) Start(rt runtime.Runtime, _ <-chan struct{}) error {
119+
func (c *component) Start(rt runtime.Runtime, stop <-chan struct{}) error {
120120
cfg := rt.Config().Versioning
121121
if cfg == nil {
122122
cfg = versioningcfg.Default()
@@ -140,6 +140,14 @@ func (c *component) Start(rt runtime.Runtime, _ <-chan struct{}) error {
140140
}
141141
}
142142
}
143+
if stop != nil {
144+
go func() {
145+
<-stop
146+
for _, sub := range c.subscribers {
147+
sub.FlushAll()
148+
}
149+
}()
150+
}
143151
return nil
144152
}
145153

pkg/core/versioning/e2e_rollback_drill_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ func TestE2ERollbackDrill(t *testing.T) {
6060

6161
upstreamPush := newE2EConditionRoute(3)
6262
bus.Send(events.NewResourceChangedEventWithContext(cache.Updated, adminEdit, upstreamPush, map[string]string{
63-
"source-registry": "zookeeper",
63+
events.SourceRegistryContextKey: "zookeeper",
6464
}))
6565
items = requireVersions(t, store, original.ResourceKey(), 3)
6666
require.Equal(t, SourceUpstream, items[0].Source)

pkg/core/versioning/hint.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ func (r *AdminHintRegistry) Put(kind coremodel.ResourceKind, resourceKey, conten
6464
}
6565
r.mu.Lock()
6666
defer r.mu.Unlock()
67+
r.pruneExpiredLocked()
6768
r.hints[hintKey{kind: kind, resourceKey: resourceKey, hash: contentHash}] = hint
6869
}
6970

@@ -73,6 +74,7 @@ func (r *AdminHintRegistry) Take(kind coremodel.ResourceKind, resourceKey, conte
7374
}
7475
r.mu.Lock()
7576
defer r.mu.Unlock()
77+
r.pruneExpiredLocked()
7678
key := hintKey{kind: kind, resourceKey: resourceKey, hash: contentHash}
7779
hint, ok := r.hints[key]
7880
if !ok {

pkg/core/versioning/store.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ func (s *MemoryStore) InsertVersion(req InsertRequest, maxVersions int64) (*Vers
6767
}
6868
if ids := s.byRule[key]; len(ids) > 0 {
6969
latest := s.versions[ids[len(ids)-1]]
70-
if latest != nil && latest.ContentHash == req.ContentHash && latest.Operation == req.Operation {
70+
if shouldDedupVersion(latest, req) {
7171
cp := *latest
7272
if meta.CurrentVersion != nil && *meta.CurrentVersion == cp.ID {
7373
cp.IsCurrent = true
@@ -110,6 +110,16 @@ func (s *MemoryStore) InsertVersion(req InsertRequest, maxVersions int64) (*Vers
110110
return &cp, nil
111111
}
112112

113+
func shouldDedupVersion(latest *Version, req InsertRequest) bool {
114+
if latest == nil || latest.ContentHash != req.ContentHash {
115+
return false
116+
}
117+
if latest.Operation == OperationDelete || req.Operation == OperationDelete {
118+
return latest.Operation == req.Operation
119+
}
120+
return true
121+
}
122+
113123
func (s *MemoryStore) ListVersions(kind coremodel.ResourceKind, resourceKey string) ([]Version, error) {
114124
s.mu.Lock()
115125
defer s.mu.Unlock()

pkg/core/versioning/store_gorm.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ func (s *GormStore) InsertVersion(req InsertRequest, maxVersions int64) (*Versio
6161
err = tx.Where("rule_kind = ? AND resource_key = ?", req.RuleKind, req.ResourceKey).
6262
Order("version_no DESC").
6363
First(&latest).Error
64-
if err == nil && latest.ContentHash == req.ContentHash && latest.Operation == req.Operation {
64+
if err == nil && shouldDedupVersion(&latest, req) {
6565
inserted = latest
6666
return nil
6767
}

pkg/core/versioning/subscriber.go

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,6 @@ type pendingEvent struct {
4444
timer *time.Timer
4545
}
4646

47-
const sourceRegistryContextKey = "source-registry"
48-
4947
func NewSubscriber(kind coremodel.ResourceKind, store Store, hints *AdminHintRegistry, maxVersions int64, coalesceWindow time.Duration) *Subscriber {
5048
return &Subscriber{
5149
kind: kind,
@@ -160,14 +158,12 @@ func (s *Subscriber) record(event events.Event) error {
160158
author := "system:upstream"
161159
reason := ""
162160
var rolledBackFromID *int64
163-
hinted := false
164161
if ctx := event.Context(); ctx != nil {
165-
if registry := ctx[sourceRegistryContextKey]; registry != "" {
162+
if registry := ctx[events.SourceRegistryContextKey]; registry != "" {
166163
author = "system:" + registry
167164
}
168165
}
169166
if hint, ok := s.hints.Take(res.ResourceKind(), res.ResourceKey(), hash); ok {
170-
hinted = true
171167
source = hint.Source
172168
if source == "" {
173169
source = SourceAdmin
@@ -196,15 +192,6 @@ func (s *Subscriber) record(event events.Event) error {
196192
if author == "" {
197193
author = "system:unknown"
198194
}
199-
if !hinted && source == SourceUpstream && op != OperationDelete {
200-
latest, err := s.store.LatestVersion(ruleKind, resourceKey)
201-
if err != nil {
202-
return err
203-
}
204-
if latest != nil && latest.ContentHash == hash {
205-
return nil
206-
}
207-
}
208195
_, err = s.store.InsertVersion(InsertRequest{
209196
RuleKind: ruleKind,
210197
Mesh: mesh,

pkg/core/versioning/versioning_test.go

Lines changed: 144 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,10 @@ import (
2929
meshproto "github.com/apache/dubbo-admin/api/mesh/v1alpha1"
3030
appconfig "github.com/apache/dubbo-admin/pkg/config/app"
3131
eventbusconfig "github.com/apache/dubbo-admin/pkg/config/eventbus"
32+
"github.com/apache/dubbo-admin/pkg/config/mode"
33+
versioningcfg "github.com/apache/dubbo-admin/pkg/config/versioning"
3234
"github.com/apache/dubbo-admin/pkg/core/events"
35+
"github.com/apache/dubbo-admin/pkg/core/manager"
3336
meshresource "github.com/apache/dubbo-admin/pkg/core/resource/apis/mesh/v1alpha1"
3437
"github.com/apache/dubbo-admin/pkg/core/resource/model"
3538
coreruntime "github.com/apache/dubbo-admin/pkg/core/runtime"
@@ -74,6 +77,26 @@ func TestAdminHintTTLHitAndMiss(t *testing.T) {
7477
require.False(t, ok)
7578
}
7679

80+
func TestAdminHintPrunesExpiredOnPutAndTake(t *testing.T) {
81+
now := time.Now()
82+
reg := NewAdminHintRegistry()
83+
reg.now = func() time.Time { return now }
84+
85+
reg.Put(meshresource.ConditionRouteKind, "m/stale", "stale", AdminHint{ExpiresAt: now.Add(-time.Second)})
86+
require.Len(t, reg.hints, 1)
87+
88+
reg.Put(meshresource.ConditionRouteKind, "m/fresh", "fresh", AdminHint{
89+
ExpiresAt: now.Add(time.Second),
90+
})
91+
require.Len(t, reg.hints, 1)
92+
93+
reg.Put(meshresource.ConditionRouteKind, "m/stale", "stale", AdminHint{ExpiresAt: now.Add(-time.Second)})
94+
hint, ok := reg.Take(meshresource.ConditionRouteKind, "m/fresh", "fresh")
95+
require.True(t, ok)
96+
require.Empty(t, hint.Author)
97+
require.Empty(t, reg.hints)
98+
}
99+
77100
func TestMemoryStoreRetentionCurrentPointerAndDelete(t *testing.T) {
78101
store := NewMemoryStore()
79102
key := "mesh/demo.condition-router"
@@ -219,7 +242,7 @@ func TestSubscriberRespectsRegistrySourceContext(t *testing.T) {
219242
cache.Updated,
220243
nil,
221244
upstreamRes,
222-
map[string]string{"source-registry": "zookeeper"},
245+
map[string]string{events.SourceRegistryContextKey: "zookeeper"},
223246
)))
224247

225248
items, err := store.ListVersions(meshresource.ConditionRouteKind, upstreamRes.ResourceKey())
@@ -241,7 +264,7 @@ func TestSubscriberSkipsNoopUpstreamEchoAfterBootstrap(t *testing.T) {
241264
cache.Updated,
242265
nil,
243266
original,
244-
map[string]string{"source-registry": "zookeeper"},
267+
map[string]string{events.SourceRegistryContextKey: "zookeeper"},
245268
)))
246269

247270
items, err := store.ListVersions(meshresource.ConditionRouteKind, original.ResourceKey())
@@ -256,7 +279,7 @@ func TestSubscriberSkipsNoopUpstreamEchoAfterBootstrap(t *testing.T) {
256279
cache.Updated,
257280
original,
258281
changed,
259-
map[string]string{"source-registry": "zookeeper"},
282+
map[string]string{events.SourceRegistryContextKey: "zookeeper"},
260283
)))
261284

262285
items, err = store.ListVersions(meshresource.ConditionRouteKind, original.ResourceKey())
@@ -267,6 +290,26 @@ func TestSubscriberSkipsNoopUpstreamEchoAfterBootstrap(t *testing.T) {
267290
require.True(t, items[0].IsCurrent)
268291
}
269292

293+
func TestSubscriberRecordsEmptyCreateAfterDelete(t *testing.T) {
294+
store := NewMemoryStore()
295+
hints := NewAdminHintRegistry()
296+
sub := NewSubscriber(meshresource.ConditionRouteKind, store, hints, 5, 0)
297+
res := meshresource.NewConditionRouteResourceWithAttributes("demo.condition-router", "mesh")
298+
res.Spec = &meshproto.ConditionRoute{}
299+
300+
require.NoError(t, sub.ProcessEvent(events.NewResourceChangedEvent(cache.Added, nil, res)))
301+
require.NoError(t, sub.ProcessEvent(events.NewResourceChangedEvent(cache.Deleted, res, nil)))
302+
require.NoError(t, sub.ProcessEvent(events.NewResourceChangedEvent(cache.Added, nil, res)))
303+
304+
items, err := store.ListVersions(meshresource.ConditionRouteKind, res.ResourceKey())
305+
require.NoError(t, err)
306+
require.Len(t, items, 3)
307+
require.Equal(t, OperationCreate, items[0].Operation)
308+
require.True(t, items[0].IsCurrent)
309+
require.Equal(t, OperationDelete, items[1].Operation)
310+
require.False(t, items[1].IsCurrent)
311+
}
312+
270313
func TestSubscriberRecordsDeleteWithAdminHintSnapshot(t *testing.T) {
271314
store := NewMemoryStore()
272315
hints := NewAdminHintRegistry()
@@ -410,6 +453,37 @@ func TestDisabledServiceHistoryReturnsFeatureDisabled(t *testing.T) {
410453
require.ErrorIs(t, err, ErrFeatureDisabled)
411454
}
412455

456+
func TestComponentFlushesPendingVersionsOnStop(t *testing.T) {
457+
store := NewMemoryStore()
458+
sub := NewSubscriber(meshresource.ConditionRouteKind, store, NewAdminHintRegistry(), 5, time.Hour)
459+
comp := &component{
460+
store: store,
461+
subscribers: []*Subscriber{sub},
462+
}
463+
res := meshresource.NewConditionRouteResourceWithAttributes("demo.condition-router", "mesh")
464+
res.Spec = &meshproto.ConditionRoute{Key: "demo", Priority: 1}
465+
require.NoError(t, sub.ProcessEvent(events.NewResourceChangedEvent(cache.Added, nil, res)))
466+
467+
stop := make(chan struct{})
468+
require.NoError(t, comp.Start(testRuntime{
469+
cfg: appconfig.AdminConfig{
470+
Versioning: &versioningcfg.Config{
471+
Enabled: true,
472+
MaxVersionsPerRule: 5,
473+
},
474+
},
475+
components: map[coreruntime.ComponentType]coreruntime.Component{
476+
coreruntime.ResourceManager: testRMComponent{rm: fakeNoopResourceManager{}},
477+
},
478+
}, stop))
479+
close(stop)
480+
481+
require.Eventually(t, func() bool {
482+
items, err := store.ListVersions(meshresource.ConditionRouteKind, res.ResourceKey())
483+
return err == nil && len(items) == 1
484+
}, time.Second, 10*time.Millisecond)
485+
}
486+
413487
type fakeVersionResourceManager struct {
414488
subscriber *Subscriber
415489
}
@@ -564,3 +638,70 @@ func (c testBuilderContext) GetActivatedComponent(coreruntime.ComponentType) (co
564638
func (c testBuilderContext) ActivateComponent(coreruntime.Component) error {
565639
return nil
566640
}
641+
642+
type testRuntime struct {
643+
cfg appconfig.AdminConfig
644+
components map[coreruntime.ComponentType]coreruntime.Component
645+
}
646+
647+
func (r testRuntime) GetInstanceId() string {
648+
return "test-instance"
649+
}
650+
651+
func (r testRuntime) GetClusterId() string {
652+
return "test-cluster"
653+
}
654+
655+
func (r testRuntime) GetStartTime() time.Time {
656+
return time.Now()
657+
}
658+
659+
func (r testRuntime) GetMode() mode.Mode {
660+
return mode.Test
661+
}
662+
663+
func (r testRuntime) Config() appconfig.AdminConfig {
664+
return r.cfg
665+
}
666+
667+
func (r testRuntime) GetComponent(typ coreruntime.ComponentType) (coreruntime.Component, error) {
668+
return r.components[typ], nil
669+
}
670+
671+
func (r testRuntime) AppContext() context.Context {
672+
return context.Background()
673+
}
674+
675+
func (r testRuntime) Add(...coreruntime.Component) {}
676+
677+
func (r testRuntime) Start(<-chan struct{}) error {
678+
return nil
679+
}
680+
681+
type testRMComponent struct {
682+
rm manager.ResourceManager
683+
}
684+
685+
func (c testRMComponent) Type() coreruntime.ComponentType {
686+
return coreruntime.ResourceManager
687+
}
688+
689+
func (c testRMComponent) Order() int {
690+
return 0
691+
}
692+
693+
func (c testRMComponent) RequiredDependencies() []coreruntime.ComponentType {
694+
return nil
695+
}
696+
697+
func (c testRMComponent) Init(coreruntime.BuilderContext) error {
698+
return nil
699+
}
700+
701+
func (c testRMComponent) Start(coreruntime.Runtime, <-chan struct{}) error {
702+
return nil
703+
}
704+
705+
func (c testRMComponent) ResourceManager() manager.ResourceManager {
706+
return c.rm
707+
}

0 commit comments

Comments
 (0)