Add tags to spans to provide additional metadata

This commit is contained in:
Lukas Joswiak 2021-01-12 14:01:48 -08:00
parent f6f53c7a73
commit 3f7e19a182
5 changed files with 47 additions and 23 deletions

View File

@ -1818,11 +1818,6 @@ public:
void addref() override { ReferenceCounted<UDPSimSocket>::addref(); }
void delref() override { ReferenceCounted<UDPSimSocket>::delref(); }
int sendSynchronous(uint8_t const* begin, uint8_t const* end) override {
// TODO: Implement!
return 0;
}
Future<int> send(uint8_t const* begin, uint8_t const* end) override {
int sz = int(end - begin);
auto res = fmap([sz](Void){ return sz; }, delay(0.0));
@ -1906,7 +1901,6 @@ public:
}
boost::asio::ip::udp::socket::native_handle_type native_handle() override {
// TODO: Implement!
return 0;
}

View File

@ -609,17 +609,6 @@ public:
res);
}
int sendSynchronous(uint8_t const* begin, uint8_t const* end) override {
try {
return socket.send(boost::asio::const_buffer(begin, end - begin));
} catch (boost::system::system_error& e) {
TraceEvent(SevError, "SynchronousSendError").detail("Message", e.what());
} catch (...) {
TraceEvent(SevError, "SychronousSendUnknownError");
}
return -1;
}
Future<int> send(uint8_t const* begin, uint8_t const* end) override {
++g_net2->countUDPWrites;
ReadPromise p("N2_UDPWriteError", id);

View File

@ -55,6 +55,9 @@ struct LogfileTracer : ITracer {
TraceEvent(SevInfo, "TracingSpanAddParent", span.context).detail("AddParent", parent);
}
}
for (const auto& [key, value] : span.tags) {
TraceEvent(SevInfo, "TracingSpanTag", span.context).detail("Key", key).detail("Value", value);
}
}
};
@ -172,7 +175,7 @@ protected:
// fluentd filter to be able to correctly parse the updated format! See
// the msgpack specification for more info on the bit patterns used
// here.
uint8_t size = 7;
uint8_t size = 8;
if (span.parents.size() == 0) --size;
request.write_byte(size | 0b10010000); // write as array
@ -186,6 +189,8 @@ protected:
serialize_string(span.location.name.toString(), request);
serialize_map(span.tags, request);
serialize_vector(span.parents, request);
}
@ -246,10 +251,28 @@ private:
serialize_value(parentContext.second(), request, 0xcf);
}
}
inline void serialize_map(const std::unordered_map<std::string, std::string>& map, TraceRequest& request) {
int size = map.size();
if (size <= 15) {
request.write_byte(static_cast<uint8_t>(size) | 0b10000000);
} else {
// TODO: Add support for longer maps if necessary.
ASSERT(false);
}
for (const auto& [key, value] : map) {
serialize_string(key, request);
serialize_string(value, request);
}
}
};
struct AsyncUDPTracer : public UDPTracer {
public:
AsyncUDPTracer() : pending_messages_(0), send_error_(false) {}
~AsyncUDPTracer() override {
while (!buffers_.empty()) {
auto& request = buffers_.front();
@ -313,7 +336,10 @@ ACTOR Future<Void> fastTraceLogger(int* unreadyMessages, int* failedMessages, in
state bool sendErrorReset = false;
loop {
TraceEvent("TracingSpanStats").detail("UnreadyMessages", *unreadyMessages).detail("FailedMessages", *failedMessages).detail("TotalMessages", *totalMessages);
TraceEvent("TracingSpanStats").detail("UnreadyMessages", *unreadyMessages)
.detail("FailedMessages", *failedMessages)
.detail("TotalMessages", *totalMessages)
.detail("SendError", *sendError);
if (sendErrorReset) {
sendErrorReset = false;
@ -327,7 +353,7 @@ ACTOR Future<Void> fastTraceLogger(int* unreadyMessages, int* failedMessages, in
}
struct FastUDPTracer : public UDPTracer {
FastUDPTracer() : socket_fd_(-1) {
FastUDPTracer() : socket_fd_(-1), unready_socket_messages_(0), failed_messages_(0), total_messages_(0), send_error_(false) {
request_ = TraceRequest{
.buffer = new uint8_t[kTraceBufferSize],
.data_size = 0,
@ -375,6 +401,7 @@ struct FastUDPTracer : public UDPTracer {
if (bytesSent == -1) {
// Will forgo checking errno here, and assume all error messages
// should be treated the same.
++failed_messages_;
send_error_ = true;
}
request_.reset();
@ -383,7 +410,6 @@ struct FastUDPTracer : public UDPTracer {
private:
TraceRequest request_;
// Temporary debugging messages. TODO: Remove
int unready_socket_messages_;
int failed_messages_;
int total_messages_;
@ -437,6 +463,7 @@ Span& Span::operator=(Span&& o) {
end = o.end;
location = o.location;
parents = std::move(o.parents);
o.begin = 0;
return *this;
}

View File

@ -68,13 +68,28 @@ struct Span {
std::swap(parents, other.parents);
}
void addParent(SpanID span) { parents.push_back(arena, span); }
void addParent(SpanID span) {
if (parents.size() == 0) {
// Use first parent to set trace ID. This is non-ideal for spans
// with multiple parents, because the trace ID will associate the
// span with only one trace. A workaround is to look at the parent
// relationships instead of the trace ID. Another option in the
// future is to keep a list of trace IDs.
context = SpanID(span.first(), context.second());
}
parents.push_back(arena, span);
}
void addTag(const std::string& key, const std::string value) {
tags[key] = value;
}
Arena arena;
UID context = UID();
double begin = 0.0, end = 0.0;
Location location;
SmallVectorRef<SpanID> parents;
std::unordered_map<std::string, std::string> tags;
};
enum class TracerType {

View File

@ -566,7 +566,6 @@ public:
virtual void delref() = 0;
virtual void close() = 0;
virtual int sendSynchronous(uint8_t const* begin, uint8_t const* end) = 0;
virtual Future<int> send(uint8_t const* begin, uint8_t const* end) = 0;
virtual Future<int> sendTo(uint8_t const* begin, uint8_t const* end, NetworkAddress const& peer) = 0;
virtual Future<int> receive(uint8_t* begin, uint8_t* end) = 0;