fix issues according to andrew's comments
This commit is contained in:
parent
5aee6880a8
commit
20b2984841
|
@ -521,7 +521,7 @@ DatabaseContext::DatabaseContext(
|
|||
transactionsFutureVersions("FutureVersions", cc), transactionsNotCommitted("NotCommitted", cc), transactionsMaybeCommitted("MaybeCommitted", cc),
|
||||
transactionsResourceConstrained("ResourceConstrained", cc), transactionsThrottled("Throttled", cc), transactionsProcessBehind("ProcessBehind", cc), outstandingWatches(0), latencies(1000), readLatencies(1000), commitLatencies(1000),
|
||||
GRVLatencies(1000), mutationsPerCommit(1000), bytesPerCommit(1000), mvCacheInsertLocation(0), healthMetricsLastUpdated(0), detailedHealthMetricsLastUpdated(0), internal(internal),
|
||||
specialKeySpace(std::make_shared<SpecialKeySpace>(normalKeys.begin, specialKeys.end)), cKImpl(std::make_shared<ConflictingKeysImpl>(conflictingKeys.begin, conflictingKeys.end))
|
||||
specialKeySpace(std::make_shared<SpecialKeySpace>(normalKeys.begin, specialKeys.end)), cKImpl(std::make_shared<ConflictingKeysImpl>(conflictingKeysRange.begin, conflictingKeysRange.end))
|
||||
{
|
||||
dbId = deterministicRandom()->randomUniqueID();
|
||||
connected = clientInfo->get().proxies.size() ? Void() : clientInfo->onChange();
|
||||
|
@ -541,9 +541,7 @@ DatabaseContext::DatabaseContext(
|
|||
|
||||
monitorMasterProxiesInfoChange = monitorMasterProxiesChange(clientInfo, &masterProxiesChangeTrigger);
|
||||
clientStatusUpdater.actor = clientStatusUpdateActor(this);
|
||||
// specialKeySpace = std::make_shared<SpecialKeySpace>(normalKeys.begin, normalKeys.end);
|
||||
// cKImpl = std::make_shared<ConflictingKeysImpl>(conflictingKeys.begin, conflictingKeys.end);
|
||||
specialKeySpace->registerKeyRange(conflictingKeys, cKImpl.get());
|
||||
specialKeySpace->registerKeyRange(conflictingKeysRange, cKImpl.get());
|
||||
}
|
||||
|
||||
DatabaseContext::DatabaseContext( const Error &err ) : deferredError(err), cc("TransactionMetrics"), transactionReadVersions("ReadVersions", cc),
|
||||
|
@ -2758,7 +2756,7 @@ ACTOR static Future<Void> tryCommit( Database cx, Reference<TransactionLogInfo>
|
|||
// clear the RYW transaction which contains previous conflicting keys
|
||||
tr->info.conflictingKeys.reset();
|
||||
if (ci.conflictingKRIndices.present()) {
|
||||
tr->info.conflictingKeys = std::make_shared<CoalescedKeyRangeMap<Value>>(conflictingKeysFalse);
|
||||
tr->info.conflictingKeys = std::make_shared<CoalescedKeyRangeMap<Value>>(conflictingKeysFalse, specialKeys.end);
|
||||
state Standalone<VectorRef<int>> conflictingKRIndices = ci.conflictingKRIndices.get();
|
||||
// drop duplicate indices and merge overlapped ranges
|
||||
// Note: addReadConflictRange in native transaction object does not merge overlapped ranges
|
||||
|
@ -2766,8 +2764,8 @@ ACTOR static Future<Void> tryCommit( Database cx, Reference<TransactionLogInfo>
|
|||
conflictingKRIndices.end());
|
||||
for (auto const& rCRIndex : mergedIds) {
|
||||
const KeyRangeRef kr = req.transaction.read_conflict_ranges[rCRIndex];
|
||||
const KeyRange krWithPrefix = KeyRangeRef(kr.begin.withPrefix(conflictingKeysPrefix),
|
||||
kr.end.withPrefix(conflictingKeysPrefix));
|
||||
const KeyRange krWithPrefix = KeyRangeRef(kr.begin.withPrefix(conflictingKeysRange.begin),
|
||||
kr.end.withPrefix(conflictingKeysRange.begin));
|
||||
tr->info.conflictingKeys->insert(krWithPrefix, conflictingKeysTrue);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -73,10 +73,10 @@ ACTOR Future<Standalone<RangeResultRef>> SpecialKeySpace::getRangeAggregationAct
|
|||
state int actualEndOffset;
|
||||
|
||||
// remove specialKeys prefix
|
||||
if (withPrefix) {
|
||||
begin.setKey(begin.getKey().removePrefix(specialKeys.begin));
|
||||
end.setKey(end.getKey().removePrefix(specialKeys.begin));
|
||||
}
|
||||
// if (withPrefix) {
|
||||
// begin.setKey(begin.getKey().removePrefix(specialKeys.begin));
|
||||
// end.setKey(end.getKey().removePrefix(specialKeys.begin));
|
||||
// }
|
||||
|
||||
// make sure offset == 1
|
||||
state RangeMap<Key, SpecialKeyRangeBaseImpl*, KeyRangeRef>::Iterator beginIter =
|
||||
|
@ -158,14 +158,14 @@ ACTOR Future<Standalone<RangeResultRef>> SpecialKeySpace::getRangeAggregationAct
|
|||
// limits handler
|
||||
for (int i = pairs.size() - 1; i >= 0; --i) {
|
||||
// TODO : use depends on with push_back
|
||||
KeyValueRef element =
|
||||
withPrefix ? KeyValueRef(pairs[i].key.withPrefix(specialKeys.begin, result.arena()), pairs[i].value)
|
||||
: pairs[i];
|
||||
result.push_back(result.arena(), element);
|
||||
// KeyValueRef element =
|
||||
// withPrefix ? KeyValueRef(pairs[i].key.withPrefix(specialKeys.begin, result.arena()), pairs[i].value)
|
||||
// : pairs[i];
|
||||
result.push_back(result.arena(), pairs[i]);
|
||||
// Note : behavior here is even the last k-v pair makes total bytes larger than specified, it is still
|
||||
// returned In other words, the total size of the returned value (less the last entry) will be less than
|
||||
// byteLimit
|
||||
limits.decrement(element);
|
||||
limits.decrement(pairs[i]);
|
||||
if (limits.isReached()) {
|
||||
result.more = true;
|
||||
result.readToBegin = false;
|
||||
|
@ -184,14 +184,14 @@ ACTOR Future<Standalone<RangeResultRef>> SpecialKeySpace::getRangeAggregationAct
|
|||
// limits handler
|
||||
for (int i = 0; i < pairs.size(); ++i) {
|
||||
// TODO : use depends on with push_back
|
||||
KeyValueRef element =
|
||||
withPrefix ? KeyValueRef(pairs[i].key.withPrefix(specialKeys.begin, result.arena()), pairs[i].value)
|
||||
: pairs[i];
|
||||
result.push_back(result.arena(), element);
|
||||
// KeyValueRef element =
|
||||
// withPrefix ? KeyValueRef(pairs[i].key.withPrefix(specialKeys.begin, result.arena()), pairs[i].value)
|
||||
// : pairs[i];
|
||||
result.push_back(result.arena(), pairs[i]);
|
||||
// Note : behavior here is even the last k-v pair makes total bytes larger than specified, it is still
|
||||
// returned In other words, the total size of the returned value (less the last entry) will be less than
|
||||
// byteLimit
|
||||
limits.decrement(element);
|
||||
limits.decrement(pairs[i]);
|
||||
if (limits.isReached()) {
|
||||
result.more = true;
|
||||
result.readThroughEnd = false;
|
||||
|
|
|
@ -59,9 +59,8 @@ void decodeKeyServersValue( const ValueRef& value, vector<UID>& src, vector<UID>
|
|||
}
|
||||
}
|
||||
|
||||
const KeyRangeRef conflictingKeys = KeyRangeRef(LiteralStringRef("/transaction/conflicting_keys/"), LiteralStringRef("/transaction/conflicting_keys/\xff"));
|
||||
const KeyRef conflictingKeysPrefix = conflictingKeys.begin;
|
||||
const Key conflictingKeysAbsolutePrefix = conflictingKeysPrefix.withPrefix(specialKeys.begin);
|
||||
const KeyRangeRef conflictingKeysRange = KeyRangeRef(LiteralStringRef("\xff\xff/transaction/conflicting_keys/"), LiteralStringRef("\xff\xff/transaction/conflicting_keys/\xff"));
|
||||
const KeyRef conflictingKeysPrefix = LiteralStringRef("/transaction/conflicting_keys/");
|
||||
const ValueRef conflictingKeysTrue = LiteralStringRef("1");
|
||||
const ValueRef conflictingKeysFalse = LiteralStringRef("0");
|
||||
|
||||
|
|
|
@ -66,8 +66,7 @@ UID serverKeysDecodeServer( const KeyRef& key );
|
|||
bool serverHasKey( ValueRef storedValue );
|
||||
|
||||
extern const KeyRef conflictingKeysPrefix;
|
||||
extern const KeyRangeRef conflictingKeys;
|
||||
extern const Key conflictingKeysAbsolutePrefix;
|
||||
extern const KeyRangeRef conflictingKeysRange;
|
||||
extern const ValueRef conflictingKeysTrue, conflictingKeysFalse;
|
||||
|
||||
extern const KeyRef cacheKeysPrefix;
|
||||
|
|
|
@ -162,8 +162,8 @@ struct ReportConflictingKeysWorkload : TestWorkload {
|
|||
if (foundConflict) {
|
||||
// \xff\xff/transaction/conflicting_keys is always initialized to false, skip it here
|
||||
state KeyRange ckr =
|
||||
KeyRangeRef(keyAfter(LiteralStringRef("").withPrefix(conflictingKeysAbsolutePrefix)),
|
||||
LiteralStringRef("\xff\xff").withPrefix(conflictingKeysAbsolutePrefix));
|
||||
KeyRangeRef(keyAfter(LiteralStringRef("").withPrefix(conflictingKeysRange.begin)),
|
||||
LiteralStringRef("\xff\xff").withPrefix(conflictingKeysRange.begin));
|
||||
// The getRange here using the special key prefix "\xff\xff/transaction/conflicting_keys/" happens
|
||||
// locally Thus, the error handling is not needed here
|
||||
Future<Standalone<RangeResultRef>> conflictingKeyRangesFuture =
|
||||
|
@ -176,14 +176,14 @@ struct ReportConflictingKeysWorkload : TestWorkload {
|
|||
ASSERT(!conflictingKeyRanges.more);
|
||||
for (int i = 0; i < conflictingKeyRanges.size(); i += 2) {
|
||||
KeyValueRef startKeyWithPrefix = conflictingKeyRanges[i];
|
||||
ASSERT(startKeyWithPrefix.key.startsWith(conflictingKeysAbsolutePrefix));
|
||||
ASSERT(startKeyWithPrefix.key.startsWith(conflictingKeysRange.begin));
|
||||
ASSERT(startKeyWithPrefix.value == conflictingKeysTrue);
|
||||
KeyValueRef endKeyWithPrefix = conflictingKeyRanges[i + 1];
|
||||
ASSERT(endKeyWithPrefix.key.startsWith(conflictingKeysAbsolutePrefix));
|
||||
ASSERT(endKeyWithPrefix.key.startsWith(conflictingKeysRange.begin));
|
||||
ASSERT(endKeyWithPrefix.value == conflictingKeysFalse);
|
||||
// Remove the prefix of returning keys
|
||||
Key startKey = startKeyWithPrefix.key.removePrefix(conflictingKeysAbsolutePrefix);
|
||||
Key endKey = endKeyWithPrefix.key.removePrefix(conflictingKeysAbsolutePrefix);
|
||||
Key startKey = startKeyWithPrefix.key.removePrefix(conflictingKeysRange.begin);
|
||||
Key endKey = endKeyWithPrefix.key.removePrefix(conflictingKeysRange.begin);
|
||||
KeyRangeRef kr = KeyRangeRef(startKey, endKey);
|
||||
if (!std::any_of(readConflictRanges.begin(), readConflictRanges.end(), [&kr](KeyRange rCR) {
|
||||
// Read_conflict_range remains same in the resolver.
|
||||
|
|
Loading…
Reference in New Issue