-
Notifications
You must be signed in to change notification settings - Fork 952
Expand file tree
/
Copy pathredis_client_pipeline.hpp
More file actions
333 lines (259 loc) · 8.95 KB
/
redis_client_pipeline.hpp
File metadata and controls
333 lines (259 loc) · 8.95 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
#pragma once
#include "../acl_cpp_define.hpp"
#include "../stdlib/thread.hpp"
#include "../stdlib/string.hpp"
#include "../stdlib/box.hpp"
#include "../stdlib/atomic.hpp"
#include "redis_command.hpp"
#if !defined(ACL_CLIENT_ONLY) && !defined(ACL_REDIS_DISABLE)
namespace acl {
class token_tree;
class socket_stream;
class sslbase_conf;
class redis_client;
typedef enum {
redis_pipeline_t_cmd, // Redis command type
redis_pipeline_t_redirect, // Should redirect to another node
redis_pipeline_t_clusterdonw, // The redis node has been down
redis_pipeline_t_stop, // The current channel should stop
redis_pipeline_t_channel_closed,// The channel will be closed
} redis_pipeline_type_t;
class redis_pipeline_channel;
/**
* The message for transferring between redis command, redis client pipline
* and redis pipeline channel, which holds the redis command.
*/
class ACL_CPP_API redis_pipeline_message {
public:
redis_pipeline_message(redis_pipeline_type_t type,
box<redis_pipeline_message>* box);
~redis_pipeline_message();
//public:
void refer() {
++refers_;
}
void unrefer() {
if (--refers_ == 0) {
delete this;
}
}
long long get_refer() const {
return refers_;
}
redis_pipeline_message& set_type(redis_pipeline_type_t type) {
type_ = type;
return *this;
}
redis_pipeline_type_t get_type() const {
return type_;
}
//public:
// These thredd APIs are called in redis_command.cpp
// Called in redis_command::run().
void set_option(size_t nchild, const int* timeout);
// Called in redis_command::run()
void set(dbuf_pool* dbuf);
// Called in redis_command::build_request().
void set(const string* req);
// Called in redis_command::build_request().
void set_slot(size_t slot);
//public:
// Called in redis_pipeline_channel::flush_all().
const string* get_request() const {
return req_;
}
// Called in redis_pipeline_channel::wait_one().
dbuf_pool* get_dbuf() const {
return dbuf_;
}
// Called in redis_client_pipeline::run().
size_t get_slot() const {
return slot_;
}
// Called in redis_pipeline_channel::wait_one().
void set_addr(const char* addr);
// Called in redis_pipeline_channel::wait_one().
size_t get_nchild() const {
return nchild_;
}
// Called in redis_pipeline_channel::wait_one().
const int* get_timeout() const {
return timeout_ == -1 ? NULL : &timeout_;
}
//public:
void set_channel(redis_pipeline_channel* channel) {
channel_ = channel;
}
redis_pipeline_channel* get_channel() const {
return channel_;
}
//public:
void push(const redis_result* result);
const redis_result* wait() const;
const std::string& get_addr() const {
return addr_;
}
// Called in redis_pipeline_channel::wait_one().
size_t get_redirect_count() const {
return redirect_count_;
}
private:
redis_pipeline_type_t type_;
box<redis_pipeline_message>* box_;
int timeout_;
size_t nchild_;
dbuf_pool* dbuf_;
const string* req_;
const redis_result* result_;
size_t slot_;
std::string addr_;
size_t redirect_count_;
// The msg will be freed when refers_ is 0.
atomic_long refers_;
redis_pipeline_channel* channel_;
};
class redis_client_pipeline;
/**
* One pipeline channel thread for one redis node, which waits for message
* from pipline thread and try to combine more messages and sends to redis.
*/
class ACL_CPP_API redis_pipeline_channel : public thread {
public:
redis_pipeline_channel(redis_client_pipeline& pipeline,
const char* addr, int conn_timeout, int rw_timeout, bool retry);
~redis_pipeline_channel();
bool start_thread();
void stop_thread();
//public:
redis_pipeline_channel& set_ssl_conf(sslbase_conf* ssl_conf);
redis_pipeline_channel& set_passwd(const char* passwd);
const char* get_addr() const {
return addr_.c_str();
}
protected:
// @override from acl::thread
void* run();
private:
redis_client_pipeline& pipeline_;
string addr_;
string buf_;
redis_client* client_;
box<redis_pipeline_message>* box_;
std::list<redis_pipeline_message*> msgs_;
public:
void push(redis_pipeline_message* msg) const;
private:
bool handle_messages();
bool flush_requests();
bool wait_results();
const redis_result* get_result(socket_stream& conn,
redis_pipeline_message& msg) const;
bool handle_result(redis_pipeline_message* msg, const redis_result* result) const;
void all_failed();
};
/**
* Redis pipline communication, be set and used in redis_command to
* improve the performance of redis commands, but not all redis commands
* in acl can be used in pipeline mode, such as below:
* 1. multiple keys operation
* 2. blocked operation such as SUBSCRIBE in pubsub, BLPOP in list
*/
class ACL_CPP_API redis_client_pipeline : public thread {
public:
explicit redis_client_pipeline(const char* addr, box_type_t type = BOX_TYPE_MBOX);
virtual ~redis_client_pipeline();
// Start the pipeline thread
void start_thread();
// Stop the pipeline thread
void stop_thread();
//public:
// Called by redis_command in pipeline mode
const redis_result* exec(redis_pipeline_message* msg) const;
// Called by exec and application
void push(redis_pipeline_message* msg) const;
// Called by redis_command::get_pipeline_message, and can be overrided
// by child class. The box can be tbox, tbox_array, mbox, or fiber_tbox.
virtual box<redis_pipeline_message>* create_box();
//public:
// Set the ssl conf for the connection with redis internal.
redis_client_pipeline& set_ssl_conf(sslbase_conf* ssl_conf);
// Set the password for connecting the redis server
redis_client_pipeline& set_password(const char* passwd);
// Set network IO timeout
redis_client_pipeline& set_timeout(int conn_timeout, int rw_timeout);
// Set if retry on IO failed in redis_client
redis_client_pipeline& set_retry(bool on);
// Set the max hash slot of redis, the default valud is 16384
redis_client_pipeline& set_max_slot(int max_slot);
// Set if connecting all the redis nodes after starting
redis_client_pipeline& set_preconnect(bool yes);
// Get the max hash slot of redis
size_t get_max_slot() const {
return max_slot_;
}
protected:
// @override from acl::thread
void* run();
friend class redis_pipeline_channel;
// Called by redis_pipeline_channel
void notify(redis_pipeline_message* msg) const;
private:
string addr_; // The default redis address
string passwd_; // Password for connecting redis
sslbase_conf* ssl_conf_;// SSL will be used if not null
box_type_t box_type_; // The type of box
size_t max_slot_; // The max hash slot for redis cluster
int conn_timeout_; // Timeout to connect redis
int rw_timeout_; // IO timeout with redis
bool retry_; // If try again when disconnect from redis
bool preconn_; // If connecting all redis nodes when starting
token_tree* channels_; // holds and manage all pipeline channels
// The message queue for receiving redis message from other threads
box<redis_pipeline_message>* box_;
std::vector<char*> addrs_; // Hold all redis's addresses
const char** slot_addrs_; // Map hash slot with address
// Set the hash slot with the specified redis address
void set_slot(size_t slot, const char* addr);
// Set all hash slots' addresses of all redises
void set_all_slot();
// Start all pipeline channels threads
void start_channels();
// Stop all pipeline channels threads
void stop_channels() const;
// Start one pipeline channel thread with the specified redis address
redis_pipeline_channel* start_channel(const char* addr);
// Get one pipeline channel thread with the specified hash slot
redis_pipeline_channel* get_channel(size_t slot);
// Redirect one slot to another redis address
void redirect(const redis_pipeline_message& msg, size_t slot);
// When one redis node down, we should clear the node's hash slot map
// and stop the pipeline channel thread
void cluster_down(const redis_pipeline_message& msg);
// Stop one pipeline channel thread with the specified redis address,
// delete the channel thread after the thread exited.
void stop_channel(const char* addr) const;
// Delete the channel when one channel closed message got.
void channel_closed(redis_pipeline_channel* channel);
};
/**
* Sample:
* void main_thread() {
* acl::redis_client_pipeline pipeline("127.0.0.1:6379");
* pipeline.start_thread();
* // Start some threads
* ...
* // Wait for thease threads to exit and stop pipeline thread.
* pipeline.stop_thread();
* }
* // Execute redis command in one thread
* void test_thread(acl::redis_client_pipeline& pipeline) {
* acl::redis cmd;
* cmd.set_pipeline(&pipeline);
* acl::string key;
* for (size_t i = 0; i < 100000; i++) {
* key.format("test-key-%d", (int) i);
* cmd.del(key);
* }
*/
} // namespace acl
#endif // !defined(ACL_CLIENT_ONLY) && !defined(ACL_REDIS_DISABLE)