Allow the disk queue to shrink if it has unneeded slack space.
This commit is contained in:
parent
a462359af1
commit
94bf75cb00
|
@ -144,11 +144,14 @@ public:
|
||||||
RawDiskQueue_TwoFiles( std::string basename, std::string fileExtension, UID dbgid, int64_t fileSizeWarningLimit )
|
RawDiskQueue_TwoFiles( std::string basename, std::string fileExtension, UID dbgid, int64_t fileSizeWarningLimit )
|
||||||
: basename(basename), fileExtension(fileExtension), onError(delayed(error.getFuture())), onStopped(stopped.getFuture()),
|
: basename(basename), fileExtension(fileExtension), onError(delayed(error.getFuture())), onStopped(stopped.getFuture()),
|
||||||
readingFile(-1), readingPage(-1), writingPos(-1), dbgid(dbgid),
|
readingFile(-1), readingPage(-1), writingPos(-1), dbgid(dbgid),
|
||||||
dbg_file0BeginSeq(0), fileExtensionBytes(10<<20), readingBuffer( dbgid ),
|
dbg_file0BeginSeq(0), fileExtensionBytes(SERVER_KNOBS->DISK_QUEUE_FILE_EXTENSION_BYTES),
|
||||||
|
fileShrinkBytes(SERVER_KNOBS->DISK_QUEUE_FILE_SHRINK_BYTES), readingBuffer( dbgid ),
|
||||||
readyToPush(Void()), fileSizeWarningLimit(fileSizeWarningLimit), lastCommit(Void()), isFirstCommit(true)
|
readyToPush(Void()), fileSizeWarningLimit(fileSizeWarningLimit), lastCommit(Void()), isFirstCommit(true)
|
||||||
{
|
{
|
||||||
if(BUGGIFY)
|
if (BUGGIFY)
|
||||||
fileExtensionBytes = 8<<10;
|
fileExtensionBytes = 1<<10 * g_random->randomSkewedUInt32( 1, 40<<10 );
|
||||||
|
if (BUGGIFY)
|
||||||
|
fileShrinkBytes = _PAGE_SIZE * g_random->randomSkewedUInt32( 1, 10<<10 );
|
||||||
files[0].dbgFilename = filename(0);
|
files[0].dbgFilename = filename(0);
|
||||||
files[1].dbgFilename = filename(1);
|
files[1].dbgFilename = filename(1);
|
||||||
// We issue reads into firstPages, so it needs to be 4k aligned.
|
// We issue reads into firstPages, so it needs to be 4k aligned.
|
||||||
|
@ -241,6 +244,7 @@ public:
|
||||||
int64_t writingPos; // Position within files[1] that will be next written
|
int64_t writingPos; // Position within files[1] that will be next written
|
||||||
|
|
||||||
int64_t fileExtensionBytes;
|
int64_t fileExtensionBytes;
|
||||||
|
int64_t fileShrinkBytes;
|
||||||
|
|
||||||
Int64MetricHandle stallCount;
|
Int64MetricHandle stallCount;
|
||||||
|
|
||||||
|
@ -274,6 +278,13 @@ public:
|
||||||
std::swap(firstPages[0], firstPages[1]);
|
std::swap(firstPages[0], firstPages[1]);
|
||||||
files[1].popped = 0;
|
files[1].popped = 0;
|
||||||
writingPos = 0;
|
writingPos = 0;
|
||||||
|
|
||||||
|
if (files[1].size > pageData.size() + fileExtensionBytes + fileShrinkBytes) {
|
||||||
|
// Either shrink files[1] to the size of files[0], or chop off fileShrinkBytes
|
||||||
|
int64_t maxShrink = std::max( (files[1].size - files[0].size+_PAGE_SIZE-1)/_PAGE_SIZE*_PAGE_SIZE, fileShrinkBytes );
|
||||||
|
files[1].size -= maxShrink;
|
||||||
|
waitfor.push_back( files[1].f->truncate( files[1].size ) );
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
// Extend files[1] to accomodate the new write and about 10MB or 2x current size for future writes.
|
// Extend files[1] to accomodate the new write and about 10MB or 2x current size for future writes.
|
||||||
/*TraceEvent("RDQExtend", this->dbgid).detail("File1name", files[1].dbgFilename).detail("File1size", files[1].size)
|
/*TraceEvent("RDQExtend", this->dbgid).detail("File1name", files[1].dbgFilename).detail("File1size", files[1].size)
|
||||||
|
|
|
@ -72,6 +72,8 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
|
||||||
init( DISK_QUEUE_ADAPTER_MIN_SWITCH_TIME, 1.0 );
|
init( DISK_QUEUE_ADAPTER_MIN_SWITCH_TIME, 1.0 );
|
||||||
init( DISK_QUEUE_ADAPTER_MAX_SWITCH_TIME, 5.0 );
|
init( DISK_QUEUE_ADAPTER_MAX_SWITCH_TIME, 5.0 );
|
||||||
init( TLOG_SPILL_REFERENCE_MAX_PEEK_MEMORY_BYTES, 2e9 ); if ( randomize && BUGGIFY ) TLOG_SPILL_REFERENCE_MAX_PEEK_MEMORY_BYTES = 2e6;
|
init( TLOG_SPILL_REFERENCE_MAX_PEEK_MEMORY_BYTES, 2e9 ); if ( randomize && BUGGIFY ) TLOG_SPILL_REFERENCE_MAX_PEEK_MEMORY_BYTES = 2e6;
|
||||||
|
init( DISK_QUEUE_FILE_EXTENSION_BYTES, 10<<20 ); // BUGGIFYd per file within the DiskQueue
|
||||||
|
init( DISK_QUEUE_FILE_SHRINK_BYTES, 100<<20 ); // BUGGIFYd per file within the DiskQueue
|
||||||
|
|
||||||
// Data distribution queue
|
// Data distribution queue
|
||||||
init( HEALTH_POLL_TIME, 1.0 );
|
init( HEALTH_POLL_TIME, 1.0 );
|
||||||
|
|
|
@ -75,6 +75,8 @@ public:
|
||||||
double DISK_QUEUE_ADAPTER_MIN_SWITCH_TIME;
|
double DISK_QUEUE_ADAPTER_MIN_SWITCH_TIME;
|
||||||
double DISK_QUEUE_ADAPTER_MAX_SWITCH_TIME;
|
double DISK_QUEUE_ADAPTER_MAX_SWITCH_TIME;
|
||||||
int64_t TLOG_SPILL_REFERENCE_MAX_PEEK_MEMORY_BYTES;
|
int64_t TLOG_SPILL_REFERENCE_MAX_PEEK_MEMORY_BYTES;
|
||||||
|
int64_t DISK_QUEUE_FILE_EXTENSION_BYTES; // When we grow the disk queue, by how many bytes should it grow?
|
||||||
|
int64_t DISK_QUEUE_FILE_SHRINK_BYTES; // When we shrink the disk queue, by how many bytes should it shrink?
|
||||||
|
|
||||||
// Data distribution queue
|
// Data distribution queue
|
||||||
double HEALTH_POLL_TIME;
|
double HEALTH_POLL_TIME;
|
||||||
|
|
Loading…
Reference in New Issue