Refactoring the services and implementing an in-memory runtime for .NET (#4005)

closes #3950 closes #3702

What this is doing:

I am refactoring the services on the .NET runtime and attempting to clarify the naming and organization.
I added this doc to help capture the naming and concepts.
AgentRuntime / Worker should work similar to the python version and enables running the whole agent system in one process. For remote the system uses the versions of the services in the grpc folder.
lots of other bug fixes/threading cleanup - passing cancellation token throughout
Services update clarifies the naming and roles:

Worker: Hosts the Agents and is a client to the Gateway
Gateway:
-- RPC gateway for the other services APIs
-- Provides an RPC bridge between the workers and the Event Bus
Registry: keeps track of the agents in the system and which events they can handle
AgentState: persistent state for agents
This commit is contained in:
Ryan Sweet 2024-11-12 11:04:59 -08:00 committed by GitHub
parent 51b361dfcf
commit 458d273fc4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
72 changed files with 920 additions and 707 deletions

View File

@ -195,13 +195,13 @@ public class HelloAgent(
{
Message = response
}.ToCloudEvent(this.AgentId.Key);
await PublishEvent(evt).ConfigureAwait(false);
await PublishEventAsync(evt).ConfigureAwait(false);
var goodbye = new ConversationClosed
{
UserId = this.AgentId.Key,
UserMessage = "Goodbye"
}.ToCloudEvent(this.AgentId.Key);
await PublishEvent(goodbye).ConfigureAwait(false);
await PublishEventAsync(goodbye).ConfigureAwait(false);
}
public async Task Handle(ConversationClosed item)
{
@ -210,7 +210,7 @@ public class HelloAgent(
{
Message = goodbye
}.ToCloudEvent(this.AgentId.Key);
await PublishEvent(evt).ConfigureAwait(false);
await PublishEventAsync(evt).ConfigureAwait(false);
await Task.Delay(60000);
await App.ShutdownAsync();
}

View File

@ -0,0 +1,26 @@
# AutoGen Services
## Overview
Each AutoGen agent system has one or more Agent Workers and a set of services for managing/supporting the agents. The services and workers can all be hosted in the same process or in a distributed system. When in the same process communication and event delivery is in-memory. When distributed, workers communicate with the service over gRPC. In all cases, events are packaged as CloudEvents. There are multiple options for the backend services:
- In-Memory: the Agent Workers and Services are all hosted in the same process and communicate over in-memory channels. Available for python and .NET.
- Python only: Agent workers communicate with a python hosted service that implements an in-memory message bus and agent registry.
- Micrososft Orleans: a distributed actor system that can host the services and workers, enables distributed state with persistent storage, can leverage multiple event bus types, and cross-language agent communication.
- *Roadmap: support for other languages distributed systems such as dapr or Akka.*
The Services in the system include:
- Worker: Hosts the Agents and is a client to the Gateway
- Gateway:
-- RPC gateway for the other services APIs
-- Provides an RPC bridge between the workers and the Event Bus
-- Message Session state (track message queues/delivery)
- Registry: keeps track of the {agents:agent types}:{Subscription/Topics} in the system and which events they can handle
-- *Roadmap: add lookup api in gateway*
- AgentState: persistent state for agents
- Routing: delivers events to agents based on their subscriptions+topics
-- *Roadmap: add subscription management APIs*
- *Roadmap: Management APIs for the Agent System*
- *Roadmap: Scheduling: manages placement of agents*
- *Roadmap: Discovery: allows discovery of agents and services*

View File

@ -84,10 +84,6 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.AutoGen.Abstracti
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.AutoGen.Extensions.SemanticKernel", "src\Microsoft.AutoGen\Extensions\SemanticKernel\Microsoft.AutoGen.Extensions.SemanticKernel.csproj", "{952827D4-8D4C-4327-AE4D-E8D25811EF35}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.AutoGen.Runtime", "src\Microsoft.AutoGen\Runtime\Microsoft.AutoGen.Runtime.csproj", "{A905E29A-7110-497F-ADC5-2CE2A148FEA0}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.AutoGen.ServiceDefaults", "src\Microsoft.AutoGen\ServiceDefaults\Microsoft.AutoGen.ServiceDefaults.csproj", "{D7E9D90B-5595-4E72-A90A-6DE20D9AB7AE}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "AgentChat", "AgentChat", "{668726B9-77BC-45CF-B576-0F0773BF1615}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "AutoGen.Anthropic.Samples", "samples\AutoGen.Anthropic.Samples\AutoGen.Anthropic.Samples.csproj", "{84020C4A-933A-4693-9889-1B99304A7D76}"
@ -128,12 +124,14 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "HelloAgent", "samples\Hello
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "AIModelClientHostingExtensions", "src\Microsoft.AutoGen\Extensions\AIModelClientHostingExtensions\AIModelClientHostingExtensions.csproj", "{97550E87-48C6-4EBF-85E1-413ABAE9DBFD}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.AutoGen.Agents.Tests", "Microsoft.AutoGen.Agents.Tests\Microsoft.AutoGen.Agents.Tests.csproj", "{CF4C92BD-28AE-4B8F-B173-601004AEC9BF}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "sample", "sample", "{686480D7-8FEC-4ED3-9C5D-CEBE1057A7ED}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "HelloAgentState", "samples\Hello\HelloAgentState\HelloAgentState.csproj", "{64EF61E7-00A6-4E5E-9808-62E10993A0E5}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.AutoGen.ServiceDefaults", "src\Microsoft.AutoGen\Extensions\ServiceDefaults\Microsoft.AutoGen.ServiceDefaults.csproj", "{65059914-5527-4A00-9308-9FAF23D5E85A}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.AutoGen.Agents.Tests", "test\Microsoft.AutoGen.Agents.Tests\Microsoft.AutoGen.Agents.Tests.csproj", "{394FDAF8-74F9-4977-94A5-3371737EB774}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@ -264,14 +262,6 @@ Global
{952827D4-8D4C-4327-AE4D-E8D25811EF35}.Debug|Any CPU.Build.0 = Debug|Any CPU
{952827D4-8D4C-4327-AE4D-E8D25811EF35}.Release|Any CPU.ActiveCfg = Release|Any CPU
{952827D4-8D4C-4327-AE4D-E8D25811EF35}.Release|Any CPU.Build.0 = Release|Any CPU
{A905E29A-7110-497F-ADC5-2CE2A148FEA0}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{A905E29A-7110-497F-ADC5-2CE2A148FEA0}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A905E29A-7110-497F-ADC5-2CE2A148FEA0}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A905E29A-7110-497F-ADC5-2CE2A148FEA0}.Release|Any CPU.Build.0 = Release|Any CPU
{D7E9D90B-5595-4E72-A90A-6DE20D9AB7AE}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{D7E9D90B-5595-4E72-A90A-6DE20D9AB7AE}.Debug|Any CPU.Build.0 = Debug|Any CPU
{D7E9D90B-5595-4E72-A90A-6DE20D9AB7AE}.Release|Any CPU.ActiveCfg = Release|Any CPU
{D7E9D90B-5595-4E72-A90A-6DE20D9AB7AE}.Release|Any CPU.Build.0 = Release|Any CPU
{84020C4A-933A-4693-9889-1B99304A7D76}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{84020C4A-933A-4693-9889-1B99304A7D76}.Debug|Any CPU.Build.0 = Debug|Any CPU
{84020C4A-933A-4693-9889-1B99304A7D76}.Release|Any CPU.ActiveCfg = Release|Any CPU
@ -340,14 +330,18 @@ Global
{97550E87-48C6-4EBF-85E1-413ABAE9DBFD}.Debug|Any CPU.Build.0 = Debug|Any CPU
{97550E87-48C6-4EBF-85E1-413ABAE9DBFD}.Release|Any CPU.ActiveCfg = Release|Any CPU
{97550E87-48C6-4EBF-85E1-413ABAE9DBFD}.Release|Any CPU.Build.0 = Release|Any CPU
{CF4C92BD-28AE-4B8F-B173-601004AEC9BF}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{CF4C92BD-28AE-4B8F-B173-601004AEC9BF}.Debug|Any CPU.Build.0 = Debug|Any CPU
{CF4C92BD-28AE-4B8F-B173-601004AEC9BF}.Release|Any CPU.ActiveCfg = Release|Any CPU
{CF4C92BD-28AE-4B8F-B173-601004AEC9BF}.Release|Any CPU.Build.0 = Release|Any CPU
{64EF61E7-00A6-4E5E-9808-62E10993A0E5}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{64EF61E7-00A6-4E5E-9808-62E10993A0E5}.Debug|Any CPU.Build.0 = Debug|Any CPU
{64EF61E7-00A6-4E5E-9808-62E10993A0E5}.Release|Any CPU.ActiveCfg = Release|Any CPU
{64EF61E7-00A6-4E5E-9808-62E10993A0E5}.Release|Any CPU.Build.0 = Release|Any CPU
{65059914-5527-4A00-9308-9FAF23D5E85A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{65059914-5527-4A00-9308-9FAF23D5E85A}.Debug|Any CPU.Build.0 = Debug|Any CPU
{65059914-5527-4A00-9308-9FAF23D5E85A}.Release|Any CPU.ActiveCfg = Release|Any CPU
{65059914-5527-4A00-9308-9FAF23D5E85A}.Release|Any CPU.Build.0 = Release|Any CPU
{394FDAF8-74F9-4977-94A5-3371737EB774}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{394FDAF8-74F9-4977-94A5-3371737EB774}.Debug|Any CPU.Build.0 = Debug|Any CPU
{394FDAF8-74F9-4977-94A5-3371737EB774}.Release|Any CPU.ActiveCfg = Release|Any CPU
{394FDAF8-74F9-4977-94A5-3371737EB774}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@ -385,8 +379,6 @@ Global
{FD87BD33-4616-460B-AC85-A412BA08BB78} = {18BF8DD7-0585-48BF-8F97-AD333080CE06}
{E0C991D9-0DB8-471C-ADC9-5FB16E2A0106} = {18BF8DD7-0585-48BF-8F97-AD333080CE06}
{952827D4-8D4C-4327-AE4D-E8D25811EF35} = {18BF8DD7-0585-48BF-8F97-AD333080CE06}
{A905E29A-7110-497F-ADC5-2CE2A148FEA0} = {18BF8DD7-0585-48BF-8F97-AD333080CE06}
{D7E9D90B-5595-4E72-A90A-6DE20D9AB7AE} = {18BF8DD7-0585-48BF-8F97-AD333080CE06}
{668726B9-77BC-45CF-B576-0F0773BF1615} = {686480D7-8FEC-4ED3-9C5D-CEBE1057A7ED}
{84020C4A-933A-4693-9889-1B99304A7D76} = {668726B9-77BC-45CF-B576-0F0773BF1615}
{5777515F-4053-42F9-AF2B-95D8D0F5384A} = {668726B9-77BC-45CF-B576-0F0773BF1615}
@ -407,8 +399,9 @@ Global
{A20B9894-F352-4338-872A-F215A241D43D} = {7EB336C2-7C0A-4BC8-80C6-A3173AB8DC45}
{8F7560CF-EEBB-4333-A69F-838CA40FD85D} = {7EB336C2-7C0A-4BC8-80C6-A3173AB8DC45}
{97550E87-48C6-4EBF-85E1-413ABAE9DBFD} = {18BF8DD7-0585-48BF-8F97-AD333080CE06}
{CF4C92BD-28AE-4B8F-B173-601004AEC9BF} = {F823671B-3ECA-4AE6-86DA-25E920D3FE64}
{64EF61E7-00A6-4E5E-9808-62E10993A0E5} = {7EB336C2-7C0A-4BC8-80C6-A3173AB8DC45}
{65059914-5527-4A00-9308-9FAF23D5E85A} = {18BF8DD7-0585-48BF-8F97-AD333080CE06}
{394FDAF8-74F9-4977-94A5-3371737EB774} = {F823671B-3ECA-4AE6-86DA-25E920D3FE64}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {93384647-528D-46C8-922C-8DB36A382F0B}

View File

@ -63,11 +63,17 @@
<PackageVersion Include="Microsoft.Extensions.Logging.Debug" Version="8.0.0" />
<PackageVersion Include="Microsoft.Extensions.ServiceDiscovery" Version="8.2.1" />
<PackageVersion Include="Microsoft.Orleans.Clustering.Cosmos" Version="8.2.0" />
<PackageVersion Include="Microsoft.Orleans.CodeGenerator" Version="8.2.0">
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
</PackageVersion>
<PackageVersion Include="Microsoft.Orleans.Core.Abstractions" Version="8.2.0" />
<PackageVersion Include="Microsoft.Orleans.Persistence.Cosmos" Version="8.2.0" />
<PackageVersion Include="Microsoft.Orleans.Reminders" Version="8.2.0" />
<PackageVersion Include="Microsoft.Orleans.Reminders.Cosmos" Version="8.2.0" />
<PackageVersion Include="Microsoft.Orleans.Runtime" Version="8.2.0" />
<PackageVersion Include="Microsoft.Orleans.Sdk" Version="8.2.0" />
<PackageVersion Include="Microsoft.Orleans.Serialization" Version="8.2.0" />
<PackageVersion Include="Microsoft.Orleans.Serialization.Protobuf" Version="8.2.0" />
<PackageVersion Include="Microsoft.Orleans.Server" Version="8.2.0" />
<PackageVersion Include="Microsoft.Orleans.Streaming" Version="8.2.0" />

View File

@ -1,4 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk.Web">
<Project Sdk="Microsoft.NET.Sdk">
<ItemGroup>
<ProjectReference Include="..\..\..\src\Microsoft.AutoGen\Agents\Microsoft.AutoGen.Agents.csproj" />
</ItemGroup>
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
@ -6,10 +9,6 @@
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\..\..\src\Microsoft.AutoGen\Runtime\Microsoft.AutoGen.Runtime.csproj" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Hosting" />
</ItemGroup>

View File

@ -1,5 +1,7 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Program.cs
var app = await Microsoft.AutoGen.Runtime.Host.StartAsync(local: true);
using Microsoft.Extensions.Hosting;
var app = await Microsoft.AutoGen.Agents.Host.StartAsync(local: false, useGrpc: true);
await app.WaitForShutdownAsync();

View File

@ -0,0 +1,9 @@
{
"Logging": {
"LogLevel": {
"Default": "Warning",
"Microsoft": "Warning",
"Microsoft.Orleans": "Warning"
}
}
}

View File

@ -8,11 +8,13 @@ using Microsoft.Extensions.AI;
namespace Hello;
[TopicSubscription("HelloAgents")]
public class HelloAIAgent(
IAgentContext context,
IAgentRuntime context,
[FromKeyedServices("EventTypes")] EventTypes typeRegistry,
IHostApplicationLifetime hostApplicationLifetime,
IChatClient client) : HelloAgent(
context,
typeRegistry),
typeRegistry,
hostApplicationLifetime),
IHandle<NewMessageReceived>
{
// This Handle supercedes the one in the base class

View File

@ -13,7 +13,6 @@
<ItemGroup>
<ProjectReference Include="..\..\..\src\Microsoft.AutoGen\Abstractions\Microsoft.AutoGen.Abstractions.csproj" />
<ProjectReference Include="..\..\..\src\Microsoft.AutoGen\Agents\Microsoft.AutoGen.Agents.csproj" />
<ProjectReference Include="..\..\..\src\Microsoft.AutoGen\Runtime\Microsoft.AutoGen.Runtime.csproj" />
<ProjectReference Include="..\..\..\src\Microsoft.AutoGen\Extensions\AIModelClientHostingExtensions\AIModelClientHostingExtensions.csproj" />
</ItemGroup>
</Project>

View File

@ -32,8 +32,9 @@ namespace Hello
{
[TopicSubscription("HelloAgents")]
public class HelloAgent(
IAgentContext context,
[FromKeyedServices("EventTypes")] EventTypes typeRegistry) : ConsoleAgent(
IAgentRuntime context,
[FromKeyedServices("EventTypes")] EventTypes typeRegistry,
IHostApplicationLifetime hostApplicationLifetime) : ConsoleAgent(
context,
typeRegistry),
ISayHello,
@ -65,7 +66,7 @@ namespace Hello
await PublishMessageAsync(evt).ConfigureAwait(false);
//sleep30 seconds
await Task.Delay(30000).ConfigureAwait(false);
await AgentsApp.ShutdownAsync().ConfigureAwait(false);
hostApplicationLifetime.StopApplication();
}
public async Task<string> SayHello(string ask)

View File

@ -0,0 +1,9 @@
{
"Logging": {
"LogLevel": {
"Default": "Warning",
"Microsoft": "Warning",
"Microsoft.Orleans": "Warning"
}
}
}

View File

@ -1,4 +1,4 @@
<Project Sdk="Microsoft.NET.Sdk.Web">
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
@ -14,6 +14,5 @@
<ItemGroup>
<ProjectReference Include="..\..\..\src\Microsoft.AutoGen\Abstractions\Microsoft.AutoGen.Abstractions.csproj" />
<ProjectReference Include="..\..\..\src\Microsoft.AutoGen\Agents\Microsoft.AutoGen.Agents.csproj" />
<ProjectReference Include="..\..\..\src\Microsoft.AutoGen\Runtime\Microsoft.AutoGen.Runtime.csproj" />
</ItemGroup>
</Project>

View File

@ -3,6 +3,8 @@
using Microsoft.AutoGen.Abstractions;
using Microsoft.AutoGen.Agents;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
// step 1: create in-memory agent runtime
@ -16,7 +18,7 @@ using Microsoft.AutoGen.Agents;
var app = await AgentsApp.PublishMessageAsync("HelloAgents", new NewMessageReceived
{
Message = "World"
}, local: false);
}, local: true);
await app.WaitForShutdownAsync();
@ -24,9 +26,8 @@ namespace Hello
{
[TopicSubscription("HelloAgents")]
public class HelloAgent(
IAgentContext context,
[FromKeyedServices("EventTypes")] EventTypes typeRegistry,
IHostApplicationLifetime hostApplicationLifetime) : AgentBase(
IAgentRuntime context, IHostApplicationLifetime hostApplicationLifetime,
[FromKeyedServices("EventTypes")] EventTypes typeRegistry) : AgentBase(
context,
typeRegistry),
ISayHello,

View File

@ -25,11 +25,11 @@ Flow Diagram:
```mermaid
%%{init: {'theme':'forest'}}%%
graph LR;
A[Main] --> |"PublishEvent(NewMessage('World'))"| B{"Handle(NewMessageReceived item)"}
B --> |"PublishEvent(Output('***Hello, World***'))"| C[ConsoleAgent]
A[Main] --> |"PublishEventAsync(NewMessage('World'))"| B{"Handle(NewMessageReceived item)"}
B --> |"PublishEventAsync(Output('***Hello, World***'))"| C[ConsoleAgent]
C --> D{"WriteConsole()"}
B --> |"PublishEvent(ConversationClosed('Goodbye'))"| E{"Handle(ConversationClosed item)"}
B --> |"PublishEvent(Output('***Goodbye***'))"| C
B --> |"PublishEventAsync(ConversationClosed('Goodbye'))"| E{"Handle(ConversationClosed item)"}
B --> |"PublishEventAsync(Output('***Goodbye***'))"| C
E --> F{"Shutdown()"}
```
@ -58,13 +58,13 @@ public class HelloAgent(
{
Message = response
}.ToCloudEvent(this.AgentId.Key);
await PublishEvent(evt).ConfigureAwait(false);
await PublishEventAsync(evt).ConfigureAwait(false);
var goodbye = new ConversationClosed
{
UserId = this.AgentId.Key,
UserMessage = "Goodbye"
}.ToCloudEvent(this.AgentId.Key);
await PublishEvent(goodbye).ConfigureAwait(false);
await PublishEventAsync(goodbye).ConfigureAwait(false);
}
```
@ -109,7 +109,6 @@ message ReadmeRequested {
}
```
```xml
<ItemGroup>
<PackageReference Include="Google.Protobuf" />

View File

@ -0,0 +1,9 @@
{
"Logging": {
"LogLevel": {
"Default": "Warning",
"Microsoft": "Warning",
"Microsoft.Orleans": "Warning"
}
}
}

View File

@ -14,6 +14,5 @@
<ItemGroup>
<ProjectReference Include="..\..\..\src\Microsoft.AutoGen\Abstractions\Microsoft.AutoGen.Abstractions.csproj" />
<ProjectReference Include="..\..\..\src\Microsoft.AutoGen\Agents\Microsoft.AutoGen.Agents.csproj" />
<ProjectReference Include="..\..\..\src\Microsoft.AutoGen\Runtime\Microsoft.AutoGen.Runtime.csproj" />
</ItemGroup>
</Project>

View File

@ -1,6 +1,7 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Program.cs
using System.Text.Json;
using Microsoft.AutoGen.Abstractions;
using Microsoft.AutoGen.Agents;
@ -8,7 +9,7 @@ using Microsoft.AutoGen.Agents;
var app = await AgentsApp.PublishMessageAsync("HelloAgents", new NewMessageReceived
{
Message = "World"
}, local: false);
}, local: true);
await app.WaitForShutdownAsync();
@ -16,13 +17,15 @@ namespace Hello
{
[TopicSubscription("HelloAgents")]
public class HelloAgent(
IAgentContext context,
[FromKeyedServices("EventTypes")] EventTypes typeRegistry) : ConsoleAgent(
IAgentRuntime context,
IHostApplicationLifetime hostApplicationLifetime,
[FromKeyedServices("EventTypes")] EventTypes typeRegistry) : AgentBase(
context,
typeRegistry),
ISayHello,
IHandleConsole,
IHandle<NewMessageReceived>,
IHandle<ConversationClosed>
IHandle<ConversationClosed>,
IHandle<Shutdown>
{
private AgentState? State { get; set; }
public async Task Handle(NewMessageReceived item)
@ -32,11 +35,15 @@ namespace Hello
{
Message = response
};
var entry = "We said hello to " + item.Message;
await Store(new AgentState
Dictionary<string, string> state = new()
{
{ "data", "We said hello to " + item.Message },
{ "workflow", "Active" }
};
await StoreAsync(new AgentState
{
AgentId = this.AgentId,
TextData = entry
TextData = JsonSerializer.Serialize(state)
}).ConfigureAwait(false);
await PublishMessageAsync(evt).ConfigureAwait(false);
var goodbye = new ConversationClosed
@ -45,21 +52,40 @@ namespace Hello
UserMessage = "Goodbye"
};
await PublishMessageAsync(goodbye).ConfigureAwait(false);
// send the shutdown message
await PublishMessageAsync(new Shutdown { Message = this.AgentId.Key }).ConfigureAwait(false);
}
public async Task Handle(ConversationClosed item)
{
State = await Read<AgentState>(this.AgentId).ConfigureAwait(false);
var read = State?.TextData ?? "No state data found";
var goodbye = $"{read}\n********************* {item.UserId} said {item.UserMessage} ************************";
State = await ReadAsync<AgentState>(this.AgentId).ConfigureAwait(false);
var state = JsonSerializer.Deserialize<Dictionary<string, string>>(State.TextData) ?? new Dictionary<string, string> { { "data", "No state data found" } };
var goodbye = $"\nState: {state}\n********************* {item.UserId} said {item.UserMessage} ************************";
var evt = new Output
{
Message = goodbye
};
await PublishMessageAsync(evt).ConfigureAwait(false);
//sleep
await Task.Delay(10000).ConfigureAwait(false);
await AgentsApp.ShutdownAsync().ConfigureAwait(false);
await PublishMessageAsync(evt).ConfigureAwait(true);
state["workflow"] = "Complete";
await StoreAsync(new AgentState
{
AgentId = this.AgentId,
TextData = JsonSerializer.Serialize(state)
}).ConfigureAwait(false);
}
public async Task Handle(Shutdown item)
{
string? workflow = null;
// make sure the workflow is finished
while (workflow != "Complete")
{
State = await ReadAsync<AgentState>(this.AgentId).ConfigureAwait(true);
var state = JsonSerializer.Deserialize<Dictionary<string, string>>(State?.TextData ?? "{}") ?? new Dictionary<string, string>();
workflow = state["workflow"];
await Task.Delay(1000).ConfigureAwait(true);
}
// now we can shut down...
hostApplicationLifetime.StopApplication();
}
public async Task<string> SayHello(string ask)
{
@ -67,8 +93,4 @@ namespace Hello
return response;
}
}
public interface ISayHello
{
public Task<string> SayHello(string ask);
}
}

View File

@ -25,11 +25,11 @@ Flow Diagram:
```mermaid
%%{init: {'theme':'forest'}}%%
graph LR;
A[Main] --> |"PublishEvent(NewMessage('World'))"| B{"Handle(NewMessageReceived item)"}
B --> |"PublishEvent(Output('***Hello, World***'))"| C[ConsoleAgent]
A[Main] --> |"PublishEventAsync(NewMessage('World'))"| B{"Handle(NewMessageReceived item)"}
B --> |"PublishEventAsync(Output('***Hello, World***'))"| C[ConsoleAgent]
C --> D{"WriteConsole()"}
B --> |"PublishEvent(ConversationClosed('Goodbye'))"| E{"Handle(ConversationClosed item)"}
B --> |"PublishEvent(Output('***Goodbye***'))"| C
B --> |"PublishEventAsync(ConversationClosed('Goodbye'))"| E{"Handle(ConversationClosed item)"}
B --> |"PublishEventAsync(Output('***Goodbye***'))"| C
E --> F{"Shutdown()"}
```
@ -58,13 +58,13 @@ public class HelloAgent(
{
Message = response
}.ToCloudEvent(this.AgentId.Key);
await PublishEvent(evt).ConfigureAwait(false);
await PublishEventAsync(evt).ConfigureAwait(false);
var goodbye = new ConversationClosed
{
UserId = this.AgentId.Key,
UserMessage = "Goodbye"
}.ToCloudEvent(this.AgentId.Key);
await PublishEvent(goodbye).ConfigureAwait(false);
await PublishEventAsync(goodbye).ConfigureAwait(false);
}
```

View File

@ -0,0 +1,9 @@
{
"Logging": {
"LogLevel": {
"Default": "Warning",
"Microsoft": "Warning",
"Microsoft.Orleans": "Warning"
}
}
}

View File

@ -9,8 +9,8 @@
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="../../../src/Microsoft.AutoGen/Runtime/Microsoft.AutoGen.Runtime.csproj" />
<ProjectReference Include="../../../src/Microsoft.AutoGen/ServiceDefaults/Microsoft.AutoGen.ServiceDefaults.csproj" />
<ProjectReference Include="../../../src/Microsoft.AutoGen/Agents/Microsoft.AutoGen.Agents.csproj" />
<ProjectReference Include="../../../src/Microsoft.AutoGen/Extensions/ServiceDefaults/Microsoft.AutoGen.ServiceDefaults.csproj" />
</ItemGroup>
</Project>

View File

@ -1,7 +1,7 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Program.cs
using Microsoft.AutoGen.Runtime;
using Microsoft.AutoGen.Agents;
var builder = WebApplication.CreateBuilder(args);
builder.AddServiceDefaults();

View File

@ -9,8 +9,8 @@
<ItemGroup>
<ProjectReference Include="../../../src/Microsoft.AutoGen/Agents/Microsoft.AutoGen.Agents.csproj" />
<ProjectReference Include="../../../src/Microsoft.AutoGen/ServiceDefaults/Microsoft.AutoGen.ServiceDefaults.csproj" />
<ProjectReference Include="../../../src/Microsoft.AutoGen/Extensions/ServiceDefaults/Microsoft.AutoGen.ServiceDefaults.csproj" />
<ProjectReference Include="..\DevTeam.Shared\DevTeam.Shared.csproj" />
<ProjectReference Include="..\..\..\src\Microsoft.AutoGen\Extensions\SemanticKernel\Microsoft.AutoGen.Extensions.SemanticKernel.csproj" />
</ItemGroup>

View File

@ -10,7 +10,7 @@ using Microsoft.SemanticKernel.Memory;
namespace DevTeam.Agents;
[TopicSubscription("devteam")]
public class Dev(IAgentContext context, Kernel kernel, ISemanticTextMemory memory, [FromKeyedServices("EventTypes")] EventTypes typeRegistry, ILogger<Dev> logger)
public class Dev(IAgentRuntime context, Kernel kernel, ISemanticTextMemory memory, [FromKeyedServices("EventTypes")] EventTypes typeRegistry, ILogger<Dev> logger)
: SKAiAgent<DeveloperState>(context, memory, kernel, typeRegistry), IDevelopApps,
IHandle<CodeGenerationRequested>,
IHandle<CodeChainClosed>

View File

@ -11,7 +11,7 @@ using Microsoft.SemanticKernel.Memory;
namespace DevTeam.Agents;
[TopicSubscription("devteam")]
public class DeveloperLead(IAgentContext context, Kernel kernel, ISemanticTextMemory memory, [FromKeyedServices("EventTypes")] EventTypes typeRegistry, ILogger<DeveloperLead> logger)
public class DeveloperLead(IAgentRuntime context, Kernel kernel, ISemanticTextMemory memory, [FromKeyedServices("EventTypes")] EventTypes typeRegistry, ILogger<DeveloperLead> logger)
: SKAiAgent<DeveloperLeadState>(context, memory, kernel, typeRegistry), ILeadDevelopers,
IHandle<DevPlanRequested>,
IHandle<DevPlanChainClosed>

View File

@ -10,7 +10,7 @@ using Microsoft.SemanticKernel.Memory;
namespace DevTeam.Agents;
[TopicSubscription("devteam")]
public class ProductManager(IAgentContext context, Kernel kernel, ISemanticTextMemory memory, [FromKeyedServices("EventTypes")] EventTypes typeRegistry, ILogger<ProductManager> logger)
public class ProductManager(IAgentRuntime context, Kernel kernel, ISemanticTextMemory memory, [FromKeyedServices("EventTypes")] EventTypes typeRegistry, ILogger<ProductManager> logger)
: SKAiAgent<ProductManagerState>(context, memory, kernel, typeRegistry), IManageProducts,
IHandle<ReadmeChainClosed>,
IHandle<ReadmeRequested>

View File

@ -9,7 +9,7 @@ using Microsoft.SemanticKernel;
using Microsoft.SemanticKernel.Memory;
namespace Microsoft.AI.DevTeam;
public class AzureGenie(IAgentContext context, Kernel kernel, ISemanticTextMemory memory, [FromKeyedServices("EventTypes")] EventTypes typeRegistry, IManageAzure azureService)
public class AzureGenie(IAgentRuntime context, Kernel kernel, ISemanticTextMemory memory, [FromKeyedServices("EventTypes")] EventTypes typeRegistry, IManageAzure azureService)
: SKAiAgent<object>(context, memory, kernel, typeRegistry),
IHandle<ReadmeCreated>,
IHandle<CodeCreated>

View File

@ -12,7 +12,7 @@ using Microsoft.SemanticKernel.Memory;
namespace Microsoft.AI.DevTeam;
public class Hubber(IAgentContext context, Kernel kernel, ISemanticTextMemory memory, [FromKeyedServices("EventTypes")] EventTypes typeRegistry, IManageGithub ghService)
public class Hubber(IAgentRuntime context, Kernel kernel, ISemanticTextMemory memory, [FromKeyedServices("EventTypes")] EventTypes typeRegistry, IManageGithub ghService)
: SKAiAgent<object>(context, memory, kernel, typeRegistry),
IHandle<NewAsk>,
IHandle<ReadmeGenerated>,

View File

@ -29,7 +29,7 @@
</ItemGroup>
<ItemGroup>
<ProjectReference Include="../../../src/Microsoft.AutoGen/ServiceDefaults/Microsoft.AutoGen.ServiceDefaults.csproj" />
<ProjectReference Include="../../../src/Microsoft.AutoGen/Extensions/ServiceDefaults/Microsoft.AutoGen.ServiceDefaults.csproj" />
<ProjectReference Include="..\DevTeam.Shared\DevTeam.Shared.csproj" />
<ProjectReference Include="..\..\..\src\Microsoft.AutoGen\Extensions\SemanticKernel\Microsoft.AutoGen.Extensions.SemanticKernel.csproj" />
</ItemGroup>

View File

@ -9,13 +9,14 @@ public interface IAgentBase
{
// Properties
AgentId AgentId { get; }
IAgentContext Context { get; }
IAgentRuntime Context { get; }
// Methods
Task CallHandler(CloudEvent item);
Task<RpcResponse> HandleRequest(RpcRequest request);
void ReceiveMessage(Message message);
Task Store(AgentState state);
Task<T> Read<T>(AgentId agentId) where T : IMessage, new();
ValueTask PublishEventAsync(CloudEvent item, CancellationToken token = default);
Task StoreAsync(AgentState state, CancellationToken cancellationToken = default);
Task<T> ReadAsync<T>(AgentId agentId, CancellationToken cancellationToken = default) where T : IMessage, new();
ValueTask PublishEventAsync(CloudEvent item, CancellationToken cancellationToken = default);
ValueTask PublishEventAsync(string topic, IMessage evt, CancellationToken cancellationToken = default);
}

View File

@ -1,20 +1,21 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// IAgentContext.cs
// IAgentRuntime.cs
using System.Diagnostics;
using Microsoft.Extensions.Logging;
namespace Microsoft.AutoGen.Abstractions;
public interface IAgentContext
public interface IAgentRuntime
{
AgentId AgentId { get; }
IAgentBase? AgentInstance { get; set; }
DistributedContextPropagator DistributedContextPropagator { get; } // TODO: Remove this. An abstraction should not have a dependency on DistributedContextPropagator.
ILogger Logger { get; } // TODO: Remove this. An abstraction should not have a dependency on ILogger.
ValueTask Store(AgentState value, CancellationToken cancellationToken = default);
ValueTask<AgentState> Read(AgentId agentId, CancellationToken cancellationToken = default);
ValueTask StoreAsync(AgentState value, CancellationToken cancellationToken = default);
ValueTask<AgentState> ReadAsync(AgentId agentId, CancellationToken cancellationToken = default);
ValueTask SendResponseAsync(RpcRequest request, RpcResponse response, CancellationToken cancellationToken = default);
ValueTask SendRequestAsync(IAgentBase agent, RpcRequest request, CancellationToken cancellationToken = default);
ValueTask PublishEventAsync(CloudEvent @event, CancellationToken cancellationToken = default);
void Update(Activity? activity, RpcRequest request);
void Update(Activity? activity, CloudEvent cloudEvent);
(string?, string?) GetTraceIDandState(IDictionary<string, string> metadata);
IDictionary<string, string> ExtractMetadata(IDictionary<string, string> metadata);
}

View File

@ -1,11 +1,9 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// IWorkerAgentGrain.cs
// IAgentState.cs
using Microsoft.AutoGen.Abstractions;
namespace Microsoft.AutoGen.Abstractions;
namespace Microsoft.AutoGen.Runtime;
internal interface IWorkerAgentGrain : IGrainWithStringKey
public interface IAgentState
{
ValueTask<AgentState> ReadStateAsync();
ValueTask<string> WriteStateAsync(AgentState state, string eTag);

View File

@ -0,0 +1,13 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// IAgentWorker.cs
namespace Microsoft.AutoGen.Abstractions;
public interface IAgentWorker
{
ValueTask PublishEventAsync(CloudEvent evt, CancellationToken cancellationToken = default);
ValueTask SendRequestAsync(IAgentBase agent, RpcRequest request, CancellationToken cancellationToken = default);
ValueTask SendResponseAsync(RpcResponse response, CancellationToken cancellationToken = default);
ValueTask StoreAsync(AgentState value, CancellationToken cancellationToken = default);
ValueTask<AgentState> ReadAsync(AgentId agentId, CancellationToken cancellationToken = default);
}

View File

@ -1,13 +0,0 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// IAgentWorkerRuntime.cs
namespace Microsoft.AutoGen.Abstractions;
public interface IAgentWorkerRuntime
{
ValueTask PublishEvent(CloudEvent evt, CancellationToken cancellationToken);
ValueTask SendRequest(IAgentBase agent, RpcRequest request, CancellationToken cancellationToken);
ValueTask SendResponse(RpcResponse response, CancellationToken cancellationToken);
ValueTask Store(AgentState value, CancellationToken cancellationToken);
ValueTask<AgentState> Read(AgentId agentId, CancellationToken cancellationToken);
}

View File

@ -0,0 +1,7 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// IConnection.cs
namespace Microsoft.AutoGen.Abstractions;
public interface IConnection
{
}

View File

@ -0,0 +1,13 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// IGateway.cs
namespace Microsoft.AutoGen.Abstractions;
public interface IGateway : IGrainObserver
{
ValueTask<RpcResponse> InvokeRequest(RpcRequest request);
ValueTask BroadcastEvent(CloudEvent evt);
ValueTask StoreAsync(AgentState value);
ValueTask<AgentState> ReadAsync(AgentId agentId);
Task SendMessageAsync(IConnection connection, CloudEvent cloudEvent);
}

View File

@ -20,7 +20,8 @@
<PackageReference Include="Grpc.AspNetCore" />
<PackageReference Include="Grpc.Net.ClientFactory" />
<PackageReference Include="Grpc.Tools" PrivateAssets="All" />
<PackageReference Include="Microsoft.Orleans.Core.Abstractions" />
<PackageReference Include="Microsoft.Orleans.Sdk" />
<PackageReference Include="Microsoft.SemanticKernel" />
</ItemGroup>
</Project>

View File

@ -20,21 +20,24 @@ public abstract class AgentBase : IAgentBase, IHandle
private readonly Dictionary<string, TaskCompletionSource<RpcResponse>> _pendingRequests = [];
private readonly Channel<object> _mailbox = Channel.CreateUnbounded<object>();
private readonly IAgentContext _context;
private readonly IAgentRuntime _context;
public string Route { get; set; } = "base";
protected internal ILogger Logger => _context.Logger;
public IAgentContext Context => _context;
protected internal ILogger<AgentBase> _logger;
public IAgentRuntime Context => _context;
protected readonly EventTypes EventTypes;
protected AgentBase(IAgentContext context, EventTypes eventTypes)
protected AgentBase(
IAgentRuntime context,
EventTypes eventTypes,
ILogger<AgentBase>? logger = null)
{
_context = context;
context.AgentInstance = this;
this.EventTypes = eventTypes;
_logger = logger ?? LoggerFactory.Create(builder => { }).CreateLogger<AgentBase>();
Completion = Start();
}
internal Task Completion { get; }
internal Task Start()
@ -58,7 +61,6 @@ public abstract class AgentBase : IAgentBase, IHandle
}
}
}
public void ReceiveMessage(Message message) => _mailbox.Writer.TryWrite(message);
private async Task RunMessagePump()
@ -71,7 +73,7 @@ public abstract class AgentBase : IAgentBase, IHandle
switch (message)
{
case Message msg:
await HandleRpcMessage(msg).ConfigureAwait(false);
await HandleRpcMessage(msg, new CancellationToken()).ConfigureAwait(false);
break;
default:
throw new InvalidOperationException($"Unexpected message '{message}'.");
@ -79,12 +81,11 @@ public abstract class AgentBase : IAgentBase, IHandle
}
catch (Exception ex)
{
_context.Logger.LogError(ex, "Error processing message.");
_logger.LogError(ex, "Error processing message.");
}
}
}
protected internal async Task HandleRpcMessage(Message msg)
protected internal async Task HandleRpcMessage(Message msg, CancellationToken cancellationToken = default)
{
switch (msg.MessageCase)
{
@ -95,17 +96,17 @@ public abstract class AgentBase : IAgentBase, IHandle
static ((AgentBase Agent, CloudEvent Item) state) => state.Agent.CallHandler(state.Item),
(this, msg.CloudEvent),
activity,
msg.CloudEvent.Type).ConfigureAwait(false);
msg.CloudEvent.Type, cancellationToken).ConfigureAwait(false);
}
break;
case Message.MessageOneofCase.Request:
{
var activity = this.ExtractActivity(msg.Request.Method, msg.Request.Metadata);
await this.InvokeWithActivityAsync(
static ((AgentBase Agent, RpcRequest Request) state) => state.Agent.OnRequestCore(state.Request),
static ((AgentBase Agent, RpcRequest Request) state) => state.Agent.OnRequestCoreAsync(state.Request),
(this, msg.Request),
activity,
msg.Request.Method).ConfigureAwait(false);
msg.Request.Method, cancellationToken).ConfigureAwait(false);
}
break;
case Message.MessageOneofCase.Response:
@ -113,14 +114,14 @@ public abstract class AgentBase : IAgentBase, IHandle
break;
}
}
public async Task Store(AgentState state)
public async Task StoreAsync(AgentState state, CancellationToken cancellationToken = default)
{
await _context.Store(state).ConfigureAwait(false);
await _context.StoreAsync(state, cancellationToken).ConfigureAwait(false);
return;
}
public async Task<T> Read<T>(AgentId agentId) where T : IMessage, new()
public async Task<T> ReadAsync<T>(AgentId agentId, CancellationToken cancellationToken = default) where T : IMessage, new()
{
var agentstate = await _context.Read(agentId).ConfigureAwait(false);
var agentstate = await _context.ReadAsync(agentId, cancellationToken).ConfigureAwait(false);
return agentstate.FromAgentState<T>();
}
private void OnResponseCore(RpcResponse response)
@ -137,7 +138,7 @@ public abstract class AgentBase : IAgentBase, IHandle
completion.SetResult(response);
}
private async Task OnRequestCore(RpcRequest request)
private async Task OnRequestCoreAsync(RpcRequest request, CancellationToken cancellationToken = default)
{
RpcResponse response;
@ -149,8 +150,7 @@ public abstract class AgentBase : IAgentBase, IHandle
{
response = new RpcResponse { Error = ex.Message };
}
await _context.SendResponseAsync(request, response).ConfigureAwait(false);
await _context.SendResponseAsync(request, response, cancellationToken).ConfigureAwait(false);
}
protected async Task<RpcResponse> RequestAsync(AgentId target, string method, Dictionary<string, string> parameters)
@ -174,7 +174,7 @@ public abstract class AgentBase : IAgentBase, IHandle
activity?.SetTag("peer.service", target.ToString());
var completion = new TaskCompletionSource<RpcResponse>(TaskCreationOptions.RunContinuationsAsynchronously);
Context.DistributedContextPropagator.Inject(activity, request.Metadata, static (carrier, key, value) => ((IDictionary<string, string>)carrier!)[key] = value);
_context.Update(activity, request);
await this.InvokeWithActivityAsync(
static async ((AgentBase Agent, RpcRequest Request, TaskCompletionSource<RpcResponse>) state) =>
{
@ -204,13 +204,13 @@ public abstract class AgentBase : IAgentBase, IHandle
await PublishEventAsync(evt, token).ConfigureAwait(false);
}
public async ValueTask PublishEventAsync(CloudEvent item, CancellationToken token = default)
public async ValueTask PublishEventAsync(CloudEvent item, CancellationToken cancellationToken = default)
{
var activity = s_source.StartActivity($"PublishEventAsync '{item.Type}'", ActivityKind.Client, Activity.Current?.Context ?? default);
activity?.SetTag("peer.service", $"{item.Type}/{item.Source}");
// TODO: fix activity
Context.DistributedContextPropagator.Inject(activity, item.Metadata, static (carrier, key, value) => ((IDictionary<string, string>)carrier!)[key] = value);
_context.Update(activity, item);
await this.InvokeWithActivityAsync(
static async ((AgentBase Agent, CloudEvent Event) state) =>
{
@ -218,7 +218,7 @@ public abstract class AgentBase : IAgentBase, IHandle
},
(this, item),
activity,
item.Type).ConfigureAwait(false);
item.Type, cancellationToken).ConfigureAwait(false);
}
public Task CallHandler(CloudEvent item)
@ -251,7 +251,7 @@ public abstract class AgentBase : IAgentBase, IHandle
}
catch (Exception ex)
{
Logger.LogError(ex, $"Error invoking method {nameof(IHandle<object>.Handle)}");
_logger.LogError(ex, $"Error invoking method {nameof(IHandle<object>.Handle)}");
throw; // TODO: ?
}
}
@ -262,6 +262,7 @@ public abstract class AgentBase : IAgentBase, IHandle
public Task<RpcResponse> HandleRequest(RpcRequest request) => Task.FromResult(new RpcResponse { Error = "Not implemented" });
//TODO: should this be async and cancellable?
public virtual Task HandleObject(object item)
{
// get all Handle<T> methods
@ -279,4 +280,8 @@ public abstract class AgentBase : IAgentBase, IHandle
// otherwise, complain
throw new InvalidOperationException($"No handler found for type {item.GetType().FullName}");
}
public async ValueTask PublishEventAsync(string topic, IMessage evt, CancellationToken cancellationToken = default)
{
await PublishEventAsync(evt.ToCloudEvent(topic), cancellationToken).ConfigureAwait(false);
}
}

