Skip to content

Commit

Permalink
fix #1014, feat #1006 (#1015)
Browse files Browse the repository at this point in the history
* fix #1014: remove custom fields if many params

* chore: bump version

* test: fix required missing

* test: fix properties missing

* test: pydanticV1 compatibility
  • Loading branch information
Lancetnik authored Dec 3, 2023
1 parent f74a2c4 commit cfe3666
Show file tree
Hide file tree
Showing 12 changed files with 174 additions and 62 deletions.
2 changes: 1 addition & 1 deletion faststream/__about__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""Simple and fast framework to create message brokers based microservices"""
__version__ = "0.3.0"
__version__ = "0.3.1"


INSTALL_YAML = """
Expand Down
10 changes: 9 additions & 1 deletion faststream/asyncapi/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@ def parse_handler_params(call: CallModel[Any, Any], prefix: str = "") -> Dict[st
The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
"""
body = get_model_schema(
call.model, prefix=prefix, exclude=tuple(call.custom_fields.keys())
call.model,
prefix=prefix,
exclude=tuple(call.custom_fields.keys()),
)

if body is None:
return {"title": "EmptyPayload", "type": "null"}

Expand Down Expand Up @@ -181,6 +184,11 @@ def get_model_schema(
model = call

body: Dict[str, Any] = model_schema(model)
body["properties"] = body.get("properties", {})
for i in exclude:
body["properties"].pop(i, None)
if (required := body.get("required")):
body["required"] = list(filter(lambda x: x not in exclude, required))

if params_number == 1 and not use_original_model:
param_body: Dict[str, Any] = body.get("properties", {})
Expand Down
1 change: 1 addition & 0 deletions faststream/broker/core/asyncronous.py
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,7 @@ def _wrap_handler(
no_ack=no_ack,
_raw=_raw,
_get_dependant=_get_dependant,
_process_kwargs=_process_kwargs,
)

async def _execute_handler(
Expand Down
70 changes: 42 additions & 28 deletions faststream/cli/docs/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ def serve(
"""Serve project AsyncAPI schema"""

if ":" in app:
module, app_obj = import_from_string(app)
raw_schema = get_app_schema(app_obj)
module, _ = import_from_string(app)

module_parent = module.parent
extra_extensions: Sequence[str] = ()
Expand All @@ -50,46 +49,24 @@ def serve(
schema_filepath = module_parent / app
extra_extensions = (schema_filepath.suffix,)

if schema_filepath.suffix == ".json":
data = schema_filepath.read_text()

elif schema_filepath.suffix == ".yaml" or schema_filepath.suffix == ".yml":
try:
import yaml
except ImportError as e: # pragma: no cover
typer.echo(INSTALL_YAML, err=True)
raise typer.Exit(1) from e

with schema_filepath.open("r") as f:
schema = yaml.safe_load(f)

data = json.dumps(schema)

else:
raise ValueError(
f"Unknown extension given - {app}; Please provide app in format [python_module:FastStream] or [asyncapi.yaml/.json] - path to your application or documentation"
)

raw_schema = model_parse(Schema, data)

if reload is True:
try:
from faststream.cli.supervisors.watchfiles import WatchReloader

except ImportError:
warnings.warn(INSTALL_WATCHFILES, category=ImportWarning, stacklevel=1)
serve_app(raw_schema, host, port)
_parse_and_serve(app, host, port)

else:
WatchReloader(
target=serve_app,
args=(raw_schema, host, port),
target=_parse_and_serve,
args=(app, host, port),
reload_dirs=(str(module_parent),),
extra_extensions=extra_extensions,
).run()

else:
serve_app(raw_schema, host, port)
_parse_and_serve(app, host, port)


@docs_app.command(name="gen")
Expand Down Expand Up @@ -133,3 +110,40 @@ def gen(
json.dump(schema, f, indent=2)

typer.echo(f"Your project AsyncAPI scheme was placed to `{name}`")


def _parse_and_serve(
app: str,
host: str = "localhost",
port: int = 8000,
) -> None:
if ":" in app:
_, app_obj = import_from_string(app)
raw_schema = get_app_schema(app_obj)

else:
schema_filepath = Path.cwd() / app

if schema_filepath.suffix == ".json":
data = schema_filepath.read_text()

elif schema_filepath.suffix == ".yaml" or schema_filepath.suffix == ".yml":
try:
import yaml
except ImportError as e: # pragma: no cover
typer.echo(INSTALL_YAML, err=True)
raise typer.Exit(1) from e

with schema_filepath.open("r") as f:
schema = yaml.safe_load(f)

data = json.dumps(schema)

else:
raise ValueError(
f"Unknown extension given - {app}; Please provide app in format [python_module:FastStream] or [asyncapi.yaml/.json] - path to your application or documentation"
)

raw_schema = model_parse(Schema, data)

serve_app(raw_schema, host, port)
3 changes: 2 additions & 1 deletion faststream/rabbit/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from faststream.rabbit.router import RabbitRouter
from faststream.rabbit.shared.constants import ExchangeType
from faststream.rabbit.shared.router import RabbitRoute
from faststream.rabbit.shared.schemas import RabbitExchange, RabbitQueue
from faststream.rabbit.shared.schemas import RabbitExchange, RabbitQueue, ReplyConfig
from faststream.rabbit.test import TestRabbitBroker

__all__ = (
Expand All @@ -13,6 +13,7 @@
"TestApp",
"RabbitExchange",
"RabbitQueue",
"ReplyConfig",
"ExchangeType",
"RabbitRouter",
"RabbitRoute",
Expand Down
19 changes: 16 additions & 3 deletions faststream/rabbit/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from pamqp.common import FieldTable
from yarl import URL

from faststream._compat import override
from faststream._compat import model_to_dict, override
from faststream.broker.core.asyncronous import BrokerAsyncUsecase, default_filter
from faststream.broker.message import StreamMessage
from faststream.broker.middlewares import BaseMiddleware
Expand All @@ -46,6 +46,7 @@
from faststream.rabbit.shared.schemas import (
RabbitExchange,
RabbitQueue,
ReplyConfig,
get_routing_hash,
)
from faststream.rabbit.shared.types import TimeoutType
Expand Down Expand Up @@ -302,6 +303,7 @@ def subscriber( # type: ignore[override]
exchange: Union[str, RabbitExchange, None] = None,
*,
consume_args: Optional[AnyDict] = None,
reply_config: Optional[ReplyConfig] = None,
# broker arguments
dependencies: Sequence[Depends] = (),
parser: Optional[CustomParser[aio_pika.IncomingMessage, RabbitMessage]] = None,
Expand Down Expand Up @@ -383,6 +385,9 @@ def consumer_wrapper(
func,
extra_dependencies=dependencies,
no_ack=no_ack,
_process_kwargs={
"reply_config": reply_config,
},
**original_kwargs,
)

Expand Down Expand Up @@ -481,7 +486,6 @@ async def publish( # type: ignore[override]
Returns:
Union[aiormq.abc.ConfirmationFrameType, SendableMessage]: The confirmation frame or the response message.
"""

assert self._producer, NOT_CONNECTED_YET # nosec B101
return await self._producer.publish(*args, **kwargs)

Expand All @@ -491,6 +495,7 @@ def _process_message(
[StreamMessage[aio_pika.IncomingMessage]], Awaitable[T_HandlerReturn]
],
watcher: Callable[..., AsyncContextManager[None]],
reply_config: Optional[ReplyConfig] = None,
**kwargs: Any,
) -> Callable[
[StreamMessage[aio_pika.IncomingMessage]],
Expand All @@ -507,6 +512,10 @@ def _process_message(
Returns:
Callable: A wrapper function for processing messages.
"""
if reply_config is None:
reply_kwargs = {}
else:
reply_kwargs = model_to_dict(reply_config)

@wraps(func)
async def process_wrapper(
Expand All @@ -532,7 +541,11 @@ async def process_wrapper(
pub_response: Optional[AsyncPublisherProtocol]
if message.reply_to:
pub_response = FakePublisher(
partial(self.publish, routing_key=message.reply_to)
partial(
self.publish,
routing_key=message.reply_to,
**reply_kwargs,
)
)
else:
pub_response = None
Expand Down
4 changes: 3 additions & 1 deletion faststream/rabbit/broker.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ from faststream.rabbit.helpers import RabbitDeclarer
from faststream.rabbit.message import RabbitMessage
from faststream.rabbit.producer import AioPikaFastProducer
from faststream.rabbit.shared.logging import RabbitLoggingMixin
from faststream.rabbit.shared.schemas import RabbitExchange, RabbitQueue
from faststream.rabbit.shared.schemas import RabbitExchange, RabbitQueue, ReplyConfig
from faststream.rabbit.shared.types import TimeoutType
from faststream.rabbit.types import AioPikaSendableMessage
from faststream.types import AnyDict, SendableMessage
Expand Down Expand Up @@ -141,6 +141,7 @@ class RabbitBroker(
exchange: Union[str, RabbitExchange, None] = None,
*,
consume_args: Optional[AnyDict] = None,
reply_config: Optional[ReplyConfig] = None,
# broker arguments
dependencies: Sequence[Depends] = (),
filter: Filter[RabbitMessage] = default_filter,
Expand Down Expand Up @@ -226,6 +227,7 @@ class RabbitBroker(
[StreamMessage[aio_pika.IncomingMessage]], Awaitable[T_HandlerReturn]
],
watcher: Callable[..., AsyncContextManager[None]],
reply_config: Optional[ReplyConfig] = None,
**kwargs: Any,
) -> Callable[
[StreamMessage[aio_pika.IncomingMessage]],
Expand Down
27 changes: 3 additions & 24 deletions faststream/rabbit/fastapi.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,10 @@ from typing import (
Callable,
Dict,
List,
Mapping,
Optional,
Sequence,
Type,
Union,
overload,
)

import aio_pika
Expand All @@ -22,7 +20,7 @@ from fastapi.utils import generate_unique_id
from pamqp.common import FieldTable
from starlette import routing
from starlette.responses import JSONResponse, Response
from starlette.types import AppType, ASGIApp, Lifespan
from starlette.types import ASGIApp, Lifespan
from yarl import URL

from faststream._compat import override
Expand All @@ -41,7 +39,7 @@ from faststream.broker.wrapper import HandlerCallWrapper
from faststream.rabbit.asyncapi import Publisher
from faststream.rabbit.broker import RabbitBroker
from faststream.rabbit.message import RabbitMessage
from faststream.rabbit.shared.schemas import RabbitExchange, RabbitQueue
from faststream.rabbit.shared.schemas import RabbitExchange, RabbitQueue, ReplyConfig
from faststream.rabbit.shared.types import TimeoutType
from faststream.types import AnyDict

Expand Down Expand Up @@ -128,6 +126,7 @@ class RabbitRouter(StreamRouter[IncomingMessage]):
exchange: Union[str, RabbitExchange, None] = None,
*,
consume_args: Optional[AnyDict] = None,
reply_config: Optional[ReplyConfig] = None,
# broker arguments
dependencies: Sequence[params.Depends] = (),
filter: Filter[RabbitMessage] = default_filter,
Expand Down Expand Up @@ -177,26 +176,6 @@ class RabbitRouter(StreamRouter[IncomingMessage]):
user_id: Optional[str] = None,
app_id: Optional[str] = None,
) -> Publisher: ...
@overload
def after_startup(
self,
func: Callable[[AppType], Mapping[str, Any]],
) -> Callable[[AppType], Mapping[str, Any]]: ...
@overload
def after_startup(
self,
func: Callable[[AppType], Awaitable[Mapping[str, Any]]],
) -> Callable[[AppType], Awaitable[Mapping[str, Any]]]: ...
@overload
def after_startup(
self,
func: Callable[[AppType], None],
) -> Callable[[AppType], None]: ...
@overload
def after_startup(
self,
func: Callable[[AppType], Awaitable[None]],
) -> Callable[[AppType], Awaitable[None]]: ...
@override
@staticmethod
def _setup_log_context( # type: ignore[override]
Expand Down
3 changes: 2 additions & 1 deletion faststream/rabbit/router.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ from faststream.broker.wrapper import HandlerCallWrapper
from faststream.rabbit.asyncapi import Publisher
from faststream.rabbit.message import RabbitMessage
from faststream.rabbit.shared.router import RabbitRoute
from faststream.rabbit.shared.schemas import RabbitExchange, RabbitQueue
from faststream.rabbit.shared.schemas import RabbitExchange, RabbitQueue, ReplyConfig
from faststream.rabbit.shared.types import TimeoutType
from faststream.types import AnyDict

Expand Down Expand Up @@ -52,6 +52,7 @@ class RabbitRouter(BrokerRouter[int, aio_pika.IncomingMessage]):
exchange: Union[str, RabbitExchange, None] = None,
*,
consume_args: Optional[AnyDict] = None,
reply_config: Optional[ReplyConfig] = None,
# broker arguments
dependencies: Sequence[Depends] = (),
filter: Filter[RabbitMessage] = default_filter,
Expand Down
8 changes: 8 additions & 0 deletions faststream/rabbit/shared/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
from dataclasses import dataclass, field
from typing import Optional, Pattern

from pydantic import BaseModel

from faststream._compat import PYDANTIC_V2
from faststream.broker.schemas import NameRequired
from faststream.rabbit.shared.constants import ExchangeType
Expand Down Expand Up @@ -226,6 +228,12 @@ def __init__(
)


class ReplyConfig(BaseModel):
mandatory: bool = True
immediate: bool = False
persist: bool = False


def get_routing_hash(
queue: RabbitQueue,
exchange: Optional[RabbitExchange] = None,
Expand Down
Loading

0 comments on commit cfe3666

Please sign in to comment.