Merge pull request #222 from cie/alexmiller/drtimefix2
Fix yet another VersionStamp DR issue.
Commit: 982f0dcb1e
@@ -87,7 +87,7 @@ static inline bool isNonAssociativeOp(MutationRef::Type mutationType) {
 }
 
 struct CommitTransactionRef {
-    CommitTransactionRef() {}
+    CommitTransactionRef() : read_snapshot(0) {}
     CommitTransactionRef(Arena &a, const CommitTransactionRef &from)
       : read_conflict_ranges(a, from.read_conflict_ranges),
         write_conflict_ranges(a, from.write_conflict_ranges),
@@ -1493,33 +1493,31 @@ public:
     state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
     tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
     tr->setOption(FDBTransactionOptions::LOCK_AWARE);
-    state UID logUid;
+    state Key logUid;
     state Value backupUid;
 
     loop {
         try {
             UID _logUid = wait(backupAgent->getLogUid(tr, tagName));
-            logUid = _logUid;
+            logUid = BinaryWriter::toValue(_logUid, Unversioned());
 
-            int status = wait(backupAgent->getStateValue(tr, logUid));
+            int status = wait(backupAgent->getStateValue(tr, _logUid));
             if (!backupAgent->isRunnable((BackupAgentBase::enumState)status)) {
                 throw backup_unneeded();
             }
 
-            Optional<Value> _backupUid = wait(tr->get(backupAgent->states.get(BinaryWriter::toValue(logUid, Unversioned())).pack(DatabaseBackupAgent::keyFolderId)));
+            Optional<Value> _backupUid = wait(tr->get(backupAgent->states.get(logUid).pack(DatabaseBackupAgent::keyFolderId)));
             backupUid = _backupUid.get();
 
             // Clearing the folder id will prevent future tasks from executing
-            tr->clear(backupAgent->config.get(BinaryWriter::toValue(logUid, Unversioned())).range());
+            tr->clear(backupAgent->config.get(logUid).range());
 
             // Clearing the end version of apply mutation cancels ongoing apply work
-            const auto& log_uid = BinaryWriter::toValue(logUid, Unversioned());
-            tr->clear(log_uid.withPrefix(applyMutationsEndRange.begin));
+            tr->clear(logUid.withPrefix(applyMutationsEndRange.begin));
 
-            Key logsPath = uidPrefixKey(applyLogKeys.begin, logUid);
-            tr->clear(KeyRangeRef(logsPath, strinc(logsPath)));
+            tr->clear(prefixRange(logUid.withPrefix(applyLogKeys.begin)));
 
-            tr->set(StringRef(backupAgent->states.get(BinaryWriter::toValue(logUid, Unversioned())).pack(DatabaseBackupAgent::keyStateStatus)), StringRef(DatabaseBackupAgent::getStateText(BackupAgentBase::STATE_PARTIALLY_ABORTED)));
+            tr->set(StringRef(backupAgent->states.get(logUid).pack(DatabaseBackupAgent::keyStateStatus)), StringRef(DatabaseBackupAgent::getStateText(BackupAgentBase::STATE_PARTIALLY_ABORTED)));
 
             Void _ = wait(tr->commit());
             TraceEvent("DBA_Abort").detail("commitVersion", tr->getCommittedVersion());
@@ -1534,10 +1532,17 @@ public:
     loop {
         tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
         tr->setOption(FDBTransactionOptions::LOCK_AWARE);
+        // dumpData's commits are unstoppable, and we need to make sure that no dumpData commits
+        // happen after this transaction, as it would mean that the applyMutationsBeginRange read we
+        // do isn't the final value, and thus a greater version of commits could have been applied.
+        // Thus, we need to commit it against the same proxy that all dumpData transactions were
+        // submitted to. The transaction above will stop any further dumpData calls from adding
+        // transactions to the proxy's commit promise stream, so our commit will come after all
+        // dumpData transactions.
+        tr->setOption(FDBTransactionOptions::COMMIT_ON_FIRST_PROXY);
         try {
-            const auto& log_uid = BinaryWriter::toValue(logUid, Unversioned());
             // Ensure that we're at a version higher than the data that we've written.
-            Optional<Value> lastApplied = wait(tr->get(log_uid.withPrefix(applyMutationsBeginRange.begin)));
+            Optional<Value> lastApplied = wait(tr->get(logUid.withPrefix(applyMutationsBeginRange.begin)));
             if (lastApplied.present()) {
                 Version current = tr->getReadVersion().get();
                 Version applied = BinaryReader::fromStringRef<Version>(lastApplied.get(), Unversioned());
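The comment block above carries the core ordering argument: once the abort transaction has cleared the configuration, no new dumpData work can be enqueued, and routing the abort's own commit to the same proxy places it behind every dumpData commit already in that proxy's queue. The following standalone sketch (illustrative only, not FoundationDB code; all names are invented) models that argument with a single FIFO commit queue.

// sketch_commit_ordering.cpp -- illustrative model, not FDB code.
// One proxy = one FIFO stream of commit requests. If the "abort" commit is
// pushed into the SAME queue that every dumpData commit used, it is processed
// after all of them, so the applyMutationsBeginRange value read afterwards is
// final. Pushing it to a different queue gives no such guarantee.
#include <cassert>
#include <queue>
#include <string>
#include <vector>

int main() {
    std::queue<std::string> firstProxyQueue;   // the nominated "first proxy"

    // dumpData-style commits already submitted to the first proxy.
    firstProxyQueue.push("dumpData#1");
    firstProxyQueue.push("dumpData#2");

    // The abort commit is routed to the same queue (COMMIT_ON_FIRST_PROXY).
    firstProxyQueue.push("abort");

    std::vector<std::string> processedOrder;
    while (!firstProxyQueue.empty()) {
        processedOrder.push_back(firstProxyQueue.front());
        firstProxyQueue.pop();
    }

    // FIFO processing guarantees the abort commit lands after all dumpData commits.
    assert(processedOrder.back() == "abort");
    return 0;
}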
@@ -1563,19 +1568,16 @@ public:
         try {
             srcTr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
             srcTr->setOption(FDBTransactionOptions::LOCK_AWARE);
-            Optional<Value> v = wait( srcTr->get( backupAgent->sourceStates.get(BinaryWriter::toValue(logUid, Unversioned())).pack(DatabaseBackupAgent::keyFolderId) ) );
+            Optional<Value> v = wait( srcTr->get( backupAgent->sourceStates.get(logUid).pack(DatabaseBackupAgent::keyFolderId) ) );
 
             if(v.present() && BinaryReader::fromStringRef<Version>(v.get(), Unversioned()) > BinaryReader::fromStringRef<Version>(backupUid, Unversioned()))
                 break;
 
             srcTr->set( backupAgent->sourceStates.pack(DatabaseBackupAgent::keyStateStatus), StringRef(BackupAgentBase::getStateText(BackupAgentBase::STATE_ABORTED) ));
-            srcTr->set( backupAgent->sourceStates.get(BinaryWriter::toValue(logUid, Unversioned())).pack(DatabaseBackupAgent::keyFolderId), backupUid );
+            srcTr->set( backupAgent->sourceStates.get(logUid).pack(DatabaseBackupAgent::keyFolderId), backupUid );
 
-            Key logsPath = uidPrefixKey(backupLogKeys.begin, logUid);
-            Key configPath = uidPrefixKey(logRangesRange.begin, logUid);
-
-            srcTr->clear(KeyRangeRef(logsPath, strinc(logsPath)));
-            srcTr->clear(KeyRangeRef(configPath, strinc(configPath)));
+            srcTr->clear(prefixRange(logUid.withPrefix(backupLogKeys.begin)));
+            srcTr->clear(prefixRange(logUid.withPrefix(logRangesRange.begin)));
 
             Void _ = wait(srcTr->commit());
             break;
@@ -1591,12 +1593,12 @@ public:
         tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
         tr->setOption(FDBTransactionOptions::LOCK_AWARE);
 
-        Optional<Value> v = wait(tr->get(StringRef(backupAgent->config.get(BinaryWriter::toValue(logUid, Unversioned())).pack(DatabaseBackupAgent::keyFolderId))));
+        Optional<Value> v = wait(tr->get(StringRef(backupAgent->config.get(logUid).pack(DatabaseBackupAgent::keyFolderId))));
         if(v.present()) {
             return Void();
         }
 
-        tr->set(StringRef(backupAgent->states.get(BinaryWriter::toValue(logUid, Unversioned())).pack(DatabaseBackupAgent::keyStateStatus)), StringRef(DatabaseBackupAgent::getStateText(BackupAgentBase::STATE_ABORTED)));
+        tr->set(StringRef(backupAgent->states.get(logUid).pack(DatabaseBackupAgent::keyStateStatus)), StringRef(DatabaseBackupAgent::getStateText(BackupAgentBase::STATE_ABORTED)));
 
         Void _ = wait(tr->commit());
 
@@ -244,6 +244,13 @@ inline KeyRangeRef singleKeyRange( KeyRef const& key, Arena& arena ) {
     t[key.size()] = 0;
     return KeyRangeRef( KeyRef(t,key.size()), KeyRef(t, key.size()+1) );
 }
+inline KeyRange prefixRange( KeyRef prefix ) {
+    Standalone<KeyRangeRef> range;
+    KeyRef start = KeyRef(range.arena(), prefix);
+    KeyRef end = strinc(prefix, range.arena());
+    range.contents() = KeyRangeRef(start, end);
+    return range;
+}
 inline KeyRef keyBetween( const KeyRangeRef& keys ) {
     // Returns (one of) the shortest key(s) either contained in keys or equal to keys.end,
     // assuming its length is no more than CLIENT_KNOBS->SPLIT_KEY_SIZE_LIMIT. If the length of
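prefixRange(), now defined once here, replaces the three file-local copies that are removed later in this diff. A minimal standalone model of what it computes is sketched below, using std::string instead of Arena-backed KeyRef/KeyRange and a simplified strinc that strips trailing 0xFF bytes; the helper names are invented for illustration.

// sketch_prefix_range.cpp -- illustrative model, not FDB code.
#include <cassert>
#include <stdexcept>
#include <string>
#include <utility>

// First string that sorts strictly after every string starting with `prefix`
// (a simplified strinc: drop trailing 0xFF bytes, then bump the last byte).
std::string strincSketch(std::string prefix) {
    while (!prefix.empty() && static_cast<unsigned char>(prefix.back()) == 0xFF)
        prefix.pop_back();
    if (prefix.empty())
        throw std::invalid_argument("prefix has no strinc");
    prefix.back() = static_cast<char>(static_cast<unsigned char>(prefix.back()) + 1);
    return prefix;
}

// Half-open range [begin, end) covering exactly the keys that start with `prefix`.
std::pair<std::string, std::string> prefixRangeSketch(const std::string& prefix) {
    return { prefix, strincSketch(prefix) };
}

int main() {
    auto r = prefixRangeSketch("apple/");
    assert(r.first == "apple/" && r.second == "apple0");  // '/' + 1 == '0'

    // Any key carrying the prefix falls inside the range; the range end does not.
    assert(std::string("apple/xyz") >= r.first && std::string("apple/xyz") < r.second);
    return 0;
}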
@@ -2304,13 +2304,12 @@ void Transaction::setupWatches() {
     }
 }
 
-ACTOR static Future<Void> tryCommit( Database cx, Reference<TransactionLogInfo> trLogInfo, CommitTransactionRequest req, Future<Version> readVersion, TransactionInfo info, Version* pCommittedVersion, Transaction* tr, bool causalWriteRisky ) {
+ACTOR static Future<Void> tryCommit( Database cx, Reference<TransactionLogInfo> trLogInfo, CommitTransactionRequest req, Future<Version> readVersion, TransactionInfo info, Version* pCommittedVersion, Transaction* tr, TransactionOptions options) {
     state TraceInterval interval( "TransactionCommit" );
     state double startTime;
     if (info.debugID.present())
         TraceEvent(interval.begin()).detail( "Parent", info.debugID.get() );
 
-    req.transaction.read_snapshot = 0;
     try {
         Version v = wait( readVersion );
         req.transaction.read_snapshot = v;
@@ -2324,7 +2323,13 @@ ACTOR static Future<Void> tryCommit( Database cx, Reference<TransactionLogInfo>
         }
 
         req.debugID = commitID;
-        state Future<CommitID> reply = loadBalance( cx->getMasterProxies(), &MasterProxyInterface::commit, req, TaskDefaultPromiseEndpoint, true );
+        state Future<CommitID> reply;
+        if (options.commitOnFirstProxy) {
+            const std::vector<MasterProxyInterface>& proxies = cx->clientInfo->get().proxies;
+            reply = proxies.size() ? throwErrorOr ( brokenPromiseToMaybeDelivered ( proxies[0].commit.tryGetReply(req) ) ) : Never();
+        } else {
+            reply = loadBalance( cx->getMasterProxies(), &MasterProxyInterface::commit, req, TaskDefaultPromiseEndpoint, true );
+        }
 
         choose {
             when ( Void _ = wait( cx->onMasterProxiesChanged() ) ) {
@@ -2369,7 +2374,7 @@ ACTOR static Future<Void> tryCommit( Database cx, Reference<TransactionLogInfo>
         if (e.code() == error_code_request_maybe_delivered || e.code() == error_code_commit_unknown_result) {
             // We don't know if the commit happened, and it might even still be in flight.
 
-            if (!causalWriteRisky) {
+            if (!options.causalWriteRisky) {
                 // Make sure it's not still in flight, either by ensuring the master we submitted to is dead, or the version we submitted with is dead, or by committing a conflicting transaction successfully
                 //if ( cx->getMasterProxies()->masterGeneration <= originalMasterGeneration )
 
@@ -2457,7 +2462,7 @@ Future<Void> Transaction::commitMutations() {
 
         tr.isLockAware = options.lockAware;
 
-        Future<Void> commitResult = tryCommit( cx, trLogInfo, tr, readVersion, info, &this->committedVersion, this, options.causalWriteRisky );
+        Future<Void> commitResult = tryCommit( cx, trLogInfo, tr, readVersion, info, &this->committedVersion, this, options );
 
         if (isCheckingWrites) {
             Promise<Void> committed;
@@ -2540,6 +2545,11 @@ void Transaction::setOption( FDBTransactionOptions::Option option, Optional<StringRef> value ) {
             options.causalWriteRisky = true;
             break;
 
+        case FDBTransactionOptions::COMMIT_ON_FIRST_PROXY:
+            validateOptionValue(value, false);
+            options.commitOnFirstProxy = true;
+            break;
+
         case FDBTransactionOptions::CHECK_WRITES_ENABLE:
             validateOptionValue(value, false);
             options.checkWritesEnabled = true;
@@ -2552,7 +2562,7 @@ void Transaction::setOption( FDBTransactionOptions::Option option, Optional<StringRef> value ) {
 
         case FDBTransactionOptions::TRANSACTION_LOGGING_ENABLE:
             validateOptionValue(value, true);
 
             if(value.get().size() > 100) {
                 throw invalid_option_value();
             }
@@ -142,17 +142,24 @@ struct StorageMetrics;
 
 struct TransactionOptions {
     double maxBackoff;
-    uint32_t getReadVersionFlags : 32;
-    uint32_t customTransactionSizeLimit : 32;
+    uint32_t getReadVersionFlags;
+    uint32_t customTransactionSizeLimit;
     bool checkWritesEnabled : 1;
     bool causalWriteRisky : 1;
+    bool commitOnFirstProxy : 1;
     bool debugDump : 1;
     bool lockAware : 1;
     bool readOnly : 1;
 
-    TransactionOptions() { reset(); }
-    void reset() {
-        memset(this, 0, sizeof(*this));
+    TransactionOptions() {
+        reset();
+        if (BUGGIFY) {
+            commitOnFirstProxy = true;
+        }
+    }
+
+    void reset() {
+        memset(this, 0, sizeof(*this));
         maxBackoff = CLIENT_KNOBS->DEFAULT_MAX_BACKOFF;
     }
 };
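For context, the struct above keeps its flags as bitfields and relies on reset()'s memset to zero them all, while the new constructor lets simulation (via BUGGIFY) randomly exercise the rare commitOnFirstProxy path. A minimal standalone imitation of that pattern, with invented names and stand-ins for BUGGIFY and CLIENT_KNOBS, might look like this:

// sketch_options_reset.cpp -- illustrative only, not FDB code.
#include <cstdint>
#include <cstdlib>
#include <cstring>

struct OptionsSketch {
    double maxBackoff;
    uint32_t getReadVersionFlags;
    bool checkWritesEnabled : 1;
    bool causalWriteRisky : 1;
    bool commitOnFirstProxy : 1;

    OptionsSketch() {
        reset();
        // Stand-in for BUGGIFY: occasionally force the rare code path under test.
        if (std::rand() % 4 == 0) {
            commitOnFirstProxy = true;
        }
    }

    void reset() {
        // Mirrors the reset-by-memset pattern above: zeroes every field,
        // including all of the one-bit flags.
        std::memset(this, 0, sizeof(*this));
        maxBackoff = 1.0;  // stand-in for CLIENT_KNOBS->DEFAULT_MAX_BACKOFF
    }
};

int main() {
    OptionsSketch opts;
    opts.reset();  // clears any BUGGIFY-style randomization
    return opts.commitOnFirstProxy ? 1 : 0;  // always 0 after reset()
}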
@@ -319,4 +326,4 @@ std::string unprintable( const std::string& );
 
 int64_t extractIntOption( Optional<StringRef> value, int64_t minValue = std::numeric_limits<int64_t>::min(), int64_t maxValue = std::numeric_limits<int64_t>::max() );
 
-#endif
+#endif
@@ -131,6 +131,8 @@ description is not currently required but encouraged.
     <Option name="causal_read_disable" code="21" />
     <Option name="next_write_no_write_conflict_range" code="30"
         description="The next write performed on this transaction will not generate a write conflict range. As a result, other transactions which read the key(s) being modified by the next write will not conflict with this transaction. Care needs to be taken when using this option on a transaction that is shared between multiple threads. When setting this option, write conflict ranges will be disabled on the next write operation, regardless of what thread it is on." />
+    <Option name="commit_on_first_proxy" code="40"
+        description="Committing this transaction will bypass the normal load balancing across proxies and go directly to the specifically nominated 'first proxy'." />
     <Option name="check_writes_enable" code="50" />
     <Option name="read_your_writes_disable" code="51"
         description="Reads performed by a transaction will not see any prior mutations that occured in that transaction, instead seeing the value which was in the database at the transaction's read version. This option may provide a small performance benefit for the client, but also disables a number of client-side optimizations which are beneficial for transactions which tend to read and write the same keys within a single transaction."/>
@@ -361,13 +361,6 @@ namespace oldTLog {
         return Void();
     }
 
-    KeyRange prefixRange( KeyRef prefix ) {
-        Key end = prefix;
-        UNSTOPPABLE_ASSERT( end.size() && end.end()[-1] != 0xFF );
-        ++const_cast<uint8_t&>( end.end()[-1] );
-        return KeyRangeRef( prefix, end );
-    }
-
     ////// Persistence format (for self->persistentData)
 
     // Immutable keys
@@ -200,11 +200,6 @@ struct CompareFirst {
     }
 };
 
-KeyRange prefixRange( KeyRef prefix ) {
-    Key end = strinc(prefix);
-    return KeyRangeRef( prefix, end );
-}
-
 ////// Persistence format (for self->persistentData)
 
 // Immutable keys
@@ -1675,6 +1675,13 @@ int main(int argc, char* argv[]) {
             for(auto i = processes.begin(); i != processes.end(); ++i)
                 printf("%s %s: %0.3f Mclocks\n", (*i)->name, (*i)->address.toString().c_str(), (*i)->cpuTicks / 1e6);
         }
+        if (role == Simulation) {
+            unsigned long sevErrorEventsLogged = TraceEvent::CountEventsLoggedAt(SevError);
+            if (sevErrorEventsLogged > 0) {
+                printf("%lu SevError events logged\n", sevErrorEventsLogged);
+                rc = FDB_EXIT_ERROR;
+            }
+        }
 
         //g_simulator.run();
@@ -216,6 +216,7 @@ ACTOR Future<Void> newProxies( Reference<MasterData> self, Future< RecruitFromConfigurationReply > recruits ) {
     }
 
     vector<MasterProxyInterface> newRecruits = wait( getAll( initializationReplies ) );
+    // It is required for the correctness of COMMIT_ON_FIRST_PROXY that self->proxies[0] is the firstProxy.
     self->proxies = newRecruits;
 
     return Void();
@@ -23,13 +23,6 @@
 #include "fdbrpc/IAsyncFile.h"
 #include "fdbclient/FDBTypes.h"
 
-static KeyRange prefixRange( KeyRef prefix ) {
-    Key end = prefix;
-    UNSTOPPABLE_ASSERT( end.size() && end.end()[-1] != 0xFF );
-    ++const_cast<uint8_t&>( end.end()[-1] );
-    return KeyRangeRef( prefix, end );
-}
-
 struct DiskDurabilityTest : TestWorkload {
     bool enabled;
     std::string filename;
@@ -695,8 +695,10 @@ TraceEvent& TraceEvent::error(class Error const& error, bool includeCancelled) {
             // Suppress the entire message
             enabled = false;
         } else {
-            if (error.isInjectedFault())
+            if (error.isInjectedFault()) {
                 detail("ErrorIsInjectedFault", true);
+                if (severity == SevError) severity = SevWarnAlways;
+            }
             detail("Error", error.name());
             detail("ErrorDescription", error.what());
             detail("ErrorCode", error.code());
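Taken together with the SevError counting added in the next hunk and the check in fdbserver's main() above, this change keeps simulator-injected faults from failing a run: an injected error is tagged and, if it would have been SevError, demoted to SevWarnAlways. A tiny standalone sketch of that policy (illustrative only, not FDB code):

// sketch_injected_fault_severity.cpp -- illustrative only.
#include <cassert>

enum Severity { SevWarn = 20, SevWarnAlways = 30, SevError = 40 };

// Mirrors the intent of TraceEvent::error(): injected faults must not be
// reported at SevError, because SevError events now fail the simulation run.
Severity effectiveSeverity(Severity requested, bool isInjectedFault) {
    if (isInjectedFault && requested == SevError)
        return SevWarnAlways;
    return requested;
}

int main() {
    assert(effectiveSeverity(SevError, /*isInjectedFault=*/true) == SevWarnAlways);
    assert(effectiveSeverity(SevError, /*isInjectedFault=*/false) == SevError);
    assert(effectiveSeverity(SevWarn, /*isInjectedFault=*/true) == SevWarn);
    return 0;
}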
@@ -856,6 +858,14 @@ TraceEvent& TraceEvent::GetLastError() {
 #endif
 }
 
+// We're cheating in counting, as in practice, we only use {10,20,30,40}.
+static_assert(SevMaxUsed / 10 + 1 == 5, "Please bump eventCounts[5] to SevMaxUsed/10+1");
+unsigned long TraceEvent::eventCounts[5] = {0,0,0,0,0};
+
+unsigned long TraceEvent::CountEventsLoggedAt(Severity sev) {
+    return TraceEvent::eventCounts[sev/10];
+}
+
 TraceEvent& TraceEvent::backtrace(std::string prefix) {
     if (this->severity == SevError) return *this; // We'll backtrace this later in ~TraceEvent
     return detail((prefix + "Backtrace").c_str(), platform::get_backtrace());
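The counters map the severities actually used, {10,20,30,40}, to slots {1,2,3,4} by integer division, which is why the static_assert pins the array size to SevMaxUsed/10+1. A standalone sketch of the same bucketing, with the exit-code decision from the fdbserver main() hunk earlier folded in (illustrative only; not FDB code):

// sketch_event_counts.cpp -- illustrative only, not FDB code.
#include <cassert>
#include <cstdio>

enum Severity { SevInfo = 10, SevWarn = 20, SevWarnAlways = 30, SevError = 40, SevMaxUsed = SevError };

// One counter per severity decade; slot 0 is unused because only {10,20,30,40}
// are logged in practice (the "cheating" the comment above refers to).
static_assert(SevMaxUsed / 10 + 1 == 5, "bump the array size to SevMaxUsed/10+1");
static unsigned long eventCounts[5] = {0, 0, 0, 0, 0};

void logEvent(Severity sev) { ++eventCounts[sev / 10]; }
unsigned long countEventsLoggedAt(Severity sev) { return eventCounts[sev / 10]; }

int main() {
    logEvent(SevInfo);
    logEvent(SevError);
    logEvent(SevError);
    assert(countEventsLoggedAt(SevError) == 2);

    // Same decision the simulator makes: any SevError event fails the run
    // (the real server sets rc = FDB_EXIT_ERROR; here we only report it).
    int rc = countEventsLoggedAt(SevError) > 0 ? 1 : 0;
    std::printf("%lu SevError events logged, rc=%d\n", countEventsLoggedAt(SevError), rc);
    return 0;
}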
@@ -898,6 +908,7 @@ TraceEvent::~TraceEvent() {
         if (g_traceLog.isOpen()) {
             writef("/>\r\n");
             g_traceLog.write( buffer, length );
+            TraceEvent::eventCounts[severity/10]++;
 
             // Log Metrics
             if(g_traceLog.logTraceEventMetrics && *type != '\0' && isNetworkThread()) {
@@ -48,6 +48,7 @@ enum Severity {
     SevWarn=20,
     SevWarnAlways=30,
     SevError=40,
+    SevMaxUsed=SevError,
     SevMax=1000000
 };
 
@@ -142,6 +143,9 @@ public:
 
     ~TraceEvent();  // Actually logs the event
 
+    // Return the number of invocations of TraceEvent() at the specified logging level.
+    static unsigned long CountEventsLoggedAt(Severity);
+
     DynamicEventMetric *tmpEventMetric;  // This just just a place to store fields
 
 private:
@@ -153,6 +157,7 @@ private:
     const char *type;
     UID id;
 
+    static unsigned long eventCounts[5];
     static thread_local bool networkThread;
 
     bool init( Severity, const char* type );
@@ -1082,12 +1082,24 @@ ACTOR template <class T> Future<T> brokenPromiseToNever( Future<T> in ) {
         return t;
     } catch (Error& e) {
         if (e.code() != error_code_broken_promise)
-            throw e;
+            throw;
         Void _ = wait(Never()); // never return
         throw internal_error(); // does not happen
     }
 }
 
+ACTOR template <class T> Future<T> brokenPromiseToMaybeDelivered( Future<T> in ) {
+    try {
+        T t = wait(in);
+        return t;
+    } catch (Error& e) {
+        if (e.code() == error_code_broken_promise) {
+            throw request_maybe_delivered();
+        }
+        throw;
+    }
+}
+
 ACTOR template <class T> void tagAndForward( Promise<T>* pOutputPromise, T value, Future<Void> signal ) {
     state Promise<T> out( std::move(*pOutputPromise) );
     Void _ = wait( signal );
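brokenPromiseToMaybeDelivered() exists so that a commit sent point-to-point to the first proxy fails the same way a load-balanced commit does: if the proxy's endpoint vanishes (broken_promise), the caller sees request_maybe_delivered and runs the existing "did my commit happen?" recovery path in tryCommit. A plain-C++ sketch of that error translation, with ordinary exceptions standing in for FoundationDB's Error values and futures (names invented for illustration):

// sketch_maybe_delivered.cpp -- illustrative only, not FDB code.
#include <cassert>
#include <stdexcept>
#include <string>

struct BrokenPromise : std::runtime_error { BrokenPromise() : std::runtime_error("broken_promise") {} };
struct RequestMaybeDelivered : std::runtime_error { RequestMaybeDelivered() : std::runtime_error("request_maybe_delivered") {} };

// Models brokenPromiseToMaybeDelivered(): the endpoint dying does not tell us
// whether the request was processed, so surface it as "maybe delivered".
std::string sendToFirstProxy(bool proxyDied) {
    try {
        if (proxyDied) throw BrokenPromise();
        return "committed";
    } catch (const BrokenPromise&) {
        throw RequestMaybeDelivered();
    }
}

int main() {
    assert(sendToFirstProxy(false) == "committed");
    try {
        sendToFirstProxy(true);
        assert(false && "should have thrown");
    } catch (const RequestMaybeDelivered&) {
        // The caller now takes the same recovery path as a load-balanced commit
        // whose outcome is unknown (the request_maybe_delivered branch above).
    }
    return 0;
}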