Skip to content

Commit 953d9ea

Browse files
ericsyhRobertIndie
andauthored
feat: support the namespace offloadThresholdInSeconds API in pulsaradmin pkg (#1271)
* feat: support offloadThresholdInSeconds API in pulsaradmin pkg Signed-off-by: ericsyh <ericshenyuhao@outlook.com> * Update pulsaradmin/pkg/admin/namespace.go Co-authored-by: Zike Yang <zike@apache.org> * Update pulsaradmin/pkg/admin/namespace.go Co-authored-by: Zike Yang <zike@apache.org> * add test Signed-off-by: ericsyh <ericshenyuhao@outlook.com> * fix test Signed-off-by: ericsyh <ericshenyuhao@outlook.com> --------- Signed-off-by: ericsyh <ericshenyuhao@outlook.com> Co-authored-by: Zike Yang <zike@apache.org>
1 parent 01e32e7 commit 953d9ea

2 files changed

Lines changed: 82 additions & 0 deletions

File tree

pulsaradmin/pkg/admin/namespace.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,12 @@ type Namespaces interface {
114114
// GetOffloadThreshold returns the offloadThreshold for a namespace
115115
GetOffloadThreshold(namespace utils.NameSpaceName) (int64, error)
116116

117+
// SetOffloadThresholdInSeconds sets the offloadThresholdInSeconds for a namespace
118+
SetOffloadThresholdInSeconds(namespace utils.NameSpaceName, threshold int64) error
119+
120+
// GetOffloadThresholdInSeconds returns the offloadThresholdInSeconds for a namespace
121+
GetOffloadThresholdInSeconds(namespace utils.NameSpaceName) (int64, error)
122+
117123
// SetCompactionThreshold sets the compactionThreshold for a namespace
118124
SetCompactionThreshold(namespace utils.NameSpaceName, threshold int64) error
119125

@@ -551,6 +557,18 @@ func (n *namespaces) GetOffloadThreshold(namespace utils.NameSpaceName) (int64,
551557
return result, err
552558
}
553559

560+
func (n *namespaces) SetOffloadThresholdInSeconds(namespace utils.NameSpaceName, threshold int64) error {
561+
endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "offloadThresholdInSeconds")
562+
return n.pulsar.Client.Put(endpoint, threshold)
563+
}
564+
565+
func (n *namespaces) GetOffloadThresholdInSeconds(namespace utils.NameSpaceName) (int64, error) {
566+
var result int64
567+
endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "offloadThresholdInSeconds")
568+
err := n.pulsar.Client.Get(endpoint, &result)
569+
return result, err
570+
}
571+
554572
func (n *namespaces) SetMaxConsumersPerTopic(namespace utils.NameSpaceName, max int) error {
555573
endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "maxConsumersPerTopic")
556574
return n.pulsar.Client.Post(endpoint, max)

pulsaradmin/pkg/admin/namespace_test.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,3 +277,67 @@ func TestNamespaces_GetSubscriptionExpirationTime(t *testing.T) {
277277
expected = -1
278278
assert.Equal(t, expected, subscriptionExpirationTime)
279279
}
280+
281+
func TestNamespaces_SetOffloadThresholdInSeconds(t *testing.T) {
282+
config := &config.Config{}
283+
admin, err := New(config)
284+
require.NoError(t, err)
285+
require.NotNil(t, admin)
286+
287+
tests := []struct {
288+
name string
289+
namespace string
290+
threshold int64
291+
errReason string
292+
}{
293+
{
294+
name: "Set valid offloadThresholdInSecond",
295+
namespace: "public/default",
296+
threshold: 60,
297+
errReason: "",
298+
},
299+
{
300+
name: "Set invalid offloadThresholdInSecond",
301+
namespace: "public/default",
302+
threshold: -60,
303+
errReason: "Invalid value for offloadThresholdInSecond",
304+
},
305+
{
306+
name: "Set valid offloadThresholdInSecond: 0",
307+
namespace: "public/default",
308+
threshold: 0,
309+
errReason: "",
310+
},
311+
}
312+
for _, tt := range tests {
313+
t.Run(tt.name, func(t *testing.T) {
314+
namespace, _ := utils.GetNamespaceName(tt.namespace)
315+
err := admin.Namespaces().SetOffloadThresholdInSeconds(*namespace, tt.threshold)
316+
if tt.errReason == "" {
317+
assert.Equal(t, nil, err)
318+
}
319+
if err != nil {
320+
restError := err.(rest.Error)
321+
assert.Equal(t, tt.errReason, restError.Reason)
322+
}
323+
})
324+
}
325+
}
326+
327+
func TestNamespaces_GetOffloadThresholdInSeconds(t *testing.T) {
328+
config := &config.Config{}
329+
admin, err := New(config)
330+
require.NoError(t, err)
331+
require.NotNil(t, admin)
332+
333+
namespace, _ := utils.GetNamespaceName("public/default")
334+
335+
// set the subscription expiration time and get it
336+
err = admin.Namespaces().SetOffloadThresholdInSeconds(*namespace,
337+
60)
338+
assert.Equal(t, nil, err)
339+
offloadThresholdInSeconds, err := admin.Namespaces().GetOffloadThresholdInSeconds(*namespace)
340+
assert.Equal(t, nil, err)
341+
expected := int64(60)
342+
assert.Equal(t, expected, offloadThresholdInSeconds)
343+
}

0 commit comments

Comments
 (0)