snap v2: master proxy related changes
This commit is contained in:
parent
209448807d
commit
d0793f5ca2
|
@ -49,9 +49,9 @@ struct MasterProxyInterface {
|
|||
|
||||
RequestStream< struct GetRawCommittedVersionRequest > getRawCommittedVersion;
|
||||
RequestStream< struct TxnStateRequest > txnState;
|
||||
RequestStream<struct ExecRequest> execReq;
|
||||
|
||||
RequestStream< struct GetHealthMetricsRequest > getHealthMetrics;
|
||||
RequestStream<struct ExecRequest> execReq;
|
||||
RequestStream<struct ProxySnapRequest> proxySnapReq;
|
||||
|
||||
UID id() const { return commit.getEndpoint().token; }
|
||||
std::string toString() const { return id().shortString(); }
|
||||
|
@ -63,7 +63,7 @@ struct MasterProxyInterface {
|
|||
void serialize(Archive& ar) {
|
||||
serializer(ar, locality, provisional, commit, getConsistentReadVersion, getKeyServersLocations,
|
||||
waitFailure, getStorageServerRejoinInfo, getRawCommittedVersion,
|
||||
txnState, getHealthMetrics, execReq);
|
||||
txnState, getHealthMetrics, execReq, proxySnapReq);
|
||||
}
|
||||
|
||||
void initEndpoints() {
|
||||
|
@ -350,4 +350,22 @@ struct ExecRequest
|
|||
}
|
||||
};
|
||||
|
||||
struct ProxySnapRequest
|
||||
{
|
||||
constexpr static FileIdentifier file_identifier = 22204900;
|
||||
Arena arena;
|
||||
StringRef snapPayload;
|
||||
UID snapUID;
|
||||
ReplyPromise<Void> reply;
|
||||
Optional<UID> debugID;
|
||||
|
||||
explicit ProxySnapRequest(Optional<UID> const& debugID = Optional<UID>()) : debugID(debugID) {}
|
||||
explicit ProxySnapRequest(StringRef snap, UID snapUID, Optional<UID> debugID = Optional<UID>()) : snapPayload(snap), snapUID(snapUID), debugID(debugID) {}
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, snapPayload, snapUID, reply, arena, debugID);
|
||||
}
|
||||
};
|
||||
|
||||
#endif
|
||||
|
|
|
@ -30,6 +30,7 @@
|
|||
#include "fdbrpc/sim_validation.h"
|
||||
#include "fdbserver/ApplyMetadataMutation.h"
|
||||
#include "fdbserver/ConflictSet.h"
|
||||
#include "fdbserver/DataDistributorInterface.h"
|
||||
#include "fdbserver/FDBExecHelper.actor.h"
|
||||
#include "fdbserver/IKeyValueStore.h"
|
||||
#include "fdbserver/Knobs.h"
|
||||
|
@ -1523,6 +1524,87 @@ ACTOR Future<Void> monitorRemoteCommitted(ProxyCommitData* self) {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
ACTOR Future<Void>
|
||||
proxySnapCreate(ProxySnapRequest snapReq, ProxyCommitData* commitData)
|
||||
{
|
||||
TraceEvent("SnapMasterProxy.SnapReqEnter")
|
||||
.detail("SnapPayload", snapReq.snapPayload)
|
||||
.detail("SnapUID", snapReq.snapUID);
|
||||
try {
|
||||
// whitelist check
|
||||
state ExecCmdValueString execArg(snapReq.snapPayload);
|
||||
state StringRef binPath = execArg.getBinaryPath();
|
||||
if (!isWhitelisted(commitData->whitelistedBinPathVec, binPath)) {
|
||||
TraceEvent("SnapMasterProxy.WhiteListCheckFailed")
|
||||
.detail("SnapPayload", snapReq.snapPayload)
|
||||
.detail("SnapUID", snapReq.snapUID);
|
||||
throw transaction_not_permitted();
|
||||
}
|
||||
// db fully recovered check
|
||||
if (commitData->db->get().recoveryState != RecoveryState::FULLY_RECOVERED) {
|
||||
// Cluster is not fully recovered and needs TLogs
|
||||
// from previous generation for full recovery.
|
||||
// Currently, snapshot of old tlog generation is not
|
||||
// supported and hence failing the snapshot request until
|
||||
// cluster is fully_recovered.
|
||||
TraceEvent("SnapMasterProxy.ClusterNotFullyRecovered")
|
||||
.detail("SnapPayload", snapReq.snapPayload)
|
||||
.detail("SnapUID", snapReq.snapUID);
|
||||
throw cluster_not_fully_recovered();
|
||||
}
|
||||
auto result =
|
||||
commitData->txnStateStore->readValue(LiteralStringRef("log_anti_quorum").withPrefix(configKeysPrefix)).get();
|
||||
int logAntiQuorum = 0;
|
||||
if (result.present()) {
|
||||
logAntiQuorum = atoi(result.get().toString().c_str());
|
||||
}
|
||||
// FIXME: logAntiQuorum not supported, remove it later,
|
||||
// snap feature should just work fine version 2
|
||||
if (logAntiQuorum > 0) {
|
||||
TraceEvent("SnapMasterProxy.LogAnitQuorumNotSupported")
|
||||
.detail("SnapPayload", snapReq.snapPayload)
|
||||
.detail("SnapUID", snapReq.snapUID);
|
||||
throw txn_exec_log_anti_quorum();
|
||||
}
|
||||
|
||||
// send a snap request to DD
|
||||
if (!commitData->db->get().distributor.present()) {
|
||||
TraceEvent(SevWarnAlways, "DataDistributorNotPresent");
|
||||
throw operation_failed();
|
||||
}
|
||||
Future<Void> ddSnapReq = brokenPromiseToNever(
|
||||
commitData->db->get().distributor.get().distributorSnapReq.getReply(DistributorSnapRequest(snapReq.snapPayload, snapReq.snapUID))
|
||||
);
|
||||
try {
|
||||
double snapTimeout = g_network->isSimulated() ? 10.0 : SERVER_KNOBS->SNAP_CREATE_MAX_TIMEOUT;
|
||||
double maxTimeout = 7 * snapTimeout;
|
||||
wait(timeoutError(ddSnapReq, maxTimeout));
|
||||
} catch (Error& e) {
|
||||
TraceEvent("SnapMasterProxy.DDSnapResponseError")
|
||||
.detail("SnapPayload", snapReq.snapPayload)
|
||||
.detail("SnapUID", snapReq.snapUID)
|
||||
.error(e, true /*includeCancelled*/ );
|
||||
throw;
|
||||
}
|
||||
snapReq.reply.send(Void());
|
||||
} catch (Error& e) {
|
||||
TraceEvent("SnapMasterProxy.SnapReqError")
|
||||
.detail("SnapPayload", snapReq.snapPayload)
|
||||
.detail("SnapUID", snapReq.snapUID)
|
||||
.error(e, true /*includeCancelled*/);
|
||||
if (e.code() == error_code_operation_cancelled) {
|
||||
snapReq.reply.sendError(broken_promise());
|
||||
} else {
|
||||
snapReq.reply.sendError(e);
|
||||
}
|
||||
}
|
||||
TraceEvent("SnapMasterProxy.SnapReqSuccess")
|
||||
.detail("SnapPayload", snapReq.snapPayload)
|
||||
.detail("SnapUID", snapReq.snapUID);
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> masterProxyServerCore(
|
||||
MasterProxyInterface proxy,
|
||||
MasterInterface master,
|
||||
|
@ -1713,6 +1795,9 @@ ACTOR Future<Void> masterProxyServerCore(
|
|||
}
|
||||
}
|
||||
}
|
||||
when(ProxySnapRequest snapReq = waitNext(proxy.proxySnapReq.getFuture())) {
|
||||
addActor.send(proxySnapCreate(snapReq, &commitData));
|
||||
}
|
||||
when(TxnStateRequest req = waitNext(proxy.txnState.getFuture())) {
|
||||
state ReplyPromise<Void> reply = req.reply;
|
||||
if(req.last) maxSequence = req.sequence + 1;
|
||||
|
|
Loading…
Reference in New Issue