Rysweet 4679 move grpc to runtimegrpc project (#4680)

* refactor moving grpc runtime to a separate project
This commit is contained in:
Ryan Sweet 2024-12-13 09:29:57 -08:00 committed by GitHub
parent 55e157cb99
commit c169df8b7b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
37 changed files with 299 additions and 159 deletions

View File

@ -136,6 +136,12 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.AutoGen.Integrati
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "HelloAgent.AppHost", "test\Microsoft.AutoGen.Integration.Tests.AppHosts\HelloAgent.AppHost\HelloAgent.AppHost.csproj", "{99D7766B-076F-4E6F-A8D2-3DF1DAFA2599}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.AutoGen.Contracts", "src\Microsoft.AutoGen\Contracts\Microsoft.AutoGen.Contracts.csproj", "{7F60934B-3E59-48D0-B26D-04A39FEC13EF}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.AutoGen.Core.Grpc", "src\Microsoft.AutoGen\Core.Grpc\Microsoft.AutoGen.Core.Grpc.csproj", "{9653676C-147D-4CBE-BB53-A30FD3634F4C}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.AutoGen.Runtime.Grpc", "src\Microsoft.AutoGen\Runtime.Grpc\Microsoft.AutoGen.Runtime.Grpc.csproj", "{8457B68C-CC86-4A3F-8559-C1AE199EC366}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@ -258,10 +264,6 @@ Global
{FD87BD33-4616-460B-AC85-A412BA08BB78}.Debug|Any CPU.Build.0 = Debug|Any CPU
{FD87BD33-4616-460B-AC85-A412BA08BB78}.Release|Any CPU.ActiveCfg = Release|Any CPU
{FD87BD33-4616-460B-AC85-A412BA08BB78}.Release|Any CPU.Build.0 = Release|Any CPU
{E0C991D9-0DB8-471C-ADC9-5FB16E2A0106}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{E0C991D9-0DB8-471C-ADC9-5FB16E2A0106}.Debug|Any CPU.Build.0 = Debug|Any CPU
{E0C991D9-0DB8-471C-ADC9-5FB16E2A0106}.Release|Any CPU.ActiveCfg = Release|Any CPU
{E0C991D9-0DB8-471C-ADC9-5FB16E2A0106}.Release|Any CPU.Build.0 = Release|Any CPU
{952827D4-8D4C-4327-AE4D-E8D25811EF35}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{952827D4-8D4C-4327-AE4D-E8D25811EF35}.Debug|Any CPU.Build.0 = Debug|Any CPU
{952827D4-8D4C-4327-AE4D-E8D25811EF35}.Release|Any CPU.ActiveCfg = Release|Any CPU
@ -354,6 +356,18 @@ Global
{99D7766B-076F-4E6F-A8D2-3DF1DAFA2599}.Debug|Any CPU.Build.0 = Debug|Any CPU
{99D7766B-076F-4E6F-A8D2-3DF1DAFA2599}.Release|Any CPU.ActiveCfg = Release|Any CPU
{99D7766B-076F-4E6F-A8D2-3DF1DAFA2599}.Release|Any CPU.Build.0 = Release|Any CPU
{7F60934B-3E59-48D0-B26D-04A39FEC13EF}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{7F60934B-3E59-48D0-B26D-04A39FEC13EF}.Debug|Any CPU.Build.0 = Debug|Any CPU
{7F60934B-3E59-48D0-B26D-04A39FEC13EF}.Release|Any CPU.ActiveCfg = Release|Any CPU
{7F60934B-3E59-48D0-B26D-04A39FEC13EF}.Release|Any CPU.Build.0 = Release|Any CPU
{9653676C-147D-4CBE-BB53-A30FD3634F4C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{9653676C-147D-4CBE-BB53-A30FD3634F4C}.Debug|Any CPU.Build.0 = Debug|Any CPU
{9653676C-147D-4CBE-BB53-A30FD3634F4C}.Release|Any CPU.ActiveCfg = Release|Any CPU
{9653676C-147D-4CBE-BB53-A30FD3634F4C}.Release|Any CPU.Build.0 = Release|Any CPU
{8457B68C-CC86-4A3F-8559-C1AE199EC366}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{8457B68C-CC86-4A3F-8559-C1AE199EC366}.Debug|Any CPU.Build.0 = Debug|Any CPU
{8457B68C-CC86-4A3F-8559-C1AE199EC366}.Release|Any CPU.ActiveCfg = Release|Any CPU
{8457B68C-CC86-4A3F-8559-C1AE199EC366}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@ -389,7 +403,6 @@ Global
{42A8251C-E7B3-47BB-A82E-459952EBE132} = {F823671B-3ECA-4AE6-86DA-25E920D3FE64}
{4BB66E06-37D8-45A0-9B97-DE590AFBA340} = {18BF8DD7-0585-48BF-8F97-AD333080CE06}
{FD87BD33-4616-460B-AC85-A412BA08BB78} = {18BF8DD7-0585-48BF-8F97-AD333080CE06}
{E0C991D9-0DB8-471C-ADC9-5FB16E2A0106} = {18BF8DD7-0585-48BF-8F97-AD333080CE06}
{952827D4-8D4C-4327-AE4D-E8D25811EF35} = {18BF8DD7-0585-48BF-8F97-AD333080CE06}
{668726B9-77BC-45CF-B576-0F0773BF1615} = {686480D7-8FEC-4ED3-9C5D-CEBE1057A7ED}
{84020C4A-933A-4693-9889-1B99304A7D76} = {668726B9-77BC-45CF-B576-0F0773BF1615}
@ -416,6 +429,9 @@ Global
{394FDAF8-74F9-4977-94A5-3371737EB774} = {F823671B-3ECA-4AE6-86DA-25E920D3FE64}
{D04C6153-8EAF-4E54-9852-52CEC1BE8D31} = {F823671B-3ECA-4AE6-86DA-25E920D3FE64}
{99D7766B-076F-4E6F-A8D2-3DF1DAFA2599} = {F823671B-3ECA-4AE6-86DA-25E920D3FE64}
{7F60934B-3E59-48D0-B26D-04A39FEC13EF} = {18BF8DD7-0585-48BF-8F97-AD333080CE06}
{9653676C-147D-4CBE-BB53-A30FD3634F4C} = {18BF8DD7-0585-48BF-8F97-AD333080CE06}
{8457B68C-CC86-4A3F-8559-C1AE199EC366} = {18BF8DD7-0585-48BF-8F97-AD333080CE06}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {93384647-528D-46C8-922C-8DB36A382F0B}

View File

@ -72,6 +72,7 @@
<PrivateAssets>all</PrivateAssets>
</PackageVersion>
<PackageVersion Include="Microsoft.Orleans.Core.Abstractions" Version="$(MicrosoftOrleans)" />
<PackageVersion Include="Microsoft.Orleans.Persistence.Memory" Version="$(MicrosoftOrleans)" />
<PackageVersion Include="Microsoft.Orleans.Persistence.Cosmos" Version="$(MicrosoftOrleans)" />
<PackageVersion Include="Microsoft.Orleans.Reminders" Version="$(MicrosoftOrleans)" />
<PackageVersion Include="Microsoft.Orleans.Reminders.Cosmos" Version="$(MicrosoftOrleans)" />

View File

@ -1,6 +1,6 @@
<Project Sdk="Microsoft.NET.Sdk.Web">
<ItemGroup>
<ProjectReference Include="..\..\..\src\Microsoft.AutoGen\Agents\Microsoft.AutoGen.Agents.csproj" />
<ProjectReference Include="..\..\..\src\Microsoft.AutoGen\Runtime.Grpc\Microsoft.AutoGen.Runtime.Grpc.csproj" />
</ItemGroup>
<PropertyGroup>
<OutputType>Exe</OutputType>

View File

@ -1,5 +1,5 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Program.cs
var app = await Microsoft.AutoGen.Agents.Host.StartAsync(local: false, useGrpc: true);
var app = await Microsoft.AutoGen.Runtime.Grpc.Host.StartAsync(local: false, useGrpc: true);
await app.WaitForShutdownAsync();

View File

@ -2,7 +2,8 @@
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft.AspNetCore": "Warning"
"Microsoft.AspNetCore": "Warning",
"Aspire.Hosting.ApplicationModel.ResourceNotificationService": "Debug"
}
}
}

