Skip to content

Commit 0d78a7c

Browse files
authored
add packages url support in mesh worker service (#261)
* add packages url support in mesh worker service * fix CI * fix CI * fix github action * fix CI * address codacy * address codacy * address codacy
1 parent 3fb4001 commit 0d78a7c

8 files changed

Lines changed: 215 additions & 30 deletions

File tree

.ci/helm.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,7 @@ function ci::verify_mesh_function() {
234234
function ci::print_function_log() {
235235
FUNCTION_NAME=$1
236236
${KUBECTL} describe pod -lname=${FUNCTION_NAME}
237+
sleep 120
237238
${KUBECTL} logs -lname=${FUNCTION_NAME} --all-containers=true
238239
}
239240

.github/workflows/project.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ jobs:
2727

2828
- name: InstallKubebuilder
2929
run: |
30-
curl -L https://go.kubebuilder.io/dl/2.3.1/linux/amd64 | tar -xz -C /tmp/
30+
curl -L https://github.com/kubernetes-sigs/kubebuilder/releases/download/v2.3.1/kubebuilder_2.3.1_linux_amd64.tar.gz | tar -xz -C /tmp/
3131
sudo mv /tmp/kubebuilder_2.3.1_linux_amd64 /usr/local/kubebuilder
3232
export PATH=$PATH:/usr/local/kubebuilder/bin
3333

mesh-worker-service/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
<version>v0.1.7-rc1</version>
3030

3131
<properties>
32-
<pulsar.version>2.8.0.7</pulsar.version>
32+
<pulsar.version>2.8.0.13</pulsar.version>
3333
<lombok.version>1.18.16</lombok.version>
3434
<log4j2.version>2.14.0</log4j2.version>
3535
<kubernetes-client.version>12.0.1</kubernetes-client.version>

mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/FunctionsImpl.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -142,8 +142,8 @@ public void registerFunction(final String tenant,
142142
functionName,
143143
functionPkgUrl,
144144
functionConfig,
145-
worker().getWorkerConfig().getFunctionsWorkerServiceCustomConfigs(),
146-
cluster
145+
cluster,
146+
worker()
147147
);
148148
// override namespace by configuration file
149149
v1alpha1Function.getMetadata().setNamespace(KubernetesUtils.getNamespace(worker().getFactoryConfig()));
@@ -211,8 +211,8 @@ public void updateFunction(final String tenant,
211211
functionName,
212212
functionPkgUrl,
213213
functionConfig,
214-
worker().getWorkerConfig().getFunctionsWorkerServiceCustomConfigs(),
215-
cluster
214+
cluster,
215+
worker()
216216
);
217217
Call getCall = worker().getCustomObjectsApi().getNamespacedCustomObjectCall(
218218
group,
@@ -246,6 +246,7 @@ public void updateFunction(final String tenant,
246246
executeCall(replaceCall, V1alpha1Function.class);
247247
} catch (Exception e) {
248248
log.error("update {}/{}/{} function failed, error message: {}", tenant, namespace, functionName, e);
249+
e.printStackTrace();
249250
throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
250251
}
251252
}
@@ -611,4 +612,5 @@ private void upsertFunction(final String tenant,
611612
}
612613
}
613614
}
615+
614616
}

mesh-worker-service/src/main/java/io/functionmesh/compute/util/FunctionsUtil.java

Lines changed: 104 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package io.functionmesh.compute.util;
2020

2121
import com.google.gson.Gson;
22+
import io.functionmesh.compute.MeshWorkerService;
2223
import io.functionmesh.compute.functions.models.V1alpha1Function;
2324
import io.functionmesh.compute.functions.models.V1alpha1FunctionSpec;
2425
import io.functionmesh.compute.functions.models.V1alpha1FunctionSpecGolang;
@@ -36,18 +37,24 @@
3637
import lombok.extern.slf4j.Slf4j;
3738
import org.apache.commons.lang3.StringUtils;
3839
import org.apache.logging.log4j.util.Strings;
40+
import org.apache.pulsar.client.admin.PulsarAdminException;
3941
import org.apache.pulsar.common.functions.ConsumerConfig;
4042
import org.apache.pulsar.common.functions.FunctionConfig;
4143
import org.apache.pulsar.common.functions.ProducerConfig;
4244
import org.apache.pulsar.common.functions.Resources;
45+
import org.apache.pulsar.common.functions.Utils;
4346
import org.apache.pulsar.common.policies.data.ExceptionInformation;
4447
import org.apache.pulsar.common.policies.data.FunctionStatus;
4548
import org.apache.pulsar.common.util.RestException;
4649
import org.apache.pulsar.functions.proto.Function;
4750
import org.apache.pulsar.functions.proto.InstanceCommunication;
51+
import org.apache.pulsar.functions.utils.FunctionCommon;
4852
import org.apache.pulsar.functions.utils.FunctionConfigUtils;
4953

