fix dotnet distributed tracing (#4)

* fix dotnet distributed tracing

* regenerate protos for python
This commit is contained in:
Kosta Petan 2024-10-01 18:31:26 +02:00 committed by GitHub
parent 7fade2d5e7
commit 54c6382e89
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 40 additions and 17 deletions

View File

@ -87,8 +87,7 @@ public abstract class AgentBase
{
case Message.MessageOneofCase.CloudEvent:
{
// TODO: fix activity extraction
var activity = default(Activity); // ExtractActivity(msg.Event.Type, msg.Event.Attributes);
var activity = this.ExtractActivity(msg.CloudEvent.Type, msg.CloudEvent.Metadata);
await this.InvokeWithActivityAsync(
static ((AgentBase Agent, CloudEvent Item) state) => state.Agent.CallHandler(state.Item),
(this, msg.CloudEvent),
@ -190,11 +189,11 @@ public abstract class AgentBase
{
//TODO: Reimplement
var activity = s_source.StartActivity($"PublishEvent '{item.Type}'", ActivityKind.Client, Activity.Current?.Context ?? default);
//activity?.SetTag("peer.service", $"{item.DataType}/{item.Namespace}");
activity?.SetTag("peer.service", $"{item.Type}/{item.Source}");
var completion = new TaskCompletionSource<CloudEvent>(TaskCreationOptions.RunContinuationsAsynchronously);
// TODO: fix activity
//Context.DistributedContextPropagator.Inject(activity, item., static (carrier, key, value) => ((IDictionary<string, string>)carrier!)[key] = value);
Context.DistributedContextPropagator.Inject(activity, item.Metadata, static (carrier, key, value) => ((IDictionary<string, string>)carrier!)[key] = value);
await this.InvokeWithActivityAsync(
static async ((AgentBase Agent, CloudEvent Event, TaskCompletionSource<CloudEvent>) state) =>
{

View File

@ -78,7 +78,7 @@ public sealed class AgentWorkerRuntime : IHostedService, IDisposable, IAgentWork
message.Response.RequestId = request.OriginalRequestId;
request.Agent.ReceiveMessage(message);
break;
case Message.MessageOneofCase.Event:
case Message.MessageOneofCase.CloudEvent:
// TODO: Reimplement
// HACK: Send the message to an instance of each agent type

View File

@ -48,7 +48,6 @@ internal sealed class WorkerGateway : BackgroundService, IWorkerGateway
public async ValueTask<RpcResponse> InvokeRequest(RpcRequest request)
{
//TODO: Reimplement
(string Type, string Key) agentId = (request.Target.Type, request.Target.Key);
if (!_agentDirectory.TryGetValue(agentId, out var connection) || connection.Completion.IsCompleted)
{

View File

@ -20,12 +20,13 @@ message CloudEvent {
// Optional & Extension Attributes
map<string, CloudEventAttributeValue> attributes = 5;
map<string, string> metadata = 6;
// -- CloudEvent Data (Bytes, Text, or Proto)
oneof data {
bytes binary_data = 6;
string text_data = 7;
google.protobuf.Any proto_data = 8;
bytes binary_data = 7;
string text_data = 8;
google.protobuf.Any proto_data = 9;
}
/**
@ -45,4 +46,4 @@ message CloudEvent {
google.protobuf.Timestamp ce_timestamp = 7;
}
}
}
}

View File

@ -16,7 +16,7 @@ from google.protobuf import any_pb2 as google_dot_protobuf_dot_any__pb2
from google.protobuf import timestamp_pb2 as google_dot_protobuf_dot_timestamp__pb2
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x10\x63loudevent.proto\x12\ncloudevent\x1a\x19google/protobuf/any.proto\x1a\x1fgoogle/protobuf/timestamp.proto\"\xa2\x04\n\nCloudEvent\x12\n\n\x02id\x18\x01 \x01(\t\x12\x0e\n\x06source\x18\x02 \x01(\t\x12\x14\n\x0cspec_version\x18\x03 \x01(\t\x12\x0c\n\x04type\x18\x04 \x01(\t\x12:\n\nattributes\x18\x05 \x03(\x0b\x32&.cloudevent.CloudEvent.AttributesEntry\x12\x15\n\x0b\x62inary_data\x18\x06 \x01(\x0cH\x00\x12\x13\n\ttext_data\x18\x07 \x01(\tH\x00\x12*\n\nproto_data\x18\x08 \x01(\x0b\x32\x14.google.protobuf.AnyH\x00\x1a\x62\n\x0f\x41ttributesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12>\n\x05value\x18\x02 \x01(\x0b\x32/.cloudevent.CloudEvent.CloudEventAttributeValue:\x02\x38\x01\x1a\xd3\x01\n\x18\x43loudEventAttributeValue\x12\x14\n\nce_boolean\x18\x01 \x01(\x08H\x00\x12\x14\n\nce_integer\x18\x02 \x01(\x05H\x00\x12\x13\n\tce_string\x18\x03 \x01(\tH\x00\x12\x12\n\x08\x63\x65_bytes\x18\x04 \x01(\x0cH\x00\x12\x10\n\x06\x63\x65_uri\x18\x05 \x01(\tH\x00\x12\x14\n\nce_uri_ref\x18\x06 \x01(\tH\x00\x12\x32\n\x0c\x63\x65_timestamp\x18\x07 \x01(\x0b\x32\x1a.google.protobuf.TimestampH\x00\x42\x06\n\x04\x61ttrB\x06\n\x04\x64\x61taB(\xaa\x02%Microsoft.AutoGen.Agents.Abstractionsb\x06proto3')
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x10\x63loudevent.proto\x12\ncloudevent\x1a\x19google/protobuf/any.proto\x1a\x1fgoogle/protobuf/timestamp.proto\"\x8b\x05\n\nCloudEvent\x12\n\n\x02id\x18\x01 \x01(\t\x12\x0e\n\x06source\x18\x02 \x01(\t\x12\x14\n\x0cspec_version\x18\x03 \x01(\t\x12\x0c\n\x04type\x18\x04 \x01(\t\x12:\n\nattributes\x18\x05 \x03(\x0b\x32&.cloudevent.CloudEvent.AttributesEntry\x12\x36\n\x08metadata\x18\x06 \x03(\x0b\x32$.cloudevent.CloudEvent.MetadataEntry\x12\x15\n\x0b\x62inary_data\x18\x07 \x01(\x0cH\x00\x12\x13\n\ttext_data\x18\x08 \x01(\tH\x00\x12*\n\nproto_data\x18\t \x01(\x0b\x32\x14.google.protobuf.AnyH\x00\x1a\x62\n\x0f\x41ttributesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12>\n\x05value\x18\x02 \x01(\x0b\x32/.cloudevent.CloudEvent.CloudEventAttributeValue:\x02\x38\x01\x1a/\n\rMetadataEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x1a\xd3\x01\n\x18\x43loudEventAttributeValue\x12\x14\n\nce_boolean\x18\x01 \x01(\x08H\x00\x12\x14\n\nce_integer\x18\x02 \x01(\x05H\x00\x12\x13\n\tce_string\x18\x03 \x01(\tH\x00\x12\x12\n\x08\x63\x65_bytes\x18\x04 \x01(\x0cH\x00\x12\x10\n\x06\x63\x65_uri\x18\x05 \x01(\tH\x00\x12\x14\n\nce_uri_ref\x18\x06 \x01(\tH\x00\x12\x32\n\x0c\x63\x65_timestamp\x18\x07 \x01(\x0b\x32\x1a.google.protobuf.TimestampH\x00\x42\x06\n\x04\x61ttrB\x06\n\x04\x64\x61taB(\xaa\x02%Microsoft.AutoGen.Agents.Abstractionsb\x06proto3')
_globals = globals()
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
@ -26,10 +26,14 @@ if _descriptor._USE_C_DESCRIPTORS == False:
_globals['DESCRIPTOR']._serialized_options = b'\252\002%Microsoft.AutoGen.Agents.Abstractions'
_globals['_CLOUDEVENT_ATTRIBUTESENTRY']._options = None
_globals['_CLOUDEVENT_ATTRIBUTESENTRY']._serialized_options = b'8\001'
_globals['_CLOUDEVENT_METADATAENTRY']._options = None
_globals['_CLOUDEVENT_METADATAENTRY']._serialized_options = b'8\001'
_globals['_CLOUDEVENT']._serialized_start=93
_globals['_CLOUDEVENT']._serialized_end=639
_globals['_CLOUDEVENT_ATTRIBUTESENTRY']._serialized_start=319
_globals['_CLOUDEVENT_ATTRIBUTESENTRY']._serialized_end=417
_globals['_CLOUDEVENT_CLOUDEVENTATTRIBUTEVALUE']._serialized_start=420
_globals['_CLOUDEVENT_CLOUDEVENTATTRIBUTEVALUE']._serialized_end=631
_globals['_CLOUDEVENT']._serialized_end=744
_globals['_CLOUDEVENT_ATTRIBUTESENTRY']._serialized_start=375
_globals['_CLOUDEVENT_ATTRIBUTESENTRY']._serialized_end=473
_globals['_CLOUDEVENT_METADATAENTRY']._serialized_start=475
_globals['_CLOUDEVENT_METADATAENTRY']._serialized_end=522
_globals['_CLOUDEVENT_CLOUDEVENTATTRIBUTEVALUE']._serialized_start=525
_globals['_CLOUDEVENT_CLOUDEVENTATTRIBUTEVALUE']._serialized_end=736
# @@protoc_insertion_point(module_scope)

View File

@ -36,6 +36,22 @@ class CloudEvent(google.protobuf.message.Message):
def HasField(self, field_name: typing.Literal["value", b"value"]) -> builtins.bool: ...
def ClearField(self, field_name: typing.Literal["key", b"key", "value", b"value"]) -> None: ...
@typing.final
class MetadataEntry(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
KEY_FIELD_NUMBER: builtins.int
VALUE_FIELD_NUMBER: builtins.int
key: builtins.str
value: builtins.str
def __init__(
self,
*,
key: builtins.str = ...,
value: builtins.str = ...,
) -> None: ...
def ClearField(self, field_name: typing.Literal["key", b"key", "value", b"value"]) -> None: ...
@typing.final
class CloudEventAttributeValue(google.protobuf.message.Message):
"""*
@ -80,6 +96,7 @@ class CloudEvent(google.protobuf.message.Message):
SPEC_VERSION_FIELD_NUMBER: builtins.int
TYPE_FIELD_NUMBER: builtins.int
ATTRIBUTES_FIELD_NUMBER: builtins.int
METADATA_FIELD_NUMBER: builtins.int
BINARY_DATA_FIELD_NUMBER: builtins.int
TEXT_DATA_FIELD_NUMBER: builtins.int
PROTO_DATA_FIELD_NUMBER: builtins.int
@ -98,6 +115,8 @@ class CloudEvent(google.protobuf.message.Message):
def attributes(self) -> google.protobuf.internal.containers.MessageMap[builtins.str, global___CloudEvent.CloudEventAttributeValue]:
"""Optional & Extension Attributes"""
@property
def metadata(self) -> google.protobuf.internal.containers.ScalarMap[builtins.str, builtins.str]: ...
@property
def proto_data(self) -> google.protobuf.any_pb2.Any: ...
def __init__(
@ -108,12 +127,13 @@ class CloudEvent(google.protobuf.message.Message):
spec_version: builtins.str = ...,
type: builtins.str = ...,
attributes: collections.abc.Mapping[builtins.str, global___CloudEvent.CloudEventAttributeValue] | None = ...,
metadata: collections.abc.Mapping[builtins.str, builtins.str] | None = ...,
binary_data: builtins.bytes = ...,
text_data: builtins.str = ...,
proto_data: google.protobuf.any_pb2.Any | None = ...,
) -> None: ...
def HasField(self, field_name: typing.Literal["binary_data", b"binary_data", "data", b"data", "proto_data", b"proto_data", "text_data", b"text_data"]) -> builtins.bool: ...
def ClearField(self, field_name: typing.Literal["attributes", b"attributes", "binary_data", b"binary_data", "data", b"data", "id", b"id", "proto_data", b"proto_data", "source", b"source", "spec_version", b"spec_version", "text_data", b"text_data", "type", b"type"]) -> None: ...
def ClearField(self, field_name: typing.Literal["attributes", b"attributes", "binary_data", b"binary_data", "data", b"data", "id", b"id", "metadata", b"metadata", "proto_data", b"proto_data", "source", b"source", "spec_version", b"spec_version", "text_data", b"text_data", "type", b"type"]) -> None: ...
def WhichOneof(self, oneof_group: typing.Literal["data", b"data"]) -> typing.Literal["binary_data", "text_data", "proto_data"] | None: ...
global___CloudEvent = CloudEvent