diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 21d498e1ab..521d1d8719 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -1913,6 +1913,7 @@ ACTOR Future> getValue( Future version, Key key, Databa { state Version ver = wait( version ); state Span span("NAPI:getValue"_loc, info.spanID); + span.addTag("key"_sr, key); cx->validateVersion(ver); loop { diff --git a/fdbrpc/sim2.actor.cpp b/fdbrpc/sim2.actor.cpp index 6f3157da30..f97eb07b1b 100644 --- a/fdbrpc/sim2.actor.cpp +++ b/fdbrpc/sim2.actor.cpp @@ -1891,6 +1891,10 @@ public: return _localAddress; } + boost::asio::ip::udp::socket::native_handle_type native_handle() override { + return 0; + } + }; Future> Sim2::createUDPSocket(NetworkAddress toAddr) { diff --git a/fdbserver/fdbserver.actor.cpp b/fdbserver/fdbserver.actor.cpp index 3bf9ab866f..1f3b9f5587 100644 --- a/fdbserver/fdbserver.actor.cpp +++ b/fdbserver/fdbserver.actor.cpp @@ -1241,6 +1241,10 @@ private: openTracer(TracerType::DISABLED); } else if (tracer == "logfile" || tracer == "file" || tracer == "log_file") { openTracer(TracerType::LOG_FILE); + } else if (tracer == "network_async") { + openTracer(TracerType::NETWORK_ASYNC); + } else if (tracer == "network_lossy") { + openTracer(TracerType::NETWORK_LOSSY); } else { fprintf(stderr, "ERROR: Unknown or unsupported tracer: `%s'", args.OptionArg()); printHelpTeaser(argv[0]); diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index 59b99abaaf..e5b34d3863 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -971,6 +971,7 @@ ACTOR Future waitForVersionNoTooOld( StorageServer* data, Version versi ACTOR Future getValueQ( StorageServer* data, GetValueRequest req ) { state int64_t resultSize = 0; Span span("SS:getValue"_loc, { req.spanContext }); + span.addTag("key"_sr, req.key); try { ++data->counters.getValueQueries; @@ -2876,7 +2877,6 @@ private: ACTOR Future update( StorageServer* data, bool* pReceivedUpdate ) { state double start; - state Span span("SS:update"_loc); try { // If we are disk bound and durableVersion is very old, we need to block updates or we could run out of memory // This is often referred to as the storage server e-brake (emergency brake) @@ -3019,6 +3019,7 @@ ACTOR Future update( StorageServer* data, bool* pReceivedUpdate ) state Version ver = invalidVersion; cloneCursor2->setProtocolVersion(data->logProtocol); + state SpanID spanContext = SpanID(); for (;cloneCursor2->hasMessage(); cloneCursor2->nextMessage()) { if(mutationBytes > SERVER_KNOBS->DESIRED_UPDATE_BYTES) { mutationBytes = 0; @@ -3048,12 +3049,15 @@ ACTOR Future update( StorageServer* data, bool* pReceivedUpdate ) else if (rd.protocolVersion().hasSpanContext() && SpanContextMessage::isNextIn(rd)) { SpanContextMessage scm; rd >> scm; - span.addParent(scm.spanContext); + spanContext = scm.spanContext; } else { MutationRef msg; rd >> msg; + Span span("SS:update"_loc, { spanContext }); + span.addTag("key"_sr, msg.param1); + if (ver != invalidVersion) { // This change belongs to a version < minVersion DEBUG_MUTATION("SSPeek", ver, msg).detail("ServerID", data->thisServerID); if (ver == 1) { diff --git a/flow/CMakeLists.txt b/flow/CMakeLists.txt index 124d6b375d..f98e6ab67a 100644 --- a/flow/CMakeLists.txt +++ b/flow/CMakeLists.txt @@ -64,7 +64,7 @@ set(FLOW_SRCS Trace.cpp Trace.h Tracing.h - Tracing.cpp + Tracing.actor.cpp TreeBenchmark.h UnitTest.cpp UnitTest.h diff --git a/flow/Knobs.cpp b/flow/Knobs.cpp index 0e77aa1aac..a83100e9db 100644 --- a/flow/Knobs.cpp +++ b/flow/Knobs.cpp @@ -62,6 +62,7 @@ void FlowKnobs::initialize(bool randomize, bool isSimulated) { init( HUGE_ARENA_LOGGING_INTERVAL, 5.0 ); init( WRITE_TRACING_ENABLED, true ); if( randomize && BUGGIFY ) WRITE_TRACING_ENABLED = false; + init( TRACING_UDP_LISTENER_PORT, 8889 ); // Only applicable if TracerType is set to a network option. //connectionMonitor init( CONNECTION_MONITOR_LOOP_TIME, isSimulated ? 0.75 : 1.0 ); if( randomize && BUGGIFY ) CONNECTION_MONITOR_LOOP_TIME = 6.0; diff --git a/flow/Knobs.h b/flow/Knobs.h index 3570d223b9..1450af992a 100644 --- a/flow/Knobs.h +++ b/flow/Knobs.h @@ -70,6 +70,7 @@ public: double HUGE_ARENA_LOGGING_INTERVAL; bool WRITE_TRACING_ENABLED; + int TRACING_UDP_LISTENER_PORT; //run loop profiling double RUN_LOOP_PROFILING_INTERVAL; diff --git a/flow/Net2.actor.cpp b/flow/Net2.actor.cpp index 7f7d6b1624..66541b87d2 100644 --- a/flow/Net2.actor.cpp +++ b/flow/Net2.actor.cpp @@ -650,11 +650,15 @@ public: void addref() override { ReferenceCounted::addref(); } void delref() override { ReferenceCounted::delref(); } - NetworkAddress localAddress() const { + NetworkAddress localAddress() const override { auto endpoint = socket.local_endpoint(); return NetworkAddress(toIPAddress(endpoint.address()), endpoint.port(), isPublic, false); } + boost::asio::ip::udp::socket::native_handle_type native_handle() override { + return socket.native_handle(); + } + private: UDPSocket(boost::asio::io_service& io_service, Optional toAddress, bool isV6) : id(nondeterministicRandom()->randomUniqueID()), socket(io_service, isV6 ? udp::v6() : udp::v4()) { diff --git a/flow/Tracing.actor.cpp b/flow/Tracing.actor.cpp new file mode 100644 index 0000000000..c713e6f4f8 --- /dev/null +++ b/flow/Tracing.actor.cpp @@ -0,0 +1,486 @@ +/* + * Tracing.actor.cpp + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2020 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "flow/Tracing.h" + +#include "flow/network.h" + +#include +#include + +#include "flow/actorcompiler.h" // has to be last include + +namespace { + +// Initial size of buffer used to store serialized traces. Buffer will be +// resized when necessary. +constexpr int kTraceBufferSize = 1024; + +// The time interval between each report of the tracer queue size (seconds). +constexpr float kQueueSizeLogInterval = 5.0; + +struct NoopTracer : ITracer { + TracerType type() const { return TracerType::DISABLED; } + void trace(Span const& span) override {} +}; + +struct LogfileTracer : ITracer { + TracerType type() const override { return TracerType::LOG_FILE; } + void trace(Span const& span) override { + TraceEvent te(SevInfo, "TracingSpan", span.context); + te.detail("Location", span.location.name) + .detail("Begin", format("%.6f", span.begin)) + .detail("End", format("%.6f", span.end)); + if (span.parents.size() == 1) { + te.detail("Parent", *span.parents.begin()); + } else { + for (auto parent : span.parents) { + TraceEvent(SevInfo, "TracingSpanAddParent", span.context).detail("AddParent", parent); + } + } + for (const auto& [key, value] : span.tags) { + TraceEvent(SevInfo, "TracingSpanTag", span.context).detail("Key", key).detail("Value", value); + } + } +}; + +struct TraceRequest { + uint8_t* buffer; + // Amount of data in buffer (bytes). + std::size_t data_size; + // Size of buffer (bytes). + std::size_t buffer_size; + + void write_byte(uint8_t byte) { + write_bytes(&byte, 1); + } + + void write_bytes(const uint8_t* buf, std::size_t n) { + resize(n); + std::copy(buf, buf + n, buffer + data_size); + data_size += n; + } + + void resize(std::size_t n) { + if (data_size + n <= buffer_size) { + return; + } + + std::size_t size = buffer_size; + while (size < data_size + n) { + size *= 2; + } + + TraceEvent(SevInfo, "TracingSpanResizedBuffer").detail("OldSize", buffer_size).detail("NewSize", size); + uint8_t* new_buffer = new uint8_t[size]; + std::copy(buffer, buffer + data_size, new_buffer); + free(buffer); + buffer = new_buffer; + buffer_size = size; + } + + void reset() { + data_size = 0; + } +}; + +// A server listening for UDP trace messages, run only in simulation. +ACTOR Future simulationStartServer() { + TraceEvent(SevInfo, "UDPServerStarted").detail("Port", FLOW_KNOBS->TRACING_UDP_LISTENER_PORT); + state NetworkAddress localAddress = NetworkAddress::parse("127.0.0.1:" + std::to_string(FLOW_KNOBS->TRACING_UDP_LISTENER_PORT)); + state Reference serverSocket = wait(INetworkConnections::net()->createUDPSocket(localAddress)); + serverSocket->bind(localAddress); + + state Standalone packetString = makeString(IUDPSocket::MAX_PACKET_SIZE); + state uint8_t* packet = mutateString(packetString); + + loop { + int size = wait(serverSocket->receive(packet, packet + IUDPSocket::MAX_PACKET_SIZE)); + auto message = packetString.substr(0, size); + + // For now, just check the first byte in the message matches. Data is + // currently written as an array, so first byte should match msgpack + // array notation. In the future, the entire message should be + // deserialized to make sure all data is written correctly. + ASSERT(message[0] == (4 | 0b10010000) || (5 | 0b10010000)); + } +} + +ACTOR Future traceSend(FutureStream inputStream, std::queue* buffers, int* pendingMessages, bool* sendError) { + state NetworkAddress localAddress = NetworkAddress::parse("127.0.0.1:" + std::to_string(FLOW_KNOBS->TRACING_UDP_LISTENER_PORT)); + state Reference socket = wait(INetworkConnections::net()->createUDPSocket(localAddress)); + + loop choose { + when(state TraceRequest request = waitNext(inputStream)) { + try { + if (!(*sendError)) { + int bytesSent = wait(socket->send(request.buffer, request.buffer + request.data_size)); + TEST(bytesSent > 0); // Successfully sent serialized trace + } + --(*pendingMessages); + request.reset(); + buffers->push(request); + } catch (Error& e) { + TraceEvent("TracingSpanSendError").detail("Error", e.what()); + *sendError = true; + } + } + } +} + +// Runs on an interval, printing debug information and performing other +// connection tasks. +ACTOR Future traceLog(int* pendingMessages, bool* sendError) { + state bool sendErrorReset = false; + + loop { + TraceEvent("TracingSpanQueueSize").detail("PendingMessages", *pendingMessages); + + // Wait at least one full loop before attempting to send messages + // again. + if (sendErrorReset) { + sendErrorReset = false; + *sendError = false; + } else if (*sendError) { + sendErrorReset = true; + } + + wait(delay(kQueueSizeLogInterval)); + } +} + +struct UDPTracer : public ITracer { +protected: + // Serializes span fields as an array into the supplied TraceRequest + // buffer. + void serialize_span(const Span& span, TraceRequest& request) { + // If you change the serialization format here, make sure to update the + // fluentd filter to be able to correctly parse the updated format! See + // the msgpack specification for more info on the bit patterns used + // here. + uint8_t size = 8; + if (span.parents.size() == 0) --size; + request.write_byte(size | 0b10010000); // write as array + + serialize_string(g_network->getLocalAddress().toString(), request); // ip:port + + serialize_value(span.context.first(), request, 0xcf); // trace id + serialize_value(span.context.second(), request, 0xcf); // token (span id) + + serialize_value(span.begin, request, 0xcb); // start time + serialize_value(span.end - span.begin, request, 0xcb); // duration + + serialize_string(span.location.name.toString(), request); + + serialize_map(span.tags, request); + + serialize_vector(span.parents, request); + } + +private: + // Writes the given value in big-endian format to the request. Sets the + // first byte to msgpack_type. + template + inline void serialize_value(const T& val, TraceRequest& request, uint8_t msgpack_type) { + request.write_byte(msgpack_type); + + const uint8_t* p = reinterpret_cast(std::addressof(val)); + for (size_t i = 0; i < sizeof(T); ++i) { + request.write_byte(p[sizeof(T) - i - 1]); + } + } + + // Writes the given string to the request as a sequence of bytes. Inserts a + // format byte at the beginning of the string according to the its length, + // as specified by the msgpack specification. + inline void serialize_string(const uint8_t* c, int length, TraceRequest& request) { + if (length <= 31) { + // A size 0 string is ok. We still need to write a byte + // identifiying the item as a string, but can set the size to 0. + request.write_byte(static_cast(length) | 0b10100000); + } else if (length <= 255) { + request.write_byte(0xd9); + request.write_byte(static_cast(length)); + } else if (length <= 65535) { + request.write_byte(0xda); + request.write_byte(static_cast(length)); + } else { + // TODO: Add support for longer strings if necessary. + ASSERT(false); + } + + request.write_bytes(c, length); + } + + inline void serialize_string(const std::string& str, TraceRequest& request) { + serialize_string(reinterpret_cast(str.data()), str.size(), request); + } + + // Writes the given vector of SpanIDs to the request. If the vector is + // empty, the request is not modified. + inline void serialize_vector(const SmallVectorRef& vec, TraceRequest& request) { + int size = vec.size(); + if (size == 0) { + return; + } + + if (size <= 15) { + request.write_byte(static_cast(size) | 0b10010000); + } else if (size <= 65535) { + request.write_byte(0xdc); + request.write_byte(reinterpret_cast(&size)[1]); + request.write_byte(reinterpret_cast(&size)[0]); + } else { + // TODO: Add support for longer vectors if necessary. + ASSERT(false); + } + + for (const auto& parentContext : vec) { + serialize_value(parentContext.second(), request, 0xcf); + } + } + + inline void serialize_map(const std::unordered_map& map, TraceRequest& request) { + int size = map.size(); + + if (size <= 15) { + request.write_byte(static_cast(size) | 0b10000000); + } else { + // TODO: Add support for longer maps if necessary. + ASSERT(false); + } + + for (const auto& [key, value] : map) { + serialize_string(key.begin(), key.size(), request); + serialize_string(value.begin(), value.size(), request); + } + } +}; + +#ifndef WIN32 +struct AsyncUDPTracer : public UDPTracer { +public: + AsyncUDPTracer() : pending_messages_(0), send_error_(false) {} + + ~AsyncUDPTracer() override { + while (!buffers_.empty()) { + auto& request = buffers_.front(); + buffers_.pop(); + free(request.buffer); + } + } + + TracerType type() const override { return TracerType::NETWORK_ASYNC; } + + // Serializes the given span to msgpack format and sends the data via UDP. + void trace(Span const& span) override { + static std::once_flag once; + std::call_once(once, [&]() { + send_actor_ = traceSend(stream_.getFuture(), &buffers_, &pending_messages_, &send_error_); + log_actor_ = traceLog(&pending_messages_, &send_error_); + if (g_network->isSimulated()) { + udp_server_actor_ = simulationStartServer(); + } + }); + + if (span.location.name.size() == 0) { + return; + } + + // ASSERT(!send_actor_.isReady()); + // ASSERT(!log_actor_.isReady()); + + if (buffers_.empty()) { + buffers_.push(TraceRequest{ + .buffer = new uint8_t[kTraceBufferSize], + .data_size = 0, + .buffer_size = kTraceBufferSize + }); + } + + auto request = buffers_.front(); + buffers_.pop(); + + serialize_span(span, request); + + ++pending_messages_; + stream_.send(request); + } + +private: + // Sending data is asynchronous and it is necessary to keep the buffer + // around until the send completes. Therefore, multiple buffers may be + // needed at any one time to handle multiple send calls. + std::queue buffers_; + int pending_messages_; + bool send_error_; + + PromiseStream stream_; + Future send_actor_; + Future log_actor_; + Future udp_server_actor_; +}; + +ACTOR Future fastTraceLogger(int* unreadyMessages, int* failedMessages, int* totalMessages, bool* sendError) { + state bool sendErrorReset = false; + + loop { + TraceEvent("TracingSpanStats").detail("UnreadyMessages", *unreadyMessages) + .detail("FailedMessages", *failedMessages) + .detail("TotalMessages", *totalMessages) + .detail("SendError", *sendError); + + if (sendErrorReset) { + sendErrorReset = false; + *sendError = false; + } else if (*sendError) { + sendErrorReset = true; + } + + wait(delay(kQueueSizeLogInterval)); + } +} + +struct FastUDPTracer : public UDPTracer { + FastUDPTracer() : socket_fd_(-1), unready_socket_messages_(0), failed_messages_(0), total_messages_(0), send_error_(false) { + request_ = TraceRequest{ + .buffer = new uint8_t[kTraceBufferSize], + .data_size = 0, + .buffer_size = kTraceBufferSize + }; + } + + ~FastUDPTracer() { + free(request_.buffer); + } + + TracerType type() const override { return TracerType::NETWORK_LOSSY; } + + void trace(Span const& span) override { + static std::once_flag once; + std::call_once(once, [&]() { + log_actor_ = fastTraceLogger(&unready_socket_messages_, &failed_messages_, &total_messages_, &send_error_); + if (g_network->isSimulated()) { + udp_server_actor_ = simulationStartServer(); + } + + NetworkAddress localAddress = NetworkAddress::parse("127.0.0.1:" + std::to_string(FLOW_KNOBS->TRACING_UDP_LISTENER_PORT)); + socket_ = INetworkConnections::net()->createUDPSocket(localAddress); + }); + + if (span.location.name.size() == 0) { + return; + } + + ++total_messages_; + if (!socket_.isReady()) { + ++unready_socket_messages_; + return; + } else if (socket_fd_ == -1) { + socket_fd_ = socket_.get()->native_handle(); + } + + if (send_error_) { + return; + } + + serialize_span(span, request_); + + int bytesSent = send(socket_fd_, request_.buffer, request_.data_size, MSG_DONTWAIT); + if (bytesSent == -1) { + // Will forgo checking errno here, and assume all error messages + // should be treated the same. + ++failed_messages_; + send_error_ = true; + } + request_.reset(); + } + +private: + TraceRequest request_; + + int unready_socket_messages_; + int failed_messages_; + int total_messages_; + + int socket_fd_; + bool send_error_; + + Future> socket_; + Future log_actor_; + Future udp_server_actor_; +}; +#endif + +ITracer* g_tracer = new NoopTracer(); + +} // namespace + +void openTracer(TracerType type) { + if (g_tracer->type() == type) { + return; + } + delete g_tracer; + switch (type) { + case TracerType::DISABLED: + g_tracer = new NoopTracer{}; + break; + case TracerType::LOG_FILE: + g_tracer = new LogfileTracer{}; + break; + case TracerType::NETWORK_ASYNC: +#ifndef WIN32 + g_tracer = new AsyncUDPTracer{}; +#endif + break; + case TracerType::NETWORK_LOSSY: +#ifndef WIN32 + g_tracer = new FastUDPTracer{}; +#endif + break; + case TracerType::END: + ASSERT(false); + break; + } +} + +ITracer::~ITracer() {} + +Span& Span::operator=(Span&& o) { + if (begin > 0.0) { + end = g_network->now(); + g_tracer->trace(*this); + } + arena = std::move(o.arena); + context = o.context; + begin = o.begin; + end = o.end; + location = o.location; + parents = std::move(o.parents); + o.begin = 0; + return *this; +} + +Span::~Span() { + if (begin > 0.0) { + end = g_network->now(); + g_tracer->trace(*this); + } +} diff --git a/flow/Tracing.cpp b/flow/Tracing.cpp deleted file mode 100644 index b44f1f847f..0000000000 --- a/flow/Tracing.cpp +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Tracing.cpp - * - * This source file is part of the FoundationDB open source project - * - * Copyright 2013-2020 Apple Inc. and the FoundationDB project authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "flow/Tracing.h" - -#include - -namespace { - -struct NoopTracer : ITracer { - TracerType type() const { return TracerType::DISABLED; } - void trace(Span const& span) override {} -}; - -struct LogfileTracer : ITracer { - TracerType type() const { return TracerType::LOG_FILE; } - void trace(Span const& span) override { - if (g_network->isSimulated()) { - return; - } - TraceEvent te(SevInfo, "TracingSpan", span.context); - te.detail("Location", span.location.name) - .detail("Begin", format("%.6f", span.begin)) - .detail("End", format("%.6f", span.end)); - if (span.parents.size() == 1) { - te.detail("Parent", *span.parents.begin()); - } else { - for (auto parent : span.parents) { - TraceEvent(SevInfo, "TracingSpanAddParent", span.context).detail("AddParent", parent); - } - } - } -}; - -std::unique_ptr g_tracer = std::make_unique(); - -} // namespace - -void openTracer(TracerType type) { - if (g_tracer->type() == type) { - return; - } - switch (type) { - case TracerType::DISABLED: - g_tracer = std::make_unique(); - break; - case TracerType::LOG_FILE: - g_tracer = std::make_unique(); - break; - case TracerType::END: - ASSERT(false); - break; - } -} - -ITracer::~ITracer() {} - -Span& Span::operator=(Span&& o) { - if (begin > 0.0) { - end = g_network->now(); - g_tracer->trace(*this); - } - arena = std::move(o.arena); - context = o.context; - begin = o.begin; - end = o.end; - location = o.location; - parents = std::move(o.parents); - return *this; -} - -Span::~Span() { - if (begin > 0.0) { - end = g_network->now(); - g_tracer->trace(*this); - } -} diff --git a/flow/Tracing.h b/flow/Tracing.h index 05da261f98..c7c11241a2 100644 --- a/flow/Tracing.h +++ b/flow/Tracing.h @@ -35,7 +35,11 @@ inline Location operator"" _loc(const char* str, size_t size) { struct Span { Span(SpanID context, Location location, std::initializer_list const& parents = {}) - : context(context), begin(g_network->now()), location(location), parents(arena, parents.begin(), parents.end()) {} + : context(context), begin(g_network->now()), location(location), parents(arena, parents.begin(), parents.end()) { + if (parents.size() > 0) { + this->context = SpanID((*parents.begin()).first(), context.second()); + } + } Span(Location location, std::initializer_list const& parents = {}) : Span(deterministicRandom()->randomUniqueID(), location, parents) {} Span(Location location, SpanID context) : Span(location, { context }) {} @@ -64,19 +68,36 @@ struct Span { std::swap(parents, other.parents); } - void addParent(SpanID span) { parents.push_back(arena, span); } + void addParent(SpanID span) { + if (parents.size() == 0) { + // Use first parent to set trace ID. This is non-ideal for spans + // with multiple parents, because the trace ID will associate the + // span with only one trace. A workaround is to look at the parent + // relationships instead of the trace ID. Another option in the + // future is to keep a list of trace IDs. + context = SpanID(span.first(), context.second()); + } + parents.push_back(arena, span); + } + + void addTag(const StringRef& key, const StringRef& value) { + tags[key] = value; + } Arena arena; UID context = UID(); double begin = 0.0, end = 0.0; Location location; SmallVectorRef parents; + std::unordered_map tags; }; enum class TracerType { DISABLED = 0, LOG_FILE = 1, - END = 2 + NETWORK_ASYNC = 2, + NETWORK_LOSSY = 3, + END = 4 }; struct ITracer { diff --git a/flow/network.h b/flow/network.h index e2cc38ee91..cccd48c7ac 100644 --- a/flow/network.h +++ b/flow/network.h @@ -574,6 +574,7 @@ public: virtual UID getDebugID() const = 0; virtual NetworkAddress localAddress() const = 0; + virtual boost::asio::ip::udp::socket::native_handle_type native_handle() = 0; }; class INetworkConnections {