Initial work on ledger orchestrator.

This commit is contained in:
Adam Fourney 2024-11-12 16:38:38 -08:00
parent 51b361dfcf
commit dc172b2d0b
5 changed files with 408 additions and 0 deletions

View File

@ -2,10 +2,12 @@ from ._group_chat._base_group_chat import BaseGroupChat
from ._group_chat._round_robin_group_chat import RoundRobinGroupChat
from ._group_chat._selector_group_chat import SelectorGroupChat
from ._group_chat._swarm_group_chat import Swarm
from ._group_chat._ledger_orchestrator import OrchestratorGroupChat
# Public API of the group-chat teams package.
__all__ = [
"BaseGroupChat",
"RoundRobinGroupChat",
"SelectorGroupChat",
"Swarm",
"OrchestratorGroupChat",
]

View File

@ -0,0 +1,5 @@
from ._ledger_orchestrator import OrchestratorGroupChat
# Public API of the ledger-orchestrator subpackage.
__all__ = [
"OrchestratorGroupChat",
]

View File

@ -0,0 +1,63 @@
import logging
import re
from typing import Callable, Dict, List, Sequence
from autogen_core.components.models import (
AssistantMessage,
ChatCompletionClient,
LLMMessage,
SystemMessage,
UserMessage,
)
from .... import EVENT_LOGGER_NAME, TRACE_LOGGER_NAME
from ....base import ChatAgent, TerminationCondition
from ....messages import (
AgentMessage,
HandoffMessage,
MultiModalMessage,
StopMessage,
TextMessage,
ToolCallMessage,
ToolCallResultMessage,
)
from .._base_group_chat import BaseGroupChat
from .._base_group_chat_manager import BaseGroupChatManager
from ._ledger_orchestrator_manager import LedgerOrchestratorManager
# Module-level loggers: one for developer trace output, one for structured events.
trace_logger = logging.getLogger(TRACE_LOGGER_NAME)
event_logger = logging.getLogger(EVENT_LOGGER_NAME)
class OrchestratorGroupChat(BaseGroupChat):
    """A group chat team coordinated by a ledger-based LLM orchestrator.

    The orchestrator (:class:`LedgerOrchestratorManager`) uses ``model_client``
    to build a task ledger (gathered facts and a plan) and then selects which
    participant should act at each step.
    """

    def __init__(
        self,
        participants: List[ChatAgent],
        model_client: ChatCompletionClient,
        *,
        termination_condition: TerminationCondition | None = None,
    ):
        """Create the team.

        Args:
            participants: The agents taking part in the chat. Must be non-empty.
            model_client: Chat-completion client the orchestrator uses for its
                ledger reasoning.
            termination_condition: Optional condition that ends the chat.
                NOTE: currently not forwarded to the orchestrator manager
                (see TODO in ``_create_group_chat_manager_factory``).

        Raises:
            ValueError: If ``participants`` is empty.
        """
        # Validate BEFORE running the base-class initialization so we fail
        # fast instead of partially constructing group-chat state.
        if len(participants) == 0:
            raise ValueError("At least one participant is required for OrchestratorGroupChat.")
        super().__init__(
            participants, group_chat_manager_class=LedgerOrchestratorManager, termination_condition=termination_condition
        )
        self._model_client = model_client

    def _create_group_chat_manager_factory(
        self,
        group_topic_type: str,
        output_topic_type: str,
        participant_topic_types: List[str],
        participant_descriptions: List[str],
        termination_condition: TerminationCondition | None,
    ) -> Callable[[], LedgerOrchestratorManager]:
        """Return a zero-argument factory that builds the orchestrator manager.

        TODO: ``termination_condition`` is currently ignored because
        LedgerOrchestratorManager does not accept one yet; pass it through
        once the manager supports termination conditions.
        """
        return lambda: LedgerOrchestratorManager(
            group_topic_type,
            output_topic_type,
            participant_topic_types,
            participant_descriptions,
            self._model_client,
        )

View File

