fix: do not allow a storage server to be removed within 5 million versions of it being added, because if a storage server is added and removed between the known committed version and the recovery version, the storage server will need to see either the add or the remove when it peeks

Evan Tschannen 2018-05-05 18:16:28 -07:00
parent 8371afb565
commit b1935f1738
5 changed files with 46 additions and 24 deletions
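The core of the change: waitForAllDataRemoved now refuses to even check whether a storage server can be removed until the transaction's read version is more than SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS (the "5 million versions" in the message) past the version at which the server was added. Below is a minimal standalone C++ sketch of that guard, not part of the commit; the 5,000,000 knob value is taken from the commit message, and mayAttemptRemoval is an illustrative name, not an FDB API.

// Illustrative sketch only: models the version gate added to waitForAllDataRemoved.
#include <cstdint>
#include <iostream>

using Version = int64_t;

// Assumed value: the commit message says "5 million versions"; the real code reads
// SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS.
constexpr Version MAX_READ_TRANSACTION_LIFE_VERSIONS = 5000000;

// Removal may only be attempted once the read version has advanced far enough past the
// version at which the server was added; otherwise data distribution keeps waiting, so a
// recovery peeking between the known committed version and the recovery version is
// guaranteed to see the add or the remove.
bool mayAttemptRemoval(Version readVersion, Version addedVersion) {
    return readVersion > addedVersion + MAX_READ_TRANSACTION_LIFE_VERSIONS;
}

int main() {
    Version addedVersion = 100000000;
    std::cout << mayAttemptRemoval(addedVersion + 1, addedVersion) << "\n";       // 0: too soon
    std::cout << mayAttemptRemoval(addedVersion + 5000001, addedVersion) << "\n"; // 1: may proceed
}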


@@ -268,14 +268,20 @@ struct ServerStatus {
};
typedef AsyncMap<UID, ServerStatus> ServerStatusMap;
ACTOR Future<Void> waitForAllDataRemoved( Database cx, UID serverID ) {
ACTOR Future<Void> waitForAllDataRemoved( Database cx, UID serverID, Version addedVersion ) {
state Transaction tr(cx);
loop {
try {
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
bool canRemove = wait( canRemoveStorageServer( &tr, serverID ) );
if (canRemove)
return Void();
Version ver = wait( tr.getReadVersion() );
//we cannot remove a server immediately after adding it, because a perfectly timed master recovery could cause us to not store the mutations sent to the short lived storage server
if(ver > addedVersion + SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS) {
bool canRemove = wait( canRemoveStorageServer( &tr, serverID ) );
if (canRemove) {
return Void();
}
}
// Wait for any change to the serverKeys for this server
Void _ = wait( delay(SERVER_KNOBS->ALL_DATA_REMOVED_DELAY, TaskDataDistribution) );
@@ -295,7 +301,8 @@ ACTOR Future<Void> storageServerFailureTracker(
ServerStatus *status,
PromiseStream<Void> serverFailures,
int64_t *unhealthyServers,
UID masterId )
UID masterId,
Version addedVersion )
{
loop {
bool unhealthy = statusMap->count(server.id()) && statusMap->get(server.id()).isUnhealthy();
@@ -319,7 +326,7 @@ ACTOR Future<Void> storageServerFailureTracker(
TraceEvent("StatusMapChange", masterId).detail("ServerID", server.id()).detail("Status", status->toString()).
detail("Available", IFailureMonitor::failureMonitor().getState(server.waitFailure.getEndpoint()).isAvailable());
}
when ( Void _ = wait( status->isUnhealthy() ? waitForAllDataRemoved(cx, server.id()) : Never() ) ) { break; }
when ( Void _ = wait( status->isUnhealthy() ? waitForAllDataRemoved(cx, server.id(), addedVersion) : Never() ) ) { break; }
}
}
@@ -479,7 +486,8 @@ Future<Void> storageServerTracker(
std::map<UID, Reference<TCServerInfo>>* const& other_servers,
PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > const& changes,
PromiseStream<Void> const& serverFailures,
Promise<Void> const& errorOut);
Promise<Void> const& errorOut,
Version const& addedVersion);
Future<Void> teamTracker( struct DDTeamCollection* const& self, Reference<IDataDistributionTeam> const& team );
@@ -802,7 +810,7 @@ struct DDTeamCollection {
// we preferentially mark the least used server as undesirable?
for (auto i = initTeams.allServers.begin(); i != initTeams.allServers.end(); ++i) {
if (shouldHandleServer(i->first)) {
addServer(i->first, i->second, serverTrackerErrorOut);
addServer(i->first, i->second, serverTrackerErrorOut, 0);
}
}
@@ -1156,7 +1164,7 @@ struct DDTeamCollection {
return (includedDCs.empty() || std::find(includedDCs.begin(), includedDCs.end(), newServer.locality.dcId()) != includedDCs.end() || (otherTrackedDCs.present() && std::find(otherTrackedDCs.get().begin(), otherTrackedDCs.get().end(), newServer.locality.dcId()) == otherTrackedDCs.get().end()));
}
void addServer( StorageServerInterface newServer, ProcessClass processClass, Promise<Void> errorOut ) {
void addServer( StorageServerInterface newServer, ProcessClass processClass, Promise<Void> errorOut, Version addedVersion ) {
if (!shouldHandleServer(newServer)) {
return;
}
@@ -1164,7 +1172,7 @@ struct DDTeamCollection {
TraceEvent("AddedStorageServer", masterId).detail("ServerID", newServer.id()).detail("ProcessClass", processClass.toString()).detail("WaitFailureToken", newServer.waitFailure.getEndpoint().token).detail("address", newServer.waitFailure.getEndpoint().address);
auto &r = server_info[newServer.id()] = Reference<TCServerInfo>( new TCServerInfo( newServer, processClass ) );
r->tracker = storageServerTracker( this, cx, r.getPtr(), &server_status, lock, masterId, &server_info, serverChanges, serverFailures, errorOut );
r->tracker = storageServerTracker( this, cx, r.getPtr(), &server_status, lock, masterId, &server_info, serverChanges, serverFailures, errorOut, addedVersion );
restartTeamBuilder.trigger();
}
@@ -1515,7 +1523,7 @@ ACTOR Future<Void> waitServerListChange( DDTeamCollection *self, Database cx, Fu
currentInterfaceChanged.send( std::make_pair(ssi,processClass) );
}
} else if( !self->recruitingIds.count(ssi.id()) ) {
self->addServer( ssi, processClass, self->serverTrackerErrorOut );
self->addServer( ssi, processClass, self->serverTrackerErrorOut, tr.getReadVersion().get() );
self->doBuildTeams = true;
}
}
@@ -1567,7 +1575,8 @@ ACTOR Future<Void> storageServerTracker(
std::map<UID, Reference<TCServerInfo>>* other_servers,
PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > changes,
PromiseStream<Void> serverFailures,
Promise<Void> errorOut)
Promise<Void> errorOut,
Version addedVersion)
{
state Future<Void> failureTracker;
state ServerStatus status( false, false, server->lastKnownInterface.locality );
@@ -1651,7 +1660,7 @@ ACTOR Future<Void> storageServerTracker(
otherChanges.push_back( self->excludedServers.onChange( addr ) );
otherChanges.push_back( self->excludedServers.onChange( ipaddr ) );
failureTracker = storageServerFailureTracker( cx, server->lastKnownInterface, statusMap, &status, serverFailures, &self->unhealthyServers, masterId );
failureTracker = storageServerFailureTracker( cx, server->lastKnownInterface, statusMap, &status, serverFailures, &self->unhealthyServers, masterId, addedVersion );
//We need to recruit new storage servers if the key value store type has changed
if(hasWrongStoreTypeOrDC)
@@ -1766,7 +1775,7 @@ ACTOR Future<Void> initializeStorage( DDTeamCollection *self, RecruitStorageRepl
self->recruitingIds.insert(interfaceId);
self->recruitingLocalities.insert(candidateWorker.worker.address());
ErrorOr<StorageServerInterface> newServer = wait( candidateWorker.worker.storage.tryGetReply( isr, TaskDataDistribution ) );
ErrorOr<InitializeStorageReply> newServer = wait( candidateWorker.worker.storage.tryGetReply( isr, TaskDataDistribution ) );
self->recruitingIds.erase(interfaceId);
self->recruitingLocalities.erase(candidateWorker.worker.address());
@@ -1782,8 +1791,8 @@ ACTOR Future<Void> initializeStorage( DDTeamCollection *self, RecruitStorageRepl
Void _ = wait( delay(SERVER_KNOBS->STORAGE_RECRUITMENT_DELAY, TaskDataDistribution) );
}
else if( newServer.present() ) {
if( !self->server_info.count( newServer.get().id() ) )
self->addServer( newServer.get(), candidateWorker.processClass, self->serverTrackerErrorOut );
if( !self->server_info.count( newServer.get().interf.id() ) )
self->addServer( newServer.get().interf, candidateWorker.processClass, self->serverTrackerErrorOut, newServer.get().addedVersion );
else
TraceEvent(SevWarn, "DDRecruitmentError").detail("Reason", "Server ID already recruited");


@@ -146,12 +146,22 @@ struct InitializeResolverRequest {
}
};
struct InitializeStorageReply {
StorageServerInterface interf;
Version addedVersion;
template <class Ar>
void serialize(Ar& ar) {
ar & interf & addedVersion;
}
};
struct InitializeStorageRequest {
Tag seedTag; //< If this server will be passed to seedShardServers, this will be a tag, otherwise it is invalidTag
UID reqId;
UID interfaceId;
KeyValueStoreType storeType;
ReplyPromise< struct StorageServerInterface > reply;
ReplyPromise< InitializeStorageReply > reply;
template <class Ar>
void serialize( Ar& ar ) {
@@ -288,7 +298,7 @@ Future<Void> storageServer(
class IKeyValueStore* const& persistentData,
StorageServerInterface const& ssi,
Tag const& seedTag,
ReplyPromise<StorageServerInterface> const& recruitReply,
ReplyPromise<InitializeStorageReply> const& recruitReply,
Reference<AsyncVar<ServerDBInfo>> const& db,
std::string const& folder );
Future<Void> storageServer(


@@ -338,7 +338,7 @@ ACTOR Future<Void> newSeedServers( Reference<MasterData> self, RecruitFromConfig
isr.reqId = g_random->randomUniqueID();
isr.interfaceId = g_random->randomUniqueID();
ErrorOr<StorageServerInterface> newServer = wait( recruits.storageServers[idx].storage.tryGetReply( isr ) );
ErrorOr<InitializeStorageReply> newServer = wait( recruits.storageServers[idx].storage.tryGetReply( isr ) );
if( newServer.isError() ) {
if( !newServer.isError( error_code_recruitment_failed ) && !newServer.isError( error_code_request_maybe_delivered ) )
@@ -357,7 +357,7 @@ ACTOR Future<Void> newSeedServers( Reference<MasterData> self, RecruitFromConfig
tag.id++;
idx++;
servers->push_back( newServer.get() );
servers->push_back( newServer.get().interf );
}
}


@@ -3239,7 +3239,7 @@ bool storageServerTerminated(StorageServer& self, IKeyValueStore* persistentData
return false;
}
ACTOR Future<Void> storageServer( IKeyValueStore* persistentData, StorageServerInterface ssi, Tag seedTag, ReplyPromise<StorageServerInterface> recruitReply,
ACTOR Future<Void> storageServer( IKeyValueStore* persistentData, StorageServerInterface ssi, Tag seedTag, ReplyPromise<InitializeStorageReply> recruitReply,
Reference<AsyncVar<ServerDBInfo>> db, std::string folder )
{
state StorageServer self(persistentData, db, ssi);
@@ -3260,7 +3260,10 @@ ACTOR Future<Void> storageServer( IKeyValueStore* persistentData, StorageServerI
Void _ = wait( self.storage.commit() );
TraceEvent("StorageServerInit", ssi.id()).detail("Version", self.version.get()).detail("SeedTag", seedTag.toString());
recruitReply.send(ssi);
InitializeStorageReply rep;
rep.interf = ssi;
rep.addedVersion = self.version.get();
recruitReply.send(rep);
self.byteSampleRecovery = Void();
Void _ = wait( storageServerCore(&self, ssi) );


@@ -487,7 +487,7 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe
state double loggingDelay = SERVER_KNOBS->WORKER_LOGGING_INTERVAL;
state ActorCollection filesClosed(true);
state Promise<Void> stopping;
state WorkerCache<StorageServerInterface> storageCache;
state WorkerCache<InitializeStorageReply> storageCache;
state Reference<AsyncVar<ServerDBInfo>> dbInfo( new AsyncVar<ServerDBInfo>(ServerDBInfo(LiteralStringRef("DB"))) );
state Future<Void> metricsLogger;
state UID processIDUid;
@@ -761,7 +761,7 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe
IKeyValueStore* data = openKVStore( req.storeType, filename, recruited.id(), memoryLimit );
Future<Void> kvClosed = data->onClosed();
filesClosed.add( kvClosed );
ReplyPromise<StorageServerInterface> storageReady = req.reply;
ReplyPromise<InitializeStorageReply> storageReady = req.reply;
storageCache.set( req.reqId, storageReady.getFuture() );
Future<Void> s = storageServer( data, recruited, req.seedTag, storageReady, dbInfo, folder );
s = handleIOErrors(s, data, recruited.id(), kvClosed);