Added updating of backup snapshot shards behind in snapshot dispatcher so status can determine if a snapshot is lagging the configured speed.
This commit is contained in:
parent
023bbb566f
commit
f2953db7d8
|
@ -742,7 +742,7 @@ public:
|
|||
return configSpace.pack(LiteralStringRef(__FUNCTION__));
|
||||
}
|
||||
|
||||
KeyBackedBinaryValue<int64_t> snapshotDispatchLastShardsBehind() {
|
||||
KeyBackedProperty<int64_t> snapshotDispatchLastShardsBehind() {
|
||||
return configSpace.pack(LiteralStringRef(__FUNCTION__));
|
||||
}
|
||||
|
||||
|
|
|
@ -1273,6 +1273,10 @@ namespace fileBackup {
|
|||
static const uint32_t version;
|
||||
|
||||
static struct {
|
||||
// Set by Execute, used by Finish
|
||||
static TaskParam<int64_t> shardsBehind() {
|
||||
return LiteralStringRef(__FUNCTION__);
|
||||
}
|
||||
// Set by Execute, used by Finish
|
||||
static TaskParam<bool> snapshotFinished() {
|
||||
return LiteralStringRef(__FUNCTION__);
|
||||
|
@ -1369,8 +1373,11 @@ namespace fileBackup {
|
|||
&& store(recentReadVersion, tr->getReadVersion())
|
||||
&& taskBucket->keepRunning(tr, task));
|
||||
|
||||
// If the snapshot batch future key does not exist, create it, set it, and commit
|
||||
// Also initialize the target snapshot end version if it is not yet set.
|
||||
// If the snapshot batch future key does not exist, this is the first execution of this dispatch task so
|
||||
// - create and set the snapshot batch future key
|
||||
// - initialize the batch size to 0
|
||||
// - initialize the target snapshot end version if it is not yet set
|
||||
// - commit
|
||||
if(!snapshotBatchFutureKey.present()) {
|
||||
snapshotBatchFuture = futureBucket->future(tr);
|
||||
config.snapshotBatchFuture().set(tr, snapshotBatchFuture->pack());
|
||||
|
@ -1540,14 +1547,28 @@ namespace fileBackup {
|
|||
// Calculate number of shards that should be done before the next interval end
|
||||
// timeElapsed is between 0 and 1 and represents what portion of the shards we should have completed by now
|
||||
double timeElapsed;
|
||||
Version snapshotScheduledVersionInterval = snapshotTargetEndVersion - snapshotBeginVersion;
|
||||
if(snapshotTargetEndVersion > snapshotBeginVersion)
|
||||
timeElapsed = std::min(1.0, (double)(nextDispatchVersion - snapshotBeginVersion) / (snapshotTargetEndVersion - snapshotBeginVersion));
|
||||
timeElapsed = std::min(1.0, (double)(nextDispatchVersion - snapshotBeginVersion) / (snapshotScheduledVersionInterval));
|
||||
else
|
||||
timeElapsed = 1.0;
|
||||
|
||||
state int countExpectedShardsDone = countAllShards * timeElapsed;
|
||||
state int countShardsToDispatch = std::max<int>(0, countExpectedShardsDone - countShardsDone);
|
||||
|
||||
// Calculate the number of shards that would have been dispatched by a normal (on-schedule) BackupSnapshotDispatchTask given
|
||||
// the dispatch window and the start and expected-end versions of the current snapshot.
|
||||
int64_t dispatchWindow = nextDispatchVersion - recentReadVersion;
|
||||
int countShardsExpectedPerNormalWindow = (double(dispatchWindow) / snapshotScheduledVersionInterval) * countAllShards;
|
||||
// countShardsThisDispatch is how many total shards are to be dispatched by this dispatch cycle.
|
||||
// Since this dispatch cycle can span many incrementally progressing separate executions of the BackupSnapshotDispatchTask
|
||||
// instance, this is calculated as the number of shards dispatched so far in the dispatch batch plus the number of shards
|
||||
// the current execution is going to attempt to do.
|
||||
int countShardsThisDispatch = countShardsToDispatch + snapshotBatchSize.get();
|
||||
// The number of shards 'behind' the snapshot is the count of how may additional shards beyond normal are being dispatched, if any.
|
||||
int countShardsBehind = std::max<int64_t>(0, countShardsToDispatch + snapshotBatchSize.get() - countShardsExpectedPerNormalWindow);
|
||||
Params.shardsBehind().set(task, countShardsBehind);
|
||||
|
||||
TraceEvent("FileBackupSnapshotDispatchStats")
|
||||
.detail("BackupUID", config.getUid())
|
||||
.detail("AllShards", countAllShards)
|
||||
|
@ -1555,6 +1576,7 @@ namespace fileBackup {
|
|||
.detail("ShardsNotDone", countShardsNotDone)
|
||||
.detail("ExpectedShardsDone", countExpectedShardsDone)
|
||||
.detail("ShardsToDispatch", countShardsToDispatch)
|
||||
.detail("ShardsBehind", countShardsBehind)
|
||||
.detail("SnapshotBeginVersion", snapshotBeginVersion)
|
||||
.detail("SnapshotTargetEndVersion", snapshotTargetEndVersion)
|
||||
.detail("NextDispatchVersion", nextDispatchVersion)
|
||||
|
@ -1627,6 +1649,8 @@ namespace fileBackup {
|
|||
ASSERT(snapshotBatchSize.get() == oldBatchSize);
|
||||
config.snapshotBatchSize().set(tr, newBatchSize);
|
||||
snapshotBatchSize = newBatchSize;
|
||||
config.snapshotDispatchLastShardsBehind().set(tr, Params.shardsBehind().get(task));
|
||||
config.snapshotDispatchLastVersion().set(tr, tr->getReadVersion().get());
|
||||
}
|
||||
|
||||
state std::vector<Future<Void>> addTaskFutures;
|
||||
|
@ -1729,6 +1753,10 @@ namespace fileBackup {
|
|||
config.snapshotBatchDispatchDoneKey().clear(tr);
|
||||
config.snapshotBatchSize().clear(tr);
|
||||
|
||||
// Update shardsBehind here again in case the execute phase did not actually have to create any shard tasks
|
||||
config.snapshotDispatchLastShardsBehind().set(tr, Params.shardsBehind().getOrDefault(task, 0));
|
||||
config.snapshotDispatchLastVersion().set(tr, tr->getReadVersion().get());
|
||||
|
||||
state Reference<TaskFuture> snapshotFinishedFuture = task->getDoneFuture(futureBucket);
|
||||
|
||||
// If the snapshot is finished, the next task is to write a snapshot manifest, otherwise it's another snapshot dispatch task.
|
||||
|
|
Loading…
Reference in New Issue