Move tool use with intervention to cookbook; remove duplicated examples (#658)

This commit is contained in:
Eric Zhu 2024-09-27 20:57:44 -07:00 committed by GitHub
parent 4250c0619a
commit da246ef71b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 287 additions and 739 deletions

View File

@@ -0,0 +1,284 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# User Approval for Tool Execution using Intervention Handler\n",
"\n",
"This cookbook shows how to intercept the tool execution using\n",
"an intervention hanlder, and prompt the user for permission to execute the tool."
]
},
{
"cell_type": "code",
"execution_count": 27,
"metadata": {},
"outputs": [],
"source": [
"from dataclasses import dataclass\n",
"from typing import Any, List\n",
"\n",
"from autogen_core.application import SingleThreadedAgentRuntime\n",
"from autogen_core.base import AgentId, AgentType, MessageContext\n",
"from autogen_core.base.intervention import DefaultInterventionHandler, DropMessage\n",
"from autogen_core.components import FunctionCall, RoutedAgent, message_handler\n",
"from autogen_core.components.code_executor import DockerCommandLineCodeExecutor\n",
"from autogen_core.components.models import (\n",
" ChatCompletionClient,\n",
" LLMMessage,\n",
" OpenAIChatCompletionClient,\n",
" SystemMessage,\n",
" UserMessage,\n",
")\n",
"from autogen_core.components.tool_agent import ToolAgent, ToolException, tool_agent_caller_loop\n",
"from autogen_core.components.tools import PythonCodeExecutionTool, ToolSchema"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's define a simple message type that carries a string content."
]
},
{
"cell_type": "code",
"execution_count": 28,
"metadata": {},
"outputs": [],
"source": [
"@dataclass\n",
"class Message:\n",
" content: str"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's create a simple tool use agent that is capable of using tools through a\n",
"{py:class}`~autogen_core.components.tool_agent.ToolAgent`."
]
},
{
"cell_type": "code",
"execution_count": 29,
"metadata": {},
"outputs": [],
"source": [
"class ToolUseAgent(RoutedAgent):\n",
" \"\"\"An agent that uses tools to perform tasks. It executes the tools\n",
" by itself by sending the tool execution task to a ToolAgent.\"\"\"\n",
"\n",
" def __init__(\n",
" self,\n",
" description: str,\n",
" system_messages: List[SystemMessage],\n",
" model_client: ChatCompletionClient,\n",
" tool_schema: List[ToolSchema],\n",
" tool_agent_type: AgentType,\n",
" ) -> None:\n",
" super().__init__(description)\n",
" self._model_client = model_client\n",
" self._system_messages = system_messages\n",
" self._tool_schema = tool_schema\n",
" self._tool_agent_id = AgentId(type=tool_agent_type, key=self.id.key)\n",
"\n",
" @message_handler\n",
" async def handle_user_message(self, message: Message, ctx: MessageContext) -> Message:\n",
" \"\"\"Handle a user message, execute the model and tools, and returns the response.\"\"\"\n",
" session: List[LLMMessage] = [UserMessage(content=message.content, source=\"User\")]\n",
" # Use the tool agent to execute the tools, and get the output messages.\n",
" output_messages = await tool_agent_caller_loop(\n",
" self,\n",
" tool_agent_id=self._tool_agent_id,\n",
" model_client=self._model_client,\n",
" input_messages=session,\n",
" tool_schema=self._tool_schema,\n",
" cancellation_token=ctx.cancellation_token,\n",
" )\n",
" # Extract the final response from the output messages.\n",
" final_response = output_messages[-1].content\n",
" assert isinstance(final_response, str)\n",
" return Message(content=final_response)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The tool use agent sends tool call requests to the tool agent to execute tools,\n",
"so we can intercept the messages sent by the tool use agent to the tool agent\n",
"to prompt the user for permission to execute the tool.\n",
"\n",
"Let's create an intervention handler that intercepts the messages and prompts\n",
"user for before allowing the tool execution."
]
},
{
"cell_type": "code",
"execution_count": 30,
"metadata": {},
"outputs": [],
"source": [
"class ToolInterventionHandler(DefaultInterventionHandler):\n",
" async def on_send(self, message: Any, *, sender: AgentId | None, recipient: AgentId) -> Any | type[DropMessage]:\n",
" if isinstance(message, FunctionCall):\n",
" # Request user prompt for tool execution.\n",
" user_input = input(\n",
" f\"Function call: {message.name}\\nArguments: {message.arguments}\\nDo you want to execute the tool? (y/n): \"\n",
" )\n",
" if user_input.strip().lower() != \"y\":\n",
" raise ToolException(content=\"User denied tool execution.\", call_id=message.id)\n",
" return message"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now, we can create a runtime with the intervention handler registered."
]
},
{
"cell_type": "code",
"execution_count": 31,
"metadata": {},
"outputs": [],
"source": [
"# Create the runtime with the intervention handler.\n",
"runtime = SingleThreadedAgentRuntime(intervention_handlers=[ToolInterventionHandler()])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"In this example, we will use a tool for Python code execution.\n",
"First, we create a Docker-based command-line code executor\n",
"using {py:class}`~autogen_core.components.code_executor.DockerCommandLineCodeExecutor`,\n",
"and then use it to instantiate a built-in Python code execution tool\n",
"{py:class}`~autogen_core.components.tools.PythonCodeExecutionTool`\n",
"that runs code in a Docker container."
]
},
{
"cell_type": "code",
"execution_count": 32,
"metadata": {},
"outputs": [],
"source": [
"# Create the docker executor for the Python code execution tool.\n",
"docker_executor = DockerCommandLineCodeExecutor()\n",
"\n",
"# Create the Python code execution tool.\n",
"python_tool = PythonCodeExecutionTool(executor=docker_executor)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Register the agents with tools and tool schema."
]
},
{
"cell_type": "code",
"execution_count": 33,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"AgentType(type='tool_enabled_agent')"
]
},
"execution_count": 33,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Register agents.\n",
"tool_agent_type = await ToolAgent.register(\n",
" runtime,\n",
" \"tool_executor_agent\",\n",
" lambda: ToolAgent(\n",
" description=\"Tool Executor Agent\",\n",
" tools=[python_tool],\n",
" ),\n",
")\n",
"await ToolUseAgent.register(\n",
" runtime,\n",
" \"tool_enabled_agent\",\n",
" lambda: ToolUseAgent(\n",
" description=\"Tool Use Agent\",\n",
" system_messages=[SystemMessage(\"You are a helpful AI Assistant. Use your tools to solve problems.\")],\n",
" model_client=OpenAIChatCompletionClient(model=\"gpt-4o-mini\"),\n",
" tool_schema=[python_tool.schema],\n",
" tool_agent_type=tool_agent_type,\n",
" ),\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Run the agents by starting the runtime and sending a message to the tool use agent.\n",
"The intervention handler will prompt you for permission to execute the tool."
]
},
{
"cell_type": "code",
"execution_count": 34,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"The output of the code is: **Hello, World!**\n"
]
}
],
"source": [
"# Start the runtime and the docker executor.\n",
"await docker_executor.start()\n",
"runtime.start()\n",
"\n",
"# Send a task to the tool user.\n",
"response = await runtime.send_message(\n",
" Message(\"Run the following Python code: print('Hello, World!')\"), AgentId(\"tool_enabled_agent\", \"default\")\n",
")\n",
"print(response.content)\n",
"\n",
"# Stop the runtime and the docker executor.\n",
"await runtime.stop()\n",
"await docker_executor.stop()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": ".venv",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.9"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
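
Worth noting: the cookbook imports DropMessage but never uses it; denial is signaled by raising ToolException so the caller loop records a failed tool call. A minimal sketch (an illustration, not part of this commit, assuming the same on_send contract as above) of the drop-based alternative:

from typing import Any

from autogen_core.base import AgentId
from autogen_core.base.intervention import DefaultInterventionHandler, DropMessage
from autogen_core.components import FunctionCall


class DroppingInterventionHandler(DefaultInterventionHandler):
    """Drops denied tool calls instead of raising ToolException."""

    async def on_send(self, message: Any, *, sender: AgentId | None, recipient: AgentId) -> Any | type[DropMessage]:
        if isinstance(message, FunctionCall):
            user_input = input(f"Execute {message.name}({message.arguments})? (y/n): ")
            if user_input.strip().lower() != "y":
                # Returning the DropMessage sentinel discards the message;
                # the sender's pending send_message call will fail rather
                # than receive a function execution result.
                return DropMessage
        return message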

View File

@@ -90,6 +90,7 @@ guides/multi-agent-debate
cookbook/azure-openai-with-aad-auth
cookbook/termination-with-intervention
cookbook/tool-use-with-intervention
cookbook/extracting-results-with-an-agent
cookbook/openai-assistant-agent
cookbook/langgraph-agent

View File

@@ -1,13 +1,11 @@
# Examples
This directory contains examples of how to use AutoGen core.
See [user guide](../docs/src/core-user-guide/guides/) and
[cookbooks](../docs/src/core-user-guide/cookbook/) for more examples.
See [Running the examples](#running-the-examples) for instructions on how to run the examples.
- [`coding_pub_sub.py`](coding_pub_sub.py): a code execution example with two agents, one for calling tools and one for executing them, to demonstrate tool use and reflection on tool use. This example uses broadcast communication.
- [`coding_direct_with_intercept.py`](coding_direct_with_intercept.py): an example showing human-in-the-loop for approving or denying tool execution.
- [`assistant.py`](assistant.py): a demonstration of how to use the OpenAI Assistant API to create
a ChatGPT agent.
- [`chess_game.py`](chess_game.py): an example with two chess player agents that execute their own tools to demonstrate tool use and reflection on tool use.
- [`slow_human_in_loop.py`](slow_human_in_loop.py): an example of human-in-the-loop that waits for human input before making the tool call.

View File

@@ -1,250 +0,0 @@
"""This is an example of a terminal-based ChatGPT clone
using an OpenAIAssistantAgent and event-based orchestration."""
import argparse
import asyncio
import logging
import os
import re
from typing import List
import aiofiles
import openai
from autogen_core.application import SingleThreadedAgentRuntime
from autogen_core.base import AgentId, AgentInstantiationContext, AgentRuntime, MessageContext
from autogen_core.components import DefaultSubscription, DefaultTopicId, RoutedAgent, message_handler
from autogen_core.components.model_context import BufferedChatCompletionContext
from common.agents import OpenAIAssistantAgent
from common.patterns._group_chat_manager import GroupChatManager
from common.types import PublishNow, TextMessage
from openai import AsyncAssistantEventHandler
from openai.types.beta.thread import ToolResources
from openai.types.beta.threads import Message, Text, TextDelta
from openai.types.beta.threads.runs import RunStep, RunStepDelta
from typing_extensions import override
sep = "-" * 50
class UserProxyAgent(RoutedAgent):
def __init__( # type: ignore
self,
client: openai.AsyncClient, # type: ignore
assistant_id: str,
thread_id: str,
vector_store_id: str,
) -> None: # type: ignore
super().__init__(
description="A human user",
) # type: ignore
self._client = client
self._assistant_id = assistant_id
self._thread_id = thread_id
self._vector_store_id = vector_store_id
@message_handler() # type: ignore
async def on_text_message(self, message: TextMessage, ctx: MessageContext) -> None:
# TODO: render image if message has image.
# print(f"{message.source}: {message.content}")
pass
async def _get_user_input(self, prompt: str) -> str:
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, input, prompt)
@message_handler() # type: ignore
async def on_publish_now(self, message: PublishNow, ctx: MessageContext) -> None:
while True:
user_input = await self._get_user_input(f"\n{sep}\nYou: ")
# Parse upload file command '[upload code_interpreter | file_search filename]'.
match = re.search(r"\[upload\s+(code_interpreter|file_search)\s+(.+)\]", user_input)
if match:
# Purpose of the file.
purpose = match.group(1)
# Extract file path.
file_path = match.group(2)
if not os.path.exists(file_path):
print(f"File not found: {file_path}")
continue
# Filename.
file_name = os.path.basename(file_path)
# Read file content.
async with aiofiles.open(file_path, "rb") as f:
file_content = await f.read()
if purpose == "code_interpreter":
# Upload file.
file = await self._client.files.create(file=(file_name, file_content), purpose="assistants")
# Get existing file ids from tool resources.
thread = await self._client.beta.threads.retrieve(thread_id=self._thread_id)
tool_resources: ToolResources = thread.tool_resources if thread.tool_resources else ToolResources()
assert tool_resources.code_interpreter is not None
if tool_resources.code_interpreter.file_ids:
file_ids = tool_resources.code_interpreter.file_ids + [file.id]
else:
file_ids = [file.id]
# Update thread with new file.
await self._client.beta.threads.update(
thread_id=self._thread_id,
tool_resources={"code_interpreter": {"file_ids": file_ids}},
)
elif purpose == "file_search":
# Upload file to vector store.
file_batch = await self._client.beta.vector_stores.file_batches.upload_and_poll(
vector_store_id=self._vector_store_id,
files=[(file_name, file_content)],
)
assert file_batch.status == "completed"
print(f"Uploaded file: {file_name}")
continue
elif user_input.startswith("[upload"):
print("Invalid upload command. Please use '[upload code_interpreter | file_search filename]'.")
continue
elif user_input.strip().lower() == "exit":
# Exit handler.
return
else:
# Publish user input and exit handler.
await self.publish_message(
TextMessage(content=user_input, source=self.metadata["type"]), topic_id=DefaultTopicId()
)
return
class EventHandler(AsyncAssistantEventHandler):
@override
async def on_text_delta(self, delta: TextDelta, snapshot: Text) -> None:
print(delta.value, end="", flush=True)
@override
async def on_run_step_created(self, run_step: RunStep) -> None:
details = run_step.step_details
if details.type == "tool_calls":
for tool in details.tool_calls:
if tool.type == "code_interpreter":
print("\nGenerating code to interpret:\n\n```python")
@override
async def on_run_step_done(self, run_step: RunStep) -> None:
details = run_step.step_details
if details.type == "tool_calls":
for tool in details.tool_calls:
if tool.type == "code_interpreter":
print("\n```\nExecuting code...")
@override
async def on_run_step_delta(self, delta: RunStepDelta, snapshot: RunStep) -> None:
details = delta.step_details
if details is not None and details.type == "tool_calls":
for tool in details.tool_calls or []:
if tool.type == "code_interpreter" and tool.code_interpreter and tool.code_interpreter.input:
print(tool.code_interpreter.input, end="", flush=True)
@override
async def on_message_created(self, message: Message) -> None:
print(f"{sep}\nAssistant:\n")
@override
async def on_message_done(self, message: Message) -> None:
# print a citation to the file searched
if not message.content:
return
content = message.content[0]
if not content.type == "text":
return
text_content = content.text
annotations = text_content.annotations
citations: List[str] = []
for index, annotation in enumerate(annotations):
text_content.value = text_content.value.replace(annotation.text, f"[{index}]")
if file_citation := getattr(annotation, "file_citation", None):
client = openai.AsyncClient()
cited_file = await client.files.retrieve(file_citation.file_id)
citations.append(f"[{index}] {cited_file.filename}")
if citations:
print("\n".join(citations))
async def assistant_chat(runtime: AgentRuntime) -> str:
oai_assistant = openai.beta.assistants.create(
model="gpt-4-turbo",
description="An AI assistant that helps with everyday tasks.",
instructions="Help the user with their task.",
tools=[{"type": "code_interpreter"}, {"type": "file_search"}],
)
vector_store = openai.beta.vector_stores.create()
thread = openai.beta.threads.create(
tool_resources={"file_search": {"vector_store_ids": [vector_store.id]}},
)
await runtime.register(
"Assistant",
lambda: OpenAIAssistantAgent(
description="An AI assistant that helps with everyday tasks.",
client=openai.AsyncClient(),
assistant_id=oai_assistant.id,
thread_id=thread.id,
assistant_event_handler_factory=lambda: EventHandler(),
),
lambda: [DefaultSubscription()],
)
await runtime.register(
"User",
lambda: UserProxyAgent(
client=openai.AsyncClient(),
assistant_id=oai_assistant.id,
thread_id=thread.id,
vector_store_id=vector_store.id,
),
lambda: [DefaultSubscription()],
)
# Create a group chat manager to facilitate a turn-based conversation.
await runtime.register(
"GroupChatManager",
lambda: GroupChatManager(
description="A group chat manager.",
model_context=BufferedChatCompletionContext(buffer_size=10),
participants=[
AgentId("Assistant", AgentInstantiationContext.current_agent_id().key),
AgentId("User", AgentInstantiationContext.current_agent_id().key),
],
),
lambda: [DefaultSubscription()],
)
return "User"
async def main() -> None:
usage = """Chat with an AI assistant backed by OpenAI Assistant API.
You can upload files to the assistant using the command:
[upload code_interpreter | file_search filename]
where 'code_interpreter' or 'file_search' is the purpose of the file and
'filename' is the path to the file. For example:
[upload code_interpreter data.csv]
This will upload data.csv to the assistant for use with the code interpreter tool.
Type "exit" to exit the chat.
"""
runtime = SingleThreadedAgentRuntime()
user = await assistant_chat(runtime)
runtime.start()
print(usage)
# Request the user to start the conversation.
await runtime.send_message(PublishNow(), AgentId(user, "default"))
# TODO: have a way to exit the loop.
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Chat with an AI assistant.")
parser.add_argument("--verbose", action="store_true", help="Enable verbose logging.")
args = parser.parse_args()
if args.verbose:
logging.basicConfig(level=logging.WARNING)
logging.getLogger("autogen_core").setLevel(logging.DEBUG)
handler = logging.FileHandler("assistant.log")
logging.getLogger("autogen_core").addHandler(handler)
asyncio.run(main())
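
For reference, the `[upload ...]` command parsing in the deleted example above can be exercised in isolation; a small standalone sketch using the same regex (the sample inputs are hypothetical):

import re

# The same pattern used by UserProxyAgent.on_publish_now above.
UPLOAD_RE = re.compile(r"\[upload\s+(code_interpreter|file_search)\s+(.+)\]")

for line in (
    "[upload code_interpreter data.csv]",
    "[upload file_search notes.txt]",
    "[upload retrieval notes.txt]",  # unknown purpose: no match
):
    match = UPLOAD_RE.search(line)
    if match:
        # group(1) is the file's purpose, group(2) the file path.
        print(f"purpose={match.group(1)}, file={match.group(2)}")
    else:
        print(f"not a valid upload command: {line}")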

View File

@@ -1,147 +0,0 @@
"""
This example implements a tool-enabled agent that uses tools to perform tasks.
1. The tool use agent receives a user message, and makes an inference using a model.
If the response is a list of function calls, the tool use agent executes the tools by
sending tool execution task to a tool executor agent.
2. The tool executor agent executes the tools and sends the results back to the
tool use agent, who makes an inference using the model again.
3. The agents keep executing the tools until the inference response is not a
list of function calls.
4. The tool use agent returns the final response to the user.
"""
import asyncio
import os
import sys
from dataclasses import dataclass
from typing import List
from autogen_core.application import SingleThreadedAgentRuntime
from autogen_core.base import AgentId, AgentInstantiationContext
from autogen_core.components import FunctionCall, RoutedAgent, message_handler
from autogen_core.components.code_executor import DockerCommandLineCodeExecutor
from autogen_core.components.models import (
AssistantMessage,
ChatCompletionClient,
FunctionExecutionResult,
FunctionExecutionResultMessage,
LLMMessage,
SystemMessage,
UserMessage,
)
from autogen_core.components.tool_agent import ToolAgent, ToolException
from autogen_core.components.tools import PythonCodeExecutionTool, Tool, ToolSchema
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from autogen_core.base import MessageContext
from common.utils import get_chat_completion_client_from_envs
@dataclass
class Message:
content: str
class ToolUseAgent(RoutedAgent):
"""An agent that uses tools to perform tasks. It executes the tools
by itself by sending the tool execution task to itself."""
def __init__(
self,
description: str,
system_messages: List[SystemMessage],
model_client: ChatCompletionClient,
tool_schema: List[ToolSchema],
tool_agent: AgentId,
) -> None:
super().__init__(description)
self._model_client = model_client
self._system_messages = system_messages
self._tool_schema = tool_schema
self._tool_agent = tool_agent
@message_handler
async def handle_user_message(self, message: Message, ctx: MessageContext) -> Message:
"""Handle a user message, execute the model and tools, and returns the response."""
session: List[LLMMessage] = []
session.append(UserMessage(content=message.content, source="User"))
response = await self._model_client.create(self._system_messages + session, tools=self._tool_schema)
session.append(AssistantMessage(content=response.content, source=self.metadata["type"]))
# Keep executing the tools until the response is not a list of function calls.
while isinstance(response.content, list) and all(isinstance(item, FunctionCall) for item in response.content):
results: List[FunctionExecutionResult | BaseException] = await asyncio.gather(
*[
self.send_message(call, self._tool_agent, cancellation_token=ctx.cancellation_token)
for call in response.content
],
return_exceptions=True,
)
# Combine the results into a single response and handle exceptions.
function_results: List[FunctionExecutionResult] = []
for result in results:
if isinstance(result, FunctionExecutionResult):
function_results.append(result)
elif isinstance(result, ToolException):
function_results.append(FunctionExecutionResult(content=f"Error: {result}", call_id=result.call_id))
elif isinstance(result, BaseException):
raise result
session.append(FunctionExecutionResultMessage(content=function_results))
# Execute the model again with the new response.
response = await self._model_client.create(self._system_messages + session, tools=self._tool_schema)
session.append(AssistantMessage(content=response.content, source=self.metadata["type"]))
assert isinstance(response.content, str)
return Message(content=response.content)
async def main() -> None:
# Create the runtime.
runtime = SingleThreadedAgentRuntime()
async with DockerCommandLineCodeExecutor() as executor:
# Define the tools.
tools: List[Tool] = [
# A tool that executes Python code.
PythonCodeExecutionTool(
executor=executor,
)
]
# Register agents.
await runtime.register(
"tool_executor_agent",
lambda: ToolAgent(
description="Tool Executor Agent",
tools=tools,
),
)
await runtime.register(
"tool_enabled_agent",
lambda: ToolUseAgent(
description="Tool Use Agent",
system_messages=[SystemMessage("You are a helpful AI Assistant. Use your tools to solve problems.")],
model_client=get_chat_completion_client_from_envs(model="gpt-4o-mini"),
tool_schema=[tool.schema for tool in tools],
tool_agent=AgentId("tool_executor_agent", AgentInstantiationContext.current_agent_id().key),
),
)
runtime.start()
# Send a task to the tool user.
response = await runtime.send_message(
Message("Run the following Python code: print('Hello, World!')"), AgentId("tool_enabled_agent", "default")
)
print(response.content)
# Run the runtime until the task is completed.
await runtime.stop()
if __name__ == "__main__":
import logging
logging.basicConfig(level=logging.WARNING)
logging.getLogger("autogen_core").setLevel(logging.DEBUG)
asyncio.run(main())

View File

@@ -1,82 +0,0 @@
"""
This example showcases how to intercept tool execution using an
intervention handler.
The intervention handler is used to intercept the FunctionCall message
before it is sent out, and prompt the user for permission to execute the tool.
"""
import asyncio
from typing import Any, List
from autogen_core.application import SingleThreadedAgentRuntime
from autogen_core.base import AgentId, AgentInstantiationContext
from autogen_core.base.intervention import DefaultInterventionHandler, DropMessage
from autogen_core.components import FunctionCall
from autogen_core.components.code_executor import DockerCommandLineCodeExecutor
from autogen_core.components.models import SystemMessage
from autogen_core.components.tool_agent import ToolAgent, ToolException
from autogen_core.components.tools import PythonCodeExecutionTool, Tool
from coding_direct import Message, ToolUseAgent
from common.utils import get_chat_completion_client_from_envs
class ToolInterventionHandler(DefaultInterventionHandler):
async def on_send(self, message: Any, *, sender: AgentId | None, recipient: AgentId) -> Any | type[DropMessage]:
if isinstance(message, FunctionCall):
# Request user prompt for tool execution.
user_input = input(
f"Function call: {message.name}\nArguments: {message.arguments}\nDo you want to execute the tool? (y/n): "
)
if user_input.strip().lower() != "y":
raise ToolException(content="User denied tool execution.", call_id=message.id)
return message
async def main() -> None:
# Create the runtime with the intervention handler.
runtime = SingleThreadedAgentRuntime(intervention_handlers=[ToolInterventionHandler()])
async with DockerCommandLineCodeExecutor() as executor:
# Define the tools.
tools: List[Tool] = [
# A tool that executes Python code.
PythonCodeExecutionTool(
executor=executor,
)
]
# Register agents.
await runtime.register(
"tool_executor_agent",
lambda: ToolAgent(
description="Tool Executor Agent",
tools=tools,
),
)
await runtime.register(
"tool_enabled_agent",
lambda: ToolUseAgent(
description="Tool Use Agent",
system_messages=[SystemMessage("You are a helpful AI Assistant. Use your tools to solve problems.")],
model_client=get_chat_completion_client_from_envs(model="gpt-4o-mini"),
tool_schema=[tool.schema for tool in tools],
tool_agent=AgentId("tool_executor_agent", AgentInstantiationContext.current_agent_id().key),
),
)
runtime.start()
# Send a task to the tool user.
response = await runtime.send_message(
Message("Run the following Python code: print('Hello, World!')"), AgentId("tool_enabled_agent", "default")
)
print(response.content)
# Run the runtime until the task is completed.
await runtime.stop()
if __name__ == "__main__":
import logging
logging.basicConfig(level=logging.WARNING)
logging.getLogger("autogen_core").setLevel(logging.DEBUG)
asyncio.run(main())

View File

@@ -1,11 +1,5 @@
from ._chat_completion_agent import ChatCompletionAgent
from ._image_generation_agent import ImageGenerationAgent
from ._oai_assistant import OpenAIAssistantAgent
from ._user_proxy import UserProxyAgent
__all__ = [
"ChatCompletionAgent",
"OpenAIAssistantAgent",
"UserProxyAgent",
"ImageGenerationAgent",
]

View File

@@ -1,80 +0,0 @@
from typing import Literal
import openai
from autogen_core.base import CancellationToken, MessageContext
from autogen_core.components import (
DefaultTopicId,
Image,
RoutedAgent,
message_handler,
)
from autogen_core.components.model_context import ChatCompletionContext
from autogen_core.components.models import UserMessage
from ..types import (
MultiModalMessage,
PublishNow,
Reset,
TextMessage,
)
class ImageGenerationAgent(RoutedAgent):
"""An agent that generates images using DALL-E models. It publishes the
generated images as MultiModalMessage.
Args:
description (str): The description of the agent.
model_context (ChatCompletionContext): The context manager for storing
and retrieving ChatCompletion messages.
client (openai.AsyncClient): The client to use for the OpenAI API.
model (Literal["dall-e-2", "dall-e-3"], optional): The DALL-E model to use. Defaults to "dall-e-2".
"""
def __init__(
self,
description: str,
model_context: ChatCompletionContext,
client: openai.AsyncClient,
model: Literal["dall-e-2", "dall-e-3"] = "dall-e-2",
):
super().__init__(description)
self._client = client
self._model = model
self._model_context = model_context
@message_handler
async def on_text_message(self, message: TextMessage, ctx: MessageContext) -> None:
"""Handle a text message. This method adds the message to the memory."""
await self._model_context.add_message(UserMessage(content=message.content, source=message.source))
@message_handler
async def on_reset(self, message: Reset, ctx: MessageContext) -> None:
await self._model_context.clear()
@message_handler
async def on_publish_now(self, message: PublishNow, ctx: MessageContext) -> None:
"""Handle a publish now message. This method generates an image using a DALL-E model with
a prompt. The prompt is a concatenation of all TextMessages in the memory. The generated
image is published as a MultiModalMessage."""
response = await self._generate_response(ctx.cancellation_token)
await self.publish_message(response, topic_id=DefaultTopicId())
async def _generate_response(self, cancellation_token: CancellationToken) -> MultiModalMessage:
messages = await self._model_context.get_messages()
if len(messages) == 0:
return MultiModalMessage(
content=["I need more information to generate an image."], source=self.metadata["type"]
)
prompt = ""
for m in messages:
assert isinstance(m.content, str)
prompt += m.content + "\n"
prompt = prompt.strip()
response = await self._client.images.generate(model=self._model, prompt=prompt, response_format="b64_json")
assert len(response.data) > 0 and response.data[0].b64_json is not None
# Create a MultiModalMessage with the image.
image = Image.from_base64(response.data[0].b64_json)
multi_modal_message = MultiModalMessage(content=[image], source=self.metadata["type"])
return multi_modal_message

View File

@@ -1,137 +0,0 @@
from typing import Any, Callable, List, Mapping
import openai
from autogen_core.base import (
CancellationToken,
MessageContext, # type: ignore
)
from autogen_core.components import DefaultTopicId, RoutedAgent, message_handler
from openai import AsyncAssistantEventHandler
from openai.types import ResponseFormatJSONObject, ResponseFormatText
from ..types import PublishNow, Reset, RespondNow, ResponseFormat, TextMessage
class OpenAIAssistantAgent(RoutedAgent):
"""An agent implementation that uses the OpenAI Assistant API to generate
responses.
Args:
description (str): The description of the agent.
client (openai.AsyncClient): The client to use for the OpenAI API.
assistant_id (str): The assistant ID to use for the OpenAI API.
thread_id (str): The thread ID to use for the OpenAI API.
assistant_event_handler_factory (Callable[[], AsyncAssistantEventHandler], optional):
A factory function to create an async assistant event handler. Defaults to None.
If provided, the agent will use the streaming mode with the event handler.
If not provided, the agent will use the blocking mode to generate responses.
"""
def __init__(
self,
description: str,
client: openai.AsyncClient,
assistant_id: str,
thread_id: str,
assistant_event_handler_factory: (Callable[[], AsyncAssistantEventHandler] | None) = None,
) -> None:
super().__init__(description)
self._client = client
self._assistant_id = assistant_id
self._thread_id = thread_id
self._assistant_event_handler_factory = assistant_event_handler_factory
@message_handler()
async def on_text_message(self, message: TextMessage, ctx: MessageContext) -> None:
"""Handle a text message. This method adds the message to the thread."""
# Save the message to the thread.
_ = await self._client.beta.threads.messages.create(
thread_id=self._thread_id,
content=message.content,
role="user",
metadata={"sender": message.source},
)
@message_handler()
async def on_reset(self, message: Reset, ctx: MessageContext) -> None:
"""Handle a reset message. This method deletes all messages in the thread."""
# Get all messages in this thread.
all_msgs: List[str] = []
while True:
if not all_msgs:
msgs = await self._client.beta.threads.messages.list(self._thread_id)
else:
msgs = await self._client.beta.threads.messages.list(self._thread_id, after=all_msgs[-1])
for msg in msgs.data:
all_msgs.append(msg.id)
if not msgs.has_next_page():
break
# Delete all the messages.
for msg_id in all_msgs:
status = await self._client.beta.threads.messages.delete(message_id=msg_id, thread_id=self._thread_id)
assert status.deleted is True
@message_handler()
async def on_respond_now(self, message: RespondNow, ctx: MessageContext) -> TextMessage:
"""Handle a respond now message. This method generates a response and returns it to the sender."""
return await self._generate_response(message.response_format, ctx.cancellation_token)
@message_handler()
async def on_publish_now(self, message: PublishNow, ctx: MessageContext) -> None:
"""Handle a publish now message. This method generates a response and publishes it."""
response = await self._generate_response(message.response_format, ctx.cancellation_token)
await self.publish_message(response, DefaultTopicId())
async def _generate_response(
self,
requested_response_format: ResponseFormat,
cancellation_token: CancellationToken,
) -> TextMessage:
# Handle response format.
if requested_response_format == ResponseFormat.json_object:
response_format = ResponseFormatJSONObject(type="json_object") # type: ignore
else:
response_format = ResponseFormatText(type="text") # type: ignore
if self._assistant_event_handler_factory is not None:
# Use event handler and streaming mode if available.
async with self._client.beta.threads.runs.stream(
thread_id=self._thread_id,
assistant_id=self._assistant_id,
event_handler=self._assistant_event_handler_factory(),
response_format=response_format, # type: ignore
) as stream:
run = await stream.get_final_run()
else:
# Use blocking mode.
run = await self._client.beta.threads.runs.create(
thread_id=self._thread_id,
assistant_id=self._assistant_id,
response_format=response_format, # type: ignore
)
if run.status != "completed":
# TODO: handle other statuses.
raise ValueError(f"Run did not complete successfully: {run}")
# Get the last message from the run.
response = await self._client.beta.threads.messages.list(self._thread_id, run_id=run.id, order="desc", limit=1)
last_message_content = response.data[0].content
# TODO: handle array of content.
text_content = [content for content in last_message_content if content.type == "text"]
if not text_content:
raise ValueError(f"Expected text content in the last message: {last_message_content}")
# TODO: handle multiple text content.
return TextMessage(content=text_content[0].text.value, source=self.metadata["type"])
def save_state(self) -> Mapping[str, Any]:
return {
"assistant_id": self._assistant_id,
"thread_id": self._thread_id,
}
def load_state(self, state: Mapping[str, Any]) -> None:
self._assistant_id = state["assistant_id"]
self._thread_id = state["thread_id"]

View File

@@ -1,33 +0,0 @@
import asyncio
from autogen_core.base import MessageContext
from autogen_core.components import DefaultTopicId, RoutedAgent, message_handler
from ..types import PublishNow, TextMessage
class UserProxyAgent(RoutedAgent):
"""An agent that proxies user input from the console. Override the `get_user_input`
method to customize how user input is retrieved.
Args:
description (str): The description of the agent.
user_input_prompt (str): The console prompt to show to the user when asking for input.
"""
def __init__(self, description: str, user_input_prompt: str) -> None:
super().__init__(description)
self._user_input_prompt = user_input_prompt
@message_handler()
async def on_publish_now(self, message: PublishNow, ctx: MessageContext) -> None:
"""Handle a publish now message. This method prompts the user for input, then publishes it."""
user_input = await self.get_user_input(self._user_input_prompt)
await self.publish_message(
TextMessage(content=user_input, source=self.metadata["type"]), topic_id=DefaultTopicId()
)
async def get_user_input(self, prompt: str) -> str:
"""Get user input from the console. Override this method to customize how user input is retrieved."""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, input, prompt)
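
As the docstring notes, get_user_input is the extension point; a minimal sketch (a hypothetical subclass, not part of this commit) that reads input from an asyncio queue, e.g. when input arrives from a web UI rather than a terminal:

import asyncio


class QueueUserProxyAgent(UserProxyAgent):
    """A UserProxyAgent variant that takes input from an asyncio.Queue."""

    def __init__(self, description: str, user_input_prompt: str, queue: "asyncio.Queue[str]") -> None:
        super().__init__(description, user_input_prompt)
        self._queue = queue

    async def get_user_input(self, prompt: str) -> str:
        # Await the next queued item instead of blocking on console input.
        return await self._queue.get()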