Python3 client for NATS
Added heartbeat option to pull subscribers fetch API:
await sub.fetch(1, timeout=1, heartbeat=0.1)
It can be useful to help distinguish API timeouts from not receiving messages:
try:
    await sub.fetch(100, timeout=1, heartbeat=0.2)
except nats.js.errors.FetchTimeoutError:
    # timeout due to not receiving messages
    pass
except asyncio.TimeoutError:
    # unexpected timeout
    pass
Added subject_transform to add_stream:
await js.add_stream(
    name="TRANSFORMS",
    subjects=["test", "foo"],
    subject_transform=nats.js.api.SubjectTransform(
        src=">", dest="transformed.>"
    ),
)
Added subject_transforms to sources as well:
transformed_source = nats.js.api.StreamSource(
    name="TRANSFORMS",
    # The source filters cannot overlap.
    subject_transforms=[
        nats.js.api.SubjectTransform(
            src="transformed.>", dest="fromtest.transformed.>"
        ),
        nats.js.api.SubjectTransform(
            src="foo.>", dest="fromtest.foo.>"
        ),
    ],
)
await js.add_stream(
    name="SOURCING",
    sources=[transformed_source],
)
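To make the effect of these transforms concrete, here is a minimal pure-Python sketch of how a pattern ending in `>` could rewrite a subject. The helper `apply_transform` is hypothetical and is not part of nats.py or the server; it only handles patterns whose last token is `>` and assumes that `>` in the destination is replaced by the tokens matched by `>` in the source.

```python
from typing import Optional


def apply_transform(src: str, dest: str, subject: str) -> Optional[str]:
    """Illustrative only: rewrite `subject` if it matches `src`, else return None."""
    src_tokens = src.split(".")
    dest_tokens = dest.split(".")
    subj_tokens = subject.split(".")
    if src_tokens[-1] != ">" or dest_tokens[-1] != ">":
        raise ValueError("this sketch only handles patterns ending in '>'")

    prefix = src_tokens[:-1]
    # '>' has to match at least one token after the literal prefix.
    if len(subj_tokens) <= len(prefix) or subj_tokens[:len(prefix)] != prefix:
        return None

    # Tokens captured by '>' are appended to the destination prefix.
    tail = subj_tokens[len(prefix):]
    return ".".join(dest_tokens[:-1] + tail)


print(apply_transform(">", "transformed.>", "test"))
print(apply_transform("foo.>", "fromtest.foo.>", "foo.a.b"))
```

Under these assumptions, a message published on `test` would be stored as `transformed.test`, and a subject that does not match the source pattern is left untransformed (the sketch returns `None`).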
Added backoff option to add_consumer:
await js.add_consumer(
    "events",
    durable_name="a",
    max_deliver=3,  # must be greater than the length of the backoff array
    backoff=[1, 2],  # defined in seconds
    ack_wait=999999,  # ignored when backoff is used
    max_ack_pending=3,
    filter_subject="events.>",
)
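The relationship between `backoff` and `max_deliver` can be sketched without a server. The helper below, `redelivery_delays`, is hypothetical and not part of nats.py. It assumes that each redelivery waits for the next value in the backoff list and that the last value is reused once the list is exhausted, and it enforces the constraint from the comment above that `max_deliver` must exceed the length of the list.

```python
from typing import List


def redelivery_delays(backoff: List[float], max_deliver: int) -> List[float]:
    """Illustrative only: seconds waited before each redelivery attempt."""
    if not backoff:
        raise ValueError("backoff must not be empty")
    if max_deliver <= len(backoff):
        raise ValueError("max_deliver must be greater than len(backoff)")

    # The first delivery is immediate, so there are max_deliver - 1 redeliveries.
    redeliveries = max_deliver - 1
    # Assumption: the last backoff value repeats once the list runs out.
    return [backoff[min(i, len(backoff) - 1)] for i in range(redeliveries)]


print(redelivery_delays([1, 2], 3))
print(redelivery_delays([1, 2], 5))
```

With the configuration shown above (`backoff=[1, 2]`, `max_deliver=3`), the sketch yields one wait of 1 second and one of 2 seconds before the consumer gives up on the message.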
Added compression to add_stream:
await js.add_stream(
    name="COMPRESSION",
    subjects=["test", "foo"],
    compression="s2",
)
Added metadata to add_stream:
await js.add_stream(
    name="META",
    subjects=["test", "foo"],
    metadata={'foo': 'bar'},
)
pull_subscribe_bind API:
await jsm.add_stream(name="multi", subjects=["a", "b", "c.>"])
cinfo = await jsm.add_consumer(
    "multi",
    name="myconsumer",
    filter_subjects=["a", "b"],
)
psub = await js.pull_subscribe_bind("multi", "myconsumer")
msgs = await psub.fetch(2)
for msg in msgs:
    await msg.ack()
Added subjects_filter option to js.stream_info() API:
stream = await js.add_stream(name="foo", subjects=["foo.>"])
for i in range(0, 5):
    await js.publish("foo.%d" % i, b'A')
si = await js.stream_info("foo", subjects_filter=">")
print(si.state.subjects)
# => {'foo.0': 1, 'foo.1': 1, 'foo.2': 1, 'foo.3': 1, 'foo.4': 1}
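The per-subject counts above depend on which stored subjects match the filter. As a rough illustration, the following pure-Python sketch applies standard NATS wildcard rules (`*` matches exactly one token, `>` matches one or more trailing tokens) to a list of published subjects. The helpers `subject_matches` and `count_subjects` are hypothetical names used only for this example; the real filtering is done by the server.

```python
from collections import Counter
from typing import Dict, Iterable


def subject_matches(pattern: str, subject: str) -> bool:
    """Illustrative only: NATS-style wildcard match of `subject` against `pattern`."""
    p = pattern.split(".")
    s = subject.split(".")
    for i, tok in enumerate(p):
        if tok == ">":
            # '>' must be the last token and match at least one remaining token.
            return i == len(p) - 1 and len(s) > i
        if i >= len(s):
            return False
        if tok != "*" and tok != s[i]:
            return False
    return len(p) == len(s)


def count_subjects(published: Iterable[str], subjects_filter: str) -> Dict[str, int]:
    """Count messages per subject, keeping only subjects that match the filter."""
    return dict(Counter(s for s in published if subject_matches(subjects_filter, s)))


published = ["foo.%d" % i for i in range(5)]
print(count_subjects(published, ">"))
print(count_subjects(published, "foo.1"))
```

A narrower filter such as `foo.1` would keep only that subject in the result, which is useful when a stream holds many subjects.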
Changed the default inactive_threshold cleanup timeout to be 5 minutes. It can now be customized as well by passing inactive_threshold as an argument in seconds:
w = await kv.watchall(inactive_threshold=30.0)
Changed pull_subscribe_bind first argument to be called consumer instead of durable since it also supports ephemeral consumers. This should be backwards compatible.
psub = await js.pull_subscribe_bind(consumer="myconsumer", stream="test")
next_msg
NotFoundError (#499)
Full Changelog: https://github.com/nats-io/nats.py/compare/v2.5.0...v2.6.0
Full Changelog: https://github.com/nats-io/nats.py/compare/v2.4.0...v2.5.0
connect() by @dsodx in https://github.com/nats-io/nats.py/pull/484
Full Changelog: https://github.com/nats-io/nats.py/compare/v2.3.1...v2.4.0
pip install nats-py
Upload example:
import nats
import asyncio

async def main():
    nc = await nats.connect("localhost", name="object.py")
    js = nc.jetstream()
    obs = await js.create_object_store("nats-py-object-store")
    object_name = 'my-file.mp4'
    # Upload the local file into the object store under the same name.
    with open(object_name) as f:
        await obs.put(object_name, f)
    await nc.close()

if __name__ == '__main__':
    asyncio.run(main())
Download example:
import nats
import asyncio

async def main():
    nc = await nats.connect("localhost", name="object.py")
    js = nc.jetstream()
    obs = await js.object_store("nats-py-object-store")
    # List the objects in the store with their sizes.
    files = await obs.list()
    for f in files:
        print(f.name, "-", f.size)
    object_name = 'my-file.mp4'
    # Download the object into a local copy.
    with open("copy-"+object_name, 'w') as f:
        await obs.get(object_name, f)
    await nc.close()

if __name__ == '__main__':
    asyncio.run(main())
WebSocketTransport to detect the attempt to upgrade the connection to TLS by @allanbank (https://github.com/nats-io/nats.py/pull/443)
next_msg and tasks cancellation (https://github.com/nats-io/nats.py/pull/446)
republish value into dataclass by @orsinium (https://github.com/nats-io/nats.py/pull/405)
.travis.yml by @WillCodeCo (https://github.com/nats-io/nats.py/pull/420)
Full Changelog: https://github.com/nats-io/nats.py/compare/v2.1.7...v2.2.0
pending_msgs_limit and pending_bytes_limit can now be set for push and pull consumers from JetStream. To disable the limits, -1 can be used instead:
# Push Subscriber
await js.subscribe("push-example", pending_bytes_limit=-1, pending_msgs_limit=-1)
# Pull Subscriber
await js.pull_subscribe("pull-example", "durable", pending_msgs_limit=-1, pending_bytes_limit=-1)
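As a rough mental model of what these limits guard against, the sketch below tracks buffered messages and bytes and refuses new messages once a limit would be exceeded. The `PendingBuffer` class is hypothetical and does not mirror the client's internal accounting; it only assumes that a non-positive limit such as -1 disables the corresponding check.

```python
from dataclasses import dataclass


@dataclass
class PendingBuffer:
    """Illustrative only: not the nats.py implementation."""
    pending_msgs_limit: int
    pending_bytes_limit: int
    pending_msgs: int = 0
    pending_bytes: int = 0

    def offer(self, payload: bytes) -> bool:
        """Buffer the payload, or return False if a limit would be exceeded."""
        # Assumption: a limit of -1 (or any non-positive value) means "no limit".
        over_msgs = 0 < self.pending_msgs_limit <= self.pending_msgs
        over_bytes = (
            self.pending_bytes_limit > 0
            and self.pending_bytes + len(payload) > self.pending_bytes_limit
        )
        if over_msgs or over_bytes:
            return False  # a real client would report a slow consumer here
        self.pending_msgs += 1
        self.pending_bytes += len(payload)
        return True


limited = PendingBuffer(pending_msgs_limit=2, pending_bytes_limit=-1)
print([limited.offer(b"x") for _ in range(3)])

unlimited = PendingBuffer(pending_msgs_limit=-1, pending_bytes_limit=-1)
print(all(unlimited.offer(b"x" * 100) for _ in range(1000)))
```

Disabling both limits removes this back-pressure signal, so memory use is then bounded only by how quickly the application drains its subscription.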
Added sub.pending_bytes and sub.pending_msgs methods to confirm buffered bytes and messages from a Subscription
Fixed accounting bug when using sub.next_msg which would have caused SlowConsumer errors and dropped messages when reaching the default limit
Fixed empty message being returned sometimes when calling sub.next_msg after the future was cancelled