snap v2: TLog related changes

This commit is contained in:
sramamoorthy 2019-06-19 11:16:30 -07:00 committed by Alex Miller
parent ba6bccce73
commit f4e257e464
3 changed files with 223 additions and 3 deletions

View File

@ -1478,7 +1478,7 @@ ACTOR Future<Void> tLogSnapHelper(TLogData* self,
ASSERT(!isExecOpInProgress(execUID));
if (!otherRoleExeced) {
setExecOpInProgress(execUID);
int tmpErr = wait(execHelper(execArg, self->dataFolder, "role=tlog"));
int tmpErr = wait(execHelper(execArg, self->dataFolder, "role=tlog", 1 /*version*/));
err = tmpErr;
clearExecOpInProgress(execUID);
}
@ -1796,6 +1796,37 @@ void getQueuingMetrics( TLogData* self, Reference<LogData> logData, TLogQueuingM
req.reply.send( reply );
}
ACTOR Future<Void>
tLogSnapCreate(TLogSnapRequest snapReq, TLogData* self, Reference<LogData> logData) {
if (self->ignorePopUid != snapReq.snapUID.toString()) {
snapReq.reply.sendError(operation_failed());
return Void();
}
state ExecCmdValueString snapArg(snapReq.snapPayload);
try {
Standalone<StringRef> role = LiteralStringRef("role=").withSuffix(snapReq.role);
int err = wait(execHelper(&snapArg, self->dataFolder, role.toString(), 2 /* version */));
std::string uidStr = snapReq.snapUID.toString();
TraceEvent("ExecTraceTLog")
.detail("Uid", uidStr)
.detail("Status", err)
.detail("Role", snapReq.role)
.detail("Value", self->dataFolder)
.detail("ExecPayload", snapReq.snapPayload)
.detail("PersistentDataVersion", logData->persistentDataVersion)
.detail("PersistentDatadurableVersion", logData->persistentDataDurableVersion)
.detail("QueueCommittedVersion", logData->queueCommittedVersion.get())
.detail("Version", logData->version.get());
snapReq.reply.send(Void());
} catch (Error& e) {
TraceEvent("TLogExecHelperError").error(e);
snapReq.reply.sendError(e);
}
return Void();
}
ACTOR Future<Void> serveTLogInterface( TLogData* self, TLogInterface tli, Reference<LogData> logData, PromiseStream<Void> warningCollectorInput ) {
state Future<Void> dbInfoChange = Void();
@ -1857,6 +1888,57 @@ ACTOR Future<Void> serveTLogInterface( TLogData* self, TLogInterface tli, Refere
else
req.reply.sendError( tlog_stopped() );
}
when( TLogDisablePopRequest req = waitNext( tli.disablePopRequest.getFuture() ) ) {
self->ignorePopRequest = true;
if (self->ignorePopUid != "") {
TraceEvent(SevWarn, "TLogPopDisableonDisable")
.detail("IgnorePopUid", self->ignorePopUid)
.detail("UidStr", req.snapUID.toString())
.detail("PersistentDataVersion", logData->persistentDataVersion)
.detail("PersistentDatadurableVersion", logData->persistentDataDurableVersion)
.detail("QueueCommittedVersion", logData->queueCommittedVersion.get())
.detail("Version", logData->version.get());
}
//FIXME: As part of reverting snapshot V1, make ignorePopUid a UID instead of string
self->ignorePopUid = req.snapUID.toString();
self->ignorePopDeadline = g_network->now() + SERVER_KNOBS->TLOG_IGNORE_POP_AUTO_ENABLE_DELAY;
req.reply.send(Void());
}
when( state TLogEnablePopRequest enablePopReq = waitNext( tli.enablePopRequest.getFuture() ) ) {
if (self->ignorePopUid != enablePopReq.snapUID.toString()) {
TraceEvent(SevWarn, "TLogPopDisableEnableUidMismatch")
.detail("IgnorePopUid", self->ignorePopUid)
.detail("UidStr", enablePopReq.snapUID.toString());
}
TraceEvent("EnableTLogPlayAllIgnoredPops2");
// use toBePopped and issue all the pops
std::map<Tag, Version>::iterator it;
state vector<Future<Void>> ignoredPops;
self->ignorePopRequest = false;
self->ignorePopDeadline = 0.0;
self->ignorePopUid = "";
for (it = self->toBePopped.begin(); it != self->toBePopped.end(); it++) {
TraceEvent("PlayIgnoredPop")
.detail("Tag", it->first.toString())
.detail("Version", it->second);
ignoredPops.push_back(tLogPopCore(self, it->first, it->second, logData));
}
TraceEvent("TLogExecCmdPopEnable")
.detail("UidStr", enablePopReq.snapUID.toString())
.detail("IgnorePopUid", self->ignorePopUid)
.detail("IgnporePopRequest", self->ignorePopRequest)
.detail("IgnporePopDeadline", self->ignorePopDeadline)
.detail("PersistentDataVersion", logData->persistentDataVersion)
.detail("PersistentDatadurableVersion", logData->persistentDataDurableVersion)
.detail("QueueCommittedVersion", logData->queueCommittedVersion.get())
.detail("Version", logData->version.get());
wait(waitForAll(ignoredPops));
self->toBePopped.clear();
enablePopReq.reply.send(Void());
}
when( TLogSnapRequest snapReq = waitNext( tli.snapRequest.getFuture() ) ) {
logData->addActor.send( tLogSnapCreate( snapReq, self, logData) );
}
}
}

