Merge branch 'master' into add-cache-memory-parameter

# Conflicts:
#	documentation/sphinx/source/release-notes.rst
This commit is contained in:
A.J. Beamon 2019-07-25 13:41:57 -07:00
commit a92b6cd3d1
38 changed files with 1101 additions and 121 deletions

View File

@ -158,7 +158,7 @@ function(add_flow_target)
message(FATAL_ERROR "No sources provided")
endif()
if(OPEN_FOR_IDE)
set(sources ${AFT_SRCS} ${AFT_DISABLE_ACTOR_WRITHOUT_WAIT_WARNING} ${AFT_ADDL_SRCS})
set(sources ${AFT_SRCS} ${AFT_DISABLE_ACTOR_WITHOUT_WAIT_WARNING} ${AFT_ADDL_SRCS})
add_library(${AFT_NAME} OBJECT ${sources})
else()
foreach(src IN LISTS AFT_SRCS AFT_DISABLE_ACTOR_WITHOUT_WAIT_WARNING)

View File

@ -9,6 +9,8 @@ Features
--------
* Improved team collection for data distribution that builds a balanced number of teams per server and gurantees that each server has at least one team. `(PR #1785) <https://github.com/apple/foundationdb/pull/1785>`_.
* CMake is now our official build system. The Makefile based build system is deprecated.
Performance
-----------
@ -37,6 +39,7 @@ Bindings
* Added a new API to get the approximated transaction size before commit, e.g., ``fdb_transaction_get_approximate_size`` in the C binding. `(PR #1756) <https://github.com/apple/foundationdb/pull/1756>`_.
* C: ``fdb_future_get_version`` has been renamed to ``fdb_future_get_int64``. `(PR #1756) <https://github.com/apple/foundationdb/pull/1756>`_.
* C: Applications linking to libfdb_c can now use ``pkg-config foundationdb-client`` or ``find_package(FoundationDB-Client ...)`` (for cmake) to get the proper flags for compiling and linking. `(PR #1636) <https://github.com/apple/foundationdb/pull/1636>`_.
* Go: The Go bindings now require Go version 1.11 or later.
* Go: Fix issue with finalizers running too early that could lead to undefined behavior. `(PR #1451) <https://github.com/apple/foundationdb/pull/1451>`_.
* Added transaction option to control the field length of keys and values in debug transaction logging in order to avoid truncation. `(PR #1844) <https://github.com/apple/foundationdb/pull/1844>`_.
@ -45,6 +48,9 @@ Other Changes
-------------
* Trace files are now ordered lexicographically. This means that the filename format for trace files did change. `(PR #1828) <https://github.com/apple/foundationdb/pull/1828>`_.
* FoundationDB can now be built with clang and libc++ on Linux `(PR #1666) <https://github.com/apple/foundationdb/pull/1666>`_.
* Added experimental framework to run C and Java clients in simulator `(PR #1678) <https://github.com/apple/foundationdb/pull/1678>`_.
* Added new network option for client buggify which will randomly throw expected exceptions in the client. Intended for client testing `(PR #1417) <https://github.com/apple/foundationdb/pull/1417>`_.
* Added ``--cache_memory`` parameter for ``fdbserver`` processes to control the amount of memory dedicated to caching pages read from disk. `(PR #1889) <https://github.com/apple/foundationdb/pull/1889>`_.
Earlier release notes

View File

@ -2170,7 +2170,19 @@ ACTOR Future<bool> exclude( Database db, std::vector<StringRef> tokens, Referenc
}
ACTOR Future<bool> createSnapshot(Database db, StringRef snapCmd) {
wait(makeInterruptable(mgmtSnapCreate(db, snapCmd)));
try {
UID snapUID = wait(makeInterruptable(mgmtSnapCreate(db, snapCmd, 2 /* version */)));
int version = 2;
if (version == 1) {
printf("Snapshots tagged with UID: %s, check logs for status\n", snapUID.toString().c_str());
} else {
printf("Snapshots create succeeded with UID: %s\n", snapUID.toString().c_str());
}
} catch (Error& e) {
fprintf(stderr, "Snapshot create failed, %d (%s)."
" Please cleanup any instance level snapshots created.\n", e.code(), e.what());
return true;
}
return false;
}

View File

