From 22bc2eb71c700864980a7b3ea00623d6dd410793 Mon Sep 17 00:00:00 2001 From: Lukas Joswiak Date: Thu, 22 Oct 2020 17:29:36 -0700 Subject: [PATCH] Fix subsequence ordering issue --- fdbserver/LogSystem.h | 70 +++++++++++++++++++++---------------------- 1 file changed, 34 insertions(+), 36 deletions(-) diff --git a/fdbserver/LogSystem.h b/fdbserver/LogSystem.h index fd19e05d90..8ad4c4beb0 100644 --- a/fdbserver/LogSystem.h +++ b/fdbserver/LogSystem.h @@ -877,7 +877,6 @@ struct LogPushData : NonCopyable { void addTransactionInfo(SpanID const& context) { TEST(!spanContext.isValid()); // addTransactionInfo with invalid SpanID spanContext = context; - transactionSubseq = 0; writtenLocations.clear(); } @@ -919,26 +918,28 @@ struct LogPushData : NonCopyable { BinaryWriter bw(AssumeVersion(currentProtocolVersion)); - // Metadata messages should be written before span information. If this - // isn't a metadata message, make sure all locations have had - // transaction info written to them. Mutations may have different sets - // of tags, so it is necessary to check all tag locations each time a - // mutation is written. + // Metadata messages (currently LogProtocolMessage is the only metadata + // message) should be written before span information. If this isn't a + // metadata message, make sure all locations have had transaction info + // written to them. Mutations may have different sets of tags, so it + // is necessary to check all tag locations each time a mutation is + // written. if (!metadataMessage) { - // If span information hasn't been written for this transaction yet, - // generate a subsequence value for the message. - if (!transactionSubseq) { - transactionSubseq = this->subsequence++; - } - + uint32_t subseq = this->subsequence++; + bool updatedLocation = false; for (int loc : msg_locations) { - writeTransactionInfo(loc); + updatedLocation = writeTransactionInfo(loc, subseq) || updatedLocation; + } + // If this message doesn't write to any new locations, the + // subsequence wasn't actually used and can be decremented. + if (!updatedLocation) { + this->subsequence--; } } else { // When writing a metadata message, make sure transaction state has // been reset. If you are running into this assertion, make sure // you are calling addTransactionInfo before each transaction. - ASSERT(transactionSubseq == 0); + ASSERT(writtenLocations.size() == 0); } uint32_t subseq = this->subsequence++; @@ -980,33 +981,30 @@ private: // field. std::unordered_set writtenLocations; uint32_t subsequence; - // Store transaction subsequence separately, as multiple mutations may need - // to write transaction info. This can happen if later mutations in a - // transaction need to write to a different location than earlier - // mutations. - uint32_t transactionSubseq; SpanID spanContext; - // Writes transaction info to the message stream for the given location if - // it has not already been written (for the current transaction). - void writeTransactionInfo(int location) { - if (!FLOW_KNOBS->WRITE_TRACING_ENABLED || logSystem->getTLogVersion() < TLogVersion::V6) { - return; + // Writes transaction info to the message stream at the given location if + // it has not already been written (for the current transaction). Returns + // true on a successful write, and false if the location has already been + // written. + bool writeTransactionInfo(int location, uint32_t subseq) { + if (!FLOW_KNOBS->WRITE_TRACING_ENABLED || logSystem->getTLogVersion() < TLogVersion::V6 || writtenLocations.count(location) != 0) { + return false; } - if (writtenLocations.count(location) == 0) { - writtenLocations.insert(location); - BinaryWriter& wr = messagesWriter[location]; - SpanContextMessage contextMessage(spanContext); + writtenLocations.insert(location); - int offset = wr.getLength(); - wr << uint32_t(0) << transactionSubseq << uint16_t(prev_tags.size()); - for(auto& tag : prev_tags) - wr << tag; - wr << contextMessage; - int length = wr.getLength() - offset; - *(uint32_t*)((uint8_t*)wr.getData() + offset) = length - sizeof(uint32_t); - } + BinaryWriter& wr = messagesWriter[location]; + SpanContextMessage contextMessage(spanContext); + + int offset = wr.getLength(); + wr << uint32_t(0) << subseq << uint16_t(prev_tags.size()); + for(auto& tag : prev_tags) + wr << tag; + wr << contextMessage; + int length = wr.getLength() - offset; + *(uint32_t*)((uint8_t*)wr.getData() + offset) = length - sizeof(uint32_t); + return true; } };