Truncate logs after force-flushing cold blob granules

This commit is contained in:
Hui Liu 2023-02-01 13:56:07 -08:00
parent 2fe32610d6
commit aa1d983132
9 changed files with 122 additions and 18 deletions

View File

@ -1060,6 +1060,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( BLOB_RESTORE_MANIFEST_URL, isSimulated ? "file://simfdb/fdbblob/manifest" : "" ); init( BLOB_RESTORE_MANIFEST_URL, isSimulated ? "file://simfdb/fdbblob/manifest" : "" );
init( BLOB_RESTORE_MANIFEST_FILE_MAX_SIZE, isSimulated ? 10000 : 10000000 ); init( BLOB_RESTORE_MANIFEST_FILE_MAX_SIZE, isSimulated ? 10000 : 10000000 );
init( BLOB_RESTORE_MANIFEST_RETENTION_MAX, 10 ); init( BLOB_RESTORE_MANIFEST_RETENTION_MAX, 10 );
init( BLOB_RESTORE_MLOGS_RETENTION_SECS, isSimulated ? 120 : 3600 * 24 * 14 );
init( BGCC_TIMEOUT, isSimulated ? 10.0 : 120.0 ); init( BGCC_TIMEOUT, isSimulated ? 10.0 : 120.0 );
init( BGCC_MIN_INTERVAL, isSimulated ? 1.0 : 10.0 ); init( BGCC_MIN_INTERVAL, isSimulated ? 1.0 : 10.0 );

View File

@ -1742,6 +1742,7 @@ Standalone<BlobRestoreArg> decodeBlobRestoreArg(ValueRef const& value) {
} }
const Key blobManifestVersionKey = "\xff\x02/blobManifestVersion"_sr; const Key blobManifestVersionKey = "\xff\x02/blobManifestVersion"_sr;
const Key blobGranulesLastFlushKey = "\xff\x02/blobGranulesLastFlushTs"_sr;
const KeyRangeRef idempotencyIdKeys("\xff\x02/idmp/"_sr, "\xff\x02/idmp0"_sr); const KeyRangeRef idempotencyIdKeys("\xff\x02/idmp/"_sr, "\xff\x02/idmp0"_sr);
const KeyRef idempotencyIdsExpiredVersion("\xff\x02/idmpExpiredVersion"_sr); const KeyRef idempotencyIdsExpiredVersion("\xff\x02/idmpExpiredVersion"_sr);

View File

@ -1039,6 +1039,7 @@ public:
std::string BLOB_RESTORE_MANIFEST_URL; std::string BLOB_RESTORE_MANIFEST_URL;
int BLOB_RESTORE_MANIFEST_FILE_MAX_SIZE; int BLOB_RESTORE_MANIFEST_FILE_MAX_SIZE;
int BLOB_RESTORE_MANIFEST_RETENTION_MAX; int BLOB_RESTORE_MANIFEST_RETENTION_MAX;
int BLOB_RESTORE_MLOGS_RETENTION_SECS;
// Blob metadata // Blob metadata
int64_t BLOB_METADATA_CACHE_TTL; int64_t BLOB_METADATA_CACHE_TTL;

View File

@ -733,6 +733,7 @@ const KeyRange decodeBlobRestoreArgKeyFor(const KeyRef key);
const Value blobRestoreArgValueFor(BlobRestoreArg args); const Value blobRestoreArgValueFor(BlobRestoreArg args);
Standalone<BlobRestoreArg> decodeBlobRestoreArg(ValueRef const& value); Standalone<BlobRestoreArg> decodeBlobRestoreArg(ValueRef const& value);
extern const Key blobManifestVersionKey; extern const Key blobManifestVersionKey;
extern const Key blobGranulesLastFlushKey;
extern const KeyRangeRef idempotencyIdKeys; extern const KeyRangeRef idempotencyIdKeys;
extern const KeyRef idempotencyIdsExpiredVersion; extern const KeyRef idempotencyIdsExpiredVersion;

View File

