Skip to content

Commit a30de9b

Browse files
authored
Merge pull request #278 from HyperloopUPV-H8/backend/propagate-fault
[Backend] Propagate fault
2 parents bb21b6c + 2da2f48 commit a30de9b

4 files changed

Lines changed: 67 additions & 5 deletions

File tree

backend/cmd/config.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,14 @@ type Network struct {
1414
Manual bool
1515
}
1616

17+
type Transport struct {
18+
PropagateFault bool
19+
}
20+
1721
type Config struct {
18-
Vehicle vehicle.Config
19-
Server server.Config
20-
Adj Adj
21-
Network Network
22+
Vehicle vehicle.Config
23+
Server server.Config
24+
Adj Adj
25+
Network Network
26+
Transport Transport
2227
}

backend/cmd/config.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,3 +23,5 @@ branch = "main" # Leave blank when using ADJ as a submodule (like this: "")
2323
test = true
2424
[network]
2525
manual = false
26+
[transport]
27+
propagate_fault = true

backend/cmd/main.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@ import (
3333
vehicle_models "github.com/HyperloopUPV-H8/h9-backend/internal/vehicle/models"
3434
"github.com/HyperloopUPV-H8/h9-backend/pkg/abstraction"
3535
"github.com/HyperloopUPV-H8/h9-backend/pkg/broker"
36-
connection_topic "github.com/HyperloopUPV-H8/h9-backend/pkg/broker/topics/connection"
3736
blcu_topics "github.com/HyperloopUPV-H8/h9-backend/pkg/broker/topics/blcu"
37+
connection_topic "github.com/HyperloopUPV-H8/h9-backend/pkg/broker/topics/connection"
3838
data_topic "github.com/HyperloopUPV-H8/h9-backend/pkg/broker/topics/data"
3939
logger_topic "github.com/HyperloopUPV-H8/h9-backend/pkg/broker/topics/logger"
4040
message_topic "github.com/HyperloopUPV-H8/h9-backend/pkg/broker/topics/message"
@@ -322,6 +322,7 @@ func main() {
322322

323323
// <--- transport --->
324324
transp := transport.NewTransport(trace.Logger)
325+
transp.SetpropagateFault(config.Transport.PropagateFault)
325326

326327
// <--- vehicle --->
327328
ipToBoardId := make(map[string]abstraction.BoardId)

backend/pkg/transport/transport.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ type Transport struct {
3636

3737
tftp *tftp.Client
3838

39+
propagateFault bool
40+
3941
api abstraction.TransportAPI
4042

4143
logger zerolog.Logger
@@ -170,6 +172,14 @@ func (transport *Transport) handleTCPConn(conn net.Conn) error {
170172
return
171173
}
172174

175+
if transport.propagateFault && packet.Id() == 0 {
176+
connectionLogger.Info().Msg("replicating packet with id 0 to all boards")
177+
err := transport.handlePacketEvent(NewPacketMessage(packet))
178+
if err != nil {
179+
connectionLogger.Error().Err(err).Msg("failed to replicate packet")
180+
}
181+
}
182+
173183
from := conn.RemoteAddr().String()
174184
to := conn.LocalAddr().String()
175185

@@ -208,6 +218,37 @@ func (transport *Transport) SendMessage(message abstraction.TransportMessage) er
208218
// handlePacketEvent is used to send an order to one of the connected boards
209219
func (transport *Transport) handlePacketEvent(message PacketMessage) error {
210220
eventLogger := transport.logger.With().Str("type", fmt.Sprintf("%T", message.Packet)).Uint16("id", uint16(message.Id())).Logger()
221+
222+
if message.Id() == 0 {
223+
eventLogger.Info().Msg("broadcasting packet id 0")
224+
data, err := transport.encoder.Encode(message.Packet)
225+
if err != nil {
226+
eventLogger.Error().Stack().Err(err).Msg("encode")
227+
transport.errChan <- err
228+
return err
229+
}
230+
231+
transport.connectionsMx.Lock()
232+
defer transport.connectionsMx.Unlock()
233+
for target, conn := range transport.connections {
234+
eventLogger := eventLogger.With().Str("target", string(target)).Logger()
235+
236+
totalWritten := 0
237+
for totalWritten < len(data) {
238+
n, err := conn.Write(data[totalWritten:])
239+
eventLogger.Trace().Int("amount", n).Msg("written chunk")
240+
totalWritten += n
241+
if err != nil {
242+
eventLogger.Error().Stack().Err(err).Msg("write")
243+
transport.errChan <- err
244+
return err
245+
}
246+
}
247+
eventLogger.Info().Msg("sent")
248+
}
249+
return nil
250+
}
251+
211252
target, ok := transport.idToTarget[message.Id()]
212253
if !ok {
213254
eventLogger.Debug().Msg("target not found")
@@ -299,6 +340,15 @@ func (transport *Transport) handleConversation(socket network.Socket, reader io.
299340
return
300341
}
301342

343+
// Intercept packets with id == 0 and replicate
344+
if transport.propagateFault && packet.Id() == 0 {
345+
conversationLogger.Info().Msg("replicating packet with id 0 to all boards")
346+
err := transport.handlePacketEvent(NewPacketMessage(packet))
347+
if err != nil {
348+
conversationLogger.Error().Err(err).Msg("failed to replicate packet")
349+
}
350+
}
351+
302352
transport.api.Notification(NewPacketNotification(packet, srcAddr, dstAddr, time.Now()))
303353
}
304354
}()
@@ -322,3 +372,7 @@ func (transport *Transport) SendFault() {
322372
// transport.errChan <- err
323373
// }
324374
}
375+
376+
func (transport *Transport) SetpropagateFault(enabled bool) {
377+
transport.propagateFault = enabled
378+
}

0 commit comments

Comments
 (0)