fix: a forced recovery needed to force the data distributor to restart
This commit is contained in:
parent
8ed89fd711
commit
3a572b010f
|
@ -287,7 +287,7 @@ ACTOR Future<ConfigurationResult::Type> changeConfig( Database cx, std::map<std:
|
|||
}
|
||||
|
||||
state Future<Void> tooLong = delay(4.5);
|
||||
state std::string versionKey = g_random->randomUniqueID().toString();
|
||||
state Key versionKey = BinaryWriter::toValue(g_random->randomUniqueID(),Unversioned());
|
||||
loop {
|
||||
try {
|
||||
tr.setOption( FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE );
|
||||
|
@ -433,8 +433,8 @@ ACTOR Future<ConfigurationResult::Type> changeConfig( Database cx, std::map<std:
|
|||
for(auto i=m.begin(); i!=m.end(); ++i)
|
||||
tr.set( StringRef(i->first), StringRef(i->second) );
|
||||
|
||||
tr.addReadConflictRange( singleKeyRange(configVersionKey) );
|
||||
tr.set( configVersionKey, versionKey );
|
||||
tr.addReadConflictRange( singleKeyRange(moveKeysLockOwnerKey) );
|
||||
tr.set( moveKeysLockOwnerKey, versionKey );
|
||||
|
||||
wait( tr.commit() );
|
||||
break;
|
||||
|
@ -702,7 +702,7 @@ ConfigureAutoResult parseConfig( StatusObject const& status ) {
|
|||
|
||||
ACTOR Future<ConfigurationResult::Type> autoConfig( Database cx, ConfigureAutoResult conf ) {
|
||||
state Transaction tr(cx);
|
||||
state std::string versionKey = g_random->randomUniqueID().toString();
|
||||
state Key versionKey = BinaryWriter::toValue(g_random->randomUniqueID(),Unversioned());
|
||||
|
||||
if(!conf.address_class.size())
|
||||
return ConfigurationResult::INCOMPLETE_CONFIGURATION; //FIXME: correct return type
|
||||
|
@ -752,8 +752,8 @@ ACTOR Future<ConfigurationResult::Type> autoConfig( Database cx, ConfigureAutoRe
|
|||
tr.set(kv.first, kv.second);
|
||||
}
|
||||
|
||||
tr.addReadConflictRange( singleKeyRange(configVersionKey) );
|
||||
tr.set( configVersionKey, versionKey );
|
||||
tr.addReadConflictRange( singleKeyRange(moveKeysLockOwnerKey) );
|
||||
tr.set( moveKeysLockOwnerKey, versionKey );
|
||||
|
||||
wait( tr.commit() );
|
||||
return ConfigurationResult::SUCCESS;
|
||||
|
@ -1132,7 +1132,7 @@ Reference<IQuorumChange> autoQuorumChange( int desired ) { return Reference<IQuo
|
|||
|
||||
ACTOR Future<Void> excludeServers( Database cx, vector<AddressExclusion> servers ) {
|
||||
state Transaction tr(cx);
|
||||
state std::string versionKey = g_random->randomUniqueID().toString();
|
||||
state Key versionKey = BinaryWriter::toValue(g_random->randomUniqueID(),Unversioned());
|
||||
state std::string excludeVersionKey = g_random->randomUniqueID().toString();
|
||||
loop {
|
||||
try {
|
||||
|
@ -1141,8 +1141,8 @@ ACTOR Future<Void> excludeServers( Database cx, vector<AddressExclusion> servers
|
|||
tr.setOption( FDBTransactionOptions::LOCK_AWARE );
|
||||
|
||||
tr.addReadConflictRange( singleKeyRange(excludedServersVersionKey) ); //To conflict with parallel includeServers
|
||||
tr.addReadConflictRange( singleKeyRange(configVersionKey) );
|
||||
tr.set( configVersionKey, versionKey );
|
||||
tr.addReadConflictRange( singleKeyRange(moveKeysLockOwnerKey) );
|
||||
tr.set( moveKeysLockOwnerKey, versionKey );
|
||||
tr.set( excludedServersVersionKey, excludeVersionKey );
|
||||
for(auto& s : servers)
|
||||
tr.set( encodeExcludedServersKey(s), StringRef() );
|
||||
|
@ -1160,7 +1160,7 @@ ACTOR Future<Void> excludeServers( Database cx, vector<AddressExclusion> servers
|
|||
ACTOR Future<Void> includeServers( Database cx, vector<AddressExclusion> servers ) {
|
||||
state bool includeAll = false;
|
||||
state Transaction tr(cx);
|
||||
state std::string versionKey = g_random->randomUniqueID().toString();
|
||||
state Key versionKey = BinaryWriter::toValue(g_random->randomUniqueID(),Unversioned());
|
||||
state std::string excludeVersionKey = g_random->randomUniqueID().toString();
|
||||
loop {
|
||||
try {
|
||||
|
@ -1171,9 +1171,9 @@ ACTOR Future<Void> includeServers( Database cx, vector<AddressExclusion> servers
|
|||
// includeServers might be used in an emergency transaction, so make sure it is retry-self-conflicting and CAUSAL_WRITE_RISKY
|
||||
tr.setOption( FDBTransactionOptions::CAUSAL_WRITE_RISKY );
|
||||
tr.addReadConflictRange( singleKeyRange(excludedServersVersionKey) );
|
||||
tr.addReadConflictRange( singleKeyRange(configVersionKey) );
|
||||
tr.addReadConflictRange( singleKeyRange(moveKeysLockOwnerKey) );
|
||||
|
||||
tr.set( configVersionKey, versionKey );
|
||||
tr.set( moveKeysLockOwnerKey, versionKey );
|
||||
tr.set( excludedServersVersionKey, excludeVersionKey );
|
||||
|
||||
for(auto& s : servers ) {
|
||||
|
|
|
@ -381,8 +381,6 @@ std::string encodeExcludedServersKey( AddressExclusion const& addr ) {
|
|||
return excludedServersPrefix.toString() + as;
|
||||
}
|
||||
|
||||
const KeyRef configVersionKey = LiteralStringRef("\xff/conf/confChange");
|
||||
|
||||
const KeyRangeRef workerListKeys( LiteralStringRef("\xff/worker/"), LiteralStringRef("\xff/worker0") );
|
||||
const KeyRef workerListPrefix = workerListKeys.begin;
|
||||
|
||||
|
|
|
@ -133,11 +133,6 @@ extern const KeyRef excludedServersVersionKey; // The value of this key shall b
|
|||
const AddressExclusion decodeExcludedServersKey( KeyRef const& key ); // where key.startsWith(excludedServersPrefix)
|
||||
std::string encodeExcludedServersKey( AddressExclusion const& );
|
||||
|
||||
// "\xff/conf/confChange" := ""
|
||||
// This is the key representing the version of the configuration, which should be updated for each
|
||||
// new configuration.
|
||||
extern const KeyRef configVersionKey;
|
||||
|
||||
// "\xff/workers/[[processID]]" := ""
|
||||
// Asynchronously updated by the cluster controller, this is a list of fdbserver processes that have joined the cluster
|
||||
// and are currently (recently) available
|
||||
|
|
|
@ -149,7 +149,7 @@ static void applyMetadataMutations(UID const& dbgid, Arena &arena, VectorRef<Mut
|
|||
}
|
||||
else if (m.param1.startsWith(configKeysPrefix) || m.param1 == coordinatorsKey) {
|
||||
if(Optional<StringRef>(m.param2) != txnStateStore->readValue(m.param1).get().castTo<StringRef>()) { // FIXME: Make this check more specific, here or by reading configuration whenever there is a change
|
||||
if(!m.param1.startsWith( excludedServersPrefix ) && m.param1 != excludedServersVersionKey && m.param1 != configVersionKey) {
|
||||
if(!m.param1.startsWith( excludedServersPrefix ) && m.param1 != excludedServersVersionKey) {
|
||||
auto t = txnStateStore->readValue(m.param1).get();
|
||||
TraceEvent("MutationRequiresRestart", dbgid).detail("M", m.toString()).detail("PrevValue", t.present() ? printable(t.get()) : "(none)").detail("ToCommit", toCommit!=NULL);
|
||||
if(confChange) *confChange = true;
|
||||
|
|
|
@ -3105,48 +3105,20 @@ ACTOR Future<Void> pollMoveKeysLock( Database cx, MoveKeysLock lock ) {
|
|||
|
||||
ACTOR Future<Void> dataDistribution(
|
||||
Reference<AsyncVar<struct ServerDBInfo>> db,
|
||||
UID myId, DatabaseConfiguration configuration,
|
||||
UID myId,
|
||||
PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > serverChanges,
|
||||
std::vector<Optional<Key>> primaryDcId,
|
||||
std::vector<Optional<Key>> remoteDcIds,
|
||||
double* lastLimited)
|
||||
{
|
||||
state Database cx = openDBOnServer(db, TaskDataDistributionLaunch, true, true);
|
||||
cx->locationCacheSize = SERVER_KNOBS->DD_LOCATION_CACHE_SIZE;
|
||||
|
||||
state Transaction tr(cx);
|
||||
loop {
|
||||
try {
|
||||
tr.setOption( FDBTransactionOptions::ACCESS_SYSTEM_KEYS );
|
||||
tr.setOption( FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE );
|
||||
|
||||
Standalone<RangeResultRef> replicaKeys = wait(tr.getRange(datacenterReplicasKeys, CLIENT_KNOBS->TOO_MANY));
|
||||
|
||||
for(auto& kv : replicaKeys) {
|
||||
auto dcId = decodeDatacenterReplicasKey(kv.key);
|
||||
auto replicas = decodeDatacenterReplicasValue(kv.value);
|
||||
if((primaryDcId.size() && primaryDcId[0] == dcId) || (remoteDcIds.size() && remoteDcIds[0] == dcId && configuration.usableRegions > 1)) {
|
||||
if(replicas > configuration.storageTeamSize) {
|
||||
tr.set(kv.key, datacenterReplicasValue(configuration.storageTeamSize));
|
||||
}
|
||||
} else {
|
||||
tr.clear(kv.key);
|
||||
}
|
||||
}
|
||||
|
||||
wait(tr.commit());
|
||||
break;
|
||||
}
|
||||
catch(Error &e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
//cx->setOption( FDBDatabaseOptions::LOCATION_CACHE_SIZE, StringRef((uint8_t*) &SERVER_KNOBS->DD_LOCATION_CACHE_SIZE, 8) );
|
||||
//ASSERT( cx->locationCacheSize == SERVER_KNOBS->DD_LOCATION_CACHE_SIZE );
|
||||
|
||||
//wait(debugCheckCoalescing(cx));
|
||||
state std::vector<Optional<Key>> primaryDcId;
|
||||
state std::vector<Optional<Key>> remoteDcIds;
|
||||
state DatabaseConfiguration configuration;
|
||||
|
||||
loop {
|
||||
try {
|
||||
|
@ -3154,6 +3126,50 @@ ACTOR Future<Void> dataDistribution(
|
|||
TraceEvent("DDInitTakingMoveKeysLock", myId);
|
||||
state MoveKeysLock lock = wait( takeMoveKeysLock( cx, myId ) );
|
||||
TraceEvent("DDInitTookMoveKeysLock", myId);
|
||||
|
||||
DatabaseConfiguration _configuration = wait( getDatabaseConfiguration(cx) );
|
||||
configuration = _configuration;
|
||||
primaryDcId.clear();
|
||||
remoteDcIds.clear();
|
||||
const std::vector<RegionInfo>& regions = configuration.regions;
|
||||
if ( configuration.regions.size() > 0 ) {
|
||||
primaryDcId.push_back( regions[0].dcId );
|
||||
}
|
||||
if ( configuration.regions.size() > 1 ) {
|
||||
remoteDcIds.push_back( regions[1].dcId );
|
||||
}
|
||||
|
||||
TraceEvent("DDInitGotConfiguration", myId).detail("Conf", configuration.toString());
|
||||
|
||||
state Transaction tr(cx);
|
||||
loop {
|
||||
try {
|
||||
tr.setOption( FDBTransactionOptions::ACCESS_SYSTEM_KEYS );
|
||||
tr.setOption( FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE );
|
||||
|
||||
Standalone<RangeResultRef> replicaKeys = wait(tr.getRange(datacenterReplicasKeys, CLIENT_KNOBS->TOO_MANY));
|
||||
|
||||
for(auto& kv : replicaKeys) {
|
||||
auto dcId = decodeDatacenterReplicasKey(kv.key);
|
||||
auto replicas = decodeDatacenterReplicasValue(kv.value);
|
||||
if((primaryDcId.size() && primaryDcId[0] == dcId) || (remoteDcIds.size() && remoteDcIds[0] == dcId && configuration.usableRegions > 1)) {
|
||||
if(replicas > configuration.storageTeamSize) {
|
||||
tr.set(kv.key, datacenterReplicasValue(configuration.storageTeamSize));
|
||||
}
|
||||
} else {
|
||||
tr.clear(kv.key);
|
||||
}
|
||||
}
|
||||
|
||||
wait(tr.commit());
|
||||
break;
|
||||
}
|
||||
catch(Error &e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
|
||||
TraceEvent("DDInitUpdatedReplicaKeys", myId);
|
||||
state Reference<InitialDataDistribution> initData = wait( getInitialDataDistribution(cx, myId, lock, configuration.usableRegions > 1 ? remoteDcIds : std::vector<Optional<Key>>() ) );
|
||||
if(initData->shards.size() > 1) {
|
||||
TraceEvent("DDInitGotInitialDD", myId)
|
||||
|
@ -3277,64 +3293,13 @@ ACTOR Future<Void> dataDistribution(
|
|||
|
||||
struct DataDistributorData : NonCopyable, ReferenceCounted<DataDistributorData> {
|
||||
Reference<AsyncVar<struct ServerDBInfo>> dbInfo;
|
||||
Reference<AsyncVar<DatabaseConfiguration>> configuration;
|
||||
std::vector<Optional<Key>> primaryDcId;
|
||||
std::vector<Optional<Key>> remoteDcIds;
|
||||
UID ddId;
|
||||
PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > ddStorageServerChanges;
|
||||
PromiseStream<Future<Void>> addActor;
|
||||
|
||||
DataDistributorData(Reference<AsyncVar<ServerDBInfo>> const& db, Reference<AsyncVar<DatabaseConfiguration>> const& dbConfig, UID id)
|
||||
: dbInfo(db), configuration(dbConfig), ddId(id) {}
|
||||
|
||||
void refreshDcIds() {
|
||||
primaryDcId.clear();
|
||||
remoteDcIds.clear();
|
||||
|
||||
const std::vector<RegionInfo>& regions = configuration->get().regions;
|
||||
TraceEvent ev("DataDistributor", ddId);
|
||||
if ( regions.size() > 0 ) {
|
||||
primaryDcId.push_back( regions[0].dcId );
|
||||
ev.detail("PrimaryDcID", regions[0].dcId.toHexString());
|
||||
}
|
||||
if ( regions.size() > 1 ) {
|
||||
remoteDcIds.push_back( regions[1].dcId );
|
||||
ev.detail("SecondaryDcID", regions[1].dcId.toHexString());
|
||||
}
|
||||
}
|
||||
DataDistributorData(Reference<AsyncVar<ServerDBInfo>> const& db, UID id) : dbInfo(db), ddId(id) {}
|
||||
};
|
||||
|
||||
ACTOR Future<Void> configurationMonitor( Reference<DataDistributorData> self ) {
|
||||
state Database cx = openDBOnServer(self->dbInfo, TaskDefaultEndpoint, true, true);
|
||||
loop {
|
||||
state ReadYourWritesTransaction tr(cx);
|
||||
|
||||
loop {
|
||||
try {
|
||||
TraceEvent("DataDistributor_MonitorConfigurationStart", self->ddId);
|
||||
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr.setOption( FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE );
|
||||
Standalone<RangeResultRef> results = wait( tr.getRange( configKeys, CLIENT_KNOBS->TOO_MANY ) );
|
||||
ASSERT( !results.more && results.size() < CLIENT_KNOBS->TOO_MANY );
|
||||
|
||||
DatabaseConfiguration conf;
|
||||
conf.fromKeyValues( (VectorRef<KeyValueRef>) results );
|
||||
if ( conf != self->configuration->get() ) {
|
||||
TraceEvent("DataDistributor_UpdateConfiguration", self->ddId).detail("Config", conf.toString());
|
||||
self->configuration->set( conf );
|
||||
}
|
||||
|
||||
state Future<Void> watchFuture = tr.watch(configVersionKey);
|
||||
wait( tr.commit() );
|
||||
wait( watchFuture );
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
wait( tr.onError(e) );
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static std::set<int> const& normalDataDistributorErrors() {
|
||||
static std::set<int> s;
|
||||
if (s.empty()) {
|
||||
|
@ -3360,43 +3325,20 @@ static std::set<int> const& normalRateKeeperErrors() {
|
|||
|
||||
ACTOR Future<Void> dataDistributor(DataDistributorInterface di, Reference<AsyncVar<struct ServerDBInfo>> db ) {
|
||||
state UID lastClusterControllerID(0,0);
|
||||
state Reference<AsyncVar<DatabaseConfiguration>> configuration( new AsyncVar<DatabaseConfiguration>(DatabaseConfiguration()) );
|
||||
state Reference<DataDistributorData> self( new DataDistributorData(db, configuration, di.id()) );
|
||||
state Reference<DataDistributorData> self( new DataDistributorData(db, di.id()) );
|
||||
state Future<Void> collection = actorCollection( self->addActor.getFuture() );
|
||||
|
||||
TraceEvent("DataDistributor_Starting", di.id());
|
||||
self->addActor.send( waitFailureServer(di.waitFailure.getFuture()) );
|
||||
self->addActor.send( configurationMonitor( self ) );
|
||||
|
||||
loop choose {
|
||||
when ( wait( self->configuration->onChange() ) ) {
|
||||
self->refreshDcIds();
|
||||
break;
|
||||
}
|
||||
when ( wait(self->dbInfo->onChange()) ) {}
|
||||
}
|
||||
|
||||
try {
|
||||
TraceEvent("DataDistributor_Running", di.id());
|
||||
state PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > ddStorageServerChanges;
|
||||
state double lastLimited = 0;
|
||||
state Future<Void> distributor = reportErrorsExcept( dataDistribution( self->dbInfo, di.id(), self->configuration->get(), ddStorageServerChanges, self->primaryDcId, self->remoteDcIds, &lastLimited ), "DataDistribution", di.id(), &normalDataDistributorErrors() );
|
||||
self->addActor.send( reportErrorsExcept( rateKeeper( self->dbInfo, ddStorageServerChanges, di.getRateInfo.getFuture(), self->configuration->get(), &lastLimited ), "Ratekeeper", di.id(), &normalRateKeeperErrors() ) );
|
||||
state Future<Void> distributor = reportErrorsExcept( dataDistribution( self->dbInfo, di.id(), ddStorageServerChanges, &lastLimited ), "DataDistribution", di.id(), &normalDataDistributorErrors() );
|
||||
self->addActor.send( reportErrorsExcept( rateKeeper( self->dbInfo, ddStorageServerChanges, di.getRateInfo.getFuture(), &lastLimited ), "Ratekeeper", di.id(), &normalRateKeeperErrors() ) );
|
||||
|
||||
loop choose {
|
||||
when ( wait( self->configuration->onChange() ) ) {
|
||||
TraceEvent("DataDistributor_Restart", di.id())
|
||||
.detail("ClusterControllerID", lastClusterControllerID)
|
||||
.detail("Configuration", self->configuration->get().toString());
|
||||
self->refreshDcIds();
|
||||
distributor = reportErrorsExcept( dataDistribution( self->dbInfo, di.id(), self->configuration->get(), ddStorageServerChanges, self->primaryDcId, self->remoteDcIds, &lastLimited ), "DataDistribution", di.id(), &normalDataDistributorErrors() );
|
||||
}
|
||||
when ( wait( collection ) ) {
|
||||
ASSERT(false);
|
||||
throw internal_error();
|
||||
}
|
||||
when ( wait( distributor ) ) {}
|
||||
}
|
||||
wait( distributor || collection );
|
||||
}
|
||||
catch ( Error &err ) {
|
||||
if ( normalDataDistributorErrors().count(err.code()) == 0 ) {
|
||||
|
|
|
@ -25,6 +25,7 @@
|
|||
#include "fdbrpc/Smoother.h"
|
||||
#include "fdbserver/ServerDBInfo.h"
|
||||
#include "fdbrpc/simulator.h"
|
||||
#include "fdbclient/ReadYourWrites.h"
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
||||
enum limitReason_t {
|
||||
|
@ -505,11 +506,35 @@ void updateRate( Ratekeeper* self ) {
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> configurationMonitor( Ratekeeper* self, Reference<AsyncVar<ServerDBInfo>> dbInfo ) {
|
||||
state Database cx = openDBOnServer(dbInfo, TaskDefaultEndpoint, true, true);
|
||||
loop {
|
||||
state ReadYourWritesTransaction tr(cx);
|
||||
|
||||
loop {
|
||||
try {
|
||||
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr.setOption( FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE );
|
||||
Standalone<RangeResultRef> results = wait( tr.getRange( configKeys, CLIENT_KNOBS->TOO_MANY ) );
|
||||
ASSERT( !results.more && results.size() < CLIENT_KNOBS->TOO_MANY );
|
||||
|
||||
self->configuration.fromKeyValues( (VectorRef<KeyValueRef>) results );
|
||||
|
||||
state Future<Void> watchFuture = tr.watch(moveKeysLockOwnerKey);
|
||||
wait( tr.commit() );
|
||||
wait( watchFuture );
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
wait( tr.onError(e) );
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> rateKeeper(
|
||||
Reference<AsyncVar<ServerDBInfo>> dbInfo,
|
||||
PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > serverChanges,
|
||||
FutureStream< struct GetRateInfoRequest > getRateInfo,
|
||||
DatabaseConfiguration configuration,
|
||||
double* lastLimited)
|
||||
{
|
||||
state Ratekeeper self;
|
||||
|
@ -519,7 +544,7 @@ ACTOR Future<Void> rateKeeper(
|
|||
state std::vector<Future<Void>> tlogTrackers;
|
||||
state std::vector<TLogInterface> tlogInterfs;
|
||||
state Promise<Void> err;
|
||||
self.configuration = configuration;
|
||||
state Future<Void> configMonitor = configurationMonitor(&self, dbInfo);
|
||||
self.lastLimited = lastLimited;
|
||||
|
||||
TraceEvent("RkTLogQueueSizeParameters").detail("Target", SERVER_KNOBS->TARGET_BYTES_PER_TLOG).detail("Spring", SERVER_KNOBS->SPRING_BYTES_TLOG)
|
||||
|
@ -570,6 +595,7 @@ ACTOR Future<Void> rateKeeper(
|
|||
tlogTrackers.push_back( splitError( trackTLogQueueInfo(&self, tlogInterfs[i]), err ) );
|
||||
}
|
||||
}
|
||||
when(wait(configMonitor)) {}
|
||||
}
|
||||
}
|
||||
return Void();
|
||||
|
|
|
@ -30,7 +30,6 @@ Future<Void> rateKeeper(
|
|||
Reference<AsyncVar<struct ServerDBInfo>> const& dbInfo,
|
||||
PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > const& serverChanges, // actually an input, but we don't want broken_promise
|
||||
FutureStream< struct GetRateInfoRequest > const& getRateInfo,
|
||||
DatabaseConfiguration const& configuration,
|
||||
double* const& lastLimited);
|
||||
|
||||
#endif
|
||||
#endif
|
||||
|
|
|
@ -1196,7 +1196,7 @@ ACTOR Future<Void> configurationMonitor( Reference<MasterData> self ) {
|
|||
self->registrationTrigger.trigger();
|
||||
}
|
||||
|
||||
state Future<Void> watchFuture = tr.watch(configVersionKey);
|
||||
state Future<Void> watchFuture = tr.watch(moveKeysLockOwnerKey);
|
||||
wait(tr.commit());
|
||||
wait(watchFuture);
|
||||
break;
|
||||
|
@ -1299,6 +1299,7 @@ ACTOR Future<Void> masterCore( Reference<MasterData> self ) {
|
|||
|
||||
if(self->forceRecovery) {
|
||||
tr.set(recoveryCommitRequest.arena, rebootWhenDurableKey, StringRef());
|
||||
tr.set(recoveryCommitRequest.arena, moveKeysLockOwnerKey, BinaryWriter::toValue(g_random->randomUniqueID(),Unversioned()));
|
||||
}
|
||||
} else {
|
||||
// Recruit and seed initial shard servers
|
||||
|
|
Loading…
Reference in New Issue