diff --git a/CMakeLists.txt b/CMakeLists.txt index f3bb015d5c..491f40f22b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -457,9 +457,9 @@ set(LZ4_INCLUDE_DIR ${INSTALL_INCLUDEDIR}) ExternalProject_Add(zlib DEPENDS URL - https://github.com/madler/zlib/releases/download/v1.2.13/zlib-1.2.13.tar.gz + https://github.com/madler/zlib/releases/download/v1.3.1/zlib-1.3.1.tar.gz URL_HASH - MD5=9b8aa094c4e5765dabf4da391f00d15c + MD5=9855b6d802d7fe5b7bd5b196a2271655 DOWNLOAD_NO_PROGRESS 1 UPDATE_COMMAND diff --git a/conf/pika.conf b/conf/pika.conf index 97d171d419..3475cd0034 100644 --- a/conf/pika.conf +++ b/conf/pika.conf @@ -42,7 +42,7 @@ slow-cmd-thread-pool-size : 1 admin-thread-pool-size : 2 # Slow cmd list e.g. hgetall, mset -slow-cmd-list : +slow-cmd-list : # List of commands considered as administrative. These commands will be handled by the admin thread pool. Modify this list as needed. # Default commands: info, ping, monitor @@ -95,7 +95,7 @@ proto-max-bulk-len : 512M # If <= 0, a proper value is automatically calculated. # (usually 1/8 of writer-buffer-size, rounded up to a multiple of 4KB) # Supported Units [K|M|G], arena-block-size default unit is in [bytes]. -arena-block-size : +arena-block-size : # Timeout of Pika's connection, counting down starts When there are no requests # on a connection (it enters sleep state), when the countdown reaches 0, the connection @@ -109,12 +109,12 @@ timeout : 60 # [NOTICE] If this admin password is the same as user password (including both being empty), # in this scenario, users are not subject to the restrictions imposed by the userblacklist. # PS: "user password" refers to value of the parameter below: userpass. -requirepass : +requirepass : # Password for replication verify, used for authentication when a slave # connects to a master to request replication. # [NOTICE] The value of this parameter must match the "requirepass" setting on the master. 
-masterauth : +masterauth : # The [password of user], which is empty by default. # [NOTICE] If this user password is the same as admin password (including both being empty), @@ -153,9 +153,28 @@ replication-num : 0 # The default value of consensus-level is 0, which means this feature is not enabled. consensus-level : 0 +# Batch processing configuration (used by both command collection and consensus mechanism) +# The maximum number of items in a batch (both command collection and consensus) +# Default: 100 +batch-size : 100 + +# Batch processing configuration (used by both command collection and consensus mechanism) +# The maximum time in milliseconds to wait for a batch to fill (both command collection and consensus) +# Default: 5 +batch-max-wait-time : 5 + +# The timeout in milliseconds for waiting for a batch ACK from a slave. +# Default: 500 +replication-ack-timeout : 500 + +# Enable command batch processing for better performance +# When enabled, write commands will be collected and processed in batches +# Default: yes +command-batch-enabled : yes + # The Prefix of dump file's name. # All the files that generated by command "bgsave" will be name with this prefix. -dump-prefix : +dump-prefix : # daemonize [yes | no]. #daemonize : yes @@ -536,13 +555,13 @@ cache-num : 16 # cache-model 0:cache_none 1:cache_read cache-model : 1 # cache-type: string, set, zset, list, hash, bit -cache-type: string, set, zset, list, hash, bit +cache-type : string, set, zset, list, hash, bit # Set the maximum number of elements in the cache of the Set, list, Zset data types -cache-value-item-max-size: 1024 +cache-value-item-max-size : 1024 # Sets the maximum number of bytes for Key when the String data type is updated in the cache -max-key-size-in-cache: 1048576 +max-key-size-in-cache : 1048576 # Maximum number of keys in the zset redis cache # On the disk DB, a zset field may have many fields. 
In the memory cache, we limit the maximum @@ -574,10 +593,10 @@ cache-maxmemory : 10737418240 cache-maxmemory-policy : 1 # cache-maxmemory-samples -cache-maxmemory-samples: 5 +cache-maxmemory-samples : 5 # cache-lfu-decay-time -cache-lfu-decay-time: 1 +cache-lfu-decay-time : 1 # is possible to manage access to Pub/Sub channels with ACL rules as well. The @@ -631,12 +650,12 @@ cache-lfu-decay-time: 1 # 'internal-used-unfinished-full-sync' is used to generate a metric 'is_eligible_for_master_election' # which serves for the scenario of codis-pika cluster reelection # You'd better [DO NOT MODIFY IT UNLESS YOU KNOW WHAT YOU ARE DOING] -internal-used-unfinished-full-sync : +internal-used-unfinished-full-sync : # for wash data from 4.0.0 to 4.0.1 # https://github.com/OpenAtomFoundation/pika/issues/2886 # default value: true -wash-data: true +wash-data : true # Pika automatic compact compact strategy, a complement to rocksdb compact. # Trigger the compact background task periodically according to `compact-interval` @@ -671,3 +690,6 @@ dont-compact-sst-created-in-seconds : 20 # According to the number of sst files in rocksdb, # compact every `compact-every-num-of-files` file. 
best-delete-min-ratio : 10 +# Generated by ReplicationID CONFIG REWRITE +replication-id : d605afa1b464ddf3e571966482dd934ec7336a4bac49aa0a6b +run-id : 58ec490577bb7defd64f6fe642d7609af67896b1 diff --git a/include/pika_binlog.h b/include/pika_binlog.h index 43615ae0b4..26dd350c80 100644 --- a/include/pika_binlog.h +++ b/include/pika_binlog.h @@ -57,10 +57,15 @@ class Binlog : public pstd::noncopyable { pstd::Status Put(const std::string& item, LogOffset *cur_logoffset,std::string& binlog); pstd::Status IsOpened(); pstd::Status GetProducerStatus(uint32_t* filenum, uint64_t* pro_offset, uint32_t* term = nullptr, uint64_t* logic_id = nullptr); + pstd::WritableFile* GetQueue() { return queue_.get(); } /* * Set Producer pro_num and pro_offset with lock */ pstd::Status SetProducerStatus(uint32_t pro_num, uint64_t pro_offset, uint32_t term = 0, uint64_t index = 0); + + // Force sync data to disk + pstd::Status Sync(); + // Need to hold Lock(); pstd::Status Truncate(uint32_t pro_num, uint64_t pro_offset, uint64_t index); diff --git a/include/pika_client_conn.h b/include/pika_client_conn.h index bc4c28db6a..de01ce5034 100644 --- a/include/pika_client_conn.h +++ b/include/pika_client_conn.h @@ -113,7 +113,7 @@ class PikaClientConn : public net::RedisConn { std::vector> resp_array; std::shared_ptr time_stat_; - + void TryWriteResp(); private: net::ServerThread* const server_thread_; std::string current_db_; @@ -134,7 +134,7 @@ class PikaClientConn : public net::RedisConn { void ProcessMonitor(const PikaCmdArgsType& argv); void ExecRedisCmd(const PikaCmdArgsType& argv, std::shared_ptr& resp_ptr, bool cache_miss_in_rtc); - void TryWriteResp(); + // void TryWriteResp(); }; struct ClientInfo { diff --git a/include/pika_command_collector.h b/include/pika_command_collector.h new file mode 100644 index 0000000000..fe74bcf6ab --- /dev/null +++ b/include/pika_command_collector.h @@ -0,0 +1,91 @@ +// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved. 
+// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#ifndef PIKA_COMMAND_COLLECTOR_H_ +#define PIKA_COMMAND_COLLECTOR_H_ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "include/pika_command.h" +#include "include/pika_define.h" +#include "pstd/include/pstd_status.h" + +#include "include/pika_consensus.h" + +/** + * @brief PikaCommandCollector is used to collect write commands and process them in batches + * + * Main functions: + * 1. Collect write commands and process them in optimized batches after reaching the threshold + * 2. Handle the conflict of the same key (the later command will overwrite the earlier command) + * 3. Send commands in batches to the consensus coordinator with batch-level synchronization + * 4. Support asynchronous callback notification of command processing results + * 5. 
Track performance metrics for batch processing + +*/ +class PikaCommandCollector { + public: + // Callback function type after command processing is completed + using CommandCallback = std::function; + + /** + * @brief constructor + * @param coordinator consensus coordinator reference + * @param batch_max_wait_time maximum wait time in milliseconds + */ + // Constructor with raw pointer (original) + PikaCommandCollector(ConsensusCoordinator* coordinator, int batch_max_wait_time = 5); + + // Constructor with shared_ptr (for compatibility with make_shared calls) + PikaCommandCollector(std::shared_ptr coordinator, int batch_max_wait_time = 5); + + ~PikaCommandCollector(); + + /** + * @brief Add command to collector + * @param cmd_ptr command pointer + * @param callback callback function after processing is completed + * @return whether the addition was successful + */ + bool AddCommand(std::shared_ptr cmd_ptr, CommandCallback callback); + + /** + * @brief Set the batch max wait time + * @param batch_max_wait_time maximum wait time in milliseconds + */ + void SetBatchMaxWaitTime(int batch_max_wait_time); + + /** + * @brief Get batch processing statistics + * @return Pair of (total_processed_commands, total_batches) + */ + std::pair GetBatchStats() const; + + private: + //Consensus coordinator reference + ConsensusCoordinator* coordinator_; + + // Batch processing configuration + std::atomic batch_max_wait_time_; + + // Batch statistics + std::atomic total_processed_{0}; + std::atomic total_batches_{0}; +}; + +#endif // PIKA_COMMAND_COLLECTOR_H_ \ No newline at end of file diff --git a/include/pika_command_queue.h b/include/pika_command_queue.h new file mode 100644 index 0000000000..62574fcd2d --- /dev/null +++ b/include/pika_command_queue.h @@ -0,0 +1,99 @@ +// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. 
An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#ifndef PIKA_COMMAND_QUEUE_H_ +#define PIKA_COMMAND_QUEUE_H_ + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "pstd/include/pstd_mutex.h" +#include "pstd/include/env.h" +#include "include/pika_command.h" +#include "include/pika_define.h" +#include "pstd/include/env.h" +#include "include/pika_define.h" + +// Callback function type for command completion notification +using CommandCallback = std::function; + +// Structure representing a batch of commands +struct CommandBatch { + std::vector> commands; + std::vector callbacks; + uint64_t batch_id; + uint64_t create_time; + std::string db_name; + std::vector binlog_offsets; // Binlog offsets for each command + + CommandBatch(const std::vector>& cmds, + const std::vector& cbs, + const std::string& db) + : commands(cmds), callbacks(cbs), db_name(db) { + static std::atomic next_id{1}; + batch_id = next_id.fetch_add(1); + create_time = pstd::NowMicros(); + } + + bool Empty() const { + return commands.empty(); + } + + size_t Size() const { + return commands.size(); + } +}; + +// New structure to group multiple CommandBatches for RocksDB processing +struct BatchGroup { + std::vector> batches; + LogOffset end_offset; // Only store the final offset of the last batch + BatchGroup() = default; + BatchGroup(const std::vector>& batches, + const LogOffset& final_offset) + : batches(batches), end_offset(final_offset) {} + bool Empty() const { return batches.empty(); } + size_t BatchCount() const { return batches.size(); } +}; + +// Thread-safe command queue for batched command processing +class CommandQueue { +public: + explicit CommandQueue(size_t max_size); + ~CommandQueue(); + + // Enqueue a command batch (blocking if queue is full) + bool EnqueueBatch(std::shared_ptr batch); + + // Dequeue a command batch (blocking if queue is empty) + std::shared_ptr DequeueBatch(); + + // Dequeue all 
available batches (non-blocking) + std::vector> DequeueAllBatches(); + + // Get current queue size + size_t Size() const; + + // Check if queue is empty + bool Empty() const; + + // Shutdown the queue + void Shutdown(); + +private: + std::queue> cmd_queue_; + mutable std::mutex queue_mutex_; + std::condition_variable queue_cv_; + size_t max_size_; + std::atomic shutdown_{false}; +}; + +#endif // PIKA_COMMAND_QUEUE_H_ \ No newline at end of file diff --git a/include/pika_conf.h b/include/pika_conf.h index 80d5abe8f0..d9cd9a91bd 100644 --- a/include/pika_conf.h +++ b/include/pika_conf.h @@ -69,6 +69,21 @@ class PikaConf : public pstd::BaseConf { std::shared_lock l(rwlock_); return sync_thread_num_; } + + bool command_batch_enabled() { + std::shared_lock l(rwlock_); + return command_batch_enabled_; + } + + int batch_size() { + std::shared_lock l(rwlock_); + return batch_size_; + } + + int batch_max_wait_time() { + std::shared_lock l(rwlock_); + return batch_max_wait_time_; + } int sync_binlog_thread_num() { std::shared_lock l(rwlock_); return sync_binlog_thread_num_; @@ -350,6 +365,16 @@ class PikaConf : public pstd::BaseConf { int max_conn_rbuf_size() { return max_conn_rbuf_size_.load(); } int consensus_level() { return consensus_level_.load(); } int replication_num() { return replication_num_.load(); } + int replication_ack_timeout() { + std::shared_lock l(rwlock_); + return replication_ack_timeout_; + } + + // Function to set replication acknowledgment timeout (used by batch system) + void SetReplicationAckTimeout(int timeout) { + std::lock_guard l(rwlock_); + replication_ack_timeout_ = timeout; + } int rate_limiter_mode() { std::shared_lock l(rwlock_); return rate_limiter_mode_; @@ -436,7 +461,6 @@ class PikaConf : public pstd::BaseConf { bool is_admin_cmd(const std::string& cmd) { return admin_cmd_set_.find(cmd) != admin_cmd_set_.end(); } - // Immutable config items, we don't use lock. 
bool daemonize() { return daemonize_; } bool rtc_cache_read_enabled() { return rtc_cache_read_enabled_; } @@ -462,6 +486,23 @@ class PikaConf : public pstd::BaseConf { std::lock_guard l(rwlock_); thread_num_ = value; } + + void SetCommandBatchEnabled(const bool value) { + std::lock_guard l(rwlock_); + TryPushDiffCommands("command-batch-enabled", value ? "yes" : "no"); + command_batch_enabled_ = value; + } + + void SetCommandBatchSize(const int value) { + std::lock_guard l(rwlock_); + TryPushDiffCommands("batch-size", std::to_string(value)); + batch_size_ = value; + } + void SetCommandBatchMaxWaitTime(const int value) { + std::lock_guard l(rwlock_); + TryPushDiffCommands("batch-max-wait-time", std::to_string(value)); + batch_max_wait_time_ = value; + } void SetTimeout(const int value) { std::lock_guard l(rwlock_); TryPushDiffCommands("timeout", std::to_string(value)); @@ -665,6 +706,17 @@ class PikaConf : public pstd::BaseConf { TryPushDiffCommands("max-conn-rbuf-size", std::to_string(value)); max_conn_rbuf_size_.store(value); } + void SetConsensusBatchSize(const int value) { + std::lock_guard l(rwlock_); + TryPushDiffCommands("batch-size", std::to_string(value)); + batch_size_ = value; + } + // This method is used by config update system + void UpdateReplicationAckTimeout(const int value) { + std::lock_guard l(rwlock_); + TryPushDiffCommands("replication-ack-timeout", std::to_string(value)); + replication_ack_timeout_ = value; + } void SetMaxCacheFiles(const int& value) { std::lock_guard l(rwlock_); TryPushDiffCommands("max-cache-files", std::to_string(value)); @@ -929,6 +981,12 @@ class PikaConf : public pstd::BaseConf { std::string server_id_; std::string run_id_; std::string replication_id_; + + // 命令批处理相关配置 + bool command_batch_enabled_ = true; + int batch_size_ = 100; + int batch_max_wait_time_ = 5; + int replication_ack_timeout_ = 5000; std::string requirepass_; std::string masterauth_; std::string userpass_; @@ -1047,7 +1105,7 @@ class PikaConf : public 
pstd::BaseConf { int throttle_bytes_per_second_ = 200 << 20; // 200MB/s int max_rsync_parallel_num_ = kMaxRsyncParallelNum; std::atomic_int64_t rsync_timeout_ms_ = 1000; - + /* kUninitialized = 0, // unknown setting kDisable = 1, // disable perf stats diff --git a/include/pika_consensus.h b/include/pika_consensus.h index 78e20eb3ab..94258ef638 100644 --- a/include/pika_consensus.h +++ b/include/pika_consensus.h @@ -244,6 +244,10 @@ class ConsensusCoordinator { SyncProgress sync_pros_; std::shared_ptr stable_logger_; std::shared_ptr mem_logger_; + + // Make db_name accessible to external classes + public: + const std::string& db_name() const { return db_name_; } // pacificA public: @@ -258,6 +262,8 @@ class ConsensusCoordinator { pstd::Status CommitAppLog(const LogOffset& master_committed_id); pstd::Status UpdateCommittedID(); pstd::Status ApplyBinlog(const std::shared_ptr& cmd_ptr); + pstd::Status BatchApplyBinlogs(const std::vector& logs_to_apply); + void BatchInternalApplyFollower(const std::vector>& cmd_ptrs); pstd::Status ProcessCoordination(); LogOffset GetCommittedId() { @@ -273,9 +279,16 @@ class ConsensusCoordinator { prepared_id_ = offset; } void SetCommittedId(const LogOffset& offset) { - std::lock_guard l(committed_id_rwlock_); - committed_id_ = offset; - context_->UpdateAppliedIndex(committed_id_); + { + std::lock_guard l(committed_id_rwlock_); + if (committed_id_ >= offset) { + return; + } + committed_id_ = offset; + context_->UpdateAppliedIndex(committed_id_); + } + notification_counter_.fetch_add(1); + //LOG(INFO) << "Master SetCommittedId: Updated to " << offset.ToString(); } private: @@ -283,9 +296,10 @@ class ConsensusCoordinator { private: std::shared_mutex is_consistency_rwlock_; - bool is_consistency_ = false; + bool is_consistency_ = true; std::shared_mutex committed_id_rwlock_; LogOffset committed_id_ = LogOffset(); + std::atomic notification_counter_{0}; std::shared_mutex prepared_id__rwlock_; LogOffset prepared_id_ = LogOffset(); 
std::shared_ptr logs_; diff --git a/include/pika_define.h b/include/pika_define.h index c09d0d7c38..6568685a33 100644 --- a/include/pika_define.h +++ b/include/pika_define.h @@ -140,7 +140,9 @@ struct LogOffset { bool operator<=(const LogOffset& other) const { return b_offset <= other.b_offset; } bool operator>=(const LogOffset& other) const { return b_offset >= other.b_offset; } bool operator>(const LogOffset& other) const { return b_offset > other.b_offset; } + bool operator!=(const LogOffset& other) const { return b_offset != other.b_offset; } std::string ToString() const { return b_offset.ToString() + " " + l_offset.ToString(); } + bool IsValid() const { return b_offset.filenum > 0 || b_offset.offset > 0; } BinlogOffset b_offset; LogicOffset l_offset; }; @@ -178,10 +180,18 @@ const std::string BinlogSyncStateMsg[] = {"NotSync", "ReadFromCache", "ReadFromF struct BinlogChip { LogOffset offset_; std::string binlog_; - BinlogChip(const LogOffset& offset, std::string binlog) : offset_(offset), binlog_(std::move(binlog)) {} + bool is_batch_ = false; + + BinlogChip(const LogOffset& offset, std::string binlog) + : offset_(offset), binlog_(std::move(binlog)), is_batch_(false) {} + + BinlogChip(const LogOffset& offset, std::string binlog, bool is_batch) + : offset_(offset), binlog_(std::move(binlog)), is_batch_(is_batch) {} + BinlogChip(const BinlogChip& binlog_chip) { offset_ = binlog_chip.offset_; binlog_ = binlog_chip.binlog_; + is_batch_ = binlog_chip.is_batch_; } }; diff --git a/include/pika_kv.h b/include/pika_kv.h index 82939d29d9..8e5e5a6f32 100644 --- a/include/pika_kv.h +++ b/include/pika_kv.h @@ -24,6 +24,7 @@ class SetCmd : public Cmd { res.push_back(key_); return res; } + ~SetCmd() {} void Do() override; void DoUpdateCache() override; void DoThroughDB() override; diff --git a/include/pika_rm.h b/include/pika_rm.h index 709d5722cc..a5e7a13519 100644 --- a/include/pika_rm.h +++ b/include/pika_rm.h @@ -12,6 +12,10 @@ #include #include #include +#include 
+#include +#include +#include #include "pstd/include/pstd_status.h" @@ -22,13 +26,16 @@ #include "include/pika_slave_node.h" #include "include/pika_stable_log.h" #include "include/rsync_client.h" +#include "include/pika_command_collector.h" +#include "include/pika_command_queue.h" #define kBinlogSendPacketNum 40 #define kBinlogSendBatchNum 100 // unit seconds -#define kSendKeepAliveTimeout (2 * 1000000) -#define kRecvKeepAliveTimeout (20 * 1000000) +// WXR +#define kSendKeepAliveTimeout (100 * 1000000) +#define kRecvKeepAliveTimeout (200 * 1000000) class SyncDB { @@ -83,16 +90,22 @@ class SyncMasterDB : public SyncDB { return coordinator_.StableLogger()->Logger(); } + std::shared_ptr GetSlaveNode(const std::string& ip, int port); + // Make coordinator_ accessible to StableLog class + ConsensusCoordinator& GetCoordinator() { return coordinator_; } + std::shared_ptr GetCommandCollector(); + private: // invoker need to hold slave_mu_ pstd::Status ReadBinlogFileToWq(const std::shared_ptr& slave_ptr); - std::shared_ptr GetSlaveNode(const std::string& ip, int port); + //std::shared_ptr GetSlaveNode(const std::string& ip, int port); std::unordered_map> GetAllSlaveNodes(); pstd::Mutex session_mu_; int32_t session_id_ = 0; ConsensusCoordinator coordinator_; + std::shared_ptr command_collector_; //pacificA public: public: @@ -112,8 +125,7 @@ class SyncMasterDB : public SyncDB { pstd::Status UpdateCommittedID(); pstd::Status CommitAppLog(const LogOffset& master_committed_id); pstd::Status Truncate(const LogOffset& offset); - - + // pstd::Status WaitForSlaveAcks(const LogOffset& target_offset, int timeout_ms); }; class SyncSlaveDB : public SyncDB { @@ -233,6 +245,14 @@ class PikaReplicaManager { return pika_repl_client_->GetUnfinishedAsyncWriteDBTaskCount(db_name); } + // Command Queue related methods + void EnqueueCommandBatch(std::shared_ptr batch); + std::shared_ptr DequeueCommandBatch(); + size_t GetCommandQueueSize() const; + bool IsCommandQueueEmpty() const; + // 
CommittedID notification for RocksDB thread + void NotifyCommittedID(const LogOffset& committed_id); + private: void InitDB(); pstd::Status SelectLocalIp(const std::string& remote_ip, int remote_port, std::string* local_ip); @@ -243,10 +263,59 @@ class PikaReplicaManager { pstd::Mutex write_queue_mu_; + // db_name -> a queue of write task + using DBWriteTaskQueue = std::map>; + // ip:port -> a map of DBWriteTaskQueue + using SlaveWriteTaskQueue = std::map; + // every host owns a queue, the key is "ip + port" - std::unordered_map>> write_queues_; + SlaveWriteTaskQueue write_queues_; + + // client for replica std::unique_ptr pika_repl_client_; std::unique_ptr pika_repl_server_; + + // Condition variable for signaling when the write queue has new items + pstd::CondVar write_queue_cv_; + + std::shared_mutex is_consistency_rwlock_; + bool is_consistency_ = true; + std::shared_mutex committed_id_rwlock_; + + // Command queue for collected batches + std::unique_ptr command_queue_; + + // Background thread for processing command queue + std::unique_ptr command_queue_thread_; + std::atomic command_queue_running_{false}; + std::mutex command_queue_mutex_; + std::condition_variable command_queue_cv_; + + // RocksDB background thread for Put operations and client responses + std::unique_ptr rocksdb_back_thread_; + std::atomic rocksdb_thread_running_{false}; + std::mutex rocksdb_thread_mutex_; + std::condition_variable rocksdb_thread_cv_; + + // Pending batch groups waiting for CommittedID + std::queue> pending_batch_groups_; + std::mutex pending_batch_groups_mutex_; + + // Last committed ID for RocksDB thread processing + LogOffset last_committed_id_; + std::shared_mutex last_committed_id_mutex_; + + // Background thread processing methods + void StartCommandQueueThread(); + void StopCommandQueueThread(); + void CommandQueueLoop(); + void ProcessCommandBatches(const std::vector>& batches); + + // RocksDB background thread methods + void StartRocksDBThread(); + void 
StopRocksDBThread(); + void RocksDBThreadLoop(); + size_t ProcessCommittedBatchGroups(const LogOffset& committed_id); }; #endif // PIKA_RM_H diff --git a/include/pika_server.h b/include/pika_server.h index 41a8c9b346..f67e569645 100644 --- a/include/pika_server.h +++ b/include/pika_server.h @@ -581,7 +581,7 @@ class PikaServer : public pstd::noncopyable { std::string master_ip_; int master_port_ = 0; int repl_state_ = PIKA_REPL_NO_CONNECT; - bool is_consistency_ = false; + bool is_consistency_ = true; int role_ = PIKA_ROLE_SINGLE; int last_role_ = PIKA_ROLE_SINGLE; int last_meta_sync_timestamp_ = 0; diff --git a/src/net/src/bg_thread.cc b/src/net/src/bg_thread.cc index b0835330f9..a15e600494 100644 --- a/src/net/src/bg_thread.cc +++ b/src/net/src/bg_thread.cc @@ -6,6 +6,7 @@ #include "net/include/bg_thread.h" #include #include +#include namespace net { @@ -79,7 +80,7 @@ void* BGThread::ThreadMain() { while (!should_stop()) { std::unique_lock lock(mu_); - rsignal_.wait(lock, [this]() { return !queue_.empty() || !timer_queue_.empty() || should_stop(); }); + rsignal_.wait(lock, [this]() { return !queue_.empty() || should_stop(); }); if (should_stop()) { break; diff --git a/src/net/src/pb_conn.cc b/src/net/src/pb_conn.cc index 5185e8f51d..fae60f1ea7 100644 --- a/src/net/src/pb_conn.cc +++ b/src/net/src/pb_conn.cc @@ -42,7 +42,10 @@ ReadStatus PbConn::GetRequest() { case kHeader: { int quickack = 1; ssize_t nread = read(fd(), rbuf_ + cur_pos_, COMMAND_HEADER_LENGTH - cur_pos_); - setsockopt(fd(), IPPROTO_TCP, TCP_QUICKACK, &quickack, sizeof(quickack)); + #ifdef __linux__ + int quickack = 1; + setsockopt(fd(), IPPROTO_TCP, TCP_QUICKACK, &quickack, sizeof(quickack)); +#endif if (nread == -1) { if (errno == EAGAIN) { return kReadHalf; @@ -82,7 +85,10 @@ ReadStatus PbConn::GetRequest() { // read msg body ssize_t nread = read(fd(), rbuf_ + cur_pos_, remain_packet_len_); int quickack = 1; - setsockopt(fd(), IPPROTO_TCP, TCP_QUICKACK, &quickack, sizeof(quickack)); + #ifdef 
__linux__ + int quickack = 1; + setsockopt(fd(), IPPROTO_TCP, TCP_QUICKACK, &quickack, sizeof(quickack)); +#endif if (nread == -1) { if (errno == EAGAIN) { return kReadHalf; @@ -131,7 +137,7 @@ WriteStatus PbConn::SendReply() { while (item_len - write_buf_.item_pos_ > 0) { nwritten = write(fd(), item.data() + write_buf_.item_pos_, item_len - write_buf_.item_pos_); if (nwritten <= 0) { - LOG(ERROR) << "nwritten less than 0"; + //LOG(ERROR) << "nwritten less than 0"; break; } g_network_statistic->IncrReplOutputBytes(nwritten); @@ -153,7 +159,7 @@ WriteStatus PbConn::SendReply() { if (item_len - write_buf_.item_pos_ != 0) { return kWriteHalf; } - LOG(ERROR) << "write item success"; + //LOG(ERROR) << "write item success"; } return kWriteAll; } diff --git a/src/pika_auxiliary_thread.cc b/src/pika_auxiliary_thread.cc index e94104b442..ebc02a42b0 100644 --- a/src/pika_auxiliary_thread.cc +++ b/src/pika_auxiliary_thread.cc @@ -35,11 +35,13 @@ void* PikaAuxiliaryThread::ThreadMain() { g_pika_server->CheckLeaderProtectedMode(); // TODO(whoiami) timeout + // 将增量数据写入 Write_queue_ s = g_pika_server->TriggerSendBinlogSync(); if (!s.ok()) { LOG(WARNING) << s.ToString(); } // send to peer + // 将 Write_queue 中的数据发送给从节点 int res = g_pika_server->SendToPeer(); if (res == 0) { // sleep 100 ms diff --git a/src/pika_binlog.cc b/src/pika_binlog.cc index 187d63d8ad..76cb70b64f 100644 --- a/src/pika_binlog.cc +++ b/src/pika_binlog.cc @@ -298,7 +298,9 @@ Status Binlog::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n, int* if (s.ok()) { s = queue_->Append(pstd::Slice(ptr, n)); if (s.ok()) { - s = queue_->Flush(); + //LOG(INFO) << "EmitPhysicalRecord Flush"; + //s = queue_->Sync(); + //s = queue_->Flush(); } } block_offset_ += static_cast(kHeaderSize + n); @@ -469,3 +471,10 @@ Status Binlog::Truncate(uint32_t pro_num, uint64_t pro_offset, uint64_t index) { return Status::OK(); } + +Status Binlog::Sync() { + if (queue_) { + return queue_->Sync(); + } + return 
Status::Corruption("Logger not initialized"); +} \ No newline at end of file diff --git a/src/pika_client_conn.cc b/src/pika_client_conn.cc index a6cd5ec62f..153f3c1756 100644 --- a/src/pika_client_conn.cc +++ b/src/pika_client_conn.cc @@ -16,9 +16,11 @@ #include "include/pika_define.h" #include "include/pika_rm.h" #include "include/pika_server.h" +#include "include/pika_command_collector.h" #include "net/src/dispatch_thread.h" #include "net/src/worker_thread.h" #include "src/pstd/include/scope_record_lock.h" +#include #include "rocksdb/perf_context.h" #include "rocksdb/iostats_context.h" @@ -217,8 +219,47 @@ std::shared_ptr PikaClientConn::DoCmd(const PikaCmdArgsType& argv, const st // Perform some operations rocksdb::get_perf_context()->Reset(); - // Process Command - c_ptr->Execute(); + + // Process Command - route write commands through CommandCollector for batching + if (c_ptr->is_write() && g_pika_conf->command_batch_enabled()) { + // Get the appropriate SyncMasterDB for command batching + auto sync_db = g_pika_rm->GetSyncMasterDBByName(DBInfo(c_ptr->db_name())); + // 查看 DB 是不是 Master DB + if (sync_db) { + auto command_collector = sync_db->GetCommandCollector(); + if (command_collector) { + // Create callback to handle command completion + auto callback = [this, c_ptr](const LogOffset& offset, pstd::Status status) { + //LOG(INFO) << "Command completed"; + auto pc = dynamic_cast(c_ptr->GetConn().get()); + if (pc) { + auto resp_ptr = c_ptr->GetResp(); + if (resp_ptr) { + *resp_ptr = std::move(c_ptr->res().message()); + } + pc->resp_num--; + pc->TryWriteResp(); + } + }; + + // Add command to collector for batch processing + bool added = command_collector->AddCommand(c_ptr, callback); + if (!added) { + LOG(WARNING) << "Failed to add command " << c_ptr->name() << " to CommandCollector, executing directly"; + c_ptr->Execute(); + } + } else { + LOG(WARNING) << "CommandCollector not available, executing command directly"; + c_ptr->Execute(); + } + } else { + 
LOG(WARNING) << "SyncMasterDB not found for " << c_ptr->db_name() << ", executing command directly"; + c_ptr->Execute(); + } + } else { + // Non-write commands or batching disabled - execute directly + c_ptr->Execute(); + } time_stat_->process_done_ts_ = pstd::NowMicros(); auto cmdstat_map = g_pika_cmd_table_manager->GetCommandStatMap(); @@ -553,8 +594,13 @@ void PikaClientConn::ExecRedisCmd(const PikaCmdArgsType& argv, std::shared_ptr cmd_ptr = DoCmd(argv, opt, resp_ptr, cache_miss_in_rtc); - *resp_ptr = std::move(cmd_ptr->res().message()); - resp_num--; + // *resp_ptr = std::move(cmd_ptr->res().message()); + // resp_num--; + if (opt == kCmdNameSet) { + } else { + *resp_ptr = std::move(cmd_ptr->res().message()); + resp_num--; + } } std::queue> PikaClientConn::GetTxnCmdQue() { return txn_cmd_que_; } diff --git a/src/pika_command.cc b/src/pika_command.cc index fa27505844..d72f5c355b 100644 --- a/src/pika_command.cc +++ b/src/pika_command.cc @@ -952,35 +952,36 @@ bool Cmd::DoReadCommandInCache() { void Cmd::DoBinlog() { - if (res().ok() && is_write() && g_pika_conf->write_binlog()) { - std::shared_ptr conn_ptr = GetConn(); - std::shared_ptr resp_ptr = GetResp(); - // Consider that dummy cmd appended by system, both conn and resp are null. 
- if ((!conn_ptr || !resp_ptr) && (name_ != kCmdDummy)) { - if (!conn_ptr) { - LOG(WARNING) << sync_db_->SyncDBInfo().ToString() << " conn empty."; - } - if (!resp_ptr) { - LOG(WARNING) << sync_db_->SyncDBInfo().ToString() << " resp empty."; - } - res().SetRes(CmdRes::kErrOther); - return; - } - - Status s = sync_db_->ConsensusProposeLog(shared_from_this()); - if (!s.ok()) { - if(g_pika_server->IsConsistency()&&s.IsTimeout()){ - res().SetRes(CmdRes::kConsistencyTimeout, "Timeout waiting for consistency"); - LOG(WARNING) << sync_db_->SyncDBInfo().ToString() << " Slave node consistency timeout" - << s.ToString(); - }else{ - LOG(WARNING) << sync_db_->SyncDBInfo().ToString() << " Writing binlog failed, maybe no space left on device " - << s.ToString(); - res().SetRes(CmdRes::kErrOther, s.ToString()); - } - return; - } - } +// if (res().ok() && is_write() && g_pika_conf->write_binlog()) { +// std::shared_ptr conn_ptr = GetConn(); +// std::shared_ptr resp_ptr = GetResp(); +// // Consider that dummy cmd appended by system, both conn and resp are null. 
+// if ((!conn_ptr || !resp_ptr) && (name_ != kCmdDummy)) { +// if (!conn_ptr) { +// LOG(WARNING) << sync_db_->SyncDBInfo().ToString() << " conn empty."; +// } +// if (!resp_ptr) { +// LOG(WARNING) << sync_db_->SyncDBInfo().ToString() << " resp empty."; +// } +// res().SetRes(CmdRes::kErrOther); +// return; +// } + +// Status s = sync_db_->ConsensusProposeLog(shared_from_this()); +// if (!s.ok()) { +// if(g_pika_server->IsConsistency()&&s.IsTimeout()){ +// res().SetRes(CmdRes::kConsistencyTimeout, "Timeout waiting for consistency"); +// LOG(WARNING) << sync_db_->SyncDBInfo().ToString() << " Slave node consistency timeout" +// << s.ToString(); +// }else{ +// LOG(WARNING) << sync_db_->SyncDBInfo().ToString() << " Writing binlog failed, maybe no space left on device " +// << s.ToString(); +// res().SetRes(CmdRes::kErrOther, s.ToString()); +// } +// return; +// } +// } + return; } #define PIKA_STAGE_DURATION_OUTPUT(duration) \ diff --git a/src/pika_command_collector.cc b/src/pika_command_collector.cc new file mode 100644 index 0000000000..424b2b28b4 --- /dev/null +++ b/src/pika_command_collector.cc @@ -0,0 +1,64 @@ +// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. 
+ +#include +#include +#include +#include "include/pika_command_collector.h" +#include "include/pika_rm.h" +#include "include/pika_conf.h" + +extern std::unique_ptr g_pika_conf; +extern std::unique_ptr g_pika_rm; + +PikaCommandCollector::PikaCommandCollector(ConsensusCoordinator* coordinator, int batch_max_wait_time) + : coordinator_(coordinator), batch_max_wait_time_(batch_max_wait_time) { + LOG(INFO) << "PikaCommandCollector started for DB: " << coordinator_->db_name() + << " with batch_max_wait_time: " << batch_max_wait_time << "ms"; +} + +PikaCommandCollector::PikaCommandCollector(std::shared_ptr coordinator, int batch_max_wait_time) + : coordinator_(coordinator.get()), batch_max_wait_time_(batch_max_wait_time) { + LOG(INFO) << "PikaCommandCollector started for DB: " << coordinator_->db_name() + << " with batch_max_wait_time: " << batch_max_wait_time << "ms"; +} + +PikaCommandCollector::~PikaCommandCollector() { + LOG(INFO) << "PikaCommandCollector stopped, processed " << total_processed_.load() + << " commands, " << total_batches_.load() << " batches"; +} + +bool PikaCommandCollector::AddCommand(std::shared_ptr cmd_ptr, CommandCallback callback) { + if (!cmd_ptr || !cmd_ptr->is_write()) { + LOG(WARNING) << "Attempt to add non-write command to CommandCollector"; + return false; + } + + // Create a single-command batch directly + std::vector> commands = {cmd_ptr}; + std::vector callbacks = {std::move(callback)}; + + std::string db_name = cmd_ptr->db_name().empty() ? 
g_pika_conf->default_db() : cmd_ptr->db_name(); + auto command_batch = std::make_shared(commands, callbacks, db_name); + + // Enqueue the batch directly to PikaReplicaManager + g_pika_rm->EnqueueCommandBatch(command_batch); + + // Update statistics + total_processed_.fetch_add(1); + total_batches_.fetch_add(1); + + //LOG(INFO) << "Added single command " << cmd_ptr->name() << " to CommandQueue"; + return true; +} + +void PikaCommandCollector::SetBatchMaxWaitTime(int batch_max_wait_time) { + batch_max_wait_time_.store(batch_max_wait_time); + LOG(INFO) << "BatchMaxWaitTime set to " << batch_max_wait_time << "ms"; +} + +std::pair PikaCommandCollector::GetBatchStats() const { + return {total_processed_.load(), total_batches_.load()}; +} \ No newline at end of file diff --git a/src/pika_command_queue.cc b/src/pika_command_queue.cc new file mode 100644 index 0000000000..e0c6694e31 --- /dev/null +++ b/src/pika_command_queue.cc @@ -0,0 +1,107 @@ +// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. 
+ +#include "include/pika_command_queue.h" +#include + +CommandQueue::CommandQueue(size_t max_size) : max_size_(max_size) { + LOG(INFO) << "CommandQueue created with max_size: " << max_size_; +} + +CommandQueue::~CommandQueue() { + Shutdown(); + LOG(INFO) << "CommandQueue destroyed"; +} + +bool CommandQueue::EnqueueBatch(std::shared_ptr batch) { + if (!batch || batch->Empty()) { + LOG(WARNING) << "Attempt to enqueue empty or null batch"; + return false; + } + + std::lock_guard lock(queue_mutex_); + + if (shutdown_.load()) { + LOG(WARNING) << "Cannot enqueue batch: queue is shutdown"; + return false; + } + + if (cmd_queue_.size() >= max_size_) { + LOG(WARNING) << "Command queue is full (size: " << cmd_queue_.size() + << ", max: " << max_size_ << "), dropping batch"; + return false; + } + + cmd_queue_.push(batch); + + //LOG(INFO) << "Enqueued command batch with " << batch->Size() + //<< " commands, queue size: " << cmd_queue_.size(); + + queue_cv_.notify_one(); + return true; +} + +std::shared_ptr CommandQueue::DequeueBatch() { + std::unique_lock lock(queue_mutex_); + + while (cmd_queue_.empty() && !shutdown_.load()) { + queue_cv_.wait(lock); + } + + if (shutdown_.load() && cmd_queue_.empty()) { + return nullptr; + } + + auto batch = cmd_queue_.front(); + cmd_queue_.pop(); + + //LOG(INFO) << "Dequeued command batch with " << batch->Size() + //<< " commands, remaining queue size: " << cmd_queue_.size(); + + return batch; +} + +std::vector> CommandQueue::DequeueAllBatches() { + std::vector> batches; + std::lock_guard lock(queue_mutex_); + + if (shutdown_.load()) { + return batches; + } + + // Take all available batches + while (!cmd_queue_.empty()) { + batches.push_back(cmd_queue_.front()); + cmd_queue_.pop(); + } + + if (!batches.empty()) { + size_t total_commands = 0; + for (const auto& batch : batches) { + total_commands += batch->Size(); + } + // LOG(INFO) << "Dequeued all batches: " << batches.size() + // << " batches with " << total_commands << " total commands"; 
+ } + + return batches; +} + +size_t CommandQueue::Size() const { + std::lock_guard lock(queue_mutex_); + return cmd_queue_.size(); +} + +bool CommandQueue::Empty() const { + std::lock_guard lock(queue_mutex_); + return cmd_queue_.empty(); +} + +void CommandQueue::Shutdown() { + std::lock_guard lock(queue_mutex_); + shutdown_.store(true); + queue_cv_.notify_all(); + LOG(INFO) << "CommandQueue shutdown, remaining batches: " << cmd_queue_.size(); +} diff --git a/src/pika_conf.cc b/src/pika_conf.cc index 94071eac7f..a4a7178cb7 100644 --- a/src/pika_conf.cc +++ b/src/pika_conf.cc @@ -412,6 +412,20 @@ int PikaConf::Load() { max_cache_statistic_keys_ = 0; } + // 命令批处理相关配置 + std::string command_batch_enabled; + GetConfStr("command-batch-enabled", &command_batch_enabled); + command_batch_enabled_ = (command_batch_enabled == "yes"); + + GetConfInt("batch-size", &batch_size_); + if (batch_size_ <= 0) { + batch_size_ = 100; + } + + GetConfInt("batch_max_wait_time", &batch_max_wait_time_); + if (batch_max_wait_time_ <= 0) { + batch_max_wait_time_ = 5; + } // disable_auto_compactions GetConfBool("disable_auto_compactions", &disable_auto_compactions_); @@ -707,9 +721,13 @@ int PikaConf::Load() { rsync_timeout_ms_.store(tmp_rsync_timeout_ms); } - return ret; -} - + GetConfInt("replication-ack-timeout", &replication_ack_timeout_); + if (replication_ack_timeout_ <= 0) { + replication_ack_timeout_ = 5000; + } + return ret; + } + void PikaConf::TryPushDiffCommands(const std::string& command, const std::string& value) { if (!CheckConfExist(command)) { diff_commands_[command] = value; @@ -770,6 +788,9 @@ int PikaConf::ConfigRewrite() { SetConfStr("run-id", run_id_); SetConfStr("replication-id", replication_id_); SetConfInt("max-cache-statistic-keys", max_cache_statistic_keys_); + SetConfStr("command-batch-enabled", command_batch_enabled_ ? 
"yes" : "no"); + SetConfInt("batch-size", batch_size_); + SetConfInt("batch-max-wait-time", batch_max_wait_time_); SetConfInt("small-compaction-threshold", small_compaction_threshold_); SetConfInt("small-compaction-duration-threshold", small_compaction_duration_threshold_); SetConfInt("max-client-response-size", static_cast(max_client_response_size_)); @@ -790,6 +811,7 @@ int PikaConf::ConfigRewrite() { SetConfInt("replication-num", replication_num_.load()); SetConfStr("slow-cmd-list", pstd::Set2String(slow_cmd_set_, ',')); SetConfInt("max-conn-rbuf-size", max_conn_rbuf_size_.load()); + SetConfInt("replication-ack-timeout", replication_ack_timeout_); // options for storage engine SetConfInt("max-cache-files", max_cache_files_); SetConfInt("max-background-compactions", max_background_compactions_); diff --git a/src/pika_consensus.cc b/src/pika_consensus.cc index ef1960d589..6e9d587f5c 100644 --- a/src/pika_consensus.cc +++ b/src/pika_consensus.cc @@ -12,6 +12,7 @@ #include "include/pika_conf.h" #include "include/pika_rm.h" #include "include/pika_server.h" +#include "include/pika_repl_bgworker.h" using pstd::Status; @@ -142,14 +143,19 @@ Status SyncProgress::Update(const std::string& ip, int port, const LogOffset& st LogOffset acked_offset; { // update slave_ptr + LOG(INFO) << "UPdate"; std::lock_guard l(slave_ptr->slave_mu); Status s = slave_ptr->Update(start, end, &acked_offset); + slave_ptr->acked_offset = acked_offset; + if (!s.ok()) { return s; } // update match_index_ - // shared slave_ptr->slave_mu + // shared slave_ptr->slave_mu= match_index_[ip + std::to_string(port)] = acked_offset; + LOG(INFO) << "slave ip: " << ip << ", port :" << port << ",slave acked_offset " + << slave_ptr->acked_offset.ToString(); } return Status::OK(); @@ -374,6 +380,7 @@ Status ConsensusCoordinator::ProcessLeaderLog(const std::shared_ptr& cmd_pt Status ConsensusCoordinator::UpdateSlave(const std::string& ip, int port, const LogOffset& start, const LogOffset& end) { if 
(is_consistency_) { + //LOG(INFO) << "is_consistency"; std::shared_ptr slave_ptr = sync_pros_.GetSlaveNode(ip, port); if (!slave_ptr) { return Status::NotFound("ip " + ip + " port " + std::to_string(port)); @@ -382,8 +389,8 @@ Status ConsensusCoordinator::UpdateSlave(const std::string& ip, int port, const std::lock_guard l(slave_ptr->slave_mu); slave_ptr->acked_offset = end; sync_pros_.AddMatchIndex(ip, port, slave_ptr->acked_offset); - LOG(INFO) << "PacificA slave ip: " << ip << ", port :" << port << ",slave acked_offset " - << slave_ptr->acked_offset.ToString(); + //LOG(INFO) << "PacificA slave ip: " << ip << ", port :" << port << ",slave acked_offset " + //<< slave_ptr->acked_offset.ToString(); if (slave_ptr->slave_state != kSlaveBinlogSync && slave_ptr->acked_offset >= slave_ptr->target_offset) { slave_ptr->slave_state = kSlaveBinlogSync; LOG(INFO) << "PacificA change slave_state kSlaveBinlogSync acked_offset: " << slave_ptr->acked_offset.ToString() @@ -391,6 +398,7 @@ Status ConsensusCoordinator::UpdateSlave(const std::string& ip, int port, const } } } else { + LOG(INFO) << "no_consistency"; LogOffset committed_index; Status s = sync_pros_.Update(ip, port, start, end, &committed_index); if (!s.ok()) { @@ -822,12 +830,13 @@ bool ConsensusCoordinator::checkFinished(const LogOffset& offset) { //// pacificA private: +// 持久化 Binlog Status ConsensusCoordinator::PersistAppendBinlog(const std::shared_ptr& cmd_ptr, LogOffset& cur_offset) { std::string content = cmd_ptr->ToRedisProtocol(); std::string binlog = std::string(); LogOffset offset = LogOffset(); Status s = stable_logger_->Logger()->Put(content, &offset, binlog); - LOG(INFO) << "PacificA binlog_offset :" << offset.ToString(); + //LOG(INFO) << "PacificA binlog_offset :" << offset.ToString(); cur_offset = offset; if (!s.ok()) { std::string db_name = cmd_ptr->db_name().empty() ? 
g_pika_conf->default_db() : cmd_ptr->db_name(); @@ -841,14 +850,23 @@ Status ConsensusCoordinator::PersistAppendBinlog(const std::shared_ptr& cmd // If successful, append the log entry to the logs // TODO: 这里logs_的appendlog操作和上边的stable_logger_->Logger()->Put不是原子的,可能导致offset大的先被追加到logs_中, // 多线程写入的时候窗口会对不上,最终主从断开连接。需要加逻辑保证原子性 - logs_->AppendLog(Log::LogItem(cur_offset, cmd_ptr, binlog)); - - SetPreparedId(cur_offset); + //LOG(INFO) << "PersistAppendBinlog: About to append to logs_, current size=" << logs_->Size() + // << ", offset=" << cur_offset.ToString() << ", cmd=" << cmd_ptr->name(); + // logs_->AppendLog(Log::LogItem(cur_offset, cmd_ptr, binlog)); + //LOG(INFO) << "PersistAppendBinlog: Successfully appended to logs_, new size=" << logs_->Size(); + { + std::lock_guard l(order_mu_); + // Append to logs_ under order lock to maintain ordering + logs_->AppendLog(Log::LogItem(cur_offset, cmd_ptr, binlog)); + SetPreparedId(cur_offset); + } return stable_logger_->Logger()->IsOpened(); } +// 主节点持久化日志 Status ConsensusCoordinator::AppendEntries(const std::shared_ptr& cmd_ptr, LogOffset& cur_logoffset) { + //LOG(INFO) << "AppendEntries"; std::vector keys = cmd_ptr->current_key(); // slotkey shouldn't add binlog if (cmd_ptr->name() == kCmdNameSAdd && !keys.empty() && @@ -863,14 +881,17 @@ Status ConsensusCoordinator::AppendEntries(const std::shared_ptr& cmd_ptr, return s; } - g_pika_server->SignalAuxiliary(); + // g_pika_server->SignalAuxiliary(); return Status::OK(); } + +// 从节点持久化 LOG Status ConsensusCoordinator::AppendSlaveEntries(const std::shared_ptr& cmd_ptr, const BinlogItem& attribute) { + //LOG(INFO) << "AppendSlaveEntries"; LogOffset last_index = logs_->LastOffset(); if (attribute.logic_id() < last_index.l_offset.index) { - LOG(WARNING) << DBInfo(db_name_).ToString() << "Drop log from leader logic_id " << attribute.logic_id() - << " cur last index " << last_index.l_offset.index; + //LOG(WARNING) << DBInfo(db_name_).ToString() << "Drop log from leader logic_id " 
<< attribute.logic_id() + //<< " cur last index " << last_index.l_offset.index; return Status::OK(); } LogOffset offset = LogOffset(); @@ -884,18 +905,21 @@ Status ConsensusCoordinator::AppendSlaveEntries(const std::shared_ptr& cmd_ /** * @brief Commit logs up to the given offset and update the committed ID. */ + +//从节点更新自己 CommittedID Status ConsensusCoordinator::CommitAppLog(const LogOffset& master_committed_id) { int index = logs_->FindOffset(logs_->FirstOffset()); int log_size = logs_->Size(); // Cache log size + //LOG(INFO) << "CommitAppLog, logs size: " << log_size; for (int i = index; i < log_size; ++i) { Log::LogItem log = logs_->At(i); if (master_committed_id >= log.offset) { - LOG(INFO) << "PacificA master_committed_id: " << master_committed_id.ToString() - << ", ApplyLog: " << log.offset.ToString(); + //LOG(INFO) << "PacificA master_committed_id: " << master_committed_id.ToString() + //<< ", ApplyLog: " << log.offset.ToString(); ApplyBinlog(log.cmd_ptr); } } - + // 日志截断 logs_->TruncateFrom(master_committed_id); // Truncate logs SetCommittedId(master_committed_id); // Update committed ID return Status::OK(); @@ -904,28 +928,51 @@ Status ConsensusCoordinator::CommitAppLog(const LogOffset& master_committed_id) /** * @brief Update the committed ID based on the Prepared ID of the slave */ +// 更新 CommittedID Status ConsensusCoordinator::UpdateCommittedID() { std::unordered_map> slaves = sync_pros_.GetAllSlaveNodes(); LogOffset slave_prepared_id = LogOffset(); for (const auto& slave : slaves) { if (slave.second->slave_state == kSlaveBinlogSync) { - if (slave_prepared_id == LogOffset()) { + slave_prepared_id = slave.second->acked_offset; + //LOG(INFO) << "slave_prepared_id: " << slave_prepared_id.ToString(); + /*if (slave_prepared_id == LogOffset()) { slave_prepared_id = slave.second->acked_offset; } else if (slave.second->acked_offset < slave_prepared_id) { slave_prepared_id = slave.second->acked_offset; - } + }*/ } } + // if (!has_active_slaves) { + // 
LogOffset master_prepared_id = GetPreparedId(); + // if (master_prepared_id.IsValid() && master_prepared_id >= GetCommittedId()) { + // SetCommittedId(master_prepared_id); + // LOG(INFO) << "PacificA update CommittedID (no active slaves): " << GetCommittedId().ToString() + // << ", Total slaves: " << total_slaves + // << ", kSlaveBinlogSync: " << binlog_sync_slaves + // << ", Other states: " << other_state_slaves; + // } else { + // LOG(INFO) << "PacificA update CommittedID: No active slaves, keeping current CommittedID: " << GetCommittedId().ToString() + // << ", Total slaves: " << total_slaves + // << ", kSlaveBinlogSync: " << binlog_sync_slaves + // << ", Other states: " << other_state_slaves; + // } + // // g_pika_rm->NotifyCommittedID(GetCommittedId()); + // return Status::OK(); + // } if (slave_prepared_id < GetCommittedId()) { LOG(WARNING) << "Error: slave_prepared_id (" << slave_prepared_id.ToString() << ") < master_committedId (" << GetCommittedId().ToString() << ")"; return Status::Error("slave_prepared_id < master_committedId"); } + // Master 节点更新 Committed ID SetCommittedId(slave_prepared_id); - LOG(INFO) << "PacificA update CommittedID: " << GetCommittedId().ToString(); + //LOG(INFO) << "PacificA update CommittedID: " << GetCommittedId().ToString(); return Status::OK(); } + +// 从节点应用日志 Status ConsensusCoordinator::ProcessCoordination() { LogOffset offset = LogOffset(); Status s = stable_logger_->Logger()->GetProducerStatus(&(offset.b_offset.filenum), &(offset.b_offset.offset), @@ -938,6 +985,7 @@ Status ConsensusCoordinator::ProcessCoordination() { } SetPreparedId(offset); if (g_pika_server->role() & PIKA_ROLE_MASTER && g_pika_server->last_role() & PIKA_ROLE_SLAVE) { + LOG(INFO) << "CommitAppLOG"; Status s = CommitAppLog(GetPreparedId()); if (!s.ok()) { return s; @@ -948,13 +996,16 @@ Status ConsensusCoordinator::ProcessCoordination() { // Execute the operation of writing to DB Status ConsensusCoordinator::ApplyBinlog(const std::shared_ptr& cmd_ptr) { 
auto opt = cmd_ptr->argv()[0]; + //LOG(INFO) << "[ApplyBinlog] Received command: " << opt << " for db: " << db_name_; if (pstd::StringToLower(opt) != kCmdNameFlushdb) { + //LOG(INFO) << "[ApplyBinlog] Scheduling async task for " << opt; InternalApplyFollower(cmd_ptr); } else { int32_t wait_ms = 250; while (g_pika_rm->GetUnfinishedAsyncWriteDBTaskCount(db_name_) > 0) { // TODO: 暂时去掉了sleep的逻辑,考虑使用条件变量唤醒 //std::this_thread::sleep_for(std::chrono::milliseconds(wait_ms)); + // WXR wait_ms *= 2; wait_ms = wait_ms < 3000 ? wait_ms : 3000; } @@ -964,6 +1015,25 @@ Status ConsensusCoordinator::ApplyBinlog(const std::shared_ptr& cmd_ptr) { return Status::OK(); } +// Batch apply commands to slave database +Status ConsensusCoordinator::BatchApplyBinlogs(const std::vector& logs_to_apply) { + if (logs_to_apply.empty()) { + return Status::OK(); + } + for (const auto& log_item : logs_to_apply) { + PikaReplBgWorker::WriteDBInSyncWay(log_item.cmd_ptr); + } + + return Status::OK(); +} + +void ConsensusCoordinator::BatchInternalApplyFollower(const std::vector>& cmd_ptrs) { + for (const auto& cmd_ptr : cmd_ptrs) { + // g_pika_rm->ScheduleWriteDBTask(cmd_ptr, db_name_); + PikaReplBgWorker::WriteDBInSyncWay(cmd_ptr); + } +} + Status ConsensusCoordinator::SendBinlog(std::shared_ptr slave_ptr, std::string db_name) { std::vector tasks; @@ -987,6 +1057,7 @@ Status ConsensusCoordinator::SendBinlog(std::shared_ptr slave_ptr, st } if (!tasks.empty()) { + //LOG(INFO) << "task.size: " << tasks.size(); g_pika_rm->ProduceWriteQueue(slave_ptr->Ip(), slave_ptr->Port(), db_name, tasks); } return Status::OK(); diff --git a/src/pika_kv.cc b/src/pika_kv.cc index 1c1abdd4cf..79dc6cd291 100644 --- a/src/pika_kv.cc +++ b/src/pika_kv.cc @@ -15,6 +15,20 @@ extern std::unique_ptr g_pika_conf; /* SET key value [NX] [XX] [EX ] [PX ] */ +// SetCmd::~SetCmd() { +// auto tmp_conn = GetConn(); +// if (!tmp_conn) { +// return; +// } + +// auto pc = dynamic_cast(tmp_conn.get()); +// std::shared_ptr resp_ptr = 
std::make_shared(); +// *resp_ptr = std::move(res().message()); +// pc->resp_num--; +// pc->resp_array.push_back(resp_ptr); +// pc->TryWriteResp(); +// LOG(INFO) << "SetCmd::~SetCmd() is completed"; +// } void SetCmd::DoInitial() { if (!CheckArg(argv_.size())) { res_.SetRes(CmdRes::kWrongNum, kCmdNameSet); diff --git a/src/pika_repl_bgworker.cc b/src/pika_repl_bgworker.cc index 5340533160..0d3c7cbf7b 100644 --- a/src/pika_repl_bgworker.cc +++ b/src/pika_repl_bgworker.cc @@ -46,6 +46,7 @@ void PikaReplBgWorker::ParseBinlogOffset(const InnerMessage::BinlogOffset& pb_of } void PikaReplBgWorker::HandleBGWorkerWriteBinlog(void* arg) { + //LOG(INFO) << "Handel BGWorkerWriteBinlog 1"; auto task_arg = static_cast(arg); const std::shared_ptr res = task_arg->res; std::shared_ptr conn = task_arg->conn; @@ -133,14 +134,6 @@ void PikaReplBgWorker::HandleBGWorkerWriteBinlog(void* arg) { slave_db->SetReplState(ReplState::kTryConnect); return; } - if(db->GetISConsistency()){ - const InnerMessage::BinlogOffset& committed_id = binlog_res.committed_id(); - LogOffset master_committed_id(BinlogOffset(committed_id.filenum(),committed_id.offset()),LogicOffset(committed_id.term(),committed_id.index())); - Status s= db->CommitAppLog(master_committed_id); - if(!s.ok()){ - return; - } - } // empty binlog treated as keepalive packet if (binlog_res.binlog().empty()) { continue; @@ -165,6 +158,15 @@ void PikaReplBgWorker::HandleBGWorkerWriteBinlog(void* arg) { LOG(WARNING) << "DB " << worker->db_name_ << " Not Found"; return; } + if(db->GetISConsistency()){ + const InnerMessage::BinlogOffset& committed_id = binlog_res.committed_id(); + LogOffset master_committed_id(BinlogOffset(committed_id.filenum(),committed_id.offset()),LogicOffset(committed_id.term(),committed_id.index())); + Status s= db->CommitAppLog(master_committed_id); + if(!s.ok()){ + return; + } + } + } if (only_keepalive) { @@ -178,11 +180,12 @@ void PikaReplBgWorker::HandleBGWorkerWriteBinlog(void* arg) { ack_end = 
productor_status; ack_end.l_offset.term = pb_end.l_offset.term; } - + db->GetCoordinator()->StableLogger()->Logger()->GetQueue()->Flush(); g_pika_rm->SendBinlogSyncAckRequest(db_name, ack_start, ack_end); } int PikaReplBgWorker::HandleWriteBinlog(net::RedisParser* parser, const net::RedisCmdArgsType& argv) { + //LOG(INFO) << "Start Handel WriteBinlog 2"; std::string opt = argv[0]; auto worker = static_cast(parser->data); // Monitor related @@ -218,6 +221,7 @@ int PikaReplBgWorker::HandleWriteBinlog(net::RedisParser* parser, const net::Red return -1; } if(db->GetISConsistency()){ + // 如果是一致性就调用这个函数 db->AppendSlaveEntries(c_ptr, worker->binlog_item_); }else{ db->ConsensusProcessLeaderLog(c_ptr, worker->binlog_item_); diff --git a/src/pika_repl_client_conn.cc b/src/pika_repl_client_conn.cc index 498cd1d274..0891660d11 100644 --- a/src/pika_repl_client_conn.cc +++ b/src/pika_repl_client_conn.cc @@ -223,13 +223,13 @@ void PikaReplClientConn::HandleTrySyncResponse(void* arg) { db->Logger()->GetProducerStatus(&boffset.filenum, &boffset.offset); slave_db->SetMasterSessionId(session_id); LogOffset offset(boffset, logic_last_offset); - LOG(INFO)<<"PacificA slave first binlog stable offset : "<< offset.ToString(); + //LOG(INFO)<<"PacificA slave first binlog stable offset : "<< offset.ToString(); if(db->GetISConsistency()){ if (try_sync_response.has_prepared_id()){ const InnerMessage::BinlogOffset& prepared_id = try_sync_response.prepared_id(); LogOffset master_prepared_id(BinlogOffset(prepared_id.filenum(),prepared_id.offset()),LogicOffset(prepared_id.term(),prepared_id.index())); - LOG(INFO)<<"PacificA master TrySync Response master_prepared_id: "<GetPreparedId().ToString(); + //LOG(INFO)<<"PacificA master TrySync Response master_prepared_id: "<GetPreparedId().ToString(); if(master_prepared_id < db->GetPreparedId()){ if(master_prepared_id < db->GetCommittedId()){ @@ -250,7 +250,7 @@ void PikaReplClientConn::HandleTrySyncResponse(void* arg) { 
slave_db->SetReplState(ReplState::kConnected); // after connected, update receive time first to avoid connection timeout slave_db->SetLastRecvTime(pstd::NowMicros()); - LOG(INFO) << "DB: " << db_name << " TrySync Ok"; + //LOG(INFO) << "DB: " << db_name << " TrySync Ok"; } else if (try_sync_response.reply_code() == InnerMessage::InnerResponse::TrySync::kSyncPointBePurged) { slave_db->SetReplState(ReplState::kTryDBSync); LOG(INFO) << "DB: " << db_name << " Need To Try DBSync"; diff --git a/src/pika_repl_server.cc b/src/pika_repl_server.cc index c8f1c9f9dc..2617a59aab 100644 --- a/src/pika_repl_server.cc +++ b/src/pika_repl_server.cc @@ -17,7 +17,7 @@ extern PikaServer* g_pika_server; extern std::unique_ptr g_pika_rm; PikaReplServer::PikaReplServer(const std::set& ips, int port, int cron_interval) { - server_tp_ = std::make_unique(PIKA_REPL_SERVER_TP_SIZE, 100000, "PikaReplServer"); + server_tp_ = std::make_unique(1, 100000, "PikaReplServer"); pika_repl_server_thread_ = std::make_unique(ips, port, cron_interval); pika_repl_server_thread_->set_thread_name("PikaReplServer"); } diff --git a/src/pika_repl_server_conn.cc b/src/pika_repl_server_conn.cc index 091c85a0de..911ab7dbc4 100644 --- a/src/pika_repl_server_conn.cc +++ b/src/pika_repl_server_conn.cc @@ -355,6 +355,7 @@ void PikaReplServerConn::HandleDBSyncRequest(void* arg) { conn->NotifyWrite(); } +// 主节点处理从节点的 Binlog 回应 void PikaReplServerConn::HandleBinlogSyncRequest(void* arg) { std::unique_ptr task_arg(static_cast(arg)); const std::shared_ptr req = task_arg->req; @@ -401,6 +402,7 @@ void PikaReplServerConn::HandleBinlogSyncRequest(void* arg) { } if (is_first_send) { + LOG(INFO) << "first send"; if (range_start.b_offset != range_end.b_offset) { LOG(WARNING) << "first binlogsync request pb argument invalid"; conn->NotifyClose(); diff --git a/src/pika_rm.cc b/src/pika_rm.cc index 9c777339ab..dd189e9d24 100644 --- a/src/pika_rm.cc +++ b/src/pika_rm.cc @@ -12,6 +12,7 @@ #include #include +#include #include 
"net/include/net_cli.h" @@ -25,6 +26,7 @@ using pstd::Status; extern std::unique_ptr g_pika_rm; extern PikaServer* g_pika_server; +extern std::unique_ptr g_pika_conf; /* SyncDB */ @@ -38,7 +40,9 @@ std::string SyncDB::DBName() { /* SyncMasterDB*/ SyncMasterDB::SyncMasterDB(const std::string& db_name) - : SyncDB(db_name), coordinator_(db_name) {} + : SyncDB(db_name), coordinator_(db_name) { + command_collector_ = std::make_shared(&coordinator_, 5); +} int SyncMasterDB::GetNumberOfSlaveNode() { return coordinator_.SyncPros().SlaveSize(); } @@ -191,6 +195,7 @@ Status SyncMasterDB::ReadBinlogFileToWq(const std::shared_ptr& slave_ return Status::OK(); } +// Master节点更查看 Slave 节点偏移量 Status SyncMasterDB::ConsensusUpdateSlave(const std::string& ip, int port, const LogOffset& start, const LogOffset& end) { Status s = coordinator_.UpdateSlave(ip, port, start, end); if (!s.ok()) { @@ -234,22 +239,27 @@ Status SyncMasterDB::WakeUpSlaveBinlogSync() { for (auto& slave_iter : slaves) { std::shared_ptr slave_ptr = slave_iter.second; std::lock_guard l(slave_ptr->slave_mu); + // slave 节点这次要发送的 sent_offset 一定要等于上次回包的 acked_offset,否则就不用发送了 if (slave_ptr->sent_offset == slave_ptr->acked_offset) { Status s; if (coordinator_.GetISConsistency()) { if(slave_ptr->slave_state == SlaveState::kSlaveBinlogSync||slave_ptr->slave_state == SlaveState::KCandidate){ + // 强一致性给 slave 发送 binlog s = coordinator_.SendBinlog(slave_ptr, db_info_.db_name_); } } else { + // 非强一致性 s = ReadBinlogFileToWq(slave_ptr); } if (!s.ok()) { to_del.push_back(slave_ptr); - LOG(WARNING) << "WakeUpSlaveBinlogSync failed, marking for deletion: " - << slave_ptr->ToStringStatus() << " - " << s.ToString(); - } + // LOG(WARNING) << "WakeUpSlaveBinlogSync failed, marking for deletion: " + // << slave_ptr->ToStringStatus() << " - " << s.ToString(); } - } + } + + + } for (const auto& to_del_slave : to_del) { RemoveSlaveNode(to_del_slave->Ip(), to_del_slave->Port()); @@ -325,6 +335,7 @@ bool SyncMasterDB::BinlogCloudPurge(uint32_t 
index) { return true; } +//WXR Status SyncMasterDB::CheckSyncTimeout(uint64_t now) { std::unordered_map> slaves = GetAllSlaveNodes(); @@ -424,18 +435,40 @@ LogOffset SyncMasterDB::GetCommittedId(){ Status SyncMasterDB::AppendSlaveEntries(const std::shared_ptr& cmd_ptr, const BinlogItem& attribute) { return coordinator_.AppendSlaveEntries(cmd_ptr, attribute); } + Status SyncMasterDB::ProcessCoordination(){ return coordinator_.ProcessCoordination(); } + +// Master 节点更新自己的 CommittedID Status SyncMasterDB::UpdateCommittedID(){ - return coordinator_.UpdateCommittedID(); + // Master 更新自己的 CommittedID + //LOG(INFO) << "UpdateCommittedID"; + Status s = coordinator_.UpdateCommittedID(); + if (s.ok()) { + // Notify RocksDB thread of new CommittedID + LogOffset committed_id = coordinator_.GetCommittedId(); + if (committed_id.IsValid()) { + extern std::unique_ptr g_pika_rm; + if (g_pika_rm) { + // 唤醒 RocksDB 线程执行 + g_pika_rm->NotifyCommittedID(committed_id); + } + } + } else { + LOG(WARNING) << "UpdateCommittedID failed: " << s.ToString(); + } + return s; } + Status SyncMasterDB::Truncate(const LogOffset& offset){ return coordinator_.Truncate(offset); } Status SyncMasterDB::CommitAppLog(const LogOffset& master_committed_id){ return coordinator_.CommitAppLog(master_committed_id); } + + Status SyncMasterDB::AppendCandidateBinlog(const std::string& ip, int port, const LogOffset& offset) { std::shared_ptr slave_ptr = GetSlaveNode(ip, port); if (!slave_ptr) { @@ -459,25 +492,92 @@ Status SyncMasterDB::AppendCandidateBinlog(const std::string& ip, int port, cons if (!s.ok()) { return Status::Corruption("Init binlog file reader failed" + s.ToString()); // 如果初始化失败,返回错误状态 } + // 删除写队列中的数据 g_pika_rm->DropItemInOneWriteQueue(ip, port, slave_ptr->DBName()); slave_ptr->b_state = kReadFromFile; } - Status s = coordinator_.SendBinlog(slave_ptr, slave_ptr->DBName()); + /*Status s = coordinator_.SendBinlog(slave_ptr, slave_ptr->DBName()); if (!s.ok()) { return s; - } + }*/ return Status::OK(); 
} +/* +pstd::Status SyncMasterDB::WaitForSlaveAcks(const LogOffset& target_offset, int timeout_ms) { + // bug: 重发有问题 + // Get slave count + int slave_count = GetNumberOfSlaveNode(); + LOG(INFO) << "WaitForSlaveAcks: Waiting for ACKs from master and " << slave_count << " slave(s) for target " << target_offset.ToString(); + + // If no slaves, return success immediately + if (slave_count == 0) { + LOG(INFO) << "WaitForSlaveAcks: No slaves connected, returning success immediately"; + // Update committed_id + coordinator_.SetCommittedId(target_offset); + return Status::OK(); + } + + g_pika_rm->WakeUpBinlogSync(); + + // Use efficient polling mechanism to avoid creating extra threads + auto start_time = std::chrono::steady_clock::now(); + auto timeout_duration = std::chrono::milliseconds(timeout_ms); + const int POLL_INTERVAL_MS = 10; // 10ms polling interval + + while (true) { + // Check if timeout + auto elapsed = std::chrono::steady_clock::now() - start_time; + if (elapsed >= timeout_duration) { + LOG(WARNING) << "WaitForSlaveAcks: after " << timeout_ms << "ms waiting for target " << target_offset.ToString(); + return Status::Timeout("Strong consistency replication timed out"); + } + + // Check acknowledgment status of each slave node + std::unordered_map> slaves = GetAllSlaveNodes(); + int ack_count = 1; // Master node is already acknowledged + + for (const auto& slave_pair : slaves) { + std::shared_ptr slave = slave_pair.second; + if (slave) { + slave->Lock(); + LogOffset acked_offset = slave->acked_offset; + slave->Unlock(); + + if (acked_offset >= target_offset) { + ack_count++; + } + } + } + + // Check if all nodes have acknowledged (1 master + N slaves) + int expected_acks = 1 + slave_count; + if (ack_count >= expected_acks) { + LOG(INFO) << "WaitForSlaveAcks: All " << expected_acks << " nodes have acknowledged target " << target_offset.ToString(); + // Update committed_id + coordinator_.SetCommittedId(target_offset); + return Status::OK(); + } + + // Sleep 
briefly then retry + std::this_thread::sleep_for(std::chrono::milliseconds(POLL_INTERVAL_MS)); + + // Periodically trigger binlog sync to ensure slave nodes receive data + if (elapsed.count() % 100 == 0) { // Trigger every 100ms + g_pika_rm->WakeUpBinlogSync(); + } + } +} +*/ + Status SyncMasterDB::ConsensusProposeLog(const std::shared_ptr& cmd_ptr) { // If consistency is not required, directly propose the log without waiting for consensus if (!coordinator_.GetISConsistency()) { return coordinator_.ProposeLog(cmd_ptr); } - auto start = std::chrono::steady_clock::now(); LogOffset offset; Status s = coordinator_.AppendEntries(cmd_ptr, offset); // Append the log entry to the coordinator @@ -485,19 +585,22 @@ Status SyncMasterDB::ConsensusProposeLog(const std::shared_ptr& cmd_ptr) { return s; } - // Wait for consensus to be achieved within 10 seconds - while (std::chrono::duration_cast(std::chrono::steady_clock::now() - start).count() < 10) { - // Check if consensus has been achieved for the given log offset - if (checkFinished(offset)) { - return Status::OK(); - } - // TODO: 这里暂时注掉了sleep等待,50ms耗时过长,影响写入链路,后期需要改成条件变量唤醒方式 - //std::this_thread::sleep_for(std::chrono::milliseconds(50)); - } + // // Wait for consensus to be achieved within 10 seconds + // while (std::chrono::duration_cast(std::chrono::steady_clock::now() - start).count() < 10) { + // // Check if consensus has been achieved for the given log offset + // if (checkFinished(offset)) { + // return Status::OK(); + // } + // // TODO: 这里暂时注掉了sleep等待,50ms耗时过长,影响写入链路,后期需要改成条件变量唤醒方式 + // //std::this_thread::sleep_for(std::chrono::milliseconds(50)); + // } - return Status::Timeout("No consistency achieved within 10 seconds"); -} + // return Status::Timeout("No consistency achieved within 10 seconds"); + //LOG(INFO) << "ConsensusProposeLog: Successfully appended cmd " << cmd_ptr->name() + // << " with offset " << offset.ToString() << ", delegating to RocksDBThreadLoop"; + return Status::OK(); +} Status 
SyncMasterDB::ConsensusProcessLeaderLog(const std::shared_ptr& cmd_ptr, const BinlogItem& attribute) { return coordinator_.ProcessLeaderLog(cmd_ptr, attribute); @@ -515,6 +618,8 @@ std::unordered_map> SyncMasterDB::GetAll return coordinator_.SyncPros().GetAllSlaveNodes(); } +std::shared_ptr SyncMasterDB::GetCommandCollector() { return command_collector_; } + /* SyncSlaveDB */ SyncSlaveDB::SyncSlaveDB(const std::string& db_name) : SyncDB(db_name) { @@ -646,6 +751,9 @@ PikaReplicaManager::PikaReplicaManager() { int port = g_pika_conf->port() + kPortShiftReplServer; pika_repl_client_ = std::make_unique(3000, 60); pika_repl_server_ = std::make_unique(ips, port, 3000); + command_queue_ = std::make_unique(500); + // Initialize CommittedID tracking + last_committed_id_ = LogOffset(); InitDB(); } @@ -662,9 +770,17 @@ void PikaReplicaManager::Start() { LOG(FATAL) << "Start Repl Server Error: " << ret << (ret == net::kCreateThreadError ? ": create thread error " : ": other error"); } + // Start command queue background thread + StartCommandQueueThread(); + // Start RocksDB background thread + StartRocksDBThread(); } void PikaReplicaManager::Stop() { + // Stop RocksDB background thread first + StopRocksDBThread(); + // Stop command queue background thread + StopCommandQueueThread(); pika_repl_client_->Stop(); pika_repl_server_->Stop(); } @@ -698,9 +814,12 @@ void PikaReplicaManager::ProduceWriteQueue(const std::string& ip, int port, std: const std::vector& tasks) { std::lock_guard l(write_queue_mu_); std::string index = ip + ":" + std::to_string(port); + //LOG(INFO) << "[Queue] Entering ProduceWriteQueue for " << ip << ":" << port << ", db: " << db_name << ", current queue size: " << write_queues_.size() << ", tasks to add: " << tasks.size(); for (auto& task : tasks) { write_queues_[index][db_name].push(task); } + //LOG(INFO) << "[Queue] Added " << tasks.size() << " tasks to queue for " << ip << ":" << port << ", db: " << db_name << ", new queue size: " << 
write_queues_[index][db_name].size(); + //LOG(INFO) << "[Queue] Exiting ProduceWriteQueue"; } int PikaReplicaManager::ConsumeWriteQueue() { @@ -710,7 +829,7 @@ int PikaReplicaManager::ConsumeWriteQueue() { std::lock_guard l(write_queue_mu_); for (auto& iter : write_queues_) { const std::string& ip_port = iter.first; - std::unordered_map>& p_map = iter.second; + std::map>& p_map = iter.second; for (auto& db_queue : p_map) { std::queue& queue = db_queue.second; for (int i = 0; i < kBinlogSendPacketNum; ++i) { @@ -747,12 +866,16 @@ int PikaReplicaManager::ConsumeWriteQueue() { LOG(WARNING) << "Parse ip_port error " << iter.first; continue; } + //LOG(INFO) << "message size: " << iter.second.size(); + //LOG(INFO) << "SendSlaveBinlogChips to " << iter.first << " start. for (auto& to_send : iter.second) { Status s = pika_repl_server_->SendSlaveBinlogChips(ip, port, to_send); if (!s.ok()) { - LOG(WARNING) << "send binlog to " << ip << ":" << port << " failed, " << s.ToString(); + LOG(WARNING) << "SendSlaveBinlogChips to " << iter.first << " failed, " << s.ToString(); to_delete.push_back(iter.first); continue; + } else { + //LOG(INFO) << "[Queue] SendSlaveBinlogChips to " << iter.first << " success."; } } } @@ -763,6 +886,9 @@ int PikaReplicaManager::ConsumeWriteQueue() { write_queues_.erase(del_queue); } } + // LOG(INFO) << "[Consume] Entering ConsumeWriteQueue, total queues: " << write_queues_.size(); + // LOG(INFO) << "[Consume] Consumed " << counter << " tasks"; + // LOG(INFO) << "[Consume] Exiting ConsumeWriteQueue"; return counter; } @@ -808,6 +934,7 @@ void PikaReplicaManager::ReplServerUpdateClientConnMap(const std::string& ip_por pika_repl_server_->UpdateClientConnMap(ip_port, fd); } +//更新 Binlog 状态 Status PikaReplicaManager::UpdateSyncBinlogStatus(const RmNode& slave, const LogOffset& offset_start, const LogOffset& offset_end) { std::shared_lock l(dbs_rw_); @@ -820,15 +947,16 @@ Status PikaReplicaManager::UpdateSyncBinlogStatus(const RmNode& slave, const Log 
return s; } if(db->GetISConsistency()){ + // Master 节点更新 CommittedID s = db->UpdateCommittedID(); if (!s.ok()) { return s; } } - s = db->SyncBinlogToWq(slave.Ip(), slave.Port()); + /*s = db->SyncBinlogToWq(slave.Ip(), slave.Port()); if (!s.ok()) { return s; - } + }*/ return Status::OK(); } @@ -1181,3 +1309,408 @@ void PikaReplicaManager::BuildBinlogOffset(const LogOffset& offset, InnerMessage boffset->set_term(offset.l_offset.term); boffset->set_index(offset.l_offset.index); } + +// Command Queue related implementations +void PikaReplicaManager::EnqueueCommandBatch(std::shared_ptr batch) { + if (command_queue_) { + bool success = command_queue_->EnqueueBatch(batch); + if (success) { + // Notify the command queue thread that new data is available + { + std::lock_guard lock(command_queue_mutex_); + command_queue_cv_.notify_one(); + } + } else { + LOG(ERROR) << "Failed to enqueue command batch with " << batch->Size() << " commands"; + } + } else { + LOG(ERROR) << "Command queue not initialized"; + } +} + +std::shared_ptr PikaReplicaManager::DequeueCommandBatch() { + if (command_queue_) { + return command_queue_->DequeueBatch(); + } else { + LOG(ERROR) << "Command queue not initialized"; + return nullptr; + } +} + +size_t PikaReplicaManager::GetCommandQueueSize() const { + if (command_queue_) { + return command_queue_->Size(); + } + return 0; +} + +bool PikaReplicaManager::IsCommandQueueEmpty() const { + if (command_queue_) { + return command_queue_->Empty(); + } + return true; +} + +void PikaReplicaManager::StartCommandQueueThread() { + if (command_queue_running_.load()) { + LOG(WARNING) << "Command queue thread is already running"; + return; + } + command_queue_running_.store(true); + command_queue_thread_ = std::make_unique(&PikaReplicaManager::CommandQueueLoop, this); + LOG(INFO) << "Command queue background thread started"; +} + +void PikaReplicaManager::StopCommandQueueThread() { + if (!command_queue_running_.load()) { + return; + } + 
command_queue_running_.store(false); + // Notify the condition variable to wake up the thread + { + std::lock_guard lock(command_queue_mutex_); + command_queue_cv_.notify_one(); + } + + if (command_queue_thread_ && command_queue_thread_->joinable()) { + command_queue_thread_->join(); + } + LOG(INFO) << "Command queue background thread stopped"; +} + +void PikaReplicaManager::CommandQueueLoop() { + LOG(INFO) << "Command queue loop started"; + while (command_queue_running_.load()) { + try { + std::unique_lock lock(command_queue_mutex_); + // Wait for notification or timeout + command_queue_cv_.wait(lock, [this] { + return !command_queue_running_.load() || !command_queue_->Empty(); + }); + + // Check if we should exit + if (!command_queue_running_.load()) { + break; + } + + // Release lock before processing + lock.unlock(); + + // Continuously process batches until queue is empty + bool processed_any_batch = false; + do { + // Non-blocking dequeue all available batches + auto batches = command_queue_->DequeueAllBatches(); + + if (!batches.empty()) { + // Process all batches + ProcessCommandBatches(batches); + processed_any_batch = true; + //LOG(INFO) << "Processed " << batches.size() << " batches, checking for more..."; + } else { + processed_any_batch = false; + } + // Continue processing if we processed any batches (there might be more) + } while (processed_any_batch && command_queue_running_.load()); + } catch (const std::exception& e) { + LOG(ERROR) << "Error in command queue loop: " << e.what(); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + } + //LOG(INFO) << "Command queue loop ended"; +} + +void PikaReplicaManager::ProcessCommandBatches(const std::vector>& batches) { + if (batches.empty()) { + return; + } + size_t total_commands = 0; + for (const auto& batch : batches) { + total_commands += batch->Size(); + } + //LOG(INFO) << "Processing " << batches.size() << " batches with " << total_commands << " total commands"; + + // Merge all 
commands from all batches for unified batch processing + std::vector> all_commands; + std::map, size_t>> command_to_batch_map; + size_t global_cmd_idx = 0; + for (const auto& batch : batches) { + if (!batch || batch->Empty()) { + continue; + } + for (size_t i = 0; i < batch->commands.size(); ++i) { + all_commands.push_back(batch->commands[i]); + command_to_batch_map[global_cmd_idx] = std::make_pair(batch, i); + global_cmd_idx++; + } + } + + if (all_commands.empty()) { + LOG(WARNING) << "No valid commands found in batches"; + return; + } + + // Use the first batch's database for consensus + std::string db_name = batches[0]->db_name; + std::shared_ptr sync_db = GetSyncMasterDBByName(DBInfo(db_name)); + if (!sync_db) { + LOG(WARNING) << "SyncMasterDB not found for database: " << db_name; + // Call callbacks with error for all batches + for (const auto& batch : batches) { + for (size_t i = 0; i < batch->callbacks.size(); ++i) { + if (batch->callbacks[i]) { + batch->callbacks[i](LogOffset(), pstd::Status::NotFound("Database not found")); + } + } + } + return; + } + + try { + // Process each command individually using AppendEntries + std::vector all_offsets; + all_offsets.resize(all_commands.size()); + + // Process each command using the original single-command logic + //LOG(INFO) << "all_command.size: " << all_commands.size(); + for (size_t cmd_idx = 0; cmd_idx < all_commands.size(); ++cmd_idx) { + const auto& cmd_ptr = all_commands[cmd_idx]; + LogOffset cmd_offset; + // Master 节点写日志 + pstd::Status s = sync_db->ConsensusProposeLog(cmd_ptr); + if (!s.ok()) { + LOG(ERROR) << "Failed to " << (sync_db->GetISConsistency() ? 
"append" : "propose") + << " command " << cmd_ptr->name() << ": " << s.ToString(); + // Call callbacks with error for remaining commands + for (size_t error_idx = cmd_idx; error_idx < all_commands.size(); ++error_idx) { + auto& batch_info = command_to_batch_map[error_idx]; + auto batch = batch_info.first; + size_t batch_cmd_idx = batch_info.second; + if (batch_cmd_idx < batch->callbacks.size() && batch->callbacks[batch_cmd_idx]) { + batch->callbacks[batch_cmd_idx](LogOffset(), s); + } + } + return; + } + // Get the prepared ID as the offset + // 获取 PreparedID + cmd_offset = sync_db->GetPreparedId(); + // 将 PrepardID 填充到 all_offsets 中 + all_offsets[cmd_idx] = cmd_offset; + + // Update individual batch offsets + auto& batch_info = command_to_batch_map[cmd_idx]; + auto batch = batch_info.first; + size_t batch_cmd_idx = batch_info.second; + + // Ensure the batch has enough space for offsets + if (batch->binlog_offsets.size() <= batch_cmd_idx) { + batch->binlog_offsets.resize(batch->commands.size()); + } + batch->binlog_offsets[batch_cmd_idx] = cmd_offset; + } + + //LOG(INFO) << "Successfully processed " << all_commands.size() << " commands individually"; + + // Create BatchGroup with the last offset as end_offset + LogOffset end_offset = all_offsets.back(); + //LOG(INFO) << "end_offset: " << end_offset.ToString(); + auto batch_group = std::make_shared(batches, end_offset); + + // Enqueue the BatchGroup + { + std::lock_guard lock(pending_batch_groups_mutex_); + pending_batch_groups_.push(batch_group); + //rocksdb_thread_cv_.notify_one(); + } + //LOG(INFO) << "Created BatchGroup with " << batches.size() << " batches and end_offset: " << end_offset.ToString(); + //./LOG(INFO) << "Pending BatchGroup with " << pending_batch_groups_.size() << " batches"; + } catch (const std::exception& e) { + LOG(ERROR) << "Exception in ProcessCommandBatches: " << e.what(); + // Call callbacks with error for all batches + for (const auto& batch : batches) { + for (size_t i = 0; i < 
batch->callbacks.size(); ++i) { + if (batch->callbacks[i]) { + batch->callbacks[i](LogOffset(), pstd::Status::Corruption("Exception in processing")); + } + } + } + } + + // Signal auxiliary thread once after processing all batches + g_pika_server->SignalAuxiliary(); +} + +// RocksDB Background Thread Implementation +void PikaReplicaManager::StartRocksDBThread() { + if (rocksdb_thread_running_.load()) { + LOG(WARNING) << "RocksDB background thread is already running"; + return; + } + rocksdb_thread_running_.store(true); + rocksdb_back_thread_ = std::make_unique(&PikaReplicaManager::RocksDBThreadLoop, this); + LOG(INFO) << "RocksDB background thread started"; +} + +void PikaReplicaManager::StopRocksDBThread() { + if (!rocksdb_thread_running_.load()) { + return; + } + rocksdb_thread_running_.store(false); + // Notify the condition variable to wake up the thread + { + std::lock_guard lock(rocksdb_thread_mutex_); + rocksdb_thread_cv_.notify_one(); + } + if (rocksdb_back_thread_ && rocksdb_back_thread_->joinable()) { + rocksdb_back_thread_->join(); + } + LOG(INFO) << "RocksDB background thread stopped"; +} + +void PikaReplicaManager::RocksDBThreadLoop() { + //LOG(INFO) << "RocksDB_back_thread started"; + while (rocksdb_thread_running_.load()) { + try { + std::unique_lock lock(rocksdb_thread_mutex_); + // Wait for CommittedID notification or shutdown signal + rocksdb_thread_cv_.wait(lock, [this] { + bool has_pending; + { + //std::lock_guard pending_lock(pending_batch_groups_mutex_); + has_pending = !pending_batch_groups_.empty(); + } + return !rocksdb_thread_running_.load() || has_pending; + }); + // Check if we should exit + if (!rocksdb_thread_running_.load()) { + break; + } + lock.unlock(); + // Get current committed ID + LogOffset committed_id; + { + std::unique_lock lock(last_committed_id_mutex_); + committed_id = last_committed_id_; + //LOG(INFO) << "committed_id: " << committed_id.ToString(); + } + //LOG(INFO) << "RocksDBThreadLoop"; + // Process committed 
BatchGroups + //if (committed_id.IsValid()) { + size_t groups_processed = ProcessCommittedBatchGroups(committed_id); + //if (groups_processed > 0) { + //LOG(INFO) << "Processed " << groups_processed << " committed BatchGroups"; + //} + //} + } catch (const std::exception& e) { + LOG(ERROR) << "Error in RocksDB thread loop: " << e.what(); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + } + //LOG(INFO) << "RocksDB_back_thread ended"; +} + +size_t PikaReplicaManager::ProcessCommittedBatchGroups(const LogOffset& committed_id) { + std::queue> groups_to_process; + // Get pending BatchGroups and separate committed from uncommitted + { + //LOG(INFO) << "Start Process"; + std::lock_guard lock(pending_batch_groups_mutex_); + //LOG(INFO) << pending_batch_groups_.size() << " pending batch groups"; + while (!pending_batch_groups_.empty()) { + auto batch_group = pending_batch_groups_.front(); + // Check if this BatchGroup is committed by comparing its end_offset with committed_id + bool is_committed; + { + std::shared_lock lock(last_committed_id_mutex_); + is_committed = (batch_group->end_offset <= last_committed_id_); + //LOG(INFO) << "committed_id: " << last_committed_id_.ToString() + // << " And BatchGroup with end_offset " << batch_group->end_offset.ToString() + //<< " is committed"; + } + if (is_committed) { + groups_to_process.push(batch_group); + pending_batch_groups_.pop(); + } else { + //rocksdb_thread_cv_.notify_one(); + // First uncommitted BatchGroup found, stop processing + static LogOffset last_uncommitted_offset; + if (last_uncommitted_offset != batch_group->end_offset) { + //LOG(INFO) << "BatchGroup with end_offset " << batch_group->end_offset.ToString() + //<< " is not yet committed (committed_id: " << committed_id.ToString() << ")"; + last_uncommitted_offset = batch_group->end_offset; + } + break; + } + } + } + //LOG(INFO) << "RocksDB Start Process Committed BatchGroups"; + // Store the number of groups to process for return value + size_t 
groups_count = groups_to_process.size(); + // Process committed BatchGroups + while (!groups_to_process.empty()) { + auto batch_group = groups_to_process.front(); + groups_to_process.pop(); + //LOG(INFO) << "Processing committed BatchGroup with " << batch_group->BatchCount() + // << " batches for RocksDB Put operations and client callbacks"; + + // Process all batches in this group + for (const auto& batch : batch_group->batches) { + // Execute RocksDB Put operations for each command in the batch + for (size_t i = 0; i < batch->commands.size(); ++i) { + const auto& cmd_ptr = batch->commands[i]; + //LOG(INFO) << "[RocksDB] RocksDBThreadLoop" << "Processing BatchGroup with end_offset: " << batch_group->end_offset.ToString(); + try { + // Execute the command (this will perform the RocksDB Put operation) + cmd_ptr->Execute(); + // auto tmp_conn = cmd_ptr->GetConn(); + // auto pc = dynamic_cast(tmp_conn.get()); + // std::shared_ptr resp_ptr = std::make_shared(); + // *resp_ptr = std::move(cmd_ptr->res().message()); + // pc->resp_num--; + // pc->resp_array.push_back(resp_ptr); + // pc->TryWriteResp(); + if (cmd_ptr->res().ok()) { + //LOG(INFO) << "Successfully executed RocksDB Put for command: " << cmd_ptr->name(); + // Execute callback with BatchGroup's end_offset (all commands in the group get the same offset) + //if (i < batch->callbacks.size() && batch->callbacks[i]) { + //LOG(INFO) << "[Callback] RocksDBThreadLoop" + // << ", Executing callback for end_offset: " << batch_group->end_offset.ToString(); + batch->callbacks[i](batch_group->end_offset, pstd::Status::OK()); + //} + } else { + LOG(WARNING) << "Command " << cmd_ptr->name() << " execution failed: " << cmd_ptr->res().message(); + // Execute callback with command error, still use BatchGroup's end_offset + if (i < batch->callbacks.size() && batch->callbacks[i]) { + batch->callbacks[i](batch_group->end_offset, pstd::Status::Corruption("Command execution failed: " + cmd_ptr->res().message())); + } + } + } catch 
(const std::exception& e) { + LOG(ERROR) << "Exception during command execution: " << e.what(); + // Execute callback with exception error, still use BatchGroup's end_offset + if (i < batch->callbacks.size() && batch->callbacks[i]) { + batch->callbacks[i](batch_group->end_offset, pstd::Status::Corruption("Exception during execution: " + std::string(e.what()))); + } + } + } + } + } + + return groups_count; +} + +// 唤醒 RocksDB 线程 +void PikaReplicaManager::NotifyCommittedID(const LogOffset& committed_id) { + { + std::unique_lock lock(last_committed_id_mutex_); + last_committed_id_ = committed_id; + //LOG(INFO) << "last_committed_id: " << last_committed_id_.ToString(); + } + // Notify RocksDB thread + { + //LOG(INFO) << "NotifyCommittedID"; + std::lock_guard lock(rocksdb_thread_mutex_); + rocksdb_thread_cv_.notify_one(); + } + //LOG(INFO) << "Notified RocksDB thread of new CommittedID: " << committed_id.ToString(); +} diff --git a/src/pika_server.cc b/src/pika_server.cc index a7d50b1e71..74f439060a 100644 --- a/src/pika_server.cc +++ b/src/pika_server.cc @@ -24,6 +24,7 @@ #include "include/pika_monotonic_time.h" #include "include/pika_rm.h" #include "include/pika_server.h" +#include "include/pika_command_collector.h" using pstd::Status; extern PikaServer* g_pika_server; @@ -169,6 +170,14 @@ void PikaServer::Start() { } */ + if (g_pika_conf->command_batch_enabled()) { + auto master_db = g_pika_rm->GetSyncMasterDBByName(DBInfo(g_pika_conf->default_db())); + if (master_db) { + auto command_collector = master_db->GetCommandCollector(); + LOG(INFO) << "Command batch enabled, command collector accessed successfully"; + } + } + ret = pika_client_processor_->Start(); if (ret != net::kSuccess) { dbs_.clear(); @@ -1096,9 +1105,17 @@ std::unordered_map PikaServer::ServerAllDBStat() { re int PikaServer::SendToPeer() { return g_pika_rm->ConsumeWriteQueue(); } -void PikaServer::SignalAuxiliary() { pika_auxiliary_thread_->cv_.notify_one(); } +void PikaServer::SignalAuxiliary() { + // 
LOG(INFO) << "[Signal] SignalAuxiliary called, notifying auxiliary thread";
+  pika_auxiliary_thread_->cv_.notify_one();
+}
 
