diff --git a/README.md b/README.md index a472e10a1b..7150942717 100644 --- a/README.md +++ b/README.md @@ -195,13 +195,13 @@ public class HelloAgent( { Message = response }.ToCloudEvent(this.AgentId.Key); - await PublishEvent(evt).ConfigureAwait(false); + await PublishEventAsync(evt).ConfigureAwait(false); var goodbye = new ConversationClosed { UserId = this.AgentId.Key, UserMessage = "Goodbye" }.ToCloudEvent(this.AgentId.Key); - await PublishEvent(goodbye).ConfigureAwait(false); + await PublishEventAsync(goodbye).ConfigureAwait(false); } public async Task Handle(ConversationClosed item) { @@ -210,7 +210,7 @@ public class HelloAgent( { Message = goodbye }.ToCloudEvent(this.AgentId.Key); - await PublishEvent(evt).ConfigureAwait(false); + await PublishEventAsync(evt).ConfigureAwait(false); await Task.Delay(60000); await App.ShutdownAsync(); } diff --git a/docs/design/03 - worker-protocol.md b/docs/design/03 - Agent Worker Protocol.md similarity index 100% rename from docs/design/03 - worker-protocol.md rename to docs/design/03 - Agent Worker Protocol.md diff --git a/docs/design/05 - Services.md b/docs/design/05 - Services.md new file mode 100644 index 0000000000..9aeacdb626 --- /dev/null +++ b/docs/design/05 - Services.md @@ -0,0 +1,26 @@ +# AutoGen Services + +## Overview + +Each AutoGen agent system has one or more Agent Workers and a set of services for managing/supporting the agents. The services and workers can all be hosted in the same process or in a distributed system. When in the same process communication and event delivery is in-memory. When distributed, workers communicate with the service over gRPC. In all cases, events are packaged as CloudEvents. There are multiple options for the backend services: + +- In-Memory: the Agent Workers and Services are all hosted in the same process and communicate over in-memory channels. Available for python and .NET. +- Python only: Agent workers communicate with a python hosted service that implements an in-memory message bus and agent registry. +- Micrososft Orleans: a distributed actor system that can host the services and workers, enables distributed state with persistent storage, can leverage multiple event bus types, and cross-language agent communication. +- *Roadmap: support for other languages distributed systems such as dapr or Akka.* + +The Services in the system include: + +- Worker: Hosts the Agents and is a client to the Gateway +- Gateway: +-- RPC gateway for the other services APIs +-- Provides an RPC bridge between the workers and the Event Bus +-- Message Session state (track message queues/delivery) +- Registry: keeps track of the {agents:agent types}:{Subscription/Topics} in the system and which events they can handle +-- *Roadmap: add lookup api in gateway* +- AgentState: persistent state for agents +- Routing: delivers events to agents based on their subscriptions+topics +-- *Roadmap: add subscription management APIs* +- *Roadmap: Management APIs for the Agent System* +- *Roadmap: Scheduling: manages placement of agents* +- *Roadmap: Discovery: allows discovery of agents and services* diff --git a/dotnet/AutoGen.sln b/dotnet/AutoGen.sln index 8557f17840..accf92218a 100644 --- a/dotnet/AutoGen.sln +++ b/dotnet/AutoGen.sln @@ -84,10 +84,6 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.AutoGen.Abstracti EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.AutoGen.Extensions.SemanticKernel", "src\Microsoft.AutoGen\Extensions\SemanticKernel\Microsoft.AutoGen.Extensions.SemanticKernel.csproj", "{952827D4-8D4C-4327-AE4D-E8D25811EF35}" EndProject -Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.AutoGen.Runtime", "src\Microsoft.AutoGen\Runtime\Microsoft.AutoGen.Runtime.csproj", "{A905E29A-7110-497F-ADC5-2CE2A148FEA0}" -EndProject -Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.AutoGen.ServiceDefaults", "src\Microsoft.AutoGen\ServiceDefaults\Microsoft.AutoGen.ServiceDefaults.csproj", "{D7E9D90B-5595-4E72-A90A-6DE20D9AB7AE}" -EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "AgentChat", "AgentChat", "{668726B9-77BC-45CF-B576-0F0773BF1615}" EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "AutoGen.Anthropic.Samples", "samples\AutoGen.Anthropic.Samples\AutoGen.Anthropic.Samples.csproj", "{84020C4A-933A-4693-9889-1B99304A7D76}" @@ -128,12 +124,14 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "HelloAgent", "samples\Hello EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "AIModelClientHostingExtensions", "src\Microsoft.AutoGen\Extensions\AIModelClientHostingExtensions\AIModelClientHostingExtensions.csproj", "{97550E87-48C6-4EBF-85E1-413ABAE9DBFD}" EndProject -Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.AutoGen.Agents.Tests", "Microsoft.AutoGen.Agents.Tests\Microsoft.AutoGen.Agents.Tests.csproj", "{CF4C92BD-28AE-4B8F-B173-601004AEC9BF}" -EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "sample", "sample", "{686480D7-8FEC-4ED3-9C5D-CEBE1057A7ED}" EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "HelloAgentState", "samples\Hello\HelloAgentState\HelloAgentState.csproj", "{64EF61E7-00A6-4E5E-9808-62E10993A0E5}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.AutoGen.ServiceDefaults", "src\Microsoft.AutoGen\Extensions\ServiceDefaults\Microsoft.AutoGen.ServiceDefaults.csproj", "{65059914-5527-4A00-9308-9FAF23D5E85A}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.AutoGen.Agents.Tests", "test\Microsoft.AutoGen.Agents.Tests\Microsoft.AutoGen.Agents.Tests.csproj", "{394FDAF8-74F9-4977-94A5-3371737EB774}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -264,14 +262,6 @@ Global {952827D4-8D4C-4327-AE4D-E8D25811EF35}.Debug|Any CPU.Build.0 = Debug|Any CPU {952827D4-8D4C-4327-AE4D-E8D25811EF35}.Release|Any CPU.ActiveCfg = Release|Any CPU {952827D4-8D4C-4327-AE4D-E8D25811EF35}.Release|Any CPU.Build.0 = Release|Any CPU - {A905E29A-7110-497F-ADC5-2CE2A148FEA0}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {A905E29A-7110-497F-ADC5-2CE2A148FEA0}.Debug|Any CPU.Build.0 = Debug|Any CPU - {A905E29A-7110-497F-ADC5-2CE2A148FEA0}.Release|Any CPU.ActiveCfg = Release|Any CPU - {A905E29A-7110-497F-ADC5-2CE2A148FEA0}.Release|Any CPU.Build.0 = Release|Any CPU - {D7E9D90B-5595-4E72-A90A-6DE20D9AB7AE}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {D7E9D90B-5595-4E72-A90A-6DE20D9AB7AE}.Debug|Any CPU.Build.0 = Debug|Any CPU - {D7E9D90B-5595-4E72-A90A-6DE20D9AB7AE}.Release|Any CPU.ActiveCfg = Release|Any CPU - {D7E9D90B-5595-4E72-A90A-6DE20D9AB7AE}.Release|Any CPU.Build.0 = Release|Any CPU {84020C4A-933A-4693-9889-1B99304A7D76}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {84020C4A-933A-4693-9889-1B99304A7D76}.Debug|Any CPU.Build.0 = Debug|Any CPU {84020C4A-933A-4693-9889-1B99304A7D76}.Release|Any CPU.ActiveCfg = Release|Any CPU @@ -340,14 +330,18 @@ Global {97550E87-48C6-4EBF-85E1-413ABAE9DBFD}.Debug|Any CPU.Build.0 = Debug|Any CPU {97550E87-48C6-4EBF-85E1-413ABAE9DBFD}.Release|Any CPU.ActiveCfg = Release|Any CPU {97550E87-48C6-4EBF-85E1-413ABAE9DBFD}.Release|Any CPU.Build.0 = Release|Any CPU - {CF4C92BD-28AE-4B8F-B173-601004AEC9BF}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {CF4C92BD-28AE-4B8F-B173-601004AEC9BF}.Debug|Any CPU.Build.0 = Debug|Any CPU - {CF4C92BD-28AE-4B8F-B173-601004AEC9BF}.Release|Any CPU.ActiveCfg = Release|Any CPU - {CF4C92BD-28AE-4B8F-B173-601004AEC9BF}.Release|Any CPU.Build.0 = Release|Any CPU {64EF61E7-00A6-4E5E-9808-62E10993A0E5}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {64EF61E7-00A6-4E5E-9808-62E10993A0E5}.Debug|Any CPU.Build.0 = Debug|Any CPU {64EF61E7-00A6-4E5E-9808-62E10993A0E5}.Release|Any CPU.ActiveCfg = Release|Any CPU {64EF61E7-00A6-4E5E-9808-62E10993A0E5}.Release|Any CPU.Build.0 = Release|Any CPU + {65059914-5527-4A00-9308-9FAF23D5E85A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {65059914-5527-4A00-9308-9FAF23D5E85A}.Debug|Any CPU.Build.0 = Debug|Any CPU + {65059914-5527-4A00-9308-9FAF23D5E85A}.Release|Any CPU.ActiveCfg = Release|Any CPU + {65059914-5527-4A00-9308-9FAF23D5E85A}.Release|Any CPU.Build.0 = Release|Any CPU + {394FDAF8-74F9-4977-94A5-3371737EB774}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {394FDAF8-74F9-4977-94A5-3371737EB774}.Debug|Any CPU.Build.0 = Debug|Any CPU + {394FDAF8-74F9-4977-94A5-3371737EB774}.Release|Any CPU.ActiveCfg = Release|Any CPU + {394FDAF8-74F9-4977-94A5-3371737EB774}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -385,8 +379,6 @@ Global {FD87BD33-4616-460B-AC85-A412BA08BB78} = {18BF8DD7-0585-48BF-8F97-AD333080CE06} {E0C991D9-0DB8-471C-ADC9-5FB16E2A0106} = {18BF8DD7-0585-48BF-8F97-AD333080CE06} {952827D4-8D4C-4327-AE4D-E8D25811EF35} = {18BF8DD7-0585-48BF-8F97-AD333080CE06} - {A905E29A-7110-497F-ADC5-2CE2A148FEA0} = {18BF8DD7-0585-48BF-8F97-AD333080CE06} - {D7E9D90B-5595-4E72-A90A-6DE20D9AB7AE} = {18BF8DD7-0585-48BF-8F97-AD333080CE06} {668726B9-77BC-45CF-B576-0F0773BF1615} = {686480D7-8FEC-4ED3-9C5D-CEBE1057A7ED} {84020C4A-933A-4693-9889-1B99304A7D76} = {668726B9-77BC-45CF-B576-0F0773BF1615} {5777515F-4053-42F9-AF2B-95D8D0F5384A} = {668726B9-77BC-45CF-B576-0F0773BF1615} @@ -407,8 +399,9 @@ Global {A20B9894-F352-4338-872A-F215A241D43D} = {7EB336C2-7C0A-4BC8-80C6-A3173AB8DC45} {8F7560CF-EEBB-4333-A69F-838CA40FD85D} = {7EB336C2-7C0A-4BC8-80C6-A3173AB8DC45} {97550E87-48C6-4EBF-85E1-413ABAE9DBFD} = {18BF8DD7-0585-48BF-8F97-AD333080CE06} - {CF4C92BD-28AE-4B8F-B173-601004AEC9BF} = {F823671B-3ECA-4AE6-86DA-25E920D3FE64} {64EF61E7-00A6-4E5E-9808-62E10993A0E5} = {7EB336C2-7C0A-4BC8-80C6-A3173AB8DC45} + {65059914-5527-4A00-9308-9FAF23D5E85A} = {18BF8DD7-0585-48BF-8F97-AD333080CE06} + {394FDAF8-74F9-4977-94A5-3371737EB774} = {F823671B-3ECA-4AE6-86DA-25E920D3FE64} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {93384647-528D-46C8-922C-8DB36A382F0B} diff --git a/dotnet/Directory.Packages.props b/dotnet/Directory.Packages.props index a414a5f59d..c1b26f2f9b 100644 --- a/dotnet/Directory.Packages.props +++ b/dotnet/Directory.Packages.props @@ -63,11 +63,17 @@ + + runtime; build; native; contentfiles; analyzers; buildtransitive + all + + + diff --git a/dotnet/samples/Hello/Backend/Backend.csproj b/dotnet/samples/Hello/Backend/Backend.csproj index 2f5a02ee51..d502d7260d 100644 --- a/dotnet/samples/Hello/Backend/Backend.csproj +++ b/dotnet/samples/Hello/Backend/Backend.csproj @@ -1,4 +1,7 @@ - + + + + Exe net8.0 @@ -6,10 +9,6 @@ enable - - - - diff --git a/dotnet/samples/Hello/Backend/Program.cs b/dotnet/samples/Hello/Backend/Program.cs index 7abdb205a8..b913d39d64 100644 --- a/dotnet/samples/Hello/Backend/Program.cs +++ b/dotnet/samples/Hello/Backend/Program.cs @@ -1,5 +1,7 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Program.cs -var app = await Microsoft.AutoGen.Runtime.Host.StartAsync(local: true); +using Microsoft.Extensions.Hosting; + +var app = await Microsoft.AutoGen.Agents.Host.StartAsync(local: false, useGrpc: true); await app.WaitForShutdownAsync(); diff --git a/dotnet/samples/Hello/Backend/appsettings.json b/dotnet/samples/Hello/Backend/appsettings.json new file mode 100644 index 0000000000..3bb8d88255 --- /dev/null +++ b/dotnet/samples/Hello/Backend/appsettings.json @@ -0,0 +1,9 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Warning", + "Microsoft": "Warning", + "Microsoft.Orleans": "Warning" + } + } + } \ No newline at end of file diff --git a/dotnet/samples/Hello/HelloAIAgents/HelloAIAgent.cs b/dotnet/samples/Hello/HelloAIAgents/HelloAIAgent.cs index fd3c517f33..d2ba81e659 100644 --- a/dotnet/samples/Hello/HelloAIAgents/HelloAIAgent.cs +++ b/dotnet/samples/Hello/HelloAIAgents/HelloAIAgent.cs @@ -8,11 +8,13 @@ using Microsoft.Extensions.AI; namespace Hello; [TopicSubscription("HelloAgents")] public class HelloAIAgent( - IAgentContext context, + IAgentRuntime context, [FromKeyedServices("EventTypes")] EventTypes typeRegistry, + IHostApplicationLifetime hostApplicationLifetime, IChatClient client) : HelloAgent( context, - typeRegistry), + typeRegistry, + hostApplicationLifetime), IHandle { // This Handle supercedes the one in the base class diff --git a/dotnet/samples/Hello/HelloAIAgents/HelloAIAgents.csproj b/dotnet/samples/Hello/HelloAIAgents/HelloAIAgents.csproj index 86bccb13b3..f17ab0c9f0 100644 --- a/dotnet/samples/Hello/HelloAIAgents/HelloAIAgents.csproj +++ b/dotnet/samples/Hello/HelloAIAgents/HelloAIAgents.csproj @@ -13,7 +13,6 @@ - diff --git a/dotnet/samples/Hello/HelloAIAgents/Program.cs b/dotnet/samples/Hello/HelloAIAgents/Program.cs index 8285e0800f..9612a0a079 100644 --- a/dotnet/samples/Hello/HelloAIAgents/Program.cs +++ b/dotnet/samples/Hello/HelloAIAgents/Program.cs @@ -32,8 +32,9 @@ namespace Hello { [TopicSubscription("HelloAgents")] public class HelloAgent( - IAgentContext context, - [FromKeyedServices("EventTypes")] EventTypes typeRegistry) : ConsoleAgent( + IAgentRuntime context, + [FromKeyedServices("EventTypes")] EventTypes typeRegistry, + IHostApplicationLifetime hostApplicationLifetime) : ConsoleAgent( context, typeRegistry), ISayHello, @@ -65,7 +66,7 @@ namespace Hello await PublishMessageAsync(evt).ConfigureAwait(false); //sleep30 seconds await Task.Delay(30000).ConfigureAwait(false); - await AgentsApp.ShutdownAsync().ConfigureAwait(false); + hostApplicationLifetime.StopApplication(); } public async Task SayHello(string ask) diff --git a/dotnet/samples/Hello/HelloAIAgents/appsettings.json b/dotnet/samples/Hello/HelloAIAgents/appsettings.json new file mode 100644 index 0000000000..3bb8d88255 --- /dev/null +++ b/dotnet/samples/Hello/HelloAIAgents/appsettings.json @@ -0,0 +1,9 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Warning", + "Microsoft": "Warning", + "Microsoft.Orleans": "Warning" + } + } + } \ No newline at end of file diff --git a/dotnet/samples/Hello/HelloAgent/HelloAgent.csproj b/dotnet/samples/Hello/HelloAgent/HelloAgent.csproj index 8799eb7275..f2f3e473fe 100644 --- a/dotnet/samples/Hello/HelloAgent/HelloAgent.csproj +++ b/dotnet/samples/Hello/HelloAgent/HelloAgent.csproj @@ -1,4 +1,4 @@ - + Exe net8.0 @@ -14,6 +14,5 @@ - diff --git a/dotnet/samples/Hello/HelloAgent/Program.cs b/dotnet/samples/Hello/HelloAgent/Program.cs index 4e96c7f99b..506d915023 100644 --- a/dotnet/samples/Hello/HelloAgent/Program.cs +++ b/dotnet/samples/Hello/HelloAgent/Program.cs @@ -3,6 +3,8 @@ using Microsoft.AutoGen.Abstractions; using Microsoft.AutoGen.Agents; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; // step 1: create in-memory agent runtime @@ -16,7 +18,7 @@ using Microsoft.AutoGen.Agents; var app = await AgentsApp.PublishMessageAsync("HelloAgents", new NewMessageReceived { Message = "World" -}, local: false); +}, local: true); await app.WaitForShutdownAsync(); @@ -24,9 +26,8 @@ namespace Hello { [TopicSubscription("HelloAgents")] public class HelloAgent( - IAgentContext context, - [FromKeyedServices("EventTypes")] EventTypes typeRegistry, - IHostApplicationLifetime hostApplicationLifetime) : AgentBase( + IAgentRuntime context, IHostApplicationLifetime hostApplicationLifetime, + [FromKeyedServices("EventTypes")] EventTypes typeRegistry) : AgentBase( context, typeRegistry), ISayHello, diff --git a/dotnet/samples/Hello/HelloAgent/README.md b/dotnet/samples/Hello/HelloAgent/README.md index 2de64ebaa0..f95e25d6ec 100644 --- a/dotnet/samples/Hello/HelloAgent/README.md +++ b/dotnet/samples/Hello/HelloAgent/README.md @@ -25,11 +25,11 @@ Flow Diagram: ```mermaid %%{init: {'theme':'forest'}}%% graph LR; - A[Main] --> |"PublishEvent(NewMessage('World'))"| B{"Handle(NewMessageReceived item)"} - B --> |"PublishEvent(Output('***Hello, World***'))"| C[ConsoleAgent] + A[Main] --> |"PublishEventAsync(NewMessage('World'))"| B{"Handle(NewMessageReceived item)"} + B --> |"PublishEventAsync(Output('***Hello, World***'))"| C[ConsoleAgent] C --> D{"WriteConsole()"} - B --> |"PublishEvent(ConversationClosed('Goodbye'))"| E{"Handle(ConversationClosed item)"} - B --> |"PublishEvent(Output('***Goodbye***'))"| C + B --> |"PublishEventAsync(ConversationClosed('Goodbye'))"| E{"Handle(ConversationClosed item)"} + B --> |"PublishEventAsync(Output('***Goodbye***'))"| C E --> F{"Shutdown()"} ``` @@ -58,13 +58,13 @@ public class HelloAgent( { Message = response }.ToCloudEvent(this.AgentId.Key); - await PublishEvent(evt).ConfigureAwait(false); + await PublishEventAsync(evt).ConfigureAwait(false); var goodbye = new ConversationClosed { UserId = this.AgentId.Key, UserMessage = "Goodbye" }.ToCloudEvent(this.AgentId.Key); - await PublishEvent(goodbye).ConfigureAwait(false); + await PublishEventAsync(goodbye).ConfigureAwait(false); } ``` @@ -109,7 +109,6 @@ message ReadmeRequested { } ``` - ```xml diff --git a/dotnet/samples/Hello/HelloAgent/appsettings.json b/dotnet/samples/Hello/HelloAgent/appsettings.json new file mode 100644 index 0000000000..3bb8d88255 --- /dev/null +++ b/dotnet/samples/Hello/HelloAgent/appsettings.json @@ -0,0 +1,9 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Warning", + "Microsoft": "Warning", + "Microsoft.Orleans": "Warning" + } + } + } \ No newline at end of file diff --git a/dotnet/samples/Hello/HelloAgentState/HelloAgentState.csproj b/dotnet/samples/Hello/HelloAgentState/HelloAgentState.csproj index 8799eb7275..797fe957bb 100644 --- a/dotnet/samples/Hello/HelloAgentState/HelloAgentState.csproj +++ b/dotnet/samples/Hello/HelloAgentState/HelloAgentState.csproj @@ -14,6 +14,5 @@ - diff --git a/dotnet/samples/Hello/HelloAgentState/Program.cs b/dotnet/samples/Hello/HelloAgentState/Program.cs index d021126850..7c15c4c54d 100644 --- a/dotnet/samples/Hello/HelloAgentState/Program.cs +++ b/dotnet/samples/Hello/HelloAgentState/Program.cs @@ -1,6 +1,7 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Program.cs +using System.Text.Json; using Microsoft.AutoGen.Abstractions; using Microsoft.AutoGen.Agents; @@ -8,7 +9,7 @@ using Microsoft.AutoGen.Agents; var app = await AgentsApp.PublishMessageAsync("HelloAgents", new NewMessageReceived { Message = "World" -}, local: false); +}, local: true); await app.WaitForShutdownAsync(); @@ -16,13 +17,15 @@ namespace Hello { [TopicSubscription("HelloAgents")] public class HelloAgent( - IAgentContext context, - [FromKeyedServices("EventTypes")] EventTypes typeRegistry) : ConsoleAgent( + IAgentRuntime context, + IHostApplicationLifetime hostApplicationLifetime, + [FromKeyedServices("EventTypes")] EventTypes typeRegistry) : AgentBase( context, typeRegistry), - ISayHello, + IHandleConsole, IHandle, - IHandle + IHandle, + IHandle { private AgentState? State { get; set; } public async Task Handle(NewMessageReceived item) @@ -32,11 +35,15 @@ namespace Hello { Message = response }; - var entry = "We said hello to " + item.Message; - await Store(new AgentState + Dictionary state = new() + { + { "data", "We said hello to " + item.Message }, + { "workflow", "Active" } + }; + await StoreAsync(new AgentState { AgentId = this.AgentId, - TextData = entry + TextData = JsonSerializer.Serialize(state) }).ConfigureAwait(false); await PublishMessageAsync(evt).ConfigureAwait(false); var goodbye = new ConversationClosed @@ -45,21 +52,40 @@ namespace Hello UserMessage = "Goodbye" }; await PublishMessageAsync(goodbye).ConfigureAwait(false); + // send the shutdown message + await PublishMessageAsync(new Shutdown { Message = this.AgentId.Key }).ConfigureAwait(false); + } public async Task Handle(ConversationClosed item) { - State = await Read(this.AgentId).ConfigureAwait(false); - var read = State?.TextData ?? "No state data found"; - var goodbye = $"{read}\n********************* {item.UserId} said {item.UserMessage} ************************"; + State = await ReadAsync(this.AgentId).ConfigureAwait(false); + var state = JsonSerializer.Deserialize>(State.TextData) ?? new Dictionary { { "data", "No state data found" } }; + var goodbye = $"\nState: {state}\n********************* {item.UserId} said {item.UserMessage} ************************"; var evt = new Output { Message = goodbye }; - await PublishMessageAsync(evt).ConfigureAwait(false); - //sleep - await Task.Delay(10000).ConfigureAwait(false); - await AgentsApp.ShutdownAsync().ConfigureAwait(false); - + await PublishMessageAsync(evt).ConfigureAwait(true); + state["workflow"] = "Complete"; + await StoreAsync(new AgentState + { + AgentId = this.AgentId, + TextData = JsonSerializer.Serialize(state) + }).ConfigureAwait(false); + } + public async Task Handle(Shutdown item) + { + string? workflow = null; + // make sure the workflow is finished + while (workflow != "Complete") + { + State = await ReadAsync(this.AgentId).ConfigureAwait(true); + var state = JsonSerializer.Deserialize>(State?.TextData ?? "{}") ?? new Dictionary(); + workflow = state["workflow"]; + await Task.Delay(1000).ConfigureAwait(true); + } + // now we can shut down... + hostApplicationLifetime.StopApplication(); } public async Task SayHello(string ask) { @@ -67,8 +93,4 @@ namespace Hello return response; } } - public interface ISayHello - { - public Task SayHello(string ask); - } } diff --git a/dotnet/samples/Hello/HelloAgentState/README.md b/dotnet/samples/Hello/HelloAgentState/README.md index 0079005d24..8bc8e34545 100644 --- a/dotnet/samples/Hello/HelloAgentState/README.md +++ b/dotnet/samples/Hello/HelloAgentState/README.md @@ -25,11 +25,11 @@ Flow Diagram: ```mermaid %%{init: {'theme':'forest'}}%% graph LR; - A[Main] --> |"PublishEvent(NewMessage('World'))"| B{"Handle(NewMessageReceived item)"} - B --> |"PublishEvent(Output('***Hello, World***'))"| C[ConsoleAgent] + A[Main] --> |"PublishEventAsync(NewMessage('World'))"| B{"Handle(NewMessageReceived item)"} + B --> |"PublishEventAsync(Output('***Hello, World***'))"| C[ConsoleAgent] C --> D{"WriteConsole()"} - B --> |"PublishEvent(ConversationClosed('Goodbye'))"| E{"Handle(ConversationClosed item)"} - B --> |"PublishEvent(Output('***Goodbye***'))"| C + B --> |"PublishEventAsync(ConversationClosed('Goodbye'))"| E{"Handle(ConversationClosed item)"} + B --> |"PublishEventAsync(Output('***Goodbye***'))"| C E --> F{"Shutdown()"} ``` @@ -58,13 +58,13 @@ public class HelloAgent( { Message = response }.ToCloudEvent(this.AgentId.Key); - await PublishEvent(evt).ConfigureAwait(false); + await PublishEventAsync(evt).ConfigureAwait(false); var goodbye = new ConversationClosed { UserId = this.AgentId.Key, UserMessage = "Goodbye" }.ToCloudEvent(this.AgentId.Key); - await PublishEvent(goodbye).ConfigureAwait(false); + await PublishEventAsync(goodbye).ConfigureAwait(false); } ``` diff --git a/dotnet/samples/Hello/HelloAgentState/appsettings.json b/dotnet/samples/Hello/HelloAgentState/appsettings.json new file mode 100644 index 0000000000..3bb8d88255 --- /dev/null +++ b/dotnet/samples/Hello/HelloAgentState/appsettings.json @@ -0,0 +1,9 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Warning", + "Microsoft": "Warning", + "Microsoft.Orleans": "Warning" + } + } + } \ No newline at end of file diff --git a/dotnet/samples/dev-team/DevTeam.AgentHost/DevTeam.AgentHost.csproj b/dotnet/samples/dev-team/DevTeam.AgentHost/DevTeam.AgentHost.csproj index 69705e4667..d8d7ebf8e4 100644 --- a/dotnet/samples/dev-team/DevTeam.AgentHost/DevTeam.AgentHost.csproj +++ b/dotnet/samples/dev-team/DevTeam.AgentHost/DevTeam.AgentHost.csproj @@ -9,8 +9,8 @@ - - + + diff --git a/dotnet/samples/dev-team/DevTeam.AgentHost/Program.cs b/dotnet/samples/dev-team/DevTeam.AgentHost/Program.cs index 4864f39421..6480e72a0b 100644 --- a/dotnet/samples/dev-team/DevTeam.AgentHost/Program.cs +++ b/dotnet/samples/dev-team/DevTeam.AgentHost/Program.cs @@ -1,7 +1,7 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Program.cs -using Microsoft.AutoGen.Runtime; +using Microsoft.AutoGen.Agents; var builder = WebApplication.CreateBuilder(args); builder.AddServiceDefaults(); diff --git a/dotnet/samples/dev-team/DevTeam.Agents/DevTeam.Agents.csproj b/dotnet/samples/dev-team/DevTeam.Agents/DevTeam.Agents.csproj index 03176d2cfd..8dfd6912e5 100644 --- a/dotnet/samples/dev-team/DevTeam.Agents/DevTeam.Agents.csproj +++ b/dotnet/samples/dev-team/DevTeam.Agents/DevTeam.Agents.csproj @@ -9,8 +9,8 @@ - - + + diff --git a/dotnet/samples/dev-team/DevTeam.Agents/Developer/Developer.cs b/dotnet/samples/dev-team/DevTeam.Agents/Developer/Developer.cs index 5b66822482..325b9fbe0b 100644 --- a/dotnet/samples/dev-team/DevTeam.Agents/Developer/Developer.cs +++ b/dotnet/samples/dev-team/DevTeam.Agents/Developer/Developer.cs @@ -10,7 +10,7 @@ using Microsoft.SemanticKernel.Memory; namespace DevTeam.Agents; [TopicSubscription("devteam")] -public class Dev(IAgentContext context, Kernel kernel, ISemanticTextMemory memory, [FromKeyedServices("EventTypes")] EventTypes typeRegistry, ILogger logger) +public class Dev(IAgentRuntime context, Kernel kernel, ISemanticTextMemory memory, [FromKeyedServices("EventTypes")] EventTypes typeRegistry, ILogger logger) : SKAiAgent(context, memory, kernel, typeRegistry), IDevelopApps, IHandle, IHandle diff --git a/dotnet/samples/dev-team/DevTeam.Agents/DeveloperLead/DeveloperLead.cs b/dotnet/samples/dev-team/DevTeam.Agents/DeveloperLead/DeveloperLead.cs index 71301dd3d3..e03701c968 100644 --- a/dotnet/samples/dev-team/DevTeam.Agents/DeveloperLead/DeveloperLead.cs +++ b/dotnet/samples/dev-team/DevTeam.Agents/DeveloperLead/DeveloperLead.cs @@ -11,7 +11,7 @@ using Microsoft.SemanticKernel.Memory; namespace DevTeam.Agents; [TopicSubscription("devteam")] -public class DeveloperLead(IAgentContext context, Kernel kernel, ISemanticTextMemory memory, [FromKeyedServices("EventTypes")] EventTypes typeRegistry, ILogger logger) +public class DeveloperLead(IAgentRuntime context, Kernel kernel, ISemanticTextMemory memory, [FromKeyedServices("EventTypes")] EventTypes typeRegistry, ILogger logger) : SKAiAgent(context, memory, kernel, typeRegistry), ILeadDevelopers, IHandle, IHandle diff --git a/dotnet/samples/dev-team/DevTeam.Agents/ProductManager/ProductManager.cs b/dotnet/samples/dev-team/DevTeam.Agents/ProductManager/ProductManager.cs index a97b333567..cc393e6518 100644 --- a/dotnet/samples/dev-team/DevTeam.Agents/ProductManager/ProductManager.cs +++ b/dotnet/samples/dev-team/DevTeam.Agents/ProductManager/ProductManager.cs @@ -10,7 +10,7 @@ using Microsoft.SemanticKernel.Memory; namespace DevTeam.Agents; [TopicSubscription("devteam")] -public class ProductManager(IAgentContext context, Kernel kernel, ISemanticTextMemory memory, [FromKeyedServices("EventTypes")] EventTypes typeRegistry, ILogger logger) +public class ProductManager(IAgentRuntime context, Kernel kernel, ISemanticTextMemory memory, [FromKeyedServices("EventTypes")] EventTypes typeRegistry, ILogger logger) : SKAiAgent(context, memory, kernel, typeRegistry), IManageProducts, IHandle, IHandle diff --git a/dotnet/samples/dev-team/DevTeam.Backend/Agents/AzureGenie.cs b/dotnet/samples/dev-team/DevTeam.Backend/Agents/AzureGenie.cs index 7dac8163a7..d3997a8f8e 100644 --- a/dotnet/samples/dev-team/DevTeam.Backend/Agents/AzureGenie.cs +++ b/dotnet/samples/dev-team/DevTeam.Backend/Agents/AzureGenie.cs @@ -9,7 +9,7 @@ using Microsoft.SemanticKernel; using Microsoft.SemanticKernel.Memory; namespace Microsoft.AI.DevTeam; -public class AzureGenie(IAgentContext context, Kernel kernel, ISemanticTextMemory memory, [FromKeyedServices("EventTypes")] EventTypes typeRegistry, IManageAzure azureService) +public class AzureGenie(IAgentRuntime context, Kernel kernel, ISemanticTextMemory memory, [FromKeyedServices("EventTypes")] EventTypes typeRegistry, IManageAzure azureService) : SKAiAgent(context, memory, kernel, typeRegistry), IHandle, IHandle diff --git a/dotnet/samples/dev-team/DevTeam.Backend/Agents/Hubber.cs b/dotnet/samples/dev-team/DevTeam.Backend/Agents/Hubber.cs index 2f4c373385..3dc8dd35ad 100644 --- a/dotnet/samples/dev-team/DevTeam.Backend/Agents/Hubber.cs +++ b/dotnet/samples/dev-team/DevTeam.Backend/Agents/Hubber.cs @@ -12,7 +12,7 @@ using Microsoft.SemanticKernel.Memory; namespace Microsoft.AI.DevTeam; -public class Hubber(IAgentContext context, Kernel kernel, ISemanticTextMemory memory, [FromKeyedServices("EventTypes")] EventTypes typeRegistry, IManageGithub ghService) +public class Hubber(IAgentRuntime context, Kernel kernel, ISemanticTextMemory memory, [FromKeyedServices("EventTypes")] EventTypes typeRegistry, IManageGithub ghService) : SKAiAgent(context, memory, kernel, typeRegistry), IHandle, IHandle, diff --git a/dotnet/samples/dev-team/DevTeam.Backend/DevTeam.Backend.csproj b/dotnet/samples/dev-team/DevTeam.Backend/DevTeam.Backend.csproj index 419298430c..8296f7aa67 100644 --- a/dotnet/samples/dev-team/DevTeam.Backend/DevTeam.Backend.csproj +++ b/dotnet/samples/dev-team/DevTeam.Backend/DevTeam.Backend.csproj @@ -29,7 +29,7 @@ - + diff --git a/dotnet/src/Microsoft.AutoGen/Abstractions/IAgentBase.cs b/dotnet/src/Microsoft.AutoGen/Abstractions/IAgentBase.cs index 2e767e7213..14c2688c23 100644 --- a/dotnet/src/Microsoft.AutoGen/Abstractions/IAgentBase.cs +++ b/dotnet/src/Microsoft.AutoGen/Abstractions/IAgentBase.cs @@ -9,13 +9,14 @@ public interface IAgentBase { // Properties AgentId AgentId { get; } - IAgentContext Context { get; } + IAgentRuntime Context { get; } // Methods Task CallHandler(CloudEvent item); Task HandleRequest(RpcRequest request); void ReceiveMessage(Message message); - Task Store(AgentState state); - Task Read(AgentId agentId) where T : IMessage, new(); - ValueTask PublishEventAsync(CloudEvent item, CancellationToken token = default); + Task StoreAsync(AgentState state, CancellationToken cancellationToken = default); + Task ReadAsync(AgentId agentId, CancellationToken cancellationToken = default) where T : IMessage, new(); + ValueTask PublishEventAsync(CloudEvent item, CancellationToken cancellationToken = default); + ValueTask PublishEventAsync(string topic, IMessage evt, CancellationToken cancellationToken = default); } diff --git a/dotnet/src/Microsoft.AutoGen/Abstractions/IAgentContext.cs b/dotnet/src/Microsoft.AutoGen/Abstractions/IAgentRuntime.cs similarity index 50% rename from dotnet/src/Microsoft.AutoGen/Abstractions/IAgentContext.cs rename to dotnet/src/Microsoft.AutoGen/Abstractions/IAgentRuntime.cs index ab5972730f..2125e57a8b 100644 --- a/dotnet/src/Microsoft.AutoGen/Abstractions/IAgentContext.cs +++ b/dotnet/src/Microsoft.AutoGen/Abstractions/IAgentRuntime.cs @@ -1,20 +1,21 @@ // Copyright (c) Microsoft Corporation. All rights reserved. -// IAgentContext.cs +// IAgentRuntime.cs using System.Diagnostics; -using Microsoft.Extensions.Logging; namespace Microsoft.AutoGen.Abstractions; -public interface IAgentContext +public interface IAgentRuntime { AgentId AgentId { get; } IAgentBase? AgentInstance { get; set; } - DistributedContextPropagator DistributedContextPropagator { get; } // TODO: Remove this. An abstraction should not have a dependency on DistributedContextPropagator. - ILogger Logger { get; } // TODO: Remove this. An abstraction should not have a dependency on ILogger. - ValueTask Store(AgentState value, CancellationToken cancellationToken = default); - ValueTask Read(AgentId agentId, CancellationToken cancellationToken = default); + ValueTask StoreAsync(AgentState value, CancellationToken cancellationToken = default); + ValueTask ReadAsync(AgentId agentId, CancellationToken cancellationToken = default); ValueTask SendResponseAsync(RpcRequest request, RpcResponse response, CancellationToken cancellationToken = default); ValueTask SendRequestAsync(IAgentBase agent, RpcRequest request, CancellationToken cancellationToken = default); ValueTask PublishEventAsync(CloudEvent @event, CancellationToken cancellationToken = default); + void Update(Activity? activity, RpcRequest request); + void Update(Activity? activity, CloudEvent cloudEvent); + (string?, string?) GetTraceIDandState(IDictionary metadata); + IDictionary ExtractMetadata(IDictionary metadata); } diff --git a/dotnet/src/Microsoft.AutoGen/Runtime/IWorkerAgentGrain.cs b/dotnet/src/Microsoft.AutoGen/Abstractions/IAgentState.cs similarity index 53% rename from dotnet/src/Microsoft.AutoGen/Runtime/IWorkerAgentGrain.cs rename to dotnet/src/Microsoft.AutoGen/Abstractions/IAgentState.cs index d28e2a8c8d..0a6784b54f 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime/IWorkerAgentGrain.cs +++ b/dotnet/src/Microsoft.AutoGen/Abstractions/IAgentState.cs @@ -1,11 +1,9 @@ // Copyright (c) Microsoft Corporation. All rights reserved. -// IWorkerAgentGrain.cs +// IAgentState.cs -using Microsoft.AutoGen.Abstractions; +namespace Microsoft.AutoGen.Abstractions; -namespace Microsoft.AutoGen.Runtime; - -internal interface IWorkerAgentGrain : IGrainWithStringKey +public interface IAgentState { ValueTask ReadStateAsync(); ValueTask WriteStateAsync(AgentState state, string eTag); diff --git a/dotnet/src/Microsoft.AutoGen/Abstractions/IAgentWorker.cs b/dotnet/src/Microsoft.AutoGen/Abstractions/IAgentWorker.cs new file mode 100644 index 0000000000..67a867d87d --- /dev/null +++ b/dotnet/src/Microsoft.AutoGen/Abstractions/IAgentWorker.cs @@ -0,0 +1,13 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// IAgentWorker.cs + +namespace Microsoft.AutoGen.Abstractions; + +public interface IAgentWorker +{ + ValueTask PublishEventAsync(CloudEvent evt, CancellationToken cancellationToken = default); + ValueTask SendRequestAsync(IAgentBase agent, RpcRequest request, CancellationToken cancellationToken = default); + ValueTask SendResponseAsync(RpcResponse response, CancellationToken cancellationToken = default); + ValueTask StoreAsync(AgentState value, CancellationToken cancellationToken = default); + ValueTask ReadAsync(AgentId agentId, CancellationToken cancellationToken = default); +} diff --git a/dotnet/src/Microsoft.AutoGen/Abstractions/IAgentWorkerRuntime.cs b/dotnet/src/Microsoft.AutoGen/Abstractions/IAgentWorkerRuntime.cs deleted file mode 100644 index c03259f722..0000000000 --- a/dotnet/src/Microsoft.AutoGen/Abstractions/IAgentWorkerRuntime.cs +++ /dev/null @@ -1,13 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// IAgentWorkerRuntime.cs - -namespace Microsoft.AutoGen.Abstractions; - -public interface IAgentWorkerRuntime -{ - ValueTask PublishEvent(CloudEvent evt, CancellationToken cancellationToken); - ValueTask SendRequest(IAgentBase agent, RpcRequest request, CancellationToken cancellationToken); - ValueTask SendResponse(RpcResponse response, CancellationToken cancellationToken); - ValueTask Store(AgentState value, CancellationToken cancellationToken); - ValueTask Read(AgentId agentId, CancellationToken cancellationToken); -} diff --git a/dotnet/src/Microsoft.AutoGen/Abstractions/IConnection.cs b/dotnet/src/Microsoft.AutoGen/Abstractions/IConnection.cs new file mode 100644 index 0000000000..3ac582f6d4 --- /dev/null +++ b/dotnet/src/Microsoft.AutoGen/Abstractions/IConnection.cs @@ -0,0 +1,7 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// IConnection.cs + +namespace Microsoft.AutoGen.Abstractions; +public interface IConnection +{ +} diff --git a/dotnet/src/Microsoft.AutoGen/Abstractions/IGateway.cs b/dotnet/src/Microsoft.AutoGen/Abstractions/IGateway.cs new file mode 100644 index 0000000000..79b7b63e72 --- /dev/null +++ b/dotnet/src/Microsoft.AutoGen/Abstractions/IGateway.cs @@ -0,0 +1,13 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// IGateway.cs + +namespace Microsoft.AutoGen.Abstractions; + +public interface IGateway : IGrainObserver +{ + ValueTask InvokeRequest(RpcRequest request); + ValueTask BroadcastEvent(CloudEvent evt); + ValueTask StoreAsync(AgentState value); + ValueTask ReadAsync(AgentId agentId); + Task SendMessageAsync(IConnection connection, CloudEvent cloudEvent); +} diff --git a/dotnet/src/Microsoft.AutoGen/Abstractions/Microsoft.AutoGen.Abstractions.csproj b/dotnet/src/Microsoft.AutoGen/Abstractions/Microsoft.AutoGen.Abstractions.csproj index 52f933e195..e24b52187c 100644 --- a/dotnet/src/Microsoft.AutoGen/Abstractions/Microsoft.AutoGen.Abstractions.csproj +++ b/dotnet/src/Microsoft.AutoGen/Abstractions/Microsoft.AutoGen.Abstractions.csproj @@ -20,7 +20,8 @@ + + - diff --git a/dotnet/src/Microsoft.AutoGen/Agents/AgentBase.cs b/dotnet/src/Microsoft.AutoGen/Agents/AgentBase.cs index 3932e007b3..6fffdaadf1 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/AgentBase.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/AgentBase.cs @@ -20,21 +20,24 @@ public abstract class AgentBase : IAgentBase, IHandle private readonly Dictionary> _pendingRequests = []; private readonly Channel _mailbox = Channel.CreateUnbounded(); - private readonly IAgentContext _context; + private readonly IAgentRuntime _context; public string Route { get; set; } = "base"; - protected internal ILogger Logger => _context.Logger; - public IAgentContext Context => _context; + protected internal ILogger _logger; + public IAgentRuntime Context => _context; protected readonly EventTypes EventTypes; - protected AgentBase(IAgentContext context, EventTypes eventTypes) + protected AgentBase( + IAgentRuntime context, + EventTypes eventTypes, + ILogger? logger = null) { _context = context; context.AgentInstance = this; this.EventTypes = eventTypes; + _logger = logger ?? LoggerFactory.Create(builder => { }).CreateLogger(); Completion = Start(); } - internal Task Completion { get; } internal Task Start() @@ -58,7 +61,6 @@ public abstract class AgentBase : IAgentBase, IHandle } } } - public void ReceiveMessage(Message message) => _mailbox.Writer.TryWrite(message); private async Task RunMessagePump() @@ -71,7 +73,7 @@ public abstract class AgentBase : IAgentBase, IHandle switch (message) { case Message msg: - await HandleRpcMessage(msg).ConfigureAwait(false); + await HandleRpcMessage(msg, new CancellationToken()).ConfigureAwait(false); break; default: throw new InvalidOperationException($"Unexpected message '{message}'."); @@ -79,12 +81,11 @@ public abstract class AgentBase : IAgentBase, IHandle } catch (Exception ex) { - _context.Logger.LogError(ex, "Error processing message."); + _logger.LogError(ex, "Error processing message."); } } } - - protected internal async Task HandleRpcMessage(Message msg) + protected internal async Task HandleRpcMessage(Message msg, CancellationToken cancellationToken = default) { switch (msg.MessageCase) { @@ -95,17 +96,17 @@ public abstract class AgentBase : IAgentBase, IHandle static ((AgentBase Agent, CloudEvent Item) state) => state.Agent.CallHandler(state.Item), (this, msg.CloudEvent), activity, - msg.CloudEvent.Type).ConfigureAwait(false); + msg.CloudEvent.Type, cancellationToken).ConfigureAwait(false); } break; case Message.MessageOneofCase.Request: { var activity = this.ExtractActivity(msg.Request.Method, msg.Request.Metadata); await this.InvokeWithActivityAsync( - static ((AgentBase Agent, RpcRequest Request) state) => state.Agent.OnRequestCore(state.Request), + static ((AgentBase Agent, RpcRequest Request) state) => state.Agent.OnRequestCoreAsync(state.Request), (this, msg.Request), activity, - msg.Request.Method).ConfigureAwait(false); + msg.Request.Method, cancellationToken).ConfigureAwait(false); } break; case Message.MessageOneofCase.Response: @@ -113,14 +114,14 @@ public abstract class AgentBase : IAgentBase, IHandle break; } } - public async Task Store(AgentState state) + public async Task StoreAsync(AgentState state, CancellationToken cancellationToken = default) { - await _context.Store(state).ConfigureAwait(false); + await _context.StoreAsync(state, cancellationToken).ConfigureAwait(false); return; } - public async Task Read(AgentId agentId) where T : IMessage, new() + public async Task ReadAsync(AgentId agentId, CancellationToken cancellationToken = default) where T : IMessage, new() { - var agentstate = await _context.Read(agentId).ConfigureAwait(false); + var agentstate = await _context.ReadAsync(agentId, cancellationToken).ConfigureAwait(false); return agentstate.FromAgentState(); } private void OnResponseCore(RpcResponse response) @@ -137,7 +138,7 @@ public abstract class AgentBase : IAgentBase, IHandle completion.SetResult(response); } - private async Task OnRequestCore(RpcRequest request) + private async Task OnRequestCoreAsync(RpcRequest request, CancellationToken cancellationToken = default) { RpcResponse response; @@ -149,8 +150,7 @@ public abstract class AgentBase : IAgentBase, IHandle { response = new RpcResponse { Error = ex.Message }; } - - await _context.SendResponseAsync(request, response).ConfigureAwait(false); + await _context.SendResponseAsync(request, response, cancellationToken).ConfigureAwait(false); } protected async Task RequestAsync(AgentId target, string method, Dictionary parameters) @@ -174,7 +174,7 @@ public abstract class AgentBase : IAgentBase, IHandle activity?.SetTag("peer.service", target.ToString()); var completion = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - Context.DistributedContextPropagator.Inject(activity, request.Metadata, static (carrier, key, value) => ((IDictionary)carrier!)[key] = value); + _context.Update(activity, request); await this.InvokeWithActivityAsync( static async ((AgentBase Agent, RpcRequest Request, TaskCompletionSource) state) => { @@ -204,13 +204,13 @@ public abstract class AgentBase : IAgentBase, IHandle await PublishEventAsync(evt, token).ConfigureAwait(false); } - public async ValueTask PublishEventAsync(CloudEvent item, CancellationToken token = default) + public async ValueTask PublishEventAsync(CloudEvent item, CancellationToken cancellationToken = default) { var activity = s_source.StartActivity($"PublishEventAsync '{item.Type}'", ActivityKind.Client, Activity.Current?.Context ?? default); activity?.SetTag("peer.service", $"{item.Type}/{item.Source}"); // TODO: fix activity - Context.DistributedContextPropagator.Inject(activity, item.Metadata, static (carrier, key, value) => ((IDictionary)carrier!)[key] = value); + _context.Update(activity, item); await this.InvokeWithActivityAsync( static async ((AgentBase Agent, CloudEvent Event) state) => { @@ -218,7 +218,7 @@ public abstract class AgentBase : IAgentBase, IHandle }, (this, item), activity, - item.Type).ConfigureAwait(false); + item.Type, cancellationToken).ConfigureAwait(false); } public Task CallHandler(CloudEvent item) @@ -251,7 +251,7 @@ public abstract class AgentBase : IAgentBase, IHandle } catch (Exception ex) { - Logger.LogError(ex, $"Error invoking method {nameof(IHandle.Handle)}"); + _logger.LogError(ex, $"Error invoking method {nameof(IHandle.Handle)}"); throw; // TODO: ? } } @@ -262,6 +262,7 @@ public abstract class AgentBase : IAgentBase, IHandle public Task HandleRequest(RpcRequest request) => Task.FromResult(new RpcResponse { Error = "Not implemented" }); + //TODO: should this be async and cancellable? public virtual Task HandleObject(object item) { // get all Handle methods @@ -279,4 +280,8 @@ public abstract class AgentBase : IAgentBase, IHandle // otherwise, complain throw new InvalidOperationException($"No handler found for type {item.GetType().FullName}"); } + public async ValueTask PublishEventAsync(string topic, IMessage evt, CancellationToken cancellationToken = default) + { + await PublishEventAsync(evt.ToCloudEvent(topic), cancellationToken).ConfigureAwait(false); + } } diff --git a/dotnet/src/Microsoft.AutoGen/Agents/AgentBaseExtensions.cs b/dotnet/src/Microsoft.AutoGen/Agents/AgentBaseExtensions.cs index ef62a2b1d4..ce1318a0d3 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/AgentBaseExtensions.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/AgentBaseExtensions.cs @@ -9,17 +9,8 @@ public static class AgentBaseExtensions { public static Activity? ExtractActivity(this AgentBase agent, string activityName, IDictionary metadata) { - Activity? activity = null; - agent.Context.DistributedContextPropagator.ExtractTraceIdAndState(metadata, - static (object? carrier, string fieldName, out string? fieldValue, out IEnumerable? fieldValues) => - { - var metadata = (IDictionary)carrier!; - fieldValues = null; - metadata.TryGetValue(fieldName, out fieldValue); - }, - out var traceParent, - out var traceState); - + Activity? activity; + (var traceParent, var traceState) = agent.Context.GetTraceIDandState(metadata); if (!string.IsNullOrEmpty(traceParent)) { if (ActivityContext.TryParse(traceParent, traceState, isRemote: true, out ActivityContext parentContext)) @@ -40,12 +31,7 @@ public static class AgentBaseExtensions activity.TraceStateString = traceState; } - var baggage = agent.Context.DistributedContextPropagator.ExtractBaggage(metadata, static (object? carrier, string fieldName, out string? fieldValue, out IEnumerable? fieldValues) => - { - var metadata = (IDictionary)carrier!; - fieldValues = null; - metadata.TryGetValue(fieldName, out fieldValue); - }); + var baggage = agent.Context.ExtractMetadata(metadata); if (baggage is not null) { @@ -63,7 +49,7 @@ public static class AgentBaseExtensions return activity; } - public static async Task InvokeWithActivityAsync(this AgentBase agent, Func func, TState state, Activity? activity, string methodName) + public static async Task InvokeWithActivityAsync(this AgentBase agent, Func func, TState state, Activity? activity, string methodName, CancellationToken cancellationToken = default) { if (activity is not null && activity.StartTimeUtc == default) { diff --git a/dotnet/src/Microsoft.AutoGen/Agents/AgentContext.cs b/dotnet/src/Microsoft.AutoGen/Agents/AgentContext.cs deleted file mode 100644 index 7de1e6565d..0000000000 --- a/dotnet/src/Microsoft.AutoGen/Agents/AgentContext.cs +++ /dev/null @@ -1,39 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// AgentContext.cs - -using System.Diagnostics; -using Microsoft.AutoGen.Abstractions; -using Microsoft.Extensions.Logging; - -namespace Microsoft.AutoGen.Agents; - -internal sealed class AgentContext(AgentId agentId, IAgentWorkerRuntime runtime, ILogger logger, DistributedContextPropagator distributedContextPropagator) : IAgentContext -{ - private readonly IAgentWorkerRuntime _runtime = runtime; - - public AgentId AgentId { get; } = agentId; - public ILogger Logger { get; } = logger; - public IAgentBase? AgentInstance { get; set; } - public DistributedContextPropagator DistributedContextPropagator { get; } = distributedContextPropagator; - public async ValueTask SendResponseAsync(RpcRequest request, RpcResponse response, CancellationToken cancellationToken) - { - response.RequestId = request.RequestId; - await _runtime.SendResponse(response, cancellationToken).ConfigureAwait(false); - } - public async ValueTask SendRequestAsync(IAgentBase agent, RpcRequest request, CancellationToken cancellationToken) - { - await _runtime.SendRequest(agent, request, cancellationToken).ConfigureAwait(false); - } - public async ValueTask PublishEventAsync(CloudEvent @event, CancellationToken cancellationToken) - { - await _runtime.PublishEvent(@event, cancellationToken).ConfigureAwait(false); - } - public async ValueTask Store(AgentState value, CancellationToken cancellationToken) - { - await _runtime.Store(value, cancellationToken).ConfigureAwait(false); - } - public ValueTask Read(AgentId agentId, CancellationToken cancellationToken) - { - return _runtime.Read(agentId, cancellationToken); - } -} diff --git a/dotnet/src/Microsoft.AutoGen/Agents/AgentRuntime.cs b/dotnet/src/Microsoft.AutoGen/Agents/AgentRuntime.cs new file mode 100644 index 0000000000..86944cad3a --- /dev/null +++ b/dotnet/src/Microsoft.AutoGen/Agents/AgentRuntime.cs @@ -0,0 +1,72 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// AgentRuntime.cs + +using System.Diagnostics; +using Microsoft.AutoGen.Abstractions; +using Microsoft.Extensions.Logging; + +namespace Microsoft.AutoGen.Agents; + +internal sealed class AgentRuntime(AgentId agentId, IAgentWorker worker, ILogger logger, DistributedContextPropagator distributedContextPropagator) : IAgentRuntime +{ + private readonly IAgentWorker worker = worker; + + public AgentId AgentId { get; } = agentId; + public ILogger Logger { get; } = logger; + public IAgentBase? AgentInstance { get; set; } + private DistributedContextPropagator DistributedContextPropagator { get; } = distributedContextPropagator; + public (string?, string?) GetTraceIDandState(IDictionary metadata) + { + DistributedContextPropagator.ExtractTraceIdAndState(metadata, + static (object? carrier, string fieldName, out string? fieldValue, out IEnumerable? fieldValues) => + { + var metadata = (IDictionary)carrier!; + fieldValues = null; + metadata.TryGetValue(fieldName, out fieldValue); + }, + out var traceParent, + out var traceState); + return (traceParent, traceState); + } + public void Update(Activity? activity, RpcRequest request) + { + DistributedContextPropagator.Inject(activity, request.Metadata, static (carrier, key, value) => ((IDictionary)carrier!)[key] = value); + } + public void Update(Activity? activity, CloudEvent cloudEvent) + { + DistributedContextPropagator.Inject(activity, cloudEvent.Metadata, static (carrier, key, value) => ((IDictionary)carrier!)[key] = value); + } + public async ValueTask SendResponseAsync(RpcRequest request, RpcResponse response, CancellationToken cancellationToken = default) + { + response.RequestId = request.RequestId; + await worker.SendResponseAsync(response, cancellationToken).ConfigureAwait(false); + } + public async ValueTask SendRequestAsync(IAgentBase agent, RpcRequest request, CancellationToken cancellationToken = default) + { + await worker.SendRequestAsync(agent, request, cancellationToken).ConfigureAwait(false); + } + public async ValueTask PublishEventAsync(CloudEvent @event, CancellationToken cancellationToken = default) + { + await worker.PublishEventAsync(@event, cancellationToken).ConfigureAwait(false); + } + public async ValueTask StoreAsync(AgentState value, CancellationToken cancellationToken = default) + { + await worker.StoreAsync(value, cancellationToken).ConfigureAwait(false); + } + public ValueTask ReadAsync(AgentId agentId, CancellationToken cancellationToken = default) + { + return worker.ReadAsync(agentId, cancellationToken); + } + + public IDictionary ExtractMetadata(IDictionary metadata) + { + var baggage = DistributedContextPropagator.ExtractBaggage(metadata, static (object? carrier, string fieldName, out string? fieldValue, out IEnumerable? fieldValues) => + { + var metadata = (IDictionary)carrier!; + fieldValues = null; + metadata.TryGetValue(fieldName, out fieldValue); + }); + + return baggage as IDictionary ?? new Dictionary(); + } +} diff --git a/dotnet/src/Microsoft.AutoGen/Agents/AgentWorker.cs b/dotnet/src/Microsoft.AutoGen/Agents/AgentWorker.cs deleted file mode 100644 index a820656090..0000000000 --- a/dotnet/src/Microsoft.AutoGen/Agents/AgentWorker.cs +++ /dev/null @@ -1,21 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// AgentWorker.cs - -using System.Diagnostics; -using Google.Protobuf; -using Microsoft.AutoGen.Abstractions; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Logging; - -namespace Microsoft.AutoGen.Agents; -public sealed class AgentWorker(IAgentWorkerRuntime runtime, DistributedContextPropagator distributedContextPropagator, - [FromKeyedServices("EventTypes")] EventTypes eventTypes, ILogger logger) - : AgentBase(new AgentContext(new AgentId("client", Guid.NewGuid().ToString()), runtime, logger, distributedContextPropagator), eventTypes) -{ - public async ValueTask PublishEventAsync(CloudEvent evt) => await base.PublishEventAsync(evt); - - public async ValueTask PublishEventAsync(string topic, IMessage evt) - { - await PublishEventAsync(evt.ToCloudEvent(topic)).ConfigureAwait(false); - } -} diff --git a/dotnet/src/Microsoft.AutoGen/Agents/Agents/AIAgent/InferenceAgent.cs b/dotnet/src/Microsoft.AutoGen/Agents/Agents/AIAgent/InferenceAgent.cs index 1568d97696..a0383a3c21 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/Agents/AIAgent/InferenceAgent.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Agents/AIAgent/InferenceAgent.cs @@ -4,12 +4,12 @@ using Google.Protobuf; using Microsoft.AutoGen.Abstractions; using Microsoft.Extensions.AI; -namespace Microsoft.AutoGen.Agents.Client; +namespace Microsoft.AutoGen.Agents; public abstract class InferenceAgent : AgentBase where T : IMessage, new() { protected IChatClient ChatClient { get; } public InferenceAgent( - IAgentContext context, + IAgentRuntime context, EventTypes typeRegistry, IChatClient client ) : base(context, typeRegistry) { diff --git a/dotnet/src/Microsoft.AutoGen/Agents/Agents/AIAgent/SKAiAgent.cs b/dotnet/src/Microsoft.AutoGen/Agents/Agents/AIAgent/SKAiAgent.cs index ceeeadacc5..db0d8241fe 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/Agents/AIAgent/SKAiAgent.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Agents/AIAgent/SKAiAgent.cs @@ -15,7 +15,7 @@ public abstract class SKAiAgent : AgentBase where T : class, new() protected Kernel _kernel; private readonly ISemanticTextMemory _memory; - public SKAiAgent(IAgentContext context, ISemanticTextMemory memory, Kernel kernel, EventTypes typeRegistry) : base(context, typeRegistry) + public SKAiAgent(IAgentRuntime context, ISemanticTextMemory memory, Kernel kernel, EventTypes typeRegistry) : base(context, typeRegistry) { _state = new(); _memory = memory; diff --git a/dotnet/src/Microsoft.AutoGen/Agents/Agents/IOAgent/ConsoleAgent/ConsoleAgent.cs b/dotnet/src/Microsoft.AutoGen/Agents/Agents/IOAgent/ConsoleAgent/ConsoleAgent.cs index d7c32c2479..3d7de5c12d 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/Agents/IOAgent/ConsoleAgent/ConsoleAgent.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Agents/IOAgent/ConsoleAgent/ConsoleAgent.cs @@ -13,7 +13,7 @@ public abstract class ConsoleAgent : IOAgent, { // instead of the primary constructor above, make a constructr here that still calls the base constructor - public ConsoleAgent(IAgentContext context, [FromKeyedServices("EventTypes")] EventTypes typeRegistry) : base(context, typeRegistry) + public ConsoleAgent(IAgentRuntime context, [FromKeyedServices("EventTypes")] EventTypes typeRegistry) : base(context, typeRegistry) { _route = "console"; } diff --git a/dotnet/src/Microsoft.AutoGen/Agents/Agents/IOAgent/FileAgent/FileAgent.cs b/dotnet/src/Microsoft.AutoGen/Agents/Agents/IOAgent/FileAgent/FileAgent.cs index 217c2e56e7..65aecee148 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/Agents/IOAgent/FileAgent/FileAgent.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Agents/IOAgent/FileAgent/FileAgent.cs @@ -9,7 +9,7 @@ namespace Microsoft.AutoGen.Agents; [TopicSubscription("FileIO")] public abstract class FileAgent( - IAgentContext context, + IAgentRuntime context, [FromKeyedServices("EventTypes")] EventTypes typeRegistry, string inputPath = "input.txt", string outputPath = "output.txt" @@ -24,7 +24,7 @@ public abstract class FileAgent( if (!File.Exists(inputPath)) { var errorMessage = $"File not found: {inputPath}"; - Logger.LogError(errorMessage); + _logger.LogError(errorMessage); //publish IOError event var err = new IOError { @@ -42,6 +42,7 @@ public abstract class FileAgent( var evt = new InputProcessed { Route = _route + }; await PublishMessageAsync(evt); } diff --git a/dotnet/src/Microsoft.AutoGen/Agents/Agents/IOAgent/IOAgent.cs b/dotnet/src/Microsoft.AutoGen/Agents/Agents/IOAgent/IOAgent.cs index 6d470b345a..fdd6852405 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/Agents/IOAgent/IOAgent.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Agents/IOAgent/IOAgent.cs @@ -8,7 +8,7 @@ namespace Microsoft.AutoGen.Agents; public abstract class IOAgent : AgentBase { public string _route = "base"; - protected IOAgent(IAgentContext context, EventTypes eventTypes) : base(context, eventTypes) + protected IOAgent(IAgentRuntime context, EventTypes eventTypes) : base(context, eventTypes) { } public virtual async Task Handle(Input item) diff --git a/dotnet/src/Microsoft.AutoGen/Agents/Agents/IOAgent/WebAPIAgent/WebAPIAgent.cs b/dotnet/src/Microsoft.AutoGen/Agents/Agents/IOAgent/WebAPIAgent/WebAPIAgent.cs index 69b8d177be..3a594a3bf7 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/Agents/IOAgent/WebAPIAgent/WebAPIAgent.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Agents/IOAgent/WebAPIAgent/WebAPIAgent.cs @@ -17,7 +17,7 @@ public abstract class WebAPIAgent : IOAgent, private readonly string _url = "/agents/webio"; public WebAPIAgent( - IAgentContext context, + IAgentRuntime context, [FromKeyedServices("EventTypes")] EventTypes typeRegistry, ILogger logger, string url = "/agents/webio") : base( diff --git a/dotnet/src/Microsoft.AutoGen/Agents/App.cs b/dotnet/src/Microsoft.AutoGen/Agents/App.cs index 68c57e8d6d..fc36d33677 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/App.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/App.cs @@ -1,10 +1,8 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // App.cs - using System.Diagnostics.CodeAnalysis; using Google.Protobuf; using Microsoft.AspNetCore.Builder; -using Microsoft.AutoGen.Runtime; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; @@ -21,15 +19,15 @@ public static class AgentsApp if (local) { // start the server runtime - builder.AddLocalAgentService(); + builder.AddLocalAgentService(useGrpc: false); } - builder.AddAgentWorker() + builder.AddAgentWorker(local: local) .AddAgents(agentTypes); builder.AddServiceDefaults(); var app = builder.Build(); if (local) { - app.MapAgentService(); + app.MapAgentService(local: true, useGrpc: false); } app.MapDefaultEndpoints(); Host = app; @@ -47,8 +45,8 @@ public static class AgentsApp { await StartAsync(builder, agents, local); } - var client = Host.Services.GetRequiredService() ?? throw new InvalidOperationException("Host not started"); - await client.PublishEventAsync(topic, message).ConfigureAwait(false); + var client = Host.Services.GetRequiredService() ?? throw new InvalidOperationException("Host not started"); + await client.PublishEventAsync(topic, message, new CancellationToken()).ConfigureAwait(true); return Host; } public static async ValueTask ShutdownAsync() diff --git a/dotnet/src/Microsoft.AutoGen/Agents/Client.cs b/dotnet/src/Microsoft.AutoGen/Agents/Client.cs new file mode 100644 index 0000000000..313685d793 --- /dev/null +++ b/dotnet/src/Microsoft.AutoGen/Agents/Client.cs @@ -0,0 +1,14 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Client.cs + +using System.Diagnostics; +using Microsoft.AutoGen.Abstractions; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; + +namespace Microsoft.AutoGen.Agents; +public sealed class Client(IAgentWorker runtime, DistributedContextPropagator distributedContextPropagator, + [FromKeyedServices("EventTypes")] EventTypes eventTypes, ILogger logger) + : AgentBase(new AgentRuntime(new AgentId("client", Guid.NewGuid().ToString()), runtime, logger, distributedContextPropagator), eventTypes) +{ +} diff --git a/dotnet/src/Microsoft.AutoGen/Agents/Microsoft.AutoGen.Agents.csproj b/dotnet/src/Microsoft.AutoGen/Agents/Microsoft.AutoGen.Agents.csproj index 8e2c457766..3bc2b3acb0 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/Microsoft.AutoGen.Agents.csproj +++ b/dotnet/src/Microsoft.AutoGen/Agents/Microsoft.AutoGen.Agents.csproj @@ -4,22 +4,44 @@ net8.0 enable enable - AutoGen.Worker.Client + Microsoft.AutoGen.Agents https://github.com/microsoft/autogen Microsoft - AutoGen Worker Client Library + Micrososft AutoGen Agents SDK ai-agents;event-driven-agents - + + + runtime; build; native; contentfiles; analyzers; buildtransitive + all + + + + + + + + + + + + + + + + + true + true + diff --git a/dotnet/src/Microsoft.AutoGen/Agents/Services/AgentWorker.cs b/dotnet/src/Microsoft.AutoGen/Agents/Services/AgentWorker.cs new file mode 100644 index 0000000000..4900514903 --- /dev/null +++ b/dotnet/src/Microsoft.AutoGen/Agents/Services/AgentWorker.cs @@ -0,0 +1,181 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// AgentWorker.cs + +using System.Collections.Concurrent; +using System.Diagnostics; +using System.Threading.Channels; +using Microsoft.AutoGen.Abstractions; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; + +namespace Microsoft.AutoGen.Agents; + +public class AgentWorker : + IHostedService, + IAgentWorker +{ + private readonly ConcurrentDictionary _agentTypes = new(); + private readonly ConcurrentDictionary<(string Type, string Key), IAgentBase> _agents = new(); + private readonly ILogger _logger; + private readonly Channel _mailbox = Channel.CreateUnbounded(); + private readonly ConcurrentDictionary _agentStates = new(); + private readonly ConcurrentDictionary _pendingClientRequests = new(); + private readonly CancellationTokenSource _shutdownCts; + private readonly IServiceProvider _serviceProvider; + private readonly IEnumerable> _configuredAgentTypes; + private readonly DistributedContextPropagator _distributedContextPropagator; + private readonly CancellationTokenSource _shutdownCancellationToken = new(); + private Task? _mailboxTask; + private readonly object _channelLock = new(); + + public AgentWorker( + IHostApplicationLifetime hostApplicationLifetime, + IServiceProvider serviceProvider, + [FromKeyedServices("AgentTypes")] IEnumerable> configuredAgentTypes, + ILogger logger, + DistributedContextPropagator distributedContextPropagator) + { + _logger = logger; + _serviceProvider = serviceProvider; + _configuredAgentTypes = configuredAgentTypes; + _distributedContextPropagator = distributedContextPropagator; + _shutdownCts = CancellationTokenSource.CreateLinkedTokenSource(hostApplicationLifetime.ApplicationStopping); + } + // this is the in-memory version - we just pass the message directly to the agent(s) that handle this type of event + public async ValueTask PublishEventAsync(CloudEvent cloudEvent, CancellationToken cancellationToken = default) + { + foreach (var (typeName, _) in _agentTypes) + { + if (typeName == "Client") { continue; } + var agent = GetOrActivateAgent(new AgentId(typeName, cloudEvent.Source)); + agent.ReceiveMessage(new Message { CloudEvent = cloudEvent }); + } + } + public async ValueTask SendRequestAsync(IAgentBase agent, RpcRequest request, CancellationToken cancellationToken = default) + { + var requestId = Guid.NewGuid().ToString(); + _pendingClientRequests[requestId] = (agent, request.RequestId); + request.RequestId = requestId; + await _mailbox.Writer.WriteAsync(request, cancellationToken).ConfigureAwait(false); + } + public ValueTask SendResponseAsync(RpcResponse response, CancellationToken cancellationToken = default) + { + return _mailbox.Writer.WriteAsync(new Message { Response = response }, cancellationToken); + } + public ValueTask StoreAsync(AgentState value, CancellationToken cancellationToken = default) + { + var agentId = value.AgentId ?? throw new InvalidOperationException("AgentId is required when saving AgentState."); + // add or update _agentStates with the new state + var response = _agentStates.AddOrUpdate(agentId.ToString(), value, (key, oldValue) => value); + return ValueTask.CompletedTask; + } + public ValueTask ReadAsync(AgentId agentId, CancellationToken cancellationToken = default) + { + _agentStates.TryGetValue(agentId.ToString(), out var state); + if (state is not null && state.AgentId is not null) + { + return new ValueTask(state); + } + else + { + throw new KeyNotFoundException($"Failed to read AgentState for {agentId}."); + } + } + public async Task RunMessagePump() + { + await Task.CompletedTask.ConfigureAwait(ConfigureAwaitOptions.ForceYielding); + await foreach (var message in _mailbox.Reader.ReadAllAsync()) + { + try + { + if (message == null) { continue; } + switch (message) + { + case Message msg: + + var item = msg.CloudEvent; + + foreach (var (typeName, _) in _agentTypes) + { + var agentToInvoke = GetOrActivateAgent(new AgentId(typeName, item.Source)); + agentToInvoke.ReceiveMessage(msg); + } + break; + default: + throw new InvalidOperationException($"Unexpected message '{message}'."); + } + } + catch (OperationCanceledException) + { + } + finally + { + _shutdownCancellationToken.Cancel(); + } + } + } + + public async Task StartAsync(CancellationToken cancellationToken) + { + StartCore(); + + foreach (var (typeName, type) in _configuredAgentTypes) + { + _agentTypes.TryAdd(typeName, type); + } + void StartCore() + { + var didSuppress = false; + if (!ExecutionContext.IsFlowSuppressed()) + { + didSuppress = true; + ExecutionContext.SuppressFlow(); + } + + try + { + _mailboxTask = Task.Run(RunMessagePump, CancellationToken.None); + } + finally + { + if (didSuppress) + { + ExecutionContext.RestoreFlow(); + } + } + } + } + public async Task StopAsync(CancellationToken cancellationToken) + { + _shutdownCts.Cancel(); + + _mailbox.Writer.TryComplete(); + + if (_mailboxTask is { } readTask) + { + await readTask.ConfigureAwait(false); + } + lock (_channelLock) + { + } + } + private IAgentBase GetOrActivateAgent(AgentId agentId) + { + if (!_agents.TryGetValue((agentId.Type, agentId.Key), out var agent)) + { + if (_agentTypes.TryGetValue(agentId.Type, out var agentType)) + { + var context = new AgentRuntime(agentId, this, _serviceProvider.GetRequiredService>(), _distributedContextPropagator); + agent = (AgentBase)ActivatorUtilities.CreateInstance(_serviceProvider, agentType, context); + _agents.TryAdd((agentId.Type, agentId.Key), agent); + } + else + { + throw new InvalidOperationException($"Agent type '{agentId.Type}' is unknown."); + } + } + + return agent; + } +} diff --git a/dotnet/src/Microsoft.AutoGen/Runtime/AgentWorkerHostingExtensions.cs b/dotnet/src/Microsoft.AutoGen/Agents/Services/AgentWorkerHostingExtensions.cs similarity index 64% rename from dotnet/src/Microsoft.AutoGen/Runtime/AgentWorkerHostingExtensions.cs rename to dotnet/src/Microsoft.AutoGen/Agents/Services/AgentWorkerHostingExtensions.cs index db582e051a..8215f20327 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime/AgentWorkerHostingExtensions.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Services/AgentWorkerHostingExtensions.cs @@ -9,11 +9,11 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; using Microsoft.Extensions.Hosting; -namespace Microsoft.AutoGen.Runtime; +namespace Microsoft.AutoGen.Agents; public static class AgentWorkerHostingExtensions { - public static WebApplicationBuilder AddAgentService(this WebApplicationBuilder builder, bool local = false) + public static WebApplicationBuilder AddAgentService(this WebApplicationBuilder builder, bool local = false, bool useGrpc = true) { if (local) { @@ -26,24 +26,32 @@ public static class AgentWorkerHostingExtensions listenOptions.UseHttps(); }); }); + builder.AddOrleans(local); } - builder.Services.AddGrpc(); - builder.AddOrleans(local); + else + { + builder.AddOrleans(); + } + builder.Services.TryAddSingleton(DistributedContextPropagator.Current); - builder.Services.AddSingleton(); - builder.Services.AddSingleton(sp => sp.GetRequiredService()); + + if (useGrpc) + { + builder.Services.AddGrpc(); + builder.Services.AddSingleton(); + builder.Services.AddSingleton(sp => (IHostedService)sp.GetRequiredService()); + } return builder; } - public static WebApplicationBuilder AddLocalAgentService(this WebApplicationBuilder builder) + public static WebApplicationBuilder AddLocalAgentService(this WebApplicationBuilder builder, bool useGrpc = true) { - builder.AddAgentService(local: true); - return builder; + return builder.AddAgentService(local: true, useGrpc); } - public static WebApplication MapAgentService(this WebApplication app) + public static WebApplication MapAgentService(this WebApplication app, bool local = false, bool useGrpc = true) { - app.MapGrpcService(); + if (useGrpc) { app.MapGrpcService(); } return app; } } diff --git a/dotnet/src/Microsoft.AutoGen/Agents/GrpcAgentWorkerRuntime.cs b/dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcAgentWorker.cs similarity index 76% rename from dotnet/src/Microsoft.AutoGen/Agents/GrpcAgentWorkerRuntime.cs rename to dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcAgentWorker.cs index b0550c1fb7..431a5629c1 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/GrpcAgentWorkerRuntime.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcAgentWorker.cs @@ -1,5 +1,5 @@ // Copyright (c) Microsoft Corporation. All rights reserved. -// GrpcAgentWorkerRuntime.cs +// GrpcAgentWorker.cs using System.Collections.Concurrent; using System.Diagnostics; @@ -13,7 +13,15 @@ using Microsoft.Extensions.Logging; namespace Microsoft.AutoGen.Agents; -public sealed class GrpcAgentWorkerRuntime : IHostedService, IDisposable, IAgentWorkerRuntime +public sealed class GrpcAgentWorker( + AgentRpc.AgentRpcClient client, + IHostApplicationLifetime hostApplicationLifetime, + IServiceProvider serviceProvider, + [FromKeyedServices("AgentTypes")] IEnumerable> configuredAgentTypes, + ILogger logger, + DistributedContextPropagator distributedContextPropagator) : + AgentWorker(hostApplicationLifetime, + serviceProvider, configuredAgentTypes, logger, distributedContextPropagator), IHostedService, IDisposable, IAgentWorker { private readonly object _channelLock = new(); private readonly ConcurrentDictionary _agentTypes = new(); @@ -26,38 +34,21 @@ public sealed class GrpcAgentWorkerRuntime : IHostedService, IDisposable, IAgent SingleWriter = false, FullMode = BoundedChannelFullMode.Wait }); - private readonly AgentRpc.AgentRpcClient _client; - private readonly IServiceProvider _serviceProvider; - private readonly IEnumerable> _configuredAgentTypes; - private readonly ILogger _logger; - private readonly DistributedContextPropagator _distributedContextPropagator; - private readonly CancellationTokenSource _shutdownCts; + private readonly AgentRpc.AgentRpcClient _client = client; + private readonly IServiceProvider _serviceProvider = serviceProvider; + private readonly IEnumerable> _configuredAgentTypes = configuredAgentTypes; + private readonly ILogger _logger = logger; + private readonly DistributedContextPropagator _distributedContextPropagator = distributedContextPropagator; + private readonly CancellationTokenSource _shutdownCts = CancellationTokenSource.CreateLinkedTokenSource(hostApplicationLifetime.ApplicationStopping); private AsyncDuplexStreamingCall? _channel; private Task? _readTask; private Task? _writeTask; - public GrpcAgentWorkerRuntime( - AgentRpc.AgentRpcClient client, - IHostApplicationLifetime hostApplicationLifetime, - IServiceProvider serviceProvider, - [FromKeyedServices("AgentTypes")] IEnumerable> configuredAgentTypes, - ILogger logger, - DistributedContextPropagator distributedContextPropagator) - { - _client = client; - _serviceProvider = serviceProvider; - _configuredAgentTypes = configuredAgentTypes; - _logger = logger; - _distributedContextPropagator = distributedContextPropagator; - _shutdownCts = CancellationTokenSource.CreateLinkedTokenSource(hostApplicationLifetime.ApplicationStopping); - } - public void Dispose() { _outboundMessagesChannel.Writer.TryComplete(); _channel?.Dispose(); } - private async Task RunReadPump() { var channel = GetChannel(); @@ -131,7 +122,6 @@ public sealed class GrpcAgentWorkerRuntime : IHostedService, IDisposable, IAgent } } } - private async Task RunWritePump() { var channel = GetChannel(); @@ -183,14 +173,13 @@ public sealed class GrpcAgentWorkerRuntime : IHostedService, IDisposable, IAgent item.WriteCompletionSource.TrySetCanceled(); } } - private IAgentBase GetOrActivateAgent(AgentId agentId) { if (!_agents.TryGetValue((agentId.Type, agentId.Key), out var agent)) { if (_agentTypes.TryGetValue(agentId.Type, out var agentType)) { - var context = new AgentContext(agentId, this, _serviceProvider.GetRequiredService>(), _distributedContextPropagator); + var context = new AgentRuntime(agentId, this, _serviceProvider.GetRequiredService>(), _distributedContextPropagator); agent = (AgentBase)ActivatorUtilities.CreateInstance(_serviceProvider, agentType, context); _agents.TryAdd((agentId.Type, agentId.Key), agent); } @@ -203,7 +192,7 @@ public sealed class GrpcAgentWorkerRuntime : IHostedService, IDisposable, IAgent return agent; } - private async ValueTask RegisterAgentType(string type, Type agentType) + private async ValueTask RegisterAgentTypeAsync(string type, Type agentType, CancellationToken cancellationToken = default) { if (_agentTypes.TryAdd(type, agentType)) { @@ -223,55 +212,33 @@ public sealed class GrpcAgentWorkerRuntime : IHostedService, IDisposable, IAgent //StateType = state?.Name, //Events = { events } } - }, - _shutdownCts.Token).ConfigureAwait(false); + }, cancellationToken).ConfigureAwait(false); } } - public async ValueTask SendResponse(RpcResponse response, CancellationToken cancellationToken) + // new is intentional + public new async ValueTask SendResponseAsync(RpcResponse response, CancellationToken cancellationToken = default) { - _logger.LogInformation("Sending response '{Response}'.", response); await WriteChannelAsync(new Message { Response = response }, cancellationToken).ConfigureAwait(false); } - - public async ValueTask SendRequest(IAgentBase agent, RpcRequest request, CancellationToken cancellationToken) + // new is intentional + public new async ValueTask SendRequestAsync(IAgentBase agent, RpcRequest request, CancellationToken cancellationToken = default) { - _logger.LogInformation("[{AgentId}] Sending request '{Request}'.", agent.AgentId, request); var requestId = Guid.NewGuid().ToString(); _pendingRequests[requestId] = (agent, request.RequestId); request.RequestId = requestId; - try - { - await WriteChannelAsync(new Message { Request = request }, cancellationToken).ConfigureAwait(false); - } - catch (Exception exception) - { - if (_pendingRequests.TryRemove(requestId, out _)) - { - agent.ReceiveMessage(new Message { Response = new RpcResponse { RequestId = request.RequestId, Error = exception.Message } }); - } - } + await WriteChannelAsync(new Message { Request = request }, cancellationToken).ConfigureAwait(false); } - - public async ValueTask PublishEvent(CloudEvent @event, CancellationToken cancellationToken) + // new is intentional + public new async ValueTask PublishEventAsync(CloudEvent @event, CancellationToken cancellationToken = default) { - try - { - await WriteChannelAsync(new Message { CloudEvent = @event }, cancellationToken).ConfigureAwait(false); - } - catch (Exception exception) - { - _logger.LogWarning(exception, "Failed to publish event '{Event}'.", @event); - } + await WriteChannelAsync(new Message { CloudEvent = @event }, cancellationToken).ConfigureAwait(false); } - - private async Task WriteChannelAsync(Message message, CancellationToken cancellationToken) + private async Task WriteChannelAsync(Message message, CancellationToken cancellationToken = default) { - var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var tcs = new TaskCompletionSource(); await _outboundMessagesChannel.Writer.WriteAsync((message, tcs), cancellationToken).ConfigureAwait(false); - await tcs.Task.WaitAsync(cancellationToken); } - private AsyncDuplexStreamingCall GetChannel() { if (_channel is { } channel) @@ -307,7 +274,7 @@ public sealed class GrpcAgentWorkerRuntime : IHostedService, IDisposable, IAgent return _channel; } - public async Task StartAsync(CancellationToken cancellationToken) + public new async Task StartAsync(CancellationToken cancellationToken) { _channel = GetChannel(); StartCore(); @@ -315,10 +282,10 @@ public sealed class GrpcAgentWorkerRuntime : IHostedService, IDisposable, IAgent var tasks = new List(_agentTypes.Count); foreach (var (typeName, type) in _configuredAgentTypes) { - tasks.Add(RegisterAgentType(typeName, type).AsTask()); + tasks.Add(RegisterAgentTypeAsync(typeName, type, cancellationToken).AsTask()); } - await Task.WhenAll(tasks).ConfigureAwait(false); + await Task.WhenAll(tasks).ConfigureAwait(true); void StartCore() { @@ -344,7 +311,7 @@ public sealed class GrpcAgentWorkerRuntime : IHostedService, IDisposable, IAgent } } - public async Task StopAsync(CancellationToken cancellationToken) + public new async Task StopAsync(CancellationToken cancellationToken) { _shutdownCts.Cancel(); @@ -364,19 +331,20 @@ public sealed class GrpcAgentWorkerRuntime : IHostedService, IDisposable, IAgent _channel?.Dispose(); } } - public ValueTask Store(AgentState value, CancellationToken cancellationToken) + // new intentional + public new async ValueTask StoreAsync(AgentState value, CancellationToken cancellationToken = default) { var agentId = value.AgentId ?? throw new InvalidOperationException("AgentId is required when saving AgentState."); - var response = _client.SaveState(value, cancellationToken: cancellationToken); + var response = _client.SaveState(value, null, null, cancellationToken); if (!response.Success) { throw new InvalidOperationException($"Error saving AgentState for AgentId {agentId}."); } - return ValueTask.CompletedTask; } - public async ValueTask Read(AgentId agentId, CancellationToken cancellationToken) + // new intentional + public new async ValueTask ReadAsync(AgentId agentId, CancellationToken cancellationToken = default) { - var response = await _client.GetStateAsync(agentId, cancellationToken: cancellationToken); + var response = await _client.GetStateAsync(agentId).ConfigureAwait(true); // if (response.Success && response.AgentState.AgentId is not null) - why is success always false? if (response.AgentState.AgentId is not null) { diff --git a/dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcAgentWorkerHostBuilderExtension.cs b/dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcAgentWorkerHostBuilderExtension.cs new file mode 100644 index 0000000000..670411b336 --- /dev/null +++ b/dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcAgentWorkerHostBuilderExtension.cs @@ -0,0 +1,51 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// GrpcAgentWorkerHostBuilderExtension.cs + +using Grpc.Core; +using Grpc.Net.Client.Configuration; +using Microsoft.AutoGen.Abstractions; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; + +namespace Microsoft.AutoGen.Agents; + +public static class GrpcAgentWorkerHostBuilderExtensions +{ + private const string _defaultAgentServiceAddress = "https://localhost:5001"; + public static IHostApplicationBuilder AddGrpcAgentWorker(this IHostApplicationBuilder builder, string agentServiceAddress = _defaultAgentServiceAddress) + { + builder.Services.AddGrpcClient(options => + { + options.Address = new Uri(agentServiceAddress); + options.ChannelOptionsActions.Add(channelOptions => + { + + channelOptions.HttpHandler = new SocketsHttpHandler + { + EnableMultipleHttp2Connections = true, + KeepAlivePingDelay = TimeSpan.FromSeconds(20), + KeepAlivePingTimeout = TimeSpan.FromSeconds(10), + KeepAlivePingPolicy = HttpKeepAlivePingPolicy.WithActiveRequests + }; + + var methodConfig = new MethodConfig + { + Names = { MethodName.Default }, + RetryPolicy = new RetryPolicy + { + MaxAttempts = 5, + InitialBackoff = TimeSpan.FromSeconds(1), + MaxBackoff = TimeSpan.FromSeconds(5), + BackoffMultiplier = 1.5, + RetryableStatusCodes = { StatusCode.Unavailable } + } + }; + + channelOptions.ServiceConfig = new() { MethodConfigs = { methodConfig } }; + channelOptions.ThrowOperationCanceledOnCancellation = true; + }); + }); + builder.Services.AddSingleton(); + return builder; + } +} diff --git a/dotnet/src/Microsoft.AutoGen/Runtime/WorkerGateway.cs b/dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcGateway.cs similarity index 56% rename from dotnet/src/Microsoft.AutoGen/Runtime/WorkerGateway.cs rename to dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcGateway.cs index cca25d36a8..89e9c55c46 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime/WorkerGateway.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcGateway.cs @@ -1,5 +1,5 @@ // Copyright (c) Microsoft Corporation. All rights reserved. -// WorkerGateway.cs +// GrpcGateway.cs using System.Collections.Concurrent; using Grpc.Core; @@ -7,91 +7,60 @@ using Microsoft.AutoGen.Abstractions; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; -namespace Microsoft.AutoGen.Runtime; +namespace Microsoft.AutoGen.Agents; -internal sealed class WorkerGateway : BackgroundService, IWorkerGateway +public sealed class GrpcGateway : BackgroundService, IGateway { private static readonly TimeSpan s_agentResponseTimeout = TimeSpan.FromSeconds(30); - - private readonly ILogger _logger; + private readonly ILogger _logger; private readonly IClusterClient _clusterClient; - private readonly IAgentWorkerRegistryGrain _gatewayRegistry; - private readonly IWorkerGateway _reference; - - // The local mapping of agents to worker processes. - private readonly ConcurrentDictionary _workers = new(); - + private readonly ConcurrentDictionary _agentState = new(); + private readonly IRegistryGrain _gatewayRegistry; + private readonly IGateway _reference; // The agents supported by each worker process. - private readonly ConcurrentDictionary> _supportedAgentTypes = []; + private readonly ConcurrentDictionary> _supportedAgentTypes = []; + public readonly ConcurrentDictionary _workers = new(); // The mapping from agent id to worker process. - private readonly ConcurrentDictionary<(string Type, string Key), WorkerProcessConnection> _agentDirectory = new(); - + private readonly ConcurrentDictionary<(string Type, string Key), GrpcWorkerConnection> _agentDirectory = new(); // RPC - private readonly ConcurrentDictionary<(WorkerProcessConnection, string), TaskCompletionSource> _pendingRequests = new(); + private readonly ConcurrentDictionary<(GrpcWorkerConnection, string), TaskCompletionSource> _pendingRequests = new(); + // InMemory Message Queue - public WorkerGateway(IClusterClient clusterClient, ILogger logger) + public GrpcGateway(IClusterClient clusterClient, ILogger logger) { _logger = logger; _clusterClient = clusterClient; - _reference = clusterClient.CreateObjectReference(this); - _gatewayRegistry = clusterClient.GetGrain(0); + _reference = clusterClient.CreateObjectReference(this); + _gatewayRegistry = clusterClient.GetGrain(0); } - public async ValueTask BroadcastEvent(CloudEvent evt) { // TODO: filter the workers that receive the event var tasks = new List(_workers.Count); - foreach (var (_, connection) in _workers) + foreach (var (_, connection) in _supportedAgentTypes) { - tasks.Add(connection.SendMessage(new Message { CloudEvent = evt })); + + tasks.Add(this.SendMessageAsync((IConnection)connection[0], evt, default)); } - - await Task.WhenAll(tasks); + await Task.WhenAll(tasks).ConfigureAwait(false); } - - public async ValueTask InvokeRequest(RpcRequest request) + //intetionally not static so can be called by some methods implemented in base class + public async Task SendMessageAsync(IConnection connection, CloudEvent cloudEvent, CancellationToken cancellationToken = default) { - (string Type, string Key) agentId = (request.Target.Type, request.Target.Key); - if (!_agentDirectory.TryGetValue(agentId, out var connection) || connection.Completion.IsCompleted) - { - // Activate the agent on a compatible worker process. - if (_supportedAgentTypes.TryGetValue(request.Target.Type, out var workers)) - { - connection = workers[Random.Shared.Next(workers.Count)]; - _agentDirectory[agentId] = connection; - } - else - { - return new(new RpcResponse { Error = "Agent not found." }); - } - } - - // Proxy the request to the agent. - var originalRequestId = request.RequestId; - var newRequestId = Guid.NewGuid().ToString(); - var completion = _pendingRequests[(connection, newRequestId)] = new(TaskCreationOptions.RunContinuationsAsynchronously); - request.RequestId = newRequestId; - await connection.ResponseStream.WriteAsync(new Message { Request = request }); - - // Wait for the response and send it back to the caller. - var response = await completion.Task.WaitAsync(s_agentResponseTimeout); - response.RequestId = originalRequestId; - return response; + var queue = (GrpcWorkerConnection)connection; + await queue.ResponseStream.WriteAsync(new Message { CloudEvent = cloudEvent }, cancellationToken).ConfigureAwait(false); } - - private void DispatchResponse(WorkerProcessConnection connection, RpcResponse response) + private void DispatchResponse(GrpcWorkerConnection connection, RpcResponse response) { if (!_pendingRequests.TryRemove((connection, response.RequestId), out var completion)) { _logger.LogWarning("Received response for unknown request."); return; } - // Complete the request. completion.SetResult(response); } - protected override async Task ExecuteAsync(CancellationToken stoppingToken) { while (!stoppingToken.IsCancellationRequested) @@ -104,10 +73,8 @@ internal sealed class WorkerGateway : BackgroundService, IWorkerGateway { _logger.LogWarning(exception, "Error adding worker to registry."); } - await Task.Delay(TimeSpan.FromSeconds(15), stoppingToken); } - try { await _gatewayRegistry.RemoveWorker(_reference); @@ -117,8 +84,8 @@ internal sealed class WorkerGateway : BackgroundService, IWorkerGateway _logger.LogWarning(exception, "Error removing worker from registry."); } } - - internal async Task OnReceivedMessageAsync(WorkerProcessConnection connection, Message message) + //new is intentional... + internal async Task OnReceivedMessageAsync(GrpcWorkerConnection connection, Message message) { _logger.LogInformation("Received message {Message} from connection {Connection}.", message, connection); switch (message.MessageCase) @@ -139,63 +106,28 @@ internal sealed class WorkerGateway : BackgroundService, IWorkerGateway throw new InvalidOperationException($"Unknown message type for message '{message}'."); }; } - - private async ValueTask RegisterAgentTypeAsync(WorkerProcessConnection connection, RegisterAgentTypeRequest msg) + private async ValueTask RegisterAgentTypeAsync(GrpcWorkerConnection connection, RegisterAgentTypeRequest msg) { connection.AddSupportedType(msg.Type); _supportedAgentTypes.GetOrAdd(msg.Type, _ => []).Add(connection); - var success = false; - var error = String.Empty; - try - { - await _gatewayRegistry.RegisterAgentType(msg.Type, _reference); - success = true; - } - catch (InvalidOperationException exception) - { - error = $"Error registering agent type '{msg.Type}'."; - _logger.LogWarning(exception, error); - } - var request_id = msg.RequestId; - var response = new RegisterAgentTypeResponse { RequestId = request_id, Success = success, Error = error }; - await connection.SendMessage(new Message { RegisterAgentTypeResponse = response }); + await _gatewayRegistry.RegisterAgentType(msg.Type, _reference); } - private async ValueTask DispatchEventAsync(CloudEvent evt) { - await BroadcastEvent(evt); + await BroadcastEvent(evt).ConfigureAwait(false); /* var topic = _clusterClient.GetStreamProvider("agents").GetStream(StreamId.Create(evt.Namespace, evt.Type)); await topic.OnNextAsync(evt.ToEvent()); */ } - - private async ValueTask DispatchRequestAsync(WorkerProcessConnection connection, RpcRequest request) + private async ValueTask DispatchRequestAsync(GrpcWorkerConnection connection, RpcRequest request) { var requestId = request.RequestId; if (request.Target is null) { throw new InvalidOperationException($"Request message is missing a target. Message: '{request}'."); } - - /* - if (string.Equals("runtime", request.Target.Type, StringComparison.Ordinal)) - { - if (string.Equals("subscribe", request.Method)) - { - await InvokeRequestDelegate(connection, request, async (_) => - { - await SubscribeToTopic(connection, request); - return new RpcResponse { Result = "Ok" }; - }); - - return; - } - } - else - { - */ await InvokeRequestDelegate(connection, request, async request => { var (gateway, isPlacement) = await _gatewayRegistry.GetOrPlaceAgent(request.Target); @@ -203,84 +135,51 @@ internal sealed class WorkerGateway : BackgroundService, IWorkerGateway { return new RpcResponse { Error = "Agent not found and no compatible gateways were found." }; } - if (isPlacement) { - // Activate the worker: load state - // TODO + // TODO// Activate the worker: load state } - // Forward the message to the gateway and return the result. - return await gateway.InvokeRequest(request); + return await gateway.InvokeRequest(request).ConfigureAwait(true); }); //} } - private static async Task InvokeRequestDelegate(WorkerProcessConnection connection, RpcRequest request, Func> func) + private static async Task InvokeRequestDelegate(GrpcWorkerConnection connection, RpcRequest request, Func> func) { try { var response = await func(request); response.RequestId = request.RequestId; - await connection.ResponseStream.WriteAsync(new Message { Response = response }); + await connection.ResponseStream.WriteAsync(new Message { Response = response }).ConfigureAwait(false); } catch (Exception ex) { - await connection.ResponseStream.WriteAsync(new Message { Response = new RpcResponse { RequestId = request.RequestId, Error = ex.Message } }); + await connection.ResponseStream.WriteAsync(new Message { Response = new RpcResponse { RequestId = request.RequestId, Error = ex.Message } }).ConfigureAwait(false); } } - public async ValueTask Store(AgentState value) - { - var agentId = value.AgentId ?? throw new ArgumentNullException(nameof(value.AgentId)); - var agentState = _clusterClient.GetGrain($"{agentId.Type}:{agentId.Key}"); - await agentState.WriteStateAsync(value, value.ETag); - } - - public async ValueTask Read(AgentId agentId) - { - var agentState = _clusterClient.GetGrain($"{agentId.Type}:{agentId.Key}"); - return await agentState.ReadStateAsync(); - } - /* - private async ValueTask SubscribeToTopic(WorkerProcessConnection connection, RpcRequest request) - { - // Subscribe to a topic - var parameters = JsonSerializer.Deserialize>(request.Data) - ?? throw new ArgumentException($"Request data does not contain required payload format: {{\"namespace\": \"string\", \"type\": \"string\"}}."); - var ns = parameters["namespace"]; - var type = parameters["type"]; - var topic = _clusterClient.GetStreamProvider("agents").GetStream(StreamId.Create(ns: type, key: ns)); - await topic.SubscribeAsync(OnNextAsync); - return; - - async Task OnNextAsync(IList> items) - { - foreach (var item in items) - { - var evt = item.Item.ToRpcEvent(); - evt.Namespace = ns; - evt.Type = evt.Type; - var payload = new Dictionary - { - ["sequenceId"] = item.Token.SequenceNumber.ToString(), - ["eventIdx"] = item.Token.EventIndex.ToString() - }; - evt.Data = JsonSerializer.Serialize(payload); - await connection.ResponseStream.WriteAsync(new Message { Event = evt }); - } - } - } - */ - internal Task ConnectToWorkerProcess(IAsyncStreamReader requestStream, IServerStreamWriter responseStream, ServerCallContext context) { _logger.LogInformation("Received new connection from {Peer}.", context.Peer); - var workerProcess = new WorkerProcessConnection(this, requestStream, responseStream, context); + var workerProcess = new GrpcWorkerConnection(this, requestStream, responseStream, context); _workers[workerProcess] = workerProcess; return workerProcess.Completion; } + public async ValueTask StoreAsync(AgentState value) + { + var agentId = value.AgentId ?? throw new ArgumentNullException(nameof(value.AgentId)); + _agentState[agentId.Key] = value; + } - internal void OnRemoveWorkerProcess(WorkerProcessConnection workerProcess) + public async ValueTask ReadAsync(AgentId agentId) + { + if (_agentState.TryGetValue(agentId.Key, out var state)) + { + return state; + } + return new AgentState { AgentId = agentId }; + } + internal void OnRemoveWorkerProcess(GrpcWorkerConnection workerProcess) { _workers.TryRemove(workerProcess, out _); var types = workerProcess.GetSupportedTypes(); @@ -291,14 +190,50 @@ internal sealed class WorkerGateway : BackgroundService, IWorkerGateway supported.Remove(workerProcess); } } - // Any agents activated on that worker are also gone. foreach (var pair in _agentDirectory) { if (pair.Value == workerProcess) { - ((IDictionary<(string Type, string Key), WorkerProcessConnection>)_agentDirectory).Remove(pair); + ((IDictionary<(string Type, string Key), GrpcWorkerConnection>)_agentDirectory).Remove(pair); } } } + public async ValueTask InvokeRequest(RpcRequest request, CancellationToken cancellationToken = default) + { + (string Type, string Key) agentId = (request.Target.Type, request.Target.Key); + if (!_agentDirectory.TryGetValue(agentId, out var connection) || connection.Completion.IsCompleted) + { + // Activate the agent on a compatible worker process. + if (_supportedAgentTypes.TryGetValue(request.Target.Type, out var workers)) + { + connection = workers[Random.Shared.Next(workers.Count)]; + _agentDirectory[agentId] = connection; + } + else + { + return new(new RpcResponse { Error = "Agent not found." }); + } + } + // Proxy the request to the agent. + var originalRequestId = request.RequestId; + var newRequestId = Guid.NewGuid().ToString(); + var completion = _pendingRequests[(connection, newRequestId)] = new(TaskCreationOptions.RunContinuationsAsynchronously); + request.RequestId = newRequestId; + await connection.ResponseStream.WriteAsync(new Message { Request = request }, cancellationToken).ConfigureAwait(false); + // Wait for the response and send it back to the caller. + var response = await completion.Task.WaitAsync(s_agentResponseTimeout); + response.RequestId = originalRequestId; + return response; + } + + async ValueTask IGateway.InvokeRequest(RpcRequest request) + { + return await this.InvokeRequest(request).ConfigureAwait(false); + } + + Task IGateway.SendMessageAsync(IConnection connection, CloudEvent cloudEvent) + { + return this.SendMessageAsync(connection, cloudEvent); + } } diff --git a/dotnet/src/Microsoft.AutoGen/Runtime/WorkerGatewayService.cs b/dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcGatewayService.cs similarity index 67% rename from dotnet/src/Microsoft.AutoGen/Runtime/WorkerGatewayService.cs rename to dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcGatewayService.cs index b5c4527854..e26f5c2bc9 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime/WorkerGatewayService.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcGatewayService.cs @@ -1,19 +1,24 @@ // Copyright (c) Microsoft Corporation. All rights reserved. -// WorkerGatewayService.cs +// GrpcGatewayService.cs using Grpc.Core; using Microsoft.AutoGen.Abstractions; -namespace Microsoft.AutoGen.Runtime; +namespace Microsoft.AutoGen.Agents; // gRPC service which handles communication between the agent worker and the cluster. -internal sealed class WorkerGatewayService(WorkerGateway agentWorker) : AgentRpc.AgentRpcBase +internal sealed class GrpcGatewayService : AgentRpc.AgentRpcBase { + private readonly GrpcGateway Gateway; + public GrpcGatewayService(GrpcGateway gateway) + { + Gateway = (GrpcGateway)gateway; + } public override async Task OpenChannel(IAsyncStreamReader requestStream, IServerStreamWriter responseStream, ServerCallContext context) { try { - await agentWorker.ConnectToWorkerProcess(requestStream, responseStream, context).ConfigureAwait(true); + await Gateway.ConnectToWorkerProcess(requestStream, responseStream, context).ConfigureAwait(true); } catch { @@ -26,13 +31,13 @@ internal sealed class WorkerGatewayService(WorkerGateway agentWorker) : AgentRpc } public override async Task GetState(AgentId request, ServerCallContext context) { - var state = await agentWorker.Read(request); + var state = await Gateway.ReadAsync(request); return new GetStateResponse { AgentState = state }; } public override async Task SaveState(AgentState request, ServerCallContext context) { - await agentWorker.Store(request); + await Gateway.StoreAsync(request); return new SaveStateResponse { Success = true // TODO: Implement error handling diff --git a/dotnet/src/Microsoft.AutoGen/Runtime/WorkerProcessConnection.cs b/dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcWorkerConnection.cs similarity index 90% rename from dotnet/src/Microsoft.AutoGen/Runtime/WorkerProcessConnection.cs rename to dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcWorkerConnection.cs index d998ce2051..f2eb81c436 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime/WorkerProcessConnection.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcWorkerConnection.cs @@ -1,13 +1,13 @@ // Copyright (c) Microsoft Corporation. All rights reserved. -// WorkerProcessConnection.cs +// GrpcWorkerConnection.cs using System.Threading.Channels; using Grpc.Core; using Microsoft.AutoGen.Abstractions; -namespace Microsoft.AutoGen.Runtime; +namespace Microsoft.AutoGen.Agents; -internal sealed class WorkerProcessConnection : IAsyncDisposable +internal sealed class GrpcWorkerConnection : IAsyncDisposable, IConnection { private static long s_nextConnectionId; private readonly Task _readTask; @@ -15,10 +15,10 @@ internal sealed class WorkerProcessConnection : IAsyncDisposable private readonly string _connectionId = Interlocked.Increment(ref s_nextConnectionId).ToString(); private readonly object _lock = new(); private readonly HashSet _supportedTypes = []; - private readonly WorkerGateway _gateway; + private readonly GrpcGateway _gateway; private readonly CancellationTokenSource _shutdownCancellationToken = new(); - public WorkerProcessConnection(WorkerGateway agentWorker, IAsyncStreamReader requestStream, IServerStreamWriter responseStream, ServerCallContext context) + public GrpcWorkerConnection(GrpcGateway agentWorker, IAsyncStreamReader requestStream, IServerStreamWriter responseStream, ServerCallContext context) { _gateway = agentWorker; RequestStream = requestStream; diff --git a/dotnet/src/Microsoft.AutoGen/Runtime/Host.cs b/dotnet/src/Microsoft.AutoGen/Agents/Services/Host.cs similarity index 72% rename from dotnet/src/Microsoft.AutoGen/Runtime/Host.cs rename to dotnet/src/Microsoft.AutoGen/Agents/Services/Host.cs index 73b53dd414..5b725af0c9 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime/Host.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Services/Host.cs @@ -4,24 +4,24 @@ using Microsoft.AspNetCore.Builder; using Microsoft.Extensions.Hosting; -namespace Microsoft.AutoGen.Runtime; +namespace Microsoft.AutoGen.Agents; public static class Host { - public static async Task StartAsync(bool local = false) + public static async Task StartAsync(bool local = false, bool useGrpc = true) { var builder = WebApplication.CreateBuilder(); builder.AddServiceDefaults(); if (local) { - builder.AddLocalAgentService(); + builder.AddLocalAgentService(useGrpc); } else { - builder.AddAgentService(); + builder.AddAgentService(useGrpc); } var app = builder.Build(); - app.MapAgentService(); + app.MapAgentService(local, useGrpc); app.MapDefaultEndpoints(); await app.StartAsync().ConfigureAwait(false); return app; diff --git a/dotnet/src/Microsoft.AutoGen/Agents/HostBuilderExtensions.cs b/dotnet/src/Microsoft.AutoGen/Agents/Services/HostBuilderExtensions.cs similarity index 82% rename from dotnet/src/Microsoft.AutoGen/Agents/HostBuilderExtensions.cs rename to dotnet/src/Microsoft.AutoGen/Agents/Services/HostBuilderExtensions.cs index ea8870a511..8ded5cb03d 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/HostBuilderExtensions.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Services/HostBuilderExtensions.cs @@ -6,8 +6,6 @@ using System.Diagnostics.CodeAnalysis; using System.Reflection; using Google.Protobuf; using Google.Protobuf.Reflection; -using Grpc.Core; -using Grpc.Net.Client.Configuration; using Microsoft.AutoGen.Abstractions; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; @@ -20,41 +18,18 @@ public static class HostBuilderExtensions private const string _defaultAgentServiceAddress = "https://localhost:5001"; public static AgentApplicationBuilder AddAgentWorker(this IHostApplicationBuilder builder, string agentServiceAddress = _defaultAgentServiceAddress, bool local = false) { - builder.Services.AddGrpcClient(options => - { - options.Address = new Uri(agentServiceAddress); - options.ChannelOptionsActions.Add(channelOptions => - { - - channelOptions.HttpHandler = new SocketsHttpHandler - { - EnableMultipleHttp2Connections = true, - KeepAlivePingDelay = TimeSpan.FromSeconds(20), - KeepAlivePingTimeout = TimeSpan.FromSeconds(10), - KeepAlivePingPolicy = HttpKeepAlivePingPolicy.WithActiveRequests - }; - - var methodConfig = new MethodConfig - { - Names = { MethodName.Default }, - RetryPolicy = new RetryPolicy - { - MaxAttempts = 5, - InitialBackoff = TimeSpan.FromSeconds(1), - MaxBackoff = TimeSpan.FromSeconds(5), - BackoffMultiplier = 1.5, - RetryableStatusCodes = { StatusCode.Unavailable } - } - }; - - channelOptions.ServiceConfig = new() { MethodConfigs = { methodConfig } }; - channelOptions.ThrowOperationCanceledOnCancellation = true; - }); - }); builder.Services.TryAddSingleton(DistributedContextPropagator.Current); - builder.Services.AddSingleton(); - builder.Services.AddSingleton(sp => (IHostedService)sp.GetRequiredService()); - builder.Services.AddSingleton(); + + // if !local, then add the gRPC client + if (!local) + { + builder.AddGrpcAgentWorker(agentServiceAddress); + } + else + { + builder.Services.AddSingleton(); + } + builder.Services.AddSingleton(sp => (IHostedService)sp.GetRequiredService()); builder.Services.AddKeyedSingleton("EventTypes", (sp, key) => { var interfaceType = typeof(IMessage); @@ -122,6 +97,7 @@ public static class HostBuilderExtensions } return new EventTypes(typeRegistry, types, eventsMap); }); + builder.Services.AddSingleton(); return new AgentApplicationBuilder(builder); } @@ -159,7 +135,7 @@ public sealed class AgentTypes(Dictionary types) .SelectMany(assembly => assembly.GetTypes()) .Where(type => ReflectionHelper.IsSubclassOfGeneric(type, typeof(AgentBase)) && !type.IsAbstract - && !type.Name.Equals("AgentWorker")) + && !type.Name.Equals(nameof(Client))) .ToDictionary(type => type.Name, type => type); return new AgentTypes(agents); diff --git a/dotnet/src/Microsoft.AutoGen/Runtime/WorkerAgentGrain.cs b/dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/AgentStateGrain.cs similarity index 82% rename from dotnet/src/Microsoft.AutoGen/Runtime/WorkerAgentGrain.cs rename to dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/AgentStateGrain.cs index 7d752d1566..50d8c3ad45 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime/WorkerAgentGrain.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/AgentStateGrain.cs @@ -1,11 +1,11 @@ // Copyright (c) Microsoft Corporation. All rights reserved. -// WorkerAgentGrain.cs +// AgentStateGrain.cs using Microsoft.AutoGen.Abstractions; -namespace Microsoft.AutoGen.Runtime; +namespace Microsoft.AutoGen.Agents; -internal sealed class WorkerAgentGrain([PersistentState("state", "AgentStateStore")] IPersistentState state) : Grain, IWorkerAgentGrain +internal sealed class AgentStateGrain([PersistentState("state", "AgentStateStore")] IPersistentState state) : Grain, IAgentState { public async ValueTask WriteStateAsync(AgentState newState, string eTag) { @@ -16,7 +16,7 @@ internal sealed class WorkerAgentGrain([PersistentState("state", "AgentStateStor if ((string.IsNullOrEmpty(state.Etag)) || (string.IsNullOrEmpty(eTag)) || (string.Equals(state.Etag, eTag, StringComparison.Ordinal))) { state.State = newState; - await state.WriteStateAsync(); + await state.WriteStateAsync().ConfigureAwait(false); } else { diff --git a/dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/IRegistryGrain.cs b/dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/IRegistryGrain.cs new file mode 100644 index 0000000000..87fd0aa38c --- /dev/null +++ b/dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/IRegistryGrain.cs @@ -0,0 +1,15 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// IRegistryGrain.cs +using Microsoft.AutoGen.Abstractions; + +namespace Microsoft.AutoGen.Agents; + +public interface IRegistryGrain : IGrainWithIntegerKey +{ + ValueTask<(IGateway? Worker, bool NewPlacement)> GetOrPlaceAgent(AgentId agentId); + ValueTask RemoveWorker(IGateway worker); + ValueTask RegisterAgentType(string type, IGateway worker); + ValueTask AddWorker(IGateway worker); + ValueTask UnregisterAgentType(string type, IGateway worker); + ValueTask GetCompatibleWorker(string type); +} diff --git a/dotnet/src/Microsoft.AutoGen/Runtime/OrleansRuntimeHostingExtenions.cs b/dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/OrleansRuntimeHostingExtenions.cs similarity index 96% rename from dotnet/src/Microsoft.AutoGen/Runtime/OrleansRuntimeHostingExtenions.cs rename to dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/OrleansRuntimeHostingExtenions.cs index fae441e0ce..cd59bcefc3 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime/OrleansRuntimeHostingExtenions.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/OrleansRuntimeHostingExtenions.cs @@ -4,11 +4,12 @@ using System.Configuration; using Microsoft.AspNetCore.Builder; using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Orleans.Configuration; using Orleans.Serialization; -namespace Microsoft.AutoGen.Runtime; +namespace Microsoft.AutoGen.Agents; public static class OrleansRuntimeHostingExtenions { @@ -83,6 +84,7 @@ public static class OrleansRuntimeHostingExtenions .AddMemoryGrainStorage("PubSubStore"); } }); + builder.Services.AddSingleton(); return builder; } } diff --git a/dotnet/src/Microsoft.AutoGen/Runtime/AgentWorkerRegistryGrain.cs b/dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/RegistryGrain.cs similarity index 78% rename from dotnet/src/Microsoft.AutoGen/Runtime/AgentWorkerRegistryGrain.cs rename to dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/RegistryGrain.cs index 0e7ffa7f28..c5114e3e74 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime/AgentWorkerRegistryGrain.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/RegistryGrain.cs @@ -1,16 +1,16 @@ // Copyright (c) Microsoft Corporation. All rights reserved. -// AgentWorkerRegistryGrain.cs +// RegistryGrain.cs using Microsoft.AutoGen.Abstractions; -namespace Microsoft.AutoGen.Runtime; +namespace Microsoft.AutoGen.Agents; -public sealed class AgentWorkerRegistryGrain : Grain, IAgentWorkerRegistryGrain +public sealed class RegistryGrain : Grain, IRegistryGrain { // TODO: use persistent state for some of these or (better) extend Orleans to implement some of this natively. - private readonly Dictionary _workerStates = []; - private readonly Dictionary> _supportedAgentTypes = []; - private readonly Dictionary<(string Type, string Key), IWorkerGateway> _agentDirectory = []; + private readonly Dictionary _workerStates = new(); + private readonly Dictionary> _supportedAgentTypes = []; + private readonly Dictionary<(string Type, string Key), IGateway> _agentDirectory = []; private readonly TimeSpan _agentTimeout = TimeSpan.FromMinutes(1); public override Task OnActivateAsync(CancellationToken cancellationToken) @@ -18,107 +18,7 @@ public sealed class AgentWorkerRegistryGrain : Grain, IAgentWorkerRegistryGrain this.RegisterGrainTimer(static state => state.PurgeInactiveWorkers(), this, TimeSpan.FromSeconds(30), TimeSpan.FromSeconds(30)); return base.OnActivateAsync(cancellationToken); } - - private Task PurgeInactiveWorkers() - { - foreach (var (worker, state) in _workerStates) - { - if (DateTimeOffset.UtcNow - state.LastSeen > _agentTimeout) - { - _workerStates.Remove(worker); - foreach (var type in state.SupportedTypes) - { - if (_supportedAgentTypes.TryGetValue(type, out var workers)) - { - workers.Remove(worker); - } - } - } - } - - return Task.CompletedTask; - } - - public ValueTask AddWorker(IWorkerGateway worker) - { - GetOrAddWorker(worker); - return ValueTask.CompletedTask; - } - - private WorkerState GetOrAddWorker(IWorkerGateway worker) - { - if (!_workerStates.TryGetValue(worker, out var workerState)) - { - workerState = _workerStates[worker] = new(); - } - - workerState.LastSeen = DateTimeOffset.UtcNow; - return workerState; - } - - public ValueTask RegisterAgentType(string type, IWorkerGateway worker) - { - if (!_supportedAgentTypes.TryGetValue(type, out var supportedAgentTypes)) - { - supportedAgentTypes = _supportedAgentTypes[type] = []; - } - - if (!supportedAgentTypes.Contains(worker)) - { - supportedAgentTypes.Add(worker); - } - - var workerState = GetOrAddWorker(worker); - workerState.SupportedTypes.Add(type); - - return ValueTask.CompletedTask; - } - - public ValueTask RemoveWorker(IWorkerGateway worker) - { - if (_workerStates.Remove(worker, out var state)) - { - foreach (var type in state.SupportedTypes) - { - if (_supportedAgentTypes.TryGetValue(type, out var workers)) - { - workers.Remove(worker); - } - } - } - - return ValueTask.CompletedTask; - } - - public ValueTask UnregisterAgentType(string type, IWorkerGateway worker) - { - if (_workerStates.TryGetValue(worker, out var state)) - { - state.SupportedTypes.Remove(type); - } - - if (_supportedAgentTypes.TryGetValue(type, out var workers)) - { - workers.Remove(worker); - } - - return ValueTask.CompletedTask; - } - - public ValueTask GetCompatibleWorker(string type) => new(GetCompatibleWorkerCore(type)); - - private IWorkerGateway? GetCompatibleWorkerCore(string type) - { - if (_supportedAgentTypes.TryGetValue(type, out var workers)) - { - // Return a random compatible worker. - return workers[Random.Shared.Next(workers.Count)]; - } - - return null; - } - - public ValueTask<(IWorkerGateway? Gateway, bool NewPlacment)> GetOrPlaceAgent(AgentId agentId) + public ValueTask<(IGateway? Worker, bool NewPlacement)> GetOrPlaceAgent(AgentId agentId) { // TODO: bool isNewPlacement; @@ -142,9 +42,98 @@ public sealed class AgentWorkerRegistryGrain : Grain, IAgentWorkerRegistryGrain // Existing activation. isNewPlacement = false; } - return new((worker, isNewPlacement)); } + public ValueTask RemoveWorker(IGateway worker) + { + if (_workerStates.Remove(worker, out var state)) + { + foreach (var type in state.SupportedTypes) + { + if (_supportedAgentTypes.TryGetValue(type, out var workers)) + { + workers.Remove(worker); + } + } + } + return ValueTask.CompletedTask; + } + public ValueTask RegisterAgentType(string type, IGateway worker) + { + if (!_supportedAgentTypes.TryGetValue(type, out var supportedAgentTypes)) + { + supportedAgentTypes = _supportedAgentTypes[type] = []; + } + + if (!supportedAgentTypes.Contains(worker)) + { + supportedAgentTypes.Add(worker); + } + var workerState = GetOrAddWorker(worker); + workerState.SupportedTypes.Add(type); + return ValueTask.CompletedTask; + } + public ValueTask AddWorker(IGateway worker) + { + GetOrAddWorker(worker); + return ValueTask.CompletedTask; + } + public ValueTask UnregisterAgentType(string type, IGateway worker) + { + if (_workerStates.TryGetValue(worker, out var state)) + { + state.SupportedTypes.Remove(type); + } + + if (_supportedAgentTypes.TryGetValue(type, out var workers)) + { + workers.Remove(worker); + } + return ValueTask.CompletedTask; + } + private Task PurgeInactiveWorkers() + { + foreach (var (worker, state) in _workerStates) + { + if (DateTimeOffset.UtcNow - state.LastSeen > _agentTimeout) + { + _workerStates.Remove(worker); + foreach (var type in state.SupportedTypes) + { + if (_supportedAgentTypes.TryGetValue(type, out var workers)) + { + workers.Remove(worker); + } + } + } + } + + return Task.CompletedTask; + } + + private WorkerState GetOrAddWorker(IGateway worker) + { + if (!_workerStates.TryGetValue(worker, out var workerState)) + { + workerState = _workerStates[worker] = new(); + } + + workerState.LastSeen = DateTimeOffset.UtcNow; + return workerState; + } + + public ValueTask GetCompatibleWorker(string type) => new(GetCompatibleWorkerCore(type)); + + private IGateway? GetCompatibleWorkerCore(string type) + { + if (_supportedAgentTypes.TryGetValue(type, out var workers)) + { + // Return a random compatible worker. + return workers[Random.Shared.Next(workers.Count)]; + } + + return null; + } private sealed class WorkerState { diff --git a/dotnet/src/Microsoft.AutoGen/ServiceDefaults/Extensions.cs b/dotnet/src/Microsoft.AutoGen/Extensions/ServiceDefaults/Extensions.cs similarity index 100% rename from dotnet/src/Microsoft.AutoGen/ServiceDefaults/Extensions.cs rename to dotnet/src/Microsoft.AutoGen/Extensions/ServiceDefaults/Extensions.cs diff --git a/dotnet/src/Microsoft.AutoGen/ServiceDefaults/Microsoft.AutoGen.ServiceDefaults.csproj b/dotnet/src/Microsoft.AutoGen/Extensions/ServiceDefaults/Microsoft.AutoGen.ServiceDefaults.csproj similarity index 100% rename from dotnet/src/Microsoft.AutoGen/ServiceDefaults/Microsoft.AutoGen.ServiceDefaults.csproj rename to dotnet/src/Microsoft.AutoGen/Extensions/ServiceDefaults/Microsoft.AutoGen.ServiceDefaults.csproj diff --git a/dotnet/src/Microsoft.AutoGen/Runtime/IAgentWorkerRegistryGrain.cs b/dotnet/src/Microsoft.AutoGen/Runtime/IAgentWorkerRegistryGrain.cs deleted file mode 100644 index bda83f16f7..0000000000 --- a/dotnet/src/Microsoft.AutoGen/Runtime/IAgentWorkerRegistryGrain.cs +++ /dev/null @@ -1,15 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// IAgentWorkerRegistryGrain.cs - -using Microsoft.AutoGen.Abstractions; - -namespace Microsoft.AutoGen.Runtime; - -public interface IAgentWorkerRegistryGrain : IGrainWithIntegerKey -{ - ValueTask RegisterAgentType(string type, IWorkerGateway worker); - ValueTask UnregisterAgentType(string type, IWorkerGateway worker); - ValueTask AddWorker(IWorkerGateway worker); - ValueTask RemoveWorker(IWorkerGateway worker); - ValueTask<(IWorkerGateway? Gateway, bool NewPlacment)> GetOrPlaceAgent(AgentId agentId); -} diff --git a/dotnet/src/Microsoft.AutoGen/Runtime/IWorkerGateway.cs b/dotnet/src/Microsoft.AutoGen/Runtime/IWorkerGateway.cs deleted file mode 100644 index 54ca077de1..0000000000 --- a/dotnet/src/Microsoft.AutoGen/Runtime/IWorkerGateway.cs +++ /dev/null @@ -1,14 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// IWorkerGateway.cs - -using Microsoft.AutoGen.Abstractions; - -namespace Microsoft.AutoGen.Runtime; - -public interface IWorkerGateway : IGrainObserver -{ - ValueTask InvokeRequest(RpcRequest request); - ValueTask BroadcastEvent(CloudEvent evt); - ValueTask Store(AgentState value); - ValueTask Read(AgentId agentId); -} diff --git a/dotnet/src/Microsoft.AutoGen/Runtime/Microsoft.AutoGen.Runtime.csproj b/dotnet/src/Microsoft.AutoGen/Runtime/Microsoft.AutoGen.Runtime.csproj deleted file mode 100644 index 40a240c2f6..0000000000 --- a/dotnet/src/Microsoft.AutoGen/Runtime/Microsoft.AutoGen.Runtime.csproj +++ /dev/null @@ -1,35 +0,0 @@ - - - - net8.0 - enable - enable - AutoGen.Worker.Server - https://github.com/microsoft/agnext - Microsoft - AutoGen Worker Server Library - ai-agents;event-driven-agents - $(NoWarn);CS8002 - - - - - - - - - - - - - - - - - - - - - - - diff --git a/dotnet/Microsoft.AutoGen.Agents.Tests/AgentBaseTests.cs b/dotnet/test/Microsoft.AutoGen.Agents.Tests/AgentBaseTests.cs similarity index 80% rename from dotnet/Microsoft.AutoGen.Agents.Tests/AgentBaseTests.cs rename to dotnet/test/Microsoft.AutoGen.Agents.Tests/AgentBaseTests.cs index 4e17dd56f5..b10f82e7d4 100644 --- a/dotnet/Microsoft.AutoGen.Agents.Tests/AgentBaseTests.cs +++ b/dotnet/test/Microsoft.AutoGen.Agents.Tests/AgentBaseTests.cs @@ -4,6 +4,7 @@ using FluentAssertions; using Google.Protobuf.Reflection; using Microsoft.AutoGen.Abstractions; +using Microsoft.Extensions.Logging; using Moq; using Xunit; @@ -14,8 +15,8 @@ public class AgentBaseTests [Fact] public async Task ItInvokeRightHandlerTestAsync() { - var mockContext = new Mock(); - var agent = new TestAgent(mockContext.Object, new EventTypes(TypeRegistry.Empty, [], [])); + var mockContext = new Mock(); + var agent = new TestAgent(mockContext.Object, new EventTypes(TypeRegistry.Empty, [], []), new Logger(new LoggerFactory())); await agent.HandleObject("hello world"); await agent.HandleObject(42); @@ -30,7 +31,7 @@ public class AgentBaseTests /// public class TestAgent : AgentBase, IHandle, IHandle { - public TestAgent(IAgentContext context, EventTypes eventTypes) : base(context, eventTypes) + public TestAgent(IAgentRuntime context, EventTypes eventTypes, Logger logger) : base(context, eventTypes, logger) { } diff --git a/dotnet/Microsoft.AutoGen.Agents.Tests/Microsoft.AutoGen.Agents.Tests.csproj b/dotnet/test/Microsoft.AutoGen.Agents.Tests/Microsoft.AutoGen.Agents.Tests.csproj similarity index 74% rename from dotnet/Microsoft.AutoGen.Agents.Tests/Microsoft.AutoGen.Agents.Tests.csproj rename to dotnet/test/Microsoft.AutoGen.Agents.Tests/Microsoft.AutoGen.Agents.Tests.csproj index ca18cf97c5..db7467bf12 100644 --- a/dotnet/Microsoft.AutoGen.Agents.Tests/Microsoft.AutoGen.Agents.Tests.csproj +++ b/dotnet/test/Microsoft.AutoGen.Agents.Tests/Microsoft.AutoGen.Agents.Tests.csproj @@ -8,7 +8,7 @@ - + diff --git a/protos/agent_events.proto b/protos/agent_events.proto index 19b41482e3..811c888c64 100644 --- a/protos/agent_events.proto +++ b/protos/agent_events.proto @@ -35,3 +35,6 @@ message ConversationClosed { string user_id = 1; string user_message = 2; } +message Shutdown { + string message = 1; +}