Add a standalone recovery initialization function.

This commit is contained in:
Alex Miller 2019-02-07 17:02:25 -08:00
parent 2f49acc8a0
commit 8b21d1ac8f
3 changed files with 34 additions and 15 deletions

View File

@ -689,9 +689,10 @@ public:
class DiskQueue : public IDiskQueue, public Tracked<DiskQueue> { class DiskQueue : public IDiskQueue, public Tracked<DiskQueue> {
public: public:
// FIXME: Is setting lastCommittedSeq to -1 instead of 0 necessary?
DiskQueue( std::string basename, std::string fileExtension, UID dbgid, int64_t fileSizeWarningLimit ) DiskQueue( std::string basename, std::string fileExtension, UID dbgid, int64_t fileSizeWarningLimit )
: rawQueue( new RawDiskQueue_TwoFiles(basename, fileExtension, dbgid, fileSizeWarningLimit) ), dbgid(dbgid), anyPopped(false), nextPageSeq(0), poppedSeq(0), lastPoppedSeq(0), : rawQueue( new RawDiskQueue_TwoFiles(basename, fileExtension, dbgid, fileSizeWarningLimit) ), dbgid(dbgid), anyPopped(false), nextPageSeq(0), poppedSeq(0), lastPoppedSeq(0),
nextReadLocation(-1), readBufPage(NULL), readBufPos(0), pushed_page_buffer(NULL), recovered(false), lastCommittedSeq(0), warnAlwaysForMemory(true) nextReadLocation(-1), readBufPage(NULL), readBufPos(0), pushed_page_buffer(NULL), recovered(false), initialized(false), lastCommittedSeq(-1), warnAlwaysForMemory(true)
{ {
} }
@ -786,8 +787,11 @@ public:
rawQueue->stall(); rawQueue->stall();
} }
virtual Future<bool> initializeRecovery() { return initializeRecovery( this ); }
virtual Future<Standalone<StringRef>> readNext( int bytes ) { return readNext(this, bytes); } virtual Future<Standalone<StringRef>> readNext( int bytes ) { return readNext(this, bytes); }
// FIXME: getNextReadLocation should ASSERT( initialized ), but the memory storage engine needs
// to be changed to understand the new intiailizeRecovery protocol.
virtual location getNextReadLocation() { return nextReadLocation; } virtual location getNextReadLocation() { return nextReadLocation; }
virtual Future<Void> getError() { return rawQueue->getError(); } virtual Future<Void> getError() { return rawQueue->getError(); }
@ -1028,21 +1032,14 @@ private:
ASSERT( !self->recovered ); ASSERT( !self->recovered );
if (self->nextReadLocation < 0) { if (!self->initialized) {
bool nonempty = wait( findStart(self) ); bool recoveryComplete = wait( initializeRecovery(self) );
if (!nonempty) {
// The constructor has already put everything in the right state for an empty queue
self->recovered = true;
ASSERT( self->poppedSeq <= self->endLocation() );
//The next read location isn't necessarily the end of the last commit, but this is sufficient for helping us check an ASSERTion if (recoveryComplete) {
self->lastCommittedSeq = self->nextReadLocation; ASSERT( self->poppedSeq <= self->endLocation() );
return Standalone<StringRef>(); return Standalone<StringRef>();
} }
self->readBufPos = self->nextReadLocation % sizeof(Page) - sizeof(PageHeader);
if (self->readBufPos < 0) { self->nextReadLocation -= self->readBufPos; self->readBufPos = 0; }
TraceEvent("DQRecStart", self->dbgid).detail("ReadBufPos", self->readBufPos).detail("NextReadLoc", self->nextReadLocation).detail("File0Name", self->rawQueue->files[0].dbgFilename);
} }
loop { loop {
@ -1100,13 +1097,19 @@ private:
return result.str; return result.str;
} }
ACTOR static Future<bool> findStart( DiskQueue* self ) { ACTOR static Future<bool> initializeRecovery( DiskQueue* self ) {
if (self->initialized) {
return self->recovered;
}
Standalone<StringRef> lastPageData = wait( self->rawQueue->readFirstAndLastPages( &comparePages ) ); Standalone<StringRef> lastPageData = wait( self->rawQueue->readFirstAndLastPages( &comparePages ) );
self->initialized = true;
if (!lastPageData.size()) { if (!lastPageData.size()) {
// There are no valid pages, so apparently this is a completely empty queue // There are no valid pages, so apparently this is a completely empty queue
self->nextReadLocation = 0; self->nextReadLocation = 0;
return false; self->lastCommittedSeq = 0;
self->recovered = true;
return true;
} }
Page* lastPage = (Page*)lastPageData.begin(); Page* lastPage = (Page*)lastPageData.begin();
@ -1128,10 +1131,15 @@ private:
self->findPhysicalLocation( self->poppedSeq, &file, &page, "poppedSeq" ); self->findPhysicalLocation( self->poppedSeq, &file, &page, "poppedSeq" );
self->rawQueue->setStartPage( file, page ); self->rawQueue->setStartPage( file, page );
return true; self->readBufPos = self->nextReadLocation % sizeof(Page) - sizeof(PageHeader);
if (self->readBufPos < 0) { self->nextReadLocation -= self->readBufPos; self->readBufPos = 0; }
TraceEvent("DQRecStart", self->dbgid).detail("ReadBufPos", self->readBufPos).detail("NextReadLoc", self->nextReadLocation).detail("File0Name", self->rawQueue->files[0].dbgFilename);
return false;
} }
Page& firstPages(int i) { Page& firstPages(int i) {
ASSERT( initialized );
return *(Page*)rawQueue->firstPages[i]; return *(Page*)rawQueue->firstPages[i];
} }
@ -1204,6 +1212,7 @@ private:
// Recovery state // Recovery state
bool recovered; bool recovered;
bool initialized;
loc_t nextReadLocation; loc_t nextReadLocation;
Arena readBufArena; Arena readBufArena;
Page* readBufPage; Page* readBufPage;
@ -1226,6 +1235,7 @@ public:
void close() { queue->close(); delete this; } void close() { queue->close(); delete this; }
//IDiskQueue //IDiskQueue
Future<bool> initializeRecovery() { return queue->initializeRecovery(); }
Future<Standalone<StringRef>> readNext( int bytes ) { return readNext(this, bytes); } Future<Standalone<StringRef>> readNext( int bytes ) { return readNext(this, bytes); }
virtual location getNextReadLocation() { return queue->getNextReadLocation(); } virtual location getNextReadLocation() { return queue->getNextReadLocation(); }

