remove snap v1 related code

This commit is contained in:
sramamoorthy 2019-07-22 15:44:49 -07:00 committed by Alex Miller
parent 75ed367889
commit 9afd162e2f
26 changed files with 126 additions and 1228 deletions

View File

@ -2171,13 +2171,8 @@ ACTOR Future<bool> exclude( Database db, std::vector<StringRef> tokens, Referenc
ACTOR Future<bool> createSnapshot(Database db, StringRef snapCmd) {
try {
UID snapUID = wait(makeInterruptable(mgmtSnapCreate(db, snapCmd, 2 /* version */)));
int version = 2;
if (version == 1) {
printf("Snapshots tagged with UID: %s, check logs for status\n", snapUID.toString().c_str());
} else {
printf("Snapshots create succeeded with UID: %s\n", snapUID.toString().c_str());
}
UID snapUID = wait(makeInterruptable(mgmtSnapCreate(db, snapCmd)));
printf("Snapshots create succeeded with UID: %s\n", snapUID.toString().c_str());
} catch (Error& e) {
fprintf(stderr, "Snapshot create failed, %d (%s)."
" Please cleanup any instance level snapshots created.\n", e.code(), e.what());

View File

@ -44,8 +44,7 @@ static const char* typeString[] = { "SetValue",
"ByteMax",
"MinV2",
"AndV2",
"CompareAndClear",
"Exec" };
"CompareAndClear"};
struct MutationRef {
static const int OVERHEAD_BYTES = 12; //12 is the size of Header in MutationList entries
@ -71,9 +70,6 @@ struct MutationRef {
MinV2,
AndV2,
CompareAndClear,
// ExecOp is always set with FIRST_IN_BATCH option to quickly identify
// the op in a transaction batch while parsing it in TLog
Exec,
MAX_ATOMIC_OP
};
// This is stored this way for serialization purposes.

View File

@ -1482,13 +1482,13 @@ ACTOR Future<std::set<NetworkAddress>> checkForExcludingServers(Database cx, vec
return inProgressExclusion;
}
ACTOR Future<UID> mgmtSnapCreate(Database cx, StringRef snapCmd, int version) {
ACTOR Future<UID> mgmtSnapCreate(Database cx, StringRef snapCmd) {
state int retryCount = 0;
loop {
state UID snapUID = deterministicRandom()->randomUniqueID();
try {
wait(snapCreate(cx, snapCmd, snapUID, version));
wait(snapCreate(cx, snapCmd, snapUID));
TraceEvent("SnapCreateSucceeded").detail("snapUID", snapUID);
return snapUID;
} catch (Error& e) {

View File

@ -195,7 +195,7 @@ bool schemaMatch( json_spirit::mValue const& schema, json_spirit::mValue const&
// execute payload in 'snapCmd' on all the coordinators, TLogs and
// storage nodes
ACTOR Future<UID> mgmtSnapCreate(Database cx, StringRef snapCmd, int version);
ACTOR Future<UID> mgmtSnapCreate(Database cx, StringRef snapCmd);
#include "flow/unactorcompiler.h"
#endif

View File

@ -50,7 +50,6 @@ struct MasterProxyInterface {
RequestStream< struct GetRawCommittedVersionRequest > getRawCommittedVersion;
RequestStream< struct TxnStateRequest > txnState;
RequestStream< struct GetHealthMetricsRequest > getHealthMetrics;
RequestStream< struct ExecRequest > execReq;
RequestStream< struct ProxySnapRequest > proxySnapReq;
UID id() const { return commit.getEndpoint().token; }
@ -63,7 +62,7 @@ struct MasterProxyInterface {
void serialize(Archive& ar) {
serializer(ar, locality, provisional, commit, getConsistentReadVersion, getKeyServersLocations,
waitFailure, getStorageServerRejoinInfo, getRawCommittedVersion,
txnState, getHealthMetrics, execReq, proxySnapReq);
txnState, getHealthMetrics, proxySnapReq);
}
void initEndpoints() {
@ -333,23 +332,6 @@ struct GetHealthMetricsRequest
}
};
struct ExecRequest
{
constexpr static FileIdentifier file_identifier = 22403900;
Arena arena;
StringRef execPayload;
ReplyPromise<Void> reply;
Optional<UID> debugID;
explicit ExecRequest(Optional<UID> const& debugID = Optional<UID>()) : debugID(debugID) {}
explicit ExecRequest(StringRef exec, Optional<UID> debugID = Optional<UID>()) : execPayload(exec), debugID(debugID) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, execPayload, reply, arena, debugID);
}
};
struct ProxySnapRequest
{
constexpr static FileIdentifier file_identifier = 22204900;

View File

@ -2371,45 +2371,6 @@ void Transaction::atomicOp(const KeyRef& key, const ValueRef& operand, MutationR
TEST(true); //NativeAPI atomic operation
}
ACTOR Future<Void> executeCoordinators(DatabaseContext* cx, StringRef execPayload, Optional<UID> debugID) {
try {
if (debugID.present()) {
g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "NativeAPI.executeCoordinators.Before");
}
state ExecRequest req(execPayload, debugID);
if (debugID.present()) {
g_traceBatch.addEvent("TransactionDebug", debugID.get().first(),
"NativeAPI.executeCoordinators.Inside loop");
}
wait(loadBalance(cx->getMasterProxies(false), &MasterProxyInterface::execReq, req, cx->taskID));
if (debugID.present())
g_traceBatch.addEvent("TransactionDebug", debugID.get().first(),
"NativeAPI.executeCoordinators.After");
return Void();
} catch (Error& e) {
TraceEvent("NativeAPI.executeCoordinatorsError").error(e);
throw;
}
}
void Transaction::execute(const KeyRef& cmdType, const ValueRef& cmdPayload) {
TraceEvent("Execute operation").detail("Key", cmdType.toString()).detail("Value", cmdPayload.toString());
if (cmdType.size() > CLIENT_KNOBS->KEY_SIZE_LIMIT) throw key_too_large();
if (cmdPayload.size() > CLIENT_KNOBS->VALUE_SIZE_LIMIT) throw value_too_large();
auto& req = tr;
// Helps with quickly finding the exec op in a tlog batch
setOption(FDBTransactionOptions::FIRST_IN_BATCH);
auto& t = req.transaction;
auto r = singleKeyRange(cmdType, req.arena);
auto v = ValueRef(req.arena, cmdPayload);
t.mutations.push_back(req.arena, MutationRef(MutationRef::Exec, r.begin, v));
}
void Transaction::clear( const KeyRangeRef& range, bool addConflictRange ) {
auto &req = tr;
auto &t = req.transaction;
@ -3440,105 +3401,6 @@ void enableClientInfoLogging() {
TraceEvent(SevInfo, "ClientInfoLoggingEnabled");
}
ACTOR Future<Void> snapCreateVersion1(Database inputCx, StringRef snapCmd, UID snapUID) {
state Transaction tr(inputCx);
state DatabaseContext* cx = inputCx.getPtr();
// remember the client ID before the snap operation
state UID preSnapClientUID = cx->clientInfo->get().id;
TraceEvent("SnapCreateEnter")
.detail("SnapCmd", snapCmd.toString())
.detail("UID", snapUID)
.detail("PreSnapClientUID", preSnapClientUID);
StringRef snapCmdArgs = snapCmd;
StringRef snapCmdPart = snapCmdArgs.eat(":");
Standalone<StringRef> snapUIDRef(snapUID.toString());
state Standalone<StringRef> snapPayloadRef = snapCmdPart
.withSuffix(LiteralStringRef(":uid="))
.withSuffix(snapUIDRef)
.withSuffix(LiteralStringRef(","))
.withSuffix(snapCmdArgs);
state Standalone<StringRef>
tLogCmdPayloadRef = LiteralStringRef("empty-binary:uid=").withSuffix(snapUIDRef);
// disable popping of TLog
tr.reset();
loop {
try {
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
tr.execute(execDisableTLogPop, tLogCmdPayloadRef);
wait(timeoutError(tr.commit(), 10));
break;
} catch (Error& e) {
TraceEvent("DisableTLogPopFailed").error(e);
wait(tr.onError(e));
}
}
TraceEvent("SnapCreateAfterLockingTLogs").detail("UID", snapUID);
// snap the storage and Tlogs
// if we retry the below command in failure cases with the same snapUID
// then the snapCreate can end up creating multiple snapshots with
// the same name which needs additional handling, hence we fail in
// failure cases and let the caller retry with different snapUID
tr.reset();
try {
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
tr.execute(execSnap, snapPayloadRef);
wait(tr.commit());
} catch (Error& e) {
TraceEvent("SnapCreateErroSnapTLogStorage").error(e);
throw;
}
TraceEvent("SnapCreateAfterSnappingTLogStorage").detail("UID", snapUID);
if (BUGGIFY) {
int32_t toDelay = deterministicRandom()->randomInt(1, 30);
wait(delay(toDelay));
}
// enable popping of the TLog
tr.reset();
loop {
try {
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
tr.execute(execEnableTLogPop, tLogCmdPayloadRef);
wait(tr.commit());
break;
} catch (Error& e) {
TraceEvent("EnableTLogPopFailed").error(e);
wait(tr.onError(e));
}
}
TraceEvent("SnapCreateAfterUnlockingTLogs").detail("UID", snapUID);
// snap the coordinators
try {
Future<Void> exec = executeCoordinators(cx, snapPayloadRef, snapUID);
wait(timeoutError(exec, 5.0));
} catch (Error& e) {
TraceEvent("SnapCreateErrorSnapCoords").error(e);
throw;
}
TraceEvent("SnapCreateAfterSnappingCoords").detail("UID", snapUID);
// if the client IDs did not change then we have a clean snapshot
UID postSnapClientUID = cx->clientInfo->get().id;
if (preSnapClientUID != postSnapClientUID) {
TraceEvent("UID mismatch")
.detail("SnapPreSnapClientUID", preSnapClientUID)
.detail("SnapPostSnapClientUID", postSnapClientUID);
throw coordinators_changed();
}
TraceEvent("SnapCreateComplete").detail("UID", snapUID);
return Void();
}
ACTOR Future<Void> snapshotDatabase(Reference<DatabaseContext> cx, StringRef snapPayload, UID snapUID, Optional<UID> debugID) {
TraceEvent("NativeAPI.SnapshotDatabaseEnter")
.detail("SnapPayload", snapPayload)
@ -3563,11 +3425,11 @@ ACTOR Future<Void> snapshotDatabase(Reference<DatabaseContext> cx, StringRef sna
return Void();
}
ACTOR Future<Void> snapCreateVersion2(Database cx, StringRef snapCmd, UID snapUID) {
ACTOR Future<Void> snapCreateCore(Database cx, StringRef snapCmd, UID snapUID) {
// remember the client ID before the snap operation
state UID preSnapClientUID = cx->clientInfo->get().id;
TraceEvent("SnapCreateEnterVersion2")
TraceEvent("SnapCreateCoreEnter")
.detail("SnapCmd", snapCmd.toString())
.detail("UID", snapUID)
.detail("PreSnapClientUID", preSnapClientUID);
@ -3585,7 +3447,7 @@ ACTOR Future<Void> snapCreateVersion2(Database cx, StringRef snapCmd, UID snapUI
Future<Void> exec = snapshotDatabase(Reference<DatabaseContext>::addRef(cx.getPtr()), snapPayloadRef, snapUID, snapUID);
wait(exec);
} catch (Error& e) {
TraceEvent("SnapshotDatabaseErrorVersion2")
TraceEvent("SnapCreateCoreError")
.detail("SnapCmd", snapCmd.toString())
.detail("UID", snapUID)
.error(e);
@ -3595,27 +3457,23 @@ ACTOR Future<Void> snapCreateVersion2(Database cx, StringRef snapCmd, UID snapUI
UID postSnapClientUID = cx->clientInfo->get().id;
if (preSnapClientUID != postSnapClientUID) {
// if the client IDs changed then we fail the snapshot
TraceEvent("UIDMismatchVersion2")
TraceEvent("SnapCreateCoreUIDMismatch")
.detail("SnapPreSnapClientUID", preSnapClientUID)
.detail("SnapPostSnapClientUID", postSnapClientUID);
throw coordinators_changed();
}
TraceEvent("SnapCreateExitVersion2")
TraceEvent("SnapCreateCoreExit")
.detail("SnapCmd", snapCmd.toString())
.detail("UID", snapUID)
.detail("PreSnapClientUID", preSnapClientUID);
return Void();
}
ACTOR Future<Void> snapCreate(Database cx, StringRef snapCmd, UID snapUID, int version) {
if (version == 1) {
wait(snapCreateVersion1(cx, snapCmd, snapUID));
return Void();
}
ACTOR Future<Void> snapCreate(Database cx, StringRef snapCmd, UID snapUID) {
state int oldMode = wait( setDDMode( cx, 0 ) );
try {
wait(snapCreateVersion2(cx, snapCmd, snapUID));
wait(snapCreateCore(cx, snapCmd, snapUID));
} catch (Error& e) {
state Error err = e;
wait(success( setDDMode( cx, oldMode ) ));

View File

@ -268,14 +268,6 @@ public:
// If checkWriteConflictRanges is true, existing write conflict ranges will be searched for this key
void set( const KeyRef& key, const ValueRef& value, bool addConflictRange = true );
void atomicOp( const KeyRef& key, const ValueRef& value, MutationRef::Type operationType, bool addConflictRange = true );
// execute operation is similar to set, but the command will reach
// one of the proxies, all the TLogs and all the storage nodes.
// instead of setting a key and value on the DB, it executes the command
// that is passed in the value field.
// - cmdType can be used for logging purposes
// - cmdPayload contains the details of the command to be executed:
// format of the cmdPayload : <binary-path>:<arg1=val1>,<arg2=val2>...
void execute(const KeyRef& cmdType, const ValueRef& cmdPayload);
void clear( const KeyRangeRef& range, bool addConflictRange = true );
void clear( const KeyRef& key, bool addConflictRange = true );
Future<Void> commit(); // Throws not_committed or commit_unknown_result errors in normal operation
@ -344,7 +336,7 @@ int64_t extractIntOption( Optional<StringRef> value, int64_t minValue = std::num
// Takes a snapshot of the cluster, specifically the following persistent
// states: coordinator, TLog and storage state
ACTOR Future<Void> snapCreate(Database cx, StringRef snapCmd, UID snapUID, int version);
ACTOR Future<Void> snapCreate(Database cx, StringRef snapCmd, UID snapUID);
#include "flow/unactorcompiler.h"
#endif

View File

@ -40,9 +40,6 @@ struct ConflictBatch {
TransactionConflict = 0,
TransactionTooOld,
TransactionCommitted,
TransactionNotPermitted,
TransactionNotFullyRecovered,
TransactionExecLogAntiQuorum,
};
void addTransaction( const CommitTransactionRef& transaction );

View File

@ -153,11 +153,11 @@ ACTOR Future<int> spawnProcess(std::string binPath, std::vector<std::string> par
}
#endif
ACTOR Future<int> execHelper(ExecCmdValueString* execArg, std::string folder, std::string role, int snapVersion) {
ACTOR Future<int> execHelper(ExecCmdValueString* execArg, std::string folder, std::string role) {
state StringRef uidStr = execArg->getBinaryArgValue(LiteralStringRef("uid"));
state int err = 0;
state Future<int> cmdErr;
state double maxWaitTime = (snapVersion == 2) ? SERVER_KNOBS->SNAP_CREATE_MAX_TIMEOUT : 3.0;
state double maxWaitTime = SERVER_KNOBS->SNAP_CREATE_MAX_TIMEOUT;
if (!g_network->isSimulated()) {
// get bin path
auto snapBin = execArg->getBinaryPath();
@ -183,13 +183,8 @@ ACTOR Future<int> execHelper(ExecCmdValueString* execArg, std::string folder, st
// copy the files
state std::string folderFrom = folder + "/.";
state std::string folderTo = folder + "-snap-" + uidStr.toString();
double maxSimDelayTime = 1.0;
if (snapVersion == 1) {
folderTo = folder + "-snap-" + uidStr.toString();
} else {
folderTo = folder + "-snap-" + uidStr.toString() + "-" + role;
maxSimDelayTime = 10.0;
}
double maxSimDelayTime = 10.0;
folderTo = folder + "-snap-" + uidStr.toString() + "-" + role;
std::vector<std::string> paramList;
std::string mkdirBin = "/bin/mkdir";
paramList.push_back(folderTo);

View File

@ -52,7 +52,7 @@ private: // data
ACTOR Future<int> spawnProcess(std::string binPath, std::vector<std::string> paramList, double maxWaitTime, bool isSync, double maxSimDelayTime);
// helper to run all the work related to running the exec command
ACTOR Future<int> execHelper(ExecCmdValueString* execArg, std::string folder, std::string role, int version);
ACTOR Future<int> execHelper(ExecCmdValueString* execArg, std::string folder, std::string role);
// returns true if the execUID op is in progress
bool isExecOpInProgress(UID execUID);

View File

@ -749,7 +749,7 @@ struct CompareFirst {
struct LogPushData : NonCopyable {
// Log subsequences have to start at 1 (the MergedPeekCursor relies on this to make sure we never have !hasMessage() in the middle of data for a version
explicit LogPushData(Reference<ILogSystem> logSystem) : logSystem(logSystem), subsequence(1), hasExecOp(false) {
explicit LogPushData(Reference<ILogSystem> logSystem) : logSystem(logSystem), subsequence(1) {
for(auto& log : logSystem->getLogSystemConfig().tLogs) {
if(log.isLocal) {
for(int i = 0; i < log.tLogs.size(); i++) {
@ -825,10 +825,6 @@ struct LogPushData : NonCopyable {
return messagesWriter[loc].toValue();
}
void setHasExecOp() { hasExecOp = true; }
bool getHasExecOp() { return hasExecOp; }
private:
Reference<ILogSystem> logSystem;
std::vector<Tag> next_message_tags;
@ -836,7 +832,6 @@ private:
std::vector<BinaryWriter> messagesWriter;
std::vector<int> msg_locations;
uint32_t subsequence;
bool hasExecOp;
};
#endif

View File

@ -769,96 +769,6 @@ ACTOR Future<Void> commitBatch(
toCommit.addTags(allSources);
}
toCommit.addTypedMessage(m);
} else if (m.type == MutationRef::Exec) {
state std::string param2 = m.param2.toString();
state ExecCmdValueString execArg(param2);
execArg.dbgPrint();
state StringRef binPath = execArg.getBinaryPath();
state StringRef uidStr = execArg.getBinaryArgValue(LiteralStringRef("uid"));
auto result =
self->txnStateStore->readValue(LiteralStringRef("log_anti_quorum").withPrefix(configKeysPrefix)).get();
state int logAntiQuorum = 0;
if (result.present()) {
logAntiQuorum = atoi(result.get().toString().c_str());
}
if (m.param1 != execDisableTLogPop
&& m.param1 != execEnableTLogPop
&& !isWhitelisted(self->whitelistedBinPathVec, binPath)) {
TraceEvent("ExecTransactionNotPermitted")
.detail("TransactionNum", transactionNum);
committed[transactionNum] = ConflictBatch::TransactionNotPermitted;
} else if (self->db->get().recoveryState != RecoveryState::FULLY_RECOVERED) {
// Cluster is not fully recovered and needs TLogs
// from previous generation for full recovery.
// Currently, snapshot of old tlog generation is not
// supported and hence failing the snapshot request until
// cluster is fully_recovered.
TraceEvent("ExecTransactionNotFullyRecovered")
.detail("TransactionNum", transactionNum);
committed[transactionNum] = ConflictBatch::TransactionNotFullyRecovered;
} else if (logAntiQuorum > 0) {
// exec op is not supported when logAntiQuorum is configured
// FIXME: Add support for exec ops in the presence of log anti quorum
TraceEvent("ExecOpNotSupportedWithLogAntiQuorum")
.detail("LogAntiQuorum", logAntiQuorum)
.detail("TransactionNum", transactionNum);
committed[transactionNum] = ConflictBatch::TransactionExecLogAntiQuorum;
} else {
// Send the ExecOp to
// - all the storage nodes in a single region and
// - only to storage nodes in local region in multi-region setup
// step 1: get the DatabaseConfiguration
auto result =
self->txnStateStore->readValue(LiteralStringRef("usable_regions").withPrefix(configKeysPrefix)).get();
ASSERT(result.present());
state int usableRegions = atoi(result.get().toString().c_str());
// step 2: find the tag.id from locality info of the master
auto localityKey =
self->txnStateStore->readValue(tagLocalityListKeyFor(self->master.locality.dcId())).get();
int8_t locality = tagLocalityInvalid;
if (usableRegions > 1) {
if (!localityKey.present()) {
TraceEvent(SevError, "LocalityKeyNotPresentForMasterDCID");
ASSERT(localityKey.present());
}
locality = decodeTagLocalityListValue(localityKey.get());
}
std::set<Tag> allSources;
auto& m = (*pMutations)[mutationNum];
if (debugMutation("ProxyCommit", commitVersion, m))
TraceEvent("ProxyCommitTo", self->dbgid)
.detail("To", "all sources")
.detail("Mutation", m.toString())
.detail("Version", commitVersion);
std::vector<Tag> localTags;
auto tagKeys = self->txnStateStore->readRange(serverTagKeys).get();
for( auto& kv : tagKeys ) {
Tag t = decodeServerTagValue( kv.value );
if ((usableRegions > 1 && t.locality == locality)
|| (usableRegions == 1)) {
localTags.push_back(t);
}
allSources.insert(localTags.begin(), localTags.end());
}
auto te1 = TraceEvent("ProxyCommitTo", self->dbgid);
te1.detail("To", "all sources");
te1.detail("UidStr", uidStr);
te1.detail("Mutation", m.toString());
te1.detail("Version", commitVersion);
te1.detail("NumTags", allSources.size());
for (auto& tag : allSources) {
toCommit.addTag(tag);
}
toCommit.addTypedMessage(m, true /* allLocations */);
toCommit.setHasExecOp();
}
} else
UNREACHABLE();
@ -1078,15 +988,7 @@ ACTOR Future<Void> commitBatch(
else if (committed[t] == ConflictBatch::TransactionTooOld) {
trs[t].reply.sendError(transaction_too_old());
}
else if (committed[t] == ConflictBatch::TransactionNotPermitted) {
trs[t].reply.sendError(transaction_not_permitted());
}
else if (committed[t] == ConflictBatch::TransactionNotFullyRecovered) {
trs[t].reply.sendError(cluster_not_fully_recovered());
}
else if (committed[t] == ConflictBatch::TransactionExecLogAntiQuorum) {
trs[t].reply.sendError(txn_exec_log_anti_quorum());
} else {
else {
trs[t].reply.sendError(not_committed());
}
@ -1736,63 +1638,6 @@ ACTOR Future<Void> masterProxyServerCore(
rep.version = commitData.committedVersion.get();
req.reply.send(rep);
}
when(ExecRequest _execReq = waitNext(proxy.execReq.getFuture())) {
state ExecRequest execReq = _execReq;
if (execReq.debugID.present())
g_traceBatch.addEvent("TransactionDebug", execReq.debugID.get().first(),
"MasterProxyServer.masterProxyServerCore."
"ExecRequest");
TraceEvent("ExecRequest").detail("Payload", execReq.execPayload.toString());
// get the list of coordinators
state Optional<Value> coordinators = commitData.txnStateStore->readValue(coordinatorsKey).get();
state std::vector<NetworkAddress> coordinatorsAddr =
ClusterConnectionString(coordinators.get().toString()).coordinators();
state std::set<NetworkAddress> coordinatorsAddrSet;
for (int i = 0; i < coordinatorsAddr.size(); i++) {
TraceEvent(SevDebug, "CoordinatorAddress").detail("Addr", coordinatorsAddr[i]);
coordinatorsAddrSet.insert(coordinatorsAddr[i]);
}
// get the list of workers
state std::vector<WorkerDetails> workers =
wait(commitData.db->get().clusterInterface.getWorkers.getReply(GetWorkersRequest()));
// send the exec command to the list of workers which are
// coordinators
state vector<Future<Void>> execCoords;
for (int i = 0; i < workers.size(); i++) {
NetworkAddress primary = workers[i].interf.address();
Optional<NetworkAddress> secondary = workers[i].interf.tLog.getEndpoint().addresses.secondaryAddress;
if (coordinatorsAddrSet.find(primary) != coordinatorsAddrSet.end()
|| (secondary.present() && (coordinatorsAddrSet.find(secondary.get()) != coordinatorsAddrSet.end()))) {
TraceEvent("ExecReqToCoordinator")
.detail("PrimaryWorkerAddr", primary)
.detail("SecondaryWorkerAddr", secondary);
execCoords.push_back(brokenPromiseToNever(workers[i].interf.execReq.getReply(ExecuteRequest(execReq.execPayload))));
}
}
if (execCoords.size() <= 0) {
TraceEvent(SevDebug, "CoordinatorWorkersNotFound");
execReq.reply.sendError(operation_failed());
} else {
try {
wait(timeoutError(waitForAll(execCoords), 10.0));
int numSucc = 0;
for (auto item : execCoords) {
if (item.isValid() && item.isReady()) {
++numSucc;
}
}
bool succ = (numSucc >= ((execCoords.size() + 1) / 2));
succ ? execReq.reply.send(Void()) : execReq.reply.sendError(operation_failed());
} catch (Error& e) {
TraceEvent("WaitingForAllExecCoords").error(e);
execReq.reply.sendError(broken_promise());
}
}
}
when(ProxySnapRequest snapReq = waitNext(proxy.proxySnapReq.getFuture())) {
addActor.send(proxySnapCreate(snapReq, &commitData));
}

View File

@ -1314,207 +1314,6 @@ ACTOR Future<Void> commitQueue( TLogData* self ) {
}
}
void execProcessingHelper(TLogData* self,
Reference<LogData> logData,
TLogCommitRequest* req,
Standalone<VectorRef<Tag>>* execTags,
ExecCmdValueString* execArg,
StringRef* execCmd,
Version* execVersion,
vector<Future<Void>>* snapFailKeySetters,
vector<Future<Void>>* ignoredPops)
{
// inspect the messages to find if there is an Exec type and print
// it. message are prefixed by the length of the message and each
// field is prefixed by the length too
uint8_t type = MutationRef::MAX_ATOMIC_OP;
StringRef param2;
ArenaReader rd(req->arena, req->messages, Unversioned());
int32_t messageLength, rawLength;
uint16_t tagCount;
uint32_t sub;
while (!rd.empty()) {
Tag tmpTag;
bool hasTxsTag = false;
rd.checkpoint();
rd >> messageLength >> sub >> tagCount;
for (int i = 0; i < tagCount; i++) {
rd >> tmpTag;
if (tmpTag.locality == tagLocalityTxs || tmpTag == txsTag) {
hasTxsTag = true;
}
execTags->push_back(execTags->arena(), tmpTag);
}
if (!hasTxsTag) {
rd >> type;
if (type == MutationRef::Exec) {
break;
}
}
rawLength = messageLength + sizeof(messageLength);
rd.rewind();
rd.readBytes(rawLength);
}
int32_t len = 0;
if (type == MutationRef::Exec) {
// get param1
rd >> len;
*execCmd = StringRef((uint8_t const*)rd.readBytes(len), len);
// get param2
rd >> len;
param2 = StringRef((uint8_t const*)rd.readBytes(len), len);
TraceEvent(SevDebug, "TLogExecCommandType", self->dbgid)
.detail("Value", execCmd->toString())
.detail("Version", req->version);
execArg->setCmdValueString(param2);
execArg->dbgPrint();
StringRef uidStr = execArg->getBinaryArgValue(LiteralStringRef("uid"));
if (!execCmd->startsWith(LiteralStringRef("\xff"))) {
*execVersion = req->version;
}
if (*execCmd == execSnap) {
// validation check specific to snap request
std::string reason;
if (!self->ignorePopRequest) {
*execVersion = invalidVersion;
reason = "SnapFailIgnorePopNotSet";
} else if (uidStr.toString() != self->ignorePopUid) {
*execVersion = invalidVersion;
reason = "SnapFailedDisableTLogUidMismatch";
}
if (*execVersion == invalidVersion) {
TraceEvent(SevWarn, "TLogSnapFailed")
.detail("IgnorePopUid", self->ignorePopUid)
.detail("IgnorePopRequest", self->ignorePopRequest)
.detail("Reason", reason)
.detail("Version", req->version);
TraceEvent("ExecCmdSnapCreate")
.detail("Uid", uidStr.toString())
.detail("Status", -1)
.detail("Tag", logData->allTags.begin()->toString())
.detail("Role", "TLog")
.detail("Version", req->version);
if (g_network->isSimulated()) {
// write SnapFailedTLog.$UID
Standalone<StringRef> keyStr = snapTestFailStatus.withSuffix(uidStr);
Standalone<StringRef> valStr = LiteralStringRef("Success");
TraceEvent(SevDebug, "TLogKeyStr").detail("Value", keyStr);
snapFailKeySetters->push_back(runRYWTransaction(self->cx, [=](Reference<ReadYourWritesTransaction> tr) -> Future<Void>
{ tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); tr->set(keyStr, valStr); return Void(); }));
}
}
}
if (*execCmd == execDisableTLogPop) {
self->ignorePopRequest = true;
if (self->ignorePopUid != "") {
TraceEvent(SevWarn, "TLogPopDisableonDisable")
.detail("IgnorePopUid", self->ignorePopUid)
.detail("UidStr", uidStr.toString())
.detail("Version", req->version);
}
self->ignorePopUid = uidStr.toString();
self->ignorePopDeadline = g_network->now() + SERVER_KNOBS->TLOG_IGNORE_POP_AUTO_ENABLE_DELAY;
TraceEvent("TLogExecCmdPopDisable")
.detail("ExecCmd", execCmd->toString())
.detail("UidStr", uidStr.toString())
.detail("IgnorePopUid", self->ignorePopUid)
.detail("IgnporePopRequest", self->ignorePopRequest)
.detail("IgnporePopDeadline", self->ignorePopDeadline)
.detail("Version", req->version);
}
if (*execCmd == execEnableTLogPop) {
if (self->ignorePopUid != uidStr.toString()) {
TraceEvent(SevWarn, "TLogPopDisableEnableUidMismatch")
.detail("IgnorePopUid", self->ignorePopUid)
.detail("UidStr", uidStr.toString())
.detail("Version", req->version);
}
TraceEvent("EnableTLogPlayAllIgnoredPops2");
// use toBePopped and issue all the pops
std::map<Tag, Version>::iterator it;
self->ignorePopRequest = false;
self->ignorePopDeadline = 0.0;
self->ignorePopUid = "";
for (it = self->toBePopped.begin(); it != self->toBePopped.end(); it++) {
TraceEvent("PlayIgnoredPop")
.detail("Tag", it->first.toString())
.detail("Version", it->second);
ignoredPops->push_back(tLogPopCore(self, it->first, it->second, logData));
}
self->toBePopped.clear();
TraceEvent("TLogExecCmdPopEnable")
.detail("ExecCmd", execCmd->toString())
.detail("UidStr", uidStr.toString())
.detail("IgnorePopUid", self->ignorePopUid)
.detail("IgnporePopRequest", self->ignorePopRequest)
.detail("IgnporePopDeadline", self->ignorePopDeadline)
.detail("Version", req->version);
}
}
}
ACTOR Future<Void> tLogSnapHelper(TLogData* self,
Reference<LogData> logData,
ExecCmdValueString* execArg,
Version version,
Version execVersion,
StringRef execCmd,
Standalone<VectorRef<Tag>> execTags)
{
state int err = 0;
state StringRef uidStr = execArg->getBinaryArgValue(LiteralStringRef("uid"));
state UID execUID = UID::fromString(uidStr.toString());
state bool otherRoleExeced = false;
// TLog is special, we need to snap at the execVersion.
// storage on the same node should not initiate a snap before TLog which will make
// the snap version at TLog unpredictable
ASSERT(!isExecOpInProgress(execUID));
if (!otherRoleExeced) {
setExecOpInProgress(execUID);
int tmpErr = wait(execHelper(execArg, self->dataFolder, "role=tlog", 1 /*version*/));
err = tmpErr;
clearExecOpInProgress(execUID);
}
TraceEvent("TLogCommitExecTraceTLog")
.detail("UidStr", uidStr.toString())
.detail("Status", err)
.detail("Tag", logData->allTags.begin()->toString())
.detail("OldTagSize", logData->allTags.size())
.detail("Role", "TLog");
// print the detailed status message
for (int i = 0; i < execTags.size(); i++) {
Version poppedTagVersion = -1;
auto tagv = logData->getTagData(execTags[i]);
if (!tagv) {
continue;
}
poppedTagVersion = tagv->popped;
TraceEvent te = TraceEvent(SevDebug, "TLogExecTraceDetailed");
te.detail("Uid", uidStr.toString());
te.detail("Status", err);
te.detail("Role", "TLog");
te.detail("ExecCmd", execCmd.toString());
te.detail("Param2", execArg->getCmdValueString().toString());
te.detail("Tag", tagv->tag.toString());
te.detail("Version", version);
te.detail("PoppedTagVersion", poppedTagVersion);
te.detail("PersistentDataVersion", logData->persistentDataVersion);
te.detail("PersistentDatadurableVersion", logData->persistentDataDurableVersion);
te.detail("QueueCommittedVersion", logData->queueCommittedVersion.get());
te.detail("IgnorePopUid", self->ignorePopUid);
}
return Void();
}
ACTOR Future<Void> tLogCommit(
TLogData* self,
TLogCommitRequest req,
@ -1549,58 +1348,21 @@ ACTOR Future<Void> tLogCommit(
wait( delayJittered(.005, TaskPriority::TLogCommit) );
}
// while exec op is being committed, no new transactions will be admitted.
// This property is useful for snapshot kind of operations which wants to
// take a snap of the disk image at a particular version (no data from
// future version to be included)
// NOTE: execOpCommitInProgress will not be set for exec commands which
// start with \xff
state bool execOpLockTaken = false;
if (logData->execOpCommitInProgress) {
wait(logData->execOpLock.take());
execOpLockTaken = true;
}
if(logData->stopped) {
req.reply.sendError( tlog_stopped() );
return Void();
}
state Version execVersion = invalidVersion;
state ExecCmdValueString execArg;
state TLogQueueEntryRef qe;
state StringRef execCmd;
state Standalone<VectorRef<Tag>> execTags;
state vector<Future<Void>> snapFailKeySetters;
state vector<Future<Void>> playIgnoredPops;
if (logData->version.get() == req.prevVersion) { // Not a duplicate (check relies on critical section between here self->version.set() below!)
if(req.debugID.present())
g_traceBatch.addEvent("CommitDebug", tlogDebugID.get().first(), "TLog.tLogCommit.Before");
if (req.hasExecOp) {
execProcessingHelper(self, logData, &req, &execTags, &execArg, &execCmd, &execVersion, &snapFailKeySetters, &playIgnoredPops);
if (execVersion != invalidVersion) {
TraceEvent(SevDebug, "SettingExecOpCommit")
.detail("LogId", logData->logId)
.detail("ExecVersion", execVersion)
.detail("Version", req.version);
logData->execOpCommitInProgress = true;
if (!execOpLockTaken) {
wait(logData->execOpLock.take());
execOpLockTaken = true;
} else {
ASSERT(logData->execOpLock.available() == 0);
}
ASSERT(execOpLockTaken);
}
}
//TraceEvent("TLogCommit", logData->logId).detail("Version", req.version);
commitMessages(self, logData, req.version, req.arena, req.messages);
logData->knownCommittedVersion = std::max(logData->knownCommittedVersion, req.knownCommittedVersion);
TLogQueueEntryRef qe;
// Log the changes to the persistent queue, to be committed by commitQueue()
qe.version = req.version;
qe.knownCommittedVersion = logData->knownCommittedVersion;
@ -1615,7 +1377,6 @@ ACTOR Future<Void> tLogCommit(
// Notifies the commitQueue actor to commit persistentQueue, and also unblocks tLogPeekMessages actors
logData->version.set( req.version );
wait(waitForAll(playIgnoredPops));
if(req.debugID.present())
g_traceBatch.addEvent("CommitDebug", tlogDebugID.get().first(), "TLog.tLogCommit.AfterTLogCommit");
@ -1624,19 +1385,6 @@ ACTOR Future<Void> tLogCommit(
state Future<Void> stopped = logData->stopCommit.onTrigger();
wait( timeoutWarning( logData->queueCommittedVersion.whenAtLeast( req.version ) || stopped, 0.1, warningCollectorInput ) );
if ((execVersion != invalidVersion) && execVersion <= logData->queueCommittedVersion.get()) {
wait(tLogSnapHelper(self, logData, &execArg, qe.version, execVersion, execCmd, execTags));
}
if (execVersion != invalidVersion && logData->execOpCommitInProgress) {
ASSERT(execOpLockTaken);
logData->execOpCommitInProgress = false;
}
if (execOpLockTaken) {
logData->execOpLock.release();
execOpLockTaken = false;
}
execVersion = invalidVersion;
if(stopped.isReady()) {
ASSERT(logData->stopped);
req.reply.sendError( tlog_stopped() );
@ -1647,13 +1395,6 @@ ACTOR Future<Void> tLogCommit(
g_traceBatch.addEvent("CommitDebug", tlogDebugID.get().first(), "TLog.tLogCommit.After");
req.reply.send( logData->durableKnownCommittedVersion );
if (g_network->isSimulated()) {
if (snapFailKeySetters.size() > 0) {
TraceEvent(SevDebug, "SettingSnapFailKey");
wait(waitForAll(snapFailKeySetters));
TraceEvent(SevDebug, "SettingSnapFailKeyDone");
}
}
return Void();
}
@ -1805,7 +1546,7 @@ tLogSnapCreate(TLogSnapRequest snapReq, TLogData* self, Reference<LogData> logDa
ExecCmdValueString snapArg(snapReq.snapPayload);
try {
Standalone<StringRef> role = LiteralStringRef("role=").withSuffix(snapReq.role);
int err = wait(execHelper(&snapArg, self->dataFolder, role.toString(), 2 /* version */));
int err = wait(execHelper(&snapArg, self->dataFolder, role.toString()));
std::string uidStr = snapReq.snapUID.toString();
TraceEvent("ExecTraceTLog")
@ -1824,7 +1565,7 @@ tLogSnapCreate(TLogSnapRequest snapReq, TLogData* self, Reference<LogData> logDa
}
snapReq.reply.send(Void());
} catch (Error& e) {
TraceEvent("TLogExecHelperError").error(e, true /*includeCancelled */);
TraceEvent("TLogSnapCreateError").error(e, true /*includeCancelled */);
if (e.code() != error_code_operation_cancelled) {
snapReq.reply.sendError(e);
} else {

View File

@ -225,14 +225,13 @@ struct TLogCommitRequest {
ReplyPromise<Version> reply;
Optional<UID> debugID;
bool hasExecOp;
TLogCommitRequest() {}
TLogCommitRequest( const Arena& a, Version prevVersion, Version version, Version knownCommittedVersion, Version minKnownCommittedVersion, StringRef messages, bool hasExecOp, Optional<UID> debugID )
: arena(a), prevVersion(prevVersion), version(version), knownCommittedVersion(knownCommittedVersion), minKnownCommittedVersion(minKnownCommittedVersion), messages(messages), debugID(debugID), hasExecOp(hasExecOp){}
TLogCommitRequest( const Arena& a, Version prevVersion, Version version, Version knownCommittedVersion, Version minKnownCommittedVersion, StringRef messages, Optional<UID> debugID )
: arena(a), prevVersion(prevVersion), version(version), knownCommittedVersion(knownCommittedVersion), minKnownCommittedVersion(minKnownCommittedVersion), messages(messages), debugID(debugID) {}
template <class Ar>
void serialize( Ar& ar ) {
serializer(ar, prevVersion, version, knownCommittedVersion, minKnownCommittedVersion, messages, reply, arena, debugID, hasExecOp);
serializer(ar, prevVersion, version, knownCommittedVersion, minKnownCommittedVersion, messages, reply, arena, debugID);
}
};

View File

@ -1689,207 +1689,6 @@ ACTOR Future<Void> commitQueue( TLogData* self ) {
}
}
void execProcessingHelper(TLogData* self,
Reference<LogData> logData,
TLogCommitRequest* req,
Standalone<VectorRef<Tag>>* execTags,
ExecCmdValueString* execArg,
StringRef* execCmd,
Version* execVersion,
vector<Future<Void>>* snapFailKeySetters,
vector<Future<Void>>* ignoredPops)
{
// inspect the messages to find if there is an Exec type and print
// it. message are prefixed by the length of the message and each
// field is prefixed by the length too
uint8_t type = MutationRef::MAX_ATOMIC_OP;
StringRef param2;
ArenaReader rd(req->arena, req->messages, Unversioned());
int32_t messageLength, rawLength;
uint16_t tagCount;
uint32_t sub;
while (!rd.empty()) {
Tag tmpTag;
bool hasTxsTag = false;
rd.checkpoint();
rd >> messageLength >> sub >> tagCount;
for (int i = 0; i < tagCount; i++) {
rd >> tmpTag;
if (tmpTag.locality == tagLocalityTxs || tmpTag == txsTag) {
hasTxsTag = true;
}
execTags->push_back(execTags->arena(), tmpTag);
}
if (!hasTxsTag) {
rd >> type;
if (type == MutationRef::Exec) {
break;
}
}
rawLength = messageLength + sizeof(messageLength);
rd.rewind();
rd.readBytes(rawLength);
}
int32_t len = 0;
if (type == MutationRef::Exec) {
// get param1
rd >> len;
*execCmd = StringRef((uint8_t const*)rd.readBytes(len), len);
// get param2
rd >> len;
param2 = StringRef((uint8_t const*)rd.readBytes(len), len);
TraceEvent(SevDebug, "TLogExecCommandType", self->dbgid)
.detail("Value", execCmd->toString())
.detail("Version", req->version);
execArg->setCmdValueString(param2);
execArg->dbgPrint();
StringRef uidStr = execArg->getBinaryArgValue(LiteralStringRef("uid"));
if (!execCmd->startsWith(LiteralStringRef("\xff"))) {
*execVersion = req->version;
}
if (*execCmd == execSnap) {
// validation check specific to snap request
std::string reason;
if (!self->ignorePopRequest) {
*execVersion = invalidVersion;
reason = "SnapFailIgnorePopNotSet";
} else if (uidStr.toString() != self->ignorePopUid) {
*execVersion = invalidVersion;
reason = "SnapFailedDisableTLogUidMismatch";
}
if (*execVersion == invalidVersion) {
TraceEvent(SevWarn, "TLogSnapFailed")
.detail("IgnorePopUid", self->ignorePopUid)
.detail("IgnorePopRequest", self->ignorePopRequest)
.detail("Reason", reason)
.detail("Version", req->version);
TraceEvent("ExecCmdSnapCreate")
.detail("Uid", uidStr.toString())
.detail("Status", -1)
.detail("Tag", logData->allTags.begin()->toString())
.detail("Role", "TLog")
.detail("Version", req->version);
if (g_network->isSimulated()) {
// write SnapFailedTLog.$UID
Standalone<StringRef> keyStr = snapTestFailStatus.withSuffix(uidStr);
StringRef valStr = LiteralStringRef("Success");
TraceEvent(SevDebug, "TLogKeyStr").detail("Value", keyStr);
snapFailKeySetters->push_back(runRYWTransaction(self->cx, [=](Reference<ReadYourWritesTransaction> tr) -> Future<Void>
{ tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); tr->set(keyStr, valStr); return Void(); }));
}
}
}
if (*execCmd == execDisableTLogPop) {
self->ignorePopRequest = true;
if (self->ignorePopUid != "") {
TraceEvent(SevWarn, "TLogPopDisableonDisable")
.detail("IgnorePopUid", self->ignorePopUid)
.detail("UidStr", uidStr.toString())
.detail("Version", req->version);
}
self->ignorePopUid = uidStr.toString();
self->ignorePopDeadline = g_network->now() + SERVER_KNOBS->TLOG_IGNORE_POP_AUTO_ENABLE_DELAY;
TraceEvent("TLogExecCmdPopDisable")
.detail("ExecCmd", execCmd->toString())
.detail("UidStr", uidStr.toString())
.detail("IgnorePopUid", self->ignorePopUid)
.detail("IgnporePopRequest", self->ignorePopRequest)
.detail("IgnporePopDeadline", self->ignorePopDeadline)
.detail("Version", req->version);
}
if (*execCmd == execEnableTLogPop) {
if (self->ignorePopUid != uidStr.toString()) {
TraceEvent(SevWarn, "TLogPopDisableEnableUidMismatch")
.detail("IgnorePopUid", self->ignorePopUid)
.detail("UidStr", uidStr.toString())
.detail("Version", req->version);
}
TraceEvent("EnableTLogPlayAllIgnoredPops2");
// use toBePopped and issue all the pops
std::map<Tag, Version>::iterator it;
self->ignorePopRequest = false;
self->ignorePopDeadline = 0.0;
self->ignorePopUid = "";
for (it = self->toBePopped.begin(); it != self->toBePopped.end(); it++) {
TraceEvent("PlayIgnoredPop")
.detail("Tag", it->first.toString())
.detail("Version", it->second);
ignoredPops->push_back(tLogPopCore(self, it->first, it->second, logData));
}
self->toBePopped.clear();
TraceEvent("TLogExecCmdPopEnable")
.detail("ExecCmd", execCmd->toString())
.detail("UidStr", uidStr.toString())
.detail("IgnorePopUid", self->ignorePopUid)
.detail("IgnporePopRequest", self->ignorePopRequest)
.detail("IgnporePopDeadline", self->ignorePopDeadline)
.detail("Version", req->version);
}
}
}
ACTOR Future<Void> tLogSnapHelper(TLogData* self,
Reference<LogData> logData,
ExecCmdValueString* execArg,
Version version,
Version execVersion,
StringRef execCmd,
Standalone<VectorRef<Tag>> execTags)
{
state int err = 0;
state StringRef uidStr = execArg->getBinaryArgValue(LiteralStringRef("uid"));
state UID execUID = UID::fromString(uidStr.toString());
state bool otherRoleExeced = false;
// TLog is special, we need to snap at the execVersion.
// storage on the same node should not initiate a snap before TLog which will make
// the snap version at TLog unpredictable
ASSERT(!isExecOpInProgress(execUID));
if (!otherRoleExeced) {
setExecOpInProgress(execUID);
int tmpErr = wait(execHelper(execArg, self->dataFolder, "role=tlog", 1 /*version*/));
err = tmpErr;
clearExecOpInProgress(execUID);
}
TraceEvent("TLogCommitExecTraceTLog")
.detail("UidStr", uidStr.toString())
.detail("Status", err)
.detail("Tag", logData->allTags.begin()->toString())
.detail("OldTagSize", logData->allTags.size())
.detail("Role", "TLog");
// print the detailed status message
for (int i = 0; i < execTags.size(); i++) {
Version poppedTagVersion = -1;
auto tagv = logData->getTagData(execTags[i]);
if (!tagv) {
continue;
}
poppedTagVersion = tagv->popped;
TraceEvent te = TraceEvent(SevDebug, "TLogExecTraceDetailed");
te.detail("Uid", uidStr.toString());
te.detail("Status", err);
te.detail("Role", "TLog");
te.detail("ExecCmd", execCmd.toString());
te.detail("Param2", execArg->getCmdValueString().toString());
te.detail("Tag", tagv->tag.toString());
te.detail("Version", version);
te.detail("PoppedTagVersion", poppedTagVersion);
te.detail("PersistentDataVersion", logData->persistentDataVersion);
te.detail("PersistentDatadurableVersion", logData->persistentDataDurableVersion);
te.detail("QueueCommittedVersion", logData->queueCommittedVersion.get());
te.detail("IgnorePopUid", self->ignorePopUid);
}
return Void();
}
ACTOR Future<Void> tLogCommit(
TLogData* self,
TLogCommitRequest req,
@ -1924,59 +1723,21 @@ ACTOR Future<Void> tLogCommit(
wait( delayJittered(.005, TaskPriority::TLogCommit) );
}
// while exec op is being committed, no new transactions will be admitted.
// This property is useful for snapshot kind of operations which wants to
// take a snap of the disk image at a particular version (not data from
// future version to be included)
// NOTE: execOpCommitInProgress will not be set for exec commands which
// start with \xff
state bool execOpLockTaken = false;
if (logData->execOpCommitInProgress) {
wait(logData->execOpLock.take());
execOpLockTaken = true;
}
if(logData->stopped) {
req.reply.sendError( tlog_stopped() );
return Void();
}
state Version execVersion = invalidVersion;
state ExecCmdValueString execArg;
state TLogQueueEntryRef qe;
state StringRef execCmd;
state Standalone<VectorRef<Tag>> execTags;
state vector<Future<Void>> playIgnoredPops;
state vector<Future<Void>> snapFailKeySetters;
if (logData->version.get() == req.prevVersion) { // Not a duplicate (check relies on critical section between here self->version.set() below!)
if(req.debugID.present())
g_traceBatch.addEvent("CommitDebug", tlogDebugID.get().first(), "TLog.tLogCommit.Before");
if (req.hasExecOp) {
execProcessingHelper(self, logData, &req, &execTags, &execArg, &execCmd, &execVersion, &snapFailKeySetters, &playIgnoredPops);
if (execVersion != invalidVersion) {
TraceEvent(SevDebug, "SettingExecOpCommit")
.detail("LogId", logData->logId)
.detail("ExecVersion", execVersion)
.detail("Version", req.version);
logData->execOpCommitInProgress = true;
if (!execOpLockTaken) {
wait(logData->execOpLock.take());
execOpLockTaken = true;
} else {
ASSERT(logData->execOpLock.available() == 0);
}
ASSERT(execOpLockTaken);
}
}
//TraceEvent("TLogCommit", logData->logId).detail("Version", req.version);
commitMessages(self, logData, req.version, req.arena, req.messages);
logData->knownCommittedVersion = std::max(logData->knownCommittedVersion, req.knownCommittedVersion);
TLogQueueEntryRef qe;
// Log the changes to the persistent queue, to be committed by commitQueue()
qe.version = req.version;
qe.knownCommittedVersion = logData->knownCommittedVersion;
@ -1991,7 +1752,6 @@ ACTOR Future<Void> tLogCommit(
// Notifies the commitQueue actor to commit persistentQueue, and also unblocks tLogPeekMessages actors
logData->version.set( req.version );
wait(waitForAll(playIgnoredPops));
if(req.debugID.present())
g_traceBatch.addEvent("CommitDebug", tlogDebugID.get().first(), "TLog.tLogCommit.AfterTLogCommit");
@ -2000,20 +1760,6 @@ ACTOR Future<Void> tLogCommit(
state Future<Void> stopped = logData->stopCommit.onTrigger();
wait( timeoutWarning( logData->queueCommittedVersion.whenAtLeast( req.version ) || stopped, 0.1, warningCollectorInput ) );
if ((execVersion != invalidVersion) &&
execVersion <= logData->queueCommittedVersion.get()) {
wait(tLogSnapHelper(self, logData, &execArg, qe.version, execVersion, execCmd, execTags));
}
if (execVersion != invalidVersion && logData->execOpCommitInProgress) {
ASSERT(execOpLockTaken);
logData->execOpCommitInProgress = false;
}
if (execOpLockTaken) {
logData->execOpLock.release();
execOpLockTaken = false;
}
execVersion = invalidVersion;
if(stopped.isReady()) {
ASSERT(logData->stopped);
req.reply.sendError( tlog_stopped() );
@ -2024,13 +1770,6 @@ ACTOR Future<Void> tLogCommit(
g_traceBatch.addEvent("CommitDebug", tlogDebugID.get().first(), "TLog.tLogCommit.After");
req.reply.send( logData->durableKnownCommittedVersion );
if (g_network->isSimulated()) {
if (snapFailKeySetters.size() > 0) {
TraceEvent(SevDebug, "SettingSnapFailKey");
wait(waitForAll(snapFailKeySetters));
TraceEvent(SevDebug, "SettingSnapFailKeyDone");
}
}
return Void();
}
@ -2184,7 +1923,7 @@ tLogSnapCreate(TLogSnapRequest snapReq, TLogData* self, Reference<LogData> logDa
ExecCmdValueString snapArg(snapReq.snapPayload);
try {
Standalone<StringRef> role = LiteralStringRef("role=").withSuffix(snapReq.role);
int err = wait(execHelper(&snapArg, self->dataFolder, role.toString(), 2 /* version */));
int err = wait(execHelper(&snapArg, self->dataFolder, role.toString()));
std::string uidStr = snapReq.snapUID.toString();
TraceEvent("ExecTraceTLog")

View File

@ -436,7 +436,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
vector<Future<Void>> tLogCommitResults;
for(int loc=0; loc< it->logServers.size(); loc++) {
Standalone<StringRef> msg = data.getMessages(location);
allReplies.push_back( it->logServers[loc]->get().interf().commit.getReply( TLogCommitRequest( msg.arena(), prevVersion, version, knownCommittedVersion, minKnownCommittedVersion, msg, data.getHasExecOp(), debugID ), TaskPriority::TLogCommitReply ) );
allReplies.push_back( it->logServers[loc]->get().interf().commit.getReply( TLogCommitRequest( msg.arena(), prevVersion, version, knownCommittedVersion, minKnownCommittedVersion, msg, debugID ), TaskPriority::TLogCommitReply ) );
Future<Void> commitSuccess = success(allReplies.back());
addActor.get().send(commitSuccess);
tLogCommitResults.push_back(commitSuccess);

View File

@ -1700,7 +1700,6 @@ int main(int argc, char* argv[]) {
std::string absDataFolder = abspath(dataFolder);
ini.LoadFile(joinPath(absDataFolder, "restartInfo.ini").c_str());
int backupFailed = true;
int backupVersion = 1;
const char* isRestoringStr = ini.GetValue("RESTORE", "isRestoring", NULL);
if (isRestoringStr) {
isRestoring = atoi(isRestoringStr);
@ -1708,142 +1707,77 @@ int main(int argc, char* argv[]) {
if (isRestoring && backupFailedStr) {
backupFailed = atoi(backupFailedStr);
}
const char* backupVersionStr = ini.GetValue("RESTORE", "BackupVersion", NULL);
if (isRestoring && backupVersionStr) {
backupVersion = atoi(backupVersionStr);
}
}
if (isRestoring && !backupFailed) {
if (backupVersion == 1) {
std::vector<std::string> returnList;
std::string ext = "";
returnList = platform::listDirectories(absDataFolder);
std::string snapStr = ini.GetValue("RESTORE", "RestoreSnapUID");
std::vector<std::string> returnList;
std::string ext = "";
returnList = platform::listDirectories(absDataFolder);
std::string snapStr = ini.GetValue("RESTORE", "RestoreSnapUID");
TraceEvent("RestoringDataFolder").detail("DataFolder", absDataFolder);
TraceEvent("RestoreSnapUID").detail("UID", snapStr);
TraceEvent("RestoringDataFolder").detail("DataFolder", absDataFolder);
TraceEvent("RestoreSnapUID").detail("UID", snapStr);
// delete all files (except fdb.cluster) in non-snap directories
for (int i = 0; i < returnList.size(); i++) {
if (returnList[i] == "." || returnList[i] == "..") {
continue;
}
if (returnList[i].find(snapStr) != std::string::npos) {
continue;
}
// delete all files (except fdb.cluster) in non-snap directories
for (const auto & dirEntry : returnList) {
if (dirEntry == "." || dirEntry == "..") {
continue;
}
if (dirEntry.find(snapStr) != std::string::npos) {
continue;
}
std::string childf = absDataFolder + "/" + returnList[i];
std::vector<std::string> returnFiles = platform::listFiles(childf, ext);
for (int j = 0; j < returnFiles.size(); j++) {
if (returnFiles[j] != "fdb.cluster" && returnFiles[j] != "fitness") {
TraceEvent("DeletingNonSnapfiles")
.detail("FileBeingDeleted", childf + "/" + returnFiles[j]);
deleteFile(childf + "/" + returnFiles[j]);
}
std::string childf = absDataFolder + "/" + dirEntry;
std::vector<std::string> returnFiles = platform::listFiles(childf, ext);
for (const auto & fileEntry : returnFiles) {
if (fileEntry != "fdb.cluster" && fileEntry != "fitness") {
TraceEvent("DeletingNonSnapfiles")
.detail("FileBeingDeleted", childf + "/" + fileEntry);
deleteFile(childf + "/" + fileEntry);
}
}
// move the contents from snap folder to the original folder,
// delete snap folders
for (int i = 0; i < returnList.size(); i++) {
if (returnList[i] == "." || returnList[i] == "..") {
continue;
}
std::string dirSrc = absDataFolder + "/" + returnList[i];
// delete snap directories which are not part of restoreSnapUID
if (returnList[i].find(snapStr) == std::string::npos) {
if (returnList[i].find("snap") != std::string::npos) {
platform::eraseDirectoryRecursive(dirSrc);
}
continue;
}
// remove empty/partial snap directories
std::vector<std::string> childrenList = platform::listFiles(dirSrc);
if (childrenList.size() == 0) {
TraceEvent("RemovingEmptySnapDirectory").detail("DirBeingDeleted", dirSrc);
}
// cleanup unwanted and partial directories
for (const auto & dirEntry : returnList) {
if (dirEntry == "." || dirEntry == "..") {
continue;
}
std::string dirSrc = absDataFolder + "/" + dirEntry;
// delete snap directories which are not part of restoreSnapUID
if (dirEntry.find(snapStr) == std::string::npos) {
if (dirEntry.find("snap") != std::string::npos) {
platform::eraseDirectoryRecursive(dirSrc);
continue;
}
std::string origDir = returnList[i].substr(0, 32);
std::string dirToRemove = absDataFolder + "/" + origDir;
TraceEvent("DeletingOriginalNonSnapDirectory").detail("FileBeingDeleted", dirToRemove);
platform::eraseDirectoryRecursive(dirToRemove);
renameFile(dirSrc, dirToRemove);
TraceEvent("RenamingSnapToOriginalDirectory")
.detail("Oldname", dirSrc)
.detail("Newname", dirToRemove);
continue;
}
} else if (backupVersion == 2) {
std::vector<std::string> returnList;
std::string ext = "";
returnList = platform::listDirectories(absDataFolder);
std::string snapStr = ini.GetValue("RESTORE", "RestoreSnapUID");
TraceEvent("RestoringDataFolder").detail("DataFolder", absDataFolder);
TraceEvent("RestoreSnapUID").detail("UID", snapStr);
// delete all files (except fdb.cluster) in non-snap directories
for (const auto & dirEntry : returnList) {
if (dirEntry == "." || dirEntry == "..") {
continue;
}
if (dirEntry.find(snapStr) != std::string::npos) {
continue;
}
std::string childf = absDataFolder + "/" + dirEntry;
std::vector<std::string> returnFiles = platform::listFiles(childf, ext);
for (const auto & fileEntry : returnFiles) {
if (fileEntry != "fdb.cluster" && fileEntry != "fitness") {
TraceEvent("DeletingNonSnapfiles")
.detail("FileBeingDeleted", childf + "/" + fileEntry);
deleteFile(childf + "/" + fileEntry);
}
}
// remove empty/partial snap directories
std::vector<std::string> childrenList = platform::listFiles(dirSrc);
if (childrenList.size() == 0) {
TraceEvent("RemovingEmptySnapDirectory").detail("DirBeingDeleted", dirSrc);
platform::eraseDirectoryRecursive(dirSrc);
continue;
}
// cleanup unwanted and partial directories
for (const auto & dirEntry : returnList) {
if (dirEntry == "." || dirEntry == "..") {
continue;
}
std::string dirSrc = absDataFolder + "/" + dirEntry;
// delete snap directories which are not part of restoreSnapUID
if (dirEntry.find(snapStr) == std::string::npos) {
if (dirEntry.find("snap") != std::string::npos) {
platform::eraseDirectoryRecursive(dirSrc);
}
continue;
}
// remove empty/partial snap directories
std::vector<std::string> childrenList = platform::listFiles(dirSrc);
if (childrenList.size() == 0) {
TraceEvent("RemovingEmptySnapDirectory").detail("DirBeingDeleted", dirSrc);
platform::eraseDirectoryRecursive(dirSrc);
continue;
}
}
// move snapshotted files to appropriate locations
for (const auto & dirEntry : returnList) {
if (dirEntry == "." || dirEntry == "..") {
continue;
}
// move snapshotted files to appropriate locations
for (const auto & dirEntry : returnList) {
if (dirEntry == "." || dirEntry == "..") {
continue;
}
std::string dirSrc = absDataFolder + "/" + dirEntry;
std::string origDir = dirEntry.substr(0, 32);
std::string dirToMove = absDataFolder + "/" + origDir;
if ((dirEntry.find("snap") != std::string::npos) &&
(dirEntry.find("tlog") != std::string::npos)) {
// restore tlog files
restoreRoleFilesHelper(dirSrc, dirToMove, "log");
} else if ((dirEntry.find("snap") != std::string::npos) &&
(dirEntry.find("storage") != std::string::npos)) {
// restore storage files
restoreRoleFilesHelper(dirSrc, dirToMove, "storage");
} else if ((dirEntry.find("snap") != std::string::npos) &&
(dirEntry.find("coord") != std::string::npos)) {
// restore coordinator files
restoreRoleFilesHelper(dirSrc, dirToMove, "coordination");
}
std::string dirSrc = absDataFolder + "/" + dirEntry;
std::string origDir = dirEntry.substr(0, 32);
std::string dirToMove = absDataFolder + "/" + origDir;
if ((dirEntry.find("snap") != std::string::npos) &&
(dirEntry.find("tlog") != std::string::npos)) {
// restore tlog files
restoreRoleFilesHelper(dirSrc, dirToMove, "log");
} else if ((dirEntry.find("snap") != std::string::npos) &&
(dirEntry.find("storage") != std::string::npos)) {
// restore storage files
restoreRoleFilesHelper(dirSrc, dirToMove, "storage");
} else if ((dirEntry.find("snap") != std::string::npos) &&
(dirEntry.find("coord") != std::string::npos)) {
// restore coordinator files
restoreRoleFilesHelper(dirSrc, dirToMove, "coordination");
}
}
}
}

View File

@ -27,30 +27,30 @@
#include "flow/SystemMonitor.h"
#include "flow/Util.h"
#include "fdbclient/Atomic.h"
#include "fdbclient/DatabaseContext.h"
#include "fdbclient/KeyRangeMap.h"
#include "fdbclient/SystemData.h"
#include "fdbclient/MasterProxyInterface.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/Notified.h"
#include "fdbclient/StatusClient.h"
#include "fdbclient/MasterProxyInterface.h"
#include "fdbclient/DatabaseContext.h"
#include "fdbserver/WorkerInterface.actor.h"
#include "fdbserver/TLogInterface.h"
#include "fdbserver/MoveKeys.actor.h"
#include "fdbserver/Knobs.h"
#include "fdbserver/WaitFailure.h"
#include "fdbserver/IKeyValueStore.h"
#include "fdbclient/SystemData.h"
#include "fdbclient/VersionedMap.h"
#include "fdbserver/FDBExecHelper.actor.h"
#include "fdbserver/IKeyValueStore.h"
#include "fdbserver/Knobs.h"
#include "fdbserver/LatencyBandConfig.h"
#include "fdbserver/LogProtocolMessage.h"
#include "fdbserver/LogSystem.h"
#include "fdbserver/MoveKeys.actor.h"
#include "fdbserver/RecoveryState.h"
#include "fdbserver/StorageMetrics.h"
#include "fdbrpc/sim_validation.h"
#include "fdbserver/ServerDBInfo.h"
#include "fdbserver/TLogInterface.h"
#include "fdbserver/WaitFailure.h"
#include "fdbserver/WorkerInterface.actor.h"
#include "fdbrpc/sim_validation.h"
#include "fdbrpc/Smoother.h"
#include "flow/Stats.h"
#include "fdbserver/LogSystem.h"
#include "fdbserver/RecoveryState.h"
#include "fdbserver/LogProtocolMessage.h"
#include "fdbserver/LatencyBandConfig.h"
#include "fdbserver/FDBExecHelper.actor.h"
#include "flow/TDMetric.actor.h"
#include <type_traits>
#include "flow/actorcompiler.h" // This must be the last #include.
@ -1907,12 +1907,9 @@ void addMutation( Reference<T>& target, Version version, MutationRef const& muta
}
template <class T>
void splitMutations(StorageServer* data, KeyRangeMap<T>& map, VerUpdateRef const& update, vector<int>& execIndex) {
void splitMutations(StorageServer* data, KeyRangeMap<T>& map, VerUpdateRef const& update) {
for(int i = 0; i < update.mutations.size(); i++) {
splitMutation(data, map, update.mutations[i], update.version);
if (update.mutations[i].type == MutationRef::Exec) {
execIndex.push_back(i);
}
}
}
@ -1931,53 +1928,10 @@ void splitMutation(StorageServer* data, KeyRangeMap<T>& map, MutationRef const&
addMutation( i->value(), ver, MutationRef((MutationRef::Type)m.type, k.begin, k.end) );
}
}
} else if (m.type == MutationRef::Exec) {
} else
ASSERT(false); // Unknown mutation type in splitMutations
}
ACTOR Future<Void>
snapHelper(StorageServer* data, MutationRef m, Version ver)
{
state std::string cmd = m.param1.toString();
if ((cmd == execDisableTLogPop) || (cmd == execEnableTLogPop)) {
TraceEvent("IgnoreNonSnapCommands").detail("ExecCommand", cmd);
return Void();
}
state ExecCmdValueString execArg(m.param2);
state StringRef uidStr = execArg.getBinaryArgValue(LiteralStringRef("uid"));
state int err = 0;
state Future<int> cmdErr;
state UID execUID = UID::fromString(uidStr.toString());
state bool skip = false;
if (cmd == execSnap && isTLogInSameNode()) {
skip = true;
}
// other storage has initiated the exec, so we can skip
if (!skip && isExecOpInProgress(execUID)) {
skip = true;
}
if (!skip) {
setExecOpInProgress(execUID);
int err = wait(execHelper(&execArg, data->folder, "role=storage", 1 /*version*/));
clearExecOpInProgress(execUID);
}
TraceEvent te = TraceEvent("ExecTraceStorage");
te.detail("Uid", uidStr.toString());
te.detail("Status", err);
te.detail("Role", "storage");
te.detail("Version", ver);
te.detail("Mutation", m.toString());
te.detail("Mid", data->thisServerID.toString());
te.detail("DurableVersion", data->durableVersion.get());
te.detail("DataVersion", data->version.get());
te.detail("Tag", data->tag.toString());
te.detail("SnapCreateSkipped", skip);
return Void();
}
ACTOR Future<Void> fetchKeys( StorageServer *data, AddingShard* shard ) {
state TraceInterval interval("FetchKeys");
state KeyRange keys = shard->keys;
@ -2085,28 +2039,22 @@ ACTOR Future<Void> fetchKeys( StorageServer *data, AddingShard* shard ) {
if (this_block.more) {
Key nfk = this_block.readThrough.present() ? this_block.readThrough.get() : keyAfter( this_block.end()[-1].key );
if (nfk != keys.end) {
state std::deque< Standalone<VerUpdateRef> > updatesToSplit = std::move( shard->updates );
std::deque< Standalone<VerUpdateRef> > updatesToSplit = std::move( shard->updates );
// This actor finishes committing the keys [keys.begin,nfk) that we already fetched.
// The remaining unfetched keys [nfk,keys.end) will become a separate AddingShard with its own fetchKeys.
shard->server->addShard( ShardInfo::addingSplitLeft( KeyRangeRef(keys.begin, nfk), shard ) );
shard->server->addShard( ShardInfo::newAdding( data, KeyRangeRef(nfk, keys.end) ) );
shard = data->shards.rangeContaining( keys.begin ).value()->adding;
state AddingShard* otherShard = data->shards.rangeContaining( nfk ).value()->adding;
AddingShard* otherShard = data->shards.rangeContaining( nfk ).value()->adding;
keys = shard->keys;
// Split our prior updates. The ones that apply to our new, restricted key range will go back into shard->updates,
// and the ones delivered to the new shard will be discarded because it is in WaitPrevious phase (hasn't chosen a fetchVersion yet).
// What we are doing here is expensive and could get more expensive if we started having many more blocks per shard. May need optimization in the future.
state vector<int> execIdxVec;
state std::deque< Standalone<VerUpdateRef> >::iterator u = updatesToSplit.begin();
std::deque< Standalone<VerUpdateRef> >::iterator u = updatesToSplit.begin();
for(; u != updatesToSplit.end(); ++u) {
ASSERT(execIdxVec.size() == 0);
splitMutations(data, data->shards, *u, execIdxVec);
for (auto execIdx : execIdxVec) {
wait(snapHelper(data, u->mutations[execIdx], u->version));
}
execIdxVec.clear();
splitMutations(data, data->shards, *u);
}
TEST( true );
@ -2299,8 +2247,7 @@ void ShardInfo::addMutation(Version version, MutationRef const& mutation) {
adding->addMutation(version, mutation);
else if (readWrite)
readWrite->addMutation(version, mutation, this->keys, readWrite->updateEagerReads);
else if ((mutation.type != MutationRef::ClearRange)
&& (mutation.type != MutationRef::Exec)) {
else if (mutation.type != MutationRef::ClearRange) {
TraceEvent(SevError, "DeliveredToNotAssigned").detail("Version", version).detail("Mutation", mutation.toString());
ASSERT(false); // Mutation delivered to notAssigned shard!
}
@ -2715,9 +2662,6 @@ ACTOR Future<Void> update( StorageServer* data, bool* pReceivedUpdate )
state VerUpdateRef* pUpdate = &fii.changes[changeNum];
for(; mutationNum < pUpdate->mutations.size(); mutationNum++) {
updater.applyMutation(data, pUpdate->mutations[mutationNum], pUpdate->version);
if (pUpdate->mutations[mutationNum].type == MutationRef::Exec) {
wait(snapHelper(data, pUpdate->mutations[mutationNum], pUpdate->version));
}
mutationBytes += pUpdate->mutations[mutationNum].totalSize();
injectedChanges = true;
if(mutationBytes > SERVER_KNOBS->DESIRED_UPDATE_BYTES) {
@ -2790,9 +2734,6 @@ ACTOR Future<Void> update( StorageServer* data, bool* pReceivedUpdate )
++data->counters.atomicMutations;
break;
}
if (msg.type == MutationRef::Exec) {
wait(snapHelper(data, msg, ver));
}
}
else
TraceEvent(SevError, "DiscardingPeekedData", data->thisServerID).detail("Mutation", msg.toString()).detail("Version", cloneCursor2->version().toString());

View File

@ -659,7 +659,7 @@ ACTOR Future<Void> workerSnapCreate(WorkerSnapRequest snapReq, StringRef snapFol
state ExecCmdValueString snapArg(snapReq.snapPayload);
try {
Standalone<StringRef> role = LiteralStringRef("role=").withSuffix(snapReq.role);
int err = wait(execHelper(&snapArg, snapFolder.toString(), role.toString(), 2 /* version */));
int err = wait(execHelper(&snapArg, snapFolder.toString(), role.toString()));
std::string uidStr = snapReq.snapUID.toString();
TraceEvent("ExecTraceWorker")
.detail("Uid", uidStr)
@ -1228,25 +1228,6 @@ ACTOR Future<Void> workerServer(
systemMonitor();
loggingTrigger = delay( loggingDelay, TaskPriority::FlushTrace );
}
when(state ExecuteRequest req = waitNext(interf.execReq.getFuture())) {
state ExecCmdValueString execArg(req.execPayload);
try {
int err = wait(execHelper(&execArg, coordFolder, "role=coordinator", 1 /*version*/));
StringRef uidStr = execArg.getBinaryArgValue(LiteralStringRef("uid"));
auto tokenStr = "ExecTrace/Coordinators/" + uidStr.toString();
auto te = TraceEvent("ExecTraceCoordinators");
te.detail("Uid", uidStr.toString());
te.detail("Status", err);
te.detail("Role", "coordinator");
te.detail("Value", coordFolder);
te.detail("ExecPayload", execArg.getCmdValueString().toString());
te.trackLatest(tokenStr.c_str());
req.reply.send(Void());
} catch (Error& e) {
TraceEvent("ExecHelperError").error(e);
req.reply.sendError(broken_promise());
}
}
when(state WorkerSnapRequest snapReq = waitNext(interf.workerSnapReq.getFuture())) {
Standalone<StringRef> snapFolder = StringRef(folder);
if (snapReq.role.toString() == "coord") {

View File

@ -82,7 +82,6 @@ public: // variables
std::string restartInfoLocation; // file location to store the snap restore info
int maxRetryCntToRetrieveMessage; // number of retires to do trackLatest
bool skipCheck; // disable check if the exec fails
int snapVersion; // snapVersion to invoke
public: // ctor & dtor
SnapTestWorkload(WorkloadContext const& wcx)
@ -98,7 +97,6 @@ public: // ctor & dtor
getOption(options, LiteralStringRef("restartInfoLocation"), LiteralStringRef("simfdb/restartInfo.ini"))
.toString();
skipCheck = false;
snapVersion = getOption(options, LiteralStringRef("version"), 1);
}
public: // workload functions
@ -153,34 +151,6 @@ public: // workload functions
void getMetrics(vector<PerfMetric>& m) override { TraceEvent("SnapTestWorkloadGetMetrics"); }
ACTOR Future<Void> snapExecHelper(SnapTestWorkload* self, Database cx, StringRef keyRef, StringRef valueRef) {
state Transaction tr(cx);
state int retry = 0;
loop {
try {
tr.execute(keyRef, valueRef);
wait(tr.commit());
break;
} catch (Error& e) {
++retry;
if (e.code() == error_code_txn_exec_log_anti_quorum) {
self->skipCheck = true;
break;
}
if (e.code() == error_code_cluster_not_fully_recovered) {
TraceEvent(SevWarnAlways, "ClusterNotFullyRecovered")
.error(e);
if (retry > 10) {
self->skipCheck = true;
break;
}
}
}
}
return Void();
}
ACTOR Future<Void> _create_keys(Database cx, std::string prefix, bool even = true) {
state Transaction tr(cx);
state vector<int64_t> keys;
@ -237,7 +207,7 @@ public: // workload functions
self->snapUID = deterministicRandom()->randomUniqueID();
try {
StringRef snapCmdRef = LiteralStringRef("/bin/snap_create.sh");
Future<Void> status = snapCreate(cx, snapCmdRef, self->snapUID, self->snapVersion);
Future<Void> status = snapCreate(cx, snapCmdRef, self->snapUID);
wait(status);
break;
} catch (Error& e) {
@ -245,20 +215,11 @@ public: // workload functions
snapFailed = true;
break;
}
if (e.code() == error_code_cluster_not_fully_recovered) {
++retry;
if (retry > 10) {
snapFailed = true;
break;
}
}
if (self->snapVersion == 2) {
++retry;
// snap v2 can fail for many reasons, so retry for 5 times and then fail it
if (retry > 5) {
snapFailed = true;
break;
}
++retry;
// snap v2 can fail for many reasons, so retry for 5 times and then fail it
if (retry > 5) {
snapFailed = true;
break;
}
}
}
@ -268,7 +229,6 @@ public: // workload functions
std::string uidStr = self->snapUID.toString();
ini.SetValue("RESTORE", "RestoreSnapUID", uidStr.c_str());
ini.SetValue("RESTORE", "BackupFailed", format("%d", snapFailed).c_str());
ini.SetValue("RESTORE", "BackupVersion", format("%d", self->snapVersion).c_str());
ini.SaveFile(self->restartInfoLocation.c_str());
// write the snapUID to a file
TraceEvent("SnapshotCreateStatus").detail("Status", !snapFailed ? "Success" : "Failure");
@ -326,32 +286,6 @@ public: // workload functions
throw operation_failed();
}
} else if (self->testID == 4) {
// description: if disable of a TLog pop was not followed by a
// corresponding enable, then TLog will automatically enable the
// popping of TLogs. this test case validates that we auto
// enable the popping of TLogs
state Standalone<StringRef> payLoadRef = LiteralStringRef("empty-binary:uid=a36b2ca0e8dab0452ac3e12b6b926f4b");
wait(self->snapExecHelper(self, cx, execDisableTLogPop, payLoadRef));
} else if (self->testID == 5) {
// snapshot create without disabling pop of the TLog
StringRef uidStr = LiteralStringRef("d78b08d47f341158e9a54d4baaf4a4dd");
self->snapUID = UID::fromString(uidStr.toString());
state Standalone<StringRef> snapPayload = LiteralStringRef("/bin/"
"snap_create.sh:uid=").withSuffix(uidStr);
wait(self->snapExecHelper(self, cx, execSnap, snapPayload));
} else if (self->testID == 6) {
// disable popping of TLog and snapshot create with mis-matching
payLoadRef = LiteralStringRef("empty-binary:uid=f49d27ddf7a28b6549d930743e0ebdbe");
wait(self->snapExecHelper(self, cx, execDisableTLogPop, payLoadRef));
if (self->skipCheck) {
return Void();
}
StringRef uidStr = LiteralStringRef("ba61e9612a561d60bd83ad83e1b63568");
self->snapUID = UID::fromString(uidStr.toString());
snapPayload = LiteralStringRef("/bin/snap_create.sh:uid=").withSuffix(uidStr);
wait(self->snapExecHelper(self, cx, execSnap, snapPayload));
} else if (self->testID == 7) {
// create a snapshot with a non whitelisted binary path and operation
// should fail
state bool testedFailure = false;
@ -360,7 +294,7 @@ public: // workload functions
self->snapUID = deterministicRandom()->randomUniqueID();
try {
StringRef snapCmdRef = LiteralStringRef("/bin/snap_create1.sh");
Future<Void> status = snapCreate(cx, snapCmdRef, self->snapUID, self->snapVersion);
Future<Void> status = snapCreate(cx, snapCmdRef, self->snapUID);
wait(status);
break;
} catch (Error& e) {

View File

@ -114,7 +114,6 @@ add_fdb_test(TEST_FILES fast/RandomUnitTests.txt)
add_fdb_test(TEST_FILES fast/SelectorCorrectness.txt)
add_fdb_test(TEST_FILES fast/Sideband.txt)
add_fdb_test(TEST_FILES fast/SidebandWithStatus.txt)
add_fdb_test(TEST_FILES fast/SnapTestFailAndDisablePop.txt)
add_fdb_test(TEST_FILES fast/SwizzledRollbackSideband.txt)
add_fdb_test(TEST_FILES fast/SystemRebootTestCycle.txt)
add_fdb_test(TEST_FILES fast/TaskBucketCorrectness.txt)

View File

@ -1,28 +0,0 @@
; verify that the TLog popping disable times out and switches to enable mode
; automatically, if not enabled specifically
testTitle=SnapTLogPopDisableTimeout
testName=SnapTest
numSnaps=1
maxSnapDelay=3.0
testID=4
; snapCreate without TLogPopDisable
testTitle=SnapCreateWithNoDisablePop
testName=SnapTest
numSnaps=1
maxSnapDelay=3.0
testID=5
; snapCreate and tlogPopDisable with mis-matched UID
testTitle=SnapCreateDisableTLogPopMismatch
testName=SnapTest
numSnaps=1
maxSnapDelay=3.0
testID=6
; snapCreate with binary path that is not whitelisted
testTitle=SnapCreateNotWhitelistedBinaryPath
testName=SnapTest
numSnaps=1
maxSnapDelay=3.0
testID=7

View File

@ -12,7 +12,6 @@ testTitle=SnapCyclePre
maxSnapDelay=10.0
testID=1
clearAfterTest=false
version=2
testTitle=SnapCycleShutdown
;save and shutdown

View File

@ -25,7 +25,6 @@ testTitle=SnapTestTakeSnap
maxSnapDelay=10.0
testID=1
clearAfterTest=false
version=2
testName=Attrition
testDuration=10.0

View File

@ -25,7 +25,6 @@ testTitle=SnapTestTakeSnap
maxSnapDelay=10.0
testID=1
clearAfterTest=false
version=2
testTitle=SnapTestPost
;write 1000 Keys ending with odd numbers

View File

@ -13,7 +13,6 @@ testTitle=SnapSimpleTakeSnap
maxSnapDelay=5.0
testID=1
clearAfterTest=false
version=2
;write 1000 Keys ending with odd number
testTitle=SnapSimplePost
@ -23,7 +22,14 @@ testTitle=SnapSimplePost
testID=2
clearAfterTest=false
; save and shutdown
;snapCreate with binary path that is not whitelisted
testTitle=SnapCreateNotWhitelistedBinaryPath
testName=SnapTest
numSnaps=1
maxSnapDelay=3.0
testID=4
;save and shutdown
testTitle=SnapSimpleShutdown
testName=SaveAndKill
restartInfoLocation=simfdb/restartInfo.ini