first draft of stateful persistence grains for each agent.... (#3954)

* adds Orleans persistence for AgentState
This commit is contained in:
Ryan Sweet 2024-10-28 17:28:36 -07:00 committed by GitHub
parent 6925cd436a
commit 14846a3e84
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
35 changed files with 749 additions and 77 deletions

View File

@ -125,6 +125,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "AIModelClientHostingExtensi
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "sample", "sample", "{686480D7-8FEC-4ED3-9C5D-CEBE1057A7ED}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "HelloAgentState", "samples\Hello\HelloAgentState\HelloAgentState.csproj", "{64EF61E7-00A6-4E5E-9808-62E10993A0E5}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@ -335,6 +337,10 @@ Global
{97550E87-48C6-4EBF-85E1-413ABAE9DBFD}.Debug|Any CPU.Build.0 = Debug|Any CPU
{97550E87-48C6-4EBF-85E1-413ABAE9DBFD}.Release|Any CPU.ActiveCfg = Release|Any CPU
{97550E87-48C6-4EBF-85E1-413ABAE9DBFD}.Release|Any CPU.Build.0 = Release|Any CPU
{64EF61E7-00A6-4E5E-9808-62E10993A0E5}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{64EF61E7-00A6-4E5E-9808-62E10993A0E5}.Debug|Any CPU.Build.0 = Debug|Any CPU
{64EF61E7-00A6-4E5E-9808-62E10993A0E5}.Release|Any CPU.ActiveCfg = Release|Any CPU
{64EF61E7-00A6-4E5E-9808-62E10993A0E5}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@ -395,6 +401,7 @@ Global
{A20B9894-F352-4338-872A-F215A241D43D} = {7EB336C2-7C0A-4BC8-80C6-A3173AB8DC45}
{8F7560CF-EEBB-4333-A69F-838CA40FD85D} = {7EB336C2-7C0A-4BC8-80C6-A3173AB8DC45}
{97550E87-48C6-4EBF-85E1-413ABAE9DBFD} = {18BF8DD7-0585-48BF-8F97-AD333080CE06}
{64EF61E7-00A6-4E5E-9808-62E10993A0E5} = {7EB336C2-7C0A-4BC8-80C6-A3173AB8DC45}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {93384647-528D-46C8-922C-8DB36A382F0B}

View File

@ -0,0 +1,21 @@
<Project Sdk="Microsoft.NET.Sdk">
<ItemGroup>
<ProjectReference Include="..\..\..\src\Microsoft.AutoGen\Abstractions\Microsoft.AutoGen.Abstractions.csproj" />
<ProjectReference Include="..\..\..\src\Microsoft.AutoGen\Agents\Microsoft.AutoGen.Agents.csproj" />
<ProjectReference Include="..\..\..\src\Microsoft.AutoGen\Runtime\Microsoft.AutoGen.Runtime.csproj" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Hosting" />
<PackageReference Include="Aspire.Hosting.AppHost" />
</ItemGroup>
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
</Project>

View File

@ -0,0 +1,75 @@
// Copyright (c) Microsoft. All rights reserved.
using Microsoft.AutoGen.Abstractions;
using Microsoft.AutoGen.Agents;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
// send a message to the agent
var app = await AgentsApp.PublishMessageAsync("HelloAgents", new NewMessageReceived
{
Message = "World"
}, local: false);
await app.WaitForShutdownAsync();
namespace Hello
{
[TopicSubscription("HelloAgents")]
public class HelloAgent(
IAgentContext context,
[FromKeyedServices("EventTypes")] EventTypes typeRegistry) : ConsoleAgent(
context,
typeRegistry),
ISayHello,
IHandle<NewMessageReceived>,
IHandle<ConversationClosed>
{
private AgentState? State { get; set; }
public async Task Handle(NewMessageReceived item)
{
var response = await SayHello(item.Message).ConfigureAwait(false);
var evt = new Output
{
Message = response
}.ToCloudEvent(this.AgentId.Key);
var entry = "We said hello to " + item.Message;
await Store(new AgentState
{
AgentId = this.AgentId,
TextData = entry
}).ConfigureAwait(false);
await PublishEvent(evt).ConfigureAwait(false);
var goodbye = new ConversationClosed
{
UserId = this.AgentId.Key,
UserMessage = "Goodbye"
}.ToCloudEvent(this.AgentId.Key);
await PublishEvent(goodbye).ConfigureAwait(false);
}
public async Task Handle(ConversationClosed item)
{
State = await Read<AgentState>(this.AgentId).ConfigureAwait(false);
var read = State?.TextData ?? "No state data found";
var goodbye = $"{read}\n********************* {item.UserId} said {item.UserMessage} ************************";
var evt = new Output
{
Message = goodbye
}.ToCloudEvent(this.AgentId.Key);
await PublishEvent(evt).ConfigureAwait(false);
//sleep
await Task.Delay(10000).ConfigureAwait(false);
await AgentsApp.ShutdownAsync().ConfigureAwait(false);
}
public async Task<string> SayHello(string ask)
{
var response = $"\n\n\n\n***************Hello {ask}**********************\n\n\n\n";
return response;
}
}
public interface ISayHello
{
public Task<string> SayHello(string ask);
}
}

View File

@ -0,0 +1,138 @@
# AutoGen 0.4 .NET Hello World Sample
This [sample](Program.cs) demonstrates how to create a simple .NET console application that listens for an event and then orchestrates a series of actions in response.
## Prerequisites
To run this sample, you'll need: [.NET 8.0](https://dotnet.microsoft.com/en-us/) or later.
Also recommended is the [GitHub CLI](https://cli.github.com/).
## Instructions to run the sample
```bash
# Clone the repository
gh repo clone microsoft/autogen
cd dotnet/samples/Hello
dotnet run
```
## Key Concepts
This sample illustrates how to create your own agent that inherits from a base agent and listens for an event. It also shows how to use the SDK's App Runtime locally to start the agent and send messages.
Flow Diagram:
```mermaid
%%{init: {'theme':'forest'}}%%
graph LR;
A[Main] --> |"PublishEvent(NewMessage('World'))"| B{"Handle(NewMessageReceived item)"}
B --> |"PublishEvent(Output('***Hello, World***'))"| C[ConsoleAgent]
C --> D{"WriteConsole()"}
B --> |"PublishEvent(ConversationClosed('Goodbye'))"| E{"Handle(ConversationClosed item)"}
B --> |"PublishEvent(Output('***Goodbye***'))"| C
E --> F{"Shutdown()"}
```
### Writing Event Handlers
The heart of an autogen application are the event handlers. Agents select a ```TopicSubscription``` to listen for events on a specific topic. When an event is received, the agent's event handler is called with the event data.
Within that event handler you may optionally *emit* new events, which are then sent to the event bus for other agents to process. The EventTypes are declared gRPC ProtoBuf messages that are used to define the schema of the event. The default protos are available via the ```Microsoft.AutoGen.Abstractions;``` namespace and are defined in [autogen/protos](/autogen/protos). The EventTypes are registered in the agent's constructor using the ```IHandle``` interface.
```csharp
TopicSubscription("HelloAgents")]
public class HelloAgent(
IAgentContext context,
[FromKeyedServices("EventTypes")] EventTypes typeRegistry) : ConsoleAgent(
context,
typeRegistry),
ISayHello,
IHandle<NewMessageReceived>,
IHandle<ConversationClosed>
{
public async Task Handle(NewMessageReceived item)
{
var response = await SayHello(item.Message).ConfigureAwait(false);
var evt = new Output
{
Message = response
}.ToCloudEvent(this.AgentId.Key);
await PublishEvent(evt).ConfigureAwait(false);
var goodbye = new ConversationClosed
{
UserId = this.AgentId.Key,
UserMessage = "Goodbye"
}.ToCloudEvent(this.AgentId.Key);
await PublishEvent(goodbye).ConfigureAwait(false);
}
```
### Inheritance and Composition
This sample also illustrates inheritance in AutoGen. The `HelloAgent` class inherits from `ConsoleAgent`, which is a base class that provides a `WriteConsole` method.
### Starting the Application Runtime
AuotoGen provides a flexible runtime ```Microsoft.AutoGen.Agents.App``` that can be started in a variety of ways. The `Program.cs` file demonstrates how to start the runtime locally and send a message to the agent all in one go using the ```App.PublishMessageAsync``` method.
```csharp
// send a message to the agent
var app = await App.PublishMessageAsync("HelloAgents", new NewMessageReceived
{
Message = "World"
}, local: true);
await App.RuntimeApp!.WaitForShutdownAsync();
await app.WaitForShutdownAsync();
```
### Sending Messages
The set of possible Messages is defined in gRPC ProtoBuf specs. These are then turned into C# classes by the gRPC tools. You can define your own Message types by creating a new .proto file in your project and including the gRPC tools in your ```.csproj``` file:
```proto
syntax = "proto3";
package devteam;
option csharp_namespace = "DevTeam.Shared";
message NewAsk {
string org = 1;
string repo = 2;
string ask = 3;
int64 issue_number = 4;
}
message ReadmeRequested {
string org = 1;
string repo = 2;
int64 issue_number = 3;
string ask = 4;
}
```
```xml
<ItemGroup>
<PackageReference Include="Google.Protobuf" />
<PackageReference Include="Grpc.Tools" PrivateAssets="All" />
<Protobuf Include="..\Protos\messages.proto" Link="Protos\messages.proto" />
</ItemGroup>
```
You can send messages using the [```Microsoft.AutoGen.Agents``` class](autogen/dotnet/src/Microsoft.AutoGen/Agents/AgentClient.cs). Messages are wrapped in [the CloudEvents specification](https://cloudevents.io) and sent to the event bus.
### Managing State
There is a simple API for persisting agent state.
```csharp
await Store(new AgentState
{
AgentId = this.AgentId,
TextData = entry
}).ConfigureAwait(false);
```
which can be read back using Read:
```csharp
State = await Read<AgentState>(this.AgentId).ConfigureAwait(false);
```

View File

@ -1,6 +1,9 @@
using Google.Protobuf;
namespace Microsoft.AutoGen.Abstractions;
public class AgentState<T> where T : class, new()
public class ChatState
<T> where T : IMessage, new()
{
public List<ChatHistoryItem> History { get; set; } = new();
public T Data { get; set; } = new();

View File

@ -16,9 +16,29 @@ public static class MessageExtensions
};
}
public static T FromCloudEvent<T>(this CloudEvent cloudEvent) where T : IMessage, new()
{
return cloudEvent.ProtoData.Unpack<T>();
}
public static AgentState ToAgentState<T>(this T state, AgentId agentId, string eTag) where T : IMessage
{
return new AgentState
{
ProtoData = Any.Pack(state),
AgentId = agentId,
ETag = eTag
};
}
public static T FromAgentState<T>(this AgentState state) where T : IMessage, new()
{
if (state.HasTextData == true)
{
if (typeof(T) == typeof(AgentState))
{
return (T)(IMessage)state;
}
}
return state.ProtoData.Unpack<T>();
}
}

View File

@ -14,7 +14,6 @@
<Protobuf Include="..\..\..\..\protos\agent_worker.proto" GrpcServices="Client;Server" Link="Protos\agent_worker.proto" />
<Protobuf Include="..\..\..\..\protos\cloudevent.proto" GrpcServices="Client;Server" Link="Protos\cloudevent.proto" />
<Protobuf Include="..\..\..\..\protos\agent_events.proto" GrpcServices="Client;Server" Link="Protos\agent_events.proto" />
<Protobuf Include="..\..\..\..\protos\agent_states.proto" GrpcServices="Client;Server" Link="Protos\agent_states.proto" />
</ItemGroup>
<ItemGroup>

View File

@ -108,7 +108,16 @@ public abstract class AgentBase
break;
}
}
protected async Task Store(AgentState state)
{
await _context.Store(state).ConfigureAwait(false);
return;
}
protected async Task<T> Read<T>(AgentId agentId) where T : IMessage, new()
{
var agentstate = await _context.Read(agentId).ConfigureAwait(false);
return agentstate.FromAgentState<T>();
}
private void OnResponseCore(RpcResponse response)
{
var requestId = response.RequestId;
@ -186,7 +195,6 @@ public abstract class AgentBase
protected async ValueTask PublishEvent(CloudEvent item)
{
//TODO: Reimplement
var activity = s_source.StartActivity($"PublishEvent '{item.Type}'", ActivityKind.Client, Activity.Current?.Context ?? default);
activity?.SetTag("peer.service", $"{item.Type}/{item.Source}");
@ -200,7 +208,7 @@ public abstract class AgentBase
},
(this, item, completion),
activity,
item.Type).ConfigureAwait(false);// TODO: It's not the descriptor's name probably
item.Type).ConfigureAwait(false);
}
public Task CallHandler(CloudEvent item)

