Skip to content

Commit 7e1e5e7

Browse files
author
xiaobiao
committed
feat: implement SPUBLISH command for Redis 7.0+ Sharded Pub/Sub
- Add SPUBLISH command in cmd_pubsub.cc - Implement Server::SPublish() method for message distribution - Add test cases in pubsubshard_test.go - Support cluster mode with slot routing - Add ok-loading flag for loading state execution
1 parent bf66a9e commit 7e1e5e7

4 files changed

Lines changed: 127 additions & 0 deletions

File tree

src/commands/cmd_pubsub.cc

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,22 @@ class CommandPubSub : public Commander {
252252
std::string subcommand_;
253253
};
254254

255+
class CommandSPublish : public Commander {
256+
public:
257+
Status Execute([[maybe_unused]] engine::Context &ctx, Server *srv, [[maybe_unused]] Connection *conn,
258+
std::string *output) override {
259+
uint16_t slot = 0;
260+
if (srv->GetConfig()->cluster_enabled) {
261+
slot = GetSlotIdFromKey(args_[1]);
262+
}
263+
264+
int receivers = srv->SPublish(args_[1], args_[2], slot);
265+
*output = redis::Integer(receivers);
266+
267+
return Status::OK();
268+
}
269+
};
270+
255271
REDIS_REGISTER_COMMANDS(Pubsub, MakeCmdAttr<CommandPublish>("publish", 3, "read-only", NO_KEY),
256272
MakeCmdAttr<CommandMPublish>("mpublish", -3, "read-only", NO_KEY),
257273
MakeCmdAttr<CommandSubscribe>("subscribe", -2, "read-only no-multi no-script", NO_KEY),
@@ -260,6 +276,7 @@ REDIS_REGISTER_COMMANDS(Pubsub, MakeCmdAttr<CommandPublish>("publish", 3, "read-
260276
MakeCmdAttr<CommandPUnSubscribe>("punsubscribe", -1, "read-only no-multi no-script", NO_KEY),
261277
MakeCmdAttr<CommandSSubscribe>("ssubscribe", -2, "read-only no-multi no-script", NO_KEY),
262278
MakeCmdAttr<CommandSUnSubscribe>("sunsubscribe", -1, "read-only no-multi no-script", NO_KEY),
279+
MakeCmdAttr<CommandSPublish>("spublish", 3, "read-only ok-loading", NO_KEY),
263280
MakeCmdAttr<CommandPubSub>("pubsub", -2, "read-only no-script", NO_KEY), )
264281

265282
} // namespace redis

src/server/server.cc

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -619,6 +619,33 @@ void Server::ListSChannelSubscribeNum(const std::vector<std::string> &channels,
619619
}
620620
}
621621

