Skip to content

Commit d695c0e

Browse files
authored
[k2] add wait for rpc ignore answer requests in instance epilogue (#1536)
1 parent f857696 commit d695c0e

4 files changed

Lines changed: 39 additions & 7 deletions

File tree

runtime-light/coroutine/await-set.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
#include "runtime-light/coroutine/coroutine-state.h"
1414
#include "runtime-light/coroutine/detail/await-set.h"
1515
#include "runtime-light/coroutine/type-traits.h"
16+
#include "runtime-light/stdlib/diagnostics/logs.h"
1617

1718
namespace kphp::coro {
1819

@@ -44,10 +45,12 @@ class await_set {
4445
template<typename awaitable_type>
4546
requires kphp::coro::concepts::awaitable<awaitable_type> && std::is_same_v<typename awaitable_traits<awaitable_type>::awaiter_return_type, return_type>
4647
void push(awaitable_type awaitable) noexcept {
48+
kphp::log::assertion(m_await_broker != nullptr);
4749
m_await_broker->start_task(detail::await_set::make_await_set_task(std::move(awaitable)), m_coroutine_stack_root, STACK_RETURN_ADDRESS);
4850
}
4951

5052
auto next() noexcept {
53+
kphp::log::assertion(m_await_broker != nullptr);
5154
return detail::await_set::await_set_awaitable<return_type>{*m_await_broker};
5255
}
5356

@@ -56,12 +59,9 @@ class await_set {
5659
}
5760

5861
size_t size() const noexcept {
62+
kphp::log::assertion(m_await_broker != nullptr);
5963
return m_await_broker->size();
6064
}
61-
62-
~await_set() {
63-
m_await_broker.release();
64-
}
6565
};
6666

6767
} // namespace kphp::coro

runtime-light/state/instance-state.cpp

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#include "runtime-common/core/std/containers.h"
1818
#include "runtime-light/core/globals/php-init-scripts.h"
1919
#include "runtime-light/core/globals/php-script-globals.h"
20+
#include "runtime-light/coroutine/await-set.h"
2021
#include "runtime-light/coroutine/task.h"
2122
#include "runtime-light/k2-platform/k2-api.h"
2223
#include "runtime-light/server/cli/init-functions.h"
@@ -27,6 +28,7 @@
2728
#include "runtime-light/stdlib/diagnostics/logs.h"
2829
#include "runtime-light/stdlib/fork/fork-functions.h"
2930
#include "runtime-light/stdlib/fork/fork-state.h"
31+
#include "runtime-light/stdlib/rpc/rpc-client-state.h"
3032
#include "runtime-light/stdlib/time/time-functions.h"
3133
#include "runtime-light/streams/read-ext.h"
3234
#include "runtime-light/streams/stream.h"
@@ -214,4 +216,22 @@ kphp::coro::task<> InstanceState::run_instance_epilogue() noexcept {
214216
web_state.session_is_finished = true;
215217
web_state.session.reset();
216218
}
219+
220+
/*
221+
* Unlike regular RPC requests whose results the user code waits for via rpc_fetch_responses,
222+
* thereby guaranteeing they are sent, the user code does not wait for requests sent with the
223+
* ignore_answer flag. Therefore, we can’t guarantee that the coroutines responsible for
224+
* sending ignore_answer requests have finished. This means the requests might not be sent
225+
* if the instance terminates.
226+
*
227+
* This await suspends the current coroutine until all pending ignore_answer requests are
228+
* fully sent. While suspended, other forks and coroutines may continue running.
229+
*
230+
* After this call completes, delivery of all ignore_answer requests is guaranteed.
231+
*/
232+
auto& rpc_client_instance_st{RpcClientInstanceState::get()};
233+
auto ignore_answer_request_await_set{std::exchange(rpc_client_instance_st.ignore_answer_request_awaiter_tasks, kphp::coro::await_set<void>{})};
234+
while (!ignore_answer_request_await_set.empty()) {
235+
co_await ignore_answer_request_await_set.next();
236+
}
217237
}

runtime-light/stdlib/rpc/rpc-api.cpp

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -344,6 +344,11 @@ kphp::coro::task<kphp::rpc::query_info> send_request(std::string_view actor, std
344344
co_return std::move(opt_response);
345345
}};
346346

347+
static constexpr auto ignore_answer_awaiter_coroutine{[](kphp::component::stream stream, std::chrono::milliseconds timeout) -> kphp::coro::shared_task<void> {
348+
auto fetch_task{kphp::component::fetch_response(stream, [](std::span<const std::byte>) noexcept {})};
349+
std::ignore = co_await kphp::coro::io_scheduler::get().schedule(std::move(fetch_task), timeout);
350+
}};
351+
347352
// normalize timeout
348353
using namespace std::chrono_literals;
349354
static constexpr auto DEFAULT_TIMEOUT{300ms};
@@ -356,13 +361,18 @@ kphp::coro::task<kphp::rpc::query_info> send_request(std::string_view actor, std
356361
})
357362
.value_or(DEFAULT_TIMEOUT),
358363
MIN_TIMEOUT, MAX_TIMEOUT)};
364+
if (ignore_answer) {
365+
// start ignore answer awaiter task
366+
auto ignore_answer_awaiter_task{ignore_answer_awaiter_coroutine(std::move(stream), timeout)};
367+
kphp::log::assertion(kphp::coro::io_scheduler::get().start(ignore_answer_awaiter_task));
368+
369+
rpc_client_instance_st.ignore_answer_request_awaiter_tasks.push(std::move(ignore_answer_awaiter_task));
370+
co_return kphp::rpc::query_info{.id = kphp::rpc::IGNORED_ANSWER_QUERY_ID, .request_size = request_size, .timestamp = timestamp};
371+
}
359372
// start awaiter task
360373
auto awaiter_task{awaiter_coroutine(query_id, std::move(stream), timeout, collect_responses_extra_info)};
361374
kphp::log::assertion(kphp::coro::io_scheduler::get().start(awaiter_task));
362375

363-
if (ignore_answer) {
364-
co_return kphp::rpc::query_info{.id = kphp::rpc::IGNORED_ANSWER_QUERY_ID, .request_size = request_size, .timestamp = timestamp};
365-
}
366376
rpc_client_instance_st.response_awaiter_tasks.emplace(query_id, std::move(awaiter_task));
367377
co_return kphp::rpc::query_info{.id = query_id, .request_size = request_size, .timestamp = timestamp};
368378
}

runtime-light/stdlib/rpc/rpc-client-state.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#include "runtime-common/core/allocator/script-allocator.h"
1313
#include "runtime-common/core/runtime-core.h"
1414
#include "runtime-common/core/std/containers.h"
15+
#include "runtime-light/coroutine/await-set.h"
1516
#include "runtime-light/coroutine/shared-task.h"
1617
#include "runtime-light/stdlib/rpc/rpc-constants.h"
1718
#include "runtime-light/stdlib/rpc/rpc-extra-info.h"
@@ -26,6 +27,7 @@ struct RpcClientInstanceState final : private vk::not_copyable {
2627
kphp::stl::unordered_map<int64_t, class_instance<RpcTlQuery>, kphp::memory::script_allocator> response_fetcher_instances;
2728
kphp::stl::unordered_map<int64_t, std::pair<kphp::rpc::response_extra_info_status, kphp::rpc::response_extra_info>, kphp::memory::script_allocator>
2829
rpc_responses_extra_info;
30+
kphp::coro::await_set<void> ignore_answer_request_awaiter_tasks;
2931

3032
RpcClientInstanceState() noexcept = default;
3133

0 commit comments

Comments
 (0)