Fix msgpack array size format

Was being written in little-endian, should be written in big-endian.
This commit is contained in:
Lukas Joswiak 2020-11-09 17:46:53 -08:00
parent e2f57b0845
commit abedd7a147
1 changed files with 25 additions and 9 deletions

View File

@ -73,7 +73,6 @@ struct TraceRequest {
void write_bytes(uint8_t* buf, std::size_t n) {
resize(n);
memcpy(buffer + data_size, buf, n);
data_size += n;
}
@ -123,15 +122,14 @@ ACTOR Future<Void> simulationStartServer() {
}
}
ACTOR Future<Void> traceSend(FutureStream<TraceRequest> inputStream, std::queue<TraceRequest>* buffers, int* pendingMessages) {
ACTOR Future<Void> traceSend(FutureStream<TraceRequest> inputStream, std::queue<TraceRequest>* buffers, int* pendingMessages, bool* sendError) {
state NetworkAddress localAddress = NetworkAddress::parse("127.0.0.1:" + std::to_string(kUdpPort));
state Reference<IUDPSocket> socket = wait(INetworkConnections::net()->createUDPSocket(localAddress));
state bool sendError = false;
loop choose {
when(state TraceRequest request = waitNext(inputStream)) {
try {
if (!sendError) {
if (!(*sendError)) {
int bytesSent = wait(socket->send(request.buffer, request.buffer + request.data_size));
}
--(*pendingMessages);
@ -139,15 +137,27 @@ ACTOR Future<Void> traceSend(FutureStream<TraceRequest> inputStream, std::queue<
buffers->push(request);
} catch (Error& e) {
TraceEvent("TracingSpanSendError").detail("Error", e.what());
sendError = true;
*sendError = true;
}
}
}
}
ACTOR Future<Void> traceLog(int* pendingMessages) {
ACTOR Future<Void> traceLog(int* pendingMessages, bool* sendError) {
state bool sendErrorReset = false;
loop {
TraceEvent("TracingSpanQueueSize").detail("PendingMessages", *pendingMessages);
// Wait at least one full loop before attempting to send messages
// again.
if (sendErrorReset) {
sendErrorReset = false;
*sendError = false;
} else if (*sendError) {
sendErrorReset = true;
}
wait(delay(kQueueSizeLogInterval));
}
}
@ -169,8 +179,8 @@ public:
void trace(Span const& span) override {
static std::once_flag once;
std::call_once(once, [&]() {
send_actor_ = traceSend(stream_.getFuture(), &buffers_, &pending_messages_);
log_actor_ = traceLog(&pending_messages_);
send_actor_ = traceSend(stream_.getFuture(), &buffers_, &pending_messages_, &send_error_);
log_actor_ = traceLog(&pending_messages_, &send_error_);
if (g_network->isSimulated()) {
udp_server_actor_ = simulationStartServer();
}
@ -232,6 +242,10 @@ private:
// specified by the msgpack specification.
inline void serialize_string(const std::string& str, TraceRequest& request) {
int size = str.size();
if (size == 0) {
return;
}
if (size <= 31) {
request.write_byte((uint8_t) size | 0b10100000);
} else if (size <= 255) {
@ -259,7 +273,8 @@ private:
request.write_byte((uint8_t) size | 0b10010000);
} else if (size <= 65535) {
request.write_byte(0xdc);
request.write_bytes((uint8_t*) &size, 2);
request.write_byte(((uint8_t*) &size)[1]);
request.write_byte(((uint8_t*) &size)[0]);
} else {
// TODO: Add support for longer vectors if necessary.
ASSERT(false);
@ -275,6 +290,7 @@ private:
// needed at any one time to handle multiple trace calls.
std::queue<TraceRequest> buffers_;
int pending_messages_;
bool send_error_;
int total_buffers_; // TODO: This should be removed after performance testing is done
int total_messages_; // TODO: This should be removed after performance testing is done