diff --git a/fdbserver/DiskQueue.actor.cpp b/fdbserver/DiskQueue.actor.cpp index 0a15efac81..a17e55b4f4 100644 --- a/fdbserver/DiskQueue.actor.cpp +++ b/fdbserver/DiskQueue.actor.cpp @@ -28,6 +28,20 @@ typedef bool(*compare_pages)(void*,void*); typedef int64_t loc_t; +// 0 -> 0 +// 1 -> 4k +// 4k -> 4k +int64_t pageCeiling( int64_t loc ) { + return (loc+_PAGE_SIZE-1)/_PAGE_SIZE*_PAGE_SIZE; +} + +// 0 -> 0 +// 1 -> 0 +// 4k -> 4k +int64_t pageFloor( int64_t loc ) { + return loc / _PAGE_SIZE * _PAGE_SIZE; +} + struct StringBuffer { Standalone str; int reserved; @@ -144,11 +158,14 @@ public: RawDiskQueue_TwoFiles( std::string basename, std::string fileExtension, UID dbgid, int64_t fileSizeWarningLimit ) : basename(basename), fileExtension(fileExtension), onError(delayed(error.getFuture())), onStopped(stopped.getFuture()), readingFile(-1), readingPage(-1), writingPos(-1), dbgid(dbgid), - dbg_file0BeginSeq(0), fileExtensionBytes(10<<20), readingBuffer( dbgid ), + dbg_file0BeginSeq(0), fileExtensionBytes(SERVER_KNOBS->DISK_QUEUE_FILE_EXTENSION_BYTES), + fileShrinkBytes(SERVER_KNOBS->DISK_QUEUE_FILE_SHRINK_BYTES), readingBuffer( dbgid ), readyToPush(Void()), fileSizeWarningLimit(fileSizeWarningLimit), lastCommit(Void()), isFirstCommit(true) { - if(BUGGIFY) - fileExtensionBytes = 8<<10; + if (BUGGIFY) + fileExtensionBytes = 1<<10 * g_random->randomSkewedUInt32( 1, 40<<10 ); + if (BUGGIFY) + fileShrinkBytes = _PAGE_SIZE * g_random->randomSkewedUInt32( 1, 10<<10 ); files[0].dbgFilename = filename(0); files[1].dbgFilename = filename(1); // We issue reads into firstPages, so it needs to be 4k aligned. @@ -241,6 +258,7 @@ public: int64_t writingPos; // Position within files[1] that will be next written int64_t fileExtensionBytes; + int64_t fileShrinkBytes; Int64MetricHandle stallCount; @@ -274,6 +292,14 @@ public: std::swap(firstPages[0], firstPages[1]); files[1].popped = 0; writingPos = 0; + + const int64_t activeDataVolume = pageCeiling(files[0].size - files[0].popped + fileExtensionBytes + fileShrinkBytes); + if (files[1].size > activeDataVolume) { + // Either shrink files[1] to the size of files[0], or chop off fileShrinkBytes + int64_t maxShrink = std::max( pageFloor(files[1].size - activeDataVolume), fileShrinkBytes ); + files[1].size -= maxShrink; + waitfor.push_back( files[1].f->truncate( files[1].size ) ); + } } else { // Extend files[1] to accomodate the new write and about 10MB or 2x current size for future writes. /*TraceEvent("RDQExtend", this->dbgid).detail("File1name", files[1].dbgFilename).detail("File1size", files[1].size) @@ -942,10 +968,10 @@ private: // might be a bit overly aggressive here, but it's behavior we need to tolerate. throw io_error(); } - ASSERT( ((Page*)pagedData.begin())->seq == start.lo / _PAGE_SIZE * _PAGE_SIZE ); + ASSERT( ((Page*)pagedData.begin())->seq == pageFloor(start.lo) ); ASSERT(pagedData.size() == (toPage - fromPage + 1) * _PAGE_SIZE ); - ASSERT( ((Page*)pagedData.end() - 1)->seq == (end.lo - 1) / _PAGE_SIZE * _PAGE_SIZE ); + ASSERT( ((Page*)pagedData.end() - 1)->seq == pageFloor(end.lo - 1) ); return pagedData; } else { ASSERT(fromFile == 0); @@ -958,9 +984,9 @@ private: throw io_error(); } ASSERT(firstChunk.size() == ( ( file0size / sizeof(Page) ) - fromPage ) * _PAGE_SIZE ); - ASSERT( ((Page*)firstChunk.begin())->seq == start.lo / _PAGE_SIZE * _PAGE_SIZE ); + ASSERT( ((Page*)firstChunk.begin())->seq == pageFloor(start.lo) ); ASSERT(secondChunk.size() == (toPage + 1) * _PAGE_SIZE); - ASSERT( ((Page*)secondChunk.end() - 1)->seq == (end.lo - 1) / _PAGE_SIZE * _PAGE_SIZE ); + ASSERT( ((Page*)secondChunk.end() - 1)->seq == pageFloor(end.lo - 1) ); return firstChunk.withSuffix(secondChunk); } } @@ -979,42 +1005,42 @@ private: if (endingOffset == 0) endingOffset = sizeof(Page); if (endingOffset > 0) endingOffset -= sizeof(PageHeader); - if ((end.lo-1)/sizeof(Page)*sizeof(Page) == start.lo/sizeof(Page)*sizeof(Page)) { + if (pageFloor(end.lo-1) == pageFloor(start.lo)) { // start and end are on the same page ASSERT(pagedData.size() == sizeof(Page)); pagedData.contents() = pagedData.substr(sizeof(PageHeader) + startingOffset, endingOffset - startingOffset); return pagedData; } else { - // FIXME: This allocation is excessive and unnecessary. We know the overhead per page that - // we'll be stripping out (sizeof(PageHeader)), so we should be able to do a smaller - // allocation. But we should be able to re-use the space allocated for pagedData, which - // would mean not having to allocate 2x the space for a read. - Standalone unpagedData = makeString(pagedData.size()); - uint8_t *buf = mutateString(unpagedData); - memset(buf, 0, unpagedData.size()); + // Reusing pagedData wastes # of pages * sizeof(PageHeader) bytes, but means + // we don't have to double allocate in a hot, memory hungry call. + uint8_t *buf = mutateString(pagedData); const Page *data = reinterpret_cast(pagedData.begin()); // Only start copying from `start` in the first page. if( data->payloadSize > startingOffset ) { - memcpy(buf, data->payload+startingOffset, data->payloadSize-startingOffset); - buf += data->payloadSize-startingOffset; + const int length = data->payloadSize-startingOffset; + memmove(buf, data->payload+startingOffset, length); + buf += length; } data++; // Copy all the middle pages - while (data->seq != ((end.lo-1)/sizeof(Page)*sizeof(Page))) { + while (data->seq != pageFloor(end.lo-1)) { // These pages can have varying amounts of data, as pages with partial // data will be zero-filled when commit is called. - memcpy(buf, data->payload, data->payloadSize); - buf += data->payloadSize; + const int length = data->payloadSize; + memmove(buf, data->payload, length); + buf += length; data++; } // Copy only until `end` in the last page. - memcpy(buf, data->payload, std::min(endingOffset, data->payloadSize)); - buf += std::min(endingOffset, data->payloadSize); + const int length = data->payloadSize; + memmove(buf, data->payload, std::min(endingOffset, length)); + buf += std::min(endingOffset, length); - unpagedData.contents() = unpagedData.substr(0, buf - unpagedData.begin()); + memset(buf, 0, pagedData.size() - (buf - pagedData.begin())); + Standalone unpagedData = pagedData.substr(0, buf - pagedData.begin()); return unpagedData; } } @@ -1068,14 +1094,14 @@ private: self->readBufArena = page.arena(); self->readBufPage = (Page*)page.begin(); - if (!self->readBufPage->checkHash() || self->readBufPage->seq < self->nextReadLocation/sizeof(Page)*sizeof(Page)) { + if (!self->readBufPage->checkHash() || self->readBufPage->seq < pageFloor(self->nextReadLocation)) { TraceEvent("DQRecInvalidPage", self->dbgid).detail("NextReadLocation", self->nextReadLocation).detail("HashCheck", self->readBufPage->checkHash()) - .detail("Seq", self->readBufPage->seq).detail("Expect", self->nextReadLocation/sizeof(Page)*sizeof(Page)).detail("File0Name", self->rawQueue->files[0].dbgFilename); + .detail("Seq", self->readBufPage->seq).detail("Expect", pageFloor(self->nextReadLocation)).detail("File0Name", self->rawQueue->files[0].dbgFilename); wait( self->rawQueue->truncateBeforeLastReadPage() ); break; } //TraceEvent("DQRecPage", self->dbgid).detail("NextReadLoc", self->nextReadLocation).detail("Seq", self->readBufPage->seq).detail("Pop", self->readBufPage->popped).detail("Payload", self->readBufPage->payloadSize).detail("File0Name", self->rawQueue->files[0].dbgFilename); - ASSERT( self->readBufPage->seq == self->nextReadLocation/sizeof(Page)*sizeof(Page) ); + ASSERT( self->readBufPage->seq == pageFloor(self->nextReadLocation) ); self->lastPoppedSeq = self->readBufPage->popped; } @@ -1084,10 +1110,10 @@ private: int f; int64_t p; TEST( self->lastPoppedSeq/sizeof(Page) != self->poppedSeq/sizeof(Page) ); // DiskQueue: Recovery popped position not fully durable self->findPhysicalLocation( self->lastPoppedSeq, &f, &p, "lastPoppedSeq" ); - wait(self->rawQueue->setPoppedPage( f, p, self->lastPoppedSeq/sizeof(Page)*sizeof(Page) )); + wait(self->rawQueue->setPoppedPage( f, p, pageFloor(self->lastPoppedSeq) )); // Writes go at the end of our reads (but on the next page) - self->nextPageSeq = self->nextReadLocation/sizeof(Page)*sizeof(Page); + self->nextPageSeq = pageFloor(self->nextReadLocation); if (self->nextReadLocation % sizeof(Page) > sizeof(PageHeader)) self->nextPageSeq += sizeof(Page); TraceEvent("DQRecovered", self->dbgid).detail("LastPoppedSeq", self->lastPoppedSeq).detail("PoppedSeq", self->poppedSeq).detail("NextPageSeq", self->nextPageSeq).detail("File0Name", self->rawQueue->files[0].dbgFilename); diff --git a/fdbserver/IDiskQueue.h b/fdbserver/IDiskQueue.h index 69991d0b92..84faf3473b 100644 --- a/fdbserver/IDiskQueue.h +++ b/fdbserver/IDiskQueue.h @@ -44,6 +44,10 @@ public: if (hi>r.hi) return false; return lo < r.lo; } + + bool operator == (const location& r) const { + return hi == r.hi && lo == r.lo; + } }; //! Find the first and last pages in the disk queue, and initialize invariants. diff --git a/fdbserver/Knobs.cpp b/fdbserver/Knobs.cpp index 00cd70fcdf..8b37bb55fb 100644 --- a/fdbserver/Knobs.cpp +++ b/fdbserver/Knobs.cpp @@ -54,6 +54,7 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) { init( BUGGIFY_RECOVER_MEMORY_LIMIT, 1e6 ); init( BUGGIFY_WORKER_REMOVED_MAX_LAG, 30 ); init( UPDATE_STORAGE_BYTE_LIMIT, 1e6 ); + init( REFERENCE_SPILL_UPDATE_STORAGE_BYTE_LIMIT, 20e6 ); init( TLOG_PEEK_DELAY, 0.00005 ); init( LEGACY_TLOG_UPGRADE_ENTRIES_PER_VERSION, 100 ); init( VERSION_MESSAGES_OVERHEAD_FACTOR_1024THS, 1072 ); // Based on a naive interpretation of the gcc version of std::deque, we would expect this to be 16 bytes overhead per 512 bytes data. In practice, it seems to be 24 bytes overhead per 512. @@ -70,6 +71,9 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) { init( CONCURRENT_LOG_ROUTER_READS, 1 ); init( DISK_QUEUE_ADAPTER_MIN_SWITCH_TIME, 1.0 ); init( DISK_QUEUE_ADAPTER_MAX_SWITCH_TIME, 5.0 ); + init( TLOG_SPILL_REFERENCE_MAX_PEEK_MEMORY_BYTES, 2e9 ); if ( randomize && BUGGIFY ) TLOG_SPILL_REFERENCE_MAX_PEEK_MEMORY_BYTES = 2e6; + init( DISK_QUEUE_FILE_EXTENSION_BYTES, 10<<20 ); // BUGGIFYd per file within the DiskQueue + init( DISK_QUEUE_FILE_SHRINK_BYTES, 100<<20 ); // BUGGIFYd per file within the DiskQueue // Data distribution queue init( HEALTH_POLL_TIME, 1.0 ); diff --git a/fdbserver/Knobs.h b/fdbserver/Knobs.h index c65798e34d..f3698b3561 100644 --- a/fdbserver/Knobs.h +++ b/fdbserver/Knobs.h @@ -57,6 +57,7 @@ public: double BUGGIFY_RECOVER_MEMORY_LIMIT; double BUGGIFY_WORKER_REMOVED_MAX_LAG; int64_t UPDATE_STORAGE_BYTE_LIMIT; + int64_t REFERENCE_SPILL_UPDATE_STORAGE_BYTE_LIMIT; double TLOG_PEEK_DELAY; int LEGACY_TLOG_UPGRADE_ENTRIES_PER_VERSION; int VERSION_MESSAGES_OVERHEAD_FACTOR_1024THS; // Multiplicative factor to bound total space used to store a version message (measured in 1/1024ths, e.g. a value of 2048 yields a factor of 2). @@ -73,6 +74,9 @@ public: int CONCURRENT_LOG_ROUTER_READS; double DISK_QUEUE_ADAPTER_MIN_SWITCH_TIME; double DISK_QUEUE_ADAPTER_MAX_SWITCH_TIME; + int64_t TLOG_SPILL_REFERENCE_MAX_PEEK_MEMORY_BYTES; + int64_t DISK_QUEUE_FILE_EXTENSION_BYTES; // When we grow the disk queue, by how many bytes should it grow? + int64_t DISK_QUEUE_FILE_SHRINK_BYTES; // When we shrink the disk queue, by how many bytes should it shrink? // Data distribution queue double HEALTH_POLL_TIME; diff --git a/fdbserver/OldTLogServer_6_0.actor.cpp b/fdbserver/OldTLogServer_6_0.actor.cpp index aac98b34d4..0d4469fe7e 100644 --- a/fdbserver/OldTLogServer_6_0.actor.cpp +++ b/fdbserver/OldTLogServer_6_0.actor.cpp @@ -1523,6 +1523,7 @@ ACTOR Future pullAsyncData( TLogData* self, Reference logData, st if(poppedIsKnownCommitted) { logData->knownCommittedVersion = std::max(logData->knownCommittedVersion, r->popped()); + logData->minKnownCommittedVersion = std::max(logData->minKnownCommittedVersion, r->getMinKnownCommittedVersion()); } commitMessages(self, logData, ver, messages); @@ -1561,6 +1562,7 @@ ACTOR Future pullAsyncData( TLogData* self, Reference logData, st if(poppedIsKnownCommitted) { logData->knownCommittedVersion = std::max(logData->knownCommittedVersion, r->popped()); + logData->minKnownCommittedVersion = std::max(logData->minKnownCommittedVersion, r->getMinKnownCommittedVersion()); } if(self->terminated.isSet()) { @@ -1911,7 +1913,6 @@ ACTOR Future updateLogSystem(TLogData* self, Reference logData, L ACTOR Future tLogStart( TLogData* self, InitializeTLogRequest req, LocalityData locality ) { state TLogInterface recruited(self->dbgid, locality); - recruited.locality = locality; recruited.initEndpoints(); DUMPTOKEN( recruited.peekMessages ); diff --git a/fdbserver/SimulatedCluster.actor.cpp b/fdbserver/SimulatedCluster.actor.cpp index f5d1bb7402..0a36eb5d8a 100644 --- a/fdbserver/SimulatedCluster.actor.cpp +++ b/fdbserver/SimulatedCluster.actor.cpp @@ -842,9 +842,8 @@ void SimulationConfig::generateNormalConfig(int minimumReplication, int minimumR break; } } else { - // FIXME: Temporarily disable spill-by-reference. - //set_config("log_version:=3"); // 6.1 - //set_config("log_spill:=2"); // REFERENCE + set_config("log_version:=3"); // 6.1 + set_config("log_spill:=2"); // REFERENCE } if(generateFearless || (datacenters == 2 && g_random->random01() < 0.5)) { diff --git a/fdbserver/TLogServer.actor.cpp b/fdbserver/TLogServer.actor.cpp index ed95f34322..064f9626a0 100644 --- a/fdbserver/TLogServer.actor.cpp +++ b/fdbserver/TLogServer.actor.cpp @@ -199,9 +199,19 @@ static const KeyRangeRef persistCurrentVersionKeys = KeyRangeRef( LiteralStringR static const KeyRangeRef persistKnownCommittedVersionKeys = KeyRangeRef( LiteralStringRef( "knownCommitted/" ), LiteralStringRef( "knownCommitted0" ) ); static const KeyRangeRef persistLocalityKeys = KeyRangeRef( LiteralStringRef( "Locality/" ), LiteralStringRef( "Locality0" ) ); static const KeyRangeRef persistLogRouterTagsKeys = KeyRangeRef( LiteralStringRef( "LogRouterTags/" ), LiteralStringRef( "LogRouterTags0" ) ); +static const KeyRange persistTagMessagesKeys = prefixRange(LiteralStringRef("TagMsg/")); static const KeyRange persistTagMessageRefsKeys = prefixRange(LiteralStringRef("TagMsgRef/")); static const KeyRange persistTagPoppedKeys = prefixRange(LiteralStringRef("TagPop/")); +static Key persistTagMessagesKey( UID id, Tag tag, Version version ) { + BinaryWriter wr( Unversioned() ); + wr.serializeBytes(persistTagMessagesKeys.begin); + wr << id; + wr << tag; + wr << bigEndian64( version ); + return wr.toStringRef(); +} + static Key persistTagMessageRefsKey( UID id, Tag tag, Version version ) { BinaryWriter wr( Unversioned() ); wr.serializeBytes(persistTagMessageRefsKeys.begin); @@ -234,10 +244,18 @@ static Version decodeTagPoppedValue( ValueRef value ) { return BinaryReader::fromStringRef( value, Unversioned() ); } +static StringRef stripTagMessagesKey( StringRef key ) { + return key.substr( sizeof(UID) + sizeof(Tag) + persistTagMessagesKeys.begin.size() ); +} + static StringRef stripTagMessageRefsKey( StringRef key ) { return key.substr( sizeof(UID) + sizeof(Tag) + persistTagMessageRefsKeys.begin.size() ); } +static Version decodeTagMessagesKey( StringRef key ) { + return bigEndian64( BinaryReader::fromStringRef( stripTagMessagesKey(key), Unversioned() ) ); +} + static Version decodeTagMessageRefsKey( StringRef key ) { return bigEndian64( BinaryReader::fromStringRef( stripTagMessageRefsKey(key), Unversioned() ) ); } @@ -274,6 +292,7 @@ struct TLogData : NonCopyable { std::map peekTracker; WorkerCache tlogCache; + FlowLock peekMemoryLimiter; PromiseStream> sharedActors; Promise terminated; @@ -285,6 +304,7 @@ struct TLogData : NonCopyable { persistentData(persistentData), rawPersistentQueue(persistentQueue), persistentQueue(new TLogQueue(persistentQueue, dbgid)), dbInfo(dbInfo), queueCommitBegin(0), queueCommitEnd(0), diskQueueCommitBytes(0), largeDiskQueueCommitBytes(false), bytesInput(0), bytesDurable(0), overheadBytesInput(0), overheadBytesDurable(0), + peekMemoryLimiter(SERVER_KNOBS->TLOG_SPILL_REFERENCE_MAX_PEEK_MEMORY_BYTES), concurrentLogRouterReads(SERVER_KNOBS->CONCURRENT_LOG_ROUTER_READS) { } @@ -449,6 +469,8 @@ struct LogData : NonCopyable, public ReferenceCounted { specialCounter(cc, "QueueDiskBytesFree", [tLogData](){ return tLogData->rawPersistentQueue->getStorageBytes().free; }); specialCounter(cc, "QueueDiskBytesAvailable", [tLogData](){ return tLogData->rawPersistentQueue->getStorageBytes().available; }); specialCounter(cc, "QueueDiskBytesTotal", [tLogData](){ return tLogData->rawPersistentQueue->getStorageBytes().total; }); + specialCounter(cc, "PeekMemoryReserved", [tLogData]() { return tLogData->peekMemoryLimiter.activePermits(); }); + specialCounter(cc, "PeekMemoryRequestsStalled", [tLogData]() { return tLogData->peekMemoryLimiter.waiters(); }); } ~LogData() { @@ -466,8 +488,10 @@ struct LogData : NonCopyable, public ReferenceCounted { tLogData->persistentData->clear( singleKeyRange(logIdKey.withPrefix(persistLocalityKeys.begin)) ); tLogData->persistentData->clear( singleKeyRange(logIdKey.withPrefix(persistLogRouterTagsKeys.begin)) ); tLogData->persistentData->clear( singleKeyRange(logIdKey.withPrefix(persistRecoveryCountKeys.begin)) ); - Key msgKey = logIdKey.withPrefix(persistTagMessageRefsKeys.begin); + Key msgKey = logIdKey.withPrefix(persistTagMessagesKeys.begin); tLogData->persistentData->clear( KeyRangeRef( msgKey, strinc(msgKey) ) ); + Key msgRefKey = logIdKey.withPrefix(persistTagMessageRefsKeys.begin); + tLogData->persistentData->clear( KeyRangeRef( msgRefKey, strinc(msgRefKey) ) ); Key poppedKey = logIdKey.withPrefix(persistTagPoppedKeys.begin); tLogData->persistentData->clear( KeyRangeRef( poppedKey, strinc(poppedKey) ) ); } @@ -553,22 +577,48 @@ void updatePersistentPopped( TLogData* self, Reference logData, Referen if (data->nothingPersistent) return; - self->persistentData->clear( KeyRangeRef( - persistTagMessageRefsKey( logData->logId, data->tag, Version(0) ), - persistTagMessageRefsKey( logData->logId, data->tag, data->popped ) ) ); + if (data->tag == txsTag) { + self->persistentData->clear( KeyRangeRef( + persistTagMessagesKey( logData->logId, data->tag, Version(0) ), + persistTagMessagesKey( logData->logId, data->tag, data->popped ) ) ); + } else { + self->persistentData->clear( KeyRangeRef( + persistTagMessageRefsKey( logData->logId, data->tag, Version(0) ), + persistTagMessageRefsKey( logData->logId, data->tag, data->popped ) ) ); + } if (data->popped > logData->persistentDataVersion) data->nothingPersistent = true; } +struct SpilledData { + SpilledData() = default; + SpilledData(Version version, IDiskQueue::location start, uint32_t length, uint32_t mutationBytes) + : version(version), start(start), length(length), mutationBytes(mutationBytes) { + } + + template + void serialize_unversioned(Ar& ar) { + serializer(ar, version, start, length, mutationBytes); + } + + Version version = 0; + IDiskQueue::location start = 0; + uint32_t length = 0; + uint32_t mutationBytes = 0; +}; +// FIXME: One should be able to use SFINAE to choose between serialize and serialize_unversioned. +template void load( Ar& ar, SpilledData& data ) { data.serialize_unversioned(ar); } +template void save( Ar& ar, const SpilledData& data ) { const_cast(data).serialize_unversioned(ar); } + + struct VerifyState { - std::vector> locations; - std::vector versions; + std::vector spilledData; std::vector>> readfutures; }; ACTOR void verifyPersistentData( TLogData* self, VerifyState* vs ) { - for (auto iter = vs->locations.begin(); iter != vs->locations.end(); iter++) { - vs->readfutures.push_back( self->rawPersistentQueue->read( iter->first, iter->second.lo ) ); + for (auto iter = vs->spilledData.begin(); iter != vs->spilledData.end(); iter++) { + vs->readfutures.push_back( self->rawPersistentQueue->read( iter->start, iter->start.lo + iter->length ) ); } try { wait( waitForAll(vs->readfutures) ); @@ -595,13 +645,14 @@ ACTOR void verifyPersistentData( TLogData* self, VerifyState* vs ) { ASSERT(false); } // ASSERT( length == rawdata.size() ); - ASSERT( entry.version == vs->versions[i] ); + ASSERT( entry.version == vs->spilledData[i].version ); ASSERT( valid == 1 ); } delete vs; } ACTOR Future updatePersistentData( TLogData* self, Reference logData, Version newPersistentDataVersion ) { + state BinaryWriter wr( Unversioned() ); // PERSIST: Changes self->persistentDataVersion and writes and commits the relevant changes ASSERT( newPersistentDataVersion <= logData->version.get() ); ASSERT( newPersistentDataVersion <= logData->queueCommittedVersion.get() ); @@ -630,31 +681,53 @@ ACTOR Future updatePersistentData( TLogData* self, Reference logD updatePersistentPopped( self, logData, tagData ); // Transfer unpopped messages with version numbers less than newPersistentDataVersion to persistentData state std::deque>::iterator msg = tagData->versionMessages.begin(); + state int refSpilledTagCount = 0; + wr = BinaryWriter( Unversioned() ); + // We prefix our spilled locations with a count, so that we can read this back out as a VectorRef. + wr << uint32_t(0); while(msg != tagData->versionMessages.end() && msg->first <= newPersistentDataVersion) { currentVersion = msg->first; anyData = true; tagData->nothingPersistent = false; - BinaryWriter wr( Unversioned() ); - const IDiskQueue::location begin = logData->versionLocation[currentVersion].first; - const IDiskQueue::location end = logData->versionLocation[currentVersion].second; - wr << begin << end; + if (tagData->tag == txsTag) { + // spill txsTag by value + wr = BinaryWriter( Unversioned() ); + for(; msg != tagData->versionMessages.end() && msg->first == currentVersion; ++msg) { + wr << msg->second.toStringRef(); + } + self->persistentData->set( KeyValueRef( persistTagMessagesKey( logData->logId, tagData->tag, currentVersion ), wr.toStringRef() ) ); + } else { + // spill everything else by reference + const IDiskQueue::location begin = logData->versionLocation[currentVersion].first; + const IDiskQueue::location end = logData->versionLocation[currentVersion].second; + ASSERT(end > begin && end.lo - begin.lo < std::numeric_limits::max()); + uint32_t length = static_cast(end.lo - begin.lo); + refSpilledTagCount++; + + uint32_t size = msg->second.expectedSize(); + for(; msg != tagData->versionMessages.end() && msg->first == currentVersion; ++msg) { + // Fast forward until we find a new version. + size += msg->second.expectedSize(); + } + + SpilledData spilledData( currentVersion, begin, length, size ); + wr << spilledData; + + if (vs && (vs->spilledData.empty() || vs->spilledData.back().version != currentVersion)) { + vs->spilledData.push_back( spilledData ); + } + + Future f = yield(TaskUpdateStorage); + if(!f.isReady()) { + wait(f); + msg = std::upper_bound(tagData->versionMessages.begin(), tagData->versionMessages.end(), std::make_pair(currentVersion, LengthPrefixedStringRef()), CompareFirst>()); + } + } + } + if (refSpilledTagCount > 0) { + *(uint32_t*)wr.getData() = refSpilledTagCount; self->persistentData->set( KeyValueRef( persistTagMessageRefsKey( logData->logId, tagData->tag, currentVersion ), wr.toStringRef() ) ); - - if (vs && (vs->versions.empty() || vs->versions.back() != currentVersion)) { - vs->versions.push_back( currentVersion ); - vs->locations.push_back( std::make_pair( begin, end ) ); - } - - for(; msg != tagData->versionMessages.end() && msg->first == currentVersion; ++msg) { - // Fast forward until we find a new version. - } - - Future f = yield(TaskUpdateStorage); - if(!f.isReady()) { - wait(f); - msg = std::upper_bound(tagData->versionMessages.begin(), tagData->versionMessages.end(), std::make_pair(currentVersion, LengthPrefixedStringRef()), CompareFirst>()); - } } wait(yield(TaskUpdateStorage)); @@ -714,7 +787,11 @@ ACTOR Future updatePersistentData( TLogData* self, Reference logD for(tagId = 0; tagId < logData->tag_data[tagLocality].size(); tagId++) { Reference tagData = logData->tag_data[tagLocality][tagId]; if (tagData) { - minVersion = std::min(minVersion, tagData->popped); + if (tagData->tag == txsTag) { + minVersion = std::min(minVersion, newPersistentDataVersion); + } else { + minVersion = std::min(minVersion, tagData->popped); + } } } } @@ -751,7 +828,8 @@ ACTOR Future updateStorage( TLogData* self ) { totalSize = 0; Map>::iterator sizeItr = logData->version_sizes.begin(); nextVersion = logData->version.get(); - while( totalSize < SERVER_KNOBS->UPDATE_STORAGE_BYTE_LIMIT && sizeItr != logData->version_sizes.end() ) + while( totalSize < SERVER_KNOBS->REFERENCE_SPILL_UPDATE_STORAGE_BYTE_LIMIT && + sizeItr != logData->version_sizes.end() ) { totalSize += sizeItr->value.first + sizeItr->value.second; ++sizeItr; @@ -790,7 +868,8 @@ ACTOR Future updateStorage( TLogData* self ) { nextVersion = logData->version.get(); } else { Map>::iterator sizeItr = logData->version_sizes.begin(); - while( totalSize < SERVER_KNOBS->UPDATE_STORAGE_BYTE_LIMIT && sizeItr != logData->version_sizes.end() + while( totalSize < SERVER_KNOBS->REFERENCE_SPILL_UPDATE_STORAGE_BYTE_LIMIT && + sizeItr != logData->version_sizes.end() && (logData->bytesInput.getValue() - logData->bytesDurable.getValue() - totalSize >= SERVER_KNOBS->TLOG_SPILL_THRESHOLD || sizeItr->value.first == 0) ) { totalSize += sizeItr->value.first + sizeItr->value.second; @@ -1017,10 +1096,10 @@ void peekMessagesFromMemory( Reference self, TLogPeekRequest const& req } } -std::vector parseMessagesForTag( StringRef commitBlob, Tag tag, int logRouters ) { +ACTOR Future> parseMessagesForTag( StringRef commitBlob, Tag tag, int logRouters ) { // See the comment in LogSystem.cpp for the binary format of commitBlob. - std::vector relevantMessages; - BinaryReader rd(commitBlob, AssumeVersion(currentProtocolVersion)); + state std::vector relevantMessages; + state BinaryReader rd(commitBlob, AssumeVersion(currentProtocolVersion)); while (!rd.empty()) { uint32_t messageLength = 0; uint32_t subsequence = 0; @@ -1048,7 +1127,7 @@ std::vector parseMessagesForTag( StringRef commitBlob, Tag tag, int l if (match) { relevantMessages.push_back( StringRef((uint8_t*)begin, messageLength) ); } - // FIXME: Yield here so Evan doesn't have to + wait(yield()); } return relevantMessages; } @@ -1155,60 +1234,98 @@ ACTOR Future tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere peekMessagesFromMemory( logData, req, messages2, endVersion ); - // FIXME: Limit to approximately DESIRED_TOTATL_BYTES somehow. - state Standalone> kvrefs = wait( - self->persistentData->readRange(KeyRangeRef( - persistTagMessageRefsKey(logData->logId, req.tag, req.begin), - persistTagMessageRefsKey(logData->logId, req.tag, logData->persistentDataDurableVersion + 1)))); + if (req.tag == txsTag) { + Standalone> kvs = wait( + self->persistentData->readRange(KeyRangeRef( + persistTagMessagesKey(logData->logId, req.tag, req.begin), + persistTagMessagesKey(logData->logId, req.tag, logData->persistentDataDurableVersion + 1)), SERVER_KNOBS->DESIRED_TOTAL_BYTES, SERVER_KNOBS->DESIRED_TOTAL_BYTES)); - //TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress()).detail("Tag1Results", s1).detail("Tag2Results", s2).detail("Tag1ResultsLim", kv1.size()).detail("Tag2ResultsLim", kv2.size()).detail("Tag1ResultsLast", kv1.size() ? printable(kv1[0].key) : "").detail("Tag2ResultsLast", kv2.size() ? printable(kv2[0].key) : "").detail("Limited", limited).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowEpoch", self->epoch()).detail("NowSeq", self->sequence.getNextSequence()); - - state std::vector>> messageReads; - for (auto &kv : kvrefs) { - IDiskQueue::location begin, end; - BinaryReader r(kv.value, Unversioned()); - r >> begin >> end; - messageReads.push_back( self->rawPersistentQueue->read(begin, end) ); - } - wait( waitForAll( messageReads ) ); - - ASSERT( messageReads.size() == kvrefs.size() ); - Version lastRefMessageVersion = 0; - for (int i = 0; i < messageReads.size(); i++ ) { - Standalone queueEntryData = messageReads[i].get(); - uint8_t valid; - const uint32_t length = *(uint32_t*)queueEntryData.begin(); - queueEntryData = queueEntryData.substr( 4, queueEntryData.size() - 4); - BinaryReader rd( queueEntryData, IncludeVersion() ); - TLogQueueEntry entry; - rd >> entry >> valid; - Version version = decodeTagMessageRefsKey(kvrefs[i].key); - ASSERT( valid == 0x01 ); - ASSERT( length + sizeof(valid) == queueEntryData.size() ); - - messages << int32_t(-1) << version; - - // FIXME: push DESIRED_TOTAL_BYTES into parseMessagesForTag - // FIXME: maybe push this copy in as well - std::vector parsedMessages = parseMessagesForTag(entry.messages, req.tag, logData->logRouterTags); - for (StringRef msg : parsedMessages) { - messages << msg; + for (auto &kv : kvs) { + auto ver = decodeTagMessagesKey(kv.key); + messages << int32_t(-1) << ver; + messages.serializeBytes(kv.value); } - lastRefMessageVersion = version; + if (kvs.expectedSize() >= SERVER_KNOBS->DESIRED_TOTAL_BYTES) + endVersion = decodeTagMessagesKey(kvs.end()[-1].key) + 1; + else + messages.serializeBytes( messages2.toStringRef() ); + } else { + // FIXME: Limit to approximately DESIRED_TOTATL_BYTES somehow. + Standalone> kvrefs = wait( + self->persistentData->readRange(KeyRangeRef( + persistTagMessageRefsKey(logData->logId, req.tag, req.begin), + persistTagMessageRefsKey(logData->logId, req.tag, logData->persistentDataDurableVersion + 1)))); - if (messages.getLength() >= SERVER_KNOBS->DESIRED_TOTAL_BYTES) { - break; + //TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress()).detail("Tag1Results", s1).detail("Tag2Results", s2).detail("Tag1ResultsLim", kv1.size()).detail("Tag2ResultsLim", kv2.size()).detail("Tag1ResultsLast", kv1.size() ? printable(kv1[0].key) : "").detail("Tag2ResultsLast", kv2.size() ? printable(kv2[0].key) : "").detail("Limited", limited).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowEpoch", self->epoch()).detail("NowSeq", self->sequence.getNextSequence()); + + state std::vector> commitLocations; + state bool earlyEnd = false; + uint32_t mutationBytes = 0; + state uint64_t commitBytes = 0; + for (auto &kv : kvrefs) { + VectorRef spilledData; + BinaryReader r(kv.value, Unversioned()); + r >> spilledData; + for (const SpilledData& sd : spilledData) { + if (mutationBytes >= SERVER_KNOBS->DESIRED_TOTAL_BYTES) { + earlyEnd = true; + break; + } + if (sd.version >= req.begin) { + const IDiskQueue::location end = sd.start.lo + sd.length; + commitLocations.push_back( std::make_pair(sd.start, end) ); + // This isn't perfect, because we aren't accounting for page boundaries, but should be + // close enough. + commitBytes += sd.length; + mutationBytes += sd.mutationBytes; + } + } + if (earlyEnd) break; } + wait( self->peekMemoryLimiter.take(TaskTLogPeekReply, commitBytes) ); + state FlowLock::Releaser memoryReservation(self->peekMemoryLimiter, commitBytes); + state std::vector>> messageReads; + messageReads.reserve( commitLocations.size() ); + for (const auto& pair : commitLocations) { + messageReads.push_back( self->rawPersistentQueue->read(pair.first, pair.second) ); + } + commitLocations.clear(); + wait( waitForAll( messageReads ) ); + + state Version lastRefMessageVersion = 0; + state int index = 0; + loop { + if (index >= messageReads.size()) break; + Standalone queueEntryData = messageReads[index].get(); + uint8_t valid; + const uint32_t length = *(uint32_t*)queueEntryData.begin(); + queueEntryData = queueEntryData.substr( 4, queueEntryData.size() - 4); + BinaryReader rd( queueEntryData, IncludeVersion() ); + state TLogQueueEntry entry; + rd >> entry >> valid; + ASSERT( valid == 0x01 ); + ASSERT( length + sizeof(valid) == queueEntryData.size() ); + + messages << int32_t(-1) << entry.version; + + std::vector parsedMessages = wait(parseMessagesForTag(entry.messages, req.tag, logData->logRouterTags)); + for (StringRef msg : parsedMessages) { + messages << msg; + } + + lastRefMessageVersion = entry.version; + index++; + } + + messageReads.clear(); + memoryReservation.release(); + + if (earlyEnd) + endVersion = lastRefMessageVersion + 1; + else + messages.serializeBytes( messages2.toStringRef() ); } - - messageReads.clear(); - kvrefs = Standalone>(); - - if (messages.getLength() >= SERVER_KNOBS->DESIRED_TOTAL_BYTES) - endVersion = lastRefMessageVersion + 1; - else - messages.serializeBytes( messages2.toStringRef() ); } else { peekMessagesFromMemory( logData, req, messages, endVersion ); //TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress()).detail("MessageBytes", messages.getLength()).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowSeq", self->sequence.getNextSequence()); diff --git a/fdbserver/workloads/ConfigureDatabase.actor.cpp b/fdbserver/workloads/ConfigureDatabase.actor.cpp index 556910631d..82ce1f37a8 100644 --- a/fdbserver/workloads/ConfigureDatabase.actor.cpp +++ b/fdbserver/workloads/ConfigureDatabase.actor.cpp @@ -236,6 +236,11 @@ struct ConfigureDatabaseWorkload : TestWorkload { return StringRef(format("DestroyDB%d", dbIndex)); } + static Future IssueConfigurationChange( Database cx, const std::string& config, bool force ) { + printf("Issuing configuration change: %s\n", config.c_str()); + return changeConfig(cx, config, force); + } + ACTOR Future _setup( Database cx, ConfigureDatabaseWorkload *self ) { wait(success( changeConfig( cx, "single", true ) )); return Void(); @@ -330,7 +335,7 @@ struct ConfigureDatabaseWorkload : TestWorkload { if (g_random->random01() < 0.5) config += " proxies=" + format("%d", randomRoleNumber()); if (g_random->random01() < 0.5) config += " resolvers=" + format("%d", randomRoleNumber()); - wait(success( changeConfig( cx, config, false ) )); + wait(success( IssueConfigurationChange( cx, config, false ) )); //TraceEvent("ConfigureTestConfigureEnd").detail("NewConfig", newConfig); } @@ -343,11 +348,11 @@ struct ConfigureDatabaseWorkload : TestWorkload { //TraceEvent("ConfigureTestConfigureEnd").detail("NewQuorum", s); } else if ( randomChoice == 5) { - wait(success( changeConfig( cx, storeTypes[g_random->randomInt( 0, sizeof(storeTypes)/sizeof(storeTypes[0]))], true ) )); + wait(success( IssueConfigurationChange( cx, storeTypes[g_random->randomInt( 0, sizeof(storeTypes)/sizeof(storeTypes[0]))], true ) )); } else if ( randomChoice == 6 ) { // Some configurations will be invalid, and that's fine. - wait(success( changeConfig( cx, logTypes[g_random->randomInt( 0, sizeof(logTypes)/sizeof(logTypes[0]))], false ) )); + wait(success( IssueConfigurationChange( cx, logTypes[g_random->randomInt( 0, sizeof(logTypes)/sizeof(logTypes[0]))], false ) )); } else { ASSERT(false); diff --git a/flow/DeterministicRandom.h b/flow/DeterministicRandom.h index f2e23fefca..369bbef010 100644 --- a/flow/DeterministicRandom.h +++ b/flow/DeterministicRandom.h @@ -24,6 +24,8 @@ #include #include "flow/IRandom.h" +#include "flow/Error.h" +#include "flow/Trace.h" #include @@ -90,6 +92,14 @@ public: uint32_t randomUInt32() { return gen64(); } + uint32_t randomSkewedUInt32(uint32_t min, uint32_t maxPlusOne) { + std::uniform_real_distribution distribution( std::log(min), std::log(maxPlusOne-1) ); + double logpower = distribution(random); + uint32_t loguniform = static_cast( std::pow( 10, logpower ) ); + // doubles can be imprecise, so let's make sure we don't violate an edge case. + return std::max(std::min(loguniform, maxPlusOne-1), min); + } + UID randomUniqueID() { uint64_t x,y; x = gen64(); diff --git a/flow/IRandom.h b/flow/IRandom.h index 3d957a004d..c5d60cb01d 100644 --- a/flow/IRandom.h +++ b/flow/IRandom.h @@ -76,6 +76,7 @@ public: virtual UID randomUniqueID() = 0; virtual char randomAlphaNumeric() = 0; virtual std::string randomAlphaNumeric( int length ) = 0; + virtual uint32_t randomSkewedUInt32(uint32_t min, uint32_t maxPlusOne) = 0; virtual uint64_t peek() const = 0; // returns something that is probably different for different random states. Deterministic (and idempotent) for a deterministic generator. // The following functions have fixed implementations for now: diff --git a/flow/genericactors.actor.h b/flow/genericactors.actor.h index 11aa4f6c67..4b1d6abbc6 100644 --- a/flow/genericactors.actor.h +++ b/flow/genericactors.actor.h @@ -1198,11 +1198,11 @@ struct FlowLock : NonCopyable, public ReferenceCounted { FlowLock* lock; int remaining; Releaser() : lock(0), remaining(0) {} - Releaser( FlowLock& lock, int amount = 1 ) : lock(&lock), remaining(amount) {} + Releaser( FlowLock& lock, int64_t amount = 1 ) : lock(&lock), remaining(amount) {} Releaser(Releaser&& r) noexcept(true) : lock(r.lock), remaining(r.remaining) { r.remaining = 0; } void operator=(Releaser&& r) { if (remaining) lock->release(remaining); lock = r.lock; remaining = r.remaining; r.remaining = 0; } - void release( int amount = -1 ) { + void release( int64_t amount = -1 ) { if( amount == -1 || amount > remaining ) amount = remaining; @@ -1215,23 +1215,23 @@ struct FlowLock : NonCopyable, public ReferenceCounted { }; FlowLock() : permits(1), active(0) {} - explicit FlowLock(int permits) : permits(permits), active(0) {} + explicit FlowLock(int64_t permits) : permits(permits), active(0) {} - Future take(int taskID = TaskDefaultYield, int amount = 1) { - ASSERT(amount <= permits); - if (active + amount <= permits) { + Future take(int taskID = TaskDefaultYield, int64_t amount = 1) { + ASSERT(amount <= permits || active == 0); + if (active + amount <= permits || active == 0) { active += amount; return safeYieldActor(this, taskID, amount); } return takeActor(this, taskID, amount); } - void release( int amount = 1 ) { - ASSERT( active > 0 || amount == 0 ); + void release( int64_t amount = 1 ) { + ASSERT( (active > 0 || amount == 0) && active - amount >= 0 ); active -= amount; while( !takers.empty() ) { - if( active + takers.begin()->second <= permits ) { - std::pair< Promise, int > next = std::move( *takers.begin() ); + if( active + takers.begin()->second <= permits || active == 0 ) { + std::pair< Promise, int64_t > next = std::move( *takers.begin() ); active += next.second; takers.pop_front(); next.first.send(Void()); @@ -1244,21 +1244,21 @@ struct FlowLock : NonCopyable, public ReferenceCounted { Future releaseWhen( Future const& signal, int amount = 1 ) { return releaseWhenActor( this, signal, amount ); } // returns when any permits are available, having taken as many as possible up to the given amount, and modifies amount to the number of permits taken - Future takeUpTo(int& amount) { + Future takeUpTo(int64_t& amount) { return takeMoreActor(this, &amount); } - int available() const { return permits - active; } - int activePermits() const { return active; } + int64_t available() const { return permits - active; } + int64_t activePermits() const { return active; } int waiters() const { return takers.size(); } private: - std::list< std::pair< Promise, int > > takers; - const int permits; - int active; + std::list< std::pair< Promise, int64_t > > takers; + const int64_t permits; + int64_t active; Promise broken_on_destruct; - ACTOR static Future takeActor(FlowLock* lock, int taskID, int amount) { - state std::list, int>>::iterator it = lock->takers.insert(lock->takers.end(), std::make_pair(Promise(), amount)); + ACTOR static Future takeActor(FlowLock* lock, int taskID, int64_t amount) { + state std::list, int64_t>>::iterator it = lock->takers.insert(lock->takers.end(), std::make_pair(Promise(), amount)); try { wait( it->first.getFuture() ); @@ -1281,15 +1281,15 @@ private: } } - ACTOR static Future takeMoreActor(FlowLock* lock, int* amount) { + ACTOR static Future takeMoreActor(FlowLock* lock, int64_t* amount) { wait(lock->take()); - int extra = std::min( lock->available(), *amount-1 ); + int64_t extra = std::min( lock->available(), *amount-1 ); lock->active += extra; *amount = 1 + extra; return Void(); } - ACTOR static Future safeYieldActor(FlowLock* lock, int taskID, int amount) { + ACTOR static Future safeYieldActor(FlowLock* lock, int taskID, int64_t amount) { try { choose{ when(wait(yield(taskID))) {} @@ -1302,7 +1302,7 @@ private: } } - ACTOR static Future releaseWhenActor( FlowLock* self, Future signal, int amount ) { + ACTOR static Future releaseWhenActor( FlowLock* self, Future signal, int64_t amount ) { wait(signal); self->release(amount); return Void();