Example of a PyFlink application writing to Amazon Data Firehose
Example showing how to send data to Amazon Data Firehose from a PyFlink application.
- Flink version: 1.20
- Flink API: Table API & SQL
- Flink Connectors: DataGen Connector, Firehose Connector
- Language: Python
The application uses the Firehose connector to send JSON data directly to an Amazon Data Firehose delivery stream. Random data is generated internally by the application.
- Python 3.11
- PyFlink library: `apache-flink==1.20.0`
- Java JDK 11+ and Maven
⚠️ As of 2024-06-27, the Flink Python library 1.19.x may fail to install on Python 3.12. We recommend using Python 3.11 for development, the same Python version used by the Amazon Managed Service for Apache Flink runtime 1.20.
JDK and Maven are used to download and package any required Flink dependencies, e.g. connectors, and to package the application as a zip file for deployment to Amazon Managed Service for Apache Flink.
The application expects an Amazon Data Firehose delivery stream with Direct PUT source.
The delivery stream name is defined in the configuration (see below).
The application defines no default stream name or region.
The configuration for local development sets them, by default, to
`ExampleOutputFirehoseDeliveryStream` in `us-east-1`.
The destination the delivery stream writes to, and its settings, are not relevant for this example. For example, you can send data to S3 without any transformation.
The application must have sufficient permissions to publish data to the Firehose delivery stream.
When running locally, you need active valid AWS credentials that allow publishing data to the delivery stream.
- Local development: uses the local file application_properties.json
- On Amazon Managed Service for Apache Flink: define Runtime Properties, using Group ID and property names based on the content of application_properties.json
For this application, the configuration properties to specify are:
| Group ID | Key | Mandatory | Example Value (default for local) | Notes |
|---|---|---|---|---|
| `OutputDeliveryStream0` | `stream.name` | Y | `ExampleOutputFirehoseDeliveryStream` | Name of the output delivery stream. |
| `OutputDeliveryStream0` | `aws.region` | Y | `us-east-1` | Region of the output delivery stream. |
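For reference, here is a minimal sketch of the local configuration and of how an application might read it. It assumes the `PropertyGroupId` / `PropertyMap` layout used by Managed Flink runtime properties (shown in the comment); the loader function and the usage lines are illustrative, not part of this example.

```python
import json

# Illustrative sketch only. It assumes a local application_properties.json shaped
# like Managed Flink runtime properties, e.g.:
#
# [
#   {
#     "PropertyGroupId": "OutputDeliveryStream0",
#     "PropertyMap": {
#       "stream.name": "ExampleOutputFirehoseDeliveryStream",
#       "aws.region": "us-east-1"
#     }
#   }
# ]

def load_properties(path="application_properties.json"):
    """Return a dict of {group_id: {key: value}} read from the local file."""
    with open(path) as f:
        groups = json.load(f)
    return {group["PropertyGroupId"]: group["PropertyMap"] for group in groups}

# Example usage (hypothetical):
# props = load_properties()
# stream_name = props["OutputDeliveryStream0"]["stream.name"]
```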
In addition to these configuration properties, when running a PyFlink application in Managed Flink you need to set two additional Runtime Properties, described in Additional configuration for PyFlink applications on Managed Flink, below.
- Make sure you have created the Firehose delivery stream and you have a valid AWS session that allows you to publish to it (the way of doing this depends on your setup)
- Run `mvn package` once, from this directory. This step is required to download the jar dependencies - the Firehose connector in this case
- Set the environment variable `IS_LOCAL=true`. You can do this from the prompt or in the run profile of the IDE
- Run `main.py`
You can also run the python script directly from the command line, like `python main.py`. This still requires running `mvn package` first.
If you are using Virtual Environments, make sure to select the venv as a runtime in your IDE.
If you forget to set the environment variable `IS_LOCAL=true` or forget to run `mvn package`, the application fails on start.
🚨 The application does not log or print anything. If you do not see any output in the console, it does not mean the application is not running. The output is sent to the Firehose delivery stream. You can inspect the delivered data at the destination of the delivery stream, for example in the S3 bucket it writes to.
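If your delivery stream delivers to S3, one quick way to check that records are arriving is to list recent objects in the destination bucket. The bucket name below is a placeholder; also keep in mind that Firehose buffers records, so objects may only appear after the buffering interval elapses.

```python
import boto3

# Placeholder bucket name: replace with the destination bucket of your delivery stream.
s3 = boto3.client("s3")
response = s3.list_objects_v2(Bucket="my-firehose-destination-bucket", MaxKeys=10)

for obj in response.get("Contents", []):
    print(obj["Key"], obj["Size"])
```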
Note: if you modify the Python code, you do not need to re-run mvn package before running the application locally.
- Make sure you have created the required Firehose delivery stream
- Create a Managed Flink application
- Modify the application IAM role to allow writing to the Firehose delivery stream
- Package the application: run `mvn clean package` from this directory
- Upload to an S3 bucket the zip file that the previous command creates in the `./target` subdirectory
- Configure the Managed Flink application: set Application Code Location to the bucket and zip file you just uploaded
- Configure the Runtime Properties of the application, creating the Group ID, Keys and Values as defined in the application_properties.json
- Start the application
- When the application transitions to "Running" you can open the Flink Dashboard to verify the job is running, and you can inspect the data published to the destination of the delivery stream, e.g. an S3 bucket.
Follow this process to make changes to the Python code
- Modify the code locally (test/run locally, as required)
- Re-run `mvn clean package` - if you skip this step, the zip file is not updated and still contains the old Python script
- Upload the new zip file to the same location on S3 (overwriting the previous zip file)
- In the Managed Flink application console, enter Configure, scroll down and press Save Changes
- If your application was running when you published the change, Managed Flink stops the application and restarts it with the new code
- If the application was not running (in Ready state) you need to click Run to restart it with the new code
🚨 By design, Managed Flink does not detect the new zip file automatically. You control when you want to restart the application with the code changes. This is done by saving a new configuration from the console or by using the UpdateApplication API.
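As an illustration of the API route, the following is a rough sketch using boto3. The application name, bucket ARN, and file key are placeholders, and the configuration update only covers the code location; check the UpdateApplication API reference for any additional fields your application requires.

```python
import boto3

client = boto3.client("kinesisanalyticsv2", region_name="us-east-1")

# The current application version id is required for the update call.
app = client.describe_application(ApplicationName="my-managed-flink-app")
current_version = app["ApplicationDetail"]["ApplicationVersionId"]

# Point the application at the newly uploaded zip file (placeholder bucket and key).
client.update_application(
    ApplicationName="my-managed-flink-app",
    CurrentApplicationVersionId=current_version,
    ApplicationConfigurationUpdate={
        "ApplicationCodeConfigurationUpdate": {
            "CodeContentUpdate": {
                "S3ContentLocationUpdate": {
                    "BucketARNUpdate": "arn:aws:s3:::my-code-bucket",
                    "FileKeyUpdate": "my-app.zip",
                }
            }
        }
    },
)
```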
The application generates synthetic data using the DataGen connector. No external data generator is required.
Records are sent to a Firehose delivery stream, as JSON, without any transformations.
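The following is a minimal sketch of the approach, not the actual example code: a DataGen source table feeds a Firehose sink table through the Table API / SQL. Table names, columns, and property values are illustrative placeholders; running it requires the Firehose connector (and JSON format) on the classpath, see the packaging notes below.

```python
from pyflink.table import EnvironmentSettings, TableEnvironment

# Sketch only: DataGen source -> Firehose sink via SQL DDL.
table_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

table_env.execute_sql("""
    CREATE TABLE sensor_source (
        sensor_id INT,
        temperature DOUBLE
    ) WITH (
        'connector' = 'datagen',
        'rows-per-second' = '10',
        'fields.sensor_id.min' = '1',
        'fields.sensor_id.max' = '100'
    )
""")

table_env.execute_sql("""
    CREATE TABLE firehose_sink (
        sensor_id INT,
        temperature DOUBLE
    ) WITH (
        'connector' = 'firehose',
        'delivery-stream' = 'ExampleOutputFirehoseDeliveryStream',
        'aws.region' = 'us-east-1',
        'format' = 'json'
    )
""")

# Records from the generator are written to the delivery stream as JSON.
table_env.execute_sql("INSERT INTO firehose_sink SELECT * FROM sensor_source").wait()
```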
This example also demonstrates how to include jar dependencies - e.g. connectors - in a PyFlink application, and how to package it for deployment on Amazon Managed Service for Apache Flink.
Any jar dependencies must be added to the <dependencies> block in the pom.xml file.
In this case, you can see we have included flink-connector-aws-kinesis-firehose.
Executing `mvn package` takes care of downloading any defined dependencies and creates a single "fat-jar" containing all of them.
This file is generated in the `./target` subdirectory and is called `pyflink-dependencies.jar`.
The `./target` directory and any generated files are not supposed to be committed to git.
When running locally, for example in your IDE, PyFlink will look for this jar file in `./target`.
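A common pattern, shown here only as a sketch, is to add the fat-jar to the job configuration when running locally (on Managed Flink the jar is instead provided through the `jarfile` runtime property described below). The path handling below is illustrative.

```python
import os
from pyflink.table import EnvironmentSettings, TableEnvironment

table_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# Only needed for local runs: point PyFlink at the fat-jar built by `mvn package`.
if os.environ.get("IS_LOCAL"):
    jar_path = os.path.join(
        os.path.dirname(os.path.abspath(__file__)),
        "target",
        "pyflink-dependencies.jar",
    )
    table_env.get_config().set("pipeline.jars", "file://" + jar_path)
```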
When you are happy with your Python code and you are ready to deploy the application to Amazon Managed Service for Apache Flink,
run mvn package again. The zip file you find in ./target is the artifact to upload to S3, containing
both jar dependencies and your Python code.
To tell Managed Flink what Python script to run and which fat-jar contains all the dependencies, you need to specify some additional Runtime Properties, as part of the application configuration:
| Group ID | Key | Mandatory | Value | Notes |
|---|---|---|---|---|
| `kinesis.analytics.flink.run.options` | `python` | Y | `main.py` | The Python script containing the main() method to start the job. |
| `kinesis.analytics.flink.run.options` | `jarfile` | Y | `lib/pyflink-dependencies.jar` | Location (inside the zip file) of the fat-jar containing all jar dependencies. |