Skip to content

Commit d8f3878

Browse files
committed
Add UDP server implementation and related functionalities
- Introduced `ChannelUnixUdpServer` class inheriting from `ChannelUnixSocketServer` to handle UDP-specific operations. - Implemented methods for creating a UDP server socket, handling incoming datagrams, and sending buffers to clients. - Added packet size handling logic in `getPacketSize` and `sendBuffer` methods. - Enhanced error handling for socket operations and added constants for maximum UDP payload size. - Updated `ChannelUnixSocketServer` to manage client connections and data processing. - Refactored `ChannelUnixTcpServer` to reduce code duplication by reusing common functionalities from `ChannelUnixSocketServer`.
1 parent 05a2955 commit d8f3878

11 files changed

Lines changed: 798 additions & 302 deletions

File tree

cpp/modules/l4/include/Channel.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
#pragma once
22

3+
#include <expected>
4+
35
#include "data.h"
46
#include "SerializerL4.h"
57

cpp/modules/l4/include/cxx_17/data.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,17 @@ namespace styxlib
1616
};
1717

1818
using SizeResult = ExpectedSizeResult;
19+
20+
/**
21+
* Reads the first X bytes of buffer and interprets them as a packet size.
22+
* @param headerSize The size of the packet header (1, 2, or 4 bytes).
23+
* @param buffer The buffer to read from.
24+
* @param bufferSize The number of valid bytes available in the buffer.
25+
* @return The decoded packet payload size, or an ErrorCode if the buffer
26+
* is too small to hold the header.
27+
*/
28+
SizeResult getPacketSize(
29+
const PacketHeaderSize &headerSize,
30+
const uint8_t *buffer,
31+
Size bufferSize);
1932
}

cpp/modules/l4/include/cxx_23/data.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,6 @@
44
namespace styxlib
55
{
66
using SizeResult = std::expected<Size, ErrorCode>;
7+
8+
79
}

cpp/modules/l4/include/data.h

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ namespace styxlib
1919
UnknownClient,
2020
BufferTooSmall,
2121
InvalidHeaderSize,
22-
NullptrArgument
22+
NullptrArgument,
23+
SendFailed
2324
};
2425

2526
enum class PacketHeaderSize : uint8_t
@@ -29,6 +30,29 @@ namespace styxlib
2930
Size4Bytes = 4
3031
};
3132

33+
34+
35+
// inline uint8_t to_uint8_t(const PacketHeaderSize &headerSize)
36+
// {
37+
// return static_cast<uint8_t>(headerSize);
38+
// }
39+
40+
// /**
41+
// * Sets the packet size in the provided buffer according to the specified header size.
42+
// * @param headerSize The size of the packet header (1, 2, or 4 bytes).
43+
// * @param buffer The buffer where the packet size will be set.
44+
// * @param bufferSize The size of the buffer.
45+
// * @param packetSize The size of the packet to set.
46+
// * @return The number of bytes used for the header, or an ErrorCode if an error occurs.
47+
// */
48+
// std::expected<uint8_t, ErrorCode> setPacketSize(
49+
// const PacketHeaderSize &headerSize,
50+
// uint8_t *buffer,
51+
// Size bufferSize,
52+
// Size packetSize);
53+
54+
55+
3256
using StyxString = std::string;
3357
using StyxDate = uint32_t;
3458
using StyxBuffer = uint8_t *;
@@ -71,4 +95,20 @@ namespace styxlib
7195
#else
7296
// Handle older standards (C++14, C++11, etc.)
7397
#error "This library requires at least C++17."
74-
#endif
98+
#endif
99+
100+
namespace styxlib
101+
{
102+
/**
103+
* Reads the first X bytes of buffer and interprets them as a packet size.
104+
* @param headerSize The size of the packet header (1, 2, or 4 bytes).
105+
* @param buffer The buffer to read from.
106+
* @param bufferSize The number of valid bytes available in the buffer.
107+
* @return The decoded packet payload size, or an ErrorCode if the buffer
108+
* is too small to hold the header.
109+
*/
110+
SizeResult getPacketSize(
111+
const PacketHeaderSize &headerSize,
112+
const uint8_t *buffer,
113+
Size bufferSize);
114+
}

cpp/modules/l4/include/impl/ChannelUnixSocket.h

Lines changed: 134 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,18 @@
11
#pragma once
22

3-
#include <optional>
3+
#include <map>
4+
#include <thread>
5+
#include <atomic>
46
#include <future>
7+
#include <optional>
58
#include <string>
9+
#include <poll.h>
610

711
#include "Channel.h"
12+
#include "ChannelTx.h"
13+
#include "ClientsRepo.h"
814
#include "impl/ChannelUnixFile.h"
15+
#include "impl/ProgressObservableMutexImpl.h"
916

