Skip to content

Commit 9891629

Browse files
authored
fix python & go function command (#147)
* fix python & go function command * fix ci * more output * fix ci * fix lint
1 parent 0cb72fa commit 9891629

458 files changed

Lines changed: 48335 additions & 332 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.
5.23 MB
Binary file not shown.

controllers/spec/common.go

Lines changed: 20 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ import (
2424
"strings"
2525
"time"
2626

27-
"github.com/gogo/protobuf/jsonpb"
2827
"github.com/streamnative/function-mesh/api/v1alpha1"
2928
"github.com/streamnative/function-mesh/controllers/proto"
3029

@@ -56,6 +55,8 @@ const (
5655

5756
AnnotationPrometheusScrape = "prometheus.io/scrape"
5857
AnnotationPrometheusPort = "prometheus.io/port"
58+
59+
EnvGoFunctionConfigs = "GO_FUNCTION_CONF"
5960
)
6061

6162
var GRPCPort = corev1.ContainerPort{
@@ -259,7 +260,7 @@ func getProcessPythonRuntimeArgs(name string, packageName string, clusterName st
259260
"python",
260261
"/pulsar/instances/python-instance/python_instance_main.py",
261262
"--py",
262-
fmt.Sprintf("/pulsar/%s", packageName),
263+
packageName,
263264
"--logging_directory",
264265
"logs/functions",
265266
"--logging_file",
@@ -317,38 +318,35 @@ func getSharedArgs(details, clusterName string, authProvided bool) []string {
317318
return args
318319
}
319320

320-
func generateGoFunctionDetailsInJSON(function *v1alpha1.Function) string {
321-
functionDetails := convertFunctionDetails(function)
322-
marshaler := &jsonpb.Marshaler{}
323-
json, err := marshaler.MarshalToString(functionDetails)
321+
func generateGoFunctionConf(function *v1alpha1.Function) string {
322+
goFunctionConfs := convertGoFunctionConfs(function)
323+
json, err := json.Marshal(goFunctionConfs)
324324
if err != nil {
325325
// TODO
326326
panic(err)
327327
}
328-
return json
328+
ret := string(json)
329+
ret = strings.ReplaceAll(ret, "\"instanceID\":0", "\"instanceID\":${"+EnvShardID+"}")
330+
return ret
329331
}
330332

331333
func getProcessGoRuntimeArgs(goExecFilePath string, function *v1alpha1.Function) []string {
332-
str := generateGoFunctionDetailsInJSON(function)
333-
tmpStr := strings.TrimSuffix(str, "}")
334-
335-
inputTopic := function.Spec.Input.Topics[0]
336-
outputTopic := function.Spec.Output.Topic
337-
338-
configContent := fmt.Sprintf("%s, \"pulsarServiceURL\": \"pulsar://test-pulsar-broker.default.svc.cluster.local:6650\", "+
339-
"\"sourceSpecsTopic\": \"%s\", \"sinkSpecsTopic\": \"%s\"}", tmpStr, inputTopic, outputTopic)
340-
341-
goPath := fmt.Sprintf("/pulsar/%s", goExecFilePath)
342-
conf := fmt.Sprintf("'%s'", configContent)
343-
334+
str := generateGoFunctionConf(function)
335+
str = strings.ReplaceAll(str, "\"", "\\\"")
344336
args := []string{
337+
fmt.Sprintf("%s=%s", EnvGoFunctionConfigs, str),
338+
"&&",
339+
fmt.Sprintf("goFunctionConfigs=${%s}", EnvGoFunctionConfigs),
340+
"&&",
341+
"echo goFunctionConfigs=\"'${goFunctionConfigs}'\"",
342+
"&&",
345343
"chmod +x",
346-
goPath,
344+
goExecFilePath,
347345
"&&",
348346
"exec",
349-
goPath,
347+
goExecFilePath,
350348
"-instance-conf",
351-
conf,
349+
"${goFunctionConfigs}",
352350
}
353351

354352
return args

controllers/spec/common_test.go

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,11 @@
1818
package spec
1919

2020
import (
21+
"strings"
2122
"testing"
2223

24+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
25+
2326
"github.com/streamnative/function-mesh/api/v1alpha1"
2427

2528
"github.com/stretchr/testify/assert"
@@ -170,3 +173,76 @@ func TestGetSourceRunnerImage(t *testing.T) {
170173
image = getSourceRunnerImage(&spec)
171174
assert.Equal(t, image, "streamnative/pulsar-io-test:2.7.1")
172175
}
176+
177+
func TestMakeGoFunctionCommand(t *testing.T) {
178+
function := makeGoFunctionSample(TestFunctionName)
179+
commands := MakeGoFunctionCommand("", "/pulsar/go-func", function)
180+
assert.Equal(t, commands[0], "sh")
181+
assert.Equal(t, commands[1], "-c")
182+
assert.True(t, strings.HasPrefix(commands[2], "SHARD_ID=${POD_NAME##*-} && echo shardId=${SHARD_ID}"))
183+
innerCommands := strings.Split(commands[2], "&&")
184+
assert.Equal(t, innerCommands[0], "SHARD_ID=${POD_NAME##*-} ")
185+
assert.Equal(t, innerCommands[1], " echo shardId=${SHARD_ID} ")
186+
assert.True(t, strings.HasPrefix(innerCommands[2], " GO_FUNCTION_CONF"))
187+
assert.Equal(t, innerCommands[3], " goFunctionConfigs=${GO_FUNCTION_CONF} ")
188+
assert.Equal(t, innerCommands[4], " echo goFunctionConfigs=\"'${goFunctionConfigs}'\" ")
189+
assert.Equal(t, innerCommands[5], " chmod +x /pulsar/go-func ")
190+
assert.Equal(t, innerCommands[6], " exec /pulsar/go-func -instance-conf ${goFunctionConfigs}")
191+
}
192+
193+
const TestClusterName string = "test-pulsar"
194+
const TestFunctionName string = "test-function"
195+
const TestNameSpace string = "default"
196+
197+
func makeSampleObjectMeta(name string) *metav1.ObjectMeta {
198+
return &metav1.ObjectMeta{
199+
Name: name,
200+
Namespace: TestNameSpace,
201+
UID: "dead-beef", // uid not generate automatically with fake k8s
202+
}
203+
}
204+
205+
func makeGoFunctionSample(functionName string) *v1alpha1.Function {
206+
maxPending := int32(1000)
207+
replicas := int32(1)
208+
maxReplicas := int32(5)
209+
trueVal := true
210+
return &v1alpha1.Function{
211+
TypeMeta: metav1.TypeMeta{
212+
Kind: "Function",
213+
APIVersion: "compute.functionmesh.io/v1alpha1",
214+
},
215+
ObjectMeta: *makeSampleObjectMeta(functionName),
216+
Spec: v1alpha1.FunctionSpec{
217+
Name: functionName,
218+
Tenant: "public",
219+
ClusterName: TestClusterName,
220+
Input: v1alpha1.InputConf{
221+
Topics: []string{
222+
"persistent://public/default/go-function-input-topic",
223+
},
224+
},
225+
Output: v1alpha1.OutputConf{
226+
Topic: "persistent://public/default/go-function-output-topic",
227+
},
228+
LogTopic: "persistent://public/default/go-function-logs",
229+
Timeout: 0,
230+
MaxMessageRetry: 0,
231+
ForwardSourceMessageProperty: &trueVal,
232+
Replicas: &replicas,
233+
MaxReplicas: &maxReplicas,
234+
AutoAck: &trueVal,
235+
MaxPendingAsyncRequests: &maxPending,
236+
Messaging: v1alpha1.Messaging{
237+
Pulsar: &v1alpha1.PulsarMessaging{
238+
PulsarConfig: TestClusterName,
239+
},
240+
},
241+
Runtime: v1alpha1.Runtime{
242+
Golang: &v1alpha1.GoRuntime{
243+
Go: "/pulsar/go-func",
244+
},
245+
},
246+
},
247+
}
248+
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package spec
19+
20+
import "time"
21+
22+
// copy from https://github.com/apache/pulsar/blob/master/pulsar-function-go/conf/conf.go
23+
// directly import Conf from pulsar-function-go/conf will break envtest
24+
25+
type GoFunctionConf struct {
26+
PulsarServiceURL string `json:"pulsarServiceURL" yaml:"pulsarServiceURL"`
27+
InstanceID int `json:"instanceID" yaml:"instanceID"`
28+
FuncID string `json:"funcID" yaml:"funcID"`
29+
FuncVersion string `json:"funcVersion" yaml:"funcVersion"`
30+
MaxBufTuples int `json:"maxBufTuples" yaml:"maxBufTuples"`
31+
Port int `json:"port" yaml:"port"`
32+
ClusterName string `json:"clusterName" yaml:"clusterName"`
33+
KillAfterIdle time.Duration `json:"killAfterIdleMs" yaml:"killAfterIdleMs"`
34+
// function details config
35+
Tenant string `json:"tenant" yaml:"tenant"`
36+
NameSpace string `json:"nameSpace" yaml:"nameSpace"`
37+
Name string `json:"name" yaml:"name"`
38+
LogTopic string `json:"logTopic" yaml:"logTopic"`
39+
ProcessingGuarantees int32 `json:"processingGuarantees" yaml:"processingGuarantees"`
40+
SecretsMap string `json:"secretsMap" yaml:"secretsMap"`
41+
Runtime int32 `json:"runtime" yaml:"runtime"`
42+
AutoACK bool `json:"autoAck" yaml:"autoAck"`
43+
Parallelism int32 `json:"parallelism" yaml:"parallelism"`
44+
//source config
45+
SubscriptionType int32 `json:"subscriptionType" yaml:"subscriptionType"`
46+
TimeoutMs uint64 `json:"timeoutMs" yaml:"timeoutMs"`
47+
SubscriptionName string `json:"subscriptionName" yaml:"subscriptionName"`
48+
CleanupSubscription bool `json:"cleanupSubscription" yaml:"cleanupSubscription"`
49+
//source input specs
50+
SourceSpecTopic string `json:"sourceSpecsTopic" yaml:"sourceSpecsTopic"`
51+
SourceSchemaType string `json:"sourceSchemaType" yaml:"sourceSchemaType"`
52+
IsRegexPatternSubscription bool `json:"isRegexPatternSubscription" yaml:"isRegexPatternSubscription"`
53+
ReceiverQueueSize int32 `json:"receiverQueueSize" yaml:"receiverQueueSize"`
54+
//sink spec config
55+
SinkSpecTopic string `json:"sinkSpecsTopic" yaml:"sinkSpecsTopic"`
56+
SinkSchemaType string `json:"sinkSchemaType" yaml:"sinkSchemaType"`
57+
//resources config
58+
CPU float64 `json:"cpu" yaml:"cpu"`
59+
RAM int64 `json:"ram" yaml:"ram"`
60+
Disk int64 `json:"disk" yaml:"disk"`
61+
//retryDetails config
62+
MaxMessageRetries int32 `json:"maxMessageRetries" yaml:"maxMessageRetries"`
63+
DeadLetterTopic string `json:"deadLetterTopic" yaml:"deadLetterTopic"`
64+
ExpectedHealthCheckInterval int32 `json:"expectedHealthCheckInterval" yaml:"expectedHealthCheckInterval"`
65+
UserConfig string `json:"userConfig" yaml:"userConfig"`
66+
//metrics config
67+
MetricsPort int `json:"metricsPort" yaml:"metricsPort"`
68+
}

controllers/spec/utils.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,10 @@ package spec
1919

2020
import (
2121
"encoding/json"
22+
"fmt"
2223
"regexp"
2324
"strings"
25+
"time"
2426

2527
"k8s.io/apimachinery/pkg/util/validation"
2628

@@ -55,6 +57,42 @@ func convertFunctionDetails(function *v1alpha1.Function) *proto.FunctionDetails
5557
}
5658
}
5759

60+
func convertGoFunctionConfs(function *v1alpha1.Function) *GoFunctionConf {
61+
return &GoFunctionConf{
62+
FuncID: fmt.Sprintf("${%s}-%d", EnvShardID, time.Now().Unix()),
63+
PulsarServiceURL: "${brokerServiceURL}",
64+
FuncVersion: "0",
65+
MaxBufTuples: 100, //TODO
66+
Port: int(GRPCPort.ContainerPort),
67+
ClusterName: function.Spec.ClusterName,
68+
Tenant: function.Spec.Tenant,
69+
NameSpace: function.Namespace,
70+
Name: function.Spec.Name,
71+
LogTopic: function.Spec.LogTopic,
72+
ProcessingGuarantees: int32(convertProcessingGuarantee(function.Spec.ProcessingGuarantee)),
73+
SecretsMap: marshalSecretsMap(function.Spec.SecretsMap),
74+
Runtime: int32(proto.FunctionDetails_GO),
75+
AutoACK: *function.Spec.AutoAck,
76+
Parallelism: *function.Spec.Replicas,
77+
TimeoutMs: uint64(function.Spec.Timeout),
78+
SubscriptionName: function.Spec.SubscriptionName,
79+
CleanupSubscription: function.Spec.CleanupSubscription,
80+
SourceSpecTopic: function.Spec.Input.Topics[0],
81+
SourceSchemaType: "", // TODO: map schema type
82+
IsRegexPatternSubscription: function.Spec.Input.TopicPattern != "",
83+
SinkSpecTopic: function.Spec.Output.Topic,
84+
SinkSchemaType: "", // TODO: map schema type
85+
CPU: float64(function.Spec.Resources.Requests.Cpu().Value()),
86+
RAM: function.Spec.Resources.Requests.Memory().Value(),
87+
Disk: function.Spec.Resources.Requests.Storage().Value(),
88+
MaxMessageRetries: function.Spec.MaxMessageRetry,
89+
DeadLetterTopic: function.Spec.DeadLetterTopic,
90+
UserConfig: getUserConfig(function.Spec.FuncConfig),
91+
MetricsPort: int(MetricsPort.ContainerPort),
92+
ExpectedHealthCheckInterval: -1, // TurnOff BuiltIn HealthCheck to avoid instance exit
93+
}
94+
}
95+
5896
func generateInputSpec(sourceConf v1alpha1.InputConf) map[string]*proto.ConsumerSpec {
5997
inputSpecs := make(map[string]*proto.ConsumerSpec)
6098

controllers/suite_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,10 @@ var _ = BeforeSuite(func(done Done) {
6363
}
6464
} else {
6565
testEnv = &envtest.Environment{
66-
CRDDirectoryPaths: []string{filepath.Join("..", "config", "crd", "bases")},
66+
CRDInstallOptions: envtest.CRDInstallOptions{
67+
Paths: []string{filepath.Join("..", "config", "crd", "bases")},
68+
},
69+
AttachControlPlaneOutput: true,
6770
}
6871
}
6972
var err error

go.mod

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,25 +3,20 @@ module github.com/streamnative/function-mesh
33
go 1.13
44

55
require (
6+
github.com/apache/pulsar-client-go/oauth2 v0.0.0-20201120111947-b8bd55bc02bd // indirect
67
github.com/ghodss/yaml v1.0.0
7-
github.com/davecgh/go-spew v1.1.1 // indirect
88
github.com/go-logr/logr v0.1.0
99
github.com/gogo/protobuf v1.3.1
10-
github.com/golang/protobuf v1.4.2
11-
github.com/google/gofuzz v1.1.0 // indirect
10+
github.com/golang/protobuf v1.4.3
11+
github.com/kr/pretty v0.2.0 // indirect
1212
github.com/onsi/ginkgo v1.14.2
1313
github.com/onsi/gomega v1.10.4
14-
github.com/pkg/errors v0.9.1 // indirect
15-
github.com/prometheus/client_golang v1.1.0 // indirect
14+
github.com/prometheus/client_golang v1.7.1 // indirect
1615
github.com/streamnative/pulsarctl v0.5.0
1716
github.com/stretchr/testify v1.6.1
18-
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d // indirect
19-
google.golang.org/appengine v1.6.6 // indirect
2017
google.golang.org/protobuf v1.25.0
21-
gopkg.in/yaml.v2 v2.3.0
2218
k8s.io/api v0.18.6
2319
k8s.io/apimachinery v0.18.6
2420
k8s.io/client-go v0.18.6
2521
sigs.k8s.io/controller-runtime v0.6.2
26-
sigs.k8s.io/controller-tools v0.2.4 // indirect
2722
)

0 commit comments

Comments
 (0)