From 3a572b010ff3501868a99792af6f118e67321715 Mon Sep 17 00:00:00 2001 From: Evan Tschannen <ejt@apple.com> Date: Tue, 19 Feb 2019 16:04:52 -0800 Subject: [PATCH] fix: a forced recovery needed to force the data distributor to restart --- fdbclient/ManagementAPI.actor.cpp | 24 ++-- fdbclient/SystemData.cpp | 2 - fdbclient/SystemData.h | 5 - fdbserver/ApplyMetadataMutation.h | 2 +- fdbserver/DataDistribution.actor.cpp | 164 +++++++++------------------ fdbserver/Ratekeeper.actor.cpp | 30 ++++- fdbserver/Ratekeeper.h | 3 +- fdbserver/masterserver.actor.cpp | 3 +- 8 files changed, 97 insertions(+), 136 deletions(-) diff --git a/fdbclient/ManagementAPI.actor.cpp b/fdbclient/ManagementAPI.actor.cpp index 79973b33ee..7589405274 100644 --- a/fdbclient/ManagementAPI.actor.cpp +++ b/fdbclient/ManagementAPI.actor.cpp @@ -287,7 +287,7 @@ ACTOR Future<ConfigurationResult::Type> changeConfig( Database cx, std::map<std: } state Future<Void> tooLong = delay(4.5); - state std::string versionKey = g_random->randomUniqueID().toString(); + state Key versionKey = BinaryWriter::toValue(g_random->randomUniqueID(),Unversioned()); loop { try { tr.setOption( FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE ); @@ -433,8 +433,8 @@ ACTOR Future<ConfigurationResult::Type> changeConfig( Database cx, std::map<std: for(auto i=m.begin(); i!=m.end(); ++i) tr.set( StringRef(i->first), StringRef(i->second) ); - tr.addReadConflictRange( singleKeyRange(configVersionKey) ); - tr.set( configVersionKey, versionKey ); + tr.addReadConflictRange( singleKeyRange(moveKeysLockOwnerKey) ); + tr.set( moveKeysLockOwnerKey, versionKey ); wait( tr.commit() ); break; @@ -702,7 +702,7 @@ ConfigureAutoResult parseConfig( StatusObject const& status ) { ACTOR Future<ConfigurationResult::Type> autoConfig( Database cx, ConfigureAutoResult conf ) { state Transaction tr(cx); - state std::string versionKey = g_random->randomUniqueID().toString(); + state Key versionKey = BinaryWriter::toValue(g_random->randomUniqueID(),Unversioned()); if(!conf.address_class.size()) return ConfigurationResult::INCOMPLETE_CONFIGURATION; //FIXME: correct return type @@ -752,8 +752,8 @@ ACTOR Future<ConfigurationResult::Type> autoConfig( Database cx, ConfigureAutoRe tr.set(kv.first, kv.second); } - tr.addReadConflictRange( singleKeyRange(configVersionKey) ); - tr.set( configVersionKey, versionKey ); + tr.addReadConflictRange( singleKeyRange(moveKeysLockOwnerKey) ); + tr.set( moveKeysLockOwnerKey, versionKey ); wait( tr.commit() ); return ConfigurationResult::SUCCESS; @@ -1132,7 +1132,7 @@ Reference<IQuorumChange> autoQuorumChange( int desired ) { return Reference<IQuo ACTOR Future<Void> excludeServers( Database cx, vector<AddressExclusion> servers ) { state Transaction tr(cx); - state std::string versionKey = g_random->randomUniqueID().toString(); + state Key versionKey = BinaryWriter::toValue(g_random->randomUniqueID(),Unversioned()); state std::string excludeVersionKey = g_random->randomUniqueID().toString(); loop { try { @@ -1141,8 +1141,8 @@ ACTOR Future<Void> excludeServers( Database cx, vector<AddressExclusion> servers tr.setOption( FDBTransactionOptions::LOCK_AWARE ); tr.addReadConflictRange( singleKeyRange(excludedServersVersionKey) ); //To conflict with parallel includeServers - tr.addReadConflictRange( singleKeyRange(configVersionKey) ); - tr.set( configVersionKey, versionKey ); + tr.addReadConflictRange( singleKeyRange(moveKeysLockOwnerKey) ); + tr.set( moveKeysLockOwnerKey, versionKey ); tr.set( excludedServersVersionKey, excludeVersionKey ); for(auto& s : servers) tr.set( encodeExcludedServersKey(s), StringRef() ); @@ -1160,7 +1160,7 @@ ACTOR Future<Void> excludeServers( Database cx, vector<AddressExclusion> servers ACTOR Future<Void> includeServers( Database cx, vector<AddressExclusion> servers ) { state bool includeAll = false; state Transaction tr(cx); - state std::string versionKey = g_random->randomUniqueID().toString(); + state Key versionKey = BinaryWriter::toValue(g_random->randomUniqueID(),Unversioned()); state std::string excludeVersionKey = g_random->randomUniqueID().toString(); loop { try { @@ -1171,9 +1171,9 @@ ACTOR Future<Void> includeServers( Database cx, vector<AddressExclusion> servers // includeServers might be used in an emergency transaction, so make sure it is retry-self-conflicting and CAUSAL_WRITE_RISKY tr.setOption( FDBTransactionOptions::CAUSAL_WRITE_RISKY ); tr.addReadConflictRange( singleKeyRange(excludedServersVersionKey) ); - tr.addReadConflictRange( singleKeyRange(configVersionKey) ); + tr.addReadConflictRange( singleKeyRange(moveKeysLockOwnerKey) ); - tr.set( configVersionKey, versionKey ); + tr.set( moveKeysLockOwnerKey, versionKey ); tr.set( excludedServersVersionKey, excludeVersionKey ); for(auto& s : servers ) { diff --git a/fdbclient/SystemData.cpp b/fdbclient/SystemData.cpp index bde1ddaa3d..ea4be9c8d8 100644 --- a/fdbclient/SystemData.cpp +++ b/fdbclient/SystemData.cpp @@ -381,8 +381,6 @@ std::string encodeExcludedServersKey( AddressExclusion const& addr ) { return excludedServersPrefix.toString() + as; } -const KeyRef configVersionKey = LiteralStringRef("\xff/conf/confChange"); - const KeyRangeRef workerListKeys( LiteralStringRef("\xff/worker/"), LiteralStringRef("\xff/worker0") ); const KeyRef workerListPrefix = workerListKeys.begin; diff --git a/fdbclient/SystemData.h b/fdbclient/SystemData.h index d3dff462f5..e136d83986 100644 --- a/fdbclient/SystemData.h +++ b/fdbclient/SystemData.h @@ -133,11 +133,6 @@ extern const KeyRef excludedServersVersionKey; // The value of this key shall b const AddressExclusion decodeExcludedServersKey( KeyRef const& key ); // where key.startsWith(excludedServersPrefix) std::string encodeExcludedServersKey( AddressExclusion const& ); -// "\xff/conf/confChange" := "" -// This is the key representing the version of the configuration, which should be updated for each -// new configuration. -extern const KeyRef configVersionKey; - // "\xff/workers/[[processID]]" := "" // Asynchronously updated by the cluster controller, this is a list of fdbserver processes that have joined the cluster // and are currently (recently) available diff --git a/fdbserver/ApplyMetadataMutation.h b/fdbserver/ApplyMetadataMutation.h index c8e4343be3..776ae371f6 100644 --- a/fdbserver/ApplyMetadataMutation.h +++ b/fdbserver/ApplyMetadataMutation.h @@ -149,7 +149,7 @@ static void applyMetadataMutations(UID const& dbgid, Arena &arena, VectorRef<Mut } else if (m.param1.startsWith(configKeysPrefix) || m.param1 == coordinatorsKey) { if(Optional<StringRef>(m.param2) != txnStateStore->readValue(m.param1).get().castTo<StringRef>()) { // FIXME: Make this check more specific, here or by reading configuration whenever there is a change - if(!m.param1.startsWith( excludedServersPrefix ) && m.param1 != excludedServersVersionKey && m.param1 != configVersionKey) { + if(!m.param1.startsWith( excludedServersPrefix ) && m.param1 != excludedServersVersionKey) { auto t = txnStateStore->readValue(m.param1).get(); TraceEvent("MutationRequiresRestart", dbgid).detail("M", m.toString()).detail("PrevValue", t.present() ? printable(t.get()) : "(none)").detail("ToCommit", toCommit!=NULL); if(confChange) *confChange = true; diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp index b010dcfe84..bc46c15bb5 100644 --- a/fdbserver/DataDistribution.actor.cpp +++ b/fdbserver/DataDistribution.actor.cpp @@ -3105,48 +3105,20 @@ ACTOR Future<Void> pollMoveKeysLock( Database cx, MoveKeysLock lock ) { ACTOR Future<Void> dataDistribution( Reference<AsyncVar<struct ServerDBInfo>> db, - UID myId, DatabaseConfiguration configuration, + UID myId, PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > serverChanges, - std::vector<Optional<Key>> primaryDcId, - std::vector<Optional<Key>> remoteDcIds, double* lastLimited) { state Database cx = openDBOnServer(db, TaskDataDistributionLaunch, true, true); cx->locationCacheSize = SERVER_KNOBS->DD_LOCATION_CACHE_SIZE; - state Transaction tr(cx); - loop { - try { - tr.setOption( FDBTransactionOptions::ACCESS_SYSTEM_KEYS ); - tr.setOption( FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE ); - - Standalone<RangeResultRef> replicaKeys = wait(tr.getRange(datacenterReplicasKeys, CLIENT_KNOBS->TOO_MANY)); - - for(auto& kv : replicaKeys) { - auto dcId = decodeDatacenterReplicasKey(kv.key); - auto replicas = decodeDatacenterReplicasValue(kv.value); - if((primaryDcId.size() && primaryDcId[0] == dcId) || (remoteDcIds.size() && remoteDcIds[0] == dcId && configuration.usableRegions > 1)) { - if(replicas > configuration.storageTeamSize) { - tr.set(kv.key, datacenterReplicasValue(configuration.storageTeamSize)); - } - } else { - tr.clear(kv.key); - } - } - - wait(tr.commit()); - break; - } - catch(Error &e) { - wait(tr.onError(e)); - } - } - - //cx->setOption( FDBDatabaseOptions::LOCATION_CACHE_SIZE, StringRef((uint8_t*) &SERVER_KNOBS->DD_LOCATION_CACHE_SIZE, 8) ); //ASSERT( cx->locationCacheSize == SERVER_KNOBS->DD_LOCATION_CACHE_SIZE ); //wait(debugCheckCoalescing(cx)); + state std::vector<Optional<Key>> primaryDcId; + state std::vector<Optional<Key>> remoteDcIds; + state DatabaseConfiguration configuration; loop { try { @@ -3154,6 +3126,50 @@ ACTOR Future<Void> dataDistribution( TraceEvent("DDInitTakingMoveKeysLock", myId); state MoveKeysLock lock = wait( takeMoveKeysLock( cx, myId ) ); TraceEvent("DDInitTookMoveKeysLock", myId); + + DatabaseConfiguration _configuration = wait( getDatabaseConfiguration(cx) ); + configuration = _configuration; + primaryDcId.clear(); + remoteDcIds.clear(); + const std::vector<RegionInfo>& regions = configuration.regions; + if ( configuration.regions.size() > 0 ) { + primaryDcId.push_back( regions[0].dcId ); + } + if ( configuration.regions.size() > 1 ) { + remoteDcIds.push_back( regions[1].dcId ); + } + + TraceEvent("DDInitGotConfiguration", myId).detail("Conf", configuration.toString()); + + state Transaction tr(cx); + loop { + try { + tr.setOption( FDBTransactionOptions::ACCESS_SYSTEM_KEYS ); + tr.setOption( FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE ); + + Standalone<RangeResultRef> replicaKeys = wait(tr.getRange(datacenterReplicasKeys, CLIENT_KNOBS->TOO_MANY)); + + for(auto& kv : replicaKeys) { + auto dcId = decodeDatacenterReplicasKey(kv.key); + auto replicas = decodeDatacenterReplicasValue(kv.value); + if((primaryDcId.size() && primaryDcId[0] == dcId) || (remoteDcIds.size() && remoteDcIds[0] == dcId && configuration.usableRegions > 1)) { + if(replicas > configuration.storageTeamSize) { + tr.set(kv.key, datacenterReplicasValue(configuration.storageTeamSize)); + } + } else { + tr.clear(kv.key); + } + } + + wait(tr.commit()); + break; + } + catch(Error &e) { + wait(tr.onError(e)); + } + } + + TraceEvent("DDInitUpdatedReplicaKeys", myId); state Reference<InitialDataDistribution> initData = wait( getInitialDataDistribution(cx, myId, lock, configuration.usableRegions > 1 ? remoteDcIds : std::vector<Optional<Key>>() ) ); if(initData->shards.size() > 1) { TraceEvent("DDInitGotInitialDD", myId) @@ -3277,64 +3293,13 @@ ACTOR Future<Void> dataDistribution( struct DataDistributorData : NonCopyable, ReferenceCounted<DataDistributorData> { Reference<AsyncVar<struct ServerDBInfo>> dbInfo; - Reference<AsyncVar<DatabaseConfiguration>> configuration; - std::vector<Optional<Key>> primaryDcId; - std::vector<Optional<Key>> remoteDcIds; UID ddId; PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > ddStorageServerChanges; PromiseStream<Future<Void>> addActor; - DataDistributorData(Reference<AsyncVar<ServerDBInfo>> const& db, Reference<AsyncVar<DatabaseConfiguration>> const& dbConfig, UID id) - : dbInfo(db), configuration(dbConfig), ddId(id) {} - - void refreshDcIds() { - primaryDcId.clear(); - remoteDcIds.clear(); - - const std::vector<RegionInfo>& regions = configuration->get().regions; - TraceEvent ev("DataDistributor", ddId); - if ( regions.size() > 0 ) { - primaryDcId.push_back( regions[0].dcId ); - ev.detail("PrimaryDcID", regions[0].dcId.toHexString()); - } - if ( regions.size() > 1 ) { - remoteDcIds.push_back( regions[1].dcId ); - ev.detail("SecondaryDcID", regions[1].dcId.toHexString()); - } - } + DataDistributorData(Reference<AsyncVar<ServerDBInfo>> const& db, UID id) : dbInfo(db), ddId(id) {} }; -ACTOR Future<Void> configurationMonitor( Reference<DataDistributorData> self ) { - state Database cx = openDBOnServer(self->dbInfo, TaskDefaultEndpoint, true, true); - loop { - state ReadYourWritesTransaction tr(cx); - - loop { - try { - TraceEvent("DataDistributor_MonitorConfigurationStart", self->ddId); - tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); - tr.setOption( FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE ); - Standalone<RangeResultRef> results = wait( tr.getRange( configKeys, CLIENT_KNOBS->TOO_MANY ) ); - ASSERT( !results.more && results.size() < CLIENT_KNOBS->TOO_MANY ); - - DatabaseConfiguration conf; - conf.fromKeyValues( (VectorRef<KeyValueRef>) results ); - if ( conf != self->configuration->get() ) { - TraceEvent("DataDistributor_UpdateConfiguration", self->ddId).detail("Config", conf.toString()); - self->configuration->set( conf ); - } - - state Future<Void> watchFuture = tr.watch(configVersionKey); - wait( tr.commit() ); - wait( watchFuture ); - break; - } catch (Error& e) { - wait( tr.onError(e) ); - } - } - } -} - static std::set<int> const& normalDataDistributorErrors() { static std::set<int> s; if (s.empty()) { @@ -3360,43 +3325,20 @@ static std::set<int> const& normalRateKeeperErrors() { ACTOR Future<Void> dataDistributor(DataDistributorInterface di, Reference<AsyncVar<struct ServerDBInfo>> db ) { state UID lastClusterControllerID(0,0); - state Reference<AsyncVar<DatabaseConfiguration>> configuration( new AsyncVar<DatabaseConfiguration>(DatabaseConfiguration()) ); - state Reference<DataDistributorData> self( new DataDistributorData(db, configuration, di.id()) ); + state Reference<DataDistributorData> self( new DataDistributorData(db, di.id()) ); state Future<Void> collection = actorCollection( self->addActor.getFuture() ); TraceEvent("DataDistributor_Starting", di.id()); self->addActor.send( waitFailureServer(di.waitFailure.getFuture()) ); - self->addActor.send( configurationMonitor( self ) ); - - loop choose { - when ( wait( self->configuration->onChange() ) ) { - self->refreshDcIds(); - break; - } - when ( wait(self->dbInfo->onChange()) ) {} - } try { TraceEvent("DataDistributor_Running", di.id()); state PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > ddStorageServerChanges; state double lastLimited = 0; - state Future<Void> distributor = reportErrorsExcept( dataDistribution( self->dbInfo, di.id(), self->configuration->get(), ddStorageServerChanges, self->primaryDcId, self->remoteDcIds, &lastLimited ), "DataDistribution", di.id(), &normalDataDistributorErrors() ); - self->addActor.send( reportErrorsExcept( rateKeeper( self->dbInfo, ddStorageServerChanges, di.getRateInfo.getFuture(), self->configuration->get(), &lastLimited ), "Ratekeeper", di.id(), &normalRateKeeperErrors() ) ); + state Future<Void> distributor = reportErrorsExcept( dataDistribution( self->dbInfo, di.id(), ddStorageServerChanges, &lastLimited ), "DataDistribution", di.id(), &normalDataDistributorErrors() ); + self->addActor.send( reportErrorsExcept( rateKeeper( self->dbInfo, ddStorageServerChanges, di.getRateInfo.getFuture(), &lastLimited ), "Ratekeeper", di.id(), &normalRateKeeperErrors() ) ); - loop choose { - when ( wait( self->configuration->onChange() ) ) { - TraceEvent("DataDistributor_Restart", di.id()) - .detail("ClusterControllerID", lastClusterControllerID) - .detail("Configuration", self->configuration->get().toString()); - self->refreshDcIds(); - distributor = reportErrorsExcept( dataDistribution( self->dbInfo, di.id(), self->configuration->get(), ddStorageServerChanges, self->primaryDcId, self->remoteDcIds, &lastLimited ), "DataDistribution", di.id(), &normalDataDistributorErrors() ); - } - when ( wait( collection ) ) { - ASSERT(false); - throw internal_error(); - } - when ( wait( distributor ) ) {} - } + wait( distributor || collection ); } catch ( Error &err ) { if ( normalDataDistributorErrors().count(err.code()) == 0 ) { diff --git a/fdbserver/Ratekeeper.actor.cpp b/fdbserver/Ratekeeper.actor.cpp index 2ebcb88c9a..ee74d1cd74 100644 --- a/fdbserver/Ratekeeper.actor.cpp +++ b/fdbserver/Ratekeeper.actor.cpp @@ -25,6 +25,7 @@ #include "fdbrpc/Smoother.h" #include "fdbserver/ServerDBInfo.h" #include "fdbrpc/simulator.h" +#include "fdbclient/ReadYourWrites.h" #include "flow/actorcompiler.h" // This must be the last #include. enum limitReason_t { @@ -505,11 +506,35 @@ void updateRate( Ratekeeper* self ) { } } +ACTOR Future<Void> configurationMonitor( Ratekeeper* self, Reference<AsyncVar<ServerDBInfo>> dbInfo ) { + state Database cx = openDBOnServer(dbInfo, TaskDefaultEndpoint, true, true); + loop { + state ReadYourWritesTransaction tr(cx); + + loop { + try { + tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + tr.setOption( FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE ); + Standalone<RangeResultRef> results = wait( tr.getRange( configKeys, CLIENT_KNOBS->TOO_MANY ) ); + ASSERT( !results.more && results.size() < CLIENT_KNOBS->TOO_MANY ); + + self->configuration.fromKeyValues( (VectorRef<KeyValueRef>) results ); + + state Future<Void> watchFuture = tr.watch(moveKeysLockOwnerKey); + wait( tr.commit() ); + wait( watchFuture ); + break; + } catch (Error& e) { + wait( tr.onError(e) ); + } + } + } +} + ACTOR Future<Void> rateKeeper( Reference<AsyncVar<ServerDBInfo>> dbInfo, PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > serverChanges, FutureStream< struct GetRateInfoRequest > getRateInfo, - DatabaseConfiguration configuration, double* lastLimited) { state Ratekeeper self; @@ -519,7 +544,7 @@ ACTOR Future<Void> rateKeeper( state std::vector<Future<Void>> tlogTrackers; state std::vector<TLogInterface> tlogInterfs; state Promise<Void> err; - self.configuration = configuration; + state Future<Void> configMonitor = configurationMonitor(&self, dbInfo); self.lastLimited = lastLimited; TraceEvent("RkTLogQueueSizeParameters").detail("Target", SERVER_KNOBS->TARGET_BYTES_PER_TLOG).detail("Spring", SERVER_KNOBS->SPRING_BYTES_TLOG) @@ -570,6 +595,7 @@ ACTOR Future<Void> rateKeeper( tlogTrackers.push_back( splitError( trackTLogQueueInfo(&self, tlogInterfs[i]), err ) ); } } + when(wait(configMonitor)) {} } } return Void(); diff --git a/fdbserver/Ratekeeper.h b/fdbserver/Ratekeeper.h index a034e7720d..282e99f766 100644 --- a/fdbserver/Ratekeeper.h +++ b/fdbserver/Ratekeeper.h @@ -30,7 +30,6 @@ Future<Void> rateKeeper( Reference<AsyncVar<struct ServerDBInfo>> const& dbInfo, PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > const& serverChanges, // actually an input, but we don't want broken_promise FutureStream< struct GetRateInfoRequest > const& getRateInfo, - DatabaseConfiguration const& configuration, double* const& lastLimited); -#endif \ No newline at end of file +#endif diff --git a/fdbserver/masterserver.actor.cpp b/fdbserver/masterserver.actor.cpp index be773ee8a4..500905a4d6 100644 --- a/fdbserver/masterserver.actor.cpp +++ b/fdbserver/masterserver.actor.cpp @@ -1196,7 +1196,7 @@ ACTOR Future<Void> configurationMonitor( Reference<MasterData> self ) { self->registrationTrigger.trigger(); } - state Future<Void> watchFuture = tr.watch(configVersionKey); + state Future<Void> watchFuture = tr.watch(moveKeysLockOwnerKey); wait(tr.commit()); wait(watchFuture); break; @@ -1299,6 +1299,7 @@ ACTOR Future<Void> masterCore( Reference<MasterData> self ) { if(self->forceRecovery) { tr.set(recoveryCommitRequest.arena, rebootWhenDurableKey, StringRef()); + tr.set(recoveryCommitRequest.arena, moveKeysLockOwnerKey, BinaryWriter::toValue(g_random->randomUniqueID(),Unversioned())); } } else { // Recruit and seed initial shard servers