Merge pull request #2835 from bnamasivayam/revert-report-conflicting-keys

Revert report conflicting keys
This commit is contained in:
Meng Xu 2020-03-20 10:33:26 -07:00 committed by GitHub
commit 980037f3a8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 26 additions and 443 deletions

View File

@ -1604,7 +1604,6 @@ struct UnitTestsFunc : InstructionFunc {
tr->setOption(FDBTransactionOption::FDB_TR_OPTION_READ_LOCK_AWARE);
tr->setOption(FDBTransactionOption::FDB_TR_OPTION_LOCK_AWARE);
tr->setOption(FDBTransactionOption::FDB_TR_OPTION_INCLUDE_PORT_IN_ADDRESS);
tr->setOption(FDBTransactionOption::FDB_TR_OPTION_REPORT_CONFLICTING_KEYS);
Optional<FDBStandalone<ValueRef> > _ = wait(tr->get(LiteralStringRef("\xff")));
tr->cancel();

View File

@ -150,23 +150,21 @@ static inline bool isNonAssociativeOp(MutationRef::Type mutationType) {
}
struct CommitTransactionRef {
CommitTransactionRef() : read_snapshot(0), report_conflicting_keys(false) {}
CommitTransactionRef() : read_snapshot(0) {}
CommitTransactionRef(Arena &a, const CommitTransactionRef &from)
: read_conflict_ranges(a, from.read_conflict_ranges),
write_conflict_ranges(a, from.write_conflict_ranges),
mutations(a, from.mutations),
read_snapshot(from.read_snapshot),
report_conflicting_keys(from.report_conflicting_keys) {
read_snapshot(from.read_snapshot) {
}
VectorRef< KeyRangeRef > read_conflict_ranges;
VectorRef< KeyRangeRef > write_conflict_ranges;
VectorRef< MutationRef > mutations;
Version read_snapshot;
bool report_conflicting_keys;
template <class Ar>
force_inline void serialize( Ar& ar ) {
serializer(ar, read_conflict_ranges, write_conflict_ranges, mutations, read_snapshot, report_conflicting_keys);
serializer(ar, read_conflict_ranges, write_conflict_ranges, mutations, read_snapshot);
}
// Convenience for internal code required to manipulate these without the Native API

View File

@ -150,8 +150,7 @@ ACTOR Future<Void> krmSetRange( Reference<ReadYourWritesTransaction> tr, Key map
//Sets a range of keys in a key range map, coalescing with adjacent regions if the values match
//Ranges outside of maxRange will not be coalesced
//CAUTION: use care when attempting to coalesce multiple ranges in the same prefix in a single transaction
ACTOR template <class Transaction>
static Future<Void> krmSetRangeCoalescing_( Transaction *tr, Key mapPrefix, KeyRange range, KeyRange maxRange, Value value ) {
ACTOR Future<Void> krmSetRangeCoalescing( Transaction *tr, Key mapPrefix, KeyRange range, KeyRange maxRange, Value value ) {
ASSERT(maxRange.contains(range));
state KeyRange withPrefix = KeyRangeRef( mapPrefix.toString() + range.begin.toString(), mapPrefix.toString() + range.end.toString() );
@ -217,11 +216,3 @@ static Future<Void> krmSetRangeCoalescing_( Transaction *tr, Key mapPrefix, KeyR
return Void();
}
Future<Void> krmSetRangeCoalescing(Transaction* const& tr, Key const& mapPrefix, KeyRange const& range,
KeyRange const& maxRange, Value const& value) {
return krmSetRangeCoalescing_(tr, mapPrefix, range, maxRange, value);
}
Future<Void> krmSetRangeCoalescing(Reference<ReadYourWritesTransaction> const& tr, Key const& mapPrefix,
KeyRange const& range, KeyRange const& maxRange, Value const& value) {
return holdWhile(tr, krmSetRangeCoalescing_(tr.getPtr(), mapPrefix, range, maxRange, value));
}

View File

@ -103,7 +103,6 @@ void krmSetPreviouslyEmptyRange( struct CommitTransactionRef& tr, Arena& trArena
Future<Void> krmSetRange( Transaction* const& tr, Key const& mapPrefix, KeyRange const& range, Value const& value );
Future<Void> krmSetRange( Reference<ReadYourWritesTransaction> const& tr, Key const& mapPrefix, KeyRange const& range, Value const& value );
Future<Void> krmSetRangeCoalescing( Transaction* const& tr, Key const& mapPrefix, KeyRange const& range, KeyRange const& maxRange, Value const& value );
Future<Void> krmSetRangeCoalescing( Reference<ReadYourWritesTransaction> const& tr, Key const& mapPrefix, KeyRange const& range, KeyRange const& maxRange, Value const& value );
Standalone<RangeResultRef> krmDecodeRanges( KeyRef mapPrefix, KeyRange keys, Standalone<RangeResultRef> kv );
template <class Val, class Metric, class MetricFunc>

View File

@ -105,15 +105,14 @@ struct CommitID {
Version version; // returns invalidVersion if transaction conflicts
uint16_t txnBatchId;
Optional<Value> metadataVersion;
Optional<Standalone<VectorRef<int>>> conflictingKRIndices;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, version, txnBatchId, metadataVersion, conflictingKRIndices);
serializer(ar, version, txnBatchId, metadataVersion);
}
CommitID() : version(invalidVersion), txnBatchId(0) {}
CommitID( Version version, uint16_t txnBatchId, const Optional<Value>& metadataVersion, const Optional<Standalone<VectorRef<int>>>& conflictingKRIndices = Optional<Standalone<VectorRef<int>>>() ) : version(version), txnBatchId(txnBatchId), metadataVersion(metadataVersion), conflictingKRIndices(conflictingKRIndices) {}
CommitID( Version version, uint16_t txnBatchId, const Optional<Value>& metadataVersion ) : version(version), txnBatchId(txnBatchId), metadataVersion(metadataVersion) {}
};
struct CommitTransactionRequest : TimedRequest {
@ -138,7 +137,6 @@ struct CommitTransactionRequest : TimedRequest {
void serialize(Ar& ar) {
serializer(ar, transaction, reply, arena, flags, debugID);
}
};
static inline int getBytes( CommitTransactionRequest const& r ) {

View File

@ -21,7 +21,6 @@
#include "fdbclient/NativeAPI.actor.h"
#include <iterator>
#include <unordered_set>
#include "fdbclient/Atomic.h"
#include "fdbclient/ClusterInterface.h"
@ -33,7 +32,6 @@
#include "fdbclient/MasterProxyInterface.h"
#include "fdbclient/MonitorLeader.h"
#include "fdbclient/MutationList.h"
#include "fdbclient/ReadYourWrites.h"
#include "fdbclient/StorageServerInterface.h"
#include "fdbclient/SystemData.h"
#include "fdbrpc/LoadBalance.h"
@ -2744,41 +2742,6 @@ ACTOR static Future<Void> tryCommit( Database cx, Reference<TransactionLogInfo>
trLogInfo->addLog(FdbClientLogEvents::EventCommit(startTime, latency, req.transaction.mutations.size(), req.transaction.mutations.expectedSize(), req));
return Void();
} else {
// clear the RYW transaction which contains previous conflicting keys
tr->info.conflictingKeysRYW.reset();
if (ci.conflictingKRIndices.present()){
// In general, if we want to use getRange to expose conflicting keys,
// we need to support all the parameters getRange provides.
// It is difficult to take care of all corner cases of what getRange does.
// Consequently, we use a hack way here to achieve it.
// We create an empty RYWTransaction and write all conflicting key/values to it.
// Since it is RYWTr, we can call getRange on it with same parameters given to the original getRange.
tr->info.conflictingKeysRYW = std::make_shared<ReadYourWritesTransaction>(tr->getDatabase());
state Reference<ReadYourWritesTransaction> hackTr =
Reference<ReadYourWritesTransaction>(tr->info.conflictingKeysRYW.get());
try {
state Standalone<VectorRef<int>> conflictingKRIndices = ci.conflictingKRIndices.get();
// To make the getRange call local, we need to explicitly set the read version here.
// This version number 100 set here does nothing but prevent getting read version from the proxy
tr->info.conflictingKeysRYW->setVersion(100);
// Clear the whole key space, thus, RYWTr knows to only read keys locally
tr->info.conflictingKeysRYW->clear(normalKeys);
// initialize value
tr->info.conflictingKeysRYW->set(conflictingKeysPrefix, conflictingKeysFalse);
// drop duplicate indices and merge overlapped ranges
// Note: addReadConflictRange in native transaction object does not merge overlapped ranges
state std::unordered_set<int> mergedIds(conflictingKRIndices.begin(), conflictingKRIndices.end());
for (auto const & rCRIndex : mergedIds) {
const KeyRange kr = req.transaction.read_conflict_ranges[rCRIndex];
wait(krmSetRangeCoalescing(hackTr, conflictingKeysPrefix, kr, allKeys, conflictingKeysTrue));
}
} catch (Error& e) {
hackTr.extractPtr(); // Make sure the RYW is not freed twice in case exception thrown
throw;
}
hackTr.extractPtr(); // Avoid the Reference to destroy the RYW object
}
if (info.debugID.present())
TraceEvent(interval.end()).detail("Conflict", 1);
@ -2891,9 +2854,6 @@ Future<Void> Transaction::commitMutations() {
if(options.firstInBatch) {
tr.flags = tr.flags | CommitTransactionRequest::FLAG_FIRST_IN_BATCH;
}
if(options.reportConflictingKeys) {
tr.transaction.report_conflicting_keys = true;
}
Future<Void> commitResult = tryCommit( cx, trLogInfo, tr, readVersion, info, &this->committedVersion, this, options );
@ -3084,11 +3044,6 @@ void Transaction::setOption( FDBTransactionOptions::Option option, Optional<Stri
validateOptionValue(value, false);
options.includePort = true;
break;
case FDBTransactionOptions::REPORT_CONFLICTING_KEYS:
validateOptionValue(value, false);
options.reportConflictingKeys = true;
break;
default:
break;

View File

@ -128,7 +128,6 @@ struct TransactionOptions {
bool readOnly : 1;
bool firstInBatch : 1;
bool includePort : 1;
bool reportConflictingKeys : 1;
TransactionOptions(Database const& cx);
TransactionOptions();
@ -136,14 +135,10 @@ struct TransactionOptions {
void reset(Database const& cx);
};
class ReadYourWritesTransaction; // workaround cyclic dependency
struct TransactionInfo {
Optional<UID> debugID;
TaskPriority taskID;
bool useProvisionalProxies;
// Used to save conflicting keys if FDBTransactionOptions::REPORT_CONFLICTING_KEYS is enabled
// shared_ptr used here since TransactionInfo is sometimes copied as function parameters.
std::shared_ptr<ReadYourWritesTransaction> conflictingKeysRYW;
explicit TransactionInfo( TaskPriority taskID ) : taskID(taskID), useProvisionalProxies(false) {}
};

View File

@ -1279,44 +1279,6 @@ Future< Standalone<RangeResultRef> > ReadYourWritesTransaction::getRange(
}
}
// Use special key prefix "\xff\xff/transaction/conflicting_keys/<some_key>",
// to retrieve keys which caused latest not_committed(conflicting with another transaction) error.
// The returned key value pairs are interpretted as :
// prefix/<key1> : '1' - any keys equal or larger than this key are (probably) conflicting keys
// prefix/<key2> : '0' - any keys equal or larger than this key are (definitely) not conflicting keys
// Currently, the conflicting keyranges returned are original read_conflict_ranges or union of them.
// TODO : This interface needs to be integrated into the framework that handles special keys' calls in the future
if (begin.getKey().startsWith(conflictingKeysAbsolutePrefix) && end.getKey().startsWith(conflictingKeysAbsolutePrefix)) {
// Remove the special key prefix "\xff\xff"
KeyRef beginConflictingKey = begin.getKey().removePrefix(specialKeys.begin);
KeyRef endConflictingKey = end.getKey().removePrefix(specialKeys.begin);
// Check if the conflicting key range to be read is valid
KeyRef maxKey = getMaxReadKey();
if(beginConflictingKey > maxKey || endConflictingKey > maxKey)
return key_outside_legal_range();
begin.setKey(beginConflictingKey);
end.setKey(endConflictingKey);
if (tr.info.conflictingKeysRYW) {
Future<Standalone<RangeResultRef>> resultWithoutPrefixFuture =
tr.info.conflictingKeysRYW->getRange(begin, end, limits, snapshot, reverse);
// Make sure it happens locally
ASSERT(resultWithoutPrefixFuture.isReady());
Standalone<RangeResultRef> resultWithoutPrefix = resultWithoutPrefixFuture.get();
// Add prefix to results, making keys consistent with the getRange query
Standalone<RangeResultRef> resultWithPrefix;
resultWithPrefix.reserve(resultWithPrefix.arena(), resultWithoutPrefix.size());
for (auto const & kv : resultWithoutPrefix) {
KeyValueRef kvWithPrefix(kv.key.withPrefix(specialKeys.begin, resultWithPrefix.arena()), kv.value);
resultWithPrefix.push_back(resultWithPrefix.arena(), kvWithPrefix);
}
return resultWithPrefix;
} else {
return Standalone<RangeResultRef>();
}
}
if(checkUsedDuringCommit()) {
return used_during_commit();
}

View File

@ -28,7 +28,6 @@ const KeyRangeRef systemKeys(systemKeysPrefix, LiteralStringRef("\xff\xff") );
const KeyRangeRef nonMetadataSystemKeys(LiteralStringRef("\xff\x02"), LiteralStringRef("\xff\x03"));
const KeyRangeRef allKeys = KeyRangeRef(normalKeys.begin, systemKeys.end);
const KeyRef afterAllKeys = LiteralStringRef("\xff\xff\x00");
const KeyRangeRef specialKeys = KeyRangeRef(LiteralStringRef("\xff\xff"), LiteralStringRef("\xff\xff\xff\xff"));
// keyServersKeys.contains(k) iff k.startsWith(keyServersPrefix)
const KeyRangeRef keyServersKeys( LiteralStringRef("\xff/keyServers/"), LiteralStringRef("\xff/keyServers0") );
@ -59,11 +58,6 @@ void decodeKeyServersValue( const ValueRef& value, vector<UID>& src, vector<UID>
}
}
const KeyRef conflictingKeysPrefix = LiteralStringRef("/transaction/conflicting_keys/");
const Key conflictingKeysAbsolutePrefix = conflictingKeysPrefix.withPrefix(specialKeys.begin);
const ValueRef conflictingKeysTrue = LiteralStringRef("1");
const ValueRef conflictingKeysFalse = LiteralStringRef("0");
// "\xff/storageCache/[[begin]]" := "[[vector<uint16_t>]]"
const KeyRangeRef storageCacheKeys( LiteralStringRef("\xff/storageCache/"), LiteralStringRef("\xff/storageCache0") );
const KeyRef storageCachePrefix = storageCacheKeys.begin;

View File

@ -36,7 +36,6 @@ extern const KeyRangeRef normalKeys; // '' to systemKeys.begin
extern const KeyRangeRef systemKeys; // [FF] to [FF][FF]
extern const KeyRangeRef nonMetadataSystemKeys; // [FF][00] to [FF][01]
extern const KeyRangeRef allKeys; // '' to systemKeys.end
extern const KeyRangeRef specialKeys; // [FF][FF] to [FF][FF][FF][FF]
extern const KeyRef afterAllKeys;
// "\xff/keyServers/[[begin]]" := "[[vector<serverID>, vector<serverID>]]"
@ -65,10 +64,6 @@ const Key serverKeysPrefixFor( UID serverID );
UID serverKeysDecodeServer( const KeyRef& key );
bool serverHasKey( ValueRef storedValue );
extern const KeyRef conflictingKeysPrefix;
extern const Key conflictingKeysAbsolutePrefix;
extern const ValueRef conflictingKeysTrue, conflictingKeysFalse;
extern const KeyRef cacheKeysPrefix;
const Key cacheKeysKey( uint16_t idx, const KeyRef& key );

View File

@ -253,8 +253,6 @@ description is not currently required but encouraged.
hidden="true" />
<Option name="use_provisional_proxies" code="711"
description="This option should only be used by tools which change the database configuration." />
<Option name="report_conflicting_keys" code="712"
description="The transaction can retrieve keys that are conflicting with other transactions." />
</Scope>
<!-- The enumeration values matter - do not change them without

View File

@ -166,7 +166,6 @@ set(FDBSERVER_SRCS
workloads/RandomSelector.actor.cpp
workloads/ReadWrite.actor.cpp
workloads/RemoveServersSafely.actor.cpp
workloads/ReportConflictingKeys.actor.cpp
workloads/Rollback.actor.cpp
workloads/RyowCorrectness.actor.cpp
workloads/RYWDisable.actor.cpp

View File

@ -33,8 +33,7 @@ void clearConflictSet(ConflictSet*, Version);
void destroyConflictSet(ConflictSet*);
struct ConflictBatch {
explicit ConflictBatch(ConflictSet*, std::map<int, VectorRef<int>>* conflictingKeyRangeMap = nullptr,
Arena* resolveBatchReplyArena = nullptr);
explicit ConflictBatch(ConflictSet*);
~ConflictBatch();
enum TransactionCommitResult {
@ -56,8 +55,6 @@ private:
std::vector<std::pair<StringRef, StringRef>> combinedWriteConflictRanges;
std::vector<struct ReadConflictRange> combinedReadConflictRanges;
bool* transactionConflictStatus;
std::map<int, VectorRef<int>>* conflictingKeyRangeMap;
Arena* resolveBatchReplyArena;
void checkIntraBatchConflicts();
void combineWriteConflictRanges();

View File

@ -447,10 +447,8 @@ struct ResolutionRequestBuilder {
vector<int> resolversUsed;
for (int r = 0; r<outTr.size(); r++)
if (outTr[r]) {
if (outTr[r])
resolversUsed.push_back(r);
outTr[r]->report_conflicting_keys = trIn.report_conflicting_keys;
}
transactionResolverMap.push_back(std::move(resolversUsed));
}
};
@ -821,9 +819,7 @@ ACTOR Future<Void> commitBatch(
// Determine which transactions actually committed (conservatively) by combining results from the resolvers
state vector<uint8_t> committed(trs.size());
ASSERT(transactionResolverMap.size() == committed.size());
// For each commitTransactionRef, it is only sent to resolvers specified in transactionResolverMap
// Thus, we use this nextTr to track the correct transaction index on each resolver.
state vector<int> nextTr(resolution.size());
vector<int> nextTr(resolution.size());
for (int t = 0; t<trs.size(); t++) {
uint8_t commit = ConflictBatch::TransactionCommitted;
for (int r : transactionResolverMap[t])
@ -1155,8 +1151,6 @@ ACTOR Future<Void> commitBatch(
// Send replies to clients
double endTime = g_network->timer();
// Reset all to zero, used to track the correct index of each commitTransacitonRef on each resolver
std::fill(nextTr.begin(), nextTr.end(), 0);
for (int t = 0; t < trs.size(); t++) {
if (committed[t] == ConflictBatch::TransactionCommitted && (!locked || trs[t].isLockAware())) {
ASSERT_WE_THINK(commitVersion != invalidVersion);
@ -1166,26 +1160,9 @@ ACTOR Future<Void> commitBatch(
trs[t].reply.sendError(transaction_too_old());
}
else {
// If enable the option to report conflicting keys from resolvers, we send back all keyranges' indices through CommitID
if (trs[t].transaction.report_conflicting_keys) {
Standalone<VectorRef<int>> conflictingKRIndices;
for (int resolverInd : transactionResolverMap[t]) {
auto const & cKRs = resolution[resolverInd].conflictingKeyRangeMap[nextTr[resolverInd]];
for (auto const & rCRIndex : cKRs)
conflictingKRIndices.push_back(conflictingKRIndices.arena(), rCRIndex);
}
// At least one keyRange index should be returned
ASSERT(conflictingKRIndices.size());
trs[t].reply.send(CommitID(invalidVersion, t, Optional<Value>(), Optional<Standalone<VectorRef<int>>>(conflictingKRIndices)));
} else {
trs[t].reply.sendError(not_committed());
}
trs[t].reply.sendError(not_committed());
}
// Update corresponding transaction indices on each resolver
for (int resolverInd : transactionResolverMap[t])
nextTr[resolverInd]++;
// TODO: filter if pipelined with large commit
if(self->latencyBandConfig.present()) {
bool filter = maxTransactionBytes > self->latencyBandConfig.get().commitConfig.maxCommitBytes.orDefault(std::numeric_limits<int>::max());

View File

@ -167,14 +167,12 @@ ACTOR Future<Void> resolveBatch(
if(req.debugID.present())
g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "Resolver.resolveBatch.AfterOrderer");
ResolveTransactionBatchReply &reply = proxyInfo.outstandingBatches[req.version];
vector<int> commitList;
vector<int> tooOldList;
// Detect conflicts
double expire = now() + SERVER_KNOBS->SAMPLE_EXPIRATION_TIME;
ConflictBatch conflictBatch(self->conflictSet, &reply.conflictingKeyRangeMap, &reply.arena);
ConflictBatch conflictBatch( self->conflictSet );
int keys = 0;
for(int t=0; t<req.transactions.size(); t++) {
conflictBatch.addTransaction( req.transactions[t] );
@ -191,6 +189,7 @@ ACTOR Future<Void> resolveBatch(
}
conflictBatch.detectConflicts( req.version, req.version - SERVER_KNOBS->MAX_WRITE_TRANSACTION_LIFE_VERSIONS, commitList, &tooOldList);
ResolveTransactionBatchReply &reply = proxyInfo.outstandingBatches[req.version];
reply.debugID = req.debugID;
reply.committed.resize( reply.arena, req.transactions.size() );
for(int c=0; c<commitList.size(); c++)

View File

@ -77,11 +77,10 @@ struct ResolveTransactionBatchReply {
VectorRef<uint8_t> committed;
Optional<UID> debugID;
VectorRef<VectorRef<StateTransactionRef>> stateMutations; // [version][transaction#] -> (committed, [mutation#])
std::map<int, VectorRef<int>> conflictingKeyRangeMap; // transaction index -> conflicting read_conflict_range ids given by the resolver
template <class Archive>
void serialize(Archive& ar) {
serializer(ar, committed, stateMutations, debugID, conflictingKeyRangeMap, arena);
serializer(ar, committed, stateMutations, arena, debugID);
}
};

View File

@ -70,14 +70,8 @@ struct ReadConflictRange {
StringRef begin, end;
Version version;
int transaction;
int indexInTx;
VectorRef<int>* conflictingKeyRange;
Arena* cKRArena;
ReadConflictRange(StringRef begin, StringRef end, Version version, int transaction, int indexInTx,
VectorRef<int>* cKR = nullptr, Arena* cKRArena = nullptr)
: begin(begin), end(end), version(version), transaction(transaction), indexInTx(indexInTx),
conflictingKeyRange(cKR), cKRArena(cKRArena) {}
ReadConflictRange(StringRef begin, StringRef end, Version version, int transaction)
: begin(begin), end(end), version(version), transaction(transaction) {}
bool operator<(const ReadConflictRange& rhs) const { return compare(begin, rhs.begin) < 0; }
};
@ -293,10 +287,10 @@ private:
int nPointers, valueLength;
};
static force_inline bool less(const uint8_t* a, int aLen, const uint8_t* b, int bLen) {
int c = memcmp(a, b, min(aLen, bLen));
if (c < 0) return true;
if (c > 0) return false;
static force_inline bool less( const uint8_t* a, int aLen, const uint8_t* b, int bLen ) {
int c = memcmp(a,b,min(aLen,bLen));
if (c<0) return true;
if (c>0) return false;
return aLen < bLen;
}
@ -421,8 +415,7 @@ public:
int started = min(M, count);
for (int i = 0; i < started; i++) {
inProgress[i].init(ranges[i], header, transactionConflictStatus, ranges[i].indexInTx,
ranges[i].conflictingKeyRange, ranges[i].cKRArena);
inProgress[i].init(ranges[i], header, transactionConflictStatus);
nextJob[i] = i + 1;
}
nextJob[started - 1] = 0;
@ -436,11 +429,8 @@ public:
if (prevJob == job) break;
nextJob[prevJob] = nextJob[job];
job = prevJob;
} else {
int temp = started++;
inProgress[job].init(ranges[temp], header, transactionConflictStatus, ranges[temp].indexInTx,
ranges[temp].conflictingKeyRange, ranges[temp].cKRArena);
}
} else
inProgress[job].init(ranges[started++], header, transactionConflictStatus);
}
prevJob = job;
job = nextJob[job];
@ -608,26 +598,18 @@ private:
Version version;
bool* result;
int state;
int indexInTx;
VectorRef<int>* conflictingKeyRange; // nullptr if report_conflicting_keys is not enabled.
Arena* cKRArena; // nullptr if report_conflicting_keys is not enabled.
void init(const ReadConflictRange& r, Node* header, bool* tCS, int indexInTx, VectorRef<int>* cKR,
Arena* cKRArena) {
void init(const ReadConflictRange& r, Node* header, bool* tCS) {
this->start.init(r.begin, header);
this->end.init(r.end, header);
this->version = r.version;
this->indexInTx = indexInTx;
this->cKRArena = cKRArena;
result = &tCS[r.transaction];
conflictingKeyRange = cKR;
this->state = 0;
}
bool noConflict() { return true; }
bool conflict() {
*result = true;
if (conflictingKeyRange != nullptr) conflictingKeyRange->push_back(*cKRArena, indexInTx);
return true;
}
@ -746,10 +728,7 @@ void destroyConflictSet(ConflictSet* cs) {
delete cs;
}
ConflictBatch::ConflictBatch(ConflictSet* cs, std::map<int, VectorRef<int>>* conflictingKeyRangeMap,
Arena* resolveBatchReplyArena)
: cs(cs), transactionCount(0), conflictingKeyRangeMap(conflictingKeyRangeMap),
resolveBatchReplyArena(resolveBatchReplyArena) {}
ConflictBatch::ConflictBatch(ConflictSet* cs) : cs(cs), transactionCount(0) {}
ConflictBatch::~ConflictBatch() {}
@ -757,7 +736,6 @@ struct TransactionInfo {
VectorRef<std::pair<int, int>> readRanges;
VectorRef<std::pair<int, int>> writeRanges;
bool tooOld;
bool reportConflictingKeys;
};
void ConflictBatch::addTransaction(const CommitTransactionRef& tr) {
@ -765,7 +743,6 @@ void ConflictBatch::addTransaction(const CommitTransactionRef& tr) {
Arena& arena = transactionInfo.arena();
TransactionInfo* info = new (arena) TransactionInfo;
info->reportConflictingKeys = tr.report_conflicting_keys;
if (tr.read_snapshot < cs->oldestVersion && tr.read_conflict_ranges.size()) {
info->tooOld = true;
@ -779,10 +756,7 @@ void ConflictBatch::addTransaction(const CommitTransactionRef& tr) {
const KeyRangeRef& range = tr.read_conflict_ranges[r];
points.emplace_back(range.begin, true, false, t, &info->readRanges[r].first);
points.emplace_back(range.end, false, false, t, &info->readRanges[r].second);
combinedReadConflictRanges.emplace_back(range.begin, range.end, tr.read_snapshot, t, r,
tr.report_conflicting_keys ? &(*conflictingKeyRangeMap)[t]
: nullptr,
tr.report_conflicting_keys ? resolveBatchReplyArena : nullptr);
combinedReadConflictRanges.emplace_back(range.begin, range.end, tr.read_snapshot, t);
}
for (int r = 0; r < tr.write_conflict_ranges.size(); r++) {
const KeyRangeRef& range = tr.write_conflict_ranges[r];
@ -819,15 +793,11 @@ void ConflictBatch::checkIntraBatchConflicts() {
const TransactionInfo& tr = *transactionInfo[t];
if (transactionConflictStatus[t]) continue;
bool conflict = tr.tooOld;
for (int i = 0; i < tr.readRanges.size(); i++) {
for (int i = 0; i < tr.readRanges.size(); i++)
if (mcs.any(tr.readRanges[i].first, tr.readRanges[i].second)) {
if (tr.reportConflictingKeys) {
(*conflictingKeyRangeMap)[t].push_back(*resolveBatchReplyArena, i);
}
conflict = true;
break;
}
}
transactionConflictStatus[t] = conflict;
if (!conflict)
for (int i = 0; i < tr.writeRanges.size(); i++) mcs.set(tr.writeRanges[i].first, tr.writeRanges[i].second);

View File

@ -171,7 +171,6 @@
<ActorCompiler Include="workloads\SnapTest.actor.cpp" />
<ActorCompiler Include="workloads\Mako.actor.cpp" />
<ActorCompiler Include="workloads\ExternalWorkload.actor.cpp" />
<ActorCompiler Include="workloads\ReportConflictingKeys.actor.cpp" />
</ItemGroup>
<ItemGroup>
<ClInclude Include="ApplyMetadataMutation.h" />

View File

@ -300,9 +300,6 @@
<ActorCompiler Include="workloads\Mako.actor.cpp">
<Filter>workloads</Filter>
</ActorCompiler>
<ActorCompiler Include="workloads\ReportConflictingKeys.actor.cpp">
<Filter>workloads</Filter>
</ActorCompiler>
<ActorCompiler Include="OldTLogServer.actor.cpp" />
</ItemGroup>
<ItemGroup>

View File

@ -1,238 +0,0 @@
/*
* ReportConflictingKeys.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/ReadYourWrites.h"
#include "fdbclient/SystemData.h"
#include "fdbserver/TesterInterface.actor.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "fdbserver/workloads/BulkSetup.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.
// For this test to report properly buggify must be disabled (flow.h) , and failConnection must be disabled in
// (sim2.actor.cpp)
struct ReportConflictingKeysWorkload : TestWorkload {
double testDuration, transactionsPerSecond, addReadConflictRangeProb, addWriteConflictRangeProb;
Key keyPrefix;
int nodeCount, actorCount, keyBytes, valueBytes, readConflictRangeCount, writeConflictRangeCount;
PerfIntCounter invalidReports, commits, conflicts, xacts;
ReportConflictingKeysWorkload(WorkloadContext const& wcx)
: TestWorkload(wcx), invalidReports("InvalidReports"), conflicts("Conflicts"), commits("Commits"),
xacts("Transactions") {
testDuration = getOption(options, LiteralStringRef("testDuration"), 10.0);
// transactionsPerSecond = getOption(options, LiteralStringRef("transactionsPerSecond"), 5000.0) / clientCount;
actorCount = getOption(options, LiteralStringRef("actorsPerClient"), 1);
keyPrefix = unprintable(
getOption(options, LiteralStringRef("keyPrefix"), LiteralStringRef("ReportConflictingKeysWorkload"))
.toString());
keyBytes = getOption(options, LiteralStringRef("keyBytes"), 16);
readConflictRangeCount = getOption(options, LiteralStringRef("readConflictRangeCountPerTx"), 1);
writeConflictRangeCount = getOption(options, LiteralStringRef("writeConflictRangeCountPerTx"), 1);
ASSERT(readConflictRangeCount >= 1 && writeConflictRangeCount >= 1);
// modeled by geometric distribution: (1 - prob) / prob = mean - 1, where we add at least one conflictRange to
// each tx
addReadConflictRangeProb = (readConflictRangeCount - 1.0) / readConflictRangeCount;
addWriteConflictRangeProb = (writeConflictRangeCount - 1.0) / writeConflictRangeCount;
ASSERT(keyPrefix.size() + 16 <= keyBytes); // make sure the string format is valid
nodeCount = getOption(options, LiteralStringRef("nodeCount"), 100);
}
std::string description() override { return "ReportConflictingKeysWorkload"; }
Future<Void> setup(Database const& cx) override { return Void(); }
Future<Void> start(const Database& cx) override { return _start(cx->clone(), this); }
ACTOR Future<Void> _start(Database cx, ReportConflictingKeysWorkload* self) {
if (self->clientId == 0) wait(timeout(self->conflictingClient(cx, self), self->testDuration, Void()));
return Void();
}
Future<bool> check(Database const& cx) override { return invalidReports.getValue() == 0; }
void getMetrics(vector<PerfMetric>& m) override {
m.push_back(PerfMetric("Measured Duration", testDuration, true));
m.push_back(xacts.getMetric());
m.push_back(PerfMetric("Transactions/sec", xacts.getValue() / testDuration, true));
m.push_back(commits.getMetric());
m.push_back(PerfMetric("Commits/sec", commits.getValue() / testDuration, true));
m.push_back(conflicts.getMetric());
m.push_back(PerfMetric("Conflicts/sec", conflicts.getValue() / testDuration, true));
}
// disable the default timeout setting
double getCheckTimeout() override { return std::numeric_limits<double>::max(); }
// Copied from tester.actor.cpp, added parameter to determine the key's length
Key keyForIndex(int n) {
double p = (double)n / nodeCount;
int paddingLen = keyBytes - 16 - keyPrefix.size();
// left padding by zero
return StringRef(format("%0*llx", paddingLen, *(uint64_t*)&p)).withPrefix(keyPrefix);
}
void addRandomReadConflictRange(ReadYourWritesTransaction* tr, std::vector<KeyRange>* readConflictRanges) {
int startIdx, endIdx;
Key startKey, endKey;
do { // add at least one
startIdx = deterministicRandom()->randomInt(0, nodeCount);
endIdx = deterministicRandom()->randomInt(startIdx, nodeCount + 1);
startKey = keyForIndex(startIdx);
endKey = keyForIndex(endIdx);
tr->addReadConflictRange(KeyRangeRef(startKey, endKey));
if (readConflictRanges) readConflictRanges->push_back(KeyRangeRef(startKey, endKey));
} while (deterministicRandom()->random01() < addReadConflictRangeProb);
}
void addRandomWriteConflictRange(ReadYourWritesTransaction* tr, std::vector<KeyRange>* writeConflictRanges) {
int startIdx, endIdx;
Key startKey, endKey;
do { // add at least one
startIdx = deterministicRandom()->randomInt(0, nodeCount);
endIdx = deterministicRandom()->randomInt(startIdx, nodeCount + 1);
startKey = keyForIndex(startIdx);
endKey = keyForIndex(endIdx);
tr->addWriteConflictRange(KeyRangeRef(startKey, endKey));
if (writeConflictRanges) writeConflictRanges->push_back(KeyRangeRef(startKey, endKey));
} while (deterministicRandom()->random01() < addWriteConflictRangeProb);
}
ACTOR Future<Void> conflictingClient(Database cx, ReportConflictingKeysWorkload* self) {
state ReadYourWritesTransaction tr1(cx);
state ReadYourWritesTransaction tr2(cx);
state std::vector<KeyRange> readConflictRanges;
state std::vector<KeyRange> writeConflictRanges;
loop {
try {
tr2.setOption(FDBTransactionOptions::REPORT_CONFLICTING_KEYS);
// If READ_YOUR_WRITES_DISABLE set, it behaves like native transaction object
// where overlapped conflict ranges are not merged.
if (deterministicRandom()->random01() < 0.5)
tr1.setOption(FDBTransactionOptions::READ_YOUR_WRITES_DISABLE);
if (deterministicRandom()->random01() < 0.5)
tr2.setOption(FDBTransactionOptions::READ_YOUR_WRITES_DISABLE);
// We have the two tx with same grv, then commit the first
// If the second one is not able to commit due to conflicts, verify the returned conflicting keys
// Otherwise, there is no conflicts between tr1's writeConflictRange and tr2's readConflictRange
Version readVersion = wait(tr1.getReadVersion());
tr2.setVersion(readVersion);
self->addRandomReadConflictRange(&tr1, nullptr);
self->addRandomWriteConflictRange(&tr1, &writeConflictRanges);
++self->commits;
wait(tr1.commit());
++self->xacts;
state bool foundConflict = false;
try {
self->addRandomReadConflictRange(&tr2, &readConflictRanges);
self->addRandomWriteConflictRange(&tr2, nullptr);
++self->commits;
wait(tr2.commit());
++self->xacts;
} catch (Error& e) {
if (e.code() != error_code_not_committed) throw e;
foundConflict = true;
++self->conflicts;
}
// check API correctness
if (foundConflict) {
// \xff\xff/transaction/conflicting_keys is always initialized to false, skip it here
state KeyRange ckr =
KeyRangeRef(keyAfter(LiteralStringRef("").withPrefix(conflictingKeysAbsolutePrefix)),
LiteralStringRef("\xff\xff").withPrefix(conflictingKeysAbsolutePrefix));
// The getRange here using the special key prefix "\xff\xff/transaction/conflicting_keys/" happens
// locally Thus, the error handling is not needed here
Future<Standalone<RangeResultRef>> conflictingKeyRangesFuture =
tr2.getRange(ckr, CLIENT_KNOBS->TOO_MANY);
ASSERT(conflictingKeyRangesFuture.isReady());
const Standalone<RangeResultRef> conflictingKeyRanges = conflictingKeyRangesFuture.get();
ASSERT(conflictingKeyRanges.size() &&
(conflictingKeyRanges.size() <= readConflictRanges.size() * 2));
ASSERT(conflictingKeyRanges.size() % 2 == 0);
ASSERT(!conflictingKeyRanges.more);
for (int i = 0; i < conflictingKeyRanges.size(); i += 2) {
KeyValueRef startKeyWithPrefix = conflictingKeyRanges[i];
ASSERT(startKeyWithPrefix.value == conflictingKeysTrue);
KeyValueRef endKeyWithPrefix = conflictingKeyRanges[i + 1];
ASSERT(endKeyWithPrefix.value == conflictingKeysFalse);
// Remove the prefix of returning keys
Key startKey = startKeyWithPrefix.key.removePrefix(conflictingKeysAbsolutePrefix);
Key endKey = endKeyWithPrefix.key.removePrefix(conflictingKeysAbsolutePrefix);
KeyRangeRef kr = KeyRangeRef(startKey, endKey);
if (!std::any_of(readConflictRanges.begin(), readConflictRanges.end(), [&kr](KeyRange rCR) {
// Read_conflict_range remains same in the resolver.
// Thus, the returned keyrange is either the original read_conflict_range or merged
// by several overlapped ones in either cases, it contains at least one original
// read_conflict_range
return kr.contains(rCR);
})) {
++self->invalidReports;
TraceEvent(SevError, "TestFailure")
.detail("Reason",
"Returned conflicting keys are not original or merged readConflictRanges");
} else if (!std::any_of(writeConflictRanges.begin(), writeConflictRanges.end(),
[&kr](KeyRange wCR) {
// Returned key range should be conflicting with at least one
// writeConflictRange
return kr.intersects(wCR);
})) {
++self->invalidReports;
TraceEvent(SevError, "TestFailure")
.detail("Reason", "Returned keyrange is not conflicting with any writeConflictRange");
}
}
} else {
// make sure no conflicts between tr2's readConflictRange and tr1's writeConflictRange
for (const KeyRange& rCR : readConflictRanges) {
if (std::any_of(writeConflictRanges.begin(), writeConflictRanges.end(), [&rCR](KeyRange wCR) {
bool result = wCR.intersects(rCR);
if (result)
TraceEvent(SevError, "TestFailure")
.detail("WriteConflictRange", wCR.toString())
.detail("ReadConflictRange", rCR.toString());
return result;
})) {
++self->invalidReports;
TraceEvent(SevError, "TestFailure").detail("Reason", "No conflicts returned but it should");
break;
}
}
}
} catch (Error& e) {
state Error e2 = e;
wait(tr1.onError(e2));
wait(tr2.onError(e2));
}
readConflictRanges.clear();
writeConflictRanges.clear();
tr1.reset();
tr2.reset();
}
}
};
WorkloadFactory<ReportConflictingKeysWorkload> ReportConflictingKeysWorkload("ReportConflictingKeys");