Merge pull request #3777 from apple/release-6.3

Merge Release 6.3 into master

commit a3c861b59d
@@ -59,11 +59,14 @@ else()
   set(ROCKSDB_LIBRARIES
     ${BINARY_DIR}/librocksdb.a)
 
+  ExternalProject_Get_Property(rocksdb SOURCE_DIR)
+  set (ROCKSDB_INCLUDE_DIR "${SOURCE_DIR}/include")
+
   set(ROCKSDB_FOUND TRUE)
 endif()
 
 message(STATUS "Found RocksDB library: ${ROCKSDB_LIBRARIES}")
-message(STATUS "Found RocksDB includes: ${ROCKSDB_INCLUDE_DIRS}")
+message(STATUS "Found RocksDB includes: ${ROCKSDB_INCLUDE_DIR}")
 
 mark_as_advanced(
   ROCKSDB_LIBRARIES
@@ -12,7 +12,7 @@ endif()
 # SSL
 ################################################################################
-
+include(CheckSymbolExists)
 
 set(DISABLE_TLS OFF CACHE BOOL "Don't try to find OpenSSL and always build without TLS support")
 if(DISABLE_TLS)
   set(WITH_TLS OFF)
@@ -107,7 +107,9 @@ endif()
 ################################################################################
 
 set(SSD_ROCKSDB_EXPERIMENTAL OFF CACHE BOOL "Build with experimental RocksDB support")
-if (SSD_ROCKSDB_EXPERIMENTAL)
+# RocksDB is currently enabled by default for GCC but does not build with the latest
+# Clang.
+if (SSD_ROCKSDB_EXPERIMENTAL OR GCC)
   set(WITH_ROCKSDB_EXPERIMENTAL ON)
 else()
   set(WITH_ROCKSDB_EXPERIMENTAL OFF)
@@ -958,5 +958,7 @@ Value makePadding(int size);
 ACTOR Future<Void> transformRestoredDatabase(Database cx, Standalone<VectorRef<KeyRangeRef>> backupRanges,
                                              Key addPrefix, Key removePrefix);
 
+void simulateBlobFailure();
+
 #include "flow/unactorcompiler.h"
 #endif
@@ -1343,19 +1343,44 @@ public:
 
     ACTOR static Future<KeyRange> getSnapshotFileKeyRange_impl(Reference<BackupContainerFileSystem> bc,
                                                                RangeFile file) {
-        state Reference<IAsyncFile> inFile = wait(bc->readFile(file.fileName));
+        state int readFileRetries = 0;
         state bool beginKeySet = false;
         state Key beginKey;
         state Key endKey;
-        state int64_t j = 0;
-        for (; j < file.fileSize; j += file.blockSize) {
-            int64_t len = std::min<int64_t>(file.blockSize, file.fileSize - j);
-            Standalone<VectorRef<KeyValueRef>> blockData = wait(fileBackup::decodeRangeFileBlock(inFile, j, len));
-            if (!beginKeySet) {
-                beginKey = blockData.front().key;
-                beginKeySet = true;
+        loop {
+            try {
+                state Reference<IAsyncFile> inFile = wait(bc->readFile(file.fileName));
+                beginKeySet = false;
+                state int64_t j = 0;
+                for (; j < file.fileSize; j += file.blockSize) {
+                    int64_t len = std::min<int64_t>(file.blockSize, file.fileSize - j);
+                    Standalone<VectorRef<KeyValueRef>> blockData =
+                        wait(fileBackup::decodeRangeFileBlock(inFile, j, len));
+                    if (!beginKeySet) {
+                        beginKey = blockData.front().key;
+                        beginKeySet = true;
+                    }
+                    endKey = blockData.back().key;
+                }
+                break;
+            } catch (Error& e) {
+                if (e.code() == error_code_restore_bad_read ||
+                    e.code() == error_code_restore_unsupported_file_version ||
+                    e.code() == error_code_restore_corrupted_data_padding) { // non-retryable error
+                    TraceEvent(SevError, "BackupContainerGetSnapshotFileKeyRange").error(e);
+                    throw;
+                } else if (e.code() == error_code_http_request_failed || e.code() == error_code_connection_failed ||
+                           e.code() == error_code_timed_out || e.code() == error_code_lookup_failed) {
+                    // blob http request failure, retry
+                    TraceEvent(SevWarnAlways, "BackupContainerGetSnapshotFileKeyRangeConnectionFailure")
+                        .detail("Retries", ++readFileRetries)
+                        .error(e);
+                    wait(delayJittered(0.1));
+                } else {
+                    TraceEvent(SevError, "BackupContainerGetSnapshotFileKeyRangeUnexpectedError").error(e);
+                    throw;
+                }
+            }
-            endKey = blockData.back().key;
         }
         return KeyRange(KeyRangeRef(beginKey, endKey));
     }
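The retry loop above re-opens the file and resets beginKeySet at the top of every attempt, so no partially-read state leaks from a failed pass into the next one. Errors are split three ways: corruption errors (restore_bad_read, restore_unsupported_file_version, restore_corrupted_data_padding) rethrow immediately, transient blob/HTTP errors retry after a jittered delay, and anything else is logged and rethrown. A minimal standalone sketch of the same classification in plain C++ (Err, ErrCode, and retryTransient are illustrative stand-ins, not Flow APIs):

#include <iostream>
#include <stdexcept>

// Illustrative stand-ins for the error codes used in the hunk above.
enum class ErrCode { RestoreBadRead, RestoreCorruptedPadding, HttpRequestFailed,
                     ConnectionFailed, TimedOut, LookupFailed, Other };

struct Err : std::runtime_error {
    ErrCode code;
    Err(ErrCode c, const char* w) : std::runtime_error(w), code(c) {}
};

// Mirrors the catch block: only transient blob/HTTP failures are retried.
static bool isRetryable(ErrCode c) {
    return c == ErrCode::HttpRequestFailed || c == ErrCode::ConnectionFailed ||
           c == ErrCode::TimedOut || c == ErrCode::LookupFailed;
}

template <typename F>
auto retryTransient(F attempt) {
    int retries = 0;
    for (;;) {
        try {
            return attempt(); // all per-attempt state lives inside `attempt`
        } catch (const Err& e) {
            if (!isRetryable(e.code)) throw; // corruption or unexpected: propagate
            std::cerr << "transient failure, retry " << ++retries << "\n";
            // the actor waits delayJittered(0.1) here before the next pass
        }
    }
}

int main() {
    int calls = 0;
    int v = retryTransient([&] {
        if (++calls < 3) throw Err(ErrCode::TimedOut, "timed_out");
        return 42;
    });
    std::cout << v << " after " << calls << " calls\n";
    return 0;
}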
@@ -563,7 +563,9 @@ namespace fileBackup {
         if(rLen != len)
             throw restore_bad_read();
 
-        Standalone<VectorRef<KeyValueRef>> results({}, buf.arena());
+        simulateBlobFailure();
+
+        Standalone<VectorRef<KeyValueRef>> results({}, buf.arena());
         state StringRefReader reader(buf, restore_corrupted_data());
 
         try {
@@ -603,17 +605,17 @@ namespace fileBackup {
             if(b != 0xFF)
                 throw restore_corrupted_data_padding();
 
-        return results;
+            return results;
 
         } catch(Error &e) {
-            TraceEvent(SevWarn, "FileRestoreCorruptRangeFileBlock")
-                .error(e)
-                .detail("Filename", file->getFilename())
-                .detail("BlockOffset", offset)
-                .detail("BlockLen", len)
-                .detail("ErrorRelativeOffset", reader.rptr - buf.begin())
-                .detail("ErrorAbsoluteOffset", reader.rptr - buf.begin() + offset);
-            throw;
+            TraceEvent(SevWarn, "FileRestoreDecodeRangeFileBlockFailed")
+                .error(e)
+                .detail("Filename", file->getFilename())
+                .detail("BlockOffset", offset)
+                .detail("BlockLen", len)
+                .detail("ErrorRelativeOffset", reader.rptr - buf.begin())
+                .detail("ErrorAbsoluteOffset", reader.rptr - buf.begin() + offset);
+            throw;
         }
     }
@@ -5013,3 +5015,18 @@ ACTOR Future<Void> transformRestoredDatabase(Database cx, Standalone<VectorRef<K
 
     return Void();
 }
+
+void simulateBlobFailure() {
+    if (BUGGIFY && deterministicRandom()->random01() < 0.01) { // Simulate blob failures
+        double i = deterministicRandom()->random01();
+        if (i < 0.5) {
+            throw http_request_failed();
+        } else if (i < 0.7) {
+            throw connection_failed();
+        } else if (i < 0.8) {
+            throw timed_out();
+        } else if (i < 0.9) {
+            throw lookup_failed();
+        }
+    }
+}
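Given BUGGIFY is enabled, each call injects a failure with probability 0.01; conditioned on injection, the cumulative thresholds partition random01() into 50% http_request_failed, 20% connection_failed, 10% timed_out, 10% lookup_failed, and a remaining 10% (i >= 0.9) where nothing is thrown. These are exactly the transient errors the new retry wrappers handle, so simulation exercises the retry path. A standalone sketch of the same cumulative-threshold partition (std::mt19937 stands in for deterministicRandom()):

#include <random>
#include <cstdio>

int main() {
    std::mt19937 rng(42);
    std::uniform_real_distribution<double> u(0.0, 1.0);
    int counts[5] = {0, 0, 0, 0, 0}; // http, connection, timed_out, lookup, none
    for (int n = 0; n < 100000; ++n) {
        double i = u(rng);
        if (i < 0.5)      ++counts[0]; // 50% http_request_failed
        else if (i < 0.7) ++counts[1]; // 20% connection_failed
        else if (i < 0.8) ++counts[2]; // 10% timed_out
        else if (i < 0.9) ++counts[3]; // 10% lookup_failed
        else              ++counts[4]; // 10% no failure injected
    }
    std::printf("http=%d conn=%d timeout=%d lookup=%d none=%d\n",
                counts[0], counts[1], counts[2], counts[3], counts[4]);
    return 0;
}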
@@ -89,9 +89,10 @@ ACTOR Future<Void> restoreApplierCore(RestoreApplierInterface applierInterf, int
                 break;
             }
         }
-        TraceEvent("RestoreApplierCore", self->id()).detail("Request", requestTypeStr); // For debug only
+        //TraceEvent("RestoreApplierCore", self->id()).detail("Request", requestTypeStr); // For debug only
     } catch (Error& e) {
-        TraceEvent(SevWarn, "FastRestoreApplierError", self->id())
+        bool isError = e.code() != error_code_operation_cancelled;
+        TraceEvent(isError ? SevError : SevWarnAlways, "FastRestoreApplierError", self->id())
             .detail("RequestType", requestTypeStr)
             .error(e, true);
         actors.clear(false);
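The reworked catch block demotes operation_cancelled, the normal teardown path for a Flow actor, to SevWarnAlways, and promotes every other error to SevError (the old code logged everything at SevWarn). A tiny sketch of the severity selection (the enum values and error code are illustrative, not the Flow API):

#include <cstdio>

enum Severity { SevWarnAlways = 0, SevError = 1 };
constexpr int kOperationCancelled = 1101; // illustrative stand-in for error_code_operation_cancelled

// Same shape as the applier's catch block: expected cancellation stays a
// warning, any other error is logged as an error.
Severity severityFor(int code) {
    bool isError = code != kOperationCancelled;
    return isError ? SevError : SevWarnAlways;
}

int main() {
    std::printf("cancelled -> %d, other -> %d\n",
                severityFor(kOperationCancelled), severityFor(1510));
    return 0;
}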
@@ -477,7 +478,7 @@ ACTOR static Future<Void> applyStagingKeysBatch(std::map<Key, StagingKey>::itera
     state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
     state int sets = 0;
     state int clears = 0;
-    state Key endKey = begin->second.key;
+    state Key endKey = begin->first;
     TraceEvent(SevFRDebugInfo, "FastRestoreApplierPhaseApplyStagingKeysBatch", applierID).detail("Begin", begin->first);
     loop {
         try {
@@ -507,7 +508,7 @@ ACTOR static Future<Void> applyStagingKeysBatch(std::map<Key, StagingKey>::itera
             } else {
                 ASSERT(false);
             }
-            endKey = iter != end ? iter->second.key : endKey;
+            endKey = iter != end ? iter->first : endKey;
             iter++;
             if (sets > 10000000 || clears > 10000000) {
                 TraceEvent(SevError, "FastRestoreApplierPhaseApplyStagingKeysBatchInfiniteLoop", applierID)
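Both this fix and the one above read the key from the map entry (begin->first / iter->first) instead of the StagingKey::key member: the map key is always populated, while the member is empty whenever the StagingKey was default-constructed, which is what the emplace call fixed later in this diff used to do. A minimal illustration with a simplified StagingKey (names mirror the diff, the types are simplified):

#include <cassert>
#include <map>
#include <string>

// Simplified StagingKey: the member `key` is only set if the one-argument
// constructor is used, mirroring the bug these hunks work around.
struct StagingKey {
    std::string key;
    StagingKey() {}                                   // leaves `key` empty
    explicit StagingKey(std::string k) : key(std::move(k)) {}
};

int main() {
    std::map<std::string, StagingKey> stagingKeys;
    stagingKeys.emplace("a", StagingKey()); // default-constructed value: member is ""
    auto it = stagingKeys.begin();
    assert(it->first == "a");        // map key: always correct
    assert(it->second.key.empty());  // member key: empty until the constructor fix
    return 0;
}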
@@ -521,6 +522,7 @@ ACTOR static Future<Void> applyStagingKeysBatch(std::map<Key, StagingKey>::itera
                     .detail("End", endKey)
                     .detail("Sets", sets)
                     .detail("Clears", clears);
+            tr->addWriteConflictRange(KeyRangeRef(begin->first, keyAfter(endKey))); // Reduce resolver load
             wait(tr->commit());
             cc->appliedTxns += 1;
             break;
@@ -55,7 +55,7 @@ struct StagingKey {
     LogMessageVersion version; // largest version of set or clear for the key
     std::map<LogMessageVersion, Standalone<MutationRef>> pendingMutations; // mutations not set or clear type
 
-    explicit StagingKey() : version(0), type(MutationRef::MAX_ATOMIC_OP) {}
+    explicit StagingKey(Key key) : key(key), version(0), type(MutationRef::MAX_ATOMIC_OP) {}
 
     // Add mutation m at newVersion to stagingKey
     // Assume: SetVersionstampedKey and SetVersionstampedValue have been converted to set
@@ -148,7 +148,7 @@ struct StagingKey {
         }
         for (; lb != pendingMutations.end(); lb++) {
             MutationRef mutation = lb->second;
-            if (type == MutationRef::CompareAndClear) { // Special atomicOp
+            if (mutation.type == MutationRef::CompareAndClear) { // Special atomicOp
                 Arena arena;
                 Optional<StringRef> inputVal;
                 if (hasBaseValue()) {
@@ -167,14 +167,14 @@ struct StagingKey {
                 val = applyAtomicOp(inputVal, mutation.param2, (MutationRef::Type)mutation.type);
                 type = MutationRef::SetValue; // Precomputed result should be set to DB.
             } else if (mutation.type == MutationRef::SetValue || mutation.type == MutationRef::ClearRange) {
-                type = MutationRef::SetValue; // Precomputed result should be set to DB.
+                type = MutationRef::SetValue;
                 TraceEvent(SevError, "FastRestoreApplierPrecomputeResultUnexpectedSet", applierID)
                     .detail("BatchIndex", batchIndex)
                     .detail("Context", context)
                     .detail("MutationType", getTypeString(mutation.type))
                     .detail("Version", lb->first.toString());
             } else {
-                TraceEvent(SevWarnAlways, "FastRestoreApplierPrecomputeResultSkipUnexpectedBackupMutation", applierID)
+                TraceEvent(SevError, "FastRestoreApplierPrecomputeResultSkipUnexpectedBackupMutation", applierID)
                     .detail("BatchIndex", batchIndex)
                     .detail("Context", context)
                     .detail("MutationType", getTypeString(mutation.type))
@@ -291,7 +291,7 @@ struct ApplierBatchData : public ReferenceCounted<ApplierBatchData> {
 
     void addMutation(MutationRef m, LogMessageVersion ver) {
         if (!isRangeMutation(m)) {
-            auto item = stagingKeys.emplace(m.param1, StagingKey());
+            auto item = stagingKeys.emplace(m.param1, StagingKey(m.param1));
             item.first->second.add(m, ver);
         } else {
            stagingKeyRanges.insert(StagingKeyRange(m, ver));
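Constructing the value as StagingKey(m.param1) keeps the member key consistent with the map key. Note the emplace semantics addMutation relies on: if m.param1 is already present, the temporary StagingKey is discarded and item.first points at the existing entry, so add(m, ver) always accumulates on one object per key. A sketch of that behavior with the same simplified StagingKey as above:

#include <cassert>
#include <map>
#include <string>

struct StagingKey {
    std::string key;
    int mutations = 0;
    StagingKey() {}
    explicit StagingKey(std::string k) : key(std::move(k)) {}
    void add() { ++mutations; }
};

int main() {
    std::map<std::string, StagingKey> stagingKeys;
    // First mutation for "a": inserts a StagingKey whose member matches the map key.
    auto item = stagingKeys.emplace("a", StagingKey("a"));
    item.first->second.add();
    // Second mutation for "a": emplace finds the existing entry (inserted == false),
    // discards the temporary, and add() accumulates on the same StagingKey.
    auto again = stagingKeys.emplace("a", StagingKey("a"));
    assert(!again.second);
    again.first->second.add();
    assert(stagingKeys.at("a").mutations == 2);
    assert(stagingKeys.at("a").key == "a"); // member now consistent with map key
    return 0;
}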
@@ -312,6 +312,8 @@ ACTOR Future<Standalone<VectorRef<KeyValueRef>>> decodeLogFileBlock(Reference<IA
     int rLen = wait(file->read(mutateString(buf), len, offset));
     if (rLen != len) throw restore_bad_read();
 
+    simulateBlobFailure();
+
     Standalone<VectorRef<KeyValueRef>> results({}, buf.arena());
     state StringRefReader reader(buf, restore_corrupted_data());
 
@@ -307,6 +307,12 @@ Future<Void> getBatchReplies(RequestStream<Request> Interface::*channel, std::ma
                 if (ongoingReplies[j].isReady()) {
                     std::get<2>(replyDurations[ongoingRepliesIndex[j]]) = now();
                     --oustandingReplies;
+                } else if (ongoingReplies[j].isError()) {
+                    // When this happens,
+                    // the above assertion ASSERT(ongoingReplies.size() == oustandingReplies) will fail
+                    TraceEvent(SevError, "FastRestoreGetBatchRepliesReplyError")
+                        .detail("OngoingReplyIndex", j)
+                        .detail("FutureError", ongoingReplies[j].getError().what());
                 }
             }
         }
@@ -58,6 +58,9 @@ ACTOR Future<Void> sendMutationsToApplier(
 ACTOR static Future<Void> _parseLogFileToMutationsOnLoader(NotifiedVersion* pProcessedFileOffset,
                                                            SerializedMutationListMap* mutationMap,
                                                            Reference<IBackupContainer> bc, RestoreAsset asset);
+ACTOR static Future<Void> parseLogFileToMutationsOnLoader(NotifiedVersion* pProcessedFileOffset,
+                                                          SerializedMutationListMap* mutationMap,
+                                                          Reference<IBackupContainer> bc, RestoreAsset asset);
 ACTOR static Future<Void> _parseRangeFileToMutationsOnLoader(
     std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsIter,
     std::map<LoadingParam, SampledMutationsVec>::iterator samplesIter, LoaderCounters* cc,
@@ -280,8 +283,8 @@ ACTOR Future<Void> restoreLoaderCore(RestoreLoaderInterface loaderInterf, int no
             when(wait(error)) { TraceEvent("FastRestoreLoaderActorCollectionError", self->id()); }
         }
     } catch (Error& e) {
-        TraceEvent(e.code() == error_code_broken_promise ? SevError : SevWarnAlways, "FastRestoreLoaderError",
-                   self->id())
+        bool isError = e.code() != error_code_operation_cancelled; // == error_code_broken_promise
+        TraceEvent(isError ? SevError : SevWarnAlways, "FastRestoreLoaderError", self->id())
             .detail("RequestType", requestTypeStr)
             .error(e, true);
         actors.clear(false);
@@ -354,6 +357,8 @@ ACTOR static Future<Void> _parsePartitionedLogFileOnLoader(
     int rLen = wait(file->read(mutateString(buf), asset.len, asset.offset));
     if (rLen != asset.len) throw restore_bad_read();
 
+    simulateBlobFailure();
+
     TraceEvent("FastRestoreLoaderDecodingLogFile")
         .detail("BatchIndex", asset.batchIndex)
         .detail("Filename", asset.filename)
@@ -460,6 +465,39 @@ ACTOR static Future<Void> _parsePartitionedLogFileOnLoader(
     return Void();
 }
 
+// Wrapper of _parsePartitionedLogFileOnLoader that retries on blob errors
+ACTOR static Future<Void> parsePartitionedLogFileOnLoader(
+    KeyRangeMap<Version>* pRangeVersions, NotifiedVersion* processedFileOffset,
+    std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsIter,
+    std::map<LoadingParam, SampledMutationsVec>::iterator samplesIter, LoaderCounters* cc,
+    Reference<IBackupContainer> bc, RestoreAsset asset) {
+    state int readFileRetries = 0;
+    loop {
+        try {
+            wait(_parsePartitionedLogFileOnLoader(pRangeVersions, processedFileOffset, kvOpsIter, samplesIter, cc, bc,
+                                                  asset));
+            break;
+        } catch (Error& e) {
+            if (e.code() == error_code_restore_bad_read || e.code() == error_code_restore_unsupported_file_version ||
+                e.code() == error_code_restore_corrupted_data_padding) { // non-retryable error
+                TraceEvent(SevError, "FastRestoreFileRestoreCorruptedPartitionedLogFileBlock").error(e);
+                throw;
+            } else if (e.code() == error_code_http_request_failed || e.code() == error_code_connection_failed ||
+                       e.code() == error_code_timed_out || e.code() == error_code_lookup_failed) {
+                // blob http request failure, retry
+                TraceEvent(SevWarnAlways, "FastRestoreDecodedPartitionedLogFileConnectionFailure")
+                    .detail("Retries", ++readFileRetries)
+                    .error(e);
+                wait(delayJittered(0.1));
+            } else {
+                TraceEvent(SevError, "FastRestoreParsePartitionedLogFileOnLoaderUnexpectedError").error(e);
+                throw;
+            }
+        }
+    }
+    return Void();
+}
+
 ACTOR Future<Void> _processLoadingParam(KeyRangeMap<Version>* pRangeVersions, LoadingParam param,
                                         Reference<LoaderBatchData> batchData, UID loaderID,
                                         Reference<IBackupContainer> bc) {
@@ -496,12 +534,12 @@ ACTOR Future<Void> _processLoadingParam(KeyRangeMap<Version>* pRangeVersions, Lo
     } else {
         // TODO: Sanity check the log file's range is overlapped with the restored version range
         if (param.isPartitionedLog()) {
-            fileParserFutures.push_back(_parsePartitionedLogFileOnLoader(pRangeVersions, &processedFileOffset,
-                                                                         kvOpsPerLPIter, samplesIter,
-                                                                         &batchData->counters, bc, subAsset));
+            fileParserFutures.push_back(parsePartitionedLogFileOnLoader(pRangeVersions, &processedFileOffset,
+                                                                        kvOpsPerLPIter, samplesIter,
+                                                                        &batchData->counters, bc, subAsset));
         } else {
             fileParserFutures.push_back(
-                _parseLogFileToMutationsOnLoader(&processedFileOffset, &mutationMap, bc, subAsset));
+                parseLogFileToMutationsOnLoader(&processedFileOffset, &mutationMap, bc, subAsset));
         }
     }
 }
@@ -586,9 +624,10 @@ ACTOR Future<Void> handleLoadFileRequest(RestoreLoadFileRequest req, Reference<R
         state int samplesMessages = fSendSamples.size();
         wait(waitForAll(fSendSamples));
     } catch (Error& e) { // In case ci.samples throws broken_promise due to unstable network
-        if (e.code() == error_code_broken_promise) {
+        if (e.code() == error_code_broken_promise || e.code() == error_code_operation_cancelled) {
             TraceEvent(SevWarnAlways, "FastRestoreLoaderPhaseLoadFileSendSamples")
-                .detail("SamplesMessages", samplesMessages);
+                .detail("SamplesMessages", samplesMessages)
+                .error(e, true);
         } else {
             TraceEvent(SevError, "FastRestoreLoaderPhaseLoadFileSendSamplesUnexpectedError").error(e, true);
         }
@@ -1107,20 +1146,39 @@ ACTOR static Future<Void> _parseRangeFileToMutationsOnLoader(
     // Sanity check the range file is within the restored version range
     ASSERT_WE_THINK(asset.isInVersionRange(version));
 
-    // The set of key value version is rangeFile.version. the key-value set in the same range file has the same version
-    Reference<IAsyncFile> inFile = wait(bc->readFile(asset.filename));
     state Standalone<VectorRef<KeyValueRef>> blockData;
-    try {
-        Standalone<VectorRef<KeyValueRef>> kvs =
-            wait(fileBackup::decodeRangeFileBlock(inFile, asset.offset, asset.len));
-        TraceEvent("FastRestoreLoaderDecodedRangeFile")
-            .detail("BatchIndex", asset.batchIndex)
-            .detail("Filename", asset.filename)
-            .detail("DataSize", kvs.contents().size());
-        blockData = kvs;
-    } catch (Error& e) {
-        TraceEvent(SevError, "FileRestoreCorruptRangeFileBlock").error(e);
-        throw;
-        // should retry here
+    state int readFileRetries = 0;
+    loop {
+        try {
+            // The set of key value version is rangeFile.version. the key-value set in the same range file has the same
+            // version
+            Reference<IAsyncFile> inFile = wait(bc->readFile(asset.filename));
+            Standalone<VectorRef<KeyValueRef>> kvs =
+                wait(fileBackup::decodeRangeFileBlock(inFile, asset.offset, asset.len));
+            TraceEvent("FastRestoreLoaderDecodedRangeFile")
+                .detail("BatchIndex", asset.batchIndex)
+                .detail("Filename", asset.filename)
+                .detail("DataSize", kvs.contents().size());
+            blockData = kvs;
+            break;
+        } catch (Error& e) {
+            if (e.code() == error_code_restore_bad_read || e.code() == error_code_restore_unsupported_file_version ||
+                e.code() == error_code_restore_corrupted_data_padding) { // non-retryable error
+                TraceEvent(SevError, "FastRestoreFileRestoreCorruptedRangeFileBlock").error(e);
+                throw;
+            } else if (e.code() == error_code_http_request_failed || e.code() == error_code_connection_failed ||
+                       e.code() == error_code_timed_out || e.code() == error_code_lookup_failed) {
+                // blob http request failure, retry
+                TraceEvent(SevWarnAlways, "FastRestoreDecodedRangeFileConnectionFailure")
+                    .detail("Retries", ++readFileRetries)
+                    .error(e);
+                wait(delayJittered(0.1));
+            } else {
+                TraceEvent(SevError, "FastRestoreParseRangeFileOnLoaderUnexpectedError").error(e);
+                throw;
+            }
+        }
     }
 
     // First and last key are the range for this file
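Unlike the log-file paths, which got standalone retry wrappers, this path inlines the loop because the decoded block must outlive the attempt: blockData is declared (as actor state) before the loop, assigned on success, and only then does the loop break. The same shape in plain C++ (decodeOnce and transient are illustrative stand-ins for the decode call and the error classification):

#include <cstdio>
#include <stdexcept>
#include <string>
#include <vector>

// Illustrative stand-ins: decodeOnce() fails transiently twice, then succeeds.
static int attempts = 0;
static std::vector<std::string> decodeOnce() {
    if (++attempts < 3) throw std::runtime_error("transient");
    return {"beginKey", "endKey"};
}
static bool transient(const std::exception& e) { return std::string(e.what()) == "transient"; }

std::vector<std::string> decodeWithRetry() {
    std::vector<std::string> blockData; // declared before the loop so the result outlives each attempt
    for (;;) {
        try {
            blockData = decodeOnce(); // assign on success...
            break;                    // ...then leave the retry loop
        } catch (const std::exception& e) {
            if (!transient(e)) throw; // non-retryable: propagate
            // transient: loop again (the actor waits delayJittered(0.1) here)
        }
    }
    return blockData;
}

int main() {
    std::printf("decoded %zu keys after %d attempts\n", decodeWithRetry().size(), attempts);
    return 0;
}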
@@ -1218,6 +1276,36 @@ ACTOR static Future<Void> _parseLogFileToMutationsOnLoader(NotifiedVersion* pPro
     return Void();
 }
 
+// Retry wrapper around _parseLogFileToMutationsOnLoader for blob errors
+ACTOR static Future<Void> parseLogFileToMutationsOnLoader(NotifiedVersion* pProcessedFileOffset,
+                                                          SerializedMutationListMap* pMutationMap,
+                                                          Reference<IBackupContainer> bc, RestoreAsset asset) {
+    state int readFileRetries = 0;
+    loop {
+        try {
+            wait(_parseLogFileToMutationsOnLoader(pProcessedFileOffset, pMutationMap, bc, asset));
+            break;
+        } catch (Error& e) {
+            if (e.code() == error_code_restore_bad_read || e.code() == error_code_restore_unsupported_file_version ||
+                e.code() == error_code_restore_corrupted_data_padding) { // non-retryable error
+                TraceEvent(SevError, "FastRestoreFileRestoreCorruptedLogFileBlock").error(e);
+                throw;
+            } else if (e.code() == error_code_http_request_failed || e.code() == error_code_connection_failed ||
+                       e.code() == error_code_timed_out || e.code() == error_code_lookup_failed) {
+                // blob http request failure, retry
+                TraceEvent(SevWarnAlways, "FastRestoreDecodedLogFileConnectionFailure")
+                    .detail("Retries", ++readFileRetries)
+                    .error(e);
+                wait(delayJittered(0.1));
+            } else {
+                TraceEvent(SevError, "FastRestoreParseLogFileToMutationsOnLoaderUnexpectedError").error(e);
+                throw;
+            }
+        }
+    }
+    return Void();
+}
+
 // Return applier IDs that are used to apply key-values
 std::vector<UID> getApplierIDs(std::map<Key, UID>& rangeToApplier) {
     std::vector<UID> applierIDs;
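With this change the same classify-and-retry block appears four times (snapshot key range, partitioned log, plain log, range file), differing only in the wrapped call and the trace-event names. A hypothetical factoring, not part of this PR, could share one helper; sketched in plain C++ (CodedError and retryOnTransient are illustrative, not Flow APIs):

#include <functional>
#include <iostream>
#include <set>
#include <stdexcept>
#include <string>

// Hypothetical refactor (not in this PR): one shared retry helper,
// parameterized by the operation, its trace-event name, and the retryable codes.
struct CodedError : std::runtime_error {
    int code;
    CodedError(int c, const char* w) : std::runtime_error(w), code(c) {}
};

void retryOnTransient(const std::function<void()>& op, const std::string& eventName,
                      const std::set<int>& transientCodes) {
    int retries = 0;
    for (;;) {
        try {
            op();
            return;
        } catch (const CodedError& e) {
            if (transientCodes.count(e.code) == 0) throw; // corruption etc.: propagate
            std::cerr << eventName << " retry " << ++retries << ": " << e.what() << "\n";
            // each actor waits delayJittered(0.1) here before looping
        }
    }
}

int main() {
    int calls = 0;
    retryOnTransient([&] { if (++calls < 2) throw CodedError(1, "connection_failed"); },
                     "DecodedLogFileConnectionFailure", {1, 2, 3});
    std::cout << "succeeded after " << calls << " calls\n";
    return 0;
}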