Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ DEFINE_Int32(arrow_flight_sql_port, "8050");

// Port used by the cdc client that scans OLTP CDC data.
DEFINE_Int32(cdc_client_port, "9096");

// Extra JVM options for the cdc client process. The value is split on
// whitespace and the tokens are placed ahead of -jar on the java command line.
// Empty by default, i.e. no extra options are added.
DEFINE_String(cdc_client_java_opts, "");

// If the external client cannot directly access priority_networks, set public_host to be accessible
// to external client.
// There are usually two usage scenarios:
Expand Down
3 changes: 3 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ DECLARE_Int32(arrow_flight_sql_port);
// Port used by the cdc client that scans OLTP CDC data.
DECLARE_Int32(cdc_client_port);

// Extra JVM options passed to cdc_client. The value is split on whitespace; no
// quoting or escaping is interpreted, so a single option cannot contain spaces.
// The resulting tokens are placed right after "java" on the command line, ahead
// of the built-in options and -jar.
DECLARE_String(cdc_client_java_opts);

// If the external client cannot directly access priority_networks, set public_host to be accessible
// to external client.
// There are usually two usage scenarios:
Expand Down
77 changes: 63 additions & 14 deletions be/src/runtime/cdc_client_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,12 @@

#include <atomic>
#include <chrono>
#include <iterator>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include <vector>

#include "common/config.h"
#include "common/logging.h"
Expand Down Expand Up @@ -149,10 +152,26 @@ Status CdcClientMgr::start_cdc_client(PRequestCdcClientResult* result) {
_child_pid.store(0);
}
#endif
} else {
} else if (!_adopted_external.load()) {
LOG(INFO) << "CDC client has never been started";
}

#ifndef BE_TEST
// Adopt an externally-managed cdc_client if the port already answers
// healthy (e.g. one started manually for debug / hotfix).
{
std::string adopt_response;
if (check_cdc_client_health(1, 0, adopt_response).ok()) {
if (!_adopted_external.exchange(true)) {
LOG(INFO) << "Adopting external cdc client on port "
<< doris::config::cdc_client_port;
}
return Status::OK();
}
}
_adopted_external.store(false);
#endif

const char* doris_home = getenv("DORIS_HOME");
const char* log_dir = getenv("LOG_DIR");
const std::string cdc_jar_path = std::string(doris_home) + "/lib/cdc_client/cdc-client.jar";
Expand Down Expand Up @@ -181,7 +200,35 @@ Status CdcClientMgr::start_cdc_client(PRequestCdcClientResult* result) {
}
std::string path(java_home);
std::string java_bin = path + "/bin/java";
// Capture signal to prevent child process from becoming a zombie process

// Pre-build everything the child needs before fork(): heap allocation after
// fork() in a multi-threaded process can deadlock on inherited libc locks.
std::vector<std::string> argv_storage;
argv_storage.emplace_back("java");
const std::string user_java_opts = doris::config::cdc_client_java_opts;
if (!user_java_opts.empty()) {
std::istringstream iss(user_java_opts);
argv_storage.insert(argv_storage.end(), std::istream_iterator<std::string>(iss),
std::istream_iterator<std::string>());
}
argv_storage.emplace_back(java_opts);
// OOM safety net (last-wins, user opts cannot disable).
argv_storage.emplace_back("-XX:+ExitOnOutOfMemoryError");
argv_storage.emplace_back("-jar");
argv_storage.emplace_back(cdc_jar_path);
argv_storage.emplace_back(cdc_jar_port);
argv_storage.emplace_back(backend_http_port);
argv_storage.emplace_back(cluster_token);

std::vector<char*> argv;
argv.reserve(argv_storage.size() + 1);
for (auto& s : argv_storage) {
argv.push_back(const_cast<char*>(s.c_str()));
}
argv.push_back(nullptr);

const std::string cdc_out_file = std::string(log_dir) + "/cdc-client.out";