1017
namespace styxlib
1118
{
@@ -88,4 +95,130 @@ namespace styxlib
8895
std::future<void> disconnect();
8996
bool isConnected() const;
9097
};
98+
99+
/**
100+
* Common base for socket-based server channels (TCP, UDP).
101+
*
102+
* Handles the shared lifecycle (start / stop / isStarted), the poll loop,
103+
* client bookkeeping, processBuffers, and cleanupClosedSockets.
104+
*
105+
* Subclasses must implement createServerSocket() to set up the protocol-
106+
* specific listen/bind socket. Subclasses may also override
107+
* acceptClients(), readDataFromSocket(), handlePollEvents(),
108+
* cleanupClosedSockets(), and sendBuffer() to customise per-protocol
109+
* behaviour (e.g. UDP does not call accept() and uses recvfrom/sendto).
110+
*/
111+
class ChannelUnixSocketServer : public ChannelRx, public ChannelTx
112+
{
113+
public:
114+
class Configuration
115+
{
116+
public:
117+
const uint16_t port;
118+
const std::shared_ptr<ClientsRepo> clientsRepo{nullptr};
119+
const PacketHeaderSize packetSizeHeader{PacketHeaderSize::Size2Bytes};
120+
const uint16_t iounit{8192};
121+
const DeserializerL4Ptr deserializer{nullptr};
122+
const uint16_t maxClients{16};
123+
124+
Configuration(
125+
uint16_t port,
126+
std::shared_ptr<ClientsRepo> clientsRepo,
127+
PacketHeaderSize packetSizeHeader,
128+
uint16_t iounit,
129+
DeserializerL4Ptr deserializer,
130+
uint8_t maxClients)
131+
: port(port),
132+
clientsRepo(clientsRepo),
133+
packetSizeHeader(packetSizeHeader),
134+
iounit(iounit),
135+
deserializer(deserializer),
136+
maxClients(maxClients)
137+
{
138+
}
139+
};
140+
141+
struct ClientInfo
142+
{
143+
ClientId id{0};
144+
std::string address{""};
145+
uint16_t port{0};
146+
};
147+
148+
protected:
149+
class ClientFullInfo : public ClientInfo, public ReadBuffer
150+
{
151+
};
152+
153+
const Configuration configuration;
154+
std::thread serverThread;
155+
Socket serverSocketFd{InvalidFileDescriptor};
156+
157+
// Sockets / pseudo-sockets -> per-client state
158+
std::map<Socket, ClientFullInfo> socketToClientInfoMapFull;
159+
std::vector<Socket> socketsToClose;
160+
161+
// Per-client transmit channels (used by connection-oriented protocols)
162+
std::map<ClientId, std::shared_ptr<ChannelUnixSocketTx>> clientIdToChannelClient;
163+
164+
ProgressObservableMutexImpl<std::vector<ClientInfo>> clientsObserver;
165+
std::atomic<bool> running{false};
166+
std::atomic<bool> stopRequested{false};
167+
std::unique_ptr<std::promise<ErrorCode>> startPromise;
168+
std::vector<pollfd> pollFds;
169+
170+
// ── Extension points ─────────────────────────────────────────────────
171+
/**
172+
* Create, configure and bind (and optionally listen on) the server
173+
* socket. Must return a valid fd on success or set startPromise with
174+
* an error code and return InvalidFileDescriptor on failure.
175+
*/
176+
virtual Socket createServerSocket() = 0;
177+
178+
/** Called when the server-socket poll fd fires with POLLIN.
179+
* Default: calls accept() in a loop until EAGAIN.
180+
* Return true to indicate a new client was accepted (loop continues). */
181+
virtual bool acceptClients(Socket serverSocket);
182+
183+
/** Called when a client-socket poll fd fires with POLLIN.
184+
* Default: reads via recv() into the client's ReadBuffer. */
185+
virtual void readDataFromSocket(Socket clientFd);
186+
187+
/** Dispatch POLLIN / POLLERR / POLLHUP events for all ready fds.
188+
* Default implementation: server fd => acceptClients loop,
189+
* client fd => readDataFromSocket,
190+
* POLLERR|POLLHUP => mark socket for closure. */
191+
virtual void handlePollEvents(Socket serverSocket, size_t numEvents);
192+
193+
/** Remove sockets that were queued in socketsToClose.
194+
* Default: calls ::close(), removes from pollFds and maps. */
195+
virtual void cleanupClosedSockets();
196+
197+
/** Process dirty ReadBuffers held in socketToClientInfoMapFull and
198+
* forward complete packets to the deserializer. Not virtual – the
199+
* framing logic is identical for all transports. */
200+
void processBuffers();
201+
202+
void workThreadFunction();
203+
204+
/** Helper to rebuild and publish the clients observer vector. */
205+
void publishClients();
206+
207+
/** Called at the end of stop() so subclasses can clean up protocol-
208+
* specific state (e.g. address maps in the UDP server). */
209+
virtual void onStop() {}
210+
211+
public:
212+
explicit ChannelUnixSocketServer(const Configuration &config);
213+
~ChannelUnixSocketServer() override;
214+
215+
SizeResult sendBuffer(ClientId clientId, const StyxBuffer buffer, Size size) override;
216+
std::future<ErrorCode> start();
217+
std::future<void> stop();
218+
bool isStarted() const;
219+
ProgressObserver<std::vector<ClientInfo>> &getClientsObserver()
220+
{
221+
return clientsObserver;
222+
}
223+
};
91224
} // namespace styxlib
Lines changed: 18 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,6 @@
11
#pragma once
22

