Skip to content

Latest commit

 

History

History
133 lines (99 loc) · 5.1 KB

File metadata and controls

133 lines (99 loc) · 5.1 KB

Example Fn Java FDK : Service Connector Hub - Streaming

This example provides a Function to use as a service connector hub target. The function accepts a typed event containing a batch of messages.

Source

Streaming The value in each streaming event is delivered as base64 encoded. The library automatically decodes it.

Dependencies

  • [fn-events] for ConnectorHubFunction classes.
  • [fn-events-testing] for ConnectorHubFunction testing library.

Demonstrated FDK features

This example showcases how to use the fn-event ConnectorHubFunction to use a Function as the target for Streaming source.

Step by step

Set up the connector hub with Streaming source and Function target:

The Function entrypoint extends the ConnectorHubFunction abstract class. Note: the func.yaml entrypoint remains the class which extends ConnectorHubFunction e.g. cmd: com.fnproject.fn.examples.Function::handler

Function.java

package com.fnproject.fn.examples;

import com.fnproject.events.ConnectorHubFunction;
import com.fnproject.events.input.ConnectorHubBatch;
import com.fnproject.events.input.sch.StreamingData;

public class Function extends ConnectorHubFunction<StreamingData<Employee>> {

    public StreamService streamService;

    public Function() {
        this.streamService = new StreamService();
    }

    @Override
    public void handler(ConnectorHubBatch<StreamingData<Employee>> batch) {
        for (StreamingData<Employee> stream : batch.getBatch()) {
            streamService.readStream(stream);
        }
    }
}

The ConnectorHubBatch.java batch contains a list of events from Streaming as specified in Batch Settings.

The class StreamingData.java represents the batch of Streaming Events.

The Employee.java represents the base64 encoded JSON from value in each message. Note: Provide a String type if the message value is not JSON format.

Function.java public class Function extends ConnectorHubFunction<StreamingData<Employee>>.

To return an error response, throw RuntimeException.class. Doing so will cause the Function to return a 502 Retry policy

Test walkthrough

Unit testing ConnectorHubFunction is supported with the ConnectorHubTestFeature and FnTestingRule.

First of all, the class initializes the FnTestingRule harness, as explained in Testing Functions.

FunctionTest.java

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.Collections;
import java.util.Date;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fnproject.events.input.ConnectorHubBatch;
import com.fnproject.events.input.sch.StreamingData;
import com.fnproject.events.testing.ConnectorHubTestFeature;
import com.fnproject.fn.testing.FnResult;
import com.fnproject.fn.testing.FnTestingRule;
import org.junit.Rule;
import org.junit.Test;

public class FunctionTest {

    @Rule
    public FnTestingRule fn = FnTestingRule.createDefault();

    private final ConnectorHubTestFeature connectorHubTestFeature = ConnectorHubTestFeature.createDefault(fn);

    @Test
    public void testInvokeFunctionWithStreamingData() throws Exception {

        ConnectorHubBatch<StreamingData<Employee>> event = createMinimalRequest();
        connectorHubTestFeature.givenEvent(event).enqueue();

        fn.thenRun(Function.class, "handler");

        FnResult result = fn.getOnlyResult();
        assertEquals(200, result.getStatus().getCode());
    }

    private static ConnectorHubBatch<StreamingData<Employee>> createMinimalRequest() throws JsonProcessingException {
        Employee employee = new Employee();
        employee.setName("foo");

        StreamingData<Employee> source = new StreamingData<Employee>(
            "stream-name",
            "0",
            null,
            employee,
            "3",
            new Date(1764860467553L)
        );
        ConnectorHubBatch<StreamingData<Employee>> event = mock(ConnectorHubBatch.class);

        when(event.getBatch()).thenReturn(Collections.singletonList(source));
        return event;
    }
}

Use connectorHubTestFeature.givenEvent(event).enqueue(); to queue the request event and invoke the Function with fn.thenRun(Function.class, "handler");.