@ -0,0 +1,212 @@
import json
from abc import ABC, abstractmethod
from typing import Any, List
from autogen_core.base import MessageContext
from autogen_core.components import DefaultTopicId, event
from autogen_core.components.models import (
AssistantMessage,
ChatCompletionClient,
LLMMessage,
SystemMessage,
UserMessage,
)
from ....base import TerminationCondition, Response
from ....messages import (
TextMessage,
AgentMessage,
StopMessage,
)
from .._events import (
GroupChatAgentResponse,
GroupChatRequestPublish,
GroupChatReset,
GroupChatStart,
GroupChatTermination,
GroupChatMessage,
)
#from .._base_group_chat_manager import BaseGroupChatManager
from .._sequential_routed_agent import SequentialRoutedAgent
from ._prompts import (
ORCHESTRATOR_TASK_LEDGER_FACTS_PROMPT,
ORCHESTRATOR_TASK_LEDGER_PLAN_PROMPT,
ORCHESTRATOR_TASK_LEDGER_FULL_PROMPT,
ORCHESTRATOR_PROGRESS_LEDGER_PROMPT,
)
#class LedgerOrchestratorManager(BaseGroupChatManager):
class LedgerOrchestratorManager(SequentialRoutedAgent, ABC):
    """Group chat manager that orchestrates participants via task/progress ledgers.

    Lifecycle as implemented here:
      1. ``handle_start``: query the model to gather facts and devise a plan
         (the "task ledger"), then enter the orchestration loop.
      2. ``_orchestrate_step``: ask the model for a JSON "progress ledger"
         naming the next speaker and the instruction to give them, broadcast
         that instruction, and request the chosen speaker to publish.
      3. ``handle_agent_response``: record the participant's reply and run
         the next orchestration step.
    """
    def __init__(
        self,
        group_topic_type: str,
        output_topic_type: str,
        participant_topic_types: List[str],
        participant_descriptions: List[str],
        model_client: ChatCompletionClient,
    ):
        """Initialize the manager.

        Args:
            group_topic_type: Topic used to broadcast to all participants.
            output_topic_type: Topic used to surface messages to the caller.
            participant_topic_types: One topic per participant; must be unique
                and must not include ``group_topic_type``.
            participant_descriptions: Description per participant, parallel to
                ``participant_topic_types``.
            model_client: Chat-completion client used for ledger reasoning.

        Raises:
            ValueError: If the two participant lists differ in length, topic
                types are not unique, or the group topic collides with a
                participant topic.
        """
        super().__init__(description="Group chat manager")
        self._group_topic_type = group_topic_type
        self._output_topic_type = output_topic_type
        if len(participant_topic_types) != len(participant_descriptions):
            raise ValueError("The number of participant topic types, agent types, and descriptions must be the same.")
        if len(set(participant_topic_types)) != len(participant_topic_types):
            raise ValueError("The participant topic types must be unique.")
        if group_topic_type in participant_topic_types:
            raise ValueError("The group topic type must not be in the participant topic types.")
        self._participant_topic_types = participant_topic_types
        self._participant_descriptions = participant_descriptions
        # Full conversation history as seen by the orchestrator.
        self._message_thread: List[AgentMessage] = []
        self._name = "orchestrator"
        self._model_client = model_client
        # Task-ledger state (task text, gathered facts, plan); populated in
        # handle_start before any orchestration step runs.
        self._task = None
        self._facts = None
        self._plan = None
        # "name: description" roster injected into the ledger prompts.
        self._team_description = "\n".join(
            [
                f"{topic_type}: {description}".strip()
                for topic_type, description in zip(
                    self._participant_topic_types, self._participant_descriptions, strict=True
                )
            ]
        )
    def _get_task_ledger_facts_prompt(self, task: str) -> str:
        """Render the closed-book fact-gathering (pre-survey) prompt."""
        return ORCHESTRATOR_TASK_LEDGER_FACTS_PROMPT.format(task=task)
    def _get_task_ledger_plan_prompt(self, team: str) -> str:
        """Render the planning prompt for the given team description."""
        return ORCHESTRATOR_TASK_LEDGER_PLAN_PROMPT.format(team=team)
    def _get_task_ledger_full_prompt(self, task: str, team: str, facts: str, plan: str) -> str:
        """Render the combined task ledger (task + team + facts + plan)."""
        return ORCHESTRATOR_TASK_LEDGER_FULL_PROMPT.format(task=task, team=team, facts=facts, plan=plan)
    def _get_progress_ledger_prompt(self, task: str, team: str, names: List[str]) -> str:
        """Render the JSON progress-ledger prompt listing valid speaker names."""
        return ORCHESTRATOR_PROGRESS_LEDGER_PROMPT.format(task=task, team=team, names=", ".join(names))
    @event
    async def handle_start(self, message: GroupChatStart, ctx: MessageContext) -> None:
        """Handle the start of the group chat: build the task ledger (facts
        and plan) via the model client, then kick off the orchestration loop."""
        assert message is not None
        # Log the start message.
        await self.publish_message(message, topic_id=DefaultTopicId(type=self._output_topic_type))
        # Create the initial task ledger
        #################################
        # NOTE(review): assumes message.message.content is plain text (str);
        # multi-modal start messages are not handled here -- confirm.
        self._task = message.message.content
        planning_conversation = []
        # 1. GATHER FACTS
        # create a closed book task and generate a response and update the chat history
        planning_conversation.append(
            UserMessage(content=self._get_task_ledger_facts_prompt(self._task), source=self._name)
        )
        response = await self._model_client.create(planning_conversation)
        assert isinstance(response.content, str)
        self._facts = response.content
        planning_conversation.append(AssistantMessage(content=self._facts, source=self._name))
        # 2. CREATE A PLAN
        ## plan based on available information
        planning_conversation.append(
            UserMessage(content=self._get_task_ledger_plan_prompt(self._team_description), source=self._name)
        )
        response = await self._model_client.create(planning_conversation)
        assert isinstance(response.content, str)
        self._plan = response.content
        # Kick things off
        await self._reenter_inner_loop()
    @event
    async def handle_agent_response(self, message: GroupChatAgentResponse, ctx: MessageContext) -> None:
        """Record a participant's reply, then run the next orchestration step."""
        self._message_thread.append(message.agent_response.chat_message)
        await self._orchestrate_step()
    @event
    async def handle_reset(self, message: GroupChatReset, ctx: MessageContext) -> None:
        """Handle a reset request by delegating to :meth:`reset`."""
        # Reset the group chat manager.
        await self.reset()
    async def select_speaker(self, thread: List[AgentMessage]) -> str:
        """Select a speaker from the participants and return the
        topic type of the selected speaker.

        Unused by this orchestrator: speaker selection happens in
        ``_orchestrate_step`` via the progress ledger."""
        return ""
    async def reset(self) -> None:
        """Reset the group chat manager.

        Currently a no-op; message thread and ledger state are NOT cleared."""
        pass
    async def on_unhandled_message(self, message: Any, ctx: MessageContext) -> None:
        """Raise for any message type without a registered handler."""
        raise ValueError(f"Unhandled message in group chat manager: {type(message)}")
    async def _reenter_inner_loop(self) -> None:
        """(Re)start the inner loop: broadcast the full task ledger to all
        participants, then run one orchestration step."""
        # TODO: Reset the agents
        # Broadcast the new plan
        ledger_message = TextMessage(
            content=self._get_task_ledger_full_prompt(self._task, self._team_description, self._facts, self._plan),
            source=self._name
        )
        self._message_thread.append(ledger_message) # My copy
        await self.publish_message( # Broadcast
            GroupChatAgentResponse(agent_response=Response(chat_message=ledger_message)),
            topic_id=DefaultTopicId(type=self._group_topic_type),
        )
        # Restart the inner loop
        await self._orchestrate_step()
    async def _orchestrate_step(self) -> None:
        """Run one orchestration turn: ask the model for a progress ledger,
        broadcast the resulting instruction, and request the next speaker."""
        # Update the progress ledger.
        # Rebuild the LLM context from the thread: our own messages become
        # AssistantMessage, everyone else's become UserMessage.
        context = []
        for m in self._message_thread:
            if m.source == self._name:
                context.append(AssistantMessage(content=m.content, source=m.source))
            else:
                context.append(UserMessage(content=m.content, source=m.source))
        progress_ledger_prompt = self._get_progress_ledger_prompt(self._task, self._team_description, self._participant_topic_types)
        context.append(UserMessage(content=progress_ledger_prompt, source=self._name))
        response = await self._model_client.create(
            context,
            json_output=True
        )
        # NOTE(review): assumes response.content is a str containing valid
        # JSON matching the progress-ledger schema; consider asserting str
        # and handling json.JSONDecodeError / missing keys -- confirm.
        progress_ledger = json.loads(response.content)
        # Broadcast the next step
        message = TextMessage(
            content=progress_ledger["instruction_or_question"]["answer"],
            source=self._name
        )
        self._message_thread.append(message) # My copy
        # Log it
        await self.publish_message(
            GroupChatMessage(message=message),
            topic_id=DefaultTopicId(type=self._output_topic_type),
        )
        # Broadcast it
        await self.publish_message( # Broadcast
            GroupChatAgentResponse(agent_response=Response(chat_message=message)),
            topic_id=DefaultTopicId(type=self._group_topic_type),
        )
        # Request that it be completed
        next_speaker = progress_ledger["next_speaker"]["answer"]
        await self.publish_message(GroupChatRequestPublish(), topic_id=DefaultTopicId(type=next_speaker))

