Implemented direct removal of failed storage server from system keyspace
This commit is contained in:
parent
c908c6c1db
commit
66bba51988
|
@ -132,7 +132,8 @@ void applyMetadataMutations(UID const& dbgid, Arena &arena, VectorRef<MutationRe
|
|||
}
|
||||
else if (m.param1.startsWith(configKeysPrefix) || m.param1 == coordinatorsKey) {
|
||||
if(Optional<StringRef>(m.param2) != txnStateStore->readValue(m.param1).get().castTo<StringRef>()) { // FIXME: Make this check more specific, here or by reading configuration whenever there is a change
|
||||
if(!m.param1.startsWith( excludedServersPrefix ) && m.param1 != excludedServersVersionKey) {
|
||||
if((!m.param1.startsWith( excludedServersPrefix ) && m.param1 != excludedServersVersionKey) &&
|
||||
(!m.param1.startsWith( failedServersPrefix ) && m.param1 != failedServersVersionKey)) {
|
||||
auto t = txnStateStore->readValue(m.param1).get();
|
||||
TraceEvent("MutationRequiresRestart", dbgid)
|
||||
.detail("M", m.toString())
|
||||
|
|
|
@ -594,6 +594,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
|
|||
AsyncVar<bool> zeroOptimalTeams;
|
||||
|
||||
AsyncMap< AddressExclusion, bool > excludedServers; // true if an address is in the excluded list in the database. Updated asynchronously (eventually)
|
||||
std::set< AddressExclusion > failedServers;
|
||||
|
||||
std::vector<Optional<Key>> includedDCs;
|
||||
Optional<std::vector<Optional<Key>>> otherTrackedDCs;
|
||||
|
@ -2949,13 +2950,16 @@ ACTOR Future<Void> trackExcludedServers( DDTeamCollection* self ) {
|
|||
std::set<AddressExclusion> excluded;
|
||||
for(auto r = excludedResults.begin(); r != excludedResults.end(); ++r) {
|
||||
AddressExclusion addr = decodeExcludedServersKey(r->key);
|
||||
if (addr.isValid())
|
||||
if (addr.isValid()) {
|
||||
excluded.insert( addr );
|
||||
}
|
||||
}
|
||||
for(auto r = failedResults.begin(); r != failedResults.end(); ++r) {
|
||||
AddressExclusion addr = decodeFailedServersKey(r->key);
|
||||
if (addr.isValid())
|
||||
if (addr.isValid()) {
|
||||
excluded.insert( addr );
|
||||
self->failedServers.insert(addr);
|
||||
}
|
||||
}
|
||||
|
||||
TraceEvent("DDExcludedServersChanged", self->distributorId)
|
||||
|
@ -3133,7 +3137,12 @@ ACTOR Future<Void> waitForAllDataRemoved( Database cx, UID serverID, Version add
|
|||
//we cannot remove a server immediately after adding it, because a perfectly timed master recovery could cause us to not store the mutations sent to the short lived storage server.
|
||||
if(ver > addedVersion + SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS) {
|
||||
bool canRemove = wait( canRemoveStorageServer( &tr, serverID ) );
|
||||
if (canRemove && teams->shardsAffectedByTeamFailure->getNumberOfShards(serverID) == 0) {
|
||||
TraceEvent("FailedServerDataRemoved")
|
||||
.detail("CanRemove", canRemove)
|
||||
.detail("NumShards", teams->shardsAffectedByTeamFailure->getNumberOfShards(serverID));
|
||||
// Current implementation of server erasure is sort of a hack that sets # shards to 0
|
||||
// Defensive check for negative values instead of just 0
|
||||
if (canRemove && teams->shardsAffectedByTeamFailure->getNumberOfShards(serverID) <= 0) {
|
||||
return Void();
|
||||
}
|
||||
}
|
||||
|
@ -3306,13 +3315,20 @@ ACTOR Future<Void> storageServerTracker(
|
|||
|
||||
// If the storage server is in the excluded servers list, it is undesired
|
||||
NetworkAddress a = server->lastKnownInterface.address();
|
||||
AddressExclusion addr( a.ip, a.port );
|
||||
AddressExclusion ipaddr( a.ip );
|
||||
state AddressExclusion addr( a.ip, a.port );
|
||||
state AddressExclusion ipaddr( a.ip );
|
||||
if (self->excludedServers.get( addr ) || self->excludedServers.get( ipaddr )) {
|
||||
TraceEvent(SevWarn, "UndesiredStorageServer", self->distributorId).detail("Server", server->id)
|
||||
.detail("Excluded", self->excludedServers.get( addr ) ? addr.toString() : ipaddr.toString());
|
||||
status.isUndesired = true;
|
||||
status.isWrongConfiguration = true;
|
||||
if (self->failedServers.find(addr) != self->failedServers.end()) {
|
||||
TraceEvent("FailedServerRemoveKeys")
|
||||
.detail("Address", addr.toString())
|
||||
.detail("ServerID", server->id);
|
||||
wait(removeKeysFromFailedServer(cx, server->id, self->lock));
|
||||
self->shardsAffectedByTeamFailure->eraseServer(server->id);
|
||||
}
|
||||
}
|
||||
otherChanges.push_back( self->excludedServers.onChange( addr ) );
|
||||
otherChanges.push_back( self->excludedServers.onChange( ipaddr ) );
|
||||
|
|
|
@ -174,6 +174,7 @@ public:
|
|||
void moveShard( KeyRangeRef keys, std::vector<Team> destinationTeam );
|
||||
void finishMove( KeyRangeRef keys );
|
||||
void check();
|
||||
void eraseServer(UID ssID);
|
||||
private:
|
||||
struct OrderByTeamKey {
|
||||
bool operator()( const std::pair<Team,KeyRange>& lhs, const std::pair<Team,KeyRange>& rhs ) const {
|
||||
|
|
|
@ -713,6 +713,10 @@ void ShardsAffectedByTeamFailure::erase(Team team, KeyRange const& range) {
|
|||
}
|
||||
}
|
||||
|
||||
// Records that the given storage server no longer owns any shards by forcing
// its entry in storageServerShards to zero. Invoked after a permanently
// failed server's data assignment has been erased directly from the system
// keyspace, so waitForAllDataRemoved's shard-count check can pass.
// NOTE(review): this deliberately overwrites (or creates) the map entry
// rather than removing it — the erasure path checks for a count of 0.
void ShardsAffectedByTeamFailure::eraseServer(UID ssID) {
	auto& shardCount = storageServerShards[ssID];
	shardCount = 0;
}
|
||||
|
||||
void ShardsAffectedByTeamFailure::insert(Team team, KeyRange const& range) {
|
||||
if(team_shards.insert( std::pair<Team,KeyRange>( team, range ) ).second) {
|
||||
for(auto uid = team.servers.begin(); uid != team.servers.end(); ++uid)
|
||||
|
|
|
@ -918,6 +918,65 @@ ACTOR Future<Void> removeStorageServer( Database cx, UID serverID, MoveKeysLock
|
|||
}
|
||||
}
|
||||
|
||||
// Directly removes serverID from the keyServers and serverKeys system keyspaces.
// Performed when a storage server is marked as permanently failed, so its data
// assignment can be erased without waiting for normal data movement to drain it.
// Retries the whole transaction on conflicts/errors via tr.onError.
ACTOR Future<Void> removeKeysFromFailedServer(Database cx, UID serverID, MoveKeysLock lock) {
	state Transaction tr( cx );
	loop {
		try {
			tr.info.taskID = TaskPriority::MoveKeys;
			tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
			// Holding the move-keys lock prevents concurrent data movement from
			// racing with this direct erasure.
			wait( checkMoveKeysLock(&tr, lock) );
			TraceEvent("RemoveKeysFromFailedServerLocked").detail("ServerID", serverID).detail("Version", tr.getReadVersion().get());
			// Get all values of keyServers and remove serverID from every occurrence
			// Very inefficient going over every entry in keyServers
			// No shortcut because keyServers and serverKeys are not guaranteed same shard boundaries (change this?)
			// NOTE(review): krmGetRanges can truncate its result for very large
			// keyServers maps — confirm the whole range fits in one read or add
			// continuation handling. TODO confirm.
			state Standalone<RangeResultRef> keyServers = wait( krmGetRanges(&tr, keyServersPrefix, allKeys) );
			state KeyValueRef* it = keyServers.begin();
			for ( ; it != keyServers.end() ; ++it) {
				state vector<UID> src;
				state vector<UID> dest;
				decodeKeyServersValue(it->value, src, dest);

				// The failed server is not present in this entry: skip it so the
				// commit does not needlessly rewrite the entire keyServers map.
				if (std::find(src.begin(), src.end(), serverID) == src.end() &&
				    std::find(dest.begin(), dest.end(), serverID) == dest.end()) {
					continue;
				}

				// Update the vectors to remove failed server then set the value again
				// Dest is usually empty, but keep this in case there is parallel data movement (?)
				src.erase(std::remove(src.begin(), src.end(), serverID), src.end());
				dest.erase(std::remove(dest.begin(), dest.end(), serverID), dest.end());
				tr.set(keyServersKey(it->key), keyServersValue(src, dest));
			}

			// Set entire range for our serverID in serverKeys keyspace to false to signal erasure
			wait( krmSetRangeCoalescing( &tr, serverKeysPrefixFor(serverID), allKeys, allKeys, serverKeysFalse) );
			wait( tr.commit() );
			return Void();
		} catch (Error& e) {
			wait( tr.onError(e) );
		}
	}
}
|
||||
|
||||
ACTOR Future<Void> moveKeys(
|
||||
Database cx,
|
||||
KeyRange keys,
|
||||
|
|
|
@ -82,6 +82,9 @@ ACTOR Future<Void> removeStorageServer(Database cx, UID serverID, MoveKeysLock l
|
|||
ACTOR Future<bool> canRemoveStorageServer(Transaction* tr, UID serverID);
|
||||
// Returns true if the given storage server has no keys assigned to it and may be safely removed
|
||||
// Obviously that could change later!
|
||||
ACTOR Future<Void> removeKeysFromFailedServer(Database cx, UID serverID, MoveKeysLock lock);
|
||||
// Directly removes serverID from serverKeys and keyServers system keyspace.
|
||||
// Performed when a storage server is marked as permanently failed.
|
||||
|
||||
#include "flow/unactorcompiler.h"
|
||||
#endif
|
||||
|
|
Loading…
Reference in New Issue