Skip to content

Commit edd275b

Browse files
authored
#325 add State Store configs to support stateful functions in Pulsar (#359)
* add State Store configs * more comment * fix CI * fix style * stash * add tests * add stateful sample and integration tests * fix tests * more output * add test step * address comments
1 parent 213cfe2 commit edd275b

20 files changed

Lines changed: 499 additions & 47 deletions
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
apiVersion: compute.functionmesh.io/v1alpha1
2+
kind: Function
3+
metadata:
4+
name: java-function-stateful-sample
5+
namespace: default
6+
spec:
7+
image: streamnative/pulsar-functions-java-sample:2.10.0.0-rc10
8+
className: org.apache.pulsar.functions.api.examples.WordCountFunction
9+
forwardSourceMessageProperty: true
10+
maxPendingAsyncRequests: 1000
11+
replicas: 1
12+
maxReplicas: 5
13+
logTopic: persistent://public/default/logging-function-logs
14+
input:
15+
topics:
16+
- persistent://public/default/java-function-stateful-input-topic
17+
typeClassName: java.lang.String
18+
output:
19+
topic: persistent://public/default/java-function-stateful-output-topic
20+
typeClassName: java.lang.String
21+
resources:
22+
requests:
23+
cpu: "0.1"
24+
memory: 1G
25+
limits:
26+
cpu: "0.2"
27+
memory: 1.1G
28+
pulsar:
29+
pulsarConfig: "test-pulsar"
30+
java:
31+
jar: /pulsar/examples/api-examples.jar
32+
clusterName: test-pulsar
33+
autoAck: true
34+
statefulConfig:
35+
pulsar:
36+
serviceUrl: "bk://sn-platform-pulsar-bk.default.svc.cluster.local:4181"
37+
---
38+
apiVersion: v1
39+
kind: ConfigMap
40+
metadata:
41+
name: test-pulsar
42+
data:
43+
webServiceURL: http://sn-platform-pulsar-broker.default.svc.cluster.local:8080
44+
brokerServiceURL: pulsar://sn-platform-pulsar-broker.default.svc.cluster.local:6650

.ci/clusters/values.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,3 +112,6 @@ broker:
112112

113113
proxy:
114114
replicaCount: 1
115+
116+
functions:
117+
functionState: true

.ci/verify_function_mesh.sh

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,4 +73,12 @@ case ${1} in
7373
ci::verify_java_function function-builtin-hpa-sample
7474
ci::verify_hpa function-builtin-hpa-sample
7575
;;
76+
compute_v1alpha1_function_stateful)
77+
ci::verify_function_mesh java-function-stateful-sample
78+
sleep 60
79+
ci::print_function_log java-function-stateful-sample
80+
ci::verify_java_function java-function-stateful-sample
81+
sleep 60
82+
ci::print_function_log java-function-stateful-sample
83+
;;
7684
esac

.github/workflows/test-integration-kind-samples.yml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,16 @@ jobs:
133133
.ci/verify_function_mesh.sh compute_v1alpha1_function_builtin_hpa
134134
kubectl delete -f .ci/clusters/compute_v1alpha1_function_builtin_hpa.yaml
135135
136+
- name: Test Java Stateful Function
137+
run: |
138+
kubectl apply -f .ci/clusters/compute_v1alpha1_function_stateful.yaml
139+
kubectl get all
140+
141+
- name: Verify Java Stateful Function
142+
run: |
143+
.ci/verify_function_mesh.sh compute_v1alpha1_function_stateful
144+
kubectl delete -f .ci/clusters/compute_v1alpha1_function_stateful.yaml
145+
136146
- name: Setup tmate session
137147
uses: mxschmitt/action-tmate@v3
138148
if: failure()

api/v1alpha1/common.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@ type Messaging struct {
3535
Pulsar *PulsarMessaging `json:"pulsar,omitempty"`
3636
}
3737

