Skip to content

Commit 66066b0

Browse files
authored
[k2] add stream_set_blocking (#1633)
1 parent f1594bb commit 66066b0

5 files changed

Lines changed: 57 additions & 0 deletions

File tree

builtin-functions/kphp-light/stdlib/file-functions.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ define('STREAM_CLIENT_CONNECT', 1);
1818
define('DEFAULT_SOCKET_TIMEOUT', 60);
1919

2020
function stream_socket_client ($url ::: string, &$error_number ::: mixed = TODO, &$error_description ::: mixed = TODO, $timeout ::: float = DEFAULT_SOCKET_TIMEOUT, $flags ::: int = STREAM_CLIENT_CONNECT, $context ::: mixed = null) ::: mixed;
21+
function stream_set_blocking ($stream, $mode ::: bool) ::: bool;
2122

2223
function fopen ($filename ::: string, $mode ::: string): mixed;
2324
/** @kphp-extern-func-info interruptible */

runtime-light/k2-platform/k2-api.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ TLS_INITIAL_EXEC inline thread_local InstanceState* instance_state_ptr{};
4545

4646
inline constexpr int32_t errno_ok = 0;
4747
inline constexpr int32_t errno_e2big = E2BIG;
48+
inline constexpr int32_t errno_ebadfd = EBADF;
4849
inline constexpr int32_t errno_ebusy = EBUSY;
4950
inline constexpr int32_t errno_enodev = ENODEV;
5051
inline constexpr int32_t errno_einval = EINVAL;

runtime-light/stdlib/file/file-system-functions.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,16 @@ inline resource f$stream_socket_client(const string& address, std::optional<std:
192192
return make_instance<kphp::fs::socket>(*std::move(expected));
193193
}
194194

195+
inline bool f$stream_set_blocking(const resource& stream, bool mode) noexcept {
196+
if (auto socket{from_mixed<class_instance<kphp::fs::socket>>(stream, {})}; !socket.is_null()) {
197+
socket.get()->set_blocking(mode);
198+
return true;
199+
}
200+
201+
kphp::log::warning("unexpected resource in stream_set_blocking -> {}", stream.to_string().c_str());
202+
return false;
203+
}
204+
195205
inline Optional<string> f$file_get_contents(const string& stream) noexcept {
196206
if (auto sync_resource{from_mixed<class_instance<kphp::fs::sync_resource>>(f$fopen(stream, FileSystemImageState::get().READ_MODE), {})};
197207
!sync_resource.is_null()) {

runtime-light/stdlib/file/resource.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -658,6 +658,8 @@ class socket : public async_resource {
658658

659659
static auto open(std::string_view scheme) noexcept -> std::expected<socket, int32_t>;
660660

661+
auto set_blocking(bool blocking) noexcept -> void;
662+
661663
auto write(std::span<const std::byte> buf) noexcept -> kphp::coro::task<std::expected<size_t, int32_t>> override;
662664
auto read(std::span<std::byte> buf) noexcept -> kphp::coro::task<std::expected<size_t, int32_t>> override;
663665
auto get_contents() noexcept -> kphp::coro::task<std::expected<string, int32_t>> override;
@@ -689,6 +691,10 @@ inline auto socket::open(std::string_view scheme) noexcept -> std::expected<sock
689691
return expected;
690692
}
691693

694+
inline auto socket::set_blocking(bool blocking) noexcept -> void {
695+
m_stream.set_blocking(blocking);
696+
}
697+
692698
inline auto socket::write(std::span<const std::byte> buf) noexcept -> kphp::coro::task<std::expected<size_t, int32_t>> {
693699
if (!m_open) [[unlikely]] {
694700
co_return std::unexpected{k2::errno_enodev};

runtime-light/streams/stream.h

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,16 @@
2727
namespace kphp::component {
2828

2929
class stream {
30+
bool m_non_blocking{false};
3031
k2::descriptor m_descriptor{k2::INVALID_PLATFORM_DESCRIPTOR};
3132
kphp::coro::io_scheduler& m_scheduler{kphp::coro::io_scheduler::get()};
3233

3334
explicit stream(k2::descriptor descriptor) noexcept
3435
: m_descriptor(descriptor) {}
3536

37+
auto read_non_blocking(std::span<std::byte> buf) const noexcept -> std::expected<size_t, int32_t>;
38+
auto write_non_blocking(std::span<const std::byte> buf) const noexcept -> std::expected<size_t, int32_t>;
39+
3640
public:
3741
stream(stream&& other) noexcept
3842
: m_descriptor(std::exchange(other.m_descriptor, k2::INVALID_PLATFORM_DESCRIPTOR)) {}
@@ -60,6 +64,7 @@ class stream {
6064
auto descriptor() const noexcept -> k2::descriptor;
6165
auto reset(k2::descriptor descriptor) noexcept -> void;
6266
auto status() const noexcept -> k2::StreamStatus;
67+
auto set_blocking(bool blocking) noexcept -> void;
6368

6469
auto read(std::span<std::byte> buf) const noexcept -> kphp::coro::task<std::expected<size_t, int32_t>>;
6570
template<std::invocable<std::span<const std::byte>> F>
@@ -72,6 +77,28 @@ class stream {
7277

7378
// ================================================================================================
7479

80+
inline auto stream::read_non_blocking(std::span<std::byte> buf) const noexcept -> std::expected<size_t, int32_t> {
81+
switch (status().read_status) {
82+
case k2::IOStatus::IOBlocked:
83+
return std::expected<size_t, int32_t>{0};
84+
case k2::IOStatus::IOClosed:
85+
return std::unexpected{k2::errno_ebadfd};
86+
case k2::IOStatus::IOAvailable:
87+
return k2::read(m_descriptor, buf);
88+
}
89+
}
90+
91+
inline auto stream::write_non_blocking(std::span<const std::byte> buf) const noexcept -> std::expected<size_t, int32_t> {
92+
switch (status().write_status) {
93+
case k2::IOStatus::IOBlocked:
94+
return std::expected<size_t, int32_t>{0};
95+
case k2::IOStatus::IOClosed:
96+
return std::unexpected{k2::errno_ebadfd};
97+
case k2::IOStatus::IOAvailable:
98+
return k2::write(m_descriptor, buf);
99+
}
100+
}
101+
75102
inline auto stream::open(std::string_view target, k2::stream_kind stream_kind) noexcept -> std::expected<kphp::component::stream, int32_t> {
76103
int32_t errc{};
77104
k2::descriptor descriptor{k2::INVALID_PLATFORM_DESCRIPTOR};
@@ -125,7 +152,15 @@ inline auto stream::status() const noexcept -> k2::StreamStatus {
125152
return stream_status;
126153
}
127154

155+
inline auto stream::set_blocking(bool blocking) noexcept -> void {
156+
m_non_blocking = !blocking;
157+
}
158+
128159
inline auto stream::read(std::span<std::byte> buf) const noexcept -> kphp::coro::task<std::expected<size_t, int32_t>> {
160+
if (m_non_blocking) {
161+
co_return read_non_blocking(buf);
162+
}
163+
129164
for (size_t read{}; read < buf.size();) {
130165
switch (co_await m_scheduler.poll(m_descriptor, kphp::coro::poll_op::read)) {
131166
case kphp::coro::poll_status::event:
@@ -158,6 +193,10 @@ auto stream::read_all(F f) const noexcept -> kphp::coro::task<std::expected<void
158193
}
159194

160195
inline auto stream::write(std::span<const std::byte> buf) const noexcept -> kphp::coro::task<std::expected<size_t, int32_t>> {
196+
if (m_non_blocking) {
197+
co_return write_non_blocking(buf);
198+
}
199+
161200
for (size_t written{}; written < buf.size();) {
162201
switch (co_await m_scheduler.poll(m_descriptor, kphp::coro::poll_op::write)) {
163202
case kphp::coro::poll_status::event:

0 commit comments

Comments
 (0)