Nats.py Versions Save

Python3 client for NATS

v2.1.4

1 year ago

Fixed

Added

v2.1.3

2 years ago

Fixes

  • Fixes to headers parser when handling empty keys

v2.1.2

2 years ago

Added

Changed

  • Changed sub.Fetch to not block until all messages arrive, it now yields when there are some pending and handle new 4XX temporary status sent by the server

Fixed

v2.1.0

2 years ago

Added

  • Added pending_size option on connect to set max internal pending size for flushing commands and a flush_timeout option to control the maximum time to wait for a flush.

For example to set it to 2MB only which is the default:

nats.connect(pending_size=2*1024*1024)

And similar to the nats.go client, it can be disabled if set to be negative:

nats.connect(pending_size=-1)

When pending size is disabled, then any published message during a disconnection will result in a synchronous OutboundBufferLimitError thrown when publishing.

To control the flushing there is a flush_timeout option that can be passed on connect as well.

nats.connect(flush_timeout=10)

By default, there is no flush timeout (it is None) so the client can wait indefinitely during a flush similar to nats.go (eventually the ping interval ought to unblock the flushing). These couple of changes also improve the publishing performance of the client, thanks to @charbonnierg for contributing to this issue.

Fixed

Changed

  • Changed nc.flush() to now throw FlushTimeoutError instead which is a subtype of TimeoutError that it used be using for backwards compatibility.

  • Changed EOF disconnections being reported as StaleConnectionError and they are instead a UnexpectedEOF error.

v2.0.0

2 years ago

nats.py v2.0.0

Major upgrade to the APIs of the Python3 client. For this release the client has also been renamed to be nats-py from asyncio-nats-client, it can now be installed with:

pip install nats-py

# With NKEYS / JWT support
pip install nats-py[nkeys]

This version of the client is not completely compatible with previous versions of the client and it is designed to be used with Python 3.7.

Overall, the API of the client should resemble more the APIs of the Go client:

import nats

async def main():
  nc = await nats.connect("demo.nats.io")

  sub = await nc.subscribe("hello")

  await nc.publish("hello")

  msg = await sub.next_msg()
  print(f"Received [{msg.subject}]: {msg.data}")

  await nc.close()

if __name__ == '__main__':
    asyncio.run(main())

There is support for NATS Headers ⚡

import asyncio
import nats
from nats.errors import TimeoutError

async def main():
  nc = await nats.connect("demo.nats.io")

  async def help_request(msg):
      print(f"Received a message on '{msg.subject} {msg.reply}': {msg.data.decode()}")
      print("Headers", msg.header)
      await msg.respond(b'OK')

  sub = await nc.subscribe("hello", "workers", help_request)

  try:
      response = await nc.request("help", b'help me', timeout=0.5)
      print("Received response: {message}".format(
          message=response.data.decode()))
  except TimeoutError:
      print("Request timed out")

  await nc.close()

if __name__ == '__main__':
    asyncio.run(main())

It also now includes JetStream support:

import asyncio
import nats

async def main():
  nc = await nats.connect("demo.nats.io")

  # Create JetStream context.
  js = nc.jetstream()

  # Persist messages on 'foo' subject.
  await js.add_stream(name="sample-stream", subjects=["foo"])

  for i in range(0, 10):
      ack = await js.publish("foo", f"hello world: {i}".encode())
      print(ack)

  # Create pull based consumer on 'foo'.
  psub = await js.pull_subscribe("foo", "psub")

  # Fetch and ack messagess from consumer.
  for i in range(0, 10):
      msgs = await psub.fetch()
      for msg in msgs:
          print(msg)

  await nc.close()

if __name__ == '__main__':
  asyncio.run(main())

As well as JetStream KV support:

import asyncio
import nats

async def main():
  nc = await nats.connect()
  js = nc.jetstream()

  # Create a KV
  kv = await js.create_key_value(bucket='MY_KV')

  # Set and retrieve a value
  await kv.put('hello', b'world')
  entry = await kv.get('hello')
  print(f'KeyValue.Entry: key={entry.key}, value={entry.value}')
  # KeyValue.Entry: key=hello, value=world

  await nc.close()

if __name__ == '__main__':
    asyncio.run(main())

New Documentation site:

The following site has been created to host the API of the Python3 client: https://nats-io.github.io/nats.py/ The contents of the doc site can be found in the following branch from this same repo: https://github.com/nats-io/nats.py/tree/docs/source

Breaking changes

  • Changed the return type of subscribe instead of returning a sid.

  • Changed suffix of most errors to follow PEP-8 style and now use the Error suffix. For example, ErrSlowConsumer is now SlowConsumerError. Old style errors are subclasses of the new ones so exceptions under try...catch blocks would be still caught.

Deprecated

