Skip to content

Commit 6c4c506

Browse files
authored
Remove max replicas, auth and secret configuration (#162)
* Remove max replicas to avoiding create hpa * Fixed auth and secret configuration * Support configuration `extraDependenciesDir ` * Support configure `typeClassName`
1 parent 1958edd commit 6c4c506

9 files changed

Lines changed: 101 additions & 28 deletions

File tree

mesh-worker-service/src/main/java/io/functionmesh/compute/models/FunctionMeshConnectorDefinition.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,12 @@ public class FunctionMeshConnectorDefinition extends ConnectorDefinition {
5757
*/
5858
private String imageTag;
5959

60+
/**
61+
* Type name of the connector or function
62+
* If not set, the default value '[B' will be used
63+
*/
64+
private String typeClassName;
65+
6066
public String toFullImageURL() {
6167
return String.format("%s%s:%s", imageRegistry != null ? imageRegistry : DEFAULT_REGISTRY,
6268
imageRepository, imageTag != null ? imageTag : version);

mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/FunctionsImpl.java

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package io.functionmesh.compute.rest.api;
2020

2121
import com.google.common.collect.Maps;
22+
import io.functionmesh.compute.functions.models.V1alpha1FunctionSpecJava;
2223
import io.functionmesh.compute.functions.models.V1alpha1FunctionSpecPod;
2324
import io.functionmesh.compute.functions.models.V1alpha1FunctionSpecPodVolumeMounts;
2425
import io.functionmesh.compute.functions.models.V1alpha1FunctionSpecPodVolumes;
@@ -28,6 +29,7 @@
2829
import io.functionmesh.compute.util.KubernetesUtils;
2930
import lombok.extern.slf4j.Slf4j;
3031
import okhttp3.Call;
32+
import org.apache.commons.lang.StringUtils;
3133
import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
3234
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
3335
import org.apache.pulsar.common.functions.FunctionConfig;
@@ -308,14 +310,25 @@ private void upsertFunction(final String tenant,
308310
List<V1alpha1FunctionSpecPodVolumeMounts> volumeMountsList = (List<V1alpha1FunctionSpecPodVolumeMounts>) volumeMounts;
309311
v1alpha1Function.getSpec().setVolumeMounts(volumeMountsList);
310312
}
311-
String authSecretName = KubernetesUtils.upsertSecret(kind.toLowerCase(), "auth",
312-
v1alpha1Function.getSpec().getClusterName(), tenant, namespace, functionName,
313-
worker().getWorkerConfig(), worker().getCoreV1Api(), worker().getFactoryConfig());
314-
v1alpha1Function.getSpec().getPulsar().setAuthSecret(authSecretName);
315-
String tlsSecretName = KubernetesUtils.upsertSecret(kind.toLowerCase(), "tls",
316-
v1alpha1Function.getSpec().getClusterName(), tenant, namespace, functionName,
317-
worker().getWorkerConfig(), worker().getCoreV1Api(), worker().getFactoryConfig());
318-
v1alpha1Function.getSpec().getPulsar().setTlsSecret(tlsSecretName);
313+
if (functionsWorkerServiceCustomConfigs.get("extraDependenciesDir") != null) {
314+
V1alpha1FunctionSpecJava v1alpha1FunctionSpecJava = new V1alpha1FunctionSpecJava();
315+
v1alpha1FunctionSpecJava.setExtraDependenciesDir(
316+
(String)functionsWorkerServiceCustomConfigs.get("extraDependenciesDir"));
317+
v1alpha1Function.getSpec().setJava(v1alpha1FunctionSpecJava);
318+
}
319+
if (!StringUtils.isEmpty(worker().getWorkerConfig().getBrokerClientAuthenticationPlugin())
320+
&& !StringUtils.isEmpty(worker().getWorkerConfig().getBrokerClientAuthenticationParameters())) {
321+
String authSecretName = KubernetesUtils.upsertSecret(kind.toLowerCase(), "auth",
322+
v1alpha1Function.getSpec().getClusterName(), tenant, namespace, functionName,
323+
worker().getWorkerConfig(), worker().getCoreV1Api(), worker().getFactoryConfig());
324+
v1alpha1Function.getSpec().getPulsar().setAuthSecret(authSecretName);
325+
}
326+
if (worker().getWorkerConfig().getTlsEnabled()) {
327+
String tlsSecretName = KubernetesUtils.upsertSecret(kind.toLowerCase(), "tls",
328+
v1alpha1Function.getSpec().getClusterName(), tenant, namespace, functionName,
329+
worker().getWorkerConfig(), worker().getCoreV1Api(), worker().getFactoryConfig());
330+
v1alpha1Function.getSpec().getPulsar().setTlsSecret(tlsSecretName);
331+
}
319332
} catch (Exception e) {
320333
log.error("Error create or update auth or tls secret for {} {}/{}/{}",
321334
ComponentTypeUtils.toString(componentType), tenant, namespace, functionName, e);

mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/SinksImpl.java

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import com.google.common.collect.Maps;
2222
import io.functionmesh.compute.sinks.models.V1alpha1Sink;
23+
import io.functionmesh.compute.sinks.models.V1alpha1SinkSpecJava;
2324
import io.functionmesh.compute.sinks.models.V1alpha1SinkSpecPod;
2425
import io.functionmesh.compute.sinks.models.V1alpha1SinkSpecPodVolumeMounts;
2526
import io.functionmesh.compute.sinks.models.V1alpha1SinkSpecPodVolumes;
@@ -344,6 +345,11 @@ private void upsertSink(final String tenant,
344345
Map<String, Object> functionsWorkerServiceCustomConfigs = worker()
345346
.getWorkerConfig().getFunctionsWorkerServiceCustomConfigs();
346347
Object volumes = functionsWorkerServiceCustomConfigs.get("volumes");
348+
if (functionsWorkerServiceCustomConfigs.get("extraDependenciesDir") != null) {
349+
V1alpha1SinkSpecJava v1alpha1SinkSpecJava = new V1alpha1SinkSpecJava();
350+
v1alpha1SinkSpecJava.setExtraDependenciesDir((String)functionsWorkerServiceCustomConfigs.get("extraDependenciesDir"));
351+
v1alpha1Sink.getSpec().setJava(v1alpha1SinkSpecJava);
352+
}
347353
if (volumes != null) {
348354
List<V1alpha1SinkSpecPodVolumes> volumesList = (List<V1alpha1SinkSpecPodVolumes>) volumes;
349355
v1alpha1Sink.getSpec().getPod().setVolumes(volumesList);
@@ -353,14 +359,19 @@ private void upsertSink(final String tenant,
353359
List<V1alpha1SinkSpecPodVolumeMounts> volumeMountsList = (List<V1alpha1SinkSpecPodVolumeMounts>) volumeMounts;
354360
v1alpha1Sink.getSpec().setVolumeMounts(volumeMountsList);
355361
}
356-
String authSecretName = KubernetesUtils.upsertSecret(kind.toLowerCase(), "auth",
357-
v1alpha1Sink.getSpec().getClusterName(), tenant, namespace, sinkName,
358-
worker().getWorkerConfig(), worker().getCoreV1Api(), worker().getFactoryConfig());
359-
v1alpha1Sink.getSpec().getPulsar().setAuthSecret(authSecretName);
360-
String tlsSecretName = KubernetesUtils.upsertSecret(kind.toLowerCase(), "tls",
361-
v1alpha1Sink.getSpec().getClusterName(), tenant, namespace, sinkName,
362-
worker().getWorkerConfig(), worker().getCoreV1Api(), worker().getFactoryConfig());
363-
v1alpha1Sink.getSpec().getPulsar().setTlsSecret(tlsSecretName);
362+
if (!StringUtils.isEmpty(worker().getWorkerConfig().getBrokerClientAuthenticationPlugin())
363+
&& !StringUtils.isEmpty(worker().getWorkerConfig().getBrokerClientAuthenticationParameters())) {
364+
String authSecretName = KubernetesUtils.upsertSecret(kind.toLowerCase(), "auth",
365+
v1alpha1Sink.getSpec().getClusterName(), tenant, namespace, sinkName,
366+
worker().getWorkerConfig(), worker().getCoreV1Api(), worker().getFactoryConfig());
367+
v1alpha1Sink.getSpec().getPulsar().setAuthSecret(authSecretName);
368+
}
369+
if (worker().getWorkerConfig().getTlsEnabled()) {
370+
String tlsSecretName = KubernetesUtils.upsertSecret(kind.toLowerCase(), "tls",
371+
v1alpha1Sink.getSpec().getClusterName(), tenant, namespace, sinkName,
372+
worker().getWorkerConfig(), worker().getCoreV1Api(), worker().getFactoryConfig());
373+
v1alpha1Sink.getSpec().getPulsar().setTlsSecret(tlsSecretName);
374+
}
364375
} catch (Exception e) {
365376
log.error("Error create or update auth or tls secret data for {} {}/{}/{}",
366377
ComponentTypeUtils.toString(componentType), tenant, namespace, sinkName, e);

mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/SourcesImpl.java

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package io.functionmesh.compute.rest.api;
2020

2121
import com.google.common.collect.Maps;
22+
import io.functionmesh.compute.sources.models.V1alpha1SourceSpecJava;
2223
import io.functionmesh.compute.sources.models.V1alpha1SourceSpecPod;
2324
import io.functionmesh.compute.sources.models.V1alpha1SourceSpecPodVolumeMounts;
2425
import io.functionmesh.compute.sources.models.V1alpha1SourceSpecPodVolumes;
@@ -318,19 +319,30 @@ private void upsertSource(final String tenant,
318319
List<V1alpha1SourceSpecPodVolumes> volumesList = (List<V1alpha1SourceSpecPodVolumes>) volumes;
319320
v1alpha1Source.getSpec().getPod().setVolumes(volumesList);
320321
}
322+
if (functionsWorkerServiceCustomConfigs.get("extraDependenciesDir") != null) {
323+
V1alpha1SourceSpecJava v1alpha1SourceSpecJava = new V1alpha1SourceSpecJava();
324+
v1alpha1SourceSpecJava.setExtraDependenciesDir(
325+
(String)functionsWorkerServiceCustomConfigs.get("extraDependenciesDir"));
326+
v1alpha1Source.getSpec().setJava(v1alpha1SourceSpecJava);
327+
}
321328
Object volumeMounts = functionsWorkerServiceCustomConfigs.get("volumeMounts");
322329
if (volumeMounts != null) {
323330
List<V1alpha1SourceSpecPodVolumeMounts> volumeMountsList = (List<V1alpha1SourceSpecPodVolumeMounts>) volumeMounts;
324331
v1alpha1Source.getSpec().setVolumeMounts(volumeMountsList);
325332
}
326-
String authSecretName = KubernetesUtils.upsertSecret(kind.toLowerCase(), "auth",
327-
v1alpha1Source.getSpec().getClusterName(), tenant, namespace, sourceName,
328-
worker().getWorkerConfig(), worker().getCoreV1Api(), worker().getFactoryConfig());
329-
v1alpha1Source.getSpec().getPulsar().setAuthSecret(authSecretName);
330-
String tlsSecretName = KubernetesUtils.upsertSecret(kind.toLowerCase(), "tls",
331-
v1alpha1Source.getSpec().getClusterName(), tenant, namespace, sourceName,
332-
worker().getWorkerConfig(), worker().getCoreV1Api(), worker().getFactoryConfig());
333-
v1alpha1Source.getSpec().getPulsar().setTlsSecret(tlsSecretName);
333+
if (!StringUtils.isEmpty(worker().getWorkerConfig().getBrokerClientAuthenticationPlugin())
334+
&& !StringUtils.isEmpty(worker().getWorkerConfig().getBrokerClientAuthenticationParameters())) {
335+
String authSecretName = KubernetesUtils.upsertSecret(kind.toLowerCase(), "auth",
336+
v1alpha1Source.getSpec().getClusterName(), tenant, namespace, sourceName,
337+
worker().getWorkerConfig(), worker().getCoreV1Api(), worker().getFactoryConfig());
338+
v1alpha1Source.getSpec().getPulsar().setAuthSecret(authSecretName);
339+
}
340+
if (worker().getWorkerConfig().getTlsEnabled()) {
341+
String tlsSecretName = KubernetesUtils.upsertSecret(kind.toLowerCase(), "tls",
342+
v1alpha1Source.getSpec().getClusterName(), tenant, namespace, sourceName,
343+
worker().getWorkerConfig(), worker().getCoreV1Api(), worker().getFactoryConfig());
344+
v1alpha1Source.getSpec().getPulsar().setTlsSecret(tlsSecretName);
345+
}
334346
} catch (Exception e) {
335347
log.error("Error create or update auth or tls secret for {} {}/{}/{}",
336348
ComponentTypeUtils.toString(componentType), tenant, namespace, sourceName, e);

mesh-worker-service/src/main/java/io/functionmesh/compute/util/FunctionsUtil.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,6 @@ public static V1alpha1Function createV1alpha1FunctionFromFunctionConfig(String k
200200
v1alpha1FunctionSpec.setMaxPendingAsyncRequests(functionConfig.getMaxPendingAsyncRequests());
201201

202202
v1alpha1FunctionSpec.setReplicas(functionDetails.getParallelism());
203-
v1alpha1FunctionSpec.setMaxReplicas(functionDetails.getParallelism());
204203

205204
v1alpha1FunctionSpec.setLogTopic(functionConfig.getLogTopic());
206205

mesh-worker-service/src/main/java/io/functionmesh/compute/util/KubernetesUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ public static String upsertSecret(
176176
.run();
177177

178178
if (!success.get()) {
179-
throw new RuntimeException(String.format("Failed to create authentication secret for %s %s-%s/%s/%s",
179+
throw new RuntimeException(String.format("Failed to create secret for %s %s-%s/%s/%s",
180180
type, cluster, tenant, namespace, name));
181181
}
182182

mesh-worker-service/src/main/java/io/functionmesh/compute/util/SinksUtil.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,22 @@ public static V1alpha1Sink createV1alpha1SkinFromSinkConfig(String kind, String
158158

159159
if (Strings.isNotEmpty(customRuntimeOptions.getInputTypeClassName())) {
160160
v1alpha1SinkSpecInput.setTypeClassName(customRuntimeOptions.getInputTypeClassName());
161+
} else {
162+
if (connectorsManager == null) {
163+
v1alpha1SinkSpecInput.setTypeClassName("[B");
164+
} else {
165+
FunctionMeshConnectorDefinition functionMeshConnectorDefinition =
166+
connectorsManager.getConnectorDefinition(sinkConfig.getArchive());
167+
if (functionMeshConnectorDefinition == null) {
168+
v1alpha1SinkSpecInput.setTypeClassName("[B");
169+
} else {
170+
if (functionMeshConnectorDefinition.getTypeClassName() == null) {
171+
v1alpha1SinkSpecInput.setTypeClassName("[B");
172+
} else {
173+
v1alpha1SinkSpecInput.setTypeClassName(functionMeshConnectorDefinition.getTypeClassName());
174+
}
175+
}
176+
}
161177
}
162178

163179
v1alpha1SinkSpecInput.setTopics(new ArrayList<>(sinkConfig.getInputs()));
@@ -187,7 +203,6 @@ public static V1alpha1Sink createV1alpha1SkinFromSinkConfig(String kind, String
187203
}
188204

189205
v1alpha1SinkSpec.setReplicas(functionDetails.getParallelism());
190-
v1alpha1SinkSpec.setMaxReplicas(functionDetails.getParallelism());
191206

192207
double cpu = sinkConfig.getResources() != null &&
193208
sinkConfig.getResources().getCpu() != 0 ? sinkConfig.getResources().getCpu() : 1;

mesh-worker-service/src/main/java/io/functionmesh/compute/util/SourcesUtil.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import io.functionmesh.compute.models.CustomRuntimeOptions;
3636
import org.apache.commons.lang.StringUtils;
3737
import org.apache.logging.log4j.util.Strings;
38+
import org.apache.pulsar.common.functions.FunctionDefinition;
3839
import org.apache.pulsar.common.functions.ProducerConfig;
3940
import org.apache.pulsar.common.functions.Resources;
4041
import org.apache.pulsar.common.io.SourceConfig;
@@ -163,6 +164,22 @@ public static V1alpha1Source createV1alpha1SourceFromSourceConfig(String kind, S
163164

164165
if (Strings.isNotEmpty(customRuntimeOptions.getOutputTypeClassName())) {
165166
v1alpha1SourceSpecOutput.setTypeClassName(customRuntimeOptions.getOutputTypeClassName());
167+
} else {
168+
if (connectorsManager == null) {
169+
v1alpha1SourceSpecOutput.setTypeClassName("[B");
170+
} else {
171+
FunctionMeshConnectorDefinition functionMeshConnectorDefinition =
172+
connectorsManager.getConnectorDefinition(sourceConfig.getArchive());
173+
if (functionMeshConnectorDefinition == null) {
174+
v1alpha1SourceSpecOutput.setTypeClassName("[B");
175+
} else {
176+
if (functionMeshConnectorDefinition.getTypeClassName() == null) {
177+
v1alpha1SourceSpecOutput.setTypeClassName("[B");
178+
} else {
179+
v1alpha1SourceSpecOutput.setTypeClassName(functionMeshConnectorDefinition.getTypeClassName());
180+
}
181+
}
182+
}
166183
}
167184

168185
v1alpha1SourceSpec.setOutput(v1alpha1SourceSpecOutput);
@@ -172,7 +189,6 @@ public static V1alpha1Source createV1alpha1SourceFromSourceConfig(String kind, S
172189
}
173190

174191
v1alpha1SourceSpec.setReplicas(functionDetails.getParallelism());
175-
v1alpha1SourceSpec.setMaxReplicas(functionDetails.getParallelism());
176192

177193
double cpu = sourceConfig.getResources() != null && sourceConfig.getResources().getCpu() != 0 ? sourceConfig.getResources().getCpu() : 1;
178194
long ramRequest = sourceConfig.getResources() != null && sourceConfig.getResources().getRam() != 0 ? sourceConfig.getResources().getRam() : 1073741824;

mesh-worker-service/src/main/resources/functions_worker.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,3 +254,4 @@ functionsWorkerServiceCustomConfigs:
254254
secret:
255255
defaultMode: 420
256256
secretName: pulsarcluster-data
257+
extraDependenciesDir: /pulsar/lib/*

0 commit comments

Comments
 (0)