Sample to show how to process different types of Avro subjects in a single topic #98

buddhike wants to merge 8 commits into aws-samples:main
Conversation
nicusX left a comment
Many thanks for the contribution.
I would suggest some changes to make it simpler to understand, and also to avoid hinting at any not-so-good practices.
The `.gitignore` in the subfolder is not required.
There is a `.gitignore` at the top level. If anything is missing there, please update that one.
> * Flink API: DataStream API
> * Language: Java (11)
>
> This example demonstrates how to serialize/deserialize Avro messages in Kafka when one topic stores multiple subject types.
Explain that this is specific to the Confluent Schema Registry.
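For context, the Confluent-specific mechanism that allows several Avro types in one topic is the subject name strategy. A minimal sketch of the relevant serializer settings (the registry URL is illustrative; the property key and strategy class are from the Confluent serializer):

```java
import java.util.Properties;

// Sketch: Confluent Avro serializer settings that register one subject per
// record name instead of per topic, which is what permits multiple Avro
// types in a single topic. The schema.registry.url value is illustrative.
Properties producerConfig = new Properties();
producerConfig.setProperty("schema.registry.url", "http://localhost:8081");
producerConfig.setProperty("value.subject.name.strategy",
        "io.confluent.kafka.serializers.subject.RecordNameStrategy");
```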
> * Flink version: 1.20
> * Flink API: DataStream API
> * Language: Java (11)
We usually add the connectors used in the example to this list. In this case it's also important to mention that the example uses Avro with the Confluent Schema Registry.
> This example uses Avro-generated classes (more details [below](#using-avro-generated-classes)).
>
> A `KafkaSource` produces a stream of Avro data objects (`SpecificRecord`), fetching the writer's schema from AWS Glue Schema Registry. The Avro Kafka message value must have been serialized using AWS Glue Schema Registry.
I think you mean Confluent Schema Registry?
> env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
> env.enableCheckpointing(60000);
> }
> env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
This is not really required
> env.execute("avro-one-topic-many-subjects");
> }
>
> private static void setupAirQualityGenerator(String bootstrapServers, String sourceTopic, String schemaRegistryUrl, Map<String, Object> schemaRegistryConfig, StreamExecutionEnvironment env) {
Even though having the data generator within the same Flink app works, we are deliberately avoiding this in all of the examples. The reason is that building jobs with multiple dataflows is strongly discouraged.
We avoid any bad practice in the examples, so as not to suggest it may be a good idea.
I reckon it's more complicated, but you can add a separate module with a standalone Java application that generates data. Something similar to what we do in this example, even though in that case it's Kinesis.
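A standalone generator module could be sketched roughly as below. The class name, topic, and JSON payload are illustrative; in this sample the value serializer would instead be the Confluent `KafkaAvroSerializer` configured with `RecordNameStrategy`, so each record type gets its own subject within the same topic.

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Sketch of a standalone data-generator application, separate from the Flink job.
// All names here are illustrative.
public class DataGenerator {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            while (true) {
                // In the real generator this would be an Avro SpecificRecord instance
                producer.send(new ProducerRecord<>("sensor-data", "room-1", "{\"temperature\": 21.5}"));
                Thread.sleep(1000L);
            }
        }
    }
}
```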
> * strategies and event time extraction. However, for those scenarios to work
> * all subjects should have a standard set of fields.
> */
> class Option {
Maybe you can use `org.apache.flink.types.SerializableOptional<T>`, which comes with Flink.
The `Option` type in this PR is a container type to hold any possible deserialized value. `SerializableOptional<T>` is for optional values, so I guess it would not be the right choice here? Am I missing something? 👀 🙏🏾
BTW: I could have used `Object` instead of creating an `Option` type with an `Object` value field. However, having the `Option` type helps if we want to generate watermarks in the source operator using a common timestamp field.
Is there a better way to do this?
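To make the discussion concrete, a minimal sketch of such a container (field and accessor names are illustrative, not taken from the PR): it wraps any deserialized record together with a common event timestamp, so one watermark strategy can apply across all subject types.

```java
// Sketch of a serializable container for any deserialized Avro record.
// Exposing a common timestamp lets a single WatermarkStrategy work
// regardless of the concrete subject type.
public class Option implements java.io.Serializable {
    private final Object value;        // the deserialized Avro record
    private final long eventTimestamp; // common timestamp field across subjects

    public Option(Object value, long eventTimestamp) {
        this.value = value;
        this.eventTimestamp = eventTimestamp;
    }

    public Object getValue() {
        return value;
    }

    public long getEventTimestamp() {
        return eventTimestamp;
    }
}
```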
> }
>
> // Custom deserialization schema for handling multiple generic Avro record types
> class OptionDeserializationSchema implements KafkaRecordDeserializationSchema<Option> {
Please move this to a top-level class for readability.
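As a top-level class, the schema could be shaped roughly like this sketch, assuming the Kafka connector's `KafkaRecordDeserializationSchema` interface; the actual registry lookup and deserialization logic from the PR is elided.

```java
import java.io.IOException;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;

// Sketch of OptionDeserializationSchema as a top-level class. The body of
// deserialize() is elided; it would resolve the writer's schema from the
// registry, deserialize record.value(), and wrap the result in an Option.
public class OptionDeserializationSchema implements KafkaRecordDeserializationSchema<Option> {

    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<Option> out) throws IOException {
        // ... deserialize against the writer's schema and emit an Option ...
    }

    @Override
    public TypeInformation<Option> getProducedType() {
        return TypeInformation.of(Option.class);
    }
}
```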
> }
> }
>
> class RecordNameSerializer<T> implements KafkaRecordSerializationSchema<T>
> env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
>
> Properties applicationProperties = loadApplicationProperties(env).get(APPLICATION_CONFIG_GROUP);
> String bootstrapServers = Preconditions.checkNotNull(applicationProperties.getProperty("bootstrap.servers"), "bootstrap.servers not defined");
The code building the dataflow is a bit hard to follow.
I would suggest doing what we tend to do in other examples:
- In the runtime configuration, use a PropertyGroup for each source and sink, even if some configurations are repeated.
- Instantiate each Source and Sink in a local method, passing the `Properties` which contains all the configuration for that specific component. Extract specific properties, like the topic name, within the method rather than directly in `main()`.
- Build the dataflow by just attaching the operators one after the other, using intermediate stream variables only when it helps readability.
- Avoid having methods that attach operators to the dataflow. Practically, any method which expects a `DataStream` or `StreamExecutionEnvironment` as a parameter should be avoided.
- If an operator implementation, like a map or a filter, is simple, try using a lambda and inlining it. If the operator implementation is complex, externalize the implementation to a separate class.

See examples here.
We are not following these patterns in all examples, but we are trying to converge as much as possible.
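The "instantiate the source in a local method" suggestion can be sketched as below, assuming the Flink `KafkaSource` builder API and the `OptionDeserializationSchema` from this PR; the property keys and method name are illustrative.

```java
import java.util.Properties;

import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.util.Preconditions;

// Sketch: the source is built in a local method that receives only its own
// Properties group; topic and bootstrap servers are extracted here, not in main().
public class SourceFactory {
    static KafkaSource<Option> createKafkaSource(Properties sourceProperties) {
        String bootstrapServers = Preconditions.checkNotNull(
                sourceProperties.getProperty("bootstrap.servers"), "bootstrap.servers not defined");
        String topic = Preconditions.checkNotNull(
                sourceProperties.getProperty("topic"), "topic not defined");

        return KafkaSource.<Option>builder()
                .setBootstrapServers(bootstrapServers)
                .setTopics(topic)
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setDeserializer(new OptionDeserializationSchema())
                .build();
    }
}
```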
@nicusX Thanks for reviewing this PR. I've addressed your points. Could you please take another look? 🙏🏾
@nicusX Do you think this is a good approach to share run configurations with IntelliJ users?
Uhm, I would avoid anything IntelliJ-specific. We are avoiding it in all the other examples; people may have different setups. We give instructions instead.
nicusX left a comment
Splitting out the data generator makes it much more readable.
I added some comments about a few things to clean up.
I would also remove the IntelliJ configuration. The instructions are more than sufficient.
> <maven.compiler.source>${target.java.version}</maven.compiler.source>
> <main.class>com.amazonaws.services.msf.StreamingJob</main.class>
>
> <scala.binary.version>2.12</scala.binary.version>
> <artifactId>flink-connector-kafka</artifactId>
> <version>${kafka.clients.version}</version>
> </dependency>
> <dependency>
I don't think this dependency is required. It's a super-old dependency containing the AWS connectors from before they were part of the Apache Flink project.
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-runtime-web</artifactId>
> <version>${flink.version}</version>

> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-connector-base</artifactId>
> <version>${flink.version}</version>
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>
> if (isLocal(env)) {
> env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new org.apache.flink.configuration.Configuration());
Creating the local environment with the web UI is no longer required. With 1.20, the UI is created automatically when running locally, as long as the flink-runtime-web dependency is included.
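If that behavior holds for 1.20, the local branch can be dropped entirely; a sketch:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Sketch: with flink-runtime-web on the classpath, this single call is enough
// both locally (where the web UI starts automatically) and on the cluster.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
```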
> </filters>
> <transformers>
> <transformer
> implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
This is not needed for a plain Java application.
> <goal>shade</goal>
> </goals>
> <configuration>
> <artifactSet>
This is not needed for a plain Java application.
It will actually prevent the packaged application from logging.
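For the standalone application, a minimal shade configuration without the artifact-set exclusions could look like this sketch (the plugin version is omitted on purpose, assuming it is managed by the parent or `pluginManagement`):

```xml
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
        </execution>
    </executions>
</plugin>
```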
> <exclude>org.apache.logging.log4j:*</exclude>
> </excludes>
> </artifactSet>
> <filters>
> <commons.version>1.9.0</commons.version>
> </properties>
>
> <repositories>
This is not required.
`io.confluent:kafka-avro-serializer` is in Maven Central.