@ -19,14 +19,20 @@
*/ */
#include <algorithm> #include <algorithm>
#include <ctime>
#include <limits> #include <limits>
#include <sstream> #include <sstream>
#include <queue> #include <queue>
#include <vector> #include <vector>
#include <unordered_map> #include <unordered_map>
#include "fdbclient/BackupContainer.h"
#include "fdbclient/ClientBooleanParams.h"
#include "fdbclient/KeyBackedTypes.h"
#include "fdbclient/ServerKnobs.h" #include "fdbclient/ServerKnobs.h"
#include "fdbrpc/simulator.h" #include "fdbrpc/simulator.h"
#include "flow/CodeProbe.h"
#include "flow/serialize.h"
#include "fmt/format.h" #include "fmt/format.h"
#include "fdbclient/BackupContainerFileSystem.h" #include "fdbclient/BackupContainerFileSystem.h"
#include "fdbclient/BlobGranuleCommon.h" #include "fdbclient/BlobGranuleCommon.h"
@ -38,6 +44,7 @@
#include "fdbclient/ReadYourWrites.h" #include "fdbclient/ReadYourWrites.h"
#include "fdbclient/SystemData.h" #include "fdbclient/SystemData.h"
#include "fdbclient/Tuple.h" #include "fdbclient/Tuple.h"
#include "fdbclient/BackupAgent.actor.h"
#include "fdbserver/BlobManagerInterface.h" #include "fdbserver/BlobManagerInterface.h"
#include "fdbserver/Knobs.h" #include "fdbserver/Knobs.h"
#include "fdbserver/BlobGranuleValidation.actor.h" #include "fdbserver/BlobGranuleValidation.actor.h"
@ -5243,6 +5250,91 @@ ACTOR Future<Void> bgConsistencyCheck(Reference<BlobManagerData> bmData) {
} }
} }
ACTOR Future<Void> updateLastFlushTs(Database db) {
state Transaction tr(db);
loop {
try {
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
KeyBackedProperty<int64_t> lastFlushTs(blobGranulesLastFlushKey);
int64_t epochs = (int64_t)now();
lastFlushTs.set(&tr, epochs);
wait(tr.commit());
return Void();
} catch (Error& e) {
fmt::print("updateLastFlushTs {} \n", e.what());
wait(tr.onError(e));
}
}
}
ACTOR Future<int64_t> getLastFlushTs(Database db) {
state Transaction tr(db);
loop {
try {
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
KeyBackedProperty<int64_t> lastFlushTs(blobGranulesLastFlushKey);
state int64_t ret;
wait(store(ret, lastFlushTs.getD(&tr, Snapshot::False, 0)));
return ret;
} catch (Error& e) {
wait(tr.onError(e));
}
}
}
ACTOR Future<Void> maybeFlushAndTruncateMutationLogs(Reference<BlobManagerData> bmData) {
try {
int64_t lastFlushTs = wait(getLastFlushTs(bmData->db));
bool shouldFlush = (now() - lastFlushTs) > SERVER_KNOBS->BLOB_RESTORE_MLOGS_RETENTION_SECS;
if (!shouldFlush) {
TraceEvent("SkipBlobGranulesFlush").detail("LastFlushTs", lastFlushTs);
return Void();
}
state std::string mlogsUrl = wait(getMutationLogUrl());
state Reference<IBackupContainer> bc = IBackupContainer::openContainer(mlogsUrl, {}, {});
state BackupDescription desc = wait(bc->describeBackup(true));
if (!desc.contiguousLogEnd.present()) {
TraceEvent("SkipBlobGranulesFlush").detail("LogUrl", mlogsUrl);
return Void(); // skip truncation if no valid backup for mutation logs
}
state Version logEndVersion = desc.contiguousLogEnd.get();
// Force flush in-memory data of all blob workers until end of log
FlushGranuleRequest req(bmData->epoch, normalKeys, logEndVersion, false);
wait(success(doBlobGranuleRequests(bmData->db, normalKeys, req, &BlobWorkerInterface::flushGranuleRequest)));
wait(updateLastFlushTs(bmData->db));
// Truncate mutation logs to max retention period
Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(bmData->db));
Optional<int64_t> logEndEpochs = wait(timeKeeperEpochsFromVersion(logEndVersion, tr));
if (!logEndEpochs.present()) {
TraceEvent("SkipMutationLogTruncation").detail("LogEndVersion", logEndVersion);
return Void(); // skip truncation if no timestamp about log end
}
// Find timestamp and version to truncate
int64_t epochs = logEndEpochs.get() - SERVER_KNOBS->BLOB_RESTORE_MLOGS_RETENTION_SECS;
if (epochs <= 0) {
TraceEvent("SkipMutationLogTruncation").detail("Epochs", epochs);
return Void();
}
state std::string timestamp = BackupAgentBase::formatTime(epochs);
state Version truncVersion = wait(timeKeeperVersionFromDatetime(timestamp, bmData->db));
wait(bc->expireData(truncVersion, true));
TraceEvent("TruncateMutationLogs").detail("Version", truncVersion).detail("Timestamp", timestamp);
CODE_PROBE(true, "Flush blob granules and truncate mutation logs");
} catch (Error& e) {
TraceEvent("TruncateMutationLogsError").error(e); // skip and retry next time
}
return Void();
}
ACTOR Future<Void> backupManifest(Reference<BlobManagerData> bmData) { ACTOR Future<Void> backupManifest(Reference<BlobManagerData> bmData) {
bmData->initBStore(); bmData->initBStore();
@ -5256,8 +5348,9 @@ ACTOR Future<Void> backupManifest(Reference<BlobManagerData> bmData) {
break; break;
} }
} }
if (activeRanges) { if (activeRanges && SERVER_KNOBS->BLOB_MANIFEST_BACKUP) {
if (bmData->manifestStore.isValid()) { if (bmData->manifestStore.isValid()) {
wait(maybeFlushAndTruncateMutationLogs(bmData));
wait(dumpManifest(bmData->db, bmData->manifestStore, bmData->epoch, bmData->manifestDumperSeqNo)); wait(dumpManifest(bmData->db, bmData->manifestStore, bmData->epoch, bmData->manifestDumperSeqNo));
bmData->manifestDumperSeqNo++; bmData->manifestDumperSeqNo++;
} else { } else {

View File

@ -260,6 +260,7 @@ public:
self->segmentNo_ = 1; self->segmentNo_ = 1;
self->totalRows_ = 0; self->totalRows_ = 0;
self->logicalSize_ = 0; self->logicalSize_ = 0;
self->totalBytes_ = 0;
wait(waitForAll(self->pendingFutures_)); wait(waitForAll(self->pendingFutures_));
self->pendingFutures_.clear(); self->pendingFutures_.clear();
self->deleteSegmentFiles(); self->deleteSegmentFiles();
@ -999,3 +1000,16 @@ ACTOR Future<Version> getManifestVersion(Database db) {
} }
} }
} }
ACTOR Future<std::string> getMutationLogUrl() {
state std::string baseUrl = SERVER_KNOBS->BLOB_RESTORE_MLOGS_URL;
if (baseUrl.starts_with("file://")) {
state std::vector<std::string> containers = wait(IBackupContainer::listContainers(baseUrl, {}));
if (containers.size() == 0) {
throw blob_restore_missing_logs();
}
return containers.back();
} else {
return baseUrl;
}
}

