execProcessingHelper made synchronous
tLogCommit exects no blocking between duplicate check and setting of the new version, that constraint was broken when synchronous execProcessingHelper was introduced. As a fix, execProcessingHelper was made asynchronous.
This commit is contained in:
parent
ceac68c990
commit
f27a40f118
|
@ -1277,20 +1277,21 @@ ACTOR Future<Void> commitQueue( TLogData* self ) {
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> execProcessingHelper(TLogData* self,
|
||||
Reference<LogData> logData,
|
||||
TLogCommitRequest* req,
|
||||
Standalone<VectorRef<Tag>>* execTags,
|
||||
ExecCmdValueString* execArg,
|
||||
StringRef* execCmd,
|
||||
Version* execVersion,
|
||||
vector<Future<Void>>* snapFailKeySetters)
|
||||
void execProcessingHelper(TLogData* self,
|
||||
Reference<LogData> logData,
|
||||
TLogCommitRequest* req,
|
||||
Standalone<VectorRef<Tag>>* execTags,
|
||||
ExecCmdValueString* execArg,
|
||||
StringRef* execCmd,
|
||||
Version* execVersion,
|
||||
vector<Future<Void>>* snapFailKeySetters,
|
||||
vector<Future<Void>>* ignoredPops)
|
||||
{
|
||||
// inspect the messages to find if there is an Exec type and print
|
||||
// it. message are prefixed by the length of the message and each
|
||||
// field is prefixed by the length too
|
||||
uint8_t type = MutationRef::MAX_ATOMIC_OP;
|
||||
state StringRef param2;
|
||||
StringRef param2;
|
||||
ArenaReader rd(req->arena, req->messages, Unversioned());
|
||||
int32_t messageLength, rawLength;
|
||||
uint16_t tagCount;
|
||||
|
@ -1327,17 +1328,19 @@ ACTOR Future<Void> execProcessingHelper(TLogData* self,
|
|||
rd >> len;
|
||||
param2 = StringRef((uint8_t const*)rd.readBytes(len), len);
|
||||
|
||||
TraceEvent(SevDebug, "TLogExecCommandType", self->dbgid).detail("Value", execCmd->toString());
|
||||
TraceEvent(SevDebug, "TLogExecCommandType", self->dbgid)
|
||||
.detail("Value", execCmd->toString())
|
||||
.detail("Version", req->version);
|
||||
|
||||
execArg->setCmdValueString(param2);
|
||||
execArg->dbgPrint();
|
||||
state StringRef uidStr = execArg->getBinaryArgValue(LiteralStringRef("uid"));
|
||||
StringRef uidStr = execArg->getBinaryArgValue(LiteralStringRef("uid"));
|
||||
if (!execCmd->startsWith(LiteralStringRef("\xff"))) {
|
||||
*execVersion = req->version;
|
||||
}
|
||||
if (*execCmd == execSnap) {
|
||||
// validation check specific to snap request
|
||||
state std::string reason;
|
||||
std::string reason;
|
||||
if (!self->ignorePopRequest) {
|
||||
*execVersion = invalidVersion;
|
||||
reason = "SnapFailIgnorePopNotSet";
|
||||
|
@ -1350,13 +1353,15 @@ ACTOR Future<Void> execProcessingHelper(TLogData* self,
|
|||
TraceEvent(SevWarn, "TLogSnapFailed")
|
||||
.detail("IgnorePopUid", self->ignorePopUid)
|
||||
.detail("IgnorePopRequest", self->ignorePopRequest)
|
||||
.detail("Reason", reason);
|
||||
.detail("Reason", reason)
|
||||
.detail("Version", req->version);
|
||||
|
||||
TraceEvent("ExecCmdSnapCreate")
|
||||
.detail("Uid", uidStr.toString())
|
||||
.detail("Status", -1)
|
||||
.detail("Tag", logData->allTags.begin()->toString())
|
||||
.detail("Role", "TLog");
|
||||
.detail("Role", "TLog")
|
||||
.detail("Version", req->version);
|
||||
if (g_network->isSimulated()) {
|
||||
// write SnapFailedTLog.$UID
|
||||
Standalone<StringRef> keyStr = snapTestFailStatus.withSuffix(uidStr);
|
||||
|
@ -1372,7 +1377,8 @@ ACTOR Future<Void> execProcessingHelper(TLogData* self,
|
|||
if (self->ignorePopUid != "") {
|
||||
TraceEvent(SevWarn, "TLogPopDisableonDisable")
|
||||
.detail("IgnorePopUid", self->ignorePopUid)
|
||||
.detail("UidStr", uidStr.toString());
|
||||
.detail("UidStr", uidStr.toString())
|
||||
.detail("Version", req->version);
|
||||
}
|
||||
self->ignorePopUid = uidStr.toString();
|
||||
// ignorePopRequest will be turned off after 30 seconds
|
||||
|
@ -1382,19 +1388,20 @@ ACTOR Future<Void> execProcessingHelper(TLogData* self,
|
|||
.detail("UidStr", uidStr.toString())
|
||||
.detail("IgnorePopUid", self->ignorePopUid)
|
||||
.detail("IgnporePopRequest", self->ignorePopRequest)
|
||||
.detail("IgnporePopDeadline", self->ignorePopDeadline);
|
||||
.detail("IgnporePopDeadline", self->ignorePopDeadline)
|
||||
.detail("Version", req->version);
|
||||
}
|
||||
if (*execCmd == execEnableTLogPop) {
|
||||
if (self->ignorePopUid != uidStr.toString()) {
|
||||
TraceEvent(SevWarn, "TLogPopDisableEnableUidMismatch")
|
||||
.detail("IgnorePopUid", self->ignorePopUid)
|
||||
.detail("UidStr", uidStr.toString());
|
||||
.detail("UidStr", uidStr.toString())
|
||||
.detail("Version", req->version);
|
||||
}
|
||||
|
||||
TraceEvent("EnableTLogPlayAllIgnoredPops2");
|
||||
// use toBePopped and issue all the pops
|
||||
state std::map<Tag, Version>::iterator it;
|
||||
state vector<Future<Void>> ignoredPops;
|
||||
std::map<Tag, Version>::iterator it;
|
||||
self->ignorePopRequest = false;
|
||||
self->ignorePopDeadline = 0.0;
|
||||
self->ignorePopUid = "";
|
||||
|
@ -1402,19 +1409,18 @@ ACTOR Future<Void> execProcessingHelper(TLogData* self,
|
|||
TraceEvent("PlayIgnoredPop")
|
||||
.detail("Tag", it->first.toString())
|
||||
.detail("Version", it->second);
|
||||
ignoredPops.push_back(tLogPopCore(self, it->first, it->second, logData));
|
||||
ignoredPops->push_back(tLogPopCore(self, it->first, it->second, logData));
|
||||
}
|
||||
self->toBePopped.clear();
|
||||
wait(waitForAll(ignoredPops));
|
||||
TraceEvent("TLogExecCmdPopEnable")
|
||||
.detail("ExecCmd", execCmd->toString())
|
||||
.detail("UidStr", uidStr.toString())
|
||||
.detail("IgnorePopUid", self->ignorePopUid)
|
||||
.detail("IgnporePopRequest", self->ignorePopRequest)
|
||||
.detail("IgnporePopDeadline", self->ignorePopDeadline);
|
||||
.detail("IgnporePopDeadline", self->ignorePopDeadline)
|
||||
.detail("Version", req->version);
|
||||
}
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
||||
|
||||
|
@ -1528,6 +1534,7 @@ ACTOR Future<Void> tLogCommit(
|
|||
state StringRef execCmd;
|
||||
state Standalone<VectorRef<Tag>> execTags;
|
||||
state vector<Future<Void>> snapFailKeySetters;
|
||||
state vector<Future<Void>> playIgnoredPops;
|
||||
|
||||
if (logData->version.get() == req.prevVersion) { // Not a duplicate (check relies on no waiting between here and self->version.set() below!)
|
||||
if(req.debugID.present())
|
||||
|
@ -1540,7 +1547,7 @@ ACTOR Future<Void> tLogCommit(
|
|||
qe.id = logData->logId;
|
||||
|
||||
if (req.hasExecOp) {
|
||||
wait(execProcessingHelper(self, logData, &req, &execTags, &execArg, &execCmd, &execVersion, &snapFailKeySetters));
|
||||
execProcessingHelper(self, logData, &req, &execTags, &execArg, &execCmd, &execVersion, &snapFailKeySetters, &playIgnoredPops);
|
||||
if (execVersion != invalidVersion) {
|
||||
TraceEvent(SevDebug, "SettingExecOpCommit")
|
||||
.detail("ExecVersion", execVersion)
|
||||
|
@ -1564,7 +1571,7 @@ ACTOR Future<Void> tLogCommit(
|
|||
|
||||
// Notifies the commitQueue actor to commit persistentQueue, and also unblocks tLogPeekMessages actors
|
||||
logData->version.set( req.version );
|
||||
|
||||
wait(waitForAll(playIgnoredPops));
|
||||
|
||||
if(req.debugID.present())
|
||||
g_traceBatch.addEvent("CommitDebug", tlogDebugID.get().first(), "TLog.tLogCommit.AfterTLogCommit");
|
||||
|
|
|
@ -1644,20 +1644,21 @@ ACTOR Future<Void> commitQueue( TLogData* self ) {
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> execProcessingHelper(TLogData* self,
|
||||
Reference<LogData> logData,
|
||||
TLogCommitRequest* req,
|
||||
Standalone<VectorRef<Tag>>* execTags,
|
||||
ExecCmdValueString* execArg,
|
||||
StringRef* execCmd,
|
||||
Version* execVersion,
|
||||
vector<Future<Void>>* snapFailKeySetters)
|
||||
void execProcessingHelper(TLogData* self,
|
||||
Reference<LogData> logData,
|
||||
TLogCommitRequest* req,
|
||||
Standalone<VectorRef<Tag>>* execTags,
|
||||
ExecCmdValueString* execArg,
|
||||
StringRef* execCmd,
|
||||
Version* execVersion,
|
||||
vector<Future<Void>>* snapFailKeySetters,
|
||||
vector<Future<Void>>* ignoredPops)
|
||||
{
|
||||
// inspect the messages to find if there is an Exec type and print
|
||||
// it. message are prefixed by the length of the message and each
|
||||
// field is prefixed by the length too
|
||||
uint8_t type = MutationRef::MAX_ATOMIC_OP;
|
||||
state StringRef param2;
|
||||
StringRef param2;
|
||||
ArenaReader rd(req->arena, req->messages, Unversioned());
|
||||
int32_t messageLength, rawLength;
|
||||
uint16_t tagCount;
|
||||
|
@ -1694,17 +1695,19 @@ ACTOR Future<Void> execProcessingHelper(TLogData* self,
|
|||
rd >> len;
|
||||
param2 = StringRef((uint8_t const*)rd.readBytes(len), len);
|
||||
|
||||
TraceEvent(SevDebug, "TLogExecCommandType", self->dbgid).detail("Value", execCmd->toString());
|
||||
TraceEvent(SevDebug, "TLogExecCommandType", self->dbgid)
|
||||
.detail("Value", execCmd->toString())
|
||||
.detail("Version", req->version);
|
||||
|
||||
execArg->setCmdValueString(param2);
|
||||
execArg->dbgPrint();
|
||||
state StringRef uidStr = execArg->getBinaryArgValue(LiteralStringRef("uid"));
|
||||
StringRef uidStr = execArg->getBinaryArgValue(LiteralStringRef("uid"));
|
||||
if (!execCmd->startsWith(LiteralStringRef("\xff"))) {
|
||||
*execVersion = req->version;
|
||||
}
|
||||
if (*execCmd == execSnap) {
|
||||
// validation check specific to snap request
|
||||
state std::string reason;
|
||||
std::string reason;
|
||||
if (!self->ignorePopRequest) {
|
||||
*execVersion = invalidVersion;
|
||||
reason = "SnapFailIgnorePopNotSet";
|
||||
|
@ -1717,13 +1720,15 @@ ACTOR Future<Void> execProcessingHelper(TLogData* self,
|
|||
TraceEvent(SevWarn, "TLogSnapFailed")
|
||||
.detail("IgnorePopUid", self->ignorePopUid)
|
||||
.detail("IgnorePopRequest", self->ignorePopRequest)
|
||||
.detail("Reason", reason);
|
||||
.detail("Reason", reason)
|
||||
.detail("Version", req->version);
|
||||
|
||||
TraceEvent("ExecCmdSnapCreate")
|
||||
.detail("Uid", uidStr.toString())
|
||||
.detail("Status", -1)
|
||||
.detail("Tag", logData->allTags.begin()->toString())
|
||||
.detail("Role", "TLog");
|
||||
.detail("Role", "TLog")
|
||||
.detail("Version", req->version);
|
||||
|
||||
if (g_network->isSimulated()) {
|
||||
// write SnapFailedTLog.$UID
|
||||
|
@ -1740,7 +1745,8 @@ ACTOR Future<Void> execProcessingHelper(TLogData* self,
|
|||
if (self->ignorePopUid != "") {
|
||||
TraceEvent(SevWarn, "TLogPopDisableonDisable")
|
||||
.detail("IgnorePopUid", self->ignorePopUid)
|
||||
.detail("UidStr", uidStr.toString());
|
||||
.detail("UidStr", uidStr.toString())
|
||||
.detail("Version", req->version);
|
||||
}
|
||||
self->ignorePopUid = uidStr.toString();
|
||||
// ignorePopRequest will be turned off after 30 seconds
|
||||
|
@ -1750,19 +1756,20 @@ ACTOR Future<Void> execProcessingHelper(TLogData* self,
|
|||
.detail("UidStr", uidStr.toString())
|
||||
.detail("IgnorePopUid", self->ignorePopUid)
|
||||
.detail("IgnporePopRequest", self->ignorePopRequest)
|
||||
.detail("IgnporePopDeadline", self->ignorePopDeadline);
|
||||
.detail("IgnporePopDeadline", self->ignorePopDeadline)
|
||||
.detail("Version", req->version);
|
||||
}
|
||||
if (*execCmd == execEnableTLogPop) {
|
||||
if (self->ignorePopUid != uidStr.toString()) {
|
||||
TraceEvent(SevWarn, "TLogPopDisableEnableUidMismatch")
|
||||
.detail("IgnorePopUid", self->ignorePopUid)
|
||||
.detail("UidStr", uidStr.toString());
|
||||
.detail("UidStr", uidStr.toString())
|
||||
.detail("Version", req->version);
|
||||
}
|
||||
|
||||
TraceEvent("EnableTLogPlayAllIgnoredPops2");
|
||||
// use toBePopped and issue all the pops
|
||||
state std::map<Tag, Version>::iterator it;
|
||||
state vector<Future<Void>> ignoredPops;
|
||||
std::map<Tag, Version>::iterator it;
|
||||
self->ignorePopRequest = false;
|
||||
self->ignorePopDeadline = 0.0;
|
||||
self->ignorePopUid = "";
|
||||
|
@ -1770,19 +1777,18 @@ ACTOR Future<Void> execProcessingHelper(TLogData* self,
|
|||
TraceEvent("PlayIgnoredPop")
|
||||
.detail("Tag", it->first.toString())
|
||||
.detail("Version", it->second);
|
||||
ignoredPops.push_back(tLogPopCore(self, it->first, it->second, logData));
|
||||
ignoredPops->push_back(tLogPopCore(self, it->first, it->second, logData));
|
||||
}
|
||||
self->toBePopped.clear();
|
||||
wait(waitForAll(ignoredPops));
|
||||
TraceEvent("TLogExecCmdPopEnable")
|
||||
.detail("ExecCmd", execCmd->toString())
|
||||
.detail("UidStr", uidStr.toString())
|
||||
.detail("IgnorePopUid", self->ignorePopUid)
|
||||
.detail("IgnporePopRequest", self->ignorePopRequest)
|
||||
.detail("IgnporePopDeadline", self->ignorePopDeadline);
|
||||
.detail("IgnporePopDeadline", self->ignorePopDeadline)
|
||||
.detail("Version", req->version);
|
||||
}
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> tLogSnapHelper(TLogData* self,
|
||||
|
@ -1894,6 +1900,7 @@ ACTOR Future<Void> tLogCommit(
|
|||
state TLogQueueEntryRef qe;
|
||||
state StringRef execCmd;
|
||||
state Standalone<VectorRef<Tag>> execTags;
|
||||
state vector<Future<Void>> playIgnoredPops;
|
||||
|
||||
if (logData->version.get() == req.prevVersion) { // Not a duplicate (check relies on no waiting between here and self->version.set() below!)
|
||||
if(req.debugID.present())
|
||||
|
@ -1907,7 +1914,7 @@ ACTOR Future<Void> tLogCommit(
|
|||
state vector<Future<Void>> snapFailKeySetters;
|
||||
|
||||
if (req.hasExecOp) {
|
||||
wait(execProcessingHelper(self, logData, &req, &execTags, &execArg, &execCmd, &execVersion, &snapFailKeySetters));
|
||||
execProcessingHelper(self, logData, &req, &execTags, &execArg, &execCmd, &execVersion, &snapFailKeySetters, &playIgnoredPops);
|
||||
if (execVersion != invalidVersion) {
|
||||
TraceEvent(SevDebug, "SettingExecOpCommit")
|
||||
.detail("ExecVersion", execVersion)
|
||||
|
@ -1931,6 +1938,7 @@ ACTOR Future<Void> tLogCommit(
|
|||
|
||||
// Notifies the commitQueue actor to commit persistentQueue, and also unblocks tLogPeekMessages actors
|
||||
logData->version.set( req.version );
|
||||
wait(waitForAll(playIgnoredPops));
|
||||
|
||||
if(req.debugID.present())
|
||||
g_traceBatch.addEvent("CommitDebug", tlogDebugID.get().first(), "TLog.tLogCommit.AfterTLogCommit");
|
||||
|
|
Loading…
Reference in New Issue