StorageEngineSwitch:Use AsyncVar to signal server to remove
Trigger does not have an effect if the receiver is not waiting on the trigger. To ensure the wrong store type server that is selected to be removed is removed, we should use an AysncVar<bool> to trigger the storage tracker.
This commit is contained in:
parent
a588710376
commit
b216cd2516
|
@ -59,7 +59,7 @@ struct TCServerInfo : public ReferenceCounted<TCServerInfo> {
|
|||
bool inDesiredDC;
|
||||
LocalityEntry localityEntry;
|
||||
Promise<Void> updated;
|
||||
AsyncTrigger wrongStoreTypeRemoved; // wrongStoreTypeRemoved
|
||||
AsyncVar<bool> wrongStoreTypeRemoved; // wrongStoreTypeRemoved
|
||||
int toRemove; // Debug purpose: 0: not remove, >0: to remove due to wrongStoreType
|
||||
// A storage server's StoreType does not change.
|
||||
//To change storeType for an ip:port, we destroy the old one and create a new one.
|
||||
|
@ -185,11 +185,9 @@ public:
|
|||
|
||||
// TeamCollection's server team info.
|
||||
class TCTeamInfo : public ReferenceCounted<TCTeamInfo>, public IDataDistributionTeam {
|
||||
private:
|
||||
public:
|
||||
vector< Reference<TCServerInfo> > servers;
|
||||
vector<UID> serverIDs;
|
||||
|
||||
public:
|
||||
Reference<TCMachineTeamInfo> machineTeam;
|
||||
Future<Void> tracker;
|
||||
bool healthy;
|
||||
|
@ -660,8 +658,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
|
|||
Reference<AsyncVar<bool>> processingUnhealthy)
|
||||
: cx(cx), distributorId(distributorId), lock(lock), output(output),
|
||||
shardsAffectedByTeamFailure(shardsAffectedByTeamFailure), doBuildTeams(true), lastBuildTeamsFailed(false),
|
||||
teamBuilder(Void()), badTeamRemover(Void()), wrongStoreTypeRemover(Void()), redundantMachineTeamRemover(Void()),
|
||||
redundantServerTeamRemover(Void()), configuration(configuration), readyToStart(readyToStart),
|
||||
teamBuilder(Void()), badTeamRemover(Void()), configuration(configuration), readyToStart(readyToStart),
|
||||
clearHealthyZoneFuture(true),
|
||||
checkTeamDelay(delay(SERVER_KNOBS->CHECK_TEAM_DELAY, TaskPriority::DataDistribution)),
|
||||
initialFailureReactionDelay(
|
||||
|
@ -2542,9 +2539,19 @@ bool inCorrectDC(DDTeamCollection* self, TCServerInfo* server) {
|
|||
self->includedDCs.end());
|
||||
}
|
||||
|
||||
// Is there any healthy team whose members do not include serverID
|
||||
bool existOtherHealthyTeams(DDTeamCollection* self, UID serverID) {
|
||||
for (auto& team : self->teams) {
|
||||
if (team->isHealthy() && std::count(team->serverIDs.begin(),team->serverIDs.end(), serverID) == 0) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
ACTOR Future<Void> removeWrongStoreType(DDTeamCollection* self) {
|
||||
state int numServersRemoved = 0;
|
||||
state std::map<UID, Reference<TCServerInfo>>::iterator server;
|
||||
state vector<Reference<TCServerInfo>> serversToRemove;
|
||||
state int i = 0;
|
||||
|
||||
|
@ -2555,43 +2562,29 @@ ACTOR Future<Void> removeWrongStoreType(DDTeamCollection* self) {
|
|||
.detail("ServerInfoSize", self->server_info.size())
|
||||
.detail("SysRestoreType", self->configuration.storageServerStoreType);
|
||||
serversToRemove.clear();
|
||||
for (server = self->server_info.begin(); server != self->server_info.end(); ++server) {
|
||||
NetworkAddress a = server->second->lastKnownInterface.address();
|
||||
for (auto& server : self->server_info) {
|
||||
NetworkAddress a = server.second->lastKnownInterface.address();
|
||||
AddressExclusion addr(a.ip, a.port);
|
||||
TraceEvent("WrongStoreTypeRemover", self->distributorId)
|
||||
.detail("DDID", self->distributorId)
|
||||
.detail("Server", server->first)
|
||||
.detail("Server", server.first)
|
||||
.detail("Addr", addr.toString())
|
||||
.detail("StoreType", server->second->storeType)
|
||||
.detail("StoreType", server.second->storeType)
|
||||
.detail("IsCorrectStoreType",
|
||||
server->second->isCorrectStoreType(self->configuration.storageServerStoreType))
|
||||
.detail("ToRemove", server->second->toRemove);
|
||||
if (!server->second->isCorrectStoreType(self->configuration.storageServerStoreType) ||
|
||||
!inCorrectDC(self, server->second.getPtr())) {
|
||||
serversToRemove.push_back(server->second);
|
||||
server.second->isCorrectStoreType(self->configuration.storageServerStoreType))
|
||||
.detail("ToRemove", server.second->toRemove);
|
||||
if (!server.second->isCorrectStoreType(self->configuration.storageServerStoreType) && existOtherHealthyTeams(self, server.first)) {
|
||||
// Only remove a server if there exist at least a healthy team that do not include the server,
|
||||
// so that the server's data can be moved away
|
||||
serversToRemove.push_back(server.second);
|
||||
server.second->toRemove++;
|
||||
server.second->wrongStoreTypeRemoved.set(true);
|
||||
break; // Remove a server will change teams' healthyness
|
||||
} else {
|
||||
server->second->toRemove =
|
||||
0; // In case the configuration.storeType is changed back to the server's type
|
||||
// In case the configuration.storeType is changed back to the server's type
|
||||
server.second->toRemove = 0;
|
||||
}
|
||||
}
|
||||
|
||||
for (i = 0; i < serversToRemove.size(); i++) {
|
||||
Reference<TCServerInfo> s = serversToRemove[i];
|
||||
if (s.isValid()) {
|
||||
s->toRemove++; // The server's location will not be excluded
|
||||
s->wrongStoreTypeRemoved.trigger();
|
||||
ASSERT(s->toRemove >= 0);
|
||||
wait(delay(1.0));
|
||||
}
|
||||
}
|
||||
|
||||
if (!serversToRemove.empty() || self->healthyTeamCount == 0) {
|
||||
TraceEvent("WrongStoreTypeRemover").detail("KickTeamBuilder", "Start");
|
||||
self->restartRecruiting.trigger();
|
||||
self->doBuildTeams = true;
|
||||
wait(delay(5.0)); // I have to add delay here; otherwise, it will immediately go to the next loop and print
|
||||
// WrongStoreTypeRemoverStartLoop. Why?!
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2906,7 +2899,7 @@ ACTOR Future<Void> teamTracker(DDTeamCollection* self, Reference<TCTeamInfo> tea
|
|||
.detail("Primary", self->primary);
|
||||
self->traceAllInfo(true);
|
||||
// Create a new team for safe
|
||||
self->restartRecruiting.trigger();
|
||||
// self->restartRecruiting.trigger();
|
||||
self->doBuildTeams = true;
|
||||
self->restartTeamBuilder.trigger();
|
||||
}
|
||||
|
@ -3246,8 +3239,8 @@ ACTOR Future<Void> serverMetricsPolling( TCServerInfo *server) {
|
|||
}
|
||||
}
|
||||
|
||||
//Returns the KeyValueStoreType of server if it is different from self->storeType
|
||||
ACTOR Future<KeyValueStoreType> keyValueStoreTypeTracker(DDTeamCollection* self, TCServerInfo *server) {
|
||||
// Set the server's storeType
|
||||
ACTOR Future<Void> keyValueStoreTypeTracker(DDTeamCollection* self, TCServerInfo *server) {
|
||||
try {
|
||||
// Update server's storeType, especially when it was created
|
||||
state KeyValueStoreType type = wait(
|
||||
|
@ -3257,17 +3250,12 @@ ACTOR Future<KeyValueStoreType> keyValueStoreTypeTracker(DDTeamCollection* self,
|
|||
if (server->storeType == self->configuration.storageServerStoreType) {
|
||||
server->toRemove = 0; // In case sys config is changed back to the server's storeType
|
||||
}
|
||||
if (server->storeType == self->configuration.storageServerStoreType &&
|
||||
(self->includedDCs.empty() ||
|
||||
std::find(self->includedDCs.begin(), self->includedDCs.end(),
|
||||
server->lastKnownInterface.locality.dcId()) != self->includedDCs.end())) {
|
||||
wait(Future<Void>(Never()));
|
||||
}
|
||||
} catch (Error& e) {
|
||||
// Failed server should be removed by storageServerTracker
|
||||
wait(Future<Void>(Never()));
|
||||
}
|
||||
|
||||
return type;
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> waitForAllDataRemoved( Database cx, UID serverID, Version addedVersion, DDTeamCollection* teams ) {
|
||||
|
@ -3405,15 +3393,15 @@ ACTOR Future<Void> storageServerTracker(
|
|||
state Future<Void> metricsTracker = serverMetricsPolling( server );
|
||||
state Future<std::pair<StorageServerInterface, ProcessClass>> interfaceChanged = server->onInterfaceChanged;
|
||||
|
||||
state Future<KeyValueStoreType> storeTracker = keyValueStoreTypeTracker( self, server );
|
||||
state Future<Void> storeTypeTracker = keyValueStoreTypeTracker( self, server );
|
||||
state bool hasWrongDC = !inCorrectDC(self, server);
|
||||
state bool toRemoveWrongStoreType = false;
|
||||
state int targetTeamNumPerServer = (SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER * (self->configuration.storageTeamSize + 1)) / 2;
|
||||
|
||||
try {
|
||||
loop {
|
||||
status.isUndesired = false;
|
||||
status.isWrongConfiguration = false;
|
||||
hasWrongDC = !inCorrectDC(self, server);
|
||||
|
||||
// If there is any other server on this exact NetworkAddress, this server is undesired and will eventually
|
||||
// be eliminated. This samAddress checking must be redo whenever the server's state (e.g., storeType,
|
||||
|
@ -3483,7 +3471,7 @@ ACTOR Future<Void> storageServerTracker(
|
|||
status.isUndesired = true;
|
||||
status.isWrongConfiguration = true;
|
||||
}
|
||||
if (toRemoveWrongStoreType) { // TODO: merge with the above if (hasWrongDC)
|
||||
if (server->wrongStoreTypeRemoved.get()) { // TODO: merge with the above if (hasWrongDC)
|
||||
TraceEvent(SevWarn, "WrongStoreTypeToRemove", self->distributorId)
|
||||
.detail("Server", server->id)
|
||||
.detail("StoreType", "?");
|
||||
|
@ -3506,7 +3494,7 @@ ACTOR Future<Void> storageServerTracker(
|
|||
|
||||
failureTracker = storageServerFailureTracker(self, server, cx, &status, addedVersion);
|
||||
//We need to recruit new storage servers if the key value store type has changed
|
||||
if (hasWrongDC || toRemoveWrongStoreType) self->restartRecruiting.trigger();
|
||||
if (hasWrongDC || server->wrongStoreTypeRemoved.get()) self->restartRecruiting.trigger();
|
||||
|
||||
if (lastIsUnhealthy && !status.isUnhealthy() &&
|
||||
( server->teams.size() < targetTeamNumPerServer || self->lastBuildTeamsFailed)) {
|
||||
|
@ -3640,10 +3628,8 @@ ACTOR Future<Void> storageServerTracker(
|
|||
// self->traceTeamCollectionInfo();
|
||||
recordTeamCollectionInfo = true;
|
||||
//Restart the storeTracker for the new interface
|
||||
storeTracker = keyValueStoreTypeTracker(
|
||||
self, server); // hasWrongStoretype server will be delayed to be deleted.
|
||||
hasWrongDC = false;
|
||||
toRemoveWrongStoreType = false;
|
||||
storeTypeTracker = keyValueStoreTypeTracker(self, server);
|
||||
hasWrongDC = !inCorrectDC(self, server);
|
||||
self->restartTeamBuilder.trigger();
|
||||
|
||||
if(restartRecruiting)
|
||||
|
@ -3652,22 +3638,12 @@ ACTOR Future<Void> storageServerTracker(
|
|||
when( wait( otherChanges.empty() ? Never() : quorum( otherChanges, 1 ) ) ) {
|
||||
TraceEvent("SameAddressChangedStatus", self->distributorId).detail("ServerID", server->id);
|
||||
}
|
||||
when( KeyValueStoreType type = wait( storeTracker ) ) {
|
||||
TraceEvent("KeyValueStoreTypeChanged", self->distributorId)
|
||||
.detail("ServerID", server->id)
|
||||
.detail("StoreType", type.toString())
|
||||
.detail("DesiredType", self->configuration.storageServerStoreType.toString());
|
||||
TEST(true); //KeyValueStore type changed
|
||||
|
||||
storeTracker = Never();
|
||||
hasWrongDC = !inCorrectDC(self, server);
|
||||
}
|
||||
when(wait(server->wrongStoreTypeRemoved.onTrigger())) {
|
||||
when(wait(server->wrongStoreTypeRemoved.onChange())) {
|
||||
TraceEvent(SevWarn, "UndesiredStorageServerTriggered", self->distributorId)
|
||||
.detail("Server", server->id)
|
||||
.detail("StoreType", server->storeType)
|
||||
.detail("ConfigStoreType", self->configuration.storageServerStoreType);
|
||||
toRemoveWrongStoreType = true;
|
||||
.detail("ConfigStoreType", self->configuration.storageServerStoreType)
|
||||
.detail("WrongStoreTypeRemoved", server->wrongStoreTypeRemoved.get());
|
||||
}
|
||||
when( wait( server->wakeUpTracker.getFuture() ) ) {
|
||||
server->wakeUpTracker = Promise<Void>();
|
||||
|
@ -3979,19 +3955,9 @@ ACTOR Future<Void> dataDistributionTeamCollection(
|
|||
self->addActor.send(self->badTeamRemover);
|
||||
}
|
||||
|
||||
if (self->redundantMachineTeamRemover.isReady()) {
|
||||
self->redundantMachineTeamRemover = machineTeamRemover(self);
|
||||
self->addActor.send(self->redundantMachineTeamRemover);
|
||||
}
|
||||
if (self->redundantServerTeamRemover.isReady()) {
|
||||
self->redundantServerTeamRemover = serverTeamRemover(self);
|
||||
self->addActor.send(self->redundantServerTeamRemover);
|
||||
}
|
||||
|
||||
if (self->wrongStoreTypeRemover.isReady()) {
|
||||
self->wrongStoreTypeRemover = removeWrongStoreType(self);
|
||||
self->addActor.send(self->wrongStoreTypeRemover);
|
||||
}
|
||||
self->addActor.send(machineTeamRemover(self));
|
||||
self->addActor.send(serverTeamRemover(self));
|
||||
self->addActor.send(removeWrongStoreType(self));
|
||||
|
||||
self->traceTeamCollectionInfo();
|
||||
|
||||
|
|
Loading…
Reference in New Issue