Merge branch 'master' into expired-forward

This commit is contained in:
Dan Lambright 2021-07-26 19:58:29 -04:00 committed by GitHub
commit 55094bdd00
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 141 additions and 46 deletions

View File

@ -165,9 +165,13 @@ public class KeySelector {
}
/**
* Returns the {@code or-equal} parameter of this {@code KeySelector}. For internal use.
* Returns the orEqual parameter for this {@code KeySelector}. See the
* {@link #KeySelector(byte[], boolean, int)} KeySelector constructor}
* for more details.
*
* @return the {@code or-equal} parameter of this {@code KeySelector}.
*/
boolean orEqual() {
public boolean orEqual() {
return orEqual;
}

View File

@ -966,6 +966,10 @@ namespace SummarizeTest
// When running ASAN we expect to see this message. Boost coroutine should be using the correct asan annotations so that it shouldn't produce any false positives.
continue;
}
if (err.EndsWith("Warning: unimplemented fcntl command: 1036")) {
// Valgrind produces this warning when F_SET_RW_HINT is used
continue;
}
if (stderrSeverity == (int)Magnesium.Severity.SevError)
{
error = true;

View File

@ -26,6 +26,28 @@
#include "flow/Platform.h"
#include "flow/actorcompiler.h" // has to be last include
namespace {
std::string trim(std::string const& connectionString) {
// Strip out whitespace
// Strip out characters between a # and a newline
std::string trimmed;
auto end = connectionString.end();
for (auto c = connectionString.begin(); c != end; ++c) {
if (*c == '#') {
++c;
while (c != end && *c != '\n' && *c != '\r')
++c;
if (c == end)
break;
} else if (*c != ' ' && *c != '\n' && *c != '\r' && *c != '\t')
trimmed += *c;
}
return trimmed;
}
} // namespace
std::pair<std::string, bool> ClusterConnectionFile::lookupClusterFileName(std::string const& filename) {
if (filename.length())
return std::make_pair(filename, false);
@ -154,24 +176,6 @@ std::string ClusterConnectionString::getErrorString(std::string const& source, E
}
}
std::string trim(std::string const& connectionString) {
// Strip out whitespace
// Strip out characters between a # and a newline
std::string trimmed;
auto end = connectionString.end();
for (auto c = connectionString.begin(); c != end; ++c) {
if (*c == '#') {
++c;
while (c != end && *c != '\n' && *c != '\r')
++c;
if (c == end)
break;
} else if (*c != ' ' && *c != '\n' && *c != '\r' && *c != '\t')
trimmed += *c;
}
return trimmed;
}
ClusterConnectionString::ClusterConnectionString(std::string const& connectionString) {
auto trimmed = trim(connectionString);
@ -838,6 +842,7 @@ ACTOR Future<MonitorLeaderInfo> monitorProxiesOneGeneration(
clientInfo->set(ni);
successIdx = idx;
} else {
TEST(rep.getError().code() == error_code_failed_to_progress); // Coordinator cannot talk to cluster controller
idx = (idx + 1) % addrs.size();
if (idx == successIdx) {
wait(delay(CLIENT_KNOBS->COORDINATOR_RECONNECTION_DELAY));

View File

@ -366,6 +366,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( START_TRANSACTION_MAX_EMPTY_QUEUE_BUDGET, 10.0 );
init( START_TRANSACTION_MAX_QUEUE_SIZE, 1e6 );
init( KEY_LOCATION_MAX_QUEUE_SIZE, 1e6 );
init( COMMIT_PROXY_LIVENESS_TIMEOUT, 20.0 );
init( COMMIT_TRANSACTION_BATCH_INTERVAL_FROM_IDLE, 0.0005 ); if( randomize && BUGGIFY ) COMMIT_TRANSACTION_BATCH_INTERVAL_FROM_IDLE = 0.005;
init( COMMIT_TRANSACTION_BATCH_INTERVAL_MIN, 0.001 ); if( randomize && BUGGIFY ) COMMIT_TRANSACTION_BATCH_INTERVAL_MIN = 0.1;
@ -644,6 +645,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( DBINFO_FAILED_DELAY, 1.0 );
init( ENABLE_WORKER_HEALTH_MONITOR, false );
init( WORKER_HEALTH_MONITOR_INTERVAL, 60.0 );
init( PEER_LATENCY_CHECK_MIN_POPULATION, 30 );
init( PEER_LATENCY_DEGRADATION_PERCENTILE, 0.90 );
init( PEER_LATENCY_DEGRADATION_THRESHOLD, 0.05 );
init( PEER_TIMEOUT_PERCENTAGE_DEGRADATION_THRESHOLD, 0.1 );
@ -655,6 +657,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( COORDINATED_STATE_ONCONFLICT_POLL_INTERVAL, 1.0 ); if( randomize && BUGGIFY ) COORDINATED_STATE_ONCONFLICT_POLL_INTERVAL = 10.0;
init( FORWARD_REQUEST_TOO_OLD, 4*24*60*60 ); if( randomize && BUGGIFY ) FORWARD_REQUEST_TOO_OLD = 60.0;
init( ENABLE_CROSS_CLUSTER_SUPPORT, true ); if( randomize && BUGGIFY ) ENABLE_CROSS_CLUSTER_SUPPORT = false;
init( COORDINATOR_LEADER_CONNECTION_TIMEOUT, 20.0 );
// Buggification
init( BUGGIFIED_EVENTUAL_CONSISTENCY, 1.0 );

View File

@ -297,6 +297,7 @@ public:
double START_TRANSACTION_MAX_EMPTY_QUEUE_BUDGET;
int START_TRANSACTION_MAX_QUEUE_SIZE;
int KEY_LOCATION_MAX_QUEUE_SIZE;
double COMMIT_PROXY_LIVENESS_TIMEOUT;
double COMMIT_TRANSACTION_BATCH_INTERVAL_FROM_IDLE;
double COMMIT_TRANSACTION_BATCH_INTERVAL_MIN;
@ -594,6 +595,7 @@ public:
bool ENABLE_CROSS_CLUSTER_SUPPORT; // Allow a coordinator to serve requests whose connection string does not match
// the local descriptor
double FORWARD_REQUEST_TOO_OLD; // Do not forward requests older than this setting
double COORDINATOR_LEADER_CONNECTION_TIMEOUT;
// Buggification
double BUGGIFIED_EVENTUAL_CONSISTENCY;

View File

@ -46,10 +46,11 @@
#include "fdbrpc/Replication.h"
#include "fdbrpc/ReplicationUtils.h"
#include "fdbrpc/AsyncFileWriteChecker.h"
#include "flow/FaultInjection.h"
#include "flow/actorcompiler.h" // This must be the last #include.
bool simulator_should_inject_fault(const char* context, const char* file, int line, int error_code) {
if (!g_network->isSimulated())
if (!g_network->isSimulated() || !faultInjectionActivated)
return false;
auto p = g_simulator.getCurrentProcess();

View File

@ -4722,7 +4722,7 @@ ACTOR Future<Void> workerHealthMonitor(ClusterControllerData* self) {
loop {
try {
while (!self->goodRecruitmentTime.isReady()) {
wait(self->goodRecruitmentTime);
wait(lowPriorityDelay(SERVER_KNOBS->CC_WORKER_HEALTH_CHECKING_INTERVAL));
}
self->degradedServers = self->getServersWithDegradedLink();

View File

@ -1923,10 +1923,14 @@ ACTOR Future<Void> commitProxyServerCore(CommitProxyInterface proxy,
lastCommit = now();
if (trs.size() || lastCommitComplete.isReady()) {
lastCommitComplete =
commitBatch(&commitData,
const_cast<std::vector<CommitTransactionRequest>*>(&batchedRequests.first),
batchBytes);
lastCommitComplete = transformError(
timeoutError(
commitBatch(&commitData,
const_cast<std::vector<CommitTransactionRequest>*>(&batchedRequests.first),
batchBytes),
SERVER_KNOBS->COMMIT_PROXY_LIVENESS_TIMEOUT),
timed_out(),
failed_to_progress());
addActor.send(lastCommitComplete);
}
}
@ -2068,9 +2072,11 @@ ACTOR Future<Void> commitProxyServer(CommitProxyInterface proxy,
if (e.code() != error_code_worker_removed && e.code() != error_code_tlog_stopped &&
e.code() != error_code_master_tlog_failed && e.code() != error_code_coordinators_changed &&
e.code() != error_code_coordinated_state_conflict && e.code() != error_code_new_coordinators_timed_out) {
e.code() != error_code_coordinated_state_conflict && e.code() != error_code_new_coordinators_timed_out &&
e.code() != error_code_failed_to_progress) {
throw;
}
TEST(e.code() == error_code_failed_to_progress); // Commit proxy failed to progress
}
return Void();
}

View File

@ -37,6 +37,30 @@
// This module implements coordinationServer() and the interfaces in CoordinationInterface.h
namespace {
class LivenessChecker {
double threshold;
AsyncVar<double> lastTime;
ACTOR static Future<Void> checkStuck(LivenessChecker const* self) {
loop {
choose {
when(wait(delayUntil(self->lastTime.get() + self->threshold))) { return Void(); }
when(wait(self->lastTime.onChange())) {}
}
}
}
public:
explicit LivenessChecker(double threshold) : threshold(threshold), lastTime(now()) {}
void confirmLiveness() { lastTime.set(now()); }
Future<Void> checkStuck() const { return checkStuck(this); }
};
} // namespace
struct GenerationRegVal {
UniqueGeneration readGen, writeGen;
Optional<Value> val;
@ -179,7 +203,10 @@ TEST_CASE("/fdbserver/Coordination/localGenerationReg/simple") {
ACTOR Future<Void> openDatabase(ClientData* db,
int* clientCount,
Reference<AsyncVar<bool>> hasConnectedClients,
OpenDatabaseCoordRequest req) {
OpenDatabaseCoordRequest req,
Future<Void> checkStuck) {
state ErrorOr<CachedSerialization<ClientDBInfo>> replyContents;
++(*clientCount);
hasConnectedClients->set(true);
@ -191,19 +218,27 @@ ACTOR Future<Void> openDatabase(ClientData* db,
while (db->clientInfo->get().read().id == req.knownClientInfoID &&
!db->clientInfo->get().read().forward.present()) {
choose {
when(wait(checkStuck)) {
replyContents = failed_to_progress();
break;
}
when(wait(yieldedFuture(db->clientInfo->onChange()))) {}
when(wait(delayJittered(SERVER_KNOBS->CLIENT_REGISTER_INTERVAL))) {
if (req.supportedVersions.size() > 0) {
db->clientStatusInfoMap.erase(req.reply.getEndpoint().getPrimaryAddress());
}
replyContents = db->clientInfo->get();
break;
} // The client might be long gone!
}
}
if (req.supportedVersions.size() > 0) {
db->clientStatusInfoMap.erase(req.reply.getEndpoint().getPrimaryAddress());
if (replyContents.present()) {
req.reply.send(replyContents.get());
} else {
req.reply.sendError(replyContents.getError());
}
req.reply.send(db->clientInfo->get());
if (--(*clientCount) == 0) {
hasConnectedClients->set(false);
}
@ -255,6 +290,7 @@ ACTOR Future<Void> leaderRegister(LeaderElectionRegInterface interf, Key key) {
state AsyncVar<Value> leaderInterface;
state Reference<AsyncVar<Optional<LeaderInfo>>> currentElectedLeader =
makeReference<AsyncVar<Optional<LeaderInfo>>>();
state LivenessChecker canConnectToLeader(SERVER_KNOBS->COORDINATOR_LEADER_CONNECTION_TIMEOUT);
loop choose {
when(OpenDatabaseCoordRequest req = waitNext(interf.openDatabase.getFuture())) {
@ -266,7 +302,8 @@ ACTOR Future<Void> leaderRegister(LeaderElectionRegInterface interf, Key key) {
leaderMon =
monitorLeaderForProxies(req.clusterKey, req.coordinators, &clientData, currentElectedLeader);
}
actors.add(openDatabase(&clientData, &clientCount, hasConnectedClients, req));
actors.add(
openDatabase(&clientData, &clientCount, hasConnectedClients, req, canConnectToLeader.checkStuck()));
}
}
when(ElectionResultRequest req = waitNext(interf.electionResult.getFuture())) {
@ -320,8 +357,11 @@ ACTOR Future<Void> leaderRegister(LeaderElectionRegInterface interf, Key key) {
// TODO: use notify to only send a heartbeat once per interval
availableLeaders.erase(LeaderInfo(req.prevChangeID));
availableLeaders.insert(req.myInfo);
req.reply.send(
LeaderHeartbeatReply{ currentNominee.present() && currentNominee.get().equalInternalId(req.myInfo) });
bool const isCurrentLeader = currentNominee.present() && currentNominee.get().equalInternalId(req.myInfo);
if (isCurrentLeader) {
canConnectToLeader.confirmLiveness();
}
req.reply.send(LeaderHeartbeatReply{ isCurrentLeader });
}
when(ForwardRequest req = waitNext(interf.forward.getFuture())) {
LeaderInfo newInfo;

View File

@ -41,6 +41,7 @@
#include "flow/ProtocolVersion.h"
#include "flow/network.h"
#include "flow/TypeTraits.h"
#include "flow/FaultInjection.h"
#include "flow/actorcompiler.h" // This must be the last #include.
#undef max
@ -1647,7 +1648,7 @@ void SimulationConfig::setTss(const TestConfig& testConfig) {
tssCount =
std::max(0, std::min(tssCount, (db.usableRegions * (machine_count / datacenters) - replication_type) / 2));
if (!testConfig.config.present() && tssCount > 0) {
if (!testConfig.config.present() && tssCount > 0 && faultInjectionActivated) {
std::string confStr = format("tss_count:=%d tss_storage_engine:=%d", tssCount, db.storageServerStoreType);
set_config(confStr);
double tssRandom = deterministicRandom()->random01();

View File

@ -68,6 +68,7 @@
#include "flow/TLSConfig.actor.h"
#include "flow/Tracing.h"
#include "flow/UnitTest.h"
#include "flow/FaultInjection.h"
#if defined(__linux__) || defined(__FreeBSD__)
#include <execinfo.h>
@ -92,7 +93,7 @@ enum {
OPT_DCID, OPT_MACHINE_CLASS, OPT_BUGGIFY, OPT_VERSION, OPT_BUILD_FLAGS, OPT_CRASHONERROR, OPT_HELP, OPT_NETWORKIMPL, OPT_NOBUFSTDOUT, OPT_BUFSTDOUTERR,
OPT_TRACECLOCK, OPT_NUMTESTERS, OPT_DEVHELP, OPT_ROLLSIZE, OPT_MAXLOGS, OPT_MAXLOGSSIZE, OPT_KNOB, OPT_UNITTESTPARAM, OPT_TESTSERVERS, OPT_TEST_ON_SERVERS, OPT_METRICSCONNFILE,
OPT_METRICSPREFIX, OPT_LOGGROUP, OPT_LOCALITY, OPT_IO_TRUST_SECONDS, OPT_IO_TRUST_WARN_ONLY, OPT_FILESYSTEM, OPT_PROFILER_RSS_SIZE, OPT_KVFILE,
OPT_TRACE_FORMAT, OPT_WHITELIST_BINPATH, OPT_BLOB_CREDENTIAL_FILE, OPT_CONFIG_PATH, OPT_USE_TEST_CONFIG_DB,
OPT_TRACE_FORMAT, OPT_WHITELIST_BINPATH, OPT_BLOB_CREDENTIAL_FILE, OPT_CONFIG_PATH, OPT_USE_TEST_CONFIG_DB, OPT_FAULT_INJECTION,
};
CSimpleOpt::SOption g_rgOptions[] = {
@ -177,6 +178,8 @@ CSimpleOpt::SOption g_rgOptions[] = {
{ OPT_BLOB_CREDENTIAL_FILE, "--blob_credential_file", SO_REQ_SEP },
{ OPT_CONFIG_PATH, "--config_path", SO_REQ_SEP },
{ OPT_USE_TEST_CONFIG_DB, "--use_test_config_db", SO_NONE },
{ OPT_FAULT_INJECTION, "-fi", SO_REQ_SEP },
{ OPT_FAULT_INJECTION, "--fault_injection", SO_REQ_SEP },
#ifndef TLS_DISABLED
TLS_OPTION_FLAGS
@ -646,6 +649,7 @@ static void printUsage(const char* name, bool devhelp) {
"--kvfile FILE",
"Input file (SQLite database file) for use by the 'kvfilegeneratesums' and 'kvfileintegritycheck' roles.");
printOptionUsage("-b [on,off], --buggify [on,off]", " Sets Buggify system state, defaults to `off'.");
printOptionUsage("-fi [on,off], --fault_injection [on,off]", " Sets fault injection, defaults to `on'.");
printOptionUsage("--crash", "Crash on serious errors instead of continuing.");
printOptionUsage("-N NETWORKIMPL, --network NETWORKIMPL",
" Select network implementation, `net2' (default),"
@ -960,7 +964,7 @@ struct CLIOptions {
8LL << 30; // Nice to maintain the same default value for memLimit and SERVER_KNOBS->SERVER_MEM_LIMIT and
// SERVER_KNOBS->COMMIT_BATCHES_MEM_BYTES_HARD_LIMIT
uint64_t storageMemLimit = 1LL << 30;
bool buggifyEnabled = false, restarting = false;
bool buggifyEnabled = false, faultInjectionEnabled = true, restarting = false;
Optional<Standalone<StringRef>> zoneId;
Optional<Standalone<StringRef>> dcId;
ProcessClass processClass = ProcessClass(ProcessClass::UnsetClass, ProcessClass::CommandLineSource);
@ -1382,6 +1386,17 @@ private:
flushAndExit(FDB_EXIT_ERROR);
}
break;
case OPT_FAULT_INJECTION:
if (!strcmp(args.OptionArg(), "on"))
faultInjectionEnabled = true;
else if (!strcmp(args.OptionArg(), "off"))
faultInjectionEnabled = false;
else {
fprintf(stderr, "ERROR: Unknown fault injection state `%s'\n", args.OptionArg());
printHelpTeaser(argv[0]);
flushAndExit(FDB_EXIT_ERROR);
}
break;
case OPT_CRASHONERROR:
g_crashOnError = true;
break;
@ -1638,6 +1653,7 @@ int main(int argc, char* argv[]) {
setThreadLocalDeterministicRandomSeed(opts.randomSeed);
enableBuggify(opts.buggifyEnabled, BuggifyType::General);
enableFaultInjection(opts.faultInjectionEnabled);
IKnobCollection::setGlobalKnobCollection(IKnobCollection::Type::SERVER,
Randomize::True,
@ -1795,6 +1811,7 @@ int main(int argc, char* argv[]) {
.detail("CommandLine", opts.commandLine)
.setMaxFieldLength(0)
.detail("BuggifyEnabled", opts.buggifyEnabled)
.detail("FaultInjectionEnabled", opts.faultInjectionEnabled)
.detail("MemoryLimit", opts.memLimit)
.trackLatest("ProgramStart");

View File

@ -25,6 +25,7 @@
#include "fdbserver/workloads/workloads.actor.h"
#include "fdbrpc/simulator.h"
#include "fdbclient/ManagementAPI.actor.h"
#include "flow/FaultInjection.h"
#include "flow/actorcompiler.h" // This must be the last #include.
static std::set<int> const& normalAttritionErrors() {
@ -78,8 +79,8 @@ struct MachineAttritionWorkload : TestWorkload {
std::vector<LocalityData> machines;
MachineAttritionWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
enabled =
!clientId && g_network->isSimulated(); // only do this on the "first" client, and only when in simulation
// only do this on the "first" client, and only when in simulation and only when fault injection is enabled
enabled = !clientId && g_network->isSimulated() && faultInjectionActivated;
machinesToKill = getOption(options, LiteralStringRef("machinesToKill"), 2);
machinesToLeave = getOption(options, LiteralStringRef("machinesToLeave"), 1);
workersToKill = getOption(options, LiteralStringRef("workersToKill"), 2);

View File

@ -20,4 +20,9 @@
#include "flow/FaultInjection.h"
bool (*should_inject_fault)(const char* context, const char* file, int line, int error_code) = 0;
bool (*should_inject_fault)(const char* context, const char* file, int line, int error_code) = 0;
bool faultInjectionActivated = true;
void enableFaultInjection(bool enabled) {
faultInjectionActivated = enabled;
}

View File

@ -32,6 +32,8 @@
#define SHOULD_INJECT_FAULT(context) (should_inject_fault && should_inject_fault(context, __FILE__, __LINE__, 0))
extern bool (*should_inject_fault)(const char* context, const char* file, int line, int error_code);
extern bool faultInjectionActivated;
extern void enableFaultInjection(bool enabled); // Enable fault injection called from fdbserver actor main function
#else
#define INJECT_FAULT(error_type, context)
#endif

View File

@ -1,3 +1,6 @@
// Thread naming only works on Linux.
#if defined(__linux__)
#include "flow/IThreadPool.h"
#include <pthread.h>
@ -6,9 +9,6 @@
#include "flow/UnitTest.h"
#include "flow/actorcompiler.h" // has to be last include
// Thread naming only works on Linux.
#if defined(__linux__)
void forceLinkIThreadPoolTests() {}
struct ThreadNameReceiver : IThreadPoolReceiver {

View File

@ -99,6 +99,7 @@ ERROR( master_backup_worker_failed, 1212, "Master terminating because a backup w
ERROR( tag_throttled, 1213, "Transaction tag is being throttled" )
ERROR( grv_proxy_failed, 1214, "Master terminating because a GRV CommitProxy failed" )
ERROR( dd_tracker_cancelled, 1215, "The data distribution tracker has been cancelled" )
ERROR( failed_to_progress, 1216, "Process has failed to make sufficient progress" )
// 15xx Platform errors
ERROR( platform_error, 1500, "Platform error" )

View File

@ -21,6 +21,7 @@
#pragma once
#include <algorithm>
#include <iterator>
#include <cstring>
#include <functional>
#include <map>

View File

@ -9,12 +9,14 @@ connectionFailuresDisableDuration = 100000
[[test.workload]]
testName = 'Cycle'
transactionsPerSecond = 1000.0
testDuration = 30.0
testDuration = 60.0
expectedRate = 0
[[test.workload]]
testName = 'LowLatency'
testDuration = 30.0
maxGRVLatency = 40.0
maxCommitLatency = 40.0
testDuration = 60.0
[[test.workload]]
testName = 'ClogSingleConnection'