Simplify publish events in agent (#4093)

* simplify publishing imessage contracts

use new api

complete adoption

remove unused project

more delete

more delete

* rename methods

* formatting

* Add task type that are messages to enable multi-modal tasks. (#4091)

* Add task type that are messages to enable multi-modal tasks.

* fix test

---------

Co-authored-by: Eric Zhu <ekzhu@users.noreply.github.com>
This commit is contained in:
Diego Colombo 2024-11-08 14:16:24 +00:00 committed by GitHub
parent 5fa38b0166
commit 621b17ebbe
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
21 changed files with 75 additions and 139 deletions

View File

@ -84,8 +84,6 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.AutoGen.Abstracti
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.AutoGen.Extensions.SemanticKernel", "src\Microsoft.AutoGen\Extensions\SemanticKernel\Microsoft.AutoGen.Extensions.SemanticKernel.csproj", "{952827D4-8D4C-4327-AE4D-E8D25811EF35}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.AutoGen.Extensions.CloudEvents", "src\Microsoft.AutoGen\Extensions\CloudEvents\Microsoft.AutoGen.Extensions.CloudEvents.csproj", "{21C9EC49-E848-4EAE-932F-0862D44F7A80}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.AutoGen.Runtime", "src\Microsoft.AutoGen\Runtime\Microsoft.AutoGen.Runtime.csproj", "{A905E29A-7110-497F-ADC5-2CE2A148FEA0}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.AutoGen.ServiceDefaults", "src\Microsoft.AutoGen\ServiceDefaults\Microsoft.AutoGen.ServiceDefaults.csproj", "{D7E9D90B-5595-4E72-A90A-6DE20D9AB7AE}"
@ -266,10 +264,6 @@ Global
{952827D4-8D4C-4327-AE4D-E8D25811EF35}.Debug|Any CPU.Build.0 = Debug|Any CPU
{952827D4-8D4C-4327-AE4D-E8D25811EF35}.Release|Any CPU.ActiveCfg = Release|Any CPU
{952827D4-8D4C-4327-AE4D-E8D25811EF35}.Release|Any CPU.Build.0 = Release|Any CPU
{21C9EC49-E848-4EAE-932F-0862D44F7A80}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{21C9EC49-E848-4EAE-932F-0862D44F7A80}.Debug|Any CPU.Build.0 = Debug|Any CPU
{21C9EC49-E848-4EAE-932F-0862D44F7A80}.Release|Any CPU.ActiveCfg = Release|Any CPU
{21C9EC49-E848-4EAE-932F-0862D44F7A80}.Release|Any CPU.Build.0 = Release|Any CPU
{A905E29A-7110-497F-ADC5-2CE2A148FEA0}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{A905E29A-7110-497F-ADC5-2CE2A148FEA0}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A905E29A-7110-497F-ADC5-2CE2A148FEA0}.Release|Any CPU.ActiveCfg = Release|Any CPU
@ -391,7 +385,6 @@ Global
{FD87BD33-4616-460B-AC85-A412BA08BB78} = {18BF8DD7-0585-48BF-8F97-AD333080CE06}
{E0C991D9-0DB8-471C-ADC9-5FB16E2A0106} = {18BF8DD7-0585-48BF-8F97-AD333080CE06}
{952827D4-8D4C-4327-AE4D-E8D25811EF35} = {18BF8DD7-0585-48BF-8F97-AD333080CE06}
{21C9EC49-E848-4EAE-932F-0862D44F7A80} = {18BF8DD7-0585-48BF-8F97-AD333080CE06}
{A905E29A-7110-497F-ADC5-2CE2A148FEA0} = {18BF8DD7-0585-48BF-8F97-AD333080CE06}
{D7E9D90B-5595-4E72-A90A-6DE20D9AB7AE} = {18BF8DD7-0585-48BF-8F97-AD333080CE06}
{668726B9-77BC-45CF-B576-0F0773BF1615} = {686480D7-8FEC-4ED3-9C5D-CEBE1057A7ED}

View File

@ -20,16 +20,10 @@ public class HelloAIAgent(
{
var prompt = "Please write a limerick greeting someone with the name " + item.Message;
var response = await client.CompleteAsync(prompt);
var evt = new Output
{
Message = response.Message.Text
}.ToCloudEvent(this.AgentId.Key);
await PublishEvent(evt).ConfigureAwait(false);
var goodbye = new ConversationClosed
{
UserId = this.AgentId.Key,
UserMessage = "Goodbye"
}.ToCloudEvent(this.AgentId.Key);
await PublishEvent(goodbye).ConfigureAwait(false);
var evt = new Output { Message = response.Message.Text };
await PublishMessageAsync(evt).ConfigureAwait(false);
var goodbye = new ConversationClosed { UserId = this.AgentId.Key, UserMessage = "Goodbye" };
await PublishMessageAsync(goodbye).ConfigureAwait(false);
}
}

View File

@ -46,14 +46,14 @@ namespace Hello
var evt = new Output
{
Message = response
}.ToCloudEvent(this.AgentId.Key);
await PublishEvent(evt).ConfigureAwait(false);
};
await PublishMessageAsync(evt).ConfigureAwait(false);
var goodbye = new ConversationClosed
{
UserId = this.AgentId.Key,
UserMessage = "Goodbye"
}.ToCloudEvent(this.AgentId.Key);
await PublishEvent(goodbye).ConfigureAwait(false);
};
await PublishMessageAsync(goodbye).ConfigureAwait(false);
}
public async Task Handle(ConversationClosed item)
{
@ -61,8 +61,8 @@ namespace Hello
var evt = new Output
{
Message = goodbye
}.ToCloudEvent(this.AgentId.Key);
await PublishEvent(evt).ConfigureAwait(false);
};
await PublishMessageAsync(evt).ConfigureAwait(false);
//sleep30 seconds
await Task.Delay(30000).ConfigureAwait(false);
await AgentsApp.ShutdownAsync().ConfigureAwait(false);

