fix: persist the known committed version on the tlogs
This commit is contained in:
parent
a6d9e889f0
commit
8d350ceb5f
|
@ -187,6 +187,7 @@ static const KeyRangeRef persistRecoveryCountKeys = KeyRangeRef( LiteralStringRe
|
|||
|
||||
// Updated on updatePersistentData()
|
||||
static const KeyRangeRef persistCurrentVersionKeys = KeyRangeRef( LiteralStringRef( "version/" ), LiteralStringRef( "version0" ) );
|
||||
static const KeyRangeRef persistKnownCommittedVersionKeys = KeyRangeRef( LiteralStringRef( "knownCommitted/" ), LiteralStringRef( "knownCommitted0" ) );
|
||||
static const KeyRangeRef persistUnrecoveredBeforeVersionKeys = KeyRangeRef( LiteralStringRef( "UnrecoveredBefore/" ), LiteralStringRef( "UnrecoveredBefore0" ) );
|
||||
static const KeyRangeRef persistLogRouterTagsKeys = KeyRangeRef( LiteralStringRef( "LogRouterTags/" ), LiteralStringRef( "LogRouterTags0" ) );
|
||||
static const KeyRange persistTagMessagesKeys = prefixRange(LiteralStringRef("TagMsg/"));
|
||||
|
@ -443,6 +444,7 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
|
|||
if(!tLogData->terminated) {
|
||||
Key logIdKey = BinaryWriter::toValue(logId,Unversioned());
|
||||
tLogData->persistentData->clear( singleKeyRange(logIdKey.withPrefix(persistCurrentVersionKeys.begin)) );
|
||||
tLogData->persistentData->clear( singleKeyRange(logIdKey.withPrefix(persistKnownCommittedVersionKeys.begin)) );
|
||||
tLogData->persistentData->clear( singleKeyRange(logIdKey.withPrefix(persistUnrecoveredBeforeVersionKeys.begin)) );
|
||||
tLogData->persistentData->clear( singleKeyRange(logIdKey.withPrefix(persistLogRouterTagsKeys.begin)) );
|
||||
tLogData->persistentData->clear( singleKeyRange(logIdKey.withPrefix(persistRecoveryCountKeys.begin)) );
|
||||
|
@ -589,7 +591,8 @@ ACTOR Future<Void> updatePersistentData( TLogData* self, Reference<LogData> logD
|
|||
}
|
||||
}
|
||||
|
||||
self->persistentData->set( KeyValueRef( BinaryWriter::toValue(logData->logId,Unversioned()).withPrefix(persistCurrentVersionKeys.begin), BinaryWriter::toValue(newPersistentDataVersion, Unversioned()) ) );
|
||||
self->persistentData->set( KeyValueRef( BinaryWriter::toValue(logData->logId,Unversioned()).withPrefix(persistCurrentVersionKeys.begin), BinaryWriter::toValue(newPersistentDataVersion, Unversioned()) ) );
|
||||
self->persistentData->set( KeyValueRef( BinaryWriter::toValue(logData->logId,Unversioned()).withPrefix(persistKnownCommittedVersionKeys.begin), BinaryWriter::toValue(logData->knownCommittedVersion, Unversioned()) ) );
|
||||
logData->persistentDataVersion = newPersistentDataVersion;
|
||||
|
||||
Void _ = wait( self->persistentData->commit() ); // SOMEDAY: This seems to be running pretty often, should we slow it down???
|
||||
|
@ -1220,7 +1223,8 @@ ACTOR Future<Void> initPersistentState( TLogData* self, Reference<LogData> logDa
|
|||
// PERSIST: Initial setup of persistentData for a brand new tLog for a new database
|
||||
IKeyValueStore *storage = self->persistentData;
|
||||
storage->set( persistFormat );
|
||||
storage->set( KeyValueRef( BinaryWriter::toValue(logData->logId,Unversioned()).withPrefix(persistCurrentVersionKeys.begin), BinaryWriter::toValue(logData->version.get(), Unversioned()) ) );
|
||||
storage->set( KeyValueRef( BinaryWriter::toValue(logData->logId,Unversioned()).withPrefix(persistCurrentVersionKeys.begin), BinaryWriter::toValue(logData->version.get(), Unversioned()) ) );
|
||||
storage->set( KeyValueRef( BinaryWriter::toValue(logData->logId,Unversioned()).withPrefix(persistKnownCommittedVersionKeys.begin), BinaryWriter::toValue(logData->knownCommittedVersion, Unversioned()) ) );
|
||||
storage->set( KeyValueRef( BinaryWriter::toValue(logData->logId,Unversioned()).withPrefix(persistUnrecoveredBeforeVersionKeys.begin), BinaryWriter::toValue(logData->unrecoveredBefore, Unversioned()) ) );
|
||||
storage->set( KeyValueRef( BinaryWriter::toValue(logData->logId,Unversioned()).withPrefix(persistLogRouterTagsKeys.begin), BinaryWriter::toValue(logData->logRouterTags, Unversioned()) ) );
|
||||
storage->set( KeyValueRef( BinaryWriter::toValue(logData->logId,Unversioned()).withPrefix(persistRecoveryCountKeys.begin), BinaryWriter::toValue(logData->recoveryCount, Unversioned()) ) );
|
||||
|
@ -1603,6 +1607,7 @@ ACTOR Future<Void> restorePersistentState( TLogData* self, LocalityData locality
|
|||
IKeyValueStore *storage = self->persistentData;
|
||||
state Future<Optional<Value>> fFormat = storage->readValue(persistFormat.key);
|
||||
state Future<Standalone<VectorRef<KeyValueRef>>> fVers = storage->readRange(persistCurrentVersionKeys);
|
||||
state Future<Standalone<VectorRef<KeyValueRef>>> fKnownCommitted = storage->readRange(persistKnownCommittedVersionKeys);
|
||||
state Future<Standalone<VectorRef<KeyValueRef>>> fUnrecoveredBefore = storage->readRange(persistUnrecoveredBeforeVersionKeys);
|
||||
state Future<Standalone<VectorRef<KeyValueRef>>> fLogRouterTags = storage->readRange(persistLogRouterTagsKeys);
|
||||
state Future<Standalone<VectorRef<KeyValueRef>>> fRecoverCounts = storage->readRange(persistRecoveryCountKeys);
|
||||
|
@ -1610,7 +1615,7 @@ ACTOR Future<Void> restorePersistentState( TLogData* self, LocalityData locality
|
|||
// FIXME: metadata in queue?
|
||||
|
||||
Void _ = wait( waitForAll( (vector<Future<Optional<Value>>>(), fFormat ) ) );
|
||||
Void _ = wait( waitForAll( (vector<Future<Standalone<VectorRef<KeyValueRef>>>>(), fVers, fUnrecoveredBefore, fLogRouterTags, fRecoverCounts) ) );
|
||||
Void _ = wait( waitForAll( (vector<Future<Standalone<VectorRef<KeyValueRef>>>>(), fVers, fKnownCommitted, fUnrecoveredBefore, fLogRouterTags, fRecoverCounts) ) );
|
||||
|
||||
if (fFormat.get().present() && !persistFormatReadableRange.contains( fFormat.get().get() )) {
|
||||
//FIXME: remove when we no longer need to test upgrades from 4.X releases
|
||||
|
@ -1660,6 +1665,11 @@ ACTOR Future<Void> restorePersistentState( TLogData* self, LocalityData locality
|
|||
id_logRouterTags[ BinaryReader::fromStringRef<UID>(it.key.removePrefix(persistLogRouterTagsKeys.begin), Unversioned())] = BinaryReader::fromStringRef<int>( it.value, Unversioned() );
|
||||
}
|
||||
|
||||
state std::map<UID, Version> id_knownCommitted;
|
||||
for(auto it : fKnownCommitted.get()) {
|
||||
id_knownCommitted[ BinaryReader::fromStringRef<UID>(it.key.removePrefix(persistKnownCommittedVersionKeys.begin), Unversioned())] = BinaryReader::fromStringRef<Version>( it.value, Unversioned() );
|
||||
}
|
||||
|
||||
state int idx = 0;
|
||||
state Promise<Void> registerWithMaster;
|
||||
state std::map<UID, TLogInterface> id_interf;
|
||||
|
@ -1686,6 +1696,7 @@ ACTOR Future<Void> restorePersistentState( TLogData* self, LocalityData locality
|
|||
id_interf[id1] = recruited;
|
||||
|
||||
logData->unrecoveredBefore = id_unrecoveredBefore[id1];
|
||||
logData->knownCommittedVersion = id_knownCommitted[id1];
|
||||
Version ver = BinaryReader::fromStringRef<Version>( fVers.get()[idx].value, Unversioned() );
|
||||
logData->persistentDataVersion = ver;
|
||||
logData->persistentDataDurableVersion = ver;
|
||||
|
|
Loading…
Reference in New Issue