Merge pull request #1116 from alexmiller-apple/tstlog

Random cleanups that prepare for Spill-By-Reference TLog
This commit is contained in:
Evan Tschannen 2019-02-05 18:09:06 -08:00 committed by GitHub
commit 486e0e13c3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 68 additions and 64 deletions

View File

@ -606,13 +606,13 @@ public:
state Optional<Version> metaUnreliableEnd;
std::vector<Future<Void>> metaReads;
metaReads.push_back(store(bc->expiredEndVersion().get(), metaExpiredEnd));
metaReads.push_back(store(bc->unreliableEndVersion().get(), metaUnreliableEnd));
metaReads.push_back(store(metaExpiredEnd, bc->expiredEndVersion().get()));
metaReads.push_back(store(metaUnreliableEnd, bc->unreliableEndVersion().get()));
// Only read log begin/end versions if not doing a deep scan, otherwise scan files and recalculate them.
if(!deepScan) {
metaReads.push_back(store(bc->logBeginVersion().get(), metaLogBegin));
metaReads.push_back(store(bc->logEndVersion().get(), metaLogEnd));
metaReads.push_back(store(metaLogBegin, bc->logBeginVersion().get()));
metaReads.push_back(store(metaLogEnd, bc->logEndVersion().get()));
}
wait(waitForAll(metaReads));
@ -682,7 +682,7 @@ public:
}
state std::vector<LogFile> logs;
wait(store(bc->listLogFiles(scanBegin, scanEnd), logs) && store(bc->listKeyspaceSnapshots(), desc.snapshots));
wait(store(logs, bc->listLogFiles(scanBegin, scanEnd)) && store(desc.snapshots, bc->listKeyspaceSnapshots()));
// List logs in version order so log continuity can be analyzed
std::sort(logs.begin(), logs.end());
@ -842,7 +842,7 @@ public:
progress->step = "Listing files";
}
// Get log files or range files that contain any data at or before expireEndVersion
wait(store(bc->listLogFiles(scanBegin, expireEndVersion - 1), logs) && store(bc->listRangeFiles(scanBegin, expireEndVersion - 1), ranges));
wait(store(logs, bc->listLogFiles(scanBegin, expireEndVersion - 1)) && store(ranges, bc->listRangeFiles(scanBegin, expireEndVersion - 1)));
// The new logBeginVersion will be taken from the last log file, if there is one
state Optional<Version> newLogBeginVersion;
@ -1575,7 +1575,7 @@ ACTOR Future<Version> timeKeeperVersionFromDatetime(std::string datetime, Databa
if (results.size() != 1) {
// No key less than time was found in the database
// Look for a key >= time.
wait( store( versionMap.getRange(tr, time, std::numeric_limits<int64_t>::max(), 1), results) );
wait( store( results, versionMap.getRange(tr, time, std::numeric_limits<int64_t>::max(), 1) ) );
if(results.size() != 1) {
fprintf(stderr, "ERROR: Unable to calculate a version for given date/time.\n");
@ -1615,7 +1615,7 @@ ACTOR Future<Optional<int64_t>> timeKeeperEpochsFromVersion(Version v, Reference
if(mid == min) {
// There aren't any records having a version < v, so just look for any record having a time < now
// and base a result on it
wait(store(versionMap.getRange(tr, 0, (int64_t)now(), 1), results));
wait(store(results, versionMap.getRange(tr, 0, (int64_t)now(), 1)));
if (results.size() != 1) {
// There aren't any timekeeper records to base a result on so return nothing

View File

@ -1144,8 +1144,8 @@ namespace fileBackup {
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
wait(taskBucket->keepRunning(tr, task)
&& storeOrThrow(backup.snapshotBeginVersion().get(tr), snapshotBeginVersion)
&& store(backup.snapshotRangeFileCount().getD(tr), snapshotRangeFileCount)
&& storeOrThrow(snapshotBeginVersion, backup.snapshotBeginVersion().get(tr))
&& store(snapshotRangeFileCount, backup.snapshotRangeFileCount().getD(tr))
);
break;
@ -1324,15 +1324,15 @@ namespace fileBackup {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
wait( store(config.snapshotBeginVersion().getOrThrow(tr), snapshotBeginVersion)
&& store(config.snapshotTargetEndVersion().getOrThrow(tr), snapshotTargetEndVersion)
&& store(config.backupRanges().getOrThrow(tr), backupRanges)
&& store(config.snapshotIntervalSeconds().getOrThrow(tr), snapshotIntervalSeconds)
wait( store(snapshotBeginVersion, config.snapshotBeginVersion().getOrThrow(tr))
&& store(snapshotTargetEndVersion, config.snapshotTargetEndVersion().getOrThrow(tr))
&& store(backupRanges, config.backupRanges().getOrThrow(tr))
&& store(snapshotIntervalSeconds, config.snapshotIntervalSeconds().getOrThrow(tr))
// The next two parameters are optional
&& store(config.snapshotBatchFuture().get(tr), snapshotBatchFutureKey)
&& store(config.snapshotBatchSize().get(tr), snapshotBatchSize)
&& store(config.latestSnapshotEndVersion().get(tr), latestSnapshotEndVersion)
&& store(tr->getReadVersion(), recentReadVersion)
&& store(snapshotBatchFutureKey, config.snapshotBatchFuture().get(tr))
&& store(snapshotBatchSize, config.snapshotBatchSize().get(tr))
&& store(latestSnapshotEndVersion, config.latestSnapshotEndVersion().get(tr))
&& store(recentReadVersion, tr->getReadVersion())
&& taskBucket->keepRunning(tr, task));
// If the snapshot batch future key does not exist, create it, set it, and commit
@ -1375,7 +1375,7 @@ namespace fileBackup {
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
state Future<std::vector<std::pair<Key, bool>>> bounds = config.snapshotRangeDispatchMap().getRange(tr, beginKey, keyAfter(normalKeys.end), CLIENT_KNOBS->TOO_MANY);
wait(success(bounds) && taskBucket->keepRunning(tr, task) && store(tr->getReadVersion(), recentReadVersion));
wait(success(bounds) && taskBucket->keepRunning(tr, task) && store(recentReadVersion, tr->getReadVersion()));
if(bounds.get().empty())
break;
@ -1579,7 +1579,7 @@ namespace fileBackup {
endReads.push_back( config.snapshotRangeDispatchMap().get(tr, range.end));
}
wait(store(config.snapshotBatchSize().getOrThrow(tr), snapshotBatchSize.get())
wait(store(snapshotBatchSize.get(), config.snapshotBatchSize().getOrThrow(tr))
&& waitForAll(beginReads) && waitForAll(endReads) && taskBucket->keepRunning(tr, task));
// Snapshot batch size should be either oldBatchSize or newBatchSize. If new, this transaction is already done.
@ -1683,8 +1683,8 @@ namespace fileBackup {
state Key snapshotBatchFutureKey;
state Key snapshotBatchDispatchDoneKey;
wait( store(config.snapshotBatchFuture().getOrThrow(tr), snapshotBatchFutureKey)
&& store(config.snapshotBatchDispatchDoneKey().getOrThrow(tr), snapshotBatchDispatchDoneKey));
wait( store(snapshotBatchFutureKey, config.snapshotBatchFuture().getOrThrow(tr))
&& store(snapshotBatchDispatchDoneKey, config.snapshotBatchDispatchDoneKey().getOrThrow(tr)));
state Reference<TaskFuture> snapshotBatchFuture = futureBucket->unpack(snapshotBatchFutureKey);
state Reference<TaskFuture> snapshotBatchDispatchDoneFuture = futureBucket->unpack(snapshotBatchDispatchDoneKey);
@ -2010,11 +2010,11 @@ namespace fileBackup {
state Optional<std::string> tag;
state Optional<Version> latestSnapshotEndVersion;
wait(store(config.stopWhenDone().getOrThrow(tr), stopWhenDone)
&& store(config.getLatestRestorableVersion(tr), restorableVersion)
&& store(config.stateEnum().getOrThrow(tr), backupState)
&& store(config.tag().get(tr), tag)
&& store(config.latestSnapshotEndVersion().get(tr), latestSnapshotEndVersion));
wait(store(stopWhenDone, config.stopWhenDone().getOrThrow(tr))
&& store(restorableVersion, config.getLatestRestorableVersion(tr))
&& store(backupState, config.stateEnum().getOrThrow(tr))
&& store(tag, config.tag().get(tr))
&& store(latestSnapshotEndVersion, config.latestSnapshotEndVersion().get(tr)));
// If restorable, update the last restorable version for this tag
if(restorableVersion.present() && tag.present()) {
@ -2161,7 +2161,7 @@ namespace fileBackup {
if(!bc) {
// Backup container must be present if we're still here
wait(store(config.backupContainer().getOrThrow(tr), bc));
wait(store(bc, config.backupContainer().getOrThrow(tr)));
}
BackupConfig::RangeFileMapT::PairsType rangeresults = wait(config.snapshotRangeFileMap().getRange(tr, startKey, {}, batchSize));
@ -2242,11 +2242,11 @@ namespace fileBackup {
state Optional<Version> firstSnapshotEndVersion;
state Optional<std::string> tag;
wait(store(config.stopWhenDone().getOrThrow(tr), stopWhenDone)
&& store(config.stateEnum().getOrThrow(tr), backupState)
&& store(config.getLatestRestorableVersion(tr), restorableVersion)
&& store(config.firstSnapshotEndVersion().get(tr), firstSnapshotEndVersion)
&& store(config.tag().get(tr), tag));
wait(store(stopWhenDone, config.stopWhenDone().getOrThrow(tr))
&& store(backupState, config.stateEnum().getOrThrow(tr))
&& store(restorableVersion, config.getLatestRestorableVersion(tr))
&& store(firstSnapshotEndVersion, config.firstSnapshotEndVersion().get(tr))
&& store(tag, config.tag().get(tr)));
// If restorable, update the last restorable version for this tag
if(restorableVersion.present() && tag.present()) {
@ -2828,7 +2828,7 @@ namespace fileBackup {
state bool addingToExistingBatch = remainingInBatch > 0;
state Version restoreVersion;
wait(store(restore.restoreVersion().getOrThrow(tr), restoreVersion)
wait(store(restoreVersion, restore.restoreVersion().getOrThrow(tr))
&& checkTaskVersion(tr->getDatabase(), task, name, version));
// If not adding to an existing batch then update the apply mutations end version so the mutations from the
@ -3750,9 +3750,9 @@ public:
state Optional<Version> latestRestorableVersion;
state Version recentReadVersion;
wait( store(config.getLatestRestorableVersion(tr), latestRestorableVersion)
&& store(config.backupContainer().getOrThrow(tr), bc)
&& store(tr->getReadVersion(), recentReadVersion)
wait( store(latestRestorableVersion, config.getLatestRestorableVersion(tr))
&& store(bc, config.backupContainer().getOrThrow(tr))
&& store(recentReadVersion, tr->getReadVersion())
);
bool snapshotProgress = false;
@ -3791,20 +3791,20 @@ public:
state Optional<int64_t> snapshotTargetEndVersionTimestamp;
state bool stopWhenDone;
wait( store(config.snapshotBeginVersion().getOrThrow(tr), snapshotBeginVersion)
&& store(config.snapshotTargetEndVersion().getOrThrow(tr), snapshotTargetEndVersion)
&& store(config.snapshotIntervalSeconds().getOrThrow(tr), snapshotInterval)
&& store(config.logBytesWritten().get(tr), logBytesWritten)
&& store(config.rangeBytesWritten().get(tr), rangeBytesWritten)
&& store(config.latestLogEndVersion().get(tr), latestLogEndVersion)
&& store(config.latestSnapshotEndVersion().get(tr), latestSnapshotEndVersion)
&& store(config.stopWhenDone().getOrThrow(tr), stopWhenDone)
wait( store(snapshotBeginVersion, config.snapshotBeginVersion().getOrThrow(tr))
&& store(snapshotTargetEndVersion, config.snapshotTargetEndVersion().getOrThrow(tr))
&& store(snapshotInterval, config.snapshotIntervalSeconds().getOrThrow(tr))
&& store(logBytesWritten, config.logBytesWritten().get(tr))
&& store(rangeBytesWritten, config.rangeBytesWritten().get(tr))
&& store(latestLogEndVersion, config.latestLogEndVersion().get(tr))
&& store(latestSnapshotEndVersion, config.latestSnapshotEndVersion().get(tr))
&& store(stopWhenDone, config.stopWhenDone().getOrThrow(tr))
);
wait( store(getTimestampFromVersion(latestSnapshotEndVersion, tr), latestSnapshotEndVersionTimestamp)
&& store(getTimestampFromVersion(latestLogEndVersion, tr), latestLogEndVersionTimestamp)
&& store(timeKeeperEpochsFromVersion(snapshotBeginVersion, tr), snapshotBeginVersionTimestamp)
&& store(timeKeeperEpochsFromVersion(snapshotTargetEndVersion, tr), snapshotTargetEndVersionTimestamp)
wait( store(latestSnapshotEndVersionTimestamp, getTimestampFromVersion(latestSnapshotEndVersion, tr))
&& store(latestLogEndVersionTimestamp, getTimestampFromVersion(latestLogEndVersion, tr))
&& store(snapshotBeginVersionTimestamp, timeKeeperEpochsFromVersion(snapshotBeginVersion, tr))
&& store(snapshotTargetEndVersionTimestamp, timeKeeperEpochsFromVersion(snapshotTargetEndVersion, tr))
);
statusText += format("Snapshot interval is %lld seconds. ", snapshotInterval);

View File

@ -32,6 +32,7 @@
#include "fdbrpc/Replication.h"
#include "fdbrpc/ReplicationUtils.h"
#include "fdbrpc/AsyncFileWriteChecker.h"
#include "flow/actorcompiler.h" // This must be the last #include.
bool simulator_should_inject_fault( const char* context, const char* file, int line, int error_code ) {
if (!g_network->isSimulated()) return false;
@ -521,6 +522,8 @@ private:
}
ACTOR static Future<int> read_impl( SimpleFile* self, void* data, int length, int64_t offset ) {
ASSERT( ( self->flags & IAsyncFile::OPEN_NO_AIO ) != 0 ||
( (uintptr_t)data % 4096 == 0 && length % 4096 == 0 && offset % 4096 == 0 ) ); // Required by KAIO.
state UID opId = g_random->randomUniqueID();
if (randLog)
fprintf( randLog, "SFR1 %s %s %s %d %lld\n", self->dbgId.shortString().c_str(), self->filename.c_str(), opId.shortString().c_str(), length, offset );

View File

@ -704,9 +704,9 @@ public:
ASSERT( recovered );
if (!pushedPageCount()) {
if (!anyPopped) return Void();
anyPopped = false;
addEmptyPage();
}
anyPopped = false;
backPage().popped = poppedSeq;
backPage().zeroPad();
backPage().updateHash();
@ -732,6 +732,7 @@ public:
pushed_page_buffer = 0;
return f;
}
void stall() {
rawQueue->stall();
}
@ -827,15 +828,15 @@ private:
}
}
void readFromBuffer( StringBuffer& result, int& bytes ) {
void readFromBuffer( StringBuffer* result, int* bytes ) {
// extract up to bytes from readBufPage into result
int len = std::min( readBufPage->payloadSize - readBufPos, bytes );
int len = std::min( readBufPage->payloadSize - readBufPos, *bytes );
if (len<=0) return;
result.append( StringRef(readBufPage->payload+readBufPos, len) );
result->append( StringRef(readBufPage->payload+readBufPos, len) );
readBufPos += len;
bytes -= len;
*bytes -= len;
nextReadLocation += len;
}
@ -865,7 +866,7 @@ private:
loop {
if (self->readBufPage) {
self->readFromBuffer( result, bytes );
self->readFromBuffer( &result, &bytes );
// if done, return
if (!bytes) return result.str;
ASSERT( self->readBufPos == self->readBufPage->payloadSize );
@ -898,7 +899,7 @@ private:
// The fully durable popped point is self->lastPoppedSeq; tell the raw queue that.
int f; int64_t p;
TEST( self->lastPoppedSeq/sizeof(Page) != self->poppedSeq/sizeof(Page) ); // DiskQueue: Recovery popped position not fully durable
self->findPhysicalLocation( self->lastPoppedSeq, f, p, "lastPoppedSeq" );
self->findPhysicalLocation( self->lastPoppedSeq, &f, &p, "lastPoppedSeq" );
wait(self->rawQueue->setPoppedPage( f, p, self->lastPoppedSeq/sizeof(Page)*sizeof(Page) ));
// Writes go at the end of our reads (but on the next page)
@ -947,13 +948,13 @@ private:
*/
int file; int64_t page;
self->findPhysicalLocation( self->poppedSeq, file, page, "poppedSeq" );
self->findPhysicalLocation( self->poppedSeq, &file, &page, "poppedSeq" );
self->rawQueue->setStartPage( file, page );
return true;
}
void findPhysicalLocation( loc_t loc, int& file, int64_t& page, const char* context ) {
void findPhysicalLocation( loc_t loc, int* file, int64_t* page, const char* context ) {
bool ok = false;
Page*p = (Page*)recoveryFirstPages.begin();
@ -969,11 +970,11 @@ private:
for(int i=recoveryFirstPages.size() / sizeof(Page) - 2; i>=0; i--)
if ( p[i].checkHash() && p[i].seq <= (size_t)loc ) {
file = i;
page = (loc - p[i].seq)/sizeof(Page);
*file = i;
*page = (loc - p[i].seq)/sizeof(Page);
TraceEvent("FoundPhysicalLocation", dbgid)
.detail("PageIndex", i)
.detail("PageLocation", page)
.detail("PageLocation", *page)
.detail("RecoveryFirstPagesSize", recoveryFirstPages.size())
.detail("SizeofPage", sizeof(Page))
.detail("PageSequence", p[i].seq)
@ -1007,7 +1008,7 @@ private:
RawDiskQueue_TwoFiles *rawQueue;
UID dbgid;
bool anyPopped; // pop() has been called since the most recent commit()
bool anyPopped; // pop() has been called since the most recent call to commit()
bool warnAlwaysForMemory;
loc_t nextPageSeq, poppedSeq;
loc_t lastPoppedSeq; // poppedSeq the last time commit was called

View File

@ -294,12 +294,12 @@ Future<Void> holdWhileVoid(X object, Future<T> what)
}
template<class T>
Future<Void> store(Future<T> what, T &out) {
Future<Void> store(T &out, Future<T> what) {
return map(what, [&out](T const &v) { out = v; return Void(); });
}
template<class T>
Future<Void> storeOrThrow(Future<Optional<T>> what, T &out, Error e = key_not_found()) {
Future<Void> storeOrThrow(T &out, Future<Optional<T>> what, Error e = key_not_found()) {
return map(what, [&out,e](Optional<T> const &o) {
if(!o.present())
throw e;