Change store from (Future, T&) to (T&, Future).
LHS = RHS, and the name of what's being modified is easier to find.
This commit is contained in:
parent
22a08b2b4e
commit
0750dc0418
|
@ -606,13 +606,13 @@ public:
|
|||
state Optional<Version> metaUnreliableEnd;
|
||||
|
||||
std::vector<Future<Void>> metaReads;
|
||||
metaReads.push_back(store(bc->expiredEndVersion().get(), metaExpiredEnd));
|
||||
metaReads.push_back(store(bc->unreliableEndVersion().get(), metaUnreliableEnd));
|
||||
metaReads.push_back(store(metaExpiredEnd, bc->expiredEndVersion().get()));
|
||||
metaReads.push_back(store(metaUnreliableEnd, bc->unreliableEndVersion().get()));
|
||||
|
||||
// Only read log begin/end versions if not doing a deep scan, otherwise scan files and recalculate them.
|
||||
if(!deepScan) {
|
||||
metaReads.push_back(store(bc->logBeginVersion().get(), metaLogBegin));
|
||||
metaReads.push_back(store(bc->logEndVersion().get(), metaLogEnd));
|
||||
metaReads.push_back(store(metaLogBegin, bc->logBeginVersion().get()));
|
||||
metaReads.push_back(store(metaLogEnd, bc->logEndVersion().get()));
|
||||
}
|
||||
|
||||
wait(waitForAll(metaReads));
|
||||
|
@ -682,7 +682,7 @@ public:
|
|||
}
|
||||
|
||||
state std::vector<LogFile> logs;
|
||||
wait(store(bc->listLogFiles(scanBegin, scanEnd), logs) && store(bc->listKeyspaceSnapshots(), desc.snapshots));
|
||||
wait(store(logs, bc->listLogFiles(scanBegin, scanEnd)) && store(desc.snapshots, bc->listKeyspaceSnapshots()));
|
||||
|
||||
// List logs in version order so log continuity can be analyzed
|
||||
std::sort(logs.begin(), logs.end());
|
||||
|
@ -842,7 +842,7 @@ public:
|
|||
progress->step = "Listing files";
|
||||
}
|
||||
// Get log files or range files that contain any data at or before expireEndVersion
|
||||
wait(store(bc->listLogFiles(scanBegin, expireEndVersion - 1), logs) && store(bc->listRangeFiles(scanBegin, expireEndVersion - 1), ranges));
|
||||
wait(store(logs, bc->listLogFiles(scanBegin, expireEndVersion - 1)) && store(ranges, bc->listRangeFiles(scanBegin, expireEndVersion - 1)));
|
||||
|
||||
// The new logBeginVersion will be taken from the last log file, if there is one
|
||||
state Optional<Version> newLogBeginVersion;
|
||||
|
@ -1575,7 +1575,7 @@ ACTOR Future<Version> timeKeeperVersionFromDatetime(std::string datetime, Databa
|
|||
if (results.size() != 1) {
|
||||
// No key less than time was found in the database
|
||||
// Look for a key >= time.
|
||||
wait( store( versionMap.getRange(tr, time, std::numeric_limits<int64_t>::max(), 1), results) );
|
||||
wait( store( results, versionMap.getRange(tr, time, std::numeric_limits<int64_t>::max(), 1) ) );
|
||||
|
||||
if(results.size() != 1) {
|
||||
fprintf(stderr, "ERROR: Unable to calculate a version for given date/time.\n");
|
||||
|
@ -1615,7 +1615,7 @@ ACTOR Future<Optional<int64_t>> timeKeeperEpochsFromVersion(Version v, Reference
|
|||
if(mid == min) {
|
||||
// There aren't any records having a version < v, so just look for any record having a time < now
|
||||
// and base a result on it
|
||||
wait(store(versionMap.getRange(tr, 0, (int64_t)now(), 1), results));
|
||||
wait(store(results, versionMap.getRange(tr, 0, (int64_t)now(), 1)));
|
||||
|
||||
if (results.size() != 1) {
|
||||
// There aren't any timekeeper records to base a result on so return nothing
|
||||
|
|
|
@ -1144,8 +1144,8 @@ namespace fileBackup {
|
|||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
|
||||
wait(taskBucket->keepRunning(tr, task)
|
||||
&& storeOrThrow(backup.snapshotBeginVersion().get(tr), snapshotBeginVersion)
|
||||
&& store(backup.snapshotRangeFileCount().getD(tr), snapshotRangeFileCount)
|
||||
&& storeOrThrow(snapshotBeginVersion, backup.snapshotBeginVersion().get(tr))
|
||||
&& store(snapshotRangeFileCount, backup.snapshotRangeFileCount().getD(tr))
|
||||
);
|
||||
|
||||
break;
|
||||
|
@ -1324,15 +1324,15 @@ namespace fileBackup {
|
|||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
|
||||
wait( store(config.snapshotBeginVersion().getOrThrow(tr), snapshotBeginVersion)
|
||||
&& store(config.snapshotTargetEndVersion().getOrThrow(tr), snapshotTargetEndVersion)
|
||||
&& store(config.backupRanges().getOrThrow(tr), backupRanges)
|
||||
&& store(config.snapshotIntervalSeconds().getOrThrow(tr), snapshotIntervalSeconds)
|
||||
wait( store(snapshotBeginVersion, config.snapshotBeginVersion().getOrThrow(tr))
|
||||
&& store(snapshotTargetEndVersion, config.snapshotTargetEndVersion().getOrThrow(tr))
|
||||
&& store(backupRanges, config.backupRanges().getOrThrow(tr))
|
||||
&& store(snapshotIntervalSeconds, config.snapshotIntervalSeconds().getOrThrow(tr))
|
||||
// The next two parameters are optional
|
||||
&& store(config.snapshotBatchFuture().get(tr), snapshotBatchFutureKey)
|
||||
&& store(config.snapshotBatchSize().get(tr), snapshotBatchSize)
|
||||
&& store(config.latestSnapshotEndVersion().get(tr), latestSnapshotEndVersion)
|
||||
&& store(tr->getReadVersion(), recentReadVersion)
|
||||
&& store(snapshotBatchFutureKey, config.snapshotBatchFuture().get(tr))
|
||||
&& store(snapshotBatchSize, config.snapshotBatchSize().get(tr))
|
||||
&& store(latestSnapshotEndVersion, config.latestSnapshotEndVersion().get(tr))
|
||||
&& store(recentReadVersion, tr->getReadVersion())
|
||||
&& taskBucket->keepRunning(tr, task));
|
||||
|
||||
// If the snapshot batch future key does not exist, create it, set it, and commit
|
||||
|
@ -1375,7 +1375,7 @@ namespace fileBackup {
|
|||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
|
||||
state Future<std::vector<std::pair<Key, bool>>> bounds = config.snapshotRangeDispatchMap().getRange(tr, beginKey, keyAfter(normalKeys.end), CLIENT_KNOBS->TOO_MANY);
|
||||
wait(success(bounds) && taskBucket->keepRunning(tr, task) && store(tr->getReadVersion(), recentReadVersion));
|
||||
wait(success(bounds) && taskBucket->keepRunning(tr, task) && store(recentReadVersion, tr->getReadVersion()));
|
||||
|
||||
if(bounds.get().empty())
|
||||
break;
|
||||
|
@ -1579,7 +1579,7 @@ namespace fileBackup {
|
|||
endReads.push_back( config.snapshotRangeDispatchMap().get(tr, range.end));
|
||||
}
|
||||
|
||||
wait(store(config.snapshotBatchSize().getOrThrow(tr), snapshotBatchSize.get())
|
||||
wait(store(snapshotBatchSize.get(), config.snapshotBatchSize().getOrThrow(tr))
|
||||
&& waitForAll(beginReads) && waitForAll(endReads) && taskBucket->keepRunning(tr, task));
|
||||
|
||||
// Snapshot batch size should be either oldBatchSize or newBatchSize. If new, this transaction is already done.
|
||||
|
@ -1683,8 +1683,8 @@ namespace fileBackup {
|
|||
state Key snapshotBatchFutureKey;
|
||||
state Key snapshotBatchDispatchDoneKey;
|
||||
|
||||
wait( store(config.snapshotBatchFuture().getOrThrow(tr), snapshotBatchFutureKey)
|
||||
&& store(config.snapshotBatchDispatchDoneKey().getOrThrow(tr), snapshotBatchDispatchDoneKey));
|
||||
wait( store(snapshotBatchFutureKey, config.snapshotBatchFuture().getOrThrow(tr))
|
||||
&& store(snapshotBatchDispatchDoneKey, config.snapshotBatchDispatchDoneKey().getOrThrow(tr)));
|
||||
|
||||
state Reference<TaskFuture> snapshotBatchFuture = futureBucket->unpack(snapshotBatchFutureKey);
|
||||
state Reference<TaskFuture> snapshotBatchDispatchDoneFuture = futureBucket->unpack(snapshotBatchDispatchDoneKey);
|
||||
|
@ -2010,11 +2010,11 @@ namespace fileBackup {
|
|||
state Optional<std::string> tag;
|
||||
state Optional<Version> latestSnapshotEndVersion;
|
||||
|
||||
wait(store(config.stopWhenDone().getOrThrow(tr), stopWhenDone)
|
||||
&& store(config.getLatestRestorableVersion(tr), restorableVersion)
|
||||
&& store(config.stateEnum().getOrThrow(tr), backupState)
|
||||
&& store(config.tag().get(tr), tag)
|
||||
&& store(config.latestSnapshotEndVersion().get(tr), latestSnapshotEndVersion));
|
||||
wait(store(stopWhenDone, config.stopWhenDone().getOrThrow(tr))
|
||||
&& store(restorableVersion, config.getLatestRestorableVersion(tr))
|
||||
&& store(backupState, config.stateEnum().getOrThrow(tr))
|
||||
&& store(tag, config.tag().get(tr))
|
||||
&& store(latestSnapshotEndVersion, config.latestSnapshotEndVersion().get(tr)));
|
||||
|
||||
// If restorable, update the last restorable version for this tag
|
||||
if(restorableVersion.present() && tag.present()) {
|
||||
|
@ -2161,7 +2161,7 @@ namespace fileBackup {
|
|||
|
||||
if(!bc) {
|
||||
// Backup container must be present if we're still here
|
||||
wait(store(config.backupContainer().getOrThrow(tr), bc));
|
||||
wait(store(bc, config.backupContainer().getOrThrow(tr)));
|
||||
}
|
||||
|
||||
BackupConfig::RangeFileMapT::PairsType rangeresults = wait(config.snapshotRangeFileMap().getRange(tr, startKey, {}, batchSize));
|
||||
|
@ -2242,11 +2242,11 @@ namespace fileBackup {
|
|||
state Optional<Version> firstSnapshotEndVersion;
|
||||
state Optional<std::string> tag;
|
||||
|
||||
wait(store(config.stopWhenDone().getOrThrow(tr), stopWhenDone)
|
||||
&& store(config.stateEnum().getOrThrow(tr), backupState)
|
||||
&& store(config.getLatestRestorableVersion(tr), restorableVersion)
|
||||
&& store(config.firstSnapshotEndVersion().get(tr), firstSnapshotEndVersion)
|
||||
&& store(config.tag().get(tr), tag));
|
||||
wait(store(stopWhenDone, config.stopWhenDone().getOrThrow(tr))
|
||||
&& store(backupState, config.stateEnum().getOrThrow(tr))
|
||||
&& store(restorableVersion, config.getLatestRestorableVersion(tr))
|
||||
&& store(firstSnapshotEndVersion, config.firstSnapshotEndVersion().get(tr))
|
||||
&& store(tag, config.tag().get(tr)));
|
||||
|
||||
// If restorable, update the last restorable version for this tag
|
||||
if(restorableVersion.present() && tag.present()) {
|
||||
|
@ -2828,7 +2828,7 @@ namespace fileBackup {
|
|||
state bool addingToExistingBatch = remainingInBatch > 0;
|
||||
state Version restoreVersion;
|
||||
|
||||
wait(store(restore.restoreVersion().getOrThrow(tr), restoreVersion)
|
||||
wait(store(restoreVersion, restore.restoreVersion().getOrThrow(tr))
|
||||
&& checkTaskVersion(tr->getDatabase(), task, name, version));
|
||||
|
||||
// If not adding to an existing batch then update the apply mutations end version so the mutations from the
|
||||
|
@ -3750,9 +3750,9 @@ public:
|
|||
state Optional<Version> latestRestorableVersion;
|
||||
state Version recentReadVersion;
|
||||
|
||||
wait( store(config.getLatestRestorableVersion(tr), latestRestorableVersion)
|
||||
&& store(config.backupContainer().getOrThrow(tr), bc)
|
||||
&& store(tr->getReadVersion(), recentReadVersion)
|
||||
wait( store(latestRestorableVersion, config.getLatestRestorableVersion(tr))
|
||||
&& store(bc, config.backupContainer().getOrThrow(tr))
|
||||
&& store(recentReadVersion, tr->getReadVersion())
|
||||
);
|
||||
|
||||
bool snapshotProgress = false;
|
||||
|
@ -3791,20 +3791,20 @@ public:
|
|||
state Optional<int64_t> snapshotTargetEndVersionTimestamp;
|
||||
state bool stopWhenDone;
|
||||
|
||||
wait( store(config.snapshotBeginVersion().getOrThrow(tr), snapshotBeginVersion)
|
||||
&& store(config.snapshotTargetEndVersion().getOrThrow(tr), snapshotTargetEndVersion)
|
||||
&& store(config.snapshotIntervalSeconds().getOrThrow(tr), snapshotInterval)
|
||||
&& store(config.logBytesWritten().get(tr), logBytesWritten)
|
||||
&& store(config.rangeBytesWritten().get(tr), rangeBytesWritten)
|
||||
&& store(config.latestLogEndVersion().get(tr), latestLogEndVersion)
|
||||
&& store(config.latestSnapshotEndVersion().get(tr), latestSnapshotEndVersion)
|
||||
&& store(config.stopWhenDone().getOrThrow(tr), stopWhenDone)
|
||||
wait( store(snapshotBeginVersion, config.snapshotBeginVersion().getOrThrow(tr))
|
||||
&& store(snapshotTargetEndVersion, config.snapshotTargetEndVersion().getOrThrow(tr))
|
||||
&& store(snapshotInterval, config.snapshotIntervalSeconds().getOrThrow(tr))
|
||||
&& store(logBytesWritten, config.logBytesWritten().get(tr))
|
||||
&& store(rangeBytesWritten, config.rangeBytesWritten().get(tr))
|
||||
&& store(latestLogEndVersion, config.latestLogEndVersion().get(tr))
|
||||
&& store(latestSnapshotEndVersion, config.latestSnapshotEndVersion().get(tr))
|
||||
&& store(stopWhenDone, config.stopWhenDone().getOrThrow(tr))
|
||||
);
|
||||
|
||||
wait( store(getTimestampFromVersion(latestSnapshotEndVersion, tr), latestSnapshotEndVersionTimestamp)
|
||||
&& store(getTimestampFromVersion(latestLogEndVersion, tr), latestLogEndVersionTimestamp)
|
||||
&& store(timeKeeperEpochsFromVersion(snapshotBeginVersion, tr), snapshotBeginVersionTimestamp)
|
||||
&& store(timeKeeperEpochsFromVersion(snapshotTargetEndVersion, tr), snapshotTargetEndVersionTimestamp)
|
||||
wait( store(latestSnapshotEndVersionTimestamp, getTimestampFromVersion(latestSnapshotEndVersion, tr))
|
||||
&& store(latestLogEndVersionTimestamp, getTimestampFromVersion(latestLogEndVersion, tr))
|
||||
&& store(snapshotBeginVersionTimestamp, timeKeeperEpochsFromVersion(snapshotBeginVersion, tr))
|
||||
&& store(snapshotTargetEndVersionTimestamp, timeKeeperEpochsFromVersion(snapshotTargetEndVersion, tr))
|
||||
);
|
||||
|
||||
statusText += format("Snapshot interval is %lld seconds. ", snapshotInterval);
|
||||
|
|
|
@ -294,12 +294,12 @@ Future<Void> holdWhileVoid(X object, Future<T> what)
|
|||
}
|
||||
|
||||
template<class T>
|
||||
Future<Void> store(Future<T> what, T &out) {
|
||||
Future<Void> store(T &out, Future<T> what) {
|
||||
return map(what, [&out](T const &v) { out = v; return Void(); });
|
||||
}
|
||||
|
||||
template<class T>
|
||||
Future<Void> storeOrThrow(Future<Optional<T>> what, T &out, Error e = key_not_found()) {
|
||||
Future<Void> storeOrThrow(T &out, Future<Optional<T>> what, Error e = key_not_found()) {
|
||||
return map(what, [&out,e](Optional<T> const &o) {
|
||||
if(!o.present())
|
||||
throw e;
|
||||
|
|
Loading…
Reference in New Issue