A simple intergration to put conflictingKeys into special-key-space
This commit is contained in:
parent
b76baa9cc2
commit
5b6ae4da7f
|
@ -46,6 +46,7 @@ typedef MultiInterface<ReferencedInterface<StorageServerInterface>> LocationInfo
|
|||
typedef MultiInterface<MasterProxyInterface> ProxyInfo;
|
||||
|
||||
class SpecialKeySpace; //forward declaration
|
||||
class ConflictingKeysImpl;
|
||||
class DatabaseContext : public ReferenceCounted<DatabaseContext>, public FastAllocated<DatabaseContext>, NonCopyable {
|
||||
public:
|
||||
static DatabaseContext* allocateOnForeignThread() {
|
||||
|
@ -230,6 +231,7 @@ public:
|
|||
|
||||
UniqueOrderedOptionList<FDBTransactionOptions> transactionDefaults;
|
||||
std::shared_ptr<SpecialKeySpace> specialKeySpace;
|
||||
std::shared_ptr<ConflictingKeysImpl> cKImpl;
|
||||
};
|
||||
|
||||
#endif
|
||||
|
|
|
@ -541,6 +541,8 @@ DatabaseContext::DatabaseContext(
|
|||
monitorMasterProxiesInfoChange = monitorMasterProxiesChange(clientInfo, &masterProxiesChangeTrigger);
|
||||
clientStatusUpdater.actor = clientStatusUpdateActor(this);
|
||||
specialKeySpace = std::make_shared<SpecialKeySpace>(normalKeys.begin, normalKeys.end);
|
||||
cKImpl = std::make_shared<ConflictingKeysImpl>(conflictingKeys.begin, conflictingKeys.end);
|
||||
specialKeySpace->registerKeyRange(conflictingKeys, cKImpl.get());
|
||||
}
|
||||
|
||||
DatabaseContext::DatabaseContext( const Error &err ) : deferredError(err), cc("TransactionMetrics"), transactionReadVersions("ReadVersions", cc),
|
||||
|
|
|
@ -1281,8 +1281,12 @@ Future< Standalone<RangeResultRef> > ReadYourWritesTransaction::getRange(
|
|||
}
|
||||
|
||||
// start with simplest point, special key space are only allowed to query if both begin and end start with \xff\xff
|
||||
if (begin.getKey().startsWith(specialKeys.begin) && end.getKey().startsWith(specialKeys.begin))
|
||||
return getDatabase()->specialKeySpace->getRange(Reference<ReadYourWritesTransaction>(this), begin, end, limits, snapshot, reverse);
|
||||
if (begin.getKey().startsWith(specialKeys.begin) && end.getKey().startsWith(specialKeys.begin)) {
|
||||
Reference<ReadYourWritesTransaction> self = Reference<ReadYourWritesTransaction>(this);
|
||||
auto result = getDatabase()->specialKeySpace->getRange(self, begin, end, limits, snapshot, reverse);
|
||||
self.extractPtr();
|
||||
return result;
|
||||
}
|
||||
|
||||
// Use special key prefix "\xff\xff/transaction/conflicting_keys/<some_key>",
|
||||
// to retrieve keys which caused latest not_committed(conflicting with another transaction) error.
|
||||
|
|
|
@ -131,6 +131,10 @@ public:
|
|||
Database getDatabase() const {
|
||||
return tr.getDatabase();
|
||||
}
|
||||
|
||||
const Transaction& getTransaction() const {
|
||||
return tr;
|
||||
}
|
||||
private:
|
||||
friend class RYWImpl;
|
||||
|
||||
|
|
|
@ -69,5 +69,17 @@ private:
|
|||
KeyRange range;
|
||||
};
|
||||
|
||||
class ConflictingKeysImpl : public SpecialKeyRangeBaseImpl {
|
||||
public:
|
||||
explicit ConflictingKeysImpl(KeyRef start, KeyRef end) : SpecialKeyRangeBaseImpl(start, end) {}
|
||||
virtual Future<Standalone<RangeResultRef>> getRange(Reference<ReadYourWritesTransaction> ryw,
|
||||
KeyRangeRef kr) const {
|
||||
auto resultFuture = ryw->getTransaction().info.conflictingKeysRYW->getRange(kr, CLIENT_KNOBS->TOO_MANY);
|
||||
// all keys are written to RYW, since GRV is set, the read should happen locally
|
||||
ASSERT(resultFuture.isReady());
|
||||
return resultFuture.getValue();
|
||||
}
|
||||
};
|
||||
|
||||
#include "flow/unactorcompiler.h"
|
||||
#endif
|
||||
|
|
Loading…
Reference in New Issue