Skip to content

Add UnixFileChannel#51

Merged
vshcryabets merged 7 commits into
masterfrom
update/cpp-file-channel2
Nov 17, 2025
Merged

Add UnixFileChannel#51
vshcryabets merged 7 commits into
masterfrom
update/cpp-file-channel2

Conversation

@vshcryabets
Copy link
Copy Markdown
Owner

No description provided.

@vshcryabets vshcryabets self-assigned this Nov 17, 2025
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR adds UnixFileChannel functionality to enable communication via Unix file descriptors (pipes). The implementation provides a new abstraction layer for file descriptor-based channels and a concrete pipe implementation.

  • Introduces ChannelUnixFile base class for file descriptor-based communication
  • Adds ChannelUnixPipeImpl for Unix pipe communication
  • Unifies channel interfaces by removing ChannelTxOneToMany and using ChannelTx consistently
  • Adds comprehensive test coverage for the new pipe channel functionality

Reviewed Changes

Copilot reviewed 14 out of 14 changed files in this pull request and generated 10 comments.

Show a summary per file
File Description
cpp/modules/l4/include/Channel.h Moves PacketHeaderSize enum to this file and consolidates channel TX interfaces
cpp/modules/l4/include/data.h Changes ClientId from int to uint16_t and adds InvalidClientId constant
cpp/modules/l4/include/impl/ChannelUnixFile.h New base class for file descriptor-based channel communication
cpp/modules/l4/include/impl/ChannelUnixPipeImpl.h New header for Unix pipe channel implementation
cpp/modules/l4/include/impl/ChannelUnixTcp.h Updates to use unified channel interface and new file descriptor types
cpp/modules/l4/include/impl/ClientsRepoImpl.h Initializes client ID counter to start after InvalidClientId
cpp/modules/l4/src/impl/ChannelUnixFile.cpp Implements file descriptor send/receive operations with packet framing
cpp/modules/l4/src/impl/ChannelUnixPipeImpl.cpp Implements bidirectional pipe-based communication channel
cpp/modules/l4/src/impl/ChannelUnixTcp.cpp Updates TCP channel to use unified interface and adds clientId parameter
cpp/modules/l4/test/TestClasses.h Extracts common test deserializer class for reuse across test files
cpp/modules/l4/test/test_ChannelUnixTcp.cpp Updates test to use extracted test class and new send API signature
cpp/modules/l4/test/test_ChannelUnixPipeImpl.cpp New comprehensive test suite for pipe channel functionality
cpp/modules/l4/CMakeLists.txt Adds new source and test files to build configuration
.vscode/tasks.json Enhances task presentation settings for better debugging experience

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +78 to +92
running.store(true);
// Create pipes
int pipeFds[2];
if (pipe(pipeFds) == -1)
{
startPromise->set_value(std::unexpected(ErrorCode::CantCreateSocket));
return;
}
rxFds.readFd = pipeFds[0];
rxFds.writeFd = pipeFds[1];
if (pipe(pipeFds) == -1)
{
startPromise->set_value(std::unexpected(ErrorCode::CantCreateSocket));
return;
}
Copy link

Copilot AI Nov 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potential race condition. The running flag is set to true immediately at the start of workThreadFunction, but if pipe creation fails (lines 81-92), the function returns early without setting running back to false. This leaves the isStarted() method returning true even though the channel failed to start, which could prevent subsequent start() attempts.

Copilot uses AI. Check for mistakes.
Comment thread cpp/modules/l4/src/impl/ChannelUnixFile.cpp Outdated
Comment thread cpp/modules/l4/CMakeLists.txt Outdated
Comment thread cpp/modules/l4/src/impl/ChannelUnixPipeImpl.cpp
#include "impl/ChannelUnixFile.h"

#include <unistd.h>
#include <iostream>
Copy link

Copilot AI Nov 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing required includes for htonl, strerror, and std::memcpy. The file uses htonl (line 60) which requires <arpa/inet.h> or <netinet/in.h>, strerror (lines 72, 109, 118) which requires <cstring> or <string.h>, and std::memcpy (line 61) which requires <cstring>.

Suggested change
#include <iostream>
#include <iostream>
#include <arpa/inet.h>
#include <cstring>

Copilot uses AI. Check for mistakes.
break;
case PacketHeaderSize::Size2Bytes:
packetSizeReadResult = ::read(fds.readFd, buffer, 2);
packetSize = buffer[1] | (buffer[0] << 8);
Copy link

