Merge pull request #3518 from etschannen/master
Merge release 6.3 into master
This commit is contained in:
commit
7b29f7eff4
|
@ -69,10 +69,6 @@ struct VersionedMessage {
|
|||
}
|
||||
};
|
||||
|
||||
static bool sameArena(const Arena& a, const Arena& b) {
|
||||
return a.impl.getPtr() == b.impl.getPtr();
|
||||
}
|
||||
|
||||
struct BackupData {
|
||||
const UID myId;
|
||||
const Tag tag; // LogRouter tag for this worker, i.e., (-2, i)
|
||||
|
@ -340,11 +336,10 @@ struct BackupData {
|
|||
for (int i = 0; i < num; i++) {
|
||||
const Arena& a = messages[i].arena;
|
||||
const Arena& b = messages[i + 1].arena;
|
||||
if (!sameArena(a, b)) {
|
||||
if (!a.sameArena(b)) {
|
||||
bytes += messages[i].bytes;
|
||||
TraceEvent(SevDebugMemory, "BackupWorkerMemory", myId)
|
||||
.detail("Release", messages[i].bytes)
|
||||
.detail("Arena", (void*)a.impl.getPtr());
|
||||
.detail("Release", messages[i].bytes);
|
||||
}
|
||||
}
|
||||
lock->release(bytes);
|
||||
|
@ -904,10 +899,9 @@ ACTOR Future<Void> pullAsyncData(BackupData* self) {
|
|||
// Note we aggressively peek (uncommitted) messages, but only committed
|
||||
// messages/mutations will be flushed to disk/blob in uploadData().
|
||||
while (r->hasMessage()) {
|
||||
if (!sameArena(prev, r->arena())) {
|
||||
if (!prev.sameArena(r->arena())) {
|
||||
TraceEvent(SevDebugMemory, "BackupWorkerMemory", self->myId)
|
||||
.detail("Take", r->arena().getSize())
|
||||
.detail("Arena", (void*)r->arena().impl.getPtr())
|
||||
.detail("Current", self->lock->activePermits());
|
||||
|
||||
wait(self->lock->take(TaskPriority::DefaultYield, r->arena().getSize()));
|
||||
|
|
|
@ -943,6 +943,7 @@ ACTOR Future<Void> dataDistributionRelocator( DDQueueData *self, RelocateData rd
|
|||
allHealthy = true;
|
||||
anyWithSource = false;
|
||||
bestTeams.clear();
|
||||
// Get team from teamCollections in diffrent DCs and find the best one
|
||||
while( tciIndex < self->teamCollections.size() ) {
|
||||
double inflightPenalty = SERVER_KNOBS->INFLIGHT_PENALTY_HEALTHY;
|
||||
if(rd.healthPriority == SERVER_KNOBS->PRIORITY_TEAM_UNHEALTHY || rd.healthPriority == SERVER_KNOBS->PRIORITY_TEAM_2_LEFT) inflightPenalty = SERVER_KNOBS->INFLIGHT_PENALTY_UNHEALTHY;
|
||||
|
@ -951,6 +952,9 @@ ACTOR Future<Void> dataDistributionRelocator( DDQueueData *self, RelocateData rd
|
|||
auto req = GetTeamRequest(rd.wantsNewServers, rd.priority == SERVER_KNOBS->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM, true, false, inflightPenalty);
|
||||
req.src = rd.src;
|
||||
req.completeSources = rd.completeSources;
|
||||
// bestTeam.second = false if the bestTeam in the teamCollection (in the DC) does not have any
|
||||
// server that hosts the relocateData. This is possible, for example, in a fearless configuration
|
||||
// when the remote DC is just brought up.
|
||||
std::pair<Optional<Reference<IDataDistributionTeam>>,bool> bestTeam = wait(brokenPromiseToNever(self->teamCollections[tciIndex].getTeam.getReply(req)));
|
||||
// If a DC has no healthy team, we stop checking the other DCs until
|
||||
// the unhealthy DC is healthy again or is excluded.
|
||||
|
@ -967,7 +971,7 @@ ACTOR Future<Void> dataDistributionRelocator( DDQueueData *self, RelocateData rd
|
|||
if(bestTeam.second) {
|
||||
anyWithSource = true;
|
||||
}
|
||||
|
||||
|
||||
bestTeams.push_back(std::make_pair(bestTeam.first.get(), bestTeam.second));
|
||||
tciIndex++;
|
||||
}
|
||||
|
@ -995,8 +999,10 @@ ACTOR Future<Void> dataDistributionRelocator( DDQueueData *self, RelocateData rd
|
|||
destinationTeams.push_back(ShardsAffectedByTeamFailure::Team(serverIds, i == 0));
|
||||
|
||||
if (allHealthy && anyWithSource && !bestTeams[i].second) {
|
||||
// When all teams in bestTeams[i] do not hold the shard
|
||||
// We randomly choose a server in bestTeams[i] as the shard's destination and
|
||||
// When all servers in bestTeams[i] do not hold the shard (!bestTeams[i].second), it indicates
|
||||
// the bestTeams[i] is in a new DC where data has not been replicated to.
|
||||
// To move data (specified in RelocateShard) to bestTeams[i] in the new DC AND reduce data movement
|
||||
// across DC, we randomly choose a server in bestTeams[i] as the shard's destination, and
|
||||
// move the shard to the randomly chosen server (in the remote DC), which will later
|
||||
// propogate its data to the servers in the same team. This saves data movement bandwidth across DC
|
||||
int idx = deterministicRandom()->randomInt(0, serverIds.size());
|
||||
|
|
|
@ -114,7 +114,11 @@ public:
|
|||
friend void* operator new ( size_t size, Arena& p );
|
||||
friend void* operator new[] ( size_t size, Arena& p );
|
||||
|
||||
//private:
|
||||
bool sameArena(const Arena& other) const {
|
||||
return impl.getPtr() == other.impl.getPtr();
|
||||
}
|
||||
|
||||
private:
|
||||
Reference<struct ArenaBlock> impl;
|
||||
};
|
||||
|
||||
|
|
|
@ -32,7 +32,7 @@
|
|||
|
||||
<Wix xmlns='http://schemas.microsoft.com/wix/2006/wi'>
|
||||
<Product Name='$(var.Title)'
|
||||
Id='{0EFA1E57-0081-4CB5-8502-F0779A0C59F5}'
|
||||
Id='{C2791390-0993-4F6B-9708-ED2A4558A013}'
|
||||
UpgradeCode='{A95EA002-686E-4164-8356-C715B7F8B1C8}'
|
||||
Version='$(var.Version)'
|
||||
Manufacturer='$(var.Manufacturer)'
|
||||
|
|
Loading…
Reference in New Issue