View File

@ -37,17 +37,14 @@ namespace Hello
public async Task Handle(NewMessageReceived item)
{
var response = await SayHello(item.Message).ConfigureAwait(false);
var evt = new Output
{
Message = response
}.ToCloudEvent(this.AgentId.Key);
await PublishEvent(evt).ConfigureAwait(false);
var evt = new Output { Message = response };
await PublishMessageAsync(evt).ConfigureAwait(false);
var goodbye = new ConversationClosed
{
UserId = this.AgentId.Key,
UserMessage = "Goodbye"
}.ToCloudEvent(this.AgentId.Key);
await PublishEvent(goodbye).ConfigureAwait(false);
};
await PublishMessageAsync(goodbye).ConfigureAwait(false);
}
public async Task Handle(ConversationClosed item)
{
@ -55,8 +52,8 @@ namespace Hello
var evt = new Output
{
Message = goodbye
}.ToCloudEvent(this.AgentId.Key);
await PublishEvent(evt).ConfigureAwait(false);
};
await PublishMessageAsync(evt).ConfigureAwait(false);
// Signal shutdown.
hostApplicationLifetime.StopApplication();

View File

@ -31,20 +31,20 @@ namespace Hello
var evt = new Output
{
Message = response
}.ToCloudEvent(this.AgentId.Key);
};
var entry = "We said hello to " + item.Message;
await Store(new AgentState
{
AgentId = this.AgentId,
TextData = entry
}).ConfigureAwait(false);
await PublishEvent(evt).ConfigureAwait(false);
await PublishMessageAsync(evt).ConfigureAwait(false);
var goodbye = new ConversationClosed
{
UserId = this.AgentId.Key,
UserMessage = "Goodbye"
}.ToCloudEvent(this.AgentId.Key);
await PublishEvent(goodbye).ConfigureAwait(false);
};
await PublishMessageAsync(goodbye).ConfigureAwait(false);
}
public async Task Handle(ConversationClosed item)
{
@ -54,8 +54,8 @@ namespace Hello
var evt = new Output
{
Message = goodbye
}.ToCloudEvent(this.AgentId.Key);
await PublishEvent(evt).ConfigureAwait(false);
};
await PublishMessageAsync(evt).ConfigureAwait(false);
//sleep
await Task.Delay(10000).ConfigureAwait(false);
await AgentsApp.ShutdownAsync().ConfigureAwait(false);

