Skip to content

Commit 6c4f231

Browse files
authored
feat(topic): add subscription dispatch rate commands (#2013)
* feat(topic): add subscription dispatch rate commands * docs(topic): clarify default values in rate commands
1 parent b9ee88f commit 6c4f231

7 files changed

Lines changed: 387 additions & 7 deletions
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package topic
19+
20+
import (
21+
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
22+
23+
"github.com/streamnative/pulsarctl/pkg/cmdutils"
24+
)
25+
26+
func GetSubscriptionDispatchRateCmd(vc *cmdutils.VerbCmd) {
27+
desc := cmdutils.LongDescription{}
28+
desc.CommandUsedFor = "Get subscription message dispatch rate for a topic"
29+
desc.CommandPermission = "This command requires tenant admin permissions."
30+
31+
var examples []cmdutils.Example
32+
msg := cmdutils.Example{
33+
Desc: "Get subscription message dispatch rate for a topic",
34+
Command: "pulsarctl topics get-subscription-dispatch-rate topic",
35+
}
36+
examples = append(examples, msg)
37+
desc.CommandExamples = examples
38+
39+
var out []cmdutils.Output
40+
successOut := cmdutils.Output{
41+
Desc: "normal output",
42+
Out: "Get subscription message dispatch rate successfully for [topic]",
43+
}
44+
out = append(out, successOut, ArgError)
45+
out = append(out, TopicNameErrors...)
46+
out = append(out, TopicLevelPolicyNotEnabledError)
47+
out = append(out, NamespaceErrors...)
48+
desc.CommandOutput = out
49+
50+
vc.SetDescription(
51+
"get-subscription-dispatch-rate",
52+
"Get subscription message dispatch rate for a topic",
53+
desc.ToString(),
54+
desc.ExampleToString(),
55+
"get-subscription-dispatch-rate",
56+
)
57+
58+
vc.SetRunFuncWithNameArg(func() error {
59+
return doGetSubscriptionDispatchRate(vc)
60+
}, "the topic name is not specified or the topic name is specified more than one")
61+
62+
vc.EnableOutputFlagSet()
63+
}
64+
65+
func doGetSubscriptionDispatchRate(vc *cmdutils.VerbCmd) error {
66+
// for testing
67+
if vc.NameError != nil {
68+
return vc.NameError
69+
}
70+
71+
topic, err := utils.GetTopicName(vc.NameArg)
72+
if err != nil {
73+
return err
74+
}
75+
76+
admin := cmdutils.NewPulsarClient()
77+
dispatchRateData, err := admin.Topics().GetSubscriptionDispatchRate(*topic)
78+
if err == nil {
79+
oc := cmdutils.NewOutputContent().WithObject(dispatchRateData)
80+
err = vc.OutputConfig.WriteOutput(vc.Command.OutOrStdout(), oc)
81+
}
82+
return err
83+
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package topic
19+
20+
import (
21+
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
22+
23+
"github.com/streamnative/pulsarctl/pkg/cmdutils"
24+
)
25+
26+
func RemoveSubscriptionDispatchRateCmd(vc *cmdutils.VerbCmd) {
27+
desc := cmdutils.LongDescription{}
28+
desc.CommandUsedFor = "Remove subscription message dispatch rate for a topic"
29+
desc.CommandPermission = "This command requires tenant admin permissions."
30+
31+
var examples []cmdutils.Example
32+
msg := cmdutils.Example{
33+
Desc: "Remove subscription message dispatch rate for a topic",
34+
Command: "pulsarctl topics remove-subscription-dispatch-rate topic",
35+
}
36+
examples = append(examples, msg)
37+
desc.CommandExamples = examples
38+
39+
var out []cmdutils.Output
40+
successOut := cmdutils.Output{
41+
Desc: "normal output",
42+
Out: "Remove subscription message dispatch rate successfully for [topic]",
43+
}
44+
out = append(out, successOut, ArgError)
45+
out = append(out, TopicNameErrors...)
46+
out = append(out, TopicLevelPolicyNotEnabledError)
47+
out = append(out, NamespaceErrors...)
48+
desc.CommandOutput = out
49+
50+
vc.SetDescription(
51+
"remove-subscription-dispatch-rate",
52+
"Remove subscription message dispatch rate for a topic",
53+
desc.ToString(),
54+
desc.ExampleToString(),
55+
"remove-subscription-dispatch-rate",
56+
)
57+
58+
vc.SetRunFuncWithNameArg(func() error {
59+
return doRemoveSubscriptionDispatchRate(vc)
60+
}, "the topic name is not specified or the topic name is specified more than one")
61+
}
62+
63+
func doRemoveSubscriptionDispatchRate(vc *cmdutils.VerbCmd) error {
64+
// for testing
65+
if vc.NameError != nil {
66+
return vc.NameError
67+
}
68+
69+
topic, err := utils.GetTopicName(vc.NameArg)
70+
if err != nil {
71+
return err
72+
}
73+
74+
admin := cmdutils.NewPulsarClient()
75+
err = admin.Topics().RemoveSubscriptionDispatchRate(*topic)
76+
if err == nil {
77+
vc.Command.Printf("Remove subscription message dispatch rate successfully for [%s]\n", topic.String())
78+
}
79+
return err
80+
}

pkg/ctl/topic/set_dispatch_rate.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,19 +66,19 @@ func SetDispatchRateCmd(vc *cmdutils.VerbCmd) {
6666
"msg-dispatch-rate",
6767
"",
6868
-1,
69-
"message-dispatch-rate (default -1 will be overwrite if not passed)")
69+
"message-dispatch-rate (defaults to -1 and overwrites the existing value when omitted)")
7070
set.Int64VarP(
7171
&dispatchRateData.DispatchThrottlingRateInByte,
7272
"byte-dispatch-rate",
7373
"",
7474
-1,
75-
"byte-dispatch-rate (default -1 will be overwrite if not passed)")
75+
"byte-dispatch-rate (defaults to -1 and overwrites the existing value when omitted)")
7676
set.Int64VarP(
7777
&dispatchRateData.RatePeriodInSecond,
7878
"dispatch-rate-period",
7979
"",
8080
1,
81-
"dispatch-rate-period in second type (default 1 second will be overwrite if not passed)")
81+
"dispatch-rate-period in second type (defaults to 1 second and overwrites the existing value when omitted)")
8282
set.BoolVarP(
8383
&dispatchRateData.RelativeToPublishRate,
8484
"relative-to-publish-rate",

pkg/ctl/topic/set_publish_rate.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,13 +67,13 @@ func SetPublishRateCmd(vc *cmdutils.VerbCmd) {
6767
"msg-publish-rate",
6868
"",
6969
-1,
70-
"message-publish-rate (default -1 will be overwrite if not passed)")
70+
"message-publish-rate (defaults to -1 and overwrites the existing value when omitted)")
7171
set.Int64VarP(
7272
&publishRateData.PublishThrottlingRateInByte,
7373
"byte-publish-rate",
7474
"",
7575
-1,
76-
"byte-publish-rate (default -1 will be overwrite if not passed)")
76+
"byte-publish-rate (defaults to -1 and overwrites the existing value when omitted)")
7777
})
7878
vc.EnableOutputFlagSet()
7979
}
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package topic
19+
20+
import (
21+
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
22+
"github.com/spf13/pflag"
23+
24+
"github.com/streamnative/pulsarctl/pkg/cmdutils"
25+
)
26+
27+
func SetSubscriptionDispatchRateCmd(vc *cmdutils.VerbCmd) {
28+
desc := cmdutils.LongDescription{}
29+
desc.CommandUsedFor = "Set subscription message dispatch rate for a topic"
30+
desc.CommandPermission = "This command requires tenant admin permissions."
31+
32+
var examples []cmdutils.Example
33+
msg := cmdutils.Example{
34+
Desc: "Set subscription message dispatch rate for a topic",
35+
Command: "pulsarctl topics set-subscription-dispatch-rate topic " +
36+
"--msg-dispatch-rate 4 --byte-dispatch-rate 5 --dispatch-rate-period 6 --relative-to-publish-rate",
37+
}
38+
examples = append(examples, msg)
39+
desc.CommandExamples = examples
40+
41+
var out []cmdutils.Output
42+
successOut := cmdutils.Output{
43+
Desc: "normal output",
44+
Out: "Set subscription message dispatch rate successfully for [topic]",
45+
}
46+
out = append(out, successOut, ArgError)
47+
out = append(out, TopicNameErrors...)
48+
out = append(out, TopicLevelPolicyNotEnabledError)
49+
out = append(out, NamespaceErrors...)
50+
desc.CommandOutput = out
51+
52+
vc.SetDescription(
53+
"set-subscription-dispatch-rate",
54+
"Set subscription message dispatch rate for a topic",
55+
desc.ToString(),
56+
desc.ExampleToString(),
57+
"set-subscription-dispatch-rate",
58+
)
59+
dispatchRateData := &utils.DispatchRateData{}
60+
vc.SetRunFuncWithNameArg(func() error {
61+
return doSetSubscriptionDispatchRate(vc, dispatchRateData)
62+
}, "the topic name is not specified or the topic name is specified more than one")
63+
64+
vc.FlagSetGroup.InFlagSet("SubscriptionDispatchRate", func(set *pflag.FlagSet) {
65+
set.Int64VarP(
66+
&dispatchRateData.DispatchThrottlingRateInMsg,
67+
"msg-dispatch-rate",
68+
"",
69+
-1,
70+
"message-dispatch-rate (defaults to -1 and overwrites the existing value when omitted)")
71+
set.Int64VarP(
72+
&dispatchRateData.DispatchThrottlingRateInByte,
73+
"byte-dispatch-rate",
74+
"",
75+
-1,
76+
"byte-dispatch-rate (defaults to -1 and overwrites the existing value when omitted)")
77+
set.Int64VarP(
78+
&dispatchRateData.RatePeriodInSecond,
79+
"dispatch-rate-period",
80+
"",
81+
1,
82+
"dispatch-rate-period in second type (defaults to 1 second and overwrites the existing value when omitted)")
83+
set.BoolVarP(
84+
&dispatchRateData.RelativeToPublishRate,
85+
"relative-to-publish-rate",
86+
"",
87+
false,
88+
"dispatch rate relative to publish-rate (if publish-relative flag is enabled "+
89+
"then broker will apply throttling value to (publish-rate + dispatch rate))")
90+
})
91+
vc.EnableOutputFlagSet()
92+
}
93+
94+
func doSetSubscriptionDispatchRate(vc *cmdutils.VerbCmd, dispatchRateData *utils.DispatchRateData) error {
95+
// for testing
96+
if vc.NameError != nil {
97+
return vc.NameError
98+
}
99+
100+
topic, err := utils.GetTopicName(vc.NameArg)
101+
if err != nil {
102+
return err
103+
}
104+
admin := cmdutils.NewPulsarClient()
105+
err = admin.Topics().SetSubscriptionDispatchRate(*topic, *dispatchRateData)
106+
if err == nil {
107+
vc.Command.Printf("Set subscription message dispatch rate successfully for [%s]\n", topic.String())
108+
}
109+
return err
110+
}

0 commit comments

Comments
 (0)