Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 36 additions & 3 deletions src/corosio/src/detail/epoll/sockets.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,41 @@ class epoll_socket_impl
system::error_code*,
std::size_t*) override;

int native_handle() const noexcept { return fd_; }
bool is_open() const noexcept { return fd_ >= 0; }
/**
* @brief Shut down one or both directions of the underlying socket.
*
* @param what Specifies which direction to shut down:
* `socket::shutdown_receive` → receive (SHUT_RD),
* `socket::shutdown_send` → send (SHUT_WR),
* `socket::shutdown_both` → both (SHUT_RDWR).
* @return system::error_code Empty error code on success, otherwise an error code constructed from `errno` describing the failure.
*/
system::error_code shutdown(socket::shutdown_type what) noexcept override
{
int how;
switch (what)
{
case socket::shutdown_receive: how = SHUT_RD; break;
case socket::shutdown_send: how = SHUT_WR; break;
case socket::shutdown_both: how = SHUT_RDWR; break;
}
if (::shutdown(fd_, how) != 0)
return make_err(errno);
return {};
}

/**
* @brief Retrieve the underlying native file descriptor for this socket.
*
* @return int The native file descriptor, or -1 if the socket is not open.
*/
int native_handle() const noexcept { return fd_; }
/**
* @brief Checks whether the underlying socket file descriptor is valid/open.
*
* @return `true` if the socket is open (file descriptor is greater than or equal to 0), `false` otherwise.
*/
bool is_open() const noexcept { return fd_ >= 0; }
void cancel() noexcept;
void close_socket() noexcept;
void set_socket(int fd) noexcept { fd_ = fd; }
Expand Down Expand Up @@ -726,4 +759,4 @@ work_finished() noexcept

#endif // BOOST_COROSIO_BACKEND_EPOLL

#endif // BOOST_COROSIO_DETAIL_EPOLL_SOCKETS_HPP
#endif // BOOST_COROSIO_DETAIL_EPOLL_SOCKETS_HPP
42 changes: 40 additions & 2 deletions src/corosio/src/detail/iocp/sockets.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,19 @@ class win_socket_impl
internal_->read_some(h, d, buf, token, ec, bytes);
}

/**
* @brief Initiates an asynchronous write operation using the socket's internal state.
*
* Starts or schedules a write of the provided buffer sequence and will resume the
* awaiting coroutine when the operation completes or is cancelled.
*
* @param h Coroutine handle to resume on completion.
* @param d Executor used to perform the operation.
* @param buf Mutable buffer sequence containing data to write.
* @param token Stop token that can request cancellation of the operation.
* @param ec Pointer to receive the resulting error code on completion (may be null).
* @param bytes Pointer to receive the number of bytes written on completion (may be null).
*/
void write_some(
std::coroutine_handle<> h,
capy::any_executor_ref d,
Expand All @@ -225,7 +238,32 @@ class win_socket_impl
internal_->write_some(h, d, buf, token, ec, bytes);
}

win_socket_impl_internal* get_internal() const noexcept { return internal_.get(); }
/**
* @brief Shuts down parts of the underlying socket according to the specified shutdown type.
*
* @param what Specifies which direction of the connection to disable: receive, send, or both.
* @return system::error_code `{}` on success; otherwise an error code representing the WinSock error (from WSAGetLastError).
*/
system::error_code shutdown(socket::shutdown_type what) noexcept override
{
int how;
switch (what)
{
case socket::shutdown_receive: how = SD_RECEIVE; break;
case socket::shutdown_send: how = SD_SEND; break;
case socket::shutdown_both: how = SD_BOTH; break;
}
if (::shutdown(internal_->native_handle(), how) != 0)
return make_err(WSAGetLastError());
return {};
}

/**
* @brief Access the underlying internal socket implementation.
*
* @return win_socket_impl_internal* Pointer to the internal win_socket_impl_internal, or `nullptr` if no internal instance is present.
*/
win_socket_impl_internal* get_internal() const noexcept { return internal_.get(); }
};

//------------------------------------------------------------------------------
Expand Down Expand Up @@ -449,4 +487,4 @@ class win_sockets

#endif // BOOST_COROSIO_BACKEND_IOCP

#endif // BOOST_COROSIO_DETAIL_IOCP_SOCKETS_HPP
#endif // BOOST_COROSIO_DETAIL_IOCP_SOCKETS_HPP
48 changes: 45 additions & 3 deletions src/corosio/src/socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,22 @@ socket(
{
}

/**
* @brief Ensure the socket has an underlying implementation and open the native socket handle.
*
* Creates and stores the backend-specific socket implementation from the execution context
* and opens the associated native socket handle. If the socket is already open, the call
* returns immediately.
*
* @throws std::system_error If the underlying service fails to open the native socket; the
* implementation is released before the exception is thrown.
*/
void
socket::
open()
{
if (impl_)
return; // Already open
return;

auto& svc = ctx_->use_service<socket_service>();
auto& wrapper = svc.create_impl();
Expand All @@ -70,18 +80,34 @@ open()
}
}

