Python3 client for NATS
num_replicas
field to Consumer config (https://github.com/nats-io/nats.py/pull/326)JetStreamManager
(https://github.com/nats-io/nats.py/pull/315)subscribe
(https://github.com/nats-io/nats.py/pull/302)sub.Fetch
to not block until all messages arrive, it now yields when there are some pending and handle new 4XX temporary status sent by the servercluster
should be optional in Placement
class (https://github.com/nats-io/nats.py/pull/310) by @olgeninext_msg()
does not cancel internal _next_msg()
task (https://github.com/nats-io/nats.py/pull/301)pending_size
option on connect to set max internal pending size for flushing commands and a flush_timeout
option to control the maximum time to wait for a flush.For example to set it to 2MB only which is the default:
nats.connect(pending_size=2*1024*1024)
And similar to the nats.go client, it can be disabled if set to be negative:
nats.connect(pending_size=-1)
When pending size is disabled, then any published message during a disconnection will result in a synchronous OutboundBufferLimitError
thrown when publishing.
To control the flushing there is a flush_timeout
option that can be passed on connect as well.
nats.connect(flush_timeout=10)
By default, there is no flush timeout (it is None
) so the client can wait indefinitely during a flush similar to nats.go (eventually the ping interval ought to unblock the flushing). These couple of changes also improve the publishing performance of the client, thanks to @charbonnierg for contributing to this issue.
Changed nc.flush()
to now throw FlushTimeoutError
instead which is a subtype of TimeoutError
that it used be using for backwards compatibility.
Changed EOF disconnections being reported as StaleConnectionError
and they are instead a UnexpectedEOF
error.
Major upgrade to the APIs of the Python3 client. For this release the client has also been renamed to be nats-py
from asyncio-nats-client
, it can now be installed with:
pip install nats-py
# With NKEYS / JWT support
pip install nats-py[nkeys]
This version of the client is not completely compatible with previous versions of the client and it is designed to be used with Python 3.7.
Overall, the API of the client should resemble more the APIs of the Go client:
import nats
async def main():
nc = await nats.connect("demo.nats.io")
sub = await nc.subscribe("hello")
await nc.publish("hello")
msg = await sub.next_msg()
print(f"Received [{msg.subject}]: {msg.data}")
await nc.close()
if __name__ == '__main__':
asyncio.run(main())
There is support for NATS Headers ⚡
import asyncio
import nats
from nats.errors import TimeoutError
async def main():
nc = await nats.connect("demo.nats.io")
async def help_request(msg):
print(f"Received a message on '{msg.subject} {msg.reply}': {msg.data.decode()}")
print("Headers", msg.header)
await msg.respond(b'OK')
sub = await nc.subscribe("hello", "workers", help_request)
try:
response = await nc.request("help", b'help me', timeout=0.5)
print("Received response: {message}".format(
message=response.data.decode()))
except TimeoutError:
print("Request timed out")
await nc.close()
if __name__ == '__main__':
asyncio.run(main())
It also now includes JetStream
support:
import asyncio
import nats
async def main():
nc = await nats.connect("demo.nats.io")
# Create JetStream context.
js = nc.jetstream()
# Persist messages on 'foo' subject.
await js.add_stream(name="sample-stream", subjects=["foo"])
for i in range(0, 10):
ack = await js.publish("foo", f"hello world: {i}".encode())
print(ack)
# Create pull based consumer on 'foo'.
psub = await js.pull_subscribe("foo", "psub")
# Fetch and ack messagess from consumer.
for i in range(0, 10):
msgs = await psub.fetch()
for msg in msgs:
print(msg)
await nc.close()
if __name__ == '__main__':
asyncio.run(main())
As well as JetStream KV
support:
import asyncio
import nats
async def main():
nc = await nats.connect()
js = nc.jetstream()
# Create a KV
kv = await js.create_key_value(bucket='MY_KV')
# Set and retrieve a value
await kv.put('hello', b'world')
entry = await kv.get('hello')
print(f'KeyValue.Entry: key={entry.key}, value={entry.value}')
# KeyValue.Entry: key=hello, value=world
await nc.close()
if __name__ == '__main__':
asyncio.run(main())
The following site has been created to host the API of the Python3 client: https://nats-io.github.io/nats.py/ The contents of the doc site can be found in the following branch from this same repo: https://github.com/nats-io/nats.py/tree/docs/source
Changed the return type of subscribe
instead of returning a sid.
Changed suffix of most errors to follow PEP-8 style and now use the Error
suffix. For example, ErrSlowConsumer
is now SlowConsumerError
. Old style errors are subclasses of the new ones so exceptions under try...catch
blocks would be still caught.
Several areas of the client got deprecated in this release:
is_async
parameter for subscribe
Client.timed_request
loop
parameter to most functionsauto_unsubscribe
instead preferring sub.unsubscribe
Special thanks to @brianshannan @charliestrawn @orsinium @charbonnierg for their contributions in this release!
Bugfix release that includes #230
Major upgrade to the APIs of the Python3 client! For this release the client has also been renamed to be nats-py
from asyncio-nats-client
, it can now be installed with:
pip install nats-py
# With NKEYS / JWT support
pip install nats-py[nkeys]
This version of the client is not completely compatible with previous versions of the client and it is designed to be used with Python 3.7.
Overall, the API of the client should resemble more the APIs of the Go client:
import nats
async def main():
nc = await nats.connect("demo.nats.io")
sub = await nc.subscribe("hello")
await nc.publish("hello")
msg = await sub.next_msg()
print(f"Received [{msg.subject}]: {msg.data}")
await nc.close()
if __name__ == '__main__':
asyncio.run(main())
There is support for NATS Headers ⚡
import asyncio
import nats
from nats.errors import TimeoutError
async def main():
nc = await nats.connect("demo.nats.io")
async def help_request(msg):
print(f"Received a message on '{msg.subject} {msg.reply}': {msg.data.decode()}")
print("Headers", msg.header)
await msg.respond(b'OK')
sub = await nc.subscribe("hello", "workers", help_request)
try:
response = await nc.request("help", b'help me', timeout=0.5)
print("Received response: {message}".format(
message=response.data.decode()))
except TimeoutError:
print("Request timed out")
await nc.close()
if __name__ == '__main__':
asyncio.run(main())
It also now includes JetStream
support:
import asyncio
import nats
async def main():
nc = await nats.connect("demo.nats.io")
# Create JetStream context.
js = nc.jetstream()
# Persist messages on 'foo' subject.
await js.add_stream(name="sample-stream", subjects=["foo"])
for i in range(0, 10):
ack = await js.publish("foo", f"hello world: {i}".encode())
print(ack)
# Create pull based consumer on 'foo'.
psub = await js.pull_subscribe("foo", "psub")
# Fetch and ack messagess from consumer.
for i in range(0, 10):
msgs = await psub.fetch()
for msg in msgs:
print(msg)
await nc.close()
if __name__ == '__main__':
asyncio.run(main())
As well as JetStream KV
support:
import asyncio
import nats
async def main():
nc = await nats.connect()
js = nc.jetstream()
# Create a KV
kv = await js.create_key_value(bucket='MY_KV')
# Set and retrieve a value
await kv.put('hello', b'world')
entry = await kv.get('hello')
print(f'KeyValue.Entry: key={entry.key}, value={entry.value}')
# KeyValue.Entry: key=hello, value=world
await nc.close()
if __name__ == '__main__':
asyncio.run(main())
The following site has been created to host the API of the Python3 client: https://nats-io.github.io/nats.py/ The contents of the doc site can be found in the following branch from this same repo: https://github.com/nats-io/nats.py/tree/docs/source
Changed the return type of subscribe
instead of returning a sid.
Changed suffix of most errors to follow PEP-8 style and now use the Error
suffix. For example, ErrSlowConsumer
is now SlowConsumerError
. Old style errors are subclasses of the new ones so exceptions under try...catch
blocks would be still caught.
Several areas of the client got deprecated in this release:
Deprecated is_async
parameter for subscribe
Deprecated Client.timed_request
Deprecated passing loop
parameter to most functions
print(nc.client_id)
await nc.connect(servers=["tls://127.0.0.1:4443"], loop=loop, tls=ssl_ctx, tls_hostname="localhost")
ErrInvalidCallbackType
error when client callback functions are not coroutines (#128)