fix: because of forced recoveries, storage servers in remote regions cannot update their durable version to (lastLogVersion - 5e6), because the lastLogVersion might have jumped due to an epoch end and the recovery version after the forced recovery could be before the epoch end, causing the storage server to want to rollback to a version it does not have on disk
This commit is contained in:
parent
05ca0a10d8
commit
4c35ebdcc6
|
@ -422,6 +422,8 @@ const KeyRef killStorageKey = LiteralStringRef("\xff/globals/killStorage");
|
|||
const KeyRef killStoragePrivateKey = LiteralStringRef("\xff\xff/globals/killStorage");
|
||||
const KeyRef rebootWhenDurableKey = LiteralStringRef("\xff/globals/rebootWhenDurable");
|
||||
const KeyRef rebootWhenDurablePrivateKey = LiteralStringRef("\xff\xff/globals/rebootWhenDurable");
|
||||
const KeyRef primaryLocalityKey = LiteralStringRef("\xff/globals/primaryLocality");
|
||||
const KeyRef primaryLocalityPrivateKey = LiteralStringRef("\xff\xff/globals/primaryLocality");
|
||||
const KeyRef fastLoggingEnabled = LiteralStringRef("\xff/globals/fastLoggingEnabled");
|
||||
const KeyRef fastLoggingEnabledPrivateKey = LiteralStringRef("\xff\xff/globals/fastLoggingEnabled");
|
||||
|
||||
|
|
|
@ -158,6 +158,8 @@ extern const KeyRef killStorageKey;
|
|||
extern const KeyRef killStoragePrivateKey;
|
||||
extern const KeyRef rebootWhenDurableKey;
|
||||
extern const KeyRef rebootWhenDurablePrivateKey;
|
||||
extern const KeyRef primaryLocalityKey;
|
||||
extern const KeyRef primaryLocalityPrivateKey;
|
||||
extern const KeyRef fastLoggingEnabled;
|
||||
extern const KeyRef fastLoggingEnabledPrivateKey;
|
||||
|
||||
|
|
|
@ -221,6 +221,7 @@ struct MasterData : NonCopyable, ReferenceCounted<MasterData> {
|
|||
Reference<AsyncVar<bool>> recruitmentStalled;
|
||||
bool forceRecovery;
|
||||
int8_t safeLocality;
|
||||
int8_t primaryLocality;
|
||||
bool neverCreated;
|
||||
|
||||
MasterData(
|
||||
|
@ -241,6 +242,7 @@ struct MasterData : NonCopyable, ReferenceCounted<MasterData> {
|
|||
dbId(dbId),
|
||||
forceRecovery(forceRecovery),
|
||||
safeLocality(tagLocalityInvalid),
|
||||
primaryLocality(tagLocalityInvalid),
|
||||
neverCreated(false),
|
||||
lastEpochEnd(invalidVersion),
|
||||
recoveryTransactionVersion(invalidVersion),
|
||||
|
@ -317,10 +319,12 @@ ACTOR Future<Void> newTLogServers( Reference<MasterData> self, RecruitFromConfig
|
|||
|
||||
Future<RecruitRemoteFromConfigurationReply> fRemoteWorkers = brokenPromiseToNever( self->clusterController.recruitRemoteFromConfiguration.getReply( RecruitRemoteFromConfigurationRequest( self->configuration, remoteDcId, recr.tLogs.size() * std::max<int>(1, self->configuration.desiredLogRouterCount / std::max<int>(1, recr.tLogs.size())) ) ) );
|
||||
|
||||
Reference<ILogSystem> newLogSystem = wait( oldLogSystem->newEpoch( recr, fRemoteWorkers, self->configuration, self->cstate.myDBState.recoveryCount + 1, self->dcId_locality[recr.dcId], self->dcId_locality[remoteDcId], self->allTags, self->recruitmentStalled ) );
|
||||
self->primaryLocality = self->dcId_locality[recr.dcId];
|
||||
Reference<ILogSystem> newLogSystem = wait( oldLogSystem->newEpoch( recr, fRemoteWorkers, self->configuration, self->cstate.myDBState.recoveryCount + 1, self->primaryLocality, self->dcId_locality[remoteDcId], self->allTags, self->recruitmentStalled ) );
|
||||
self->logSystem = newLogSystem;
|
||||
} else {
|
||||
Reference<ILogSystem> newLogSystem = wait( oldLogSystem->newEpoch( recr, Never(), self->configuration, self->cstate.myDBState.recoveryCount + 1, tagLocalitySpecial, tagLocalitySpecial, self->allTags, self->recruitmentStalled ) );
|
||||
self->primaryLocality = tagLocalitySpecial;
|
||||
Reference<ILogSystem> newLogSystem = wait( oldLogSystem->newEpoch( recr, Never(), self->configuration, self->cstate.myDBState.recoveryCount + 1, self->primaryLocality, tagLocalitySpecial, self->allTags, self->recruitmentStalled ) );
|
||||
self->logSystem = newLogSystem;
|
||||
}
|
||||
return Void();
|
||||
|
@ -1268,6 +1272,7 @@ ACTOR Future<Void> masterCore( Reference<MasterData> self ) {
|
|||
tr.write_conflict_ranges.append_deep(recoveryCommitRequest.arena, itr.write_conflict_ranges.begin(), itr.write_conflict_ranges.size());
|
||||
}
|
||||
|
||||
tr.set(recoveryCommitRequest.arena, primaryLocalityKey, BinaryWriter::toValue(self->primaryLocality, Unversioned()));
|
||||
tr.set(recoveryCommitRequest.arena, backupVersionKey, backupVersionValue);
|
||||
tr.set(recoveryCommitRequest.arena, coordinatorsKey, self->coordinators.ccf->getConnectionString().toString());
|
||||
tr.set(recoveryCommitRequest.arena, logsKey, self->logSystem->getLogsValue());
|
||||
|
|
|
@ -364,6 +364,7 @@ public:
|
|||
NotifiedVersion oldestVersion; // See also storageVersion()
|
||||
NotifiedVersion durableVersion; // At least this version will be readable from storage after a power failure
|
||||
Version rebootAfterDurableVersion;
|
||||
int8_t primaryLocality;
|
||||
|
||||
Deque<std::pair<Version,Version>> recoveryVersionSkips;
|
||||
int64_t versionLag; // An estimate for how many versions it takes for the data to move from the logs to this storage server
|
||||
|
@ -484,7 +485,7 @@ public:
|
|||
lastTLogVersion(0), lastVersionWithData(0), restoredVersion(0),
|
||||
rebootAfterDurableVersion(std::numeric_limits<Version>::max()),
|
||||
durableInProgress(Void()),
|
||||
versionLag(0),
|
||||
versionLag(0), primaryLocality(tagLocalityInvalid),
|
||||
updateEagerReads(0),
|
||||
shardChangeCounter(0),
|
||||
fetchKeysParallelismLock(SERVER_KNOBS->FETCH_KEYS_PARALLELISM_BYTES),
|
||||
|
@ -2274,6 +2275,23 @@ struct OrderByVersion {
|
|||
}
|
||||
};
|
||||
|
||||
#define PERSIST_PREFIX "\xff\xff"
|
||||
|
||||
// Immutable
|
||||
static const KeyValueRef persistFormat( LiteralStringRef( PERSIST_PREFIX "Format" ), LiteralStringRef("FoundationDB/StorageServer/1/4") );
|
||||
static const KeyRangeRef persistFormatReadableRange( LiteralStringRef("FoundationDB/StorageServer/1/2"), LiteralStringRef("FoundationDB/StorageServer/1/5") );
|
||||
static const KeyRef persistID = LiteralStringRef( PERSIST_PREFIX "ID" );
|
||||
|
||||
// (Potentially) change with the durable version or when fetchKeys completes
|
||||
static const KeyRef persistVersion = LiteralStringRef( PERSIST_PREFIX "Version" );
|
||||
static const KeyRangeRef persistShardAssignedKeys = KeyRangeRef( LiteralStringRef( PERSIST_PREFIX "ShardAssigned/" ), LiteralStringRef( PERSIST_PREFIX "ShardAssigned0" ) );
|
||||
static const KeyRangeRef persistShardAvailableKeys = KeyRangeRef( LiteralStringRef( PERSIST_PREFIX "ShardAvailable/" ), LiteralStringRef( PERSIST_PREFIX "ShardAvailable0" ) );
|
||||
static const KeyRangeRef persistByteSampleKeys = KeyRangeRef( LiteralStringRef( PERSIST_PREFIX "BS/" ), LiteralStringRef( PERSIST_PREFIX "BS0" ) );
|
||||
static const KeyRangeRef persistByteSampleSampleKeys = KeyRangeRef( LiteralStringRef( PERSIST_PREFIX "BS/" PERSIST_PREFIX "BS/" ), LiteralStringRef( PERSIST_PREFIX "BS/" PERSIST_PREFIX "BS0" ) );
|
||||
static const KeyRef persistLogProtocol = LiteralStringRef(PERSIST_PREFIX "LogProtocol");
|
||||
static const KeyRef persistPrimaryLocality = LiteralStringRef( PERSIST_PREFIX "PrimaryLocality" );
|
||||
// data keys are unmangled (but never start with PERSIST_PREFIX because they are always in allKeys)
|
||||
|
||||
class StorageUpdater {
|
||||
public:
|
||||
StorageUpdater() : fromVersion(invalidVersion), currentVersion(invalidVersion), restoredVersion(invalidVersion), processedStartKey(false) {}
|
||||
|
@ -2364,6 +2382,10 @@ private:
|
|||
} else if (m.type == MutationRef::SetValue && m.param1 == rebootWhenDurablePrivateKey) {
|
||||
data->rebootAfterDurableVersion = currentVersion;
|
||||
TraceEvent("RebootWhenDurableSet", data->thisServerID).detail("DurableVersion", data->durableVersion.get()).detail("RebootAfterDurableVersion", data->rebootAfterDurableVersion);
|
||||
} else if (m.type == MutationRef::SetValue && m.param1 == primaryLocalityPrivateKey) {
|
||||
data->primaryLocality = BinaryReader::fromStringRef<int8_t>(m.param2, Unversioned());
|
||||
auto& mLV = data->addVersionToMutationLog( data->data().getLatestVersion() );
|
||||
data->addMutationToMutationLog( mLV, MutationRef(MutationRef::SetValue, persistPrimaryLocality, m.param2) );
|
||||
} else {
|
||||
ASSERT(false); // Unknown private mutation
|
||||
}
|
||||
|
@ -2611,7 +2633,10 @@ ACTOR Future<Void> update( StorageServer* data, bool* pReceivedUpdate )
|
|||
// .detail("MaxVersionInMemory", maxVersionsInMemory);
|
||||
|
||||
// Trigger updateStorage if necessary
|
||||
Version proposedOldestVersion = std::max(data->version.get(), data->lastTLogVersion) - maxVersionsInMemory;
|
||||
Version proposedOldestVersion = data->version.get() - maxVersionsInMemory;
|
||||
if(data->primaryLocality == tagLocalitySpecial || data->tag.locality == data->primaryLocality) {
|
||||
proposedOldestVersion = std::max(proposedOldestVersion, data->lastTLogVersion - maxVersionsInMemory);
|
||||
}
|
||||
proposedOldestVersion = std::min(proposedOldestVersion, data->version.get()-1);
|
||||
proposedOldestVersion = std::max(proposedOldestVersion, data->oldestVersion.get());
|
||||
proposedOldestVersion = std::max(proposedOldestVersion, data->desiredOldestVersion.get());
|
||||
|
@ -2718,22 +2743,6 @@ ACTOR Future<Void> updateStorage(StorageServer* data) {
|
|||
////////////////////////////////// StorageServerDisk ///////////////////////////////////////
|
||||
#pragma region StorageServerDisk
|
||||
|
||||
#define PERSIST_PREFIX "\xff\xff"
|
||||
|
||||
// Immutable
|
||||
static const KeyValueRef persistFormat( LiteralStringRef( PERSIST_PREFIX "Format" ), LiteralStringRef("FoundationDB/StorageServer/1/4") );
|
||||
static const KeyRangeRef persistFormatReadableRange( LiteralStringRef("FoundationDB/StorageServer/1/2"), LiteralStringRef("FoundationDB/StorageServer/1/5") );
|
||||
static const KeyRef persistID = LiteralStringRef( PERSIST_PREFIX "ID" );
|
||||
|
||||
// (Potentially) change with the durable version or when fetchKeys completes
|
||||
static const KeyRef persistVersion = LiteralStringRef( PERSIST_PREFIX "Version" );
|
||||
static const KeyRangeRef persistShardAssignedKeys = KeyRangeRef( LiteralStringRef( PERSIST_PREFIX "ShardAssigned/" ), LiteralStringRef( PERSIST_PREFIX "ShardAssigned0" ) );
|
||||
static const KeyRangeRef persistShardAvailableKeys = KeyRangeRef( LiteralStringRef( PERSIST_PREFIX "ShardAvailable/" ), LiteralStringRef( PERSIST_PREFIX "ShardAvailable0" ) );
|
||||
static const KeyRangeRef persistByteSampleKeys = KeyRangeRef( LiteralStringRef( PERSIST_PREFIX "BS/" ), LiteralStringRef( PERSIST_PREFIX "BS0" ) );
|
||||
static const KeyRangeRef persistByteSampleSampleKeys = KeyRangeRef( LiteralStringRef( PERSIST_PREFIX "BS/" PERSIST_PREFIX "BS/" ), LiteralStringRef( PERSIST_PREFIX "BS/" PERSIST_PREFIX "BS0" ) );
|
||||
static const KeyRef persistLogProtocol = LiteralStringRef(PERSIST_PREFIX "LogProtocol");
|
||||
// data keys are unmangled (but never start with PERSIST_PREFIX because they are always in allKeys)
|
||||
|
||||
void StorageServerDisk::makeNewStorageServerDurable() {
|
||||
storage->set( persistFormat );
|
||||
storage->set( KeyValueRef(persistID, BinaryWriter::toValue(data->thisServerID, Unversioned())) );
|
||||
|
@ -2921,6 +2930,7 @@ ACTOR Future<bool> restoreDurableState( StorageServer* data, IKeyValueStore* sto
|
|||
state Future<Optional<Value>> fID = storage->readValue(persistID);
|
||||
state Future<Optional<Value>> fVersion = storage->readValue(persistVersion);
|
||||
state Future<Optional<Value>> fLogProtocol = storage->readValue(persistLogProtocol);
|
||||
state Future<Optional<Value>> fPrimaryLocality = storage->readValue(persistPrimaryLocality);
|
||||
state Future<Standalone<VectorRef<KeyValueRef>>> fShardAssigned = storage->readRange(persistShardAssignedKeys);
|
||||
state Future<Standalone<VectorRef<KeyValueRef>>> fShardAvailable = storage->readRange(persistShardAvailableKeys);
|
||||
|
||||
|
@ -2928,7 +2938,7 @@ ACTOR Future<bool> restoreDurableState( StorageServer* data, IKeyValueStore* sto
|
|||
data->byteSampleRecovery = restoreByteSample(data, storage, byteSampleSampleRecovered);
|
||||
|
||||
TraceEvent("ReadingDurableState", data->thisServerID);
|
||||
wait( waitForAll( (vector<Future<Optional<Value>>>(), fFormat, fID, fVersion, fLogProtocol) ) );
|
||||
wait( waitForAll( (vector<Future<Optional<Value>>>(), fFormat, fID, fVersion, fLogProtocol, fPrimaryLocality) ) );
|
||||
wait( waitForAll( (vector<Future<Standalone<VectorRef<KeyValueRef>>>>(), fShardAssigned, fShardAvailable) ) );
|
||||
wait( byteSampleSampleRecovered.getFuture() );
|
||||
TraceEvent("RestoringDurableState", data->thisServerID);
|
||||
|
@ -2951,6 +2961,9 @@ ACTOR Future<bool> restoreDurableState( StorageServer* data, IKeyValueStore* sto
|
|||
if (fLogProtocol.get().present())
|
||||
data->logProtocol = BinaryReader::fromStringRef<uint64_t>(fLogProtocol.get().get(), Unversioned());
|
||||
|
||||
if (fPrimaryLocality.get().present())
|
||||
data->primaryLocality = BinaryReader::fromStringRef<int8_t>(fPrimaryLocality.get().get(), Unversioned());
|
||||
|
||||
state Version version = BinaryReader::fromStringRef<Version>( fVersion.get().get(), Unversioned() );
|
||||
debug_checkRestoredVersion( data->thisServerID, version, "StorageServer" );
|
||||
data->setInitialVersion( version );
|
||||
|
|
Loading…
Reference in New Issue