Merge branch 'master' of github.com:apple/foundationdb into add-fdbcli-tests

This commit is contained in:
Chaoguang Lin 2021-06-17 00:29:41 +00:00
commit ecff680b47
41 changed files with 689 additions and 324 deletions

View File

@ -30,7 +30,7 @@ import java.io.OutputStream;
* Utility for loading a dynamic library from the classpath.
*
*/
class JNIUtil {
public class JNIUtil {
private static final String SEPARATOR = "/";
private static final String LOADABLE_PREFIX = "FDB_LIBRARY_PATH_";
private static final String TEMPFILE_PREFIX = "fdbjni";
@ -92,7 +92,7 @@ class JNIUtil {
File exported;
try {
exported = exportResource(path);
exported = exportResource(path, libName);
}
catch (IOException e) {
throw new UnsatisfiedLinkError(e.getMessage());
@ -109,6 +109,19 @@ class JNIUtil {
}
}
/**
* Export a library from classpath resources to a temporary file.
*
* @param libName the name of the library to attempt to export. This name should be
* undecorated with file extensions and, in the case of *nix, "lib" prefixes.
* @return the exported temporary file
*/
public static File exportLibrary(String libName) throws IOException {
OS os = getRunningOS();
String path = getPath(os, libName);
return exportResource(path, libName);
}
/**
* Gets a relative path for a library. The path will be of the form:
* {@code {os}/{arch}/{name}}.
@ -127,20 +140,21 @@ class JNIUtil {
* Export a resource from the classpath to a temporary file.
*
* @param path the relative path of the file to load from the classpath
* @param name an optional descriptive name to include in the temporary file's path
*
* @return the absolute path to the exported file
* @throws IOException
*/
private static File exportResource(String path) throws IOException {
private static File exportResource(String path, String name) throws IOException {
InputStream resource = JNIUtil.class.getResourceAsStream(path);
if(resource == null)
throw new IllegalStateException("Embedded library jar:" + path + " not found");
File f = saveStreamAsTempFile(resource);
File f = saveStreamAsTempFile(resource, name);
return f;
}
private static File saveStreamAsTempFile(InputStream resource) throws IOException {
File f = File.createTempFile(TEMPFILE_PREFIX, TEMPFILE_SUFFIX);
private static File saveStreamAsTempFile(InputStream resource, String name) throws IOException {
File f = File.createTempFile(name.length() > 0 ? name : TEMPFILE_PREFIX, TEMPFILE_SUFFIX);
FileOutputStream outputStream = new FileOutputStream(f);
copyStream(resource, outputStream);
outputStream.flush();

View File

@ -51,9 +51,10 @@ namespace SummarizeTest
bool traceToStdout = false;
try
{
string joshuaSeed = System.Environment.GetEnvironmentVariable("JOSHUA_SEED");
byte[] seed = new byte[4];
new System.Security.Cryptography.RNGCryptoServiceProvider().GetBytes(seed);
random = new Random(new BinaryReader(new MemoryStream(seed)).ReadInt32());
random = new Random(joshuaSeed != null ? Convert.ToInt32(Int64.Parse(joshuaSeed) % 2147483648) : new BinaryReader(new MemoryStream(seed)).ReadInt32());
if (args.Length < 1)
return UsageMessage();
@ -246,6 +247,7 @@ namespace SummarizeTest
string testFile = null;
string testDir = "";
string oldServerName = "";
bool noSim = false;
if (Directory.Exists(testFolder))
{
@ -254,9 +256,10 @@ namespace SummarizeTest
if( Directory.Exists(Path.Combine(testFolder, "slow")) ) poolSize += 5;
if( Directory.Exists(Path.Combine(testFolder, "fast")) ) poolSize += 14;
if( Directory.Exists(Path.Combine(testFolder, "restarting")) ) poolSize += 1;
if( Directory.Exists(Path.Combine(testFolder, "noSim")) ) poolSize += 1;
if( poolSize == 0 ) {
Console.WriteLine("Passed folder ({0}) did not have a fast, slow, rare, or restarting sub-folder", testFolder);
Console.WriteLine("Passed folder ({0}) did not have a fast, slow, rare, restarting, or noSim sub-folder", testFolder);
return 1;
}
int selection = random.Next(poolSize);
@ -272,11 +275,20 @@ namespace SummarizeTest
testDir = Path.Combine(testFolder, "restarting");
else
{
if (Directory.Exists(Path.Combine(testFolder, "slow"))) selectionWindow += 5;
if (Directory.Exists(Path.Combine(testFolder, "noSim"))) selectionWindow += 1;
if (selection < selectionWindow)
testDir = Path.Combine(testFolder, "slow");
{
testDir = Path.Combine(testFolder, "noSim");
noSim = true;
}
else
testDir = Path.Combine(testFolder, "fast");
{
if (Directory.Exists(Path.Combine(testFolder, "slow"))) selectionWindow += 5;
if (selection < selectionWindow)
testDir = Path.Combine(testFolder, "slow");
else
testDir = Path.Combine(testFolder, "fast");
}
}
}
string[] files = Directory.GetFiles(testDir, "*", SearchOption.AllDirectories);
@ -341,11 +353,11 @@ namespace SummarizeTest
bool useNewPlugin = (oldServerName == fdbserverName) || versionGreaterThanOrEqual(oldServerName.Split('-').Last(), "5.2.0");
bool useToml = File.Exists(testFile + "-1.toml");
string testFile1 = useToml ? testFile + "-1.toml" : testFile + "-1.txt";
result = RunTest(firstServerName, useNewPlugin ? tlsPluginFile : tlsPluginFile_5_1, summaryFileName, errorFileName, seed, buggify, testFile1, runDir, uid, expectedUnseed, out unseed, out retryableError, logOnRetryableError, useValgrind, false, true, oldServerName, traceToStdout);
result = RunTest(firstServerName, useNewPlugin ? tlsPluginFile : tlsPluginFile_5_1, summaryFileName, errorFileName, seed, buggify, testFile1, runDir, uid, expectedUnseed, out unseed, out retryableError, logOnRetryableError, useValgrind, false, true, oldServerName, traceToStdout, noSim);
if (result == 0)
{
string testFile2 = useToml ? testFile + "-2.toml" : testFile + "-2.txt";
result = RunTest(secondServerName, tlsPluginFile, summaryFileName, errorFileName, seed+1, buggify, testFile2, runDir, uid, expectedUnseed, out unseed, out retryableError, logOnRetryableError, useValgrind, true, false, oldServerName, traceToStdout);
result = RunTest(secondServerName, tlsPluginFile, summaryFileName, errorFileName, seed+1, buggify, testFile2, runDir, uid, expectedUnseed, out unseed, out retryableError, logOnRetryableError, useValgrind, true, false, oldServerName, traceToStdout, noSim);
}
}
else
@ -353,13 +365,13 @@ namespace SummarizeTest
int expectedUnseed = -1;
if (!useValgrind && unseedCheck)
{
result = RunTest(fdbserverName, tlsPluginFile, null, null, seed, buggify, testFile, runDir, Guid.NewGuid().ToString(), -1, out expectedUnseed, out retryableError, logOnRetryableError, false, false, false, "", traceToStdout);
result = RunTest(fdbserverName, tlsPluginFile, null, null, seed, buggify, testFile, runDir, Guid.NewGuid().ToString(), -1, out expectedUnseed, out retryableError, logOnRetryableError, false, false, false, "", traceToStdout, noSim);
}
if (!retryableError)
{
int unseed;
result = RunTest(fdbserverName, tlsPluginFile, summaryFileName, errorFileName, seed, buggify, testFile, runDir, Guid.NewGuid().ToString(), expectedUnseed, out unseed, out retryableError, logOnRetryableError, useValgrind, false, false, "", traceToStdout);
result = RunTest(fdbserverName, tlsPluginFile, summaryFileName, errorFileName, seed, buggify, testFile, runDir, Guid.NewGuid().ToString(), expectedUnseed, out unseed, out retryableError, logOnRetryableError, useValgrind, false, false, "", traceToStdout, noSim);
}
}
@ -374,7 +386,7 @@ namespace SummarizeTest
private static int RunTest(string fdbserverName, string tlsPluginFile, string summaryFileName, string errorFileName, int seed,
bool buggify, string testFile, string runDir, string uid, int expectedUnseed, out int unseed, out bool retryableError, bool logOnRetryableError, bool useValgrind, bool restarting = false,
bool willRestart = false, string oldBinaryName = "", bool traceToStdout = false)
bool willRestart = false, string oldBinaryName = "", bool traceToStdout = false, bool noSim = false)
{
unseed = -1;
@ -408,16 +420,17 @@ namespace SummarizeTest
tlsPluginArg = "--tls_plugin=" + tlsPluginFile;
}
process.StartInfo.RedirectStandardOutput = true;
string role = (noSim) ? "test" : "simulation";
var args = "";
if (willRestart && oldBinaryName.EndsWith("alpha6"))
{
args = string.Format("-Rs 1000000000 -r simulation {0} -s {1} -f \"{2}\" -b {3} {4} --crash",
IsRunningOnMono() ? "" : "-q", seed, testFile, buggify ? "on" : "off", tlsPluginArg);
args = string.Format("-Rs 1000000000 -r {0} {1} -s {2} -f \"{3}\" -b {4} {5} --crash",
role, IsRunningOnMono() ? "" : "-q", seed, testFile, buggify ? "on" : "off", tlsPluginArg);
}
else
{
args = string.Format("-Rs 1GB -r simulation {0} -s {1} -f \"{2}\" -b {3} {4} --crash",
IsRunningOnMono() ? "" : "-q", seed, testFile, buggify ? "on" : "off", tlsPluginArg);
args = string.Format("-Rs 1GB -r {0} {1} -s {2} -f \"{3}\" -b {4} {5} --crash",
role, IsRunningOnMono() ? "" : "-q", seed, testFile, buggify ? "on" : "off", tlsPluginArg);
}
if (restarting) args = args + " --restarting";
if (useValgrind && !willRestart)

View File

@ -541,13 +541,17 @@ Applications must provide error handling and an appropriate retry loop around th
|snapshot|
.. function:: FDBFuture* fdb_transaction_get_estimated_range_size_bytes( FDBTransaction* tr, uint8_t const* begin_key_name, int begin_key_name_length, uint8_t const* end_key_name, int end_key_name_length)
Returns an estimated byte size of the key range.
.. note:: The estimated size is calculated based on sampling done by the FDB server. The sampling algorithm works roughly as follows: the larger a key-value pair is, the more likely it is to be sampled and the more accurate its sampled size will be. For this reason, it is recommended to use this API to query large ranges, where the estimate is more accurate. As a rough guideline, if the returned size is larger than 3MB, the estimate can be considered accurate.
|future-return0| the estimated size of the key range given. |future-return1| call :func:`fdb_future_get_int64()` to extract the size, |future-return2|
.. function:: FDBFuture* fdb_transaction_get_range_split_points( FDBTransaction* tr, uint8_t const* begin_key_name, int begin_key_name_length, uint8_t const* end_key_name, int end_key_name_length, int64_t chunk_size)
Returns a list of keys that can split the given range into (roughly) equally sized chunks based on ``chunk_size``.
.. note:: The returned split points contain the start key and end key of the given range.
|future-return0| the list of split points. |future-return1| call :func:`fdb_future_get_key_array()` to extract the array, |future-return2|

View File

@ -800,6 +800,7 @@ Transaction misc functions
.. method:: Transaction.get_estimated_range_size_bytes(begin_key, end_key)
Gets the estimated byte size of the given key range. Returns a :class:`FutureInt64`.
.. note:: The estimated size is calculated based on sampling done by the FDB server. The sampling algorithm works roughly as follows: the larger a key-value pair is, the more likely it is to be sampled and the more accurate its sampled size will be. For this reason, it is recommended to use this API to query large ranges, where the estimate is more accurate. As a rough guideline, if the returned size is larger than 3MB, the estimate can be considered accurate.
.. method:: Transaction.get_range_split_points(self, begin_key, end_key, chunk_size)
@ -807,15 +808,11 @@ Transaction misc functions
Gets a list of keys that can split the given range into (roughly) equally sized chunks based on ``chunk_size``. Returns a :class:`FutureKeyArray`.
.. note:: The returned split points contain the start key and end key of the given range.
.. _api-python-transaction-options:
Transaction misc functions
--------------------------
.. method:: Transaction.get_approximate_size()
|transaction-get-approximate-size-blurb|. Returns a :class:`FutureInt64`.
|transaction-get-approximate-size-blurb| Returns a :class:`FutureInt64`.
.. _api-python-transaction-options:
Transaction options
-------------------

View File

@ -744,6 +744,7 @@ Transaction misc functions
.. method:: Transaction.get_estimated_range_size_bytes(begin_key, end_key) -> Int64Future
Gets the estimated byte size of the given key range. Returns a :class:`Int64Future`.
.. note:: The estimated size is calculated based on sampling done by the FDB server. The sampling algorithm works roughly as follows: the larger a key-value pair is, the more likely it is to be sampled and the more accurate its sampled size will be. For this reason, it is recommended to use this API to query large ranges, where the estimate is more accurate. As a rough guideline, if the returned size is larger than 3MB, the estimate can be considered accurate.
.. method:: Transaction.get_range_split_points(begin_key, end_key, chunk_size) -> FutureKeyArray
@ -753,7 +754,7 @@ Transaction misc functions
.. method:: Transaction.get_approximate_size() -> Int64Future
|transaction-get-approximate-size-blurb|. Returns a :class:`Int64Future`.
|transaction-get-approximate-size-blurb| Returns a :class:`Int64Future`.
Transaction options
-------------------

View File

@ -38,7 +38,6 @@ ACTOR Future<bool> forceRecoveryWithDataLossCommandActor(Reference<IDatabase> db
return true;
}
// hidden commands, no help text for now
CommandFactory forceRecoveryWithDataLossFactory(
"force_recovery_with_data_loss",
CommandHelp("force_recovery_with_data_loss <DCID>",

View File

@ -548,11 +548,11 @@ bool DatabaseConfiguration::setInternal(KeyRef key, ValueRef value) {
return true; // All of the above options currently require recovery to take effect
}
inline static KeyValueRef* lower_bound(VectorRef<KeyValueRef>& config, KeyRef const& key) {
static KeyValueRef* lower_bound(VectorRef<KeyValueRef>& config, KeyRef const& key) {
return std::lower_bound(config.begin(), config.end(), KeyValueRef(key, ValueRef()), KeyValueRef::OrderByKey());
}
inline static KeyValueRef const* lower_bound(VectorRef<KeyValueRef> const& config, KeyRef const& key) {
return lower_bound(const_cast<VectorRef<KeyValueRef>&>(config), key);
static KeyValueRef const* lower_bound(VectorRef<KeyValueRef> const& config, KeyRef const& key) {
return std::lower_bound(config.begin(), config.end(), KeyValueRef(key, ValueRef()), KeyValueRef::OrderByKey());
}
void DatabaseConfiguration::applyMutation(MutationRef m) {
@ -664,7 +664,7 @@ void DatabaseConfiguration::fromKeyValues(Standalone<VectorRef<KeyValueRef>> raw
}
bool DatabaseConfiguration::isOverridden(std::string key) const {
key = configKeysPrefix.toString() + key;
key = configKeysPrefix.toString() + std::move(key);
if (mutableConfiguration.present()) {
return mutableConfiguration.get().find(key) != mutableConfiguration.get().end();

View File

@ -4976,9 +4976,6 @@ ACTOR Future<Version> extractReadVersion(Location location,
if (trLogInfo)
trLogInfo->addLog(FdbClientLogEvents::EventGetVersion_V3(
startTime, cx->clientLocality.dcId(), latency, priority, rep.version));
if (rep.version == 1 && rep.locked) {
throw proxy_memory_limit_exceeded();
}
if (rep.locked && !lockAware)
throw database_locked();

View File

@ -52,6 +52,7 @@ class TCMachineTeamInfo;
ACTOR Future<Void> checkAndRemoveInvalidLocalityAddr(DDTeamCollection* self);
ACTOR Future<Void> removeWrongStoreType(DDTeamCollection* self);
ACTOR Future<Void> waitForAllDataRemoved(Database cx, UID serverID, Version addedVersion, DDTeamCollection* teams);
bool _exclusionSafetyCheck(vector<UID>& excludeServerIDs, DDTeamCollection* teamCollection);
struct TCServerInfo : public ReferenceCounted<TCServerInfo> {
UID id;
@ -375,14 +376,16 @@ struct ServerStatus {
LocalityData locality;
ServerStatus()
: isWiggling(false), isFailed(true), isUndesired(false), isWrongConfiguration(false), initialized(false) {}
ServerStatus(bool isFailed, bool isUndesired, LocalityData const& locality)
ServerStatus(bool isFailed, bool isUndesired, bool isWiggling, LocalityData const& locality)
: isFailed(isFailed), isUndesired(isUndesired), locality(locality), isWrongConfiguration(false),
initialized(true), isWiggling(false) {}
initialized(true), isWiggling(isWiggling) {}
bool isUnhealthy() const { return isFailed || isUndesired; }
const char* toString() const { return isFailed ? "Failed" : isUndesired ? "Undesired" : "Healthy"; }
const char* toString() const {
return isFailed ? "Failed" : isUndesired ? "Undesired" : isWiggling ? "Wiggling" : "Healthy";
}
bool operator==(ServerStatus const& r) const {
return isFailed == r.isFailed && isUndesired == r.isUndesired &&
return isFailed == r.isFailed && isUndesired == r.isUndesired && isWiggling == r.isWiggling &&
isWrongConfiguration == r.isWrongConfiguration && locality == r.locality && initialized == r.initialized;
}
bool operator!=(ServerStatus const& r) const { return !(*this == r); }
@ -621,11 +624,13 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
std::map<int,int> priority_teams;
std::map<UID, Reference<TCServerInfo>> server_info;
std::map<Key, std::vector<Reference<TCServerInfo>>> pid2server_info; // some process may serve as multiple storage servers
std::vector<AddressExclusion> wiggle_addresses; // collection of wiggling servers' address
std::map<UID, Reference<TCServerInfo>> tss_info_by_pair;
std::map<UID, Reference<TCServerInfo>> server_and_tss_info; // TODO could replace this with an efficient way to do a read-only concatenation of 2 data structures?
std::map<Key, int> lagging_zones; // zone to number of storage servers lagging
AsyncVar<bool> disableFailingLaggingServers;
Optional<Key> wigglingPid; // Process id of current wiggling storage server;
Reference<AsyncVar<bool>> pauseWiggle;
// machine_info has all machines info; key must be unique across processes on the same machine
std::map<Standalone<StringRef>, Reference<TCMachineInfo>> machine_info;
@ -651,6 +656,8 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
int optimalTeamCount;
AsyncVar<bool> zeroOptimalTeams;
bool bestTeamStuck = false;
bool isTssRecruiting; // If tss recruiting is waiting on a pair, don't consider DD recruiting for the purposes of QuietDB
// WIGGLING if an address is under storage wiggling.
@ -1003,8 +1010,13 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
}
// Log BestTeamStuck reason when we have healthy teams but they do not have healthy free space
if (g_network->isSimulated() && randomTeams.empty() && !self->zeroHealthyTeams->get()) {
TraceEvent(SevWarn, "GetTeamReturnEmpty").detail("HealthyTeams", self->healthyTeamCount);
if (randomTeams.empty() && !self->zeroHealthyTeams->get()) {
self->bestTeamStuck = true;
if (g_network->isSimulated()) {
TraceEvent(SevWarn, "GetTeamReturnEmpty").detail("HealthyTeams", self->healthyTeamCount);
}
} else {
self->bestTeamStuck = false;
}
for (int i = 0; i < randomTeams.size(); i++) {
@ -2826,6 +2838,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
this->excludedServers.get(addr) != DDTeamCollection::Status::NONE) {
continue; // don't overwrite the value set by actor trackExcludedServer
}
this->wiggle_addresses.push_back(addr);
this->excludedServers.set(addr, DDTeamCollection::Status::WIGGLING);
moveFutures.push_back(
waitForAllDataRemoved(this->cx, info->lastKnownInterface.id(), info->addedVersion, this));
@ -2837,19 +2850,19 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
return moveFutures;
}
// Include storage servers held on process of which the Process Id is “pid” by setting their status from `WIGGLING`
// Include wiggled storage servers by setting their status from `WIGGLING`
// to `NONE`. The storage recruiter will recruit them as new storage servers
void includeStorageServersForWiggle(const Value& pid) {
void includeStorageServersForWiggle() {
bool included = false;
for (auto& info : this->pid2server_info[pid]) {
AddressExclusion addr(info->lastKnownInterface.address().ip);
if (!this->excludedServers.count(addr) ||
this->excludedServers.get(addr) != DDTeamCollection::Status::WIGGLING) {
for (auto& address : this->wiggle_addresses) {
if (!this->excludedServers.count(address) ||
this->excludedServers.get(address) != DDTeamCollection::Status::WIGGLING) {
continue;
}
included = true;
this->excludedServers.set(addr, DDTeamCollection::Status::NONE);
this->excludedServers.set(address, DDTeamCollection::Status::NONE);
}
this->wiggle_addresses.clear();
if (included) {
this->restartRecruiting.trigger();
}
@ -3531,8 +3544,7 @@ ACTOR Future<Void> teamTracker(DDTeamCollection* self, Reference<TCTeamInfo> tea
}
change.push_back(self->zeroHealthyTeams->onChange());
bool healthy =
!badTeam && !anyUndesired && serversLeft == self->configuration.storageTeamSize && !anyWigglingServer;
bool healthy = !badTeam && !anyUndesired && serversLeft == self->configuration.storageTeamSize;
team->setHealthy(healthy); // Unhealthy teams won't be chosen by bestTeam
bool optimal = team->isOptimal() && healthy;
bool containsFailed = teamContainsFailedServer(self, team);
@ -3829,10 +3841,12 @@ ACTOR Future<Void> trackExcludedServers(DDTeamCollection* self) {
// Reset and reassign self->excludedServers based on excluded, but we only
// want to trigger entries that are different
// Do not retrigger and double-overwrite failed servers
// Do not retrigger and double-overwrite failed or wiggling servers
auto old = self->excludedServers.getKeys();
for (const auto& o : old) {
if (!excluded.count(o) && !failed.count(o)) {
if (!excluded.count(o) && !failed.count(o) &&
!(self->excludedServers.count(o) &&
self->excludedServers.get(o) == DDTeamCollection::Status::WIGGLING)) {
self->excludedServers.set(o, DDTeamCollection::Status::NONE);
}
}
@ -3884,6 +3898,7 @@ ACTOR Future<vector<std::pair<StorageServerInterface, ProcessClass>>> getServerL
// to a sorted PID set maintained by the data distributor. If now no storage server exists, the new Process ID is 0.
ACTOR Future<Void> updateNextWigglingStoragePID(DDTeamCollection* teamCollection) {
state ReadYourWritesTransaction tr(teamCollection->cx);
state Value writeValue = LiteralStringRef("0");
loop {
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
@ -3896,11 +3911,14 @@ ACTOR Future<Void> updateNextWigglingStoragePID(DDTeamCollection* teamCollection
auto nextIt = teamCollection->pid2server_info.upper_bound(value.get());
if (nextIt == teamCollection->pid2server_info.end()) {
tr.set(wigglingStorageServerKey, pid);
writeValue = pid;
} else {
tr.set(wigglingStorageServerKey, nextIt->first);
writeValue = nextIt->first;
}
} else {
tr.set(wigglingStorageServerKey, pid);
writeValue = pid;
}
}
wait(tr.commit());
@ -3909,21 +3927,26 @@ ACTOR Future<Void> updateNextWigglingStoragePID(DDTeamCollection* teamCollection
wait(tr.onError(e));
}
}
TraceEvent(SevDebug, "PerpetualNextWigglingStoragePID", teamCollection->distributorId)
.detail("WriteValue", writeValue);
return Void();
}
// Iterate over each storage process to do storage wiggle. After initializing the first Process ID, it waits a signal
// from `perpetualStorageWiggler` indicating the wiggling of current process is finished. Then it writes the next
// Process ID to a system key: `wigglingStorageServerKey` to show the next process to wiggle.
ACTOR Future<Void> perpetualStorageWiggleIterator(AsyncTrigger* stopSignal,
ACTOR Future<Void> perpetualStorageWiggleIterator(AsyncVar<bool>* stopSignal,
FutureStream<Void> finishStorageWiggleSignal,
DDTeamCollection* teamCollection) {
// initialize PID
wait(updateNextWigglingStoragePID(teamCollection));
loop choose {
when(wait(stopSignal->onTrigger())) { break; }
when(waitNext(finishStorageWiggleSignal)) { wait(updateNextWigglingStoragePID(teamCollection)); }
loop {
choose {
when(wait(stopSignal->onChange())) {}
when(waitNext(finishStorageWiggleSignal)) { wait(updateNextWigglingStoragePID(teamCollection)); }
}
if (stopSignal->get()) {
break;
}
}
return Void();
@ -3931,8 +3954,8 @@ ACTOR Future<Void> perpetualStorageWiggleIterator(AsyncTrigger* stopSignal,
// Watch the value change of `wigglingStorageServerKey`.
// Return the watch future and the current value of `wigglingStorageServerKey`.
ACTOR Future<std::pair<Future<Void>, Value>> watchPerpetualStoragePIDChange(Database cx) {
state ReadYourWritesTransaction tr(cx);
ACTOR Future<std::pair<Future<Void>, Value>> watchPerpetualStoragePIDChange(DDTeamCollection* self) {
state ReadYourWritesTransaction tr(self->cx);
state Future<Void> watchFuture;
state Value ret;
loop {
@ -3952,77 +3975,84 @@ ACTOR Future<std::pair<Future<Void>, Value>> watchPerpetualStoragePIDChange(Data
return std::make_pair(watchFuture, ret);
}
// periodically check whether the cluster is healthy if we continue perpetual wiggle
ACTOR Future<Void> clusterHealthCheckForPerpetualWiggle(DDTeamCollection* self) {
loop {
Promise<int> countp;
self->getUnhealthyRelocationCount.send(countp);
int count = wait(countp.getFuture());
// pause wiggle when
// a. DDQueue is busy with unhealthy relocation request
// b. no healthy team
// c. the overall disk space is not enough
if (count >= SERVER_KNOBS->DD_STORAGE_WIGGLE_PAUSE_THRESHOLD || self->healthyTeamCount == 0 ||
self->bestTeamStuck) {
self->pauseWiggle->set(true);
} else {
self->pauseWiggle->set(false);
}
wait(delay(SERVER_KNOBS->CHECK_TEAM_DELAY, TaskPriority::DataDistributionLow));
}
}
// Watches the value (pid) change of \xff/storageWigglePID, and adds storage servers held on process of which the
// Process Id is “pid” into excludeServers which prevent recruiting the wiggling storage servers and let teamTracker
// start to move data off the affected teams. The wiggling process of current storage servers will be paused if the
// cluster is unhealthy and restarted once the cluster is healthy again.
ACTOR Future<Void> perpetualStorageWiggler(AsyncTrigger* stopSignal,
ACTOR Future<Void> perpetualStorageWiggler(AsyncVar<bool>* stopSignal,
PromiseStream<Void> finishStorageWiggleSignal,
DDTeamCollection* self,
const DDEnabledState* ddEnabledState) {
state Future<Void> watchFuture;
state Future<Void> watchFuture = Never();
state Future<Void> moveFinishFuture = Never();
state Debouncer pauseWiggle(SERVER_KNOBS->DEBOUNCE_RECRUITING_DELAY);
state AsyncTrigger restart;
state Future<Void> ddQueueCheck =
delay(SERVER_KNOBS->DD_ZERO_HEALTHY_TEAM_DELAY, TaskPriority::DataDistributionLow);
state Future<Void> ddQueueCheck = clusterHealthCheckForPerpetualWiggle(self);
state int movingCount = 0;
state bool isPaused = false;
state std::pair<Future<Void>, Value> res = wait(watchPerpetualStoragePIDChange(self->cx));
watchFuture = res.first;
state vector<UID> excludedServerIds;
state std::pair<Future<Void>, Value> res = wait(watchPerpetualStoragePIDChange(self));
ASSERT(!self->wigglingPid.present()); // only single process wiggle is allowed
self->wigglingPid = Optional<Key>(res.second);
// start with the initial pid
if (self->healthyTeamCount > 1) { // pre-check health status
TEST(true); // start the first wiggling
auto fv = self->excludeStorageServersForWiggle(self->wigglingPid.get());
movingCount = fv.size();
moveFinishFuture = waitForAll(fv);
TraceEvent("PerpetualStorageWiggleInitialStart", self->distributorId)
.detail("ProcessId", self->wigglingPid.get())
.detail("StorageCount", movingCount);
} else {
isPaused = true;
TraceEvent("PerpetualStorageWiggleInitialPause", self->distributorId)
.detail("ProcessId", self->wigglingPid.get());
}
loop {
choose {
when(wait(stopSignal->onTrigger())) { break; }
when(wait(watchFuture)) {
// read new pid and set the next watch Future
wait(store(res, watchPerpetualStoragePIDChange(self->cx)));
watchFuture = res.first;
self->wigglingPid = Optional<Key>(res.second);
StringRef pid = self->wigglingPid.get();
if (self->healthyTeamCount <= 1) { // pre-check health status
pauseWiggle.trigger();
} else {
if (self->wigglingPid.present()) {
StringRef pid = self->wigglingPid.get();
if (self->pauseWiggle->get()) {
TEST(true); // paused because cluster is unhealthy
moveFinishFuture = Never();
self->includeStorageServersForWiggle();
TraceEvent("PerpetualStorageWigglePause", self->distributorId)
.detail("ProcessId", pid)
.detail("StorageCount", movingCount);
} else {
// pre-check whether wiggling chosen servers still satisfy replica requirement
excludedServerIds.clear();
for (const auto& info : self->pid2server_info[self->wigglingPid.get()]) {
excludedServerIds.push_back(info->id);
}
if (_exclusionSafetyCheck(excludedServerIds, self)) {
TEST(true); // start wiggling
auto fv = self->excludeStorageServersForWiggle(pid);
movingCount = fv.size();
moveFinishFuture = waitForAll(fv);
TraceEvent("PerpetualStorageWiggleStart", self->distributorId)
.detail("ProcessId", pid)
.detail("StorageCount", movingCount);
} else {
TEST(true); // skip wiggling current process
TraceEvent("PerpetualStorageWiggleSkip", self->distributorId).detail("ProcessId", pid.toString());
moveFinishFuture = Void();
}
}
when(wait(restart.onTrigger())) {
if (self->wigglingPid.present()) {
TEST(true); // restart paused wiggling
StringRef pid = self->wigglingPid.get();
auto fv = self->excludeStorageServersForWiggle(pid);
moveFinishFuture = waitForAll(fv);
TraceEvent("PerpetualStorageWiggleRestart", self->distributorId)
.detail("ProcessId", pid)
.detail("StorageCount", fv.size());
isPaused = false;
}
}
choose {
when(wait(watchFuture)) {
ASSERT(!self->wigglingPid.present()); // the previous wiggle must be finished
watchFuture = Never();
// read new pid and set the next watch Future
wait(store(res, watchPerpetualStoragePIDChange(self)));
self->wigglingPid = Optional<Key>(res.second);
// random delay
wait(delayJittered(5.0, TaskPriority::DataDistributionLow));
}
when(wait(moveFinishFuture)) {
TEST(true); // finish wiggling this process
@ -4030,49 +4060,27 @@ ACTOR Future<Void> perpetualStorageWiggler(AsyncTrigger* stopSignal,
StringRef pid = self->wigglingPid.get();
moveFinishFuture = Never();
self->includeStorageServersForWiggle(pid);
self->includeStorageServersForWiggle();
TraceEvent("PerpetualStorageWiggleFinish", self->distributorId)
.detail("ProcessId", pid.toString())
.detail("StorageCount", movingCount);
self->wigglingPid.reset();
watchFuture = res.first;
finishStorageWiggleSignal.send(Void());
}
when(wait(self->zeroHealthyTeams->onChange())) {
if (self->zeroHealthyTeams->get() && !isPaused) {
pauseWiggle.trigger();
}
}
when(wait(ddQueueCheck)) { // check health status periodically
Promise<int> countp;
self->getUnhealthyRelocationCount.send(countp);
int count = wait(countp.getFuture());
when(wait(ddQueueCheck || self->pauseWiggle->onChange() || stopSignal->onChange())) {}
}
if (count >= SERVER_KNOBS->DD_STORAGE_WIGGLE_PAUSE_THRESHOLD && !isPaused) {
pauseWiggle.trigger();
} else if (count < SERVER_KNOBS->DD_STORAGE_WIGGLE_PAUSE_THRESHOLD && self->healthyTeamCount > 1 &&
isPaused) {
restart.trigger();
}
ddQueueCheck = delay(SERVER_KNOBS->DD_ZERO_HEALTHY_TEAM_DELAY, TaskPriority::DataDistributionLow);
}
when(wait(pauseWiggle.onTrigger())) {
if (self->wigglingPid.present()) {
TEST(true); // paused because cluster is unhealthy
StringRef pid = self->wigglingPid.get();
isPaused = true;
moveFinishFuture = Never();
self->includeStorageServersForWiggle(pid);
TraceEvent("PerpetualStorageWigglePause", self->distributorId)
.detail("ProcessId", pid)
.detail("StorageCount", movingCount);
}
}
if (stopSignal->get()) {
break;
}
}
if (self->wigglingPid.present()) {
self->includeStorageServersForWiggle(self->wigglingPid.get());
self->includeStorageServersForWiggle();
TraceEvent("PerpetualStorageWiggleExitingPause", self->distributorId)
.detail("ProcessId", self->wigglingPid.get());
self->wigglingPid.reset();
}
@ -4085,9 +4093,10 @@ ACTOR Future<Void> perpetualStorageWiggler(AsyncTrigger* stopSignal,
ACTOR Future<Void> monitorPerpetualStorageWiggle(DDTeamCollection* teamCollection,
const DDEnabledState* ddEnabledState) {
state int speed = 0;
state AsyncTrigger stopWiggleSignal;
state AsyncVar<bool> stopWiggleSignal(false);
state PromiseStream<Void> finishStorageWiggleSignal;
state SignalableActorCollection collection;
teamCollection->pauseWiggle = makeReference<AsyncVar<bool>>(true);
loop {
state ReadYourWritesTransaction tr(teamCollection->cx);
@ -4103,16 +4112,18 @@ ACTOR Future<Void> monitorPerpetualStorageWiggle(DDTeamCollection* teamCollectio
wait(tr.commit());
ASSERT(speed == 1 || speed == 0);
if (speed == 1) {
if (speed == 1 && stopWiggleSignal.get()) { // avoid duplicated start
stopWiggleSignal.set(false);
collection.add(perpetualStorageWiggleIterator(
&stopWiggleSignal, finishStorageWiggleSignal.getFuture(), teamCollection));
collection.add(perpetualStorageWiggler(
&stopWiggleSignal, finishStorageWiggleSignal, teamCollection, ddEnabledState));
TraceEvent("PerpetualStorageWiggleOpen", teamCollection->distributorId);
} else {
stopWiggleSignal.trigger();
} else if (speed == 0 && !stopWiggleSignal.get()) {
stopWiggleSignal.set(true);
wait(collection.signalAndReset());
TraceEvent("PerpetualStorageWiggleClose", teamCollection->distributorId);
teamCollection->pauseWiggle->set(true);
}
wait(watchFuture);
break;
@ -4410,7 +4421,7 @@ ACTOR Future<Void> storageServerTracker(
bool isTss) {
state Future<Void> failureTracker;
state ServerStatus status(false, false, server->lastKnownInterface.locality);
state ServerStatus status(false, false, false, server->lastKnownInterface.locality);
state bool lastIsUnhealthy = false;
state Future<Void> metricsTracker = serverMetricsPolling(server);
@ -4427,6 +4438,7 @@ ACTOR Future<Void> storageServerTracker(
loop {
status.isUndesired = !self->disableFailingLaggingServers.get() && server->ssVersionTooFarBehind.get();
status.isWrongConfiguration = false;
status.isWiggling = false;
hasWrongDC = !isCorrectDC(self, server);
hasInvalidLocality =
!self->isValidLocality(self->configuration.storagePolicy, server->lastKnownInterface.locality);
@ -4506,10 +4518,21 @@ ACTOR Future<Void> storageServerTracker(
status.isWrongConfiguration = true;
}
// An invalid wiggle server should set itself the right status. Otherwise, it cannot be re-included by
// wiggler.
auto invalidWiggleServer =
[](const AddressExclusion& addr, const DDTeamCollection* tc, const TCServerInfo* server) {
return server->lastKnownInterface.locality.processId() != tc->wigglingPid;
};
// If the storage server is in the excluded servers list, it is undesired
NetworkAddress a = server->lastKnownInterface.address();
AddressExclusion worstAddr(a.ip, a.port);
DDTeamCollection::Status worstStatus = self->excludedServers.get(worstAddr);
if (worstStatus == DDTeamCollection::Status::WIGGLING && invalidWiggleServer(worstAddr, self, server)) {
self->excludedServers.set(worstAddr, DDTeamCollection::Status::NONE);
worstStatus = DDTeamCollection::Status::NONE;
}
otherChanges.push_back(self->excludedServers.onChange(worstAddr));
for (int i = 0; i < 3; i++) {
@ -4525,6 +4548,12 @@ ACTOR Future<Void> storageServerTracker(
else if (i == 2)
testAddr = AddressExclusion(server->lastKnownInterface.secondaryAddress().get().ip);
DDTeamCollection::Status testStatus = self->excludedServers.get(testAddr);
if (testStatus == DDTeamCollection::Status::WIGGLING && invalidWiggleServer(testAddr, self, server)) {
self->excludedServers.set(testAddr, DDTeamCollection::Status::NONE);
testStatus = DDTeamCollection::Status::NONE;
}
if (testStatus > worstStatus) {
worstStatus = testStatus;
worstAddr = testAddr;
@ -4543,6 +4572,7 @@ ACTOR Future<Void> storageServerTracker(
status.isWiggling = true;
TraceEvent("PerpetualWigglingStorageServer", self->distributorId)
.detail("Server", server->id)
.detail("ProcessId", server->lastKnownInterface.locality.processId())
.detail("Address", worstAddr.toString());
} else if (worstStatus == DDTeamCollection::Status::FAILED && !isTss) {
TraceEvent(SevWarn, "FailedServerRemoveKeys", self->distributorId)
@ -4607,11 +4637,14 @@ ACTOR Future<Void> storageServerTracker(
bool localityChanged = server->lastKnownInterface.locality != newInterface.first.locality;
bool machineLocalityChanged = server->lastKnownInterface.locality.zoneId().get() !=
newInterface.first.locality.zoneId().get();
bool processIdChanged = server->lastKnownInterface.locality.processId().get() !=
newInterface.first.locality.processId().get();
TraceEvent("StorageServerInterfaceChanged", self->distributorId)
.detail("ServerID", server->id)
.detail("NewWaitFailureToken", newInterface.first.waitFailure.getEndpoint().token)
.detail("OldWaitFailureToken", server->lastKnownInterface.waitFailure.getEndpoint().token)
.detail("LocalityChanged", localityChanged)
.detail("ProcessIdChanged", processIdChanged)
.detail("MachineLocalityChanged", machineLocalityChanged);
server->lastKnownInterface = newInterface.first;
@ -4656,6 +4689,20 @@ ACTOR Future<Void> storageServerTracker(
ASSERT(destMachine.isValid());
}
// update pid2server_info if the process id has changed
if (processIdChanged) {
self->pid2server_info[newInterface.first.locality.processId().get()].push_back(
self->server_info[server->id]);
// delete the old one
auto& old_infos =
self->pid2server_info[server->lastKnownInterface.locality.processId().get()];
for (int i = 0; i < old_infos.size(); ++i) {
if (old_infos[i].getPtr() == server) {
std::swap(old_infos[i--], old_infos.back());
old_infos.pop_back();
}
}
}
// Ensure the server's server team belong to a machine team, and
// Get the newBadTeams due to the locality change
vector<Reference<TCTeamInfo>> newBadTeams;
@ -4702,7 +4749,8 @@ ACTOR Future<Void> storageServerTracker(
interfaceChanged = server->onInterfaceChanged;
// Old failureTracker for the old interface will be actorCancelled since the handler of the old
// actor now points to the new failure monitor actor.
status = ServerStatus(status.isFailed, status.isUndesired, server->lastKnownInterface.locality);
status = ServerStatus(
status.isFailed, status.isUndesired, status.isWiggling, server->lastKnownInterface.locality);
// self->traceTeamCollectionInfo();
recordTeamCollectionInfo = true;
@ -5462,8 +5510,10 @@ ACTOR Future<Void> dataDistributionTeamCollection(Reference<DDTeamCollection> te
self->addActor.send(trackExcludedServers(self));
self->addActor.send(monitorHealthyTeams(self));
self->addActor.send(waitHealthyZoneChange(self));
self->addActor.send(monitorPerpetualStorageWiggle(self, ddEnabledState));
if (self->primary) { // the primary dc also handle the satellite dc's perpetual wiggling
self->addActor.send(monitorPerpetualStorageWiggle(self, ddEnabledState));
}
// SOMEDAY: Monitor FF/serverList for (new) servers that aren't in allServers and add or remove them
loop choose {
@ -6215,6 +6265,30 @@ ACTOR Future<Void> ddSnapCreate(DistributorSnapRequest snapReq,
return Void();
}
// Returns true iff, after excluding every server in excludeServerIDs, each storage team
// still keeps at least DD_EXCLUDE_MIN_REPLICAS members. Note: sorts excludeServerIDs in
// place, which is visible to the caller.
bool _exclusionSafetyCheck(vector<UID>& excludeServerIDs, DDTeamCollection* teamCollection) {
	std::sort(excludeServerIDs.begin(), excludeServerIDs.end());
	for (const auto& team : teamCollection->teams) {
		vector<UID> memberIDs = team->getServerIDs();
		std::sort(memberIDs.begin(), memberIDs.end());
		TraceEvent(SevDebug, "DDExclusionSafetyCheck", teamCollection->distributorId)
		    .detail("Excluding", describe(excludeServerIDs))
		    .detail("Existing", team->getDesc());
		// How many of this team's members are slated for exclusion? Both inputs are sorted,
		// so set_intersection gives exactly the overlapping IDs.
		vector<UID> overlap;
		std::set_intersection(excludeServerIDs.begin(),
		                      excludeServerIDs.end(),
		                      memberIDs.begin(),
		                      memberIDs.end(),
		                      std::back_inserter(overlap));
		// If the survivors of this team fall below the replication floor, the exclusion is unsafe.
		if (memberIDs.size() - overlap.size() < SERVER_KNOBS->DD_EXCLUDE_MIN_REPLICAS) {
			return false;
		}
	}
	return true;
}
ACTOR Future<Void> ddExclusionSafetyCheck(DistributorExclusionSafetyCheckRequest req,
Reference<DataDistributorData> self,
Database cx) {
@ -6244,26 +6318,7 @@ ACTOR Future<Void> ddExclusionSafetyCheck(DistributorExclusionSafetyCheckRequest
}
}
}
std::sort(excludeServerIDs.begin(), excludeServerIDs.end());
for (const auto& team : self->teamCollection->teams) {
vector<UID> teamServerIDs = team->getServerIDs();
std::sort(teamServerIDs.begin(), teamServerIDs.end());
TraceEvent(SevDebug, "DDExclusionSafetyCheck", self->ddId)
.detail("Excluding", describe(excludeServerIDs))
.detail("Existing", team->getDesc());
// Find size of set intersection of both vectors and see if the leftover team is valid
vector<UID> intersectSet(teamServerIDs.size());
auto it = std::set_intersection(excludeServerIDs.begin(),
excludeServerIDs.end(),
teamServerIDs.begin(),
teamServerIDs.end(),
intersectSet.begin());
intersectSet.resize(it - intersectSet.begin());
if (teamServerIDs.size() - intersectSet.size() < SERVER_KNOBS->DD_EXCLUDE_MIN_REPLICAS) {
reply.safe = false;
break;
}
}
reply.safe = _exclusionSafetyCheck(excludeServerIDs, self->teamCollection);
TraceEvent("DDExclusionSafetyCheckFinish", self->ddId);
req.reply.send(reply);
return Void();
@ -6440,7 +6495,7 @@ std::unique_ptr<DDTeamCollection> testTeamCollection(int teamSize,
interface.locality.set(LiteralStringRef("data_hall"), Standalone<StringRef>(std::to_string(id % 3)));
collection->server_info[uid] = makeReference<TCServerInfo>(
interface, collection.get(), ProcessClass(), true, collection->storageServerSet);
collection->server_status.set(uid, ServerStatus(false, false, interface.locality));
collection->server_status.set(uid, ServerStatus(false, false, false, interface.locality));
collection->checkAndCreateMachine(collection->server_info[uid]);
}
@ -6497,7 +6552,7 @@ std::unique_ptr<DDTeamCollection> testMachineTeamCollection(int teamSize,
collection->server_info[uid] = makeReference<TCServerInfo>(
interface, collection.get(), ProcessClass(), true, collection->storageServerSet);
collection->server_status.set(uid, ServerStatus(false, false, interface.locality));
collection->server_status.set(uid, ServerStatus(false, false, false, interface.locality));
}
int totalServerIndex = collection->constructMachinesFromServers();

View File

@ -993,7 +993,7 @@ ACTOR Future<Void> dataDistributionRelocator(DDQueueData* self, RelocateData rd,
allHealthy = true;
anyWithSource = false;
bestTeams.clear();
// Get team from teamCollections in diffrent DCs and find the best one
// Get team from teamCollections in different DCs and find the best one
while (tciIndex < self->teamCollections.size()) {
double inflightPenalty = SERVER_KNOBS->INFLIGHT_PENALTY_HEALTHY;
if (rd.healthPriority == SERVER_KNOBS->PRIORITY_TEAM_UNHEALTHY ||

View File

@ -107,7 +107,7 @@ struct GrvProxyStats {
id,
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
grvLatencyBands("GRVLatencyMetrics", id, SERVER_KNOBS->STORAGE_LOGGING_DELAY) {
grvLatencyBands("GRVLatencyBands", id, SERVER_KNOBS->STORAGE_LOGGING_DELAY) {
// The rate at which the limit(budget) is allowed to grow.
specialCounter(
cc, "SystemAndDefaultTxnRateAllowed", [this]() { return int64_t(this->transactionRateAllowed); });
@ -346,6 +346,26 @@ ACTOR Future<Void> getRate(UID myID,
}
}
// Respond with an error to the GetReadVersion request when the GRV limit is hit.
void proxyGRVThresholdExceeded(const GetReadVersionRequest* req, GrvProxyStats* stats) {
	// Count the rejection, then tell the client the proxy is over its GRV budget.
	++stats->txnRequestErrors;
	req->reply.sendError(proxy_memory_limit_exceeded());
	// Emit a priority-specific warning, rate-limited (suppressFor) to once per 60s per priority.
	if (req->priority == TransactionPriority::IMMEDIATE) {
		TraceEvent(SevWarnAlways, "ProxyGRVThresholdExceededSystem").suppressFor(60);
	} else if (req->priority == TransactionPriority::DEFAULT) {
		TraceEvent(SevWarnAlways, "ProxyGRVThresholdExceededDefault").suppressFor(60);
	} else {
		// Anything else is treated as batch priority.
		TraceEvent(SevWarnAlways, "ProxyGRVThresholdExceededBatch").suppressFor(60);
	}
}
// Drop a GetReadVersion request from a queue, by responding an error to the request.
// The oldest request (queue front) is the one rejected; the queue must be non-empty.
void dropRequestFromQueue(Deque<GetReadVersionRequest>* queue, GrvProxyStats* stats) {
	proxyGRVThresholdExceeded(&queue->front(), stats);
	queue->pop_front();
}
// Put a GetReadVersion request into the queue corresponding to its priority.
ACTOR Future<Void> queueGetReadVersionRequests(Reference<AsyncVar<ServerDBInfo>> db,
SpannedDeque<GetReadVersionRequest>* systemQueue,
SpannedDeque<GetReadVersionRequest>* defaultQueue,
@ -361,16 +381,30 @@ ACTOR Future<Void> queueGetReadVersionRequests(Reference<AsyncVar<ServerDBInfo>>
loop choose {
when(GetReadVersionRequest req = waitNext(readVersionRequests)) {
// WARNING: this code is run at a high priority, so it needs to do as little work as possible
bool canBeQueued = true;
if (stats->txnRequestIn.getValue() - stats->txnRequestOut.getValue() >
SERVER_KNOBS->START_TRANSACTION_MAX_QUEUE_SIZE) {
++stats->txnRequestErrors;
// FIXME: send an error instead of giving an unreadable version when the client can support the error:
// req.reply.sendError(proxy_memory_limit_exceeded());
GetReadVersionReply rep;
rep.version = 1;
rep.locked = true;
req.reply.send(rep);
TraceEvent(SevWarnAlways, "ProxyGRVThresholdExceeded").suppressFor(60);
// When the limit is hit, try to drop requests from the lower priority queues.
if (req.priority == TransactionPriority::BATCH) {
canBeQueued = false;
} else if (req.priority == TransactionPriority::DEFAULT) {
if (!batchQueue->empty()) {
dropRequestFromQueue(batchQueue, stats);
} else {
canBeQueued = false;
}
} else {
if (!batchQueue->empty()) {
dropRequestFromQueue(batchQueue, stats);
} else if (!defaultQueue->empty()) {
dropRequestFromQueue(defaultQueue, stats);
} else {
canBeQueued = false;
}
}
}
if (!canBeQueued) {
proxyGRVThresholdExceeded(&req, stats);
} else {
stats->addRequest(req.transactionCount);
// TODO: check whether this is reasonable to do in the fast path

View File

@ -488,7 +488,7 @@ IKeyValueStore* keyValueStoreRocksDB(std::string const& path,
namespace {
TEST_CASE("fdbserver/KeyValueStoreRocksDB/Reopen") {
TEST_CASE("noSim/fdbserver/KeyValueStoreRocksDB/Reopen") {
state const std::string rocksDBTestDir = "rocksdb-kvstore-reopen-test-db";
platform::eraseDirectoryRecursive(rocksDBTestDir);

View File

@ -133,7 +133,7 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
init( PRIORITY_RECOVER_MOVE, 110 );
init( PRIORITY_REBALANCE_UNDERUTILIZED_TEAM, 120 );
init( PRIORITY_REBALANCE_OVERUTILIZED_TEAM, 121 );
init( PRIORITY_PERPETUAL_STORAGE_WIGGLE, 140 );
init( PRIORITY_PERPETUAL_STORAGE_WIGGLE, 139 );
init( PRIORITY_TEAM_HEALTHY, 140 );
init( PRIORITY_TEAM_CONTAINS_UNDESIRED_SERVER, 150 );
init( PRIORITY_TEAM_REDUNDANT, 200 );

View File

@ -285,6 +285,55 @@ ACTOR Future<vector<WorkerInterface>> getStorageWorkers(Database cx,
return result;
}
// Helper function to extract the maximum SQ size of all provided messages. All futures in the
// messages vector have to be ready.
// Returns the largest queue size found (0 if messages is empty); also traces which server
// reported it. Rethrows any error encountered while parsing a message.
// NOTE(review): assumes messages[i] corresponds to servers[i] — confirm at call sites.
int64_t extractMaxQueueSize(const std::vector<Future<TraceEventFields>>& messages,
const std::vector<StorageServerInterface>& servers) {
	int64_t maxQueueSize = 0;
	UID maxQueueServer;
	for (int i = 0; i < messages.size(); i++) {
		try {
			auto queueSize = getQueueSize(messages[i].get());
			if (queueSize > maxQueueSize) {
				maxQueueSize = queueSize;
				maxQueueServer = servers[i].id();
			}
		} catch (Error& e) {
			// If a message cannot be parsed, log which server it came from plus the raw
			// contents of every message to aid debugging, then surface the failure.
			TraceEvent("QuietDatabaseFailure")
			.detail("Reason", "Failed to extract MaxStorageServerQueue")
			.detail("SS", servers[i].id());
			for (auto& m : messages) {
				TraceEvent("Messages").detail("Info", m.get().toString());
			}
			throw;
		}
	}
	TraceEvent("QuietDatabaseGotMaxStorageServerQueueSize")
	.detail("Stage", "MaxComputed")
	.detail("Max", maxQueueSize)
	.detail("MaxQueueServer", format("%016" PRIx64, maxQueueServer.first()));
	return maxQueueSize;
}
// Timeout wrapper when getting the storage metrics. This will do some additional tracing.
//
// Requests the "<storageID>/StorageMetrics" trace event fields from the given worker, but
// gives up after one second so a wedged storage server cannot stall the caller indefinitely.
// Throws timed_out() (after tracing which server failed) when the deadline is exceeded.
ACTOR Future<TraceEventFields> getStorageMetricsTimeout(UID storage, WorkerInterface wi) {
	state Future<TraceEventFields> result =
	    wi.eventLogRequest.getReply(EventLogRequest(StringRef(storage.toString() + "/StorageMetrics")));
	state Future<Void> timeout = delay(1.0);
	choose {
		when(TraceEventFields res = wait(result)) { return res; }
		when(wait(timeout)) {
			// Record which storage server timed out before surfacing the failure.
			TraceEvent("QuietDatabaseFailure")
			    .detail("Reason", "Could not fetch StorageMetrics")
			    .detail("Storage", format("%016" PRIx64, storage.first()));
			throw timed_out();
		}
	}
}
// Gets the maximum size of all the storage server queues
ACTOR Future<int64_t> getMaxStorageServerQueueSize(Database cx, Reference<AsyncVar<ServerDBInfo>> dbInfo) {
TraceEvent("MaxStorageServerQueueSize").detail("Stage", "ContactingStorageServers");
@ -312,9 +361,7 @@ ACTOR Future<int64_t> getMaxStorageServerQueueSize(Database cx, Reference<AsyncV
// Ignore TSS in add delay mode since it can purposefully freeze forever
if (!servers[i].isTss() || !g_network->isSimulated() ||
g_simulator.tssMode != ISimulator::TSSMode::EnabledAddDelay) {
messages.push_back(timeoutError(itr->second.eventLogRequest.getReply(EventLogRequest(
StringRef(servers[i].id().toString() + "/StorageMetrics"))),
1.0));
messages.push_back(getStorageMetricsTimeout(servers[i].id(), itr->second));
}
}
@ -322,23 +369,7 @@ ACTOR Future<int64_t> getMaxStorageServerQueueSize(Database cx, Reference<AsyncV
TraceEvent("MaxStorageServerQueueSize").detail("Stage", "ComputingMax").detail("MessageCount", messages.size());
state int64_t maxQueueSize = 0;
state int i = 0;
for (; i < messages.size(); i++) {
try {
maxQueueSize = std::max(maxQueueSize, getQueueSize(messages[i].get()));
} catch (Error& e) {
TraceEvent("QuietDatabaseFailure")
.detail("Reason", "Failed to extract MaxStorageServerQueue")
.detail("SS", servers[i].id());
for (auto& m : messages) {
TraceEvent("Messages").detail("Info", m.get().toString());
}
throw;
}
}
return maxQueueSize;
return extractMaxQueueSize(messages, servers);
}
// Gets the size of the data distribution queue. If reportInFlight is true, then data in flight is considered part of
@ -581,6 +612,8 @@ ACTOR Future<Void> reconfigureAfter(Database cx,
return Void();
}
// Waits until a database quiets down (no data in flight, small tlog queue, low SQ, no active data distribution). This
// requires the database to be available and healthy in order to succeed.
ACTOR Future<Void> waitForQuietDatabase(Database cx,
Reference<AsyncVar<ServerDBInfo>> dbInfo,
std::string phase,
@ -591,6 +624,13 @@ ACTOR Future<Void> waitForQuietDatabase(Database cx,
int64_t maxPoppedVersionLag = 30e6) {
state Future<Void> reconfig =
reconfigureAfter(cx, 100 + (deterministicRandom()->random01() * 100), dbInfo, "QuietDatabase");
state Future<int64_t> dataInFlight;
state Future<std::pair<int64_t, int64_t>> tLogQueueInfo;
state Future<int64_t> dataDistributionQueueSize;
state Future<bool> teamCollectionValid;
state Future<int64_t> storageQueueSize;
state Future<bool> dataDistributionActive;
state Future<bool> storageServersRecruiting;
auto traceMessage = "QuietDatabase" + phase + "Begin";
TraceEvent(traceMessage.c_str());
@ -614,15 +654,13 @@ ACTOR Future<Void> waitForQuietDatabase(Database cx,
TraceEvent("QuietDatabaseGotDataDistributor", distributorUID)
.detail("Locality", distributorWorker.locality.toString());
state Future<int64_t> dataInFlight = getDataInFlight(cx, distributorWorker);
state Future<std::pair<int64_t, int64_t>> tLogQueueInfo = getTLogQueueInfo(cx, dbInfo);
state Future<int64_t> dataDistributionQueueSize =
getDataDistributionQueueSize(cx, distributorWorker, dataInFlightGate == 0);
state Future<bool> teamCollectionValid = getTeamCollectionValid(cx, distributorWorker);
state Future<int64_t> storageQueueSize = getMaxStorageServerQueueSize(cx, dbInfo);
state Future<bool> dataDistributionActive = getDataDistributionActive(cx, distributorWorker);
state Future<bool> storageServersRecruiting =
getStorageServersRecruiting(cx, distributorWorker, distributorUID);
dataInFlight = getDataInFlight(cx, distributorWorker);
tLogQueueInfo = getTLogQueueInfo(cx, dbInfo);
dataDistributionQueueSize = getDataDistributionQueueSize(cx, distributorWorker, dataInFlightGate == 0);
teamCollectionValid = getTeamCollectionValid(cx, distributorWorker);
storageQueueSize = getMaxStorageServerQueueSize(cx, dbInfo);
dataDistributionActive = getDataDistributionActive(cx, distributorWorker);
storageServersRecruiting = getStorageServersRecruiting(cx, distributorWorker, distributorUID);
wait(success(dataInFlight) && success(tLogQueueInfo) && success(dataDistributionQueueSize) &&
success(teamCollectionValid) && success(storageQueueSize) && success(dataDistributionActive) &&
@ -662,6 +700,7 @@ ACTOR Future<Void> waitForQuietDatabase(Database cx,
}
}
} catch (Error& e) {
TraceEvent(("QuietDatabase" + phase + "Error").c_str()).error(e, true);
if (e.code() != error_code_actor_cancelled && e.code() != error_code_attribute_not_found &&
e.code() != error_code_timed_out)
TraceEvent(("QuietDatabase" + phase + "Error").c_str()).error(e);
@ -671,7 +710,38 @@ ACTOR Future<Void> waitForQuietDatabase(Database cx,
if (e.code() != error_code_attribute_not_found && e.code() != error_code_timed_out)
throw;
TraceEvent(("QuietDatabase" + phase + "Retry").c_str()).error(e);
auto evtType = "QuietDatabase" + phase + "Retry";
TraceEvent evt(evtType.c_str());
evt.error(e);
int notReadyCount = 0;
if (dataInFlight.isReady() && dataInFlight.isError()) {
auto key = "NotReady" + std::to_string(notReadyCount++);
evt.detail(key.c_str(), "dataInFlight");
}
if (tLogQueueInfo.isReady() && tLogQueueInfo.isError()) {
auto key = "NotReady" + std::to_string(notReadyCount++);
evt.detail(key.c_str(), "tLogQueueInfo");
}
if (dataDistributionQueueSize.isReady() && dataDistributionQueueSize.isError()) {
auto key = "NotReady" + std::to_string(notReadyCount++);
evt.detail(key.c_str(), "dataDistributionQueueSize");
}
if (teamCollectionValid.isReady() && teamCollectionValid.isError()) {
auto key = "NotReady" + std::to_string(notReadyCount++);
evt.detail(key.c_str(), "teamCollectionValid");
}
if (storageQueueSize.isReady() && storageQueueSize.isError()) {
auto key = "NotReady" + std::to_string(notReadyCount++);
evt.detail(key.c_str(), "storageQueueSize");
}
if (dataDistributionActive.isReady() && dataDistributionActive.isError()) {
auto key = "NotReady" + std::to_string(notReadyCount++);
evt.detail(key.c_str(), "dataDistributionActive");
}
if (storageServersRecruiting.isReady() && storageServersRecruiting.isError()) {
auto key = "NotReady" + std::to_string(notReadyCount++);
evt.detail(key.c_str(), "storageServersRecruiting");
}
wait(delay(1.0));
numSuccesses = 0;
}

View File

@ -39,6 +39,7 @@
#include "fdbclient/versions.h"
#include "flow/ProtocolVersion.h"
#include "flow/network.h"
#include "flow/TypeTraits.h"
#include "flow/actorcompiler.h" // This must be the last #include.
#undef max
@ -58,50 +59,85 @@ bool destructed = false;
class TestConfig {
class ConfigBuilder {
using value_type = toml::basic_value<toml::discard_comments>;
std::unordered_map<std::string_view, std::function<void(value_type const&)>> confMap;
using base_variant = std::variant<int, bool, std::string, std::vector<int>>;
using types =
variant_map<variant_concat<base_variant, variant_map<base_variant, Optional>>, std::add_pointer_t>;
std::unordered_map<std::string_view, types> confMap;
struct visitor {
const value_type& value;
visitor(const value_type& v) : value(v) {}
void operator()(int* val) const { *val = value.as_integer(); }
void operator()(Optional<int>* val) const { *val = value.as_integer(); }
void operator()(bool* val) const { *val = value.as_boolean(); }
void operator()(Optional<bool>* val) const { *val = value.as_boolean(); }
void operator()(std::string* val) const { *val = value.as_string(); }
void operator()(Optional<std::string>* val) const { *val = value.as_string(); }
void operator()(std::vector<int>* val) const {
auto arr = value.as_array();
for (const auto& i : arr) {
val->emplace_back(i.as_integer());
}
}
void operator()(Optional<std::vector<int>>* val) const {
std::vector<int> res;
(*this)(&res);
*val = std::move(res);
}
};
struct trace_visitor {
std::string key;
TraceEvent& evt;
trace_visitor(std::string const& key, TraceEvent& e) : key("Key" + key), evt(e) {}
void operator()(int* val) const { evt.detail(key.c_str(), *val); }
void operator()(Optional<int>* val) const { evt.detail(key.c_str(), *val); }
void operator()(bool* val) const { evt.detail(key.c_str(), *val); }
void operator()(Optional<bool>* val) const { evt.detail(key.c_str(), *val); }
void operator()(std::string* val) const { evt.detail(key.c_str(), *val); }
void operator()(Optional<std::string>* val) const { evt.detail(key.c_str(), *val); }
void operator()(std::vector<int>* val) const {
if (val->empty()) {
evt.detail(key.c_str(), "[]");
return;
}
std::stringstream value;
value << "[" << val->at(0);
for (int i = 1; i < val->size(); ++i) {
value << "," << val->at(i);
}
value << "]";
evt.detail(key.c_str(), value.str());
}
void operator()(Optional<std::vector<int>>* val) const {
std::vector<int> res;
(*this)(&res);
*val = std::move(res);
}
};
public:
ConfigBuilder& add(std::string_view key, int* value) {
confMap.emplace(key, [value](value_type const& v) { *value = v.as_integer(); });
~ConfigBuilder() {
TraceEvent evt("SimulatorConfigFromToml");
for (const auto& p : confMap) {
std::visit(trace_visitor(std::string(p.first), evt), p.second);
}
}
template <class V>
ConfigBuilder& add(std::string_view key, V value) {
confMap.emplace(key, value);
return *this;
}
ConfigBuilder& add(std::string_view key, Optional<int>* value) {
confMap.emplace(key, [value](value_type const& v) { *value = v.as_integer(); });
return *this;
}
ConfigBuilder& add(std::string_view key, bool* value) {
confMap.emplace(key, [value](value_type const& v) { *value = v.as_boolean(); });
return *this;
}
ConfigBuilder& add(std::string_view key, Optional<bool>* value) {
confMap.emplace(key, [value](value_type const& v) { *value = v.as_boolean(); });
return *this;
}
ConfigBuilder& add(std::string_view key, std::string* value) {
confMap.emplace(key, [value](value_type const& v) { *value = v.as_string(); });
return *this;
}
ConfigBuilder& add(std::string_view key, Optional<std::string>* value) {
confMap.emplace(key, [value](value_type const& v) { *value = v.as_string(); });
return *this;
}
ConfigBuilder& add(std::string_view key, std::vector<int>* value) {
confMap.emplace(key, [value](value_type const& v) {
auto arr = v.as_array();
for (const auto& i : arr) {
value->push_back(i.as_integer());
}
});
return *this;
}
void set(std::string const& key, value_type const& val) {
void set(std::string_view key, const value_type& value) {
auto iter = confMap.find(key);
if (iter == confMap.end()) {
std::cerr << "Unknown configuration attribute " << key << std::endl;
TraceEvent("UnknownConfigurationAttribute").detail("Name", key);
TraceEvent("UnknownConfigurationAttribute").detail("Name", std::string(key));
throw unknown_error();
}
iter->second(val);
std::visit(visitor(value), iter->second);
}
};
@ -280,6 +316,13 @@ public:
TraceEvent("TOMLParseError").detail("Error", printable(e.what()));
throw unknown_error();
}
// Verify that we can use the passed config
if (simpleConfig) {
if (minimumRegions > 1) {
TraceEvent("ElapsedTime").detail("SimTime", now()).detail("RealTime", 0).detail("RandomUnseed", 0);
flushAndExit(0);
}
}
}
};
@ -1744,6 +1787,7 @@ void setupSimulatedSystem(vector<Future<Void>>* systemActors,
}
}
ASSERT(coordinatorAddresses.size() > 0);
deterministicRandom()->randomShuffle(coordinatorAddresses);
for (int i = 0; i < (coordinatorAddresses.size() / 2) + 1; i++) {
TraceEvent("ProtectCoordinator")

View File

@ -809,6 +809,8 @@ public:
if (offset == p->endOffset) {
debug_printf("FIFOQueue::Cursor(%s) Page exhausted\n", toString().c_str());
LogicalPageID oldPageID = pageID;
LogicalPageID extentCurPageID = p->extentCurPageID;
LogicalPageID extentEndPageID = p->extentEndPageID;
pageID = p->nextPageID;
offset = p->nextOffset;
@ -820,19 +822,21 @@ public:
if (mode == POP) {
--queue->numPages;
}
page.clear();
debug_printf("FIFOQueue::Cursor(%s) readNext page exhausted, moved to new page\n", toString().c_str());
if (mode == POP && !queue->usesExtents) {
// Freeing the old page must happen after advancing the cursor and clearing the page reference
// because freePage() could cause a push onto a queue that causes a newPageID() call which could
// pop() from this very same queue. Queue pages are freed at version 0 because they can be reused
// after the next commit.
queue->pager->freePage(oldPageID, 0);
} else if (queue->usesExtents && (p->extentCurPageID == p->extentEndPageID)) {
// Figure out the beginning of the extent
int pagesPerExtent = queue->pagesPerExtent;
queue->pager->freeExtent(oldPageID - pagesPerExtent + 1);
if (mode == POP) {
if(!queue->usesExtents) {
// Freeing the old page must happen after advancing the cursor and clearing the page reference
// because freePage() could cause a push onto a queue that causes a newPageID() call which could
// pop() from this very same queue. Queue pages are freed at version 0 because they can be reused
// after the next commit.
queue->pager->freePage(oldPageID, 0);
} else if (extentCurPageID == extentEndPageID) {
// Figure out the beginning of the extent
int pagesPerExtent = queue->pagesPerExtent;
queue->pager->freeExtent(oldPageID - pagesPerExtent + 1);
}
}
}

View File

@ -890,6 +890,7 @@ ACTOR Future<Void> checkConsistency(Database cx,
StringRef performTSSCheck = LiteralStringRef("false");
if (doQuiescentCheck) {
performQuiescent = LiteralStringRef("true");
spec.restorePerpetualWiggleSetting = false;
}
if (doCacheCheck) {
performCacheCheck = LiteralStringRef("true");
@ -1385,6 +1386,8 @@ ACTOR Future<Void> runTests(Reference<AsyncVar<Optional<struct ClusterController
state bool useDB = false;
state bool waitForQuiescenceBegin = false;
state bool waitForQuiescenceEnd = false;
state bool restorePerpetualWiggleSetting = false;
state bool perpetualWiggleEnabled = false;
state double startDelay = 0.0;
state double databasePingDelay = 1e9;
state ISimulator::BackupAgentType simBackupAgents = ISimulator::BackupAgentType::NoBackupAgents;
@ -1399,6 +1402,8 @@ ACTOR Future<Void> runTests(Reference<AsyncVar<Optional<struct ClusterController
waitForQuiescenceBegin = true;
if (iter->waitForQuiescenceEnd)
waitForQuiescenceEnd = true;
if (iter->restorePerpetualWiggleSetting)
restorePerpetualWiggleSetting = true;
startDelay = std::max(startDelay, iter->startDelay);
databasePingDelay = std::min(databasePingDelay, iter->databasePingDelay);
if (iter->simBackupAgents != ISimulator::BackupAgentType::NoBackupAgents)
@ -1437,6 +1442,15 @@ ACTOR Future<Void> runTests(Reference<AsyncVar<Optional<struct ClusterController
} catch (Error& e) {
TraceEvent(SevError, "TestFailure").error(e).detail("Reason", "Unable to set starting configuration");
}
if (restorePerpetualWiggleSetting) {
std::string_view confView(reinterpret_cast<const char*>(startingConfiguration.begin()),
startingConfiguration.size());
const std::string setting = "perpetual_storage_wiggle:=";
auto pos = confView.find(setting);
if (pos != confView.npos && confView.at(pos + setting.size()) == '1') {
perpetualWiggleEnabled = true;
}
}
}
if (useDB && waitForQuiescenceBegin) {
@ -1452,6 +1466,10 @@ ACTOR Future<Void> runTests(Reference<AsyncVar<Optional<struct ClusterController
TraceEvent("QuietDatabaseStartExternalError").error(e);
throw;
}
if (perpetualWiggleEnabled) { // restore the enabled perpetual storage wiggle setting
wait(setPerpetualStorageWiggle(cx, true, true));
}
}
TraceEvent("TestsExpectedToPass").detail("Count", tests.size());

View File

@ -743,7 +743,21 @@ ACTOR Future<Void> monitorHighMemory(int64_t threshold) {
return Void();
}
ACTOR Future<Void> storageServerRollbackRebooter(Future<Void> prevStorageServer,
// RAII guard that registers a storage server (keyed by id and store type) in the shared
// runningStorages set on construction and unregisters it on destruction. Held as a local
// by the storage rollback/rebooter actor so membership tracks the actor's lifetime.
// The pointed-to set must outlive this guard.
struct TrackRunningStorage {
	UID self;
	KeyValueStoreType storeType;
	std::set<std::pair<UID, KeyValueStoreType>>* runningStorages;
	TrackRunningStorage(UID self,
	                    KeyValueStoreType storeType,
	                    std::set<std::pair<UID, KeyValueStoreType>>* runningStorages)
	  : self(self), storeType(storeType), runningStorages(runningStorages) {
		runningStorages->emplace(self, storeType);
	}
	~TrackRunningStorage() { runningStorages->erase(std::make_pair(self, storeType)); }
};
ACTOR Future<Void> storageServerRollbackRebooter(std::set<std::pair<UID, KeyValueStoreType>>* runningStorages,
Future<Void> prevStorageServer,
KeyValueStoreType storeType,
std::string filename,
UID id,
@ -754,6 +768,7 @@ ACTOR Future<Void> storageServerRollbackRebooter(Future<Void> prevStorageServer,
ActorCollection* filesClosed,
int64_t memoryLimit,
IKeyValueStore* store) {
state TrackRunningStorage _(id, storeType, runningStorages);
loop {
ErrorOr<Void> e = wait(errorOr(prevStorageServer));
if (!e.isError())
@ -1038,6 +1053,7 @@ ACTOR Future<Void> workerServer(Reference<ClusterConnectionFile> connFile,
state std::string coordFolder = abspath(_coordFolder);
state WorkerInterface interf(locality);
state std::set<std::pair<UID, KeyValueStoreType>> runningStorages;
interf.initEndpoints();
state Reference<AsyncVar<std::set<std::string>>> issues(new AsyncVar<std::set<std::string>>());
@ -1150,7 +1166,8 @@ ACTOR Future<Void> workerServer(Reference<ClusterConnectionFile> connFile,
Future<Void> f = storageServer(kv, recruited, dbInfo, folder, recovery, connFile);
recoveries.push_back(recovery.getFuture());
f = handleIOErrors(f, kv, s.storeID, kvClosed);
f = storageServerRollbackRebooter(f,
f = storageServerRollbackRebooter(&runningStorages,
f,
s.storeType,
s.filename,
recruited.id(),
@ -1512,7 +1529,17 @@ ACTOR Future<Void> workerServer(Reference<ClusterConnectionFile> connFile,
activeSharedTLog->set(logData.uid);
}
when(InitializeStorageRequest req = waitNext(interf.storage.getFuture())) {
if (!storageCache.exists(req.reqId)) {
// We want to prevent double recruiting on a worker unless we try to recruit something
// with a different storage engine (otherwise storage migration won't work for certain
// configurations). Additionally we also need to allow double recruitment for seed servers.
// The reason for this is that a storage server will only remove itself after it was able
// to read the system key space. But if recovery fails right after a `configure new ...`
// was run it won't be able to do so.
if (!storageCache.exists(req.reqId) &&
(std::all_of(runningStorages.begin(),
runningStorages.end(),
[&req](const auto& p) { return p.second != req.storeType; }) ||
req.seedTag != invalidTag)) {
bool isTss = req.tssPairIDAndVersion.present();
@ -1561,7 +1588,8 @@ ACTOR Future<Void> workerServer(Reference<ClusterConnectionFile> connFile,
folder);
s = handleIOErrors(s, data, recruited.id(), kvClosed);
s = storageCache.removeOnReady(req.reqId, s);
s = storageServerRollbackRebooter(s,
s = storageServerRollbackRebooter(&runningStorages,
s,
req.storeType,
filename,
recruited.id(),
@ -1573,8 +1601,15 @@ ACTOR Future<Void> workerServer(Reference<ClusterConnectionFile> connFile,
memoryLimit,
data);
errorForwarders.add(forwardError(errors, ssRole, recruited.id(), s));
} else
} else if (storageCache.exists(req.reqId)) {
forwardPromise(req.reply, storageCache.get(req.reqId));
} else {
TraceEvent("AttemptedDoubleRecruitement", interf.id()).detail("ForRole", "StorageServer");
errorForwarders.add(map(delay(0.5), [reply = req.reply](Void) {
reply.sendError(recruitment_failed());
return Void();
}));
}
}
when(InitializeCommitProxyRequest req = waitNext(interf.commitProxy.getFuture())) {
CommitProxyInterface recruited;

View File

@ -1777,6 +1777,7 @@ struct ConsistencyCheckWorkload : TestWorkload {
if (!found) {
TraceEvent("ConsistencyCheck_NoStorage")
.detail("Address", addr)
.detail("ProcessId", workers[i].interf.locality.processId())
.detail("ProcessClassEqualToStorageClass",
(int)(workers[i].processClass == ProcessClass::StorageClass));
missingStorage.push_back(workers[i].interf.locality.dcId());

View File

@ -168,6 +168,7 @@ struct HealthMetricsApiWorkload : TestWorkload {
traceDiskUsage.detail(format("Storage-%s", ss.first.toString().c_str()), storageStats.diskUsage);
}
TraceEvent traceTLogQueue("TLogQueue");
traceTLogQueue.setMaxEventLength(10000);
for (const auto& ss : healthMetrics.tLogQueue) {
self->detailedWorstTLogQueue = std::max(self->detailedWorstTLogQueue, ss.second);
traceTLogQueue.detail(format("TLog-%s", ss.first.toString().c_str()), ss.second);

View File

@ -159,6 +159,7 @@ public:
simConnectionFailuresDisableDuration = 0;
simBackupAgents = ISimulator::BackupAgentType::NoBackupAgents;
simDrAgents = ISimulator::BackupAgentType::NoBackupAgents;
restorePerpetualWiggleSetting = true;
}
TestSpec(StringRef title,
bool dump,
@ -169,8 +170,8 @@ public:
: title(title), dumpAfterTest(dump), clearAfterTest(clear), startDelay(startDelay), useDB(useDB), timeout(600),
databasePingDelay(databasePingDelay), runConsistencyCheck(g_network->isSimulated()),
runConsistencyCheckOnCache(false), runConsistencyCheckOnTSS(false), waitForQuiescenceBegin(true),
waitForQuiescenceEnd(true), simCheckRelocationDuration(false), simConnectionFailuresDisableDuration(0),
simBackupAgents(ISimulator::BackupAgentType::NoBackupAgents),
waitForQuiescenceEnd(true), restorePerpetualWiggleSetting(true), simCheckRelocationDuration(false),
simConnectionFailuresDisableDuration(0), simBackupAgents(ISimulator::BackupAgentType::NoBackupAgents),
simDrAgents(ISimulator::BackupAgentType::NoBackupAgents) {
phases = TestWorkload::SETUP | TestWorkload::EXECUTION | TestWorkload::CHECK | TestWorkload::METRICS;
if (databasePingDelay < 0)
@ -191,6 +192,11 @@ public:
bool runConsistencyCheckOnTSS;
bool waitForQuiescenceBegin;
bool waitForQuiescenceEnd;
bool restorePerpetualWiggleSetting; // whether to restore the perpetual_storage_wiggle setting to its previous
                                    // value after running QuietDatabase. QuietDatabase always disables the
                                    // perpetual storage wiggle on purpose. If waitForQuiescenceBegin == true and
                                    // we want to keep the perpetual storage wiggle setting unchanged during
                                    // testing, this value should be set to true.
bool simCheckRelocationDuration; // If set to true, then long duration relocations generate SevWarnAlways messages.
// Once any workload sets this to true, it will be true for the duration of the

View File

@ -67,6 +67,7 @@ set(FLOW_SRCS
Tracing.h
Tracing.actor.cpp
TreeBenchmark.h
TypeTraits.h
UnitTest.cpp
UnitTest.h
XmlTraceLogFormatter.cpp

View File

@ -49,7 +49,7 @@
struct IssuesListImpl {
IssuesListImpl() {}
void addIssue(std::string issue) {
void addIssue(std::string const& issue) {
MutexHolder h(mutex);
issues.insert(issue);
}
@ -61,7 +61,7 @@ struct IssuesListImpl {
}
}
void resolveIssue(std::string issue) {
void resolveIssue(std::string const& issue) {
MutexHolder h(mutex);
issues.erase(issue);
}
@ -73,23 +73,23 @@ private:
IssuesList::IssuesList() : impl(std::make_unique<IssuesListImpl>()) {}
IssuesList::~IssuesList() = default;
void IssuesList::addIssue(std::string issue) {
void IssuesList::addIssue(std::string const& issue) {
impl->addIssue(issue);
}
void IssuesList::retrieveIssues(std::set<std::string>& out) const {
impl->retrieveIssues(out);
}
void IssuesList::resolveIssue(std::string issue) {
void IssuesList::resolveIssue(std::string const& issue) {
impl->resolveIssue(issue);
}
FileTraceLogWriter::FileTraceLogWriter(std::string directory,
std::string processName,
std::string basename,
std::string extension,
FileTraceLogWriter::FileTraceLogWriter(std::string const& directory,
std::string const& processName,
std::string const& basename,
std::string const& extension,
uint64_t maxLogsSize,
std::function<void()> onError,
Reference<ITraceLogIssuesReporter> issues)
std::function<void()> const& onError,
Reference<ITraceLogIssuesReporter> const& issues)
: directory(directory), processName(processName), basename(basename), extension(extension), maxLogsSize(maxLogsSize),
traceFileFD(-1), index(0), onError(onError), issues(issues) {}

View File

@ -32,11 +32,11 @@ struct IssuesListImpl;
struct IssuesList final : ITraceLogIssuesReporter, ThreadSafeReferenceCounted<IssuesList> {
IssuesList();
~IssuesList() override;
void addIssue(std::string issue) override;
void addIssue(std::string const& issue) override;
void retrieveIssues(std::set<std::string>& out) const override;
void resolveIssue(std::string issue) override;
void resolveIssue(std::string const& issue) override;
void addref() override { ThreadSafeReferenceCounted<IssuesList>::addref(); }
void delref() override { ThreadSafeReferenceCounted<IssuesList>::delref(); }
@ -62,13 +62,13 @@ private:
void write(const char* str, size_t size);
public:
FileTraceLogWriter(std::string directory,
std::string processName,
std::string basename,
std::string extension,
FileTraceLogWriter(std::string const& directory,
std::string const& processName,
std::string const& basename,
std::string const& extension,
uint64_t maxLogsSize,
std::function<void()> onError,
Reference<ITraceLogIssuesReporter> issues);
std::function<void()> const& onError,
Reference<ITraceLogIssuesReporter> const& issues);
void addref() override;
void delref() override;

View File

@ -77,7 +77,7 @@ void HistogramRegistry::unregisterHistogram(Histogram* h) {
ASSERT(count == 1);
}
Histogram* HistogramRegistry::lookupHistogram(std::string name) {
Histogram* HistogramRegistry::lookupHistogram(std::string const& name) {
auto h = histograms.find(name);
if (h == histograms.end()) {
return nullptr;

View File

@ -39,7 +39,7 @@ class HistogramRegistry {
public:
void registerHistogram(Histogram* h);
void unregisterHistogram(Histogram* h);
Histogram* lookupHistogram(std::string name);
Histogram* lookupHistogram(std::string const& name);
void logReport();
private:
@ -63,7 +63,7 @@ public:
private:
static const std::unordered_map<Unit, std::string> UnitToStringMapper;
Histogram(std::string group, std::string op, Unit unit, HistogramRegistry& registry)
Histogram(std::string const& group, std::string const& op, Unit unit, HistogramRegistry& registry)
: group(group), op(op), unit(unit), registry(registry), ReferenceCounted<Histogram>() {
ASSERT(UnitToStringMapper.find(unit) != UnitToStringMapper.end());
@ -71,7 +71,7 @@ private:
clear();
}
static std::string generateName(std::string group, std::string op) { return group + ":" + op; }
static std::string generateName(std::string const& group, std::string const& op) { return group + ":" + op; }
public:
~Histogram() { registry.unregisterHistogram(this); }
@ -125,7 +125,7 @@ public:
}
void writeToLog();
std::string name() { return generateName(this->group, this->op); }
std::string name() const { return generateName(this->group, this->op); }
std::string const group;
std::string const op;

View File

@ -51,8 +51,8 @@ struct ITraceLogFormatter {
struct ITraceLogIssuesReporter {
virtual ~ITraceLogIssuesReporter();
virtual void addIssue(std::string issue) = 0;
virtual void resolveIssue(std::string issue) = 0;
virtual void addIssue(std::string const& issue) = 0;
virtual void resolveIssue(std::string const& issue) = 0;
virtual void retrieveIssues(std::set<std::string>& out) const = 0;

View File

@ -1482,7 +1482,7 @@ void initPdhStrings(SystemStatisticsState* state, std::string dataFolder) {
}
#endif
SystemStatistics getSystemStatistics(std::string dataFolder,
SystemStatistics getSystemStatistics(std::string const& dataFolder,
const IPAddress* ip,
SystemStatisticsState** statState,
bool logDetails) {
@ -2640,7 +2640,7 @@ Future<vector<std::string>> listDirectoriesAsync(std::string const& directory) {
return findFiles(directory, "", true /* directoryOnly */, true);
}
void findFilesRecursively(std::string path, std::vector<std::string>& out) {
void findFilesRecursively(std::string const& path, std::vector<std::string>& out) {
// Add files to output, prefixing path
std::vector<std::string> files = platform::listFiles(path);
for (auto const& f : files)

View File

@ -232,7 +232,7 @@ struct SystemStatisticsState;
struct IPAddress;
SystemStatistics getSystemStatistics(std::string dataFolder,
SystemStatistics getSystemStatistics(std::string const& dataFolder,
const IPAddress* ip,
SystemStatisticsState** statState,
bool logDetails);
@ -369,7 +369,7 @@ std::vector<std::string> listFiles(std::string const& directory, std::string con
// returns directory names relative to directory
std::vector<std::string> listDirectories(std::string const& directory);
void findFilesRecursively(std::string path, std::vector<std::string>& out);
void findFilesRecursively(std::string const& path, std::vector<std::string>& out);
// Tag the given file as "temporary", i.e. not really needing commits to disk
void makeTemporary(const char* filename);

View File

@ -61,7 +61,7 @@ SystemStatistics getSystemStatistics() {
.detail("ApproximateUnusedMemory" #size, FastAllocator<size>::getApproximateMemoryUnused()) \
.detail("ActiveThreads" #size, FastAllocator<size>::getActiveThreads())
SystemStatistics customSystemMonitor(std::string eventName, StatisticsState* statState, bool machineMetrics) {
SystemStatistics customSystemMonitor(std::string const& eventName, StatisticsState* statState, bool machineMetrics) {
const IPAddress ipAddr = machineState.ip.present() ? machineState.ip.get() : IPAddress();
SystemStatistics currentStats = getSystemStatistics(
machineState.folder.present() ? machineState.folder.get() : "", &ipAddr, &statState->systemState, true);

View File

@ -36,11 +36,11 @@ struct SystemMonitorMachineState {
SystemMonitorMachineState() : monitorStartTime(0) {}
explicit SystemMonitorMachineState(const IPAddress& ip) : ip(ip), monitorStartTime(0) {}
SystemMonitorMachineState(std::string folder,
Optional<Standalone<StringRef>> dcId,
Optional<Standalone<StringRef>> zoneId,
Optional<Standalone<StringRef>> machineId,
const IPAddress& ip)
SystemMonitorMachineState(std::string const& folder,
Optional<Standalone<StringRef>> const& dcId,
Optional<Standalone<StringRef>> const& zoneId,
Optional<Standalone<StringRef>> const& machineId,
IPAddress const& ip)
: folder(folder), dcId(dcId), zoneId(zoneId), machineId(machineId), ip(ip), monitorStartTime(0) {}
};
@ -148,7 +148,9 @@ struct StatisticsState {
};
void systemMonitor();
SystemStatistics customSystemMonitor(std::string eventName, StatisticsState* statState, bool machineMetrics = false);
SystemStatistics customSystemMonitor(std::string const& eventName,
StatisticsState* statState,
bool machineMetrics = false);
SystemStatistics getSystemStatistics();
#endif /* FLOW_SYSTEM_MONITOR_H */

View File

@ -503,7 +503,7 @@ public:
}
}
void addRole(std::string role) {
void addRole(std::string const& role) {
MutexHolder holder(mutex);
RoleInfo& r = mutateRoleInfo();
@ -511,7 +511,7 @@ public:
r.refreshRolesString();
}
void removeRole(std::string role) {
void removeRole(std::string const& role) {
MutexHolder holder(mutex);
RoleInfo& r = mutateRoleInfo();
@ -557,13 +557,13 @@ NetworkAddress getAddressIndex() {
}
// This does not check for simulation, and as such is not safe for external callers
void clearPrefix_internal(std::map<std::string, TraceEventFields>& data, std::string prefix) {
void clearPrefix_internal(std::map<std::string, TraceEventFields>& data, std::string const& prefix) {
auto first = data.lower_bound(prefix);
auto last = data.lower_bound(strinc(prefix).toString());
data.erase(first, last);
}
void LatestEventCache::clear(std::string prefix) {
void LatestEventCache::clear(std::string const& prefix) {
clearPrefix_internal(latest[getAddressIndex()], prefix);
}
@ -575,7 +575,7 @@ void LatestEventCache::set(std::string tag, const TraceEventFields& contents) {
latest[getAddressIndex()][tag] = contents;
}
TraceEventFields LatestEventCache::get(std::string tag) {
TraceEventFields LatestEventCache::get(std::string const& tag) {
return latest[getAddressIndex()][tag];
}
@ -757,11 +757,11 @@ bool traceFileIsOpen() {
return g_traceLog.isOpen();
}
void addTraceRole(std::string role) {
void addTraceRole(std::string const& role) {
g_traceLog.addRole(role);
}
void removeTraceRole(std::string role) {
void removeTraceRole(std::string const& role) {
g_traceLog.removeRole(role);
}

View File

@ -519,11 +519,11 @@ struct TraceInterval {
struct LatestEventCache {
public:
void set(std::string tag, const TraceEventFields& fields);
TraceEventFields get(std::string tag);
TraceEventFields get(std::string const& tag);
std::vector<TraceEventFields> getAll();
std::vector<TraceEventFields> getAllUnsafe();
void clear(std::string prefix);
void clear(std::string const& prefix);
void clear();
// Latest error tracking only tracks errors when called from the main thread. Other errors are silently ignored.
@ -577,8 +577,8 @@ bool selectTraceClockSource(std::string source);
// Returns true iff source is recognized.
bool validateTraceClockSource(std::string source);
void addTraceRole(std::string role);
void removeTraceRole(std::string role);
void addTraceRole(std::string const& role);
void removeTraceRole(std::string const& role);
void retrieveTraceLogIssues(std::set<std::string>& out);
void setTraceLogGroup(const std::string& role);
template <class T>

55
flow/TypeTraits.h Normal file
View File

@ -0,0 +1,55 @@
/*
 * TypeTraits.h
 *
 * This source file is part of the FoundationDB open source project
 *
 * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
// This file, similar to `type_traits` in the standard library, contains utility types that can be used for template
// metaprogramming. While they can be very useful and simplify certain things, please be aware that their use will
// increase compilation times significantly. Therefore it is not recommended to use them in a header file if not
// absolutely necessary.
#pragma once
#include <variant>
// This type trait takes two std::variant types and concatenates their alternative lists into one variant.
// For example: typename variant_concat_t<std::variant<int>, std::variant<bool>>::type is std::variant<int, bool>.
template <class L, class R>
struct variant_concat_t;
template <class... Args1, class... Args2>
struct variant_concat_t<std::variant<Args1...>, std::variant<Args2...>> {
using type = std::variant<Args1..., Args2...>;
};
// Helper alias for variant_concat_t. Instead of writing `typename variant_concat_t<...>::type` one can simply use
// `variant_concat<...>`.
template <class L, class R>
using variant_concat = typename variant_concat_t<L, R>::type;
// Takes a std::variant as first argument and applies Fun to each of its alternatives. For example: typename
// variant_map_t<std::variant<int, bool>, std::add_pointer_t>::type will be defined as std::variant<int*, bool*>
template <class T, template <class> class Fun>
struct variant_map_t;
template <class... Args, template <class> class Fun>
struct variant_map_t<std::variant<Args...>, Fun> {
using type = std::variant<Fun<Args>...>;
};
// Helper alias for variant_map_t. Instead of writing `typename variant_map_t<...>::type` one can simply use
// `variant_map<...>`, which is equivalent but shorter.
template <class T, template <class> class Fun>
using variant_map = typename variant_map_t<T, Fun>::type;

View File

@ -93,7 +93,7 @@ std::string UID::shortString() const {
void detectFailureAfter(int const& address, double const& delay);
Optional<uint64_t> parse_with_suffix(std::string toparse, std::string default_unit) {
Optional<uint64_t> parse_with_suffix(std::string const& toparse, std::string const& default_unit) {
char* endptr;
uint64_t ret = strtoull(toparse.c_str(), &endptr, 10);
@ -144,7 +144,7 @@ Optional<uint64_t> parse_with_suffix(std::string toparse, std::string default_un
// m - minutes
// h - hours
// d - days
Optional<uint64_t> parseDuration(std::string str, std::string defaultUnit) {
Optional<uint64_t> parseDuration(std::string const& str, std::string const& defaultUnit) {
char* endptr;
uint64_t ret = strtoull(str.c_str(), &endptr, 10);
@ -284,7 +284,7 @@ std::vector<double> P_BUGGIFIED_SECTION_FIRES{ .25, .25 };
double P_EXPENSIVE_VALIDATION = .05;
int getSBVar(std::string file, int line, BuggifyType type) {
int getSBVar(std::string const& file, int line, BuggifyType type) {
if (!buggifyActivated[int(type)])
return 0;

View File

@ -72,7 +72,7 @@ extern double P_EXPENSIVE_VALIDATION;
enum class BuggifyType : uint8_t { General = 0, Client };
bool isBuggifyEnabled(BuggifyType type);
void clearBuggifySections(BuggifyType type);
int getSBVar(std::string file, int line, BuggifyType);
int getSBVar(std::string const& file, int line, BuggifyType);
void enableBuggify(bool enabled,
BuggifyType type); // Currently controls buggification and (randomized) expensive validation
bool validationIsEnabled(BuggifyType type);
@ -83,8 +83,8 @@ bool validationIsEnabled(BuggifyType type);
#define EXPENSIVE_VALIDATION \
(validationIsEnabled(BuggifyType::General) && deterministicRandom()->random01() < P_EXPENSIVE_VALIDATION)
extern Optional<uint64_t> parse_with_suffix(std::string toparse, std::string default_unit = "");
extern Optional<uint64_t> parseDuration(std::string str, std::string defaultUnit = "");
extern Optional<uint64_t> parse_with_suffix(std::string const& toparse, std::string const& default_unit = "");
extern Optional<uint64_t> parseDuration(std::string const& str, std::string const& defaultUnit = "");
extern std::string format(const char* form, ...);
// On success, returns the number of characters written. On failure, returns a negative number.

View File

@ -46,7 +46,7 @@ std::string IPAddress::toString() const {
}
}
Optional<IPAddress> IPAddress::parse(std::string str) {
Optional<IPAddress> IPAddress::parse(std::string const& str) {
try {
auto addr = boost::asio::ip::address::from_string(str);
return addr.is_v6() ? IPAddress(addr.to_v6().to_bytes()) : IPAddress(addr.to_v4().to_ulong());

View File

@ -150,7 +150,7 @@ public:
const IPAddressStore& toV6() const { return std::get<IPAddressStore>(addr); }
std::string toString() const;
static Optional<IPAddress> parse(std::string str);
static Optional<IPAddress> parse(std::string const& str);
bool operator==(const IPAddress& addr) const;
bool operator!=(const IPAddress& addr) const;

View File

@ -155,6 +155,7 @@ if(WITH_PYTHON)
add_fdb_test(TEST_FILES fast/Watches.toml)
add_fdb_test(TEST_FILES fast/WriteDuringRead.toml)
add_fdb_test(TEST_FILES fast/WriteDuringReadClean.toml)
add_fdb_test(TEST_FILES noSim/RandomUnitTests.toml UNIT)
add_fdb_test(TEST_FILES rare/CheckRelocation.toml)
add_fdb_test(TEST_FILES rare/ClogUnclog.toml)
add_fdb_test(TEST_FILES rare/CloggedCycleWithKills.toml)

View File

@ -117,7 +117,7 @@ class LogParser:
continue
if 'Type' not in obj:
continue
if obj['Severity'] == '40':
if obj['Severity'] == '40' and obj.get('ErrorIsInjectedFault', None) != '1':
self.fail()
if self.name is not None:
obj['testname'] = self.name
@ -390,7 +390,11 @@ def run_simulation_test(basedir, options):
os.remove(trace)
if options.keep_simdirs == 'NONE' or options.keep_simdirs == 'FAILED' and res:
print("Delete {}".format(os.path.join(wd, 'simfdb')))
shutil.rmtree(os.path.join(wd, 'simfdb'))
# Don't fail if the directory doesn't exist.
try:
shutil.rmtree(os.path.join(wd, 'simfdb'))
except FileNotFoundError:
pass
if len(os.listdir(wd)) == 0:
print("Delete {} - empty".format(wd))
os.rmdir(wd)

View File

@ -0,0 +1,9 @@
[[test]]
testTitle = 'UnitTests'
useDB = false
startDelay = 0
[[test.workload]]
testName = 'UnitTests'
maxTestCases = 1
testsMatching = 'noSim/'