@@ -24,6 +24,7 @@ import (
2424 "net/http"
2525 "net/url"
2626 "os"
27+ "sort"
2728 "strings"
2829 "sync"
2930 "time"
@@ -142,9 +143,8 @@ func run(ctx context.Context, cfg config) error {
142143 functionOutputTopic := fmt .Sprintf ("persistent://%s/function-output-%d" , namespace , suffix )
143144 functionName := fmt .Sprintf ("echo-%d" , suffix )
144145 sinkInputTopic := fmt .Sprintf ("persistent://%s/sink-input-%d" , namespace , suffix )
145- sinkName := fmt .Sprintf ("file-sink-%d" , suffix )
146- sinkPath := fmt .Sprintf ("/tmp/snmcp-e2e-sink-%d" , suffix )
147- sinkPathUpdated := fmt .Sprintf ("/tmp/snmcp-e2e-sink-updated-%d" , suffix )
146+ sinkName := fmt .Sprintf ("e2e-sink-%d" , suffix )
147+ sinkParallelismUpdated := 2
148148
149149 result , err := callTool (ctx , adminClient , "pulsar_admin_tenant" , map [string ]any {
150150 "resource" : "tenant" ,
@@ -343,38 +343,34 @@ func run(ctx context.Context, cfg config) error {
343343 if err != nil {
344344 return err
345345 }
346- if ! hasBuiltInSink (builtInSinks , "file" ) {
347- return fmt .Errorf ("built-in sink 'file' not available" )
346+ sinkType , err := selectSinkType (builtInSinks , []string {"data-generator" , "batch-data-generator" })
347+ if err != nil {
348+ return err
348349 }
349350
350351 result , err = callTool (ctx , adminClient , "pulsar_admin_sinks" , map [string ]any {
351352 "operation" : "create" ,
352353 "tenant" : tenant ,
353354 "namespace" : namespaceName ,
354355 "name" : sinkName ,
355- "sink-type" : "file" ,
356+ "sink-type" : sinkType ,
356357 "inputs" : []string {sinkInputTopic },
357- "sink-config" : map [string ]any {
358- "path" : sinkPath ,
359- },
360358 })
361359 if err := requireToolOK (result , err , "pulsar_admin_sinks create" ); err != nil {
362360 return err
363361 }
364362
365- if err := waitForSinkRunning (ctx , adminClient , tenant , namespaceName , sinkName , 60 * time . Second ); err != nil {
363+ if _ , err := getSinkStatus (ctx , adminClient , tenant , namespaceName , sinkName ); err != nil {
366364 return err
367365 }
368366
369367 result , err = callTool (ctx , adminClient , "pulsar_admin_sinks" , map [string ]any {
370- "operation" : "update" ,
371- "tenant" : tenant ,
372- "namespace" : namespaceName ,
373- "name" : sinkName ,
374- "sink-type" : "file" ,
375- "sink-config" : map [string ]any {
376- "path" : sinkPathUpdated ,
377- },
368+ "operation" : "update" ,
369+ "tenant" : tenant ,
370+ "namespace" : namespaceName ,
371+ "name" : sinkName ,
372+ "sink-type" : sinkType ,
373+ "parallelism" : float64 (sinkParallelismUpdated ),
378374 })
379375 if err := requireToolOK (result , err , "pulsar_admin_sinks update" ); err != nil {
380376 return err
@@ -389,7 +385,7 @@ func run(ctx context.Context, cfg config) error {
389385 if err := requireToolOK (result , err , "pulsar_admin_sinks get" ); err != nil {
390386 return err
391387 }
392- if err := assertSinkConfigPath (firstText (result ), sinkPathUpdated ); err != nil {
388+ if err := assertSinkParallelism (firstText (result ), sinkParallelismUpdated ); err != nil {
393389 return err
394390 }
395391
@@ -737,13 +733,22 @@ func listBuiltInSinks(ctx context.Context, c *client.Client) ([]connectorDefinit
737733 return sinks , nil
738734}
739735
740- func hasBuiltInSink (definitions []connectorDefinition , name string ) bool {
736+ func selectSinkType (definitions []connectorDefinition , preferred []string ) (string , error ) {
737+ available := make (map [string ]struct {}, len (definitions ))
741738 for _ , definition := range definitions {
742- if definition .Name == name {
743- return true
739+ available [definition .Name ] = struct {}{}
740+ }
741+ for _ , name := range preferred {
742+ if _ , ok := available [name ]; ok {
743+ return name , nil
744744 }
745745 }
746- return false
746+ names := make ([]string , 0 , len (definitions ))
747+ for name := range available {
748+ names = append (names , name )
749+ }
750+ sort .Strings (names )
751+ return "" , fmt .Errorf ("no supported sink type available; found: %s" , strings .Join (names , ", " ))
747752}
748753
749754type sinkStatus struct {
@@ -762,22 +767,6 @@ type sinkInstanceStatusData struct {
762767 Err string `json:"error"`
763768}
764769
765- func waitForSinkRunning (ctx context.Context , c * client.Client , tenant , namespace , name string , timeout time.Duration ) error {
766- deadline := time .Now ().Add (timeout )
767- for time .Now ().Before (deadline ) {
768- status , err := getSinkStatus (ctx , c , tenant , namespace , name )
769- if err == nil && allSinkInstancesRunning (status ) {
770- return nil
771- }
772- select {
773- case <- ctx .Done ():
774- return ctx .Err ()
775- case <- time .After (2 * time .Second ):
776- }
777- }
778- return fmt .Errorf ("sink %s did not reach running state within %s" , name , timeout .String ())
779- }
780-
781770func getSinkStatus (ctx context.Context , c * client.Client , tenant , namespace , name string ) (sinkStatus , error ) {
782771 result , err := callTool (ctx , c , "pulsar_admin_sinks" , map [string ]any {
783772 "operation" : "status" ,
@@ -799,40 +788,21 @@ func getSinkStatus(ctx context.Context, c *client.Client, tenant, namespace, nam
799788 return status , nil
800789}
801790
802- func allSinkInstancesRunning (status sinkStatus ) bool {
803- if status .NumInstances == 0 || status .NumRunning < status .NumInstances {
804- return false
805- }
806- for _ , instance := range status .Instances {
807- if ! instance .Status .Running {
808- return false
809- }
810- }
811- return true
812- }
813-
814791type sinkConfig struct {
815- Configs map [string ]interface {} `json:"configs"`
792+ Configs map [string ]interface {} `json:"configs"`
793+ Parallelism int `json:"parallelism"`
816794}
817795
818- func assertSinkConfigPath (raw string , expected string ) error {
796+ func assertSinkParallelism (raw string , expected int ) error {
819797 if raw == "" {
820798 return errors .New ("empty sink config result" )
821799 }
822800 var config sinkConfig
823801 if err := json .Unmarshal ([]byte (raw ), & config ); err != nil {
824802 return fmt .Errorf ("failed to parse sink config: %w" , err )
825803 }
826- if config .Configs == nil {
827- return errors .New ("missing configs in sink config" )
828- }
829- value , ok := config .Configs ["path" ]
830- if ! ok {
831- return errors .New ("missing configs.path in sink config" )
832- }
833- actual := fmt .Sprintf ("%v" , value )
834- if actual != expected {
835- return fmt .Errorf ("unexpected sink config path: %s" , actual )
804+ if config .Parallelism != expected {
805+ return fmt .Errorf ("unexpected sink parallelism: %d" , config .Parallelism )
836806 }
837807 return nil
838808}
0 commit comments