@ -1403,11 +1403,12 @@ ACTOR Future<int> setDDMode( Database cx, int mode ) {
rd >> oldMode;
}
}
if (!mode) {
BinaryWriter wrMyOwner(Unversioned());
wrMyOwner << dataDistributionModeLock;
tr.set( moveKeysLockOwnerKey, wrMyOwner.toValue() );
}
BinaryWriter wrMyOwner(Unversioned());
wrMyOwner << dataDistributionModeLock;
tr.set( moveKeysLockOwnerKey, wrMyOwner.toValue() );
BinaryWriter wrLastWrite(Unversioned());
wrLastWrite << deterministicRandom()->randomUniqueID();
tr.set( moveKeysLockWriteKey, wrLastWrite.toValue() );
tr.set( dataDistributionModeKey, wr.toValue() );
@ -1481,27 +1482,23 @@ ACTOR Future<std::set<NetworkAddress>> checkForExcludingServers(Database cx, vec
return inProgressExclusion;
}
ACTOR Future<Void> mgmtSnapCreate(Database cx, StringRef snapCmd) {
ACTOR Future<UID> mgmtSnapCreate(Database cx, StringRef snapCmd, int version) {
state int retryCount = 0;
loop {
state UID snapUID = deterministicRandom()->randomUniqueID();
try {
wait(snapCreate(cx, snapCmd, snapUID));
printf("Snapshots tagged with UID: %s, check logs for status\n", snapUID.toString().c_str());
wait(snapCreate(cx, snapCmd, snapUID, version));
TraceEvent("SnapCreateSucceeded").detail("snapUID", snapUID);
break;
return snapUID;
} catch (Error& e) {
++retryCount;
TraceEvent(retryCount > 3 ? SevWarn : SevInfo, "SnapCreateFailed").error(e);
if (retryCount > 3) {
fprintf(stderr, "Snapshot create failed, %d (%s)."
" Please cleanup any instance level snapshots created.\n", e.code(), e.what());
throw;
}
}
}
return Void();
}
ACTOR Future<Void> waitForFullReplication( Database cx ) {

View File

@ -195,7 +195,7 @@ bool schemaMatch( json_spirit::mValue const& schema, json_spirit::mValue const&
// execute payload in 'snapCmd' on all the coordinators, TLogs and
// storage nodes
ACTOR Future<Void> mgmtSnapCreate(Database cx, StringRef snapCmd);
ACTOR Future<UID> mgmtSnapCreate(Database cx, StringRef snapCmd, int version);
#include "flow/unactorcompiler.h"
#endif

View File

@ -49,9 +49,9 @@ struct MasterProxyInterface {
RequestStream< struct GetRawCommittedVersionRequest > getRawCommittedVersion;
RequestStream< struct TxnStateRequest > txnState;
RequestStream<struct ExecRequest> execReq;
RequestStream< struct GetHealthMetricsRequest > getHealthMetrics;
RequestStream< struct ExecRequest > execReq;
RequestStream< struct ProxySnapRequest > proxySnapReq;
UID id() const { return commit.getEndpoint().token; }
std::string toString() const { return id().shortString(); }
@ -63,7 +63,7 @@ struct MasterProxyInterface {
void serialize(Archive& ar) {
serializer(ar, locality, provisional, commit, getConsistentReadVersion, getKeyServersLocations,
waitFailure, getStorageServerRejoinInfo, getRawCommittedVersion,
txnState, getHealthMetrics, execReq);
txnState, getHealthMetrics, execReq, proxySnapReq);
}
void initEndpoints() {
@ -350,4 +350,22 @@ struct ExecRequest
}
};
struct ProxySnapRequest
{
constexpr static FileIdentifier file_identifier = 22204900;
Arena arena;
StringRef snapPayload;
UID snapUID;
ReplyPromise<Void> reply;
Optional<UID> debugID;
explicit ProxySnapRequest(Optional<UID> const& debugID = Optional<UID>()) : debugID(debugID) {}
explicit ProxySnapRequest(StringRef snap, UID snapUID, Optional<UID> debugID = Optional<UID>()) : snapPayload(snap), snapUID(snapUID), debugID(debugID) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, snapPayload, snapUID, reply, arena, debugID);
}
};
#endif

View File

@ -29,6 +29,7 @@
#include "fdbclient/FailureMonitorClient.h"
#include "fdbclient/KeyRangeMap.h"
#include "fdbclient/Knobs.h"
#include "fdbclient/ManagementAPI.actor.h"
#include "fdbclient/MasterProxyInterface.h"
#include "fdbclient/MonitorLeader.h"
#include "fdbclient/MutationList.h"
@ -3439,7 +3440,7 @@ void enableClientInfoLogging() {
TraceEvent(SevInfo, "ClientInfoLoggingEnabled");
}
ACTOR Future<Void> snapCreate(Database inputCx, StringRef snapCmd, UID snapUID) {
ACTOR Future<Void> snapCreateVersion1(Database inputCx, StringRef snapCmd, UID snapUID) {
state Transaction tr(inputCx);
state DatabaseContext* cx = inputCx.getPtr();
// remember the client ID before the snap operation
@ -3452,7 +3453,7 @@ ACTOR Future<Void> snapCreate(Database inputCx, StringRef snapCmd, UID snapUID)
StringRef snapCmdArgs = snapCmd;
StringRef snapCmdPart = snapCmdArgs.eat(":");
state Standalone<StringRef> snapUIDRef(snapUID.toString());
Standalone<StringRef> snapUIDRef(snapUID.toString());
state Standalone<StringRef> snapPayloadRef = snapCmdPart
.withSuffix(LiteralStringRef(":uid="))
.withSuffix(snapUIDRef)
@ -3537,3 +3538,89 @@ ACTOR Future<Void> snapCreate(Database inputCx, StringRef snapCmd, UID snapUID)
TraceEvent("SnapCreateComplete").detail("UID", snapUID);
return Void();
}
ACTOR Future<Void> snapshotDatabase(Reference<DatabaseContext> cx, StringRef snapPayload, UID snapUID, Optional<UID> debugID) {
TraceEvent("NativeAPI.SnapshotDatabaseEnter")
.detail("SnapPayload", snapPayload)
.detail("SnapUID", snapUID);
try {
if (debugID.present()) {
g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "NativeAPI.snapshotDatabase.Before");
}
ProxySnapRequest req(snapPayload, snapUID, debugID);
wait(loadBalance(cx->getMasterProxies(false), &MasterProxyInterface::proxySnapReq, req, cx->taskID, true /*atmostOnce*/ ));
if (debugID.present())
g_traceBatch.addEvent("TransactionDebug", debugID.get().first(),
"NativeAPI.SnapshotDatabase.After");
} catch (Error& e) {
TraceEvent("NativeAPI.SnapshotDatabaseError")
.detail("SnapPayload", snapPayload)
.detail("SnapUID", snapUID)
.error(e, true /* includeCancelled */);
throw;
}
return Void();
}
ACTOR Future<Void> snapCreateVersion2(Database cx, StringRef snapCmd, UID snapUID) {
// remember the client ID before the snap operation
state UID preSnapClientUID = cx->clientInfo->get().id;
TraceEvent("SnapCreateEnterVersion2")
.detail("SnapCmd", snapCmd.toString())
.detail("UID", snapUID)
.detail("PreSnapClientUID", preSnapClientUID);
StringRef snapCmdArgs = snapCmd;
StringRef snapCmdPart = snapCmdArgs.eat(":");
Standalone<StringRef> snapUIDRef(snapUID.toString());
Standalone<StringRef> snapPayloadRef = snapCmdPart
.withSuffix(LiteralStringRef(":uid="))
.withSuffix(snapUIDRef)
.withSuffix(LiteralStringRef(","))
.withSuffix(snapCmdArgs);
try {
Future<Void> exec = snapshotDatabase(Reference<DatabaseContext>::addRef(cx.getPtr()), snapPayloadRef, snapUID, snapUID);
wait(exec);
} catch (Error& e) {
TraceEvent("SnapshotDatabaseErrorVersion2")
.detail("SnapCmd", snapCmd.toString())
.detail("UID", snapUID)
.error(e);
throw;
}
UID postSnapClientUID = cx->clientInfo->get().id;
if (preSnapClientUID != postSnapClientUID) {
// if the client IDs changed then we fail the snapshot
TraceEvent("UIDMismatchVersion2")
.detail("SnapPreSnapClientUID", preSnapClientUID)
.detail("SnapPostSnapClientUID", postSnapClientUID);
throw coordinators_changed();
}
TraceEvent("SnapCreateExitVersion2")
.detail("SnapCmd", snapCmd.toString())
.detail("UID", snapUID)
.detail("PreSnapClientUID", preSnapClientUID);
return Void();
}
ACTOR Future<Void> snapCreate(Database cx, StringRef snapCmd, UID snapUID, int version) {
if (version == 1) {
wait(snapCreateVersion1(cx, snapCmd, snapUID));
return Void();
}
state int oldMode = wait( setDDMode( cx, 0 ) );
try {
wait(snapCreateVersion2(cx, snapCmd, snapUID));
} catch (Error& e) {
state Error err = e;
wait(success( setDDMode( cx, oldMode ) ));
throw err;
}
wait(success( setDDMode( cx, oldMode ) ));
return Void();
}

View File

@ -344,7 +344,7 @@ int64_t extractIntOption( Optional<StringRef> value, int64_t minValue = std::num
// Takes a snapshot of the cluster, specifically the following persistent
// states: coordinator, TLog and storage state
ACTOR Future<Void> snapCreate(Database cx, StringRef snapCmd, UID snapUID);
ACTOR Future<Void> snapCreate(Database cx, StringRef snapCmd, UID snapUID, int version);
#include "flow/unactorcompiler.h"
#endif

View File

@ -18,21 +18,24 @@
* limitations under the License.
*/
#include "flow/ActorCollection.h"
#include "fdbserver/DataDistribution.actor.h"
#include "fdbclient/SystemData.h"
#include "fdbclient/DatabaseContext.h"
#include "fdbserver/MoveKeys.actor.h"
#include "fdbserver/Knobs.h"
#include <set>
#include <sstream>
#include "fdbserver/WaitFailure.h"
#include "fdbserver/ServerDBInfo.h"
#include "fdbserver/IKeyValueStore.h"
#include "fdbclient/SystemData.h"
#include "fdbclient/DatabaseContext.h"
#include "fdbclient/ManagementAPI.actor.h"
#include "fdbrpc/Replication.h"
#include "flow/UnitTest.h"
#include "fdbserver/DataDistribution.actor.h"
#include "fdbserver/FDBExecHelper.actor.h"
#include "fdbserver/IKeyValueStore.h"
#include "fdbserver/Knobs.h"
#include "fdbserver/MoveKeys.actor.h"
#include "fdbserver/QuietDatabase.h"
#include "fdbserver/ServerDBInfo.h"
#include "fdbserver/TLogInterface.h"
#include "fdbserver/WaitFailure.h"
#include "flow/ActorCollection.h"
#include "flow/Trace.h"
#include "flow/UnitTest.h"
#include "flow/actorcompiler.h" // This must be the last #include.
class TCTeamInfo;
@ -3997,9 +4000,134 @@ static std::set<int> const& normalDataDistributorErrors() {
return s;
}
ACTOR Future<Void> ddSnapCreateCore(DistributorSnapRequest snapReq, Reference<AsyncVar<struct ServerDBInfo>> db ) {
state Database cx = openDBOnServer(db, TaskPriority::DefaultDelay, true, true);
TraceEvent("SnapDataDistributor.SnapReqEnter")
.detail("SnapPayload", snapReq.snapPayload)
.detail("SnapUID", snapReq.snapUID);
try {
// disable tlog pop on local tlog nodes
state std::vector<TLogInterface> tlogs = db->get().logSystemConfig.allLocalLogs(false);
std::vector<Future<Void>> disablePops;
for (const auto & tlog : tlogs) {
disablePops.push_back(
transformErrors(throwErrorOr(tlog.disablePopRequest.tryGetReply(TLogDisablePopRequest(snapReq.snapUID))), operation_failed())
);
}
wait(waitForAll(disablePops));
TraceEvent("SnapDataDistributor.AfterDisableTLogPop")
.detail("SnapPayload", snapReq.snapPayload)
.detail("SnapUID", snapReq.snapUID);
// snap local storage nodes
std::vector<WorkerInterface> storageWorkers = wait(getStorageWorkers(cx, db, true /* localOnly */));
TraceEvent("SnapDataDistributor.GotStorageWorkers")
.detail("SnapPayload", snapReq.snapPayload)
.detail("SnapUID", snapReq.snapUID);
std::vector<Future<Void>> storageSnapReqs;
for (const auto & worker : storageWorkers) {
storageSnapReqs.push_back(
transformErrors(throwErrorOr(worker.workerSnapReq.tryGetReply(WorkerSnapRequest(snapReq.snapPayload, snapReq.snapUID, LiteralStringRef("storage")))), operation_failed())
);
}
wait(waitForAll(storageSnapReqs));
TraceEvent("SnapDataDistributor.AfterSnapStorage")
.detail("SnapPayload", snapReq.snapPayload)
.detail("SnapUID", snapReq.snapUID);
// snap local tlog nodes
std::vector<Future<Void>> tLogSnapReqs;
for (const auto & tlog : tlogs) {
tLogSnapReqs.push_back(
transformErrors(throwErrorOr(tlog.snapRequest.tryGetReply(TLogSnapRequest(snapReq.snapPayload, snapReq.snapUID, LiteralStringRef("tlog")))), operation_failed())
);
}
wait(waitForAll(tLogSnapReqs));
TraceEvent("SnapDataDistributor.AfterTLogStorage")
.detail("SnapPayload", snapReq.snapPayload)
.detail("SnapUID", snapReq.snapUID);
// enable tlog pop on local tlog nodes
std::vector<Future<Void>> enablePops;
for (const auto & tlog : tlogs) {
enablePops.push_back(
transformErrors(throwErrorOr(tlog.enablePopRequest.tryGetReply(TLogEnablePopRequest(snapReq.snapUID))), operation_failed())
);
}
wait(waitForAll(enablePops));
TraceEvent("SnapDataDistributor.AfterEnableTLogPops")
.detail("SnapPayload", snapReq.snapPayload)
.detail("SnapUID", snapReq.snapUID);
// snap the coordinators
std::vector<WorkerInterface> coordWorkers = wait(getCoordWorkers(cx, db));
TraceEvent("SnapDataDistributor.GotCoordWorkers")
.detail("SnapPayload", snapReq.snapPayload)
.detail("SnapUID", snapReq.snapUID);
std::vector<Future<Void>> coordSnapReqs;
for (const auto & worker : coordWorkers) {
coordSnapReqs.push_back(
transformErrors(throwErrorOr(worker.workerSnapReq.tryGetReply(WorkerSnapRequest(snapReq.snapPayload, snapReq.snapUID, LiteralStringRef("coord")))), operation_failed())
);
}
wait(waitForAll(coordSnapReqs));
TraceEvent("SnapDataDistributor.AfterSnapCoords")
.detail("SnapPayload", snapReq.snapPayload)
.detail("SnapUID", snapReq.snapUID);
} catch (Error& e) {
TraceEvent("SnapDataDistributor.SnapReqExit")
.detail("SnapPayload", snapReq.snapPayload)
.detail("SnapUID", snapReq.snapUID)
.error(e, true /*includeCancelled */);
throw e;
}
return Void();
}
ACTOR Future<Void> ddSnapCreate(DistributorSnapRequest snapReq, Reference<AsyncVar<struct ServerDBInfo>> db ) {
state Future<Void> dbInfoChange = db->onChange();
double delayTime = g_network->isSimulated() ? 70.0 : SERVER_KNOBS->SNAP_CREATE_MAX_TIMEOUT;
try {
choose {
when (wait(dbInfoChange)) {
TraceEvent("SnapDDCreateDBInfoChanged")
.detail("SnapPayload", snapReq.snapPayload)
.detail("SnapUID", snapReq.snapUID);
snapReq.reply.sendError(operation_failed());
}
when (wait(ddSnapCreateCore(snapReq, db))) {
TraceEvent("SnapDDCreateSuccess")
.detail("SnapPayload", snapReq.snapPayload)
.detail("SnapUID", snapReq.snapUID);
snapReq.reply.send(Void());
}
when (wait(delay(delayTime))) {
TraceEvent("SnapDDCreateTimedOut")
.detail("SnapPayload", snapReq.snapPayload)
.detail("SnapUID", snapReq.snapUID);
snapReq.reply.sendError(operation_failed());
}
}
} catch (Error& e) {
TraceEvent("SnapDDCreateError")
.detail("SnapPayload", snapReq.snapPayload)
.detail("SnapUID", snapReq.snapUID)
.error(e, true /*includeCancelled */);
if (e.code() != error_code_operation_cancelled) {
snapReq.reply.sendError(e);
} else {
throw e;
}
}
return Void();
}
ACTOR Future<Void> dataDistributor(DataDistributorInterface di, Reference<AsyncVar<struct ServerDBInfo>> db ) {
state Reference<DataDistributorData> self( new DataDistributorData(db, di.id()) );
state Future<Void> collection = actorCollection( self->addActor.getFuture() );
state Database cx = openDBOnServer(db, TaskPriority::DefaultDelay, true, true);
state ActorCollection actors(false);
self->addActor.send(actors.getResult());
try {
TraceEvent("DataDistributorRunning", di.id());
@ -4016,6 +4144,9 @@ ACTOR Future<Void> dataDistributor(DataDistributorInterface di, Reference<AsyncV
TraceEvent("DataDistributorHalted", di.id()).detail("ReqID", req.requesterID);
break;
}
when(DistributorSnapRequest snapReq = waitNext(di.distributorSnapReq.getFuture())) {
actors.add(ddSnapCreate(snapReq, db));
}
}
}
catch ( Error &err ) {

View File

@ -29,6 +29,7 @@ struct DataDistributorInterface {
RequestStream<ReplyPromise<Void>> waitFailure;
RequestStream<struct HaltDataDistributorRequest> haltDataDistributor;
struct LocalityData locality;
RequestStream<struct DistributorSnapRequest> distributorSnapReq;
DataDistributorInterface() {}
explicit DataDistributorInterface(const struct LocalityData& l) : locality(l) {}
@ -45,7 +46,7 @@ struct DataDistributorInterface {
template <class Archive>
void serialize(Archive& ar) {
serializer(ar, waitFailure, haltDataDistributor, locality);
serializer(ar, waitFailure, haltDataDistributor, locality, distributorSnapReq);
}
};
@ -63,4 +64,22 @@ struct HaltDataDistributorRequest {
}
};
struct DistributorSnapRequest
{
constexpr static FileIdentifier file_identifier = 22204900;
Arena arena;
StringRef snapPayload;
UID snapUID;
ReplyPromise<Void> reply;
Optional<UID> debugID;
explicit DistributorSnapRequest(Optional<UID> const& debugID = Optional<UID>()) : debugID(debugID) {}
explicit DistributorSnapRequest(StringRef snap, UID snapUID, Optional<UID> debugID = Optional<UID>()) : snapPayload(snap), snapUID(snapUID), debugID(debugID) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, snapPayload, snapUID, reply, arena, debugID);
}
};
#endif //FDBSERVER_DATADISTRIBUTORINTERFACE_H

View File

@ -10,6 +10,7 @@
#if defined(CMAKE_BUILD) || !defined(_WIN32)
#include "versions.h"
#endif
#include "fdbserver/Knobs.h"
#include "flow/actorcompiler.h" // This must be the last #include.
ExecCmdValueString::ExecCmdValueString(StringRef pCmdValueString) {
@ -84,13 +85,13 @@ void ExecCmdValueString::dbgPrint() {
}
#if defined(_WIN32) || defined(__APPLE__)
ACTOR Future<int> spawnProcess(std::string binPath, std::vector<std::string> paramList, double maxWaitTime, bool isSync)
ACTOR Future<int> spawnProcess(std::string binPath, std::vector<std::string> paramList, double maxWaitTime, bool isSync, double maxSimDelayTime)
{
wait(delay(0.0));
return 0;
}
#else
ACTOR Future<int> spawnProcess(std::string binPath, std::vector<std::string> paramList, double maxWaitTime, bool isSync)
ACTOR Future<int> spawnProcess(std::string binPath, std::vector<std::string> paramList, double maxWaitTime, bool isSync, double maxSimDelayTime)
{
state std::string argsString;
for (auto const& elem : paramList) {
@ -103,10 +104,15 @@ ACTOR Future<int> spawnProcess(std::string binPath, std::vector<std::string> par
state boost::process::child c(binPath, boost::process::args(paramList),
boost::process::std_err > boost::process::null);
// for async calls in simulator, always delay by a fixed time, otherwise
// the predictability of the simulator breaks
// for async calls in simulator, always delay by a deterinistic amount of time and do the call
// synchronously, otherwise the predictability of the simulator breaks
if (!isSync && g_network->isSimulated()) {
wait(delay(deterministicRandom()->random01()));
double snapDelay = std::max(maxSimDelayTime - 1, 0.0);
// add some randomness
snapDelay += deterministicRandom()->random01();
TraceEvent("SnapDelaySpawnProcess")
.detail("SnapDelay", snapDelay);
wait(delay(snapDelay));
}
if (!isSync && !g_network->isSimulated()) {
@ -147,10 +153,11 @@ ACTOR Future<int> spawnProcess(std::string binPath, std::vector<std::string> par
}
#endif
ACTOR Future<int> execHelper(ExecCmdValueString* execArg, std::string folder, std::string role) {
ACTOR Future<int> execHelper(ExecCmdValueString* execArg, std::string folder, std::string role, int snapVersion) {
state StringRef uidStr = execArg->getBinaryArgValue(LiteralStringRef("uid"));
state int err = 0;
state Future<int> cmdErr;
state double maxWaitTime = (snapVersion == 2) ? SERVER_KNOBS->SNAP_CREATE_MAX_TIMEOUT : 3.0;
if (!g_network->isSimulated()) {
// get bin path
auto snapBin = execArg->getBinaryPath();
@ -169,17 +176,24 @@ ACTOR Future<int> execHelper(ExecCmdValueString* execArg, std::string folder, st
versionString += version;
paramList.push_back(versionString);
paramList.push_back(role);
cmdErr = spawnProcess(snapBin.toString(), paramList, 3.0, false /*isSync*/);
cmdErr = spawnProcess(snapBin.toString(), paramList, maxWaitTime, false /*isSync*/, 0);
wait(success(cmdErr));
err = cmdErr.get();
} else {
// copy the files
state std::string folderFrom = folder + "/.";
state std::string folderTo = folder + "-snap-" + uidStr.toString();
double maxSimDelayTime = 1.0;
if (snapVersion == 1) {
folderTo = folder + "-snap-" + uidStr.toString();
} else {
folderTo = folder + "-snap-" + uidStr.toString() + "-" + role;
maxSimDelayTime = 10.0;
}
std::vector<std::string> paramList;
std::string mkdirBin = "/bin/mkdir";
paramList.push_back(folderTo);
cmdErr = spawnProcess(mkdirBin, paramList, 3.0, false /*isSync*/);
cmdErr = spawnProcess(mkdirBin, paramList, maxWaitTime, false /*isSync*/, maxSimDelayTime);
wait(success(cmdErr));
err = cmdErr.get();
if (err == 0) {
@ -188,7 +202,7 @@ ACTOR Future<int> execHelper(ExecCmdValueString* execArg, std::string folder, st
paramList.push_back("-a");
paramList.push_back(folderFrom);
paramList.push_back(folderTo);
cmdErr = spawnProcess(cpBin, paramList, 3.0, true /*isSync*/);
cmdErr = spawnProcess(cpBin, paramList, maxWaitTime, true /*isSync*/, 1.0);
wait(success(cmdErr));
err = cmdErr.get();
}
@ -233,3 +247,34 @@ bool isTLogInSameNode() {
NetworkAddress addr = g_network->getLocalAddress();
return tLogsAlive[addr].size() >= 1;
}
struct StorageVersionInfo {
Version version;
Version durableVersion;
};
// storage nodes get snapshotted through the worker interface which does not have context about version information,
// following info is gathered at worker level to facilitate printing of version info during storage snapshots.
typedef std::map<UID, StorageVersionInfo> UidStorageVersionInfo;
std::map<NetworkAddress, UidStorageVersionInfo> workerStorageVersionInfo;
void setDataVersion(UID uid, Version version) {
NetworkAddress addr = g_network->getLocalAddress();
workerStorageVersionInfo[addr][uid].version = version;
}
void setDataDurableVersion(UID uid, Version durableVersion) {
NetworkAddress addr = g_network->getLocalAddress();
workerStorageVersionInfo[addr][uid].durableVersion = durableVersion;
}
void printStorageVersionInfo() {
NetworkAddress addr = g_network->getLocalAddress();
for (auto itr = workerStorageVersionInfo[addr].begin(); itr != workerStorageVersionInfo[addr].end(); itr++) {
TraceEvent("StorageVersionInfo")
.detail("UID", itr->first)
.detail("Version", itr->second.version)
.detail("DurableVersion", itr->second.durableVersion);
}
}

View File

@ -11,6 +11,7 @@
#include "flow/Arena.h"
#include "flow/flow.h"
#include "flow/actorcompiler.h"
#include "fdbclient/FDBTypes.h"
// execute/snapshot command takes two arguments: <param1> <param2>
// param1 - represents the command type/name
@ -47,10 +48,11 @@ private: // data
// spawns a process pointed by `binPath` and the arguments provided at `paramList`,
// if the process spawned takes more than `maxWaitTime` then it will be killed
// if isSync is set to true then the process will be synchronously executed
ACTOR Future<int> spawnProcess(std::string binPath, std::vector<std::string> paramList, double maxWaitTime, bool isSync);
// if async and in simulator then delay spawning the process to max of maxSimDelayTime
ACTOR Future<int> spawnProcess(std::string binPath, std::vector<std::string> paramList, double maxWaitTime, bool isSync, double maxSimDelayTime);
// helper to run all the work related to running the exec command
ACTOR Future<int> execHelper(ExecCmdValueString* execArg, std::string folder, std::string role);
ACTOR Future<int> execHelper(ExecCmdValueString* execArg, std::string folder, std::string role, int version);
// returns true if the execUID op is in progress
bool isExecOpInProgress(UID execUID);
@ -67,5 +69,12 @@ void unregisterTLog(UID uid);
// checks if there is any non-stopped TLog instance
bool isTLogInSameNode();
// set the data version for the specified storage server UID
void setDataVersion(UID uid, Version version);
// set the data durable version for the specified storage server UID
void setDataDurableVersion(UID uid, Version version);
// print the version info all the storages servers on this node
void printStorageVersionInfo();
#include "flow/unactorcompiler.h"
#endif

View File

@ -81,6 +81,9 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
init( TLOG_DEGRADED_DURATION, 5.0 );
init( TLOG_IGNORE_POP_AUTO_ENABLE_DELAY, 300.0 );
// disk snapshot max timeout, to be put in TLog, storage and coordinator nodes
init( SNAP_CREATE_MAX_TIMEOUT, 300.0 );
// Data distribution queue
init( HEALTH_POLL_TIME, 1.0 );
init( BEST_TEAM_STUCK_DELAY, 1.0 );

View File

@ -328,6 +328,9 @@ public:
int64_t TLOG_RECOVER_MEMORY_LIMIT;
double TLOG_IGNORE_POP_AUTO_ENABLE_DELAY;
// disk snapshot
double SNAP_CREATE_MAX_TIMEOUT;
double MAX_TRANSACTIONS_PER_BYTE;
int64_t MIN_FREE_SPACE;

View File

@ -214,9 +214,13 @@ struct LogSystemConfig {
return format("type: %d oldGenerations: %d tags: %d %s", logSystemType, oldTLogs.size(), logRouterTags, describe(tLogs).c_str());
}
std::vector<TLogInterface> allLocalLogs() const {
std::vector<TLogInterface> allLocalLogs(bool includeSatellite = true) const {
std::vector<TLogInterface> results;
for( int i = 0; i < tLogs.size(); i++ ) {
// skip satellite TLogs, if it was not needed
if (!includeSatellite && tLogs[i].locality == tagLocalitySatellite) {
continue;
}
if(tLogs[i].isLocal) {
for( int j = 0; j < tLogs[i].tLogs.size(); j++ ) {
if( tLogs[i].tLogs[j].present() ) {

View File

@ -30,6 +30,7 @@
#include "fdbrpc/sim_validation.h"
#include "fdbserver/ApplyMetadataMutation.h"
#include "fdbserver/ConflictSet.h"
#include "fdbserver/DataDistributorInterface.h"
#include "fdbserver/FDBExecHelper.actor.h"
#include "fdbserver/IKeyValueStore.h"
#include "fdbserver/Knobs.h"
@ -1523,6 +1524,85 @@ ACTOR Future<Void> monitorRemoteCommitted(ProxyCommitData* self) {
}
}
ACTOR Future<Void>
proxySnapCreate(ProxySnapRequest snapReq, ProxyCommitData* commitData)
{
TraceEvent("SnapMasterProxy.SnapReqEnter")
.detail("SnapPayload", snapReq.snapPayload)
.detail("SnapUID", snapReq.snapUID);
try {
// whitelist check
ExecCmdValueString execArg(snapReq.snapPayload);
StringRef binPath = execArg.getBinaryPath();
if (!isWhitelisted(commitData->whitelistedBinPathVec, binPath)) {
TraceEvent("SnapMasterProxy.WhiteListCheckFailed")
.detail("SnapPayload", snapReq.snapPayload)
.detail("SnapUID", snapReq.snapUID);
throw transaction_not_permitted();
}
// db fully recovered check
if (commitData->db->get().recoveryState != RecoveryState::FULLY_RECOVERED) {
// Cluster is not fully recovered and needs TLogs
// from previous generation for full recovery.
// Currently, snapshot of old tlog generation is not
// supported and hence failing the snapshot request until
// cluster is fully_recovered.
TraceEvent("SnapMasterProxy.ClusterNotFullyRecovered")
.detail("SnapPayload", snapReq.snapPayload)
.detail("SnapUID", snapReq.snapUID);
throw cluster_not_fully_recovered();
}
auto result =
commitData->txnStateStore->readValue(LiteralStringRef("log_anti_quorum").withPrefix(configKeysPrefix)).get();
int logAntiQuorum = 0;
if (result.present()) {
logAntiQuorum = atoi(result.get().toString().c_str());
}
// FIXME: logAntiQuorum not supported, remove it later,
// In version2, we probably don't need this limtiation, but this needs to be tested.
if (logAntiQuorum > 0) {
TraceEvent("SnapMasterProxy.LogAnitQuorumNotSupported")
.detail("SnapPayload", snapReq.snapPayload)
.detail("SnapUID", snapReq.snapUID);
throw txn_exec_log_anti_quorum();
}
// send a snap request to DD
if (!commitData->db->get().distributor.present()) {
TraceEvent(SevWarnAlways, "DataDistributorNotPresent");
throw operation_failed();
}
state Future<ErrorOr<Void>> ddSnapReq =
commitData->db->get().distributor.get().distributorSnapReq.tryGetReply(DistributorSnapRequest(snapReq.snapPayload, snapReq.snapUID));
try {
wait(throwErrorOr(ddSnapReq));
} catch (Error& e) {
TraceEvent("SnapMasterProxy.DDSnapResponseError")
.detail("SnapPayload", snapReq.snapPayload)
.detail("SnapUID", snapReq.snapUID)
.error(e, true /*includeCancelled*/ );
throw e;
}
snapReq.reply.send(Void());
} catch (Error& e) {
TraceEvent("SnapMasterProxy.SnapReqError")
.detail("SnapPayload", snapReq.snapPayload)
.detail("SnapUID", snapReq.snapUID)
.error(e, true /*includeCancelled*/);
if (e.code() != error_code_operation_cancelled) {
snapReq.reply.sendError(e);
} else {
throw e;
}
}
TraceEvent("SnapMasterProxy.SnapReqExit")
.detail("SnapPayload", snapReq.snapPayload)
.detail("SnapUID", snapReq.snapUID);
return Void();
}
ACTOR Future<Void> masterProxyServerCore(
MasterProxyInterface proxy,
MasterInterface master,
@ -1713,6 +1793,9 @@ ACTOR Future<Void> masterProxyServerCore(
}
}
}
when(ProxySnapRequest snapReq = waitNext(proxy.proxySnapReq.getFuture())) {
addActor.send(proxySnapCreate(snapReq, &commitData));
}
when(TxnStateRequest req = waitNext(proxy.txnState.getFuture())) {
state ReplyPromise<Void> reply = req.reply;
if(req.last) maxSequence = req.sequence + 1;

View File

@ -920,8 +920,8 @@ std::deque<std::pair<Version, LengthPrefixedStringRef>> & getVersionMessages( Re
};
ACTOR Future<Void> tLogPopCore( TLogData* self, Tag inputTag, Version to, Reference<LogData> logData ) {
if (self->ignorePopRequest && inputTag.locality != tagLocalityTxs && inputTag != txsTag) {
TraceEvent("IgnoringPopRequest").detail("IgnorePopDeadline", self->ignorePopDeadline);
if (self->ignorePopRequest) {
TraceEvent(SevDebug, "IgnoringPopRequest").detail("IgnorePopDeadline", self->ignorePopDeadline);
if (self->toBePopped.find(inputTag) == self->toBePopped.end()
|| to > self->toBePopped[inputTag]) {
@ -1478,7 +1478,7 @@ ACTOR Future<Void> tLogSnapHelper(TLogData* self,
ASSERT(!isExecOpInProgress(execUID));
if (!otherRoleExeced) {
setExecOpInProgress(execUID);
int tmpErr = wait(execHelper(execArg, self->dataFolder, "role=tlog"));
int tmpErr = wait(execHelper(execArg, self->dataFolder, "role=tlog", 1 /*version*/));
err = tmpErr;
clearExecOpInProgress(execUID);
}
@ -1796,6 +1796,81 @@ void getQueuingMetrics( TLogData* self, Reference<LogData> logData, TLogQueuingM
req.reply.send( reply );
}
ACTOR Future<Void>
tLogSnapCreate(TLogSnapRequest snapReq, TLogData* self, Reference<LogData> logData) {
if (self->ignorePopUid != snapReq.snapUID.toString()) {
snapReq.reply.sendError(operation_failed());
return Void();
}
ExecCmdValueString snapArg(snapReq.snapPayload);
try {
Standalone<StringRef> role = LiteralStringRef("role=").withSuffix(snapReq.role);
int err = wait(execHelper(&snapArg, self->dataFolder, role.toString(), 2 /* version */));
std::string uidStr = snapReq.snapUID.toString();
TraceEvent("ExecTraceTLog")
.detail("Uid", uidStr)
.detail("Status", err)
.detail("Role", snapReq.role)
.detail("Value", self->dataFolder)
.detail("ExecPayload", snapReq.snapPayload)
.detail("PersistentDataVersion", logData->persistentDataVersion)
.detail("PersistentDatadurableVersion", logData->persistentDataDurableVersion)
.detail("QueueCommittedVersion", logData->queueCommittedVersion.get())
.detail("Version", logData->version.get());
if (err != 0) {
throw operation_failed();
}
snapReq.reply.send(Void());
} catch (Error& e) {
TraceEvent("TLogExecHelperError").error(e, true /*includeCancelled */);
if (e.code() != error_code_operation_cancelled) {
snapReq.reply.sendError(e);
} else {
throw e;
}
}
return Void();
}
ACTOR Future<Void>
tLogEnablePopReq(TLogEnablePopRequest enablePopReq, TLogData* self, Reference<LogData> logData) {
if (self->ignorePopUid != enablePopReq.snapUID.toString()) {
TraceEvent(SevWarn, "TLogPopDisableEnableUidMismatch")
.detail("IgnorePopUid", self->ignorePopUid)
.detail("UidStr", enablePopReq.snapUID.toString());
enablePopReq.reply.sendError(operation_failed());
return Void();
}
TraceEvent("EnableTLogPlayAllIgnoredPops2");
// use toBePopped and issue all the pops
std::map<Tag, Version>::iterator it;
vector<Future<Void>> ignoredPops;
self->ignorePopRequest = false;
self->ignorePopDeadline = 0.0;
self->ignorePopUid = "";
for (it = self->toBePopped.begin(); it != self->toBePopped.end(); it++) {
TraceEvent("PlayIgnoredPop")
.detail("Tag", it->first.toString())
.detail("Version", it->second);
ignoredPops.push_back(tLogPopCore(self, it->first, it->second, logData));
}
TraceEvent("TLogExecCmdPopEnable")
.detail("UidStr", enablePopReq.snapUID.toString())
.detail("IgnorePopUid", self->ignorePopUid)
.detail("IgnporePopRequest", self->ignorePopRequest)
.detail("IgnporePopDeadline", self->ignorePopDeadline)
.detail("PersistentDataVersion", logData->persistentDataVersion)
.detail("PersistentDatadurableVersion", logData->persistentDataDurableVersion)
.detail("QueueCommittedVersion", logData->queueCommittedVersion.get())
.detail("Version", logData->version.get());
wait(waitForAll(ignoredPops));
self->toBePopped.clear();
enablePopReq.reply.send(Void());
return Void();
}
ACTOR Future<Void> serveTLogInterface( TLogData* self, TLogInterface tli, Reference<LogData> logData, PromiseStream<Void> warningCollectorInput ) {
state Future<Void> dbInfoChange = Void();
@ -1857,6 +1932,30 @@ ACTOR Future<Void> serveTLogInterface( TLogData* self, TLogInterface tli, Refere
else
req.reply.sendError( tlog_stopped() );
}
when( TLogDisablePopRequest req = waitNext( tli.disablePopRequest.getFuture() ) ) {
if (self->ignorePopUid != "") {
TraceEvent(SevWarn, "TLogPopDisableonDisable")
.detail("IgnorePopUid", self->ignorePopUid)
.detail("UidStr", req.snapUID.toString())
.detail("PersistentDataVersion", logData->persistentDataVersion)
.detail("PersistentDatadurableVersion", logData->persistentDataDurableVersion)
.detail("QueueCommittedVersion", logData->queueCommittedVersion.get())
.detail("Version", logData->version.get());
req.reply.sendError(operation_failed());
} else {
//FIXME: As part of reverting snapshot V1, make ignorePopUid a UID instead of string
self->ignorePopRequest = true;
self->ignorePopUid = req.snapUID.toString();
self->ignorePopDeadline = g_network->now() + SERVER_KNOBS->TLOG_IGNORE_POP_AUTO_ENABLE_DELAY;
req.reply.send(Void());
}
}
when( TLogEnablePopRequest enablePopReq = waitNext( tli.enablePopRequest.getFuture() ) ) {
logData->addActor.send( tLogEnablePopReq( enablePopReq, self, logData) );
}
when( TLogSnapRequest snapReq = waitNext( tli.snapRequest.getFuture() ) ) {
logData->addActor.send( tLogSnapCreate( snapReq, self, logData) );
}
}
}

View File

@ -22,8 +22,10 @@
#include "flow/ActorCollection.h"
#include "fdbrpc/simulator.h"
#include "flow/Trace.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/DatabaseContext.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/ReadYourWrites.h"
#include "fdbclient/RunTransaction.actor.h"
#include "fdbserver/TesterInterface.actor.h"
#include "fdbserver/WorkerInterface.actor.h"
#include "fdbserver/ServerDBInfo.h"
@ -133,6 +135,39 @@ int64_t getPoppedVersionLag( const TraceEventFields& md ) {
return persistentDataDurableVersion - queuePoppedVersion;
}
ACTOR Future<vector<WorkerInterface>> getCoordWorkers( Database cx, Reference<AsyncVar<ServerDBInfo>> dbInfo ) {
state std::vector<WorkerDetails> workers = wait(getWorkers(dbInfo));
Optional<Value> coordinators = wait(
runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr) -> Future<Optional<Value>>
{
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
return tr->get(coordinatorsKey);
}));
if (!coordinators.present()) {
throw operation_failed();
}
std::vector<NetworkAddress> coordinatorsAddr =
ClusterConnectionString(coordinators.get().toString()).coordinators();
std::set<NetworkAddress> coordinatorsAddrSet;
for (const auto & addr : coordinatorsAddr) {
TraceEvent(SevDebug, "CoordinatorAddress").detail("Addr", addr);
coordinatorsAddrSet.insert(addr);
}
vector<WorkerInterface> result;
for(const auto & worker : workers) {
NetworkAddress primary = worker.interf.address();
Optional<NetworkAddress> secondary = worker.interf.tLog.getEndpoint().addresses.secondaryAddress;
if (coordinatorsAddrSet.find(primary) != coordinatorsAddrSet.end()
|| (secondary.present() && (coordinatorsAddrSet.find(secondary.get()) != coordinatorsAddrSet.end()))) {
result.push_back(worker.interf);
}
}
return result;
}
// This is not robust in the face of a TLog failure
ACTOR Future<std::pair<int64_t,int64_t>> getTLogQueueInfo( Database cx, Reference<AsyncVar<ServerDBInfo>> dbInfo ) {
TraceEvent("MaxTLogQueueSize").detail("Stage", "ContactingLogs");
@ -195,6 +230,42 @@ ACTOR Future<vector<StorageServerInterface>> getStorageServers( Database cx, boo
}
}
ACTOR Future<vector<WorkerInterface>> getStorageWorkers( Database cx, Reference<AsyncVar<ServerDBInfo>> dbInfo, bool localOnly ) {
state std::vector<StorageServerInterface> servers = wait(getStorageServers(cx));
state std::map<NetworkAddress, WorkerInterface> workersMap;
std::vector<WorkerDetails> workers = wait(getWorkers(dbInfo));
for(const auto & worker : workers) {
workersMap[worker.interf.address()] = worker.interf;
}
Optional<Value> regionsValue = wait(
runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr) -> Future<Optional<Value>>
{
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
return tr->get(LiteralStringRef("usable_regions").withPrefix(configKeysPrefix));
}));
ASSERT(regionsValue.present());
int usableRegions = atoi(regionsValue.get().toString().c_str());
auto masterDcId = dbInfo->get().master.locality.dcId();
vector<WorkerInterface> result;
for (const auto & server : servers) {
TraceEvent(SevDebug, "DcIdInfo")
.detail("ServerLocalityID", server.locality.dcId())
.detail("MasterDcID", masterDcId);
if (!localOnly || (usableRegions == 1 || server.locality.dcId() == masterDcId)) {
auto itr = workersMap.find(server.address());
if(itr == workersMap.end()) {
TraceEvent(SevWarn, "GetStorageWorkers").detail("Reason", "Could not find worker for storage server").detail("SS", server.id());
throw operation_failed();
}
result.push_back(itr->second);
}
}
return result;
}
//Gets the maximum size of all the storage server queues
ACTOR Future<int64_t> getMaxStorageServerQueueSize( Database cx, Reference<AsyncVar<ServerDBInfo>> dbInfo ) {
TraceEvent("MaxStorageServerQueueSize").detail("Stage", "ContactingStorageServers");

View File

@ -38,6 +38,8 @@ Future<vector<StorageServerInterface>> getStorageServers( Database const& cx, bo
Future<vector<WorkerDetails>> getWorkers( Reference<AsyncVar<ServerDBInfo>> const& dbInfo, int const& flags = 0 );
Future<WorkerInterface> getMasterWorker( Database const& cx, Reference<AsyncVar<ServerDBInfo>> const& dbInfo );
Future<Void> repairDeadDatacenter(Database const& cx, Reference<AsyncVar<ServerDBInfo>> const& dbInfo, std::string const& context);
Future<vector<WorkerInterface>> getStorageWorkers( Database const& cx, Reference<AsyncVar<ServerDBInfo>> const& dbInfo, bool const& localOnly );
Future<vector<WorkerInterface>> getCoordWorkers( Database const& cx, Reference<AsyncVar<ServerDBInfo>> const& dbInfo );
#include "flow/unactorcompiler.h"
#endif

View File

@ -45,6 +45,10 @@ struct TLogInterface {
RequestStream< struct TLogConfirmRunningRequest > confirmRunning; // used for getReadVersion requests from client
RequestStream<ReplyPromise<Void>> waitFailure;
RequestStream< struct TLogRecoveryFinishedRequest > recoveryFinished;
RequestStream< struct TLogDisablePopRequest> disablePopRequest;
RequestStream< struct TLogEnablePopRequest> enablePopRequest;
RequestStream< struct TLogSnapRequest> snapRequest;
TLogInterface() {}
explicit TLogInterface(LocalityData locality) : uniqueID( deterministicRandom()->randomUniqueID() ), locality(locality) { sharedTLogID = uniqueID; }
@ -69,7 +73,8 @@ struct TLogInterface {
ASSERT(ar.isDeserializing || uniqueID != UID());
}
serializer(ar, uniqueID, sharedTLogID, locality, peekMessages, popMessages
, commit, lock, getQueuingMetrics, confirmRunning, waitFailure, recoveryFinished);
, commit, lock, getQueuingMetrics, confirmRunning, waitFailure, recoveryFinished
, disablePopRequest, enablePopRequest, snapRequest);
}
};
@ -231,6 +236,7 @@ struct TLogCommitRequest {
}
};
struct TLogQueuingMetricsReply {
constexpr static FileIdentifier file_identifier = 12206626;
double localTime;
@ -255,4 +261,53 @@ struct TLogQueuingMetricsRequest {
}
};
struct TLogDisablePopRequest {
constexpr static FileIdentifier file_identifier = 4022806;
Arena arena;
UID snapUID;
ReplyPromise<Void> reply;
Optional<UID> debugID;
TLogDisablePopRequest() = default;
TLogDisablePopRequest(const UID uid) : snapUID(uid) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, snapUID, reply, arena, debugID);
}
};
struct TLogEnablePopRequest {
constexpr static FileIdentifier file_identifier = 4022809;
Arena arena;
UID snapUID;
ReplyPromise<Void> reply;
Optional<UID> debugID;
TLogEnablePopRequest() = default;
TLogEnablePopRequest(const UID uid) : snapUID(uid) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, snapUID, reply, arena, debugID);
}
};
struct TLogSnapRequest {
constexpr static FileIdentifier file_identifier = 8184128;
ReplyPromise<Void> reply;
Arena arena;
StringRef snapPayload;
UID snapUID;
StringRef role;
TLogSnapRequest(StringRef snapPayload, UID snapUID, StringRef role) : snapPayload(snapPayload), snapUID(snapUID), role(role) {}
TLogSnapRequest() = default;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, reply, snapPayload, snapUID, role, arena);
}
};
#endif

View File

@ -1170,8 +1170,8 @@ std::deque<std::pair<Version, LengthPrefixedStringRef>> & getVersionMessages( Re
};
ACTOR Future<Void> tLogPopCore( TLogData* self, Tag inputTag, Version to, Reference<LogData> logData ) {
if (self->ignorePopRequest && inputTag.locality != tagLocalityTxs && inputTag != txsTag) {
TraceEvent("IgnoringPopRequest").detail("IgnorePopDeadline", self->ignorePopDeadline);
if (self->ignorePopRequest) {
TraceEvent(SevDebug, "IgnoringPopRequest").detail("IgnorePopDeadline", self->ignorePopDeadline);
if (self->toBePopped.find(inputTag) == self->toBePopped.end()
|| to > self->toBePopped[inputTag]) {
@ -1221,8 +1221,8 @@ ACTOR Future<Void> tLogPop( TLogData* self, TLogPopRequest req, Reference<LogDat
TraceEvent("EnableTLogPlayAllIgnoredPops");
// use toBePopped and issue all the pops
state std::map<Tag, Version>::iterator it;
state vector<Future<Void>> ignoredPops;
std::map<Tag, Version>::iterator it;
vector<Future<Void>> ignoredPops;
self->ignorePopRequest = false;
self->ignorePopUid = "";
self->ignorePopDeadline = 0.0;
@ -1853,7 +1853,7 @@ ACTOR Future<Void> tLogSnapHelper(TLogData* self,
ASSERT(!isExecOpInProgress(execUID));
if (!otherRoleExeced) {
setExecOpInProgress(execUID);
int tmpErr = wait(execHelper(execArg, self->dataFolder, "role=tlog"));
int tmpErr = wait(execHelper(execArg, self->dataFolder, "role=tlog", 1 /*version*/));
err = tmpErr;
clearExecOpInProgress(execUID);
}
@ -2174,6 +2174,83 @@ void getQueuingMetrics( TLogData* self, Reference<LogData> logData, TLogQueuingM
req.reply.send( reply );
}
ACTOR Future<Void>
tLogSnapCreate(TLogSnapRequest snapReq, TLogData* self, Reference<LogData> logData) {
if (self->ignorePopUid != snapReq.snapUID.toString()) {
snapReq.reply.sendError(operation_failed());
return Void();
}
ExecCmdValueString snapArg(snapReq.snapPayload);
try {
Standalone<StringRef> role = LiteralStringRef("role=").withSuffix(snapReq.role);
int err = wait(execHelper(&snapArg, self->dataFolder, role.toString(), 2 /* version */));
std::string uidStr = snapReq.snapUID.toString();
TraceEvent("ExecTraceTLog")
.detail("Uid", uidStr)
.detail("Status", err)
.detail("Role", snapReq.role)
.detail("Value", self->dataFolder)
.detail("ExecPayload", snapReq.snapPayload)
.detail("PersistentDataVersion", logData->persistentDataVersion)
.detail("PersistentDatadurableVersion", logData->persistentDataDurableVersion)
.detail("QueueCommittedVersion", logData->queueCommittedVersion.get())
.detail("Version", logData->version.get());
if (err != 0) {
throw operation_failed();
}
snapReq.reply.send(Void());
} catch (Error& e) {
TraceEvent("TLogExecHelperError").error(e, true /*includeCancelled */);
if (e.code() != error_code_operation_cancelled) {
snapReq.reply.sendError(e);
} else {
throw e;
}
}
return Void();
}
ACTOR Future<Void>
tLogEnablePopReq(TLogEnablePopRequest enablePopReq, TLogData* self, Reference<LogData> logData) {
if (self->ignorePopUid != enablePopReq.snapUID.toString()) {
TraceEvent(SevWarn, "TLogPopDisableEnableUidMismatch")
.detail("IgnorePopUid", self->ignorePopUid)
.detail("UidStr", enablePopReq.snapUID.toString());
enablePopReq.reply.sendError(operation_failed());
return Void();
}
TraceEvent("EnableTLogPlayAllIgnoredPops2");
// use toBePopped and issue all the pops
std::map<Tag, Version>::iterator it;
state vector<Future<Void>> ignoredPops;
self->ignorePopRequest = false;
self->ignorePopDeadline = 0.0;
self->ignorePopUid = "";
for (it = self->toBePopped.begin(); it != self->toBePopped.end(); it++) {
TraceEvent("PlayIgnoredPop")
.detail("Tag", it->first.toString())
.detail("Version", it->second);
ignoredPops.push_back(tLogPopCore(self, it->first, it->second, logData));
}
TraceEvent("TLogExecCmdPopEnable")
.detail("UidStr", enablePopReq.snapUID.toString())
.detail("IgnorePopUid", self->ignorePopUid)
.detail("IgnporePopRequest", self->ignorePopRequest)
.detail("IgnporePopDeadline", self->ignorePopDeadline)
.detail("PersistentDataVersion", logData->persistentDataVersion)
.detail("PersistentDatadurableVersion", logData->persistentDataDurableVersion)
.detail("QueueCommittedVersion", logData->queueCommittedVersion.get())
.detail("Version", logData->version.get());
wait(waitForAll(ignoredPops));
self->toBePopped.clear();
enablePopReq.reply.send(Void());
return Void();
}
ACTOR Future<Void> serveTLogInterface( TLogData* self, TLogInterface tli, Reference<LogData> logData, PromiseStream<Void> warningCollectorInput ) {
state Future<Void> dbInfoChange = Void();
@ -2235,6 +2312,30 @@ ACTOR Future<Void> serveTLogInterface( TLogData* self, TLogInterface tli, Refere
else
req.reply.sendError( tlog_stopped() );
}
when( TLogDisablePopRequest req = waitNext( tli.disablePopRequest.getFuture() ) ) {
if (self->ignorePopUid != "") {
TraceEvent(SevWarn, "TLogPopDisableonDisable")
.detail("IgnorePopUid", self->ignorePopUid)
.detail("UidStr", req.snapUID.toString())
.detail("PersistentDataVersion", logData->persistentDataVersion)
.detail("PersistentDatadurableVersion", logData->persistentDataDurableVersion)
.detail("QueueCommittedVersion", logData->queueCommittedVersion.get())
.detail("Version", logData->version.get());
req.reply.sendError(operation_failed());
} else {
//FIXME: As part of reverting snapshot V1, make ignorePopUid a UID instead of string
self->ignorePopRequest = true;
self->ignorePopUid = req.snapUID.toString();
self->ignorePopDeadline = g_network->now() + SERVER_KNOBS->TLOG_IGNORE_POP_AUTO_ENABLE_DELAY;
req.reply.send(Void());
}
}
when( TLogEnablePopRequest enablePopReq = waitNext( tli.enablePopRequest.getFuture() ) ) {
logData->addActor.send( tLogEnablePopReq( enablePopReq, self, logData) );
}
when( TLogSnapRequest snapReq = waitNext( tli.snapRequest.getFuture() ) ) {
logData->addActor.send( tLogSnapCreate( snapReq, self, logData) );
}
}
}

View File

@ -1302,6 +1302,10 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
knownCommittedVersion = std::max(knownCommittedVersion, results[i].knownCommittedVersion);
}
if (knownCommittedVersion > results[new_safe_range_begin].end) {
knownCommittedVersion = results[new_safe_range_begin].end;
}
TraceEvent("GetDurableResult", dbgid).detail("Required", requiredCount).detail("Present", results.size()).detail("ServerState", sServerState)
.detail("RecoveryVersion", ((safe_range_end > 0) && (safe_range_end-1 < results.size())) ? results[ safe_range_end-1 ].end : -1)
.detail("EndVersion", results[ new_safe_range_begin ].end).detail("SafeBegin", safe_range_begin).detail("SafeEnd", safe_range_end)
@ -1588,17 +1592,14 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
logSystem->logSystemType = prevState.logSystemType;
logSystem->rejoins = rejoins;
logSystem->lockResults = lockResults;
logSystem->recoverAt = minEnd;
if (knownCommittedVersion > minEnd) {
// FIXME: Remove the Sev40 once disk snapshot v2 feature is enabled, in all other
// code paths we should never be here.
TraceEvent(SevError, "KCVIsInvalid")
.detail("KnownCommittedVersion", knownCommittedVersion)
.detail("MinEnd", minEnd);
logSystem->knownCommittedVersion = minEnd;
} else {
logSystem->knownCommittedVersion = knownCommittedVersion;
knownCommittedVersion = minEnd;
}
logSystem->recoverAt = minEnd;
logSystem->knownCommittedVersion = knownCommittedVersion;
TraceEvent(SevDebug, "FinalRecoveryVersionInfo")
.detail("KCV", knownCommittedVersion)
.detail("MinEnd", minEnd);
logSystem->remoteLogsWrittenToCoreState = true;
logSystem->stopped = true;
logSystem->pseudoLocalities = prevState.pseudoLocalities;

View File

@ -61,6 +61,7 @@ struct WorkerInterface {
RequestStream< struct TraceBatchDumpRequest > traceBatchDumpRequest;
RequestStream< struct DiskStoreRequest > diskStoreRequest;
RequestStream<struct ExecuteRequest> execReq;
RequestStream<struct WorkerSnapRequest> workerSnapReq;
TesterInterface testerInterface;
@ -72,7 +73,7 @@ struct WorkerInterface {
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, clientInterface, locality, tLog, master, masterProxy, dataDistributor, ratekeeper, resolver, storage, logRouter, debugPing, coordinationPing, waitFailure, setMetricsRate, eventLogRequest, traceBatchDumpRequest, testerInterface, diskStoreRequest, execReq);
serializer(ar, clientInterface, locality, tLog, master, masterProxy, dataDistributor, ratekeeper, resolver, storage, logRouter, debugPing, coordinationPing, waitFailure, setMetricsRate, eventLogRequest, traceBatchDumpRequest, testerInterface, diskStoreRequest, execReq, workerSnapReq);
}
};
@ -258,6 +259,23 @@ struct ExecuteRequest {
}
};
struct WorkerSnapRequest {
constexpr static FileIdentifier file_identifier = 8194122;
ReplyPromise<Void> reply;
Arena arena;
StringRef snapPayload;
UID snapUID;
StringRef role;
WorkerSnapRequest(StringRef snapPayload, UID snapUID, StringRef role) : snapPayload(snapPayload), snapUID(snapUID), role(role) {}
WorkerSnapRequest() = default;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, reply, snapPayload, snapUID, role, arena);
}
};
struct LoadedReply {
constexpr static FileIdentifier file_identifier = 9956350;
Standalone<StringRef> payload;

View File

@ -877,6 +877,20 @@ std::pair<NetworkAddressList, NetworkAddressList> buildNetworkAddresses(const Cl
return std::make_pair(publicNetworkAddresses, listenNetworkAddresses);
}
// moves files from 'dirSrc' to 'dirToMove' if their name contains 'role'
void restoreRoleFilesHelper(std::string dirSrc, std::string dirToMove, std::string role) {
std::vector<std::string> returnFiles = platform::listFiles(dirSrc, "");
for (const auto & fileEntry: returnFiles) {
if (fileEntry != "fdb.cluster" && fileEntry.find(role) != std::string::npos) {
//rename files
TraceEvent("RenamingSnapFile")
.detail("Oldname", dirSrc + "/" + fileEntry)
.detail("Newname", dirToMove + "/" + fileEntry);
renameFile(dirSrc + "/" + fileEntry, dirToMove + "/" + fileEntry);
}
}
}
int main(int argc, char* argv[]) {
try {
platformInit();
@ -1706,6 +1720,7 @@ int main(int argc, char* argv[]) {
std::string absDataFolder = abspath(dataFolder);
ini.LoadFile(joinPath(absDataFolder, "restartInfo.ini").c_str());
int backupFailed = true;
int backupVersion = 1;
const char* isRestoringStr = ini.GetValue("RESTORE", "isRestoring", NULL);
if (isRestoringStr) {
isRestoring = atoi(isRestoringStr);
@ -1713,64 +1728,142 @@ int main(int argc, char* argv[]) {
if (isRestoring && backupFailedStr) {
backupFailed = atoi(backupFailedStr);
}
const char* backupVersionStr = ini.GetValue("RESTORE", "BackupVersion", NULL);
if (isRestoring && backupVersionStr) {
backupVersion = atoi(backupVersionStr);
}
}
if (isRestoring && !backupFailed) {
std::vector<std::string> returnList;
std::string ext = "";
returnList = platform::listDirectories(absDataFolder);
std::string snapStr = ini.GetValue("RESTORE", "RestoreSnapUID");
if (backupVersion == 1) {
std::vector<std::string> returnList;
std::string ext = "";
returnList = platform::listDirectories(absDataFolder);
std::string snapStr = ini.GetValue("RESTORE", "RestoreSnapUID");
TraceEvent("RestoringDataFolder").detail("DataFolder", absDataFolder);
TraceEvent("RestoreSnapUID").detail("UID", snapStr);
TraceEvent("RestoringDataFolder").detail("DataFolder", absDataFolder);
TraceEvent("RestoreSnapUID").detail("UID", snapStr);
// delete all files (except fdb.cluster) in non-snap directories
for (int i = 0; i < returnList.size(); i++) {
if (returnList[i] == "." || returnList[i] == "..") {
continue;
}
if (returnList[i].find(snapStr) != std::string::npos) {
continue;
}
// delete all files (except fdb.cluster) in non-snap directories
for (int i = 0; i < returnList.size(); i++) {
if (returnList[i] == "." || returnList[i] == "..") {
continue;
}
if (returnList[i].find(snapStr) != std::string::npos) {
continue;
}
std::string childf = absDataFolder + "/" + returnList[i];
std::vector<std::string> returnFiles = platform::listFiles(childf, ext);
for (int j = 0; j < returnFiles.size(); j++) {
if (returnFiles[j] != "fdb.cluster" && returnFiles[j] != "fitness") {
TraceEvent("DeletingNonSnapfiles")
.detail("FileBeingDeleted", childf + "/" + returnFiles[j]);
deleteFile(childf + "/" + returnFiles[j]);
std::string childf = absDataFolder + "/" + returnList[i];
std::vector<std::string> returnFiles = platform::listFiles(childf, ext);
for (int j = 0; j < returnFiles.size(); j++) {
if (returnFiles[j] != "fdb.cluster" && returnFiles[j] != "fitness") {
TraceEvent("DeletingNonSnapfiles")
.detail("FileBeingDeleted", childf + "/" + returnFiles[j]);
deleteFile(childf + "/" + returnFiles[j]);
}
}
}
}
// move the contents from snap folder to the original folder,
// delete snap folders
for (int i = 0; i < returnList.size(); i++) {
if (returnList[i] == "." || returnList[i] == "..") {
continue;
}
std::string dirSrc = absDataFolder + "/" + returnList[i];
// delete snap directories which are not part of restoreSnapUID
if (returnList[i].find(snapStr) == std::string::npos) {
if (returnList[i].find("snap") != std::string::npos) {
// move the contents from snap folder to the original folder,
// delete snap folders
for (int i = 0; i < returnList.size(); i++) {
if (returnList[i] == "." || returnList[i] == "..") {
continue;
}
std::string dirSrc = absDataFolder + "/" + returnList[i];
// delete snap directories which are not part of restoreSnapUID
if (returnList[i].find(snapStr) == std::string::npos) {
if (returnList[i].find("snap") != std::string::npos) {
platform::eraseDirectoryRecursive(dirSrc);
}
continue;
}
// remove empty/partial snap directories
std::vector<std::string> childrenList = platform::listFiles(dirSrc);
if (childrenList.size() == 0) {
TraceEvent("RemovingEmptySnapDirectory").detail("DirBeingDeleted", dirSrc);
platform::eraseDirectoryRecursive(dirSrc);
continue;
}
continue;
std::string origDir = returnList[i].substr(0, 32);
std::string dirToRemove = absDataFolder + "/" + origDir;
TraceEvent("DeletingOriginalNonSnapDirectory").detail("FileBeingDeleted", dirToRemove);
platform::eraseDirectoryRecursive(dirToRemove);
renameFile(dirSrc, dirToRemove);
TraceEvent("RenamingSnapToOriginalDirectory")
.detail("Oldname", dirSrc)
.detail("Newname", dirToRemove);
}
// remove empty/partial snap directories
std::vector<std::string> childrenList = platform::listFiles(dirSrc);
if (childrenList.size() == 0) {
TraceEvent("RemovingEmptySnapDirectory").detail("DirBeingDeleted", dirSrc);
platform::eraseDirectoryRecursive(dirSrc);
continue;
} else if (backupVersion == 2) {
std::vector<std::string> returnList;
std::string ext = "";
returnList = platform::listDirectories(absDataFolder);
std::string snapStr = ini.GetValue("RESTORE", "RestoreSnapUID");
TraceEvent("RestoringDataFolder").detail("DataFolder", absDataFolder);
TraceEvent("RestoreSnapUID").detail("UID", snapStr);
// delete all files (except fdb.cluster) in non-snap directories
for (const auto & dirEntry : returnList) {
if (dirEntry == "." || dirEntry == "..") {
continue;
}
if (dirEntry.find(snapStr) != std::string::npos) {
continue;
}
std::string childf = absDataFolder + "/" + dirEntry;
std::vector<std::string> returnFiles = platform::listFiles(childf, ext);
for (const auto & fileEntry : returnFiles) {
if (fileEntry != "fdb.cluster" && fileEntry != "fitness") {
TraceEvent("DeletingNonSnapfiles")
.detail("FileBeingDeleted", childf + "/" + fileEntry);
deleteFile(childf + "/" + fileEntry);
}
}
}
std::string origDir = returnList[i].substr(0, 32);
std::string dirToRemove = absDataFolder + "/" + origDir;
TraceEvent("DeletingOriginalNonSnapDirectory").detail("FileBeingDeleted", dirToRemove);
platform::eraseDirectoryRecursive(dirToRemove);
renameFile(dirSrc, dirToRemove);
TraceEvent("RenamingSnapToOriginalDirectory")
.detail("Oldname", dirSrc)
.detail("Newname", dirToRemove);
// cleanup unwanted and partial directories
for (const auto & dirEntry : returnList) {
if (dirEntry == "." || dirEntry == "..") {
continue;
}
std::string dirSrc = absDataFolder + "/" + dirEntry;
// delete snap directories which are not part of restoreSnapUID
if (dirEntry.find(snapStr) == std::string::npos) {
if (dirEntry.find("snap") != std::string::npos) {
platform::eraseDirectoryRecursive(dirSrc);
}
continue;
}
// remove empty/partial snap directories
std::vector<std::string> childrenList = platform::listFiles(dirSrc);
if (childrenList.size() == 0) {
TraceEvent("RemovingEmptySnapDirectory").detail("DirBeingDeleted", dirSrc);
platform::eraseDirectoryRecursive(dirSrc);
continue;
}
}
// move snapshotted files to appropriate locations
for (const auto & dirEntry : returnList) {
if (dirEntry == "." || dirEntry == "..") {
continue;
}
std::string dirSrc = absDataFolder + "/" + dirEntry;
std::string origDir = dirEntry.substr(0, 32);
std::string dirToMove = absDataFolder + "/" + origDir;
if ((dirEntry.find("snap") != std::string::npos) &&
(dirEntry.find("tlog") != std::string::npos)) {
// restore tlog files
restoreRoleFilesHelper(dirSrc, dirToMove, "log");
} else if ((dirEntry.find("snap") != std::string::npos) &&
(dirEntry.find("storage") != std::string::npos)) {
// restore storage files
restoreRoleFilesHelper(dirSrc, dirToMove, "storage");
} else if ((dirEntry.find("snap") != std::string::npos) &&
(dirEntry.find("coord") != std::string::npos)) {
// restore coordinator files
restoreRoleFilesHelper(dirSrc, dirToMove, "coordination");
}
}
}
}
}

View File

@ -1615,6 +1615,7 @@ bool changeDurableVersion( StorageServer* data, Version desiredDurableVersion )
Future<Void> checkFatalError = data->otherError.getFuture();
data->durableVersion.set( nextDurableVersion );
setDataDurableVersion(data->thisServerID, data->durableVersion.get());
if (checkFatalError.isReady()) checkFatalError.get();
//TraceEvent("ForgotVersionsBefore", data->thisServerID).detail("Version", nextDurableVersion);
@ -1960,7 +1961,7 @@ snapHelper(StorageServer* data, MutationRef m, Version ver)
if (!skip) {
setExecOpInProgress(execUID);
int err = wait(execHelper(&execArg, data->folder, "role=storage"));
int err = wait(execHelper(&execArg, data->folder, "role=storage", 1 /*version*/));
clearExecOpInProgress(execUID);
}
TraceEvent te = TraceEvent("ExecTraceStorage");
@ -2821,6 +2822,7 @@ ACTOR Future<Void> update( StorageServer* data, bool* pReceivedUpdate )
data->noRecentUpdates.set(false);
data->lastUpdate = now();
data->version.set( ver ); // Triggers replies to waiting gets for new version(s)
setDataVersion(data->thisServerID, data->version.get());
if (data->otherError.getFuture().isReady()) data->otherError.getFuture().get();
Version maxVersionsInMemory = SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS;

View File

@ -1024,6 +1024,7 @@ ACTOR Future<Void> runTests( Reference<AsyncVar<Optional<struct ClusterControlle
state double databasePingDelay = 1e9;
state ISimulator::BackupAgentType simBackupAgents = ISimulator::NoBackupAgents;
state ISimulator::BackupAgentType simDrAgents = ISimulator::NoBackupAgents;
state bool enableDD = false;
if (tests.empty()) useDB = true;
for( auto iter = tests.begin(); iter != tests.end(); ++iter ) {
if( iter->useDB ) useDB = true;
@ -1036,6 +1037,7 @@ ACTOR Future<Void> runTests( Reference<AsyncVar<Optional<struct ClusterControlle
if (iter->simDrAgents != ISimulator::NoBackupAgents) {
simDrAgents = iter->simDrAgents;
}
enableDD = enableDD || getOption(iter->options[0], LiteralStringRef("enableDD"), false);
}
if (g_network->isSimulated()) {
@ -1058,6 +1060,9 @@ ACTOR Future<Void> runTests( Reference<AsyncVar<Optional<struct ClusterControlle
if(useDB && startingConfiguration != StringRef()) {
try {
wait(timeoutError(changeConfiguration(cx, testers, startingConfiguration), 2000.0));
if (g_network->isSimulated() && enableDD) {
wait(success(setDDMode(cx, 1)));
}
}
catch(Error& e) {
TraceEvent(SevError, "TestFailure").error(e).detail("Reason", "Unable to set starting configuration");

View File

@ -655,6 +655,36 @@ void endRole(const Role &role, UID id, std::string reason, bool ok, Error e) {
}
}
ACTOR Future<Void> workerSnapCreate(WorkerSnapRequest snapReq, StringRef snapFolder) {
state ExecCmdValueString snapArg(snapReq.snapPayload);
try {
Standalone<StringRef> role = LiteralStringRef("role=").withSuffix(snapReq.role);
int err = wait(execHelper(&snapArg, snapFolder.toString(), role.toString(), 2 /* version */));
std::string uidStr = snapReq.snapUID.toString();
TraceEvent("ExecTraceWorker")
.detail("Uid", uidStr)
.detail("Status", err)
.detail("Role", snapReq.role)
.detail("Value", snapFolder)
.detail("ExecPayload", snapReq.snapPayload);
if (err != 0) {
throw operation_failed();
}
if (snapReq.role.toString() == "storage") {
printStorageVersionInfo();
}
snapReq.reply.send(Void());
} catch (Error& e) {
TraceEvent("ExecHelperError").error(e, true /*includeCancelled*/);
if (e.code() != error_code_operation_cancelled) {
snapReq.reply.sendError(e);
} else {
throw e;
}
}
return Void();
}
ACTOR Future<Void> monitorServerDBInfo( Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> ccInterface, Reference<ClusterConnectionFile> connFile, LocalityData locality, Reference<AsyncVar<ServerDBInfo>> dbInfo ) {
// Initially most of the serverDBInfo is not known, but we know our locality right away
ServerDBInfo localInfo;
@ -1201,7 +1231,7 @@ ACTOR Future<Void> workerServer(
when(state ExecuteRequest req = waitNext(interf.execReq.getFuture())) {
state ExecCmdValueString execArg(req.execPayload);
try {
int err = wait(execHelper(&execArg, coordFolder, "role=coordinator"));
int err = wait(execHelper(&execArg, coordFolder, "role=coordinator", 1 /*version*/));
StringRef uidStr = execArg.getBinaryArgValue(LiteralStringRef("uid"));
auto tokenStr = "ExecTrace/Coordinators/" + uidStr.toString();
auto te = TraceEvent("ExecTraceCoordinators");
@ -1217,6 +1247,13 @@ ACTOR Future<Void> workerServer(
req.reply.sendError(broken_promise());
}
}
when(state WorkerSnapRequest snapReq = waitNext(interf.workerSnapReq.getFuture())) {
Standalone<StringRef> snapFolder = StringRef(folder);
if (snapReq.role.toString() == "coord") {
snapFolder = coordFolder;
}
errorForwarders.add(workerSnapCreate(snapReq, snapFolder));
}
when( wait( errorForwarders.getResult() ) ) {}
when( wait( handleErrors ) ) {}
}

View File

@ -82,6 +82,7 @@ public: // variables
std::string restartInfoLocation; // file location to store the snap restore info
int maxRetryCntToRetrieveMessage; // number of retires to do trackLatest
bool skipCheck; // disable check if the exec fails
int snapVersion; // snapVersion to invoke
public: // ctor & dtor
SnapTestWorkload(WorkloadContext const& wcx)
@ -97,6 +98,7 @@ public: // ctor & dtor
getOption(options, LiteralStringRef("restartInfoLocation"), LiteralStringRef("simfdb/restartInfo.ini"))
.toString();
skipCheck = false;
snapVersion = getOption(options, LiteralStringRef("version"), 1);
}
public: // workload functions
@ -235,7 +237,7 @@ public: // workload functions
self->snapUID = deterministicRandom()->randomUniqueID();
try {
StringRef snapCmdRef = LiteralStringRef("/bin/snap_create.sh");
Future<Void> status = snapCreate(cx, snapCmdRef, self->snapUID);
Future<Void> status = snapCreate(cx, snapCmdRef, self->snapUID, self->snapVersion);
wait(status);
break;
} catch (Error& e) {
@ -250,6 +252,14 @@ public: // workload functions
break;
}
}
if (self->snapVersion == 2) {
++retry;
// snap v2 can fail for many reasons, so retry for 5 times and then fail it
if (retry > 5) {
snapFailed = true;
break;
}
}
}
}
CSimpleIni ini;
@ -258,6 +268,7 @@ public: // workload functions
std::string uidStr = self->snapUID.toString();
ini.SetValue("RESTORE", "RestoreSnapUID", uidStr.c_str());
ini.SetValue("RESTORE", "BackupFailed", format("%d", snapFailed).c_str());
ini.SetValue("RESTORE", "BackupVersion", format("%d", self->snapVersion).c_str());
ini.SaveFile(self->restartInfoLocation.c_str());
// write the snapUID to a file
TraceEvent("SnapshotCreateStatus").detail("Status", !snapFailed ? "Success" : "Failure");
@ -349,7 +360,7 @@ public: // workload functions
self->snapUID = deterministicRandom()->randomUniqueID();
try {
StringRef snapCmdRef = LiteralStringRef("/bin/snap_create1.sh");
Future<Void> status = snapCreate(cx, snapCmdRef, self->snapUID);
Future<Void> status = snapCreate(cx, snapCmdRef, self->snapUID, self->snapVersion);
wait(status);
break;
} catch (Error& e) {

View File

@ -198,6 +198,7 @@ Future<T> timeoutError( Future<T> what, double time, TaskPriority taskID = TaskP
}
}
ACTOR template <class T>
Future<T> delayed( Future<T> what, double time = 0.0, TaskPriority taskID = TaskPriority::DefaultDelay ) {
try {

View File

@ -148,6 +148,9 @@ add_fdb_test(
add_fdb_test(
TEST_FILES restarting/StorefrontTestRestart-1.txt
restarting/StorefrontTestRestart-2.txt)
add_fdb_test(
TEST_FILES restarting/from_6.2.0/SnapTestAttrition-1.txt
restarting/from_6.2.0/SnapTestAttrition-2.txt)
add_fdb_test(
TEST_FILES restarting/from_6.2.0/SnapTestSimpleRestart-1.txt
restarting/from_6.2.0/SnapTestSimpleRestart-2.txt)

View File

@ -12,6 +12,7 @@ testTitle=SnapCyclePre
maxSnapDelay=10.0
testID=1
clearAfterTest=false
version=2
testTitle=SnapCycleShutdown
;save and shutdown

View File

@ -1,8 +1,10 @@
testTitle=SnapCycleRestore
;Post snap restore test
runSetup=false
testName=Cycle
transactionsPerSecond=2500.0
nodeCount=2500
testDuration=10.0
expectedRate=0
runSetup=false
testName=Cycle
transactionsPerSecond=2500.0
nodeCount=2500
testDuration=10.0
expectedRate=0
buggify=off
enableDD=true

View File

@ -0,0 +1,46 @@
testTitle=SnapTestPre
;write 1000 Keys ending with even numbers
testName=SnapTest
numSnaps=1
maxSnapDelay=3.0
testID=0
clearAfterTest=false
testTitle=SnapTestTakeSnap
;Take snap and do read/write
testName=ReadWrite
testDuration=10.0
transactionsPerSecond=10000
writesPerTransactionA=0
readsPerTransactionA=10
writesPerTransactionB=10
readsPerTransactionB=1
alpha=0.5
nodeCount=100000
valueBytes=16
discardEdgeMeasurements=false
testName=SnapTest
numSnaps=1
maxSnapDelay=10.0
testID=1
clearAfterTest=false
version=2
testName=Attrition
testDuration=10.0
testTitle=SnapTestPost
;write 1000 Keys ending with odd numbers
testName=SnapTest
numSnaps=1
maxSnapDelay=25.0
testID=2
clearAfterTest=false
; save and shutdown
testTitle=SnapSimpleShutdown
testName=SaveAndKill
restartInfoLocation=simfdb/restartInfo.ini
testDuration=10.0
isRestoring=1

View File

@ -0,0 +1,9 @@
; verify all keys are even numbered
testTitle=SnapTestVerify
testName=SnapTest
numSnaps=1
maxSnapDelay=3.0
testID=3
restartInfoLocation=simfdb/restartInfo.ini
buggify=off
enableDD=true

View File

@ -25,6 +25,7 @@ testTitle=SnapTestTakeSnap
maxSnapDelay=10.0
testID=1
clearAfterTest=false
version=2
testTitle=SnapTestPost
;write 1000 Keys ending with odd numbers

View File

@ -4,3 +4,5 @@ testName=SnapTest
numSnaps=1
maxSnapDelay=3.0
testID=3
buggify=off
enableDD=true

View File

@ -13,6 +13,7 @@ testTitle=SnapSimpleTakeSnap
maxSnapDelay=5.0
testID=1
clearAfterTest=false
version=2
;write 1000 Keys ending with odd number
testTitle=SnapSimplePost

View File

@ -4,3 +4,5 @@ testName=SnapTest
numSnaps=1
maxSnapDelay=3.0
testID=3
buggify=off
enableDD=true