Bug fixes. Added snapshotBatchSize to the backup config so that a retried transaction which adds a group of tasks to a batch can detect whether a previous attempt already committed. Changed KeyRangeMap usage so that each range still to be dispatched holds a unique integer value, enabling more efficient range coalescing and avoiding some iterator invalidation bugs.

This commit is contained in:
Stephen Atherton 2017-12-15 01:39:50 -08:00
parent 33f9f1a95c
commit 18305ab326
2 changed files with 53 additions and 15 deletions
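The central fix is making the batch-dispatch transaction detectably idempotent: snapshotBatchSize is re-read inside the retry loop, and if it has already advanced past the value captured before the transaction, a previous attempt must have committed. A minimal sketch of the pattern in plain C++ (not Flow actor code; Config and addRangesOnce are hypothetical stand-ins for the key-backed property and the transaction body):

#include <cassert>
#include <cstdint>

struct Config { int64_t snapshotBatchSize = 0; };  // stand-in for the key-backed property

// Apply "add rangesToAdd ranges to the batch" exactly once. Returns true if
// this call performed the write, false if a prior attempt already committed it.
bool addRangesOnce(Config& cfg, int64_t oldBatchSize, int64_t rangesToAdd) {
    if (cfg.snapshotBatchSize == oldBatchSize) {
        cfg.snapshotBatchSize = oldBatchSize + rangesToAdd;  // first successful commit
        return true;
    }
    // The only other legal state is the post-commit size.
    assert(cfg.snapshotBatchSize == oldBatchSize + rangesToAdd);
    return false;
}

int main() {
    Config cfg;
    int64_t old = cfg.snapshotBatchSize;
    bool first = addRangesOnce(cfg, old, 5);  // first attempt applies the change
    bool retry = addRangesOnce(cfg, old, 5);  // retry detects the earlier commit
    assert(first && !retry && cfg.snapshotBatchSize == 5);
}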


@@ -574,18 +574,26 @@ public:
return configSpace.pack(LiteralStringRef(__FUNCTION__));
}
// Interval to use for determining the target end version for new snapshots
KeyBackedProperty<int64_t> snapshotIntervalSeconds() {
return configSpace.pack(LiteralStringRef(__FUNCTION__));
}
// When the current snapshot began
KeyBackedProperty<Version> snapshotBeginVersion() {
return configSpace.pack(LiteralStringRef(__FUNCTION__));
}
// When the current snapshot is desired to end.
// This can be changed at runtime to speed up or slow down a snapshot
KeyBackedProperty<Version> snapshotTargetEndVersion() {
return configSpace.pack(LiteralStringRef(__FUNCTION__));
}
+ // Number of ranges added to the current snapshot batch so far
+ KeyBackedProperty<int64_t> snapshotBatchSize() {
+ return configSpace.pack(LiteralStringRef(__FUNCTION__));
+ }
+ // Completion key for the current snapshot batch's dispatch future
+ KeyBackedProperty<Key> snapshotBatchFuture() {
+ return configSpace.pack(LiteralStringRef(__FUNCTION__));
+ }


@@ -1032,7 +1032,7 @@ namespace fileBackup {
return key;
}
- enum DispatchState { SKIP, NOT_DONE, DONE };
+ enum DispatchState { SKIP = 0, DONE = 1, NOT_DONE_MIN = 2 };
ACTOR static Future<Void> _execute(Database cx, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, Reference<Task> task) {
state Reference<FlowLock> lock(new FlowLock(CLIENT_KNOBS->BACKUP_LOCK_BYTES));
@@ -1040,7 +1040,11 @@ namespace fileBackup {
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
- state KeyRangeMap<DispatchState> shardMap(NOT_DONE, normalKeys.end);
+ // The shard map uses three value classes: exactly SKIP, exactly DONE, and any value >= NOT_DONE_MIN, which means not done.
+ // Giving each not-done range a unique value lets a single efficient coalesce() call squash adjacent finished ranges
+ // without merging distinct unfinished shards, so random not-done shards can still be found efficiently.
+ state int notDoneSequence = NOT_DONE_MIN;
+ state KeyRangeMap<int> shardMap(notDoneSequence++, normalKeys.end);
state Key beginKey = normalKeys.begin;
// Read all shard boundaries and add them to the map
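The comment in the hunk above describes three value classes. A stand-alone sketch of why unique not-done values matter, using std::map in place of KeyRangeMap (each map key marks where a range begins; this coalesce() is a simplified, hypothetical analogue of KeyRangeMap::coalesce, which merges adjacent ranges holding equal values):

#include <cstdio>
#include <iterator>
#include <map>
#include <string>

enum { SKIP = 0, DONE = 1, NOT_DONE_MIN = 2 };

// Merge adjacent entries holding the same value, mimicking coalesce().
void coalesce(std::map<std::string, int>& m) {
    for (auto it = m.begin(); it != m.end();) {
        auto next = std::next(it);
        if (next != m.end() && next->second == it->second)
            m.erase(next);  // identical neighbor: current range absorbs it
        else
            ++it;
    }
}

int main() {
    int seq = NOT_DONE_MIN;
    // Two finished shards, then two pending shards with unique values.
    std::map<std::string, int> shards{
        {"a", DONE}, {"b", DONE}, {"c", seq++}, {"d", seq++}};
    coalesce(shards);
    // The DONE shards merged into one range; the pending shard boundaries survived.
    for (auto& [k, v] : shards) std::printf("%s -> %d\n", k.c_str(), v);
}

Had every pending shard held the same NOT_DONE value, a whole-map coalesce would have merged those too, losing the shard boundaries the dispatcher still needs.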
@@ -1056,7 +1060,7 @@ namespace fileBackup {
break;
for(auto &boundary : shardBoundaries.get()) {
- shardMap.rawInsert(boundary, NOT_DONE);
+ shardMap.rawInsert(boundary, notDoneSequence++);
}
beginKey = keyAfter(shardBoundaries.get().back());
@@ -1075,6 +1079,7 @@ namespace fileBackup {
state Optional<Version> snapshotTargetEndVersion;
state Optional<Key> snapshotBatchFutureKey;
state Reference<TaskFuture> snapshotBatchFuture;
+ state Optional<int64_t> snapshotBatchSize;
tr->reset();
loop {
@@ -1084,10 +1089,11 @@ namespace fileBackup {
Void _ = wait( store(config.snapshotBeginVersion().getOrThrow(tr), snapshotBeginVersion)
&& store(config.backupRanges().getOrThrow(tr), backupRanges)
&& store(config.snapshotIntervalSeconds().getOrThrow(tr), snapshotIntervalSeconds)
// The next three parameters are optional
&& store(config.snapshotBatchFuture().get(tr), snapshotBatchFutureKey)
&& store(config.snapshotTargetEndVersion().get(tr), snapshotTargetEndVersion)
+ && store(config.snapshotBatchSize().get(tr), snapshotBatchSize)
&& store(tr->getReadVersion(), recentReadVersion)
&& taskBucket->keepRunning(tr, task));
@@ -1096,6 +1102,8 @@ namespace fileBackup {
if(!snapshotBatchFutureKey.present()) {
snapshotBatchFuture = futureBucket->future(tr);
config.snapshotBatchFuture().set(tr, snapshotBatchFuture->pack());
+ snapshotBatchSize = 0;
+ config.snapshotBatchSize().set(tr, snapshotBatchSize.get());
// The dispatch of this batch can take multiple separate executions if the executor fails
// so store a completion key for the dispatch finish() to set when dispatching the batch is done.
@@ -1116,6 +1124,7 @@ namespace fileBackup {
}
else {
ASSERT(snapshotTargetEndVersion.present());
+ ASSERT(snapshotBatchSize.present());
// Batch future key exists in the config so create future from it
snapshotBatchFuture = Reference<TaskFuture>(new TaskFuture(futureBucket, snapshotBatchFutureKey.get()));
}
@@ -1190,16 +1199,12 @@ namespace fileBackup {
if(range.value() == DONE) {
++countShardsDone;
}
- else if(range.value() == NOT_DONE)
+ else if(range.value() >= NOT_DONE_MIN)
++countShardsNotDone;
}
// Scan the shard map a second time, coalescing the DONE shards so that finding random NOT_DONE
// ranges is more efficient when there are only a few left.
- for(auto &range : shardMap.ranges()) {
- if(range->value() == DONE)
- shardMap.coalesce(Key(range.begin()));
- }
+ shardMap.coalesce(normalKeys);
// In this context "all" refers to all of the shards relevant for this particular backup
state int countAllShards = countShardsDone + countShardsNotDone;
@@ -1253,7 +1258,7 @@ namespace fileBackup {
auto it = shardMap.randomRange();
// Find a NOT_DONE range and add it to rangesToAdd
while(1) {
- if(it->value() == NOT_DONE) {
+ if(it->value() >= NOT_DONE_MIN) {
rangesToAdd.push_back(it->range());
it->value() = DONE;
shardMap.coalesce(Key(it->begin()));
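The while(1) loop above starts at a random range and advances (wrapping around) until it finds a shard whose value is >= NOT_DONE_MIN, then claims it. A rough stand-alone analogue over a flat vector (claimRandomShard is hypothetical; the real code walks KeyRangeMap iterators and coalesces around each claimed range):

#include <cassert>
#include <cstdlib>
#include <vector>

enum { SKIP = 0, DONE = 1, NOT_DONE_MIN = 2 };

// Scan forward from a random start, wrapping around, until a not-done shard
// is found; claim it by marking it DONE. Returns the claimed index, or -1 if
// every shard has already been dispatched.
int claimRandomShard(std::vector<int>& shards) {
    if (shards.empty())
        return -1;
    size_t start = std::rand() % shards.size();
    for (size_t i = 0; i < shards.size(); ++i) {
        size_t idx = (start + i) % shards.size();
        if (shards[idx] >= NOT_DONE_MIN) {
            shards[idx] = DONE;
            return (int)idx;
        }
    }
    return -1;
}

int main() {
    std::vector<int> shards{DONE, NOT_DONE_MIN, DONE, NOT_DONE_MIN + 1};
    int claimed = claimRandomShard(shards);
    assert(claimed == 1 || claimed == 3);  // only not-done shards are claimable
}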
@@ -1269,6 +1274,7 @@ namespace fileBackup {
}
}
+ state int64_t oldBatchSize = snapshotBatchSize.get();
// Now add the ranges in a single transaction
tr->reset();
loop {
@@ -1285,7 +1291,18 @@ namespace fileBackup {
endReads.push_back( config.rangeDispatchMap().get(tr, range.end));
}
- Void _ = wait(waitForAll(beginReads) && waitForAll(endReads) && taskBucket->keepRunning(tr, task));
+ Void _ = wait(store(config.snapshotBatchSize().getOrThrow(tr), snapshotBatchSize.get())
+ && waitForAll(beginReads) && waitForAll(endReads) && taskBucket->keepRunning(tr, task));
+ // If the snapshot batch size was already changed to oldBatchSize + rangesToAdd.size(), then this
+ // transaction was already committed.
+ if(snapshotBatchSize.get() == oldBatchSize) {
+ config.snapshotBatchSize().set(tr, oldBatchSize + rangesToAdd.size());
+ }
+ else {
+ ASSERT(snapshotBatchSize.get() == oldBatchSize + rangesToAdd.size());
+ break;
+ }
// Count the undispatched ranges while checking the boundaries
int undispatchedRanges = 0;
@@ -1339,6 +1356,19 @@ namespace fileBackup {
}
}
if(countShardsNotDone == 0) {
TraceEvent("FileBackupSnapshotDispatchFinished")
.detail("BackupUID", config.getUid())
.detail("AllShards", countAllShards)
.detail("ShardsDone", countShardsDone)
.detail("ShardsNotDone", countShardsNotDone)
.detail("SnapshotBeginVersion", snapshotBeginVersion)
.detail("SnapshotTargetEndVersion", snapshotTargetEndVersion.get())
.detail("CurrentVersion", recentReadVersion)
.detail("SnapshotIntervalSeconds", snapshotIntervalSeconds);
Params.snapshotFinished().set(task, true);
}
return Void();
}
@@ -1816,8 +1846,9 @@ namespace fileBackup {
BackupConfig::RangeFileMapT::PairsType rangeresults = wait(config.rangeFileMap().getRange(tr, startKey, {}, batchSize));
- for(auto &p : rangeresults)
+ for(auto &p : rangeresults) {
localmap.insert(p);
+ }
if(rangeresults.size() < batchSize)
break;
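This read loop follows a standard pagination pattern: fetch up to batchSize entries, stop when a short page comes back, otherwise continue just past the last key seen. A generic sketch with a hypothetical readBatch callback standing in for rangeFileMap().getRange() (appending '\0' mimics keyAfter(), the smallest key strictly greater than its argument):

#include <functional>
#include <map>
#include <string>
#include <vector>

using Pair = std::pair<std::string, std::string>;

// Drain a key-ordered source page by page into a local map.
std::map<std::string, std::string> readAll(
        const std::function<std::vector<Pair>(const std::string&, int)>& readBatch,
        int batchSize) {
    std::map<std::string, std::string> localmap;
    std::string startKey;  // empty string sorts before every key
    for (;;) {
        std::vector<Pair> page = readBatch(startKey, batchSize);
        for (auto& p : page)
            localmap.insert(p);
        if ((int)page.size() < batchSize)
            break;  // short page: the source is exhausted
        startKey = page.back().first + '\0';  // resume just after the last key
    }
    return localmap;
}

int main() {
    std::vector<Pair> data{{"a", "1"}, {"b", "2"}, {"c", "3"}};
    auto readBatch = [&](const std::string& start, int n) {
        std::vector<Pair> page;
        for (auto& p : data)
            if (p.first >= start && (int)page.size() < n)
                page.push_back(p);
        return page;
    };
    return readAll(readBatch, 2).size() == 3 ? 0 : 1;  // two pages: {a,b} then {c}
}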
@@ -2005,7 +2036,6 @@ namespace fileBackup {
state Reference<TaskFuture> kvBackupRangeComplete = futureBucket->future(tr);
state Reference<TaskFuture> kvBackupComplete = futureBucket->future(tr);
state int rangeCount = 0;
- config.snapshotBeginVersion().set(tr, beginVersion);
Void _ = wait(success(BackupSnapshotDispatch::addTask(tr, taskBucket, task, TaskCompletionKey::signal(kvBackupRangeComplete), Reference<TaskFuture>(), 1)));