View File

@ -5,14 +5,12 @@ using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
namespace Microsoft.AutoGen.Agents;
public sealed class AgentClient(ILogger<AgentClient> logger, AgentWorkerRuntime runtime, DistributedContextPropagator distributedContextPropagator,
[FromKeyedServices("EventTypes")] EventTypes eventTypes)
: AgentBase(new ClientContext(logger, runtime, distributedContextPropagator), eventTypes)
{
public async ValueTask PublishEventAsync(CloudEvent evt) => await PublishEvent(evt);
public async ValueTask<RpcResponse> SendRequestAsync(AgentId target, string method, Dictionary<string, string> parameters) => await RequestAsync(target, method, parameters);
public async ValueTask PublishEventAsync(string topic, IMessage evt)
{
await PublishEventAsync(evt.ToCloudEvent(topic)).ConfigureAwait(false);
@ -23,12 +21,10 @@ public sealed class AgentClient(ILogger<AgentClient> logger, AgentWorkerRuntime
public AgentBase? AgentInstance { get; set; }
public ILogger Logger { get; } = logger;
public DistributedContextPropagator DistributedContextPropagator { get; } = distributedContextPropagator;
public async ValueTask PublishEventAsync(CloudEvent @event)
{
await runtime.PublishEvent(@event).ConfigureAwait(false);
}
public async ValueTask SendRequestAsync(AgentBase agent, RpcRequest request)
{
await runtime.SendRequest(AgentInstance!, request).ConfigureAwait(false);
@ -38,5 +34,13 @@ public sealed class AgentClient(ILogger<AgentClient> logger, AgentWorkerRuntime
{
await runtime.SendResponse(response).ConfigureAwait(false);
}
public ValueTask Store(AgentState value)
{
throw new NotImplementedException();
}
public ValueTask<AgentState> Read(AgentId agentId)
{
throw new NotImplementedException();
}
}
}

View File

@ -12,20 +12,25 @@ internal sealed class AgentContext(AgentId agentId, AgentWorkerRuntime runtime,
public ILogger Logger { get; } = logger;
public AgentBase? AgentInstance { get; set; }
public DistributedContextPropagator DistributedContextPropagator { get; } = distributedContextPropagator;
public async ValueTask SendResponseAsync(RpcRequest request, RpcResponse response)
{
response.RequestId = request.RequestId;
await _runtime.SendResponse(response);
}
public async ValueTask SendRequestAsync(AgentBase agent, RpcRequest request)
{
await _runtime.SendRequest(agent, request);
}
public async ValueTask PublishEventAsync(CloudEvent @event)
{
await _runtime.PublishEvent(@event);
}
public async ValueTask Store(AgentState value)
{
await _runtime.Store(value);
}
public async ValueTask<AgentState> Read(AgentId agentId)
{
return await _runtime.Read(agentId);
}
}

View File

@ -84,7 +84,6 @@ public sealed class AgentWorkerRuntime : IHostedService, IDisposable, IAgentWork
request.Agent.ReceiveMessage(message);
break;
case Message.MessageOneofCase.CloudEvent:
// TODO: Reimplement
// HACK: Send the message to an instance of each agent type
// where AgentId = (namespace: event.Namespace, name: agentType)
@ -323,10 +322,32 @@ public sealed class AgentWorkerRuntime : IHostedService, IDisposable, IAgentWork
_channel?.Dispose();
}
}
public ValueTask SendRequest(RpcRequest request)
{
throw new NotImplementedException();
}
public ValueTask Store(AgentState value)
{
var agentId = value.AgentId ?? throw new InvalidOperationException("AgentId is required when saving AgentState.");
var response = _client.SaveState(value);
if (!response.Success)
{
throw new InvalidOperationException($"Error saving AgentState for AgentId {agentId}.");
}
return ValueTask.CompletedTask;
}
public async ValueTask<AgentState> Read(AgentId agentId)
{
var response = await _client.GetStateAsync(agentId);
// if (response.Success && response.AgentState.AgentId is not null) - why is success always false?
if (response.AgentState.AgentId is not null)
{
return response.AgentState;
}
else
{
throw new KeyNotFoundException($"Failed to read AgentState for {agentId}.");
}
}
}

