Fixed a memory bug in the watch map. A few other formatting tweaks, etc.
This commit is contained in:
parent
431d3de43b
commit
138932c12f
|
@ -533,7 +533,7 @@ public:
|
||||||
EventCacheHolder connectToDatabaseEventCacheHolder;
|
EventCacheHolder connectToDatabaseEventCacheHolder;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
std::unordered_map<KeyRef, Reference<WatchMetadata>> watchMap;
|
std::unordered_map<Key, Reference<WatchMetadata>> watchMap;
|
||||||
};
|
};
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -1912,9 +1912,8 @@ Reference<WatchMetadata> DatabaseContext::getWatchMetadata(KeyRef key) const {
|
||||||
}
|
}
|
||||||
|
|
||||||
KeyRef DatabaseContext::setWatchMetadata(Reference<WatchMetadata> metadata) {
|
KeyRef DatabaseContext::setWatchMetadata(Reference<WatchMetadata> metadata) {
|
||||||
KeyRef keyRef = metadata->parameters->key;
|
watchMap[metadata->parameters->key] = metadata;
|
||||||
watchMap[keyRef] = metadata;
|
return metadata->parameters->key;
|
||||||
return keyRef;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void DatabaseContext::deleteWatchMetadata(KeyRef key) {
|
void DatabaseContext::deleteWatchMetadata(KeyRef key) {
|
||||||
|
@ -2974,14 +2973,18 @@ ACTOR Future<Void> watchStorageServerResp(KeyRef key, Database cx) {
|
||||||
if (!metadata.isValid())
|
if (!metadata.isValid())
|
||||||
return Void();
|
return Void();
|
||||||
|
|
||||||
if (watchVersion >= metadata->parameters->version) { // case 1: version_1 (SS) >= version_2 (map)
|
// case 1: version_1 (SS) >= version_2 (map)
|
||||||
|
if (watchVersion >= metadata->parameters->version) {
|
||||||
cx->deleteWatchMetadata(key);
|
cx->deleteWatchMetadata(key);
|
||||||
if (metadata->watchPromise.canBeSet())
|
if (metadata->watchPromise.canBeSet())
|
||||||
metadata->watchPromise.send(watchVersion);
|
metadata->watchPromise.send(watchVersion);
|
||||||
} else { // ABA happens
|
}
|
||||||
|
// ABA happens
|
||||||
|
else {
|
||||||
TEST(true); // ABA issue where the version returned from the server is less than the version in the map
|
TEST(true); // ABA issue where the version returned from the server is less than the version in the map
|
||||||
if (metadata->watchPromise.getFutureReferenceCount() ==
|
|
||||||
1) { // case 2: version_1 < version_2 and future_count == 1
|
// case 2: version_1 < version_2 and future_count == 1
|
||||||
|
if (metadata->watchPromise.getFutureReferenceCount() == 1) {
|
||||||
cx->deleteWatchMetadata(key);
|
cx->deleteWatchMetadata(key);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3067,7 +3070,7 @@ Future<Void> getWatchFuture(Database cx, Reference<WatchParameters> parameters)
|
||||||
// recreate in SS)
|
// recreate in SS)
|
||||||
else if (parameters->version > metadata->parameters->version) {
|
else if (parameters->version > metadata->parameters->version) {
|
||||||
TEST(true); // Setting a watch that has a different value than the one in the map but a higher version (newer)
|
TEST(true); // Setting a watch that has a different value than the one in the map but a higher version (newer)
|
||||||
cx->deleteWatchMetadata(parameters->key.contents());
|
cx->deleteWatchMetadata(parameters->key);
|
||||||
|
|
||||||
metadata->watchPromise.send(parameters->version);
|
metadata->watchPromise.send(parameters->version);
|
||||||
metadata->watchFutureSS.cancel();
|
metadata->watchFutureSS.cancel();
|
||||||
|
|
|
@ -252,8 +252,6 @@ struct TransactionState : ReferenceCounted<TransactionState> {
|
||||||
std::shared_ptr<CoalescedKeyRangeMap<Value>> conflictingKeys;
|
std::shared_ptr<CoalescedKeyRangeMap<Value>> conflictingKeys;
|
||||||
|
|
||||||
// Only available so that Transaction can have a default constructor, for use in state variables
|
// Only available so that Transaction can have a default constructor, for use in state variables
|
||||||
TransactionState() {}
|
|
||||||
|
|
||||||
TransactionState(TaskPriority taskID, SpanID spanID) : taskID(taskID), spanID(spanID) {}
|
TransactionState(TaskPriority taskID, SpanID spanID) : taskID(taskID), spanID(spanID) {}
|
||||||
|
|
||||||
TransactionState(Database cx, TaskPriority taskID, SpanID spanID, Reference<TransactionLogInfo> trLogInfo)
|
TransactionState(Database cx, TaskPriority taskID, SpanID spanID, Reference<TransactionLogInfo> trLogInfo)
|
||||||
|
|
|
@ -98,7 +98,7 @@ struct ReadHotDetectionWorkload : TestWorkload {
|
||||||
loop {
|
loop {
|
||||||
state Transaction tr(cx);
|
state Transaction tr(cx);
|
||||||
try {
|
try {
|
||||||
StorageMetrics sm = wait(tr.getDatabase()->getStorageMetrics(self->wholeRange, 100));
|
StorageMetrics sm = wait(cx->getStorageMetrics(self->wholeRange, 100));
|
||||||
// TraceEvent("RHDCheckPhaseLog")
|
// TraceEvent("RHDCheckPhaseLog")
|
||||||
// .detail("KeyRangeSize", sm.bytes)
|
// .detail("KeyRangeSize", sm.bytes)
|
||||||
// .detail("KeyRangeReadBandwith", sm.bytesReadPerKSecond);
|
// .detail("KeyRangeReadBandwith", sm.bytesReadPerKSecond);
|
||||||
|
|
Loading…
Reference in New Issue