Copilot AI Nov 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inconsistent byte order handling for 2-byte packet size. In sendBuffer (line 55-56), the size is sent in little-endian format (buffer[0] = size & 0xFF; buffer[1] = (size >> 8) & 0xFF), but in readBufferBlocking, it's read as big-endian (buffer[1] | (buffer[0] << 8)). This will cause data corruption. The read operation should match the write operation: packetSize = buffer[0] | (buffer[1] << 8);

Suggested change
packetSize = buffer[1] | (buffer[0] << 8);
packetSize = buffer[0] | (buffer[1] << 8);

Copilot uses AI. Check for mistakes.
Comment thread cpp/modules/l4/src/impl/ChannelUnixPipeImpl.cpp Outdated
Comment thread cpp/modules/l4/src/impl/ChannelUnixPipeImpl.cpp Outdated
Comment thread cpp/modules/l4/src/impl/ChannelUnixPipeImpl.cpp Outdated
Comment thread cpp/modules/l4/src/impl/ChannelUnixFile.cpp Outdated
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

Copilot reviewed 15 out of 15 changed files in this pull request and generated 8 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +13 to +25
case PacketHeaderSize::Size1Byte:
buffer[0] = static_cast<uint8_t>(size);
break;
case PacketHeaderSize::Size2Bytes:
buffer[1] = size & 0xFF;
buffer[0] = (size >> 8) & 0xFF;
break;
case PacketHeaderSize::Size4Bytes:
buffer[3] = size & 0xFF;
buffer[2] = (size >> 8) & 0xFF;
buffer[1] = (size >> 16) & 0xFF;
buffer[0] = (size >> 24) & 0xFF;
break;
Copy link

Copilot AI Nov 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Missing packet size validation. The function should validate that the packet size can fit in the specified header size:

  • Size1Byte: size must be <= 255
  • Size2Bytes: size must be <= 65535
  • Size4Bytes: no limit for uint16_t Size type

Without this validation, silent truncation occurs when sizes exceed the header capacity, leading to incorrect packet parsing. The removed code in ChannelUnixTcp.cpp had this validation returning ErrorCode::PacketTooLarge.

Copilot uses AI. Check for mistakes.
Comment thread cpp/modules/l4/src/Channel.cpp Outdated
Comment on lines +8 to +10
if (size < 4) {
throw std::invalid_argument("Size must be at least 4 to fit the header");
}
Copy link

Copilot AI Nov 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Incorrect parameter validation. The check validates that size (the packet size to encode) is at least 4, but this doesn't make sense. The size parameter represents the packet data size being encoded into the header, which can be any value from 0 up to the header's maximum capacity. The buffer capacity is always 4 bytes (from the array declaration), so this check is unnecessary and validates the wrong thing.

Suggested change
if (size < 4) {
throw std::invalid_argument("Size must be at least 4 to fit the header");
}