View File

@ -1,6 +1,7 @@
using Google.Protobuf;
using Microsoft.Extensions.AI;
namespace Microsoft.AutoGen.Agents.Client;
public abstract class InferenceAgent<T> : AgentBase where T : class, new()
public abstract class InferenceAgent<T> : AgentBase where T : IMessage, new()
{
protected IChatClient ChatClient { get; }
public InferenceAgent(

View File

@ -40,6 +40,7 @@ public abstract class SKAiAgent<T> : AgentBase where T : class, new()
var function = _kernel.CreateFunctionFromPrompt(template, promptSettings);
var result = (await _kernel.InvokeAsync(function, arguments).ConfigureAwait(true)).ToString();
AddToHistory(result, ChatUserType.Agent);
//await Store(_state.Data.ToAgentState(AgentId,""));//TODO add eTag
return result;
}

View File

@ -3,7 +3,7 @@ using Microsoft.Extensions.DependencyInjection;
namespace Microsoft.AutoGen.Agents;
public abstract class ConsoleAgent : IOAgent<AgentState>,
public abstract class ConsoleAgent : IOAgent,
IUseConsole,
IHandle<Input>,
IHandle<Output>

View File

@ -10,7 +10,7 @@ public abstract class FileAgent(
[FromKeyedServices("EventTypes")] EventTypes typeRegistry,
string inputPath = "input.txt",
string outputPath = "output.txt"
) : IOAgent<AgentState>(context, typeRegistry),
) : IOAgent(context, typeRegistry),
IUseFiles,
IHandle<Input>,
IHandle<Output>

View File

