diff --git a/fdbcli/BlobRangeCommand.actor.cpp b/fdbcli/BlobRangeCommand.actor.cpp index 02b922c4a8..66cee7b0b7 100644 --- a/fdbcli/BlobRangeCommand.actor.cpp +++ b/fdbcli/BlobRangeCommand.actor.cpp @@ -58,6 +58,69 @@ ACTOR Future setBlobRange(Database db, Key startKey, Key endKey, Value val } } +ACTOR Future getLatestReadVersion(Database db) { + state Transaction tr(db); + loop { + try { + Version rv = wait(tr.getReadVersion()); + fmt::print("Resolved latest read version as {0}\n", rv); + return rv; + } catch (Error& e) { + wait(tr.onError(e)); + } + } +} + +// print after delay if not cancelled +ACTOR Future printAfterDelay(double delaySeconds, std::string message) { + wait(delay(delaySeconds)); + fmt::print("{}\n", message); + return Void(); +} + +ACTOR Future doBlobPurge(Database db, Key startKey, Key endKey, Optional version) { + state Version purgeVersion; + if (version.present()) { + purgeVersion = version.get(); + } else { + wait(store(purgeVersion, getLatestReadVersion(db))); + } + + state Key purgeKey = wait(db->purgeBlobGranules(KeyRange(KeyRangeRef(startKey, endKey)), purgeVersion, {})); + + fmt::print("Blob purge registered for [{0} - {1}) @ {2}\n", startKey.printable(), endKey.printable(), purgeVersion); + + state Future printWarningActor = printAfterDelay( + 5.0, "Waiting for purge to complete. (interrupting this wait with CTRL+C will not cancel the purge)"); + wait(db->waitPurgeGranulesComplete(purgeKey)); + + fmt::print("Blob purge complete for [{0} - {1}) @ {2}\n", startKey.printable(), endKey.printable(), purgeVersion); + + return Void(); +} + +ACTOR Future doBlobCheck(Database db, Key startKey, Key endKey, Optional version) { + state Transaction tr(db); + state Version readVersionOut = invalidVersion; + state double elapsed = -timer_monotonic(); + loop { + try { + wait(success(tr.readBlobGranules(KeyRange(KeyRangeRef(startKey, endKey)), 0, version, &readVersionOut))); + elapsed += timer_monotonic(); + break; + } catch (Error& e) { + wait(tr.onError(e)); + } + } + + fmt::print("Blob check complete for [{0} - {1}) @ {2} in {3:.6f} seconds\n", + startKey.printable(), + endKey.printable(), + readVersionOut, + elapsed); + return Void(); +} + } // namespace namespace fdb_cli { @@ -66,7 +129,7 @@ ACTOR Future blobRangeCommandActor(Database localDb, Optional tenantEntry, std::vector tokens) { // enables blob writing for the given range - if (tokens.size() != 4) { + if (tokens.size() != 4 && tokens.size() != 5) { printUsage(tokens[0]); return false; } @@ -84,29 +147,60 @@ ACTOR Future blobRangeCommandActor(Database localDb, if (end > LiteralStringRef("\xff")) { // TODO is this something we want? - printf("Cannot blobbify system keyspace! Problematic End Key: %s\n", tokens[3].printable().c_str()); + fmt::print("Cannot blobbify system keyspace! Problematic End Key: {0}\n", tokens[3].printable()); return false; } else if (tokens[2] >= tokens[3]) { - printf("Invalid blob range [%s - %s)\n", tokens[2].printable().c_str(), tokens[3].printable().c_str()); + fmt::print("Invalid blob range [{0} - {1})\n", tokens[2].printable(), tokens[3].printable()); } else { - if (tokencmp(tokens[1], "start")) { - printf("Starting blobbify range for [%s - %s)\n", - tokens[2].printable().c_str(), - tokens[3].printable().c_str()); - wait(setBlobRange(localDb, begin, end, LiteralStringRef("1"))); - } else if (tokencmp(tokens[1], "stop")) { - printf("Stopping blobbify range for [%s - %s)\n", - tokens[2].printable().c_str(), - tokens[3].printable().c_str()); - wait(setBlobRange(localDb, begin, end, StringRef())); + if (tokencmp(tokens[1], "start") || tokencmp(tokens[1], "stop")) { + bool starting = tokencmp(tokens[1], "start"); + if (tokens.size() > 4) { + printUsage(tokens[0]); + return false; + } + fmt::print("{0} blobbify range for [{1} - {2})\n", + starting ? "Starting" : "Stopping", + tokens[2].printable().c_str(), + tokens[3].printable().c_str()); + wait(setBlobRange(localDb, begin, end, starting ? LiteralStringRef("1") : StringRef())); + } else if (tokencmp(tokens[1], "purge") || tokencmp(tokens[1], "check")) { + bool purge = tokencmp(tokens[1], "purge"); + + Optional version; + if (tokens.size() > 4) { + Version v; + int n = 0; + if (sscanf(tokens[4].toString().c_str(), "%" PRId64 "%n", &v, &n) != 1 || n != tokens[4].size()) { + printUsage(tokens[0]); + return false; + } + version = v; + } + + fmt::print("{0} blob range [{1} - {2})", + purge ? "Purging" : "Checking", + tokens[2].printable(), + tokens[3].printable()); + if (version.present()) { + fmt::print(" @ {0}", version.get()); + } + fmt::print("\n"); + + if (purge) { + wait(doBlobPurge(localDb, begin, end, version)); + } else { + wait(doBlobCheck(localDb, begin, end, version)); + } } else { printUsage(tokens[0]); - printf("Usage: blobrange "); return false; } } return true; } -CommandFactory blobRangeFactory("blobrange", CommandHelp("blobrange ", "", "")); +CommandFactory blobRangeFactory("blobrange", + CommandHelp("blobrange [version]", + "", + "")); } // namespace fdb_cli diff --git a/fdbclient/ClientKnobs.cpp b/fdbclient/ClientKnobs.cpp index 29316ed2b8..07e9d819ff 100644 --- a/fdbclient/ClientKnobs.cpp +++ b/fdbclient/ClientKnobs.cpp @@ -82,7 +82,7 @@ void ClientKnobs::initialize(Randomize randomize) { init( METADATA_VERSION_CACHE_SIZE, 1000 ); init( CHANGE_FEED_LOCATION_LIMIT, 10000 ); init( CHANGE_FEED_CACHE_SIZE, 100000 ); if( randomize && BUGGIFY ) CHANGE_FEED_CACHE_SIZE = 1; - init( CHANGE_FEED_POP_TIMEOUT, 5.0 ); + init( CHANGE_FEED_POP_TIMEOUT, 10.0 ); init( CHANGE_FEED_STREAM_MIN_BYTES, 1e4 ); if( randomize && BUGGIFY ) CHANGE_FEED_STREAM_MIN_BYTES = 1; init( MAX_BATCH_SIZE, 1000 ); if( randomize && BUGGIFY ) MAX_BATCH_SIZE = 1; diff --git a/fdbclient/include/fdbclient/BlobWorkerCommon.h b/fdbclient/include/fdbclient/BlobWorkerCommon.h index 9912d4d2c7..0535301427 100644 --- a/fdbclient/include/fdbclient/BlobWorkerCommon.h +++ b/fdbclient/include/fdbclient/BlobWorkerCommon.h @@ -59,7 +59,7 @@ struct BlobWorkerStats { bytesReadFromFDBForInitialSnapshot("BytesReadFromFDBForInitialSnapshot", cc), bytesReadFromS3ForCompaction("BytesReadFromS3ForCompaction", cc), rangeAssignmentRequests("RangeAssignmentRequests", cc), readRequests("ReadRequests", cc), - wrongShardServer("WrongShardServer", cc), changeFeedInputBytes("RangeFeedInputBytes", cc), + wrongShardServer("WrongShardServer", cc), changeFeedInputBytes("ChangeFeedInputBytes", cc), readReqTotalFilesReturned("ReadReqTotalFilesReturned", cc), readReqDeltaBytesReturned("ReadReqDeltaBytesReturned", cc), commitVersionChecks("CommitVersionChecks", cc), granuleUpdateErrors("GranuleUpdateErrors", cc), granuleRequestTimeouts("GranuleRequestTimeouts", cc),