Confluent's Kafka Python Client

v0.11.5

Admin Client support

v0.11.5 is a feature release that adds support for the Kafka Admin API (KIP-4).

Admin API

This release adds support for the Admin API, enabling applications and users to perform administrative Kafka tasks programmatically:

  • Create topics - specifying partition count, replication factor and topic configuration.
  • Delete topics - delete topics in cluster.
  • Create partitions - extend a topic with additional partitions.
  • Alter configuration - set, modify or delete configuration for any Kafka resource (topic, broker, ..).
  • Describe configuration - view configuration for any Kafka resource.

The API closely follows the Java Admin API:

from confluent_kafka.admin import AdminClient, NewTopic

def example_create_topics(a, topics):
    # a is an AdminClient instance; topics is a list of topic name strings.
    new_topics = [NewTopic(topic, num_partitions=3, replication_factor=1) for topic in topics]
    # Call create_topics to asynchronously create topics
    fs = a.create_topics(new_topics)

    # Wait for operation to finish.
    for topic, f in fs.items():
        try:
            f.result()  # The result itself is None
            print("Topic {} created".format(topic))
        except Exception as e:
            print("Failed to create topic {}: {}".format(topic, e))

Additional examples can be found in examples/adminapi.
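
Deleting topics follows the same futures-based pattern. A minimal sketch in the same style (the 30-second operation_timeout value here is illustrative):

def example_delete_topics(a, topics):
    # Call delete_topics to asynchronously delete topics; operation_timeout
    # asks the broker to wait for the deletion to propagate before returning.
    fs = a.delete_topics(topics, operation_timeout=30)

    # Wait for each operation to finish.
    for topic, f in fs.items():
        try:
            f.result()  # The result itself is None
            print("Topic {} deleted".format(topic))
        except Exception as e:
            print("Failed to delete topic {}: {}".format(topic, e))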

Enhancements

  • Schema Registry HTTPS support with TLS client auth added (#90)
  • Metadata API list_topics() added (#161, @tbsaunde, @stephan-hof); see the sketch after this list
  • Expose librdkafka built-in partitioner options directly (#396)
  • Callback-based throttle event handling via throttle_cb (#237, #377)
  • Added Unicode support for header values (#382)
  • OpenSSL version bump to 1.0.2o (#410)
  • Avro documentation added to the docs (#382)
  • Python 3.7 support (#382)
  • Allow passing headers as both a list of tuples and a dict (#355)
  • Support for legacy setuptools install_requires (#399)
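
As an illustration of the new metadata API, a minimal sketch (the broker address is a placeholder; list_topics() is also available on consumers):

from confluent_kafka import Producer

p = Producer({'bootstrap.servers': 'localhost:9092'})  # placeholder broker

# Request cluster metadata; returns a ClusterMetadata instance.
md = p.list_topics(timeout=10)
for tname, topic in md.topics.items():
    print("{} has {} partition(s)".format(tname, len(topic.partitions)))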

Fixes

  • Release GIL before making blocking calls (#412)
  • Prevent application config dict mutation (#412)
  • Intercept plugin configurations to ensure proper ordering (#404)
  • Schema Registry client's test_compatibility() now returns False instead of None when unable to check compatibility (#372, @Enether)
  • Fix invocation of SchemaParseException (#376)
  • Fix call ordering to avoid callback crash on implicit close (#265)
  • Fix memory leaks in generic client setters (#382)
  • Fix AvroProducer/AvroConsumer key/value identity check (#342)
  • Correct Producer.produce() documentation to use the correct time unit, seconds (#384, #385)
  • Fix KafkaError refcounting which could lead to memory leaks (#382)

v0.11.4

Simplified installation

This release adds binary wheels containing all required dependencies (librdkafka, OpenSSL, zlib, etc.) for Linux and OSX.

Should these wheels not work on your platform, please file an issue outlining what is failing, and use the previous method in the meantime: install librdkafka manually, then run pip install --no-binary :all: confluent-kafka

Message header support

Support for Kafka message headers has been added (requires broker version >= v0.11.0).

When producing messages, simply provide a list of (key, value) tuples via the headers= argument:

    myproducer.produce(topic, 'A message payload',
                       headers=[('hdr1', 'val1'),
                                ('another', 'one'),
                                ('hdr1', 'duplicates are supported and ordering is retained')])

Message headers are returned as a list of tuples for consumed messages:

    msg = myconsumer.poll(1)
    if msg is not None and not msg.error():
        headers = msg.headers()
        if headers is not None:
            # convert to dict, collapsing duplicate header keys
            headers_dict = dict(headers)

Enhancements

  • Message header support (@johnistan)
  • Added Consumer.seek()
  • Added Consumer.pause()/resume() support (closes #120, @dangra)
  • Added Consumer.store_offsets() API (#245, @ctrochalakis); see the sketch after this list
  • Support for passing librdkafka logs to the standard logging module (see logger kwarg in constructors) (#148)
  • Enable produce.offset.report by default (#266, #267)
  • Expose offsets_for_times() consumer method (closes #224) (#268, @johnistan)
  • Add batch consume() API (closes #252, #282, @tburmeister)
  • Add hash func for UnionSchema (#228, @fyndiq)
  • Use schemaless reader to handle complex schema (#251, @fpietka)
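
For example, Consumer.store_offsets() pairs with enable.auto.offset.store=False so that an offset is only stored (and later auto-committed) once a message has actually been processed. A minimal sketch; the broker address, topic name and process() helper are placeholders:

    from confluent_kafka import Consumer

    c = Consumer({
        'bootstrap.servers': 'localhost:9092',  # placeholder broker
        'group.id': 'example-group',
        'enable.auto.commit': True,             # commit stored offsets in the background
        'enable.auto.offset.store': False,      # only store offsets explicitly
    })
    c.subscribe(['mytopic'])  # placeholder topic

    try:
        while True:
            msg = c.poll(1.0)
            if msg is None or msg.error():
                continue
            process(msg)          # hypothetical application processing
            c.store_offsets(msg)  # mark the message as processed
    finally:
        c.close()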

Fixes

  • Fix librdkafka install command for macOS (#281, @vkroz)
  • Constructors now support both dict and kwargs
  • Add __version__ to __init__.py (@mrocklin)
  • Fix message leak/loss when an exception was raised from a callback triggered by poll()
  • Make Consumer.commit(.., asynchronous=False) return offset commit results; see the sketch after this list
  • Raise runtime error if accessing consumer after consumer close (#262, @johnistan)
  • Pass py.test arguments from tox (@ctrochalakis)
  • Rename the async kwarg to asynchronous (async will continue to work until the 1.0 API bump)
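
A minimal sketch of the renamed kwarg and the new synchronous commit return value, assuming a consumer c that has consumed messages:

    # Synchronous commit: blocks until the commit succeeds or fails and
    # returns the committed offsets (a list of TopicPartition).
    offsets = c.commit(asynchronous=False)
    for tp in offsets:
        print("Committed {} [{}] at offset {}".format(tp.topic, tp.partition, tp.offset))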

v0.11.0

This is a minimal librdkafka version-synchronized release of the Python client.

Changes:

  • Handle null/None values during deserialization
  • Allow passing a custom Schema Registry instance
  • None conf values are now converted to NULL rather than the string "None" (#133)
  • Fix memory leaks when certain exceptions were raised.
  • Handle delivery.report.only.error in Python (#84); see the sketch after this list
  • Proper use of Message error string on Producer (#129)
  • Now Flake8 clean
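
A minimal sketch of the delivery.report.only.error behavior (the broker address and topic are placeholders): with the option enabled, the delivery callback is invoked only for failed deliveries.

    from confluent_kafka import Producer

    p = Producer({
        'bootstrap.servers': 'localhost:9092',  # placeholder broker
        'delivery.report.only.error': True,     # only report failed deliveries
    })

    def on_delivery(err, msg):
        # With delivery.report.only.error=True this callback only fires on
        # failure, so err is always set here.
        print("Delivery failed for {}: {}".format(msg.topic(), err))

    p.produce('mytopic', b'payload', on_delivery=on_delivery)
    p.flush()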

v0.9.4

v0.9.2

v0.9.1.2

Bugfix release:

  • Use bytes for Message payload and key
  • Various build and packaging fixes