Skip to content

Commit 4c46ff9

Browse files
committed
refactor(backend, transport): split handleTCPConn into focused helpers
- Extract TCP configuration into configureTCPConn - Extract IP-to-target lookup into resolveTarget - Extract single-connection check into checkIfActiveConnection - Extract connection registration into addConnection - Extract receive loop into tcpReceiveLoop
1 parent d052480 commit 4c46ff9

1 file changed

Lines changed: 81 additions & 44 deletions

File tree

backend/pkg/transport/transport.go

Lines changed: 81 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,45 @@ func (transport *Transport) HandleServer(config tcp.ServerConfig, local string)
119119
// handleTCPConn is used to handle the specific TCP connections to the boards. It detects errors caused
120120
// on concurrent reads and writes, so other routines should not worry about closing or handling errors
121121
func (transport *Transport) handleTCPConn(conn net.Conn) error {
122+
transport.configureTCPConn(conn)
123+
124+
target, err := transport.resolveTarget(conn)
125+
if err != nil {
126+
return err
127+
}
128+
129+
connectionLogger := transport.logger.With().Str("remoteAddress", conn.RemoteAddr().String()).Str("target", string(target)).Logger()
130+
connectionLogger.Info().Msg("new connection")
131+
132+
if err := transport.checkIfActiveConnection(target, conn, connectionLogger); err != nil {
133+
transport.errChan <- err
134+
return err
135+
}
136+
137+
conn, errChan := tcp.WithErrChan(conn)
138+
defer func() {
139+
conn.Close()
140+
connectionLogger.Info().Msg("close")
141+
}()
142+
143+
cleanupConn := transport.addConnection(target, conn, connectionLogger)
144+
defer cleanupConn()
145+
146+
transport.api.ConnectionUpdate(target, true)
147+
defer transport.api.ConnectionUpdate(target, false)
148+
149+
transport.tcpReceiveLoop(conn, connectionLogger)
150+
151+
err = <-errChan
152+
if err != nil {
153+
connectionLogger.Error().Stack().Err(err).Msg("")
154+
transport.errChan <- err
155+
}
156+
return err
157+
}
158+
159+
// configureTCPConn sets TCP-level options like linger and no-delay.
160+
func (transport *Transport) configureTCPConn(conn net.Conn) {
122161
if tcpConn, ok := conn.(*net.TCPConn); ok {
123162
transport.logger.Trace().Str("remoteAddress", conn.RemoteAddr().String()).Msg("setting connection linger")
124163
err := tcpConn.SetLinger(0)
@@ -134,89 +173,87 @@ func (transport *Transport) handleTCPConn(conn net.Conn) error {
134173
transport.logger.Error().Stack().Err(err).Str("remoteAddress", conn.RemoteAddr().String()).Msg("set no delay")
135174
}
136175
}
176+
}
177+
178+
// resolveTarget maps the remote IP address of the connection to a TransportTarget
179+
// using the ipToTarget map.
180+
func (transport *Transport) resolveTarget(conn net.Conn) (abstraction.TransportTarget, error) {
181+
remoteAddr := conn.RemoteAddr().(*net.TCPAddr)
182+
ip := remoteAddr.IP.String()
137183

138-
target, ok := transport.ipToTarget[conn.RemoteAddr().(*net.TCPAddr).IP.String()]
184+
target, ok := transport.ipToTarget[ip]
139185
if !ok {
140186
conn.Close()
141-
transport.logger.Warn().Str("remoteAddress", conn.RemoteAddr().(*net.TCPAddr).IP.String()).Msg("ip target not found")
187+
transport.logger.Warn().Str("remoteAddress", ip).Msg("ip target not found")
142188
err := ErrUnknownTarget{Remote: conn.RemoteAddr()}
143189
transport.errChan <- err
144-
return err
190+
var zero abstraction.TransportTarget
191+
return zero, err
192+
145193
}
194+
return target, nil
195+
}
146196