38+
type Stateful struct {
39+
Pulsar *PulsarStateStore `json:"pulsar,omitempty"`
40+
}
41+
3842
type PulsarMessaging struct {
3943
// The config map need to contain the following fields
4044
// webServiceURL
@@ -54,6 +58,25 @@ type PulsarMessaging struct {
5458
TLSSecret string `json:"tlsSecret,omitempty"`
5559
}
5660

61+
type PulsarStateStore struct {
62+
// The service url points to the state store service
63+
// By default, the state store service is bookkeeper table service
64+
ServiceURL string `json:"serviceUrl"`
65+
66+
// The state store config for Java runtime
67+
JavaProvider *PulsarStateStoreJavaProvider `json:"javaProvider,omitempty"`
68+
}
69+
70+
type PulsarStateStoreJavaProvider struct {
71+
// The java class name of the state store provider implementation
72+
// The class must implement `org.apache.pulsar.functions.instance.state.StateStoreProvider` interface
73+
// If not set, `org.apache.pulsar.functions.instance.state.BKStateStoreProviderImpl` will be used
74+
ClassName string `json:"className"`
75+
76+
// The configmap of the configuration for the state store provider
77+
Config *Config `json:"config,omitempty"`
78+
}
79+
5780
type PodPolicy struct {
5881
// Labels specifies the labels to attach to pod the operator creates for the cluster.
5982
Labels map[string]string `json:"labels,omitempty"`

api/v1alpha1/function_types.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,9 @@ type FunctionSpec struct {
8585

8686
// Image pull policy, one of Always, Never, IfNotPresent, default to IfNotPresent.
8787
ImagePullPolicy corev1.PullPolicy `json:"imagePullPolicy,omitempty"`
88+
89+
// +kubebuilder:validation:Optional
90+
StateConfig *Stateful `json:"statefulConfig,omitempty"`
8891
}
8992

9093
// FunctionStatus defines the observed state of Function

api/v1alpha1/function_webhook.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,11 @@ func (r *Function) ValidateCreate() error {
217217
allErrs = append(allErrs, fieldErr)
218218
}
219219

220+
fieldErr = validateStatefulFunctionConfigs(r.Spec.StateConfig, r.Spec.Runtime)
221+
if fieldErr != nil {
222+
allErrs = append(allErrs, fieldErr)
223+
}
224+
220225
if len(allErrs) == 0 {
221226
return nil
222227
}

api/v1alpha1/validate.go

Lines changed: 62 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ func validateJavaRuntime(java *JavaRuntime, className string) []*field.Error {
3434
allErrs = append(allErrs, e)
3535
}
3636
if java.Jar == "" {
37-
e := field.Invalid(field.NewPath("spec").Child("java", "jar"), java.Jar, "jar cannot be empty in java runtime")
37+
e := field.Invalid(field.NewPath("spec").Child("java", "jar"), java.Jar,
38+
"jar cannot be empty in java runtime")
3839
allErrs = append(allErrs, e)
3940
}
4041
if java.JarLocation != "" {
@@ -56,7 +57,8 @@ func validatePythonRuntime(python *PythonRuntime, className string) []*field.Err
5657
allErrs = append(allErrs, e)
5758
}
5859
if python.Py == "" {
59-
e := field.Invalid(field.NewPath("spec").Child("python", "py"), python.Py, "py cannot be empty in python runtime")
60+
e := field.Invalid(field.NewPath("spec").Child("python", "py"), python.Py,
61+
"py cannot be empty in python runtime")
6062
allErrs = append(allErrs, e)
6163
}
6264
if python.PyLocation != "" {
@@ -74,7 +76,8 @@ func validateGolangRuntime(golang *GoRuntime) []*field.Error {
7476
var allErrs field.ErrorList
7577
if golang != nil {
7678
if golang.Go == "" {
77-
e := field.Invalid(field.NewPath("spec").Child("golang", "go"), golang.Go, "go cannot be empty in golang runtime")
79+
e := field.Invalid(field.NewPath("spec").Child("golang", "go"), golang.Go,
80+
"go cannot be empty in golang runtime")
7881
allErrs = append(allErrs, e)
7982
}
8083
if golang.GoLocation != "" {
@@ -102,7 +105,8 @@ func validateReplicasAndMaxReplicas(replicas, maxReplicas *int32) []*field.Error
102105
}
103106

104107
if maxReplicas != nil && replicas != nil && *replicas > *maxReplicas {
105-
e := field.Invalid(field.NewPath("spec").Child("maxReplicas"), *maxReplicas, "maxReplicas must be greater than or equal to replicas")
108+
e := field.Invalid(field.NewPath("spec").Child("maxReplicas"), *maxReplicas,
109+
"maxReplicas must be greater than or equal to replicas")
106110
allErrs = append(allErrs, e)
107111
}
108112
return allErrs
@@ -117,38 +121,45 @@ func validateResourceRequirement(requirements corev1.ResourceRequirements) *fiel
117121

118122
func validateTimeout(timeout int32, processingGuarantee ProcessGuarantee) *field.Error {
119123
if timeout != 0 && processingGuarantee == EffectivelyOnce {
120-
return field.Invalid(field.NewPath("spec").Child("timeout"), timeout, "message timeout can only be set for AtleastOnce processing guarantee")
124+
return field.Invalid(field.NewPath("spec").Child("timeout"), timeout,
125+
"message timeout can only be set for AtleastOnce processing guarantee")
121126
}
122127
return nil
123128
}
124129

125-
func validateMaxMessageRetry(maxMessageRetry int32, processingGuarantee ProcessGuarantee, deadLetterTopic string) []*field.Error {
130+
func validateMaxMessageRetry(maxMessageRetry int32, processingGuarantee ProcessGuarantee,
131+
deadLetterTopic string) []*field.Error {
126132
var allErrs field.ErrorList
127133
if maxMessageRetry > 0 && processingGuarantee == EffectivelyOnce {
128-
e := field.Invalid(field.NewPath("spec").Child("maxMessageRetry"), maxMessageRetry, "MaxMessageRetries and Effectively once are not compatible")
134+
e := field.Invalid(field.NewPath("spec").Child("maxMessageRetry"), maxMessageRetry,
135+
"MaxMessageRetries and Effectively once are not compatible")
129136
allErrs = append(allErrs, e)
130137
}
131138

132139
if maxMessageRetry <= 0 && deadLetterTopic != "" {
133-
e := field.Invalid(field.NewPath("spec").Child("maxMessageRetry"), maxMessageRetry, "dead letter topic is set but max message retry is set to infinity")
140+
e := field.Invalid(field.NewPath("spec").Child("maxMessageRetry"), maxMessageRetry,
141+
"dead letter topic is set but max message retry is set to infinity")
134142
allErrs = append(allErrs, e)
135143
}
136144
return allErrs
137145
}
138146

139147
func validateRetainKeyOrdering(retainKeyOrdering bool, processingGuarantee ProcessGuarantee) *field.Error {
140148
if retainKeyOrdering && processingGuarantee == EffectivelyOnce {
141-
return field.Invalid(field.NewPath("spec").Child("retainKeyOrdering"), retainKeyOrdering, "when effectively once processing guarantee is specified, retain Key ordering cannot be set")
149+
return field.Invalid(field.NewPath("spec").Child("retainKeyOrdering"), retainKeyOrdering,
150+
"when effectively once processing guarantee is specified, retain Key ordering cannot be set")
142151
}
143152
return nil
144153
}
145154

146155
func validateRetainOrderingConflicts(retainKeyOrdering bool, retainOrdering bool) []*field.Error {
147156
var allErrs field.ErrorList
148157
if retainKeyOrdering && retainOrdering {
149-
e := field.Invalid(field.NewPath("spec").Child("retainKeyOrdering"), retainKeyOrdering, "only one of retain ordering or retain key ordering can be set")
158+
e := field.Invalid(field.NewPath("spec").Child("retainKeyOrdering"), retainKeyOrdering,
159+
"only one of retain ordering or retain key ordering can be set")
150160
allErrs = append(allErrs, e)
151-
e = field.Invalid(field.NewPath("spec").Child("retainOrdering"), retainOrdering, "only one of retain ordering or retain key ordering can be set")
161+
e = field.Invalid(field.NewPath("spec").Child("retainOrdering"), retainOrdering,
162+
"only one of retain ordering or retain key ordering can be set")
152163
allErrs = append(allErrs, e)
153164
}
154165
return allErrs
@@ -158,7 +169,8 @@ func validateFunctionConfig(config *Config) *field.Error {
158169
if config != nil {
159170
_, err := config.MarshalJSON()
160171
if err != nil {
161-
return field.Invalid(field.NewPath("spec").Child("funcConfig"), config, "function config is invalid: "+err.Error())
172+
return field.Invalid(field.NewPath("spec").Child("funcConfig"), config,
173+
"function config is invalid: "+err.Error())
162174
}
163175
}
164176
return nil
@@ -168,7 +180,8 @@ func validateSinkConfig(config *Config) *field.Error {
168180
if config != nil {
169181
_, err := config.MarshalJSON()
170182
if err != nil {
171-
return field.Invalid(field.NewPath("spec").Child("sinkConfig"), config, "sink config is invalid: "+err.Error())
183+
return field.Invalid(field.NewPath("spec").Child("sinkConfig"), config,
184+
"sink config is invalid: "+err.Error())
172185
}
173186
}
174187
return nil
@@ -178,7 +191,8 @@ func validateSourceConfig(config *Config) *field.Error {
178191
if config != nil {
179192
_, err := config.MarshalJSON()
180193
if err != nil {
181-
return field.Invalid(field.NewPath("spec").Child("sourceConfig"), config, "source config is invalid: "+err.Error())
194+
return field.Invalid(field.NewPath("spec").Child("sourceConfig"), config,
195+
"source config is invalid: "+err.Error())
182196
}
183197
}
184198
return nil
@@ -188,7 +202,8 @@ func validateSecretsMap(secrets map[string]SecretRef) *field.Error {
188202
if secrets != nil {
189203
_, err := json.Marshal(secrets)
190204
if err != nil {
191-
return field.Invalid(field.NewPath("spec").Child("secretsMap"), secrets, "secrets map is invalid: "+err.Error())
205+
return field.Invalid(field.NewPath("spec").Child("secretsMap"), secrets,
206+
"secrets map is invalid: "+err.Error())
192207
}
193208
}
194209
return nil
@@ -247,14 +262,18 @@ func validateInputOutput(input *InputConf, output *OutputConf) []*field.Error {
247262
}
248263
if output.ProducerConf != nil && output.ProducerConf.CryptoConfig != nil {
249264
if output.ProducerConf.CryptoConfig.CryptoKeyReaderClassName == "" {
250-
e := field.Invalid(field.NewPath("spec").Child("output", "producerConf", "cryptoConfig", "cryptoKeyReaderClassName"),
251-
output.ProducerConf.CryptoConfig.CryptoKeyReaderClassName, "cryptoKeyReader class name required")
265+
e := field.Invalid(field.NewPath("spec").Child("output", "producerConf", "cryptoConfig",
266+
"cryptoKeyReaderClassName"),
267+
output.ProducerConf.CryptoConfig.CryptoKeyReaderClassName,
268+
"cryptoKeyReader class name required")
252269
allErrs = append(allErrs, e)
253270
}
254271

255272
if len(output.ProducerConf.CryptoConfig.EncryptionKeys) == 0 {
256-
e := field.Invalid(field.NewPath("spec").Child("output", "producerConf", "cryptoConfig", "encryptionKeys"),
257-
output.ProducerConf.CryptoConfig.EncryptionKeys, "must provide encryption key name for crypto key reader")
273+
e := field.Invalid(field.NewPath("spec").Child("output", "producerConf", "cryptoConfig",
274+
"encryptionKeys"),
275+
output.ProducerConf.CryptoConfig.EncryptionKeys,
276+
"must provide encryption key name for crypto key reader")
258277
allErrs = append(allErrs, e)
259278
}
260279
}
@@ -268,7 +287,8 @@ func validateLogTopic(logTopic string) *field.Error {
268287
if logTopic != "" {
269288
err := isValidTopicName(logTopic)
270289
if err != nil {
271-
return field.Invalid(field.NewPath("spec").Child("logTopic"), logTopic, fmt.Sprintf("Log topic %s is invalid", logTopic))
290+
return field.Invalid(field.NewPath("spec").Child("logTopic"), logTopic,
291+
fmt.Sprintf("Log topic %s is invalid", logTopic))
272292
}
273293
}
274294
return nil
@@ -278,7 +298,8 @@ func validateDeadLetterTopic(deadLetterTopic string) *field.Error {
278298
if deadLetterTopic != "" {
279299
err := isValidTopicName(deadLetterTopic)
280300
if err != nil {
281-
return field.Invalid(field.NewPath("spec").Child("deadLetterTopic"), deadLetterTopic, fmt.Sprintf("DeadLetter topic %s is invalid", deadLetterTopic))
301+
return field.Invalid(field.NewPath("spec").Child("deadLetterTopic"), deadLetterTopic,
302+
fmt.Sprintf("DeadLetter topic %s is invalid", deadLetterTopic))
282303
}
283304
}
284305
return nil
@@ -290,3 +311,23 @@ func validateAutoAck(autoAck *bool) *field.Error {
290311
}
291312
return nil
292313
}
314+
315+
func validateStatefulFunctionConfigs(statefulFunctionConfigs *Stateful, runtime Runtime) *field.Error {
316+
if statefulFunctionConfigs != nil {
317+
if statefulFunctionConfigs.Pulsar != nil {
318+
if isGolangRuntime(runtime) {
319+
return field.Invalid(field.NewPath("spec").Child("statefulConfig"), runtime.Golang,
320+
"Golang function do not support stateful function yet")
321+
}
322+
if statefulFunctionConfigs.Pulsar.ServiceURL == "" {
323+
return field.Invalid(field.NewPath("spec").Child("statefulConfig", "pulsar", "serviceUrl"),
324+
statefulFunctionConfigs.Pulsar.ServiceURL, "serviceUrl cannot be empty")
325+
}
326+
}
327+
}
328+
return nil
329+
}
330+
331+
func isGolangRuntime(runtime Runtime) bool {
332+
return runtime.Golang != nil && runtime.Python == nil && runtime.Java == nil
333+
}

0 commit comments

Comments
 (0)