Skip to content

Commit 3f3c79c

Browse files
authored
[Fix](pyudf) make Python server pool selection alive-aware and version-isolated (#62620)
`PythonServerManager` does not check if the python process corresponding to the version is alive when retrieving the process, which may cause errors like: ```text java.lang.IllegalStateException: PYTHON_UDF_BLOCKED suite=python_udf_cross_feature_import_storage scenario=python_udf_cross_feature_import_storage.inline_runtime reason=inline probe failed. reason=errCode = 2, detailMessage = (172.20.49.73)[INTERNAL_ERROR]IOError: Flight stream finish failed with gRPC code 14, message: failed to connect to all addresses; last error: UNKNOWN: unix:/tmp/doris_python_udf_55799.sock: Connection refused ``` After modification, before obtaining the Python process, it will check if the process is alive to ensure the availability of this feature.
1 parent 17bbba4 commit 3f3c79c

9 files changed

Lines changed: 465 additions & 95 deletions

File tree

be/src/udf/python/python_server.cpp

Lines changed: 113 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
#include <boost/asio.hpp>
2828
#include <boost/process.hpp>
2929
#include <fstream>
30+
#include <future>
3031

3132
#include "arrow/flight/client.h"
3233
#include "common/config.h"
@@ -37,32 +38,70 @@
3738

3839
namespace doris {
3940

40-
template <typename T>
41+
std::shared_ptr<PythonServerManager::VersionedProcessPool>
42+
PythonServerManager::_get_or_create_process_pool(const PythonVersion& version) {
43+
std::lock_guard<std::mutex> lock(_pools_mutex);
44+
auto& pool = _process_pools[version];
45+
if (!pool) {
46+
pool = std::make_shared<VersionedProcessPool>();
47+
}
48+
return pool;
49+
}
50+
51+
std::vector<std::pair<PythonVersion, std::shared_ptr<PythonServerManager::VersionedProcessPool>>>
52+
PythonServerManager::_snapshot_process_pools() {
53+
std::lock_guard<std::mutex> lock(_pools_mutex);
54+
std::vector<std::pair<PythonVersion, std::shared_ptr<VersionedProcessPool>>> snapshot;
55+
snapshot.reserve(_process_pools.size());
56+
for (const auto& [version, pool] : _process_pools) {
57+
snapshot.emplace_back(version, pool);
58+
}
59+
return snapshot;
60+
}
61+
62+
#ifdef BE_TEST
63+
void PythonServerManager::set_process_pool_for_test(const PythonVersion& version,
64+
std::vector<ProcessPtr> processes,
65+
bool initialized) {
66+
auto versioned_pool = _get_or_create_process_pool(version);
67+
std::lock_guard<std::mutex> lock(versioned_pool->mutex);
68+
versioned_pool->processes = std::move(processes);
69+
versioned_pool->initialized = initialized;
70+
}
71+
72+
std::vector<ProcessPtr>& PythonServerManager::process_pool_for_test(const PythonVersion& version) {
73+
auto versioned_pool = _get_or_create_process_pool(version);
74+
return versioned_pool->processes;
75+
}
76+
#endif
77+
78+
template <typename ClientType>
4179
Status PythonServerManager::get_client(const PythonUDFMeta& func_meta, const PythonVersion& version,
42-
std::shared_ptr<T>* client,
80+
std::shared_ptr<ClientType>* client,
4381
const std::shared_ptr<arrow::Schema>& data_schema) {
44-
// Ensure process pool is initialized for this version
45-
RETURN_IF_ERROR(ensure_pool_initialized(version));
82+
std::shared_ptr<VersionedProcessPool> versioned_pool =
83+
DORIS_TRY(_ensure_pool_initialized(version));
4684

4785
ProcessPtr process;
48-
RETURN_IF_ERROR(get_process(version, &process));
86+
RETURN_IF_ERROR(_get_process(version, versioned_pool, &process));
4987

50-
if constexpr (std::is_same_v<T, PythonUDAFClient>) {
51-
RETURN_IF_ERROR(T::create(func_meta, std::move(process), data_schema, client));
88+
if constexpr (std::is_same_v<ClientType, PythonUDAFClient>) {
89+
RETURN_IF_ERROR(ClientType::create(func_meta, std::move(process), data_schema, client));
5290
} else {
53-
RETURN_IF_ERROR(T::create(func_meta, std::move(process), client));
91+
RETURN_IF_ERROR(ClientType::create(func_meta, std::move(process), client));
5492
}
5593

5694
return Status::OK();
5795
}
5896

59-
Status PythonServerManager::ensure_pool_initialized(const PythonVersion& version) {
60-
std::lock_guard<std::mutex> lock(_pools_mutex);
97+
Result<std::shared_ptr<PythonServerManager::VersionedProcessPool>>
98+
PythonServerManager::_ensure_pool_initialized(const PythonVersion& version) {
99+
auto versioned_pool = _get_or_create_process_pool(version);
100+
std::lock_guard<std::mutex> lock(versioned_pool->mutex);
61101

62102
// Check if already initialized
63-
if (_initialized_versions.count(version)) return Status::OK();
103+
if (versioned_pool->initialized) return versioned_pool;
64104

65-
std::vector<ProcessPtr>& pool = _process_pools[version];
66105
// 0 means use CPU core count as default, otherwise use the specified value
67106
int max_pool_size = config::max_python_process_num > 0 ? config::max_python_process_num
68107
: CpuInfo::num_cores();
@@ -91,7 +130,7 @@ Status PythonServerManager::ensure_pool_initialized(const PythonVersion& version
91130
for (int i = 0; i < max_pool_size; i++) {
92131
Status s = futures[i].get();
93132
if (s.ok() && temp_processes[i]) {
94-
pool.push_back(std::move(temp_processes[i]));
133+
versioned_pool->processes.emplace_back(std::move(temp_processes[i]));
95134
success_count++;
96135
} else {
97136
failure_count++;
@@ -100,38 +139,64 @@ Status PythonServerManager::ensure_pool_initialized(const PythonVersion& version
100139
}
101140
}
102141

103-
if (pool.empty()) {
104-
return Status::InternalError(
142+
if (versioned_pool->processes.empty()) {
143+
return ResultError(Status::Error<ErrorCode::SERVICE_UNAVAILABLE>(
105144
"Failed to initialize Python process pool: all {} process creation attempts failed",
106-
max_pool_size);
145+
max_pool_size));
107146
}
108147

109148
LOG(INFO) << "Python process pool initialized for version " << version.to_string()
110149
<< ": created " << success_count << " processes"
111150
<< (failure_count > 0 ? fmt::format(" ({} failed)", failure_count) : "");
112151

113-
_initialized_versions.insert(version);
152+
versioned_pool->initialized = true;
114153
_start_health_check_thread();
115154

116-
return Status::OK();
155+
return versioned_pool;
117156
}
118157

119-
Status PythonServerManager::get_process(const PythonVersion& version, ProcessPtr* process) {
120-
std::lock_guard<std::mutex> lock(_pools_mutex);
121-
std::vector<ProcessPtr>& pool = _process_pools[version];
158+
Status PythonServerManager::_get_process(
159+
const PythonVersion& version, const std::shared_ptr<VersionedProcessPool>& versioned_pool,
160+
ProcessPtr* process) {
161+
std::lock_guard<std::mutex> lock(versioned_pool->mutex);
162+
std::vector<ProcessPtr>& pool = versioned_pool->processes;
122163

123164
if (UNLIKELY(pool.empty())) {
124165
return Status::InternalError("Python process pool is empty for version {}",
125166
version.to_string());
126167
}
127168

128-
// Find process with minimum load (use_count - 1 gives active client count)
129-
auto min_iter = std::min_element(
130-
pool.begin(), pool.end(),
131-
[](const ProcessPtr& a, const ProcessPtr& b) { return a.use_count() < b.use_count(); });
169+
// Prefer an already-alive process and only use load balancing inside that alive subset.
170+
// keep dead entries stay in the pool for the background health checker
171+
// unless there is no alive process left for the current request.
172+
auto min_alive_iter = std::min_element(pool.begin(), pool.end(),
173+
[](const ProcessPtr& a, const ProcessPtr& b) {
174+
const bool a_alive = a && a->is_alive();
175+
const bool b_alive = b && b->is_alive();
176+
if (a_alive != b_alive) {
177+
return a_alive > b_alive;
178+
}
179+
return a.use_count() < b.use_count();
180+
});
181+
182+
if (min_alive_iter != pool.end() && *min_alive_iter && (*min_alive_iter)->is_alive()) {
183+
*process = *min_alive_iter;
184+
return Status::OK();
185+
}
186+
187+
// Only reach here when the pool has no alive process at all. Try one foreground
188+
// recovery so the caller has a chance to proceed; leave batch repair to health check.
189+
auto& candidate = pool.front();
190+
ProcessPtr replacement;
191+
Status status = fork(version, &replacement);
192+
if (!status.ok()) {
193+
return Status::Error<ErrorCode::SERVICE_UNAVAILABLE>(
194+
"Python process pool has no available process for version {}, reason: {}",
195+
version.to_string(), status.to_string());
196+
}
132197

133-
// Return process with minimum load
134-
*process = *min_iter;
198+
candidate = std::move(replacement);
199+
*process = candidate;
135200
return Status::OK();
136201
}
137202

@@ -191,6 +256,7 @@ Status PythonServerManager::fork(const PythonVersion& version, ProcessPtr* proce
191256
}
192257

193258
void PythonServerManager::_start_health_check_thread() {
259+
std::lock_guard<std::mutex> lock(_health_check_mutex);
194260
if (_health_check_thread) return;
195261

196262
LOG(INFO) << "Starting Python process health check thread (interval: 30 seconds)";
@@ -217,13 +283,13 @@ void PythonServerManager::_start_health_check_thread() {
217283
}
218284

219285
void PythonServerManager::_check_and_recreate_processes() {
220-
std::lock_guard<std::mutex> lock(_pools_mutex);
221-
222286
int total_checked = 0;
223287
int total_dead = 0;
224288
int total_recreated = 0;
225289

226-
for (auto& [version, pool] : _process_pools) {
290+
for (auto& [version, versioned_pool] : _snapshot_process_pools()) {
291+
std::lock_guard<std::mutex> lock(versioned_pool->mutex);
292+
auto& pool = versioned_pool->processes;
227293
for (size_t i = 0; i < pool.size(); ++i) {
228294
auto& process = pool[i];
229295
if (!process) continue;
@@ -268,15 +334,22 @@ void PythonServerManager::shutdown() {
268334
}
269335

270336
// Shutdown all processes
271-
std::lock_guard<std::mutex> lock(_pools_mutex);
272-
for (auto& [version, pool] : _process_pools) {
337+
for (auto& [version, versioned_pool] : _snapshot_process_pools()) {
338+
std::lock_guard<std::mutex> lock(versioned_pool->mutex);
339+
auto& pool = versioned_pool->processes;
273340
for (auto& process : pool) {
274341
if (process) {
275342
process->shutdown();
276343
}
277344
}
345+
pool.clear();
346+
versioned_pool->initialized = false;
347+
}
348+
349+
{
350+
std::lock_guard<std::mutex> lock(_pools_mutex);
351+
_process_pools.clear();
278352
}
279-
_process_pools.clear();
280353
}
281354

282355
Status PythonServerManager::_read_process_memory(pid_t pid, size_t* rss_bytes) {
@@ -305,11 +378,11 @@ Status PythonServerManager::_read_process_memory(pid_t pid, size_t* rss_bytes) {
305378
}
306379

307380
void PythonServerManager::_refresh_memory_stats() {
308-
std::lock_guard<std::mutex> lock(_pools_mutex);
309-
310381
int64_t total_rss = 0;
311382

312-
for (const auto& [version, pool] : _process_pools) {
383+
for (const auto& [version, versioned_pool] : _snapshot_process_pools()) {
384+
std::lock_guard<std::mutex> lock(versioned_pool->mutex);
385+
const auto& pool = versioned_pool->processes;
313386
for (const auto& process : pool) {
314387
if (!process || !process->is_alive()) continue;
315388

@@ -339,15 +412,15 @@ Status PythonServerManager::clear_module_cache(const std::string& location) {
339412
return Status::InvalidArgument("Empty location for clear_module_cache");
340413
}
341414

342-
std::lock_guard<std::mutex> lock(_pools_mutex);
343-
344415
std::string body = fmt::format(R"({{"location": "{}"}})", location);
345416

346417
int success_count = 0;
347418
int fail_count = 0;
348419
bool has_active_process = false;
349420

350-
for (auto& [version, pool] : _process_pools) {
421+
for (auto& [version, versioned_pool] : _snapshot_process_pools()) {
422+
std::lock_guard<std::mutex> lock(versioned_pool->mutex);
423+
auto& pool = versioned_pool->processes;
351424
for (auto& process : pool) {
352425
if (!process || !process->is_alive()) {
353426
continue;
@@ -422,4 +495,4 @@ template Status PythonServerManager::get_client<PythonUDTFClient>(
422495
std::shared_ptr<PythonUDTFClient>* client,
423496
const std::shared_ptr<arrow::Schema>& data_schema);
424497

425-
} // namespace doris
498+
} // namespace doris

be/src/udf/python/python_server.h

Lines changed: 34 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,10 @@
2020
#include <atomic>
2121
#include <condition_variable>
2222
#include <memory>
23+
#include <mutex>
2324
#include <thread>
25+
#include <unordered_map>
26+
#include <vector>
2427

2528
#include "common/status.h"
2629
#include "runtime/memory/mem_tracker.h"
@@ -46,25 +49,41 @@ class PythonServerManager {
4649

4750
Status fork(const PythonVersion& version, ProcessPtr* process);
4851

49-
Status get_process(const PythonVersion& version, ProcessPtr* process);
50-
5152
// Clear Python module cache for a specific UDF location across all processes
5253
Status clear_module_cache(const std::string& location);
5354

54-
Status ensure_pool_initialized(const PythonVersion& version);
55-
5655
void shutdown();
5756

5857
#ifdef BE_TEST
5958
// For unit testing only.
6059
void check_and_recreate_processes_for_test() { _check_and_recreate_processes(); }
6160

62-
std::unordered_map<PythonVersion, std::vector<ProcessPtr>>& process_pools_for_test() {
63-
return _process_pools;
64-
}
61+
void set_process_pool_for_test(const PythonVersion& version, std::vector<ProcessPtr> processes,
62+
bool initialized = true);
63+
64+
std::vector<ProcessPtr>& process_pool_for_test(const PythonVersion& version);
6565
#endif
6666

6767
private:
68+
struct VersionedProcessPool {
69+
std::mutex mutex;
70+
std::vector<ProcessPtr> processes;
71+
bool initialized = false;
72+
};
73+
74+
/**
75+
* Lazily initialize and return the process pool for specific Python version.
76+
*/
77+
Result<std::shared_ptr<VersionedProcessPool>> _ensure_pool_initialized(
78+
const PythonVersion& version);
79+
80+
/**
81+
* Pick an available process from specific pool, recreating one on demand if needed.
82+
*/
83+
Status _get_process(const PythonVersion& version,
84+
const std::shared_ptr<VersionedProcessPool>& versioned_pool,
85+
ProcessPtr* process);
86+
6887
/**
6988
* Start health check background thread (called once by ensure_pool_initialized)
7089
* Thread periodically checks process health and refreshes memory stats
@@ -86,11 +105,14 @@ class PythonServerManager {
86105
*/
87106
void _refresh_memory_stats();
88107

89-
std::unordered_map<PythonVersion, std::vector<ProcessPtr>> _process_pools;
90-
// Protects _process_pools access
108+
std::shared_ptr<VersionedProcessPool> _get_or_create_process_pool(const PythonVersion& version);
109+
std::vector<std::pair<PythonVersion, std::shared_ptr<VersionedProcessPool>>>
110+
_snapshot_process_pools();
111+
112+
std::unordered_map<PythonVersion, std::shared_ptr<VersionedProcessPool>> _process_pools;
113+
// Protects the version -> pool handle map only. Per-version process operations are guarded
114+
// by VersionedProcessPool::mutex.
91115
std::mutex _pools_mutex;
92-
// Track which versions have been initialized
93-
std::unordered_set<PythonVersion> _initialized_versions;
94116
// Health check background thread
95117
std::unique_ptr<std::thread> _health_check_thread;
96118
std::atomic<bool> _shutdown_flag {false};
@@ -99,4 +121,4 @@ class PythonServerManager {
99121
MemTracker _mem_tracker {"PythonUDFProcesses"};
100122
};
101123

102-
} // namespace doris
124+
} // namespace doris

0 commit comments

Comments
 (0)