From 14846a3e84b4bbc6b019f67d7e28ebc83c47a986 Mon Sep 17 00:00:00 2001 From: Ryan Sweet Date: Mon, 28 Oct 2024 17:28:36 -0700 Subject: [PATCH] first draft of stateful persistence grains for each agent.... (#3954) * adds Orleans persistence for AgentState --- dotnet/AutoGen.sln | 7 + .../HelloAgentState/HelloAgentState.csproj | 21 +++ .../samples/Hello/HelloAgentState/Program.cs | 75 ++++++++++ .../samples/Hello/HelloAgentState/README.md | 138 ++++++++++++++++++ .../{AgentState.cs => ChatState.cs} | 5 +- .../Abstractions/MessageExtensions.cs | 22 ++- .../Microsoft.AutoGen.Abstractions.csproj | 1 - .../src/Microsoft.AutoGen/Agents/AgentBase.cs | 14 +- .../Microsoft.AutoGen/Agents/AgentClient.cs | 12 +- .../Microsoft.AutoGen/Agents/AgentContext.cs | 11 +- .../Agents/AgentWorkerRuntime.cs | 25 +++- .../Agents/Agents/AIAgent/InferenceAgent.cs | 3 +- .../Agents/Agents/AIAgent/SKAiAgent.cs | 1 + .../IOAgent/ConsoleAgent/ConsoleAgent.cs | 2 +- .../Agents/IOAgent/FileAgent/FileAgent.cs | 2 +- .../Agents/Agents/IOAgent/IOAgent.cs | 8 +- .../Agents/IOAgent/WebAPIAgent/WebAPIAgent.cs | 2 +- .../Microsoft.AutoGen/Agents/IAgentBase.cs | 22 +++ .../Microsoft.AutoGen/Agents/IAgentContext.cs | 2 + .../Runtime/AgentStateGrain.cs | 20 --- .../Runtime/AgentWorkerHostingExtensions.cs | 35 ++--- .../Runtime/IAgentStateGrain.cs | 7 - .../Runtime/IWorkerAgentGrain.cs | 9 ++ .../Runtime/IWorkerGateway.cs | 2 + .../Runtime/Microsoft.AutoGen.Runtime.csproj | 9 ++ .../Runtime/OrleansRuntimeHostingExtenions.cs | 85 +++++++++++ .../Runtime/WorkerAgentGrain.cs | 31 ++++ .../Runtime/WorkerGateway.cs | 12 ++ .../Runtime/WorkerGatewayService.cs | 14 ++ protos/agent_worker.proto | 23 +++ .../_worker_runtime_host_servicer.py | 14 ++ .../application/protos/agent_worker_pb2.py | 16 +- .../application/protos/agent_worker_pb2.pyi | 76 ++++++++++ .../protos/agent_worker_pb2_grpc.py | 66 +++++++++ .../protos/agent_worker_pb2_grpc.pyi | 34 +++++ 35 files changed, 749 insertions(+), 77 deletions(-) create mode 100644 dotnet/samples/Hello/HelloAgentState/HelloAgentState.csproj create mode 100644 dotnet/samples/Hello/HelloAgentState/Program.cs create mode 100644 dotnet/samples/Hello/HelloAgentState/README.md rename dotnet/src/Microsoft.AutoGen/Abstractions/{AgentState.cs => ChatState.cs} (65%) create mode 100644 dotnet/src/Microsoft.AutoGen/Agents/IAgentBase.cs delete mode 100644 dotnet/src/Microsoft.AutoGen/Runtime/AgentStateGrain.cs delete mode 100644 dotnet/src/Microsoft.AutoGen/Runtime/IAgentStateGrain.cs create mode 100644 dotnet/src/Microsoft.AutoGen/Runtime/IWorkerAgentGrain.cs create mode 100644 dotnet/src/Microsoft.AutoGen/Runtime/OrleansRuntimeHostingExtenions.cs create mode 100644 dotnet/src/Microsoft.AutoGen/Runtime/WorkerAgentGrain.cs diff --git a/dotnet/AutoGen.sln b/dotnet/AutoGen.sln index 1106ebf844..83147d38dc 100644 --- a/dotnet/AutoGen.sln +++ b/dotnet/AutoGen.sln @@ -125,6 +125,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "AIModelClientHostingExtensi EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "sample", "sample", "{686480D7-8FEC-4ED3-9C5D-CEBE1057A7ED}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "HelloAgentState", "samples\Hello\HelloAgentState\HelloAgentState.csproj", "{64EF61E7-00A6-4E5E-9808-62E10993A0E5}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -335,6 +337,10 @@ Global {97550E87-48C6-4EBF-85E1-413ABAE9DBFD}.Debug|Any CPU.Build.0 = Debug|Any CPU {97550E87-48C6-4EBF-85E1-413ABAE9DBFD}.Release|Any CPU.ActiveCfg = Release|Any CPU {97550E87-48C6-4EBF-85E1-413ABAE9DBFD}.Release|Any CPU.Build.0 = Release|Any CPU + {64EF61E7-00A6-4E5E-9808-62E10993A0E5}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {64EF61E7-00A6-4E5E-9808-62E10993A0E5}.Debug|Any CPU.Build.0 = Debug|Any CPU + {64EF61E7-00A6-4E5E-9808-62E10993A0E5}.Release|Any CPU.ActiveCfg = Release|Any CPU + {64EF61E7-00A6-4E5E-9808-62E10993A0E5}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -395,6 +401,7 @@ Global {A20B9894-F352-4338-872A-F215A241D43D} = {7EB336C2-7C0A-4BC8-80C6-A3173AB8DC45} {8F7560CF-EEBB-4333-A69F-838CA40FD85D} = {7EB336C2-7C0A-4BC8-80C6-A3173AB8DC45} {97550E87-48C6-4EBF-85E1-413ABAE9DBFD} = {18BF8DD7-0585-48BF-8F97-AD333080CE06} + {64EF61E7-00A6-4E5E-9808-62E10993A0E5} = {7EB336C2-7C0A-4BC8-80C6-A3173AB8DC45} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {93384647-528D-46C8-922C-8DB36A382F0B} diff --git a/dotnet/samples/Hello/HelloAgentState/HelloAgentState.csproj b/dotnet/samples/Hello/HelloAgentState/HelloAgentState.csproj new file mode 100644 index 0000000000..eb2ba96d66 --- /dev/null +++ b/dotnet/samples/Hello/HelloAgentState/HelloAgentState.csproj @@ -0,0 +1,21 @@ + + + + + + + + + + + + + + + Exe + net8.0 + enable + enable + + + diff --git a/dotnet/samples/Hello/HelloAgentState/Program.cs b/dotnet/samples/Hello/HelloAgentState/Program.cs new file mode 100644 index 0000000000..6880bdd616 --- /dev/null +++ b/dotnet/samples/Hello/HelloAgentState/Program.cs @@ -0,0 +1,75 @@ +// Copyright (c) Microsoft. All rights reserved. + +using Microsoft.AutoGen.Abstractions; +using Microsoft.AutoGen.Agents; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; + +// send a message to the agent +var app = await AgentsApp.PublishMessageAsync("HelloAgents", new NewMessageReceived +{ + Message = "World" +}, local: false); + +await app.WaitForShutdownAsync(); + +namespace Hello +{ + [TopicSubscription("HelloAgents")] + public class HelloAgent( + IAgentContext context, + [FromKeyedServices("EventTypes")] EventTypes typeRegistry) : ConsoleAgent( + context, + typeRegistry), + ISayHello, + IHandle, + IHandle + { + private AgentState? State { get; set; } + public async Task Handle(NewMessageReceived item) + { + var response = await SayHello(item.Message).ConfigureAwait(false); + var evt = new Output + { + Message = response + }.ToCloudEvent(this.AgentId.Key); + var entry = "We said hello to " + item.Message; + await Store(new AgentState + { + AgentId = this.AgentId, + TextData = entry + }).ConfigureAwait(false); + await PublishEvent(evt).ConfigureAwait(false); + var goodbye = new ConversationClosed + { + UserId = this.AgentId.Key, + UserMessage = "Goodbye" + }.ToCloudEvent(this.AgentId.Key); + await PublishEvent(goodbye).ConfigureAwait(false); + } + public async Task Handle(ConversationClosed item) + { + State = await Read(this.AgentId).ConfigureAwait(false); + var read = State?.TextData ?? "No state data found"; + var goodbye = $"{read}\n********************* {item.UserId} said {item.UserMessage} ************************"; + var evt = new Output + { + Message = goodbye + }.ToCloudEvent(this.AgentId.Key); + await PublishEvent(evt).ConfigureAwait(false); + //sleep + await Task.Delay(10000).ConfigureAwait(false); + await AgentsApp.ShutdownAsync().ConfigureAwait(false); + + } + public async Task SayHello(string ask) + { + var response = $"\n\n\n\n***************Hello {ask}**********************\n\n\n\n"; + return response; + } + } + public interface ISayHello + { + public Task SayHello(string ask); + } +} diff --git a/dotnet/samples/Hello/HelloAgentState/README.md b/dotnet/samples/Hello/HelloAgentState/README.md new file mode 100644 index 0000000000..06c4883182 --- /dev/null +++ b/dotnet/samples/Hello/HelloAgentState/README.md @@ -0,0 +1,138 @@ +# AutoGen 0.4 .NET Hello World Sample + +This [sample](Program.cs) demonstrates how to create a simple .NET console application that listens for an event and then orchestrates a series of actions in response. + +## Prerequisites + +To run this sample, you'll need: [.NET 8.0](https://dotnet.microsoft.com/en-us/) or later. +Also recommended is the [GitHub CLI](https://cli.github.com/). + +## Instructions to run the sample + +```bash +# Clone the repository +gh repo clone microsoft/autogen +cd dotnet/samples/Hello +dotnet run +``` + +## Key Concepts + +This sample illustrates how to create your own agent that inherits from a base agent and listens for an event. It also shows how to use the SDK's App Runtime locally to start the agent and send messages. + +Flow Diagram: + +```mermaid +%%{init: {'theme':'forest'}}%% +graph LR; + A[Main] --> |"PublishEvent(NewMessage('World'))"| B{"Handle(NewMessageReceived item)"} + B --> |"PublishEvent(Output('***Hello, World***'))"| C[ConsoleAgent] + C --> D{"WriteConsole()"} + B --> |"PublishEvent(ConversationClosed('Goodbye'))"| E{"Handle(ConversationClosed item)"} + B --> |"PublishEvent(Output('***Goodbye***'))"| C + E --> F{"Shutdown()"} + +``` + +### Writing Event Handlers + +The heart of an autogen application are the event handlers. Agents select a ```TopicSubscription``` to listen for events on a specific topic. When an event is received, the agent's event handler is called with the event data. + +Within that event handler you may optionally *emit* new events, which are then sent to the event bus for other agents to process. The EventTypes are declared gRPC ProtoBuf messages that are used to define the schema of the event. The default protos are available via the ```Microsoft.AutoGen.Abstractions;``` namespace and are defined in [autogen/protos](/autogen/protos). The EventTypes are registered in the agent's constructor using the ```IHandle``` interface. + +```csharp +TopicSubscription("HelloAgents")] +public class HelloAgent( + IAgentContext context, + [FromKeyedServices("EventTypes")] EventTypes typeRegistry) : ConsoleAgent( + context, + typeRegistry), + ISayHello, + IHandle, + IHandle +{ + public async Task Handle(NewMessageReceived item) + { + var response = await SayHello(item.Message).ConfigureAwait(false); + var evt = new Output + { + Message = response + }.ToCloudEvent(this.AgentId.Key); + await PublishEvent(evt).ConfigureAwait(false); + var goodbye = new ConversationClosed + { + UserId = this.AgentId.Key, + UserMessage = "Goodbye" + }.ToCloudEvent(this.AgentId.Key); + await PublishEvent(goodbye).ConfigureAwait(false); + } +``` + +### Inheritance and Composition + +This sample also illustrates inheritance in AutoGen. The `HelloAgent` class inherits from `ConsoleAgent`, which is a base class that provides a `WriteConsole` method. + +### Starting the Application Runtime + +AuotoGen provides a flexible runtime ```Microsoft.AutoGen.Agents.App``` that can be started in a variety of ways. The `Program.cs` file demonstrates how to start the runtime locally and send a message to the agent all in one go using the ```App.PublishMessageAsync``` method. + +```csharp +// send a message to the agent +var app = await App.PublishMessageAsync("HelloAgents", new NewMessageReceived +{ + Message = "World" +}, local: true); + +await App.RuntimeApp!.WaitForShutdownAsync(); +await app.WaitForShutdownAsync(); +``` + +### Sending Messages + +The set of possible Messages is defined in gRPC ProtoBuf specs. These are then turned into C# classes by the gRPC tools. You can define your own Message types by creating a new .proto file in your project and including the gRPC tools in your ```.csproj``` file: + +```proto +syntax = "proto3"; +package devteam; +option csharp_namespace = "DevTeam.Shared"; +message NewAsk { + string org = 1; + string repo = 2; + string ask = 3; + int64 issue_number = 4; +} +message ReadmeRequested { + string org = 1; + string repo = 2; + int64 issue_number = 3; + string ask = 4; +} +``` + +```xml + + + + + +``` + +You can send messages using the [```Microsoft.AutoGen.Agents``` class](autogen/dotnet/src/Microsoft.AutoGen/Agents/AgentClient.cs). Messages are wrapped in [the CloudEvents specification](https://cloudevents.io) and sent to the event bus. + +### Managing State + +There is a simple API for persisting agent state. + +```csharp + await Store(new AgentState + { + AgentId = this.AgentId, + TextData = entry + }).ConfigureAwait(false); +``` + +which can be read back using Read: + +```csharp + State = await Read(this.AgentId).ConfigureAwait(false); +``` diff --git a/dotnet/src/Microsoft.AutoGen/Abstractions/AgentState.cs b/dotnet/src/Microsoft.AutoGen/Abstractions/ChatState.cs similarity index 65% rename from dotnet/src/Microsoft.AutoGen/Abstractions/AgentState.cs rename to dotnet/src/Microsoft.AutoGen/Abstractions/ChatState.cs index 53093bc9b9..8185c153d9 100644 --- a/dotnet/src/Microsoft.AutoGen/Abstractions/AgentState.cs +++ b/dotnet/src/Microsoft.AutoGen/Abstractions/ChatState.cs @@ -1,6 +1,9 @@ +using Google.Protobuf; + namespace Microsoft.AutoGen.Abstractions; -public class AgentState where T : class, new() +public class ChatState + where T : IMessage, new() { public List History { get; set; } = new(); public T Data { get; set; } = new(); diff --git a/dotnet/src/Microsoft.AutoGen/Abstractions/MessageExtensions.cs b/dotnet/src/Microsoft.AutoGen/Abstractions/MessageExtensions.cs index 724a706b10..5fa09ae218 100644 --- a/dotnet/src/Microsoft.AutoGen/Abstractions/MessageExtensions.cs +++ b/dotnet/src/Microsoft.AutoGen/Abstractions/MessageExtensions.cs @@ -16,9 +16,29 @@ public static class MessageExtensions }; } - public static T FromCloudEvent(this CloudEvent cloudEvent) where T : IMessage, new() { return cloudEvent.ProtoData.Unpack(); } + public static AgentState ToAgentState(this T state, AgentId agentId, string eTag) where T : IMessage + { + return new AgentState + { + ProtoData = Any.Pack(state), + AgentId = agentId, + ETag = eTag + }; + } + + public static T FromAgentState(this AgentState state) where T : IMessage, new() + { + if (state.HasTextData == true) + { + if (typeof(T) == typeof(AgentState)) + { + return (T)(IMessage)state; + } + } + return state.ProtoData.Unpack(); + } } diff --git a/dotnet/src/Microsoft.AutoGen/Abstractions/Microsoft.AutoGen.Abstractions.csproj b/dotnet/src/Microsoft.AutoGen/Abstractions/Microsoft.AutoGen.Abstractions.csproj index fe480940cb..52f933e195 100644 --- a/dotnet/src/Microsoft.AutoGen/Abstractions/Microsoft.AutoGen.Abstractions.csproj +++ b/dotnet/src/Microsoft.AutoGen/Abstractions/Microsoft.AutoGen.Abstractions.csproj @@ -14,7 +14,6 @@ - diff --git a/dotnet/src/Microsoft.AutoGen/Agents/AgentBase.cs b/dotnet/src/Microsoft.AutoGen/Agents/AgentBase.cs index 6307988a46..62779f8366 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/AgentBase.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/AgentBase.cs @@ -108,7 +108,16 @@ public abstract class AgentBase break; } } - + protected async Task Store(AgentState state) + { + await _context.Store(state).ConfigureAwait(false); + return; + } + protected async Task Read(AgentId agentId) where T : IMessage, new() + { + var agentstate = await _context.Read(agentId).ConfigureAwait(false); + return agentstate.FromAgentState(); + } private void OnResponseCore(RpcResponse response) { var requestId = response.RequestId; @@ -186,7 +195,6 @@ public abstract class AgentBase protected async ValueTask PublishEvent(CloudEvent item) { - //TODO: Reimplement var activity = s_source.StartActivity($"PublishEvent '{item.Type}'", ActivityKind.Client, Activity.Current?.Context ?? default); activity?.SetTag("peer.service", $"{item.Type}/{item.Source}"); @@ -200,7 +208,7 @@ public abstract class AgentBase }, (this, item, completion), activity, - item.Type).ConfigureAwait(false);// TODO: It's not the descriptor's name probably + item.Type).ConfigureAwait(false); } public Task CallHandler(CloudEvent item) diff --git a/dotnet/src/Microsoft.AutoGen/Agents/AgentClient.cs b/dotnet/src/Microsoft.AutoGen/Agents/AgentClient.cs index 9611a50386..369d717af3 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/AgentClient.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/AgentClient.cs @@ -5,14 +5,12 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; namespace Microsoft.AutoGen.Agents; - public sealed class AgentClient(ILogger logger, AgentWorkerRuntime runtime, DistributedContextPropagator distributedContextPropagator, [FromKeyedServices("EventTypes")] EventTypes eventTypes) : AgentBase(new ClientContext(logger, runtime, distributedContextPropagator), eventTypes) { public async ValueTask PublishEventAsync(CloudEvent evt) => await PublishEvent(evt); public async ValueTask SendRequestAsync(AgentId target, string method, Dictionary parameters) => await RequestAsync(target, method, parameters); - public async ValueTask PublishEventAsync(string topic, IMessage evt) { await PublishEventAsync(evt.ToCloudEvent(topic)).ConfigureAwait(false); @@ -23,12 +21,10 @@ public sealed class AgentClient(ILogger logger, AgentWorkerRuntime public AgentBase? AgentInstance { get; set; } public ILogger Logger { get; } = logger; public DistributedContextPropagator DistributedContextPropagator { get; } = distributedContextPropagator; - public async ValueTask PublishEventAsync(CloudEvent @event) { await runtime.PublishEvent(@event).ConfigureAwait(false); } - public async ValueTask SendRequestAsync(AgentBase agent, RpcRequest request) { await runtime.SendRequest(AgentInstance!, request).ConfigureAwait(false); @@ -38,5 +34,13 @@ public sealed class AgentClient(ILogger logger, AgentWorkerRuntime { await runtime.SendResponse(response).ConfigureAwait(false); } + public ValueTask Store(AgentState value) + { + throw new NotImplementedException(); + } + public ValueTask Read(AgentId agentId) + { + throw new NotImplementedException(); + } } } diff --git a/dotnet/src/Microsoft.AutoGen/Agents/AgentContext.cs b/dotnet/src/Microsoft.AutoGen/Agents/AgentContext.cs index 43d1137c86..779cc86a60 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/AgentContext.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/AgentContext.cs @@ -12,20 +12,25 @@ internal sealed class AgentContext(AgentId agentId, AgentWorkerRuntime runtime, public ILogger Logger { get; } = logger; public AgentBase? AgentInstance { get; set; } public DistributedContextPropagator DistributedContextPropagator { get; } = distributedContextPropagator; - public async ValueTask SendResponseAsync(RpcRequest request, RpcResponse response) { response.RequestId = request.RequestId; await _runtime.SendResponse(response); } - public async ValueTask SendRequestAsync(AgentBase agent, RpcRequest request) { await _runtime.SendRequest(agent, request); } - public async ValueTask PublishEventAsync(CloudEvent @event) { await _runtime.PublishEvent(@event); } + public async ValueTask Store(AgentState value) + { + await _runtime.Store(value); + } + public async ValueTask Read(AgentId agentId) + { + return await _runtime.Read(agentId); + } } diff --git a/dotnet/src/Microsoft.AutoGen/Agents/AgentWorkerRuntime.cs b/dotnet/src/Microsoft.AutoGen/Agents/AgentWorkerRuntime.cs index d0df48f71b..f335881fc0 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/AgentWorkerRuntime.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/AgentWorkerRuntime.cs @@ -84,7 +84,6 @@ public sealed class AgentWorkerRuntime : IHostedService, IDisposable, IAgentWork request.Agent.ReceiveMessage(message); break; case Message.MessageOneofCase.CloudEvent: - // TODO: Reimplement // HACK: Send the message to an instance of each agent type // where AgentId = (namespace: event.Namespace, name: agentType) @@ -323,10 +322,32 @@ public sealed class AgentWorkerRuntime : IHostedService, IDisposable, IAgentWork _channel?.Dispose(); } } - public ValueTask SendRequest(RpcRequest request) { throw new NotImplementedException(); } + public ValueTask Store(AgentState value) + { + var agentId = value.AgentId ?? throw new InvalidOperationException("AgentId is required when saving AgentState."); + var response = _client.SaveState(value); + if (!response.Success) + { + throw new InvalidOperationException($"Error saving AgentState for AgentId {agentId}."); + } + return ValueTask.CompletedTask; + } + public async ValueTask Read(AgentId agentId) + { + var response = await _client.GetStateAsync(agentId); + // if (response.Success && response.AgentState.AgentId is not null) - why is success always false? + if (response.AgentState.AgentId is not null) + { + return response.AgentState; + } + else + { + throw new KeyNotFoundException($"Failed to read AgentState for {agentId}."); + } + } } diff --git a/dotnet/src/Microsoft.AutoGen/Agents/Agents/AIAgent/InferenceAgent.cs b/dotnet/src/Microsoft.AutoGen/Agents/Agents/AIAgent/InferenceAgent.cs index e1f932fa66..15c4fc095f 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/Agents/AIAgent/InferenceAgent.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Agents/AIAgent/InferenceAgent.cs @@ -1,6 +1,7 @@ +using Google.Protobuf; using Microsoft.Extensions.AI; namespace Microsoft.AutoGen.Agents.Client; -public abstract class InferenceAgent : AgentBase where T : class, new() +public abstract class InferenceAgent : AgentBase where T : IMessage, new() { protected IChatClient ChatClient { get; } public InferenceAgent( diff --git a/dotnet/src/Microsoft.AutoGen/Agents/Agents/AIAgent/SKAiAgent.cs b/dotnet/src/Microsoft.AutoGen/Agents/Agents/AIAgent/SKAiAgent.cs index 84bd2f8219..becd2c208f 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/Agents/AIAgent/SKAiAgent.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Agents/AIAgent/SKAiAgent.cs @@ -40,6 +40,7 @@ public abstract class SKAiAgent : AgentBase where T : class, new() var function = _kernel.CreateFunctionFromPrompt(template, promptSettings); var result = (await _kernel.InvokeAsync(function, arguments).ConfigureAwait(true)).ToString(); AddToHistory(result, ChatUserType.Agent); + //await Store(_state.Data.ToAgentState(AgentId,""));//TODO add eTag return result; } diff --git a/dotnet/src/Microsoft.AutoGen/Agents/Agents/IOAgent/ConsoleAgent/ConsoleAgent.cs b/dotnet/src/Microsoft.AutoGen/Agents/Agents/IOAgent/ConsoleAgent/ConsoleAgent.cs index c6e9f4392d..2df6c79650 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/Agents/IOAgent/ConsoleAgent/ConsoleAgent.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Agents/IOAgent/ConsoleAgent/ConsoleAgent.cs @@ -3,7 +3,7 @@ using Microsoft.Extensions.DependencyInjection; namespace Microsoft.AutoGen.Agents; -public abstract class ConsoleAgent : IOAgent, +public abstract class ConsoleAgent : IOAgent, IUseConsole, IHandle, IHandle diff --git a/dotnet/src/Microsoft.AutoGen/Agents/Agents/IOAgent/FileAgent/FileAgent.cs b/dotnet/src/Microsoft.AutoGen/Agents/Agents/IOAgent/FileAgent/FileAgent.cs index f8bf463042..2149a32d23 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/Agents/IOAgent/FileAgent/FileAgent.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Agents/IOAgent/FileAgent/FileAgent.cs @@ -10,7 +10,7 @@ public abstract class FileAgent( [FromKeyedServices("EventTypes")] EventTypes typeRegistry, string inputPath = "input.txt", string outputPath = "output.txt" - ) : IOAgent(context, typeRegistry), + ) : IOAgent(context, typeRegistry), IUseFiles, IHandle, IHandle diff --git a/dotnet/src/Microsoft.AutoGen/Agents/Agents/IOAgent/IOAgent.cs b/dotnet/src/Microsoft.AutoGen/Agents/Agents/IOAgent/IOAgent.cs index 7d14387203..fc0f497331 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/Agents/IOAgent/IOAgent.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Agents/IOAgent/IOAgent.cs @@ -2,16 +2,12 @@ using Microsoft.AutoGen.Abstractions; namespace Microsoft.AutoGen.Agents; -public abstract class IOAgent : AgentBase where T : class, new() +public abstract class IOAgent : AgentBase { - protected AgentState _state; public string _route = "base"; - - public IOAgent(IAgentContext context, EventTypes typeRegistry) : base(context, typeRegistry) + protected IOAgent(IAgentContext context, EventTypes eventTypes) : base(context, eventTypes) { - _state = new(); } - public virtual async Task Handle(Input item) { diff --git a/dotnet/src/Microsoft.AutoGen/Agents/Agents/IOAgent/WebAPIAgent/WebAPIAgent.cs b/dotnet/src/Microsoft.AutoGen/Agents/Agents/IOAgent/WebAPIAgent/WebAPIAgent.cs index 47d107d63d..418ef8d5ab 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/Agents/IOAgent/WebAPIAgent/WebAPIAgent.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Agents/IOAgent/WebAPIAgent/WebAPIAgent.cs @@ -6,7 +6,7 @@ using Microsoft.Extensions.Logging; namespace Microsoft.AutoGen.Agents; -public abstract class WebAPIAgent : IOAgent, +public abstract class WebAPIAgent : IOAgent, IUseWebAPI, IHandle, IHandle diff --git a/dotnet/src/Microsoft.AutoGen/Agents/IAgentBase.cs b/dotnet/src/Microsoft.AutoGen/Agents/IAgentBase.cs new file mode 100644 index 0000000000..122dff2c62 --- /dev/null +++ b/dotnet/src/Microsoft.AutoGen/Agents/IAgentBase.cs @@ -0,0 +1,22 @@ +using Microsoft.AutoGen.Abstractions; +using Microsoft.Extensions.Logging; + +namespace Microsoft.AutoGen.Agents +{ + public interface IAgentBase + { + // Properties + string AgentId { get; } + ILogger Logger { get; } + IAgentContext Context { get; } + + // Methods + Task CallHandler(CloudEvent item); + Task HandleRequest(RpcRequest request); + Task Start(); + Task ReceiveMessage(Message message); + Task Store(AgentState state); + Task Read(AgentId agentId); + Task PublishEvent(CloudEvent item); + } +} diff --git a/dotnet/src/Microsoft.AutoGen/Agents/IAgentContext.cs b/dotnet/src/Microsoft.AutoGen/Agents/IAgentContext.cs index a7911e37e5..0dfa78b36e 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/IAgentContext.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/IAgentContext.cs @@ -10,6 +10,8 @@ public interface IAgentContext AgentBase? AgentInstance { get; set; } DistributedContextPropagator DistributedContextPropagator { get; } ILogger Logger { get; } + ValueTask Store(AgentState value); + ValueTask Read(AgentId agentId); ValueTask SendResponseAsync(RpcRequest request, RpcResponse response); ValueTask SendRequestAsync(AgentBase agent, RpcRequest request); ValueTask PublishEventAsync(CloudEvent @event); diff --git a/dotnet/src/Microsoft.AutoGen/Runtime/AgentStateGrain.cs b/dotnet/src/Microsoft.AutoGen/Runtime/AgentStateGrain.cs deleted file mode 100644 index d717e26f46..0000000000 --- a/dotnet/src/Microsoft.AutoGen/Runtime/AgentStateGrain.cs +++ /dev/null @@ -1,20 +0,0 @@ -namespace Microsoft.AutoGen.Runtime; - -internal sealed class AgentStateGrain([PersistentState("state", "agent-state")] IPersistentState> state) : Grain, IAgentStateGrain -{ - public ValueTask<(Dictionary State, string ETag)> ReadStateAsync() - { - return new((state.State, state.Etag)); - } - - public async ValueTask WriteStateAsync(Dictionary value, string eTag) - { - if (string.Equals(state.Etag, eTag, StringComparison.Ordinal)) - { - state.State = value; - await state.WriteStateAsync(); - } - - return state.Etag; - } -} diff --git a/dotnet/src/Microsoft.AutoGen/Runtime/AgentWorkerHostingExtensions.cs b/dotnet/src/Microsoft.AutoGen/Runtime/AgentWorkerHostingExtensions.cs index 48e911f351..447b527417 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime/AgentWorkerHostingExtensions.cs +++ b/dotnet/src/Microsoft.AutoGen/Runtime/AgentWorkerHostingExtensions.cs @@ -5,19 +5,27 @@ using Microsoft.AspNetCore.Server.Kestrel.Core; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; using Microsoft.Extensions.Hosting; -using Orleans.Serialization; namespace Microsoft.AutoGen.Runtime; public static class AgentWorkerHostingExtensions { - public static IHostApplicationBuilder AddAgentService(this IHostApplicationBuilder builder) + public static WebApplicationBuilder AddAgentService(this WebApplicationBuilder builder, bool local = false) { + if (local) + { + //TODO: make configuration more flexible + builder.WebHost.ConfigureKestrel(serverOptions => + { + serverOptions.ListenLocalhost(5001, listenOptions => + { + listenOptions.Protocols = HttpProtocols.Http2; + listenOptions.UseHttps(); + }); + }); + } builder.Services.AddGrpc(); - builder.Services.AddSerializer(serializer => serializer.AddProtobufSerializer()); - - // Ensure Orleans is added before the hosted service to guarantee that it starts first. - builder.UseOrleans(); + builder.AddOrleans(local); builder.Services.TryAddSingleton(DistributedContextPropagator.Current); builder.Services.AddSingleton(); builder.Services.AddSingleton(sp => sp.GetRequiredService()); @@ -27,22 +35,9 @@ public static class AgentWorkerHostingExtensions public static WebApplicationBuilder AddLocalAgentService(this WebApplicationBuilder builder) { - builder.WebHost.ConfigureKestrel(serverOptions => - { - serverOptions.ListenLocalhost(5001, listenOptions => - { - listenOptions.Protocols = HttpProtocols.Http2; - listenOptions.UseHttps(); - }); - }); - builder.AddAgentService(); - builder.UseOrleans(siloBuilder => - { - siloBuilder.UseLocalhostClustering(); ; - }); + builder.AddAgentService(local: true); return builder; } - public static WebApplication MapAgentService(this WebApplication app) { app.MapGrpcService(); diff --git a/dotnet/src/Microsoft.AutoGen/Runtime/IAgentStateGrain.cs b/dotnet/src/Microsoft.AutoGen/Runtime/IAgentStateGrain.cs deleted file mode 100644 index b5ece3ad6f..0000000000 --- a/dotnet/src/Microsoft.AutoGen/Runtime/IAgentStateGrain.cs +++ /dev/null @@ -1,7 +0,0 @@ -namespace Microsoft.AutoGen.Runtime; - -internal interface IAgentStateGrain : IGrainWithStringKey -{ - ValueTask<(Dictionary State, string ETag)> ReadStateAsync(); - ValueTask WriteStateAsync(Dictionary state, string eTag); -} diff --git a/dotnet/src/Microsoft.AutoGen/Runtime/IWorkerAgentGrain.cs b/dotnet/src/Microsoft.AutoGen/Runtime/IWorkerAgentGrain.cs new file mode 100644 index 0000000000..ce93b9a41e --- /dev/null +++ b/dotnet/src/Microsoft.AutoGen/Runtime/IWorkerAgentGrain.cs @@ -0,0 +1,9 @@ +using Microsoft.AutoGen.Abstractions; + +namespace Microsoft.AutoGen.Runtime; + +internal interface IWorkerAgentGrain : IGrainWithStringKey +{ + ValueTask ReadStateAsync(); + ValueTask WriteStateAsync(AgentState state, string eTag); +} diff --git a/dotnet/src/Microsoft.AutoGen/Runtime/IWorkerGateway.cs b/dotnet/src/Microsoft.AutoGen/Runtime/IWorkerGateway.cs index c48c0fa8a6..ec63cdcc88 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime/IWorkerGateway.cs +++ b/dotnet/src/Microsoft.AutoGen/Runtime/IWorkerGateway.cs @@ -7,4 +7,6 @@ public interface IWorkerGateway : IGrainObserver { ValueTask InvokeRequest(RpcRequest request); ValueTask BroadcastEvent(CloudEvent evt); + ValueTask Store(AgentState value); + ValueTask Read(AgentId agentId); } diff --git a/dotnet/src/Microsoft.AutoGen/Runtime/Microsoft.AutoGen.Runtime.csproj b/dotnet/src/Microsoft.AutoGen/Runtime/Microsoft.AutoGen.Runtime.csproj index 37e1bd2926..40a240c2f6 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime/Microsoft.AutoGen.Runtime.csproj +++ b/dotnet/src/Microsoft.AutoGen/Runtime/Microsoft.AutoGen.Runtime.csproj @@ -21,6 +21,15 @@ + + + + + + + + + diff --git a/dotnet/src/Microsoft.AutoGen/Runtime/OrleansRuntimeHostingExtenions.cs b/dotnet/src/Microsoft.AutoGen/Runtime/OrleansRuntimeHostingExtenions.cs new file mode 100644 index 0000000000..3f980cf85d --- /dev/null +++ b/dotnet/src/Microsoft.AutoGen/Runtime/OrleansRuntimeHostingExtenions.cs @@ -0,0 +1,85 @@ +using System.Configuration; +using Microsoft.AspNetCore.Builder; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Hosting; +using Orleans.Configuration; +using Orleans.Serialization; + +namespace Microsoft.AutoGen.Runtime; + +public static class OrleansRuntimeHostingExtenions +{ + public static WebApplicationBuilder AddOrleans(this WebApplicationBuilder builder, bool local = false) + { + + builder.Services.AddSerializer(serializer => serializer.AddProtobufSerializer()); + // Ensure Orleans is added before the hosted service to guarantee that it starts first. + //TODO: make all of this configurable + builder.Host.UseOrleans(siloBuilder => + { + // Development mode or local mode uses in-memory storage and streams + if (builder.Environment.IsDevelopment() || local) + { + siloBuilder.UseLocalhostClustering() + .AddMemoryStreams("StreamProvider") + .AddMemoryGrainStorage("PubSubStore") + .AddMemoryGrainStorage("AgentStateStore"); + + siloBuilder.UseInMemoryReminderService(); + siloBuilder.UseDashboard(x => x.HostSelf = true); + + siloBuilder.UseInMemoryReminderService(); + } + else + { + var cosmosDbconnectionString = builder.Configuration.GetValue("Orleans:CosmosDBConnectionString") ?? + throw new ConfigurationErrorsException( + "Orleans:CosmosDBConnectionString is missing from configuration. This is required for persistence in production environments."); + siloBuilder.Configure(options => + { + //TODO: make this configurable + options.ClusterId = "AutoGen-cluster"; + options.ServiceId = "AutoGen-cluster"; + }); + siloBuilder.Configure(options => + { + options.ResponseTimeout = TimeSpan.FromMinutes(3); + options.SystemResponseTimeout = TimeSpan.FromMinutes(3); + }); + siloBuilder.Configure(options => + { + options.ResponseTimeout = TimeSpan.FromMinutes(3); + }); + siloBuilder.UseCosmosClustering(o => + { + o.ConfigureCosmosClient(cosmosDbconnectionString); + o.ContainerName = "AutoGen"; + o.DatabaseName = "clustering"; + o.IsResourceCreationEnabled = true; + }); + + siloBuilder.UseCosmosReminderService(o => + { + o.ConfigureCosmosClient(cosmosDbconnectionString); + o.ContainerName = "AutoGen"; + o.DatabaseName = "reminders"; + o.IsResourceCreationEnabled = true; + }); + siloBuilder.AddCosmosGrainStorage( + name: "AgentStateStore", + configureOptions: o => + { + o.ConfigureCosmosClient(cosmosDbconnectionString); + o.ContainerName = "AutoGen"; + o.DatabaseName = "persistence"; + o.IsResourceCreationEnabled = true; + }); + //TODO: replace with EventHub + siloBuilder + .AddMemoryStreams("StreamProvider") + .AddMemoryGrainStorage("PubSubStore"); + } + }); + return builder; + } +} diff --git a/dotnet/src/Microsoft.AutoGen/Runtime/WorkerAgentGrain.cs b/dotnet/src/Microsoft.AutoGen/Runtime/WorkerAgentGrain.cs new file mode 100644 index 0000000000..3bbe7d78cd --- /dev/null +++ b/dotnet/src/Microsoft.AutoGen/Runtime/WorkerAgentGrain.cs @@ -0,0 +1,31 @@ +using Microsoft.AutoGen.Abstractions; + +namespace Microsoft.AutoGen.Runtime; + +internal sealed class WorkerAgentGrain([PersistentState("state", "AgentStateStore")] IPersistentState state) : Grain, IWorkerAgentGrain +{ + public async ValueTask WriteStateAsync(AgentState newState, string eTag) + { + // etags for optimistic concurrency control + // if the Etag is null, its a new state + // if the passed etag is null or empty, we should not check the current state's Etag - caller doesnt care + // if both etags are set, they should match or it means that the state has changed since the last read. + if ((string.IsNullOrEmpty(state.Etag)) || (string.IsNullOrEmpty(eTag)) || (string.Equals(state.Etag, eTag, StringComparison.Ordinal))) + { + state.State = newState; + await state.WriteStateAsync(); + } + else + { + //TODO - this is probably not the correct behavior to just throw - I presume we want to somehow let the caller know that the state has changed and they need to re-read it + throw new ArgumentException( + "The provided ETag does not match the current ETag. The state has been modified by another request."); + } + return state.Etag; + } + + public ValueTask ReadStateAsync() + { + return ValueTask.FromResult(state.State); + } +} diff --git a/dotnet/src/Microsoft.AutoGen/Runtime/WorkerGateway.cs b/dotnet/src/Microsoft.AutoGen/Runtime/WorkerGateway.cs index 6cb26bc1c7..6d549ef727 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime/WorkerGateway.cs +++ b/dotnet/src/Microsoft.AutoGen/Runtime/WorkerGateway.cs @@ -37,6 +37,7 @@ internal sealed class WorkerGateway : BackgroundService, IWorkerGateway public async ValueTask BroadcastEvent(CloudEvent evt) { + // TODO: filter the workers that receive the event var tasks = new List(_workers.Count); foreach (var (_, connection) in _workers) { @@ -211,7 +212,18 @@ internal sealed class WorkerGateway : BackgroundService, IWorkerGateway await connection.ResponseStream.WriteAsync(new Message { Response = new RpcResponse { RequestId = request.RequestId, Error = ex.Message } }); } } + public async ValueTask Store(AgentState value) + { + var agentId = value.AgentId ?? throw new ArgumentNullException(nameof(value.AgentId)); + var agentState = _clusterClient.GetGrain($"{agentId.Type}:{agentId.Key}"); + await agentState.WriteStateAsync(value, value.ETag); + } + public async ValueTask Read(AgentId agentId) + { + var agentState = _clusterClient.GetGrain($"{agentId.Type}:{agentId.Key}"); + return await agentState.ReadStateAsync(); + } /* private async ValueTask SubscribeToTopic(WorkerProcessConnection connection, RpcRequest request) { diff --git a/dotnet/src/Microsoft.AutoGen/Runtime/WorkerGatewayService.cs b/dotnet/src/Microsoft.AutoGen/Runtime/WorkerGatewayService.cs index b817bc0492..8600aa5fd2 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime/WorkerGatewayService.cs +++ b/dotnet/src/Microsoft.AutoGen/Runtime/WorkerGatewayService.cs @@ -21,4 +21,18 @@ internal sealed class WorkerGatewayService(WorkerGateway agentWorker) : AgentRpc throw; } } + public override async Task GetState(AgentId request, ServerCallContext context) + { + var state = await agentWorker.Read(request); + return new GetStateResponse { AgentState = state }; + } + + public override async Task SaveState(AgentState request, ServerCallContext context) + { + await agentWorker.Store(request); + return new SaveStateResponse + { + Success = true // TODO: Implement error handling + }; + } } diff --git a/protos/agent_worker.proto b/protos/agent_worker.proto index ec472923be..7b0b5245dd 100644 --- a/protos/agent_worker.proto +++ b/protos/agent_worker.proto @@ -82,6 +82,29 @@ message AddSubscriptionResponse { service AgentRpc { rpc OpenChannel (stream Message) returns (stream Message); + rpc GetState(AgentId) returns (GetStateResponse); + rpc SaveState(AgentState) returns (SaveStateResponse); +} + +message AgentState { + AgentId agent_id = 1; + string eTag = 2; + oneof data { + bytes binary_data = 3; + string text_data = 4; + google.protobuf.Any proto_data = 5; + } +} + +message GetStateResponse { + AgentState agent_state = 1; + bool success = 2; + optional string error = 3; +} + +message SaveStateResponse { + bool success = 1; + optional string error = 2; } message Message { diff --git a/python/packages/autogen-core/src/autogen_core/application/_worker_runtime_host_servicer.py b/python/packages/autogen-core/src/autogen_core/application/_worker_runtime_host_servicer.py index 9308edbcd8..1ed794c35f 100644 --- a/python/packages/autogen-core/src/autogen_core/application/_worker_runtime_host_servicer.py +++ b/python/packages/autogen-core/src/autogen_core/application/_worker_runtime_host_servicer.py @@ -243,3 +243,17 @@ class WorkerAgentRuntimeHostServicer(agent_worker_pb2_grpc.AgentRpcServicer): ) case None: logger.warning("Received empty subscription message") + + async def GetState( # type: ignore + self, + request: agent_worker_pb2.AgentId, + context: grpc.aio.ServicerContext[agent_worker_pb2.AgentId, agent_worker_pb2.GetStateResponse], + ) -> agent_worker_pb2.GetStateResponse: # type: ignore + raise NotImplementedError("Method not implemented!") + + async def SaveState( # type: ignore + self, + request: agent_worker_pb2.AgentState, + context: grpc.aio.ServicerContext[agent_worker_pb2.AgentId, agent_worker_pb2.SaveStateResponse], + ) -> agent_worker_pb2.SaveStateResponse: # type: ignore + raise NotImplementedError("Method not implemented!") diff --git a/python/packages/autogen-core/src/autogen_core/application/protos/agent_worker_pb2.py b/python/packages/autogen-core/src/autogen_core/application/protos/agent_worker_pb2.py index cfbc0522b8..0637e866c4 100644 --- a/python/packages/autogen-core/src/autogen_core/application/protos/agent_worker_pb2.py +++ b/python/packages/autogen-core/src/autogen_core/application/protos/agent_worker_pb2.py @@ -16,7 +16,7 @@ import cloudevent_pb2 as cloudevent__pb2 from google.protobuf import any_pb2 as google_dot_protobuf_dot_any__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x12\x61gent_worker.proto\x12\x06\x61gents\x1a\x10\x63loudevent.proto\x1a\x19google/protobuf/any.proto\"\'\n\x07TopicId\x12\x0c\n\x04type\x18\x01 \x01(\t\x12\x0e\n\x06source\x18\x02 \x01(\t\"$\n\x07\x41gentId\x12\x0c\n\x04type\x18\x01 \x01(\t\x12\x0b\n\x03key\x18\x02 \x01(\t\"E\n\x07Payload\x12\x11\n\tdata_type\x18\x01 \x01(\t\x12\x19\n\x11\x64\x61ta_content_type\x18\x02 \x01(\t\x12\x0c\n\x04\x64\x61ta\x18\x03 \x01(\x0c\"\x89\x02\n\nRpcRequest\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12$\n\x06source\x18\x02 \x01(\x0b\x32\x0f.agents.AgentIdH\x00\x88\x01\x01\x12\x1f\n\x06target\x18\x03 \x01(\x0b\x32\x0f.agents.AgentId\x12\x0e\n\x06method\x18\x04 \x01(\t\x12 \n\x07payload\x18\x05 \x01(\x0b\x32\x0f.agents.Payload\x12\x32\n\x08metadata\x18\x06 \x03(\x0b\x32 .agents.RpcRequest.MetadataEntry\x1a/\n\rMetadataEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x42\t\n\x07_source\"\xb8\x01\n\x0bRpcResponse\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12 \n\x07payload\x18\x02 \x01(\x0b\x32\x0f.agents.Payload\x12\r\n\x05\x65rror\x18\x03 \x01(\t\x12\x33\n\x08metadata\x18\x04 \x03(\x0b\x32!.agents.RpcResponse.MetadataEntry\x1a/\n\rMetadataEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\xe4\x01\n\x05\x45vent\x12\x12\n\ntopic_type\x18\x01 \x01(\t\x12\x14\n\x0ctopic_source\x18\x02 \x01(\t\x12$\n\x06source\x18\x03 \x01(\x0b\x32\x0f.agents.AgentIdH\x00\x88\x01\x01\x12 \n\x07payload\x18\x04 \x01(\x0b\x32\x0f.agents.Payload\x12-\n\x08metadata\x18\x05 \x03(\x0b\x32\x1b.agents.Event.MetadataEntry\x1a/\n\rMetadataEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x42\t\n\x07_source\"<\n\x18RegisterAgentTypeRequest\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12\x0c\n\x04type\x18\x02 \x01(\t\"^\n\x19RegisterAgentTypeResponse\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12\x0f\n\x07success\x18\x02 \x01(\x08\x12\x12\n\x05\x65rror\x18\x03 \x01(\tH\x00\x88\x01\x01\x42\x08\n\x06_error\":\n\x10TypeSubscription\x12\x12\n\ntopic_type\x18\x01 \x01(\t\x12\x12\n\nagent_type\x18\x02 \x01(\t\"T\n\x0cSubscription\x12\x34\n\x10typeSubscription\x18\x01 \x01(\x0b\x32\x18.agents.TypeSubscriptionH\x00\x42\x0e\n\x0csubscription\"X\n\x16\x41\x64\x64SubscriptionRequest\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12*\n\x0csubscription\x18\x02 \x01(\x0b\x32\x14.agents.Subscription\"\\\n\x17\x41\x64\x64SubscriptionResponse\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12\x0f\n\x07success\x18\x02 \x01(\x08\x12\x12\n\x05\x65rror\x18\x03 \x01(\tH\x00\x88\x01\x01\x42\x08\n\x06_error\"\xc6\x03\n\x07Message\x12%\n\x07request\x18\x01 \x01(\x0b\x32\x12.agents.RpcRequestH\x00\x12\'\n\x08response\x18\x02 \x01(\x0b\x32\x13.agents.RpcResponseH\x00\x12\x1e\n\x05\x65vent\x18\x03 \x01(\x0b\x32\r.agents.EventH\x00\x12\x44\n\x18registerAgentTypeRequest\x18\x04 \x01(\x0b\x32 .agents.RegisterAgentTypeRequestH\x00\x12\x46\n\x19registerAgentTypeResponse\x18\x05 \x01(\x0b\x32!.agents.RegisterAgentTypeResponseH\x00\x12@\n\x16\x61\x64\x64SubscriptionRequest\x18\x06 \x01(\x0b\x32\x1e.agents.AddSubscriptionRequestH\x00\x12\x42\n\x17\x61\x64\x64SubscriptionResponse\x18\x07 \x01(\x0b\x32\x1f.agents.AddSubscriptionResponseH\x00\x12,\n\ncloudEvent\x18\x08 \x01(\x0b\x32\x16.cloudevent.CloudEventH\x00\x42\t\n\x07message2?\n\x08\x41gentRpc\x12\x33\n\x0bOpenChannel\x12\x0f.agents.Message\x1a\x0f.agents.Message(\x01\x30\x01\x42!\xaa\x02\x1eMicrosoft.AutoGen.Abstractionsb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x12\x61gent_worker.proto\x12\x06\x61gents\x1a\x10\x63loudevent.proto\x1a\x19google/protobuf/any.proto\"\'\n\x07TopicId\x12\x0c\n\x04type\x18\x01 \x01(\t\x12\x0e\n\x06source\x18\x02 \x01(\t\"$\n\x07\x41gentId\x12\x0c\n\x04type\x18\x01 \x01(\t\x12\x0b\n\x03key\x18\x02 \x01(\t\"E\n\x07Payload\x12\x11\n\tdata_type\x18\x01 \x01(\t\x12\x19\n\x11\x64\x61ta_content_type\x18\x02 \x01(\t\x12\x0c\n\x04\x64\x61ta\x18\x03 \x01(\x0c\"\x89\x02\n\nRpcRequest\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12$\n\x06source\x18\x02 \x01(\x0b\x32\x0f.agents.AgentIdH\x00\x88\x01\x01\x12\x1f\n\x06target\x18\x03 \x01(\x0b\x32\x0f.agents.AgentId\x12\x0e\n\x06method\x18\x04 \x01(\t\x12 \n\x07payload\x18\x05 \x01(\x0b\x32\x0f.agents.Payload\x12\x32\n\x08metadata\x18\x06 \x03(\x0b\x32 .agents.RpcRequest.MetadataEntry\x1a/\n\rMetadataEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x42\t\n\x07_source\"\xb8\x01\n\x0bRpcResponse\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12 \n\x07payload\x18\x02 \x01(\x0b\x32\x0f.agents.Payload\x12\r\n\x05\x65rror\x18\x03 \x01(\t\x12\x33\n\x08metadata\x18\x04 \x03(\x0b\x32!.agents.RpcResponse.MetadataEntry\x1a/\n\rMetadataEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\xe4\x01\n\x05\x45vent\x12\x12\n\ntopic_type\x18\x01 \x01(\t\x12\x14\n\x0ctopic_source\x18\x02 \x01(\t\x12$\n\x06source\x18\x03 \x01(\x0b\x32\x0f.agents.AgentIdH\x00\x88\x01\x01\x12 \n\x07payload\x18\x04 \x01(\x0b\x32\x0f.agents.Payload\x12-\n\x08metadata\x18\x05 \x03(\x0b\x32\x1b.agents.Event.MetadataEntry\x1a/\n\rMetadataEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x42\t\n\x07_source\"<\n\x18RegisterAgentTypeRequest\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12\x0c\n\x04type\x18\x02 \x01(\t\"^\n\x19RegisterAgentTypeResponse\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12\x0f\n\x07success\x18\x02 \x01(\x08\x12\x12\n\x05\x65rror\x18\x03 \x01(\tH\x00\x88\x01\x01\x42\x08\n\x06_error\":\n\x10TypeSubscription\x12\x12\n\ntopic_type\x18\x01 \x01(\t\x12\x12\n\nagent_type\x18\x02 \x01(\t\"T\n\x0cSubscription\x12\x34\n\x10typeSubscription\x18\x01 \x01(\x0b\x32\x18.agents.TypeSubscriptionH\x00\x42\x0e\n\x0csubscription\"X\n\x16\x41\x64\x64SubscriptionRequest\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12*\n\x0csubscription\x18\x02 \x01(\x0b\x32\x14.agents.Subscription\"\\\n\x17\x41\x64\x64SubscriptionResponse\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12\x0f\n\x07success\x18\x02 \x01(\x08\x12\x12\n\x05\x65rror\x18\x03 \x01(\tH\x00\x88\x01\x01\x42\x08\n\x06_error\"\x9d\x01\n\nAgentState\x12!\n\x08\x61gent_id\x18\x01 \x01(\x0b\x32\x0f.agents.AgentId\x12\x0c\n\x04\x65Tag\x18\x02 \x01(\t\x12\x15\n\x0b\x62inary_data\x18\x03 \x01(\x0cH\x00\x12\x13\n\ttext_data\x18\x04 \x01(\tH\x00\x12*\n\nproto_data\x18\x05 \x01(\x0b\x32\x14.google.protobuf.AnyH\x00\x42\x06\n\x04\x64\x61ta\"j\n\x10GetStateResponse\x12\'\n\x0b\x61gent_state\x18\x01 \x01(\x0b\x32\x12.agents.AgentState\x12\x0f\n\x07success\x18\x02 \x01(\x08\x12\x12\n\x05\x65rror\x18\x03 \x01(\tH\x00\x88\x01\x01\x42\x08\n\x06_error\"B\n\x11SaveStateResponse\x12\x0f\n\x07success\x18\x01 \x01(\x08\x12\x12\n\x05\x65rror\x18\x02 \x01(\tH\x00\x88\x01\x01\x42\x08\n\x06_error\"\xc6\x03\n\x07Message\x12%\n\x07request\x18\x01 \x01(\x0b\x32\x12.agents.RpcRequestH\x00\x12\'\n\x08response\x18\x02 \x01(\x0b\x32\x13.agents.RpcResponseH\x00\x12\x1e\n\x05\x65vent\x18\x03 \x01(\x0b\x32\r.agents.EventH\x00\x12\x44\n\x18registerAgentTypeRequest\x18\x04 \x01(\x0b\x32 .agents.RegisterAgentTypeRequestH\x00\x12\x46\n\x19registerAgentTypeResponse\x18\x05 \x01(\x0b\x32!.agents.RegisterAgentTypeResponseH\x00\x12@\n\x16\x61\x64\x64SubscriptionRequest\x18\x06 \x01(\x0b\x32\x1e.agents.AddSubscriptionRequestH\x00\x12\x42\n\x17\x61\x64\x64SubscriptionResponse\x18\x07 \x01(\x0b\x32\x1f.agents.AddSubscriptionResponseH\x00\x12,\n\ncloudEvent\x18\x08 \x01(\x0b\x32\x16.cloudevent.CloudEventH\x00\x42\t\n\x07message2\xb2\x01\n\x08\x41gentRpc\x12\x33\n\x0bOpenChannel\x12\x0f.agents.Message\x1a\x0f.agents.Message(\x01\x30\x01\x12\x35\n\x08GetState\x12\x0f.agents.AgentId\x1a\x18.agents.GetStateResponse\x12:\n\tSaveState\x12\x12.agents.AgentState\x1a\x19.agents.SaveStateResponseB!\xaa\x02\x1eMicrosoft.AutoGen.Abstractionsb\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -60,8 +60,14 @@ if _descriptor._USE_C_DESCRIPTORS == False: _globals['_ADDSUBSCRIPTIONREQUEST']._serialized_end=1303 _globals['_ADDSUBSCRIPTIONRESPONSE']._serialized_start=1305 _globals['_ADDSUBSCRIPTIONRESPONSE']._serialized_end=1397 - _globals['_MESSAGE']._serialized_start=1400 - _globals['_MESSAGE']._serialized_end=1854 - _globals['_AGENTRPC']._serialized_start=1856 - _globals['_AGENTRPC']._serialized_end=1919 + _globals['_AGENTSTATE']._serialized_start=1400 + _globals['_AGENTSTATE']._serialized_end=1557 + _globals['_GETSTATERESPONSE']._serialized_start=1559 + _globals['_GETSTATERESPONSE']._serialized_end=1665 + _globals['_SAVESTATERESPONSE']._serialized_start=1667 + _globals['_SAVESTATERESPONSE']._serialized_end=1733 + _globals['_MESSAGE']._serialized_start=1736 + _globals['_MESSAGE']._serialized_end=2190 + _globals['_AGENTRPC']._serialized_start=2193 + _globals['_AGENTRPC']._serialized_end=2371 # @@protoc_insertion_point(module_scope) diff --git a/python/packages/autogen-core/src/autogen_core/application/protos/agent_worker_pb2.pyi b/python/packages/autogen-core/src/autogen_core/application/protos/agent_worker_pb2.pyi index 6c57fa8a9f..522124ab88 100644 --- a/python/packages/autogen-core/src/autogen_core/application/protos/agent_worker_pb2.pyi +++ b/python/packages/autogen-core/src/autogen_core/application/protos/agent_worker_pb2.pyi @@ -6,6 +6,7 @@ isort:skip_file import builtins import cloudevent_pb2 import collections.abc +import google.protobuf.any_pb2 import google.protobuf.descriptor import google.protobuf.internal.containers import google.protobuf.message @@ -333,6 +334,81 @@ class AddSubscriptionResponse(google.protobuf.message.Message): global___AddSubscriptionResponse = AddSubscriptionResponse +@typing.final +class AgentState(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + AGENT_ID_FIELD_NUMBER: builtins.int + ETAG_FIELD_NUMBER: builtins.int + BINARY_DATA_FIELD_NUMBER: builtins.int + TEXT_DATA_FIELD_NUMBER: builtins.int + PROTO_DATA_FIELD_NUMBER: builtins.int + eTag: builtins.str + binary_data: builtins.bytes + text_data: builtins.str + @property + def agent_id(self) -> global___AgentId: ... + @property + def proto_data(self) -> google.protobuf.any_pb2.Any: ... + def __init__( + self, + *, + agent_id: global___AgentId | None = ..., + eTag: builtins.str = ..., + binary_data: builtins.bytes = ..., + text_data: builtins.str = ..., + proto_data: google.protobuf.any_pb2.Any | None = ..., + ) -> None: ... + def HasField(self, field_name: typing.Literal["agent_id", b"agent_id", "binary_data", b"binary_data", "data", b"data", "proto_data", b"proto_data", "text_data", b"text_data"]) -> builtins.bool: ... + def ClearField(self, field_name: typing.Literal["agent_id", b"agent_id", "binary_data", b"binary_data", "data", b"data", "eTag", b"eTag", "proto_data", b"proto_data", "text_data", b"text_data"]) -> None: ... + def WhichOneof(self, oneof_group: typing.Literal["data", b"data"]) -> typing.Literal["binary_data", "text_data", "proto_data"] | None: ... + +global___AgentState = AgentState + +@typing.final +class GetStateResponse(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + AGENT_STATE_FIELD_NUMBER: builtins.int + SUCCESS_FIELD_NUMBER: builtins.int + ERROR_FIELD_NUMBER: builtins.int + success: builtins.bool + error: builtins.str + @property + def agent_state(self) -> global___AgentState: ... + def __init__( + self, + *, + agent_state: global___AgentState | None = ..., + success: builtins.bool = ..., + error: builtins.str | None = ..., + ) -> None: ... + def HasField(self, field_name: typing.Literal["_error", b"_error", "agent_state", b"agent_state", "error", b"error"]) -> builtins.bool: ... + def ClearField(self, field_name: typing.Literal["_error", b"_error", "agent_state", b"agent_state", "error", b"error", "success", b"success"]) -> None: ... + def WhichOneof(self, oneof_group: typing.Literal["_error", b"_error"]) -> typing.Literal["error"] | None: ... + +global___GetStateResponse = GetStateResponse + +@typing.final +class SaveStateResponse(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + SUCCESS_FIELD_NUMBER: builtins.int + ERROR_FIELD_NUMBER: builtins.int + success: builtins.bool + error: builtins.str + def __init__( + self, + *, + success: builtins.bool = ..., + error: builtins.str | None = ..., + ) -> None: ... + def HasField(self, field_name: typing.Literal["_error", b"_error", "error", b"error"]) -> builtins.bool: ... + def ClearField(self, field_name: typing.Literal["_error", b"_error", "error", b"error", "success", b"success"]) -> None: ... + def WhichOneof(self, oneof_group: typing.Literal["_error", b"_error"]) -> typing.Literal["error"] | None: ... + +global___SaveStateResponse = SaveStateResponse + @typing.final class Message(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor diff --git a/python/packages/autogen-core/src/autogen_core/application/protos/agent_worker_pb2_grpc.py b/python/packages/autogen-core/src/autogen_core/application/protos/agent_worker_pb2_grpc.py index d561618a2c..fc27021587 100644 --- a/python/packages/autogen-core/src/autogen_core/application/protos/agent_worker_pb2_grpc.py +++ b/python/packages/autogen-core/src/autogen_core/application/protos/agent_worker_pb2_grpc.py @@ -19,6 +19,16 @@ class AgentRpcStub(object): request_serializer=agent__worker__pb2.Message.SerializeToString, response_deserializer=agent__worker__pb2.Message.FromString, ) + self.GetState = channel.unary_unary( + '/agents.AgentRpc/GetState', + request_serializer=agent__worker__pb2.AgentId.SerializeToString, + response_deserializer=agent__worker__pb2.GetStateResponse.FromString, + ) + self.SaveState = channel.unary_unary( + '/agents.AgentRpc/SaveState', + request_serializer=agent__worker__pb2.AgentState.SerializeToString, + response_deserializer=agent__worker__pb2.SaveStateResponse.FromString, + ) class AgentRpcServicer(object): @@ -30,6 +40,18 @@ class AgentRpcServicer(object): context.set_details('Method not implemented!') raise NotImplementedError('Method not implemented!') + def GetState(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def SaveState(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + def add_AgentRpcServicer_to_server(servicer, server): rpc_method_handlers = { @@ -38,6 +60,16 @@ def add_AgentRpcServicer_to_server(servicer, server): request_deserializer=agent__worker__pb2.Message.FromString, response_serializer=agent__worker__pb2.Message.SerializeToString, ), + 'GetState': grpc.unary_unary_rpc_method_handler( + servicer.GetState, + request_deserializer=agent__worker__pb2.AgentId.FromString, + response_serializer=agent__worker__pb2.GetStateResponse.SerializeToString, + ), + 'SaveState': grpc.unary_unary_rpc_method_handler( + servicer.SaveState, + request_deserializer=agent__worker__pb2.AgentState.FromString, + response_serializer=agent__worker__pb2.SaveStateResponse.SerializeToString, + ), } generic_handler = grpc.method_handlers_generic_handler( 'agents.AgentRpc', rpc_method_handlers) @@ -64,3 +96,37 @@ class AgentRpc(object): agent__worker__pb2.Message.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + + @staticmethod + def GetState(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary(request, target, '/agents.AgentRpc/GetState', + agent__worker__pb2.AgentId.SerializeToString, + agent__worker__pb2.GetStateResponse.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + + @staticmethod + def SaveState(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary(request, target, '/agents.AgentRpc/SaveState', + agent__worker__pb2.AgentState.SerializeToString, + agent__worker__pb2.SaveStateResponse.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) diff --git a/python/packages/autogen-core/src/autogen_core/application/protos/agent_worker_pb2_grpc.pyi b/python/packages/autogen-core/src/autogen_core/application/protos/agent_worker_pb2_grpc.pyi index 1642ca2af1..bf6bc1ba2d 100644 --- a/python/packages/autogen-core/src/autogen_core/application/protos/agent_worker_pb2_grpc.pyi +++ b/python/packages/autogen-core/src/autogen_core/application/protos/agent_worker_pb2_grpc.pyi @@ -24,12 +24,32 @@ class AgentRpcStub: agent_worker_pb2.Message, ] + GetState: grpc.UnaryUnaryMultiCallable[ + agent_worker_pb2.AgentId, + agent_worker_pb2.GetStateResponse, + ] + + SaveState: grpc.UnaryUnaryMultiCallable[ + agent_worker_pb2.AgentState, + agent_worker_pb2.SaveStateResponse, + ] + class AgentRpcAsyncStub: OpenChannel: grpc.aio.StreamStreamMultiCallable[ agent_worker_pb2.Message, agent_worker_pb2.Message, ] + GetState: grpc.aio.UnaryUnaryMultiCallable[ + agent_worker_pb2.AgentId, + agent_worker_pb2.GetStateResponse, + ] + + SaveState: grpc.aio.UnaryUnaryMultiCallable[ + agent_worker_pb2.AgentState, + agent_worker_pb2.SaveStateResponse, + ] + class AgentRpcServicer(metaclass=abc.ABCMeta): @abc.abstractmethod def OpenChannel( @@ -38,4 +58,18 @@ class AgentRpcServicer(metaclass=abc.ABCMeta): context: _ServicerContext, ) -> typing.Union[collections.abc.Iterator[agent_worker_pb2.Message], collections.abc.AsyncIterator[agent_worker_pb2.Message]]: ... + @abc.abstractmethod + def GetState( + self, + request: agent_worker_pb2.AgentId, + context: _ServicerContext, + ) -> typing.Union[agent_worker_pb2.GetStateResponse, collections.abc.Awaitable[agent_worker_pb2.GetStateResponse]]: ... + + @abc.abstractmethod + def SaveState( + self, + request: agent_worker_pb2.AgentState, + context: _ServicerContext, + ) -> typing.Union[agent_worker_pb2.SaveStateResponse, collections.abc.Awaitable[agent_worker_pb2.SaveStateResponse]]: ... + def add_AgentRpcServicer_to_server(servicer: AgentRpcServicer, server: typing.Union[grpc.Server, grpc.aio.Server]) -> None: ...