Removed TLogData::toBePoppedMutex

This commit is contained in:
sfc-gh-tclinkenbeard 2020-12-03 09:36:44 -08:00
parent a8af598307
commit 1003057a7e
1 changed files with 6 additions and 14 deletions

View File

@ -340,7 +340,6 @@ struct TLogData : NonCopyable {
// the set and for callers that unset will
// be able to match it up
std::string dataFolder; // folder where data is stored
FlowLock toBePoppedMutex;
std::map<Tag, Version> toBePopped; // map of Tag->Version for all the pops
// that came when ignorePopRequest was set
Reference<AsyncVar<bool>> degraded;
@ -1006,19 +1005,12 @@ ACTOR Future<Void> updatePersistentData( TLogData* self, Reference<LogData> logD
return Void();
}
ACTOR Future<Void> queuePopRequest(TLogData* self, Tag tag, Version version) {
wait(self->toBePoppedMutex.take());
FlowLock::Releaser releaser(self->toBePoppedMutex);
auto& v = self->toBePopped[tag];
v = std::max(v, version);
return Void();
}
ACTOR Future<Void> tLogPopCore( TLogData* self, Tag inputTag, Version to, Reference<LogData> logData ) {
if (self->ignorePopRequest) {
TraceEvent(SevDebug, "IgnoringPopRequest").detail("IgnorePopDeadline", self->ignorePopDeadline);
wait(queuePopRequest(self, inputTag, to));
auto& v = self->toBePopped[inputTag];
v = std::max(v, to);
TraceEvent(SevDebug, "IgnoringPopRequest")
.detail("IgnorePopDeadline", self->ignorePopDeadline)
@ -1063,15 +1055,16 @@ ACTOR Future<Void> tLogPopCore( TLogData* self, Tag inputTag, Version to, Refere
ACTOR Future<Void> processPopRequests(TLogData* self, Reference<LogData> logData) {
state const size_t batchSize = 1000;
wait(self->toBePoppedMutex.take());
FlowLock::Releaser releaser(self->toBePoppedMutex);
state std::vector<Future<Void>> ignoredPops;
state std::map<Tag, Version>::const_iterator it;
state int ignoredPopsPlayed = 0;
state std::map<Tag, Version> toBePopped;
toBePopped = std::move(self->toBePopped);
self->toBePopped.clear();
self->ignorePopRequest = false;
self->ignorePopDeadline = 0.0;
self->ignorePopUid = "";
for (it = self->toBePopped.cbegin(); it != self->toBePopped.cend(); ++it) {
for (it = toBePopped.cbegin(); it != toBePopped.cend(); ++it) {
const auto& [tag, version] = *it;
TraceEvent("PlayIgnoredPop").detail("Tag", tag.toString()).detail("Version", version);
ignoredPops.push_back(tLogPopCore(self, tag, version, logData));
@ -1079,7 +1072,6 @@ ACTOR Future<Void> processPopRequests(TLogData* self, Reference<LogData> logData
wait(yield());
}
}
self->toBePopped.clear();
wait(waitForAll(ignoredPops));
return Void();
}