Skip to content

Commit 0fc0a66

Browse files
authored
fix: JobCollector reuse FE RPC connection (#645)
1 parent e15c1a9 commit 0fc0a66

4 files changed

Lines changed: 37 additions & 16 deletions

File tree

cmd/ccr_syncer/ccr_syncer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,7 @@ func main() {
228228
}()
229229

230230
// Step 9: start job collector
231-
jobCollector := NewJobCollector(db)
231+
jobCollector := NewJobCollector(db, hostInfo, factory)
232232
wg.Add(1)
233233
go func() {
234234
defer wg.Done()

cmd/ccr_syncer/job_collector.go

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,20 +22,27 @@ import (
2222

2323
"github.com/selectdb/ccr_syncer/pkg/ccr"
2424
"github.com/selectdb/ccr_syncer/pkg/ccr/base"
25-
"github.com/selectdb/ccr_syncer/pkg/rpc"
2625
"github.com/selectdb/ccr_syncer/pkg/storage"
2726
"github.com/selectdb/ccr_syncer/pkg/xerror"
2827
"github.com/selectdb/ccr_syncer/pkg/xmetrics"
2928
log "github.com/sirupsen/logrus"
3029
)
3130

3231
type JobCollector struct {
33-
db storage.DB
34-
stop chan struct{}
32+
db storage.DB
33+
hostInfo string
34+
factory *ccr.Factory
35+
stop chan struct{}
3536
}
3637

37-
func NewJobCollector(db storage.DB) *JobCollector {
38-
return &JobCollector{db: db, stop: make(chan struct{})}
38+
func NewJobCollector(db storage.DB, hostInfo string, factory *ccr.Factory) *JobCollector {
39+
log.Infof("JobCollector initialized with hostInfo: %s", hostInfo)
40+
return &JobCollector{
41+
db: db,
42+
hostInfo: hostInfo,
43+
factory: factory,
44+
stop: make(chan struct{}),
45+
}
3946
}
4047

4148
func (c *JobCollector) Collect() {
@@ -59,11 +66,13 @@ func (c *JobCollector) Stop() {
5966
}
6067

6168
func (c *JobCollector) updateMetrics() error {
62-
jobs, err := c.db.GetJobs()
69+
_, jobs, err := c.db.GetStampAndJobs(c.hostInfo)
6370
if err != nil {
64-
return xerror.Wrapf(err, xerror.Normal, "get all jobs failed")
71+
return xerror.Wrapf(err, xerror.Normal, "get jobs by belong_to %s failed", c.hostInfo)
6572
}
6673

74+
log.Debugf("JobCollector fetched %d jobs for hostInfo: %s", len(jobs), c.hostInfo)
75+
6776
runningJobNum := 0
6877
for _, jobName := range jobs {
6978
jobInfo, err := loadJobInfo(jobName, c.db)
@@ -84,7 +93,7 @@ func (c *JobCollector) updateMetrics() error {
8493

8594
srcSpec := &jobInfo.Src
8695
commitSeq := jobProgress.CommitSeq
87-
lag, interval, err := getJobLag(srcSpec, commitSeq)
96+
lag, interval, err := c.getJobLag(srcSpec, commitSeq)
8897
if err != nil {
8998
log.Warnf("get job %s lag failed: %+v", jobName, err)
9099
continue
@@ -128,8 +137,8 @@ func loadJobInfo(jobName string, db storage.DB) (*ccr.Job, error) {
128137
return &job, nil
129138
}
130139

131-
func getJobLag(spec *base.Spec, commitSeq int64) (int64, float64, error) {
132-
feRpc, err := rpc.NewFeRpc(spec)
140+
func (c *JobCollector) getJobLag(spec *base.Spec, commitSeq int64) (int64, float64, error) {
141+
feRpc, err := c.factory.NewFeRpc(spec)
133142
if err != nil {
134143
return 0, 0, xerror.Wrapf(err, xerror.Normal, "new fe rpc failed")
135144
}

pkg/rpc/rpc_factory.go

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"github.com/selectdb/ccr_syncer/pkg/ccr/base"
2424
beservice "github.com/selectdb/ccr_syncer/pkg/rpc/kitex_gen/backendservice/backendservice"
2525
"github.com/selectdb/ccr_syncer/pkg/xerror"
26+
log "github.com/sirupsen/logrus"
2627

2728
"github.com/cloudwego/kitex/client"
2829
)
@@ -33,7 +34,7 @@ type IRpcFactory interface {
3334
}
3435

3536
type RpcFactory struct {
36-
feRpcs map[*base.Spec]IFeRpc
37+
feRpcs map[string]IFeRpc // key: "cluster@host:port:thrift_port" connection string built by getFeKey
3738
feRpcsLock sync.Mutex
3839

3940
beRpcs map[base.Backend]IBeRpc
@@ -42,32 +43,43 @@ type RpcFactory struct {
4243

4344
func NewRpcFactory() IRpcFactory {
4445
return &RpcFactory{
45-
feRpcs: make(map[*base.Spec]IFeRpc),
46+
feRpcs: make(map[string]IFeRpc),
4647
beRpcs: make(map[base.Backend]IBeRpc),
4748
}
4849
}
4950

51+
// getFeKey generates a cache key for FE RPC based on connection info
52+
// Key format: "cluster@host:port:thrift_port"
53+
func getFeKey(spec *base.Spec) string {
54+
return fmt.Sprintf("%s@%s:%s:%s", spec.Cluster, spec.Host, spec.Port, spec.ThriftPort)
55+
}
56+
5057
func (rf *RpcFactory) NewFeRpc(spec *base.Spec) (IFeRpc, error) {
5158
// validate spec
5259
if err := spec.Valid(); err != nil {
5360
return nil, err
5461
}
5562

63+
// Generate cache key based on connection info (cluster + FE address)
64+
key := getFeKey(spec)
65+
5666
rf.feRpcsLock.Lock()
57-
if feRpc, ok := rf.feRpcs[spec]; ok {
67+
if feRpc, ok := rf.feRpcs[key]; ok {
5868
rf.feRpcsLock.Unlock()
69+
log.Debugf("RpcFactory: reused cached FeRpc for %s (cache hit)", key)
5970
return feRpc, nil
6071
}
6172
rf.feRpcsLock.Unlock()
6273

74+
log.Debugf("RpcFactory: creating new FeRpc for %s (cache miss)", key)
6375
feRpc, err := NewFeRpc(spec)
6476
if err != nil {
6577
return nil, err
6678
}
6779

6880
rf.feRpcsLock.Lock()
6981
defer rf.feRpcsLock.Unlock()
70-
rf.feRpcs[spec] = feRpc
82+
rf.feRpcs[key] = feRpc
7183
return feRpc, nil
7284
}
7385

pkg/service/http_service.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -298,7 +298,7 @@ func (s *HttpService) getLagHandler(w http.ResponseWriter, r *http.Request) {
298298
}
299299

300300
srcSpec := &job.Src
301-
feRpc, err := rpc.NewFeRpc(srcSpec)
301+
feRpc, err := s.jobManager.GetFactory().NewFeRpc(srcSpec)
302302
if err != nil {
303303
log.Warnf("new fe rpc failed: %+v", err)
304304
lagResult = &result{

0 commit comments

Comments
 (0)