snap v2: DD changes - snapshot orchestration logic

This commit is contained in:
sramamoorthy 2019-06-19 11:12:24 -07:00 committed by Alex Miller
parent d0793f5ca2
commit ba6bccce73
2 changed files with 123 additions and 1 deletions

View File

@ -26,9 +26,12 @@
#include "fdbserver/Knobs.h"
#include <set>
#include <sstream>
#include "fdbserver/FDBExecHelper.actor.h"
#include "fdbserver/WaitFailure.h"
#include "fdbserver/ServerDBInfo.h"
#include "fdbserver/TLogInterface.h"
#include "fdbserver/IKeyValueStore.h"
#include "fdbserver/QuietDatabase.h"
#include "fdbclient/ManagementAPI.actor.h"
#include "fdbrpc/Replication.h"
#include "flow/UnitTest.h"
@ -3997,9 +4000,106 @@ static std::set<int> const& normalDataDistributorErrors() {
return s;
}
// Orchestrates one snapshot request on behalf of the data distributor.
//
// The steps run strictly in order; every remote call is wrapped in
// timeoutError() with `snapTimeout`:
//   1. disable tlog pops on all local tlogs
//   2. snapshot the local storage workers
//   3. snapshot the local tlogs
//   4. re-enable tlog pops on all local tlogs
//   5. snapshot the coordinators
//
// On success Void() is sent on snapReq.reply. On any error the error is traced
// and forwarded on snapReq.reply, except that operation_cancelled is reported
// to the requester as broken_promise. The actor itself always returns Void().
ACTOR Future<Void> ddSnapCreate(DistributorSnapRequest snapReq, Reference<AsyncVar<struct ServerDBInfo>> db ) {
	state Database cx = openDBOnServer(db, TaskDefaultEndpoint, true, true);
	// Simulation uses a fixed 10s timeout; otherwise the timeout comes from the server knob.
	state double snapTimeout = g_network->isSimulated() ? 10.0 : SERVER_KNOBS->SNAP_CREATE_MAX_TIMEOUT;
	TraceEvent("SnapDataDistributor.SnapReqEnter")
		.detail("SnapPayload", snapReq.snapPayload)
		.detail("SnapUID", snapReq.snapUID);
	try {
		// disable tlog pop on local tlog nodes
		// NOTE: `tlogs` is deliberately not a state variable; it is re-read from
		// `db` after each wait() below instead of being carried across it.
		std::vector<TLogInterface> tlogs = db->get().logSystemConfig.allLocalLogs();
		state std::vector<Future<Void>> disablePops;
		for (const auto & tlog : tlogs) {
			disablePops.push_back(
				timeoutError(tlog.disablePopRequest.getReply(TLogDisablePopRequest(snapReq.snapUID)), snapTimeout)
			);
		}
		wait(waitForAll(disablePops));

		TraceEvent("SnapDataDistributor.AfterDisableTLogPop")
			.detail("SnapPayload", snapReq.snapPayload)
			.detail("SnapUID", snapReq.snapUID);

		// snap local storage nodes
		std::vector<WorkerInterface> storageWorkers = wait(timeoutError(getStorageWorkers(cx, db, true /* localOnly */), snapTimeout));
		TraceEvent("SnapDataDistributor.GotStorageWorkers")
			.detail("SnapPayload", snapReq.snapPayload)
			.detail("SnapUID", snapReq.snapUID);
		state std::vector<Future<Void>> storageSnapReqs;
		for (const auto & worker : storageWorkers) {
			storageSnapReqs.push_back(
				timeoutError(worker.workerSnapReq.getReply(WorkerSnapRequest(snapReq.snapPayload, snapReq.snapUID, LiteralStringRef("storage"))),
				             snapTimeout)
			);
		}
		wait(waitForAll(storageSnapReqs));

		TraceEvent("SnapDataDistributor.AfterSnapStorage")
			.detail("SnapPayload", snapReq.snapPayload)
			.detail("SnapUID", snapReq.snapUID);

		// snap local tlog nodes
		std::vector<TLogInterface> tlogs = db->get().logSystemConfig.allLocalLogs();
		state std::vector<Future<Void>> tLogSnapReqs;
		for (const auto & tlog : tlogs) {
			tLogSnapReqs.push_back(
				timeoutError(tlog.snapRequest.getReply(TLogSnapRequest(snapReq.snapPayload, snapReq.snapUID, LiteralStringRef("tlog"))),
				             snapTimeout)
			);
		}
		wait(waitForAll(tLogSnapReqs));

		TraceEvent("SnapDataDistributor.AfterTLogStorage")
			.detail("SnapPayload", snapReq.snapPayload)
			.detail("SnapUID", snapReq.snapUID);

		// enable tlog pop on local tlog nodes
		std::vector<TLogInterface> tlogs = db->get().logSystemConfig.allLocalLogs();
		state std::vector<Future<Void>> enablePops;
		for (const auto & tlog : tlogs) {
			// These futures must be collected in enablePops (not disablePops),
			// otherwise the wait below completes immediately on an empty vector
			// and enable-pop failures/timeouts are never observed.
			enablePops.push_back(
				timeoutError(tlog.enablePopRequest.getReply(TLogEnablePopRequest(snapReq.snapUID)), snapTimeout)
			);
		}
		wait(waitForAll(enablePops));

		TraceEvent("SnapDataDistributor.AfterEnableTLogPops")
			.detail("SnapPayload", snapReq.snapPayload)
			.detail("SnapUID", snapReq.snapUID);

		// snap the coordinators
		std::vector<WorkerInterface> coordWorkers = wait(timeoutError(getCoordWorkers(cx, db), snapTimeout));
		TraceEvent("SnapDataDistributor.GotCoordWorkers")
			.detail("SnapPayload", snapReq.snapPayload)
			.detail("SnapUID", snapReq.snapUID);
		state std::vector<Future<Void>> coordSnapReqs;
		for (const auto & worker : coordWorkers) {
			coordSnapReqs.push_back(
				timeoutError(worker.workerSnapReq.getReply(WorkerSnapRequest(snapReq.snapPayload, snapReq.snapUID, LiteralStringRef("coord"))),
				             snapTimeout)
			);
		}
		wait(waitForAll(coordSnapReqs));

		TraceEvent("SnapDataDistributor.AfterSnapCoords")
			.detail("SnapPayload", snapReq.snapPayload)
			.detail("SnapUID", snapReq.snapUID);
		snapReq.reply.send(Void());
	} catch (Error& e) {
		TraceEvent("SnapDataDistributor.SnapReqExit")
			.detail("SnapPayload", snapReq.snapPayload)
			.detail("SnapUID", snapReq.snapUID)
			.error(e, true /*includeCancelled */);
		// Cancellation of this actor is surfaced to the requester as
		// broken_promise; every other error is forwarded unchanged.
		if (e.code() == error_code_operation_cancelled) {
			snapReq.reply.sendError(broken_promise());
		} else {
			snapReq.reply.sendError(e);
		}
	}
	return Void();
}
ACTOR Future<Void> dataDistributor(DataDistributorInterface di, Reference<AsyncVar<struct ServerDBInfo>> db ) {
state Reference<DataDistributorData> self( new DataDistributorData(db, di.id()) );
state Future<Void> collection = actorCollection( self->addActor.getFuture() );
state Database cx = openDBOnServer(db, TaskDefaultEndpoint, true, true);
state ActorCollection actors(false);
try {
TraceEvent("DataDistributorRunning", di.id());
@ -4016,6 +4116,9 @@ ACTOR Future<Void> dataDistributor(DataDistributorInterface di, Reference<AsyncV
TraceEvent("DataDistributorHalted", di.id()).detail("ReqID", req.requesterID);
break;
}
when(state DistributorSnapRequest snapReq = waitNext(di.distributorSnapReq.getFuture())) {
actors.add(ddSnapCreate(snapReq, db));
}
}
}
catch ( Error &err ) {

View File

@ -29,6 +29,7 @@ struct DataDistributorInterface {
RequestStream<ReplyPromise<Void>> waitFailure;
RequestStream<struct HaltDataDistributorRequest> haltDataDistributor;
struct LocalityData locality;
RequestStream<struct DistributorSnapRequest> distributorSnapReq;
DataDistributorInterface() {}
explicit DataDistributorInterface(const struct LocalityData& l) : locality(l) {}
@ -45,7 +46,7 @@ struct DataDistributorInterface {
template <class Archive>
void serialize(Archive& ar) {
serializer(ar, waitFailure, haltDataDistributor, locality);
serializer(ar, waitFailure, haltDataDistributor, locality, distributorSnapReq);
}
};
@ -63,4 +64,22 @@ struct HaltDataDistributorRequest {
}
};
struct DistributorSnapRequest
{
constexpr static FileIdentifier file_identifier = 22204900;
Arena arena;
StringRef snapPayload;
UID snapUID;
ReplyPromise<Void> reply;
Optional<UID> debugID;
explicit DistributorSnapRequest(Optional<UID> const& debugID = Optional<UID>()) : debugID(debugID) {}
explicit DistributorSnapRequest(StringRef snap, UID snapUID, Optional<UID> debugID = Optional<UID>()) : snapPayload(snap), snapUID(snapUID), debugID(debugID) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, snapPayload, snapUID, reply, arena, debugID);
}
};
#endif //FDBSERVER_DATADISTRIBUTORINTERFACE_H