5054
import javax.ws.rs.core.Response;
55+
import java.io.File;
56+
import java.io.IOException;
57+
import java.nio.file.Files;
5158
import java.nio.file.Path;
5259
import java.nio.file.Paths;
5360
import java.util.ArrayList;
@@ -66,7 +73,8 @@ public class FunctionsUtil {
6673

6774
public static V1alpha1Function createV1alpha1FunctionFromFunctionConfig(String kind, String group, String version
6875
, String functionName, String functionPkgUrl, FunctionConfig functionConfig
69-
, Map<String, Object> customConfigs, String cluster) {
76+
, String cluster, MeshWorkerService worker) {
77+
Map<String, Object> customConfigs = worker.getWorkerConfig().getFunctionsWorkerServiceCustomConfigs();
7078
CustomRuntimeOptions customRuntimeOptions = CommonUtil.getCustomRuntimeOptions(functionConfig.getCustomRuntimeOptions());
7179
String clusterName = CommonUtil.getClusterName(cluster, customRuntimeOptions);
7280

@@ -236,28 +244,70 @@ public static V1alpha1Function createV1alpha1FunctionFromFunctionConfig(String k
236244
// v1alpha1FunctionSpecPulsar.setAuthConfig(CommonUtil.getPulsarClusterAuthConfigMapName(clusterName));
237245
v1alpha1FunctionSpec.setPulsar(v1alpha1FunctionSpecPulsar);
238246

239-
String location = String.format("%s/%s/%s", functionConfig.getTenant(), functionConfig.getNamespace(),
240-
functionName);
241-
if (StringUtils.isNotEmpty(functionPkgUrl)) {
242-
location = functionPkgUrl;
247+
// TODO: dynamic file name to function CRD
248+
String fileName = "/pulsar/function-executable";
249+
boolean isPkgUrlProvided = StringUtils.isNotEmpty(functionPkgUrl);
250+
File componentPackageFile = null;
251+
try {
252+
if (isPkgUrlProvided) {
253+
if (Utils.hasPackageTypePrefix(functionPkgUrl)) {
254+
componentPackageFile = downloadPackageFile(worker, functionPkgUrl);
255+
} else {
256+
log.warn("get unsupported function package url {}", functionPkgUrl);
257+
throw new IllegalArgumentException("Function Package url is not valid. supported url (function/sink/source)");
258+
}
259+
} else {
260+
// TODO: support upload JAR to bk
261+
throw new IllegalArgumentException("uploading package to mesh worker service is not supported yet.");
262+
}
263+
} catch (Exception e) {
264+
log.error("Invalid register function request {}: {}", functionName, e);
265+
e.printStackTrace();
266+
throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
267+
}
268+
Class<?>[] typeArgs = null;
269+
if (componentPackageFile != null) {
270+
typeArgs = extractTypeArgs(functionConfig, componentPackageFile);
243271
}
244272
if (StringUtils.isNotEmpty(functionConfig.getJar())) {
245273
V1alpha1FunctionSpecJava v1alpha1FunctionSpecJava = new V1alpha1FunctionSpecJava();
246-
Path path = Paths.get(functionConfig.getJar());
247-
v1alpha1FunctionSpecJava.setJar(path.getFileName().toString());
248-
v1alpha1FunctionSpecJava.setJarLocation(location);
274+
v1alpha1FunctionSpecJava.setJar(fileName);
275+
if (isPkgUrlProvided) {
276+
v1alpha1FunctionSpecJava.setJarLocation(functionPkgUrl);
277+
}
278+
String extraDependenciesDir = "";
279+
if (StringUtils.isNotEmpty(worker.getFactoryConfig().getExtraFunctionDependenciesDir())) {
280+
if (Paths.get(worker.getFactoryConfig().getExtraFunctionDependenciesDir()).isAbsolute()) {
281+
extraDependenciesDir = worker.getFactoryConfig().getExtraFunctionDependenciesDir();
282+
} else {
283+
extraDependenciesDir = "/pulsar/" + worker.getFactoryConfig().getExtraFunctionDependenciesDir();
284+
}
285+
} else {
286+
extraDependenciesDir = "/pulsar/instances/deps";
287+
}
288+
v1alpha1FunctionSpecJava.setExtraDependenciesDir(extraDependenciesDir);
249289
v1alpha1FunctionSpec.setJava(v1alpha1FunctionSpecJava);
290+
if (typeArgs != null) {
291+
if (typeArgs.length == 2 && typeArgs[0] != null) {
292+
v1alpha1FunctionSpecInput.setTypeClassName(typeArgs[0].getName());
293+
}
294+
if (typeArgs.length == 2 && typeArgs[1] != null) {
295+
v1alpha1FunctionSpecOutput.setTypeClassName(typeArgs[1].getName());
296+
}
297+
}
250298
} else if (StringUtils.isNotEmpty(functionConfig.getPy())) {
251299
V1alpha1FunctionSpecPython v1alpha1FunctionSpecPython = new V1alpha1FunctionSpecPython();
252-
Path path = Paths.get(functionConfig.getPy());
253-
v1alpha1FunctionSpecPython.setPy(path.getFileName().toString());
254-
v1alpha1FunctionSpecPython.setPyLocation(location);
300+
v1alpha1FunctionSpecPython.setPy(fileName);
301+
if (isPkgUrlProvided) {
302+
v1alpha1FunctionSpecPython.setPyLocation(functionPkgUrl);
303+
}
255304
v1alpha1FunctionSpec.setPython(v1alpha1FunctionSpecPython);
256305
} else if (StringUtils.isNotEmpty(functionConfig.getGo())) {
257306
V1alpha1FunctionSpecGolang v1alpha1FunctionSpecGolang = new V1alpha1FunctionSpecGolang();
258-
Path path = Paths.get(functionConfig.getGo());
259-
v1alpha1FunctionSpecGolang.setGo(path.getFileName().toString());
260-
v1alpha1FunctionSpecGolang.setGoLocation(location);
307+
v1alpha1FunctionSpecGolang.setGo(fileName);
308+
if (isPkgUrlProvided) {
309+
v1alpha1FunctionSpecGolang.setGoLocation(functionPkgUrl);
310+
}
261311
v1alpha1FunctionSpec.setGolang(v1alpha1FunctionSpecGolang);
262312
}
263313

@@ -422,12 +472,21 @@ public static FunctionConfig createFunctionConfigFromV1alpha1Function(String ten
422472
if (v1alpha1FunctionSpec.getJava() != null) {
423473
functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
424474
functionConfig.setJar(v1alpha1FunctionSpec.getJava().getJar());
475+
if (Strings.isNotEmpty(v1alpha1FunctionSpec.getJava().getJarLocation())) {
476+
functionConfig.setJar(v1alpha1FunctionSpec.getJava().getJarLocation());
477+
}
425478
} else if (v1alpha1FunctionSpec.getPython() != null) {
426479
functionConfig.setRuntime(FunctionConfig.Runtime.PYTHON);
427480
functionConfig.setPy(v1alpha1FunctionSpec.getPython().getPy());
481+
if (Strings.isNotEmpty(v1alpha1FunctionSpec.getPython().getPyLocation())) {
482+
functionConfig.setJar(v1alpha1FunctionSpec.getPython().getPyLocation());
483+
}
428484
} else if (v1alpha1FunctionSpec.getGolang() != null) {
429485
functionConfig.setRuntime(FunctionConfig.Runtime.GO);
430486
functionConfig.setGo(v1alpha1FunctionSpec.getGolang().getGo());
487+
if (Strings.isNotEmpty(v1alpha1FunctionSpec.getGolang().getGoLocation())) {
488+
functionConfig.setJar(v1alpha1FunctionSpec.getGolang().getGoLocation());
489+
}
431490
}
432491
if (v1alpha1FunctionSpec.getMaxMessageRetry() != null) {
433492
functionConfig.setMaxMessageRetries(v1alpha1FunctionSpec.getMaxMessageRetry());
@@ -497,4 +556,35 @@ public static void convertFunctionStatusToInstanceStatusData(InstanceCommunicati
497556
functionInstanceStatusData.setLastInvocationTime(functionStatus.getLastInvocationTime());
498557
}
499558

559+
private static File downloadPackageFile(MeshWorkerService worker, String packageName) throws IOException, PulsarAdminException {
560+
Path tempDirectory;
561+
if (worker.getWorkerConfig().getDownloadDirectory() != null) {
562+
tempDirectory = Paths.get(worker.getWorkerConfig().getDownloadDirectory());
563+
} else {
564+
// use the Nar extraction directory as a temporary directory for downloaded files
565+
tempDirectory = Paths.get(worker.getWorkerConfig().getNarExtractionDirectory());
566+
}
567+
File file = Files.createTempFile(tempDirectory, "function", ".tmp").toFile();
568+
worker.getBrokerAdmin().packages().download(packageName, file.toString());
569+
return file;
570+
}
571+
572+
private static Class<?>[] extractTypeArgs(final FunctionConfig functionConfig,
573+
final File componentPackageFile) {
574+
Class<?>[] typeArgs = null;
575+
if (componentPackageFile == null) {
576+
return null;
577+
}
578+
ClassLoader clsLoader = FunctionConfigUtils.validate(functionConfig, componentPackageFile);
579+
if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA && clsLoader != null) {
580+
try {
581+
typeArgs = FunctionCommon.getFunctionTypes(functionConfig, clsLoader);
582+
} catch (ClassNotFoundException | NoClassDefFoundError e) {
583+
throw new IllegalArgumentException(
584+
String.format("Function class %s must be in class path", functionConfig.getClassName()), e);
585+
}
586+
}
587+
return typeArgs;
588+
}
589+
500590
}

mesh-worker-service/src/test/java/io/functionmesh/compute/rest/api/FunctionsImplTest.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.apache.pulsar.common.policies.data.FunctionStatus;
5252
import org.apache.pulsar.functions.proto.InstanceCommunication;
5353
import org.apache.pulsar.functions.proto.InstanceControlGrpc;
54+
import org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactoryConfig;
5455
import org.apache.pulsar.functions.worker.WorkerConfig;
5556
import org.junit.Assert;
5657
import org.junit.Test;
@@ -78,6 +79,7 @@
7879
Response.class,
7980
RealResponseBody.class,
8081
CommonUtil.class,
82+
FunctionsUtil.class,
8183
InstanceControlGrpc.InstanceControlFutureStub.class})
8284
@PowerMockIgnore({"javax.management.*"})
8385
public class FunctionsImplTest {
@@ -316,9 +318,13 @@ public void registerFunctionTest() throws ApiException, IOException, PulsarAdmin
316318
CustomObjectsApi customObjectsApi = PowerMockito.mock(CustomObjectsApi.class);
317319
PowerMockito.when(meshWorkerService.getCustomObjectsApi()).thenReturn(customObjectsApi);
318320
WorkerConfig workerConfig = PowerMockito.mock(WorkerConfig.class);
321+
KubernetesRuntimeFactoryConfig factoryConfig = PowerMockito.mock(KubernetesRuntimeFactoryConfig.class);
319322
PowerMockito.when(meshWorkerService.getWorkerConfig()).thenReturn(workerConfig);
323+
PowerMockito.when(meshWorkerService.getFactoryConfig()).thenReturn(factoryConfig);
324+
PowerMockito.when(factoryConfig.getExtraFunctionDependenciesDir()).thenReturn("");
320325
PowerMockito.when(workerConfig.isAuthorizationEnabled()).thenReturn(false);
321326
PowerMockito.when(workerConfig.isAuthenticationEnabled()).thenReturn(false);
327+
PowerMockito.when(workerConfig.getFunctionsWorkerServiceCustomConfigs()).thenReturn(Collections.emptyMap());
322328
PulsarAdmin pulsarAdmin = PowerMockito.mock(PulsarAdmin.class);
323329
PowerMockito.when(meshWorkerService.getBrokerAdmin()).thenReturn(pulsarAdmin);
324330
Tenants tenants = PowerMockito.mock(Tenants.class);
@@ -327,6 +333,7 @@ public void registerFunctionTest() throws ApiException, IOException, PulsarAdmin
327333
Response response = PowerMockito.mock(Response.class);
328334
ResponseBody responseBody = PowerMockito.mock(RealResponseBody.class);
329335
ApiClient apiClient = PowerMockito.mock(ApiClient.class);
336+
PowerMockito.stub(PowerMockito.method(FunctionsUtil.class, "downloadPackageFile")).toReturn(null);
330337

331338
String tenant = "public";
332339
String namespace = "default";
@@ -336,12 +343,12 @@ public void registerFunctionTest() throws ApiException, IOException, PulsarAdmin
336343
String version = "v1alpha1";
337344
String kind = "Function";
338345

339-
FunctionConfig functionConfig = Generate.CreateJavaFunctionConfig(tenant, namespace, functionName);
346+
FunctionConfig functionConfig = Generate.CreateJavaFunctionWithPackageURLConfig(tenant, namespace, functionName);
340347

341348
PowerMockito.when(tenants.getTenantInfo(tenant)).thenReturn(null);
342349

343350
V1alpha1Function v1alpha1Function = FunctionsUtil.createV1alpha1FunctionFromFunctionConfig(kind, group,
344-
version, functionName, null, functionConfig, Collections.emptyMap(), null);
351+
version, functionName, functionConfig.getJar(), functionConfig, null, meshWorkerService);
345352

346353
String clusterName = "test-pulsar";
347354
Map<String, String> customLabels = Maps.newHashMap();
@@ -380,7 +387,7 @@ public void registerFunctionTest() throws ApiException, IOException, PulsarAdmin
380387
functionName,
381388
null,
382389
null,
383-
null,
390+
functionConfig.getJar(),
384391
functionConfig,
385392
null,
386393
null);
@@ -507,6 +514,9 @@ public void updateFunctionTest() throws ApiException, IOException {
507514
PowerMockito.when(meshWorkerService.getWorkerConfig()).thenReturn(workerConfig);
508515
PowerMockito.when(workerConfig.isAuthorizationEnabled()).thenReturn(false);
509516
PowerMockito.when(workerConfig.isAuthenticationEnabled()).thenReturn(false);
517+
KubernetesRuntimeFactoryConfig factoryConfig = PowerMockito.mock(KubernetesRuntimeFactoryConfig.class);
518+
PowerMockito.when(meshWorkerService.getFactoryConfig()).thenReturn(factoryConfig);
519+
PowerMockito.when(factoryConfig.getExtraFunctionDependenciesDir()).thenReturn("");
510520

511521
Call getCall = PowerMockito.mock(Call.class);
512522
Response getResponse = PowerMockito.mock(Response.class);
@@ -540,7 +550,9 @@ public void updateFunctionTest() throws ApiException, IOException {
540550
null
541551
)).thenReturn(getCall);
542552

543-
FunctionConfig functionConfig = Generate.CreateJavaFunctionConfig(tenant, namespace, functionName);
553+
PowerMockito.stub(PowerMockito.method(FunctionsUtil.class, "downloadPackageFile")).toReturn(null);
554+
555+
FunctionConfig functionConfig = Generate.CreateJavaFunctionWithPackageURLConfig(tenant, namespace, functionName);
544556

545557
PowerMockito.when(meshWorkerService.getCustomObjectsApi()
546558
.replaceNamespacedCustomObjectCall(
@@ -564,12 +576,13 @@ public void updateFunctionTest() throws ApiException, IOException {
564576
functionName,
565577
null,
566578
null,
567-
null,
579+
functionConfig.getJar(),
568580
functionConfig,
569581
null,
570582
null,
571583
null);
572584
} catch (Exception exception) {
585+
exception.printStackTrace();
573586
Assert.fail("Expected no exception to be thrown but got exception: " + exception);
574587
}
575588
}

mesh-worker-service/src/test/java/io/functionmesh/compute/testdata/Generate.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,31 @@ public static FunctionConfig CreateJavaFunctionConfig(String tenant, String name
5959
return functionConfig;
6060
}
6161

62+
public static FunctionConfig CreateJavaFunctionWithPackageURLConfig(String tenant, String namespace, String functionName) {
63+
FunctionConfig functionConfig = new FunctionConfig();
64+
functionConfig.setName(functionName);
65+
functionConfig.setTenant(tenant);
66+
functionConfig.setNamespace(namespace);
67+
functionConfig.setClassName("org.example.functions.WordCountFunction");
68+
functionConfig.setInputs(Collections.singletonList("persistent://public/default/sentences"));
69+
functionConfig.setParallelism(1);
70+
functionConfig.setCleanupSubscription(true);
71+
functionConfig.setOutput("persistent://public/default/count");
72+
Resources resources = new Resources();
73+
resources.setCpu(1.0);
74+
resources.setRam(102400L);
75+
functionConfig.setResources(resources);
76+
CustomRuntimeOptions customRuntimeOptions = new CustomRuntimeOptions();
77+
customRuntimeOptions.setClusterName(TEST_CLUSTER_NAME);
78+
customRuntimeOptions.setInputTypeClassName("java.lang.String");
79+
customRuntimeOptions.setOutputTypeClassName("java.lang.String");
80+
String customRuntimeOptionsJSON = new Gson().toJson(customRuntimeOptions, CustomRuntimeOptions.class);
81+
functionConfig.setCustomRuntimeOptions(customRuntimeOptionsJSON);
82+
functionConfig.setJar(String.format("function://public/default/%s@1.0", functionName));
83+
functionConfig.setAutoAck(true);
84+
return functionConfig;
85+
}
86+
6287
public static SinkConfig CreateSinkConfig(String tenant, String namespace, String functionName) {
6388
SinkConfig sinkConfig = new SinkConfig();
6489
sinkConfig.setName(functionName);

0 commit comments

Comments
 (0)