diff --git a/fdbclient/ManagementAPI.actor.cpp b/fdbclient/ManagementAPI.actor.cpp index 5bcde2fd0c..a81b4c0293 100644 --- a/fdbclient/ManagementAPI.actor.cpp +++ b/fdbclient/ManagementAPI.actor.cpp @@ -1398,6 +1398,27 @@ ACTOR Future forceRecovery (Reference clusterFile) } } +ACTOR Future waitForPrimaryDC( Database cx, StringRef dcId ) { + state ReadYourWritesTransaction tr(cx); + + loop { + try { + tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + Optional res = wait( tr.get(primaryDatacenterKey) ); + if(res.present() && res.get() == dcId) { + return Void(); + } + + state Future watchFuture = tr.watch(primaryDatacenterKey); + Void _ = wait(tr.commit()); + Void _ = wait(watchFuture); + tr.reset(); + } catch (Error& e) { + Void _ = wait( tr.onError(e) ); + } + } +} + json_spirit::Value_type normJSONType(json_spirit::Value_type type) { if (type == json_spirit::int_type) return json_spirit::real_type; diff --git a/fdbclient/ManagementAPI.h b/fdbclient/ManagementAPI.h index 18fce9d87a..6f1460cfac 100644 --- a/fdbclient/ManagementAPI.h +++ b/fdbclient/ManagementAPI.h @@ -165,6 +165,8 @@ Future setDDMode( Database const& cx, int const& mode ); Future forceRecovery (Reference const& clusterFile); +Future waitForPrimaryDC( Database const& cx, StringRef const& dcId ); + // Gets the cluster connection string Future> getCoordinators( Database const& cx ); diff --git a/fdbrpc/sim2.actor.cpp b/fdbrpc/sim2.actor.cpp index 368bdd539f..18f71fe146 100644 --- a/fdbrpc/sim2.actor.cpp +++ b/fdbrpc/sim2.actor.cpp @@ -1279,7 +1279,7 @@ public: } // Check if machine can be removed, if requested - if ((kt == KillInstantly) || (kt == InjectFaults) || (kt == RebootAndDelete) || (kt == RebootProcessAndDelete)) + if (!forceKill && ((kt == KillInstantly) || (kt == InjectFaults) || (kt == RebootAndDelete) || (kt == RebootProcessAndDelete))) { std::vector processesLeft, processesDead; int protectedWorker = 0, unavailable = 0, excluded = 0, cleared = 0; @@ -1377,7 +1377,7 @@ public: return true; } - virtual bool killDataCenter(Optional> dcId, KillType kt, KillType* ktFinal) { + virtual bool killDataCenter(Optional> dcId, KillType kt, bool forceKill, KillType* ktFinal) { auto ktOrig = kt; auto processes = getAllProcesses(); std::map>, int> datacenterZones; @@ -1400,7 +1400,7 @@ public: } // Check if machine can be removed, if requested - if ((kt == KillInstantly) || (kt == InjectFaults) || (kt == RebootAndDelete) || (kt == RebootProcessAndDelete)) + if (!forceKill && ((kt == KillInstantly) || (kt == InjectFaults) || (kt == RebootAndDelete) || (kt == RebootProcessAndDelete))) { std::vector processesLeft, processesDead; for (auto processInfo : getAllProcesses()) { diff --git a/fdbrpc/simulator.h b/fdbrpc/simulator.h index 8bc66d9eb2..8a515484f4 100644 --- a/fdbrpc/simulator.h +++ b/fdbrpc/simulator.h @@ -149,8 +149,8 @@ public: virtual void rebootProcess(Optional> zoneId, bool allProcesses ) = 0; virtual void rebootProcess( ProcessInfo* process, KillType kt ) = 0; virtual void killInterface( NetworkAddress address, KillType ) = 0; - virtual bool killMachine(Optional> zoneId, KillType, bool forceKill = false, KillType* ktFinal = NULL) = 0; - virtual bool killDataCenter(Optional> dcId, KillType kt, KillType* ktFinal = NULL) = 0; + virtual bool killMachine(Optional> zoneId, KillType kt, bool forceKill = false, KillType* ktFinal = NULL) = 0; + virtual bool killDataCenter(Optional> dcId, KillType kt, bool forceKill = false, KillType* ktFinal = NULL) = 0; //virtual KillType getMachineKillState( UID zoneID ) = 0; virtual bool canKillProcesses(std::vector const& availableProcesses, std::vector const& deadProcesses, KillType kt, KillType* newKillType) const = 0; virtual bool isAvailable() const = 0; @@ -285,6 +285,7 @@ public: int32_t usableRegions; std::string disablePrimary; std::string disableRemote; + std::string originalRegions; bool allowLogSetKills; Optional> remoteDcId; bool hasSatelliteReplication; diff --git a/fdbserver/Knobs.cpp b/fdbserver/Knobs.cpp index cb9c8e3019..dae5dcc0a1 100644 --- a/fdbserver/Knobs.cpp +++ b/fdbserver/Knobs.cpp @@ -236,6 +236,8 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) { init( COMMIT_TRANSACTION_BATCH_INTERVAL_SMOOTHER_ALPHA, 0.1 ); init( COMMIT_TRANSACTION_BATCH_COUNT_MAX, 32768 ); if( randomize && BUGGIFY ) COMMIT_TRANSACTION_BATCH_COUNT_MAX = 1000; // Do NOT increase this number beyond 32768, as CommitIds only budget 2 bytes for storing transaction id within each batch init( COMMIT_BATCHES_MEM_BYTES_HARD_LIMIT, 8LL << 30 ); if (randomize && BUGGIFY) COMMIT_BATCHES_MEM_BYTES_HARD_LIMIT = g_random->randomInt64(100LL << 20, 8LL << 30); + init( COMMIT_BATCHES_MEM_FRACTION_OF_TOTAL, 0.5 ); + init( COMMIT_BATCHES_MEM_TO_TOTAL_MEM_SCALE_FACTOR, 10.0 ); // these settings disable batch bytes scaling. Try COMMIT_TRANSACTION_BATCH_BYTES_MAX=1e6, COMMIT_TRANSACTION_BATCH_BYTES_SCALE_BASE=50000, COMMIT_TRANSACTION_BATCH_BYTES_SCALE_POWER=0.5? init( COMMIT_TRANSACTION_BATCH_BYTES_MIN, 100000 ); @@ -247,8 +249,8 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) { init( RESOLVER_COALESCE_TIME, 1.0 ); init( BUGGIFIED_ROW_LIMIT, APPLY_MUTATION_BYTES ); if( randomize && BUGGIFY ) BUGGIFIED_ROW_LIMIT = g_random->randomInt(3, 30); init( PROXY_SPIN_DELAY, 0.01 ); - init( COMMIT_BATCHES_MEM_FRACTION_OF_TOTAL, 0.5 ); - init( COMMIT_BATCHES_MEM_TO_TOTAL_MEM_SCALE_FACTOR, 10.0 ); + init( UPDATE_REMOTE_LOG_VERSION_INTERVAL, 2.0 ); + init( MAX_TXS_POP_VERSION_HISTORY, 1e5 ); // Master Server // masterCommitter() in the master server will allow lower priority tasks (e.g. DataDistibution) diff --git a/fdbserver/Knobs.h b/fdbserver/Knobs.h index ae189fc372..4b41be2c57 100644 --- a/fdbserver/Knobs.h +++ b/fdbserver/Knobs.h @@ -193,6 +193,8 @@ public: double RESOLVER_COALESCE_TIME; int BUGGIFIED_ROW_LIMIT; double PROXY_SPIN_DELAY; + double UPDATE_REMOTE_LOG_VERSION_INTERVAL; + int MAX_TXS_POP_VERSION_HISTORY; // Master Server double COMMIT_SLEEP_TIME; diff --git a/fdbserver/MasterProxyServer.actor.cpp b/fdbserver/MasterProxyServer.actor.cpp index 94cd78434d..67d240c985 100644 --- a/fdbserver/MasterProxyServer.actor.cpp +++ b/fdbserver/MasterProxyServer.actor.cpp @@ -203,6 +203,8 @@ struct ProxyCommitData { std::map> storageCache; std::map tag_popped; + Deque> txsPopVersions; + Version lastTxsPop; //The tag related to a storage server rarely change, so we keep a vector of tags for each key range to be slightly more CPU efficient. //When a tag related to a storage server does change, we empty out all of these vectors to signify they must be repopulated. @@ -231,7 +233,7 @@ struct ProxyCommitData { getConsistentReadVersion(getConsistentReadVersion), commit(commit), lastCoalesceTime(0), localCommitBatchesStarted(0), locked(false), firstProxy(firstProxy), cx(openDBOnServer(db, TaskDefaultEndpoint, true, true)), singleKeyMutationEvent(LiteralStringRef("SingleKeyMutation")), - commitBatchesMemBytesCount(0) + commitBatchesMemBytesCount(0), lastTxsPop(0) {} }; @@ -837,6 +839,14 @@ ACTOR Future commitBatch( } Void _ = wait(yield()); + if(!self->txsPopVersions.size() || msg.popTo > self->txsPopVersions.back().second) { + if(self->txsPopVersions.size() > SERVER_KNOBS->MAX_TXS_POP_VERSION_HISTORY) { + TraceEvent(SevWarnAlways, "DiscardingTxsPopHistory").suppressFor(1.0); + self->txsPopVersions.pop_front(); + } + + self->txsPopVersions.push_back(std::make_pair(commitVersion, msg.popTo)); + } self->logSystem->pop(msg.popTo, txsTag); /////// Phase 5: Replies (CPU bound; no particular order required, though ordered execution would be best for latency) @@ -1188,6 +1198,62 @@ ACTOR static Future readRequestServer( } } +ACTOR Future monitorRemoteCommitted(ProxyCommitData* self, Reference> db) { + loop { + Void _ = wait(delay(0)); //allow this actor to be cancelled if we are removed after db changes. + state Optional>> remoteLogs; + if(db->get().recoveryState >= RecoveryState::ALL_LOGS_RECRUITED) { + for(auto& logSet : db->get().logSystemConfig.tLogs) { + if(!logSet.isLocal) { + remoteLogs = logSet.tLogs; + for(auto& tLog : logSet.tLogs) { + if(!tLog.present()) { + remoteLogs = Optional>>(); + break; + } + } + break; + } + } + } + + if(!remoteLogs.present()) { + Void _ = wait(db->onChange()); + continue; + } + + state Future onChange = db->onChange(); + loop { + state std::vector> replies; + for(auto &it : remoteLogs.get()) { + replies.push_back(brokenPromiseToNever( it.interf().getQueuingMetrics.getReply( TLogQueuingMetricsRequest() ) )); + } + Void _ = wait( waitForAll(replies) ); + + if(onChange.isReady()) { + break; + } + + //FIXME: use the configuration to calculate a more precise minimum recovery version. + Version minVersion = std::numeric_limits::max(); + for(auto& it : replies) { + minVersion = std::min(minVersion, it.get().v); + } + + while(self->txsPopVersions.size() && self->txsPopVersions.front().first <= minVersion) { + self->lastTxsPop = self->txsPopVersions.front().second; + self->logSystem->pop(self->txsPopVersions.front().second, txsTag, 0, tagLocalityRemoteLog); + self->txsPopVersions.pop_front(); + } + + Void _ = wait( delay(SERVER_KNOBS->UPDATE_REMOTE_LOG_VERSION_INTERVAL) || onChange ); + if(onChange.isReady()) { + break; + } + } + } +} + ACTOR Future masterProxyServerCore( MasterProxyInterface proxy, MasterInterface master, @@ -1239,6 +1305,7 @@ ACTOR Future masterProxyServerCore( state int64_t commitBatchesMemoryLimit = std::min(SERVER_KNOBS->COMMIT_BATCHES_MEM_BYTES_HARD_LIMIT, static_cast((SERVER_KNOBS->SERVER_MEM_LIMIT * SERVER_KNOBS->COMMIT_BATCHES_MEM_FRACTION_OF_TOTAL) / SERVER_KNOBS->COMMIT_BATCHES_MEM_TO_TOTAL_MEM_SCALE_FACTOR)); TraceEvent(SevInfo, "CommitBatchesMemoryLimit").detail("BytesLimit", commitBatchesMemoryLimit); + addActor.send(monitorRemoteCommitted(&commitData, db)); addActor.send(transactionStarter(proxy, master, db, addActor, &commitData)); addActor.send(readRequestServer(proxy, &commitData)); @@ -1258,6 +1325,7 @@ ACTOR Future masterProxyServerCore( for(auto it : commitData.tag_popped) { commitData.logSystem->pop(it.second, it.first); } + commitData.logSystem->pop(commitData.lastTxsPop, txsTag, 0, tagLocalityRemoteLog); } } when(Void _ = wait(onError)) {} diff --git a/fdbserver/SimulatedCluster.actor.cpp b/fdbserver/SimulatedCluster.actor.cpp index 1e7ce30ba0..b49ee7900c 100644 --- a/fdbserver/SimulatedCluster.actor.cpp +++ b/fdbserver/SimulatedCluster.actor.cpp @@ -671,7 +671,7 @@ ACTOR Future restartSimulatedSystem( } struct SimulationConfig { - explicit SimulationConfig(int extraDB, int minimumReplication); + explicit SimulationConfig(int extraDB, int minimumReplication, int minimumRegions); int extraDB; DatabaseConfiguration db; @@ -684,11 +684,11 @@ struct SimulationConfig { int processes_per_machine; int coordinators; private: - void generateNormalConfig(int minimumReplication); + void generateNormalConfig(int minimumReplication, int minimumRegions); }; -SimulationConfig::SimulationConfig(int extraDB, int minimumReplication) : extraDB(extraDB) { - generateNormalConfig(minimumReplication); +SimulationConfig::SimulationConfig(int extraDB, int minimumReplication, int minimumRegions) : extraDB(extraDB) { + generateNormalConfig(minimumReplication, minimumRegions); } void SimulationConfig::set_config(std::string config) { @@ -703,10 +703,10 @@ StringRef StringRefOf(const char* s) { return StringRef((uint8_t*)s, strlen(s)); } -void SimulationConfig::generateNormalConfig(int minimumReplication) { +void SimulationConfig::generateNormalConfig(int minimumReplication, int minimumRegions) { set_config("new"); - bool generateFearless = g_random->random01() < 0.5; datacenters = generateFearless ? ( minimumReplication > 0 || g_random->random01() < 0.5 ? 4 : 6 ) : g_random->randomInt( 1, 4 ); + bool generateFearless = minimumRegions > 1 || g_random->random01() < 0.5; if (g_random->random01() < 0.25) db.desiredTLogCount = g_random->randomInt(1,7); if (g_random->random01() < 0.25) db.masterProxyCount = g_random->randomInt(1,7); if (g_random->random01() < 0.25) db.resolverCount = g_random->randomInt(1,7); @@ -759,6 +759,7 @@ void SimulationConfig::generateNormalConfig(int minimumReplication) { } if(generateFearless || (datacenters == 2 && g_random->random01() < 0.5)) { + //The kill region workload relies on the fact that all "0", "2", and "4" are all of the possible primary dcids. StatusObject primaryObj; StatusObject primaryDcObj; primaryDcObj["id"] = "0"; @@ -866,7 +867,7 @@ void SimulationConfig::generateNormalConfig(int minimumReplication) { } //We cannot run with a remote DC when MAX_READ_TRANSACTION_LIFE_VERSIONS is too small, because the log routers will not be able to keep up. - if (g_random->random01() < 0.25 || SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS < SERVER_KNOBS->VERSIONS_PER_SECOND) { + if (minimumRegions <= 1 && (g_random->random01() < 0.25 || SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS < SERVER_KNOBS->VERSIONS_PER_SECOND)) { TEST( true ); // Simulated cluster using one region needsRemote = false; } else { @@ -920,6 +921,8 @@ void SimulationConfig::generateNormalConfig(int minimumReplication) { set_config("regions=" + json_spirit::write_string(json_spirit::mValue(regionArr), json_spirit::Output_options::none)); if(needsRemote) { + g_simulator.originalRegions = "regions=" + json_spirit::write_string(json_spirit::mValue(regionArr), json_spirit::Output_options::none); + StatusArray disablePrimary = regionArr; disablePrimary[0].get_obj()["datacenters"].get_array()[0].get_obj()["priority"] = -1; g_simulator.disablePrimary = "regions=" + json_spirit::write_string(json_spirit::mValue(disablePrimary), json_spirit::Output_options::none); @@ -944,7 +947,7 @@ void SimulationConfig::generateNormalConfig(int minimumReplication) { } //because we protect a majority of coordinators from being killed, it is better to run with low numbers of coordinators to prevent too many processes from being protected - coordinators = BUGGIFY ? g_random->randomInt(1, machine_count+1) : 1; + coordinators = ( minimumRegions <= 1 && BUGGIFY ) ? g_random->randomInt(1, machine_count+1) : 1; if(minimumReplication > 1 && datacenters == 3) { //low latency tests in 3 data hall mode need 2 other data centers with 2 machines each to avoid waiting for logs to recover. @@ -961,10 +964,10 @@ void SimulationConfig::generateNormalConfig(int minimumReplication) { void setupSimulatedSystem( vector> *systemActors, std::string baseFolder, int* pTesterCount, Optional *pConnString, - Standalone *pStartingConfiguration, int extraDB, int minimumReplication, Reference tlsOptions) + Standalone *pStartingConfiguration, int extraDB, int minimumReplication, int minimumRegions, Reference tlsOptions) { // SOMEDAY: this does not test multi-interface configurations - SimulationConfig simconfig(extraDB, minimumReplication); + SimulationConfig simconfig(extraDB, minimumReplication, minimumRegions); StatusObject startingConfigJSON = simconfig.db.toJSON(true); std::string startingConfigString = "new"; for( auto kv : startingConfigJSON) { @@ -1038,14 +1041,26 @@ void setupSimulatedSystem( vector> *systemActors, std::string baseF TEST( !sslEnabled ); // SSL disabled vector coordinatorAddresses; - for( int dc = 0; dc < dataCenters; dc++ ) { - int machines = machineCount / dataCenters + (dc < machineCount % dataCenters); // add remainder of machines to first datacenter - int dcCoordinators = coordinatorCount / dataCenters + (dc < coordinatorCount%dataCenters); + if(minimumRegions > 1) { + //do not put coordinators in the primary region so that we can kill that region safely + int nonPrimaryDcs = dataCenters/2; + for( int dc = 1; dc < dataCenters; dc+=2 ) { + int dcCoordinators = coordinatorCount / nonPrimaryDcs + ((dc-1)/2 < coordinatorCount%nonPrimaryDcs); + for(int m = 0; m < dcCoordinators; m++) { + uint32_t ip = 2<<24 | dc<<16 | 1<<8 | m; + coordinatorAddresses.push_back(NetworkAddress(ip, 1, true, sslEnabled)); + TraceEvent("SelectedCoordinator").detail("Address", coordinatorAddresses.back()); + } + } + } else { + for( int dc = 0; dc < dataCenters; dc++ ) { + int dcCoordinators = coordinatorCount / dataCenters + (dc < coordinatorCount%dataCenters); - for(int m = 0; m < dcCoordinators; m++) { - uint32_t ip = 2<<24 | dc<<16 | 1<<8 | m; - coordinatorAddresses.push_back(NetworkAddress(ip, 1, true, sslEnabled)); - TraceEvent("SelectedCoordinator").detail("Address", coordinatorAddresses.back()); + for(int m = 0; m < dcCoordinators; m++) { + uint32_t ip = 2<<24 | dc<<16 | 1<<8 | m; + coordinatorAddresses.push_back(NetworkAddress(ip, 1, true, sslEnabled)); + TraceEvent("SelectedCoordinator").detail("Address", coordinatorAddresses.back()); + } } } @@ -1184,7 +1199,7 @@ void setupSimulatedSystem( vector> *systemActors, std::string baseF .detail("StartingConfiguration", pStartingConfiguration->toString()); } -void checkExtraDB(const char *testFile, int &extraDB, int &minimumReplication) { +void checkExtraDB(const char *testFile, int &extraDB, int &minimumReplication, int &minimumRegions) { std::ifstream ifs; ifs.open(testFile, std::ifstream::in); if (!ifs.good()) @@ -1212,6 +1227,10 @@ void checkExtraDB(const char *testFile, int &extraDB, int &minimumReplication) { if (attrib == "minimumReplication") { sscanf( value.c_str(), "%d", &minimumReplication ); } + + if (attrib == "minimumRegions") { + sscanf( value.c_str(), "%d", &minimumRegions ); + } } ifs.close(); @@ -1224,7 +1243,8 @@ ACTOR void setupAndRun(std::string dataFolder, const char *testFile, bool reboot state int testerCount = 1; state int extraDB = 0; state int minimumReplication = 0; - checkExtraDB(testFile, extraDB, minimumReplication); + state int minimumRegions = 0; + checkExtraDB(testFile, extraDB, minimumReplication, minimumRegions); Void _ = wait( g_simulator.onProcess( g_simulator.newProcess( "TestSystem", 0x01010101, 1, LocalityData(Optional>(), Standalone(g_random->randomUniqueID().toString()), Optional>(), Optional>()), ProcessClass(ProcessClass::TesterClass, ProcessClass::CommandLineSource), "", "" ), TaskDefaultYield ) ); @@ -1243,7 +1263,7 @@ ACTOR void setupAndRun(std::string dataFolder, const char *testFile, bool reboot } else { g_expect_full_pointermap = 1; - setupSimulatedSystem( &systemActors, dataFolder, &testerCount, &connFile, &startingConfiguration, extraDB, minimumReplication, tlsOptions ); + setupSimulatedSystem( &systemActors, dataFolder, &testerCount, &connFile, &startingConfiguration, extraDB, minimumReplication, minimumRegions, tlsOptions ); Void _ = wait( delay(1.0) ); // FIXME: WHY!!! //wait for machines to boot } std::string clusterFileDir = joinPath( dataFolder, g_random->randomUniqueID().toString() ); diff --git a/fdbserver/TLogServer.actor.cpp b/fdbserver/TLogServer.actor.cpp index 93a9a2060f..48502ab938 100644 --- a/fdbserver/TLogServer.actor.cpp +++ b/fdbserver/TLogServer.actor.cpp @@ -258,8 +258,6 @@ struct TLogData : NonCopyable { int64_t overheadBytesInput; int64_t overheadBytesDurable; - Version prevVersion; - struct peekTrackerData { std::map> sequence_version; double lastUpdate; @@ -277,7 +275,7 @@ struct TLogData : NonCopyable { TLogData(UID dbgid, IKeyValueStore* persistentData, IDiskQueue * persistentQueue, Reference> const& dbInfo) : dbgid(dbgid), instanceID(g_random->randomUniqueID().first()), persistentData(persistentData), rawPersistentQueue(persistentQueue), persistentQueue(new TLogQueue(persistentQueue, dbgid)), - dbInfo(dbInfo), queueCommitBegin(0), queueCommitEnd(0), prevVersion(0), + dbInfo(dbInfo), queueCommitBegin(0), queueCommitEnd(0), diskQueueCommitBytes(0), largeDiskQueueCommitBytes(false), bytesInput(0), bytesDurable(0), overheadBytesInput(0), overheadBytesDurable(0), updatePersist(Void()), concurrentLogRouterReads(SERVER_KNOBS->CONCURRENT_LOG_ROUTER_READS) { @@ -1204,7 +1202,6 @@ ACTOR Future tLogCommit( } // Notifies the commitQueue actor to commit persistentQueue, and also unblocks tLogPeekMessages actors - self->prevVersion = logData->version.get(); logData->version.set( req.version ); if(req.debugID.present()) @@ -1350,14 +1347,14 @@ ACTOR Future cleanupPeekTrackers( TLogData* self ) { } } -void getQueuingMetrics( TLogData* self, TLogQueuingMetricsRequest const& req ) { +void getQueuingMetrics( TLogData* self, Reference logData, TLogQueuingMetricsRequest const& req ) { TLogQueuingMetricsReply reply; reply.localTime = now(); reply.instanceID = self->instanceID; reply.bytesInput = self->bytesInput; reply.bytesDurable = self->bytesDurable; reply.storageBytes = self->persistentData->getStorageBytes(); - reply.v = self->prevVersion; + reply.v = logData->queueCommittedVersion.get(); req.reply.send( reply ); } @@ -1409,7 +1406,7 @@ ACTOR Future serveTLogInterface( TLogData* self, TLogInterface tli, Refere logData->addActor.send( tLogLock(self, reply, logData) ); } when (TLogQueuingMetricsRequest req = waitNext(tli.getQueuingMetrics.getFuture())) { - getQueuingMetrics(self, req); + getQueuingMetrics(self, logData, req); } when (TLogConfirmRunningRequest req = waitNext(tli.confirmRunning.getFuture())){ if (req.debugID.present() ) { @@ -1516,8 +1513,6 @@ ACTOR Future pullAsyncData( TLogData* self, Reference logData, st } // Notifies the commitQueue actor to commit persistentQueue, and also unblocks tLogPeekMessages actors - //FIXME: could we just use the ver and lastVer variables, or replace them with this? - self->prevVersion = logData->version.get(); logData->version.set( ver ); Void _ = wait( yield(TaskTLogCommit) ); } @@ -1554,8 +1549,6 @@ ACTOR Future pullAsyncData( TLogData* self, Reference logData, st } // Notifies the commitQueue actor to commit persistentQueue, and also unblocks tLogPeekMessages actors - //FIXME: could we just use the ver and lastVer variables, or replace them with this? - self->prevVersion = logData->version.get(); logData->version.set( ver ); Void _ = wait( yield(TaskTLogCommit) ); } @@ -1975,7 +1968,6 @@ ACTOR Future tLogStart( TLogData* self, InitializeTLogRequest req, Localit self->largeDiskQueueCommitBytes.set(true); } - self->prevVersion = logData->version.get(); logData->version.set( req.recoverAt ); } diff --git a/fdbserver/TagPartitionedLogSystem.actor.cpp b/fdbserver/TagPartitionedLogSystem.actor.cpp index c4ab2f406c..1486b187f3 100644 --- a/fdbserver/TagPartitionedLogSystem.actor.cpp +++ b/fdbserver/TagPartitionedLogSystem.actor.cpp @@ -809,9 +809,8 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedlocality == tagLocalitySpecial || t->locality == tag.locality || tag.locality < 0) { + if(t->locality == tagLocalitySpecial || t->locality == tag.locality || (tag.locality < 0 && ((popLocality == tagLocalityInvalid) == t->isLocal))) { for(auto& log : t->logServers) { Version prev = outstandingPops[std::make_pair(log->get().id(),tag)].first; if (prev < upTo) @@ -1200,13 +1199,11 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted= 0) { - bool remoteIsLocal = false; auto copiedLogs = modifiedState.tLogs; for(auto& coreSet : copiedLogs) { if(coreSet.locality != primaryLocality && coreSet.locality >= 0) { foundRemote = true; remoteLocality = coreSet.locality; - remoteIsLocal = coreSet.isLocal; modifiedState.tLogs.clear(); modifiedState.tLogs.push_back(coreSet); modifiedState.tLogs[0].isLocal = true; @@ -1216,14 +1213,11 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted= tagLocalitySpecial) { foundRemote = true; remoteLocality = coreSet.locality; - remoteIsLocal = coreSet.isLocal; if(coreSet.isLocal) { modifiedState.tLogs = modifiedState.oldTLogData[0].tLogs; modifiedState.logRouterTags = modifiedState.oldTLogData[0].logRouterTags; @@ -1242,14 +1236,13 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted 1) { modifiedState.oldTLogData[i].tLogs.clear(); modifiedState.oldTLogData[i].tLogs.push_back(coreSet); modifiedState.oldTLogData[i].tLogs[0].isLocal = true; diff --git a/fdbserver/fdbserver.vcxproj b/fdbserver/fdbserver.vcxproj index 6160eef365..368e6b2f61 100644 --- a/fdbserver/fdbserver.vcxproj +++ b/fdbserver/fdbserver.vcxproj @@ -63,6 +63,7 @@ + true diff --git a/fdbserver/fdbserver.vcxproj.filters b/fdbserver/fdbserver.vcxproj.filters index 6249237b7d..55c7e15281 100644 --- a/fdbserver/fdbserver.vcxproj.filters +++ b/fdbserver/fdbserver.vcxproj.filters @@ -27,6 +27,9 @@ workloads + + workloads + workloads @@ -368,4 +371,4 @@ {de5e282f-8d97-4054-b795-0a75b772326f} - \ No newline at end of file + diff --git a/fdbserver/masterserver.actor.cpp b/fdbserver/masterserver.actor.cpp index cf5b2f354f..e163b141a5 100644 --- a/fdbserver/masterserver.actor.cpp +++ b/fdbserver/masterserver.actor.cpp @@ -1282,8 +1282,9 @@ ACTOR Future masterCore( Reference self ) { Void _ = wait(self->cstateUpdated.getFuture()); debug_advanceMinCommittedVersion(UID(), self->recoveryTransactionVersion); - if( debugResult ) - TraceEvent(SevError, "DBRecoveryDurabilityError"); + if( debugResult ) { + TraceEvent(self->forceRecovery ? SevWarn : SevError, "DBRecoveryDurabilityError"); + } TraceEvent("MasterCommittedTLogs", self->dbgid).detail("TLogs", self->logSystem->describe()).detail("RecoveryCount", self->cstate.myDBState.recoveryCount).detail("RecoveryTransactionVersion", self->recoveryTransactionVersion); diff --git a/fdbserver/tester.actor.cpp b/fdbserver/tester.actor.cpp index be6cf740b2..c89c1a1f33 100644 --- a/fdbserver/tester.actor.cpp +++ b/fdbserver/tester.actor.cpp @@ -959,6 +959,8 @@ vector readTests( ifstream& ifs ) { TraceEvent("TestParserTest").detail("ParsedExtraDB", ""); } else if( attrib == "minimumReplication" ) { TraceEvent("TestParserTest").detail("ParsedMinimumReplication", ""); + } else if( attrib == "minimumRegions" ) { + TraceEvent("TestParserTest").detail("ParsedMinimumRegions", ""); } else if( attrib == "buggify" ) { TraceEvent("TestParserTest").detail("ParsedBuggify", ""); } else if( attrib == "checkOnly" ) { diff --git a/fdbserver/workloads/KillRegion.actor.cpp b/fdbserver/workloads/KillRegion.actor.cpp new file mode 100644 index 0000000000..235b1ea5a9 --- /dev/null +++ b/fdbserver/workloads/KillRegion.actor.cpp @@ -0,0 +1,104 @@ +/* + * KillRegion.actor.cpp + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "flow/actorcompiler.h" +#include "fdbclient/NativeAPI.h" +#include "fdbserver/TesterInterface.h" +#include "fdbserver/WorkerInterface.h" +#include "workloads.h" +#include "fdbrpc/simulator.h" +#include "fdbclient/ManagementAPI.h" + +struct KillRegionWorkload : TestWorkload { + bool enabled; + double testDuration; + + KillRegionWorkload( WorkloadContext const& wcx ) + : TestWorkload(wcx) + { + enabled = !clientId && g_network->isSimulated(); // only do this on the "first" client, and only when in simulation + testDuration = getOption( options, LiteralStringRef("testDuration"), 10.0 ); + g_simulator.usableRegions = 1; + } + + virtual std::string description() { return "KillRegionWorkload"; } + virtual Future setup( Database const& cx ) { + if(enabled) { + return _setup( this, cx ); + } + return Void(); + } + virtual Future start( Database const& cx ) { + if(enabled) { + return killRegion( this, cx ); + } + return Void(); + } + virtual Future check( Database const& cx ) { return true; } + virtual void getMetrics( vector& m ) { + } + + ACTOR static Future _setup( KillRegionWorkload *self, Database cx ) { + TraceEvent("ForceRecovery_DisablePrimaryBegin"); + ConfigurationResult::Type _ = wait( changeConfig( cx, g_simulator.disablePrimary ) ); + TraceEvent("ForceRecovery_WaitForRemote"); + Void _ = wait( waitForPrimaryDC(cx, LiteralStringRef("1")) ); + TraceEvent("ForceRecovery_DisablePrimaryComplete"); + return Void(); + } + + ACTOR static Future killRegion( KillRegionWorkload *self, Database cx ) { + ASSERT( g_network->isSimulated() ); + TraceEvent("ForceRecovery_DisableRemoteBegin"); + ConfigurationResult::Type _ = wait( changeConfig( cx, g_simulator.disableRemote ) ); + TraceEvent("ForceRecovery_WaitForPrimary"); + Void _ = wait( waitForPrimaryDC(cx, LiteralStringRef("0")) ); + TraceEvent("ForceRecovery_DisableRemoteComplete"); + ConfigurationResult::Type _ = wait( changeConfig( cx, g_simulator.originalRegions ) ); + TraceEvent("ForceRecovery_RestoreOriginalComplete"); + Void _ = wait( delay( g_random->random01() * self->testDuration ) ); + + g_simulator.killDataCenter( LiteralStringRef("0"), ISimulator::RebootAndDelete, true ); + g_simulator.killDataCenter( LiteralStringRef("2"), ISimulator::RebootAndDelete, true ); + g_simulator.killDataCenter( LiteralStringRef("4"), ISimulator::RebootAndDelete, true ); + + loop { + TraceEvent("ForceRecovery_Begin"); + Void _ = wait( forceRecovery(cx->cluster->getConnectionFile()) ); + TraceEvent("ForceRecovery_Attempted"); + state Transaction tr(cx); + try { + choose { + when( Version _ = wait(tr.getReadVersion()) ) { + TraceEvent("ForceRecovery_Complete"); + break; + } + when( Void _ = wait(delay(120.0)) ) {} + } + } catch( Error &e ) { + Void _ = wait( tr.onError(e) ); + } + } + + return Void(); + } +}; + +WorkloadFactory KillRegionWorkloadFactory("KillRegion"); diff --git a/tests/fast/KillRegionCycle.txt b/tests/fast/KillRegionCycle.txt new file mode 100644 index 0000000000..2768429b6a --- /dev/null +++ b/tests/fast/KillRegionCycle.txt @@ -0,0 +1,12 @@ +testTitle=KillRegionCycle + testName=Cycle + nodeCount=30000 + transactionsPerSecond=2500.0 + testDuration=30.0 + expectedRate=0 + clearAfterTest=false + + testName=KillRegion + testDuration=30.0 + +minimumRegions=2