use FlowLock for implementing critical section

Instead of using Promises and future to implement
critcal section use FlowLock
This commit is contained in:
sramamoorthy 2019-05-09 14:56:54 -07:00 committed by Alex Miller
parent e6c0b87a4d
commit 5749e220bd
2 changed files with 34 additions and 8 deletions

View File

@ -431,7 +431,7 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
UID recruitmentID;
std::set<Tag> allTags;
Future<Void> terminated;
Promise<Void> execOpHold;
FlowLock execOpLock;
bool execOpCommitInProgress;
explicit LogData(TLogData* tLogData, TLogInterface interf, Tag remoteTag, bool isPrimary, int logRouterTags, UID recruitmentID, std::vector<Tag> tags) : tLogData(tLogData), knownCommittedVersion(0), logId(interf.id()),
@ -1524,8 +1524,10 @@ ACTOR Future<Void> tLogCommit(
// future version to be included)
// NOTE: execOpCommitInProgress will not be set for exec commands which
// start with \xff
state bool execOpLockTaken = false;
if (logData->execOpCommitInProgress) {
wait(logData->execOpHold.getFuture());
wait(logData->execOpLock.take());
execOpLockTaken = true;
}
state Version execVersion = invalidVersion;
@ -1550,10 +1552,17 @@ ACTOR Future<Void> tLogCommit(
execProcessingHelper(self, logData, &req, &execTags, &execArg, &execCmd, &execVersion, &snapFailKeySetters, &playIgnoredPops);
if (execVersion != invalidVersion) {
TraceEvent(SevDebug, "SettingExecOpCommit")
.detail("LogId", logData->logId)
.detail("ExecVersion", execVersion)
.detail("Version", req.version);
logData->execOpCommitInProgress = true;
logData->execOpHold.reset();
if (!execOpLockTaken) {
wait(logData->execOpLock.take());
execOpLockTaken = true;
} else {
ASSERT(logData->execOpLock.available() == 0);
}
ASSERT(execOpLockTaken);
}
}
@ -1584,8 +1593,12 @@ ACTOR Future<Void> tLogCommit(
wait(tLogSnapHelper(self, logData, &execArg, qe.version, execVersion, execCmd, execTags));
}
if (execVersion != invalidVersion && logData->execOpCommitInProgress) {
ASSERT(execOpLockTaken);
logData->execOpCommitInProgress = false;
logData->execOpHold.send(Void());
}
if (execOpLockTaken) {
logData->execOpLock.release();
execOpLockTaken = false;
}
execVersion = invalidVersion;

View File

@ -489,7 +489,7 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
UID recruitmentID;
std::set<Tag> allTags;
Future<Void> terminated;
Promise<Void> execOpHold;
FlowLock execOpLock;
bool execOpCommitInProgress;
explicit LogData(TLogData* tLogData, TLogInterface interf, Tag remoteTag, bool isPrimary, int logRouterTags, UID recruitmentID, uint64_t protocolVersion, std::vector<Tag> tags) : tLogData(tLogData), knownCommittedVersion(0), logId(interf.id()),
@ -1891,8 +1891,10 @@ ACTOR Future<Void> tLogCommit(
// future version to be included)
// NOTE: execOpCommitInProgress will not be set for exec commands which
// start with \xff
state bool execOpLockTaken = false;
if (logData->execOpCommitInProgress) {
wait(logData->execOpHold.getFuture());
wait(logData->execOpLock.take());
execOpLockTaken = true;
}
state Version execVersion = invalidVersion;
@ -1917,10 +1919,17 @@ ACTOR Future<Void> tLogCommit(
execProcessingHelper(self, logData, &req, &execTags, &execArg, &execCmd, &execVersion, &snapFailKeySetters, &playIgnoredPops);
if (execVersion != invalidVersion) {
TraceEvent(SevDebug, "SettingExecOpCommit")
.detail("LogId", logData->logId)
.detail("ExecVersion", execVersion)
.detail("Version", req.version);
logData->execOpCommitInProgress = true;
logData->execOpHold.reset();
if (!execOpLockTaken) {
wait(logData->execOpLock.take());
execOpLockTaken = true;
} else {
ASSERT(logData->execOpLock.available() == 0);
}
ASSERT(execOpLockTaken);
}
}
@ -1952,8 +1961,12 @@ ACTOR Future<Void> tLogCommit(
wait(tLogSnapHelper(self, logData, &execArg, qe.version, execVersion, execCmd, execTags));
}
if (execVersion != invalidVersion && logData->execOpCommitInProgress) {
ASSERT(execOpLockTaken);
logData->execOpCommitInProgress = false;
logData->execOpHold.send(Void());
}
if (execOpLockTaken) {
logData->execOpLock.release();
execOpLockTaken = false;
}
execVersion = invalidVersion;