View File

@ -41,6 +41,14 @@ public:
} }
}; };
//! Find the first and last pages in the disk queue, and initialize invariants.
//!
//! Most importantly, most invariants only hold after this function returns, and
//! some functions assert that the IDiskQueue has been initialized.
//!
//! \returns True, if DiskQueue is now considered in a recovered state.
//! False, if the caller should call readNext until recovered is true.
virtual Future<bool> initializeRecovery() = 0;
// Before calling push or commit, the caller *must* perform recovery by calling readNext() until it returns less than the requested number of bytes. // Before calling push or commit, the caller *must* perform recovery by calling readNext() until it returns less than the requested number of bytes.
// Thereafter it may not be called again. // Thereafter it may not be called again.
virtual Future<Standalone<StringRef>> readNext( int bytes ) = 0; // Return the next bytes in the queue (beginning, the first time called, with the first unpopped byte) virtual Future<Standalone<StringRef>> readNext( int bytes ) = 0; // Return the next bytes in the queue (beginning, the first time called, with the first unpopped byte)

View File

@ -67,6 +67,7 @@ public:
virtual void close(); virtual void close();
// IDiskQueue interface // IDiskQueue interface
virtual Future<bool> initializeRecovery() { return false; }
virtual Future<Standalone<StringRef>> readNext( int bytes ); virtual Future<Standalone<StringRef>> readNext( int bytes );
virtual IDiskQueue::location getNextReadLocation(); virtual IDiskQueue::location getNextReadLocation();
virtual IDiskQueue::location push( StringRef contents ); virtual IDiskQueue::location push( StringRef contents );