snap v2: worker and other helper related changes

This commit is contained in:
sramamoorthy 2019-06-19 11:20:44 -07:00 committed by Alex Miller
parent f4e257e464
commit 8f1f0c0435
7 changed files with 175 additions and 12 deletions

View File

@ -84,13 +84,13 @@ void ExecCmdValueString::dbgPrint() {
}
#if defined(_WIN32) || defined(__APPLE__)
ACTOR Future<int> spawnProcess(std::string binPath, std::vector<std::string> paramList, double maxWaitTime, bool isSync)
ACTOR Future<int> spawnProcess(std::string binPath, std::vector<std::string> paramList, double maxWaitTime, bool isSync, double maxSimDelayTime)
{
wait(delay(0.0));
return 0;
}
#else
ACTOR Future<int> spawnProcess(std::string binPath, std::vector<std::string> paramList, double maxWaitTime, bool isSync)
ACTOR Future<int> spawnProcess(std::string binPath, std::vector<std::string> paramList, double maxWaitTime, bool isSync, double maxSimDelayTime)
{
state std::string argsString;
for (auto const& elem : paramList) {
@ -106,7 +106,15 @@ ACTOR Future<int> spawnProcess(std::string binPath, std::vector<std::string> par
// for async calls in simulator, always delay by a fixed time, otherwise
// the predictability of the simulator breaks
if (!isSync && g_network->isSimulated()) {
wait(delay(deterministicRandom()->random01()));
double snapDelay = 0.0;
if (maxSimDelayTime > 1.0) {
int delayTime = (int) round(maxSimDelayTime - 1);
snapDelay += deterministicRandom()->randomInt(0, delayTime);
}
snapDelay += deterministicRandom()->random01();
TraceEvent("SnapDelaySpawnProcess")
.detail("SnapDelay", snapDelay);
wait(delay(snapDelay));
}
if (!isSync && !g_network->isSimulated()) {
@ -147,7 +155,7 @@ ACTOR Future<int> spawnProcess(std::string binPath, std::vector<std::string> par
}
#endif
ACTOR Future<int> execHelper(ExecCmdValueString* execArg, std::string folder, std::string role) {
ACTOR Future<int> execHelper(ExecCmdValueString* execArg, std::string folder, std::string role, int snapVersion) {
state StringRef uidStr = execArg->getBinaryArgValue(LiteralStringRef("uid"));
state int err = 0;
state Future<int> cmdErr;
@ -169,17 +177,24 @@ ACTOR Future<int> execHelper(ExecCmdValueString* execArg, std::string folder, st
versionString += version;
paramList.push_back(versionString);
paramList.push_back(role);
cmdErr = spawnProcess(snapBin.toString(), paramList, 3.0, false /*isSync*/);
cmdErr = spawnProcess(snapBin.toString(), paramList, 3.0, false /*isSync*/, 0);
wait(success(cmdErr));
err = cmdErr.get();
} else {
// copy the files
state std::string folderFrom = folder + "/.";
state std::string folderTo = folder + "-snap-" + uidStr.toString();
double maxSimDelayTime = 1.0;
if (snapVersion == 1) {
folderTo = folder + "-snap-" + uidStr.toString();
} else {
folderTo = folder + "-snap-" + uidStr.toString() + "-" + role;
maxSimDelayTime = 10.0;
}
std::vector<std::string> paramList;
std::string mkdirBin = "/bin/mkdir";
paramList.push_back(folderTo);
cmdErr = spawnProcess(mkdirBin, paramList, 3.0, false /*isSync*/);
cmdErr = spawnProcess(mkdirBin, paramList, 3.0, false /*isSync*/, maxSimDelayTime);
wait(success(cmdErr));
err = cmdErr.get();
if (err == 0) {
@ -188,7 +203,7 @@ ACTOR Future<int> execHelper(ExecCmdValueString* execArg, std::string folder, st
paramList.push_back("-a");
paramList.push_back(folderFrom);
paramList.push_back(folderTo);
cmdErr = spawnProcess(cpBin, paramList, 3.0, true /*isSync*/);
cmdErr = spawnProcess(cpBin, paramList, 3.0, true /*isSync*/, 1.0);
wait(success(cmdErr));
err = cmdErr.get();
}
@ -233,3 +248,32 @@ bool isTLogInSameNode() {
NetworkAddress addr = g_network->getLocalAddress();
return tLogsAlive[addr].size() >= 1;
}
struct StorageVersionInfo {
Version version;
Version durableVersion;
};
typedef std::map<UID, StorageVersionInfo> UidStorageVersionInfo;
std::map<NetworkAddress, UidStorageVersionInfo> workerStorageVersionInfo;
void setDataVersion(UID uid, Version version) {
NetworkAddress addr = g_network->getLocalAddress();
workerStorageVersionInfo[addr][uid].version = version;
}
void setDataDurableVersion(UID uid, Version durableVersion) {
NetworkAddress addr = g_network->getLocalAddress();
workerStorageVersionInfo[addr][uid].durableVersion = durableVersion;
}
void printStorageVersionInfo() {
NetworkAddress addr = g_network->getLocalAddress();
for (auto itr = workerStorageVersionInfo[addr].begin(); itr != workerStorageVersionInfo[addr].end(); itr++) {
TraceEvent("StorageVersionInfo")
.detail("UID", itr->first)
.detail("Version", itr->second.version)
.detail("DurableVersion", itr->second.durableVersion);
}
}

View File

@ -11,6 +11,7 @@
#include "flow/Arena.h"
#include "flow/flow.h"
#include "flow/actorcompiler.h"
#include "fdbclient/FDBTypes.h"
// execute/snapshot command takes two arguments: <param1> <param2>
// param1 - represents the command type/name
@ -47,10 +48,11 @@ private: // data
// spawns a process pointed by `binPath` and the arguments provided at `paramList`,
// if the process spawned takes more than `maxWaitTime` then it will be killed
// if isSync is set to true then the process will be synchronously executed
ACTOR Future<int> spawnProcess(std::string binPath, std::vector<std::string> paramList, double maxWaitTime, bool isSync);
// if async and in simulator delay spawning the process to max of maxSimDelayTime
ACTOR Future<int> spawnProcess(std::string binPath, std::vector<std::string> paramList, double maxWaitTime, bool isSync, double maxSimDelayTime);
// helper to run all the work related to running the exec command
ACTOR Future<int> execHelper(ExecCmdValueString* execArg, std::string folder, std::string role);
ACTOR Future<int> execHelper(ExecCmdValueString* execArg, std::string folder, std::string role, int version);
// returns true if the execUID op is in progress
bool isExecOpInProgress(UID execUID);
@ -67,5 +69,12 @@ void unregisterTLog(UID uid);
// checks if there is any non-stopped TLog instance
bool isTLogInSameNode();
// set the data version for the specified storage server UID
void setDataVersion(UID uid, Version version);
// set the data durable version for the specified storage server UID
void setDataDurableVersion(UID uid, Version version);
// print the version info all the storages servers on this node
void printStorageVersionInfo();
#include "flow/unactorcompiler.h"
#endif

View File

@ -133,6 +133,40 @@ int64_t getPoppedVersionLag( const TraceEventFields& md ) {
return persistentDataDurableVersion - queuePoppedVersion;
}
ACTOR Future<vector<WorkerInterface>> getCoordWorkers( Database cx, Reference<AsyncVar<ServerDBInfo>> dbInfo ) {
state Future<std::vector<WorkerDetails>> workersFuture = getWorkers(dbInfo);
state std::vector<WorkerDetails> workers = wait(workersFuture);
Optional<Value> coordinators = wait(
runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr) -> Future<Optional<Value>>
{
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
return tr->get(coordinatorsKey);
}));
if (!coordinators.present()) {
throw operation_failed();
}
state std::vector<NetworkAddress> coordinatorsAddr =
ClusterConnectionString(coordinators.get().toString()).coordinators();
state std::set<NetworkAddress> coordinatorsAddrSet;
for (const auto & addr : coordinatorsAddr) {
TraceEvent(SevDebug, "CoordinatorAddress").detail("Addr", addr);
coordinatorsAddrSet.insert(addr);
}
vector<WorkerInterface> result;
for(const auto & worker : workers) {
NetworkAddress primary = worker.interf.address();
Optional<NetworkAddress> secondary = worker.interf.tLog.getEndpoint().addresses.secondaryAddress;
if (coordinatorsAddrSet.find(primary) != coordinatorsAddrSet.end()
|| (secondary.present() && (coordinatorsAddrSet.find(secondary.get()) != coordinatorsAddrSet.end()))) {
result.push_back(worker.interf);
}
}
return result;
}
// This is not robust in the face of a TLog failure
ACTOR Future<std::pair<int64_t,int64_t>> getTLogQueueInfo( Database cx, Reference<AsyncVar<ServerDBInfo>> dbInfo ) {
TraceEvent("MaxTLogQueueSize").detail("Stage", "ContactingLogs");
@ -195,6 +229,30 @@ ACTOR Future<vector<StorageServerInterface>> getStorageServers( Database cx, boo
}
}
ACTOR Future<vector<WorkerInterface>> getStorageWorkers( Database cx, Reference<AsyncVar<ServerDBInfo>> dbInfo, bool localOnly ) {
Future<std::vector<StorageServerInterface>> serversFuture = getStorageServers(cx);
state Future<std::vector<WorkerDetails>> workersFuture = getWorkers(dbInfo);
state std::vector<StorageServerInterface> servers = wait(serversFuture);
state std::vector<WorkerDetails> workers = wait(workersFuture);
vector<WorkerInterface> result;
std::map<NetworkAddress, WorkerInterface> workersMap;
for(const auto & worker : workers) {
workersMap[worker.interf.address()] = worker.interf;
}
// FIXME: impelement localOnly
for (const auto & server : servers) {
auto itr = workersMap.find(server.address());
if(itr == workersMap.end()) {
TraceEvent(SevWarn, "GetStorageWorkers").detail("Reason", "Could not find worker for storage server").detail("SS", server.id());
continue;
}
result.push_back(itr->second);
}
return result;
}
//Gets the maximum size of all the storage server queues
ACTOR Future<int64_t> getMaxStorageServerQueueSize( Database cx, Reference<AsyncVar<ServerDBInfo>> dbInfo ) {
TraceEvent("MaxStorageServerQueueSize").detail("Stage", "ContactingStorageServers");

View File

@ -38,6 +38,8 @@ Future<vector<StorageServerInterface>> getStorageServers( Database const& cx, bo
Future<vector<WorkerDetails>> getWorkers( Reference<AsyncVar<ServerDBInfo>> const& dbInfo, int const& flags = 0 );
Future<WorkerInterface> getMasterWorker( Database const& cx, Reference<AsyncVar<ServerDBInfo>> const& dbInfo );
Future<Void> repairDeadDatacenter(Database const& cx, Reference<AsyncVar<ServerDBInfo>> const& dbInfo, std::string const& context);
Future<vector<WorkerInterface>> getStorageWorkers( Database const& cx, Reference<AsyncVar<ServerDBInfo>> const& dbInfo, bool const& localOnly );
Future<vector<WorkerInterface>> getCoordWorkers( Database const& cx, Reference<AsyncVar<ServerDBInfo>> const& dbInfo );
#include "flow/unactorcompiler.h"
#endif

View File

@ -61,6 +61,7 @@ struct WorkerInterface {
RequestStream< struct TraceBatchDumpRequest > traceBatchDumpRequest;
RequestStream< struct DiskStoreRequest > diskStoreRequest;
RequestStream<struct ExecuteRequest> execReq;
RequestStream<struct WorkerSnapRequest> workerSnapReq;
TesterInterface testerInterface;
@ -72,7 +73,7 @@ struct WorkerInterface {
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, clientInterface, locality, tLog, master, masterProxy, dataDistributor, ratekeeper, resolver, storage, logRouter, debugPing, coordinationPing, waitFailure, setMetricsRate, eventLogRequest, traceBatchDumpRequest, testerInterface, diskStoreRequest, execReq);
serializer(ar, clientInterface, locality, tLog, master, masterProxy, dataDistributor, ratekeeper, resolver, storage, logRouter, debugPing, coordinationPing, waitFailure, setMetricsRate, eventLogRequest, traceBatchDumpRequest, testerInterface, diskStoreRequest, execReq, workerSnapReq);
}
};
@ -258,6 +259,23 @@ struct ExecuteRequest {
}
};
struct WorkerSnapRequest {
constexpr static FileIdentifier file_identifier = 8194122;
ReplyPromise<Void> reply;
Arena arena;
StringRef snapPayload;
UID snapUID;
StringRef role;
WorkerSnapRequest(StringRef snapPayload, UID snapUID, StringRef role) : snapPayload(snapPayload), snapUID(snapUID), role(role) {}
WorkerSnapRequest() : snapPayload() {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, reply, snapPayload, snapUID, role, arena);
}
};
struct LoadedReply {
constexpr static FileIdentifier file_identifier = 9956350;
Standalone<StringRef> payload;

View File

@ -1615,6 +1615,7 @@ bool changeDurableVersion( StorageServer* data, Version desiredDurableVersion )
Future<Void> checkFatalError = data->otherError.getFuture();
data->durableVersion.set( nextDurableVersion );
setDataDurableVersion(data->thisServerID, data->durableVersion.get());
if (checkFatalError.isReady()) checkFatalError.get();
//TraceEvent("ForgotVersionsBefore", data->thisServerID).detail("Version", nextDurableVersion);
@ -1960,7 +1961,7 @@ snapHelper(StorageServer* data, MutationRef m, Version ver)
if (!skip) {
setExecOpInProgress(execUID);
int err = wait(execHelper(&execArg, data->folder, "role=storage"));
int err = wait(execHelper(&execArg, data->folder, "role=storage", 1 /*version*/));
clearExecOpInProgress(execUID);
}
TraceEvent te = TraceEvent("ExecTraceStorage");
@ -2821,6 +2822,7 @@ ACTOR Future<Void> update( StorageServer* data, bool* pReceivedUpdate )
data->noRecentUpdates.set(false);
data->lastUpdate = now();
data->version.set( ver ); // Triggers replies to waiting gets for new version(s)
setDataVersion(data->thisServerID, data->version.get());
if (data->otherError.getFuture().isReady()) data->otherError.getFuture().get();
Version maxVersionsInMemory = SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS;

View File

@ -655,6 +655,29 @@ void endRole(const Role &role, UID id, std::string reason, bool ok, Error e) {
}
}
ACTOR Future<Void> workerSnapCreate(WorkerSnapRequest snapReq, StringRef snapFolder) {
state ExecCmdValueString snapArg(snapReq.snapPayload);
try {
Standalone<StringRef> role = LiteralStringRef("role=").withSuffix(snapReq.role);
int err = wait(execHelper(&snapArg, snapFolder.toString(), role.toString(), 2 /* version */));
std::string uidStr = snapReq.snapUID.toString();
TraceEvent("ExecTraceWorker")
.detail("Uid", uidStr)
.detail("Status", err)
.detail("Role", snapReq.role)
.detail("Value", snapFolder)
.detail("ExecPayload", snapReq.snapPayload);
if (snapReq.role.toString() == "storage") {
printStorageVersionInfo();
}
snapReq.reply.send(Void());
} catch (Error& e) {
TraceEvent("ExecHelperError").error(e);
snapReq.reply.sendError(e);
}
return Void();
}
ACTOR Future<Void> monitorServerDBInfo( Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> ccInterface, Reference<ClusterConnectionFile> connFile, LocalityData locality, Reference<AsyncVar<ServerDBInfo>> dbInfo ) {
// Initially most of the serverDBInfo is not known, but we know our locality right away
ServerDBInfo localInfo;
@ -1201,7 +1224,7 @@ ACTOR Future<Void> workerServer(
when(state ExecuteRequest req = waitNext(interf.execReq.getFuture())) {
state ExecCmdValueString execArg(req.execPayload);
try {
int err = wait(execHelper(&execArg, coordFolder, "role=coordinator"));
int err = wait(execHelper(&execArg, coordFolder, "role=coordinator", 1 /*version*/));
StringRef uidStr = execArg.getBinaryArgValue(LiteralStringRef("uid"));
auto tokenStr = "ExecTrace/Coordinators/" + uidStr.toString();
auto te = TraceEvent("ExecTraceCoordinators");
@ -1217,6 +1240,13 @@ ACTOR Future<Void> workerServer(
req.reply.sendError(broken_promise());
}
}
when(state WorkerSnapRequest snapReq = waitNext(interf.workerSnapReq.getFuture())) {
Standalone<StringRef> snapFolder = StringRef(folder);
if (snapReq.role.toString() == "coord") {
snapFolder = coordFolder;
}
errorForwarders.add(workerSnapCreate(snapReq, snapFolder));
}
when( wait( errorForwarders.getResult() ) ) {}
when( wait( handleErrors ) ) {}
}