merge master

This commit is contained in commit 9640ba42d3.
@@ -3,7 +3,7 @@ set(SRCS
   Properties/AssemblyInfo.cs)

 set(TEST_HARNESS_REFERENCES
-  "-r:System,System.Core,System.Xml.Linq,System.Data.DataSetExtensions,Microsoft.CSharp,System.Data,System.Xml,${TraceLogHelperDll}")
+  "-r:System,System.Core,System.Xml.Linq,System.Data.DataSetExtensions,Microsoft.CSharp,System.Data,System.Xml,System.Runtime.Serialization,${TraceLogHelperDll}")

 set(out_file ${CMAKE_BINARY_DIR}/packages/bin/TestHarness.exe)
@@ -29,6 +29,7 @@ using System.Diagnostics;
 using System.ComponentModel;
 using System.Runtime.InteropServices;
 using System.Xml;
+using System.Runtime.Serialization.Json;

 namespace SummarizeTest
 {
@@ -362,20 +363,22 @@ namespace SummarizeTest
         {
             ErrorOutputListener errorListener = new ErrorOutputListener();
             process.StartInfo.UseShellExecute = false;
-            process.StartInfo.EnvironmentVariables["FDB_TLS_PLUGIN"] = tlsPluginFile;
+            string tlsPluginArg = "";
+            if (tlsPluginFile.Length > 0) {
+                process.StartInfo.EnvironmentVariables["FDB_TLS_PLUGIN"] = tlsPluginFile;
+                tlsPluginArg = "--tls_plugin=" + tlsPluginFile;
+            }
             process.StartInfo.RedirectStandardOutput = true;
             var args = "";
             if (willRestart && oldBinaryName.EndsWith("alpha6"))
             {
-                args = string.Format("-Rs 1000000000 -r simulation {0} -s {1} -f \"{2}\" -b {3} --tls_plugin={4} --crash",
-                    IsRunningOnMono() ? "" : "-q", seed, testFile, buggify ? "on" : "off", tlsPluginFile);
+                args = string.Format("-Rs 1000000000 -r simulation {0} -s {1} -f \"{2}\" -b {3} {4} --crash",
+                    IsRunningOnMono() ? "" : "-q", seed, testFile, buggify ? "on" : "off", tlsPluginArg);
             }
             else
             {
-                args = string.Format("-Rs 1GB -r simulation {0} -s {1} -f \"{2}\" -b {3} --tls_plugin={4} --crash",
-                    IsRunningOnMono() ? "" : "-q", seed, testFile, buggify ? "on" : "off", tlsPluginFile);
+                args = string.Format("-Rs 1GB -r simulation {0} -s {1} -f \"{2}\" -b {3} {4} --crash",
+                    IsRunningOnMono() ? "" : "-q", seed, testFile, buggify ? "on" : "off", tlsPluginArg);
             }
             if (restarting) args = args + " --restarting";
             if (useValgrind && !willRestart)
@@ -480,7 +483,7 @@ namespace SummarizeTest
             memCheckThread.Join();
             consoleThread.Join();

-            var traceFiles = Directory.GetFiles(tempPath, "trace*.xml");
+            var traceFiles = Directory.GetFiles(tempPath, "trace*.*").Where(s => s.EndsWith(".xml") || s.EndsWith(".json")).ToArray();
             if (traceFiles.Length == 0)
             {
                 if (!traceToStdout)
@@ -661,6 +664,10 @@ namespace SummarizeTest
             return whats.ToArray();
         }

+        delegate IEnumerable<Magnesium.Event> parseDelegate(System.IO.Stream stream, string file,
+            bool keepOriginalElement = false, double startTime = -1, double endTime = Double.MaxValue,
+            double samplingFactor = 1.0);
+
         static int Summarize(string[] traceFiles, string summaryFileName,
             string errorFileName, bool? killed, List<string> outputErrors, int? exitCode, long? peakMemory,
             string uid, string valgrindOutputFileName, int expectedUnseed, out int unseed, out bool retryableError, bool logOnRetryableError,
@@ -692,7 +699,12 @@ namespace SummarizeTest
             {
                 try
                 {
-                    foreach (var ev in Magnesium.XmlParser.Parse(traceFile, traceFileName))
+                    parseDelegate parse;
+                    if (traceFileName.EndsWith(".json"))
+                        parse = Magnesium.JsonParser.Parse;
+                    else
+                        parse = Magnesium.XmlParser.Parse;
+                    foreach (var ev in parse(traceFile, traceFileName))
                     {
                         Magnesium.Severity newSeverity;
                         if (severityMap.TryGetValue(new KeyValuePair<string, Magnesium.Severity>(ev.Type, ev.Severity), out newSeverity))
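(Aside: the delegate lets the summarizer pick the XML or JSON parser once and reuse a single event loop. Below is a minimal C++ sketch of the same dispatch pattern; parseXml/parseJson are hypothetical stubs standing in for the C# Magnesium parsers, not code from this commit.)

    #include <string>
    #include <vector>

    struct Event { std::string type; };  // stand-in for Magnesium.Event
    using ParseFn = std::vector<Event> (*)(const std::string& path);

    static std::vector<Event> parseXml(const std::string&)  { return {}; } // stub
    static std::vector<Event> parseJson(const std::string&) { return {}; } // stub

    static bool endsWith(const std::string& s, const std::string& suffix) {
        return s.size() >= suffix.size() &&
               s.compare(s.size() - suffix.size(), suffix.size(), suffix) == 0;
    }

    std::vector<Event> parseTrace(const std::string& path) {
        // Choose the parser once, by extension, exactly as the C# delegate does.
        ParseFn parse = endsWith(path, ".json") ? parseJson : parseXml;
        return parse(path);
    }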
@@ -1092,10 +1104,20 @@ namespace SummarizeTest

         private static void AppendToSummary(string summaryFileName, XElement xout, bool traceToStdout = false, bool shouldLock = true)
         {
+            bool useXml = true;
+            if (summaryFileName != null && summaryFileName.EndsWith(".json")) {
+                useXml = false;
+            }
+
             if (traceToStdout)
             {
-                using (var wr = System.Xml.XmlWriter.Create(Console.OpenStandardOutput(), new System.Xml.XmlWriterSettings() { OmitXmlDeclaration = true, Encoding = new System.Text.UTF8Encoding(false) }))
-                    xout.WriteTo(wr);
+                if (useXml) {
+                    using (var wr = System.Xml.XmlWriter.Create(Console.OpenStandardOutput(), new System.Xml.XmlWriterSettings() { OmitXmlDeclaration = true, Encoding = new System.Text.UTF8Encoding(false) }))
+                        xout.WriteTo(wr);
+                } else {
+                    using (var wr = System.Runtime.Serialization.Json.JsonReaderWriterFactory.CreateJsonWriter(Console.OpenStandardOutput()))
+                        xout.WriteTo(wr);
+                }
                 Console.WriteLine();
                 return;
             }
@@ -1106,7 +1128,6 @@ namespace SummarizeTest
             takeLock(summaryFileName);
             try
             {
-
                 using (var f = System.IO.File.Open(summaryFileName, System.IO.FileMode.Append, System.IO.FileAccess.Write))
                 {
                     if (f.Length == 0)
@@ -1114,8 +1135,13 @@ namespace SummarizeTest
                     byte[] bytes = Encoding.UTF8.GetBytes("<Trace>");
                     f.Write(bytes, 0, bytes.Length);
                 }
-                using (var wr = System.Xml.XmlWriter.Create(f, new System.Xml.XmlWriterSettings() { OmitXmlDeclaration = true }))
-                    xout.Save(wr);
+                if (useXml) {
+                    using (var wr = System.Xml.XmlWriter.Create(f, new System.Xml.XmlWriterSettings() { OmitXmlDeclaration = true }))
+                        xout.Save(wr);
+                } else {
+                    using (var wr = System.Runtime.Serialization.Json.JsonReaderWriterFactory.CreateJsonWriter(f))
+                        xout.WriteTo(wr);
+                }
                 var endl = Encoding.UTF8.GetBytes(Environment.NewLine);
                 f.Write(endl, 0, endl.Length);
             }
@@ -1126,6 +1152,7 @@ namespace SummarizeTest
                 releaseLock(summaryFileName);
             }
         }

         private static void AppendXmlMessageToSummary(string summaryFileName, XElement xout, bool traceToStdout = false, string testFile = null,
             int? seed = null, bool? buggify = null, bool? determinismCheck = null, string oldBinaryName = null)
         {
@@ -51,7 +51,7 @@ namespace Magnesium
             }
             catch (Exception e)
             {
-                throw new Exception(string.Format("Failed to parse {0}", root), e);
+                throw new Exception(string.Format("Failed to parse JSON {0}", root), e);
             }
             if (ev != null) yield return ev;
         }
@@ -80,8 +80,9 @@ namespace Magnesium
                     TraceFile = file,
                     DDetails = xEvent.Elements()
                         .Where(a=>a.Name != "Type" && a.Name != "Time" && a.Name != "Machine" && a.Name != "ID" && a.Name != "Severity" && (!rolledEvent || a.Name != "OriginalTime"))
-                        .ToDictionary(a=>string.Intern(a.Name.LocalName), a=>(object)a.Value),
-                    original = keepOriginalElement ? xEvent : null,
+                        // When the key contains a colon character, it gets parsed as a:item
+                        .ToDictionary(a=>a.Name.LocalName == "item" ? a.Attribute("item").Value : string.Intern(a.Name.LocalName), a=>(object)a.Value),
+                    original = keepOriginalElement ? xEvent : null
                 };
             }
@@ -53,7 +53,7 @@ namespace Magnesium
             }
             catch (Exception e)
             {
-                throw new Exception(string.Format("Failed to parse {0}", xev), e);
+                throw new Exception(string.Format("Failed to parse XML {0}", xev), e);
             }
             if (ev != null) yield return ev;
         }
@@ -228,6 +228,8 @@ If you interrupt the exclude command with Ctrl-C after seeing the "waiting for s

 7) If you ever want to add a removed machine back to the cluster, you will have to take it off the excluded servers list to which it was added in step 3. This can be done using the ``include`` command of ``fdbcli``. If attempting to re-include a failed server, this can be done using the ``include failed`` command of ``fdbcli``. Typing ``exclude`` with no parameters will tell you the current list of excluded and failed machines.

+As of api version 700, excluding servers can be done with the :ref:`special key space management module <special-key-space-management-module>` as well.
+
 Moving a cluster
 ================

@@ -146,6 +146,16 @@ FoundationDB may return the following error codes from API functions. If you nee
 | special_keys_no_module_found                  | 2113| Special key space range read does not intersect a module.                       |
 |                                               |     | Refer to the ``SPECIAL_KEY_SPACE_RELAXED`` transaction option for more details. |
 +-----------------------------------------------+-----+--------------------------------------------------------------------------------+
+| special_keys_write_disabled                   | 2114| Special key space is not allowed to write by default. Refer                     |
+|                                               |     | to the ``SPECIAL_KEY_SPACE_ENABLE_WRITES`` transaction option for more details. |
++-----------------------------------------------+-----+--------------------------------------------------------------------------------+
+| special_keys_no_write_module_found            | 2115| Special key space key or keyrange in set or clear does not intersect a module.  |
++-----------------------------------------------+-----+--------------------------------------------------------------------------------+
+| special_keys_cross_module_write               | 2116| Special key space clear crosses modules                                         |
++-----------------------------------------------+-----+--------------------------------------------------------------------------------+
+| special_keys_api_failure                      | 2117| Api call through special keys failed. For more information, read the           |
+|                                               |     | ``\xff\xff/error_message`` key                                                  |
++-----------------------------------------------+-----+--------------------------------------------------------------------------------+
 | api_version_unset                             | 2200| API version is not set                                                          |
 +-----------------------------------------------+-----+--------------------------------------------------------------------------------+
 | api_version_already_set                       | 2201| API version may be set only once                                                |
@@ -763,8 +763,8 @@ Special keys
 Keys starting with the bytes ``\xff\xff`` are called "special" keys, and they are materialized when read. :doc:`\\xff\\xff/status/json <mr-status>` is an example of a special key.
 As of api version 630, additional features have been exposed as special keys and are available to read as ranges instead of just individual keys. Additionally, the special keys are now organized into "modules".

-Modules
--------
+Read-only modules
+-----------------

 A module is loosely defined as a key range in the special key space where a user can expect similar behavior from reading any key in that range.
 By default, users will see a ``special_keys_no_module_found`` error if they read from a range not contained in a module.
@@ -912,6 +912,59 @@ Caveats

 #. ``\xff\xff/metrics/health/`` These keys may return data that's several seconds old, and the data may not be available for a brief period during recovery. This will be indicated by the keys being absent.

+
+Read/write modules
+------------------
+
+As of api version 700, some modules in the special key space allow writes as
+well as reads. In these modules, a user can expect that mutations (i.e. sets,
+clears, etc) do not have side-effects outside of the current transaction
+until commit is called (the same is true for writes to the normal key space).
+A user can also expect the effects on commit to be atomic. Reads to
+special keys may require reading system keys (whose format is an implementation
+detail), and for those reads appropriate read conflict ranges are added on
+the underlying system keys.
+
+Writes to read/write modules in the special key space are disabled by
+default. Use the ``special_key_space_enable_writes`` transaction option to
+enable them [#special_key_space_enable_writes]_.
+
+
+.. _special-key-space-management-module:
+
+Management module
+~~~~~~~~~~~~~~~~~
+
+The management module is for temporary cluster configuration changes. For
+example, in order to safely remove a process from the cluster, one can add an
+exclusion to the ``\xff\xff/management/exclude/`` key prefix that matches
+that process, and wait for necessary data to be moved away.
+
+#. ``\xff\xff/management/exclude/<exclusion>`` Read/write. Indicates that the cluster should move data away from processes matching ``<exclusion>``, so that they can be safely removed. See :ref:`removing machines from a cluster <removing-machines-from-a-cluster>` for documentation for the corresponding fdbcli command.
+#. ``\xff\xff/management/failed/<exclusion>`` Read/write. Indicates that the cluster should consider matching processes as permanently failed. This allows the cluster to avoid maintaining extra state and doing extra work in the hope that these processes come back. See :ref:`removing machines from a cluster <removing-machines-from-a-cluster>` for documentation for the corresponding fdbcli command.
+#. ``\xff\xff/management/inProgressExclusion/<address>`` Read-only. Indicates that the process matching ``<address>`` matches an exclusion, but still has necessary data and can't yet be safely removed.
+#. ``\xff\xff/management/options/exclude/force`` Read/write. Setting this key disables safety checks for writes to ``\xff\xff/management/exclude/<exclusion>``. Setting this key only has an effect in the current transaction and is not persisted on commit.
+#. ``\xff\xff/management/options/failed/force`` Read/write. Setting this key disables safety checks for writes to ``\xff\xff/management/failed/<exclusion>``. Setting this key only has an effect in the current transaction and is not persisted on commit.
+
+An exclusion is syntactically either an ip address (e.g. ``127.0.0.1``), or
+an ip address and port (e.g. ``127.0.0.1:4500``). If no port is specified,
+then all processes on that host match the exclusion.
+
+Error message module
+~~~~~~~~~~~~~~~~~~~~
+
+Each module written to validates the transaction before committing, and this
+validation failing is indicated by a ``special_keys_api_failure`` error.
+More detailed information about why this validation failed can be accessed through the ``\xff\xff/error_message`` key, whose value is a json document with the following schema.
+
+========================== ======== ===============
+**Field**                  **Type** **Description**
+-------------------------- -------- ---------------
+retriable                  boolean  Whether or not this operation might succeed if retried
+command                    string   The fdbcli command corresponding to this operation
+message                    string   Help text explaining the reason this operation failed
+========================== ======== ===============
+
 Performance considerations
 ==========================

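(To make the management module concrete, here is a minimal sketch using the FoundationDB C API. It assumes API version 700, an initialized network, and an open database handle; the address is illustrative, and the retry loop and error checking that real client code needs are omitted.)

    // Sketch only: exclude one process through the special key space.
    #include <foundationdb/fdb_c.h>

    void exclude_process(FDBDatabase* db) {
        FDBTransaction* tr = nullptr;
        fdb_database_create_transaction(db, &tr);

        // Writes to read/write special-key modules are disabled by default.
        fdb_transaction_set_option(tr, FDB_TR_OPTION_SPECIAL_KEY_SPACE_ENABLE_WRITES,
                                   nullptr, 0);

        // \xff\xff/management/exclude/<exclusion>; an ip-only key would
        // exclude every process on that host.
        const char key[] = "\xff\xff/management/exclude/127.0.0.1:4500";
        fdb_transaction_set(tr, (const uint8_t*)key, sizeof(key) - 1,
                            (const uint8_t*)"", 0);

        FDBFuture* f = fdb_transaction_commit(tr);
        fdb_future_block_until_ready(f); // check fdb_get_error in real code
        fdb_future_destroy(f);
        fdb_transaction_destroy(tr);
    }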
@@ -1114,3 +1167,4 @@ At a first glance this looks very similar to an ``commit_unknown_result``. Howev

 .. [#conflicting_keys] In practice, the transaction probably committed successfully. However, if you're running multiple resolvers then it's possible for a transaction to cause another to abort even if it doesn't commit successfully.
 .. [#max_read_transaction_life_versions] The number 5000000 comes from the server knob MAX_READ_TRANSACTION_LIFE_VERSIONS
+.. [#special_key_space_enable_writes] Enabling this option enables other transaction options, such as ``ACCESS_SYSTEM_KEYS``. This may change in the future.
@@ -2154,8 +2154,8 @@ ACTOR Future<bool> exclude( Database db, std::vector<StringRef> tokens, Referenc

         return false;
     } else {
-        state std::vector<AddressExclusion> addresses;
-        state std::set<AddressExclusion> exclusions;
+        state std::vector<AddressExclusion> exclusionVector;
+        state std::set<AddressExclusion> exclusionSet;
         bool force = false;
         state bool waitForAllExcluded = true;
         state bool markFailed = false;
@@ -2174,8 +2174,8 @@ ACTOR Future<bool> exclude( Database db, std::vector<StringRef> tokens, Referenc
                 printf("  Do not include the `:tls' suffix when naming a process\n");
                 return true;
             }
-            addresses.push_back( a );
-            exclusions.insert( a );
+            exclusionVector.push_back(a);
+            exclusionSet.insert(a);
         }
     }
@@ -2183,7 +2183,7 @@ ACTOR Future<bool> exclude( Database db, std::vector<StringRef> tokens, Referenc
         if (markFailed) {
             state bool safe;
             try {
-                bool _safe = wait(makeInterruptable(checkSafeExclusions(db, addresses)));
+                bool _safe = wait(makeInterruptable(checkSafeExclusions(db, exclusionVector)));
                 safe = _safe;
             } catch (Error& e) {
                 if (e.code() == error_code_actor_cancelled) throw;
@@ -2245,7 +2245,8 @@ ACTOR Future<bool> exclude( Database db, std::vector<StringRef> tokens, Referenc
                     return true;
                 }
                 NetworkAddress addr = NetworkAddress::parse(addrStr);
-                bool excluded = (process.has("excluded") && process.last().get_bool()) || addressExcluded(exclusions, addr);
+                bool excluded =
+                    (process.has("excluded") && process.last().get_bool()) || addressExcluded(exclusionSet, addr);
                 ssTotalCount++;
                 if (excluded)
                     ssExcludedCount++;
@@ -2286,7 +2287,7 @@ ACTOR Future<bool> exclude( Database db, std::vector<StringRef> tokens, Referenc
             }
         }

-        wait(makeInterruptable(excludeServers(db, addresses, markFailed)));
+        wait(makeInterruptable(excludeServers(db, exclusionVector, markFailed)));

         if (waitForAllExcluded) {
             printf("Waiting for state to be removed from all excluded servers. This may take a while.\n");
@@ -2297,7 +2298,7 @@ ACTOR Future<bool> exclude( Database db, std::vector<StringRef> tokens, Referenc
         warn.cancel();

         state std::set<NetworkAddress> notExcludedServers =
-            wait(makeInterruptable(checkForExcludingServers(db, addresses, waitForAllExcluded)));
+            wait(makeInterruptable(checkForExcludingServers(db, exclusionVector, waitForAllExcluded)));
         std::vector<ProcessData> workers = wait( makeInterruptable(getWorkers(db)) );
         std::map<IPAddress, std::set<uint16_t>> workerPorts;
         for(auto addr : workers)
@@ -2305,7 +2306,7 @@ ACTOR Future<bool> exclude( Database db, std::vector<StringRef> tokens, Referenc

         // Print a list of all excluded addresses that don't have a corresponding worker
         std::set<AddressExclusion> absentExclusions;
-        for(auto addr : addresses) {
+        for (const auto& addr : exclusionVector) {
             auto worker = workerPorts.find(addr.ip);
             if(worker == workerPorts.end())
                 absentExclusions.insert(addr);
@@ -2313,43 +2314,46 @@ ACTOR Future<bool> exclude( Database db, std::vector<StringRef> tokens, Referenc
                 absentExclusions.insert(addr);
         }

-        for (auto addr : addresses) {
-            NetworkAddress _addr(addr.ip, addr.port);
-            if (absentExclusions.find(addr) != absentExclusions.end()) {
-                if(addr.port == 0)
+        for (const auto& exclusion : exclusionVector) {
+            if (absentExclusions.find(exclusion) != absentExclusions.end()) {
+                if (exclusion.port == 0) {
                     printf(" %s(Whole machine) ---- WARNING: Missing from cluster!Be sure that you excluded the "
                            "correct machines before removing them from the cluster!\n",
-                           addr.ip.toString().c_str());
-                else
+                           exclusion.ip.toString().c_str());
+                } else {
                     printf(" %s ---- WARNING: Missing from cluster! Be sure that you excluded the correct processes "
                            "before removing them from the cluster!\n",
-                           addr.toString().c_str());
-            } else if (notExcludedServers.find(_addr) != notExcludedServers.end()) {
-                if (addr.port == 0)
+                           exclusion.toString().c_str());
+                }
+            } else if (std::any_of(notExcludedServers.begin(), notExcludedServers.end(),
+                                   [&](const NetworkAddress& a) { return addressExcluded({ exclusion }, a); })) {
+                if (exclusion.port == 0) {
                     printf(" %s(Whole machine) ---- WARNING: Exclusion in progress! It is not safe to remove this "
                            "machine from the cluster\n",
-                           addr.ip.toString().c_str());
-                else
+                           exclusion.ip.toString().c_str());
+                } else {
                     printf(" %s ---- WARNING: Exclusion in progress! It is not safe to remove this process from the "
                            "cluster\n",
-                           addr.toString().c_str());
+                           exclusion.toString().c_str());
+                }
             } else {
-                if (addr.port == 0)
+                if (exclusion.port == 0) {
                     printf(" %s(Whole machine) ---- Successfully excluded. It is now safe to remove this machine "
                            "from the cluster.\n",
-                           addr.ip.toString().c_str());
-                else
+                           exclusion.ip.toString().c_str());
+                } else {
                     printf(
                         " %s ---- Successfully excluded. It is now safe to remove this process from the cluster.\n",
-                        addr.toString().c_str());
+                        exclusion.toString().c_str());
+                }
             }
         }

         bool foundCoordinator = false;
         auto ccs = ClusterConnectionFile( ccf->getFilename() ).getConnectionString();
         for( auto& c : ccs.coordinators()) {
-            if (std::count( addresses.begin(), addresses.end(), AddressExclusion(c.ip, c.port) ) ||
-                std::count( addresses.begin(), addresses.end(), AddressExclusion(c.ip) )) {
+            if (std::count(exclusionVector.begin(), exclusionVector.end(), AddressExclusion(c.ip, c.port)) ||
+                std::count(exclusionVector.begin(), exclusionVector.end(), AddressExclusion(c.ip))) {
                 printf("WARNING: %s is a coordinator!\n", c.toString().c_str());
                 foundCoordinator = true;
             }
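(The switch from a plain notExcludedServers.find(_addr) lookup to std::any_of with addressExcluded matters because an exclusion may name only an IP: port 0 means the whole machine, so an exact (ip, port) set lookup would miss it. A simplified sketch of that matching rule, with stand-in types for the real flow/fdbclient ones:)

    #include <algorithm>
    #include <cstdint>
    #include <set>
    #include <string>
    #include <tuple>

    struct NetworkAddress { std::string ip; uint16_t port; };

    struct AddressExclusion {
        std::string ip;
        uint16_t port = 0; // 0 means "every process on this machine"
        bool excludes(const NetworkAddress& a) const {
            return ip == a.ip && (port == 0 || port == a.port);
        }
        bool operator<(const AddressExclusion& r) const {
            return std::tie(ip, port) < std::tie(r.ip, r.port);
        }
    };

    // True when any exclusion matches the address, including whole-machine ones.
    bool addressExcluded(const std::set<AddressExclusion>& exclusions, const NetworkAddress& a) {
        return std::any_of(exclusions.begin(), exclusions.end(),
                           [&](const AddressExclusion& x) { return x.excludes(a); });
    }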
@@ -3457,7 +3461,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
                 if (db->apiVersionAtLeast(700))
                     BinaryReader::fromStringRef<ClientWorkerInterface>(
                         address_interface[tokens[i]].first, IncludeVersion())
-                        .reboot.send(RebootRequest(false, false, timeout_ms));
+                        .reboot.send(RebootRequest(false, false, seconds));
                 else
                     tr->set(LiteralStringRef("\xff\xff/suspend_worker"),
                             address_interface[tokens[i]].first);
@@ -53,7 +53,7 @@ struct RebootRequest {
     constexpr static FileIdentifier file_identifier = 11913957;
     bool deleteData;
     bool checkData;
-    uint32_t waitForDuration;
+    uint32_t waitForDuration; // seconds

     explicit RebootRequest(bool deleteData = false, bool checkData = false, uint32_t waitForDuration = 0)
       : deleteData(deleteData), checkData(checkData), waitForDuration(waitForDuration) {}
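(This hunk and the fdbcli one above it fix a unit mismatch: waitForDuration is in seconds, but the call site was passing a variable named timeout_ms. One way to make such mismatches unrepresentable is to encode the unit in the type, sketched below with std::chrono; this is an illustration, not how the actual struct is declared.)

    #include <chrono>
    #include <cstdint>

    struct RebootRequest {
        uint32_t waitForDuration; // seconds
        explicit RebootRequest(std::chrono::seconds wait)
            : waitForDuration(static_cast<uint32_t>(wait.count())) {}
    };

    // RebootRequest bad(std::chrono::milliseconds(5000)); // error: lossy conversion
    // RebootRequest ok(std::chrono::seconds(5));           // unit is explicit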
@@ -262,8 +262,14 @@ ACTOR Future<Void> getRate(UID myID, Reference<AsyncVar<ServerDBInfo>> db, int64
             nextRequestTimer = Never();
             bool detailed = now() - lastDetailedReply > SERVER_KNOBS->DETAILED_METRIC_UPDATE_RATE;

+            TransactionTagMap<uint64_t> tagCounts;
+            for(auto itr : *throttledTags) {
+                for(auto priorityThrottles : itr.second) {
+                    tagCounts[priorityThrottles.first] = (*transactionTagCounter)[priorityThrottles.first];
+                }
+            }
             reply = brokenPromiseToNever(db->get().ratekeeper.get().getRateInfo.getReply(
-                GetRateInfoRequest(myID, *inTransactionCount, *inBatchTransactionCount, *transactionTagCounter,
+                GetRateInfoRequest(myID, *inTransactionCount, *inBatchTransactionCount, tagCounts,
                                    *ssTagCommitCost, detailed)));
             transactionTagCounter->clear();
             ssTagCommitCost->clear();
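(The new loop shrinks the GetRateInfoRequest payload: instead of shipping the entire per-tag transaction counter, only tags that are currently throttled are reported. A simplified, self-contained sketch of that filtering; the real maps are keyed by priority as well, which is collapsed away here.)

    #include <cstdint>
    #include <map>
    #include <set>
    #include <string>

    using TransactionTag = std::string;
    template <class V>
    using TransactionTagMap = std::map<TransactionTag, V>;

    // Keep counts only for the tags the ratekeeper is actively throttling.
    TransactionTagMap<uint64_t> filterToThrottled(const TransactionTagMap<uint64_t>& allCounts,
                                                  const std::set<TransactionTag>& throttledTags) {
        TransactionTagMap<uint64_t> tagCounts;
        for (const auto& tag : throttledTags) {
            auto it = allCounts.find(tag);
            // Mirrors operator[] on the real counter map: absent tags count as zero.
            tagCounts[tag] = (it == allCounts.end()) ? 0 : it->second;
        }
        return tagCounts;
    }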
@@ -1110,6 +1116,7 @@ ACTOR Future<Void> commitBatch(
                 yieldBytes += mutationSize;
+                // Determine the set of tags (responsible storage servers) for the mutation, splitting it
+                // if necessary. Serialize (splits of) the mutation into the message buffer and add the tags.

                 if (isSingleKeyMutation((MutationRef::Type) m.type)) {
                     // sample single key mutation based on byte
                     // the expectation of sampling is every COMMIT_SAMPLE_BYTE sample once
@@ -27,6 +27,7 @@
 #include "flow/crc32c.h"
 #include "flow/flow.h"

+#include <atomic>
 #include <cstdint>
 #include <unordered_map>

@@ -87,7 +88,7 @@ void* FastAllocator<Size>::freelist = nullptr;
 typedef void (*ThreadInitFunction)();

 ThreadInitFunction threadInitFunction = 0; // See ThreadCleanup.cpp in the C binding
-void setFastAllocatorThreadInitFunction( ThreadInitFunction f ) {
+void setFastAllocatorThreadInitFunction(ThreadInitFunction f) {
     ASSERT( !threadInitFunction );
     threadInitFunction = f;
 }
@@ -221,9 +222,9 @@ struct FastAllocator<Size>::GlobalData {
     CRITICAL_SECTION mutex;
     std::vector<void*> magazines; // These magazines are always exactly magazine_size ("full")
     std::vector<std::pair<int, void*>> partial_magazines; // Magazines that are not "full" and their counts. Only created by releaseThreadMagazines().
-    long long totalMemory;
+    std::atomic<long long> totalMemory;
     long long partialMagazineUnallocatedMemory;
-    long long activeThreads;
+    std::atomic<long long> activeThreads;
     GlobalData() : totalMemory(0), partialMagazineUnallocatedMemory(0), activeThreads(0) {
         InitializeCriticalSection(&mutex);
     }
@@ -231,18 +232,22 @@ struct FastAllocator<Size>::GlobalData {

 template <int Size>
 long long FastAllocator<Size>::getTotalMemory() {
-    return globalData()->totalMemory;
+    return globalData()->totalMemory.load();
 }

 // This does not include memory held by various threads that's available for allocation
 template <int Size>
 long long FastAllocator<Size>::getApproximateMemoryUnused() {
-    return globalData()->magazines.size() * magazine_size * Size + globalData()->partialMagazineUnallocatedMemory;
+    EnterCriticalSection(&globalData()->mutex);
+    long long unused =
+        globalData()->magazines.size() * magazine_size * Size + globalData()->partialMagazineUnallocatedMemory;
+    LeaveCriticalSection(&globalData()->mutex);
+    return unused;
 }

 template <int Size>
 long long FastAllocator<Size>::getActiveThreads() {
-    return globalData()->activeThreads;
+    return globalData()->activeThreads.load();
 }

 #if FAST_ALLOCATOR_DEBUG
@@ -411,9 +416,7 @@ void FastAllocator<Size>::initThread() {
         threadInitFunction();
     }

-    EnterCriticalSection(&globalData()->mutex);
-    ++globalData()->activeThreads;
-    LeaveCriticalSection(&globalData()->mutex);
+    globalData()->activeThreads.fetch_add(1);

     threadData.freelist = nullptr;
     threadData.alternate = nullptr;
@@ -442,7 +445,7 @@ void FastAllocator<Size>::getMagazine() {
         threadData.count = p.first;
         return;
     }
-    globalData()->totalMemory += magazine_size*Size;
+    globalData()->totalMemory.fetch_add(magazine_size * Size);
     LeaveCriticalSection(&globalData()->mutex);

     // Allocate a new page of data from the system allocator
@@ -454,8 +457,9 @@ void FastAllocator<Size>::getMagazine() {
 #if FAST_ALLOCATOR_DEBUG
 #ifdef WIN32
     static int alt = 0; alt++;
-    block = (void**)VirtualAllocEx( GetCurrentProcess(),
-        (void*)( ((getSizeCode(Size)<<11) + alt) * magazine_size*Size), magazine_size*Size, MEM_COMMIT|MEM_RESERVE, PAGE_READWRITE );
+    block =
+        (void**)VirtualAllocEx(GetCurrentProcess(), (void*)(((getSizeCode(Size) << 11) + alt) * magazine_size * Size),
+                               magazine_size * Size, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
 #else
     static int alt = 0; alt++;
     void* desiredBlock = (void*)( ((getSizeCode(Size)<<11) + alt) * magazine_size*Size);
@@ -509,7 +513,7 @@ void FastAllocator<Size>::releaseThreadMagazines() {
             globalData()->magazines.push_back(thr.alternate);
         }
     }
-    --globalData()->activeThreads;
+    globalData()->activeThreads.fetch_add(-1);
     LeaveCriticalSection(&globalData()->mutex);

     thr.count = 0;
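(The FastAllocator hunks above all follow one pattern: counters that are updated under the allocator's critical section but read from other threads, via getTotalMemory and getActiveThreads, become std::atomic, while getApproximateMemoryUnused, which reads non-atomic containers, now takes the lock instead. A condensed sketch of the pattern, with std::mutex standing in for the Windows-style CRITICAL_SECTION used in the real code:)

    #include <atomic>
    #include <mutex>
    #include <vector>

    struct GlobalPool {
        std::mutex mutex;                       // guards magazines
        std::vector<void*> magazines;
        std::atomic<long long> totalMemory{0};  // safe to read without the mutex

        void addMagazine(void* m, long long bytes) {
            std::lock_guard<std::mutex> g(mutex);
            magazines.push_back(m);
            totalMemory.fetch_add(bytes);       // still written inside the lock
        }

        long long getTotalMemory() const {      // lock-free reader, now well-defined
            return totalMemory.load();
        }

        long long getApproximateMemoryUnused(long long magazineBytes) {
            std::lock_guard<std::mutex> g(mutex); // container access needs the lock
            return (long long)magazines.size() * magazineBytes;
        }
    };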
@@ -133,7 +133,7 @@ private:
     };
     static thread_local ThreadData threadData;
     static thread_local bool threadInitialized;
-    static GlobalData* globalData() {
+    static GlobalData* globalData() noexcept {
 #ifdef VALGRIND
         ANNOTATE_RWLOCK_ACQUIRED(vLock, 1);
 #endif
@@ -143,4 +143,4 @@ constexpr ProtocolVersion currentProtocolVersion(0x0FDB00B070010001LL);
 static_assert(currentProtocolVersion.version() < 0x0FDB00B100000000LL, "Unexpected protocol version");

 // Downgrades are only supported for one minor version
-constexpr ProtocolVersion minInvalidProtocolVersion(0x0FDB00B071000000LL);
+constexpr ProtocolVersion minInvalidProtocolVersion(0x0FDB00B072000000LL);
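(For context on this last hunk: protocol versions compare as plain 64-bit integers, so raising minInvalidProtocolVersion widens the range of versions still considered valid, matching the comment about supporting downgrades for one minor version. A small sketch; reading the two constants as the 7.1 and 7.2 series is an interpretation of the encoding, not something stated in this diff.)

    #include <cstdint>

    // Raising the minimum *invalid* version from 0x...71... to 0x...72...
    // keeps versions through the 7.1 series on the valid side of the check.
    constexpr uint64_t currentProtocolVersion    = 0x0FDB00B070010001ULL;
    constexpr uint64_t minInvalidProtocolVersion = 0x0FDB00B072000000ULL;

    static_assert(currentProtocolVersion < minInvalidProtocolVersion,
                  "the running version must itself be valid");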