Skip to content

Commit 4a4453e

Browse files
committed
update
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
1 parent 3bfe0c1 commit 4a4453e

9 files changed

Lines changed: 79 additions & 51 deletions

src/ray/common/event_memory_monitor.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -192,8 +192,8 @@ void EventMemoryMonitor::MonitoringThreadMain() {
192192

193193
if (high_modified && IsEnabled()) {
194194
Disable();
195-
kill_workers_callback_(
196-
MemoryMonitorUtils::TakeSystemMemorySnapshot(cgroup_path_));
195+
kill_workers_callback_(MemoryMonitorUtils::TakeSystemMemorySnapshot(cgroup_path_),
196+
MemoryMonitorInterface::kNull);
197197
}
198198
} else {
199199
RAY_LOG(ERROR) << absl::StrFormat(

src/ray/common/memory_monitor_interface.h

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,11 +48,17 @@ struct SystemMemorySnapshot {
4848
using ProcessesMemorySnapshot = absl::flat_hash_map<pid_t, int64_t>;
4949

5050
/**
51-
* @brief Callback that runs at each monitoring interval.
51+
* @brief Callback invoked by the memory monitor when worker killing should be triggered.
5252
*
53-
* \param system_memory snapshot of system memory information.
53+
* \param system_memory snapshot of system memory information at the time the monitor
54+
* decided to fire.
55+
* \param threshold_bytes the byte threshold against which the monitor evaluated
56+
* \p system_memory.used_bytes. For monitors that are event-driven rather than
57+
* threshold-driven (e.g. cgroup memory.events / PSI) this should be
58+
* MemoryMonitorInterface::kNull to indicate no single byte threshold applies.
5459
*/
55-
using KillWorkersCallback = std::function<void(SystemMemorySnapshot system_memory)>;
60+
using KillWorkersCallback =
61+
std::function<void(SystemMemorySnapshot system_memory, int64_t threshold_bytes)>;
5662

5763
/**
5864
* @brief Implementations of this interface monitor the memory usage of the node

src/ray/common/pressure_memory_monitor.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -161,8 +161,8 @@ void PressureMemoryMonitor::MonitoringThreadMain() {
161161
if (fds[0].revents & POLLPRI) {
162162
if (IsEnabled()) {
163163
Disable();
164-
kill_workers_callback_(
165-
MemoryMonitorUtils::TakeSystemMemorySnapshot(cgroup_path_));
164+
kill_workers_callback_(MemoryMonitorUtils::TakeSystemMemorySnapshot(cgroup_path_),
165+
MemoryMonitorInterface::kNull);
166166
}
167167
} else if (fds[0].revents & POLLERR) {
168168
RAY_LOG(ERROR) << "Got POLLERR while monitoring memory pressure. "

src/ray/common/tests/event_memory_monitor_test.cc

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ TEST_F(EventMemoryMonitorTest, TestNonexistentCgroupPathFailsGracefully) {
6767
std::string nonexistent_path = "/nonexistent/cgroup/path";
6868
StatusSetOr<std::unique_ptr<EventMemoryMonitor>, StatusT::IOError> result =
6969
EventMemoryMonitor::Create(std::move(nonexistent_path),
70-
[](SystemMemorySnapshot) {});
70+
[](SystemMemorySnapshot, int64_t) {});
7171

7272
ASSERT_TRUE(result.has_error())
7373
<< "Failed to catch invalid cgroup path when creating EventMemoryMonitor";
@@ -79,7 +79,8 @@ TEST_F(EventMemoryMonitorTest, TestMissingMemoryEventsFileFailsGracefully) {
7979
RAY_CHECK(empty_dir_or.ok()) << empty_dir_or.status().ToString();
8080
std::unique_ptr<TempDirectory> empty_dir = std::move(empty_dir_or.value());
8181
StatusSetOr<std::unique_ptr<EventMemoryMonitor>, StatusT::IOError> result =
82-
EventMemoryMonitor::Create(empty_dir->GetPath(), [](SystemMemorySnapshot) {});
82+
EventMemoryMonitor::Create(empty_dir->GetPath(),
83+
[](SystemMemorySnapshot, int64_t) {});
8384

8485
ASSERT_TRUE(result.has_error())
8586
<< "Failed to catch invalid cgroup configuration when creating EventMemoryMonitor";
@@ -89,7 +90,7 @@ TEST_F(EventMemoryMonitorTest, TestMissingMemoryEventsFileFailsGracefully) {
8990
TEST_F(EventMemoryMonitorTest, TestSuccessfulCreationWithValidPath) {
9091
StatusSetOr<std::unique_ptr<EventMemoryMonitor>, StatusT::IOError> result =
9192
EventMemoryMonitor::Create(mock_cgroup_dir_->GetPath(),
92-
[](SystemMemorySnapshot) {});
93+
[](SystemMemorySnapshot, int64_t) {});
9394
ASSERT_TRUE(result.has_value())
9495
<< "Failed to create EventMemoryMonitor: " << result.message();
9596
std::unique_ptr<EventMemoryMonitor> monitor = std::move(result.value());
@@ -100,7 +101,11 @@ TEST_F(EventMemoryMonitorTest, TestCallbackCalledWhenHighEventChanges) {
100101
WriteMemoryEventsFile(events_file_->GetPath(), 0, 0);
101102

102103
auto callback_latch = std::make_shared<boost::latch>(1);
103-
KillWorkersCallback callback = [callback_latch](SystemMemorySnapshot) {
104+
KillWorkersCallback callback = [callback_latch](SystemMemorySnapshot,
105+
int64_t threshold_bytes) {
106+
EXPECT_EQ(threshold_bytes, MemoryMonitorInterface::kNull)
107+
<< "EventMemoryMonitor is event-driven so it should report kNull as the "
108+
"threshold bytes.";
104109
callback_latch->count_down();
105110
};
106111

@@ -120,7 +125,7 @@ TEST_F(EventMemoryMonitorTest, TestNoCallbackWhenValuesUnchanged) {
120125
WriteMemoryEventsFile(events_file_->GetPath(), 0, 0);
121126

122127
auto callback_latch = std::make_shared<boost::latch>(1);
123-
KillWorkersCallback callback = [callback_latch](SystemMemorySnapshot) {
128+
KillWorkersCallback callback = [callback_latch](SystemMemorySnapshot, int64_t) {
124129
callback_latch->count_down();
125130
};
126131

@@ -139,7 +144,7 @@ TEST_F(EventMemoryMonitorTest, TestNoCallbackWhenIrrelevantEventChanges) {
139144
WriteMemoryEventsFile(events_file_->GetPath(), 0, 0);
140145

141146
auto callback_latch = std::make_shared<boost::latch>(1);
142-
KillWorkersCallback callback = [callback_latch](SystemMemorySnapshot) {
147+
KillWorkersCallback callback = [callback_latch](SystemMemorySnapshot, int64_t) {
143148
callback_latch->count_down();
144149
};
145150

@@ -161,17 +166,17 @@ TEST_F(EventMemoryMonitorTest, TestMultipleCallbacksOnMultipleChanges) {
161166
auto latch2 = std::make_shared<boost::latch>(1);
162167
auto latch3 = std::make_shared<boost::latch>(1);
163168
std::atomic<int> callback_count{0};
164-
KillWorkersCallback callback =
165-
[&callback_count, latch1, latch2, latch3](SystemMemorySnapshot) {
166-
int count = ++callback_count;
167-
if (count == 1) {
168-
latch1->count_down();
169-
} else if (count == 2) {
170-
latch2->count_down();
171-
} else if (count == 3) {
172-
latch3->count_down();
173-
}
174-
};
169+
KillWorkersCallback callback = [&callback_count, latch1, latch2, latch3](
170+
SystemMemorySnapshot, int64_t) {
171+
int count = ++callback_count;
172+
if (count == 1) {
173+
latch1->count_down();
174+
} else if (count == 2) {
175+
latch2->count_down();
176+
} else if (count == 3) {
177+
latch3->count_down();
178+
}
179+
};
175180

176181
StatusSetOr<std::unique_ptr<EventMemoryMonitor>, StatusT::IOError> result =
177182
EventMemoryMonitor::Create(mock_cgroup_dir_->GetPath(), callback);

src/ray/common/tests/pressure_memory_monitor_test.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ TEST_F(PressureMemoryMonitorTest, TestNonexistentCgroupPathFailsGracefully) {
100100
MemoryPsi psi = {.mode = "some", .stall_proportion = 0.5f, .stall_duration_s = 2};
101101
std::string nonexistent_path = "/nonexistent/cgroup/path";
102102
auto result = PressureMemoryMonitor::Create(
103-
psi, std::move(nonexistent_path), [](const SystemMemorySnapshot &) {});
103+
psi, std::move(nonexistent_path), [](SystemMemorySnapshot, int64_t) {});
104104

105105
ASSERT_TRUE(result.has_error())
106106
<< "Failed to catch invalid cgroup path when creating PressureMemoryMonitor";
@@ -111,7 +111,7 @@ TEST_F(PressureMemoryMonitorTest, TestMonitorCreationWritesTriggerStringToFile)
111111
MemoryPsi psi = {.mode = "some", .stall_proportion = 0.5f, .stall_duration_s = 2};
112112

113113
auto result = PressureMemoryMonitor::Create(
114-
psi, mock_cgroup_dir_->GetPath(), [](const SystemMemorySnapshot &) {});
114+
psi, mock_cgroup_dir_->GetPath(), [](SystemMemorySnapshot, int64_t) {});
115115
ASSERT_TRUE(result.has_value())
116116
<< "Failed to create PressureMemoryMonitor: " << result.message();
117117

@@ -164,7 +164,7 @@ TEST_F(PressureMemoryMonitorTest,
164164
close(listener);
165165

166166
std::shared_ptr<boost::latch> has_called_once = std::make_shared<boost::latch>(1);
167-
auto kill_workers_callback = [has_called_once](const SystemMemorySnapshot &) {
167+
auto kill_workers_callback = [has_called_once](SystemMemorySnapshot, int64_t) {
168168
has_called_once->count_down();
169169
};
170170
std::unique_ptr<PressureMemoryMonitor> monitor =

src/ray/common/tests/threshold_memory_monitor_test.cc

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,13 +54,16 @@ TEST_F(ThresholdMemoryMonitorTest, TestMonitorTriggerCanDetectMemoryUsage) {
5454
MakeThresholdMemoryMonitor(
5555
0 /*memory_usage_threshold_bytes*/,
5656
1 /*refresh_interval_ms*/,
57-
[has_checked_once](SystemMemorySnapshot system_memory) {
57+
[has_checked_once](SystemMemorySnapshot system_memory, int64_t threshold_bytes) {
5858
ASSERT_GT(system_memory.total_bytes, 0)
5959
<< "Reported total bytes from cgroup is <= 0. Is the system memory snapshot "
6060
"taken correctly?";
6161
ASSERT_GE(system_memory.used_bytes, 0)
6262
<< "Reported used bytes from cgroup is < 0. Is the system memory snapshot "
6363
"taken correctly?";
64+
ASSERT_EQ(threshold_bytes, 0)
65+
<< "Threshold bytes propagated to the callback should match the value the "
66+
"monitor was constructed with.";
6467
has_checked_once->count_down();
6568
},
6669
"" /*root_cgroup_path*/);
@@ -86,10 +89,14 @@ TEST_F(ThresholdMemoryMonitorTest,
8689
MakeThresholdMemoryMonitor(
8790
memory_usage_threshold_bytes, // (70%)
8891
1 /*refresh_interval_ms*/,
89-
[has_checked_once, cgroup_total_bytes](SystemMemorySnapshot system_memory) {
92+
[has_checked_once, cgroup_total_bytes, memory_usage_threshold_bytes](
93+
SystemMemorySnapshot system_memory, int64_t threshold_bytes) {
9094
ASSERT_EQ(system_memory.total_bytes, cgroup_total_bytes)
9195
<< "Unexpected total bytes read from cgroup. Are we correctly reading memory "
9296
"from the cgroup?";
97+
ASSERT_EQ(threshold_bytes, memory_usage_threshold_bytes)
98+
<< "Threshold bytes propagated to the callback should match the value the "
99+
"monitor was constructed with.";
93100
has_checked_once->count_down();
94101
},
95102
cgroup_dir /*root_cgroup_path*/);
@@ -117,7 +124,7 @@ TEST_F(ThresholdMemoryMonitorTest,
117124
MakeThresholdMemoryMonitor(
118125
memory_usage_threshold_bytes, // (70%)
119126
1 /*refresh_interval_ms*/,
120-
[callback_triggered](SystemMemorySnapshot system_memory) {
127+
[callback_triggered](SystemMemorySnapshot system_memory, int64_t threshold_bytes) {
121128
callback_triggered->store(true);
122129
},
123130
cgroup_dir /*root_cgroup_path*/);

src/ray/common/threshold_memory_monitor.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,8 @@ ThresholdMemoryMonitor::ThresholdMemoryMonitor(KillWorkersCallback kill_workers_
6060

6161
if (is_usage_above_threshold && IsEnabled()) {
6262
Disable();
63-
kill_workers_callback_(std::move(cur_memory_snapshot));
63+
kill_workers_callback_(std::move(cur_memory_snapshot),
64+
memory_usage_threshold_bytes_);
6465
}
6566
},
6667
monitor_interval_ms,

src/ray/raylet/node_manager.cc

Lines changed: 25 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3048,9 +3048,9 @@ std::optional<syncer::RaySyncMessage> NodeManager::CreateSyncMessage(
30483048

30493049
// Picks the workers and kills the process if the memory usage is above the threshold.
30503050
KillWorkersCallback NodeManager::CreateKillWorkersCallback() {
3051-
return [this](SystemMemorySnapshot system_memory_snapshot) {
3051+
return [this](SystemMemorySnapshot system_memory_snapshot, int64_t threshold_bytes) {
30523052
io_service_.post(
3053-
[this, system_memory = std::move(system_memory_snapshot)]() {
3053+
[this, system_memory = std::move(system_memory_snapshot), threshold_bytes]() {
30543054
ProcessesMemorySnapshot process_memory_snapshot =
30553055
MemoryMonitorUtils::TakePerProcessMemorySnapshot();
30563056
std::vector<std::shared_ptr<WorkerInterface>> workers =
@@ -3074,25 +3074,13 @@ KillWorkersCallback NodeManager::CreateKillWorkersCallback() {
30743074
return;
30753075
}
30763076

3077-
// Compute the memory usage threshold
3078-
int64_t total_memory_bytes = system_memory.total_bytes;
3079-
int64_t computed_threshold_bytes = MemoryMonitorUtils::GetMemoryThreshold(
3080-
total_memory_bytes,
3081-
RayConfig::instance().memory_usage_threshold(),
3082-
RayConfig::instance().min_memory_free_bytes(),
3083-
initial_config_.enable_resource_isolation,
3084-
*cgroup_manager_);
3085-
float computed_threshold_fraction =
3086-
static_cast<float>(computed_threshold_bytes) /
3087-
static_cast<float>(total_memory_bytes);
3088-
30893077
std::string oom_kill_details = CreateOomKillMessageDetails(
30903078
workers_to_kill_and_should_retry,
30913079
self_node_id_,
30923080
system_memory,
30933081
store_client_->GetMemoryUsage().value_or("Not available"),
30943082
process_memory_snapshot,
3095-
computed_threshold_fraction);
3083+
threshold_bytes);
30963084
std::string oom_kill_suggestions =
30973085
CreateOomKillMessageSuggestions(workers_to_kill_and_should_retry);
30983086

@@ -3162,7 +3150,7 @@ std::string NodeManager::CreateOomKillMessageDetails(
31623150
const SystemMemorySnapshot &system_memory_snapshot,
31633151
const std::string &object_store_memory_usage,
31643152
const ProcessesMemorySnapshot &process_memory_snapshot,
3165-
float usage_threshold) const {
3153+
int64_t threshold_bytes) const {
31663154
if (workers_to_kill.empty()) {
31673155
return "";
31683156
}
@@ -3175,6 +3163,25 @@ std::string NodeManager::CreateOomKillMessageDetails(
31753163
"%.2f",
31763164
static_cast<float>(system_memory_snapshot.total_bytes) / 1024 / 1024 / 1024);
31773165

3166+
// Render the threshold. For threshold-based monitors `threshold_bytes` is the
3167+
// exact byte value the monitor compared against. For event- or pressure-driven
3168+
// monitors (kNull) there is no single byte threshold to report, so we fall back
3169+
// to describing the triggering signal instead.
3170+
std::string threshold_str;
3171+
if (threshold_bytes == MemoryMonitorInterface::kNull) {
3172+
threshold_str = "cgroup memory event/pressure signal (no fixed threshold)";
3173+
} else {
3174+
float threshold_fraction =
3175+
system_memory_snapshot.total_bytes > 0
3176+
? static_cast<float>(threshold_bytes) /
3177+
static_cast<float>(system_memory_snapshot.total_bytes)
3178+
: 0.0f;
3179+
threshold_str =
3180+
absl::StrFormat("%.2fGB (%f)",
3181+
static_cast<float>(threshold_bytes) / 1024 / 1024 / 1024,
3182+
threshold_fraction);
3183+
}
3184+
31783185
const auto &first_worker = workers_to_kill.front().first;
31793186
std::string node_ip = first_worker->IpAddress();
31803187

@@ -3221,7 +3228,7 @@ std::string NodeManager::CreateOomKillMessageDetails(
32213228

32223229
return absl::StrFormat(
32233230
"Memory on the node (IP: %s, ID: %s) was %sGB / %sGB (%f), "
3224-
"which exceeds the memory usage threshold of %f; "
3231+
"which exceeds the memory usage threshold of %s; "
32253232
"Object store memory usage: [%s]; "
32263233
"Ray killed %d worker(s) based on the killing policy: "
32273234
"[%s]; "
@@ -3233,7 +3240,7 @@ std::string NodeManager::CreateOomKillMessageDetails(
32333240
used_bytes_gb,
32343241
total_bytes_gb,
32353242
usage_fraction,
3236-
usage_threshold,
3243+
threshold_str,
32373244
absl::StrReplaceAll(object_store_memory_usage, {{"\n", "; "}}),
32383245
workers_to_kill.size(),
32393246
absl::StrJoin(worker_details, "; "),

src/ray/raylet/node_manager.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -772,7 +772,9 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
772772
* @param node_id The ID of the node.
773773
* @param system_memory_snapshot The snapshot of the system memory.
774774
* @param process_memory_snapshot The snapshot of the process memory.
775-
* @param usage_threshold The memory limit.
775+
* @param threshold_bytes The byte threshold used by the memory monitor to decide
776+
* that the kill should fire. Pass MemoryMonitorInterface::kNull if the
777+
* monitor is event-driven and has no single byte threshold.
776778
* @return The detail message for the workers that are killed due to memory running low.
777779
*/
778780
std::string CreateOomKillMessageDetails(
@@ -782,7 +784,7 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
782784
const SystemMemorySnapshot &system_memory_snapshot,
783785
const std::string &object_store_memory_usage,
784786
const ProcessesMemorySnapshot &process_memory_snapshot,
785-
float usage_threshold) const;
787+
int64_t threshold_bytes) const;
786788

787789
/**
788790
* @param workers_to_kill The workers to print the kill suggestions for.

0 commit comments

Comments
 (0)