fix: storage servers need to be rebooted when replication is increased so that clients become aware that new options are available
parent 1cf5689d62
commit 19ae063b66
@@ -395,6 +395,8 @@ const KeyRef minRequiredCommitVersionKey = LiteralStringRef("\xff/minRequiredCom
const KeyRef globalKeysPrefix = LiteralStringRef("\xff/globals");
const KeyRef lastEpochEndKey = LiteralStringRef("\xff/globals/lastEpochEnd");
const KeyRef lastEpochEndPrivateKey = LiteralStringRef("\xff\xff/globals/lastEpochEnd");
const KeyRef rebootWhenDurableKey = LiteralStringRef("\xff/globals/rebootWhenDurable");
const KeyRef rebootWhenDurablePrivateKey = LiteralStringRef("\xff\xff/globals/rebootWhenDurable");
const KeyRef fastLoggingEnabled = LiteralStringRef("\xff/globals/fastLoggingEnabled");
const KeyRef fastLoggingEnabledPrivateKey = LiteralStringRef("\xff\xff/globals/fastLoggingEnabled");
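For context, the new keys follow the convention already visible in this hunk: the writable global key lives under "\xff/globals/", and its private counterpart (the form the storage-server hunk further below matches against) is the same key with one extra "\xff" byte in front. A minimal standalone sketch of that relationship; privateKeyFor is an illustrative helper, not part of the patch:

#include <cassert>
#include <string>

// Illustrative only: derive the private variant of a "\xff/globals/..." key
// by prepending one more "\xff" byte, matching the key pairs defined above.
std::string privateKeyFor(const std::string& systemKey) {
	return std::string("\xff") + systemKey;
}

int main() {
	const std::string rebootWhenDurableKey        = "\xff/globals/rebootWhenDurable";
	const std::string rebootWhenDurablePrivateKey = "\xff\xff/globals/rebootWhenDurable";
	assert(privateKeyFor(rebootWhenDurableKey) == rebootWhenDurablePrivateKey);
	return 0;
}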
@@ -148,6 +148,8 @@ std::pair<vector<std::pair<UID, NetworkAddress>>,vector<std::pair<UID, NetworkAd
extern const KeyRef globalKeysPrefix;
extern const KeyRef lastEpochEndKey;
extern const KeyRef lastEpochEndPrivateKey;
extern const KeyRef rebootWhenDurableKey;
extern const KeyRef rebootWhenDurablePrivateKey;
extern const KeyRef fastLoggingEnabled;
extern const KeyRef fastLoggingEnabledPrivateKey;
@@ -2100,10 +2100,18 @@ ACTOR Future<Void> updateReplicasKey(DDTeamCollection* self, Optional<Key> dcId)
	state Transaction tr(self->cx);
	loop {
		try {
			tr.addReadConflictRange(singleKeyRange(datacenterReplicasKeyFor(dcId)));
			Optional<Value> val = wait( tr.get(datacenterReplicasKeyFor(dcId)) );
			state int oldReplicas = val.present() ? decodeDatacenterReplicasValue(val.get()) : 0;
			if(oldReplicas == self->configuration.storageTeamSize) {
				TraceEvent("DDUpdatedAlready", self->masterId).detail("DcId", printable(dcId)).detail("Replicas", self->configuration.storageTeamSize);
				return Void();
			}
			if(oldReplicas < self->configuration.storageTeamSize) {
				tr.set(rebootWhenDurableKey, StringRef());
			}
			tr.set(datacenterReplicasKeyFor(dcId), datacenterReplicasValue(self->configuration.storageTeamSize));
			Void _ = wait( tr.commit() );
			TraceEvent("DDUpdatedReplicas", self->masterId).detail("DcId", printable(dcId)).detail("Replicas", self->configuration.storageTeamSize);
			TraceEvent("DDUpdatedReplicas", self->masterId).detail("DcId", printable(dcId)).detail("Replicas", self->configuration.storageTeamSize).detail("OldReplicas", oldReplicas);
			return Void();
		} catch( Error &e ) {
			Void _ = wait( tr.onError(e) );
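The hunk above is the data-distribution side of the change: inside the usual transaction retry loop it reads the current per-datacenter replica count, returns early if it already equals configuration.storageTeamSize, and writes rebootWhenDurableKey only when the count is growing. A minimal standalone sketch of just that decision; needsReboot is a hypothetical name, not in the patch:

#include <cassert>

// Illustrative only: storage servers are asked to reboot only when the
// replica count grows; an unchanged count is a no-op and a shrinking count
// updates the replicas key without requesting a reboot.
bool needsReboot(int oldReplicas, int newStorageTeamSize) {
	return oldReplicas < newStorageTeamSize;
}

int main() {
	assert(!needsReboot(3, 3)); // "DDUpdatedAlready" path, nothing to do
	assert( needsReboot(1, 3)); // replication increased: set rebootWhenDurableKey
	assert(!needsReboot(3, 2)); // replication decreased: no reboot requested
	return 0;
}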
@@ -347,6 +347,7 @@ public:
	NotifiedVersion desiredOldestVersion; // We can increase oldestVersion (and then durableVersion) to this version when the disk permits
	NotifiedVersion oldestVersion; // See also storageVersion()
	NotifiedVersion durableVersion; // At least this version will be readable from storage after a power failure
	Version rebootAfterDurableVersion;

	Deque<std::pair<Version,Version>> recoveryVersionSkips;
	int64_t versionLag; // An estimate for how many versions it takes for the data to move from the logs to this storage server
@@ -465,6 +466,7 @@ public:
	: instanceID(g_random->randomUniqueID().first()),
		storage(this, storage), db(db),
		lastTLogVersion(0), lastVersionWithData(0), restoredVersion(0),
		rebootAfterDurableVersion(std::numeric_limits<Version>::max()),
		durableInProgress(Void()),
		versionLag(0),
		updateEagerReads(0),
@@ -2252,16 +2254,6 @@ struct OrderByVersion {
	}
};

bool containsRollback( VersionUpdateRef const& changes, Version& rollbackVersion ) {
	for(auto it = changes.mutations.begin(); it; ++it)
		if (it->type == it->SetValue && it->param1 == lastEpochEndKey) {
			BinaryReader br(it->param2, Unversioned());
			br >> rollbackVersion;
			return true;
		}
	return false;
}

class StorageUpdater {
public:
	StorageUpdater() : fromVersion(invalidVersion), currentVersion(invalidVersion), restoredVersion(invalidVersion), processedStartKey(false) {}
@@ -2347,6 +2339,9 @@ private:
			bool matchesThisServer = decodeServerTagKey(m.param1.substr(1)) == data->thisServerID;
			if( (m.type == MutationRef::SetValue && !matchesThisServer) || (m.type == MutationRef::ClearRange && matchesThisServer) )
				throw worker_removed();
		} else if (m.type == MutationRef::SetValue && m.param1 == rebootWhenDurablePrivateKey) {
			data->rebootAfterDurableVersion = currentVersion;
			TraceEvent("RebootWhenDurableSet", data->thisServerID).detail("DurableVersion", data->durableVersion.get()).detail("RebootAfterDurableVersion", data->rebootAfterDurableVersion);
		} else {
			ASSERT(false); // Unknown private mutation
		}
@@ -2665,7 +2660,12 @@ ACTOR Future<Void> updateStorage(StorageServer* data) {
		Void _ = wait( durable );

		debug_advanceMinCommittedVersion( data->thisServerID, newOldestVersion );

		if(newOldestVersion > data->rebootAfterDurableVersion) {
			TraceEvent("RebootWhenDurableTriggered", data->thisServerID).detail("NewOldestVersion", newOldestVersion).detail("RebootAfterDurableVersion", data->rebootAfterDurableVersion);
			throw please_reboot();
		}

		durableInProgress.send(Void());
		Void _ = wait( delay(0, TaskUpdateStorage) ); //Setting durableInProgess could cause the storage server to shut down, so delay to check for cancellation
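The last two hunks are the storage-server side of the mechanism: applying the rebootWhenDurablePrivateKey mutation records the version at which it arrived in rebootAfterDurableVersion (initialized to the maximum Version in the constructor hunk above), and updateStorage throws please_reboot() only once the newly durable version has moved past that mark, so the reboot request itself should already be durable when the server restarts. A standalone sketch of that gate; RebootGate is an illustrative stand-in for the StorageServer fields, not part of the patch:

#include <cassert>
#include <cstdint>
#include <limits>

using Version = int64_t;

// Illustrative only: mirrors the two storage-server hunks above.
struct RebootGate {
	// Same sentinel as the constructor hunk: no reboot requested yet.
	Version rebootAfterDurableVersion = std::numeric_limits<Version>::max();

	// Corresponds to the rebootWhenDurablePrivateKey branch in the private-mutation hunk.
	void onRebootWhenDurable(Version currentVersion) {
		rebootAfterDurableVersion = currentVersion;
	}

	// Corresponds to the check in updateStorage before throwing please_reboot().
	bool shouldReboot(Version newOldestVersion) const {
		return newOldestVersion > rebootAfterDurableVersion;
	}
};

int main() {
	RebootGate gate;
	assert(!gate.shouldReboot(100)); // nothing requested yet
	gate.onRebootWhenDurable(150);   // private mutation applied at version 150
	assert(!gate.shouldReboot(150)); // the request itself is not yet durably past
	assert( gate.shouldReboot(151)); // durable beyond the request: reboot
	return 0;
}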