View File

@ -0,0 +1,126 @@
ORCHESTRATOR_SYSTEM_MESSAGE = ""
ORCHESTRATOR_TASK_LEDGER_FACTS_PROMPT = """Below I will present you a request. Before we begin addressing the request, please answer the following pre-survey to the best of your ability. Keep in mind that you are Ken Jennings-level with trivia, and Mensa-level with puzzles, so there should be a deep well to draw from.
Here is the request:
{task}
Here is the pre-survey:
1. Please list any specific facts or figures that are GIVEN in the request itself. It is possible that there are none.
2. Please list any facts that may need to be looked up, and WHERE SPECIFICALLY they might be found. In some cases, authoritative sources are mentioned in the request itself.
3. Please list any facts that may need to be derived (e.g., via logical deduction, simulation, or computation)
4. Please list any facts that are recalled from memory, hunches, well-reasoned guesses, etc.
When answering this survey, keep in mind that "facts" will typically be specific names, dates, statistics, etc. Your answer should use headings:
1. GIVEN OR VERIFIED FACTS
2. FACTS TO LOOK UP
3. FACTS TO DERIVE
4. EDUCATED GUESSES
DO NOT include any other headings or sections in your response. DO NOT list next steps or plans until asked to do so.
"""
ORCHESTRATOR_TASK_LEDGER_PLAN_PROMPT = """Fantastic. To address this request we have assembled the following team:
{team}
Based on the team composition, and known and unknown facts, please devise a short bullet-point plan for addressing the original request. Remember, there is no requirement to involve all team members -- a team member's particular expertise may not be needed for this task."""
ORCHESTRATOR_TASK_LEDGER_FULL_PROMPT = """
We are working to address the following user request:
{task}
To answer this request we have assembled the following team:
{team}
Here is an initial fact sheet to consider:
{facts}
Here is the plan to follow as best as possible:
{plan}
"""
ORCHESTRATOR_PROGRESS_LEDGER_PROMPT = """
Recall we are working on the following request:
{task}
And we have assembled the following team:
{team}
To make progress on the request, please answer the following questions, including necessary reasoning:
- Is the request fully satisfied? (True if complete, or False if the original request has yet to be SUCCESSFULLY and FULLY addressed)
- Are we in a loop where we are repeating the same requests and / or getting the same responses as before? Loops can span multiple turns, and can include repeated actions like scrolling up or down more than a handful of times.
- Are we making forward progress? (True if just starting, or recent messages are adding value. False if recent messages show evidence of being stuck in a loop or if there is evidence of significant barriers to success such as the inability to read from a required file)
- Who should speak next? (select from: {names})
- What instruction or question would you give this team member? (Phrase as if speaking directly to them, and include any specific information they may need)
Please output an answer in pure JSON format according to the following schema. The JSON object must be parsable as-is. DO NOT OUTPUT ANYTHING OTHER THAN JSON, AND DO NOT DEVIATE FROM THIS SCHEMA:
{{
"is_request_satisfied": {{
"reason": string,
"answer": boolean
}},
"is_in_loop": {{
"reason": string,
"answer": boolean
}},
"is_progress_being_made": {{
"reason": string,
"answer": boolean
}},
"next_speaker": {{
"reason": string,
"answer": string (select from: {names})
}},
"instruction_or_question": {{
"reason": string,
"answer": string
}}
}}
"""
ORCHESTRATOR_UPDATE_FACTS_PROMPT = """As a reminder, we are working to solve the following task:
{task}
It's clear we aren't making as much progress as we would like, but we may have learned something new. Please rewrite the following fact sheet, updating it to include anything new we have learned that may be helpful. Example edits can include (but are not limited to) adding new guesses, moving educated guesses to verified facts if appropriate, etc. Updates may be made to any section of the fact sheet, and more than one section of the fact sheet can be edited. This is an especially good time to update educated guesses, so please at least add or update one educated guess or hunch, and explain your reasoning.
Here is the old fact sheet:
{facts}
"""
ORCHESTRATOR_UPDATE_PLAN_PROMPT = """Please briefly explain what went wrong on this last run (the root cause of the failure), and then come up with a new plan that takes steps and/or includes hints to overcome prior challenges and especially avoids repeating the same mistakes. As before, the new plan should be concise, be expressed in bullet-point form, and consider the following team composition (do not involve any other outside people since we cannot contact anyone else):
{team}
"""
ORCHESTRATOR_GET_FINAL_ANSWER = """
We are working on the following task:
{task}
We have completed the task.
The above messages contain the conversation that took place to complete the task.
Based on the information gathered, provide the final answer to the original request.
The answer should be phrased as if you were speaking to the user.
"""