Merge pull request #3748 from sfc-gh-ljoswiak/visibility-2

Add TLogVersion::V6
This commit is contained in:
Trevor Clinkenbeard 2020-10-14 17:35:32 -07:00 committed by GitHub
commit 24ea35e56f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 257 additions and 63 deletions

View File

@ -166,7 +166,8 @@ struct CommitTransactionRequest : TimedRequest {
Optional<ClientTrCommitCostEstimation> commitCostEstimation;
Optional<TagSet> tagSet;
CommitTransactionRequest() : flags(0) {}
CommitTransactionRequest() : CommitTransactionRequest(SpanID()) {}
CommitTransactionRequest(SpanID const& context) : spanContext(context), flags(0) {}
template <class Ar>
void serialize(Ar& ar) {

View File

@ -49,6 +49,7 @@ static const char* typeString[] = { "SetValue",
"MinV2",
"AndV2",
"CompareAndClear",
"Reserved_For_SpanContextMessage",
"MAX_ATOMIC_OP" };
struct MutationRef {
@ -75,6 +76,7 @@ struct MutationRef {
MinV2,
AndV2,
CompareAndClear,
Reserved_For_SpanContextMessage /* See fdbserver/SpanContextMessage.h */,
MAX_ATOMIC_OP
};
// This is stored this way for serialization purposes.

View File

@ -754,14 +754,16 @@ struct TLogVersion {
// V3 was the introduction of spill by reference;
// V4 changed how data gets written to satellite TLogs so that we can peek from them;
// V5 merged reference and value spilling
// V6 added span context to list of serialized mutations sent from proxy to tlogs
// V1 = 1, // 4.6 is dispatched to via 6.0
V2 = 2, // 6.0
V3 = 3, // 6.1
V4 = 4, // 6.2
V5 = 5, // 6.3
V6 = 6, // 7.0
MIN_SUPPORTED = V2,
MAX_SUPPORTED = V5,
MIN_RECRUITABLE = V4,
MAX_SUPPORTED = V6,
MIN_RECRUITABLE = V5,
DEFAULT = V5,
} version;
@ -784,6 +786,7 @@ struct TLogVersion {
if (s == LiteralStringRef("3")) return V3;
if (s == LiteralStringRef("4")) return V4;
if (s == LiteralStringRef("5")) return V5;
if (s == LiteralStringRef("6")) return V6;
return default_error_or();
}
};

View File

@ -2699,7 +2699,7 @@ void debugAddTags(Transaction *tr) {
Transaction::Transaction(Database const& cx)
: cx(cx), info(cx->taskID, deterministicRandom()->randomUniqueID()), backoff(CLIENT_KNOBS->DEFAULT_BACKOFF),
committedVersion(invalidVersion), versionstampPromise(Promise<Standalone<StringRef>>()), options(cx), numErrors(0),
trLogInfo(createTrLogInfoProbabilistically(cx)), span(info.spanID, "Transaction"_loc) {
trLogInfo(createTrLogInfoProbabilistically(cx)), tr(info.spanID), span(info.spanID, "Transaction"_loc) {
if (DatabaseContext::debugUseTags) {
debugAddTags(this);
}

View File

@ -45,16 +45,20 @@ Reference<StorageInfo> getStorageInfo(UID id, std::map<UID, Reference<StorageInf
// It is incredibly important that any modifications to txnStateStore are done in such a way that
// the same operations will be done on all commit proxies at the same time. Otherwise, the data
// stored in txnStateStore will become corrupted.
void applyMetadataMutations(UID const& dbgid, Arena& arena, VectorRef<MutationRef> const& mutations,
IKeyValueStore* txnStateStore, LogPushData* toCommit, bool& confChange,
Reference<ILogSystem> logSystem, Version popVersion,
KeyRangeMap<std::set<Key>>* vecBackupKeys, KeyRangeMap<ServerCacheInfo>* keyInfo,
KeyRangeMap<bool>* cacheInfo, std::map<Key, ApplyMutationsData>* uid_applyMutationsData,
RequestStream<CommitTransactionRequest> commit, Database cx, NotifiedVersion* commitVersion,
std::map<UID, Reference<StorageInfo>>* storageCache, std::map<Tag, Version>* tag_popped,
bool initialCommit) {
void applyMetadataMutations(SpanID const& spanContext, UID const& dbgid, Arena& arena,
VectorRef<MutationRef> const& mutations, IKeyValueStore* txnStateStore,
LogPushData* toCommit, bool& confChange, Reference<ILogSystem> logSystem,
Version popVersion, KeyRangeMap<std::set<Key>>* vecBackupKeys,
KeyRangeMap<ServerCacheInfo>* keyInfo, KeyRangeMap<bool>* cacheInfo, std::map<Key,
ApplyMutationsData>* uid_applyMutationsData, RequestStream<CommitTransactionRequest> commit,
Database cx, NotifiedVersion* commitVersion, std::map<UID, Reference<StorageInfo>>* storageCache,
std::map<Tag, Version>* tag_popped, bool initialCommit) {
//std::map<keyRef, vector<uint16_t>> cacheRangeInfo;
std::map<KeyRef, MutationRef> cachedRangeInfo;
if (toCommit) {
toCommit->addTransactionInfo(spanContext);
}
for (auto const& m : mutations) {
//TraceEvent("MetadataMutation", dbgid).detail("M", m.toString());
@ -102,7 +106,7 @@ void applyMetadataMutations(UID const& dbgid, Arena& arena, VectorRef<MutationRe
.detail("TagKey", serverTagKeyFor( serverKeysDecodeServer(m.param1) )).detail("Tag", decodeServerTagValue( txnStateStore->readValue( serverTagKeyFor( serverKeysDecodeServer(m.param1) ) ).get().get() ).toString());
toCommit->addTag( decodeServerTagValue( txnStateStore->readValue( serverTagKeyFor( serverKeysDecodeServer(m.param1) ) ).get().get() ) );
toCommit->addTypedMessage(privatized);
toCommit->writeTypedMessage(privatized);
}
} else if (m.param1.startsWith(serverTagPrefix)) {
UID id = decodeServerTagKey(m.param1);
@ -114,9 +118,9 @@ void applyMetadataMutations(UID const& dbgid, Arena& arena, VectorRef<MutationRe
TraceEvent("ServerTag", dbgid).detail("Server", id).detail("Tag", tag.toString());
toCommit->addTag(tag);
toCommit->addTypedMessage(LogProtocolMessage());
toCommit->writeTypedMessage(LogProtocolMessage(), true);
toCommit->addTag(tag);
toCommit->addTypedMessage(privatized);
toCommit->writeTypedMessage(privatized);
}
if(!initialCommit) {
txnStateStore->set(KeyValueRef(m.param1, m.param2));
@ -168,7 +172,7 @@ void applyMetadataMutations(UID const& dbgid, Arena& arena, VectorRef<MutationRe
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
//TraceEvent(SevDebug, "SendingPrivateMutation", dbgid).detail("Original", m.toString()).detail("Privatized", privatized.toString());
toCommit->addTag( cacheTag );
toCommit->addTypedMessage(privatized);
toCommit->writeTypedMessage(privatized);
}
}
else if (m.param1.startsWith(configKeysPrefix) || m.param1 == coordinatorsKey) {
@ -285,13 +289,13 @@ void applyMetadataMutations(UID const& dbgid, Arena& arena, VectorRef<MutationRe
if (m.param1 == lastEpochEndKey) {
toCommit->addTags(allTags);
toCommit->addTypedMessage(LogProtocolMessage());
toCommit->writeTypedMessage(LogProtocolMessage(), true);
}
MutationRef privatized = m;
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
toCommit->addTags(allTags);
toCommit->addTypedMessage(privatized);
toCommit->writeTypedMessage(privatized);
}
}
else if (m.param1 == minRequiredCommitVersionKey) {
@ -350,7 +354,7 @@ void applyMetadataMutations(UID const& dbgid, Arena& arena, VectorRef<MutationRe
privatized.param2 = keyAfter(kv.key, arena).withPrefix(systemKeys.begin, arena);
toCommit->addTag(decodeServerTagValue(kv.value));
toCommit->addTypedMessage(privatized);
toCommit->writeTypedMessage(privatized);
}
}
}
@ -544,35 +548,35 @@ void applyMetadataMutations(UID const& dbgid, Arena& arena, VectorRef<MutationRe
// Add the tags to both begin and end mutations
toCommit->addTags(allTags);
toCommit->addTypedMessage(mutationBegin);
toCommit->writeTypedMessage(mutationBegin);
toCommit->addTags(allTags);
toCommit->addTypedMessage(mutationEnd);
toCommit->writeTypedMessage(mutationEnd);
}
}
}
void applyMetadataMutations(ProxyCommitData& proxyCommitData, Arena& arena, Reference<ILogSystem> logSystem,
const VectorRef<MutationRef>& mutations, LogPushData* toCommit, bool& confChange,
Version popVersion, bool initialCommit) {
void applyMetadataMutations(SpanID const& spanContext, ProxyCommitData& proxyCommitData, Arena& arena,
Reference<ILogSystem> logSystem, const VectorRef<MutationRef>& mutations,
LogPushData* toCommit, bool& confChange, Version popVersion, bool initialCommit) {
std::map<Key, ApplyMutationsData>* uid_applyMutationsData = nullptr;
if (proxyCommitData.firstProxy) {
uid_applyMutationsData = &proxyCommitData.uid_applyMutationsData;
}
applyMetadataMutations(proxyCommitData.dbgid, arena, mutations, proxyCommitData.txnStateStore, toCommit, confChange,
logSystem, popVersion, &proxyCommitData.vecBackupKeys, &proxyCommitData.keyInfo,
applyMetadataMutations(spanContext, proxyCommitData.dbgid, arena, mutations, proxyCommitData.txnStateStore, toCommit,
confChange, logSystem, popVersion, &proxyCommitData.vecBackupKeys, &proxyCommitData.keyInfo,
&proxyCommitData.cacheInfo, uid_applyMutationsData, proxyCommitData.commit,
proxyCommitData.cx, &proxyCommitData.committedVersion, &proxyCommitData.storageCache,
&proxyCommitData.tag_popped, initialCommit);
}
void applyMetadataMutations(const UID& dbgid, Arena& arena, const VectorRef<MutationRef>& mutations,
IKeyValueStore* txnStateStore) {
void applyMetadataMutations(SpanID const& spanContext, const UID& dbgid, Arena& arena,
const VectorRef<MutationRef>& mutations, IKeyValueStore* txnStateStore) {
bool confChange; // Dummy variable, not used.
applyMetadataMutations(dbgid, arena, mutations, txnStateStore, /* toCommit= */ nullptr, confChange,
applyMetadataMutations(spanContext, dbgid, arena, mutations, txnStateStore, /* toCommit= */ nullptr, confChange,
Reference<ILogSystem>(), /* popVersion= */ 0, /* vecBackupKeys= */ nullptr,
/* keyInfo= */ nullptr, /* cacheInfo= */ nullptr, /* uid_applyMutationsData= */ nullptr,
RequestStream<CommitTransactionRequest>(), Database(), /* commitVersion= */ nullptr,

View File

@ -41,10 +41,10 @@ inline bool isMetadataMutation(MutationRef const& m) {
Reference<StorageInfo> getStorageInfo(UID id, std::map<UID, Reference<StorageInfo>>* storageCache, IKeyValueStore* txnStateStore);
void applyMetadataMutations(ProxyCommitData& proxyCommitData, Arena& arena, Reference<ILogSystem> logSystem,
const VectorRef<MutationRef>& mutations, LogPushData* pToCommit, bool& confChange,
Version popVersion, bool initialCommit);
void applyMetadataMutations(const UID& dbgid, Arena& arena, const VectorRef<MutationRef>& mutations,
IKeyValueStore* txnStateStore);
void applyMetadataMutations(SpanID const& spanContext, ProxyCommitData& proxyCommitData, Arena& arena,
Reference<ILogSystem> logSystem, const VectorRef<MutationRef>& mutations,
LogPushData* pToCommit, bool& confChange, Version popVersion, bool initialCommit);
void applyMetadataMutations(SpanID const& spanContext, const UID& dbgid, Arena& arena,
const VectorRef<MutationRef>& mutations, IKeyValueStore* txnStateStore);
#endif

View File

@ -61,8 +61,9 @@ struct VersionedMessage {
ArenaReader reader(arena, message, AssumeVersion(currentProtocolVersion));
// Return false for LogProtocolMessage.
// Return false for LogProtocolMessage and SpanContextMessage metadata messages.
if (LogProtocolMessage::isNextIn(reader)) return false;
if (reader.protocolVersion().hasSpanContext() && SpanContextMessage::isNextIn(reader)) return false;
reader >> *m;
return normalKeys.contains(m->param1) || m->param1 == metadataVersionKey;

View File

@ -87,6 +87,7 @@ set(FDBSERVER_SRCS
SimulatedCluster.actor.cpp
SimulatedCluster.h
SkipList.cpp
SpanContextMessage.h
Status.actor.cpp
Status.h
StorageCache.actor.cpp

View File

@ -295,6 +295,8 @@ ACTOR Future<Void> addBackupMutations(ProxyCommitData* self, std::map<Key, Mutat
state int yieldBytes = 0;
state BinaryWriter valueWriter(Unversioned());
toCommit->addTransactionInfo(SpanID());
// Serialize the log range mutations within the map
for (; logRangeMutation != logRangeMutations->end(); ++logRangeMutation)
{
@ -356,7 +358,7 @@ ACTOR Future<Void> addBackupMutations(ProxyCommitData* self, std::map<Key, Mutat
auto& tags = self->tagsForKey(backupMutation.param1);
toCommit->addTags(tags);
toCommit->addTypedMessage(backupMutation);
toCommit->writeTypedMessage(backupMutation);
// if (DEBUG_MUTATION("BackupProxyCommit", commitVersion, backupMutation)) {
// TraceEvent("BackupProxyCommitTo", self->dbgid).detail("To", describe(tags)).detail("BackupMutation", backupMutation.toString())
@ -395,7 +397,7 @@ struct CommitBatchContext {
int batchOperations = 0;
Span span = Span("MP:commitBatch"_loc);
Span span;
int64_t batchBytes = 0;
@ -475,7 +477,9 @@ CommitBatchContext::CommitBatchContext(ProxyCommitData* const pProxyCommitData_,
localBatchNumber(++pProxyCommitData->localCommitBatchesStarted), toCommit(pProxyCommitData->logSystem),
committed(trs.size()) {
committed(trs.size()),
span("MP:commitBatch"_loc) {
evaluateBatchSize();
@ -530,6 +534,7 @@ ACTOR Future<Void> preresolutionProcessing(CommitBatchContext* self) {
state const int64_t localBatchNumber = self->localBatchNumber;
state const int latencyBucket = self->latencyBucket;
state const Optional<UID>& debugID = self->debugID;
state Span span("MP:preresolutionProcessing"_loc, self->span.context);
// Pre-resolution the commits
TEST(pProxyCommitData->latestLocalCommitBatchResolving.get() < localBatchNumber - 1);
@ -545,7 +550,7 @@ ACTOR Future<Void> preresolutionProcessing(CommitBatchContext* self) {
"CommitProxyServer.commitBatch.GettingCommitVersion");
}
GetCommitVersionRequest req(self->span.context, pProxyCommitData->commitVersionRequestNumber++,
GetCommitVersionRequest req(span.context, pProxyCommitData->commitVersionRequestNumber++,
pProxyCommitData->mostRecentProcessedRequestNumber, pProxyCommitData->dbgid);
GetCommitVersionReply versionReply = wait(brokenPromiseToNever(
pProxyCommitData->master.getCommitVersion.getReply(
@ -581,13 +586,14 @@ ACTOR Future<Void> getResolution(CommitBatchContext* self) {
// resolution processing but is still using CPU
ProxyCommitData* pProxyCommitData = self->pProxyCommitData;
std::vector<CommitTransactionRequest>& trs = self->trs;
state Span span("MP:getResolution"_loc, self->span.context);
ResolutionRequestBuilder requests(
pProxyCommitData,
self->commitVersion,
self->prevVersion,
pProxyCommitData->version,
self->span
span
);
int conflictRangeCount = 0;
self->maxTransactionBytes = 0;
@ -659,7 +665,7 @@ void applyMetadataEffect(CommitBatchContext* self) {
for (int resolver = 0; resolver < self->resolution.size(); resolver++)
committed = committed && self->resolution[resolver].stateMutations[versionIndex][transactionIndex].committed;
if (committed) {
applyMetadataMutations(*self->pProxyCommitData, self->arena, self->pProxyCommitData->logSystem,
applyMetadataMutations(SpanID(), *self->pProxyCommitData, self->arena, self->pProxyCommitData->logSystem,
self->resolution[0].stateMutations[versionIndex][transactionIndex].mutations,
/* pToCommit= */ nullptr, self->forceRecovery,
/* popVersion= */ 0, /* initialCommit */ false);
@ -742,7 +748,7 @@ ACTOR Future<Void> applyMetadataToCommittedTransactions(CommitBatchContext* self
for (t = 0; t < trs.size() && !self->forceRecovery; t++) {
if (self->committed[t] == ConflictBatch::TransactionCommitted && (!self->locked || trs[t].isLockAware())) {
self->commitCount++;
applyMetadataMutations(*pProxyCommitData, self->arena, pProxyCommitData->logSystem,
applyMetadataMutations(trs[t].spanContext, *pProxyCommitData, self->arena, pProxyCommitData->logSystem,
trs[t].transaction.mutations, &self->toCommit, self->forceRecovery,
self->commitVersion + 1, /* initialCommit= */ false);
}
@ -791,6 +797,9 @@ ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) {
state Optional<ClientTrCommitCostEstimation>* trCost = &trs[self->transactionNum].commitCostEstimation;
state int mutationNum = 0;
state VectorRef<MutationRef>* pMutations = &trs[self->transactionNum].transaction.mutations;
self->toCommit.addTransactionInfo(trs[self->transactionNum].spanContext);
for (; mutationNum < pMutations->size(); mutationNum++) {
if(self->yieldBytes > SERVER_KNOBS->DESIRED_TOTAL_BYTES) {
self->yieldBytes = 0;
@ -845,7 +854,7 @@ ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) {
if(pProxyCommitData->cacheInfo[m.param1]) {
self->toCommit.addTag(cacheTag);
}
self->toCommit.addTypedMessage(m);
self->toCommit.writeTypedMessage(m);
}
else if (m.type == MutationRef::ClearRange) {
KeyRangeRef clearRange(KeyRangeRef(m.param1, m.param2));
@ -896,7 +905,7 @@ ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) {
if(pProxyCommitData->needsCacheTag(clearRange)) {
self->toCommit.addTag(cacheTag);
}
self->toCommit.addTypedMessage(m);
self->toCommit.writeTypedMessage(m);
} else {
UNREACHABLE();
}
@ -950,6 +959,7 @@ ACTOR Future<Void> postResolution(CommitBatchContext* self) {
state std::vector<CommitTransactionRequest>& trs = self->trs;
state const int64_t localBatchNumber = self->localBatchNumber;
state const Optional<UID>& debugID = self->debugID;
state Span span("MP:postResolution"_loc, self->span.context);
TEST(pProxyCommitData->latestLocalCommitBatchLogging.get() < localBatchNumber - 1); // Queuing post-resolution commit processing
wait(pProxyCommitData->latestLocalCommitBatchLogging.whenAtLeast(localBatchNumber - 1));
@ -1000,7 +1010,7 @@ ACTOR Future<Void> postResolution(CommitBatchContext* self) {
// This should be *extremely* rare in the real world, but knob buggification should make it happen in simulation
TEST(true); // Semi-committed pipeline limited by MVCC window
//TraceEvent("ProxyWaitingForCommitted", pProxyCommitData->dbgid).detail("CommittedVersion", pProxyCommitData->committedVersion.get()).detail("NeedToCommit", commitVersion);
waitVersionSpan = Span(deterministicRandom()->randomUniqueID(), "MP:overMaxReadTransactionLifeVersions"_loc, {self->span.context});
waitVersionSpan = Span(deterministicRandom()->randomUniqueID(), "MP:overMaxReadTransactionLifeVersions"_loc, {span.context});
choose{
when(wait(pProxyCommitData->committedVersion.whenAtLeast(self->commitVersion - SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS))) {
wait(yield());
@ -1036,7 +1046,7 @@ ACTOR Future<Void> postResolution(CommitBatchContext* self) {
if(firstMessage) {
self->toCommit.addTxsTag();
}
self->toCommit.addMessage(StringRef(m.begin(), m.size()), !firstMessage);
self->toCommit.writeMessage(StringRef(m.begin(), m.size()), !firstMessage);
firstMessage = false;
}
@ -1051,7 +1061,7 @@ ACTOR Future<Void> postResolution(CommitBatchContext* self) {
self->commitStartTime = now();
pProxyCommitData->lastStartCommit = self->commitStartTime;
self->loggingComplete = pProxyCommitData->logSystem->push( self->prevVersion, self->commitVersion, pProxyCommitData->committedVersion.get(), pProxyCommitData->minKnownCommittedVersion, self->toCommit, self->debugID );
self->loggingComplete = pProxyCommitData->logSystem->push( self->prevVersion, self->commitVersion, pProxyCommitData->committedVersion.get(), pProxyCommitData->minKnownCommittedVersion, self->toCommit, span.context, self->debugID );
if (!self->forceRecovery) {
ASSERT(pProxyCommitData->latestLocalCommitBatchLogging.get() == self->localBatchNumber-1);
@ -1073,6 +1083,7 @@ ACTOR Future<Void> postResolution(CommitBatchContext* self) {
ACTOR Future<Void> transactionLogging(CommitBatchContext* self) {
state ProxyCommitData* const pProxyCommitData = self->pProxyCommitData;
state Span span("MP:transactionLogging"_loc, self->span.context);
try {
choose {
@ -1108,6 +1119,7 @@ ACTOR Future<Void> transactionLogging(CommitBatchContext* self) {
ACTOR Future<Void> reply(CommitBatchContext* self) {
state ProxyCommitData* const pProxyCommitData = self->pProxyCommitData;
state Span span("MP:reply"_loc, self->span.context);
const Optional<UID>& debugID = self->debugID;
@ -1788,7 +1800,7 @@ ACTOR Future<Void> commitProxyServerCore(CommitProxyInterface proxy, MasterInter
Arena arena;
bool confChanges;
applyMetadataMutations(commitData, arena, Reference<ILogSystem>(), mutations,
applyMetadataMutations(SpanID(), commitData, arena, Reference<ILogSystem>(), mutations,
/* pToCommit= */ nullptr, confChanges,
/* popVersion= */ 0, /* initialCommit= */ true);
}

View File

@ -24,11 +24,13 @@
#include <set>
#include <vector>
#include "fdbserver/SpanContextMessage.h"
#include "fdbserver/TLogInterface.h"
#include "fdbserver/WorkerInterface.actor.h"
#include "fdbclient/DatabaseConfiguration.h"
#include "fdbserver/MutationTracking.h"
#include "flow/IndexedSet.h"
#include "flow/Knobs.h"
#include "fdbrpc/ReplicationPolicy.h"
#include "fdbrpc/Locality.h"
#include "fdbrpc/Replication.h"
@ -678,7 +680,7 @@ struct ILogSystem {
// Never returns normally, but throws an error if the subsystem stops working
//Future<Void> push( UID bundle, int64_t seq, VectorRef<TaggedMessageRef> messages );
virtual Future<Version> push( Version prevVersion, Version version, Version knownCommittedVersion, Version minKnownCommittedVersion, struct LogPushData& data, Optional<UID> debugID = Optional<UID>() ) = 0;
virtual Future<Version> push( Version prevVersion, Version version, Version knownCommittedVersion, Version minKnownCommittedVersion, struct LogPushData& data, SpanID const& spanContext, Optional<UID> debugID = Optional<UID>() ) = 0;
// Waits for the version number of the bundle (in this epoch) to be prevVersion (i.e. for all pushes ordered earlier)
// Puts the given messages into the bundle, each with the given tags, and with message versions (version, 0) - (version, N)
// Changes the version number of the bundle to be version (unblocking the next push)
@ -828,6 +830,18 @@ struct CompareFirst {
}
};
// Structure to store serialized mutations sent from the proxy to the
// transaction logs. The serialization repeats with the following format:
//
// +----------------------+ +----------------------+ +----------+ +----------------+ +----------------------+
// | Message size | | Subsequence | | # of tags| | Tag | . . . . | Mutation |
// +----------------------+ +----------------------+ +----------+ +----------------+ +----------------------+
// <------- 32 bits ------> <------- 32 bits ------> <- 16 bits-> <---- 24 bits ---> <---- variable bits --->
//
// `Mutation` can be a serialized MutationRef or a special metadata message
// such as LogProtocolMessage or SpanContextMessage. The type of `Mutation` is
// uniquely identified by its first byte -- a value from MutationRef::Type.
//
struct LogPushData : NonCopyable {
// Log subsequences have to start at 1 (the MergedPeekCursor relies on this to make sure we never have !hasMessage() in the middle of data for a version
@ -859,7 +873,15 @@ struct LogPushData : NonCopyable {
next_message_tags.insert(next_message_tags.end(), tags.begin(), tags.end());
}
void addMessage( StringRef rawMessageWithoutLength, bool usePreviousLocations ) {
// Add transaction info to be written before the first mutation in the transaction.
void addTransactionInfo(SpanID const& context) {
TEST(!spanContext.isValid()); // addTransactionInfo with invalid SpanID
spanContext = context;
transactionSubseq = 0;
writtenLocations.clear();
}
void writeMessage( StringRef rawMessageWithoutLength, bool usePreviousLocations ) {
if( !usePreviousLocations ) {
prev_tags.clear();
if(logSystem->hasRemoteLogs()) {
@ -875,15 +897,16 @@ struct LogPushData : NonCopyable {
uint32_t subseq = this->subsequence++;
uint32_t msgsize = rawMessageWithoutLength.size() + sizeof(subseq) + sizeof(uint16_t) + sizeof(Tag)*prev_tags.size();
for(int loc : msg_locations) {
messagesWriter[loc] << msgsize << subseq << uint16_t(prev_tags.size());
BinaryWriter& wr = messagesWriter[loc];
wr << msgsize << subseq << uint16_t(prev_tags.size());
for(auto& tag : prev_tags)
messagesWriter[loc] << tag;
messagesWriter[loc].serializeBytes(rawMessageWithoutLength);
wr << tag;
wr.serializeBytes(rawMessageWithoutLength);
}
}
template <class T>
void addTypedMessage(T const& item, bool allLocations = false) {
void writeTypedMessage(T const& item, bool metadataMessage = false, bool allLocations = false) {
prev_tags.clear();
if(logSystem->hasRemoteLogs()) {
prev_tags.push_back( logSystem->getRandomRouterTag() );
@ -895,12 +918,31 @@ struct LogPushData : NonCopyable {
logSystem->getPushLocations(prev_tags, msg_locations, allLocations);
BinaryWriter bw(AssumeVersion(currentProtocolVersion));
// Metadata messages should be written before span information. If this
// isn't a metadata message, make sure all locations have had
// transaction info written to them. Mutations may have different sets
// of tags, so it is necessary to check all tag locations each time a
// mutation is written.
if (!metadataMessage) {
// If span information hasn't been written for this transaction yet,
// generate a subsequence value for the message.
if (!transactionSubseq) {
transactionSubseq = this->subsequence++;
}
for (int loc : msg_locations) {
writeTransactionInfo(loc);
}
}
uint32_t subseq = this->subsequence++;
bool first = true;
int firstOffset=-1, firstLength=-1;
for(int loc : msg_locations) {
if (first) {
BinaryWriter& wr = messagesWriter[loc];
if (first) {
firstOffset = wr.getLength();
wr << uint32_t(0) << subseq << uint16_t(prev_tags.size());
for(auto& tag : prev_tags)
@ -911,7 +953,6 @@ struct LogPushData : NonCopyable {
DEBUG_TAGS_AND_MESSAGE("ProxyPushLocations", invalidVersion, StringRef(((uint8_t*)wr.getData() + firstOffset), firstLength)).detail("PushLocations", msg_locations);
first = false;
} else {
BinaryWriter& wr = messagesWriter[loc];
BinaryWriter& from = messagesWriter[msg_locations[0]];
wr.serializeBytes( (uint8_t*)from.getData() + firstOffset, firstLength );
}
@ -929,7 +970,39 @@ private:
std::vector<Tag> prev_tags;
std::vector<BinaryWriter> messagesWriter;
std::vector<int> msg_locations;
// Stores message locations that have had span information written to them
// for the current transaction. Adding transaction info will reset this
// field.
std::unordered_set<int> writtenLocations;
uint32_t subsequence;
// Store transaction subsequence separately, as multiple mutations may need
// to write transaction info. This can happen if later mutations in a
// transaction need to write to a different location than earlier
// mutations.
uint32_t transactionSubseq;
SpanID spanContext;
// Writes transaction info to the message stream for the given location if
// it has not already been written (for the current transaction).
void writeTransactionInfo(int location) {
if (!FLOW_KNOBS->WRITE_TRACING_ENABLED) {
return;
}
if (writtenLocations.count(location) == 0) {
writtenLocations.insert(location);
BinaryWriter& wr = messagesWriter[location];
SpanContextMessage contextMessage(spanContext);
int offset = wr.getLength();
wr << uint32_t(0) << transactionSubseq << uint16_t(prev_tags.size());
for(auto& tag : prev_tags)
wr << tag;
wr << contextMessage;
int length = wr.getLength() - offset;
*(uint32_t*)((uint8_t*)wr.getData() + offset) = length - sizeof(uint32_t);
}
}
};
#endif

View File

@ -21,6 +21,7 @@
#include <vector>
#include "fdbserver/MutationTracking.h"
#include "fdbserver/LogProtocolMessage.h"
#include "fdbserver/SpanContextMessage.h"
#if defined(FDB_CLEAN_BUILD) && MUTATION_TRACKING_ENABLED
#error "You cannot use mutation tracking in a clean/release build."
@ -71,6 +72,10 @@ TraceEvent debugTagsAndMessageEnabled( const char* context, Version version, Str
LogProtocolMessage lpm;
br >> lpm;
rdr.setProtocolVersion(br.protocolVersion());
} else if (SpanContextMessage::startsSpanContextMessage(mutationType)) {
BinaryReader br(mutationData, AssumeVersion(rdr.protocolVersion()));
SpanContextMessage scm;
br >> scm;
} else {
MutationRef m;
BinaryReader br(mutationData, AssumeVersion(rdr.protocolVersion()));

View File

@ -105,6 +105,7 @@ ACTOR Future<Void> resolveBatch(
ResolveTransactionBatchRequest req)
{
state Optional<UID> debugID;
state Span span("R:resolveBatch"_loc, req.spanContext);
// The first request (prevVersion < 0) comes from the master
state NetworkAddress proxyAddress = req.prevVersion >= 0 ? req.reply.getEndpoint().getPrimaryAddress() : NetworkAddress();

View File

@ -0,0 +1,60 @@
/*
* SpanContextMessage.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2020 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FDBSERVER_SPANCONTEXTMESSAGE_H
#define FDBSERVER_SPANCONTEXTMESSAGE_H
#pragma once
#include "fdbclient/FDBTypes.h"
#include "fdbclient/CommitTransaction.h"
struct SpanContextMessage {
// This message is pushed into the the transaction logs' memory to inform
// it what transaction subsequent mutations were a part of. This allows
// transaction logs and storage servers to associate mutations with a
// transaction identifier, called a span context.
//
// This message is similar to LogProtocolMessage. Storage servers read the
// first byte of this message to uniquely identify it, meaning it will
// never be mistaken for another message. See LogProtocolMessage.h for more
// information.
SpanID spanContext;
SpanContextMessage() {}
SpanContextMessage(SpanID const& spanContext) : spanContext(spanContext) {}
std::string toString() const {
return format("code: %d, span context: %s", MutationRef::Reserved_For_SpanContextMessage, spanContext.toString().c_str());
}
template <class Ar>
void serialize(Ar& ar) {
uint8_t poly = MutationRef::Reserved_For_SpanContextMessage;
serializer(ar, poly, spanContext);
}
static bool startsSpanContextMessage(uint8_t byte) {
return byte == MutationRef::Reserved_For_SpanContextMessage;
}
template <class Ar> static bool isNextIn(Ar& ar) { return startsSpanContextMessage(*(const uint8_t*)ar.peekBytes(1)); }
};
#endif

View File

@ -1763,6 +1763,10 @@ ACTOR Future<Void> pullAsyncData( StorageCacheData *data ) {
dbgLastMessageWasProtocol = true;
cloneCursor1->setProtocolVersion(cloneReader.protocolVersion());
}
else if (cloneReader.protocolVersion().hasSpanContext() && SpanContextMessage::isNextIn(cloneReader)) {
SpanContextMessage scm;
cloneReader >> scm;
}
else {
MutationRef msg;
cloneReader >> msg;
@ -1835,6 +1839,10 @@ ACTOR Future<Void> pullAsyncData( StorageCacheData *data ) {
data->logProtocol = reader.protocolVersion();
cloneCursor2->setProtocolVersion(data->logProtocol);
}
else if (reader.protocolVersion().hasSpanContext() && SpanContextMessage::isNextIn(reader)) {
SpanContextMessage scm;
reader >> scm;
}
else {
MutationRef msg;
reader >> msg;

View File

@ -240,6 +240,7 @@ struct TLogCommitReply {
struct TLogCommitRequest {
constexpr static FileIdentifier file_identifier = 4022206;
SpanID spanContext;
Arena arena;
Version prevVersion, version, knownCommittedVersion, minKnownCommittedVersion;
@ -249,11 +250,11 @@ struct TLogCommitRequest {
Optional<UID> debugID;
TLogCommitRequest() {}
TLogCommitRequest( const Arena& a, Version prevVersion, Version version, Version knownCommittedVersion, Version minKnownCommittedVersion, StringRef messages, Optional<UID> debugID )
: arena(a), prevVersion(prevVersion), version(version), knownCommittedVersion(knownCommittedVersion), minKnownCommittedVersion(minKnownCommittedVersion), messages(messages), debugID(debugID) {}
TLogCommitRequest( const SpanID& context, const Arena& a, Version prevVersion, Version version, Version knownCommittedVersion, Version minKnownCommittedVersion, StringRef messages, Optional<UID> debugID )
: spanContext(context), arena(a), prevVersion(prevVersion), version(version), knownCommittedVersion(knownCommittedVersion), minKnownCommittedVersion(minKnownCommittedVersion), messages(messages), debugID(debugID) {}
template <class Ar>
void serialize( Ar& ar ) {
serializer(ar, prevVersion, version, knownCommittedVersion, minKnownCommittedVersion, messages, reply, arena, debugID);
serializer(ar, prevVersion, version, knownCommittedVersion, minKnownCommittedVersion, messages, reply, arena, debugID, spanContext);
}
};

View File

@ -28,6 +28,7 @@
#include "fdbclient/FDBTypes.h"
#include "fdbserver/WorkerInterface.actor.h"
#include "fdbserver/LogProtocolMessage.h"
#include "fdbserver/SpanContextMessage.h"
#include "fdbserver/TLogInterface.h"
#include "fdbserver/Knobs.h"
#include "fdbserver/IKeyValueStore.h"
@ -1855,6 +1856,7 @@ ACTOR Future<Void> tLogCommit(
TLogCommitRequest req,
Reference<LogData> logData,
PromiseStream<Void> warningCollectorInput ) {
state Span span("TLog:tLogCommit"_loc, req.spanContext);
state Optional<UID> tlogDebugID;
if(req.debugID.present())
{

View File

@ -527,11 +527,13 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
Future<Version> push(Version prevVersion, Version version, Version knownCommittedVersion,
Version minKnownCommittedVersion, LogPushData& data, Optional<UID> debugID) final {
Version minKnownCommittedVersion, LogPushData& data,
SpanID const& spanContext, Optional<UID> debugID) final {
// FIXME: Randomize request order as in LegacyLogSystem?
vector<Future<Void>> quorumResults;
vector<Future<TLogCommitReply>> allReplies;
int location = 0;
Span span("TPLS:push"_loc, spanContext);
for(auto& it : tLogs) {
if(it->isLocal && it->logServers.size()) {
if(it->connectionResetTrackers.size() == 0) {
@ -542,7 +544,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
vector<Future<Void>> tLogCommitResults;
for(int loc=0; loc< it->logServers.size(); loc++) {
Standalone<StringRef> msg = data.getMessages(location);
allReplies.push_back( recordPushMetrics( it->connectionResetTrackers[loc], it->logServers[loc]->get().interf().address(), it->logServers[loc]->get().interf().commit.getReply( TLogCommitRequest( msg.arena(), prevVersion, version, knownCommittedVersion, minKnownCommittedVersion, msg, debugID ), TaskPriority::ProxyTLogCommitReply ) ) );
allReplies.push_back( recordPushMetrics( it->connectionResetTrackers[loc], it->logServers[loc]->get().interf().address(), it->logServers[loc]->get().interf().commit.getReply( TLogCommitRequest( spanContext, msg.arena(), prevVersion, version, knownCommittedVersion, minKnownCommittedVersion, msg, debugID ), TaskPriority::ProxyTLogCommitReply ) ) );
Future<Void> commitSuccess = success(allReplies.back());
addActor.get().send(commitSuccess);
tLogCommitResults.push_back(commitSuccess);

View File

@ -1627,7 +1627,7 @@ ACTOR Future<Void> masterCore( Reference<MasterData> self ) {
}
}
applyMetadataMutations(self->dbgid, recoveryCommitRequest.arena, tr.mutations.slice(mmApplied, tr.mutations.size()),
applyMetadataMutations(SpanID(), self->dbgid, recoveryCommitRequest.arena, tr.mutations.slice(mmApplied, tr.mutations.size()),
self->txnStateStore);
mmApplied = tr.mutations.size();

View File

@ -43,6 +43,7 @@
#include "fdbserver/Knobs.h"
#include "fdbserver/LatencyBandConfig.h"
#include "fdbserver/LogProtocolMessage.h"
#include "fdbserver/SpanContextMessage.h"
#include "fdbserver/LogSystem.h"
#include "fdbserver/MoveKeys.actor.h"
#include "fdbserver/MutationTracking.h"
@ -2785,6 +2786,7 @@ private:
ACTOR Future<Void> update( StorageServer* data, bool* pReceivedUpdate )
{
state double start;
state Span span("SS:update"_loc);
try {
// If we are disk bound and durableVersion is very old, we need to block updates or we could run out of memory
// This is often referred to as the storage server e-brake (emergency brake)
@ -2856,6 +2858,10 @@ ACTOR Future<Void> update( StorageServer* data, bool* pReceivedUpdate )
dbgLastMessageWasProtocol = true;
cloneCursor1->setProtocolVersion(cloneReader.protocolVersion());
}
else if (cloneReader.protocolVersion().hasSpanContext() && SpanContextMessage::isNextIn(cloneReader)) {
SpanContextMessage scm;
cloneReader >> scm;
}
else {
MutationRef msg;
cloneReader >> msg;
@ -2949,6 +2955,11 @@ ACTOR Future<Void> update( StorageServer* data, bool* pReceivedUpdate )
data->storage.changeLogProtocol(ver, data->logProtocol);
cloneCursor2->setProtocolVersion(rd.protocolVersion());
}
else if (rd.protocolVersion().hasSpanContext() && SpanContextMessage::isNextIn(rd)) {
SpanContextMessage scm;
rd >> scm;
span.addParent(scm.spanContext);
}
else {
MutationRef msg;
rd >> msg;

View File

@ -353,6 +353,7 @@ struct TLogOptions {
"_LS_" + boost::lexical_cast<std::string>(spillType);
break;
case TLogVersion::V5:
case TLogVersion::V6:
toReturn = "V_" + boost::lexical_cast<std::string>(version);
break;
}
@ -374,6 +375,7 @@ TLogFn tLogFnForOptions( TLogOptions options ) {
else
return oldTLog_6_2::tLog;
case TLogVersion::V5:
case TLogVersion::V6:
return tLog;
default:
ASSERT(false);

View File

@ -31,7 +31,7 @@ static const char* storeTypes[] = { "ssd", "ssd-1", "ssd-2", "memory", "memory-1
static const char* logTypes[] = {
"log_engine:=1", "log_engine:=2",
"log_spill:=1", "log_spill:=2",
"log_version:=2", "log_version:=3", "log_version:=4"
"log_version:=2", "log_version:=3", "log_version:=4", "log_version:=5", "log_version:=6"
};
static const char* redundancies[] = { "single", "double", "triple" };
static const char* backupTypes[] = { "backup_worker_enabled:=0", "backup_worker_enabled:=1" };

View File

@ -61,6 +61,8 @@ void FlowKnobs::initialize(bool randomize, bool isSimulated) {
init( HUGE_ARENA_LOGGING_BYTES, 100e6 );
init( HUGE_ARENA_LOGGING_INTERVAL, 5.0 );
init( WRITE_TRACING_ENABLED, true ); if( randomize && BUGGIFY ) WRITE_TRACING_ENABLED = false;
//connectionMonitor
init( CONNECTION_MONITOR_LOOP_TIME, isSimulated ? 0.75 : 1.0 ); if( randomize && BUGGIFY ) CONNECTION_MONITOR_LOOP_TIME = 6.0;
init( CONNECTION_MONITOR_TIMEOUT, isSimulated ? 1.50 : 2.0 ); if( randomize && BUGGIFY ) CONNECTION_MONITOR_TIMEOUT = 6.0;

View File

@ -69,6 +69,8 @@ public:
double HUGE_ARENA_LOGGING_BYTES;
double HUGE_ARENA_LOGGING_INTERVAL;
bool WRITE_TRACING_ENABLED;
//run loop profiling
double RUN_LOOP_PROFILING_INTERVAL;
double SLOWTASK_PROFILING_LOG_INTERVAL;

View File

@ -129,6 +129,7 @@ public: // introduced features
PROTOCOL_VERSION_FEATURE(0x0FDB00B063010000LL, SmallEndpoints);
PROTOCOL_VERSION_FEATURE(0x0FDB00B063010000LL, CacheRole);
PROTOCOL_VERSION_FEATURE(0x0FDB00B070010001LL, TagThrottleValueReason);
PROTOCOL_VERSION_FEATURE(0x0FDB00B070010001LL, SpanContext);
};
// These impact both communications and the deserialization of certain database and IKeyValueStore keys.