I have been exploring Semantic Kernel's Process framework in Python, and have been attempting to create something similar to the dotnet Example Step 04. My understanding from the code is that
I have attempted to follow the same process in Python, but I am unable to add the AgentGroupChat to the kernel through the add_service() method. When I attempt to do this, I get the below error:
I am also unable to give my AgentGroupChat a service_id, which I believe would resolve this issue. Is my thought process on the implementation of this correct, or should I be implementing this in a different manner in Python? Thanks in advance for any advice. Below is an extract of my kernel setup process where I am trying to add the AgentGroupChat as a service to my kernel:
Replies: 1 comment
Hi @FredMCA, thank you for your question. We do have work on the SK Python side to get to sample parity with .NET, as you've pointed out. It's great you've been looking into it on your side.

The first thing to call out is the difference in the ability to use dependency injection in .NET versus Python. The Step04 process sample in .NET makes heavy use of dependency injection to retrieve registered resources. In Python, to add a service to the kernel, it needs to be of type `AIServiceClientBase`, which `AgentGroupChat` currently is not.

The way we can handle passing in the necessary dependencies is through creating a custom state for the step.

As shown in the following concept sample, the step itself has a parameterless constructor, but we are able to create a custom state that we can use as we run the process. The reason, right now, that we need a parameterless constructor, and that we handle things through state, is because when we create the process, we are registering the steps

The code looks like:

```python
# Define a sample `CStepState` that will keep track of the current cycle.
class CStepState:
    current_cycle: int = 0


# Define a sample `CStep` step that will emit an `ExitRequested` event after 3 cycles.
class CStep(KernelProcessStep[CStepState]):
    state: CStepState = Field(default_factory=CStepState)

    # The activate method overrides the base class method to set the state in the step.
    async def activate(self, state: KernelProcessStepState[CStepState]):
        """Activates the step and sets the state."""
        self.state = state.state

    @kernel_function()
    async def do_it(self, context: KernelProcessStepContext, astepdata: str, bstepdata: str):
        self.state.current_cycle += 1
        print(f"CStep Current Cycle: {self.state.current_cycle}")
        if self.state.current_cycle == 3:
            print("CStep Exit Requested")
            await context.emit_event(process_event=CommonEvents.ExitRequested)
            return
        await context.emit_event(process_event=CommonEvents.CStepDone)
```

In helping to answer your question, I started playing around with an

```python
# Copyright (c) Microsoft. All rights reserved.

import asyncio
from enum import Enum

from pydantic import Field

from semantic_kernel.agents import AgentGroupChat, ChatCompletionAgent
from semantic_kernel.connectors.ai.open_ai import AzureChatCompletion
from semantic_kernel.contents import ChatHistory, ChatMessageContent
from semantic_kernel.exceptions import KernelException
from semantic_kernel.functions.kernel_function_decorator import kernel_function
from semantic_kernel.kernel import Kernel
from semantic_kernel.kernel_pydantic import KernelBaseModel
from semantic_kernel.processes import ProcessBuilder
from semantic_kernel.processes.kernel_process import (
    KernelProcess,
    KernelProcessStep,
    KernelProcessStepContext,
    KernelProcessStepState,
)
from semantic_kernel.processes.local_runtime.local_event import KernelProcessEvent
from semantic_kernel.processes.local_runtime.local_kernel_process import start


class ChatHistoryProvider:
    """A Sample Chat History provider."""

    def __init__(self):
        self.messages: list[ChatMessageContent] = []

    async def get_history(self) -> list[ChatMessageContent]:
        return self.messages

    async def add_message(self, message: ChatMessageContent):
        self.messages.append(message)

    async def commit(self):
        pass


###############################################################################
# Define events for the agent orchestration
###############################################################################
class AgentOrchestrationEvents:
    StartProcess = "StartProcess"
    AgentResponse = "AgentResponse"
    AgentResponded = "AgentResponded"
    AgentWorking = "AgentWorking"
    GroupInput = "GroupInput"
    GroupMessage = "GroupMessage"
    GroupCompleted = "GroupCompleted"


class CommonEvents:
    UserInputReceived = "UserInputReceived"
    UserInputComplete = "UserInputComplete"


###############################################################################
# Steps
###############################################################################
class ScriptedUserInputState(KernelBaseModel):
    user_inputs: list[str] = Field(default_factory=list)
    current_index: int = 0


# We define the KernelProcessStep of type `ScriptedUserInputState`
class ScriptedUserInputStep(KernelProcessStep[ScriptedUserInputState]):
    class Functions(Enum):
        GetUserInput = "get_user_input"

    # We define the KernelProcessStepState of type `ScriptedUserInputState` as well
    async def activate(self, state: KernelProcessStepState[ScriptedUserInputState]):
        # You could also define the state as a Pydantic field on the class if you'd like
        if state.state is None:
            state.state = ScriptedUserInputState()
        self.state = state.state
        self.populate_user_inputs()

    def populate_user_inputs(self):
        """Override to fill user_inputs with some scripted data for demonstration."""
        self.state.user_inputs.append("Hello agent")
        self.state.user_inputs.append("Tell me about upcoming events")
        self.state.user_inputs.append("Ok, let's do group chat now")
        self.state.user_inputs.append("exit")

    @kernel_function(name=Functions.GetUserInput.value)
    async def get_user_input(self, context: KernelProcessStepContext):
        if self.state.current_index >= len(self.state.user_inputs):
            # If no more input, let's end
            await context.emit_event(CommonEvents.UserInputComplete)
            return

        user_msg = self.state.user_inputs[self.state.current_index]
        self.state.current_index += 1

        if "exit" in user_msg.lower():
            await context.emit_event(CommonEvents.UserInputComplete)
            return

        # user typed something
        await context.emit_event(CommonEvents.UserInputReceived, data=user_msg)


###############################################################################
# Define the example Manager Agent State and Step
###############################################################################
kernel = Kernel()
kernel.add_service(AzureChatCompletion(service_id="default"))

chat_completion_agent = ChatCompletionAgent(
    name="Manager",
    instructions="Manager instructions: do not solve user request fully, delegate to group if needed.",
    kernel=kernel,
)

chat_history_provider = ChatHistoryProvider()


class ManagerAgentState(KernelBaseModel):
    # Create a sample Pydantic class to manage the state
    agent: ChatCompletionAgent = Field(default_factory=lambda: chat_completion_agent)
    history_provider: ChatHistoryProvider = Field(default_factory=lambda: chat_history_provider)


# Define the class as a KernelProcessStep of type `ManagerAgentState`
class ManagerAgentStep(KernelProcessStep[ManagerAgentState]):
    # State is defined as a Field on the model
    state: ManagerAgentState = Field(default_factory=ManagerAgentState)

    class Functions(Enum):
        InvokeAgent = "InvokeAgent"
        InvokeGroup = "InvokeGroup"
        ReceiveResponse = "ReceiveResponse"

    async def activate(self, state: KernelProcessStepState[ManagerAgentState]):
        """Activates the step and sets the state."""
        # Initial state is replaced with the initial_state at runtime
        self.state = state.state

    @kernel_function(name=Functions.InvokeAgent.value)
    async def invoke_agent(self, context: KernelProcessStepContext, user_input: str):
        # Add user input to chat history
        await self.state.history_provider.add_message(ChatMessageContent(role="user", content=user_input))

        # Do agent invocation
        msgs = await self.state.history_provider.get_history()
        chat_history = ChatHistory(messages=msgs)
        async for response in self.state.agent.invoke(chat_history):
            await self.state.history_provider.add_message(response)
            # Emit event for each agent response
            await context.emit_event(AgentOrchestrationEvents.AgentResponse, data=response)

        if "group chat" in user_input.lower():
            await context.emit_event(AgentOrchestrationEvents.AgentWorking)
        else:
            await context.emit_event(AgentOrchestrationEvents.AgentResponded)

    @kernel_function(name=Functions.InvokeGroup.value)
    async def invoke_group(self, context: KernelProcessStepContext):
        # Summarize conversation into something for group
        # (Placeholder: just read last user message)
        msgs = await self.state.history_provider.get_history()
        summary = msgs[-1].content if msgs else "No conversation so far"
        await context.emit_event(AgentOrchestrationEvents.GroupInput, data=summary)

    @kernel_function(name=Functions.ReceiveResponse.value)
    async def receive_response(self, context: KernelProcessStepContext, response: str):
        # Add message to main agent's chat so it sees the group result
        msg = ChatMessageContent(role="assistant", content=response, name="GroupChat")
        await self.state.history_provider.add_message(msg)

        # Emit the new agent response, then tell the system we're done with the group step
        await context.emit_event(AgentOrchestrationEvents.AgentResponse, data=msg)
        await context.emit_event(AgentOrchestrationEvents.AgentResponded)


###############################################################################
# Define the Agent Group Chat State and Step
###############################################################################
# You could create more agents here, a selection strategy, and a termination strategy if desired
agent_group_chat = AgentGroupChat(agents=[chat_completion_agent])


class AgentGroupChatState(KernelBaseModel):
    group_chat: AgentGroupChat = Field(default_factory=lambda: agent_group_chat)


class AgentGroupChatStep(KernelProcessStep[AgentGroupChatState]):
    state: AgentGroupChatState = Field(default_factory=AgentGroupChatState)

    class Functions(Enum):
        InvokeAgentGroup = "InvokeAgentGroup"

    async def activate(self, state: KernelProcessStepState[AgentGroupChatState]):
        """Activates the step and sets the state."""
        self.state = state.state

    @kernel_function(name=Functions.InvokeAgentGroup.value)
    async def invoke_agent_group(self, context: KernelProcessStepContext, input: str):
        # Start group chat with the summary as "assistant" message, could be user, if needed
        await self.state.group_chat.add_chat_message(ChatMessageContent(role="assistant", content=input))

        # Let the group chat produce some responses
        responses: list[ChatMessageContent] = []
        async for response in self.state.group_chat.invoke():
            # For each partial response:
            await context.emit_event(AgentOrchestrationEvents.GroupMessage, data=response)
            responses.append(response)

        # Summarize group chat as final data
        # (Placeholder logic: just combine all final responses)
        final_text = " | ".join(r.content for r in responses)
        await context.emit_event(AgentOrchestrationEvents.GroupCompleted, data=final_text)


###############################################################################
# Define the Render Message Step
###############################################################################
class RenderMessageStep(KernelProcessStep):
    class Functions(Enum):
        RenderDone = "RenderDone"
        RenderError = "RenderError"
        RenderUserText = "RenderUserText"
        RenderMessage = "RenderMessage"
        RenderInnerMessage = "RenderInnerMessage"

    @kernel_function(name=Functions.RenderDone.value)
    async def render_done(self):
        print("[Process] DONE!")

    @kernel_function(name=Functions.RenderError.value)
    async def render_error(self, error: KernelException):
        print(f"[Process] ERROR: {str(error)}")

    @kernel_function(name=Functions.RenderUserText.value)
    async def render_user_text(self, user_text: str):
        print(f"USER: {user_text}")

    @kernel_function(name=Functions.RenderMessage.value)
    async def render_message(self, message: ChatMessageContent):
        print(f"{message.role.upper()}({message.name}): {message.content}")

    @kernel_function(name=Functions.RenderInnerMessage.value)
    async def render_inner_message(self, message: ChatMessageContent):
        print(f"INNER-{message.role.upper()}({message.name}): {message.content}")


###############################################################################
# Building the process
###############################################################################
def build_agent_orchestration_process() -> KernelProcess:
    process = ProcessBuilder(name="AgentGroupChatProcess")

    # Steps
    user_input_step = process.add_step(ScriptedUserInputStep, name="UserInputStep")
    manager_step = process.add_step(
        ManagerAgentStep,
        name="ManagerAgentStep",
        initial_state=ManagerAgentState(agent=chat_completion_agent, history_provider=chat_history_provider),
    )
    group_step = process.add_step(
        AgentGroupChatStep, name="AgentGroupStep", initial_state=AgentGroupChatState(group_chat=agent_group_chat)
    )
    render_step = process.add_step(RenderMessageStep, name="RenderMessageStep")

    # 1) Start Process => go to user input
    process.on_input_event(AgentOrchestrationEvents.StartProcess).send_event_to(user_input_step)

    # 2) UserInput => on CommonEvents.UserInputReceived => managerStep.InvokeAgent
    user_input_step.on_event(CommonEvents.UserInputReceived).send_event_to(
        manager_step, function_name=ManagerAgentStep.Functions.InvokeAgent, parameter_name="user_input"
    )

    # 2a) Also pass user input to renderUserText
    user_input_step.on_event(CommonEvents.UserInputReceived).send_event_to(
        render_step, function_name=RenderMessageStep.Functions.RenderUserText, parameter_name="user_text"
    )

    # 2b) If user is done, we show "done" and stop
    user_input_step.on_event(CommonEvents.UserInputComplete).send_event_to(
        render_step, function_name=RenderMessageStep.Functions.RenderDone
    ).stop_process()

    # 3) Manager => AgentResponse => render
    manager_step.on_event(AgentOrchestrationEvents.AgentResponse).send_event_to(
        render_step, function_name=RenderMessageStep.Functions.RenderMessage, parameter_name="message"
    )

    # 4) Manager => AgentResponded => go back to user input
    manager_step.on_event(AgentOrchestrationEvents.AgentResponded).send_event_to(user_input_step)

    # 5) Manager => AgentWorking => managerStep.InvokeGroup
    manager_step.on_event(AgentOrchestrationEvents.AgentWorking).send_event_to(
        manager_step, function_name=ManagerAgentStep.Functions.InvokeGroup
    )

    # 6) Manager => GroupInput => groupStep.InvokeAgentGroup
    manager_step.on_event(AgentOrchestrationEvents.GroupInput).send_event_to(
        group_step, function_name=AgentGroupChatStep.Functions.InvokeAgentGroup, parameter_name="input"
    )

    # 7) Group => GroupMessage => render as "inner message"
    group_step.on_event(AgentOrchestrationEvents.GroupMessage).send_event_to(
        render_step, function_name=RenderMessageStep.Functions.RenderInnerMessage, parameter_name="message"
    )

    # 8) Group => GroupCompleted => managerStep.ReceiveResponse
    group_step.on_event(AgentOrchestrationEvents.GroupCompleted).send_event_to(
        manager_step, function_name=ManagerAgentStep.Functions.ReceiveResponse, parameter_name="response"
    )

    return process.build()


###############################################################################
# Run the process
###############################################################################
async def main():
    # Define and build the process
    kernel_process = build_agent_orchestration_process()

    # Run the local process
    async with await start(
        process=kernel_process,
        kernel=kernel,
        initial_event=KernelProcessEvent(id=AgentOrchestrationEvents.StartProcess),
    ) as process_context:
        process_state = await process_context.get_state()
        step_state: KernelProcessStepState[AgentGroupChatState] = next(
            (s.state for s in process_state.steps if s.state.name == "AgentGroupStep"), None
        )
        assert step_state.state  # nosec

        # As an example, print the AgentGroupStep history
        print("*" * 60)
        print("AgentGroupStep history:")
        for msg in step_state.state.group_chat.history.messages:
            print(f"  {msg.role}: {msg.content}")


if __name__ == "__main__":
    asyncio.run(main())
```

This does work, in a very basic sense, to hopefully show you how you can handle state and dependencies for steps. The output may look something like:
As a big note: this sample works fine in a local runtime scenario. I have not tested it in a distributed runtime scenario like Dapr. Some things may need to change, I expect.
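
If it helps to see the state-based dependency idea in isolation, here is a minimal sketch in plain Python with no Semantic Kernel imports. Every name in it (`GroupChat`, `GroupChatState`, `GroupChatStep`, `build_and_run`) is hypothetical and made up for illustration; it is not how the SK runtime is implemented. It only mimics the general shape described above: the step is created by type with a parameterless constructor, and the dependency arrives through the state passed to `activate`.

```python
from dataclasses import dataclass, field
from typing import Optional


class GroupChat:
    """Hypothetical stand-in for a dependency such as an agent group chat."""

    def __init__(self) -> None:
        self.history: list[str] = []

    def add_message(self, message: str) -> None:
        self.history.append(message)


@dataclass
class GroupChatState:
    """Custom state object that carries the dependency into the step."""

    group_chat: GroupChat = field(default_factory=GroupChat)


class GroupChatStep:
    """Step with a parameterless constructor; dependencies arrive via state."""

    def __init__(self) -> None:
        self.state: Optional[GroupChatState] = None

    def activate(self, state: GroupChatState) -> None:
        # Replace whatever default the step had with the state supplied at runtime.
        self.state = state

    def run(self, text: str) -> int:
        assert self.state is not None, "activate() must be called first"
        self.state.group_chat.add_message(text)
        return len(self.state.group_chat.history)


def build_and_run(step_type: type, initial_state: GroupChatState) -> GroupChatStep:
    """Hypothetical runtime: instantiate the step by type, then inject the state."""
    step = step_type()  # no constructor arguments can be passed at this point
    step.activate(initial_state)
    return step


# The caller owns the dependency and hands it over inside the initial state.
shared_chat = GroupChat()
step = build_and_run(GroupChatStep, GroupChatState(group_chat=shared_chat))
step.run("hello")
step.run("world")
print(shared_chat.history)  # ['hello', 'world']
```

Because the caller keeps its own reference to `shared_chat`, it can inspect the history after the step has run, which is the same reason the sample above can print the `AgentGroupStep` history at the end.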