AVRO serialization in FlinkKinesisConsumer and KinesisStreamsSink using AWS Glue Schema Registry

  • Flink version: 1.15
  • Flink API: DataStream API
  • Language: Java (11)

This example demonstrates how to serialize/deserialize AVRO messages in Amazon Kinesis Data Streams sources and sinks, using AWS Glue Schema Registry.

This example uses AVRO-generated classes (more details below).

The reader's schema definition (for the source) and the writer's schema definition (for the sink) are provided as AVRO IDL (.avdl) files in ./src/main/resources/avro.

A FlinkKinesisConsumer produces a stream of AVRO data objects (SpecificRecords), fetching the writer's schema from AWS Glue Schema Registry. The AVRO Kinesis message value must have been serialized using AWS Glue Schema Registry.
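A minimal sketch of how the consumer can be wired, assuming the AVRO-generated Trade class and placeholder stream, registry, and Region names (not necessarily the exact code of this project):

```java
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
import org.apache.flink.formats.avro.glue.schema.registry.GlueSchemaRegistryAvroDeserializationSchema;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Glue Schema Registry configuration (Region and registry name are placeholders)
Map<String, Object> schemaRegistryConfig = new HashMap<>();
schemaRegistryConfig.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-1");
schemaRegistryConfig.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "my-registry");

// Kinesis consumer configuration
Properties consumerConfig = new Properties();
consumerConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");

// Deserialize each Kinesis record into the AVRO-generated Trade class,
// fetching the writer's schema from AWS Glue Schema Registry
FlinkKinesisConsumer<Trade> source = new FlinkKinesisConsumer<>(
        "my-input-stream",
        GlueSchemaRegistryAvroDeserializationSchema.forSpecific(Trade.class, schemaRegistryConfig),
        consumerConfig);
```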

A KinesisStreamsSink serializes AVRO data objects as Kinesis data payload value, and uses a String from the record as Kinesis partition Key.
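The sink side can be sketched as follows; the partition-key accessor (getTicker()) and the stream and registry names are hypothetical:

```java
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
import org.apache.flink.connector.aws.config.AWSConfigConstants;
import org.apache.flink.connector.kinesis.sink.KinesisStreamsSink;
import org.apache.flink.formats.avro.glue.schema.registry.GlueSchemaRegistryAvroSerializationSchema;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

Map<String, Object> schemaRegistryConfig = new HashMap<>();
schemaRegistryConfig.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-1");
schemaRegistryConfig.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "my-registry");
// Optionally auto-register the writer's schema on first use
schemaRegistryConfig.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true);

Properties sinkClientProperties = new Properties();
sinkClientProperties.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");

KinesisStreamsSink<Trade> sink = KinesisStreamsSink.<Trade>builder()
        .setStreamName("my-output-stream")
        .setKinesisClientProperties(sinkClientProperties)
        // Serialize the AVRO object using Glue Schema Registry; the transport name
        // (here, the stream name) identifies the schema
        .setSerializationSchema(
                GlueSchemaRegistryAvroSerializationSchema.forSpecific(
                        Trade.class, "my-output-stream", schemaRegistryConfig))
        // Use a String field of the record as Kinesis partition key
        // (getTicker() stands in for whatever accessor the generated class exposes)
        .setPartitionKeyGenerator(trade -> String.valueOf(trade.getTicker()))
        .build();
```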

Pre-requisites

To run this example locally or in Amazon Managed Service for Apache Flink, you will need the following:

  • An existing Kinesis Data Stream (add the Kinesis Data Stream name and Region to flink-application-properties-dev.json)
  • A Glue Schema Registry (add your Glue Schema Registry name and Region to flink-application-properties-dev.json); see the property-loading sketch after this list
  • A Trade Producer sending data into the Kinesis Data Stream
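These values are read as runtime property groups when the application runs in Amazon Managed Service for Apache Flink; flink-application-properties-dev.json mimics the same structure for local runs. A minimal sketch of reading them, where the group name ("InputStream0") and the overall wiring are assumptions, not necessarily the code of this project:

```java
import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;

import java.io.IOException;
import java.util.Map;
import java.util.Properties;

// In Managed Service for Apache Flink, the no-arg call returns the configured
// property groups; for local runs, the overload taking a file path can load
// flink-application-properties-dev.json instead (path handling simplified here).
private static Properties loadInputProperties(boolean isLocal) throws IOException {
    Map<String, Properties> applicationProperties = isLocal
            ? KinesisAnalyticsRuntime.getApplicationProperties("flink-application-properties-dev.json")
            : KinesisAnalyticsRuntime.getApplicationProperties();
    return applicationProperties.get("InputStream0"); // hypothetical property group name
}
```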

You can modify input-reader-schema.avdl if you wish to send different data.

Flink compatibility

Note: This project is compatible with Flink 1.15+ and Kinesis Data Analytics for Apache Flink.

Flink API compatibility

This example shows how to use AWS Glue Schema Registry with the Flink Java DataStream API.

It uses the FlinkKinesisConsumer and the newer KinesisStreamsSink (as opposed to the legacy Kinesis producer, which does not use the AWS SDK for Java v2).

At the moment, no format provider is available for the Table API.

Notes about using AVRO with Apache Flink

AVRO-generated classes

This project uses classes generated at build-time as data objects.

As a best practice, only the AVRO schema definitions (IDL .avdl files in this case) are included in the project source code.

The AVRO Maven plugin generates the Java classes (source code) at build time, during the generate-sources phase.

The generated classes are written into the ./target/generated-sources/avro directory and should not be committed with the project source.

This way, the only dependency is on the schema definition file(s). If any change is required, the schema file is modified and the AVRO classes are re-generated automatically in the build.
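A typical avro-maven-plugin configuration for IDL files looks roughly like this (a sketch; the plugin version and directories in this project's pom.xml may differ):

```xml
<plugin>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-maven-plugin</artifactId>
    <version>1.10.2</version>
    <executions>
        <execution>
            <!-- Runs during generate-sources and compiles .avdl files into Java classes -->
            <phase>generate-sources</phase>
            <goals>
                <goal>idl-protocol</goal>
            </goals>
            <configuration>
                <sourceDirectory>${project.basedir}/src/main/resources/avro</sourceDirectory>
                <outputDirectory>${project.basedir}/target/generated-sources/avro</outputDirectory>
            </configuration>
        </execution>
    </executions>
</plugin>
```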

Code generation is supported by common IDEs such as IntelliJ. If your IDE does not see the AVRO classes (Trade and BuySell) when you import the project for the first time, you can run mvn generate-sources manually once, or force source code generation from the IDE.

AVRO-generated classes (SpecificRecord) in Apache Flink

Using AVRO-generated classes (SpecificRecord) within the flow of the Flink application (between operators) or in the Flink state has an additional benefit: Flink natively and efficiently serializes and deserializes these objects, without the risk of falling back to Kryo.
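If you want to verify that no type silently falls back to Kryo, you can disable generic types on the execution config; this is a general Flink setting, shown here only as an illustration:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Make Flink throw an exception whenever a data type would be handled by the
// generic (Kryo) serializer, instead of silently accepting the slower fallback
env.getConfig().disableGenericTypes();
```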

AVRO and AWS Glue Schema Registry dependencies

The following dependencies related to AVRO and AWS Glue Schema Registry are included (for Flink 1.15.2):

  1. org.apache.flink:flink-avro-glue-schema-registry_2.12:1.15.2 - Support for AWS Glue Schema Registry SerDe
  2. org.apache.avro:avro:1.10.2 - Overrides AVRO 1.10.0, which is transitively included.

The project also includes org.apache.flink:flink-avro:1.15.2. This is already a transitive dependency from the Glue Schema Registry SerDe and is defined explicitly only for clarity.

Note that we are overriding AVRO 1.10.0 with 1.10.2. This minor version upgrade does not break the internal API, and includes some bug fixes introduced with AVRO 1.10.1 and 1.10.2.

Running in IntelliJ

To start the Flink job in IntelliJ, edit the Run/Debug configuration and enable 'Add dependencies with "provided" scope to the classpath'.