Skip to content

Commit f4cce40

Browse files
feat(backend, udp): add UDP configuration and enhance UDP server functionality
1 parent 48e4a86 commit f4cce40

File tree

6 files changed

+110
-52
lines changed

6 files changed

+110
-52
lines changed

backend/cmd/config.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,12 @@ max_retries = 0 # Maximum retries before cycling (0 = infinite retr
3434
connection_timeout_ms = 1000 # Connection timeout in milliseconds
3535
keep_alive_ms = 1000 # Keep-alive interval in milliseconds
3636

37+
# UDP Configuration
38+
# These settings control the UDP server's buffer sizes and performance characteristics
39+
[udp]
40+
ring_buffer_size = 64 # Size of the ring buffer for incoming packets (number of packets, not bytes)
41+
packet_chan_size = 16 # Size of the channel buffer
42+
3743
# TFTP Configuration
3844
[tftp]
3945
block_size = 131072 # TFTP block size in bytes (128kB)

backend/cmd/dev-config.toml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ boards = ["HVSCU", "HVSCU-Cabinet", "PCU", "LCU", "BCU", "BMSL"]
2121
branch = "software" # Leave blank when using ADJ as a submodule (like this: "")
2222
validate = true # Execute ADJ validator before starting backend
2323

24-
2524
# Transport Configuration
2625
[transport]
2726
propagate_fault = false
@@ -36,6 +35,12 @@ max_retries = 0 # Maximum retries before cycling (0 = infinite re
3635
connection_timeout_ms = 999999 # Timeout for the initial connection attempt
3736
keep_alive_ms = 0 # Keep-alive interval in milliseconds (0 to disable)
3837

38+
# UDP Configuration
39+
# These settings control the UDP server's buffer sizes and performance characteristics
40+
[udp]
41+
ring_buffer_size = 64 # Size of the ring buffer for incoming packets (number of packets, not bytes)
42+
packet_chan_size = 16 # Size of the channel buffer
43+
3944
# Server Configuration
4045
[server.ethernet-view]
4146
address = "127.0.0.1:4040"

backend/cmd/setup_transport.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ func configureTransport(
5050
configureTCPServerTransport(adj, transp)
5151

5252
// Start handling network packets using UDP server
53-
configureUDPServerTransport(adj, transp)
53+
configureUDPServerTransport(adj, transp, config)
5454

5555
}
5656

