Amazon Kinesis Data Analytics Flink Starter Kit helps you develop an Apache Flink application that uses an Amazon Kinesis data stream as its source and Amazon S3 as its sink. It demonstrates the use of a session window with an AggregateFunction.
🚨 August 30, 2023: Amazon Kinesis Data Analytics has been renamed to Amazon Managed Service for Apache Flink.
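The core pattern of the starter kit, a session window combined with an AggregateFunction, can be illustrated without a Flink cluster. The following plain-Java sketch is not the starter kit's code and does not use the Flink API: the Aggregator interface only mirrors the four methods of Flink's AggregateFunction, and the Event and CountAggregator classes are hypothetical. Events for a single key are sorted by event time, and a new session starts when the gap since the previous event exceeds the timeout.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Plain-Java illustration of session windows with an incremental aggregate.
// This is NOT Flink code: Aggregator only mirrors the shape of Flink's AggregateFunction.
public class SessionWindowSketch {

    /** Mirrors the four methods of Flink's AggregateFunction<IN, ACC, OUT>. */
    interface Aggregator<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);
    }

    /** Hypothetical event carrying only an event-time timestamp in milliseconds. */
    static final class Event {
        final long timestampMillis;
        Event(long timestampMillis) { this.timestampMillis = timestampMillis; }
    }

    /** Counts the events in a session; merge adds two partial counts. */
    static final class CountAggregator implements Aggregator<Event, Long, Long> {
        public Long createAccumulator() { return 0L; }
        public Long add(Event value, Long accumulator) { return accumulator + 1; }
        public Long getResult(Long accumulator) { return accumulator; }
        public Long merge(Long a, Long b) { return a + b; }
    }

    /**
     * Splits the events of one key into sessions (a new session starts when the gap
     * since the previous event exceeds timeoutMillis) and returns one result per session.
     */
    static <ACC, OUT> List<OUT> sessionize(List<Event> events, long timeoutMillis,
                                           Aggregator<Event, ACC, OUT> aggregator) {
        List<Event> sorted = new ArrayList<>(events);
        sorted.sort(Comparator.comparingLong((Event e) -> e.timestampMillis));
        List<OUT> results = new ArrayList<>();
        ACC accumulator = null;
        long lastTimestamp = 0;
        for (Event event : sorted) {
            boolean gapTooLong = accumulator != null
                    && event.timestampMillis - lastTimestamp > timeoutMillis;
            if (accumulator == null || gapTooLong) {
                if (accumulator != null) {
                    results.add(aggregator.getResult(accumulator)); // close the previous session
                }
                accumulator = aggregator.createAccumulator(); // open a new session
            }
            accumulator = aggregator.add(event, accumulator);
            lastTimestamp = event.timestampMillis;
        }
        if (accumulator != null) {
            results.add(aggregator.getResult(accumulator)); // close the last session
        }
        return results;
    }

    public static void main(String[] args) {
        long tenMinutes = 10 * 60 * 1000L; // same as the sample session timeout of 10 minutes
        List<Event> events = List.of(
                new Event(0), new Event(60_000), new Event(120_000), // one session
                new Event(120_000 + tenMinutes + 1));                 // gap > timeout: new session
        System.out.println(sessionize(events, tenMinutes, new CountAggregator())); // prints [3, 1]
    }
}
```

In Flink, the window assigner decides session boundaries and calls merge when two session windows combine; because this sketch builds sessions in timestamp order, merge is never called here. Flink's handling of a gap exactly equal to the timeout may also differ from the simple > comparison used in the sketch.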
The architecture of this starter kit is shown in the diagram below.
The following AWS services are required to deploy this starter kit:
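Build the Flink application with Maven from the command line:
mvn -X clean install
Alternatively, in Eclipse, set the Maven goals to -X clean install (navigation: right-click the project --> Run As --> Maven Build, option 4).
The build generates the jar file amazon-kinesis-data-analytics-flink-starter-kit-1.0.jar.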
You can deploy the Starter Kit using either the AWS CLI or the AWS Console.
The Starter Kit requires the following application properties:
Key | Sample value | Description |
---|---|---|
region | us-east-1 | AWS Region |
input_stream_name | kda_flink_starter_kit_kinesis_stream | Name of the input Kinesis data stream |
session_time_out_in_minutes | 10 | Session timeout, in minutes |
stream_initial_position | TRIM_HORIZON | Position in the input stream to start reading from; refer to the Apache Flink Kinesis connector documentation for details |
s3_output_path | s3a://<bucket_name>/kda_flink_starter_kit_output | S3 path for the Flink application output |
bucket_check_interval_in_seconds | 2 | Interval, in seconds, for checking time-based rolling policies |
rolling_interval_in_seconds | 2 | Maximum time, in seconds, a part file can stay open before it must roll |
inactivity_interval_in_seconds | 2 | Period of inactivity, in seconds, after which a part file must roll |
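The properties in the table above could be read and validated roughly as follows. This is a sketch under stated assumptions: the StarterKitConfig class and its field names are hypothetical, and a plain java.util.Properties object stands in for the FlinkAppProperties property group that a deployed application would read at runtime. Missing values fall back to the sample values from the table, except s3_output_path, which is bucket-specific and therefore required. Time values are converted to milliseconds.

```java
import java.util.Properties;

// Hypothetical holder for the starter kit's application properties.
public class StarterKitConfig {
    final String region;
    final String inputStreamName;
    final long sessionTimeoutMillis;
    final String streamInitialPosition;
    final String s3OutputPath;
    final long bucketCheckIntervalMillis;
    final long rollingIntervalMillis;
    final long inactivityIntervalMillis;

    StarterKitConfig(Properties props) {
        // Defaults are the sample values from the properties table.
        region = props.getProperty("region", "us-east-1");
        inputStreamName = props.getProperty("input_stream_name", "kda_flink_starter_kit_kinesis_stream");
        sessionTimeoutMillis = positiveLong(props, "session_time_out_in_minutes", 10) * 60_000L;
        streamInitialPosition = props.getProperty("stream_initial_position", "TRIM_HORIZON");
        s3OutputPath = required(props, "s3_output_path"); // bucket-specific, so no default
        bucketCheckIntervalMillis = positiveLong(props, "bucket_check_interval_in_seconds", 2) * 1_000L;
        rollingIntervalMillis = positiveLong(props, "rolling_interval_in_seconds", 2) * 1_000L;
        inactivityIntervalMillis = positiveLong(props, "inactivity_interval_in_seconds", 2) * 1_000L;
    }

    /** Parses a numeric property, using the default when absent and rejecting values <= 0. */
    private static long positiveLong(Properties props, String key, long defaultValue) {
        long value = Long.parseLong(props.getProperty(key, Long.toString(defaultValue)).trim());
        if (value <= 0) {
            throw new IllegalArgumentException(key + " must be positive, got " + value);
        }
        return value;
    }

    /** Returns a property that has no sensible default, failing fast when it is missing. */
    private static String required(Properties props, String key) {
        String value = props.getProperty(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException("Missing required property " + key);
        }
        return value.trim();
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("s3_output_path", "s3a://my-bucket/kda_flink_starter_kit_output"); // hypothetical bucket
        StarterKitConfig config = new StarterKitConfig(props);
        System.out.println(config.sessionTimeoutMillis + " " + config.rollingIntervalMillis); // prints 600000 2000
    }
}
```

Failing fast on a missing output path surfaces a misconfigured property group when the job starts, rather than when the first part file is written.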
Log on to the AWS Console and go to Amazon S3. Select the bucket you will use; if you don't have one, create a new bucket and open it.
Create a folder named kda_flink_starter_kit_jar.
Create a folder named kda_flink_starter_kit_output.
Open a terminal on your computer.
Upload the Flink application jar file to the S3 bucket, replacing bucket_name with the name of your bucket
aws s3 cp amazon-kinesis-data-analytics-flink-starter-kit-1.0.jar s3://bucket_name/kda_flink_starter_kit_jar/
Create the Kinesis data stream
aws kinesis create-stream --stream-name kda_flink_starter_kit_kinesis_stream --shard-count 4
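The create-stream command above spreads records across 4 shards by partition key. Kinesis computes the MD5 hash of each partition key, treats it as an unsigned 128-bit integer, and routes the record to the shard whose hash key range contains that value. The sketch below approximates that mapping, assuming the hash key space is split into equal-width ranges as for a newly created stream (boundaries may differ slightly when the shard count does not divide 2^128 evenly). After resharding, use the actual ranges reported by aws kinesis describe-stream instead. The class and the partition keys are hypothetical.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class ShardMappingSketch {
    // Kinesis hash keys span [0, 2^128).
    private static final BigInteger HASH_SPACE = BigInteger.ONE.shiftLeft(128);

    /** 128-bit hash key for a partition key: MD5 of its UTF-8 bytes, read as unsigned. */
    static BigInteger hashKey(String partitionKey) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(partitionKey.getBytes(StandardCharsets.UTF_8));
            return new BigInteger(1, digest); // signum 1: treat the bytes as an unsigned value
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 must be supported by every Java platform", e);
        }
    }

    /** Index of the shard whose range contains the hash, ASSUMING equal-width ranges. */
    static int shardIndex(String partitionKey, int shardCount) {
        return hashKey(partitionKey)
                .multiply(BigInteger.valueOf(shardCount))
                .divide(HASH_SPACE) // hash < 2^128, so the result is in [0, shardCount)
                .intValueExact();
    }

    public static void main(String[] args) {
        for (String key : new String[] {"session-1", "session-2", "session-3"}) {
            System.out.println(key + " -> shard " + shardIndex(key, 4));
        }
    }
}
```

Records that share a partition key always land on the same shard, which is what preserves per-key ordering within the stream.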
Create the IAM policies. In your terminal, navigate to the folder amazon-kinesis-data-analytics-flink-starter-kit/src/main/resources
Policy for CloudWatch Logs
aws iam create-policy --policy-name flink_starter_kit_iam_policy_cloudwatch_logs \
--policy-document file://flink_starter_kit_iam_policy_cloudwatch_logs.json
Policy for CloudWatch
aws iam create-policy --policy-name flink_starter_kit_iam_policy_cloudwatch \
--policy-document file://flink_starter_kit_iam_policy_cloudwatch.json
Policy for Kinesis Data Stream
aws iam create-policy --policy-name flink_starter_kit_iam_policy_kinesis \
--policy-document file://flink_starter_kit_iam_policy_kinesis.json
Policy for S3
aws iam create-policy --policy-name flink_starter_kit_iam_policy_s3 \
--policy-document file://flink_starter_kit_iam_policy_s3.json
Create an IAM role
aws iam create-role --role-name flink_starter_kit_role --assume-role-policy-document file://flink_starter_kit_assume-role-policy-document.json
Attach the policies to the IAM role flink_starter_kit_role. Replace <1234567890> with your AWS account ID before running the commands.
Policy for CloudWatch Logs
aws iam attach-role-policy --role-name flink_starter_kit_role \
--policy-arn arn:aws:iam::<1234567890>:policy/flink_starter_kit_iam_policy_cloudwatch_logs
Policy for CloudWatch
aws iam attach-role-policy --role-name flink_starter_kit_role \
--policy-arn arn:aws:iam::<1234567890>:policy/flink_starter_kit_iam_policy_cloudwatch
Policy for Kinesis
aws iam attach-role-policy --role-name flink_starter_kit_role \
--policy-arn arn:aws:iam::<1234567890>:policy/flink_starter_kit_iam_policy_kinesis
Policy for S3
aws iam attach-role-policy --role-name flink_starter_kit_role \
--policy-arn arn:aws:iam::<1234567890>:policy/flink_starter_kit_iam_policy_s3
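The four attach-role-policy commands differ only in the policy name, so the ARNs can be generated from the account ID. A minimal sketch with a hypothetical PolicyArns class: it assumes the standard aws partition (AWS China and GovCloud Regions use different partitions) and checks that the account ID has exactly 12 digits, as real AWS account IDs do, which also catches a forgotten <1234567890> placeholder.

```java
import java.util.regex.Pattern;

public class PolicyArns {
    // Real AWS account IDs are exactly 12 digits.
    private static final Pattern ACCOUNT_ID = Pattern.compile("\\d{12}");

    /** Builds an IAM customer-managed policy ARN, assuming the standard "aws" partition. */
    static String policyArn(String accountId, String policyName) {
        if (!ACCOUNT_ID.matcher(accountId).matches()) {
            throw new IllegalArgumentException("Expected a 12-digit AWS account ID, got " + accountId);
        }
        return "arn:aws:iam::" + accountId + ":policy/" + policyName;
    }

    public static void main(String[] args) {
        String accountId = args.length > 0 ? args[0] : "123456789012"; // placeholder account ID
        String[] policyNames = {
            "flink_starter_kit_iam_policy_cloudwatch_logs",
            "flink_starter_kit_iam_policy_cloudwatch",
            "flink_starter_kit_iam_policy_kinesis",
            "flink_starter_kit_iam_policy_s3"
        };
        for (String name : policyNames) {
            // Prints the attach-role-policy commands with the ARN filled in.
            System.out.println("aws iam attach-role-policy --role-name flink_starter_kit_role"
                    + " --policy-arn " + policyArn(accountId, name));
        }
    }
}
```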
Open flink_starter_kit_def_stream_position_trim_horizon.json and update the following values:
Create a log group in CloudWatch Logs
aws logs create-log-group --log-group-name /aws/kinesis-analytics/kda_flink_starter_kit
Create a log stream under the log group
aws logs create-log-stream --log-group-name /aws/kinesis-analytics/kda_flink_starter_kit \
--log-stream-name kda_flink_starter_kit
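The Kinesis Data Analytics API refers to a CloudWatch Logs log stream by its ARN (the LogStreamARN field of CloudWatchLoggingOptions). A minimal sketch of building that ARN for the log group and log stream created above, with a hypothetical LogStreamArn class; the account ID is a placeholder, the region matches the sample region property, and the standard aws partition is assumed.

```java
public class LogStreamArn {
    /** Builds a CloudWatch Logs log stream ARN, assuming the standard "aws" partition. */
    static String logStreamArn(String region, String accountId, String logGroup, String logStream) {
        return String.format("arn:aws:logs:%s:%s:log-group:%s:log-stream:%s",
                region, accountId, logGroup, logStream);
    }

    public static void main(String[] args) {
        // Region from the sample properties; the account ID is a placeholder.
        System.out.println(logStreamArn("us-east-1", "123456789012",
                "/aws/kinesis-analytics/kda_flink_starter_kit", "kda_flink_starter_kit"));
    }
}
```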
Run this command to create the Kinesis Data Analytics Flink application
aws kinesisanalyticsv2 create-application \
--cli-input-json file://flink_starter_kit_def_stream_position_trim_horizon.json
Run this command to start the application
aws kinesisanalyticsv2 start-application \
--cli-input-json file://flink_starter_kit_start_configuration.json
To deploy using the AWS Console, use the following names and values:
S3 folder: kda_flink_starter_kit
IAM policies, each created using the Policy summary sample: flink_starter_kit_iam_policy_s3, flink_starter_kit_iam_policy_kinesis, flink_starter_kit_iam_policy_cloudwatch, flink_starter_kit_iam_policy_cloudwatch_logs
IAM role: kda_flink_starter_kit, with the above policies attached
Kinesis data stream: kda_flink_starter_kit_kinesis_stream, number of shards: 6
Kinesis Data Analytics application name: amazon_kda_flink_starter_kit
Application jar: amazon-kinesis-data-analytics-flink-starter-kit-1.0.jar
IAM role for the application: the IAM role created above
Property group ID: FlinkAppProperties. Create the properties defined in the section Flink Application Properties.
You can use the Amazon Kinesis Data Analytics Flink – Benchmarking Utility to generate sample data, test the Apache Flink session window, and validate the architecture of this starter kit.
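When testing the session window, it helps to know in advance how many sessions the input should produce. The sketch below is a hypothetical stand-in, not the Benchmarking Utility: it generates event timestamps for one key, with gaps of at most half the timeout inside a session and gaps longer than the timeout between sessions, and renders each event as a JSON line. The JSON field names are assumptions, not the starter kit's record schema.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class SampleSessionEvents {
    /**
     * Generates ascending event timestamps for a single key. Gaps inside a session are
     * between 1 ms and timeoutMillis / 2; gaps between sessions exceed timeoutMillis.
     */
    static List<Long> generateTimestamps(int sessions, int eventsPerSession,
                                         long timeoutMillis, long seed) {
        Random random = new Random(seed); // fixed seed for reproducible test data
        List<Long> timestamps = new ArrayList<>();
        long ts = 0;
        for (int s = 0; s < sessions; s++) {
            for (int e = 0; e < eventsPerSession; e++) {
                timestamps.add(ts);
                if (e < eventsPerSession - 1) {
                    ts += 1 + (long) (random.nextDouble() * (timeoutMillis / 2)); // stays in session
                }
            }
            ts += timeoutMillis + 1 + random.nextInt(1_000); // gap that ends the session
        }
        return timestamps;
    }

    /** Renders one event as a JSON line; the field names are hypothetical. */
    static String toJsonLine(String userId, long eventTimeMillis) {
        return String.format("{\"user_id\":\"%s\",\"event_time_millis\":%d}", userId, eventTimeMillis);
    }

    public static void main(String[] args) {
        long tenMinutes = 10 * 60 * 1000L;
        for (long ts : generateTimestamps(3, 4, tenMinutes, 42L)) {
            System.out.println(toJsonLine("user-1", ts));
        }
    }
}
```

With a 10-minute session timeout, this input should yield three sessions of four events each for the key user-1.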
Future releases of this starter kit will include the following features:
Contributions are welcome; refer to CONTRIBUTING.md for more details.
This sample code is made available under the MIT-0 license. See the LICENSE file.