View File

@ -24,8 +24,8 @@ public class Dev(IAgentContext context, Kernel kernel, ISemanticTextMemory memor
Repo = item.Repo,
IssueNumber = item.IssueNumber,
Code = code
}.ToCloudEvent(this.AgentId.Key);
await PublishEvent(evt);
};
await PublishMessageAsync(evt);
}
public async Task Handle(CodeChainClosed item)
@ -35,8 +35,8 @@ public class Dev(IAgentContext context, Kernel kernel, ISemanticTextMemory memor
var evt = new CodeCreated
{
Code = lastCode
}.ToCloudEvent(this.AgentId.Key);
await PublishEvent(evt);
};
await PublishMessageAsync(evt);
}
public async Task<string> GenerateCode(string ask)

View File

@ -25,8 +25,8 @@ public class DeveloperLead(IAgentContext context, Kernel kernel, ISemanticTextMe
Repo = item.Repo,
IssueNumber = item.IssueNumber,
Plan = plan
}.ToCloudEvent(this.AgentId.Key);
await PublishEvent(evt);
};
await PublishMessageAsync(evt);
}
public async Task Handle(DevPlanChainClosed item)
@ -36,8 +36,8 @@ public class DeveloperLead(IAgentContext context, Kernel kernel, ISemanticTextMe
var evt = new DevPlanCreated
{
Plan = lastPlan
}.ToCloudEvent(this.AgentId.Key);
await PublishEvent(evt);
};
await PublishMessageAsync(evt);
}
public async Task<string> CreatePlan(string ask)
{

View File

@ -22,8 +22,8 @@ public class ProductManager(IAgentContext context, Kernel kernel, ISemanticTextM
var evt = new ReadmeCreated
{
Readme = lastReadme
}.ToCloudEvent(this.AgentId.Key);
await PublishEvent(evt);
};
await PublishMessageAsync(evt);
}
public async Task Handle(ReadmeRequested item)
@ -35,8 +35,8 @@ public class ProductManager(IAgentContext context, Kernel kernel, ISemanticTextM
Org = item.Org,
Repo = item.Repo,
IssueNumber = item.IssueNumber
}.ToCloudEvent(this.AgentId.Key);
await PublishEvent(evt);
};
await PublishMessageAsync(evt);
}
public async Task<string> CreateReadme(string ask)

View File

