Merge pull request #1545 from alexmiller-apple/tstlog-6.1

Prefer re-creating a DiskQueue file rather than performing a large truncate
Evan Tschannen 2019-05-08 15:39:27 -07:00 committed by GitHub
commit e8ef401aaa
4 changed files with 83 additions and 42 deletions
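
At a high level, the change stops the TLog from issuing one enormous truncate against a DiskQueue file. Instead, the new replaceFile() actor atomically swaps a fresh, empty file over the queue file's path (via OPEN_ATOMIC_WRITE_AND_CREATE), while incrementalTruncate() shrinks the orphaned old file in the background, FLOW_KNOBS->INCREMENTAL_DELETE_TRUNCATE_AMOUNT bytes at a time with a FLOW_KNOBS->INCREMENTAL_DELETE_INTERVAL pause between steps, so the filesystem frees extents gradually rather than stalling the TLog in a single call. A standalone sketch of the pattern follows the DiskQueue diff below, and a worked example of the new knob's threshold follows the Knobs.cpp diff.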

documentation/sphinx/source/release-notes.rst

@@ -125,6 +125,7 @@ Fixes only impacting 6.1.0+
 * The background actor which removes redundant teams could leave data unbalanced. [6.1.3] `(PR #1479) <https://github.com/apple/foundationdb/pull/1479>`_
 * The transaction log spill-by-reference policy could read too much data from disk. [6.1.5] `(PR #1527) <https://github.com/apple/foundationdb/pull/1527>`_
 * Memory tracking trace events could cause the program to crash when called from inside a trace event. [6.1.5] `(PR #1541) <https://github.com/apple/foundationdb/pull/1541>`_
+* TLogs will replace a large file with an empty file rather than doing a large truncate operation. [6.1.5] `(PR #1545) <https://github.com/apple/foundationdb/pull/1545>`_
 
 Earlier release notes
 ---------------------

fdbserver/DiskQueue.actor.cpp

@@ -270,68 +270,106 @@ public:
 	Future<Void> truncateFile(int file, int64_t pos) { return truncateFile(this, file, pos); }
 
-	Future<Void> push(StringRef pageData, vector<Reference<SyncQueue>>& toSync) {
+	// FIXME: Merge this function with IAsyncFileSystem::incrementalDeleteFile().
+	ACTOR static void incrementalTruncate(Reference<IAsyncFile> file) {
+		state int64_t remainingFileSize = wait( file->size() );
+
+		for( ; remainingFileSize > 0; remainingFileSize -= FLOW_KNOBS->INCREMENTAL_DELETE_TRUNCATE_AMOUNT ){
+			wait(file->truncate(remainingFileSize));
+			wait(file->sync());
+			wait(delay(FLOW_KNOBS->INCREMENTAL_DELETE_INTERVAL));
+		}
+
+		TraceEvent("DiskQueueReplaceTruncateEnded").detail("Filename", file->getFilename());
+	}
+
+	ACTOR static Future<Reference<IAsyncFile>> replaceFile(Reference<IAsyncFile> toReplace) {
+		incrementalTruncate( toReplace );
+
+		Reference<IAsyncFile> _replacement = wait( IAsyncFileSystem::filesystem()->open( toReplace->getFilename(), IAsyncFile::OPEN_ATOMIC_WRITE_AND_CREATE | IAsyncFile::OPEN_READWRITE | IAsyncFile::OPEN_UNCACHED | IAsyncFile::OPEN_UNBUFFERED | IAsyncFile::OPEN_LOCK, 0 ) );
+		state Reference<IAsyncFile> replacement = _replacement;
+		wait( replacement->sync() );
+
+		return replacement;
+	}
+
+	Future<Void> push(StringRef pageData, vector<Reference<SyncQueue>>* toSync) {
+		return push( this, pageData, toSync );
+	}
+
+	ACTOR static Future<Void> push(RawDiskQueue_TwoFiles* self, StringRef pageData, vector<Reference<SyncQueue>>* toSync) {
 		// Write the given data to the queue files, swapping or extending them if necessary.
 		// Don't do any syncs, but push the modified file(s) onto toSync.
-		ASSERT( readingFile == 2 );
+		ASSERT( self->readingFile == 2 );
 		ASSERT( pageData.size() % _PAGE_SIZE == 0 );
 		ASSERT( int64_t(pageData.begin()) % _PAGE_SIZE == 0 );
-		ASSERT( writingPos % _PAGE_SIZE == 0 );
-		ASSERT( files[0].size % _PAGE_SIZE == 0 && files[1].size % _PAGE_SIZE == 0 );
+		ASSERT( self->writingPos % _PAGE_SIZE == 0 );
+		ASSERT( self->files[0].size % _PAGE_SIZE == 0 && self->files[1].size % _PAGE_SIZE == 0 );
 
-		vector<Future<Void>> waitfor;
+		state vector<Future<Void>> waitfor;
 
-		if (pageData.size() + writingPos > files[1].size) {
-			if ( files[0].popped == files[0].size ) {
-				// Finish files[1] and swap
-				int p = files[1].size - writingPos;
+		if (pageData.size() + self->writingPos > self->files[1].size) {
+			if ( self->files[0].popped == self->files[0].size ) {
+				// Finish self->files[1] and swap
+				int p = self->files[1].size - self->writingPos;
 				if(p > 0) {
-					toSync.push_back( files[1].syncQueue );
-					/*TraceEvent("RDQWriteAndSwap", this->dbgid).detail("File1name", files[1].dbgFilename).detail("File1size", files[1].size)
-						.detail("WritingPos", writingPos).detail("WritingBytes", p);*/
-					waitfor.push_back( files[1].f->write( pageData.begin(), p, writingPos ) );
+					toSync->push_back( self->files[1].syncQueue );
+					/*TraceEvent("RDQWriteAndSwap", this->dbgid).detail("File1name", self->files[1].dbgFilename).detail("File1size", self->files[1].size)
+						.detail("WritingPos", self->writingPos).detail("WritingBytes", p);*/
+					waitfor.push_back( self->files[1].f->write( pageData.begin(), p, self->writingPos ) );
 					pageData = pageData.substr( p );
 				}
 
-				dbg_file0BeginSeq += files[0].size;
-				std::swap(files[0], files[1]);
-				std::swap(firstPages[0], firstPages[1]);
-				files[1].popped = 0;
-				writingPos = 0;
+				self->dbg_file0BeginSeq += self->files[0].size;
+				std::swap(self->files[0], self->files[1]);
+				std::swap(self->firstPages[0], self->firstPages[1]);
+				self->files[1].popped = 0;
+				self->writingPos = 0;
 
-				const int64_t activeDataVolume = pageCeiling(files[0].size - files[0].popped + fileExtensionBytes + fileShrinkBytes);
-				if (files[1].size > activeDataVolume) {
-					// Either shrink files[1] to the size of files[0], or chop off fileShrinkBytes
-					int64_t maxShrink = std::max( pageFloor(files[1].size - activeDataVolume), fileShrinkBytes );
-					files[1].size -= maxShrink;
-					waitfor.push_back( files[1].f->truncate( files[1].size ) );
+				const int64_t activeDataVolume = pageCeiling(self->files[0].size - self->files[0].popped + self->fileExtensionBytes + self->fileShrinkBytes);
+				const int64_t desiredMaxFileSize = std::max( activeDataVolume, SERVER_KNOBS->TLOG_HARD_LIMIT_BYTES * 2 );
+				if (self->files[1].size > desiredMaxFileSize) {
+					// Either shrink self->files[1] to the size of self->files[0], or chop off fileShrinkBytes
+					int64_t maxShrink = std::max( pageFloor(self->files[1].size - desiredMaxFileSize), self->fileShrinkBytes );
+					if (maxShrink / SERVER_KNOBS->DISK_QUEUE_FILE_EXTENSION_BYTES >
+					    SERVER_KNOBS->DISK_QUEUE_MAX_TRUNCATE_EXTENTS) {
+						TEST(true); // Replacing DiskQueue file
+						TraceEvent("DiskQueueReplaceFile", self->dbgid).detail("Filename", self->files[1].f->getFilename()).detail("OldFileSize", self->files[1].size).detail("ElidedTruncateSize", maxShrink);
+						Reference<IAsyncFile> newFile = wait( replaceFile(self->files[1].f) );
+						self->files[1].setFile(newFile);
+						self->files[1].size = 0;
+					} else {
+						self->files[1].size -= maxShrink;
+						waitfor.push_back( self->files[1].f->truncate( self->files[1].size ) );
+					}
 				}
 			} else {
-				// Extend files[1] to accomodate the new write and about 10MB or 2x current size for future writes.
-				/*TraceEvent("RDQExtend", this->dbgid).detail("File1name", files[1].dbgFilename).detail("File1size", files[1].size)
+				// Extend self->files[1] to accomodate the new write and about 10MB or 2x current size for future writes.
+				/*TraceEvent("RDQExtend", this->dbgid).detail("File1name", self->files[1].dbgFilename).detail("File1size", self->files[1].size)
 					.detail("ExtensionBytes", fileExtensionBytes);*/
-				int64_t minExtension = pageData.size() + writingPos - files[1].size;
-				files[1].size += std::min(std::max(fileExtensionBytes, minExtension), files[0].size+files[1].size+minExtension);
-				waitfor.push_back( files[1].f->truncate( files[1].size ) );
+				int64_t minExtension = pageData.size() + self->writingPos - self->files[1].size;
+				self->files[1].size += std::min(std::max(self->fileExtensionBytes, minExtension), self->files[0].size+self->files[1].size+minExtension);
+				waitfor.push_back( self->files[1].f->truncate( self->files[1].size ) );
 
-				if(fileSizeWarningLimit > 0 && files[1].size > fileSizeWarningLimit) {
-					TraceEvent(SevWarnAlways, "DiskQueueFileTooLarge", dbgid).suppressFor(1.0).detail("Filename", filename(1)).detail("Size", files[1].size);
+				if(self->fileSizeWarningLimit > 0 && self->files[1].size > self->fileSizeWarningLimit) {
+					TraceEvent(SevWarnAlways, "DiskQueueFileTooLarge", self->dbgid).suppressFor(1.0).detail("Filename", self->filename(1)).detail("Size", self->files[1].size);
 				}
 			}
 		}
 
-		if (writingPos == 0) {
-			*firstPages[1] = *(const Page*)pageData.begin();
+		if (self->writingPos == 0) {
+			*self->firstPages[1] = *(const Page*)pageData.begin();
 		}
 
-		/*TraceEvent("RDQWrite", this->dbgid).detail("File1name", files[1].dbgFilename).detail("File1size", files[1].size)
-			.detail("WritingPos", writingPos).detail("WritingBytes", pageData.size());*/
-		files[1].size = std::max( files[1].size, writingPos + pageData.size() );
-		toSync.push_back( files[1].syncQueue );
-		waitfor.push_back( files[1].f->write( pageData.begin(), pageData.size(), writingPos ) );
-		writingPos += pageData.size();
+		/*TraceEvent("RDQWrite", this->dbgid).detail("File1name", self->files[1].dbgFilename).detail("File1size", self->files[1].size)
+			.detail("WritingPos", self->writingPos).detail("WritingBytes", pageData.size());*/
+		self->files[1].size = std::max( self->files[1].size, self->writingPos + pageData.size() );
+		toSync->push_back( self->files[1].syncQueue );
+		waitfor.push_back( self->files[1].f->write( pageData.begin(), pageData.size(), self->writingPos ) );
+		self->writingPos += pageData.size();
 
-		return waitForAll(waitfor);
+		wait( waitForAll(waitfor) );
+		return Void();
 	}
 
 	ACTOR static UNCANCELLABLE Future<Void> pushAndCommit(RawDiskQueue_TwoFiles* self, StringRef pageData, StringBuffer* pageMem, uint64_t poppedPages) {
@@ -358,11 +396,11 @@ public:
 		TEST( pageData.size() > sizeof(Page) ); // push more than one page of data
 
-		Future<Void> pushed = self->push( pageData, syncFiles );
+		Future<Void> pushed = self->push( pageData, &syncFiles );
 		pushing.send(Void());
+		wait( pushed );
 		ASSERT( syncFiles.size() >= 1 && syncFiles.size() <= 2 );
 		TEST(2==syncFiles.size()); // push spans both files
-		wait( pushed );
 
 		delete pageMem;
 		pageMem = 0;
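
To make the mechanism concrete, here is a minimal standalone sketch of the same replace-then-shrink pattern in plain POSIX terms. It is an illustration under simplified assumptions, not the Flow implementation above: it runs synchronously on one thread, std::rename() stands in for OPEN_ATOMIC_WRITE_AND_CREATE, the ".replacement" temp name is hypothetical, and the 100 MiB chunk size and one-second pause are placeholders for the INCREMENTAL_DELETE_TRUNCATE_AMOUNT and INCREMENTAL_DELETE_INTERVAL knobs.

// replace_and_shrink.cpp: illustrative POSIX sketch only, not FDB code.
#include <fcntl.h>
#include <sys/stat.h>
#include <unistd.h>
#include <chrono>
#include <cstdio>
#include <string>
#include <thread>

// Shrink a file's storage a chunk at a time, syncing and pausing between
// steps, so the filesystem frees extents gradually instead of stalling on
// a single enormous truncate.
static void incrementalTruncate(int fd, off_t chunkBytes, double pauseSec) {
	struct stat st;
	if (fstat(fd, &st) != 0) return;
	for (off_t remaining = st.st_size; remaining > 0; remaining -= chunkBytes) {
		if (ftruncate(fd, remaining) != 0) return; // first pass is a no-op at full size
		fsync(fd);
		std::this_thread::sleep_for(std::chrono::duration<double>(pauseSec));
	}
	ftruncate(fd, 0);
}

// Atomically swap an empty file over `path`, then dispose of the old file's
// storage incrementally. Returns the fd of the fresh file, or -1 on error.
static int replaceFile(const std::string& path) {
	int oldFd = open(path.c_str(), O_RDWR); // keeps the old inode alive past the rename
	if (oldFd < 0) return -1;
	std::string tmp = path + ".replacement"; // hypothetical temp name
	int newFd = open(tmp.c_str(), O_RDWR | O_CREAT | O_EXCL, 0600);
	if (newFd < 0) {
		close(oldFd);
		return -1;
	}
	if (std::rename(tmp.c_str(), path.c_str()) != 0) {
		close(newFd);
		close(oldFd);
		return -1;
	}
	fsync(newFd); // the (empty) replacement is durable before use
	incrementalTruncate(oldFd, 100 << 20, 1.0); // placeholders: 100 MiB chunks, 1 s apart
	close(oldFd); // last reference dropped; deleting the now-tiny inode is cheap
	return newFd;
}

In the actual commit the shrink runs as a detached actor (incrementalTruncate is called without being waited on), so writes proceed against the replacement file immediately and the eventual deletion of the old inode is cheap because little data remains by then. A production version of this sketch would also fsync the parent directory after the rename.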

fdbserver/Knobs.cpp

@@ -75,6 +75,7 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
 	init( TLOG_SPILL_REFERENCE_MAX_BYTES_PER_BATCH, 16<<10 ); if ( randomize && BUGGIFY ) TLOG_SPILL_REFERENCE_MAX_BYTES_PER_BATCH = 500;
 	init( DISK_QUEUE_FILE_EXTENSION_BYTES, 10<<20 ); // BUGGIFYd per file within the DiskQueue
 	init( DISK_QUEUE_FILE_SHRINK_BYTES, 100<<20 ); // BUGGIFYd per file within the DiskQueue
+	init( DISK_QUEUE_MAX_TRUNCATE_EXTENTS, 1<<10 ); if ( randomize && BUGGIFY ) DISK_QUEUE_MAX_TRUNCATE_EXTENTS = 0;
 	init( TLOG_DEGRADED_DELAY_COUNT, 5 );
 	init( TLOG_DEGRADED_DURATION, 5.0 );
 
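
For a sense of when the new branch fires, the following hypothetical helper (shouldReplace is not a name from the codebase) restates the check added in DiskQueue.actor.cpp using the default knob values above: a shrink is measured in DISK_QUEUE_FILE_EXTENSION_BYTES-sized units, and once it spans more than DISK_QUEUE_MAX_TRUNCATE_EXTENTS of them (1024 * 10 MiB = 10 GiB with these defaults), the file is replaced instead of truncated. The BUGGIFY value of 0 makes simulation take the replacement path on essentially every shrink.

#include <cstdint>

// Hypothetical restatement of the check in DiskQueue.actor.cpp, using the
// default (non-BUGGIFY) knob values initialized above.
constexpr int64_t kExtensionBytes = 10 << 20;    // DISK_QUEUE_FILE_EXTENSION_BYTES: 10 MiB
constexpr int64_t kMaxTruncateExtents = 1 << 10; // DISK_QUEUE_MAX_TRUNCATE_EXTENTS: 1024

// Replace the file instead of truncating when the shrink spans more than
// kMaxTruncateExtents extension-sized units.
constexpr bool shouldReplace(int64_t maxShrink) {
	return maxShrink / kExtensionBytes > kMaxTruncateExtents;
}

static_assert(!shouldReplace(int64_t(10) << 30), "a 10 GiB shrink is still truncated in place");
static_assert(shouldReplace((int64_t(10) << 30) + kExtensionBytes), "past 10 GiB, the file is replaced");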

fdbserver/Knobs.h

@@ -79,6 +79,7 @@ public:
 	int64_t TLOG_SPILL_REFERENCE_MAX_BYTES_PER_BATCH;
 	int64_t DISK_QUEUE_FILE_EXTENSION_BYTES; // When we grow the disk queue, by how many bytes should it grow?
 	int64_t DISK_QUEUE_FILE_SHRINK_BYTES; // When we shrink the disk queue, by how many bytes should it shrink?
+	int DISK_QUEUE_MAX_TRUNCATE_EXTENTS;
 	int TLOG_DEGRADED_DELAY_COUNT;
 	double TLOG_DEGRADED_DURATION;
 