First version for reporting conflicting keys

This commit is contained in:
chaoguang 2019-10-10 15:42:52 -07:00
parent 8953b196de
commit edf52e8c97
14 changed files with 150 additions and 31 deletions

View File

@ -1584,6 +1584,7 @@ struct UnitTestsFunc : InstructionFunc {
data->db->setDatabaseOption(FDBDatabaseOption::FDB_DB_OPTION_TRANSACTION_RETRY_LIMIT, Optional<StringRef>(StringRef((const uint8_t*)&noRetryLimit, 8)));
data->db->setDatabaseOption(FDBDatabaseOption::FDB_DB_OPTION_TRANSACTION_CAUSAL_READ_RISKY);
data->db->setDatabaseOption(FDBDatabaseOption::FDB_DB_OPTION_TRANSACTION_INCLUDE_PORT_IN_ADDRESS);
data->db->setDatabaseOption(FDBDatabaseOption::FDB_DB_OPTION_TRANSACTION_REPORT_CONFLICTING_KEYS);
state Reference<Transaction> tr = data->db->createTransaction();
tr->setOption(FDBTransactionOption::FDB_TR_OPTION_PRIORITY_SYSTEM_IMMEDIATE);
@ -1603,6 +1604,7 @@ struct UnitTestsFunc : InstructionFunc {
tr->setOption(FDBTransactionOption::FDB_TR_OPTION_READ_LOCK_AWARE);
tr->setOption(FDBTransactionOption::FDB_TR_OPTION_LOCK_AWARE);
tr->setOption(FDBTransactionOption::FDB_TR_OPTION_INCLUDE_PORT_IN_ADDRESS);
tr->setOption(FDBTransactionOption::FDB_TR_OPTION_REPORT_CONFLICTING_KEYS);
Optional<FDBStandalone<ValueRef> > _ = wait(tr->get(LiteralStringRef("\xff")));
tr->cancel();

View File

@ -137,21 +137,23 @@ static inline bool isNonAssociativeOp(MutationRef::Type mutationType) {
}
struct CommitTransactionRef {
CommitTransactionRef() : read_snapshot(0) {}
CommitTransactionRef() : read_snapshot(0), report_conflicting_keys(false) {}
CommitTransactionRef(Arena &a, const CommitTransactionRef &from)
: read_conflict_ranges(a, from.read_conflict_ranges),
write_conflict_ranges(a, from.write_conflict_ranges),
mutations(a, from.mutations),
read_snapshot(from.read_snapshot) {
read_snapshot(from.read_snapshot),
report_conflicting_keys(from.report_conflicting_keys) {
}
VectorRef< KeyRangeRef > read_conflict_ranges;
VectorRef< KeyRangeRef > write_conflict_ranges;
VectorRef< MutationRef > mutations;
Version read_snapshot;
bool report_conflicting_keys;
template <class Ar>
force_inline void serialize( Ar& ar ) {
serializer(ar, read_conflict_ranges, write_conflict_ranges, mutations, read_snapshot);
serializer(ar, read_conflict_ranges, write_conflict_ranges, mutations, read_snapshot, report_conflicting_keys);
}
// Convenience for internal code required to manipulate these without the Native API
@ -161,6 +163,7 @@ struct CommitTransactionRef {
}
void clear( Arena& arena, KeyRangeRef const& keys ) {
// TODO: check do I need to clear flag here
mutations.push_back_deep(arena, MutationRef(MutationRef::ClearRange, keys.begin, keys.end));
write_conflict_ranges.push_back_deep(arena, keys);
}

View File

@ -103,26 +103,30 @@ struct CommitID {
constexpr static FileIdentifier file_identifier = 14254927;
Version version; // returns invalidVersion if transaction conflicts
uint16_t txnBatchId;
Optional<Value> metadataVersion;
Optional<Value> metadataVersion;
// TODO : data structure okay here ?
Optional<Standalone<VectorRef<KeyRangeRef>>> conflictingKeyRanges;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, version, txnBatchId, metadataVersion);
serializer(ar, version, txnBatchId, metadataVersion, conflictingKeyRanges);
}
CommitID() : version(invalidVersion), txnBatchId(0) {}
CommitID( Version version, uint16_t txnBatchId, const Optional<Value>& metadataVersion ) : version(version), txnBatchId(txnBatchId), metadataVersion(metadataVersion) {}
CommitID( Version version, uint16_t txnBatchId, const Optional<Value>& metadataVersion, const Optional<Standalone<VectorRef<KeyRangeRef>>>& conflictingKeyRanges = Optional<Standalone<VectorRef<KeyRangeRef>>>() ) : version(version), txnBatchId(txnBatchId), metadataVersion(metadataVersion), conflictingKeyRanges(conflictingKeyRanges) {}
};
struct CommitTransactionRequest : TimedRequest {
constexpr static FileIdentifier file_identifier = 93948;
enum {
FLAG_IS_LOCK_AWARE = 0x1,
FLAG_FIRST_IN_BATCH = 0x2
FLAG_FIRST_IN_BATCH = 0x2,
FLAG_REPORT_CONFLICTING_KEYS = 0x4
};
bool isLockAware() const { return (flags & FLAG_IS_LOCK_AWARE) != 0; }
bool firstInBatch() const { return (flags & FLAG_FIRST_IN_BATCH) != 0; }
bool isReportConflictingKeys() const { return (flags & FLAG_REPORT_CONFLICTING_KEYS) != 0; }
Arena arena;
CommitTransactionRef transaction;
@ -136,6 +140,10 @@ struct CommitTransactionRequest : TimedRequest {
void serialize(Ar& ar) {
serializer(ar, transaction, reply, arena, flags, debugID);
}
void reportConflictingKeys(){
transaction.report_conflicting_keys = true;
}
};
static inline int getBytes( CommitTransactionRequest const& r ) {

View File

@ -2613,7 +2613,7 @@ ACTOR static Future<Void> tryCommit( Database cx, Reference<TransactionLogInfo>
proxy_memory_limit_exceeded(),
commit_unknown_result()});
}
try {
Version v = wait( readVersion );
req.transaction.read_snapshot = v;
@ -2673,6 +2673,10 @@ ACTOR static Future<Void> tryCommit( Database cx, Reference<TransactionLogInfo>
}
return Void();
} else {
if (ci.conflictingKeyRanges.present()){
tr->info.conflictingKeyRanges.push_back_deep(tr->info.conflictingKeyRanges.arena(), ci.conflictingKeyRanges.get());
}
if (info.debugID.present())
TraceEvent(interval.end()).detail("Conflict", 1);
@ -2784,6 +2788,11 @@ Future<Void> Transaction::commitMutations() {
if(options.firstInBatch) {
tr.flags = tr.flags | CommitTransactionRequest::FLAG_FIRST_IN_BATCH;
}
if(options.reportConflictingKeys) {
// TODO : Is it better to keep it as a flag?
tr.flags = tr.flags | CommitTransactionRequest::FLAG_REPORT_CONFLICTING_KEYS;
tr.reportConflictingKeys();
}
Future<Void> commitResult = tryCommit( cx, trLogInfo, tr, readVersion, info, &this->committedVersion, this, options );
@ -2974,6 +2983,11 @@ void Transaction::setOption( FDBTransactionOptions::Option option, Optional<Stri
validateOptionValue(value, false);
options.includePort = true;
break;
case FDBTransactionOptions::REPORT_CONFLICTING_KEYS:
validateOptionValue(value, false);
options.reportConflictingKeys = true;
break;
default:
break;

View File

@ -132,6 +132,7 @@ struct TransactionOptions {
bool readOnly : 1;
bool firstInBatch : 1;
bool includePort : 1;
bool reportConflictingKeys : 1;
TransactionOptions(Database const& cx);
TransactionOptions();
@ -143,6 +144,7 @@ struct TransactionInfo {
Optional<UID> debugID;
TaskPriority taskID;
bool useProvisionalProxies;
Standalone<VectorRef<VectorRef<KeyRangeRef>>> conflictingKeyRanges;
explicit TransactionInfo( TaskPriority taskID ) : taskID(taskID), useProvisionalProxies(false) {}
};
@ -271,6 +273,7 @@ public:
void reset();
void fullReset();
double getBackoff(int errCode);
void debugTransaction(UID dID) { info.debugID = dID; }
Future<Void> commitMutations();

View File

@ -23,6 +23,7 @@
#include "fdbclient/DatabaseContext.h"
#include "fdbclient/StatusClient.h"
#include "fdbclient/MonitorLeader.h"
#include "fdbclient/JsonBuilder.h"
#include "flow/Util.h"
#include "flow/actorcompiler.h" // This must be the last #include.
@ -1228,6 +1229,23 @@ Future< Optional<Value> > ReadYourWritesTransaction::get( const Key& key, bool s
return Optional<Value>();
}
// TODO : add conflict keys to special key space
if (key == LiteralStringRef("\xff\xff/conflicting_keys/json")){
if (!tr.info.conflictingKeyRanges.empty()){
// TODO : return a json value which represents all the values
JsonBuilderArray conflictingKeysArray;
for (auto & cKR : tr.info.conflictingKeyRanges) {
for (auto & kr : cKR) {
conflictingKeysArray.push_back(format("[%s, %s)", kr.begin.toString().c_str(), kr.end.toString().c_str()));
}
}
Optional<Value> output = StringRef(conflictingKeysArray.getJson());
return output;
} else {
return Optional<Value>();
}
}
if(checkUsedDuringCommit()) {
return used_during_commit();
}

View File

@ -174,6 +174,9 @@ description is not currently required but encouraged.
<Option name="transaction_include_port_in_address" code="505"
description="Addresses returned by get_addresses_for_key include the port when enabled. This will be enabled by default in api version 700, and this option will be deprecated."
defaultFor="23"/>
<Option name="transaction_report_conflicting_keys" code="506"
description="Report conflicting keys for transactions that are determined as conflicts by resolvers."
defaultFor="712"/>
</Scope>
<Scope name="TransactionOption">
@ -250,6 +253,8 @@ description is not currently required but encouraged.
hidden="true" />
<Option name="use_provisional_proxies" code="711"
description="This option should only be used by tools which change the database configuration." />
<Option name="report_conflicting_keys" code="712"
description="The transaction will save conflicting keys if conflicts happen." />
</Scope>
<!-- The enumeration values matter - do not change them without

View File

@ -33,7 +33,7 @@ void clearConflictSet( ConflictSet*, Version );
void destroyConflictSet(ConflictSet*);
struct ConflictBatch {
explicit ConflictBatch( ConflictSet* );
explicit ConflictBatch( ConflictSet*, std::map< int, Standalone<VectorRef<KeyRangeRef>>>* conflictingKeyRangeMap = nullptr);
~ConflictBatch();
enum TransactionCommitResult {
@ -54,6 +54,7 @@ private:
std::vector< std::pair<StringRef,StringRef> > combinedWriteConflictRanges;
std::vector< struct ReadConflictRange > combinedReadConflictRanges;
bool* transactionConflictStatus;
std::map< int, Standalone<VectorRef<KeyRangeRef>> >* conflictingKeyRangeMap;
void checkIntraBatchConflicts();
void combineWriteConflictRanges();

View File

@ -356,8 +356,10 @@ struct ResolutionRequestBuilder {
vector<int> resolversUsed;
for (int r = 0; r<outTr.size(); r++)
if (outTr[r])
if (outTr[r]) {
resolversUsed.push_back(r);
outTr[r]->report_conflicting_keys = trIn.report_conflicting_keys;
}
transactionResolverMap.push_back(std::move(resolversUsed));
}
};
@ -1026,7 +1028,36 @@ ACTOR Future<Void> commitBatch(
trs[t].reply.sendError(transaction_too_old());
}
else {
trs[t].reply.sendError(not_committed());
// If enable the option to report conflicting keys from resolvers, we union all conflicting key ranges here and send back through CommitID
if (trs[t].isReportConflictingKeys()) {
Standalone<VectorRef<KeyRangeRef>> conflictingEntries;
std::vector<KeyRange> unmergedConflictingKRs;
for (int resolverInd = 0; resolverInd < resolution.size(); ++resolverInd){
for (const KeyRangeRef & kr : resolution[resolverInd].conflictingKeyRangeMap[t]) {
unmergedConflictingKRs.emplace_back(kr);
// TraceEvent("ConflictingKeyRange").detail("ResolverIndex", resolverInd).detail("TrasactionIndex", t).detail("StartKey", kr.begin.toString()).detail("EndKey", kr.end.toString());
}
}
// Sort the keyranges by begin key, then union overlap ranges from left to right
std::sort(unmergedConflictingKRs.begin(), unmergedConflictingKRs.end(), [](KeyRange a, KeyRange b){
return a.begin < b.begin;
});
auto next = unmergedConflictingKRs.begin();
// At least one conflicting keyrange should be returned, which means curr is valid
KeyRangeRef curr = *(next++);
while (next != unmergedConflictingKRs.end()) {
if (curr.end >= next->begin) {
curr = KeyRangeRef(curr.begin, std::max(curr.end, next->end));
} else {
conflictingEntries.push_back_deep(conflictingEntries.arena(), curr);
curr = *next;
}
next++;
}
conflictingEntries.push_back_deep(conflictingEntries.arena(), curr);
trs[t].reply.send(CommitID(invalidVersion, t, Optional<Value>(), Optional<Standalone<VectorRef<KeyRangeRef>>>(conflictingEntries)));
} else
trs[t].reply.sendError(not_committed());
}
// TODO: filter if pipelined with large commit

View File

@ -132,11 +132,12 @@ ACTOR Future<Void> resolveBatch(
vector<int> commitList;
vector<int> tooOldList;
std::map<int, Standalone<VectorRef<KeyRangeRef>> > conflictingKeyRangeMap;
// Detect conflicts
double expire = now() + SERVER_KNOBS->SAMPLE_EXPIRATION_TIME;
double tstart = timer();
ConflictBatch conflictBatch( self->conflictSet );
ConflictBatch conflictBatch(self->conflictSet, &conflictingKeyRangeMap);
int keys = 0;
for(int t=0; t<req.transactions.size(); t++) {
conflictBatch.addTransaction( req.transactions[t] );
@ -163,6 +164,8 @@ ACTOR Future<Void> resolveBatch(
for (int c = 0; c<tooOldList.size(); c++)
reply.committed[tooOldList[c]] = ConflictBatch::TransactionTooOld;
reply.conflictingKeyRangeMap = std::move(conflictingKeyRangeMap);
ASSERT(req.prevVersion >= 0 || req.txnStateTransactions.size() == 0); // The master's request should not have any state transactions

View File

@ -77,10 +77,11 @@ struct ResolveTransactionBatchReply {
VectorRef<uint8_t> committed;
Optional<UID> debugID;
VectorRef<VectorRef<StateTransactionRef>> stateMutations; // [version][transaction#] -> (committed, [mutation#])
std::map<int, Standalone<VectorRef<KeyRangeRef>>> conflictingKeyRangeMap; // resolver index -> conflicting keyRanges given by that resolver
template <class Archive>
void serialize(Archive& ar) {
serializer(ar, committed, stateMutations, arena, debugID);
serializer(ar, committed, stateMutations, arena, debugID, conflictingKeyRangeMap);
}
};

View File

@ -123,8 +123,9 @@ struct ReadConflictRange {
StringRef begin, end;
Version version;
int transaction;
ReadConflictRange( StringRef begin, StringRef end, Version version, int transaction )
: begin(begin), end(end), version(version), transaction(transaction)
Standalone<VectorRef<KeyRangeRef>> * conflictingKeyRange;
ReadConflictRange( StringRef begin, StringRef end, Version version, int transaction, Standalone<VectorRef<KeyRangeRef>> * cKR = nullptr )
: begin(begin), end(end), version(version), transaction(transaction), conflictingKeyRange(cKR)
{
}
bool operator<(const ReadConflictRange& rhs) const { return compare(begin, rhs.begin)<0; }
@ -529,7 +530,7 @@ public:
int started = min(M,count);
for(int i=0; i<started; i++){
inProgress[i].init( ranges[i], header, transactionConflictStatus );
inProgress[i].init( ranges[i], header, transactionConflictStatus, ranges[i].conflictingKeyRange );
nextJob[i] = i+1;
}
nextJob[started-1] = 0;
@ -544,8 +545,10 @@ public:
nextJob[prevJob] = nextJob[job];
job = prevJob;
}
else
inProgress[job].init( ranges[started++], header, transactionConflictStatus );
else {
int temp = started++;
inProgress[job].init( ranges[temp], header, transactionConflictStatus, ranges[temp].conflictingKeyRange );
}
}
prevJob = job;
job = nextJob[job];
@ -757,17 +760,25 @@ private:
Version version;
bool *result;
int state;
Standalone<VectorRef<KeyRangeRef>>* conflictingKeyRange;
void init( const ReadConflictRange& r, Node* header, bool* tCS ) {
void init( const ReadConflictRange& r, Node* header, bool* tCS, Standalone<VectorRef<KeyRangeRef>>* cKR) {
this->start.init( r.begin, header );
this->end.init( r.end, header );
this->version = r.version;
result = &tCS[ r.transaction ];
conflictingKeyRange = cKR;
this->state = 0;
}
bool noConflict() { return true; }
bool conflict() { *result = true; return true; }
bool conflict() {
*result = true;
if(conflictingKeyRange != nullptr){
conflictingKeyRange->push_back_deep(conflictingKeyRange->arena(), KeyRangeRef(start.value, end.value));
}
return true;
}
// Return true if finished
force_inline bool advance() {
@ -961,8 +972,8 @@ void destroyConflictSet(ConflictSet* cs) {
delete cs;
}
ConflictBatch::ConflictBatch( ConflictSet* cs )
: cs(cs), transactionCount(0)
ConflictBatch::ConflictBatch( ConflictSet* cs, std::map< int, Standalone<VectorRef<KeyRangeRef>> >* conflictingKeyRangeMap )
: cs(cs), transactionCount(0), conflictingKeyRangeMap(conflictingKeyRangeMap)
{
}
@ -974,6 +985,7 @@ struct TransactionInfo {
VectorRef< std::pair<int,int> > readRanges;
VectorRef< std::pair<int,int> > writeRanges;
bool tooOld;
bool reportConflictingKeys;
};
void ConflictBatch::addTransaction( const CommitTransactionRef& tr ) {
@ -981,6 +993,7 @@ void ConflictBatch::addTransaction( const CommitTransactionRef& tr ) {
Arena& arena = transactionInfo.arena();
TransactionInfo* info = new (arena) TransactionInfo;
info->reportConflictingKeys = tr.report_conflicting_keys;
if (tr.read_snapshot < cs->oldestVersion && tr.read_conflict_ranges.size()) {
info->tooOld = true;
@ -995,7 +1008,7 @@ void ConflictBatch::addTransaction( const CommitTransactionRef& tr ) {
points.emplace_back(range.begin, false, true, false, t, &info->readRanges[r].first);
//points.back().keyEnd = StringRef(buf,range.second);
points.emplace_back(range.end, false, false, false, t, &info->readRanges[r].second);
combinedReadConflictRanges.emplace_back(range.begin, range.end, tr.read_snapshot, t);
combinedReadConflictRanges.emplace_back(range.begin, range.end, tr.read_snapshot, t, tr.report_conflicting_keys ? &(*conflictingKeyRangeMap)[t] : nullptr);
}
for(int r=0; r<tr.write_conflict_ranges.size(); r++) {
const KeyRangeRef& range = tr.write_conflict_ranges[r];
@ -1129,7 +1142,6 @@ public:
}
};
void ConflictBatch::checkIntraBatchConflicts() {
int index = 0;
for(int p=0; p<points.size(); p++)
@ -1140,11 +1152,17 @@ void ConflictBatch::checkIntraBatchConflicts() {
const TransactionInfo& tr = *transactionInfo[t];
if (transactionConflictStatus[t]) continue;
bool conflict = tr.tooOld;
for(int i=0; i<tr.readRanges.size(); i++)
if ( mcs.any( tr.readRanges[i].first, tr.readRanges[i].second ) ) {
for(int i=0; i<tr.readRanges.size(); i++){
int startPointIndex = tr.readRanges[i].first;
int endPointIndex = tr.readRanges[i].second;
if ( mcs.any(startPointIndex , endPointIndex ) ) {
if (tr.reportConflictingKeys){
(*conflictingKeyRangeMap)[t].push_back_deep((*conflictingKeyRangeMap)[t].arena(), KeyRangeRef(points[startPointIndex].key, points[endPointIndex].key));
}
conflict = true;
break;
}
}
transactionConflictStatus[t] = conflict;
if (!conflict)
for(int i=0; i<tr.writeRanges.size(); i++)

View File

@ -14,7 +14,7 @@ constexpr int RANGELIMIT = 10000;
struct MakoWorkload : TestWorkload {
uint64_t rowCount, seqNumLen, sampleSize, actorCountPerClient, keyBytes, maxValueBytes, minValueBytes;
double testDuration, loadTime, warmingDelay, maxInsertRate, transactionsPerSecond, allowedLatency, periodicLoggingInterval, zipfConstant;
bool enableLogging, commitGet, populateData, runBenchmark, preserveData, zipf;
bool enableLogging, commitGet, populateData, runBenchmark, preserveData, zipf, reportConflictingKeys;
PerfIntCounter xacts, retries, conflicts, commits, totalOps;
std::vector<PerfIntCounter> opCounters;
std::vector<uint64_t> insertionCountsToMeasure;
@ -61,6 +61,8 @@ struct MakoWorkload : TestWorkload {
// If true, the workload will picking up keys which are zipfian distributed
zipf = getOption(options, LiteralStringRef("zipf"), false);
zipfConstant = getOption(options, LiteralStringRef("zipfConstant"), 0.99);
// If true, return key ranges conflicting with other txs
reportConflictingKeys = getOption(options, LiteralStringRef("reportConflictingKeys"), false);
// Specified length of keys and length range of values
keyBytes = std::max( getOption( options, LiteralStringRef("keyBytes"), 16 ), 16);
maxValueBytes = getOption( options, LiteralStringRef("valueBytes"), 16 );
@ -290,6 +292,8 @@ struct MakoWorkload : TestWorkload {
// used for throttling
wait(poisson(&lastTime, delay));
try{
if (self->reportConflictingKeys)
tr.setOption(FDBTransactionOptions::REPORT_CONFLICTING_KEYS);
// user-defined value: whether commit read-only ops or not; default is false
doCommit = self->commitGet;
for (i = 0; i < MAX_OP; ++i) {
@ -394,11 +398,19 @@ struct MakoWorkload : TestWorkload {
TraceEvent("FailedToExecOperations").error(e);
if (e.code() == error_code_operation_cancelled)
throw;
else if (e.code() == error_code_not_committed)
else if (e.code() == error_code_not_committed){
++self->conflicts;
}
wait(tr.onError(e));
++self->retries;
if (self->reportConflictingKeys && deterministicRandom()->random01() < 0.01){
// Standalone<VectorRef<KeyRangeRef>> rCKs;
// tr.extractConflictingKeys(rCKs);
// TraceEvent("ReportConflictingKeys").detail("KeySize", rCKs.size()).detail("KeyStart", rCKs[0].begin.toString()).detail("KeyEnd", rCKs[0].end.toString());
Optional<Standalone<StringRef>> temp = wait(tr.get(LiteralStringRef("\xff\xff/conflicting_keys/json")));
TraceEvent("ReportConflictingKeys").detail("Log", temp.present() ? temp.get().toString() : "DEBUG_NO_KEYS");
}
}
// reset all the operations' counters to 0
std::fill(perOpCount.begin(), perOpCount.end(), 0);

View File

@ -1,15 +1,15 @@
testTitle=MakoTest
testName=Mako
testDuration=10.0
testDuration=20.0
transactionsPerSecond=100000
rows=1000000
sampleSize=100
valueBytes=16
keyBytes=16
operations=g5gr5:10i5ir5:10grv5
operations=u8
actorCountPerClient=256
enableLogging=false
commitGet=false
populateData=true
runBenchmark=true
preserveData=true
preserveData=false