1616#include " cloud_storage_clients/client_pool.h"
1717#include " cluster/cloud_metadata/offsets_upload_router.h"
1818#include " cluster/cloud_metadata/offsets_uploader.h"
19+ #include " cluster/cluster_discovery.h"
1920#include " cluster/config_manager.h"
2021#include " cluster/controller.h"
2122#include " cluster/node_isolation_watcher.h"
@@ -338,7 +339,11 @@ int application::run(int ac, char** av) {
338339 // Cluster config validation uses OpenSSL (e.g. TLS cipher
339340 // checks), so crypto must be initialized first.
340341 wire_up_and_start_crypto_services ();
342+ wire_up_pre_bootstrap_services ();
341343 hydrate_cluster_config (node_cfg_yaml);
344+ wire_up_and_start_rpc_service ();
345+ establish_cluster_view (app_signal);
346+ log_cluster_config ();
342347 init_crashtracker (app_signal);
343348 initialize ();
344349 check_environment ();
@@ -387,9 +392,20 @@ void application::initialize(
387392 std::optional<YAML::Node> schema_reg_client_cfg,
388393 std::optional<YAML::Node> audit_log_client_cfg) {
389394 ss::smp::invoke_on_all ([] {
390- // initialize memory groups now that our configuration is loaded
395+ // re-initialize memory groups now that our configuration is loaded
396+ memory_groups_holder ().reset ();
391397 memory_groups ();
392398 }).get ();
399+
400+ // With memory groups re-initialized, we can now set a proper memory
401+ // capacity in the _rpc server (which was constructed before a consistent
402+ // cluster view was established).
403+ _rpc
404+ .invoke_on_all ([](rpc::rpc_server& r) {
405+ r.set_memory_capacity (memory_groups ().rpc_total_memory ());
406+ })
407+ .get ();
408+
393409 construct_service (
394410 _memory_sampling, std::ref (_log), ss::sharded_parameter ([]() {
395411 return config::shard_local_cfg ().sampled_memory_profile .bind ();
@@ -460,21 +476,6 @@ void application::initialize(
460476 " data directory" , config::node ().data_directory ().path );
461477 syschecks::pidfile_create (config::node ().pidfile_path ());
462478 }
463- smp_groups::config smp_groups_cfg{
464- .raft_group_max_non_local_requests
465- = config::shard_local_cfg ().raft_smp_max_non_local_requests ().value_or (
466- smp_groups::default_raft_non_local_requests (
467- config::shard_local_cfg ().topic_partitions_per_shard ())),
468- .proxy_group_max_non_local_requests
469- = config::shard_local_cfg ().pp_sr_smp_max_non_local_requests ().value_or (
470- smp_groups::default_max_nonlocal_requests)};
471-
472- smp_service_groups.create_groups (smp_groups_cfg).get ();
473- _deferred.emplace_back (
474- [this ] { smp_service_groups.destroy_groups ().get (); });
475-
476- // Ensure the scheduling groups singleton is initialized early
477- std::ignore = scheduling_groups::instance ();
478479
479480 construct_service (_scheduling_groups_probe).get ();
480481 _scheduling_groups_probe
@@ -514,6 +515,10 @@ void application::initialize(
514515}
515516
516517void application::setup_metrics () {
518+ // Two systems that were created in the pre-bootstrapping process that may
519+ // now need their metrics enabled.
520+ feature_table.invoke_on_all (&features::feature_table::setup_metrics).get ();
521+ _rpc.invoke_on_all (&rpc::rpc_server::setup_metrics).get ();
517522 setup_internal_metrics ();
518523 setup_public_metrics ();
519524}
@@ -724,31 +729,78 @@ YAML::Node application::hydrate_node_config(const po::variables_map& cfg) {
724729 return config;
725730}
726731
727- void application::hydrate_cluster_config (const YAML::Node& config) {
728- auto config_printer = [this ](std::string_view service, const auto & cfg) {
729- std::vector<ss::sstring> items;
730- cfg.for_each ([&items, &service](const auto & item) {
731- items.push_back (
732- ssx::sformat (" {}.{}\t - {}" , service, item, item.desc ()));
733- });
734- std::sort (items.begin (), items.end ());
735- for (const auto & item : items) {
736- vlog (_log.info , " {}" , item);
737- }
738- };
732+ // Forward declarations of helper functions defined in application_config.cc
733+ std::optional<storage::file_sanitize_config> read_file_sanitizer_config ();
734+
735+ storage::kvstore_config kvstore_config_from_global_config (
736+ std::optional<storage::file_sanitize_config> sanitizer_config);
737+
738+ storage::log_config manager_config_from_global_config (
739+ scheduling_groups& sgs,
740+ std::optional<storage::file_sanitize_config> sanitizer_config);
741+
742+ void application::wire_up_pre_bootstrap_services () {
743+ // Ensure the scheduling groups singleton is initialized early
744+ std::ignore = scheduling_groups::instance ();
745+
746+ // Construct the feature table
747+ syschecks::systemd_message (" Creating feature table" ).get ();
748+ construct_service (feature_table).get ();
749+
750+ // Construct local storage
751+ const auto sanitizer_config = read_file_sanitizer_config ();
752+ syschecks::systemd_message (" Creating storage" ).get ();
753+ construct_service (
754+ storage,
755+ [c = sanitizer_config]() mutable {
756+ return kvstore_config_from_global_config (std::move (c));
757+ },
758+ [c = sanitizer_config]() mutable {
759+ auto log_cfg = manager_config_from_global_config (
760+ scheduling_groups::instance (), std::move (c));
761+ log_cfg.reclaim_opts .background_reclaimer_sg
762+ = scheduling_groups::instance ().cache_background_reclaim_sg ();
763+ return log_cfg;
764+ },
765+ std::ref (feature_table))
766+ .get ();
767+
768+ // Construct smp groups using potentially stale cluster config.
769+ auto & cfg = config::shard_local_cfg_unsafe ();
770+ smp_groups::config smp_groups_cfg{
771+ .raft_group_max_non_local_requests
772+ = cfg.raft_smp_max_non_local_requests ().value_or (
773+ smp_groups::default_raft_non_local_requests (
774+ cfg.topic_partitions_per_shard ())),
775+ .proxy_group_max_non_local_requests
776+ = cfg.pp_sr_smp_max_non_local_requests ().value_or (
777+ smp_groups::default_max_nonlocal_requests)};
778+
779+ smp_service_groups.create_groups (smp_groups_cfg).get ();
780+ _deferred.emplace_back (
781+ [this ] { smp_service_groups.destroy_groups ().get (); });
782+ }
739783
784+ void application::establish_cluster_view (::stop_signal& app_signal) {
785+ bootstrap_from_kvstore ().get ();
786+
787+ // Begin the cluster discovery manager so we can confirm our initial node
788+ // ID. A valid node ID is required before we can initialize the rest of our
789+ // subsystems. The local node and cluster UUIDs would have been set in
790+ // bootstrap_from_kvstore().
791+ _cluster_discovery = std::make_unique<cluster::cluster_discovery>(
792+ storage.local ().node_uuid (),
793+ storage.local ().get_cluster_uuid (),
794+ app_signal.abort_source ());
795+
796+ bootstrap_controller_view ().get ();
797+ }
798+
799+ void application::hydrate_cluster_config (const YAML::Node& config) {
740800 // This includes loading from local bootstrap file or legacy
741801 // config file on first-start or upgrade cases.
742802 _config_preload = cluster::config_manager::preload (config).get ();
743803
744- vlog (_log.info , " Cluster configuration properties:" );
745- vlog (_log.info , " (use `rpk cluster config edit` to change)" );
746- config_printer (" redpanda" , config::shard_local_cfg ());
747-
748- vlog (_log.info , " Node configuration properties:" );
749- vlog (_log.info , " (use `rpk redpanda config set <cfg> <value>` to change)" );
750- config_printer (" redpanda" , config::node ());
751-
752804 if (config[" pandaproxy" ]) {
753805 _proxy_config.emplace (config[" pandaproxy" ]);
754806 for (const auto & e : _proxy_config->errors ()) {
@@ -768,8 +820,6 @@ void application::hydrate_cluster_config(const YAML::Node& config) {
768820 set_local_kafka_client_config (_proxy_client_config, config::node ());
769821 }
770822 set_pp_kafka_client_defaults (*_proxy_config, *_proxy_client_config);
771- config_printer (" pandaproxy" , *_proxy_config);
772- config_printer (" pandaproxy_client" , *_proxy_client_config);
773823 }
774824 if (config[" schema_registry" ]) {
775825 _schema_reg_config.emplace (config[" schema_registry" ]);
@@ -780,8 +830,6 @@ void application::hydrate_cluster_config(const YAML::Node& config) {
780830 _schema_reg_client_config, config::node ());
781831 }
782832 set_sr_kafka_client_defaults (*_schema_reg_client_config);
783- config_printer (" schema_registry" , *_schema_reg_config);
784- config_printer (" schema_registry_client" , *_schema_reg_client_config);
785833 }
786834 // / Auditing will be toggled via cluster config settings, internal audit
787835 // / client options can be configured via local config properties
@@ -791,7 +839,44 @@ void application::hydrate_cluster_config(const YAML::Node& config) {
791839 set_local_kafka_client_config (_audit_log_client_config, config::node ());
792840 }
793841 set_auditing_kafka_client_defaults (*_audit_log_client_config);
794- config_printer (" audit_log_client" , *_audit_log_client_config);
842+ }
843+
844+ void application::log_cluster_config () {
845+ auto config_printer = [this ](std::string_view service, const auto & cfg) {
846+ std::vector<ss::sstring> items;
847+ cfg.for_each ([&items, &service](const auto & item) {
848+ items.push_back (
849+ ssx::sformat (" {}.{}\t - {}" , service, item, item.desc ()));
850+ });
851+ std::sort (items.begin (), items.end ());
852+ for (const auto & item : items) {
853+ vlog (_log.info , " {}" , item);
854+ }
855+ };
856+
857+ vlog (_log.info , " Cluster configuration properties:" );
858+ vlog (_log.info , " (use `rpk cluster config edit` to change)" );
859+ config_printer (" redpanda" , config::shard_local_cfg ());
860+
861+ vlog (_log.info , " Node configuration properties:" );
862+ vlog (_log.info , " (use `rpk redpanda config set <cfg> <value>` to change)" );
863+ config_printer (" redpanda" , config::node ());
864+
865+ if (_proxy_config) {
866+ config_printer (" pandaproxy" , *_proxy_config);
867+ }
868+ if (_proxy_client_config) {
869+ config_printer (" pandaproxy_client" , *_proxy_client_config);
870+ }
871+ if (_schema_reg_config) {
872+ config_printer (" schema_registry" , *_schema_reg_config);
873+ }
874+ if (_schema_reg_client_config) {
875+ config_printer (" schema_registry_client" , *_schema_reg_client_config);
876+ }
877+ if (_audit_log_client_config) {
878+ config_printer (" audit_log_client" , *_audit_log_client_config);
879+ }
795880}
796881
797882void application::check_environment () {
0 commit comments