Merge pull request #8819 from sfc-gh-huliu/fix
BlobManifest - add limits for getRange and transactions for resiliency
This commit is contained in:
commit
1bc6e4cc6f
|
@ -1025,6 +1025,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
|
|||
init( BLOB_MANIFEST_BACKUP_INTERVAL, isSimulated ? 5.0 : 30.0 );
|
||||
init( BLOB_FULL_RESTORE_MODE, false );
|
||||
init( BLOB_MIGRATOR_CHECK_INTERVAL, isSimulated ? 1.0 : 5.0);
|
||||
init( BLOB_MANIFEST_RW_ROWS, isSimulated ? 10 : 1000);
|
||||
|
||||
init( BGCC_TIMEOUT, isSimulated ? 10.0 : 120.0 );
|
||||
init( BGCC_MIN_INTERVAL, isSimulated ? 1.0 : 10.0 );
|
||||
|
|
|
@ -996,6 +996,7 @@ public:
|
|||
double BLOB_MANIFEST_BACKUP_INTERVAL;
|
||||
bool BLOB_FULL_RESTORE_MODE;
|
||||
double BLOB_MIGRATOR_CHECK_INTERVAL;
|
||||
int BLOB_MANIFEST_RW_ROWS;
|
||||
|
||||
// Blob metadata
|
||||
int64_t BLOB_METADATA_CACHE_TTL;
|
||||
|
|
|
@ -24,6 +24,7 @@
|
|||
|
||||
#include "fdbclient/BackupContainer.h"
|
||||
#include "fdbclient/BlobGranuleCommon.h"
|
||||
#include "fdbclient/ClientBooleanParams.h"
|
||||
#include "fdbserver/Knobs.h"
|
||||
#include "flow/FastRef.h"
|
||||
#include "flow/Trace.h"
|
||||
|
@ -137,10 +138,23 @@ private:
|
|||
blobRangeKeys // Key ranges managed by blob
|
||||
};
|
||||
for (auto range : ranges) {
|
||||
// todo use getRangeStream for better performance
|
||||
RangeResult result = wait(tr.getRange(range, GetRangeLimits::BYTE_LIMIT_UNLIMITED));
|
||||
for (auto& row : result) {
|
||||
rows.push_back_deep(rows.arena(), KeyValueRef(row.key, row.value));
|
||||
state GetRangeLimits limits(SERVER_KNOBS->BLOB_MANIFEST_RW_ROWS);
|
||||
limits.minRows = 0;
|
||||
state KeySelectorRef begin = firstGreaterOrEqual(range.begin);
|
||||
state KeySelectorRef end = firstGreaterOrEqual(range.end);
|
||||
loop {
|
||||
RangeResult result = wait(tr.getRange(begin, end, limits, Snapshot::True));
|
||||
for (auto& row : result) {
|
||||
rows.push_back_deep(rows.arena(), KeyValueRef(row.key, row.value));
|
||||
}
|
||||
if (!result.more) {
|
||||
break;
|
||||
}
|
||||
if (result.readThrough.present()) {
|
||||
begin = firstGreaterOrEqual(result.readThrough.get());
|
||||
} else {
|
||||
begin = firstGreaterThan(result.end()[-1].key);
|
||||
}
|
||||
}
|
||||
}
|
||||
return rows;
|
||||
|
@ -152,6 +166,13 @@ private:
|
|||
|
||||
// Write data to blob manifest file
|
||||
ACTOR static Future<Void> writeToFile(Reference<BlobManifestDumper> self, Value data) {
|
||||
static int32_t lastWrittenBytes = 0;
|
||||
if (data.size() == lastWrittenBytes) {
|
||||
dprint("Skip writting blob manifest with same size {}\n", lastWrittenBytes);
|
||||
return Void();
|
||||
}
|
||||
lastWrittenBytes = data.size();
|
||||
|
||||
state Reference<BackupContainerFileSystem> writer;
|
||||
state std::string fullPath;
|
||||
|
||||
|
@ -212,7 +233,7 @@ public:
|
|||
ACTOR static Future<Void> execute(Reference<BlobManifestLoader> self) {
|
||||
try {
|
||||
Value data = wait(readFromFile(self));
|
||||
Standalone<BlobManifest> manifest = decode(data);
|
||||
state Standalone<BlobManifest> manifest = decode(data);
|
||||
wait(writeSystemKeys(self, manifest.rows));
|
||||
BlobGranuleRestoreVersionVector _ = wait(listGranules(self));
|
||||
} catch (Error& e) {
|
||||
|
@ -231,13 +252,32 @@ public:
|
|||
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
|
||||
try {
|
||||
std::vector<KeyRangeRef> granules;
|
||||
state Standalone<VectorRef<KeyRef>> blobRanges;
|
||||
// Read all granules
|
||||
state GetRangeLimits limits(SERVER_KNOBS->BLOB_MANIFEST_RW_ROWS);
|
||||
limits.minRows = 0;
|
||||
state KeySelectorRef begin = firstGreaterOrEqual(blobGranuleMappingKeys.begin);
|
||||
state KeySelectorRef end = firstGreaterOrEqual(blobGranuleMappingKeys.end);
|
||||
loop {
|
||||
RangeResult rows = wait(tr.getRange(begin, end, limits, Snapshot::True));
|
||||
for (auto& row : rows) {
|
||||
blobRanges.push_back_deep(blobRanges.arena(), row.key);
|
||||
}
|
||||
if (!rows.more) {
|
||||
break;
|
||||
}
|
||||
if (rows.readThrough.present()) {
|
||||
begin = firstGreaterOrEqual(rows.readThrough.get());
|
||||
} else {
|
||||
begin = firstGreaterThan(rows.end()[-1].key);
|
||||
}
|
||||
}
|
||||
|
||||
// check each granule range
|
||||
state int i = 0;
|
||||
auto limit = GetRangeLimits::BYTE_LIMIT_UNLIMITED;
|
||||
state RangeResult blobRanges = wait(tr.getRange(blobGranuleMappingKeys, limit));
|
||||
for (i = 0; i < blobRanges.size() - 1; i++) {
|
||||
Key startKey = blobRanges[i].key.removePrefix(blobGranuleMappingKeys.begin);
|
||||
Key endKey = blobRanges[i + 1].key.removePrefix(blobGranuleMappingKeys.begin);
|
||||
Key startKey = blobRanges[i].removePrefix(blobGranuleMappingKeys.begin);
|
||||
Key endKey = blobRanges[i + 1].removePrefix(blobGranuleMappingKeys.begin);
|
||||
state KeyRange granuleRange = KeyRangeRef(startKey, endKey);
|
||||
try {
|
||||
Standalone<BlobGranuleRestoreVersion> granule = wait(getGranule(&tr, granuleRange));
|
||||
|
@ -300,17 +340,32 @@ private:
|
|||
|
||||
// Write system keys to database
|
||||
ACTOR static Future<Void> writeSystemKeys(Reference<BlobManifestLoader> self, VectorRef<KeyValueRef> rows) {
|
||||
state int start = 0;
|
||||
state int end = 0;
|
||||
for (start = 0; start < rows.size(); start = end) {
|
||||
end = std::min(start + SERVER_KNOBS->BLOB_MANIFEST_RW_ROWS, rows.size());
|
||||
wait(writeSystemKeys(self, rows, start, end));
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
||||
// Write system keys from start index to end(exclusive), so that we don't exceed the limit of transaction limit
|
||||
ACTOR static Future<Void> writeSystemKeys(Reference<BlobManifestLoader> self,
|
||||
VectorRef<KeyValueRef> rows,
|
||||
int start,
|
||||
int end) {
|
||||
state Transaction tr(self->db_);
|
||||
loop {
|
||||
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
|
||||
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
try {
|
||||
for (auto& row : rows) {
|
||||
tr.set(row.key, row.value);
|
||||
for (int i = start; i < end; ++i) {
|
||||
tr.set(rows[i].key, rows[i].value);
|
||||
}
|
||||
wait(tr.commit());
|
||||
dprint("Blob manifest loaded {} rows\n", rows.size());
|
||||
dprint("Blob manifest loaded rows from {} to {}\n", start, end);
|
||||
TraceEvent("BlobManifestLoader").detail("RowStart", start).detail("RowEnd", end);
|
||||
return Void();
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
|
@ -324,8 +379,7 @@ private:
|
|||
KeyRange historyKeyRange = blobGranuleHistoryKeyRangeFor(range);
|
||||
// reverse lookup so that the first row is the newest version
|
||||
state RangeResult results =
|
||||
wait(tr->getRange(historyKeyRange, GetRangeLimits::BYTE_LIMIT_UNLIMITED, Snapshot::False, Reverse::True));
|
||||
|
||||
wait(tr->getRange(historyKeyRange, GetRangeLimits::BYTE_LIMIT_UNLIMITED, Snapshot::True, Reverse::True));
|
||||
for (KeyValueRef row : results) {
|
||||
state KeyRange keyRange;
|
||||
state Version version;
|
||||
|
@ -367,24 +421,39 @@ private:
|
|||
|
||||
// List all files for given granule
|
||||
ACTOR static Future<std::vector<GranuleFileVersion>> listGranuleFiles(Transaction* tr, UID granuleID) {
|
||||
state std::vector<GranuleFileVersion> files;
|
||||
|
||||
state KeyRange fileKeyRange = blobGranuleFileKeyRangeFor(granuleID);
|
||||
RangeResult results = wait(tr->getRange(fileKeyRange, GetRangeLimits::BYTE_LIMIT_UNLIMITED));
|
||||
state GetRangeLimits limits(SERVER_KNOBS->BLOB_MANIFEST_RW_ROWS);
|
||||
limits.minRows = 0;
|
||||
state KeySelectorRef begin = firstGreaterOrEqual(fileKeyRange.begin);
|
||||
state KeySelectorRef end = firstGreaterOrEqual(fileKeyRange.end);
|
||||
loop {
|
||||
RangeResult results = wait(tr->getRange(begin, end, limits, Snapshot::True));
|
||||
for (auto& row : results) {
|
||||
UID gid;
|
||||
Version version;
|
||||
uint8_t fileType;
|
||||
Standalone<StringRef> filename;
|
||||
int64_t offset;
|
||||
int64_t length;
|
||||
int64_t fullFileLength;
|
||||
Optional<BlobGranuleCipherKeysMeta> cipherKeysMeta;
|
||||
|
||||
std::vector<GranuleFileVersion> files;
|
||||
for (auto& row : results) {
|
||||
UID gid;
|
||||
Version version;
|
||||
uint8_t fileType;
|
||||
Standalone<StringRef> filename;
|
||||
int64_t offset;
|
||||
int64_t length;
|
||||
int64_t fullFileLength;
|
||||
Optional<BlobGranuleCipherKeysMeta> cipherKeysMeta;
|
||||
|
||||
std::tie(gid, version, fileType) = decodeBlobGranuleFileKey(row.key);
|
||||
std::tie(filename, offset, length, fullFileLength, cipherKeysMeta) = decodeBlobGranuleFileValue(row.value);
|
||||
GranuleFileVersion vs = { version, fileType, filename.toString(), length };
|
||||
files.push_back(vs);
|
||||
std::tie(gid, version, fileType) = decodeBlobGranuleFileKey(row.key);
|
||||
std::tie(filename, offset, length, fullFileLength, cipherKeysMeta) =
|
||||
decodeBlobGranuleFileValue(row.value);
|
||||
GranuleFileVersion vs = { version, fileType, filename.toString(), length };
|
||||
files.push_back(vs);
|
||||
}
|
||||
if (!results.more) {
|
||||
break;
|
||||
}
|
||||
if (results.readThrough.present()) {
|
||||
begin = firstGreaterOrEqual(results.readThrough.get());
|
||||
} else {
|
||||
begin = firstGreaterThan(results.end()[-1].key);
|
||||
}
|
||||
}
|
||||
return files;
|
||||
}
|
||||
|
@ -466,12 +535,26 @@ ACTOR Future<bool> isFullRestoreMode(Database db, KeyRangeRef keys) {
|
|||
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
try {
|
||||
RangeResult ranges = wait(tr.getRange(blobRestoreCommandKeys, CLIENT_KNOBS->TOO_MANY));
|
||||
for (auto& r : ranges) {
|
||||
KeyRange keyRange = decodeBlobRestoreCommandKeyFor(r.key);
|
||||
if (keyRange.contains(keys)) {
|
||||
Standalone<BlobRestoreStatus> status = decodeBlobRestoreStatus(r.value);
|
||||
return status.progress < 100; // progress is less than 100
|
||||
state GetRangeLimits limits(SERVER_KNOBS->BLOB_MANIFEST_RW_ROWS);
|
||||
limits.minRows = 0;
|
||||
state KeySelectorRef begin = firstGreaterOrEqual(blobRestoreCommandKeys.begin);
|
||||
state KeySelectorRef end = firstGreaterOrEqual(blobRestoreCommandKeys.end);
|
||||
loop {
|
||||
RangeResult ranges = wait(tr.getRange(begin, end, limits, Snapshot::True));
|
||||
for (auto& r : ranges) {
|
||||
KeyRange keyRange = decodeBlobRestoreCommandKeyFor(r.key);
|
||||
if (keyRange.contains(keys)) {
|
||||
Standalone<BlobRestoreStatus> status = decodeBlobRestoreStatus(r.value);
|
||||
return status.progress < 100; // progress is less than 100
|
||||
}
|
||||
}
|
||||
if (!ranges.more) {
|
||||
break;
|
||||
}
|
||||
if (ranges.readThrough.present()) {
|
||||
begin = firstGreaterOrEqual(ranges.readThrough.get());
|
||||
} else {
|
||||
begin = firstGreaterThan(ranges.end()[-1].key);
|
||||
}
|
||||
}
|
||||
return false;
|
||||
|
|
Loading…
Reference in New Issue