2121import com .google .common .collect .Maps ;
2222import io .functionmesh .compute .sinks .models .V1alpha1Sink ;
2323import io .functionmesh .compute .sinks .models .V1alpha1SinkSpecPod ;
24+ import io .functionmesh .compute .sinks .models .V1alpha1SinkSpecPodVolumeMounts ;
25+ import io .functionmesh .compute .sinks .models .V1alpha1SinkSpecPodVolumes ;
2426import io .functionmesh .compute .util .KubernetesUtils ;
2527import io .functionmesh .compute .util .SinksUtil ;
2628import io .functionmesh .compute .MeshWorkerService ;
4042import org .apache .pulsar .functions .utils .ComponentTypeUtils ;
4143import org .apache .pulsar .functions .worker .service .api .Sinks ;
4244import org .glassfish .jersey .media .multipart .FormDataContentDisposition ;
43-
4445import javax .ws .rs .core .Response ;
4546import java .io .InputStream ;
4647import java .net .URI ;
@@ -101,8 +102,7 @@ public void registerSink(
101102 clientAuthenticationDataHttps ,
102103 ComponentTypeUtils .toString (componentType ));
103104 this .validateTenantIsExist (tenant , namespace , sinkName , clientRole );
104- V1alpha1Sink v1alpha1Sink ;
105- v1alpha1Sink =
105+ V1alpha1Sink v1alpha1Sink =
106106 SinksUtil .createV1alpha1SkinFromSinkConfig (
107107 kind ,
108108 group ,
@@ -112,6 +112,7 @@ public void registerSink(
112112 uploadedInputStream ,
113113 sinkConfig ,
114114 this .meshWorkerServiceSupplier .get ().getConnectorsManager ());
115+ // override namesapce by configuration
115116 v1alpha1Sink .getMetadata ().setNamespace (KubernetesUtils .getNamespace (worker ().getFactoryConfig ()));
116117 try {
117118 Map <String , String > customLabels = Maps .newHashMap ();
@@ -125,33 +126,7 @@ public void registerSink(
125126 }
126127 pod .setLabels (customLabels );
127128 v1alpha1Sink .getSpec ().setPod (pod );
128- if (worker ().getWorkerConfig ().isAuthenticationEnabled ()) {
129- Function .FunctionDetails .Builder functionDetailsBuilder = Function .FunctionDetails .newBuilder ();
130- functionDetailsBuilder .setTenant (tenant );
131- functionDetailsBuilder .setNamespace (namespace );
132- functionDetailsBuilder .setName (sinkName );
133- worker ().getAuthProvider ().ifPresent (functionAuthProvider -> {
134- if (clientAuthenticationDataHttps != null ) {
135- try {
136- String authSecretName = KubernetesUtils .upsertSecret (kind .toLowerCase (), "auth" ,
137- v1alpha1Sink .getSpec ().getClusterName (), tenant , namespace , sinkName ,
138- worker ().getWorkerConfig (), worker ().getCoreV1Api (), worker ().getFactoryConfig ());
139- v1alpha1Sink .getSpec ().getPulsar ().setAuthSecret (authSecretName );
140- String tlsSecretName = KubernetesUtils .upsertSecret (kind .toLowerCase (), "tls" ,
141- v1alpha1Sink .getSpec ().getClusterName (), tenant , namespace , sinkName ,
142- worker ().getWorkerConfig (), worker ().getCoreV1Api (), worker ().getFactoryConfig ());
143- v1alpha1Sink .getSpec ().getPulsar ().setTlsSecret (tlsSecretName );
144- } catch (Exception e ) {
145- log .error ("Error caching authentication data for {} {}/{}/{}" ,
146- ComponentTypeUtils .toString (componentType ), tenant , namespace , sinkName , e );
147-
148-
149- throw new RestException (Response .Status .INTERNAL_SERVER_ERROR , String .format ("Error caching authentication data for %s %s:- %s" ,
150- ComponentTypeUtils .toString (componentType ), sinkName , e .getMessage ()));
151- }
152- }
153- });
154- }
129+ this .upsertSink (tenant , namespace , sinkName , sinkConfig , v1alpha1Sink , clientAuthenticationDataHttps );
155130 Call call =
156131 worker ().getCustomObjectsApi ()
157132 .createNamespacedCustomObjectCall (
@@ -211,6 +186,7 @@ public void updateSink(
211186 sinkPkgUrl ,
212187 uploadedInputStream ,
213188 sinkConfig , this .meshWorkerServiceSupplier .get ().getConnectorsManager ());
189+ this .upsertSink (tenant , namespace , sinkName , sinkConfig , v1alpha1Sink , clientAuthenticationDataHttps );
214190 v1alpha1Sink .getMetadata ().setNamespace (KubernetesUtils .getNamespace (worker ().getFactoryConfig ()));
215191 v1alpha1Sink
216192 .getMetadata ()
@@ -355,4 +331,46 @@ public List<ConnectorDefinition> getSinkList() {
355331 public List <ConfigFieldDefinition > getSinkConfigDefinition (String name ) {
356332 return new ArrayList <>();
357333 }
334+
335+ private void upsertSink (final String tenant ,
336+ final String namespace ,
337+ final String sinkName ,
338+ final SinkConfig sinkConfig ,
339+ V1alpha1Sink v1alpha1Sink ,
340+ AuthenticationDataHttps clientAuthenticationDataHttps ) {
341+ if (worker ().getWorkerConfig ().isAuthenticationEnabled ()) {
342+ if (clientAuthenticationDataHttps != null ) {
343+ try {
344+ Map <String , Object > functionsWorkerServiceCustomConfigs = worker ()
345+ .getWorkerConfig ().getFunctionsWorkerServiceCustomConfigs ();
346+ Object volumes = functionsWorkerServiceCustomConfigs .get ("volumes" );
347+ if (volumes != null ) {
348+ List <V1alpha1SinkSpecPodVolumes > volumesList = (List <V1alpha1SinkSpecPodVolumes >) volumes ;
349+ v1alpha1Sink .getSpec ().getPod ().setVolumes (volumesList );
350+ }
351+ Object volumeMounts = functionsWorkerServiceCustomConfigs .get ("volumeMounts" );
352+ if (volumeMounts != null ) {
353+ List <V1alpha1SinkSpecPodVolumeMounts > volumeMountsList = (List <V1alpha1SinkSpecPodVolumeMounts >) volumeMounts ;
354+ v1alpha1Sink .getSpec ().setVolumeMounts (volumeMountsList );
355+ }
356+ String authSecretName = KubernetesUtils .upsertSecret (kind .toLowerCase (), "auth" ,
357+ v1alpha1Sink .getSpec ().getClusterName (), tenant , namespace , sinkName ,
358+ worker ().getWorkerConfig (), worker ().getCoreV1Api (), worker ().getFactoryConfig ());
359+ v1alpha1Sink .getSpec ().getPulsar ().setAuthSecret (authSecretName );
360+ String tlsSecretName = KubernetesUtils .upsertSecret (kind .toLowerCase (), "tls" ,
361+ v1alpha1Sink .getSpec ().getClusterName (), tenant , namespace , sinkName ,
362+ worker ().getWorkerConfig (), worker ().getCoreV1Api (), worker ().getFactoryConfig ());
363+ v1alpha1Sink .getSpec ().getPulsar ().setTlsSecret (tlsSecretName );
364+ } catch (Exception e ) {
365+ log .error ("Error create or update auth or tls secret data for {} {}/{}/{}" ,
366+ ComponentTypeUtils .toString (componentType ), tenant , namespace , sinkName , e );
367+
368+
369+ throw new RestException (Response .Status .INTERNAL_SERVER_ERROR ,
370+ String .format ("Error create or update auth or tls secret for %s %s:- %s" ,
371+ ComponentTypeUtils .toString (componentType ), sinkName , e .getMessage ()));
372+ }
373+ }
374+ }
375+ }
358376}
0 commit comments