Skip to content

Commit 1d8e123

Browse files
committed
feat: add StreamNative Cloud context reset
1 parent 4a78d4a commit 1d8e123

13 files changed

Lines changed: 339 additions & 18 deletions

README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ If you want to access to your StreamNative Cloud, you will need to have followin
9797
bin/snmcp stdio --organization my-org --key-file /path/to/key-file.json
9898

9999
# Start MCP server with StreamNative Cloud authentication and pre-configured context
100-
# When --pulsar-instance and --pulsar-cluster are provided, context management tools are disabled
100+
# When --pulsar-instance and --pulsar-cluster are provided, context mutation tools are disabled
101101
bin/snmcp stdio --organization my-org --key-file /path/to/key-file.json --pulsar-instance my-instance --pulsar-cluster my-cluster
102102

103103
# Start MCP server with external Kafka
@@ -118,7 +118,7 @@ docker run -i --rm -e SNMCP_ORGANIZATION=my-org -e SNMCP_KEY_FILE=/key.json -v /
118118
bin/snmcp sse --http-addr :9090 --http-path /mcp --organization my-org --key-file /path/to/key-file.json
119119

120120
# Start MCP server with SSE and pre-configured StreamNative Cloud context
121-
# When --pulsar-instance and --pulsar-cluster are provided, context management tools are disabled
121+
# When --pulsar-instance and --pulsar-cluster are provided, context mutation tools are disabled
122122
bin/snmcp sse --http-addr :9090 --http-path /mcp --organization my-org --key-file /path/to/key-file.json --pulsar-instance my-instance --pulsar-cluster my-cluster
123123

124124
# Start MCP server with SSE and external Kafka
@@ -292,7 +292,7 @@ Pulsar admin feature gates also register read-only MCP resources for the matchin
292292
| `streamnative-cloud`| Manage StreamNative Cloud context and check resource logs | [streamnative_cloud.md](docs/tools/streamnative_cloud.md) |
293293
| `functions-as-tools` | Dynamically exposes deployed Pulsar Functions as invokable MCP tools, with automatic input/output schema handling. | [functions_as_tools.md](docs/tools/functions_as_tools.md) |
294294

