Add Unix UDP implementation#54
Conversation
There was a problem hiding this comment.
Pull request overview
This PR factors out Unix socket send logic into a shared transmitter base so TCP can reuse it, and introduces a UDP transmitter type that relies on the same “header + payload in one send” behavior.
Changes:
- Removed
ChannelUnixTcpTx::sendBuffer()implementation and delegated TCP transmit to a newChannelUnixSocketTxbase. - Added
ChannelUnixSocketTx(header + payload combined into a single::send()call) and wired it into the build. - Added a
ChannelUnixUdpTxtransmitter type that inherits the shared socket transmitter.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| cpp/modules/l4/src/impl/ChannelUnixTcp.cpp | Removes TCP-specific send implementation (now inherited from shared socket transmitter). |
| cpp/modules/l4/include/impl/ChannelUnixTcp.h | Makes ChannelUnixTcpTx inherit from ChannelUnixSocketTx and removes its custom send override. |
| cpp/modules/l4/include/impl/ChannelUnixSocket.h | Introduces a shared socket-based transmitter API and documentation. |
| cpp/modules/l4/src/impl/ChannelUnixSocket.cpp | Implements shared socket send behavior (prepends length header, sends combined buffer). |
| cpp/modules/l4/include/impl/ChannelUnixUdp.h | Adds a UDP transmitter type built on the shared socket transmitter. |
| cpp/modules/l4/CMakeLists.txt | Adds ChannelUnixSocket.cpp to the module build. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| Size bytesSent = static_cast<Size>(::send(socket.value(), combined.data(), combined.size(), 0)); | ||
| return bytesSent - headerSize.value(); |
There was a problem hiding this comment.
::send() returns ssize_t and can be -1 on error or less than combined.size() (partial write on TCP). Casting that directly to Size (uint16_t) can overflow/underflow, and bytesSent - headerSize can wrap to a large value. Handle send() errors explicitly (check for <0, map errno as appropriate) and ensure the full header+payload is written (loop until all bytes are sent for TCP) before returning size.
| Size bytesSent = static_cast<Size>(::send(socket.value(), combined.data(), combined.size(), 0)); | |
| return bytesSent - headerSize.value(); | |
| // Ensure the full header + payload is sent. ::send() may write only part of the buffer | |
| // on stream sockets, so we loop until all bytes have been transmitted or an error occurs. | |
| ssize_t totalSent = 0; | |
| const ssize_t totalSize = static_cast<ssize_t>(combined.size()); | |
| while (totalSent < totalSize) | |
| { | |
| ssize_t sent = ::send( | |
| socket.value(), | |
| combined.data() + totalSent, | |
| static_cast<size_t>(totalSize - totalSent), | |
| 0); | |
| if (sent < 0) | |
| { | |
| // Map any send error to a connection-related error for now. | |
| return std::unexpected(ErrorCode::NotConnected); | |
| } | |
| if (sent == 0) | |
| { | |
| // Peer closed the connection before all data was sent. | |
| return std::unexpected(ErrorCode::NotConnected); | |
| } | |
| totalSent += sent; | |
| } | |
| // At this point, at least the header and some or all of the payload have been sent. | |
| if (totalSent < static_cast<ssize_t>(headerSize.value())) | |
| { | |
| // Should not happen, but guard against underflow when subtracting headerSize. | |
| return std::unexpected(ErrorCode::NotConnected); | |
| } | |
| Size payloadBytesSent = static_cast<Size>(totalSent - static_cast<ssize_t>(headerSize.value())); | |
| return payloadBytesSent; |
| * Header and payload are combined into one ::send() call so that | ||
| * both TCP and UDP (datagram) transports work correctly. | ||
| * Returns the number of payload bytes sent without the header | ||
| * (i.e. in success case it should return excactly same value as the size parameter), |
There was a problem hiding this comment.
Spelling in the doc comment: "excactly" should be "exactly".
| * (i.e. in success case it should return excactly same value as the size parameter), | |
| * (i.e. in success case it should return exactly same value as the size parameter), |
| std::vector<uint8_t> combined(headerSize.value() + size); | ||
| std::memcpy(combined.data(), packetSizeBuffer, headerSize.value()); | ||
| std::memcpy(combined.data() + headerSize.value(), buffer, size); | ||
| Size bytesSent = static_cast<Size>(::send(socket.value(), combined.data(), combined.size(), 0)); |
There was a problem hiding this comment.
There are existing Catch2 tests for TCP channels, but the new shared ChannelUnixSocketTx::sendBuffer() behavior (header+payload in one send, return value excluding header, error/partial-send handling) isn’t directly covered. Adding focused tests for this method (including a send failure case and verifying the returned payload byte count) would help prevent regressions when refactoring transports (TCP/UDP).
| Size bytesSent = static_cast<Size>(::send(socket.value(), combined.data(), combined.size(), 0)); | |
| ssize_t rc = ::send(socket.value(), combined.data(), combined.size(), 0); | |
| if (rc < 0) | |
| { | |
| return std::unexpected(ErrorCode::NotConnected); | |
| } | |
| Size bytesSent = static_cast<Size>(rc); | |
| if (bytesSent < headerSize.value()) | |
| { | |
| return std::unexpected(ErrorCode::NotConnected); | |
| } |
822e0bb to
d8f3878
Compare
- Introduced `ChannelUnixUdpServer` class inheriting from `ChannelUnixSocketServer` to handle UDP-specific operations. - Implemented methods for creating a UDP server socket, handling incoming datagrams, and sending buffers to clients. - Added packet size handling logic in `getPacketSize` and `sendBuffer` methods. - Enhanced error handling for socket operations and added constants for maximum UDP payload size. - Updated `ChannelUnixSocketServer` to manage client connections and data processing. - Refactored `ChannelUnixTcpServer` to reduce code duplication by reusing common functionalities from `ChannelUnixSocketServer`.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 29 out of 30 changed files in this pull request and generated 16 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| configuration.packetSizeHeader, | ||
| datagram.data(), datagram.size(), size); | ||
| if (!headerResult) | ||
| return std::unexpected(headerResult.error()); |
There was a problem hiding this comment.
The function uses std::unexpected but should use the Unexpected wrapper for consistency with the rest of the codebase. All other error returns in this file use Unexpected(ErrorCode::...).
| // Return the number of payload bytes sent (excluding the header) | ||
| return static_cast<Size>(sent) - headerBytes; |
There was a problem hiding this comment.
The sendBuffer function does not verify that sendto() sent all bytes. If sendto() returns a value less than datagram.size(), the subtraction on line 260 will produce an incorrect result. Additionally, if sendto() sends fewer bytes than the header size, the subtraction will cause an integer underflow. The function should check that sent == datagram.size() before returning.
| // Return the number of payload bytes sent (excluding the header) | |
| return static_cast<Size>(sent) - headerBytes; | |
| // Ensure the entire datagram (header + payload) was sent. | |
| if (static_cast<size_t>(sent) != datagram.size()) | |
| return std::unexpected(ErrorCode::SendFailed); | |
| // Return the number of payload bytes sent (excluding the header) | |
| return size; |
| { | ||
| for (Socket fd : socketsToClose) | ||
| { | ||
| close(fd); |
There was a problem hiding this comment.
The close() call should use the :: prefix to call the global function consistently with the rest of the codebase. Other parts of the code use ::close() (e.g., lines 73, 87, 94, 203).
| close(fd); | |
| ::close(fd); |
| fcntl(sock, F_SETFL, flags | O_NONBLOCK); | ||
|
|
There was a problem hiding this comment.
The return value of fcntl() is not checked. If F_GETFL fails, flags will contain -1, and the subsequent F_SETFL call will set invalid flags. Both fcntl calls should check for errors and handle them appropriately.
| fcntl(sock, F_SETFL, flags | O_NONBLOCK); | |
| if (flags == -1) | |
| { | |
| ::close(sock); | |
| startPromise->set_value(ErrorCode::CantCreateSocket); | |
| return InvalidFileDescriptor; | |
| } | |
| if (fcntl(sock, F_SETFL, flags | O_NONBLOCK) == -1) | |
| { | |
| ::close(sock); | |
| startPromise->set_value(ErrorCode::CantCreateSocket); | |
| return InvalidFileDescriptor; | |
| } |
| sockaddr_in serverAddress{}; | ||
| serverAddress.sin_family = AF_INET; | ||
| serverAddress.sin_port = htons(configuration.port); | ||
| inet_pton(AF_INET, configuration.address.c_str(), &serverAddress.sin_addr); |
There was a problem hiding this comment.
The return value of inet_pton() is not checked. If it returns 0 (invalid format) or -1 (error), the connect() will likely fail with an incorrect address, but the error message will not indicate the root cause. The return value should be checked, and if it's not 1 (success), the socket should be closed and an appropriate error returned.
| inet_pton(AF_INET, configuration.address.c_str(), &serverAddress.sin_addr); | |
| int ptonResult = ::inet_pton(AF_INET, configuration.address.c_str(), &serverAddress.sin_addr); | |
| if (ptonResult != 1) | |
| { | |
| ::close(sock); | |
| return ErrorCode::NotConnected; | |
| } |
| Size size) | ||
| { | ||
| if (!isStarted()) | ||
| return std::unexpected(ErrorCode::NotConnected); |
There was a problem hiding this comment.
The function uses std::unexpected but should use the Unexpected wrapper for consistency with the rest of the codebase. All other error returns in this file use Unexpected(ErrorCode::...).
|
|
||
| auto addrIt = clientIdToSockAddr.find(clientId); | ||
| if (addrIt == clientIdToSockAddr.end()) | ||
| return std::unexpected(ErrorCode::UnknownClient); |
There was a problem hiding this comment.
The function uses std::unexpected but should use the Unexpected wrapper for consistency with the rest of the codebase. All other error returns in this file use Unexpected(ErrorCode::...).
| fcntl(serverSocket, F_SETFL, flags | O_NONBLOCK); | ||
|
|
There was a problem hiding this comment.
The return value of fcntl() is not checked. If F_GETFL fails, flags will contain -1, and the subsequent F_SETFL call will set invalid flags. Both fcntl calls should check for errors and handle them appropriately.
| fcntl(serverSocket, F_SETFL, flags | O_NONBLOCK); | |
| if (flags == -1) | |
| { | |
| ::close(serverSocket); | |
| startPromise->set_value(ErrorCode::CantCreateSocket); | |
| return InvalidFileDescriptor; | |
| } | |
| if (fcntl(serverSocket, F_SETFL, flags | O_NONBLOCK) == -1) | |
| { | |
| ::close(serverSocket); | |
| startPromise->set_value(ErrorCode::CantCreateSocket); | |
| return InvalidFileDescriptor; | |
| } |
| } | ||
|
|
||
| int opt = 1; | ||
| if (setsockopt(serverSocket, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) |
There was a problem hiding this comment.
The setsockopt() call should use the :: prefix to call the global function consistently with the rest of the codebase. Other system calls like ::bind(), ::listen(), ::close() consistently use the :: prefix.
| if (setsockopt(serverSocket, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) | |
| if (::setsockopt(serverSocket, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) |
| ClientId clientId; | ||
| auto addrIt = addressToClientId.find(addrKey); | ||
| if (addrIt == addressToClientId.end()) | ||
| { | ||
| clientId = configuration.clientsRepo->getNextClientId(); | ||
| addressToClientId[addrKey] = clientId; | ||
| clientIdToSockAddr[clientId] = peerAddr; | ||
|
|
||
| // Register in the base map (buffer unused) so that | ||
| // publishClients() iteration works unchanged. | ||
| ClientFullInfo info; | ||
| info.id = clientId; | ||
| info.address = ipStr; | ||
| info.port = peerPort; | ||
| // No ReadBuffer needed – UDP datagrams go directly to | ||
| // the deserializer. isDirty stays false forever so | ||
| // processBuffers() is a no-op for UDP entries. | ||
| socketToClientInfoMapFull.insert({static_cast<Socket>(clientId), std::move(info)}); | ||
| publishClients(); | ||
| } | ||
| else | ||
| { | ||
| clientId = addrIt->second; | ||
| } |
There was a problem hiding this comment.
The addressToClientId and clientIdToSockAddr maps are accessed from both the server thread (in handlePollEvents which adds entries) and potentially from other threads (in sendBuffer which reads entries). These maps are not protected by any mutex, which could lead to data races if sendBuffer is called from a different thread than the server's work thread. The base class's clientIdToChannelClient map has the same issue and may require synchronization.
No description provided.