tlog to snapshot exactly at exec version

This commit is contained in:
sramamoorthy 2019-04-08 05:23:48 -07:00 committed by Alex Miller
parent 89b7a052f5
commit cfdad0c5e6
5 changed files with 63 additions and 17 deletions

View File

@ -431,13 +431,15 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
UID recruitmentID;
std::set<Tag> allTags;
Future<Void> terminated;
Promise<Void> execOpHold;
bool execOpCommitInProgress;
explicit LogData(TLogData* tLogData, TLogInterface interf, Tag remoteTag, bool isPrimary, int logRouterTags, UID recruitmentID, std::vector<Tag> tags) : tLogData(tLogData), knownCommittedVersion(0), logId(interf.id()),
cc("TLog", interf.id().toString()), bytesInput("BytesInput", cc), bytesDurable("BytesDurable", cc), remoteTag(remoteTag), isPrimary(isPrimary), logRouterTags(logRouterTags), recruitmentID(recruitmentID),
logSystem(new AsyncVar<Reference<ILogSystem>>()), logRouterPoppedVersion(0), durableKnownCommittedVersion(0), minKnownCommittedVersion(0), allTags(tags.begin(), tags.end()), terminated(tLogData->terminated.getFuture()),
// These are initialized differently on init() or recovery
recoveryCount(), stopped(false), initialized(false), queueCommittingVersion(0), newPersistentDataVersion(invalidVersion), unrecoveredBefore(1), recoveredAt(1), unpoppedRecoveredTags(0),
logRouterPopToVersion(0), locality(tagLocalityInvalid)
logRouterPopToVersion(0), locality(tagLocalityInvalid), execOpCommitInProgress(false)
{
startRole(Role::TRANSACTION_LOG, interf.id(), UID());
@ -1313,6 +1315,14 @@ ACTOR Future<Void> tLogCommit(
return Void();
}
// while exec op is being committed, no new transactions will be admitted.
// This property is useful for snapshot kind of operations which wants to
// take a snap of the disk image at a particular version (no data from
// future version to be included)
if (logData->execOpCommitInProgress) {
wait(logData->execOpHold.getFuture());
}
state Version execVersion = invalidVersion;
state ExecCmdValueString execArg();
state TLogQueueEntryRef qe;
@ -1458,6 +1468,13 @@ ACTOR Future<Void> tLogCommit(
}
}
}
if (execVersion != invalidVersion) {
TraceEvent(SevDebug, "SettingExecOpCommit")
.detail("ExecVersion", execVersion)
.detail("Version", req.version);
logData->execOpCommitInProgress = true;
logData->execOpHold.reset();
}
}
//TraceEvent("TLogCommit", logData->logId).detail("Version", req.version);
@ -1517,7 +1534,7 @@ ACTOR Future<Void> tLogCommit(
std::string mkdirBin = "/bin/mkdir";
paramList.push_back(mkdirBin);
paramList.push_back(tLogFolderTo);
cmdErr = spawnProcess(mkdirBin, paramList, 3.0);
cmdErr = spawnProcess(mkdirBin, paramList, 3.0, true);
wait(success(cmdErr));
err = cmdErr.get();
if (err == 0) {
@ -1569,8 +1586,12 @@ ACTOR Future<Void> tLogCommit(
te.trackLatest(message.c_str());
}
}
execVersion = invalidVersion;
}
if (execVersion != invalidVersion && logData->execOpCommitInProgress) {
logData->execOpCommitInProgress = false;
logData->execOpHold.send(Void());
}
execVersion = invalidVersion;
if(stopped.isReady()) {
ASSERT(logData->stopped);

View File

@ -489,13 +489,15 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
UID recruitmentID;
std::set<Tag> allTags;
Future<Void> terminated;
Promise<Void> execOpHold;
bool execOpCommitInProgress;
explicit LogData(TLogData* tLogData, TLogInterface interf, Tag remoteTag, bool isPrimary, int logRouterTags, UID recruitmentID, uint64_t protocolVersion, std::vector<Tag> tags) : tLogData(tLogData), knownCommittedVersion(0), logId(interf.id()),
cc("TLog", interf.id().toString()), bytesInput("BytesInput", cc), bytesDurable("BytesDurable", cc), remoteTag(remoteTag), isPrimary(isPrimary), logRouterTags(logRouterTags), recruitmentID(recruitmentID), protocolVersion(protocolVersion),
logSystem(new AsyncVar<Reference<ILogSystem>>()), logRouterPoppedVersion(0), durableKnownCommittedVersion(0), minKnownCommittedVersion(0), queuePoppedVersion(0), allTags(tags.begin(), tags.end()), terminated(tLogData->terminated.getFuture()),
// These are initialized differently on init() or recovery
recoveryCount(), stopped(false), initialized(false), queueCommittingVersion(0), newPersistentDataVersion(invalidVersion), unrecoveredBefore(1), recoveredAt(1), unpoppedRecoveredTags(0),
logRouterPopToVersion(0), locality(tagLocalityInvalid)
logRouterPopToVersion(0), locality(tagLocalityInvalid), execOpCommitInProgress(false)
{
startRole(Role::TRANSACTION_LOG, interf.id(), UID());
@ -1680,6 +1682,14 @@ ACTOR Future<Void> tLogCommit(
return Void();
}
// while exec op is being committed, no new transactions will be admitted.
// This property is useful for snapshot kind of operations which wants to
// take a snap of the disk image at a particular version (not data from
// future version to be included)
if (logData->execOpCommitInProgress) {
wait(logData->execOpHold.getFuture());
}
state Version execVersion = invalidVersion;
state ExecCmdValueString execArg();
state TLogQueueEntryRef qe;
@ -1738,7 +1748,7 @@ ACTOR Future<Void> tLogCommit(
rd >> len;
param2 = StringRef((uint8_t const*)rd.readBytes(len), len);
TraceEvent("TLogExecCommandType", self->dbgid).detail("Value", execCmd.toString());
TraceEvent(SevDebug, "TLogExecCommandType", self->dbgid).detail("Value", execCmd.toString());
execArg.setCmdValueString(param2.toString());
execArg.dbgPrint();
@ -1766,12 +1776,12 @@ ACTOR Future<Void> tLogCommit(
std::string message = "ExecTrace/TLog/" +
logData->allTags.begin()->toString();
"/" + uidStr;
TraceEvent("ExecCmdSnapCreate")
.detail("Uid", uidStr)
.detail("Status", -1)
.detail("Tag", logData->allTags.begin()->toString())
.detail("Role", "TLog")
.trackLatest(message.c_str());
TraceEvent("ExecCmdSnapCreate")
.detail("Uid", uidStr)
.detail("Status", -1)
.detail("Tag", logData->allTags.begin()->toString())
.detail("Role", "TLog")
.trackLatest(message.c_str());
}
}
if (execCmd == execDisableTLogPop) {
@ -1826,6 +1836,13 @@ ACTOR Future<Void> tLogCommit(
}
}
}
if (execVersion != invalidVersion) {
TraceEvent(SevDebug, "SettingExecOpCommit")
.detail("ExecVersion", execVersion)
.detail("Version", req.version);
logData->execOpCommitInProgress = true;
logData->execOpHold.reset();
}
}
//TraceEvent("TLogCommit", logData->logId).detail("Version", req.version);
@ -1886,7 +1903,7 @@ ACTOR Future<Void> tLogCommit(
std::string mkdirBin = "/bin/mkdir";
paramList.push_back(mkdirBin);
paramList.push_back(tLogFolderTo);
cmdErr = spawnProcess(mkdirBin, paramList, 3.0);
cmdErr = spawnProcess(mkdirBin, paramList, 3.0, true);
wait(success(cmdErr));
err = cmdErr.get();
if (err == 0) {
@ -1938,8 +1955,12 @@ ACTOR Future<Void> tLogCommit(
te.trackLatest(message.c_str());
}
}
execVersion = invalidVersion;
}
if (execVersion != invalidVersion && logData->execOpCommitInProgress) {
logData->execOpCommitInProgress = false;
logData->execOpHold.send(Void());
}
execVersion = invalidVersion;
if(stopped.isReady()) {
ASSERT(logData->stopped);

View File

@ -451,7 +451,7 @@ typedef decltype(&tLog) TLogFn;
// spawns a process pointed by `binPath` and the arguments provided at `paramList`,
// if the process spawned takes more than `maxWaitTime` then it will be killed
ACTOR Future<int> spawnProcess(std::string binPath, vector<std::string> paramList, double maxWaitTime);
ACTOR Future<int> spawnProcess(std::string binPath, vector<std::string> paramList, double maxWaitTime, bool isSync = false);
#include "flow/unactorcompiler.h"
#endif

View File

@ -1915,7 +1915,7 @@ snapHelper(StorageServer* data, MutationRef m, Version ver)
paramList.push_back(mkdirBin);
paramList.push_back(folderTo);
cmdErr = spawnProcess(mkdirBin, paramList, 3.0);
cmdErr = spawnProcess(mkdirBin, paramList, 3.0, true);
wait(success(cmdErr));
err = cmdErr.get();
if (err == 0) {

View File

@ -1211,7 +1211,7 @@ ACTOR Future<Void> workerServer(
std::string mkdirBin = "/bin/mkdir";
paramList.push_back(mkdirBin);
paramList.push_back(folderTo);
cmdErr = spawnProcess(mkdirBin, paramList, 3.0);
cmdErr = spawnProcess(mkdirBin, paramList, 3.0, true);
wait(success(cmdErr));
err = cmdErr.get();
if (err == 0) {
@ -1432,7 +1432,7 @@ ACTOR Future<Void> fdbd(
}
}
ACTOR Future<int> spawnProcess(std::string binPath, vector<std::string> paramList, double maxWaitTime)
ACTOR Future<int> spawnProcess(std::string binPath, vector<std::string> paramList, double maxWaitTime, bool isSync)
{
state pid_t pid = -1;
try {
@ -1445,6 +1445,10 @@ ACTOR Future<int> spawnProcess(std::string binPath, vector<std::string> paramLis
return -1;
}
if (!isSync && g_network->isSimulated()) {
wait(delay(g_random->random01()));
}
state double sleepTime = 0;
state int err = 0;
while (true) {