From 84eb286e2583f918f5162422811225c51ccfb606 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joan=20F=C3=ADsica?= Date: Mon, 26 May 2025 12:20:55 +0200 Subject: [PATCH 1/5] Replicate id 0 packets with handlePacketEvent --- backend/pkg/transport/transport.go | 31 ++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/backend/pkg/transport/transport.go b/backend/pkg/transport/transport.go index 1c0d96eb8..bc9be7c12 100644 --- a/backend/pkg/transport/transport.go +++ b/backend/pkg/transport/transport.go @@ -208,6 +208,37 @@ func (transport *Transport) SendMessage(message abstraction.TransportMessage) er // handlePacketEvent is used to send an order to one of the connected boards func (transport *Transport) handlePacketEvent(message PacketMessage) error { eventLogger := transport.logger.With().Str("type", fmt.Sprintf("%T", message.Packet)).Uint16("id", uint16(message.Id())).Logger() + + if message.Id() == 0 { + eventLogger.Info().Msg("broadcasting packet id 0") + data, err := transport.encoder.Encode(message.Packet) + if err != nil { + eventLogger.Error().Stack().Err(err).Msg("encode") + transport.errChan <- err + return err + } + + transport.connectionsMx.Lock() + defer transport.connectionsMx.Unlock() + for target, conn := range transport.connections { + eventLogger := eventLogger.With().Str("target", string(target)).Logger() + + totalWritten := 0 + for totalWritten < len(data) { + n, err := conn.Write(data[totalWritten:]) + eventLogger.Trace().Int("amount", n).Msg("written chunk") + totalWritten += n + if err != nil { + eventLogger.Error().Stack().Err(err).Msg("write") + transport.errChan <- err + return err + } + } + eventLogger.Info().Msg("sent") + } + return nil + } + target, ok := transport.idToTarget[message.Id()] if !ok { eventLogger.Debug().Msg("target not found") From c8ae2d80171a0074e0ad6d4d7f684c34a241fdd1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joan=20F=C3=ADsica?= Date: Mon, 26 May 2025 12:29:06 +0200 Subject: [PATCH 2/5] Catch id 0 packets with the sniffer --- backend/pkg/transport/transport.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/backend/pkg/transport/transport.go b/backend/pkg/transport/transport.go index bc9be7c12..c8621a1e9 100644 --- a/backend/pkg/transport/transport.go +++ b/backend/pkg/transport/transport.go @@ -330,6 +330,15 @@ func (transport *Transport) handleConversation(socket network.Socket, reader io. return } + // Intercept packets with id == 0 and replicate + if packet.Id() == 0 { + conversationLogger.Info().Msg("replicating packet with id 0 to all boards") + err := transport.handlePacketEvent(NewPacketMessage(packet)) + if err != nil { + conversationLogger.Error().Err(err).Msg("failed to replicate packet") + } + } + transport.api.Notification(NewPacketNotification(packet, srcAddr, dstAddr, time.Now())) } }() From 742eec1e181c4df0f57d86afa81880d4405b56e8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joan=20F=C3=ADsica?= Date: Mon, 26 May 2025 12:52:50 +0200 Subject: [PATCH 3/5] Add option in config.toml to enable progagating fault --- backend/cmd/config.go | 13 +++++++++---- backend/cmd/config.toml | 2 ++ backend/cmd/main.go | 3 ++- backend/pkg/transport/transport.go | 8 +++++++- 4 files changed, 20 insertions(+), 6 deletions(-) diff --git a/backend/cmd/config.go b/backend/cmd/config.go index a586a6533..9aeb4208b 100644 --- a/backend/cmd/config.go +++ b/backend/cmd/config.go @@ -14,9 +14,14 @@ type Network struct { Manual bool } +type Transport struct { + PropagateFault bool +} + type Config struct { - Vehicle vehicle.Config - Server server.Config - Adj Adj - Network Network + Vehicle vehicle.Config + Server server.Config + Adj Adj + Network Network + Transport Transport } diff --git a/backend/cmd/config.toml b/backend/cmd/config.toml index 1d4be2c2e..069e2986f 100644 --- a/backend/cmd/config.toml +++ b/backend/cmd/config.toml @@ -23,3 +23,5 @@ branch = "main" # Leave blank when using ADJ as a submodule (like this: "") test = true [network] manual = false +[transport] +propagate_fault = false diff --git a/backend/cmd/main.go b/backend/cmd/main.go index 860259acc..c0b891979 100644 --- a/backend/cmd/main.go +++ b/backend/cmd/main.go @@ -33,8 +33,8 @@ import ( vehicle_models "github.com/HyperloopUPV-H8/h9-backend/internal/vehicle/models" "github.com/HyperloopUPV-H8/h9-backend/pkg/abstraction" "github.com/HyperloopUPV-H8/h9-backend/pkg/broker" - connection_topic "github.com/HyperloopUPV-H8/h9-backend/pkg/broker/topics/connection" blcu_topics "github.com/HyperloopUPV-H8/h9-backend/pkg/broker/topics/blcu" + connection_topic "github.com/HyperloopUPV-H8/h9-backend/pkg/broker/topics/connection" data_topic "github.com/HyperloopUPV-H8/h9-backend/pkg/broker/topics/data" logger_topic "github.com/HyperloopUPV-H8/h9-backend/pkg/broker/topics/logger" message_topic "github.com/HyperloopUPV-H8/h9-backend/pkg/broker/topics/message" @@ -322,6 +322,7 @@ func main() { // <--- transport ---> transp := transport.NewTransport(trace.Logger) + transp.SetpropagateFault(config.Transport.PropagateFault) // <--- vehicle ---> ipToBoardId := make(map[string]abstraction.BoardId) diff --git a/backend/pkg/transport/transport.go b/backend/pkg/transport/transport.go index c8621a1e9..50babc065 100644 --- a/backend/pkg/transport/transport.go +++ b/backend/pkg/transport/transport.go @@ -36,6 +36,8 @@ type Transport struct { tftp *tftp.Client + propagateFault bool + api abstraction.TransportAPI logger zerolog.Logger @@ -331,7 +333,7 @@ func (transport *Transport) handleConversation(socket network.Socket, reader io. } // Intercept packets with id == 0 and replicate - if packet.Id() == 0 { + if transport.propagateFault && packet.Id() == 0 { conversationLogger.Info().Msg("replicating packet with id 0 to all boards") err := transport.handlePacketEvent(NewPacketMessage(packet)) if err != nil { @@ -362,3 +364,7 @@ func (transport *Transport) SendFault() { // transport.errChan <- err // } } + +func (transport *Transport) SetpropagateFault(enabled bool) { + transport.propagateFault = enabled +} From 94fc1ce4a7de6c028975c6210bd2865eb6498fc5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joan=20F=C3=ADsica=20Parra?= <130096040+JFisica@users.noreply.github.com> Date: Mon, 26 May 2025 14:02:38 +0200 Subject: [PATCH 4/5] propagate_fault ON by default Co-authored-by: Marc Sanchis --- backend/cmd/config.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/cmd/config.toml b/backend/cmd/config.toml index 069e2986f..7ac70bf6f 100644 --- a/backend/cmd/config.toml +++ b/backend/cmd/config.toml @@ -24,4 +24,4 @@ test = true [network] manual = false [transport] -propagate_fault = false +propagate_fault = true From 2da2f48af6146ac8c59f9c37300731589dadcc92 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joan=20F=C3=ADsica?= Date: Mon, 26 May 2025 20:04:48 +0200 Subject: [PATCH 5/5] Propagate fault with TCP packets --- backend/pkg/transport/transport.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/backend/pkg/transport/transport.go b/backend/pkg/transport/transport.go index 50babc065..8fb199e4e 100644 --- a/backend/pkg/transport/transport.go +++ b/backend/pkg/transport/transport.go @@ -172,6 +172,14 @@ func (transport *Transport) handleTCPConn(conn net.Conn) error { return } + if transport.propagateFault && packet.Id() == 0 { + connectionLogger.Info().Msg("replicating packet with id 0 to all boards") + err := transport.handlePacketEvent(NewPacketMessage(packet)) + if err != nil { + connectionLogger.Error().Err(err).Msg("failed to replicate packet") + } + } + from := conn.RemoteAddr().String() to := conn.LocalAddr().String()