From 97aec45ce8144b7096774ddaf5c531c1ee9dc7d1 Mon Sep 17 00:00:00 2001 From: "A.J. Beamon" Date: Tue, 1 Mar 2022 08:51:50 -0800 Subject: [PATCH] Fix formatting issues caused by clang-format --- fdbserver/workloads/WriteDuringRead.actor.cpp | 1232 +++++++++-------- 1 file changed, 620 insertions(+), 612 deletions(-) diff --git a/fdbserver/workloads/WriteDuringRead.actor.cpp b/fdbserver/workloads/WriteDuringRead.actor.cpp index d18b6d0fd9..c41bf68f4a 100644 --- a/fdbserver/workloads/WriteDuringRead.actor.cpp +++ b/fdbserver/workloads/WriteDuringRead.actor.cpp @@ -444,645 +444,653 @@ struct WriteDuringReadWorkload : TestWorkload { state int changeNum = self->changeCount[key]; state Optional memRes = self->memoryGet(&self->memoryDatabase, key); *memLimit -= memRes.expectedSize(); - choose{ when(wait(tr->watch(key))){ - if (changeNum == self->changeCount[key]){ TraceEvent(SevError, "WDRWatchWrongResult", randomID) - .detail("Reason", "Triggered without changing") - .detail("Key", printable(key)) - .detail("Value", changeNum) - .detail("DuringCommit", *doingCommit); - } - } - when(wait(self->finished.onTrigger())) { - Optional memRes2 = self->memoryGet(&self->memoryDatabase, key); - if (memRes != memRes2) { - TraceEvent(SevError, "WDRWatchWrongResult", randomID) - .detail("Reason", "Changed without triggering") - .detail("Key", printable(key)) - .detail("Value1", printable(memRes)) - .detail("Value2", printable(memRes2)); - } - } -}* memLimit += memRes.expectedSize(); -return Void(); -} -catch (Error& e) { - // check for transaction cancelled if the watch was not committed - //TraceEvent("WDRWatchError", randomID).error(e,true); - if (e.code() == error_code_used_during_commit) { - ASSERT(*doingCommit); - return Void(); - } else if (e.code() == error_code_transaction_cancelled) - return Void(); - throw; -} -} - -ACTOR Future commitAndUpdateMemory(ReadYourWritesTransaction* tr, - WriteDuringReadWorkload* self, - bool* cancelled, - bool readYourWritesDisabled, - bool snapshotRYWDisabled, - bool readAheadDisabled, - bool useBatchPriority, - bool* doingCommit, - double* startTime, - Key timebombStr) { - // state UID randomID = nondeterministicRandom()->randomUniqueID(); - //TraceEvent("WDRCommit", randomID); - try { - if (!readYourWritesDisabled && !*cancelled) { - KeyRangeMap transactionConflicts; - tr->getWriteConflicts(&transactionConflicts); - - auto transactionRanges = transactionConflicts.ranges(); - auto addedRanges = self->addedConflicts.ranges(); - auto transactionIter = transactionRanges.begin(); - auto addedIter = addedRanges.begin(); - - bool failed = false; - while (transactionIter != transactionRanges.end() && addedIter != addedRanges.end()) { - if (transactionIter->begin() != addedIter->begin() || transactionIter->value() != addedIter->value()) { - TraceEvent(SevError, "WriteConflictError") - .detail("TransactionKey", printable(transactionIter->begin())) - .detail("AddedKey", printable(addedIter->begin())) - .detail("TransactionVal", transactionIter->value()) - .detail("AddedVal", addedIter->value()); - failed = true; - } - ++transactionIter; - ++addedIter; - } - - if (transactionIter != transactionRanges.end() || addedIter != addedRanges.end()) { - failed = true; - } - - if (failed) { - TraceEvent(SevError, "WriteConflictRangeError").log(); - for (transactionIter = transactionRanges.begin(); transactionIter != transactionRanges.end(); - ++transactionIter) { - TraceEvent("WCRTransaction") - .detail("Range", printable(transactionIter.range())) - .detail("Value", transactionIter.value()); - } - for (addedIter = addedRanges.begin(); addedIter != addedRanges.end(); ++addedIter) { - TraceEvent("WCRAdded") - .detail("Range", printable(addedIter.range())) - .detail("Value", addedIter.value()); - } - } - } - - state std::map committedDB = self->memoryDatabase; - *doingCommit = true; - wait(tr->commit()); - *doingCommit = false; - self->finished.trigger(); - - if (readYourWritesDisabled) - tr->setOption(FDBTransactionOptions::READ_YOUR_WRITES_DISABLE); - if (snapshotRYWDisabled) - tr->setOption(FDBTransactionOptions::SNAPSHOT_RYW_DISABLE); - if (readAheadDisabled) - tr->setOption(FDBTransactionOptions::READ_AHEAD_DISABLE); - if (useBatchPriority) - tr->setOption(FDBTransactionOptions::PRIORITY_BATCH); - if (self->useSystemKeys) - tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); - tr->addWriteConflictRange(self->conflictRange); - self->addedConflicts.insert(allKeys, false); - self->addedConflicts.insert(self->conflictRange, true); - *startTime = now(); - tr->setOption(FDBTransactionOptions::TIMEOUT, timebombStr); - - //TraceEvent("WDRCommitSuccess", randomID).detail("CommittedVersion", tr->getCommittedVersion()); - self->lastCommittedDatabase = committedDB; - - return Void(); - } catch (Error& e) { - //TraceEvent("WDRCommitCancelled", randomID).error(e,true); - if (e.code() == error_code_actor_cancelled || e.code() == error_code_transaction_cancelled || - e.code() == error_code_used_during_commit) - *cancelled = true; - if (e.code() == error_code_actor_cancelled || e.code() == error_code_transaction_cancelled) - throw commit_unknown_result(); - if (e.code() == error_code_transaction_too_old) - throw not_committed(); - throw; - } -} - -Value getRandomValue() { - return Value(std::string(deterministicRandom()->randomInt(valueSizeRange.first, valueSizeRange.second + 1), 'x')); -} - -ACTOR Future loadAndRun(Database cx, WriteDuringReadWorkload* self) { - state double startTime = now(); - loop { - state int i = 0; - state int keysPerBatch = - std::min(1000, - 1 + CLIENT_KNOBS->TRANSACTION_SIZE_LIMIT / 6 / - (self->getKeyForIndex(self->nodes).size() + self->valueSizeRange.second)); - self->memoryDatabase = std::map(); - for (; i < self->nodes; i += keysPerBatch) { - state Transaction tr(cx); - loop { - if (now() - startTime > self->testDuration) - return Void(); - try { - if (i == 0) { - tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); - tr.addWriteConflictRange( - allKeys); // To prevent a write only transaction whose commit was previously cancelled from - // being reordered after this transaction - tr.clear(normalKeys); - } - if (self->useSystemKeys) - tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); - - int end = std::min(self->nodes, i + keysPerBatch); - tr.clear(KeyRangeRef(self->getKeyForIndex(i), self->getKeyForIndex(end))); - self->memoryDatabase.erase(self->memoryDatabase.lower_bound(self->getKeyForIndex(i)), - self->memoryDatabase.lower_bound(self->getKeyForIndex(end))); - - for (int j = i; j < end; j++) { - if (deterministicRandom()->random01() < self->initialKeyDensity) { - Key key = self->getKeyForIndex(j); - if (key.size() <= (key.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT - : CLIENT_KNOBS->KEY_SIZE_LIMIT)) { - Value value = self->getRandomValue(); - value = value.substr(0, std::min(value.size(), CLIENT_KNOBS->VALUE_SIZE_LIMIT)); - self->memoryDatabase[key] = value; - tr.set(key, value); - } + // This if block prevents formatting issues with clang-format + if (1) { + choose { + when(wait(tr->watch(key))) { + if (changeNum == self->changeCount[key]) { + TraceEvent(SevError, "WDRWatchWrongResult", randomID) + .detail("Reason", "Triggered without changing") + .detail("Key", printable(key)) + .detail("Value", changeNum) + .detail("DuringCommit", *doingCommit); + } + } + when(wait(self->finished.onTrigger())) { + Optional memRes2 = self->memoryGet(&self->memoryDatabase, key); + if (memRes != memRes2) { + TraceEvent(SevError, "WDRWatchWrongResult", randomID) + .detail("Reason", "Changed without triggering") + .detail("Key", printable(key)) + .detail("Value1", printable(memRes)) + .detail("Value2", printable(memRes2)); } } - wait(tr.commit()); - //TraceEvent("WDRInitBatch").detail("I", i).detail("CommittedVersion", tr.getCommittedVersion()); - break; - } catch (Error& e) { - wait(tr.onError(e)); } } + *memLimit += memRes.expectedSize(); + + return Void(); + } catch (Error& e) { + // check for transaction cancelled if the watch was not committed + //TraceEvent("WDRWatchError", randomID).error(e,true); + if (e.code() == error_code_used_during_commit) { + ASSERT(*doingCommit); + return Void(); + } else if (e.code() == error_code_transaction_cancelled) + return Void(); + throw; } - self->lastCommittedDatabase = self->memoryDatabase; - self->addedConflicts.insert(allKeys, false); - //TraceEvent("WDRInit"); + } + + ACTOR Future commitAndUpdateMemory(ReadYourWritesTransaction* tr, + WriteDuringReadWorkload* self, + bool* cancelled, + bool readYourWritesDisabled, + bool snapshotRYWDisabled, + bool readAheadDisabled, + bool useBatchPriority, + bool* doingCommit, + double* startTime, + Key timebombStr) { + // state UID randomID = nondeterministicRandom()->randomUniqueID(); + //TraceEvent("WDRCommit", randomID); + try { + if (!readYourWritesDisabled && !*cancelled) { + KeyRangeMap transactionConflicts; + tr->getWriteConflicts(&transactionConflicts); + + auto transactionRanges = transactionConflicts.ranges(); + auto addedRanges = self->addedConflicts.ranges(); + auto transactionIter = transactionRanges.begin(); + auto addedIter = addedRanges.begin(); + + bool failed = false; + while (transactionIter != transactionRanges.end() && addedIter != addedRanges.end()) { + if (transactionIter->begin() != addedIter->begin() || + transactionIter->value() != addedIter->value()) { + TraceEvent(SevError, "WriteConflictError") + .detail("TransactionKey", printable(transactionIter->begin())) + .detail("AddedKey", printable(addedIter->begin())) + .detail("TransactionVal", transactionIter->value()) + .detail("AddedVal", addedIter->value()); + failed = true; + } + ++transactionIter; + ++addedIter; + } + + if (transactionIter != transactionRanges.end() || addedIter != addedRanges.end()) { + failed = true; + } + + if (failed) { + TraceEvent(SevError, "WriteConflictRangeError").log(); + for (transactionIter = transactionRanges.begin(); transactionIter != transactionRanges.end(); + ++transactionIter) { + TraceEvent("WCRTransaction") + .detail("Range", printable(transactionIter.range())) + .detail("Value", transactionIter.value()); + } + for (addedIter = addedRanges.begin(); addedIter != addedRanges.end(); ++addedIter) { + TraceEvent("WCRAdded") + .detail("Range", printable(addedIter.range())) + .detail("Value", addedIter.value()); + } + } + } + + state std::map committedDB = self->memoryDatabase; + *doingCommit = true; + wait(tr->commit()); + *doingCommit = false; + self->finished.trigger(); + + if (readYourWritesDisabled) + tr->setOption(FDBTransactionOptions::READ_YOUR_WRITES_DISABLE); + if (snapshotRYWDisabled) + tr->setOption(FDBTransactionOptions::SNAPSHOT_RYW_DISABLE); + if (readAheadDisabled) + tr->setOption(FDBTransactionOptions::READ_AHEAD_DISABLE); + if (useBatchPriority) + tr->setOption(FDBTransactionOptions::PRIORITY_BATCH); + if (self->useSystemKeys) + tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + tr->addWriteConflictRange(self->conflictRange); + self->addedConflicts.insert(allKeys, false); + self->addedConflicts.insert(self->conflictRange, true); + *startTime = now(); + tr->setOption(FDBTransactionOptions::TIMEOUT, timebombStr); + + //TraceEvent("WDRCommitSuccess", randomID).detail("CommittedVersion", tr->getCommittedVersion()); + self->lastCommittedDatabase = committedDB; + + return Void(); + } catch (Error& e) { + //TraceEvent("WDRCommitCancelled", randomID).error(e,true); + if (e.code() == error_code_actor_cancelled || e.code() == error_code_transaction_cancelled || + e.code() == error_code_used_during_commit) + *cancelled = true; + if (e.code() == error_code_actor_cancelled || e.code() == error_code_transaction_cancelled) + throw commit_unknown_result(); + if (e.code() == error_code_transaction_too_old) + throw not_committed(); + throw; + } + } + + Value getRandomValue() { + return Value( + std::string(deterministicRandom()->randomInt(valueSizeRange.first, valueSizeRange.second + 1), 'x')); + } + + ACTOR Future loadAndRun(Database cx, WriteDuringReadWorkload* self) { + state double startTime = now(); + loop { + state int i = 0; + state int keysPerBatch = + std::min(1000, + 1 + CLIENT_KNOBS->TRANSACTION_SIZE_LIMIT / 6 / + (self->getKeyForIndex(self->nodes).size() + self->valueSizeRange.second)); + self->memoryDatabase = std::map(); + for (; i < self->nodes; i += keysPerBatch) { + state Transaction tr(cx); + loop { + if (now() - startTime > self->testDuration) + return Void(); + try { + if (i == 0) { + tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + tr.addWriteConflictRange( + allKeys); // To prevent a write only transaction whose commit was previously cancelled + // from being reordered after this transaction + tr.clear(normalKeys); + } + if (self->useSystemKeys) + tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + + int end = std::min(self->nodes, i + keysPerBatch); + tr.clear(KeyRangeRef(self->getKeyForIndex(i), self->getKeyForIndex(end))); + self->memoryDatabase.erase(self->memoryDatabase.lower_bound(self->getKeyForIndex(i)), + self->memoryDatabase.lower_bound(self->getKeyForIndex(end))); + + for (int j = i; j < end; j++) { + if (deterministicRandom()->random01() < self->initialKeyDensity) { + Key key = self->getKeyForIndex(j); + if (key.size() <= (key.startsWith(systemKeys.begin) + ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT + : CLIENT_KNOBS->KEY_SIZE_LIMIT)) { + Value value = self->getRandomValue(); + value = + value.substr(0, std::min(value.size(), CLIENT_KNOBS->VALUE_SIZE_LIMIT)); + self->memoryDatabase[key] = value; + tr.set(key, value); + } + } + } + wait(tr.commit()); + //TraceEvent("WDRInitBatch").detail("I", i).detail("CommittedVersion", tr.getCommittedVersion()); + break; + } catch (Error& e) { + wait(tr.onError(e)); + } + } + } + self->lastCommittedDatabase = self->memoryDatabase; + self->addedConflicts.insert(allKeys, false); + //TraceEvent("WDRInit"); + + loop { + wait(delay(now() - startTime > self->slowModeStart || + (g_network->isSimulated() && g_simulator.speedUpSimulation) + ? 1.0 + : 0.1)); + try { + wait(self->randomTransaction( + (self->useExtraDB && deterministicRandom()->random01() < 0.5) ? self->extraDB : cx, + self, + startTime)); + } catch (Error& e) { + if (e.code() != error_code_not_committed) + throw; + break; + } + if (now() - startTime > self->testDuration) + return Void(); + } + } + } + + Key getRandomKey() { return getKeyForIndex(deterministicRandom()->randomInt(0, nodes)); } + + Key getKeyForIndex(int idx) { + idx += minNode; + if (adjacentKeys) { + return Key(idx ? keyPrefix + std::string(idx, '\x00') : ""); + } else { + return Key(keyPrefix + format("%010d", idx)); + } + } + + Key versionStampKeyForIndex(int idx) { + Key result = KeyRef(getKeyForIndex(idx).toString() + std::string(14, '\x00')); + int32_t pos = deterministicRandom()->randomInt(0, result.size() - 13); + pos = littleEndian32(pos); + uint8_t* data = mutateString(result); + memcpy(data + result.size() - sizeof(int32_t), &pos, sizeof(int32_t)); + return result; + } + + Key getRandomVersionStampKey() { return versionStampKeyForIndex(deterministicRandom()->randomInt(0, nodes)); } + + KeySelector getRandomKeySelector() { + int scale = 1 << deterministicRandom()->randomInt(0, 14); + return KeySelectorRef( + getRandomKey(), deterministicRandom()->random01() < 0.5, deterministicRandom()->randomInt(-scale, scale)); + } + + GetRangeLimits getRandomLimits() { + int kind = deterministicRandom()->randomInt(0, 3); + return GetRangeLimits( + (kind & 1) ? GetRangeLimits::ROW_LIMIT_UNLIMITED + : deterministicRandom()->randomInt(0, 1 << deterministicRandom()->randomInt(1, 10)), + (kind & 2) ? GetRangeLimits::BYTE_LIMIT_UNLIMITED + : deterministicRandom()->randomInt(0, 1 << deterministicRandom()->randomInt(1, 15))); + } + + KeyRange getRandomRange(int sizeLimit) { + int startLocation = deterministicRandom()->randomInt(0, nodes); + int scale = deterministicRandom()->randomInt( + 0, deterministicRandom()->randomInt(2, 5) * deterministicRandom()->randomInt(2, 5)); + int endLocation = startLocation + deterministicRandom()->randomInt( + 0, 1 + std::min(sizeLimit, std::min(nodes - startLocation, 1 << scale))); + + return KeyRangeRef(getKeyForIndex(startLocation), getKeyForIndex(endLocation)); + } + + Value applyAtomicOp(Optional existingValue, Value value, MutationRef::Type type) { + Arena arena; + if (type == MutationRef::SetValue) + return value; + else if (type == MutationRef::AddValue) + return doLittleEndianAdd(existingValue, value, arena); + else if (type == MutationRef::AppendIfFits) + return doAppendIfFits(existingValue, value, arena); + else if (type == MutationRef::And) + return doAndV2(existingValue, value, arena); + else if (type == MutationRef::Or) + return doOr(existingValue, value, arena); + else if (type == MutationRef::Xor) + return doXor(existingValue, value, arena); + else if (type == MutationRef::Max) + return doMax(existingValue, value, arena); + else if (type == MutationRef::Min) + return doMinV2(existingValue, value, arena); + else if (type == MutationRef::ByteMin) + return doByteMin(existingValue, value, arena); + else if (type == MutationRef::ByteMax) + return doByteMax(existingValue, value, arena); + ASSERT(false); + return Value(); + } + + ACTOR Future randomTransaction(Database cx, WriteDuringReadWorkload* self, double testStartTime) { + state ReadYourWritesTransaction tr(cx); + state bool readYourWritesDisabled = deterministicRandom()->random01() < 0.5; + state bool readAheadDisabled = deterministicRandom()->random01() < 0.5; + state bool snapshotRYWDisabled = deterministicRandom()->random01() < 0.5; + state bool useBatchPriority = deterministicRandom()->random01() < 0.5; + state int64_t timebomb = + (FLOW_KNOBS->MAX_BUGGIFIED_DELAY == 0.0 && deterministicRandom()->random01() < 0.01) + ? deterministicRandom()->randomInt64(1, 6000) + : 0; // timebomb check can fail incorrectly if simulation injects delay longer than the timebomb + state std::vector> operations; + state ActorCollection commits(false); + state std::vector> watches; + state int changeNum = 1; + state bool doingCommit = false; + state int waitLocation = 0; + state double startTime = now(); + + state bool disableGetKey = BUGGIFY; + state bool disableGetRange = BUGGIFY; + state bool disableGet = BUGGIFY; + state bool disableCommit = BUGGIFY; + state bool disableClearRange = BUGGIFY; + state bool disableClear = BUGGIFY; + state bool disableWatch = BUGGIFY; + state bool disableWriteConflictRange = BUGGIFY; + state bool disableDelay = BUGGIFY; + state bool disableReset = BUGGIFY; + state bool disableReadConflictRange = BUGGIFY; + state bool disableSet = BUGGIFY; + state bool disableAtomicOp = BUGGIFY; + + state Key timebombStr = makeString(8); + uint8_t* data = mutateString(timebombStr); + memcpy(data, &timebomb, 8); loop { - wait(delay(now() - startTime > self->slowModeStart || - (g_network->isSimulated() && g_simulator.speedUpSimulation) - ? 1.0 - : 0.1)); - try { - wait(self->randomTransaction( - (self->useExtraDB && deterministicRandom()->random01() < 0.5) ? self->extraDB : cx, - self, - startTime)); - } catch (Error& e) { - if (e.code() != error_code_not_committed) - throw; - break; - } - if (now() - startTime > self->testDuration) + if (now() - testStartTime > self->testDuration) { return Void(); - } - } -} + } -Key getRandomKey() { - return getKeyForIndex(deterministicRandom()->randomInt(0, nodes)); -} - -Key getKeyForIndex(int idx) { - idx += minNode; - if (adjacentKeys) { - return Key(idx ? keyPrefix + std::string(idx, '\x00') : ""); - } else { - return Key(keyPrefix + format("%010d", idx)); - } -} - -Key versionStampKeyForIndex(int idx) { - Key result = KeyRef(getKeyForIndex(idx).toString() + std::string(14, '\x00')); - int32_t pos = deterministicRandom()->randomInt(0, result.size() - 13); - pos = littleEndian32(pos); - uint8_t* data = mutateString(result); - memcpy(data + result.size() - sizeof(int32_t), &pos, sizeof(int32_t)); - return result; -} - -Key getRandomVersionStampKey() { - return versionStampKeyForIndex(deterministicRandom()->randomInt(0, nodes)); -} - -KeySelector getRandomKeySelector() { - int scale = 1 << deterministicRandom()->randomInt(0, 14); - return KeySelectorRef( - getRandomKey(), deterministicRandom()->random01() < 0.5, deterministicRandom()->randomInt(-scale, scale)); -} - -GetRangeLimits getRandomLimits() { - int kind = deterministicRandom()->randomInt(0, 3); - return GetRangeLimits( - (kind & 1) ? GetRangeLimits::ROW_LIMIT_UNLIMITED - : deterministicRandom()->randomInt(0, 1 << deterministicRandom()->randomInt(1, 10)), - (kind & 2) ? GetRangeLimits::BYTE_LIMIT_UNLIMITED - : deterministicRandom()->randomInt(0, 1 << deterministicRandom()->randomInt(1, 15))); -} - -KeyRange getRandomRange(int sizeLimit) { - int startLocation = deterministicRandom()->randomInt(0, nodes); - int scale = deterministicRandom()->randomInt( - 0, deterministicRandom()->randomInt(2, 5) * deterministicRandom()->randomInt(2, 5)); - int endLocation = startLocation + deterministicRandom()->randomInt( - 0, 1 + std::min(sizeLimit, std::min(nodes - startLocation, 1 << scale))); - - return KeyRangeRef(getKeyForIndex(startLocation), getKeyForIndex(endLocation)); -} - -Value applyAtomicOp(Optional existingValue, Value value, MutationRef::Type type) { - Arena arena; - if (type == MutationRef::SetValue) - return value; - else if (type == MutationRef::AddValue) - return doLittleEndianAdd(existingValue, value, arena); - else if (type == MutationRef::AppendIfFits) - return doAppendIfFits(existingValue, value, arena); - else if (type == MutationRef::And) - return doAndV2(existingValue, value, arena); - else if (type == MutationRef::Or) - return doOr(existingValue, value, arena); - else if (type == MutationRef::Xor) - return doXor(existingValue, value, arena); - else if (type == MutationRef::Max) - return doMax(existingValue, value, arena); - else if (type == MutationRef::Min) - return doMinV2(existingValue, value, arena); - else if (type == MutationRef::ByteMin) - return doByteMin(existingValue, value, arena); - else if (type == MutationRef::ByteMax) - return doByteMax(existingValue, value, arena); - ASSERT(false); - return Value(); -} - -ACTOR Future randomTransaction(Database cx, WriteDuringReadWorkload* self, double testStartTime) { - state ReadYourWritesTransaction tr(cx); - state bool readYourWritesDisabled = deterministicRandom()->random01() < 0.5; - state bool readAheadDisabled = deterministicRandom()->random01() < 0.5; - state bool snapshotRYWDisabled = deterministicRandom()->random01() < 0.5; - state bool useBatchPriority = deterministicRandom()->random01() < 0.5; - state int64_t timebomb = - (FLOW_KNOBS->MAX_BUGGIFIED_DELAY == 0.0 && deterministicRandom()->random01() < 0.01) - ? deterministicRandom()->randomInt64(1, 6000) - : 0; // timebomb check can fail incorrectly if simulation injects delay longer than the timebomb - state std::vector> operations; - state ActorCollection commits(false); - state std::vector> watches; - state int changeNum = 1; - state bool doingCommit = false; - state int waitLocation = 0; - state double startTime = now(); - - state bool disableGetKey = BUGGIFY; - state bool disableGetRange = BUGGIFY; - state bool disableGet = BUGGIFY; - state bool disableCommit = BUGGIFY; - state bool disableClearRange = BUGGIFY; - state bool disableClear = BUGGIFY; - state bool disableWatch = BUGGIFY; - state bool disableWriteConflictRange = BUGGIFY; - state bool disableDelay = BUGGIFY; - state bool disableReset = BUGGIFY; - state bool disableReadConflictRange = BUGGIFY; - state bool disableSet = BUGGIFY; - state bool disableAtomicOp = BUGGIFY; - - state Key timebombStr = makeString(8); - uint8_t* data = mutateString(timebombStr); - memcpy(data, &timebomb, 8); - - loop { - if (now() - testStartTime > self->testDuration) { - return Void(); - } - - state int64_t memLimit = 1e8; - state bool cancelled = false; - if (readYourWritesDisabled) - tr.setOption(FDBTransactionOptions::READ_YOUR_WRITES_DISABLE); - if (snapshotRYWDisabled) - tr.setOption(FDBTransactionOptions::SNAPSHOT_RYW_DISABLE); - if (readAheadDisabled) - tr.setOption(FDBTransactionOptions::READ_AHEAD_DISABLE); - if (useBatchPriority) - tr.setOption(FDBTransactionOptions::PRIORITY_BATCH); - if (self->useSystemKeys) - tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); - tr.setOption(FDBTransactionOptions::TIMEOUT, timebombStr); - tr.addWriteConflictRange(self->conflictRange); - self->addedConflicts.insert(self->conflictRange, true); - try { - state int numWaits = deterministicRandom()->randomInt(1, 5); - state int i = 0; - for (; i < numWaits && memLimit > 0; i++) { - //TraceEvent("WDROps").detail("Count", i).detail("Max", numWaits).detail("ReadYourWritesDisabled",readYourWritesDisabled); - state int numOps = deterministicRandom()->randomInt(1, self->numOps); - state int j = 0; - for (; j < numOps && memLimit > 0; j++) { - if (commits.getResult().isError()) - throw commits.getResult().getError(); - try { - state int operationType = deterministicRandom()->randomInt(0, 21); - if (operationType == 0 && !disableGetKey) { - operations.push_back(self->getKeyAndCompare(&tr, - self->getRandomKeySelector(), - Snapshot{ deterministicRandom()->coinflip() }, - readYourWritesDisabled, - snapshotRYWDisabled, - self, - &doingCommit, - &memLimit)); - } else if (operationType == 1 && !disableGetRange) { - operations.push_back(self->getRangeAndCompare(&tr, - self->getRandomKeySelector(), - self->getRandomKeySelector(), - self->getRandomLimits(), - Snapshot{ deterministicRandom()->coinflip() }, - Reverse{ deterministicRandom()->coinflip() }, - readYourWritesDisabled, - snapshotRYWDisabled, - self, - &doingCommit, - &memLimit)); - } else if (operationType == 2 && !disableGet) { - operations.push_back(self->getAndCompare(&tr, - self->getRandomKey(), - Snapshot{ deterministicRandom()->coinflip() }, - readYourWritesDisabled, - snapshotRYWDisabled, - self, - &doingCommit, - &memLimit)); - } else if (operationType == 3 && !disableCommit) { - if (!self->rarelyCommit || deterministicRandom()->random01() < 1.0 / self->numOps) { - Future commit = self->commitAndUpdateMemory(&tr, - self, - &cancelled, - readYourWritesDisabled, - snapshotRYWDisabled, - readAheadDisabled, - useBatchPriority, - &doingCommit, - &startTime, - timebombStr); - operations.push_back(commit); - commits.add(commit); - } - } else if (operationType == 4 && !disableClearRange) { - KeyRange range = self->getRandomRange(self->maxClearSize); - self->changeCount.insert(range, changeNum++); - bool noConflict = deterministicRandom()->random01() < 0.5; - //TraceEvent("WDRClearRange").detail("Begin", printable(range)).detail("NoConflict", noConflict); - if (noConflict) - tr.setOption(FDBTransactionOptions::NEXT_WRITE_NO_WRITE_CONFLICT_RANGE); - tr.clear(range); - if (!noConflict) { - KeyRangeRef conflict( - range.begin.substr(0, - std::min(range.begin.size(), - (range.begin.startsWith(systemKeys.begin) - ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT - : CLIENT_KNOBS->KEY_SIZE_LIMIT) + - 1)), - range.end.substr(0, - std::min(range.end.size(), - (range.end.startsWith(systemKeys.begin) - ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT - : CLIENT_KNOBS->KEY_SIZE_LIMIT) + - 1))); - self->addedConflicts.insert(conflict, true); - } - self->memoryDatabase.erase(self->memoryDatabase.lower_bound(range.begin), - self->memoryDatabase.lower_bound(range.end)); - } else if (operationType == 5 && !disableClear) { - Key key = self->getRandomKey(); - self->changeCount.insert(key, changeNum++); - bool noConflict = deterministicRandom()->random01() < 0.5; - //TraceEvent("WDRClear").detail("Key", printable(key)).detail("NoConflict", noConflict); - if (noConflict) - tr.setOption(FDBTransactionOptions::NEXT_WRITE_NO_WRITE_CONFLICT_RANGE); - tr.clear(key); - if (!noConflict && - key.size() <= (key.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT - : CLIENT_KNOBS->KEY_SIZE_LIMIT)) { - self->addedConflicts.insert(key, true); - } - self->memoryDatabase.erase(key); - } else if (operationType == 6 && !disableWatch) { - watches.push_back(self->watchAndCompare( - &tr, self->getRandomKey(), readYourWritesDisabled, self, &doingCommit, &memLimit)); - } else if (operationType == 7 && !disableWriteConflictRange) { - KeyRange range = self->getRandomRange(self->nodes); - //TraceEvent("WDRAddWriteConflict").detail("Range", printable(range)); - tr.addWriteConflictRange(range); - KeyRangeRef conflict( - range.begin.substr(0, - std::min(range.begin.size(), - (range.begin.startsWith(systemKeys.begin) - ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT - : CLIENT_KNOBS->KEY_SIZE_LIMIT) + - 1)), - range.end.substr(0, - std::min(range.end.size(), - (range.end.startsWith(systemKeys.begin) - ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT - : CLIENT_KNOBS->KEY_SIZE_LIMIT) + - 1))); - self->addedConflicts.insert(conflict, true); - } else if (operationType == 8 && !disableDelay) { - double maxTime = 6.0; - if (timebomb > 0) - maxTime = startTime + timebomb / 1000.0 - now(); - operations.push_back( - delay(deterministicRandom()->random01() * deterministicRandom()->random01() * - deterministicRandom()->random01() * maxTime)); - } else if (operationType == 9 && !disableReset) { - if (deterministicRandom()->random01() < 0.001) { - //TraceEvent("WDRReset"); - tr.reset(); - self->memoryDatabase = self->lastCommittedDatabase; - self->addedConflicts.insert(allKeys, false); - if (readYourWritesDisabled) - tr.setOption(FDBTransactionOptions::READ_YOUR_WRITES_DISABLE); - if (snapshotRYWDisabled) - tr.setOption(FDBTransactionOptions::SNAPSHOT_RYW_DISABLE); - if (readAheadDisabled) - tr.setOption(FDBTransactionOptions::READ_AHEAD_DISABLE); - if (self->useSystemKeys) - tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); - tr.addWriteConflictRange(self->conflictRange); - self->addedConflicts.insert(self->conflictRange, true); - startTime = now(); - tr.setOption(FDBTransactionOptions::TIMEOUT, timebombStr); - } - } else if (operationType == 10 && !disableReadConflictRange) { - KeyRange range = self->getRandomRange(self->maxClearSize); - tr.addReadConflictRange(range); - } else if (operationType == 11 && !disableAtomicOp) { - if (!self->useSystemKeys && deterministicRandom()->random01() < 0.01) { - Key versionStampKey = self->getRandomVersionStampKey(); - Value value = self->getRandomValue(); - KeyRangeRef range = getVersionstampKeyRange(versionStampKey.arena(), - versionStampKey, - tr.getCachedReadVersion().orDefault(0), - normalKeys.end); - self->changeCount.insert(range, changeNum++); - //TraceEvent("WDRVersionStamp").detail("VersionStampKey", printable(versionStampKey)).detail("Range", printable(range)); - tr.atomicOp(versionStampKey, value, MutationRef::SetVersionstampedKey); - tr.clear(range); - KeyRangeRef conflict( - range.begin.substr(0, - std::min(range.begin.size(), - (range.begin.startsWith(systemKeys.begin) - ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT - : CLIENT_KNOBS->KEY_SIZE_LIMIT) + - 1)), - range.end.substr(0, - std::min(range.end.size(), - (range.end.startsWith(systemKeys.begin) - ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT - : CLIENT_KNOBS->KEY_SIZE_LIMIT) + - 1))); - self->addedConflicts.insert(conflict, true); - self->memoryDatabase.erase(self->memoryDatabase.lower_bound(range.begin), - self->memoryDatabase.lower_bound(range.end)); - } else { - Key key = self->getRandomKey(); - Value value = self->getRandomValue(); - MutationRef::Type opType; - switch (deterministicRandom()->randomInt(0, 8)) { - case 0: - opType = MutationRef::AddValue; - break; - case 1: - opType = MutationRef::And; - break; - case 2: - opType = MutationRef::Or; - break; - case 3: - opType = MutationRef::Xor; - break; - case 4: - opType = MutationRef::Max; - break; - case 5: - opType = MutationRef::Min; - break; - case 6: - opType = MutationRef::ByteMin; - break; - case 7: - opType = MutationRef::ByteMax; - break; + state int64_t memLimit = 1e8; + state bool cancelled = false; + if (readYourWritesDisabled) + tr.setOption(FDBTransactionOptions::READ_YOUR_WRITES_DISABLE); + if (snapshotRYWDisabled) + tr.setOption(FDBTransactionOptions::SNAPSHOT_RYW_DISABLE); + if (readAheadDisabled) + tr.setOption(FDBTransactionOptions::READ_AHEAD_DISABLE); + if (useBatchPriority) + tr.setOption(FDBTransactionOptions::PRIORITY_BATCH); + if (self->useSystemKeys) + tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + tr.setOption(FDBTransactionOptions::TIMEOUT, timebombStr); + tr.addWriteConflictRange(self->conflictRange); + self->addedConflicts.insert(self->conflictRange, true); + try { + state int numWaits = deterministicRandom()->randomInt(1, 5); + state int i = 0; + for (; i < numWaits && memLimit > 0; i++) { + //TraceEvent("WDROps").detail("Count", i).detail("Max", numWaits).detail("ReadYourWritesDisabled",readYourWritesDisabled); + state int numOps = deterministicRandom()->randomInt(1, self->numOps); + state int j = 0; + for (; j < numOps && memLimit > 0; j++) { + if (commits.getResult().isError()) + throw commits.getResult().getError(); + try { + state int operationType = deterministicRandom()->randomInt(0, 21); + if (operationType == 0 && !disableGetKey) { + operations.push_back( + self->getKeyAndCompare(&tr, + self->getRandomKeySelector(), + Snapshot{ deterministicRandom()->coinflip() }, + readYourWritesDisabled, + snapshotRYWDisabled, + self, + &doingCommit, + &memLimit)); + } else if (operationType == 1 && !disableGetRange) { + operations.push_back( + self->getRangeAndCompare(&tr, + self->getRandomKeySelector(), + self->getRandomKeySelector(), + self->getRandomLimits(), + Snapshot{ deterministicRandom()->coinflip() }, + Reverse{ deterministicRandom()->coinflip() }, + readYourWritesDisabled, + snapshotRYWDisabled, + self, + &doingCommit, + &memLimit)); + } else if (operationType == 2 && !disableGet) { + operations.push_back(self->getAndCompare(&tr, + self->getRandomKey(), + Snapshot{ deterministicRandom()->coinflip() }, + readYourWritesDisabled, + snapshotRYWDisabled, + self, + &doingCommit, + &memLimit)); + } else if (operationType == 3 && !disableCommit) { + if (!self->rarelyCommit || deterministicRandom()->random01() < 1.0 / self->numOps) { + Future commit = self->commitAndUpdateMemory(&tr, + self, + &cancelled, + readYourWritesDisabled, + snapshotRYWDisabled, + readAheadDisabled, + useBatchPriority, + &doingCommit, + &startTime, + timebombStr); + operations.push_back(commit); + commits.add(commit); } - self->changeCount.insert(key, changeNum++); + } else if (operationType == 4 && !disableClearRange) { + KeyRange range = self->getRandomRange(self->maxClearSize); + self->changeCount.insert(range, changeNum++); bool noConflict = deterministicRandom()->random01() < 0.5; - //TraceEvent("WDRAtomicOp").detail("Key", printable(key)).detail("Value", value.size()).detail("NoConflict", noConflict); + //TraceEvent("WDRClearRange").detail("Begin", printable(range)).detail("NoConflict", noConflict); if (noConflict) tr.setOption(FDBTransactionOptions::NEXT_WRITE_NO_WRITE_CONFLICT_RANGE); - tr.atomicOp(key, value, opType); - //TraceEvent("WDRAtomicOpSuccess").detail("Key", printable(key)).detail("Value", value.size()); + tr.clear(range); + if (!noConflict) { + KeyRangeRef conflict( + range.begin.substr(0, + std::min(range.begin.size(), + (range.begin.startsWith(systemKeys.begin) + ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT + : CLIENT_KNOBS->KEY_SIZE_LIMIT) + + 1)), + range.end.substr(0, + std::min(range.end.size(), + (range.end.startsWith(systemKeys.begin) + ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT + : CLIENT_KNOBS->KEY_SIZE_LIMIT) + + 1))); + self->addedConflicts.insert(conflict, true); + } + self->memoryDatabase.erase(self->memoryDatabase.lower_bound(range.begin), + self->memoryDatabase.lower_bound(range.end)); + } else if (operationType == 5 && !disableClear) { + Key key = self->getRandomKey(); + self->changeCount.insert(key, changeNum++); + bool noConflict = deterministicRandom()->random01() < 0.5; + //TraceEvent("WDRClear").detail("Key", printable(key)).detail("NoConflict", noConflict); + if (noConflict) + tr.setOption(FDBTransactionOptions::NEXT_WRITE_NO_WRITE_CONFLICT_RANGE); + tr.clear(key); + if (!noConflict && key.size() <= (key.startsWith(systemKeys.begin) + ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT + : CLIENT_KNOBS->KEY_SIZE_LIMIT)) { + self->addedConflicts.insert(key, true); + } + self->memoryDatabase.erase(key); + } else if (operationType == 6 && !disableWatch) { + watches.push_back(self->watchAndCompare( + &tr, self->getRandomKey(), readYourWritesDisabled, self, &doingCommit, &memLimit)); + } else if (operationType == 7 && !disableWriteConflictRange) { + KeyRange range = self->getRandomRange(self->nodes); + //TraceEvent("WDRAddWriteConflict").detail("Range", printable(range)); + tr.addWriteConflictRange(range); + KeyRangeRef conflict( + range.begin.substr(0, + std::min(range.begin.size(), + (range.begin.startsWith(systemKeys.begin) + ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT + : CLIENT_KNOBS->KEY_SIZE_LIMIT) + + 1)), + range.end.substr(0, + std::min(range.end.size(), + (range.end.startsWith(systemKeys.begin) + ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT + : CLIENT_KNOBS->KEY_SIZE_LIMIT) + + 1))); + self->addedConflicts.insert(conflict, true); + } else if (operationType == 8 && !disableDelay) { + double maxTime = 6.0; + if (timebomb > 0) + maxTime = startTime + timebomb / 1000.0 - now(); + operations.push_back( + delay(deterministicRandom()->random01() * deterministicRandom()->random01() * + deterministicRandom()->random01() * maxTime)); + } else if (operationType == 9 && !disableReset) { + if (deterministicRandom()->random01() < 0.001) { + //TraceEvent("WDRReset"); + tr.reset(); + self->memoryDatabase = self->lastCommittedDatabase; + self->addedConflicts.insert(allKeys, false); + if (readYourWritesDisabled) + tr.setOption(FDBTransactionOptions::READ_YOUR_WRITES_DISABLE); + if (snapshotRYWDisabled) + tr.setOption(FDBTransactionOptions::SNAPSHOT_RYW_DISABLE); + if (readAheadDisabled) + tr.setOption(FDBTransactionOptions::READ_AHEAD_DISABLE); + if (self->useSystemKeys) + tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + tr.addWriteConflictRange(self->conflictRange); + self->addedConflicts.insert(self->conflictRange, true); + startTime = now(); + tr.setOption(FDBTransactionOptions::TIMEOUT, timebombStr); + } + } else if (operationType == 10 && !disableReadConflictRange) { + KeyRange range = self->getRandomRange(self->maxClearSize); + tr.addReadConflictRange(range); + } else if (operationType == 11 && !disableAtomicOp) { + if (!self->useSystemKeys && deterministicRandom()->random01() < 0.01) { + Key versionStampKey = self->getRandomVersionStampKey(); + Value value = self->getRandomValue(); + KeyRangeRef range = getVersionstampKeyRange(versionStampKey.arena(), + versionStampKey, + tr.getCachedReadVersion().orDefault(0), + normalKeys.end); + self->changeCount.insert(range, changeNum++); + //TraceEvent("WDRVersionStamp").detail("VersionStampKey", printable(versionStampKey)).detail("Range", printable(range)); + tr.atomicOp(versionStampKey, value, MutationRef::SetVersionstampedKey); + tr.clear(range); + KeyRangeRef conflict( + range.begin.substr(0, + std::min(range.begin.size(), + (range.begin.startsWith(systemKeys.begin) + ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT + : CLIENT_KNOBS->KEY_SIZE_LIMIT) + + 1)), + range.end.substr(0, + std::min(range.end.size(), + (range.end.startsWith(systemKeys.begin) + ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT + : CLIENT_KNOBS->KEY_SIZE_LIMIT) + + 1))); + self->addedConflicts.insert(conflict, true); + self->memoryDatabase.erase(self->memoryDatabase.lower_bound(range.begin), + self->memoryDatabase.lower_bound(range.end)); + } else { + Key key = self->getRandomKey(); + Value value = self->getRandomValue(); + MutationRef::Type opType; + switch (deterministicRandom()->randomInt(0, 8)) { + case 0: + opType = MutationRef::AddValue; + break; + case 1: + opType = MutationRef::And; + break; + case 2: + opType = MutationRef::Or; + break; + case 3: + opType = MutationRef::Xor; + break; + case 4: + opType = MutationRef::Max; + break; + case 5: + opType = MutationRef::Min; + break; + case 6: + opType = MutationRef::ByteMin; + break; + case 7: + opType = MutationRef::ByteMax; + break; + } + self->changeCount.insert(key, changeNum++); + bool noConflict = deterministicRandom()->random01() < 0.5; + //TraceEvent("WDRAtomicOp").detail("Key", printable(key)).detail("Value", value.size()).detail("NoConflict", noConflict); + if (noConflict) + tr.setOption(FDBTransactionOptions::NEXT_WRITE_NO_WRITE_CONFLICT_RANGE); + tr.atomicOp(key, value, opType); + //TraceEvent("WDRAtomicOpSuccess").detail("Key", printable(key)).detail("Value", value.size()); + if (!noConflict && key.size() <= (key.startsWith(systemKeys.begin) + ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT + : CLIENT_KNOBS->KEY_SIZE_LIMIT)) + self->addedConflicts.insert(key, true); + Optional existing = self->memoryGet(&self->memoryDatabase, key); + self->memoryDatabase[key] = + self->applyAtomicOp(existing.present() ? Optional(existing.get()) + : Optional(), + value, + opType); + } + } else if (operationType > 11 && !disableSet) { + Key key = self->getRandomKey(); + Value value = self->getRandomValue(); + self->changeCount.insert(key, changeNum++); + bool noConflict = deterministicRandom()->random01() < 0.5; + //TraceEvent("WDRSet").detail("Key", printable(key)).detail("Value", value.size()).detail("NoConflict", noConflict); + if (noConflict) + tr.setOption(FDBTransactionOptions::NEXT_WRITE_NO_WRITE_CONFLICT_RANGE); + tr.set(key, value); if (!noConflict && key.size() <= (key.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT)) self->addedConflicts.insert(key, true); - Optional existing = self->memoryGet(&self->memoryDatabase, key); - self->memoryDatabase[key] = self->applyAtomicOp( - existing.present() ? Optional(existing.get()) : Optional(), - value, - opType); + //TraceEvent("WDRSetSuccess").detail("Key", printable(key)).detail("Value", value.size()); + self->memoryDatabase[key] = value; + } + } catch (Error& e) { + if (e.code() == error_code_used_during_commit) { + ASSERT(doingCommit); + } else if (e.code() != error_code_transaction_cancelled) { + throw; } - } else if (operationType > 11 && !disableSet) { - Key key = self->getRandomKey(); - Value value = self->getRandomValue(); - self->changeCount.insert(key, changeNum++); - bool noConflict = deterministicRandom()->random01() < 0.5; - //TraceEvent("WDRSet").detail("Key", printable(key)).detail("Value", value.size()).detail("NoConflict", noConflict); - if (noConflict) - tr.setOption(FDBTransactionOptions::NEXT_WRITE_NO_WRITE_CONFLICT_RANGE); - tr.set(key, value); - if (!noConflict && - key.size() <= (key.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT - : CLIENT_KNOBS->KEY_SIZE_LIMIT)) - self->addedConflicts.insert(key, true); - //TraceEvent("WDRSetSuccess").detail("Key", printable(key)).detail("Value", value.size()); - self->memoryDatabase[key] = value; - } - } catch (Error& e) { - if (e.code() == error_code_used_during_commit) { - ASSERT(doingCommit); - } else if (e.code() != error_code_transaction_cancelled) { - throw; } } - } - if (waitLocation < operations.size()) { - int waitOp = deterministicRandom()->randomInt(waitLocation, operations.size()); - //TraceEvent("WDRWait").detail("Op", waitOp).detail("Operations", operations.size()).detail("WaitLocation", waitLocation); - wait(operations[waitOp]); - wait(delay(0.000001)); // to ensure errors have propgated from reads to commits - waitLocation = operations.size(); + if (waitLocation < operations.size()) { + int waitOp = deterministicRandom()->randomInt(waitLocation, operations.size()); + //TraceEvent("WDRWait").detail("Op", waitOp).detail("Operations", operations.size()).detail("WaitLocation", waitLocation); + wait(operations[waitOp]); + wait(delay(0.000001)); // to ensure errors have propgated from reads to commits + waitLocation = operations.size(); + } + } + wait(waitForAll(operations)); + ASSERT(timebomb == 0 || 1000 * (now() - startTime) <= timebomb + 1); + wait(tr.debug_onIdle()); + wait(delay(0.000001)); // to ensure triggered watches have a change to register + self->finished.trigger(); + wait(waitForAll(watches)); // only for errors, should have all returned + self->changeCount.insert(allKeys, 0); + break; + } catch (Error& e) { + operations.clear(); + commits.clear(false); + waitLocation = 0; + watches.clear(); + self->changeCount.insert(allKeys, 0); + doingCommit = false; + //TraceEvent("WDRError").errorUnsuppressed(e); + if (e.code() == error_code_database_locked) { + self->memoryDatabase = self->lastCommittedDatabase; + self->addedConflicts.insert(allKeys, false); + return Void(); + } + if (e.code() == error_code_not_committed || e.code() == error_code_commit_unknown_result || + e.code() == error_code_transaction_too_large || e.code() == error_code_key_too_large || + e.code() == error_code_value_too_large || e.code() == error_code_too_many_watches || cancelled) + throw not_committed(); + try { + wait(tr.onError(e)); + } catch (Error& e) { + if (e.code() == error_code_transaction_timed_out) { + ASSERT(timebomb != 0 && 1000 * (now() - startTime) >= timebomb - 1); + throw not_committed(); + } + throw e; } - } - wait(waitForAll(operations)); - ASSERT(timebomb == 0 || 1000 * (now() - startTime) <= timebomb + 1); - wait(tr.debug_onIdle()); - wait(delay(0.000001)); // to ensure triggered watches have a change to register - self->finished.trigger(); - wait(waitForAll(watches)); // only for errors, should have all returned - self->changeCount.insert(allKeys, 0); - break; - } catch (Error& e) { - operations.clear(); - commits.clear(false); - waitLocation = 0; - watches.clear(); - self->changeCount.insert(allKeys, 0); - doingCommit = false; - //TraceEvent("WDRError").errorUnsuppressed(e); - if (e.code() == error_code_database_locked) { self->memoryDatabase = self->lastCommittedDatabase; self->addedConflicts.insert(allKeys, false); - return Void(); } - if (e.code() == error_code_not_committed || e.code() == error_code_commit_unknown_result || - e.code() == error_code_transaction_too_large || e.code() == error_code_key_too_large || - e.code() == error_code_value_too_large || e.code() == error_code_too_many_watches || cancelled) - throw not_committed(); - try { - wait(tr.onError(e)); - } catch (Error& e) { - if (e.code() == error_code_transaction_timed_out) { - ASSERT(timebomb != 0 && 1000 * (now() - startTime) >= timebomb - 1); - throw not_committed(); - } - throw e; - } - self->memoryDatabase = self->lastCommittedDatabase; - self->addedConflicts.insert(allKeys, false); } + self->memoryDatabase = self->lastCommittedDatabase; + self->addedConflicts.insert(allKeys, false); + return Void(); } - self->memoryDatabase = self->lastCommittedDatabase; - self->addedConflicts.insert(allKeys, false); - return Void(); -} -} -; +}; WorkloadFactory WriteDuringReadWorkloadFactory("WriteDuringRead");