@@ -133,9 +133,11 @@ func configureTCPServerTransport(
133133
func configureUDPServerTransport(
134134
adj adj_module.ADJ,
135135
transp *transport.Transport,
136+
config config.Config,
137+
136138
) {
137139
trace.Info().Msg("Starting UDP server")
138-
udpServer := udp.NewServer(adj.Info.Addresses[BACKEND], adj.Info.Ports[UDP], &trace.Logger)
140+
udpServer := udp.NewServer(adj.Info.Addresses[BACKEND], adj.Info.Ports[UDP], &trace.Logger, config.UDP.RingBufferSize, config.UDP.PacketChanSize)
139141
err := udpServer.Start()
140142
if err != nil {
141143
trace.Fatal().Err(err).Msg("failed to start UDP server: " + err.Error())

backend/internal/config/config_types.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,11 @@ type TCP struct {
3636
KeepAlive int `toml:"keep_alive_ms"`
3737
}
3838

39+
type UDP struct {
40+
RingBufferSize int `toml:"ring_buffer_size"`
41+
PacketChanSize int `toml:"packet_chan_size"`
42+
}
43+
3944
type Logging struct {
4045
TimeUnit logger.TimeUnit `toml:"time_unit"`
4146
LoggingPath string `toml:"logging_path"`
@@ -49,5 +54,6 @@ type Config struct {
4954
Transport Transport
5055
TFTP TFTP
5156
TCP TCP
57+
UDP UDP
5258
Logging Logging
5359
}

backend/pkg/transport/network/udp/server.go

Lines changed: 88 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package udp
33
import (
44
"fmt"
55
"net"
6+
"sync"
67
"time"
78

89
"github.com/rs/zerolog"
@@ -25,17 +26,32 @@ type Server struct {
2526
packetsCh chan Packet
2627
errorsCh chan error
2728
stopCh chan struct{}
29+
stopped bool
30+
31+
ring []Packet
32+
head int
33+
tail int
34+
count int
35+
ringMutex sync.Mutex
36+
notEmpty *sync.Cond
2837
}
2938

30-
func NewServer(address string, port uint16, logger *zerolog.Logger) *Server {
31-
return &Server{
39+
func NewServer(address string, port uint16, logger *zerolog.Logger, ringBufferSize int, packetChanSize int) *Server {
40+
s := &Server{
3241
address: address,
3342
port: port,
3443
logger: logger,
35-
packetsCh: make(chan Packet, 1000),
44+
packetsCh: make(chan Packet, packetChanSize),
3645
errorsCh: make(chan error, 100),
3746
stopCh: make(chan struct{}),
3847
}
48+
49+
s.ring = make([]Packet, ringBufferSize)
50+
s.head = 0
51+
s.tail = 0
52+
s.count = 0
53+
s.notEmpty = sync.NewCond(&s.ringMutex)
54+
return s
3955
}
4056

4157
func (s *Server) Start() error {
@@ -56,6 +72,7 @@ func (s *Server) Start() error {
5672
Msg("UDP server started")
5773

5874
go s.readLoop()
75+
go s.dispatchLoop()
5976
return nil
6077
}
6178

@@ -69,7 +86,7 @@ func (s *Server) readLoop() {
6986
default:
7087
// Set read deadline to allow periodic checking of stop channel
7188
s.conn.SetReadDeadline(time.Now().Add(100 * time.Millisecond))
72-
89+
7390
n, addr, err := s.conn.ReadFromUDP(buffer)
7491
if err != nil {
7592
// Check if it's a timeout error (expected)
@@ -104,11 +121,8 @@ func (s *Server) readLoop() {
104121
Int("size", len(payload)).
105122
Msg("received UDP packet")
106123

107-
select {
108-
case s.packetsCh <- packet:
109-
default:
110-
s.logger.Warn().Msg("packet channel full, dropping packet")
111-
}
124+
// Push packet to ring buffer
125+
s.push(packet)
112126
}
113127
}
114128
}
@@ -122,9 +136,73 @@ func (s *Server) GetErrors() <-chan error {
122136
}
123137

124138
func (s *Server) Stop() error {
139+
s.ringMutex.Lock()
140+
s.stopped = true
125141
close(s.stopCh)
142+
s.notEmpty.Broadcast() // despertar a los que esperan
143+
s.ringMutex.Unlock()
144+
126145
if s.conn != nil {
127146
return s.conn.Close()
128147
}
129148
return nil
130-
}
149+
}
150+
151+
func (s *Server) push(p Packet) {
152+
153+
s.ringMutex.Lock()
154+
defer s.ringMutex.Unlock()
155+
156+
if s.count == len(s.ring) {
157+
s.logger.Warn().Msg("Ring buffer full, overwriting oldest UDP packet")
158+
s.head = (s.head + 1) % len(s.ring)
159+
s.count--
160+
}
161+
162+
s.ring[s.tail] = p
163+
s.tail = (s.tail + 1) % len(s.ring)
164+
s.count++
165+
166+
s.notEmpty.Signal()
167+
}
168+
169+
func (s *Server) pop() (Packet, bool) {
170+
171+
s.ringMutex.Lock()
172+
defer s.ringMutex.Unlock()
173+
174+
for s.count == 0 && !s.stopped {
175+
s.notEmpty.Wait()
176+
}
177+
178+
if s.count == 0 && s.stopped {
179+
return Packet{}, false
180+
}
181+
182+
p := s.ring[s.head]
183+
s.head = (s.head + 1) % len(s.ring)
184+
s.count--
185+
186+
return p, true
187+
}
188+
189+
func (s *Server) dispatchLoop() {
190+
for {
191+
select {
192+
case <-s.stopCh:
193+
return
194+
default:
195+
}
196+
197+
packet, ok := s.pop()
198+
if !ok {
199+
return
200+
}
201+
202+
select {
203+
case s.packetsCh <- packet:
204+
case <-s.stopCh:
205+
return
206+
}
207+
}
208+
}

backend/pkg/transport/transport_test.go

Lines changed: 0 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -943,45 +943,6 @@ func TestHandleServer_AcceptsAndDispatches(t *testing.T) {
943943
}
944944
}
945945

946-
func TestHandleUDPServer_Dispatches(t *testing.T) {
947-
tr, api := createTestTransport(t)
948-
tr.SetpropagateFault(false)
949-
950-
port := getAvailableUDPPort(t)
951-
logger := zerolog.Nop()
952-
server := udp.NewServer("127.0.0.1", port, &logger)
953-
if err := server.Start(); err != nil {
954-
t.Fatalf("failed to start UDP server: %v", err)
955-
}
956-
defer server.Stop()
957-
958-
go tr.HandleUDPServer(server)
959-
960-
packet := data.NewPacket(100)
961-
packet.SetTimestamp(time.Unix(0, 0))
962-
buf, err := tr.encoder.Encode(packet)
963-
if err != nil {
964-
t.Fatalf("encode failed: %v", err)
965-
}
966-
defer tr.encoder.ReleaseBuffer(buf)
967-
968-
conn, err := net.DialUDP("udp", nil, &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: int(port)})
969-
if err != nil {
970-
t.Fatalf("failed to dial UDP server: %v", err)
971-
}
972-
defer conn.Close()
973-
974-
if _, err := conn.Write(buf.Bytes()); err != nil {
975-
t.Fatalf("failed to send UDP packet: %v", err)
976-
}
977-
978-
if err := waitForCondition(func() bool {
979-
return len(api.GetNotifications()) > 0
980-
}, 2*time.Second, "Should receive notification from UDP server"); err != nil {
981-
t.Fatal(err)
982-
}
983-
}
984-
985946
func TestHandleConversation_DispatchesAndStopsOnError(t *testing.T) {
986947
tr, api := createTestTransport(t)
987948

0 commit comments

Comments
 (0)