View File

@ -9,17 +9,8 @@ public static class AgentBaseExtensions
{
public static Activity? ExtractActivity(this AgentBase agent, string activityName, IDictionary<string, string> metadata)
{
Activity? activity = null;
agent.Context.DistributedContextPropagator.ExtractTraceIdAndState(metadata,
static (object? carrier, string fieldName, out string? fieldValue, out IEnumerable<string>? fieldValues) =>
{
var metadata = (IDictionary<string, string>)carrier!;
fieldValues = null;
metadata.TryGetValue(fieldName, out fieldValue);
},
out var traceParent,
out var traceState);
Activity? activity;
(var traceParent, var traceState) = agent.Context.GetTraceIDandState(metadata);
if (!string.IsNullOrEmpty(traceParent))
{
if (ActivityContext.TryParse(traceParent, traceState, isRemote: true, out ActivityContext parentContext))
@ -40,12 +31,7 @@ public static class AgentBaseExtensions
activity.TraceStateString = traceState;
}
var baggage = agent.Context.DistributedContextPropagator.ExtractBaggage(metadata, static (object? carrier, string fieldName, out string? fieldValue, out IEnumerable<string>? fieldValues) =>
{
var metadata = (IDictionary<string, string>)carrier!;
fieldValues = null;
metadata.TryGetValue(fieldName, out fieldValue);
});
var baggage = agent.Context.ExtractMetadata(metadata);
if (baggage is not null)
{
@ -63,7 +49,7 @@ public static class AgentBaseExtensions
return activity;
}
public static async Task InvokeWithActivityAsync<TState>(this AgentBase agent, Func<TState, Task> func, TState state, Activity? activity, string methodName)
public static async Task InvokeWithActivityAsync<TState>(this AgentBase agent, Func<TState, Task> func, TState state, Activity? activity, string methodName, CancellationToken cancellationToken = default)
{
if (activity is not null && activity.StartTimeUtc == default)
{

View File

@ -1,39 +0,0 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// AgentContext.cs
using System.Diagnostics;
using Microsoft.AutoGen.Abstractions;
using Microsoft.Extensions.Logging;
namespace Microsoft.AutoGen.Agents;
internal sealed class AgentContext(AgentId agentId, IAgentWorkerRuntime runtime, ILogger<AgentBase> logger, DistributedContextPropagator distributedContextPropagator) : IAgentContext
{
private readonly IAgentWorkerRuntime _runtime = runtime;
public AgentId AgentId { get; } = agentId;
public ILogger Logger { get; } = logger;
public IAgentBase? AgentInstance { get; set; }
public DistributedContextPropagator DistributedContextPropagator { get; } = distributedContextPropagator;
public async ValueTask SendResponseAsync(RpcRequest request, RpcResponse response, CancellationToken cancellationToken)
{
response.RequestId = request.RequestId;
await _runtime.SendResponse(response, cancellationToken).ConfigureAwait(false);
}
public async ValueTask SendRequestAsync(IAgentBase agent, RpcRequest request, CancellationToken cancellationToken)
{
await _runtime.SendRequest(agent, request, cancellationToken).ConfigureAwait(false);
}
public async ValueTask PublishEventAsync(CloudEvent @event, CancellationToken cancellationToken)
{
await _runtime.PublishEvent(@event, cancellationToken).ConfigureAwait(false);
}
public async ValueTask Store(AgentState value, CancellationToken cancellationToken)
{
await _runtime.Store(value, cancellationToken).ConfigureAwait(false);
}
public ValueTask<AgentState> Read(AgentId agentId, CancellationToken cancellationToken)
{
return _runtime.Read(agentId, cancellationToken);
}
}

View File

@ -0,0 +1,72 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// AgentRuntime.cs
using System.Diagnostics;
using Microsoft.AutoGen.Abstractions;
using Microsoft.Extensions.Logging;
namespace Microsoft.AutoGen.Agents;
internal sealed class AgentRuntime(AgentId agentId, IAgentWorker worker, ILogger<AgentBase> logger, DistributedContextPropagator distributedContextPropagator) : IAgentRuntime
{
private readonly IAgentWorker worker = worker;
public AgentId AgentId { get; } = agentId;
public ILogger<AgentBase> Logger { get; } = logger;
public IAgentBase? AgentInstance { get; set; }
private DistributedContextPropagator DistributedContextPropagator { get; } = distributedContextPropagator;
public (string?, string?) GetTraceIDandState(IDictionary<string, string> metadata)
{
DistributedContextPropagator.ExtractTraceIdAndState(metadata,
static (object? carrier, string fieldName, out string? fieldValue, out IEnumerable<string>? fieldValues) =>
{
var metadata = (IDictionary<string, string>)carrier!;
fieldValues = null;
metadata.TryGetValue(fieldName, out fieldValue);
},
out var traceParent,
out var traceState);
return (traceParent, traceState);
}
public void Update(Activity? activity, RpcRequest request)
{
DistributedContextPropagator.Inject(activity, request.Metadata, static (carrier, key, value) => ((IDictionary<string, string>)carrier!)[key] = value);
}
public void Update(Activity? activity, CloudEvent cloudEvent)
{
DistributedContextPropagator.Inject(activity, cloudEvent.Metadata, static (carrier, key, value) => ((IDictionary<string, string>)carrier!)[key] = value);
}
public async ValueTask SendResponseAsync(RpcRequest request, RpcResponse response, CancellationToken cancellationToken = default)
{
response.RequestId = request.RequestId;
await worker.SendResponseAsync(response, cancellationToken).ConfigureAwait(false);
}
public async ValueTask SendRequestAsync(IAgentBase agent, RpcRequest request, CancellationToken cancellationToken = default)
{
await worker.SendRequestAsync(agent, request, cancellationToken).ConfigureAwait(false);
}
public async ValueTask PublishEventAsync(CloudEvent @event, CancellationToken cancellationToken = default)
{
await worker.PublishEventAsync(@event, cancellationToken).ConfigureAwait(false);
}
public async ValueTask StoreAsync(AgentState value, CancellationToken cancellationToken = default)
{
await worker.StoreAsync(value, cancellationToken).ConfigureAwait(false);
}
public ValueTask<AgentState> ReadAsync(AgentId agentId, CancellationToken cancellationToken = default)
{
return worker.ReadAsync(agentId, cancellationToken);
}
public IDictionary<string, string> ExtractMetadata(IDictionary<string, string> metadata)
{
var baggage = DistributedContextPropagator.ExtractBaggage(metadata, static (object? carrier, string fieldName, out string? fieldValue, out IEnumerable<string>? fieldValues) =>
{
var metadata = (IDictionary<string, string>)carrier!;
fieldValues = null;
metadata.TryGetValue(fieldName, out fieldValue);
});
return baggage as IDictionary<string, string> ?? new Dictionary<string, string>();
}
}

View File

@ -1,21 +0,0 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// AgentWorker.cs
using System.Diagnostics;
using Google.Protobuf;
using Microsoft.AutoGen.Abstractions;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
namespace Microsoft.AutoGen.Agents;
public sealed class AgentWorker(IAgentWorkerRuntime runtime, DistributedContextPropagator distributedContextPropagator,
[FromKeyedServices("EventTypes")] EventTypes eventTypes, ILogger<AgentBase> logger)
: AgentBase(new AgentContext(new AgentId("client", Guid.NewGuid().ToString()), runtime, logger, distributedContextPropagator), eventTypes)
{
public async ValueTask PublishEventAsync(CloudEvent evt) => await base.PublishEventAsync(evt);
public async ValueTask PublishEventAsync(string topic, IMessage evt)
{
await PublishEventAsync(evt.ToCloudEvent(topic)).ConfigureAwait(false);
}
}

View File

@ -4,12 +4,12 @@
using Google.Protobuf;
using Microsoft.AutoGen.Abstractions;
using Microsoft.Extensions.AI;
namespace Microsoft.AutoGen.Agents.Client;
namespace Microsoft.AutoGen.Agents;
public abstract class InferenceAgent<T> : AgentBase where T : IMessage, new()
{
protected IChatClient ChatClient { get; }
public InferenceAgent(
IAgentContext context,
IAgentRuntime context,
EventTypes typeRegistry, IChatClient client
) : base(context, typeRegistry)
{

View File

@ -15,7 +15,7 @@ public abstract class SKAiAgent<T> : AgentBase where T : class, new()
protected Kernel _kernel;
private readonly ISemanticTextMemory _memory;
public SKAiAgent(IAgentContext context, ISemanticTextMemory memory, Kernel kernel, EventTypes typeRegistry) : base(context, typeRegistry)
public SKAiAgent(IAgentRuntime context, ISemanticTextMemory memory, Kernel kernel, EventTypes typeRegistry) : base(context, typeRegistry)
{
_state = new();
_memory = memory;

View File

@ -13,7 +13,7 @@ public abstract class ConsoleAgent : IOAgent,
{
// instead of the primary constructor above, make a constructr here that still calls the base constructor
public ConsoleAgent(IAgentContext context, [FromKeyedServices("EventTypes")] EventTypes typeRegistry) : base(context, typeRegistry)
public ConsoleAgent(IAgentRuntime context, [FromKeyedServices("EventTypes")] EventTypes typeRegistry) : base(context, typeRegistry)
{
_route = "console";
}

View File

@ -9,7 +9,7 @@ namespace Microsoft.AutoGen.Agents;
[TopicSubscription("FileIO")]
public abstract class FileAgent(
IAgentContext context,
IAgentRuntime context,
[FromKeyedServices("EventTypes")] EventTypes typeRegistry,
string inputPath = "input.txt",
string outputPath = "output.txt"
@ -24,7 +24,7 @@ public abstract class FileAgent(
if (!File.Exists(inputPath))
{
var errorMessage = $"File not found: {inputPath}";
Logger.LogError(errorMessage);
_logger.LogError(errorMessage);
//publish IOError event
var err = new IOError
{
@ -42,6 +42,7 @@ public abstract class FileAgent(
var evt = new InputProcessed
{
Route = _route
};
await PublishMessageAsync(evt);
}

View File

@ -8,7 +8,7 @@ namespace Microsoft.AutoGen.Agents;
public abstract class IOAgent : AgentBase
{
public string _route = "base";
protected IOAgent(IAgentContext context, EventTypes eventTypes) : base(context, eventTypes)
protected IOAgent(IAgentRuntime context, EventTypes eventTypes) : base(context, eventTypes)
{
}
public virtual async Task Handle(Input item)

View File

@ -17,7 +17,7 @@ public abstract class WebAPIAgent : IOAgent,
private readonly string _url = "/agents/webio";
public WebAPIAgent(
IAgentContext context,
IAgentRuntime context,
[FromKeyedServices("EventTypes")] EventTypes typeRegistry,
ILogger<WebAPIAgent> logger,
string url = "/agents/webio") : base(

View File

@ -1,10 +1,8 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// App.cs
using System.Diagnostics.CodeAnalysis;
using Google.Protobuf;
using Microsoft.AspNetCore.Builder;
using Microsoft.AutoGen.Runtime;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
@ -21,15 +19,15 @@ public static class AgentsApp
if (local)
{
// start the server runtime
builder.AddLocalAgentService();
builder.AddLocalAgentService(useGrpc: false);
}
builder.AddAgentWorker()
builder.AddAgentWorker(local: local)
.AddAgents(agentTypes);
builder.AddServiceDefaults();
var app = builder.Build();
if (local)
{
app.MapAgentService();
app.MapAgentService(local: true, useGrpc: false);
}
app.MapDefaultEndpoints();
Host = app;
@ -47,8 +45,8 @@ public static class AgentsApp
{
await StartAsync(builder, agents, local);
}
var client = Host.Services.GetRequiredService<AgentWorker>() ?? throw new InvalidOperationException("Host not started");
await client.PublishEventAsync(topic, message).ConfigureAwait(false);
var client = Host.Services.GetRequiredService<Client>() ?? throw new InvalidOperationException("Host not started");
await client.PublishEventAsync(topic, message, new CancellationToken()).ConfigureAwait(true);
return Host;
}
public static async ValueTask ShutdownAsync()

View File

@ -0,0 +1,14 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Client.cs
using System.Diagnostics;
using Microsoft.AutoGen.Abstractions;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
namespace Microsoft.AutoGen.Agents;
public sealed class Client(IAgentWorker runtime, DistributedContextPropagator distributedContextPropagator,
[FromKeyedServices("EventTypes")] EventTypes eventTypes, ILogger<Client> logger)
: AgentBase(new AgentRuntime(new AgentId("client", Guid.NewGuid().ToString()), runtime, logger, distributedContextPropagator), eventTypes)
{
}

View File

@ -4,22 +4,44 @@
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<PackageId>AutoGen.Worker.Client</PackageId>
<PackageId>Microsoft.AutoGen.Agents</PackageId>
<PackageProjectUrl>https://github.com/microsoft/autogen</PackageProjectUrl>
<Authors>Microsoft</Authors>
<Description>AutoGen Worker Client Library</Description>
<Description>Micrososft AutoGen Agents SDK</Description>
<Tags>ai-agents;event-driven-agents</Tags>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\Abstractions\Microsoft.AutoGen.Abstractions.csproj" />
<ProjectReference Include="..\Runtime\Microsoft.AutoGen.Runtime.csproj" />
<ProjectReference Include="..\Extensions\ServiceDefaults\Microsoft.AutoGen.ServiceDefaults.csproj" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.AI.Abstractions" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" />
<PackageReference Include="Microsoft.Orleans.CodeGenerator">
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
</PackageReference>
<PackageReference Include="Microsoft.Orleans.Serialization" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Orleans.Serialization.Protobuf" />
<PackageReference Include="Microsoft.Orleans.Server" />
<PackageReference Include="Microsoft.Orleans.Streaming" />
<PackageReference Include="Microsoft.Orleans.Sdk" />
<PackageReference Include="Microsoft.Orleans.Runtime" />
<PackageReference Include="Microsoft.Orleans.Persistence.Cosmos" />
<PackageReference Include="Microsoft.Orleans.Clustering.Cosmos" />
<PackageReference Include="Microsoft.Orleans.Reminders.Cosmos" />
<PackageReference Include="Microsoft.Orleans.Streaming.EventHubs" />
<PackageReference Include="Microsoft.Orleans.Reminders" />
<PackageReference Include="OrleansDashboard" />
</ItemGroup>
<PropertyGroup>
<ServerGarbageCollection>true</ServerGarbageCollection>
<ConcurrentGarbageCollection>true</ConcurrentGarbageCollection>
</PropertyGroup>
</Project>

View File

@ -0,0 +1,181 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// AgentWorker.cs
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading.Channels;
using Microsoft.AutoGen.Abstractions;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
namespace Microsoft.AutoGen.Agents;
public class AgentWorker :
IHostedService,
IAgentWorker
{
private readonly ConcurrentDictionary<string, Type> _agentTypes = new();
private readonly ConcurrentDictionary<(string Type, string Key), IAgentBase> _agents = new();
private readonly ILogger<AgentWorker> _logger;
private readonly Channel<object> _mailbox = Channel.CreateUnbounded<object>();
private readonly ConcurrentDictionary<string, AgentState> _agentStates = new();
private readonly ConcurrentDictionary<string, (IAgentBase Agent, string OriginalRequestId)> _pendingClientRequests = new();
private readonly CancellationTokenSource _shutdownCts;
private readonly IServiceProvider _serviceProvider;
private readonly IEnumerable<Tuple<string, Type>> _configuredAgentTypes;
private readonly DistributedContextPropagator _distributedContextPropagator;
private readonly CancellationTokenSource _shutdownCancellationToken = new();
private Task? _mailboxTask;
private readonly object _channelLock = new();
public AgentWorker(
IHostApplicationLifetime hostApplicationLifetime,
IServiceProvider serviceProvider,
[FromKeyedServices("AgentTypes")] IEnumerable<Tuple<string, Type>> configuredAgentTypes,
ILogger<GrpcAgentWorker> logger,
DistributedContextPropagator distributedContextPropagator)
{
_logger = logger;
_serviceProvider = serviceProvider;
_configuredAgentTypes = configuredAgentTypes;
_distributedContextPropagator = distributedContextPropagator;
_shutdownCts = CancellationTokenSource.CreateLinkedTokenSource(hostApplicationLifetime.ApplicationStopping);
}
// this is the in-memory version - we just pass the message directly to the agent(s) that handle this type of event
public async ValueTask PublishEventAsync(CloudEvent cloudEvent, CancellationToken cancellationToken = default)
{
foreach (var (typeName, _) in _agentTypes)
{
if (typeName == "Client") { continue; }
var agent = GetOrActivateAgent(new AgentId(typeName, cloudEvent.Source));
agent.ReceiveMessage(new Message { CloudEvent = cloudEvent });
}
}
public async ValueTask SendRequestAsync(IAgentBase agent, RpcRequest request, CancellationToken cancellationToken = default)
{
var requestId = Guid.NewGuid().ToString();
_pendingClientRequests[requestId] = (agent, request.RequestId);
request.RequestId = requestId;
await _mailbox.Writer.WriteAsync(request, cancellationToken).ConfigureAwait(false);
}
public ValueTask SendResponseAsync(RpcResponse response, CancellationToken cancellationToken = default)
{
return _mailbox.Writer.WriteAsync(new Message { Response = response }, cancellationToken);
}
public ValueTask StoreAsync(AgentState value, CancellationToken cancellationToken = default)
{
var agentId = value.AgentId ?? throw new InvalidOperationException("AgentId is required when saving AgentState.");
// add or update _agentStates with the new state
var response = _agentStates.AddOrUpdate(agentId.ToString(), value, (key, oldValue) => value);
return ValueTask.CompletedTask;
}
public ValueTask<AgentState> ReadAsync(AgentId agentId, CancellationToken cancellationToken = default)
{
_agentStates.TryGetValue(agentId.ToString(), out var state);
if (state is not null && state.AgentId is not null)
{
return new ValueTask<AgentState>(state);
}
else
{
throw new KeyNotFoundException($"Failed to read AgentState for {agentId}.");
}
}
public async Task RunMessagePump()
{
await Task.CompletedTask.ConfigureAwait(ConfigureAwaitOptions.ForceYielding);
await foreach (var message in _mailbox.Reader.ReadAllAsync())
{
try
{
if (message == null) { continue; }
switch (message)
{
case Message msg:
var item = msg.CloudEvent;
foreach (var (typeName, _) in _agentTypes)
{
var agentToInvoke = GetOrActivateAgent(new AgentId(typeName, item.Source));
agentToInvoke.ReceiveMessage(msg);
}
break;
default:
throw new InvalidOperationException($"Unexpected message '{message}'.");
}
}
catch (OperationCanceledException)
{
}
finally
{
_shutdownCancellationToken.Cancel();
}
}
}
public async Task StartAsync(CancellationToken cancellationToken)
{
StartCore();
foreach (var (typeName, type) in _configuredAgentTypes)
{
_agentTypes.TryAdd(typeName, type);
}
void StartCore()
{
var didSuppress = false;
if (!ExecutionContext.IsFlowSuppressed())
{
didSuppress = true;
ExecutionContext.SuppressFlow();
}
try
{
_mailboxTask = Task.Run(RunMessagePump, CancellationToken.None);
}
finally
{
if (didSuppress)
{
ExecutionContext.RestoreFlow();
}
}
}
}
public async Task StopAsync(CancellationToken cancellationToken)
{
_shutdownCts.Cancel();
_mailbox.Writer.TryComplete();
if (_mailboxTask is { } readTask)
{
await readTask.ConfigureAwait(false);
}
lock (_channelLock)
{
}
}
private IAgentBase GetOrActivateAgent(AgentId agentId)
{
if (!_agents.TryGetValue((agentId.Type, agentId.Key), out var agent))
{
if (_agentTypes.TryGetValue(agentId.Type, out var agentType))
{
var context = new AgentRuntime(agentId, this, _serviceProvider.GetRequiredService<ILogger<AgentBase>>(), _distributedContextPropagator);
agent = (AgentBase)ActivatorUtilities.CreateInstance(_serviceProvider, agentType, context);
_agents.TryAdd((agentId.Type, agentId.Key), agent);
}
else
{
throw new InvalidOperationException($"Agent type '{agentId.Type}' is unknown.");
}
}
return agent;
}
}

View File

@ -9,11 +9,11 @@ using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Hosting;
namespace Microsoft.AutoGen.Runtime;
namespace Microsoft.AutoGen.Agents;
public static class AgentWorkerHostingExtensions
{
public static WebApplicationBuilder AddAgentService(this WebApplicationBuilder builder, bool local = false)
public static WebApplicationBuilder AddAgentService(this WebApplicationBuilder builder, bool local = false, bool useGrpc = true)
{
if (local)
{
@ -26,24 +26,32 @@ public static class AgentWorkerHostingExtensions
listenOptions.UseHttps();
});
});
builder.AddOrleans(local);
}
builder.Services.AddGrpc();
builder.AddOrleans(local);
else
{
builder.AddOrleans();
}
builder.Services.TryAddSingleton(DistributedContextPropagator.Current);
builder.Services.AddSingleton<WorkerGateway>();
builder.Services.AddSingleton<IHostedService>(sp => sp.GetRequiredService<WorkerGateway>());
if (useGrpc)
{
builder.Services.AddGrpc();
builder.Services.AddSingleton<GrpcGateway>();
builder.Services.AddSingleton<IHostedService>(sp => (IHostedService)sp.GetRequiredService<GrpcGateway>());
}
return builder;
}
public static WebApplicationBuilder AddLocalAgentService(this WebApplicationBuilder builder)
public static WebApplicationBuilder AddLocalAgentService(this WebApplicationBuilder builder, bool useGrpc = true)
{
builder.AddAgentService(local: true);
return builder;
return builder.AddAgentService(local: true, useGrpc);
}
public static WebApplication MapAgentService(this WebApplication app)
public static WebApplication MapAgentService(this WebApplication app, bool local = false, bool useGrpc = true)
{
app.MapGrpcService<WorkerGatewayService>();
if (useGrpc) { app.MapGrpcService<GrpcGatewayService>(); }
return app;
}
}

View File

@ -1,5 +1,5 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// GrpcAgentWorkerRuntime.cs
// GrpcAgentWorker.cs
using System.Collections.Concurrent;
using System.Diagnostics;
@ -13,7 +13,15 @@ using Microsoft.Extensions.Logging;
namespace Microsoft.AutoGen.Agents;
public sealed class GrpcAgentWorkerRuntime : IHostedService, IDisposable, IAgentWorkerRuntime
public sealed class GrpcAgentWorker(
AgentRpc.AgentRpcClient client,
IHostApplicationLifetime hostApplicationLifetime,
IServiceProvider serviceProvider,
[FromKeyedServices("AgentTypes")] IEnumerable<Tuple<string, Type>> configuredAgentTypes,
ILogger<GrpcAgentWorker> logger,
DistributedContextPropagator distributedContextPropagator) :
AgentWorker(hostApplicationLifetime,
serviceProvider, configuredAgentTypes, logger, distributedContextPropagator), IHostedService, IDisposable, IAgentWorker
{
private readonly object _channelLock = new();
private readonly ConcurrentDictionary<string, Type> _agentTypes = new();
@ -26,38 +34,21 @@ public sealed class GrpcAgentWorkerRuntime : IHostedService, IDisposable, IAgent
SingleWriter = false,
FullMode = BoundedChannelFullMode.Wait
});
private readonly AgentRpc.AgentRpcClient _client;
private readonly IServiceProvider _serviceProvider;
private readonly IEnumerable<Tuple<string, Type>> _configuredAgentTypes;
private readonly ILogger<GrpcAgentWorkerRuntime> _logger;
private readonly DistributedContextPropagator _distributedContextPropagator;
private readonly CancellationTokenSource _shutdownCts;
private readonly AgentRpc.AgentRpcClient _client = client;
private readonly IServiceProvider _serviceProvider = serviceProvider;
private readonly IEnumerable<Tuple<string, Type>> _configuredAgentTypes = configuredAgentTypes;
private readonly ILogger<GrpcAgentWorker> _logger = logger;
private readonly DistributedContextPropagator _distributedContextPropagator = distributedContextPropagator;
private readonly CancellationTokenSource _shutdownCts = CancellationTokenSource.CreateLinkedTokenSource(hostApplicationLifetime.ApplicationStopping);
private AsyncDuplexStreamingCall<Message, Message>? _channel;
private Task? _readTask;
private Task? _writeTask;
public GrpcAgentWorkerRuntime(
AgentRpc.AgentRpcClient client,
IHostApplicationLifetime hostApplicationLifetime,
IServiceProvider serviceProvider,
[FromKeyedServices("AgentTypes")] IEnumerable<Tuple<string, Type>> configuredAgentTypes,
ILogger<GrpcAgentWorkerRuntime> logger,
DistributedContextPropagator distributedContextPropagator)
{
_client = client;
_serviceProvider = serviceProvider;
_configuredAgentTypes = configuredAgentTypes;
_logger = logger;
_distributedContextPropagator = distributedContextPropagator;
_shutdownCts = CancellationTokenSource.CreateLinkedTokenSource(hostApplicationLifetime.ApplicationStopping);
}
public void Dispose()
{
_outboundMessagesChannel.Writer.TryComplete();
_channel?.Dispose();
}
private async Task RunReadPump()
{
var channel = GetChannel();
@ -131,7 +122,6 @@ public sealed class GrpcAgentWorkerRuntime : IHostedService, IDisposable, IAgent
}
}
}
private async Task RunWritePump()
{
var channel = GetChannel();
@ -183,14 +173,13 @@ public sealed class GrpcAgentWorkerRuntime : IHostedService, IDisposable, IAgent
item.WriteCompletionSource.TrySetCanceled();
}
}
private IAgentBase GetOrActivateAgent(AgentId agentId)
{
if (!_agents.TryGetValue((agentId.Type, agentId.Key), out var agent))
{
if (_agentTypes.TryGetValue(agentId.Type, out var agentType))
{
var context = new AgentContext(agentId, this, _serviceProvider.GetRequiredService<ILogger<AgentBase>>(), _distributedContextPropagator);
var context = new AgentRuntime(agentId, this, _serviceProvider.GetRequiredService<ILogger<AgentBase>>(), _distributedContextPropagator);
agent = (AgentBase)ActivatorUtilities.CreateInstance(_serviceProvider, agentType, context);
_agents.TryAdd((agentId.Type, agentId.Key), agent);
}
@ -203,7 +192,7 @@ public sealed class GrpcAgentWorkerRuntime : IHostedService, IDisposable, IAgent
return agent;
}
private async ValueTask RegisterAgentType(string type, Type agentType)
private async ValueTask RegisterAgentTypeAsync(string type, Type agentType, CancellationToken cancellationToken = default)
{
if (_agentTypes.TryAdd(type, agentType))
{
@ -223,55 +212,33 @@ public sealed class GrpcAgentWorkerRuntime : IHostedService, IDisposable, IAgent
//StateType = state?.Name,
//Events = { events }
}
},
_shutdownCts.Token).ConfigureAwait(false);
}, cancellationToken).ConfigureAwait(false);
}
}
public async ValueTask SendResponse(RpcResponse response, CancellationToken cancellationToken)
// new is intentional
public new async ValueTask SendResponseAsync(RpcResponse response, CancellationToken cancellationToken = default)
{
_logger.LogInformation("Sending response '{Response}'.", response);
await WriteChannelAsync(new Message { Response = response }, cancellationToken).ConfigureAwait(false);
}
public async ValueTask SendRequest(IAgentBase agent, RpcRequest request, CancellationToken cancellationToken)
// new is intentional
public new async ValueTask SendRequestAsync(IAgentBase agent, RpcRequest request, CancellationToken cancellationToken = default)
{
_logger.LogInformation("[{AgentId}] Sending request '{Request}'.", agent.AgentId, request);
var requestId = Guid.NewGuid().ToString();
_pendingRequests[requestId] = (agent, request.RequestId);
request.RequestId = requestId;
try
{
await WriteChannelAsync(new Message { Request = request }, cancellationToken).ConfigureAwait(false);
}
catch (Exception exception)
{
if (_pendingRequests.TryRemove(requestId, out _))
{
agent.ReceiveMessage(new Message { Response = new RpcResponse { RequestId = request.RequestId, Error = exception.Message } });
}
}
await WriteChannelAsync(new Message { Request = request }, cancellationToken).ConfigureAwait(false);
}
public async ValueTask PublishEvent(CloudEvent @event, CancellationToken cancellationToken)
// new is intentional
public new async ValueTask PublishEventAsync(CloudEvent @event, CancellationToken cancellationToken = default)
{
try
{
await WriteChannelAsync(new Message { CloudEvent = @event }, cancellationToken).ConfigureAwait(false);
}
catch (Exception exception)
{
_logger.LogWarning(exception, "Failed to publish event '{Event}'.", @event);
}
await WriteChannelAsync(new Message { CloudEvent = @event }, cancellationToken).ConfigureAwait(false);
}
private async Task WriteChannelAsync(Message message, CancellationToken cancellationToken)
private async Task WriteChannelAsync(Message message, CancellationToken cancellationToken = default)
{
var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
var tcs = new TaskCompletionSource();
await _outboundMessagesChannel.Writer.WriteAsync((message, tcs), cancellationToken).ConfigureAwait(false);
await tcs.Task.WaitAsync(cancellationToken);
}
private AsyncDuplexStreamingCall<Message, Message> GetChannel()
{
if (_channel is { } channel)
@ -307,7 +274,7 @@ public sealed class GrpcAgentWorkerRuntime : IHostedService, IDisposable, IAgent
return _channel;
}
public async Task StartAsync(CancellationToken cancellationToken)
public new async Task StartAsync(CancellationToken cancellationToken)
{
_channel = GetChannel();
StartCore();
@ -315,10 +282,10 @@ public sealed class GrpcAgentWorkerRuntime : IHostedService, IDisposable, IAgent
var tasks = new List<Task>(_agentTypes.Count);
foreach (var (typeName, type) in _configuredAgentTypes)
{
tasks.Add(RegisterAgentType(typeName, type).AsTask());
tasks.Add(RegisterAgentTypeAsync(typeName, type, cancellationToken).AsTask());
}
await Task.WhenAll(tasks).ConfigureAwait(false);
await Task.WhenAll(tasks).ConfigureAwait(true);
void StartCore()
{
@ -344,7 +311,7 @@ public sealed class GrpcAgentWorkerRuntime : IHostedService, IDisposable, IAgent
}
}
public async Task StopAsync(CancellationToken cancellationToken)
public new async Task StopAsync(CancellationToken cancellationToken)
{
_shutdownCts.Cancel();
@ -364,19 +331,20 @@ public sealed class GrpcAgentWorkerRuntime : IHostedService, IDisposable, IAgent
_channel?.Dispose();
}
}
public ValueTask Store(AgentState value, CancellationToken cancellationToken)
// new intentional
public new async ValueTask StoreAsync(AgentState value, CancellationToken cancellationToken = default)
{
var agentId = value.AgentId ?? throw new InvalidOperationException("AgentId is required when saving AgentState.");
var response = _client.SaveState(value, cancellationToken: cancellationToken);
var response = _client.SaveState(value, null, null, cancellationToken);
if (!response.Success)
{
throw new InvalidOperationException($"Error saving AgentState for AgentId {agentId}.");
}
return ValueTask.CompletedTask;
}
public async ValueTask<AgentState> Read(AgentId agentId, CancellationToken cancellationToken)
// new intentional
public new async ValueTask<AgentState> ReadAsync(AgentId agentId, CancellationToken cancellationToken = default)
{
var response = await _client.GetStateAsync(agentId, cancellationToken: cancellationToken);
var response = await _client.GetStateAsync(agentId).ConfigureAwait(true);
// if (response.Success && response.AgentState.AgentId is not null) - why is success always false?
if (response.AgentState.AgentId is not null)
{

View File

@ -0,0 +1,51 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// GrpcAgentWorkerHostBuilderExtension.cs
using Grpc.Core;
using Grpc.Net.Client.Configuration;
using Microsoft.AutoGen.Abstractions;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
namespace Microsoft.AutoGen.Agents;
public static class GrpcAgentWorkerHostBuilderExtensions
{
private const string _defaultAgentServiceAddress = "https://localhost:5001";
public static IHostApplicationBuilder AddGrpcAgentWorker(this IHostApplicationBuilder builder, string agentServiceAddress = _defaultAgentServiceAddress)
{
builder.Services.AddGrpcClient<AgentRpc.AgentRpcClient>(options =>
{
options.Address = new Uri(agentServiceAddress);
options.ChannelOptionsActions.Add(channelOptions =>
{
channelOptions.HttpHandler = new SocketsHttpHandler
{
EnableMultipleHttp2Connections = true,
KeepAlivePingDelay = TimeSpan.FromSeconds(20),
KeepAlivePingTimeout = TimeSpan.FromSeconds(10),
KeepAlivePingPolicy = HttpKeepAlivePingPolicy.WithActiveRequests
};
var methodConfig = new MethodConfig
{
Names = { MethodName.Default },
RetryPolicy = new RetryPolicy
{
MaxAttempts = 5,
InitialBackoff = TimeSpan.FromSeconds(1),
MaxBackoff = TimeSpan.FromSeconds(5),
BackoffMultiplier = 1.5,
RetryableStatusCodes = { StatusCode.Unavailable }
}
};
channelOptions.ServiceConfig = new() { MethodConfigs = { methodConfig } };
channelOptions.ThrowOperationCanceledOnCancellation = true;
});
});
builder.Services.AddSingleton<IAgentWorker, GrpcAgentWorker>();
return builder;
}
}

View File

@ -1,5 +1,5 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// WorkerGateway.cs
// GrpcGateway.cs
using System.Collections.Concurrent;
using Grpc.Core;
@ -7,91 +7,60 @@ using Microsoft.AutoGen.Abstractions;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
namespace Microsoft.AutoGen.Runtime;
namespace Microsoft.AutoGen.Agents;
internal sealed class WorkerGateway : BackgroundService, IWorkerGateway
public sealed class GrpcGateway : BackgroundService, IGateway
{
private static readonly TimeSpan s_agentResponseTimeout = TimeSpan.FromSeconds(30);
private readonly ILogger<WorkerGateway> _logger;
private readonly ILogger<GrpcGateway> _logger;
private readonly IClusterClient _clusterClient;
private readonly IAgentWorkerRegistryGrain _gatewayRegistry;
private readonly IWorkerGateway _reference;
// The local mapping of agents to worker processes.
private readonly ConcurrentDictionary<WorkerProcessConnection, WorkerProcessConnection> _workers = new();
private readonly ConcurrentDictionary<string, AgentState> _agentState = new();
private readonly IRegistryGrain _gatewayRegistry;
private readonly IGateway _reference;
// The agents supported by each worker process.
private readonly ConcurrentDictionary<string, List<WorkerProcessConnection>> _supportedAgentTypes = [];
private readonly ConcurrentDictionary<string, List<GrpcWorkerConnection>> _supportedAgentTypes = [];
public readonly ConcurrentDictionary<IConnection, IConnection> _workers = new();
// The mapping from agent id to worker process.
private readonly ConcurrentDictionary<(string Type, string Key), WorkerProcessConnection> _agentDirectory = new();
private readonly ConcurrentDictionary<(string Type, string Key), GrpcWorkerConnection> _agentDirectory = new();
// RPC
private readonly ConcurrentDictionary<(WorkerProcessConnection, string), TaskCompletionSource<RpcResponse>> _pendingRequests = new();
private readonly ConcurrentDictionary<(GrpcWorkerConnection, string), TaskCompletionSource<RpcResponse>> _pendingRequests = new();
// InMemory Message Queue
public WorkerGateway(IClusterClient clusterClient, ILogger<WorkerGateway> logger)
public GrpcGateway(IClusterClient clusterClient, ILogger<GrpcGateway> logger)
{
_logger = logger;
_clusterClient = clusterClient;
_reference = clusterClient.CreateObjectReference<IWorkerGateway>(this);
_gatewayRegistry = clusterClient.GetGrain<IAgentWorkerRegistryGrain>(0);
_reference = clusterClient.CreateObjectReference<IGateway>(this);
_gatewayRegistry = clusterClient.GetGrain<IRegistryGrain>(0);
}
public async ValueTask BroadcastEvent(CloudEvent evt)
{
// TODO: filter the workers that receive the event
var tasks = new List<Task>(_workers.Count);
foreach (var (_, connection) in _workers)
foreach (var (_, connection) in _supportedAgentTypes)
{
tasks.Add(connection.SendMessage(new Message { CloudEvent = evt }));
tasks.Add(this.SendMessageAsync((IConnection)connection[0], evt, default));
}
await Task.WhenAll(tasks);
await Task.WhenAll(tasks).ConfigureAwait(false);
}
public async ValueTask<RpcResponse> InvokeRequest(RpcRequest request)
//intetionally not static so can be called by some methods implemented in base class
public async Task SendMessageAsync(IConnection connection, CloudEvent cloudEvent, CancellationToken cancellationToken = default)
{
(string Type, string Key) agentId = (request.Target.Type, request.Target.Key);
if (!_agentDirectory.TryGetValue(agentId, out var connection) || connection.Completion.IsCompleted)
{
// Activate the agent on a compatible worker process.
if (_supportedAgentTypes.TryGetValue(request.Target.Type, out var workers))
{
connection = workers[Random.Shared.Next(workers.Count)];
_agentDirectory[agentId] = connection;
}
else
{
return new(new RpcResponse { Error = "Agent not found." });
}
}
// Proxy the request to the agent.
var originalRequestId = request.RequestId;
var newRequestId = Guid.NewGuid().ToString();
var completion = _pendingRequests[(connection, newRequestId)] = new(TaskCreationOptions.RunContinuationsAsynchronously);
request.RequestId = newRequestId;
await connection.ResponseStream.WriteAsync(new Message { Request = request });
// Wait for the response and send it back to the caller.
var response = await completion.Task.WaitAsync(s_agentResponseTimeout);
response.RequestId = originalRequestId;
return response;
var queue = (GrpcWorkerConnection)connection;
await queue.ResponseStream.WriteAsync(new Message { CloudEvent = cloudEvent }, cancellationToken).ConfigureAwait(false);
}
private void DispatchResponse(WorkerProcessConnection connection, RpcResponse response)
private void DispatchResponse(GrpcWorkerConnection connection, RpcResponse response)
{
if (!_pendingRequests.TryRemove((connection, response.RequestId), out var completion))
{
_logger.LogWarning("Received response for unknown request.");
return;
}
// Complete the request.
completion.SetResult(response);
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
@ -104,10 +73,8 @@ internal sealed class WorkerGateway : BackgroundService, IWorkerGateway
{
_logger.LogWarning(exception, "Error adding worker to registry.");
}
await Task.Delay(TimeSpan.FromSeconds(15), stoppingToken);
}
try
{
await _gatewayRegistry.RemoveWorker(_reference);
@ -117,8 +84,8 @@ internal sealed class WorkerGateway : BackgroundService, IWorkerGateway
_logger.LogWarning(exception, "Error removing worker from registry.");
}
}
internal async Task OnReceivedMessageAsync(WorkerProcessConnection connection, Message message)
//new is intentional...
internal async Task OnReceivedMessageAsync(GrpcWorkerConnection connection, Message message)
{
_logger.LogInformation("Received message {Message} from connection {Connection}.", message, connection);
switch (message.MessageCase)
@ -139,63 +106,28 @@ internal sealed class WorkerGateway : BackgroundService, IWorkerGateway
throw new InvalidOperationException($"Unknown message type for message '{message}'.");
};
}
private async ValueTask RegisterAgentTypeAsync(WorkerProcessConnection connection, RegisterAgentTypeRequest msg)
private async ValueTask RegisterAgentTypeAsync(GrpcWorkerConnection connection, RegisterAgentTypeRequest msg)
{
connection.AddSupportedType(msg.Type);
_supportedAgentTypes.GetOrAdd(msg.Type, _ => []).Add(connection);
var success = false;
var error = String.Empty;
try
{
await _gatewayRegistry.RegisterAgentType(msg.Type, _reference);
success = true;
}
catch (InvalidOperationException exception)
{
error = $"Error registering agent type '{msg.Type}'.";
_logger.LogWarning(exception, error);
}
var request_id = msg.RequestId;
var response = new RegisterAgentTypeResponse { RequestId = request_id, Success = success, Error = error };
await connection.SendMessage(new Message { RegisterAgentTypeResponse = response });
await _gatewayRegistry.RegisterAgentType(msg.Type, _reference);
}
private async ValueTask DispatchEventAsync(CloudEvent evt)
{
await BroadcastEvent(evt);
await BroadcastEvent(evt).ConfigureAwait(false);
/*
var topic = _clusterClient.GetStreamProvider("agents").GetStream<Event>(StreamId.Create(evt.Namespace, evt.Type));
await topic.OnNextAsync(evt.ToEvent());
*/
}
private async ValueTask DispatchRequestAsync(WorkerProcessConnection connection, RpcRequest request)
private async ValueTask DispatchRequestAsync(GrpcWorkerConnection connection, RpcRequest request)
{
var requestId = request.RequestId;
if (request.Target is null)
{
throw new InvalidOperationException($"Request message is missing a target. Message: '{request}'.");
}
/*
if (string.Equals("runtime", request.Target.Type, StringComparison.Ordinal))
{
if (string.Equals("subscribe", request.Method))
{
await InvokeRequestDelegate(connection, request, async (_) =>
{
await SubscribeToTopic(connection, request);
return new RpcResponse { Result = "Ok" };
});
return;
}
}
else
{
*/
await InvokeRequestDelegate(connection, request, async request =>
{
var (gateway, isPlacement) = await _gatewayRegistry.GetOrPlaceAgent(request.Target);
@ -203,84 +135,51 @@ internal sealed class WorkerGateway : BackgroundService, IWorkerGateway
{
return new RpcResponse { Error = "Agent not found and no compatible gateways were found." };
}
if (isPlacement)
{
// Activate the worker: load state
// TODO
// TODO// Activate the worker: load state
}
// Forward the message to the gateway and return the result.
return await gateway.InvokeRequest(request);
return await gateway.InvokeRequest(request).ConfigureAwait(true);
});
//}
}
private static async Task InvokeRequestDelegate(WorkerProcessConnection connection, RpcRequest request, Func<RpcRequest, Task<RpcResponse>> func)
private static async Task InvokeRequestDelegate(GrpcWorkerConnection connection, RpcRequest request, Func<RpcRequest, Task<RpcResponse>> func)
{
try
{
var response = await func(request);
response.RequestId = request.RequestId;
await connection.ResponseStream.WriteAsync(new Message { Response = response });
await connection.ResponseStream.WriteAsync(new Message { Response = response }).ConfigureAwait(false);
}
catch (Exception ex)
{
await connection.ResponseStream.WriteAsync(new Message { Response = new RpcResponse { RequestId = request.RequestId, Error = ex.Message } });
await connection.ResponseStream.WriteAsync(new Message { Response = new RpcResponse { RequestId = request.RequestId, Error = ex.Message } }).ConfigureAwait(false);
}
}
public async ValueTask Store(AgentState value)
{
var agentId = value.AgentId ?? throw new ArgumentNullException(nameof(value.AgentId));
var agentState = _clusterClient.GetGrain<IWorkerAgentGrain>($"{agentId.Type}:{agentId.Key}");
await agentState.WriteStateAsync(value, value.ETag);
}
public async ValueTask<AgentState> Read(AgentId agentId)
{
var agentState = _clusterClient.GetGrain<IWorkerAgentGrain>($"{agentId.Type}:{agentId.Key}");
return await agentState.ReadStateAsync();
}
/*
private async ValueTask SubscribeToTopic(WorkerProcessConnection connection, RpcRequest request)
{
// Subscribe to a topic
var parameters = JsonSerializer.Deserialize<Dictionary<string, string>>(request.Data)
?? throw new ArgumentException($"Request data does not contain required payload format: {{\"namespace\": \"string\", \"type\": \"string\"}}.");
var ns = parameters["namespace"];
var type = parameters["type"];
var topic = _clusterClient.GetStreamProvider("agents").GetStream<Event>(StreamId.Create(ns: type, key: ns));
await topic.SubscribeAsync(OnNextAsync);
return;
async Task OnNextAsync(IList<SequentialItem<Event>> items)
{
foreach (var item in items)
{
var evt = item.Item.ToRpcEvent();
evt.Namespace = ns;
evt.Type = evt.Type;
var payload = new Dictionary<string, string>
{
["sequenceId"] = item.Token.SequenceNumber.ToString(),
["eventIdx"] = item.Token.EventIndex.ToString()
};
evt.Data = JsonSerializer.Serialize(payload);
await connection.ResponseStream.WriteAsync(new Message { Event = evt });
}
}
}
*/
internal Task ConnectToWorkerProcess(IAsyncStreamReader<Message> requestStream, IServerStreamWriter<Message> responseStream, ServerCallContext context)
{
_logger.LogInformation("Received new connection from {Peer}.", context.Peer);
var workerProcess = new WorkerProcessConnection(this, requestStream, responseStream, context);
var workerProcess = new GrpcWorkerConnection(this, requestStream, responseStream, context);
_workers[workerProcess] = workerProcess;
return workerProcess.Completion;
}
public async ValueTask StoreAsync(AgentState value)
{
var agentId = value.AgentId ?? throw new ArgumentNullException(nameof(value.AgentId));
_agentState[agentId.Key] = value;
}
internal void OnRemoveWorkerProcess(WorkerProcessConnection workerProcess)
public async ValueTask<AgentState> ReadAsync(AgentId agentId)
{
if (_agentState.TryGetValue(agentId.Key, out var state))
{
return state;
}
return new AgentState { AgentId = agentId };
}
internal void OnRemoveWorkerProcess(GrpcWorkerConnection workerProcess)
{
_workers.TryRemove(workerProcess, out _);
var types = workerProcess.GetSupportedTypes();
@ -291,14 +190,50 @@ internal sealed class WorkerGateway : BackgroundService, IWorkerGateway
supported.Remove(workerProcess);
}
}
// Any agents activated on that worker are also gone.
foreach (var pair in _agentDirectory)
{
if (pair.Value == workerProcess)
{
((IDictionary<(string Type, string Key), WorkerProcessConnection>)_agentDirectory).Remove(pair);
((IDictionary<(string Type, string Key), GrpcWorkerConnection>)_agentDirectory).Remove(pair);
}
}
}
public async ValueTask<RpcResponse> InvokeRequest(RpcRequest request, CancellationToken cancellationToken = default)
{
(string Type, string Key) agentId = (request.Target.Type, request.Target.Key);
if (!_agentDirectory.TryGetValue(agentId, out var connection) || connection.Completion.IsCompleted)
{
// Activate the agent on a compatible worker process.
if (_supportedAgentTypes.TryGetValue(request.Target.Type, out var workers))
{
connection = workers[Random.Shared.Next(workers.Count)];
_agentDirectory[agentId] = connection;
}
else
{
return new(new RpcResponse { Error = "Agent not found." });
}
}
// Proxy the request to the agent.
var originalRequestId = request.RequestId;
var newRequestId = Guid.NewGuid().ToString();
var completion = _pendingRequests[(connection, newRequestId)] = new(TaskCreationOptions.RunContinuationsAsynchronously);
request.RequestId = newRequestId;
await connection.ResponseStream.WriteAsync(new Message { Request = request }, cancellationToken).ConfigureAwait(false);
// Wait for the response and send it back to the caller.
var response = await completion.Task.WaitAsync(s_agentResponseTimeout);
response.RequestId = originalRequestId;
return response;
}
async ValueTask<RpcResponse> IGateway.InvokeRequest(RpcRequest request)
{
return await this.InvokeRequest(request).ConfigureAwait(false);
}
Task IGateway.SendMessageAsync(IConnection connection, CloudEvent cloudEvent)
{
return this.SendMessageAsync(connection, cloudEvent);
}
}

