Add UnixFileChannel#51
Conversation
There was a problem hiding this comment.
Pull Request Overview
This PR adds UnixFileChannel functionality to enable communication via Unix file descriptors (pipes). The implementation provides a new abstraction layer for file descriptor-based channels and a concrete pipe implementation.
- Introduces
ChannelUnixFilebase class for file descriptor-based communication - Adds
ChannelUnixPipeImplfor Unix pipe communication - Unifies channel interfaces by removing
ChannelTxOneToManyand usingChannelTxconsistently - Adds comprehensive test coverage for the new pipe channel functionality
Reviewed Changes
Copilot reviewed 14 out of 14 changed files in this pull request and generated 10 comments.
Show a summary per file
| File | Description |
|---|---|
| cpp/modules/l4/include/Channel.h | Moves PacketHeaderSize enum to this file and consolidates channel TX interfaces |
| cpp/modules/l4/include/data.h | Changes ClientId from int to uint16_t and adds InvalidClientId constant |
| cpp/modules/l4/include/impl/ChannelUnixFile.h | New base class for file descriptor-based channel communication |
| cpp/modules/l4/include/impl/ChannelUnixPipeImpl.h | New header for Unix pipe channel implementation |
| cpp/modules/l4/include/impl/ChannelUnixTcp.h | Updates to use unified channel interface and new file descriptor types |
| cpp/modules/l4/include/impl/ClientsRepoImpl.h | Initializes client ID counter to start after InvalidClientId |
| cpp/modules/l4/src/impl/ChannelUnixFile.cpp | Implements file descriptor send/receive operations with packet framing |
| cpp/modules/l4/src/impl/ChannelUnixPipeImpl.cpp | Implements bidirectional pipe-based communication channel |
| cpp/modules/l4/src/impl/ChannelUnixTcp.cpp | Updates TCP channel to use unified interface and adds clientId parameter |
| cpp/modules/l4/test/TestClasses.h | Extracts common test deserializer class for reuse across test files |
| cpp/modules/l4/test/test_ChannelUnixTcp.cpp | Updates test to use extracted test class and new send API signature |
| cpp/modules/l4/test/test_ChannelUnixPipeImpl.cpp | New comprehensive test suite for pipe channel functionality |
| cpp/modules/l4/CMakeLists.txt | Adds new source and test files to build configuration |
| .vscode/tasks.json | Enhances task presentation settings for better debugging experience |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| running.store(true); | ||
| // Create pipes | ||
| int pipeFds[2]; | ||
| if (pipe(pipeFds) == -1) | ||
| { | ||
| startPromise->set_value(std::unexpected(ErrorCode::CantCreateSocket)); | ||
| return; | ||
| } | ||
| rxFds.readFd = pipeFds[0]; | ||
| rxFds.writeFd = pipeFds[1]; | ||
| if (pipe(pipeFds) == -1) | ||
| { | ||
| startPromise->set_value(std::unexpected(ErrorCode::CantCreateSocket)); | ||
| return; | ||
| } |
There was a problem hiding this comment.
Potential race condition. The running flag is set to true immediately at the start of workThreadFunction, but if pipe creation fails (lines 81-92), the function returns early without setting running back to false. This leaves the isStarted() method returning true even though the channel failed to start, which could prevent subsequent start() attempts.
| #include "impl/ChannelUnixFile.h" | ||
|
|
||
| #include <unistd.h> | ||
| #include <iostream> |
There was a problem hiding this comment.
Missing required includes for htonl, strerror, and std::memcpy. The file uses htonl (line 60) which requires <arpa/inet.h> or <netinet/in.h>, strerror (lines 72, 109, 118) which requires <cstring> or <string.h>, and std::memcpy (line 61) which requires <cstring>.
| #include <iostream> | |
| #include <iostream> | |
| #include <arpa/inet.h> | |
| #include <cstring> |
| break; | ||
| case PacketHeaderSize::Size2Bytes: | ||
| packetSizeReadResult = ::read(fds.readFd, buffer, 2); | ||
| packetSize = buffer[1] | (buffer[0] << 8); |
There was a problem hiding this comment.
Inconsistent byte order handling for 2-byte packet size. In sendBuffer (line 55-56), the size is sent in little-endian format (buffer[0] = size & 0xFF; buffer[1] = (size >> 8) & 0xFF), but in readBufferBlocking, it's read as big-endian (buffer[1] | (buffer[0] << 8)). This will cause data corruption. The read operation should match the write operation: packetSize = buffer[0] | (buffer[1] << 8);
| packetSize = buffer[1] | (buffer[0] << 8); | |
| packetSize = buffer[0] | (buffer[1] << 8); |
There was a problem hiding this comment.
Pull Request Overview
Copilot reviewed 15 out of 15 changed files in this pull request and generated 8 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| case PacketHeaderSize::Size1Byte: | ||
| buffer[0] = static_cast<uint8_t>(size); | ||
| break; | ||
| case PacketHeaderSize::Size2Bytes: | ||
| buffer[1] = size & 0xFF; | ||
| buffer[0] = (size >> 8) & 0xFF; | ||
| break; | ||
| case PacketHeaderSize::Size4Bytes: | ||
| buffer[3] = size & 0xFF; | ||
| buffer[2] = (size >> 8) & 0xFF; | ||
| buffer[1] = (size >> 16) & 0xFF; | ||
| buffer[0] = (size >> 24) & 0xFF; | ||
| break; |
There was a problem hiding this comment.
Bug: Missing packet size validation. The function should validate that the packet size can fit in the specified header size:
Size1Byte: size must be <= 255Size2Bytes: size must be <= 65535Size4Bytes: no limit for uint16_t Size type
Without this validation, silent truncation occurs when sizes exceed the header capacity, leading to incorrect packet parsing. The removed code in ChannelUnixTcp.cpp had this validation returning ErrorCode::PacketTooLarge.
| if (size < 4) { | ||
| throw std::invalid_argument("Size must be at least 4 to fit the header"); | ||
| } |
There was a problem hiding this comment.
Bug: Incorrect parameter validation. The check validates that size (the packet size to encode) is at least 4, but this doesn't make sense. The size parameter represents the packet data size being encoded into the header, which can be any value from 0 up to the header's maximum capacity. The buffer capacity is always 4 bytes (from the array declaration), so this check is unnecessary and validates the wrong thing.
| if (size < 4) { | |
| throw std::invalid_argument("Size must be at least 4 to fit the header"); | |
| } |
| { | ||
| private: | ||
| std::atomic_int nextId{0}; | ||
| std::atomic_int nextId{InvalidClientId + 1}; |
There was a problem hiding this comment.
Bug: Type mismatch between std::atomic_int and ClientId (now uint16_t). The atomic should be std::atomic<uint16_t> or std::atomic<ClientId> to match the return type. With the current implementation, the counter can overflow beyond 65535 (uint16_t max), causing truncation when returned as ClientId. This can lead to duplicate client IDs after 65536 clients.
| std::atomic_int nextId{InvalidClientId + 1}; | |
| std::atomic<ClientId> nextId{InvalidClientId + 1}; |
| running.store(true); | ||
| // Create pipes | ||
| int pipeFds[2]; | ||
| if (pipe(pipeFds) == -1) | ||
| { | ||
| startPromise->set_value(std::unexpected(ErrorCode::CantCreateSocket)); | ||
| return; |
There was a problem hiding this comment.
Bug: The running flag is set to true at the start of the function, but if pipe creation fails (lines 81-84 or 88-91), the function returns early without setting running back to false. This leaves the channel in an inconsistent state where isStarted() returns true but the channel is not actually running. The flag should only be set to true after successful initialization, or reset to false before returning on error.
| ssize_t result = ::write(fds.writeFd, packetSizeBuffer, headerSize); | ||
| if (result < 0) | ||
| { | ||
| std::cerr << "Error writing packet size to fd " << fds.writeFd << ": " << strerror(errno) << std::endl; |
There was a problem hiding this comment.
Bug: Missing include for strerror. The function strerror is used but <cstring> is not included. Add #include <cstring> to the includes at the top of the file.
| } | ||
|
|
||
| ::send(socket.value(), packetSizeBuffer, static_cast<uint8_t>(packetSizeHeader), 0); | ||
| uint8_t headerSize = setPacketSize(packetSizeHeader, packetSizeBuffer, sizeof(packetSizeBuffer)); |
There was a problem hiding this comment.
Bug: setPacketSize is being called with sizeof(packetSizeBuffer) (which is always 4) instead of the actual packet size. This will write the buffer size to the header instead of the message size, causing the receiver to read the wrong amount of data. Should be: setPacketSize(packetSizeHeader, packetSizeBuffer, size)
| uint8_t headerSize = setPacketSize(packetSizeHeader, packetSizeBuffer, sizeof(packetSizeBuffer)); | |
| uint8_t headerSize = setPacketSize(packetSizeHeader, packetSizeBuffer, size); |
| } | ||
| // Send the buffer over the file descriptor | ||
| uint8_t packetSizeBuffer[4] = {0}; | ||
| uint8_t headerSize = setPacketSize(config.packetSizeHeader, packetSizeBuffer, sizeof(packetSizeBuffer)); |
There was a problem hiding this comment.
Bug: setPacketSize is being called with sizeof(packetSizeBuffer) (which is always 4) instead of the actual packet size. This will write the buffer size to the header instead of the message size, causing the receiver to read the wrong amount of data. Should be: setPacketSize(config.packetSizeHeader, packetSizeBuffer, size)
| uint8_t headerSize = setPacketSize(config.packetSizeHeader, packetSizeBuffer, sizeof(packetSizeBuffer)); | |
| uint8_t headerSize = setPacketSize(config.packetSizeHeader, packetSizeBuffer, size); |
| break; | ||
| if (num_events < 0) | ||
| { | ||
| std::cerr << "Poll error: " << strerror(errno) << std::endl; |
There was a problem hiding this comment.
Bug: Missing include for strerror. The function strerror is used but <cstring> is not included. Add #include <cstring> to the includes at the top of the file.
There was a problem hiding this comment.
Pull Request Overview
Copilot reviewed 15 out of 15 changed files in this pull request and generated 9 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| } | ||
| fds.readFd = InvalidFileDescriptor; | ||
| fds.writeFd = InvalidFileDescriptor; | ||
| rxFds.readFd = InvalidFileDescriptor; | ||
| rxFds.writeFd = InvalidFileDescriptor; | ||
| txFds.readFd = InvalidFileDescriptor; | ||
| txFds.writeFd = InvalidFileDescriptor; |
There was a problem hiding this comment.
The closeDescriptors() method doesn't check isDescriptorOwned before resetting fds, rxFds, and txFds to InvalidFileDescriptor (lines 158-163). This means even when the descriptors aren't owned (and thus shouldn't be closed), the fields are still reset. This could cause issues if the caller expects to reuse these descriptors. Consider only resetting the fields when isDescriptorOwned is true.
| } | |
| fds.readFd = InvalidFileDescriptor; | |
| fds.writeFd = InvalidFileDescriptor; | |
| rxFds.readFd = InvalidFileDescriptor; | |
| rxFds.writeFd = InvalidFileDescriptor; | |
| txFds.readFd = InvalidFileDescriptor; | |
| txFds.writeFd = InvalidFileDescriptor; | |
| fds.readFd = InvalidFileDescriptor; | |
| fds.writeFd = InvalidFileDescriptor; | |
| rxFds.readFd = InvalidFileDescriptor; | |
| rxFds.writeFd = InvalidFileDescriptor; | |
| txFds.readFd = InvalidFileDescriptor; | |
| txFds.writeFd = InvalidFileDescriptor; | |
| } |
| switch (config.packetSizeHeader) | ||
| { | ||
| case PacketHeaderSize::Size1Byte: | ||
| packetSizeReadResult = ::read(fds.readFd, buffer, 1); | ||
| packetSize = buffer[0]; | ||
| break; | ||
| case PacketHeaderSize::Size2Bytes: | ||
| packetSizeReadResult = ::read(fds.readFd, buffer, 2); | ||
| packetSize = buffer[1] | (buffer[0] << 8); | ||
| break; | ||
| case PacketHeaderSize::Size4Bytes: | ||
| packetSizeReadResult = ::read(fds.readFd, buffer, 4); | ||
| packetSize = buffer[3] | | ||
| (buffer[2] << 8) | | ||
| (buffer[1] << 16) | | ||
| (buffer[0] << 24); | ||
| break; | ||
| } |
There was a problem hiding this comment.
The packet size reading logic duplicates similar logic from ChannelUnixTcp.cpp. Consider extracting this into a shared helper function (e.g., readPacketSize) similar to how setPacketSize was created for writing, to improve code maintainability and reduce duplication.
| const styxlib::StyxBuffer buffer, | ||
| styxlib::Size size) override | ||
| { | ||
| totalReceivedBytes += size; |
There was a problem hiding this comment.
The totalReceivedBytes field is modified in handleBuffer (line 22) which may be called from multiple threads, but it's not protected by any synchronization mechanism. Consider making it std::atomic<uint32_t> to ensure thread-safe updates.
| if (receivedBytesPromise) { | ||
| receivedBytesPromise->set_value(size); | ||
| receivedBytesPromise = nullptr; |
There was a problem hiding this comment.
The receivedBytesPromise is accessed and modified without synchronization. If handleBuffer can be called from multiple threads, this could lead to race conditions. Consider adding a mutex to protect access to this field.
| this->fds.readFd = fds.readFd; | ||
| this->fds.writeFd = fds.writeFd; | ||
| isDescriptorOwned = false; |
There was a problem hiding this comment.
Direct assignment to fds.readFd and fds.writeFd without synchronization could cause race conditions if stop() or other methods access these fields concurrently. Consider using atomic operations or mutex protection for these file descriptor assignments.
| if (packetSize > 0xFFFFFFFF) { | ||
| return std::unexpected(ErrorCode::PacketTooLarge); | ||
| } |
There was a problem hiding this comment.
The condition packetSize > 0xFFFFFFFF will always be false if Size is defined as a 32-bit unsigned integer (uint32_t), since a 32-bit value cannot exceed 0xFFFFFFFF. Either remove this check or change it to be meaningful (e.g., if Size can be 64-bit). If Size is larger than 32-bit, this check makes sense; otherwise, it's dead code.
| if (packetSize > 0xFFFFFFFF) { | |
| return std::unexpected(ErrorCode::PacketTooLarge); | |
| } |
| include/Channel.h | ||
| include/ChannelUnixTcp.h | ||
| include/impl/ChannelUnixTcp.h | ||
| include/impl/ChannelUnixPipeImpl.h | ||
| include/impl/ChannelUnixFile.h |
There was a problem hiding this comment.
Header files should be listed under PRIVATE or in a separate set, not under PUBLIC in target_sources. The PUBLIC keyword here is misleading - these headers should be managed through target_include_directories instead. Consider moving these to a FILE_SET HEADERS (CMake 3.23+) or removing the PUBLIC keyword.
| this->startPromise = nullptr; | ||
| if (this->serverThread.joinable()) | ||
| { | ||
| this->serverThread.join(); | ||
| } |
There was a problem hiding this comment.
Setting startPromise to nullptr before the thread joins could cause issues if the worker thread tries to access it. The worker thread at line 85, 94, and 105 uses startPromise. Although stopRequested is set to true first, there's still a potential race condition. Consider joining the thread first, then clearing the promise.
| this->startPromise = nullptr; | |
| if (this->serverThread.joinable()) | |
| { | |
| this->serverThread.join(); | |
| } | |
| if (this->serverThread.joinable()) | |
| { | |
| this->serverThread.join(); | |
| } | |
| this->startPromise = nullptr; |
| using Tag = uint16_t; | ||
| using Type = uint16_t; | ||
| using ClientId = int; | ||
| using ClientId = uint16_t; |
There was a problem hiding this comment.
Changing ClientId from int to uint16_t is a breaking API change that limits the maximum number of clients from ~2 billion to 65,535. Consider whether this limitation is acceptable for your use case, and ensure this change is properly documented in release notes. If backward compatibility is needed, consider versioning the API.
| using ClientId = uint16_t; | |
| using ClientId = uint32_t; |
No description provided.