Show blob restore in fdbcli status command
This commit is contained in:
parent
1bc6e4cc6f
commit
2e62822183
|
@ -36,7 +36,8 @@ ACTOR Future<bool> blobRestoreCommandActor(Database localDb, std::vector<StringR
|
|||
state bool success = false;
|
||||
wait(store(success, localDb->blobRestore(normalKeys)));
|
||||
if (success) {
|
||||
fmt::print("Started blob restore for the full cluster. Please use 'status' command to check progress.\n");
|
||||
fmt::print(
|
||||
"Started blob restore for the full cluster. Please use 'status details' command to check progress.\n");
|
||||
} else {
|
||||
fmt::print("Fail to start a new blob restore while there is a pending one.\n");
|
||||
}
|
||||
|
|
|
@ -1125,6 +1125,15 @@ void printStatus(StatusObjectReader statusObj,
|
|||
outputString += "\n Number of Workers - " + format("%d", numWorkers);
|
||||
auto numKeyRanges = statusObjBlobGranules["number_of_key_ranges"].get_int();
|
||||
outputString += "\n Number of Key Ranges - " + format("%d", numKeyRanges);
|
||||
if (statusObjCluster.has("blob_restore")) {
|
||||
StatusObjectReader statusObjBlobRestore = statusObjCluster["blob_restore"];
|
||||
std::string restoreStatus = statusObjBlobRestore["blob_full_restore_phase"].get_str();
|
||||
if (statusObjBlobRestore.has("blob_full_restore_progress")) {
|
||||
auto progress = statusObjBlobRestore["blob_full_restore_progress"].get_int();
|
||||
restoreStatus += " " + format("%d%%", progress);
|
||||
}
|
||||
outputString += "\n Full Restore - " + restoreStatus;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -10932,8 +10932,7 @@ ACTOR Future<bool> blobRestoreActor(Reference<DatabaseContext> cx, KeyRange rang
|
|||
return false; // stop if there is in-progress restore.
|
||||
}
|
||||
}
|
||||
Standalone<BlobRestoreStatus> status;
|
||||
status.progress = 0;
|
||||
BlobRestoreStatus status(BlobRestorePhase::INIT);
|
||||
Value newValue = blobRestoreCommandValueFor(status);
|
||||
tr->set(key, newValue);
|
||||
wait(tr->commit());
|
||||
|
|
|
@ -314,13 +314,19 @@ struct BlobManifest {
|
|||
};
|
||||
|
||||
// Defines blob restore status
|
||||
enum BlobRestorePhase { INIT = 0, LOAD_MANIFEST = 1, MANIFEST_DONE = 2, MIGRATE = 3, APPLY_MLOGS = 4, DONE = 5 };
|
||||
struct BlobRestoreStatus {
|
||||
constexpr static FileIdentifier file_identifier = 378657;
|
||||
BlobRestorePhase phase;
|
||||
int progress;
|
||||
|
||||
BlobRestoreStatus() : phase(BlobRestorePhase::INIT){};
|
||||
BlobRestoreStatus(BlobRestorePhase pha) : phase(pha), progress(0){};
|
||||
BlobRestoreStatus(BlobRestorePhase pha, int prog) : phase(pha), progress(prog){};
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, progress);
|
||||
serializer(ar, phase, progress);
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -3543,10 +3543,16 @@ ACTOR Future<Void> recoverBlobManager(Reference<BlobManagerData> bmData) {
|
|||
bool isFullRestore = wait(isFullRestoreMode(bmData->db, normalKeys));
|
||||
bmData->isFullRestoreMode = isFullRestore;
|
||||
if (bmData->isFullRestoreMode) {
|
||||
BlobRestoreStatus initStatus(BlobRestorePhase::LOAD_MANIFEST);
|
||||
wait(updateRestoreStatus(bmData->db, normalKeys, initStatus));
|
||||
|
||||
wait(loadManifest(bmData->db, bmData->bstore));
|
||||
|
||||
int64_t epoc = wait(lastBlobEpoc(bmData->db, bmData->bstore));
|
||||
wait(updateEpoch(bmData, epoc + 1));
|
||||
|
||||
BlobRestoreStatus completedStatus(BlobRestorePhase::MANIFEST_DONE);
|
||||
wait(updateRestoreStatus(bmData->db, normalKeys, completedStatus));
|
||||
}
|
||||
|
||||
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(bmData->db);
|
||||
|
|
|
@ -545,7 +545,7 @@ ACTOR Future<bool> isFullRestoreMode(Database db, KeyRangeRef keys) {
|
|||
KeyRange keyRange = decodeBlobRestoreCommandKeyFor(r.key);
|
||||
if (keyRange.contains(keys)) {
|
||||
Standalone<BlobRestoreStatus> status = decodeBlobRestoreStatus(r.value);
|
||||
return status.progress < 100; // progress is less than 100
|
||||
return status.phase < BlobRestorePhase::DONE;
|
||||
}
|
||||
}
|
||||
if (!ranges.more) {
|
||||
|
@ -563,3 +563,44 @@ ACTOR Future<bool> isFullRestoreMode(Database db, KeyRangeRef keys) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Update restore status
|
||||
ACTOR Future<Void> updateRestoreStatus(Database db, KeyRangeRef range, BlobRestoreStatus status) {
|
||||
state Transaction tr(db);
|
||||
loop {
|
||||
try {
|
||||
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
|
||||
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
Key key = blobRestoreCommandKeyFor(range);
|
||||
Value value = blobRestoreCommandValueFor(status);
|
||||
tr.set(key, value);
|
||||
wait(tr.commit());
|
||||
return Void();
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Get restore status
|
||||
ACTOR Future<Optional<BlobRestoreStatus>> getRestoreStatus(Database db, KeyRangeRef range) {
|
||||
state Transaction tr(db);
|
||||
loop {
|
||||
try {
|
||||
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
|
||||
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
Key key = blobRestoreCommandKeyFor(range);
|
||||
Optional<Value> value = wait(tr.get(key));
|
||||
Optional<BlobRestoreStatus> result;
|
||||
if (value.present()) {
|
||||
Standalone<BlobRestoreStatus> status = decodeBlobRestoreStatus(value.get());
|
||||
result = status;
|
||||
}
|
||||
return result;
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbclient/BlobGranuleCommon.h"
|
||||
#include "flow/ActorCollection.h"
|
||||
#include "flow/FastRef.h"
|
||||
#include "flow/IRandom.h"
|
||||
|
@ -75,8 +76,8 @@ private:
|
|||
// Check if blob manifest is loaded so that blob migration can start
|
||||
ACTOR static Future<Void> checkIfReadyForMigration(Reference<BlobMigrator> self) {
|
||||
loop {
|
||||
bool isFullRestore = wait(isFullRestoreMode(self->db_, normalKeys));
|
||||
if (isFullRestore) {
|
||||
Optional<BlobRestoreStatus> status = wait(getRestoreStatus(self->db_, normalKeys));
|
||||
if (canStartMigration(status)) {
|
||||
BlobGranuleRestoreVersionVector granules = wait(listBlobGranules(self->db_, self->blobConn_));
|
||||
if (!granules.empty()) {
|
||||
self->blobGranules_ = granules;
|
||||
|
@ -87,6 +88,9 @@ private:
|
|||
.detail("Version", granule.version)
|
||||
.detail("SizeInBytes", granule.sizeInBytes);
|
||||
}
|
||||
|
||||
BlobRestoreStatus status(BlobRestorePhase::MIGRATE, 0);
|
||||
wait(updateRestoreStatus(self->db_, normalKeys, status));
|
||||
return Void();
|
||||
}
|
||||
}
|
||||
|
@ -94,6 +98,15 @@ private:
|
|||
}
|
||||
}
|
||||
|
||||
// Check if we should start migration. Migration can be started after manifest is fully loaded
|
||||
static bool canStartMigration(Optional<BlobRestoreStatus> status) {
|
||||
if (status.present()) {
|
||||
BlobRestoreStatus value = status.get();
|
||||
return value.phase == BlobRestorePhase::MANIFEST_DONE; // manifest is loaded successfully
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
// Prepare for data migration for given key range.
|
||||
ACTOR static Future<Void> prepare(Reference<BlobMigrator> self, KeyRangeRef keys) {
|
||||
// Register as a storage server, so that DataDistributor could start data movement after
|
||||
|
@ -120,8 +133,8 @@ private:
|
|||
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
try {
|
||||
state Value value = keyServersValue(std::vector<UID>({ serverUID }), std::vector<UID>(), UID(), UID());
|
||||
wait(krmSetRange(&tr, keyServersPrefix, keys, value));
|
||||
wait(krmSetRange(&tr, serverKeysPrefixFor(serverUID), keys, serverKeysTrue));
|
||||
wait(krmSetRangeCoalescing(&tr, keyServersPrefix, keys, allKeys, value));
|
||||
wait(krmSetRangeCoalescing(&tr, serverKeysPrefixFor(serverUID), keys, allKeys, serverKeysTrue));
|
||||
wait(tr.commit());
|
||||
dprint("Assign {} to server {}\n", normalKeys.toString(), serverUID.toString());
|
||||
return Void();
|
||||
|
@ -152,7 +165,7 @@ private:
|
|||
}
|
||||
}
|
||||
if (owning) {
|
||||
wait(krmSetRange(&tr, serverKeysPrefixFor(id), keys, serverKeysFalse));
|
||||
wait(krmSetRangeCoalescing(&tr, serverKeysPrefixFor(id), keys, allKeys, serverKeysFalse));
|
||||
dprint("Unassign {} from storage server {}\n", keys.toString(), id.toString());
|
||||
TraceEvent("UnassignKeys").detail("Keys", keys.toString()).detail("From", id.toString());
|
||||
}
|
||||
|
@ -169,8 +182,12 @@ private:
|
|||
ACTOR static Future<Void> logProgress(Reference<BlobMigrator> self) {
|
||||
loop {
|
||||
bool done = wait(checkProgress(self));
|
||||
if (done)
|
||||
if (done) {
|
||||
BlobRestoreStatus status(BlobRestorePhase::DONE);
|
||||
wait(updateRestoreStatus(self->db_, normalKeys, status));
|
||||
|
||||
return Void();
|
||||
}
|
||||
wait(delay(SERVER_KNOBS->BLOB_MIGRATOR_CHECK_INTERVAL));
|
||||
}
|
||||
}
|
||||
|
@ -205,7 +222,8 @@ private:
|
|||
state bool done = incompleted == 0;
|
||||
dprint("Migration progress :{}%. done {}\n", progress, done);
|
||||
TraceEvent("BlobMigratorProgress").detail("Progress", progress).detail("Done", done);
|
||||
wait(updateProgress(self, normalKeys, progress));
|
||||
BlobRestoreStatus status(BlobRestorePhase::MIGRATE, progress);
|
||||
wait(updateRestoreStatus(self->db_, normalKeys, status));
|
||||
return done;
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
|
@ -213,32 +231,6 @@ private:
|
|||
}
|
||||
}
|
||||
|
||||
// Update restore progress
|
||||
ACTOR static Future<Void> updateProgress(Reference<BlobMigrator> self, KeyRangeRef range, int progress) {
|
||||
state Transaction tr(self->db_);
|
||||
loop {
|
||||
try {
|
||||
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
|
||||
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
state Key key = blobRestoreCommandKeyFor(range);
|
||||
Optional<Value> value = wait(tr.get(key));
|
||||
if (value.present()) {
|
||||
Standalone<BlobRestoreStatus> status = decodeBlobRestoreStatus(value.get());
|
||||
if (progress > status.progress) {
|
||||
status.progress = progress;
|
||||
Value updatedValue = blobRestoreCommandValueFor(status);
|
||||
tr.set(key, updatedValue);
|
||||
wait(tr.commit());
|
||||
}
|
||||
}
|
||||
return Void();
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Advance version, so that future commits will have a larger version than the restored data
|
||||
ACTOR static Future<Void> advanceVersion(Reference<BlobMigrator> self) {
|
||||
state Transaction tr(self->db_);
|
||||
|
|
|
@ -26,6 +26,7 @@
|
|||
#include <tuple>
|
||||
#include <vector>
|
||||
|
||||
#include "fdbclient/BlobGranuleCommon.h"
|
||||
#include "fdbclient/FDBTypes.h"
|
||||
#include "fdbclient/SystemData.h"
|
||||
#include "fdbclient/DatabaseContext.h"
|
||||
|
@ -2565,8 +2566,8 @@ ACTOR Future<Void> watchBlobRestoreCommand(ClusterControllerData* self) {
|
|||
Optional<Value> blobRestoreCommand = wait(tr->get(blobRestoreCommandKey));
|
||||
if (blobRestoreCommand.present()) {
|
||||
Standalone<BlobRestoreStatus> status = decodeBlobRestoreStatus(blobRestoreCommand.get());
|
||||
TraceEvent("WatchBlobRestoreCommand").detail("Progress", status.progress);
|
||||
if (status.progress == 0) {
|
||||
TraceEvent("WatchBlobRestoreCommand").detail("Progress", status.progress).detail("Phase", status.phase);
|
||||
if (status.phase == BlobRestorePhase::INIT) {
|
||||
self->db.blobRestoreEnabled.set(true);
|
||||
if (self->db.blobGranulesEnabled.get()) {
|
||||
const auto& blobManager = self->db.serverInfo->get().blobManager;
|
||||
|
|
|
@ -19,6 +19,8 @@
|
|||
*/
|
||||
|
||||
#include <cinttypes>
|
||||
#include "fdbclient/BlobGranuleCommon.h"
|
||||
#include "fdbserver/BlobGranuleServerCommon.actor.h"
|
||||
#include "fmt/format.h"
|
||||
#include "fdbclient/BackupAgent.actor.h"
|
||||
#include "fdbclient/BlobWorkerInterface.h"
|
||||
|
@ -2442,6 +2444,47 @@ ACTOR static Future<JsonBuilderObject> blobWorkerStatusFetcher(
|
|||
return statusObj;
|
||||
}
|
||||
|
||||
ACTOR static Future<JsonBuilderObject> blobRestoreStatusFetcher(Database db, std::set<std::string>* incompleteReason) {
|
||||
|
||||
state JsonBuilderObject statusObj;
|
||||
state std::vector<Future<Optional<TraceEventFields>>> futures;
|
||||
|
||||
try {
|
||||
Optional<BlobRestoreStatus> status = wait(getRestoreStatus(db, normalKeys));
|
||||
if (status.present()) {
|
||||
switch (status.get().phase) {
|
||||
case BlobRestorePhase::INIT:
|
||||
statusObj["blob_full_restore_phase"] = "Initializing";
|
||||
break;
|
||||
case BlobRestorePhase::LOAD_MANIFEST:
|
||||
statusObj["blob_full_restore_phase"] = "Loading manifest";
|
||||
break;
|
||||
case BlobRestorePhase::MANIFEST_DONE:
|
||||
statusObj["blob_full_restore_phase"] = "Manifest loaded";
|
||||
break;
|
||||
case BlobRestorePhase::MIGRATE:
|
||||
statusObj["blob_full_restore_phase"] = "Copying data";
|
||||
statusObj["blob_full_restore_progress"] = status.get().progress;
|
||||
break;
|
||||
case BlobRestorePhase::APPLY_MLOGS:
|
||||
statusObj["blob_full_restore_phase"] = "Applying mutation logs";
|
||||
statusObj["blob_full_restore_progress"] = status.get().progress;
|
||||
break;
|
||||
case BlobRestorePhase::DONE:
|
||||
statusObj["blob_full_restore_phase"] = "Completed";
|
||||
break;
|
||||
default:
|
||||
statusObj["blob_full_restore_phase"] = "Unexpected phase";
|
||||
}
|
||||
}
|
||||
} catch (Error& e) {
|
||||
if (e.code() == error_code_actor_cancelled)
|
||||
throw;
|
||||
incompleteReason->insert("Unable to query blob restore status");
|
||||
}
|
||||
return statusObj;
|
||||
}
|
||||
|
||||
static JsonBuilderObject tlogFetcher(int* logFaultTolerance,
|
||||
const std::vector<TLogSet>& tLogs,
|
||||
std::unordered_map<NetworkAddress, WorkerInterface> const& address_workers) {
|
||||
|
@ -3407,6 +3450,8 @@ ACTOR Future<StatusReply> clusterGetStatus(
|
|||
JsonBuilderObject blobGranuelsStatus =
|
||||
wait(blobWorkerStatusFetcher(blobWorkers, address_workers, &status_incomplete_reasons));
|
||||
statusObj["blob_granules"] = blobGranuelsStatus;
|
||||
JsonBuilderObject blobRestoreStatus = wait(blobRestoreStatusFetcher(cx, &status_incomplete_reasons));
|
||||
statusObj["blob_restore"] = blobRestoreStatus;
|
||||
}
|
||||
|
||||
JsonBuilderArray incompatibleConnectionsArray;
|
||||
|
|
|
@ -163,7 +163,8 @@ ACTOR Future<Void> printRestoreSummary(Database db, Reference<BlobConnectionProv
|
|||
ACTOR Future<BlobGranuleRestoreVersionVector> listBlobGranules(Database db, Reference<BlobConnectionProvider> blobConn);
|
||||
ACTOR Future<int64_t> lastBlobEpoc(Database db, Reference<BlobConnectionProvider> blobConn);
|
||||
ACTOR Future<bool> isFullRestoreMode(Database db, KeyRangeRef range);
|
||||
|
||||
ACTOR Future<Void> updateRestoreStatus(Database db, KeyRangeRef range, BlobRestoreStatus status);
|
||||
ACTOR Future<Optional<BlobRestoreStatus>> getRestoreStatus(Database db, KeyRangeRef range);
|
||||
#include "flow/unactorcompiler.h"
|
||||
|
||||
#endif
|
||||
|
|
Loading…
Reference in New Issue