diff --git a/contrib/TestHarness/CMakeLists.txt b/contrib/TestHarness/CMakeLists.txt index 327710e745..265ba847d2 100644 --- a/contrib/TestHarness/CMakeLists.txt +++ b/contrib/TestHarness/CMakeLists.txt @@ -3,7 +3,7 @@ set(SRCS Properties/AssemblyInfo.cs) set(TEST_HARNESS_REFERENCES - "-r:System,System.Core,System.Xml.Linq,System.Data.DataSetExtensions,Microsoft.CSharp,System.Data,System.Xml,System.Runtime.Serialization,${TraceLogHelperDll}") + "-r:System,System.Core,System.Xml.Linq,System.Data.DataSetExtensions,Microsoft.CSharp,System.Data,System.Xml,${TraceLogHelperDll}") set(out_file ${CMAKE_BINARY_DIR}/packages/bin/TestHarness.exe) diff --git a/contrib/TestHarness/Program.cs b/contrib/TestHarness/Program.cs index d02836cb4e..93b14d176c 100644 --- a/contrib/TestHarness/Program.cs +++ b/contrib/TestHarness/Program.cs @@ -29,7 +29,6 @@ using System.Diagnostics; using System.ComponentModel; using System.Runtime.InteropServices; using System.Xml; -using System.Runtime.Serialization.Json; namespace SummarizeTest { @@ -360,22 +359,20 @@ namespace SummarizeTest { ErrorOutputListener errorListener = new ErrorOutputListener(); process.StartInfo.UseShellExecute = false; - string tlsPluginArg = ""; if (tlsPluginFile.Length > 0) { process.StartInfo.EnvironmentVariables["FDB_TLS_PLUGIN"] = tlsPluginFile; - tlsPluginArg = "--tls_plugin=" + tlsPluginFile; } process.StartInfo.RedirectStandardOutput = true; var args = ""; if (willRestart && oldBinaryName.EndsWith("alpha6")) { - args = string.Format("-Rs 1000000000 -r simulation {0} -s {1} -f \"{2}\" -b {3} {4} --crash", - IsRunningOnMono() ? "" : "-q", seed, testFile, buggify ? "on" : "off", tlsPluginArg); + args = string.Format("-Rs 1000000000 -r simulation {0} -s {1} -f \"{2}\" -b {3} --tls_plugin={4} --crash", + IsRunningOnMono() ? "" : "-q", seed, testFile, buggify ? "on" : "off", tlsPluginFile); } else { - args = string.Format("-Rs 1GB -r simulation {0} -s {1} -f \"{2}\" -b {3} {4} --crash", - IsRunningOnMono() ? "" : "-q", seed, testFile, buggify ? "on" : "off", tlsPluginArg); + args = string.Format("-Rs 1GB -r simulation {0} -s {1} -f \"{2}\" -b {3} --tls_plugin={4} --crash", + IsRunningOnMono() ? "" : "-q", seed, testFile, buggify ? "on" : "off", tlsPluginFile); } if (restarting) args = args + " --restarting"; if (useValgrind && !willRestart) @@ -480,7 +477,7 @@ namespace SummarizeTest memCheckThread.Join(); consoleThread.Join(); - var traceFiles = Directory.GetFiles(tempPath, "trace*.*", SearchOption.TopDirectoryOnly).Where(s => s.EndsWith(".xml") || s.EndsWith(".json")).ToArray(); + var traceFiles = Directory.GetFiles(tempPath, "trace*.xml"); if (traceFiles.Length == 0) { if (!traceToStdout) @@ -661,10 +658,6 @@ namespace SummarizeTest return whats.ToArray(); } - delegate IEnumerable parseDelegate(System.IO.Stream stream, string file, - bool keepOriginalElement = false, double startTime = -1, double endTime = Double.MaxValue, - double samplingFactor = 1.0); - static int Summarize(string[] traceFiles, string summaryFileName, string errorFileName, bool? killed, List outputErrors, int? exitCode, long? peakMemory, string uid, string valgrindOutputFileName, int expectedUnseed, out int unseed, out bool retryableError, bool logOnRetryableError, @@ -696,12 +689,7 @@ namespace SummarizeTest { try { - parseDelegate parse; - if (traceFileName.EndsWith(".json")) - parse = Magnesium.JsonParser.Parse; - else - parse = Magnesium.XmlParser.Parse; - foreach (var ev in parse(traceFile, traceFileName)) + foreach (var ev in Magnesium.XmlParser.Parse(traceFile, traceFileName)) { Magnesium.Severity newSeverity; if (severityMap.TryGetValue(new KeyValuePair(ev.Type, ev.Severity), out newSeverity)) @@ -1101,19 +1089,10 @@ namespace SummarizeTest private static void AppendToSummary(string summaryFileName, XElement xout, bool traceToStdout = false, bool shouldLock = true) { - bool useXml = true; - if (summaryFileName.EndsWith(".json")) - useXml = false; - if (traceToStdout) { - if (useXml) { - using (var wr = System.Xml.XmlWriter.Create(Console.OpenStandardOutput(), new System.Xml.XmlWriterSettings() { OmitXmlDeclaration = true, Encoding = new System.Text.UTF8Encoding(false) })) - xout.WriteTo(wr); - } else { - using (var wr = System.Runtime.Serialization.Json.JsonReaderWriterFactory.CreateJsonWriter(Console.OpenStandardOutput())) - xout.WriteTo(wr); - } + using (var wr = System.Xml.XmlWriter.Create(Console.OpenStandardOutput(), new System.Xml.XmlWriterSettings() { OmitXmlDeclaration = true, Encoding = new System.Text.UTF8Encoding(false) })) + xout.WriteTo(wr); Console.WriteLine(); return; } @@ -1124,6 +1103,7 @@ namespace SummarizeTest takeLock(summaryFileName); try { + using (var f = System.IO.File.Open(summaryFileName, System.IO.FileMode.Append, System.IO.FileAccess.Write)) { if (f.Length == 0) @@ -1131,13 +1111,8 @@ namespace SummarizeTest byte[] bytes = Encoding.UTF8.GetBytes(""); f.Write(bytes, 0, bytes.Length); } - if (useXml) { - using (var wr = System.Xml.XmlWriter.Create(f, new System.Xml.XmlWriterSettings() { OmitXmlDeclaration = true })) - xout.Save(wr); - } else { - using (var wr = System.Runtime.Serialization.Json.JsonReaderWriterFactory.CreateJsonWriter(f)) - xout.WriteTo(wr); - } + using (var wr = System.Xml.XmlWriter.Create(f, new System.Xml.XmlWriterSettings() { OmitXmlDeclaration = true })) + xout.Save(wr); var endl = Encoding.UTF8.GetBytes(Environment.NewLine); f.Write(endl, 0, endl.Length); } @@ -1148,7 +1123,6 @@ namespace SummarizeTest releaseLock(summaryFileName); } } - private static void AppendXmlMessageToSummary(string summaryFileName, XElement xout, bool traceToStdout = false, string testFile = null, int? seed = null, bool? buggify = null, bool? determinismCheck = null, string oldBinaryName = null) { diff --git a/contrib/TraceLogHelper/JsonParser.cs b/contrib/TraceLogHelper/JsonParser.cs index 6fafb7fab3..996a1e0e3c 100644 --- a/contrib/TraceLogHelper/JsonParser.cs +++ b/contrib/TraceLogHelper/JsonParser.cs @@ -51,7 +51,7 @@ namespace Magnesium } catch (Exception e) { - throw new Exception(string.Format("Failed to parse JSON {0}", root), e); + throw new Exception(string.Format("Failed to parse {0}", root), e); } if (ev != null) yield return ev; } @@ -80,9 +80,8 @@ namespace Magnesium TraceFile = file, DDetails = xEvent.Elements() .Where(a=>a.Name != "Type" && a.Name != "Time" && a.Name != "Machine" && a.Name != "ID" && a.Name != "Severity" && (!rolledEvent || a.Name != "OriginalTime")) - // When the key contains a colon character, it gets parsed as a:item - .ToDictionary(a=>a.Name.LocalName == "item" ? a.Attribute("item").Value : string.Intern(a.Name.LocalName), a=>(object)a.Value), - original = keepOriginalElement ? xEvent : null + .ToDictionary(a=>string.Intern(a.Name.LocalName), a=>(object)a.Value), + original = keepOriginalElement ? xEvent : null, }; } diff --git a/contrib/TraceLogHelper/XmlParser.cs b/contrib/TraceLogHelper/XmlParser.cs index 3728c58c3b..17b2405060 100644 --- a/contrib/TraceLogHelper/XmlParser.cs +++ b/contrib/TraceLogHelper/XmlParser.cs @@ -53,7 +53,7 @@ namespace Magnesium } catch (Exception e) { - throw new Exception(string.Format("Failed to parse XML {0}", xev), e); + throw new Exception(string.Format("Failed to parse {0}", xev), e); } if (ev != null) yield return ev; } diff --git a/documentation/FDB.pdf b/documentation/FDB.pdf new file mode 100644 index 0000000000..9aded96b85 Binary files /dev/null and b/documentation/FDB.pdf differ diff --git a/documentation/FDB.svg b/documentation/FDB.svg new file mode 100644 index 0000000000..79a64c4b16 --- /dev/null +++ b/documentation/FDB.svg @@ -0,0 +1,42 @@ + + + + Artboard + Created with Sketch. + + + + + + + + + \ No newline at end of file diff --git a/documentation/FDB_logo.svg b/documentation/FDB_logo.svg new file mode 100644 index 0000000000..6491fb91d0 --- /dev/null +++ b/documentation/FDB_logo.svg @@ -0,0 +1,34 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/documentation/sphinx/source/release-notes/release-notes-630.rst b/documentation/sphinx/source/release-notes/release-notes-630.rst index 2e2c86dda4..c1888710c5 100644 --- a/documentation/sphinx/source/release-notes/release-notes-630.rst +++ b/documentation/sphinx/source/release-notes/release-notes-630.rst @@ -16,6 +16,7 @@ Features * Added a new API in all bindings that can be used to query the estimated byte size of a given range. `(PR #2537) `_ * Added the ``lock`` and ``unlock`` commands to ``fdbcli`` which lock or unlock a cluster. `(PR #2890) `_ * Add a framework which helps to add client functions using special keys (keys within ``[\xff\xff, \xff\xff\xff)``). `(PR #2662) `_ +* Added capability of aborting replication to a clone of DR site without affecting replication to the original dr site with ``--dstonly`` option of ``fdbdr abort``. `(PR 3457) `_ Performance ----------- diff --git a/fdbbackup/backup.actor.cpp b/fdbbackup/backup.actor.cpp index c2b15afed4..b89c0a3dd8 100644 --- a/fdbbackup/backup.actor.cpp +++ b/fdbbackup/backup.actor.cpp @@ -122,6 +122,7 @@ enum { OPT_SOURCE_CLUSTER, OPT_DEST_CLUSTER, OPT_CLEANUP, + OPT_DSTONLY, OPT_TRACE_FORMAT, }; @@ -767,6 +768,7 @@ CSimpleOpt::SOption g_rgDBAbortOptions[] = { { OPT_DEST_CLUSTER, "-d", SO_REQ_SEP }, { OPT_DEST_CLUSTER, "--destination", SO_REQ_SEP }, { OPT_CLEANUP, "--cleanup", SO_NONE }, + { OPT_DSTONLY, "--dstonly", SO_NONE }, { OPT_TAGNAME, "-t", SO_REQ_SEP }, { OPT_TAGNAME, "--tagname", SO_REQ_SEP }, { OPT_TRACE, "--log", SO_NONE }, @@ -1147,6 +1149,7 @@ static void printDBBackupUsage(bool devhelp) { printf(" -k KEYS List of key ranges to backup.\n" " If not specified, the entire database will be backed up.\n"); printf(" --cleanup Abort will attempt to stop mutation logging on the source cluster.\n"); + printf(" --dstonly Abort will not make any changes on the source cluster.\n"); #ifndef TLS_DISABLED printf(TLS_HELP); #endif @@ -1912,12 +1915,12 @@ ACTOR Future statusBackup(Database db, std::string tagName, bool showError return Void(); } -ACTOR Future abortDBBackup(Database src, Database dest, std::string tagName, bool partial) { +ACTOR Future abortDBBackup(Database src, Database dest, std::string tagName, bool partial, bool dstOnly) { try { state DatabaseBackupAgent backupAgent(src); - wait(backupAgent.abortBackup(dest, Key(tagName), partial)); + wait(backupAgent.abortBackup(dest, Key(tagName), partial, false, dstOnly)); wait(backupAgent.unlockBackup(dest, Key(tagName))); printf("The DR on tag `%s' was successfully aborted.\n", printable(StringRef(tagName)).c_str()); @@ -2950,6 +2953,7 @@ int main(int argc, char* argv[]) { uint64_t traceMaxLogsSize = TRACE_DEFAULT_MAX_LOGS_SIZE; ESOError lastError; bool partial = true; + bool dstOnly = false; LocalityData localities; uint64_t memLimit = 8LL << 30; Optional ti; @@ -3130,6 +3134,9 @@ int main(int argc, char* argv[]) { case OPT_CLEANUP: partial = false; break; + case OPT_DSTONLY: + dstOnly = true; + break; case OPT_KNOB: { std::string syn = args->OptionSyntax(); if (!StringRef(syn).startsWith(LiteralStringRef("--knob_"))) { @@ -3798,7 +3805,7 @@ int main(int argc, char* argv[]) { f = stopAfter( switchDBBackup(sourceDb, db, backupKeys, tagName, forceAction) ); break; case DB_ABORT: - f = stopAfter( abortDBBackup(sourceDb, db, tagName, partial) ); + f = stopAfter( abortDBBackup(sourceDb, db, tagName, partial, dstOnly) ); break; case DB_PAUSE: f = stopAfter( changeDBBackupResumed(sourceDb, db, true) ); diff --git a/fdbclient/BackupAgent.actor.h b/fdbclient/BackupAgent.actor.h index 6cb0e447ea..457afc1d0b 100644 --- a/fdbclient/BackupAgent.actor.h +++ b/fdbclient/BackupAgent.actor.h @@ -419,7 +419,7 @@ public: return runRYWTransaction(cx, [=](Reference tr){ return discontinueBackup(tr, tagName); }); } - Future abortBackup(Database cx, Key tagName, bool partial = false, bool abortOldBackup = false); + Future abortBackup(Database cx, Key tagName, bool partial = false, bool abortOldBackup = false, bool dstOnly = false); Future getStatus(Database cx, int errorLimit, Key tagName); diff --git a/fdbclient/DatabaseBackupAgent.actor.cpp b/fdbclient/DatabaseBackupAgent.actor.cpp index 774e3cd017..1162986e10 100644 --- a/fdbclient/DatabaseBackupAgent.actor.cpp +++ b/fdbclient/DatabaseBackupAgent.actor.cpp @@ -2173,7 +2173,7 @@ public: return Void(); } - ACTOR static Future abortBackup(DatabaseBackupAgent* backupAgent, Database cx, Key tagName, bool partial, bool abortOldBackup) { + ACTOR static Future abortBackup(DatabaseBackupAgent* backupAgent, Database cx, Key tagName, bool partial, bool abortOldBackup, bool dstOnly) { state Reference tr(new ReadYourWritesTransaction(cx)); state Key logUidValue, destUidValue; state UID logUid, destUid; @@ -2265,67 +2265,68 @@ public: } } - state Future partialTimeout = partial ? delay(30.0) : Never(); + if (! dstOnly) { + state Future partialTimeout = partial ? delay(30.0) : Never(); + state Reference srcTr(new ReadYourWritesTransaction(backupAgent->taskBucket->src)); + state Version beginVersion; + state Version endVersion; - state Reference srcTr(new ReadYourWritesTransaction(backupAgent->taskBucket->src)); - state Version beginVersion; - state Version endVersion; + loop { + try { + srcTr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + srcTr->setOption(FDBTransactionOptions::LOCK_AWARE); + state Future> backupVersionF = srcTr->get( backupAgent->sourceStates.get(logUidValue).pack(DatabaseBackupAgent::keyFolderId) ); + wait(success(backupVersionF) || partialTimeout); + if(partialTimeout.isReady()) { + return Void(); + } - loop { - try { - srcTr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); - srcTr->setOption(FDBTransactionOptions::LOCK_AWARE); - state Future> backupVersionF = srcTr->get( backupAgent->sourceStates.get(logUidValue).pack(DatabaseBackupAgent::keyFolderId) ); - wait(success(backupVersionF) || partialTimeout); - if(partialTimeout.isReady()) { - return Void(); - } + if(backupVersionF.get().present() && BinaryReader::fromStringRef(backupVersionF.get().get(), Unversioned()) > BinaryReader::fromStringRef(backupUid, Unversioned())) { + break; + } - if(backupVersionF.get().present() && BinaryReader::fromStringRef(backupVersionF.get().get(), Unversioned()) > BinaryReader::fromStringRef(backupUid, Unversioned())) { - break; - } + if (abortOldBackup) { + srcTr->set( backupAgent->sourceStates.pack(DatabaseBackupAgent::keyStateStatus), StringRef(BackupAgentBase::getStateText(BackupAgentBase::STATE_ABORTED) )); + srcTr->set( backupAgent->sourceStates.get(logUidValue).pack(DatabaseBackupAgent::keyFolderId), backupUid ); + srcTr->clear(prefixRange(logUidValue.withPrefix(backupLogKeys.begin))); + srcTr->clear(prefixRange(logUidValue.withPrefix(logRangesRange.begin))); + break; + } - if (abortOldBackup) { - srcTr->set( backupAgent->sourceStates.pack(DatabaseBackupAgent::keyStateStatus), StringRef(BackupAgentBase::getStateText(BackupAgentBase::STATE_ABORTED) )); + Key latestVersionKey = logUidValue.withPrefix(destUidValue.withPrefix(backupLatestVersionsPrefix)); + + state Future> bVersionF = srcTr->get(latestVersionKey); + wait(success(bVersionF) || partialTimeout); + if(partialTimeout.isReady()) { + return Void(); + } + + if (bVersionF.get().present()) { + beginVersion = BinaryReader::fromStringRef(bVersionF.get().get(), Unversioned()); + } else { + break; + } + + srcTr->set( backupAgent->sourceStates.pack(DatabaseBackupAgent::keyStateStatus), StringRef(DatabaseBackupAgent::getStateText(BackupAgentBase::STATE_PARTIALLY_ABORTED) )); srcTr->set( backupAgent->sourceStates.get(logUidValue).pack(DatabaseBackupAgent::keyFolderId), backupUid ); - srcTr->clear(prefixRange(logUidValue.withPrefix(backupLogKeys.begin))); - srcTr->clear(prefixRange(logUidValue.withPrefix(logRangesRange.begin))); + + wait( eraseLogData(srcTr, logUidValue, destUidValue) || partialTimeout ); + if(partialTimeout.isReady()) { + return Void(); + } + + wait(srcTr->commit() || partialTimeout); + if(partialTimeout.isReady()) { + return Void(); + } + + endVersion = srcTr->getCommittedVersion() + 1; + break; } - - Key latestVersionKey = logUidValue.withPrefix(destUidValue.withPrefix(backupLatestVersionsPrefix)); - - state Future> bVersionF = srcTr->get(latestVersionKey); - wait(success(bVersionF) || partialTimeout); - if(partialTimeout.isReady()) { - return Void(); + catch (Error &e) { + wait(srcTr->onError(e)); } - - if (bVersionF.get().present()) { - beginVersion = BinaryReader::fromStringRef(bVersionF.get().get(), Unversioned()); - } else { - break; - } - - srcTr->set( backupAgent->sourceStates.pack(DatabaseBackupAgent::keyStateStatus), StringRef(DatabaseBackupAgent::getStateText(BackupAgentBase::STATE_PARTIALLY_ABORTED) )); - srcTr->set( backupAgent->sourceStates.get(logUidValue).pack(DatabaseBackupAgent::keyFolderId), backupUid ); - - wait( eraseLogData(srcTr, logUidValue, destUidValue) || partialTimeout ); - if(partialTimeout.isReady()) { - return Void(); - } - - wait(srcTr->commit() || partialTimeout); - if(partialTimeout.isReady()) { - return Void(); - } - - endVersion = srcTr->getCommittedVersion() + 1; - - break; - } - catch (Error &e) { - wait(srcTr->onError(e)); } } @@ -2527,8 +2528,8 @@ Future DatabaseBackupAgent::discontinueBackup(Reference DatabaseBackupAgent::abortBackup(Database cx, Key tagName, bool partial, bool abortOldBackup){ - return DatabaseBackupAgentImpl::abortBackup(this, cx, tagName, partial, abortOldBackup); +Future DatabaseBackupAgent::abortBackup(Database cx, Key tagName, bool partial, bool abortOldBackup, bool dstOnly){ + return DatabaseBackupAgentImpl::abortBackup(this, cx, tagName, partial, abortOldBackup, dstOnly); } Future DatabaseBackupAgent::getStatus(Database cx, int errorLimit, Key tagName) { diff --git a/fdbclient/DatabaseContext.h b/fdbclient/DatabaseContext.h index 433f0967c2..e5e3cac40f 100644 --- a/fdbclient/DatabaseContext.h +++ b/fdbclient/DatabaseContext.h @@ -223,11 +223,13 @@ public: bool enableLocalityLoadBalance; struct VersionRequest { + SpanID spanContext; Promise reply; TagSet tags; Optional debugID; - VersionRequest(TagSet tags = TagSet(), Optional debugID = Optional()) : tags(tags), debugID(debugID) {} + VersionRequest(SpanID spanContext, TagSet tags = TagSet(), Optional debugID = Optional()) + : spanContext(spanContext), tags(tags), debugID(debugID) {} }; // Transaction start request batching diff --git a/fdbclient/FDBTypes.h b/fdbclient/FDBTypes.h index 04f9c9848a..4744d523f8 100644 --- a/fdbclient/FDBTypes.h +++ b/fdbclient/FDBTypes.h @@ -36,6 +36,7 @@ typedef uint64_t Sequence; typedef StringRef KeyRef; typedef StringRef ValueRef; typedef int64_t Generation; +typedef UID SpanID; enum { tagLocalitySpecial = -1, diff --git a/fdbclient/FileBackupAgent.actor.cpp b/fdbclient/FileBackupAgent.actor.cpp index 3a7ffc17c6..4e62efc11d 100644 --- a/fdbclient/FileBackupAgent.actor.cpp +++ b/fdbclient/FileBackupAgent.actor.cpp @@ -1437,8 +1437,8 @@ namespace fileBackup { // thousands of iterations. // Declare some common iterators which must be state vars and will be used multiple times. state int i; - state RangeMap::Iterator iShard; - state RangeMap::Iterator iShardEnd; + state RangeMap::iterator iShard; + state RangeMap::iterator iShardEnd; // Set anything inside a dispatched range to DONE. // Also ensure that the boundary value are true, false, [true, false]... diff --git a/fdbclient/MasterProxyInterface.h b/fdbclient/MasterProxyInterface.h index 71069b305e..22646fd436 100644 --- a/fdbclient/MasterProxyInterface.h +++ b/fdbclient/MasterProxyInterface.h @@ -153,6 +153,7 @@ struct CommitTransactionRequest : TimedRequest { bool firstInBatch() const { return (flags & FLAG_FIRST_IN_BATCH) != 0; } Arena arena; + SpanID spanContext; CommitTransactionRef transaction; ReplyPromise reply; uint32_t flags; @@ -164,7 +165,7 @@ struct CommitTransactionRequest : TimedRequest { template void serialize(Ar& ar) { - serializer(ar, transaction, reply, arena, flags, debugID, commitCostEstimation, tagSet); + serializer(ar, transaction, reply, arena, flags, debugID, commitCostEstimation, tagSet, spanContext); } }; @@ -211,6 +212,7 @@ struct GetReadVersionRequest : TimedRequest { FLAG_PRIORITY_MASK = PRIORITY_SYSTEM_IMMEDIATE, }; + SpanID spanContext; uint32_t transactionCount; uint32_t flags; TransactionPriority priority; @@ -221,9 +223,11 @@ struct GetReadVersionRequest : TimedRequest { ReplyPromise reply; GetReadVersionRequest() : transactionCount(1), flags(0) {} - GetReadVersionRequest(uint32_t transactionCount, TransactionPriority priority, uint32_t flags = 0, TransactionTagMap tags = TransactionTagMap(), Optional debugID = Optional()) - : transactionCount(transactionCount), priority(priority), flags(flags), tags(tags), debugID(debugID) - { + GetReadVersionRequest(SpanID spanContext, uint32_t transactionCount, TransactionPriority priority, + uint32_t flags = 0, TransactionTagMap tags = TransactionTagMap(), + Optional debugID = Optional()) + : spanContext(spanContext), transactionCount(transactionCount), priority(priority), flags(flags), tags(tags), + debugID(debugID) { flags = flags & ~FLAG_PRIORITY_MASK; switch(priority) { case TransactionPriority::BATCH: @@ -239,12 +243,12 @@ struct GetReadVersionRequest : TimedRequest { ASSERT(false); } } - + bool operator < (GetReadVersionRequest const& rhs) const { return priority < rhs.priority; } template void serialize(Ar& ar) { - serializer(ar, transactionCount, flags, tags, debugID, reply); + serializer(ar, transactionCount, flags, tags, debugID, reply, spanContext); if(ar.isDeserializing) { if((flags & PRIORITY_SYSTEM_IMMEDIATE) == PRIORITY_SYSTEM_IMMEDIATE) { @@ -277,6 +281,7 @@ struct GetKeyServerLocationsReply { struct GetKeyServerLocationsRequest { constexpr static FileIdentifier file_identifier = 9144680; Arena arena; + SpanID spanContext; KeyRef begin; Optional end; int limit; @@ -284,24 +289,28 @@ struct GetKeyServerLocationsRequest { ReplyPromise reply; GetKeyServerLocationsRequest() : limit(0), reverse(false) {} - GetKeyServerLocationsRequest( KeyRef const& begin, Optional const& end, int limit, bool reverse, Arena const& arena ) : begin( begin ), end( end ), limit( limit ), reverse( reverse ), arena( arena ) {} - - template + GetKeyServerLocationsRequest(SpanID spanContext, KeyRef const& begin, Optional const& end, int limit, + bool reverse, Arena const& arena) + : spanContext(spanContext), begin(begin), end(end), limit(limit), reverse(reverse), arena(arena) {} + + template void serialize(Ar& ar) { - serializer(ar, begin, end, limit, reverse, reply, arena); + serializer(ar, begin, end, limit, reverse, reply, spanContext, arena); } }; struct GetRawCommittedVersionRequest { constexpr static FileIdentifier file_identifier = 12954034; + SpanID spanContext; Optional debugID; ReplyPromise reply; - explicit GetRawCommittedVersionRequest(Optional const& debugID = Optional()) : debugID(debugID) {} + explicit GetRawCommittedVersionRequest(SpanID spanContext, Optional const& debugID = Optional()) : spanContext(spanContext), debugID(debugID) {} + explicit GetRawCommittedVersionRequest() : spanContext(), debugID() {} template void serialize( Ar& ar ) { - serializer(ar, debugID, reply); + serializer(ar, debugID, reply, spanContext); } }; diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 26cfe0931c..1a13c709d8 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -47,6 +47,7 @@ #include "fdbclient/SpecialKeySpace.actor.h" #include "fdbclient/StorageServerInterface.h" #include "fdbclient/SystemData.h" +#include "fdbclient/versions.h" #include "fdbrpc/LoadBalance.h" #include "fdbrpc/Net2FileSystem.h" #include "fdbrpc/simulator.h" @@ -54,18 +55,17 @@ #include "flow/ActorCollection.h" #include "flow/DeterministicRandom.h" #include "flow/Error.h" +#include "flow/IRandom.h" #include "flow/flow.h" #include "flow/genericactors.actor.h" #include "flow/Knobs.h" #include "flow/Platform.h" #include "flow/SystemMonitor.h" #include "flow/TLSConfig.actor.h" -#include "flow/Trace.h" +#include "flow/Tracing.h" #include "flow/UnitTest.h" #include "flow/serialize.h" -#include "fdbclient/versions.h" - #ifdef WIN32 #define WIN32_LEAN_AND_MEAN #include @@ -1628,7 +1628,10 @@ ACTOR Future>> transactionalGetServerInt } //If isBackward == true, returns the shard containing the key before 'key' (an infinitely long, inexpressible key). Otherwise returns the shard containing key -ACTOR Future< pair> > getKeyLocation_internal( Database cx, Key key, TransactionInfo info, bool isBackward = false ) { +ACTOR Future>> getKeyLocation_internal(Database cx, Key key, + TransactionInfo info, + bool isBackward = false) { + state Span span("NAPI:getKeyLocation"_loc, info.spanID); if (isBackward) { ASSERT( key != allKeys.begin && key <= allKeys.end ); } else { @@ -1642,7 +1645,10 @@ ACTOR Future< pair> > getKeyLocation_internal( ++cx->transactionKeyServerLocationRequests; choose { when ( wait( cx->onMasterProxiesChanged() ) ) {} - when ( GetKeyServerLocationsReply rep = wait( basicLoadBalance( cx->getMasterProxies(info.useProvisionalProxies), &MasterProxyInterface::getKeyServersLocations, GetKeyServerLocationsRequest(key, Optional(), 100, isBackward, key.arena()), TaskPriority::DefaultPromiseEndpoint ) ) ) { + when(GetKeyServerLocationsReply rep = wait(basicLoadBalance( + cx->getMasterProxies(info.useProvisionalProxies), &MasterProxyInterface::getKeyServersLocations, + GetKeyServerLocationsRequest(span.context, key, Optional(), 100, isBackward, key.arena()), + TaskPriority::DefaultPromiseEndpoint))) { ++cx->transactionKeyServerLocationRequestsCompleted; if( info.debugID.present() ) g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKeyLocation.After"); @@ -1677,7 +1683,10 @@ Future>> getKeyLocation(Database const& c return ssi; } -ACTOR Future< vector< pair> > > getKeyRangeLocations_internal( Database cx, KeyRange keys, int limit, bool reverse, TransactionInfo info ) { +ACTOR Future>>> getKeyRangeLocations_internal(Database cx, KeyRange keys, + int limit, bool reverse, + TransactionInfo info) { + state Span span("NAPI:getKeyRangeLocations"_loc, info.spanID); if( info.debugID.present() ) g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKeyLocations.Before"); @@ -1685,7 +1694,10 @@ ACTOR Future< vector< pair> > > getKeyRangeLoca ++cx->transactionKeyServerLocationRequests; choose { when ( wait( cx->onMasterProxiesChanged() ) ) {} - when ( GetKeyServerLocationsReply _rep = wait( basicLoadBalance( cx->getMasterProxies(info.useProvisionalProxies), &MasterProxyInterface::getKeyServersLocations, GetKeyServerLocationsRequest(keys.begin, keys.end, limit, reverse, keys.arena()), TaskPriority::DefaultPromiseEndpoint ) ) ) { + when(GetKeyServerLocationsReply _rep = wait(basicLoadBalance( + cx->getMasterProxies(info.useProvisionalProxies), &MasterProxyInterface::getKeyServersLocations, + GetKeyServerLocationsRequest(span.context, keys.begin, keys.end, limit, reverse, keys.arena()), + TaskPriority::DefaultPromiseEndpoint))) { ++cx->transactionKeyServerLocationRequestsCompleted; state GetKeyServerLocationsReply rep = _rep; if( info.debugID.present() ) @@ -1776,6 +1788,7 @@ Future Transaction::warmRange(Database cx, KeyRange keys) { ACTOR Future> getValue( Future version, Key key, Database cx, TransactionInfo info, Reference trLogInfo, TagSet tags ) { state Version ver = wait( version ); + state Span span("NAPI:getValue"_loc, info.spanID); cx->validateVersion(ver); loop { @@ -1808,10 +1821,12 @@ ACTOR Future> getValue( Future version, Key key, Databa } choose { when(wait(cx->connectionFileChanged())) { throw transaction_too_old(); } - when(GetValueReply _reply = - wait(loadBalance(cx.getPtr(), ssi.second, &StorageServerInterface::getValue, - GetValueRequest(key, ver, cx->sampleReadTags() ? tags : Optional(), getValueID), TaskPriority::DefaultPromiseEndpoint, false, - cx->enableLocalityLoadBalance ? &cx->queueModel : nullptr))) { + when(GetValueReply _reply = wait( + loadBalance(cx.getPtr(), ssi.second, &StorageServerInterface::getValue, + GetValueRequest(span.context, key, ver, + cx->sampleReadTags() ? tags : Optional(), getValueID), + TaskPriority::DefaultPromiseEndpoint, false, + cx->enableLocalityLoadBalance ? &cx->queueModel : nullptr))) { reply = _reply; } } @@ -1869,6 +1884,7 @@ ACTOR Future getKey( Database cx, KeySelector k, Future version, T wait(success(version)); state Optional getKeyID = Optional(); + state Span span("NAPI:getKey"_loc, info.spanID); if( info.debugID.present() ) { getKeyID = nondeterministicRandom()->randomUniqueID(); @@ -1897,9 +1913,11 @@ ACTOR Future getKey( Database cx, KeySelector k, Future version, T choose { when(wait(cx->connectionFileChanged())) { throw transaction_too_old(); } when(GetKeyReply _reply = - wait(loadBalance(cx.getPtr(), ssi.second, &StorageServerInterface::getKey, GetKeyRequest(k, version.get(), cx->sampleReadTags() ? tags : Optional(), getKeyID), - TaskPriority::DefaultPromiseEndpoint, false, - cx->enableLocalityLoadBalance ? &cx->queueModel : nullptr))) { + wait(loadBalance(cx.getPtr(), ssi.second, &StorageServerInterface::getKey, + GetKeyRequest(span.context, k, version.get(), + cx->sampleReadTags() ? tags : Optional(), getKeyID), + TaskPriority::DefaultPromiseEndpoint, false, + cx->enableLocalityLoadBalance ? &cx->queueModel : nullptr))) { reply = _reply; } } @@ -1932,12 +1950,15 @@ ACTOR Future getKey( Database cx, KeySelector k, Future version, T } } -ACTOR Future waitForCommittedVersion( Database cx, Version version ) { +ACTOR Future waitForCommittedVersion( Database cx, Version version, SpanID spanContext ) { + state Span span("NAPI:waitForCommittedVersion"_loc, { spanContext }); try { loop { choose { when ( wait( cx->onMasterProxiesChanged() ) ) {} - when ( GetReadVersionReply v = wait( basicLoadBalance( cx->getMasterProxies(false), &MasterProxyInterface::getConsistentReadVersion, GetReadVersionRequest( 0, TransactionPriority::IMMEDIATE ), cx->taskID ) ) ) { + when(GetReadVersionReply v = wait(basicLoadBalance( + cx->getMasterProxies(false), &MasterProxyInterface::getConsistentReadVersion, + GetReadVersionRequest(span.context, 0, TransactionPriority::IMMEDIATE), cx->taskID))) { cx->minAcceptableReadVersion = std::min(cx->minAcceptableReadVersion, v.version); if (v.version >= version) @@ -1953,11 +1974,14 @@ ACTOR Future waitForCommittedVersion( Database cx, Version version ) { } } -ACTOR Future getRawVersion( Database cx ) { +ACTOR Future getRawVersion( Database cx, SpanID spanContext ) { + state Span span("NAPI:getRawVersion"_loc, { spanContext }); loop { choose { when ( wait( cx->onMasterProxiesChanged() ) ) {} - when ( GetReadVersionReply v = wait( basicLoadBalance( cx->getMasterProxies(false), &MasterProxyInterface::getConsistentReadVersion, GetReadVersionRequest( 0, TransactionPriority::IMMEDIATE ), cx->taskID ) ) ) { + when(GetReadVersionReply v = + wait(basicLoadBalance(cx->getMasterProxies(false), &MasterProxyInterface::getConsistentReadVersion, + GetReadVersionRequest(spanContext, 0, TransactionPriority::IMMEDIATE), cx->taskID))) { return v.version; } } @@ -1971,6 +1995,7 @@ ACTOR Future readVersionBatcher( ACTOR Future watchValue(Future version, Key key, Optional value, Database cx, TransactionInfo info, TagSet tags) { state Version ver = wait( version ); + state Span span("NAPI:watchValue"_loc, info.spanID); cx->validateVersion(ver); ASSERT(ver != latestVersion); @@ -1987,9 +2012,11 @@ ACTOR Future watchValue(Future version, Key key, Optional } state WatchValueReply resp; choose { - when(WatchValueReply r = wait(loadBalance(cx.getPtr(), ssi.second, &StorageServerInterface::watchValue, - WatchValueRequest(key, value, ver, cx->sampleReadTags() ? tags : Optional(), watchValueID), - TaskPriority::DefaultPromiseEndpoint))) { + when(WatchValueReply r = wait( + loadBalance(cx.getPtr(), ssi.second, &StorageServerInterface::watchValue, + WatchValueRequest(span.context, key, value, ver, + cx->sampleReadTags() ? tags : Optional(), watchValueID), + TaskPriority::DefaultPromiseEndpoint))) { resp = r; } when(wait(cx->connectionFile ? cx->connectionFile->onChange() : Never())) { wait(Never()); } @@ -2000,7 +2027,7 @@ ACTOR Future watchValue(Future version, Key key, Optional //FIXME: wait for known committed version on the storage server before replying, //cannot do this until the storage server is notified on knownCommittedVersion changes from tlog (faster than the current update loop) - Version v = wait(waitForCommittedVersion(cx, resp.version)); + Version v = wait(waitForCommittedVersion(cx, resp.version, span.context)); //TraceEvent("WatcherCommitted").detail("CommittedVersion", v).detail("WatchVersion", resp.version).detail("Key", key ).detail("Value", value); @@ -2053,6 +2080,7 @@ ACTOR Future> getExactRange( Database cx, Version ver KeyRange keys, GetRangeLimits limits, bool reverse, TransactionInfo info, TagSet tags ) { state Standalone output; + state Span span("NAPI:getExactRange"_loc, info.spanID); //printf("getExactRange( '%s', '%s' )\n", keys.begin.toString().c_str(), keys.end.toString().c_str()); loop { @@ -2066,6 +2094,7 @@ ACTOR Future> getExactRange( Database cx, Version ver req.version = version; req.begin = firstGreaterOrEqual( range.begin ); req.end = firstGreaterOrEqual( range.end ); + req.spanContext = span.context; transformRangeLimits(limits, reverse, req); ASSERT(req.limitBytes > 0 && req.limit != 0 && req.limit < 0 == reverse); @@ -2310,6 +2339,7 @@ ACTOR Future> getRange( Database cx, Reference output; + state Span span("NAPI:getRange"_loc, info.spanID); try { state Version version = wait( fVersion ); @@ -2362,6 +2392,7 @@ ACTOR Future> getRange( Database cx, ReferencesampleReadTags() ? tags : Optional(); req.debugID = info.debugID; + req.spanContext = span.context; try { if( info.debugID.present() ) { g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getRange.Before"); @@ -2556,10 +2587,11 @@ void debugAddTags(Transaction *tr) { } -Transaction::Transaction( Database const& cx ) - : cx(cx), info(cx->taskID), backoff(CLIENT_KNOBS->DEFAULT_BACKOFF), committedVersion(invalidVersion), versionstampPromise(Promise>()), options(cx), numErrors(0), trLogInfo(createTrLogInfoProbabilistically(cx)) -{ - if(DatabaseContext::debugUseTags) { +Transaction::Transaction(Database const& cx) + : cx(cx), info(cx->taskID, deterministicRandom()->randomUniqueID()), backoff(CLIENT_KNOBS->DEFAULT_BACKOFF), + committedVersion(invalidVersion), versionstampPromise(Promise>()), options(cx), numErrors(0), + trLogInfo(createTrLogInfoProbabilistically(cx)), span(info.spanID, "Transaction"_loc) { + if (DatabaseContext::debugUseTags) { debugAddTags(this); } } @@ -2699,7 +2731,7 @@ ACTOR Future watch(Reference watch, Database cx, TagSet tags, Trans } Future Transaction::getRawReadVersion() { - return ::getRawVersion(cx); + return ::getRawVersion(cx, info.spanID); } Future< Void > Transaction::watch( Reference watch ) { @@ -3062,6 +3094,8 @@ void Transaction::reset() { void Transaction::fullReset() { reset(); + span = Span(span.location); + info.spanID = span.context; backoff = CLIENT_KNOBS->DEFAULT_BACKOFF; } @@ -3138,7 +3172,7 @@ ACTOR void checkWrites( Database cx, Future committed, Promise outCo tr.setVersion( version ); state int checkedRanges = 0; state KeyRangeMap::Ranges ranges = expectedValues.ranges(); - state KeyRangeMap::Iterator it = ranges.begin(); + state KeyRangeMap::iterator it = ranges.begin(); for(; it != ranges.end(); ++it) { state MutationBlock m = it->value(); if( m.mutated ) { @@ -3178,6 +3212,8 @@ ACTOR void checkWrites( Database cx, Future committed, Promise outCo ACTOR static Future commitDummyTransaction( Database cx, KeyRange range, TransactionInfo info, TransactionOptions options ) { state Transaction tr(cx); state int retries = 0; + state Span span("NAPI:dummyTransaction"_loc, info.spanID); + tr.span.addParent(span.context); loop { try { TraceEvent("CommitDummyTransaction").detail("Key", range.begin).detail("Retries", retries); @@ -3224,6 +3260,8 @@ void Transaction::setupWatches() { ACTOR static Future tryCommit( Database cx, Reference trLogInfo, CommitTransactionRequest req, Future readVersion, TransactionInfo info, Version* pCommittedVersion, Transaction* tr, TransactionOptions options) { state TraceInterval interval( "TransactionCommit" ); state double startTime = now(); + state Span span("NAPI:tryCommit"_loc, info.spanID); + req.spanContext = span.context; if (info.debugID.present()) TraceEvent(interval.begin()).detail( "Parent", info.debugID.get() ); try { @@ -3677,6 +3715,14 @@ void Transaction::setOption( FDBTransactionOptions::Option option, Optional(value.get(), Unversioned())); + break; + case FDBTransactionOptions::REPORT_CONFLICTING_KEYS: validateOptionValue(value, false); options.reportConflictingKeys = true; @@ -3687,13 +3733,16 @@ void Transaction::setOption( FDBTransactionOptions::Option option, Optional getConsistentReadVersion( DatabaseContext *cx, uint32_t transactionCount, TransactionPriority priority, uint32_t flags, TransactionTagMap tags, Optional debugID ) { +ACTOR Future getConsistentReadVersion(SpanID parentSpan, DatabaseContext* cx, uint32_t transactionCount, + TransactionPriority priority, uint32_t flags, + TransactionTagMap tags, Optional debugID) { + state Span span("NAPI:getConsistentReadVersion"_loc, parentSpan); try { ++cx->transactionReadVersionBatches; if( debugID.present() ) g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "NativeAPI.getConsistentReadVersion.Before"); loop { - state GetReadVersionRequest req( transactionCount, priority, flags, tags, debugID ); + state GetReadVersionRequest req( span.context, transactionCount, priority, flags, tags, debugID ); choose { when ( wait( cx->onMasterProxiesChanged() ) ) {} when ( GetReadVersionReply v = wait( basicLoadBalance( cx->getMasterProxies(flags & GetReadVersionRequest::FLAG_USE_PROVISIONAL_PROXIES), &MasterProxyInterface::getConsistentReadVersion, req, cx->taskID ) ) ) { @@ -3744,6 +3793,7 @@ ACTOR Future readVersionBatcher( DatabaseContext *cx, FutureStream replyTimes; state PromiseStream _errorStream; state double batchTime = 0; + state Span span("NAPI:readVersionBatcher"_loc); loop { send_batch = false; choose { @@ -3754,6 +3804,7 @@ ACTOR Future readVersionBatcher( DatabaseContext *cx, FutureStream readVersionBatcher( DatabaseContext *cx, FutureStream batch = incrementalBroadcastWithError( - getConsistentReadVersion(cx, count, priority, flags, std::move(tags), std::move(debugID)), + getConsistentReadVersion(span.context, cx, count, priority, flags, std::move(tags), std::move(debugID)), std::move(requests), CLIENT_KNOBS->BROADCAST_BATCH_SIZE); + span = Span("NAPI:readVersionBatcher"_loc); tags.clear(); debugID = Optional(); requests.clear(); @@ -3793,7 +3845,11 @@ ACTOR Future readVersionBatcher( DatabaseContext *cx, FutureStream extractReadVersion(DatabaseContext* cx, TransactionPriority priority, Reference trLogInfo, Future f, bool lockAware, double startTime, Promise> metadataVersion, TagSet tags) { +ACTOR Future extractReadVersion(SpanID parentSpan, DatabaseContext* cx, TransactionPriority priority, + Reference trLogInfo, Future f, + bool lockAware, double startTime, Promise> metadataVersion, + TagSet tags) { + // parentSpan here is only used to keep the parent alive until the request completes GetReadVersionReply rep = wait(f); double latency = now() - startTime; cx->GRVLatencies.addSample(latency); @@ -3915,10 +3971,12 @@ Future Transaction::getReadVersion(uint32_t flags) { batcher.actor = readVersionBatcher( cx.getPtr(), batcher.stream.getFuture(), options.priority, flags ); } - auto const req = DatabaseContext::VersionRequest(options.tags, info.debugID); + Span span("NAPI:getReadVersion"_loc, info.spanID); + auto const req = DatabaseContext::VersionRequest(span.context, options.tags, info.debugID); batcher.stream.send(req); startTime = now(); - readVersion = extractReadVersion( cx.getPtr(), options.priority, trLogInfo, req.reply.getFuture(), options.lockAware, startTime, metadataVersion, options.tags); + readVersion = extractReadVersion(span.context, cx.getPtr(), options.priority, trLogInfo, req.reply.getFuture(), + options.lockAware, startTime, metadataVersion, options.tags); } return readVersion; } @@ -4015,9 +4073,10 @@ ACTOR Future doGetStorageMetrics(Database cx, KeyRangeRef keys, } ACTOR Future getStorageMetricsLargeKeyRange(Database cx, KeyRangeRef keys) { - - vector>> locations = wait(getKeyRangeLocations( - cx, keys, std::numeric_limits::max(), false, &StorageServerInterface::waitMetrics, TransactionInfo(TaskPriority::DataDistribution))); + state Span span("NAPI:GetStorageMetricsLargeKeyRange"_loc); + vector>> locations = wait( + getKeyRangeLocations(cx, keys, std::numeric_limits::max(), false, &StorageServerInterface::waitMetrics, + TransactionInfo(TaskPriority::DataDistribution, span.context))); state int nLocs = locations.size(); state vector> fx(nLocs); state StorageMetrics total; @@ -4098,12 +4157,13 @@ ACTOR Future< StorageMetrics > extractMetrics( Future>> getReadHotRanges(Database cx, KeyRange keys) { + state Span span("NAPI:GetReadHotRanges"_loc); loop { int64_t shardLimit = 100; // Shard limit here does not really matter since this function is currently only used // to find the read-hot sub ranges within a read-hot shard. vector>> locations = wait(getKeyRangeLocations(cx, keys, shardLimit, false, &StorageServerInterface::getReadHotRanges, - TransactionInfo(TaskPriority::DataDistribution))); + TransactionInfo(TaskPriority::DataDistribution, span.context))); try { // TODO: how to handle this? // This function is called whenever a shard becomes read-hot. But somehow the shard was splitted across more @@ -4151,9 +4211,12 @@ ACTOR Future< std::pair, int> > waitStorageMetrics( int shardLimit, int expectedShardCount ) { + state Span span("NAPI:WaitStorageMetrics"_loc); loop { - vector< pair> > locations = wait( getKeyRangeLocations( cx, keys, shardLimit, false, &StorageServerInterface::waitMetrics, TransactionInfo(TaskPriority::DataDistribution) ) ); - if(expectedShardCount >= 0 && locations.size() != expectedShardCount) { + vector>> locations = + wait(getKeyRangeLocations(cx, keys, shardLimit, false, &StorageServerInterface::waitMetrics, + TransactionInfo(TaskPriority::DataDistribution, span.context))); + if (expectedShardCount >= 0 && locations.size() != expectedShardCount) { return std::make_pair(Optional(), locations.size()); } @@ -4236,8 +4299,11 @@ Future>> Transaction::getReadHotRanges(KeyRang ACTOR Future< Standalone> > splitStorageMetrics( Database cx, KeyRange keys, StorageMetrics limit, StorageMetrics estimated ) { + state Span span("NAPI:SplitStorageMetrics"_loc); loop { - state vector< pair> > locations = wait( getKeyRangeLocations( cx, keys, CLIENT_KNOBS->STORAGE_METRICS_SHARD_LIMIT, false, &StorageServerInterface::splitMetrics, TransactionInfo(TaskPriority::DataDistribution) ) ); + state vector>> locations = wait(getKeyRangeLocations( + cx, keys, CLIENT_KNOBS->STORAGE_METRICS_SHARD_LIMIT, false, &StorageServerInterface::splitMetrics, + TransactionInfo(TaskPriority::DataDistribution, span.context))); state StorageMetrics used; state Standalone> results; diff --git a/fdbclient/NativeAPI.actor.h b/fdbclient/NativeAPI.actor.h index 4e75cf05d7..9b564f852a 100644 --- a/fdbclient/NativeAPI.actor.h +++ b/fdbclient/NativeAPI.actor.h @@ -19,6 +19,8 @@ */ #pragma once +#include "flow/IRandom.h" +#include "flow/Tracing.h" #if defined(NO_INTELLISENSE) && !defined(FDBCLIENT_NATIVEAPI_ACTOR_G_H) #define FDBCLIENT_NATIVEAPI_ACTOR_G_H #include "fdbclient/NativeAPI.actor.g.h" @@ -152,13 +154,15 @@ class ReadYourWritesTransaction; // workaround cyclic dependency struct TransactionInfo { Optional debugID; TaskPriority taskID; + SpanID spanID; bool useProvisionalProxies; // Used to save conflicting keys if FDBTransactionOptions::REPORT_CONFLICTING_KEYS is enabled // prefix/ : '1' - any keys equal or larger than this key are (probably) conflicting keys // prefix/ : '0' - any keys equal or larger than this key are (definitely) not conflicting keys std::shared_ptr> conflictingKeys; - explicit TransactionInfo( TaskPriority taskID ) : taskID(taskID), useProvisionalProxies(false) {} + explicit TransactionInfo(TaskPriority taskID, SpanID spanID) + : taskID(taskID), spanID(spanID), useProvisionalProxies(false) {} }; struct TransactionLogInfo : public ReferenceCounted, NonCopyable { @@ -283,7 +287,9 @@ public: void flushTrLogsIfEnabled(); // These are to permit use as state variables in actors: - Transaction() : info( TaskPriority::DefaultEndpoint ) {} + Transaction() + : info(TaskPriority::DefaultEndpoint, deterministicRandom()->randomUniqueID()), + span(info.spanID, "Transaction"_loc) {} void operator=(Transaction&& r) noexcept; void reset(); @@ -309,6 +315,7 @@ public: } static Reference createTrLogInfoProbabilistically(const Database& cx); TransactionOptions options; + Span span; double startTime; Reference trLogInfo; @@ -336,7 +343,7 @@ private: Future committing; }; -ACTOR Future waitForCommittedVersion(Database cx, Version version); +ACTOR Future waitForCommittedVersion(Database cx, Version version, SpanID spanContext); ACTOR Future>> waitDataDistributionMetricsList(Database cx, KeyRange keys, int shardLimit); diff --git a/fdbclient/SpecialKeySpace.actor.cpp b/fdbclient/SpecialKeySpace.actor.cpp index ca4683f037..2ff81af6b5 100644 --- a/fdbclient/SpecialKeySpace.actor.cpp +++ b/fdbclient/SpecialKeySpace.actor.cpp @@ -115,7 +115,7 @@ ACTOR Future normalizeKeySelectorActor(SpecialKeySpace* sks, ReadYourWrite KeyRangeRef boundary, int* actualOffset, Standalone* result, Optional>* cache) { - state RangeMap::Iterator iter = + state RangeMap::iterator iter = ks->offset < 1 ? sks->getImpls().rangeContainingKeyBefore(ks->getKey()) : sks->getImpls().rangeContaining(ks->getKey()); while ((ks->offset < 1 && iter->begin() > boundary.begin) || (ks->offset > 1 && iter->begin() < boundary.end)) { @@ -164,7 +164,7 @@ ACTOR Future> SpecialKeySpace::getRangeAggregationAct // KeySelector, GetRangeLimits and reverse are all handled here state Standalone result; state Standalone pairs; - state RangeMap::Iterator iter; + state RangeMap::iterator iter; state int actualBeginOffset; state int actualEndOffset; state KeyRangeRef moduleBoundary; diff --git a/fdbclient/StorageServerInterface.h b/fdbclient/StorageServerInterface.h index cfd8c54ec7..bbf97c7047 100644 --- a/fdbclient/StorageServerInterface.h +++ b/fdbclient/StorageServerInterface.h @@ -169,6 +169,7 @@ struct GetValueReply : public LoadBalancedReply { struct GetValueRequest : TimedRequest { constexpr static FileIdentifier file_identifier = 8454530; + SpanID spanContext; Key key; Version version; Optional tags; @@ -176,11 +177,12 @@ struct GetValueRequest : TimedRequest { ReplyPromise reply; GetValueRequest(){} - GetValueRequest(const Key& key, Version ver, Optional tags, Optional debugID) : key(key), version(ver), tags(tags), debugID(debugID) {} - - template + GetValueRequest(SpanID spanContext, const Key& key, Version ver, Optional tags, Optional debugID) + : spanContext(spanContext), key(key), version(ver), tags(tags), debugID(debugID) {} + + template void serialize( Ar& ar ) { - serializer(ar, key, version, tags, debugID, reply); + serializer(ar, key, version, tags, debugID, reply, spanContext); } }; @@ -200,6 +202,7 @@ struct WatchValueReply { struct WatchValueRequest { constexpr static FileIdentifier file_identifier = 14747733; + SpanID spanContext; Key key; Optional value; Version version; @@ -208,11 +211,13 @@ struct WatchValueRequest { ReplyPromise reply; WatchValueRequest(){} - WatchValueRequest(const Key& key, Optional value, Version ver, Optional tags, Optional debugID) : key(key), value(value), version(ver), tags(tags), debugID(debugID) {} - - template + WatchValueRequest(SpanID spanContext, const Key& key, Optional value, Version ver, Optional tags, + Optional debugID) + : spanContext(spanContext), key(key), value(value), version(ver), tags(tags), debugID(debugID) {} + + template void serialize( Ar& ar ) { - serializer(ar, key, value, version, tags, debugID, reply); + serializer(ar, key, value, version, tags, debugID, reply, spanContext); } }; @@ -234,6 +239,7 @@ struct GetKeyValuesReply : public LoadBalancedReply { struct GetKeyValuesRequest : TimedRequest { constexpr static FileIdentifier file_identifier = 6795746; + SpanID spanContext; Arena arena; KeySelectorRef begin, end; Version version; // or latestVersion @@ -246,7 +252,7 @@ struct GetKeyValuesRequest : TimedRequest { GetKeyValuesRequest() : isFetchKeys(false) {} template void serialize( Ar& ar ) { - serializer(ar, begin, end, version, limit, limitBytes, isFetchKeys, tags, debugID, reply, arena); + serializer(ar, begin, end, version, limit, limitBytes, isFetchKeys, tags, debugID, reply, spanContext, arena); } }; @@ -266,6 +272,7 @@ struct GetKeyReply : public LoadBalancedReply { struct GetKeyRequest : TimedRequest { constexpr static FileIdentifier file_identifier = 10457870; + SpanID spanContext; Arena arena; KeySelectorRef sel; Version version; // or latestVersion @@ -274,11 +281,13 @@ struct GetKeyRequest : TimedRequest { ReplyPromise reply; GetKeyRequest() {} - GetKeyRequest(KeySelectorRef const& sel, Version version, Optional tags, Optional debugID) : sel(sel), version(version), debugID(debugID) {} + GetKeyRequest(SpanID spanContext, KeySelectorRef const& sel, Version version, Optional tags, + Optional debugID) + : spanContext(spanContext), sel(sel), version(version), debugID(debugID) {} template void serialize( Ar& ar ) { - serializer(ar, sel, version, tags, debugID, reply, arena); + serializer(ar, sel, version, tags, debugID, reply, spanContext, arena); } }; diff --git a/fdbclient/vexillographer/fdb.options b/fdbclient/vexillographer/fdb.options index d7463e5845..86af0f3f5c 100644 --- a/fdbclient/vexillographer/fdb.options +++ b/fdbclient/vexillographer/fdb.options @@ -268,6 +268,8 @@ description is not currently required but encouraged. description="Adds a tag to the transaction that can be used to apply manual targeted throttling. At most 5 tags can be set on a transaction." />