|
43 | 43 | RedpandaVersion, |
44 | 44 | ) |
45 | 45 | from rptest.tests.redpanda_test import RedpandaTest |
46 | | -from rptest.util import expect_exception, expect_http_error |
| 46 | +from rptest.util import expect_exception, expect_http_error, wait_until_result |
47 | 47 | from rptest.utils.si_utils import BucketView |
48 | 48 |
|
49 | 49 | BOOTSTRAP_CONFIG = { |
@@ -71,7 +71,10 @@ def check_restart_clears(admin, redpanda, nodes=None): |
71 | 71 | nodes = redpanda.nodes |
72 | 72 |
|
73 | 73 | status = admin.get_cluster_config_status() |
74 | | - for n in status: |
| 74 | + relevant_ids = {redpanda.node_id(n) for n in nodes} |
| 75 | + relevant = [s for s in status if s["node_id"] in relevant_ids] |
| 76 | + assert len(relevant_ids) == len(relevant) |
| 77 | + for n in relevant: |
75 | 78 | assert n["restart"] is True |
76 | 79 |
|
77 | 80 | first_node = nodes[0] |
@@ -141,6 +144,33 @@ def is_complete(node): |
141 | 144 | ) |
142 | 145 |
|
143 | 146 |
|
def wait_for_active_nodes_version_status_sync(
    admin, redpanda, version, nodes, timeout_sec=10, backoff_sec=0.5
):
    """
    Like wait_for_version_status_sync, but only requires the subset of
    nodes in `nodes` to agree on `version`. Statuses for other nodes (e.g. a
    downed node still listed by the controller) are ignored.

    :param admin: Admin API client used to query config status.
    :param redpanda: RedpandaService, used to map nodes to node ids.
    :param version: the config version every node in `nodes` must report.
    :param nodes: the subset of cluster nodes that must converge; each node
                  is polled individually so every node's own view agrees.
    :param timeout_sec: per-node wait timeout (default 10, as before).
    :param backoff_sec: per-node poll interval (default 0.5, as before).
    """
    active_ids = {redpanda.node_id(n) for n in nodes}

    def is_complete(node):
        # Ask this specific node for its view of cluster config status and
        # keep only the entries for the nodes we care about.
        node_status = admin.get_cluster_config_status(node=node)
        relevant = [s for s in node_status if s["node_id"] in active_ids]
        if len(relevant) != len(active_ids):
            return False
        return {s["config_version"] for s in relevant} == {version}

    for node in nodes:
        # Bind `node` as a default argument so each lambda polls its own node
        # rather than the loop's final value.
        wait_until(
            lambda n=node: is_complete(n),
            timeout_sec=timeout_sec,
            backoff_sec=backoff_sec,
            err_msg=(
                f"Config status on node {redpanda.node_id(node)} did not "
                f"converge on {version} for active nodes {sorted(active_ids)}"
            ),
        )
