Merge pull request #2202 from etschannen/feature-share-mutations
Backup and DR would not share mutations if started on different versions of FDB
commit 42b7acf7b7
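Background for reviewers: the destUidLookup key for a backup range is built with BinaryWriter::toValue(range, IncludeVersion()), and IncludeVersion() with no explicit version embeds the writer's currently running protocol version in the serialized bytes. A backup and a DR started from different FDB releases therefore compute different keys for the same range, never see each other's entry, and each allocate their own destUid, i.e. their own mutation stream. This change scans everything under destUidLookupPrefix, decodes each key with BinaryReader (which honors whatever version is embedded) and compares the KeyRange itself, and writes new entries with the version pinned to ProtocolVersion::withSharedMutations(). The following is a minimal, self-contained C++ model of the keying problem; the layout and version numbers are illustrative, not FDB's actual wire format:

#include <cstdint>
#include <iostream>
#include <map>
#include <string>

struct KeyRangeModel { std::string begin, end; };

// Stand-in for BinaryWriter::toValue(range, IncludeVersion()).withPrefix(destUidLookupPrefix):
// the writer's protocol version is baked into the key bytes.
static std::string encodeRangeKey(uint64_t protocolVersion, const KeyRangeModel& r) {
    std::string out(reinterpret_cast<const char*>(&protocolVersion), sizeof(protocolVersion));
    out += r.begin;
    out.push_back('\0');
    out += r.end;
    return out;
}

// Stand-in for BinaryReader::fromStringRef<KeyRange>(key, IncludeVersion()): skip the embedded
// version and recover the range regardless of which release wrote it.
static KeyRangeModel decodeRangeKey(const std::string& key) {
    std::string body = key.substr(sizeof(uint64_t));
    size_t sep = body.find('\0');
    return { body.substr(0, sep), body.substr(sep + 1) };
}

int main() {
    const uint64_t olderRelease = 1001;   // illustrative version numbers only
    const uint64_t newerRelease = 1002;
    KeyRangeModel wholeDb{ "", "\xff" };

    std::map<std::string, std::string> destUidLookup;                       // key -> destUid
    destUidLookup[encodeRangeKey(olderRelease, wholeDb)] = "existing-uid";  // written by an older client

    // Old behavior: the newer client builds the key with its own version, misses the entry,
    // and would allocate a second destUid, i.e. a second stream of mutations for the same range.
    bool exactHit = destUidLookup.count(encodeRangeKey(newerRelease, wholeDb)) > 0;

    // New behavior: scan the prefix and compare decoded ranges, so the existing destUid is reused.
    bool scanHit = false;
    for (const auto& entry : destUidLookup) {
        KeyRangeModel r = decodeRangeKey(entry.first);
        if (r.begin == wholeDb.begin && r.end == wholeDb.end) { scanHit = true; break; }
    }

    std::cout << "exact-key lookup: " << (exactHit ? "hit" : "miss")
              << ", decoded-range scan: " << (scanHit ? "hit" : "miss") << std::endl;   // miss, hit
}

Pinning new entries to IncludeVersion(ProtocolVersion::withSharedMutations()) additionally makes every future writer produce the same key bytes for a given range, so the scan is only needed to find entries written by older releases.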
@@ -812,6 +812,16 @@ ACTOR static Future<Void> _eraseLogData(Reference<ReadYourWritesTransaction> tr,
 		// Disable committing mutations into blog
 		tr->clear(prefixRange(destUidValue.withPrefix(logRangesRange.begin)));
 	}
+
+	if(!endVersion.present() && backupVersions.size() == 1) {
+		Standalone<RangeResultRef> existingDestUidValues = wait(tr->getRange(KeyRangeRef(destUidLookupPrefix, strinc(destUidLookupPrefix)), CLIENT_KNOBS->TOO_MANY));
+		for(auto it : existingDestUidValues) {
+			if( it.value == destUidValue ) {
+				tr->clear(it.key);
+			}
+		}
+	}
+
 	return Void();
 }

@@ -1296,19 +1296,25 @@ namespace dbBackup {
 			}

 			if (backupRanges.size() == 1) {
-				state Key destUidLookupPath = BinaryWriter::toValue(backupRanges[0], IncludeVersion()).withPrefix(destUidLookupPrefix);
-				Optional<Key> existingDestUidValue = wait(srcTr->get(destUidLookupPath));
-				if (existingDestUidValue.present()) {
-					if (destUidValue == existingDestUidValue.get()) {
-						// due to unknown commit result
-						break;
-					} else {
-						// existing backup/DR is running
-						return Void();
-					}
-				}
-
-				srcTr->set(destUidLookupPath, destUidValue);
+				Standalone<RangeResultRef> existingDestUidValues = wait(srcTr->getRange(KeyRangeRef(destUidLookupPrefix, strinc(destUidLookupPrefix)), CLIENT_KNOBS->TOO_MANY));
+				bool found = false;
+				for(auto it : existingDestUidValues) {
+					if( BinaryReader::fromStringRef<KeyRange>(it.key.removePrefix(destUidLookupPrefix), IncludeVersion()) == backupRanges[0] ) {
+						if(destUidValue != it.value) {
+							// existing backup/DR is running
+							return Void();
+						} else {
+							// due to unknown commit result
+							found = true;
+							break;
+						}
+					}
+				}
+				if(found) {
+					break;
+				}
+
+				srcTr->set(BinaryWriter::toValue(backupRanges[0], IncludeVersion(ProtocolVersion::withSharedMutations())).withPrefix(destUidLookupPrefix), destUidValue);
 			}

 			Key versionKey = logUidValue.withPrefix(destUidValue).withPrefix(backupLatestVersionsPrefix);
@@ -1466,13 +1472,18 @@ namespace dbBackup {

 			// Initialize destUid
 			if (backupRanges.size() == 1) {
-				state Key destUidLookupPath = BinaryWriter::toValue(backupRanges[0], IncludeVersion()).withPrefix(destUidLookupPrefix);
-				Optional<Key> existingDestUidValue = wait(srcTr->get(destUidLookupPath));
-				if (existingDestUidValue.present()) {
-					destUidValue = existingDestUidValue.get();
-				} else {
+				Standalone<RangeResultRef> existingDestUidValues = wait(srcTr->getRange(KeyRangeRef(destUidLookupPrefix, strinc(destUidLookupPrefix)), CLIENT_KNOBS->TOO_MANY));
+				bool found = false;
+				for(auto it : existingDestUidValues) {
+					if( BinaryReader::fromStringRef<KeyRange>(it.key.removePrefix(destUidLookupPrefix), IncludeVersion()) == backupRanges[0] ) {
+						destUidValue = it.value;
+						found = true;
+						break;
+					}
+				}
+				if( !found ) {
 					destUidValue = BinaryWriter::toValue(deterministicRandom()->randomUniqueID(), Unversioned());
-					srcTr->set(destUidLookupPath, destUidValue);
+					srcTr->set(BinaryWriter::toValue(backupRanges[0], IncludeVersion(ProtocolVersion::withSharedMutations())).withPrefix(destUidLookupPrefix), destUidValue);
 				}
 			}

@@ -3619,13 +3619,18 @@ public:

 		state Key destUidValue(BinaryWriter::toValue(uid, Unversioned()));
 		if (normalizedRanges.size() == 1) {
-			state Key destUidLookupPath = BinaryWriter::toValue(normalizedRanges[0], IncludeVersion()).withPrefix(destUidLookupPrefix);
-			Optional<Key> existingDestUidValue = wait(tr->get(destUidLookupPath));
-			if (existingDestUidValue.present()) {
-				destUidValue = existingDestUidValue.get();
-			} else {
+			Standalone<RangeResultRef> existingDestUidValues = wait(tr->getRange(KeyRangeRef(destUidLookupPrefix, strinc(destUidLookupPrefix)), CLIENT_KNOBS->TOO_MANY));
+			bool found = false;
+			for(auto it : existingDestUidValues) {
+				if( BinaryReader::fromStringRef<KeyRange>(it.key.removePrefix(destUidLookupPrefix), IncludeVersion()) == normalizedRanges[0] ) {
+					destUidValue = it.value;
+					found = true;
+					break;
+				}
+			}
+			if( !found ) {
 				destUidValue = BinaryWriter::toValue(deterministicRandom()->randomUniqueID(), Unversioned());
-				tr->set(destUidLookupPath, destUidValue);
+				tr->set(BinaryWriter::toValue(normalizedRanges[0], IncludeVersion(ProtocolVersion::withSharedMutations())).withPrefix(destUidLookupPrefix), destUidValue);
 			}
 		}

@@ -498,6 +498,7 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
 	init( STATUS_MIN_TIME_BETWEEN_REQUESTS, 0.0 );
 	init( MAX_STATUS_REQUESTS_PER_SECOND, 256.0 );
 	init( CONFIGURATION_ROWS_TO_FETCH, 20000 );
+	init( DISABLE_DUPLICATE_LOG_WARNING, false );

 	// IPager
 	init( PAGER_RESERVED_PAGES, 1 );
@@ -441,6 +441,7 @@ public:
 	double STATUS_MIN_TIME_BETWEEN_REQUESTS;
 	double MAX_STATUS_REQUESTS_PER_SECOND;
 	int CONFIGURATION_ROWS_TO_FETCH;
+	bool DISABLE_DUPLICATE_LOG_WARNING;

 	// IPager
 	int PAGER_RESERVED_PAGES;
@@ -1122,33 +1122,67 @@ ACTOR static Future<JsonBuilderObject> latencyProbeFetcher(Database cx, JsonBuil
 	return statusObj;
 }

-ACTOR static Future<Void> consistencyCheckStatusFetcher(Database cx, JsonBuilderArray *messages, std::set<std::string> *incomplete_reasons, bool isAvailable) {
-	if(isAvailable) {
-		try {
-			state Transaction tr(cx);
-			loop {
-				try {
-					tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
-					tr.setOption(FDBTransactionOptions::LOCK_AWARE);
-					tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
-					Optional<Value> ccSuspendVal = wait(timeoutError(BUGGIFY ? Never() : tr.get(fdbShouldConsistencyCheckBeSuspended), 5.0));
-					bool ccSuspend = ccSuspendVal.present() ? BinaryReader::fromStringRef<bool>(ccSuspendVal.get(), Unversioned()) : false;
-					if(ccSuspend) {
-						messages->push_back(JsonString::makeMessage("consistencycheck_disabled", "Consistency checker is disabled."));
-					}
-					break;
-				} catch(Error &e) {
-					if(e.code() == error_code_timed_out) {
-						messages->push_back(JsonString::makeMessage("consistencycheck_suspendkey_fetch_timeout",
-							format("Timed out trying to fetch `%s` from the database.", printable(fdbShouldConsistencyCheckBeSuspended).c_str()).c_str()));
-						break;
-					}
-					wait(tr.onError(e));
-				}
-			}
-		} catch(Error &e) {
-			incomplete_reasons->insert(format("Unable to retrieve consistency check settings (%s).", e.what()));
-		}
-	}
-	return Void();
-}
+ACTOR static Future<Void> consistencyCheckStatusFetcher(Database cx, JsonBuilderArray *messages, std::set<std::string> *incomplete_reasons) {
+	try {
+		state Transaction tr(cx);
+		loop {
+			try {
+				tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
+				tr.setOption(FDBTransactionOptions::LOCK_AWARE);
+				tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
+				Optional<Value> ccSuspendVal = wait(timeoutError(BUGGIFY ? Never() : tr.get(fdbShouldConsistencyCheckBeSuspended), 5.0));
+				bool ccSuspend = ccSuspendVal.present() ? BinaryReader::fromStringRef<bool>(ccSuspendVal.get(), Unversioned()) : false;
+				if(ccSuspend) {
+					messages->push_back(JsonString::makeMessage("consistencycheck_disabled", "Consistency checker is disabled."));
+				}
+				break;
+			} catch(Error &e) {
+				if(e.code() == error_code_timed_out) {
+					messages->push_back(JsonString::makeMessage("consistencycheck_suspendkey_fetch_timeout",
+						format("Timed out trying to fetch `%s` from the database.", printable(fdbShouldConsistencyCheckBeSuspended).c_str()).c_str()));
+					break;
+				}
+				wait(tr.onError(e));
+			}
+		}
+	} catch(Error &e) {
+		incomplete_reasons->insert(format("Unable to retrieve consistency check settings (%s).", e.what()));
+	}
+	return Void();
+}
+
+ACTOR static Future<Void> logRangeWarningFetcher(Database cx, JsonBuilderArray *messages, std::set<std::string> *incomplete_reasons) {
+	try {
+		state Transaction tr(cx);
+		loop {
+			try {
+				tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
+				tr.setOption(FDBTransactionOptions::READ_LOCK_AWARE);
+				tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
+
+				Standalone<RangeResultRef> existingDestUidValues = wait(timeoutError(tr.getRange(KeyRangeRef(destUidLookupPrefix, strinc(destUidLookupPrefix)), CLIENT_KNOBS->TOO_MANY), 5.0));
+				std::set<std::pair<Key,Key>> existingRanges;
+				for(auto it : existingDestUidValues) {
+					KeyRange range = BinaryReader::fromStringRef<KeyRange>(it.key.removePrefix(destUidLookupPrefix), IncludeVersion());
+					std::pair<Key,Key> rangePair = std::make_pair(range.begin,range.end);
+					if(existingRanges.count(rangePair)) {
+						messages->push_back(JsonString::makeMessage("duplicate_mutation_streams", format("Backup and DR are not sharing the same stream of mutations for `%s` - `%s`", printable(range.begin).c_str(), printable(range.end).c_str()).c_str()));
+						break;
+					}
+					existingRanges.insert(rangePair);
+				}
+				break;
+			} catch(Error &e) {
+				if(e.code() == error_code_timed_out) {
+					messages->push_back(JsonString::makeMessage("duplicate_mutation_fetch_timeout",
+						format("Timed out trying to fetch `%s` from the database.", printable(destUidLookupPrefix).c_str()).c_str()));
+					break;
+				}
+				wait(tr.onError(e));
+			}
+		}
+	} catch(Error &e) {
+		incomplete_reasons->insert(format("Unable to retrieve log ranges (%s).", e.what()));
+	}
+	return Void();
+}
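The new logRangeWarningFetcher surfaces the failure mode to operators: it decodes every key under destUidLookupPrefix and, if the same range appears more than once (two destUids, hence two copies of the mutation log), pushes a duplicate_mutation_streams message into status. Below is a stripped-down sketch of just that duplicate check; the entry type and sample data are illustrative, not the actual Status.actor.cpp types:

#include <iostream>
#include <set>
#include <string>
#include <utility>
#include <vector>

// Each entry models one decoded row from the destUidLookupPrefix scan.
struct DecodedEntry { std::string rangeBegin, rangeEnd, destUid; };

int main() {
    std::vector<DecodedEntry> entries = {
        { "", "\\xff", "uid-written-by-backup" },
        { "", "\\xff", "uid-written-by-dr" },   // same range under a second destUid
    };

    std::set<std::pair<std::string, std::string>> existingRanges;
    for (const auto& e : entries) {
        auto rangePair = std::make_pair(e.rangeBegin, e.rangeEnd);
        if (existingRanges.count(rangePair)) {
            // Mirrors the duplicate_mutation_streams status message added above.
            std::cout << "Backup and DR are not sharing the same stream of mutations for `"
                      << e.rangeBegin << "` - `" << e.rangeEnd << "`" << std::endl;
            break;
        }
        existingRanges.insert(rangePair);
    }
}

As the next hunk shows, the check only runs when the new DISABLE_DUPLICATE_LOG_WARNING server knob is left at its default of false.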
@@ -2222,7 +2256,13 @@ ACTOR Future<StatusReply> clusterGetStatus(
 				statusObj["latency_probe"] = latencyProbeResults;
 			}

-			wait(consistencyCheckStatusFetcher(cx, &messages, &status_incomplete_reasons, isAvailable));
+			state std::vector<Future<Void>> warningFutures;
+			if(isAvailable) {
+				warningFutures.push_back( consistencyCheckStatusFetcher(cx, &messages, &status_incomplete_reasons) );
+				if(!SERVER_KNOBS->DISABLE_DUPLICATE_LOG_WARNING) {
+					warningFutures.push_back( logRangeWarningFetcher(cx, &messages, &status_incomplete_reasons) );
+				}
+			}

 			// Start getting storage servers now (using system priority) concurrently. Using sys priority because having storage servers
 			// in status output is important to give context to error messages in status that reference a storage server role ID.
@@ -2316,6 +2356,7 @@ ACTOR Future<StatusReply> clusterGetStatus(
 			else {
 				messages.push_back(JsonBuilder::makeMessage("proxies_error", "Timed out trying to retrieve proxies."));
 			}
+			wait( waitForAll(warningFutures) );
 		}
 		else {
 			// Set layers status to { _valid: false, error: "configurationMissing"}
@@ -212,6 +212,7 @@ bool Knobs::setKnob( std::string const& knob, std::string const& value ) {
 			}
 			*bool_knobs[knob] = v;
 		}
+		return true;
 	}
 	if (int64_knobs.count(knob) || int_knobs.count(knob)) {
 		int64_t v;
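The single added line in this hunk appears to be the `return true;` after the bool branch of Knobs::setKnob, so that setting a boolean knob (such as the new DISABLE_DUPLICATE_LOG_WARNING) reports success instead of falling through to the numeric branches and being treated as unrecognized. A minimal sketch of that fall-through pattern, not the real flow/Knobs.cpp (which also accepts hex and integer spellings for bools):

#include <cassert>
#include <cstdint>
#include <map>
#include <string>

static std::map<std::string, bool*> bool_knobs;
static std::map<std::string, int64_t*> int64_knobs;

bool setKnobSketch(const std::string& knob, const std::string& value) {
    if (bool_knobs.count(knob)) {
        *bool_knobs[knob] = (value == "true");
        return true;   // without this, a valid bool knob falls through and the caller sees "false"
    }
    if (int64_knobs.count(knob)) {
        *int64_knobs[knob] = std::stoll(value);
        return true;
    }
    return false;      // genuinely unknown knob
}

int main() {
    bool disableDuplicateLogWarning = false;
    bool_knobs["disable_duplicate_log_warning"] = &disableDuplicateLogWarning;
    assert(setKnobSketch("disable_duplicate_log_warning", "true"));
    assert(disableDuplicateLogWarning);
}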
@@ -79,6 +79,7 @@ public: // introduced features
 	PROTOCOL_VERSION_FEATURE(0x0FDB00A400040000LL, OpenDatabase);
 	PROTOCOL_VERSION_FEATURE(0x0FDB00A446020000LL, Locality);
 	PROTOCOL_VERSION_FEATURE(0x0FDB00A460010000LL, MultiGenerationTLog);
+	PROTOCOL_VERSION_FEATURE(0x0FDB00A460010000LL, SharedMutations);
 	PROTOCOL_VERSION_FEATURE(0x0FDB00A551000000LL, MultiVersionClient);
 	PROTOCOL_VERSION_FEATURE(0x0FDB00A560010000LL, TagLocality);
 	PROTOCOL_VERSION_FEATURE(0x0FDB00B060000000LL, Fearless);
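Registering SharedMutations as a protocol-version feature (reusing the MultiGenerationTLog version number 0x0FDB00A460010000LL) is what makes ProtocolVersion::withSharedMutations() available to the backup code above, so the destUidLookup key can be serialized against a fixed version instead of whatever release happens to be running. The sketch below models what the PROTOCOL_VERSION_FEATURE macro presumably generates, a has<Feature>() query plus a with<Feature>() constant; the macro body is an assumption based on how the feature is used in this PR, not a copy of flow/ProtocolVersion.h:

#include <cassert>
#include <cstdint>

class ProtocolVersion {
    uint64_t _version;
public:
    constexpr explicit ProtocolVersion(uint64_t v = 0) : _version(v) {}
    constexpr uint64_t version() const { return _version; }

// Assumed shape: a has<Feature>() query plus a with<Feature>() constant naming the version
// at which the feature was introduced.
#define PROTOCOL_VERSION_FEATURE(v, x) \
    constexpr bool has##x() const { return _version >= v; } \
    static constexpr ProtocolVersion with##x() { return ProtocolVersion(v); }

    PROTOCOL_VERSION_FEATURE(0x0FDB00A460010000LL, SharedMutations);
#undef PROTOCOL_VERSION_FEATURE
};

int main() {
    // IncludeVersion(ProtocolVersion::withSharedMutations()) serializes against this fixed
    // version, so every release writes identical destUidLookup key bytes for a given range.
    constexpr ProtocolVersion pinned = ProtocolVersion::withSharedMutations();
    assert(pinned.version() == 0x0FDB00A460010000ULL);
    assert(ProtocolVersion(0x0FDB00B060000000ULL).hasSharedMutations());    // newer release
    assert(!ProtocolVersion(0x0FDB00A400040000ULL).hasSharedMutations());   // older release
}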