From 1ca7fe1a05127a0248bd2a77afc2294a6de983b7 Mon Sep 17 00:00:00 2001 From: Lukas Joswiak Date: Thu, 27 Aug 2020 15:11:16 -0700 Subject: [PATCH 01/13] Add span metadata message --- fdbclient/CommitTransaction.h | 2 + fdbclient/FDBTypes.h | 2 + fdbclient/MasterProxyInterface.h | 3 +- fdbclient/NativeAPI.actor.cpp | 2 +- fdbserver/ApplyMetadataMutation.cpp | 57 +++++++------ fdbserver/BackupWorker.actor.cpp | 3 +- fdbserver/CMakeLists.txt | 2 + fdbserver/LogSystem.h | 83 +++++++++++++++++-- fdbserver/MutationTracking.cpp | 5 ++ fdbserver/SpanContextMessage.h | 60 ++++++++++++++ fdbserver/StorageCache.actor.cpp | 8 ++ fdbserver/TLogServer.actor.cpp | 2 + fdbserver/storageserver.actor.cpp | 11 +++ .../workloads/ConfigureDatabase.actor.cpp | 2 +- 14 files changed, 205 insertions(+), 37 deletions(-) create mode 100644 fdbserver/SpanContextMessage.h diff --git a/fdbclient/CommitTransaction.h b/fdbclient/CommitTransaction.h index bc74941704..b2776a4dcd 100644 --- a/fdbclient/CommitTransaction.h +++ b/fdbclient/CommitTransaction.h @@ -49,6 +49,7 @@ static const char* typeString[] = { "SetValue", "MinV2", "AndV2", "CompareAndClear", + "Reserved_For_SpanContextMessage", "MAX_ATOMIC_OP" }; struct MutationRef { @@ -75,6 +76,7 @@ struct MutationRef { MinV2, AndV2, CompareAndClear, + Reserved_For_SpanContextMessage /* See fdbserver/SpanContextMessage.h */, MAX_ATOMIC_OP }; // This is stored this way for serialization purposes. diff --git a/fdbclient/FDBTypes.h b/fdbclient/FDBTypes.h index 7e16dcd75f..80e8a8bb2c 100644 --- a/fdbclient/FDBTypes.h +++ b/fdbclient/FDBTypes.h @@ -129,9 +129,11 @@ enum { txsTagOld = -1, invalidTagOld = -100 }; struct TagsAndMessage { StringRef message; + // SpanID spanContext; VectorRef tags; TagsAndMessage() {} + // TagsAndMessage(SpanID spanContext) : spanContext(spanContext) {} TagsAndMessage(StringRef message, VectorRef tags) : message(message), tags(tags) {} // Loads tags and message from a serialized buffer. "rd" is checkpointed at diff --git a/fdbclient/MasterProxyInterface.h b/fdbclient/MasterProxyInterface.h index 9e2b49037c..fbd98c6c2b 100644 --- a/fdbclient/MasterProxyInterface.h +++ b/fdbclient/MasterProxyInterface.h @@ -165,7 +165,8 @@ struct CommitTransactionRequest : TimedRequest { Optional commitCostEstimation; Optional tagSet; - CommitTransactionRequest() : flags(0) {} + CommitTransactionRequest() : CommitTransactionRequest(SpanID()) {} + CommitTransactionRequest(SpanID const& context) : spanContext(context), flags(0) {} template void serialize(Ar& ar) { diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index fba6fdf6f8..7099d6fbea 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -2678,7 +2678,7 @@ void debugAddTags(Transaction *tr) { Transaction::Transaction(Database const& cx) : cx(cx), info(cx->taskID, deterministicRandom()->randomUniqueID()), backoff(CLIENT_KNOBS->DEFAULT_BACKOFF), committedVersion(invalidVersion), versionstampPromise(Promise>()), options(cx), numErrors(0), - trLogInfo(createTrLogInfoProbabilistically(cx)), span(info.spanID, "Transaction"_loc) { + trLogInfo(createTrLogInfoProbabilistically(cx)), tr(info.spanID), span(info.spanID, "Transaction"_loc) { if (DatabaseContext::debugUseTags) { debugAddTags(this); } diff --git a/fdbserver/ApplyMetadataMutation.cpp b/fdbserver/ApplyMetadataMutation.cpp index 23466ece9f..5872fdca0b 100644 --- a/fdbserver/ApplyMetadataMutation.cpp +++ b/fdbserver/ApplyMetadataMutation.cpp @@ -45,16 +45,21 @@ Reference getStorageInfo(UID id, std::map const& mutations, - IKeyValueStore* txnStateStore, LogPushData* toCommit, bool& confChange, - Reference logSystem, Version popVersion, - KeyRangeMap>* vecBackupKeys, KeyRangeMap* keyInfo, - KeyRangeMap* cacheInfo, std::map* uid_applyMutationsData, +void applyMetadataMutations(SpanID const& spanContext, UID const& dbgid, Arena& arena, + VectorRef const& mutations, IKeyValueStore* txnStateStore, + LogPushData* toCommit, bool& confChange, Reference logSystem, + Version popVersion, KeyRangeMap>* vecBackupKeys, + KeyRangeMap* keyInfo, KeyRangeMap* cacheInfo, + std::map* uid_applyMutationsData, RequestStream commit, Database cx, NotifiedVersion* commitVersion, std::map>* storageCache, std::map* tag_popped, bool initialCommit) { //std::map> cacheRangeInfo; std::map cachedRangeInfo; + if (toCommit) { + toCommit->addTransactionInfo(spanContext); + } + for (auto const& m : mutations) { //TraceEvent("MetadataMutation", dbgid).detail("M", m.toString()); @@ -102,7 +107,7 @@ void applyMetadataMutations(UID const& dbgid, Arena& arena, VectorRefreadValue( serverTagKeyFor( serverKeysDecodeServer(m.param1) ) ).get().get() ).toString()); toCommit->addTag( decodeServerTagValue( txnStateStore->readValue( serverTagKeyFor( serverKeysDecodeServer(m.param1) ) ).get().get() ) ); - toCommit->addTypedMessage(privatized); + toCommit->writeTypedMessage(privatized); } } else if (m.param1.startsWith(serverTagPrefix)) { UID id = decodeServerTagKey(m.param1); @@ -114,9 +119,9 @@ void applyMetadataMutations(UID const& dbgid, Arena& arena, VectorRefaddTag(tag); - toCommit->addTypedMessage(LogProtocolMessage()); + toCommit->writeTypedMessage(LogProtocolMessage(), true); toCommit->addTag(tag); - toCommit->addTypedMessage(privatized); + toCommit->writeTypedMessage(privatized); } if(!initialCommit) { txnStateStore->set(KeyValueRef(m.param1, m.param2)); @@ -168,7 +173,7 @@ void applyMetadataMutations(UID const& dbgid, Arena& arena, VectorRefaddTag( cacheTag ); - toCommit->addTypedMessage(privatized); + toCommit->writeTypedMessage(privatized); } } else if (m.param1.startsWith(configKeysPrefix) || m.param1 == coordinatorsKey) { @@ -285,13 +290,13 @@ void applyMetadataMutations(UID const& dbgid, Arena& arena, VectorRefaddTags(allTags); - toCommit->addTypedMessage(LogProtocolMessage()); + toCommit->writeTypedMessage(LogProtocolMessage(), true); } MutationRef privatized = m; privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena); toCommit->addTags(allTags); - toCommit->addTypedMessage(privatized); + toCommit->writeTypedMessage(privatized); } } else if (m.param1 == minRequiredCommitVersionKey) { @@ -347,7 +352,7 @@ void applyMetadataMutations(UID const& dbgid, Arena& arena, VectorRefaddTag(decodeServerTagValue(kv.value)); - toCommit->addTypedMessage(privatized); + toCommit->writeTypedMessage(privatized); } } } @@ -538,37 +543,37 @@ void applyMetadataMutations(UID const& dbgid, Arena& arena, VectorRefaddTags(allTags); - toCommit->addTypedMessage(mutationBegin); + toCommit->writeTypedMessage(mutationBegin); toCommit->addTags(allTags); - toCommit->addTypedMessage(mutationEnd); + toCommit->writeTypedMessage(mutationEnd); } } } -void applyMetadataMutations(ProxyCommitData& proxyCommitData, Arena& arena, Reference logSystem, - const VectorRef& mutations, LogPushData* toCommit, bool& confChange, - Version popVersion, bool initialCommit) { +void applyMetadataMutations(SpanID const& spanContext, ProxyCommitData& proxyCommitData, Arena& arena, + Reference logSystem, const VectorRef& mutations, + LogPushData* toCommit, bool& confChange, Version popVersion, bool initialCommit) { std::map* uid_applyMutationsData = nullptr; if (proxyCommitData.firstProxy) { uid_applyMutationsData = &proxyCommitData.uid_applyMutationsData; } - applyMetadataMutations(proxyCommitData.dbgid, arena, mutations, proxyCommitData.txnStateStore, toCommit, confChange, - logSystem, popVersion, &proxyCommitData.vecBackupKeys, &proxyCommitData.keyInfo, - &proxyCommitData.cacheInfo, uid_applyMutationsData, proxyCommitData.commit, - proxyCommitData.cx, &proxyCommitData.committedVersion, &proxyCommitData.storageCache, - &proxyCommitData.tag_popped, initialCommit); + applyMetadataMutations(spanContext, proxyCommitData.dbgid, arena, mutations, proxyCommitData.txnStateStore, toCommit, + confChange, logSystem, popVersion, &proxyCommitData.vecBackupKeys, &proxyCommitData.keyInfo, + &proxyCommitData.cacheInfo, uid_applyMutationsData, proxyCommitData.commit, + proxyCommitData.cx, &proxyCommitData.committedVersion, &proxyCommitData.storageCache, + &proxyCommitData.tag_popped, initialCommit); } -void applyMetadataMutations(const UID& dbgid, Arena& arena, const VectorRef& mutations, - IKeyValueStore* txnStateStore) { +void applyMetadataMutations(SpanID const& spanContext, const UID& dbgid, Arena& arena, + const VectorRef& mutations, IKeyValueStore* txnStateStore) { bool confChange; // Dummy variable, not used. - applyMetadataMutations(dbgid, arena, mutations, txnStateStore, /* toCommit= */ nullptr, confChange, + applyMetadataMutations(spanContext, dbgid, arena, mutations, txnStateStore, /* toCommit= */ nullptr, confChange, Reference(), /* popVersion= */ 0, /* vecBackupKeys= */ nullptr, /* keyInfo= */ nullptr, /* cacheInfo= */ nullptr, /* uid_applyMutationsData= */ nullptr, RequestStream(), Database(), /* commitVersion= */ nullptr, /* storageCache= */ nullptr, /* tag_popped= */ nullptr, /* initialCommit= */ false); -} \ No newline at end of file +} diff --git a/fdbserver/BackupWorker.actor.cpp b/fdbserver/BackupWorker.actor.cpp index 5860a6772a..6625ca6827 100644 --- a/fdbserver/BackupWorker.actor.cpp +++ b/fdbserver/BackupWorker.actor.cpp @@ -61,8 +61,9 @@ struct VersionedMessage { ArenaReader reader(arena, message, AssumeVersion(currentProtocolVersion)); - // Return false for LogProtocolMessage. + // Return false for metadata messages LogProtocolMessage and SpanContextMessage. if (LogProtocolMessage::isNextIn(reader)) return false; + if (SpanContextMessage::isNextIn(reader)) return false; reader >> *m; return normalKeys.contains(m->param1) || m->param1 == metadataVersionKey; diff --git a/fdbserver/CMakeLists.txt b/fdbserver/CMakeLists.txt index 40b03e1aaa..965fd8ab0a 100644 --- a/fdbserver/CMakeLists.txt +++ b/fdbserver/CMakeLists.txt @@ -87,6 +87,7 @@ set(FDBSERVER_SRCS SimulatedCluster.actor.cpp SimulatedCluster.h SkipList.cpp + SpanContextMessage.h Status.actor.cpp Status.h StorageCache.actor.cpp @@ -127,6 +128,7 @@ set(FDBSERVER_SRCS workloads/BackupToDBUpgrade.actor.cpp workloads/BulkLoad.actor.cpp workloads/BulkSetup.actor.h + workloads/Basic.actor.cpp workloads/Cache.actor.cpp workloads/ChangeConfig.actor.cpp workloads/ClientTransactionProfileCorrectness.actor.cpp diff --git a/fdbserver/LogSystem.h b/fdbserver/LogSystem.h index 2e2a5997e7..c50b1ea5a3 100644 --- a/fdbserver/LogSystem.h +++ b/fdbserver/LogSystem.h @@ -24,6 +24,7 @@ #include #include +#include "fdbserver/SpanContextMessage.h" #include "fdbserver/TLogInterface.h" #include "fdbserver/WorkerInterface.actor.h" #include "fdbclient/DatabaseConfiguration.h" @@ -828,6 +829,18 @@ struct CompareFirst { } }; +// Structure to store serialized mutations sent from the proxy to the +// transaction logs. The serialization repeats with the following format: +// +// +----------------------+ +----------------------+ +----------+ +----------------+ +----------------------+ +// | Message size | | Subsequence | | # of tags| | Tag | . . . . | Mutation | +// +----------------------+ +----------------------+ +----------+ +----------------+ +----------------------+ +// <------- 32 bits ------> <------- 32 bits ------> <- 16 bits-> <---- 24 bits ---> <---- variable bits ---> +// +// `Mutation` can be a serialized MutationRef or a special metadata message +// such as LogProtocolMessage or SpanContextMessage. The type of `Mutation` is +// uniquely identified by its first byte -- a value from MutationRef::Type. +// struct LogPushData : NonCopyable { // Log subsequences have to start at 1 (the MergedPeekCursor relies on this to make sure we never have !hasMessage() in the middle of data for a version @@ -859,7 +872,14 @@ struct LogPushData : NonCopyable { next_message_tags.insert(next_message_tags.end(), tags.begin(), tags.end()); } - void addMessage( StringRef rawMessageWithoutLength, bool usePreviousLocations ) { + // Add transaction info to be written before the first mutation in the transaction. + void addTransactionInfo(SpanID const& context) { + spanContext = context; + transactionSubseq = 0; + writtenLocations.clear(); + } + + void writeMessage( StringRef rawMessageWithoutLength, bool usePreviousLocations ) { if( !usePreviousLocations ) { prev_tags.clear(); if(logSystem->hasRemoteLogs()) { @@ -875,15 +895,16 @@ struct LogPushData : NonCopyable { uint32_t subseq = this->subsequence++; uint32_t msgsize = rawMessageWithoutLength.size() + sizeof(subseq) + sizeof(uint16_t) + sizeof(Tag)*prev_tags.size(); for(int loc : msg_locations) { - messagesWriter[loc] << msgsize << subseq << uint16_t(prev_tags.size()); + BinaryWriter& wr = messagesWriter[loc]; + wr << msgsize << subseq << uint16_t(prev_tags.size()); for(auto& tag : prev_tags) - messagesWriter[loc] << tag; - messagesWriter[loc].serializeBytes(rawMessageWithoutLength); + wr << tag; + wr.serializeBytes(rawMessageWithoutLength); } } template - void addTypedMessage(T const& item, bool allLocations = false) { + void writeTypedMessage(T const& item, bool metadataMessage = false, bool allLocations = false) { prev_tags.clear(); if(logSystem->hasRemoteLogs()) { prev_tags.push_back( logSystem->getRandomRouterTag() ); @@ -895,12 +916,31 @@ struct LogPushData : NonCopyable { logSystem->getPushLocations(prev_tags, msg_locations, allLocations); BinaryWriter bw(AssumeVersion(currentProtocolVersion)); + + // Metadata messages should be written before span information. If this + // isn't a metadata message, make sure all locations have had + // transaction info written to them. Mutations may have different sets + // of tags, so it is necessary to check all tag locations each time a + // mutation is written. + if (!metadataMessage) { + // If span information hasn't been written for this transaction yet, + // generate a subsequence value for the message. + if (!transactionSubseq) { + transactionSubseq = this->subsequence++; + } + + for (int loc : msg_locations) { + writeTransactionInfo(loc); + } + } + uint32_t subseq = this->subsequence++; bool first = true; int firstOffset=-1, firstLength=-1; for(int loc : msg_locations) { + BinaryWriter& wr = messagesWriter[loc]; + if (first) { - BinaryWriter& wr = messagesWriter[loc]; firstOffset = wr.getLength(); wr << uint32_t(0) << subseq << uint16_t(prev_tags.size()); for(auto& tag : prev_tags) @@ -911,7 +951,6 @@ struct LogPushData : NonCopyable { DEBUG_TAGS_AND_MESSAGE("ProxyPushLocations", invalidVersion, StringRef(((uint8_t*)wr.getData() + firstOffset), firstLength)).detail("PushLocations", msg_locations); first = false; } else { - BinaryWriter& wr = messagesWriter[loc]; BinaryWriter& from = messagesWriter[msg_locations[0]]; wr.serializeBytes( (uint8_t*)from.getData() + firstOffset, firstLength ); } @@ -929,7 +968,37 @@ private: std::vector prev_tags; std::vector messagesWriter; std::vector msg_locations; + // Stores message locations that have had span information written to them + // for the current transaction. Adding transaction info will reset this + // field. + std::unordered_set writtenLocations; uint32_t subsequence; + // Store transaction subsequence separately, as multiple mutations may need + // to write transaction info. This can happen if later mutations in a + // transaction need to write to a different location than earlier + // mutations. + uint32_t transactionSubseq; + SpanID spanContext; + + // Writes transaction info to the message stream for the given location if + // it has not already been written (for the current transaction). + void writeTransactionInfo(int location) { + if (writtenLocations.count(location) == 0) { + writtenLocations.insert(location); + + BinaryWriter& wr = messagesWriter[location]; + SpanContextMessage contextMessage(spanContext); + + int offset = wr.getLength(); + wr << uint32_t(0) << transactionSubseq << uint16_t(prev_tags.size()); + for(auto& tag : prev_tags) + wr << tag; + wr << contextMessage; + int length = wr.getLength() - offset; + *(uint32_t*)((uint8_t*)wr.getData() + offset) = length - sizeof(uint32_t); + } + } + }; #endif diff --git a/fdbserver/MutationTracking.cpp b/fdbserver/MutationTracking.cpp index 9d090d59e3..f273057df7 100644 --- a/fdbserver/MutationTracking.cpp +++ b/fdbserver/MutationTracking.cpp @@ -21,6 +21,7 @@ #include #include "fdbserver/MutationTracking.h" #include "fdbserver/LogProtocolMessage.h" +#include "fdbserver/SpanContextMessage.h" #if defined(FDB_CLEAN_BUILD) && MUTATION_TRACKING_ENABLED #error "You cannot use mutation tracking in a clean/release build." @@ -71,6 +72,10 @@ TraceEvent debugTagsAndMessageEnabled( const char* context, Version version, Str LogProtocolMessage lpm; br >> lpm; rdr.setProtocolVersion(br.protocolVersion()); + } else if (SpanContextMessage::startsSpanContextMessage(mutationType)) { + BinaryReader br(mutationData, AssumeVersion(rdr.protocolVersion())); + SpanContextMessage scm; + br >> scm; } else { MutationRef m; BinaryReader br(mutationData, AssumeVersion(rdr.protocolVersion())); diff --git a/fdbserver/SpanContextMessage.h b/fdbserver/SpanContextMessage.h new file mode 100644 index 0000000000..da94fcc485 --- /dev/null +++ b/fdbserver/SpanContextMessage.h @@ -0,0 +1,60 @@ +/* + * SpanContextMessage.h + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2020 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FDBSERVER_SPANCONTEXTMESSAGE_H +#define FDBSERVER_SPANCONTEXTMESSAGE_H +#pragma once + +#include "fdbclient/FDBTypes.h" +#include "fdbclient/CommitTransaction.h" + +struct SpanContextMessage { + // This message is pushed into the the transaction logs' memory to inform + // it what transaction subsequent mutations were a part of. This allows + // transaction logs and storage servers to associate mutations with a + // transaction identifier, called a span context. + // + // This message is similar to LogProtocolMessage. Storage servers read the + // first byte of this message to uniquely identify it, meaning it will + // never be mistaken for another message. See LogProtocolMessage.h for more + // information. + + SpanID spanContext; + + SpanContextMessage() {} + SpanContextMessage(SpanID const& spanContext) : spanContext(spanContext) {} + + std::string toString() const { + return format("code: %d, span context: %s", MutationRef::Reserved_For_SpanContextMessage, spanContext.toString().c_str()); + } + + template + void serialize(Ar& ar) { + uint8_t poly = MutationRef::Reserved_For_SpanContextMessage; + serializer(ar, poly, spanContext); + } + + static bool startsSpanContextMessage(uint8_t byte) { + return byte == MutationRef::Reserved_For_SpanContextMessage; + } + template static bool isNextIn(Ar& ar) { return startsSpanContextMessage(*(const uint8_t*)ar.peekBytes(1)); } +}; + +#endif diff --git a/fdbserver/StorageCache.actor.cpp b/fdbserver/StorageCache.actor.cpp index 44a3edfbfd..31ee4a4cb3 100644 --- a/fdbserver/StorageCache.actor.cpp +++ b/fdbserver/StorageCache.actor.cpp @@ -1763,6 +1763,10 @@ ACTOR Future pullAsyncData( StorageCacheData *data ) { dbgLastMessageWasProtocol = true; cloneCursor1->setProtocolVersion(cloneReader.protocolVersion()); } + else if (SpanContextMessage::isNextIn(cloneReader)) { + SpanContextMessage scm; + cloneReader >> scm; + } else { MutationRef msg; cloneReader >> msg; @@ -1835,6 +1839,10 @@ ACTOR Future pullAsyncData( StorageCacheData *data ) { data->logProtocol = reader.protocolVersion(); cloneCursor2->setProtocolVersion(data->logProtocol); } + else if (SpanContextMessage::isNextIn(reader)) { + SpanContextMessage scm; + reader >> scm; + } else { MutationRef msg; reader >> msg; diff --git a/fdbserver/TLogServer.actor.cpp b/fdbserver/TLogServer.actor.cpp index 84c1655cac..ab015b1458 100644 --- a/fdbserver/TLogServer.actor.cpp +++ b/fdbserver/TLogServer.actor.cpp @@ -28,6 +28,7 @@ #include "fdbclient/FDBTypes.h" #include "fdbserver/WorkerInterface.actor.h" #include "fdbserver/LogProtocolMessage.h" +#include "fdbserver/SpanContextMessage.h" #include "fdbserver/TLogInterface.h" #include "fdbserver/Knobs.h" #include "fdbserver/IKeyValueStore.h" @@ -1393,6 +1394,7 @@ void peekMessagesFromMemory( Reference self, TLogPeekRequest const& req ACTOR Future> parseMessagesForTag( StringRef commitBlob, Tag tag, int logRouters ) { // See the comment in LogSystem.cpp for the binary format of commitBlob. state std::vector relevantMessages; + // TODO: Change to passed in protocol version state BinaryReader rd(commitBlob, AssumeVersion(currentProtocolVersion)); while (!rd.empty()) { TagsAndMessage tagsAndMessage; diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index 561cbd85da..3d8822e1ae 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -43,6 +43,7 @@ #include "fdbserver/Knobs.h" #include "fdbserver/LatencyBandConfig.h" #include "fdbserver/LogProtocolMessage.h" +#include "fdbserver/SpanContextMessage.h" #include "fdbserver/LogSystem.h" #include "fdbserver/MoveKeys.actor.h" #include "fdbserver/MutationTracking.h" @@ -2847,6 +2848,11 @@ ACTOR Future update( StorageServer* data, bool* pReceivedUpdate ) dbgLastMessageWasProtocol = true; cloneCursor1->setProtocolVersion(cloneReader.protocolVersion()); } + else if (SpanContextMessage::isNextIn(cloneReader)) { + SpanContextMessage scm; + cloneReader >> scm; + // TODO: Set span context state here + } else { MutationRef msg; cloneReader >> msg; @@ -2940,6 +2946,11 @@ ACTOR Future update( StorageServer* data, bool* pReceivedUpdate ) data->storage.changeLogProtocol(ver, data->logProtocol); cloneCursor2->setProtocolVersion(rd.protocolVersion()); } + else if (SpanContextMessage::isNextIn(rd)) { + SpanContextMessage scm; + rd >> scm; + // TODO: Set span context state here + } else { MutationRef msg; rd >> msg; diff --git a/fdbserver/workloads/ConfigureDatabase.actor.cpp b/fdbserver/workloads/ConfigureDatabase.actor.cpp index dca9994710..8536361eba 100644 --- a/fdbserver/workloads/ConfigureDatabase.actor.cpp +++ b/fdbserver/workloads/ConfigureDatabase.actor.cpp @@ -31,7 +31,7 @@ static const char* storeTypes[] = { "ssd", "ssd-1", "ssd-2", "memory", "memory-1 static const char* logTypes[] = { "log_engine:=1", "log_engine:=2", "log_spill:=1", "log_spill:=2", - "log_version:=2", "log_version:=3", "log_version:=4" + "log_version:=2", "log_version:=3", "log_version:=4", "log_version:=5" }; static const char* redundancies[] = { "single", "double", "triple" }; static const char* backupTypes[] = { "backup_worker_enabled:=0", "backup_worker_enabled:=1" }; From 2a58e775d2f2c464d652b53dd085e2f3b89830af Mon Sep 17 00:00:00 2001 From: Lukas Joswiak Date: Thu, 27 Aug 2020 16:16:05 -0700 Subject: [PATCH 02/13] Add original changes --- fdbserver/ApplyMetadataMutation.h | 10 ++++---- fdbserver/LogSystem.h | 2 +- fdbserver/MasterProxyServer.actor.cpp | 27 +++++++++++++-------- fdbserver/TLogInterface.h | 7 +++--- fdbserver/TagPartitionedLogSystem.actor.cpp | 5 ++-- fdbserver/masterserver.actor.cpp | 2 +- 6 files changed, 31 insertions(+), 22 deletions(-) diff --git a/fdbserver/ApplyMetadataMutation.h b/fdbserver/ApplyMetadataMutation.h index 52355f2c19..5a06a31c40 100644 --- a/fdbserver/ApplyMetadataMutation.h +++ b/fdbserver/ApplyMetadataMutation.h @@ -39,10 +39,10 @@ inline bool isMetadataMutation(MutationRef const& m) { Reference getStorageInfo(UID id, std::map>* storageCache, IKeyValueStore* txnStateStore); -void applyMetadataMutations(ProxyCommitData& proxyCommitData, Arena& arena, Reference logSystem, - const VectorRef& mutations, LogPushData* pToCommit, bool& confChange, - Version popVersion, bool initialCommit); -void applyMetadataMutations(const UID& dbgid, Arena& arena, const VectorRef& mutations, - IKeyValueStore* txnStateStore); +void applyMetadataMutations(SpanID const& spanContext, ProxyCommitData& proxyCommitData, Arena& arena, + Reference logSystem, const VectorRef& mutations, + LogPushData* pToCommit, bool& confChange, Version popVersion, bool initialCommit); +void applyMetadataMutations(SpanID const& spanContext, const UID& dbgid, Arena& arena, + const VectorRef& mutations, IKeyValueStore* txnStateStore); #endif diff --git a/fdbserver/LogSystem.h b/fdbserver/LogSystem.h index c50b1ea5a3..b683646d81 100644 --- a/fdbserver/LogSystem.h +++ b/fdbserver/LogSystem.h @@ -679,7 +679,7 @@ struct ILogSystem { // Never returns normally, but throws an error if the subsystem stops working //Future push( UID bundle, int64_t seq, VectorRef messages ); - virtual Future push( Version prevVersion, Version version, Version knownCommittedVersion, Version minKnownCommittedVersion, struct LogPushData& data, Optional debugID = Optional() ) = 0; + virtual Future push( Version prevVersion, Version version, Version knownCommittedVersion, Version minKnownCommittedVersion, struct LogPushData& data, SpanID const& spanContext, Optional debugID = Optional() ) = 0; // Waits for the version number of the bundle (in this epoch) to be prevVersion (i.e. for all pushes ordered earlier) // Puts the given messages into the bundle, each with the given tags, and with message versions (version, 0) - (version, N) // Changes the version number of the bundle to be version (unblocking the next push) diff --git a/fdbserver/MasterProxyServer.actor.cpp b/fdbserver/MasterProxyServer.actor.cpp index 7d8a36a66a..7a2ffea45d 100644 --- a/fdbserver/MasterProxyServer.actor.cpp +++ b/fdbserver/MasterProxyServer.actor.cpp @@ -296,6 +296,8 @@ ACTOR Future addBackupMutations(ProxyCommitData* self, std::mapaddTransactionInfo(SpanID()); + // Serialize the log range mutations within the map for (; logRangeMutation != logRangeMutations->end(); ++logRangeMutation) { @@ -357,7 +359,7 @@ ACTOR Future addBackupMutations(ProxyCommitData* self, std::maptagsForKey(backupMutation.param1); toCommit->addTags(tags); - toCommit->addTypedMessage(backupMutation); + toCommit->writeTypedMessage(backupMutation); // if (DEBUG_MUTATION("BackupProxyCommit", commitVersion, backupMutation)) { // TraceEvent("BackupProxyCommitTo", self->dbgid).detail("To", describe(tags)).detail("BackupMutation", backupMutation.toString()) @@ -396,7 +398,7 @@ struct CommitBatchContext { int batchOperations = 0; - Span span = Span("MP:commitBatch"_loc); + Span span; int64_t batchBytes = 0; @@ -476,7 +478,9 @@ CommitBatchContext::CommitBatchContext(ProxyCommitData* const pProxyCommitData_, localBatchNumber(++pProxyCommitData->localCommitBatchesStarted), toCommit(pProxyCommitData->logSystem), - committed(trs.size()) { + committed(trs.size()), + + span("MP:commitBatch"_loc) { evaluateBatchSize(); @@ -671,7 +675,7 @@ void applyMetadataEffect(CommitBatchContext* self) { for (int resolver = 0; resolver < self->resolution.size(); resolver++) committed = committed && self->resolution[resolver].stateMutations[versionIndex][transactionIndex].committed; if (committed) { - applyMetadataMutations(*self->pProxyCommitData, self->arena, self->pProxyCommitData->logSystem, + applyMetadataMutations(SpanID(), *self->pProxyCommitData, self->arena, self->pProxyCommitData->logSystem, self->resolution[0].stateMutations[versionIndex][transactionIndex].mutations, /* pToCommit= */ nullptr, self->forceRecovery, /* popVersion= */ 0, /* initialCommit */ false); @@ -754,7 +758,7 @@ ACTOR Future applyMetadataToCommittedTransactions(CommitBatchContext* self for (t = 0; t < trs.size() && !self->forceRecovery; t++) { if (self->committed[t] == ConflictBatch::TransactionCommitted && (!self->locked || trs[t].isLockAware())) { self->commitCount++; - applyMetadataMutations(*pProxyCommitData, self->arena, pProxyCommitData->logSystem, + applyMetadataMutations(trs[t].spanContext, *pProxyCommitData, self->arena, pProxyCommitData->logSystem, trs[t].transaction.mutations, &self->toCommit, self->forceRecovery, self->commitVersion + 1, /* initialCommit= */ false); } @@ -803,6 +807,9 @@ ACTOR Future assignMutationsToStorageServers(CommitBatchContext* self) { state Optional* trCost = &trs[self->transactionNum].commitCostEstimation; state int mutationNum = 0; state VectorRef* pMutations = &trs[self->transactionNum].transaction.mutations; + + self->toCommit.addTransactionInfo(trs[self->transactionNum].spanContext); + for (; mutationNum < pMutations->size(); mutationNum++) { if(self->yieldBytes > SERVER_KNOBS->DESIRED_TOTAL_BYTES) { self->yieldBytes = 0; @@ -857,7 +864,7 @@ ACTOR Future assignMutationsToStorageServers(CommitBatchContext* self) { if(pProxyCommitData->cacheInfo[m.param1]) { self->toCommit.addTag(cacheTag); } - self->toCommit.addTypedMessage(m); + self->toCommit.writeTypedMessage(m); } else if (m.type == MutationRef::ClearRange) { KeyRangeRef clearRange(KeyRangeRef(m.param1, m.param2)); @@ -908,7 +915,7 @@ ACTOR Future assignMutationsToStorageServers(CommitBatchContext* self) { if(pProxyCommitData->needsCacheTag(clearRange)) { self->toCommit.addTag(cacheTag); } - self->toCommit.addTypedMessage(m); + self->toCommit.writeTypedMessage(m); } else { UNREACHABLE(); } @@ -1049,7 +1056,7 @@ ACTOR Future postResolution(CommitBatchContext* self) { if(firstMessage) { self->toCommit.addTxsTag(); } - self->toCommit.addMessage(StringRef(m.begin(), m.size()), !firstMessage); + self->toCommit.writeMessage(StringRef(m.begin(), m.size()), !firstMessage); firstMessage = false; } @@ -1064,7 +1071,7 @@ ACTOR Future postResolution(CommitBatchContext* self) { self->commitStartTime = now(); pProxyCommitData->lastStartCommit = self->commitStartTime; - self->loggingComplete = pProxyCommitData->logSystem->push( self->prevVersion, self->commitVersion, pProxyCommitData->committedVersion.get(), pProxyCommitData->minKnownCommittedVersion, self->toCommit, self->debugID ); + self->loggingComplete = pProxyCommitData->logSystem->push( self->prevVersion, self->commitVersion, pProxyCommitData->committedVersion.get(), pProxyCommitData->minKnownCommittedVersion, self->toCommit, self->span.context, self->debugID ); if (!self->forceRecovery) { ASSERT(pProxyCommitData->latestLocalCommitBatchLogging.get() == self->localBatchNumber-1); @@ -1806,7 +1813,7 @@ ACTOR Future masterProxyServerCore( Arena arena; bool confChanges; - applyMetadataMutations(commitData, arena, Reference(), mutations, + applyMetadataMutations(SpanID(), commitData, arena, Reference(), mutations, /* pToCommit= */ nullptr, confChanges, /* popVersion= */ 0, /* initialCommit= */ true); } diff --git a/fdbserver/TLogInterface.h b/fdbserver/TLogInterface.h index f5afa5df87..6fff6fb1f5 100644 --- a/fdbserver/TLogInterface.h +++ b/fdbserver/TLogInterface.h @@ -240,6 +240,7 @@ struct TLogCommitReply { struct TLogCommitRequest { constexpr static FileIdentifier file_identifier = 4022206; + SpanID spanContext; Arena arena; Version prevVersion, version, knownCommittedVersion, minKnownCommittedVersion; @@ -249,11 +250,11 @@ struct TLogCommitRequest { Optional debugID; TLogCommitRequest() {} - TLogCommitRequest( const Arena& a, Version prevVersion, Version version, Version knownCommittedVersion, Version minKnownCommittedVersion, StringRef messages, Optional debugID ) - : arena(a), prevVersion(prevVersion), version(version), knownCommittedVersion(knownCommittedVersion), minKnownCommittedVersion(minKnownCommittedVersion), messages(messages), debugID(debugID) {} + TLogCommitRequest( const SpanID& context, const Arena& a, Version prevVersion, Version version, Version knownCommittedVersion, Version minKnownCommittedVersion, StringRef messages, Optional debugID ) + : spanContext(context), arena(a), prevVersion(prevVersion), version(version), knownCommittedVersion(knownCommittedVersion), minKnownCommittedVersion(minKnownCommittedVersion), messages(messages), debugID(debugID) {} template void serialize( Ar& ar ) { - serializer(ar, prevVersion, version, knownCommittedVersion, minKnownCommittedVersion, messages, reply, arena, debugID); + serializer(ar, prevVersion, version, knownCommittedVersion, minKnownCommittedVersion, messages, reply, arena, spanContext, debugID); } }; diff --git a/fdbserver/TagPartitionedLogSystem.actor.cpp b/fdbserver/TagPartitionedLogSystem.actor.cpp index 529cf58ac9..09e90d6eb1 100644 --- a/fdbserver/TagPartitionedLogSystem.actor.cpp +++ b/fdbserver/TagPartitionedLogSystem.actor.cpp @@ -527,7 +527,8 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted push(Version prevVersion, Version version, Version knownCommittedVersion, - Version minKnownCommittedVersion, LogPushData& data, Optional debugID) final { + Version minKnownCommittedVersion, LogPushData& data, + SpanID const& spanContext, Optional debugID) final { // FIXME: Randomize request order as in LegacyLogSystem? vector> quorumResults; vector> allReplies; @@ -542,7 +543,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted> tLogCommitResults; for(int loc=0; loc< it->logServers.size(); loc++) { Standalone msg = data.getMessages(location); - allReplies.push_back( recordPushMetrics( it->connectionResetTrackers[loc], it->logServers[loc]->get().interf().address(), it->logServers[loc]->get().interf().commit.getReply( TLogCommitRequest( msg.arena(), prevVersion, version, knownCommittedVersion, minKnownCommittedVersion, msg, debugID ), TaskPriority::ProxyTLogCommitReply ) ) ); + allReplies.push_back( recordPushMetrics( it->connectionResetTrackers[loc], it->logServers[loc]->get().interf().address(), it->logServers[loc]->get().interf().commit.getReply( TLogCommitRequest( spanContext, msg.arena(), prevVersion, version, knownCommittedVersion, minKnownCommittedVersion, msg, debugID ), TaskPriority::ProxyTLogCommitReply ) ) ); Future commitSuccess = success(allReplies.back()); addActor.get().send(commitSuccess); tLogCommitResults.push_back(commitSuccess); diff --git a/fdbserver/masterserver.actor.cpp b/fdbserver/masterserver.actor.cpp index ce5c993d77..bc1168c58a 100644 --- a/fdbserver/masterserver.actor.cpp +++ b/fdbserver/masterserver.actor.cpp @@ -1613,7 +1613,7 @@ ACTOR Future masterCore( Reference self ) { } } - applyMetadataMutations(self->dbgid, recoveryCommitRequest.arena, tr.mutations.slice(mmApplied, tr.mutations.size()), + applyMetadataMutations(SpanID(), self->dbgid, recoveryCommitRequest.arena, tr.mutations.slice(mmApplied, tr.mutations.size()), self->txnStateStore); mmApplied = tr.mutations.size(); From f896c6899603d52ee26230e55595d0bfea37ff53 Mon Sep 17 00:00:00 2001 From: Lukas Joswiak Date: Thu, 27 Aug 2020 17:39:09 -0700 Subject: [PATCH 03/13] Cleanup --- fdbserver/ApplyMetadataMutation.cpp | 8 ++--- fdbserver/CMakeLists.txt | 1 - fdbserver/LogSystem.h | 1 - fdbserver/workloads/Basic.actor.cpp | 50 +++++++++++++++++++++++++++++ tests/AsyncFileCorrectness.txt | 12 ------- 5 files changed, 54 insertions(+), 18 deletions(-) create mode 100644 fdbserver/workloads/Basic.actor.cpp delete mode 100644 tests/AsyncFileCorrectness.txt diff --git a/fdbserver/ApplyMetadataMutation.cpp b/fdbserver/ApplyMetadataMutation.cpp index 5872fdca0b..e20c17bbdc 100644 --- a/fdbserver/ApplyMetadataMutation.cpp +++ b/fdbserver/ApplyMetadataMutation.cpp @@ -560,10 +560,10 @@ void applyMetadataMutations(SpanID const& spanContext, ProxyCommitData& proxyCom } applyMetadataMutations(spanContext, proxyCommitData.dbgid, arena, mutations, proxyCommitData.txnStateStore, toCommit, - confChange, logSystem, popVersion, &proxyCommitData.vecBackupKeys, &proxyCommitData.keyInfo, - &proxyCommitData.cacheInfo, uid_applyMutationsData, proxyCommitData.commit, - proxyCommitData.cx, &proxyCommitData.committedVersion, &proxyCommitData.storageCache, - &proxyCommitData.tag_popped, initialCommit); + confChange, logSystem, popVersion, &proxyCommitData.vecBackupKeys, &proxyCommitData.keyInfo, + &proxyCommitData.cacheInfo, uid_applyMutationsData, proxyCommitData.commit, + proxyCommitData.cx, &proxyCommitData.committedVersion, &proxyCommitData.storageCache, + &proxyCommitData.tag_popped, initialCommit); } void applyMetadataMutations(SpanID const& spanContext, const UID& dbgid, Arena& arena, diff --git a/fdbserver/CMakeLists.txt b/fdbserver/CMakeLists.txt index 965fd8ab0a..e13bec6be4 100644 --- a/fdbserver/CMakeLists.txt +++ b/fdbserver/CMakeLists.txt @@ -128,7 +128,6 @@ set(FDBSERVER_SRCS workloads/BackupToDBUpgrade.actor.cpp workloads/BulkLoad.actor.cpp workloads/BulkSetup.actor.h - workloads/Basic.actor.cpp workloads/Cache.actor.cpp workloads/ChangeConfig.actor.cpp workloads/ClientTransactionProfileCorrectness.actor.cpp diff --git a/fdbserver/LogSystem.h b/fdbserver/LogSystem.h index b683646d81..692d7bc727 100644 --- a/fdbserver/LogSystem.h +++ b/fdbserver/LogSystem.h @@ -998,7 +998,6 @@ private: *(uint32_t*)((uint8_t*)wr.getData() + offset) = length - sizeof(uint32_t); } } - }; #endif diff --git a/fdbserver/workloads/Basic.actor.cpp b/fdbserver/workloads/Basic.actor.cpp new file mode 100644 index 0000000000..2c1686144f --- /dev/null +++ b/fdbserver/workloads/Basic.actor.cpp @@ -0,0 +1,50 @@ +#include "fdbserver/IKeyValueStore.h" +#include "fdbserver/workloads/workloads.actor.h" +#include "flow/actorcompiler.h" // This must be the last #include. + +// Basic workload which runs a single transaction in simulation. +struct BasicWorkload : TestWorkload { + BasicWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {} + + std::string description() override { + return "Basic"; + } + + Future start(Database const& cx) override { + if (clientId == 0) { + TraceEvent("ABC_start"); + return testKVStore(cx->clone()); + } + + return Void(); + } + + Future check(Database const& cx) override { + return true; + } + + void getMetrics(vector& m) override {} + + ACTOR Future testKVStore(Database cx) { + state Reference tr = + Reference(new ReadYourWritesTransaction(cx)); + tr->set(KeyRef("foo2"), ValueRef("bar")); + wait(tr->commit()); + // auto ver = tr->getCommittedVersion(); + + /* + UID id = deterministicRandom()->randomUniqueID(); + std::string fn = id.toString(); + state IKeyValueStore* store = keyValueStoreMemory(fn, id, 500e6); + + wait(store->init()); + + KeyValueRef kv = KeyValueRef(StringRef("foo"), StringRef("bar")); + store->set(kv); + */ + + return Void(); + } +}; + +WorkloadFactory BasicWorkloadFactory("Basic"); diff --git a/tests/AsyncFileCorrectness.txt b/tests/AsyncFileCorrectness.txt deleted file mode 100644 index 161d746428..0000000000 --- a/tests/AsyncFileCorrectness.txt +++ /dev/null @@ -1,12 +0,0 @@ -testTitle=AsyncFileCorrectnessTest -useDB=false -runSetup=true -clearAfterTest=false - - testName=AsyncFileCorrectness - testDuration=10.0 - unbufferedIO=true - ;fileName=/home/ajb/testfilecorrectness - targetFileSize=327680 - maxOperationSize=8192 - numSimultaneousOperations=10 From 9398025f6ab12375fc0820eada3e603696b25395 Mon Sep 17 00:00:00 2001 From: Lukas Joswiak Date: Thu, 27 Aug 2020 17:43:22 -0700 Subject: [PATCH 04/13] Remove test --- fdbserver/workloads/Basic.actor.cpp | 50 ----------------------------- 1 file changed, 50 deletions(-) delete mode 100644 fdbserver/workloads/Basic.actor.cpp diff --git a/fdbserver/workloads/Basic.actor.cpp b/fdbserver/workloads/Basic.actor.cpp deleted file mode 100644 index 2c1686144f..0000000000 --- a/fdbserver/workloads/Basic.actor.cpp +++ /dev/null @@ -1,50 +0,0 @@ -#include "fdbserver/IKeyValueStore.h" -#include "fdbserver/workloads/workloads.actor.h" -#include "flow/actorcompiler.h" // This must be the last #include. - -// Basic workload which runs a single transaction in simulation. -struct BasicWorkload : TestWorkload { - BasicWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {} - - std::string description() override { - return "Basic"; - } - - Future start(Database const& cx) override { - if (clientId == 0) { - TraceEvent("ABC_start"); - return testKVStore(cx->clone()); - } - - return Void(); - } - - Future check(Database const& cx) override { - return true; - } - - void getMetrics(vector& m) override {} - - ACTOR Future testKVStore(Database cx) { - state Reference tr = - Reference(new ReadYourWritesTransaction(cx)); - tr->set(KeyRef("foo2"), ValueRef("bar")); - wait(tr->commit()); - // auto ver = tr->getCommittedVersion(); - - /* - UID id = deterministicRandom()->randomUniqueID(); - std::string fn = id.toString(); - state IKeyValueStore* store = keyValueStoreMemory(fn, id, 500e6); - - wait(store->init()); - - KeyValueRef kv = KeyValueRef(StringRef("foo"), StringRef("bar")); - store->set(kv); - */ - - return Void(); - } -}; - -WorkloadFactory BasicWorkloadFactory("Basic"); From 53b7721d6c0223053a21ad7cd8b70ee00cca76fe Mon Sep 17 00:00:00 2001 From: Lukas Joswiak Date: Fri, 28 Aug 2020 12:02:51 -0700 Subject: [PATCH 05/13] Add additional trace information --- fdbclient/FDBTypes.h | 2 -- fdbserver/MasterProxyServer.actor.cpp | 13 +++++++++---- fdbserver/Resolver.actor.cpp | 1 + fdbserver/TLogServer.actor.cpp | 1 + fdbserver/TagPartitionedLogSystem.actor.cpp | 1 + fdbserver/storageserver.actor.cpp | 4 ++-- 6 files changed, 14 insertions(+), 8 deletions(-) diff --git a/fdbclient/FDBTypes.h b/fdbclient/FDBTypes.h index 80e8a8bb2c..7e16dcd75f 100644 --- a/fdbclient/FDBTypes.h +++ b/fdbclient/FDBTypes.h @@ -129,11 +129,9 @@ enum { txsTagOld = -1, invalidTagOld = -100 }; struct TagsAndMessage { StringRef message; - // SpanID spanContext; VectorRef tags; TagsAndMessage() {} - // TagsAndMessage(SpanID spanContext) : spanContext(spanContext) {} TagsAndMessage(StringRef message, VectorRef tags) : message(message), tags(tags) {} // Loads tags and message from a serialized buffer. "rd" is checkpointed at diff --git a/fdbserver/MasterProxyServer.actor.cpp b/fdbserver/MasterProxyServer.actor.cpp index 7a2ffea45d..7648289f27 100644 --- a/fdbserver/MasterProxyServer.actor.cpp +++ b/fdbserver/MasterProxyServer.actor.cpp @@ -539,6 +539,7 @@ ACTOR Future preresolutionProcessing(CommitBatchContext* self) { state const int64_t localBatchNumber = self->localBatchNumber; state const int latencyBucket = self->latencyBucket; state const Optional& debugID = self->debugID; + state Span span("MP:preresolutionProcessing"_loc, self->span.context); // Pre-resolution the commits TEST(pProxyCommitData->latestLocalCommitBatchResolving.get() < localBatchNumber - 1); @@ -556,7 +557,7 @@ ACTOR Future preresolutionProcessing(CommitBatchContext* self) { ); } - GetCommitVersionRequest req(self->span.context, pProxyCommitData->commitVersionRequestNumber++, + GetCommitVersionRequest req(span.context, pProxyCommitData->commitVersionRequestNumber++, pProxyCommitData->mostRecentProcessedRequestNumber, pProxyCommitData->dbgid); GetCommitVersionReply versionReply = wait(brokenPromiseToNever( pProxyCommitData->master.getCommitVersion.getReply( @@ -595,13 +596,14 @@ ACTOR Future getResolution(CommitBatchContext* self) { // resolution processing but is still using CPU ProxyCommitData* pProxyCommitData = self->pProxyCommitData; std::vector& trs = self->trs; + state Span span("MP:getResolution"_loc, self->span.context); ResolutionRequestBuilder requests( pProxyCommitData, self->commitVersion, self->prevVersion, pProxyCommitData->version, - self->span + span ); int conflictRangeCount = 0; self->maxTransactionBytes = 0; @@ -969,6 +971,7 @@ ACTOR Future postResolution(CommitBatchContext* self) { state std::vector& trs = self->trs; state const int64_t localBatchNumber = self->localBatchNumber; state const Optional& debugID = self->debugID; + state Span span("MP:postResolution"_loc, self->span.context); TEST(pProxyCommitData->latestLocalCommitBatchLogging.get() < localBatchNumber - 1); // Queuing post-resolution commit processing wait(pProxyCommitData->latestLocalCommitBatchLogging.whenAtLeast(localBatchNumber - 1)); @@ -1021,7 +1024,7 @@ ACTOR Future postResolution(CommitBatchContext* self) { // This should be *extremely* rare in the real world, but knob buggification should make it happen in simulation TEST(true); // Semi-committed pipeline limited by MVCC window //TraceEvent("ProxyWaitingForCommitted", pProxyCommitData->dbgid).detail("CommittedVersion", pProxyCommitData->committedVersion.get()).detail("NeedToCommit", commitVersion); - waitVersionSpan = Span(deterministicRandom()->randomUniqueID(), "MP:overMaxReadTransactionLifeVersions"_loc, {self->span.context}); + waitVersionSpan = Span(deterministicRandom()->randomUniqueID(), "MP:overMaxReadTransactionLifeVersions"_loc, {span.context}); choose{ when(wait(pProxyCommitData->committedVersion.whenAtLeast(self->commitVersion - SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS))) { wait(yield()); @@ -1071,7 +1074,7 @@ ACTOR Future postResolution(CommitBatchContext* self) { self->commitStartTime = now(); pProxyCommitData->lastStartCommit = self->commitStartTime; - self->loggingComplete = pProxyCommitData->logSystem->push( self->prevVersion, self->commitVersion, pProxyCommitData->committedVersion.get(), pProxyCommitData->minKnownCommittedVersion, self->toCommit, self->span.context, self->debugID ); + self->loggingComplete = pProxyCommitData->logSystem->push( self->prevVersion, self->commitVersion, pProxyCommitData->committedVersion.get(), pProxyCommitData->minKnownCommittedVersion, self->toCommit, span.context, self->debugID ); if (!self->forceRecovery) { ASSERT(pProxyCommitData->latestLocalCommitBatchLogging.get() == self->localBatchNumber-1); @@ -1093,6 +1096,7 @@ ACTOR Future postResolution(CommitBatchContext* self) { ACTOR Future transactionLogging(CommitBatchContext* self) { state ProxyCommitData* const pProxyCommitData = self->pProxyCommitData; + state Span span("MP:transactionLogging"_loc, self->span.context); try { choose { @@ -1128,6 +1132,7 @@ ACTOR Future transactionLogging(CommitBatchContext* self) { ACTOR Future reply(CommitBatchContext* self) { state ProxyCommitData* const pProxyCommitData = self->pProxyCommitData; + state Span span("MP:reply"_loc, self->span.context); const Optional& debugID = self->debugID; diff --git a/fdbserver/Resolver.actor.cpp b/fdbserver/Resolver.actor.cpp index 8a2cac8171..05046b0766 100644 --- a/fdbserver/Resolver.actor.cpp +++ b/fdbserver/Resolver.actor.cpp @@ -104,6 +104,7 @@ ACTOR Future resolveBatch( ResolveTransactionBatchRequest req) { state Optional debugID; + state Span span("R:resolveBatch"_loc, req.spanContext); // The first request (prevVersion < 0) comes from the master state NetworkAddress proxyAddress = req.prevVersion >= 0 ? req.reply.getEndpoint().getPrimaryAddress() : NetworkAddress(); diff --git a/fdbserver/TLogServer.actor.cpp b/fdbserver/TLogServer.actor.cpp index ab015b1458..beaa945270 100644 --- a/fdbserver/TLogServer.actor.cpp +++ b/fdbserver/TLogServer.actor.cpp @@ -1851,6 +1851,7 @@ ACTOR Future tLogCommit( TLogCommitRequest req, Reference logData, PromiseStream warningCollectorInput ) { + state Span span("TLog:tLogCommit"_loc, req.spanContext); state Optional tlogDebugID; if(req.debugID.present()) { diff --git a/fdbserver/TagPartitionedLogSystem.actor.cpp b/fdbserver/TagPartitionedLogSystem.actor.cpp index 09e90d6eb1..6687a5d26f 100644 --- a/fdbserver/TagPartitionedLogSystem.actor.cpp +++ b/fdbserver/TagPartitionedLogSystem.actor.cpp @@ -533,6 +533,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted> quorumResults; vector> allReplies; int location = 0; + Span span("TPLS:push"_loc, spanContext); for(auto& it : tLogs) { if(it->isLocal && it->logServers.size()) { if(it->connectionResetTrackers.size() == 0) { diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index 3d8822e1ae..af6fd39796 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -2777,6 +2777,7 @@ private: ACTOR Future update( StorageServer* data, bool* pReceivedUpdate ) { state double start; + state Span span("SS:update"_loc); try { // If we are disk bound and durableVersion is very old, we need to block updates or we could run out of memory // This is often referred to as the storage server e-brake (emergency brake) @@ -2851,7 +2852,6 @@ ACTOR Future update( StorageServer* data, bool* pReceivedUpdate ) else if (SpanContextMessage::isNextIn(cloneReader)) { SpanContextMessage scm; cloneReader >> scm; - // TODO: Set span context state here } else { MutationRef msg; @@ -2949,7 +2949,7 @@ ACTOR Future update( StorageServer* data, bool* pReceivedUpdate ) else if (SpanContextMessage::isNextIn(rd)) { SpanContextMessage scm; rd >> scm; - // TODO: Set span context state here + span.addParent(scm.spanContext); } else { MutationRef msg; From 00d3aa3acc2d7cfbfc2a7aa1e145ec2d6c5ab8eb Mon Sep 17 00:00:00 2001 From: Lukas Joswiak Date: Fri, 28 Aug 2020 15:16:54 -0700 Subject: [PATCH 06/13] Update formatting --- fdbserver/MasterProxyServer.actor.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/fdbserver/MasterProxyServer.actor.cpp b/fdbserver/MasterProxyServer.actor.cpp index 7648289f27..cf4df617b6 100644 --- a/fdbserver/MasterProxyServer.actor.cpp +++ b/fdbserver/MasterProxyServer.actor.cpp @@ -295,7 +295,6 @@ ACTOR Future addBackupMutations(ProxyCommitData* self, std::mapLOG_RANGE_BLOCK_SIZE; state int yieldBytes = 0; state BinaryWriter valueWriter(Unversioned()); - toCommit->addTransactionInfo(SpanID()); // Serialize the log range mutations within the map From b96dbc45cbb26de0c5582daf22af27289a5637a9 Mon Sep 17 00:00:00 2001 From: Lukas Joswiak Date: Mon, 31 Aug 2020 10:39:07 -0700 Subject: [PATCH 07/13] Update formatting --- fdbserver/MasterProxyServer.actor.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/fdbserver/MasterProxyServer.actor.cpp b/fdbserver/MasterProxyServer.actor.cpp index cf4df617b6..7648289f27 100644 --- a/fdbserver/MasterProxyServer.actor.cpp +++ b/fdbserver/MasterProxyServer.actor.cpp @@ -295,6 +295,7 @@ ACTOR Future addBackupMutations(ProxyCommitData* self, std::mapLOG_RANGE_BLOCK_SIZE; state int yieldBytes = 0; state BinaryWriter valueWriter(Unversioned()); + toCommit->addTransactionInfo(SpanID()); // Serialize the log range mutations within the map From 7dc55fdffd7a403242f1f38ab5d3c6b7d73cf153 Mon Sep 17 00:00:00 2001 From: Lukas Joswiak Date: Mon, 31 Aug 2020 14:46:41 -0700 Subject: [PATCH 08/13] Revert state --- fdbserver/TLogServer.actor.cpp | 1 - tests/AsyncFileCorrectness.txt | 12 ++++++++++++ 2 files changed, 12 insertions(+), 1 deletion(-) create mode 100644 tests/AsyncFileCorrectness.txt diff --git a/fdbserver/TLogServer.actor.cpp b/fdbserver/TLogServer.actor.cpp index beaa945270..912c8fe0bb 100644 --- a/fdbserver/TLogServer.actor.cpp +++ b/fdbserver/TLogServer.actor.cpp @@ -1394,7 +1394,6 @@ void peekMessagesFromMemory( Reference self, TLogPeekRequest const& req ACTOR Future> parseMessagesForTag( StringRef commitBlob, Tag tag, int logRouters ) { // See the comment in LogSystem.cpp for the binary format of commitBlob. state std::vector relevantMessages; - // TODO: Change to passed in protocol version state BinaryReader rd(commitBlob, AssumeVersion(currentProtocolVersion)); while (!rd.empty()) { TagsAndMessage tagsAndMessage; diff --git a/tests/AsyncFileCorrectness.txt b/tests/AsyncFileCorrectness.txt new file mode 100644 index 0000000000..161d746428 --- /dev/null +++ b/tests/AsyncFileCorrectness.txt @@ -0,0 +1,12 @@ +testTitle=AsyncFileCorrectnessTest +useDB=false +runSetup=true +clearAfterTest=false + + testName=AsyncFileCorrectness + testDuration=10.0 + unbufferedIO=true + ;fileName=/home/ajb/testfilecorrectness + targetFileSize=327680 + maxOperationSize=8192 + numSimultaneousOperations=10 From 783e6a170e9fc3d395843c71d23998c92bf67077 Mon Sep 17 00:00:00 2001 From: Lukas Joswiak Date: Fri, 4 Sep 2020 17:36:56 -0700 Subject: [PATCH 09/13] Add code coverage --- fdbserver/LogSystem.h | 1 + 1 file changed, 1 insertion(+) diff --git a/fdbserver/LogSystem.h b/fdbserver/LogSystem.h index 692d7bc727..3e21c85619 100644 --- a/fdbserver/LogSystem.h +++ b/fdbserver/LogSystem.h @@ -874,6 +874,7 @@ struct LogPushData : NonCopyable { // Add transaction info to be written before the first mutation in the transaction. void addTransactionInfo(SpanID const& context) { + TEST(!spanContext.isValid()); // addTransactionInfo with invalid SpanID spanContext = context; transactionSubseq = 0; writtenLocations.clear(); From efde86340a55d3498ee24cf8504cc30e341dde52 Mon Sep 17 00:00:00 2001 From: Lukas Joswiak Date: Fri, 4 Sep 2020 17:37:34 -0700 Subject: [PATCH 10/13] Add knob to disable span serialization --- fdbserver/LogSystem.h | 4 ++++ flow/Knobs.cpp | 2 ++ flow/Knobs.h | 2 ++ 3 files changed, 8 insertions(+) diff --git a/fdbserver/LogSystem.h b/fdbserver/LogSystem.h index 3e21c85619..22418224ab 100644 --- a/fdbserver/LogSystem.h +++ b/fdbserver/LogSystem.h @@ -30,6 +30,7 @@ #include "fdbclient/DatabaseConfiguration.h" #include "fdbserver/MutationTracking.h" #include "flow/IndexedSet.h" +#include "flow/Knobs.h" #include "fdbrpc/ReplicationPolicy.h" #include "fdbrpc/Locality.h" #include "fdbrpc/Replication.h" @@ -984,6 +985,9 @@ private: // Writes transaction info to the message stream for the given location if // it has not already been written (for the current transaction). void writeTransactionInfo(int location) { + if (!FLOW_KNOBS->WRITE_TRACING_ENABLED) { + return; + } if (writtenLocations.count(location) == 0) { writtenLocations.insert(location); diff --git a/flow/Knobs.cpp b/flow/Knobs.cpp index 5019923e8c..9c06722744 100644 --- a/flow/Knobs.cpp +++ b/flow/Knobs.cpp @@ -61,6 +61,8 @@ void FlowKnobs::initialize(bool randomize, bool isSimulated) { init( HUGE_ARENA_LOGGING_BYTES, 100e6 ); init( HUGE_ARENA_LOGGING_INTERVAL, 5.0 ); + init( WRITE_TRACING_ENABLED, true ); if( randomize && BUGGIFY ) WRITE_TRACING_ENABLED = false; + //connectionMonitor init( CONNECTION_MONITOR_LOOP_TIME, isSimulated ? 0.75 : 1.0 ); if( randomize && BUGGIFY ) CONNECTION_MONITOR_LOOP_TIME = 6.0; init( CONNECTION_MONITOR_TIMEOUT, isSimulated ? 1.50 : 2.0 ); if( randomize && BUGGIFY ) CONNECTION_MONITOR_TIMEOUT = 6.0; diff --git a/flow/Knobs.h b/flow/Knobs.h index bb719c2686..7f97df82fa 100644 --- a/flow/Knobs.h +++ b/flow/Knobs.h @@ -69,6 +69,8 @@ public: double HUGE_ARENA_LOGGING_BYTES; double HUGE_ARENA_LOGGING_INTERVAL; + bool WRITE_TRACING_ENABLED; + //run loop profiling double RUN_LOOP_PROFILING_INTERVAL; double SLOWTASK_PROFILING_LOG_INTERVAL; From e94c372815dbbdf43e0f63acb2cde6876ce627d3 Mon Sep 17 00:00:00 2001 From: Lukas Joswiak Date: Tue, 6 Oct 2020 18:33:29 -0700 Subject: [PATCH 11/13] Fix serialization order --- fdbserver/BackupWorker.actor.cpp | 2 +- fdbserver/TLogInterface.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/fdbserver/BackupWorker.actor.cpp b/fdbserver/BackupWorker.actor.cpp index 6625ca6827..c1e2734e0f 100644 --- a/fdbserver/BackupWorker.actor.cpp +++ b/fdbserver/BackupWorker.actor.cpp @@ -61,7 +61,7 @@ struct VersionedMessage { ArenaReader reader(arena, message, AssumeVersion(currentProtocolVersion)); - // Return false for metadata messages LogProtocolMessage and SpanContextMessage. + // Return false for LogProtocolMessage and SpanContextMessage metadata messages. if (LogProtocolMessage::isNextIn(reader)) return false; if (SpanContextMessage::isNextIn(reader)) return false; diff --git a/fdbserver/TLogInterface.h b/fdbserver/TLogInterface.h index 6fff6fb1f5..1626c02540 100644 --- a/fdbserver/TLogInterface.h +++ b/fdbserver/TLogInterface.h @@ -254,7 +254,7 @@ struct TLogCommitRequest { : spanContext(context), arena(a), prevVersion(prevVersion), version(version), knownCommittedVersion(knownCommittedVersion), minKnownCommittedVersion(minKnownCommittedVersion), messages(messages), debugID(debugID) {} template void serialize( Ar& ar ) { - serializer(ar, prevVersion, version, knownCommittedVersion, minKnownCommittedVersion, messages, reply, arena, spanContext, debugID); + serializer(ar, prevVersion, version, knownCommittedVersion, minKnownCommittedVersion, messages, reply, arena, debugID, spanContext); } }; From 2cdbf29a47e0382098b951288ce01829e666529f Mon Sep 17 00:00:00 2001 From: Lukas Joswiak Date: Wed, 14 Oct 2020 15:34:13 -0700 Subject: [PATCH 12/13] Fix formatting --- fdbserver/CommitProxyServer.actor.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fdbserver/CommitProxyServer.actor.cpp b/fdbserver/CommitProxyServer.actor.cpp index 39e1201386..7be9437a1d 100644 --- a/fdbserver/CommitProxyServer.actor.cpp +++ b/fdbserver/CommitProxyServer.actor.cpp @@ -593,7 +593,7 @@ ACTOR Future getResolution(CommitBatchContext* self) { self->commitVersion, self->prevVersion, pProxyCommitData->version, - span + span ); int conflictRangeCount = 0; self->maxTransactionBytes = 0; From e47e0108a7fe9ced445e1bd669697459638689f9 Mon Sep 17 00:00:00 2001 From: Lukas Joswiak Date: Fri, 4 Sep 2020 16:57:36 -0700 Subject: [PATCH 13/13] Add new TLogVersion --- fdbclient/FDBTypes.h | 7 +++++-- fdbserver/BackupWorker.actor.cpp | 2 +- fdbserver/StorageCache.actor.cpp | 4 ++-- fdbserver/storageserver.actor.cpp | 4 ++-- fdbserver/worker.actor.cpp | 2 ++ fdbserver/workloads/ConfigureDatabase.actor.cpp | 2 +- flow/ProtocolVersion.h | 1 + 7 files changed, 14 insertions(+), 8 deletions(-) diff --git a/fdbclient/FDBTypes.h b/fdbclient/FDBTypes.h index d876a0f489..faa824b434 100644 --- a/fdbclient/FDBTypes.h +++ b/fdbclient/FDBTypes.h @@ -754,14 +754,16 @@ struct TLogVersion { // V3 was the introduction of spill by reference; // V4 changed how data gets written to satellite TLogs so that we can peek from them; // V5 merged reference and value spilling + // V6 added span context to list of serialized mutations sent from proxy to tlogs // V1 = 1, // 4.6 is dispatched to via 6.0 V2 = 2, // 6.0 V3 = 3, // 6.1 V4 = 4, // 6.2 V5 = 5, // 6.3 + V6 = 6, // 7.0 MIN_SUPPORTED = V2, - MAX_SUPPORTED = V5, - MIN_RECRUITABLE = V4, + MAX_SUPPORTED = V6, + MIN_RECRUITABLE = V5, DEFAULT = V5, } version; @@ -784,6 +786,7 @@ struct TLogVersion { if (s == LiteralStringRef("3")) return V3; if (s == LiteralStringRef("4")) return V4; if (s == LiteralStringRef("5")) return V5; + if (s == LiteralStringRef("6")) return V6; return default_error_or(); } }; diff --git a/fdbserver/BackupWorker.actor.cpp b/fdbserver/BackupWorker.actor.cpp index f0af374128..f8313647e7 100644 --- a/fdbserver/BackupWorker.actor.cpp +++ b/fdbserver/BackupWorker.actor.cpp @@ -63,7 +63,7 @@ struct VersionedMessage { // Return false for LogProtocolMessage and SpanContextMessage metadata messages. if (LogProtocolMessage::isNextIn(reader)) return false; - if (SpanContextMessage::isNextIn(reader)) return false; + if (reader.protocolVersion().hasSpanContext() && SpanContextMessage::isNextIn(reader)) return false; reader >> *m; return normalKeys.contains(m->param1) || m->param1 == metadataVersionKey; diff --git a/fdbserver/StorageCache.actor.cpp b/fdbserver/StorageCache.actor.cpp index 7c7a7ea261..ba9a677a53 100644 --- a/fdbserver/StorageCache.actor.cpp +++ b/fdbserver/StorageCache.actor.cpp @@ -1763,7 +1763,7 @@ ACTOR Future pullAsyncData( StorageCacheData *data ) { dbgLastMessageWasProtocol = true; cloneCursor1->setProtocolVersion(cloneReader.protocolVersion()); } - else if (SpanContextMessage::isNextIn(cloneReader)) { + else if (cloneReader.protocolVersion().hasSpanContext() && SpanContextMessage::isNextIn(cloneReader)) { SpanContextMessage scm; cloneReader >> scm; } @@ -1839,7 +1839,7 @@ ACTOR Future pullAsyncData( StorageCacheData *data ) { data->logProtocol = reader.protocolVersion(); cloneCursor2->setProtocolVersion(data->logProtocol); } - else if (SpanContextMessage::isNextIn(reader)) { + else if (reader.protocolVersion().hasSpanContext() && SpanContextMessage::isNextIn(reader)) { SpanContextMessage scm; reader >> scm; } diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index f64b5718de..9849528b44 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -2854,7 +2854,7 @@ ACTOR Future update( StorageServer* data, bool* pReceivedUpdate ) dbgLastMessageWasProtocol = true; cloneCursor1->setProtocolVersion(cloneReader.protocolVersion()); } - else if (SpanContextMessage::isNextIn(cloneReader)) { + else if (cloneReader.protocolVersion().hasSpanContext() && SpanContextMessage::isNextIn(cloneReader)) { SpanContextMessage scm; cloneReader >> scm; } @@ -2951,7 +2951,7 @@ ACTOR Future update( StorageServer* data, bool* pReceivedUpdate ) data->storage.changeLogProtocol(ver, data->logProtocol); cloneCursor2->setProtocolVersion(rd.protocolVersion()); } - else if (SpanContextMessage::isNextIn(rd)) { + else if (rd.protocolVersion().hasSpanContext() && SpanContextMessage::isNextIn(rd)) { SpanContextMessage scm; rd >> scm; span.addParent(scm.spanContext); diff --git a/fdbserver/worker.actor.cpp b/fdbserver/worker.actor.cpp index f9104b26e6..d381e6b050 100644 --- a/fdbserver/worker.actor.cpp +++ b/fdbserver/worker.actor.cpp @@ -353,6 +353,7 @@ struct TLogOptions { "_LS_" + boost::lexical_cast(spillType); break; case TLogVersion::V5: + case TLogVersion::V6: toReturn = "V_" + boost::lexical_cast(version); break; } @@ -374,6 +375,7 @@ TLogFn tLogFnForOptions( TLogOptions options ) { else return oldTLog_6_2::tLog; case TLogVersion::V5: + case TLogVersion::V6: return tLog; default: ASSERT(false); diff --git a/fdbserver/workloads/ConfigureDatabase.actor.cpp b/fdbserver/workloads/ConfigureDatabase.actor.cpp index 70c7c01fae..34231fc54f 100644 --- a/fdbserver/workloads/ConfigureDatabase.actor.cpp +++ b/fdbserver/workloads/ConfigureDatabase.actor.cpp @@ -31,7 +31,7 @@ static const char* storeTypes[] = { "ssd", "ssd-1", "ssd-2", "memory", "memory-1 static const char* logTypes[] = { "log_engine:=1", "log_engine:=2", "log_spill:=1", "log_spill:=2", - "log_version:=2", "log_version:=3", "log_version:=4", "log_version:=5" + "log_version:=2", "log_version:=3", "log_version:=4", "log_version:=5", "log_version:=6" }; static const char* redundancies[] = { "single", "double", "triple" }; static const char* backupTypes[] = { "backup_worker_enabled:=0", "backup_worker_enabled:=1" }; diff --git a/flow/ProtocolVersion.h b/flow/ProtocolVersion.h index 39aa05a36b..c8e77da7d4 100644 --- a/flow/ProtocolVersion.h +++ b/flow/ProtocolVersion.h @@ -129,6 +129,7 @@ public: // introduced features PROTOCOL_VERSION_FEATURE(0x0FDB00B063010000LL, SmallEndpoints); PROTOCOL_VERSION_FEATURE(0x0FDB00B063010000LL, CacheRole); PROTOCOL_VERSION_FEATURE(0x0FDB00B070010001LL, TagThrottleValueReason); + PROTOCOL_VERSION_FEATURE(0x0FDB00B070010001LL, SpanContext); }; // These impact both communications and the deserialization of certain database and IKeyValueStore keys.