View File

@ -10,6 +10,7 @@
<ItemGroup>
<ProjectReference Include="../../../src/Microsoft.AutoGen/Agents/Microsoft.AutoGen.Agents.csproj" />
<ProjectReference Include="../../../src/Microsoft.AutoGen/Runtime.Grpc/Microsoft.AutoGen.Runtime.Grpc.csproj" />
<ProjectReference Include="..\..\..\src\Microsoft.AutoGen\Extensions\Aspire\Microsoft.AutoGen.Extensions.Aspire.csproj" />
</ItemGroup>

View File

@ -1,7 +1,7 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Program.cs
using Microsoft.AutoGen.Runtime.Grpc;
using Microsoft.AutoGen.Agents;
var builder = WebApplication.CreateBuilder(args);
builder.AddServiceDefaults();

View File

@ -4,7 +4,6 @@
using DevTeam.Backend;
using DevTeam.Shared;
using Microsoft.AutoGen.Agents;
using Microsoft.AutoGen.Contracts;
using Microsoft.SemanticKernel;
using Microsoft.SemanticKernel.Memory;
namespace Microsoft.AI.DevTeam;

View File

@ -6,7 +6,6 @@ using DevTeam;
using DevTeam.Backend;
using DevTeam.Shared;
using Microsoft.AutoGen.Agents;
using Microsoft.AutoGen.Contracts;
using Microsoft.SemanticKernel;
using Microsoft.SemanticKernel.Memory;

