The Apache Kafka C/C++ library
# librdkafka v2.3.0

librdkafka v2.3.0 is a feature release:
 * Added the `rd_kafka_DescribeTopics` function (#4300, #4451).
 * Added Admin API support for `DescribeCluster()` and `DescribeTopics()` (#4240, @jainruchir).
 * Added an exponential backoff mechanism for retriable requests, with `retry.backoff.ms` as the minimum backoff and `retry.backoff.max.ms` as the maximum backoff, with 20% jitter (#4422).
 * Fixed `rd_kafka_query_watermark_offsets` continuing beyond timeout expiry (#4460).
 * Fixed `rd_kafka_query_watermark_offsets` not refreshing the partition leader after a leader change and a subsequent `NOT_LEADER_OR_FOLLOWER` error (#4225).

Upgrade considerations:

 * `retry.backoff.ms`: if set greater than `retry.backoff.max.ms`, which has a default value of 1000 ms, it assumes the value of `retry.backoff.max.ms`. To change this behaviour, make sure that `retry.backoff.ms` is always less than `retry.backoff.max.ms`. If the two are equal, the backoff will be linear instead of exponential.
 * `topic.metadata.refresh.fast.interval.ms`: if set greater than `retry.backoff.max.ms`, which has a default value of 1000 ms, it assumes the value of `retry.backoff.max.ms`. To change this behaviour, make sure that `topic.metadata.refresh.fast.interval.ms` is always less than `retry.backoff.max.ms`. If the two are equal, the backoff will be linear instead of exponential.
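To illustrate how these two settings interact, here is a minimal sketch of capped exponential backoff with 20% jitter. The helper name and the doubling loop are illustrative assumptions, not librdkafka's internal implementation.

```c
#include <stdlib.h>

/* Illustrative sketch (not librdkafka internals): capped exponential
 * backoff where base_ms plays the role of retry.backoff.ms and max_ms
 * the role of retry.backoff.max.ms. */
static int backoff_ms(int base_ms, int max_ms, int retry_count) {
        long backoff = base_ms;
        for (int i = 0; i < retry_count && backoff < max_ms; i++)
                backoff *= 2; /* double per retry until the cap is hit */
        if (backoff > max_ms)
                backoff = max_ms; /* clamp to the configured maximum */
        /* Apply +/-20% jitter so retries from many clients spread out. */
        double jitter = 0.8 + 0.4 * ((double)rand() / (double)RAND_MAX);
        return (int)(backoff * jitter);
}
```

Note that if `base_ms` is configured at or above `max_ms`, the loop never doubles and every retry waits the clamped maximum, matching the behaviour described in the considerations above.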
 * The timeout for `rd_kafka_query_watermark_offsets` was not enforced after making the necessary ListOffsets requests, and thus it never timed out in case of broker/network issues. Fixed by setting an absolute timeout (#4460).
 * Fixed a fatal idempotent producer error triggered when a possibly persisted message failed with `OUT_OF_ORDER_SEQUENCE_NUMBER`. The error could contain the message "sequence desynchronization" with just one possibly persisted error, or "rewound sequence number" in case of multiple errored messages. Solved by treating the possibly persisted message as not persisted, and expecting a `DUPLICATE_SEQUENCE_NUMBER` error in case it was or `NO_ERROR` in case it wasn't; in both cases the message will be considered delivered (#4438).
 * Calling `rd_kafka_poll_set_consumer`, setting a consume callback, and then calling `rd_kafka_poll` to service the callbacks would not reset `max.poll.interval.ms`. This was because we were only checking `rk_rep` for consumer messages, while the method that services the queue internally also services the queue forwarded to from `rk_rep`, which is `rkcg_q`. Solved by moving the `max.poll.interval.ms` check into `rd_kafka_q_serve` (#4431). (A polling sketch follows this release's checksums.)
 * After a leader change, a `rd_kafka_query_watermark_offsets` call would continue trying to call ListOffsets on the old leader: if the topic wasn't included in the subscription set, it started querying the new leader only after `topic.metadata.refresh.interval.ms` (#4225).

Release asset checksums:
15e77455811b3e5d869d6f97ce765b634c7583da188792e2930a2098728e932b
2d49c35c77eeb3d42fa61c43757fcbb6a206daa560247154e60642bcdcc14d12
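As referenced in the `max.poll.interval.ms` fix above, here is a minimal sketch of the callback-based polling pattern that #4431 addresses. The configuration, subscription and shutdown handling around it are assumed.

```c
#include <stdio.h>
#include <librdkafka/rdkafka.h>

/* Consume callback, registered on the configuration with
 * rd_kafka_conf_set_consume_cb() before the consumer is created. */
static void consume_cb(rd_kafka_message_t *rkmessage, void *opaque) {
        if (rkmessage->err)
                fprintf(stderr, "Consume error: %s\n",
                        rd_kafka_message_errstr(rkmessage));
}

/* `rk` is an already subscribed consumer; `run` is the app's run flag. */
static void poll_loop(rd_kafka_t *rk, volatile int *run) {
        /* Forward fetched messages to the main queue so rd_kafka_poll()
         * serves consume_cb; with the fix above, each such poll also
         * resets the max.poll.interval.ms timer. */
        rd_kafka_poll_set_consumer(rk);
        while (*run)
                rd_kafka_poll(rk, 100);
}
```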
# librdkafka v2.2.0

librdkafka v2.2.0 is a feature release:
 * Store offset commit metadata in `rd_kafka_offsets_store` (@mathispesch, #4084).
 * Added `fetch.queue.backoff.ms` to the consumer to control how long the consumer backs off its next fetch attempt (@bitemyapp, @edenhill, #2879). When the pre-fetch queue has exceeded its queuing thresholds, `queued.min.messages` and `queued.max.messages.kbytes`, it backs off for 1 second. If those parameters would have to be set too high to hold 1 s of data, this new parameter allows backing off the fetch earlier, reducing memory requirements (see the configuration sketch after this release's checksums).
 * When the broker closes a connection without sending a `close_notify`, OpenSSL 3.x sets a new type of error that was interpreted as permanent in librdkafka. It can cause a different issue depending on the RPC: if received when waiting for an OffsetForLeaderEpoch response, it triggers an offset reset following the configured policy. Solved by treating SSL errors as transport errors and by setting an OpenSSL flag that allows treating unclean SSL closes as normal ones. These types of errors can happen if the other side doesn't support `close_notify` or if there's a TCP connection reset.

Release asset checksums:
e9a99476dd326089ce986afd3a5b069ef8b93dbb845bc5157b3d94894de53567
af9a820cbecbc64115629471df7c7cecd40403b6c34bfdbb9223152677a47226
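As referenced above, here is a minimal sketch of configuring the fetch backoff alongside the queuing thresholds; the specific values are illustrative, not recommendations.

```c
#include <stdio.h>
#include <librdkafka/rdkafka.h>

/* Illustrative values: cap the pre-fetch queue and back off fetches
 * for 250 ms (instead of the 1 s default) once a threshold is hit. */
static rd_kafka_conf_t *make_consumer_conf(void) {
        char errstr[512];
        rd_kafka_conf_t *conf = rd_kafka_conf_new();

        if (rd_kafka_conf_set(conf, "queued.min.messages", "10000",
                              errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
            rd_kafka_conf_set(conf, "queued.max.messages.kbytes", "65536",
                              errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
            rd_kafka_conf_set(conf, "fetch.queue.backoff.ms", "250",
                              errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
                fprintf(stderr, "conf error: %s\n", errstr);
                rd_kafka_conf_destroy(conf);
                return NULL;
        }
        return conf;
}
```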
# librdkafka v2.1.1

librdkafka v2.1.1 is a maintenance release:
 * Fixed a segmentation fault when subscribing to a non-existent topic and calling `rd_kafka_message_leader_epoch()` on the polled `rkmessage` (#4245).
 * Use the pkg-config `Requires.private` field (@FantasqueX, @stertingen, #4180).
 * Threads other than the main one may now call the `rd_kafka_toppar_set_fetch_state` function, given they hold the lock on the `rktp`.
 * Fixed cases where polling would not reset `max.poll.interval.ms`. Only certain functions were made to reset the timer, but it is possible for the user to obtain the queue with messages from the broker, skipping these functions. This was fixed by encoding in the queue itself whether polling it resets the timer.

Release asset checksums:
3b8a59f71e22a8070e0ae7a6b7ad7e90d39da8fddc41ce6c5d596ee7f5a4be4b
7be1fc37ab10ebdc037d5c5a9b35b48931edafffae054b488faaff99e60e0108
# librdkafka v2.1.0

librdkafka v2.1.0 is a feature release:
 * Fixed an issue with `max.poll.interval.ms`, where polling any queue would cause the timeout to be reset (#4176).
 * Added partition leader epoch APIs:
   - `rd_kafka_topic_partition_get_leader_epoch()` (and `set..()`)
   - `rd_kafka_message_leader_epoch()`
   - `rd_kafka_*assign()` and `rd_kafka_seek_partitions()` now support partitions with a leader epoch set.
   - `rd_kafka_offsets_for_times()` will return per-partition leader epochs.
   - `leader_epoch`, `stored_leader_epoch`, and `committed_leader_epoch` added to per-partition statistics.
 * In `rd_kafka_seek_partitions`, the remaining timeout was converted from microseconds to milliseconds, but the expected unit for that parameter is microseconds.
 * Fixed `rd_kafka_consume_batch()` and `rd_kafka_consume_batch_queue()` intermittently updating `app_offset` and `store_offset` incorrectly when pause or resume was being used for a partition.
 * Fixed `rd_kafka_consume_batch()` and `rd_kafka_consume_batch_queue()` intermittently skipping offsets when pause or resume was being used for a partition.
 * When the `rd_kafka_consume_batch()` and `rd_kafka_consume_batch_queue()` APIs are used with any of the seek, pause, resume or rebalancing operations, `on_consume` interceptors might be called incorrectly (possibly multiple times) for messages that were not consumed.
 * Added `rd_kafka_message_leader_epoch()`, returning the leader epoch of the polled `rkmessage` (see the sketch after this release's checksums).

Release asset checksums:
2fe898f9f5e2b287d26c5f929c600e2772403a594a691e0560a2a1f2706edf57
d8e76c4b1cde99e283a19868feaaff5778aa5c6f35790036c5ef44bc5b5187aa
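As referenced above, here is a minimal sketch of reading the leader epoch from a polled message; the surrounding consumer setup is assumed.

```c
#include <stdio.h>
#include <librdkafka/rdkafka.h>

/* Poll one message from an existing, subscribed consumer `rk` and
 * print its leader epoch (-1 when not available). */
static void print_leader_epoch(rd_kafka_t *rk) {
        rd_kafka_message_t *rkm = rd_kafka_consumer_poll(rk, 1000);
        if (!rkm)
                return; /* no message within the timeout */
        if (!rkm->err) {
                int32_t epoch = rd_kafka_message_leader_epoch(rkm);
                printf("partition %d offset %lld leader epoch %d\n",
                       (int)rkm->partition, (long long)rkm->offset,
                       (int)epoch);
        }
        rd_kafka_message_destroy(rkm);
}
```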
# librdkafka v2.0.0

librdkafka v2.0.0 is a feature release:
The introduction of OpenSSL 3.0.x in the self-contained librdkafka bundles changes the default set of available ciphers: all obsolete or insecure ciphers and algorithms listed in the OpenSSL legacy manual page are now disabled by default.
WARNING: These ciphers are disabled for security reasons and it is highly recommended NOT to use them.
Should you need to use any of these old ciphers, you'll need to explicitly enable the `legacy` provider by configuring `ssl.providers=default,legacy` on the librdkafka client, as sketched below.
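A minimal sketch of opting back in to the legacy provider (again: not recommended); everything apart from the `ssl.providers` property itself is illustrative.

```c
#include <stdio.h>
#include <librdkafka/rdkafka.h>

/* Enable the OpenSSL 3.x "legacy" provider alongside the default one,
 * restoring access to the obsolete ciphers described above. */
static int enable_legacy_ciphers(rd_kafka_conf_t *conf) {
        char errstr[512];
        if (rd_kafka_conf_set(conf, "ssl.providers", "default,legacy",
                              errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
                fprintf(stderr, "ssl.providers: %s\n", errstr);
                return -1;
        }
        return 0;
}
```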
OpenSSL 3.0.x deprecates the use of engines, which are being replaced by providers. As such, librdkafka will emit a deprecation warning if `ssl.engine.location` is configured. OpenSSL providers may be configured with the new `ssl.providers` configuration property.
The default value for `ssl.endpoint.identification.algorithm` has been changed from `none` (no hostname verification) to `https`, which enables broker hostname verification (to counter man-in-the-middle impersonation attacks) by default. To restore the previous behaviour, set `ssl.endpoint.identification.algorithm` to `none`.
The Consumer Batch APIs `rd_kafka_consume_batch()` and `rd_kafka_consume_batch_queue()` are not thread safe if `rkmessages_size` is greater than 1 and any of the seek, pause, resume or rebalancing operations is performed in parallel with any of the above APIs. Some of the messages might be lost, or erroneously returned to the application, in the above scenario. It is strongly recommended to use the Consumer Batch APIs and the mentioned operations in sequential order to get consistent results. For the rebalancing operation to work in a sequential manner, set the `rebalance_cb` configuration property (refer to examples/rdkafka_complex_consumer_example.c for help with the usage) for the consumer; a minimal callback is sketched below.
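A minimal rebalance callback sketch, assuming the default EAGER assignment protocol (cooperative assignors would use the incremental assign/unassign calls instead); handling assignment here keeps rebalances sequential with the application's own seek/pause/resume calls.

```c
#include <librdkafka/rdkafka.h>

/* Minimal EAGER-protocol rebalance callback: apply assignments and
 * revocations as they arrive so they stay ordered with the
 * application's own operations. */
static void rebalance_cb(rd_kafka_t *rk, rd_kafka_resp_err_t err,
                         rd_kafka_topic_partition_list_t *partitions,
                         void *opaque) {
        if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS)
                rd_kafka_assign(rk, partitions);
        else /* REVOKE_PARTITIONS or error: drop the assignment */
                rd_kafka_assign(rk, NULL);
}

/* Register on the configuration before creating the consumer:
 *   rd_kafka_conf_set_rebalance_cb(conf, rebalance_cb); */
```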
 * Added `on_broker_state_change()` interceptor.
 * Added the `rd_kafka_sasl_set_credentials()` API to update SASL credentials (see the sketch after this release's checksums).
 * Setting `allow.auto.create.topics` will no longer give a warning if used by a producer, since that is an expected use case. Improved documentation for this property.
 * Added the `resolve_cb` configuration setting that permits using custom DNS resolution logic.
 * Added `rd_kafka_mock_broker_error_stack_cnt()`.
 * All certificates are now read from `ssl.ca.pem`, not just the first one.
 * Blocking transactional API calls (such as `send_offsets_to_transaction()`) that time out due to a low timeout_ms may now be resumed by calling the same API again, as the operation continues in the background.
 * Fixed immediate retries in `send_offsets_to_transaction()`, causing excessive network requests. These retries are now delayed 500 ms.
 * If `init_transactions()` is called with an infinite timeout (-1), the timeout will be limited to 2 * `transaction.timeout.ms`. The application may retry and resume the call if a retriable error is returned.
 * Fixed `rd_kafka_consume_batch()` and `rd_kafka_consume_batch_queue()` skipping other partitions' offsets intermittently when seek, pause, resume or rebalancing is used for a partition.
 * Fixed `rd_kafka_consume_batch()` and `rd_kafka_consume_batch_queue()` intermittently returning incorrect partitions' messages if rebalancing happens during these operations.

Release asset checksums:
9d8a8be30ed09daf6c560f402e91db22fcaea11cac18a0d3c0afdbf884df1d4e
f75de3545b3c6cc027306e2df0371aefe1bb8f86d4ec612ed4ebf7bfb2f817cd
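As referenced in the `rd_kafka_sasl_set_credentials()` item above, here is a minimal sketch of rotating SASL credentials on a live client; the wrapper function itself is illustrative.

```c
#include <stdio.h>
#include <librdkafka/rdkafka.h>

/* Rotate SASL credentials on a live client; the new credentials are
 * used for any subsequent (re)connections. `rk` is an existing client
 * configured for a SASL PLAIN or SCRAM mechanism. */
static int rotate_credentials(rd_kafka_t *rk,
                              const char *user, const char *pass) {
        rd_kafka_error_t *error =
                rd_kafka_sasl_set_credentials(rk, user, pass);
        if (error) {
                fprintf(stderr, "Credential update failed: %s\n",
                        rd_kafka_error_string(error));
                rd_kafka_error_destroy(error);
                return -1;
        }
        return 0;
}
```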
# librdkafka v1.9.2

librdkafka v1.9.2 is a maintenance release:
Release asset checksums:
4ecb0a3103022a7cab308e9fecd88237150901fa29980c99344218a84f497b86
3fba157a9f80a0889c982acdd44608be8a46142270a389008b22d921be1198ad
# librdkafka v1.9.1

librdkafka v1.9.1 is a maintenance release:
Release asset checksums:
d3fc2e0bc00c3df2c37c5389c206912842cca3f97dd91a7a97bc0f4fc69f94ce
3a54cf375218977b7af4716ed9738378e37fe400a6c5ddb9d622354ca31fdc79
# librdkafka v1.9.0

librdkafka v1.9.0 is a feature release:
 * `rd_kafka_offsets_store()` (et al.) will now return an error for any partition that is not currently assigned (through `rd_kafka_*assign()`). This prevents a race condition where an application would store offsets after the assigned partitions had been revoked (which resets the stored offset), which could cause these old stored offsets to be committed later when the same partitions were assigned to this consumer again, effectively overwriting any committed offsets by any consumers that were assigned the same partitions previously. This would typically result in the offsets rewinding and messages being reprocessed. As an extra effort to avoid this situation, the stored offset is now also reset when partitions are assigned (through `rd_kafka_*assign()`). Applications that explicitly call `..offset*_store()` will now need to handle the case where `RD_KAFKA_RESP_ERR__STATE` is returned in the per-partition `.err` field, meaning the partition is no longer assigned to this consumer and the offset could not be stored for commit (a sketch of this error handling follows this release's checksums).
 * Added `socket.connection.setup.timeout.ms` (default 30s). The maximum time allowed for broker connection setups (TCP connection as well as SSL and SASL handshakes) is now limited to this value. This fixes the issue with stalled broker connections in the case of network or load balancer problems. The Java client has an exponential backoff to this timeout, limited by `socket.connection.setup.timeout.max.ms`; this was not implemented in librdkafka due to differences in connection handling and `ERR__ALL_BROKERS_DOWN` error reporting. Having a lower initial connection setup timeout and then increasing the timeout for the next attempt would possibly yield false-positive `ERR__ALL_BROKERS_DOWN` too early.
 * SASL OAUTHBEARER refresh callbacks previously had to be triggered by the application calling `rd_kafka_poll()` (et al.) at least once before being able to connect to brokers. With the new `rd_kafka_conf_enable_sasl_queue()` configuration API and `rd_kafka_sasl_background_callbacks_enable()`, the refresh callbacks can now be triggered automatically on the librdkafka background thread.
 * `rd_kafka_queue_get_background()` now creates the background thread if not already created.
 * Added `rd_kafka_consumer_close_queue()` and `rd_kafka_consumer_closed()`. This allows applications and language bindings to implement asynchronous consumer close.
 * Added `test.mock.broker.rtt` to simulate RTT/latency for mock brokers.
 * `commit_transaction()`, `list_groups()`, etc.
 * Fixed the error `no OPENSSL_Applink()` being written to the console if `ssl.keystore.location` was configured. This regression was introduced in v1.8.0 due to use of vcpkgs and how the keystore file was read. (#3554)
 * The `max.poll.interval.ms` consumer timer could expire even though the application was polling according to profile; fixed by @WhiteWind (#3815).
 * `rd_kafka_clusterid()` would previously fail with timeout if called on a cluster with no visible topics (#3620). The cluster id is now returned as soon as metadata has been retrieved.
 * Fixed a hang in `rd_kafka_list_groups()` if there are no available brokers to connect to (#3705).
 * Timeouts (`timeout_ms`) in various APIs, such as `rd_kafka_poll()`, were limited to roughly 36 hours before wrapping. (#3034)
 * If `rd_kafka_metadata()` or consumer group rebalancing encountered a non-retriable error, it would not be propagated to the caller, causing a stall or timeout; this has now been fixed. (@aiquestion, #3625)
 * `DeleteGroups()` and `DeleteConsumerGroupOffsets()`: if the given coordinator connection was not up by the time these calls were initiated, and the first connection attempt failed, then no further connection attempts were performed, ultimately leading to the calls timing out. This is now fixed by retrying to connect to the group coordinator until the connection is successful or the call times out. Additionally, the coordinator is now re-queried once per second until the coordinator comes up or the call times out, to detect coordinator changes.
 * `rd_kafka_mock_broker_set_down()` would previously accept and then disconnect new connections; it now refuses new connections.
 * `rd_kafka_offsets_store()` (et al.) will now return an error for any partition that is not currently assigned (through `rd_kafka_*assign()`). See Upgrade considerations above for more information.
 * `rd_kafka_*assign()` will now reset/clear the stored offset. See Upgrade considerations above for more information.
 * `seek()` followed by `pause()` would overwrite the seeked offset when later calling `resume()`. This is now fixed. (#3471) Note: avoid storing offsets (`offsets_store()`) after calling `seek()`, as this may later interfere with resuming a paused partition; instead store offsets prior to calling seek.
 * The `ERR_MSG_SIZE_TOO_LARGE` consumer error would previously be raised if the consumer received a maximum-sized FetchResponse only containing (transaction) aborted messages with no control messages. The fetching did not stop, but some applications would terminate upon receiving this error. No error is now raised in this case. (#2993) Thanks to @jacobmikesell for providing an application to reproduce the issue.
 * Fixed a consumer crash (`assert: rkbuf->rkbuf_rkb`) when parsing malformed JoinGroupResponse consumer group metadata state.
 * Fixed a crash (`cant handle op type`) when using `consume_batch_queue()` (et al.) and an OAUTHBEARER refresh callback was set. The callback is now triggered by the consume call. (#3263)
 * Fixed `partition.assignment.strategy` ordering when multiple strategies are configured. If there is more than one eligible strategy, preference is determined by the configured order of strategies; partitions are now assigned to group members according to that order. (#3818)
 * Fixed a previously unhandled case where a `PRODUCER_FENCED` error was returned from the broker (added in Apache Kafka 2.8), which could cause the producer to seemingly hang. This error code is now correctly handled by raising a fatal error.
 * If the group coordinator connection was not up by the time `send_offsets_to_transaction()` was called, and the first connection attempt failed, then no further connection attempts were performed, ultimately leading to `send_offsets_to_transaction()` timing out, and possibly also the transaction timing out on the transaction coordinator. This is now fixed by retrying to connect to the group coordinator until the connection is successful or the call times out. Additionally, the coordinator is now re-queried once per second until the coordinator comes up or the call times out, to detect coordinator changes.
 * The check that `message.timeout.ms` is greater than an explicitly configured `linger.ms` was incorrect: instead of erroring out early, the lingering time was automatically adjusted to the message timeout, ignoring the configured `linger.ms`. This has now been fixed so that an error is returned when instantiating the producer. Thanks to @larry-cdn77 for analysis and test-cases. (#3709)

Release asset checksums:
a2d124cfb2937ec5efc8f85123dbcfeba177fb778762da506bfc5a9665ed9e57
59b6088b69ca6cf278c3f9de5cd6b7f3fd604212cd1c59870bc531c54147e889
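As referenced in the `rd_kafka_offsets_store()` upgrade consideration above, here is a minimal sketch of handling the per-partition `RD_KAFKA_RESP_ERR__STATE` error; the surrounding consumer logic is assumed.

```c
#include <stdio.h>
#include <librdkafka/rdkafka.h>

/* Store offsets and report partitions that are no longer assigned,
 * per the v1.9.0 upgrade considerations. `offsets` holds the
 * application's partitions with .offset set to the next offset. */
static void store_offsets(rd_kafka_t *rk,
                          rd_kafka_topic_partition_list_t *offsets) {
        rd_kafka_resp_err_t err = rd_kafka_offsets_store(rk, offsets);
        if (err)
                fprintf(stderr, "offsets_store: %s\n",
                        rd_kafka_err2str(err));

        /* Per-partition errors: __STATE means the partition is no
         * longer assigned to this consumer and its offset was not
         * stored for commit. */
        for (int i = 0; i < offsets->cnt; i++) {
                const rd_kafka_topic_partition_t *p = &offsets->elems[i];
                if (p->err == RD_KAFKA_RESP_ERR__STATE)
                        fprintf(stderr, "%s [%d] no longer assigned\n",
                                p->topic, (int)p->partition);
        }
}
```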