Skip to content

Commit 704568c

Browse files
authored
meter, config: meter public and private response traffic independently (#993)
1 parent 831b47c commit 704568c

11 files changed

Lines changed: 251 additions & 78 deletions

File tree

lib/config/proxy.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ type ProxyServerOnline struct {
6161
// In k8s, the pod terminationGracePeriodSeconds can be set to very long so that these configs can be updated online.
6262
GracefulWaitBeforeShutdown int `yaml:"graceful-wait-before-shutdown,omitempty" toml:"graceful-wait-before-shutdown,omitempty" json:"graceful-wait-before-shutdown,omitempty" reloadable:"true"`
6363
GracefulCloseConnTimeout int `yaml:"graceful-close-conn-timeout,omitempty" toml:"graceful-close-conn-timeout,omitempty" json:"graceful-close-conn-timeout,omitempty" reloadable:"true"`
64+
// Public and private traffic are metered separately.
65+
PublicEndpoints []string `yaml:"public-endpoints,omitempty" toml:"public-endpoints,omitempty" json:"public-endpoints,omitempty" reloadable:"true"`
6466
}
6567

6668
type ProxyServer struct {

pkg/balance/router/group.go

Lines changed: 8 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/pingcap/tiproxy/pkg/balance/observer"
1818
"github.com/pingcap/tiproxy/pkg/balance/policy"
1919
"github.com/pingcap/tiproxy/pkg/metrics"
20+
"github.com/pingcap/tiproxy/pkg/util/netutil"
2021
"go.uber.org/zap"
2122
)
2223

@@ -76,21 +77,15 @@ func NewGroup(values []string, bpCreator func(lg *zap.Logger) policy.BalancePoli
7677
}
7778

7879
func (g *Group) parseValues() error {
79-
var parseErr error
8080
switch g.matchType {
8181
case MatchClientCIDR, MatchProxyCIDR:
82-
cidrList := make([]*net.IPNet, 0, len(g.values))
83-
for _, v := range g.values {
84-
_, cidr, err := net.ParseCIDR(v)
85-
if err == nil {
86-
cidrList = append(cidrList, cidr)
87-
} else {
88-
parseErr = err
89-
}
82+
cidrList, parseErr := netutil.ParseCIDRList(g.values)
83+
if parseErr != nil {
84+
return parseErr
9085
}
9186
g.cidrList = cidrList
9287
}
93-
return parseErr
88+
return nil
9489
}
9590

9691
func (g *Group) Match(clientInfo ClientInfo) bool {
@@ -100,26 +95,11 @@ func (g *Group) Match(clientInfo ClientInfo) bool {
10095
if g.matchType == MatchClientCIDR {
10196
addr = clientInfo.ClientAddr
10297
}
103-
if addr == nil || reflect.ValueOf(addr).IsNil() {
104-
return false
105-
}
106-
value := addr.String()
107-
ipStr, _, err := net.SplitHostPort(value)
98+
contains, err := netutil.CIDRContainsIP(g.cidrList, addr)
10899
if err != nil {
109-
g.lg.Error("parsing address failed", zap.String("addr", value), zap.Error(err))
110-
return false
100+
g.lg.Error("checking CIDR failed", zap.String("addr", addr.String()), zap.Error(err))
111101
}
112-
ip := net.ParseIP(ipStr)
113-
if ip == nil {
114-
g.lg.Error("parsing IP failed", zap.String("ip", value))
115-
return false
116-
}
117-
for _, cidr := range g.cidrList {
118-
if cidr.Contains(ip) {
119-
return true
120-
}
121-
}
122-
return false
102+
return contains
123103
}
124104
return true
125105
}

pkg/balance/router/group_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ func TestParseCIDR(t *testing.T) {
2626
}{
2727
{
2828
cidrs: []string{"1.1.1.1"},
29-
success: false,
29+
success: true,
3030
},
3131
{
3232
cidrs: []string{"1.1.1.1/32"},

pkg/manager/meter/meter.go

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,16 @@ const (
2525
// The timeout can not be too long because the pod grace termination period is fixed.
2626
writeTimeout = 10 * time.Second
2727
category = "proxy"
28+
29+
crossAZKey = "crossZone_bytes"
30+
publicEndpointKey = "public_outBound_bytes"
31+
privateEndpointKey = "private_outBound_bytes"
2832
)
2933

3034
type MeterData struct {
31-
respBytes int64
32-
crossAZBytes int64
35+
publicRespBytes int64
36+
privateRespBytes int64
37+
crossAZBytes int64
3338
}
3439

3540
type Meter struct {
@@ -62,11 +67,15 @@ func NewMeter(cfg *config.Config, lg *zap.Logger) (*Meter, error) {
6267
}, nil
6368
}
6469

65-
func (m *Meter) IncTraffic(clusterID string, respBytes, crossAZBytes int64) {
70+
func (m *Meter) IncTraffic(clusterID string, respBytes, crossAZBytes int64, fromPublicEndpoint bool) {
6671
m.Lock()
6772
defer m.Unlock()
6873
orig := m.data[clusterID]
69-
orig.respBytes += respBytes
74+
if fromPublicEndpoint {
75+
orig.publicRespBytes += respBytes
76+
} else {
77+
orig.privateRespBytes += respBytes
78+
}
7079
orig.crossAZBytes += crossAZBytes
7180
m.data[clusterID] = orig
7281
}
@@ -109,11 +118,12 @@ func (m *Meter) flush(ts int64, timeout time.Duration) {
109118
array := make([]map[string]any, 0, len(data))
110119
for clusterID, d := range data {
111120
array = append(array, map[string]any{
112-
"version": "1",
113-
"cluster_id": clusterID,
114-
"source_name": category,
115-
"crossZone_bytes": &common.MeteringValue{Value: uint64(d.crossAZBytes), Unit: "bytes"},
116-
"outBound_bytes": &common.MeteringValue{Value: uint64(d.respBytes), Unit: "bytes"},
121+
"version": "1",
122+
"cluster_id": clusterID,
123+
"source_name": category,
124+
crossAZKey: &common.MeteringValue{Value: uint64(d.crossAZBytes), Unit: "bytes"},
125+
privateEndpointKey: &common.MeteringValue{Value: uint64(d.privateRespBytes), Unit: "bytes"},
126+
publicEndpointKey: &common.MeteringValue{Value: uint64(d.publicRespBytes), Unit: "bytes"},
117127
})
118128
}
119129

pkg/manager/meter/meter_test.go

Lines changed: 39 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -72,19 +72,23 @@ func TestNewMeter(t *testing.T) {
7272
func TestWrite(t *testing.T) {
7373
m, reader := createLocalMeter(t, t.TempDir())
7474
ts := time.Now().Unix() / 60 * 60
75-
m.IncTraffic("cluster-1", 100, 200)
76-
m.IncTraffic("cluster-2", 200, 300)
75+
m.IncTraffic("cluster-1", 100, 200, false)
76+
m.IncTraffic("cluster-2", 200, 300, false)
77+
m.IncTraffic("cluster-1", 500, 200, true)
78+
m.IncTraffic("cluster-2", 600, 300, true)
7779
m.flush(ts, time.Second)
7880

7981
data := readMeteringData(t, reader, ts)
8082
require.Len(t, data, 2)
81-
resp, crossAZ := getValuesFromData(t, data, "cluster-1")
82-
require.Equal(t, int64(100), resp)
83-
require.Equal(t, int64(200), crossAZ)
84-
85-
resp, crossAZ = getValuesFromData(t, data, "cluster-2")
86-
require.Equal(t, int64(200), resp)
87-
require.Equal(t, int64(300), crossAZ)
83+
publicResp, privateResp, crossAZ := getValuesFromData(t, data, "cluster-1")
84+
require.Equal(t, int64(500), publicResp)
85+
require.Equal(t, int64(100), privateResp)
86+
require.Equal(t, int64(400), crossAZ)
87+
88+
publicResp, privateResp, crossAZ = getValuesFromData(t, data, "cluster-2")
89+
require.Equal(t, int64(600), publicResp)
90+
require.Equal(t, int64(200), privateResp)
91+
require.Equal(t, int64(600), crossAZ)
8892
}
8993

9094
func TestLoop(t *testing.T) {
@@ -97,34 +101,40 @@ func TestLoop(t *testing.T) {
97101
for range 10 {
98102
wg.Run(func() {
99103
for range 100 {
100-
m.IncTraffic("cluster-1", 1, 2)
101-
m.IncTraffic("cluster-2", 1, 2)
104+
m.IncTraffic("cluster-1", 1, 2, false)
105+
m.IncTraffic("cluster-2", 1, 2, false)
106+
m.IncTraffic("cluster-1", 10, 2, true)
107+
m.IncTraffic("cluster-2", 10, 2, true)
102108
time.Sleep(time.Millisecond)
103109
}
104110
}, nil)
105111
}
106112
wg.Wait()
107113
require.NoError(t, m.Close())
108114

109-
totalResp, totalCrossAZ := make(map[string]int64), make(map[string]int64)
115+
totalPublicResp, totalPrivateResp, totalCrossAZ := make(map[string]int64), make(map[string]int64), make(map[string]int64)
110116
for ts := startTime / 60 * 60; ts <= startTime/60*60+60; ts += 60 {
111117
data := readMeteringData(t, reader, ts)
112118
if len(data) == 0 {
113119
continue
114120
}
115121
require.Len(t, data, 2)
116-
resp, crossAZ := getValuesFromData(t, data, "cluster-1")
117-
totalResp["cluster-1"] += resp
122+
publicResp, privateResp, crossAZ := getValuesFromData(t, data, "cluster-1")
123+
totalPublicResp["cluster-1"] += publicResp
124+
totalPrivateResp["cluster-1"] += privateResp
118125
totalCrossAZ["cluster-1"] += crossAZ
119-
resp, crossAZ = getValuesFromData(t, data, "cluster-2")
120-
totalResp["cluster-2"] += resp
126+
publicResp, privateResp, crossAZ = getValuesFromData(t, data, "cluster-2")
127+
totalPublicResp["cluster-2"] += publicResp
128+
totalPrivateResp["cluster-2"] += privateResp
121129
totalCrossAZ["cluster-2"] += crossAZ
122130
}
123131

124-
require.Equal(t, int64(1000), totalResp["cluster-1"])
125-
require.Equal(t, int64(2000), totalCrossAZ["cluster-1"])
126-
require.Equal(t, int64(1000), totalResp["cluster-2"])
127-
require.Equal(t, int64(2000), totalCrossAZ["cluster-2"])
132+
require.Equal(t, int64(10000), totalPublicResp["cluster-1"])
133+
require.Equal(t, int64(1000), totalPrivateResp["cluster-1"])
134+
require.Equal(t, int64(4000), totalCrossAZ["cluster-1"])
135+
require.Equal(t, int64(10000), totalPublicResp["cluster-2"])
136+
require.Equal(t, int64(1000), totalPrivateResp["cluster-2"])
137+
require.Equal(t, int64(4000), totalCrossAZ["cluster-2"])
128138
}
129139

130140
func createLocalMeter(t *testing.T, dir string) (*Meter, *meteringreader.MeteringReader) {
@@ -183,17 +193,20 @@ func readMeteringData(t *testing.T, reader *meteringreader.MeteringReader, ts in
183193
return meteringData.Data
184194
}
185195

186-
func getValuesFromData(t *testing.T, data []map[string]any, clusterID string) (int64, int64) {
196+
func getValuesFromData(t *testing.T, data []map[string]any, clusterID string) (int64, int64, int64) {
187197
for i := range data {
188198
if data[i]["cluster_id"] == clusterID {
189-
outBound, ok := data[i]["outBound_bytes"].(map[string]any)
199+
publicOutBound, ok := data[i][publicEndpointKey].(map[string]any)
200+
require.True(t, ok)
201+
privateOutBound, ok := data[i][privateEndpointKey].(map[string]any)
190202
require.True(t, ok)
191-
crossZone, ok := data[i]["crossZone_bytes"].(map[string]any)
203+
crossZone, ok := data[i][crossAZKey].(map[string]any)
192204
require.True(t, ok)
193-
require.Equal(t, "bytes", outBound["unit"])
205+
require.Equal(t, "bytes", publicOutBound["unit"])
206+
require.Equal(t, "bytes", privateOutBound["unit"])
194207
require.Equal(t, "bytes", crossZone["unit"])
195-
return int64(outBound["value"].(float64)), int64(crossZone["value"].(float64))
208+
return int64(publicOutBound["value"].(float64)), int64(privateOutBound["value"].(float64)), int64(crossZone["value"].(float64))
196209
}
197210
}
198-
return 0, 0
211+
return 0, 0, 0
199212
}

pkg/proxy/backend/backend_conn_mgr.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ const (
9090
type BCConfig struct {
9191
HealthyKeepAlive config.KeepAlive
9292
UnhealthyKeepAlive config.KeepAlive
93+
FromPublicEndpoints func(addr net.Addr) bool
9394
TickerInterval time.Duration
9495
CheckBackendInterval time.Duration
9596
DialTimeout time.Duration
@@ -115,7 +116,7 @@ func (cfg *BCConfig) check() {
115116
}
116117

117118
type Meter interface {
118-
IncTraffic(clusterID string, respBytes, crossAZBytes int64)
119+
IncTraffic(clusterID string, respBytes, crossAZBytes int64, fromPublicEndpoint bool)
119120
}
120121

121122
// BackendConnManager migrates a session from one BackendConnection to another.
@@ -162,10 +163,11 @@ type BackendConnManager struct {
162163
sync.Mutex
163164
m map[any]any
164165
}
165-
connectionID uint64
166-
quitSource ErrorSource
167-
cpt capture.Capture
168-
meter Meter
166+
connectionID uint64
167+
quitSource ErrorSource
168+
cpt capture.Capture
169+
meter Meter
170+
fromPublicEndpoint bool
169171
}
170172

171173
// NewBackendConnManager creates a BackendConnManager.
@@ -235,6 +237,9 @@ func (mgr *BackendConnManager) Connect(ctx context.Context, clientIO pnet.Packet
235237
mgr.handshakeHandler.OnHandshake(mgr, mgr.ServerAddr(), nil, SrcNone)
236238
endTime := time.Now()
237239
addHandshakeMetrics(mgr.ServerAddr(), endTime.Sub(startTime))
240+
if mgr.config.FromPublicEndpoints != nil {
241+
mgr.fromPublicEndpoint = mgr.config.FromPublicEndpoints(clientIO.ProxyAddr())
242+
}
238243
mgr.updateTraffic(*mgr.backendIO.Load())
239244

240245
mgr.cmdProcessor.capability = mgr.authenticator.capability
@@ -451,7 +456,7 @@ func (mgr *BackendConnManager) updateTraffic(backendIO pnet.PacketIO) {
451456
if !mgr.curBackend.Local() {
452457
crossAZBytes = int64(outBytes - mgr.outBytes + inBytes - mgr.inBytes)
453458
}
454-
mgr.meter.IncTraffic(keyspace, int64(inBytes-mgr.inBytes), crossAZBytes)
459+
mgr.meter.IncTraffic(keyspace, int64(inBytes-mgr.inBytes), crossAZBytes, mgr.fromPublicEndpoint)
455460
}
456461
}
457462
mgr.inBytes, mgr.inPackets, mgr.outBytes, mgr.outPackets = inBytes, inPackets, outBytes, outPackets

pkg/proxy/backend/backend_conn_mgr_test.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1369,6 +1369,9 @@ func TestTrafficMetrics(t *testing.T) {
13691369
meter := newMeter()
13701370
ts := newBackendMgrTester(t, func(cfg *testConfig) {
13711371
cfg.proxyConfig.meter = meter
1372+
cfg.proxyConfig.bcConfig.FromPublicEndpoints = func(net.Addr) bool {
1373+
return true
1374+
}
13721375
})
13731376
var inBytes, inPackets, outBytes, outPackets int
13741377
runners := []runner{
@@ -1398,7 +1401,8 @@ func TestTrafficMetrics(t *testing.T) {
13981401
require.NoError(t, err)
13991402
require.True(t, inBytes2 > inBytes && inPackets2 > inPackets && outBytes2 > outBytes && outPackets2 > outPackets)
14001403
require.True(t, inBytes2 > 4096 && inPackets2 > 1000)
1401-
require.True(t, meter.respBytes["key1"] > 4096)
1404+
require.True(t, meter.publicRespBytes["key1"] > 4096)
1405+
require.EqualValues(t, 0, meter.privateRespBytes["key1"])
14021406
inBytes, inPackets, outBytes, outPackets = inBytes2, inPackets2, outBytes2, outPackets2
14031407
// The first backend is local, so no cross-az traffic.
14041408
crossLocationBytes2, err := metrics.ReadCounter(metrics.CrossLocationBytesCounter)
@@ -1444,7 +1448,8 @@ func TestTrafficMetrics(t *testing.T) {
14441448
inBytes2, inPackets2, outBytes2, outPackets2, err := readTraffic(addr)
14451449
require.NoError(t, err)
14461450
require.True(t, inBytes2 > inBytes1 && inPackets2 > inPackets1 && outBytes2 > outBytes1 && outPackets2 > outPackets1)
1447-
require.True(t, meter.respBytes["key2"] > 0)
1451+
require.True(t, meter.publicRespBytes["key2"] > 0)
1452+
require.EqualValues(t, 0, meter.privateRespBytes["key2"])
14481453
// The second backend is remote, so exists cross-az traffic.
14491454
crossLocationBytes2, err := metrics.ReadCounter(metrics.CrossLocationBytesCounter)
14501455
require.NoError(t, err)

pkg/proxy/backend/mock_proxy_test.go

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -160,18 +160,24 @@ func (mc *mockCapture) Close() {
160160
var _ Meter = (*mockMeter)(nil)
161161

162162
type mockMeter struct {
163-
crossAZBytes map[string]int64
164-
respBytes map[string]int64
163+
crossAZBytes map[string]int64
164+
privateRespBytes map[string]int64
165+
publicRespBytes map[string]int64
165166
}
166167

167168
func newMeter() *mockMeter {
168169
return &mockMeter{
169-
crossAZBytes: make(map[string]int64),
170-
respBytes: make(map[string]int64),
170+
crossAZBytes: make(map[string]int64),
171+
publicRespBytes: make(map[string]int64),
172+
privateRespBytes: make(map[string]int64),
171173
}
172174
}
173175

174-
func (m *mockMeter) IncTraffic(clusterID string, respBytes, crossAZBytes int64) {
176+
func (m *mockMeter) IncTraffic(clusterID string, respBytes, crossAZBytes int64, fromPublicEndpoint bool) {
175177
m.crossAZBytes[clusterID] += crossAZBytes
176-
m.respBytes[clusterID] += respBytes
178+
if fromPublicEndpoint {
179+
m.publicRespBytes[clusterID] += respBytes
180+
} else {
181+
m.privateRespBytes[clusterID] += respBytes
182+
}
177183
}

pkg/proxy/proxy.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/pingcap/tiproxy/pkg/proxy/keepalive"
2121
pnet "github.com/pingcap/tiproxy/pkg/proxy/net"
2222
"github.com/pingcap/tiproxy/pkg/sqlreplay/capture"
23+
"github.com/pingcap/tiproxy/pkg/util/netutil"
2324
"github.com/pingcap/tiproxy/pkg/util/waitgroup"
2425
"go.uber.org/zap"
2526
)
@@ -29,6 +30,7 @@ type serverState struct {
2930
healthyKeepAlive config.KeepAlive
3031
unhealthyKeepAlive config.KeepAlive
3132
clients map[uint64]*client.ClientConnection
33+
publicEndpoints []*net.IPNet
3234
maxConnections uint64
3335
connBufferSize int
3436
requireBackendTLS bool
@@ -84,6 +86,10 @@ func NewSQLServer(logger *zap.Logger, cfg *config.Config, certMgr *cert.CertMana
8486
}
8587

8688
func (s *SQLServer) reset(cfg *config.Config) {
89+
cidrList, parseErr := netutil.ParseCIDRList(cfg.Proxy.PublicEndpoints)
90+
if parseErr != nil {
91+
s.logger.Warn("failed to parse public endpoints", zap.Error(parseErr))
92+
}
8793
s.mu.Lock()
8894
s.mu.tcpKeepAlive = cfg.Proxy.FrontendKeepalive.Enabled
8995
s.mu.maxConnections = cfg.Proxy.MaxConnections
@@ -94,6 +100,7 @@ func (s *SQLServer) reset(cfg *config.Config) {
94100
s.mu.healthyKeepAlive = cfg.Proxy.BackendHealthyKeepalive
95101
s.mu.unhealthyKeepAlive = cfg.Proxy.BackendUnhealthyKeepalive
96102
s.mu.connBufferSize = cfg.Proxy.ConnBufferSize
103+
s.mu.publicEndpoints = cidrList
97104
s.mu.Unlock()
98105
}
99106

0 commit comments

Comments
 (0)