From 54c6382e89c6d1093e8eebc91aea5e432ab7702a Mon Sep 17 00:00:00 2001 From: Kosta Petan Date: Tue, 1 Oct 2024 18:31:26 +0200 Subject: [PATCH] fix dotnet distributed tracing (#4) * fix dotnet distributed tracing * regenerate protos for python --- .../AgentBase.cs | 7 +++--- .../AgentWorkerRuntime.cs | 2 +- .../WorkerGateway.cs | 1 - protos/cloudevent.proto | 9 ++++---- .../application/protos/cloudevent_pb2.py | 16 +++++++++----- .../application/protos/cloudevent_pb2.pyi | 22 ++++++++++++++++++- 6 files changed, 40 insertions(+), 17 deletions(-) diff --git a/dotnet/src/Microsoft.AutoGen.Agents.Client/AgentBase.cs b/dotnet/src/Microsoft.AutoGen.Agents.Client/AgentBase.cs index c34b276827..b9a3a8b360 100644 --- a/dotnet/src/Microsoft.AutoGen.Agents.Client/AgentBase.cs +++ b/dotnet/src/Microsoft.AutoGen.Agents.Client/AgentBase.cs @@ -87,8 +87,7 @@ public abstract class AgentBase { case Message.MessageOneofCase.CloudEvent: { - // TODO: fix activity extraction - var activity = default(Activity); // ExtractActivity(msg.Event.Type, msg.Event.Attributes); + var activity = this.ExtractActivity(msg.CloudEvent.Type, msg.CloudEvent.Metadata); await this.InvokeWithActivityAsync( static ((AgentBase Agent, CloudEvent Item) state) => state.Agent.CallHandler(state.Item), (this, msg.CloudEvent), @@ -190,11 +189,11 @@ public abstract class AgentBase { //TODO: Reimplement var activity = s_source.StartActivity($"PublishEvent '{item.Type}'", ActivityKind.Client, Activity.Current?.Context ?? default); - //activity?.SetTag("peer.service", $"{item.DataType}/{item.Namespace}"); + activity?.SetTag("peer.service", $"{item.Type}/{item.Source}"); var completion = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); // TODO: fix activity - //Context.DistributedContextPropagator.Inject(activity, item., static (carrier, key, value) => ((IDictionary)carrier!)[key] = value); + Context.DistributedContextPropagator.Inject(activity, item.Metadata, static (carrier, key, value) => ((IDictionary)carrier!)[key] = value); await this.InvokeWithActivityAsync( static async ((AgentBase Agent, CloudEvent Event, TaskCompletionSource) state) => { diff --git a/dotnet/src/Microsoft.AutoGen.Agents.Client/AgentWorkerRuntime.cs b/dotnet/src/Microsoft.AutoGen.Agents.Client/AgentWorkerRuntime.cs index 9f352f3170..ea07f511fb 100644 --- a/dotnet/src/Microsoft.AutoGen.Agents.Client/AgentWorkerRuntime.cs +++ b/dotnet/src/Microsoft.AutoGen.Agents.Client/AgentWorkerRuntime.cs @@ -78,7 +78,7 @@ public sealed class AgentWorkerRuntime : IHostedService, IDisposable, IAgentWork message.Response.RequestId = request.OriginalRequestId; request.Agent.ReceiveMessage(message); break; - case Message.MessageOneofCase.Event: + case Message.MessageOneofCase.CloudEvent: // TODO: Reimplement // HACK: Send the message to an instance of each agent type diff --git a/dotnet/src/Microsoft.AutoGen.Agents.Runtime/WorkerGateway.cs b/dotnet/src/Microsoft.AutoGen.Agents.Runtime/WorkerGateway.cs index 1b701e7dc9..742aa823a0 100644 --- a/dotnet/src/Microsoft.AutoGen.Agents.Runtime/WorkerGateway.cs +++ b/dotnet/src/Microsoft.AutoGen.Agents.Runtime/WorkerGateway.cs @@ -48,7 +48,6 @@ internal sealed class WorkerGateway : BackgroundService, IWorkerGateway public async ValueTask InvokeRequest(RpcRequest request) { - //TODO: Reimplement (string Type, string Key) agentId = (request.Target.Type, request.Target.Key); if (!_agentDirectory.TryGetValue(agentId, out var connection) || connection.Completion.IsCompleted) { diff --git a/protos/cloudevent.proto b/protos/cloudevent.proto index 92ee6d9734..f710ee874a 100644 --- a/protos/cloudevent.proto +++ b/protos/cloudevent.proto @@ -20,12 +20,13 @@ message CloudEvent { // Optional & Extension Attributes map attributes = 5; + map metadata = 6; // -- CloudEvent Data (Bytes, Text, or Proto) oneof data { - bytes binary_data = 6; - string text_data = 7; - google.protobuf.Any proto_data = 8; + bytes binary_data = 7; + string text_data = 8; + google.protobuf.Any proto_data = 9; } /** @@ -45,4 +46,4 @@ message CloudEvent { google.protobuf.Timestamp ce_timestamp = 7; } } -} +} \ No newline at end of file diff --git a/python/packages/autogen-core/src/autogen_core/application/protos/cloudevent_pb2.py b/python/packages/autogen-core/src/autogen_core/application/protos/cloudevent_pb2.py index 4242930fc9..f20699395f 100644 --- a/python/packages/autogen-core/src/autogen_core/application/protos/cloudevent_pb2.py +++ b/python/packages/autogen-core/src/autogen_core/application/protos/cloudevent_pb2.py @@ -16,7 +16,7 @@ from google.protobuf import any_pb2 as google_dot_protobuf_dot_any__pb2 from google.protobuf import timestamp_pb2 as google_dot_protobuf_dot_timestamp__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x10\x63loudevent.proto\x12\ncloudevent\x1a\x19google/protobuf/any.proto\x1a\x1fgoogle/protobuf/timestamp.proto\"\xa2\x04\n\nCloudEvent\x12\n\n\x02id\x18\x01 \x01(\t\x12\x0e\n\x06source\x18\x02 \x01(\t\x12\x14\n\x0cspec_version\x18\x03 \x01(\t\x12\x0c\n\x04type\x18\x04 \x01(\t\x12:\n\nattributes\x18\x05 \x03(\x0b\x32&.cloudevent.CloudEvent.AttributesEntry\x12\x15\n\x0b\x62inary_data\x18\x06 \x01(\x0cH\x00\x12\x13\n\ttext_data\x18\x07 \x01(\tH\x00\x12*\n\nproto_data\x18\x08 \x01(\x0b\x32\x14.google.protobuf.AnyH\x00\x1a\x62\n\x0f\x41ttributesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12>\n\x05value\x18\x02 \x01(\x0b\x32/.cloudevent.CloudEvent.CloudEventAttributeValue:\x02\x38\x01\x1a\xd3\x01\n\x18\x43loudEventAttributeValue\x12\x14\n\nce_boolean\x18\x01 \x01(\x08H\x00\x12\x14\n\nce_integer\x18\x02 \x01(\x05H\x00\x12\x13\n\tce_string\x18\x03 \x01(\tH\x00\x12\x12\n\x08\x63\x65_bytes\x18\x04 \x01(\x0cH\x00\x12\x10\n\x06\x63\x65_uri\x18\x05 \x01(\tH\x00\x12\x14\n\nce_uri_ref\x18\x06 \x01(\tH\x00\x12\x32\n\x0c\x63\x65_timestamp\x18\x07 \x01(\x0b\x32\x1a.google.protobuf.TimestampH\x00\x42\x06\n\x04\x61ttrB\x06\n\x04\x64\x61taB(\xaa\x02%Microsoft.AutoGen.Agents.Abstractionsb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x10\x63loudevent.proto\x12\ncloudevent\x1a\x19google/protobuf/any.proto\x1a\x1fgoogle/protobuf/timestamp.proto\"\x8b\x05\n\nCloudEvent\x12\n\n\x02id\x18\x01 \x01(\t\x12\x0e\n\x06source\x18\x02 \x01(\t\x12\x14\n\x0cspec_version\x18\x03 \x01(\t\x12\x0c\n\x04type\x18\x04 \x01(\t\x12:\n\nattributes\x18\x05 \x03(\x0b\x32&.cloudevent.CloudEvent.AttributesEntry\x12\x36\n\x08metadata\x18\x06 \x03(\x0b\x32$.cloudevent.CloudEvent.MetadataEntry\x12\x15\n\x0b\x62inary_data\x18\x07 \x01(\x0cH\x00\x12\x13\n\ttext_data\x18\x08 \x01(\tH\x00\x12*\n\nproto_data\x18\t \x01(\x0b\x32\x14.google.protobuf.AnyH\x00\x1a\x62\n\x0f\x41ttributesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12>\n\x05value\x18\x02 \x01(\x0b\x32/.cloudevent.CloudEvent.CloudEventAttributeValue:\x02\x38\x01\x1a/\n\rMetadataEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x1a\xd3\x01\n\x18\x43loudEventAttributeValue\x12\x14\n\nce_boolean\x18\x01 \x01(\x08H\x00\x12\x14\n\nce_integer\x18\x02 \x01(\x05H\x00\x12\x13\n\tce_string\x18\x03 \x01(\tH\x00\x12\x12\n\x08\x63\x65_bytes\x18\x04 \x01(\x0cH\x00\x12\x10\n\x06\x63\x65_uri\x18\x05 \x01(\tH\x00\x12\x14\n\nce_uri_ref\x18\x06 \x01(\tH\x00\x12\x32\n\x0c\x63\x65_timestamp\x18\x07 \x01(\x0b\x32\x1a.google.protobuf.TimestampH\x00\x42\x06\n\x04\x61ttrB\x06\n\x04\x64\x61taB(\xaa\x02%Microsoft.AutoGen.Agents.Abstractionsb\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -26,10 +26,14 @@ if _descriptor._USE_C_DESCRIPTORS == False: _globals['DESCRIPTOR']._serialized_options = b'\252\002%Microsoft.AutoGen.Agents.Abstractions' _globals['_CLOUDEVENT_ATTRIBUTESENTRY']._options = None _globals['_CLOUDEVENT_ATTRIBUTESENTRY']._serialized_options = b'8\001' + _globals['_CLOUDEVENT_METADATAENTRY']._options = None + _globals['_CLOUDEVENT_METADATAENTRY']._serialized_options = b'8\001' _globals['_CLOUDEVENT']._serialized_start=93 - _globals['_CLOUDEVENT']._serialized_end=639 - _globals['_CLOUDEVENT_ATTRIBUTESENTRY']._serialized_start=319 - _globals['_CLOUDEVENT_ATTRIBUTESENTRY']._serialized_end=417 - _globals['_CLOUDEVENT_CLOUDEVENTATTRIBUTEVALUE']._serialized_start=420 - _globals['_CLOUDEVENT_CLOUDEVENTATTRIBUTEVALUE']._serialized_end=631 + _globals['_CLOUDEVENT']._serialized_end=744 + _globals['_CLOUDEVENT_ATTRIBUTESENTRY']._serialized_start=375 + _globals['_CLOUDEVENT_ATTRIBUTESENTRY']._serialized_end=473 + _globals['_CLOUDEVENT_METADATAENTRY']._serialized_start=475 + _globals['_CLOUDEVENT_METADATAENTRY']._serialized_end=522 + _globals['_CLOUDEVENT_CLOUDEVENTATTRIBUTEVALUE']._serialized_start=525 + _globals['_CLOUDEVENT_CLOUDEVENTATTRIBUTEVALUE']._serialized_end=736 # @@protoc_insertion_point(module_scope) diff --git a/python/packages/autogen-core/src/autogen_core/application/protos/cloudevent_pb2.pyi b/python/packages/autogen-core/src/autogen_core/application/protos/cloudevent_pb2.pyi index 0db31c1020..1cf61a523f 100644 --- a/python/packages/autogen-core/src/autogen_core/application/protos/cloudevent_pb2.pyi +++ b/python/packages/autogen-core/src/autogen_core/application/protos/cloudevent_pb2.pyi @@ -36,6 +36,22 @@ class CloudEvent(google.protobuf.message.Message): def HasField(self, field_name: typing.Literal["value", b"value"]) -> builtins.bool: ... def ClearField(self, field_name: typing.Literal["key", b"key", "value", b"value"]) -> None: ... + @typing.final + class MetadataEntry(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + KEY_FIELD_NUMBER: builtins.int + VALUE_FIELD_NUMBER: builtins.int + key: builtins.str + value: builtins.str + def __init__( + self, + *, + key: builtins.str = ..., + value: builtins.str = ..., + ) -> None: ... + def ClearField(self, field_name: typing.Literal["key", b"key", "value", b"value"]) -> None: ... + @typing.final class CloudEventAttributeValue(google.protobuf.message.Message): """* @@ -80,6 +96,7 @@ class CloudEvent(google.protobuf.message.Message): SPEC_VERSION_FIELD_NUMBER: builtins.int TYPE_FIELD_NUMBER: builtins.int ATTRIBUTES_FIELD_NUMBER: builtins.int + METADATA_FIELD_NUMBER: builtins.int BINARY_DATA_FIELD_NUMBER: builtins.int TEXT_DATA_FIELD_NUMBER: builtins.int PROTO_DATA_FIELD_NUMBER: builtins.int @@ -98,6 +115,8 @@ class CloudEvent(google.protobuf.message.Message): def attributes(self) -> google.protobuf.internal.containers.MessageMap[builtins.str, global___CloudEvent.CloudEventAttributeValue]: """Optional & Extension Attributes""" + @property + def metadata(self) -> google.protobuf.internal.containers.ScalarMap[builtins.str, builtins.str]: ... @property def proto_data(self) -> google.protobuf.any_pb2.Any: ... def __init__( @@ -108,12 +127,13 @@ class CloudEvent(google.protobuf.message.Message): spec_version: builtins.str = ..., type: builtins.str = ..., attributes: collections.abc.Mapping[builtins.str, global___CloudEvent.CloudEventAttributeValue] | None = ..., + metadata: collections.abc.Mapping[builtins.str, builtins.str] | None = ..., binary_data: builtins.bytes = ..., text_data: builtins.str = ..., proto_data: google.protobuf.any_pb2.Any | None = ..., ) -> None: ... def HasField(self, field_name: typing.Literal["binary_data", b"binary_data", "data", b"data", "proto_data", b"proto_data", "text_data", b"text_data"]) -> builtins.bool: ... - def ClearField(self, field_name: typing.Literal["attributes", b"attributes", "binary_data", b"binary_data", "data", b"data", "id", b"id", "proto_data", b"proto_data", "source", b"source", "spec_version", b"spec_version", "text_data", b"text_data", "type", b"type"]) -> None: ... + def ClearField(self, field_name: typing.Literal["attributes", b"attributes", "binary_data", b"binary_data", "data", b"data", "id", b"id", "metadata", b"metadata", "proto_data", b"proto_data", "source", b"source", "spec_version", b"spec_version", "text_data", b"text_data", "type", b"type"]) -> None: ... def WhichOneof(self, oneof_group: typing.Literal["data", b"data"]) -> typing.Literal["binary_data", "text_data", "proto_data"] | None: ... global___CloudEvent = CloudEvent