Skip to content

v0.5.8

Compare
Choose a tag to compare
@Lancetnik Lancetnik released this 23 May 19:36
· 234 commits to main since this release
82853db

What's Changed

This is the time for a new NATS features! FastStream supports NATS Key-Value and Object Storage subscribption features in a native way now (big thx for @sheldygg)!

  1. KeyValue creation and watching API added (you can read updated documentation section for changes):

     from faststream import FastStream, Logger
     from faststream.nats import NatsBroker
     
     broker = NatsBroker()
     app = FastStream(broker)
     
     @broker.subscriber("some-key", kv_watch="bucket")
     async def handler(msg: int, logger: Logger):
         logger.info(msg)
     
     @app.after_startup
     async def test():
         kv = await broker.key_value("bucket")
         await kv.put("some-key", b"1")
  2. ObjectStore API added as well (you can read updated documentation section for changes):

    from faststream import FastStream, Logger
    from faststream.nats import NatsBroker
    
    broker = NatsBroker()
    app = FastStream(broker)
    
    @broker.subscriber("file-bucket", obj_watch=True)
    async def handler(filename: str, logger: Logger):
        logger.info(filename)
    
    @app.after_startup
    async def test():
        object_store = await broker.object_storage("file-bucket")
        await object_store.put("some-file.txt", b"1")
  3. Also now you can use just pull_sub=True instead of pull_sub=PullSub() in basic case:

     from faststream import FastStream, Logger
     from faststream.nats import NatsBroker
     
     broker = NatsBroker()
     app = FastStream(broker)
     
     @broker.subscriber("test", stream="stream", pull_sub=True)
     async def handler(msg, logger: Logger):
         logger.info(msg)

Finally, we have a new feature, related to all brokers: special flag to suppress automatic RPC and reply_to responses:

@broker.subscriber("tests", no_reply=True)
async def handler():
    ....
 
# will fail with timeout, because there is no automatic response
msg = await broker.publish("msg", "test", rpc=True)

New Contributors

Full Changelog: 0.5.7...0.5.8