From edf52e8c97050e62f0b9f7d07688ab3ca7ce6b65 Mon Sep 17 00:00:00 2001 From: chaoguang <13974480+zjuLcg@users.noreply.github.com> Date: Thu, 10 Oct 2019 15:42:52 -0700 Subject: [PATCH 01/45] First version for reporting conflicting keys --- bindings/flow/tester/Tester.actor.cpp | 2 ++ fdbclient/CommitTransaction.h | 9 ++++-- fdbclient/MasterProxyInterface.h | 16 +++++++--- fdbclient/NativeAPI.actor.cpp | 16 +++++++++- fdbclient/NativeAPI.actor.h | 3 ++ fdbclient/ReadYourWrites.actor.cpp | 18 +++++++++++ fdbclient/vexillographer/fdb.options | 5 +++ fdbserver/ConflictSet.h | 3 +- fdbserver/MasterProxyServer.actor.cpp | 35 +++++++++++++++++++-- fdbserver/Resolver.actor.cpp | 5 ++- fdbserver/ResolverInterface.h | 3 +- fdbserver/SkipList.cpp | 44 +++++++++++++++++++-------- fdbserver/workloads/Mako.actor.cpp | 16 ++++++++-- tests/Mako.txt | 6 ++-- 14 files changed, 150 insertions(+), 31 deletions(-) diff --git a/bindings/flow/tester/Tester.actor.cpp b/bindings/flow/tester/Tester.actor.cpp index 52d193320e..508d3ae30f 100644 --- a/bindings/flow/tester/Tester.actor.cpp +++ b/bindings/flow/tester/Tester.actor.cpp @@ -1584,6 +1584,7 @@ struct UnitTestsFunc : InstructionFunc { data->db->setDatabaseOption(FDBDatabaseOption::FDB_DB_OPTION_TRANSACTION_RETRY_LIMIT, Optional(StringRef((const uint8_t*)&noRetryLimit, 8))); data->db->setDatabaseOption(FDBDatabaseOption::FDB_DB_OPTION_TRANSACTION_CAUSAL_READ_RISKY); data->db->setDatabaseOption(FDBDatabaseOption::FDB_DB_OPTION_TRANSACTION_INCLUDE_PORT_IN_ADDRESS); + data->db->setDatabaseOption(FDBDatabaseOption::FDB_DB_OPTION_TRANSACTION_REPORT_CONFLICTING_KEYS); state Reference tr = data->db->createTransaction(); tr->setOption(FDBTransactionOption::FDB_TR_OPTION_PRIORITY_SYSTEM_IMMEDIATE); @@ -1603,6 +1604,7 @@ struct UnitTestsFunc : InstructionFunc { tr->setOption(FDBTransactionOption::FDB_TR_OPTION_READ_LOCK_AWARE); tr->setOption(FDBTransactionOption::FDB_TR_OPTION_LOCK_AWARE); tr->setOption(FDBTransactionOption::FDB_TR_OPTION_INCLUDE_PORT_IN_ADDRESS); + tr->setOption(FDBTransactionOption::FDB_TR_OPTION_REPORT_CONFLICTING_KEYS); Optional > _ = wait(tr->get(LiteralStringRef("\xff"))); tr->cancel(); diff --git a/fdbclient/CommitTransaction.h b/fdbclient/CommitTransaction.h index 5ebb245c72..700cf75a4f 100644 --- a/fdbclient/CommitTransaction.h +++ b/fdbclient/CommitTransaction.h @@ -137,21 +137,23 @@ static inline bool isNonAssociativeOp(MutationRef::Type mutationType) { } struct CommitTransactionRef { - CommitTransactionRef() : read_snapshot(0) {} + CommitTransactionRef() : read_snapshot(0), report_conflicting_keys(false) {} CommitTransactionRef(Arena &a, const CommitTransactionRef &from) : read_conflict_ranges(a, from.read_conflict_ranges), write_conflict_ranges(a, from.write_conflict_ranges), mutations(a, from.mutations), - read_snapshot(from.read_snapshot) { + read_snapshot(from.read_snapshot), + report_conflicting_keys(from.report_conflicting_keys) { } VectorRef< KeyRangeRef > read_conflict_ranges; VectorRef< KeyRangeRef > write_conflict_ranges; VectorRef< MutationRef > mutations; Version read_snapshot; + bool report_conflicting_keys; template force_inline void serialize( Ar& ar ) { - serializer(ar, read_conflict_ranges, write_conflict_ranges, mutations, read_snapshot); + serializer(ar, read_conflict_ranges, write_conflict_ranges, mutations, read_snapshot, report_conflicting_keys); } // Convenience for internal code required to manipulate these without the Native API @@ -161,6 +163,7 @@ struct CommitTransactionRef { } void clear( Arena& arena, KeyRangeRef const& keys ) { + // TODO: check do I need to clear flag here mutations.push_back_deep(arena, MutationRef(MutationRef::ClearRange, keys.begin, keys.end)); write_conflict_ranges.push_back_deep(arena, keys); } diff --git a/fdbclient/MasterProxyInterface.h b/fdbclient/MasterProxyInterface.h index 5b00fd5008..ae0e76ce36 100644 --- a/fdbclient/MasterProxyInterface.h +++ b/fdbclient/MasterProxyInterface.h @@ -103,26 +103,30 @@ struct CommitID { constexpr static FileIdentifier file_identifier = 14254927; Version version; // returns invalidVersion if transaction conflicts uint16_t txnBatchId; - Optional metadataVersion; + Optional metadataVersion; + // TODO : data structure okay here ? + Optional>> conflictingKeyRanges; template void serialize(Ar& ar) { - serializer(ar, version, txnBatchId, metadataVersion); + serializer(ar, version, txnBatchId, metadataVersion, conflictingKeyRanges); } CommitID() : version(invalidVersion), txnBatchId(0) {} - CommitID( Version version, uint16_t txnBatchId, const Optional& metadataVersion ) : version(version), txnBatchId(txnBatchId), metadataVersion(metadataVersion) {} + CommitID( Version version, uint16_t txnBatchId, const Optional& metadataVersion, const Optional>>& conflictingKeyRanges = Optional>>() ) : version(version), txnBatchId(txnBatchId), metadataVersion(metadataVersion), conflictingKeyRanges(conflictingKeyRanges) {} }; struct CommitTransactionRequest : TimedRequest { constexpr static FileIdentifier file_identifier = 93948; enum { FLAG_IS_LOCK_AWARE = 0x1, - FLAG_FIRST_IN_BATCH = 0x2 + FLAG_FIRST_IN_BATCH = 0x2, + FLAG_REPORT_CONFLICTING_KEYS = 0x4 }; bool isLockAware() const { return (flags & FLAG_IS_LOCK_AWARE) != 0; } bool firstInBatch() const { return (flags & FLAG_FIRST_IN_BATCH) != 0; } + bool isReportConflictingKeys() const { return (flags & FLAG_REPORT_CONFLICTING_KEYS) != 0; } Arena arena; CommitTransactionRef transaction; @@ -136,6 +140,10 @@ struct CommitTransactionRequest : TimedRequest { void serialize(Ar& ar) { serializer(ar, transaction, reply, arena, flags, debugID); } + + void reportConflictingKeys(){ + transaction.report_conflicting_keys = true; + } }; static inline int getBytes( CommitTransactionRequest const& r ) { diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 34bbc60ed3..6195434854 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -2613,7 +2613,7 @@ ACTOR static Future tryCommit( Database cx, Reference proxy_memory_limit_exceeded(), commit_unknown_result()}); } - + try { Version v = wait( readVersion ); req.transaction.read_snapshot = v; @@ -2673,6 +2673,10 @@ ACTOR static Future tryCommit( Database cx, Reference } return Void(); } else { + if (ci.conflictingKeyRanges.present()){ + tr->info.conflictingKeyRanges.push_back_deep(tr->info.conflictingKeyRanges.arena(), ci.conflictingKeyRanges.get()); + } + if (info.debugID.present()) TraceEvent(interval.end()).detail("Conflict", 1); @@ -2784,6 +2788,11 @@ Future Transaction::commitMutations() { if(options.firstInBatch) { tr.flags = tr.flags | CommitTransactionRequest::FLAG_FIRST_IN_BATCH; } + if(options.reportConflictingKeys) { + // TODO : Is it better to keep it as a flag? + tr.flags = tr.flags | CommitTransactionRequest::FLAG_REPORT_CONFLICTING_KEYS; + tr.reportConflictingKeys(); + } Future commitResult = tryCommit( cx, trLogInfo, tr, readVersion, info, &this->committedVersion, this, options ); @@ -2974,6 +2983,11 @@ void Transaction::setOption( FDBTransactionOptions::Option option, Optional debugID; TaskPriority taskID; bool useProvisionalProxies; + Standalone>> conflictingKeyRanges; explicit TransactionInfo( TaskPriority taskID ) : taskID(taskID), useProvisionalProxies(false) {} }; @@ -271,6 +273,7 @@ public: void reset(); void fullReset(); double getBackoff(int errCode); + void debugTransaction(UID dID) { info.debugID = dID; } Future commitMutations(); diff --git a/fdbclient/ReadYourWrites.actor.cpp b/fdbclient/ReadYourWrites.actor.cpp index c41739d907..e459d05af8 100644 --- a/fdbclient/ReadYourWrites.actor.cpp +++ b/fdbclient/ReadYourWrites.actor.cpp @@ -23,6 +23,7 @@ #include "fdbclient/DatabaseContext.h" #include "fdbclient/StatusClient.h" #include "fdbclient/MonitorLeader.h" +#include "fdbclient/JsonBuilder.h" #include "flow/Util.h" #include "flow/actorcompiler.h" // This must be the last #include. @@ -1228,6 +1229,23 @@ Future< Optional > ReadYourWritesTransaction::get( const Key& key, bool s return Optional(); } + // TODO : add conflict keys to special key space + if (key == LiteralStringRef("\xff\xff/conflicting_keys/json")){ + if (!tr.info.conflictingKeyRanges.empty()){ + // TODO : return a json value which represents all the values + JsonBuilderArray conflictingKeysArray; + for (auto & cKR : tr.info.conflictingKeyRanges) { + for (auto & kr : cKR) { + conflictingKeysArray.push_back(format("[%s, %s)", kr.begin.toString().c_str(), kr.end.toString().c_str())); + } + } + Optional output = StringRef(conflictingKeysArray.getJson()); + return output; + } else { + return Optional(); + } + } + if(checkUsedDuringCommit()) { return used_during_commit(); } diff --git a/fdbclient/vexillographer/fdb.options b/fdbclient/vexillographer/fdb.options index 890dea4864..035335e2a2 100644 --- a/fdbclient/vexillographer/fdb.options +++ b/fdbclient/vexillographer/fdb.options @@ -174,6 +174,9 @@ description is not currently required but encouraged.