foundationdb/fdbserver/DiskQueue.actor.cpp

1449 lines
57 KiB
C++

/*
* DiskQueue.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbserver/IDiskQueue.h"
#include "fdbrpc/IAsyncFile.h"
#include "fdbserver/Knobs.h"
#include "fdbrpc/simulator.h"
#include "fdbrpc/crc32c.h"
#include "flow/genericactors.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.
typedef bool(*compare_pages)(void*,void*);
typedef int64_t loc_t;
// 0 -> 0
// 1 -> 4k
// 4k -> 4k
int64_t pageCeiling( int64_t loc ) {
return (loc+_PAGE_SIZE-1)/_PAGE_SIZE*_PAGE_SIZE;
}
// 0 -> 0
// 1 -> 0
// 4k -> 4k
int64_t pageFloor( int64_t loc ) {
return loc / _PAGE_SIZE * _PAGE_SIZE;
}
struct StringBuffer {
Standalone<StringRef> str;
int reserved;
UID id;
StringBuffer(UID fromFileID) : reserved(0), id( fromFileID ) {}
int size() const { return str.size(); }
StringRef& ref() { return str; }
void clear() {
str = Standalone<StringRef>();
reserved = 0;
}
void clearReserve(int size) {
str = Standalone<StringRef>();
reserved = size;
ref() = StringRef( new (str.arena()) uint8_t[size], 0 );
}
void append( StringRef x ) {
memcpy( append(x.size()), x.begin(), x.size() );
}
void* append(int bytes) {
ASSERT( str.size() + bytes <= reserved );
void* p = const_cast<uint8_t*>(str.end());
ref() = StringRef( str.begin(), str.size()+bytes );
return p;
}
StringRef pop_front(int bytes) {
ASSERT( bytes <= str.size() );
StringRef result = str.substr(0, bytes);
ref() = str.substr(bytes);
return result;
}
void alignReserve(int alignment, int size) {
ASSERT( alignment && (alignment & (alignment-1)) == 0 ); // alignment is a power of two
if (size >= reserved) {
// SOMEDAY: Use a new arena and discard the old one after copying?
reserved = std::max( size, reserved*2 );
if( reserved > 1e9 ) {
printf("WOAH! Huge allocation\n");
TraceEvent(SevError, "StringBufferHugeAllocation", id).detail("Alignment", alignment).detail("Reserved", reserved).backtrace();
}
uint8_t* b = new (str.arena()) uint8_t[reserved+alignment-1];
uint8_t* e = b + (reserved+alignment-1);
uint8_t* p = (uint8_t*)(int64_t(b+alignment-1) & ~(alignment-1)); // first multiple of alignment greater than or equal to b
ASSERT( p>=b && p+reserved<=e && int64_t(p)%alignment == 0 );
memcpy(p, str.begin(), str.size());
ref() = StringRef( p, str.size() );
}
}
};
struct SyncQueue : ReferenceCounted<SyncQueue> {
SyncQueue( int outstandingLimit, Reference<IAsyncFile> file )
: outstandingLimit(outstandingLimit), file(file)
{
for(int i=0; i<outstandingLimit; i++)
outstanding.push_back( Void() );
}
Future<Void> onSync() { // Future is set when all writes completed before the call to onSync are complete
if (outstanding.size() <= outstandingLimit)
outstanding.push_back( waitAndSync(this) );
return outstanding.back();
}
private:
int outstandingLimit;
Deque<Future<Void>> outstanding;
Reference<IAsyncFile> file;
ACTOR static Future<Void> waitAndSync(SyncQueue* self) {
wait( self->outstanding.front() );
self->outstanding.pop_front();
wait( self->file->sync() );
return Void();
}
};
// We use a Tracked instead of a Reference when the shutdown/destructor code would need to wait().
template <typename T>
class Tracked {
protected:
struct TrackMe : NonCopyable {
T* self;
explicit TrackMe( T* self ) : self(self) {
self->actorCount++;
if (self->actorCount == 1) self->actorCountIsZero.set(false);
}
~TrackMe() {
self->actorCount--;
if (self->actorCount == 0) self->actorCountIsZero.set(true);
}
};
Future<Void> onSafeToDestruct() {
if (actorCountIsZero.get()) {
return Void();
} else {
return actorCountIsZero.onChange();
}
}
private:
int actorCount = 0;
AsyncVar<bool> actorCountIsZero = true;
};
class RawDiskQueue_TwoFiles : public Tracked<RawDiskQueue_TwoFiles> {
public:
RawDiskQueue_TwoFiles( std::string basename, std::string fileExtension, UID dbgid, int64_t fileSizeWarningLimit )
: basename(basename), fileExtension(fileExtension), onError(delayed(error.getFuture())), onStopped(stopped.getFuture()),
readingFile(-1), readingPage(-1), writingPos(-1), dbgid(dbgid),
dbg_file0BeginSeq(0), fileExtensionBytes(SERVER_KNOBS->DISK_QUEUE_FILE_EXTENSION_BYTES),
fileShrinkBytes(SERVER_KNOBS->DISK_QUEUE_FILE_SHRINK_BYTES), readingBuffer( dbgid ),
readyToPush(Void()), fileSizeWarningLimit(fileSizeWarningLimit), lastCommit(Void()), isFirstCommit(true)
{
if (BUGGIFY)
fileExtensionBytes = _PAGE_SIZE * deterministicRandom()->randomSkewedUInt32( 1, 10<<10 );
if (BUGGIFY)
fileShrinkBytes = _PAGE_SIZE * deterministicRandom()->randomSkewedUInt32( 1, 10<<10 );
files[0].dbgFilename = filename(0);
files[1].dbgFilename = filename(1);
// We issue reads into firstPages, so it needs to be 4k aligned.
firstPages.reserve(firstPages.arena(), 2);
void* pageMemory = operator new (sizeof(Page) * 3, firstPages.arena());
// firstPages is assumed to always be a valid page, and our initialization here is the only
// time that it would not contain a valid page. Whenever DiskQueue reaches in to look at
// these bytes, it only cares about `seq`, and having that be all 0xFF's means uninitialized
// pages will look like the ultimate end of the disk queue, rather than the beginning of it.
// This makes code fail in more immediate and obvious ways.
firstPages[0] = (Page*)((((uintptr_t)pageMemory + 4095) / 4096) * 4096);
memset(firstPages[0], 0xFF, sizeof(Page));
firstPages[1] = (Page*)((uintptr_t)firstPages[0] + 4096);
memset(firstPages[1], 0xFF, sizeof(Page));
stallCount.init(LiteralStringRef("RawDiskQueue.StallCount"));
}
Future<Void> pushAndCommit( StringRef pageData, StringBuffer* pageMem, uint64_t poppedPages ) {
return pushAndCommit( this, pageData, pageMem, poppedPages );
}
void stall() {
stallCount++;
readyToPush = lastCommit;
}
Future<Standalone<StringRef>> readFirstAndLastPages( compare_pages compare ) { return readFirstAndLastPages(this,compare); }
void setStartPage( int file, int64_t page ) {
TraceEvent("RDQSetStart", dbgid).detail("FileNum",file).detail("PageNum",page).detail("File0Name", files[0].dbgFilename);
readingFile = file;
readingPage = page;
}
Future<Void> setPoppedPage( int file, int64_t page, int64_t debugSeq ) { return setPoppedPage(this, file, page, debugSeq); }
// FIXME: let the caller pass in where to write the data.
Future<Standalone<StringRef>> read(int file, int page, int nPages) { return read(this, file, page, nPages); }
Future<Standalone<StringRef>> readNextPage() { return readNextPage(this); }
Future<Void> truncateBeforeLastReadPage() { return truncateBeforeLastReadPage(this); }
Future<Void> getError() { return onError; }
Future<Void> onClosed() { return onStopped; }
void dispose() { shutdown(this, true); }
void close() { shutdown(this, false); }
StorageBytes getStorageBytes() {
int64_t free;
int64_t total;
g_network->getDiskBytes(parentDirectory(basename), free, total);
return StorageBytes(free, total, files[0].size + files[1].size, free); // TODO: we could potentially do better in the available field by accounting for the unused pages at the end of the file
}
//private:
struct Page { uint8_t data[_PAGE_SIZE]; };
struct File {
Reference<IAsyncFile> f;
int64_t size; // always a multiple of _PAGE_SIZE, even if the physical file isn't for some reason
int64_t popped;
std::string dbgFilename;
Reference<SyncQueue> syncQueue;
File() : size(-1), popped(-1) {}
void setFile(Reference<IAsyncFile> f) {
this->f = f;
this->syncQueue = Reference<SyncQueue>( new SyncQueue(1, f) );
}
};
File files[2]; // After readFirstAndLastPages(), files[0] is logically before files[1] (pushes are always into files[1])
Standalone<VectorRef<Page*>> firstPages;
std::string basename;
std::string fileExtension;
std::string filename(int i) const { return basename + format("%d.%s", i, fileExtension.c_str()); }
UID dbgid;
int64_t dbg_file0BeginSeq;
int64_t fileSizeWarningLimit;
Promise<Void> error, stopped;
Future<Void> onError, onStopped;
Future<Void> readyToPush;
Future<Void> lastCommit;
bool isFirstCommit;
StringBuffer readingBuffer; // Pages that have been read and not yet returned
int readingFile; // i if the next page after readingBuffer should be read from files[i], 2 if recovery is complete
int64_t readingPage; // Page within readingFile that is the next page after readingBuffer
int64_t writingPos; // Position within files[1] that will be next written
int64_t fileExtensionBytes;
int64_t fileShrinkBytes;
Int64MetricHandle stallCount;
Future<Void> truncateFile(int file, int64_t pos) { return truncateFile(this, file, pos); }
// FIXME: Merge this function with IAsyncFileSystem::incrementalDeleteFile().
ACTOR static void incrementalTruncate(Reference<IAsyncFile> file) {
state int64_t remainingFileSize = wait( file->size() );
for( ; remainingFileSize > 0; remainingFileSize -= FLOW_KNOBS->INCREMENTAL_DELETE_TRUNCATE_AMOUNT ){
wait(file->truncate(remainingFileSize));
wait(file->sync());
wait(delay(FLOW_KNOBS->INCREMENTAL_DELETE_INTERVAL));
}
TraceEvent("DiskQueueReplaceTruncateEnded").detail("Filename", file->getFilename());
}
#if defined(_WIN32)
ACTOR static Future<Reference<IAsyncFile>> replaceFile(Reference<IAsyncFile> toReplace) {
// Windows doesn't support a rename over an open file.
wait( toReplace->truncate(4<<10) );
return toReplace;
}
#else
ACTOR static Future<Reference<IAsyncFile>> replaceFile(Reference<IAsyncFile> toReplace) {
incrementalTruncate( toReplace );
Reference<IAsyncFile> _replacement = wait( IAsyncFileSystem::filesystem()->open( toReplace->getFilename(), IAsyncFile::OPEN_ATOMIC_WRITE_AND_CREATE | IAsyncFile::OPEN_CREATE | IAsyncFile::OPEN_READWRITE | IAsyncFile::OPEN_UNCACHED | IAsyncFile::OPEN_UNBUFFERED | IAsyncFile::OPEN_LOCK, 0600 ) );
state Reference<IAsyncFile> replacement = _replacement;
wait( replacement->sync() );
return replacement;
}
#endif
Future<Future<Void>> push(StringRef pageData, vector<Reference<SyncQueue>>* toSync) {
return push( this, pageData, toSync );
}
ACTOR static Future<Future<Void>> push(RawDiskQueue_TwoFiles* self, StringRef pageData, vector<Reference<SyncQueue>>* toSync) {
// Write the given data to the queue files, swapping or extending them if necessary.
// Don't do any syncs, but push the modified file(s) onto toSync.
ASSERT( self->readingFile == 2 );
ASSERT( pageData.size() % _PAGE_SIZE == 0 );
ASSERT( int64_t(pageData.begin()) % _PAGE_SIZE == 0 );
ASSERT( self->writingPos % _PAGE_SIZE == 0 );
ASSERT( self->files[0].size % _PAGE_SIZE == 0 && self->files[1].size % _PAGE_SIZE == 0 );
state vector<Future<Void>> waitfor;
if (pageData.size() + self->writingPos > self->files[1].size) {
if ( self->files[0].popped == self->files[0].size ) {
// Finish self->files[1] and swap
int p = self->files[1].size - self->writingPos;
if(p > 0) {
toSync->push_back( self->files[1].syncQueue );
/*TraceEvent("RDQWriteAndSwap", this->dbgid).detail("File1name", self->files[1].dbgFilename).detail("File1size", self->files[1].size)
.detail("WritingPos", self->writingPos).detail("WritingBytes", p);*/
waitfor.push_back( self->files[1].f->write( pageData.begin(), p, self->writingPos ) );
pageData = pageData.substr( p );
}
self->dbg_file0BeginSeq += self->files[0].size;
std::swap(self->files[0], self->files[1]);
std::swap(self->firstPages[0], self->firstPages[1]);
self->files[1].popped = 0;
self->writingPos = 0;
*self->firstPages[1] = *(const Page*)pageData.begin();
const int64_t activeDataVolume = pageCeiling(self->files[0].size - self->files[0].popped + self->fileExtensionBytes + self->fileShrinkBytes);
const int64_t desiredMaxFileSize = pageCeiling( std::max( activeDataVolume, SERVER_KNOBS->TLOG_HARD_LIMIT_BYTES * 2 ) );
const bool frivolouslyTruncate = BUGGIFY_WITH_PROB(0.1);
if (self->files[1].size > desiredMaxFileSize || frivolouslyTruncate) {
// Either shrink self->files[1] to the size of self->files[0], or chop off fileShrinkBytes
int64_t maxShrink = pageFloor( std::max( self->files[1].size - desiredMaxFileSize, self->fileShrinkBytes ) );
if ((maxShrink > SERVER_KNOBS->DISK_QUEUE_MAX_TRUNCATE_BYTES) ||
(frivolouslyTruncate && deterministicRandom()->random01() < 0.3)) {
TEST(true); // Replacing DiskQueue file
TraceEvent("DiskQueueReplaceFile", self->dbgid).detail("Filename", self->files[1].f->getFilename()).detail("OldFileSize", self->files[1].size).detail("ElidedTruncateSize", maxShrink);
Reference<IAsyncFile> newFile = wait( replaceFile(self->files[1].f) );
self->files[1].setFile(newFile);
waitfor.push_back( self->files[1].f->truncate( self->fileExtensionBytes ) );
self->files[1].size = self->fileExtensionBytes;
} else {
const int64_t startingSize = self->files[1].size;
self->files[1].size -= std::min(maxShrink, self->files[1].size);
self->files[1].size = std::max(self->files[1].size, self->fileExtensionBytes);
TraceEvent("DiskQueueTruncate", self->dbgid).detail("Filename", self->files[1].f->getFilename()).detail("OldFileSize", startingSize).detail("NewFileSize", self->files[1].size);
waitfor.push_back( self->files[1].f->truncate( self->files[1].size ) );
}
}
} else {
// Extend self->files[1] to accomodate the new write and about 10MB or 2x current size for future writes.
/*TraceEvent("RDQExtend", this->dbgid).detail("File1name", self->files[1].dbgFilename).detail("File1size", self->files[1].size)
.detail("ExtensionBytes", fileExtensionBytes);*/
int64_t minExtension = pageData.size() + self->writingPos - self->files[1].size;
self->files[1].size += std::min(std::max(self->fileExtensionBytes, minExtension), self->files[0].size+self->files[1].size+minExtension);
waitfor.push_back( self->files[1].f->truncate( self->files[1].size ) );
if(self->fileSizeWarningLimit > 0 && self->files[1].size > self->fileSizeWarningLimit) {
TraceEvent(SevWarnAlways, "DiskQueueFileTooLarge", self->dbgid).suppressFor(1.0).detail("Filename", self->filename(1)).detail("Size", self->files[1].size);
}
}
} else if (self->writingPos == 0) {
// If this is the first write to a brand new disk queue file.
*self->firstPages[1] = *(const Page*)pageData.begin();
}
/*TraceEvent("RDQWrite", this->dbgid).detail("File1name", self->files[1].dbgFilename).detail("File1size", self->files[1].size)
.detail("WritingPos", self->writingPos).detail("WritingBytes", pageData.size());*/
self->files[1].size = std::max( self->files[1].size, self->writingPos + pageData.size() );
toSync->push_back( self->files[1].syncQueue );
waitfor.push_back( self->files[1].f->write( pageData.begin(), pageData.size(), self->writingPos ) );
self->writingPos += pageData.size();
return waitForAll(waitfor);
}
ACTOR static UNCANCELLABLE Future<Void> pushAndCommit(RawDiskQueue_TwoFiles* self, StringRef pageData, StringBuffer* pageMem, uint64_t poppedPages) {
state Promise<Void> pushing, committed;
state Promise<Void> errorPromise = self->error;
state std::string filename = self->files[0].dbgFilename;
state UID dbgid = self->dbgid;
state vector<Reference<SyncQueue>> syncFiles;
state Future<Void> lastCommit = self->lastCommit;
try {
// pushing might need to wait for previous pushes to start (to maintain order) or for
// a previous commit to finish if stall() was called
Future<Void> ready = self->readyToPush;
self->readyToPush = pushing.getFuture();
self->lastCommit = committed.getFuture();
// the first commit must complete before we can pipeline other commits so that we will always have a valid page to binary search to
if(self->isFirstCommit) {
self->isFirstCommit = false;
self->readyToPush = self->lastCommit;
}
wait( ready );
TEST( pageData.size() > sizeof(Page) ); // push more than one page of data
Future<Void> pushed = wait( self->push( pageData, &syncFiles ) );
pushing.send(Void());
ASSERT( syncFiles.size() >= 1 && syncFiles.size() <= 2 );
TEST(2==syncFiles.size()); // push spans both files
wait( pushed );
delete pageMem;
pageMem = 0;
Future<Void> sync = syncFiles[0]->onSync();
for(int i=1; i<syncFiles.size(); i++) sync = sync && syncFiles[i]->onSync();
wait( sync );
wait( lastCommit );
//Calling check_yield instead of yield to avoid a destruction ordering problem in simulation
if(g_network->check_yield(g_network->getCurrentTask())) {
wait(delay(0, g_network->getCurrentTask()));
}
self->updatePopped( poppedPages*sizeof(Page) );
/*TraceEvent("RDQCommitEnd", self->dbgid).detail("DeltaPopped", poppedPages*sizeof(Page)).detail("PoppedCommitted", self->dbg_file0BeginSeq + self->files[0].popped + self->files[1].popped)
.detail("File0Size", self->files[0].size).detail("File1Size", self->files[1].size)
.detail("File0Name", self->files[0].dbgFilename).detail("SyncedFiles", syncFiles.size());*/
committed.send(Void());
} catch (Error& e) {
delete pageMem;
TEST(true); // push error
TEST(2==syncFiles.size()); // push spanning both files error
TraceEvent(SevError, "RDQPushAndCommitError", dbgid).error(e, true).detail("InitialFilename0", filename);
if (errorPromise.canBeSet()) errorPromise.sendError(e);
if (pushing.canBeSet()) pushing.sendError(e);
if (committed.canBeSet()) committed.sendError(e);
throw e;
}
return Void();
}
void updatePopped( int64_t popped ) {
int64_t pop0 = std::min(popped, files[0].size - files[0].popped);
files[0].popped += pop0;
files[1].popped += popped - pop0;
}
ACTOR static Future<Void> setPoppedPage( RawDiskQueue_TwoFiles *self, int file, int64_t page, int64_t debugSeq ) {
self->files[file].popped = page*sizeof(Page);
if (file) self->files[0].popped = self->files[0].size;
else self->files[1].popped = 0;
self->dbg_file0BeginSeq = debugSeq - self->files[1].popped - self->files[0].popped;
//If we are starting in file 1, we truncate file 0 in case it has been corrupted.
// In particular, we are trying to avoid a dropped or corrupted write to the first page of file 0 causing it to be sequenced before file 1,
// when in fact it contains many pages that follow file 1. These ok pages may be incorrectly read if the machine dies after overwritting the
// first page of file 0 and is then recovered
if(file == 1)
wait(self->truncateFile(self, 0, 0));
return Void();
}
ACTOR static Future<Void> openFiles( RawDiskQueue_TwoFiles* self ) {
state vector<Future<Reference<IAsyncFile>>> fs;
for(int i=0; i<2; i++)
fs.push_back( IAsyncFileSystem::filesystem()->open( self->filename(i), IAsyncFile::OPEN_READWRITE | IAsyncFile::OPEN_UNCACHED | IAsyncFile::OPEN_UNBUFFERED | IAsyncFile::OPEN_LOCK, 0 ) );
wait( waitForAllReady(fs) );
// Treatment of errors here is important. If only one of the two files is present
// (due to a power failure during creation or deletion, or administrative error) we don't want to
// open the queue!
if (!fs[0].isError() && !fs[1].isError()) {
// Both files were opened OK: success
} else if ( fs[0].isError() && fs[0].getError().code() == error_code_file_not_found &&
fs[1].isError() && fs[1].getError().code() == error_code_file_not_found )
{
// Neither file was found: we can create a new queue
// OPEN_ATOMIC_WRITE_AND_CREATE defers creation (using a .part file) until the calls to sync() below
TraceEvent("DiskQueueCreate").detail("File0", self->filename(0));
for(int i=0; i<2; i++)
fs[i] = IAsyncFileSystem::filesystem()->open( self->filename(i), IAsyncFile::OPEN_ATOMIC_WRITE_AND_CREATE | IAsyncFile::OPEN_CREATE | IAsyncFile::OPEN_READWRITE | IAsyncFile::OPEN_UNCACHED | IAsyncFile::OPEN_UNBUFFERED | IAsyncFile::OPEN_LOCK, 0600 );
// Any error here is fatal
wait( waitForAll(fs) );
// sync on each file to actually create it will be done below
} else {
// One file had a more serious error or one file is present and the other is not. Die.
if (!fs[0].isError() || (fs[1].isError() && fs[1].getError().code() != error_code_file_not_found))
throw fs[1].getError();
else
throw fs[0].getError();
}
// fsync both files. This is necessary to trigger atomic file creation in the creation case above.
// It also permits the recovery code to assume that whatever it reads is durable. Otherwise a prior
// process could have written (but not synchronized) data to the file which we will read but which
// might not survive a reboot. The recovery code assumes otherwise and could corrupt the disk.
vector<Future<Void>> syncs;
for(int i=0; i<fs.size(); i++)
syncs.push_back( fs[i].get()->sync() );
wait(waitForAll(syncs));
// Successfully opened or created; fill in self->files[]
for(int i=0; i<2; i++)
self->files[i].setFile(fs[i].get());
return Void();
}
ACTOR static void shutdown( RawDiskQueue_TwoFiles* self, bool deleteFiles ) {
// Wait for all reads and writes on the file, and all actors referencing self, to be finished
state Error error = success();
try {
wait(success(errorOr(self->lastCommit)));
wait( self->onSafeToDestruct() );
for(int i=0; i<2; i++)
self->files[i].f.clear();
if (deleteFiles) {
TraceEvent("DiskQueueShutdownDeleting", self->dbgid)
.detail("File0", self->filename(0))
.detail("File1", self->filename(1));
wait( IAsyncFileSystem::filesystem()->incrementalDeleteFile( self->filename(0), false ) );
wait( IAsyncFileSystem::filesystem()->incrementalDeleteFile( self->filename(1), true ) );
}
TraceEvent("DiskQueueShutdownComplete", self->dbgid)
.detail("DeleteFiles", deleteFiles)
.detail("File0", self->filename(0));
} catch( Error &e ) {
TraceEvent(SevError, "DiskQueueShutdownError", self->dbgid)
.error(e,true)
.detail("Reason", e.code() == error_code_platform_error ? "could not delete database" : "unknown");
error = e;
}
if( error.code() != error_code_actor_cancelled ) {
if (self->stopped.canBeSet()) self->stopped.send(Void());
if (self->error.canBeSet()) self->error.send(Never());
delete self;
}
}
ACTOR static UNCANCELLABLE Future<Standalone<StringRef>> readFirstAndLastPages(RawDiskQueue_TwoFiles* self, compare_pages compare) {
state TrackMe trackMe(self);
try {
// Open both files or create both files
wait( openFiles(self) );
// Get the file sizes
vector<Future<int64_t>> fsize;
for(int i=0; i<2; i++)
fsize.push_back( self->files[i].f->size() );
vector<int64_t> file_sizes = wait( getAll(fsize) );
for(int i=0; i<2; i++) {
// SOMEDAY: If the file size is not a multiple of page size, it may never be shortened. Change this?
self->files[i].size = file_sizes[i] - file_sizes[i] % sizeof(Page);
ASSERT( self->files[i].size % sizeof(Page) == 0 );
}
// Read the first pages
vector<Future<int>> reads;
for(int i=0; i<2; i++)
if( self->files[i].size > 0)
reads.push_back( self->files[i].f->read( self->firstPages[i], sizeof(Page), 0 ) );
wait( waitForAll(reads) );
// Determine which file comes first
if ( compare( self->firstPages[1], self->firstPages[0] ) ) {
std::swap( self->firstPages[0], self->firstPages[1] );
std::swap( self->files[0], self->files[1] );
}
if ( !compare( self->firstPages[0], self->firstPages[0] ) ) {
memset(self->firstPages[0], 0xFF, sizeof(Page));
}
if ( !compare( self->firstPages[1], self->firstPages[1] ) ) {
// Both files are invalid... the queue is empty!
// Begin pushing at the beginning of files[1]
//Truncate both files, since perhaps only the first pages are corrupted. This avoids cases where overwritting the first page and then terminating makes
//subsequent pages valid upon recovery.
vector<Future<Void>> truncates;
for(int i = 0; i < 2; ++i)
if(self->files[i].size > 0)
truncates.push_back(self->truncateFile(self, i, 0));
wait(waitForAll(truncates));
self->files[0].popped = self->files[0].size;
self->files[1].popped = 0;
memset(self->firstPages[1], 0xFF, sizeof(Page));
self->writingPos = 0;
self->readingFile = 2;
return Standalone<StringRef>();
}
// A page in files[1] is "valid" iff compare(self->firstPages[1], page)
// Binary search to find a page in files[1] that is "valid" but the next page is not valid
// Invariant: the page at begin is valid, and the page at end is invalid
state int64_t begin = 0;
state int64_t end = self->files[1].size/sizeof(Page);
state Standalone<StringRef> middlePageAllocation = makeAlignedString(sizeof(Page), sizeof(Page));
state Page *middlePage = (Page*)middlePageAllocation.begin();
while ( begin + 1 != end ) {
state int64_t middle = (begin+end)/2;
ASSERT( middle > begin && middle < end ); // So the loop always changes begin or end
int len = wait( self->files[1].f->read( middlePage, sizeof(Page), middle*sizeof(Page) ) );
ASSERT( len == sizeof(Page) );
bool middleValid = compare( self->firstPages[1], middlePage );
TraceEvent("RDQBS", self->dbgid).detail("Begin", begin).detail("End", end).detail("Middle", middle).detail("Valid", middleValid).detail("File0Name", self->files[0].dbgFilename);
if (middleValid)
begin = middle;
else
end = middle;
}
// Now by the invariant and the loop condition, begin is a valid page and begin+1 is an invalid page
// Check that begin+1 is invalid
int len1 = wait( self->files[1].f->read( middlePage, sizeof(Page), (begin+1)*sizeof(Page) ) );
ASSERT( !(len1 == sizeof(Page) && compare( self->firstPages[1], middlePage )) );
// Read it
int len2 = wait( self->files[1].f->read( middlePage, sizeof(Page), begin*sizeof(Page) ) );
ASSERT( len2 == sizeof(Page) && compare( self->firstPages[1], middlePage ) );
TraceEvent("RDQEndFound", self->dbgid).detail("File0Name", self->files[0].dbgFilename).detail("Pos", begin).detail("FileSize", self->files[1].size);
return middlePageAllocation;
} catch (Error& e) {
bool ok = e.code() == error_code_file_not_found;
TraceEvent(ok ? SevInfo : SevError, "RDQReadFirstAndLastPagesError", self->dbgid).error(e, true).detail("File0Name", self->files[0].dbgFilename);
if (!self->error.isSet()) self->error.sendError(e);
throw;
}
}
ACTOR static Future<Standalone<StringRef>> read(RawDiskQueue_TwoFiles* self, int file, int pageOffset, int nPages) {
state TrackMe trackMe(self);
state const size_t bytesRequested = nPages * sizeof(Page);
state Standalone<StringRef> result = makeAlignedString(sizeof(Page), bytesRequested);
if (file == 1) ASSERT_WE_THINK(pageOffset * sizeof(Page) + bytesRequested <= self->writingPos );
int bytesRead = wait( self->files[file].f->read( mutateString(result), bytesRequested, pageOffset*sizeof(Page) ) );
ASSERT_WE_THINK(bytesRead == bytesRequested);
return result;
}
Future<int> fillReadingBuffer() {
// If we're right at the end of a file...
if ( readingPage*sizeof(Page) >= (size_t)files[readingFile].size ) {
readingFile++;
readingPage = 0;
if (readingFile>=2) {
// Recovery complete
readingBuffer.clear();
writingPos = files[1].size;
return 0;
}
}
// Read up to 1MB into readingBuffer
int len = std::min<int64_t>( (files[readingFile].size/sizeof(Page) - readingPage)*sizeof(Page), BUGGIFY_WITH_PROB(1.0) ? sizeof(Page)*deterministicRandom()->randomInt(1,4) : (1<<20) );
readingBuffer.clear();
readingBuffer.alignReserve( sizeof(Page), len );
void* p = readingBuffer.append(len);
auto pos = readingPage * sizeof(Page);
readingPage += len / sizeof(Page);
ASSERT( int64_t(p) % sizeof(Page) == 0 );
return files[readingFile].f->read( p, len, pos );
}
ACTOR static UNCANCELLABLE Future<Standalone<StringRef>> readNextPage(RawDiskQueue_TwoFiles* self) {
state TrackMe trackMe(self);
try {
ASSERT( self->readingFile < 2 );
ASSERT( self->files[0].f && self->files[1].f );
if (!self->readingBuffer.size()) {
state Future<Void> f = Void();
//if (BUGGIFY) f = delay( deterministicRandom()->random01() * 0.1 );
int read = wait( self->fillReadingBuffer() );
ASSERT( read == self->readingBuffer.size() );
wait(f);
}
if (!self->readingBuffer.size()) return Standalone<StringRef>();
ASSERT( self->readingBuffer.size() >= sizeof(Page) );
Standalone<StringRef> result = self->readingBuffer.pop_front( sizeof(Page) );
return result;
} catch (Error& e) {
TEST(true); // Read next page error
TraceEvent(SevError, "RDQReadNextPageError", self->dbgid).error(e, true).detail("File0Name", self->files[0].dbgFilename);
if (!self->error.isSet()) self->error.sendError(e);
throw;
}
}
ACTOR static UNCANCELLABLE Future<Void> truncateFile(RawDiskQueue_TwoFiles* self, int file, int64_t pos) {
state TrackMe trackMe(self);
TraceEvent("DQTruncateFile", self->dbgid).detail("File", file).detail("Pos", pos).detail("File0Name", self->files[0].dbgFilename);
state Reference<IAsyncFile> f = self->files[file].f; // Hold onto a reference in the off-chance that the DQ is removed from underneath us.
if (pos == 0) {
memset(self->firstPages[file], 0xFF, _PAGE_SIZE);
}
wait( f->zeroRange( pos, self->files[file].size-pos ) );
wait(self->files[file].syncQueue->onSync());
// We intentionally don't return the f->zero future, so that TrackMe is destructed after f->zero finishes.
return Void();
}
ACTOR static Future<Void> truncateBeforeLastReadPage( RawDiskQueue_TwoFiles* self ) {
try {
state int file = self->readingFile;
state int64_t pos = (self->readingPage - self->readingBuffer.size()/sizeof(Page) - 1) * sizeof(Page);
state vector<Future<Void>> commits;
state bool swap = file==0;
TEST( file==0 ); // truncate before last read page on file 0
TEST( file==1 && pos != self->files[1].size ); // truncate before last read page on file 1
self->readingFile = 2;
self->readingBuffer.clear();
self->writingPos = pos;
while (file < 2) {
commits.push_back(self->truncateFile(self, file, pos));
file++;
pos = 0;
}
wait( waitForAll(commits) );
if (swap) {
std::swap(self->files[0], self->files[1]);
std::swap(self->firstPages[0], self->firstPages[1]);
self->files[0].popped = self->files[0].size;
}
return Void();
} catch (Error& e) {
TraceEvent(SevError, "RDQTruncateBeforeLastReadPageError", self->dbgid).error(e).detail("File0Name", self->files[0].dbgFilename);
if (!self->error.isSet()) self->error.sendError(e);
throw;
}
}
};
class DiskQueue : public IDiskQueue, public Tracked<DiskQueue> {
public:
// FIXME: Is setting lastCommittedSeq to -1 instead of 0 necessary?
DiskQueue( std::string basename, std::string fileExtension, UID dbgid, DiskQueueVersion diskQueueVersion, int64_t fileSizeWarningLimit )
: rawQueue( new RawDiskQueue_TwoFiles(basename, fileExtension, dbgid, fileSizeWarningLimit) ), dbgid(dbgid), diskQueueVersion(diskQueueVersion), anyPopped(false), nextPageSeq(0), poppedSeq(0), lastPoppedSeq(0),
nextReadLocation(-1), readBufPage(NULL), readBufPos(0), pushed_page_buffer(NULL), recovered(false), initialized(false), lastCommittedSeq(-1), warnAlwaysForMemory(true)
{
}
virtual location push( StringRef contents ) {
ASSERT( recovered );
uint8_t const* begin = contents.begin();
uint8_t const* end = contents.end();
TEST( contents.size() && pushedPageCount() ); // More than one push between commits
TEST( contents.size()>=4 && pushedPageCount() && backPage().remainingCapacity()<4 ); // Push right at the end of a page, possibly splitting size
while (begin != end) {
if (!pushedPageCount() || !backPage().remainingCapacity()) addEmptyPage();
auto &p = backPage();
int s = std::min<int>( p.remainingCapacity(), end-begin );
memcpy( p.payload + p.payloadSize, begin, s );
p.payloadSize += s;
begin += s;
}
return endLocation();
}
virtual void pop( location upTo ) {
ASSERT( !upTo.hi );
ASSERT( !recovered || upTo.lo <= endLocation() );
// The following ASSERT is NOT part of the intended contract of IDiskQueue, but alerts the user to a known bug where popping
// into uncommitted pages can cause a durability failure.
// FIXME: Remove this ASSERT when popping into uncommitted pages is fixed
if( upTo.lo > lastCommittedSeq ) {
TraceEvent(SevError, "DQPopUncommittedData", dbgid)
.detail("UpTo", upTo)
.detail("LastCommittedSeq", lastCommittedSeq)
.detail("File0Name", rawQueue->files[0].dbgFilename);
}
if (upTo.lo > poppedSeq) {
poppedSeq = upTo.lo;
anyPopped = true;
}
}
virtual Future<Standalone<StringRef>> read(location from, location to, CheckHashes ch) { return read(this, from, to, ch); }
int getMaxPayload() {
return Page::maxPayload;
}
virtual int getCommitOverhead() {
if(!pushedPageCount()) {
if(!anyPopped)
return 0;
return Page::maxPayload;
}
else
return backPage().remainingCapacity();
}
virtual Future<Void> commit() {
ASSERT( recovered );
if (!pushedPageCount()) {
if (!anyPopped) return Void();
addEmptyPage();
}
anyPopped = false;
backPage().popped = poppedSeq;
backPage().zeroPad();
backPage().updateHash();
if( pushedPageCount() >= 8000 ) {
TraceEvent( warnAlwaysForMemory ? SevWarnAlways : SevWarn, "DiskQueueMemoryWarning", dbgid)
.suppressFor(1.0)
.detail("PushedPages", pushedPageCount())
.detail("NextPageSeq", nextPageSeq)
.detail("Details", format("%d pages", pushedPageCount()))
.detail("File0Name", rawQueue->files[0].dbgFilename);
if(g_network->isSimulated())
warnAlwaysForMemory = false;
}
/*TraceEvent("DQCommit", dbgid).detail("Pages", pushedPageCount()).detail("LastPoppedSeq", lastPoppedSeq).detail("PoppedSeq", poppedSeq).detail("NextPageSeq", nextPageSeq)
.detail("RawFile0Size", rawQueue->files[0].size).detail("RawFile1Size", rawQueue->files[1].size).detail("WritingPos", rawQueue->writingPos)
.detail("RawFile0Name", rawQueue->files[0].dbgFilename);*/
lastCommittedSeq = backPage().endSeq();
auto f = rawQueue->pushAndCommit( pushed_page_buffer->ref(), pushed_page_buffer, poppedSeq/sizeof(Page) - lastPoppedSeq/sizeof(Page) );
lastPoppedSeq = poppedSeq;
pushed_page_buffer = 0;
return f;
}
void stall() {
rawQueue->stall();
}
virtual Future<bool> initializeRecovery(location recoverAt) { return initializeRecovery( this, recoverAt ); }
virtual Future<Standalone<StringRef>> readNext( int bytes ) { return readNext(this, bytes); }
// FIXME: getNextReadLocation should ASSERT( initialized ), but the memory storage engine needs
// to be changed to understand the new intiailizeRecovery protocol.
virtual location getNextReadLocation() { return nextReadLocation; }
virtual location getNextCommitLocation() { ASSERT( initialized ); return lastCommittedSeq + sizeof(Page); }
virtual location getNextPushLocation() { ASSERT( initialized ); return endLocation(); }
virtual Future<Void> getError() { return rawQueue->getError(); }
virtual Future<Void> onClosed() { return rawQueue->onClosed(); }
virtual void dispose() {
TraceEvent("DQDestroy", dbgid).detail("LastPoppedSeq", lastPoppedSeq).detail("PoppedSeq", poppedSeq).detail("NextPageSeq", nextPageSeq).detail("File0Name", rawQueue->files[0].dbgFilename);
dispose(this);
}
ACTOR static void dispose(DiskQueue* self) {
wait( self->onSafeToDestruct() );
TraceEvent("DQDestroyDone", self->dbgid).detail("File0Name", self->rawQueue->files[0].dbgFilename);
self->rawQueue->dispose();
delete self;
}
virtual void close() {
TraceEvent("DQClose", dbgid)
.detail("LastPoppedSeq", lastPoppedSeq)
.detail("PoppedSeq", poppedSeq)
.detail("NextPageSeq", nextPageSeq)
.detail("PoppedCommitted", rawQueue->dbg_file0BeginSeq + rawQueue->files[0].popped + rawQueue->files[1].popped)
.detail("File0Name", rawQueue->files[0].dbgFilename);
close(this);
}
ACTOR static void close(DiskQueue* self) {
wait( self->onSafeToDestruct() );
TraceEvent("DQCloseDone", self->dbgid).detail("File0Name", self->rawQueue->files[0].dbgFilename);
self->rawQueue->close();
delete self;
}
virtual StorageBytes getStorageBytes() {
return rawQueue->getStorageBytes();
}
private:
#pragma pack(push, 1)
struct PageHeader {
union {
UID hash;
struct {
uint32_t hash32;
uint32_t _unused;
uint16_t magic;
uint16_t implementationVersion;
};
};
uint64_t seq;
uint64_t popped;
int payloadSize;
};
// The on disk format depends on the size of PageHeader.
static_assert( sizeof(PageHeader) == 36, "PageHeader must be 36 bytes" );
struct Page : PageHeader {
static const int maxPayload = _PAGE_SIZE - sizeof(PageHeader);
uint8_t payload[maxPayload];
DiskQueueVersion diskQueueVersion() const { return static_cast<DiskQueueVersion>(implementationVersion); }
int remainingCapacity() const { return maxPayload - payloadSize; }
uint64_t endSeq() const { return seq + sizeof(PageHeader) + payloadSize; }
UID checksum_hashlittle2() const {
// SOMEDAY: Better hash?
uint32_t part[2] = { 0x12345678, 0xbeefabcd };
hashlittle2( &seq, sizeof(Page)-sizeof(UID), &part[0], &part[1] );
return UID( int64_t(part[0])<<32 | part[1], 0xFDB );
}
uint32_t checksum_crc32c() const {
return crc32c_append( 0xfdbeefdb, (uint8_t*)&_unused, sizeof(Page)-sizeof(uint32_t) );
}
void updateHash() {
switch (diskQueueVersion()) {
case DiskQueueVersion::V0: {
hash = checksum_hashlittle2();
return;
}
case DiskQueueVersion::V1:
default: {
hash32 = checksum_crc32c();
return;
}
}
}
bool checkHash() {
switch (diskQueueVersion()) {
case DiskQueueVersion::V0: {
return hash == checksum_hashlittle2();
}
case DiskQueueVersion::V1: {
return hash32 == checksum_crc32c();
}
default:
return false;
}
}
void zeroPad() {
memset( payload+payloadSize, 0, maxPayload-payloadSize );
}
};
static_assert( sizeof(Page) == _PAGE_SIZE, "Page must be 4k" );
#pragma pack(pop)
loc_t endLocation() const { return pushedPageCount() ? backPage().endSeq() : nextPageSeq; }
void addEmptyPage() {
if (pushedPageCount()) {
backPage().updateHash();
ASSERT( backPage().payloadSize == Page::maxPayload );
}
//pushed_pages.resize( pushed_pages.arena(), pushed_pages.size()+1 );
if (!pushed_page_buffer) pushed_page_buffer = new StringBuffer( dbgid );
pushed_page_buffer->alignReserve( sizeof(Page), pushed_page_buffer->size() + sizeof(Page) );
pushed_page_buffer->append( sizeof(Page) );
ASSERT( nextPageSeq%sizeof(Page)==0 );
auto& p = backPage();
memset(&p, 0, sizeof(Page)); // FIXME: unnecessary?
p.magic = 0xFDB;
switch (diskQueueVersion) {
case DiskQueueVersion::V0:
p.implementationVersion = 0;
break;
case DiskQueueVersion::V1:
p.implementationVersion = 1;
break;
}
p.payloadSize = 0;
p.seq = nextPageSeq;
nextPageSeq += sizeof(Page);
p.popped = poppedSeq;
if (pushedPageCount() == 8000) {
TraceEvent("DiskQueueHighPageCount", dbgid)
.detail("PushedPages", pushedPageCount())
.detail("NextPageSeq", nextPageSeq)
.detail("File0Name", rawQueue->files[0].dbgFilename);
}
}
ACTOR static void verifyCommit(DiskQueue* self, Future<Void> commitSynced, StringBuffer* buffer, loc_t start, loc_t end) {
state TrackMe trackme(self);
try {
wait( commitSynced );
Standalone<StringRef> pagedData = wait( readPages(self, start, end) );
const int startOffset = start % _PAGE_SIZE;
const int dataLen = end - start;
ASSERT( pagedData.substr(startOffset, dataLen).compare( buffer->ref().substr(0, dataLen) ) == 0 );
} catch (Error& e) {
if (e.code() != error_code_io_error) {
delete buffer;
throw;
}
}
delete buffer;
}
ACTOR static Future<Standalone<StringRef>> readPages(DiskQueue *self, location start, location end) {
state TrackMe trackme(self);
state int fromFile;
state int toFile;
state int64_t fromPage;
state int64_t toPage;
state uint64_t file0size = self->rawQueue->files[0].size ? self->firstPages(1).seq - self->firstPages(0).seq : self->firstPages(1).seq;
ASSERT(end > start);
ASSERT(start.lo >= self->firstPages(0).seq || start.lo >= self->firstPages(1).seq);
self->findPhysicalLocation(start.lo, &fromFile, &fromPage, nullptr);
self->findPhysicalLocation(end.lo-1, &toFile, &toPage, nullptr);
if (fromFile == 0) { ASSERT( fromPage < file0size / _PAGE_SIZE ); }
if (toFile == 0) { ASSERT( toPage < file0size / _PAGE_SIZE ); }
// FIXME I think there's something with nextReadLocation we can do here when initialized && !recovered.
if (fromFile == 1 && self->recovered) { ASSERT( fromPage < self->rawQueue->writingPos / _PAGE_SIZE ); }
if (toFile == 1 && self->recovered) { ASSERT( toPage < self->rawQueue->writingPos / _PAGE_SIZE ); }
if (fromFile == toFile) {
ASSERT(toPage >= fromPage);
Standalone<StringRef> pagedData = wait( self->rawQueue->read( fromFile, fromPage, toPage - fromPage + 1 ) );
if ( std::min(self->firstPages(0).seq, self->firstPages(1).seq) > start.lo ) {
// Simulation allows for reads to be delayed and executed after overlapping subsequent
// write operations. This means that by the time our read was executed, it's possible
// that both disk queue files have been completely overwritten.
// I'm not clear what is the actual contract for read/write in this case, so simulation
// might be a bit overly aggressive here, but it's behavior we need to tolerate.
throw io_error();
}
ASSERT( ((Page*)pagedData.begin())->seq == pageFloor(start.lo) );
ASSERT(pagedData.size() == (toPage - fromPage + 1) * _PAGE_SIZE );
ASSERT( ((Page*)pagedData.end() - 1)->seq == pageFloor(end.lo - 1) );
return pagedData;
} else {
ASSERT(fromFile == 0);
state Standalone<StringRef> firstChunk;
state Standalone<StringRef> secondChunk;
wait( store(firstChunk, self->rawQueue->read( fromFile, fromPage, ( file0size / sizeof(Page) ) - fromPage )) &&
store(secondChunk, self->rawQueue->read( toFile, 0, toPage + 1 )) );
if ( std::min(self->firstPages(0).seq, self->firstPages(1).seq) > start.lo ) {
// See above.
throw io_error();
}
ASSERT(firstChunk.size() == ( ( file0size / sizeof(Page) ) - fromPage ) * _PAGE_SIZE );
ASSERT( ((Page*)firstChunk.begin())->seq == pageFloor(start.lo) );
ASSERT(secondChunk.size() == (toPage + 1) * _PAGE_SIZE);
ASSERT( ((Page*)secondChunk.end() - 1)->seq == pageFloor(end.lo - 1) );
return firstChunk.withSuffix(secondChunk);
}
}
ACTOR static Future<Standalone<StringRef>> read(DiskQueue *self, location start, location end, CheckHashes ch) {
// This `state` is unnecessary, but works around pagedData wrongly becoming const
// due to the actor compiler.
state Standalone<StringRef> pagedData = wait(readPages(self, start, end));
ASSERT(start.lo % sizeof(Page) == 0 ||
start.lo % sizeof(Page) >= sizeof(PageHeader));
int startingOffset = start.lo % sizeof(Page);
if (startingOffset > 0) startingOffset -= sizeof(PageHeader);
ASSERT(end.lo % sizeof(Page) == 0 ||
end.lo % sizeof(Page) > sizeof(PageHeader));
int endingOffset = end.lo % sizeof(Page);
if (endingOffset == 0) endingOffset = sizeof(Page);
if (endingOffset > 0) endingOffset -= sizeof(PageHeader);
if (pageFloor(end.lo-1) == pageFloor(start.lo)) {
// start and end are on the same page
ASSERT(pagedData.size() == sizeof(Page));
Page *data = reinterpret_cast<Page*>(const_cast<uint8_t*>(pagedData.begin()));
if (ch == CheckHashes::YES && !data->checkHash()) throw io_error();
if (ch == CheckHashes::NO && data->payloadSize > Page::maxPayload) throw io_error();
pagedData.contents() = pagedData.substr(sizeof(PageHeader) + startingOffset, endingOffset - startingOffset);
return pagedData;
} else {
// Reusing pagedData wastes # of pages * sizeof(PageHeader) bytes, but means
// we don't have to double allocate in a hot, memory hungry call.
uint8_t *buf = mutateString(pagedData);
Page *data = reinterpret_cast<Page*>(const_cast<uint8_t*>(pagedData.begin()));
if (ch == CheckHashes::YES && !data->checkHash()) throw io_error();
if (ch == CheckHashes::NO && data->payloadSize > Page::maxPayload) throw io_error();
// Only start copying from `start` in the first page.
if( data->payloadSize > startingOffset ) {
const int length = data->payloadSize-startingOffset;
memmove(buf, data->payload+startingOffset, length);
buf += length;
}
data++;
if (ch == CheckHashes::YES && !data->checkHash()) throw io_error();
if (ch == CheckHashes::NO && data->payloadSize > Page::maxPayload) throw io_error();
// Copy all the middle pages
while (data->seq != pageFloor(end.lo-1)) {
// These pages can have varying amounts of data, as pages with partial
// data will be zero-filled when commit is called.
const int length = data->payloadSize;
memmove(buf, data->payload, length);
buf += length;
data++;
if (ch == CheckHashes::YES && !data->checkHash()) throw io_error();
if (ch == CheckHashes::NO && data->payloadSize > Page::maxPayload) throw io_error();
}
// Copy only until `end` in the last page.
const int length = data->payloadSize;
memmove(buf, data->payload, std::min(endingOffset, length));
buf += std::min(endingOffset, length);
memset(buf, 0, pagedData.size() - (buf - pagedData.begin()));
Standalone<StringRef> unpagedData = pagedData.substr(0, buf - pagedData.begin());
return unpagedData;
}
}
void readFromBuffer( StringBuffer* result, int* bytes ) {
// extract up to bytes from readBufPage into result
int len = std::min( readBufPage->payloadSize - readBufPos, *bytes );
if (len<=0) return;
result->append( StringRef(readBufPage->payload+readBufPos, len) );
readBufPos += len;
*bytes -= len;
nextReadLocation += len;
}
ACTOR static Future<Standalone<StringRef>> readNext( DiskQueue *self, int bytes ) {
state StringBuffer result( self->dbgid );
ASSERT(bytes >= 0);
result.clearReserve(bytes);
ASSERT( !self->recovered );
if (!self->initialized) {
bool recoveryComplete = wait( initializeRecovery(self, 0) );
if (recoveryComplete) {
ASSERT( self->poppedSeq <= self->endLocation() );
return Standalone<StringRef>();
}
}
loop {
if (self->readBufPage) {
self->readFromBuffer( &result, &bytes );
// if done, return
if (!bytes) return result.str;
ASSERT( self->readBufPos == self->readBufPage->payloadSize );
self->readBufPage = 0;
self->nextReadLocation += sizeof(Page) - self->readBufPos;
self->readBufPos = 0;
}
Standalone<StringRef> page = wait( self->rawQueue->readNextPage() );
if (!page.size()) {
TraceEvent("DQRecEOF", self->dbgid).detail("NextReadLocation", self->nextReadLocation).detail("File0Name", self->rawQueue->files[0].dbgFilename);
break;
}
ASSERT( page.size() == sizeof(Page) );
self->readBufArena = page.arena();
self->readBufPage = (Page*)page.begin();
if (!self->readBufPage->checkHash() || self->readBufPage->seq < pageFloor(self->nextReadLocation)) {
TraceEvent("DQRecInvalidPage", self->dbgid).detail("NextReadLocation", self->nextReadLocation).detail("HashCheck", self->readBufPage->checkHash())
.detail("Seq", self->readBufPage->seq).detail("Expect", pageFloor(self->nextReadLocation)).detail("File0Name", self->rawQueue->files[0].dbgFilename);
wait( self->rawQueue->truncateBeforeLastReadPage() );
break;
}
//TraceEvent("DQRecPage", self->dbgid).detail("NextReadLoc", self->nextReadLocation).detail("Seq", self->readBufPage->seq).detail("Pop", self->readBufPage->popped).detail("Payload", self->readBufPage->payloadSize).detail("File0Name", self->rawQueue->files[0].dbgFilename);
ASSERT( self->readBufPage->seq == pageFloor(self->nextReadLocation) );
self->lastPoppedSeq = self->readBufPage->popped;
}
// Recovery complete.
// The fully durable popped point is self->lastPoppedSeq; tell the raw queue that.
int f; int64_t p;
TEST( self->lastPoppedSeq/sizeof(Page) != self->poppedSeq/sizeof(Page) ); // DiskQueue: Recovery popped position not fully durable
self->findPhysicalLocation( self->lastPoppedSeq, &f, &p, "lastPoppedSeq" );
wait(self->rawQueue->setPoppedPage( f, p, pageFloor(self->lastPoppedSeq) ));
// Writes go at the end of our reads (but on the next page)
self->nextPageSeq = pageFloor(self->nextReadLocation);
if (self->nextReadLocation % sizeof(Page) > sizeof(PageHeader)) self->nextPageSeq += sizeof(Page);
TraceEvent("DQRecovered", self->dbgid).detail("LastPoppedSeq", self->lastPoppedSeq).detail("PoppedSeq", self->poppedSeq).detail("NextPageSeq", self->nextPageSeq).detail("File0Name", self->rawQueue->files[0].dbgFilename);
self->recovered = true;
ASSERT( self->poppedSeq <= self->endLocation() );
TEST( result.size() == 0 ); // End of queue at border between reads
TEST( result.size() != 0 ); // Partial read at end of queue
//The next read location isn't necessarily the end of the last commit, but this is sufficient for helping us check an ASSERTion
self->lastCommittedSeq = self->nextReadLocation;
return result.str;
}
ACTOR static Future<bool> initializeRecovery( DiskQueue* self, location recoverAt ) {
if (self->initialized) {
return self->recovered;
}
Standalone<StringRef> lastPageData = wait( self->rawQueue->readFirstAndLastPages( &comparePages ) );
self->initialized = true;
if (!lastPageData.size()) {
// There are no valid pages, so apparently this is a completely empty queue
self->nextReadLocation = 0;
self->lastCommittedSeq = 0;
self->recovered = true;
return true;
}
Page* lastPage = (Page*)lastPageData.begin();
self->poppedSeq = lastPage->popped;
if (self->diskQueueVersion >= DiskQueueVersion::V1) {
self->nextReadLocation = std::max(recoverAt.lo, self->poppedSeq);
} else {
self->nextReadLocation = lastPage->popped;
}
/*
state std::auto_ptr<Page> testPage(new Page);
state int fileNum;
for( fileNum=0; fileNum<2; fileNum++) {
state int sizeNum;
for( sizeNum=0; sizeNum < self->rawQueue->files[fileNum].size; sizeNum += sizeof(Page) ) {
wait(success( self->rawQueue->files[fileNum].f->read( testPage.get(), sizeof(Page), sizeNum ) ));
TraceEvent("PageData").detail("File", self->rawQueue->files[fileNum].dbgFilename).detail("SizeNum", sizeNum).detail("Seq", testPage->seq).detail("Hash", testPage->checkHash()).detail("Popped", testPage->popped);
}
}
*/
int file; int64_t page;
self->findPhysicalLocation( self->nextReadLocation, &file, &page, "FirstReadLocation" );
self->rawQueue->setStartPage( file, page );
self->readBufPos = self->nextReadLocation % sizeof(Page) - sizeof(PageHeader);
if (self->readBufPos < 0) { self->nextReadLocation -= self->readBufPos; self->readBufPos = 0; }
TraceEvent("DQRecStart", self->dbgid).detail("ReadBufPos", self->readBufPos).detail("NextReadLoc", self->nextReadLocation).detail("Popped", self->poppedSeq).detail("MinRecoverAt", recoverAt).detail("File0Name", self->rawQueue->files[0].dbgFilename);
return false;
}
Page& firstPages(int i) {
ASSERT( initialized );
return *(Page*)rawQueue->firstPages[i];
}
void findPhysicalLocation( loc_t loc, int* file, int64_t* page, const char* context ) {
bool ok = false;
if (context)
TraceEvent(SevInfo, "FindPhysicalLocation", dbgid)
.detail("Page0Valid", firstPages(0).checkHash())
.detail("Page0Seq", firstPages(0).seq)
.detail("Page1Valid", firstPages(1).checkHash())
.detail("Page1Seq", firstPages(1).seq)
.detail("Location", loc)
.detail("Context", context)
.detail("File0Name", rawQueue->files[0].dbgFilename);
for(int i = 1; i >= 0; i--) {
ASSERT_WE_THINK( firstPages(i).checkHash() );
if ( firstPages(i).seq <= (size_t)loc ) {
*file = i;
*page = (loc - firstPages(i).seq)/sizeof(Page);
if (context)
TraceEvent("FoundPhysicalLocation", dbgid)
.detail("PageIndex", i)
.detail("PageLocation", *page)
.detail("SizeofPage", sizeof(Page))
.detail("PageSequence", firstPages(i).seq)
.detail("Location", loc)
.detail("Context", context)
.detail("File0Name", rawQueue->files[0].dbgFilename);
ok = true;
break;
}
}
if (!ok)
TraceEvent(SevError, "DiskQueueLocationError", dbgid)
.detail("Page0Valid", firstPages(0).checkHash())
.detail("Page0Seq", firstPages(0).seq)
.detail("Page1Valid", firstPages(1).checkHash())
.detail("Page1Seq", firstPages(1).seq)
.detail("Location", loc)
.detail("Context", context ? context : "")
.detail("File0Name", rawQueue->files[0].dbgFilename);
ASSERT( ok );
}
// isValid(firstPage) == compare(firstPage, firstPage)
// isValid(otherPage) == compare(firstPage, otherPage)
// Swap file1, file2 if comparePages( file2.firstPage, file1.firstPage )
static bool comparePages( void* v1, void* v2 ) {
Page* p1 = (Page*)v1; Page* p2 = (Page*)v2;
return p2->checkHash() && (p2->seq >= p1->seq || !p1->checkHash());
}
RawDiskQueue_TwoFiles *rawQueue;
UID dbgid;
DiskQueueVersion diskQueueVersion;
bool anyPopped; // pop() has been called since the most recent call to commit()
bool warnAlwaysForMemory;
loc_t nextPageSeq, poppedSeq;
loc_t lastPoppedSeq; // poppedSeq the last time commit was called
loc_t lastCommittedSeq;
// Buffer of pushed pages that haven't been committed. The last one (backPage()) is still mutable.
StringBuffer* pushed_page_buffer;
Page& backPage() {
ASSERT( pushedPageCount() );
return ((Page*)pushed_page_buffer->ref().end())[-1];
}
Page const& backPage() const { return ((Page*)pushed_page_buffer->ref().end())[-1]; }
int pushedPageCount() const { return pushed_page_buffer ? pushed_page_buffer->size() / sizeof(Page) : 0; }
// Recovery state
bool recovered;
bool initialized;
loc_t nextReadLocation;
Arena readBufArena;
Page* readBufPage;
int readBufPos;
};
//A class wrapping DiskQueue which durably allows uncommitted data to be popped
//This works by performing two commits when uncommitted data is popped:
// Commit 1 - pop only previously committed data and push new data
// Commit 2 - finish pop into uncommitted data
class DiskQueue_PopUncommitted : public IDiskQueue {
public:
DiskQueue_PopUncommitted( std::string basename, std::string fileExtension, UID dbgid, DiskQueueVersion diskQueueVersion, int64_t fileSizeWarningLimit ) : queue(new DiskQueue(basename, fileExtension, dbgid, diskQueueVersion, fileSizeWarningLimit)), pushed(0), popped(0), committed(0) { };
//IClosable
Future<Void> getError() { return queue->getError(); }
Future<Void> onClosed() { return queue->onClosed(); }
void dispose() { queue->dispose(); delete this; }
void close() { queue->close(); delete this; }
//IDiskQueue
Future<bool> initializeRecovery(location recoverAt) { return queue->initializeRecovery(recoverAt); }
Future<Standalone<StringRef>> readNext( int bytes ) { return readNext(this, bytes); }
virtual location getNextReadLocation() { return queue->getNextReadLocation(); }
virtual Future<Standalone<StringRef>> read( location start, location end, CheckHashes ch ) { return queue->read( start, end, ch ); }
virtual location getNextCommitLocation() { return queue->getNextCommitLocation(); }
virtual location getNextPushLocation() { return queue->getNextPushLocation(); }
virtual location push( StringRef contents ) {
pushed = queue->push(contents);
return pushed;
}
virtual void pop( location upTo ) {
popped = std::max(popped, upTo);
ASSERT_WE_THINK(committed >= popped);
queue->pop(std::min(committed, popped));
}
virtual int getCommitOverhead() {
return queue->getCommitOverhead() + (popped > committed ? queue->getMaxPayload() : 0);
}
Future<Void> commit() {
location pushLocation = pushed;
location popLocation = popped;
Future<Void> commitFuture = queue->commit();
bool updatePop = popLocation > committed;
committed = pushLocation;
if(updatePop) {
ASSERT_WE_THINK(false);
ASSERT(popLocation <= committed);
queue->stall(); // Don't permit this pipelined commit to write anything to disk until the previous commit is totally finished
pop(popLocation);
commitFuture = commitFuture && queue->commit();
}
else
TEST(true); //No uncommitted data was popped
return commitFuture;
}
virtual StorageBytes getStorageBytes() { return queue->getStorageBytes(); }
private:
DiskQueue *queue;
location pushed;
location popped;
location committed;
ACTOR static Future<Standalone<StringRef>> readNext( DiskQueue_PopUncommitted *self, int bytes ) {
Standalone<StringRef> str = wait(self->queue->readNext(bytes));
if(str.size() < bytes)
self->pushed = self->getNextReadLocation();
return str;
}
};
IDiskQueue* openDiskQueue( std::string basename, std::string ext, UID dbgid, DiskQueueVersion dqv, int64_t fileSizeWarningLimit ) {
return new DiskQueue_PopUncommitted( basename, ext, dbgid, dqv, fileSizeWarningLimit );
}