@ -20,7 +20,7 @@ public class AzureGenie(IAgentContext context, Kernel kernel, ISemanticTextMemor
// TODO: Not sure we need to store the files if we use ACA Sessions
// //var data = item.ToData();
// // await Store(data["org"], data["repo"], data.TryParseLong("parentNumber"), data.TryParseLong("issueNumber"), "readme", "md", "output", data["readme"]);
// await PublishEvent(new Event
// await PublishEventAsync(new Event
// {
// Namespace = item.Namespace,
// Type = nameof(EventTypes.ReadmeStored),
@ -36,7 +36,7 @@ public class AzureGenie(IAgentContext context, Kernel kernel, ISemanticTextMemor
// //var data = item.ToData();
// // await Store(data["org"], data["repo"], data.TryParseLong("parentNumber"), data.TryParseLong("issueNumber"), "run", "sh", "output", data["code"]);
// // await RunInSandbox(data["org"], data["repo"], data.TryParseLong("parentNumber"), data.TryParseLong("issueNumber"));
// await PublishEvent(new Event
// await PublishEventAsync(new Event
// {
// Namespace = item.Namespace,
// Type = nameof(EventTypes.SandboxRunCreated),

View File

@ -50,7 +50,7 @@
// if (await _azService.IsSandboxCompleted(sandboxId))
// {
// await _azService.DeleteSandbox(sandboxId);
// await PublishEvent(new Event
// await PublishEventAsync(new Event
// {
// Namespace = this.GetPrimaryKeyString(),
// Type = nameof(GithubFlowEventType.SandboxRunFinished),

View File

@ -17,5 +17,5 @@ public interface IAgentBase
void ReceiveMessage(Message message);
Task Store(AgentState state);
Task<T> Read<T>(AgentId agentId) where T : IMessage, new();
ValueTask PublishEvent(CloudEvent item);
ValueTask PublishEventAsync(CloudEvent item, CancellationToken token = default);
}

View File

@ -197,9 +197,16 @@ public abstract class AgentBase : IAgentBase, IHandle
return await completion.Task.ConfigureAwait(false);
}
public async ValueTask PublishEvent(CloudEvent item)
public async ValueTask PublishMessageAsync<T>(T message, string? source = null, CancellationToken token = default) where T : IMessage
{
var activity = s_source.StartActivity($"PublishEvent '{item.Type}'", ActivityKind.Client, Activity.Current?.Context ?? default);
var src = string.IsNullOrWhiteSpace(source) ? this.AgentId.Key : source;
var evt = message.ToCloudEvent(src);
await PublishEventAsync(evt, token).ConfigureAwait(false);
}
public async ValueTask PublishEventAsync(CloudEvent item, CancellationToken token = default)
{
var activity = s_source.StartActivity($"PublishEventAsync '{item.Type}'", ActivityKind.Client, Activity.Current?.Context ?? default);
activity?.SetTag("peer.service", $"{item.Type}/{item.Source}");
// TODO: fix activity

View File

@ -12,7 +12,7 @@ public sealed class AgentWorker(IAgentWorkerRuntime runtime, DistributedContextP
[FromKeyedServices("EventTypes")] EventTypes eventTypes, ILogger<AgentBase> logger)
: AgentBase(new AgentContext(new AgentId("client", Guid.NewGuid().ToString()), runtime, logger, distributedContextPropagator), eventTypes)
{
public async ValueTask PublishEventAsync(CloudEvent evt) => await PublishEvent(evt);
public async ValueTask PublishEventAsync(CloudEvent evt) => await base.PublishEventAsync(evt);
public async ValueTask PublishEventAsync(string topic, IMessage evt)
{

View File

@ -27,8 +27,8 @@ public abstract class ConsoleAgent : IOAgent,
var evt = new InputProcessed
{
Route = _route
}.ToCloudEvent(this.AgentId.Key);
await PublishEvent(evt);
};
await PublishMessageAsync(evt);
}
public override async Task Handle(Output item)
@ -40,8 +40,8 @@ public abstract class ConsoleAgent : IOAgent,
var evt = new OutputWritten
{
Route = _route
}.ToCloudEvent(this.AgentId.Key);
await PublishEvent(evt);
};
await PublishMessageAsync(evt);
}
public override Task<string> ProcessInput(string message)

View File