/**
* @brief Releases the underlying socket implementation and resets the socket state.
*
* If the socket is not open, the call has no effect. Otherwise this releases the
* held implementation (closing the underlying resource) and clears the internal
* implementation pointer.
*/
void
socket::
close()
{
if (!impl_)
return; // Already closed
return;

auto* wrapper = static_cast<socket_impl_type*>(impl_);
wrapper->release();
impl_ = nullptr;
}

/**
* @brief Cancel all outstanding asynchronous operations on the socket.
*
* Requires that the socket implementation has been created (socket is open).
* After calling this, any pending asynchronous I/O initiated on the underlying
* socket will be requested to cancel.
*
* @pre impl_ must be non-null (socket must be open).
*/
void
socket::
cancel()
Expand All @@ -94,5 +120,21 @@ cancel()
#endif
}

/**
* @brief Shuts down one or both directions of the underlying socket.
*
* If the socket has an active implementation, delegates the shutdown request
* to that implementation; otherwise the call is a no-op.
*
* @param what Specifies which direction(s) to shut down (for example: read, write, or both).
*/
void
socket::
shutdown(shutdown_type what)
{
if (impl_)
get().shutdown(what);
}

} // namespace corosio
} // namespace boost
} // namespace boost
105 changes: 103 additions & 2 deletions test/unit/socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -679,7 +679,95 @@ struct socket_test
s2.close();
}

// Data Integrity
/**
* @brief Verifies that shutting down the send direction delivers remaining data and then produces EOF on the peer.
*
* @details Creates a socket pair, sends a short payload from one peer, calls shutdown on the sender's send direction,
* then confirms the receiver reads the payload and that a subsequent read completes with EOF.
*/

void
testShutdownSend()
{
io_context ioc;
auto [s1, s2] = test::make_socket_pair(ioc);

auto task = [](socket& a, socket& b) -> capy::task<>
{
// Write data then shutdown send
co_await a.write_some(capy::const_buffer("hello", 5));
a.shutdown(socket::shutdown_send);

// Read the data
char buf[32] = {};
auto [ec1, n1] = co_await b.read_some(
capy::mutable_buffer(buf, sizeof(buf)));
BOOST_TEST(!ec1);
BOOST_TEST_EQ(std::string_view(buf, n1), "hello");

// Next read should get EOF
auto [ec2, n2] = co_await b.read_some(
capy::mutable_buffer(buf, sizeof(buf)));
BOOST_TEST(ec2 == capy::cond::eof);
};
capy::run_async(ioc.get_executor())(task(s1, s2));

ioc.run();
s1.close();
s2.close();
}

/**
* @brief Verifies that shutting down the receive direction does not prevent the peer from sending.
*
* Creates a socket pair, performs a receive-direction shutdown on one endpoint, sends data
* from that endpoint, and asserts the peer receives the sent data without error.
*/
void
testShutdownReceive()
{
io_context ioc;
auto [s1, s2] = test::make_socket_pair(ioc);

auto task = [](socket& a, socket& b) -> capy::task<>
{
// Shutdown receive on b
b.shutdown(socket::shutdown_receive);

// b can still send
co_await b.write_some(capy::const_buffer("from_b", 6));

char buf[32] = {};
auto [ec, n] = co_await a.read_some(
capy::mutable_buffer(buf, sizeof(buf)));
BOOST_TEST(!ec);
BOOST_TEST_EQ(std::string_view(buf, n), "from_b");
};
capy::run_async(ioc.get_executor())(task(s1, s2));

ioc.run();
s1.close();
s2.close();
}

void
testShutdownOnClosedSocket()
{
io_context ioc;
socket sock(ioc);

// Shutdown on closed socket should not crash
sock.shutdown(socket::shutdown_send);
sock.shutdown(socket::shutdown_receive);
sock.shutdown(socket::shutdown_both);
}

/**
* @brief Verifies that a 128 KB payload can be transmitted end-to-end between two connected sockets.
*
* Creates a socket pair, sends a 128 KiB patterned buffer from one endpoint to the other,
* and asserts the receiver obtains the exact same bytes. Closes both sockets after completion.
*/

void
testLargeTransfer()
Expand Down Expand Up @@ -746,6 +834,14 @@ struct socket_test
s2.close();
}

/**
* @brief Executes the complete socket_test suite in a defined order.
*
* Runs all socket_test cases covering construction and move semantics; basic
* I/O operations; buffer-size variations; EOF and peer-closure behaviour;
* shutdown semantics; cancellation and close-while-reading scenarios;
* composed read/write operations; and data-integrity checks.
*/
void
run()
{
Expand All @@ -770,6 +866,11 @@ struct socket_test
testReadAfterPeerClose();
testWriteAfterPeerClose();

// Shutdown
testShutdownSend();
testShutdownReceive();
testShutdownOnClosedSocket();

// Cancellation
testCancelRead();
testCloseWhileReading();
Expand All @@ -789,4 +890,4 @@ struct socket_test
TEST_SUITE(socket_test, "boost.corosio.socket");

} // namespace corosio
} // namespace boost
} // namespace boost
Loading