Adding more blobrange cli commands and a couple other tweaks (#7727)
This commit is contained in:
parent
d99d0370b1
commit
cfc13e7018
|
@ -58,6 +58,69 @@ ACTOR Future<Void> setBlobRange(Database db, Key startKey, Key endKey, Value val
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Version> getLatestReadVersion(Database db) {
|
||||
state Transaction tr(db);
|
||||
loop {
|
||||
try {
|
||||
Version rv = wait(tr.getReadVersion());
|
||||
fmt::print("Resolved latest read version as {0}\n", rv);
|
||||
return rv;
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// print after delay if not cancelled
|
||||
ACTOR Future<Void> printAfterDelay(double delaySeconds, std::string message) {
|
||||
wait(delay(delaySeconds));
|
||||
fmt::print("{}\n", message);
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> doBlobPurge(Database db, Key startKey, Key endKey, Optional<Version> version) {
|
||||
state Version purgeVersion;
|
||||
if (version.present()) {
|
||||
purgeVersion = version.get();
|
||||
} else {
|
||||
wait(store(purgeVersion, getLatestReadVersion(db)));
|
||||
}
|
||||
|
||||
state Key purgeKey = wait(db->purgeBlobGranules(KeyRange(KeyRangeRef(startKey, endKey)), purgeVersion, {}));
|
||||
|
||||
fmt::print("Blob purge registered for [{0} - {1}) @ {2}\n", startKey.printable(), endKey.printable(), purgeVersion);
|
||||
|
||||
state Future<Void> printWarningActor = printAfterDelay(
|
||||
5.0, "Waiting for purge to complete. (interrupting this wait with CTRL+C will not cancel the purge)");
|
||||
wait(db->waitPurgeGranulesComplete(purgeKey));
|
||||
|
||||
fmt::print("Blob purge complete for [{0} - {1}) @ {2}\n", startKey.printable(), endKey.printable(), purgeVersion);
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> doBlobCheck(Database db, Key startKey, Key endKey, Optional<Version> version) {
|
||||
state Transaction tr(db);
|
||||
state Version readVersionOut = invalidVersion;
|
||||
state double elapsed = -timer_monotonic();
|
||||
loop {
|
||||
try {
|
||||
wait(success(tr.readBlobGranules(KeyRange(KeyRangeRef(startKey, endKey)), 0, version, &readVersionOut)));
|
||||
elapsed += timer_monotonic();
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
|
||||
fmt::print("Blob check complete for [{0} - {1}) @ {2} in {3:.6f} seconds\n",
|
||||
startKey.printable(),
|
||||
endKey.printable(),
|
||||
readVersionOut,
|
||||
elapsed);
|
||||
return Void();
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
namespace fdb_cli {
|
||||
|
@ -66,7 +129,7 @@ ACTOR Future<bool> blobRangeCommandActor(Database localDb,
|
|||
Optional<TenantMapEntry> tenantEntry,
|
||||
std::vector<StringRef> tokens) {
|
||||
// enables blob writing for the given range
|
||||
if (tokens.size() != 4) {
|
||||
if (tokens.size() != 4 && tokens.size() != 5) {
|
||||
printUsage(tokens[0]);
|
||||
return false;
|
||||
}
|
||||
|
@ -84,29 +147,60 @@ ACTOR Future<bool> blobRangeCommandActor(Database localDb,
|
|||
|
||||
if (end > LiteralStringRef("\xff")) {
|
||||
// TODO is this something we want?
|
||||
printf("Cannot blobbify system keyspace! Problematic End Key: %s\n", tokens[3].printable().c_str());
|
||||
fmt::print("Cannot blobbify system keyspace! Problematic End Key: {0}\n", tokens[3].printable());
|
||||
return false;
|
||||
} else if (tokens[2] >= tokens[3]) {
|
||||
printf("Invalid blob range [%s - %s)\n", tokens[2].printable().c_str(), tokens[3].printable().c_str());
|
||||
fmt::print("Invalid blob range [{0} - {1})\n", tokens[2].printable(), tokens[3].printable());
|
||||
} else {
|
||||
if (tokencmp(tokens[1], "start")) {
|
||||
printf("Starting blobbify range for [%s - %s)\n",
|
||||
tokens[2].printable().c_str(),
|
||||
tokens[3].printable().c_str());
|
||||
wait(setBlobRange(localDb, begin, end, LiteralStringRef("1")));
|
||||
} else if (tokencmp(tokens[1], "stop")) {
|
||||
printf("Stopping blobbify range for [%s - %s)\n",
|
||||
tokens[2].printable().c_str(),
|
||||
tokens[3].printable().c_str());
|
||||
wait(setBlobRange(localDb, begin, end, StringRef()));
|
||||
if (tokencmp(tokens[1], "start") || tokencmp(tokens[1], "stop")) {
|
||||
bool starting = tokencmp(tokens[1], "start");
|
||||
if (tokens.size() > 4) {
|
||||
printUsage(tokens[0]);
|
||||
return false;
|
||||
}
|
||||
fmt::print("{0} blobbify range for [{1} - {2})\n",
|
||||
starting ? "Starting" : "Stopping",
|
||||
tokens[2].printable().c_str(),
|
||||
tokens[3].printable().c_str());
|
||||
wait(setBlobRange(localDb, begin, end, starting ? LiteralStringRef("1") : StringRef()));
|
||||
} else if (tokencmp(tokens[1], "purge") || tokencmp(tokens[1], "check")) {
|
||||
bool purge = tokencmp(tokens[1], "purge");
|
||||
|
||||
Optional<Version> version;
|
||||
if (tokens.size() > 4) {
|
||||
Version v;
|
||||
int n = 0;
|
||||
if (sscanf(tokens[4].toString().c_str(), "%" PRId64 "%n", &v, &n) != 1 || n != tokens[4].size()) {
|
||||
printUsage(tokens[0]);
|
||||
return false;
|
||||
}
|
||||
version = v;
|
||||
}
|
||||
|
||||
fmt::print("{0} blob range [{1} - {2})",
|
||||
purge ? "Purging" : "Checking",
|
||||
tokens[2].printable(),
|
||||
tokens[3].printable());
|
||||
if (version.present()) {
|
||||
fmt::print(" @ {0}", version.get());
|
||||
}
|
||||
fmt::print("\n");
|
||||
|
||||
if (purge) {
|
||||
wait(doBlobPurge(localDb, begin, end, version));
|
||||
} else {
|
||||
wait(doBlobCheck(localDb, begin, end, version));
|
||||
}
|
||||
} else {
|
||||
printUsage(tokens[0]);
|
||||
printf("Usage: blobrange <start|stop> <startkey> <endkey>");
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
CommandFactory blobRangeFactory("blobrange", CommandHelp("blobrange <start|stop> <startkey> <endkey>", "", ""));
|
||||
CommandFactory blobRangeFactory("blobrange",
|
||||
CommandHelp("blobrange <start|stop|purge|check> <startkey> <endkey> [version]",
|
||||
"",
|
||||
""));
|
||||
} // namespace fdb_cli
|
||||
|
|
|
@ -82,7 +82,7 @@ void ClientKnobs::initialize(Randomize randomize) {
|
|||
init( METADATA_VERSION_CACHE_SIZE, 1000 );
|
||||
init( CHANGE_FEED_LOCATION_LIMIT, 10000 );
|
||||
init( CHANGE_FEED_CACHE_SIZE, 100000 ); if( randomize && BUGGIFY ) CHANGE_FEED_CACHE_SIZE = 1;
|
||||
init( CHANGE_FEED_POP_TIMEOUT, 5.0 );
|
||||
init( CHANGE_FEED_POP_TIMEOUT, 10.0 );
|
||||
init( CHANGE_FEED_STREAM_MIN_BYTES, 1e4 ); if( randomize && BUGGIFY ) CHANGE_FEED_STREAM_MIN_BYTES = 1;
|
||||
|
||||
init( MAX_BATCH_SIZE, 1000 ); if( randomize && BUGGIFY ) MAX_BATCH_SIZE = 1;
|
||||
|
|
|
@ -59,7 +59,7 @@ struct BlobWorkerStats {
|
|||
bytesReadFromFDBForInitialSnapshot("BytesReadFromFDBForInitialSnapshot", cc),
|
||||
bytesReadFromS3ForCompaction("BytesReadFromS3ForCompaction", cc),
|
||||
rangeAssignmentRequests("RangeAssignmentRequests", cc), readRequests("ReadRequests", cc),
|
||||
wrongShardServer("WrongShardServer", cc), changeFeedInputBytes("RangeFeedInputBytes", cc),
|
||||
wrongShardServer("WrongShardServer", cc), changeFeedInputBytes("ChangeFeedInputBytes", cc),
|
||||
readReqTotalFilesReturned("ReadReqTotalFilesReturned", cc),
|
||||
readReqDeltaBytesReturned("ReadReqDeltaBytesReturned", cc), commitVersionChecks("CommitVersionChecks", cc),
|
||||
granuleUpdateErrors("GranuleUpdateErrors", cc), granuleRequestTimeouts("GranuleRequestTimeouts", cc),
|
||||
|
|
Loading…
Reference in New Issue