| 172 | + |
| 173 | + |
144 | 174 | class ClusterConfigBootstrapTest(RedpandaTest): |
145 | 175 | def __init__(self, *args, **kwargs): |
146 | 176 | super().__init__(*args, extra_rp_conf={}, **kwargs) |
@@ -2599,6 +2629,337 @@ def assert_restart_status(expect: bool): |
2599 | 2629 | assert n["restart"] is False |
2600 | 2630 |
|
2601 | 2631 |
|
class ClusterConfigMultiNodeBootstrapTest(RedpandaTest):
    """
    Tests that cluster configuration (especially needs_restart=yes
    properties) is applied correctly during node bootstrap in multi-node
    scenarios: a node rejoining after downtime, a node joining a cluster for
    the first time, and a cluster restored via whole-cluster recovery.
    """

    def __init__(self, test_context):
        # SISettings enables tiered storage, which the cloud-topics tests
        # below depend on.
        super().__init__(
            test_context, num_brokers=3, si_settings=SISettings(test_context)
        )
        self.admin = Admin(self.redpanda)
        self.rpk = RpkTool(self.redpanda)

    def setUp(self):
        # Skip starting redpanda, so that each test can explicitly start
        # it with its own override_cfg_params / node subsets.
        pass

    def _local_replica_stms(self, node, topic_name: str, partition: int):
        """
        Return the set of stm names registered for `topic_name`/`partition` on
        `node`'s local replica, or None if the replica has not materialized yet.
        Suitable for passing to wait_until_result (None -> keep retrying).
        """
        node_id = self.redpanda.node_id(node)
        state = self.admin.get_partition_state(
            "kafka", topic_name, partition, node=node
        )
        # Find the entry describing this node's own replica; other replicas'
        # raft_state belongs to peer nodes.
        for r in state.get("replicas", []):
            if r.get("raft_state", {}).get("node_id") == node_id:
                return {s["name"] for s in r["raft_state"].get("stms", [])}
        return None

    @cluster(num_nodes=3)
    def test_node_delayed_restart(self):
        """
        A node which has gone down should see the most up to date cluster config immediately in the bootstrap process, instead of needing to restart again.
        """

        def assert_restart_status_on_nodes(expect: bool, relevant_nodes):
            # Check the restart flag only for `relevant_nodes`; statuses for
            # other nodes (e.g. a stopped node) are ignored.
            relevant_ids = {self.redpanda.node_id(n) for n in relevant_nodes}
            status = self.admin.get_cluster_config_status()
            relevant = [s for s in status if s["node_id"] in relevant_ids]
            assert len(relevant_ids) == len(relevant)
            for n in relevant:
                assert n["restart"] is expect, (
                    f"Expected restart status {n['restart']} to be {expect}"
                )

        active_nodes = self.redpanda.nodes[0:2]
        down_node = self.redpanda.nodes[2]
        all_nodes = self.redpanda.nodes
        self.redpanda.start(all_nodes)

        # Wait for config status to populate for all three brokers.
        wait_until(
            lambda: len(self.admin.get_cluster_config_status()) == 3,
            timeout_sec=30,
            backoff_sec=1,
        )

        assert_restart_status_on_nodes(False, all_nodes)

        # Bring one of the nodes down.
        self.redpanda.stop_node(down_node)

        # An arbitrary restart-requiring setting with a non-default value
        new_setting = (CLOUD_TOPICS_CONFIG_STR, True)
        patch_result = self.admin.patch_cluster_config(upsert=dict([new_setting]))
        new_version = patch_result["config_version"]
        # Only the two live nodes can converge; the downed node's stale
        # status entry is ignored.
        wait_for_active_nodes_version_status_sync(
            self.admin, self.redpanda, new_version, nodes=active_nodes
        )
        assert_restart_status_on_nodes(True, active_nodes)

        # Restart existing nodes to get them into a clean state
        check_restart_clears(self.admin, self.redpanda, nodes=active_nodes)

        config = {
            TopicSpec.PROPERTY_STORAGE_MODE: TopicSpec.STORAGE_MODE_CLOUD,
        }
        topic_name = "tapioca"
        self.rpk.create_topic(
            topic=topic_name,
            partitions=1,
            replicas=3,
            config=config,
        )
        topic_desc = self.rpk.describe_topic_configs(topic_name)
        assert (
            topic_desc[TopicSpec.PROPERTY_STORAGE_MODE][0]
            == TopicSpec.STORAGE_MODE_CLOUD
        )

        # Start the node back up.
        self.redpanda.start_node(down_node)

        # Verify ctp_stm is registered on down_node's local replica of the
        # cloud topic. This proves bootstrap applied cloud_topics_enabled=true
        # (a needs_restart=yes property) before partition_manager constructed
        # the partition; otherwise ctp_stm would be missing on this node
        # until another restart.
        down_node_id = self.redpanda.node_id(down_node)
        stm_names = wait_until_result(
            lambda: self._local_replica_stms(down_node, topic_name, 0),
            timeout_sec=30,
            backoff_sec=1,
            err_msg=f"{topic_name} replica never materialized on restarted "
            f"node {down_node_id}",
        )
        assert "ctp_stm" in stm_names, (
            f"ctp_stm missing on restarted node {down_node_id}; got stms {stm_names}. Bootstrap did not apply cloud_topics_enabled before partition_manager built {topic_name}."
        )

        # The rejoined node picked up the config at bootstrap, so no node
        # should still be flagged as needing a restart.
        status = self.admin.get_cluster_config_status()
        for n in status:
            assert n["restart"] is False

    @cluster(num_nodes=3)
    def test_cloud_topic_on_joining_node(self):
        """
        A node joining a cluster for the first time should pick up
        cloud_topics_enabled (a needs_restart=yes property) from the
        register_with_cluster join snapshot, so that partition_manager
        registers ctp_stm when the cloud topic's partition is constructed.
        """
        seed_nodes = self.redpanda.nodes[0:2]
        joiner_node = self.redpanda.nodes[2]

        # Bring up a 2-node cluster first.
        self.redpanda.start(seed_nodes)
        wait_until(
            lambda: len(self.admin.get_cluster_config_status()) == 2,
            timeout_sec=30,
            backoff_sec=1,
        )

        # Enable cloud_topics_enabled and restart both seeds so the value
        # is in the active config on whichever seed ends up serving the
        # joiner's register_with_cluster RPC and validating the cloud topic
        # create.
        new_setting = (CLOUD_TOPICS_CONFIG_STR, True)
        patch_result = self.admin.patch_cluster_config(upsert=dict([new_setting]))
        new_version = patch_result["config_version"]
        wait_for_active_nodes_version_status_sync(
            self.admin, self.redpanda, new_version, nodes=seed_nodes
        )
        self.redpanda.restart_nodes(seed_nodes)
        seed_ids = {self.redpanda.node_id(n) for n in seed_nodes}
        wait_until(
            lambda: all(
                s["restart"] is False
                for s in self.admin.get_cluster_config_status()
                if s["node_id"] in seed_ids
            ),
            timeout_sec=30,
            backoff_sec=1,
            err_msg="seed restart flag did not clear after seed restart",
        )

        # Join the third node for the first time.
        self.redpanda.start_node(joiner_node)
        wait_until(
            lambda: len(self.admin.get_cluster_config_status()) == 3,
            timeout_sec=30,
            backoff_sec=1,
        )

        # Create a cloud topic with rf=3 so the joiner hosts a replica.
        topic_name = "tapioca_joiner"
        self.rpk.create_topic(
            topic=topic_name,
            partitions=1,
            replicas=3,
            config={
                TopicSpec.PROPERTY_STORAGE_MODE: TopicSpec.STORAGE_MODE_CLOUD,
            },
        )

        # ctp_stm must be registered on the joiner's replica. If the
        # joiner's bootstrap left cloud_topics_enabled in pending instead
        # of active, partition_manager would build the partition without
        # ctp_stm and we'd silently lose cloud-topics functionality on
        # this node until another restart.
        joiner_id = self.redpanda.node_id(joiner_node)
        stm_names = wait_until_result(
            lambda: self._local_replica_stms(joiner_node, topic_name, 0),
            timeout_sec=30,
            backoff_sec=1,
            err_msg=f"{topic_name} replica never materialized on joiner "
            f"node {joiner_id}",
        )
        assert "ctp_stm" in stm_names, (
            f"ctp_stm missing on joiner node {joiner_id}; got stms {stm_names}. "
            f"register_with_cluster snapshot did not apply cloud_topics_enabled "
            f"before partition_manager built {topic_name}."
        )

        status = self.admin.get_cluster_config_status()
        for n in status:
            assert n["restart"] is False, (
                f"Unexpected restart=true after fresh join: {status}"
            )

    @cluster(num_nodes=3)
    def test_cluster_recovery_needs_restart_property(self):
        """
        After cluster recovery applies a needs_restart=yes property, the
        active value should remain at the default until nodes restart.
        After a restart, the recovered value should be in active because
        bootstrap reads the local cache (which apply_delta -> store_delta
        wrote during recovery).
        """
        # Faster cluster metadata upload so the source backup is captured
        # quickly. enable_cluster_metadata_upload_loop is true by default.
        self.redpanda.add_extra_rp_conf(
            {
                "controller_snapshot_max_age_sec": 1,
                "cloud_storage_cluster_metadata_upload_interval_ms": 1000,
            }
        )

        all_nodes = self.redpanda.nodes
        self.redpanda.start(all_nodes)
        wait_until(
            lambda: len(self.admin.get_cluster_config_status()) == 3,
            timeout_sec=30,
            backoff_sec=1,
        )

        PROPERTY_NAME = "storage_compaction_key_map_memory_limit_percent"
        PROPERTY_DEFAULT = 12
        NEW_PROPERTY_VALUE = 6
        # storage_compaction_key_map_memory_limit_percent is needs_restart=yes with default 12.
        new_setting = (
            PROPERTY_NAME,
            NEW_PROPERTY_VALUE,
        )
        patch_result = self.admin.patch_cluster_config(upsert=dict([new_setting]))
        new_version = patch_result["config_version"]
        wait_for_active_nodes_version_status_sync(
            self.admin, self.redpanda, new_version, nodes=all_nodes
        )

        # Let the metadata upload loop capture the post-patch state.
        time.sleep(5)

        # Wipe and bring up a fresh cluster.
        self.redpanda.stop()
        for n in all_nodes:
            self.redpanda.remove_local_data(n)
        self.redpanda.restart_nodes(all_nodes)
        self.admin.await_stable_leader(
            "controller",
            partition=0,
            namespace="redpanda",
            timeout_s=60,
            backoff_s=2,
        )

        # Use suppress_pending=True so we read the active value only, not
        # the pending-aware view that rpk cluster_config_get returns by
        # default. We want to verify that the recovered value lands in
        # pending without changing active until restart.
        for n in all_nodes:
            v = self.admin.get_cluster_config(
                node=n, key=PROPERTY_NAME, suppress_pending=True
            )[PROPERTY_NAME]
            assert v == PROPERTY_DEFAULT, (
                f"Expected active {PROPERTY_NAME}={v} to be default value "
                f"{PROPERTY_DEFAULT=} on {n.name} pre-recovery"
            )

        # Run cluster recovery.
        self.admin.initialize_cluster_recovery()

        def cluster_recovery_complete():
            # Recovery is done once the controller reports the recovery
            # state machine is inactive again.
            return (
                "inactive" in self.admin.get_cluster_recovery_status().json()["state"]
            )

        wait_until(cluster_recovery_complete, timeout_sec=60, backoff_sec=1)

        status = self.admin.get_cluster_config_status()
        for n in status:
            assert n["restart"] is True, (
                f"Expected restart=true after recovery for needs_restart "
                f"property, got status {status}"
            )

        # After recovery, the needs_restart=yes property is in pending.
        # Active stays at the pre-recovery value; the pending-aware view
        # already reflects the recovered value.
        for n in all_nodes:
            active = self.admin.get_cluster_config(
                node=n, key=PROPERTY_NAME, suppress_pending=True
            )[PROPERTY_NAME]
            assert active == PROPERTY_DEFAULT, (
                f"Expected active {PROPERTY_NAME}={active} to still be "
                f"default {PROPERTY_DEFAULT=} on {n.name} after recovery "
                f"(needs_restart=yes properties land in pending, not active)"
            )
            pending = self.admin.get_cluster_config(node=n, key=PROPERTY_NAME)[
                PROPERTY_NAME
            ]
            assert pending == NEW_PROPERTY_VALUE, (
                f"Expected pending-aware view of {PROPERTY_NAME}={pending} "
                f"to reflect the recovered value {NEW_PROPERTY_VALUE=} on "
                f"{n.name}"
            )

        self.redpanda.restart_nodes(all_nodes)
        self.admin.await_stable_leader(
            "controller",
            partition=0,
            namespace="redpanda",
            timeout_s=60,
            backoff_s=2,
        )

        # After restart, hydrate_cluster_config -> load_cache ->
        # preload_local writes the recovered value into active.
        for n in all_nodes:
            v = self.admin.get_cluster_config(
                node=n, key=PROPERTY_NAME, suppress_pending=True
            )[PROPERTY_NAME]
            assert v == NEW_PROPERTY_VALUE, (
                f"Expected active {PROPERTY_NAME}={v} to be "
                f"{NEW_PROPERTY_VALUE=} on {n.name} after recovery + restart"
            )
        status = self.admin.get_cluster_config_status()
        for n in status:
            assert n["restart"] is False, (
                f"Unexpected restart=true after post-recovery restart: {status}"
            )
| 2962 | + |
2602 | 2963 | class ClusterConfigLegacyDefaultTest(RedpandaTest, ClusterConfigHelpersMixin): |
2603 | 2964 | """ |
2604 | 2965 | Test config::legacy_default feature, that defaults for features can be |
|
0 commit comments