Skip to content

Commit b8c965b

Browse files
authored
[k2] rpc server part 3 (#1310)
1 parent 856c753 commit b8c965b

10 files changed

Lines changed: 103 additions & 245 deletions

File tree

runtime-light/server/http/init-functions.cpp

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,12 @@ std::string_view process_headers(const tl::K2InvokeHttp& invoke_http, PhpScriptB
196196

197197
namespace kphp::http {
198198

199-
void init_server(tl::K2InvokeHttp invoke_http) noexcept {
199+
void init_server(tl::TLBuffer& tlb) noexcept {
200+
tl::K2InvokeHttp invoke_http{};
201+
if (!invoke_http.fetch(tlb)) [[unlikely]] {
202+
kphp::log::error("erroneous http request");
203+
}
204+
200205
auto& superglobals{InstanceState::get().php_script_mutable_globals_singleton.get_superglobals()};
201206
auto& server{superglobals.v$_SERVER};
202207
auto& http_server_instance_st{HttpServerInstanceState::get()};
@@ -306,7 +311,6 @@ void init_server(tl::K2InvokeHttp invoke_http) noexcept {
306311
const auto connection_kind{http_server_instance_st.connection_kind == connection_kind::keep_alive ? CONNECTION_KEEP_ALIVE : CONNECTION_CLOSE};
307312
static_SB.clean() << headers::CONNECTION.data() << ": " << connection_kind.data();
308313
kphp::http::header({static_SB.c_str(), static_SB.size()}, true, status::NO_STATUS);
309-
310314
kphp::log::info("http server initialized with: "
311315
"server addr -> {}, "
312316
"server port -> {}, "

runtime-light/server/http/init-functions.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,11 @@
66

77
#include "runtime-common/core/runtime-core.h"
88
#include "runtime-light/coroutine/task.h"
9-
#include "runtime-light/tl/tl-functions.h"
9+
#include "runtime-light/tl/tl-core.h"
1010

1111
namespace kphp::http {
1212

13-
void init_server(tl::K2InvokeHttp invoke_http) noexcept;
13+
void init_server(tl::TLBuffer& tlb) noexcept;
1414

1515
kphp::coro::task<> finalize_server(const string_buffer& output) noexcept;
1616

runtime-light/server/rpc/init-functions.cpp

Lines changed: 32 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#include "runtime-common/core/runtime-core.h"
1313
#include "runtime-light/core/globals/php-script-globals.h"
1414
#include "runtime-light/server/rpc/rpc-server-state.h"
15+
#include "runtime-light/tl/tl-core.h"
1516
#include "runtime-light/tl/tl-functions.h"
1617
#include "runtime-light/tl/tl-types.h"
1718
#include "runtime-light/utils/logs.h"
@@ -79,64 +80,48 @@ void process_rpc_invoke_req_extra(const tl::rpcInvokeReqExtra& extra, PhpScriptB
7980
}
8081
}
8182

82-
void process_dest_actor(const tl::rpcDestActor& dest_actor, PhpScriptBuiltInSuperGlobals& superglobals) noexcept {
83-
superglobals.v$_SERVER.set_value(string{RPC_ACTOR_ID.data(), RPC_ACTOR_ID.size()}, dest_actor.actor_id.value);
84-
}
85-
86-
void process_dest_flags(const tl::rpcDestFlags& dest_flags, PhpScriptBuiltInSuperGlobals& superglobals) noexcept {
87-
superglobals.v$_SERVER.set_value(string{RPC_EXTRA_FLAGS.data(), RPC_EXTRA_FLAGS.size()}, static_cast<int64_t>(dest_flags.flags.value));
88-
process_rpc_invoke_req_extra(dest_flags.extra, superglobals);
89-
}
90-
91-
void process_dest_actor_flags(const tl::rpcDestActorFlags& dest_actor_flags, PhpScriptBuiltInSuperGlobals& superglobals) noexcept {
92-
superglobals.v$_SERVER.set_value(string{RPC_ACTOR_ID.data(), RPC_ACTOR_ID.size()}, dest_actor_flags.actor_id.value);
93-
superglobals.v$_SERVER.set_value(string{RPC_EXTRA_FLAGS.data(), RPC_EXTRA_FLAGS.size()}, static_cast<int64_t>(dest_actor_flags.flags.value));
94-
process_rpc_invoke_req_extra(dest_actor_flags.extra, superglobals);
95-
}
96-
97-
void process_rpc_invoke_req(const tl::rpcInvokeReq& rpc_invoke_req, PhpScriptBuiltInSuperGlobals& superglobals) noexcept {
98-
if (rpc_invoke_req.opt_dest_actor) {
99-
process_dest_actor((*rpc_invoke_req.opt_dest_actor).inner, superglobals);
100-
}
101-
if (rpc_invoke_req.opt_dest_flags) {
102-
process_dest_flags((*rpc_invoke_req.opt_dest_flags).inner, superglobals);
103-
}
104-
if (rpc_invoke_req.opt_dest_actor_flags) {
105-
process_dest_actor_flags((*rpc_invoke_req.opt_dest_actor_flags).inner, superglobals);
106-
}
107-
RpcServerInstanceState::get().buffer.store_bytes(rpc_invoke_req.query);
108-
}
109-
11083
} // namespace
11184

11285
namespace kphp::rpc {
11386

114-
void init_server(tl::K2InvokeRpc invoke_rpc) noexcept {
115-
auto& net_pid{invoke_rpc.net_pid};
116-
auto& rpc_invoke_req{invoke_rpc.rpc_invoke_req.inner};
87+
void init_server(tl::TLBuffer& tlb) noexcept {
88+
tl::K2InvokeRpc invoke_rpc{};
89+
if (!invoke_rpc.fetch(tlb)) [[unlikely]] {
90+
kphp::log::error("erroneous rpc request");
91+
}
92+
11793
auto& rpc_server_instance_st{RpcServerInstanceState::get()};
118-
auto& superglobals{PhpScriptMutableGlobals::current().get_superglobals()};
94+
rpc_server_instance_st.query_id = invoke_rpc.query_id.value;
95+
rpc_server_instance_st.buffer.store_bytes(invoke_rpc.query);
11996

120-
rpc_server_instance_st.query_id = rpc_invoke_req.query_id.value;
97+
const auto opt_magic{rpc_server_instance_st.buffer.lookup_trivial<uint32_t>()};
98+
if (!opt_magic) [[unlikely]] {
99+
kphp::log::error("erroneous rpc request");
100+
}
121101

122-
superglobals.v$_SERVER.set_value(string{RPC_REQUEST_ID.data(), RPC_REQUEST_ID.size()}, rpc_invoke_req.query_id.value);
123-
superglobals.v$_SERVER.set_value(string{RPC_REMOTE_IP.data(), RPC_REMOTE_IP.size()}, static_cast<int64_t>(net_pid.get_ip()));
124-
superglobals.v$_SERVER.set_value(string{RPC_REMOTE_PORT.data(), RPC_REMOTE_PORT.size()}, static_cast<int64_t>(net_pid.get_port()));
125-
superglobals.v$_SERVER.set_value(string{RPC_REMOTE_PID.data(), RPC_REMOTE_PID.size()}, static_cast<int64_t>(net_pid.get_pid()));
126-
superglobals.v$_SERVER.set_value(string{RPC_REMOTE_UTIME.data(), RPC_REMOTE_UTIME.size()}, static_cast<int64_t>(net_pid.get_utime()));
127-
process_rpc_invoke_req(rpc_invoke_req, superglobals);
102+
auto& superglobals{PhpScriptMutableGlobals::current().get_superglobals()};
103+
superglobals.v$_SERVER.set_value(string{RPC_REQUEST_ID.data(), RPC_REQUEST_ID.size()}, invoke_rpc.query_id.value);
104+
superglobals.v$_SERVER.set_value(string{RPC_REMOTE_IP.data(), RPC_REMOTE_IP.size()}, static_cast<int64_t>(invoke_rpc.net_pid.get_ip()));
105+
superglobals.v$_SERVER.set_value(string{RPC_REMOTE_PORT.data(), RPC_REMOTE_PORT.size()}, static_cast<int64_t>(invoke_rpc.net_pid.get_port()));
106+
superglobals.v$_SERVER.set_value(string{RPC_REMOTE_PID.data(), RPC_REMOTE_PID.size()}, static_cast<int64_t>(invoke_rpc.net_pid.get_pid()));
107+
superglobals.v$_SERVER.set_value(string{RPC_REMOTE_UTIME.data(), RPC_REMOTE_UTIME.size()}, static_cast<int64_t>(invoke_rpc.net_pid.get_utime()));
108+
if (invoke_rpc.opt_actor_id) {
109+
superglobals.v$_SERVER.set_value(string{RPC_ACTOR_ID.data(), RPC_ACTOR_ID.size()}, (*invoke_rpc.opt_actor_id).value);
110+
}
111+
if (invoke_rpc.opt_extra) {
112+
superglobals.v$_SERVER.set_value(string{RPC_EXTRA_FLAGS.data(), RPC_EXTRA_FLAGS.size()}, static_cast<int64_t>((*invoke_rpc.opt_extra).flags.value));
113+
process_rpc_invoke_req_extra(*invoke_rpc.opt_extra, superglobals);
114+
}
128115
kphp::log::info("rpc server initialized with: "
129116
"remote pid -> {}, "
130117
"remote port -> {}, "
131118
"query_id -> {}, "
132-
"dest_actor -> {}, "
133-
"dest_flags -> {:#b}, "
134-
"dest_actor_flags -> actor_id {}, flags {:#b}",
135-
net_pid.get_pid(), net_pid.get_port(), rpc_invoke_req.query_id.value,
136-
rpc_invoke_req.opt_dest_actor.has_value() ? (*rpc_invoke_req.opt_dest_actor).inner.actor_id.value : 0,
137-
rpc_invoke_req.opt_dest_flags.has_value() ? (*rpc_invoke_req.opt_dest_flags).inner.flags.value : 0,
138-
rpc_invoke_req.opt_dest_actor_flags.has_value() ? (*rpc_invoke_req.opt_dest_actor_flags).inner.actor_id.value : 0,
139-
rpc_invoke_req.opt_dest_actor_flags.has_value() ? (*rpc_invoke_req.opt_dest_actor_flags).inner.flags.value : 0);
119+
"actor_id -> {}, "
120+
"extra -> {:#b}, "
121+
"request -> {:#x}",
122+
invoke_rpc.net_pid.get_pid(), invoke_rpc.net_pid.get_port(), invoke_rpc.query_id.value,
123+
invoke_rpc.opt_actor_id.has_value() ? (*invoke_rpc.opt_actor_id).value : 0,
124+
invoke_rpc.opt_extra.has_value() ? (*invoke_rpc.opt_extra).flags.value : 0, *opt_magic);
140125
}
141126

142127
} // namespace kphp::rpc

runtime-light/server/rpc/init-functions.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@
44

55
#pragma once
66

7-
#include "runtime-light/tl/tl-functions.h"
7+
#include "runtime-light/tl/tl-core.h"
88

99
namespace kphp::rpc {
1010

11-
void init_server(tl::K2InvokeRpc invoke_rpc) noexcept;
11+
void init_server(tl::TLBuffer& tlb) noexcept;
1212

1313
} // namespace kphp::rpc

runtime-light/state/instance-state.cpp

Lines changed: 3 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -105,21 +105,7 @@ kphp::coro::task<> InstanceState::init_cli_instance() noexcept {
105105
}
106106

107107
kphp::coro::task<> InstanceState::init_server_instance() noexcept {
108-
auto init_k2_invoke_http{[](tl::TLBuffer& tlb) noexcept {
109-
tl::K2InvokeHttp invoke_http{};
110-
if (!invoke_http.fetch(tlb)) [[unlikely]] {
111-
kphp::log::error("erroneous http request");
112-
}
113-
kphp::http::init_server(std::move(invoke_http));
114-
}};
115-
auto init_k2_invoke_rpc{[](tl::TLBuffer& tlb) noexcept {
116-
tl::K2InvokeRpc invoke_rpc{};
117-
if (!invoke_rpc.fetch(tlb)) [[unlikely]] {
118-
kphp::log::error("erroneous rpc request");
119-
}
120-
kphp::rpc::init_server(std::move(invoke_rpc));
121-
}};
122-
auto init_k2_invoke_jw{[](tl::TLBuffer& tlb) noexcept {
108+
auto init_k2_invoke_jw{[](tl::TLBuffer& tlb) noexcept { // TODO rework
123109
tl::K2InvokeJobWorker invoke_jw{};
124110
if (!invoke_jw.fetch(tlb)) [[unlikely]] {
125111
kphp::log::error("erroneous job worker request");
@@ -147,13 +133,13 @@ kphp::coro::task<> InstanceState::init_server_instance() noexcept {
147133
case tl::K2_INVOKE_HTTP_MAGIC: {
148134
instance_kind_ = instance_kind::http_server;
149135
standard_stream_ = stream_d;
150-
init_k2_invoke_http(tlb);
136+
kphp::http::init_server(tlb);
151137
break;
152138
}
153139
case tl::K2_INVOKE_RPC_MAGIC: {
154140
instance_kind_ = instance_kind::rpc_server;
155141
standard_stream_ = stream_d;
156-
init_k2_invoke_rpc(tlb);
142+
kphp::rpc::init_server(tlb);
157143
break;
158144
}
159145
case tl::K2_INVOKE_JOB_WORKER_MAGIC: {

runtime-light/stdlib/rpc/rpc-api.h

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
#pragma once
66

7-
#include <cinttypes>
87
#include <concepts>
98
#include <cstddef>
109
#include <cstdint>
@@ -171,16 +170,15 @@ inline kphp::coro::task<bool> f$store_error(int64_t error_code, string error_msg
171170
}
172171

173172
tl::TLBuffer tlb; // FIXME reserve exact size
174-
tl::K2RpcResponse{.flags = {.value = 0x0},
175-
.req_result = {.inner = tl::reqError{.error_code = {.value = static_cast<int32_t>(error_code)},
176-
.error = {.value = {error_msg.c_str(), error_msg.size()}}}}}
173+
tl::K2RpcResponse{.value = tl::k2RpcResponseError{.error_code = tl::i32{.value = static_cast<int32_t>(error_code)},
174+
.error = tl::string{.value = {error_msg.c_str(), error_msg.size()}}}}
177175
.store(tlb);
178176

179177
auto expected{co_await kphp::rpc::send_response({reinterpret_cast<const std::byte*>(tlb.data()), tlb.size()})};
180178
if (!expected) [[unlikely]] {
181179
kphp::log::warning("can't store RPC error: {}", std::to_underlying(expected.error()));
182180
}
183-
kphp::log::error("store_error called. error_code: %" PRIi64 ", error_msg: %s", error_code, error_msg.c_str());
181+
kphp::log::error("store_error called. error_code: {}, error_msg: {}", error_code, error_msg.c_str());
184182
std::unreachable();
185183
}
186184

@@ -196,8 +194,8 @@ inline kphp::coro::task<> f$rpc_server_store_response(class_instance<C$VK$TL$Rpc
196194
// so create a TLBuffer owned by this coroutine
197195
auto& rpc_server_instance_st{RpcServerInstanceState::get()};
198196
tl::TLBuffer tlb; // FIXME reserve exact size
199-
tl::K2RpcResponse{.flags = {.value = 0x0},
200-
.req_result = {.inner = std::string_view{rpc_server_instance_st.buffer.data(), rpc_server_instance_st.buffer.size()}}}
197+
tl::K2RpcResponse{
198+
.value = tl::k2RpcResponseHeader{.flags = {}, .extra = {}, .result = {rpc_server_instance_st.buffer.data(), rpc_server_instance_st.buffer.size()}}}
201199
.store(tlb);
202200
auto expected{co_await kphp::rpc::send_response({reinterpret_cast<const std::byte*>(tlb.data()), tlb.size()})};
203201
if (!expected) [[unlikely]] {

runtime-light/tl/tl-functions.cpp

Lines changed: 0 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -127,36 +127,4 @@ bool K2InvokeHttp::fetch(tl::TLBuffer& tlb) noexcept {
127127
return ok;
128128
}
129129

130-
// ===== RPC =====
131-
132-
bool rpcInvokeReq::fetch(tl::TLBuffer& tlb) noexcept {
133-
bool ok{query_id.fetch(tlb)};
134-
bool fetched{};
135-
while (ok && !fetched) {
136-
const auto magic{tlb.lookup_trivial<uint32_t>().value_or(TL_ZERO)};
137-
switch (magic) {
138-
case TL_RPC_DEST_ACTOR: {
139-
ok &= !opt_dest_actor.has_value() && opt_dest_actor.emplace().fetch(tlb);
140-
break;
141-
}
142-
case TL_RPC_DEST_FLAGS: {
143-
ok &= !opt_dest_flags.has_value() && opt_dest_flags.emplace().fetch(tlb);
144-
break;
145-
}
146-
case TL_RPC_DEST_ACTOR_FLAGS: {
147-
ok &= !opt_dest_actor_flags.has_value() && opt_dest_actor_flags.emplace().fetch(tlb);
148-
break;
149-
}
150-
default: {
151-
const auto opt_query{tlb.fetch_bytes(tlb.remaining())};
152-
query = opt_query.value_or(std::string_view{});
153-
ok &= opt_query.has_value();
154-
fetched = true;
155-
break;
156-
}
157-
}
158-
}
159-
return ok;
160-
}
161-
162130
} // namespace tl

runtime-light/tl/tl-functions.h

Lines changed: 22 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
#include <optional>
99
#include <string_view>
1010

11-
#include "common/tl/constants/common.h"
1211
#include "runtime-light/tl/tl-core.h"
1312
#include "runtime-light/tl/tl-types.h"
1413

@@ -128,7 +127,7 @@ struct ConfdataGetWildcard final {
128127

129128
// ===== HTTP =====
130129

131-
inline constexpr uint32_t K2_INVOKE_HTTP_MAGIC = 0xd909'efe8;
130+
inline constexpr uint32_t K2_INVOKE_HTTP_MAGIC = 0x80c3'7baa;
132131

133132
class K2InvokeHttp final {
134133
static constexpr auto SCHEME_FLAG = static_cast<uint32_t>(1U << 0U);
@@ -148,38 +147,35 @@ class K2InvokeHttp final {
148147

149148
// ===== RPC =====
150149

151-
struct rpcInvokeReq final {
152-
tl::i64 query_id{};
153-
std::optional<tl::RpcDestActor> opt_dest_actor;
154-
std::optional<tl::RpcDestFlags> opt_dest_flags;
155-
std::optional<tl::RpcDestActorFlags> opt_dest_actor_flags;
156-
std::string_view query;
157-
158-
bool fetch(tl::TLBuffer& tlb) noexcept;
159-
};
160-
161-
struct RpcInvokeReq final {
162-
tl::rpcInvokeReq inner{};
163-
164-
bool fetch(tl::TLBuffer& tlb) noexcept {
165-
tl::details::magic magic{};
166-
return magic.fetch(tlb) && magic.expect(TL_RPC_INVOKE_REQ) && inner.fetch(tlb);
167-
}
168-
};
169-
170150
inline constexpr uint32_t K2_INVOKE_RPC_MAGIC = 0xd909'efe9;
171151

172-
struct K2InvokeRpc final {
152+
class K2InvokeRpc final {
153+
static constexpr auto ACTOR_ID_FLAG = static_cast<uint32_t>(1U << 0U);
154+
static constexpr auto EXTRA_FLAG = static_cast<uint32_t>(1U << 1U);
155+
156+
public:
157+
tl::details::mask flags{};
158+
tl::i64 query_id{};
173159
tl::netPid net_pid{};
174-
tl::RpcInvokeReq rpc_invoke_req{};
160+
std::optional<tl::i64> opt_actor_id;
161+
std::optional<tl::rpcInvokeReqExtra> opt_extra;
162+
std::string_view query;
175163

176164
bool fetch(tl::TLBuffer& tlb) noexcept {
177165
tl::details::magic magic{};
178166
bool ok{magic.fetch(tlb) && magic.expect(K2_INVOKE_RPC_MAGIC)};
179-
ok &= tl::details::mask{}.fetch(tlb);
167+
ok &= flags.fetch(tlb);
168+
ok &= query_id.fetch(tlb);
180169
ok &= net_pid.fetch(tlb);
181-
ok &= rpc_invoke_req.fetch(tlb);
182-
return ok;
170+
if (static_cast<bool>(flags.value & ACTOR_ID_FLAG)) {
171+
ok &= opt_actor_id.emplace().fetch(tlb);
172+
}
173+
if (static_cast<bool>(flags.value & EXTRA_FLAG)) {
174+
ok &= opt_extra.emplace().fetch(tlb);
175+
}
176+
const auto opt_query{tlb.fetch_bytes(tlb.remaining())};
177+
query = opt_query.value_or(std::string_view{});
178+
return ok && opt_query.has_value();
183179
}
184180
};
185181

runtime-light/tl/tl-types.cpp

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -155,16 +155,7 @@ bool CertInfoItem::fetch(TLBuffer& tlb) noexcept {
155155
// ===== RPC =====
156156

157157
bool rpcInvokeReqExtra::fetch(tl::TLBuffer& tlb) noexcept {
158-
return_binlog_pos = static_cast<bool>(flags.value & RETURN_BINLOG_POS_FLAG);
159-
return_binlog_time = static_cast<bool>(flags.value & RETURN_BINLOG_TIME_FLAG);
160-
return_pid = static_cast<bool>(flags.value & RETURN_PID_FLAG);
161-
return_request_sizes = static_cast<bool>(flags.value & RETURN_REQUEST_SIZES_FLAG);
162-
return_failed_subqueries = static_cast<bool>(flags.value & RETURN_FAILED_SUBQUERIES_FLAG);
163-
return_query_stats = static_cast<bool>(flags.value & RETURN_QUERY_STATS_FLAG);
164-
no_result = static_cast<bool>(flags.value & NORESULT_FLAG);
165-
return_view_number = static_cast<bool>(flags.value & RETURN_VIEW_NUMBER_FLAG);
166-
167-
bool ok{true};
158+
bool ok{flags.fetch(tlb)};
168159
if (ok && static_cast<bool>(flags.value & WAIT_BINLOG_POS_FLAG)) {
169160
ok &= opt_wait_binlog_pos.emplace().fetch(tlb);
170161
}
@@ -190,10 +181,19 @@ bool rpcInvokeReqExtra::fetch(tl::TLBuffer& tlb) noexcept {
190181
ok &= opt_random_delay.emplace().fetch(tlb);
191182
}
192183

184+
return_binlog_pos = static_cast<bool>(flags.value & RETURN_BINLOG_POS_FLAG);
185+
return_binlog_time = static_cast<bool>(flags.value & RETURN_BINLOG_TIME_FLAG);
186+
return_pid = static_cast<bool>(flags.value & RETURN_PID_FLAG);
187+
return_request_sizes = static_cast<bool>(flags.value & RETURN_REQUEST_SIZES_FLAG);
188+
return_failed_subqueries = static_cast<bool>(flags.value & RETURN_FAILED_SUBQUERIES_FLAG);
189+
return_query_stats = static_cast<bool>(flags.value & RETURN_QUERY_STATS_FLAG);
190+
no_result = static_cast<bool>(flags.value & NORESULT_FLAG);
191+
return_view_number = static_cast<bool>(flags.value & RETURN_VIEW_NUMBER_FLAG);
193192
return ok;
194193
}
195194

196195
void rpcReqResultExtra::store(tl::TLBuffer& tlb) const noexcept {
196+
flags.store(tlb);
197197
if (static_cast<bool>(flags.value & BINLOG_POS_FLAG)) {
198198
binlog_pos.store(tlb);
199199
}

0 commit comments

Comments
 (0)