View File

@ -329,27 +329,15 @@ private:
// Apply mutation logs to blob granules so that they reach to a consistent version for all blob granules // Apply mutation logs to blob granules so that they reach to a consistent version for all blob granules
ACTOR static Future<Void> applyMutationLogs(Reference<BlobMigrator> self) { ACTOR static Future<Void> applyMutationLogs(Reference<BlobMigrator> self) {
// check last version in mutation logs // check last version in mutation logs
state std::string mlogsUrl = wait(getMutationLogUrl());
state std::string baseUrl = SERVER_KNOBS->BLOB_RESTORE_MLOGS_URL;
state std::string mlogsUrl;
if (baseUrl.starts_with("file://")) {
state std::vector<std::string> containers = wait(IBackupContainer::listContainers(baseUrl, {}));
if (containers.size() == 0) {
TraceEvent("MissingMutationLogs", self->interf_.id()).detail("Url", baseUrl);
throw restore_missing_data();
}
mlogsUrl = containers.front();
} else {
mlogsUrl = baseUrl;
}
state Reference<IBackupContainer> bc = IBackupContainer::openContainer(mlogsUrl, {}, {}); state Reference<IBackupContainer> bc = IBackupContainer::openContainer(mlogsUrl, {}, {});
BackupDescription desc = wait(bc->describeBackup()); BackupDescription desc = wait(bc->describeBackup());
if (!desc.contiguousLogEnd.present()) { if (!desc.contiguousLogEnd.present()) {
TraceEvent("InvalidMutationLogs").detail("Url", baseUrl); TraceEvent("InvalidMutationLogs").detail("Url", SERVER_KNOBS->BLOB_RESTORE_MLOGS_URL);
throw blob_restore_missing_logs(); throw blob_restore_missing_logs();
} }
if (!desc.minLogBegin.present()) { if (!desc.minLogBegin.present()) {
TraceEvent("InvalidMutationLogs").detail("Url", baseUrl); TraceEvent("InvalidMutationLogs").detail("Url", SERVER_KNOBS->BLOB_RESTORE_MLOGS_URL);
throw blob_restore_missing_logs(); throw blob_restore_missing_logs();
} }
state Version minLogVersion = desc.minLogBegin.get(); state Version minLogVersion = desc.minLogBegin.get();

View File

@ -171,6 +171,7 @@ ACTOR Future<Optional<BlobRestoreStatus>> getRestoreStatus(Database db, KeyRange
ACTOR Future<Optional<BlobRestoreArg>> getRestoreArg(Database db, KeyRangeRef range); ACTOR Future<Optional<BlobRestoreArg>> getRestoreArg(Database db, KeyRangeRef range);
ACTOR Future<Version> getRestoreTargetVersion(Database db, KeyRangeRef range, Version defaultVersion); ACTOR Future<Version> getRestoreTargetVersion(Database db, KeyRangeRef range, Version defaultVersion);
ACTOR Future<Version> getManifestVersion(Database db); ACTOR Future<Version> getManifestVersion(Database db);
ACTOR Future<std::string> getMutationLogUrl();
#include "flow/unactorcompiler.h" #include "flow/unactorcompiler.h"
#endif #endif

View File

@ -25,7 +25,6 @@
#include "fdbclient/BackupContainer.h" #include "fdbclient/BackupContainer.h"
#include "fdbclient/BackupContainerFileSystem.h" #include "fdbclient/BackupContainerFileSystem.h"
#include "fdbclient/FDBTypes.h" #include "fdbclient/FDBTypes.h"
#include "fdbclient/Knobs.h"
#include "fdbclient/SystemData.h" #include "fdbclient/SystemData.h"
#include "fdbserver/workloads/workloads.actor.h" #include "fdbserver/workloads/workloads.actor.h"
#include "fdbserver/BlobGranuleServerCommon.actor.h" #include "fdbserver/BlobGranuleServerCommon.actor.h"
@ -73,6 +72,10 @@ struct BlobRestoreWorkload : TestWorkload {
if (self->performRestore_) { if (self->performRestore_) {
fmt::print("Perform blob restore\n"); fmt::print("Perform blob restore\n");
// disable manifest backup and log truncation
KnobValueRef knobFalse = KnobValueRef::create(bool{ false });
IKnobCollection::getMutableGlobalKnobCollection().setKnob("blob_manifest_backup", knobFalse);
wait(store(result, self->extraDb_->blobRestore(normalKeys, {}))); wait(store(result, self->extraDb_->blobRestore(normalKeys, {})));
state std::vector<Future<Void>> futures; state std::vector<Future<Void>> futures;
@ -178,8 +181,9 @@ struct BlobRestoreWorkload : TestWorkload {
if (src[i].value != dest[i].value) { if (src[i].value != dest[i].value) {
fmt::print("Value mismatch at {}\n", i); fmt::print("Value mismatch at {}\n", i);
TraceEvent(SevError, "TestFailure") TraceEvent(SevError, "TestFailure")
.detail("Reason", "Key Mismatch") .detail("Reason", "Value Mismatch")
.detail("Index", i) .detail("Index", i)
.detail("Key", src[i].key.printable())
.detail("SrcValue", src[i].value.printable()) .detail("SrcValue", src[i].value.printable())
.detail("DestValue", dest[i].value.printable()); .detail("DestValue", dest[i].value.printable());
return false; return false;