StorageEngineSwitch: Remove unnecessary code and format code

The unnecessary code includes debug code and a redundant relaunch of
the removeWrongStoreType actor.

Format the changes with clang-format as well.
Meng Xu 2019-08-16 16:46:54 -07:00
parent 0648388b25
commit b448f92d61
6 changed files with 83 additions and 101 deletions
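
Most hunks below are whitespace-only clang-format normalizations of this shape (the example is taken verbatim from the diff itself; the repository's .clang-format options are not part of this commit and are assumed to be checked in separately):

// Hand-formatted, before this commit:
wait( delay(SERVER_KNOBS->DD_REMOVE_STORE_ENGINE_DELAY) );
// After clang-format: no spaces just inside parentheses, a space after control keywords.
wait(delay(SERVER_KNOBS->DD_REMOVE_STORE_ENGINE_DELAY));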


@@ -552,7 +552,7 @@ OpenDatabaseRequest ClientData::getRequest() {
}
}
StringRef maxProtocol;
for(auto& it : ci.second.versions) {
for (auto& it : ci.second.versions) {
maxProtocol = std::max(maxProtocol, it.protocolVersion);
auto& entry = versionMap[it];
entry.count++;
@@ -562,7 +562,7 @@ OpenDatabaseRequest ClientData::getRequest() {
}
auto& maxEntry = maxProtocolMap[maxProtocol];
maxEntry.count++;
if(maxEntry.examples.size() < CLIENT_KNOBS->CLIENT_EXAMPLE_AMOUNT) {
if (maxEntry.examples.size() < CLIENT_KNOBS->CLIENT_EXAMPLE_AMOUNT) {
maxEntry.examples.push_back(std::make_pair(ci.first, ci.second.traceLogGroup));
}
}


@@ -3080,9 +3080,9 @@ Future<Version> Transaction::getReadVersion(uint32_t flags) {
++cx->transactionReadVersions;
flags |= options.getReadVersionFlags;
auto& batcher = cx->versionBatcher[ flags ];
auto& batcher = cx->versionBatcher[flags];
if (!batcher.actor.isValid()) {
batcher.actor = readVersionBatcher( cx.getPtr(), batcher.stream.getFuture(), flags );
batcher.actor = readVersionBatcher(cx.getPtr(), batcher.stream.getFuture(), flags);
}
if (!readVersion.isValid()) {
Promise<GetReadVersionReply> p;


@@ -62,7 +62,7 @@ struct TCServerInfo : public ReferenceCounted<TCServerInfo> {
LocalityEntry localityEntry;
Promise<Void> updated;
AsyncVar<bool> wrongStoreTypeToRemove;
// A storage server's StoreType does not change.
// To change storeType for an ip:port, we destroy the old one and create a new one.
KeyValueStoreType storeType; // Storage engine type
@@ -664,16 +664,15 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
Reference<AsyncVar<bool>> processingUnhealthy)
: cx(cx), distributorId(distributorId), lock(lock), output(output),
shardsAffectedByTeamFailure(shardsAffectedByTeamFailure), doBuildTeams(true), lastBuildTeamsFailed(false),
teamBuilder(Void()), badTeamRemover(Void()), wrongStoreTypeRemover(Void()), configuration(configuration), readyToStart(readyToStart),
clearHealthyZoneFuture(true),
teamBuilder(Void()), badTeamRemover(Void()), wrongStoreTypeRemover(Void()), configuration(configuration),
readyToStart(readyToStart), clearHealthyZoneFuture(true),
checkTeamDelay(delay(SERVER_KNOBS->CHECK_TEAM_DELAY, TaskPriority::DataDistribution)),
initialFailureReactionDelay(
delayed(readyToStart, SERVER_KNOBS->INITIAL_FAILURE_REACTION_DELAY, TaskPriority::DataDistribution)),
healthyTeamCount(0), storageServerSet(new LocalityMap<UID>()),
initializationDoneActor(logOnCompletion(readyToStart && initialFailureReactionDelay, this)),
optimalTeamCount(0), recruitingStream(0), restartRecruiting(SERVER_KNOBS->DEBOUNCE_RECRUITING_DELAY),
doRemoveWrongStoreType(true),
unhealthyServers(0), includedDCs(includedDCs), otherTrackedDCs(otherTrackedDCs),
doRemoveWrongStoreType(true), unhealthyServers(0), includedDCs(includedDCs), otherTrackedDCs(otherTrackedDCs),
zeroHealthyTeams(zeroHealthyTeams), zeroOptimalTeams(true), primary(primary),
processingUnhealthy(processingUnhealthy) {
if(!primary || configuration.usableRegions == 1) {
@@ -2542,7 +2541,7 @@ bool inCorrectDC(DDTeamCollection* self, TCServerInfo* server) {
// Is there any healthy team whose members do not include serverID
bool existOtherHealthyTeams(DDTeamCollection* self, UID serverID) {
for (auto& team : self->teams) {
if (team->isHealthy() && std::count(team->serverIDs.begin(),team->serverIDs.end(), serverID) == 0) {
if (team->isHealthy() && std::count(team->serverIDs.begin(), team->serverIDs.end(), serverID) == 0) {
return true;
}
}
@@ -2552,20 +2551,20 @@ bool existOtherHealthyTeams(DDTeamCollection* self, UID serverID) {
ACTOR Future<Void> removeWrongStoreType(DDTeamCollection* self) {
// Wait for storage servers to initialize its storeType
wait( delay(SERVER_KNOBS->DD_REMOVE_STORE_ENGINE_DELAY) );
wait(delay(SERVER_KNOBS->DD_REMOVE_STORE_ENGINE_DELAY));
state bool foundSSToRemove = false;
state Reference<TCServerInfo> secondPreferedSSToRemove;
loop {
foundSSToRemove = false;
secondPreferedSSToRemove = Reference<TCServerInfo>();
if (self->doRemoveWrongStoreType.get() == false) {
// Once the wrong storeType SS picked to be removed is removed, doRemoveWrongStoreType will be set to true;
// In case the SS fails in between, we should time out and check for the next SS.
wait(self->doRemoveWrongStoreType.onChange() || delay(SERVER_KNOBS->DD_REMOVE_STORE_ENGINE_TIMEOUT));
}
for (auto& server : self->server_info) {
if (!server.second->isCorrectStoreType(self->configuration.storageServerStoreType)) {
if (existOtherHealthyTeams(self, server.first)) {
@@ -2575,32 +2574,32 @@ ACTOR Future<Void> removeWrongStoreType(DDTeamCollection* self) {
NetworkAddress a = server.second->lastKnownInterface.address();
AddressExclusion addr(a.ip, a.port);
TraceEvent("WrongStoreTypeRemover", self->distributorId)
.detail("Server", server.first)
.detail("Addr", addr.toString())
.detail("StoreType", server.second->storeType)
.detail("ConfiguredStoreType", self->configuration.storageServerStoreType);
.detail("Server", server.first)
.detail("Addr", addr.toString())
.detail("StoreType", server.second->storeType)
.detail("ConfiguredStoreType", self->configuration.storageServerStoreType);
break;
} else if (!secondPreferedSSToRemove.isValid()){
} else if (!secondPreferedSSToRemove.isValid()) {
secondPreferedSSToRemove = server.second;
}
}
}
if (!foundSSToRemove && secondPreferedSSToRemove.isValid()) {
// To ensure all wrong storeType SS to be removed, we have to face the fact that health team number will drop to 0;
// This may create more than one SS on a worker, which cause performance issue.
// In a correct operation configuration, this should not happen.
// To ensure all wrong storeType SS to be removed, we have to face the fact that health team number will
// drop to 0; This may create more than one SS on a worker, which cause performance issue. In a correct
// operation configuration, this should not happen.
secondPreferedSSToRemove->wrongStoreTypeToRemove.set(true);
foundSSToRemove = true;
NetworkAddress a = secondPreferedSSToRemove->lastKnownInterface.address();
AddressExclusion addr(a.ip, a.port);
TraceEvent(SevWarnAlways, "WrongStoreTypeRemover", self->distributorId)
.detail("Server", secondPreferedSSToRemove->id)
.detail("Addr", addr.toString())
.detail("StoreType", secondPreferedSSToRemove->storeType)
.detail("ConfiguredStoreType", self->configuration.storageServerStoreType);
.detail("Server", secondPreferedSSToRemove->id)
.detail("Addr", addr.toString())
.detail("StoreType", secondPreferedSSToRemove->storeType)
.detail("ConfiguredStoreType", self->configuration.storageServerStoreType);
}
self->doRemoveWrongStoreType.set(false);
if (!foundSSToRemove) {
break;
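
The hunk above is the heart of the remover: prefer a wrong-storeType storage server whose removal still leaves some healthy team intact, and fall back to an arbitrary wrong-storeType server only when no safe candidate exists. A minimal standalone sketch of that two-tier choice, with a hypothetical Server struct standing in for TCServerInfo and the two predicates the real code consults:

#include <optional>
#include <vector>

struct Server {
    int id;
    bool correctStoreType;     // stands in for isCorrectStoreType(configured storeType)
    bool hasOtherHealthyTeams; // stands in for existOtherHealthyTeams(self, id)
};

// First choice: a wrong-storeType server that some healthy team survives without.
// Fallback: any wrong-storeType server, even if healthy teams temporarily drop to zero.
std::optional<int> pickServerToRemove(const std::vector<Server>& servers) {
    std::optional<int> secondPreferred;
    for (const auto& s : servers) {
        if (s.correctStoreType) continue;
        if (s.hasOtherHealthyTeams) return s.id;      // safe to remove immediately
        if (!secondPreferred) secondPreferred = s.id; // remember a fallback
    }
    return secondPreferred; // empty once every server has the configured storeType
}

int main() {
    std::vector<Server> servers = { { 1, true, true }, { 2, false, false }, { 3, false, true } };
    // Server 3 is preferred: wrong storeType, but other healthy teams remain.
    return pickServerToRemove(servers).value_or(-1) == 3 ? 0 : 1;
}
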
@@ -3262,7 +3261,7 @@ ACTOR Future<Void> serverMetricsPolling( TCServerInfo *server) {
}
// Set the server's storeType
ACTOR Future<Void> keyValueStoreTypeTracker(DDTeamCollection* self, TCServerInfo *server) {
ACTOR Future<Void> keyValueStoreTypeTracker(DDTeamCollection* self, TCServerInfo* server) {
try {
// Update server's storeType, especially when it was created
state KeyValueStoreType type = wait(
@@ -3385,10 +3384,6 @@ ACTOR Future<Void> storageServerFailureTracker(DDTeamCollection* self, TCServerI
self->healthyZone.set(Optional<Key>());
}
}
// if (status->isFailed) {
// self->restartRecruiting.trigger();
// self->server_status.set( interf.id(), *status ); // Update the global server status, so that storageRecruiter can use the updated info for recruiting
// }
TraceEvent("StatusMapChange", self->distributorId)
.detail("ServerID", interf.id())
@@ -3416,7 +3411,7 @@ ACTOR Future<Void> storageServerTracker(
state Future<Void> metricsTracker = serverMetricsPolling( server );
state Future<std::pair<StorageServerInterface, ProcessClass>> interfaceChanged = server->onInterfaceChanged;
state Future<Void> storeTypeTracker = keyValueStoreTypeTracker( self, server );
state Future<Void> storeTypeTracker = keyValueStoreTypeTracker(self, server);
state bool hasWrongDC = !inCorrectDC(self, server);
state int targetTeamNumPerServer = (SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER * (self->configuration.storageTeamSize + 1)) / 2;
@@ -3490,7 +3485,7 @@ ACTOR Future<Void> storageServerTracker(
status.isUndesired = true;
status.isWrongConfiguration = true;
}
if (server->wrongStoreTypeToRemove.get()) { // TODO: merge with the above if (hasWrongDC)
if (server->wrongStoreTypeToRemove.get()) {
TraceEvent(SevWarn, "WrongStoreTypeToRemove", self->distributorId)
.detail("Server", server->id)
.detail("StoreType", "?");
@@ -3642,9 +3637,9 @@ ACTOR Future<Void> storageServerTracker(
interfaceChanged = server->onInterfaceChanged;
// We rely on the old failureTracker being actorCancelled since the old actor now has a pointer to
// an invalid location ?
// What does this mean? Why does the old failureTracker has a pointer to an invalid location?
// MXQ: Will the status's isFailed and isUndesired field be reset at the beginning of loop?!!
// an invalid location ?
// Q: Why does the old failureTracker has a pointer to an invalid location?
// Q: Will the status's isFailed and isUndesired field be reset at the beginning of loop?!
status = ServerStatus( status.isFailed, status.isUndesired, server->lastKnownInterface.locality );
// self->traceTeamCollectionInfo();
@@ -3653,12 +3648,6 @@ ACTOR Future<Void> storageServerTracker(
storeTypeTracker = keyValueStoreTypeTracker(self, server);
hasWrongDC = !inCorrectDC(self, server);
self->restartTeamBuilder.trigger();
// TODO: remove this doRemoveWrongStoreType
self->doRemoveWrongStoreType.set(true);
if (self->wrongStoreTypeRemover.isReady()) {
self->wrongStoreTypeRemover = removeWrongStoreType(self);
self->addActor.send(self->wrongStoreTypeRemover);
}
if(restartRecruiting)
self->restartRecruiting.trigger();
@@ -3671,7 +3660,7 @@ ACTOR Future<Void> storageServerTracker(
.detail("Server", server->id)
.detail("StoreType", server->storeType)
.detail("ConfigStoreType", self->configuration.storageServerStoreType)
.detail("WrongStoreTypeRemoved", server->wrongStoreTypeToRemove.get());
.detail("WrongStoreTypeRemoved", server->wrongStoreTypeToRemove.get());
}
when( wait( server->wakeUpTracker.getFuture() ) ) {
server->wakeUpTracker = Promise<Void>();
@@ -3734,12 +3723,12 @@ int numExistingSSOnAddr(DDTeamCollection* self, const AddressExclusion& addr) {
ACTOR Future<Void> initializeStorage(DDTeamCollection* self, RecruitStorageReply candidateWorker) {
// SOMEDAY: Cluster controller waits for availability, retry quickly if a server's Locality changes
self->recruitingStream.set(self->recruitingStream.get()+1);
self->recruitingStream.set(self->recruitingStream.get() + 1);
const NetworkAddress& netAddr = candidateWorker.worker.address();
AddressExclusion workerAddr(netAddr.ip, netAddr.port);
if (numExistingSSOnAddr(self,workerAddr) <= 2 &&
self->recruitingLocalities.find(candidateWorker.worker.address()) == self->recruitingLocalities.end()) {
AddressExclusion workerAddr(netAddr.ip, netAddr.port);
if (numExistingSSOnAddr(self, workerAddr) <= 2 &&
self->recruitingLocalities.find(candidateWorker.worker.address()) == self->recruitingLocalities.end()) {
// Only allow at most 2 storage servers on an address, because
// too many storage server on the same address (i.e., process) can cause OOM.
// Ask the candidateWorker to initialize a SS only if the worker does not have a pending request
@@ -3751,38 +3740,41 @@ ACTOR Future<Void> initializeStorage(DDTeamCollection* self, RecruitStorageReply
isr.interfaceId = interfaceId;
TraceEvent("DDRecruiting")
.detail("Primary", self->primary)
.detail("State", "Sending request to worker")
.detail("WorkerID", candidateWorker.worker.id())
.detail("WorkerLocality", candidateWorker.worker.locality.toString())
.detail("Interf", interfaceId)
.detail("Addr", candidateWorker.worker.address())
.detail("RecruitingStream", self->recruitingStream.get());
.detail("Primary", self->primary)
.detail("State", "Sending request to worker")
.detail("WorkerID", candidateWorker.worker.id())
.detail("WorkerLocality", candidateWorker.worker.locality.toString())
.detail("Interf", interfaceId)
.detail("Addr", candidateWorker.worker.address())
.detail("RecruitingStream", self->recruitingStream.get());
self->recruitingIds.insert(interfaceId);
self->recruitingLocalities.insert(candidateWorker.worker.address());
state ErrorOr<InitializeStorageReply> newServer = wait( candidateWorker.worker.storage.tryGetReply( isr, TaskPriority::DataDistribution ) );
if(newServer.isError()) {
state ErrorOr<InitializeStorageReply> newServer =
wait(candidateWorker.worker.storage.tryGetReply(isr, TaskPriority::DataDistribution));
if (newServer.isError()) {
TraceEvent(SevWarn, "DDRecruitmentError").error(newServer.getError());
if( !newServer.isError( error_code_recruitment_failed ) && !newServer.isError( error_code_request_maybe_delivered ) )
if (!newServer.isError(error_code_recruitment_failed) &&
!newServer.isError(error_code_request_maybe_delivered))
throw newServer.getError();
wait( delay(SERVER_KNOBS->STORAGE_RECRUITMENT_DELAY, TaskPriority::DataDistribution) );
wait(delay(SERVER_KNOBS->STORAGE_RECRUITMENT_DELAY, TaskPriority::DataDistribution));
}
self->recruitingIds.erase(interfaceId);
self->recruitingLocalities.erase(candidateWorker.worker.address());
TraceEvent("DDRecruiting")
.detail("Primary", self->primary)
.detail("State", "Finished request")
.detail("WorkerID", candidateWorker.worker.id())
.detail("WorkerLocality", candidateWorker.worker.locality.toString())
.detail("Interf", interfaceId)
.detail("Addr", candidateWorker.worker.address())
.detail("RecruitingStream", self->recruitingStream.get());
.detail("Primary", self->primary)
.detail("State", "Finished request")
.detail("WorkerID", candidateWorker.worker.id())
.detail("WorkerLocality", candidateWorker.worker.locality.toString())
.detail("Interf", interfaceId)
.detail("Addr", candidateWorker.worker.address())
.detail("RecruitingStream", self->recruitingStream.get());
if( newServer.present() ) {
if( !self->server_info.count( newServer.get().interf.id() ) )
self->addServer( newServer.get().interf, candidateWorker.processClass, self->serverTrackerErrorOut, newServer.get().addedVersion );
if (newServer.present()) {
if (!self->server_info.count(newServer.get().interf.id()))
self->addServer(newServer.get().interf, candidateWorker.processClass, self->serverTrackerErrorOut,
newServer.get().addedVersion);
else
TraceEvent(SevWarn, "DDRecruitmentError").detail("Reason", "Server ID already recruited");
@@ -3790,7 +3782,7 @@ ACTOR Future<Void> initializeStorage(DDTeamCollection* self, RecruitStorageReply
}
}
self->recruitingStream.set(self->recruitingStream.get()-1);
self->recruitingStream.set(self->recruitingStream.get() - 1);
self->restartRecruiting.trigger();
return Void();
@@ -3893,7 +3885,7 @@ ACTOR Future<Void> storageRecruiter( DDTeamCollection* self, Reference<AsyncVar<
.detail("State", "Restart recruiting");
}
}
wait( delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY) ); //Q: What if restartRecruiting is trigger while recruiter is waiting on the delay?
wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY));
} catch( Error &e ) {
if(e.code() != error_code_timed_out) {
TraceEvent("StorageRecruiterMXExit", self->distributorId)


@@ -196,8 +196,8 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
init( TR_REMOVE_SERVER_TEAM_DELAY, 60.0 ); if( randomize && BUGGIFY ) TR_REMOVE_SERVER_TEAM_DELAY = deterministicRandom()->random01() * 60.0;
init( TR_REMOVE_SERVER_TEAM_EXTRA_DELAY, 5.0 ); if( randomize && BUGGIFY ) TR_REMOVE_SERVER_TEAM_EXTRA_DELAY = deterministicRandom()->random01() * 10.0;
init( DD_REMOVE_STORE_ENGINE_TIMEOUT, 60.0 ); if( randomize && BUGGIFY ) DD_REMOVE_STORE_ENGINE_TIMEOUT = deterministicRandom()->random01() * 60.0;
init( DD_REMOVE_STORE_ENGINE_DELAY, 60.0 ); if( randomize && BUGGIFY ) DD_REMOVE_STORE_ENGINE_DELAY = deterministicRandom()->random01() * 60.0;
init( DD_REMOVE_STORE_ENGINE_TIMEOUT, 120.0 ); if( randomize && BUGGIFY ) DD_REMOVE_STORE_ENGINE_TIMEOUT = deterministicRandom()->random01() * 120.0;
init( DD_REMOVE_STORE_ENGINE_DELAY, 60.0 ); if( randomize && BUGGIFY ) DD_REMOVE_STORE_ENGINE_DELAY = deterministicRandom()->random01() * 60.0;
// Redwood Storage Engine
init( PREFIX_TREE_IMMEDIATE_KEY_SIZE_LIMIT, 30 );
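
The one value change in this file is just above: DD_REMOVE_STORE_ENGINE_TIMEOUT doubles from 60.0 to 120.0 seconds, and its BUGGIFY randomization range doubles to match. Under randomization the knob is drawn uniformly from [0, 120); a standalone sketch of that draw, with std::mt19937 standing in for FDB's deterministicRandom():

#include <cstdio>
#include <random>

int main() {
    // deterministicRandom()->random01() yields a uniform double in [0, 1);
    // the knob initializer scales it by the new upper bound, 120.0.
    std::mt19937 rng(42); // the simulator seeds its PRNG deterministically
    std::uniform_real_distribution<double> random01(0.0, 1.0);
    double timeout = random01(rng) * 120.0; // uniform in [0, 120)
    std::printf("DD_REMOVE_STORE_ENGINE_TIMEOUT = %.2f\n", timeout);
    return 0;
}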


@@ -260,8 +260,8 @@ ACTOR Future<vector<vector<UID>>> additionalSources(Standalone<RangeResultRef> s
// keyServer: map from keys to destination servers
// serverKeys: two-dimension map: [servers][keys], value is the servers' state of having the keys: active(not-have),
// complete(already has), ""() MXQ: What does serverKeys[dest][keys] mean? It seems having the same meaning with
// serverKeys[servers][keys]?
// complete(already has), ""()
// MXQ: What does serverKeys[dest][keys] mean? It seems having the same meaning with serverKeys[servers][keys]? (I think so.)
// Set keyServers[keys].dest = servers
// Set serverKeys[servers][keys] = active for each subrange of keys that the server did not already have, complete for each subrange that it already has
@@ -313,9 +313,9 @@ ACTOR Future<Void> startMoveKeys( Database occ, KeyRange keys, vector<UID> serve
for(int s=0; s<serverListValues.size(); s++) {
if (!serverListValues[s].present()) {
// MXQ: Maybe a bug exists somewhere, causing this to happen
// Attempt to move onto a server that isn't in serverList (removed or never added to the
// database) This can happen (why?) and is handled by the data distribution algorithm
// FIXME: Answer why this can happen?
TEST(true); //start move keys moving to a removed server
throw move_to_removed_server();
}
@@ -329,9 +329,9 @@ ACTOR Future<Void> startMoveKeys( Database occ, KeyRange keys, vector<UID> serve
state Key endKey = old.end()[-1].key;
currentKeys = KeyRangeRef(currentKeys.begin, endKey);
TraceEvent("StartMoveKeysBatch", relocationIntervalId)
.detail("KeyBegin", currentKeys.begin.toString())
.detail("KeyEnd", currentKeys.end.toString());
// TraceEvent("StartMoveKeysBatch", relocationIntervalId)
// .detail("KeyBegin", currentKeys.begin.toString())
// .detail("KeyEnd", currentKeys.end.toString());
// printf("Moving '%s'-'%s' (%d) to %d servers\n", keys.begin.toString().c_str(),
// keys.end.toString().c_str(), old.size(), servers.size()); for(int i=0; i<old.size(); i++)
@@ -347,12 +347,12 @@ ACTOR Future<Void> startMoveKeys( Database occ, KeyRange keys, vector<UID> serve
vector<UID> dest;
decodeKeyServersValue( old[i].value, src, dest );
TraceEvent("StartMoveKeysOldRange", relocationIntervalId)
.detail("KeyBegin", rangeIntersectKeys.begin.toString())
.detail("KeyEnd", rangeIntersectKeys.end.toString())
.detail("OldSrc", describe(src))
.detail("OldDest", describe(dest))
.detail("ReadVersion", tr.getReadVersion().get());
// TraceEvent("StartMoveKeysOldRange", relocationIntervalId)
// .detail("KeyBegin", rangeIntersectKeys.begin.toString())
// .detail("KeyEnd", rangeIntersectKeys.end.toString())
// .detail("OldSrc", describe(src))
// .detail("OldDest", describe(dest))
// .detail("ReadVersion", tr.getReadVersion().get());
for(auto& uid : addAsSource[i]) {
src.push_back(uid);
@@ -365,13 +365,13 @@ ACTOR Future<Void> startMoveKeys( Database occ, KeyRange keys, vector<UID> serve
//Track old destination servers. They may be removed from serverKeys soon, since they are about to be overwritten in keyServers
for(auto s = dest.begin(); s != dest.end(); ++s) {
oldDests.insert(*s);
TraceEvent("StartMoveKeysOldDestAdd", relocationIntervalId).detail("Server", *s);
// TraceEvent("StartMoveKeysOldDestAdd", relocationIntervalId).detail("Server", *s);
}
//Keep track of src shards so that we can preserve their values when we overwrite serverKeys
for(auto& uid : src) {
shardMap[uid].push_back(old.arena(), rangeIntersectKeys);
TraceEvent("StartMoveKeysShardMapAdd", relocationIntervalId).detail("Server", uid);
// TraceEvent("StartMoveKeysShardMapAdd", relocationIntervalId).detail("Server", uid);
}
}
@@ -835,16 +835,6 @@ ACTOR Future<bool> canRemoveStorageServer( Transaction* tr, UID serverID ) {
ASSERT(false);
}
// DEBUG purpose
// if (!(keys[0].value == serverKeysFalse && keys[1].key == allKeys.end)) {
// Standalone<RangeResultRef> allKeys =
// wait(krmGetRanges(tr, serverKeysPrefixFor(serverID), allKeys, CLIENT_KNOBS->TOO_MANY));
// TraceEvent("CanNOTRemove").detail("KeysNum", allKeys.size());
// for (auto& k : allKeys) {
// TraceEvent("CanNOTRemove").detail("Key", k.key).detail("Value", k.value);
// }
// }
//Return true if the entire range is false. Since these values are coalesced, we can return false if there is more than one result
return keys[0].value == serverKeysFalse && keys[1].key == allKeys.end;
}
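
The retained return statement leans on a property worth spelling out: krmGetRanges returns coalesced (key, value) boundaries, so reading just two results is enough. If the first range's value is serverKeysFalse and the second boundary key is already allKeys.end, the server owns no keys anywhere. A toy model of that invariant, with a hypothetical Boundary struct and the literal "false" standing in for serverKeysFalse:

#include <cassert>
#include <string>
#include <vector>

// A boundary's value applies from its key up to the next boundary's key;
// adjacent ranges with equal values are coalesced into one.
struct Boundary {
    std::string key, value;
};

// Mirrors: keys[0].value == serverKeysFalse && keys[1].key == allKeys.end
bool ownsNothing(const std::vector<Boundary>& keys, const std::string& endOfKeyspace) {
    return keys[0].value == "false" && keys[1].key == endOfKeyspace;
}

int main() {
    // One coalesced "false" range covering the whole keyspace: safe to remove.
    assert((ownsNothing({ { "", "false" }, { "\xff", "" } }, "\xff")));
    // A second boundary before the end means the server still owns something.
    assert((!ownsNothing({ { "", "false" }, { "a", "true" } }, "\xff")));
    return 0;
}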


@@ -350,10 +350,10 @@ struct StorageServerMetrics {
if (sb.free < 1e9 && deterministicRandom()->random01() < 0.1) {
TraceEvent(SevWarn, "PhysicalDiskMetrics")
.detail("Free", sb.free)
.detail("Total", sb.total)
.detail("Available", sb.available)
.detail("Load", rep.load.bytes);
.detail("Free", sb.free)
.detail("Total", sb.total)
.detail("Available", sb.available)
.detail("Load", rep.load.bytes);
}
rep.free.bytes = sb.free;