-Status PikaServer::TriggerSendBinlogSync() { return g_pika_rm->WakeUpBinlogSync(); }
+Status PikaServer::TriggerSendBinlogSync() {
+  // LOG(INFO) << "[Trigger] TriggerSendBinlogSync called, calling WakeUpBinlogSync";
+  Status s = g_pika_rm->WakeUpBinlogSync();
+  // LOG(INFO) << "[Trigger] WakeUpBinlogSync result: " << s.ToString();
+  return s;
+}
 
 int PikaServer::PubSubNumPat() { return pika_pubsub_thread_->PubSubNumPat(); }
 
diff --git a/src/pstd/src/env.cc b/src/pstd/src/env.cc
index 7dadf924ea..27dff0958a 100644
--- a/src/pstd/src/env.cc
+++ b/src/pstd/src/env.cc
@@ -425,7 +425,23 @@ class PosixMmapFile : public WritableFile {
     return s;
   }
 
-  Status Flush() override { return Status::OK(); }
+  // Flushes the file's kernel buffers to disk. Returns an IOError when the descriptor is
+  // invalid or the sync call fails; nothing is logged here because this runs on every flush.
+  Status Flush() override {
+    if (fd_ < 0) {
+      return IOError(filename_, EINVAL);
+    }
+    // Apple platforms take the fsync() branch; everything else uses fdatasync().
+#if defined(__APPLE__)
+    const int rc = fsync(fd_);
+#else
+    const int rc = fdatasync(fd_);
+#endif
+    if (rc != 0) {
+      return IOError(filename_, errno);
+    }
+    return Status::OK();
+  }
 
   Status Sync() override {
     Status s;
diff --git a/src/storage/src/redis.cc b/src/storage/src/redis.cc
index 3066a62759..0c4aa52cae 100644
--- a/src/storage/src/redis.cc
+++ b/src/storage/src/redis.cc
@@ -14,6 +14,9 @@ Redis::Redis(Storage* const s, const DataType& type)
       lock_mgr_(std::make_shared(1000, 0, std::make_shared())),
       small_compaction_threshold_(5000),
       small_compaction_duration_threshold_(10000) {
+  // NOTE(review): this switches the RocksDB WAL off for every write issued with
+  // default_write_options_ — confirm that recovery from another log is guaranteed in all modes.
+  default_write_options_.disableWAL = true;
   statistics_store_ = std::make_unique>();
   scan_cursors_store_ = std::make_unique>();
   scan_cursors_store_->SetCapacity(5000);
diff --git a/tests/integration/clean_start.sh b/tests/integration/clean_start.sh
new file mode 100644
index 0000000000..f527ccedfc
--- /dev/null
+++ b/tests/integration/clean_start.sh
@@ -0,0 +1,55 @@
+#!/bin/bash
+
+# Run from the repository root, located relative to this script (tests/integration/).
+cd "$(dirname "${BASH_SOURCE[0]}")/../.." || exit 1
+
+clean_ports() {
+    echo "Checking and cleaning ports..."
+
+    sudo killall -9 pika
+
+    sleep 1
+}
+
+echo "Building project..."
+if ! sudo ./build.sh; then
+    echo "Build failed!"
+    exit 1
+fi
+echo "Build successful."
+
+echo "Cleaning up test directory..."
+sudo rm -rf ./output/pacifica_test/
+echo "Cleanup completed."
+
+clean_ports
+
+cd output || exit 1
+
+echo "Starting master and slave servers..."
+sudo ../tests/integration/start_master_and_slave.sh
+sleep 10
+
+echo "Setting up strong consistency replication..."
+
+redis-cli -p 9302 slaveof 127.0.0.1 9301 strong
+sleep 1
+echo "Replication setup successful."
+
+echo "Running benchmark..."
+
+# redis-cli -p 9301 set key "12313"
+redis-benchmark -p 9301 -t set -n 100000 -c 10 --threads 1
+echo "Benchmark finished."
+
+echo -e "\n==== 主节点 INFO 日志 ===="
+tail -n 150 ./pacifica_test/master/log/pika.INFO
+
+echo -e "\n==== 主节点 WARNING 日志 ===="
+tail -n 150 ./pacifica_test/master/log/pika.WARNING
+
+echo -e "\n==== 从节点 INFO 日志 ===="
+tail -n 150 ./pacifica_test/slave1/log/pika.INFO
+
+echo -e "\n==== 从节点 WARNING 日志 ===="
+tail -n 150 ./pacifica_test/slave1/log/pika.WARNING
diff --git a/tests/integration/start_master_and_slave.sh b/tests/integration/start_master_and_slave.sh
index d3d0f1257d..5517ade141 100755
--- a/tests/integration/start_master_and_slave.sh
+++ b/tests/integration/start_master_and_slave.sh
@@ -1,135 +1,135 @@
-#!/bin/bash
-# This script is used by .github/workflows/pika.yml, Do not modify this file unless you know what you are doing.
-# it's used to start pika master and slave, running path: build -cp ../conf/pika.conf ./pika_single.conf -cp ../conf/pika.conf ./pika_master.conf -cp ../conf/pika.conf ./pika_slave.conf -cp ../conf/pika.conf ./pika_rename.conf -cp ../conf/pika.conf ./pika_master_rename.conf -cp ../conf/pika.conf ./pika_slave_rename.conf -cp ../conf/pika.conf ./pika_acl_both_password.conf -cp ../conf/pika.conf ./pika_acl_only_admin_password.conf -cp ../conf/pika.conf ./pika_has_other_acl_user.conf -# Create folders for storing data on the primary and secondary nodes -mkdir master_data -mkdir slave_data -# Example Change the location for storing data on primary and secondary nodes in the configuration file -sed -i.bak \ - -e 's|databases : 1|databases : 2|' \ - -e 's|#daemonize : yes|daemonize : yes|' \ - -e 's|timeout : 60|timeout : 500|' ./pika_single.conf - -sed -i.bak \ - -e 's|databases : 1|databases : 2|' \ - -e 's|port : 9221|port : 9241|' \ - -e 's|log-path : ./log/|log-path : ./master_data/log/|' \ - -e 's|db-path : ./db/|db-path : ./master_data/db/|' \ - -e 's|dump-path : ./dump/|dump-path : ./master_data/dump/|' \ - -e 's|pidfile : ./pika.pid|pidfile : ./master_data/pika.pid|' \ - -e 's|db-sync-path : ./dbsync/|db-sync-path : ./master_data/dbsync/|' \ - -e 's|#daemonize : yes|daemonize : yes|' \ - -e 's|timeout : 60|timeout : 500|' ./pika_master.conf - -sed -i.bak \ - -e 's|databases : 1|databases : 2|' \ - -e 's|port : 9221|port : 9231|' \ - -e 's|log-path : ./log/|log-path : ./slave_data/log/|' \ - -e 's|db-path : ./db/|db-path : ./slave_data/db/|' \ - -e 's|dump-path : ./dump/|dump-path : ./slave_data/dump/|' \ - -e 's|pidfile : ./pika.pid|pidfile : ./slave_data/pika.pid|' \ - -e 's|db-sync-path : ./dbsync/|db-sync-path : ./slave_data/dbsync/|' \ - -e 's|#daemonize : yes|daemonize : yes|' \ - -e 's|timeout : 60|timeout : 500|' ./pika_slave.conf - -sed -i.bak \ - -e 's|# rename-command : FLUSHALL 360flushall|rename-command : FLUSHALL 360flushall|' \ - -e 's|# 
rename-command : FLUSHDB 360flushdb|rename-command : FLUSHDB 360flushdb|' \ - -e 's|databases : 1|databases : 2|' \ - -e 's|port : 9221|port : 9251|' \ - -e 's|log-path : ./log/|log-path : ./rename_data/log/|' \ - -e 's|db-path : ./db/|db-path : ./rename_data/db/|' \ - -e 's|dump-path : ./dump/|dump-path : ./rename_data/dump/|' \ - -e 's|pidfile : ./pika.pid|pidfile : ./rename_data/pika.pid|' \ - -e 's|db-sync-path : ./dbsync/|db-sync-path : ./rename_data/dbsync/|' \ - -e 's|#daemonize : yes|daemonize : yes|' \ - -e 's|timeout : 60|timeout : 500|' ./pika_rename.conf - -sed -i.bak \ - -e 's|requirepass :|requirepass : requirepass|' \ - -e 's|masterauth :|masterauth : requirepass|' \ - -e 's|# userpass :|userpass : userpass|' \ - -e 's|# userblacklist :|userblacklist : flushall,flushdb|' \ - -e 's|port : 9221|port : 9261|' \ - -e 's|log-path : ./log/|log-path : ./acl1_data/log/|' \ - -e 's|db-path : ./db/|db-path : ./acl1_data/db/|' \ - -e 's|dump-path : ./dump/|dump-path : ./acl1_data/dump/|' \ - -e 's|pidfile : ./pika.pid|pidfile : ./acl1_data/pika.pid|' \ - -e 's|db-sync-path : ./dbsync/|db-sync-path : ./acl1_data/dbsync/|' \ - -e 's|#daemonize : yes|daemonize : yes|' \ - -e 's|timeout : 60|timeout : 500|' ./pika_acl_both_password.conf - -sed -i.bak \ - -e 's|requirepass :|requirepass : requirepass|' \ - -e 's|masterauth :|masterauth : requirepass|' \ - -e 's|# userblacklist :|userblacklist : flushall,flushdb|' \ - -e 's|port : 9221|port : 9271|' \ - -e 's|log-path : ./log/|log-path : ./acl2_data/log/|' \ - -e 's|db-path : ./db/|db-path : ./acl2_data/db/|' \ - -e 's|dump-path : ./dump/|dump-path : ./acl2_data/dump/|' \ - -e 's|pidfile : ./pika.pid|pidfile : ./acl2_data/pika.pid|' \ - -e 's|db-sync-path : ./dbsync/|db-sync-path : ./acl2_data/dbsync/|' \ - -e 's|#daemonize : yes|daemonize : yes|' \ - -e 's|timeout : 60|timeout : 500|' ./pika_acl_only_admin_password.conf - -sed -i.bak \ - -e 's|requirepass :|requirepass : requirepass|' \ - -e 's|masterauth 
:|masterauth : requirepass|' \ - -e 's|# userpass :|userpass : userpass|' \ - -e 's|# userblacklist :|userblacklist : flushall,flushdb|' \ - -e 's|port : 9221|port : 9281|' \ - -e 's|log-path : ./log/|log-path : ./acl3_data/log/|' \ - -e 's|db-path : ./db/|db-path : ./acl3_data/db/|' \ - -e 's|dump-path : ./dump/|dump-path : ./acl3_data/dump/|' \ - -e 's|pidfile : ./pika.pid|pidfile : ./acl3_data/pika.pid|' \ - -e 's|db-sync-path : ./dbsync/|db-sync-path : ./acl3_data/dbsync/|' \ - -e 's|#daemonize : yes|daemonize : yes|' \ - -e 's|timeout : 60|timeout : 500|' ./pika_has_other_acl_user.conf -echo -e '\nuser : limit on >limitpass ~* +@all &*' >> ./pika_has_other_acl_user.conf - -sed -i '' \ - -e 's|# rename-command : FLUSHDB 360flushdb|rename-command : FLUSHDB 360flushdb|' \ - -e 's|port : 9221|port : 9291|' \ - -e 's|log-path : ./log/|log-path : ./master_rename_data/log/|' \ - -e 's|db-path : ./db/|db-path : ./master_rename_data/db/|' \ - -e 's|dump-path : ./dump/|dump-path : ./master_rename_data/dump/|' \ - -e 's|pidfile : ./pika.pid|pidfile : ./master_rename_data/pika.pid|' \ - -e 's|db-sync-path : ./dbsync/|db-sync-path : ./master_rename_data/dbsync/|' \ - -e 's|#daemonize : yes|daemonize : yes|' \ - -e 's|timeout : 60|timeout : 500|' ./pika_master_rename.conf - -sed -i '' \ - -e 's|# rename-command : FLUSHDB 360flushdb|rename-command : FLUSHDB 360flushdb|' \ - -e 's|port : 9221|port : 9301|' \ - -e 's|log-path : ./log/|log-path : ./slave_rename_data/log/|' \ - -e 's|db-path : ./db/|db-path : ./slave_rename_data/db/|' \ - -e 's|dump-path : ./dump/|dump-path : ./slave_rename_data/dump/|' \ - -e 's|pidfile : ./pika.pid|pidfile : ./slave_rename_data/pika.pid|' \ - -e 's|db-sync-path : ./dbsync/|db-sync-path : ./slave_rename_data/dbsync/|' \ - -e 's|#daemonize : yes|daemonize : yes|' \ - -e 's|timeout : 60|timeout : 500|' ./pika_slave_rename.conf - -# Start three nodes -./pika -c ./pika_single.conf -./pika -c ./pika_master.conf -./pika -c ./pika_slave.conf -./pika 
-c ./pika_rename.conf -./pika -c ./pika_acl_both_password.conf -./pika -c ./pika_acl_only_admin_password.conf -./pika -c ./pika_has_other_acl_user.conf -./pika -c ./pika_master_rename.conf -./pika -c ./pika_slave_rename.conf -#ensure both master and slave are ready -sleep 10 +# #!/bin/bash +# # This script is used by .github/workflows/pika.yml, Do not modify this file unless you know what you are doing. +# # it's used to start pika master and slave, running path: build +# cp ../conf/pika.conf ./pika_single.conf +# cp ../conf/pika.conf ./pika_master.conf +# cp ../conf/pika.conf ./pika_slave.conf +# cp ../conf/pika.conf ./pika_rename.conf +# cp ../conf/pika.conf ./pika_master_rename.conf +# cp ../conf/pika.conf ./pika_slave_rename.conf +# cp ../conf/pika.conf ./pika_acl_both_password.conf +# cp ../conf/pika.conf ./pika_acl_only_admin_password.conf +# cp ../conf/pika.conf ./pika_has_other_acl_user.conf +# # Create folders for storing data on the primary and secondary nodes +# mkdir master_data +# mkdir slave_data +# # Example Change the location for storing data on primary and secondary nodes in the configuration file +# sed -i.bak \ +# -e 's|databases : 1|databases : 2|' \ +# -e 's|#daemonize : yes|daemonize : yes|' \ +# -e 's|timeout : 60|timeout : 500|' ./pika_single.conf + +# sed -i.bak \ +# -e 's|databases : 1|databases : 2|' \ +# -e 's|port : 9221|port : 9241|' \ +# -e 's|log-path : ./log/|log-path : ./master_data/log/|' \ +# -e 's|db-path : ./db/|db-path : ./master_data/db/|' \ +# -e 's|dump-path : ./dump/|dump-path : ./master_data/dump/|' \ +# -e 's|pidfile : ./pika.pid|pidfile : ./master_data/pika.pid|' \ +# -e 's|db-sync-path : ./dbsync/|db-sync-path : ./master_data/dbsync/|' \ +# -e 's|#daemonize : yes|daemonize : yes|' \ +# -e 's|timeout : 60|timeout : 500|' ./pika_master.conf + +# sed -i.bak \ +# -e 's|databases : 1|databases : 2|' \ +# -e 's|port : 9221|port : 9231|' \ +# -e 's|log-path : ./log/|log-path : ./slave_data/log/|' \ +# -e 's|db-path : 
./db/|db-path : ./slave_data/db/|' \ +# -e 's|dump-path : ./dump/|dump-path : ./slave_data/dump/|' \ +# -e 's|pidfile : ./pika.pid|pidfile : ./slave_data/pika.pid|' \ +# -e 's|db-sync-path : ./dbsync/|db-sync-path : ./slave_data/dbsync/|' \ +# -e 's|#daemonize : yes|daemonize : yes|' \ +# -e 's|timeout : 60|timeout : 500|' ./pika_slave.conf + +# sed -i.bak \ +# -e 's|# rename-command : FLUSHALL 360flushall|rename-command : FLUSHALL 360flushall|' \ +# -e 's|# rename-command : FLUSHDB 360flushdb|rename-command : FLUSHDB 360flushdb|' \ +# -e 's|databases : 1|databases : 2|' \ +# -e 's|port : 9221|port : 9251|' \ +# -e 's|log-path : ./log/|log-path : ./rename_data/log/|' \ +# -e 's|db-path : ./db/|db-path : ./rename_data/db/|' \ +# -e 's|dump-path : ./dump/|dump-path : ./rename_data/dump/|' \ +# -e 's|pidfile : ./pika.pid|pidfile : ./rename_data/pika.pid|' \ +# -e 's|db-sync-path : ./dbsync/|db-sync-path : ./rename_data/dbsync/|' \ +# -e 's|#daemonize : yes|daemonize : yes|' \ +# -e 's|timeout : 60|timeout : 500|' ./pika_rename.conf + +# sed -i.bak \ +# -e 's|requirepass :|requirepass : requirepass|' \ +# -e 's|masterauth :|masterauth : requirepass|' \ +# -e 's|# userpass :|userpass : userpass|' \ +# -e 's|# userblacklist :|userblacklist : flushall,flushdb|' \ +# -e 's|port : 9221|port : 9261|' \ +# -e 's|log-path : ./log/|log-path : ./acl1_data/log/|' \ +# -e 's|db-path : ./db/|db-path : ./acl1_data/db/|' \ +# -e 's|dump-path : ./dump/|dump-path : ./acl1_data/dump/|' \ +# -e 's|pidfile : ./pika.pid|pidfile : ./acl1_data/pika.pid|' \ +# -e 's|db-sync-path : ./dbsync/|db-sync-path : ./acl1_data/dbsync/|' \ +# -e 's|#daemonize : yes|daemonize : yes|' \ +# -e 's|timeout : 60|timeout : 500|' ./pika_acl_both_password.conf + +# sed -i.bak \ +# -e 's|requirepass :|requirepass : requirepass|' \ +# -e 's|masterauth :|masterauth : requirepass|' \ +# -e 's|# userblacklist :|userblacklist : flushall,flushdb|' \ +# -e 's|port : 9221|port : 9271|' \ +# -e 's|log-path : 
./log/|log-path : ./acl2_data/log/|' \ +# -e 's|db-path : ./db/|db-path : ./acl2_data/db/|' \ +# -e 's|dump-path : ./dump/|dump-path : ./acl2_data/dump/|' \ +# -e 's|pidfile : ./pika.pid|pidfile : ./acl2_data/pika.pid|' \ +# -e 's|db-sync-path : ./dbsync/|db-sync-path : ./acl2_data/dbsync/|' \ +# -e 's|#daemonize : yes|daemonize : yes|' \ +# -e 's|timeout : 60|timeout : 500|' ./pika_acl_only_admin_password.conf + +# sed -i.bak \ +# -e 's|requirepass :|requirepass : requirepass|' \ +# -e 's|masterauth :|masterauth : requirepass|' \ +# -e 's|# userpass :|userpass : userpass|' \ +# -e 's|# userblacklist :|userblacklist : flushall,flushdb|' \ +# -e 's|port : 9221|port : 9281|' \ +# -e 's|log-path : ./log/|log-path : ./acl3_data/log/|' \ +# -e 's|db-path : ./db/|db-path : ./acl3_data/db/|' \ +# -e 's|dump-path : ./dump/|dump-path : ./acl3_data/dump/|' \ +# -e 's|pidfile : ./pika.pid|pidfile : ./acl3_data/pika.pid|' \ +# -e 's|db-sync-path : ./dbsync/|db-sync-path : ./acl3_data/dbsync/|' \ +# -e 's|#daemonize : yes|daemonize : yes|' \ +# -e 's|timeout : 60|timeout : 500|' ./pika_has_other_acl_user.conf +# echo -e '\nuser : limit on >limitpass ~* +@all &*' >> ./pika_has_other_acl_user.conf + +# sed -i '' \ +# -e 's|# rename-command : FLUSHDB 360flushdb|rename-command : FLUSHDB 360flushdb|' \ +# -e 's|port : 9221|port : 9291|' \ +# -e 's|log-path : ./log/|log-path : ./master_rename_data/log/|' \ +# -e 's|db-path : ./db/|db-path : ./master_rename_data/db/|' \ +# -e 's|dump-path : ./dump/|dump-path : ./master_rename_data/dump/|' \ +# -e 's|pidfile : ./pika.pid|pidfile : ./master_rename_data/pika.pid|' \ +# -e 's|db-sync-path : ./dbsync/|db-sync-path : ./master_rename_data/dbsync/|' \ +# -e 's|#daemonize : yes|daemonize : yes|' \ +# -e 's|timeout : 60|timeout : 500|' ./pika_master_rename.conf + +# sed -i '' \ +# -e 's|# rename-command : FLUSHDB 360flushdb|rename-command : FLUSHDB 360flushdb|' \ +# -e 's|port : 9221|port : 9301|' \ +# -e 's|log-path : ./log/|log-path : 
./slave_rename_data/log/|' \ +# -e 's|db-path : ./db/|db-path : ./slave_rename_data/db/|' \ +# -e 's|dump-path : ./dump/|dump-path : ./slave_rename_data/dump/|' \ +# -e 's|pidfile : ./pika.pid|pidfile : ./slave_rename_data/pika.pid|' \ +# -e 's|db-sync-path : ./dbsync/|db-sync-path : ./slave_rename_data/dbsync/|' \ +# -e 's|#daemonize : yes|daemonize : yes|' \ +# -e 's|timeout : 60|timeout : 500|' ./pika_slave_rename.conf + +# # Start three nodes +# ./pika -c ./pika_single.conf +# ./pika -c ./pika_master.conf +# ./pika -c ./pika_slave.conf +# ./pika -c ./pika_rename.conf +# ./pika -c ./pika_acl_both_password.conf +# ./pika -c ./pika_acl_only_admin_password.conf +# ./pika -c ./pika_has_other_acl_user.conf +# ./pika -c ./pika_master_rename.conf +# ./pika -c ./pika_slave_rename.conf +# #ensure both master and slave are ready +# sleep 10 # 创建PacificA一致性测试的数据目录 mkdir -p pacifica_test/master