Copilot uses AI. Check for mistakes.
{
private:
std::atomic_int nextId{0};
std::atomic_int nextId{InvalidClientId + 1};
Copy link

Copilot AI Nov 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Type mismatch between std::atomic_int and ClientId (now uint16_t). The atomic should be std::atomic<uint16_t> or std::atomic<ClientId> to match the return type. With the current implementation, the counter can overflow beyond 65535 (uint16_t max), causing truncation when returned as ClientId. This can lead to duplicate client IDs after 65536 clients.

Suggested change
std::atomic_int nextId{InvalidClientId + 1};
std::atomic<ClientId> nextId{InvalidClientId + 1};

Copilot uses AI. Check for mistakes.
Comment on lines +78 to +84
running.store(true);
// Create pipes
int pipeFds[2];
if (pipe(pipeFds) == -1)
{
startPromise->set_value(std::unexpected(ErrorCode::CantCreateSocket));
return;
Copy link

Copilot AI Nov 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: The running flag is set to true at the start of the function, but if pipe creation fails (lines 81-84 or 88-91), the function returns early without setting running back to false. This leaves the channel in an inconsistent state where isStarted() returns true but the channel is not actually running. The flag should only be set to true after successful initialization, or reset to false before returning on error.

Copilot uses AI. Check for mistakes.
ssize_t result = ::write(fds.writeFd, packetSizeBuffer, headerSize);
if (result < 0)
{
std::cerr << "Error writing packet size to fd " << fds.writeFd << ": " << strerror(errno) << std::endl;
Copy link

Copilot AI Nov 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Missing include for strerror. The function strerror is used but <cstring> is not included. Add #include <cstring> to the includes at the top of the file.

Copilot uses AI. Check for mistakes.
}

::send(socket.value(), packetSizeBuffer, static_cast<uint8_t>(packetSizeHeader), 0);
uint8_t headerSize = setPacketSize(packetSizeHeader, packetSizeBuffer, sizeof(packetSizeBuffer));
Copy link

Copilot AI Nov 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: setPacketSize is being called with sizeof(packetSizeBuffer) (which is always 4) instead of the actual packet size. This will write the buffer size to the header instead of the message size, causing the receiver to read the wrong amount of data. Should be: setPacketSize(packetSizeHeader, packetSizeBuffer, size)

Suggested change
uint8_t headerSize = setPacketSize(packetSizeHeader, packetSizeBuffer, sizeof(packetSizeBuffer));
uint8_t headerSize = setPacketSize(packetSizeHeader, packetSizeBuffer, size);

Copilot uses AI. Check for mistakes.
}
// Send the buffer over the file descriptor
uint8_t packetSizeBuffer[4] = {0};
uint8_t headerSize = setPacketSize(config.packetSizeHeader, packetSizeBuffer, sizeof(packetSizeBuffer));
Copy link

Copilot AI Nov 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: setPacketSize is being called with sizeof(packetSizeBuffer) (which is always 4) instead of the actual packet size. This will write the buffer size to the header instead of the message size, causing the receiver to read the wrong amount of data. Should be: setPacketSize(config.packetSizeHeader, packetSizeBuffer, size)

Suggested change
uint8_t headerSize = setPacketSize(config.packetSizeHeader, packetSizeBuffer, sizeof(packetSizeBuffer));
uint8_t headerSize = setPacketSize(config.packetSizeHeader, packetSizeBuffer, size);

Copilot uses AI. Check for mistakes.
break;
if (num_events < 0)
{
std::cerr << "Poll error: " << strerror(errno) << std::endl;
Copy link

Copilot AI Nov 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Missing include for strerror. The function strerror is used but <cstring> is not included. Add #include <cstring> to the includes at the top of the file.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

Copilot reviewed 15 out of 15 changed files in this pull request and generated 9 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +157 to +163
}
fds.readFd = InvalidFileDescriptor;
fds.writeFd = InvalidFileDescriptor;
rxFds.readFd = InvalidFileDescriptor;
rxFds.writeFd = InvalidFileDescriptor;
txFds.readFd = InvalidFileDescriptor;
txFds.writeFd = InvalidFileDescriptor;
Copy link

Copilot AI Nov 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The closeDescriptors() method doesn't check isDescriptorOwned before resetting fds, rxFds, and txFds to InvalidFileDescriptor (lines 158-163). This means even when the descriptors aren't owned (and thus shouldn't be closed), the fields are still reset. This could cause issues if the caller expects to reuse these descriptors. Consider only resetting the fields when isDescriptorOwned is true.

Suggested change
}
fds.readFd = InvalidFileDescriptor;
fds.writeFd = InvalidFileDescriptor;
rxFds.readFd = InvalidFileDescriptor;
rxFds.writeFd = InvalidFileDescriptor;
txFds.readFd = InvalidFileDescriptor;
txFds.writeFd = InvalidFileDescriptor;
fds.readFd = InvalidFileDescriptor;
fds.writeFd = InvalidFileDescriptor;
rxFds.readFd = InvalidFileDescriptor;
rxFds.writeFd = InvalidFileDescriptor;
txFds.readFd = InvalidFileDescriptor;
txFds.writeFd = InvalidFileDescriptor;
}

Copilot uses AI. Check for mistakes.
Comment on lines +63 to +80
switch (config.packetSizeHeader)
{
case PacketHeaderSize::Size1Byte:
packetSizeReadResult = ::read(fds.readFd, buffer, 1);
packetSize = buffer[0];
break;
case PacketHeaderSize::Size2Bytes:
packetSizeReadResult = ::read(fds.readFd, buffer, 2);
packetSize = buffer[1] | (buffer[0] << 8);
break;
case PacketHeaderSize::Size4Bytes:
packetSizeReadResult = ::read(fds.readFd, buffer, 4);
packetSize = buffer[3] |
(buffer[2] << 8) |
(buffer[1] << 16) |
(buffer[0] << 24);
break;
}
Copy link

Copilot AI Nov 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The packet size reading logic duplicates similar logic from ChannelUnixTcp.cpp. Consider extracting this into a shared helper function (e.g., readPacketSize) similar to how setPacketSize was created for writing, to improve code maintainability and reduce duplication.

