Skip to content

Commit 91e17af

Browse files
authored
redirect upload JAR request to package management service (#308)
* support redirect upload JAR to package service * add integration tests * add tmate to debug * fix CI * fix package upload
1 parent c99c232 commit 91e17af

8 files changed

Lines changed: 99 additions & 2 deletions

File tree

.ci/clusters/values_mesh_worker_service.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,11 @@ pulsar_metadata:
102102
functions:
103103
configData:
104104
functionsWorkerServiceNarPackage: /pulsar/mesh-worker-service.nar
105+
functionsWorkerServiceCustomConfigs:
106+
uploadEnabled: true
107+
functionEnabled: true
108+
sinkEnabled: true
109+
sourceEnabled: true
105110

106111
broker:
107112
replicaCount: 1

.ci/helm.sh

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -450,3 +450,28 @@ function ci::verify_go_package() {
450450
RET=$(${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER}-pulsar-broker-0 -- bin/pulsar-admin functions delete --name package-go-fn)
451451
echo "${RET}"
452452
}
453+
454+
function ci::create_java_function_by_upload() {
455+
${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER}-pulsar-broker-0 -- cat conf/functions_worker.yml
456+
RET=$(${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER}-pulsar-broker-0 -- bin/pulsar-admin functions create --jar /pulsar/examples/api-examples.jar --name package-upload-java-fn --className org.apache.pulsar.functions.api.examples.ExclamationFunction --inputs persistent://public/default/package-upload-java-fn-input --cpu 0.1)
457+
${KUBECTL} logs -n ${NAMESPACE} ${CLUSTER}-pulsar-broker-0
458+
sleep 15
459+
echo $RET
460+
${KUBECTL} logs -n ${NAMESPACE} ${CLUSTER}-pulsar-broker-0
461+
sleep 15
462+
${KUBECTL} get pods -A
463+
sleep 5
464+
WC=$(${KUBECTL} get pods -n ${NAMESPACE} --field-selector=status.phase=Running | grep "package-upload-java-fn" | wc -l)
465+
while [[ ${WC} -lt 1 ]]; do
466+
echo ${WC};
467+
sleep 20
468+
${KUBECTL} get pods -n ${NAMESPACE}
469+
${KUBECTL} describe pod package-upload-java-fn-function-0
470+
WC=$(${KUBECTL} get pods -n ${NAMESPACE} --field-selector=status.phase=Running | grep "package-upload-java-fn" | wc -l)
471+
done
472+
echo "java function test done"
473+
RET=$(${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER}-pulsar-broker-0 -- bin/pulsar-admin functions delete --name package-upload-java-fn)
474+
echo "${RET}"
475+
RET=$(${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER}-pulsar-broker-0 -- bin/pulsar-admin packages get-metadata function://public/default/package-upload-java-fn@latest)
476+
echo "${RET}"
477+
}

.ci/verify_package_management_service.sh

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,3 +36,5 @@ ci::verify_python_package
3636

3737
ci::upload_go_package
3838
ci::verify_go_package
39+
40+
ci::create_java_function_by_upload

.github/workflows/test-integration-mesh-worker-service.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,3 +89,8 @@ jobs:
8989
- name: verify package management service
9090
run: |
9191
.ci/verify_package_management_service.sh
92+
93+
- name: Setup tmate session
94+
uses: mxschmitt/action-tmate@v3
95+
if: failure()
96+

manifests/crd.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ spec:
1515
name: webhook-service
1616
namespace: system
1717
path: /convert
18+
port: 443
1819
group: compute.functionmesh.io
1920
names:
2021
kind: FunctionMesh
@@ -7441,6 +7442,7 @@ spec:
74417442
name: webhook-service
74427443
namespace: system
74437444
path: /convert
7445+
port: 443
74447446
group: compute.functionmesh.io
74457447
names:
74467448
kind: Function
@@ -10015,6 +10017,7 @@ spec:
1001510017
name: webhook-service
1001610018
namespace: system
1001710019
path: /convert
10020+
port: 443
1001810021
group: compute.functionmesh.io
1001910022
names:
1002010023
kind: Sink
@@ -12526,6 +12529,7 @@ spec:
1252612529
name: webhook-service
1252712530
namespace: system
1252812531
path: /convert
12532+
port: 443
1252912533
group: compute.functionmesh.io
1253012534
names:
1253112535
kind: Source

mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/FunctionsImpl.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ private void validateRegisterFunctionRequestParams(String tenant, String namespa
8888
throw new RestException(Response.Status.BAD_REQUEST, "Function config is not provided");
8989
}
9090
MeshWorkerServiceCustomConfig customConfig = worker().getMeshWorkerServiceCustomConfig();
91-
if (jarUploaded && customConfig != null && customConfig.isUploadEnabled()) {
91+
if (jarUploaded && customConfig != null && !customConfig.isUploadEnabled()) {
9292
throw new RestException(Response.Status.BAD_REQUEST, "Uploading Jar File is not enabled");
9393
}
9494
this.validateResources(functionConfig.getResources(), worker().getWorkerConfig().getFunctionInstanceMinResources(),
@@ -130,14 +130,24 @@ public void registerFunction(final String tenant,
130130
clientAuthenticationDataHttps,
131131
ComponentTypeUtils.toString(componentType));
132132
this.validateTenantIsExist(tenant, namespace, functionName, clientRole);
133+
String packageURL = functionPkgUrl;
134+
if (uploadedInputStream != null) {
135+
try {
136+
String tempDirectory = System.getProperty("java.io.tmpdir");
137+
packageURL = FunctionsUtil.uploadPackageToPackageService(worker().getBrokerAdmin(), tenant, namespace, functionName, uploadedInputStream, fileDetail, tempDirectory);
138+
} catch (Exception e) {
139+
log.error("register {}/{}/{} function failed, error message: {}", tenant, namespace, functionName, e);
140+
throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
141+
}
142+
}
133143

134144
String cluster = worker().getWorkerConfig().getPulsarFunctionsCluster();
135145
V1alpha1Function v1alpha1Function = FunctionsUtil.createV1alpha1FunctionFromFunctionConfig(
136146
kind,
137147
group,
138148
version,
139149
functionName,
140-
functionPkgUrl,
150+
packageURL,
141151
functionConfig,
142152
cluster,
143153
worker()

mesh-worker-service/src/main/java/io/functionmesh/compute/util/FunctionsUtil.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,11 @@
3737
import io.functionmesh.compute.models.MeshWorkerServiceCustomConfig;
3838
import io.kubernetes.client.custom.Quantity;
3939
import lombok.extern.slf4j.Slf4j;
40+
import org.apache.commons.io.FileUtils;
4041
import org.apache.commons.lang3.RandomStringUtils;
4142
import org.apache.commons.lang3.StringUtils;
4243
import org.apache.logging.log4j.util.Strings;
44+
import org.apache.pulsar.client.admin.PulsarAdmin;
4345
import org.apache.pulsar.client.admin.PulsarAdminException;
4446
import org.apache.pulsar.common.functions.ConsumerConfig;
4547
import org.apache.pulsar.common.functions.FunctionConfig;
@@ -54,10 +56,13 @@
5456
import org.apache.pulsar.functions.proto.InstanceCommunication;
5557
import org.apache.pulsar.functions.utils.FunctionCommon;
5658
import org.apache.pulsar.functions.utils.FunctionConfigUtils;
59+
import org.apache.pulsar.packages.management.core.common.PackageMetadata;
60+
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
5761

5862
import javax.ws.rs.core.Response;
5963
import java.io.File;
6064
import java.io.IOException;
65+
import java.io.InputStream;
6166
import java.nio.file.Files;
6267
import java.nio.file.Path;
6368
import java.nio.file.Paths;
@@ -650,4 +655,44 @@ private static Class<?>[] extractTypeArgs(final FunctionConfig functionConfig,
650655
return typeArgs;
651656
}
652657

658+
private static String generatePackageURL(final String tenant,
659+
final String namespace,
660+
final String functionName) {
661+
return String.format("function://%s/%s/%s", tenant, namespace, functionName);
662+
}
663+
664+
public static String uploadPackageToPackageService(PulsarAdmin admin,
665+
final String tenant,
666+
final String namespace,
667+
final String functionName,
668+
final InputStream uploadedInputStream,
669+
final FormDataContentDisposition fileDetail,
670+
String tempDirectory) throws Exception {
671+
Path tempDirectoryPath = Paths.get(tempDirectory);
672+
if (Files.notExists(tempDirectoryPath)) {
673+
Files.createDirectories(tempDirectoryPath);
674+
}
675+
Path filePath = Files.createTempFile(tempDirectoryPath,
676+
RandomStringUtils.random(5, true, true).toLowerCase(), fileDetail.getFileName());
677+
FileUtils.copyInputStreamToFile(uploadedInputStream, filePath.toFile());
678+
uploadedInputStream.close();
679+
680+
PackageMetadata packageMetadata = new PackageMetadata();
681+
String packageName = generatePackageURL(tenant, namespace, functionName);
682+
packageMetadata.setContact("mesh-worker-service");
683+
packageMetadata.setDescription("mesh-worker-service created for " + packageName);
684+
Map<String, String> properties = new HashMap<>();
685+
properties.put("tenant", tenant);
686+
properties.put("namespace", namespace);
687+
properties.put("functionName", functionName);
688+
properties.put("fileName", fileDetail.getFileName());
689+
properties.put("size", Long.toString(filePath.toFile().length()));
690+
long checksum = FileUtils.checksumCRC32(filePath.toFile());
691+
properties.put("checksum", Long.toString(checksum));
692+
packageMetadata.setProperties(properties);
693+
admin.packages().upload(packageMetadata, packageName, filePath.toString());
694+
log.info("upload file {} to package service {} successfully", filePath, packageName);
695+
Files.deleteIfExists(filePath);
696+
return packageName;
697+
}
653698
}

mesh-worker-service/src/test/java/io/functionmesh/compute/util/WorkerConfigTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ public void testCustomConfigs() throws Exception {
4848
MeshWorkerServiceCustomConfig customConfig = RuntimeUtils.getRuntimeFunctionConfig(
4949
customConfigs, MeshWorkerServiceCustomConfig.class);
5050

51+
assertEquals(customConfigs.get("uploadEnabled"), customConfig.isUploadEnabled());
5152
assertEquals("service-account", customConfig.getDefaultServiceAccountName());
5253

5354
V1OwnerReference ownerRef = CommonUtil.getOwnerReferenceFromCustomConfigs(customConfig);

0 commit comments

Comments
 (0)