Skip to content

Commit 558d0d1

Browse files
committed
feat(script): add SCRIPT KILL and lua-time-limit support
This commit merges all changes of the current dev branch compared to unstable into a single commit. It includes implementation of the lua-time-limit configuration option, SCRIPT KILL command to safely interrupt long-running scripts, connection guard optimizations, worker event loop recursive event handling, and macOS TCP listener socket sharing.
1 parent 82eaa90 commit 558d0d1

15 files changed

Lines changed: 374 additions & 11 deletions

File tree

CMakeLists.txt

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,10 @@ if ((CMAKE_CXX_COMPILER_ID STREQUAL "GNU") OR (CMAKE_CXX_COMPILER_ID STREQUAL "C
254254
endif()
255255
endif()
256256

257+
if (APPLE AND ENABLE_LUAJIT)
258+
set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -Wl,-no_deduplicate")
259+
endif()
260+
257261
# kvrocks objects target
258262
file(GLOB_RECURSE KVROCKS_SRCS src/*.cc)
259263
list(FILTER KVROCKS_SRCS EXCLUDE REGEX src/cli/main.cc)
@@ -286,6 +290,9 @@ target_compile_definitions(kvrocks_objs PUBLIC KVROCKS_STORAGE_ENGINE=RocksDB)
286290
if(ENABLE_OPENSSL)
287291
target_compile_definitions(kvrocks_objs PUBLIC ENABLE_OPENSSL)
288292
endif()
293+
if(ENABLE_LUAJIT)
294+
target_compile_definitions(kvrocks_objs PUBLIC ENABLE_LUAJIT)
295+
endif()
289296
if(ENABLE_NEW_ENCODING)
290297
target_compile_definitions(kvrocks_objs PUBLIC METADATA_ENCODING_VERSION=1)
291298
else()

cmake/luajit.cmake

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,19 +47,26 @@ FetchContent_GetProperties(luajit)
4747
if (NOT lua_POPULATED)
4848
FetchContent_Populate(luajit)
4949

50-
set(LUA_CFLAGS "-DLUA_ANSI -DENABLE_CJSON_GLOBAL -DREDIS_STATIC= -DLUA_USE_MKSTEMP")
50+
# We use LUAJIT_CFLAGS instead of LUA_CFLAGS and we do not define LUA_ANSI.
51+
# LuaJIT relies heavily on architecture-specific assembly code and compiler optimization settings,
52+
# and LUA_ANSI is meant for standard Lua to restrict to ANSI C which LuaJIT does not support.
53+
set(LUAJIT_CFLAGS "-DENABLE_CJSON_GLOBAL -DREDIS_STATIC= -DLUA_USE_MKSTEMP -DLUAJIT_ENABLE_CHECKHOOK")
5154
if((CMAKE_CXX_COMPILER_ID STREQUAL "AppleClang") OR
5255
(CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_CXX_COMPILER_ID STREQUAL "Clang"))
53-
set(LUA_CFLAGS "${LUA_CFLAGS} -isysroot ${CMAKE_OSX_SYSROOT}")
56+
set(LUAJIT_CFLAGS "${LUAJIT_CFLAGS} -isysroot ${CMAKE_OSX_SYSROOT}")
5457
endif ()
5558

5659
if (CMAKE_HOST_APPLE)
5760
set(MACOSX_TARGET "MACOSX_DEPLOYMENT_TARGET=${CMAKE_OSX_DEPLOYMENT_TARGET}")
5861
endif ()
5962

63+
# We pass C compiler path CC and custom preprocessor definitions via XCFLAGS instead of CFLAGS.
64+
# Overriding CFLAGS on the make command line completely discards LuaJIT's own Makefile CFLAGS (such as
65+
# -O2 and target-specific compiler optimizations), which breaks the JIT compiler and assembler VM
66+
# stack unwinding on platforms like macOS arm64, leading to CPU spin locks or crashes.
6067
add_custom_target(make_luajit
6168
COMMAND ${MAKE_COMMAND} libluajit.a ${NINJA_MAKE_JOBS_FLAG}
62-
"CFLAGS=${LUA_CFLAGS}" ${MACOSX_TARGET}
69+
"CC=${CMAKE_C_COMPILER}" "XCFLAGS=${LUAJIT_CFLAGS}" ${MACOSX_TARGET}
6370
COMMAND ${CMAKE_COMMAND} -E make_directory ${luajit_BINARY_DIR}/include
6471
COMMAND sh -c "cp ${luajit_SOURCE_DIR}/src/*.h ${luajit_SOURCE_DIR}/src/*.hpp ${luajit_BINARY_DIR}/include/"
6572
WORKING_DIRECTORY ${luajit_SOURCE_DIR}/src

kvrocks.conf

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -492,6 +492,13 @@ txn-context-enabled no
492492
# Default: no
493493
lua-strict-key-accessing no
494494

495+
# Max execution time of a Lua script in milliseconds.
496+
# If the maximum execution time is reached, the server will log it
497+
# and start rejecting other queries with a BUSY error.
498+
# Setting it to 0 or a negative value means no timeout.
499+
# Default: 0 (disabled)
500+
lua-time-limit 0
501+
495502
################################## TLS ###################################
496503

497504
# By default, TLS/SSL is disabled, i.e. `tls-port` is set to 0.

src/commands/cmd_script.cc

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ class CommandScript : public Commander {
6868
// There's a little tricky here since the script command was the write type
6969
// command but some subcommands like `exists` were readonly, so we want to allow
7070
// executing on slave here. Maybe we should find other way to do this.
71-
if (srv->IsSlave() && subcommand_ != "exists") {
71+
if (srv->IsSlave() && subcommand_ != "exists" && subcommand_ != "kill") {
7272
return {Status::RedisReadOnly, "You can't write against a read only slave"};
7373
}
7474

@@ -84,6 +84,12 @@ class CommandScript : public Commander {
8484
return s;
8585
}
8686
*output = redis::RESP_OK;
87+
} else if (args_.size() == 2 && subcommand_ == "kill") {
88+
auto s = srv->ScriptKill();
89+
if (!s) {
90+
return s;
91+
}
92+
*output = redis::RESP_OK;
8793
} else if (args_.size() >= 3 && subcommand_ == "exists") {
8894
*output = redis::MultiLen(args_.size() - 2);
8995
for (size_t j = 2; j < args_.size(); j++) {

src/config/config.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,7 @@ Config::Config() {
263263
{"skip-block-cache-deallocation-on-close", false, new YesNoField(&skip_block_cache_deallocation_on_close, false)},
264264
{"histogram-bucket-boundaries", true, new StringField(&histogram_bucket_boundaries_str_, "")},
265265
{"lua-strict-key-accessing", false, new YesNoField(&lua_strict_key_accessing, false)},
266+
{"lua-time-limit", false, new IntField(&lua_time_limit, 0, -1, INT_MAX)},
266267

267268
/* rocksdb options */
268269
{"rocksdb.compression", false,

src/config/config.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,8 @@ struct Config {
205205

206206
bool lua_strict_key_accessing = false;
207207

208+
int lua_time_limit = 0;
209+
208210
std::vector<double> histogram_bucket_boundaries;
209211

210212
struct RocksDB {

src/server/redis_connection.cc

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -433,6 +433,18 @@ void Connection::ExecuteCommands(std::deque<CommandTokens> *to_process_cmds) {
433433
to_process_cmds->pop_front();
434434
if (cmd_tokens.empty()) continue;
435435

436+
bool is_script_kill = (util::EqualICase(cmd_tokens.front(), "script") && cmd_tokens.size() >= 2 &&
437+
util::EqualICase(cmd_tokens[1], "kill"));
438+
bool is_shutdown = util::EqualICase(cmd_tokens.front(), "shutdown");
439+
440+
if (srv_->IsScriptTimedOut()) {
441+
if (!is_script_kill && !is_shutdown) {
442+
Reply(redis::Error({Status::RedisErrorNoPrefix,
443+
"BUSY Redis is busy running a script. You can only call SCRIPT KILL or SHUTDOWN NOSAVE."}));
444+
continue;
445+
}
446+
}
447+
436448
bool is_multi_exec = IsFlagEnabled(Connection::kMultiExec);
437449
if (IsFlagEnabled(redis::Connection::kCloseAfterReply) && !is_multi_exec) break;
438450
auto multi_error_exit = MakeScopeExit([&] {
@@ -457,7 +469,6 @@ void Connection::ExecuteCommands(std::deque<CommandTokens> *to_process_cmds) {
457469
continue;
458470
}
459471
auto current_cmd = std::move(*cmd_s);
460-
461472
const auto &attributes = current_cmd->GetAttributes();
462473
auto cmd_name = attributes->name;
463474

@@ -494,7 +505,9 @@ void Connection::ExecuteCommands(std::deque<CommandTokens> *to_process_cmds) {
494505
// that can guarantee other threads can't come into critical zone, such as DEBUG,
495506
// CLUSTER subcommand, CONFIG SET, MULTI, LUA (in the immediate future).
496507
// Otherwise, we just use 'ConcurrencyGuard' to allow all workers to execute commands at the same time.
497-
if (is_multi_exec && !(cmd_flags & kCmdBypassMulti)) {
508+
if (is_script_kill || is_shutdown) {
509+
// Bypass locks to allow SCRIPT KILL and SHUTDOWN to run even if another thread is stuck in a script
510+
} else if (is_multi_exec && !(cmd_flags & kCmdBypassMulti)) {
498511
// No lock guard, because 'exec' command has acquired 'WorkExclusivityGuard'
499512
} else if (cmd_flags & kCmdExclusive) {
500513
exclusivity = srv_->WorkExclusivityGuard();

src/server/server.cc

Lines changed: 78 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,8 +101,16 @@ Server::Server(engine::Storage *storage, Config *config)
101101
// init shard pub/sub channels
102102
pubsub_shard_channels_.resize(config->cluster_enabled ? HASH_SLOTS_SIZE : 1);
103103

104+
std::vector<int> listen_fds;
104105
for (int i = 0; i < config->workers; i++) {
105-
auto worker = std::make_unique<Worker>(this, config);
106+
#ifdef __APPLE__
107+
auto worker = std::make_unique<Worker>(this, config, listen_fds);
108+
if (i == 0) {
109+
listen_fds = worker->GetTCPListenFDs();
110+
}
111+
#else
112+
auto worker = std::make_unique<Worker>(this, config, std::vector<int>{});
113+
#endif
106114
// multiple workers can't listen to the same unix socket, so
107115
// listen unix socket only from a single worker - the first one
108116
if (!config->unixsocket.empty() && i == 0) {
@@ -255,6 +263,13 @@ void Server::Stop() {
255263
if (replication_thread_) replication_thread_->Stop();
256264
slaveof_mu_.unlock();
257265

266+
{
267+
std::lock_guard<std::mutex> guard(running_scripts_mu_);
268+
for (auto *rctx : running_scripts_) {
269+
rctx->is_killed = true;
270+
}
271+
}
272+
258273
for (const auto &worker : worker_threads_) {
259274
worker->Stop(0 /* immediately terminate */);
260275
}
@@ -1879,6 +1894,61 @@ StatusOr<std::unique_ptr<redis::Commander>> Server::LookupAndCreateCommand(const
18791894
return std::move(cmd);
18801895
}
18811896

1897+
void Server::RegisterRunningScript(lua::ScriptRunCtx *rctx) {
1898+
std::lock_guard<std::mutex> guard(running_scripts_mu_);
1899+
running_scripts_.push_back(rctx);
1900+
running_script_count_.fetch_add(1, std::memory_order_relaxed);
1901+
}
1902+
1903+
void Server::UnregisterRunningScript(lua::ScriptRunCtx *rctx) {
1904+
std::lock_guard<std::mutex> guard(running_scripts_mu_);
1905+
auto it = std::find(running_scripts_.begin(), running_scripts_.end(), rctx);
1906+
if (it != running_scripts_.end()) {
1907+
running_scripts_.erase(it);
1908+
running_script_count_.fetch_sub(1, std::memory_order_relaxed);
1909+
}
1910+
}
1911+
1912+
bool Server::IsScriptTimedOut() const {
1913+
int limit = config_->lua_time_limit;
1914+
if (limit <= 0) return false;
1915+
if (running_script_count_.load(std::memory_order_relaxed) == 0) return false;
1916+
1917+
uint64_t now_ms = util::GetTimeStampMS();
1918+
std::lock_guard<std::mutex> guard(running_scripts_mu_);
1919+
for (const auto *rctx : running_scripts_) {
1920+
if (now_ms - rctx->start_time_ms >= static_cast<uint64_t>(limit)) {
1921+
return true;
1922+
}
1923+
}
1924+
return false;
1925+
}
1926+
1927+
Status Server::ScriptKill() {
1928+
std::lock_guard<std::mutex> guard(running_scripts_mu_);
1929+
if (running_scripts_.empty()) {
1930+
return {Status::NotOK, "NOTBUSY No scripts in execution right now."};
1931+
}
1932+
1933+
bool has_unkillable = false;
1934+
for (auto *rctx : running_scripts_) {
1935+
if (rctx->is_write_dirty) {
1936+
has_unkillable = true;
1937+
} else {
1938+
rctx->is_killed = true;
1939+
}
1940+
}
1941+
1942+
if (has_unkillable) {
1943+
return {Status::NotOK,
1944+
"UNKILLABLE Sorry the script already executed write commands against the dataset. "
1945+
"You can either wait the script termination or kill the server in a hard way using the SHUTDOWN NOSAVE "
1946+
"command."};
1947+
}
1948+
1949+
return Status::OK();
1950+
}
1951+
18821952
Status Server::ScriptExists(const std::string &sha) const {
18831953
std::string body;
18841954
return ScriptGet(sha, &body);
@@ -2068,8 +2138,14 @@ void Server::AdjustWorkerThreads() {
20682138
}
20692139

20702140
void Server::increaseWorkerThreads(size_t delta) {
2141+
std::vector<int> listen_fds;
2142+
#ifdef __APPLE__
2143+
if (!worker_threads_.empty()) {
2144+
listen_fds = worker_threads_[0]->GetWorker()->GetTCPListenFDs();
2145+
}
2146+
#endif
20712147
for (size_t i = 0; i < delta; i++) {
2072-
auto worker = std::make_unique<Worker>(this, config_);
2148+
auto worker = std::make_unique<Worker>(this, config_, listen_fds);
20732149
auto worker_thread = std::make_unique<WorkerThread>(std::move(worker));
20742150
worker_thread->Start();
20752151
worker_threads_.emplace_back(std::move(worker_thread));

src/server/server.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,10 @@
6262

6363
constexpr const char *REDIS_VERSION = "7.0.0";
6464

65+
namespace lua {
66+
struct ScriptRunCtx;
67+
}
68+
6569
struct DBScanInfo {
6670
// Last scan system clock in seconds
6771
int64_t last_scan_time_secs = 0;
@@ -315,6 +319,11 @@ class Server {
315319
void KillClient(int64_t *killed, const std::string &addr, uint64_t id, uint64_t type, bool skipme,
316320
redis::Connection *conn);
317321

322+
void RegisterRunningScript(lua::ScriptRunCtx *rctx);
323+
void UnregisterRunningScript(lua::ScriptRunCtx *rctx);
324+
bool IsScriptTimedOut() const;
325+
Status ScriptKill();
326+
318327
Status ScriptExists(const std::string &sha) const;
319328
Status ScriptGet(const std::string &sha, std::string *body) const;
320329
Status ScriptSet(const std::string &sha, const std::string &body) const;
@@ -478,4 +487,8 @@ class Server {
478487
uint64_t id;
479488
};
480489
std::vector<PausedConnEntry> paused_conns_;
490+
491+
std::vector<lua::ScriptRunCtx *> running_scripts_;
492+
mutable std::mutex running_scripts_mu_;
493+
std::atomic<size_t> running_script_count_{0};
481494
};

src/server/worker.cc

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,8 @@
5454
#include "server.h"
5555
#include "storage/scripting.h"
5656

57-
Worker::Worker(Server *srv, Config *config) : srv(srv), base_(event_base_new()) {
57+
Worker::Worker(Server *srv, Config *config, const std::vector<int> &tcp_listen_fds)
58+
: srv(srv), base_(event_base_new()) {
5859
if (!base_) throw std::runtime_error{"event base failed to be created"};
5960

6061
timer_.reset(NewEvent(base_, -1, EV_PERSIST));
@@ -66,6 +67,19 @@ Worker::Worker(Server *srv, Config *config) : srv(srv), base_(event_base_new())
6667
ERROR("[worker] Failed to listen to socket with fd: {}, Error: {}", config->socket_fd, s.Msg());
6768
exit(1);
6869
}
70+
} else if (!tcp_listen_fds.empty()) {
71+
for (int fd : tcp_listen_fds) {
72+
int dup_fd = dup(fd);
73+
if (dup_fd == -1) {
74+
ERROR("[worker] Failed to dup fd: {}, Error: {}", fd, strerror(errno));
75+
exit(1);
76+
}
77+
evconnlistener *lev = NewEvconnlistener<&Worker::newTCPConnection>(
78+
base_, LEV_OPT_THREADSAFE | LEV_OPT_CLOSE_ON_FREE, config->backlog, dup_fd);
79+
listen_events_.emplace_back(lev);
80+
tcp_listen_fds_.push_back(dup_fd);
81+
INFO("[worker] Listening on shared TCP fd: {} (original: {})", dup_fd, fd);
82+
}
6983
} else {
7084
const uint32_t ports[3] = {config->port, config->tls_port, 0};
7185

@@ -241,6 +255,7 @@ Status Worker::listenFD(int fd, uint32_t expected_port, int backlog) {
241255
evconnlistener *lev =
242256
NewEvconnlistener<&Worker::newTCPConnection>(base_, LEV_OPT_THREADSAFE | LEV_OPT_CLOSE_ON_FREE, backlog, dup_fd);
243257
listen_events_.emplace_back(lev);
258+
tcp_listen_fds_.push_back(dup_fd);
244259
INFO("[worker] Listening on dup'ed fd: {}", dup_fd);
245260
return Status::OK();
246261
}
@@ -285,6 +300,7 @@ Status Worker::listenTCP(const std::string &host, uint32_t port, int backlog) {
285300
auto lev =
286301
NewEvconnlistener<&Worker::newTCPConnection>(base_, LEV_OPT_THREADSAFE | LEV_OPT_CLOSE_ON_FREE, backlog, fd);
287302
listen_events_.emplace_back(lev);
303+
tcp_listen_fds_.push_back(fd);
288304
}
289305

290306
return Status::OK();
@@ -582,6 +598,33 @@ void Worker::LuaReset() {
582598

583599
int64_t Worker::GetLuaMemorySize() { return (int64_t)lua_gc(lua_, LUA_GCCOUNT, 0) * 1024; }
584600

601+
void Worker::PollEventLoop() {
602+
event_base_loop(base_, EVLOOP_ONCE | EVLOOP_NONBLOCK);
603+
604+
// Flush all active connections' output buffers to their sockets
605+
std::lock_guard<std::mutex> lock(conns_mu_);
606+
for (const auto &iter : conns_) {
607+
auto *conn = iter.second;
608+
auto *bev = conn->GetBufferEvent();
609+
if (bev) {
610+
bool is_tls = false;
611+
#ifdef ENABLE_OPENSSL
612+
if (bufferevent_openssl_get_ssl(bev) != nullptr) {
613+
is_tls = true;
614+
}
615+
#endif
616+
if (is_tls) {
617+
bufferevent_flush(bev, EV_WRITE, BEV_FLUSH);
618+
} else {
619+
auto *output = bufferevent_get_output(bev);
620+
if (evbuffer_get_length(output) > 0) {
621+
evbuffer_write(output, conn->GetFD());
622+
}
623+
}
624+
}
625+
}
626+
}
627+
585628
void Worker::KickoutIdleClients(int timeout) {
586629
std::vector<std::pair<int, uint64_t>> to_be_killed_conns;
587630

0 commit comments

Comments
 (0)