Client tool that exports the consumer lag of Kafka consumer groups to Prometheus or your terminal
A client tool that exports the consumer lag of a Kafka consumer group to different implementations such as Prometheus or your terminal. It utilizes Kafka's AdminClient and Kafka's Consumer client to fetch these metrics. Consumer lag is calculated as follows:
sum(topic_offset_per_partition-consumer_offset_per_partition)
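As a rough illustration of the formula above, the following self-contained Java sketch sums the per-partition difference between topic offsets and consumer offsets. The class name, method name, and sample offsets are hypothetical and are not part of this tool's API; treating a partition without a committed offset as offset 0 is also an assumption.

```java
import java.util.HashMap;
import java.util.Map;

final class ConsumerLagSketch {

    // Sums (topic offset - consumer offset) over all partitions.
    // Assumption: a partition with no committed consumer offset counts from 0.
    static long totalLag(Map<Integer, Long> topicOffsets, Map<Integer, Long> consumerOffsets) {
        long total = 0;
        for (Map.Entry<Integer, Long> entry : topicOffsets.entrySet()) {
            long committed = consumerOffsets.getOrDefault(entry.getKey(), 0L);
            total += entry.getValue() - committed;
        }
        return total;
    }

    public static void main(String[] args) {
        // Hypothetical offsets, keyed by partition number
        Map<Integer, Long> topicOffsets = new HashMap<>();
        topicOffsets.put(0, 100L);
        topicOffsets.put(1, 250L);

        Map<Integer, Long> consumerOffsets = new HashMap<>();
        consumerOffsets.put(0, 90L);
        consumerOffsets.put(1, 250L);

        System.out.println(totalLag(topicOffsets, consumerOffsets)); // prints 10
    }
}
```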
Quoting this article:
What is Kafka Consumer Lag? Kafka Consumer Lag is the indicator of how much lag there is between Kafka producers and consumers....
Why is Consumer Lag Important? Many applications today are based on being able to process (near) real-time data. Think about performance monitoring system like Sematext Monitoring or log management service like Sematext Logs. They continuously process infinite streams of near real-time data. If they were to show you metrics or logs with too much delay – if the Consumer Lag were too big – they’d be nearly useless. This Consumer Lag tells us how far behind each Consumer (Group) is in each Partition. The smaller the lag the more real-time the data consumption.
In summary, consumer lag tells us 2 things:
Since this client uses the Kafka AdminClient and Kafka Consumer client version 2+, it supports Kafka brokers from version 0.10.2+.
2.8.0
.Major Release:
You can download the latest release of the Native application from here. Currently it only supports Mac and Linux. An example from the Prometheus component:
./kafka-consumer-lag-monitoring-prometheus-0.1.0 config.properties
Note to Mac users: You will need to verify the application. To do this, run:
xattr -r -d com.apple.quarantine kafka-consumer-lag-monitoring-prometheus-0.1.0
You can download the latest release of the Uber JAR from here. This client requires at least Java 8 in order to run. For example, you can run the Console component like this:
java -jar kafka-consumer-lag-monitoring-console-0.1.0-all.jar -b kafka1:9092,kafka2:9092,kafka3:9092 -c "my_awesome_consumer_group_01" -p 5000
There are two types of Docker images:
Example:
docker run -p 9739:9739 \
-e kafka_bootstrap_servers=localhost:9092 \
-e kafka_retry_backoff_ms=200 \
-e monitoring_lag_consumer_groups="test*" \
-e monitoring_lag_prometheus_http_port=9739 \
-e monitoring_lag_logging_rootLogger_appenderRef_stdout_ref=LogToConsole \
-e monitoring_lag_logging_rootLogger_level=info \
omarsmak/kafka-consumer-lag-monitoring-prometheus-native
This mode prints the consumer lag per partition and the total lag across all partitions, continuously refreshing the metrics at the interval given by the --poll.interval startup parameter. It accepts the following parameters:
./kafka-consumer-lag-monitoring-console-0.1.0 -h
Usage: kafka-consumer-lag-monitoring-console [-hV] [-b=<kafkaBootstrapServers>]
[-c=<kafkaConsumerGroups>] [-f=<kafkaPropertiesFile>] [-p=<pollInterval>]
Prints the kafka consumer lag to the console.
-b, --bootstrap.servers=<kafkaBootstrapServers>
A list of host/port pairs to use for establishing the initial
connection to the Kafka cluster
-c, --consumer.groups=<kafkaConsumerGroups>
A list of Kafka consumer groups or list ending with star (*)
to fetch all consumers with matching pattern, e.g: 'test_v*'
-f, --properties.file=<kafkaPropertiesFile>
Optional. Properties file for Kafka AdminClient
configurations, this is the typical Kafka properties file
that can be used in the AdminClient. For more info, please
take a look at Kafka AdminClient configurations
documentation.
-h, --help Show this help message and exit.
-p, --poll.interval=<pollInterval>
Interval delay in ms to that refreshes the client lag
metrics, default to 2000ms
-V, --version Print version information and exit.
An example output:
./kafka-consumer-lag-monitoring-console-0.1.0 -b kafka1:9092,kafka2:9092,kafka3:9092 -c "my_awesome_consumer_group_01" -p 5000
Consumer group: my_awesome_consumer_group_01
==============================================================================
Topic name: topic_example_1
Total topic offsets: 211132248
Total consumer offsets: 187689403
Total lag: 23442845
Topic name: topic_example_2
Total topic offsets: 15763247
Total consumer offsets: 15024564
Total lag: 738683
Topic name: topic_example_3
Total topic offsets: 392
Total consumer offsets: 392
Total lag: 0
Topic name: topic_example_4
Total topic offsets: 24572
Total consumer offsets: 24570
Total lag: 2
Topic name: topic_example_5
Total topic offsets: 430
Total consumer offsets: 430
Total lag: 0
Topic name: topic_example_6
Total topic offsets: 6342
Total consumer offsets: 6335
Total lag: 7
./kafka-consumer-lag-monitoring-console-0.1.0 -c "test*" -b localhost:9092 -p 500
java -jar kafka-consumer-lag-monitoring-console-0.1.0-all.jar -c "test*" -b localhost:9092 -p 500
docker run omarsmak/kafka-consumer-lag-monitoring-console-native -c "test*" -b localhost:9092 -p 500
docker run omarsmak/kafka-consumer-lag-monitoring-console -c "test*" -b localhost:9092 -p 500
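The "test*" argument in the commands above selects every consumer group matching the pattern. One plausible reading, shown in the sketch below, is a simple prefix match on the text before the trailing star. This is an assumption about the behavior rather than the tool's actual implementation, and the class, method, and group names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class GroupPatternSketch {

    // Returns the groups selected by a pattern: a trailing '*' is treated as
    // "starts with the preceding prefix", anything else as an exact name.
    static List<String> matchGroups(String pattern, List<String> allGroups) {
        if (pattern.endsWith("*")) {
            String prefix = pattern.substring(0, pattern.length() - 1);
            return allGroups.stream()
                    .filter(group -> group.startsWith(prefix))
                    .collect(Collectors.toList());
        }
        return allGroups.stream()
                .filter(pattern::equals)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Hypothetical consumer group names
        List<String> groups = Arrays.asList("test_v1", "test_v2", "prod_v1");
        System.out.println(matchGroups("test*", groups));   // prints [test_v1, test_v2]
        System.out.println(matchGroups("prod_v1", groups)); // prints [prod_v1]
    }
}
```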
In this mode, the tool starts an HTTP server on the port set in the monitoring.lag.prometheus.http.port config and exposes an endpoint that is reachable via localhost:<http.port>/metrics or localhost:<http.port>/prometheus, so that a Prometheus server can scrape these metrics and expose them, for example, to Grafana. You will need to pass the configuration as a properties file or via environment variables. An example config file:
kafka.bootstrap.servers=localhost:9092
kafka.retry.backoff.ms = 200
monitoring.lag.consumer.groups=test*
monitoring.lag.prometheus.http.port=9772
monitoring.lag.logging.rootLogger.appenderRef.stdout.ref=LogToConsole
monitoring.lag.logging.rootLogger.level=info
And then you can run it like the following:
./kafka-consumer-lag-monitoring-prometheus-0.1.0 config.properties
java -jar kafka-consumer-lag-monitoring-prometheus-0.1.0-all.jar config.properties
For Docker, we will use the environment variables instead:
docker run -p 9739:9739 \
-e kafka_bootstrap_servers=localhost:9092 \
-e kafka_retry_backoff_ms=200 \
-e monitoring_lag_consumer_groups="test*" \
-e monitoring_lag_prometheus_http_port=9739 \
-e monitoring_lag_logging_rootLogger_appenderRef_stdout_ref=LogToConsole \
-e monitoring_lag_logging_rootLogger_level=info \
omarsmak/kafka-consumer-lag-monitoring-prometheus-native
For the Docker image without the -native suffix, the same environment variables apply:
docker run -p 9739:9739 \
-e kafka_bootstrap_servers=localhost:9092 \
-e kafka_retry_backoff_ms=200 \
-e monitoring_lag_consumer_groups="test*" \
-e monitoring_lag_prometheus_http_port=9739 \
-e monitoring_lag_logging_rootLogger_appenderRef_stdout_ref=LogToConsole \
-e monitoring_lag_logging_rootLogger_level=info \
omarsmak/kafka-consumer-lag-monitoring-prometheus
Note: By default, port 9739 is exposed by the Docker image. When you run the client in a Docker container, avoid overriding the client's HTTP port through the client's startup arguments (--http.port), as described below, and leave it at the default of 9739. However, you can still map that port to any host port of your choice.
kafka_consumer_group_offset{group, topic, partition}
The latest committed offset of a consumer group in a given partition of a topic.
kafka_consumer_group_partition_lag{group, topic, partition}
The lag of a consumer group behind the head of a given partition of a topic. Calculated like this: current_topic_offset_per_partition - current_consumer_offset_per_partition.
kafka_topic_latest_offsets{group, topic, partition}
The latest committed offset of a topic in a given partition.
kafka_consumer_group_total_lag{group, topic}
The total lag of a consumer group behind the head of a topic. This gives the total lag from all partitions of each topic. It provides good visibility but is not a precise measurement, since it is not partition-aware.
kafka_consumer_group_member_lag{group, member, topic}
The total lag of a consumer group member behind the head of a topic. This gives the total lag of each consumer member within a consumer group.
kafka_consumer_group_member_partition_lag{group, member, topic, partition}
The lag of a consumer member within a consumer group behind the head of a given partition of a topic.
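To illustrate how the per-partition metric relates to a per-topic total, the sketch below parses a few lines in the Prometheus text exposition format and sums kafka_consumer_group_partition_lag per topic. The sample lines, label values, and class name are hypothetical and are not real output of this tool. The sketch also assumes that samples carry no trailing timestamp and that label values contain no escaped quotes.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class PartitionLagRollupSketch {

    // Matches one sample line of the partition lag metric: name{labels} value
    private static final Pattern SAMPLE =
            Pattern.compile("^kafka_consumer_group_partition_lag\\{(.*)\\}\\s+(\\S+)$");
    // Matches a single label pair such as topic="orders"
    private static final Pattern LABEL = Pattern.compile("(\\w+)=\"([^\"]*)\"");

    // Sums the partition lag samples per topic; other metrics and comments are ignored.
    static Map<String, Double> lagPerTopic(String metricsText) {
        Map<String, Double> totals = new TreeMap<>();
        for (String line : metricsText.split("\n")) {
            Matcher sample = SAMPLE.matcher(line.trim());
            if (!sample.matches()) {
                continue;
            }
            String topic = null;
            Matcher label = LABEL.matcher(sample.group(1));
            while (label.find()) {
                if ("topic".equals(label.group(1))) {
                    topic = label.group(2);
                }
            }
            if (topic != null) {
                totals.merge(topic, Double.parseDouble(sample.group(2)), Double::sum);
            }
        }
        return totals;
    }

    public static void main(String[] args) {
        // Hypothetical scrape result
        String text = String.join("\n",
                "kafka_consumer_group_partition_lag{group=\"g1\",topic=\"orders\",partition=\"0\"} 5.0",
                "kafka_consumer_group_partition_lag{group=\"g1\",topic=\"orders\",partition=\"1\"} 7.0",
                "kafka_consumer_group_partition_lag{group=\"g1\",topic=\"payments\",partition=\"0\"} 2.0",
                "kafka_consumer_group_offset{group=\"g1\",topic=\"orders\",partition=\"0\"} 100.0");
        System.out.println(lagPerTopic(text)); // prints {orders=12.0, payments=2.0}
    }
}
```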
The majority of the components here, for example the Prometheus component, support two types of configuration:
Application Properties File: You can provide the application with a config properties file as an argument, e.g. ./kafka-consumer-lag-monitoring-prometheus-0.1.0 config.properties. This is an example config:
```
kafka.bootstrap.servers=localhost:9092
kafka.retry.backoff.ms = 200
monitoring.lag.consumer.groups=test*
monitoring.lag.prometheus.http.port=9772
monitoring.lag.logging.rootLogger.appenderRef.stdout.ref=LogToConsole
monitoring.lag.logging.rootLogger.level=info
```
Note that the application accepts configs with two prefixes:
kafka.: Use the kafka prefix for any config related to the Kafka admin client. These are the same configs that you will find here: https://kafka.apache.org/documentation/#adminclientconfigs/
monitoring.lag.: Use the monitoring.lag prefix to pass any config specific to this client. The configs that the client accepts are listed later.
Environment Variables: You can also pass the configs as environment variables. This is useful when running the application in an environment like Docker, for example:
```
docker run --rm -p 9739:9739 \
-e monitoring_lag_logging_rootLogger_appenderRef_stdout_ref=LogToConsole \
-e monitoring_lag_consumer_groups="test-*" \
-e kafka_bootstrap_servers=host.docker.internal:9092 \
omarsmak/kafka-consumer-lag-monitoring-prometheus-native:latest
```
Similar to the application properties file, it supports the kafka and monitoring.lag prefixes. However, you will need to replace every dot (.) with an underscore (_) in all the configs. For example, the environment equivalent of the config kafka.bootstrap.servers is kafka_bootstrap_servers.
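The naming rule above can be sketched in a few lines of Java. The class and method names are hypothetical and only illustrate the dot-to-underscore conversion; the letter case of each key is left untouched.

```java
final class EnvNameSketch {

    // Converts a dotted property key into its environment variable form
    // by replacing every '.' with '_', without changing letter case.
    static String toEnvName(String propertyKey) {
        return propertyKey.replace('.', '_');
    }

    public static void main(String[] args) {
        System.out.println(toEnvName("kafka.bootstrap.servers"));
        // prints kafka_bootstrap_servers
        System.out.println(toEnvName("monitoring.lag.logging.rootLogger.level"));
        // prints monitoring_lag_logging_rootLogger_level
    }
}
```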
monitoring.lag.consumer.groups: A list of Kafka consumer groups, or a list ending with a star (*) to fetch all consumers with a matching pattern, e.g. test_v*.
monitoring.lag.poll.interval: Interval delay in ms at which the client lag metrics are refreshed; defaults to 2000ms.
monitoring.lag.prometheus.http.port: HTTP port used to expose the metrics; defaults to 9739.
The client ships with Log4j bindings and supports JSON and standard logging. The default Log4j properties that it uses:
# Log to console
appender.console.type = Console
appender.console.name = LogToConsole
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = [%-5level] %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} - %msg%n
# Log to console as JSON
appender.json.type = Console
appender.json.name = LogInJSON
appender.json.layout.type = JsonLayout
appender.json.layout.complete = true
appender.json.layout.compact = false
rootLogger.level = info
rootLogger.appenderRef.stdout.ref = LogInJSON
By default, LogInJSON is enabled. However, you can customize all of this by providing these configurations with the monitoring.lag.logging. prefix. For example, to enable standard logging, add the config monitoring.lag.logging.rootLogger.appenderRef.stdout.ref=LogToConsole, or set it as an environment variable: monitoring_lag_logging_rootLogger_appenderRef_stdout_ref=LogToConsole.
Note: When configuring the logging through environment variables, the configuration names are case sensitive.
If you want to embed this client in your application, add a dependency on this tool to your pom.xml or build.gradle as shown below:
<dependency>
    <groupId>com.omarsmak.kafka</groupId>
    <artifactId>consumer-lag-monitoring</artifactId>
    <version>0.1.1</version>
</dependency>
compile 'com.omarsmak.kafka:consumer-lag-monitoring:0.1.1'
import com.omarsmak.kafka.consumer.lag.monitoring.client.KafkaConsumerLagClient;
import com.omarsmak.kafka.consumer.lag.monitoring.client.KafkaConsumerLagClientFactory;
import org.apache.kafka.clients.admin.AdminClientConfig;

import java.util.Properties;

public class ConsumerLagClientTest {

    public static void main(String[] args) {
        // Create a Properties object to hold the Kafka bootstrap servers
        final Properties properties = new Properties();
        properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092");

        // Create the client, we will use the Java client
        final KafkaConsumerLagClient kafkaConsumerLagClient = KafkaConsumerLagClientFactory.create(properties);

        // Print the lag of a Kafka consumer
        System.out.println(kafkaConsumerLagClient.getConsumerLag("awesome-consumer"));
    }
}
import com.omarsmak.kafka.consumer.lag.monitoring.client.KafkaConsumerLagClientFactory
import org.apache.kafka.clients.admin.AdminClientConfig

import java.util.Properties

object ConsumerLagClientTest {

    @JvmStatic
    fun main(arg: Array<String>) {
        // Create a Properties object to hold the Kafka bootstrap servers
        val properties = Properties().apply {
            this[AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG] = "kafka1:9092"
        }

        // Create the client, we will use the Kafka AdminClient Java client
        val kafkaConsumerLagClient = KafkaConsumerLagClientFactory.create(properties)

        // Print the lag of a Kafka consumer
        println(kafkaConsumerLagClient.getConsumerLag("awesome-consumer"))
    }
}
Run ./gradlew clean build in the top-level project folder. This runs all tests and builds the Uber JAR.