622+
int Server::SPublish(const std::string &channel, const std::string &msg, uint16_t slot) {
623+
assert((config_->cluster_enabled && slot < HASH_SLOTS_SIZE) || slot == 0);
624+
625+
int cnt = 0;
626+
std::lock_guard<std::mutex> guard(pubsub_shard_channels_mu_);
627+
628+
auto iter = pubsub_shard_channels_[slot].find(channel);
629+
if (iter == pubsub_shard_channels_[slot].end()) {
630+
return cnt;
631+
}
632+
633+
std::string reply;
634+
reply.append(redis::MultiLen(3));
635+
reply.append(redis::BulkString("smessage"));
636+
reply.append(redis::BulkString(channel));
637+
reply.append(redis::BulkString(msg));
638+
639+
for (const auto &conn_ctx : iter->second) {
640+
auto s = conn_ctx.owner->Reply(conn_ctx.fd, reply);
641+
if (s.IsOK()) {
642+
cnt++;
643+
}
644+
}
645+
646+
return cnt;
647+
}
648+
622649
void Server::BlockOnKey(const std::string &key, redis::Connection *conn) {
623650
std::lock_guard<std::mutex> guard(blocking_keys_mu_);
624651

src/server/server.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,7 @@ class Server {
227227
size_t GetPubSubPatternSize() const { return pubsub_patterns_.size(); }
228228
void SSubscribeChannel(const std::string &channel, redis::Connection *conn, uint16_t slot);
229229
void SUnsubscribeChannel(const std::string &channel, redis::Connection *conn, uint16_t slot);
230+
int SPublish(const std::string &channel, const std::string &msg, uint16_t slot);
230231
void GetSChannelsByPattern(const std::string &pattern, std::vector<std::string> *channels);
231232
void ListSChannelSubscribeNum(const std::vector<std::string> &channels,
232233
std::vector<ChannelSubscribeNum> *channel_subscribe_nums);

tests/gocase/unit/pubsub/pubsubshard_test.go

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,3 +162,85 @@ func TestPubSubShard(t *testing.T) {
162162
}
163163
})
164164
}
165+
166+
func TestSPublish(t *testing.T) {
167+
ctx := context.Background()
168+
169+
srv := util.StartServer(t, map[string]string{})
170+
defer srv.Close()
171+
rdb := srv.NewClient()
172+
defer func() { require.NoError(t, rdb.Close()) }()
173+
174+
t.Run("SPUBLISH to no subscribers", func(t *testing.T) {
175+
// Should return 0 when no subscribers
176+
n, err := rdb.Do(ctx, "SPUBLISH", "mychannel", "hello").Int()
177+
require.NoError(t, err)
178+
require.EqualValues(t, 0, n)
179+
})
180+
181+
t.Run("SPUBLISH to one subscriber", func(t *testing.T) {
182+
pubsub := rdb.SSubscribe(ctx, "mychannel")
183+
defer pubsub.Close()
184+
185+
// Receive the subscription message
186+
receiveType(t, pubsub, &redis.Subscription{})
187+
require.EqualValues(t, 1, receiveType(t, pubsub, &redis.Subscription{}).Count)
188+
189+
// Publish message
190+
n, err := rdb.Do(ctx, "SPUBLISH", "mychannel", "hello world").Int()
191+
require.NoError(t, err)
192+
require.EqualValues(t, 1, n)
193+
194+
// Receive the message
195+
msg := receiveType(t, pubsub, &redis.Message{})
196+
require.EqualValues(t, "mychannel", msg.Channel)
197+
require.EqualValues(t, "hello world", msg.Payload)
198+
})
199+
200+
t.Run("SPUBLISH to multiple subscribers", func(t *testing.T) {
201+
channel := "testchannel{tag}"
202+
203+
pubsub1 := rdb.SSubscribe(ctx, channel)
204+
defer pubsub1.Close()
205+
receiveType(t, pubsub1, &redis.Subscription{})
206+
207+
pubsub2 := rdb.SSubscribe(ctx, channel)
208+
defer pubsub2.Close()
209+
receiveType(t, pubsub2, &redis.Subscription{})
210+
211+
// Publish message
212+
n, err := rdb.Do(ctx, "SPUBLISH", channel, "message from spublish").Int()
213+
require.NoError(t, err)
214+
require.EqualValues(t, 2, n)
215+
216+
// Both subscribers should receive the message
217+
msg1 := receiveType(t, pubsub1, &redis.Message{})
218+
require.EqualValues(t, "message from spublish", msg1.Payload)
219+
220+
msg2 := receiveType(t, pubsub2, &redis.Message{})
221+
require.EqualValues(t, "message from spublish", msg2.Payload)
222+
})
223+
224+
t.Run("SPUBLISH with cluster enabled", func(t *testing.T) {
225+
csrv := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
226+
defer csrv.Close()
227+
crdb := csrv.NewClient()
228+
defer func() { require.NoError(t, crdb.Close()) }()
229+
230+
nodeID := "test_node_id_12345678901234567890123456789012"
231+
require.NoError(t, crdb.Do(ctx, "clusterx", "SETNODEID", nodeID).Err())
232+
clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383", nodeID, csrv.Host(), csrv.Port())
233+
require.NoError(t, crdb.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err())
234+
235+
pubsub := crdb.SSubscribe(ctx, "mychannel{tag}")
236+
defer pubsub.Close()
237+
receiveType(t, pubsub, &redis.Subscription{})
238+
239+
n, err := crdb.Do(ctx, "SPUBLISH", "mychannel{tag}", "cluster message").Int()
240+
require.NoError(t, err)
241+
require.EqualValues(t, 1, n)
242+
243+
msg := receiveType(t, pubsub, &redis.Message{})
244+
require.EqualValues(t, "cluster message", msg.Payload)
245+
})
246+
}

0 commit comments

Comments
 (0)