Merge pull request #147 from cie/alexmiller/grvtlogs

Only verify a quorum of TLogs are unlocked for a GRV request
This commit is contained in:
Alex Miller 2017-09-13 16:07:25 -07:00 committed by GitHub Enterprise
commit 5e14f19875
1 changed files with 237 additions and 21 deletions

View File

@ -329,15 +329,59 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
virtual Future<Void> confirmEpochLive(Optional<UID> debugID) {
// Returns success after confirming that pushes in the current epoch are still possible
// FIXME: This is way too conservative?
vector<Future<Void>> alive;
for(auto& t : logServers) {
if( t->get().present() ) alive.push_back( brokenPromiseToNever( t->get().interf().confirmRunning.getReply(TLogConfirmRunningRequest(debugID), TaskTLogConfirmRunningReply ) ) );
else alive.push_back( Never() );
ACTOR static Future<Void> confirmEpochLive_internal(TagPartitionedLogSystem* self, Optional<UID> debugID) {
state vector<Future<Void>> alive;
for(auto& t : self->logServers) {
if( t->get().present() ) {
alive.push_back( brokenPromiseToNever(
t->get().interf().confirmRunning.getReply( TLogConfirmRunningRequest(debugID),
TaskTLogConfirmRunningReply ) ) );
} else {
alive.push_back( Never() );
return quorum( alive, alive.size() - tLogWriteAntiQuorum );
loop {
LocalityGroup locked;
std::vector<LocalityData> unlocked, unused;
for (int i = 0; i < alive.size(); i++) {
if (alive[i].isReady() && !alive[i].isError()) {
} else {
bool quorum_obtained = locked.validate(self->tLogPolicy);
if (!quorum_obtained && self->tLogWriteAntiQuorum != 0) {
quorum_obtained = !validateAllCombinations(unused, locked, self->tLogPolicy, unlocked, self->tLogWriteAntiQuorum, false);
if (self->tLogReplicationFactor - self->tLogWriteAntiQuorum == 1 && locked.size() > 0) {
if (quorum_obtained) {
return Void();
// The current set of responders wasn't enough to form a quorum, so we must
// wait for more responses and try again.
std::vector<Future<Void>> changes;
for (int i = 0; i < alive.size(); i++) {
if (!alive[i].isReady()) {
changes.push_back( ready(alive[i]) );
} else if (alive[i].isReady() && alive[i].isError() &&
alive[i].getError().code() == error_code_tlog_stopped) {
// All commits must go to all TLogs. If any TLog is stopped, then our epoch has ended.
return Never();
ASSERT(changes.size() != 0);
Void _ = wait( waitForAny(changes) );
// Returns success after confirming that pushes in the current epoch are still possible.
virtual Future<Void> confirmEpochLive(Optional<UID> debugID) {
return confirmEpochLive_internal(this, debugID);
virtual Future<Reference<ILogSystem>> newEpoch( vector<WorkerInterface> availableLogServers, DatabaseConfiguration const& config, LogEpoch recoveryCount ) {
@ -522,7 +566,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
// Creates a new logSystem representing the (now frozen) epoch
// No other important side effects.
// The writeQuorum in the master info is from the previous configuration
state vector<Future<TLogLockResult>> tLogReply;
state vector<Future<TLogLockResult>> tLogReply(prevState.tLogs.size());
if (!prevState.tLogs.size()) {
// This is a brand new database
@ -545,7 +589,8 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
// To ensure consistent recovery, the number of servers NOT in the write quorum plus the number of servers NOT in the read quorum
// have to be strictly less than the replication factor. Otherwise there could be a replica set consisting entirely of servers that
// are out of date due to not being in the write quorum or unavailable due to not being in the read quorum.
// So (N - W) + (N - R) < F, and optimally (N-W)+(N-R)=F-1. Thus R=2N+1-F-W.
// So with N = # of tlogs, W = antiquorum, R = required count, F = replication factor,
// W + (N - R) < F, and optimally W+(N-R)=F-1. Thus R=N+1-F+W.
state int requiredCount = (int)prevState.tLogs.size()+1 - prevState.tLogReplicationFactor + prevState.tLogWriteAntiQuorum;
ASSERT( requiredCount > 0 && requiredCount <= prevState.tLogs.size() );
ASSERT( prevState.tLogReplicationFactor >= 1 && prevState.tLogReplicationFactor <= prevState.tLogs.size() );
@ -579,8 +624,12 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
state Future<Void> rejoins = trackRejoins( dbgid, allLogServers, rejoinRequests );
for(int t=0; t<logServers.size(); t++)
tLogReply.push_back( lockTLog( dbgid, logServers[t]) );
state bool buggify_lock_minimal_tlogs = BUGGIFY;
if (!buggify_lock_minimal_tlogs) {
for(int t=0; t<logServers.size(); t++) {
tLogReply[t] = lockTLog( dbgid, logServers[t]);
state Optional<Version> last_end;
@ -588,6 +637,9 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
state int cycles = 0;
loop {
if (buggify_lock_minimal_tlogs) {
lockMinimalTLogSet( dbgid, prevState, logServers, logFailed, &tLogReply );
std::vector<LocalityData> availableItems, badCombo;
std::vector<TLogLockResult> results;
std::string sServerState;
@ -596,7 +648,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
cycles ++;
for(int t=0; t<logServers.size(); t++) {
if (tLogReply[t].isReady() && !tLogReply[t].isError() && !logFailed[t]->get()) {
if (tLogReply[t].isValid() && tLogReply[t].isReady() && !tLogReply[t].isError() && !logFailed[t]->get()) {
sServerState += 'a';
@ -621,7 +673,6 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
(!validateAllCombinations(badCombo, unResponsiveSet, prevState.tLogPolicy, availableItems, prevState.tLogWriteAntiQuorum, false)))
TraceEvent("EpochEndBadCombo", dbgid).detail("Cycles", cycles)
.detail("Required", requiredCount)
.detail("Present", results.size())
.detail("Available", availableItems.size())
.detail("Absent", logServers.size() - results.size())
@ -659,7 +710,6 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
TraceEvent("LogSystemRecovery", dbgid).detail("Cycles", cycles)
.detail("TotalServers", logServers.size())
.detail("Required", requiredCount)
.detail("Present", results.size())
.detail("Available", availableItems.size())
.detail("Absent", logServers.size() - results.size())
@ -702,7 +752,6 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
else {
TraceEvent("LogSystemUnchangedRecovery", dbgid).detail("Cycles", cycles)
.detail("TotalServers", logServers.size())
.detail("Required", requiredCount)
.detail("Present", results.size())
.detail("Available", availableItems.size())
.detail("Absent", logServers.size() - results.size())
@ -725,9 +774,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
else {
TraceEvent("LogSystemWaitingForRecovery", dbgid).detail("Cycles", cycles)
.detail("AvailableServers", results.size())
.detail("RequiredServers", requiredCount)
.detail("TotalServers", logServers.size())
.detail("Required", requiredCount)
.detail("Present", results.size())
.detail("Available", availableItems.size())
.detail("Absent", logServers.size() - results.size())
@ -743,11 +790,11 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
// Wait for anything relevant to change
std::vector<Future<Void>> changes;
for(int i=0; i<logServers.size(); i++) {
if (!tLogReply[i].isReady())
if (tLogReply[i].isValid() && !tLogReply[i].isReady()) {
changes.push_back( ready(tLogReply[i]) );
else {
changes.push_back( logServers[i]->onChange() );
} else {
changes.push_back( logFailed[i]->onChange() );
changes.push_back( logServers[i]->onChange() );
@ -792,6 +839,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
req.recoverAt = oldLogSystem->epochEndVersion.get();
req.knownCommittedVersion = oldLogSystem->knownCommittedVersion;
req.epoch = recoveryCount;
TraceEvent("TLogInitializeRequest").detail("address", workers[i].tLog.getEndpoint().address);
logSystem->tLogLocalities.resize( workers.size() );
@ -874,6 +922,174 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
static void lockMinimalTLogSet(const UID& dbgid, const DBCoreState& prevState,
const std::vector<Reference<AsyncVar<OptionalInterface<TLogInterface>>>>& logServers,
const std::vector<Reference<AsyncVar<bool>>>& logFailed,
vector<Future<TLogLockResult>>* tLogReply ) {
// Invariant: tLogReply[i] must correspond to the tlog stored as logServers[i].
ASSERT(tLogReply->size() == prevState.tLogLocalities.size());
ASSERT(logFailed.size() == tLogReply->size());
// For any given index, only one of the following will be true.
auto locking_completed = [&logFailed, tLogReply](int index) {
const auto& entry = tLogReply->at(index);
return entry.isValid() && entry.isReady() && !entry.isError();
auto locking_failed = [&logFailed, tLogReply](int index) {
const auto& entry = tLogReply->at(index);
return (logFailed[index]->get() && !(entry.isValid() && entry.isReady() && !entry.isError())) ||
(entry.isValid() && entry.isReady() && entry.isError());
auto locking_pending = [&logFailed, tLogReply](int index) {
const auto& entry = tLogReply->at(index);
return !logFailed[index]->get() && (entry.isValid() && !entry.isReady());
auto locking_skipped = [&logFailed, tLogReply](int index) {
const auto& entry = tLogReply->at(index);
return !logFailed[index]->get() && !entry.isValid();
auto can_obtain_quorum = [&prevState](std::function<bool(int)> filter) {
LocalityGroup filter_true;
std::vector<LocalityData> filter_false, unused;
for (int i = 0; i < prevState.tLogLocalities.size() ; i++) {
if (filter(i)) {
} else {
bool valid = filter_true.validate(prevState.tLogPolicy);
if (!valid && prevState.tLogWriteAntiQuorum > 0) {
valid = !validateAllCombinations(unused, filter_true, prevState.tLogPolicy, filter_false, prevState.tLogWriteAntiQuorum, false);
return valid;
// Step 1: Check that there aren't already enough locked TLogs.
// This isn't needed for correctness, but prevents us from doing a lot of unnecessary work below.
// TODO: Once toolchain supports C++17, use std::not_fn().
auto not_locking_completed = [&locking_completed](int index) { return !locking_completed(index); };
if (!can_obtain_quorum(not_locking_completed)) {
TraceEvent(SevInfo, "MasterRecoveryTLogLockingAlreadySucceeded", dbgid);
// Step 2: Verify that if all the failed TLogs come back, they can't form a quorum.
if (can_obtain_quorum(locking_failed)) {
TraceEvent(SevInfo, "MasterRecoveryTLogLockingImpossible", dbgid);
// Step 3: It's possible for us to succeed, but we need to lock additional logs.
// First, we need an accurate picture of what TLogs we're capable of locking. We can't tell the
// difference between a temporarily failed TLog and a permanently failed TLog. Thus, we assume
// all failures are permanent, and manually re-issue lock requests if they rejoin.
for (int i = 0; i < logFailed.size(); i++) {
const auto& r = tLogReply->at(i);
TEST(locking_failed(i) && (r.isValid() && !r.isReady())); // A TLog failed with a pending request.
// The reboot_a_tlog BUGGIFY below should cause the above case to be hit.
if (locking_failed(i)) {
tLogReply->at(i) = Future<TLogLockResult>();
// We're trying to partition the set of old tlogs into two sets, L and R, such that:
// (1). R does not validate the policy
// (2). |R| is as large as possible
// (3). L contains all the already-locked TLogs
// and then we only issue lock requests to TLogs in L. This is safe, as R does not have quorum,
// so no commits may occur. It does not matter if L forms a quorum or not.
// We form these sets by starting with L as all machines and R as the empty set, and moving a
// random machine from L to R until (1) or (2) no longer holds as true. Code-wise, L is
// [0..end-can_omit), and R is [end-can_omit..end), and we move a random machine via randomizing
// the order of the tlogs. Choosing a random machine was verified to generate a good-enough
// result to be interesting in tests sufficiently frequently that we don't need to try to
// calculate the exact optimal solution.
std::vector<std::pair<LocalityData, int>> tlogs;
for (int i = 0; i < prevState.tLogLocalities.size(); i++) {
tlogs.emplace_back(prevState.tLogLocalities[i], i);
// Rearrange the array such that the left is logs closer to being locked, and
// the right is logs that can't be locked. This makes us prefer locking already-locked TLogs,
// which is how we respect the decisions made in the previous execution.
auto idx_to_order = [&locking_completed, &locking_failed, &locking_pending, &locking_skipped](int index) {
if (locking_completed(index)) return 0;
if (locking_pending(index)) return 1;
if (locking_skipped(index)) return 2;
if (locking_failed(index)) return 3;
ASSERT(false); // Programmer error.
return -1;
std::sort(tlogs.begin(), tlogs.end(),
// TODO: Change long type to `auto` once toolchain supports C++17.
[&idx_to_order](const std::pair<LocalityData, int>& lhs, const std::pair<LocalityData, int>& rhs) {
return idx_to_order(lhs.second) < idx_to_order(rhs.second);
// Indexes that aren't in the vector are the ones we're considering omitting. Remove indexes until
// the removed set forms a quorum.
int can_omit = 0;
std::vector<int> to_lock_indexes;
for (auto it = tlogs.cbegin() ; it != tlogs.cend() - 1 ; it++ ) {
auto filter = [&to_lock_indexes](int index) {
return std::find(to_lock_indexes.cbegin(), to_lock_indexes.cend(), index) == to_lock_indexes.cend();
while(true) {
if (can_obtain_quorum(filter)) {
} else {
ASSERT(can_omit < tlogs.size());
if (prevState.tLogReplicationFactor - prevState.tLogWriteAntiQuorum == 1) {
ASSERT(can_omit == 0);
// Our previous check of making sure there aren't too many failed logs should have prevented this.
// If we've managed to leave more tlogs unlocked than (RF-AQ), it means we've hit the case
// where the policy engine has allowed us to have multiple logs in the same failure domain
// with independent sets of data. This case validates that no code is relying on the old
// quorum=(RF-AQ) logic, and now goes through the policy engine instead.
TEST(can_omit >= prevState.tLogReplicationFactor - prevState.tLogWriteAntiQuorum); // Locking a subset of the TLogs while ending an epoch.
const bool reboot_a_tlog = g_simulator.enableConnectionFailures && BUGGIFY && g_random->random01() < 0.25;
TraceEvent(SevInfo, "MasterRecoveryTLogLocking", dbgid)
.detail("locks", tlogs.size() - can_omit)
.detail("skipped", can_omit)
.detail("replication", prevState.tLogReplicationFactor)
.detail("antiquorum", prevState.tLogWriteAntiQuorum)
.detail("reboot_buggify", reboot_a_tlog);
for (int i = 0; i < tlogs.size() - can_omit; i++) {
const int index = tlogs[i].second;
Future<TLogLockResult>& entry = tLogReply->at(index);
if (!entry.isValid()) {
entry = lockTLog( dbgid, logServers[index] );
if (reboot_a_tlog) {
for (int i = 0; i < tlogs.size() - can_omit; i++) {
const int index = tlogs[i].second;
if (logServers[index]->get().present()) {
// Intentionally leave `tlogs.size() - can_omit` .. `tlogs.size()` as !isValid() Futures.
template <class T>
static vector<T> getReadyNonError( vector<Future<T>> const& futures ) {
// Return the values of those futures which have (non-error) values ready