Skip to content

Commit 2fe705b

Browse files
committed
Fixed BUG
1 parent d68b0c1 commit 2fe705b

4 files changed

Lines changed: 24 additions & 8 deletions

File tree

cluster/gate/proxy.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,6 @@ func (p *proxy) deliver(ctx context.Context, cid, uid int64, buf buffer.Buffer)
8484
Route: message.Route,
8585
Buffer: buf,
8686
}); err != nil {
87-
buf.Release()
8887
switch {
8988
case errors.Is(err, errors.ErrNotFoundRoute), errors.Is(err, errors.ErrNotFoundEndpoint):
9089
log.Warnf("deliver message failed, cid: %d uid: %d seq: %d route: %d err: %v", cid, uid, message.Seq, message.Route, err)

internal/link/node.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -167,8 +167,9 @@ func (l *NodeLinker) FetchNodeList(ctx context.Context, states ...cluster.State)
167167
// Deliver 投递消息给节点处理
168168
func (l *NodeLinker) Deliver(ctx context.Context, args *DeliverArgs) error {
169169
var (
170-
err error
171-
buf buffer.Buffer
170+
err error
171+
buf buffer.Buffer
172+
isDeliver bool
172173
)
173174

174175
switch b := args.Buffer.(type) {
@@ -187,15 +188,24 @@ func (l *NodeLinker) Deliver(ctx context.Context, args *DeliverArgs) error {
187188
if args.NID != "" {
188189
client, err := l.doBuildClient(args.NID)
189190
if err != nil {
191+
buf.Release()
190192
return err
191193
}
192194

193195
return client.Deliver(ctx, args.CID, args.UID, buf)
194196
} else {
195197
if _, err = l.doRPC(ctx, args.Route, args.UID, func(ctx context.Context, client *node.Client) (bool, any, error) {
198+
isDeliver = true
199+
196200
return false, nil, client.Deliver(ctx, args.CID, args.UID, buf)
197-
}); err != nil && !errors.Is(err, errors.ErrNotFoundUserLocation) {
198-
return err
201+
}); err != nil {
202+
if !isDeliver {
203+
buf.Release()
204+
}
205+
206+
if !errors.Is(err, errors.ErrNotFoundUserLocation) {
207+
return err
208+
}
199209
}
200210

201211
return nil

internal/transporter/internal/client/conn.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,16 +121,19 @@ func (c *Conn) handshake(conn net.Conn) error {
121121
)
122122

123123
buf := protocol.EncodeHandshakeReq(seq, c.cli.opts.InsKind, c.cli.opts.InsID)
124-
defer buf.Release()
125124

126125
c.pending.store(seq, call)
127126

128127
if _, err := conn.Write(buf.Bytes()); err != nil {
128+
buf.Release()
129+
129130
close(call)
130131

131132
c.pending.delete(seq)
132133

133134
return err
135+
} else {
136+
buf.Release()
134137
}
135138

136139
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
@@ -139,7 +142,11 @@ func (c *Conn) handshake(conn net.Conn) error {
139142
select {
140143
case <-ctx.Done():
141144
return ctx.Err()
142-
case <-call:
145+
case buf := <-call:
146+
if buf != nil {
147+
buf.Release()
148+
}
149+
143150
return nil
144151
}
145152
}

network/tcp/server_conn.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -339,8 +339,8 @@ func (c *serverConn) read() {
339339

340340
isHeartbeat, err := packet.CheckHeartbeat(buf.Bytes())
341341
if err != nil {
342-
log.Errorf("check heartbeat message error: %v", err)
343342
buf.Release()
343+
log.Errorf("check heartbeat message error: %v", err)
344344
continue
345345
}
346346

0 commit comments

Comments
 (0)