struct sigaction act;
act.sa_flags = 0;
act.sa_handler = handle_sigchld;
Expand All @@ -194,33 +241,25 @@ Status CdcClientMgr::start_cdc_client(PRequestCdcClientResult* result) {
#else
pid_t pid = fork();
if (pid < 0) {
// Fork failed
st = Status::InternalError("Fork cdc client failed.");
st.to_protobuf(result->mutable_status());
return st;
} else if (pid == 0) {
// Child process
// When the parent process is killed, the child process also needs to exit
// Child: async-signal-safe operations only until execv().
#ifndef __APPLE__
prctl(PR_SET_PDEATHSIG, SIGKILL);
#endif
// Redirect stdout and stderr to log out file
std::string cdc_out_file = std::string(log_dir) + "/cdc-client.out";
int out_fd = open(cdc_out_file.c_str(), O_WRONLY | O_CREAT | O_APPEND | O_CLOEXEC, 0644);
if (out_fd < 0) {
perror("open cdc-client.out file failed");
exit(1);
_exit(1);
}
dup2(out_fd, STDOUT_FILENO);
dup2(out_fd, STDERR_FILENO);
close(out_fd);

// java -jar -Dlog.path=xx cdc-client.jar --server.port=9096 --backend.http.port=8040
execlp(java_bin.c_str(), "java", java_opts.c_str(), "-jar", cdc_jar_path.c_str(),
cdc_jar_port.c_str(), backend_http_port.c_str(), cluster_token.c_str(), (char*)NULL);
// If execlp returns, it means it failed
execv(java_bin.c_str(), argv.data());
perror("Cdc client child process error");
exit(1);
_exit(1);
} else {
// Parent process: save PID and wait for startup
_child_pid.store(pid);
Expand All @@ -233,7 +272,17 @@ Status CdcClientMgr::start_cdc_client(PRequestCdcClientResult* result) {
_child_pid.store(0);
st = Status::InternalError("Start cdc client failed.");
st.to_protobuf(result->mutable_status());
} else if (kill(pid, 0) != 0) {
// Port healthy but our child has exited: an external process is
// answering. Treat as adoption instead of masking dead PID as success.
_child_pid.store(0);
if (!_adopted_external.exchange(true)) {
LOG(INFO) << "Forked cdc client " << pid << " exited but port "
<< doris::config::cdc_client_port
<< " is healthy, adopting external instance";
}
} else {
_adopted_external.store(false);
LOG(INFO) << "Start cdc client success, pid=" << pid
<< ", status=" << status.to_string() << ", response=" << health_response;
}
Expand Down
4 changes: 4 additions & 0 deletions be/src/runtime/cdc_client_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,15 @@ class CdcClientMgr {
// Returns the value currently stored in the atomic _child_pid.
pid_t get_child_pid() const {
    const pid_t current_pid = _child_pid.load();
    return current_pid;
}
// For testing only: overwrite the stored child PID with `pid`.
void set_child_pid_for_test(pid_t pid) {
    _child_pid.store(pid);
}
// For testing only: inspect / drive the adopt-external flag.
// Reads the current value of the atomic _adopted_external flag.
bool get_adopted_external_for_test() const {
    const bool adopted = _adopted_external.load();
    return adopted;
}
// For testing only: store `v` into the atomic _adopted_external flag.
void set_adopted_external_for_test(bool v) {
    _adopted_external.store(v);
}
#endif

private:
std::mutex _start_mutex;
std::atomic<pid_t> _child_pid {0};
std::atomic<bool> _adopted_external {false};
};

} // namespace doris
24 changes: 24 additions & 0 deletions be/test/runtime/cdc_client_mgr_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,30 @@ TEST_F(CdcClientMgrTest, StartWithPreExistingResultStatus) {
EXPECT_EQ(result.status().status_code(), 999);
}

// A freshly constructed manager must not be flagged as having adopted an
// external cdc client, and start_cdc_client (which takes the BE_TEST
// short-circuit in this build) must leave that flag untouched.
TEST_F(CdcClientMgrTest, AdoptedExternalDefaultFalse) {
    CdcClientMgr manager;
    EXPECT_FALSE(manager.get_adopted_external_for_test());

    PRequestCdcClientResult response;
    EXPECT_TRUE(manager.start_cdc_client(&response).ok());

    // The flag must still read false after the start call.
    EXPECT_FALSE(manager.get_adopted_external_for_test());
}

// Whatever value is written through the test-only setter must be read back
// unchanged through the test-only getter, starting from the default (false).
TEST_F(CdcClientMgrTest, AdoptedExternalSetterRoundTrip) {
    CdcClientMgr manager;
    EXPECT_FALSE(manager.get_adopted_external_for_test());

    // Write true first, then false, checking the getter after each write.
    const bool written_values[] = {true, false};
    for (const bool written : written_values) {
        manager.set_adopted_external_for_test(written);
        EXPECT_EQ(written, manager.get_adopted_external_for_test());
    }
}

// Test send_request_to_cdc_client with empty API
TEST_F(CdcClientMgrTest, SendRequestEmptyApi) {
CdcClientMgr mgr;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
* Copied from Flink Cdc 3.6.0
*
* <p>Line 820~854: modified getColumnValue method to fix FLINK-39748.
* <p>Line 699-703, 705-706: doReadTableColumn also matches SCHEMA_NAME (besides TABLE_NAME, FLINK-38965) to avoid Duplicate key from a decoy schema's same-named table.
*/
public class PostgresConnection extends JdbcConnection {

Expand Down Expand Up @@ -695,8 +696,14 @@ private Optional<ColumnEditor> doReadTableColumn(
// - When querying 'user_sink', the pattern may also match 'userbsink' (due to '_')
// - When querying 'user%data' (where % is literal), it may match 'user_test_data' (due to
// '%')
// The schema name passed to getColumns is also a LIKE pattern, so a decoy schema
// (e.g. 'cdcXtest' matched by 'cdc_test' via '_') can return a same-named table whose
// TABLE_NAME still equals tableId.table(); compare the schema too, otherwise those
// columns merge with the real table's and collide on column name (Duplicate key).
final String resultSchemaName = columnMetadata.getString(2);
final String resultTableName = columnMetadata.getString(3);
if (!tableId.table().equals(resultTableName)) {
if (!tableId.table().equals(resultTableName)
|| (tableId.schema() != null && !tableId.schema().equals(resultSchemaName))) {
return Optional.empty();
}

Expand Down
Loading
Loading