Fix subsequence ordering issue
This commit is contained in:
parent
7403e49c33
commit
22bc2eb71c
|
@ -877,7 +877,6 @@ struct LogPushData : NonCopyable {
|
|||
void addTransactionInfo(SpanID const& context) {
|
||||
TEST(!spanContext.isValid()); // addTransactionInfo with invalid SpanID
|
||||
spanContext = context;
|
||||
transactionSubseq = 0;
|
||||
writtenLocations.clear();
|
||||
}
|
||||
|
||||
|
@ -919,26 +918,28 @@ struct LogPushData : NonCopyable {
|
|||
|
||||
BinaryWriter bw(AssumeVersion(currentProtocolVersion));
|
||||
|
||||
// Metadata messages should be written before span information. If this
|
||||
// isn't a metadata message, make sure all locations have had
|
||||
// transaction info written to them. Mutations may have different sets
|
||||
// of tags, so it is necessary to check all tag locations each time a
|
||||
// mutation is written.
|
||||
// Metadata messages (currently LogProtocolMessage is the only metadata
|
||||
// message) should be written before span information. If this isn't a
|
||||
// metadata message, make sure all locations have had transaction info
|
||||
// written to them. Mutations may have different sets of tags, so it
|
||||
// is necessary to check all tag locations each time a mutation is
|
||||
// written.
|
||||
if (!metadataMessage) {
|
||||
// If span information hasn't been written for this transaction yet,
|
||||
// generate a subsequence value for the message.
|
||||
if (!transactionSubseq) {
|
||||
transactionSubseq = this->subsequence++;
|
||||
}
|
||||
|
||||
uint32_t subseq = this->subsequence++;
|
||||
bool updatedLocation = false;
|
||||
for (int loc : msg_locations) {
|
||||
writeTransactionInfo(loc);
|
||||
updatedLocation = writeTransactionInfo(loc, subseq) || updatedLocation;
|
||||
}
|
||||
// If this message doesn't write to any new locations, the
|
||||
// subsequence wasn't actually used and can be decremented.
|
||||
if (!updatedLocation) {
|
||||
this->subsequence--;
|
||||
}
|
||||
} else {
|
||||
// When writing a metadata message, make sure transaction state has
|
||||
// been reset. If you are running into this assertion, make sure
|
||||
// you are calling addTransactionInfo before each transaction.
|
||||
ASSERT(transactionSubseq == 0);
|
||||
ASSERT(writtenLocations.size() == 0);
|
||||
}
|
||||
|
||||
uint32_t subseq = this->subsequence++;
|
||||
|
@ -980,33 +981,30 @@ private:
|
|||
// field.
|
||||
std::unordered_set<int> writtenLocations;
|
||||
uint32_t subsequence;
|
||||
// Store transaction subsequence separately, as multiple mutations may need
|
||||
// to write transaction info. This can happen if later mutations in a
|
||||
// transaction need to write to a different location than earlier
|
||||
// mutations.
|
||||
uint32_t transactionSubseq;
|
||||
SpanID spanContext;
|
||||
|
||||
// Writes transaction info to the message stream for the given location if
|
||||
// it has not already been written (for the current transaction).
|
||||
void writeTransactionInfo(int location) {
|
||||
if (!FLOW_KNOBS->WRITE_TRACING_ENABLED || logSystem->getTLogVersion() < TLogVersion::V6) {
|
||||
return;
|
||||
// Writes transaction info to the message stream at the given location if
|
||||
// it has not already been written (for the current transaction). Returns
|
||||
// true on a successful write, and false if the location has already been
|
||||
// written.
|
||||
bool writeTransactionInfo(int location, uint32_t subseq) {
|
||||
if (!FLOW_KNOBS->WRITE_TRACING_ENABLED || logSystem->getTLogVersion() < TLogVersion::V6 || writtenLocations.count(location) != 0) {
|
||||
return false;
|
||||
}
|
||||
if (writtenLocations.count(location) == 0) {
|
||||
writtenLocations.insert(location);
|
||||
|
||||
BinaryWriter& wr = messagesWriter[location];
|
||||
SpanContextMessage contextMessage(spanContext);
|
||||
writtenLocations.insert(location);
|
||||
|
||||
int offset = wr.getLength();
|
||||
wr << uint32_t(0) << transactionSubseq << uint16_t(prev_tags.size());
|
||||
for(auto& tag : prev_tags)
|
||||
wr << tag;
|
||||
wr << contextMessage;
|
||||
int length = wr.getLength() - offset;
|
||||
*(uint32_t*)((uint8_t*)wr.getData() + offset) = length - sizeof(uint32_t);
|
||||
}
|
||||
BinaryWriter& wr = messagesWriter[location];
|
||||
SpanContextMessage contextMessage(spanContext);
|
||||
|
||||
int offset = wr.getLength();
|
||||
wr << uint32_t(0) << subseq << uint16_t(prev_tags.size());
|
||||
for(auto& tag : prev_tags)
|
||||
wr << tag;
|
||||
wr << contextMessage;
|
||||
int length = wr.getLength() - offset;
|
||||
*(uint32_t*)((uint8_t*)wr.getData() + offset) = length - sizeof(uint32_t);
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
|
|
Loading…
Reference in New Issue