Skip to content

Commit

Permalink
Add request_id parameter (#4384)
Browse files Browse the repository at this point in the history
  • Loading branch information
jackgerrits authored Nov 26, 2024
1 parent 1f07e5b commit cf80b1b
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import inspect
import logging
import threading
import uuid
import warnings
from asyncio import CancelledError, Future, Task
from collections.abc import Sequence
Expand Down Expand Up @@ -53,6 +54,7 @@ class PublishMessageEnvelope:
sender: AgentId | None
topic_id: TopicId
metadata: EnvelopeMetadata | None = None
message_id: str


@dataclass(kw_only=True)
Expand Down Expand Up @@ -256,6 +258,7 @@ async def publish_message(
*,
sender: AgentId | None = None,
cancellation_token: CancellationToken | None = None,
message_id: str | None = None,
) -> None:
with self._tracer_helper.trace_block(
"create",
Expand All @@ -268,6 +271,9 @@ async def publish_message(
content = message.__dict__ if hasattr(message, "__dict__") else message
logger.info(f"Publishing message of type {type(message).__name__} to all subscribers: {content}")

if message_id is None:
message_id = str(uuid.uuid4())

# event_logger.info(
# MessageEvent(
# payload=message,
Expand All @@ -285,6 +291,7 @@ async def publish_message(
sender=sender,
topic_id=topic_id,
metadata=get_telemetry_envelope_metadata(),
message_id=message_id,
)
)

Expand Down Expand Up @@ -327,6 +334,8 @@ async def _process_send(self, message_envelope: SendMessageEnvelope) -> None:
topic_id=None,
is_rpc=True,
cancellation_token=message_envelope.cancellation_token,
# Will be fixed when send API removed
message_id="NOT_DEFINED_TODO_FIX",
)
with MessageHandlerContext.populate_context(recipient_agent.id):
response = await recipient_agent.on_message(
Expand Down Expand Up @@ -385,6 +394,7 @@ async def _process_publish(self, message_envelope: PublishMessageEnvelope) -> No
topic_id=message_envelope.topic_id,
is_rpc=False,
cancellation_token=message_envelope.cancellation_token,
message_id=message_envelope.message_id,
)
agent = await self._get_agent(agent_id)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import json
import logging
import signal
import uuid
import warnings
from asyncio import Future, Task
from collections import defaultdict
Expand Down Expand Up @@ -371,11 +372,17 @@ async def publish_message(
*,
sender: AgentId | None = None,
cancellation_token: CancellationToken | None = None,
message_id: str | None = None,
) -> None:
if not self._running:
raise ValueError("Runtime must be running when publishing message.")
if self._host_connection is None:
raise RuntimeError("Host connection is not set.")
if message_id is None:
message_id = str(uuid.uuid4())

# TODO: consume message_id

message_type = self._serialization_registry.type_name(message)
with self._trace_helper.trace_block(
"create", topic_id, parent=None, extraAttributes={"message_type": message_type}
Expand Down Expand Up @@ -447,6 +454,7 @@ async def _process_request(self, request: agent_worker_pb2.RpcRequest) -> None:
topic_id=None,
is_rpc=True,
cancellation_token=CancellationToken(),
message_id=request.request_id,
)

# Call the receiving agent.
Expand Down Expand Up @@ -530,11 +538,13 @@ async def _process_event(self, event: agent_worker_pb2.Event) -> None:
for agent_id in recipients:
if agent_id == sender:
continue
# TODO: consume message_id
message_context = MessageContext(
sender=sender,
topic_id=topic_id,
is_rpc=False,
cancellation_token=CancellationToken(),
message_id="NOT_DEFINED_TODO_FIX",
)
agent = await self._get_agent(agent_id)
with MessageHandlerContext.populate_context(agent.id):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ async def publish_message(
*,
sender: AgentId | None = None,
cancellation_token: CancellationToken | None = None,
message_id: str | None = None,
) -> None:
"""Publish a message to all agents in the given namespace, or if no namespace is provided, the namespace of the sender.
Expand All @@ -64,7 +65,8 @@ async def publish_message(
message (Any): The message to publish.
topic (TopicId): The topic to publish the message to.
sender (AgentId | None, optional): The agent which sent the message. Defaults to None.
cancellation_token (CancellationToken | None, optional): Token used to cancel an in progress . Defaults to None.
cancellation_token (CancellationToken | None, optional): Token used to cancel an in progress. Defaults to None.
message_id (str | None, optional): The message id. If None, a new message id will be generated. Defaults to None. This message id must be unique. and is recommended to be a UUID.
Raises:
UndeliverableException: If the message cannot be delivered.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ class MessageContext:
topic_id: TopicId | None
is_rpc: bool
cancellation_token: CancellationToken
message_id: str

0 comments on commit cf80b1b

Please sign in to comment.