Skip to content

Commit c718ea4

Browse files
committed
feat(topic): add subscription dispatch rate commands
1 parent b9ee88f commit c718ea4

5 files changed

Lines changed: 382 additions & 2 deletions
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package topic
19+
20+
import (
21+
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
22+
23+
"github.com/streamnative/pulsarctl/pkg/cmdutils"
24+
)
25+
26+
func GetSubscriptionDispatchRateCmd(vc *cmdutils.VerbCmd) {
27+
desc := cmdutils.LongDescription{}
28+
desc.CommandUsedFor = "Get subscription message dispatch rate for a topic"
29+
desc.CommandPermission = "This command requires tenant admin permissions."
30+
31+
var examples []cmdutils.Example
32+
msg := cmdutils.Example{
33+
Desc: "Get subscription message dispatch rate for a topic",
34+
Command: "pulsarctl topics get-subscription-dispatch-rate topic",
35+
}
36+
examples = append(examples, msg)
37+
desc.CommandExamples = examples
38+
39+
var out []cmdutils.Output
40+
successOut := cmdutils.Output{
41+
Desc: "normal output",
42+
Out: "Get subscription message dispatch rate successfully for [topic]",
43+
}
44+
out = append(out, successOut, ArgError)
45+
out = append(out, TopicNameErrors...)
46+
out = append(out, TopicLevelPolicyNotEnabledError)
47+
out = append(out, NamespaceErrors...)
48+
desc.CommandOutput = out
49+
50+
vc.SetDescription(
51+
"get-subscription-dispatch-rate",
52+
"Get subscription message dispatch rate for a topic",
53+
desc.ToString(),
54+
desc.ExampleToString(),
55+
"get-subscription-dispatch-rate",
56+
)
57+
58+
vc.SetRunFuncWithNameArg(func() error {
59+
return doGetSubscriptionDispatchRate(vc)
60+
}, "the topic name is not specified or the topic name is specified more than one")
61+
62+
vc.EnableOutputFlagSet()
63+
}
64+
65+
func doGetSubscriptionDispatchRate(vc *cmdutils.VerbCmd) error {
66+
// for testing
67+
if vc.NameError != nil {
68+
return vc.NameError
69+
}
70+
71+
topic, err := utils.GetTopicName(vc.NameArg)
72+
if err != nil {
73+
return err
74+
}
75+
76+
admin := cmdutils.NewPulsarClient()
77+
dispatchRateData, err := admin.Topics().GetSubscriptionDispatchRate(*topic)
78+
if err == nil {
79+
oc := cmdutils.NewOutputContent().WithObject(dispatchRateData)
80+
err = vc.OutputConfig.WriteOutput(vc.Command.OutOrStdout(), oc)
81+
}
82+
return err
83+
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package topic
19+
20+
import (
21+
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
22+
23+
"github.com/streamnative/pulsarctl/pkg/cmdutils"
24+
)
25+
26+
func RemoveSubscriptionDispatchRateCmd(vc *cmdutils.VerbCmd) {
27+
desc := cmdutils.LongDescription{}
28+
desc.CommandUsedFor = "Remove subscription message dispatch rate for a topic"
29+
desc.CommandPermission = "This command requires tenant admin permissions."
30+
31+
var examples []cmdutils.Example
32+
msg := cmdutils.Example{
33+
Desc: "Remove subscription message dispatch rate for a topic",
34+
Command: "pulsarctl topics remove-subscription-dispatch-rate topic",
35+
}
36+
examples = append(examples, msg)
37+
desc.CommandExamples = examples
38+
39+
var out []cmdutils.Output
40+
successOut := cmdutils.Output{
41+
Desc: "normal output",
42+
Out: "Remove subscription message dispatch rate successfully for [topic]",
43+
}
44+
out = append(out, successOut, ArgError)
45+
out = append(out, TopicNameErrors...)
46+
out = append(out, TopicLevelPolicyNotEnabledError)
47+
out = append(out, NamespaceErrors...)
48+
desc.CommandOutput = out
49+
50+
vc.SetDescription(
51+
"remove-subscription-dispatch-rate",
52+
"Remove subscription message dispatch rate for a topic",
53+
desc.ToString(),
54+
desc.ExampleToString(),
55+
"remove-subscription-dispatch-rate",
56+
)
57+
58+
vc.SetRunFuncWithNameArg(func() error {
59+
return doRemoveSubscriptionDispatchRate(vc)
60+
}, "the topic name is not specified or the topic name is specified more than one")
61+
}
62+
63+
func doRemoveSubscriptionDispatchRate(vc *cmdutils.VerbCmd) error {
64+
// for testing
65+
if vc.NameError != nil {
66+
return vc.NameError
67+
}
68+
69+
topic, err := utils.GetTopicName(vc.NameArg)
70+
if err != nil {
71+
return err
72+
}
73+
74+
admin := cmdutils.NewPulsarClient()
75+
err = admin.Topics().RemoveSubscriptionDispatchRate(*topic)
76+
if err == nil {
77+
vc.Command.Printf("Remove subscription message dispatch rate successfully for [%s]\n", topic.String())
78+
}
79+
return err
80+
}
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package topic
19+
20+
import (
21+
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
22+
"github.com/spf13/pflag"
23+
24+
"github.com/streamnative/pulsarctl/pkg/cmdutils"
25+
)
26+
27+
func SetSubscriptionDispatchRateCmd(vc *cmdutils.VerbCmd) {
28+
desc := cmdutils.LongDescription{}
29+
desc.CommandUsedFor = "Set subscription message dispatch rate for a topic"
30+
desc.CommandPermission = "This command requires tenant admin permissions."
31+
32+
var examples []cmdutils.Example
33+
msg := cmdutils.Example{
34+
Desc: "Set subscription message dispatch rate for a topic",
35+
Command: "pulsarctl topics set-subscription-dispatch-rate topic " +
36+
"--msg-dispatch-rate 4 --byte-dispatch-rate 5 --dispatch-rate-period 6 --relative-to-publish-rate",
37+
}
38+
examples = append(examples, msg)
39+
desc.CommandExamples = examples
40+
41+
var out []cmdutils.Output
42+
successOut := cmdutils.Output{
43+
Desc: "normal output",
44+
Out: "Set subscription message dispatch rate successfully for [topic]",
45+
}
46+
out = append(out, successOut, ArgError)
47+
out = append(out, TopicNameErrors...)
48+
out = append(out, TopicLevelPolicyNotEnabledError)
49+
out = append(out, NamespaceErrors...)
50+
desc.CommandOutput = out
51+
52+
vc.SetDescription(
53+
"set-subscription-dispatch-rate",
54+
"Set subscription message dispatch rate for a topic",
55+
desc.ToString(),
56+
desc.ExampleToString(),
57+
"set-subscription-dispatch-rate",
58+
)
59+
dispatchRateData := &utils.DispatchRateData{}
60+
vc.SetRunFuncWithNameArg(func() error {
61+
return doSetSubscriptionDispatchRate(vc, dispatchRateData)
62+
}, "the topic name is not specified or the topic name is specified more than one")
63+
64+
vc.FlagSetGroup.InFlagSet("SubscriptionDispatchRate", func(set *pflag.FlagSet) {
65+
set.Int64VarP(
66+
&dispatchRateData.DispatchThrottlingRateInMsg,
67+
"msg-dispatch-rate",
68+
"",
69+
-1,
70+
"message-dispatch-rate (default -1 will be overwrite if not passed)")
71+
set.Int64VarP(
72+
&dispatchRateData.DispatchThrottlingRateInByte,
73+
"byte-dispatch-rate",
74+
"",
75+
-1,
76+
"byte-dispatch-rate (default -1 will be overwrite if not passed)")
77+
set.Int64VarP(
78+
&dispatchRateData.RatePeriodInSecond,
79+
"dispatch-rate-period",
80+
"",
81+
1,
82+
"dispatch-rate-period in second type (default 1 second will be overwrite if not passed)")
83+
set.BoolVarP(
84+
&dispatchRateData.RelativeToPublishRate,
85+
"relative-to-publish-rate",
86+
"",
87+
false,
88+
"dispatch rate relative to publish-rate (if publish-relative flag is enabled "+
89+
"then broker will apply throttling value to (publish-rate + dispatch rate))")
90+
})
91+
vc.EnableOutputFlagSet()
92+
}
93+
94+
func doSetSubscriptionDispatchRate(vc *cmdutils.VerbCmd, dispatchRateData *utils.DispatchRateData) error {
95+
// for testing
96+
if vc.NameError != nil {
97+
return vc.NameError
98+
}
99+
100+
topic, err := utils.GetTopicName(vc.NameArg)
101+
if err != nil {
102+
return err
103+
}
104+
admin := cmdutils.NewPulsarClient()
105+
err = admin.Topics().SetSubscriptionDispatchRate(*topic, *dispatchRateData)
106+
if err == nil {
107+
vc.Command.Printf("Set subscription message dispatch rate successfully for [%s]\n", topic.String())
108+
}
109+
return err
110+
}
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package topic
19+
20+
import (
21+
"encoding/json"
22+
"testing"
23+
24+
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
25+
"github.com/onsi/gomega"
26+
)
27+
28+
func TestSubscriptionDispatchRate(t *testing.T) {
29+
g := gomega.NewWithT(t)
30+
31+
topicName := "persistent://public/default/test-subscription-dispatch-rate-topic"
32+
args := []string{"create", topicName, "1"}
33+
_, execErr, _, _ := TestTopicCommands(CreateTopicCmd, args)
34+
g.Expect(execErr).Should(gomega.BeNil())
35+
36+
getArgs := []string{"get-subscription-dispatch-rate", topicName}
37+
getOut, execErr, _, _ := TestTopicCommands(GetSubscriptionDispatchRateCmd, getArgs)
38+
g.Expect(execErr).Should(gomega.BeNil())
39+
g.Expect(getOut.String()).Should(gomega.Equal("null"))
40+
41+
setArgs := []string{"set-subscription-dispatch-rate", topicName, "--msg-dispatch-rate", "5",
42+
"--byte-dispatch-rate", "4", "--dispatch-rate-period", "3", "--relative-to-publish-rate"}
43+
setOut, execErr, _, _ := TestTopicCommands(SetSubscriptionDispatchRateCmd, setArgs)
44+
g.Expect(execErr).Should(gomega.BeNil())
45+
g.Expect(setOut.String()).Should(gomega.Equal(
46+
"Set subscription message dispatch rate successfully for [" + topicName + "]\n"))
47+
48+
g.Eventually(func(g gomega.Gomega) {
49+
getOut, execErr, _, _ := TestTopicCommands(GetSubscriptionDispatchRateCmd, getArgs)
50+
g.Expect(execErr).Should(gomega.BeNil())
51+
var dispatchRateData utils.DispatchRateData
52+
err := json.Unmarshal(getOut.Bytes(), &dispatchRateData)
53+
g.Expect(err).Should(gomega.BeNil())
54+
g.Expect(dispatchRateData.DispatchThrottlingRateInMsg).Should(gomega.Equal(int64(5)))
55+
g.Expect(dispatchRateData.DispatchThrottlingRateInByte).Should(gomega.Equal(int64(4)))
56+
g.Expect(dispatchRateData.RatePeriodInSecond).Should(gomega.Equal(int64(3)))
57+
g.Expect(dispatchRateData.RelativeToPublishRate).Should(gomega.Equal(true))
58+
}).Should(gomega.Succeed())
59+
60+
setArgs = []string{"remove-subscription-dispatch-rate", topicName}
61+
setOut, execErr, _, _ = TestTopicCommands(RemoveSubscriptionDispatchRateCmd, setArgs)
62+
g.Expect(execErr).Should(gomega.BeNil())
63+
g.Expect(setOut.String()).Should(gomega.Equal(
64+
"Remove subscription message dispatch rate successfully for [" + topicName + "]\n"))
65+
66+
g.Eventually(func(g gomega.Gomega) {
67+
getOut, execErr, _, _ := TestTopicCommands(GetSubscriptionDispatchRateCmd, getArgs)
68+
g.Expect(execErr).Should(gomega.BeNil())
69+
g.Expect(getOut.String()).Should(gomega.Equal("null"))
70+
}).Should(gomega.Succeed())
71+
72+
setArgs = []string{"set-subscription-dispatch-rate", topicName, "--msg-dispatch-rate", "5",
73+
"--byte-dispatch-rate", "4", "--dispatch-rate-period", "3"}
74+
setOut, execErr, _, _ = TestTopicCommands(SetSubscriptionDispatchRateCmd, setArgs)
75+
g.Expect(execErr).Should(gomega.BeNil())
76+
g.Expect(setOut.String()).Should(gomega.Equal(
77+
"Set subscription message dispatch rate successfully for [" + topicName + "]\n"))
78+
79+
g.Eventually(func(g gomega.Gomega) {
80+
getOut, execErr, _, _ := TestTopicCommands(GetSubscriptionDispatchRateCmd, getArgs)
81+
g.Expect(execErr).Should(gomega.BeNil())
82+
var dispatchRateData utils.DispatchRateData
83+
err := json.Unmarshal(getOut.Bytes(), &dispatchRateData)
84+
g.Expect(err).Should(gomega.BeNil())
85+
g.Expect(dispatchRateData.DispatchThrottlingRateInMsg).Should(gomega.Equal(int64(5)))
86+
g.Expect(dispatchRateData.DispatchThrottlingRateInByte).Should(gomega.Equal(int64(4)))
87+
g.Expect(dispatchRateData.RatePeriodInSecond).Should(gomega.Equal(int64(3)))
88+
g.Expect(dispatchRateData.RelativeToPublishRate).Should(gomega.Equal(false))
89+
}).Should(gomega.Succeed())
90+
}
91+
92+
func TestSetSubscriptionDispatchRateOnNonExistingTopic(t *testing.T) {
93+
g := gomega.NewWithT(t)
94+
95+
args := []string{"set-subscription-dispatch-rate", "persistent://public/default/non-existing-subscription-dispatch-rate-topic"}
96+
_, execErr, _, _ := TestTopicCommands(SetSubscriptionDispatchRateCmd, args)
97+
g.Expect(execErr).ShouldNot(gomega.BeNil())
98+
}
99+
100+
func TestGetSubscriptionDispatchRateOnNonExistingTopic(t *testing.T) {
101+
g := gomega.NewWithT(t)
102+
103+
args := []string{"get-subscription-dispatch-rate", "persistent://public/default/non-existing-subscription-dispatch-rate-topic"}
104+
_, execErr, _, _ := TestTopicCommands(GetSubscriptionDispatchRateCmd, args)
105+
g.Expect(execErr).ShouldNot(gomega.BeNil())
106+
}

0 commit comments

Comments
 (0)