feat: import braft#3182
Conversation
|
Important Review skippedAuto reviews are disabled on base/target branches other than the default branch. Please check the settings in the CodeRabbit UI or the You can disable this status message by setting the Note Other AI code review bot(s) detectedCodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review. ✨ Finishing touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
58ddb80 to
512a23a
Compare
512a23a to
5697b32
Compare
There was a problem hiding this comment.
Pull Request Overview
This PR integrates braft (Baidu Raft) into Pika to provide distributed consensus and strong consistency guarantees, transitioning from asynchronous master-slave replication to Raft-based replication.
Key Changes:
- Added praft module with Raft consensus implementation
- Implemented binlog-based integration between Raft and storage layer
- Added Raft cluster management commands (RAFT.CLUSTER, RAFT.NODE, RAFT.CONFIG)
Reviewed Changes
Copilot reviewed 23 out of 23 changed files in this pull request and generated 9 comments.
Show a summary per file
| File | Description |
|---|---|
| src/praft/src/praft.cc | Core Raft implementation including RaftManager, PikaStateMachine, and command application logic |
| src/praft/include/praft/praft.h | Public API for Raft functionality including manager, node, and state machine classes |
| src/praft/src/binlog.proto | Protobuf definitions for binlog entries used in Raft log replication |
| src/storage/src/batch.cc | Batch operation abstraction supporting both direct RocksDB writes and Raft binlog submission |
| src/storage/include/storage/batch.h | Batch interface definitions for storage operations |
| src/pika_command.cc | Modified DoBinlog() to submit write commands to Raft instead of traditional binlog |
| src/pika_kv.cc | Modified SetCmd::Do() to skip RocksDB writes in Raft mode (writes occur in on_apply) |
| src/pika_raft.cc | Implementation of Raft management commands |
| include/pika_raft.h | Raft command declarations and g_in_raft_apply flag |
| CMakeLists.txt | Build configuration updates for braft, brpc, and leveldb dependencies |
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
| LOG(INFO) << "Parsed command: " << argv[0] << " with " << (argv.size() - 1) << " args"; | ||
|
|
||
| // Set thread-local flag to indicate we're in on_apply context | ||
| g_in_raft_apply = true; |
There was a problem hiding this comment.
The thread-local flag g_in_raft_apply is set without RAII protection. If an exception is thrown between lines 872 and 889, the flag will remain true and potentially cause incorrect behavior. Consider using an RAII guard class to automatically reset the flag.
| auto node = GetRaftNode(db_name); | ||
| if (!node) { | ||
| promise.set_value(rocksdb::Status::NotFound("Raft node not found")); | ||
| return pstd::Status::NotFound("Raft node not found for db: " + db_name); | ||
| } |
There was a problem hiding this comment.
The GetRaftNode() call at line 771 acquires a shared lock on nodes_mutex_, but raft_nodes_ is also accessed in other methods like CreateRaftNode() with unique locks. Ensure all accesses to raft_nodes_ are properly synchronized to prevent race conditions.
| case pikiwidb::DataType::kSets: return kSets; | ||
| case pikiwidb::DataType::kZSets: return kZSets; | ||
| case pikiwidb::DataType::kStreams: return kStreams; | ||
| default: return kAll; // 不应该发生 |
There was a problem hiding this comment.
The default case returns kAll which may not be appropriate for an unknown data type. Consider logging an error and returning a more explicit error state, or throwing an exception to signal invalid input.
| default: return kAll; // 不应该发生 | |
| default: | |
| LOG(ERROR) << "Unknown proto DataType: " << static_cast<int>(proto_type) << ", returning kAll"; | |
| return kAll; // 不应该发生 |
| // TODO: 这里需要将 promise 传递给 closure,让 Raft apply 完成后设置结果 | ||
| // 目前先立即返回 OK,实际应该等待 Raft 应用完成 |
There was a problem hiding this comment.
The TODO comment indicates that the promise is set to OK immediately (line 286) without waiting for Raft to complete. This creates a race condition where the client may receive success before the data is actually replicated. The promise should only be set after Raft's on_apply completes successfully.
| // Wait for Raft to apply (with 10 second timeout) | ||
| auto wait_status = future.wait_for(std::chrono::seconds(10)); |
There was a problem hiding this comment.
[nitpick] The 10-second timeout is hardcoded. Consider making this configurable through pika.conf to accommodate different deployment scenarios and network conditions.
| // Wait for Raft to apply (with 10 second timeout) | |
| auto wait_status = future.wait_for(std::chrono::seconds(10)); | |
| // Wait for Raft to apply (with configurable timeout) | |
| auto wait_status = future.wait_for(std::chrono::seconds(g_pika_server->raft_apply_timeout_sec())); |
| // TODO: For now, use db0 as default. Need to encode db_name in log. | ||
| // Could prepend db_name to Redis protocol: "<db_name>|*3\r\n..." | ||
| std::string db_name = "db0"; |
There was a problem hiding this comment.
[nitpick] The default db_name is hardcoded as 'db0'. This assumption may not hold for all deployments. Consider extracting this as a configuration parameter or determining it dynamically based on the actual database configuration.
| // TODO: For now, use db0 as default. Need to encode db_name in log. | |
| // Could prepend db_name to Redis protocol: "<db_name>|*3\r\n..." | |
| std::string db_name = "db0"; | |
| // Use configured default db_name if not prepended in log. | |
| // Could prepend db_name to Redis protocol: "<db_name>|*3\r\n..." | |
| std::string db_name; | |
| if (g_pika_conf && !g_pika_conf->default_db().empty()) { | |
| db_name = g_pika_conf->default_db(); | |
| } else { | |
| db_name = "db0"; // Fallback if config not available | |
| } |
| // Plan A: If Raft is enabled, skip actual write on first call | ||
| // The write will happen when on_apply executes this command | ||
| // Use thread-local flag to detect if we're in on_apply context | ||
| if (g_pika_server->GetRaftManager() && !pika_raft::g_in_raft_apply) { |
There was a problem hiding this comment.
There's a potential null pointer dereference if g_pika_server is null. Add a null check for g_pika_server before calling GetRaftManager().
| if (g_pika_server->GetRaftManager() && !pika_raft::g_in_raft_apply) { | |
| if (g_pika_server && g_pika_server->GetRaftManager() && !pika_raft::g_in_raft_apply) { |
| if (redis_proto_data[pos] != '*') { | ||
| LOG(ERROR) << "Invalid Redis protocol: missing '*'"; |
There was a problem hiding this comment.
No bounds checking before accessing redis_proto_data[pos]. If redis_proto_data is empty, this will cause undefined behavior. Add a check to ensure pos < redis_proto_data.size() before accessing.
| if (redis_proto_data[pos] != '*') { | |
| LOG(ERROR) << "Invalid Redis protocol: missing '*'"; | |
| if (redis_proto_data.empty() || pos >= redis_proto_data.size() || redis_proto_data[pos] != '*') { | |
| LOG(ERROR) << "Invalid Redis protocol: missing '*' or empty input"; |
| // TODO: 填充 Raft 和 Binlog 元信息 | ||
| // binlog_.set_term(/* current term */); | ||
| // binlog_.set_log_index(/* current index */); | ||
| // binlog_.set_filenum(/* current filenum */); | ||
| // binlog_.set_offset(/* current offset */); |
There was a problem hiding this comment.
Critical metadata fields (term, log_index, filenum, offset) are not being populated. These fields are essential for proper Raft log management and recovery. This TODO must be addressed before production use.
5697b32 to
38197e4
Compare
38197e4 to
12e9196
Compare
fdf3b29 to
a3479bb
Compare
There was a problem hiding this comment.
Pull Request Overview
Copilot reviewed 24 out of 24 changed files in this pull request and generated 18 comments.
Comments suppressed due to low confidence (7)
src/pika_raft.cc:1
- The comment contains Chinese text '风格'. Consider translating to English:
// Append binlog (pikiwidb_raft style)
// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved.
src/pika_raft.cc:1
- The comment is in Chinese. Consider translating to English:
// Create WriteDoneClosure and pass promise
// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved.
src/pika_raft.cc:1
- The comment is in Chinese. Consider translating to English:
// Serialize binlog
// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved.
src/pika_raft.cc:1
- The comment is in Chinese. Consider translating to English:
// Create Raft task
// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved.
src/pika_raft.cc:1
- The comment is in Chinese. Consider translating to English:
// Submit to Raft
// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved.
src/pika_raft.cc:1
- The comment is in Chinese. Consider translating to English:
// Parse binlog
// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved.
src/pika_raft.cc:1
- The comments are in Chinese. Consider translating to English:
// Call Storage::OnBinlogWrite() directly to apply binlogand// Note: log_index is currently set to 0, can be passed from external caller later
// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| ScopeRecordLock l(lock_mgr_, key); | ||
| return db_->Put(default_write_options_, key, strings_value.Encode()); | ||
|
|
||
| // Strings DB 只有默认列族,索引为 0 |
There was a problem hiding this comment.
The comment is in Chinese. For better maintainability and international collaboration, code comments should be in English. Consider translating to: // Strings DB only has the default column family, index 0
| // Strings DB 只有默认列族,索引为 0 | |
| // Strings DB only has the default column family, index 0 |
| continue; | ||
| } | ||
|
|
||
| // 直接解析 Protobuf binlog |
There was a problem hiding this comment.
The comment is in Chinese. For consistency with the rest of the codebase, consider translating to English: // Parse Protobuf binlog directly
| // 直接解析 Protobuf binlog | |
| // Parse Protobuf binlog directly |
| continue; | ||
| } | ||
|
|
||
| // 应用 binlog 到 storage 并获取结果 |
There was a problem hiding this comment.
The comment is in Chinese. Consider translating to English: // Apply binlog to storage and get result
| // 应用 binlog 到 storage 并获取结果 | |
| // Apply binlog to storage and get result |
| // 应用 binlog 到 storage 并获取结果 | ||
| rocksdb::Status apply_status = g_pika_server->GetRaftManager()->ApplyBinlogEntry(log_str); | ||
|
|
||
| // 根据应用结果设置 closure 状态 |
There was a problem hiding this comment.
The comment is in Chinese. Consider translating to English: // Set closure status based on application result
| // 根据应用结果设置 closure 状态 | |
| // Set closure status based on application result |
| for (const auto& entry : binlog.entries()) { | ||
| uint32_t cf_idx = entry.cf_idx(); | ||
|
|
||
|
|
There was a problem hiding this comment.
There are extra blank lines before the db variable declaration. Consider removing one of the blank lines for consistency with the rest of the codebase.
|
|
||
|
|
||
| void Cmd::DoBinlog() { | ||
| // 如果是 Raft 模式,跳过写 binlog(改用 Protobuf binlog) |
There was a problem hiding this comment.
The comment is in Chinese. Consider translating to English: // If in Raft mode, skip writing binlog (use Protobuf binlog instead)
| // 如果是 Raft 模式,跳过写 binlog(改用 Protobuf binlog) | |
| // If in Raft mode, skip writing binlog (use Protobuf binlog instead) |
| uint64_t before_do_binlog_us = pstd::NowMicros(); | ||
| this->command_duration_ms = (before_do_binlog_us - before_do_command_us) / 1000; | ||
| DoBinlog(); | ||
| DoBinlog(); |
There was a problem hiding this comment.
Trailing whitespace at the end of the line. Consider removing it for code cleanliness.
| DoBinlog(); | |
| DoBinlog(); |
| -DCMAKE_INSTALL_PREFIX=${STAGED_INSTALL_PREFIX} | ||
| -DCMAKE_BUILD_TYPE=${LIB_BUILD_TYPE} | ||
| -DWITH_GFLAGS=ON | ||
| -DWITH_GFLAGS=OFF |
There was a problem hiding this comment.
The glog library is being configured with -DWITH_GFLAGS=OFF (changed from ON). Since gflags is a dependency and is already being built, this may cause compatibility issues. Consider verifying that glog without gflags integration is intentional and doesn't break existing logging configurations that may depend on gflags.
| -DWITH_GFLAGS=OFF | |
| -DWITH_GFLAGS=ON |
| -DBUILD_TESTING=OFF | ||
| -DBUILD_SHARED_LIBS=OFF | ||
| -DWITH_UNWIND=${LIBUNWIND_ON} | ||
| -DWITH_UNWIND=OFF |
There was a problem hiding this comment.
The glog library is being configured with -DWITH_UNWIND=OFF (changed from ${LIBUNWIND_ON}). This disables stack trace support in glog, which can make debugging crashes more difficult. Consider verifying that disabling libunwind support is intentional and acceptable for production use.
| -DWITH_UNWIND=OFF | |
| -DWITH_UNWIND=ON |
| DEPENDS | ||
| URL | ||
| https://github.com/madler/zlib/releases/download/v1.2.13/zlib-1.2.13.tar.gz | ||
| https://github.com/madler/zlib/releases/download/v1.3.1/zlib-1.3.1.tar.gz |
There was a problem hiding this comment.
The zlib version is being upgraded from 1.2.13 to 1.3.1. While version upgrades are generally good, ensure this has been tested thoroughly as zlib is a critical compression library used throughout the system. The corresponding MD5 hash has been updated, which is correct.
def88bd to
0a646cb
Compare
fe97dda to
792ae0a
Compare
741879d to
8141e26
Compare
Pika Raft 模式实现 Review 文档
1. 概述
Pika 的 Raft 模式基于 braft 实现分布式一致性,使用 brpc 进行节点间通信。
核心特性:
2. 架构概览
3. 核心组件
RaftManagerPikaRaftNodePikaStateMachinePPosixFileSystemAdaptor4. 数据流
写入流程
PikaRaftNode::AppendLog()提交到 RaftPikaStateMachine::on_apply()被调用Storage::OnBinlogWrite()写入 RocksDBBinlog 格式
定义在 binlog.proto:
5. 快照机制
GetSmallestFlushedLogIndex()LogIndexOfColumnFamilies追踪各 CF 的日志应用进度6. 配置选项
raft_enabledraft_group_idraft_election_timeout_msraft_snapshot_interval_s7. Review 重点
正确性
IsApplied()检查是否已应用,防止重复应用性能
线程安全
LogIndexOfColumnFamilies使用 mutex 保护on_apply()顺序调用8. 关键代码位置
on_apply()OnBinlogWrite()