Merge pull request #1217 from alexmiller-apple/tstlog-goodref

Spill-By-Reference TLog Part 4: Actually Usable Reference Spilling
Evan Tschannen 2019-03-04 20:58:24 -08:00 committed by GitHub
commit 69d7633d5b
11 changed files with 311 additions and 140 deletions


@ -28,6 +28,20 @@
typedef bool(*compare_pages)(void*,void*);
typedef int64_t loc_t;
// 0 -> 0
// 1 -> 4k
// 4k -> 4k
int64_t pageCeiling( int64_t loc ) {
return (loc+_PAGE_SIZE-1)/_PAGE_SIZE*_PAGE_SIZE;
}
// 0 -> 0
// 1 -> 0
// 4k -> 4k
int64_t pageFloor( int64_t loc ) {
return loc / _PAGE_SIZE * _PAGE_SIZE;
}
struct StringBuffer {
Standalone<StringRef> str;
int reserved;
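
The two helpers added above centralize the page-rounding arithmetic that the rest of this commit substitutes for the repeated loc / _PAGE_SIZE * _PAGE_SIZE expressions. A standalone sketch (not part of this commit) exercising the contract spelled out in the comments, assuming the usual 4 KiB page size:

#include <cassert>
#include <cstdint>

static const int64_t kPageSize = 4096; // stand-in for _PAGE_SIZE

int64_t pageCeiling( int64_t loc ) { return (loc + kPageSize - 1) / kPageSize * kPageSize; }
int64_t pageFloor( int64_t loc )   { return loc / kPageSize * kPageSize; }

int main() {
	assert( pageCeiling(0) == 0 && pageCeiling(1) == 4096 && pageCeiling(4096) == 4096 );
	assert( pageFloor(0) == 0 && pageFloor(1) == 0 && pageFloor(4096) == 4096 );
	assert( pageFloor(4097) == 4096 && pageCeiling(4097) == 8192 );
	return 0;
}
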
@ -144,11 +158,14 @@ public:
RawDiskQueue_TwoFiles( std::string basename, std::string fileExtension, UID dbgid, int64_t fileSizeWarningLimit )
: basename(basename), fileExtension(fileExtension), onError(delayed(error.getFuture())), onStopped(stopped.getFuture()),
readingFile(-1), readingPage(-1), writingPos(-1), dbgid(dbgid),
dbg_file0BeginSeq(0), fileExtensionBytes(10<<20), readingBuffer( dbgid ),
dbg_file0BeginSeq(0), fileExtensionBytes(SERVER_KNOBS->DISK_QUEUE_FILE_EXTENSION_BYTES),
fileShrinkBytes(SERVER_KNOBS->DISK_QUEUE_FILE_SHRINK_BYTES), readingBuffer( dbgid ),
readyToPush(Void()), fileSizeWarningLimit(fileSizeWarningLimit), lastCommit(Void()), isFirstCommit(true)
{
if(BUGGIFY)
fileExtensionBytes = 8<<10;
if (BUGGIFY)
fileExtensionBytes = 1<<10 * g_random->randomSkewedUInt32( 1, 40<<10 );
if (BUGGIFY)
fileShrinkBytes = _PAGE_SIZE * g_random->randomSkewedUInt32( 1, 10<<10 );
files[0].dbgFilename = filename(0);
files[1].dbgFilename = filename(1);
// We issue reads into firstPages, so it needs to be 4k aligned.
@ -241,6 +258,7 @@ public:
int64_t writingPos; // Position within files[1] that will be next written
int64_t fileExtensionBytes;
int64_t fileShrinkBytes;
Int64MetricHandle stallCount;
@ -274,6 +292,14 @@ public:
std::swap(firstPages[0], firstPages[1]);
files[1].popped = 0;
writingPos = 0;
const int64_t activeDataVolume = pageCeiling(files[0].size - files[0].popped + fileExtensionBytes + fileShrinkBytes);
if (files[1].size > activeDataVolume) {
// Either shrink files[1] to the size of files[0], or chop off fileShrinkBytes
int64_t maxShrink = std::max( pageFloor(files[1].size - activeDataVolume), fileShrinkBytes );
files[1].size -= maxShrink;
waitfor.push_back( files[1].f->truncate( files[1].size ) );
}
} else {
// Extend files[1] to accommodate the new write and about 10MB or 2x current size for future writes.
/*TraceEvent("RDQExtend", this->dbgid).detail("File1name", files[1].dbgFilename).detail("File1size", files[1].size)
@ -942,10 +968,10 @@ private:
// might be a bit overly aggressive here, but it's behavior we need to tolerate.
throw io_error();
}
ASSERT( ((Page*)pagedData.begin())->seq == start.lo / _PAGE_SIZE * _PAGE_SIZE );
ASSERT( ((Page*)pagedData.begin())->seq == pageFloor(start.lo) );
ASSERT(pagedData.size() == (toPage - fromPage + 1) * _PAGE_SIZE );
ASSERT( ((Page*)pagedData.end() - 1)->seq == (end.lo - 1) / _PAGE_SIZE * _PAGE_SIZE );
ASSERT( ((Page*)pagedData.end() - 1)->seq == pageFloor(end.lo - 1) );
return pagedData;
} else {
ASSERT(fromFile == 0);
@ -958,9 +984,9 @@ private:
throw io_error();
}
ASSERT(firstChunk.size() == ( ( file0size / sizeof(Page) ) - fromPage ) * _PAGE_SIZE );
ASSERT( ((Page*)firstChunk.begin())->seq == start.lo / _PAGE_SIZE * _PAGE_SIZE );
ASSERT( ((Page*)firstChunk.begin())->seq == pageFloor(start.lo) );
ASSERT(secondChunk.size() == (toPage + 1) * _PAGE_SIZE);
ASSERT( ((Page*)secondChunk.end() - 1)->seq == (end.lo - 1) / _PAGE_SIZE * _PAGE_SIZE );
ASSERT( ((Page*)secondChunk.end() - 1)->seq == pageFloor(end.lo - 1) );
return firstChunk.withSuffix(secondChunk);
}
}
@ -979,42 +1005,42 @@ private:
if (endingOffset == 0) endingOffset = sizeof(Page);
if (endingOffset > 0) endingOffset -= sizeof(PageHeader);
if ((end.lo-1)/sizeof(Page)*sizeof(Page) == start.lo/sizeof(Page)*sizeof(Page)) {
if (pageFloor(end.lo-1) == pageFloor(start.lo)) {
// start and end are on the same page
ASSERT(pagedData.size() == sizeof(Page));
pagedData.contents() = pagedData.substr(sizeof(PageHeader) + startingOffset, endingOffset - startingOffset);
return pagedData;
} else {
// FIXME: This allocation is excessive and unnecessary. We know the overhead per page that
// we'll be stripping out (sizeof(PageHeader)), so we should be able to do a smaller
// allocation. But we should be able to re-use the space allocated for pagedData, which
// would mean not having to allocate 2x the space for a read.
Standalone<StringRef> unpagedData = makeString(pagedData.size());
uint8_t *buf = mutateString(unpagedData);
memset(buf, 0, unpagedData.size());
// Reusing pagedData wastes # of pages * sizeof(PageHeader) bytes, but means
// we don't have to double allocate in a hot, memory hungry call.
uint8_t *buf = mutateString(pagedData);
const Page *data = reinterpret_cast<const Page*>(pagedData.begin());
// Only start copying from `start` in the first page.
if( data->payloadSize > startingOffset ) {
memcpy(buf, data->payload+startingOffset, data->payloadSize-startingOffset);
buf += data->payloadSize-startingOffset;
const int length = data->payloadSize-startingOffset;
memmove(buf, data->payload+startingOffset, length);
buf += length;
}
data++;
// Copy all the middle pages
while (data->seq != ((end.lo-1)/sizeof(Page)*sizeof(Page))) {
while (data->seq != pageFloor(end.lo-1)) {
// These pages can have varying amounts of data, as pages with partial
// data will be zero-filled when commit is called.
memcpy(buf, data->payload, data->payloadSize);
buf += data->payloadSize;
const int length = data->payloadSize;
memmove(buf, data->payload, length);
buf += length;
data++;
}
// Copy only until `end` in the last page.
memcpy(buf, data->payload, std::min(endingOffset, data->payloadSize));
buf += std::min(endingOffset, data->payloadSize);
const int length = data->payloadSize;
memmove(buf, data->payload, std::min(endingOffset, length));
buf += std::min(endingOffset, length);
unpagedData.contents() = unpagedData.substr(0, buf - unpagedData.begin());
memset(buf, 0, pagedData.size() - (buf - pagedData.begin()));
Standalone<StringRef> unpagedData = pagedData.substr(0, buf - pagedData.begin());
return unpagedData;
}
}
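
The old path allocated a second unpagedData buffer and memcpy'd into it; the new path compacts payloads within pagedData itself, which is why memcpy becomes memmove (source and destination ranges can now overlap) and why the tail beyond the compacted data is zeroed before taking a substr. A minimal standalone sketch of the same idea, using a hypothetical fixed-size RecordHeader in place of PageHeader (not FDB code):

#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <cstring>

struct RecordHeader { uint32_t payloadSize; };   // hypothetical stand-in for PageHeader

// Strip headers from consecutive records by sliding each payload toward the front of
// the same buffer; returns the new logical length. Overlapping copies require memmove.
size_t compactInPlace( uint8_t* buf, size_t len ) {
	uint8_t* write = buf;
	size_t off = 0;
	while (off + sizeof(RecordHeader) <= len) {
		RecordHeader hdr;
		memcpy( &hdr, buf + off, sizeof(hdr) );
		off += sizeof(hdr);
		memmove( write, buf + off, hdr.payloadSize );
		write += hdr.payloadSize;
		off += hdr.payloadSize;
	}
	memset( write, 0, len - (write - buf) );     // zero the now-unused tail, as above
	return write - buf;
}

int main() {
	// Two records: 3-byte "abc" and 2-byte "de", each preceded by its header.
	uint8_t buf[sizeof(RecordHeader) + 3 + sizeof(RecordHeader) + 2];
	RecordHeader h1{3}, h2{2};
	uint8_t* p = buf;
	memcpy(p, &h1, sizeof(h1)); p += sizeof(h1); memcpy(p, "abc", 3); p += 3;
	memcpy(p, &h2, sizeof(h2)); p += sizeof(h2); memcpy(p, "de", 2);
	size_t n = compactInPlace( buf, sizeof(buf) );
	printf("%zu bytes: %.5s\n", n, (const char*)buf);   // prints "5 bytes: abcde"
	return 0;
}
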
@ -1068,14 +1094,14 @@ private:
self->readBufArena = page.arena();
self->readBufPage = (Page*)page.begin();
if (!self->readBufPage->checkHash() || self->readBufPage->seq < self->nextReadLocation/sizeof(Page)*sizeof(Page)) {
if (!self->readBufPage->checkHash() || self->readBufPage->seq < pageFloor(self->nextReadLocation)) {
TraceEvent("DQRecInvalidPage", self->dbgid).detail("NextReadLocation", self->nextReadLocation).detail("HashCheck", self->readBufPage->checkHash())
.detail("Seq", self->readBufPage->seq).detail("Expect", self->nextReadLocation/sizeof(Page)*sizeof(Page)).detail("File0Name", self->rawQueue->files[0].dbgFilename);
.detail("Seq", self->readBufPage->seq).detail("Expect", pageFloor(self->nextReadLocation)).detail("File0Name", self->rawQueue->files[0].dbgFilename);
wait( self->rawQueue->truncateBeforeLastReadPage() );
break;
}
//TraceEvent("DQRecPage", self->dbgid).detail("NextReadLoc", self->nextReadLocation).detail("Seq", self->readBufPage->seq).detail("Pop", self->readBufPage->popped).detail("Payload", self->readBufPage->payloadSize).detail("File0Name", self->rawQueue->files[0].dbgFilename);
ASSERT( self->readBufPage->seq == self->nextReadLocation/sizeof(Page)*sizeof(Page) );
ASSERT( self->readBufPage->seq == pageFloor(self->nextReadLocation) );
self->lastPoppedSeq = self->readBufPage->popped;
}
@ -1084,10 +1110,10 @@ private:
int f; int64_t p;
TEST( self->lastPoppedSeq/sizeof(Page) != self->poppedSeq/sizeof(Page) ); // DiskQueue: Recovery popped position not fully durable
self->findPhysicalLocation( self->lastPoppedSeq, &f, &p, "lastPoppedSeq" );
wait(self->rawQueue->setPoppedPage( f, p, self->lastPoppedSeq/sizeof(Page)*sizeof(Page) ));
wait(self->rawQueue->setPoppedPage( f, p, pageFloor(self->lastPoppedSeq) ));
// Writes go at the end of our reads (but on the next page)
self->nextPageSeq = self->nextReadLocation/sizeof(Page)*sizeof(Page);
self->nextPageSeq = pageFloor(self->nextReadLocation);
if (self->nextReadLocation % sizeof(Page) > sizeof(PageHeader)) self->nextPageSeq += sizeof(Page);
TraceEvent("DQRecovered", self->dbgid).detail("LastPoppedSeq", self->lastPoppedSeq).detail("PoppedSeq", self->poppedSeq).detail("NextPageSeq", self->nextPageSeq).detail("File0Name", self->rawQueue->files[0].dbgFilename);


@ -44,6 +44,10 @@ public:
if (hi>r.hi) return false;
return lo < r.lo;
}
bool operator == (const location& r) const {
return hi == r.hi && lo == r.lo;
}
};
//! Find the first and last pages in the disk queue, and initialize invariants.


@ -54,6 +54,7 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
init( BUGGIFY_RECOVER_MEMORY_LIMIT, 1e6 );
init( BUGGIFY_WORKER_REMOVED_MAX_LAG, 30 );
init( UPDATE_STORAGE_BYTE_LIMIT, 1e6 );
init( REFERENCE_SPILL_UPDATE_STORAGE_BYTE_LIMIT, 20e6 );
init( TLOG_PEEK_DELAY, 0.00005 );
init( LEGACY_TLOG_UPGRADE_ENTRIES_PER_VERSION, 100 );
init( VERSION_MESSAGES_OVERHEAD_FACTOR_1024THS, 1072 ); // Based on a naive interpretation of the gcc version of std::deque, we would expect this to be 16 bytes overhead per 512 bytes data. In practice, it seems to be 24 bytes overhead per 512.
@ -70,6 +71,9 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
init( CONCURRENT_LOG_ROUTER_READS, 1 );
init( DISK_QUEUE_ADAPTER_MIN_SWITCH_TIME, 1.0 );
init( DISK_QUEUE_ADAPTER_MAX_SWITCH_TIME, 5.0 );
init( TLOG_SPILL_REFERENCE_MAX_PEEK_MEMORY_BYTES, 2e9 ); if ( randomize && BUGGIFY ) TLOG_SPILL_REFERENCE_MAX_PEEK_MEMORY_BYTES = 2e6;
init( DISK_QUEUE_FILE_EXTENSION_BYTES, 10<<20 ); // BUGGIFYd per file within the DiskQueue
init( DISK_QUEUE_FILE_SHRINK_BYTES, 100<<20 ); // BUGGIFYd per file within the DiskQueue
// Data distribution queue
init( HEALTH_POLL_TIME, 1.0 );
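
For reference, server knobs such as the two added above can be overridden per process at startup rather than only at compile time. A hypothetical invocation sketch (the knob names come from this diff, the --knob_ flag convention is the usual fdbserver one, and the byte values are arbitrary examples, not recommendations):

fdbserver --cluster_file /etc/foundationdb/fdb.cluster --public_address auto:4500 \
    --knob_disk_queue_file_extension_bytes=20971520 \
    --knob_disk_queue_file_shrink_bytes=52428800

In simulation the same two knobs are instead randomized by the BUGGIFY blocks shown in the RawDiskQueue_TwoFiles constructor above.
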


@ -57,6 +57,7 @@ public:
double BUGGIFY_RECOVER_MEMORY_LIMIT;
double BUGGIFY_WORKER_REMOVED_MAX_LAG;
int64_t UPDATE_STORAGE_BYTE_LIMIT;
int64_t REFERENCE_SPILL_UPDATE_STORAGE_BYTE_LIMIT;
double TLOG_PEEK_DELAY;
int LEGACY_TLOG_UPGRADE_ENTRIES_PER_VERSION;
int VERSION_MESSAGES_OVERHEAD_FACTOR_1024THS; // Multiplicative factor to bound total space used to store a version message (measured in 1/1024ths, e.g. a value of 2048 yields a factor of 2).
@ -73,6 +74,9 @@ public:
int CONCURRENT_LOG_ROUTER_READS;
double DISK_QUEUE_ADAPTER_MIN_SWITCH_TIME;
double DISK_QUEUE_ADAPTER_MAX_SWITCH_TIME;
int64_t TLOG_SPILL_REFERENCE_MAX_PEEK_MEMORY_BYTES;
int64_t DISK_QUEUE_FILE_EXTENSION_BYTES; // When we grow the disk queue, by how many bytes should it grow?
int64_t DISK_QUEUE_FILE_SHRINK_BYTES; // When we shrink the disk queue, by how many bytes should it shrink?
// Data distribution queue
double HEALTH_POLL_TIME;


@ -1523,6 +1523,7 @@ ACTOR Future<Void> pullAsyncData( TLogData* self, Reference<LogData> logData, st
if(poppedIsKnownCommitted) {
logData->knownCommittedVersion = std::max(logData->knownCommittedVersion, r->popped());
logData->minKnownCommittedVersion = std::max(logData->minKnownCommittedVersion, r->getMinKnownCommittedVersion());
}
commitMessages(self, logData, ver, messages);
@ -1561,6 +1562,7 @@ ACTOR Future<Void> pullAsyncData( TLogData* self, Reference<LogData> logData, st
if(poppedIsKnownCommitted) {
logData->knownCommittedVersion = std::max(logData->knownCommittedVersion, r->popped());
logData->minKnownCommittedVersion = std::max(logData->minKnownCommittedVersion, r->getMinKnownCommittedVersion());
}
if(self->terminated.isSet()) {
@ -1911,7 +1913,6 @@ ACTOR Future<Void> updateLogSystem(TLogData* self, Reference<LogData> logData, L
ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, LocalityData locality ) {
state TLogInterface recruited(self->dbgid, locality);
recruited.locality = locality;
recruited.initEndpoints();
DUMPTOKEN( recruited.peekMessages );


@ -842,9 +842,8 @@ void SimulationConfig::generateNormalConfig(int minimumReplication, int minimumR
break;
}
} else {
// FIXME: Temporarily disable spill-by-reference.
//set_config("log_version:=3"); // 6.1
//set_config("log_spill:=2"); // REFERENCE
set_config("log_version:=3"); // 6.1
set_config("log_spill:=2"); // REFERENCE
}
if(generateFearless || (datacenters == 2 && g_random->random01() < 0.5)) {


@ -199,9 +199,19 @@ static const KeyRangeRef persistCurrentVersionKeys = KeyRangeRef( LiteralStringR
static const KeyRangeRef persistKnownCommittedVersionKeys = KeyRangeRef( LiteralStringRef( "knownCommitted/" ), LiteralStringRef( "knownCommitted0" ) );
static const KeyRangeRef persistLocalityKeys = KeyRangeRef( LiteralStringRef( "Locality/" ), LiteralStringRef( "Locality0" ) );
static const KeyRangeRef persistLogRouterTagsKeys = KeyRangeRef( LiteralStringRef( "LogRouterTags/" ), LiteralStringRef( "LogRouterTags0" ) );
static const KeyRange persistTagMessagesKeys = prefixRange(LiteralStringRef("TagMsg/"));
static const KeyRange persistTagMessageRefsKeys = prefixRange(LiteralStringRef("TagMsgRef/"));
static const KeyRange persistTagPoppedKeys = prefixRange(LiteralStringRef("TagPop/"));
static Key persistTagMessagesKey( UID id, Tag tag, Version version ) {
BinaryWriter wr( Unversioned() );
wr.serializeBytes(persistTagMessagesKeys.begin);
wr << id;
wr << tag;
wr << bigEndian64( version );
return wr.toStringRef();
}
static Key persistTagMessageRefsKey( UID id, Tag tag, Version version ) {
BinaryWriter wr( Unversioned() );
wr.serializeBytes(persistTagMessageRefsKeys.begin);
@ -234,10 +244,18 @@ static Version decodeTagPoppedValue( ValueRef value ) {
return BinaryReader::fromStringRef<Version>( value, Unversioned() );
}
static StringRef stripTagMessagesKey( StringRef key ) {
return key.substr( sizeof(UID) + sizeof(Tag) + persistTagMessagesKeys.begin.size() );
}
static StringRef stripTagMessageRefsKey( StringRef key ) {
return key.substr( sizeof(UID) + sizeof(Tag) + persistTagMessageRefsKeys.begin.size() );
}
static Version decodeTagMessagesKey( StringRef key ) {
return bigEndian64( BinaryReader::fromStringRef<Version>( stripTagMessagesKey(key), Unversioned() ) );
}
static Version decodeTagMessageRefsKey( StringRef key ) {
return bigEndian64( BinaryReader::fromStringRef<Version>( stripTagMessageRefsKey(key), Unversioned() ) );
}
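
persistTagMessageRefsKey mirrors persistTagMessagesKey: prefix, then UID and Tag, then the version in big-endian byte order. The big-endian encoding is what makes lexicographic key order agree with numeric version order, so the range clears in updatePersistentPopped and the range reads in the peek path select exactly a version interval. A small standalone illustration of that property (not FDB code; putBigEndian64 is a hypothetical helper):

#include <cassert>
#include <cstdint>
#include <cstring>

// Write v into out[0..7], most-significant byte first.
void putBigEndian64( uint8_t out[8], uint64_t v ) {
	for (int i = 0; i < 8; i++) out[i] = uint8_t( v >> (8 * (7 - i)) );
}

int main() {
	uint8_t k255[8], k256[8];
	putBigEndian64( k255, 255 );
	putBigEndian64( k256, 256 );
	// Byte-wise comparison (what a key-value store's range scan uses) now matches
	// numeric comparison of the versions; little-endian bytes would sort 256 before 255.
	assert( memcmp( k255, k256, 8 ) < 0 );
	return 0;
}
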
@ -274,6 +292,7 @@ struct TLogData : NonCopyable {
std::map<UID, PeekTrackerData> peekTracker;
WorkerCache<TLogInterface> tlogCache;
FlowLock peekMemoryLimiter;
PromiseStream<Future<Void>> sharedActors;
Promise<Void> terminated;
@ -285,6 +304,7 @@ struct TLogData : NonCopyable {
persistentData(persistentData), rawPersistentQueue(persistentQueue), persistentQueue(new TLogQueue(persistentQueue, dbgid)),
dbInfo(dbInfo), queueCommitBegin(0), queueCommitEnd(0),
diskQueueCommitBytes(0), largeDiskQueueCommitBytes(false), bytesInput(0), bytesDurable(0), overheadBytesInput(0), overheadBytesDurable(0),
peekMemoryLimiter(SERVER_KNOBS->TLOG_SPILL_REFERENCE_MAX_PEEK_MEMORY_BYTES),
concurrentLogRouterReads(SERVER_KNOBS->CONCURRENT_LOG_ROUTER_READS)
{
}
@ -449,6 +469,8 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
specialCounter(cc, "QueueDiskBytesFree", [tLogData](){ return tLogData->rawPersistentQueue->getStorageBytes().free; });
specialCounter(cc, "QueueDiskBytesAvailable", [tLogData](){ return tLogData->rawPersistentQueue->getStorageBytes().available; });
specialCounter(cc, "QueueDiskBytesTotal", [tLogData](){ return tLogData->rawPersistentQueue->getStorageBytes().total; });
specialCounter(cc, "PeekMemoryReserved", [tLogData]() { return tLogData->peekMemoryLimiter.activePermits(); });
specialCounter(cc, "PeekMemoryRequestsStalled", [tLogData]() { return tLogData->peekMemoryLimiter.waiters(); });
}
~LogData() {
@ -466,8 +488,10 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
tLogData->persistentData->clear( singleKeyRange(logIdKey.withPrefix(persistLocalityKeys.begin)) );
tLogData->persistentData->clear( singleKeyRange(logIdKey.withPrefix(persistLogRouterTagsKeys.begin)) );
tLogData->persistentData->clear( singleKeyRange(logIdKey.withPrefix(persistRecoveryCountKeys.begin)) );
Key msgKey = logIdKey.withPrefix(persistTagMessageRefsKeys.begin);
Key msgKey = logIdKey.withPrefix(persistTagMessagesKeys.begin);
tLogData->persistentData->clear( KeyRangeRef( msgKey, strinc(msgKey) ) );
Key msgRefKey = logIdKey.withPrefix(persistTagMessageRefsKeys.begin);
tLogData->persistentData->clear( KeyRangeRef( msgRefKey, strinc(msgRefKey) ) );
Key poppedKey = logIdKey.withPrefix(persistTagPoppedKeys.begin);
tLogData->persistentData->clear( KeyRangeRef( poppedKey, strinc(poppedKey) ) );
}
@ -553,22 +577,48 @@ void updatePersistentPopped( TLogData* self, Reference<LogData> logData, Referen
if (data->nothingPersistent) return;
self->persistentData->clear( KeyRangeRef(
persistTagMessageRefsKey( logData->logId, data->tag, Version(0) ),
persistTagMessageRefsKey( logData->logId, data->tag, data->popped ) ) );
if (data->tag == txsTag) {
self->persistentData->clear( KeyRangeRef(
persistTagMessagesKey( logData->logId, data->tag, Version(0) ),
persistTagMessagesKey( logData->logId, data->tag, data->popped ) ) );
} else {
self->persistentData->clear( KeyRangeRef(
persistTagMessageRefsKey( logData->logId, data->tag, Version(0) ),
persistTagMessageRefsKey( logData->logId, data->tag, data->popped ) ) );
}
if (data->popped > logData->persistentDataVersion)
data->nothingPersistent = true;
}
struct SpilledData {
SpilledData() = default;
SpilledData(Version version, IDiskQueue::location start, uint32_t length, uint32_t mutationBytes)
: version(version), start(start), length(length), mutationBytes(mutationBytes) {
}
template <class Ar>
void serialize_unversioned(Ar& ar) {
serializer(ar, version, start, length, mutationBytes);
}
Version version = 0;
IDiskQueue::location start = 0;
uint32_t length = 0;
uint32_t mutationBytes = 0;
};
// FIXME: One should be able to use SFINAE to choose between serialize and serialize_unversioned.
template <class Ar> void load( Ar& ar, SpilledData& data ) { data.serialize_unversioned(ar); }
template <class Ar> void save( Ar& ar, const SpilledData& data ) { const_cast<SpilledData&>(data).serialize_unversioned(ar); }
struct VerifyState {
std::vector<std::pair<IDiskQueue::location, IDiskQueue::location>> locations;
std::vector<Version> versions;
std::vector<SpilledData> spilledData;
std::vector<Future<Standalone<StringRef>>> readfutures;
};
ACTOR void verifyPersistentData( TLogData* self, VerifyState* vs ) {
for (auto iter = vs->locations.begin(); iter != vs->locations.end(); iter++) {
vs->readfutures.push_back( self->rawPersistentQueue->read( iter->first, iter->second.lo ) );
for (auto iter = vs->spilledData.begin(); iter != vs->spilledData.end(); iter++) {
vs->readfutures.push_back( self->rawPersistentQueue->read( iter->start, iter->start.lo + iter->length ) );
}
try {
wait( waitForAll(vs->readfutures) );
@ -595,13 +645,14 @@ ACTOR void verifyPersistentData( TLogData* self, VerifyState* vs ) {
ASSERT(false);
}
// ASSERT( length == rawdata.size() );
ASSERT( entry.version == vs->versions[i] );
ASSERT( entry.version == vs->spilledData[i].version );
ASSERT( valid == 1 );
}
delete vs;
}
ACTOR Future<Void> updatePersistentData( TLogData* self, Reference<LogData> logData, Version newPersistentDataVersion ) {
state BinaryWriter wr( Unversioned() );
// PERSIST: Changes self->persistentDataVersion and writes and commits the relevant changes
ASSERT( newPersistentDataVersion <= logData->version.get() );
ASSERT( newPersistentDataVersion <= logData->queueCommittedVersion.get() );
@ -630,31 +681,53 @@ ACTOR Future<Void> updatePersistentData( TLogData* self, Reference<LogData> logD
updatePersistentPopped( self, logData, tagData );
// Transfer unpopped messages with version numbers less than newPersistentDataVersion to persistentData
state std::deque<std::pair<Version, LengthPrefixedStringRef>>::iterator msg = tagData->versionMessages.begin();
state int refSpilledTagCount = 0;
wr = BinaryWriter( Unversioned() );
// We prefix our spilled locations with a count, so that we can read this back out as a VectorRef.
wr << uint32_t(0);
while(msg != tagData->versionMessages.end() && msg->first <= newPersistentDataVersion) {
currentVersion = msg->first;
anyData = true;
tagData->nothingPersistent = false;
BinaryWriter wr( Unversioned() );
const IDiskQueue::location begin = logData->versionLocation[currentVersion].first;
const IDiskQueue::location end = logData->versionLocation[currentVersion].second;
wr << begin << end;
if (tagData->tag == txsTag) {
// spill txsTag by value
wr = BinaryWriter( Unversioned() );
for(; msg != tagData->versionMessages.end() && msg->first == currentVersion; ++msg) {
wr << msg->second.toStringRef();
}
self->persistentData->set( KeyValueRef( persistTagMessagesKey( logData->logId, tagData->tag, currentVersion ), wr.toStringRef() ) );
} else {
// spill everything else by reference
const IDiskQueue::location begin = logData->versionLocation[currentVersion].first;
const IDiskQueue::location end = logData->versionLocation[currentVersion].second;
ASSERT(end > begin && end.lo - begin.lo < std::numeric_limits<uint32_t>::max());
uint32_t length = static_cast<uint32_t>(end.lo - begin.lo);
refSpilledTagCount++;
uint32_t size = msg->second.expectedSize();
for(; msg != tagData->versionMessages.end() && msg->first == currentVersion; ++msg) {
// Fast forward until we find a new version.
size += msg->second.expectedSize();
}
SpilledData spilledData( currentVersion, begin, length, size );
wr << spilledData;
if (vs && (vs->spilledData.empty() || vs->spilledData.back().version != currentVersion)) {
vs->spilledData.push_back( spilledData );
}
Future<Void> f = yield(TaskUpdateStorage);
if(!f.isReady()) {
wait(f);
msg = std::upper_bound(tagData->versionMessages.begin(), tagData->versionMessages.end(), std::make_pair(currentVersion, LengthPrefixedStringRef()), CompareFirst<std::pair<Version, LengthPrefixedStringRef>>());
}
}
}
if (refSpilledTagCount > 0) {
*(uint32_t*)wr.getData() = refSpilledTagCount;
self->persistentData->set( KeyValueRef( persistTagMessageRefsKey( logData->logId, tagData->tag, currentVersion ), wr.toStringRef() ) );
if (vs && (vs->versions.empty() || vs->versions.back() != currentVersion)) {
vs->versions.push_back( currentVersion );
vs->locations.push_back( std::make_pair( begin, end ) );
}
for(; msg != tagData->versionMessages.end() && msg->first == currentVersion; ++msg) {
// Fast forward until we find a new version.
}
Future<Void> f = yield(TaskUpdateStorage);
if(!f.isReady()) {
wait(f);
msg = std::upper_bound(tagData->versionMessages.begin(), tagData->versionMessages.end(), std::make_pair(currentVersion, LengthPrefixedStringRef()), CompareFirst<std::pair<Version, LengthPrefixedStringRef>>());
}
}
wait(yield(TaskUpdateStorage));
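
The value written above is a uint32_t count, patched in place once refSpilledTagCount is known, followed by one fixed-size SpilledData record per spilled version; per the comment in the loop, that layout is what lets the peek path read it back as a VectorRef<SpilledData>. A standalone sketch of the count-prefixed pattern, using std::string and std::vector as stand-ins for BinaryWriter and VectorRef (SpilledRec and its fields are illustrative, not the flow serializer's exact byte layout):

#include <cassert>
#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

struct SpilledRec { int64_t version; int64_t start; uint32_t length; uint32_t mutationBytes; };

int main() {
	// Writer side: reserve space for the count, append records, patch the count last.
	std::string value( sizeof(uint32_t), '\0' );
	uint32_t count = 0;
	for (int64_t v = 10; v <= 12; ++v) {
		SpilledRec rec{ v, v * 4096, 4096u, 100u };
		value.append( reinterpret_cast<const char*>(&rec), sizeof(rec) );
		++count;
	}
	memcpy( &value[0], &count, sizeof(count) );

	// Reader side: the prefix says how many fixed-size records follow.
	uint32_t n;
	memcpy( &n, value.data(), sizeof(n) );
	std::vector<SpilledRec> recs(n);
	memcpy( recs.data(), value.data() + sizeof(n), n * sizeof(SpilledRec) );
	assert( recs.size() == 3 && recs[1].version == 11 && recs[2].length == 4096 );
	return 0;
}

As in the diff, nothing is written for a tag when no versions were spilled by reference, since refSpilledTagCount stays zero.
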
@ -714,7 +787,11 @@ ACTOR Future<Void> updatePersistentData( TLogData* self, Reference<LogData> logD
for(tagId = 0; tagId < logData->tag_data[tagLocality].size(); tagId++) {
Reference<LogData::TagData> tagData = logData->tag_data[tagLocality][tagId];
if (tagData) {
minVersion = std::min(minVersion, tagData->popped);
if (tagData->tag == txsTag) {
minVersion = std::min(minVersion, newPersistentDataVersion);
} else {
minVersion = std::min(minVersion, tagData->popped);
}
}
}
}
@ -751,7 +828,8 @@ ACTOR Future<Void> updateStorage( TLogData* self ) {
totalSize = 0;
Map<Version, std::pair<int,int>>::iterator sizeItr = logData->version_sizes.begin();
nextVersion = logData->version.get();
while( totalSize < SERVER_KNOBS->UPDATE_STORAGE_BYTE_LIMIT && sizeItr != logData->version_sizes.end() )
while( totalSize < SERVER_KNOBS->REFERENCE_SPILL_UPDATE_STORAGE_BYTE_LIMIT &&
sizeItr != logData->version_sizes.end() )
{
totalSize += sizeItr->value.first + sizeItr->value.second;
++sizeItr;
@ -790,7 +868,8 @@ ACTOR Future<Void> updateStorage( TLogData* self ) {
nextVersion = logData->version.get();
} else {
Map<Version, std::pair<int,int>>::iterator sizeItr = logData->version_sizes.begin();
while( totalSize < SERVER_KNOBS->UPDATE_STORAGE_BYTE_LIMIT && sizeItr != logData->version_sizes.end()
while( totalSize < SERVER_KNOBS->REFERENCE_SPILL_UPDATE_STORAGE_BYTE_LIMIT &&
sizeItr != logData->version_sizes.end()
&& (logData->bytesInput.getValue() - logData->bytesDurable.getValue() - totalSize >= SERVER_KNOBS->TLOG_SPILL_THRESHOLD || sizeItr->value.first == 0) )
{
totalSize += sizeItr->value.first + sizeItr->value.second;
@ -1017,10 +1096,10 @@ void peekMessagesFromMemory( Reference<LogData> self, TLogPeekRequest const& req
}
}
std::vector<StringRef> parseMessagesForTag( StringRef commitBlob, Tag tag, int logRouters ) {
ACTOR Future<std::vector<StringRef>> parseMessagesForTag( StringRef commitBlob, Tag tag, int logRouters ) {
// See the comment in LogSystem.cpp for the binary format of commitBlob.
std::vector<StringRef> relevantMessages;
BinaryReader rd(commitBlob, AssumeVersion(currentProtocolVersion));
state std::vector<StringRef> relevantMessages;
state BinaryReader rd(commitBlob, AssumeVersion(currentProtocolVersion));
while (!rd.empty()) {
uint32_t messageLength = 0;
uint32_t subsequence = 0;
@ -1048,7 +1127,7 @@ std::vector<StringRef> parseMessagesForTag( StringRef commitBlob, Tag tag, int l
if (match) {
relevantMessages.push_back( StringRef((uint8_t*)begin, messageLength) );
}
// FIXME: Yield here so Evan doesn't have to
wait(yield());
}
return relevantMessages;
}
@ -1155,60 +1234,98 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
peekMessagesFromMemory( logData, req, messages2, endVersion );
// FIXME: Limit to approximately DESIRED_TOTAL_BYTES somehow.
state Standalone<VectorRef<KeyValueRef>> kvrefs = wait(
self->persistentData->readRange(KeyRangeRef(
persistTagMessageRefsKey(logData->logId, req.tag, req.begin),
persistTagMessageRefsKey(logData->logId, req.tag, logData->persistentDataDurableVersion + 1))));
if (req.tag == txsTag) {
Standalone<VectorRef<KeyValueRef>> kvs = wait(
self->persistentData->readRange(KeyRangeRef(
persistTagMessagesKey(logData->logId, req.tag, req.begin),
persistTagMessagesKey(logData->logId, req.tag, logData->persistentDataDurableVersion + 1)), SERVER_KNOBS->DESIRED_TOTAL_BYTES, SERVER_KNOBS->DESIRED_TOTAL_BYTES));
//TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress()).detail("Tag1Results", s1).detail("Tag2Results", s2).detail("Tag1ResultsLim", kv1.size()).detail("Tag2ResultsLim", kv2.size()).detail("Tag1ResultsLast", kv1.size() ? printable(kv1[0].key) : "").detail("Tag2ResultsLast", kv2.size() ? printable(kv2[0].key) : "").detail("Limited", limited).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowEpoch", self->epoch()).detail("NowSeq", self->sequence.getNextSequence());
state std::vector<Future<Standalone<StringRef>>> messageReads;
for (auto &kv : kvrefs) {
IDiskQueue::location begin, end;
BinaryReader r(kv.value, Unversioned());
r >> begin >> end;
messageReads.push_back( self->rawPersistentQueue->read(begin, end) );
}
wait( waitForAll( messageReads ) );
ASSERT( messageReads.size() == kvrefs.size() );
Version lastRefMessageVersion = 0;
for (int i = 0; i < messageReads.size(); i++ ) {
Standalone<StringRef> queueEntryData = messageReads[i].get();
uint8_t valid;
const uint32_t length = *(uint32_t*)queueEntryData.begin();
queueEntryData = queueEntryData.substr( 4, queueEntryData.size() - 4);
BinaryReader rd( queueEntryData, IncludeVersion() );
TLogQueueEntry entry;
rd >> entry >> valid;
Version version = decodeTagMessageRefsKey(kvrefs[i].key);
ASSERT( valid == 0x01 );
ASSERT( length + sizeof(valid) == queueEntryData.size() );
messages << int32_t(-1) << version;
// FIXME: push DESIRED_TOTAL_BYTES into parseMessagesForTag
// FIXME: maybe push this copy in as well
std::vector<StringRef> parsedMessages = parseMessagesForTag(entry.messages, req.tag, logData->logRouterTags);
for (StringRef msg : parsedMessages) {
messages << msg;
for (auto &kv : kvs) {
auto ver = decodeTagMessagesKey(kv.key);
messages << int32_t(-1) << ver;
messages.serializeBytes(kv.value);
}
lastRefMessageVersion = version;
if (kvs.expectedSize() >= SERVER_KNOBS->DESIRED_TOTAL_BYTES)
endVersion = decodeTagMessagesKey(kvs.end()[-1].key) + 1;
else
messages.serializeBytes( messages2.toStringRef() );
} else {
// FIXME: Limit to approximately DESIRED_TOTAL_BYTES somehow.
Standalone<VectorRef<KeyValueRef>> kvrefs = wait(
self->persistentData->readRange(KeyRangeRef(
persistTagMessageRefsKey(logData->logId, req.tag, req.begin),
persistTagMessageRefsKey(logData->logId, req.tag, logData->persistentDataDurableVersion + 1))));
if (messages.getLength() >= SERVER_KNOBS->DESIRED_TOTAL_BYTES) {
break;
//TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress()).detail("Tag1Results", s1).detail("Tag2Results", s2).detail("Tag1ResultsLim", kv1.size()).detail("Tag2ResultsLim", kv2.size()).detail("Tag1ResultsLast", kv1.size() ? printable(kv1[0].key) : "").detail("Tag2ResultsLast", kv2.size() ? printable(kv2[0].key) : "").detail("Limited", limited).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowEpoch", self->epoch()).detail("NowSeq", self->sequence.getNextSequence());
state std::vector<std::pair<IDiskQueue::location, IDiskQueue::location>> commitLocations;
state bool earlyEnd = false;
uint32_t mutationBytes = 0;
state uint64_t commitBytes = 0;
for (auto &kv : kvrefs) {
VectorRef<SpilledData> spilledData;
BinaryReader r(kv.value, Unversioned());
r >> spilledData;
for (const SpilledData& sd : spilledData) {
if (mutationBytes >= SERVER_KNOBS->DESIRED_TOTAL_BYTES) {
earlyEnd = true;
break;
}
if (sd.version >= req.begin) {
const IDiskQueue::location end = sd.start.lo + sd.length;
commitLocations.push_back( std::make_pair(sd.start, end) );
// This isn't perfect, because we aren't accounting for page boundaries, but should be
// close enough.
commitBytes += sd.length;
mutationBytes += sd.mutationBytes;
}
}
if (earlyEnd) break;
}
wait( self->peekMemoryLimiter.take(TaskTLogPeekReply, commitBytes) );
state FlowLock::Releaser memoryReservation(self->peekMemoryLimiter, commitBytes);
state std::vector<Future<Standalone<StringRef>>> messageReads;
messageReads.reserve( commitLocations.size() );
for (const auto& pair : commitLocations) {
messageReads.push_back( self->rawPersistentQueue->read(pair.first, pair.second) );
}
commitLocations.clear();
wait( waitForAll( messageReads ) );
state Version lastRefMessageVersion = 0;
state int index = 0;
loop {
if (index >= messageReads.size()) break;
Standalone<StringRef> queueEntryData = messageReads[index].get();
uint8_t valid;
const uint32_t length = *(uint32_t*)queueEntryData.begin();
queueEntryData = queueEntryData.substr( 4, queueEntryData.size() - 4);
BinaryReader rd( queueEntryData, IncludeVersion() );
state TLogQueueEntry entry;
rd >> entry >> valid;
ASSERT( valid == 0x01 );
ASSERT( length + sizeof(valid) == queueEntryData.size() );
messages << int32_t(-1) << entry.version;
std::vector<StringRef> parsedMessages = wait(parseMessagesForTag(entry.messages, req.tag, logData->logRouterTags));
for (StringRef msg : parsedMessages) {
messages << msg;
}
lastRefMessageVersion = entry.version;
index++;
}
messageReads.clear();
memoryReservation.release();
if (earlyEnd)
endVersion = lastRefMessageVersion + 1;
else
messages.serializeBytes( messages2.toStringRef() );
}
messageReads.clear();
kvrefs = Standalone<VectorRef<KeyValueRef>>();
if (messages.getLength() >= SERVER_KNOBS->DESIRED_TOTAL_BYTES)
endVersion = lastRefMessageVersion + 1;
else
messages.serializeBytes( messages2.toStringRef() );
} else {
peekMessagesFromMemory( logData, req, messages, endVersion );
//TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress()).detail("MessageBytes", messages.getLength()).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowSeq", self->sequence.getNextSequence());


@ -236,6 +236,11 @@ struct ConfigureDatabaseWorkload : TestWorkload {
return StringRef(format("DestroyDB%d", dbIndex));
}
static Future<ConfigurationResult::Type> IssueConfigurationChange( Database cx, const std::string& config, bool force ) {
printf("Issuing configuration change: %s\n", config.c_str());
return changeConfig(cx, config, force);
}
ACTOR Future<Void> _setup( Database cx, ConfigureDatabaseWorkload *self ) {
wait(success( changeConfig( cx, "single", true ) ));
return Void();
@ -330,7 +335,7 @@ struct ConfigureDatabaseWorkload : TestWorkload {
if (g_random->random01() < 0.5) config += " proxies=" + format("%d", randomRoleNumber());
if (g_random->random01() < 0.5) config += " resolvers=" + format("%d", randomRoleNumber());
wait(success( changeConfig( cx, config, false ) ));
wait(success( IssueConfigurationChange( cx, config, false ) ));
//TraceEvent("ConfigureTestConfigureEnd").detail("NewConfig", newConfig);
}
@ -343,11 +348,11 @@ struct ConfigureDatabaseWorkload : TestWorkload {
//TraceEvent("ConfigureTestConfigureEnd").detail("NewQuorum", s);
}
else if ( randomChoice == 5) {
wait(success( changeConfig( cx, storeTypes[g_random->randomInt( 0, sizeof(storeTypes)/sizeof(storeTypes[0]))], true ) ));
wait(success( IssueConfigurationChange( cx, storeTypes[g_random->randomInt( 0, sizeof(storeTypes)/sizeof(storeTypes[0]))], true ) ));
}
else if ( randomChoice == 6 ) {
// Some configurations will be invalid, and that's fine.
wait(success( changeConfig( cx, logTypes[g_random->randomInt( 0, sizeof(logTypes)/sizeof(logTypes[0]))], false ) ));
wait(success( IssueConfigurationChange( cx, logTypes[g_random->randomInt( 0, sizeof(logTypes)/sizeof(logTypes[0]))], false ) ));
}
else {
ASSERT(false);


@ -24,6 +24,8 @@
#include <cinttypes>
#include "flow/IRandom.h"
#include "flow/Error.h"
#include "flow/Trace.h"
#include <random>
@ -90,6 +92,14 @@ public:
uint32_t randomUInt32() { return gen64(); }
uint32_t randomSkewedUInt32(uint32_t min, uint32_t maxPlusOne) {
std::uniform_real_distribution<double> distribution( std::log(min), std::log(maxPlusOne-1) );
double logpower = distribution(random);
uint32_t loguniform = static_cast<uint32_t>( std::pow( 10, logpower ) );
// doubles can be imprecise, so let's make sure we don't violate an edge case.
return std::max(std::min(loguniform, maxPlusOne-1), min);
}
UID randomUniqueID() {
uint64_t x,y;
x = gen64();
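
randomSkewedUInt32 draws values roughly uniformly in log space, so the BUGGIFY'd DiskQueue sizes above land at small values far more often than large ones. A standalone sketch of log-uniform sampling (it inverts std::log with std::exp; it is a sketch of the idea, not a byte-for-byte copy of the helper above, which combines natural-log bounds with std::pow(10, ·) before clamping):

#include <algorithm>
#include <cmath>
#include <cstdint>
#include <cstdio>
#include <random>

// Log-uniform sample in [min, maxPlusOne): uniform in log space, then exponentiated.
uint32_t skewedSample( std::mt19937_64& rng, uint32_t min, uint32_t maxPlusOne ) {
	std::uniform_real_distribution<double> dist( std::log(double(min)), std::log(double(maxPlusOne - 1)) );
	uint32_t r = static_cast<uint32_t>( std::exp( dist(rng) ) );
	return std::max( std::min( r, maxPlusOne - 1 ), min );   // guard against double imprecision
}

int main() {
	std::mt19937_64 rng(0);
	// Mirrors the BUGGIFY range used for fileExtensionBytes above: 1 .. 40*1024.
	for (int i = 0; i < 8; i++) printf( "%u\n", skewedSample( rng, 1, 40u << 10 ) );
	return 0;
}
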


@ -76,6 +76,7 @@ public:
virtual UID randomUniqueID() = 0;
virtual char randomAlphaNumeric() = 0;
virtual std::string randomAlphaNumeric( int length ) = 0;
virtual uint32_t randomSkewedUInt32(uint32_t min, uint32_t maxPlusOne) = 0;
virtual uint64_t peek() const = 0; // returns something that is probably different for different random states. Deterministic (and idempotent) for a deterministic generator.
// The following functions have fixed implementations for now:


@ -1198,11 +1198,11 @@ struct FlowLock : NonCopyable, public ReferenceCounted<FlowLock> {
FlowLock* lock;
int remaining;
Releaser() : lock(0), remaining(0) {}
Releaser( FlowLock& lock, int amount = 1 ) : lock(&lock), remaining(amount) {}
Releaser( FlowLock& lock, int64_t amount = 1 ) : lock(&lock), remaining(amount) {}
Releaser(Releaser&& r) noexcept(true) : lock(r.lock), remaining(r.remaining) { r.remaining = 0; }
void operator=(Releaser&& r) { if (remaining) lock->release(remaining); lock = r.lock; remaining = r.remaining; r.remaining = 0; }
void release( int amount = -1 ) {
void release( int64_t amount = -1 ) {
if( amount == -1 || amount > remaining )
amount = remaining;
@ -1215,23 +1215,23 @@ struct FlowLock : NonCopyable, public ReferenceCounted<FlowLock> {
};
FlowLock() : permits(1), active(0) {}
explicit FlowLock(int permits) : permits(permits), active(0) {}
explicit FlowLock(int64_t permits) : permits(permits), active(0) {}
Future<Void> take(int taskID = TaskDefaultYield, int amount = 1) {
ASSERT(amount <= permits);
if (active + amount <= permits) {
Future<Void> take(int taskID = TaskDefaultYield, int64_t amount = 1) {
ASSERT(amount <= permits || active == 0);
if (active + amount <= permits || active == 0) {
active += amount;
return safeYieldActor(this, taskID, amount);
}
return takeActor(this, taskID, amount);
}
void release( int amount = 1 ) {
ASSERT( active > 0 || amount == 0 );
void release( int64_t amount = 1 ) {
ASSERT( (active > 0 || amount == 0) && active - amount >= 0 );
active -= amount;
while( !takers.empty() ) {
if( active + takers.begin()->second <= permits ) {
std::pair< Promise<Void>, int > next = std::move( *takers.begin() );
if( active + takers.begin()->second <= permits || active == 0 ) {
std::pair< Promise<Void>, int64_t > next = std::move( *takers.begin() );
active += next.second;
takers.pop_front();
next.first.send(Void());
@ -1244,21 +1244,21 @@ struct FlowLock : NonCopyable, public ReferenceCounted<FlowLock> {
Future<Void> releaseWhen( Future<Void> const& signal, int amount = 1 ) { return releaseWhenActor( this, signal, amount ); }
// returns when any permits are available, having taken as many as possible up to the given amount, and modifies amount to the number of permits taken
Future<Void> takeUpTo(int& amount) {
Future<Void> takeUpTo(int64_t& amount) {
return takeMoreActor(this, &amount);
}
int available() const { return permits - active; }
int activePermits() const { return active; }
int64_t available() const { return permits - active; }
int64_t activePermits() const { return active; }
int waiters() const { return takers.size(); }
private:
std::list< std::pair< Promise<Void>, int > > takers;
const int permits;
int active;
std::list< std::pair< Promise<Void>, int64_t > > takers;
const int64_t permits;
int64_t active;
Promise<Void> broken_on_destruct;
ACTOR static Future<Void> takeActor(FlowLock* lock, int taskID, int amount) {
state std::list<std::pair<Promise<Void>, int>>::iterator it = lock->takers.insert(lock->takers.end(), std::make_pair(Promise<Void>(), amount));
ACTOR static Future<Void> takeActor(FlowLock* lock, int taskID, int64_t amount) {
state std::list<std::pair<Promise<Void>, int64_t>>::iterator it = lock->takers.insert(lock->takers.end(), std::make_pair(Promise<Void>(), amount));
try {
wait( it->first.getFuture() );
@ -1281,15 +1281,15 @@ private:
}
}
ACTOR static Future<Void> takeMoreActor(FlowLock* lock, int* amount) {
ACTOR static Future<Void> takeMoreActor(FlowLock* lock, int64_t* amount) {
wait(lock->take());
int extra = std::min( lock->available(), *amount-1 );
int64_t extra = std::min( lock->available(), *amount-1 );
lock->active += extra;
*amount = 1 + extra;
return Void();
}
ACTOR static Future<Void> safeYieldActor(FlowLock* lock, int taskID, int amount) {
ACTOR static Future<Void> safeYieldActor(FlowLock* lock, int taskID, int64_t amount) {
try {
choose{
when(wait(yield(taskID))) {}
@ -1302,7 +1302,7 @@ private:
}
}
ACTOR static Future<Void> releaseWhenActor( FlowLock* self, Future<Void> signal, int amount ) {
ACTOR static Future<Void> releaseWhenActor( FlowLock* self, Future<Void> signal, int64_t amount ) {
wait(signal);
self->release(amount);
return Void();
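
The widening to int64_t, together with the relaxed take/release conditions that let a single oversized request through when the lock is idle, is what allows a FlowLock to act as a byte budget the way the TLog's peekMemoryLimiter does above with multi-gigabyte permit counts. A simplified single-threaded model of that usage (plain C++, not flow; the real FlowLock also queues waiters and integrates with futures):

#include <cassert>
#include <cstdint>

struct ByteBudget {                       // simplified stand-in for FlowLock
	int64_t permits;
	int64_t active = 0;
	explicit ByteBudget( int64_t permits ) : permits(permits) {}
	// Mirrors the relaxed take() above: an oversized request proceeds if the lock is idle.
	bool tryTake( int64_t amount ) {
		if (active + amount <= permits || active == 0) { active += amount; return true; }
		return false;
	}
	void release( int64_t amount ) { active -= amount; assert( active >= 0 ); }
};

struct Releaser {                         // RAII stand-in for FlowLock::Releaser
	ByteBudget* lock;
	int64_t remaining;
	Releaser( ByteBudget& lock, int64_t amount ) : lock(&lock), remaining(amount) {}
	~Releaser() { lock->release(remaining); }
};

int main() {
	ByteBudget peekMemory( int64_t(2e9) );         // cf. TLOG_SPILL_REFERENCE_MAX_PEEK_MEMORY_BYTES
	assert( peekMemory.tryTake( int64_t(3e9) ) );  // larger than the budget, but the lock is idle
	{ Releaser r( peekMemory, int64_t(3e9) ); }    // permits returned when the peek finishes
	assert( peekMemory.active == 0 );
	return 0;
}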