View File

@ -1,3 +1,4 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Agent.cs
@ -13,7 +14,7 @@ using Microsoft.Extensions.Logging;
namespace Microsoft.AutoGen.Agents;
public abstract class Agent : IHandle
public abstract class Agent : IDisposable, IHandle
{
public static readonly ActivitySource s_source = new("AutoGen.Agent");
public AgentId AgentId => _runtime.AgentId;
@ -22,6 +23,8 @@ public abstract class Agent : IHandle
private readonly Channel<object> _mailbox = Channel.CreateUnbounded<object>();
private readonly IAgentRuntime _runtime;
public string Route { get; set; } = "base";
protected internal ILogger<Agent> _logger;
public IAgentRuntime Context => _runtime;
protected readonly EventTypes EventTypes;
@ -341,4 +344,9 @@ public abstract class Agent : IHandle
{
await PublishEventAsync(evt.ToCloudEvent(topic), cancellationToken).ConfigureAwait(false);
}
public void Dispose()
{
throw new NotImplementedException();
}
}

View File

@ -9,7 +9,7 @@ using static Microsoft.AutoGen.Contracts.CloudEvent.Types;
namespace Microsoft.AutoGen.Agents;
internal sealed class AgentRuntime(AgentId agentId, IAgentWorker worker, ILogger<Agent> logger, DistributedContextPropagator distributedContextPropagator) : IAgentRuntime
public sealed class AgentRuntime(AgentId agentId, IAgentWorker worker, ILogger<Agent> logger, DistributedContextPropagator distributedContextPropagator) : IAgentRuntime
{
private readonly IAgentWorker worker = worker;

View File

@ -0,0 +1,20 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// AgentTypes.cs
namespace Microsoft.AutoGen.Agents
;
public sealed class AgentTypes(Dictionary<string, Type> types)
{
public Dictionary<string, Type> Types { get; } = types;
public static AgentTypes? GetAgentTypesFromAssembly()
{
var agents = AppDomain.CurrentDomain.GetAssemblies()
.SelectMany(assembly => assembly.GetTypes())
.Where(type => ReflectionHelper.IsSubclassOfGeneric(type, typeof(Agent))
&& !type.IsAbstract
&& !type.Name.Equals(nameof(Client)))
.ToDictionary(type => type.Name, type => type);
return new AgentTypes(agents);
}
}

View File

@ -35,7 +35,7 @@ public class AgentWorker :
IHostApplicationLifetime hostApplicationLifetime,
IServiceProvider serviceProvider,
[FromKeyedServices("AgentTypes")] IEnumerable<Tuple<string, Type>> configuredAgentTypes,
ILogger<GrpcAgentWorker> logger,
ILogger<AgentWorker> logger,
DistributedContextPropagator distributedContextPropagator)
{
_logger = logger;

View File

@ -1,9 +1,11 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// App.cs
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using Google.Protobuf;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Hosting;
namespace Microsoft.AutoGen.Agents;
@ -14,22 +16,15 @@ public static class AgentsApp
public static WebApplication? Host { get; private set; }
[MemberNotNull(nameof(Host))]
public static async ValueTask<WebApplication> StartAsync(WebApplicationBuilder? builder = null, AgentTypes? agentTypes = null, bool local = false)
public static async ValueTask<WebApplication> StartAsync(WebApplicationBuilder? builder = null, AgentTypes? agentTypes = null)
{
builder ??= WebApplication.CreateBuilder();
if (local)
{
// start the server runtime
builder.AddLocalAgentService(useGrpc: false);
}
builder.AddAgentWorker(local: local)
builder.Services.TryAddSingleton(DistributedContextPropagator.Current);
builder.AddAgentWorker()
.AddAgents(agentTypes);
builder.AddServiceDefaults();
var app = builder.Build();
if (local)
{
app.MapAgentService(local: true, useGrpc: false);
}
app.MapDefaultEndpoints();
Host = app;
await app.StartAsync().ConfigureAwait(false);
@ -44,7 +39,7 @@ public static class AgentsApp
{
if (Host == null)
{
await StartAsync(builder, agents, local);
await StartAsync(builder, agents).ConfigureAwait(false);
}
var client = Host.Services.GetRequiredService<Client>() ?? throw new InvalidOperationException("Host not started");
await client.PublishEventAsync(topic, message, new CancellationToken()).ConfigureAwait(true);
@ -56,7 +51,7 @@ public static class AgentsApp
{
throw new InvalidOperationException("Host not started");
}
await Host.StopAsync();
await Host.StopAsync().ConfigureAwait(true);
}
private static IHostApplicationBuilder AddAgents(this IHostApplicationBuilder builder, AgentTypes? agentTypes)

View File

@ -0,0 +1,11 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// EventTypes.cs
using Google.Protobuf.Reflection;
namespace Microsoft.AutoGen.Agents;
public sealed class EventTypes(TypeRegistry typeRegistry, Dictionary<string, Type> types, Dictionary<Type, HashSet<string>> eventsMap)
{
public TypeRegistry TypeRegistry { get; } = typeRegistry;
public Dictionary<string, Type> Types { get; } = types;
public Dictionary<Type, HashSet<string>> EventsMap { get; } = eventsMap;
}

View File

@ -6,7 +6,6 @@ using System.Diagnostics.CodeAnalysis;
using System.Reflection;
using Google.Protobuf;
using Google.Protobuf.Reflection;
using Microsoft.AutoGen.Contracts;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Hosting;
@ -31,20 +30,11 @@ public static class HostBuilderExtensions
return builder;
}
public static IHostApplicationBuilder AddAgentWorker(this IHostApplicationBuilder builder, string? agentServiceAddress = null, bool local = false)
public static IHostApplicationBuilder AddAgentWorker(this IHostApplicationBuilder builder, string? agentServiceAddress = null)
{
agentServiceAddress ??= builder.Configuration["AGENT_HOST"] ?? _defaultAgentServiceAddress;
builder.Services.TryAddSingleton(DistributedContextPropagator.Current);
// if !local, then add the gRPC client
if (!local)
{
builder.AddGrpcAgentWorker(agentServiceAddress);
}
else
{
builder.Services.AddSingleton<IAgentWorker, AgentWorker>();
}
builder.Services.AddSingleton<IAgentWorker, AgentWorker>();
builder.Services.AddSingleton<IHostedService>(sp => (IHostedService)sp.GetRequiredService<IAgentWorker>());
builder.Services.AddKeyedSingleton("EventTypes", (sp, key) =>
{
@ -125,47 +115,6 @@ public static class HostBuilderExtensions
return property?.GetValue(null) as MessageDescriptor;
}
}
public sealed class ReflectionHelper
{
public static bool IsSubclassOfGeneric(Type type, Type genericBaseType)
{
while (type != null && type != typeof(object))
{
if (genericBaseType == (type.IsGenericType ? type.GetGenericTypeDefinition() : type))
{
return true;
}
if (type.BaseType == null)
{
return false;
}
type = type.BaseType;
}
return false;
}
}
public sealed class AgentTypes(Dictionary<string, Type> types)
{
public Dictionary<string, Type> Types { get; } = types;
public static AgentTypes? GetAgentTypesFromAssembly()
{
var agents = AppDomain.CurrentDomain.GetAssemblies()
.SelectMany(assembly => assembly.GetTypes())
.Where(type => ReflectionHelper.IsSubclassOfGeneric(type, typeof(Agent))
&& !type.IsAbstract
&& !type.Name.Equals(nameof(Client)))
.ToDictionary(type => type.Name, type => type);
return new AgentTypes(agents);
}
}
public sealed class EventTypes(TypeRegistry typeRegistry, Dictionary<string, Type> types, Dictionary<Type, HashSet<string>> eventsMap)
{
public TypeRegistry TypeRegistry { get; } = typeRegistry;
public Dictionary<string, Type> Types { get; } = types;
public Dictionary<Type, HashSet<string>> EventsMap { get; } = eventsMap;
}
public sealed class AgentApplicationBuilder(IHostApplicationBuilder builder)
{
public AgentApplicationBuilder AddAgent<

View File

@ -1,8 +1,10 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// IHandle.cs
namespace Microsoft.AutoGen.Contracts;
// Copyright (c) Microsoft Corporation. All rights reserved.
// IHandle.cs
namespace Microsoft.AutoGen.Agents;
public interface IHandle
{
Task HandleObject(object item);

View File

@ -0,0 +1,59 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// ReflectionHelper.cs
using System.Reflection;
using Google.Protobuf;
using Google.Protobuf.Reflection;
namespace Microsoft.AutoGen.Agents;
public sealed class ReflectionHelper
{
public static bool IsSubclassOfGeneric(Type type, Type genericBaseType)
{
while (type != null && type != typeof(object))
{
if (genericBaseType == (type.IsGenericType ? type.GetGenericTypeDefinition() : type))
{
return true;
}
if (type.BaseType == null)
{
return false;
}
type = type.BaseType;
}
return false;
}
public static EventTypes GetAgentsMetadata(params Assembly[] assemblies)
{
var interfaceType = typeof(IMessage);
var pairs = assemblies
.SelectMany(assembly => assembly.GetTypes())
.Where(type => interfaceType.IsAssignableFrom(type) && type.IsClass && !type.IsAbstract)
.Select(t => (t, GetMessageDescriptor(t)));
var descriptors = pairs.Select(t => t.Item2);
var typeRegistry = TypeRegistry.FromMessages(descriptors);
var types = pairs.ToDictionary(item => item.Item2?.FullName ?? "", item => item.t);
var eventsMap = assemblies
.SelectMany(assembly => assembly.GetTypes())
.Where(type => IsSubclassOfGeneric(type, typeof(Agent)) && !type.IsAbstract)
.Select(t => (t, t.GetInterfaces()
.Where(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IHandle<>))
.Select(i => GetMessageDescriptor(i.GetGenericArguments().First())?.FullName ?? "").ToHashSet()))
.ToDictionary(item => item.t, item => item.Item2);
return new EventTypes(typeRegistry, types, eventsMap);
}
/// <summary>
/// Gets the message descriptor for the specified type.
/// </summary>
/// <param name="type">The type to get the message descriptor for.</param>
/// <returns>The message descriptor if found; otherwise, <c>null</c>.</returns>
public static MessageDescriptor? GetMessageDescriptor(Type type)
{
var property = type.GetProperty("Descriptor", BindingFlags.Static | BindingFlags.Public);
return property?.GetValue(null) as MessageDescriptor;
}
}

View File

@ -0,0 +1,67 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// App.cs
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using Google.Protobuf;
using Microsoft.AspNetCore.Builder;
using Microsoft.AutoGen.Agents;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Hosting;
namespace Microsoft.AutoGen.Core.Grpc;
public static class AgentsApp
{
// need a variable to store the runtime instance
public static WebApplication? Host { get; private set; }
[MemberNotNull(nameof(Host))]
public static async ValueTask<WebApplication> StartAsync(WebApplicationBuilder? builder = null, AgentTypes? agentTypes = null, bool local = false)
{
builder ??= WebApplication.CreateBuilder();
builder.Services.TryAddSingleton(DistributedContextPropagator.Current);
builder.AddAgentWorker()
.AddAgents(agentTypes);
builder.AddServiceDefaults();
var app = builder.Build();
app.MapDefaultEndpoints();
Host = app;
await app.StartAsync().ConfigureAwait(false);
return Host;
}
public static async ValueTask<WebApplication> PublishMessageAsync(
string topic,
IMessage message,
WebApplicationBuilder? builder = null,
AgentTypes? agents = null,
bool local = false)
{
if (Host == null)
{
await StartAsync(builder, agents, local);
}
var client = Host.Services.GetRequiredService<Client>() ?? throw new InvalidOperationException("Host not started");
await client.PublishEventAsync(topic, message, new CancellationToken()).ConfigureAwait(true);
return Host;
}
public static async ValueTask ShutdownAsync()
{
if (Host == null)
{
throw new InvalidOperationException("Host not started");
}
await Host.StopAsync();
}
private static IHostApplicationBuilder AddAgents(this IHostApplicationBuilder builder, AgentTypes? agentTypes)
{
agentTypes ??= AgentTypes.GetAgentTypesFromAssembly()
?? throw new InvalidOperationException("No agent types found in the assembly");
foreach (var type in agentTypes.Types)
{
builder.AddAgent(type.Key, type.Value);
}
return builder;
}
}

View File

@ -20,8 +20,7 @@ public sealed class GrpcAgentWorker(
[FromKeyedServices("AgentTypes")] IEnumerable<Tuple<string, Type>> configuredAgentTypes,
ILogger<GrpcAgentWorker> logger,
DistributedContextPropagator distributedContextPropagator) :
AgentWorker(hostApplicationLifetime,
serviceProvider, configuredAgentTypes, logger, distributedContextPropagator), IHostedService, IDisposable, IAgentWorker
IHostedService, IDisposable, IAgentWorker
{
private readonly object _channelLock = new();
private readonly ConcurrentDictionary<string, Type> _agentTypes = new();
@ -78,20 +77,6 @@ public sealed class GrpcAgentWorker(
request.Agent.ReceiveMessage(message);
break;
case Message.MessageOneofCase.RegisterAgentTypeResponse:
if (!message.RegisterAgentTypeResponse.Success)
{
throw new InvalidOperationException($"Failed to register agent: '{message.RegisterAgentTypeResponse.Error}'.");
}
break;
case Message.MessageOneofCase.AddSubscriptionResponse:
if (!message.AddSubscriptionResponse.Success)
{
throw new InvalidOperationException($"Failed to add subscription: '{message.AddSubscriptionResponse.Error}'.");
}
break;
case Message.MessageOneofCase.CloudEvent:
// HACK: Send the message to an instance of each agent type
@ -102,7 +87,7 @@ public sealed class GrpcAgentWorker(
foreach (var (typeName, _) in _agentTypes)
{
var agent = GetOrActivateAgent(new AgentId(typeName, item.Source));
var agent = GetOrActivateAgent(new AgentId { Type = typeName, Key = item.Source });
agent.ReceiveMessage(message);
}
@ -212,7 +197,7 @@ public sealed class GrpcAgentWorker(
{
var events = agentType.GetInterfaces()
.Where(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IHandle<>))
.Select(i => i.GetGenericArguments().First().Name);
.Select(i => ReflectionHelper.GetMessageDescriptor(i.GetGenericArguments().First())?.FullName);
//var state = agentType.BaseType?.GetGenericArguments().First();
var topicTypes = agentType.GetCustomAttributes<TopicSubscriptionAttribute>().Select(t => t.Topic);
@ -270,25 +255,25 @@ public sealed class GrpcAgentWorker(
}
}
// new is intentional
public new async ValueTask SendResponseAsync(RpcResponse response, CancellationToken cancellationToken = default)
public async ValueTask SendResponseAsync(RpcResponse response, CancellationToken cancellationToken = default)
{
await WriteChannelAsync(new Message { Response = response }, cancellationToken).ConfigureAwait(false);
}
// new is intentional
public new async ValueTask SendRequestAsync(Agent agent, RpcRequest request, CancellationToken cancellationToken = default)
public async ValueTask SendRequestAsync(Agent agent, RpcRequest request, CancellationToken cancellationToken = default)
{
var requestId = Guid.NewGuid().ToString();
_pendingRequests[requestId] = (agent, request.RequestId);
request.RequestId = requestId;
await WriteChannelAsync(new Message { Request = request }, cancellationToken).ConfigureAwait(false);
}
// new is intentional
public new async ValueTask SendMessageAsync(Message message, CancellationToken cancellationToken = default)
public async ValueTask SendMessageAsync(Message message, CancellationToken cancellationToken = default)
{
await WriteChannelAsync(message, cancellationToken).ConfigureAwait(false);
}
// new is intentional
public new async ValueTask PublishEventAsync(CloudEvent @event, CancellationToken cancellationToken = default)
public async ValueTask PublishEventAsync(CloudEvent @event, CancellationToken cancellationToken = default)
{
await WriteChannelAsync(new Message { CloudEvent = @event }, cancellationToken).ConfigureAwait(false);
}
@ -332,7 +317,7 @@ public sealed class GrpcAgentWorker(
return _channel;
}
public new async Task StartAsync(CancellationToken cancellationToken)
public async Task StartAsync(CancellationToken cancellationToken)
{
_channel = GetChannel();
StartCore();
@ -369,7 +354,7 @@ public sealed class GrpcAgentWorker(
}
}
public new async Task StopAsync(CancellationToken cancellationToken)
public async Task StopAsync(CancellationToken cancellationToken)
{
_shutdownCts.Cancel();
@ -389,8 +374,8 @@ public sealed class GrpcAgentWorker(
_channel?.Dispose();
}
}
// new intentional
public new async ValueTask StoreAsync(AgentState value, CancellationToken cancellationToken = default)
public async ValueTask StoreAsync(AgentState value, CancellationToken cancellationToken = default)
{
var agentId = value.AgentId ?? throw new InvalidOperationException("AgentId is required when saving AgentState.");
var response = _client.SaveState(value, null, null, cancellationToken);
@ -399,8 +384,8 @@ public sealed class GrpcAgentWorker(
throw new InvalidOperationException($"Error saving AgentState for AgentId {agentId}.");
}
}
// new intentional
public new async ValueTask<AgentState> ReadAsync(AgentId agentId, CancellationToken cancellationToken = default)
public async ValueTask<AgentState> ReadAsync(AgentId agentId, CancellationToken cancellationToken = default)
{
var response = await _client.GetStateAsync(agentId).ConfigureAwait(true);
// if (response.Success && response.AgentState.AgentId is not null) - why is success always false?

View File

@ -1,13 +1,12 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// GrpcAgentWorkerHostBuilderExtension.cs
using Grpc.Core;
using Grpc.Net.Client.Configuration;
using Microsoft.AutoGen.Agents;
using Microsoft.AutoGen.Contracts;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
namespace Microsoft.AutoGen.Agents;
namespace Microsoft.AutoGen.Core.Grpc;
public static class GrpcAgentWorkerHostBuilderExtensions
{

View File

@ -0,0 +1,15 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\Agents\Microsoft.AutoGen.Agents.csproj" />
<ProjectReference Include="..\Contracts\Microsoft.AutoGen.Contracts.csproj" />
</ItemGroup>
</Project>

View File

@ -7,31 +7,23 @@ using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Hosting;
namespace Microsoft.AutoGen.Agents;
namespace Microsoft.AutoGen.Runtime.Grpc;
public static class AgentWorkerHostingExtensions
{
public static IHostApplicationBuilder AddAgentService(this IHostApplicationBuilder builder, bool local = false, bool useGrpc = true)
public static WebApplicationBuilder AddAgentService(this WebApplicationBuilder builder)
{
builder.AddOrleans(local);
builder.AddOrleans();
builder.Services.TryAddSingleton(DistributedContextPropagator.Current);
if (useGrpc)
{
builder.Services.AddGrpc();
builder.Services.AddSingleton<GrpcGateway>();
builder.Services.AddSingleton<IHostedService>(sp => (IHostedService)sp.GetRequiredService<GrpcGateway>());
}
builder.Services.AddGrpc();
builder.Services.AddSingleton<GrpcGateway>();
builder.Services.AddSingleton<IHostedService>(sp => (IHostedService)sp.GetRequiredService<GrpcGateway>());
return builder;
}
public static IHostApplicationBuilder AddLocalAgentService(this IHostApplicationBuilder builder, bool useGrpc = true)
{
return builder.AddAgentService(local: true, useGrpc);
}
public static WebApplication MapAgentService(this WebApplication app, bool local = false, bool useGrpc = true)
{
if (useGrpc) { app.MapGrpcService<GrpcGatewayService>(); }

View File

@ -0,0 +1,32 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\Contracts\Microsoft.AutoGen.Contracts.csproj" />
<ProjectReference Include="..\Extensions\Aspire\Microsoft.AutoGen.Extensions.Aspire.csproj" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Orleans.Reminders" />
<PackageReference Include="Microsoft.Orleans.Persistence.Memory" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" />
<PackageReference Include="Microsoft.Orleans.CodeGenerator">
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
</PackageReference>
<PackageReference Include="Microsoft.Orleans.Serialization" />
<PackageReference Include="Microsoft.Orleans.Serialization.Protobuf" />
<PackageReference Include="Microsoft.Orleans.Server" />
<PackageReference Include="Microsoft.Orleans.Streaming" />
<PackageReference Include="Microsoft.Orleans.Sdk" />
<PackageReference Include="Microsoft.Orleans.Runtime" />
<PackageReference Include="Microsoft.Orleans.Persistence.Cosmos" />
<PackageReference Include="Microsoft.Orleans.Clustering.Cosmos" />
<PackageReference Include="Microsoft.Orleans.Reminders.Cosmos" />
<PackageReference Include="Microsoft.Orleans.Streaming.EventHubs" />
<PackageReference Include="OrleansDashboard" />
</ItemGroup>
</Project>

View File

@ -7,7 +7,7 @@ using Microsoft.AutoGen.Contracts;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
namespace Microsoft.AutoGen.Agents;
namespace Microsoft.AutoGen.Runtime.Grpc;
public sealed class GrpcGateway : BackgroundService, IGateway
{

View File

@ -4,7 +4,7 @@
using Grpc.Core;
using Microsoft.AutoGen.Contracts;
namespace Microsoft.AutoGen.Agents;
namespace Microsoft.AutoGen.Runtime.Grpc;
// gRPC service which handles communication between the agent worker and the cluster.
internal sealed class GrpcGatewayService : AgentRpc.AgentRpcBase

View File

@ -5,7 +5,7 @@ using System.Threading.Channels;
using Grpc.Core;
using Microsoft.AutoGen.Contracts;
namespace Microsoft.AutoGen.Agents;
namespace Microsoft.AutoGen.Runtime.Grpc;
internal sealed class GrpcWorkerConnection : IAsyncDisposable, IConnection
{

View File

@ -4,7 +4,7 @@
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.Hosting;
namespace Microsoft.AutoGen.Agents;
namespace Microsoft.AutoGen.Runtime.Grpc;
public static class Host
{
@ -12,14 +12,8 @@ public static class Host
{
var builder = WebApplication.CreateBuilder();
builder.AddServiceDefaults();
if (local)
{
builder.AddLocalAgentService(useGrpc: useGrpc);
}
else
{
builder.AddAgentService(useGrpc: useGrpc);
}
builder.AddAgentService();
var app = builder.Build();
app.MapAgentService(local, useGrpc);
app.MapDefaultEndpoints();

View File

@ -2,7 +2,7 @@
// IGateway.cs
using Microsoft.AutoGen.Contracts;
namespace Microsoft.AutoGen.Agents;
namespace Microsoft.AutoGen.Runtime.Grpc;
public interface IGateway : IGrainObserver
{

View File

@ -3,7 +3,7 @@
using Microsoft.AutoGen.Contracts;
namespace Microsoft.AutoGen.Agents;
namespace Microsoft.AutoGen.Runtime.Grpc;
internal sealed class AgentStateGrain([PersistentState("state", "AgentStateStore")] IPersistentState<AgentState> state) : Grain, IAgentState
{

View File

@ -2,7 +2,7 @@
// IRegistryGrain.cs
using Microsoft.AutoGen.Contracts;
namespace Microsoft.AutoGen.Agents;
namespace Microsoft.AutoGen.Runtime.Grpc;
public interface IRegistryGrain : IGrainWithIntegerKey
{

View File

@ -1,7 +1,7 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// ISubscriptionsGrain.cs
namespace Microsoft.AutoGen.Agents;
namespace Microsoft.AutoGen.Runtime.Grpc;
public interface ISubscriptionsGrain : IGrainWithIntegerKey
{
ValueTask SubscribeAsync(string agentType, string topic);

View File

@ -9,16 +9,11 @@ using Microsoft.Extensions.Hosting;
using Orleans.Configuration;
using Orleans.Serialization;
namespace Microsoft.AutoGen.Agents;
namespace Microsoft.AutoGen.Runtime.Grpc;
public static class OrleansRuntimeHostingExtenions
{
public static WebApplicationBuilder AddOrleans(this WebApplicationBuilder builder, bool local = false)
{
return builder.AddOrleans(local);
}
public static IHostApplicationBuilder AddOrleans(this IHostApplicationBuilder builder, bool local = false)
public static WebApplicationBuilder AddOrleans(this WebApplicationBuilder builder)
{
builder.Services.AddSerializer(serializer => serializer.AddProtobufSerializer());
builder.Services.AddSingleton<IRegistryGrain, RegistryGrain>();
@ -28,7 +23,7 @@ public static class OrleansRuntimeHostingExtenions
builder.UseOrleans((siloBuilder) =>
{
// Development mode or local mode uses in-memory storage and streams
if (builder.Environment.IsDevelopment() || local)
if (builder.Environment.IsDevelopment())
{
siloBuilder.UseLocalhostClustering()
.AddMemoryStreams("StreamProvider")

View File

@ -3,7 +3,7 @@
using Microsoft.AutoGen.Contracts;
namespace Microsoft.AutoGen.Agents;
namespace Microsoft.AutoGen.Runtime.Grpc;
internal sealed class RegistryGrain : Grain, IRegistryGrain
{

View File

@ -1,7 +1,7 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// SubscriptionsGrain.cs
namespace Microsoft.AutoGen.Agents;
namespace Microsoft.AutoGen.Runtime.Grpc;
internal sealed class SubscriptionsGrain([PersistentState("state", "PubSubStore")] IPersistentState<SubscriptionsState> state) : Grain, ISubscriptionsGrain
{

View File

@ -4,6 +4,7 @@
using System.Collections.Concurrent;
using FluentAssertions;
using Google.Protobuf.Reflection;
using Microsoft.AspNetCore.Builder;
using Microsoft.AutoGen.Contracts;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
@ -102,13 +103,12 @@ public sealed class InMemoryAgentRuntimeFixture : IDisposable
{
public InMemoryAgentRuntimeFixture()
{
var builder = Microsoft.Extensions.Hosting.Host.CreateApplicationBuilder();
var builder = WebApplication.CreateBuilder();
// step 1: create in-memory agent runtime
// step 2: register TestAgent to that agent runtime
builder
.AddAgentService(local: true, useGrpc: false)
.AddAgentWorker(local: true)
.AddAgentWorker()
.AddAgent<TestAgent>(nameof(TestAgent));
AppHost = builder.Build();

View File

@ -43,7 +43,7 @@ public class HelloAppHostIntegrationTests(ITestOutputHelper testOutput)
}
}
//sleep 5 seconds to make sure the app is running
await Task.Delay(5000);
await Task.Delay(15000);
app.EnsureNoErrorsLogged();
app.EnsureLogContains("HelloAgents said Goodbye");
app.EnsureLogContains("Wild Hello from Python!");