3-
#include <map>
4-
#include <thread>
5-
#include <atomic>
6-
#include <future>
73
#include <optional>
8-
#include <poll.h>
94

105
#include "Channel.h"
116
#include "ChannelTx.h"
@@ -44,77 +39,28 @@ namespace styxlib
4439
std::future<ErrorCode> connect() override;
4540
};
4641

47-
class ChannelUnixTcpServer : public ChannelRx, public ChannelTx
42+
/**
43+
* TCP server channel.
44+
* Inherits the common server lifecycle, poll loop, buffer processing and
45+
* client bookkeeping from ChannelUnixSocketServer. Only the TCP-specific
46+
* socket setup (SOCK_STREAM / listen) and per-client accept/recv logic
47+
* is implemented here.
48+
*/
49+
class ChannelUnixTcpServer : public ChannelUnixSocketServer
4850
{
4951
public:
50-
class Configuration
51-
{
52-
public:
53-
const uint16_t port;
54-
const std::shared_ptr<ClientsRepo> clientsRepo{nullptr};
55-
const PacketHeaderSize packetSizeHeader{PacketHeaderSize::Size2Bytes};
56-
const uint16_t iounit{8192};
57-
const DeserializerL4Ptr deserializer{nullptr};
58-
const uint16_t maxClients{16};
52+
// Re-export the shared Configuration and ClientInfo types.
53+
using Configuration = ChannelUnixSocketServer::Configuration;
54+
using ClientInfo = ChannelUnixSocketServer::ClientInfo;
5955

60-
Configuration(
61-
uint16_t port,
62-
std::shared_ptr<ClientsRepo> clientsRepo,
63-
PacketHeaderSize packetSizeHeader,
64-
uint16_t iounit,
65-
DeserializerL4Ptr deserializer,
66-
uint8_t maxClients)
67-
: port(port),
68-
clientsRepo(clientsRepo),
69-
packetSizeHeader(packetSizeHeader),
70-
iounit(iounit),
71-
deserializer(deserializer),
72-
maxClients(maxClients)
73-
{
74-
}
75-
};
76-
struct ClientInfo
77-
{
78-
ClientId id{0};
79-
std::string address{""};
80-
uint16_t port{0};
81-
};
56+
explicit ChannelUnixTcpServer(const Configuration &config);
57+
ChannelUnixTcpServer(ChannelUnixTcpServer &&) = delete;
58+
ChannelUnixTcpServer &operator=(ChannelUnixTcpServer &&) = delete;
59+
~ChannelUnixTcpServer() override = default;
8260

8361
protected:
84-
class ClientFullInfo : public ClientInfo, public ReadBuffer
85-
{
86-
};
87-
const Configuration configuration;
88-
std::thread serverThread;
89-
// Sockets
90-
std::map<Socket, ClientFullInfo> socketToClientInfoMapFull;
91-
std::vector<Socket> socketsToClose;
92-
// Clients
93-
std::map<ClientId, std::shared_ptr<ChannelUnixTcpTx>> clientIdToChannelClient;
94-
95-
ProgressObservableMutexImpl<std::vector<ClientInfo>> clientsObserver;
96-
std::atomic<bool> running{false};
97-
std::atomic<bool> stopRequested{false};
98-
std::unique_ptr<std::promise<ErrorCode>> startPromise;
99-
std::vector<pollfd> pollFds;
100-
101-
void workThreadFunction();
102-
virtual bool acceptClients(Socket serverSocket);
103-
virtual void readDataFromSocket(Socket clientFd);
104-
void handlePollEvents(Socket serverSocket, size_t numEvents); // Handle poll events
105-
void cleanupClosedSockets(); // Clean up sockets marked for closure
106-
void processBuffers(); // Check dirty buffers and send data to deserializer
107-
108-
public:
109-
ChannelUnixTcpServer(const Configuration &config);
110-
~ChannelUnixTcpServer() override;
111-
SizeResult sendBuffer(ClientId clientId, const StyxBuffer buffer, Size size) override;
112-
std::future<ErrorCode> start();
113-
std::future<void> stop();
114-
ProgressObserver<std::vector<ClientInfo>> &getClientsObserver()
115-
{
116-
return clientsObserver;
117-
}
118-
bool isStarted() const;
62+
Socket createServerSocket() override;
63+
bool acceptClients(Socket serverSocket) override;
64+
void readDataFromSocket(Socket clientFd) override;
11965
};
12066
}

0 commit comments

Comments
 (0)