Skip to content

Commit c8add4b

Browse files
authored
feat(metadata): add renew and GC policy for app-level metadata (#3371)
* feat(metadata): add renew and GC policy for app-level metadata * fix ci * fix ci * fix review * fix nil * rebase develop, resolve conflicts, add units
1 parent c745999 commit c8add4b

16 files changed

Lines changed: 1536 additions & 315 deletions

common/constant/key.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -427,6 +427,11 @@ const (
427427
MetadataServiceURLParamsPropertyName = MetadataServicePrefix + "url-params"
428428
MetadataServiceURLsPropertyName = MetadataServicePrefix + "urls"
429429
ServiceDiscoveryKey = "service_discovery" // indicate which service discovery instance will be used
430+
431+
// metadata GC configuration keys
432+
MetadataGCEnabledKey = "metadata.gc.enabled"
433+
MetadataGCWindowKey = "metadata.gc.window" // GC window in days, aligned with daily renew cycle
434+
MetadataRenewOnStartupKey = "metadata.renew-on-startup" // whether to run renewAppMetadata once on startup
430435
)
431436

432437
// Generic Filter

metadata/info/metadata_info.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ type MetadataInfo struct {
6767
exportedServiceURLs map[string][]*common.URL `hessian:"-"` // server exported service urls
6868
subscribedServiceURLs map[string][]*common.URL `hessian:"-"` // client subscribed service urls
6969
mu sync.RWMutex `json:"-" hessian:"-"`
70+
LastUpdatedTime int64 `json:"lastUpdatedTime,omitempty" hessian:"-"`
7071
}
7172

7273
func NewAppMetadataInfo(app string) *MetadataInfo {
@@ -221,6 +222,25 @@ func (info *MetadataInfo) ReplaceExportedServices(urls []*common.URL) {
221222
}
222223
}
223224

225+
// Snapshot creates a deep copy of the MetadataInfo for safe concurrent access.
226+
// The caller can modify the snapshot without affecting the original.
227+
func (info *MetadataInfo) Snapshot() MetadataInfo {
228+
info.mu.RLock()
229+
defer info.mu.RUnlock()
230+
231+
services := make(map[string]*ServiceInfo, len(info.Services))
232+
for k, v := range info.Services {
233+
si := *v
234+
services[k] = &si
235+
}
236+
return MetadataInfo{
237+
App: info.App,
238+
Revision: info.Revision,
239+
Tag: info.Tag,
240+
Services: services,
241+
}
242+
}
243+
224244
func (info *MetadataInfo) findExportedServiceURL(matchKey string) *common.URL {
225245
for _, urls := range info.exportedServiceURLs {
226246
for _, serviceURL := range urls {

metadata/mapping/metadata/service_name_mapping_concurrency_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
)
3131

3232
import (
33+
"dubbo.apache.org/dubbo-go/v3/common"
3334
"dubbo.apache.org/dubbo-go/v3/metadata/info"
3435
"dubbo.apache.org/dubbo-go/v3/metadata/mapping"
3536
"dubbo.apache.org/dubbo-go/v3/metadata/report"
@@ -86,6 +87,11 @@ func (stubReport) GetServiceAppMapping(string, string, mapping.MappingListener)
8687
return nil, nil
8788
}
8889
func (stubReport) RemoveServiceAppMappingListener(string, string) error { return nil }
90+
func (stubReport) UnPublishAppMetadata(string, string) error { return nil }
91+
func (stubReport) ListAppRevisions(string) ([]report.AppRevision, error) {
92+
return nil, nil
93+
}
94+
func (stubReport) URL() *common.URL { return nil }
8995

9096
// casReport registers mappings with optimistic concurrency against a versionedStore, exactly
9197
// as the etcd/zk/nacos reports now do, returning report.ErrMappingCASConflict on conflict.

metadata/mapping/metadata/service_name_mapping_test.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -289,3 +289,18 @@ func (m *mockMetadataReport) RemoveServiceAppMappingListener(string, string) err
289289
args := m.Called()
290290
return args.Error(0)
291291
}
292+
293+
func (m *mockMetadataReport) UnPublishAppMetadata(string, string) error {
294+
args := m.Called()
295+
return args.Error(0)
296+
}
297+
298+
func (m *mockMetadataReport) ListAppRevisions(string) ([]report.AppRevision, error) {
299+
args := m.Called()
300+
return args.Get(0).([]report.AppRevision), args.Error(1)
301+
}
302+
303+
func (m *mockMetadataReport) URL() *common.URL {
304+
u, _ := common.NewURL("mock://127.0.0.1:8848")
305+
return u
306+
}

metadata/report/etcd/report.go

Lines changed: 62 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package etcd
1919

2020
import (
2121
"encoding/json"
22+
"errors"
2223
"fmt"
2324
"strings"
2425
)
@@ -29,6 +30,8 @@ import (
2930
"github.com/dubbogo/gost/log/logger"
3031

3132
perrors "github.com/pkg/errors"
33+
34+
clientv3 "go.etcd.io/etcd/client/v3"
3235
)
3336

3437
import (
@@ -40,6 +43,26 @@ import (
4043
"dubbo.apache.org/dubbo-go/v3/metadata/report"
4144
)
4245

46+
// etcdClient abstracts the etcd Client operations used by etcdMetadataReport.
47+
type etcdClient interface {
48+
Get(key string) (string, error)
49+
Put(key, value string) error
50+
Delete(key string) error
51+
GetChildren(key string) ([]string, []string, error)
52+
GetValAndRev(key string) (string, int64, error)
53+
Create(key, value string) error
54+
UpdateWithRev(key, value string, rev int64, opts ...clientv3.OpOption) error
55+
}
56+
57+
// etcdClientWrapper wraps *gxetcd.Client to implement etcdClient.
58+
type etcdClientWrapper struct {
59+
*gxetcd.Client
60+
}
61+
62+
func (w etcdClientWrapper) Put(key, value string) error {
63+
return w.Client.Put(key, value)
64+
}
65+
4366
const DEFAULT_ROOT = "dubbo"
4467

4568
func init() {
@@ -50,13 +73,19 @@ func init() {
5073

5174
// etcdMetadataReport is the implementation of MetadataReport based etcd
5275
type etcdMetadataReport struct {
53-
client *gxetcd.Client
76+
client etcdClient
5477
rootDir string
78+
url *common.URL
79+
}
80+
81+
// URL returns the URL used to create this metadata report.
82+
func (e *etcdMetadataReport) URL() *common.URL {
83+
return e.url
5584
}
5685

5786
// GetAppMetadata get metadata info from etcd
5887
func (e *etcdMetadataReport) GetAppMetadata(application, revision string) (*info.MetadataInfo, error) {
59-
key := e.rootDir + application + constant.PathSeparator + revision
88+
key := e.rootDir + constant.PathSeparator + application + constant.PathSeparator + revision
6089
data, err := e.client.Get(key)
6190
if err != nil {
6291
return nil, err
@@ -68,7 +97,7 @@ func (e *etcdMetadataReport) GetAppMetadata(application, revision string) (*info
6897

6998
// PublishAppMetadata publish metadata info to etcd
7099
func (e *etcdMetadataReport) PublishAppMetadata(application, revision string, info *info.MetadataInfo) error {
71-
key := e.rootDir + application + constant.PathSeparator + revision
100+
key := e.rootDir + constant.PathSeparator + application + constant.PathSeparator + revision
72101
value, err := json.Marshal(info)
73102
if err == nil {
74103
err = e.client.Put(key, string(value))
@@ -125,6 +154,35 @@ func (e *etcdMetadataReport) RemoveServiceAppMappingListener(key string, group s
125154
return nil
126155
}
127156

157+
// UnPublishAppMetadata removes metadata for a specific revision from etcd.
158+
// This operation is idempotent.
159+
func (e *etcdMetadataReport) UnPublishAppMetadata(application, revision string) error {
160+
key := e.rootDir + constant.PathSeparator + application + constant.PathSeparator + revision
161+
return e.client.Delete(key)
162+
}
163+
164+
func (e *etcdMetadataReport) ListAppRevisions(application string) ([]report.AppRevision, error) {
165+
prefix := e.rootDir + constant.PathSeparator + application + constant.PathSeparator
166+
keys, values, err := e.client.GetChildren(prefix)
167+
if err != nil {
168+
if errors.Is(perrors.Cause(err), gxetcd.ErrKVPairNotFound) {
169+
return nil, nil
170+
}
171+
return nil, err
172+
}
173+
174+
result := make([]report.AppRevision, 0, len(keys))
175+
for i, key := range keys {
176+
// Extract revision from key suffix (key is full path, revision is last segment)
177+
revision := key[strings.LastIndex(key, constant.PathSeparator)+1:]
178+
result = append(result, report.AppRevision{
179+
Revision: revision,
180+
ModifyTime: report.ParseMetadataLastUpdatedTime([]byte(values[i])),
181+
})
182+
}
183+
return result, nil
184+
}
185+
128186
type etcdMetadataReportFactory struct{}
129187

130188
// CreateMetadataReport get the MetadataReport instance of etcd
@@ -138,5 +196,5 @@ func (e *etcdMetadataReportFactory) CreateMetadataReport(url *common.URL) report
138196
}
139197
group := url.GetParam(constant.MetadataReportGroupKey, DEFAULT_ROOT)
140198
group = constant.PathSeparator + strings.TrimPrefix(group, constant.PathSeparator)
141-
return &etcdMetadataReport{client: client, rootDir: group}
199+
return &etcdMetadataReport{client: etcdClientWrapper{client}, rootDir: group, url: url}
142200
}

0 commit comments

Comments
 (0)