Copilot uses AI. Check for mistakes.
const styxlib::StyxBuffer buffer,
styxlib::Size size) override
{
totalReceivedBytes += size;
Copy link

Copilot AI Nov 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The totalReceivedBytes field is modified in handleBuffer (line 22) which may be called from multiple threads, but it's not protected by any synchronization mechanism. Consider making it std::atomic<uint32_t> to ensure thread-safe updates.

Copilot uses AI. Check for mistakes.
Comment on lines +23 to +25
if (receivedBytesPromise) {
receivedBytesPromise->set_value(size);
receivedBytesPromise = nullptr;
Copy link

Copilot AI Nov 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The receivedBytesPromise is accessed and modified without synchronization. If handleBuffer can be called from multiple threads, this could lead to race conditions. Consider adding a mutex to protect access to this field.

Copilot uses AI. Check for mistakes.
Comment on lines +25 to +27
this->fds.readFd = fds.readFd;
this->fds.writeFd = fds.writeFd;
isDescriptorOwned = false;
Copy link

Copilot AI Nov 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Direct assignment to fds.readFd and fds.writeFd without synchronization could cause race conditions if stop() or other methods access these fields concurrently. Consider using atomic operations or mutex protection for these file descriptor assignments.

Copilot uses AI. Check for mistakes.
Comment on lines +28 to +30
if (packetSize > 0xFFFFFFFF) {
return std::unexpected(ErrorCode::PacketTooLarge);
}
Copy link

Copilot AI Nov 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The condition packetSize > 0xFFFFFFFF will always be false if Size is defined as a 32-bit unsigned integer (uint32_t), since a 32-bit value cannot exceed 0xFFFFFFFF. Either remove this check or change it to be meaningful (e.g., if Size can be 64-bit). If Size is larger than 32-bit, this check makes sense; otherwise, it's dead code.

Suggested change
if (packetSize > 0xFFFFFFFF) {
return std::unexpected(ErrorCode::PacketTooLarge);
}

Copilot uses AI. Check for mistakes.
Comment thread cpp/modules/l4/CMakeLists.txt Outdated
Comment on lines +10 to +16
include/Channel.h
include/ChannelUnixTcp.h
include/impl/ChannelUnixTcp.h
include/impl/ChannelUnixPipeImpl.h
include/impl/ChannelUnixFile.h
Copy link

Copilot AI Nov 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Header files should be listed under PRIVATE or in a separate set, not under PUBLIC in target_sources. The PUBLIC keyword here is misleading - these headers should be managed through target_include_directories instead. Consider moving these to a FILE_SET HEADERS (CMake 3.23+) or removing the PUBLIC keyword.

Copilot uses AI. Check for mistakes.
Comment on lines +60 to +64
this->startPromise = nullptr;
if (this->serverThread.joinable())
{
this->serverThread.join();
}
Copy link

Copilot AI Nov 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Setting startPromise to nullptr before the thread joins could cause issues if the worker thread tries to access it. The worker thread at line 85, 94, and 105 uses startPromise. Although stopRequested is set to true first, there's still a potential race condition. Consider joining the thread first, then clearing the promise.

Suggested change
this->startPromise = nullptr;
if (this->serverThread.joinable())
{
this->serverThread.join();
}
if (this->serverThread.joinable())
{
this->serverThread.join();
}
this->startPromise = nullptr;

Copilot uses AI. Check for mistakes.
using Tag = uint16_t;
using Type = uint16_t;
using ClientId = int;
using ClientId = uint16_t;
Copy link

Copilot AI Nov 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changing ClientId from int to uint16_t is a breaking API change that limits the maximum number of clients from ~2 billion to 65,535. Consider whether this limitation is acceptable for your use case, and ensure this change is properly documented in release notes. If backward compatibility is needed, consider versioning the API.

Suggested change
using ClientId = uint16_t;
using ClientId = uint32_t;

Copilot uses AI. Check for mistakes.
@vshcryabets vshcryabets marked this pull request as ready for review November 17, 2025 21:26
@vshcryabets vshcryabets marked this pull request as draft November 17, 2025 21:37
@vshcryabets vshcryabets marked this pull request as ready for review November 17, 2025 21:37
@vshcryabets vshcryabets merged commit 73b811c into master Nov 17, 2025
3 checks passed
@vshcryabets vshcryabets deleted the update/cpp-file-channel2 branch November 17, 2025 21:43
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants