merge master

This commit is contained in:
Xiaoxi Wang 2020-07-13 17:22:37 +00:00
commit 6ec2f92a8d
45 changed files with 1358 additions and 481 deletions

View File

@ -3,7 +3,7 @@ set(SRCS
Properties/AssemblyInfo.cs)
set(TEST_HARNESS_REFERENCES
"-r:System,System.Core,System.Xml.Linq,System.Data.DataSetExtensions,Microsoft.CSharp,System.Data,System.Xml,System.Runtime.Serialization,${TraceLogHelperDll}")
"-r:System,System.Core,System.Xml.Linq,System.Data.DataSetExtensions,Microsoft.CSharp,System.Data,System.Xml,${TraceLogHelperDll}")
set(out_file ${CMAKE_BINARY_DIR}/packages/bin/TestHarness.exe)

View File

@ -29,7 +29,6 @@ using System.Diagnostics;
using System.ComponentModel;
using System.Runtime.InteropServices;
using System.Xml;
using System.Runtime.Serialization.Json;
namespace SummarizeTest
{
@ -360,22 +359,20 @@ namespace SummarizeTest
{
ErrorOutputListener errorListener = new ErrorOutputListener();
process.StartInfo.UseShellExecute = false;
string tlsPluginArg = "";
if (tlsPluginFile.Length > 0) {
process.StartInfo.EnvironmentVariables["FDB_TLS_PLUGIN"] = tlsPluginFile;
tlsPluginArg = "--tls_plugin=" + tlsPluginFile;
}
process.StartInfo.RedirectStandardOutput = true;
var args = "";
if (willRestart && oldBinaryName.EndsWith("alpha6"))
{
args = string.Format("-Rs 1000000000 -r simulation {0} -s {1} -f \"{2}\" -b {3} {4} --crash",
IsRunningOnMono() ? "" : "-q", seed, testFile, buggify ? "on" : "off", tlsPluginArg);
args = string.Format("-Rs 1000000000 -r simulation {0} -s {1} -f \"{2}\" -b {3} --tls_plugin={4} --crash",
IsRunningOnMono() ? "" : "-q", seed, testFile, buggify ? "on" : "off", tlsPluginFile);
}
else
{
args = string.Format("-Rs 1GB -r simulation {0} -s {1} -f \"{2}\" -b {3} {4} --crash",
IsRunningOnMono() ? "" : "-q", seed, testFile, buggify ? "on" : "off", tlsPluginArg);
args = string.Format("-Rs 1GB -r simulation {0} -s {1} -f \"{2}\" -b {3} --tls_plugin={4} --crash",
IsRunningOnMono() ? "" : "-q", seed, testFile, buggify ? "on" : "off", tlsPluginFile);
}
if (restarting) args = args + " --restarting";
if (useValgrind && !willRestart)
@ -480,7 +477,7 @@ namespace SummarizeTest
memCheckThread.Join();
consoleThread.Join();
var traceFiles = Directory.GetFiles(tempPath, "trace*.*", SearchOption.TopDirectoryOnly).Where(s => s.EndsWith(".xml") || s.EndsWith(".json")).ToArray();
var traceFiles = Directory.GetFiles(tempPath, "trace*.xml");
if (traceFiles.Length == 0)
{
if (!traceToStdout)
@ -661,10 +658,6 @@ namespace SummarizeTest
return whats.ToArray();
}
delegate IEnumerable<Magnesium.Event> parseDelegate(System.IO.Stream stream, string file,
bool keepOriginalElement = false, double startTime = -1, double endTime = Double.MaxValue,
double samplingFactor = 1.0);
static int Summarize(string[] traceFiles, string summaryFileName,
string errorFileName, bool? killed, List<string> outputErrors, int? exitCode, long? peakMemory,
string uid, string valgrindOutputFileName, int expectedUnseed, out int unseed, out bool retryableError, bool logOnRetryableError,
@ -696,12 +689,7 @@ namespace SummarizeTest
{
try
{
parseDelegate parse;
if (traceFileName.EndsWith(".json"))
parse = Magnesium.JsonParser.Parse;
else
parse = Magnesium.XmlParser.Parse;
foreach (var ev in parse(traceFile, traceFileName))
foreach (var ev in Magnesium.XmlParser.Parse(traceFile, traceFileName))
{
Magnesium.Severity newSeverity;
if (severityMap.TryGetValue(new KeyValuePair<string, Magnesium.Severity>(ev.Type, ev.Severity), out newSeverity))
@ -1101,19 +1089,10 @@ namespace SummarizeTest
private static void AppendToSummary(string summaryFileName, XElement xout, bool traceToStdout = false, bool shouldLock = true)
{
bool useXml = true;
if (summaryFileName.EndsWith(".json"))
useXml = false;
if (traceToStdout)
{
if (useXml) {
using (var wr = System.Xml.XmlWriter.Create(Console.OpenStandardOutput(), new System.Xml.XmlWriterSettings() { OmitXmlDeclaration = true, Encoding = new System.Text.UTF8Encoding(false) }))
xout.WriteTo(wr);
} else {
using (var wr = System.Runtime.Serialization.Json.JsonReaderWriterFactory.CreateJsonWriter(Console.OpenStandardOutput()))
xout.WriteTo(wr);
}
using (var wr = System.Xml.XmlWriter.Create(Console.OpenStandardOutput(), new System.Xml.XmlWriterSettings() { OmitXmlDeclaration = true, Encoding = new System.Text.UTF8Encoding(false) }))
xout.WriteTo(wr);
Console.WriteLine();
return;
}
@ -1124,6 +1103,7 @@ namespace SummarizeTest
takeLock(summaryFileName);
try
{
using (var f = System.IO.File.Open(summaryFileName, System.IO.FileMode.Append, System.IO.FileAccess.Write))
{
if (f.Length == 0)
@ -1131,13 +1111,8 @@ namespace SummarizeTest
byte[] bytes = Encoding.UTF8.GetBytes("<Trace>");
f.Write(bytes, 0, bytes.Length);
}
if (useXml) {
using (var wr = System.Xml.XmlWriter.Create(f, new System.Xml.XmlWriterSettings() { OmitXmlDeclaration = true }))
xout.Save(wr);
} else {
using (var wr = System.Runtime.Serialization.Json.JsonReaderWriterFactory.CreateJsonWriter(f))
xout.WriteTo(wr);
}
using (var wr = System.Xml.XmlWriter.Create(f, new System.Xml.XmlWriterSettings() { OmitXmlDeclaration = true }))
xout.Save(wr);
var endl = Encoding.UTF8.GetBytes(Environment.NewLine);
f.Write(endl, 0, endl.Length);
}
@ -1148,7 +1123,6 @@ namespace SummarizeTest
releaseLock(summaryFileName);
}
}
private static void AppendXmlMessageToSummary(string summaryFileName, XElement xout, bool traceToStdout = false, string testFile = null,
int? seed = null, bool? buggify = null, bool? determinismCheck = null, string oldBinaryName = null)
{

View File

@ -51,7 +51,7 @@ namespace Magnesium
}
catch (Exception e)
{
throw new Exception(string.Format("Failed to parse JSON {0}", root), e);
throw new Exception(string.Format("Failed to parse {0}", root), e);
}
if (ev != null) yield return ev;
}
@ -80,9 +80,8 @@ namespace Magnesium
TraceFile = file,
DDetails = xEvent.Elements()
.Where(a=>a.Name != "Type" && a.Name != "Time" && a.Name != "Machine" && a.Name != "ID" && a.Name != "Severity" && (!rolledEvent || a.Name != "OriginalTime"))
// When the key contains a colon character, it gets parsed as a:item
.ToDictionary(a=>a.Name.LocalName == "item" ? a.Attribute("item").Value : string.Intern(a.Name.LocalName), a=>(object)a.Value),
original = keepOriginalElement ? xEvent : null
.ToDictionary(a=>string.Intern(a.Name.LocalName), a=>(object)a.Value),
original = keepOriginalElement ? xEvent : null,
};
}

View File

@ -53,7 +53,7 @@ namespace Magnesium
}
catch (Exception e)
{
throw new Exception(string.Format("Failed to parse XML {0}", xev), e);
throw new Exception(string.Format("Failed to parse {0}", xev), e);
}
if (ev != null) yield return ev;
}

BIN
documentation/FDB.pdf Normal file

Binary file not shown.

42
documentation/FDB.svg Normal file
View File

@ -0,0 +1,42 @@
<?xml version="1.0" encoding="UTF-8"?>
<svg width="201px" height="98px" viewBox="0 0 201 98" version="1.1" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink">
<!-- Generator: Sketch 55.1 (78136) - https://sketchapp.com -->
<title>Artboard</title>
<desc>Created with Sketch.</desc>
<defs>
<polygon id="path-1" points="0.0278974895 27.28142 0.0278974895 21.9743876 42.1452024 21.9743876 42.1452024 0.409628138 83.9914367 4.60308577 83.9914367 21.9743876 126.435607 21.9743876 126.435607 0.409628138 200.861925 8.19721234 200.861925 27.28142"></polygon>
<polygon id="path-3" points="0.0278974895 28.0890523 42.1452024 21.9158028 42.1452024 0.35104341 83.9914367 10.7214702 83.9914367 28.0932369 126.435607 21.9158028 126.435607 0.35104341 200.861925 19.1079205 200.861925 38.2767505 126.435607 27.9058588 83.9914367 33.4844268 42.1452024 27.9058588 0.0278974895 33.4807071"></polygon>
<polygon id="path-5" points="0.298968096 34.5115194 42.7431386 21.6321783 42.7431386 0.0674189331 117.169456 30.6174948 117.169456 49.1874587 42.7431386 28.2215654 0.298968096 39.9027092"></polygon>
</defs>
<g id="Artboard" stroke="none" stroke-width="1" fill="none" fill-rule="evenodd">
<g id="fdb-Logo">
<g id="Group" transform="translate(0.000000, 69.743724)">
<g id="Clipped">
<mask id="mask-2" fill="white">
<use xlink:href="#path-1"></use>
</mask>
<g id="c"></g>
<polygon id="Path" fill="#0073E0" fill-rule="nonzero" mask="url(#mask-2)" points="0.0278974895 27.3209414 200.889822 27.3209414 200.889822 0.371966527 0.0278974895 0.371966527"></polygon>
</g>
</g>
<g id="Group" transform="translate(0.000000, 34.871862)">
<g id="Clipped">
<mask id="mask-4" fill="white">
<use xlink:href="#path-3"></use>
</mask>
<g id="e"></g>
<polygon id="Path" fill="#289DFC" fill-rule="nonzero" mask="url(#mask-4)" points="0.0278974895 38.3125523 200.889822 38.3125523 200.889822 0.316171548 0.0278974895 0.316171548"></polygon>
</g>
</g>
<g id="Group" transform="translate(83.692469, 0.000000)">
<g id="Clipped">
<mask id="mask-6" fill="white">
<use xlink:href="#path-5"></use>
</mask>
<g id="g"></g>
<polygon id="Path" fill="#9ACEFE" fill-rule="nonzero" mask="url(#mask-6)" points="0.251077406 49.1925732 117.197354 49.1925732 117.197354 0.0371966527 0.251077406 0.0371966527"></polygon>
</g>
</g>
</g>
</g>
</svg>

After

Width:  |  Height:  |  Size: 2.8 KiB

View File

@ -0,0 +1,34 @@
<svg xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" width="1912" height="212" viewBox="0 0 1912 212">
<defs>
<path id="a" d="M22.551,151 L0.676,151 L0.676,10.082 L88.664,10.082 L88.664,30.004 L22.551,30.004 L22.551,73.266 L83.098,73.266 L83.098,92.602 L22.551,92.602 L22.551,151 Z M159.465,134.398 C177.629,134.398 187.98,120.922 187.98,97.777 C187.98,74.73 177.629,61.254 159.465,61.254 C141.203,61.254 130.949,74.73 130.949,97.777 C130.949,121.02 141.203,134.398 159.465,134.398 Z M159.465,153.051 C128.312,153.051 109.27,132.25 109.27,97.777 C109.27,63.5 128.41,42.602 159.465,42.602 C190.422,42.602 209.562,63.5 209.562,97.777 C209.562,132.25 190.52,153.051 159.465,153.051 Z M324.406,44.652 L324.406,151 L304.191,151 L304.191,134.105 L302.531,134.105 C297.355,146.215 286.516,153.051 270.402,153.051 C246.867,153.051 233.781,138.695 233.781,113.695 L233.781,44.652 L254.777,44.652 L254.777,108.227 C254.777,125.414 261.711,133.617 277.141,133.617 C294.133,133.617 303.41,123.559 303.41,106.859 L303.41,44.652 L324.406,44.652 Z M354.875,151 L354.875,44.652 L375.09,44.652 L375.09,61.547 L376.652,61.547 C381.828,49.73 392.375,42.602 408.391,42.602 C432.121,42.602 445.207,56.859 445.207,82.152 L445.207,151 L424.211,151 L424.211,87.426 C424.211,70.336 416.789,61.84 401.262,61.84 C385.734,61.84 375.871,72.191 375.871,88.793 L375.871,151 L354.875,151 Z M513.566,152.758 C486.516,152.758 469.426,131.469 469.426,97.777 C469.426,64.184 486.711,42.895 513.566,42.895 C528.117,42.895 540.422,49.828 546.184,61.547 L547.746,61.547 L547.746,3.148 L568.742,3.148 L568.742,151 L548.625,151 L548.625,134.203 L546.965,134.203 C540.617,145.824 528.215,152.758 513.566,152.758 Z M519.523,61.742 C501.848,61.742 491.105,75.414 491.105,97.777 C491.105,120.336 501.75,133.91 519.523,133.91 C537.199,133.91 548.137,120.141 548.137,97.875 C548.137,75.707 537.102,61.742 519.523,61.742 Z M634.953,135.082 C650.773,135.082 662.492,125.023 662.492,111.84 L662.492,102.953 L636.516,104.613 C621.867,105.59 615.227,110.57 615.227,119.945 C615.227,129.516 623.527,135.082 634.953,135.082 Z M629.582,152.758 C609.074,152.758 
594.133,140.355 594.133,120.922 C594.133,101.781 608.391,90.746 633.684,89.184 L662.492,87.523 L662.492,78.344 C662.492,67.113 655.07,60.766 640.715,60.766 C628.996,60.766 620.891,65.062 618.547,72.582 L598.234,72.582 C600.383,54.32 617.57,42.602 641.691,42.602 C668.352,42.602 683.391,55.883 683.391,78.344 L683.391,151 L663.176,151 L663.176,136.059 L661.516,136.059 C655.168,146.703 643.547,152.758 629.582,152.758 Z M720.695,18.187 L741.691,18.187 L741.691,45.141 L764.738,45.141 L764.738,62.816 L741.691,62.816 L741.691,117.504 C741.691,128.637 746.281,133.52 756.73,133.52 C759.953,133.52 761.809,133.324 764.738,133.031 L764.738,150.512 C761.32,151.098 757.414,151.586 753.313,151.586 C729.973,151.586 720.695,143.383 720.695,122.875 L720.695,62.816 L703.801,62.816 L703.801,45.141 L720.695,45.141 L720.695,18.187 Z M789.836,151 L789.836,44.652 L810.734,44.652 L810.734,151 L789.836,151 Z M800.285,26 C792.473,26 786.711,20.434 786.711,13.207 C786.711,5.883 792.473,0.316 800.285,0.316 C808.098,0.316 813.859,5.883 813.859,13.207 C813.859,20.434 808.098,26 800.285,26 Z M886.516,134.398 C904.68,134.398 915.031,120.922 915.031,97.777 C915.031,74.73 904.68,61.254 886.516,61.254 C868.254,61.254 858,74.73 858,97.777 C858,121.02 868.254,134.398 886.516,134.398 Z M886.516,153.051 C855.363,153.051 836.32,132.25 836.32,97.777 C836.32,63.5 855.461,42.602 886.516,42.602 C917.473,42.602 936.613,63.5 936.613,97.777 C936.613,132.25 917.57,153.051 886.516,153.051 Z M961.809,151 L961.809,44.652 L982.023,44.652 L982.023,61.547 L983.586,61.547 C988.762,49.73 999.309,42.602 1015.324,42.602 C1039.055,42.602 1052.141,56.859 1052.141,82.152 L1052.141,151 L1031.145,151 L1031.145,87.426 C1031.145,70.336 1023.723,61.84 1008.195,61.84 C992.668,61.84 982.805,72.191 982.805,88.793 L982.805,151 L961.809,151 Z M1083.488,9.984 L1138.957,9.984 C1180.852,9.984 1205.07,35.375 1205.07,79.516 C1205.07,125.316 1181.145,151 1138.957,151 L1083.488,151 L1083.488,9.984 Z M1112.98,35.18 L1112.98,125.805 
L1134.27,125.805 C1160.344,125.805 1174.992,109.789 1174.992,80.004 C1174.992,51.488 1159.855,35.18 1134.27,35.18 L1112.98,35.18 Z M1296.184,151 L1232.902,151 L1232.902,10.082 L1294.523,10.082 C1321.867,10.082 1338.176,23.461 1338.176,45.238 C1338.176,60.18 1327.141,73.168 1312.687,75.316 L1312.687,77.074 C1331.34,78.441 1344.914,92.504 1344.914,110.668 C1344.914,135.375 1326.262,151 1296.184,151 Z M1262.395,32.641 L1262.395,68.48 L1284.563,68.48 C1300.48,68.48 1309.172,61.937 1309.172,50.609 C1309.172,39.379 1301.066,32.641 1287.004,32.641 L1262.395,32.641 Z M1262.395,128.441 L1288.664,128.441 C1305.656,128.441 1314.836,121.313 1314.836,108.129 C1314.836,95.238 1305.363,88.402 1287.98,88.402 L1262.395,88.402 L1262.395,128.441 Z"/>
<polygon id="c" points=".06 58.675 .06 47.261 90.643 47.261 90.643 .881 180.643 9.9 180.643 47.261 271.929 47.261 271.929 .881 432 17.63 432 58.675 .06 58.675"/>
<polygon id="e" points=".06 60.412 90.643 47.135 90.643 .755 180.643 23.059 180.643 60.421 271.929 47.135 271.929 .755 432 41.096 432 82.323 271.929 60.018 180.643 72.016 90.643 60.018 .06 72.008 .06 60.412"/>
<polygon id="g" points=".643 74.225 91.929 46.525 91.929 .145 252 65.85 252 105.789 91.929 60.697 .643 85.82"/>
</defs>
<g fill="none" fill-rule="evenodd">
<g transform="translate(567 58)">
<mask id="b" fill="white">
<use xlink:href="#a"/>
</mask>
<polygon fill="#0081FF" points=".66 153.16 1345.02 153.16 1345.02 .28 .66 .28" mask="url(#b)"/>
</g>
<g transform="translate(0 150)">
<mask id="d" fill="white">
<use xlink:href="#c"/>
</mask>
<polygon fill="#0073E0" points=".06 58.76 432.06 58.76 432.06 .8 .06 .8" mask="url(#d)"/>
</g>
<g transform="translate(0 75)">
<mask id="f" fill="white">
<use xlink:href="#e"/>
</mask>
<polygon fill="#289DFC" points=".06 82.4 432.06 82.4 432.06 .68 .06 .68" mask="url(#f)"/>
</g>
<g transform="translate(180)">
<mask id="h" fill="white">
<use xlink:href="#g"/>
</mask>
<polygon fill="#9ACEFE" points=".54 105.8 252.06 105.8 252.06 .08 .54 .08" mask="url(#h)"/>
</g>
</g>
</svg>

After

Width:  |  Height:  |  Size: 6.2 KiB

View File

@ -16,6 +16,7 @@ Features
* Added a new API in all bindings that can be used to query the estimated byte size of a given range. `(PR #2537) <https://github.com/apple/foundationdb/pull/2537>`_
* Added the ``lock`` and ``unlock`` commands to ``fdbcli`` which lock or unlock a cluster. `(PR #2890) <https://github.com/apple/foundationdb/pull/2890>`_
* Add a framework which helps to add client functions using special keys (keys within ``[\xff\xff, \xff\xff\xff)``). `(PR #2662) <https://github.com/apple/foundationdb/pull/2662>`_
* Added capability of aborting replication to a clone of DR site without affecting replication to the original dr site with ``--dstonly`` option of ``fdbdr abort``. `(PR 3457) <https://github.com/apple/foundationdb/pull/3457>`_
Performance
-----------

View File

@ -122,6 +122,7 @@ enum {
OPT_SOURCE_CLUSTER,
OPT_DEST_CLUSTER,
OPT_CLEANUP,
OPT_DSTONLY,
OPT_TRACE_FORMAT,
};
@ -767,6 +768,7 @@ CSimpleOpt::SOption g_rgDBAbortOptions[] = {
{ OPT_DEST_CLUSTER, "-d", SO_REQ_SEP },
{ OPT_DEST_CLUSTER, "--destination", SO_REQ_SEP },
{ OPT_CLEANUP, "--cleanup", SO_NONE },
{ OPT_DSTONLY, "--dstonly", SO_NONE },
{ OPT_TAGNAME, "-t", SO_REQ_SEP },
{ OPT_TAGNAME, "--tagname", SO_REQ_SEP },
{ OPT_TRACE, "--log", SO_NONE },
@ -1147,6 +1149,7 @@ static void printDBBackupUsage(bool devhelp) {
printf(" -k KEYS List of key ranges to backup.\n"
" If not specified, the entire database will be backed up.\n");
printf(" --cleanup Abort will attempt to stop mutation logging on the source cluster.\n");
printf(" --dstonly Abort will not make any changes on the source cluster.\n");
#ifndef TLS_DISABLED
printf(TLS_HELP);
#endif
@ -1912,12 +1915,12 @@ ACTOR Future<Void> statusBackup(Database db, std::string tagName, bool showError
return Void();
}
ACTOR Future<Void> abortDBBackup(Database src, Database dest, std::string tagName, bool partial) {
ACTOR Future<Void> abortDBBackup(Database src, Database dest, std::string tagName, bool partial, bool dstOnly) {
try
{
state DatabaseBackupAgent backupAgent(src);
wait(backupAgent.abortBackup(dest, Key(tagName), partial));
wait(backupAgent.abortBackup(dest, Key(tagName), partial, false, dstOnly));
wait(backupAgent.unlockBackup(dest, Key(tagName)));
printf("The DR on tag `%s' was successfully aborted.\n", printable(StringRef(tagName)).c_str());
@ -2950,6 +2953,7 @@ int main(int argc, char* argv[]) {
uint64_t traceMaxLogsSize = TRACE_DEFAULT_MAX_LOGS_SIZE;
ESOError lastError;
bool partial = true;
bool dstOnly = false;
LocalityData localities;
uint64_t memLimit = 8LL << 30;
Optional<uint64_t> ti;
@ -3130,6 +3134,9 @@ int main(int argc, char* argv[]) {
case OPT_CLEANUP:
partial = false;
break;
case OPT_DSTONLY:
dstOnly = true;
break;
case OPT_KNOB: {
std::string syn = args->OptionSyntax();
if (!StringRef(syn).startsWith(LiteralStringRef("--knob_"))) {
@ -3798,7 +3805,7 @@ int main(int argc, char* argv[]) {
f = stopAfter( switchDBBackup(sourceDb, db, backupKeys, tagName, forceAction) );
break;
case DB_ABORT:
f = stopAfter( abortDBBackup(sourceDb, db, tagName, partial) );
f = stopAfter( abortDBBackup(sourceDb, db, tagName, partial, dstOnly) );
break;
case DB_PAUSE:
f = stopAfter( changeDBBackupResumed(sourceDb, db, true) );

View File

@ -419,7 +419,7 @@ public:
return runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr){ return discontinueBackup(tr, tagName); });
}
Future<Void> abortBackup(Database cx, Key tagName, bool partial = false, bool abortOldBackup = false);
Future<Void> abortBackup(Database cx, Key tagName, bool partial = false, bool abortOldBackup = false, bool dstOnly = false);
Future<std::string> getStatus(Database cx, int errorLimit, Key tagName);

View File

@ -2173,7 +2173,7 @@ public:
return Void();
}
ACTOR static Future<Void> abortBackup(DatabaseBackupAgent* backupAgent, Database cx, Key tagName, bool partial, bool abortOldBackup) {
ACTOR static Future<Void> abortBackup(DatabaseBackupAgent* backupAgent, Database cx, Key tagName, bool partial, bool abortOldBackup, bool dstOnly) {
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
state Key logUidValue, destUidValue;
state UID logUid, destUid;
@ -2265,67 +2265,68 @@ public:
}
}
state Future<Void> partialTimeout = partial ? delay(30.0) : Never();
if (! dstOnly) {
state Future<Void> partialTimeout = partial ? delay(30.0) : Never();
state Reference<ReadYourWritesTransaction> srcTr(new ReadYourWritesTransaction(backupAgent->taskBucket->src));
state Version beginVersion;
state Version endVersion;
state Reference<ReadYourWritesTransaction> srcTr(new ReadYourWritesTransaction(backupAgent->taskBucket->src));
state Version beginVersion;
state Version endVersion;
loop {
try {
srcTr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
srcTr->setOption(FDBTransactionOptions::LOCK_AWARE);
state Future<Optional<Value>> backupVersionF = srcTr->get( backupAgent->sourceStates.get(logUidValue).pack(DatabaseBackupAgent::keyFolderId) );
wait(success(backupVersionF) || partialTimeout);
if(partialTimeout.isReady()) {
return Void();
}
loop {
try {
srcTr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
srcTr->setOption(FDBTransactionOptions::LOCK_AWARE);
state Future<Optional<Value>> backupVersionF = srcTr->get( backupAgent->sourceStates.get(logUidValue).pack(DatabaseBackupAgent::keyFolderId) );
wait(success(backupVersionF) || partialTimeout);
if(partialTimeout.isReady()) {
return Void();
}
if(backupVersionF.get().present() && BinaryReader::fromStringRef<Version>(backupVersionF.get().get(), Unversioned()) > BinaryReader::fromStringRef<Version>(backupUid, Unversioned())) {
break;
}
if(backupVersionF.get().present() && BinaryReader::fromStringRef<Version>(backupVersionF.get().get(), Unversioned()) > BinaryReader::fromStringRef<Version>(backupUid, Unversioned())) {
break;
}
if (abortOldBackup) {
srcTr->set( backupAgent->sourceStates.pack(DatabaseBackupAgent::keyStateStatus), StringRef(BackupAgentBase::getStateText(BackupAgentBase::STATE_ABORTED) ));
srcTr->set( backupAgent->sourceStates.get(logUidValue).pack(DatabaseBackupAgent::keyFolderId), backupUid );
srcTr->clear(prefixRange(logUidValue.withPrefix(backupLogKeys.begin)));
srcTr->clear(prefixRange(logUidValue.withPrefix(logRangesRange.begin)));
break;
}
if (abortOldBackup) {
srcTr->set( backupAgent->sourceStates.pack(DatabaseBackupAgent::keyStateStatus), StringRef(BackupAgentBase::getStateText(BackupAgentBase::STATE_ABORTED) ));
Key latestVersionKey = logUidValue.withPrefix(destUidValue.withPrefix(backupLatestVersionsPrefix));
state Future<Optional<Key>> bVersionF = srcTr->get(latestVersionKey);
wait(success(bVersionF) || partialTimeout);
if(partialTimeout.isReady()) {
return Void();
}
if (bVersionF.get().present()) {
beginVersion = BinaryReader::fromStringRef<Version>(bVersionF.get().get(), Unversioned());
} else {
break;
}
srcTr->set( backupAgent->sourceStates.pack(DatabaseBackupAgent::keyStateStatus), StringRef(DatabaseBackupAgent::getStateText(BackupAgentBase::STATE_PARTIALLY_ABORTED) ));
srcTr->set( backupAgent->sourceStates.get(logUidValue).pack(DatabaseBackupAgent::keyFolderId), backupUid );
srcTr->clear(prefixRange(logUidValue.withPrefix(backupLogKeys.begin)));
srcTr->clear(prefixRange(logUidValue.withPrefix(logRangesRange.begin)));
wait( eraseLogData(srcTr, logUidValue, destUidValue) || partialTimeout );
if(partialTimeout.isReady()) {
return Void();
}
wait(srcTr->commit() || partialTimeout);
if(partialTimeout.isReady()) {
return Void();
}
endVersion = srcTr->getCommittedVersion() + 1;
break;
}
Key latestVersionKey = logUidValue.withPrefix(destUidValue.withPrefix(backupLatestVersionsPrefix));
state Future<Optional<Key>> bVersionF = srcTr->get(latestVersionKey);
wait(success(bVersionF) || partialTimeout);
if(partialTimeout.isReady()) {
return Void();
catch (Error &e) {
wait(srcTr->onError(e));
}
if (bVersionF.get().present()) {
beginVersion = BinaryReader::fromStringRef<Version>(bVersionF.get().get(), Unversioned());
} else {
break;
}
srcTr->set( backupAgent->sourceStates.pack(DatabaseBackupAgent::keyStateStatus), StringRef(DatabaseBackupAgent::getStateText(BackupAgentBase::STATE_PARTIALLY_ABORTED) ));
srcTr->set( backupAgent->sourceStates.get(logUidValue).pack(DatabaseBackupAgent::keyFolderId), backupUid );
wait( eraseLogData(srcTr, logUidValue, destUidValue) || partialTimeout );
if(partialTimeout.isReady()) {
return Void();
}
wait(srcTr->commit() || partialTimeout);
if(partialTimeout.isReady()) {
return Void();
}
endVersion = srcTr->getCommittedVersion() + 1;
break;
}
catch (Error &e) {
wait(srcTr->onError(e));
}
}
@ -2527,8 +2528,8 @@ Future<Void> DatabaseBackupAgent::discontinueBackup(Reference<ReadYourWritesTran
return DatabaseBackupAgentImpl::discontinueBackup(this, tr, tagName);
}
Future<Void> DatabaseBackupAgent::abortBackup(Database cx, Key tagName, bool partial, bool abortOldBackup){
return DatabaseBackupAgentImpl::abortBackup(this, cx, tagName, partial, abortOldBackup);
Future<Void> DatabaseBackupAgent::abortBackup(Database cx, Key tagName, bool partial, bool abortOldBackup, bool dstOnly){
return DatabaseBackupAgentImpl::abortBackup(this, cx, tagName, partial, abortOldBackup, dstOnly);
}
Future<std::string> DatabaseBackupAgent::getStatus(Database cx, int errorLimit, Key tagName) {

View File

@ -223,11 +223,13 @@ public:
bool enableLocalityLoadBalance;
struct VersionRequest {
SpanID spanContext;
Promise<GetReadVersionReply> reply;
TagSet tags;
Optional<UID> debugID;
VersionRequest(TagSet tags = TagSet(), Optional<UID> debugID = Optional<UID>()) : tags(tags), debugID(debugID) {}
VersionRequest(SpanID spanContext, TagSet tags = TagSet(), Optional<UID> debugID = Optional<UID>())
: spanContext(spanContext), tags(tags), debugID(debugID) {}
};
// Transaction start request batching

View File

@ -36,6 +36,7 @@ typedef uint64_t Sequence;
typedef StringRef KeyRef;
typedef StringRef ValueRef;
typedef int64_t Generation;
typedef UID SpanID;
enum {
tagLocalitySpecial = -1,

View File

@ -1437,8 +1437,8 @@ namespace fileBackup {
// thousands of iterations.
// Declare some common iterators which must be state vars and will be used multiple times.
state int i;
state RangeMap<Key, int, KeyRangeRef>::Iterator iShard;
state RangeMap<Key, int, KeyRangeRef>::Iterator iShardEnd;
state RangeMap<Key, int, KeyRangeRef>::iterator iShard;
state RangeMap<Key, int, KeyRangeRef>::iterator iShardEnd;
// Set anything inside a dispatched range to DONE.
// Also ensure that the boundary value are true, false, [true, false]...

View File

@ -153,6 +153,7 @@ struct CommitTransactionRequest : TimedRequest {
bool firstInBatch() const { return (flags & FLAG_FIRST_IN_BATCH) != 0; }
Arena arena;
SpanID spanContext;
CommitTransactionRef transaction;
ReplyPromise<CommitID> reply;
uint32_t flags;
@ -164,7 +165,7 @@ struct CommitTransactionRequest : TimedRequest {
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, transaction, reply, arena, flags, debugID, commitCostEstimation, tagSet);
serializer(ar, transaction, reply, arena, flags, debugID, commitCostEstimation, tagSet, spanContext);
}
};
@ -211,6 +212,7 @@ struct GetReadVersionRequest : TimedRequest {
FLAG_PRIORITY_MASK = PRIORITY_SYSTEM_IMMEDIATE,
};
SpanID spanContext;
uint32_t transactionCount;
uint32_t flags;
TransactionPriority priority;
@ -221,9 +223,11 @@ struct GetReadVersionRequest : TimedRequest {
ReplyPromise<GetReadVersionReply> reply;
GetReadVersionRequest() : transactionCount(1), flags(0) {}
GetReadVersionRequest(uint32_t transactionCount, TransactionPriority priority, uint32_t flags = 0, TransactionTagMap<uint32_t> tags = TransactionTagMap<uint32_t>(), Optional<UID> debugID = Optional<UID>())
: transactionCount(transactionCount), priority(priority), flags(flags), tags(tags), debugID(debugID)
{
GetReadVersionRequest(SpanID spanContext, uint32_t transactionCount, TransactionPriority priority,
uint32_t flags = 0, TransactionTagMap<uint32_t> tags = TransactionTagMap<uint32_t>(),
Optional<UID> debugID = Optional<UID>())
: spanContext(spanContext), transactionCount(transactionCount), priority(priority), flags(flags), tags(tags),
debugID(debugID) {
flags = flags & ~FLAG_PRIORITY_MASK;
switch(priority) {
case TransactionPriority::BATCH:
@ -239,12 +243,12 @@ struct GetReadVersionRequest : TimedRequest {
ASSERT(false);
}
}
bool operator < (GetReadVersionRequest const& rhs) const { return priority < rhs.priority; }
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, transactionCount, flags, tags, debugID, reply);
serializer(ar, transactionCount, flags, tags, debugID, reply, spanContext);
if(ar.isDeserializing) {
if((flags & PRIORITY_SYSTEM_IMMEDIATE) == PRIORITY_SYSTEM_IMMEDIATE) {
@ -277,6 +281,7 @@ struct GetKeyServerLocationsReply {
struct GetKeyServerLocationsRequest {
constexpr static FileIdentifier file_identifier = 9144680;
Arena arena;
SpanID spanContext;
KeyRef begin;
Optional<KeyRef> end;
int limit;
@ -284,24 +289,28 @@ struct GetKeyServerLocationsRequest {
ReplyPromise<GetKeyServerLocationsReply> reply;
GetKeyServerLocationsRequest() : limit(0), reverse(false) {}
GetKeyServerLocationsRequest( KeyRef const& begin, Optional<KeyRef> const& end, int limit, bool reverse, Arena const& arena ) : begin( begin ), end( end ), limit( limit ), reverse( reverse ), arena( arena ) {}
template <class Ar>
GetKeyServerLocationsRequest(SpanID spanContext, KeyRef const& begin, Optional<KeyRef> const& end, int limit,
bool reverse, Arena const& arena)
: spanContext(spanContext), begin(begin), end(end), limit(limit), reverse(reverse), arena(arena) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, begin, end, limit, reverse, reply, arena);
serializer(ar, begin, end, limit, reverse, reply, spanContext, arena);
}
};
struct GetRawCommittedVersionRequest {
constexpr static FileIdentifier file_identifier = 12954034;
SpanID spanContext;
Optional<UID> debugID;
ReplyPromise<GetReadVersionReply> reply;
explicit GetRawCommittedVersionRequest(Optional<UID> const& debugID = Optional<UID>()) : debugID(debugID) {}
explicit GetRawCommittedVersionRequest(SpanID spanContext, Optional<UID> const& debugID = Optional<UID>()) : spanContext(spanContext), debugID(debugID) {}
explicit GetRawCommittedVersionRequest() : spanContext(), debugID() {}
template <class Ar>
void serialize( Ar& ar ) {
serializer(ar, debugID, reply);
serializer(ar, debugID, reply, spanContext);
}
};

View File

@ -47,6 +47,7 @@
#include "fdbclient/SpecialKeySpace.actor.h"
#include "fdbclient/StorageServerInterface.h"
#include "fdbclient/SystemData.h"
#include "fdbclient/versions.h"
#include "fdbrpc/LoadBalance.h"
#include "fdbrpc/Net2FileSystem.h"
#include "fdbrpc/simulator.h"
@ -54,18 +55,17 @@
#include "flow/ActorCollection.h"
#include "flow/DeterministicRandom.h"
#include "flow/Error.h"
#include "flow/IRandom.h"
#include "flow/flow.h"
#include "flow/genericactors.actor.h"
#include "flow/Knobs.h"
#include "flow/Platform.h"
#include "flow/SystemMonitor.h"
#include "flow/TLSConfig.actor.h"
#include "flow/Trace.h"
#include "flow/Tracing.h"
#include "flow/UnitTest.h"
#include "flow/serialize.h"
#include "fdbclient/versions.h"
#ifdef WIN32
#define WIN32_LEAN_AND_MEAN
#include <Windows.h>
@ -1628,7 +1628,10 @@ ACTOR Future<Optional<vector<StorageServerInterface>>> transactionalGetServerInt
}
//If isBackward == true, returns the shard containing the key before 'key' (an infinitely long, inexpressible key). Otherwise returns the shard containing key
ACTOR Future< pair<KeyRange,Reference<LocationInfo>> > getKeyLocation_internal( Database cx, Key key, TransactionInfo info, bool isBackward = false ) {
ACTOR Future<pair<KeyRange, Reference<LocationInfo>>> getKeyLocation_internal(Database cx, Key key,
TransactionInfo info,
bool isBackward = false) {
state Span span("NAPI:getKeyLocation"_loc, info.spanID);
if (isBackward) {
ASSERT( key != allKeys.begin && key <= allKeys.end );
} else {
@ -1642,7 +1645,10 @@ ACTOR Future< pair<KeyRange,Reference<LocationInfo>> > getKeyLocation_internal(
++cx->transactionKeyServerLocationRequests;
choose {
when ( wait( cx->onMasterProxiesChanged() ) ) {}
when ( GetKeyServerLocationsReply rep = wait( basicLoadBalance( cx->getMasterProxies(info.useProvisionalProxies), &MasterProxyInterface::getKeyServersLocations, GetKeyServerLocationsRequest(key, Optional<KeyRef>(), 100, isBackward, key.arena()), TaskPriority::DefaultPromiseEndpoint ) ) ) {
when(GetKeyServerLocationsReply rep = wait(basicLoadBalance(
cx->getMasterProxies(info.useProvisionalProxies), &MasterProxyInterface::getKeyServersLocations,
GetKeyServerLocationsRequest(span.context, key, Optional<KeyRef>(), 100, isBackward, key.arena()),
TaskPriority::DefaultPromiseEndpoint))) {
++cx->transactionKeyServerLocationRequestsCompleted;
if( info.debugID.present() )
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKeyLocation.After");
@ -1677,7 +1683,10 @@ Future<pair<KeyRange, Reference<LocationInfo>>> getKeyLocation(Database const& c
return ssi;
}
ACTOR Future< vector< pair<KeyRange,Reference<LocationInfo>> > > getKeyRangeLocations_internal( Database cx, KeyRange keys, int limit, bool reverse, TransactionInfo info ) {
ACTOR Future<vector<pair<KeyRange, Reference<LocationInfo>>>> getKeyRangeLocations_internal(Database cx, KeyRange keys,
int limit, bool reverse,
TransactionInfo info) {
state Span span("NAPI:getKeyRangeLocations"_loc, info.spanID);
if( info.debugID.present() )
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKeyLocations.Before");
@ -1685,7 +1694,10 @@ ACTOR Future< vector< pair<KeyRange,Reference<LocationInfo>> > > getKeyRangeLoca
++cx->transactionKeyServerLocationRequests;
choose {
when ( wait( cx->onMasterProxiesChanged() ) ) {}
when ( GetKeyServerLocationsReply _rep = wait( basicLoadBalance( cx->getMasterProxies(info.useProvisionalProxies), &MasterProxyInterface::getKeyServersLocations, GetKeyServerLocationsRequest(keys.begin, keys.end, limit, reverse, keys.arena()), TaskPriority::DefaultPromiseEndpoint ) ) ) {
when(GetKeyServerLocationsReply _rep = wait(basicLoadBalance(
cx->getMasterProxies(info.useProvisionalProxies), &MasterProxyInterface::getKeyServersLocations,
GetKeyServerLocationsRequest(span.context, keys.begin, keys.end, limit, reverse, keys.arena()),
TaskPriority::DefaultPromiseEndpoint))) {
++cx->transactionKeyServerLocationRequestsCompleted;
state GetKeyServerLocationsReply rep = _rep;
if( info.debugID.present() )
@ -1776,6 +1788,7 @@ Future<Void> Transaction::warmRange(Database cx, KeyRange keys) {
ACTOR Future<Optional<Value>> getValue( Future<Version> version, Key key, Database cx, TransactionInfo info, Reference<TransactionLogInfo> trLogInfo, TagSet tags )
{
state Version ver = wait( version );
state Span span("NAPI:getValue"_loc, info.spanID);
cx->validateVersion(ver);
loop {
@ -1808,10 +1821,12 @@ ACTOR Future<Optional<Value>> getValue( Future<Version> version, Key key, Databa
}
choose {
when(wait(cx->connectionFileChanged())) { throw transaction_too_old(); }
when(GetValueReply _reply =
wait(loadBalance(cx.getPtr(), ssi.second, &StorageServerInterface::getValue,
GetValueRequest(key, ver, cx->sampleReadTags() ? tags : Optional<TagSet>(), getValueID), TaskPriority::DefaultPromiseEndpoint, false,
cx->enableLocalityLoadBalance ? &cx->queueModel : nullptr))) {
when(GetValueReply _reply = wait(
loadBalance(cx.getPtr(), ssi.second, &StorageServerInterface::getValue,
GetValueRequest(span.context, key, ver,
cx->sampleReadTags() ? tags : Optional<TagSet>(), getValueID),
TaskPriority::DefaultPromiseEndpoint, false,
cx->enableLocalityLoadBalance ? &cx->queueModel : nullptr))) {
reply = _reply;
}
}
@ -1869,6 +1884,7 @@ ACTOR Future<Key> getKey( Database cx, KeySelector k, Future<Version> version, T
wait(success(version));
state Optional<UID> getKeyID = Optional<UID>();
state Span span("NAPI:getKey"_loc, info.spanID);
if( info.debugID.present() ) {
getKeyID = nondeterministicRandom()->randomUniqueID();
@ -1897,9 +1913,11 @@ ACTOR Future<Key> getKey( Database cx, KeySelector k, Future<Version> version, T
choose {
when(wait(cx->connectionFileChanged())) { throw transaction_too_old(); }
when(GetKeyReply _reply =
wait(loadBalance(cx.getPtr(), ssi.second, &StorageServerInterface::getKey, GetKeyRequest(k, version.get(), cx->sampleReadTags() ? tags : Optional<TagSet>(), getKeyID),
TaskPriority::DefaultPromiseEndpoint, false,
cx->enableLocalityLoadBalance ? &cx->queueModel : nullptr))) {
wait(loadBalance(cx.getPtr(), ssi.second, &StorageServerInterface::getKey,
GetKeyRequest(span.context, k, version.get(),
cx->sampleReadTags() ? tags : Optional<TagSet>(), getKeyID),
TaskPriority::DefaultPromiseEndpoint, false,
cx->enableLocalityLoadBalance ? &cx->queueModel : nullptr))) {
reply = _reply;
}
}
@ -1932,12 +1950,15 @@ ACTOR Future<Key> getKey( Database cx, KeySelector k, Future<Version> version, T
}
}
ACTOR Future<Version> waitForCommittedVersion( Database cx, Version version ) {
ACTOR Future<Version> waitForCommittedVersion( Database cx, Version version, SpanID spanContext ) {
state Span span("NAPI:waitForCommittedVersion"_loc, { spanContext });
try {
loop {
choose {
when ( wait( cx->onMasterProxiesChanged() ) ) {}
when ( GetReadVersionReply v = wait( basicLoadBalance( cx->getMasterProxies(false), &MasterProxyInterface::getConsistentReadVersion, GetReadVersionRequest( 0, TransactionPriority::IMMEDIATE ), cx->taskID ) ) ) {
when(GetReadVersionReply v = wait(basicLoadBalance(
cx->getMasterProxies(false), &MasterProxyInterface::getConsistentReadVersion,
GetReadVersionRequest(span.context, 0, TransactionPriority::IMMEDIATE), cx->taskID))) {
cx->minAcceptableReadVersion = std::min(cx->minAcceptableReadVersion, v.version);
if (v.version >= version)
@ -1953,11 +1974,14 @@ ACTOR Future<Version> waitForCommittedVersion( Database cx, Version version ) {
}
}
ACTOR Future<Version> getRawVersion( Database cx ) {
ACTOR Future<Version> getRawVersion( Database cx, SpanID spanContext ) {
state Span span("NAPI:getRawVersion"_loc, { spanContext });
loop {
choose {
when ( wait( cx->onMasterProxiesChanged() ) ) {}
when ( GetReadVersionReply v = wait( basicLoadBalance( cx->getMasterProxies(false), &MasterProxyInterface::getConsistentReadVersion, GetReadVersionRequest( 0, TransactionPriority::IMMEDIATE ), cx->taskID ) ) ) {
when(GetReadVersionReply v =
wait(basicLoadBalance(cx->getMasterProxies(false), &MasterProxyInterface::getConsistentReadVersion,
GetReadVersionRequest(spanContext, 0, TransactionPriority::IMMEDIATE), cx->taskID))) {
return v.version;
}
}
@ -1971,6 +1995,7 @@ ACTOR Future<Void> readVersionBatcher(
ACTOR Future<Void> watchValue(Future<Version> version, Key key, Optional<Value> value, Database cx,
TransactionInfo info, TagSet tags) {
state Version ver = wait( version );
state Span span("NAPI:watchValue"_loc, info.spanID);
cx->validateVersion(ver);
ASSERT(ver != latestVersion);
@ -1987,9 +2012,11 @@ ACTOR Future<Void> watchValue(Future<Version> version, Key key, Optional<Value>
}
state WatchValueReply resp;
choose {
when(WatchValueReply r = wait(loadBalance(cx.getPtr(), ssi.second, &StorageServerInterface::watchValue,
WatchValueRequest(key, value, ver, cx->sampleReadTags() ? tags : Optional<TagSet>(), watchValueID),
TaskPriority::DefaultPromiseEndpoint))) {
when(WatchValueReply r = wait(
loadBalance(cx.getPtr(), ssi.second, &StorageServerInterface::watchValue,
WatchValueRequest(span.context, key, value, ver,
cx->sampleReadTags() ? tags : Optional<TagSet>(), watchValueID),
TaskPriority::DefaultPromiseEndpoint))) {
resp = r;
}
when(wait(cx->connectionFile ? cx->connectionFile->onChange() : Never())) { wait(Never()); }
@ -2000,7 +2027,7 @@ ACTOR Future<Void> watchValue(Future<Version> version, Key key, Optional<Value>
//FIXME: wait for known committed version on the storage server before replying,
//cannot do this until the storage server is notified on knownCommittedVersion changes from tlog (faster than the current update loop)
Version v = wait(waitForCommittedVersion(cx, resp.version));
Version v = wait(waitForCommittedVersion(cx, resp.version, span.context));
//TraceEvent("WatcherCommitted").detail("CommittedVersion", v).detail("WatchVersion", resp.version).detail("Key", key ).detail("Value", value);
@ -2053,6 +2080,7 @@ ACTOR Future<Standalone<RangeResultRef>> getExactRange( Database cx, Version ver
KeyRange keys, GetRangeLimits limits, bool reverse, TransactionInfo info, TagSet tags )
{
state Standalone<RangeResultRef> output;
state Span span("NAPI:getExactRange"_loc, info.spanID);
//printf("getExactRange( '%s', '%s' )\n", keys.begin.toString().c_str(), keys.end.toString().c_str());
loop {
@ -2066,6 +2094,7 @@ ACTOR Future<Standalone<RangeResultRef>> getExactRange( Database cx, Version ver
req.version = version;
req.begin = firstGreaterOrEqual( range.begin );
req.end = firstGreaterOrEqual( range.end );
req.spanContext = span.context;
transformRangeLimits(limits, reverse, req);
ASSERT(req.limitBytes > 0 && req.limit != 0 && req.limit < 0 == reverse);
@ -2310,6 +2339,7 @@ ACTOR Future<Standalone<RangeResultRef>> getRange( Database cx, Reference<Transa
state KeySelector originalBegin = begin;
state KeySelector originalEnd = end;
state Standalone<RangeResultRef> output;
state Span span("NAPI:getRange"_loc, info.spanID);
try {
state Version version = wait( fVersion );
@ -2362,6 +2392,7 @@ ACTOR Future<Standalone<RangeResultRef>> getRange( Database cx, Reference<Transa
req.tags = cx->sampleReadTags() ? tags : Optional<TagSet>();
req.debugID = info.debugID;
req.spanContext = span.context;
try {
if( info.debugID.present() ) {
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getRange.Before");
@ -2556,10 +2587,11 @@ void debugAddTags(Transaction *tr) {
}
Transaction::Transaction( Database const& cx )
: cx(cx), info(cx->taskID), backoff(CLIENT_KNOBS->DEFAULT_BACKOFF), committedVersion(invalidVersion), versionstampPromise(Promise<Standalone<StringRef>>()), options(cx), numErrors(0), trLogInfo(createTrLogInfoProbabilistically(cx))
{
if(DatabaseContext::debugUseTags) {
Transaction::Transaction(Database const& cx)
: cx(cx), info(cx->taskID, deterministicRandom()->randomUniqueID()), backoff(CLIENT_KNOBS->DEFAULT_BACKOFF),
committedVersion(invalidVersion), versionstampPromise(Promise<Standalone<StringRef>>()), options(cx), numErrors(0),
trLogInfo(createTrLogInfoProbabilistically(cx)), span(info.spanID, "Transaction"_loc) {
if (DatabaseContext::debugUseTags) {
debugAddTags(this);
}
}
@ -2699,7 +2731,7 @@ ACTOR Future<Void> watch(Reference<Watch> watch, Database cx, TagSet tags, Trans
}
Future<Version> Transaction::getRawReadVersion() {
return ::getRawVersion(cx);
return ::getRawVersion(cx, info.spanID);
}
Future< Void > Transaction::watch( Reference<Watch> watch ) {
@ -3062,6 +3094,8 @@ void Transaction::reset() {
void Transaction::fullReset() {
reset();
span = Span(span.location);
info.spanID = span.context;
backoff = CLIENT_KNOBS->DEFAULT_BACKOFF;
}
@ -3138,7 +3172,7 @@ ACTOR void checkWrites( Database cx, Future<Void> committed, Promise<Void> outCo
tr.setVersion( version );
state int checkedRanges = 0;
state KeyRangeMap<MutationBlock>::Ranges ranges = expectedValues.ranges();
state KeyRangeMap<MutationBlock>::Iterator it = ranges.begin();
state KeyRangeMap<MutationBlock>::iterator it = ranges.begin();
for(; it != ranges.end(); ++it) {
state MutationBlock m = it->value();
if( m.mutated ) {
@ -3178,6 +3212,8 @@ ACTOR void checkWrites( Database cx, Future<Void> committed, Promise<Void> outCo
ACTOR static Future<Void> commitDummyTransaction( Database cx, KeyRange range, TransactionInfo info, TransactionOptions options ) {
state Transaction tr(cx);
state int retries = 0;
state Span span("NAPI:dummyTransaction"_loc, info.spanID);
tr.span.addParent(span.context);
loop {
try {
TraceEvent("CommitDummyTransaction").detail("Key", range.begin).detail("Retries", retries);
@ -3224,6 +3260,8 @@ void Transaction::setupWatches() {
ACTOR static Future<Void> tryCommit( Database cx, Reference<TransactionLogInfo> trLogInfo, CommitTransactionRequest req, Future<Version> readVersion, TransactionInfo info, Version* pCommittedVersion, Transaction* tr, TransactionOptions options) {
state TraceInterval interval( "TransactionCommit" );
state double startTime = now();
state Span span("NAPI:tryCommit"_loc, info.spanID);
req.spanContext = span.context;
if (info.debugID.present())
TraceEvent(interval.begin()).detail( "Parent", info.debugID.get() );
try {
@ -3677,6 +3715,14 @@ void Transaction::setOption( FDBTransactionOptions::Option option, Optional<Stri
options.readTags.addTag(value.get());
break;
case FDBTransactionOptions::SPAN_PARENT:
validateOptionValue(value, true);
if (value.get().size() != 16) {
throw invalid_option_value();
}
span.addParent(BinaryReader::fromStringRef<UID>(value.get(), Unversioned()));
break;
case FDBTransactionOptions::REPORT_CONFLICTING_KEYS:
validateOptionValue(value, false);
options.reportConflictingKeys = true;
@ -3687,13 +3733,16 @@ void Transaction::setOption( FDBTransactionOptions::Option option, Optional<Stri
}
}
ACTOR Future<GetReadVersionReply> getConsistentReadVersion( DatabaseContext *cx, uint32_t transactionCount, TransactionPriority priority, uint32_t flags, TransactionTagMap<uint32_t> tags, Optional<UID> debugID ) {
ACTOR Future<GetReadVersionReply> getConsistentReadVersion(SpanID parentSpan, DatabaseContext* cx, uint32_t transactionCount,
TransactionPriority priority, uint32_t flags,
TransactionTagMap<uint32_t> tags, Optional<UID> debugID) {
state Span span("NAPI:getConsistentReadVersion"_loc, parentSpan);
try {
++cx->transactionReadVersionBatches;
if( debugID.present() )
g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "NativeAPI.getConsistentReadVersion.Before");
loop {
state GetReadVersionRequest req( transactionCount, priority, flags, tags, debugID );
state GetReadVersionRequest req( span.context, transactionCount, priority, flags, tags, debugID );
choose {
when ( wait( cx->onMasterProxiesChanged() ) ) {}
when ( GetReadVersionReply v = wait( basicLoadBalance( cx->getMasterProxies(flags & GetReadVersionRequest::FLAG_USE_PROVISIONAL_PROXIES), &MasterProxyInterface::getConsistentReadVersion, req, cx->taskID ) ) ) {
@ -3744,6 +3793,7 @@ ACTOR Future<Void> readVersionBatcher( DatabaseContext *cx, FutureStream<Databas
state PromiseStream<double> replyTimes;
state PromiseStream<Error> _errorStream;
state double batchTime = 0;
state Span span("NAPI:readVersionBatcher"_loc);
loop {
send_batch = false;
choose {
@ -3754,6 +3804,7 @@ ACTOR Future<Void> readVersionBatcher( DatabaseContext *cx, FutureStream<Databas
}
g_traceBatch.addAttach("TransactionAttachID", req.debugID.get().first(), debugID.get().first());
}
span.addParent(req.spanContext);
requests.push_back(req.reply);
for(auto tag : req.tags) {
++tags[tag];
@ -3781,9 +3832,10 @@ ACTOR Future<Void> readVersionBatcher( DatabaseContext *cx, FutureStream<Databas
addActor.send(ready(timeReply(GRVReply.getFuture(), replyTimes)));
Future<Void> batch = incrementalBroadcastWithError(
getConsistentReadVersion(cx, count, priority, flags, std::move(tags), std::move(debugID)),
getConsistentReadVersion(span.context, cx, count, priority, flags, std::move(tags), std::move(debugID)),
std::move(requests), CLIENT_KNOBS->BROADCAST_BATCH_SIZE);
span = Span("NAPI:readVersionBatcher"_loc);
tags.clear();
debugID = Optional<UID>();
requests.clear();
@ -3793,7 +3845,11 @@ ACTOR Future<Void> readVersionBatcher( DatabaseContext *cx, FutureStream<Databas
}
}
ACTOR Future<Version> extractReadVersion(DatabaseContext* cx, TransactionPriority priority, Reference<TransactionLogInfo> trLogInfo, Future<GetReadVersionReply> f, bool lockAware, double startTime, Promise<Optional<Value>> metadataVersion, TagSet tags) {
ACTOR Future<Version> extractReadVersion(SpanID parentSpan, DatabaseContext* cx, TransactionPriority priority,
Reference<TransactionLogInfo> trLogInfo, Future<GetReadVersionReply> f,
bool lockAware, double startTime, Promise<Optional<Value>> metadataVersion,
TagSet tags) {
// parentSpan here is only used to keep the parent alive until the request completes
GetReadVersionReply rep = wait(f);
double latency = now() - startTime;
cx->GRVLatencies.addSample(latency);
@ -3915,10 +3971,12 @@ Future<Version> Transaction::getReadVersion(uint32_t flags) {
batcher.actor = readVersionBatcher( cx.getPtr(), batcher.stream.getFuture(), options.priority, flags );
}
auto const req = DatabaseContext::VersionRequest(options.tags, info.debugID);
Span span("NAPI:getReadVersion"_loc, info.spanID);
auto const req = DatabaseContext::VersionRequest(span.context, options.tags, info.debugID);
batcher.stream.send(req);
startTime = now();
readVersion = extractReadVersion( cx.getPtr(), options.priority, trLogInfo, req.reply.getFuture(), options.lockAware, startTime, metadataVersion, options.tags);
readVersion = extractReadVersion(span.context, cx.getPtr(), options.priority, trLogInfo, req.reply.getFuture(),
options.lockAware, startTime, metadataVersion, options.tags);
}
return readVersion;
}
@ -4015,9 +4073,10 @@ ACTOR Future<StorageMetrics> doGetStorageMetrics(Database cx, KeyRangeRef keys,
}
ACTOR Future<StorageMetrics> getStorageMetricsLargeKeyRange(Database cx, KeyRangeRef keys) {
vector<pair<KeyRange, Reference<LocationInfo>>> locations = wait(getKeyRangeLocations(
cx, keys, std::numeric_limits<int>::max(), false, &StorageServerInterface::waitMetrics, TransactionInfo(TaskPriority::DataDistribution)));
state Span span("NAPI:GetStorageMetricsLargeKeyRange"_loc);
vector<pair<KeyRange, Reference<LocationInfo>>> locations = wait(
getKeyRangeLocations(cx, keys, std::numeric_limits<int>::max(), false, &StorageServerInterface::waitMetrics,
TransactionInfo(TaskPriority::DataDistribution, span.context)));
state int nLocs = locations.size();
state vector<Future<StorageMetrics>> fx(nLocs);
state StorageMetrics total;
@ -4098,12 +4157,13 @@ ACTOR Future< StorageMetrics > extractMetrics( Future<std::pair<Optional<Storage
}
ACTOR Future<Standalone<VectorRef<KeyRangeRef>>> getReadHotRanges(Database cx, KeyRange keys) {
state Span span("NAPI:GetReadHotRanges"_loc);
loop {
int64_t shardLimit = 100; // Shard limit here does not really matter since this function is currently only used
// to find the read-hot sub ranges within a read-hot shard.
vector<pair<KeyRange, Reference<LocationInfo>>> locations =
wait(getKeyRangeLocations(cx, keys, shardLimit, false, &StorageServerInterface::getReadHotRanges,
TransactionInfo(TaskPriority::DataDistribution)));
TransactionInfo(TaskPriority::DataDistribution, span.context)));
try {
// TODO: how to handle this?
// This function is called whenever a shard becomes read-hot. But somehow the shard was splitted across more
@ -4151,9 +4211,12 @@ ACTOR Future< std::pair<Optional<StorageMetrics>, int> > waitStorageMetrics(
int shardLimit,
int expectedShardCount )
{
state Span span("NAPI:WaitStorageMetrics"_loc);
loop {
vector< pair<KeyRange, Reference<LocationInfo>> > locations = wait( getKeyRangeLocations( cx, keys, shardLimit, false, &StorageServerInterface::waitMetrics, TransactionInfo(TaskPriority::DataDistribution) ) );
if(expectedShardCount >= 0 && locations.size() != expectedShardCount) {
vector<pair<KeyRange, Reference<LocationInfo>>> locations =
wait(getKeyRangeLocations(cx, keys, shardLimit, false, &StorageServerInterface::waitMetrics,
TransactionInfo(TaskPriority::DataDistribution, span.context)));
if (expectedShardCount >= 0 && locations.size() != expectedShardCount) {
return std::make_pair(Optional<StorageMetrics>(), locations.size());
}
@ -4236,8 +4299,11 @@ Future<Standalone<VectorRef<KeyRangeRef>>> Transaction::getReadHotRanges(KeyRang
ACTOR Future< Standalone<VectorRef<KeyRef>> > splitStorageMetrics( Database cx, KeyRange keys, StorageMetrics limit, StorageMetrics estimated )
{
state Span span("NAPI:SplitStorageMetrics"_loc);
loop {
state vector< pair<KeyRange, Reference<LocationInfo>> > locations = wait( getKeyRangeLocations( cx, keys, CLIENT_KNOBS->STORAGE_METRICS_SHARD_LIMIT, false, &StorageServerInterface::splitMetrics, TransactionInfo(TaskPriority::DataDistribution) ) );
state vector<pair<KeyRange, Reference<LocationInfo>>> locations = wait(getKeyRangeLocations(
cx, keys, CLIENT_KNOBS->STORAGE_METRICS_SHARD_LIMIT, false, &StorageServerInterface::splitMetrics,
TransactionInfo(TaskPriority::DataDistribution, span.context)));
state StorageMetrics used;
state Standalone<VectorRef<KeyRef>> results;

View File

@ -19,6 +19,8 @@
*/
#pragma once
#include "flow/IRandom.h"
#include "flow/Tracing.h"
#if defined(NO_INTELLISENSE) && !defined(FDBCLIENT_NATIVEAPI_ACTOR_G_H)
#define FDBCLIENT_NATIVEAPI_ACTOR_G_H
#include "fdbclient/NativeAPI.actor.g.h"
@ -152,13 +154,15 @@ class ReadYourWritesTransaction; // workaround cyclic dependency
struct TransactionInfo {
Optional<UID> debugID;
TaskPriority taskID;
SpanID spanID;
bool useProvisionalProxies;
// Used to save conflicting keys if FDBTransactionOptions::REPORT_CONFLICTING_KEYS is enabled
// prefix/<key1> : '1' - any keys equal or larger than this key are (probably) conflicting keys
// prefix/<key2> : '0' - any keys equal or larger than this key are (definitely) not conflicting keys
std::shared_ptr<CoalescedKeyRangeMap<Value>> conflictingKeys;
explicit TransactionInfo( TaskPriority taskID ) : taskID(taskID), useProvisionalProxies(false) {}
explicit TransactionInfo(TaskPriority taskID, SpanID spanID)
: taskID(taskID), spanID(spanID), useProvisionalProxies(false) {}
};
struct TransactionLogInfo : public ReferenceCounted<TransactionLogInfo>, NonCopyable {
@ -283,7 +287,9 @@ public:
void flushTrLogsIfEnabled();
// These are to permit use as state variables in actors:
Transaction() : info( TaskPriority::DefaultEndpoint ) {}
Transaction()
: info(TaskPriority::DefaultEndpoint, deterministicRandom()->randomUniqueID()),
span(info.spanID, "Transaction"_loc) {}
void operator=(Transaction&& r) noexcept;
void reset();
@ -309,6 +315,7 @@ public:
}
static Reference<TransactionLogInfo> createTrLogInfoProbabilistically(const Database& cx);
TransactionOptions options;
Span span;
double startTime;
Reference<TransactionLogInfo> trLogInfo;
@ -336,7 +343,7 @@ private:
Future<Void> committing;
};
ACTOR Future<Version> waitForCommittedVersion(Database cx, Version version);
ACTOR Future<Version> waitForCommittedVersion(Database cx, Version version, SpanID spanContext);
ACTOR Future<Standalone<VectorRef<DDMetricsRef>>> waitDataDistributionMetricsList(Database cx, KeyRange keys,
int shardLimit);

View File

@ -115,7 +115,7 @@ ACTOR Future<Void> normalizeKeySelectorActor(SpecialKeySpace* sks, ReadYourWrite
KeyRangeRef boundary, int* actualOffset,
Standalone<RangeResultRef>* result,
Optional<Standalone<RangeResultRef>>* cache) {
state RangeMap<Key, SpecialKeyRangeBaseImpl*, KeyRangeRef>::Iterator iter =
state RangeMap<Key, SpecialKeyRangeBaseImpl*, KeyRangeRef>::iterator iter =
ks->offset < 1 ? sks->getImpls().rangeContainingKeyBefore(ks->getKey())
: sks->getImpls().rangeContaining(ks->getKey());
while ((ks->offset < 1 && iter->begin() > boundary.begin) || (ks->offset > 1 && iter->begin() < boundary.end)) {
@ -164,7 +164,7 @@ ACTOR Future<Standalone<RangeResultRef>> SpecialKeySpace::getRangeAggregationAct
// KeySelector, GetRangeLimits and reverse are all handled here
state Standalone<RangeResultRef> result;
state Standalone<RangeResultRef> pairs;
state RangeMap<Key, SpecialKeyRangeBaseImpl*, KeyRangeRef>::Iterator iter;
state RangeMap<Key, SpecialKeyRangeBaseImpl*, KeyRangeRef>::iterator iter;
state int actualBeginOffset;
state int actualEndOffset;
state KeyRangeRef moduleBoundary;

View File

@ -169,6 +169,7 @@ struct GetValueReply : public LoadBalancedReply {
struct GetValueRequest : TimedRequest {
constexpr static FileIdentifier file_identifier = 8454530;
SpanID spanContext;
Key key;
Version version;
Optional<TagSet> tags;
@ -176,11 +177,12 @@ struct GetValueRequest : TimedRequest {
ReplyPromise<GetValueReply> reply;
GetValueRequest(){}
GetValueRequest(const Key& key, Version ver, Optional<TagSet> tags, Optional<UID> debugID) : key(key), version(ver), tags(tags), debugID(debugID) {}
template <class Ar>
GetValueRequest(SpanID spanContext, const Key& key, Version ver, Optional<TagSet> tags, Optional<UID> debugID)
: spanContext(spanContext), key(key), version(ver), tags(tags), debugID(debugID) {}
template <class Ar>
void serialize( Ar& ar ) {
serializer(ar, key, version, tags, debugID, reply);
serializer(ar, key, version, tags, debugID, reply, spanContext);
}
};
@ -200,6 +202,7 @@ struct WatchValueReply {
struct WatchValueRequest {
constexpr static FileIdentifier file_identifier = 14747733;
SpanID spanContext;
Key key;
Optional<Value> value;
Version version;
@ -208,11 +211,13 @@ struct WatchValueRequest {
ReplyPromise<WatchValueReply> reply;
WatchValueRequest(){}
WatchValueRequest(const Key& key, Optional<Value> value, Version ver, Optional<TagSet> tags, Optional<UID> debugID) : key(key), value(value), version(ver), tags(tags), debugID(debugID) {}
template <class Ar>
WatchValueRequest(SpanID spanContext, const Key& key, Optional<Value> value, Version ver, Optional<TagSet> tags,
Optional<UID> debugID)
: spanContext(spanContext), key(key), value(value), version(ver), tags(tags), debugID(debugID) {}
template <class Ar>
void serialize( Ar& ar ) {
serializer(ar, key, value, version, tags, debugID, reply);
serializer(ar, key, value, version, tags, debugID, reply, spanContext);
}
};
@ -234,6 +239,7 @@ struct GetKeyValuesReply : public LoadBalancedReply {
struct GetKeyValuesRequest : TimedRequest {
constexpr static FileIdentifier file_identifier = 6795746;
SpanID spanContext;
Arena arena;
KeySelectorRef begin, end;
Version version; // or latestVersion
@ -246,7 +252,7 @@ struct GetKeyValuesRequest : TimedRequest {
GetKeyValuesRequest() : isFetchKeys(false) {}
template <class Ar>
void serialize( Ar& ar ) {
serializer(ar, begin, end, version, limit, limitBytes, isFetchKeys, tags, debugID, reply, arena);
serializer(ar, begin, end, version, limit, limitBytes, isFetchKeys, tags, debugID, reply, spanContext, arena);
}
};
@ -266,6 +272,7 @@ struct GetKeyReply : public LoadBalancedReply {
struct GetKeyRequest : TimedRequest {
constexpr static FileIdentifier file_identifier = 10457870;
SpanID spanContext;
Arena arena;
KeySelectorRef sel;
Version version; // or latestVersion
@ -274,11 +281,13 @@ struct GetKeyRequest : TimedRequest {
ReplyPromise<GetKeyReply> reply;
GetKeyRequest() {}
GetKeyRequest(KeySelectorRef const& sel, Version version, Optional<TagSet> tags, Optional<UID> debugID) : sel(sel), version(version), debugID(debugID) {}
GetKeyRequest(SpanID spanContext, KeySelectorRef const& sel, Version version, Optional<TagSet> tags,
Optional<UID> debugID)
: spanContext(spanContext), sel(sel), version(version), debugID(debugID) {}
template <class Ar>
void serialize( Ar& ar ) {
serializer(ar, sel, version, tags, debugID, reply, arena);
serializer(ar, sel, version, tags, debugID, reply, spanContext, arena);
}
};

View File

@ -268,6 +268,8 @@ description is not currently required but encouraged.
description="Adds a tag to the transaction that can be used to apply manual targeted throttling. At most 5 tags can be set on a transaction." />
<Option name="auto_throttle_tag" code="801" paramType="String" paramDescription="String identifier used to associated this transaction with a throttling group. Must not exceed 16 characters."
description="Adds a tag to the transaction that can be used to apply manual or automatic targeted throttling. At most 5 tags can be set on a transaction." />
<Option name="span_parent" code="900" paramType="Bytes" paramDescription="A byte string of length 16 used to associate the span of this transaction with a parent"
description="Adds a parent to the Span of this transaction. Used for transaction tracing. A span can be identified with any 16 bytes"/>
</Scope>
<!-- The enumeration values matter - do not change them without

View File

@ -66,41 +66,51 @@ template <class Key, class Val, class Range = RangeMapRange<Key>, class Metric =
class RangeMap {
private:
typedef MapPair<Key,Val> pair_type;
public:
//Applications may decrement an iterator before ranges begin, or increment after ranges end, but once in this state cannot do further incrementing or decrementing
class Iterator {
template <bool isConst>
class IteratorImpl {
using self_t = IteratorImpl<isConst>;
public:
Iterator() {} // singular
Iterator( typename Map<Key,Val,pair_type,Metric>::iterator it ) : it(it) {}
using value_type = std::conditional_t<isConst, typename Map<Key, Val, pair_type, Metric>::const_iterator,
typename Map<Key, Val, pair_type, Metric>::iterator>;
typedef std::forward_iterator_tag iterator_category;
using difference_type = int;
using pointer = self_t*;
using reference = self_t&;
IteratorImpl() {} // singular
explicit IteratorImpl<isConst>(const value_type it) : it(it) {}
Key const& begin() { return it->key; }
Key const& end() { auto j = it; ++j; return j->key; }
Range range() { return Range(begin(),end()); }
Val& value() {
std::conditional_t<isConst, const Val&, Val&> value() {
//ASSERT( it->key != allKeys.end );
return it->value;
}
const Val& cvalue() const { return it->value; }
void operator ++() { ++it; }
void operator --() { it.decrementNonEnd(); }
bool operator ==(Iterator const& r) const { return it == r.it; }
bool operator !=(Iterator const& r) const { return it != r.it; }
bool operator==(self_t const& r) const { return it == r.it; }
bool operator!=(self_t const& r) const { return it != r.it; }
// operator* and -> return this
Iterator& operator*() { return *this; }
Iterator* operator->() { return this; }
self_t& operator*() { return *this; }
self_t* operator->() { return this; }
typedef std::forward_iterator_tag iterator_category;
typedef Iterator value_type;
typedef int difference_type;
typedef Iterator* pointer;
typedef Iterator& reference;
private:
typename Map<Key,Val,pair_type,Metric>::iterator it;
value_type it;
};
typedef iterator_range<Iterator> Ranges;
public:
using iterator = IteratorImpl<false>;
using const_iterator = IteratorImpl<true>;
using Ranges = iterator_range<iterator>;
using ConstRanges = iterator_range<const_iterator>;
explicit RangeMap(Key endKey, Val v=Val(), MetricFunc m = MetricFunc()) : mf(m) {
Key beginKey = Key();
@ -111,37 +121,56 @@ public:
}
Val const& operator[]( const Key& k ) { return rangeContaining(k).value(); }
Ranges ranges() { return Ranges( Iterator(map.begin()), Iterator(map.lastItem()) ); }
Ranges ranges() { return Ranges(iterator(map.begin()), iterator(map.lastItem())); }
ConstRanges ranges() const { return ConstRanges(const_iterator(map.begin()), const_iterator(map.lastItem())); }
// intersectingRanges returns [begin, end] where begin <= r.begin and end >= r.end
Ranges intersectingRanges( const Range& r ) { return Ranges(rangeContaining(r.begin), Iterator(map.lower_bound(r.end))); }
Ranges intersectingRanges(const Range& r) {
return Ranges(rangeContaining(r.begin), iterator(map.lower_bound(r.end)));
}
ConstRanges intersectingRanges(const Range& r) const {
return ConstRanges(rangeContaining(r.begin), const_iterator(map.lower_bound(r.end)));
}
// containedRanges() will return all ranges that are fully contained by the passed range (note that a range fully contains itself)
Ranges containedRanges( const Range& r ) {
auto s = Iterator( map.lower_bound( r.begin ) );
if ( s.begin() >= r.end ) return Ranges(s,s);
Ranges containedRanges(const Range& r) {
iterator s(map.lower_bound(r.begin));
if (s.begin() >= r.end) return Ranges(s, s);
return Ranges(s, rangeContaining(r.end));
}
template <class ComparableToKey>
Iterator rangeContaining( const ComparableToKey& k ) {
return Iterator(map.lastLessOrEqual(k));
iterator rangeContaining(const ComparableToKey& k) {
return iterator(map.lastLessOrEqual(k));
}
template <class ComparableToKey>
const_iterator rangeContaining(const ComparableToKey& k) const {
return const_iterator(map.lastLessOrEqual(k));
}
// Returns the range containing a key infinitesimally before k, or the first range if k==Key()
template <class ComparableToKey>
Iterator rangeContainingKeyBefore( const ComparableToKey& k ) {
Iterator i = map.lower_bound(k);
iterator rangeContainingKeyBefore(const ComparableToKey& k) {
iterator i(map.lower_bound(k));
if (!i->begin().size()) return i;
--i;
return i;
}
template <class ComparableToKey>
const_iterator rangeContainingKeyBefore(const ComparableToKey& k) const {
const_iterator i(map.lower_bound(k));
if ( !i->begin().size() ) return i;
--i;
return i;
}
Iterator lastItem() {
auto i = map.lastItem();
iterator lastItem() {
auto i(map.lastItem());
i.decrementNonEnd();
return Iterator(i);
return iterator(i);
}
int size() const { return map.size() - 1; } // We always have one range bounded by two entries
Iterator randomRange() {
return Iterator( map.index( deterministicRandom()->randomInt(0, map.size()-1) ) );
iterator randomRange() { return iterator(map.index(deterministicRandom()->randomInt(0, map.size() - 1))); }
const_iterator randomRange() const {
return const_iterator(map.index(deterministicRandom()->randomInt(0, map.size() - 1)));
}
Iterator nthRange(int n) { return Iterator(map.index(n)); }
iterator nthRange(int n) { return iterator(map.index(n)); }
const_iterator nthRange(int n) const { return const_iterator(map.index(n)); }
bool allEqual( const Range& r, const Val& v );

View File

@ -32,6 +32,8 @@
#include "fdbserver/WorkerInterface.actor.h"
#include "flow/Error.h"
#include "flow/IRandom.h"
#include "flow/Tracing.h"
#include "flow/actorcompiler.h" // This must be the last #include.
#define SevDebugMemory SevVerbose
@ -429,8 +431,9 @@ struct BackupData {
}
ACTOR static Future<Version> _getMinKnownCommittedVersion(BackupData* self) {
state Span span("BA:GetMinCommittedVersion"_loc);
loop {
GetReadVersionRequest request(1, TransactionPriority::DEFAULT,
GetReadVersionRequest request(span.context, 1, TransactionPriority::DEFAULT,
GetReadVersionRequest::FLAG_USE_MIN_KNOWN_COMMITTED_VERSION);
choose {
when(wait(self->cx->onMasterProxiesChanged())) {}

View File

@ -163,18 +163,21 @@ struct GetCommitVersionReply {
struct GetCommitVersionRequest {
constexpr static FileIdentifier file_identifier = 16683181;
SpanID spanContext;
uint64_t requestNum;
uint64_t mostRecentProcessedRequestNum;
UID requestingProxy;
ReplyPromise<GetCommitVersionReply> reply;
GetCommitVersionRequest() { }
GetCommitVersionRequest(uint64_t requestNum, uint64_t mostRecentProcessedRequestNum, UID requestingProxy)
: requestNum(requestNum), mostRecentProcessedRequestNum(mostRecentProcessedRequestNum), requestingProxy(requestingProxy) {}
GetCommitVersionRequest(SpanID spanContext, uint64_t requestNum, uint64_t mostRecentProcessedRequestNum,
UID requestingProxy)
: spanContext(spanContext), requestNum(requestNum), mostRecentProcessedRequestNum(mostRecentProcessedRequestNum),
requestingProxy(requestingProxy) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, requestNum, mostRecentProcessedRequestNum, requestingProxy, reply);
serializer(ar, requestNum, mostRecentProcessedRequestNum, requestingProxy, reply, spanContext);
}
};

View File

@ -44,10 +44,13 @@
#include "fdbserver/WaitFailure.h"
#include "fdbserver/WorkerInterface.actor.h"
#include "flow/ActorCollection.h"
#include "flow/IRandom.h"
#include "flow/Knobs.h"
#include "flow/Stats.h"
#include "flow/TDMetric.actor.h"
#include "flow/Tracing.h"
#include "flow/actorcompiler.h" // This must be the last #include.
#include <tuple>
ACTOR Future<Void> broadcastTxnRequest(TxnStateRequest req, int sendAmount, bool sendReply) {
state ReplyPromise<Void> reply = req.reply;
@ -293,9 +296,9 @@ ACTOR Future<Void> getRate(UID myID, Reference<AsyncVar<ServerDBInfo>> db, int64
ACTOR Future<Void> queueTransactionStartRequests(
Reference<AsyncVar<ServerDBInfo>> db,
Deque<GetReadVersionRequest> *systemQueue,
Deque<GetReadVersionRequest> *defaultQueue,
Deque<GetReadVersionRequest> *batchQueue,
SpannedDeque<GetReadVersionRequest> *systemQueue,
SpannedDeque<GetReadVersionRequest> *defaultQueue,
SpannedDeque<GetReadVersionRequest> *batchQueue,
FutureStream<GetReadVersionRequest> readVersionRequests,
PromiseStream<Void> GRVTimer, double *lastGRVTime,
double *GRVBatchTime, FutureStream<double> replyTimes,
@ -332,9 +335,11 @@ ACTOR Future<Void> queueTransactionStartRequests(
if (req.priority >= TransactionPriority::IMMEDIATE) {
stats->txnSystemPriorityStartIn += req.transactionCount;
systemQueue->push_back(req);
systemQueue->span.addParent(req.spanContext);
} else if (req.priority >= TransactionPriority::DEFAULT) {
stats->txnDefaultPriorityStartIn += req.transactionCount;
defaultQueue->push_back(req);
defaultQueue->span.addParent(req.spanContext);
} else {
// Return error for batch_priority GRV requests
int64_t proxiesCount = std::max((int)db->get().client.proxies.size(), 1);
@ -346,6 +351,7 @@ ACTOR Future<Void> queueTransactionStartRequests(
stats->txnBatchPriorityStartIn += req.transactionCount;
batchQueue->push_back(req);
batchQueue->span.addParent(req.spanContext);
}
}
}
@ -511,8 +517,11 @@ struct ResolutionRequestBuilder {
// [CommitTransactionRef_Index][Resolver_Index][Read_Conflict_Range_Index_on_Resolver]
// -> read_conflict_range's original index in the commitTransactionRef
ResolutionRequestBuilder( ProxyCommitData* self, Version version, Version prevVersion, Version lastReceivedVersion) : self(self), requests(self->resolvers.size()) {
for(auto& req : requests) {
ResolutionRequestBuilder(ProxyCommitData* self, Version version, Version prevVersion, Version lastReceivedVersion,
Span& parentSpan)
: self(self), requests(self->resolvers.size()) {
for (auto& req : requests) {
req.spanContext = parentSpan.context;
req.prevVersion = prevVersion;
req.version = version;
req.lastReceivedVersion = lastReceivedVersion;
@ -794,6 +803,7 @@ ACTOR Future<Void> commitBatch(ProxyCommitData* self, vector<CommitTransactionRe
state Optional<UID> debugID;
state bool forceRecovery = false;
state int batchOperations = 0;
state Span span("MP:commitBatch"_loc);
int64_t batchBytes = 0;
for (int t = 0; t<trs.size(); t++) {
batchOperations += trs[t].transaction.mutations.size();
@ -816,6 +826,7 @@ ACTOR Future<Void> commitBatch(ProxyCommitData* self, vector<CommitTransactionRe
debugID = nondeterministicRandom()->randomUniqueID();
g_traceBatch.addAttach("CommitAttachID", trs[t].debugID.get().first(), debugID.get().first());
}
span.addParent(trs[t].spanContext);
}
if(localBatchNumber == 2 && !debugID.present() && self->firstProxy && !g_network->isSimulated()) {
@ -836,7 +847,7 @@ ACTOR Future<Void> commitBatch(ProxyCommitData* self, vector<CommitTransactionRe
if (debugID.present())
g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "MasterProxyServer.commitBatch.GettingCommitVersion");
GetCommitVersionRequest req(self->commitVersionRequestNumber++, self->mostRecentProcessedRequestNumber, self->dbgid);
GetCommitVersionRequest req(span.context, self->commitVersionRequestNumber++, self->mostRecentProcessedRequestNumber, self->dbgid);
GetCommitVersionReply versionReply = wait( brokenPromiseToNever(self->master.getCommitVersion.getReply(req, TaskPriority::ProxyMasterVersionReply)) );
self->mostRecentProcessedRequestNumber = versionReply.requestNum;
@ -857,7 +868,7 @@ ACTOR Future<Void> commitBatch(ProxyCommitData* self, vector<CommitTransactionRe
if (debugID.present())
g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "MasterProxyServer.commitBatch.GotCommitVersion");
ResolutionRequestBuilder requests( self, commitVersion, prevVersion, self->version );
ResolutionRequestBuilder requests( self, commitVersion, prevVersion, self->version, span );
int conflictRangeCount = 0;
state int64_t maxTransactionBytes = 0;
for (int t = 0; t<trs.size(); t++) {
@ -1170,17 +1181,21 @@ ACTOR Future<Void> commitBatch(ProxyCommitData* self, vector<CommitTransactionRe
// We prevent this by limiting the number of versions which are semi-committed but not fully committed to be less than the MVCC window
if(self->committedVersion.get() < commitVersion - SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS) {
computeDuration += g_network->timer() - computeStart;
state Span waitVersionSpan;
while (self->committedVersion.get() < commitVersion - SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS) {
// This should be *extremely* rare in the real world, but knob buggification should make it happen in simulation
TEST(true); // Semi-committed pipeline limited by MVCC window
//TraceEvent("ProxyWaitingForCommitted", self->dbgid).detail("CommittedVersion", self->committedVersion.get()).detail("NeedToCommit", commitVersion);
waitVersionSpan = Span(deterministicRandom()->randomUniqueID(), "MP:overMaxReadTransactionLifeVersions"_loc, {span.context});
choose{
when(wait(self->committedVersion.whenAtLeast(commitVersion - SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS))) {
wait(yield());
break;
}
when(GetReadVersionReply v = wait(self->getConsistentReadVersion.getReply(GetReadVersionRequest(0, TransactionPriority::IMMEDIATE, GetReadVersionRequest::FLAG_CAUSAL_READ_RISKY)))) {
if(v.version > self->committedVersion.get()) {
when(GetReadVersionReply v = wait(self->getConsistentReadVersion.getReply(
GetReadVersionRequest(waitVersionSpan.context, 0, TransactionPriority::IMMEDIATE,
GetReadVersionRequest::FLAG_CAUSAL_READ_RISKY)))) {
if (v.version > self->committedVersion.get()) {
self->locked = v.locked;
self->metadataVersion = v.metadataVersion;
self->committedVersion.set(v.version);
@ -1191,6 +1206,7 @@ ACTOR Future<Void> commitBatch(ProxyCommitData* self, vector<CommitTransactionRe
}
}
}
waitVersionSpan = Span{};
computeStart = g_network->timer();
}
@ -1397,21 +1413,23 @@ ACTOR Future<Void> updateLastCommit(ProxyCommitData* self, Optional<UID> debugID
return Void();
}
ACTOR Future<GetReadVersionReply> getLiveCommittedVersion(ProxyCommitData* commitData, uint32_t flags, vector<MasterProxyInterface> *otherProxies, Optional<UID> debugID,
ACTOR Future<GetReadVersionReply> getLiveCommittedVersion(SpanID parentSpan, ProxyCommitData* commitData, uint32_t flags, vector<MasterProxyInterface> *otherProxies, Optional<UID> debugID,
int transactionCount, int systemTransactionCount, int defaultPriTransactionCount, int batchPriTransactionCount)
{
// Returns a version which (1) is committed, and (2) is >= the latest version reported committed (by a commit response) when this request was sent
// (1) The version returned is the committedVersion of some proxy at some point before the request returns, so it is committed.
// (2) No proxy on our list reported committed a higher version before this request was received, because then its committedVersion would have been higher,
// and no other proxy could have already committed anything without first ending the epoch
state Span span("MP:getLiveCommittedVersion"_loc, parentSpan);
++commitData->stats.txnStartBatch;
state vector<Future<GetReadVersionReply>> proxyVersions;
state Future<GetReadVersionReply> replyFromMasterFuture;
if (SERVER_KNOBS->ASK_READ_VERSION_FROM_MASTER) {
replyFromMasterFuture = commitData->master.getLiveCommittedVersion.getReply(GetRawCommittedVersionRequest(debugID), TaskPriority::GetLiveCommittedVersionReply);
replyFromMasterFuture = commitData->master.getLiveCommittedVersion.getReply(
GetRawCommittedVersionRequest(span.context, debugID), TaskPriority::GetLiveCommittedVersionReply);
} else {
for (auto const& p : *otherProxies)
proxyVersions.push_back(brokenPromiseToNever(p.getRawCommittedVersion.getReply(GetRawCommittedVersionRequest(debugID), TaskPriority::TLogConfirmRunningReply)));
proxyVersions.push_back(brokenPromiseToNever(p.getRawCommittedVersion.getReply(GetRawCommittedVersionRequest(span.context, debugID), TaskPriority::TLogConfirmRunningReply)));
}
if (!SERVER_KNOBS->ALWAYS_CAUSAL_READ_RISKY && !(flags&GetReadVersionRequest::FLAG_CAUSAL_READ_RISKY)) {
@ -1517,15 +1535,16 @@ ACTOR static Future<Void> transactionStarter(
state TransactionRateInfo normalRateInfo(10);
state TransactionRateInfo batchRateInfo(0);
state Deque<GetReadVersionRequest> systemQueue;
state Deque<GetReadVersionRequest> defaultQueue;
state Deque<GetReadVersionRequest> batchQueue;
state SpannedDeque<GetReadVersionRequest> systemQueue("MP:transactionStarterSystemQueue"_loc);
state SpannedDeque<GetReadVersionRequest> defaultQueue("MP:transactionStarterDefaultQueue"_loc);
state SpannedDeque<GetReadVersionRequest> batchQueue("MP:transactionStarterBatchQueue"_loc);
state vector<MasterProxyInterface> otherProxies;
state TransactionTagMap<uint64_t> transactionTagCounter;
state PrioritizedTransactionTagMap<ClientTagThrottleLimits> throttledTags;
state PromiseStream<double> replyTimes;
state Span span;
addActor.send(getRate(proxy.id(), db, &transactionCount, &batchTransactionCount, &normalRateInfo, &batchRateInfo,
healthMetricsReply, detailedHealthMetricsReply, &transactionTagCounter, &throttledTags,
@ -1569,7 +1588,7 @@ ACTOR static Future<Void> transactionStarter(
int requestsToStart = 0;
while (requestsToStart < SERVER_KNOBS->START_TRANSACTION_MAX_REQUESTS_TO_START) {
Deque<GetReadVersionRequest>* transactionQueue;
SpannedDeque<GetReadVersionRequest>* transactionQueue;
if(!systemQueue.empty()) {
transactionQueue = &systemQueue;
} else if(!defaultQueue.empty()) {
@ -1579,6 +1598,7 @@ ACTOR static Future<Void> transactionStarter(
} else {
break;
}
transactionQueue->span.swap(span);
auto& req = transactionQueue->front();
int tc = req.transactionCount;
@ -1640,7 +1660,9 @@ ACTOR static Future<Void> transactionStarter(
for (int i = 0; i < start.size(); i++) {
if (start[i].size()) {
Future<GetReadVersionReply> readVersionReply = getLiveCommittedVersion(commitData, i, &otherProxies, debugID, transactionsStarted[i], systemTransactionsStarted[i], defaultPriTransactionsStarted[i], batchPriTransactionsStarted[i]);
Future<GetReadVersionReply> readVersionReply = getLiveCommittedVersion(
span.context, commitData, i, &otherProxies, debugID, transactionsStarted[i], systemTransactionsStarted[i],
defaultPriTransactionsStarted[i], batchPriTransactionsStarted[i]);
addActor.send(sendGrvReplies(readVersionReply, start[i], &commitData->stats,
commitData->minKnownCommittedVersion, throttledTags));
@ -1650,6 +1672,7 @@ ACTOR static Future<Void> transactionStarter(
}
}
}
span = Span(span.location);
}
}
@ -2110,6 +2133,7 @@ ACTOR Future<Void> masterProxyServerCore(
}
when(GetRawCommittedVersionRequest req = waitNext(proxy.getRawCommittedVersion.getFuture())) {
//TraceEvent("ProxyGetRCV", proxy.id());
Span span("MP:getRawCommittedReadVersion"_loc, { req.spanContext });
if (req.debugID.present())
g_traceBatch.addEvent("TransactionDebug", req.debugID.get().first(), "MasterProxyServer.masterProxyServerCore.GetRawCommittedVersion");
GetReadVersionReply rep;

View File

@ -20,6 +20,9 @@
#ifndef FDBSERVER_RESOLVERINTERFACE_H
#define FDBSERVER_RESOLVERINTERFACE_H
#include "fdbclient/CommitTransaction.h"
#include "fdbrpc/Locality.h"
#include "fdbrpc/fdbrpc.h"
#pragma once
#include "fdbrpc/Locality.h"
@ -94,17 +97,19 @@ struct ResolveTransactionBatchRequest {
constexpr static FileIdentifier file_identifier = 16462858;
Arena arena;
SpanID spanContext;
Version prevVersion;
Version version; // FIXME: ?
Version lastReceivedVersion;
VectorRef<CommitTransactionRef> transactions;
VectorRef<struct CommitTransactionRef> transactions;
VectorRef<int> txnStateTransactions; // Offsets of elements of transactions that have (transaction subsystem state) mutations
ReplyPromise<ResolveTransactionBatchReply> reply;
Optional<UID> debugID;
template <class Archive>
void serialize(Archive& ar) {
serializer(ar, prevVersion, version, lastReceivedVersion, transactions, txnStateTransactions, reply, arena, debugID);
serializer(ar, prevVersion, version, lastReceivedVersion, transactions, txnStateTransactions, reply, arena,
debugID, spanContext);
}
};

View File

@ -37,8 +37,8 @@
typedef std::map<Standalone<StringRef>, std::pair<Standalone<StringRef>, uint32_t>> SerializedMutationListMap;
std::vector<UID> getApplierIDs(std::map<Key, UID>& rangeToApplier);
void splitMutation(std::map<Key, UID>* pRangeToApplier, MutationRef m, Arena& mvector_arena,
VectorRef<MutationRef>& mvector, Arena& nodeIDs_arena, VectorRef<UID>& nodeIDs);
void splitMutation(const KeyRangeMap<UID>& krMap, MutationRef m, Arena& mvector_arena, VectorRef<MutationRef>& mvector,
Arena& nodeIDs_arena, VectorRef<UID>& nodeIDs);
void _parseSerializedMutation(KeyRangeMap<Version>* pRangeVersions,
std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsIter,
SerializedMutationListMap* mutationMap,
@ -473,6 +473,19 @@ ACTOR Future<Void> handleSendMutationsRequest(RestoreSendMutationsToAppliersRequ
return Void();
}
void buildApplierRangeMap(KeyRangeMap<UID>* krMap, std::map<Key, UID>* pRangeToApplier) {
std::map<Key, UID>::iterator beginKey = pRangeToApplier->begin();
std::map<Key, UID>::iterator endKey = std::next(beginKey, 1);
while (endKey != pRangeToApplier->end()) {
krMap->insert(KeyRangeRef(beginKey->first, endKey->first), beginKey->second);
beginKey = endKey;
endKey++;
}
if (beginKey != pRangeToApplier->end()) {
krMap->insert(KeyRangeRef(beginKey->first, normalKeys.end), beginKey->second);
}
}
// Assume: kvOps data are from the same RestoreAsset.
// Input: pkvOps: versioned kv mutation for the asset in the version batch (batchIndex)
// isRangeFile: is pkvOps from range file? Let receiver (applier) know if the mutation is log mutation;
@ -517,6 +530,8 @@ ACTOR Future<Void> sendMutationsToApplier(VersionedMutationsMap* pkvOps, int bat
for (auto& applierID : applierIDs) {
applierVersionedMutationsBuffer[applierID] = VersionedMutationsVec();
}
KeyRangeMap<UID> krMap;
buildApplierRangeMap(&krMap, pRangeToApplier);
for (kvOp = kvOps.begin(); kvOp != kvOps.end(); kvOp++) {
commitVersion = kvOp->first;
ASSERT(commitVersion.version >= asset.beginVersion);
@ -529,8 +544,7 @@ ACTOR Future<Void> sendMutationsToApplier(VersionedMutationsMap* pkvOps, int bat
Standalone<VectorRef<UID>> nodeIDs;
// Because using a vector of mutations causes overhead, and the range mutation should happen rarely;
// We handle the range mutation and key mutation differently for the benefit of avoiding memory copy
splitMutation(pRangeToApplier, kvm, mvector.arena(), mvector.contents(), nodeIDs.arena(),
nodeIDs.contents());
splitMutation(krMap, kvm, mvector.arena(), mvector.contents(), nodeIDs.arena(), nodeIDs.contents());
ASSERT(mvector.size() == nodeIDs.size());
if (MUTATION_TRACKING_ENABLED) {
@ -625,59 +639,21 @@ ACTOR Future<Void> sendMutationsToApplier(VersionedMutationsMap* pkvOps, int bat
return Void();
}
void splitMutation(std::map<Key, UID>* pRangeToApplier, MutationRef m, Arena& mvector_arena,
VectorRef<MutationRef>& mvector, Arena& nodeIDs_arena, VectorRef<UID>& nodeIDs) {
// Splits a clear range mutation for Appliers and puts results of splitted mutations and
// Applier IDs into "mvector" and "nodeIDs" on return.
void splitMutation(const KeyRangeMap<UID>& krMap, MutationRef m, Arena& mvector_arena, VectorRef<MutationRef>& mvector,
Arena& nodeIDs_arena, VectorRef<UID>& nodeIDs) {
TraceEvent(SevDebug, "FastRestoreSplitMutation").detail("Mutation", m.toString());
// mvector[i] should be mapped to nodeID[i]
ASSERT(mvector.empty());
ASSERT(nodeIDs.empty());
// key range [m->param1, m->param2)
std::map<Key, UID>::iterator itlow, itup; // we will return [itlow, itup)
itlow = pRangeToApplier->lower_bound(m.param1); // lower_bound returns the iterator that is >= m.param1
if (itlow == pRangeToApplier->end()) {
--itlow;
mvector.push_back_deep(mvector_arena, m);
nodeIDs.push_back(nodeIDs_arena, itlow->second);
return;
}
if (itlow->first > m.param1) {
if (itlow != pRangeToApplier->begin()) {
--itlow;
}
}
itup = pRangeToApplier->upper_bound(m.param2); // return rmap::end if no key is after m.param2.
ASSERT(itup == pRangeToApplier->end() || itup->first > m.param2);
std::map<Key, UID>::iterator itApplier;
while (itlow != itup) {
Standalone<MutationRef> curm; // current mutation
curm.type = m.type;
// The first split mutation should starts with m.first.
// The later ones should start with the rangeToApplier boundary.
if (m.param1 > itlow->first) {
curm.param1 = m.param1;
} else {
curm.param1 = itlow->first;
}
itApplier = itlow;
itlow++;
if (itlow == itup) {
ASSERT(m.param2 <= normalKeys.end);
curm.param2 = m.param2;
} else if (m.param2 < itlow->first) {
UNREACHABLE();
curm.param2 = m.param2;
} else {
curm.param2 = itlow->first;
}
ASSERT(curm.param1 <= curm.param2);
// itup > m.param2: (itup-1) may be out of mutation m's range
// Ensure the added mutations have overlap with mutation m
if (m.param1 < curm.param2 && m.param2 > curm.param1) {
mvector.push_back_deep(mvector_arena, curm);
nodeIDs.push_back(nodeIDs_arena, itApplier->second);
}
auto r = krMap.intersectingRanges(KeyRangeRef(m.param1, m.param2));
for (auto i = r.begin(); i != r.end(); ++i) {
// Calculate the overlap range
KeyRef rangeBegin = m.param1 > i->range().begin ? m.param1 : i->range().begin;
KeyRef rangeEnd = m.param2 < i->range().end ? m.param2 : i->range().end;
KeyRange krange1(KeyRangeRef(rangeBegin, rangeEnd));
mvector.push_back_deep(mvector_arena, MutationRef(MutationRef::ClearRange, rangeBegin, rangeEnd));
nodeIDs.push_back(nodeIDs_arena, i->cvalue());
}
}
@ -1007,6 +983,63 @@ ACTOR Future<Void> handleFinishVersionBatchRequest(RestoreVersionBatchRequest re
return Void();
}
namespace {
void oldSplitMutation(std::map<Key, UID>* pRangeToApplier, MutationRef m, Arena& mvector_arena,
VectorRef<MutationRef>& mvector, Arena& nodeIDs_arena, VectorRef<UID>& nodeIDs) {
// mvector[i] should be mapped to nodeID[i]
ASSERT(mvector.empty());
ASSERT(nodeIDs.empty());
// key range [m->param1, m->param2)
std::map<Key, UID>::iterator itlow, itup; // we will return [itlow, itup)
itlow = pRangeToApplier->lower_bound(m.param1); // lower_bound returns the iterator that is >= m.param1
if (itlow == pRangeToApplier->end()) {
--itlow;
mvector.push_back_deep(mvector_arena, m);
nodeIDs.push_back(nodeIDs_arena, itlow->second);
return;
}
if (itlow->first > m.param1) {
if (itlow != pRangeToApplier->begin()) {
--itlow;
}
}
itup = pRangeToApplier->upper_bound(m.param2); // return rmap::end if no key is after m.param2.
ASSERT(itup == pRangeToApplier->end() || itup->first > m.param2);
std::map<Key, UID>::iterator itApplier;
while (itlow != itup) {
Standalone<MutationRef> curm; // current mutation
curm.type = m.type;
// The first split mutation should starts with m.first.
// The later ones should start with the rangeToApplier boundary.
if (m.param1 > itlow->first) {
curm.param1 = m.param1;
} else {
curm.param1 = itlow->first;
}
itApplier = itlow;
itlow++;
if (itlow == itup) {
ASSERT(m.param2 <= normalKeys.end);
curm.param2 = m.param2;
} else if (m.param2 < itlow->first) {
UNREACHABLE();
curm.param2 = m.param2;
} else {
curm.param2 = itlow->first;
}
ASSERT(curm.param1 <= curm.param2);
// itup > m.param2: (itup-1) may be out of mutation m's range
// Ensure the added mutations have overlap with mutation m
if (m.param1 < curm.param2 && m.param2 > curm.param1) {
mvector.push_back_deep(mvector_arena, curm);
nodeIDs.push_back(nodeIDs_arena, itApplier->second);
}
}
}
// Test splitMutation
TEST_CASE("/FastRestore/RestoreLoader/splitMutation") {
std::map<Key, UID> rangeToApplier;
@ -1028,77 +1061,45 @@ TEST_CASE("/FastRestore/RestoreLoader/splitMutation") {
Key endK = k1 < k2 ? k2 : k1;
Standalone<MutationRef> mutation(MutationRef(MutationRef::ClearRange, beginK.contents(), endK.contents()));
// Method 1: Use splitMutation
splitMutation(&rangeToApplier, mutation, mvector.arena(), mvector.contents(), nodeIDs.arena(), nodeIDs.contents());
// Method 1: Use old splitMutation
oldSplitMutation(&rangeToApplier, mutation, mvector.arena(), mvector.contents(), nodeIDs.arena(),
nodeIDs.contents());
ASSERT(mvector.size() == nodeIDs.size());
// Method 2: Use intersection
// Method 2: Use new intersection based method
KeyRangeMap<UID> krMap;
std::map<Key, UID>::iterator beginKey = rangeToApplier.begin();
std::map<Key, UID>::iterator endKey = std::next(beginKey, 1);
while (endKey != rangeToApplier.end()) {
TraceEvent("KeyRangeMap")
.detail("BeginKey", beginKey->first)
.detail("EndKey", endKey->first)
.detail("Node", beginKey->second);
krMap.insert(KeyRangeRef(beginKey->first, endKey->first), beginKey->second);
beginKey = endKey;
endKey++;
}
if (beginKey != rangeToApplier.end()) {
TraceEvent("KeyRangeMap")
.detail("BeginKey", beginKey->first)
.detail("EndKey", normalKeys.end)
.detail("Node", beginKey->second);
krMap.insert(KeyRangeRef(beginKey->first, normalKeys.end), beginKey->second);
}
buildApplierRangeMap(&krMap, &rangeToApplier);
MutationsVec mvector2;
Standalone<VectorRef<UID>> nodeIDs2;
splitMutation(krMap, mutation, mvector2.arena(), mvector2.contents(), nodeIDs2.arena(), nodeIDs2.contents());
ASSERT(mvector2.size() == nodeIDs2.size());
ASSERT(mvector.size() == mvector2.size());
int splitMutationIndex = 0;
auto r = krMap.intersectingRanges(KeyRangeRef(mutation.param1, mutation.param2));
bool correctResult = true;
for (auto i = r.begin(); i != r.end(); ++i) {
// intersectionRange result
// Calculate the overlap range
KeyRef rangeBegin = mutation.param1 > i->range().begin ? mutation.param1 : i->range().begin;
KeyRef rangeEnd = mutation.param2 < i->range().end ? mutation.param2 : i->range().end;
KeyRange krange1(KeyRangeRef(rangeBegin, rangeEnd));
UID nodeID = i->value();
// splitMuation result
if (splitMutationIndex >= mvector.size()) {
correctResult = false;
break;
}
MutationRef result2M = mvector[splitMutationIndex];
for (; splitMutationIndex < mvector.size(); splitMutationIndex++) {
MutationRef result = mvector[splitMutationIndex];
MutationRef result2 = mvector2[splitMutationIndex];
UID applierID = nodeIDs[splitMutationIndex];
KeyRange krange2(KeyRangeRef(result2M.param1, result2M.param2));
UID applierID2 = nodeIDs2[splitMutationIndex];
KeyRange krange(KeyRangeRef(result.param1, result.param2));
KeyRange krange2(KeyRangeRef(result2.param1, result2.param2));
TraceEvent("Result")
.detail("KeyRange1", krange1.toString())
.detail("KeyRange1", krange.toString())
.detail("KeyRange2", krange2.toString())
.detail("ApplierID1", nodeID)
.detail("ApplierID2", applierID);
if (krange1 != krange2 || nodeID != applierID) {
correctResult = false;
.detail("ApplierID1", applierID)
.detail("ApplierID2", applierID2);
if (krange != krange2 || applierID != applierID2) {
TraceEvent(SevError, "IncorrectResult")
.detail("Mutation", mutation.toString())
.detail("KeyRange1", krange1.toString())
.detail("KeyRange1", krange.toString())
.detail("KeyRange2", krange2.toString())
.detail("ApplierID1", nodeID)
.detail("ApplierID2", applierID);
}
splitMutationIndex++;
}
if (splitMutationIndex != mvector.size()) {
correctResult = false;
TraceEvent(SevError, "SplitMuationTooMany")
.detail("SplitMutationIndex", splitMutationIndex)
.detail("Results", mvector.size());
for (; splitMutationIndex < mvector.size(); splitMutationIndex++) {
TraceEvent("SplitMuationTooMany")
.detail("SplitMutationIndex", splitMutationIndex)
.detail("Result", mvector[splitMutationIndex].toString());
.detail("ApplierID1", applierID)
.detail("ApplierID2", applierID2);
}
}
return Void();
}
} // namespace

View File

@ -447,7 +447,6 @@ ACTOR Future<Version> waitForVersionNoTooOld( StorageCacheData* data, Version ve
ACTOR Future<Void> getValueQ( StorageCacheData* data, GetValueRequest req ) {
state int64_t resultSize = 0;
try {
++data->counters.getValueQueries;
++data->counters.allQueries;
@ -457,12 +456,13 @@ ACTOR Future<Void> getValueQ( StorageCacheData* data, GetValueRequest req ) {
// Active load balancing runs at a very high priority (to obtain accurate queue lengths)
// so we need to downgrade here
//TODO what's this?
wait( delay(0, TaskPriority::DefaultEndpoint) );
if( req.debugID.present() )
if( req.debugID.present() ) {
g_traceBatch.addEvent("GetValueDebug", req.debugID.get().first(), "getValueQ.DoRead"); //.detail("TaskID", g_network->getCurrentTask());
//FIXME
}
state Optional<Value> v;
state Version version = wait( waitForVersion( data, req.version ) );
@ -1077,7 +1077,7 @@ void coalesceCacheRanges(StorageCacheData *data, KeyRangeRef keys) {
bool lastReadable = false;
bool lastNotAssigned = false;
KeyRangeMap<Reference<CacheRangeInfo>>::Iterator lastRange;
KeyRangeMap<Reference<CacheRangeInfo>>::iterator lastRange;
for( ; iter != iterEnd; ++iter) {
if( lastReadable && iter->value()->isReadable() ) {

View File

@ -259,7 +259,7 @@ struct StorageServerMetrics {
// Called by StorageServerDisk when the size of a key in byteSample changes, to notify WaitMetricsRequest
// Should not be called for keys past allKeys.end
void notifyBytes( RangeMap<Key, std::vector<PromiseStream<StorageMetrics>>, KeyRangeRef>::Iterator shard, int64_t bytes ) {
void notifyBytes( RangeMap<Key, std::vector<PromiseStream<StorageMetrics>>, KeyRangeRef>::iterator shard, int64_t bytes ) {
ASSERT(shard.end() <= allKeys.end);
StorageMetrics notifyMetrics;

View File

@ -20,6 +20,9 @@
// There's something in one of the files below that defines a macros
// a macro that makes boost interprocess break on Windows.
#include "flow/Tracing.h"
#include <cctype>
#include <iterator>
#define BOOST_DATE_TIME_NO_LIB
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/algorithm/string.hpp>
@ -78,7 +81,7 @@
// clang-format off
enum {
OPT_CONNFILE, OPT_SEEDCONNFILE, OPT_SEEDCONNSTRING, OPT_ROLE, OPT_LISTEN, OPT_PUBLICADDR, OPT_DATAFOLDER, OPT_LOGFOLDER, OPT_PARENTPID, OPT_NEWCONSOLE,
OPT_CONNFILE, OPT_SEEDCONNFILE, OPT_SEEDCONNSTRING, OPT_ROLE, OPT_LISTEN, OPT_PUBLICADDR, OPT_DATAFOLDER, OPT_LOGFOLDER, OPT_PARENTPID, OPT_TRACER, OPT_NEWCONSOLE,
OPT_NOBOX, OPT_TESTFILE, OPT_RESTARTING, OPT_RESTORING, OPT_RANDOMSEED, OPT_KEY, OPT_MEMLIMIT, OPT_STORAGEMEMLIMIT, OPT_CACHEMEMLIMIT, OPT_MACHINEID,
OPT_DCID, OPT_MACHINE_CLASS, OPT_BUGGIFY, OPT_VERSION, OPT_CRASHONERROR, OPT_HELP, OPT_NETWORKIMPL, OPT_NOBUFSTDOUT, OPT_BUFSTDOUTERR, OPT_TRACECLOCK,
OPT_NUMTESTERS, OPT_DEVHELP, OPT_ROLLSIZE, OPT_MAXLOGS, OPT_MAXLOGSSIZE, OPT_KNOB, OPT_TESTSERVERS, OPT_TEST_ON_SERVERS, OPT_METRICSCONNFILE,
@ -111,6 +114,7 @@ CSimpleOpt::SOption g_rgOptions[] = {
{ OPT_MAXLOGSSIZE, "--maxlogssize", SO_REQ_SEP },
{ OPT_LOGGROUP, "--loggroup", SO_REQ_SEP },
{ OPT_PARENTPID, "--parentpid", SO_REQ_SEP },
{ OPT_TRACER, "--tracer", SO_REQ_SEP },
#ifdef _WIN32
{ OPT_NEWCONSOLE, "-n", SO_NONE },
{ OPT_NEWCONSOLE, "--newconsole", SO_NONE },
@ -514,6 +518,9 @@ static void printUsage( const char *name, bool devhelp ) {
printf(" --trace_format FORMAT\n"
" Select the format of the log files. xml (the default) and json\n"
" are supported.\n");
printf(" --tracer TRACER\n"
" Select a tracer for transaction tracing. Currently disabled\n"
" (the default) and log_file are supported.\n");
printf(" -i ID, --machine_id ID\n"
" Machine and zone identifier key (up to 16 hex characters).\n"
" Defaults to a random value shared by all fdbserver processes\n"
@ -1169,6 +1176,22 @@ private:
break;
}
#endif
case OPT_TRACER:
{
std::string arg = args.OptionArg();
std::string tracer;
std::transform(arg.begin(), arg.end(), std::back_inserter(tracer), [](char c) { return tolower(c); });
if (tracer == "none" || tracer == "disabled") {
openTracer(TracerType::DISABLED);
} else if (tracer == "logfile" || tracer == "file" || tracer == "log_file") {
openTracer(TracerType::LOG_FILE);
} else {
fprintf(stderr, "ERROR: Unknown or unsupported tracer: `%s'", args.OptionArg());
printHelpTeaser(argv[0]);
flushAndExit(FDB_EXIT_ERROR);
}
break;
}
case OPT_TESTFILE:
testFile = args.OptionArg();
break;

View File

@ -921,6 +921,7 @@ ACTOR Future<Void> recoverFrom( Reference<MasterData> self, Reference<ILogSystem
}
ACTOR Future<Void> getVersion(Reference<MasterData> self, GetCommitVersionRequest req) {
state Span span("M:getVersion"_loc, { req.spanContext });
state std::map<UID, ProxyVersionReplies>::iterator proxyItr = self->lastProxyVersionReplies.find(req.requestingProxy); // lastProxyVersionReplies never changes
if (proxyItr == self->lastProxyVersionReplies.end()) {

View File

@ -21,6 +21,9 @@
#include <cinttypes>
#include "fdbrpc/fdbrpc.h"
#include "fdbrpc/LoadBalance.h"
#include "flow/Arena.h"
#include "flow/IRandom.h"
#include "flow/Tracing.h"
#include "flow/IndexedSet.h"
#include "flow/Hash3.h"
#include "flow/ActorCollection.h"
@ -846,7 +849,8 @@ updateProcessStats(StorageServer* self)
#pragma region Queries
#endif
ACTOR Future<Version> waitForVersionActor(StorageServer* data, Version version) {
ACTOR Future<Version> waitForVersionActor(StorageServer* data, Version version, SpanID spanContext) {
state Span span("SS.WaitForVersion"_loc, { spanContext });
choose {
when(wait(data->version.whenAtLeast(version))) {
// FIXME: A bunch of these can block with or without the following delay 0.
@ -865,7 +869,7 @@ ACTOR Future<Version> waitForVersionActor(StorageServer* data, Version version)
}
}
Future<Version> waitForVersion(StorageServer* data, Version version) {
Future<Version> waitForVersion(StorageServer* data, Version version, SpanID spanContext) {
if (version == latestVersion) {
version = std::max(Version(1), data->version.get());
}
@ -883,7 +887,7 @@ Future<Version> waitForVersion(StorageServer* data, Version version) {
if (deterministicRandom()->random01() < 0.001) {
TraceEvent("WaitForVersion1000x");
}
return waitForVersionActor(data, version);
return waitForVersionActor(data, version, spanContext);
}
ACTOR Future<Version> waitForVersionNoTooOld( StorageServer* data, Version version ) {
@ -909,6 +913,7 @@ ACTOR Future<Version> waitForVersionNoTooOld( StorageServer* data, Version versi
ACTOR Future<Void> getValueQ( StorageServer* data, GetValueRequest req ) {
state int64_t resultSize = 0;
Span span("SS:getValue"_loc, { req.spanContext });
try {
++data->counters.getValueQueries;
@ -924,7 +929,7 @@ ACTOR Future<Void> getValueQ( StorageServer* data, GetValueRequest req ) {
g_traceBatch.addEvent("GetValueDebug", req.debugID.get().first(), "getValueQ.DoRead"); //.detail("TaskID", g_network->getCurrentTask());
state Optional<Value> v;
state Version version = wait( waitForVersion( data, req.version ) );
state Version version = wait( waitForVersion( data, req.version, req.spanContext ) );
if( req.debugID.present() )
g_traceBatch.addEvent("GetValueDebug", req.debugID.get().first(), "getValueQ.AfterVersion"); //.detail("TaskID", g_network->getCurrentTask());
@ -1008,7 +1013,9 @@ ACTOR Future<Void> getValueQ( StorageServer* data, GetValueRequest req ) {
return Void();
};
ACTOR Future<Void> watchValue_impl( StorageServer* data, WatchValueRequest req ) {
ACTOR Future<Void> watchValue_impl( StorageServer* data, WatchValueRequest req, SpanID parent ) {
state Location spanLocation = "SS:WatchValueImpl"_loc;
state Span span(spanLocation, { parent });
try {
++data->counters.watchQueries;
@ -1025,9 +1032,10 @@ ACTOR Future<Void> watchValue_impl( StorageServer* data, WatchValueRequest req )
try {
state Version latest = data->version.get();
TEST(latest >= minVersion && latest < data->data().latestVersion); // Starting watch loop with latestVersion > data->version
GetValueRequest getReq( req.key, latest, req.tags, req.debugID );
GetValueRequest getReq( span.context, req.key, latest, req.tags, req.debugID );
state Future<Void> getValue = getValueQ( data, getReq ); //we are relying on the delay zero at the top of getValueQ, if removed we need one here
GetValueReply reply = wait( getReq.reply.getFuture() );
span = Span(spanLocation, parent);
//TraceEvent("WatcherCheckValue").detail("Key", req.key ).detail("Value", req.value ).detail("CurrentValue", v ).detail("Ver", latest);
if(reply.error.present()) {
@ -1094,7 +1102,8 @@ ACTOR Future<Void> watchValue_impl( StorageServer* data, WatchValueRequest req )
}
ACTOR Future<Void> watchValueQ( StorageServer* data, WatchValueRequest req ) {
state Future<Void> watch = watchValue_impl( data, req );
state Span span("SS:watchValue"_loc, { req.spanContext });
state Future<Void> watch = watchValue_impl( data, req, span.context );
state double startTime = now();
loop {
@ -1199,7 +1208,7 @@ void merge( Arena& arena, VectorRef<KeyValueRef, VecSerStrategy::String>& output
// If limit>=0, it returns the first rows in the range (sorted ascending), otherwise the last rows (sorted descending).
// readRange has O(|result|) + O(log |data|) cost
ACTOR Future<GetKeyValuesReply> readRange( StorageServer* data, Version version, KeyRange range, int limit, int* pLimitBytes ) {
ACTOR Future<GetKeyValuesReply> readRange( StorageServer* data, Version version, KeyRange range, int limit, int* pLimitBytes, SpanID parentSpan ) {
state GetKeyValuesReply result;
state StorageServer::VersionedData::ViewAtVersion view = data->data().at(version);
state StorageServer::VersionedData::iterator vCurrent = view.end();
@ -1207,6 +1216,7 @@ ACTOR Future<GetKeyValuesReply> readRange( StorageServer* data, Version version,
state KeyRef readEnd;
state Key readBeginTemp;
state int vCount = 0;
state Span span("SS:readRange"_loc, parentSpan);
// for caching the storage queue results during the first PTree traversal
state VectorRef<KeyValueRef> resultCache;
@ -1378,7 +1388,7 @@ ACTOR Future<GetKeyValuesReply> readRange( StorageServer* data, Version version,
// return sel.getKey() >= range.begin && (sel.isBackward() ? sel.getKey() <= range.end : sel.getKey() < range.end);
//}
ACTOR Future<Key> findKey( StorageServer* data, KeySelectorRef sel, Version version, KeyRange range, int* pOffset)
ACTOR Future<Key> findKey( StorageServer* data, KeySelectorRef sel, Version version, KeyRange range, int* pOffset, SpanID parentSpan)
// Attempts to find the key indicated by sel in the data at version, within range.
// Precondition: selectorInRange(sel, range)
// If it is found, offset is set to 0 and a key is returned which falls inside range.
@ -1395,6 +1405,7 @@ ACTOR Future<Key> findKey( StorageServer* data, KeySelectorRef sel, Version vers
state int sign = forward ? +1 : -1;
state bool skipEqualKey = sel.orEqual == forward;
state int distance = forward ? sel.offset : 1-sel.offset;
state Span span("SS.findKey"_loc, { parentSpan });
//Don't limit the number of bytes if this is a trivial key selector (there will be at most two items returned from the read range in this case)
state int maxBytes;
@ -1403,14 +1414,18 @@ ACTOR Future<Key> findKey( StorageServer* data, KeySelectorRef sel, Version vers
else
maxBytes = BUGGIFY ? SERVER_KNOBS->BUGGIFY_LIMIT_BYTES : SERVER_KNOBS->STORAGE_LIMIT_BYTES;
state GetKeyValuesReply rep = wait( readRange( data, version, forward ? KeyRangeRef(sel.getKey(), range.end) : KeyRangeRef(range.begin, keyAfter(sel.getKey())), (distance + skipEqualKey)*sign, &maxBytes ) );
state GetKeyValuesReply rep = wait(
readRange(data, version,
forward ? KeyRangeRef(sel.getKey(), range.end) : KeyRangeRef(range.begin, keyAfter(sel.getKey())),
(distance + skipEqualKey) * sign, &maxBytes, span.context));
state bool more = rep.more && rep.data.size() != distance + skipEqualKey;
//If we get only one result in the reverse direction as a result of the data being too large, we could get stuck in a loop
if(more && !forward && rep.data.size() == 1) {
TEST(true); //Reverse key selector returned only one result in range read
maxBytes = std::numeric_limits<int>::max();
GetKeyValuesReply rep2 = wait( readRange( data, version, KeyRangeRef(range.begin, keyAfter(sel.getKey())), -2, &maxBytes ) );
GetKeyValuesReply rep2 =
wait(readRange(data, version, KeyRangeRef(range.begin, keyAfter(sel.getKey())), -2, &maxBytes, span.context));
rep = rep2;
more = rep.more && rep.data.size() != distance + skipEqualKey;
ASSERT(rep.data.size() == 2 || !more);
@ -1469,6 +1484,7 @@ ACTOR Future<Void> getKeyValuesQ( StorageServer* data, GetKeyValuesRequest req )
// Throws a wrong_shard_server if the keys in the request or result depend on data outside this server OR if a large selector offset prevents
// all data from being read in one range read
{
state Span span("SS:getKeyValues"_loc, { req.spanContext });
state int64_t resultSize = 0;
++data->counters.getRangeQueries;
@ -1490,7 +1506,7 @@ ACTOR Future<Void> getKeyValuesQ( StorageServer* data, GetKeyValuesRequest req )
try {
if( req.debugID.present() )
g_traceBatch.addEvent("TransactionDebug", req.debugID.get().first(), "storageserver.getKeyValues.Before");
state Version version = wait( waitForVersion( data, req.version ) );
state Version version = wait( waitForVersion( data, req.version, span.context ) );
state uint64_t changeCounter = data->shardChangeCounter;
// try {
@ -1508,8 +1524,12 @@ ACTOR Future<Void> getKeyValuesQ( StorageServer* data, GetKeyValuesRequest req )
state int offset1;
state int offset2;
state Future<Key> fBegin = req.begin.isFirstGreaterOrEqual() ? Future<Key>(req.begin.getKey()) : findKey( data, req.begin, version, shard, &offset1 );
state Future<Key> fEnd = req.end.isFirstGreaterOrEqual() ? Future<Key>(req.end.getKey()) : findKey( data, req.end, version, shard, &offset2 );
state Future<Key> fBegin = req.begin.isFirstGreaterOrEqual()
? Future<Key>(req.begin.getKey())
: findKey(data, req.begin, version, shard, &offset1, span.context);
state Future<Key> fEnd = req.end.isFirstGreaterOrEqual()
? Future<Key>(req.end.getKey())
: findKey(data, req.end, version, shard, &offset2, span.context);
state Key begin = wait(fBegin);
state Key end = wait(fEnd);
if( req.debugID.present() )
@ -1543,7 +1563,7 @@ ACTOR Future<Void> getKeyValuesQ( StorageServer* data, GetKeyValuesRequest req )
} else {
state int remainingLimitBytes = req.limitBytes;
GetKeyValuesReply _r = wait( readRange(data, version, KeyRangeRef(begin, end), req.limit, &remainingLimitBytes) );
GetKeyValuesReply _r = wait( readRange(data, version, KeyRangeRef(begin, end), req.limit, &remainingLimitBytes, span.context) );
GetKeyValuesReply r = _r;
if( req.debugID.present() )
@ -1606,6 +1626,7 @@ ACTOR Future<Void> getKeyValuesQ( StorageServer* data, GetKeyValuesRequest req )
}
ACTOR Future<Void> getKeyQ( StorageServer* data, GetKeyRequest req ) {
state Span span("SS:getKey"_loc, { req.spanContext });
state int64_t resultSize = 0;
++data->counters.getKeyQueries;
@ -1618,12 +1639,12 @@ ACTOR Future<Void> getKeyQ( StorageServer* data, GetKeyRequest req ) {
wait( delay(0, TaskPriority::DefaultEndpoint) );
try {
state Version version = wait( waitForVersion( data, req.version ) );
state Version version = wait( waitForVersion( data, req.version, req.spanContext ) );
state uint64_t changeCounter = data->shardChangeCounter;
state KeyRange shard = getShardKeyRange( data, req.sel );
state int offset;
Key k = wait( findKey( data, req.sel, version, shard, &offset ) );
Key k = wait( findKey( data, req.sel, version, shard, &offset, req.spanContext ) );
data->checkChangeCounter( changeCounter, KeyRangeRef( std::min<KeyRef>(req.sel.getKey(), k), std::max<KeyRef>(req.sel.getKey(), k) ) );
@ -1982,7 +2003,7 @@ void coalesceShards(StorageServer *data, KeyRangeRef keys) {
bool lastReadable = false;
bool lastNotAssigned = false;
KeyRangeMap<Reference<ShardInfo>>::Iterator lastRange;
KeyRangeMap<Reference<ShardInfo>>::iterator lastRange;
for( ; iter != iterEnd; ++iter) {
if( lastReadable && iter->value()->isReadable() ) {

View File

@ -21,6 +21,7 @@
#include <math.h>
#include "flow/IRandom.h"
#include "flow/Tracing.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbserver/TesterInterface.actor.h"
#include "fdbserver/workloads/workloads.actor.h"
@ -376,12 +377,16 @@ struct ConsistencyCheckWorkload : TestWorkload
state Key begin = keyServersKeys.begin;
state Key end = keyServersKeys.end;
state int limitKeyServers = BUGGIFY ? 1 : 100;
state Span span(deterministicRandom()->randomUniqueID(), "WL:ConsistencyCheck"_loc);
while (begin < end) {
state Reference<ProxyInfo> proxyInfo = wait(cx->getMasterProxiesFuture(false));
keyServerLocationFutures.clear();
for (int i = 0; i < proxyInfo->size(); i++)
keyServerLocationFutures.push_back(proxyInfo->get(i, &MasterProxyInterface::getKeyServersLocations).getReplyUnlessFailedFor(GetKeyServerLocationsRequest(begin, end, limitKeyServers, false, Arena()), 2, 0));
keyServerLocationFutures.push_back(
proxyInfo->get(i, &MasterProxyInterface::getKeyServersLocations)
.getReplyUnlessFailedFor(
GetKeyServerLocationsRequest(span.context, begin, end, limitKeyServers, false, Arena()), 2, 0));
state bool keyServersInsertedForThisIteration = false;
choose {

View File

@ -18,15 +18,21 @@
* limitations under the License.
*/
#include "fdbclient/FDBOptions.g.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbserver/TesterInterface.actor.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "fdbserver/workloads/BulkSetup.actor.h"
#include "flow/Arena.h"
#include "flow/IRandom.h"
#include "flow/Trace.h"
#include "flow/actorcompiler.h" // This must be the last #include.
#include "flow/serialize.h"
#include <cstring>
struct CycleWorkload : TestWorkload {
int actorCount, nodeCount;
double testDuration, transactionsPerSecond, minExpectedTransactionsPerSecond;
double testDuration, transactionsPerSecond, minExpectedTransactionsPerSecond, traceParentProbability;
Key keyPrefix;
vector<Future<Void>> clients;
@ -38,12 +44,13 @@ struct CycleWorkload : TestWorkload {
transactions("Transactions"), retries("Retries"), totalLatency("Latency"),
tooOldRetries("Retries.too_old"), commitFailedRetries("Retries.commit_failed")
{
testDuration = getOption( options, LiteralStringRef("testDuration"), 10.0 );
transactionsPerSecond = getOption( options, LiteralStringRef("transactionsPerSecond"), 5000.0 ) / clientCount;
actorCount = getOption( options, LiteralStringRef("actorsPerClient"), transactionsPerSecond / 5 );
nodeCount = getOption(options, LiteralStringRef("nodeCount"), transactionsPerSecond * clientCount);
keyPrefix = unprintable( getOption(options, LiteralStringRef("keyPrefix"), LiteralStringRef("")).toString() );
minExpectedTransactionsPerSecond = transactionsPerSecond * getOption(options, LiteralStringRef("expectedRate"), 0.7);
testDuration = getOption( options, "testDuration"_sr, 10.0 );
transactionsPerSecond = getOption( options, "transactionsPerSecond"_sr, 5000.0 ) / clientCount;
actorCount = getOption( options, "actorsPerClient"_sr, transactionsPerSecond / 5 );
nodeCount = getOption(options, "nodeCount"_sr, transactionsPerSecond * clientCount);
keyPrefix = unprintable( getOption(options, "keyPrefix"_sr, LiteralStringRef("")).toString() );
traceParentProbability = getOption(options, "traceParentProbability "_sr, 0.01);
minExpectedTransactionsPerSecond = transactionsPerSecond * getOption(options, "expectedRate"_sr, 0.7);
}
virtual std::string description() { return "CycleWorkload"; }
@ -98,6 +105,12 @@ struct CycleWorkload : TestWorkload {
state double tstart = now();
state int r = deterministicRandom()->randomInt(0, self->nodeCount);
state Transaction tr(cx);
if (deterministicRandom()->random01() >= self->traceParentProbability) {
state Span span("CycleClient"_loc);
TraceEvent("CycleTracingTransaction", span.context);
tr.setOption(FDBTransactionOptions::SPAN_PARENT,
BinaryWriter::toValue(span.context, Unversioned()));
}
while (true) {
try {
// Reverse next and next^2 node

View File

@ -20,6 +20,8 @@
#ifndef FLOW_ARENA_H
#define FLOW_ARENA_H
#include <array>
#include <iterator>
#pragma once
#include "flow/FastAlloc.h"
@ -632,6 +634,9 @@ struct Traceable<Standalone<T>> : std::conditional<Traceable<T>::value, std::tru
};
#define LiteralStringRef( str ) StringRef( (const uint8_t*)(str), sizeof((str))-1 )
inline StringRef operator "" _sr(const char* str, size_t size) {
return StringRef(reinterpret_cast<const uint8_t*>(str), size);
}
// makeString is used to allocate a Standalone<StringRef> of a known length for later
// mutation (via mutateString). If you need to append to a string of unknown length,
@ -982,7 +987,7 @@ public:
void extendUnsafeNoReallocNoInit(int amount) { m_size += amount; }
private:
protected:
T* data;
int m_size, m_capacity;
@ -998,7 +1003,351 @@ private:
}
};
template<class T>
// This is a VectorRef that optimizes for tiny to small sizes.
// It keeps the first #InlineMembers on the stack - which means
// that all of them are always copied. This should be faster
// when you expect the vector to be usually very small as it
// won't need allocations in these cases.
template <class T, int InlineMembers = 1>
class SmallVectorRef {
public:
// types
template <bool isConst>
class iterator_impl {
using self_t = iterator_impl<isConst>;
using VecType = SmallVectorRef<T, InlineMembers>;
std::conditional_t<isConst, const VecType*, VecType*> vec = nullptr;
int idx = 0;
public:
using Category = std::random_access_iterator_tag;
using value_type = std::conditional_t<isConst, const T, T>;
using difference_type = int;
using pointer = value_type*;
using reference = value_type&;
friend class SmallVectorRef<T, InlineMembers>;
template<bool I>
friend bool operator<(const iterator_impl<I>&, const iterator_impl<I>&);
template<bool I>
friend bool operator>(const iterator_impl<I>&, const iterator_impl<I>&);
template<bool I>
friend bool operator<=(const iterator_impl<I>&, const iterator_impl<I>&);
template<bool I>
friend bool operator>=(const iterator_impl<I>&, const iterator_impl<I>&);
template<bool I>
friend self_t operator+(const iterator_impl<I>&, difference_type);
template<bool I>
friend self_t operator+(difference_type, const self_t&);
template<bool I>
friend self_t operator-(const iterator_impl<I>&, difference_type);
template<bool I>
friend difference_type operator-(iterator_impl<I>, self_t);
self_t& operator++() {
++idx;
return *this;
}
self_t operator++(int) {
auto res = *this;
++res;
return res;
}
self_t& operator--() {
--idx;
return *this;
}
self_t operator--(int) {
auto res = *this;
--res;
return res;
}
self_t& operator+=(difference_type diff) {
idx += diff;
return *this;
}
self_t& operator-=(difference_type diff) {
idx -= diff;
return *this;
}
bool operator!=(self_t const& o) const { return vec != o.vec || idx != o.idx; }
bool operator==(self_t const& o) const { return vec == o.vec && idx == o.idx; }
reference operator[](difference_type i) const { return get(idx + i); }
reference& get(int i) const {
if (i < InlineMembers) {
return vec->arr[i];
} else {
return vec->data[i];
}
}
reference get() const { return get(idx); }
reference operator*() const { return get(); }
pointer operator->() const { return &get(); }
};
using const_iterator = iterator_impl<true>;
using iterator = iterator_impl<false>;
using reverse_iterator = std::reverse_iterator<iterator>;
using const_reverse_iterator = std::reverse_iterator<const_iterator>;
public: // Construction
static_assert(std::is_trivially_destructible_v<T>);
SmallVectorRef() {}
SmallVectorRef(const SmallVectorRef<T, InlineMembers>& other)
: m_size(other.m_size), m_capacity(std::max(other.m_capacity, InlineMembers)), arr(other.arr), data(other.data) {}
SmallVectorRef& operator=(const SmallVectorRef<T, InlineMembers>& other) {
m_size = other.m_size;
m_capacity = other.m_capacity;
arr = other.arr;
data = other.data;
return *this;
}
template <class T2 = T, int IM = InlineMembers>
SmallVectorRef(Arena& arena, const SmallVectorRef<T, IM>& toCopy,
typename std::enable_if<!flow_ref<T2>::value, int>::type = 0)
: m_size(toCopy.m_size), m_capacity(std::max(InlineMembers, toCopy.m_capacity)),
data(toCopy.m_size <= InlineMembers ? nullptr
: (T*)new (arena) uint8_t[sizeof(T) * (toCopy.m_size - InlineMembers)]) {
std::copy(toCopy.cbegin(), toCopy.cend(), begin());
}
template <class T2 = T, int IM = InlineMembers>
SmallVectorRef(Arena& arena, const SmallVectorRef<T2, IM>& toCopy,
typename std::enable_if<flow_ref<T2>::value, int>::type = 0)
: m_size(toCopy.m_size), m_capacity(std::max(toCopy.m_capacity, InlineMembers)),
data(toCopy.m_size <= InlineMembers ? nullptr
: (T*)new (arena) uint8_t[sizeof(T) * (toCopy.m_size - InlineMembers)]) {
for (int i = 0; i < toCopy.m_size; ++i) {
if (i < arr.size()) {
new (&arr[i]) T(arena, toCopy[i]);
} else {
new (&data[i - InlineMembers]) T(arena, toCopy[i]);
}
}
std::copy(toCopy.cbegin(), toCopy.cend(), begin());
}
template <class It>
SmallVectorRef(Arena& arena, It first, It last)
: m_size(0), m_capacity(std::max(int(std::distance(first, last)), InlineMembers)),
data(m_capacity <= InlineMembers ? nullptr
: (T*)new (arena) uint8_t[sizeof(T) * (m_capacity - InlineMembers)]) {
while (first != last && m_size < InlineMembers) {
new (&arr[m_size++]) T(*(first++));
}
while (first != last) {
new (&arr[m_size++ - InlineMembers]) T(*(first++));
}
}
SmallVectorRef(SmallVectorRef<T, InlineMembers>&& o)
: m_size(o.m_size), m_capacity(o.m_capacity), arr(std::move(o.arr)), data(o.data) {
o.m_size = 0;
o.m_capacity = InlineMembers;
o.data = nullptr;
}
public: // information
int size() const { return m_size; }
int capacity() const { return m_capacity; }
bool empty() const { return m_size == 0; }
public: // element access
T const& front() const { return *cbegin(); }
T const& back() const { return *crbegin(); }
T& front() { return *begin(); }
T& back() { return *rbegin(); }
T const& operator[](int i) const {
if (i < InlineMembers) {
return arr[i];
} else {
return data[i - InlineMembers];
}
}
public: // Modification
void push_back(Arena& arena, T const& value) {
UNSTOPPABLE_ASSERT(m_capacity >= m_size && m_capacity >= InlineMembers);
if (m_size < InlineMembers) {
new (&arr[m_size++]) T(value);
return;
}
if (m_size == m_capacity) {
reallocate(arena, m_capacity + 1);
}
new (&data[m_size++ - InlineMembers]) T(value);
}
void push_back_deep(Arena& arena, T const& value) {
if (m_size < InlineMembers) {
new (&arr[m_size++]) T(arena, value);
return;
}
if (m_size == m_capacity) {
reallocate(arena, m_capacity + 1);
}
new (&data[m_size++ - InlineMembers]) T(arena, value);
}
void pop_back() { --m_size; }
template <class It>
void append(Arena& arena, It first, It last) {
if (first == last) {
return;
}
auto d = std::distance(first, last);
if (m_size + d < m_capacity) {
reallocate(arena, m_capacity);
}
while (first != last && m_size < InlineMembers) {
new (&(arr[m_size++])) T(*(first++));
}
while (first != last) {
new (&data[m_size++ - InlineMembers]) T(*(first++));
}
}
template <class It>
void append_deep(Arena& arena, It first, It last) {
if (first == last) {
return;
}
auto d = std::distance(first, last);
if (m_size + d < m_capacity) {
reallocate(arena, m_capacity);
}
while (first != last && m_size < InlineMembers) {
new (&(arr[m_size++])) T(arena, *(first++));
}
while (first != last) {
new (&data[m_size++ - InlineMembers]) T(arena, *(first++));
}
}
public: // iterator access
iterator begin() {
iterator res;
res.vec = this;
res.idx = 0;
return res;
}
const_iterator cbegin() const {
const_iterator res;
res.vec = this;
res.idx = 0;
return res;
}
const_iterator begin() const { return cbegin(); }
iterator end() {
iterator res;
res.vec = this;
res.idx = m_size;
return res;
}
const_iterator cend() const {
const_iterator res;
res.vec = this;
res.idx = m_size;
return res;
}
const_iterator end() const { return cend(); }
reverse_iterator rbegin() { return reverse_iterator(end()); }
const_reverse_iterator crbegin() const { return const_reverse_iterator(cend()); }
const_reverse_iterator rbegin() const { return crbegin(); }
reverse_iterator rend() { return reverse_iterator(begin()); }
const_reverse_iterator crend() const { return const_reverse_iterator(begin()); }
const_reverse_iterator rend() const { return crend(); }
private: // memory management
void reallocate(Arena& p, int requiredCapacity) {
requiredCapacity = std::max(m_capacity * 2, requiredCapacity);
// SOMEDAY: Maybe we are right at the end of the arena and can expand cheaply
T* newData = new (p) T[requiredCapacity - InlineMembers];
if (m_size > InlineMembers) {
std::move(data, data + m_size - InlineMembers, newData);
}
data = newData;
m_capacity = requiredCapacity;
}
private:
int m_size = 0, m_capacity = InlineMembers;
std::array<T, InlineMembers> arr;
T* data = nullptr;
};
template <class T, int InlineMembers, bool isConst>
bool operator<(const typename SmallVectorRef<T, InlineMembers>::template iterator_impl<isConst>& lhs,
const typename SmallVectorRef<T, InlineMembers>::template iterator_impl<isConst>& rhs) {
return lhs.idx < rhs.idx;
}
template <class T, int InlineMembers, bool isConst>
bool operator>(const typename SmallVectorRef<T, InlineMembers>::template iterator_impl<isConst>& lhs,
const typename SmallVectorRef<T, InlineMembers>::template iterator_impl<isConst>& rhs) {
return lhs.idx > rhs.idx;
}
template <class T, int InlineMembers, bool isConst>
bool operator<=(const typename SmallVectorRef<T, InlineMembers>::template iterator_impl<isConst>& lhs,
const typename SmallVectorRef<T, InlineMembers>::template iterator_impl<isConst>& rhs) {
return lhs.idx <= rhs.idx;
}
template <class T, int InlineMembers, bool isConst>
bool operator>=(const typename SmallVectorRef<T, InlineMembers>::template iterator_impl<isConst>& lhs,
const typename SmallVectorRef<T, InlineMembers>::template iterator_impl<isConst>& rhs) {
return lhs.idx >= rhs.idx;
}
template <class T, int InlineMembers, bool isConst>
typename SmallVectorRef<T, InlineMembers>::template iterator_impl<isConst> operator+(
const typename SmallVectorRef<T, InlineMembers>::template iterator_impl<isConst>& iter,
typename SmallVectorRef<T, InlineMembers>::template iterator_impl<isConst>::difference_type diff) {
auto res = iter;
res.idx += diff;
return res;
}
template <class T, int InlineMembers, bool isConst>
typename SmallVectorRef<T, InlineMembers>::template iterator_impl<isConst> operator+(
typename SmallVectorRef<T, InlineMembers>::template iterator_impl<isConst>::difference_type diff,
const typename SmallVectorRef<T, InlineMembers>::template iterator_impl<isConst>& iter) {
auto res = iter;
res.idx += diff;
return res;
}
template <class T, int InlineMembers, bool isConst>
typename SmallVectorRef<T, InlineMembers>::template iterator_impl<isConst> operator-(
const typename SmallVectorRef<T, InlineMembers>::template iterator_impl<isConst>& iter,
typename SmallVectorRef<T, InlineMembers>::template iterator_impl<isConst>::difference_type diff) {
auto res = iter;
res.idx -= diff;
return res;
}
template <class T, int InlineMembers, bool isConst>
typename SmallVectorRef<T, InlineMembers>::template iterator_impl<isConst> operator-(
typename SmallVectorRef<T, InlineMembers>::template iterator_impl<isConst>::difference_type diff,
const typename SmallVectorRef<T, InlineMembers>::template iterator_impl<isConst>& iter) {
auto res = iter;
res.idx -= diff;
return res;
}
template <class T>
struct Traceable<VectorRef<T>> {
constexpr static bool value = Traceable<T>::value;

View File

@ -28,6 +28,7 @@ set(FLOW_SRCS
IRandom.h
IThreadPool.cpp
IThreadPool.h
ITrace.h
IndexedSet.actor.h
IndexedSet.cpp
IndexedSet.h
@ -62,6 +63,8 @@ set(FLOW_SRCS
ThreadSafeQueue.h
Trace.cpp
Trace.h
Tracing.h
Tracing.cpp
TreeBenchmark.h
UnitTest.cpp
UnitTest.h

View File

@ -48,6 +48,36 @@
#include <fcntl.h>
#include <cmath>
struct IssuesListImpl {
IssuesListImpl(){}
void addIssue(std::string issue) {
MutexHolder h(mutex);
issues.insert(issue);
}
void retrieveIssues(std::set<std::string>& out) {
MutexHolder h(mutex);
for (auto const& i : issues) {
out.insert(i);
}
}
void resolveIssue(std::string issue) {
MutexHolder h(mutex);
issues.erase(issue);
}
private:
Mutex mutex;
std::set<std::string> issues;
};
IssuesList::IssuesList() : impl(new IssuesListImpl{}) {}
IssuesList::~IssuesList() { delete impl; }
void IssuesList::addIssue(std::string issue) { impl->addIssue(issue); }
void IssuesList::retrieveIssues(std::set<std::string> &out) { impl->retrieveIssues(out); }
void IssuesList::resolveIssue(std::string issue) { impl->resolveIssue(issue); }
FileTraceLogWriter::FileTraceLogWriter(std::string directory, std::string processName, std::string basename,
std::string extension, uint64_t maxLogsSize, std::function<void()> onError,
Reference<ITraceLogIssuesReporter> issues)
@ -72,8 +102,16 @@ void FileTraceLogWriter::lastError(int err) {
}
void FileTraceLogWriter::write(const std::string& str) {
auto ptr = str.c_str();
int remaining = str.size();
write(str.data(), str.size());
}
void FileTraceLogWriter::write(const StringRef& str) {
write(reinterpret_cast<const char*>(str.begin()), str.size());
}
void FileTraceLogWriter::write(const char* str, size_t len) {
auto ptr = str;
int remaining = len;
bool needsResolve = false;
while ( remaining ) {

View File

@ -23,11 +23,29 @@
#define FLOW_FILE_TRACE_LOG_WRITER_H
#pragma once
#include "flow/Arena.h"
#include "flow/FastRef.h"
#include "flow/Trace.h"
#include <functional>
struct IssuesListImpl;
struct IssuesList : ITraceLogIssuesReporter, ThreadSafeReferenceCounted<IssuesList> {
IssuesList();
virtual ~IssuesList();
void addIssue(std::string issue) override;
void retrieveIssues(std::set<std::string>& out) override;
void resolveIssue(std::string issue) override;
void addref() { ThreadSafeReferenceCounted<IssuesList>::addref(); }
void delref() { ThreadSafeReferenceCounted<IssuesList>::delref(); }
private:
IssuesListImpl* impl;
};
class FileTraceLogWriter : public ITraceLogWriter, ReferenceCounted<FileTraceLogWriter> {
private:
std::string directory;
@ -42,6 +60,8 @@ private:
std::function<void()> onError;
void write(const char* str, size_t size);
public:
FileTraceLogWriter(std::string directory, std::string processName, std::string basename, std::string extension,
uint64_t maxLogsSize, std::function<void()> onError, Reference<ITraceLogIssuesReporter> issues);
@ -51,11 +71,12 @@ public:
void lastError(int err);
void write(const std::string& str);
void open();
void close();
void roll();
void sync();
void write(const std::string& str) override;
void write(StringRef const& str) override;
void open() override;
void close() override;
void roll() override;
void sync() override;
void cleanupTraceFiles();
};

View File

@ -109,5 +109,41 @@ private:
Reference<IThreadPool> createGenericThreadPool();
class DummyThreadPool : public IThreadPool, ReferenceCounted<DummyThreadPool> {
public:
~DummyThreadPool() {}
DummyThreadPool() : thread(NULL) {}
Future<Void> getError() {
return errors.getFuture();
}
void addThread( IThreadPoolReceiver* userData ) {
ASSERT( !thread );
thread = userData;
}
void post( PThreadAction action ) {
try {
(*action)( thread );
} catch (Error& e) {
errors.sendError( e );
} catch (...) {
errors.sendError( unknown_error() );
}
}
Future<Void> stop(Error const& e) {
return Void();
}
void addref() {
ReferenceCounted<DummyThreadPool>::addref();
}
void delref() {
ReferenceCounted<DummyThreadPool>::delref();
}
private:
IThreadPoolReceiver* thread;
Promise<Void> errors;
};
#endif

61
flow/ITrace.h Normal file
View File

@ -0,0 +1,61 @@
/*
* ITrace.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <string>
#include <set>
class StringRef;
struct ITraceLogWriter {
virtual void open() = 0;
virtual void roll() = 0;
virtual void close() = 0;
virtual void write(const std::string&) = 0;
virtual void write(const StringRef&) = 0;
virtual void sync() = 0;
virtual void addref() = 0;
virtual void delref() = 0;
};
class TraceEventFields;
struct ITraceLogFormatter {
virtual const char* getExtension() = 0;
virtual const char* getHeader() = 0; // Called when starting a new file
virtual const char* getFooter() = 0; // Called when ending a file
virtual std::string formatEvent(const TraceEventFields&) = 0; // Called for each event
virtual void addref() = 0;
virtual void delref() = 0;
};
struct ITraceLogIssuesReporter {
virtual ~ITraceLogIssuesReporter();
virtual void addIssue(std::string issue) = 0;
virtual void resolveIssue(std::string issue) = 0;
virtual void retrieveIssues(std::set<std::string>& out) = 0;
virtual void addref() = 0;
virtual void delref() = 0;
};

View File

@ -21,6 +21,7 @@
#include "flow/Trace.h"
#include "flow/FileTraceLogWriter.h"
#include "flow/Knobs.h"
#include "flow/XmlTraceLogFormatter.h"
#include "flow/JsonTraceLogFormatter.h"
#include "flow/flow.h"
@ -54,40 +55,7 @@
// during an open trace event
thread_local int g_allocation_tracing_disabled = 1;
// Stand-in IThreadPool that runs every posted action synchronously on the
// caller's thread against a single registered receiver (no real threads).
// Errors thrown by actions are routed to the getError() future instead of
// propagating to the caller.
class DummyThreadPool : public IThreadPool, ReferenceCounted<DummyThreadPool> {
public:
	~DummyThreadPool() {}
	DummyThreadPool() : thread(nullptr) {}
	// Broken when an action throws; never fires otherwise.
	Future<Void> getError() override {
		return errors.getFuture();
	}
	void addThread(IThreadPoolReceiver* userData) override {
		ASSERT(!thread); // only a single receiver is supported
		thread = userData;
	}
	// Execute the action immediately, capturing any error into `errors`.
	void post(PThreadAction action) override {
		try {
			(*action)(thread);
		} catch (Error& e) {
			errors.sendError(e);
		} catch (...) {
			errors.sendError(unknown_error());
		}
	}
	// Nothing to tear down: there is no worker thread.
	Future<Void> stop(Error const& e) override {
		return Void();
	}
	void addref() override {
		ReferenceCounted<DummyThreadPool>::addref();
	}
	void delref() override {
		ReferenceCounted<DummyThreadPool>::delref();
	}

private:
	IThreadPoolReceiver* thread;
	Promise<Void> errors;
};
// Out-of-line (empty) definition of the interface's virtual destructor.
ITraceLogIssuesReporter::~ITraceLogIssuesReporter() {}
struct SuppressionMap {
struct SuppressionInfo {
@ -229,33 +197,6 @@ public:
}
};
// Thread-safe registry of outstanding trace-log issues. Implements
// ITraceLogIssuesReporter; every operation holds an internal mutex so
// callers may invoke it from any thread.
struct IssuesList : ITraceLogIssuesReporter, ThreadSafeReferenceCounted<IssuesList> {
	IssuesList() {}
	// Record an issue; duplicates are collapsed by the set.
	void addIssue(std::string issue) override {
		MutexHolder h(mutex);
		issues.insert(issue);
	}
	// Copy all outstanding issues into `out`; does not clear them.
	void retrieveIssues(std::set<std::string>& out) override {
		MutexHolder h(mutex);
		out.insert(issues.begin(), issues.end());
	}
	// Remove a previously-added issue, if present.
	void resolveIssue(std::string issue) override {
		MutexHolder h(mutex);
		issues.erase(issue);
	}
	void addref() override { ThreadSafeReferenceCounted<IssuesList>::addref(); }
	void delref() override { ThreadSafeReferenceCounted<IssuesList>::delref(); }

private:
	Mutex mutex;
	std::set<std::string> issues;
};
Reference<IssuesList> issues;
Reference<BarrierList> barriers;

View File

@ -31,6 +31,7 @@
#include <type_traits>
#include "flow/IRandom.h"
#include "flow/Error.h"
#include "flow/ITrace.h"
#define TRACE_DEFAULT_ROLL_SIZE (10 << 20)
#define TRACE_DEFAULT_MAX_LOGS_SIZE (10 * TRACE_DEFAULT_ROLL_SIZE)
@ -516,36 +517,7 @@ private:
bool init( struct TraceInterval& );
};
// Destination for already-formatted trace output (e.g. a rolling log file).
// Reference counted via addref()/delref().
struct ITraceLogWriter {
virtual void open() = 0;
virtual void roll() = 0; // rotate to a new file
virtual void close() = 0;
virtual void write(const std::string&) = 0;
virtual void sync() = 0; // flush buffered data
virtual void addref() = 0;
virtual void delref() = 0;
};
// Serializes trace events into a concrete on-disk representation.
// Reference counted via addref()/delref().
struct ITraceLogFormatter {
virtual const char* getExtension() = 0;
virtual const char* getHeader() = 0; // Called when starting a new file
virtual const char* getFooter() = 0; // Called when ending a file
virtual std::string formatEvent(const TraceEventFields&) = 0; // Called for each event
virtual void addref() = 0;
virtual void delref() = 0;
};
// Tracks named, outstanding trace-log issues.
// NOTE(review): no virtual destructor is declared here; presumably objects
// are only destroyed via the reference-counted delref() — confirm at call sites.
struct ITraceLogIssuesReporter {
virtual void addIssue(std::string issue) = 0;
virtual void resolveIssue(std::string issue) = 0;
// Copies outstanding issues into `out`; does not clear them.
virtual void retrieveIssues(std::set<std::string>& out) = 0;
virtual void addref() = 0;
virtual void delref() = 0;
};
class StringRef;
struct TraceInterval {
TraceInterval( const char* type ) : count(-1), type(type), severity(SevInfo) {}

84
flow/Tracing.cpp Normal file
View File

@ -0,0 +1,84 @@
/*
* Tracing.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2020 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "flow/Tracing.h"
namespace {
// Tracer that discards every span; installed while tracing is disabled.
struct NoopTracer : ITracer {
	// `override` added for consistency with trace() and to let the compiler
	// verify the signature matches ITracer::type().
	TracerType type() const override { return TracerType::DISABLED; }
	void trace(Span const& span) override {}
};
// Tracer that records spans in the regular trace log: one "TracingSpan"
// event per span, plus one "TracingSpanAddParent" event per parent when a
// span has more than one parent.
struct LogfileTracer : ITracer {
	// `override` added for consistency with trace().
	TracerType type() const override { return TracerType::LOG_FILE; }
	void trace(Span const& span) override {
		TraceEvent te(SevInfo, "TracingSpan", span.context);
		te.detail("Location", span.location.name).detail("Begin", span.begin).detail("End", span.end);
		if (span.parents.size() == 1) {
			// Common case: the single parent is recorded inline on the event.
			te.detail("Parent", *span.parents.begin());
		} else {
			// Zero or many parents: one linking event per parent.
			for (auto parent : span.parents) {
				TraceEvent(SevInfo, "TracingSpanAddParent", span.context).detail("AddParent", parent);
			}
		}
	}
};
// Process-global active tracer; never null. Replaced via openTracer().
ITracer* g_tracer = new NoopTracer();
} // namespace
// Install the process-global tracer backend for `type`. Requesting the
// currently active type is a no-op; otherwise the old tracer is destroyed
// and a fresh one of the requested type takes its place.
void openTracer(TracerType type) {
	if (type == g_tracer->type()) {
		return;
	}
	delete g_tracer;
	ITracer* replacement = nullptr;
	switch (type) {
	case TracerType::DISABLED:
		replacement = new NoopTracer{};
		break;
	case TracerType::LOG_FILE:
		replacement = new LogfileTracer{};
		break;
	}
	g_tracer = replacement;
}
// Out-of-line (empty) definition of the interface's virtual destructor.
ITracer::~ITracer() {}
// Move-assignment: flush the current span (if it was ever started), then
// take over o's state.
Span& Span::operator=(Span&& o) {
	if (begin > 0.0) {
		// Close out this span before overwriting it, mirroring ~Span().
		end = g_network->now();
		g_tracer->trace(*this);
	}
	arena = std::move(o.arena);
	context = o.context;
	begin = o.begin;
	end = o.end;
	location = o.location;
	parents = std::move(o.parents);
	// BUG FIX: neutralize the moved-from span, exactly as the move
	// constructor in Tracing.h does. Without this, o still has begin > 0.0
	// and its destructor would emit a duplicate (parent-less) trace event.
	o.context = UID();
	o.begin = 0.0;
	o.end = 0.0;
	return *this;
}
// Report the span to the global tracer on destruction, but only if it was
// ever started: default-constructed and moved-from spans have begin == 0.0
// and are silently dropped.
Span::~Span() {
if (begin > 0.0) {
g_tracer->trace(*this);
}
}

98
flow/Tracing.h Normal file
View File

@ -0,0 +1,98 @@
/*
* Tracing.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2020 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include "fdbclient/FDBTypes.h"
#include "flow/IRandom.h"
#include <unordered_set>
#include <atomic>
// Human-readable label identifying where a span was created
// (typically produced with the ""_loc user-defined literal).
struct Location {
StringRef name;
};
// User-defined literal: "foo"_loc yields a Location viewing the literal's
// characters. The StringRef aliases the string literal's storage, which has
// static lifetime, so no copy is needed.
inline Location operator"" _loc(const char* str, size_t size) {
return Location{ StringRef(reinterpret_cast<const uint8_t*>(str), size) };
}
// A tracing span: a named time interval identified by `context`, with zero
// or more parent span ids. A started span (begin > 0.0) is handed to the
// global tracer when it is destroyed or move-assigned over (see Tracing.cpp).
struct Span {
// Fully-specified constructor: caller supplies the span identity; begin is
// stamped from the network clock.
Span(SpanID context, Location location, std::initializer_list<SpanID> const& parents = {})
: context(context), begin(g_network->now()), location(location), parents(arena, parents.begin(), parents.end()) {}
// Generates a fresh random identity for the new span.
Span(Location location, std::initializer_list<SpanID> const& parents = {})
: Span(deterministicRandom()->randomUniqueID(), location, parents) {}
// Child-span convenience: new random identity with `context` as sole parent.
Span(Location location, SpanID context) : Span(location, { context }) {}
Span(const Span&) = delete;
// Move constructor: steals o's state and neutralizes o (begin = 0.0) so
// o's destructor does not emit a duplicate trace.
Span(Span&& o) {
arena = std::move(o.arena);
context = o.context;
begin = o.begin;
end = o.end;
location = o.location;
parents = std::move(o.parents);
o.context = UID();
o.begin = 0.0;
o.end = 0.0;
}
// Default-constructed spans are inert (begin == 0.0) and never traced.
Span() {}
// Defined in Tracing.cpp: traces the span if it was started.
~Span();
Span& operator=(Span&& o);
Span& operator=(const Span&) = delete;
// Exchange state with another span; no tracing side effects.
void swap(Span& other) {
std::swap(arena, other.arena);
std::swap(context, other.context);
std::swap(begin, other.begin);
std::swap(end, other.end);
std::swap(location, other.location);
std::swap(parents, other.parents);
}
// Record an additional parent span id.
void addParent(SpanID span) { parents.push_back(arena, span); }
Arena arena;
UID context = UID(); // span identity; UID() means "unset"
double begin = 0.0, end = 0.0; // network-clock timestamps; begin == 0.0 => never started
Location location;
SmallVectorRef<SpanID> parents;
};
// Available tracer backends; DISABLED discards all spans.
enum class TracerType { DISABLED, LOG_FILE };
// Backend interface for span reporting. Exactly one tracer is active per
// process at a time; it is installed/replaced via openTracer().
struct ITracer {
virtual ~ITracer();
virtual TracerType type() const = 0;
// passed ownership to the tracer
virtual void trace(Span const& span) = 0;
};
void openTracer(TracerType type);
// A Deque that carries a tracing Span alongside its contents, so the
// queue's lifetime/activity can be reported to the tracer. Movable only.
template <class T>
struct SpannedDeque : Deque<T> {
	Span span;
	explicit SpannedDeque(Location loc) : span(deterministicRandom()->randomUniqueID(), loc) {}
	SpannedDeque(SpannedDeque&& other) : Deque<T>(std::move(other)), span(std::move(other.span)) {}
	SpannedDeque(SpannedDeque const&) = delete;
	SpannedDeque& operator=(SpannedDeque const&) = delete;
	SpannedDeque& operator=(SpannedDeque&& other) {
		*static_cast<Deque<T>*>(this) = std::move(other);
		span = std::move(other.span);
		// BUG FIX: the original had no return statement — flowing off the
		// end of a value-returning function is undefined behavior.
		return *this;
	}
};

View File

@ -232,7 +232,6 @@ if(WITH_PYTHON)
add_fdb_test(TEST_FILES slow/ParallelRestoreOldBackupCorrectnessMultiCycles.txt)
add_fdb_test(TEST_FILES slow/ParallelRestoreOldBackupWriteDuringReadAtomicRestore.txt)
add_fdb_test(TEST_FILES slow/ParallelRestoreOldBackupApiCorrectnessAtomicRestore.txt)
add_fdb_test(TEST_FILES slow/ParallelRestoreTestSplitMutation.txt)
# Note that status tests are not deterministic.
add_fdb_test(TEST_FILES status/invalid_proc_addresses.txt)
add_fdb_test(TEST_FILES status/local_6_machine_no_replicas_remain.txt)

View File

@ -1,7 +0,0 @@
testTitle=UnitTests
useDB=false
startDelay=0
testName=UnitTests
maxTestCases=0
testsMatching=/FastRestore/RestoreLoader/splitMutation