The main goal of this project is to play with Kafka, Kafka Connect and Kafka Streams. For this, we have: store-api, which inserts/updates records in MySQL; Source Connectors, which monitor inserted/updated records in MySQL and push messages related to those changes to Kafka; Sink Connectors, which listen to messages from Kafka and insert/update documents in Elasticsearch; and finally, store-streams, which listens to messages from Kafka, processes them using Kafka Streams and pushes new messages back to Kafka.
On ivangfr.github.io, I have compiled my Proof-of-Concepts (PoCs) and articles. You can easily search for the technology you are interested in by using the filter. Who knows, perhaps I have already implemented a PoC or written an article about what you are looking for.
store-api
Monolithic Spring Boot application that exposes a REST API to manage Customers, Products and Orders. The data is stored in MySQL.
store-streams
Spring Boot application that connects to Kafka and uses the Kafka Streams API to transform some "input" topics into a new "output" topic in Kafka.
In order to run this project, you can use JSON or Avro format to serialize/deserialize data to/from the binary format used by Kafka. The default format is JSON. Throughout this document, I will point out what to do if you want to use Avro.
Open a terminal and, inside the springboot-kafka-connect-jdbc-streams root folder, run:
docker compose up -d
Note: During the first run, an image for kafka-connect will be built, whose name is springboot-kafka-connect-jdbc-streams_kafka-connect. Run the command below to rebuild it:
docker compose build
Wait for all Docker containers to be up and running. To check it, run:
docker compose ps
In order to have topics in Kafka with more than 1 partition, we have to create them manually and not let the connectors create them for us. To do so:
Open a new terminal and make sure you are in the springboot-kafka-connect-jdbc-streams root folder;
Run the script below:
./create-kafka-topics.sh
It will create the topics mysql.storedb.customers, mysql.storedb.products, mysql.storedb.orders and mysql.storedb.orders_products, each with 5 partitions.
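For reference, this is roughly what creating one of these topics by hand could look like; a minimal sketch assuming the kafka-topics CLI is run inside the broker container (the container name kafka, the bootstrap address and the replication factor are assumptions, not taken from the project's script):
# hypothetical equivalent of one line of create-kafka-topics.sh
docker exec kafka kafka-topics --create \
  --bootstrap-server localhost:9092 \
  --topic mysql.storedb.customers \
  --partitions 5 \
  --replication-factor 1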
Connectors use Converters for data serialization and deserialization. If you are configuring the connectors for JSON (de)serialization, the converter used is JsonConverter; if you are configuring them for Avro (de)serialization, the converter used is AvroConverter.
Important: if the Source Connector Converter serializes data, for instance, from JSON to bytes (using JsonConverter), then the Sink Connector Converter must also use JsonConverter to deserialize the bytes, otherwise an error will be thrown. The document Kafka Connect Deep Dive – Converters and Serialization Explained explains it very well.
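To make the converter pairing concrete, below is a hedged sketch of how a source connector is typically registered through the Kafka Connect REST API with JsonConverter on the value side. The connector name and the trimmed-down config are illustrative (the project's real definitions live in the create-connectors-jsonconverter.sh and create-connectors-avroconverter.sh scripts), and it assumes the worker's REST port 8083 is reachable from the host:
# illustrative only: a minimal connector registration with an explicit JsonConverter
curl -X POST localhost:8083/connectors \
  -H 'Content-Type: application/json' \
  -d '{
    "name": "example-mysql-source",
    "config": {
      "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter.schemas.enable": "true"
    }
  }'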
Steps to create the connectors:
In a terminal, navigate to the springboot-kafka-connect-jdbc-streams root folder;
Run the following script to create the connectors on kafka-connect:
For JSON (de)serialization
./create-connectors-jsonconverter.sh
For Avro (de)serialization
./create-connectors-avroconverter.sh
You can check the state of the connectors and their tasks on Kafka Connect UI or by running the following script:
./check-connectors-state.sh
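If you prefer to query the Kafka Connect REST API directly (again assuming the worker's REST port 8083 is published to the host), the status of a single connector can be fetched with:
curl localhost:8083/connectors/mysql-source-customers/status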
Once the connectors and their tasks are ready (RUNNING state), you should see something like:
{"name":"mysql-source-customers","connector":{"state":"RUNNING","worker_id":"kafka-connect:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"kafka-connect:8083"}],"type":"source"}
{"name":"mysql-source-products","connector":{"state":"RUNNING","worker_id":"kafka-connect:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"kafka-connect:8083"}],"type":"source"}
{"name":"mysql-source-orders","connector":{"state":"RUNNING","worker_id":"kafka-connect:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"kafka-connect:8083"}],"type":"source"}
{"name":"mysql-source-orders_products","connector":{"state":"RUNNING","worker_id":"kafka-connect:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"kafka-connect:8083"}],"type":"source"}
{"name":"elasticsearch-sink-customers","connector":{"state":"RUNNING","worker_id":"kafka-connect:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"kafka-connect:8083"}],"type":"sink"}
{"name":"elasticsearch-sink-products","connector":{"state":"RUNNING","worker_id":"kafka-connect:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"kafka-connect:8083"}],"type":"sink"}
{"name":"elasticsearch-sink-orders","connector":{"state":"RUNNING","worker_id":"kafka-connect:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"kafka-connect:8083"}],"type":"sink"}
On Kafka Connect UI (http://localhost:8086), you should see the connectors listed as well.
If there is any problem, you can check the kafka-connect container logs:
docker logs kafka-connect
store-api
Open a new terminal and make sure you are in the springboot-kafka-connect-jdbc-streams root folder.
Run the command below to start the application:
./mvnw clean spring-boot:run --projects store-api -Dspring-boot.run.jvmArguments="-Dserver.port=9080"
Note: It will create all tables, such as customers, products, orders and orders_products, since we are using the spring.jpa.hibernate.ddl-auto=update configuration. It will also insert some customers and products. If you don't want that, just set the properties load-samples.customers.enabled and load-samples.products.enabled to false in application.yml.
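If you prefer not to touch application.yml, a possible alternative is to override those properties on the command line. This is an untested sketch that relies on the standard spring-boot-maven-plugin support for passing application arguments:
./mvnw clean spring-boot:run --projects store-api \
  -Dspring-boot.run.jvmArguments="-Dserver.port=9080" \
  -Dspring-boot.run.arguments="--load-samples.customers.enabled=false --load-samples.products.enabled=false"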
store-streams
Open a new terminal and make sure you are in the springboot-kafka-connect-jdbc-streams root folder.
To start the application, run:
For JSON (de)serialization
./mvnw clean spring-boot:run --projects store-streams -Dspring-boot.run.jvmArguments="-Dserver.port=9081"
For Avro (de)serialization
Warning: Unable to run in this mode on my machine! The application starts fine when using the avro profile but, when the 1st event arrives, org.apache.kafka.common.errors.SerializationException: Unknown magic byte! is thrown. The problem doesn't happen when running the applications as Docker containers.
./mvnw clean spring-boot:run --projects store-streams -Dspring-boot.run.jvmArguments="-Dserver.port=9081" -Dspring-boot.run.profiles=avro
The command below generates the Java classes from the Avro files present in src/main/resources/avro:
./mvnw generate-sources --projects store-streams
In a terminal, make sure you are inside the springboot-kafka-connect-jdbc-streams root folder;
Run the following script to build the application's Docker image:
./docker-build.sh
store-api
Environment Variable | Description |
---|---|
MYSQL_HOST | Specify host of the MySQL database to use (default localhost) |
MYSQL_PORT | Specify port of the MySQL database to use (default 3306) |
store-streams
Environment Variable | Description |
---|---|
KAFKA_HOST | Specify host of the Kafka message broker to use (default localhost) |
KAFKA_PORT | Specify port of the Kafka message broker to use (default 29092) |
SCHEMA_REGISTRY_HOST | Specify host of the Schema Registry to use (default localhost) |
SCHEMA_REGISTRY_PORT | Specify port of the Schema Registry to use (default 8081) |
In a terminal, make sure you are inside the springboot-kafka-connect-jdbc-streams root folder;
In order to run the applications' Docker containers, you can pick between JSON or Avro:
For JSON (de)serialization
./start-apps.sh
For Avro (de)serialization
./start-apps.sh avro
Application | URL |
---|---|
store-api | http://localhost:9080/swagger-ui.html |
store-streams | http://localhost:9081/actuator/health |
Let's simulate an order creation. In this example, the customer with id 1:
{"id":1, "name":"John Gates", "email":"[email protected]", "address":"street 1", "phone":"112233"}
will order one unit of the product with id 15:
{"id":15, "name":"iPhone Xr", "price":900.00}
In a terminal, run the following curl command:
curl -i -X POST localhost:9080/api/orders \
-H 'Content-Type: application/json' \
-d '{"customerId": 1, "paymentType": "BITCOIN", "status": "OPEN", "products": [{"id": 15, "unit": 1}]}'
The response should be:
HTTP/1.1 201
{
"id": "47675629-4f0d-440d-b6df-c829874ee2a6",
"customerId": 1,
"paymentType": "BITCOIN",
"status": "OPEN",
"products": [{"id": 15, "unit": 1}]
}
Checking Elasticsearch:
curl "localhost:9200/store.streams.orders/_search?pretty"
We should have one order with the customer and product names:
{
"took" : 844,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 1,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "store.streams.orders",
"_type" : "order",
"_id" : "47675629-4f0d-440d-b6df-c829874ee2a6",
"_score" : 1.0,
"_source" : {
"payment_type" : "BITCOIN",
"created_at" : 1606821792360,
"id" : "47675629-4f0d-440d-b6df-c829874ee2a6",
"customer_name" : "John Gates",
"customer_id" : 1,
"status" : "OPEN",
"products" : [
{
"unit" : 1,
"price" : 900,
"name" : "iPhone Xr",
"id" : 15
}
]
}
}
]
}
}
In order to create random orders, we can also use the simulation endpoint:
curl -i -X POST localhost:9080/api/simulation/orders \
-H 'Content-Type: application/json' \
-d '{"total": 10, "sleep": 100}'
Kafka Topics UI
Kafka Topics UI can be accessed at http://localhost:8085
Kafka Connect UI
Kafka Connect UI can be accessed at http://localhost:8086
Schema Registry UI
Schema Registry UI can be accessed at http://localhost:8001
Schema Registry
You can use curl to check the subjects in Schema Registry.
List all the subjects:
curl localhost:8081/subjects
Get the latest version of the schema of the subject mysql.storedb.customers-value:
curl localhost:8081/subjects/mysql.storedb.customers-value/versions/latest
Kafka Manager
Kafka Manager can be accessed at http://localhost:9000
Configuration
Click on Cluster (dropdown on the header) and then on Add Cluster;
Type the name of your cluster in the Cluster Name field, for example: MyCluster;
Type zookeeper:2181 in the Cluster Zookeeper Hosts field;
Enable the checkbox Poll consumer information (Not recommended for large # of consumers if ZK is used for offsets tracking on older Kafka versions);
Click on the Save button at the bottom of the page.
Elasticsearch
Elasticsearch can be accessed at http://localhost:9200
curl "localhost:9200/_cat/indices?v"
curl "localhost:9200/mysql.storedb.customers/_search?pretty"
curl "localhost:9200/mysql.storedb.products/_search?pretty"
curl "localhost:9200/store.streams.orders/_search?pretty"
MySQL
docker exec -it -e MYSQL_PWD=secret mysql mysql -uroot --database storedb
select * from orders;
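You can also inspect the other tables created by store-api, for example the orders_products table (table name taken from the note above):
select * from orders_products;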
If the applications were started with Maven, go to the terminals where they are running and press Ctrl+C;
If they were started as Docker containers, go to a terminal and, inside the springboot-kafka-connect-jdbc-streams root folder, run the script below:
./stop-apps.sh
To stop and remove the Docker Compose containers, network and volumes, go to a terminal and, inside the springboot-kafka-connect-jdbc-streams root folder, run the following command:
docker compose down -v
To remove the Docker images created by this project, go to a terminal and, inside the springboot-kafka-connect-jdbc-streams root folder, run the script below:
./remove-docker-images.sh
The price field: numeric.mapping doesn't work for DECIMAL fields (#563). For now, the workaround is using String instead of BigDecimal as the type for this field.