diff --git a/src/rpc/rpc_host_port.h b/src/rpc/rpc_host_port.h index f3e38cdd67..4aeb64e797 100644 --- a/src/rpc/rpc_host_port.h +++ b/src/rpc/rpc_host_port.h @@ -73,7 +73,7 @@ class TProtocol; } else { \ _target.reserve(_obj.field.size()); \ for (const auto &addr : _obj.field) { \ - _target.emplace_back(host_port::from_address(addr)); \ + _target.emplace_back(dsn::host_port::from_address(addr)); \ } \ } \ } while (0) diff --git a/src/shell/command_helper.h b/src/shell/command_helper.h index 3dbd17be42..8df06065aa 100644 --- a/src/shell/command_helper.h +++ b/src/shell/command_helper.h @@ -1692,7 +1692,10 @@ inline std::unique_ptr create_table_aggregate_stats_calcs row.app_id); for (const auto &pc : iter->second) { - if (pc.hp_primary != node) { + dsn::host_port primary; + GET_HOST_PORT(pc, primary, primary); + + if (primary != node) { // Ignore once the replica of the metrics is not the primary of the partition. continue; } @@ -1723,7 +1726,10 @@ create_partition_aggregate_stats_calcs(const int32_t table_id, partition_stat_map increases; partition_stat_map rates; for (size_t i = 0; i < rows.size(); ++i) { - if (pcs[i].hp_primary != node) { + dsn::host_port primary; + GET_HOST_PORT(pcs[i], primary, primary); + + if (primary != node) { // Ignore once the replica of the metrics is not the primary of the partition. continue; } @@ -1979,12 +1985,16 @@ inline bool get_app_partition_stat(shell_context *sc, m.name, app_id_x, partition_index_x, counter_name)) { // only primary partition will be counted const auto find = pcs_by_appid.find(app_id_x); - if (find != pcs_by_appid.end() && - find->second[partition_index_x].hp_primary == nodes[i].hp) { - row_data &row = rows[app_id_name[app_id_x]][partition_index_x]; - row.row_name = std::to_string(partition_index_x); - row.app_id = app_id_x; - update_app_pegasus_perf_counter(row, counter_name, m.value); + if (find != pcs_by_appid.end()) { + dsn::host_port primary; + GET_HOST_PORT(find->second[partition_index_x], primary, primary); + + if (primary == nodes[i].hp) { + row_data &row = rows[app_id_name[app_id_x]][partition_index_x]; + row.row_name = std::to_string(partition_index_x); + row.app_id = app_id_x; + update_app_pegasus_perf_counter(row, counter_name, m.value); + } } } else if (parse_app_perf_counter_name(m.name, app_name, counter_name)) { // if the app_name from perf-counter isn't existed(maybe the app was dropped), it @@ -2233,8 +2243,13 @@ inline bool get_storage_size_stat(shell_context *sc, app_storage_size_stat &st_s if (find == pcs_by_appid.end()) // app id not found continue; auto &pc = find->second[partition_index_x]; - if (pc.hp_primary != nodes[i].hp) // not primary replica + + dsn::host_port primary; + GET_HOST_PORT(pc, primary, primary); + + if (primary != nodes[i].hp) { // not primary replica continue; + } if (pc.partition_flags != 0) // already calculated continue; pc.partition_flags = 1; diff --git a/src/shell/commands/data_operations.cpp b/src/shell/commands/data_operations.cpp index 3c8c31ca7a..c463978c1b 100644 --- a/src/shell/commands/data_operations.cpp +++ b/src/shell/commands/data_operations.cpp @@ -2240,7 +2240,10 @@ create_rdb_estimated_keys_stats_calcs(const int32_t table_id, partition_stat_map sums; for (size_t i = 0; i < rows.size(); ++i) { - if (pcs[i].hp_primary != node) { + dsn::host_port primary; + GET_HOST_PORT(pcs[i], primary, primary); + + if (primary != node) { // Ignore once the replica of the metrics is not the primary of the partition. continue; } @@ -2885,9 +2888,15 @@ bool calculate_hash_value(command_executor *e, shell_context *sc, arguments args tp.add_row_name_and_data("partition_index", partition_index); if (pcs.size() > partition_index) { const auto &pc = pcs[partition_index]; - tp.add_row_name_and_data("primary", pc.hp_primary.to_string()); - tp.add_row_name_and_data("secondaries", - fmt::format("{}", fmt::join(pc.hp_secondaries, ","))); + + dsn::host_port primary; + GET_HOST_PORT(pc, primary, primary); + + std::vector secondaries; + GET_HOST_PORTS(pc, secondaries, secondaries); + + tp.add_row_name_and_data("primary", primary.to_string()); + tp.add_row_name_and_data("secondaries", fmt::format("{}", fmt::join(secondaries, ","))); } } tp.output(std::cout); diff --git a/src/shell/commands/node_management.cpp b/src/shell/commands/node_management.cpp index 8b8804287a..3df7b8be5d 100644 --- a/src/shell/commands/node_management.cpp +++ b/src/shell/commands/node_management.cpp @@ -556,13 +556,19 @@ bool ls_nodes(command_executor *, shell_context *sc, arguments args) } for (const auto &pc : pcs) { - if (pc.hp_primary) { - auto find = tmp_map.find(pc.hp_primary); + dsn::host_port primary; + GET_HOST_PORT(pc, primary, primary); + + std::vector secondaries; + GET_HOST_PORTS(pc, secondaries, secondaries); + + if (primary) { + auto find = tmp_map.find(primary); if (find != tmp_map.end()) { find->second.primary_count++; } } - for (const auto &secondary : pc.hp_secondaries) { + for (const auto &secondary : secondaries) { auto find = tmp_map.find(secondary); if (find != tmp_map.end()) { find->second.secondary_count++; diff --git a/src/shell/commands/recovery.cpp b/src/shell/commands/recovery.cpp index ed0cef36aa..a75fbeaccd 100644 --- a/src/shell/commands/recovery.cpp +++ b/src/shell/commands/recovery.cpp @@ -166,18 +166,23 @@ bool recover(command_executor *e, shell_context *sc, arguments args) dsn::host_port diagnose_recommend(const dsn::replication::ddd_partition_info &pinfo) { - if (pinfo.config.hp_last_drops.size() < 2) { + std::vector last_drops; + GET_HOST_PORTS(pinfo.config, last_drops, last_drops); + + if (last_drops.size() < 2) { return dsn::host_port(); } - std::vector last_two_nodes(pinfo.config.hp_last_drops.end() - 2, - pinfo.config.hp_last_drops.end()); + std::vector last_two_nodes(last_drops.end() - 2, last_drops.end()); std::vector last_dropped; for (const auto &node : last_two_nodes) { - const auto it = std::find_if( - pinfo.dropped.begin(), - pinfo.dropped.end(), - [&node](const dsn::replication::ddd_node_info &r) { return r.hp_node == node; }); + const auto it = std::find_if(pinfo.dropped.begin(), + pinfo.dropped.end(), + [&node](const dsn::replication::ddd_node_info &r) { + dsn::host_port hp_node_from_dropped; + GET_HOST_PORT(r, node, hp_node_from_dropped); + return hp_node_from_dropped == node; + }); if (it->is_alive && it->is_collected) { last_dropped.push_back(*it); } @@ -186,7 +191,9 @@ dsn::host_port diagnose_recommend(const dsn::replication::ddd_partition_info &pi if (last_dropped.size() == 1) { const auto &ninfo = last_dropped.back(); if (ninfo.last_committed_decree >= pinfo.config.last_committed_decree) { - return ninfo.hp_node; + dsn::host_port node; + GET_HOST_PORT(ninfo, node, node); + return node; } } else if (last_dropped.size() == 2) { const auto &secondary = last_dropped.front(); @@ -196,19 +203,24 @@ dsn::host_port diagnose_recommend(const dsn::replication::ddd_partition_info &pi // - choose the node with the largest last committed decree // - if last committed decree is the same, choose node with the largest ballot + dsn::host_port latest_node; + dsn::host_port secondary_node; + GET_HOST_PORT(latest, node, latest_node); + GET_HOST_PORT(secondary, node, secondary_node); + if (latest.last_committed_decree == secondary.last_committed_decree && latest.last_committed_decree >= pinfo.config.last_committed_decree) { - return latest.ballot >= secondary.ballot ? latest.hp_node : secondary.hp_node; + return latest.ballot >= secondary.ballot ? latest_node : secondary_node; } if (latest.last_committed_decree > secondary.last_committed_decree && latest.last_committed_decree >= pinfo.config.last_committed_decree) { - return latest.hp_node; + return latest_node; } if (secondary.last_committed_decree > latest.last_committed_decree && secondary.last_committed_decree >= pinfo.config.last_committed_decree) { - return secondary.hp_node; + return secondary_node; } } @@ -293,43 +305,49 @@ bool ddd_diagnose(command_executor *e, shell_context *sc, arguments args) out << " config: ballot(" << pinfo.config.ballot << "), " << "last_committed(" << pinfo.config.last_committed_decree << ")" << std::endl; out << " ----" << std::endl; - dsn::host_port latest_dropped, secondary_latest_dropped; - if (pinfo.config.hp_last_drops.size() > 0) { - latest_dropped = pinfo.config.hp_last_drops[pinfo.config.hp_last_drops.size() - 1]; + + std::vector last_drops; + GET_HOST_PORTS(pinfo.config, last_drops, last_drops); + + dsn::host_port latest_dropped; + dsn::host_port secondary_latest_dropped; + if (!last_drops.empty()) { + latest_dropped = last_drops[last_drops.size() - 1]; } - if (pinfo.config.hp_last_drops.size() > 1) { - secondary_latest_dropped = - pinfo.config.hp_last_drops[pinfo.config.hp_last_drops.size() - 2]; + if (last_drops.size() > 1) { + secondary_latest_dropped = last_drops[last_drops.size() - 2]; } int j = 0; for (const dsn::replication::ddd_node_info &n : pinfo.dropped) { - dsn::host_port hp_node; - GET_HOST_PORT(n, node, hp_node); + dsn::host_port node; + GET_HOST_PORT(n, node, node); char time_buf[30] = {0}; ::dsn::utils::time_ms_to_string(n.drop_time_ms, time_buf); out << " dropped[" << j++ << "]: " - << "node(" << hp_node << "), " + << "node(" << node << "), " << "drop_time(" << time_buf << "), " << "alive(" << (n.is_alive ? "true" : "false") << "), " << "collected(" << (n.is_collected ? "true" : "false") << "), " << "ballot(" << n.ballot << "), " << "last_committed(" << n.last_committed_decree << "), " << "last_prepared(" << n.last_prepared_decree << ")"; - if (hp_node == latest_dropped) + if (node == latest_dropped) { out << " <== the latest"; - else if (hp_node == secondary_latest_dropped) + } else if (node == secondary_latest_dropped) { out << " <== the secondary latest"; + } out << std::endl; } out << " ----" << std::endl; j = 0; - for (const auto &r : pinfo.config.hp_last_drops) { + for (const auto &r : last_drops) { out << " last_drops[" << j++ << "]: " << "node(" << r.to_string() << ")"; - if (j == (int)pinfo.config.hp_last_drops.size() - 1) + if (j == static_cast(last_drops.size()) - 1) { out << " <== the secondary latest"; - else if (j == (int)pinfo.config.hp_last_drops.size()) + } else if (j == static_cast(last_drops.size())) { out << " <== the latest"; + } out << std::endl; } out << " ----" << std::endl; diff --git a/src/shell/commands/table_management.cpp b/src/shell/commands/table_management.cpp index 957225b12f..c5ba138cd7 100644 --- a/src/shell/commands/table_management.cpp +++ b/src/shell/commands/table_management.cpp @@ -341,11 +341,17 @@ bool app_disk(command_executor *e, shell_context *sc, arguments args) double disk_used_for_all_replicas = 0; int all_replicas_count = 0; for (const auto &pc : pcs) { + dsn::host_port primary; + GET_HOST_PORT(pc, primary, primary); + + std::vector secondaries; + GET_HOST_PORTS(pc, secondaries, secondaries); + std::string primary_str("-"); - if (pc.hp_primary) { + if (primary) { bool disk_found = false; double disk_value = 0; - auto f1 = disk_map.find(pc.hp_primary); + auto f1 = disk_map.find(primary); if (f1 != disk_map.end()) { auto &sub_map = f1->second; auto f2 = sub_map.find(pc.pid.get_partition_index()); @@ -360,7 +366,7 @@ bool app_disk(command_executor *e, shell_context *sc, arguments args) } bool count_found = false; double count_value = 0; - auto f3 = count_map.find(pc.hp_primary); + auto f3 = count_map.find(primary); if (f3 != count_map.end()) { auto &sub_map = f3->second; auto f4 = sub_map.find(pc.pid.get_partition_index()); @@ -372,7 +378,7 @@ bool app_disk(command_executor *e, shell_context *sc, arguments args) // TODO(wangdan): refactor as format style. std::stringstream oss; - oss << pc.hp_primary.resolve(resolve_ip) << "("; + oss << primary.resolve(resolve_ip) << "("; if (disk_found) { oss << disk_value; } else { @@ -391,12 +397,12 @@ bool app_disk(command_executor *e, shell_context *sc, arguments args) { std::stringstream oss; oss << "["; - for (int j = 0; j < pc.hp_secondaries.size(); j++) { + for (int j = 0; j < secondaries.size(); j++) { if (j != 0) oss << ","; bool found = false; double value = 0; - auto f1 = disk_map.find(pc.hp_secondaries[j]); + auto f1 = disk_map.find(secondaries[j]); if (f1 != disk_map.end()) { auto &sub_map = f1->second; auto f2 = sub_map.find(pc.pid.get_partition_index()); @@ -409,7 +415,7 @@ bool app_disk(command_executor *e, shell_context *sc, arguments args) } bool count_found = false; double count_value = 0; - auto f3 = count_map.find(pc.hp_secondaries[j]); + auto f3 = count_map.find(secondaries[j]); if (f3 != count_map.end()) { auto &sub_map = f3->second; auto f3 = sub_map.find(pc.pid.get_partition_index()); @@ -420,7 +426,7 @@ bool app_disk(command_executor *e, shell_context *sc, arguments args) } // TODO(wangdan): refactor as format style. - oss << pc.hp_secondaries[j].resolve(resolve_ip) << "("; + oss << secondaries[j].resolve(resolve_ip) << "("; if (found) { oss << value; } else { @@ -441,8 +447,8 @@ bool app_disk(command_executor *e, shell_context *sc, arguments args) if (detailed) { tp_details.add_row(std::to_string(pc.pid.get_partition_index())); tp_details.append_data(pc.ballot); - tp_details.append_data(fmt::format( - "{}/{}", pc.hp_secondaries.size() + (pc.hp_primary ? 1 : 0), pc.max_replica_count)); + tp_details.append_data( + fmt::format("{}/{}", secondaries.size() + (primary ? 1 : 0), pc.max_replica_count)); tp_details.append_data(primary_str); tp_details.append_data(secondary_str); }