View File

@ -45,6 +45,10 @@ struct TLogInterface {
RequestStream< struct TLogConfirmRunningRequest > confirmRunning; // used for getReadVersion requests from client
RequestStream<ReplyPromise<Void>> waitFailure;
RequestStream< struct TLogRecoveryFinishedRequest > recoveryFinished;
RequestStream< struct TLogDisablePopRequest> disablePopRequest;
RequestStream< struct TLogEnablePopRequest> enablePopRequest;
RequestStream< struct TLogSnapRequest> snapRequest;
TLogInterface() {}
explicit TLogInterface(LocalityData locality) : uniqueID( deterministicRandom()->randomUniqueID() ), locality(locality) { sharedTLogID = uniqueID; }
@ -69,7 +73,8 @@ struct TLogInterface {
ASSERT(ar.isDeserializing || uniqueID != UID());
}
serializer(ar, uniqueID, sharedTLogID, locality, peekMessages, popMessages
, commit, lock, getQueuingMetrics, confirmRunning, waitFailure, recoveryFinished);
, commit, lock, getQueuingMetrics, confirmRunning, waitFailure, recoveryFinished
, disablePopRequest, enablePopRequest, snapRequest);
}
};
@ -231,6 +236,7 @@ struct TLogCommitRequest {
}
};
struct TLogQueuingMetricsReply {
constexpr static FileIdentifier file_identifier = 12206626;
double localTime;
@ -255,4 +261,53 @@ struct TLogQueuingMetricsRequest {
}
};
struct TLogDisablePopRequest {
constexpr static FileIdentifier file_identifier = 4022806;
Arena arena;
UID snapUID;
ReplyPromise<Void> reply;
Optional<UID> debugID;
TLogDisablePopRequest() {}
TLogDisablePopRequest(const UID uid) : snapUID(uid) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, snapUID, reply, arena, debugID);
}
};
struct TLogEnablePopRequest {
constexpr static FileIdentifier file_identifier = 4022809;
Arena arena;
UID snapUID;
ReplyPromise<Void> reply;
Optional<UID> debugID;
TLogEnablePopRequest() {}
TLogEnablePopRequest(const UID uid) : snapUID(uid) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, snapUID, reply, arena, debugID);
}
};
struct TLogSnapRequest {
constexpr static FileIdentifier file_identifier = 8184128;
ReplyPromise<Void> reply;
Arena arena;
StringRef snapPayload;
UID snapUID;
StringRef role;
TLogSnapRequest(StringRef snapPayload, UID snapUID, StringRef role) : snapPayload(snapPayload), snapUID(snapUID), role(role) {}
TLogSnapRequest() : snapPayload() {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, reply, snapPayload, snapUID, role, arena);
}
};
#endif

View File

