Add read impl for inProgressExclusionServers

This commit is contained in:
Chaoguang Lin 2020-07-13 13:35:25 -07:00
parent b0b601478c
commit 60d1bfc247
3 changed files with 64 additions and 0 deletions

View File

@ -822,6 +822,10 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionF
std::make_unique<FailedServersRangeImpl>(KeyRangeRef(
LiteralStringRef("\xff\xff/conf/failed/"), LiteralStringRef("\xff\xff/conf/failed0")
)), true);
registerSpecialKeySpaceModule(SpecialKeySpace::MODULE::MANAGEMENT,
std::make_unique<ExclusionInProgressRangeImpl>(KeyRangeRef(
LiteralStringRef("\xff\xff/conf/inProgressExclusion/"), LiteralStringRef("\xff\xff/conf/inProgressExclusion0")
)));
}
if (apiVersionAtLeast(630)) {
registerSpecialKeySpaceModule(SpecialKeySpace::MODULE::TRANSACTION, std::make_unique<ConflictingKeysImpl>(conflictingKeysRange));

View File

@ -785,4 +785,58 @@ ACTOR Future<Optional<std::string>> failedServerCommitActor(ReadYourWritesTransa
Future<Optional<std::string>> FailedServersRangeImpl::commit(ReadYourWritesTransaction* ryw) {
return failedServerCommitActor(ryw);
}
ACTOR Future<Standalone<RangeResultRef>> ExclusionInProgressActor(ReadYourWritesTransaction *ryw, KeyRef prefix, KeyRangeRef kr) {
state Standalone<RangeResultRef> result;
// TODO : get all inprogress excluded servers
state Transaction& tr = ryw->getTransaction();
tr.setOption( FDBTransactionOptions::READ_SYSTEM_KEYS );
tr.setOption( FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE ); // necessary?
tr.setOption( FDBTransactionOptions::LOCK_AWARE );
state std::vector<AddressExclusion> excl = wait((getExcludedServers(&tr)));
state std::set<AddressExclusion> exclusions( excl.begin(), excl.end() );
state std::set<NetworkAddress> inProgressExclusion;
// Just getting a consistent read version proves that a set of tlogs satisfying the exclusions has completed recovery
// Check that there aren't any storage servers with addresses violating the exclusions
state Standalone<RangeResultRef> serverList = wait( tr.getRange( serverListKeys, CLIENT_KNOBS->TOO_MANY ) );
ASSERT( !serverList.more && serverList.size() < CLIENT_KNOBS->TOO_MANY );
for(auto& s : serverList) {
auto addresses = decodeServerListValue( s.value ).getKeyValues.getEndpoint().addresses;
if ( addressExcluded(exclusions, addresses.address) ) {
inProgressExclusion.insert(addresses.address);
}
if ( addresses.secondaryAddress.present() && addressExcluded(exclusions, addresses.secondaryAddress.get()) ) {
inProgressExclusion.insert(addresses.secondaryAddress.get());
}
}
Optional<Standalone<StringRef>> value = wait( tr.get(logsKey) );
ASSERT(value.present());
auto logs = decodeLogsValue(value.get());
for( auto const& log : logs.first ) {
if (log.second == NetworkAddress() || addressExcluded(exclusions, log.second)) {
inProgressExclusion.insert(log.second);
}
}
for( auto const& log : logs.second ) {
if (log.second == NetworkAddress() || addressExcluded(exclusions, log.second)) {
inProgressExclusion.insert(log.second);
}
}
for( auto const& address : inProgressExclusion ) {
Key addrKey = prefix.withSuffix(address.toString());
if (kr.contains(addrKey))
result.push_back(result.arena(), KeyValueRef(addrKey, ValueRef()));
}
return result;
}
ExclusionInProgressRangeImpl::ExclusionInProgressRangeImpl(KeyRangeRef kr) : SpecialKeyRangeAsyncImpl(kr) {}
Future<Standalone<RangeResultRef>> ExclusionInProgressRangeImpl::getRange(ReadYourWritesTransaction *ryw, KeyRangeRef kr) const {
return ExclusionInProgressActor(ryw, getKeyRange().begin, kr);
}

View File

@ -227,5 +227,11 @@ public:
Future<Optional<std::string>> commit(ReadYourWritesTransaction* ryw) override;
};
class ExclusionInProgressRangeImpl : public SpecialKeyRangeAsyncImpl {
public:
explicit ExclusionInProgressRangeImpl(KeyRangeRef kr);
Future<Standalone<RangeResultRef>> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const override;
};
#include "flow/unactorcompiler.h"
#endif