Skip to content

Commit a231e67

Browse files
authored
feat(pulsar-functions): add FQFN support and optional parameters (#77)
1 parent f641081 commit a231e67

8 files changed

Lines changed: 1412 additions & 151 deletions

File tree

.github/workflows/e2e.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ jobs:
4242
run: go mod download
4343

4444
- name: Run E2E tests
45+
env:
46+
E2E_VERBOSE: "1"
4547
run: ./scripts/e2e-test.sh all
4648

4749
- name: Cleanup

Dockerfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ WORKDIR /server
6161

6262
# Copy binary from builder
6363
COPY --from=builder /build/snmcp /server/snmcp
64+
COPY --from=builder /build/cmd/snmcp-e2e/testdata/functions/echo.py /server/e2e/functions/echo.py
6465

6566
# Change ownership
6667
RUN chown -R snmcp:snmcp /server

cmd/snmcp-e2e/main.go

Lines changed: 230 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,9 +134,13 @@ func run(ctx context.Context, cfg config) error {
134134

135135
suffix := time.Now().UnixNano()
136136
tenant := fmt.Sprintf("e2e-%d", suffix)
137-
namespace := fmt.Sprintf("%s/ns-%d", tenant, suffix)
137+
namespaceName := fmt.Sprintf("ns-%d", suffix)
138+
namespace := fmt.Sprintf("%s/%s", tenant, namespaceName)
138139
topic := fmt.Sprintf("persistent://%s/topic-%d", namespace, suffix)
139140
concurrentTopic := fmt.Sprintf("persistent://%s/topic-concurrent-%d", namespace, suffix)
141+
functionInputTopic := fmt.Sprintf("persistent://%s/function-input-%d", namespace, suffix)
142+
functionOutputTopic := fmt.Sprintf("persistent://%s/function-output-%d", namespace, suffix)
143+
functionName := fmt.Sprintf("echo-%d", suffix)
140144

141145
result, err := callTool(ctx, adminClient, "pulsar_admin_tenant", map[string]any{
142146
"resource": "tenant",
@@ -209,6 +213,118 @@ func run(ctx context.Context, cfg config) error {
209213
return err
210214
}
211215

216+
result, err = callTool(ctx, adminClient, "pulsar_admin_topic", map[string]any{
217+
"resource": "topic",
218+
"operation": "create",
219+
"topic": functionInputTopic,
220+
"partitions": float64(0),
221+
})
222+
if err := requireToolOK(result, err, "pulsar_admin_topic create function input"); err != nil {
223+
return err
224+
}
225+
226+
result, err = callTool(ctx, adminClient, "pulsar_admin_topic", map[string]any{
227+
"resource": "topic",
228+
"operation": "create",
229+
"topic": functionOutputTopic,
230+
"partitions": float64(0),
231+
})
232+
if err := requireToolOK(result, err, "pulsar_admin_topic create function output"); err != nil {
233+
return err
234+
}
235+
236+
logf(cfg.verbose, "creating function: tenant=%s namespace=%s name=%s inputs=%v output=%s py=%s classname=%s",
237+
tenant,
238+
namespaceName,
239+
functionName,
240+
[]string{functionInputTopic},
241+
functionOutputTopic,
242+
"/server/e2e/functions/echo.py",
243+
"echo.EchoFunction",
244+
)
245+
result, err = callTool(ctx, adminClient, "pulsar_admin_functions", map[string]any{
246+
"operation": "create",
247+
"tenant": tenant,
248+
"namespace": namespaceName,
249+
"name": functionName,
250+
"classname": "echo.EchoFunction",
251+
"inputs": []string{functionInputTopic},
252+
"output": functionOutputTopic,
253+
"py": "/server/e2e/functions/echo.py",
254+
})
255+
if err := requireToolOK(result, err, "pulsar_admin_functions create"); err != nil {
256+
return err
257+
}
258+
259+
if err := waitForFunctionRunning(ctx, adminClient, tenant, namespaceName, functionName, 60*time.Second); err != nil {
260+
return err
261+
}
262+
263+
result, err = callTool(ctx, adminClient, "pulsar_admin_functions", map[string]any{
264+
"operation": "stats",
265+
"tenant": tenant,
266+
"namespace": namespaceName,
267+
"name": functionName,
268+
"instanceId": float64(0),
269+
})
270+
if err := requireToolOK(result, err, "pulsar_admin_functions stats"); err != nil {
271+
return err
272+
}
273+
274+
triggerValue := "e2e-trigger"
275+
result, err = callTool(ctx, adminClient, "pulsar_admin_functions", map[string]any{
276+
"operation": "trigger",
277+
"tenant": tenant,
278+
"namespace": namespaceName,
279+
"name": functionName,
280+
"topic": functionInputTopic,
281+
"triggerValue": triggerValue,
282+
})
283+
if err := requireToolOK(result, err, "pulsar_admin_functions trigger"); err != nil {
284+
return err
285+
}
286+
triggerResult := firstText(result)
287+
logf(cfg.verbose, "trigger result: %s", triggerResult)
288+
if !strings.Contains(triggerResult, triggerValue) {
289+
if err := waitForTriggerOutput(ctx, adminClient, functionOutputTopic, fmt.Sprintf("trigger-sub-%d", suffix), triggerValue, 30*time.Second, cfg.verbose); err != nil {
290+
return fmt.Errorf("unexpected trigger result: %s; output check failed: %w", triggerResult, err)
291+
}
292+
}
293+
294+
result, err = callTool(ctx, adminClient, "pulsar_admin_functions", map[string]any{
295+
"operation": "update",
296+
"tenant": tenant,
297+
"namespace": namespaceName,
298+
"name": functionName,
299+
"userConfig": map[string]any{"updated": true},
300+
})
301+
if err := requireToolOK(result, err, "pulsar_admin_functions update"); err != nil {
302+
return err
303+
}
304+
305+
result, err = callTool(ctx, adminClient, "pulsar_admin_functions", map[string]any{
306+
"operation": "get",
307+
"tenant": tenant,
308+
"namespace": namespaceName,
309+
"name": functionName,
310+
})
311+
if err := requireToolOK(result, err, "pulsar_admin_functions get"); err != nil {
312+
return err
313+
}
314+
if err := assertFunctionUserConfig(firstText(result), "updated"); err != nil {
315+
return err
316+
}
317+
318+
result, err = callTool(ctx, adminClient, "pulsar_admin_functions", map[string]any{
319+
"operation": "delete",
320+
"tenant": tenant,
321+
"namespace": namespaceName,
322+
"name": functionName,
323+
})
324+
if err := requireToolOK(result, err, "pulsar_admin_functions delete"); err != nil {
325+
return err
326+
}
327+
212328
result, err = callTool(ctx, adminClient, "pulsar_admin_topic", map[string]any{
213329
"resource": "topic",
214330
"operation": "create",
@@ -462,6 +578,119 @@ func listClusters(ctx context.Context, c *client.Client) ([]string, error) {
462578
return clusters, nil
463579
}
464580

581+
type functionStatus struct {
582+
NumRunning int `json:"numRunning"`
583+
}
584+
585+
func waitForFunctionRunning(ctx context.Context, c *client.Client, tenant, namespace, name string, timeout time.Duration) error {
586+
deadline := time.Now().Add(timeout)
587+
for time.Now().Before(deadline) {
588+
status, err := getFunctionStatus(ctx, c, tenant, namespace, name)
589+
if err == nil && status.NumRunning > 0 {
590+
return nil
591+
}
592+
select {
593+
case <-ctx.Done():
594+
return ctx.Err()
595+
case <-time.After(2 * time.Second):
596+
}
597+
}
598+
return fmt.Errorf("function %s did not reach running state within %s", name, timeout.String())
599+
}
600+
601+
func getFunctionStatus(ctx context.Context, c *client.Client, tenant, namespace, name string) (functionStatus, error) {
602+
result, err := callTool(ctx, c, "pulsar_admin_functions", map[string]any{
603+
"operation": "status",
604+
"tenant": tenant,
605+
"namespace": namespace,
606+
"name": name,
607+
})
608+
if err := requireToolOK(result, err, "pulsar_admin_functions status"); err != nil {
609+
return functionStatus{}, err
610+
}
611+
raw := firstText(result)
612+
if raw == "" {
613+
return functionStatus{}, errors.New("empty function status result")
614+
}
615+
var status functionStatus
616+
if err := json.Unmarshal([]byte(raw), &status); err != nil {
617+
return functionStatus{}, fmt.Errorf("failed to parse function status: %w", err)
618+
}
619+
return status, nil
620+
}
621+
622+
func assertFunctionUserConfig(raw string, key string) error {
623+
if raw == "" {
624+
return errors.New("empty function config result")
625+
}
626+
var payload map[string]interface{}
627+
if err := json.Unmarshal([]byte(raw), &payload); err != nil {
628+
return fmt.Errorf("failed to parse function config: %w", err)
629+
}
630+
userConfig, ok := payload["userConfig"].(map[string]interface{})
631+
if !ok {
632+
return fmt.Errorf("missing userConfig in function config")
633+
}
634+
if _, ok := userConfig[key]; !ok {
635+
return fmt.Errorf("userConfig missing key: %s", key)
636+
}
637+
return nil
638+
}
639+
640+
type consumeResponse struct {
641+
MessagesConsumed int `json:"messages_consumed"`
642+
Messages []consumeMessage `json:"messages"`
643+
}
644+
645+
type consumeMessage struct {
646+
Data string `json:"data"`
647+
}
648+
649+
func waitForTriggerOutput(ctx context.Context, c *client.Client, topic, subscription, expected string, timeout time.Duration, verbose bool) error {
650+
deadline := time.Now().Add(timeout)
651+
for time.Now().Before(deadline) {
652+
result, err := callTool(ctx, c, "pulsar_client_consume", map[string]any{
653+
"topic": topic,
654+
"subscription-name": subscription,
655+
"initial-position": "earliest",
656+
"num-messages": float64(1),
657+
"timeout": float64(5),
658+
"subscription-type": "exclusive",
659+
"subscription-mode": "durable",
660+
"show-properties": false,
661+
"hide-payload": false,
662+
})
663+
if err := requireToolOK(result, err, "pulsar_client_consume trigger output"); err != nil {
664+
return err
665+
}
666+
667+
raw := firstText(result)
668+
if raw != "" {
669+
var resp consumeResponse
670+
if err := json.Unmarshal([]byte(raw), &resp); err != nil {
671+
return fmt.Errorf("failed to parse trigger output: %w", err)
672+
}
673+
if resp.MessagesConsumed > 0 {
674+
for _, msg := range resp.Messages {
675+
if strings.Contains(msg.Data, expected) {
676+
return nil
677+
}
678+
}
679+
return fmt.Errorf("trigger output does not contain expected value: %s", expected)
680+
}
681+
}
682+
683+
logf(verbose, "trigger output not ready yet, retrying")
684+
select {
685+
case <-ctx.Done():
686+
return ctx.Err()
687+
case <-time.After(2 * time.Second):
688+
}
689+
}
690+
691+
return fmt.Errorf("trigger output not available after %s", timeout.String())
692+
}
693+
465694
func runConcurrent(ctx context.Context, adminClient, testClient *client.Client, topic, subscription string) error {
466695
var wg sync.WaitGroup
467696
errCh := make(chan error, 2)
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
class EchoFunction(object):
2+
def process(self, input, context):
3+
return input

0 commit comments

Comments
 (0)