View File

@ -1,19 +1,24 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// WorkerGatewayService.cs
// GrpcGatewayService.cs
using Grpc.Core;
using Microsoft.AutoGen.Abstractions;
namespace Microsoft.AutoGen.Runtime;
namespace Microsoft.AutoGen.Agents;
// gRPC service which handles communication between the agent worker and the cluster.
internal sealed class WorkerGatewayService(WorkerGateway agentWorker) : AgentRpc.AgentRpcBase
internal sealed class GrpcGatewayService : AgentRpc.AgentRpcBase
{
private readonly GrpcGateway Gateway;
public GrpcGatewayService(GrpcGateway gateway)
{
Gateway = (GrpcGateway)gateway;
}
public override async Task OpenChannel(IAsyncStreamReader<Message> requestStream, IServerStreamWriter<Message> responseStream, ServerCallContext context)
{
try
{
await agentWorker.ConnectToWorkerProcess(requestStream, responseStream, context).ConfigureAwait(true);
await Gateway.ConnectToWorkerProcess(requestStream, responseStream, context).ConfigureAwait(true);
}
catch
{
@ -26,13 +31,13 @@ internal sealed class WorkerGatewayService(WorkerGateway agentWorker) : AgentRpc
}
public override async Task<GetStateResponse> GetState(AgentId request, ServerCallContext context)
{
var state = await agentWorker.Read(request);
var state = await Gateway.ReadAsync(request);
return new GetStateResponse { AgentState = state };
}
public override async Task<SaveStateResponse> SaveState(AgentState request, ServerCallContext context)
{
await agentWorker.Store(request);
await Gateway.StoreAsync(request);
return new SaveStateResponse
{
Success = true // TODO: Implement error handling

View File

@ -1,13 +1,13 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// WorkerProcessConnection.cs
// GrpcWorkerConnection.cs
using System.Threading.Channels;
using Grpc.Core;
using Microsoft.AutoGen.Abstractions;
namespace Microsoft.AutoGen.Runtime;
namespace Microsoft.AutoGen.Agents;
internal sealed class WorkerProcessConnection : IAsyncDisposable
internal sealed class GrpcWorkerConnection : IAsyncDisposable, IConnection
{
private static long s_nextConnectionId;
private readonly Task _readTask;
@ -15,10 +15,10 @@ internal sealed class WorkerProcessConnection : IAsyncDisposable
private readonly string _connectionId = Interlocked.Increment(ref s_nextConnectionId).ToString();
private readonly object _lock = new();
private readonly HashSet<string> _supportedTypes = [];
private readonly WorkerGateway _gateway;
private readonly GrpcGateway _gateway;
private readonly CancellationTokenSource _shutdownCancellationToken = new();
public WorkerProcessConnection(WorkerGateway agentWorker, IAsyncStreamReader<Message> requestStream, IServerStreamWriter<Message> responseStream, ServerCallContext context)
public GrpcWorkerConnection(GrpcGateway agentWorker, IAsyncStreamReader<Message> requestStream, IServerStreamWriter<Message> responseStream, ServerCallContext context)
{
_gateway = agentWorker;
RequestStream = requestStream;

View File

@ -4,24 +4,24 @@
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.Hosting;
namespace Microsoft.AutoGen.Runtime;
namespace Microsoft.AutoGen.Agents;
public static class Host
{
public static async Task<WebApplication> StartAsync(bool local = false)
public static async Task<WebApplication> StartAsync(bool local = false, bool useGrpc = true)
{
var builder = WebApplication.CreateBuilder();
builder.AddServiceDefaults();
if (local)
{
builder.AddLocalAgentService();
builder.AddLocalAgentService(useGrpc);
}
else
{
builder.AddAgentService();
builder.AddAgentService(useGrpc);
}
var app = builder.Build();
app.MapAgentService();
app.MapAgentService(local, useGrpc);
app.MapDefaultEndpoints();
await app.StartAsync().ConfigureAwait(false);
return app;

View File

@ -6,8 +6,6 @@ using System.Diagnostics.CodeAnalysis;
using System.Reflection;
using Google.Protobuf;
using Google.Protobuf.Reflection;
using Grpc.Core;
using Grpc.Net.Client.Configuration;
using Microsoft.AutoGen.Abstractions;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
@ -20,41 +18,18 @@ public static class HostBuilderExtensions
private const string _defaultAgentServiceAddress = "https://localhost:5001";
public static AgentApplicationBuilder AddAgentWorker(this IHostApplicationBuilder builder, string agentServiceAddress = _defaultAgentServiceAddress, bool local = false)
{
builder.Services.AddGrpcClient<AgentRpc.AgentRpcClient>(options =>
{
options.Address = new Uri(agentServiceAddress);
options.ChannelOptionsActions.Add(channelOptions =>
{
channelOptions.HttpHandler = new SocketsHttpHandler
{
EnableMultipleHttp2Connections = true,
KeepAlivePingDelay = TimeSpan.FromSeconds(20),
KeepAlivePingTimeout = TimeSpan.FromSeconds(10),
KeepAlivePingPolicy = HttpKeepAlivePingPolicy.WithActiveRequests
};
var methodConfig = new MethodConfig
{
Names = { MethodName.Default },
RetryPolicy = new RetryPolicy
{
MaxAttempts = 5,
InitialBackoff = TimeSpan.FromSeconds(1),
MaxBackoff = TimeSpan.FromSeconds(5),
BackoffMultiplier = 1.5,
RetryableStatusCodes = { StatusCode.Unavailable }
}
};
channelOptions.ServiceConfig = new() { MethodConfigs = { methodConfig } };
channelOptions.ThrowOperationCanceledOnCancellation = true;
});
});
builder.Services.TryAddSingleton(DistributedContextPropagator.Current);
builder.Services.AddSingleton<IAgentWorkerRuntime, GrpcAgentWorkerRuntime>();
builder.Services.AddSingleton<IHostedService>(sp => (IHostedService)sp.GetRequiredService<IAgentWorkerRuntime>());
builder.Services.AddSingleton<AgentWorker>();
// if !local, then add the gRPC client
if (!local)
{
builder.AddGrpcAgentWorker(agentServiceAddress);
}
else
{
builder.Services.AddSingleton<IAgentWorker, AgentWorker>();
}
builder.Services.AddSingleton<IHostedService>(sp => (IHostedService)sp.GetRequiredService<IAgentWorker>());
builder.Services.AddKeyedSingleton("EventTypes", (sp, key) =>
{
var interfaceType = typeof(IMessage);
@ -122,6 +97,7 @@ public static class HostBuilderExtensions
}
return new EventTypes(typeRegistry, types, eventsMap);
});
builder.Services.AddSingleton<Client>();
return new AgentApplicationBuilder(builder);
}
@ -159,7 +135,7 @@ public sealed class AgentTypes(Dictionary<string, Type> types)
.SelectMany(assembly => assembly.GetTypes())
.Where(type => ReflectionHelper.IsSubclassOfGeneric(type, typeof(AgentBase))
&& !type.IsAbstract
&& !type.Name.Equals("AgentWorker"))
&& !type.Name.Equals(nameof(Client)))
.ToDictionary(type => type.Name, type => type);
return new AgentTypes(agents);

View File

@ -1,11 +1,11 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// WorkerAgentGrain.cs
// AgentStateGrain.cs
using Microsoft.AutoGen.Abstractions;
namespace Microsoft.AutoGen.Runtime;
namespace Microsoft.AutoGen.Agents;
internal sealed class WorkerAgentGrain([PersistentState("state", "AgentStateStore")] IPersistentState<AgentState> state) : Grain, IWorkerAgentGrain
internal sealed class AgentStateGrain([PersistentState("state", "AgentStateStore")] IPersistentState<AgentState> state) : Grain, IAgentState
{
public async ValueTask<string> WriteStateAsync(AgentState newState, string eTag)
{
@ -16,7 +16,7 @@ internal sealed class WorkerAgentGrain([PersistentState("state", "AgentStateStor
if ((string.IsNullOrEmpty(state.Etag)) || (string.IsNullOrEmpty(eTag)) || (string.Equals(state.Etag, eTag, StringComparison.Ordinal)))
{
state.State = newState;
await state.WriteStateAsync();
await state.WriteStateAsync().ConfigureAwait(false);
}
else
{

View File

@ -0,0 +1,15 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// IRegistryGrain.cs
using Microsoft.AutoGen.Abstractions;
namespace Microsoft.AutoGen.Agents;
public interface IRegistryGrain : IGrainWithIntegerKey
{
ValueTask<(IGateway? Worker, bool NewPlacement)> GetOrPlaceAgent(AgentId agentId);
ValueTask RemoveWorker(IGateway worker);
ValueTask RegisterAgentType(string type, IGateway worker);
ValueTask AddWorker(IGateway worker);
ValueTask UnregisterAgentType(string type, IGateway worker);
ValueTask<IGateway?> GetCompatibleWorker(string type);
}

View File

@ -4,11 +4,12 @@
using System.Configuration;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Orleans.Configuration;
using Orleans.Serialization;
namespace Microsoft.AutoGen.Runtime;
namespace Microsoft.AutoGen.Agents;
public static class OrleansRuntimeHostingExtenions
{
@ -83,6 +84,7 @@ public static class OrleansRuntimeHostingExtenions
.AddMemoryGrainStorage("PubSubStore");
}
});
builder.Services.AddSingleton<IRegistryGrain, RegistryGrain>();
return builder;
}
}

View File

@ -1,16 +1,16 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// AgentWorkerRegistryGrain.cs
// RegistryGrain.cs
using Microsoft.AutoGen.Abstractions;
namespace Microsoft.AutoGen.Runtime;
namespace Microsoft.AutoGen.Agents;
public sealed class AgentWorkerRegistryGrain : Grain, IAgentWorkerRegistryGrain
public sealed class RegistryGrain : Grain, IRegistryGrain
{
// TODO: use persistent state for some of these or (better) extend Orleans to implement some of this natively.
private readonly Dictionary<IWorkerGateway, WorkerState> _workerStates = [];
private readonly Dictionary<string, List<IWorkerGateway>> _supportedAgentTypes = [];
private readonly Dictionary<(string Type, string Key), IWorkerGateway> _agentDirectory = [];
private readonly Dictionary<IGateway, WorkerState> _workerStates = new();
private readonly Dictionary<string, List<IGateway>> _supportedAgentTypes = [];
private readonly Dictionary<(string Type, string Key), IGateway> _agentDirectory = [];
private readonly TimeSpan _agentTimeout = TimeSpan.FromMinutes(1);
public override Task OnActivateAsync(CancellationToken cancellationToken)
@ -18,107 +18,7 @@ public sealed class AgentWorkerRegistryGrain : Grain, IAgentWorkerRegistryGrain
this.RegisterGrainTimer(static state => state.PurgeInactiveWorkers(), this, TimeSpan.FromSeconds(30), TimeSpan.FromSeconds(30));
return base.OnActivateAsync(cancellationToken);
}
private Task PurgeInactiveWorkers()
{
foreach (var (worker, state) in _workerStates)
{
if (DateTimeOffset.UtcNow - state.LastSeen > _agentTimeout)
{
_workerStates.Remove(worker);
foreach (var type in state.SupportedTypes)
{
if (_supportedAgentTypes.TryGetValue(type, out var workers))
{
workers.Remove(worker);
}
}
}
}
return Task.CompletedTask;
}
public ValueTask AddWorker(IWorkerGateway worker)
{
GetOrAddWorker(worker);
return ValueTask.CompletedTask;
}
private WorkerState GetOrAddWorker(IWorkerGateway worker)
{
if (!_workerStates.TryGetValue(worker, out var workerState))
{
workerState = _workerStates[worker] = new();
}
workerState.LastSeen = DateTimeOffset.UtcNow;
return workerState;
}
public ValueTask RegisterAgentType(string type, IWorkerGateway worker)
{
if (!_supportedAgentTypes.TryGetValue(type, out var supportedAgentTypes))
{
supportedAgentTypes = _supportedAgentTypes[type] = [];
}
if (!supportedAgentTypes.Contains(worker))
{
supportedAgentTypes.Add(worker);
}
var workerState = GetOrAddWorker(worker);
workerState.SupportedTypes.Add(type);
return ValueTask.CompletedTask;
}
public ValueTask RemoveWorker(IWorkerGateway worker)
{
if (_workerStates.Remove(worker, out var state))
{
foreach (var type in state.SupportedTypes)
{
if (_supportedAgentTypes.TryGetValue(type, out var workers))
{
workers.Remove(worker);
}
}
}
return ValueTask.CompletedTask;
}
public ValueTask UnregisterAgentType(string type, IWorkerGateway worker)
{
if (_workerStates.TryGetValue(worker, out var state))
{
state.SupportedTypes.Remove(type);
}
if (_supportedAgentTypes.TryGetValue(type, out var workers))
{
workers.Remove(worker);
}
return ValueTask.CompletedTask;
}
public ValueTask<IWorkerGateway?> GetCompatibleWorker(string type) => new(GetCompatibleWorkerCore(type));
private IWorkerGateway? GetCompatibleWorkerCore(string type)
{
if (_supportedAgentTypes.TryGetValue(type, out var workers))
{
// Return a random compatible worker.
return workers[Random.Shared.Next(workers.Count)];
}
return null;
}
public ValueTask<(IWorkerGateway? Gateway, bool NewPlacment)> GetOrPlaceAgent(AgentId agentId)
public ValueTask<(IGateway? Worker, bool NewPlacement)> GetOrPlaceAgent(AgentId agentId)
{
// TODO:
bool isNewPlacement;
@ -142,9 +42,98 @@ public sealed class AgentWorkerRegistryGrain : Grain, IAgentWorkerRegistryGrain
// Existing activation.
isNewPlacement = false;
}
return new((worker, isNewPlacement));
}
public ValueTask RemoveWorker(IGateway worker)
{
if (_workerStates.Remove(worker, out var state))
{
foreach (var type in state.SupportedTypes)
{
if (_supportedAgentTypes.TryGetValue(type, out var workers))
{
workers.Remove(worker);
}
}
}
return ValueTask.CompletedTask;
}
public ValueTask RegisterAgentType(string type, IGateway worker)
{
if (!_supportedAgentTypes.TryGetValue(type, out var supportedAgentTypes))
{
supportedAgentTypes = _supportedAgentTypes[type] = [];
}
if (!supportedAgentTypes.Contains(worker))
{
supportedAgentTypes.Add(worker);
}
var workerState = GetOrAddWorker(worker);
workerState.SupportedTypes.Add(type);
return ValueTask.CompletedTask;
}
public ValueTask AddWorker(IGateway worker)
{
GetOrAddWorker(worker);
return ValueTask.CompletedTask;
}
public ValueTask UnregisterAgentType(string type, IGateway worker)
{
if (_workerStates.TryGetValue(worker, out var state))
{
state.SupportedTypes.Remove(type);
}
if (_supportedAgentTypes.TryGetValue(type, out var workers))
{
workers.Remove(worker);
}
return ValueTask.CompletedTask;
}
private Task PurgeInactiveWorkers()
{
foreach (var (worker, state) in _workerStates)
{
if (DateTimeOffset.UtcNow - state.LastSeen > _agentTimeout)
{
_workerStates.Remove(worker);
foreach (var type in state.SupportedTypes)
{
if (_supportedAgentTypes.TryGetValue(type, out var workers))
{
workers.Remove(worker);
}
}
}
}
return Task.CompletedTask;
}
private WorkerState GetOrAddWorker(IGateway worker)
{
if (!_workerStates.TryGetValue(worker, out var workerState))
{
workerState = _workerStates[worker] = new();
}
workerState.LastSeen = DateTimeOffset.UtcNow;
return workerState;
}
public ValueTask<IGateway?> GetCompatibleWorker(string type) => new(GetCompatibleWorkerCore(type));
private IGateway? GetCompatibleWorkerCore(string type)
{
if (_supportedAgentTypes.TryGetValue(type, out var workers))
{
// Return a random compatible worker.
return workers[Random.Shared.Next(workers.Count)];
}
return null;
}
private sealed class WorkerState
{

View File

@ -1,15 +0,0 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// IAgentWorkerRegistryGrain.cs
using Microsoft.AutoGen.Abstractions;
namespace Microsoft.AutoGen.Runtime;
public interface IAgentWorkerRegistryGrain : IGrainWithIntegerKey
{
ValueTask RegisterAgentType(string type, IWorkerGateway worker);
ValueTask UnregisterAgentType(string type, IWorkerGateway worker);
ValueTask AddWorker(IWorkerGateway worker);
ValueTask RemoveWorker(IWorkerGateway worker);
ValueTask<(IWorkerGateway? Gateway, bool NewPlacment)> GetOrPlaceAgent(AgentId agentId);
}

View File

@ -1,14 +0,0 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// IWorkerGateway.cs
using Microsoft.AutoGen.Abstractions;
namespace Microsoft.AutoGen.Runtime;
public interface IWorkerGateway : IGrainObserver
{
ValueTask<RpcResponse> InvokeRequest(RpcRequest request);
ValueTask BroadcastEvent(CloudEvent evt);
ValueTask Store(AgentState value);
ValueTask<AgentState> Read(AgentId agentId);
}

View File

@ -1,35 +0,0 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<PackageId>AutoGen.Worker.Server</PackageId>
<PackageProjectUrl>https://github.com/microsoft/agnext</PackageProjectUrl>
<Authors>Microsoft</Authors>
<Description>AutoGen Worker Server Library</Description>
<Tags>ai-agents;event-driven-agents</Tags>
<NoWarn>$(NoWarn);CS8002</NoWarn>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\Abstractions\Microsoft.AutoGen.Abstractions.csproj" />
<ProjectReference Include="..\ServiceDefaults\Microsoft.AutoGen.ServiceDefaults.csproj" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Orleans.Serialization.Protobuf" />
<PackageReference Include="Microsoft.Orleans.Server" />
<PackageReference Include="Microsoft.Orleans.Streaming" />
<PackageReference Include="Microsoft.Orleans.Sdk" />
<PackageReference Include="Microsoft.Orleans.Runtime" />
<PackageReference Include="Microsoft.Orleans.Persistence.Cosmos" />
<PackageReference Include="Microsoft.Orleans.Clustering.Cosmos" />
<PackageReference Include="Microsoft.Orleans.Reminders.Cosmos" />
<PackageReference Include="Microsoft.Orleans.Streaming.EventHubs" />
<PackageReference Include="Microsoft.Orleans.Reminders" />
<PackageReference Include="OrleansDashboard"/>
</ItemGroup>
</Project>

View File

@ -4,6 +4,7 @@
using FluentAssertions;
using Google.Protobuf.Reflection;
using Microsoft.AutoGen.Abstractions;
using Microsoft.Extensions.Logging;
using Moq;
using Xunit;
@ -14,8 +15,8 @@ public class AgentBaseTests
[Fact]
public async Task ItInvokeRightHandlerTestAsync()
{
var mockContext = new Mock<IAgentContext>();
var agent = new TestAgent(mockContext.Object, new EventTypes(TypeRegistry.Empty, [], []));
var mockContext = new Mock<IAgentRuntime>();
var agent = new TestAgent(mockContext.Object, new EventTypes(TypeRegistry.Empty, [], []), new Logger<AgentBase>(new LoggerFactory()));
await agent.HandleObject("hello world");
await agent.HandleObject(42);
@ -30,7 +31,7 @@ public class AgentBaseTests
/// </summary>
public class TestAgent : AgentBase, IHandle<string>, IHandle<int>
{
public TestAgent(IAgentContext context, EventTypes eventTypes) : base(context, eventTypes)
public TestAgent(IAgentRuntime context, EventTypes eventTypes, Logger<AgentBase> logger) : base(context, eventTypes, logger)
{
}

View File

@ -8,7 +8,7 @@
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\src\Microsoft.AutoGen\Agents\Microsoft.AutoGen.Agents.csproj" />
<ProjectReference Include="..\..\src\Microsoft.AutoGen\Agents\Microsoft.AutoGen.Agents.csproj" />
</ItemGroup>
</Project>

View File

@ -35,3 +35,6 @@ message ConversationClosed {
string user_id = 1;
string user_message = 2;
}
message Shutdown {
string message = 1;
}