Skip to content

Commit 43ac676

Browse files
committed
redpanda: the great bootstrap refactor
The start-up process for `redpanda` now looks like this:

1. Cluster configs are hydrated from the local `config_cache` and legacy `node_cfg_yaml`.
2. `storage::api` is constructed (but not started), and the local `kvstore` is recovered.
3. We bootstrap from the `kvstore`:
   1. Apply the local feature table snapshot (potentially stale) from the `kvstore`.
   2. We attempt to load any existing cluster/node UUIDs from the `kvstore`.
4. We perform cluster discovery — if we are a joiner, we will perform our RPC to register with the cluster, and collect the `controller_join_snapshot` this way. If we are simply a restarter, this is mostly a no-op.
5. We bootstrap our view of the controller — if we were a joiner, we already have our required snapshot. If we are a restarter, we utilize the new RPC to fetch the controller snapshot from the leader. Unlike the first-time registration RPC, we do permit this to fail, and will operate with stale configs/feature tables during bootstrap. If we have a controller snapshot (obtained in either aforementioned way), it is applied to the feature table, and the configuration is preloaded.
6. We now have a consistent view, shared with the rest of the cluster, of the feature table and cluster configuration for the rest of the bootstrapping process.

raft0 replay still happens _after_ the rest of the system is started.

A big caveat with cluster recovery: properties marked `needs_restart` that are fetched during recovery can no longer be applied until the recovered node has restarted. There is no clear way to avoid this behavior.
1 parent 26a6f29 commit 43ac676

10 files changed

Lines changed: 495 additions & 315 deletions

File tree

