blob migrator: advance version and report data copy progress
This commit is contained in:
parent
ccfef328af
commit
f09e995ebb
|
@ -1002,6 +1002,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
|
|||
init( BLOB_MANIFEST_BACKUP, false );
|
||||
init( BLOB_MANIFEST_BACKUP_INTERVAL, isSimulated ? 5.0 : 30.0 );
|
||||
init( BLOB_FULL_RESTORE_MODE, false );
|
||||
init( BLOB_MIGRATOR_CHECK_INTERVAL, isSimulated ? 1.0 : 5.0);
|
||||
|
||||
init( BGCC_TIMEOUT, isSimulated ? 10.0 : 120.0 );
|
||||
init( BGCC_MIN_INTERVAL, isSimulated ? 1.0 : 10.0 );
|
||||
|
|
|
@ -982,6 +982,7 @@ public:
|
|||
bool BLOB_MANIFEST_BACKUP;
|
||||
double BLOB_MANIFEST_BACKUP_INTERVAL;
|
||||
bool BLOB_FULL_RESTORE_MODE;
|
||||
double BLOB_MIGRATOR_CHECK_INTERVAL;
|
||||
|
||||
// Blob metadata
|
||||
int64_t BLOB_METADATA_CACHE_TTL;
|
||||
|
|
|
@ -18,8 +18,6 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbserver/BlobMigratorInterface.h"
|
||||
#include "fdbserver/Knobs.h"
|
||||
#include "flow/ActorCollection.h"
|
||||
#include "flow/FastRef.h"
|
||||
#include "flow/IRandom.h"
|
||||
|
@ -35,6 +33,8 @@
|
|||
#include "fdbserver/WaitFailure.h"
|
||||
#include "fdbserver/MoveKeys.actor.h"
|
||||
#include "fdbserver/BlobGranuleServerCommon.actor.h"
|
||||
#include "fdbserver/BlobMigratorInterface.h"
|
||||
#include "fdbserver/Knobs.h"
|
||||
#include "flow/actorcompiler.h" // has to be last include
|
||||
#include "flow/network.h"
|
||||
#include <algorithm>
|
||||
|
@ -72,7 +72,7 @@ public:
|
|||
self->blobGranules_ = granules;
|
||||
|
||||
wait(prepare(self, normalKeys));
|
||||
|
||||
wait(advanceVersion(self));
|
||||
wait(serverLoop(self));
|
||||
return Void();
|
||||
}
|
||||
|
@ -148,9 +148,78 @@ private:
|
|||
}
|
||||
}
|
||||
|
||||
// Print migration progress periodically
|
||||
ACTOR static Future<Void> logProgress(Reference<BlobMigrator> self) {
|
||||
loop {
|
||||
bool done = wait(checkProgress(self));
|
||||
if (done)
|
||||
return Void();
|
||||
wait(delay(SERVER_KNOBS->BLOB_MIGRATOR_CHECK_INTERVAL));
|
||||
}
|
||||
}
|
||||
|
||||
// Check key ranges that are migrated. Return true if all ranges are done
|
||||
ACTOR static Future<bool> checkProgress(Reference<BlobMigrator> self) {
|
||||
state Transaction tr(self->db_);
|
||||
loop {
|
||||
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
|
||||
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
try {
|
||||
// Get key ranges that are still owned by the migrator. Those ranges are
|
||||
// incompleted migrations
|
||||
state UID serverID = self->interf_.ssi.id();
|
||||
RangeResult ranges = wait(krmGetRanges(&tr, serverKeysPrefixFor(serverID), normalKeys));
|
||||
|
||||
// Count incompleted size
|
||||
int64_t incompleted = 0;
|
||||
for (auto i = 0; i < ranges.size() - 1; ++i) {
|
||||
if (ranges[i].value == serverKeysTrue) {
|
||||
KeyRangeRef range(ranges[i].key, ranges[i + 1].key);
|
||||
int64_t bytes = sizeInBytes(self, range);
|
||||
dprint(" incompleted {}, size: {}\n", range.toString(), bytes);
|
||||
incompleted += bytes;
|
||||
}
|
||||
}
|
||||
|
||||
// Calculated progress
|
||||
int64_t total = sizeInBytes(self);
|
||||
int progress = (total - incompleted) * 100 / total;
|
||||
bool done = incompleted == 0;
|
||||
dprint("Progress {} :{}%. done {}\n", serverID.toString(), progress, done);
|
||||
return done;
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Advance version, so that future commits will have a larger version than the restored data
|
||||
ACTOR static Future<Void> advanceVersion(Reference<BlobMigrator> self) {
|
||||
state Transaction tr(self->db_);
|
||||
loop {
|
||||
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
|
||||
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
try {
|
||||
Version currentVersion = wait(tr.getRawReadVersion());
|
||||
Version expectedVersion = maxVersion(self);
|
||||
if (currentVersion <= expectedVersion) {
|
||||
tr.set(minRequiredCommitVersionKey, BinaryWriter::toValue(expectedVersion + 1, Unversioned()));
|
||||
dprint("Advance version from {} to {}\n", currentVersion, expectedVersion);
|
||||
wait(tr.commit());
|
||||
}
|
||||
return Void();
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Main server loop
|
||||
ACTOR static Future<Void> serverLoop(Reference<BlobMigrator> self) {
|
||||
self->actors_.add(waitFailureServer(self->interf_.ssi.waitFailure.getFuture()));
|
||||
self->actors_.add(logProgress(self));
|
||||
self->actors_.add(handleRequest(self));
|
||||
self->actors_.add(handleUnsupportedRequest(self));
|
||||
loop {
|
||||
|
|
Loading…
Reference in New Issue