This example provides a Function to use as a service connector hub target. The function accepts a typed event containing a batch of messages.
Streaming: the value in each Streaming event is delivered base64-encoded. The library decodes it automatically.
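To make the decoding step concrete, here is a minimal sketch of what turning a base64-encoded message value back into JSON text looks like with the JDK alone. It is not the library's implementation; the class and method names (`Base64ValueSketch`, `decodeValue`, `encodeValue`) are hypothetical and exist only for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper, not part of fn-events: shows the base64 round trip
// that the library performs for you on each message value.
class Base64ValueSketch {

    // Decode a base64-encoded message value into its UTF-8 text.
    static String decodeValue(String base64Value) {
        byte[] raw = Base64.getDecoder().decode(base64Value);
        return new String(raw, StandardCharsets.UTF_8);
    }

    // Encode plain text the way a message value arrives in the event.
    static String encodeValue(String plain) {
        return Base64.getEncoder().encodeToString(plain.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        String encoded = encodeValue("{\"name\":\"foo\"}");
        System.out.println(encoded);
        System.out.println(decodeValue(encoded));
    }
}
```

With the library in place you never call anything like this yourself; the handler receives the already-decoded value.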
- [fn-events] for ConnectorHubFunction classes.
- [fn-events-testing] for the ConnectorHubFunction testing library.
This example shows how to use the fn-events ConnectorHubFunction to make a Function the target of a Streaming source.
Set up the connector hub with a Streaming source and a Function target.
The Function entrypoint extends the ConnectorHubFunction abstract class.
Note: the func.yaml entrypoint remains the class that extends ConnectorHubFunction,
e.g. cmd: com.fnproject.fn.examples.Function::handler
package com.fnproject.fn.examples;

import com.fnproject.events.ConnectorHubFunction;
import com.fnproject.events.input.ConnectorHubBatch;
import com.fnproject.events.input.sch.StreamingData;

public class Function extends ConnectorHubFunction<StreamingData<Employee>> {

    public StreamService streamService;

    public Function() {
        this.streamService = new StreamService();
    }

    @Override
    public void handler(ConnectorHubBatch<StreamingData<Employee>> batch) {
        for (StreamingData<Employee> stream : batch.getBatch()) {
            streamService.readStream(stream);
        }
    }
}

The ConnectorHubBatch.java batch contains a list of events from Streaming, as specified in Batch Settings.
The class StreamingData.java represents a single Streaming event within the batch.
Employee.java represents the JSON decoded from the base64-encoded value of each message. Note: use String as the type if the message value is not JSON.
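Employee.java itself is not shown in this example, so the following is only a sketch of what such a value type might look like. Only `setName(...)` is confirmed by the test code; the getter, the no-argument constructor, and `toString()` are assumptions based on the usual JavaBean shape that JSON binding libraries expect. In the project the class would presumably be public and live in `com.fnproject.fn.examples`; both are omitted here so the snippet compiles on its own.

```java
// Hypothetical Employee.java: a plain JavaBean for the decoded JSON value.
// Only the "name" property is grounded in this example (the test calls setName).
class Employee {

    private String name;

    // A no-argument constructor is assumed so a JSON binder can instantiate the class.
    public Employee() {
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "Employee{name='" + name + "'}";
    }
}
```

A JSON value such as `{"name":"foo"}` would then map to an Employee whose `getName()` returns `foo`.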
Function.java
The class is declared as public class Function extends ConnectorHubFunction<StreamingData<Employee>>.
To return an error response, throw a RuntimeException. Doing so causes the Function to return a 502 response; see Retry policy.
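The fail-fast pattern described above can be sketched as follows. To keep the snippet compilable without the fn libraries, a `List<String>` stands in for `ConnectorHubBatch<StreamingData<Employee>>`, and `FailFastHandlerSketch` and `process` are hypothetical names. In the real Function the same `throw` would sit inside the overridden `handler` method.

```java
import java.util.List;

// Hypothetical stand-in for the Function class: shows where the
// RuntimeException is thrown, not how the fn runtime turns it into a 502.
class FailFastHandlerSketch {

    // Hypothetical per-message work; rejects null or empty values.
    static void process(String value) {
        if (value == null || value.isEmpty()) {
            throw new IllegalArgumentException("empty message value");
        }
    }

    // Mirrors handler(batch): the first failing message aborts the whole
    // batch by rethrowing as a RuntimeException.
    static void handler(List<String> batch) {
        for (String value : batch) {
            try {
                process(value);
            } catch (IllegalArgumentException e) {
                throw new RuntimeException("Failed to process message", e);
            }
        }
    }
}
```

Wrapping the original exception as the cause keeps the underlying failure visible in the logs.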
Unit testing ConnectorHubFunction is supported with the ConnectorHubTestFeature and FnTestingRule.
The test class first initializes the FnTestingRule harness, as explained in Testing Functions.
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.Collections;
import java.util.Date;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fnproject.events.input.ConnectorHubBatch;
import com.fnproject.events.input.sch.StreamingData;
import com.fnproject.events.testing.ConnectorHubTestFeature;
import com.fnproject.fn.testing.FnResult;
import com.fnproject.fn.testing.FnTestingRule;
import org.junit.Rule;
import org.junit.Test;

public class FunctionTest {

    @Rule
    public FnTestingRule fn = FnTestingRule.createDefault();

    private final ConnectorHubTestFeature connectorHubTestFeature = ConnectorHubTestFeature.createDefault(fn);

    @Test
    public void testInvokeFunctionWithStreamingData() throws Exception {
        ConnectorHubBatch<StreamingData<Employee>> event = createMinimalRequest();
        connectorHubTestFeature.givenEvent(event).enqueue();

        fn.thenRun(Function.class, "handler");

        FnResult result = fn.getOnlyResult();
        assertEquals(200, result.getStatus().getCode());
    }

    private static ConnectorHubBatch<StreamingData<Employee>> createMinimalRequest() throws JsonProcessingException {
        Employee employee = new Employee();
        employee.setName("foo");

        StreamingData<Employee> source = new StreamingData<Employee>(
            "stream-name",
            "0",
            null,
            employee,
            "3",
            new Date(1764860467553L)
        );

        ConnectorHubBatch<StreamingData<Employee>> event = mock(ConnectorHubBatch.class);
        when(event.getBatch()).thenReturn(Collections.singletonList(source));
        return event;
    }
}

Use connectorHubTestFeature.givenEvent(event).enqueue(); to queue the request event
and invoke the Function with fn.thenRun(Function.class, "handler");.