Implement limiting how many bytes recovery will read.
This time, track what location in the DiskQueue has been spilled in persistent state, and then feed it back into the disk queue before recovery. This also introduces an ASSERT that recovery only reads exactly the bytes that it needs to have in memory.
This commit is contained in:
parent
29ab7370cd
commit
37ea71b117
|
@ -819,7 +819,7 @@ public:
|
||||||
rawQueue->stall();
|
rawQueue->stall();
|
||||||
}
|
}
|
||||||
|
|
||||||
virtual Future<bool> initializeRecovery() { return initializeRecovery( this ); }
|
virtual Future<bool> initializeRecovery(location recoverAt) { return initializeRecovery( this, recoverAt ); }
|
||||||
virtual Future<Standalone<StringRef>> readNext( int bytes ) { return readNext(this, bytes); }
|
virtual Future<Standalone<StringRef>> readNext( int bytes ) { return readNext(this, bytes); }
|
||||||
|
|
||||||
// FIXME: getNextReadLocation should ASSERT( initialized ), but the memory storage engine needs
|
// FIXME: getNextReadLocation should ASSERT( initialized ), but the memory storage engine needs
|
||||||
|
@ -1118,7 +1118,7 @@ private:
|
||||||
ASSERT( !self->recovered );
|
ASSERT( !self->recovered );
|
||||||
|
|
||||||
if (!self->initialized) {
|
if (!self->initialized) {
|
||||||
bool recoveryComplete = wait( initializeRecovery(self) );
|
bool recoveryComplete = wait( initializeRecovery(self, 0) );
|
||||||
|
|
||||||
if (recoveryComplete) {
|
if (recoveryComplete) {
|
||||||
ASSERT( self->poppedSeq <= self->endLocation() );
|
ASSERT( self->poppedSeq <= self->endLocation() );
|
||||||
|
@ -1182,7 +1182,7 @@ private:
|
||||||
return result.str;
|
return result.str;
|
||||||
}
|
}
|
||||||
|
|
||||||
ACTOR static Future<bool> initializeRecovery( DiskQueue* self ) {
|
ACTOR static Future<bool> initializeRecovery( DiskQueue* self, location recoverAt ) {
|
||||||
if (self->initialized) {
|
if (self->initialized) {
|
||||||
return self->recovered;
|
return self->recovered;
|
||||||
}
|
}
|
||||||
|
@ -1199,7 +1199,11 @@ private:
|
||||||
|
|
||||||
Page* lastPage = (Page*)lastPageData.begin();
|
Page* lastPage = (Page*)lastPageData.begin();
|
||||||
self->poppedSeq = lastPage->popped;
|
self->poppedSeq = lastPage->popped;
|
||||||
self->nextReadLocation = lastPage->popped;
|
if (self->diskQueueVersion >= DiskQueueVersion::V1) {
|
||||||
|
self->nextReadLocation = std::max(recoverAt.lo, self->poppedSeq);
|
||||||
|
} else {
|
||||||
|
self->nextReadLocation = lastPage->popped;
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
state std::auto_ptr<Page> testPage(new Page);
|
state std::auto_ptr<Page> testPage(new Page);
|
||||||
|
@ -1219,7 +1223,7 @@ private:
|
||||||
|
|
||||||
self->readBufPos = self->nextReadLocation % sizeof(Page) - sizeof(PageHeader);
|
self->readBufPos = self->nextReadLocation % sizeof(Page) - sizeof(PageHeader);
|
||||||
if (self->readBufPos < 0) { self->nextReadLocation -= self->readBufPos; self->readBufPos = 0; }
|
if (self->readBufPos < 0) { self->nextReadLocation -= self->readBufPos; self->readBufPos = 0; }
|
||||||
TraceEvent("DQRecStart", self->dbgid).detail("ReadBufPos", self->readBufPos).detail("NextReadLoc", self->nextReadLocation).detail("File0Name", self->rawQueue->files[0].dbgFilename);
|
TraceEvent("DQRecStart", self->dbgid).detail("ReadBufPos", self->readBufPos).detail("NextReadLoc", self->nextReadLocation).detail("Popped", self->poppedSeq).detail("MinRecoverAt", recoverAt).detail("File0Name", self->rawQueue->files[0].dbgFilename);
|
||||||
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -1324,7 +1328,7 @@ public:
|
||||||
void close() { queue->close(); delete this; }
|
void close() { queue->close(); delete this; }
|
||||||
|
|
||||||
//IDiskQueue
|
//IDiskQueue
|
||||||
Future<bool> initializeRecovery() { return queue->initializeRecovery(); }
|
Future<bool> initializeRecovery(location recoverAt) { return queue->initializeRecovery(recoverAt); }
|
||||||
Future<Standalone<StringRef>> readNext( int bytes ) { return readNext(this, bytes); }
|
Future<Standalone<StringRef>> readNext( int bytes ) { return readNext(this, bytes); }
|
||||||
|
|
||||||
virtual location getNextReadLocation() { return queue->getNextReadLocation(); }
|
virtual location getNextReadLocation() { return queue->getNextReadLocation(); }
|
||||||
|
|
|
@ -60,9 +60,10 @@ public:
|
||||||
//! Most importantly, most invariants only hold after this function returns, and
|
//! Most importantly, most invariants only hold after this function returns, and
|
||||||
//! some functions assert that the IDiskQueue has been initialized.
|
//! some functions assert that the IDiskQueue has been initialized.
|
||||||
//!
|
//!
|
||||||
|
//! \param recoverAt The minimum location from which to start recovery.
|
||||||
//! \returns True, if DiskQueue is now considered in a recovered state.
|
//! \returns True, if DiskQueue is now considered in a recovered state.
|
||||||
//! False, if the caller should call readNext until recovered is true.
|
//! False, if the caller should call readNext until recovered is true.
|
||||||
virtual Future<bool> initializeRecovery() = 0;
|
virtual Future<bool> initializeRecovery(location recoverAt) = 0;
|
||||||
// Before calling push or commit, the caller *must* perform recovery by calling readNext() until it returns less than the requested number of bytes.
|
// Before calling push or commit, the caller *must* perform recovery by calling readNext() until it returns less than the requested number of bytes.
|
||||||
// Thereafter it may not be called again.
|
// Thereafter it may not be called again.
|
||||||
virtual Future<Standalone<StringRef>> readNext( int bytes ) = 0; // Return the next bytes in the queue (beginning, the first time called, with the first unpopped byte)
|
virtual Future<Standalone<StringRef>> readNext( int bytes ) = 0; // Return the next bytes in the queue (beginning, the first time called, with the first unpopped byte)
|
||||||
|
|
|
@ -79,7 +79,7 @@ public:
|
||||||
virtual void close();
|
virtual void close();
|
||||||
|
|
||||||
// IDiskQueue interface
|
// IDiskQueue interface
|
||||||
virtual Future<bool> initializeRecovery() { return false; }
|
virtual Future<bool> initializeRecovery(location recoverAt) { return false; }
|
||||||
virtual Future<Standalone<StringRef>> readNext( int bytes );
|
virtual Future<Standalone<StringRef>> readNext( int bytes );
|
||||||
virtual IDiskQueue::location getNextReadLocation();
|
virtual IDiskQueue::location getNextReadLocation();
|
||||||
virtual IDiskQueue::location getNextCommitLocation() { ASSERT(false); throw internal_error(); }
|
virtual IDiskQueue::location getNextCommitLocation() { ASSERT(false); throw internal_error(); }
|
||||||
|
|
|
@ -119,6 +119,10 @@ public:
|
||||||
return readNext( this, tLog );
|
return readNext( this, tLog );
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Future<bool> initializeRecovery( IDiskQueue::location recoverAt ) {
|
||||||
|
return queue->initializeRecovery( recoverAt );
|
||||||
|
}
|
||||||
|
|
||||||
template <class T>
|
template <class T>
|
||||||
void push( T const& qe, Reference<LogData> logData );
|
void push( T const& qe, Reference<LogData> logData );
|
||||||
void forgetBefore( Version upToVersion, Reference<LogData> logData );
|
void forgetBefore( Version upToVersion, Reference<LogData> logData );
|
||||||
|
@ -141,10 +145,6 @@ private:
|
||||||
state TLogQueueEntry result;
|
state TLogQueueEntry result;
|
||||||
state int zeroFillSize = 0;
|
state int zeroFillSize = 0;
|
||||||
|
|
||||||
// FIXME: initializeRecovery should probably be called on its own by the caller of readNext.
|
|
||||||
bool recoveryFinished = wait( self->queue->initializeRecovery() );
|
|
||||||
if (recoveryFinished) throw end_of_stream();
|
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
state IDiskQueue::location startloc = self->queue->getNextReadLocation();
|
state IDiskQueue::location startloc = self->queue->getNextReadLocation();
|
||||||
Standalone<StringRef> h = wait( self->queue->readNext( sizeof(uint32_t) ) );
|
Standalone<StringRef> h = wait( self->queue->readNext( sizeof(uint32_t) ) );
|
||||||
|
@ -201,6 +201,7 @@ static const KeyRangeRef persistRecoveryCountKeys = KeyRangeRef( LiteralStringRe
|
||||||
// Updated on updatePersistentData()
|
// Updated on updatePersistentData()
|
||||||
static const KeyRangeRef persistCurrentVersionKeys = KeyRangeRef( LiteralStringRef( "version/" ), LiteralStringRef( "version0" ) );
|
static const KeyRangeRef persistCurrentVersionKeys = KeyRangeRef( LiteralStringRef( "version/" ), LiteralStringRef( "version0" ) );
|
||||||
static const KeyRangeRef persistKnownCommittedVersionKeys = KeyRangeRef( LiteralStringRef( "knownCommitted/" ), LiteralStringRef( "knownCommitted0" ) );
|
static const KeyRangeRef persistKnownCommittedVersionKeys = KeyRangeRef( LiteralStringRef( "knownCommitted/" ), LiteralStringRef( "knownCommitted0" ) );
|
||||||
|
static const KeyRef persistRecoveryLocationKey = KeyRef( LiteralStringRef( "recoveryLocation" ) );
|
||||||
static const KeyRangeRef persistLocalityKeys = KeyRangeRef( LiteralStringRef( "Locality/" ), LiteralStringRef( "Locality0" ) );
|
static const KeyRangeRef persistLocalityKeys = KeyRangeRef( LiteralStringRef( "Locality/" ), LiteralStringRef( "Locality0" ) );
|
||||||
static const KeyRangeRef persistLogRouterTagsKeys = KeyRangeRef( LiteralStringRef( "LogRouterTags/" ), LiteralStringRef( "LogRouterTags0" ) );
|
static const KeyRangeRef persistLogRouterTagsKeys = KeyRangeRef( LiteralStringRef( "LogRouterTags/" ), LiteralStringRef( "LogRouterTags0" ) );
|
||||||
static const KeyRange persistTagMessagesKeys = prefixRange(LiteralStringRef("TagMsg/"));
|
static const KeyRange persistTagMessagesKeys = prefixRange(LiteralStringRef("TagMsg/"));
|
||||||
|
@ -526,6 +527,7 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
|
||||||
tLogData->persistentData->clear( singleKeyRange(logIdKey.withPrefix(persistLogRouterTagsKeys.begin)) );
|
tLogData->persistentData->clear( singleKeyRange(logIdKey.withPrefix(persistLogRouterTagsKeys.begin)) );
|
||||||
tLogData->persistentData->clear( singleKeyRange(logIdKey.withPrefix(persistRecoveryCountKeys.begin)) );
|
tLogData->persistentData->clear( singleKeyRange(logIdKey.withPrefix(persistRecoveryCountKeys.begin)) );
|
||||||
tLogData->persistentData->clear( singleKeyRange(logIdKey.withPrefix(persistProtocolVersionKeys.begin)) );
|
tLogData->persistentData->clear( singleKeyRange(logIdKey.withPrefix(persistProtocolVersionKeys.begin)) );
|
||||||
|
tLogData->persistentData->clear( singleKeyRange(logIdKey.withPrefix(persistRecoveryLocationKey)) );
|
||||||
Key msgKey = logIdKey.withPrefix(persistTagMessagesKeys.begin);
|
Key msgKey = logIdKey.withPrefix(persistTagMessagesKeys.begin);
|
||||||
tLogData->persistentData->clear( KeyRangeRef( msgKey, strinc(msgKey) ) );
|
tLogData->persistentData->clear( KeyRangeRef( msgKey, strinc(msgKey) ) );
|
||||||
Key msgRefKey = logIdKey.withPrefix(persistTagMessageRefsKeys.begin);
|
Key msgRefKey = logIdKey.withPrefix(persistTagMessageRefsKeys.begin);
|
||||||
|
@ -812,6 +814,11 @@ ACTOR Future<Void> updatePersistentData( TLogData* self, Reference<LogData> logD
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
auto locationIter = logData->versionLocation.lower_bound(newPersistentDataVersion);
|
||||||
|
if (locationIter != logData->versionLocation.end()) {
|
||||||
|
self->persistentData->set( KeyValueRef( persistRecoveryLocationKey, BinaryWriter::toValue(locationIter->value.second,Unversioned()) ) );
|
||||||
|
}
|
||||||
|
|
||||||
self->persistentData->set( KeyValueRef( BinaryWriter::toValue(logData->logId,Unversioned()).withPrefix(persistCurrentVersionKeys.begin), BinaryWriter::toValue(newPersistentDataVersion, Unversioned()) ) );
|
self->persistentData->set( KeyValueRef( BinaryWriter::toValue(logData->logId,Unversioned()).withPrefix(persistCurrentVersionKeys.begin), BinaryWriter::toValue(newPersistentDataVersion, Unversioned()) ) );
|
||||||
self->persistentData->set( KeyValueRef( BinaryWriter::toValue(logData->logId,Unversioned()).withPrefix(persistKnownCommittedVersionKeys.begin), BinaryWriter::toValue(logData->knownCommittedVersion, Unversioned()) ) );
|
self->persistentData->set( KeyValueRef( BinaryWriter::toValue(logData->logId,Unversioned()).withPrefix(persistKnownCommittedVersionKeys.begin), BinaryWriter::toValue(logData->knownCommittedVersion, Unversioned()) ) );
|
||||||
logData->persistentDataVersion = newPersistentDataVersion;
|
logData->persistentDataVersion = newPersistentDataVersion;
|
||||||
|
@ -2032,6 +2039,9 @@ ACTOR Future<Void> tLogCore( TLogData* self, Reference<LogData> logData, TLogInt
|
||||||
ACTOR Future<Void> checkEmptyQueue(TLogData* self) {
|
ACTOR Future<Void> checkEmptyQueue(TLogData* self) {
|
||||||
TraceEvent("TLogCheckEmptyQueueBegin", self->dbgid);
|
TraceEvent("TLogCheckEmptyQueueBegin", self->dbgid);
|
||||||
try {
|
try {
|
||||||
|
bool recoveryFinished = wait( self->persistentQueue->initializeRecovery(0) );
|
||||||
|
if (recoveryFinished)
|
||||||
|
return Void();
|
||||||
TLogQueueEntry r = wait( self->persistentQueue->readNext(self) );
|
TLogQueueEntry r = wait( self->persistentQueue->readNext(self) );
|
||||||
throw internal_error();
|
throw internal_error();
|
||||||
} catch (Error& e) {
|
} catch (Error& e) {
|
||||||
|
@ -2059,6 +2069,7 @@ ACTOR Future<Void> restorePersistentState( TLogData* self, LocalityData locality
|
||||||
state IKeyValueStore *storage = self->persistentData;
|
state IKeyValueStore *storage = self->persistentData;
|
||||||
wait(storage->init());
|
wait(storage->init());
|
||||||
state Future<Optional<Value>> fFormat = storage->readValue(persistFormat.key);
|
state Future<Optional<Value>> fFormat = storage->readValue(persistFormat.key);
|
||||||
|
state Future<Optional<Value>> fRecoveryLocation = storage->readValue(persistRecoveryLocationKey);
|
||||||
state Future<Standalone<VectorRef<KeyValueRef>>> fVers = storage->readRange(persistCurrentVersionKeys);
|
state Future<Standalone<VectorRef<KeyValueRef>>> fVers = storage->readRange(persistCurrentVersionKeys);
|
||||||
state Future<Standalone<VectorRef<KeyValueRef>>> fKnownCommitted = storage->readRange(persistKnownCommittedVersionKeys);
|
state Future<Standalone<VectorRef<KeyValueRef>>> fKnownCommitted = storage->readRange(persistKnownCommittedVersionKeys);
|
||||||
state Future<Standalone<VectorRef<KeyValueRef>>> fLocality = storage->readRange(persistLocalityKeys);
|
state Future<Standalone<VectorRef<KeyValueRef>>> fLocality = storage->readRange(persistLocalityKeys);
|
||||||
|
@ -2068,8 +2079,8 @@ ACTOR Future<Void> restorePersistentState( TLogData* self, LocalityData locality
|
||||||
|
|
||||||
// FIXME: metadata in queue?
|
// FIXME: metadata in queue?
|
||||||
|
|
||||||
wait( waitForAll( (vector<Future<Optional<Value>>>(), fFormat ) ) );
|
wait( waitForAll( (vector<Future<Optional<Value>>>(), fFormat, fRecoveryLocation ) ) );
|
||||||
wait( waitForAll( (vector<Future<Standalone<VectorRef<KeyValueRef>>>>(), fVers, fKnownCommitted, fLocality, fLogRouterTags, fRecoverCounts, fProtocolVersions) ) );
|
wait( waitForAll( (vector<Future<Standalone<VectorRef<KeyValueRef>>>>(), fVers, fKnownCommitted, fLocality, fLogRouterTags, fRecoverCounts, fProtocolVersions ) ) );
|
||||||
|
|
||||||
if (fFormat.get().present() && !persistFormatReadableRange.contains( fFormat.get().get() )) {
|
if (fFormat.get().present() && !persistFormatReadableRange.contains( fFormat.get().get() )) {
|
||||||
//FIXME: remove when we no longer need to test upgrades from 4.X releases
|
//FIXME: remove when we no longer need to test upgrades from 4.X releases
|
||||||
|
@ -2116,6 +2127,11 @@ ACTOR Future<Void> restorePersistentState( TLogData* self, LocalityData locality
|
||||||
id_knownCommitted[ BinaryReader::fromStringRef<UID>(it.key.removePrefix(persistKnownCommittedVersionKeys.begin), Unversioned())] = BinaryReader::fromStringRef<Version>( it.value, Unversioned() );
|
id_knownCommitted[ BinaryReader::fromStringRef<UID>(it.key.removePrefix(persistKnownCommittedVersionKeys.begin), Unversioned())] = BinaryReader::fromStringRef<Version>( it.value, Unversioned() );
|
||||||
}
|
}
|
||||||
|
|
||||||
|
state IDiskQueue::location minimumRecoveryLocation = 0;
|
||||||
|
if (fRecoveryLocation.get().present()) {
|
||||||
|
minimumRecoveryLocation = BinaryReader::fromStringRef<IDiskQueue::location>(fRecoveryLocation.get().get(), Unversioned());
|
||||||
|
}
|
||||||
|
|
||||||
state int idx = 0;
|
state int idx = 0;
|
||||||
state Promise<Void> registerWithMaster;
|
state Promise<Void> registerWithMaster;
|
||||||
state std::map<UID, TLogInterface> id_interf;
|
state std::map<UID, TLogInterface> id_interf;
|
||||||
|
@ -2155,8 +2171,7 @@ ACTOR Future<Void> restorePersistentState( TLogData* self, LocalityData locality
|
||||||
removed.push_back(errorOr(logData->removed));
|
removed.push_back(errorOr(logData->removed));
|
||||||
logsByVersion.push_back(std::make_pair(ver, id1));
|
logsByVersion.push_back(std::make_pair(ver, id1));
|
||||||
|
|
||||||
TraceEvent("TLogRestorePersistentStateVer", id1).detail("Ver", ver);
|
TraceEvent("TLogPersistentStateRestore", self->dbgid).detail("LogId", logData->logId).detail("Ver", ver);
|
||||||
|
|
||||||
// Restore popped keys. Pop operations that took place after the last (committed) updatePersistentDataVersion might be lost, but
|
// Restore popped keys. Pop operations that took place after the last (committed) updatePersistentDataVersion might be lost, but
|
||||||
// that is fine because we will get the corresponding data back, too.
|
// that is fine because we will get the corresponding data back, too.
|
||||||
tagKeys = prefixRange( rawId.withPrefix(persistTagPoppedKeys.begin) );
|
tagKeys = prefixRange( rawId.withPrefix(persistTagPoppedKeys.begin) );
|
||||||
|
@ -2193,6 +2208,9 @@ ACTOR Future<Void> restorePersistentState( TLogData* self, LocalityData locality
|
||||||
(double)SERVER_KNOBS->REFERENCE_SPILL_UPDATE_STORAGE_BYTE_LIMIT});
|
(double)SERVER_KNOBS->REFERENCE_SPILL_UPDATE_STORAGE_BYTE_LIMIT});
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
bool recoveryFinished = wait( self->persistentQueue->initializeRecovery(minimumRecoveryLocation) );
|
||||||
|
if (recoveryFinished)
|
||||||
|
throw end_of_stream();
|
||||||
loop {
|
loop {
|
||||||
if(allRemoved.isReady()) {
|
if(allRemoved.isReady()) {
|
||||||
TEST(true); //all tlogs removed during queue recovery
|
TEST(true); //all tlogs removed during queue recovery
|
||||||
|
@ -2232,6 +2250,11 @@ ACTOR Future<Void> restorePersistentState( TLogData* self, LocalityData locality
|
||||||
when( wait( allRemoved ) ) { throw worker_removed(); }
|
when( wait( allRemoved ) ) { throw worker_removed(); }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
// Updating persistRecoveryLocation and persistCurrentVersion at the same time,
|
||||||
|
// transactionally, should mean that we never read any TLogQueueEntry that has already
|
||||||
|
// been spilled.
|
||||||
|
ASSERT_WE_THINK(false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue