Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: Unify the middleware interface between Broker and subscriber/publisher #1646

Open
andreaimprovised opened this issue Aug 6, 2024 · 10 comments · May be fixed by #1779
Open

Feature: Unify the middleware interface between Broker and subscriber/publisher #1646

andreaimprovised opened this issue Aug 6, 2024 · 10 comments · May be fixed by #1779
Assignees
Labels
Core Issues related to core FastStream functionality and affects to all brokers enhancement New feature or request
Milestone

Comments

@andreaimprovised
Copy link
Contributor

andreaimprovised commented Aug 6, 2024

Is your feature request related to a problem? Please describe.

Yes.

The API protocol/types for middleware dare different for Broker/Router instantiation vs. subscriber/publisher instantiation.

These are the current types in faststream/broker/types.py:

BrokerMiddleware: TypeAlias = Callable[[Optional[MsgType]], BaseMiddleware]
SubscriberMiddleware: TypeAlias = Callable[
    [AsyncFuncAny, MsgType],
    MsgType,
]

class PublisherMiddleware(Protocol):
    """Publisher middleware interface."""

    def __call__(
        self,
        call_next: AsyncFunc,
        *__args: Any,
        **__kwargs: Any,
    ) -> Any: ...

The subscriber/publisher middleware API is specific to the consume_scope and publish_scope signatures.

This is a bit surprising. It makes faststream.broker.middleware.BaseMiddleware only work for brokers. As a result, the middleware is not really portable.

Describe the solution you'd like

Make the subscriber/publisher protocols the same as the broker/router protocols.

Or, perhaps, allow both? Or at least document on the website that there's a difference.

Feature code example
This is what I wish were possible.

from faststream import FastStream
from faststream.confluent import KafkaBroker
from faststream.broker.middleware import BaseMiddleware

class CustomMiddleware1(BaseMiddleware):
    pass

class CustomMiddleware2(BaseMiddleware):
    pass

broker = KafkaBroker("localhost:9092", middlewares=[CustomMiddleware1])

app = FastStream(broker)

@broker.subscribe("in-topic", middlewares=[CustomMiddleware2])
async def handle(body: str):
    print(str)

Describe alternatives you've considered

There is an adapter strategy, but it's a little extra effort and more indirection.

from faststream.broker.types import BrokerMiddleware

class SubscriberMiddlewareAdapter:
    def __init__(self, middleware: BrokerMiddleware):
        self.middleware = middleware

    async def __call__(
        self, call_next: AsyncFuncAny, msg: KafkaMessage
    ) -> KafkaMessage:
        async with self.middleware(msg.raw_message) as m:
            return await m.consume_scope(call_next, msg)

Additional context
Include any other relevant context or screenshots related to the feature request.

@andreaimprovised andreaimprovised added the enhancement New feature or request label Aug 6, 2024
@Lancetnik
Copy link
Member

Thank you for the suggestion! I am still designing some breaking changes to release in 0.6.0 and subscriber & publisher middlewares rework is one of them. I'll contact you here, when we have some final view about middlewares.

@Lancetnik Lancetnik added the Core Issues related to core FastStream functionality and affects to all brokers label Aug 8, 2024
@Lancetnik Lancetnik moved this to Todo in FastStream Aug 21, 2024
@Lancetnik Lancetnik added this to the 0.6.0 milestone Aug 21, 2024
@Lancetnik
Copy link
Member

@andreaimprovised what do you think about removing subscriber/publisher middlewares at all?
I think, it will be much simpler, if you will be able to register middlewares only on Broker/Router level.
If you want to register middleware for specific subscriber/publisher - you can create a separated router with custom middleware for it.

@andreaimprovised
Copy link
Contributor Author

I have retry-oriented and timeout-oriented middleware that is specific to the handlers their registered on. I don't know much about making routers, but I'm guessing that's just a group of handlers that could share middleware? I suppose that could be fine, assuming doing that doesn't break any major assumptions elsewhere they I make

@Lancetnik
Copy link
Member

Sure, router is just an object you can you to register subscribers/publishers instead of broker - https://faststream.airt.ai/latest/getting-started/routers/#router-usage

You can use it to apply middlewares for subscriber, registered by this router only as well

router = RabbitRouter(middlewares=[])

@router.subscriber(...)
async def handler(): ...

I think, it is flexible enough API and provides with much explicit, but a little bit verbose way to register middleware for the single subscriber

I plan to deprecate subscriber/publisher middlewares in 0.6 and remove them in 0.7

@andreaimprovised
Copy link
Contributor Author

Yeah that's probably workable, okay.

On a separate note, it's a bit hard to reason about middleware order, generally speaking, across broker, router, and sub/pub. But order is very important.

In your changes, maybe document an official order?

@Lancetnik
Copy link
Member

For sure. Btw, what is the middlewares order you expect in the following case?

router = Router(middlewares=[Middleware2, Middleware4])

@router.subscriber(...)
async def handler(): ...

broker = Broker(routers=[router], middlewares=[Middleware1, Middleware3])

@andreaimprovised

@Lancetnik Lancetnik linked a pull request Dec 7, 2024 that will close this issue
@andreaimprovised
Copy link
Contributor Author

1, 3, 2, 4. More general => runs first

@Lancetnik
Copy link
Member

1, 3, 2, 4. More general => runs first

Correct. So, I think the behavior is clear enough 😄

@andreaimprovised
Copy link
Contributor Author

andreaimprovised commented Dec 7, 2024

Agree, I just don't think it's documented? And it was more like 3, 1, 4, 2 I think only recently until a bug fix last week that got released

@Lancetnik
Copy link
Member

@andreaimprovised indeed, we found such bug accidentally in 0.6 branch and backported the fix to main
It's a pity you didn't report it sooner.

@Lancetnik Lancetnik moved this from Todo to Waiting for merge in FastStream Jan 8, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Core Issues related to core FastStream functionality and affects to all brokers enhancement New feature or request
Projects
Status: Waiting for merge
Development

Successfully merging a pull request may close this issue.

2 participants