147-
connectionLogger := transport.logger.With().Str("remoteAddress", conn.RemoteAddr().String()).Str("target", string(target)).Logger()
148-
connectionLogger.Info().Msg("new connection")
197+
// checkIfActiveConnection closes and rejects conn if target already has an active connection.
198+
func (transport *Transport) checkIfActiveConnection(target abstraction.TransportTarget, conn net.Conn, logger zerolog.Logger,) error {
199+
transport.connectionsMx.Lock()
200+
defer transport.connectionsMx.Unlock()
149201

150-
if err := func() error {
151-
transport.connectionsMx.Lock()
152-
defer transport.connectionsMx.Unlock()
153-
if _, ok := transport.connections[target]; ok {
154-
conn.Close()
155-
connectionLogger.Debug().Msg("already connected")
156-
return ErrTargetAlreadyConnected{Target: target}
157-
}
158-
return nil
159-
}(); err != nil {
202+
if _, ok := transport.connections[target]; ok {
203+
conn.Close()
204+
logger.Debug().Msg("already connected")
205+
err := ErrTargetAlreadyConnected{Target: target}
160206
transport.errChan <- err
161207
return err
162208
}
209+
return nil
210+
}
163211

164-
conn, errChan := tcp.WithErrChan(conn)
165-
defer func() {
166-
conn.Close()
167-
connectionLogger.Info().Msg("close")
168-
}()
212+
// addConnection stores conn for target and returns a cleanup that removes it.
213+
func (transport *Transport) addConnection(target abstraction.TransportTarget, conn net.Conn, logger zerolog.Logger) func() {
214+
transport.connectionsMx.Lock()
215+
logger.Debug().Msg("added connection")
216+
transport.connections[target] = conn
217+
transport.connectionsMx.Unlock()
169218

170-
func() {
219+
return func() {
171220
transport.connectionsMx.Lock()
172-
defer transport.connectionsMx.Unlock()
173-
connectionLogger.Debug().Msg("added connection")
174-
transport.connections[target] = conn
175-
}()
176-
defer func() {
177-
transport.connectionsMx.Lock()
178-
defer transport.connectionsMx.Unlock()
179-
connectionLogger.Debug().Msg("removed connection")
221+
logger.Debug().Msg("removed connection")
180222
delete(transport.connections, target)
181-
}()
182-
183-
transport.api.ConnectionUpdate(target, true)
184-
defer transport.api.ConnectionUpdate(target, false)
223+
transport.connectionsMx.Unlock()
224+
}
225+
}
185226

227+
// tcpReceiveLoop reads packets from conn and forwards notifications until an error occurs.
228+
func (transport *Transport) tcpReceiveLoop(conn net.Conn, logger zerolog.Logger) {
186229
go func() {
187230
for {
188231
packet, err := transport.decoder.DecodeNext(conn)
189232
if err != nil {
190-
connectionLogger.Error().Stack().Err(err).Msg("decode")
233+
logger.Error().Stack().Err(err).Msg("decode")
191234
transport.errChan <- err
192235
transport.SendFault()
193236
return
194237
}
195238

196239
if transport.propagateFault && packet.Id() == 0 {
197-
connectionLogger.Info().Msg("replicating packet with id 0 to all boards")
240+
logger.Info().Msg("replicating packet with id 0 to all boards")
198241
err := transport.handlePacketEvent(NewPacketMessage(packet))
199242
if err != nil {
200-
connectionLogger.Error().Err(err).Msg("failed to replicate packet")
243+
logger.Error().Err(err).Msg("failed to replicate packet")
201244
}
202245
}
203246

204247
from := conn.RemoteAddr().String()
205248
to := conn.LocalAddr().String()
206249

207-
connectionLogger.Trace().Type("type", packet).Msg("packet")
250+
logger.Trace().Type("type", packet).Msg("packet")
208251
transport.api.Notification(NewPacketNotification(packet, from, to, time.Now()))
209252
}
210253
}()
211-
212-
err := <-errChan
213-
if err != nil {
214-
connectionLogger.Error().Stack().Err(err).Msg("")
215-
transport.errChan <- err
216-
}
217-
return err
218254
}
219255

256+
220257
// SendMessage triggers an event to send something to the vehicle. Some messages
221258
// might additional means to pass information around (e.g. file read and write)
222259
func (transport *Transport) SendMessage(message abstraction.TransportMessage) error {

0 commit comments

Comments
 (0)