Skip to content

Commit d3e631d

Browse files
committed
update
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
1 parent 2fa47df commit d3e631d

9 files changed

Lines changed: 50 additions & 41 deletions

src/ray/common/event_memory_monitor.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,8 @@ void EventMemoryMonitor::MonitoringThreadMain() {
192192

193193
if (high_modified && IsEnabled()) {
194194
Disable();
195-
kill_workers_callback_();
195+
kill_workers_callback_(
196+
"cgroup memory.events 'high' counter increased (event-driven)");
196197
}
197198
} else {
198199
RAY_LOG(ERROR) << absl::StrFormat(

src/ray/common/memory_monitor_interface.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include <cstdint>
2020
#include <functional>
2121
#include <ostream>
22+
#include <string>
2223

2324
#include "absl/container/flat_hash_map.h"
2425
#include "ray/util/compat.h"
@@ -49,8 +50,10 @@ using ProcessesMemorySnapshot = absl::flat_hash_map<pid_t, int64_t>;
4950

5051
/**
5152
* @brief Callback to trigger worker oom killing when under memory pressure.
53+
* @param trigger_reason A human-readable description of why the monitor triggered
54+
* the kill (e.g. threshold exceeded, cgroup event, PSI pressure).
5255
*/
53-
using KillWorkersCallback = std::function<void()>;
56+
using KillWorkersCallback = std::function<void(const std::string &trigger_reason)>;
5457

5558
/**
5659
* @brief implementations of this interface monitors the memory usage of the node

src/ray/common/pressure_memory_monitor.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,8 @@ void PressureMemoryMonitor::MonitoringThreadMain() {
161161
if (fds[0].revents & POLLPRI) {
162162
if (IsEnabled()) {
163163
Disable();
164-
kill_workers_callback_();
164+
kill_workers_callback_(
165+
"cgroup memory.pressure PSI trigger fired (pressure-driven)");
165166
}
166167
} else if (fds[0].revents & POLLERR) {
167168
RAY_LOG(ERROR) << "Got POLLERR while monitoring memory pressure. "

src/ray/common/tests/event_memory_monitor_test.cc

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,9 @@ TEST_F(EventMemoryMonitorTest, TestCallbackCalledWhenHighEventChanges) {
9898
WriteMemoryEventsFile(events_file_->GetPath(), 0, 0);
9999

100100
auto callback_latch = std::make_shared<boost::latch>(1);
101-
KillWorkersCallback callback = [callback_latch]() { callback_latch->count_down(); };
101+
KillWorkersCallback callback = [callback_latch](const std::string &) {
102+
callback_latch->count_down();
103+
};
102104

103105
StatusSetOr<std::unique_ptr<EventMemoryMonitor>, StatusT::IOError> result =
104106
EventMemoryMonitor::Create(std::move(mock_cgroup_dir_->GetPath()),
@@ -116,7 +118,9 @@ TEST_F(EventMemoryMonitorTest, TestNoCallbackWhenValuesUnchanged) {
116118
WriteMemoryEventsFile(events_file_->GetPath(), 0, 0);
117119

118120
auto callback_latch = std::make_shared<boost::latch>(1);
119-
KillWorkersCallback callback = [callback_latch]() { callback_latch->count_down(); };
121+
KillWorkersCallback callback = [callback_latch](const std::string &) {
122+
callback_latch->count_down();
123+
};
120124

121125
StatusSetOr<std::unique_ptr<EventMemoryMonitor>, StatusT::IOError> result =
122126
EventMemoryMonitor::Create(mock_cgroup_dir_->GetPath(), callback);
@@ -133,7 +137,9 @@ TEST_F(EventMemoryMonitorTest, TestNoCallbackWhenIrrelevantEventChanges) {
133137
WriteMemoryEventsFile(events_file_->GetPath(), 0, 0);
134138

135139
auto callback_latch = std::make_shared<boost::latch>(1);
136-
KillWorkersCallback callback = [callback_latch]() { callback_latch->count_down(); };
140+
KillWorkersCallback callback = [callback_latch](const std::string &) {
141+
callback_latch->count_down();
142+
};
137143

138144
StatusSetOr<std::unique_ptr<EventMemoryMonitor>, StatusT::IOError> result =
139145
EventMemoryMonitor::Create(mock_cgroup_dir_->GetPath(), callback);
@@ -153,16 +159,17 @@ TEST_F(EventMemoryMonitorTest, TestMultipleCallbacksOnMultipleChanges) {
153159
auto latch2 = std::make_shared<boost::latch>(1);
154160
auto latch3 = std::make_shared<boost::latch>(1);
155161
std::atomic<int> callback_count{0};
156-
KillWorkersCallback callback = [&callback_count, latch1, latch2, latch3]() {
157-
int count = ++callback_count;
158-
if (count == 1) {
159-
latch1->count_down();
160-
} else if (count == 2) {
161-
latch2->count_down();
162-
} else if (count == 3) {
163-
latch3->count_down();
164-
}
165-
};
162+
KillWorkersCallback callback =
163+
[&callback_count, latch1, latch2, latch3](const std::string &) {
164+
int count = ++callback_count;
165+
if (count == 1) {
166+
latch1->count_down();
167+
} else if (count == 2) {
168+
latch2->count_down();
169+
} else if (count == 3) {
170+
latch3->count_down();
171+
}
172+
};
166173

167174
StatusSetOr<std::unique_ptr<EventMemoryMonitor>, StatusT::IOError> result =
168175
EventMemoryMonitor::Create(mock_cgroup_dir_->GetPath(), callback);

src/ray/common/tests/pressure_memory_monitor_test.cc

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,9 @@ TEST_F(PressureMemoryMonitorTest,
162162
close(listener);
163163

164164
std::shared_ptr<boost::latch> has_called_once = std::make_shared<boost::latch>(1);
165-
auto kill_workers_callback = [has_called_once]() { has_called_once->count_down(); };
165+
auto kill_workers_callback = [has_called_once](const std::string &) {
166+
has_called_once->count_down();
167+
};
166168
std::unique_ptr<PressureMemoryMonitor> monitor =
167169
std::make_unique<PressureMemoryMonitor>(
168170
mock_cgroup_dir_->GetPath(), listener_fd, kill_workers_callback);

src/ray/common/tests/threshold_memory_monitor_test.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ TEST_F(ThresholdMemoryMonitorTest, TestMonitorTriggerCanDetectMemoryUsage) {
5454
MakeThresholdMemoryMonitor(
5555
0 /*memory_usage_threshold_bytes*/,
5656
1 /*refresh_interval_ms*/,
57-
[has_checked_once]() { has_checked_once->count_down(); },
57+
[has_checked_once](const std::string &) { has_checked_once->count_down(); },
5858
"" /*root_cgroup_path*/);
5959
has_checked_once->wait();
6060
}
@@ -78,7 +78,7 @@ TEST_F(ThresholdMemoryMonitorTest,
7878
MakeThresholdMemoryMonitor(
7979
memory_usage_threshold_bytes, // (70%)
8080
1 /*refresh_interval_ms*/,
81-
[has_checked_once]() { has_checked_once->count_down(); },
81+
[has_checked_once](const std::string &) { has_checked_once->count_down(); },
8282
cgroup_dir /*root_cgroup_path*/);
8383

8484
has_checked_once->wait();

src/ray/common/threshold_memory_monitor.cc

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,14 @@ ThresholdMemoryMonitor::ThresholdMemoryMonitor(KillWorkersCallback kill_workers_
6060

6161
if (is_usage_above_threshold && IsEnabled()) {
6262
Disable();
63-
kill_workers_callback_();
63+
std::string trigger_reason = absl::StrFormat(
64+
"Memory usage %dB exceeded threshold of %dB (%.1f%% of %dB total)",
65+
cur_memory_snapshot.used_bytes,
66+
memory_usage_threshold_bytes_,
67+
static_cast<float>(memory_usage_threshold_bytes_) /
68+
static_cast<float>(cur_memory_snapshot.total_bytes) * 100,
69+
cur_memory_snapshot.total_bytes);
70+
kill_workers_callback_(trigger_reason);
6471
}
6572
},
6673
monitor_interval_ms,

src/ray/raylet/node_manager.cc

Lines changed: 7 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -3068,12 +3068,12 @@ void NodeManager::ReleaseKillWorkerInProgress() {
30683068

30693069
// Picks the workers and kills the process if the memory usage is above the threshold.
30703070
KillWorkersCallback NodeManager::CreateKillWorkersCallback() {
3071-
return [this]() {
3071+
return [this](const std::string &trigger_reason) {
30723072
if (!MarkKillWorkerInProgress()) {
30733073
return;
30743074
}
30753075
io_service_.post(
3076-
[this]() {
3076+
[this, trigger_reason]() {
30773077
std::vector<std::shared_ptr<WorkerInterface>> workers =
30783078
worker_pool_.GetAllRegisteredWorkers(/* filter_dead_workers */ true,
30793079
/* filter_io_workers */ true);
@@ -3100,25 +3100,13 @@ KillWorkersCallback NodeManager::CreateKillWorkersCallback() {
31003100
return;
31013101
}
31023102

3103-
// Compute the memory usage threshold
3104-
int64_t total_memory_bytes = system_memory_snapshot.total_bytes;
3105-
int64_t computed_threshold_bytes = MemoryMonitorUtils::GetMemoryThreshold(
3106-
total_memory_bytes,
3107-
RayConfig::instance().memory_usage_threshold(),
3108-
RayConfig::instance().min_memory_free_bytes(),
3109-
initial_config_.enable_resource_isolation,
3110-
*cgroup_manager_);
3111-
float computed_threshold_fraction =
3112-
static_cast<float>(computed_threshold_bytes) /
3113-
static_cast<float>(total_memory_bytes);
3114-
31153103
std::string oom_kill_details = CreateOomKillMessageDetails(
31163104
workers_to_kill_and_should_retry,
31173105
self_node_id_,
31183106
system_memory_snapshot,
31193107
store_client_->GetMemoryUsage().value_or("Not available"),
31203108
process_memory_snapshot,
3121-
computed_threshold_fraction);
3109+
trigger_reason);
31223110
std::string oom_kill_suggestions =
31233111
CreateOomKillMessageSuggestions(workers_to_kill_and_should_retry);
31243112

@@ -3188,7 +3176,7 @@ std::string NodeManager::CreateOomKillMessageDetails(
31883176
const SystemMemorySnapshot &system_memory_snapshot,
31893177
const std::string &object_store_memory_usage,
31903178
const ProcessesMemorySnapshot &process_memory_snapshot,
3191-
float usage_threshold) const {
3179+
const std::string &trigger_reason) const {
31923180
if (workers_to_kill.empty()) {
31933181
return "";
31943182
}
@@ -3246,8 +3234,8 @@ std::string NodeManager::CreateOomKillMessageDetails(
32463234
}
32473235

32483236
return absl::StrFormat(
3249-
"Memory on the node (IP: %s, ID: %s) was %sGB / %sGB (%f), "
3250-
"which exceeds the memory usage threshold of %f; "
3237+
"Memory on the node (IP: %s, ID: %s) was %sGB / %sGB (%f); "
3238+
"Trigger: %s; "
32513239
"Object store memory usage: [%s]; "
32523240
"Ray killed %d worker(s) based on the killing policy: "
32533241
"[%s]; "
@@ -3259,7 +3247,7 @@ std::string NodeManager::CreateOomKillMessageDetails(
32593247
used_bytes_gb,
32603248
total_bytes_gb,
32613249
usage_fraction,
3262-
usage_threshold,
3250+
trigger_reason,
32633251
absl::StrReplaceAll(object_store_memory_usage, {{"\n", "; "}}),
32643252
workers_to_kill.size(),
32653253
absl::StrJoin(worker_details, "; "),

src/ray/raylet/node_manager.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -786,7 +786,7 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
786786
* @param node_id The ID of the node.
787787
* @param system_memory_snapshot The snapshot of the system memory.
788788
* @param process_memory_snapshot The snapshot of the process memory.
789-
* @param usage_threshold The memory limit.
789+
* @param trigger_reason The reason the memory monitor triggered the kill.
790790
* @return The detail message for the workers that are killed due to memory running low.
791791
*/
792792
std::string CreateOomKillMessageDetails(
@@ -796,7 +796,7 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
796796
const SystemMemorySnapshot &system_memory_snapshot,
797797
const std::string &object_store_memory_usage,
798798
const ProcessesMemorySnapshot &process_memory_snapshot,
799-
float usage_threshold) const;
799+
const std::string &trigger_reason) const;
800800

801801
/**
802802
* @param workers_to_kill The workers to print the kill suggestions for.

0 commit comments

Comments
 (0)