Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/rpc/rpc_host_port.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class TProtocol;
} else { \
_target.reserve(_obj.field.size()); \
for (const auto &addr : _obj.field) { \
_target.emplace_back(host_port::from_address(addr)); \
_target.emplace_back(dsn::host_port::from_address(addr)); \
} \
} \
} while (0)
Expand Down
33 changes: 24 additions & 9 deletions src/shell/command_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -1692,7 +1692,10 @@ inline std::unique_ptr<aggregate_stats_calcs> create_table_aggregate_stats_calcs
row.app_id);

for (const auto &pc : iter->second) {
if (pc.hp_primary != node) {
dsn::host_port primary;
GET_HOST_PORT(pc, primary, primary);

if (primary != node) {
// Ignore once the replica of the metrics is not the primary of the partition.
continue;
}
Expand Down Expand Up @@ -1723,7 +1726,10 @@ create_partition_aggregate_stats_calcs(const int32_t table_id,
partition_stat_map increases;
partition_stat_map rates;
for (size_t i = 0; i < rows.size(); ++i) {
if (pcs[i].hp_primary != node) {
dsn::host_port primary;
GET_HOST_PORT(pcs[i], primary, primary);

if (primary != node) {
// Ignore once the replica of the metrics is not the primary of the partition.
continue;
}
Expand Down Expand Up @@ -1979,12 +1985,16 @@ inline bool get_app_partition_stat(shell_context *sc,
m.name, app_id_x, partition_index_x, counter_name)) {
// only primary partition will be counted
const auto find = pcs_by_appid.find(app_id_x);
if (find != pcs_by_appid.end() &&
find->second[partition_index_x].hp_primary == nodes[i].hp) {
row_data &row = rows[app_id_name[app_id_x]][partition_index_x];
row.row_name = std::to_string(partition_index_x);
row.app_id = app_id_x;
update_app_pegasus_perf_counter(row, counter_name, m.value);
if (find != pcs_by_appid.end()) {
dsn::host_port primary;
GET_HOST_PORT(find->second[partition_index_x], primary, primary);

if (primary == nodes[i].hp) {
row_data &row = rows[app_id_name[app_id_x]][partition_index_x];
row.row_name = std::to_string(partition_index_x);
row.app_id = app_id_x;
update_app_pegasus_perf_counter(row, counter_name, m.value);
}
}
} else if (parse_app_perf_counter_name(m.name, app_name, counter_name)) {
// if the app_name from perf-counter isn't existed(maybe the app was dropped), it
Expand Down Expand Up @@ -2233,8 +2243,13 @@ inline bool get_storage_size_stat(shell_context *sc, app_storage_size_stat &st_s
if (find == pcs_by_appid.end()) // app id not found
continue;
auto &pc = find->second[partition_index_x];
if (pc.hp_primary != nodes[i].hp) // not primary replica

dsn::host_port primary;
GET_HOST_PORT(pc, primary, primary);

if (primary != nodes[i].hp) { // not primary replica
continue;
}
if (pc.partition_flags != 0) // already calculated
continue;
pc.partition_flags = 1;
Expand Down
17 changes: 13 additions & 4 deletions src/shell/commands/data_operations.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2240,7 +2240,10 @@ create_rdb_estimated_keys_stats_calcs(const int32_t table_id,

partition_stat_map sums;
for (size_t i = 0; i < rows.size(); ++i) {
if (pcs[i].hp_primary != node) {
dsn::host_port primary;
GET_HOST_PORT(pcs[i], primary, primary);

if (primary != node) {
// Ignore once the replica of the metrics is not the primary of the partition.
continue;
}
Expand Down Expand Up @@ -2885,9 +2888,15 @@ bool calculate_hash_value(command_executor *e, shell_context *sc, arguments args
tp.add_row_name_and_data("partition_index", partition_index);
if (pcs.size() > partition_index) {
const auto &pc = pcs[partition_index];
tp.add_row_name_and_data("primary", pc.hp_primary.to_string());
tp.add_row_name_and_data("secondaries",
fmt::format("{}", fmt::join(pc.hp_secondaries, ",")));

dsn::host_port primary;
GET_HOST_PORT(pc, primary, primary);

std::vector<dsn::host_port> secondaries;
GET_HOST_PORTS(pc, secondaries, secondaries);

tp.add_row_name_and_data("primary", primary.to_string());
tp.add_row_name_and_data("secondaries", fmt::format("{}", fmt::join(secondaries, ",")));
}
}
tp.output(std::cout);
Expand Down
12 changes: 9 additions & 3 deletions src/shell/commands/node_management.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -556,13 +556,19 @@ bool ls_nodes(command_executor *, shell_context *sc, arguments args)
}

for (const auto &pc : pcs) {
if (pc.hp_primary) {
auto find = tmp_map.find(pc.hp_primary);
dsn::host_port primary;
GET_HOST_PORT(pc, primary, primary);

std::vector<dsn::host_port> secondaries;
GET_HOST_PORTS(pc, secondaries, secondaries);

if (primary) {
auto find = tmp_map.find(primary);
if (find != tmp_map.end()) {
find->second.primary_count++;
}
}
for (const auto &secondary : pc.hp_secondaries) {
for (const auto &secondary : secondaries) {
auto find = tmp_map.find(secondary);
if (find != tmp_map.end()) {
find->second.secondary_count++;
Expand Down
68 changes: 43 additions & 25 deletions src/shell/commands/recovery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,18 +166,23 @@ bool recover(command_executor *e, shell_context *sc, arguments args)

dsn::host_port diagnose_recommend(const dsn::replication::ddd_partition_info &pinfo)
{
if (pinfo.config.hp_last_drops.size() < 2) {
std::vector<dsn::host_port> last_drops;
GET_HOST_PORTS(pinfo.config, last_drops, last_drops);

if (last_drops.size() < 2) {
return dsn::host_port();
}

std::vector<dsn::host_port> last_two_nodes(pinfo.config.hp_last_drops.end() - 2,
pinfo.config.hp_last_drops.end());
std::vector<dsn::host_port> last_two_nodes(last_drops.end() - 2, last_drops.end());
std::vector<dsn::replication::ddd_node_info> last_dropped;
for (const auto &node : last_two_nodes) {
const auto it = std::find_if(
pinfo.dropped.begin(),
pinfo.dropped.end(),
[&node](const dsn::replication::ddd_node_info &r) { return r.hp_node == node; });
const auto it = std::find_if(pinfo.dropped.begin(),
pinfo.dropped.end(),
[&node](const dsn::replication::ddd_node_info &r) {
dsn::host_port hp_node_from_dropped;
GET_HOST_PORT(r, node, hp_node_from_dropped);
return hp_node_from_dropped == node;
});
if (it->is_alive && it->is_collected) {
last_dropped.push_back(*it);
}
Expand All @@ -186,7 +191,9 @@ dsn::host_port diagnose_recommend(const dsn::replication::ddd_partition_info &pi
if (last_dropped.size() == 1) {
const auto &ninfo = last_dropped.back();
if (ninfo.last_committed_decree >= pinfo.config.last_committed_decree) {
return ninfo.hp_node;
dsn::host_port node;
GET_HOST_PORT(ninfo, node, node);
return node;
}
} else if (last_dropped.size() == 2) {
const auto &secondary = last_dropped.front();
Expand All @@ -196,19 +203,24 @@ dsn::host_port diagnose_recommend(const dsn::replication::ddd_partition_info &pi
// - choose the node with the largest last committed decree
// - if last committed decree is the same, choose node with the largest ballot

dsn::host_port latest_node;
dsn::host_port secondary_node;
GET_HOST_PORT(latest, node, latest_node);
GET_HOST_PORT(secondary, node, secondary_node);

if (latest.last_committed_decree == secondary.last_committed_decree &&
latest.last_committed_decree >= pinfo.config.last_committed_decree) {
return latest.ballot >= secondary.ballot ? latest.hp_node : secondary.hp_node;
return latest.ballot >= secondary.ballot ? latest_node : secondary_node;
}

if (latest.last_committed_decree > secondary.last_committed_decree &&
latest.last_committed_decree >= pinfo.config.last_committed_decree) {
return latest.hp_node;
return latest_node;
}

if (secondary.last_committed_decree > latest.last_committed_decree &&
secondary.last_committed_decree >= pinfo.config.last_committed_decree) {
return secondary.hp_node;
return secondary_node;
}
}

Expand Down Expand Up @@ -293,43 +305,49 @@ bool ddd_diagnose(command_executor *e, shell_context *sc, arguments args)
out << " config: ballot(" << pinfo.config.ballot << "), "
<< "last_committed(" << pinfo.config.last_committed_decree << ")" << std::endl;
out << " ----" << std::endl;
dsn::host_port latest_dropped, secondary_latest_dropped;
if (pinfo.config.hp_last_drops.size() > 0) {
latest_dropped = pinfo.config.hp_last_drops[pinfo.config.hp_last_drops.size() - 1];

std::vector<dsn::host_port> last_drops;
GET_HOST_PORTS(pinfo.config, last_drops, last_drops);

dsn::host_port latest_dropped;
dsn::host_port secondary_latest_dropped;
if (!last_drops.empty()) {
latest_dropped = last_drops[last_drops.size() - 1];
}
if (pinfo.config.hp_last_drops.size() > 1) {
secondary_latest_dropped =
pinfo.config.hp_last_drops[pinfo.config.hp_last_drops.size() - 2];
if (last_drops.size() > 1) {
secondary_latest_dropped = last_drops[last_drops.size() - 2];
}
int j = 0;
for (const dsn::replication::ddd_node_info &n : pinfo.dropped) {
dsn::host_port hp_node;
GET_HOST_PORT(n, node, hp_node);
dsn::host_port node;
GET_HOST_PORT(n, node, node);
char time_buf[30] = {0};
::dsn::utils::time_ms_to_string(n.drop_time_ms, time_buf);
out << " dropped[" << j++ << "]: "
<< "node(" << hp_node << "), "
<< "node(" << node << "), "
<< "drop_time(" << time_buf << "), "
<< "alive(" << (n.is_alive ? "true" : "false") << "), "
<< "collected(" << (n.is_collected ? "true" : "false") << "), "
<< "ballot(" << n.ballot << "), "
<< "last_committed(" << n.last_committed_decree << "), "
<< "last_prepared(" << n.last_prepared_decree << ")";
if (hp_node == latest_dropped)
if (node == latest_dropped) {
out << " <== the latest";
else if (hp_node == secondary_latest_dropped)
} else if (node == secondary_latest_dropped) {
out << " <== the secondary latest";
}
out << std::endl;
}
out << " ----" << std::endl;
j = 0;
for (const auto &r : pinfo.config.hp_last_drops) {
for (const auto &r : last_drops) {
out << " last_drops[" << j++ << "]: "
<< "node(" << r.to_string() << ")";
if (j == (int)pinfo.config.hp_last_drops.size() - 1)
if (j == static_cast<int>(last_drops.size()) - 1) {
out << " <== the secondary latest";
else if (j == (int)pinfo.config.hp_last_drops.size())
} else if (j == static_cast<int>(last_drops.size())) {
out << " <== the latest";
}
out << std::endl;
}
out << " ----" << std::endl;
Expand Down
26 changes: 16 additions & 10 deletions src/shell/commands/table_management.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -341,11 +341,17 @@ bool app_disk(command_executor *e, shell_context *sc, arguments args)
double disk_used_for_all_replicas = 0;
int all_replicas_count = 0;
for (const auto &pc : pcs) {
dsn::host_port primary;
GET_HOST_PORT(pc, primary, primary);

std::vector<dsn::host_port> secondaries;
GET_HOST_PORTS(pc, secondaries, secondaries);

std::string primary_str("-");
if (pc.hp_primary) {
if (primary) {
bool disk_found = false;
double disk_value = 0;
auto f1 = disk_map.find(pc.hp_primary);
auto f1 = disk_map.find(primary);
if (f1 != disk_map.end()) {
auto &sub_map = f1->second;
auto f2 = sub_map.find(pc.pid.get_partition_index());
Expand All @@ -360,7 +366,7 @@ bool app_disk(command_executor *e, shell_context *sc, arguments args)
}
bool count_found = false;
double count_value = 0;
auto f3 = count_map.find(pc.hp_primary);
auto f3 = count_map.find(primary);
if (f3 != count_map.end()) {
auto &sub_map = f3->second;
auto f4 = sub_map.find(pc.pid.get_partition_index());
Expand All @@ -372,7 +378,7 @@ bool app_disk(command_executor *e, shell_context *sc, arguments args)

// TODO(wangdan): refactor as format style.
std::stringstream oss;
oss << pc.hp_primary.resolve(resolve_ip) << "(";
oss << primary.resolve(resolve_ip) << "(";
if (disk_found) {
oss << disk_value;
} else {
Expand All @@ -391,12 +397,12 @@ bool app_disk(command_executor *e, shell_context *sc, arguments args)
{
std::stringstream oss;
oss << "[";
for (int j = 0; j < pc.hp_secondaries.size(); j++) {
for (int j = 0; j < secondaries.size(); j++) {
if (j != 0)
oss << ",";
bool found = false;
double value = 0;
auto f1 = disk_map.find(pc.hp_secondaries[j]);
auto f1 = disk_map.find(secondaries[j]);
if (f1 != disk_map.end()) {
auto &sub_map = f1->second;
auto f2 = sub_map.find(pc.pid.get_partition_index());
Expand All @@ -409,7 +415,7 @@ bool app_disk(command_executor *e, shell_context *sc, arguments args)
}
bool count_found = false;
double count_value = 0;
auto f3 = count_map.find(pc.hp_secondaries[j]);
auto f3 = count_map.find(secondaries[j]);
if (f3 != count_map.end()) {
auto &sub_map = f3->second;
auto f3 = sub_map.find(pc.pid.get_partition_index());
Expand All @@ -420,7 +426,7 @@ bool app_disk(command_executor *e, shell_context *sc, arguments args)
}

// TODO(wangdan): refactor as format style.
oss << pc.hp_secondaries[j].resolve(resolve_ip) << "(";
oss << secondaries[j].resolve(resolve_ip) << "(";
if (found) {
oss << value;
} else {
Expand All @@ -441,8 +447,8 @@ bool app_disk(command_executor *e, shell_context *sc, arguments args)
if (detailed) {
tp_details.add_row(std::to_string(pc.pid.get_partition_index()));
tp_details.append_data(pc.ballot);
tp_details.append_data(fmt::format(
"{}/{}", pc.hp_secondaries.size() + (pc.hp_primary ? 1 : 0), pc.max_replica_count));
tp_details.append_data(
fmt::format("{}/{}", secondaries.size() + (primary ? 1 : 0), pc.max_replica_count));
tp_details.append_data(primary_str);
tp_details.append_data(secondary_str);
}
Expand Down
Loading