@ -1853,7 +1853,7 @@ ACTOR Future<Void> tLogSnapHelper(TLogData* self,
ASSERT(!isExecOpInProgress(execUID));
if (!otherRoleExeced) {
setExecOpInProgress(execUID);
int tmpErr = wait(execHelper(execArg, self->dataFolder, "role=tlog"));
int tmpErr = wait(execHelper(execArg, self->dataFolder, "role=tlog", 1 /*version*/));
err = tmpErr;
clearExecOpInProgress(execUID);
}
@ -2174,6 +2174,38 @@ void getQueuingMetrics( TLogData* self, Reference<LogData> logData, TLogQueuingM
req.reply.send( reply );
}
ACTOR Future<Void>
tLogSnapCreate(TLogSnapRequest snapReq, TLogData* self, Reference<LogData> logData) {
if (self->ignorePopUid != snapReq.snapUID.toString()) {
snapReq.reply.sendError(operation_failed());
return Void();
}
state ExecCmdValueString snapArg(snapReq.snapPayload);
try {
Standalone<StringRef> role = LiteralStringRef("role=").withSuffix(snapReq.role);
int err = wait(execHelper(&snapArg, self->dataFolder, role.toString(), 2 /* version */));
std::string uidStr = snapReq.snapUID.toString();
TraceEvent("ExecTraceTLog")
.detail("Uid", uidStr)
.detail("Status", err)
.detail("Role", snapReq.role)
.detail("Value", self->dataFolder)
.detail("ExecPayload", snapReq.snapPayload)
.detail("PersistentDataVersion", logData->persistentDataVersion)
.detail("PersistentDatadurableVersion", logData->persistentDataDurableVersion)
.detail("QueueCommittedVersion", logData->queueCommittedVersion.get())
.detail("Version", logData->version.get());
snapReq.reply.send(Void());
} catch (Error& e) {
TraceEvent("TLogExecHelperError").error(e);
snapReq.reply.sendError(e);
}
return Void();
}
ACTOR Future<Void> serveTLogInterface( TLogData* self, TLogInterface tli, Reference<LogData> logData, PromiseStream<Void> warningCollectorInput ) {
state Future<Void> dbInfoChange = Void();
@ -2235,6 +2267,57 @@ ACTOR Future<Void> serveTLogInterface( TLogData* self, TLogInterface tli, Refere
else
req.reply.sendError( tlog_stopped() );
}
when( TLogDisablePopRequest req = waitNext( tli.disablePopRequest.getFuture() ) ) {
self->ignorePopRequest = true;
if (self->ignorePopUid != "") {
TraceEvent(SevWarn, "TLogPopDisableonDisable")
.detail("IgnorePopUid", self->ignorePopUid)
.detail("UidStr", req.snapUID.toString())
.detail("PersistentDataVersion", logData->persistentDataVersion)
.detail("PersistentDatadurableVersion", logData->persistentDataDurableVersion)
.detail("QueueCommittedVersion", logData->queueCommittedVersion.get())
.detail("Version", logData->version.get());
}
//FIXME: As part of reverting snapshot V1, make ignorePopUid a UID instead of string
self->ignorePopUid = req.snapUID.toString();
self->ignorePopDeadline = g_network->now() + SERVER_KNOBS->TLOG_IGNORE_POP_AUTO_ENABLE_DELAY;
req.reply.send(Void());
}
when( state TLogEnablePopRequest enablePopReq = waitNext( tli.enablePopRequest.getFuture() ) ) {
if (self->ignorePopUid != enablePopReq.snapUID.toString()) {
TraceEvent(SevWarn, "TLogPopDisableEnableUidMismatch")
.detail("IgnorePopUid", self->ignorePopUid)
.detail("UidStr", enablePopReq.snapUID.toString());
}
TraceEvent("EnableTLogPlayAllIgnoredPops2");
// use toBePopped and issue all the pops
std::map<Tag, Version>::iterator it;
state vector<Future<Void>> ignoredPops;
self->ignorePopRequest = false;
self->ignorePopDeadline = 0.0;
self->ignorePopUid = "";
for (it = self->toBePopped.begin(); it != self->toBePopped.end(); it++) {
TraceEvent("PlayIgnoredPop")
.detail("Tag", it->first.toString())
.detail("Version", it->second);
ignoredPops.push_back(tLogPopCore(self, it->first, it->second, logData));
}
TraceEvent("TLogExecCmdPopEnable")
.detail("UidStr", enablePopReq.snapUID.toString())
.detail("IgnorePopUid", self->ignorePopUid)
.detail("IgnporePopRequest", self->ignorePopRequest)
.detail("IgnporePopDeadline", self->ignorePopDeadline)
.detail("PersistentDataVersion", logData->persistentDataVersion)
.detail("PersistentDatadurableVersion", logData->persistentDataDurableVersion)
.detail("QueueCommittedVersion", logData->queueCommittedVersion.get())
.detail("Version", logData->version.get());
wait(waitForAll(ignoredPops));
self->toBePopped.clear();
enablePopReq.reply.send(Void());
}
when( TLogSnapRequest snapReq = waitNext( tli.snapRequest.getFuture() ) ) {
logData->addActor.send( tLogSnapCreate( snapReq, self, logData) );
}
}
}