Merge pull request #907 from etschannen/release-6.0

Bug fixes related to changing region configurations
Alex Miller 2018-11-12 18:43:22 -08:00 committed by GitHub
commit 5b149376be
13 changed files with 223 additions and 131 deletions

View File

@ -1610,12 +1610,17 @@ ACTOR Future<bool> configure( Database db, std::vector<StringRef> tokens, Refere
ret=false;
break;
case ConfigurationResult::REGION_NOT_FULLY_REPLICATED:
printf("ERROR: When usable_regions=2, all regions with priority >= 0 must be fully replicated before changing the configuration\n");
printf("ERROR: When usable_regions > 1, all regions with priority >= 0 must be fully replicated before changing the configuration\n");
printf("Type `configure FORCE <TOKEN>*' to configure without this check\n");
ret=false;
break;
case ConfigurationResult::MULTIPLE_ACTIVE_REGIONS:
printf("ERROR: When changing from usable_regions=1 to usable_regions=2, only one region can have priority >= 0\n");
printf("ERROR: When changing usable_regions, only one region can have priority >= 0\n");
printf("Type `configure FORCE <TOKEN>*' to configure without this check\n");
ret=false;
break;
case ConfigurationResult::REGIONS_CHANGED:
printf("ERROR: The region configuration cannot be changed while simultaneously changing usable_regions\n");
printf("Type `configure FORCE <TOKEN>*' to configure without this check\n");
ret=false;
break;
@ -1714,15 +1719,20 @@ ACTOR Future<bool> fileConfigure(Database db, std::string filePath, bool isNewDa
ret=false;
break;
case ConfigurationResult::REGION_NOT_FULLY_REPLICATED:
printf("ERROR: When usable_regions=2, all regions with priority >= 0 must be fully replicated before changing the configuration\n");
printf("ERROR: When usable_regions > 1, All regions with priority >= 0 must be fully replicated before changing the configuration\n");
printf("Type `fileconfigure FORCE <FILENAME>' to configure without this check\n");
ret=false;
break;
case ConfigurationResult::MULTIPLE_ACTIVE_REGIONS:
printf("ERROR: When changing from usable_regions=1 to usable_regions=2, only one region can have priority >= 0\n");
printf("ERROR: When changing usable_regions, only one region can have priority >= 0\n");
printf("Type `fileconfigure FORCE <FILENAME>' to configure without this check\n");
ret=false;
break;
case ConfigurationResult::REGIONS_CHANGED:
printf("ERROR: The region configuration cannot be changed while simultaneously changing usable_regions\n");
printf("Type `fileconfigure FORCE <TOKEN>*' to configure without this check\n");
ret=false;
break;
case ConfigurationResult::SUCCESS:
printf("Configuration changed\n");
ret=false;

View File

@ -311,7 +311,18 @@ ACTOR Future<ConfigurationResult::Type> changeConfig( Database cx, std::map<std:
return ConfigurationResult::INVALID_CONFIGURATION;
}
if(oldConfig.usableRegions==1 && newConfig.usableRegions==2) {
if(oldConfig.usableRegions != newConfig.usableRegions) {
//cannot change region configuration
std::map<Key,int32_t> dcId_priority;
for(auto& it : newConfig.regions) {
dcId_priority[it.dcId] = it.priority;
}
for(auto& it : oldConfig.regions) {
if(!dcId_priority.count(it.dcId) || dcId_priority[it.dcId] != it.priority) {
return ConfigurationResult::REGIONS_CHANGED;
}
}
//must only have one region with priority >= 0
int activeRegionCount = 0;
for(auto& it : newConfig.regions) {

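For clarity, the check introduced above can be read in isolation. The following is a minimal, self-contained sketch of the same validation using plain standard-library types; RegionSketch and CheckResult are illustrative stand-ins, not the RegionInfo/ConfigurationResult types used by the commit.

#include <map>
#include <string>
#include <vector>

struct RegionSketch { std::string dcId; int priority; };              // stand-in for RegionInfo
enum class CheckResult { OK, REGIONS_CHANGED, MULTIPLE_ACTIVE_REGIONS };

// When usable_regions is being changed, the region list itself must stay fixed
// (same datacenters, same priorities) and only one region may be active (priority >= 0).
CheckResult checkUsableRegionsChange( const std::vector<RegionSketch>& oldRegions,
                                      const std::vector<RegionSketch>& newRegions ) {
	std::map<std::string, int> dcId_priority;
	for(auto& r : newRegions) {
		dcId_priority[r.dcId] = r.priority;
	}
	for(auto& r : oldRegions) {
		if(!dcId_priority.count(r.dcId) || dcId_priority[r.dcId] != r.priority) {
			return CheckResult::REGIONS_CHANGED;                      // region configuration changed as well
		}
	}
	int activeRegionCount = 0;
	for(auto& r : newRegions) {
		if(r.priority >= 0) {
			activeRegionCount++;
		}
	}
	return activeRegionCount > 1 ? CheckResult::MULTIPLE_ACTIVE_REGIONS : CheckResult::OK;
}

As in the diff, this check is only reached when oldConfig.usableRegions differs from newConfig.usableRegions, and `configure FORCE` bypasses it.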
View File

@ -53,6 +53,7 @@ public:
STORAGE_IN_UNKNOWN_DCID,
REGION_NOT_FULLY_REPLICATED,
MULTIPLE_ACTIVE_REGIONS,
REGIONS_CHANGED,
SUCCESS
};
};

View File

@ -258,6 +258,29 @@ int decodeDatacenterReplicasValue( ValueRef const& value ) {
return s;
}
// "\xff\x02/tLogDatacenters/[[datacenterID]]"
const KeyRangeRef tLogDatacentersKeys(
LiteralStringRef("\xff\x02/tLogDatacenters/"),
LiteralStringRef("\xff\x02/tLogDatacenters0") );
const KeyRef tLogDatacentersPrefix = tLogDatacentersKeys.begin;
const Key tLogDatacentersKeyFor( Optional<Value> dcID ) {
BinaryWriter wr(AssumeVersion(currentProtocolVersion));
wr.serializeBytes( tLogDatacentersKeys.begin );
wr << dcID;
return wr.toStringRef();
}
Optional<Value> decodeTLogDatacentersKey( KeyRef const& key ) {
Optional<Value> dcID;
BinaryReader rd( key.removePrefix(tLogDatacentersKeys.begin), AssumeVersion(currentProtocolVersion) );
rd >> dcID;
return dcID;
}
const KeyRef primaryDatacenterKey = LiteralStringRef("\xff/primaryDatacenter");
// serverListKeys.contains(k) iff k.startsWith( serverListKeys.begin ) because '/'+1 == '0'

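As a quick sanity check of the new system key range, the helpers above should round-trip a datacenter ID. This is a hypothetical test snippet, not part of the commit; it assumes flow's ASSERT and the usual Optional comparison semantics.

void tLogDatacentersKeyRoundTrip() {
	Optional<Value> dcId = Value(LiteralStringRef("dc1"));       // example datacenter ID
	Key k = tLogDatacentersKeyFor(dcId);
	ASSERT( tLogDatacentersKeys.contains(k) );                   // key lands under "\xff\x02/tLogDatacenters/"
	ASSERT( decodeTLogDatacentersKey(k) == dcId );               // BinaryReader inverts the BinaryWriter encoding
}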
View File

@ -78,7 +78,7 @@ const Value tagLocalityListValue( int8_t const& );
Optional<Value> decodeTagLocalityListKey( KeyRef const& );
int8_t decodeTagLocalityListValue( ValueRef const& );
// "\xff\x02/DatacenterReplicas/[[datacenterID]]" := "[[replicas]]"
// "\xff\x02/datacenterReplicas/[[datacenterID]]" := "[[replicas]]"
extern const KeyRangeRef datacenterReplicasKeys;
extern const KeyRef datacenterReplicasPrefix;
const Key datacenterReplicasKeyFor( Optional<Value> dcID );
@ -86,6 +86,12 @@ const Value datacenterReplicasValue( int const& );
Optional<Value> decodeDatacenterReplicasKey( KeyRef const& );
int decodeDatacenterReplicasValue( ValueRef const& );
// "\xff\x02/tLogDatacenters/[[datacenterID]]"
extern const KeyRangeRef tLogDatacentersKeys;
extern const KeyRef tLogDatacentersPrefix;
const Key tLogDatacentersKeyFor( Optional<Value> dcID );
Optional<Value> decodeTLogDatacentersKey( KeyRef const& );
extern const KeyRef primaryDatacenterKey;
// "\xff/serverList/[[serverID]]" := "[[StorageServerInterface]]"

View File

@ -50,6 +50,7 @@ struct TCServerInfo : public ReferenceCounted<TCServerInfo> {
Promise<Void> wakeUpTracker;
bool inDesiredDC;
LocalityEntry localityEntry;
Promise<Void> updated;
TCServerInfo(StorageServerInterface ssi, ProcessClass processClass, bool inDesiredDC, Reference<LocalitySet> storageServerSet) : id(ssi.id()), lastKnownInterface(ssi), lastKnownClass(processClass), dataInFlightToServer(0), onInterfaceChanged(interfaceChanged.getFuture()), onRemoved(removed.getFuture()), inDesiredDC(inDesiredDC) {
localityEntry = ((LocalityMap<UID>*) storageServerSet.getPtr())->add(ssi.locality, &id);
@ -68,6 +69,9 @@ ACTOR Future<Void> updateServerMetrics( TCServerInfo *server ) {
when( ErrorOr<GetPhysicalMetricsReply> rep = wait( metricsRequest ) ) {
if( rep.present() ) {
server->serverMetrics = rep;
if(server->updated.canBeSet()) {
server->updated.send(Void());
}
return Void();
}
metricsRequest = Never();
@ -1466,43 +1470,42 @@ ACTOR Future<Void> teamTracker( DDTeamCollection* self, Reference<TCTeamInfo> te
for(int i=0; i<shards.size(); i++) {
int maxPriority = team->getPriority();
int maxOtherPriority = maxPriority;
if(maxPriority < PRIORITY_TEAM_0_LEFT) {
auto teams = self->shardsAffectedByTeamFailure->getTeamsFor( shards[i] );
for( int t=0; t<teams.size(); t++) {
if( teams[t].servers.size() ) {
bool foundServer = false;
for(auto tc : self->teamCollections) {
if( tc->server_info.count( teams[t].servers[0] ) ) {
ASSERT(!foundServer);
foundServer = true;
auto& info = tc->server_info[teams[t].servers[0]];
for( int j=0; j < teams.first.size()+teams.second.size(); j++) {
auto& t = j < teams.first.size() ? teams.first[j] : teams.second[j-teams.first.size()];
if( !t.servers.size() ) {
maxPriority = PRIORITY_TEAM_0_LEFT;
break;
}
auto tc = self->teamCollections[t.primary ? 0 : 1];
ASSERT(tc->primary == t.primary);
if( tc->server_info.count( t.servers[0] ) ) {
auto& info = tc->server_info[t.servers[0]];
bool found = false;
for( int i = 0; i < info->teams.size(); i++ ) {
if( info->teams[i]->serverIDs == teams[t].servers ) {
if(teams[t].primary == self->primary) {
maxPriority = std::max( maxPriority, info->teams[i]->getPriority() );
} else {
maxOtherPriority = std::max( maxOtherPriority, info->teams[i]->getPriority() );
}
for( int k = 0; k < info->teams.size(); k++ ) {
if( info->teams[k]->serverIDs == t.servers ) {
maxPriority = std::max( maxPriority, info->teams[k]->getPriority() );
found = true;
break;
}
}
TEST(!found); // A removed team is still associated with a shard in SABTF
//If we cannot find the team, it could be a bad team so assume unhealthy priority
if(!found) {
maxPriority = std::max<int>( maxPriority, PRIORITY_TEAM_UNHEALTHY );
}
}
TEST(!foundServer); // A removed server is still associated with a team in SABTF
} else {
TEST(true); // A removed server is still associated with a team in SABTF
}
}
}
if( maxPriority == team->getPriority() || lastPriority > maxPriority ) {
RelocateShard rs;
rs.keys = shards[i];
rs.priority = std::max(maxPriority,maxOtherPriority);
rs.priority = maxPriority;
self->output.send(rs);
if(g_random->random01() < 0.01) {
@ -1514,10 +1517,6 @@ ACTOR Future<Void> teamTracker( DDTeamCollection* self, Reference<TCTeamInfo> te
.detail("TeamFailedMachines", team->getServerIDs().size()-serversLeft)
.detail("TeamOKMachines", serversLeft);
}
} else {
TraceEvent("RelocationNotSentToDDQ", self->masterId)
.detail("Team", team->getDesc());
}
}
} else {
if(logTeamEvents) {
@ -1706,7 +1705,7 @@ ACTOR Future<Void> removeBadTeams(DDTeamCollection* self) {
}
}
Void _ = wait(self->addSubsetComplete.getFuture());
TraceEvent("DDRemovingBadTeams", self->masterId);
TraceEvent("DDRemovingBadTeams", self->masterId).detail("Primary", self->primary);
for(auto it : self->badTeams) {
it->tracker.cancel();
}
@ -1879,6 +1878,10 @@ ACTOR Future<Void> storageServerTracker(
changes.get().send( std::make_pair(server->id, Optional<StorageServerInterface>()) );
}
if(server->updated.canBeSet()) {
server->updated.send(Void());
}
// Remove server from FF/serverList
Void _ = wait( removeStorageServer( cx, server->id, lock ) );
@ -2101,7 +2104,13 @@ ACTOR Future<Void> storageRecruiter( DDTeamCollection* self, Reference<AsyncVar<
}
ACTOR Future<Void> updateReplicasKey(DDTeamCollection* self, Optional<Key> dcId) {
Void _ = wait(self->initialFailureReactionDelay);
std::vector<Future<Void>> serverUpdates;
for(auto& it : self->server_info) {
serverUpdates.push_back(it.second->updated.getFuture());
}
Void _ = wait(self->initialFailureReactionDelay && waitForAll(serverUpdates));
loop {
while(self->zeroHealthyTeams->get() || self->processingUnhealthy->get()) {
TraceEvent("DDUpdatingStalled", self->masterId).detail("DcId", printable(dcId)).detail("ZeroHealthy", self->zeroHealthyTeams->get()).detail("ProcessingUnhealthy", self->processingUnhealthy->get());
@ -2145,7 +2154,6 @@ ACTOR Future<Void> serverGetTeamRequests(TeamCollectionInterface tci, DDTeamColl
// Keep track of servers and teams -- serves requests for getRandomTeam
ACTOR Future<Void> dataDistributionTeamCollection(
Reference<DDTeamCollection> teamCollection,
std::vector<DDTeamCollection*> teamCollections,
Reference<InitialDataDistribution> initData,
TeamCollectionInterface tci,
Reference<AsyncVar<struct ServerDBInfo>> db)
@ -2155,8 +2163,6 @@ ACTOR Future<Void> dataDistributionTeamCollection(
state PromiseStream<Void> serverRemoved;
state Future<Void> error = actorCollection( self->addActor.getFuture() );
self->teamCollections = teamCollections;
try {
Void _ = wait( DDTeamCollection::init( self, initData ) );
initData = Reference<InitialDataDistribution>();
@ -2166,19 +2172,21 @@ ACTOR Future<Void> dataDistributionTeamCollection(
Void _ = wait( self->readyToStart || error );
TraceEvent("DDTeamCollectionReadyToStart", self->masterId).detail("Primary", self->primary);
self->addActor.send(storageRecruiter( self, db ));
self->addActor.send(monitorStorageServerRecruitment( self ));
self->addActor.send(waitServerListChange( self, serverRemoved.getFuture() ));
self->addActor.send(trackExcludedServers( self ));
if(self->badTeamRemover.isReady()) {
self->badTeamRemover = removeBadTeams(self);
self->addActor.send(self->badTeamRemover);
}
if(self->includedDCs.size()) {
//start this actor before any potential recruitments can happen
self->addActor.send(updateReplicasKey(self, self->includedDCs[0]));
}
self->addActor.send(storageRecruiter( self, db ));
self->addActor.send(monitorStorageServerRecruitment( self ));
self->addActor.send(waitServerListChange( self, serverRemoved.getFuture() ));
self->addActor.send(trackExcludedServers( self ));
// SOMEDAY: Monitor FF/serverList for (new) servers that aren't in allServers and add or remove them
loop choose {
@ -2415,6 +2423,11 @@ ACTOR Future<Void> dataDistribution(
if(configuration.usableRegions > 1) {
teams.push_back(ShardsAffectedByTeamFailure::Team(initData->shards[shard].remoteSrc, false));
}
if(g_network->isSimulated()) {
TraceEvent("DDInitShard").detail("Keys", printable(keys)).detail("PrimarySrc", describe(initData->shards[shard].primarySrc)).detail("RemoteSrc", describe(initData->shards[shard].remoteSrc))
.detail("PrimaryDest", describe(initData->shards[shard].primaryDest)).detail("RemoteDest", describe(initData->shards[shard].remoteDest));
}
shardsAffectedByTeamFailure->moveShard(keys, teams);
if(initData->shards[shard].hasDest) {
// This shard is already in flight. Ideally we should use dest in sABTF and generate a dataDistributionRelocator directly in
@ -2452,21 +2465,17 @@ ACTOR Future<Void> dataDistribution(
actors.push_back( reportErrorsExcept( dataDistributionTracker( initData, cx, output, shardsAffectedByTeamFailure, getShardMetrics, getAverageShardBytes.getFuture(), readyToStart, anyZeroHealthyTeams, mi.id() ), "DDTracker", mi.id(), &normalDDQueueErrors() ) );
actors.push_back( reportErrorsExcept( dataDistributionQueue( cx, output, input.getFuture(), getShardMetrics, processingUnhealthy, tcis, shardsAffectedByTeamFailure, lock, getAverageShardBytes, mi, storageTeamSize, lastLimited, recoveryCommitVersion ), "DDQueue", mi.id(), &normalDDQueueErrors() ) );
vector<Reference<DDTeamCollection>> teamCollections;
teamCollections.push_back( Reference<DDTeamCollection>( new DDTeamCollection(cx, mi.id(), lock, output, shardsAffectedByTeamFailure, configuration, primaryDcId, configuration.usableRegions > 1 ? remoteDcIds : std::vector<Optional<Key>>(), serverChanges, readyToStart.getFuture(), zeroHealthyTeams[0], true, processingUnhealthy) ) );
if (configuration.usableRegions > 1) {
teamCollections.push_back( Reference<DDTeamCollection>( new DDTeamCollection(cx, mi.id(), lock, output, shardsAffectedByTeamFailure, configuration, remoteDcIds, Optional<std::vector<Optional<Key>>>(), serverChanges, readyToStart.getFuture() && remoteRecovered, zeroHealthyTeams[1], false, processingUnhealthy) ) );
}
vector<DDTeamCollection*> teamCollectionsPtrs;
for(auto it : teamCollections) {
teamCollectionsPtrs.push_back(it.getPtr());
}
actors.push_back( reportErrorsExcept( dataDistributionTeamCollection( teamCollections[0], teamCollectionsPtrs, initData, tcis[0], db ), "DDTeamCollectionPrimary", mi.id(), &normalDDQueueErrors() ) );
Reference<DDTeamCollection> primaryTeamCollection( new DDTeamCollection(cx, mi.id(), lock, output, shardsAffectedByTeamFailure, configuration, primaryDcId, configuration.usableRegions > 1 ? remoteDcIds : std::vector<Optional<Key>>(), serverChanges, readyToStart.getFuture(), zeroHealthyTeams[0], true, processingUnhealthy) );
teamCollectionsPtrs.push_back(primaryTeamCollection.getPtr());
if (configuration.usableRegions > 1) {
actors.push_back( reportErrorsExcept( dataDistributionTeamCollection( teamCollections[1], teamCollectionsPtrs, initData, tcis[1], db ), "DDTeamCollectionSecondary", mi.id(), &normalDDQueueErrors() ) );
Reference<DDTeamCollection> remoteTeamCollection( new DDTeamCollection(cx, mi.id(), lock, output, shardsAffectedByTeamFailure, configuration, remoteDcIds, Optional<std::vector<Optional<Key>>>(), serverChanges, readyToStart.getFuture() && remoteRecovered, zeroHealthyTeams[1], false, processingUnhealthy) );
teamCollectionsPtrs.push_back(remoteTeamCollection.getPtr());
remoteTeamCollection->teamCollections = teamCollectionsPtrs;
actors.push_back( reportErrorsExcept( dataDistributionTeamCollection( remoteTeamCollection, initData, tcis[1], db ), "DDTeamCollectionSecondary", mi.id(), &normalDDQueueErrors() ) );
}
primaryTeamCollection->teamCollections = teamCollectionsPtrs;
actors.push_back( reportErrorsExcept( dataDistributionTeamCollection( primaryTeamCollection, initData, tcis[0], db ), "DDTeamCollectionPrimary", mi.id(), &normalDDQueueErrors() ) );
actors.push_back(yieldPromiseStream(output.getFuture(), input));
Void _ = wait( waitForAll( actors ) );

View File

@ -154,9 +154,14 @@ public:
int getNumberOfShards( UID ssID );
vector<KeyRange> getShardsFor( Team team );
vector<Team> getTeamsFor( KeyRangeRef keys );
//The first element of the pair is either the source for non-moving shards or the destination team for in-flight shards
//The second element of the pair is all previous sources for in-flight shards
std::pair<vector<Team>,vector<Team>> getTeamsFor( KeyRangeRef keys );
void defineShard( KeyRangeRef keys );
void moveShard( KeyRangeRef keys, std::vector<Team> destinationTeam );
void finishMove( KeyRangeRef keys );
void check();
private:
struct OrderByTeamKey {
@ -167,7 +172,7 @@ private:
}
};
KeyRangeMap< vector<Team> > shard_teams; // A shard can be affected by the failure of multiple teams if it is a queued merge
KeyRangeMap< std::pair<vector<Team>,vector<Team>> > shard_teams; // A shard can be affected by the failure of multiple teams if it is a queued merge, or when usable_regions > 1
std::set< std::pair<Team,KeyRange>, OrderByTeamKey > team_shards;
std::map< UID, int > storageServerShards;

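A hedged usage sketch of the new pair-valued bookkeeping, assuming a KeyRange that covers exactly one shard and Team values constructed elsewhere (this walkthrough is illustrative, not code from the commit):

void shardMoveLifecycle( ShardsAffectedByTeamFailure* sabtf, KeyRangeRef keys,
                         ShardsAffectedByTeamFailure::Team srcTeam,
                         ShardsAffectedByTeamFailure::Team destTeam ) {
	sabtf->defineShard( keys );                           // shard starts with no teams
	sabtf->moveShard( keys, { srcTeam } );                // first = {srcTeam}, second = {}
	sabtf->moveShard( keys, { destTeam } );               // first = {destTeam}, srcTeam moves into second
	auto teams = sabtf->getTeamsFor( keys );
	ASSERT( teams.second.size() == 1 );                   // previous source is remembered while the move is in flight
	sabtf->finishMove( keys );                            // relocation for this shard is done
	ASSERT( sabtf->getTeamsFor( keys ).second.empty() );  // previous sources are forgotten
}

This matches the finishMove() call added to dataDistributionRelocator below: once a relocation completes, the old source teams no longer count toward the shard's failure priority.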
View File

@ -1030,6 +1030,7 @@ ACTOR Future<Void> dataDistributionRelocator( DDQueueData *self, RelocateData rd
}
self->bytesWritten += metrics.bytes;
self->shardsAffectedByTeamFailure->finishMove(rd.keys);
relocationComplete.send( rd );
return Void();
} else {

View File

@ -701,7 +701,7 @@ int ShardsAffectedByTeamFailure::getNumberOfShards( UID ssID ) {
return storageServerShards[ssID];
}
vector<ShardsAffectedByTeamFailure::Team> ShardsAffectedByTeamFailure::getTeamsFor( KeyRangeRef keys ) {
std::pair<vector<ShardsAffectedByTeamFailure::Team>,vector<ShardsAffectedByTeamFailure::Team>> ShardsAffectedByTeamFailure::getTeamsFor( KeyRangeRef keys ) {
return shard_teams[keys.begin];
}
@ -720,14 +720,20 @@ void ShardsAffectedByTeamFailure::insert(Team team, KeyRange const& range) {
}
void ShardsAffectedByTeamFailure::defineShard( KeyRangeRef keys ) {
std::set<Team> teams;
std::vector<Team> teams;
std::vector<Team> prevTeams;
auto rs = shard_teams.intersectingRanges(keys);
for(auto it = rs.begin(); it != rs.end(); ++it) {
for(auto t=it->value().begin(); t!=it->value().end(); ++t) {
teams.insert( *t );
for(auto t=it->value().first.begin(); t!=it->value().first.end(); ++t) {
teams.push_back( *t );
erase(*t, it->range());
}
for(auto t=it->value().second.begin(); t!=it->value().second.end(); ++t) {
prevTeams.push_back( *t );
}
}
uniquify(teams);
uniquify(prevTeams);
/*TraceEvent("ShardsAffectedByTeamFailureDefine")
.detail("KeyBegin", printable(keys.begin))
@ -735,12 +741,12 @@ void ShardsAffectedByTeamFailure::defineShard( KeyRangeRef keys ) {
.detail("TeamCount", teams.size()); */
auto affectedRanges = shard_teams.getAffectedRangesAfterInsertion(keys);
shard_teams.insert( keys, vector<Team>(teams.begin(), teams.end()) );
shard_teams.insert( keys, std::make_pair(teams, prevTeams) );
for(auto r=affectedRanges.begin(); r != affectedRanges.end(); ++r) {
auto& teams = shard_teams[r->begin];
for(auto t=teams.begin(); t!=teams.end(); ++t) {
insert(*t, *r);
auto& t = shard_teams[r->begin];
for(auto it=t.first.begin(); it!=t.first.end(); ++it) {
insert(*it, *r);
}
}
check();
@ -754,33 +760,39 @@ void ShardsAffectedByTeamFailure::moveShard( KeyRangeRef keys, std::vector<Team>
.detail("NewTeam", describe(destinationTeam));*/
auto ranges = shard_teams.intersectingRanges( keys );
std::vector< std::pair<std::vector<Team>,KeyRange> > modifiedShards;
std::vector< std::pair<std::pair<std::vector<Team>,std::vector<Team>>,KeyRange> > modifiedShards;
for(auto it = ranges.begin(); it != ranges.end(); ++it) {
if( keys.contains( it->range() ) ) {
// erase the many teams that were associated with this one shard
for(auto t = it->value().begin(); t != it->value().end(); ++t) {
for(auto t = it->value().first.begin(); t != it->value().first.end(); ++t) {
erase(*t, it->range());
}
// save this modification for later insertion
modifiedShards.push_back( std::pair<std::vector<Team>,KeyRange>( destinationTeams, it->range() ) );
std::vector<Team> prevTeams = it->value().second;
prevTeams.insert(prevTeams.end(), it->value().first.begin(), it->value().first.end());
uniquify(prevTeams);
modifiedShards.push_back( std::pair<std::pair<std::vector<Team>,std::vector<Team>>,KeyRange>( std::make_pair(destinationTeams, prevTeams), it->range() ) );
} else {
// for each range that touches this move, add our team as affecting this range
for(auto& team : destinationTeams) {
insert(team, it->range());
}
// if we are not in the list of teams associated with this shard, add us in
auto& teams = it->value();
if( std::find( teams.begin(), teams.end(), team ) == teams.end() ) {
teams.push_back( team );
}
}
teams.second.insert(teams.second.end(), teams.first.begin(), teams.first.end());
uniquify(teams.second);
teams.first.insert(teams.first.end(), destinationTeams.begin(), destinationTeams.end());
uniquify(teams.first);
}
}
// we cannot modify the KeyRangeMap while iterating through it, so add saved modifications now
for( int i = 0; i < modifiedShards.size(); i++ ) {
for( auto& t : modifiedShards[i].first) {
for( auto& t : modifiedShards[i].first.first) {
insert(t, modifiedShards[i].second);
}
shard_teams.insert( modifiedShards[i].second, modifiedShards[i].first );
@ -789,19 +801,26 @@ void ShardsAffectedByTeamFailure::moveShard( KeyRangeRef keys, std::vector<Team>
check();
}
void ShardsAffectedByTeamFailure::finishMove( KeyRangeRef keys ) {
auto ranges = shard_teams.containedRanges(keys);
for(auto it = ranges.begin(); it != ranges.end(); ++it) {
it.value().second.clear();
}
}
void ShardsAffectedByTeamFailure::check() {
if (EXPENSIVE_VALIDATION) {
for(auto t = team_shards.begin(); t != team_shards.end(); ++t) {
auto i = shard_teams.rangeContaining(t->second.begin);
if (i->range() != t->second ||
!std::count(i->value().begin(), i->value().end(), t->first))
!std::count(i->value().first.begin(), i->value().first.end(), t->first))
{
ASSERT(false);
}
}
auto rs = shard_teams.ranges();
for(auto i = rs.begin(); i != rs.end(); ++i)
for(vector<Team>::iterator t = i->value().begin(); t != i->value().end(); ++t)
for(vector<Team>::iterator t = i->value().first.begin(); t != i->value().first.end(); ++t)
if (!team_shards.count( std::make_pair( *t, i->range() ) )) {
std::string teamDesc, shards;
for(int k=0; k<t->servers.size(); k++)

View File

@ -813,8 +813,9 @@ ACTOR Future<Void> removeStorageServer( Database cx, UID serverID, MoveKeysLock
state Future<Standalone<RangeResultRef>> fTags = tr.getRange( serverTagKeys, CLIENT_KNOBS->TOO_MANY );
state Future<Standalone<RangeResultRef>> fHistoryTags = tr.getRange( serverTagHistoryKeys, CLIENT_KNOBS->TOO_MANY );
state Future<Standalone<RangeResultRef>> fTagLocalities = tr.getRange( tagLocalityListKeys, CLIENT_KNOBS->TOO_MANY );
state Future<Standalone<RangeResultRef>> fTLogDatacenters = tr.getRange( tLogDatacentersKeys, CLIENT_KNOBS->TOO_MANY );
Void _ = wait( success(fListKey) && success(fTags) && success(fHistoryTags) && success(fTagLocalities) );
Void _ = wait( success(fListKey) && success(fTags) && success(fHistoryTags) && success(fTagLocalities) && success(fTLogDatacenters) );
if (!fListKey.get().present()) {
if (retry) {
@ -840,6 +841,14 @@ ACTOR Future<Void> removeStorageServer( Database cx, UID serverID, MoveKeysLock
allLocalities.insert(t.locality);
}
std::map<Optional<Value>,int8_t> dcId_locality;
for(auto& kv : fTagLocalities.get()) {
dcId_locality[decodeTagLocalityListKey(kv.key)] = decodeTagLocalityListValue(kv.value);
}
for(auto& it : fTLogDatacenters.get()) {
allLocalities.insert(dcId_locality[decodeTLogDatacentersKey(it.key)]);
}
if(locality >= 0 && !allLocalities.count(locality) ) {
for(auto& it : fTagLocalities.get()) {
if( locality == decodeTagLocalityListValue(it.value) ) {

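The extra read keeps a tag locality in allLocalities while any tLog datacenter still maps to it, so removing the last storage server in a DC does not clear a locality that tLogs may still be using. A simplified sketch of that retention rule (not the commit's code, which indexes dcId_locality[...] directly and would default an unknown datacenter to locality 0):

// allLocalities already contains the localities of every remaining storage server tag;
// this adds the localities of datacenters that still host tLogs.
void addTLogLocalities( std::set<int8_t>& allLocalities,
                        const std::map<Optional<Value>, int8_t>& dcId_locality,   // decoded from tagLocalityListKeys
                        const std::vector<Optional<Value>>& tLogDatacenters ) {   // decoded from tLogDatacentersKeys
	for(auto& dc : tLogDatacenters) {
		auto it = dcId_locality.find(dc);
		if(it != dcId_locality.end()) {
			allLocalities.insert( it->second );           // locality still in use by a tLog-only datacenter
		}
	}
}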
View File

@ -417,7 +417,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
return minVersionWhenReady( waitForAll(quorumResults), allReplies);
}
Reference<IPeekCursor> peekAll( UID dbgid, Version begin, Version end, Tag tag, bool parallelGetMore, bool throwIfDead ) {
Reference<IPeekCursor> peekAll( UID dbgid, Version begin, Version end, Tag tag, bool parallelGetMore ) {
int bestSet = 0;
std::vector<Reference<LogSet>> localSets;
Version lastBegin = 0;
@ -458,12 +458,8 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
break;
}
TraceEvent("TLogPeekAllDead", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("End", end).detail("LastBegin", lastBegin).detail("OldLogDataSize", oldLogData.size());
if(throwIfDead) {
throw worker_removed();
} else {
return Reference<ILogSystem::ServerPeekCursor>( new ILogSystem::ServerPeekCursor( Reference<AsyncVar<OptionalInterface<TLogInterface>>>(), tag, begin, getPeekEnd(), false, false ) );
}
}
int bestOldSet = 0;
std::vector<Reference<LogSet>> localOldSets;
@ -489,12 +485,8 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
i++;
continue;
}
if(throwIfDead) {
throw worker_removed();
} else {
return Reference<ILogSystem::ServerPeekCursor>( new ILogSystem::ServerPeekCursor( Reference<AsyncVar<OptionalInterface<TLogInterface>>>(), tag, begin, getPeekEnd(), false, false ) );
}
}
if(thisSpecial) {
foundSpecial = true;
}
@ -587,7 +579,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
if(tag.locality == tagLocalityRemoteLog) {
return peekRemote(dbgid, begin, tag, parallelGetMore);
} else {
return peekAll(dbgid, begin, getPeekEnd(), tag, parallelGetMore, false);
return peekAll(dbgid, begin, getPeekEnd(), tag, parallelGetMore);
}
}
@ -630,7 +622,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
}
if(bestSet == -1) {
TraceEvent("TLogPeekLocalNoBestSet", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("End", end);
TraceEvent("TLogPeekLocalNoBestSet", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("End", end).detail("LogCount", logCount);
if(useMergePeekCursors || logCount > 1) {
throw worker_removed();
} else {
@ -672,35 +664,35 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
int bestOldSet = -1;
logCount = 0;
bool nextFoundSpecial = false;
for(int t = 0; t < oldLogData[i].tLogs.size(); t++) {
if(oldLogData[i].tLogs[t]->logServers.size() && oldLogData[i].tLogs[t]->locality != tagLocalitySatellite) {
logCount++;
}
if(oldLogData[i].tLogs[t]->logServers.size() && (oldLogData[i].tLogs[t]->locality == tagLocalitySpecial || oldLogData[i].tLogs[t]->locality == tagLocalityUpgraded || oldLogData[i].tLogs[t]->locality == peekLocality || peekLocality == tagLocalityUpgraded)) {
if( oldLogData[i].tLogs[t]->locality == tagLocalitySpecial || oldLogData[i].tLogs[t]->locality == tagLocalityUpgraded ) {
foundSpecial = true;
nextFoundSpecial = true;
}
if(foundSpecial && !oldLogData[i].tLogs[t]->isLocal) {
TraceEvent("TLogPeekLocalRemoteBeforeSpecial", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("End", end).detail("LastBegin", lastBegin).detail("OldLogDataSize", oldLogData.size()).detail("Idx", i);
throw worker_removed();
}
bestOldSet = t;
break;
}
}
if(foundSpecial) {
TraceEvent("TLogPeekLocalFoundSpecial", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("End", end);
cursors.push_back(peekAll(dbgid, begin, std::min(lastBegin, end), tag, useMergePeekCursors, !useMergePeekCursors));
epochEnds.push_back(LogMessageVersion(std::min(lastBegin, end)));
break;
}
if(bestOldSet == -1) {
if(oldLogData[i].logRouterTags == 0 || logCount > 1) {
TraceEvent("TLogPeekLocalNoLogRouterTags", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("End", end).detail("LastBegin", lastBegin).detail("OldLogDataSize", oldLogData.size()).detail("Idx", i).detail("LogRouterTags", oldLogData[i].logRouterTags).detail("LogCount", logCount);
TraceEvent("TLogPeekLocalNoBestSet", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("End", end).detail("LastBegin", lastBegin).detail("OldLogDataSize", oldLogData.size()).detail("Idx", i).detail("LogRouterTags", oldLogData[i].logRouterTags).detail("LogCount", logCount).detail("FoundSpecial", foundSpecial);
if(oldLogData[i].logRouterTags == 0 || logCount > 1 || foundSpecial) {
throw worker_removed();
}
i++;
continue;
}
foundSpecial = nextFoundSpecial;
Version thisBegin = std::max(oldLogData[i].tLogs[bestOldSet]->startVersion, begin);
if(thisBegin < lastBegin) {
if(thisBegin < end) {
@ -723,7 +715,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
Version end = getEnd();
TraceEvent("TLogPeekSpecial", dbgid).detail("Begin", begin).detail("End", end).detail("LocalEnd", localEnd).detail("PeekLocality", peekLocality);
if(localEnd == invalidVersion || localEnd <= begin) {
return peekAll(dbgid, begin, end, tag, true, false);
return peekAll(dbgid, begin, end, tag, true);
}
try {
@ -736,13 +728,13 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
cursors.resize(2);
cursors[1] = peekLocal(dbgid, tag, begin, localEnd, true, peekLocality);
cursors[0] = peekAll(dbgid, localEnd, end, tag, true, false);
cursors[0] = peekAll(dbgid, localEnd, end, tag, true);
epochEnds.push_back(LogMessageVersion(localEnd));
return Reference<ILogSystem::MultiCursor>( new ILogSystem::MultiCursor(cursors, epochEnds) );
} catch( Error& e ) {
if(e.code() == error_code_worker_removed) {
return peekAll(dbgid, begin, end, tag, true, false);
return peekAll(dbgid, begin, end, tag, true);
}
throw;
}

View File

@ -787,6 +787,8 @@ ACTOR Future<Void> recoverFrom( Reference<MasterData> self, Reference<ILogSystem
// Ordinarily we pass through this loop once and recover. We go around the loop if recovery stalls for more than a second,
// a provisional master is initialized, and an "emergency transaction" is submitted that might change the configuration so that we can
// finish recovery.
state std::map<Optional<Value>,int8_t> originalLocalityMap = self->dcId_locality;
state Future<vector<Standalone<CommitTransactionRef>>> recruitments = recruitEverything( self, seedServers, oldLogSystem );
loop {
state Future<Standalone<CommitTransactionRef>> provisional = provisionalMaster(self, delay(1.0));
@ -818,6 +820,7 @@ ACTOR Future<Void> recoverFrom( Reference<MasterData> self, Reference<ILogSystem
}
if(self->configuration != oldConf) { //confChange does not trigger when including servers
self->dcId_locality = originalLocalityMap;
recruitments = recruitEverything( self, seedServers, oldLogSystem );
}
}
@ -1267,6 +1270,14 @@ ACTOR Future<Void> masterCore( Reference<MasterData> self ) {
tr.set(recoveryCommitRequest.arena, logsKey, self->logSystem->getLogsValue());
tr.set(recoveryCommitRequest.arena, primaryDatacenterKey, self->myInterface.locality.dcId().present() ? self->myInterface.locality.dcId().get() : StringRef());
tr.clear(recoveryCommitRequest.arena, tLogDatacentersKeys);
for(auto& dc : self->primaryDcId) {
tr.set(recoveryCommitRequest.arena, tLogDatacentersKeyFor(dc), StringRef());
}
for(auto& dc : self->remoteDcIds) {
tr.set(recoveryCommitRequest.arena, tLogDatacentersKeyFor(dc), StringRef());
}
applyMetadataMutations(self->dbgid, recoveryCommitRequest.arena, tr.mutations.slice(mmApplied, tr.mutations.size()), self->txnStateStore, NULL, NULL);
mmApplied = tr.mutations.size();

View File

@ -29,13 +29,16 @@
static const char* storeTypes[] = { "ssd", "ssd-1", "ssd-2", "memory" };
static const char* redundancies[] = { "single", "double", "triple" };
std::string generateRegions(int& regions) {
std::string generateRegions() {
std::string result;
if(g_simulator.physicalDatacenters == 1 || (g_simulator.physicalDatacenters == 2 && g_random->random01() < 0.25) || g_simulator.physicalDatacenters == 3) {
regions = 1;
return " usable_regions=1 regions=\"\"";
}
if(g_random->random01() < 0.25) {
return format(" usable_regions=%d", g_random->randomInt(1,3));
}
int primaryPriority = 1;
int remotePriority = -1;
double priorityType = g_random->random01();
@ -180,16 +183,9 @@ std::string generateRegions(int& regions) {
if(g_random->random01() < 0.8) {
regionArr.push_back(remoteObj);
if(g_random->random01() < 0.8) {
regions = 2;
result += " usable_regions=2";
} else {
regions = 1;
result += " usable_regions=1";
if(g_random->random01() < 0.25) {
result += format(" usable_regions=%d", g_random->randomInt(1,3));
}
} else {
regions = 1;
result += " usable_regions=1";
}
result += " regions=" + json_spirit::write_string(json_spirit::mValue(regionArr), json_spirit::Output_options::none);
@ -327,8 +323,7 @@ struct ConfigureDatabaseWorkload : TestWorkload {
config = "three_data_hall ";
}
state int regions = 1;
config += generateRegions(regions);
config += generateRegions();
if (g_random->random01() < 0.5) config += " logs=" + format("%d", randomRoleNumber());
if (g_random->random01() < 0.5) config += " proxies=" + format("%d", randomRoleNumber());