A basic feasible version for exclude and include commands

This commit is contained in:
Chaoguang Lin 2020-07-06 11:02:48 -07:00
parent e2c5938c28
commit 784d0f6d00
7 changed files with 244 additions and 43 deletions

View File

@ -1258,10 +1258,25 @@ struct AutoQuorumChange : IQuorumChange {
};
Reference<IQuorumChange> autoQuorumChange( int desired ) { return Reference<IQuorumChange>(new AutoQuorumChange(desired)); }
// Stages, on the caller-supplied transaction, the writes that exclude `servers`
// (or mark them failed when `failed` is true). This overload does not commit;
// the caller owns `tr` and is responsible for its options and for committing.
void excludeServers(Transaction& tr, vector<AddressExclusion>& servers, bool failed) {
	// TODO : do we set these options by default or not
	// A freshly generated random ID is written to the version key on every call.
	std::string newVersionValue = deterministicRandom()->randomUniqueID().toString();
	auto versionKey = failed ? failedServersVersionKey : excludedServersVersionKey;
	tr.addReadConflictRange( singleKeyRange(versionKey) ); //To conflict with parallel includeServers
	tr.set( versionKey, newVersionValue );

	// One empty-valued marker key per address; the key encoding depends on `failed`.
	if (failed) {
		for (auto& addr : servers) {
			tr.set( encodeFailedServersKey(addr), StringRef() );
		}
	} else {
		for (auto& addr : servers) {
			tr.set( encodeExcludedServersKey(addr), StringRef() );
		}
	}

	TraceEvent("ExcludeServersCommit").detail("Servers", describe(servers)).detail("ExcludeFailed", failed);
}
ACTOR Future<Void> excludeServers(Database cx, vector<AddressExclusion> servers, bool failed) {
state Transaction tr(cx);
state Key versionKey = BinaryWriter::toValue(deterministicRandom()->randomUniqueID(),Unversioned());
state std::string excludeVersionKey = deterministicRandom()->randomUniqueID().toString();
loop {
try {
@ -1269,19 +1284,7 @@ ACTOR Future<Void> excludeServers(Database cx, vector<AddressExclusion> servers,
tr.setOption( FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE );
tr.setOption( FDBTransactionOptions::LOCK_AWARE );
tr.setOption( FDBTransactionOptions::USE_PROVISIONAL_PROXIES );
auto serversVersionKey = failed ? failedServersVersionKey : excludedServersVersionKey;
tr.addReadConflictRange( singleKeyRange(serversVersionKey) ); //To conflict with parallel includeServers
tr.set( serversVersionKey, excludeVersionKey );
for(auto& s : servers) {
if (failed) {
tr.set( encodeFailedServersKey(s), StringRef() );
} else {
tr.set( encodeExcludedServersKey(s), StringRef() );
}
}
TraceEvent("ExcludeServersCommit").detail("Servers", describe(servers)).detail("ExcludeFailed", failed);
excludeServers(tr, servers, failed);
wait( tr.commit() );
return Void();
} catch (Error& e) {

View File

@ -144,6 +144,7 @@ Reference<IQuorumChange> nameQuorumChange(std::string const& name, Reference<IQu
// Exclude the given set of servers from use as state servers. Returns as soon as the change is durable, without necessarily waiting for
// the servers to be evacuated. A NetworkAddress with a port of 0 means all servers on the given IP.
ACTOR Future<Void> excludeServers( Database cx, vector<AddressExclusion> servers, bool failed = false );
void excludeServers(Transaction& tr, vector<AddressExclusion>& servers, bool failed = false);
// Remove the given servers from the exclusion list. A NetworkAddress with a port of 0 means all servers on the given IP. A NetworkAddress() means
// all servers (don't exclude anything)

View File

@ -802,6 +802,23 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionF
monitorMasterProxiesInfoChange = monitorMasterProxiesChange(clientInfo, &masterProxiesChangeTrigger);
clientStatusUpdater.actor = clientStatusUpdateActor(this);
cacheListMonitor = monitorCacheList(this);
if (apiVersionAtLeast(700)) {
registerSpecialKeySpaceModule(
SpecialKeySpace::MODULE::FAILURE,
std::make_unique<SingleSpecialKeyImpl>(
LiteralStringRef("\xff\xff/failure"),
[](ReadYourWritesTransaction* ryw) -> Future<Optional<Value>> {
if (ryw->getSpecialKeySpaceErrorMsg().present())
return Optional<Value>(ryw->getSpecialKeySpaceErrorMsg().get());
else
return Optional<Value>();
}
));
registerSpecialKeySpaceModule(SpecialKeySpace::MODULE::MANAGEMENT,
std::make_unique<ExcludeServersRangeImpl>(KeyRangeRef(
LiteralStringRef("\xff\xff/conf/excluded"), LiteralStringRef("\xff\xff/conf/excluded0")
)), true);
}
if (apiVersionAtLeast(630)) {
registerSpecialKeySpaceModule(SpecialKeySpace::MODULE::TRANSACTION, std::make_unique<ConflictingKeysImpl>(conflictingKeysRange));
registerSpecialKeySpaceModule(SpecialKeySpace::MODULE::TRANSACTION, std::make_unique<ReadConflictRangeImpl>(readConflictRangeKeysRange));
@ -853,18 +870,6 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionF
}
return Optional<Value>();
}));
// TODO : this module should be in version 700 ?
registerSpecialKeySpaceModule(
SpecialKeySpace::MODULE::FAILURE,
std::make_unique<SingleSpecialKeyImpl>(
LiteralStringRef("\xff\xff/failure"),
[](ReadYourWritesTransaction* ryw) -> Future<Optional<Value>> {
if (ryw->getSpecialKeySpaceErrorMsg().present())
return Optional<Value>(ryw->getSpecialKeySpaceErrorMsg().get());
else
return Optional<Value>();
}
));
}
throttleExpirer = recurring([this](){ expireThrottles(); }, CLIENT_KNOBS->TAG_THROTTLE_EXPIRATION_INTERVAL);

View File

@ -1758,18 +1758,25 @@ void ReadYourWritesTransaction::atomicOp( const KeyRef& key, const ValueRef& ope
}
void ReadYourWritesTransaction::set( const KeyRef& key, const ValueRef& value ) {
if (key == LiteralStringRef("\xff\xff/reboot_worker")){
BinaryReader::fromStringRef<ClientWorkerInterface>(value, IncludeVersion()).reboot.send( RebootRequest() );
return;
}
if (key == LiteralStringRef("\xff\xff/reboot_and_check_worker")){
BinaryReader::fromStringRef<ClientWorkerInterface>(value, IncludeVersion()).reboot.send( RebootRequest(false, true) );
return;
}
if (key == metadataVersionKey) {
throw client_invalid_operation();
}
if (specialKeys.contains(key)) {
if (key == LiteralStringRef("\xff\xff/reboot_worker")){
BinaryReader::fromStringRef<ClientWorkerInterface>(value, IncludeVersion()).reboot.send( RebootRequest() );
return;
}
if (key == LiteralStringRef("\xff\xff/reboot_and_check_worker")){
BinaryReader::fromStringRef<ClientWorkerInterface>(value, IncludeVersion()).reboot.send( RebootRequest(false, true) );
return;
}
if (getDatabase()->apiVersionAtLeast(700)) {
return getDatabase()->specialKeySpace->set(this, key, value);
}
}
bool addWriteConflict = !options.getAndResetWriteConflictDisabled();
if(checkUsedDuringCommit()) {
@ -1805,6 +1812,13 @@ void ReadYourWritesTransaction::clear( const KeyRangeRef& range ) {
throw used_during_commit();
}
// TODO : enable later
// if (specialKeys.contains(range)) {
// if (getDatabase()->apiVersionAtLeast(700)) {
// return getDatabase()->specialKeySpace->clear(this, range);
// }
// }
KeyRef maxKey = getMaxWriteKey();
if(range.begin > maxKey || range.end > maxKey)
throw key_outside_legal_range();
@ -1844,6 +1858,12 @@ void ReadYourWritesTransaction::clear( const KeyRef& key ) {
throw used_during_commit();
}
if (specialKeys.contains(key)) {
if (getDatabase()->apiVersionAtLeast(700)) {
return getDatabase()->specialKeySpace->clear(this, key);
}
}
if(key >= getMaxWriteKey())
throw key_outside_legal_range();
@ -2109,7 +2129,7 @@ void ReadYourWritesTransaction::resetRyow() {
nativeReadRanges = Standalone<VectorRef<KeyRangeRef>>();
nativeWriteRanges = Standalone<VectorRef<KeyRangeRef>>();
specialKeySpaceWriteMap.rawErase(specialKeys); // TODO : check rawErase
specialKeySpaceErrorMsg.reset();
specialKeySpaceErrorMsg.reset(); // TODO : shoule we clear it every time?
watchMap.clear();
reading = AndFuture();
approximateSize = 0;

View File

@ -153,6 +153,7 @@ public:
bool readYourWritesDisabled() const { return options.readYourWritesDisabled; }
const Optional<std::string>& getSpecialKeySpaceErrorMsg() { return specialKeySpaceErrorMsg; }
void setSpecialKeySpaceErrorMsg(const std::string& msg) { specialKeySpaceErrorMsg = msg; }
// Returns a mutable reference to `tr`, letting callers stage reads/writes on it directly.
Transaction& getTransaction() { return tr; }
private:
friend class RYWImpl;

View File

@ -20,6 +20,8 @@
#include "fdbclient/SpecialKeySpace.actor.h"
#include "flow/UnitTest.h"
#include "fdbclient/ManagementAPI.actor.h"
#include "fdbclient/StatusClient.h"
#include "flow/actorcompiler.h" // This must be the last #include.
std::unordered_map<SpecialKeySpace::MODULE, KeyRange> SpecialKeySpace::moduleToBoundary = {
@ -325,7 +327,9 @@ ACTOR Future<Void> commitActor(SpecialKeySpace* sks, ReadYourWritesTransaction*
if (entry.first) {
writeModulePtrs.insert(sks->getRWImpls().rangeContaining(iter->begin())->value());
}
++iter;
}
TraceEvent(SevInfo, "SKSCommitActor").detail("WriteModulesSize", writeModulePtrs.size());
state std::set<SpecialKeyRangeRWImpl*>::const_iterator it;
for (it = writeModulePtrs.begin(); it != writeModulePtrs.end(); ++it) {
Optional<std::string> msg = wait((*it)->commit(ryw));
@ -476,7 +480,7 @@ Future<Standalone<RangeResultRef>> ExcludeServersRangeImpl::getRange(ReadYourWri
}
// Records a pending set of `key` to `value` in the transaction's special-key-space write map.
// Nothing else happens here; the write map is what the commit path in this file later reads.
void ExcludeServersRangeImpl::set(ReadYourWritesTransaction* ryw, const KeyRef& key, const ValueRef& value) {
	// TODO : check key / value valid
	// Copy the incoming value into a Value before storing it. The leading `true` is the entry's
	// first field (logged as "Valid" elsewhere in this file) -- presumably "this entry was written".
	Optional<Value> stored = Optional<Value>(Value(value));
	ryw->getSpecialKeySpaceWriteMap().insert(key, std::make_pair(true, stored));
}
@ -489,8 +493,179 @@ void ExcludeServersRangeImpl::clear(ReadYourWritesTransaction* ryw, const KeyRan
ryw->getSpecialKeySpaceWriteMap().insert(range, std::make_pair(true, Optional<Value>()));
}
Future<Optional<std::string>> ExcludeServersRangeImpl::commit(ReadYourWritesTransaction* ryw) {
Optional<std::string> result;
// Walks the special-key-space write map entries contained in `range`. For every entry that is a
// set (flag true and value present), the key suffix after `range.begin` is parsed as an
// AddressExclusion, appended to `addresses` and inserted into `exclusions`.
// Entries that are clears are logged but otherwise ignored.
// Returns false and fills `msg` at the first key that fails to parse; returns true otherwise.
bool parseNetWorkAddrFromKeys(ReadYourWritesTransaction* ryw, KeyRangeRef range,
                              std::vector<AddressExclusion>& addresses, std::set<AddressExclusion>& exclusions,
                              Optional<std::string>& msg) {
	auto writtenRanges = ryw->getSpecialKeySpaceWriteMap().containedRanges(range);
	for (auto it = writtenRanges.begin(); it != writtenRanges.end(); ++it) {
		auto entry = it->value();
		// only check for exclude(set) operation, include(clear) are not checked
		TraceEvent(SevInfo, "ParseNetworkAddress")
		    .detail("Valid", entry.first)
		    .detail("Set", entry.second.present())
		    .detail("Key", it->begin().toString());

		const bool isSetOperation = entry.first && entry.second.present();
		if (!isSetOperation) continue;

		Key address = it->begin().removePrefix(range.begin);
		auto parsed = AddressExclusion::parse(address);
		if (parsed.isValid()) {
			addresses.push_back(parsed);
			exclusions.insert(parsed);
			continue;
		}

		// Invalid address: build the error text and stop at the first offender.
		const std::string addressText = address.toString();
		std::string error = "ERROR: \'" + addressText + "\' is not a valid network endpoint address\n";
		if (addressText.find(":tls") != std::string::npos)
			error += " Do not include the `:tls' suffix when naming a process\n";
		// Only set operations reach this point, so the command name is always "exclude".
		msg = ManagementAPIError::toJsonString(false, "exclude", error);
		return false;
	}
	return true;
}
// Decides whether excluding `addresses` is acceptable right now.
//
//   db          database used for the safety check and the status fetch
//   addresses   addresses being excluded (must outlive this actor)
//   exclusions  the same addresses as a set, used for membership tests (must outlive this actor)
//   markFailed  when true, checkSafeExclusions() is consulted first
//   msg         out-parameter, must be non-null and outlive this actor; set to a human-readable
//               error whenever the actor returns false
//
// Returns true when the exclusion passes all checks, false (with *msg set) otherwise.
//
// NOTE: `msg` is a pointer because ACTOR parameters are copied into the actor's state; taking the
// Optional by value meant every message written here was dropped before the caller could see it.
ACTOR Future<bool> checkExclusion(Database db, std::vector<AddressExclusion>* addresses,
                                  std::set<AddressExclusion>* exclusions, bool markFailed,
                                  Optional<std::string>* msg) {

	if (markFailed) {
		state bool safe;
		try {
			bool _safe = wait(checkSafeExclusions(db, *addresses));
			safe = _safe;
		} catch (Error& e) {
			// Never swallow cancellation: the caller (and the storage behind `msg`) may be going away.
			if (e.code() == error_code_actor_cancelled) throw;
			TraceEvent("CheckSafeExclusionsError").error(e);
			safe = false;
		}
		if (!safe) {
			*msg = "ERROR: It is unsafe to exclude the specified servers at this time.\n"
			       "Please check that this exclusion does not bring down an entire storage team.\n"
			       "Please also ensure that the exclusion will keep a majority of coordinators alive.\n"
			       "You may add more storage processes or coordinators to make the operation safe.\n"
			       "Type `exclude FORCE failed <ADDRESS...>' to exclude without performing safety checks.\n";
			return false;
		}
	}

	StatusObject status = wait(StatusClient::statusFetcher(db));
	// Shared message for every "status document is missing something we need" failure below.
	state std::string errorString =
	    "ERROR: Could not calculate the impact of this exclude on the total free space in the cluster.\n"
	    "Please try the exclude again in 30 seconds.\n"
	    "Type `exclude FORCE <ADDRESS...>' to exclude without checking free space.\n"; // TODO : update msg here

	StatusObjectReader statusObj(status);

	StatusObjectReader statusObjCluster;
	if (!statusObj.get("cluster", statusObjCluster)) {
		*msg = errorString;
		return false;
	}

	StatusObjectReader processesMap;
	if (!statusObjCluster.get("processes", processesMap)) {
		*msg = errorString;
		return false;
	}

	state int ssTotalCount = 0;
	state int ssExcludedCount = 0;
	state double worstFreeSpaceRatio = 1.0;
	try {
		for (auto proc : processesMap.obj()) {
			bool storageServer = false;
			StatusArray rolesArray = proc.second.get_obj()["roles"].get_array();
			for (StatusObjectReader role : rolesArray) {
				if (role["role"].get_str() == "storage") {
					storageServer = true;
					break;
				}
			}
			// Skip non-storage servers in free space calculation
			if (!storageServer) continue;

			StatusObjectReader process(proc.second);
			std::string addrStr;
			if (!process.get("address", addrStr)) {
				*msg = errorString;
				return false;
			}
			NetworkAddress addr = NetworkAddress::parse(addrStr);
			// A storage process counts as excluded if status already reports it so, or if it
			// matches one of the addresses being excluded by this request.
			bool excluded =
			    (process.has("excluded") && process.last().get_bool()) || addressExcluded(*exclusions, addr);
			ssTotalCount++;
			if (excluded) ssExcludedCount++;

			if (!excluded) {
				StatusObjectReader disk;
				if (!process.get("disk", disk)) {
					*msg = errorString;
					return false;
				}

				int64_t total_bytes;
				if (!disk.get("total_bytes", total_bytes)) {
					*msg = errorString;
					return false;
				}

				int64_t free_bytes;
				if (!disk.get("free_bytes", free_bytes)) {
					*msg = errorString;
					return false;
				}

				worstFreeSpaceRatio = std::min(worstFreeSpaceRatio, double(free_bytes) / total_bytes);
			}
		}
	} catch (...) // std::exception
	{
		*msg = errorString;
		return false;
	}

	// Reject when every storage server would be excluded (this also guards the division below),
	// or when the worst used-space ratio, scaled by total/remaining storage servers, exceeds 0.9.
	if (ssExcludedCount == ssTotalCount ||
	    (1 - worstFreeSpaceRatio) * ssTotalCount / (ssTotalCount - ssExcludedCount) > 0.9) {
		// This string is stored as-is rather than used as a printf format, so a single '%' is correct.
		*msg =
		    "ERROR: This exclude may cause the total free space in the cluster to drop below 10%.\n"
		    "Type `exclude FORCE <ADDRESS...>' to exclude without checking free space.\n"; // TODO : update message here
		return false;
	}
	return true;
}

// Stages "include" operations on the underlying transaction of `ryw`: for every write-map entry
// under the excluded-servers special-key range that is a clear (flag true, no value), writes a new
// random ID to excludedServersVersionKey and clears the matching range with the `normalKeys.end`
// prefix stripped.
void includeServers(ReadYourWritesTransaction* ryw) {
	std::string versionKey = deterministicRandom()->randomUniqueID().toString();
	auto ranges = ryw->getSpecialKeySpaceWriteMap().containedRanges(excludedServersKeys.withPrefix(normalKeys.end));
	auto iter = ranges.begin();
	Transaction& tr = ryw->getTransaction();
	while (iter != ranges.end()) {
		auto entry = iter->value();
		if (entry.first && !entry.second.present()) {
			// ignore failed
			tr.addReadConflictRange(singleKeyRange(excludedServersVersionKey));
			tr.set(excludedServersVersionKey, versionKey);
			KeyRangeRef includingRange = iter->range().removePrefix(normalKeys.end);
			tr.clear(includingRange);
		}
		++iter;
	}
}

// Commit-time handling of the excluded-servers special keys: parses the addresses that were set,
// runs the exclusion checks, then stages the exclude and include writes on the underlying
// transaction of `ryw`.
// Returns an empty Optional on success, or the error message produced by parsing / checking.
ACTOR Future<Optional<std::string>> excludeCommitActor(ReadYourWritesTransaction* ryw) {
	// parse network addresses
	state Optional<std::string> result;
	state std::vector<AddressExclusion> addresses;
	state std::set<AddressExclusion> exclusions;
	TraceEvent(SevInfo, "SKSExclude").detail("ExcludeCommitStart", "");
	if (!parseNetWorkAddrFromKeys(ryw, excludedServersKeys.withPrefix(normalKeys.end), addresses, exclusions, result)) return result;
	// TODO : all checks before exclude
	TraceEvent(SevInfo, "SKSExclude").detail("ExcludeSafetyCheckStart", "");
	// `result` is a state variable, so its address stays valid while we wait on the check.
	bool safe = wait(checkExclusion(ryw->getDatabase(), &addresses, &exclusions, false, &result));
	if (!safe) return result;
	TraceEvent(SevInfo, "SKSExclude").detail("UpdateSystemKeys", "");
	excludeServers(ryw->getTransaction(), addresses, false);
	includeServers(ryw);

	return result;
}
// ExcludeServersRangeImpl's commit step: delegates entirely to excludeCommitActor and returns
// its future (an optional error message) to the caller unchanged.
Future<Optional<std::string>> ExcludeServersRangeImpl::commit(ReadYourWritesTransaction* ryw) {
return excludeCommitActor(ryw);
}

View File

@ -74,12 +74,8 @@ public:
ReadYourWritesTransaction* ryw) = 0; // all delayed async operations of writes in special-key-space
explicit SpecialKeyRangeRWImpl(KeyRangeRef kr) : SpecialKeyRangeReadImpl(kr) {}
KeyRangeRef getKeyRange() const { return range; }
virtual ~SpecialKeyRangeRWImpl() {}
protected:
KeyRange range;
};
class SpecialKeyRangeAsyncImpl : public SpecialKeyRangeReadImpl {
@ -158,7 +154,7 @@ public:
if (impl == nullptr) throw special_keys_no_module_found(); // TODO : change the error type here
return impl->set(ryw, key, value);
}
// void clear(ReadYourWritesTransaction* ryw, const KeyRangeRef& range );
// void clear(ReadYourWritesTransaction* ryw, const KeyRangeRef& range ); // TODO : ban cross module clear
void clear(ReadYourWritesTransaction* ryw, const KeyRef& key) {
auto impl = writeImpls[key];
if (impl == nullptr) throw special_keys_no_module_found(); // TODO : change the error type here