Several areas of the client got deprecated in this release:

  • Deprecated is_async parameter for subscribe
  • Deprecated Client.timed_request
  • Deprecated passing loop parameter to most functions
  • Deprecated auto_unsubscribe instead preferring sub.unsubscribe

Thanks

Special thanks to @brianshannan @charliestrawn @orsinium @charbonnierg for their contributions in this release!

v0.11.5

2 years ago

Bugfix release that includes #230

v2.0.0rc1

2 years ago

nats.py v2.0.0rc1

Major upgrade to the APIs of the Python3 client! For this release the client has also been renamed to be nats-py from asyncio-nats-client, it can now be installed with:

pip install nats-py

# With NKEYS / JWT support
pip install nats-py[nkeys]

This version of the client is not completely compatible with previous versions of the client and it is designed to be used with Python 3.7.

Overall, the API of the client should resemble more the APIs of the Go client:

import nats

async def main():
  nc = await nats.connect("demo.nats.io")

  sub = await nc.subscribe("hello")

  await nc.publish("hello")

  msg = await sub.next_msg()
  print(f"Received [{msg.subject}]: {msg.data}")

  await nc.close()

if __name__ == '__main__':
    asyncio.run(main())

There is support for NATS Headers ⚡

import asyncio
import nats
from nats.errors import TimeoutError

async def main():
  nc = await nats.connect("demo.nats.io")

  async def help_request(msg):
      print(f"Received a message on '{msg.subject} {msg.reply}': {msg.data.decode()}")
      print("Headers", msg.header)
      await msg.respond(b'OK')

  sub = await nc.subscribe("hello", "workers", help_request)

  try:
      response = await nc.request("help", b'help me', timeout=0.5)
      print("Received response: {message}".format(
          message=response.data.decode()))
  except TimeoutError:
      print("Request timed out")

  await nc.close()

if __name__ == '__main__':
    asyncio.run(main())

It also now includes JetStream support:

import asyncio
import nats

async def main():
  nc = await nats.connect("demo.nats.io")

  # Create JetStream context.
  js = nc.jetstream()

  # Persist messages on 'foo' subject.
  await js.add_stream(name="sample-stream", subjects=["foo"])

  for i in range(0, 10):
      ack = await js.publish("foo", f"hello world: {i}".encode())
      print(ack)

  # Create pull based consumer on 'foo'.
  psub = await js.pull_subscribe("foo", "psub")

  # Fetch and ack messagess from consumer.
  for i in range(0, 10):
      msgs = await psub.fetch()
      for msg in msgs:
          print(msg)

  await nc.close()

if __name__ == '__main__':
  asyncio.run(main())

As well as JetStream KV support:

import asyncio
import nats

async def main():
  nc = await nats.connect()
  js = nc.jetstream()

  # Create a KV
  kv = await js.create_key_value(bucket='MY_KV')

  # Set and retrieve a value
  await kv.put('hello', b'world')
  entry = await kv.get('hello')
  print(f'KeyValue.Entry: key={entry.key}, value={entry.value}')
  # KeyValue.Entry: key=hello, value=world

  await nc.close()

if __name__ == '__main__':
    asyncio.run(main())

New Documentation site:

The following site has been created to host the API of the Python3 client: https://nats-io.github.io/nats.py/ The contents of the doc site can be found in the following branch from this same repo: https://github.com/nats-io/nats.py/tree/docs/source

Breaking changes

  • Changed the return type of subscribe instead of returning a sid.

  • Changed suffix of most errors to follow PEP-8 style and now use the Error suffix. For example, ErrSlowConsumer is now SlowConsumerError. Old style errors are subclasses of the new ones so exceptions under try...catch blocks would be still caught.

Deprecated

Several areas of the client got deprecated in this release:

  • Deprecated is_async parameter for subscribe

  • Deprecated Client.timed_request

  • Deprecated passing loop parameter to most functions

v0.11.4

3 years ago

Fixed

  • Fixed issue of NATS client not reconnecting to NATS Server nodes behind LB #189

v0.11.2

3 years ago

Added

Fixed

v0.11.0

3 years ago

Added

  • Added ability to access the connection identifier / client id (#146)
print(nc.client_id)
  • Added option to specify tls_hostname (#157)
await nc.connect(servers=["tls://127.0.0.1:4443"], loop=loop, tls=ssl_ctx, tls_hostname="localhost")

Fixed

Improved

  • Raise ErrInvalidCallbackType error when client callback functions are not coroutines (#128)
  • Many yapf formatting fixes (#152)
  • Fixed deprecation warnings. Remove unnecessary coverage runs. (#149)

Changed

  • Testing againts NATS 2.1.8
  • Updated examples to async/await syntax
  • Client uses f strings (#152)

Deprecated

  • Deprecated support for Python 3.5. Next release will drop support for Python 3.6.