The goal of this project is to play with Kafka, Debezium, and ksqlDB. For this, we have: research-service, which inserts/updates/deletes records in MySQL; Source Connectors, which monitor changes to records in MySQL and push messages related to those changes to Kafka; Sink Connectors and kafka-research-consumer, which listen to messages from Kafka and insert/update documents in Elasticsearch; and finally, ksqlDB-Server, which listens to some topics in Kafka, performs some joins, and pushes new messages to new topics in Kafka.
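To make the change-data-capture flow concrete, below is a minimal sketch (Python, not part of the project) of the envelope Debezium publishes for each row change. The op/before/after/source fields are Debezium's real envelope fields; the row values are made up for illustration.

```python
import json

# A trimmed-down Debezium change event for an UPDATE on the researchers
# table. Real events carry a larger "source" block (and a schema section
# when no schema registry is used); the row values here are invented.
event = {
    "op": "u",  # c=create, u=update, d=delete, r=snapshot read
    "before": {"id": 1, "first_name": "Ana", "last_name": "Silva", "institute_id": 1},
    "after":  {"id": 1, "first_name": "Ana", "last_name": "Souza", "institute_id": 1},
    "source": {"db": "researchdb", "table": "researchers"},
}

def current_state(event):
    """Return the row state after the change (None for deletes)."""
    return event["after"] if event["op"] != "d" else None

print(json.dumps(current_state(event)))
```

The sink side only needs the "after" state (or the fact of a delete), which is why a generic consumer like the ones in this project can keep Elasticsearch in sync with MySQL.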
On ivangfr.github.io, I have compiled my Proof-of-Concepts (PoCs) and articles. You can easily search for the technology you are interested in by using the filter. Who knows, perhaps I have already implemented a PoC or written an article about what you are looking for.
Monolithic Spring Boot application that exposes a REST API to manage Institutes, Articles, Researchers, and Reviews. The data is saved in MySQL.
Spring Boot application that listens to messages from the topic reviews_researchers_institutes_articles (which is one of the ksqlDB outputs) and saves the payload of those messages (i.e., reviews with detailed information) in Elasticsearch.
Open a terminal and, inside the springboot-kafka-connect-debezium-ksqldb root folder, run the following command:
docker compose up -d
Note: During the first run, images for mysql and kafka-connect will be built, whose names are springboot-kafka-connect-debezium-ksqldb_mysql and springboot-kafka-connect-debezium-ksqldb_kafka-connect, respectively. To rebuild those images, run docker compose build
Wait for all Docker containers to be up and running. To check it, run
docker compose ps
In order to have Kafka topics with more than 1 partition, we must create them manually rather than waiting for the connectors to create them for us. To do so:
In a terminal, make sure you are in the springboot-kafka-connect-debezium-ksqldb root folder.
Run the script below:
./create-kafka-topics.sh
It will create the topics mysql.researchdb.institutes, mysql.researchdb.researchers, mysql.researchdb.articles, and mysql.researchdb.reviews, each with 5 partitions.
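Multiple partitions matter because Kafka routes each record to a partition based on its key, and ordering is only guaranteed within a partition. The sketch below (Python, illustrative only) shows the idea; Kafka's default partitioner actually hashes the serialized key with murmur2, and md5 here is just a deterministic stand-in.

```python
import hashlib

NUM_PARTITIONS = 5  # matches what create-kafka-topics.sh creates

def partition_for(key: bytes, num_partitions: int = NUM_PARTITIONS) -> int:
    # Kafka's default partitioner applies murmur2 to the serialized record
    # key and takes it modulo the partition count; md5 is a stand-in here.
    digest = hashlib.md5(key).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions

# Records with the same key always land on the same partition, which is
# what keeps all change events for one MySQL row in order.
p1 = partition_for(b'{"id": 42}')
p2 = partition_for(b'{"id": 42}')
```

Because all events for a given row share a key, updates and deletes for that row are consumed in the order they happened, even with 5 partitions being read in parallel.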
In a terminal, make sure you are in the springboot-kafka-connect-debezium-ksqldb root folder.
Run the following curl commands to create one Debezium and two Elasticsearch-Sink connectors in kafka-connect:
curl -i -X POST localhost:8083/connectors -H 'Content-Type: application/json' -d @connectors/debezium-mysql-source-researchdb.json
curl -i -X POST localhost:8083/connectors -H 'Content-Type: application/json' -d @connectors/elasticsearch-sink-institutes.json
curl -i -X POST localhost:8083/connectors -H 'Content-Type: application/json' -d @connectors/elasticsearch-sink-articles.json
You can check the state of the connectors and their tasks on Kafka Connect UI (http://localhost:8086) or by calling the kafka-connect endpoint:
curl localhost:8083/connectors/debezium-mysql-source-researchdb/status
curl localhost:8083/connectors/elasticsearch-sink-institutes/status
curl localhost:8083/connectors/elasticsearch-sink-articles/status
The state of the connectors and their tasks must be RUNNING. If there is any problem, you can check the kafka-connect container logs:
docker logs kafka-connect
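The /status endpoint returns JSON with one state for the connector and one per task, and all of them must be RUNNING. A small sketch (Python; the sample payload below shows the real shape of the response, with illustrative values) of how such a payload can be checked:

```python
def is_healthy(status: dict) -> bool:
    """Check a Kafka Connect /status payload: connector and every task RUNNING."""
    if status["connector"]["state"] != "RUNNING":
        return False
    return all(task["state"] == "RUNNING" for task in status["tasks"])

# Shape of the JSON returned by e.g.
# GET localhost:8083/connectors/debezium-mysql-source-researchdb/status
sample = {
    "name": "debezium-mysql-source-researchdb",
    "connector": {"state": "RUNNING", "worker_id": "kafka-connect:8083"},
    "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "kafka-connect:8083"}],
}
```

A connector can be RUNNING while one of its tasks is FAILED, so checking only the connector state is not enough.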
Open a new terminal and navigate to the springboot-kafka-connect-debezium-ksqldb root folder.
Run the command below to start the application:
./mvnw clean spring-boot:run --projects research-service -Dspring-boot.run.jvmArguments="-Dserver.port=9080"
It will create some articles, institutes, and researchers. If you don't want that, just set the properties load-samples.articles.enabled, load-samples.institutes.enabled, and load-samples.researchers.enabled to false in application.yml.
The Swagger link is http://localhost:9080/swagger-ui.html
Important: create at least one review so that mysql.researchdb.reviews-key and mysql.researchdb.reviews-value are created in Schema Registry. Below is a sample request to create a review:
curl -i -X POST localhost:9080/api/reviews \
-H "Content-Type: application/json" \
-d "{ \"researcherId\": 1, \"articleId\": 1, \"comment\": \"Ln 56: replace the 'a' by 'an'\"}"
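The -key and -value suffixes come from Schema Registry's default TopicNameStrategy, which derives subject names directly from the topic name. A tiny sketch (Python, illustrative only):

```python
def subjects_for(topic: str) -> tuple:
    # Schema Registry's default TopicNameStrategy registers one subject
    # for the record key and one for the record value, by appending
    # -key / -value to the topic name.
    return (topic + "-key", topic + "-value")

key_subject, value_subject = subjects_for("mysql.researchdb.reviews")
```

This is why the subjects only appear after the first review is produced: Debezium registers the schemas lazily, on the first message written to the topic.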
Open a new terminal and, inside the springboot-kafka-connect-debezium-ksqldb root folder, run the docker command below to start ksqlDB-cli:
docker run -it --rm --name ksqldb-cli \
--network springboot-kafka-connect-debezium-ksqldb_default \
-v $PWD/docker/ksql/researchers-institutes.ksql:/tmp/researchers-institutes.ksql \
-v $PWD/docker/ksql/reviews-researchers-institutes-articles.ksql:/tmp/reviews-researchers-institutes-articles.ksql \
confluentinc/cp-ksqldb-cli:7.4.1 http://ksqldb-server:8088
On the ksqlDB-cli command line, run the following commands.
Set the auto.offset.reset value:
SET 'auto.offset.reset' = 'earliest';
Run the following script. It will create the researchers_institutes topic:
RUN SCRIPT '/tmp/researchers-institutes.ksql';
Check whether the topic was created
DESCRIBE "researchers_institutes";
SELECT * FROM "researchers_institutes" EMIT CHANGES LIMIT 5;
Run the script below. It will create the reviews_researchers_institutes_articles topic:
RUN SCRIPT '/tmp/reviews-researchers-institutes-articles.ksql';
Check whether the topic was created
DESCRIBE "reviews_researchers_institutes_articles";
SELECT * FROM "reviews_researchers_institutes_articles" EMIT CHANGES LIMIT 1;
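Conceptually, these scripts do stream-table joins: researchers_institutes acts as a table (latest value per key) that enriches each incoming review event. A minimal simulation of an inner stream-table join (Python; the field names are assumptions, not the project's exact ksql schema):

```python
# The "table" side: latest researcher+institute state, keyed by researcher id,
# standing in for the researchers_institutes topic.
researchers_institutes = {
    1: {"researcher_id": 1, "first_name": "Ana", "institute_name": "MIT"},
}

def enrich(review):
    # ksqlDB-style inner stream-table join: a stream row with no matching
    # table row produces no output.
    side = researchers_institutes.get(review["researcher_id"])
    if side is None:
        return None
    return {**review, **side}

out = enrich({"review_id": 10, "researcher_id": 1, "comment": "nice"})
```

Each enriched row is what ksqlDB-Server pushes to the reviews_researchers_institutes_articles topic, which kafka-research-consumer then writes to Elasticsearch.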
In a terminal, make sure you are in the springboot-kafka-connect-debezium-ksqldb root folder.
Run the curl command below to create the elasticsearch-sink-researchers connector in kafka-connect:
curl -i -X POST localhost:8083/connectors -H 'Content-Type: application/json' -d @connectors/elasticsearch-sink-researchers.json
You can check the state of the connector and its task on Kafka Connect UI (http://localhost:8086) or by calling the kafka-connect endpoint:
curl localhost:8083/connectors/elasticsearch-sink-researchers/status
Open a new terminal and navigate to the springboot-kafka-connect-debezium-ksqldb root folder.
Run the command below to start the application:
./mvnw clean spring-boot:run --projects kafka-research-consumer -Dspring-boot.run.jvmArguments="-Dserver.port=9081"
The command below generates the Java class ReviewMessage from the Avro file present in src/main/resources/avro:
./mvnw generate-sources --projects kafka-research-consumer
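The code generation is driven by the record definition in the Avro schema file: each Avro field becomes a field of the generated ReviewMessage class. A cut-down sketch of reading such a schema (Python; the schema content below is an assumption in the spirit of the project's file, not a copy of it):

```python
import json

# A minimal Avro record schema; the real one under src/main/resources/avro
# has more fields. avro-maven-plugin-style tooling turns this into Java.
schema_json = """
{
  "type": "record",
  "name": "ReviewMessage",
  "fields": [
    {"name": "review_id", "type": "long"},
    {"name": "comment", "type": ["null", "string"], "default": null}
  ]
}
"""

def field_names(schema_str):
    """List the field names a code generator would emit for this record."""
    schema = json.loads(schema_str)
    return [f["name"] for f in schema["fields"]]
```

The union type ["null", "string"] is Avro's way of marking a field optional, which maps to a nullable field in the generated class.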
This service runs on port 9081. The health endpoint is http://localhost:9081/actuator/health
[Optional] We can start another kafka-research-consumer instance by opening another terminal and running:
./mvnw clean spring-boot:run --projects kafka-research-consumer -Dspring-boot.run.jvmArguments="-Dserver.port=9082"
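A second instance is useful because both consumers join the same consumer group, and Kafka splits the topic's 5 partitions between them. A sketch of a range-style assignment (Python, illustrative; the consumer names are invented, and Kafka's real range assignor works per topic but divides partitions the same way):

```python
def range_assign(partitions, consumers):
    # Partitions are split as evenly as possible across the consumers of
    # one group, in consumer order. With 5 partitions and 2 instances,
    # one consumer gets 3 partitions and the other gets 2.
    consumers = sorted(consumers)
    n, k = len(partitions), len(consumers)
    per, extra = divmod(n, k)
    out, start = {}, 0
    for i, c in enumerate(consumers):
        count = per + (1 if i < extra else 0)
        out[c] = partitions[start:start + count]
        start += count
    return out

assignment = range_assign(list(range(5)), ["consumer-9081", "consumer-9082"])
```

This is also why creating the topics with 5 partitions up front matters: with a single partition, a second consumer instance would sit idle.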
Go to the terminal where ksql-cli is running.
On the ksql-cli command line, run the following query:
SELECT * FROM "reviews_researchers_institutes_articles" EMIT CHANGES;
In another terminal, call the research-service simulation endpoint:
curl -X POST localhost:9080/api/simulation/reviews \
-H "Content-Type: application/json" \
-d "{\"total\": 100, \"sleep\": 100}"
The GIF below shows it: research-service is running in the upper-left terminal; kafka-research-consumer is running in the upper-right terminal; the middle terminal is used to submit the POST request to research-service; the lower terminal is where ksql-cli is running.
You can also query Elasticsearch
curl "localhost:9200/reviews/_search?pretty"
Kafka Topics UI
Kafka Topics UI
can be accessed at http://localhost:8085
Kafka Connect UI
Kafka Connect UI
can be accessed at http://localhost:8086
Schema Registry UI
Schema Registry UI
can be accessed at http://localhost:8001
Schema Registry
You can use curl to check the subjects in Schema Registry:
curl localhost:8081/subjects
To check the latest version of the subject mysql.researchdb.researchers-value:
curl localhost:8081/subjects/mysql.researchdb.researchers-value/versions/latest
Kafka Manager
Kafka Manager
can be accessed at http://localhost:9000
Configuration
First, you must create the cluster. Click on Cluster (dropdown on the header) and then on Add Cluster.
Type the name of your cluster in the Cluster Name field, for example: MyCluster.
Type zookeeper:2181 in the Cluster Zookeeper Hosts field.
Enable the checkbox Poll consumer information (Not recommended for large # of consumers if ZK is used for offsets tracking on older Kafka versions).
Click on the Save button at the bottom of the page.
Elasticsearch
Elasticsearch can be accessed at http://localhost:9200
curl "localhost:9200/_cat/indices?v"
curl "localhost:9200/articles/_search?pretty"
curl "localhost:9200/institutes/_search?pretty"
curl "localhost:9200/researchers/_search?pretty"
curl "localhost:9200/reviews/_search?pretty"
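The _search responses wrap the documents in a hits structure; the actual document written by the sink sits under _source. A sketch of pulling the documents out (Python; the response below shows Elasticsearch's real response shape, with an invented review document):

```python
# Trimmed shape of an Elasticsearch _search response; the review fields
# are illustrative, not the project's exact mapping.
response = {
    "hits": {
        "total": {"value": 1, "relation": "eq"},
        "hits": [
            {"_index": "reviews", "_id": "10",
             "_source": {"review_id": 10, "comment": "nice"}},
        ],
    }
}

def sources(response):
    """Extract just the indexed documents from a _search response."""
    return [hit["_source"] for hit in response["hits"]["hits"]]
```

Comparing these documents with the MySQL query in the next section is a quick way to confirm the whole pipeline (Debezium, ksqlDB, sink/consumer) is in sync.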
MySQL
Access MySQL monitor
docker exec -it -e MYSQL_PWD=secret mysql mysql -uroot --database researchdb
Inside the monitor, run the following SELECT command:
SELECT a.id AS review_id, c.id AS article_id, c.title AS article_title, b.id AS reviewer_id, b.first_name, b.last_name, b.institute_id, a.comment
FROM reviews a, researchers b, articles c
WHERE a.researcher_id = b.id AND a.article_id = c.id;
Type exit to leave the MySQL monitor.
Go to the terminals where research-service and kafka-research-consumer are running and press Ctrl+C to stop them.
Go to the terminal where ksql-cli is running, press Ctrl+C to stop the SELECT, and then type exit.
To stop and remove the Docker containers, network, and volumes, go to a terminal and, inside the springboot-kafka-connect-debezium-ksqldb root folder, run the command below:
docker compose down -v
To remove the Docker images created by this project, go to a terminal and, inside the springboot-kafka-connect-debezium-ksqldb root folder, run the following script:
./remove-docker-images.sh