Fail exec req until the cluster is fully_recovered

This commit is contained in:
sramamoorthy 2019-03-20 09:29:09 -07:00 committed by Alex Miller
parent 4016f16c76
commit 6431513ad0
2 changed files with 66 additions and 55 deletions

View File

@ -222,6 +222,7 @@ struct ProxyCommitData {
RequestStream<GetReadVersionRequest> getConsistentReadVersion;
RequestStream<CommitTransactionRequest> commit;
Database cx;
Reference<AsyncVar<ServerDBInfo>> db;
EventMetricHandle<SingleKeyMutation> singleKeyMutationEvent;
std::map<UID, Reference<StorageInfo>> storageCache;
@ -258,7 +259,7 @@ struct ProxyCommitData {
lastVersionTime(0), commitVersionRequestNumber(1), mostRecentProcessedRequestNumber(0),
getConsistentReadVersion(getConsistentReadVersion), commit(commit), lastCoalesceTime(0),
localCommitBatchesStarted(0), locked(false), commitBatchInterval(SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_INTERVAL_MIN),
firstProxy(firstProxy), cx(openDBOnServer(db, TaskDefaultEndpoint, true, true)),
firstProxy(firstProxy), cx(openDBOnServer(db, TaskDefaultEndpoint, true, true)), db(db),
singleKeyMutationEvent(LiteralStringRef("SingleKeyMutation")), commitBatchesMemBytesCount(0), lastTxsPop(0)
{}
};
@ -730,52 +731,63 @@ ACTOR Future<Void> commitBatch(
}
toCommit.addTypedMessage(m);
} else if (m.type == MutationRef::Exec) {
auto ranges = self->keyInfo.intersectingRanges(allKeys);
std::set<Tag> allSources;
if(self->db->get().recoveryState != RecoveryState::FULLY_RECOVERED) {
// Cluster is not fully recovered and needs TLogs
// from previous generation for full recovery.
// Currently, snapshot of old tlog generation is not
// supported and hence failing the snapshot request until
// cluster is fully_recovered.
TraceEvent("ExecTransactionConflict")
.detail("TransactionNum", transactionNum);
committed[transactionNum] = ConflictBatch::TransactionConflict;
} else {
auto ranges = self->keyInfo.intersectingRanges(allKeys);
std::set<Tag> allSources;
if (debugMutation("ProxyCommit", commitVersion, m))
TraceEvent("ProxyCommitTo", self->dbgid)
.detail("To", "all sources")
.detail("Mutation", m.toString())
.detail("Version", commitVersion);
if (debugMutation("ProxyCommit", commitVersion, m))
TraceEvent("ProxyCommitTo", self->dbgid)
.detail("To", "all sources")
.detail("Mutation", m.toString())
.detail("Version", commitVersion);
for (auto r : ranges) {
auto& tags = r.value().tags;
if (!tags.size()) {
for (auto info : r.value().src_info) {
tags.push_back(info->tag);
for (auto r : ranges) {
auto& tags = r.value().tags;
if (!tags.size()) {
for (auto info : r.value().src_info) {
tags.push_back(info->tag);
}
for (auto info : r.value().dest_info) {
tags.push_back(info->tag);
}
uniquify(tags);
}
for (auto info : r.value().dest_info) {
tags.push_back(info->tag);
}
uniquify(tags);
allSources.insert(tags.begin(), tags.end());
}
allSources.insert(tags.begin(), tags.end());
}
auto param2 = m.param2.toString();
ExecCmdValueString execArg(param2);
execArg.dbgPrint();
auto uidStr = execArg.getBinaryArgValue("uid");
auto tokenStr = "ExecTrace/Proxy/" + uidStr;
auto param2 = m.param2.toString();
ExecCmdValueString execArg(param2);
execArg.dbgPrint();
auto uidStr = execArg.getBinaryArgValue("uid");
auto tokenStr = "ExecTrace/Proxy/" + uidStr;
auto te1 = TraceEvent("ProxyCommitTo", self->dbgid);
te1.detail("To", "all sources");
te1.detail("Mutation", m.toString());
te1.detail("Version", commitVersion);
te1.detail("NumTags", allSources.size());
if (m.param1 == execSnap) {
te1.trackLatest(tokenStr.c_str());
auto te1 = TraceEvent("ProxyCommitTo", self->dbgid);
te1.detail("To", "all sources");
te1.detail("Mutation", m.toString());
te1.detail("Version", commitVersion);
te1.detail("NumTags", allSources.size());
if (m.param1 == execSnap) {
te1.trackLatest(tokenStr.c_str());
}
int i = 0;
std::string allTagString;
for (auto& tag : allSources) {
allTagString += tag.toString() + ",";
toCommit.addTag(tag);
}
TraceEvent(SevDebug, "TagInfo").detail("Tags", allTagString);
toCommit.addTypedMessage(m, true /* allLocations */);
toCommit.setHasExecOp();
}
int i = 0;
std::string allTagString;
for (auto& tag : allSources) {
allTagString += tag.toString() + ",";
toCommit.addTag(tag);
}
TraceEvent(SevDebug, "TagInfo").detail("Tags", allTagString);
toCommit.addTypedMessage(m, true /* allLocations */);
toCommit.setHasExecOp();
} else
UNREACHABLE();
@ -1464,21 +1476,21 @@ ACTOR Future<Void> masterProxyServerCore(
//TraceEvent("ProxyInit1", proxy.id());
// Wait until we can load the "real" logsystem, since we don't support switching them currently
while (!(db->get().master.id() == master.id() && db->get().recoveryState >= RecoveryState::RECOVERY_TRANSACTION)) {
while (!(commitData.db->get().master.id() == master.id() && commitData.db->get().recoveryState >= RecoveryState::RECOVERY_TRANSACTION)) {
//TraceEvent("ProxyInit2", proxy.id()).detail("LSEpoch", db->get().logSystemConfig.epoch).detail("Need", epoch);
wait(db->onChange());
wait(commitData.db->onChange());
}
state Future<Void> dbInfoChange = db->onChange();
state Future<Void> dbInfoChange = commitData.db->onChange();
//TraceEvent("ProxyInit3", proxy.id());
commitData.resolvers = db->get().resolvers;
commitData.resolvers = commitData.db->get().resolvers;
ASSERT(commitData.resolvers.size() != 0);
auto rs = commitData.keyResolvers.modify(allKeys);
for(auto r = rs.begin(); r != rs.end(); ++r)
r->value().emplace_back(0,0);
commitData.logSystem = ILogSystem::fromServerDBInfo(proxy.id(), db->get(), false, addActor);
commitData.logSystem = ILogSystem::fromServerDBInfo(proxy.id(), commitData.db->get(), false, addActor);
commitData.logAdapter = new LogSystemDiskQueueAdapter(commitData.logSystem, txsTag, Reference<AsyncVar<PeekSpecialInfo>>(), false);
commitData.txnStateStore = keyValueStoreLogSystem(commitData.logAdapter, proxy.id(), 2e9, true, true, true);
@ -1487,8 +1499,8 @@ ACTOR Future<Void> masterProxyServerCore(
state int64_t commitBatchesMemoryLimit = std::min(SERVER_KNOBS->COMMIT_BATCHES_MEM_BYTES_HARD_LIMIT, static_cast<int64_t>((SERVER_KNOBS->SERVER_MEM_LIMIT * SERVER_KNOBS->COMMIT_BATCHES_MEM_FRACTION_OF_TOTAL) / SERVER_KNOBS->COMMIT_BATCHES_MEM_TO_TOTAL_MEM_SCALE_FACTOR));
TraceEvent(SevInfo, "CommitBatchesMemoryLimit").detail("BytesLimit", commitBatchesMemoryLimit);
addActor.send(monitorRemoteCommitted(&commitData, db));
addActor.send(transactionStarter(proxy, db, addActor, &commitData, &healthMetricsReply, &detailedHealthMetricsReply));
addActor.send(monitorRemoteCommitted(&commitData, commitData.db));
addActor.send(transactionStarter(proxy, commitData.db, addActor, &commitData, &healthMetricsReply, &detailedHealthMetricsReply));
addActor.send(readRequestServer(proxy, &commitData));
addActor.send(rejoinServer(proxy, &commitData));
addActor.send(healthMetricsRequestServer(proxy, &healthMetricsReply, &detailedHealthMetricsReply));
@ -1499,21 +1511,21 @@ ACTOR Future<Void> masterProxyServerCore(
int commitBatchByteLimit =
(int)std::min<double>(SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_BYTES_MAX,
std::max<double>(SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_BYTES_MIN,
SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_BYTES_SCALE_BASE * pow(db->get().client.proxies.size(), SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_BYTES_SCALE_POWER)));
SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_BYTES_SCALE_BASE * pow(commitData.db->get().client.proxies.size(), SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_BYTES_SCALE_POWER)));
commitBatcherActor = commitBatcher(&commitData, batchedCommits, proxy.commit.getFuture(), commitBatchByteLimit, commitBatchesMemoryLimit);
loop choose{
when( wait( dbInfoChange ) ) {
dbInfoChange = db->onChange();
if(db->get().master.id() == master.id() && db->get().recoveryState >= RecoveryState::RECOVERY_TRANSACTION) {
commitData.logSystem = ILogSystem::fromServerDBInfo(proxy.id(), db->get(), false, addActor);
dbInfoChange = commitData.db->onChange();
if(commitData.db->get().master.id() == master.id() && commitData.db->get().recoveryState >= RecoveryState::RECOVERY_TRANSACTION) {
commitData.logSystem = ILogSystem::fromServerDBInfo(proxy.id(), commitData.db->get(), false, addActor);
for(auto it : commitData.tag_popped) {
commitData.logSystem->pop(it.second, it.first);
}
commitData.logSystem->pop(commitData.lastTxsPop, txsTag, 0, tagLocalityRemoteLog);
}
Optional<LatencyBandConfig> newLatencyBandConfig = db->get().latencyBandConfig;
Optional<LatencyBandConfig> newLatencyBandConfig = commitData.db->get().latencyBandConfig;
if(newLatencyBandConfig.present() != commitData.latencyBandConfig.present()
|| (newLatencyBandConfig.present() && newLatencyBandConfig.get().grvConfig != commitData.latencyBandConfig.get().grvConfig))
@ -1546,7 +1558,7 @@ ACTOR Future<Void> masterProxyServerCore(
const vector<CommitTransactionRequest> &trs = batchedRequests.first;
int batchBytes = batchedRequests.second;
//TraceEvent("MasterProxyCTR", proxy.id()).detail("CommitTransactions", trs.size()).detail("TransactionRate", transactionRate).detail("TransactionQueue", transactionQueue.size()).detail("ReleasedTransactionCount", transactionCount);
if (trs.size() || (db->get().recoveryState >= RecoveryState::ACCEPTING_COMMITS && now() - lastCommit >= SERVER_KNOBS->MAX_COMMIT_BATCH_INTERVAL)) {
if (trs.size() || (commitData.db->get().recoveryState >= RecoveryState::ACCEPTING_COMMITS && now() - lastCommit >= SERVER_KNOBS->MAX_COMMIT_BATCH_INTERVAL)) {
lastCommit = now();
if (trs.size() || lastCommitComplete.isReady()) {
@ -1585,7 +1597,7 @@ ACTOR Future<Void> masterProxyServerCore(
// get the list of workers
state std::vector<WorkerDetails> workers =
wait(db->get().clusterInterface.getWorkers.getReply(GetWorkersRequest()));
wait(commitData.db->get().clusterInterface.getWorkers.getReply(GetWorkersRequest()));
// send the exec command to the list of workers which are
// coordinators

View File

@ -217,7 +217,6 @@ public: // workload functions
throw operation_failed();
}
}
wait(delay(30.0));
}
CSimpleIni ini;
ini.SetUnicode();