@ -2,16 +2,12 @@ using Microsoft.AutoGen.Abstractions;
namespace Microsoft.AutoGen.Agents;
public abstract class IOAgent<T> : AgentBase where T : class, new()
public abstract class IOAgent : AgentBase
{
protected AgentState<T> _state;
public string _route = "base";
public IOAgent(IAgentContext context, EventTypes typeRegistry) : base(context, typeRegistry)
protected IOAgent(IAgentContext context, EventTypes eventTypes) : base(context, eventTypes)
{
_state = new();
}
public virtual async Task Handle(Input item)
{

View File

@ -6,7 +6,7 @@ using Microsoft.Extensions.Logging;
namespace Microsoft.AutoGen.Agents;
public abstract class WebAPIAgent : IOAgent<AgentState>,
public abstract class WebAPIAgent : IOAgent,
IUseWebAPI,
IHandle<Input>,
IHandle<Output>

View File

@ -0,0 +1,22 @@
using Microsoft.AutoGen.Abstractions;
using Microsoft.Extensions.Logging;
namespace Microsoft.AutoGen.Agents
{
public interface IAgentBase
{
// Properties
string AgentId { get; }
ILogger Logger { get; }
IAgentContext Context { get; }
// Methods
Task CallHandler(CloudEvent item);
Task<RpcResponse> HandleRequest(RpcRequest request);
Task Start();
Task ReceiveMessage(Message message);
Task Store(AgentState state);
Task<T> Read<T>(AgentId agentId);
Task PublishEvent(CloudEvent item);
}
}

View File

@ -10,6 +10,8 @@ public interface IAgentContext
AgentBase? AgentInstance { get; set; }
DistributedContextPropagator DistributedContextPropagator { get; }
ILogger Logger { get; }
ValueTask Store(AgentState value);
ValueTask<AgentState> Read(AgentId agentId);
ValueTask SendResponseAsync(RpcRequest request, RpcResponse response);
ValueTask SendRequestAsync(AgentBase agent, RpcRequest request);
ValueTask PublishEventAsync(CloudEvent @event);

View File

@ -1,20 +0,0 @@
namespace Microsoft.AutoGen.Runtime;
internal sealed class AgentStateGrain([PersistentState("state", "agent-state")] IPersistentState<Dictionary<string, object>> state) : Grain, IAgentStateGrain
{
public ValueTask<(Dictionary<string, object> State, string ETag)> ReadStateAsync()
{
return new((state.State, state.Etag));
}
public async ValueTask<string> WriteStateAsync(Dictionary<string, object> value, string eTag)
{
if (string.Equals(state.Etag, eTag, StringComparison.Ordinal))
{
state.State = value;
await state.WriteStateAsync();
}
return state.Etag;
}
}

View File

@ -5,19 +5,27 @@ using Microsoft.AspNetCore.Server.Kestrel.Core;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Hosting;
using Orleans.Serialization;
namespace Microsoft.AutoGen.Runtime;
public static class AgentWorkerHostingExtensions
{
public static IHostApplicationBuilder AddAgentService(this IHostApplicationBuilder builder)
public static WebApplicationBuilder AddAgentService(this WebApplicationBuilder builder, bool local = false)
{
if (local)
{
//TODO: make configuration more flexible
builder.WebHost.ConfigureKestrel(serverOptions =>
{
serverOptions.ListenLocalhost(5001, listenOptions =>
{
listenOptions.Protocols = HttpProtocols.Http2;
listenOptions.UseHttps();
});
});
}
builder.Services.AddGrpc();
builder.Services.AddSerializer(serializer => serializer.AddProtobufSerializer());
// Ensure Orleans is added before the hosted service to guarantee that it starts first.
builder.UseOrleans();
builder.AddOrleans(local);
builder.Services.TryAddSingleton(DistributedContextPropagator.Current);
builder.Services.AddSingleton<WorkerGateway>();
builder.Services.AddSingleton<IHostedService>(sp => sp.GetRequiredService<WorkerGateway>());
@ -27,22 +35,9 @@ public static class AgentWorkerHostingExtensions
public static WebApplicationBuilder AddLocalAgentService(this WebApplicationBuilder builder)
{
builder.WebHost.ConfigureKestrel(serverOptions =>
{
serverOptions.ListenLocalhost(5001, listenOptions =>
{
listenOptions.Protocols = HttpProtocols.Http2;
listenOptions.UseHttps();
});
});
builder.AddAgentService();
builder.UseOrleans(siloBuilder =>
{
siloBuilder.UseLocalhostClustering(); ;
});
builder.AddAgentService(local: true);
return builder;
}
public static WebApplication MapAgentService(this WebApplication app)
{
app.MapGrpcService<WorkerGatewayService>();

View File

@ -1,7 +0,0 @@
namespace Microsoft.AutoGen.Runtime;
internal interface IAgentStateGrain : IGrainWithStringKey
{
ValueTask<(Dictionary<string, object> State, string ETag)> ReadStateAsync();
ValueTask<string> WriteStateAsync(Dictionary<string, object> state, string eTag);
}

View File

@ -0,0 +1,9 @@
using Microsoft.AutoGen.Abstractions;
namespace Microsoft.AutoGen.Runtime;
internal interface IWorkerAgentGrain : IGrainWithStringKey
{
ValueTask<AgentState> ReadStateAsync();
ValueTask<string> WriteStateAsync(AgentState state, string eTag);
}

View File

@ -7,4 +7,6 @@ public interface IWorkerGateway : IGrainObserver
{
ValueTask<RpcResponse> InvokeRequest(RpcRequest request);
ValueTask BroadcastEvent(CloudEvent evt);
ValueTask Store(AgentState value);
ValueTask<AgentState> Read(AgentId agentId);
}

View File

@ -21,6 +21,15 @@
<PackageReference Include="Microsoft.Orleans.Serialization.Protobuf" />
<PackageReference Include="Microsoft.Orleans.Server" />
<PackageReference Include="Microsoft.Orleans.Streaming" />
<PackageReference Include="Microsoft.Orleans.Sdk" />
<PackageReference Include="Microsoft.Orleans.Runtime" />
<PackageReference Include="Microsoft.Orleans.Persistence.Cosmos" />
<PackageReference Include="Microsoft.Orleans.Clustering.Cosmos" />
<PackageReference Include="Microsoft.Orleans.Reminders.Cosmos" />
<PackageReference Include="Microsoft.Orleans.Streaming.EventHubs" />
<PackageReference Include="Microsoft.Orleans.Reminders" />
<PackageReference Include="OrleansDashboard"/>
</ItemGroup>
</Project>

View File

@ -0,0 +1,85 @@
using System.Configuration;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
using Orleans.Configuration;
using Orleans.Serialization;
namespace Microsoft.AutoGen.Runtime;
public static class OrleansRuntimeHostingExtenions
{
public static WebApplicationBuilder AddOrleans(this WebApplicationBuilder builder, bool local = false)
{
builder.Services.AddSerializer(serializer => serializer.AddProtobufSerializer());
// Ensure Orleans is added before the hosted service to guarantee that it starts first.
//TODO: make all of this configurable
builder.Host.UseOrleans(siloBuilder =>
{
// Development mode or local mode uses in-memory storage and streams
if (builder.Environment.IsDevelopment() || local)
{
siloBuilder.UseLocalhostClustering()
.AddMemoryStreams("StreamProvider")
.AddMemoryGrainStorage("PubSubStore")
.AddMemoryGrainStorage("AgentStateStore");
siloBuilder.UseInMemoryReminderService();
siloBuilder.UseDashboard(x => x.HostSelf = true);
siloBuilder.UseInMemoryReminderService();
}
else
{
var cosmosDbconnectionString = builder.Configuration.GetValue<string>("Orleans:CosmosDBConnectionString") ??
throw new ConfigurationErrorsException(
"Orleans:CosmosDBConnectionString is missing from configuration. This is required for persistence in production environments.");
siloBuilder.Configure<ClusterOptions>(options =>
{
//TODO: make this configurable
options.ClusterId = "AutoGen-cluster";
options.ServiceId = "AutoGen-cluster";
});
siloBuilder.Configure<SiloMessagingOptions>(options =>
{
options.ResponseTimeout = TimeSpan.FromMinutes(3);
options.SystemResponseTimeout = TimeSpan.FromMinutes(3);
});
siloBuilder.Configure<ClientMessagingOptions>(options =>
{
options.ResponseTimeout = TimeSpan.FromMinutes(3);
});
siloBuilder.UseCosmosClustering(o =>
{
o.ConfigureCosmosClient(cosmosDbconnectionString);
o.ContainerName = "AutoGen";
o.DatabaseName = "clustering";
o.IsResourceCreationEnabled = true;
});
siloBuilder.UseCosmosReminderService(o =>
{
o.ConfigureCosmosClient(cosmosDbconnectionString);
o.ContainerName = "AutoGen";
o.DatabaseName = "reminders";
o.IsResourceCreationEnabled = true;
});
siloBuilder.AddCosmosGrainStorage(
name: "AgentStateStore",
configureOptions: o =>
{
o.ConfigureCosmosClient(cosmosDbconnectionString);
o.ContainerName = "AutoGen";
o.DatabaseName = "persistence";
o.IsResourceCreationEnabled = true;
});
//TODO: replace with EventHub
siloBuilder
.AddMemoryStreams("StreamProvider")
.AddMemoryGrainStorage("PubSubStore");
}
});
return builder;
}
}

View File

@ -0,0 +1,31 @@
using Microsoft.AutoGen.Abstractions;
namespace Microsoft.AutoGen.Runtime;
internal sealed class WorkerAgentGrain([PersistentState("state", "AgentStateStore")] IPersistentState<AgentState> state) : Grain, IWorkerAgentGrain
{
public async ValueTask<string> WriteStateAsync(AgentState newState, string eTag)
{
// etags for optimistic concurrency control
// if the Etag is null, its a new state
// if the passed etag is null or empty, we should not check the current state's Etag - caller doesnt care
// if both etags are set, they should match or it means that the state has changed since the last read.
if ((string.IsNullOrEmpty(state.Etag)) || (string.IsNullOrEmpty(eTag)) || (string.Equals(state.Etag, eTag, StringComparison.Ordinal)))
{
state.State = newState;
await state.WriteStateAsync();
}
else
{
//TODO - this is probably not the correct behavior to just throw - I presume we want to somehow let the caller know that the state has changed and they need to re-read it
throw new ArgumentException(
"The provided ETag does not match the current ETag. The state has been modified by another request.");
}
return state.Etag;
}
public ValueTask<AgentState> ReadStateAsync()
{
return ValueTask.FromResult(state.State);
}
}

View File

@ -37,6 +37,7 @@ internal sealed class WorkerGateway : BackgroundService, IWorkerGateway
public async ValueTask BroadcastEvent(CloudEvent evt)
{
// TODO: filter the workers that receive the event
var tasks = new List<Task>(_workers.Count);
foreach (var (_, connection) in _workers)
{
@ -211,7 +212,18 @@ internal sealed class WorkerGateway : BackgroundService, IWorkerGateway
await connection.ResponseStream.WriteAsync(new Message { Response = new RpcResponse { RequestId = request.RequestId, Error = ex.Message } });
}
}
public async ValueTask Store(AgentState value)
{
var agentId = value.AgentId ?? throw new ArgumentNullException(nameof(value.AgentId));
var agentState = _clusterClient.GetGrain<IWorkerAgentGrain>($"{agentId.Type}:{agentId.Key}");
await agentState.WriteStateAsync(value, value.ETag);
}
public async ValueTask<AgentState> Read(AgentId agentId)
{
var agentState = _clusterClient.GetGrain<IWorkerAgentGrain>($"{agentId.Type}:{agentId.Key}");
return await agentState.ReadStateAsync();
}
/*
private async ValueTask SubscribeToTopic(WorkerProcessConnection connection, RpcRequest request)
{

View File

@ -21,4 +21,18 @@ internal sealed class WorkerGatewayService(WorkerGateway agentWorker) : AgentRpc
throw;
}
}
public override async Task<GetStateResponse> GetState(AgentId request, ServerCallContext context)
{
var state = await agentWorker.Read(request);
return new GetStateResponse { AgentState = state };
}
public override async Task<SaveStateResponse> SaveState(AgentState request, ServerCallContext context)
{
await agentWorker.Store(request);
return new SaveStateResponse
{
Success = true // TODO: Implement error handling
};
}
}

View File

@ -82,6 +82,29 @@ message AddSubscriptionResponse {
service AgentRpc {
rpc OpenChannel (stream Message) returns (stream Message);
rpc GetState(AgentId) returns (GetStateResponse);
rpc SaveState(AgentState) returns (SaveStateResponse);
}
message AgentState {
AgentId agent_id = 1;
string eTag = 2;
oneof data {
bytes binary_data = 3;
string text_data = 4;
google.protobuf.Any proto_data = 5;
}
}
message GetStateResponse {
AgentState agent_state = 1;
bool success = 2;
optional string error = 3;
}
message SaveStateResponse {
bool success = 1;
optional string error = 2;
}
message Message {

View File

@ -243,3 +243,17 @@ class WorkerAgentRuntimeHostServicer(agent_worker_pb2_grpc.AgentRpcServicer):
)
case None:
logger.warning("Received empty subscription message")
async def GetState( # type: ignore
self,
request: agent_worker_pb2.AgentId,
context: grpc.aio.ServicerContext[agent_worker_pb2.AgentId, agent_worker_pb2.GetStateResponse],
) -> agent_worker_pb2.GetStateResponse: # type: ignore
raise NotImplementedError("Method not implemented!")
async def SaveState( # type: ignore
self,
request: agent_worker_pb2.AgentState,
context: grpc.aio.ServicerContext[agent_worker_pb2.AgentId, agent_worker_pb2.SaveStateResponse],
) -> agent_worker_pb2.SaveStateResponse: # type: ignore
raise NotImplementedError("Method not implemented!")

View File

@ -16,7 +16,7 @@ import cloudevent_pb2 as cloudevent__pb2
from google.protobuf import any_pb2 as google_dot_protobuf_dot_any__pb2
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x12\x61gent_worker.proto\x12\x06\x61gents\x1a\x10\x63loudevent.proto\x1a\x19google/protobuf/any.proto\"\'\n\x07TopicId\x12\x0c\n\x04type\x18\x01 \x01(\t\x12\x0e\n\x06source\x18\x02 \x01(\t\"$\n\x07\x41gentId\x12\x0c\n\x04type\x18\x01 \x01(\t\x12\x0b\n\x03key\x18\x02 \x01(\t\"E\n\x07Payload\x12\x11\n\tdata_type\x18\x01 \x01(\t\x12\x19\n\x11\x64\x61ta_content_type\x18\x02 \x01(\t\x12\x0c\n\x04\x64\x61ta\x18\x03 \x01(\x0c\"\x89\x02\n\nRpcRequest\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12$\n\x06source\x18\x02 \x01(\x0b\x32\x0f.agents.AgentIdH\x00\x88\x01\x01\x12\x1f\n\x06target\x18\x03 \x01(\x0b\x32\x0f.agents.AgentId\x12\x0e\n\x06method\x18\x04 \x01(\t\x12 \n\x07payload\x18\x05 \x01(\x0b\x32\x0f.agents.Payload\x12\x32\n\x08metadata\x18\x06 \x03(\x0b\x32 .agents.RpcRequest.MetadataEntry\x1a/\n\rMetadataEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x42\t\n\x07_source\"\xb8\x01\n\x0bRpcResponse\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12 \n\x07payload\x18\x02 \x01(\x0b\x32\x0f.agents.Payload\x12\r\n\x05\x65rror\x18\x03 \x01(\t\x12\x33\n\x08metadata\x18\x04 \x03(\x0b\x32!.agents.RpcResponse.MetadataEntry\x1a/\n\rMetadataEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\xe4\x01\n\x05\x45vent\x12\x12\n\ntopic_type\x18\x01 \x01(\t\x12\x14\n\x0ctopic_source\x18\x02 \x01(\t\x12$\n\x06source\x18\x03 \x01(\x0b\x32\x0f.agents.AgentIdH\x00\x88\x01\x01\x12 \n\x07payload\x18\x04 \x01(\x0b\x32\x0f.agents.Payload\x12-\n\x08metadata\x18\x05 \x03(\x0b\x32\x1b.agents.Event.MetadataEntry\x1a/\n\rMetadataEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x42\t\n\x07_source\"<\n\x18RegisterAgentTypeRequest\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12\x0c\n\x04type\x18\x02 \x01(\t\"^\n\x19RegisterAgentTypeResponse\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12\x0f\n\x07success\x18\x02 \x01(\x08\x12\x12\n\x05\x65rror\x18\x03 \x01(\tH\x00\x88\x01\x01\x42\x08\n\x06_error\":\n\x10TypeSubscription\x12\x12\n\ntopic_type\x18\x01 \x01(\t\x12\x12\n\nagent_type\x18\x02 \x01(\t\"T\n\x0cSubscription\x12\x34\n\x10typeSubscription\x18\x01 \x01(\x0b\x32\x18.agents.TypeSubscriptionH\x00\x42\x0e\n\x0csubscription\"X\n\x16\x41\x64\x64SubscriptionRequest\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12*\n\x0csubscription\x18\x02 \x01(\x0b\x32\x14.agents.Subscription\"\\\n\x17\x41\x64\x64SubscriptionResponse\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12\x0f\n\x07success\x18\x02 \x01(\x08\x12\x12\n\x05\x65rror\x18\x03 \x01(\tH\x00\x88\x01\x01\x42\x08\n\x06_error\"\xc6\x03\n\x07Message\x12%\n\x07request\x18\x01 \x01(\x0b\x32\x12.agents.RpcRequestH\x00\x12\'\n\x08response\x18\x02 \x01(\x0b\x32\x13.agents.RpcResponseH\x00\x12\x1e\n\x05\x65vent\x18\x03 \x01(\x0b\x32\r.agents.EventH\x00\x12\x44\n\x18registerAgentTypeRequest\x18\x04 \x01(\x0b\x32 .agents.RegisterAgentTypeRequestH\x00\x12\x46\n\x19registerAgentTypeResponse\x18\x05 \x01(\x0b\x32!.agents.RegisterAgentTypeResponseH\x00\x12@\n\x16\x61\x64\x64SubscriptionRequest\x18\x06 \x01(\x0b\x32\x1e.agents.AddSubscriptionRequestH\x00\x12\x42\n\x17\x61\x64\x64SubscriptionResponse\x18\x07 \x01(\x0b\x32\x1f.agents.AddSubscriptionResponseH\x00\x12,\n\ncloudEvent\x18\x08 \x01(\x0b\x32\x16.cloudevent.CloudEventH\x00\x42\t\n\x07message2?\n\x08\x41gentRpc\x12\x33\n\x0bOpenChannel\x12\x0f.agents.Message\x1a\x0f.agents.Message(\x01\x30\x01\x42!\xaa\x02\x1eMicrosoft.AutoGen.Abstractionsb\x06proto3')
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x12\x61gent_worker.proto\x12\x06\x61gents\x1a\x10\x63loudevent.proto\x1a\x19google/protobuf/any.proto\"\'\n\x07TopicId\x12\x0c\n\x04type\x18\x01 \x01(\t\x12\x0e\n\x06source\x18\x02 \x01(\t\"$\n\x07\x41gentId\x12\x0c\n\x04type\x18\x01 \x01(\t\x12\x0b\n\x03key\x18\x02 \x01(\t\"E\n\x07Payload\x12\x11\n\tdata_type\x18\x01 \x01(\t\x12\x19\n\x11\x64\x61ta_content_type\x18\x02 \x01(\t\x12\x0c\n\x04\x64\x61ta\x18\x03 \x01(\x0c\"\x89\x02\n\nRpcRequest\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12$\n\x06source\x18\x02 \x01(\x0b\x32\x0f.agents.AgentIdH\x00\x88\x01\x01\x12\x1f\n\x06target\x18\x03 \x01(\x0b\x32\x0f.agents.AgentId\x12\x0e\n\x06method\x18\x04 \x01(\t\x12 \n\x07payload\x18\x05 \x01(\x0b\x32\x0f.agents.Payload\x12\x32\n\x08metadata\x18\x06 \x03(\x0b\x32 .agents.RpcRequest.MetadataEntry\x1a/\n\rMetadataEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x42\t\n\x07_source\"\xb8\x01\n\x0bRpcResponse\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12 \n\x07payload\x18\x02 \x01(\x0b\x32\x0f.agents.Payload\x12\r\n\x05\x65rror\x18\x03 \x01(\t\x12\x33\n\x08metadata\x18\x04 \x03(\x0b\x32!.agents.RpcResponse.MetadataEntry\x1a/\n\rMetadataEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\xe4\x01\n\x05\x45vent\x12\x12\n\ntopic_type\x18\x01 \x01(\t\x12\x14\n\x0ctopic_source\x18\x02 \x01(\t\x12$\n\x06source\x18\x03 \x01(\x0b\x32\x0f.agents.AgentIdH\x00\x88\x01\x01\x12 \n\x07payload\x18\x04 \x01(\x0b\x32\x0f.agents.Payload\x12-\n\x08metadata\x18\x05 \x03(\x0b\x32\x1b.agents.Event.MetadataEntry\x1a/\n\rMetadataEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x42\t\n\x07_source\"<\n\x18RegisterAgentTypeRequest\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12\x0c\n\x04type\x18\x02 \x01(\t\"^\n\x19RegisterAgentTypeResponse\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12\x0f\n\x07success\x18\x02 \x01(\x08\x12\x12\n\x05\x65rror\x18\x03 \x01(\tH\x00\x88\x01\x01\x42\x08\n\x06_error\":\n\x10TypeSubscription\x12\x12\n\ntopic_type\x18\x01 \x01(\t\x12\x12\n\nagent_type\x18\x02 \x01(\t\"T\n\x0cSubscription\x12\x34\n\x10typeSubscription\x18\x01 \x01(\x0b\x32\x18.agents.TypeSubscriptionH\x00\x42\x0e\n\x0csubscription\"X\n\x16\x41\x64\x64SubscriptionRequest\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12*\n\x0csubscription\x18\x02 \x01(\x0b\x32\x14.agents.Subscription\"\\\n\x17\x41\x64\x64SubscriptionResponse\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12\x0f\n\x07success\x18\x02 \x01(\x08\x12\x12\n\x05\x65rror\x18\x03 \x01(\tH\x00\x88\x01\x01\x42\x08\n\x06_error\"\x9d\x01\n\nAgentState\x12!\n\x08\x61gent_id\x18\x01 \x01(\x0b\x32\x0f.agents.AgentId\x12\x0c\n\x04\x65Tag\x18\x02 \x01(\t\x12\x15\n\x0b\x62inary_data\x18\x03 \x01(\x0cH\x00\x12\x13\n\ttext_data\x18\x04 \x01(\tH\x00\x12*\n\nproto_data\x18\x05 \x01(\x0b\x32\x14.google.protobuf.AnyH\x00\x42\x06\n\x04\x64\x61ta\"j\n\x10GetStateResponse\x12\'\n\x0b\x61gent_state\x18\x01 \x01(\x0b\x32\x12.agents.AgentState\x12\x0f\n\x07success\x18\x02 \x01(\x08\x12\x12\n\x05\x65rror\x18\x03 \x01(\tH\x00\x88\x01\x01\x42\x08\n\x06_error\"B\n\x11SaveStateResponse\x12\x0f\n\x07success\x18\x01 \x01(\x08\x12\x12\n\x05\x65rror\x18\x02 \x01(\tH\x00\x88\x01\x01\x42\x08\n\x06_error\"\xc6\x03\n\x07Message\x12%\n\x07request\x18\x01 \x01(\x0b\x32\x12.agents.RpcRequestH\x00\x12\'\n\x08response\x18\x02 \x01(\x0b\x32\x13.agents.RpcResponseH\x00\x12\x1e\n\x05\x65vent\x18\x03 \x01(\x0b\x32\r.agents.EventH\x00\x12\x44\n\x18registerAgentTypeRequest\x18\x04 \x01(\x0b\x32 .agents.RegisterAgentTypeRequestH\x00\x12\x46\n\x19registerAgentTypeResponse\x18\x05 \x01(\x0b\x32!.agents.RegisterAgentTypeResponseH\x00\x12@\n\x16\x61\x64\x64SubscriptionRequest\x18\x06 \x01(\x0b\x32\x1e.agents.AddSubscriptionRequestH\x00\x12\x42\n\x17\x61\x64\x64SubscriptionResponse\x18\x07 \x01(\x0b\x32\x1f.agents.AddSubscriptionResponseH\x00\x12,\n\ncloudEvent\x18\x08 \x01(\x0b\x32\x16.cloudevent.CloudEventH\x00\x42\t\n\x07message2\xb2\x01\n\x08\x41gentRpc\x12\x33\n\x0bOpenChannel\x12\x0f.agents.Message\x1a\x0f.agents.Message(\x01\x30\x01\x12\x35\n\x08GetState\x12\x0f.agents.AgentId\x1a\x18.agents.GetStateResponse\x12:\n\tSaveState\x12\x12.agents.AgentState\x1a\x19.agents.SaveStateResponseB!\xaa\x02\x1eMicrosoft.AutoGen.Abstractionsb\x06proto3')
_globals = globals()
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
@ -60,8 +60,14 @@ if _descriptor._USE_C_DESCRIPTORS == False:
_globals['_ADDSUBSCRIPTIONREQUEST']._serialized_end=1303
_globals['_ADDSUBSCRIPTIONRESPONSE']._serialized_start=1305
_globals['_ADDSUBSCRIPTIONRESPONSE']._serialized_end=1397
_globals['_MESSAGE']._serialized_start=1400
_globals['_MESSAGE']._serialized_end=1854
_globals['_AGENTRPC']._serialized_start=1856
_globals['_AGENTRPC']._serialized_end=1919
_globals['_AGENTSTATE']._serialized_start=1400
_globals['_AGENTSTATE']._serialized_end=1557
_globals['_GETSTATERESPONSE']._serialized_start=1559
_globals['_GETSTATERESPONSE']._serialized_end=1665
_globals['_SAVESTATERESPONSE']._serialized_start=1667
_globals['_SAVESTATERESPONSE']._serialized_end=1733
_globals['_MESSAGE']._serialized_start=1736
_globals['_MESSAGE']._serialized_end=2190
_globals['_AGENTRPC']._serialized_start=2193
_globals['_AGENTRPC']._serialized_end=2371
# @@protoc_insertion_point(module_scope)

View File

@ -6,6 +6,7 @@ isort:skip_file
import builtins
import cloudevent_pb2
import collections.abc
import google.protobuf.any_pb2
import google.protobuf.descriptor
import google.protobuf.internal.containers
import google.protobuf.message
@ -333,6 +334,81 @@ class AddSubscriptionResponse(google.protobuf.message.Message):
global___AddSubscriptionResponse = AddSubscriptionResponse
@typing.final
class AgentState(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
AGENT_ID_FIELD_NUMBER: builtins.int
ETAG_FIELD_NUMBER: builtins.int
BINARY_DATA_FIELD_NUMBER: builtins.int
TEXT_DATA_FIELD_NUMBER: builtins.int
PROTO_DATA_FIELD_NUMBER: builtins.int
eTag: builtins.str
binary_data: builtins.bytes
text_data: builtins.str
@property
def agent_id(self) -> global___AgentId: ...
@property
def proto_data(self) -> google.protobuf.any_pb2.Any: ...
def __init__(
self,
*,
agent_id: global___AgentId | None = ...,
eTag: builtins.str = ...,
binary_data: builtins.bytes = ...,
text_data: builtins.str = ...,
proto_data: google.protobuf.any_pb2.Any | None = ...,
) -> None: ...
def HasField(self, field_name: typing.Literal["agent_id", b"agent_id", "binary_data", b"binary_data", "data", b"data", "proto_data", b"proto_data", "text_data", b"text_data"]) -> builtins.bool: ...
def ClearField(self, field_name: typing.Literal["agent_id", b"agent_id", "binary_data", b"binary_data", "data", b"data", "eTag", b"eTag", "proto_data", b"proto_data", "text_data", b"text_data"]) -> None: ...
def WhichOneof(self, oneof_group: typing.Literal["data", b"data"]) -> typing.Literal["binary_data", "text_data", "proto_data"] | None: ...
global___AgentState = AgentState
@typing.final
class GetStateResponse(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
AGENT_STATE_FIELD_NUMBER: builtins.int
SUCCESS_FIELD_NUMBER: builtins.int
ERROR_FIELD_NUMBER: builtins.int
success: builtins.bool
error: builtins.str
@property
def agent_state(self) -> global___AgentState: ...
def __init__(
self,
*,
agent_state: global___AgentState | None = ...,
success: builtins.bool = ...,
error: builtins.str | None = ...,
) -> None: ...
def HasField(self, field_name: typing.Literal["_error", b"_error", "agent_state", b"agent_state", "error", b"error"]) -> builtins.bool: ...
def ClearField(self, field_name: typing.Literal["_error", b"_error", "agent_state", b"agent_state", "error", b"error", "success", b"success"]) -> None: ...
def WhichOneof(self, oneof_group: typing.Literal["_error", b"_error"]) -> typing.Literal["error"] | None: ...
global___GetStateResponse = GetStateResponse
@typing.final
class SaveStateResponse(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
SUCCESS_FIELD_NUMBER: builtins.int
ERROR_FIELD_NUMBER: builtins.int
success: builtins.bool
error: builtins.str
def __init__(
self,
*,
success: builtins.bool = ...,
error: builtins.str | None = ...,
) -> None: ...
def HasField(self, field_name: typing.Literal["_error", b"_error", "error", b"error"]) -> builtins.bool: ...
def ClearField(self, field_name: typing.Literal["_error", b"_error", "error", b"error", "success", b"success"]) -> None: ...
def WhichOneof(self, oneof_group: typing.Literal["_error", b"_error"]) -> typing.Literal["error"] | None: ...
global___SaveStateResponse = SaveStateResponse
@typing.final
class Message(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor

View File

@ -19,6 +19,16 @@ class AgentRpcStub(object):
request_serializer=agent__worker__pb2.Message.SerializeToString,
response_deserializer=agent__worker__pb2.Message.FromString,
)
self.GetState = channel.unary_unary(
'/agents.AgentRpc/GetState',
request_serializer=agent__worker__pb2.AgentId.SerializeToString,
response_deserializer=agent__worker__pb2.GetStateResponse.FromString,
)
self.SaveState = channel.unary_unary(
'/agents.AgentRpc/SaveState',
request_serializer=agent__worker__pb2.AgentState.SerializeToString,
response_deserializer=agent__worker__pb2.SaveStateResponse.FromString,
)
class AgentRpcServicer(object):
@ -30,6 +40,18 @@ class AgentRpcServicer(object):
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def GetState(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def SaveState(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def add_AgentRpcServicer_to_server(servicer, server):
rpc_method_handlers = {
@ -38,6 +60,16 @@ def add_AgentRpcServicer_to_server(servicer, server):
request_deserializer=agent__worker__pb2.Message.FromString,
response_serializer=agent__worker__pb2.Message.SerializeToString,
),
'GetState': grpc.unary_unary_rpc_method_handler(
servicer.GetState,
request_deserializer=agent__worker__pb2.AgentId.FromString,
response_serializer=agent__worker__pb2.GetStateResponse.SerializeToString,
),
'SaveState': grpc.unary_unary_rpc_method_handler(
servicer.SaveState,
request_deserializer=agent__worker__pb2.AgentState.FromString,
response_serializer=agent__worker__pb2.SaveStateResponse.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'agents.AgentRpc', rpc_method_handlers)
@ -64,3 +96,37 @@ class AgentRpc(object):
agent__worker__pb2.Message.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
@staticmethod
def GetState(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/agents.AgentRpc/GetState',
agent__worker__pb2.AgentId.SerializeToString,
agent__worker__pb2.GetStateResponse.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
@staticmethod
def SaveState(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/agents.AgentRpc/SaveState',
agent__worker__pb2.AgentState.SerializeToString,
agent__worker__pb2.SaveStateResponse.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)

View File

@ -24,12 +24,32 @@ class AgentRpcStub:
agent_worker_pb2.Message,
]
GetState: grpc.UnaryUnaryMultiCallable[
agent_worker_pb2.AgentId,
agent_worker_pb2.GetStateResponse,
]
SaveState: grpc.UnaryUnaryMultiCallable[
agent_worker_pb2.AgentState,
agent_worker_pb2.SaveStateResponse,
]
class AgentRpcAsyncStub:
OpenChannel: grpc.aio.StreamStreamMultiCallable[
agent_worker_pb2.Message,
agent_worker_pb2.Message,
]
GetState: grpc.aio.UnaryUnaryMultiCallable[
agent_worker_pb2.AgentId,
agent_worker_pb2.GetStateResponse,
]
SaveState: grpc.aio.UnaryUnaryMultiCallable[
agent_worker_pb2.AgentState,
agent_worker_pb2.SaveStateResponse,
]
class AgentRpcServicer(metaclass=abc.ABCMeta):
@abc.abstractmethod
def OpenChannel(
@ -38,4 +58,18 @@ class AgentRpcServicer(metaclass=abc.ABCMeta):
context: _ServicerContext,
) -> typing.Union[collections.abc.Iterator[agent_worker_pb2.Message], collections.abc.AsyncIterator[agent_worker_pb2.Message]]: ...
@abc.abstractmethod
def GetState(
self,
request: agent_worker_pb2.AgentId,
context: _ServicerContext,
) -> typing.Union[agent_worker_pb2.GetStateResponse, collections.abc.Awaitable[agent_worker_pb2.GetStateResponse]]: ...
@abc.abstractmethod
def SaveState(
self,
request: agent_worker_pb2.AgentState,
context: _ServicerContext,
) -> typing.Union[agent_worker_pb2.SaveStateResponse, collections.abc.Awaitable[agent_worker_pb2.SaveStateResponse]]: ...
def add_AgentRpcServicer_to_server(servicer: AgentRpcServicer, server: typing.Union[grpc.Server, grpc.aio.Server]) -> None: ...