From a269a784cc4b8f9db51374942dae70189cf34128 Mon Sep 17 00:00:00 2001
From: Alex Miller
Date: Tue, 7 May 2019 13:48:41 -1000
Subject: [PATCH 1/5] Convert push() into an actor.

---
 fdbserver/DiskQueue.actor.cpp | 85 ++++++++++++++++++-----------------
 1 file changed, 45 insertions(+), 40 deletions(-)

diff --git a/fdbserver/DiskQueue.actor.cpp b/fdbserver/DiskQueue.actor.cpp
index bf5eb3a6bb..1be449d0a2 100644
--- a/fdbserver/DiskQueue.actor.cpp
+++ b/fdbserver/DiskQueue.actor.cpp
@@ -270,68 +270,73 @@ public:
     Future<Void> truncateFile(int file, int64_t pos) { return truncateFile(this, file, pos); }
 
-    Future<Void> push(StringRef pageData, vector<Reference<SyncQueue>>& toSync) {
+    Future<Void> push(StringRef pageData, vector<Reference<SyncQueue>>* toSync) {
+        return push( this, pageData, toSync );
+    }
+
+    ACTOR static Future<Void> push(RawDiskQueue_TwoFiles* self, StringRef pageData, vector<Reference<SyncQueue>>* toSync) {
         // Write the given data to the queue files, swapping or extending them if necessary.
         // Don't do any syncs, but push the modified file(s) onto toSync.
-        ASSERT( readingFile == 2 );
+        ASSERT( self->readingFile == 2 );
         ASSERT( pageData.size() % _PAGE_SIZE == 0 );
         ASSERT( int64_t(pageData.begin()) % _PAGE_SIZE == 0 );
-        ASSERT( writingPos % _PAGE_SIZE == 0 );
-        ASSERT( files[0].size % _PAGE_SIZE == 0 && files[1].size % _PAGE_SIZE == 0 );
+        ASSERT( self->writingPos % _PAGE_SIZE == 0 );
+        ASSERT( self->files[0].size % _PAGE_SIZE == 0 && self->files[1].size % _PAGE_SIZE == 0 );
 
         vector<Future<Void>> waitfor;
 
-        if (pageData.size() + writingPos > files[1].size) {
-            if ( files[0].popped == files[0].size ) {
-                // Finish files[1] and swap
-                int p = files[1].size - writingPos;
+        if (pageData.size() + self->writingPos > self->files[1].size) {
+            if ( self->files[0].popped == self->files[0].size ) {
+                // Finish self->files[1] and swap
+                int p = self->files[1].size - self->writingPos;
                 if(p > 0) {
-                    toSync.push_back( files[1].syncQueue );
-                    /*TraceEvent("RDQWriteAndSwap", this->dbgid).detail("File1name", files[1].dbgFilename).detail("File1size", files[1].size)
-                        .detail("WritingPos", writingPos).detail("WritingBytes", p);*/
-                    waitfor.push_back( files[1].f->write( pageData.begin(), p, writingPos ) );
+                    toSync->push_back( self->files[1].syncQueue );
+                    /*TraceEvent("RDQWriteAndSwap", this->dbgid).detail("File1name", self->files[1].dbgFilename).detail("File1size", self->files[1].size)
+                        .detail("WritingPos", self->writingPos).detail("WritingBytes", p);*/
+                    waitfor.push_back( self->files[1].f->write( pageData.begin(), p, self->writingPos ) );
                     pageData = pageData.substr( p );
                 }
 
-                dbg_file0BeginSeq += files[0].size;
-                std::swap(files[0], files[1]);
-                std::swap(firstPages[0], firstPages[1]);
-                files[1].popped = 0;
-                writingPos = 0;
+                self->dbg_file0BeginSeq += self->files[0].size;
+                std::swap(self->files[0], self->files[1]);
+                std::swap(self->firstPages[0], self->firstPages[1]);
+                self->files[1].popped = 0;
+                self->writingPos = 0;
 
-                const int64_t activeDataVolume = pageCeiling(files[0].size - files[0].popped + fileExtensionBytes + fileShrinkBytes);
-                if (files[1].size > activeDataVolume) {
-                    // Either shrink files[1] to the size of files[0], or chop off fileShrinkBytes
-                    int64_t maxShrink = std::max( pageFloor(files[1].size - activeDataVolume), fileShrinkBytes );
-                    files[1].size -= maxShrink;
-                    waitfor.push_back( files[1].f->truncate( files[1].size ) );
+                const int64_t activeDataVolume = pageCeiling(self->files[0].size - self->files[0].popped + self->fileExtensionBytes + self->fileShrinkBytes);
+                if (self->files[1].size > activeDataVolume) {
+                    // Either shrink self->files[1] to the size of self->files[0], or chop off fileShrinkBytes
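
[Editorial note, not part of the patch series] Patch 1 is the standard Flow
refactoring for turning a member function into an actor: keep a thin member
wrapper with the old name, move the body into an ACTOR static function that
takes the object as an explicit `self` pointer, rewrite member accesses as
`self->...`, and pass the out-parameter by pointer rather than by reference,
since actor arguments are stored in the actor's generated state object. A
minimal sketch of the shape, with illustrative names only, assuming a
.actor.cpp file compiled by the Flow actor compiler and flow/flow.h:

    #include <vector>

    struct Example {
        // Thin wrapper keeps existing call sites unchanged.
        Future<Void> doWork(std::vector<int>* out) { return doWork(this, out); }

        // The actor takes the object explicitly and may suspend at wait().
        ACTOR static Future<Void> doWork(Example* self, std::vector<int>* out) {
            out->push_back(self->counter++);
            wait( delay(0) ); // 'self' must stay alive across any wait
            return Void();
        }

        int counter = 0;
    };
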
+                    int64_t maxShrink = std::max( pageFloor(self->files[1].size - activeDataVolume), self->fileShrinkBytes );
+                    self->files[1].size -= maxShrink;
+                    waitfor.push_back( self->files[1].f->truncate( self->files[1].size ) );
                 }
             } else {
-                // Extend files[1] to accomodate the new write and about 10MB or 2x current size for future writes.
-                /*TraceEvent("RDQExtend", this->dbgid).detail("File1name", files[1].dbgFilename).detail("File1size", files[1].size)
+                // Extend self->files[1] to accomodate the new write and about 10MB or 2x current size for future writes.
+                /*TraceEvent("RDQExtend", this->dbgid).detail("File1name", self->files[1].dbgFilename).detail("File1size", self->files[1].size)
                     .detail("ExtensionBytes", fileExtensionBytes);*/
-                int64_t minExtension = pageData.size() + writingPos - files[1].size;
-                files[1].size += std::min(std::max(fileExtensionBytes, minExtension), files[0].size+files[1].size+minExtension);
-                waitfor.push_back( files[1].f->truncate( files[1].size ) );
+                int64_t minExtension = pageData.size() + self->writingPos - self->files[1].size;
+                self->files[1].size += std::min(std::max(self->fileExtensionBytes, minExtension), self->files[0].size+self->files[1].size+minExtension);
+                waitfor.push_back( self->files[1].f->truncate( self->files[1].size ) );
 
-                if(fileSizeWarningLimit > 0 && files[1].size > fileSizeWarningLimit) {
-                    TraceEvent(SevWarnAlways, "DiskQueueFileTooLarge", dbgid).suppressFor(1.0).detail("Filename", filename(1)).detail("Size", files[1].size);
+                if(self->fileSizeWarningLimit > 0 && self->files[1].size > self->fileSizeWarningLimit) {
+                    TraceEvent(SevWarnAlways, "DiskQueueFileTooLarge", self->dbgid).suppressFor(1.0).detail("Filename", self->filename(1)).detail("Size", self->files[1].size);
                 }
             }
         }
 
-        if (writingPos == 0) {
-            *firstPages[1] = *(const Page*)pageData.begin();
+        if (self->writingPos == 0) {
+            *self->firstPages[1] = *(const Page*)pageData.begin();
         }
 
-        /*TraceEvent("RDQWrite", this->dbgid).detail("File1name", files[1].dbgFilename).detail("File1size", files[1].size)
-            .detail("WritingPos", writingPos).detail("WritingBytes", pageData.size());*/
-        files[1].size = std::max( files[1].size, writingPos + pageData.size() );
-        toSync.push_back( files[1].syncQueue );
-        waitfor.push_back( files[1].f->write( pageData.begin(), pageData.size(), writingPos ) );
-        writingPos += pageData.size();
+        /*TraceEvent("RDQWrite", this->dbgid).detail("File1name", self->files[1].dbgFilename).detail("File1size", self->files[1].size)
+            .detail("WritingPos", self->writingPos).detail("WritingBytes", pageData.size());*/
+        self->files[1].size = std::max( self->files[1].size, self->writingPos + pageData.size() );
+        toSync->push_back( self->files[1].syncQueue );
+        waitfor.push_back( self->files[1].f->write( pageData.begin(), pageData.size(), self->writingPos ) );
+        self->writingPos += pageData.size();
 
-        return waitForAll(waitfor);
+        wait( waitForAll(waitfor) );
+        return Void();
     }
 
     ACTOR static UNCANCELLABLE Future<Void> pushAndCommit(RawDiskQueue_TwoFiles* self, StringRef pageData, StringBuffer* pageMem, uint64_t poppedPages) {
@@ -358,7 +363,7 @@ public:
 
         TEST( pageData.size() > sizeof(Page) ); // push more than one page of data
 
-        Future<Void> pushed = self->push( pageData, syncFiles );
+        Future<Void> pushed = self->push( pageData, &syncFiles );
         pushing.send(Void());
         ASSERT( syncFiles.size() >= 1 && syncFiles.size() <= 2 );
         TEST(2==syncFiles.size()); // push spans both files

From 36dfbf4fb3de84399ede32ff1bce5f953a3ffd93 Mon Sep 17 00:00:00 2001
From: Alex Miller
Date: Tue, 7 May 2019 14:24:45 -1000
Subject: [PATCH 2/5] Only truncate DiskQueues down to TLOG_HARD_LIMIT*2.

DiskQueue shrinking was implemented for spill-by-reference, since a
DiskQueue can now grow "unboundedly" large. Without a minimum file size,
write-burst workloads would cause the DiskQueue to shrink down to 100MB and
then grow back to its usual ~4GB size in a cycle. File growth means
filesystem metadata mutations, which we'd prefer to avoid if possible,
since they're more unpredictable in terms of latency.

In a healthy cluster, the TLog never spills, so the size of a single
DiskQueue file should stay less than 2*TLOG_SPILL_THRESHOLD. In the worst
case of spill-by-value, the DiskQueue could grow to 2*TLOG_HARD_LIMIT.
Therefore, with this limit, DiskQueue shrinking never behaves sub-optimally
for spill-by-value, and DiskQueue files still return to their optimal size
with spill-by-reference.
---
 fdbserver/DiskQueue.actor.cpp | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/fdbserver/DiskQueue.actor.cpp b/fdbserver/DiskQueue.actor.cpp
index 1be449d0a2..7b4e37d3bb 100644
--- a/fdbserver/DiskQueue.actor.cpp
+++ b/fdbserver/DiskQueue.actor.cpp
@@ -304,9 +304,10 @@ public:
                 self->writingPos = 0;
 
                 const int64_t activeDataVolume = pageCeiling(self->files[0].size - self->files[0].popped + self->fileExtensionBytes + self->fileShrinkBytes);
-                if (self->files[1].size > activeDataVolume) {
+                const int64_t desiredMaxFileSize = std::max( activeDataVolume, SERVER_KNOBS->TLOG_HARD_LIMIT_BYTES * 2 );
+                if (self->files[1].size > desiredMaxFileSize) {
                     // Either shrink self->files[1] to the size of self->files[0], or chop off fileShrinkBytes
-                    int64_t maxShrink = std::max( pageFloor(self->files[1].size - activeDataVolume), self->fileShrinkBytes );
+                    int64_t maxShrink = std::max( pageFloor(self->files[1].size - desiredMaxFileSize), self->fileShrinkBytes );
                     self->files[1].size -= maxShrink;
                     waitfor.push_back( self->files[1].f->truncate( self->files[1].size ) );
                 }
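
[Editorial note, not part of the patch series] The effect of the new floor
can be modeled with plain C++; the 4GiB value below is a hypothetical
stand-in for SERVER_KNOBS->TLOG_HARD_LIMIT_BYTES, which is configured
elsewhere and not shown in this series:

    #include <algorithm>
    #include <cassert>
    #include <cstdint>

    // Mirrors the desiredMaxFileSize computation added in this patch.
    int64_t desiredMaxFileSize(int64_t activeDataVolume, int64_t tlogHardLimitBytes) {
        return std::max(activeDataVolume, tlogHardLimitBytes * 2);
    }

    int main() {
        const int64_t hardLimit = int64_t(4) << 30; // hypothetical 4GiB
        // Once a write burst drains, active data may fall back to ~100MB, but the
        // file is not shrunk below 8GiB, so the next burst avoids re-extending it.
        assert(desiredMaxFileSize(100 << 20, hardLimit) == (int64_t(8) << 30));
        return 0;
    }
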
From 0685e6c1c710e5fa8498dd4d67b2c18dc175327f Mon Sep 17 00:00:00 2001
From: Alex Miller
Date: Tue, 7 May 2019 21:01:29 -1000
Subject: [PATCH 3/5] Avoid large truncates in the DiskQueue.

Instead, create a new file while incrementally truncating the old one down.
This avoids queueing up a massive number of filesystem metadata operations
in one call, which would flood the disk with requests and stall out all
other filesystem operations.

This sets the knobs so that a truncate of >10GB causes us to create a new
file rather than trying to truncate the old one.
---
 fdbserver/DiskQueue.actor.cpp | 37 +++++++++++++++++++++++++++++----
 fdbserver/Knobs.cpp           |  1 +
 fdbserver/Knobs.h             |  1 +
 3 files changed, 35 insertions(+), 4 deletions(-)

diff --git a/fdbserver/DiskQueue.actor.cpp b/fdbserver/DiskQueue.actor.cpp
index 7b4e37d3bb..4b6ec6781a 100644
--- a/fdbserver/DiskQueue.actor.cpp
+++ b/fdbserver/DiskQueue.actor.cpp
@@ -270,6 +270,27 @@ public:
     Future<Void> truncateFile(int file, int64_t pos) { return truncateFile(this, file, pos); }
 
+    // FIXME: Merge this function with IAsyncFileSystem::incrementalDeleteFile().
+    ACTOR static void incrementalTruncate(Reference<IAsyncFile> file) {
+        state int64_t remainingFileSize = wait( file->size() );
+
+        for( ; remainingFileSize > 0; remainingFileSize -= FLOW_KNOBS->INCREMENTAL_DELETE_TRUNCATE_AMOUNT ){
+            wait(file->truncate(remainingFileSize));
+            wait(file->sync());
+            wait(delay(FLOW_KNOBS->INCREMENTAL_DELETE_INTERVAL));
+        }
+    }
+
+    ACTOR static Future<Reference<IAsyncFile>> replaceFile(Reference<IAsyncFile> toReplace) {
+        incrementalTruncate( toReplace );
+
+        Reference<IAsyncFile> _replacement = wait( IAsyncFileSystem::filesystem()->open( toReplace->getFilename(), IAsyncFile::OPEN_ATOMIC_WRITE_AND_CREATE | IAsyncFile::OPEN_READWRITE | IAsyncFile::OPEN_UNCACHED | IAsyncFile::OPEN_UNBUFFERED | IAsyncFile::OPEN_LOCK, 0 ) );
+        state Reference<IAsyncFile> replacement = _replacement;
+        wait( replacement->sync() );
+
+        return replacement;
+    }
+
     Future<Void> push(StringRef pageData, vector<Reference<SyncQueue>>* toSync) {
         return push( this, pageData, toSync );
     }
@@ -283,7 +304,7 @@ public:
         ASSERT( self->writingPos % _PAGE_SIZE == 0 );
         ASSERT( self->files[0].size % _PAGE_SIZE == 0 && self->files[1].size % _PAGE_SIZE == 0 );
 
-        vector<Future<Void>> waitfor;
+        state vector<Future<Void>> waitfor;
 
         if (pageData.size() + self->writingPos > self->files[1].size) {
             if ( self->files[0].popped == self->files[0].size ) {
@@ -308,8 +329,16 @@ public:
             if (self->files[1].size > desiredMaxFileSize) {
                 // Either shrink self->files[1] to the size of self->files[0], or chop off fileShrinkBytes
                 int64_t maxShrink = std::max( pageFloor(self->files[1].size - desiredMaxFileSize), self->fileShrinkBytes );
-                self->files[1].size -= maxShrink;
-                waitfor.push_back( self->files[1].f->truncate( self->files[1].size ) );
+                if (maxShrink / SERVER_KNOBS->DISK_QUEUE_FILE_EXTENSION_BYTES >
+                    SERVER_KNOBS->DISK_QUEUE_MAX_TRUNCATE_EXTENTS) {
+                    TEST(true); // Replacing DiskQueue file
+                    Reference<IAsyncFile> newFile = wait( replaceFile(self->files[1].f) );
+                    self->files[1].setFile(newFile);
+                    self->files[1].size = 0;
+                } else {
+                    self->files[1].size -= maxShrink;
+                    waitfor.push_back( self->files[1].f->truncate( self->files[1].size ) );
+                }
             }
         } else {
             // Extend self->files[1] to accomodate the new write and about 10MB or 2x current size for future writes.
@@ -366,9 +395,9 @@ public:
 
         Future<Void> pushed = self->push( pageData, &syncFiles );
         pushing.send(Void());
+        wait( pushed );
         ASSERT( syncFiles.size() >= 1 && syncFiles.size() <= 2 );
         TEST(2==syncFiles.size()); // push spans both files
-        wait( pushed );
 
         delete pageMem;
         pageMem = 0;
diff --git a/fdbserver/Knobs.cpp b/fdbserver/Knobs.cpp
index 611062b4d8..4c551d4791 100644
--- a/fdbserver/Knobs.cpp
+++ b/fdbserver/Knobs.cpp
@@ -75,6 +75,7 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
     init( TLOG_SPILL_REFERENCE_MAX_BYTES_PER_BATCH, 16<<10 ); if ( randomize && BUGGIFY ) TLOG_SPILL_REFERENCE_MAX_BYTES_PER_BATCH = 500;
     init( DISK_QUEUE_FILE_EXTENSION_BYTES, 10<<20 ); // BUGGIFYd per file within the DiskQueue
     init( DISK_QUEUE_FILE_SHRINK_BYTES, 100<<20 ); // BUGGIFYd per file within the DiskQueue
+    init( DISK_QUEUE_MAX_TRUNCATE_EXTENTS, 1<<10 ); if ( randomize && BUGGIFY ) DISK_QUEUE_MAX_TRUNCATE_EXTENTS = 0;
     init( TLOG_DEGRADED_DELAY_COUNT, 5 );
     init( TLOG_DEGRADED_DURATION, 5.0 );
diff --git a/fdbserver/Knobs.h b/fdbserver/Knobs.h
index fd45b1a110..ee91df4ad8 100644
--- a/fdbserver/Knobs.h
+++ b/fdbserver/Knobs.h
@@ -79,6 +79,7 @@ public:
     int64_t TLOG_SPILL_REFERENCE_MAX_BYTES_PER_BATCH;
     int64_t DISK_QUEUE_FILE_EXTENSION_BYTES; // When we grow the disk queue, by how many bytes should it grow?
     int64_t DISK_QUEUE_FILE_SHRINK_BYTES;    // When we shrink the disk queue, by how many bytes should it shrink?
+    int DISK_QUEUE_MAX_TRUNCATE_EXTENTS;
     int TLOG_DEGRADED_DELAY_COUNT;
     double TLOG_DEGRADED_DURATION;
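
[Editorial note, not part of the patch series] The ">10GB" figure in the
commit message follows directly from the knob defaults added above: a
truncate is elided and the file replaced once the shrink amount spans more
than DISK_QUEUE_MAX_TRUNCATE_EXTENTS extents of
DISK_QUEUE_FILE_EXTENSION_BYTES each.

    #include <cstdint>

    // Defaults from this patch (either may be overridden or BUGGIFYd at runtime).
    constexpr int64_t DISK_QUEUE_FILE_EXTENSION_BYTES = 10 << 20; // 10 MiB
    constexpr int64_t DISK_QUEUE_MAX_TRUNCATE_EXTENTS = 1 << 10;  // 1024

    // push() replaces the file instead of truncating when
    //   maxShrink / DISK_QUEUE_FILE_EXTENSION_BYTES > DISK_QUEUE_MAX_TRUNCATE_EXTENTS,
    // i.e. when more than this many bytes would need to be truncated in one call:
    constexpr int64_t replaceThreshold =
        DISK_QUEUE_MAX_TRUNCATE_EXTENTS * DISK_QUEUE_FILE_EXTENSION_BYTES;

    static_assert(replaceThreshold == (int64_t(10) << 30), "1024 * 10 MiB = 10 GiB");
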
From c093017c2fd380441d62ed90a0a5a50afe700b60 Mon Sep 17 00:00:00 2001
From: Alex Miller
Date: Wed, 8 May 2019 12:10:56 -1000
Subject: [PATCH 4/5] Add a TraceEvent and release note.

---
 documentation/sphinx/source/release-notes.rst | 1 +
 fdbserver/DiskQueue.actor.cpp                 | 1 +
 2 files changed, 2 insertions(+)

diff --git a/documentation/sphinx/source/release-notes.rst b/documentation/sphinx/source/release-notes.rst
index abc4db96ef..d04214a2b7 100644
--- a/documentation/sphinx/source/release-notes.rst
+++ b/documentation/sphinx/source/release-notes.rst
@@ -125,6 +125,7 @@ Fixes only impacting 6.1.0+
 * The background actor which removes redundant teams could leave data unbalanced. [6.1.3] `(PR #1479) <https://github.com/apple/foundationdb/pull/1479>`_
 * The transaction log spill-by-reference policy could read too much data from disk. [6.1.5] `(PR #1527) <https://github.com/apple/foundationdb/pull/1527>`_
 * Memory tracking trace events could cause the program to crash when called from inside a trace event. [6.1.5] `(PR #1541) <https://github.com/apple/foundationdb/pull/1541>`_
+* TLogs will replace a large file with an empty file rather than doing a large truncate operation. [6.1.5] `(PR #1545) <https://github.com/apple/foundationdb/pull/1545>`_
 
 Earlier release notes
 ---------------------
diff --git a/fdbserver/DiskQueue.actor.cpp b/fdbserver/DiskQueue.actor.cpp
index 4b6ec6781a..02e4ae5927 100644
--- a/fdbserver/DiskQueue.actor.cpp
+++ b/fdbserver/DiskQueue.actor.cpp
@@ -332,6 +332,7 @@ public:
                 if (maxShrink / SERVER_KNOBS->DISK_QUEUE_FILE_EXTENSION_BYTES >
                     SERVER_KNOBS->DISK_QUEUE_MAX_TRUNCATE_EXTENTS) {
                     TEST(true); // Replacing DiskQueue file
+                    TraceEvent("DiskQueueReplaceFile", self->dbgid).detail("Filename", self->files[1].f->getFilename()).detail("OldFileSize", self->files[1].size).detail("ElidedTruncateSize", maxShrink);
                     Reference<IAsyncFile> newFile = wait( replaceFile(self->files[1].f) );
                     self->files[1].setFile(newFile);
                     self->files[1].size = 0;

From e4ba2f57887d3582e36dd1e7cc9654728ca138fc Mon Sep 17 00:00:00 2001
From: Alex Miller
Date: Wed, 8 May 2019 12:29:18 -1000
Subject: [PATCH 5/5] Add an ending TraceEvent.

---
 fdbserver/DiskQueue.actor.cpp | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/fdbserver/DiskQueue.actor.cpp b/fdbserver/DiskQueue.actor.cpp
index 02e4ae5927..7097698710 100644
--- a/fdbserver/DiskQueue.actor.cpp
+++ b/fdbserver/DiskQueue.actor.cpp
@@ -279,6 +279,8 @@ public:
             wait(file->sync());
             wait(delay(FLOW_KNOBS->INCREMENTAL_DELETE_INTERVAL));
         }
+
+        TraceEvent("DiskQueueReplaceTruncateEnded").detail("Filename", file->getFilename());
     }
 
     ACTOR static Future<Reference<IAsyncFile>> replaceFile(Reference<IAsyncFile> toReplace) {
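
[Editorial note, not part of the patch series] Taken together, patches 3-5
replace one very large truncate with an atomic file swap plus a paced,
fire-and-forget truncate of the old file. Below is a rough, dependency-free
model of the pacing loop in incrementalTruncate(); chunkBytes stands in for
FLOW_KNOBS->INCREMENTAL_DELETE_TRUNCATE_AMOUNT, and each step corresponds to
one truncate() + sync() + delay(FLOW_KNOBS->INCREMENTAL_DELETE_INTERVAL) in
the actor, so the filesystem only ever sees a bounded amount of metadata work
per interval:

    #include <cstdint>
    #include <vector>

    // Returns the sequence of sizes the old file is truncated to, largest first.
    // The first entry equals the current size (a no-op truncate), matching the actor.
    std::vector<int64_t> truncateSchedule(int64_t fileSize, int64_t chunkBytes) {
        std::vector<int64_t> sizes;
        for (int64_t remaining = fileSize; remaining > 0; remaining -= chunkBytes)
            sizes.push_back(remaining);
        return sizes;
    }
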