Skip to content

Commit 3034a05

Browse files
committed
feat(sinks): make tenant and namespace optional with defaults
1 parent a231e67 commit 3034a05

4 files changed

Lines changed: 706 additions & 249 deletions

File tree

cmd/snmcp-e2e/main.go

Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,10 @@ func run(ctx context.Context, cfg config) error {
141141
functionInputTopic := fmt.Sprintf("persistent://%s/function-input-%d", namespace, suffix)
142142
functionOutputTopic := fmt.Sprintf("persistent://%s/function-output-%d", namespace, suffix)
143143
functionName := fmt.Sprintf("echo-%d", suffix)
144+
sinkInputTopic := fmt.Sprintf("persistent://%s/sink-input-%d", namespace, suffix)
145+
sinkName := fmt.Sprintf("file-sink-%d", suffix)
146+
sinkPath := fmt.Sprintf("/tmp/snmcp-e2e-sink-%d", suffix)
147+
sinkPathUpdated := fmt.Sprintf("/tmp/snmcp-e2e-sink-updated-%d", suffix)
144148

145149
result, err := callTool(ctx, adminClient, "pulsar_admin_tenant", map[string]any{
146150
"resource": "tenant",
@@ -325,6 +329,80 @@ func run(ctx context.Context, cfg config) error {
325329
return err
326330
}
327331

332+
result, err = callTool(ctx, adminClient, "pulsar_admin_topic", map[string]any{
333+
"resource": "topic",
334+
"operation": "create",
335+
"topic": sinkInputTopic,
336+
"partitions": float64(0),
337+
})
338+
if err := requireToolOK(result, err, "pulsar_admin_topic create sink input"); err != nil {
339+
return err
340+
}
341+
342+
builtInSinks, err := listBuiltInSinks(ctx, adminClient)
343+
if err != nil {
344+
return err
345+
}
346+
if !hasBuiltInSink(builtInSinks, "file") {
347+
return fmt.Errorf("built-in sink 'file' not available")
348+
}
349+
350+
result, err = callTool(ctx, adminClient, "pulsar_admin_sinks", map[string]any{
351+
"operation": "create",
352+
"tenant": tenant,
353+
"namespace": namespaceName,
354+
"name": sinkName,
355+
"sink-type": "file",
356+
"inputs": []string{sinkInputTopic},
357+
"sink-config": map[string]any{
358+
"path": sinkPath,
359+
},
360+
})
361+
if err := requireToolOK(result, err, "pulsar_admin_sinks create"); err != nil {
362+
return err
363+
}
364+
365+
if err := waitForSinkRunning(ctx, adminClient, tenant, namespaceName, sinkName, 60*time.Second); err != nil {
366+
return err
367+
}
368+
369+
result, err = callTool(ctx, adminClient, "pulsar_admin_sinks", map[string]any{
370+
"operation": "update",
371+
"tenant": tenant,
372+
"namespace": namespaceName,
373+
"name": sinkName,
374+
"sink-type": "file",
375+
"sink-config": map[string]any{
376+
"path": sinkPathUpdated,
377+
},
378+
})
379+
if err := requireToolOK(result, err, "pulsar_admin_sinks update"); err != nil {
380+
return err
381+
}
382+
383+
result, err = callTool(ctx, adminClient, "pulsar_admin_sinks", map[string]any{
384+
"operation": "get",
385+
"tenant": tenant,
386+
"namespace": namespaceName,
387+
"name": sinkName,
388+
})
389+
if err := requireToolOK(result, err, "pulsar_admin_sinks get"); err != nil {
390+
return err
391+
}
392+
if err := assertSinkConfigPath(firstText(result), sinkPathUpdated); err != nil {
393+
return err
394+
}
395+
396+
result, err = callTool(ctx, adminClient, "pulsar_admin_sinks", map[string]any{
397+
"operation": "delete",
398+
"tenant": tenant,
399+
"namespace": namespaceName,
400+
"name": sinkName,
401+
})
402+
if err := requireToolOK(result, err, "pulsar_admin_sinks delete"); err != nil {
403+
return err
404+
}
405+
328406
result, err = callTool(ctx, adminClient, "pulsar_admin_topic", map[string]any{
329407
"resource": "topic",
330408
"operation": "create",
@@ -637,6 +715,128 @@ func assertFunctionUserConfig(raw string, key string) error {
637715
return nil
638716
}
639717

718+
type connectorDefinition struct {
719+
Name string `json:"name"`
720+
}
721+
722+
func listBuiltInSinks(ctx context.Context, c *client.Client) ([]connectorDefinition, error) {
723+
result, err := callTool(ctx, c, "pulsar_admin_sinks", map[string]any{
724+
"operation": "list-built-in",
725+
})
726+
if err := requireToolOK(result, err, "pulsar_admin_sinks list-built-in"); err != nil {
727+
return nil, err
728+
}
729+
raw := firstText(result)
730+
if raw == "" {
731+
return nil, errors.New("empty built-in sink list result")
732+
}
733+
var sinks []connectorDefinition
734+
if err := json.Unmarshal([]byte(raw), &sinks); err != nil {
735+
return nil, fmt.Errorf("failed to parse built-in sink list: %w", err)
736+
}
737+
return sinks, nil
738+
}
739+
740+
func hasBuiltInSink(definitions []connectorDefinition, name string) bool {
741+
for _, definition := range definitions {
742+
if definition.Name == name {
743+
return true
744+
}
745+
}
746+
return false
747+
}
748+
749+
type sinkStatus struct {
750+
NumInstances int `json:"numInstances"`
751+
NumRunning int `json:"numRunning"`
752+
Instances []sinkInstanceStatus `json:"instances"`
753+
}
754+
755+
type sinkInstanceStatus struct {
756+
InstanceID int `json:"instanceId"`
757+
Status sinkInstanceStatusData `json:"status"`
758+
}
759+
760+
type sinkInstanceStatusData struct {
761+
Running bool `json:"running"`
762+
Err string `json:"error"`
763+
}
764+
765+
func waitForSinkRunning(ctx context.Context, c *client.Client, tenant, namespace, name string, timeout time.Duration) error {
766+
deadline := time.Now().Add(timeout)
767+
for time.Now().Before(deadline) {
768+
status, err := getSinkStatus(ctx, c, tenant, namespace, name)
769+
if err == nil && allSinkInstancesRunning(status) {
770+
return nil
771+
}
772+
select {
773+
case <-ctx.Done():
774+
return ctx.Err()
775+
case <-time.After(2 * time.Second):
776+
}
777+
}
778+
return fmt.Errorf("sink %s did not reach running state within %s", name, timeout.String())
779+
}
780+
781+
func getSinkStatus(ctx context.Context, c *client.Client, tenant, namespace, name string) (sinkStatus, error) {
782+
result, err := callTool(ctx, c, "pulsar_admin_sinks", map[string]any{
783+
"operation": "status",
784+
"tenant": tenant,
785+
"namespace": namespace,
786+
"name": name,
787+
})
788+
if err := requireToolOK(result, err, "pulsar_admin_sinks status"); err != nil {
789+
return sinkStatus{}, err
790+
}
791+
raw := firstText(result)
792+
if raw == "" {
793+
return sinkStatus{}, errors.New("empty sink status result")
794+
}
795+
var status sinkStatus
796+
if err := json.Unmarshal([]byte(raw), &status); err != nil {
797+
return sinkStatus{}, fmt.Errorf("failed to parse sink status: %w", err)
798+
}
799+
return status, nil
800+
}
801+
802+
func allSinkInstancesRunning(status sinkStatus) bool {
803+
if status.NumInstances == 0 || status.NumRunning < status.NumInstances {
804+
return false
805+
}
806+
for _, instance := range status.Instances {
807+
if !instance.Status.Running {
808+
return false
809+
}
810+
}
811+
return true
812+
}
813+
814+
type sinkConfig struct {
815+
Configs map[string]interface{} `json:"configs"`
816+
}
817+
818+
func assertSinkConfigPath(raw string, expected string) error {
819+
if raw == "" {
820+
return errors.New("empty sink config result")
821+
}
822+
var config sinkConfig
823+
if err := json.Unmarshal([]byte(raw), &config); err != nil {
824+
return fmt.Errorf("failed to parse sink config: %w", err)
825+
}
826+
if config.Configs == nil {
827+
return errors.New("missing configs in sink config")
828+
}
829+
value, ok := config.Configs["path"]
830+
if !ok {
831+
return errors.New("missing configs.path in sink config")
832+
}
833+
actual := fmt.Sprintf("%v", value)
834+
if actual != expected {
835+
return fmt.Errorf("unexpected sink config path: %s", actual)
836+
}
837+
return nil
838+
}
839+
640840
type consumeResponse struct {
641841
MessagesConsumed int `json:"messages_consumed"`
642842
Messages []consumeMessage `json:"messages"`

docs/tools/pulsar_admin_sinks.md

Lines changed: 43 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -5,60 +5,82 @@ Manage Apache Pulsar Sinks for data movement and integration. Pulsar Sinks are c
55
This tool provides complete lifecycle management for sink connectors:
66

77
- **list**: List all sinks in a namespace
8-
- `tenant` (string, required): The tenant name
9-
- `namespace` (string, required): The namespace name
8+
- `tenant` (string, optional): The tenant name (default: `public`)
9+
- `namespace` (string, optional): The namespace name (default: `default`)
1010

1111
- **get**: Get sink configuration
12-
- `tenant` (string, required): The tenant name
13-
- `namespace` (string, required): The namespace name
12+
- `tenant` (string, optional): The tenant name (default: `public`)
13+
- `namespace` (string, optional): The namespace name (default: `default`)
1414
- `name` (string, required): The sink name
1515

1616
- **status**: Get runtime status of a sink (instances, metrics)
17-
- `tenant` (string, required): The tenant name
18-
- `namespace` (string, required): The namespace name
17+
- `tenant` (string, optional): The tenant name (default: `public`)
18+
- `namespace` (string, optional): The namespace name (default: `default`)
1919
- `name` (string, required): The sink name
2020

2121
- **create**: Deploy a new sink connector
22-
- `tenant` (string, required): The tenant name
23-
- `namespace` (string, required): The namespace name
24-
- `name` (string, required): The sink name
22+
- `tenant` (string, optional): The tenant name (default: `public`)
23+
- `namespace` (string, optional): The namespace name (default: `default`)
24+
- `name` (string, required): The sink name (can be provided via `sink-config-file`)
2525
- Either `archive` or `sink-type` must be specified (but not both):
2626
- `archive` (string): Path to the archive file containing sink code
2727
- `sink-type` (string): Built-in connector type to use (e.g., 'jdbc', 'elastic-search', 'kafka')
2828
- Either `inputs` or `topics-pattern` must be specified:
2929
- `inputs` (array): The sink's input topics (array of strings)
3030
- `topics-pattern` (string): TopicsPattern to consume from topics matching the pattern (regex)
3131
- `subs-name` (string, optional): Pulsar subscription name for input topic consumer
32+
- `subs-position` (string, optional): Subscription position (`Latest` or `Earliest`)
33+
- `classname` (string, optional): Sink class name for custom archives
34+
- `processing-guarantees` (string, optional): Delivery semantics
35+
- `retain-ordering` (boolean, optional): Preserve message ordering
36+
- `retain-key-ordering` (boolean, optional): Preserve key ordering
37+
- `auto-ack` (boolean, optional): Auto-ack messages
38+
- `cleanup-subscription` (boolean, optional): Delete subscription on sink delete (default: true)
3239
- `parallelism` (number, optional): Number of instances to run concurrently (default: 1)
40+
- `cpu` / `ram` / `disk` (number, optional): Resource allocation per instance
41+
- `custom-serde-inputs` (object, optional): Map of input topics to SerDe class names
42+
- `custom-schema-inputs` (object, optional): Map of input topics to schema type/class
43+
- `input-specs` (object, optional): Map of input topics to consumer config
44+
- `max-redeliver-count` (number, optional): Max redeliver attempts
45+
- `dead-letter-topic` (string, optional): Dead letter topic
46+
- `timeout-ms` (number, optional): Processing timeout in milliseconds
47+
- `negative-ack-redelivery-delay-ms` (number, optional): Negative ack redelivery delay
48+
- `custom-runtime-options` (string, optional): Runtime customization options
49+
- `secrets` (object, optional): Secrets configuration map
50+
- `sink-config-file` (string, optional): Path to YAML sink config file
3351
- `sink-config` (object, optional): Connector-specific configuration parameters
52+
- `transform-function` (string, optional): Transform function
53+
- `transform-function-classname` (string, optional): Transform class name
54+
- `transform-function-config` (string, optional): Transform config
3455

3556
- **update**: Update an existing sink connector
36-
- `tenant` (string, required): The tenant name
37-
- `namespace` (string, required): The namespace name
38-
- `name` (string, required): The sink name
57+
- `tenant` (string, optional): The tenant name (default: `public`)
58+
- `namespace` (string, optional): The namespace name (default: `default`)
59+
- `name` (string, required): The sink name (can be provided via `sink-config-file`)
3960
- Parameters similar to `create` operation (all optional during update)
61+
- `update-auth-data` (boolean, optional): Update auth data during update
4062

4163
- **delete**: Delete a sink
42-
- `tenant` (string, required): The tenant name
43-
- `namespace` (string, required): The namespace name
64+
- `tenant` (string, optional): The tenant name (default: `public`)
65+
- `namespace` (string, optional): The namespace name (default: `default`)
4466
- `name` (string, required): The sink name
4567

4668
- **start**: Start a stopped sink
47-
- `tenant` (string, required): The tenant name
48-
- `namespace` (string, required): The namespace name
69+
- `tenant` (string, optional): The tenant name (default: `public`)
70+
- `namespace` (string, optional): The namespace name (default: `default`)
4971
- `name` (string, required): The sink name
5072

5173
- **stop**: Stop a running sink
52-
- `tenant` (string, required): The tenant name
53-
- `namespace` (string, required): The namespace name
74+
- `tenant` (string, optional): The tenant name (default: `public`)
75+
- `namespace` (string, optional): The namespace name (default: `default`)
5476
- `name` (string, required): The sink name
5577

5678
- **restart**: Restart a sink
57-
- `tenant` (string, required): The tenant name
58-
- `namespace` (string, required): The namespace name
79+
- `tenant` (string, optional): The tenant name (default: `public`)
80+
- `namespace` (string, optional): The namespace name (default: `default`)
5981
- `name` (string, required): The sink name
6082

6183
- **list-built-in**: List all built-in sink connectors available in the system
6284
- No parameters required
6385

64-
Built-in sink connectors are available for common systems like Kafka, JDBC, Elasticsearch, and cloud storage. Sinks follow the tenant/namespace/name hierarchy for organization and access control, can scale through parallelism configuration, and support configurable subscription types. Sinks require proper permissions to access their input topics.
86+
Built-in sink connectors are available for common systems like Kafka, JDBC, Elasticsearch, and cloud storage. Sinks follow the tenant/namespace/name hierarchy for organization and access control, can scale through parallelism configuration, and support configurable subscription types. Sinks require proper permissions to access their input topics.

0 commit comments

Comments
 (0)