Fix a variety of problems stemming from a wait() being added to push().

And that this code was previously insufficiently tested.
This commit is contained in:
Alex Miller 2019-05-10 14:46:26 -10:00
parent fbc4e7b351
commit c502ed3d15
3 changed files with 28 additions and 13 deletions

View File

@ -265,7 +265,7 @@ public:
result = fallocate( fd, 0, 0, size);
if (result != 0) {
int fallocateErrCode = errno;
TraceEvent("AsyncFileKAIOAllocateError").detail("Fd",fd).detail("Filename", filename).GetLastError();
TraceEvent("AsyncFileKAIOAllocateError").detail("Fd",fd).detail("Filename", filename).detail("Size", size).GetLastError();
if ( fallocateErrCode == EOPNOTSUPP ) {
// Mark fallocate as unsupported. Try again with truncate.
ctx.fallocateSupported = false;

View File

@ -604,11 +604,16 @@ private:
if (randLog)
fprintf( randLog, "SFT1 %s %s %s %lld\n", self->dbgId.shortString().c_str(), self->filename.c_str(), opId.shortString().c_str(), size );
if (size == 0) {
// KAIO will return EINVAL, as len==0 is an error.
throw io_error();
}
if(self->delayOnWrite)
wait( waitUntilDiskReady( self->diskParameters, 0 ) );
if( _chsize( self->h, (long) size ) == -1 ) {
TraceEvent(SevWarn, "SimpleFileIOError").detail("Location", 6).detail("Filename", self->filename).detail("Error", strerror(errno)).detail("Size", size).detail("Fd", self->h);
TraceEvent(SevWarn, "SimpleFileIOError").detail("Location", 6).detail("Filename", self->filename).detail("Size", size).detail("Fd", self->h).GetLastError();
throw io_error();
}

View File

@ -158,13 +158,13 @@ class RawDiskQueue_TwoFiles : public Tracked<RawDiskQueue_TwoFiles> {
public:
RawDiskQueue_TwoFiles( std::string basename, std::string fileExtension, UID dbgid, int64_t fileSizeWarningLimit )
: basename(basename), fileExtension(fileExtension), onError(delayed(error.getFuture())), onStopped(stopped.getFuture()),
readingFile(-1), readingPage(-1), writingPos(-1), dbgid(dbgid),
readingFile(-1), readingPage(-1), pushlock(1), writingPos(-1), dbgid(dbgid),
dbg_file0BeginSeq(0), fileExtensionBytes(SERVER_KNOBS->DISK_QUEUE_FILE_EXTENSION_BYTES),
fileShrinkBytes(SERVER_KNOBS->DISK_QUEUE_FILE_SHRINK_BYTES), readingBuffer( dbgid ),
readyToPush(Void()), fileSizeWarningLimit(fileSizeWarningLimit), lastCommit(Void()), isFirstCommit(true)
{
if (BUGGIFY)
fileExtensionBytes = 1<<10 * g_random->randomSkewedUInt32( 1, 40<<10 );
fileExtensionBytes = _PAGE_SIZE * g_random->randomSkewedUInt32( 1, 10<<10 );
if (BUGGIFY)
fileShrinkBytes = _PAGE_SIZE * g_random->randomSkewedUInt32( 1, 10<<10 );
files[0].dbgFilename = filename(0);
@ -261,6 +261,7 @@ public:
int readingFile; // i if the next page after readingBuffer should be read from files[i], 2 if recovery is complete
int64_t readingPage; // Page within readingFile that is the next page after readingBuffer
FlowLock pushlock;
int64_t writingPos; // Position within files[1] that will be next written
int64_t fileExtensionBytes;
@ -293,7 +294,7 @@ public:
ACTOR static Future<Reference<IAsyncFile>> replaceFile(Reference<IAsyncFile> toReplace) {
incrementalTruncate( toReplace );
Reference<IAsyncFile> _replacement = wait( IAsyncFileSystem::filesystem()->open( toReplace->getFilename(), IAsyncFile::OPEN_ATOMIC_WRITE_AND_CREATE | IAsyncFile::OPEN_CREATE | IAsyncFile::OPEN_READWRITE | IAsyncFile::OPEN_UNCACHED | IAsyncFile::OPEN_UNBUFFERED, 0 ) );
Reference<IAsyncFile> _replacement = wait( IAsyncFileSystem::filesystem()->open( toReplace->getFilename(), IAsyncFile::OPEN_ATOMIC_WRITE_AND_CREATE | IAsyncFile::OPEN_CREATE | IAsyncFile::OPEN_READWRITE | IAsyncFile::OPEN_UNCACHED | IAsyncFile::OPEN_UNBUFFERED | IAsyncFile::OPEN_LOCK, 0600 ) );
state Reference<IAsyncFile> replacement = _replacement;
wait( replacement->sync() );
@ -306,6 +307,11 @@ public:
}
ACTOR static Future<Void> push(RawDiskQueue_TwoFiles* self, StringRef pageData, vector<Reference<SyncQueue>>* toSync) {
state TrackMe trackMe(self);
wait( self->pushlock.take(g_network->getCurrentTask(), 1) );
state FlowLock::Releaser releaser(self->pushlock, 1);
// Write the given data to the queue files, swapping or extending them if necessary.
// Don't do any syncs, but push the modified file(s) onto toSync.
ASSERT( self->readingFile == 2 );
@ -333,23 +339,28 @@ public:
std::swap(self->firstPages[0], self->firstPages[1]);
self->files[1].popped = 0;
self->writingPos = 0;
*self->firstPages[1] = *(const Page*)pageData.begin();
const int64_t activeDataVolume = pageCeiling(self->files[0].size - self->files[0].popped + self->fileExtensionBytes + self->fileShrinkBytes);
const int64_t desiredMaxFileSize = std::max( activeDataVolume, SERVER_KNOBS->TLOG_HARD_LIMIT_BYTES * 2 );
const bool frivolouslyTruncate = BUGGIFY_WITH_PROB(0.001);
const int64_t desiredMaxFileSize = pageCeiling( std::max( activeDataVolume, SERVER_KNOBS->TLOG_HARD_LIMIT_BYTES * 2 ) );
const bool frivolouslyTruncate = BUGGIFY_WITH_PROB(0.1);
if (self->files[1].size > desiredMaxFileSize || frivolouslyTruncate) {
// Either shrink self->files[1] to the size of self->files[0], or chop off fileShrinkBytes
int64_t maxShrink = std::max( pageFloor(self->files[1].size - desiredMaxFileSize), self->fileShrinkBytes );
int64_t maxShrink = pageFloor( std::max( self->files[1].size - desiredMaxFileSize, self->fileShrinkBytes ) );
if ((maxShrink / SERVER_KNOBS->DISK_QUEUE_FILE_EXTENSION_BYTES >
SERVER_KNOBS->DISK_QUEUE_MAX_TRUNCATE_EXTENTS) ||
SERVER_KNOBS->DISK_QUEUE_MAX_TRUNCATE_EXTENTS) ||
(frivolouslyTruncate && g_random->random01() < 0.3)) {
TEST(true); // Replacing DiskQueue file
TraceEvent("DiskQueueReplaceFile", self->dbgid).detail("Filename", self->files[1].f->getFilename()).detail("OldFileSize", self->files[1].size).detail("ElidedTruncateSize", maxShrink);
Reference<IAsyncFile> newFile = wait( replaceFile(self->files[1].f) );
self->files[1].setFile(newFile);
self->files[1].size = 0;
waitfor.push_back( self->files[1].f->truncate( self->fileExtensionBytes ) );
self->files[1].size = self->fileExtensionBytes;
} else {
const int64_t startingSize = self->files[1].size;
self->files[1].size -= std::min(maxShrink, self->files[1].size);
self->files[1].size = std::max(self->files[1].size, self->fileExtensionBytes);
TraceEvent("DiskQueueTruncate", self->dbgid).detail("Filename", self->files[1].f->getFilename()).detail("OldFileSize", startingSize).detail("NewFileSize", self->files[1].size);
waitfor.push_back( self->files[1].f->truncate( self->files[1].size ) );
}
}
@ -365,9 +376,8 @@ public:
TraceEvent(SevWarnAlways, "DiskQueueFileTooLarge", self->dbgid).suppressFor(1.0).detail("Filename", self->filename(1)).detail("Size", self->files[1].size);
}
}
}
if (self->writingPos == 0) {
} else if (self->writingPos == 0) {
// If this is the first write to a brand new disk queue file.
*self->firstPages[1] = *(const Page*)pageData.begin();
}