From 458d273fc43ac048a412394d80a17f4132bf6109 Mon Sep 17 00:00:00 2001 From: Ryan Sweet Date: Tue, 12 Nov 2024 11:04:59 -0800 Subject: [PATCH] Refactoring the services and implementing an in-memory runtime for .NET (#4005) closes #3950 closes #3702 What this is doing: I am refactoring the services on the .NET runtime and attempting to clarify the naming and organization. I added this doc to help capture the naming and concepts. AgentRuntime / Worker should work similar to the python version and enables running the whole agent system in one process. For remote the system uses the versions of the services in the grpc folder. lots of other bug fixes/threading cleanup - passing cancellation token throughout Services update clarifies the naming and roles: Worker: Hosts the Agents and is a client to the Gateway Gateway: -- RPC gateway for the other services APIs -- Provides an RPC bridge between the workers and the Event Bus Registry: keeps track of the agents in the system and which events they can handle AgentState: persistent state for agents --- README.md | 6 +- ...tocol.md => 03 - Agent Worker Protocol.md} | 0 docs/design/05 - Services.md | 26 ++ dotnet/AutoGen.sln | 35 +-- dotnet/Directory.Packages.props | 6 + dotnet/samples/Hello/Backend/Backend.csproj | 9 +- dotnet/samples/Hello/Backend/Program.cs | 4 +- dotnet/samples/Hello/Backend/appsettings.json | 9 + .../Hello/HelloAIAgents/HelloAIAgent.cs | 6 +- .../Hello/HelloAIAgents/HelloAIAgents.csproj | 1 - dotnet/samples/Hello/HelloAIAgents/Program.cs | 7 +- .../Hello/HelloAIAgents/appsettings.json | 9 + .../Hello/HelloAgent/HelloAgent.csproj | 3 +- dotnet/samples/Hello/HelloAgent/Program.cs | 9 +- dotnet/samples/Hello/HelloAgent/README.md | 13 +- .../samples/Hello/HelloAgent/appsettings.json | 9 + .../HelloAgentState/HelloAgentState.csproj | 1 - .../samples/Hello/HelloAgentState/Program.cs | 62 +++-- .../samples/Hello/HelloAgentState/README.md | 12 +- .../Hello/HelloAgentState/appsettings.json | 9 + .../DevTeam.AgentHost.csproj | 4 +- .../dev-team/DevTeam.AgentHost/Program.cs | 2 +- .../DevTeam.Agents/DevTeam.Agents.csproj | 4 +- .../DevTeam.Agents/Developer/Developer.cs | 2 +- .../DeveloperLead/DeveloperLead.cs | 2 +- .../ProductManager/ProductManager.cs | 2 +- .../DevTeam.Backend/Agents/AzureGenie.cs | 2 +- .../dev-team/DevTeam.Backend/Agents/Hubber.cs | 2 +- .../DevTeam.Backend/DevTeam.Backend.csproj | 2 +- .../Abstractions/IAgentBase.cs | 9 +- .../{IAgentContext.cs => IAgentRuntime.cs} | 15 +- .../IAgentState.cs} | 8 +- .../Abstractions/IAgentWorker.cs | 13 + .../Abstractions/IAgentWorkerRuntime.cs | 13 - .../Abstractions/IConnection.cs | 7 + .../Abstractions/IGateway.cs | 13 + .../Microsoft.AutoGen.Abstractions.csproj | 3 +- .../src/Microsoft.AutoGen/Agents/AgentBase.cs | 55 ++-- .../Agents/AgentBaseExtensions.cs | 22 +- .../Microsoft.AutoGen/Agents/AgentContext.cs | 39 --- .../Microsoft.AutoGen/Agents/AgentRuntime.cs | 72 ++++++ .../Microsoft.AutoGen/Agents/AgentWorker.cs | 21 -- .../Agents/Agents/AIAgent/InferenceAgent.cs | 4 +- .../Agents/Agents/AIAgent/SKAiAgent.cs | 2 +- .../IOAgent/ConsoleAgent/ConsoleAgent.cs | 2 +- .../Agents/IOAgent/FileAgent/FileAgent.cs | 5 +- .../Agents/Agents/IOAgent/IOAgent.cs | 2 +- .../Agents/IOAgent/WebAPIAgent/WebAPIAgent.cs | 2 +- dotnet/src/Microsoft.AutoGen/Agents/App.cs | 12 +- dotnet/src/Microsoft.AutoGen/Agents/Client.cs | 14 + .../Agents/Microsoft.AutoGen.Agents.csproj | 28 +- .../Agents/Services/AgentWorker.cs | 181 +++++++++++++ .../Services}/AgentWorkerHostingExtensions.cs | 30 ++- .../Grpc/GrpcAgentWorker.cs} | 110 +++----- .../GrpcAgentWorkerHostBuilderExtension.cs | 51 ++++ .../Services/Grpc/GrpcGateway.cs} | 241 +++++++----------- .../Services/Grpc/GrpcGatewayService.cs} | 17 +- .../Services/Grpc/GrpcWorkerConnection.cs} | 10 +- .../{Runtime => Agents/Services}/Host.cs | 10 +- .../{ => Services}/HostBuilderExtensions.cs | 50 +--- .../Services/Orleans/AgentStateGrain.cs} | 8 +- .../Agents/Services/Orleans/IRegistryGrain.cs | 15 ++ .../OrleansRuntimeHostingExtenions.cs | 4 +- .../Services/Orleans/RegistryGrain.cs} | 205 +++++++-------- .../ServiceDefaults/Extensions.cs | 0 .../Microsoft.AutoGen.ServiceDefaults.csproj | 0 .../Runtime/IAgentWorkerRegistryGrain.cs | 15 -- .../Runtime/IWorkerGateway.cs | 14 - .../Runtime/Microsoft.AutoGen.Runtime.csproj | 35 --- .../AgentBaseTests.cs | 7 +- .../Microsoft.AutoGen.Agents.Tests.csproj | 2 +- protos/agent_events.proto | 3 + 72 files changed, 920 insertions(+), 707 deletions(-) rename docs/design/{03 - worker-protocol.md => 03 - Agent Worker Protocol.md} (100%) create mode 100644 docs/design/05 - Services.md create mode 100644 dotnet/samples/Hello/Backend/appsettings.json create mode 100644 dotnet/samples/Hello/HelloAIAgents/appsettings.json create mode 100644 dotnet/samples/Hello/HelloAgent/appsettings.json create mode 100644 dotnet/samples/Hello/HelloAgentState/appsettings.json rename dotnet/src/Microsoft.AutoGen/Abstractions/{IAgentContext.cs => IAgentRuntime.cs} (50%) rename dotnet/src/Microsoft.AutoGen/{Runtime/IWorkerAgentGrain.cs => Abstractions/IAgentState.cs} (53%) create mode 100644 dotnet/src/Microsoft.AutoGen/Abstractions/IAgentWorker.cs delete mode 100644 dotnet/src/Microsoft.AutoGen/Abstractions/IAgentWorkerRuntime.cs create mode 100644 dotnet/src/Microsoft.AutoGen/Abstractions/IConnection.cs create mode 100644 dotnet/src/Microsoft.AutoGen/Abstractions/IGateway.cs delete mode 100644 dotnet/src/Microsoft.AutoGen/Agents/AgentContext.cs create mode 100644 dotnet/src/Microsoft.AutoGen/Agents/AgentRuntime.cs delete mode 100644 dotnet/src/Microsoft.AutoGen/Agents/AgentWorker.cs create mode 100644 dotnet/src/Microsoft.AutoGen/Agents/Client.cs create mode 100644 dotnet/src/Microsoft.AutoGen/Agents/Services/AgentWorker.cs rename dotnet/src/Microsoft.AutoGen/{Runtime => Agents/Services}/AgentWorkerHostingExtensions.cs (64%) rename dotnet/src/Microsoft.AutoGen/Agents/{GrpcAgentWorkerRuntime.cs => Services/Grpc/GrpcAgentWorker.cs} (76%) create mode 100644 dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcAgentWorkerHostBuilderExtension.cs rename dotnet/src/Microsoft.AutoGen/{Runtime/WorkerGateway.cs => Agents/Services/Grpc/GrpcGateway.cs} (56%) rename dotnet/src/Microsoft.AutoGen/{Runtime/WorkerGatewayService.cs => Agents/Services/Grpc/GrpcGatewayService.cs} (67%) rename dotnet/src/Microsoft.AutoGen/{Runtime/WorkerProcessConnection.cs => Agents/Services/Grpc/GrpcWorkerConnection.cs} (90%) rename dotnet/src/Microsoft.AutoGen/{Runtime => Agents/Services}/Host.cs (72%) rename dotnet/src/Microsoft.AutoGen/Agents/{ => Services}/HostBuilderExtensions.cs (82%) rename dotnet/src/Microsoft.AutoGen/{Runtime/WorkerAgentGrain.cs => Agents/Services/Orleans/AgentStateGrain.cs} (82%) create mode 100644 dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/IRegistryGrain.cs rename dotnet/src/Microsoft.AutoGen/{Runtime => Agents/Services/Orleans}/OrleansRuntimeHostingExtenions.cs (96%) rename dotnet/src/Microsoft.AutoGen/{Runtime/AgentWorkerRegistryGrain.cs => Agents/Services/Orleans/RegistryGrain.cs} (78%) rename dotnet/src/Microsoft.AutoGen/{ => Extensions}/ServiceDefaults/Extensions.cs (100%) rename dotnet/src/Microsoft.AutoGen/{ => Extensions}/ServiceDefaults/Microsoft.AutoGen.ServiceDefaults.csproj (100%) delete mode 100644 dotnet/src/Microsoft.AutoGen/Runtime/IAgentWorkerRegistryGrain.cs delete mode 100644 dotnet/src/Microsoft.AutoGen/Runtime/IWorkerGateway.cs delete mode 100644 dotnet/src/Microsoft.AutoGen/Runtime/Microsoft.AutoGen.Runtime.csproj rename dotnet/{ => test}/Microsoft.AutoGen.Agents.Tests/AgentBaseTests.cs (80%) rename dotnet/{ => test}/Microsoft.AutoGen.Agents.Tests/Microsoft.AutoGen.Agents.Tests.csproj (74%) diff --git a/README.md b/README.md index a472e10a1b..7150942717 100644 --- a/README.md +++ b/README.md @@ -195,13 +195,13 @@ public class HelloAgent( { Message = response }.ToCloudEvent(this.AgentId.Key); - await PublishEvent(evt).ConfigureAwait(false); + await PublishEventAsync(evt).ConfigureAwait(false); var goodbye = new ConversationClosed { UserId = this.AgentId.Key, UserMessage = "Goodbye" }.ToCloudEvent(this.AgentId.Key); - await PublishEvent(goodbye).ConfigureAwait(false); + await PublishEventAsync(goodbye).ConfigureAwait(false); } public async Task Handle(ConversationClosed item) { @@ -210,7 +210,7 @@ public class HelloAgent( { Message = goodbye }.ToCloudEvent(this.AgentId.Key); - await PublishEvent(evt).ConfigureAwait(false); + await PublishEventAsync(evt).ConfigureAwait(false); await Task.Delay(60000); await App.ShutdownAsync(); } diff --git a/docs/design/03 - worker-protocol.md b/docs/design/03 - Agent Worker Protocol.md similarity index 100% rename from docs/design/03 - worker-protocol.md rename to docs/design/03 - Agent Worker Protocol.md diff --git a/docs/design/05 - Services.md b/docs/design/05 - Services.md new file mode 100644 index 0000000000..9aeacdb626 --- /dev/null +++ b/docs/design/05 - Services.md @@ -0,0 +1,26 @@ +# AutoGen Services + +## Overview + +Each AutoGen agent system has one or more Agent Workers and a set of services for managing/supporting the agents. The services and workers can all be hosted in the same process or in a distributed system. When in the same process communication and event delivery is in-memory. When distributed, workers communicate with the service over gRPC. In all cases, events are packaged as CloudEvents. There are multiple options for the backend services: + +- In-Memory: the Agent Workers and Services are all hosted in the same process and communicate over in-memory channels. Available for python and .NET. +- Python only: Agent workers communicate with a python hosted service that implements an in-memory message bus and agent registry. +- Micrososft Orleans: a distributed actor system that can host the services and workers, enables distributed state with persistent storage, can leverage multiple event bus types, and cross-language agent communication. +- *Roadmap: support for other languages distributed systems such as dapr or Akka.* + +The Services in the system include: + +- Worker: Hosts the Agents and is a client to the Gateway +- Gateway: +-- RPC gateway for the other services APIs +-- Provides an RPC bridge between the workers and the Event Bus +-- Message Session state (track message queues/delivery) +- Registry: keeps track of the {agents:agent types}:{Subscription/Topics} in the system and which events they can handle +-- *Roadmap: add lookup api in gateway* +- AgentState: persistent state for agents +- Routing: delivers events to agents based on their subscriptions+topics +-- *Roadmap: add subscription management APIs* +- *Roadmap: Management APIs for the Agent System* +- *Roadmap: Scheduling: manages placement of agents* +- *Roadmap: Discovery: allows discovery of agents and services* diff --git a/dotnet/AutoGen.sln b/dotnet/AutoGen.sln index 8557f17840..accf92218a 100644 --- a/dotnet/AutoGen.sln +++ b/dotnet/AutoGen.sln @@ -84,10 +84,6 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.AutoGen.Abstracti EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.AutoGen.Extensions.SemanticKernel", "src\Microsoft.AutoGen\Extensions\SemanticKernel\Microsoft.AutoGen.Extensions.SemanticKernel.csproj", "{952827D4-8D4C-4327-AE4D-E8D25811EF35}" EndProject -Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.AutoGen.Runtime", "src\Microsoft.AutoGen\Runtime\Microsoft.AutoGen.Runtime.csproj", "{A905E29A-7110-497F-ADC5-2CE2A148FEA0}" -EndProject -Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.AutoGen.ServiceDefaults", "src\Microsoft.AutoGen\ServiceDefaults\Microsoft.AutoGen.ServiceDefaults.csproj", "{D7E9D90B-5595-4E72-A90A-6DE20D9AB7AE}" -EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "AgentChat", "AgentChat", "{668726B9-77BC-45CF-B576-0F0773BF1615}" EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "AutoGen.Anthropic.Samples", "samples\AutoGen.Anthropic.Samples\AutoGen.Anthropic.Samples.csproj", "{84020C4A-933A-4693-9889-1B99304A7D76}" @@ -128,12 +124,14 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "HelloAgent", "samples\Hello EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "AIModelClientHostingExtensions", "src\Microsoft.AutoGen\Extensions\AIModelClientHostingExtensions\AIModelClientHostingExtensions.csproj", "{97550E87-48C6-4EBF-85E1-413ABAE9DBFD}" EndProject -Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.AutoGen.Agents.Tests", "Microsoft.AutoGen.Agents.Tests\Microsoft.AutoGen.Agents.Tests.csproj", "{CF4C92BD-28AE-4B8F-B173-601004AEC9BF}" -EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "sample", "sample", "{686480D7-8FEC-4ED3-9C5D-CEBE1057A7ED}" EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "HelloAgentState", "samples\Hello\HelloAgentState\HelloAgentState.csproj", "{64EF61E7-00A6-4E5E-9808-62E10993A0E5}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.AutoGen.ServiceDefaults", "src\Microsoft.AutoGen\Extensions\ServiceDefaults\Microsoft.AutoGen.ServiceDefaults.csproj", "{65059914-5527-4A00-9308-9FAF23D5E85A}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.AutoGen.Agents.Tests", "test\Microsoft.AutoGen.Agents.Tests\Microsoft.AutoGen.Agents.Tests.csproj", "{394FDAF8-74F9-4977-94A5-3371737EB774}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -264,14 +262,6 @@ Global {952827D4-8D4C-4327-AE4D-E8D25811EF35}.Debug|Any CPU.Build.0 = Debug|Any CPU {952827D4-8D4C-4327-AE4D-E8D25811EF35}.Release|Any CPU.ActiveCfg = Release|Any CPU {952827D4-8D4C-4327-AE4D-E8D25811EF35}.Release|Any CPU.Build.0 = Release|Any CPU - {A905E29A-7110-497F-ADC5-2CE2A148FEA0}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {A905E29A-7110-497F-ADC5-2CE2A148FEA0}.Debug|Any CPU.Build.0 = Debug|Any CPU - {A905E29A-7110-497F-ADC5-2CE2A148FEA0}.Release|Any CPU.ActiveCfg = Release|Any CPU - {A905E29A-7110-497F-ADC5-2CE2A148FEA0}.Release|Any CPU.Build.0 = Release|Any CPU - {D7E9D90B-5595-4E72-A90A-6DE20D9AB7AE}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {D7E9D90B-5595-4E72-A90A-6DE20D9AB7AE}.Debug|Any CPU.Build.0 = Debug|Any CPU - {D7E9D90B-5595-4E72-A90A-6DE20D9AB7AE}.Release|Any CPU.ActiveCfg = Release|Any CPU - {D7E9D90B-5595-4E72-A90A-6DE20D9AB7AE}.Release|Any CPU.Build.0 = Release|Any CPU {84020C4A-933A-4693-9889-1B99304A7D76}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {84020C4A-933A-4693-9889-1B99304A7D76}.Debug|Any CPU.Build.0 = Debug|Any CPU {84020C4A-933A-4693-9889-1B99304A7D76}.Release|Any CPU.ActiveCfg = Release|Any CPU @@ -340,14 +330,18 @@ Global {97550E87-48C6-4EBF-85E1-413ABAE9DBFD}.Debug|Any CPU.Build.0 = Debug|Any CPU {97550E87-48C6-4EBF-85E1-413ABAE9DBFD}.Release|Any CPU.ActiveCfg = Release|Any CPU {97550E87-48C6-4EBF-85E1-413ABAE9DBFD}.Release|Any CPU.Build.0 = Release|Any CPU - {CF4C92BD-28AE-4B8F-B173-601004AEC9BF}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {CF4C92BD-28AE-4B8F-B173-601004AEC9BF}.Debug|Any CPU.Build.0 = Debug|Any CPU - {CF4C92BD-28AE-4B8F-B173-601004AEC9BF}.Release|Any CPU.ActiveCfg = Release|Any CPU - {CF4C92BD-28AE-4B8F-B173-601004AEC9BF}.Release|Any CPU.Build.0 = Release|Any CPU {64EF61E7-00A6-4E5E-9808-62E10993A0E5}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {64EF61E7-00A6-4E5E-9808-62E10993A0E5}.Debug|Any CPU.Build.0 = Debug|Any CPU {64EF61E7-00A6-4E5E-9808-62E10993A0E5}.Release|Any CPU.ActiveCfg = Release|Any CPU {64EF61E7-00A6-4E5E-9808-62E10993A0E5}.Release|Any CPU.Build.0 = Release|Any CPU + {65059914-5527-4A00-9308-9FAF23D5E85A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {65059914-5527-4A00-9308-9FAF23D5E85A}.Debug|Any CPU.Build.0 = Debug|Any CPU + {65059914-5527-4A00-9308-9FAF23D5E85A}.Release|Any CPU.ActiveCfg = Release|Any CPU + {65059914-5527-4A00-9308-9FAF23D5E85A}.Release|Any CPU.Build.0 = Release|Any CPU + {394FDAF8-74F9-4977-94A5-3371737EB774}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {394FDAF8-74F9-4977-94A5-3371737EB774}.Debug|Any CPU.Build.0 = Debug|Any CPU + {394FDAF8-74F9-4977-94A5-3371737EB774}.Release|Any CPU.ActiveCfg = Release|Any CPU + {394FDAF8-74F9-4977-94A5-3371737EB774}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -385,8 +379,6 @@ Global {FD87BD33-4616-460B-AC85-A412BA08BB78} = {18BF8DD7-0585-48BF-8F97-AD333080CE06} {E0C991D9-0DB8-471C-ADC9-5FB16E2A0106} = {18BF8DD7-0585-48BF-8F97-AD333080CE06} {952827D4-8D4C-4327-AE4D-E8D25811EF35} = {18BF8DD7-0585-48BF-8F97-AD333080CE06} - {A905E29A-7110-497F-ADC5-2CE2A148FEA0} = {18BF8DD7-0585-48BF-8F97-AD333080CE06} - {D7E9D90B-5595-4E72-A90A-6DE20D9AB7AE} = {18BF8DD7-0585-48BF-8F97-AD333080CE06} {668726B9-77BC-45CF-B576-0F0773BF1615} = {686480D7-8FEC-4ED3-9C5D-CEBE1057A7ED} {84020C4A-933A-4693-9889-1B99304A7D76} = {668726B9-77BC-45CF-B576-0F0773BF1615} {5777515F-4053-42F9-AF2B-95D8D0F5384A} = {668726B9-77BC-45CF-B576-0F0773BF1615} @@ -407,8 +399,9 @@ Global {A20B9894-F352-4338-872A-F215A241D43D} = {7EB336C2-7C0A-4BC8-80C6-A3173AB8DC45} {8F7560CF-EEBB-4333-A69F-838CA40FD85D} = {7EB336C2-7C0A-4BC8-80C6-A3173AB8DC45} {97550E87-48C6-4EBF-85E1-413ABAE9DBFD} = {18BF8DD7-0585-48BF-8F97-AD333080CE06} - {CF4C92BD-28AE-4B8F-B173-601004AEC9BF} = {F823671B-3ECA-4AE6-86DA-25E920D3FE64} {64EF61E7-00A6-4E5E-9808-62E10993A0E5} = {7EB336C2-7C0A-4BC8-80C6-A3173AB8DC45} + {65059914-5527-4A00-9308-9FAF23D5E85A} = {18BF8DD7-0585-48BF-8F97-AD333080CE06} + {394FDAF8-74F9-4977-94A5-3371737EB774} = {F823671B-3ECA-4AE6-86DA-25E920D3FE64} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {93384647-528D-46C8-922C-8DB36A382F0B} diff --git a/dotnet/Directory.Packages.props b/dotnet/Directory.Packages.props index a414a5f59d..c1b26f2f9b 100644 --- a/dotnet/Directory.Packages.props +++ b/dotnet/Directory.Packages.props @@ -63,11 +63,17 @@ + + runtime; build; native; contentfiles; analyzers; buildtransitive + all + + + diff --git a/dotnet/samples/Hello/Backend/Backend.csproj b/dotnet/samples/Hello/Backend/Backend.csproj index 2f5a02ee51..d502d7260d 100644 --- a/dotnet/samples/Hello/Backend/Backend.csproj +++ b/dotnet/samples/Hello/Backend/Backend.csproj @@ -1,4 +1,7 @@ - + + + + Exe net8.0 @@ -6,10 +9,6 @@ enable - - - - diff --git a/dotnet/samples/Hello/Backend/Program.cs b/dotnet/samples/Hello/Backend/Program.cs index 7abdb205a8..b913d39d64 100644 --- a/dotnet/samples/Hello/Backend/Program.cs +++ b/dotnet/samples/Hello/Backend/Program.cs @@ -1,5 +1,7 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Program.cs -var app = await Microsoft.AutoGen.Runtime.Host.StartAsync(local: true); +using Microsoft.Extensions.Hosting; + +var app = await Microsoft.AutoGen.Agents.Host.StartAsync(local: false, useGrpc: true); await app.WaitForShutdownAsync(); diff --git a/dotnet/samples/Hello/Backend/appsettings.json b/dotnet/samples/Hello/Backend/appsettings.json new file mode 100644 index 0000000000..3bb8d88255 --- /dev/null +++ b/dotnet/samples/Hello/Backend/appsettings.json @@ -0,0 +1,9 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Warning", + "Microsoft": "Warning", + "Microsoft.Orleans": "Warning" + } + } + } \ No newline at end of file diff --git a/dotnet/samples/Hello/HelloAIAgents/HelloAIAgent.cs b/dotnet/samples/Hello/HelloAIAgents/HelloAIAgent.cs index fd3c517f33..d2ba81e659 100644 --- a/dotnet/samples/Hello/HelloAIAgents/HelloAIAgent.cs +++ b/dotnet/samples/Hello/HelloAIAgents/HelloAIAgent.cs @@ -8,11 +8,13 @@ using Microsoft.Extensions.AI; namespace Hello; [TopicSubscription("HelloAgents")] public class HelloAIAgent( - IAgentContext context, + IAgentRuntime context, [FromKeyedServices("EventTypes")] EventTypes typeRegistry, + IHostApplicationLifetime hostApplicationLifetime, IChatClient client) : HelloAgent( context, - typeRegistry), + typeRegistry, + hostApplicationLifetime), IHandle { // This Handle supercedes the one in the base class diff --git a/dotnet/samples/Hello/HelloAIAgents/HelloAIAgents.csproj b/dotnet/samples/Hello/HelloAIAgents/HelloAIAgents.csproj index 86bccb13b3..f17ab0c9f0 100644 --- a/dotnet/samples/Hello/HelloAIAgents/HelloAIAgents.csproj +++ b/dotnet/samples/Hello/HelloAIAgents/HelloAIAgents.csproj @@ -13,7 +13,6 @@ - diff --git a/dotnet/samples/Hello/HelloAIAgents/Program.cs b/dotnet/samples/Hello/HelloAIAgents/Program.cs index 8285e0800f..9612a0a079 100644 --- a/dotnet/samples/Hello/HelloAIAgents/Program.cs +++ b/dotnet/samples/Hello/HelloAIAgents/Program.cs @@ -32,8 +32,9 @@ namespace Hello { [TopicSubscription("HelloAgents")] public class HelloAgent( - IAgentContext context, - [FromKeyedServices("EventTypes")] EventTypes typeRegistry) : ConsoleAgent( + IAgentRuntime context, + [FromKeyedServices("EventTypes")] EventTypes typeRegistry, + IHostApplicationLifetime hostApplicationLifetime) : ConsoleAgent( context, typeRegistry), ISayHello, @@ -65,7 +66,7 @@ namespace Hello await PublishMessageAsync(evt).ConfigureAwait(false); //sleep30 seconds await Task.Delay(30000).ConfigureAwait(false); - await AgentsApp.ShutdownAsync().ConfigureAwait(false); + hostApplicationLifetime.StopApplication(); } public async Task SayHello(string ask) diff --git a/dotnet/samples/Hello/HelloAIAgents/appsettings.json b/dotnet/samples/Hello/HelloAIAgents/appsettings.json new file mode 100644 index 0000000000..3bb8d88255 --- /dev/null +++ b/dotnet/samples/Hello/HelloAIAgents/appsettings.json @@ -0,0 +1,9 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Warning", + "Microsoft": "Warning", + "Microsoft.Orleans": "Warning" + } + } + } \ No newline at end of file diff --git a/dotnet/samples/Hello/HelloAgent/HelloAgent.csproj b/dotnet/samples/Hello/HelloAgent/HelloAgent.csproj index 8799eb7275..f2f3e473fe 100644 --- a/dotnet/samples/Hello/HelloAgent/HelloAgent.csproj +++ b/dotnet/samples/Hello/HelloAgent/HelloAgent.csproj @@ -1,4 +1,4 @@ - + Exe net8.0 @@ -14,6 +14,5 @@ - diff --git a/dotnet/samples/Hello/HelloAgent/Program.cs b/dotnet/samples/Hello/HelloAgent/Program.cs index 4e96c7f99b..506d915023 100644 --- a/dotnet/samples/Hello/HelloAgent/Program.cs +++ b/dotnet/samples/Hello/HelloAgent/Program.cs @@ -3,6 +3,8 @@ using Microsoft.AutoGen.Abstractions; using Microsoft.AutoGen.Agents; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; // step 1: create in-memory agent runtime @@ -16,7 +18,7 @@ using Microsoft.AutoGen.Agents; var app = await AgentsApp.PublishMessageAsync("HelloAgents", new NewMessageReceived { Message = "World" -}, local: false); +}, local: true); await app.WaitForShutdownAsync(); @@ -24,9 +26,8 @@ namespace Hello { [TopicSubscription("HelloAgents")] public class HelloAgent( - IAgentContext context, - [FromKeyedServices("EventTypes")] EventTypes typeRegistry, - IHostApplicationLifetime hostApplicationLifetime) : AgentBase( + IAgentRuntime context, IHostApplicationLifetime hostApplicationLifetime, + [FromKeyedServices("EventTypes")] EventTypes typeRegistry) : AgentBase( context, typeRegistry), ISayHello, diff --git a/dotnet/samples/Hello/HelloAgent/README.md b/dotnet/samples/Hello/HelloAgent/README.md index 2de64ebaa0..f95e25d6ec 100644 --- a/dotnet/samples/Hello/HelloAgent/README.md +++ b/dotnet/samples/Hello/HelloAgent/README.md @@ -25,11 +25,11 @@ Flow Diagram: ```mermaid %%{init: {'theme':'forest'}}%% graph LR; - A[Main] --> |"PublishEvent(NewMessage('World'))"| B{"Handle(NewMessageReceived item)"} - B --> |"PublishEvent(Output('***Hello, World***'))"| C[ConsoleAgent] + A[Main] --> |"PublishEventAsync(NewMessage('World'))"| B{"Handle(NewMessageReceived item)"} + B --> |"PublishEventAsync(Output('***Hello, World***'))"| C[ConsoleAgent] C --> D{"WriteConsole()"} - B --> |"PublishEvent(ConversationClosed('Goodbye'))"| E{"Handle(ConversationClosed item)"} - B --> |"PublishEvent(Output('***Goodbye***'))"| C + B --> |"PublishEventAsync(ConversationClosed('Goodbye'))"| E{"Handle(ConversationClosed item)"} + B --> |"PublishEventAsync(Output('***Goodbye***'))"| C E --> F{"Shutdown()"} ``` @@ -58,13 +58,13 @@ public class HelloAgent( { Message = response }.ToCloudEvent(this.AgentId.Key); - await PublishEvent(evt).ConfigureAwait(false); + await PublishEventAsync(evt).ConfigureAwait(false); var goodbye = new ConversationClosed { UserId = this.AgentId.Key, UserMessage = "Goodbye" }.ToCloudEvent(this.AgentId.Key); - await PublishEvent(goodbye).ConfigureAwait(false); + await PublishEventAsync(goodbye).ConfigureAwait(false); } ``` @@ -109,7 +109,6 @@ message ReadmeRequested { } ``` - ```xml diff --git a/dotnet/samples/Hello/HelloAgent/appsettings.json b/dotnet/samples/Hello/HelloAgent/appsettings.json new file mode 100644 index 0000000000..3bb8d88255 --- /dev/null +++ b/dotnet/samples/Hello/HelloAgent/appsettings.json @@ -0,0 +1,9 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Warning", + "Microsoft": "Warning", + "Microsoft.Orleans": "Warning" + } + } + } \ No newline at end of file diff --git a/dotnet/samples/Hello/HelloAgentState/HelloAgentState.csproj b/dotnet/samples/Hello/HelloAgentState/HelloAgentState.csproj index 8799eb7275..797fe957bb 100644 --- a/dotnet/samples/Hello/HelloAgentState/HelloAgentState.csproj +++ b/dotnet/samples/Hello/HelloAgentState/HelloAgentState.csproj @@ -14,6 +14,5 @@ - diff --git a/dotnet/samples/Hello/HelloAgentState/Program.cs b/dotnet/samples/Hello/HelloAgentState/Program.cs index d021126850..7c15c4c54d 100644 --- a/dotnet/samples/Hello/HelloAgentState/Program.cs +++ b/dotnet/samples/Hello/HelloAgentState/Program.cs @@ -1,6 +1,7 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Program.cs +using System.Text.Json; using Microsoft.AutoGen.Abstractions; using Microsoft.AutoGen.Agents; @@ -8,7 +9,7 @@ using Microsoft.AutoGen.Agents; var app = await AgentsApp.PublishMessageAsync("HelloAgents", new NewMessageReceived { Message = "World" -}, local: false); +}, local: true); await app.WaitForShutdownAsync(); @@ -16,13 +17,15 @@ namespace Hello { [TopicSubscription("HelloAgents")] public class HelloAgent( - IAgentContext context, - [FromKeyedServices("EventTypes")] EventTypes typeRegistry) : ConsoleAgent( + IAgentRuntime context, + IHostApplicationLifetime hostApplicationLifetime, + [FromKeyedServices("EventTypes")] EventTypes typeRegistry) : AgentBase( context, typeRegistry), - ISayHello, + IHandleConsole, IHandle, - IHandle + IHandle, + IHandle { private AgentState? State { get; set; } public async Task Handle(NewMessageReceived item) @@ -32,11 +35,15 @@ namespace Hello { Message = response }; - var entry = "We said hello to " + item.Message; - await Store(new AgentState + Dictionary state = new() + { + { "data", "We said hello to " + item.Message }, + { "workflow", "Active" } + }; + await StoreAsync(new AgentState { AgentId = this.AgentId, - TextData = entry + TextData = JsonSerializer.Serialize(state) }).ConfigureAwait(false); await PublishMessageAsync(evt).ConfigureAwait(false); var goodbye = new ConversationClosed @@ -45,21 +52,40 @@ namespace Hello UserMessage = "Goodbye" }; await PublishMessageAsync(goodbye).ConfigureAwait(false); + // send the shutdown message + await PublishMessageAsync(new Shutdown { Message = this.AgentId.Key }).ConfigureAwait(false); + } public async Task Handle(ConversationClosed item) { - State = await Read(this.AgentId).ConfigureAwait(false); - var read = State?.TextData ?? "No state data found"; - var goodbye = $"{read}\n********************* {item.UserId} said {item.UserMessage} ************************"; + State = await ReadAsync(this.AgentId).ConfigureAwait(false); + var state = JsonSerializer.Deserialize>(State.TextData) ?? new Dictionary { { "data", "No state data found" } }; + var goodbye = $"\nState: {state}\n********************* {item.UserId} said {item.UserMessage} ************************"; var evt = new Output { Message = goodbye }; - await PublishMessageAsync(evt).ConfigureAwait(false); - //sleep - await Task.Delay(10000).ConfigureAwait(false); - await AgentsApp.ShutdownAsync().ConfigureAwait(false); - + await PublishMessageAsync(evt).ConfigureAwait(true); + state["workflow"] = "Complete"; + await StoreAsync(new AgentState + { + AgentId = this.AgentId, + TextData = JsonSerializer.Serialize(state) + }).ConfigureAwait(false); + } + public async Task Handle(Shutdown item) + { + string? workflow = null; + // make sure the workflow is finished + while (workflow != "Complete") + { + State = await ReadAsync(this.AgentId).ConfigureAwait(true); + var state = JsonSerializer.Deserialize>(State?.TextData ?? "{}") ?? new Dictionary(); + workflow = state["workflow"]; + await Task.Delay(1000).ConfigureAwait(true); + } + // now we can shut down... + hostApplicationLifetime.StopApplication(); } public async Task SayHello(string ask) { @@ -67,8 +93,4 @@ namespace Hello return response; } } - public interface ISayHello - { - public Task SayHello(string ask); - } } diff --git a/dotnet/samples/Hello/HelloAgentState/README.md b/dotnet/samples/Hello/HelloAgentState/README.md index 0079005d24..8bc8e34545 100644 --- a/dotnet/samples/Hello/HelloAgentState/README.md +++ b/dotnet/samples/Hello/HelloAgentState/README.md @@ -25,11 +25,11 @@ Flow Diagram: ```mermaid %%{init: {'theme':'forest'}}%% graph LR; - A[Main] --> |"PublishEvent(NewMessage('World'))"| B{"Handle(NewMessageReceived item)"} - B --> |"PublishEvent(Output('***Hello, World***'))"| C[ConsoleAgent] + A[Main] --> |"PublishEventAsync(NewMessage('World'))"| B{"Handle(NewMessageReceived item)"} + B --> |"PublishEventAsync(Output('***Hello, World***'))"| C[ConsoleAgent] C --> D{"WriteConsole()"} - B --> |"PublishEvent(ConversationClosed('Goodbye'))"| E{"Handle(ConversationClosed item)"} - B --> |"PublishEvent(Output('***Goodbye***'))"| C + B --> |"PublishEventAsync(ConversationClosed('Goodbye'))"| E{"Handle(ConversationClosed item)"} + B --> |"PublishEventAsync(Output('***Goodbye***'))"| C E --> F{"Shutdown()"} ``` @@ -58,13 +58,13 @@ public class HelloAgent( { Message = response }.ToCloudEvent(this.AgentId.Key); - await PublishEvent(evt).ConfigureAwait(false); + await PublishEventAsync(evt).ConfigureAwait(false); var goodbye = new ConversationClosed { UserId = this.AgentId.Key, UserMessage = "Goodbye" }.ToCloudEvent(this.AgentId.Key); - await PublishEvent(goodbye).ConfigureAwait(false); + await PublishEventAsync(goodbye).ConfigureAwait(false); } ``` diff --git a/dotnet/samples/Hello/HelloAgentState/appsettings.json b/dotnet/samples/Hello/HelloAgentState/appsettings.json new file mode 100644 index 0000000000..3bb8d88255 --- /dev/null +++ b/dotnet/samples/Hello/HelloAgentState/appsettings.json @@ -0,0 +1,9 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Warning", + "Microsoft": "Warning", + "Microsoft.Orleans": "Warning" + } + } + } \ No newline at end of file diff --git a/dotnet/samples/dev-team/DevTeam.AgentHost/DevTeam.AgentHost.csproj b/dotnet/samples/dev-team/DevTeam.AgentHost/DevTeam.AgentHost.csproj index 69705e4667..d8d7ebf8e4 100644 --- a/dotnet/samples/dev-team/DevTeam.AgentHost/DevTeam.AgentHost.csproj +++ b/dotnet/samples/dev-team/DevTeam.AgentHost/DevTeam.AgentHost.csproj @@ -9,8 +9,8 @@ - - + + diff --git a/dotnet/samples/dev-team/DevTeam.AgentHost/Program.cs b/dotnet/samples/dev-team/DevTeam.AgentHost/Program.cs index 4864f39421..6480e72a0b 100644 --- a/dotnet/samples/dev-team/DevTeam.AgentHost/Program.cs +++ b/dotnet/samples/dev-team/DevTeam.AgentHost/Program.cs @@ -1,7 +1,7 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Program.cs -using Microsoft.AutoGen.Runtime; +using Microsoft.AutoGen.Agents; var builder = WebApplication.CreateBuilder(args); builder.AddServiceDefaults(); diff --git a/dotnet/samples/dev-team/DevTeam.Agents/DevTeam.Agents.csproj b/dotnet/samples/dev-team/DevTeam.Agents/DevTeam.Agents.csproj index 03176d2cfd..8dfd6912e5 100644 --- a/dotnet/samples/dev-team/DevTeam.Agents/DevTeam.Agents.csproj +++ b/dotnet/samples/dev-team/DevTeam.Agents/DevTeam.Agents.csproj @@ -9,8 +9,8 @@ - - + + diff --git a/dotnet/samples/dev-team/DevTeam.Agents/Developer/Developer.cs b/dotnet/samples/dev-team/DevTeam.Agents/Developer/Developer.cs index 5b66822482..325b9fbe0b 100644 --- a/dotnet/samples/dev-team/DevTeam.Agents/Developer/Developer.cs +++ b/dotnet/samples/dev-team/DevTeam.Agents/Developer/Developer.cs @@ -10,7 +10,7 @@ using Microsoft.SemanticKernel.Memory; namespace DevTeam.Agents; [TopicSubscription("devteam")] -public class Dev(IAgentContext context, Kernel kernel, ISemanticTextMemory memory, [FromKeyedServices("EventTypes")] EventTypes typeRegistry, ILogger logger) +public class Dev(IAgentRuntime context, Kernel kernel, ISemanticTextMemory memory, [FromKeyedServices("EventTypes")] EventTypes typeRegistry, ILogger logger) : SKAiAgent(context, memory, kernel, typeRegistry), IDevelopApps, IHandle, IHandle diff --git a/dotnet/samples/dev-team/DevTeam.Agents/DeveloperLead/DeveloperLead.cs b/dotnet/samples/dev-team/DevTeam.Agents/DeveloperLead/DeveloperLead.cs index 71301dd3d3..e03701c968 100644 --- a/dotnet/samples/dev-team/DevTeam.Agents/DeveloperLead/DeveloperLead.cs +++ b/dotnet/samples/dev-team/DevTeam.Agents/DeveloperLead/DeveloperLead.cs @@ -11,7 +11,7 @@ using Microsoft.SemanticKernel.Memory; namespace DevTeam.Agents; [TopicSubscription("devteam")] -public class DeveloperLead(IAgentContext context, Kernel kernel, ISemanticTextMemory memory, [FromKeyedServices("EventTypes")] EventTypes typeRegistry, ILogger logger) +public class DeveloperLead(IAgentRuntime context, Kernel kernel, ISemanticTextMemory memory, [FromKeyedServices("EventTypes")] EventTypes typeRegistry, ILogger logger) : SKAiAgent(context, memory, kernel, typeRegistry), ILeadDevelopers, IHandle, IHandle diff --git a/dotnet/samples/dev-team/DevTeam.Agents/ProductManager/ProductManager.cs b/dotnet/samples/dev-team/DevTeam.Agents/ProductManager/ProductManager.cs index a97b333567..cc393e6518 100644 --- a/dotnet/samples/dev-team/DevTeam.Agents/ProductManager/ProductManager.cs +++ b/dotnet/samples/dev-team/DevTeam.Agents/ProductManager/ProductManager.cs @@ -10,7 +10,7 @@ using Microsoft.SemanticKernel.Memory; namespace DevTeam.Agents; [TopicSubscription("devteam")] -public class ProductManager(IAgentContext context, Kernel kernel, ISemanticTextMemory memory, [FromKeyedServices("EventTypes")] EventTypes typeRegistry, ILogger logger) +public class ProductManager(IAgentRuntime context, Kernel kernel, ISemanticTextMemory memory, [FromKeyedServices("EventTypes")] EventTypes typeRegistry, ILogger logger) : SKAiAgent(context, memory, kernel, typeRegistry), IManageProducts, IHandle, IHandle diff --git a/dotnet/samples/dev-team/DevTeam.Backend/Agents/AzureGenie.cs b/dotnet/samples/dev-team/DevTeam.Backend/Agents/AzureGenie.cs index 7dac8163a7..d3997a8f8e 100644 --- a/dotnet/samples/dev-team/DevTeam.Backend/Agents/AzureGenie.cs +++ b/dotnet/samples/dev-team/DevTeam.Backend/Agents/AzureGenie.cs @@ -9,7 +9,7 @@ using Microsoft.SemanticKernel; using Microsoft.SemanticKernel.Memory; namespace Microsoft.AI.DevTeam; -public class AzureGenie(IAgentContext context, Kernel kernel, ISemanticTextMemory memory, [FromKeyedServices("EventTypes")] EventTypes typeRegistry, IManageAzure azureService) +public class AzureGenie(IAgentRuntime context, Kernel kernel, ISemanticTextMemory memory, [FromKeyedServices("EventTypes")] EventTypes typeRegistry, IManageAzure azureService) : SKAiAgent(context, memory, kernel, typeRegistry), IHandle, IHandle diff --git a/dotnet/samples/dev-team/DevTeam.Backend/Agents/Hubber.cs b/dotnet/samples/dev-team/DevTeam.Backend/Agents/Hubber.cs index 2f4c373385..3dc8dd35ad 100644 --- a/dotnet/samples/dev-team/DevTeam.Backend/Agents/Hubber.cs +++ b/dotnet/samples/dev-team/DevTeam.Backend/Agents/Hubber.cs @@ -12,7 +12,7 @@ using Microsoft.SemanticKernel.Memory; namespace Microsoft.AI.DevTeam; -public class Hubber(IAgentContext context, Kernel kernel, ISemanticTextMemory memory, [FromKeyedServices("EventTypes")] EventTypes typeRegistry, IManageGithub ghService) +public class Hubber(IAgentRuntime context, Kernel kernel, ISemanticTextMemory memory, [FromKeyedServices("EventTypes")] EventTypes typeRegistry, IManageGithub ghService) : SKAiAgent(context, memory, kernel, typeRegistry), IHandle, IHandle, diff --git a/dotnet/samples/dev-team/DevTeam.Backend/DevTeam.Backend.csproj b/dotnet/samples/dev-team/DevTeam.Backend/DevTeam.Backend.csproj index 419298430c..8296f7aa67 100644 --- a/dotnet/samples/dev-team/DevTeam.Backend/DevTeam.Backend.csproj +++ b/dotnet/samples/dev-team/DevTeam.Backend/DevTeam.Backend.csproj @@ -29,7 +29,7 @@ - + diff --git a/dotnet/src/Microsoft.AutoGen/Abstractions/IAgentBase.cs b/dotnet/src/Microsoft.AutoGen/Abstractions/IAgentBase.cs index 2e767e7213..14c2688c23 100644 --- a/dotnet/src/Microsoft.AutoGen/Abstractions/IAgentBase.cs +++ b/dotnet/src/Microsoft.AutoGen/Abstractions/IAgentBase.cs @@ -9,13 +9,14 @@ public interface IAgentBase { // Properties AgentId AgentId { get; } - IAgentContext Context { get; } + IAgentRuntime Context { get; } // Methods Task CallHandler(CloudEvent item); Task HandleRequest(RpcRequest request); void ReceiveMessage(Message message); - Task Store(AgentState state); - Task Read(AgentId agentId) where T : IMessage, new(); - ValueTask PublishEventAsync(CloudEvent item, CancellationToken token = default); + Task StoreAsync(AgentState state, CancellationToken cancellationToken = default); + Task ReadAsync(AgentId agentId, CancellationToken cancellationToken = default) where T : IMessage, new(); + ValueTask PublishEventAsync(CloudEvent item, CancellationToken cancellationToken = default); + ValueTask PublishEventAsync(string topic, IMessage evt, CancellationToken cancellationToken = default); } diff --git a/dotnet/src/Microsoft.AutoGen/Abstractions/IAgentContext.cs b/dotnet/src/Microsoft.AutoGen/Abstractions/IAgentRuntime.cs similarity index 50% rename from dotnet/src/Microsoft.AutoGen/Abstractions/IAgentContext.cs rename to dotnet/src/Microsoft.AutoGen/Abstractions/IAgentRuntime.cs index ab5972730f..2125e57a8b 100644 --- a/dotnet/src/Microsoft.AutoGen/Abstractions/IAgentContext.cs +++ b/dotnet/src/Microsoft.AutoGen/Abstractions/IAgentRuntime.cs @@ -1,20 +1,21 @@ // Copyright (c) Microsoft Corporation. All rights reserved. -// IAgentContext.cs +// IAgentRuntime.cs using System.Diagnostics; -using Microsoft.Extensions.Logging; namespace Microsoft.AutoGen.Abstractions; -public interface IAgentContext +public interface IAgentRuntime { AgentId AgentId { get; } IAgentBase? AgentInstance { get; set; } - DistributedContextPropagator DistributedContextPropagator { get; } // TODO: Remove this. An abstraction should not have a dependency on DistributedContextPropagator. - ILogger Logger { get; } // TODO: Remove this. An abstraction should not have a dependency on ILogger. - ValueTask Store(AgentState value, CancellationToken cancellationToken = default); - ValueTask Read(AgentId agentId, CancellationToken cancellationToken = default); + ValueTask StoreAsync(AgentState value, CancellationToken cancellationToken = default); + ValueTask ReadAsync(AgentId agentId, CancellationToken cancellationToken = default); ValueTask SendResponseAsync(RpcRequest request, RpcResponse response, CancellationToken cancellationToken = default); ValueTask SendRequestAsync(IAgentBase agent, RpcRequest request, CancellationToken cancellationToken = default); ValueTask PublishEventAsync(CloudEvent @event, CancellationToken cancellationToken = default); + void Update(Activity? activity, RpcRequest request); + void Update(Activity? activity, CloudEvent cloudEvent); + (string?, string?) GetTraceIDandState(IDictionary metadata); + IDictionary ExtractMetadata(IDictionary metadata); } diff --git a/dotnet/src/Microsoft.AutoGen/Runtime/IWorkerAgentGrain.cs b/dotnet/src/Microsoft.AutoGen/Abstractions/IAgentState.cs similarity index 53% rename from dotnet/src/Microsoft.AutoGen/Runtime/IWorkerAgentGrain.cs rename to dotnet/src/Microsoft.AutoGen/Abstractions/IAgentState.cs index d28e2a8c8d..0a6784b54f 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime/IWorkerAgentGrain.cs +++ b/dotnet/src/Microsoft.AutoGen/Abstractions/IAgentState.cs @@ -1,11 +1,9 @@ // Copyright (c) Microsoft Corporation. All rights reserved. -// IWorkerAgentGrain.cs +// IAgentState.cs -using Microsoft.AutoGen.Abstractions; +namespace Microsoft.AutoGen.Abstractions; -namespace Microsoft.AutoGen.Runtime; - -internal interface IWorkerAgentGrain : IGrainWithStringKey +public interface IAgentState { ValueTask ReadStateAsync(); ValueTask WriteStateAsync(AgentState state, string eTag); diff --git a/dotnet/src/Microsoft.AutoGen/Abstractions/IAgentWorker.cs b/dotnet/src/Microsoft.AutoGen/Abstractions/IAgentWorker.cs new file mode 100644 index 0000000000..67a867d87d --- /dev/null +++ b/dotnet/src/Microsoft.AutoGen/Abstractions/IAgentWorker.cs @@ -0,0 +1,13 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// IAgentWorker.cs + +namespace Microsoft.AutoGen.Abstractions; + +public interface IAgentWorker +{ + ValueTask PublishEventAsync(CloudEvent evt, CancellationToken cancellationToken = default); + ValueTask SendRequestAsync(IAgentBase agent, RpcRequest request, CancellationToken cancellationToken = default); + ValueTask SendResponseAsync(RpcResponse response, CancellationToken cancellationToken = default); + ValueTask StoreAsync(AgentState value, CancellationToken cancellationToken = default); + ValueTask ReadAsync(AgentId agentId, CancellationToken cancellationToken = default); +} diff --git a/dotnet/src/Microsoft.AutoGen/Abstractions/IAgentWorkerRuntime.cs b/dotnet/src/Microsoft.AutoGen/Abstractions/IAgentWorkerRuntime.cs deleted file mode 100644 index c03259f722..0000000000 --- a/dotnet/src/Microsoft.AutoGen/Abstractions/IAgentWorkerRuntime.cs +++ /dev/null @@ -1,13 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// IAgentWorkerRuntime.cs - -namespace Microsoft.AutoGen.Abstractions; - -public interface IAgentWorkerRuntime -{ - ValueTask PublishEvent(CloudEvent evt, CancellationToken cancellationToken); - ValueTask SendRequest(IAgentBase agent, RpcRequest request, CancellationToken cancellationToken); - ValueTask SendResponse(RpcResponse response, CancellationToken cancellationToken); - ValueTask Store(AgentState value, CancellationToken cancellationToken); - ValueTask Read(AgentId agentId, CancellationToken cancellationToken); -} diff --git a/dotnet/src/Microsoft.AutoGen/Abstractions/IConnection.cs b/dotnet/src/Microsoft.AutoGen/Abstractions/IConnection.cs new file mode 100644 index 0000000000..3ac582f6d4 --- /dev/null +++ b/dotnet/src/Microsoft.AutoGen/Abstractions/IConnection.cs @@ -0,0 +1,7 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// IConnection.cs + +namespace Microsoft.AutoGen.Abstractions; +public interface IConnection +{ +} diff --git a/dotnet/src/Microsoft.AutoGen/Abstractions/IGateway.cs b/dotnet/src/Microsoft.AutoGen/Abstractions/IGateway.cs new file mode 100644 index 0000000000..79b7b63e72 --- /dev/null +++ b/dotnet/src/Microsoft.AutoGen/Abstractions/IGateway.cs @@ -0,0 +1,13 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// IGateway.cs + +namespace Microsoft.AutoGen.Abstractions; + +public interface IGateway : IGrainObserver +{ + ValueTask InvokeRequest(RpcRequest request); + ValueTask BroadcastEvent(CloudEvent evt); + ValueTask StoreAsync(AgentState value); + ValueTask ReadAsync(AgentId agentId); + Task SendMessageAsync(IConnection connection, CloudEvent cloudEvent); +} diff --git a/dotnet/src/Microsoft.AutoGen/Abstractions/Microsoft.AutoGen.Abstractions.csproj b/dotnet/src/Microsoft.AutoGen/Abstractions/Microsoft.AutoGen.Abstractions.csproj index 52f933e195..e24b52187c 100644 --- a/dotnet/src/Microsoft.AutoGen/Abstractions/Microsoft.AutoGen.Abstractions.csproj +++ b/dotnet/src/Microsoft.AutoGen/Abstractions/Microsoft.AutoGen.Abstractions.csproj @@ -20,7 +20,8 @@ + + - diff --git a/dotnet/src/Microsoft.AutoGen/Agents/AgentBase.cs b/dotnet/src/Microsoft.AutoGen/Agents/AgentBase.cs index 3932e007b3..6fffdaadf1 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/AgentBase.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/AgentBase.cs @@ -20,21 +20,24 @@ public abstract class AgentBase : IAgentBase, IHandle private readonly Dictionary> _pendingRequests = []; private readonly Channel _mailbox = Channel.CreateUnbounded(); - private readonly IAgentContext _context; + private readonly IAgentRuntime _context; public string Route { get; set; } = "base"; - protected internal ILogger Logger => _context.Logger; - public IAgentContext Context => _context; + protected internal ILogger _logger; + public IAgentRuntime Context => _context; protected readonly EventTypes EventTypes; - protected AgentBase(IAgentContext context, EventTypes eventTypes) + protected AgentBase( + IAgentRuntime context, + EventTypes eventTypes, + ILogger? logger = null) { _context = context; context.AgentInstance = this; this.EventTypes = eventTypes; + _logger = logger ?? LoggerFactory.Create(builder => { }).CreateLogger(); Completion = Start(); } - internal Task Completion { get; } internal Task Start() @@ -58,7 +61,6 @@ public abstract class AgentBase : IAgentBase, IHandle } } } - public void ReceiveMessage(Message message) => _mailbox.Writer.TryWrite(message); private async Task RunMessagePump() @@ -71,7 +73,7 @@ public abstract class AgentBase : IAgentBase, IHandle switch (message) { case Message msg: - await HandleRpcMessage(msg).ConfigureAwait(false); + await HandleRpcMessage(msg, new CancellationToken()).ConfigureAwait(false); break; default: throw new InvalidOperationException($"Unexpected message '{message}'."); @@ -79,12 +81,11 @@ public abstract class AgentBase : IAgentBase, IHandle } catch (Exception ex) { - _context.Logger.LogError(ex, "Error processing message."); + _logger.LogError(ex, "Error processing message."); } } } - - protected internal async Task HandleRpcMessage(Message msg) + protected internal async Task HandleRpcMessage(Message msg, CancellationToken cancellationToken = default) { switch (msg.MessageCase) { @@ -95,17 +96,17 @@ public abstract class AgentBase : IAgentBase, IHandle static ((AgentBase Agent, CloudEvent Item) state) => state.Agent.CallHandler(state.Item), (this, msg.CloudEvent), activity, - msg.CloudEvent.Type).ConfigureAwait(false); + msg.CloudEvent.Type, cancellationToken).ConfigureAwait(false); } break; case Message.MessageOneofCase.Request: { var activity = this.ExtractActivity(msg.Request.Method, msg.Request.Metadata); await this.InvokeWithActivityAsync( - static ((AgentBase Agent, RpcRequest Request) state) => state.Agent.OnRequestCore(state.Request), + static ((AgentBase Agent, RpcRequest Request) state) => state.Agent.OnRequestCoreAsync(state.Request), (this, msg.Request), activity, - msg.Request.Method).ConfigureAwait(false); + msg.Request.Method, cancellationToken).ConfigureAwait(false); } break; case Message.MessageOneofCase.Response: @@ -113,14 +114,14 @@ public abstract class AgentBase : IAgentBase, IHandle break; } } - public async Task Store(AgentState state) + public async Task StoreAsync(AgentState state, CancellationToken cancellationToken = default) { - await _context.Store(state).ConfigureAwait(false); + await _context.StoreAsync(state, cancellationToken).ConfigureAwait(false); return; } - public async Task Read(AgentId agentId) where T : IMessage, new() + public async Task ReadAsync(AgentId agentId, CancellationToken cancellationToken = default) where T : IMessage, new() { - var agentstate = await _context.Read(agentId).ConfigureAwait(false); + var agentstate = await _context.ReadAsync(agentId, cancellationToken).ConfigureAwait(false); return agentstate.FromAgentState(); } private void OnResponseCore(RpcResponse response) @@ -137,7 +138,7 @@ public abstract class AgentBase : IAgentBase, IHandle completion.SetResult(response); } - private async Task OnRequestCore(RpcRequest request) + private async Task OnRequestCoreAsync(RpcRequest request, CancellationToken cancellationToken = default) { RpcResponse response; @@ -149,8 +150,7 @@ public abstract class AgentBase : IAgentBase, IHandle { response = new RpcResponse { Error = ex.Message }; } - - await _context.SendResponseAsync(request, response).ConfigureAwait(false); + await _context.SendResponseAsync(request, response, cancellationToken).ConfigureAwait(false); } protected async Task RequestAsync(AgentId target, string method, Dictionary parameters) @@ -174,7 +174,7 @@ public abstract class AgentBase : IAgentBase, IHandle activity?.SetTag("peer.service", target.ToString()); var completion = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - Context.DistributedContextPropagator.Inject(activity, request.Metadata, static (carrier, key, value) => ((IDictionary)carrier!)[key] = value); + _context.Update(activity, request); await this.InvokeWithActivityAsync( static async ((AgentBase Agent, RpcRequest Request, TaskCompletionSource) state) => { @@ -204,13 +204,13 @@ public abstract class AgentBase : IAgentBase, IHandle await PublishEventAsync(evt, token).ConfigureAwait(false); } - public async ValueTask PublishEventAsync(CloudEvent item, CancellationToken token = default) + public async ValueTask PublishEventAsync(CloudEvent item, CancellationToken cancellationToken = default) { var activity = s_source.StartActivity($"PublishEventAsync '{item.Type}'", ActivityKind.Client, Activity.Current?.Context ?? default); activity?.SetTag("peer.service", $"{item.Type}/{item.Source}"); // TODO: fix activity - Context.DistributedContextPropagator.Inject(activity, item.Metadata, static (carrier, key, value) => ((IDictionary)carrier!)[key] = value); + _context.Update(activity, item); await this.InvokeWithActivityAsync( static async ((AgentBase Agent, CloudEvent Event) state) => { @@ -218,7 +218,7 @@ public abstract class AgentBase : IAgentBase, IHandle }, (this, item), activity, - item.Type).ConfigureAwait(false); + item.Type, cancellationToken).ConfigureAwait(false); } public Task CallHandler(CloudEvent item) @@ -251,7 +251,7 @@ public abstract class AgentBase : IAgentBase, IHandle } catch (Exception ex) { - Logger.LogError(ex, $"Error invoking method {nameof(IHandle.Handle)}"); + _logger.LogError(ex, $"Error invoking method {nameof(IHandle.Handle)}"); throw; // TODO: ? } } @@ -262,6 +262,7 @@ public abstract class AgentBase : IAgentBase, IHandle public Task HandleRequest(RpcRequest request) => Task.FromResult(new RpcResponse { Error = "Not implemented" }); + //TODO: should this be async and cancellable? public virtual Task HandleObject(object item) { // get all Handle methods @@ -279,4 +280,8 @@ public abstract class AgentBase : IAgentBase, IHandle // otherwise, complain throw new InvalidOperationException($"No handler found for type {item.GetType().FullName}"); } + public async ValueTask PublishEventAsync(string topic, IMessage evt, CancellationToken cancellationToken = default) + { + await PublishEventAsync(evt.ToCloudEvent(topic), cancellationToken).ConfigureAwait(false); + } } diff --git a/dotnet/src/Microsoft.AutoGen/Agents/AgentBaseExtensions.cs b/dotnet/src/Microsoft.AutoGen/Agents/AgentBaseExtensions.cs index ef62a2b1d4..ce1318a0d3 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/AgentBaseExtensions.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/AgentBaseExtensions.cs @@ -9,17 +9,8 @@ public static class AgentBaseExtensions { public static Activity? ExtractActivity(this AgentBase agent, string activityName, IDictionary metadata) { - Activity? activity = null; - agent.Context.DistributedContextPropagator.ExtractTraceIdAndState(metadata, - static (object? carrier, string fieldName, out string? fieldValue, out IEnumerable? fieldValues) => - { - var metadata = (IDictionary)carrier!; - fieldValues = null; - metadata.TryGetValue(fieldName, out fieldValue); - }, - out var traceParent, - out var traceState); - + Activity? activity; + (var traceParent, var traceState) = agent.Context.GetTraceIDandState(metadata); if (!string.IsNullOrEmpty(traceParent)) { if (ActivityContext.TryParse(traceParent, traceState, isRemote: true, out ActivityContext parentContext)) @@ -40,12 +31,7 @@ public static class AgentBaseExtensions activity.TraceStateString = traceState; } - var baggage = agent.Context.DistributedContextPropagator.ExtractBaggage(metadata, static (object? carrier, string fieldName, out string? fieldValue, out IEnumerable? fieldValues) => - { - var metadata = (IDictionary)carrier!; - fieldValues = null; - metadata.TryGetValue(fieldName, out fieldValue); - }); + var baggage = agent.Context.ExtractMetadata(metadata); if (baggage is not null) { @@ -63,7 +49,7 @@ public static class AgentBaseExtensions return activity; } - public static async Task InvokeWithActivityAsync(this AgentBase agent, Func func, TState state, Activity? activity, string methodName) + public static async Task InvokeWithActivityAsync(this AgentBase agent, Func func, TState state, Activity? activity, string methodName, CancellationToken cancellationToken = default) { if (activity is not null && activity.StartTimeUtc == default) { diff --git a/dotnet/src/Microsoft.AutoGen/Agents/AgentContext.cs b/dotnet/src/Microsoft.AutoGen/Agents/AgentContext.cs deleted file mode 100644 index 7de1e6565d..0000000000 --- a/dotnet/src/Microsoft.AutoGen/Agents/AgentContext.cs +++ /dev/null @@ -1,39 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// AgentContext.cs - -using System.Diagnostics; -using Microsoft.AutoGen.Abstractions; -using Microsoft.Extensions.Logging; - -namespace Microsoft.AutoGen.Agents; - -internal sealed class AgentContext(AgentId agentId, IAgentWorkerRuntime runtime, ILogger logger, DistributedContextPropagator distributedContextPropagator) : IAgentContext -{ - private readonly IAgentWorkerRuntime _runtime = runtime; - - public AgentId AgentId { get; } = agentId; - public ILogger Logger { get; } = logger; - public IAgentBase? AgentInstance { get; set; } - public DistributedContextPropagator DistributedContextPropagator { get; } = distributedContextPropagator; - public async ValueTask SendResponseAsync(RpcRequest request, RpcResponse response, CancellationToken cancellationToken) - { - response.RequestId = request.RequestId; - await _runtime.SendResponse(response, cancellationToken).ConfigureAwait(false); - } - public async ValueTask SendRequestAsync(IAgentBase agent, RpcRequest request, CancellationToken cancellationToken) - { - await _runtime.SendRequest(agent, request, cancellationToken).ConfigureAwait(false); - } - public async ValueTask PublishEventAsync(CloudEvent @event, CancellationToken cancellationToken) - { - await _runtime.PublishEvent(@event, cancellationToken).ConfigureAwait(false); - } - public async ValueTask Store(AgentState value, CancellationToken cancellationToken) - { - await _runtime.Store(value, cancellationToken).ConfigureAwait(false); - } - public ValueTask Read(AgentId agentId, CancellationToken cancellationToken) - { - return _runtime.Read(agentId, cancellationToken); - } -} diff --git a/dotnet/src/Microsoft.AutoGen/Agents/AgentRuntime.cs b/dotnet/src/Microsoft.AutoGen/Agents/AgentRuntime.cs new file mode 100644 index 0000000000..86944cad3a --- /dev/null +++ b/dotnet/src/Microsoft.AutoGen/Agents/AgentRuntime.cs @@ -0,0 +1,72 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// AgentRuntime.cs + +using System.Diagnostics; +using Microsoft.AutoGen.Abstractions; +using Microsoft.Extensions.Logging; + +namespace Microsoft.AutoGen.Agents; + +internal sealed class AgentRuntime(AgentId agentId, IAgentWorker worker, ILogger logger, DistributedContextPropagator distributedContextPropagator) : IAgentRuntime +{ + private readonly IAgentWorker worker = worker; + + public AgentId AgentId { get; } = agentId; + public ILogger Logger { get; } = logger; + public IAgentBase? AgentInstance { get; set; } + private DistributedContextPropagator DistributedContextPropagator { get; } = distributedContextPropagator; + public (string?, string?) GetTraceIDandState(IDictionary metadata) + { + DistributedContextPropagator.ExtractTraceIdAndState(metadata, + static (object? carrier, string fieldName, out string? fieldValue, out IEnumerable? fieldValues) => + { + var metadata = (IDictionary)carrier!; + fieldValues = null; + metadata.TryGetValue(fieldName, out fieldValue); + }, + out var traceParent, + out var traceState); + return (traceParent, traceState); + } + public void Update(Activity? activity, RpcRequest request) + { + DistributedContextPropagator.Inject(activity, request.Metadata, static (carrier, key, value) => ((IDictionary)carrier!)[key] = value); + } + public void Update(Activity? activity, CloudEvent cloudEvent) + { + DistributedContextPropagator.Inject(activity, cloudEvent.Metadata, static (carrier, key, value) => ((IDictionary)carrier!)[key] = value); + } + public async ValueTask SendResponseAsync(RpcRequest request, RpcResponse response, CancellationToken cancellationToken = default) + { + response.RequestId = request.RequestId; + await worker.SendResponseAsync(response, cancellationToken).ConfigureAwait(false); + } + public async ValueTask SendRequestAsync(IAgentBase agent, RpcRequest request, CancellationToken cancellationToken = default) + { + await worker.SendRequestAsync(agent, request, cancellationToken).ConfigureAwait(false); + } + public async ValueTask PublishEventAsync(CloudEvent @event, CancellationToken cancellationToken = default) + { + await worker.PublishEventAsync(@event, cancellationToken).ConfigureAwait(false); + } + public async ValueTask StoreAsync(AgentState value, CancellationToken cancellationToken = default) + { + await worker.StoreAsync(value, cancellationToken).ConfigureAwait(false); + } + public ValueTask ReadAsync(AgentId agentId, CancellationToken cancellationToken = default) + { + return worker.ReadAsync(agentId, cancellationToken); + } + + public IDictionary ExtractMetadata(IDictionary metadata) + { + var baggage = DistributedContextPropagator.ExtractBaggage(metadata, static (object? carrier, string fieldName, out string? fieldValue, out IEnumerable? fieldValues) => + { + var metadata = (IDictionary)carrier!; + fieldValues = null; + metadata.TryGetValue(fieldName, out fieldValue); + }); + + return baggage as IDictionary ?? new Dictionary(); + } +} diff --git a/dotnet/src/Microsoft.AutoGen/Agents/AgentWorker.cs b/dotnet/src/Microsoft.AutoGen/Agents/AgentWorker.cs deleted file mode 100644 index a820656090..0000000000 --- a/dotnet/src/Microsoft.AutoGen/Agents/AgentWorker.cs +++ /dev/null @@ -1,21 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// AgentWorker.cs - -using System.Diagnostics; -using Google.Protobuf; -using Microsoft.AutoGen.Abstractions; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Logging; - -namespace Microsoft.AutoGen.Agents; -public sealed class AgentWorker(IAgentWorkerRuntime runtime, DistributedContextPropagator distributedContextPropagator, - [FromKeyedServices("EventTypes")] EventTypes eventTypes, ILogger logger) - : AgentBase(new AgentContext(new AgentId("client", Guid.NewGuid().ToString()), runtime, logger, distributedContextPropagator), eventTypes) -{ - public async ValueTask PublishEventAsync(CloudEvent evt) => await base.PublishEventAsync(evt); - - public async ValueTask PublishEventAsync(string topic, IMessage evt) - { - await PublishEventAsync(evt.ToCloudEvent(topic)).ConfigureAwait(false); - } -} diff --git a/dotnet/src/Microsoft.AutoGen/Agents/Agents/AIAgent/InferenceAgent.cs b/dotnet/src/Microsoft.AutoGen/Agents/Agents/AIAgent/InferenceAgent.cs index 1568d97696..a0383a3c21 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/Agents/AIAgent/InferenceAgent.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Agents/AIAgent/InferenceAgent.cs @@ -4,12 +4,12 @@ using Google.Protobuf; using Microsoft.AutoGen.Abstractions; using Microsoft.Extensions.AI; -namespace Microsoft.AutoGen.Agents.Client; +namespace Microsoft.AutoGen.Agents; public abstract class InferenceAgent : AgentBase where T : IMessage, new() { protected IChatClient ChatClient { get; } public InferenceAgent( - IAgentContext context, + IAgentRuntime context, EventTypes typeRegistry, IChatClient client ) : base(context, typeRegistry) { diff --git a/dotnet/src/Microsoft.AutoGen/Agents/Agents/AIAgent/SKAiAgent.cs b/dotnet/src/Microsoft.AutoGen/Agents/Agents/AIAgent/SKAiAgent.cs index ceeeadacc5..db0d8241fe 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/Agents/AIAgent/SKAiAgent.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Agents/AIAgent/SKAiAgent.cs @@ -15,7 +15,7 @@ public abstract class SKAiAgent : AgentBase where T : class, new() protected Kernel _kernel; private readonly ISemanticTextMemory _memory; - public SKAiAgent(IAgentContext context, ISemanticTextMemory memory, Kernel kernel, EventTypes typeRegistry) : base(context, typeRegistry) + public SKAiAgent(IAgentRuntime context, ISemanticTextMemory memory, Kernel kernel, EventTypes typeRegistry) : base(context, typeRegistry) { _state = new(); _memory = memory; diff --git a/dotnet/src/Microsoft.AutoGen/Agents/Agents/IOAgent/ConsoleAgent/ConsoleAgent.cs b/dotnet/src/Microsoft.AutoGen/Agents/Agents/IOAgent/ConsoleAgent/ConsoleAgent.cs index d7c32c2479..3d7de5c12d 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/Agents/IOAgent/ConsoleAgent/ConsoleAgent.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Agents/IOAgent/ConsoleAgent/ConsoleAgent.cs @@ -13,7 +13,7 @@ public abstract class ConsoleAgent : IOAgent, { // instead of the primary constructor above, make a constructr here that still calls the base constructor - public ConsoleAgent(IAgentContext context, [FromKeyedServices("EventTypes")] EventTypes typeRegistry) : base(context, typeRegistry) + public ConsoleAgent(IAgentRuntime context, [FromKeyedServices("EventTypes")] EventTypes typeRegistry) : base(context, typeRegistry) { _route = "console"; } diff --git a/dotnet/src/Microsoft.AutoGen/Agents/Agents/IOAgent/FileAgent/FileAgent.cs b/dotnet/src/Microsoft.AutoGen/Agents/Agents/IOAgent/FileAgent/FileAgent.cs index 217c2e56e7..65aecee148 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/Agents/IOAgent/FileAgent/FileAgent.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Agents/IOAgent/FileAgent/FileAgent.cs @@ -9,7 +9,7 @@ namespace Microsoft.AutoGen.Agents; [TopicSubscription("FileIO")] public abstract class FileAgent( - IAgentContext context, + IAgentRuntime context, [FromKeyedServices("EventTypes")] EventTypes typeRegistry, string inputPath = "input.txt", string outputPath = "output.txt" @@ -24,7 +24,7 @@ public abstract class FileAgent( if (!File.Exists(inputPath)) { var errorMessage = $"File not found: {inputPath}"; - Logger.LogError(errorMessage); + _logger.LogError(errorMessage); //publish IOError event var err = new IOError { @@ -42,6 +42,7 @@ public abstract class FileAgent( var evt = new InputProcessed { Route = _route + }; await PublishMessageAsync(evt); } diff --git a/dotnet/src/Microsoft.AutoGen/Agents/Agents/IOAgent/IOAgent.cs b/dotnet/src/Microsoft.AutoGen/Agents/Agents/IOAgent/IOAgent.cs index 6d470b345a..fdd6852405 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/Agents/IOAgent/IOAgent.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Agents/IOAgent/IOAgent.cs @@ -8,7 +8,7 @@ namespace Microsoft.AutoGen.Agents; public abstract class IOAgent : AgentBase { public string _route = "base"; - protected IOAgent(IAgentContext context, EventTypes eventTypes) : base(context, eventTypes) + protected IOAgent(IAgentRuntime context, EventTypes eventTypes) : base(context, eventTypes) { } public virtual async Task Handle(Input item) diff --git a/dotnet/src/Microsoft.AutoGen/Agents/Agents/IOAgent/WebAPIAgent/WebAPIAgent.cs b/dotnet/src/Microsoft.AutoGen/Agents/Agents/IOAgent/WebAPIAgent/WebAPIAgent.cs index 69b8d177be..3a594a3bf7 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/Agents/IOAgent/WebAPIAgent/WebAPIAgent.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Agents/IOAgent/WebAPIAgent/WebAPIAgent.cs @@ -17,7 +17,7 @@ public abstract class WebAPIAgent : IOAgent, private readonly string _url = "/agents/webio"; public WebAPIAgent( - IAgentContext context, + IAgentRuntime context, [FromKeyedServices("EventTypes")] EventTypes typeRegistry, ILogger logger, string url = "/agents/webio") : base( diff --git a/dotnet/src/Microsoft.AutoGen/Agents/App.cs b/dotnet/src/Microsoft.AutoGen/Agents/App.cs index 68c57e8d6d..fc36d33677 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/App.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/App.cs @@ -1,10 +1,8 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // App.cs - using System.Diagnostics.CodeAnalysis; using Google.Protobuf; using Microsoft.AspNetCore.Builder; -using Microsoft.AutoGen.Runtime; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; @@ -21,15 +19,15 @@ public static class AgentsApp if (local) { // start the server runtime - builder.AddLocalAgentService(); + builder.AddLocalAgentService(useGrpc: false); } - builder.AddAgentWorker() + builder.AddAgentWorker(local: local) .AddAgents(agentTypes); builder.AddServiceDefaults(); var app = builder.Build(); if (local) { - app.MapAgentService(); + app.MapAgentService(local: true, useGrpc: false); } app.MapDefaultEndpoints(); Host = app; @@ -47,8 +45,8 @@ public static class AgentsApp { await StartAsync(builder, agents, local); } - var client = Host.Services.GetRequiredService() ?? throw new InvalidOperationException("Host not started"); - await client.PublishEventAsync(topic, message).ConfigureAwait(false); + var client = Host.Services.GetRequiredService() ?? throw new InvalidOperationException("Host not started"); + await client.PublishEventAsync(topic, message, new CancellationToken()).ConfigureAwait(true); return Host; } public static async ValueTask ShutdownAsync() diff --git a/dotnet/src/Microsoft.AutoGen/Agents/Client.cs b/dotnet/src/Microsoft.AutoGen/Agents/Client.cs new file mode 100644 index 0000000000..313685d793 --- /dev/null +++ b/dotnet/src/Microsoft.AutoGen/Agents/Client.cs @@ -0,0 +1,14 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Client.cs + +using System.Diagnostics; +using Microsoft.AutoGen.Abstractions; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; + +namespace Microsoft.AutoGen.Agents; +public sealed class Client(IAgentWorker runtime, DistributedContextPropagator distributedContextPropagator, + [FromKeyedServices("EventTypes")] EventTypes eventTypes, ILogger logger) + : AgentBase(new AgentRuntime(new AgentId("client", Guid.NewGuid().ToString()), runtime, logger, distributedContextPropagator), eventTypes) +{ +} diff --git a/dotnet/src/Microsoft.AutoGen/Agents/Microsoft.AutoGen.Agents.csproj b/dotnet/src/Microsoft.AutoGen/Agents/Microsoft.AutoGen.Agents.csproj index 8e2c457766..3bc2b3acb0 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/Microsoft.AutoGen.Agents.csproj +++ b/dotnet/src/Microsoft.AutoGen/Agents/Microsoft.AutoGen.Agents.csproj @@ -4,22 +4,44 @@ net8.0 enable enable - AutoGen.Worker.Client + Microsoft.AutoGen.Agents https://github.com/microsoft/autogen Microsoft - AutoGen Worker Client Library + Micrososft AutoGen Agents SDK ai-agents;event-driven-agents - + + + runtime; build; native; contentfiles; analyzers; buildtransitive + all + + + + + + + + + + + + + + + + + true + true + diff --git a/dotnet/src/Microsoft.AutoGen/Agents/Services/AgentWorker.cs b/dotnet/src/Microsoft.AutoGen/Agents/Services/AgentWorker.cs new file mode 100644 index 0000000000..4900514903 --- /dev/null +++ b/dotnet/src/Microsoft.AutoGen/Agents/Services/AgentWorker.cs @@ -0,0 +1,181 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// AgentWorker.cs + +using System.Collections.Concurrent; +using System.Diagnostics; +using System.Threading.Channels; +using Microsoft.AutoGen.Abstractions; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; + +namespace Microsoft.AutoGen.Agents; + +public class AgentWorker : + IHostedService, + IAgentWorker +{ + private readonly ConcurrentDictionary _agentTypes = new(); + private readonly ConcurrentDictionary<(string Type, string Key), IAgentBase> _agents = new(); + private readonly ILogger _logger; + private readonly Channel _mailbox = Channel.CreateUnbounded(); + private readonly ConcurrentDictionary _agentStates = new(); + private readonly ConcurrentDictionary _pendingClientRequests = new(); + private readonly CancellationTokenSource _shutdownCts; + private readonly IServiceProvider _serviceProvider; + private readonly IEnumerable> _configuredAgentTypes; + private readonly DistributedContextPropagator _distributedContextPropagator; + private readonly CancellationTokenSource _shutdownCancellationToken = new(); + private Task? _mailboxTask; + private readonly object _channelLock = new(); + + public AgentWorker( + IHostApplicationLifetime hostApplicationLifetime, + IServiceProvider serviceProvider, + [FromKeyedServices("AgentTypes")] IEnumerable> configuredAgentTypes, + ILogger logger, + DistributedContextPropagator distributedContextPropagator) + { + _logger = logger; + _serviceProvider = serviceProvider; + _configuredAgentTypes = configuredAgentTypes; + _distributedContextPropagator = distributedContextPropagator; + _shutdownCts = CancellationTokenSource.CreateLinkedTokenSource(hostApplicationLifetime.ApplicationStopping); + } + // this is the in-memory version - we just pass the message directly to the agent(s) that handle this type of event + public async ValueTask PublishEventAsync(CloudEvent cloudEvent, CancellationToken cancellationToken = default) + { + foreach (var (typeName, _) in _agentTypes) + { + if (typeName == "Client") { continue; } + var agent = GetOrActivateAgent(new AgentId(typeName, cloudEvent.Source)); + agent.ReceiveMessage(new Message { CloudEvent = cloudEvent }); + } + } + public async ValueTask SendRequestAsync(IAgentBase agent, RpcRequest request, CancellationToken cancellationToken = default) + { + var requestId = Guid.NewGuid().ToString(); + _pendingClientRequests[requestId] = (agent, request.RequestId); + request.RequestId = requestId; + await _mailbox.Writer.WriteAsync(request, cancellationToken).ConfigureAwait(false); + } + public ValueTask SendResponseAsync(RpcResponse response, CancellationToken cancellationToken = default) + { + return _mailbox.Writer.WriteAsync(new Message { Response = response }, cancellationToken); + } + public ValueTask StoreAsync(AgentState value, CancellationToken cancellationToken = default) + { + var agentId = value.AgentId ?? throw new InvalidOperationException("AgentId is required when saving AgentState."); + // add or update _agentStates with the new state + var response = _agentStates.AddOrUpdate(agentId.ToString(), value, (key, oldValue) => value); + return ValueTask.CompletedTask; + } + public ValueTask ReadAsync(AgentId agentId, CancellationToken cancellationToken = default) + { + _agentStates.TryGetValue(agentId.ToString(), out var state); + if (state is not null && state.AgentId is not null) + { + return new ValueTask(state); + } + else + { + throw new KeyNotFoundException($"Failed to read AgentState for {agentId}."); + } + } + public async Task RunMessagePump() + { + await Task.CompletedTask.ConfigureAwait(ConfigureAwaitOptions.ForceYielding); + await foreach (var message in _mailbox.Reader.ReadAllAsync()) + { + try + { + if (message == null) { continue; } + switch (message) + { + case Message msg: + + var item = msg.CloudEvent; + + foreach (var (typeName, _) in _agentTypes) + { + var agentToInvoke = GetOrActivateAgent(new AgentId(typeName, item.Source)); + agentToInvoke.ReceiveMessage(msg); + } + break; + default: + throw new InvalidOperationException($"Unexpected message '{message}'."); + } + } + catch (OperationCanceledException) + { + } + finally + { + _shutdownCancellationToken.Cancel(); + } + } + } + + public async Task StartAsync(CancellationToken cancellationToken) + { + StartCore(); + + foreach (var (typeName, type) in _configuredAgentTypes) + { + _agentTypes.TryAdd(typeName, type); + } + void StartCore() + { + var didSuppress = false; + if (!ExecutionContext.IsFlowSuppressed()) + { + didSuppress = true; + ExecutionContext.SuppressFlow(); + } + + try + { + _mailboxTask = Task.Run(RunMessagePump, CancellationToken.None); + } + finally + { + if (didSuppress) + { + ExecutionContext.RestoreFlow(); + } + } + } + } + public async Task StopAsync(CancellationToken cancellationToken) + { + _shutdownCts.Cancel(); + + _mailbox.Writer.TryComplete(); + + if (_mailboxTask is { } readTask) + { + await readTask.ConfigureAwait(false); + } + lock (_channelLock) + { + } + } + private IAgentBase GetOrActivateAgent(AgentId agentId) + { + if (!_agents.TryGetValue((agentId.Type, agentId.Key), out var agent)) + { + if (_agentTypes.TryGetValue(agentId.Type, out var agentType)) + { + var context = new AgentRuntime(agentId, this, _serviceProvider.GetRequiredService>(), _distributedContextPropagator); + agent = (AgentBase)ActivatorUtilities.CreateInstance(_serviceProvider, agentType, context); + _agents.TryAdd((agentId.Type, agentId.Key), agent); + } + else + { + throw new InvalidOperationException($"Agent type '{agentId.Type}' is unknown."); + } + } + + return agent; + } +} diff --git a/dotnet/src/Microsoft.AutoGen/Runtime/AgentWorkerHostingExtensions.cs b/dotnet/src/Microsoft.AutoGen/Agents/Services/AgentWorkerHostingExtensions.cs similarity index 64% rename from dotnet/src/Microsoft.AutoGen/Runtime/AgentWorkerHostingExtensions.cs rename to dotnet/src/Microsoft.AutoGen/Agents/Services/AgentWorkerHostingExtensions.cs index db582e051a..8215f20327 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime/AgentWorkerHostingExtensions.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Services/AgentWorkerHostingExtensions.cs @@ -9,11 +9,11 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; using Microsoft.Extensions.Hosting; -namespace Microsoft.AutoGen.Runtime; +namespace Microsoft.AutoGen.Agents; public static class AgentWorkerHostingExtensions { - public static WebApplicationBuilder AddAgentService(this WebApplicationBuilder builder, bool local = false) + public static WebApplicationBuilder AddAgentService(this WebApplicationBuilder builder, bool local = false, bool useGrpc = true) { if (local) { @@ -26,24 +26,32 @@ public static class AgentWorkerHostingExtensions listenOptions.UseHttps(); }); }); + builder.AddOrleans(local); } - builder.Services.AddGrpc(); - builder.AddOrleans(local); + else + { + builder.AddOrleans(); + } + builder.Services.TryAddSingleton(DistributedContextPropagator.Current); - builder.Services.AddSingleton(); - builder.Services.AddSingleton(sp => sp.GetRequiredService()); + + if (useGrpc) + { + builder.Services.AddGrpc(); + builder.Services.AddSingleton(); + builder.Services.AddSingleton(sp => (IHostedService)sp.GetRequiredService()); + } return builder; } - public static WebApplicationBuilder AddLocalAgentService(this WebApplicationBuilder builder) + public static WebApplicationBuilder AddLocalAgentService(this WebApplicationBuilder builder, bool useGrpc = true) { - builder.AddAgentService(local: true); - return builder; + return builder.AddAgentService(local: true, useGrpc); } - public static WebApplication MapAgentService(this WebApplication app) + public static WebApplication MapAgentService(this WebApplication app, bool local = false, bool useGrpc = true) { - app.MapGrpcService(); + if (useGrpc) { app.MapGrpcService(); } return app; } } diff --git a/dotnet/src/Microsoft.AutoGen/Agents/GrpcAgentWorkerRuntime.cs b/dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcAgentWorker.cs similarity index 76% rename from dotnet/src/Microsoft.AutoGen/Agents/GrpcAgentWorkerRuntime.cs rename to dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcAgentWorker.cs index b0550c1fb7..431a5629c1 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/GrpcAgentWorkerRuntime.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcAgentWorker.cs @@ -1,5 +1,5 @@ // Copyright (c) Microsoft Corporation. All rights reserved. -// GrpcAgentWorkerRuntime.cs +// GrpcAgentWorker.cs using System.Collections.Concurrent; using System.Diagnostics; @@ -13,7 +13,15 @@ using Microsoft.Extensions.Logging; namespace Microsoft.AutoGen.Agents; -public sealed class GrpcAgentWorkerRuntime : IHostedService, IDisposable, IAgentWorkerRuntime +public sealed class GrpcAgentWorker( + AgentRpc.AgentRpcClient client, + IHostApplicationLifetime hostApplicationLifetime, + IServiceProvider serviceProvider, + [FromKeyedServices("AgentTypes")] IEnumerable> configuredAgentTypes, + ILogger logger, + DistributedContextPropagator distributedContextPropagator) : + AgentWorker(hostApplicationLifetime, + serviceProvider, configuredAgentTypes, logger, distributedContextPropagator), IHostedService, IDisposable, IAgentWorker { private readonly object _channelLock = new(); private readonly ConcurrentDictionary _agentTypes = new(); @@ -26,38 +34,21 @@ public sealed class GrpcAgentWorkerRuntime : IHostedService, IDisposable, IAgent SingleWriter = false, FullMode = BoundedChannelFullMode.Wait }); - private readonly AgentRpc.AgentRpcClient _client; - private readonly IServiceProvider _serviceProvider; - private readonly IEnumerable> _configuredAgentTypes; - private readonly ILogger _logger; - private readonly DistributedContextPropagator _distributedContextPropagator; - private readonly CancellationTokenSource _shutdownCts; + private readonly AgentRpc.AgentRpcClient _client = client; + private readonly IServiceProvider _serviceProvider = serviceProvider; + private readonly IEnumerable> _configuredAgentTypes = configuredAgentTypes; + private readonly ILogger _logger = logger; + private readonly DistributedContextPropagator _distributedContextPropagator = distributedContextPropagator; + private readonly CancellationTokenSource _shutdownCts = CancellationTokenSource.CreateLinkedTokenSource(hostApplicationLifetime.ApplicationStopping); private AsyncDuplexStreamingCall? _channel; private Task? _readTask; private Task? _writeTask; - public GrpcAgentWorkerRuntime( - AgentRpc.AgentRpcClient client, - IHostApplicationLifetime hostApplicationLifetime, - IServiceProvider serviceProvider, - [FromKeyedServices("AgentTypes")] IEnumerable> configuredAgentTypes, - ILogger logger, - DistributedContextPropagator distributedContextPropagator) - { - _client = client; - _serviceProvider = serviceProvider; - _configuredAgentTypes = configuredAgentTypes; - _logger = logger; - _distributedContextPropagator = distributedContextPropagator; - _shutdownCts = CancellationTokenSource.CreateLinkedTokenSource(hostApplicationLifetime.ApplicationStopping); - } - public void Dispose() { _outboundMessagesChannel.Writer.TryComplete(); _channel?.Dispose(); } - private async Task RunReadPump() { var channel = GetChannel(); @@ -131,7 +122,6 @@ public sealed class GrpcAgentWorkerRuntime : IHostedService, IDisposable, IAgent } } } - private async Task RunWritePump() { var channel = GetChannel(); @@ -183,14 +173,13 @@ public sealed class GrpcAgentWorkerRuntime : IHostedService, IDisposable, IAgent item.WriteCompletionSource.TrySetCanceled(); } } - private IAgentBase GetOrActivateAgent(AgentId agentId) { if (!_agents.TryGetValue((agentId.Type, agentId.Key), out var agent)) { if (_agentTypes.TryGetValue(agentId.Type, out var agentType)) { - var context = new AgentContext(agentId, this, _serviceProvider.GetRequiredService>(), _distributedContextPropagator); + var context = new AgentRuntime(agentId, this, _serviceProvider.GetRequiredService>(), _distributedContextPropagator); agent = (AgentBase)ActivatorUtilities.CreateInstance(_serviceProvider, agentType, context); _agents.TryAdd((agentId.Type, agentId.Key), agent); } @@ -203,7 +192,7 @@ public sealed class GrpcAgentWorkerRuntime : IHostedService, IDisposable, IAgent return agent; } - private async ValueTask RegisterAgentType(string type, Type agentType) + private async ValueTask RegisterAgentTypeAsync(string type, Type agentType, CancellationToken cancellationToken = default) { if (_agentTypes.TryAdd(type, agentType)) { @@ -223,55 +212,33 @@ public sealed class GrpcAgentWorkerRuntime : IHostedService, IDisposable, IAgent //StateType = state?.Name, //Events = { events } } - }, - _shutdownCts.Token).ConfigureAwait(false); + }, cancellationToken).ConfigureAwait(false); } } - public async ValueTask SendResponse(RpcResponse response, CancellationToken cancellationToken) + // new is intentional + public new async ValueTask SendResponseAsync(RpcResponse response, CancellationToken cancellationToken = default) { - _logger.LogInformation("Sending response '{Response}'.", response); await WriteChannelAsync(new Message { Response = response }, cancellationToken).ConfigureAwait(false); } - - public async ValueTask SendRequest(IAgentBase agent, RpcRequest request, CancellationToken cancellationToken) + // new is intentional + public new async ValueTask SendRequestAsync(IAgentBase agent, RpcRequest request, CancellationToken cancellationToken = default) { - _logger.LogInformation("[{AgentId}] Sending request '{Request}'.", agent.AgentId, request); var requestId = Guid.NewGuid().ToString(); _pendingRequests[requestId] = (agent, request.RequestId); request.RequestId = requestId; - try - { - await WriteChannelAsync(new Message { Request = request }, cancellationToken).ConfigureAwait(false); - } - catch (Exception exception) - { - if (_pendingRequests.TryRemove(requestId, out _)) - { - agent.ReceiveMessage(new Message { Response = new RpcResponse { RequestId = request.RequestId, Error = exception.Message } }); - } - } + await WriteChannelAsync(new Message { Request = request }, cancellationToken).ConfigureAwait(false); } - - public async ValueTask PublishEvent(CloudEvent @event, CancellationToken cancellationToken) + // new is intentional + public new async ValueTask PublishEventAsync(CloudEvent @event, CancellationToken cancellationToken = default) { - try - { - await WriteChannelAsync(new Message { CloudEvent = @event }, cancellationToken).ConfigureAwait(false); - } - catch (Exception exception) - { - _logger.LogWarning(exception, "Failed to publish event '{Event}'.", @event); - } + await WriteChannelAsync(new Message { CloudEvent = @event }, cancellationToken).ConfigureAwait(false); } - - private async Task WriteChannelAsync(Message message, CancellationToken cancellationToken) + private async Task WriteChannelAsync(Message message, CancellationToken cancellationToken = default) { - var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var tcs = new TaskCompletionSource(); await _outboundMessagesChannel.Writer.WriteAsync((message, tcs), cancellationToken).ConfigureAwait(false); - await tcs.Task.WaitAsync(cancellationToken); } - private AsyncDuplexStreamingCall GetChannel() { if (_channel is { } channel) @@ -307,7 +274,7 @@ public sealed class GrpcAgentWorkerRuntime : IHostedService, IDisposable, IAgent return _channel; } - public async Task StartAsync(CancellationToken cancellationToken) + public new async Task StartAsync(CancellationToken cancellationToken) { _channel = GetChannel(); StartCore(); @@ -315,10 +282,10 @@ public sealed class GrpcAgentWorkerRuntime : IHostedService, IDisposable, IAgent var tasks = new List(_agentTypes.Count); foreach (var (typeName, type) in _configuredAgentTypes) { - tasks.Add(RegisterAgentType(typeName, type).AsTask()); + tasks.Add(RegisterAgentTypeAsync(typeName, type, cancellationToken).AsTask()); } - await Task.WhenAll(tasks).ConfigureAwait(false); + await Task.WhenAll(tasks).ConfigureAwait(true); void StartCore() { @@ -344,7 +311,7 @@ public sealed class GrpcAgentWorkerRuntime : IHostedService, IDisposable, IAgent } } - public async Task StopAsync(CancellationToken cancellationToken) + public new async Task StopAsync(CancellationToken cancellationToken) { _shutdownCts.Cancel(); @@ -364,19 +331,20 @@ public sealed class GrpcAgentWorkerRuntime : IHostedService, IDisposable, IAgent _channel?.Dispose(); } } - public ValueTask Store(AgentState value, CancellationToken cancellationToken) + // new intentional + public new async ValueTask StoreAsync(AgentState value, CancellationToken cancellationToken = default) { var agentId = value.AgentId ?? throw new InvalidOperationException("AgentId is required when saving AgentState."); - var response = _client.SaveState(value, cancellationToken: cancellationToken); + var response = _client.SaveState(value, null, null, cancellationToken); if (!response.Success) { throw new InvalidOperationException($"Error saving AgentState for AgentId {agentId}."); } - return ValueTask.CompletedTask; } - public async ValueTask Read(AgentId agentId, CancellationToken cancellationToken) + // new intentional + public new async ValueTask ReadAsync(AgentId agentId, CancellationToken cancellationToken = default) { - var response = await _client.GetStateAsync(agentId, cancellationToken: cancellationToken); + var response = await _client.GetStateAsync(agentId).ConfigureAwait(true); // if (response.Success && response.AgentState.AgentId is not null) - why is success always false? if (response.AgentState.AgentId is not null) { diff --git a/dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcAgentWorkerHostBuilderExtension.cs b/dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcAgentWorkerHostBuilderExtension.cs new file mode 100644 index 0000000000..670411b336 --- /dev/null +++ b/dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcAgentWorkerHostBuilderExtension.cs @@ -0,0 +1,51 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// GrpcAgentWorkerHostBuilderExtension.cs + +using Grpc.Core; +using Grpc.Net.Client.Configuration; +using Microsoft.AutoGen.Abstractions; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; + +namespace Microsoft.AutoGen.Agents; + +public static class GrpcAgentWorkerHostBuilderExtensions +{ + private const string _defaultAgentServiceAddress = "https://localhost:5001"; + public static IHostApplicationBuilder AddGrpcAgentWorker(this IHostApplicationBuilder builder, string agentServiceAddress = _defaultAgentServiceAddress) + { + builder.Services.AddGrpcClient(options => + { + options.Address = new Uri(agentServiceAddress); + options.ChannelOptionsActions.Add(channelOptions => + { + + channelOptions.HttpHandler = new SocketsHttpHandler + { + EnableMultipleHttp2Connections = true, + KeepAlivePingDelay = TimeSpan.FromSeconds(20), + KeepAlivePingTimeout = TimeSpan.FromSeconds(10), + KeepAlivePingPolicy = HttpKeepAlivePingPolicy.WithActiveRequests + }; + + var methodConfig = new MethodConfig + { + Names = { MethodName.Default }, + RetryPolicy = new RetryPolicy + { + MaxAttempts = 5, + InitialBackoff = TimeSpan.FromSeconds(1), + MaxBackoff = TimeSpan.FromSeconds(5), + BackoffMultiplier = 1.5, + RetryableStatusCodes = { StatusCode.Unavailable } + } + }; + + channelOptions.ServiceConfig = new() { MethodConfigs = { methodConfig } }; + channelOptions.ThrowOperationCanceledOnCancellation = true; + }); + }); + builder.Services.AddSingleton(); + return builder; + } +} diff --git a/dotnet/src/Microsoft.AutoGen/Runtime/WorkerGateway.cs b/dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcGateway.cs similarity index 56% rename from dotnet/src/Microsoft.AutoGen/Runtime/WorkerGateway.cs rename to dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcGateway.cs index cca25d36a8..89e9c55c46 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime/WorkerGateway.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcGateway.cs @@ -1,5 +1,5 @@ // Copyright (c) Microsoft Corporation. All rights reserved. -// WorkerGateway.cs +// GrpcGateway.cs using System.Collections.Concurrent; using Grpc.Core; @@ -7,91 +7,60 @@ using Microsoft.AutoGen.Abstractions; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; -namespace Microsoft.AutoGen.Runtime; +namespace Microsoft.AutoGen.Agents; -internal sealed class WorkerGateway : BackgroundService, IWorkerGateway +public sealed class GrpcGateway : BackgroundService, IGateway { private static readonly TimeSpan s_agentResponseTimeout = TimeSpan.FromSeconds(30); - - private readonly ILogger _logger; + private readonly ILogger _logger; private readonly IClusterClient _clusterClient; - private readonly IAgentWorkerRegistryGrain _gatewayRegistry; - private readonly IWorkerGateway _reference; - - // The local mapping of agents to worker processes. - private readonly ConcurrentDictionary _workers = new(); - + private readonly ConcurrentDictionary _agentState = new(); + private readonly IRegistryGrain _gatewayRegistry; + private readonly IGateway _reference; // The agents supported by each worker process. - private readonly ConcurrentDictionary> _supportedAgentTypes = []; + private readonly ConcurrentDictionary> _supportedAgentTypes = []; + public readonly ConcurrentDictionary _workers = new(); // The mapping from agent id to worker process. - private readonly ConcurrentDictionary<(string Type, string Key), WorkerProcessConnection> _agentDirectory = new(); - + private readonly ConcurrentDictionary<(string Type, string Key), GrpcWorkerConnection> _agentDirectory = new(); // RPC - private readonly ConcurrentDictionary<(WorkerProcessConnection, string), TaskCompletionSource> _pendingRequests = new(); + private readonly ConcurrentDictionary<(GrpcWorkerConnection, string), TaskCompletionSource> _pendingRequests = new(); + // InMemory Message Queue - public WorkerGateway(IClusterClient clusterClient, ILogger logger) + public GrpcGateway(IClusterClient clusterClient, ILogger logger) { _logger = logger; _clusterClient = clusterClient; - _reference = clusterClient.CreateObjectReference(this); - _gatewayRegistry = clusterClient.GetGrain(0); + _reference = clusterClient.CreateObjectReference(this); + _gatewayRegistry = clusterClient.GetGrain(0); } - public async ValueTask BroadcastEvent(CloudEvent evt) { // TODO: filter the workers that receive the event var tasks = new List(_workers.Count); - foreach (var (_, connection) in _workers) + foreach (var (_, connection) in _supportedAgentTypes) { - tasks.Add(connection.SendMessage(new Message { CloudEvent = evt })); + + tasks.Add(this.SendMessageAsync((IConnection)connection[0], evt, default)); } - - await Task.WhenAll(tasks); + await Task.WhenAll(tasks).ConfigureAwait(false); } - - public async ValueTask InvokeRequest(RpcRequest request) + //intetionally not static so can be called by some methods implemented in base class + public async Task SendMessageAsync(IConnection connection, CloudEvent cloudEvent, CancellationToken cancellationToken = default) { - (string Type, string Key) agentId = (request.Target.Type, request.Target.Key); - if (!_agentDirectory.TryGetValue(agentId, out var connection) || connection.Completion.IsCompleted) - { - // Activate the agent on a compatible worker process. - if (_supportedAgentTypes.TryGetValue(request.Target.Type, out var workers)) - { - connection = workers[Random.Shared.Next(workers.Count)]; - _agentDirectory[agentId] = connection; - } - else - { - return new(new RpcResponse { Error = "Agent not found." }); - } - } - - // Proxy the request to the agent. - var originalRequestId = request.RequestId; - var newRequestId = Guid.NewGuid().ToString(); - var completion = _pendingRequests[(connection, newRequestId)] = new(TaskCreationOptions.RunContinuationsAsynchronously); - request.RequestId = newRequestId; - await connection.ResponseStream.WriteAsync(new Message { Request = request }); - - // Wait for the response and send it back to the caller. - var response = await completion.Task.WaitAsync(s_agentResponseTimeout); - response.RequestId = originalRequestId; - return response; + var queue = (GrpcWorkerConnection)connection; + await queue.ResponseStream.WriteAsync(new Message { CloudEvent = cloudEvent }, cancellationToken).ConfigureAwait(false); } - - private void DispatchResponse(WorkerProcessConnection connection, RpcResponse response) + private void DispatchResponse(GrpcWorkerConnection connection, RpcResponse response) { if (!_pendingRequests.TryRemove((connection, response.RequestId), out var completion)) { _logger.LogWarning("Received response for unknown request."); return; } - // Complete the request. completion.SetResult(response); } - protected override async Task ExecuteAsync(CancellationToken stoppingToken) { while (!stoppingToken.IsCancellationRequested) @@ -104,10 +73,8 @@ internal sealed class WorkerGateway : BackgroundService, IWorkerGateway { _logger.LogWarning(exception, "Error adding worker to registry."); } - await Task.Delay(TimeSpan.FromSeconds(15), stoppingToken); } - try { await _gatewayRegistry.RemoveWorker(_reference); @@ -117,8 +84,8 @@ internal sealed class WorkerGateway : BackgroundService, IWorkerGateway _logger.LogWarning(exception, "Error removing worker from registry."); } } - - internal async Task OnReceivedMessageAsync(WorkerProcessConnection connection, Message message) + //new is intentional... + internal async Task OnReceivedMessageAsync(GrpcWorkerConnection connection, Message message) { _logger.LogInformation("Received message {Message} from connection {Connection}.", message, connection); switch (message.MessageCase) @@ -139,63 +106,28 @@ internal sealed class WorkerGateway : BackgroundService, IWorkerGateway throw new InvalidOperationException($"Unknown message type for message '{message}'."); }; } - - private async ValueTask RegisterAgentTypeAsync(WorkerProcessConnection connection, RegisterAgentTypeRequest msg) + private async ValueTask RegisterAgentTypeAsync(GrpcWorkerConnection connection, RegisterAgentTypeRequest msg) { connection.AddSupportedType(msg.Type); _supportedAgentTypes.GetOrAdd(msg.Type, _ => []).Add(connection); - var success = false; - var error = String.Empty; - try - { - await _gatewayRegistry.RegisterAgentType(msg.Type, _reference); - success = true; - } - catch (InvalidOperationException exception) - { - error = $"Error registering agent type '{msg.Type}'."; - _logger.LogWarning(exception, error); - } - var request_id = msg.RequestId; - var response = new RegisterAgentTypeResponse { RequestId = request_id, Success = success, Error = error }; - await connection.SendMessage(new Message { RegisterAgentTypeResponse = response }); + await _gatewayRegistry.RegisterAgentType(msg.Type, _reference); } - private async ValueTask DispatchEventAsync(CloudEvent evt) { - await BroadcastEvent(evt); + await BroadcastEvent(evt).ConfigureAwait(false); /* var topic = _clusterClient.GetStreamProvider("agents").GetStream(StreamId.Create(evt.Namespace, evt.Type)); await topic.OnNextAsync(evt.ToEvent()); */ } - - private async ValueTask DispatchRequestAsync(WorkerProcessConnection connection, RpcRequest request) + private async ValueTask DispatchRequestAsync(GrpcWorkerConnection connection, RpcRequest request) { var requestId = request.RequestId; if (request.Target is null) { throw new InvalidOperationException($"Request message is missing a target. Message: '{request}'."); } - - /* - if (string.Equals("runtime", request.Target.Type, StringComparison.Ordinal)) - { - if (string.Equals("subscribe", request.Method)) - { - await InvokeRequestDelegate(connection, request, async (_) => - { - await SubscribeToTopic(connection, request); - return new RpcResponse { Result = "Ok" }; - }); - - return; - } - } - else - { - */ await InvokeRequestDelegate(connection, request, async request => { var (gateway, isPlacement) = await _gatewayRegistry.GetOrPlaceAgent(request.Target); @@ -203,84 +135,51 @@ internal sealed class WorkerGateway : BackgroundService, IWorkerGateway { return new RpcResponse { Error = "Agent not found and no compatible gateways were found." }; } - if (isPlacement) { - // Activate the worker: load state - // TODO + // TODO// Activate the worker: load state } - // Forward the message to the gateway and return the result. - return await gateway.InvokeRequest(request); + return await gateway.InvokeRequest(request).ConfigureAwait(true); }); //} } - private static async Task InvokeRequestDelegate(WorkerProcessConnection connection, RpcRequest request, Func> func) + private static async Task InvokeRequestDelegate(GrpcWorkerConnection connection, RpcRequest request, Func> func) { try { var response = await func(request); response.RequestId = request.RequestId; - await connection.ResponseStream.WriteAsync(new Message { Response = response }); + await connection.ResponseStream.WriteAsync(new Message { Response = response }).ConfigureAwait(false); } catch (Exception ex) { - await connection.ResponseStream.WriteAsync(new Message { Response = new RpcResponse { RequestId = request.RequestId, Error = ex.Message } }); + await connection.ResponseStream.WriteAsync(new Message { Response = new RpcResponse { RequestId = request.RequestId, Error = ex.Message } }).ConfigureAwait(false); } } - public async ValueTask Store(AgentState value) - { - var agentId = value.AgentId ?? throw new ArgumentNullException(nameof(value.AgentId)); - var agentState = _clusterClient.GetGrain($"{agentId.Type}:{agentId.Key}"); - await agentState.WriteStateAsync(value, value.ETag); - } - - public async ValueTask Read(AgentId agentId) - { - var agentState = _clusterClient.GetGrain($"{agentId.Type}:{agentId.Key}"); - return await agentState.ReadStateAsync(); - } - /* - private async ValueTask SubscribeToTopic(WorkerProcessConnection connection, RpcRequest request) - { - // Subscribe to a topic - var parameters = JsonSerializer.Deserialize>(request.Data) - ?? throw new ArgumentException($"Request data does not contain required payload format: {{\"namespace\": \"string\", \"type\": \"string\"}}."); - var ns = parameters["namespace"]; - var type = parameters["type"]; - var topic = _clusterClient.GetStreamProvider("agents").GetStream(StreamId.Create(ns: type, key: ns)); - await topic.SubscribeAsync(OnNextAsync); - return; - - async Task OnNextAsync(IList> items) - { - foreach (var item in items) - { - var evt = item.Item.ToRpcEvent(); - evt.Namespace = ns; - evt.Type = evt.Type; - var payload = new Dictionary - { - ["sequenceId"] = item.Token.SequenceNumber.ToString(), - ["eventIdx"] = item.Token.EventIndex.ToString() - }; - evt.Data = JsonSerializer.Serialize(payload); - await connection.ResponseStream.WriteAsync(new Message { Event = evt }); - } - } - } - */ - internal Task ConnectToWorkerProcess(IAsyncStreamReader requestStream, IServerStreamWriter responseStream, ServerCallContext context) { _logger.LogInformation("Received new connection from {Peer}.", context.Peer); - var workerProcess = new WorkerProcessConnection(this, requestStream, responseStream, context); + var workerProcess = new GrpcWorkerConnection(this, requestStream, responseStream, context); _workers[workerProcess] = workerProcess; return workerProcess.Completion; } + public async ValueTask StoreAsync(AgentState value) + { + var agentId = value.AgentId ?? throw new ArgumentNullException(nameof(value.AgentId)); + _agentState[agentId.Key] = value; + } - internal void OnRemoveWorkerProcess(WorkerProcessConnection workerProcess) + public async ValueTask ReadAsync(AgentId agentId) + { + if (_agentState.TryGetValue(agentId.Key, out var state)) + { + return state; + } + return new AgentState { AgentId = agentId }; + } + internal void OnRemoveWorkerProcess(GrpcWorkerConnection workerProcess) { _workers.TryRemove(workerProcess, out _); var types = workerProcess.GetSupportedTypes(); @@ -291,14 +190,50 @@ internal sealed class WorkerGateway : BackgroundService, IWorkerGateway supported.Remove(workerProcess); } } - // Any agents activated on that worker are also gone. foreach (var pair in _agentDirectory) { if (pair.Value == workerProcess) { - ((IDictionary<(string Type, string Key), WorkerProcessConnection>)_agentDirectory).Remove(pair); + ((IDictionary<(string Type, string Key), GrpcWorkerConnection>)_agentDirectory).Remove(pair); } } } + public async ValueTask InvokeRequest(RpcRequest request, CancellationToken cancellationToken = default) + { + (string Type, string Key) agentId = (request.Target.Type, request.Target.Key); + if (!_agentDirectory.TryGetValue(agentId, out var connection) || connection.Completion.IsCompleted) + { + // Activate the agent on a compatible worker process. + if (_supportedAgentTypes.TryGetValue(request.Target.Type, out var workers)) + { + connection = workers[Random.Shared.Next(workers.Count)]; + _agentDirectory[agentId] = connection; + } + else + { + return new(new RpcResponse { Error = "Agent not found." }); + } + } + // Proxy the request to the agent. + var originalRequestId = request.RequestId; + var newRequestId = Guid.NewGuid().ToString(); + var completion = _pendingRequests[(connection, newRequestId)] = new(TaskCreationOptions.RunContinuationsAsynchronously); + request.RequestId = newRequestId; + await connection.ResponseStream.WriteAsync(new Message { Request = request }, cancellationToken).ConfigureAwait(false); + // Wait for the response and send it back to the caller. + var response = await completion.Task.WaitAsync(s_agentResponseTimeout); + response.RequestId = originalRequestId; + return response; + } + + async ValueTask IGateway.InvokeRequest(RpcRequest request) + { + return await this.InvokeRequest(request).ConfigureAwait(false); + } + + Task IGateway.SendMessageAsync(IConnection connection, CloudEvent cloudEvent) + { + return this.SendMessageAsync(connection, cloudEvent); + } } diff --git a/dotnet/src/Microsoft.AutoGen/Runtime/WorkerGatewayService.cs b/dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcGatewayService.cs similarity index 67% rename from dotnet/src/Microsoft.AutoGen/Runtime/WorkerGatewayService.cs rename to dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcGatewayService.cs index b5c4527854..e26f5c2bc9 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime/WorkerGatewayService.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcGatewayService.cs @@ -1,19 +1,24 @@ // Copyright (c) Microsoft Corporation. All rights reserved. -// WorkerGatewayService.cs +// GrpcGatewayService.cs using Grpc.Core; using Microsoft.AutoGen.Abstractions; -namespace Microsoft.AutoGen.Runtime; +namespace Microsoft.AutoGen.Agents; // gRPC service which handles communication between the agent worker and the cluster. -internal sealed class WorkerGatewayService(WorkerGateway agentWorker) : AgentRpc.AgentRpcBase +internal sealed class GrpcGatewayService : AgentRpc.AgentRpcBase { + private readonly GrpcGateway Gateway; + public GrpcGatewayService(GrpcGateway gateway) + { + Gateway = (GrpcGateway)gateway; + } public override async Task OpenChannel(IAsyncStreamReader requestStream, IServerStreamWriter responseStream, ServerCallContext context) { try { - await agentWorker.ConnectToWorkerProcess(requestStream, responseStream, context).ConfigureAwait(true); + await Gateway.ConnectToWorkerProcess(requestStream, responseStream, context).ConfigureAwait(true); } catch { @@ -26,13 +31,13 @@ internal sealed class WorkerGatewayService(WorkerGateway agentWorker) : AgentRpc } public override async Task GetState(AgentId request, ServerCallContext context) { - var state = await agentWorker.Read(request); + var state = await Gateway.ReadAsync(request); return new GetStateResponse { AgentState = state }; } public override async Task SaveState(AgentState request, ServerCallContext context) { - await agentWorker.Store(request); + await Gateway.StoreAsync(request); return new SaveStateResponse { Success = true // TODO: Implement error handling diff --git a/dotnet/src/Microsoft.AutoGen/Runtime/WorkerProcessConnection.cs b/dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcWorkerConnection.cs similarity index 90% rename from dotnet/src/Microsoft.AutoGen/Runtime/WorkerProcessConnection.cs rename to dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcWorkerConnection.cs index d998ce2051..f2eb81c436 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime/WorkerProcessConnection.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcWorkerConnection.cs @@ -1,13 +1,13 @@ // Copyright (c) Microsoft Corporation. All rights reserved. -// WorkerProcessConnection.cs +// GrpcWorkerConnection.cs using System.Threading.Channels; using Grpc.Core; using Microsoft.AutoGen.Abstractions; -namespace Microsoft.AutoGen.Runtime; +namespace Microsoft.AutoGen.Agents; -internal sealed class WorkerProcessConnection : IAsyncDisposable +internal sealed class GrpcWorkerConnection : IAsyncDisposable, IConnection { private static long s_nextConnectionId; private readonly Task _readTask; @@ -15,10 +15,10 @@ internal sealed class WorkerProcessConnection : IAsyncDisposable private readonly string _connectionId = Interlocked.Increment(ref s_nextConnectionId).ToString(); private readonly object _lock = new(); private readonly HashSet _supportedTypes = []; - private readonly WorkerGateway _gateway; + private readonly GrpcGateway _gateway; private readonly CancellationTokenSource _shutdownCancellationToken = new(); - public WorkerProcessConnection(WorkerGateway agentWorker, IAsyncStreamReader requestStream, IServerStreamWriter responseStream, ServerCallContext context) + public GrpcWorkerConnection(GrpcGateway agentWorker, IAsyncStreamReader requestStream, IServerStreamWriter responseStream, ServerCallContext context) { _gateway = agentWorker; RequestStream = requestStream; diff --git a/dotnet/src/Microsoft.AutoGen/Runtime/Host.cs b/dotnet/src/Microsoft.AutoGen/Agents/Services/Host.cs similarity index 72% rename from dotnet/src/Microsoft.AutoGen/Runtime/Host.cs rename to dotnet/src/Microsoft.AutoGen/Agents/Services/Host.cs index 73b53dd414..5b725af0c9 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime/Host.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Services/Host.cs @@ -4,24 +4,24 @@ using Microsoft.AspNetCore.Builder; using Microsoft.Extensions.Hosting; -namespace Microsoft.AutoGen.Runtime; +namespace Microsoft.AutoGen.Agents; public static class Host { - public static async Task StartAsync(bool local = false) + public static async Task StartAsync(bool local = false, bool useGrpc = true) { var builder = WebApplication.CreateBuilder(); builder.AddServiceDefaults(); if (local) { - builder.AddLocalAgentService(); + builder.AddLocalAgentService(useGrpc); } else { - builder.AddAgentService(); + builder.AddAgentService(useGrpc); } var app = builder.Build(); - app.MapAgentService(); + app.MapAgentService(local, useGrpc); app.MapDefaultEndpoints(); await app.StartAsync().ConfigureAwait(false); return app; diff --git a/dotnet/src/Microsoft.AutoGen/Agents/HostBuilderExtensions.cs b/dotnet/src/Microsoft.AutoGen/Agents/Services/HostBuilderExtensions.cs similarity index 82% rename from dotnet/src/Microsoft.AutoGen/Agents/HostBuilderExtensions.cs rename to dotnet/src/Microsoft.AutoGen/Agents/Services/HostBuilderExtensions.cs index ea8870a511..8ded5cb03d 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/HostBuilderExtensions.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Services/HostBuilderExtensions.cs @@ -6,8 +6,6 @@ using System.Diagnostics.CodeAnalysis; using System.Reflection; using Google.Protobuf; using Google.Protobuf.Reflection; -using Grpc.Core; -using Grpc.Net.Client.Configuration; using Microsoft.AutoGen.Abstractions; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; @@ -20,41 +18,18 @@ public static class HostBuilderExtensions private const string _defaultAgentServiceAddress = "https://localhost:5001"; public static AgentApplicationBuilder AddAgentWorker(this IHostApplicationBuilder builder, string agentServiceAddress = _defaultAgentServiceAddress, bool local = false) { - builder.Services.AddGrpcClient(options => - { - options.Address = new Uri(agentServiceAddress); - options.ChannelOptionsActions.Add(channelOptions => - { - - channelOptions.HttpHandler = new SocketsHttpHandler - { - EnableMultipleHttp2Connections = true, - KeepAlivePingDelay = TimeSpan.FromSeconds(20), - KeepAlivePingTimeout = TimeSpan.FromSeconds(10), - KeepAlivePingPolicy = HttpKeepAlivePingPolicy.WithActiveRequests - }; - - var methodConfig = new MethodConfig - { - Names = { MethodName.Default }, - RetryPolicy = new RetryPolicy - { - MaxAttempts = 5, - InitialBackoff = TimeSpan.FromSeconds(1), - MaxBackoff = TimeSpan.FromSeconds(5), - BackoffMultiplier = 1.5, - RetryableStatusCodes = { StatusCode.Unavailable } - } - }; - - channelOptions.ServiceConfig = new() { MethodConfigs = { methodConfig } }; - channelOptions.ThrowOperationCanceledOnCancellation = true; - }); - }); builder.Services.TryAddSingleton(DistributedContextPropagator.Current); - builder.Services.AddSingleton(); - builder.Services.AddSingleton(sp => (IHostedService)sp.GetRequiredService()); - builder.Services.AddSingleton(); + + // if !local, then add the gRPC client + if (!local) + { + builder.AddGrpcAgentWorker(agentServiceAddress); + } + else + { + builder.Services.AddSingleton(); + } + builder.Services.AddSingleton(sp => (IHostedService)sp.GetRequiredService()); builder.Services.AddKeyedSingleton("EventTypes", (sp, key) => { var interfaceType = typeof(IMessage); @@ -122,6 +97,7 @@ public static class HostBuilderExtensions } return new EventTypes(typeRegistry, types, eventsMap); }); + builder.Services.AddSingleton(); return new AgentApplicationBuilder(builder); } @@ -159,7 +135,7 @@ public sealed class AgentTypes(Dictionary types) .SelectMany(assembly => assembly.GetTypes()) .Where(type => ReflectionHelper.IsSubclassOfGeneric(type, typeof(AgentBase)) && !type.IsAbstract - && !type.Name.Equals("AgentWorker")) + && !type.Name.Equals(nameof(Client))) .ToDictionary(type => type.Name, type => type); return new AgentTypes(agents); diff --git a/dotnet/src/Microsoft.AutoGen/Runtime/WorkerAgentGrain.cs b/dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/AgentStateGrain.cs similarity index 82% rename from dotnet/src/Microsoft.AutoGen/Runtime/WorkerAgentGrain.cs rename to dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/AgentStateGrain.cs index 7d752d1566..50d8c3ad45 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime/WorkerAgentGrain.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/AgentStateGrain.cs @@ -1,11 +1,11 @@ // Copyright (c) Microsoft Corporation. All rights reserved. -// WorkerAgentGrain.cs +// AgentStateGrain.cs using Microsoft.AutoGen.Abstractions; -namespace Microsoft.AutoGen.Runtime; +namespace Microsoft.AutoGen.Agents; -internal sealed class WorkerAgentGrain([PersistentState("state", "AgentStateStore")] IPersistentState state) : Grain, IWorkerAgentGrain +internal sealed class AgentStateGrain([PersistentState("state", "AgentStateStore")] IPersistentState state) : Grain, IAgentState { public async ValueTask WriteStateAsync(AgentState newState, string eTag) { @@ -16,7 +16,7 @@ internal sealed class WorkerAgentGrain([PersistentState("state", "AgentStateStor if ((string.IsNullOrEmpty(state.Etag)) || (string.IsNullOrEmpty(eTag)) || (string.Equals(state.Etag, eTag, StringComparison.Ordinal))) { state.State = newState; - await state.WriteStateAsync(); + await state.WriteStateAsync().ConfigureAwait(false); } else { diff --git a/dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/IRegistryGrain.cs b/dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/IRegistryGrain.cs new file mode 100644 index 0000000000..87fd0aa38c --- /dev/null +++ b/dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/IRegistryGrain.cs @@ -0,0 +1,15 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// IRegistryGrain.cs +using Microsoft.AutoGen.Abstractions; + +namespace Microsoft.AutoGen.Agents; + +public interface IRegistryGrain : IGrainWithIntegerKey +{ + ValueTask<(IGateway? Worker, bool NewPlacement)> GetOrPlaceAgent(AgentId agentId); + ValueTask RemoveWorker(IGateway worker); + ValueTask RegisterAgentType(string type, IGateway worker); + ValueTask AddWorker(IGateway worker); + ValueTask UnregisterAgentType(string type, IGateway worker); + ValueTask GetCompatibleWorker(string type); +} diff --git a/dotnet/src/Microsoft.AutoGen/Runtime/OrleansRuntimeHostingExtenions.cs b/dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/OrleansRuntimeHostingExtenions.cs similarity index 96% rename from dotnet/src/Microsoft.AutoGen/Runtime/OrleansRuntimeHostingExtenions.cs rename to dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/OrleansRuntimeHostingExtenions.cs index fae441e0ce..cd59bcefc3 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime/OrleansRuntimeHostingExtenions.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/OrleansRuntimeHostingExtenions.cs @@ -4,11 +4,12 @@ using System.Configuration; using Microsoft.AspNetCore.Builder; using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Orleans.Configuration; using Orleans.Serialization; -namespace Microsoft.AutoGen.Runtime; +namespace Microsoft.AutoGen.Agents; public static class OrleansRuntimeHostingExtenions { @@ -83,6 +84,7 @@ public static class OrleansRuntimeHostingExtenions .AddMemoryGrainStorage("PubSubStore"); } }); + builder.Services.AddSingleton(); return builder; } } diff --git a/dotnet/src/Microsoft.AutoGen/Runtime/AgentWorkerRegistryGrain.cs b/dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/RegistryGrain.cs similarity index 78% rename from dotnet/src/Microsoft.AutoGen/Runtime/AgentWorkerRegistryGrain.cs rename to dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/RegistryGrain.cs index 0e7ffa7f28..c5114e3e74 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime/AgentWorkerRegistryGrain.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/RegistryGrain.cs @@ -1,16 +1,16 @@ // Copyright (c) Microsoft Corporation. All rights reserved. -// AgentWorkerRegistryGrain.cs +// RegistryGrain.cs using Microsoft.AutoGen.Abstractions; -namespace Microsoft.AutoGen.Runtime; +namespace Microsoft.AutoGen.Agents; -public sealed class AgentWorkerRegistryGrain : Grain, IAgentWorkerRegistryGrain +public sealed class RegistryGrain : Grain, IRegistryGrain { // TODO: use persistent state for some of these or (better) extend Orleans to implement some of this natively. - private readonly Dictionary _workerStates = []; - private readonly Dictionary> _supportedAgentTypes = []; - private readonly Dictionary<(string Type, string Key), IWorkerGateway> _agentDirectory = []; + private readonly Dictionary _workerStates = new(); + private readonly Dictionary> _supportedAgentTypes = []; + private readonly Dictionary<(string Type, string Key), IGateway> _agentDirectory = []; private readonly TimeSpan _agentTimeout = TimeSpan.FromMinutes(1); public override Task OnActivateAsync(CancellationToken cancellationToken) @@ -18,107 +18,7 @@ public sealed class AgentWorkerRegistryGrain : Grain, IAgentWorkerRegistryGrain this.RegisterGrainTimer(static state => state.PurgeInactiveWorkers(), this, TimeSpan.FromSeconds(30), TimeSpan.FromSeconds(30)); return base.OnActivateAsync(cancellationToken); } - - private Task PurgeInactiveWorkers() - { - foreach (var (worker, state) in _workerStates) - { - if (DateTimeOffset.UtcNow - state.LastSeen > _agentTimeout) - { - _workerStates.Remove(worker); - foreach (var type in state.SupportedTypes) - { - if (_supportedAgentTypes.TryGetValue(type, out var workers)) - { - workers.Remove(worker); - } - } - } - } - - return Task.CompletedTask; - } - - public ValueTask AddWorker(IWorkerGateway worker) - { - GetOrAddWorker(worker); - return ValueTask.CompletedTask; - } - - private WorkerState GetOrAddWorker(IWorkerGateway worker) - { - if (!_workerStates.TryGetValue(worker, out var workerState)) - { - workerState = _workerStates[worker] = new(); - } - - workerState.LastSeen = DateTimeOffset.UtcNow; - return workerState; - } - - public ValueTask RegisterAgentType(string type, IWorkerGateway worker) - { - if (!_supportedAgentTypes.TryGetValue(type, out var supportedAgentTypes)) - { - supportedAgentTypes = _supportedAgentTypes[type] = []; - } - - if (!supportedAgentTypes.Contains(worker)) - { - supportedAgentTypes.Add(worker); - } - - var workerState = GetOrAddWorker(worker); - workerState.SupportedTypes.Add(type); - - return ValueTask.CompletedTask; - } - - public ValueTask RemoveWorker(IWorkerGateway worker) - { - if (_workerStates.Remove(worker, out var state)) - { - foreach (var type in state.SupportedTypes) - { - if (_supportedAgentTypes.TryGetValue(type, out var workers)) - { - workers.Remove(worker); - } - } - } - - return ValueTask.CompletedTask; - } - - public ValueTask UnregisterAgentType(string type, IWorkerGateway worker) - { - if (_workerStates.TryGetValue(worker, out var state)) - { - state.SupportedTypes.Remove(type); - } - - if (_supportedAgentTypes.TryGetValue(type, out var workers)) - { - workers.Remove(worker); - } - - return ValueTask.CompletedTask; - } - - public ValueTask GetCompatibleWorker(string type) => new(GetCompatibleWorkerCore(type)); - - private IWorkerGateway? GetCompatibleWorkerCore(string type) - { - if (_supportedAgentTypes.TryGetValue(type, out var workers)) - { - // Return a random compatible worker. - return workers[Random.Shared.Next(workers.Count)]; - } - - return null; - } - - public ValueTask<(IWorkerGateway? Gateway, bool NewPlacment)> GetOrPlaceAgent(AgentId agentId) + public ValueTask<(IGateway? Worker, bool NewPlacement)> GetOrPlaceAgent(AgentId agentId) { // TODO: bool isNewPlacement; @@ -142,9 +42,98 @@ public sealed class AgentWorkerRegistryGrain : Grain, IAgentWorkerRegistryGrain // Existing activation. isNewPlacement = false; } - return new((worker, isNewPlacement)); } + public ValueTask RemoveWorker(IGateway worker) + { + if (_workerStates.Remove(worker, out var state)) + { + foreach (var type in state.SupportedTypes) + { + if (_supportedAgentTypes.TryGetValue(type, out var workers)) + { + workers.Remove(worker); + } + } + } + return ValueTask.CompletedTask; + } + public ValueTask RegisterAgentType(string type, IGateway worker) + { + if (!_supportedAgentTypes.TryGetValue(type, out var supportedAgentTypes)) + { + supportedAgentTypes = _supportedAgentTypes[type] = []; + } + + if (!supportedAgentTypes.Contains(worker)) + { + supportedAgentTypes.Add(worker); + } + var workerState = GetOrAddWorker(worker); + workerState.SupportedTypes.Add(type); + return ValueTask.CompletedTask; + } + public ValueTask AddWorker(IGateway worker) + { + GetOrAddWorker(worker); + return ValueTask.CompletedTask; + } + public ValueTask UnregisterAgentType(string type, IGateway worker) + { + if (_workerStates.TryGetValue(worker, out var state)) + { + state.SupportedTypes.Remove(type); + } + + if (_supportedAgentTypes.TryGetValue(type, out var workers)) + { + workers.Remove(worker); + } + return ValueTask.CompletedTask; + } + private Task PurgeInactiveWorkers() + { + foreach (var (worker, state) in _workerStates) + { + if (DateTimeOffset.UtcNow - state.LastSeen > _agentTimeout) + { + _workerStates.Remove(worker); + foreach (var type in state.SupportedTypes) + { + if (_supportedAgentTypes.TryGetValue(type, out var workers)) + { + workers.Remove(worker); + } + } + } + } + + return Task.CompletedTask; + } + + private WorkerState GetOrAddWorker(IGateway worker) + { + if (!_workerStates.TryGetValue(worker, out var workerState)) + { + workerState = _workerStates[worker] = new(); + } + + workerState.LastSeen = DateTimeOffset.UtcNow; + return workerState; + } + + public ValueTask GetCompatibleWorker(string type) => new(GetCompatibleWorkerCore(type)); + + private IGateway? GetCompatibleWorkerCore(string type) + { + if (_supportedAgentTypes.TryGetValue(type, out var workers)) + { + // Return a random compatible worker. + return workers[Random.Shared.Next(workers.Count)]; + } + + return null; + } private sealed class WorkerState { diff --git a/dotnet/src/Microsoft.AutoGen/ServiceDefaults/Extensions.cs b/dotnet/src/Microsoft.AutoGen/Extensions/ServiceDefaults/Extensions.cs similarity index 100% rename from dotnet/src/Microsoft.AutoGen/ServiceDefaults/Extensions.cs rename to dotnet/src/Microsoft.AutoGen/Extensions/ServiceDefaults/Extensions.cs diff --git a/dotnet/src/Microsoft.AutoGen/ServiceDefaults/Microsoft.AutoGen.ServiceDefaults.csproj b/dotnet/src/Microsoft.AutoGen/Extensions/ServiceDefaults/Microsoft.AutoGen.ServiceDefaults.csproj similarity index 100% rename from dotnet/src/Microsoft.AutoGen/ServiceDefaults/Microsoft.AutoGen.ServiceDefaults.csproj rename to dotnet/src/Microsoft.AutoGen/Extensions/ServiceDefaults/Microsoft.AutoGen.ServiceDefaults.csproj diff --git a/dotnet/src/Microsoft.AutoGen/Runtime/IAgentWorkerRegistryGrain.cs b/dotnet/src/Microsoft.AutoGen/Runtime/IAgentWorkerRegistryGrain.cs deleted file mode 100644 index bda83f16f7..0000000000 --- a/dotnet/src/Microsoft.AutoGen/Runtime/IAgentWorkerRegistryGrain.cs +++ /dev/null @@ -1,15 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// IAgentWorkerRegistryGrain.cs - -using Microsoft.AutoGen.Abstractions; - -namespace Microsoft.AutoGen.Runtime; - -public interface IAgentWorkerRegistryGrain : IGrainWithIntegerKey -{ - ValueTask RegisterAgentType(string type, IWorkerGateway worker); - ValueTask UnregisterAgentType(string type, IWorkerGateway worker); - ValueTask AddWorker(IWorkerGateway worker); - ValueTask RemoveWorker(IWorkerGateway worker); - ValueTask<(IWorkerGateway? Gateway, bool NewPlacment)> GetOrPlaceAgent(AgentId agentId); -} diff --git a/dotnet/src/Microsoft.AutoGen/Runtime/IWorkerGateway.cs b/dotnet/src/Microsoft.AutoGen/Runtime/IWorkerGateway.cs deleted file mode 100644 index 54ca077de1..0000000000 --- a/dotnet/src/Microsoft.AutoGen/Runtime/IWorkerGateway.cs +++ /dev/null @@ -1,14 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// IWorkerGateway.cs - -using Microsoft.AutoGen.Abstractions; - -namespace Microsoft.AutoGen.Runtime; - -public interface IWorkerGateway : IGrainObserver -{ - ValueTask InvokeRequest(RpcRequest request); - ValueTask BroadcastEvent(CloudEvent evt); - ValueTask Store(AgentState value); - ValueTask Read(AgentId agentId); -} diff --git a/dotnet/src/Microsoft.AutoGen/Runtime/Microsoft.AutoGen.Runtime.csproj b/dotnet/src/Microsoft.AutoGen/Runtime/Microsoft.AutoGen.Runtime.csproj deleted file mode 100644 index 40a240c2f6..0000000000 --- a/dotnet/src/Microsoft.AutoGen/Runtime/Microsoft.AutoGen.Runtime.csproj +++ /dev/null @@ -1,35 +0,0 @@ - - - - net8.0 - enable - enable - AutoGen.Worker.Server - https://github.com/microsoft/agnext - Microsoft - AutoGen Worker Server Library - ai-agents;event-driven-agents - $(NoWarn);CS8002 - - - - - - - - - - - - - - - - - - - - - - - diff --git a/dotnet/Microsoft.AutoGen.Agents.Tests/AgentBaseTests.cs b/dotnet/test/Microsoft.AutoGen.Agents.Tests/AgentBaseTests.cs similarity index 80% rename from dotnet/Microsoft.AutoGen.Agents.Tests/AgentBaseTests.cs rename to dotnet/test/Microsoft.AutoGen.Agents.Tests/AgentBaseTests.cs index 4e17dd56f5..b10f82e7d4 100644 --- a/dotnet/Microsoft.AutoGen.Agents.Tests/AgentBaseTests.cs +++ b/dotnet/test/Microsoft.AutoGen.Agents.Tests/AgentBaseTests.cs @@ -4,6 +4,7 @@ using FluentAssertions; using Google.Protobuf.Reflection; using Microsoft.AutoGen.Abstractions; +using Microsoft.Extensions.Logging; using Moq; using Xunit; @@ -14,8 +15,8 @@ public class AgentBaseTests [Fact] public async Task ItInvokeRightHandlerTestAsync() { - var mockContext = new Mock(); - var agent = new TestAgent(mockContext.Object, new EventTypes(TypeRegistry.Empty, [], [])); + var mockContext = new Mock(); + var agent = new TestAgent(mockContext.Object, new EventTypes(TypeRegistry.Empty, [], []), new Logger(new LoggerFactory())); await agent.HandleObject("hello world"); await agent.HandleObject(42); @@ -30,7 +31,7 @@ public class AgentBaseTests /// public class TestAgent : AgentBase, IHandle, IHandle { - public TestAgent(IAgentContext context, EventTypes eventTypes) : base(context, eventTypes) + public TestAgent(IAgentRuntime context, EventTypes eventTypes, Logger logger) : base(context, eventTypes, logger) { } diff --git a/dotnet/Microsoft.AutoGen.Agents.Tests/Microsoft.AutoGen.Agents.Tests.csproj b/dotnet/test/Microsoft.AutoGen.Agents.Tests/Microsoft.AutoGen.Agents.Tests.csproj similarity index 74% rename from dotnet/Microsoft.AutoGen.Agents.Tests/Microsoft.AutoGen.Agents.Tests.csproj rename to dotnet/test/Microsoft.AutoGen.Agents.Tests/Microsoft.AutoGen.Agents.Tests.csproj index ca18cf97c5..db7467bf12 100644 --- a/dotnet/Microsoft.AutoGen.Agents.Tests/Microsoft.AutoGen.Agents.Tests.csproj +++ b/dotnet/test/Microsoft.AutoGen.Agents.Tests/Microsoft.AutoGen.Agents.Tests.csproj @@ -8,7 +8,7 @@ - + diff --git a/protos/agent_events.proto b/protos/agent_events.proto index 19b41482e3..811c888c64 100644 --- a/protos/agent_events.proto +++ b/protos/agent_events.proto @@ -35,3 +35,6 @@ message ConversationClosed { string user_id = 1; string user_message = 2; } +message Shutdown { + string message = 1; +}