Merge remote-tracking branch 'origin/master' into misc-changes
This commit is contained in:
commit
fb9b4b7626
|
@ -1913,6 +1913,7 @@ ACTOR Future<Optional<Value>> getValue( Future<Version> version, Key key, Databa
|
|||
{
|
||||
state Version ver = wait( version );
|
||||
state Span span("NAPI:getValue"_loc, info.spanID);
|
||||
span.addTag("key"_sr, key);
|
||||
cx->validateVersion(ver);
|
||||
|
||||
loop {
|
||||
|
|
|
@ -1891,6 +1891,10 @@ public:
|
|||
return _localAddress;
|
||||
}
|
||||
|
||||
boost::asio::ip::udp::socket::native_handle_type native_handle() override {
|
||||
return 0;
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
Future<Reference<IUDPSocket>> Sim2::createUDPSocket(NetworkAddress toAddr) {
|
||||
|
|
|
@ -1241,6 +1241,10 @@ private:
|
|||
openTracer(TracerType::DISABLED);
|
||||
} else if (tracer == "logfile" || tracer == "file" || tracer == "log_file") {
|
||||
openTracer(TracerType::LOG_FILE);
|
||||
} else if (tracer == "network_async") {
|
||||
openTracer(TracerType::NETWORK_ASYNC);
|
||||
} else if (tracer == "network_lossy") {
|
||||
openTracer(TracerType::NETWORK_LOSSY);
|
||||
} else {
|
||||
fprintf(stderr, "ERROR: Unknown or unsupported tracer: `%s'", args.OptionArg());
|
||||
printHelpTeaser(argv[0]);
|
||||
|
|
|
@ -971,6 +971,7 @@ ACTOR Future<Version> waitForVersionNoTooOld( StorageServer* data, Version versi
|
|||
ACTOR Future<Void> getValueQ( StorageServer* data, GetValueRequest req ) {
|
||||
state int64_t resultSize = 0;
|
||||
Span span("SS:getValue"_loc, { req.spanContext });
|
||||
span.addTag("key"_sr, req.key);
|
||||
|
||||
try {
|
||||
++data->counters.getValueQueries;
|
||||
|
@ -2876,7 +2877,6 @@ private:
|
|||
ACTOR Future<Void> update( StorageServer* data, bool* pReceivedUpdate )
|
||||
{
|
||||
state double start;
|
||||
state Span span("SS:update"_loc);
|
||||
try {
|
||||
// If we are disk bound and durableVersion is very old, we need to block updates or we could run out of memory
|
||||
// This is often referred to as the storage server e-brake (emergency brake)
|
||||
|
@ -3019,6 +3019,7 @@ ACTOR Future<Void> update( StorageServer* data, bool* pReceivedUpdate )
|
|||
|
||||
state Version ver = invalidVersion;
|
||||
cloneCursor2->setProtocolVersion(data->logProtocol);
|
||||
state SpanID spanContext = SpanID();
|
||||
for (;cloneCursor2->hasMessage(); cloneCursor2->nextMessage()) {
|
||||
if(mutationBytes > SERVER_KNOBS->DESIRED_UPDATE_BYTES) {
|
||||
mutationBytes = 0;
|
||||
|
@ -3048,12 +3049,15 @@ ACTOR Future<Void> update( StorageServer* data, bool* pReceivedUpdate )
|
|||
else if (rd.protocolVersion().hasSpanContext() && SpanContextMessage::isNextIn(rd)) {
|
||||
SpanContextMessage scm;
|
||||
rd >> scm;
|
||||
span.addParent(scm.spanContext);
|
||||
spanContext = scm.spanContext;
|
||||
}
|
||||
else {
|
||||
MutationRef msg;
|
||||
rd >> msg;
|
||||
|
||||
Span span("SS:update"_loc, { spanContext });
|
||||
span.addTag("key"_sr, msg.param1);
|
||||
|
||||
if (ver != invalidVersion) { // This change belongs to a version < minVersion
|
||||
DEBUG_MUTATION("SSPeek", ver, msg).detail("ServerID", data->thisServerID);
|
||||
if (ver == 1) {
|
||||
|
|
|
@ -64,7 +64,7 @@ set(FLOW_SRCS
|
|||
Trace.cpp
|
||||
Trace.h
|
||||
Tracing.h
|
||||
Tracing.cpp
|
||||
Tracing.actor.cpp
|
||||
TreeBenchmark.h
|
||||
UnitTest.cpp
|
||||
UnitTest.h
|
||||
|
|
|
@ -62,6 +62,7 @@ void FlowKnobs::initialize(bool randomize, bool isSimulated) {
|
|||
init( HUGE_ARENA_LOGGING_INTERVAL, 5.0 );
|
||||
|
||||
init( WRITE_TRACING_ENABLED, true ); if( randomize && BUGGIFY ) WRITE_TRACING_ENABLED = false;
|
||||
init( TRACING_UDP_LISTENER_PORT, 8889 ); // Only applicable if TracerType is set to a network option.
|
||||
|
||||
//connectionMonitor
|
||||
init( CONNECTION_MONITOR_LOOP_TIME, isSimulated ? 0.75 : 1.0 ); if( randomize && BUGGIFY ) CONNECTION_MONITOR_LOOP_TIME = 6.0;
|
||||
|
|
|
@ -70,6 +70,7 @@ public:
|
|||
double HUGE_ARENA_LOGGING_INTERVAL;
|
||||
|
||||
bool WRITE_TRACING_ENABLED;
|
||||
int TRACING_UDP_LISTENER_PORT;
|
||||
|
||||
//run loop profiling
|
||||
double RUN_LOOP_PROFILING_INTERVAL;
|
||||
|
|
|
@ -650,11 +650,15 @@ public:
|
|||
void addref() override { ReferenceCounted<UDPSocket>::addref(); }
|
||||
void delref() override { ReferenceCounted<UDPSocket>::delref(); }
|
||||
|
||||
NetworkAddress localAddress() const {
|
||||
NetworkAddress localAddress() const override {
|
||||
auto endpoint = socket.local_endpoint();
|
||||
return NetworkAddress(toIPAddress(endpoint.address()), endpoint.port(), isPublic, false);
|
||||
}
|
||||
|
||||
boost::asio::ip::udp::socket::native_handle_type native_handle() override {
|
||||
return socket.native_handle();
|
||||
}
|
||||
|
||||
private:
|
||||
UDPSocket(boost::asio::io_service& io_service, Optional<NetworkAddress> toAddress, bool isV6)
|
||||
: id(nondeterministicRandom()->randomUniqueID()), socket(io_service, isV6 ? udp::v6() : udp::v4()) {
|
||||
|
|
|
@ -0,0 +1,486 @@
|
|||
/*
|
||||
* Tracing.actor.cpp
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2020 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "flow/Tracing.h"
|
||||
|
||||
#include "flow/network.h"
|
||||
|
||||
#include <functional>
|
||||
#include <iomanip>
|
||||
|
||||
#include "flow/actorcompiler.h" // has to be last include
|
||||
|
||||
namespace {
|
||||
|
||||
// Initial size of buffer used to store serialized traces. Buffer will be
|
||||
// resized when necessary.
|
||||
constexpr int kTraceBufferSize = 1024;
|
||||
|
||||
// The time interval between each report of the tracer queue size (seconds).
|
||||
constexpr float kQueueSizeLogInterval = 5.0;
|
||||
|
||||
struct NoopTracer : ITracer {
|
||||
TracerType type() const { return TracerType::DISABLED; }
|
||||
void trace(Span const& span) override {}
|
||||
};
|
||||
|
||||
struct LogfileTracer : ITracer {
|
||||
TracerType type() const override { return TracerType::LOG_FILE; }
|
||||
void trace(Span const& span) override {
|
||||
TraceEvent te(SevInfo, "TracingSpan", span.context);
|
||||
te.detail("Location", span.location.name)
|
||||
.detail("Begin", format("%.6f", span.begin))
|
||||
.detail("End", format("%.6f", span.end));
|
||||
if (span.parents.size() == 1) {
|
||||
te.detail("Parent", *span.parents.begin());
|
||||
} else {
|
||||
for (auto parent : span.parents) {
|
||||
TraceEvent(SevInfo, "TracingSpanAddParent", span.context).detail("AddParent", parent);
|
||||
}
|
||||
}
|
||||
for (const auto& [key, value] : span.tags) {
|
||||
TraceEvent(SevInfo, "TracingSpanTag", span.context).detail("Key", key).detail("Value", value);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
struct TraceRequest {
|
||||
uint8_t* buffer;
|
||||
// Amount of data in buffer (bytes).
|
||||
std::size_t data_size;
|
||||
// Size of buffer (bytes).
|
||||
std::size_t buffer_size;
|
||||
|
||||
void write_byte(uint8_t byte) {
|
||||
write_bytes(&byte, 1);
|
||||
}
|
||||
|
||||
void write_bytes(const uint8_t* buf, std::size_t n) {
|
||||
resize(n);
|
||||
std::copy(buf, buf + n, buffer + data_size);
|
||||
data_size += n;
|
||||
}
|
||||
|
||||
void resize(std::size_t n) {
|
||||
if (data_size + n <= buffer_size) {
|
||||
return;
|
||||
}
|
||||
|
||||
std::size_t size = buffer_size;
|
||||
while (size < data_size + n) {
|
||||
size *= 2;
|
||||
}
|
||||
|
||||
TraceEvent(SevInfo, "TracingSpanResizedBuffer").detail("OldSize", buffer_size).detail("NewSize", size);
|
||||
uint8_t* new_buffer = new uint8_t[size];
|
||||
std::copy(buffer, buffer + data_size, new_buffer);
|
||||
free(buffer);
|
||||
buffer = new_buffer;
|
||||
buffer_size = size;
|
||||
}
|
||||
|
||||
void reset() {
|
||||
data_size = 0;
|
||||
}
|
||||
};
|
||||
|
||||
// A server listening for UDP trace messages, run only in simulation.
|
||||
ACTOR Future<Void> simulationStartServer() {
|
||||
TraceEvent(SevInfo, "UDPServerStarted").detail("Port", FLOW_KNOBS->TRACING_UDP_LISTENER_PORT);
|
||||
state NetworkAddress localAddress = NetworkAddress::parse("127.0.0.1:" + std::to_string(FLOW_KNOBS->TRACING_UDP_LISTENER_PORT));
|
||||
state Reference<IUDPSocket> serverSocket = wait(INetworkConnections::net()->createUDPSocket(localAddress));
|
||||
serverSocket->bind(localAddress);
|
||||
|
||||
state Standalone<StringRef> packetString = makeString(IUDPSocket::MAX_PACKET_SIZE);
|
||||
state uint8_t* packet = mutateString(packetString);
|
||||
|
||||
loop {
|
||||
int size = wait(serverSocket->receive(packet, packet + IUDPSocket::MAX_PACKET_SIZE));
|
||||
auto message = packetString.substr(0, size);
|
||||
|
||||
// For now, just check the first byte in the message matches. Data is
|
||||
// currently written as an array, so first byte should match msgpack
|
||||
// array notation. In the future, the entire message should be
|
||||
// deserialized to make sure all data is written correctly.
|
||||
ASSERT(message[0] == (4 | 0b10010000) || (5 | 0b10010000));
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> traceSend(FutureStream<TraceRequest> inputStream, std::queue<TraceRequest>* buffers, int* pendingMessages, bool* sendError) {
|
||||
state NetworkAddress localAddress = NetworkAddress::parse("127.0.0.1:" + std::to_string(FLOW_KNOBS->TRACING_UDP_LISTENER_PORT));
|
||||
state Reference<IUDPSocket> socket = wait(INetworkConnections::net()->createUDPSocket(localAddress));
|
||||
|
||||
loop choose {
|
||||
when(state TraceRequest request = waitNext(inputStream)) {
|
||||
try {
|
||||
if (!(*sendError)) {
|
||||
int bytesSent = wait(socket->send(request.buffer, request.buffer + request.data_size));
|
||||
TEST(bytesSent > 0); // Successfully sent serialized trace
|
||||
}
|
||||
--(*pendingMessages);
|
||||
request.reset();
|
||||
buffers->push(request);
|
||||
} catch (Error& e) {
|
||||
TraceEvent("TracingSpanSendError").detail("Error", e.what());
|
||||
*sendError = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Runs on an interval, printing debug information and performing other
|
||||
// connection tasks.
|
||||
ACTOR Future<Void> traceLog(int* pendingMessages, bool* sendError) {
|
||||
state bool sendErrorReset = false;
|
||||
|
||||
loop {
|
||||
TraceEvent("TracingSpanQueueSize").detail("PendingMessages", *pendingMessages);
|
||||
|
||||
// Wait at least one full loop before attempting to send messages
|
||||
// again.
|
||||
if (sendErrorReset) {
|
||||
sendErrorReset = false;
|
||||
*sendError = false;
|
||||
} else if (*sendError) {
|
||||
sendErrorReset = true;
|
||||
}
|
||||
|
||||
wait(delay(kQueueSizeLogInterval));
|
||||
}
|
||||
}
|
||||
|
||||
struct UDPTracer : public ITracer {
|
||||
protected:
|
||||
// Serializes span fields as an array into the supplied TraceRequest
|
||||
// buffer.
|
||||
void serialize_span(const Span& span, TraceRequest& request) {
|
||||
// If you change the serialization format here, make sure to update the
|
||||
// fluentd filter to be able to correctly parse the updated format! See
|
||||
// the msgpack specification for more info on the bit patterns used
|
||||
// here.
|
||||
uint8_t size = 8;
|
||||
if (span.parents.size() == 0) --size;
|
||||
request.write_byte(size | 0b10010000); // write as array
|
||||
|
||||
serialize_string(g_network->getLocalAddress().toString(), request); // ip:port
|
||||
|
||||
serialize_value(span.context.first(), request, 0xcf); // trace id
|
||||
serialize_value(span.context.second(), request, 0xcf); // token (span id)
|
||||
|
||||
serialize_value(span.begin, request, 0xcb); // start time
|
||||
serialize_value(span.end - span.begin, request, 0xcb); // duration
|
||||
|
||||
serialize_string(span.location.name.toString(), request);
|
||||
|
||||
serialize_map(span.tags, request);
|
||||
|
||||
serialize_vector(span.parents, request);
|
||||
}
|
||||
|
||||
private:
|
||||
// Writes the given value in big-endian format to the request. Sets the
|
||||
// first byte to msgpack_type.
|
||||
template <typename T>
|
||||
inline void serialize_value(const T& val, TraceRequest& request, uint8_t msgpack_type) {
|
||||
request.write_byte(msgpack_type);
|
||||
|
||||
const uint8_t* p = reinterpret_cast<const uint8_t*>(std::addressof(val));
|
||||
for (size_t i = 0; i < sizeof(T); ++i) {
|
||||
request.write_byte(p[sizeof(T) - i - 1]);
|
||||
}
|
||||
}
|
||||
|
||||
// Writes the given string to the request as a sequence of bytes. Inserts a
|
||||
// format byte at the beginning of the string according to the its length,
|
||||
// as specified by the msgpack specification.
|
||||
inline void serialize_string(const uint8_t* c, int length, TraceRequest& request) {
|
||||
if (length <= 31) {
|
||||
// A size 0 string is ok. We still need to write a byte
|
||||
// identifiying the item as a string, but can set the size to 0.
|
||||
request.write_byte(static_cast<uint8_t>(length) | 0b10100000);
|
||||
} else if (length <= 255) {
|
||||
request.write_byte(0xd9);
|
||||
request.write_byte(static_cast<uint8_t>(length));
|
||||
} else if (length <= 65535) {
|
||||
request.write_byte(0xda);
|
||||
request.write_byte(static_cast<uint16_t>(length));
|
||||
} else {
|
||||
// TODO: Add support for longer strings if necessary.
|
||||
ASSERT(false);
|
||||
}
|
||||
|
||||
request.write_bytes(c, length);
|
||||
}
|
||||
|
||||
inline void serialize_string(const std::string& str, TraceRequest& request) {
|
||||
serialize_string(reinterpret_cast<const uint8_t*>(str.data()), str.size(), request);
|
||||
}
|
||||
|
||||
// Writes the given vector of SpanIDs to the request. If the vector is
|
||||
// empty, the request is not modified.
|
||||
inline void serialize_vector(const SmallVectorRef<SpanID>& vec, TraceRequest& request) {
|
||||
int size = vec.size();
|
||||
if (size == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (size <= 15) {
|
||||
request.write_byte(static_cast<uint8_t>(size) | 0b10010000);
|
||||
} else if (size <= 65535) {
|
||||
request.write_byte(0xdc);
|
||||
request.write_byte(reinterpret_cast<const uint8_t*>(&size)[1]);
|
||||
request.write_byte(reinterpret_cast<const uint8_t*>(&size)[0]);
|
||||
} else {
|
||||
// TODO: Add support for longer vectors if necessary.
|
||||
ASSERT(false);
|
||||
}
|
||||
|
||||
for (const auto& parentContext : vec) {
|
||||
serialize_value(parentContext.second(), request, 0xcf);
|
||||
}
|
||||
}
|
||||
|
||||
inline void serialize_map(const std::unordered_map<StringRef, StringRef>& map, TraceRequest& request) {
|
||||
int size = map.size();
|
||||
|
||||
if (size <= 15) {
|
||||
request.write_byte(static_cast<uint8_t>(size) | 0b10000000);
|
||||
} else {
|
||||
// TODO: Add support for longer maps if necessary.
|
||||
ASSERT(false);
|
||||
}
|
||||
|
||||
for (const auto& [key, value] : map) {
|
||||
serialize_string(key.begin(), key.size(), request);
|
||||
serialize_string(value.begin(), value.size(), request);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
#ifndef WIN32
|
||||
struct AsyncUDPTracer : public UDPTracer {
|
||||
public:
|
||||
AsyncUDPTracer() : pending_messages_(0), send_error_(false) {}
|
||||
|
||||
~AsyncUDPTracer() override {
|
||||
while (!buffers_.empty()) {
|
||||
auto& request = buffers_.front();
|
||||
buffers_.pop();
|
||||
free(request.buffer);
|
||||
}
|
||||
}
|
||||
|
||||
TracerType type() const override { return TracerType::NETWORK_ASYNC; }
|
||||
|
||||
// Serializes the given span to msgpack format and sends the data via UDP.
|
||||
void trace(Span const& span) override {
|
||||
static std::once_flag once;
|
||||
std::call_once(once, [&]() {
|
||||
send_actor_ = traceSend(stream_.getFuture(), &buffers_, &pending_messages_, &send_error_);
|
||||
log_actor_ = traceLog(&pending_messages_, &send_error_);
|
||||
if (g_network->isSimulated()) {
|
||||
udp_server_actor_ = simulationStartServer();
|
||||
}
|
||||
});
|
||||
|
||||
if (span.location.name.size() == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
// ASSERT(!send_actor_.isReady());
|
||||
// ASSERT(!log_actor_.isReady());
|
||||
|
||||
if (buffers_.empty()) {
|
||||
buffers_.push(TraceRequest{
|
||||
.buffer = new uint8_t[kTraceBufferSize],
|
||||
.data_size = 0,
|
||||
.buffer_size = kTraceBufferSize
|
||||
});
|
||||
}
|
||||
|
||||
auto request = buffers_.front();
|
||||
buffers_.pop();
|
||||
|
||||
serialize_span(span, request);
|
||||
|
||||
++pending_messages_;
|
||||
stream_.send(request);
|
||||
}
|
||||
|
||||
private:
|
||||
// Sending data is asynchronous and it is necessary to keep the buffer
|
||||
// around until the send completes. Therefore, multiple buffers may be
|
||||
// needed at any one time to handle multiple send calls.
|
||||
std::queue<TraceRequest> buffers_;
|
||||
int pending_messages_;
|
||||
bool send_error_;
|
||||
|
||||
PromiseStream<TraceRequest> stream_;
|
||||
Future<Void> send_actor_;
|
||||
Future<Void> log_actor_;
|
||||
Future<Void> udp_server_actor_;
|
||||
};
|
||||
|
||||
ACTOR Future<Void> fastTraceLogger(int* unreadyMessages, int* failedMessages, int* totalMessages, bool* sendError) {
|
||||
state bool sendErrorReset = false;
|
||||
|
||||
loop {
|
||||
TraceEvent("TracingSpanStats").detail("UnreadyMessages", *unreadyMessages)
|
||||
.detail("FailedMessages", *failedMessages)
|
||||
.detail("TotalMessages", *totalMessages)
|
||||
.detail("SendError", *sendError);
|
||||
|
||||
if (sendErrorReset) {
|
||||
sendErrorReset = false;
|
||||
*sendError = false;
|
||||
} else if (*sendError) {
|
||||
sendErrorReset = true;
|
||||
}
|
||||
|
||||
wait(delay(kQueueSizeLogInterval));
|
||||
}
|
||||
}
|
||||
|
||||
struct FastUDPTracer : public UDPTracer {
|
||||
FastUDPTracer() : socket_fd_(-1), unready_socket_messages_(0), failed_messages_(0), total_messages_(0), send_error_(false) {
|
||||
request_ = TraceRequest{
|
||||
.buffer = new uint8_t[kTraceBufferSize],
|
||||
.data_size = 0,
|
||||
.buffer_size = kTraceBufferSize
|
||||
};
|
||||
}
|
||||
|
||||
~FastUDPTracer() {
|
||||
free(request_.buffer);
|
||||
}
|
||||
|
||||
TracerType type() const override { return TracerType::NETWORK_LOSSY; }
|
||||
|
||||
void trace(Span const& span) override {
|
||||
static std::once_flag once;
|
||||
std::call_once(once, [&]() {
|
||||
log_actor_ = fastTraceLogger(&unready_socket_messages_, &failed_messages_, &total_messages_, &send_error_);
|
||||
if (g_network->isSimulated()) {
|
||||
udp_server_actor_ = simulationStartServer();
|
||||
}
|
||||
|
||||
NetworkAddress localAddress = NetworkAddress::parse("127.0.0.1:" + std::to_string(FLOW_KNOBS->TRACING_UDP_LISTENER_PORT));
|
||||
socket_ = INetworkConnections::net()->createUDPSocket(localAddress);
|
||||
});
|
||||
|
||||
if (span.location.name.size() == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
++total_messages_;
|
||||
if (!socket_.isReady()) {
|
||||
++unready_socket_messages_;
|
||||
return;
|
||||
} else if (socket_fd_ == -1) {
|
||||
socket_fd_ = socket_.get()->native_handle();
|
||||
}
|
||||
|
||||
if (send_error_) {
|
||||
return;
|
||||
}
|
||||
|
||||
serialize_span(span, request_);
|
||||
|
||||
int bytesSent = send(socket_fd_, request_.buffer, request_.data_size, MSG_DONTWAIT);
|
||||
if (bytesSent == -1) {
|
||||
// Will forgo checking errno here, and assume all error messages
|
||||
// should be treated the same.
|
||||
++failed_messages_;
|
||||
send_error_ = true;
|
||||
}
|
||||
request_.reset();
|
||||
}
|
||||
|
||||
private:
|
||||
TraceRequest request_;
|
||||
|
||||
int unready_socket_messages_;
|
||||
int failed_messages_;
|
||||
int total_messages_;
|
||||
|
||||
int socket_fd_;
|
||||
bool send_error_;
|
||||
|
||||
Future<Reference<IUDPSocket>> socket_;
|
||||
Future<Void> log_actor_;
|
||||
Future<Void> udp_server_actor_;
|
||||
};
|
||||
#endif
|
||||
|
||||
ITracer* g_tracer = new NoopTracer();
|
||||
|
||||
} // namespace
|
||||
|
||||
void openTracer(TracerType type) {
|
||||
if (g_tracer->type() == type) {
|
||||
return;
|
||||
}
|
||||
delete g_tracer;
|
||||
switch (type) {
|
||||
case TracerType::DISABLED:
|
||||
g_tracer = new NoopTracer{};
|
||||
break;
|
||||
case TracerType::LOG_FILE:
|
||||
g_tracer = new LogfileTracer{};
|
||||
break;
|
||||
case TracerType::NETWORK_ASYNC:
|
||||
#ifndef WIN32
|
||||
g_tracer = new AsyncUDPTracer{};
|
||||
#endif
|
||||
break;
|
||||
case TracerType::NETWORK_LOSSY:
|
||||
#ifndef WIN32
|
||||
g_tracer = new FastUDPTracer{};
|
||||
#endif
|
||||
break;
|
||||
case TracerType::END:
|
||||
ASSERT(false);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
ITracer::~ITracer() {}
|
||||
|
||||
Span& Span::operator=(Span&& o) {
|
||||
if (begin > 0.0) {
|
||||
end = g_network->now();
|
||||
g_tracer->trace(*this);
|
||||
}
|
||||
arena = std::move(o.arena);
|
||||
context = o.context;
|
||||
begin = o.begin;
|
||||
end = o.end;
|
||||
location = o.location;
|
||||
parents = std::move(o.parents);
|
||||
o.begin = 0;
|
||||
return *this;
|
||||
}
|
||||
|
||||
Span::~Span() {
|
||||
if (begin > 0.0) {
|
||||
end = g_network->now();
|
||||
g_tracer->trace(*this);
|
||||
}
|
||||
}
|
|
@ -1,94 +0,0 @@
|
|||
/*
|
||||
* Tracing.cpp
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2020 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "flow/Tracing.h"
|
||||
|
||||
#include <memory>
|
||||
|
||||
namespace {
|
||||
|
||||
struct NoopTracer : ITracer {
|
||||
TracerType type() const { return TracerType::DISABLED; }
|
||||
void trace(Span const& span) override {}
|
||||
};
|
||||
|
||||
struct LogfileTracer : ITracer {
|
||||
TracerType type() const { return TracerType::LOG_FILE; }
|
||||
void trace(Span const& span) override {
|
||||
if (g_network->isSimulated()) {
|
||||
return;
|
||||
}
|
||||
TraceEvent te(SevInfo, "TracingSpan", span.context);
|
||||
te.detail("Location", span.location.name)
|
||||
.detail("Begin", format("%.6f", span.begin))
|
||||
.detail("End", format("%.6f", span.end));
|
||||
if (span.parents.size() == 1) {
|
||||
te.detail("Parent", *span.parents.begin());
|
||||
} else {
|
||||
for (auto parent : span.parents) {
|
||||
TraceEvent(SevInfo, "TracingSpanAddParent", span.context).detail("AddParent", parent);
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
std::unique_ptr<ITracer> g_tracer = std::make_unique<NoopTracer>();
|
||||
|
||||
} // namespace
|
||||
|
||||
void openTracer(TracerType type) {
|
||||
if (g_tracer->type() == type) {
|
||||
return;
|
||||
}
|
||||
switch (type) {
|
||||
case TracerType::DISABLED:
|
||||
g_tracer = std::make_unique<NoopTracer>();
|
||||
break;
|
||||
case TracerType::LOG_FILE:
|
||||
g_tracer = std::make_unique<LogfileTracer>();
|
||||
break;
|
||||
case TracerType::END:
|
||||
ASSERT(false);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
ITracer::~ITracer() {}
|
||||
|
||||
Span& Span::operator=(Span&& o) {
|
||||
if (begin > 0.0) {
|
||||
end = g_network->now();
|
||||
g_tracer->trace(*this);
|
||||
}
|
||||
arena = std::move(o.arena);
|
||||
context = o.context;
|
||||
begin = o.begin;
|
||||
end = o.end;
|
||||
location = o.location;
|
||||
parents = std::move(o.parents);
|
||||
return *this;
|
||||
}
|
||||
|
||||
Span::~Span() {
|
||||
if (begin > 0.0) {
|
||||
end = g_network->now();
|
||||
g_tracer->trace(*this);
|
||||
}
|
||||
}
|
|
@ -35,7 +35,11 @@ inline Location operator"" _loc(const char* str, size_t size) {
|
|||
|
||||
struct Span {
|
||||
Span(SpanID context, Location location, std::initializer_list<SpanID> const& parents = {})
|
||||
: context(context), begin(g_network->now()), location(location), parents(arena, parents.begin(), parents.end()) {}
|
||||
: context(context), begin(g_network->now()), location(location), parents(arena, parents.begin(), parents.end()) {
|
||||
if (parents.size() > 0) {
|
||||
this->context = SpanID((*parents.begin()).first(), context.second());
|
||||
}
|
||||
}
|
||||
Span(Location location, std::initializer_list<SpanID> const& parents = {})
|
||||
: Span(deterministicRandom()->randomUniqueID(), location, parents) {}
|
||||
Span(Location location, SpanID context) : Span(location, { context }) {}
|
||||
|
@ -64,19 +68,36 @@ struct Span {
|
|||
std::swap(parents, other.parents);
|
||||
}
|
||||
|
||||
void addParent(SpanID span) { parents.push_back(arena, span); }
|
||||
void addParent(SpanID span) {
|
||||
if (parents.size() == 0) {
|
||||
// Use first parent to set trace ID. This is non-ideal for spans
|
||||
// with multiple parents, because the trace ID will associate the
|
||||
// span with only one trace. A workaround is to look at the parent
|
||||
// relationships instead of the trace ID. Another option in the
|
||||
// future is to keep a list of trace IDs.
|
||||
context = SpanID(span.first(), context.second());
|
||||
}
|
||||
parents.push_back(arena, span);
|
||||
}
|
||||
|
||||
void addTag(const StringRef& key, const StringRef& value) {
|
||||
tags[key] = value;
|
||||
}
|
||||
|
||||
Arena arena;
|
||||
UID context = UID();
|
||||
double begin = 0.0, end = 0.0;
|
||||
Location location;
|
||||
SmallVectorRef<SpanID> parents;
|
||||
std::unordered_map<StringRef, StringRef> tags;
|
||||
};
|
||||
|
||||
enum class TracerType {
|
||||
DISABLED = 0,
|
||||
LOG_FILE = 1,
|
||||
END = 2
|
||||
NETWORK_ASYNC = 2,
|
||||
NETWORK_LOSSY = 3,
|
||||
END = 4
|
||||
};
|
||||
|
||||
struct ITracer {
|
||||
|
|
|
@ -574,6 +574,7 @@ public:
|
|||
|
||||
virtual UID getDebugID() const = 0;
|
||||
virtual NetworkAddress localAddress() const = 0;
|
||||
virtual boost::asio::ip::udp::socket::native_handle_type native_handle() = 0;
|
||||
};
|
||||
|
||||
class INetworkConnections {
|
||||
|
|
Loading…
Reference in New Issue