@@ -19,6 +19,7 @@ package ttrpc
1919import (
2020 "context"
2121 "errors"
22+ "fmt"
2223 "io"
2324 "math/rand"
2425 "net"
@@ -28,6 +29,7 @@ import (
2829 "time"
2930
3031 "github.com/containerd/log"
32+ "github.com/moby/locker"
3133 "google.golang.org/grpc/codes"
3234 "google.golang.org/grpc/status"
3335)
@@ -289,6 +291,7 @@ func (s *Server) newConn(conn net.Conn, handshake interface{}) (*serverConn, err
289291 conn : conn ,
290292 handshake : handshake ,
291293 shutdown : make (chan struct {}),
294+ locker : locker .New (),
292295 }
293296 c .setState (connStateIdle )
294297 if err := s .addConnection (c ); err != nil {
@@ -306,6 +309,7 @@ type serverConn struct {
306309
307310 shutdownOnce sync.Once
308311 shutdown chan struct {} // forced shutdown, used by close
312+ locker * locker.Locker
309313}
310314
311315func (c * serverConn ) getState () (connState , bool ) {
@@ -478,17 +482,22 @@ func (c *serverConn) run(sctx context.Context) {
478482 }
479483 return nil
480484 }
485+ lockID := fmt .Sprintf ("%d" , id )
486+ c .locker .Lock (lockID )
481487 sh , err := c .server .services .handle (ctx , & req , respond )
482488 if err != nil {
483489 status , _ := status .FromError (err )
484490 if ! sendStatus (mh .StreamID , status ) {
491+ c .locker .Unlock (lockID )
485492 return
486493 }
494+ c .locker .Unlock (lockID )
487495 continue
488496 }
489497
490498 streams .Store (id , sh )
491499 atomic .AddInt32 (& active , 1 )
500+ c .locker .Unlock (lockID )
492501 }
493502 // TODO: else we must ignore this for future compat. log this?
494503 }
@@ -547,8 +556,11 @@ func (c *serverConn) run(sctx context.Context) {
547556 // The ttrpc protocol currently does not support the case where
548557 // the server is localClosed but not remoteClosed. Once the server
549558 // is closing, the whole stream may be considered finished
559+ lockID := fmt .Sprintf ("%d" , response .id )
560+ c .locker .Lock (lockID )
550561 streams .Delete (response .id )
551562 atomic .AddInt32 (& active , - 1 )
563+ c .locker .Unlock (lockID )
552564 }
553565 case err := <- recvErr :
554566 // TODO(stevvooe): Not wildly clear what we should do in this
0 commit comments