Merge branch 'release-6.0'
# Conflicts: # fdbserver/DataDistribution.actor.cpp # fdbserver/MoveKeys.actor.cpp
This commit is contained in:
commit
4e54690005
|
@ -58,14 +58,14 @@ class Result:
|
|||
self.key_tuple = subspace.unpack(key)
|
||||
self.values = values
|
||||
|
||||
def key(self, specification):
|
||||
return self.key_tuple[specification.key_start_index:]
|
||||
|
||||
def matches_key(self, rhs, specification):
|
||||
if not isinstance(rhs, Result):
|
||||
return False
|
||||
|
||||
left_key = self.key_tuple[specification.key_start_index:]
|
||||
right_key = rhs.key_tuple[specification.key_start_index:]
|
||||
|
||||
return left_key == right_key
|
||||
return self.key(specification) == rhs.key(specification)
|
||||
|
||||
def matches(self, rhs, specification):
|
||||
if not self.matches_key(rhs, specification):
|
||||
|
|
|
@ -75,33 +75,46 @@ class ResultSet(object):
|
|||
util.get_logger().info('Comparing results from \'%s\'...' % repr(util.subspace_to_tuple(self.specification.subspace)))
|
||||
|
||||
num_errors = 0
|
||||
has_filtered_error = False
|
||||
|
||||
# Tracks the current result being evaluated for each tester
|
||||
indices = [0 for i in range(len(self.tester_results))]
|
||||
|
||||
name_length = max([len(name) for name in self.tester_results.keys()])
|
||||
|
||||
has_filtered_error = False
|
||||
|
||||
while True:
|
||||
# Gets the next result for each tester
|
||||
results = {i: r[indices[i]] for i, r in enumerate(self.tester_results.values()) if len(r) > indices[i]}
|
||||
if len(results) == 0:
|
||||
break
|
||||
|
||||
# Attempt to 'align' the results. If two results have matching sequence numbers, then they should be compared.
|
||||
# Only those testers which have a result matching the minimum current sequence number will be included. All
|
||||
# others are considered to have not produced a result and will be evaluated in a future iteration.
|
||||
sequence_nums = [r.sequence_num(self.specification) for r in results.values()]
|
||||
if any([s is not None for s in sequence_nums]):
|
||||
results = {i: r for i, r in results.items() if r.sequence_num(self.specification) == min(sequence_nums)}
|
||||
else:
|
||||
results = {i: r for i, r in results.items() if r.matches_key(min(results.values()), self.specification)}
|
||||
|
||||
# If these results aren't using sequence numbers, then we match two results based on whether they share the same key
|
||||
else:
|
||||
min_key = min([r.key(self.specification) for r in results.values()])
|
||||
results = {i: r for i, r in results.items() if r.key(self.specification) == min_key}
|
||||
|
||||
# Increment the indices for those testers which produced a result in this iteration
|
||||
for i in results.keys():
|
||||
indices[i] += 1
|
||||
|
||||
# Fill in 'None' values for testers that didn't produce a result and generate an output string describing the results
|
||||
all_results = {i: results[i] if i in results else None for i in range(len(self.tester_results))}
|
||||
result_str = '\n'.join([' %-*s - %s' % (name_length, self.tester_results.keys()[i], r) for i, r in all_results.items()])
|
||||
|
||||
result_list = results.values()
|
||||
|
||||
# If any of our results matches the global error filter, we ignore the result
|
||||
if any(r.matches_global_error_filter(self.specification) for r in result_list):
|
||||
has_filtered_error = True
|
||||
|
||||
# The result is considered correct if every tester produced a value and all the values meet the matching criteria
|
||||
if len(results) < len(all_results) or not all(result_list[0].matches(r, self.specification) for r in result_list):
|
||||
util.get_logger().error('\nIncorrect result: \n%s' % result_str)
|
||||
num_errors += 1
|
||||
|
|
|
@ -1604,12 +1604,17 @@ ACTOR Future<bool> configure( Database db, std::vector<StringRef> tokens, Refere
|
|||
ret=false;
|
||||
break;
|
||||
case ConfigurationResult::REGION_NOT_FULLY_REPLICATED:
|
||||
printf("ERROR: When usable_regions=2, all regions with priority >= 0 must be fully replicated before changing the configuration\n");
|
||||
printf("ERROR: When usable_regions > 1, all regions with priority >= 0 must be fully replicated before changing the configuration\n");
|
||||
printf("Type `configure FORCE <TOKEN>*' to configure without this check\n");
|
||||
ret=false;
|
||||
break;
|
||||
case ConfigurationResult::MULTIPLE_ACTIVE_REGIONS:
|
||||
printf("ERROR: When changing from usable_regions=1 to usable_regions=2, only one region can have priority >= 0\n");
|
||||
printf("ERROR: When changing usable_regions, only one region can have priority >= 0\n");
|
||||
printf("Type `configure FORCE <TOKEN>*' to configure without this check\n");
|
||||
ret=false;
|
||||
break;
|
||||
case ConfigurationResult::REGIONS_CHANGED:
|
||||
printf("ERROR: The region configuration cannot be changed while simultaneously changing usable_regions\n");
|
||||
printf("Type `configure FORCE <TOKEN>*' to configure without this check\n");
|
||||
ret=false;
|
||||
break;
|
||||
|
@ -1708,15 +1713,20 @@ ACTOR Future<bool> fileConfigure(Database db, std::string filePath, bool isNewDa
|
|||
ret=false;
|
||||
break;
|
||||
case ConfigurationResult::REGION_NOT_FULLY_REPLICATED:
|
||||
printf("ERROR: When usable_regions=2, all regions with priority >= 0 must be fully replicated before changing the configuration\n");
|
||||
printf("ERROR: When usable_regions > 1, All regions with priority >= 0 must be fully replicated before changing the configuration\n");
|
||||
printf("Type `fileconfigure FORCE <FILENAME>' to configure without this check\n");
|
||||
ret=false;
|
||||
break;
|
||||
case ConfigurationResult::MULTIPLE_ACTIVE_REGIONS:
|
||||
printf("ERROR: When changing from usable_regions=1 to usable_regions=2, only one region can have priority >= 0\n");
|
||||
printf("ERROR: When changing usable_regions, only one region can have priority >= 0\n");
|
||||
printf("Type `fileconfigure FORCE <FILENAME>' to configure without this check\n");
|
||||
ret=false;
|
||||
break;
|
||||
case ConfigurationResult::REGIONS_CHANGED:
|
||||
printf("ERROR: The region configuration cannot be changed while simultaneously changing usable_regions\n");
|
||||
printf("Type `fileconfigure FORCE <TOKEN>*' to configure without this check\n");
|
||||
ret=false;
|
||||
break;
|
||||
case ConfigurationResult::SUCCESS:
|
||||
printf("Configuration changed\n");
|
||||
ret=false;
|
||||
|
|
|
@ -24,7 +24,13 @@
|
|||
#include "fdbclient/Knobs.h"
|
||||
|
||||
namespace HTTP {
|
||||
typedef std::map<std::string, std::string> Headers;
|
||||
struct is_iless {
|
||||
bool operator() (const std::string &a, const std::string &b) const {
|
||||
return strcasecmp(a.c_str(), b.c_str()) < 0;
|
||||
}
|
||||
};
|
||||
|
||||
typedef std::map<std::string, std::string, is_iless> Headers;
|
||||
|
||||
std::string urlEncode(const std::string &s);
|
||||
|
||||
|
|
|
@ -312,7 +312,18 @@ ACTOR Future<ConfigurationResult::Type> changeConfig( Database cx, std::map<std:
|
|||
return ConfigurationResult::INVALID_CONFIGURATION;
|
||||
}
|
||||
|
||||
if(oldConfig.usableRegions==1 && newConfig.usableRegions==2) {
|
||||
if(oldConfig.usableRegions != newConfig.usableRegions) {
|
||||
//cannot change region configuration
|
||||
std::map<Key,int32_t> dcId_priority;
|
||||
for(auto& it : newConfig.regions) {
|
||||
dcId_priority[it.dcId] = it.priority;
|
||||
}
|
||||
for(auto& it : oldConfig.regions) {
|
||||
if(!dcId_priority.count(it.dcId) || dcId_priority[it.dcId] != it.priority) {
|
||||
return ConfigurationResult::REGIONS_CHANGED;
|
||||
}
|
||||
}
|
||||
|
||||
//must only have one region with priority >= 0
|
||||
int activeRegionCount = 0;
|
||||
for(auto& it : newConfig.regions) {
|
||||
|
|
|
@ -53,6 +53,7 @@ public:
|
|||
STORAGE_IN_UNKNOWN_DCID,
|
||||
REGION_NOT_FULLY_REPLICATED,
|
||||
MULTIPLE_ACTIVE_REGIONS,
|
||||
REGIONS_CHANGED,
|
||||
SUCCESS
|
||||
};
|
||||
};
|
||||
|
|
|
@ -258,6 +258,29 @@ int decodeDatacenterReplicasValue( ValueRef const& value ) {
|
|||
return s;
|
||||
}
|
||||
|
||||
// "\xff\x02/tLogDatacenters/[[datacenterID]]"
|
||||
extern const KeyRangeRef tLogDatacentersKeys;
|
||||
extern const KeyRef tLogDatacentersPrefix;
|
||||
const Key tLogDatacentersKeyFor( Optional<Value> dcID );
|
||||
|
||||
const KeyRangeRef tLogDatacentersKeys(
|
||||
LiteralStringRef("\xff\x02/tLogDatacenters/"),
|
||||
LiteralStringRef("\xff\x02/tLogDatacenters0") );
|
||||
const KeyRef tLogDatacentersPrefix = tLogDatacentersKeys.begin;
|
||||
|
||||
const Key tLogDatacentersKeyFor( Optional<Value> dcID ) {
|
||||
BinaryWriter wr(AssumeVersion(currentProtocolVersion));
|
||||
wr.serializeBytes( tLogDatacentersKeys.begin );
|
||||
wr << dcID;
|
||||
return wr.toStringRef();
|
||||
}
|
||||
Optional<Value> decodeTLogDatacentersKey( KeyRef const& key ) {
|
||||
Optional<Value> dcID;
|
||||
BinaryReader rd( key.removePrefix(tLogDatacentersKeys.begin), AssumeVersion(currentProtocolVersion) );
|
||||
rd >> dcID;
|
||||
return dcID;
|
||||
}
|
||||
|
||||
const KeyRef primaryDatacenterKey = LiteralStringRef("\xff/primaryDatacenter");
|
||||
|
||||
// serverListKeys.contains(k) iff k.startsWith( serverListKeys.begin ) because '/'+1 == '0'
|
||||
|
|
|
@ -78,7 +78,7 @@ const Value tagLocalityListValue( int8_t const& );
|
|||
Optional<Value> decodeTagLocalityListKey( KeyRef const& );
|
||||
int8_t decodeTagLocalityListValue( ValueRef const& );
|
||||
|
||||
// "\xff\x02/DatacenterReplicas/[[datacenterID]]" := "[[replicas]]"
|
||||
// "\xff\x02/datacenterReplicas/[[datacenterID]]" := "[[replicas]]"
|
||||
extern const KeyRangeRef datacenterReplicasKeys;
|
||||
extern const KeyRef datacenterReplicasPrefix;
|
||||
const Key datacenterReplicasKeyFor( Optional<Value> dcID );
|
||||
|
@ -86,6 +86,12 @@ const Value datacenterReplicasValue( int const& );
|
|||
Optional<Value> decodeDatacenterReplicasKey( KeyRef const& );
|
||||
int decodeDatacenterReplicasValue( ValueRef const& );
|
||||
|
||||
// "\xff\x02/tLogDatacenters/[[datacenterID]]"
|
||||
extern const KeyRangeRef tLogDatacentersKeys;
|
||||
extern const KeyRef tLogDatacentersPrefix;
|
||||
const Key tLogDatacentersKeyFor( Optional<Value> dcID );
|
||||
Optional<Value> decodeTLogDatacentersKey( KeyRef const& );
|
||||
|
||||
extern const KeyRef primaryDatacenterKey;
|
||||
|
||||
// "\xff/serverList/[[serverID]]" := "[[StorageServerInterface]]"
|
||||
|
|
|
@ -635,13 +635,15 @@ bool argv_equal(const char** a1, const char** a2)
|
|||
return true;
|
||||
}
|
||||
|
||||
void kill_process(uint64_t id) {
|
||||
void kill_process(uint64_t id, bool wait = true) {
|
||||
pid_t pid = id_pid[id];
|
||||
|
||||
log_msg(SevInfo, "Killing process %d\n", pid);
|
||||
|
||||
kill(pid, SIGTERM);
|
||||
if(wait) {
|
||||
waitpid(pid, NULL, 0);
|
||||
}
|
||||
|
||||
pid_id.erase(pid);
|
||||
id_pid.erase(id);
|
||||
|
@ -1367,8 +1369,19 @@ int main(int argc, char** argv) {
|
|||
signal(SIGCHLD, SIG_IGN);
|
||||
sigprocmask(SIG_SETMASK, &normal_mask, NULL);
|
||||
|
||||
/* Send SIGHUP to all child processes */
|
||||
/* If daemonized, setsid() was called earlier so we can just kill our entire new process group */
|
||||
if(daemonize) {
|
||||
kill(0, SIGHUP);
|
||||
}
|
||||
else {
|
||||
/* Otherwise kill each process individually but don't wait on them yet */
|
||||
auto i = id_pid.begin();
|
||||
auto iEnd = id_pid.end();
|
||||
while(i != iEnd) {
|
||||
// Must advance i before calling kill_process() which erases the entry at i
|
||||
kill_process((i++)->first, false);
|
||||
}
|
||||
}
|
||||
|
||||
/* Wait for all child processes (says POSIX.1-2001) */
|
||||
/* POSIX.1-2001 specifies that if the disposition of SIGCHLD is set to SIG_IGN, then children that terminate do not become zombies and a call to wait()
|
||||
|
|
|
@ -50,6 +50,7 @@ struct TCServerInfo : public ReferenceCounted<TCServerInfo> {
|
|||
Promise<Void> wakeUpTracker;
|
||||
bool inDesiredDC;
|
||||
LocalityEntry localityEntry;
|
||||
Promise<Void> updated;
|
||||
|
||||
TCServerInfo(StorageServerInterface ssi, ProcessClass processClass, bool inDesiredDC, Reference<LocalitySet> storageServerSet) : id(ssi.id()), lastKnownInterface(ssi), lastKnownClass(processClass), dataInFlightToServer(0), onInterfaceChanged(interfaceChanged.getFuture()), onRemoved(removed.getFuture()), inDesiredDC(inDesiredDC) {
|
||||
localityEntry = ((LocalityMap<UID>*) storageServerSet.getPtr())->add(ssi.locality, &id);
|
||||
|
@ -68,6 +69,9 @@ ACTOR Future<Void> updateServerMetrics( TCServerInfo *server ) {
|
|||
when( ErrorOr<GetPhysicalMetricsReply> rep = wait( metricsRequest ) ) {
|
||||
if( rep.present() ) {
|
||||
server->serverMetrics = rep;
|
||||
if(server->updated.canBeSet()) {
|
||||
server->updated.send(Void());
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
metricsRequest = Never();
|
||||
|
@ -1465,43 +1469,42 @@ ACTOR Future<Void> teamTracker( DDTeamCollection* self, Reference<TCTeamInfo> te
|
|||
|
||||
for(int i=0; i<shards.size(); i++) {
|
||||
int maxPriority = team->getPriority();
|
||||
int maxOtherPriority = maxPriority;
|
||||
if(maxPriority < PRIORITY_TEAM_0_LEFT) {
|
||||
auto teams = self->shardsAffectedByTeamFailure->getTeamsFor( shards[i] );
|
||||
for( int t=0; t<teams.size(); t++) {
|
||||
if( teams[t].servers.size() ) {
|
||||
bool foundServer = false;
|
||||
for(auto tc : self->teamCollections) {
|
||||
if( tc->server_info.count( teams[t].servers[0] ) ) {
|
||||
ASSERT(!foundServer);
|
||||
foundServer = true;
|
||||
auto& info = tc->server_info[teams[t].servers[0]];
|
||||
for( int j=0; j < teams.first.size()+teams.second.size(); j++) {
|
||||
auto& t = j < teams.first.size() ? teams.first[j] : teams.second[j-teams.first.size()];
|
||||
if( !t.servers.size() ) {
|
||||
maxPriority = PRIORITY_TEAM_0_LEFT;
|
||||
break;
|
||||
}
|
||||
|
||||
auto tc = self->teamCollections[t.primary ? 0 : 1];
|
||||
ASSERT(tc->primary == t.primary);
|
||||
if( tc->server_info.count( t.servers[0] ) ) {
|
||||
auto& info = tc->server_info[t.servers[0]];
|
||||
|
||||
bool found = false;
|
||||
for( int i = 0; i < info->teams.size(); i++ ) {
|
||||
if( info->teams[i]->serverIDs == teams[t].servers ) {
|
||||
if(teams[t].primary == self->primary) {
|
||||
maxPriority = std::max( maxPriority, info->teams[i]->getPriority() );
|
||||
} else {
|
||||
maxOtherPriority = std::max( maxOtherPriority, info->teams[i]->getPriority() );
|
||||
}
|
||||
for( int k = 0; k < info->teams.size(); k++ ) {
|
||||
if( info->teams[k]->serverIDs == t.servers ) {
|
||||
maxPriority = std::max( maxPriority, info->teams[k]->getPriority() );
|
||||
found = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
TEST(!found); // A removed team is still associated with a shard in SABTF
|
||||
//If we cannot find the team, it could be a bad team so assume unhealthy priority
|
||||
if(!found) {
|
||||
maxPriority = std::max<int>( maxPriority, PRIORITY_TEAM_UNHEALTHY );
|
||||
}
|
||||
}
|
||||
TEST(!foundServer); // A removed server is still associated with a team in SABTF
|
||||
} else {
|
||||
TEST(true); // A removed server is still associated with a team in SABTF
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if( maxPriority == team->getPriority() || lastPriority > maxPriority ) {
|
||||
RelocateShard rs;
|
||||
rs.keys = shards[i];
|
||||
rs.priority = std::max(maxPriority,maxOtherPriority);
|
||||
rs.priority = maxPriority;
|
||||
|
||||
self->output.send(rs);
|
||||
if(g_random->random01() < 0.01) {
|
||||
|
@ -1513,10 +1516,6 @@ ACTOR Future<Void> teamTracker( DDTeamCollection* self, Reference<TCTeamInfo> te
|
|||
.detail("TeamFailedMachines", team->getServerIDs().size()-serversLeft)
|
||||
.detail("TeamOKMachines", serversLeft);
|
||||
}
|
||||
} else {
|
||||
TraceEvent("RelocationNotSentToDDQ", self->masterId)
|
||||
.detail("Team", team->getDesc());
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if(logTeamEvents) {
|
||||
|
@ -1705,7 +1704,7 @@ ACTOR Future<Void> removeBadTeams(DDTeamCollection* self) {
|
|||
}
|
||||
}
|
||||
wait(self->addSubsetComplete.getFuture());
|
||||
TraceEvent("DDRemovingBadTeams", self->masterId);
|
||||
TraceEvent("DDRemovingBadTeams", self->masterId).detail("Primary", self->primary);
|
||||
for(auto it : self->badTeams) {
|
||||
it->tracker.cancel();
|
||||
}
|
||||
|
@ -1878,6 +1877,10 @@ ACTOR Future<Void> storageServerTracker(
|
|||
changes.get().send( std::make_pair(server->id, Optional<StorageServerInterface>()) );
|
||||
}
|
||||
|
||||
if(server->updated.canBeSet()) {
|
||||
server->updated.send(Void());
|
||||
}
|
||||
|
||||
// Remove server from FF/serverList
|
||||
wait( removeStorageServer( cx, server->id, lock ) );
|
||||
|
||||
|
@ -2100,7 +2103,13 @@ ACTOR Future<Void> storageRecruiter( DDTeamCollection* self, Reference<AsyncVar<
|
|||
}
|
||||
|
||||
ACTOR Future<Void> updateReplicasKey(DDTeamCollection* self, Optional<Key> dcId) {
|
||||
wait(self->initialFailureReactionDelay);
|
||||
std::vector<Future<Void>> serverUpdates;
|
||||
|
||||
for(auto& it : self->server_info) {
|
||||
serverUpdates.push_back(it.second->updated.getFuture());
|
||||
}
|
||||
|
||||
wait(self->initialFailureReactionDelay && waitForAll(serverUpdates));
|
||||
loop {
|
||||
while(self->zeroHealthyTeams->get() || self->processingUnhealthy->get()) {
|
||||
TraceEvent("DDUpdatingStalled", self->masterId).detail("DcId", printable(dcId)).detail("ZeroHealthy", self->zeroHealthyTeams->get()).detail("ProcessingUnhealthy", self->processingUnhealthy->get());
|
||||
|
@ -2144,7 +2153,6 @@ ACTOR Future<Void> serverGetTeamRequests(TeamCollectionInterface tci, DDTeamColl
|
|||
// Keep track of servers and teams -- serves requests for getRandomTeam
|
||||
ACTOR Future<Void> dataDistributionTeamCollection(
|
||||
Reference<DDTeamCollection> teamCollection,
|
||||
std::vector<DDTeamCollection*> teamCollections,
|
||||
Reference<InitialDataDistribution> initData,
|
||||
TeamCollectionInterface tci,
|
||||
Reference<AsyncVar<struct ServerDBInfo>> db)
|
||||
|
@ -2154,8 +2162,6 @@ ACTOR Future<Void> dataDistributionTeamCollection(
|
|||
state PromiseStream<Void> serverRemoved;
|
||||
state Future<Void> error = actorCollection( self->addActor.getFuture() );
|
||||
|
||||
self->teamCollections = teamCollections;
|
||||
|
||||
try {
|
||||
wait( DDTeamCollection::init( self, initData ) );
|
||||
initData = Reference<InitialDataDistribution>();
|
||||
|
@ -2165,19 +2171,21 @@ ACTOR Future<Void> dataDistributionTeamCollection(
|
|||
wait( self->readyToStart || error );
|
||||
TraceEvent("DDTeamCollectionReadyToStart", self->masterId).detail("Primary", self->primary);
|
||||
|
||||
self->addActor.send(storageRecruiter( self, db ));
|
||||
self->addActor.send(monitorStorageServerRecruitment( self ));
|
||||
self->addActor.send(waitServerListChange( self, serverRemoved.getFuture() ));
|
||||
self->addActor.send(trackExcludedServers( self ));
|
||||
|
||||
if(self->badTeamRemover.isReady()) {
|
||||
self->badTeamRemover = removeBadTeams(self);
|
||||
self->addActor.send(self->badTeamRemover);
|
||||
}
|
||||
|
||||
if(self->includedDCs.size()) {
|
||||
//start this actor before any potential recruitments can happen
|
||||
self->addActor.send(updateReplicasKey(self, self->includedDCs[0]));
|
||||
}
|
||||
|
||||
self->addActor.send(storageRecruiter( self, db ));
|
||||
self->addActor.send(monitorStorageServerRecruitment( self ));
|
||||
self->addActor.send(waitServerListChange( self, serverRemoved.getFuture() ));
|
||||
self->addActor.send(trackExcludedServers( self ));
|
||||
|
||||
// SOMEDAY: Monitor FF/serverList for (new) servers that aren't in allServers and add or remove them
|
||||
|
||||
loop choose {
|
||||
|
@ -2414,6 +2422,11 @@ ACTOR Future<Void> dataDistribution(
|
|||
if(configuration.usableRegions > 1) {
|
||||
teams.push_back(ShardsAffectedByTeamFailure::Team(initData->shards[shard].remoteSrc, false));
|
||||
}
|
||||
if(g_network->isSimulated()) {
|
||||
TraceEvent("DDInitShard").detail("Keys", printable(keys)).detail("PrimarySrc", describe(initData->shards[shard].primarySrc)).detail("RemoteSrc", describe(initData->shards[shard].remoteSrc))
|
||||
.detail("PrimaryDest", describe(initData->shards[shard].primaryDest)).detail("RemoteDest", describe(initData->shards[shard].remoteDest));
|
||||
}
|
||||
|
||||
shardsAffectedByTeamFailure->moveShard(keys, teams);
|
||||
if(initData->shards[shard].hasDest) {
|
||||
// This shard is already in flight. Ideally we should use dest in sABTF and generate a dataDistributionRelocator directly in
|
||||
|
@ -2451,21 +2464,17 @@ ACTOR Future<Void> dataDistribution(
|
|||
actors.push_back( reportErrorsExcept( dataDistributionTracker( initData, cx, output, shardsAffectedByTeamFailure, getShardMetrics, getAverageShardBytes.getFuture(), readyToStart, anyZeroHealthyTeams, mi.id() ), "DDTracker", mi.id(), &normalDDQueueErrors() ) );
|
||||
actors.push_back( reportErrorsExcept( dataDistributionQueue( cx, output, input.getFuture(), getShardMetrics, processingUnhealthy, tcis, shardsAffectedByTeamFailure, lock, getAverageShardBytes, mi, storageTeamSize, lastLimited, recoveryCommitVersion ), "DDQueue", mi.id(), &normalDDQueueErrors() ) );
|
||||
|
||||
vector<Reference<DDTeamCollection>> teamCollections;
|
||||
teamCollections.push_back( Reference<DDTeamCollection>( new DDTeamCollection(cx, mi.id(), lock, output, shardsAffectedByTeamFailure, configuration, primaryDcId, configuration.usableRegions > 1 ? remoteDcIds : std::vector<Optional<Key>>(), serverChanges, readyToStart.getFuture(), zeroHealthyTeams[0], true, processingUnhealthy) ) );
|
||||
if (configuration.usableRegions > 1) {
|
||||
teamCollections.push_back( Reference<DDTeamCollection>( new DDTeamCollection(cx, mi.id(), lock, output, shardsAffectedByTeamFailure, configuration, remoteDcIds, Optional<std::vector<Optional<Key>>>(), serverChanges, readyToStart.getFuture() && remoteRecovered, zeroHealthyTeams[1], false, processingUnhealthy) ) );
|
||||
}
|
||||
|
||||
vector<DDTeamCollection*> teamCollectionsPtrs;
|
||||
for(auto it : teamCollections) {
|
||||
teamCollectionsPtrs.push_back(it.getPtr());
|
||||
}
|
||||
|
||||
actors.push_back( reportErrorsExcept( dataDistributionTeamCollection( teamCollections[0], teamCollectionsPtrs, initData, tcis[0], db ), "DDTeamCollectionPrimary", mi.id(), &normalDDQueueErrors() ) );
|
||||
Reference<DDTeamCollection> primaryTeamCollection( new DDTeamCollection(cx, mi.id(), lock, output, shardsAffectedByTeamFailure, configuration, primaryDcId, configuration.usableRegions > 1 ? remoteDcIds : std::vector<Optional<Key>>(), serverChanges, readyToStart.getFuture(), zeroHealthyTeams[0], true, processingUnhealthy) );
|
||||
teamCollectionsPtrs.push_back(primaryTeamCollection.getPtr());
|
||||
if (configuration.usableRegions > 1) {
|
||||
actors.push_back( reportErrorsExcept( dataDistributionTeamCollection( teamCollections[1], teamCollectionsPtrs, initData, tcis[1], db ), "DDTeamCollectionSecondary", mi.id(), &normalDDQueueErrors() ) );
|
||||
Reference<DDTeamCollection> remoteTeamCollection( new DDTeamCollection(cx, mi.id(), lock, output, shardsAffectedByTeamFailure, configuration, remoteDcIds, Optional<std::vector<Optional<Key>>>(), serverChanges, readyToStart.getFuture() && remoteRecovered, zeroHealthyTeams[1], false, processingUnhealthy) );
|
||||
teamCollectionsPtrs.push_back(remoteTeamCollection.getPtr());
|
||||
remoteTeamCollection->teamCollections = teamCollectionsPtrs;
|
||||
actors.push_back( reportErrorsExcept( dataDistributionTeamCollection( remoteTeamCollection, initData, tcis[1], db ), "DDTeamCollectionSecondary", mi.id(), &normalDDQueueErrors() ) );
|
||||
}
|
||||
primaryTeamCollection->teamCollections = teamCollectionsPtrs;
|
||||
actors.push_back( reportErrorsExcept( dataDistributionTeamCollection( primaryTeamCollection, initData, tcis[0], db ), "DDTeamCollectionPrimary", mi.id(), &normalDDQueueErrors() ) );
|
||||
actors.push_back(yieldPromiseStream(output.getFuture(), input));
|
||||
|
||||
wait( waitForAll( actors ) );
|
||||
|
|
|
@ -154,9 +154,14 @@ public:
|
|||
|
||||
int getNumberOfShards( UID ssID );
|
||||
vector<KeyRange> getShardsFor( Team team );
|
||||
vector<Team> getTeamsFor( KeyRangeRef keys );
|
||||
|
||||
//The first element of the pair is either the source for non-moving shards or the destination team for in-flight shards
|
||||
//The second element of the pair is all previous sources for in-flight shards
|
||||
std::pair<vector<Team>,vector<Team>> getTeamsFor( KeyRangeRef keys );
|
||||
|
||||
void defineShard( KeyRangeRef keys );
|
||||
void moveShard( KeyRangeRef keys, std::vector<Team> destinationTeam );
|
||||
void finishMove( KeyRangeRef keys );
|
||||
void check();
|
||||
private:
|
||||
struct OrderByTeamKey {
|
||||
|
@ -167,7 +172,7 @@ private:
|
|||
}
|
||||
};
|
||||
|
||||
KeyRangeMap< vector<Team> > shard_teams; // A shard can be affected by the failure of multiple teams if it is a queued merge
|
||||
KeyRangeMap< std::pair<vector<Team>,vector<Team>> > shard_teams; // A shard can be affected by the failure of multiple teams if it is a queued merge, or when usable_regions > 1
|
||||
std::set< std::pair<Team,KeyRange>, OrderByTeamKey > team_shards;
|
||||
std::map< UID, int > storageServerShards;
|
||||
|
||||
|
|
|
@ -1031,6 +1031,7 @@ ACTOR Future<Void> dataDistributionRelocator( DDQueueData *self, RelocateData rd
|
|||
}
|
||||
|
||||
self->bytesWritten += metrics.bytes;
|
||||
self->shardsAffectedByTeamFailure->finishMove(rd.keys);
|
||||
relocationComplete.send( rd );
|
||||
return Void();
|
||||
} else {
|
||||
|
|
|
@ -701,7 +701,7 @@ int ShardsAffectedByTeamFailure::getNumberOfShards( UID ssID ) {
|
|||
return storageServerShards[ssID];
|
||||
}
|
||||
|
||||
vector<ShardsAffectedByTeamFailure::Team> ShardsAffectedByTeamFailure::getTeamsFor( KeyRangeRef keys ) {
|
||||
std::pair<vector<ShardsAffectedByTeamFailure::Team>,vector<ShardsAffectedByTeamFailure::Team>> ShardsAffectedByTeamFailure::getTeamsFor( KeyRangeRef keys ) {
|
||||
return shard_teams[keys.begin];
|
||||
}
|
||||
|
||||
|
@ -720,14 +720,20 @@ void ShardsAffectedByTeamFailure::insert(Team team, KeyRange const& range) {
|
|||
}
|
||||
|
||||
void ShardsAffectedByTeamFailure::defineShard( KeyRangeRef keys ) {
|
||||
std::set<Team> teams;
|
||||
std::vector<Team> teams;
|
||||
std::vector<Team> prevTeams;
|
||||
auto rs = shard_teams.intersectingRanges(keys);
|
||||
for(auto it = rs.begin(); it != rs.end(); ++it) {
|
||||
for(auto t=it->value().begin(); t!=it->value().end(); ++t) {
|
||||
teams.insert( *t );
|
||||
for(auto t=it->value().first.begin(); t!=it->value().first.end(); ++t) {
|
||||
teams.push_back( *t );
|
||||
erase(*t, it->range());
|
||||
}
|
||||
for(auto t=it->value().second.begin(); t!=it->value().second.end(); ++t) {
|
||||
prevTeams.push_back( *t );
|
||||
}
|
||||
}
|
||||
uniquify(teams);
|
||||
uniquify(prevTeams);
|
||||
|
||||
/*TraceEvent("ShardsAffectedByTeamFailureDefine")
|
||||
.detail("KeyBegin", printable(keys.begin))
|
||||
|
@ -735,12 +741,12 @@ void ShardsAffectedByTeamFailure::defineShard( KeyRangeRef keys ) {
|
|||
.detail("TeamCount", teams.size()); */
|
||||
|
||||
auto affectedRanges = shard_teams.getAffectedRangesAfterInsertion(keys);
|
||||
shard_teams.insert( keys, vector<Team>(teams.begin(), teams.end()) );
|
||||
shard_teams.insert( keys, std::make_pair(teams, prevTeams) );
|
||||
|
||||
for(auto r=affectedRanges.begin(); r != affectedRanges.end(); ++r) {
|
||||
auto& teams = shard_teams[r->begin];
|
||||
for(auto t=teams.begin(); t!=teams.end(); ++t) {
|
||||
insert(*t, *r);
|
||||
auto& t = shard_teams[r->begin];
|
||||
for(auto it=t.first.begin(); it!=t.first.end(); ++it) {
|
||||
insert(*it, *r);
|
||||
}
|
||||
}
|
||||
check();
|
||||
|
@ -754,33 +760,39 @@ void ShardsAffectedByTeamFailure::moveShard( KeyRangeRef keys, std::vector<Team>
|
|||
.detail("NewTeam", describe(destinationTeam));*/
|
||||
|
||||
auto ranges = shard_teams.intersectingRanges( keys );
|
||||
std::vector< std::pair<std::vector<Team>,KeyRange> > modifiedShards;
|
||||
std::vector< std::pair<std::pair<std::vector<Team>,std::vector<Team>>,KeyRange> > modifiedShards;
|
||||
for(auto it = ranges.begin(); it != ranges.end(); ++it) {
|
||||
if( keys.contains( it->range() ) ) {
|
||||
// erase the many teams that were associated with this one shard
|
||||
for(auto t = it->value().begin(); t != it->value().end(); ++t) {
|
||||
for(auto t = it->value().first.begin(); t != it->value().first.end(); ++t) {
|
||||
erase(*t, it->range());
|
||||
}
|
||||
|
||||
// save this modification for later insertion
|
||||
modifiedShards.push_back( std::pair<std::vector<Team>,KeyRange>( destinationTeams, it->range() ) );
|
||||
std::vector<Team> prevTeams = it->value().second;
|
||||
prevTeams.insert(prevTeams.end(), it->value().first.begin(), it->value().first.end());
|
||||
uniquify(prevTeams);
|
||||
|
||||
modifiedShards.push_back( std::pair<std::pair<std::vector<Team>,std::vector<Team>>,KeyRange>( std::make_pair(destinationTeams, prevTeams), it->range() ) );
|
||||
} else {
|
||||
// for each range that touches this move, add our team as affecting this range
|
||||
for(auto& team : destinationTeams) {
|
||||
insert(team, it->range());
|
||||
}
|
||||
|
||||
// if we are not in the list of teams associated with this shard, add us in
|
||||
auto& teams = it->value();
|
||||
if( std::find( teams.begin(), teams.end(), team ) == teams.end() ) {
|
||||
teams.push_back( team );
|
||||
}
|
||||
}
|
||||
teams.second.insert(teams.second.end(), teams.first.begin(), teams.first.end());
|
||||
uniquify(teams.second);
|
||||
|
||||
teams.first.insert(teams.first.end(), destinationTeams.begin(), destinationTeams.end());
|
||||
uniquify(teams.first);
|
||||
}
|
||||
}
|
||||
|
||||
// we cannot modify the KeyRangeMap while iterating through it, so add saved modifications now
|
||||
for( int i = 0; i < modifiedShards.size(); i++ ) {
|
||||
for( auto& t : modifiedShards[i].first) {
|
||||
for( auto& t : modifiedShards[i].first.first) {
|
||||
insert(t, modifiedShards[i].second);
|
||||
}
|
||||
shard_teams.insert( modifiedShards[i].second, modifiedShards[i].first );
|
||||
|
@ -789,19 +801,26 @@ void ShardsAffectedByTeamFailure::moveShard( KeyRangeRef keys, std::vector<Team>
|
|||
check();
|
||||
}
|
||||
|
||||
void ShardsAffectedByTeamFailure::finishMove( KeyRangeRef keys ) {
|
||||
auto ranges = shard_teams.containedRanges(keys);
|
||||
for(auto it = ranges.begin(); it != ranges.end(); ++it) {
|
||||
it.value().second.clear();
|
||||
}
|
||||
}
|
||||
|
||||
void ShardsAffectedByTeamFailure::check() {
|
||||
if (EXPENSIVE_VALIDATION) {
|
||||
for(auto t = team_shards.begin(); t != team_shards.end(); ++t) {
|
||||
auto i = shard_teams.rangeContaining(t->second.begin);
|
||||
if (i->range() != t->second ||
|
||||
!std::count(i->value().begin(), i->value().end(), t->first))
|
||||
!std::count(i->value().first.begin(), i->value().first.end(), t->first))
|
||||
{
|
||||
ASSERT(false);
|
||||
}
|
||||
}
|
||||
auto rs = shard_teams.ranges();
|
||||
for(auto i = rs.begin(); i != rs.end(); ++i)
|
||||
for(vector<Team>::iterator t = i->value().begin(); t != i->value().end(); ++t)
|
||||
for(vector<Team>::iterator t = i->value().first.begin(); t != i->value().first.end(); ++t)
|
||||
if (!team_shards.count( std::make_pair( *t, i->range() ) )) {
|
||||
std::string teamDesc, shards;
|
||||
for(int k=0; k<t->servers.size(); k++)
|
||||
|
|
|
@ -264,9 +264,11 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
|
|||
// Master Server
|
||||
// masterCommitter() in the master server will allow lower priority tasks (e.g. DataDistibution)
|
||||
// by delay()ing for this amount of time between accepted batches of TransactionRequests.
|
||||
bool fastBalancing = randomize && BUGGIFY;
|
||||
init( COMMIT_SLEEP_TIME, 0.0001 ); if( randomize && BUGGIFY ) COMMIT_SLEEP_TIME = 0;
|
||||
init( KEY_BYTES_PER_SAMPLE, 2e4 ); if( fastBalancing ) KEY_BYTES_PER_SAMPLE = 1e3;
|
||||
init( MIN_BALANCE_TIME, 0.2 );
|
||||
init( MIN_BALANCE_DIFFERENCE, 10000 );
|
||||
init( MIN_BALANCE_DIFFERENCE, 1e6 ); if( fastBalancing ) MIN_BALANCE_DIFFERENCE = 1e4;
|
||||
init( SECONDS_BEFORE_NO_FAILURE_DELAY, 8 * 3600 );
|
||||
init( MAX_TXS_SEND_MEMORY, 1e7 ); if( randomize && BUGGIFY ) MAX_TXS_SEND_MEMORY = 1e5;
|
||||
init( MAX_RECOVERY_VERSIONS, 200 * VERSIONS_PER_SECOND );
|
||||
|
|
|
@ -215,6 +215,7 @@ public:
|
|||
double MAX_RECOVERY_TIME;
|
||||
|
||||
// Resolver
|
||||
int64_t KEY_BYTES_PER_SAMPLE;
|
||||
int64_t SAMPLE_OFFSET_PER_KEY;
|
||||
double SAMPLE_EXPIRATION_TIME;
|
||||
double SAMPLE_POLL_TIME;
|
||||
|
|
|
@ -812,8 +812,9 @@ ACTOR Future<Void> removeStorageServer( Database cx, UID serverID, MoveKeysLock
|
|||
state Future<Standalone<RangeResultRef>> fTags = tr.getRange( serverTagKeys, CLIENT_KNOBS->TOO_MANY );
|
||||
state Future<Standalone<RangeResultRef>> fHistoryTags = tr.getRange( serverTagHistoryKeys, CLIENT_KNOBS->TOO_MANY );
|
||||
state Future<Standalone<RangeResultRef>> fTagLocalities = tr.getRange( tagLocalityListKeys, CLIENT_KNOBS->TOO_MANY );
|
||||
state Future<Standalone<RangeResultRef>> fTLogDatacenters = tr.getRange( tLogDatacentersKeys, CLIENT_KNOBS->TOO_MANY );
|
||||
|
||||
wait( success(fListKey) && success(fTags) && success(fHistoryTags) && success(fTagLocalities) );
|
||||
wait( success(fListKey) && success(fTags) && success(fHistoryTags) && success(fTagLocalities) && success(fTLogDatacenters) );
|
||||
|
||||
if (!fListKey.get().present()) {
|
||||
if (retry) {
|
||||
|
@ -839,6 +840,14 @@ ACTOR Future<Void> removeStorageServer( Database cx, UID serverID, MoveKeysLock
|
|||
allLocalities.insert(t.locality);
|
||||
}
|
||||
|
||||
std::map<Optional<Value>,int8_t> dcId_locality;
|
||||
for(auto& kv : fTagLocalities.get()) {
|
||||
dcId_locality[decodeTagLocalityListKey(kv.key)] = decodeTagLocalityListValue(kv.value);
|
||||
}
|
||||
for(auto& it : fTLogDatacenters.get()) {
|
||||
allLocalities.insert(dcId_locality[decodeTLogDatacentersKey(it.key)]);
|
||||
}
|
||||
|
||||
if(locality >= 0 && !allLocalities.count(locality) ) {
|
||||
for(auto& it : fTagLocalities.get()) {
|
||||
if( locality == decodeTagLocalityListValue(it.value) ) {
|
||||
|
|
|
@ -44,7 +44,7 @@ struct ProxyRequestsInfo {
|
|||
namespace{
|
||||
struct Resolver : ReferenceCounted<Resolver> {
|
||||
Resolver( UID dbgid, int proxyCount, int resolverCount )
|
||||
: dbgid(dbgid), proxyCount(proxyCount), resolverCount(resolverCount), version(-1), conflictSet( newConflictSet() ), iopsSample( SERVER_KNOBS->IOPS_UNITS_PER_SAMPLE ), debugMinRecentStateVersion(0)
|
||||
: dbgid(dbgid), proxyCount(proxyCount), resolverCount(resolverCount), version(-1), conflictSet( newConflictSet() ), iopsSample( SERVER_KNOBS->KEY_BYTES_PER_SAMPLE ), debugMinRecentStateVersion(0)
|
||||
{
|
||||
}
|
||||
~Resolver() {
|
||||
|
|
|
@ -417,7 +417,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
return minVersionWhenReady( waitForAll(quorumResults), allReplies);
|
||||
}
|
||||
|
||||
Reference<IPeekCursor> peekAll( UID dbgid, Version begin, Version end, Tag tag, bool parallelGetMore, bool throwIfDead ) {
|
||||
Reference<IPeekCursor> peekAll( UID dbgid, Version begin, Version end, Tag tag, bool parallelGetMore ) {
|
||||
int bestSet = 0;
|
||||
std::vector<Reference<LogSet>> localSets;
|
||||
Version lastBegin = 0;
|
||||
|
@ -458,12 +458,8 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
break;
|
||||
}
|
||||
TraceEvent("TLogPeekAllDead", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("End", end).detail("LastBegin", lastBegin).detail("OldLogDataSize", oldLogData.size());
|
||||
if(throwIfDead) {
|
||||
throw worker_removed();
|
||||
} else {
|
||||
return Reference<ILogSystem::ServerPeekCursor>( new ILogSystem::ServerPeekCursor( Reference<AsyncVar<OptionalInterface<TLogInterface>>>(), tag, begin, getPeekEnd(), false, false ) );
|
||||
}
|
||||
}
|
||||
|
||||
int bestOldSet = 0;
|
||||
std::vector<Reference<LogSet>> localOldSets;
|
||||
|
@ -489,12 +485,8 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
i++;
|
||||
continue;
|
||||
}
|
||||
if(throwIfDead) {
|
||||
throw worker_removed();
|
||||
} else {
|
||||
return Reference<ILogSystem::ServerPeekCursor>( new ILogSystem::ServerPeekCursor( Reference<AsyncVar<OptionalInterface<TLogInterface>>>(), tag, begin, getPeekEnd(), false, false ) );
|
||||
}
|
||||
}
|
||||
if(thisSpecial) {
|
||||
foundSpecial = true;
|
||||
}
|
||||
|
@ -587,7 +579,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
if(tag.locality == tagLocalityRemoteLog) {
|
||||
return peekRemote(dbgid, begin, tag, parallelGetMore);
|
||||
} else {
|
||||
return peekAll(dbgid, begin, getPeekEnd(), tag, parallelGetMore, false);
|
||||
return peekAll(dbgid, begin, getPeekEnd(), tag, parallelGetMore);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -630,7 +622,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
}
|
||||
}
|
||||
if(bestSet == -1) {
|
||||
TraceEvent("TLogPeekLocalNoBestSet", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("End", end);
|
||||
TraceEvent("TLogPeekLocalNoBestSet", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("End", end).detail("LogCount", logCount);
|
||||
if(useMergePeekCursors || logCount > 1) {
|
||||
throw worker_removed();
|
||||
} else {
|
||||
|
@ -672,35 +664,35 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
|
||||
int bestOldSet = -1;
|
||||
logCount = 0;
|
||||
bool nextFoundSpecial = false;
|
||||
for(int t = 0; t < oldLogData[i].tLogs.size(); t++) {
|
||||
if(oldLogData[i].tLogs[t]->logServers.size() && oldLogData[i].tLogs[t]->locality != tagLocalitySatellite) {
|
||||
logCount++;
|
||||
}
|
||||
if(oldLogData[i].tLogs[t]->logServers.size() && (oldLogData[i].tLogs[t]->locality == tagLocalitySpecial || oldLogData[i].tLogs[t]->locality == tagLocalityUpgraded || oldLogData[i].tLogs[t]->locality == peekLocality || peekLocality == tagLocalityUpgraded)) {
|
||||
if( oldLogData[i].tLogs[t]->locality == tagLocalitySpecial || oldLogData[i].tLogs[t]->locality == tagLocalityUpgraded ) {
|
||||
foundSpecial = true;
|
||||
nextFoundSpecial = true;
|
||||
}
|
||||
if(foundSpecial && !oldLogData[i].tLogs[t]->isLocal) {
|
||||
TraceEvent("TLogPeekLocalRemoteBeforeSpecial", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("End", end).detail("LastBegin", lastBegin).detail("OldLogDataSize", oldLogData.size()).detail("Idx", i);
|
||||
throw worker_removed();
|
||||
}
|
||||
bestOldSet = t;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if(foundSpecial) {
|
||||
TraceEvent("TLogPeekLocalFoundSpecial", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("End", end);
|
||||
cursors.push_back(peekAll(dbgid, begin, std::min(lastBegin, end), tag, useMergePeekCursors, !useMergePeekCursors));
|
||||
epochEnds.push_back(LogMessageVersion(std::min(lastBegin, end)));
|
||||
break;
|
||||
}
|
||||
|
||||
if(bestOldSet == -1) {
|
||||
if(oldLogData[i].logRouterTags == 0 || logCount > 1) {
|
||||
TraceEvent("TLogPeekLocalNoLogRouterTags", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("End", end).detail("LastBegin", lastBegin).detail("OldLogDataSize", oldLogData.size()).detail("Idx", i).detail("LogRouterTags", oldLogData[i].logRouterTags).detail("LogCount", logCount);
|
||||
TraceEvent("TLogPeekLocalNoBestSet", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("End", end).detail("LastBegin", lastBegin).detail("OldLogDataSize", oldLogData.size()).detail("Idx", i).detail("LogRouterTags", oldLogData[i].logRouterTags).detail("LogCount", logCount).detail("FoundSpecial", foundSpecial);
|
||||
if(oldLogData[i].logRouterTags == 0 || logCount > 1 || foundSpecial) {
|
||||
throw worker_removed();
|
||||
}
|
||||
i++;
|
||||
continue;
|
||||
}
|
||||
|
||||
foundSpecial = nextFoundSpecial;
|
||||
|
||||
Version thisBegin = std::max(oldLogData[i].tLogs[bestOldSet]->startVersion, begin);
|
||||
if(thisBegin < lastBegin) {
|
||||
if(thisBegin < end) {
|
||||
|
@ -723,7 +715,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
Version end = getEnd();
|
||||
TraceEvent("TLogPeekSpecial", dbgid).detail("Begin", begin).detail("End", end).detail("LocalEnd", localEnd).detail("PeekLocality", peekLocality);
|
||||
if(localEnd == invalidVersion || localEnd <= begin) {
|
||||
return peekAll(dbgid, begin, end, tag, true, false);
|
||||
return peekAll(dbgid, begin, end, tag, true);
|
||||
}
|
||||
|
||||
try {
|
||||
|
@ -736,13 +728,13 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
|
||||
cursors.resize(2);
|
||||
cursors[1] = peekLocal(dbgid, tag, begin, localEnd, true, peekLocality);
|
||||
cursors[0] = peekAll(dbgid, localEnd, end, tag, true, false);
|
||||
cursors[0] = peekAll(dbgid, localEnd, end, tag, true);
|
||||
epochEnds.push_back(LogMessageVersion(localEnd));
|
||||
|
||||
return Reference<ILogSystem::MultiCursor>( new ILogSystem::MultiCursor(cursors, epochEnds) );
|
||||
} catch( Error& e ) {
|
||||
if(e.code() == error_code_worker_removed) {
|
||||
return peekAll(dbgid, begin, end, tag, true, false);
|
||||
return peekAll(dbgid, begin, end, tag, true);
|
||||
}
|
||||
throw;
|
||||
}
|
||||
|
|
|
@ -783,6 +783,8 @@ ACTOR Future<Void> recoverFrom( Reference<MasterData> self, Reference<ILogSystem
|
|||
// Ordinarily we pass through this loop once and recover. We go around the loop if recovery stalls for more than a second,
|
||||
// a provisional master is initialized, and an "emergency transaction" is submitted that might change the configuration so that we can
|
||||
// finish recovery.
|
||||
|
||||
state std::map<Optional<Value>,int8_t> originalLocalityMap = self->dcId_locality;
|
||||
state Future<vector<Standalone<CommitTransactionRef>>> recruitments = recruitEverything( self, seedServers, oldLogSystem );
|
||||
loop {
|
||||
state Future<Standalone<CommitTransactionRef>> provisional = provisionalMaster(self, delay(1.0));
|
||||
|
@ -814,6 +816,7 @@ ACTOR Future<Void> recoverFrom( Reference<MasterData> self, Reference<ILogSystem
|
|||
}
|
||||
|
||||
if(self->configuration != oldConf) { //confChange does not trigger when including servers
|
||||
self->dcId_locality = originalLocalityMap;
|
||||
recruitments = recruitEverything( self, seedServers, oldLogSystem );
|
||||
}
|
||||
}
|
||||
|
@ -1263,6 +1266,14 @@ ACTOR Future<Void> masterCore( Reference<MasterData> self ) {
|
|||
tr.set(recoveryCommitRequest.arena, logsKey, self->logSystem->getLogsValue());
|
||||
tr.set(recoveryCommitRequest.arena, primaryDatacenterKey, self->myInterface.locality.dcId().present() ? self->myInterface.locality.dcId().get() : StringRef());
|
||||
|
||||
tr.clear(recoveryCommitRequest.arena, tLogDatacentersKeys);
|
||||
for(auto& dc : self->primaryDcId) {
|
||||
tr.set(recoveryCommitRequest.arena, tLogDatacentersKeyFor(dc), StringRef());
|
||||
}
|
||||
for(auto& dc : self->remoteDcIds) {
|
||||
tr.set(recoveryCommitRequest.arena, tLogDatacentersKeyFor(dc), StringRef());
|
||||
}
|
||||
|
||||
applyMetadataMutations(self->dbgid, recoveryCommitRequest.arena, tr.mutations.slice(mmApplied, tr.mutations.size()), self->txnStateStore, NULL, NULL);
|
||||
mmApplied = tr.mutations.size();
|
||||
|
||||
|
|
|
@ -29,13 +29,16 @@
|
|||
static const char* storeTypes[] = { "ssd", "ssd-1", "ssd-2", "memory" };
|
||||
static const char* redundancies[] = { "single", "double", "triple" };
|
||||
|
||||
std::string generateRegions(int& regions) {
|
||||
std::string generateRegions() {
|
||||
std::string result;
|
||||
if(g_simulator.physicalDatacenters == 1 || (g_simulator.physicalDatacenters == 2 && g_random->random01() < 0.25) || g_simulator.physicalDatacenters == 3) {
|
||||
regions = 1;
|
||||
return " usable_regions=1 regions=\"\"";
|
||||
}
|
||||
|
||||
if(g_random->random01() < 0.25) {
|
||||
return format(" usable_regions=%d", g_random->randomInt(1,3));
|
||||
}
|
||||
|
||||
int primaryPriority = 1;
|
||||
int remotePriority = -1;
|
||||
double priorityType = g_random->random01();
|
||||
|
@ -180,16 +183,9 @@ std::string generateRegions(int& regions) {
|
|||
|
||||
if(g_random->random01() < 0.8) {
|
||||
regionArr.push_back(remoteObj);
|
||||
if(g_random->random01() < 0.8) {
|
||||
regions = 2;
|
||||
result += " usable_regions=2";
|
||||
} else {
|
||||
regions = 1;
|
||||
result += " usable_regions=1";
|
||||
if(g_random->random01() < 0.25) {
|
||||
result += format(" usable_regions=%d", g_random->randomInt(1,3));
|
||||
}
|
||||
} else {
|
||||
regions = 1;
|
||||
result += " usable_regions=1";
|
||||
}
|
||||
|
||||
result += " regions=" + json_spirit::write_string(json_spirit::mValue(regionArr), json_spirit::Output_options::none);
|
||||
|
@ -320,15 +316,14 @@ struct ConfigureDatabaseWorkload : TestWorkload {
|
|||
if(g_simulator.physicalDatacenters == 2 || g_simulator.physicalDatacenters > 3) {
|
||||
maxRedundancies--; //There are not enough machines for triple replication in fearless configurations
|
||||
}
|
||||
int redundancy = g_random->randomInt( 0, maxRedundancies);
|
||||
int redundancy = g_random->randomInt(0, maxRedundancies);
|
||||
std::string config = redundancies[redundancy];
|
||||
|
||||
if(config == "triple" && g_simulator.physicalDatacenters == 3) {
|
||||
config = "three_data_hall ";
|
||||
}
|
||||
|
||||
state int regions = 1;
|
||||
config += generateRegions(regions);
|
||||
config += generateRegions();
|
||||
|
||||
if (g_random->random01() < 0.5) config += " logs=" + format("%d", randomRoleNumber());
|
||||
if (g_random->random01() < 0.5) config += " proxies=" + format("%d", randomRoleNumber());
|
||||
|
|
|
@ -374,7 +374,10 @@ void FastAllocator<Size>::getMagazine() {
|
|||
ASSERT( block == desiredBlock );
|
||||
#endif
|
||||
#else
|
||||
block = (void **)::allocate(magazine_size * Size, true);
|
||||
// FIXME: We should be able to allocate larger magazine sizes here if we
|
||||
// detect that the underlying system supports hugepages. Using hugepages
|
||||
// with smaller-than-2MiB magazine sizes strands memory. See issue #909.
|
||||
block = (void **)::allocate(magazine_size * Size, false);
|
||||
#endif
|
||||
|
||||
//void** block = new void*[ magazine_size * PSize ];
|
||||
|
|
|
@ -475,6 +475,9 @@ inline static void flushOutputStreams() { fflush(NULL); }
|
|||
|
||||
#define crashAndDie() (*(volatile int*)0 = 0)
|
||||
|
||||
#ifdef _WIN32
|
||||
#define strcasecmp stricmp
|
||||
#endif
|
||||
|
||||
#if defined(__GNUG__)
|
||||
#define DEFAULT_CONSTRUCTORS(X) \
|
||||
|
|
Loading…
Reference in New Issue