Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 8 additions & 9 deletions java/GettingStarted/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

Skeleton project for a basic Flink Java application to run on Amazon Managed Service for Apache Flink.

* Flink version: 1.20
* Flink version: 2.2
* Flink API: DataStream API
* Language: Java (11)
* Flink connectors: Kinesis Consumer, Kinesis Sink
* Language: Java (17)
* Flink connectors: Kinesis Streams Source, Kinesis Streams Sink

The project can run both on Amazon Managed Service for Apache Flink, and locally for development.

Expand All @@ -19,12 +19,11 @@ When running locally, the configuration is read from the [`resources/flink-appli

Runtime parameters:

| Group ID | Key | Description |
|-----------------|---------------|---------------------------|
| `InputStream0` | `stream.name` | Name of the input stream |
| `InputStream0` | `aws.region` | (optional) Region of the input stream. If not specified, it will use the application region or the default region of the AWS profile, when running locally. |
| `OutputStream0` | `stream.name` | Name of the output stream |
| `OutputStream0` | `aws.region` | (optional) Region of the output stream. If not specified, it will use the application region or the default region of the AWS profile, when running locally. |
| Group ID | Key | Description |
|-----------------|---------------|-----------------------------------|
| `InputStream0` | `stream.arn` | ARN of the input Kinesis stream |
| `OutputStream0` | `stream.name` | Name of the output Kinesis stream |
| `OutputStream0` | `aws.region` | Region of the output Kinesis stream |

All parameters are case-sensitive.

Expand Down
31 changes: 7 additions & 24 deletions java/GettingStarted/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,37 +13,19 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<buildDirectory>${project.basedir}/target</buildDirectory>
<jar.finalName>${project.name}-${project.version}</jar.finalName>
<target.java.version>11</target.java.version>
<target.java.version>17</target.java.version>
<maven.compiler.source>${target.java.version}</maven.compiler.source>
<maven.compiler.target>${target.java.version}</maven.compiler.target>
<flink.version>1.20.0</flink.version>
<aws.connector.version>5.0.0-1.20</aws.connector.version>
<flink.version>2.2.0</flink.version>
<aws.connector.version>6.0.0-2.0</aws.connector.version>
<kda.runtime.version>1.2.0</kda.runtime.version>
<log4j.version>2.23.1</log4j.version>
</properties>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-bom</artifactId>
<!-- Get the latest SDK version from https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk-bom -->
<version>1.12.677</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<!-- Apache Flink dependencies -->
<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
Expand All @@ -65,20 +47,21 @@
<scope>provided</scope>
</dependency>

<!-- Connectors -->
<!-- Connector base classes (AsyncSinkBase, etc.) -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<!-- Connectors: Kinesis source and sink -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kinesis</artifactId>
<artifactId>flink-connector-aws-kinesis-streams</artifactId>
<version>${aws.connector.version}</version>
</dependency>


<!-- Add logging framework, to produce console output when running in the IDE. -->
<!-- These dependencies are excluded from the application JAR by default. -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package com.amazonaws.services.msf;

import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kinesis.sink.KinesisStreamsSink;
import org.apache.flink.connector.kinesis.source.KinesisStreamsSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -42,17 +42,18 @@ private static Map<String, Properties> loadApplicationProperties(StreamExecution
}
}

private static FlinkKinesisConsumer<String> createSource(Properties inputProperties) {
String inputStreamName = inputProperties.getProperty("stream.name");
return new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties);
private static KinesisStreamsSource<String> createSource(Properties inputProperties) {
return KinesisStreamsSource.<String>builder()
.setStreamArn(inputProperties.getProperty("stream.arn"))
.setDeserializationSchema(new SimpleStringSchema())
.build();
}

/**
 * Builds the Kinesis sink for the application's output stream.
 *
 * <p>Reads the target stream name from the {@code stream.name} property of the
 * {@code OutputStream0} property group; the remaining properties in the group
 * (e.g. {@code aws.region}) are passed through as Kinesis client configuration.
 *
 * @param outputProperties properties of the {@code OutputStream0} group; must
 *                         contain {@code stream.name}
 * @return a {@link KinesisStreamsSink} writing each String as a UTF-8 encoded record
 */
private static KinesisStreamsSink<String> createSink(Properties outputProperties) {
    return KinesisStreamsSink.<String>builder()
            .setKinesisClientProperties(outputProperties)
            .setStreamName(outputProperties.getProperty("stream.name"))
            .setSerializationSchema(new SimpleStringSchema())
            // Partition key derived from the record's hashCode, spreading
            // records across the stream's shards (no ordering guarantee per key).
            .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode()))
            .build();
}
Expand All @@ -64,10 +65,10 @@ public static void main(String[] args) throws Exception {
// Load application parameters
final Map<String, Properties> applicationParameters = loadApplicationProperties(env);

SourceFunction<String> source = createSource(applicationParameters.get("InputStream0"));
DataStream<String> input = env.addSource(source, "Kinesis Source");
KinesisStreamsSource<String> source = createSource(applicationParameters.get("InputStream0"));
DataStream<String> input = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kinesis Source", TypeInformation.of(String.class));

Sink<String> sink = createSink(applicationParameters.get("OutputStream0"));
KinesisStreamsSink<String> sink = createSink(applicationParameters.get("OutputStream0"));
input.sinkTo(sink);

env.execute("Flink streaming Java API skeleton");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,14 @@
{
"PropertyGroupId": "InputStream0",
"PropertyMap": {
"aws.region": "us-east-1",
"stream.name": "ExampleInputStream"
"stream.arn": "arn:aws:kinesis:us-east-1:<account-id>:stream/ExampleInputStream"
}
},
{
"PropertyGroupId": "OutputStream0",
"PropertyMap": {
"aws.region": "us-east-1",
"stream.name": "ExampleOutputStream"
"stream.name": "ExampleOutputStream",
"aws.region": "us-east-1"
}
}
]
Loading