src/v/cluster/cloud_metadata/tests/cluster_recovery_backend_test.cc

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ TEST_P(ClusterRecoveryBackendLeadershipParamTest, TestRecoveryControllerState) {
126126
// Update the cluster config (via the controller, rather than shard local).
127127
cluster::config_update_request req;
128128
req.upsert.emplace_back("log_segment_size_jitter_percent", "1");
129+
req.upsert.emplace_back("log_segment_size", "2147483649");
129130
app.controller->get_config_frontend()
130131
.local()
131132
.patch(std::move(req), model::timeout_clock::now() + 30s)
@@ -228,6 +229,7 @@ TEST_P(ClusterRecoveryBackendLeadershipParamTest, TestRecoveryControllerState) {
228229
raft0 = nullptr;
229230
restart(should_wipe::yes);
230231
task_local_cfg.get("log_segment_size_jitter_percent").reset();
232+
task_local_cfg.get("log_segment_size").reset();
231233
RPTEST_REQUIRE_EVENTUALLY(5s, [this] {
232234
return app.storage.local().get_cluster_uuid().has_value();
233235
});
@@ -239,6 +241,7 @@ TEST_P(ClusterRecoveryBackendLeadershipParamTest, TestRecoveryControllerState) {
239241
.has_value());
240242
ASSERT_NE(
241243
1, config::shard_local_cfg().log_segment_size_jitter_percent.value());
244+
ASSERT_NE(2147483649, config::shard_local_cfg().log_segment_size.value());
242245
ASSERT_TRUE(!app.controller->get_credential_store().local().contains(
243246
security::credential_user{"userguy"}));
244247
ASSERT_EQ(
@@ -274,14 +277,26 @@ TEST_P(ClusterRecoveryBackendLeadershipParamTest, TestRecoveryControllerState) {
274277
.is_recovery_active();
275278
});
276279

280+
bool has_restarted = false;
277281
// Validate the controller state is restored.
278282
auto validate_post_recovery = [&] {
279283
ASSERT_TRUE(app.controller->get_feature_table()
280284
.local()
281285
.get_configured_license()
282286
.has_value());
287+
// log_segment_size_jitter_percent is marked as needs_restart::yes. We
288+
// won't see its recovered value reflected until the node is restarted.
289+
auto log_segment_size_jitter_expected
290+
= has_restarted ? 1
291+
: config::shard_local_cfg()
292+
.log_segment_size_jitter_percent.default_value();
283293
ASSERT_EQ(
284-
1, config::shard_local_cfg().log_segment_size_jitter_percent.value());
294+
log_segment_size_jitter_expected,
295+
config::shard_local_cfg().log_segment_size_jitter_percent.value());
296+
// On the other hand, log_segment_size is marked as needs_restart::no,
297+
// so we will see its value reflected immediately.
298+
ASSERT_EQ(
299+
2147483649, config::shard_local_cfg().log_segment_size.value());
285300

286301
// Validate User restoration.
287302
ASSERT_TRUE(app.controller->get_credential_store().local().contains(
@@ -339,6 +354,7 @@ TEST_P(ClusterRecoveryBackendLeadershipParamTest, TestRecoveryControllerState) {
339354

340355
// Sanity check that the above invariants still hold after restarting.
341356
restart(should_wipe::no);
357+
has_restarted = true;
342358
RPTEST_REQUIRE_EVENTUALLY(5s, [this] {
343359
auto latest_recovery = app.controller->get_cluster_recovery_table()
344360
.local()

src/v/kafka/client/test/fixture.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,13 @@ class kafka_client_fixture : public redpanda_thread_fixture {
3131
auto& config = config::shard_local_cfg();
3232
config.get("disable_metrics").set_value(false);
3333
}).get();
34+
app.wire_up_and_start_crypto_services();
35+
app.wire_up_pre_bootstrap_services();
36+
app.hydrate_cluster_config(make_minimal_cfg());
37+
app.wire_up_and_start_rpc_service();
38+
app.establish_cluster_view(*app_signal);
3439
app.initialize(proxy_config(), proxy_client_config());
3540
app.check_environment();
36-
app.wire_up_and_start_crypto_services();
3741
app.wire_up_and_start(*app_signal, test_mode);
3842
}
3943

src/v/kafka/server/tests/topic_recreate_test.cc

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,9 +137,13 @@ class recreate_test_fixture : public redpanda_thread_fixture {
137137
auto& config = config::shard_local_cfg();
138138
config.get("disable_metrics").set_value(false);
139139
}).get();
140+
app.wire_up_and_start_crypto_services();
141+
app.wire_up_pre_bootstrap_services();
142+
app.hydrate_cluster_config(make_minimal_cfg());
143+
app.wire_up_and_start_rpc_service();
144+
app.establish_cluster_view(*app_signal);
140145
app.initialize(proxy_config(), proxy_client_config());
141146
app.check_environment();
142-
app.wire_up_and_start_crypto_services();
143147
app.wire_up_and_start(*app_signal, true);
144148
}
145149
};

src/v/redpanda/BUILD

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ redpanda_cc_library(
3939
deps = [
4040
":cli_parser",
4141
"//src/v/base",
42+
"//src/v/bytes:iobuf_parser",
43+
"//src/v/bytes:iostream",
4244
"//src/v/cloud_io:cache",
4345
"//src/v/cloud_io:remote",
4446
"//src/v/cloud_storage",
@@ -66,6 +68,7 @@ redpanda_cc_library(
6668
"//src/v/cluster:offsets_lookup",
6769
"//src/v/cluster:partition_properties_stm",
6870
"//src/v/cluster:tx_manager_migrator_rpc",
71+
"//src/v/cluster:types",
6972
"//src/v/cluster/utils:partition_change_notifier_impl",
7073
"//src/v/cluster_link:fwd",
7174
"//src/v/cluster_link:rpc_service",
@@ -95,6 +98,7 @@ redpanda_cc_library(
9598
"//src/v/kafka/server:write_at_offset_stm",
9699
"//src/v/metrics",
97100
"//src/v/migrations",
101+
"//src/v/model",
98102
"//src/v/net",
99103
"//src/v/net:tls",
100104
"//src/v/pandaproxy:core",

src/v/redpanda/application.cc

Lines changed: 134 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#include "cloud_storage_clients/client_pool.h"
1717
#include "cluster/cloud_metadata/offsets_upload_router.h"
1818
#include "cluster/cloud_metadata/offsets_uploader.h"
19+
#include "cluster/cluster_discovery.h"
1920
#include "cluster/config_manager.h"
2021
#include "cluster/controller.h"
2122
#include "cluster/node_isolation_watcher.h"
@@ -338,7 +339,11 @@ int application::run(int ac, char** av) {
338339
// Cluster config validation uses OpenSSL (e.g. TLS cipher
339340
// checks), so crypto must be initialized first.
340341
wire_up_and_start_crypto_services();
342+
wire_up_pre_bootstrap_services();
341343
hydrate_cluster_config(node_cfg_yaml);
344+
wire_up_and_start_rpc_service();
345+
establish_cluster_view(app_signal);
346+
log_cluster_config();
342347
init_crashtracker(app_signal);
343348
initialize();
344349
check_environment();
@@ -387,9 +392,20 @@ void application::initialize(
387392
std::optional<YAML::Node> schema_reg_client_cfg,
388393
std::optional<YAML::Node> audit_log_client_cfg) {
389394
ss::smp::invoke_on_all([] {
390-
// initialize memory groups now that our configuration is loaded
395+
// re-initialize memory groups now that our configuration is loaded
396+
memory_groups_holder().reset();
391397
memory_groups();
392398
}).get();
399+
400+
// With memory groups re-initialized, we can now set a proper memory
401+
// capacity in the _rpc server (which was constructed before a consistent
402+
// cluster view was established).
403+
_rpc
404+
.invoke_on_all([](rpc::rpc_server& r) {
405+
r.set_memory_capacity(memory_groups().rpc_total_memory());
406+
})
407+
.get();
408+
393409
construct_service(
394410
_memory_sampling, std::ref(_log), ss::sharded_parameter([]() {
395411
return config::shard_local_cfg().sampled_memory_profile.bind();
@@ -460,21 +476,6 @@ void application::initialize(
460476
"data directory", config::node().data_directory().path);
461477
syschecks::pidfile_create(config::node().pidfile_path());
462478
}
463-
smp_groups::config smp_groups_cfg{
464-
.raft_group_max_non_local_requests
465-
= config::shard_local_cfg().raft_smp_max_non_local_requests().value_or(
466-
smp_groups::default_raft_non_local_requests(
467-
config::shard_local_cfg().topic_partitions_per_shard())),
468-
.proxy_group_max_non_local_requests
469-
= config::shard_local_cfg().pp_sr_smp_max_non_local_requests().value_or(
470-
smp_groups::default_max_nonlocal_requests)};
471-
472-
smp_service_groups.create_groups(smp_groups_cfg).get();
473-
_deferred.emplace_back(
474-
[this] { smp_service_groups.destroy_groups().get(); });
475-
476-
// Ensure the scheduling groups singleton is initialized early
477-
std::ignore = scheduling_groups::instance();
478479

479480
construct_service(_scheduling_groups_probe).get();
480481
_scheduling_groups_probe
@@ -514,6 +515,18 @@ void application::initialize(
514515
}
515516

516517
void application::setup_metrics() {
518+
// Two systems that were created in the pre-bootstrapping process that may
519+
// now need their metrics enabled.
520+
feature_table.invoke_on_all(&features::feature_table::setup_metrics).get();
521+
_rpc
522+
.invoke_on_all([](rpc::rpc_server& r) {
523+
r.setup_metrics(
524+
/*disable_metrics=*/config::shard_local_cfg().disable_metrics(),
525+
/*disable_public_metrics=*/config::shard_local_cfg()
526+
.disable_public_metrics());
527+
})
528+
.get();
529+
_rpc.invoke_on_all(&rpc::rpc_server::setup_metrics).get();
517530
setup_internal_metrics();
518531
setup_public_metrics();
519532
}
@@ -724,31 +737,78 @@ YAML::Node application::hydrate_node_config(const po::variables_map& cfg) {
724737
return config;
725738
}
726739

727-
void application::hydrate_cluster_config(const YAML::Node& config) {
728-
auto config_printer = [this](std::string_view service, const auto& cfg) {
729-
std::vector<ss::sstring> items;
730-
cfg.for_each([&items, &service](const auto& item) {
731-
items.push_back(
732-
ssx::sformat("{}.{}\t- {}", service, item, item.desc()));
733-
});
734-
std::sort(items.begin(), items.end());
735-
for (const auto& item : items) {
736-
vlog(_log.info, "{}", item);
737-
}
738-
};
740+
// Forward declarations of helper functions defined in application_config.cc
741+
std::optional<storage::file_sanitize_config> read_file_sanitizer_config();
742+
743+
storage::kvstore_config kvstore_config_from_global_config(
744+
std::optional<storage::file_sanitize_config> sanitizer_config);
745+
746+
storage::log_config manager_config_from_global_config(
747+
scheduling_groups& sgs,
748+
std::optional<storage::file_sanitize_config> sanitizer_config);
749+
750+
void application::wire_up_pre_bootstrap_services() {
751+
// Ensure the scheduling groups singleton is initialized early
752+
std::ignore = scheduling_groups::instance();
753+
754+
// Construct the feature table
755+
syschecks::systemd_message("Creating feature table").get();
756+
construct_service(feature_table).get();
757+
758+
// Construct local storage
759+
const auto sanitizer_config = read_file_sanitizer_config();
760+
syschecks::systemd_message("Creating storage").get();
761+
construct_service(
762+
storage,
763+
[c = sanitizer_config]() mutable {
764+
return kvstore_config_from_global_config(std::move(c));
765+
},
766+
[c = sanitizer_config]() mutable {
767+
auto log_cfg = manager_config_from_global_config(
768+
scheduling_groups::instance(), std::move(c));
769+
log_cfg.reclaim_opts.background_reclaimer_sg
770+
= scheduling_groups::instance().cache_background_reclaim_sg();
771+
return log_cfg;
772+
},
773+
std::ref(feature_table))
774+
.get();
775+
776+
// Construct smp groups using potentially stale cluster config.
777+
auto& cfg = config::shard_local_cfg_unsafe();
778+
smp_groups::config smp_groups_cfg{
779+
.raft_group_max_non_local_requests
780+
= cfg.raft_smp_max_non_local_requests().value_or(
781+
smp_groups::default_raft_non_local_requests(
782+
cfg.topic_partitions_per_shard())),
783+
.proxy_group_max_non_local_requests
784+
= cfg.pp_sr_smp_max_non_local_requests().value_or(
785+
smp_groups::default_max_nonlocal_requests)};
786+
787+
smp_service_groups.create_groups(smp_groups_cfg).get();
788+
_deferred.emplace_back(
789+
[this] { smp_service_groups.destroy_groups().get(); });
790+
}
791+
792+
void application::establish_cluster_view(::stop_signal& app_signal) {
793+
bootstrap_from_kvstore().get();
739794

795+
// Begin the cluster discovery manager so we can confirm our initial node
796+
// ID. A valid node ID is required before we can initialize the rest of our
797+
// subsystems. The local node and cluster UUIDs would have been set in
798+
// bootstrap_from_kvstore().
799+
_cluster_discovery = std::make_unique<cluster::cluster_discovery>(
800+
storage.local().node_uuid(),
801+
storage.local().get_cluster_uuid(),
802+
app_signal.abort_source());
803+
804+
bootstrap_controller_view().get();
805+
}
806+
807+
void application::hydrate_cluster_config(const YAML::Node& config) {
740808
// This includes loading from local bootstrap file or legacy
741809
// config file on first-start or upgrade cases.
742810
_config_preload = cluster::config_manager::preload(config).get();
743811

744-
vlog(_log.info, "Cluster configuration properties:");
745-
vlog(_log.info, "(use `rpk cluster config edit` to change)");
746-
config_printer("redpanda", config::shard_local_cfg());
747-
748-
vlog(_log.info, "Node configuration properties:");
749-
vlog(_log.info, "(use `rpk redpanda config set <cfg> <value>` to change)");
750-
config_printer("redpanda", config::node());
751-
752812
if (config["pandaproxy"]) {
753813
_proxy_config.emplace(config["pandaproxy"]);
754814
for (const auto& e : _proxy_config->errors()) {
@@ -768,8 +828,6 @@ void application::hydrate_cluster_config(const YAML::Node& config) {
768828
set_local_kafka_client_config(_proxy_client_config, config::node());
769829
}
770830
set_pp_kafka_client_defaults(*_proxy_config, *_proxy_client_config);
771-
config_printer("pandaproxy", *_proxy_config);
772-
config_printer("pandaproxy_client", *_proxy_client_config);
773831
}
774832
if (config["schema_registry"]) {
775833
_schema_reg_config.emplace(config["schema_registry"]);
@@ -780,8 +838,6 @@ void application::hydrate_cluster_config(const YAML::Node& config) {
780838
_schema_reg_client_config, config::node());
781839
}
782840
set_sr_kafka_client_defaults(*_schema_reg_client_config);
783-
config_printer("schema_registry", *_schema_reg_config);
784-
config_printer("schema_registry_client", *_schema_reg_client_config);
785841
}
786842
/// Auditing will be toggled via cluster config settings, internal audit
787843
/// client options can be configured via local config properties
@@ -791,7 +847,44 @@ void application::hydrate_cluster_config(const YAML::Node& config) {
791847
set_local_kafka_client_config(_audit_log_client_config, config::node());
792848
}
793849
set_auditing_kafka_client_defaults(*_audit_log_client_config);
794-
config_printer("audit_log_client", *_audit_log_client_config);
850+
}
851+
852+
void application::log_cluster_config() {
853+
auto config_printer = [this](std::string_view service, const auto& cfg) {
854+
std::vector<ss::sstring> items;
855+
cfg.for_each([&items, &service](const auto& item) {
856+
items.push_back(
857+
ssx::sformat("{}.{}\t- {}", service, item, item.desc()));
858+
});
859+
std::sort(items.begin(), items.end());
860+
for (const auto& item : items) {
861+
vlog(_log.info, "{}", item);
862+
}
863+
};
864+
865+
vlog(_log.info, "Cluster configuration properties:");
866+
vlog(_log.info, "(use `rpk cluster config edit` to change)");
867+
config_printer("redpanda", config::shard_local_cfg());
868+
869+
vlog(_log.info, "Node configuration properties:");
870+
vlog(_log.info, "(use `rpk redpanda config set <cfg> <value>` to change)");
871+
config_printer("redpanda", config::node());
872+
873+
if (_proxy_config) {
874+
config_printer("pandaproxy", *_proxy_config);
875+
}
876+
if (_proxy_client_config) {
877+
config_printer("pandaproxy_client", *_proxy_client_config);
878+
}
879+
if (_schema_reg_config) {
880+
config_printer("schema_registry", *_schema_reg_config);
881+
}
882+
if (_schema_reg_client_config) {
883+
config_printer("schema_registry_client", *_schema_reg_client_config);
884+
}
885+
if (_audit_log_client_config) {
886+
config_printer("audit_log_client", *_audit_log_client_config);
887+
}
795888
}
796889

797890
void application::check_environment() {

0 commit comments

Comments (0)