added a workload that tests killing an entire region and recovering from the failure with data loss.

fix: we cannot pop the txs tag from remote logs until they have a full copy of the txnStateStore
fix: we have to modify all of history; we cannot stop after finding a remote log set that is already marked local
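A minimal, self-contained sketch of the bookkeeping the txs-tag fix adds to the proxy, written as plain C++ rather than flow actor code; the names mirror the diff below, but Version, the knob value, and popRemote are simplified stand-ins:

#include <cstddef>
#include <cstdint>
#include <deque>
#include <utility>

using Version = int64_t;
static const size_t MAX_TXS_POP_VERSION_HISTORY = 100000; // mirrors the new server knob

struct TxsPopTracker {
	std::deque<std::pair<Version, Version>> txsPopVersions; // (commitVersion, popTo)
	Version lastTxsPop = 0;

	// At commit time, remember the pop instead of applying it to the remote logs.
	void recordPop(Version commitVersion, Version popTo) {
		if (txsPopVersions.empty() || popTo > txsPopVersions.back().second) {
			if (txsPopVersions.size() > MAX_TXS_POP_VERSION_HISTORY)
				txsPopVersions.pop_front(); // history overflow: drop the oldest entry
			txsPopVersions.emplace_back(commitVersion, popTo);
		}
	}

	// Once every remote tLog reports a durable version >= minVersion it is safe to pop
	// the txs tag there, because the remote logs then hold a full copy of the
	// txnStateStore up to that point.
	template <class PopFn>
	void releasePops(Version minVersion, PopFn popRemote) {
		while (!txsPopVersions.empty() && txsPopVersions.front().first <= minVersion) {
			lastTxsPop = txsPopVersions.front().second;
			popRemote(lastTxsPop);
			txsPopVersions.pop_front();
		}
	}
};

In the diff, commitBatch() plays the role of recordPop() and the new monitorRemoteCommitted() actor plays the role of releasePops().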
Evan Tschannen 2018-09-17 18:32:39 -07:00
parent 16018ee36d
commit 200e65fe61
16 changed files with 277 additions and 53 deletions


@@ -1398,6 +1398,27 @@ ACTOR Future<Void> forceRecovery (Reference<ClusterConnectionFile> clusterFile)
}
}
ACTOR Future<Void> waitForPrimaryDC( Database cx, StringRef dcId ) {
state ReadYourWritesTransaction tr(cx);
loop {
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
Optional<Value> res = wait( tr.get(primaryDatacenterKey) );
if(res.present() && res.get() == dcId) {
return Void();
}
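// Not the primary yet: register a watch, commit so the server arms it, then block until primaryDatacenterKey changes.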
state Future<Void> watchFuture = tr.watch(primaryDatacenterKey);
Void _ = wait(tr.commit());
Void _ = wait(watchFuture);
tr.reset();
} catch (Error& e) {
Void _ = wait( tr.onError(e) );
}
}
}
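This helper blocks until the cluster reports the given dcId as primary. The new KillRegion workload further down uses it to sequence each failover step, e.g.:

Void _ = wait( waitForPrimaryDC(cx, LiteralStringRef("1")) ); // block until DC "1" has taken over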
json_spirit::Value_type normJSONType(json_spirit::Value_type type) {
if (type == json_spirit::int_type)
return json_spirit::real_type;


@@ -165,6 +165,8 @@ Future<int> setDDMode( Database const& cx, int const& mode );
Future<Void> forceRecovery (Reference<ClusterConnectionFile> const& clusterFile);
Future<Void> waitForPrimaryDC( Database const& cx, StringRef const& dcId );
// Gets the cluster connection string
Future<std::vector<NetworkAddress>> getCoordinators( Database const& cx );


@@ -1279,7 +1279,7 @@ public:
}
// Check if machine can be removed, if requested
if ((kt == KillInstantly) || (kt == InjectFaults) || (kt == RebootAndDelete) || (kt == RebootProcessAndDelete))
if (!forceKill && ((kt == KillInstantly) || (kt == InjectFaults) || (kt == RebootAndDelete) || (kt == RebootProcessAndDelete)))
{
std::vector<ProcessInfo*> processesLeft, processesDead;
int protectedWorker = 0, unavailable = 0, excluded = 0, cleared = 0;
@@ -1377,7 +1377,7 @@ public:
return true;
}
virtual bool killDataCenter(Optional<Standalone<StringRef>> dcId, KillType kt, KillType* ktFinal) {
virtual bool killDataCenter(Optional<Standalone<StringRef>> dcId, KillType kt, bool forceKill, KillType* ktFinal) {
auto ktOrig = kt;
auto processes = getAllProcesses();
std::map<Optional<Standalone<StringRef>>, int> datacenterZones;
@@ -1400,7 +1400,7 @@ public:
}
// Check if machine can be removed, if requested
if ((kt == KillInstantly) || (kt == InjectFaults) || (kt == RebootAndDelete) || (kt == RebootProcessAndDelete))
if (!forceKill && ((kt == KillInstantly) || (kt == InjectFaults) || (kt == RebootAndDelete) || (kt == RebootProcessAndDelete)))
{
std::vector<ProcessInfo*> processesLeft, processesDead;
for (auto processInfo : getAllProcesses()) {


@@ -149,8 +149,8 @@ public:
virtual void rebootProcess(Optional<Standalone<StringRef>> zoneId, bool allProcesses ) = 0;
virtual void rebootProcess( ProcessInfo* process, KillType kt ) = 0;
virtual void killInterface( NetworkAddress address, KillType ) = 0;
virtual bool killMachine(Optional<Standalone<StringRef>> zoneId, KillType, bool forceKill = false, KillType* ktFinal = NULL) = 0;
virtual bool killDataCenter(Optional<Standalone<StringRef>> dcId, KillType kt, KillType* ktFinal = NULL) = 0;
virtual bool killMachine(Optional<Standalone<StringRef>> zoneId, KillType kt, bool forceKill = false, KillType* ktFinal = NULL) = 0;
virtual bool killDataCenter(Optional<Standalone<StringRef>> dcId, KillType kt, bool forceKill = false, KillType* ktFinal = NULL) = 0;
//virtual KillType getMachineKillState( UID zoneID ) = 0;
virtual bool canKillProcesses(std::vector<ProcessInfo*> const& availableProcesses, std::vector<ProcessInfo*> const& deadProcesses, KillType kt, KillType* newKillType) const = 0;
virtual bool isAvailable() const = 0;
@@ -285,6 +285,7 @@ public:
int32_t usableRegions;
std::string disablePrimary;
std::string disableRemote;
std::string originalRegions;
bool allowLogSetKills;
Optional<Standalone<StringRef>> remoteDcId;
bool hasSatelliteReplication;


@@ -236,6 +236,8 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
init( COMMIT_TRANSACTION_BATCH_INTERVAL_SMOOTHER_ALPHA, 0.1 );
init( COMMIT_TRANSACTION_BATCH_COUNT_MAX, 32768 ); if( randomize && BUGGIFY ) COMMIT_TRANSACTION_BATCH_COUNT_MAX = 1000; // Do NOT increase this number beyond 32768, as CommitIds only budget 2 bytes for storing transaction id within each batch
init( COMMIT_BATCHES_MEM_BYTES_HARD_LIMIT, 8LL << 30 ); if (randomize && BUGGIFY) COMMIT_BATCHES_MEM_BYTES_HARD_LIMIT = g_random->randomInt64(100LL << 20, 8LL << 30);
init( COMMIT_BATCHES_MEM_FRACTION_OF_TOTAL, 0.5 );
init( COMMIT_BATCHES_MEM_TO_TOTAL_MEM_SCALE_FACTOR, 10.0 );
// these settings disable batch bytes scaling. Try COMMIT_TRANSACTION_BATCH_BYTES_MAX=1e6, COMMIT_TRANSACTION_BATCH_BYTES_SCALE_BASE=50000, COMMIT_TRANSACTION_BATCH_BYTES_SCALE_POWER=0.5?
init( COMMIT_TRANSACTION_BATCH_BYTES_MIN, 100000 );
@@ -247,8 +249,8 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
init( RESOLVER_COALESCE_TIME, 1.0 );
init( BUGGIFIED_ROW_LIMIT, APPLY_MUTATION_BYTES ); if( randomize && BUGGIFY ) BUGGIFIED_ROW_LIMIT = g_random->randomInt(3, 30);
init( PROXY_SPIN_DELAY, 0.01 );
init( COMMIT_BATCHES_MEM_FRACTION_OF_TOTAL, 0.5 );
init( COMMIT_BATCHES_MEM_TO_TOTAL_MEM_SCALE_FACTOR, 10.0 );
init( UPDATE_REMOTE_LOG_VERSION_INTERVAL, 2.0 );
init( MAX_TXS_POP_VERSION_HISTORY, 1e5 );
// Master Server
// masterCommitter() in the master server will allow lower priority tasks (e.g. DataDistribution)


@@ -193,6 +193,8 @@ public:
double RESOLVER_COALESCE_TIME;
int BUGGIFIED_ROW_LIMIT;
double PROXY_SPIN_DELAY;
double UPDATE_REMOTE_LOG_VERSION_INTERVAL;
int MAX_TXS_POP_VERSION_HISTORY;
// Master Server
double COMMIT_SLEEP_TIME;


@@ -203,6 +203,8 @@ struct ProxyCommitData {
std::map<UID, Reference<StorageInfo>> storageCache;
std::map<Tag, Version> tag_popped;
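// (commit version, txs pop version) pairs recorded at commit time; the txs tag is only
// popped from the remote logs once they are durable past the paired commit version.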
Deque<std::pair<Version, Version>> txsPopVersions;
Version lastTxsPop;
//The tag related to a storage server rarely changes, so we keep a vector of tags for each key range to be slightly more CPU efficient.
//When a tag related to a storage server does change, we empty out all of these vectors to signify they must be repopulated.
@@ -231,7 +233,7 @@ struct ProxyCommitData {
getConsistentReadVersion(getConsistentReadVersion), commit(commit), lastCoalesceTime(0),
localCommitBatchesStarted(0), locked(false), firstProxy(firstProxy),
cx(openDBOnServer(db, TaskDefaultEndpoint, true, true)), singleKeyMutationEvent(LiteralStringRef("SingleKeyMutation")),
commitBatchesMemBytesCount(0)
commitBatchesMemBytesCount(0), lastTxsPop(0)
{}
};
@@ -837,6 +839,14 @@ ACTOR Future<Void> commitBatch(
}
Void _ = wait(yield());
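// Defer the remote-side pop of the txs tag: record which popTo belonged to which commit
// version, and let monitorRemoteCommitted() replay these pops once the remote logs are
// known to hold a full copy of the txnStateStore.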
if(!self->txsPopVersions.size() || msg.popTo > self->txsPopVersions.back().second) {
if(self->txsPopVersions.size() > SERVER_KNOBS->MAX_TXS_POP_VERSION_HISTORY) {
TraceEvent(SevWarnAlways, "DiscardingTxsPopHistory").suppressFor(1.0);
self->txsPopVersions.pop_front();
}
self->txsPopVersions.push_back(std::make_pair(commitVersion, msg.popTo));
}
self->logSystem->pop(msg.popTo, txsTag);
/////// Phase 5: Replies (CPU bound; no particular order required, though ordered execution would be best for latency)
@@ -1188,6 +1198,62 @@ ACTOR static Future<Void> readRequestServer(
}
}
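// Watches the durable versions of the remote tLogs and applies the deferred txs-tag pops
// recorded in txsPopVersions once every remote log has committed past the paired commit version.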
ACTOR Future<Void> monitorRemoteCommitted(ProxyCommitData* self, Reference<AsyncVar<ServerDBInfo>> db) {
loop {
Void _ = wait(delay(0)); //allow this actor to be cancelled if we are removed after db changes.
state Optional<std::vector<OptionalInterface<TLogInterface>>> remoteLogs;
if(db->get().recoveryState >= RecoveryState::ALL_LOGS_RECRUITED) {
for(auto& logSet : db->get().logSystemConfig.tLogs) {
if(!logSet.isLocal) {
remoteLogs = logSet.tLogs;
for(auto& tLog : logSet.tLogs) {
if(!tLog.present()) {
remoteLogs = Optional<std::vector<OptionalInterface<TLogInterface>>>();
break;
}
}
break;
}
}
}
if(!remoteLogs.present()) {
Void _ = wait(db->onChange());
continue;
}
state Future<Void> onChange = db->onChange();
loop {
state std::vector<Future<TLogQueuingMetricsReply>> replies;
for(auto &it : remoteLogs.get()) {
replies.push_back(brokenPromiseToNever( it.interf().getQueuingMetrics.getReply( TLogQueuingMetricsRequest() ) ));
}
Void _ = wait( waitForAll(replies) );
if(onChange.isReady()) {
break;
}
//FIXME: use the configuration to calculate a more precise minimum recovery version.
Version minVersion = std::numeric_limits<Version>::max();
for(auto& it : replies) {
minVersion = std::min(minVersion, it.get().v);
}
while(self->txsPopVersions.size() && self->txsPopVersions.front().first <= minVersion) {
self->lastTxsPop = self->txsPopVersions.front().second;
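// tagLocalityRemoteLog restricts this pop to the remote log sets (see the reworked predicate in TagPartitionedLogSystem::pop below).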
self->logSystem->pop(self->txsPopVersions.front().second, txsTag, 0, tagLocalityRemoteLog);
self->txsPopVersions.pop_front();
}
Void _ = wait( delay(SERVER_KNOBS->UPDATE_REMOTE_LOG_VERSION_INTERVAL) || onChange );
if(onChange.isReady()) {
break;
}
}
}
}
ACTOR Future<Void> masterProxyServerCore(
MasterProxyInterface proxy,
MasterInterface master,
@@ -1239,6 +1305,7 @@ ACTOR Future<Void> masterProxyServerCore(
state int64_t commitBatchesMemoryLimit = std::min(SERVER_KNOBS->COMMIT_BATCHES_MEM_BYTES_HARD_LIMIT, static_cast<int64_t>((SERVER_KNOBS->SERVER_MEM_LIMIT * SERVER_KNOBS->COMMIT_BATCHES_MEM_FRACTION_OF_TOTAL) / SERVER_KNOBS->COMMIT_BATCHES_MEM_TO_TOTAL_MEM_SCALE_FACTOR));
TraceEvent(SevInfo, "CommitBatchesMemoryLimit").detail("BytesLimit", commitBatchesMemoryLimit);
addActor.send(monitorRemoteCommitted(&commitData, db));
addActor.send(transactionStarter(proxy, master, db, addActor, &commitData));
addActor.send(readRequestServer(proxy, &commitData));
@@ -1258,6 +1325,7 @@ ACTOR Future<Void> masterProxyServerCore(
for(auto it : commitData.tag_popped) {
commitData.logSystem->pop(it.second, it.first);
}
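// Also re-apply the most recent remote txs pop to the replacement log system.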
commitData.logSystem->pop(commitData.lastTxsPop, txsTag, 0, tagLocalityRemoteLog);
}
}
when(Void _ = wait(onError)) {}


@@ -671,7 +671,7 @@ ACTOR Future<Void> restartSimulatedSystem(
}
struct SimulationConfig {
explicit SimulationConfig(int extraDB, int minimumReplication);
explicit SimulationConfig(int extraDB, int minimumReplication, int minimumRegions);
int extraDB;
DatabaseConfiguration db;
@@ -684,11 +684,11 @@ struct SimulationConfig {
int processes_per_machine;
int coordinators;
private:
void generateNormalConfig(int minimumReplication);
void generateNormalConfig(int minimumReplication, int minimumRegions);
};
SimulationConfig::SimulationConfig(int extraDB, int minimumReplication) : extraDB(extraDB) {
generateNormalConfig(minimumReplication);
SimulationConfig::SimulationConfig(int extraDB, int minimumReplication, int minimumRegions) : extraDB(extraDB) {
generateNormalConfig(minimumReplication, minimumRegions);
}
void SimulationConfig::set_config(std::string config) {
@@ -703,10 +703,10 @@ StringRef StringRefOf(const char* s) {
return StringRef((uint8_t*)s, strlen(s));
}
void SimulationConfig::generateNormalConfig(int minimumReplication) {
void SimulationConfig::generateNormalConfig(int minimumReplication, int minimumRegions) {
set_config("new");
bool generateFearless = g_random->random01() < 0.5;
datacenters = generateFearless ? ( minimumReplication > 0 || g_random->random01() < 0.5 ? 4 : 6 ) : g_random->randomInt( 1, 4 );
bool generateFearless = minimumRegions > 1 || g_random->random01() < 0.5;
if (g_random->random01() < 0.25) db.desiredTLogCount = g_random->randomInt(1,7);
if (g_random->random01() < 0.25) db.masterProxyCount = g_random->randomInt(1,7);
if (g_random->random01() < 0.25) db.resolverCount = g_random->randomInt(1,7);
@@ -759,6 +759,7 @@ void SimulationConfig::generateNormalConfig(int minimumReplication) {
}
if(generateFearless || (datacenters == 2 && g_random->random01() < 0.5)) {
//The kill region workload relies on the fact that "0", "2", and "4" are all of the possible primary dcids.
StatusObject primaryObj;
StatusObject primaryDcObj;
primaryDcObj["id"] = "0";
@@ -866,7 +867,7 @@ void SimulationConfig::generateNormalConfig(int minimumReplication) {
}
//We cannot run with a remote DC when MAX_READ_TRANSACTION_LIFE_VERSIONS is too small, because the log routers will not be able to keep up.
if (g_random->random01() < 0.25 || SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS < SERVER_KNOBS->VERSIONS_PER_SECOND) {
if (minimumRegions <= 1 && (g_random->random01() < 0.25 || SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS < SERVER_KNOBS->VERSIONS_PER_SECOND)) {
TEST( true ); // Simulated cluster using one region
needsRemote = false;
} else {
@@ -920,6 +921,8 @@ void SimulationConfig::generateNormalConfig(int minimumReplication) {
set_config("regions=" + json_spirit::write_string(json_spirit::mValue(regionArr), json_spirit::Output_options::none));
if(needsRemote) {
g_simulator.originalRegions = "regions=" + json_spirit::write_string(json_spirit::mValue(regionArr), json_spirit::Output_options::none);
StatusArray disablePrimary = regionArr;
disablePrimary[0].get_obj()["datacenters"].get_array()[0].get_obj()["priority"] = -1;
g_simulator.disablePrimary = "regions=" + json_spirit::write_string(json_spirit::mValue(disablePrimary), json_spirit::Output_options::none);
@@ -944,7 +947,7 @@ void SimulationConfig::generateNormalConfig(int minimumReplication) {
}
//because we protect a majority of coordinators from being killed, it is better to run with low numbers of coordinators to prevent too many processes from being protected
coordinators = BUGGIFY ? g_random->randomInt(1, machine_count+1) : 1;
coordinators = ( minimumRegions <= 1 && BUGGIFY ) ? g_random->randomInt(1, machine_count+1) : 1;
if(minimumReplication > 1 && datacenters == 3) {
//low latency tests in 3 data hall mode need 2 other data centers with 2 machines each to avoid waiting for logs to recover.
@@ -961,10 +964,10 @@ void SimulationConfig::generateNormalConfig(int minimumReplication) {
void setupSimulatedSystem( vector<Future<Void>> *systemActors, std::string baseFolder,
int* pTesterCount, Optional<ClusterConnectionString> *pConnString,
Standalone<StringRef> *pStartingConfiguration, int extraDB, int minimumReplication, Reference<TLSOptions> tlsOptions)
Standalone<StringRef> *pStartingConfiguration, int extraDB, int minimumReplication, int minimumRegions, Reference<TLSOptions> tlsOptions)
{
// SOMEDAY: this does not test multi-interface configurations
SimulationConfig simconfig(extraDB, minimumReplication);
SimulationConfig simconfig(extraDB, minimumReplication, minimumRegions);
StatusObject startingConfigJSON = simconfig.db.toJSON(true);
std::string startingConfigString = "new";
for( auto kv : startingConfigJSON) {
@@ -1038,14 +1041,26 @@ void setupSimulatedSystem( vector<Future<Void>> *systemActors, std::string baseF
TEST( !sslEnabled ); // SSL disabled
vector<NetworkAddress> coordinatorAddresses;
for( int dc = 0; dc < dataCenters; dc++ ) {
int machines = machineCount / dataCenters + (dc < machineCount % dataCenters); // add remainder of machines to first datacenter
int dcCoordinators = coordinatorCount / dataCenters + (dc < coordinatorCount%dataCenters);
if(minimumRegions > 1) {
//do not put coordinators in the primary region so that we can kill that region safely
int nonPrimaryDcs = dataCenters/2;
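// e.g. with 4 datacenters the coordinators land only in DCs "1" and "3"; the even-numbered
// DCs ("0", "2", "4") are the only possible primaries, and the only ones KillRegion kills.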
for( int dc = 1; dc < dataCenters; dc+=2 ) {
int dcCoordinators = coordinatorCount / nonPrimaryDcs + ((dc-1)/2 < coordinatorCount%nonPrimaryDcs);
for(int m = 0; m < dcCoordinators; m++) {
uint32_t ip = 2<<24 | dc<<16 | 1<<8 | m;
coordinatorAddresses.push_back(NetworkAddress(ip, 1, true, sslEnabled));
TraceEvent("SelectedCoordinator").detail("Address", coordinatorAddresses.back());
}
}
} else {
for( int dc = 0; dc < dataCenters; dc++ ) {
int dcCoordinators = coordinatorCount / dataCenters + (dc < coordinatorCount%dataCenters);
for(int m = 0; m < dcCoordinators; m++) {
uint32_t ip = 2<<24 | dc<<16 | 1<<8 | m;
coordinatorAddresses.push_back(NetworkAddress(ip, 1, true, sslEnabled));
TraceEvent("SelectedCoordinator").detail("Address", coordinatorAddresses.back());
for(int m = 0; m < dcCoordinators; m++) {
uint32_t ip = 2<<24 | dc<<16 | 1<<8 | m;
coordinatorAddresses.push_back(NetworkAddress(ip, 1, true, sslEnabled));
TraceEvent("SelectedCoordinator").detail("Address", coordinatorAddresses.back());
}
}
}
@@ -1184,7 +1199,7 @@ void setupSimulatedSystem( vector<Future<Void>> *systemActors, std::string baseF
.detail("StartingConfiguration", pStartingConfiguration->toString());
}
void checkExtraDB(const char *testFile, int &extraDB, int &minimumReplication) {
void checkExtraDB(const char *testFile, int &extraDB, int &minimumReplication, int &minimumRegions) {
std::ifstream ifs;
ifs.open(testFile, std::ifstream::in);
if (!ifs.good())
@@ -1212,6 +1227,10 @@ void checkExtraDB(const char *testFile, int &extraDB, int &minimumReplication) {
if (attrib == "minimumReplication") {
sscanf( value.c_str(), "%d", &minimumReplication );
}
if (attrib == "minimumRegions") {
sscanf( value.c_str(), "%d", &minimumRegions );
}
}
ifs.close();
@@ -1224,7 +1243,8 @@ ACTOR void setupAndRun(std::string dataFolder, const char *testFile, bool reboot
state int testerCount = 1;
state int extraDB = 0;
state int minimumReplication = 0;
checkExtraDB(testFile, extraDB, minimumReplication);
state int minimumRegions = 0;
checkExtraDB(testFile, extraDB, minimumReplication, minimumRegions);
Void _ = wait( g_simulator.onProcess( g_simulator.newProcess(
"TestSystem", 0x01010101, 1, LocalityData(Optional<Standalone<StringRef>>(), Standalone<StringRef>(g_random->randomUniqueID().toString()), Optional<Standalone<StringRef>>(), Optional<Standalone<StringRef>>()), ProcessClass(ProcessClass::TesterClass, ProcessClass::CommandLineSource), "", "" ), TaskDefaultYield ) );
@@ -1243,7 +1263,7 @@ ACTOR void setupAndRun(std::string dataFolder, const char *testFile, bool reboot
}
else {
g_expect_full_pointermap = 1;
setupSimulatedSystem( &systemActors, dataFolder, &testerCount, &connFile, &startingConfiguration, extraDB, minimumReplication, tlsOptions );
setupSimulatedSystem( &systemActors, dataFolder, &testerCount, &connFile, &startingConfiguration, extraDB, minimumReplication, minimumRegions, tlsOptions );
Void _ = wait( delay(1.0) ); // FIXME: WHY!!! //wait for machines to boot
}
std::string clusterFileDir = joinPath( dataFolder, g_random->randomUniqueID().toString() );


@@ -258,8 +258,6 @@ struct TLogData : NonCopyable {
int64_t overheadBytesInput;
int64_t overheadBytesDurable;
Version prevVersion;
struct peekTrackerData {
std::map<int, Promise<Version>> sequence_version;
double lastUpdate;
@@ -277,7 +275,7 @@ struct TLogData : NonCopyable {
TLogData(UID dbgid, IKeyValueStore* persistentData, IDiskQueue * persistentQueue, Reference<AsyncVar<ServerDBInfo>> const& dbInfo)
: dbgid(dbgid), instanceID(g_random->randomUniqueID().first()),
persistentData(persistentData), rawPersistentQueue(persistentQueue), persistentQueue(new TLogQueue(persistentQueue, dbgid)),
dbInfo(dbInfo), queueCommitBegin(0), queueCommitEnd(0), prevVersion(0),
dbInfo(dbInfo), queueCommitBegin(0), queueCommitEnd(0),
diskQueueCommitBytes(0), largeDiskQueueCommitBytes(false), bytesInput(0), bytesDurable(0), overheadBytesInput(0), overheadBytesDurable(0),
updatePersist(Void()), concurrentLogRouterReads(SERVER_KNOBS->CONCURRENT_LOG_ROUTER_READS)
{
@@ -1204,7 +1202,6 @@ ACTOR Future<Void> tLogCommit(
}
// Notifies the commitQueue actor to commit persistentQueue, and also unblocks tLogPeekMessages actors
self->prevVersion = logData->version.get();
logData->version.set( req.version );
if(req.debugID.present())
@@ -1350,14 +1347,14 @@ ACTOR Future<Void> cleanupPeekTrackers( TLogData* self ) {
}
}
void getQueuingMetrics( TLogData* self, TLogQueuingMetricsRequest const& req ) {
void getQueuingMetrics( TLogData* self, Reference<LogData> logData, TLogQueuingMetricsRequest const& req ) {
TLogQueuingMetricsReply reply;
reply.localTime = now();
reply.instanceID = self->instanceID;
reply.bytesInput = self->bytesInput;
reply.bytesDurable = self->bytesDurable;
reply.storageBytes = self->persistentData->getStorageBytes();
reply.v = self->prevVersion;
reply.v = logData->queueCommittedVersion.get();
req.reply.send( reply );
}
@@ -1409,7 +1406,7 @@ ACTOR Future<Void> serveTLogInterface( TLogData* self, TLogInterface tli, Refere
logData->addActor.send( tLogLock(self, reply, logData) );
}
when (TLogQueuingMetricsRequest req = waitNext(tli.getQueuingMetrics.getFuture())) {
getQueuingMetrics(self, req);
getQueuingMetrics(self, logData, req);
}
when (TLogConfirmRunningRequest req = waitNext(tli.confirmRunning.getFuture())){
if (req.debugID.present() ) {
@@ -1516,8 +1513,6 @@ ACTOR Future<Void> pullAsyncData( TLogData* self, Reference<LogData> logData, st
}
// Notifies the commitQueue actor to commit persistentQueue, and also unblocks tLogPeekMessages actors
//FIXME: could we just use the ver and lastVer variables, or replace them with this?
self->prevVersion = logData->version.get();
logData->version.set( ver );
Void _ = wait( yield(TaskTLogCommit) );
}
@@ -1554,8 +1549,6 @@ ACTOR Future<Void> pullAsyncData( TLogData* self, Reference<LogData> logData, st
}
// Notifies the commitQueue actor to commit persistentQueue, and also unblocks tLogPeekMessages actors
//FIXME: could we just use the ver and lastVer variables, or replace them with this?
self->prevVersion = logData->version.get();
logData->version.set( ver );
Void _ = wait( yield(TaskTLogCommit) );
}
@@ -1975,7 +1968,6 @@ ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, Localit
self->largeDiskQueueCommitBytes.set(true);
}
self->prevVersion = logData->version.get();
logData->version.set( req.recoverAt );
}


@@ -809,9 +809,8 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
popLogRouter(upTo, tag, durableKnownCommittedVersion, popLocality);
return;
}
ASSERT(popLocality == tagLocalityInvalid);
for(auto& t : tLogs) {
if(t->locality == tagLocalitySpecial || t->locality == tag.locality || tag.locality < 0) {
if(t->locality == tagLocalitySpecial || t->locality == tag.locality || (tag.locality < 0 && ((popLocality == tagLocalityInvalid) == t->isLocal))) {
for(auto& log : t->logServers) {
Version prev = outstandingPops[std::make_pair(log->get().id(),tag)].first;
if (prev < upTo)
@@ -1200,13 +1199,11 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
int modifiedLogSets = 0;
int removedLogSets = 0;
if(primaryLocality >= 0) {
bool remoteIsLocal = false;
auto copiedLogs = modifiedState.tLogs;
for(auto& coreSet : copiedLogs) {
if(coreSet.locality != primaryLocality && coreSet.locality >= 0) {
foundRemote = true;
remoteLocality = coreSet.locality;
remoteIsLocal = coreSet.isLocal;
modifiedState.tLogs.clear();
modifiedState.tLogs.push_back(coreSet);
modifiedState.tLogs[0].isLocal = true;
@@ -1216,14 +1213,11 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
}
ASSERT( !remoteIsLocal );
while( !foundRemote && modifiedState.oldTLogData.size() ) {
for(auto& coreSet : modifiedState.oldTLogData[0].tLogs) {
if(coreSet.locality != primaryLocality && coreSet.locality >= tagLocalitySpecial) {
foundRemote = true;
remoteLocality = coreSet.locality;
remoteIsLocal = coreSet.isLocal;
if(coreSet.isLocal) {
modifiedState.tLogs = modifiedState.oldTLogData[0].tLogs;
modifiedState.logRouterTags = modifiedState.oldTLogData[0].logRouterTags;
@@ -1242,14 +1236,13 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
if(foundRemote) {
for(int i = 0; i < modifiedState.oldTLogData.size() && !remoteIsLocal; i++) {
for(int i = 0; i < modifiedState.oldTLogData.size(); i++) {
bool found = false;
auto copiedLogs = modifiedState.oldTLogData[i].tLogs;
for(auto& coreSet : copiedLogs) {
if(coreSet.locality == remoteLocality || coreSet.locality == tagLocalitySpecial) {
found = true;
remoteIsLocal = coreSet.isLocal;
if(!coreSet.isLocal) {
if(!coreSet.isLocal || copiedLogs.size() > 1) {
modifiedState.oldTLogData[i].tLogs.clear();
modifiedState.oldTLogData[i].tLogs.push_back(coreSet);
modifiedState.oldTLogData[i].tLogs[0].isLocal = true;
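The second fix from the commit message lands in the hunk above: the old code tracked remoteIsLocal and stopped rewriting history at the first generation whose remote-locality log set was already marked local, leaving stale generations behind. A condensed sketch of the corrected loop, reusing the diff's names; surrounding types are elided, and the tail that handles generations where found stays false is not shown in the hunk and is omitted here too:

for (int i = 0; i < modifiedState.oldTLogData.size(); i++) { // all of history, no early exit
	bool found = false;
	auto copiedLogs = modifiedState.oldTLogData[i].tLogs; // iterate a copy, as the diff does
	for (auto& coreSet : copiedLogs) {
		if (coreSet.locality == remoteLocality || coreSet.locality == tagLocalitySpecial) {
			found = true;
			// Rewrite the generation unless it already consists of exactly this core set marked local.
			if (!coreSet.isLocal || copiedLogs.size() > 1) {
				modifiedState.oldTLogData[i].tLogs.clear();
				modifiedState.oldTLogData[i].tLogs.push_back(coreSet);
				modifiedState.oldTLogData[i].tLogs[0].isLocal = true;
			}
		}
	}
}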


@@ -63,6 +63,7 @@
<ActorCompiler Include="workloads\Inventory.actor.cpp" />
<ActorCompiler Include="workloads\BulkLoad.actor.cpp" />
<ActorCompiler Include="workloads\MachineAttrition.actor.cpp" />
<ActorCompiler Include="workloads\KillRegion.actor.cpp" />
<ActorCompiler Include="workloads\ReadWrite.actor.cpp" />
<ClCompile Include="sqlite\btree.c">
<ExcludedFromBuild>true</ExcludedFromBuild>


@@ -27,6 +27,9 @@
<ActorCompiler Include="workloads\MachineAttrition.actor.cpp">
<Filter>workloads</Filter>
</ActorCompiler>
<ActorCompiler Include="workloads\KillRegion.actor.cpp">
<Filter>workloads</Filter>
</ActorCompiler>
<ActorCompiler Include="workloads\ReadWrite.actor.cpp">
<Filter>workloads</Filter>
</ActorCompiler>
@@ -368,4 +371,4 @@
<UniqueIdentifier>{de5e282f-8d97-4054-b795-0a75b772326f}</UniqueIdentifier>
</Filter>
</ItemGroup>
</Project>
</Project>


@@ -1282,8 +1282,9 @@ ACTOR Future<Void> masterCore( Reference<MasterData> self ) {
Void _ = wait(self->cstateUpdated.getFuture());
debug_advanceMinCommittedVersion(UID(), self->recoveryTransactionVersion);
if( debugResult )
TraceEvent(SevError, "DBRecoveryDurabilityError");
if( debugResult ) {
TraceEvent(self->forceRecovery ? SevWarn : SevError, "DBRecoveryDurabilityError");
}
TraceEvent("MasterCommittedTLogs", self->dbgid).detail("TLogs", self->logSystem->describe()).detail("RecoveryCount", self->cstate.myDBState.recoveryCount).detail("RecoveryTransactionVersion", self->recoveryTransactionVersion);


@@ -959,6 +959,8 @@ vector<TestSpec> readTests( ifstream& ifs ) {
TraceEvent("TestParserTest").detail("ParsedExtraDB", "");
} else if( attrib == "minimumReplication" ) {
TraceEvent("TestParserTest").detail("ParsedMinimumReplication", "");
} else if( attrib == "minimumRegions" ) {
TraceEvent("TestParserTest").detail("ParsedMinimumRegions", "");
} else if( attrib == "buggify" ) {
TraceEvent("TestParserTest").detail("ParsedBuggify", "");
} else if( attrib == "checkOnly" ) {


@@ -0,0 +1,104 @@
/*
* KillRegion.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "flow/actorcompiler.h"
#include "fdbclient/NativeAPI.h"
#include "fdbserver/TesterInterface.h"
#include "fdbserver/WorkerInterface.h"
#include "workloads.h"
#include "fdbrpc/simulator.h"
#include "fdbclient/ManagementAPI.h"
struct KillRegionWorkload : TestWorkload {
bool enabled;
double testDuration;
KillRegionWorkload( WorkloadContext const& wcx )
: TestWorkload(wcx)
{
enabled = !clientId && g_network->isSimulated(); // only do this on the "first" client, and only when in simulation
testDuration = getOption( options, LiteralStringRef("testDuration"), 10.0 );
g_simulator.usableRegions = 1;
}
virtual std::string description() { return "KillRegionWorkload"; }
virtual Future<Void> setup( Database const& cx ) {
if(enabled) {
return _setup( this, cx );
}
return Void();
}
virtual Future<Void> start( Database const& cx ) {
if(enabled) {
return killRegion( this, cx );
}
return Void();
}
virtual Future<bool> check( Database const& cx ) { return true; }
virtual void getMetrics( vector<PerfMetric>& m ) {
}
ACTOR static Future<Void> _setup( KillRegionWorkload *self, Database cx ) {
TraceEvent("ForceRecovery_DisablePrimaryBegin");
ConfigurationResult::Type _ = wait( changeConfig( cx, g_simulator.disablePrimary ) );
TraceEvent("ForceRecovery_WaitForRemote");
Void _ = wait( waitForPrimaryDC(cx, LiteralStringRef("1")) );
TraceEvent("ForceRecovery_DisablePrimaryComplete");
return Void();
}
ACTOR static Future<Void> killRegion( KillRegionWorkload *self, Database cx ) {
ASSERT( g_network->isSimulated() );
TraceEvent("ForceRecovery_DisableRemoteBegin");
ConfigurationResult::Type _ = wait( changeConfig( cx, g_simulator.disableRemote ) );
TraceEvent("ForceRecovery_WaitForPrimary");
Void _ = wait( waitForPrimaryDC(cx, LiteralStringRef("0")) );
TraceEvent("ForceRecovery_DisableRemoteComplete");
ConfigurationResult::Type _ = wait( changeConfig( cx, g_simulator.originalRegions ) );
TraceEvent("ForceRecovery_RestoreOriginalComplete");
Void _ = wait( delay( g_random->random01() * self->testDuration ) );
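// In simulation "0", "2", and "4" are all of the possible primary dcids (see the comment in
// SimulationConfig above), so these three kills take out the entire primary region.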
g_simulator.killDataCenter( LiteralStringRef("0"), ISimulator::RebootAndDelete, true );
g_simulator.killDataCenter( LiteralStringRef("2"), ISimulator::RebootAndDelete, true );
g_simulator.killDataCenter( LiteralStringRef("4"), ISimulator::RebootAndDelete, true );
loop {
TraceEvent("ForceRecovery_Begin");
Void _ = wait( forceRecovery(cx->cluster->getConnectionFile()) );
TraceEvent("ForceRecovery_Attempted");
state Transaction tr(cx);
try {
choose {
when( Version _ = wait(tr.getReadVersion()) ) {
TraceEvent("ForceRecovery_Complete");
break;
}
when( Void _ = wait(delay(120.0)) ) {}
}
} catch( Error &e ) {
Void _ = wait( tr.onError(e) );
}
}
return Void();
}
};
WorkloadFactory<KillRegionWorkload> KillRegionWorkloadFactory("KillRegion");


@@ -0,0 +1,12 @@
testTitle=KillRegionCycle
testName=Cycle
nodeCount=30000
transactionsPerSecond=2500.0
testDuration=30.0
expectedRate=0
clearAfterTest=false

testName=KillRegion
testDuration=30.0
minimumRegions=2