Nats.py Versions

Python3 client for NATS

v2.7.2

2 months ago
  • Added heartbeat option to pull subscribers fetch API

    await sub.fetch(1, timeout=1, heartbeat=0.1)
    

    This helps distinguish API timeouts from simply not receiving messages:

    try:
      await sub.fetch(100, timeout=1, heartbeat=0.2)
    except nats.js.errors.FetchTimeoutError:
      # timeout due to not receiving messages
      pass
    except asyncio.TimeoutError:
      # unexpected timeout
      pass
    
  • Added subject_transform to add_stream

    await js.add_stream(
      name="TRANSFORMS",
      subjects=["test", "foo"],
      subject_transform=nats.js.api.SubjectTransform(
        src=">", dest="transformed.>"
      ),
    )
    
  • Added subject_transforms to stream sources as well:

    transformed_source = nats.js.api.StreamSource(
        name="TRANSFORMS",
        # The source filters cannot overlap.
        subject_transforms=[
            nats.js.api.SubjectTransform(
                src="transformed.>", dest="fromtest.transformed.>"
            ),
            nats.js.api.SubjectTransform(
                src="foo.>", dest="fromtest.foo.>"
            ),
        ],
    )
    await js.add_stream(
      name="SOURCING",
      sources=[transformed_source],
    )
    
  • Added backoff option to add_consumer

    await js.add_consumer(
        "events",
        durable_name="a",
        max_deliver=3,    # must be greater than the length of the backoff array
        backoff=[1, 2],   # defined in seconds
        ack_wait=999999,  # ignored once backoff is used
        max_ack_pending=3,
        filter_subject="events.>",
    )
    
  • Added compression to add_stream

    await js.add_stream(
      name="COMPRESSION",
      subjects=["test", "foo"],
      compression="s2",
    )
    
  • Added metadata to add_stream

    await js.add_stream(
        name="META",
        subjects=["test", "foo"],
        metadata={'foo': 'bar'},
    )
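
To make the backoff constraints noted above concrete, here is a minimal sketch in plain Python. The helper redelivery_delays is hypothetical and not part of nats.py. It assumes that the first delivery is immediate, that each redelivery after a missed ack waits for the next backoff interval, and that the last interval is reused once the list is exhausted. Actual server behavior may differ in detail.

```python
from typing import List


def redelivery_delays(max_deliver: int, backoff: List[float]) -> List[float]:
    """Return the assumed wait (in seconds) before each redelivery.

    Hypothetical helper for illustration only; not a nats.py API.
    """
    if not backoff:
        raise ValueError("backoff must contain at least one interval")
    if max_deliver <= len(backoff):
        # Mirrors the constraint from the release note above.
        raise ValueError("max_deliver must be greater than len(backoff)")

    # The first delivery is immediate, leaving max_deliver - 1 redeliveries.
    delays = []
    for attempt in range(max_deliver - 1):
        # Assumption: the last interval is reused once the list runs out.
        index = min(attempt, len(backoff) - 1)
        delays.append(backoff[index])
    return delays


if __name__ == "__main__":
    print(redelivery_delays(3, [1, 2]))  # [1, 2]
    print(redelivery_delays(5, [1, 2]))  # [1, 2, 2, 2]
```

With max_deliver=3 and backoff=[1, 2], as in the add_consumer example, this model gives one initial delivery followed by redeliveries after 1 and 2 seconds.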
    

v2.7.0

3 months ago

Added

  • Added support for consumers with multiple filter subjects when using nats-server v2.10+. This is only supported when using the pull_subscribe_bind API:

    await jsm.add_stream(name="multi", subjects=["a", "b", "c.>"])
    cinfo = await jsm.add_consumer(
        "multi",
        name="myconsumer",
        filter_subjects=["a", "b"],
    )
    psub = await js.pull_subscribe_bind("multi", "myconsumer")
    msgs = await psub.fetch(2)
    for msg in msgs:
        await msg.ack()

  • Added subjects_filter option to js.stream_info() API:

    stream = await js.add_stream(name="foo", subjects=["foo.>"])
    for i in range(0, 5):
        await js.publish("foo.%d" % i, b'A')

    si = await js.stream_info("foo", subjects_filter=">")
    print(si.state.subjects)
    # => {'foo.0': 1, 'foo.1': 1, 'foo.2': 1, 'foo.3': 1, 'foo.4': 1}
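
Both filter_subjects and subjects_filter rely on NATS subject matching, where * matches exactly one token and > matches one or more trailing tokens. The following is a minimal sketch of that matching in plain Python, without a server. The functions subject_matches and delivered are hypothetical names used only for illustration and are not part of nats.py.

```python
from typing import List


def subject_matches(pattern: str, subject: str) -> bool:
    """Match a subject against a pattern using `*` and `>` wildcards."""
    p_tokens = pattern.split(".")
    s_tokens = subject.split(".")
    for i, p in enumerate(p_tokens):
        if p == ">":
            # `>` must be the last token and needs at least one token left.
            return i == len(p_tokens) - 1 and len(s_tokens) > i
        if i >= len(s_tokens):
            return False
        if p != "*" and p != s_tokens[i]:
            return False
    return len(p_tokens) == len(s_tokens)


def delivered(filter_subjects: List[str], subject: str) -> bool:
    """True if any of the consumer's filter subjects matches the subject."""
    return any(subject_matches(f, subject) for f in filter_subjects)


if __name__ == "__main__":
    filters = ["a", "b"]  # same filters as the "myconsumer" example
    for subject in ["a", "b", "c.1"]:
        print(subject, delivered(filters, subject))
```

Under this model, a consumer filtering on "a" and "b" would skip messages published to "c.1", even though the stream itself captures "c.>".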

Changed

  • Changed kv.watch default inactive_threshold cleanup timeout to be 5 minutes. It can also be customized by passing inactive_threshold as an argument, in seconds:

    w = await kv.watchall(inactive_threshold=30.0)

  • Changed pull_subscribe_bind first argument to be called consumer instead of durable, since it also supports ephemeral consumers. This should be backwards compatible.

    psub = await js.pull_subscribe_bind(consumer="myconsumer", stream="test")

v2.6.0

6 months ago

Added

  • Added support for ephemeral pull consumers (#412)

Changed

  • Changed default max control line to 4K as in the server since v2.2

Fixed

  • Fixed ordered consumer implementation not being recreated when consumer deleted (#510)
  • Fixed accounting issue with pending data which would have caused slow consumers on ordered consumers using next_msg
  • Fixed subscribe to missing stream not raising NotFoundError (#499)

Full Changelog: https://github.com/nats-io/nats.py/compare/v2.5.0...v2.6.0

v2.5.0

6 months ago

Added

Fixed

New Contributors

Full Changelog: https://github.com/nats-io/nats.py/compare/v2.4.0...v2.5.0

v2.4.0

8 months ago

Fixed

Added

Improved

New Contributors

Full Changelog: https://github.com/nats-io/nats.py/compare/v2.3.1...v2.4.0

v2.3.0

11 months ago
pip install nats-py

Added

Upload example:

import nats
import asyncio

async def main():
    nc = await nats.connect("localhost", name="object.py")
    js = nc.jetstream()
    obs = await js.create_object_store("nats-py-object-store")

    object_name = 'my-file.mp4'
    # Open in binary mode, since the object is not text.
    with open(object_name, 'rb') as f:
        await obs.put(object_name, f)

    await nc.close()

if __name__ == '__main__':
    asyncio.run(main())

Download example:

import nats
import asyncio

async def main():
    nc = await nats.connect("localhost", name="object.py")
    js = nc.jetstream()
    obs = await js.object_store("nats-py-object-store")

    files = await obs.list()
    for f in files:
        print(f.name, "-", f.size)

    object_name = 'my-file.mp4'
    # Write in binary mode so the bytes are stored unchanged.
    with open("copy-"+object_name, 'wb') as f:
        await obs.get(object_name, f)

    await nc.close()

if __name__ == '__main__':
    asyncio.run(main())

Fixed

Changed

Internal changes

New Contributors

v2.2.0

1 year ago

Added

Changed

Fixed

Full Changelog: https://github.com/nats-io/nats.py/compare/v2.1.7...v2.2.0

v2.1.7

1 year ago

Changed

  • Changed nc.request to have more unique inboxes to avoid accidental reuse of response tokens #335

v2.1.6

1 year ago

Fixed

  • Fixed SlowConsumerError that would appear when using async for msg in sub.messages after reaching default pending bytes limit

v2.1.5

1 year ago

Added

  • Added pending_msgs_limit and pending_bytes_limit options, which can now be set for push and pull consumers from JetStream. To disable the limits, use -1:

    # Push Subscriber
    await js.subscribe("push-example", pending_bytes_limit=-1, pending_msgs_limit=-1)

    # Pull Subscriber
    await js.pull_subscribe("pull-example", "durable", pending_msgs_limit=-1, pending_bytes_limit=-1)

  • Added sub.pending_bytes and sub.pending_msgs methods to check the bytes and messages currently buffered by a Subscription
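
As a rough illustration of how pending limits and the -1 setting interact, here is a toy model in plain Python. The PendingBuffer class and its offer method are hypothetical and are not the nats.py Subscription implementation. The sketch assumes a message is rejected once either positive limit has been reached, and that a non-positive limit such as -1 means no limit.

```python
from dataclasses import dataclass


@dataclass
class PendingBuffer:
    """Toy model of a subscription's pending buffer (not a nats.py class)."""

    pending_msgs_limit: int = 4
    pending_bytes_limit: int = 64
    pending_msgs: int = 0
    pending_bytes: int = 0

    def offer(self, payload: bytes) -> bool:
        """Buffer a message; return False if it would be dropped."""
        # Assumption: a non-positive limit (such as -1) disables the check.
        if 0 < self.pending_msgs_limit <= self.pending_msgs:
            return False
        if 0 < self.pending_bytes_limit <= self.pending_bytes:
            return False
        self.pending_msgs += 1
        self.pending_bytes += len(payload)
        return True


if __name__ == "__main__":
    limited = PendingBuffer(pending_msgs_limit=2, pending_bytes_limit=-1)
    print([limited.offer(b"x") for _ in range(3)])  # [True, True, False]

    unlimited = PendingBuffer(pending_msgs_limit=-1, pending_bytes_limit=-1)
    print(all(unlimited.offer(b"x") for _ in range(1000)))  # True
```

In this model the rejected third message corresponds to the slow consumer condition, and the pending_msgs and pending_bytes counters play the role of the values reported by sub.pending_msgs and sub.pending_bytes.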

Fixed

  • Fixed accounting bug when using sub.next_msg which would have caused SlowConsumer errors and dropped messages when reaching the default limit

  • Fixed empty message being returned sometimes when calling sub.next_msg after future was cancelled