295-
> **Note:** When using `--pulsar-instance` and `--pulsar-cluster` flags together, context management tools (`sncloud_context_use_cluster`) are automatically disabled since the context is pre-configured.
295+
> **Note:** When using `--pulsar-instance` and `--pulsar-cluster` flags together, context mutation tools (`sncloud_context_use_cluster`, `sncloud_context_reset`) are automatically disabled since the context is pre-configured.
296296
297297
You can combine these features as needed using the `--features` flag. For example, to enable only Pulsar client features:
298298
```bash

docs/tools/streamnative_cloud.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,15 @@ If you encounter `ContextNotSetErr`, use `sncloud_context_available_clusters` to
2121

2222
---
2323

24+
#### sncloud_context_reset
25+
26+
Reset the current StreamNative Cloud cluster context. After reset, the session has no bound Pulsar or Kafka cluster connection, and cluster-specific tools will return `ContextNotSetErr` until `sncloud_context_use_cluster` is used again.
27+
28+
- **sncloud_context_reset**
29+
- No parameters required
30+
31+
---
32+
2433
#### sncloud_context_whoami
2534

2635
Display the currently logged-in service account. Returns the name of the authenticated service account and the organization.

pkg/cmd/mcp/stdio.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"github.com/spf13/cobra"
3131
"github.com/streamnative/streamnative-mcp-server/pkg/common"
3232
"github.com/streamnative/streamnative-mcp-server/pkg/log"
33+
mcpctx "github.com/streamnative/streamnative-mcp-server/pkg/mcp"
3334
)
3435

3536
// NewCmdMcpStdioServer builds the stdio server command.
@@ -69,6 +70,15 @@ func runStdioServer(configOpts *ServerOptions) error {
6970
return fmt.Errorf("failed to create MCP server: %w", err)
7071
}
7172

73+
ctx = mcpctx.WithSNCloudSession(ctx, mcpServer.SNCloudSession)
74+
ctx = mcpctx.WithPulsarSession(ctx, mcpServer.PulsarSession)
75+
ctx = mcpctx.WithKafkaSession(ctx, mcpServer.KafkaSession)
76+
if configOpts.KeyFile != "" && configOpts.PulsarInstance != "" && configOpts.PulsarCluster != "" {
77+
if err := mcpctx.SetContext(ctx, configOpts.Options, configOpts.PulsarInstance, configOpts.PulsarCluster); err != nil {
78+
return fmt.Errorf("failed to set StreamNative Cloud context: %w", err)
79+
}
80+
}
81+
7282
stdioServer := server.NewStdioServer(mcpServer.MCPServer)
7383
stdioServer.SetErrorLogger(stdLogger)
7484

pkg/config/apiclient.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,28 @@ func NewSNCloudSessionFromOptions(options *Options) (*Session, error) {
155155
return session, nil
156156
}
157157

158+
// SetPulsarClusterContext updates the StreamNative Cloud cluster binding for the session.
159+
func (s *Session) SetPulsarClusterContext(instance, cluster string) {
160+
s.mutex.Lock()
161+
defer s.mutex.Unlock()
162+
163+
s.Ctx.PulsarInstance = instance
164+
s.Ctx.PulsarCluster = cluster
165+
}
166+
167+
// ResetPulsarClusterContext clears the StreamNative Cloud cluster binding for the session.
168+
func (s *Session) ResetPulsarClusterContext() {
169+
s.SetPulsarClusterContext("", "")
170+
}
171+
172+
// GetPulsarClusterContext returns the current StreamNative Cloud cluster binding.
173+
func (s *Session) GetPulsarClusterContext() (string, string) {
174+
s.mutex.RLock()
175+
defer s.mutex.RUnlock()
176+
177+
return s.Ctx.PulsarInstance, s.Ctx.PulsarCluster
178+
}
179+
158180
// initializeTokenRefresher initializes the token refresher for the session
159181
func (s *Session) initializeTokenRefresher() error {
160182
s.mutex.Lock()

pkg/kafka/connection.go

Lines changed: 40 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,9 @@ func saslOpt(config *SASLConfig, opts []kgo.Opt) ([]kgo.Opt, error) {
149149

150150
// SetKafkaContext initializes Kafka clients using the provided context.
151151
func (s *Session) SetKafkaContext(ctx KafkaContext) error {
152+
s.mutex.Lock()
153+
defer s.mutex.Unlock()
154+
152155
s.Ctx = ctx
153156
kc := &s.Ctx
154157
var err error
@@ -209,11 +212,32 @@ func (s *Session) SetKafkaContext(ctx KafkaContext) error {
209212
return nil
210213
}
211214

215+
// ResetKafkaContext clears the current Kafka context and closes the data client.
216+
func (s *Session) ResetKafkaContext() {
217+
s.mutex.Lock()
218+
defer s.mutex.Unlock()
219+
220+
if s.Client != nil {
221+
s.Client.Close()
222+
}
223+
224+
s.Ctx = KafkaContext{}
225+
s.Client = nil
226+
s.AdminClient = nil
227+
s.SchemaRegistryClient = nil
228+
s.ConnectClient = nil
229+
s.Options = nil
230+
}
231+
212232
// GetClient returns a Kafka client with optional overrides.
213233
func (s *Session) GetClient(opts ...kgo.Opt) (*kgo.Client, error) {
214234
s.mutex.Lock()
215235
defer s.mutex.Unlock()
216236

237+
if s.Ctx.BootstrapServers == "" {
238+
return nil, fmt.Errorf("err: ContextNotSetErr: Please set the cluster context first")
239+
}
240+
217241
if len(opts) > 0 {
218242
//nolint:gocritic
219243
clientOpts := append(s.Options, opts...)
@@ -240,6 +264,10 @@ func (s *Session) GetAdminClient() (*kadm.Client, error) {
240264
s.mutex.Lock()
241265
defer s.mutex.Unlock()
242266

267+
if s.Ctx.BootstrapServers == "" {
268+
return nil, fmt.Errorf("err: ContextNotSetErr: Please set the cluster context first")
269+
}
270+
243271
if s.AdminClient == nil {
244272
if s.Client == nil {
245273
var err error
@@ -256,13 +284,16 @@ func (s *Session) GetAdminClient() (*kadm.Client, error) {
256284

257285
// GetSchemaRegistryClient returns the schema registry client.
258286
func (s *Session) GetSchemaRegistryClient() (*sr.Client, error) {
287+
s.mutex.Lock()
288+
defer s.mutex.Unlock()
289+
290+
if s.Ctx.BootstrapServers == "" {
291+
return nil, fmt.Errorf("err: ContextNotSetErr: Please set the cluster context first")
292+
}
259293
if s.Ctx.SchemaRegistryURL == "" {
260294
return nil, fmt.Errorf("schema registry not enabled on the current context")
261295
}
262296

263-
s.mutex.Lock()
264-
defer s.mutex.Unlock()
265-
266297
if s.SchemaRegistryClient == nil {
267298
SrOpts := []sr.ClientOpt{}
268299
SrOpts = append(SrOpts, sr.URLs(s.Ctx.SchemaRegistryURL))
@@ -285,13 +316,16 @@ func (s *Session) GetSchemaRegistryClient() (*sr.Client, error) {
285316

286317
// GetConnectClient returns the Kafka Connect client.
287318
func (s *Session) GetConnectClient() (Connect, error) {
319+
s.mutex.Lock()
320+
defer s.mutex.Unlock()
321+
322+
if s.Ctx.BootstrapServers == "" {
323+
return nil, fmt.Errorf("err: ContextNotSetErr: Please set the cluster context first")
324+
}
288325
if s.Ctx.ConnectURL == "" {
289326
return nil, fmt.Errorf("kafka connect not enabled on the current context")
290327
}
291328

292-
s.mutex.Lock()
293-
defer s.mutex.Unlock()
294-
295329
if s.ConnectClient == nil {
296330
var err error
297331
s.ConnectClient, err = NewConnect(&s.Ctx)

pkg/kafka/connection_test.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
// Copyright 2025 StreamNative
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package kafka
16+
17+
import (
18+
"testing"
19+
20+
"github.com/stretchr/testify/require"
21+
)
22+
23+
const contextNotSetErr = "err: ContextNotSetErr: Please set the cluster context first"
24+
25+
func TestSessionGetKafkaClientsRequireContext(t *testing.T) {
26+
session := &Session{}
27+
28+
_, err := session.GetClient()
29+
require.EqualError(t, err, contextNotSetErr)
30+
31+
_, err = session.GetAdminClient()
32+
require.EqualError(t, err, contextNotSetErr)
33+
34+
_, err = session.GetSchemaRegistryClient()
35+
require.EqualError(t, err, contextNotSetErr)
36+
37+
_, err = session.GetConnectClient()
38+
require.EqualError(t, err, contextNotSetErr)
39+
}
40+
41+
func TestSessionResetKafkaContextClearsContextAndClients(t *testing.T) {
42+
session := &Session{
43+
Ctx: KafkaContext{
44+
BootstrapServers: "localhost:9092",
45+
SchemaRegistryURL: "http://localhost:8081",
46+
ConnectURL: "http://localhost:8083",
47+
},
48+
}
49+
50+
session.ResetKafkaContext()
51+
52+
require.Equal(t, KafkaContext{}, session.Ctx)
53+
require.Nil(t, session.Client)
54+
require.Nil(t, session.AdminClient)
55+
require.Nil(t, session.SchemaRegistryClient)
56+
require.Nil(t, session.ConnectClient)
57+
require.Nil(t, session.Options)
58+
59+
_, err := session.GetAdminClient()
60+
require.EqualError(t, err, contextNotSetErr)
61+
}

pkg/mcp/pulsar_functions_as_tools.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,8 @@ func (s *Server) PulsarFunctionManagedMcpTools(readOnly bool, features []string,
7373
// For example: stop the manager, send alerts, implement backoff strategies
7474
}
7575

76-
if s.SNCloudSession.Ctx.Organization == "" || s.SNCloudSession.Ctx.PulsarInstance == "" || s.SNCloudSession.Ctx.PulsarCluster == "" {
76+
instance, cluster := s.SNCloudSession.GetPulsarClusterContext()
77+
if s.SNCloudSession.Ctx.Organization == "" || instance == "" || cluster == "" {
7778
log.Printf("Skipping Pulsar Functions as MCP Tools because both organization, pulsar instance and pulsar cluster are not set")
7879
return
7980
}

pkg/mcp/sncontext_tools.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,13 @@ func RegisterContextTools(s *server.MCPServer, features []string, skipContextToo
4949
mcp.Description("The name of the pulsar cluster to use"),
5050
),
5151
)
52-
// Skip registering context tools if context is already provided
52+
resetContextTool := mcp.NewTool("sncloud_context_reset",
53+
mcp.WithDescription("Reset the current StreamNative Cloud cluster context. After reset, the session has no bound Pulsar or Kafka cluster connection; use `sncloud_context_use_cluster` before calling cluster-specific tools again."),
54+
)
55+
// Skip registering context mutation tools if context is already provided
5356
if !skipContextTools {
5457
s.AddTool(setContextTool, handleSetContext)
58+
s.AddTool(resetContextTool, handleResetContext)
5559
}
5660

5761
// Add available-contexts tool
@@ -112,6 +116,15 @@ func handleSetContext(ctx context.Context, request mcp.CallToolRequest) (*mcp.Ca
112116
return mcp.NewToolResultText("StreamNative Cloud context set successfully"), nil
113117
}
114118

119+
// handleResetContext handles the reset-context tool request
120+
func handleResetContext(ctx context.Context, _ mcp.CallToolRequest) (*mcp.CallToolResult, error) {
121+
if err := ResetContext(ctx); err != nil {
122+
return mcp.NewToolResultError(fmt.Sprintf("Failed to reset context: %v", err)), nil
123+
}
124+
125+
return mcp.NewToolResultText("StreamNative Cloud context reset successfully"), nil
126+
}
127+
115128
func handleAvailableContexts(ctx context.Context, _ mcp.CallToolRequest) (*mcp.CallToolResult, error) {
116129
promptResponse, err := buildSNCloudContextClusterPromptResult(ctx)
117130
if err != nil || promptResponse == nil {

pkg/mcp/sncontext_tools_test.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
// Copyright 2025 StreamNative
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package mcp
16+
17+
import (
18+
"context"
19+
"testing"
20+
21+
mcpgo "github.com/mark3labs/mcp-go/mcp"
22+
"github.com/streamnative/pulsarctl/pkg/cmdutils"
23+
"github.com/streamnative/streamnative-mcp-server/pkg/config"
24+
"github.com/streamnative/streamnative-mcp-server/pkg/kafka"
25+
pulsarsession "github.com/streamnative/streamnative-mcp-server/pkg/pulsar"
26+
"github.com/stretchr/testify/require"
27+
)
28+
29+
func TestHandleResetContextClearsSNCloudClusterSessions(t *testing.T) {
30+
snSession, err := config.NewSNCloudSession(config.SNCloudContext{
31+
JWTToken: "token",
32+
APIURL: "https://api.example.com",
33+
LogAPIURL: "https://logs.example.com",
34+
Organization: "org",
35+
PulsarInstance: "instance-a",
36+
PulsarCluster: "cluster-a",
37+
})
38+
require.NoError(t, err)
39+
40+
pulsarSession := &pulsarsession.Session{
41+
Ctx: pulsarsession.PulsarContext{
42+
ServiceURL: "pulsar://pulsar.example.com:6650",
43+
WebServiceURL: "https://pulsar.example.com",
44+
Token: "token",
45+
},
46+
PulsarCtlConfig: &cmdutils.ClusterConfig{WebServiceURL: "https://pulsar.example.com"},
47+
}
48+
kafkaSession := &kafka.Session{
49+
Ctx: kafka.KafkaContext{
50+
BootstrapServers: "kafka.example.com:9093",
51+
SchemaRegistryURL: "https://kafka.example.com/kafka",
52+
ConnectURL: "https://api.example.com/admin/kafkaconnect/",
53+
},
54+
}
55+
56+
ctx := context.Background()
57+
ctx = WithSNCloudSession(ctx, snSession)
58+
ctx = WithPulsarSession(ctx, pulsarSession)
59+
ctx = WithKafkaSession(ctx, kafkaSession)
60+
61+
result, err := handleResetContext(ctx, mcpgo.CallToolRequest{})
62+
require.NoError(t, err)
63+
require.False(t, result.IsError)
64+
65+
require.Empty(t, snSession.Ctx.PulsarInstance)
66+
require.Empty(t, snSession.Ctx.PulsarCluster)
67+
require.Equal(t, pulsarsession.PulsarContext{}, pulsarSession.Ctx)
68+
require.Equal(t, kafka.KafkaContext{}, kafkaSession.Ctx)
69+
70+
_, err = pulsarSession.GetAdminClient()
71+
require.EqualError(t, err, "err: ContextNotSetErr: Please set the cluster context first")
72+
73+
_, err = kafkaSession.GetAdminClient()
74+
require.EqualError(t, err, "err: ContextNotSetErr: Please set the cluster context first")
75+
}

0 commit comments

Comments
 (0)