@ -1,6 +1,7 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// IHandleConsole.cs
using Google.Protobuf;
using Microsoft.AutoGen.Abstractions;
namespace Microsoft.AutoGen.Agents;
@ -9,7 +10,7 @@ public interface IHandleConsole : IHandle<Output>, IHandle<Input>
{
string Route { get; }
AgentId AgentId { get; }
ValueTask PublishEvent(CloudEvent item);
ValueTask PublishMessageAsync<T>(T message, string? source = null, CancellationToken token = default) where T : IMessage;
async Task IHandle<Output>.Handle(Output item)
{
@ -20,8 +21,8 @@ public interface IHandleConsole : IHandle<Output>, IHandle<Input>
var evt = new OutputWritten
{
Route = "console"
}.ToCloudEvent(AgentId.Key);
await PublishEvent(evt);
};
await PublishMessageAsync(evt);
}
async Task IHandle<Input>.Handle(Input item)
{
@ -33,8 +34,8 @@ public interface IHandleConsole : IHandle<Output>, IHandle<Input>
var evt = new InputProcessed
{
Route = "console"
}.ToCloudEvent(AgentId.Key);
await PublishEvent(evt);
};
await PublishMessageAsync(evt);
}
static Task ProcessOutput(string message)
{

View File

@ -29,8 +29,8 @@ public abstract class FileAgent(
var err = new IOError
{
Message = errorMessage
}.ToCloudEvent(this.AgentId.Key);
await PublishEvent(err);
};
await PublishMessageAsync(err);
return;
}
string content;
@ -42,8 +42,8 @@ public abstract class FileAgent(
var evt = new InputProcessed
{
Route = _route
}.ToCloudEvent(this.AgentId.Key);
await PublishEvent(evt);
};
await PublishMessageAsync(evt);
}
public override async Task Handle(Output item)
{
@ -54,16 +54,16 @@ public abstract class FileAgent(
var evt = new OutputWritten
{
Route = _route
}.ToCloudEvent(this.AgentId.Key);
await PublishEvent(evt);
};
await PublishMessageAsync(evt);
}
public override async Task<string> ProcessInput(string message)
{
var evt = new InputProcessed
{
Route = _route,
}.ToCloudEvent(this.AgentId.Key);
await PublishEvent(evt);
};
await PublishMessageAsync(evt);
return message;
}
public override Task ProcessOutput(string message)

View File

@ -17,8 +17,8 @@ public abstract class IOAgent : AgentBase
var evt = new InputProcessed
{
Route = _route
}.ToCloudEvent(this.AgentId.Key);
await PublishEvent(evt);
};
await PublishMessageAsync(evt);
}
public virtual async Task Handle(Output item)
@ -26,8 +26,8 @@ public abstract class IOAgent : AgentBase
var evt = new OutputWritten
{
Route = _route
}.ToCloudEvent(this.AgentId.Key);
await PublishEvent(evt);
};
await PublishMessageAsync(evt);
}
public abstract Task ProcessInput(string message);

View File

@ -61,8 +61,8 @@ public abstract class WebAPIAgent : IOAgent,
var evt = new InputProcessed
{
Route = _route
}.ToCloudEvent(this.AgentId.Key);
await PublishEvent(evt);
};
await PublishMessageAsync(evt);
}
public override async Task Handle(Output item)
@ -71,8 +71,8 @@ public abstract class WebAPIAgent : IOAgent,
var evt = new OutputWritten
{
Route = _route
}.ToCloudEvent(this.AgentId.Key);
await PublishEvent(evt);
};
await PublishMessageAsync(evt);
}
public override Task<string> ProcessInput(string message)

View File

@ -1,30 +0,0 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// CloudEventExtensions.cs
using Google.Protobuf;
using Google.Protobuf.WellKnownTypes;
using Microsoft.AutoGen.Abstractions;
namespace Microsoft.AutoGen.Runtime;
public static class CloudEventExtensions
{
public static CloudEvent ToCloudEvent<T>(this T message, string source) where T : IMessage
{
return new CloudEvent
{
ProtoData = Any.Pack(message),
Type = message.Descriptor.FullName,
Source = source,
Id = Guid.NewGuid().ToString()
};
}
public static T FromCloudEvent<T>(this CloudEvent cloudEvent) where T : IMessage, new()
{
return cloudEvent.ProtoData.Unpack<T>();
}
}

View File

@ -1,17 +0,0 @@
<Project Sdk="Microsoft.NET.Sdk">
<ItemGroup>
<ProjectReference Include="../../Agents/Microsoft.AutoGen.Agents.csproj" />
</ItemGroup>
<PropertyGroup>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Google.Protobuf" />
</ItemGroup>
</Project>

View File

@ -1,9 +0,0 @@
syntax = "proto3";
package devteam;
option csharp_namespace = "Microsoft.AutoGen.Extensions.CloudEvents";
message CloudEventsState {
string cloudevent = 1;
}