Only ensure a quorum of TLogs are unlocked to confirm the epoch hasn't ended.

Currently, GRV waits to hear back from (almost) all TLogs to confirm that
they're unlocked and that the current epoch hasn't ended.  This guards
against a newer set of proxies having been recruited: if one exists, then
using the commit version from the old set of proxies would violate causal
consistency.
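As a rough sketch of the old condition in plain C++ (names here are
illustrative, not the real Flow code, which expresses this as quorum() over
the confirmRunning replies):

    #include <cstddef>

    // Hypothetical stand-in for quorum( alive, alive.size() - tLogWriteAntiQuorum ):
    // with N TLogs and write anti-quorum A, GRV only proceeds once N - A of
    // them have replied, no matter which TLogs those replies came from.
    bool oldEpochLiveCheck(std::size_t repliesReceived, std::size_t numTLogs,
                           std::size_t antiQuorum) {
        return repliesReceived >= numTLogs - antiQuorum;
    }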

However, during recovery, we ensure that no write quorum of the old TLogs
remains unlocked before starting a new epoch and allowing new commits on the
new TLogs.  Thus, we only need to wait until a quorum of TLogs confirm that
they are unlocked.
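A minimal sketch of the new condition, assuming a simple "replies from
replicationFactor distinct zones" rule standing in for FDB's locality-aware
tLogPolicy (names are illustrative):

    #include <cstddef>
    #include <set>
    #include <string>
    #include <vector>

    // Hypothetical: treat the repliers as a write quorum once they cover
    // enough distinct zones.  The real code validates the replied localities
    // against tLogPolicy, plus validateAllCombinations when a nonzero write
    // anti-quorum is configured.
    bool newEpochLiveCheck(const std::vector<std::string>& unlockedZones,
                           std::size_t replicationFactor) {
        std::set<std::string> zones(unlockedZones.begin(), unlockedZones.end());
        return zones.size() >= replicationFactor;
    }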

This should be a significant improvement in latency, particularly once we
start running >10 TLogs.
Alex Miller 2017-08-28 13:46:14 -07:00
parent 4c1d61cd08
commit f8486d1368
1 changed file with 50 additions and 8 deletions


@@ -329,15 +329,57 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogSystem> {
         }
     }
 
-    virtual Future<Void> confirmEpochLive(Optional<UID> debugID) {
-        // Returns success after confirming that pushes in the current epoch are still possible
-        // FIXME: This is way too conservative?
-        vector<Future<Void>> alive;
-        for(auto& t : logServers) {
-            if( t->get().present() ) alive.push_back( brokenPromiseToNever( t->get().interf().confirmRunning.getReply(TLogConfirmRunningRequest(debugID), TaskTLogConfirmRunningReply ) ) );
-            else alive.push_back( Never() );
+    ACTOR static Future<Void> confirmEpochLive_internal(TagPartitionedLogSystem* self, Optional<UID> debugID) {
+        state vector<Future<Void>> alive;
+        for(auto& t : self->logServers) {
+            if( t->get().present() ) {
+                alive.push_back( brokenPromiseToNever(
+                    t->get().interf().confirmRunning.getReply( TLogConfirmRunningRequest(debugID),
+                        TaskTLogConfirmRunningReply ) ) );
+            } else {
+                alive.push_back( Never() );
+            }
         }
-        return quorum( alive, alive.size() - tLogWriteAntiQuorum );
+
+        loop {
+            LocalityGroup locked;
+            std::vector<LocalityData> unlocked, unused;
+            for (int i = 0; i < alive.size(); i++) {
+                if (alive[i].isReady() && !alive[i].isError()) {
+                    locked.add(self->tLogLocalities[i]);
+                } else {
+                    unlocked.push_back(self->tLogLocalities[i]);
+                }
+            }
+            bool quorum_obtained = locked.validate(self->tLogPolicy);
+            if (!quorum_obtained && self->tLogWriteAntiQuorum != 0) {
+                quorum_obtained = !validateAllCombinations(unused, locked, self->tLogPolicy, unlocked, self->tLogWriteAntiQuorum, false);
+            }
+            if (self->tLogReplicationFactor - self->tLogWriteAntiQuorum == 1 && locked.size() > 0) {
+                ASSERT(quorum_obtained);
+            }
+            if (quorum_obtained) {
+                return Void();
+            }
+            // The current set of responders that we have weren't enough to form a quorum, so we must
+            // wait for more responses and try again.
+            std::vector<Future<Void>> changes;
+            for (int i = 0; i < alive.size(); i++) {
+                if (!alive[i].isReady()) {
+                    changes.push_back( ready(alive[i]) );
+                } else {
+                    changes.push_back( self->logServers[i]->onChange() );
+                }
+            }
+            ASSERT(changes.size() != 0);
+            Void _ = wait( waitForAny(changes) );
+        }
+    }
+
+    // Returns success after confirming that pushes in the current epoch are still possible.
+    virtual Future<Void> confirmEpochLive(Optional<UID> debugID) {
+        return confirmEpochLive_internal(this, debugID);
     }
 
     virtual Future<Reference<ILogSystem>> newEpoch( vector<WorkerInterface> availableLogServers, DatabaseConfiguration const& config, LogEpoch recoveryCount ) {
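The shape of confirmEpochLive_internal is worth noting: each pass of the
loop re-partitions the TLogs into those whose confirmRunning reply is in
hand (accumulated into the `locked` LocalityGroup) and those still
outstanding, then asks whether the responders already satisfy tLogPolicy,
falling back to validateAllCombinations when a nonzero write anti-quorum is
configured.  If no quorum has formed yet, rather than polling, it blocks on
waitForAny() of either a pending reply becoming ready or a TLog interface
changing, and then re-evaluates.  The ASSERT covers the degenerate
tLogReplicationFactor - tLogWriteAntiQuorum == 1 case, where any single
successful reply must already constitute a quorum.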