Skip to content

Commit 514d7c0

Browse files
committed
✨ Add socket::send_exact
1 parent 3913527 commit 514d7c0

1 file changed

Lines changed: 126 additions & 0 deletions

File tree

include/asyncpp/io/socket.h

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ namespace asyncpp::io {
3838
class socket_accept_awaitable;
3939
class socket_accept_error_code_awaitable;
4040
class socket_send_awaitable;
41+
class socket_send_exact_awaitable;
4142
class socket_recv_awaitable;
4243
class socket_recv_exact_awaitable;
4344
class socket_recv_from_awaitable;
@@ -49,6 +50,7 @@ namespace asyncpp::io {
4950
using socket_accept_error_code_cancellable_awaitable =
5051
detail::cancellable_awaitable<socket_accept_error_code_awaitable>;
5152
using socket_send_cancellable_awaitable = detail::cancellable_awaitable<socket_send_awaitable>;
53+
using socket_send_exact_cancellable_awaitable = detail::cancellable_awaitable<socket_send_exact_awaitable>;
5254
using socket_recv_cancellable_awaitable = detail::cancellable_awaitable<socket_recv_awaitable>;
5355
using socket_recv_exact_cancellable_awaitable = detail::cancellable_awaitable<socket_recv_exact_awaitable>;
5456
using socket_recv_from_cancellable_awaitable = detail::cancellable_awaitable<socket_recv_from_awaitable>;
@@ -123,6 +125,8 @@ namespace asyncpp::io {
123125
[[nodiscard]] socket_accept_error_code_awaitable accept(std::error_code& ec) noexcept;
124126
[[nodiscard]] socket_send_awaitable send(const void* buffer, std::size_t size) noexcept;
125127
[[nodiscard]] socket_send_awaitable send(const void* buffer, std::size_t size, std::error_code& ec) noexcept;
128+
[[nodiscard]] socket_send_exact_awaitable send_exact(const void* buffer, std::size_t size) noexcept;
129+
[[nodiscard]] socket_send_exact_awaitable send_exact(const void* buffer, std::size_t size, std::error_code& ec) noexcept;
126130
[[nodiscard]] socket_recv_awaitable recv(void* buffer, std::size_t size) noexcept;
127131
[[nodiscard]] socket_recv_awaitable recv(void* buffer, std::size_t size, std::error_code& ec) noexcept;
128132
[[nodiscard]] socket_recv_exact_awaitable recv_exact(void* buffer, std::size_t size) noexcept;
@@ -146,6 +150,10 @@ namespace asyncpp::io {
146150
asyncpp::stop_token st) noexcept;
147151
[[nodiscard]] socket_send_cancellable_awaitable send(const void* buffer, std::size_t size,
148152
asyncpp::stop_token st, std::error_code& ec) noexcept;
153+
[[nodiscard]] socket_send_exact_cancellable_awaitable send_exact(const void* buffer, std::size_t size,
154+
asyncpp::stop_token st) noexcept;
155+
[[nodiscard]] socket_send_exact_cancellable_awaitable send_exact(const void* buffer, std::size_t size,
156+
asyncpp::stop_token st, std::error_code& ec) noexcept;
149157
[[nodiscard]] socket_recv_cancellable_awaitable recv(void* buffer, std::size_t size,
150158
asyncpp::stop_token st) noexcept;
151159
[[nodiscard]] socket_recv_cancellable_awaitable recv(void* buffer, std::size_t size, asyncpp::stop_token st,
@@ -173,6 +181,9 @@ namespace asyncpp::io {
173181
template<typename FN>
174182
requires(std::is_invocable_v<FN, size_t, std::error_code>)
175183
void send(const void* buffer, std::size_t size, FN&& cb, asyncpp::stop_token st = {});
184+
template<typename FN>
185+
requires(std::is_invocable_v<FN, size_t, std::error_code>)
186+
void send_exact(const void* buffer, std::size_t size, FN&& cb, asyncpp::stop_token st = {});
176187
template<typename FN>
177188
requires(std::is_invocable_v<FN, size_t, std::error_code>)
178189
void recv(void* buffer, std::size_t size, FN&& cb, asyncpp::stop_token st = {});
@@ -260,6 +271,21 @@ namespace asyncpp::io {
260271
void await_resume();
261272
};
262273

274+
class socket_send_exact_awaitable : public detail::socket_awaitable_base {
275+
std::byte const* m_buffer;
276+
std::size_t const m_size;
277+
std::size_t m_remaining;
278+
asyncpp::coroutine_handle<> m_handle;
279+
std::error_code* const m_ec;
280+
281+
public:
282+
socket_send_exact_awaitable(socket& sock, const void* buffer, size_t size,
283+
std::error_code* ec = nullptr) noexcept
284+
: socket_awaitable_base{sock}, m_buffer{static_cast<const std::byte*>(buffer)}, m_size{size}, m_remaining{size}, m_ec{ec} {}
285+
bool await_suspend(coroutine_handle<> hdl);
286+
size_t await_resume();
287+
};
288+
263289
class socket_recv_awaitable : public detail::socket_awaitable_base {
264290
void* const m_buffer;
265291
std::size_t const m_size;
@@ -355,6 +381,15 @@ namespace asyncpp::io {
355381
return socket_send_awaitable(*this, buffer, size, &ec);
356382
}
357383

384+
[[nodiscard]] inline socket_send_exact_awaitable socket::send_exact(const void* buffer, std::size_t size) noexcept {
385+
return socket_send_exact_awaitable(*this, buffer, size);
386+
}
387+
388+
[[nodiscard]] inline socket_send_exact_awaitable socket::send_exact(const void* buffer, std::size_t size,
389+
std::error_code& ec) noexcept {
390+
return socket_send_exact_awaitable(*this, buffer, size, &ec);
391+
}
392+
358393
[[nodiscard]] inline socket_recv_awaitable socket::recv(void* buffer, std::size_t size) noexcept {
359394
return socket_recv_awaitable(*this, buffer, size);
360395
}
@@ -421,6 +456,16 @@ namespace asyncpp::io {
421456
return socket_send_cancellable_awaitable(std::move(st), *this, buffer, size, &ec);
422457
}
423458

459+
[[nodiscard]] inline socket_send_exact_cancellable_awaitable socket::send_exact(const void* buffer, std::size_t size,
460+
asyncpp::stop_token st) noexcept {
461+
return socket_send_exact_cancellable_awaitable(std::move(st), *this, buffer, size);
462+
}
463+
464+
[[nodiscard]] inline socket_send_exact_cancellable_awaitable
465+
socket::send_exact(const void* buffer, std::size_t size, asyncpp::stop_token st, std::error_code& ec) noexcept {
466+
return socket_send_exact_cancellable_awaitable(std::move(st), *this, buffer, size, &ec);
467+
}
468+
424469
[[nodiscard]] inline socket_recv_cancellable_awaitable socket::recv(void* buffer, std::size_t size,
425470
asyncpp::stop_token st) noexcept {
426471
return socket_recv_cancellable_awaitable(std::move(st), *this, buffer, size);
@@ -487,6 +532,46 @@ namespace asyncpp::io {
487532
*m_ec = m_completion.result;
488533
}
489534

535+
inline bool socket_send_exact_awaitable::await_suspend(coroutine_handle<> hdl) {
536+
m_completion.callback = [](void* ptr) {
537+
auto that = static_cast<socket_send_exact_awaitable*>(ptr);
538+
auto engine = that->m_socket.service().engine();
539+
do {
540+
if (that->m_completion.result_size == 0) {
541+
that->m_completion.result = std::make_error_code(std::errc::not_connected);
542+
}
543+
if (that->m_completion.result) {
544+
that->m_handle.resume();
545+
break;
546+
}
547+
that->m_buffer += that->m_completion.result_size;
548+
that->m_remaining -= that->m_completion.result_size;
549+
if (that->m_remaining == 0) {
550+
that->m_handle.resume();
551+
break;
552+
}
553+
} while (engine->enqueue_send(that->m_socket.native_handle(), that->m_buffer, that->m_remaining,
554+
&that->m_completion));
555+
};
556+
m_completion.userdata = this;
557+
m_handle = hdl;
558+
auto engine = m_socket.service().engine();
559+
while (engine->enqueue_send(m_socket.native_handle(), m_buffer, m_remaining, &m_completion)) {
560+
if (m_completion.result) return false;
561+
m_buffer += m_completion.result_size;
562+
m_remaining -= m_completion.result_size;
563+
if (m_remaining == 0) return false;
564+
}
565+
return true;
566+
}
567+
568+
inline size_t socket_send_exact_awaitable::await_resume() {
569+
if (!m_completion.result) return m_size - m_remaining;
570+
if (m_ec == nullptr) throw std::system_error(m_completion.result);
571+
*m_ec = m_completion.result;
572+
return m_size - m_remaining;
573+
}
574+
490575
inline bool socket_recv_awaitable::await_suspend(coroutine_handle<> hdl) {
491576
m_completion.callback = [](void* ptr) { coroutine_handle<>::from_address(ptr).resume(); };
492577
m_completion.userdata = hdl.address();
@@ -663,6 +748,47 @@ namespace asyncpp::io {
663748
if (service().engine()->enqueue_send(native_handle(), buffer, size, info)) { data::handle(info); }
664749
}
665750

751+
template<typename FN>
752+
requires(std::is_invocable_v<FN, size_t, std::error_code>)
753+
inline void socket::send_exact(const void* buffer, std::size_t size, FN&& cb, asyncpp::stop_token st) {
754+
struct data : detail::io_engine::completion_data {
755+
FN m_real_cb;
756+
const std::span<const std::byte> m_buffer;
757+
size_t m_size_sent;
758+
asyncpp::stop_callback<detail::cancel_io_stop_callback> m_stop_cb;
759+
socket& m_socket;
760+
761+
data(FN&& cb, std::span<const std::byte> buf, asyncpp::stop_token st, socket& socket)
762+
: completion_data{&handle, this}, m_real_cb(std::move(cb)), m_buffer(buf), m_size_sent(0),
763+
m_stop_cb(std::move(st), detail::cancel_io_stop_callback{this, socket.service().engine()}),
764+
m_socket(socket) {}
765+
766+
static void handle(void* ptr) {
767+
auto that = static_cast<data*>(ptr);
768+
if (that->result)
769+
that->m_real_cb(0, that->result);
770+
else {
771+
that->m_size_sent += that->result_size;
772+
if (that->m_size_sent < that->m_buffer.size())
773+
return that->send_some(); // Early out without self-delete
774+
else
775+
that->m_real_cb(that->m_size_sent, {});
776+
}
777+
delete that;
778+
};
779+
780+
void send_some() {
781+
auto engine = m_socket.service().engine();
782+
if (engine->enqueue_send(m_socket.native_handle(), m_buffer.data() + m_size_sent,
783+
m_buffer.size() - m_size_sent, this)) {
784+
data::handle(this);
785+
}
786+
}
787+
};
788+
auto info = new data(std::move(cb), std::span(static_cast<const std::byte*>(buffer), size), std::move(st), *this);
789+
info->send_some();
790+
}
791+
666792
template<typename FN>
667793
requires(std::is_invocable_v<FN, size_t, std::error_code>)
668794
inline void socket::recv(void* buffer, std::size_t size, FN&& cb, asyncpp::stop_token st) {

0 commit comments

Comments
 (0)