diff --git a/backend/cmd/config.go b/backend/cmd/config.go index a586a6533..9aeb4208b 100644 --- a/backend/cmd/config.go +++ b/backend/cmd/config.go @@ -14,9 +14,14 @@ type Network struct { Manual bool } +type Transport struct { + PropagateFault bool +} + type Config struct { - Vehicle vehicle.Config - Server server.Config - Adj Adj - Network Network + Vehicle vehicle.Config + Server server.Config + Adj Adj + Network Network + Transport Transport } diff --git a/backend/cmd/config.toml b/backend/cmd/config.toml index 1d4be2c2e..7ac70bf6f 100644 --- a/backend/cmd/config.toml +++ b/backend/cmd/config.toml @@ -23,3 +23,5 @@ branch = "main" # Leave blank when using ADJ as a submodule (like this: "") test = true [network] manual = false +[transport] +propagate_fault = true diff --git a/backend/cmd/main.go b/backend/cmd/main.go index 860259acc..c0b891979 100644 --- a/backend/cmd/main.go +++ b/backend/cmd/main.go @@ -33,8 +33,8 @@ import ( vehicle_models "github.com/HyperloopUPV-H8/h9-backend/internal/vehicle/models" "github.com/HyperloopUPV-H8/h9-backend/pkg/abstraction" "github.com/HyperloopUPV-H8/h9-backend/pkg/broker" - connection_topic "github.com/HyperloopUPV-H8/h9-backend/pkg/broker/topics/connection" blcu_topics "github.com/HyperloopUPV-H8/h9-backend/pkg/broker/topics/blcu" + connection_topic "github.com/HyperloopUPV-H8/h9-backend/pkg/broker/topics/connection" data_topic "github.com/HyperloopUPV-H8/h9-backend/pkg/broker/topics/data" logger_topic "github.com/HyperloopUPV-H8/h9-backend/pkg/broker/topics/logger" message_topic "github.com/HyperloopUPV-H8/h9-backend/pkg/broker/topics/message" @@ -322,6 +322,7 @@ func main() { // <--- transport ---> transp := transport.NewTransport(trace.Logger) + transp.SetpropagateFault(config.Transport.PropagateFault) // <--- vehicle ---> ipToBoardId := make(map[string]abstraction.BoardId) diff --git a/backend/pkg/transport/transport.go b/backend/pkg/transport/transport.go index 1c0d96eb8..8fb199e4e 100644 --- a/backend/pkg/transport/transport.go +++ b/backend/pkg/transport/transport.go @@ -36,6 +36,8 @@ type Transport struct { tftp *tftp.Client + propagateFault bool + api abstraction.TransportAPI logger zerolog.Logger @@ -170,6 +172,14 @@ func (transport *Transport) handleTCPConn(conn net.Conn) error { return } + if transport.propagateFault && packet.Id() == 0 { + connectionLogger.Info().Msg("replicating packet with id 0 to all boards") + err := transport.handlePacketEvent(NewPacketMessage(packet)) + if err != nil { + connectionLogger.Error().Err(err).Msg("failed to replicate packet") + } + } + from := conn.RemoteAddr().String() to := conn.LocalAddr().String() @@ -208,6 +218,37 @@ func (transport *Transport) SendMessage(message abstraction.TransportMessage) er // handlePacketEvent is used to send an order to one of the connected boards func (transport *Transport) handlePacketEvent(message PacketMessage) error { eventLogger := transport.logger.With().Str("type", fmt.Sprintf("%T", message.Packet)).Uint16("id", uint16(message.Id())).Logger() + + if message.Id() == 0 { + eventLogger.Info().Msg("broadcasting packet id 0") + data, err := transport.encoder.Encode(message.Packet) + if err != nil { + eventLogger.Error().Stack().Err(err).Msg("encode") + transport.errChan <- err + return err + } + + transport.connectionsMx.Lock() + defer transport.connectionsMx.Unlock() + for target, conn := range transport.connections { + eventLogger := eventLogger.With().Str("target", string(target)).Logger() + + totalWritten := 0 + for totalWritten < len(data) { + n, err := conn.Write(data[totalWritten:]) + eventLogger.Trace().Int("amount", n).Msg("written chunk") + totalWritten += n + if err != nil { + eventLogger.Error().Stack().Err(err).Msg("write") + transport.errChan <- err + return err + } + } + eventLogger.Info().Msg("sent") + } + return nil + } + target, ok := transport.idToTarget[message.Id()] if !ok { eventLogger.Debug().Msg("target not found") @@ -299,6 +340,15 @@ func (transport *Transport) handleConversation(socket network.Socket, reader io. return } + // Intercept packets with id == 0 and replicate + if transport.propagateFault && packet.Id() == 0 { + conversationLogger.Info().Msg("replicating packet with id 0 to all boards") + err := transport.handlePacketEvent(NewPacketMessage(packet)) + if err != nil { + conversationLogger.Error().Err(err).Msg("failed to replicate packet") + } + } + transport.api.Notification(NewPacketNotification(packet, srcAddr, dstAddr, time.Now())) } }() @@ -322,3 +372,7 @@ func (transport *Transport) SendFault() { // transport.errChan <- err // } } + +func (transport *Transport) SetpropagateFault(enabled bool) { + transport.propagateFault = enabled +}