From 6431513ad0aa755c386c855a766e7e9b029c7901 Mon Sep 17 00:00:00 2001 From: sramamoorthy Date: Wed, 20 Mar 2019 09:29:09 -0700 Subject: [PATCH] Fail exec req until the cluster is fully_recovered --- fdbserver/MasterProxyServer.actor.cpp | 120 ++++++++++++++----------- fdbserver/workloads/SnapTest.actor.cpp | 1 - 2 files changed, 66 insertions(+), 55 deletions(-) diff --git a/fdbserver/MasterProxyServer.actor.cpp b/fdbserver/MasterProxyServer.actor.cpp index b8262c63bf..520843bcb5 100644 --- a/fdbserver/MasterProxyServer.actor.cpp +++ b/fdbserver/MasterProxyServer.actor.cpp @@ -222,6 +222,7 @@ struct ProxyCommitData { RequestStream getConsistentReadVersion; RequestStream commit; Database cx; + Reference> db; EventMetricHandle singleKeyMutationEvent; std::map> storageCache; @@ -258,7 +259,7 @@ struct ProxyCommitData { lastVersionTime(0), commitVersionRequestNumber(1), mostRecentProcessedRequestNumber(0), getConsistentReadVersion(getConsistentReadVersion), commit(commit), lastCoalesceTime(0), localCommitBatchesStarted(0), locked(false), commitBatchInterval(SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_INTERVAL_MIN), - firstProxy(firstProxy), cx(openDBOnServer(db, TaskDefaultEndpoint, true, true)), + firstProxy(firstProxy), cx(openDBOnServer(db, TaskDefaultEndpoint, true, true)), db(db), singleKeyMutationEvent(LiteralStringRef("SingleKeyMutation")), commitBatchesMemBytesCount(0), lastTxsPop(0) {} }; @@ -730,52 +731,63 @@ ACTOR Future commitBatch( } toCommit.addTypedMessage(m); } else if (m.type == MutationRef::Exec) { - auto ranges = self->keyInfo.intersectingRanges(allKeys); - std::set allSources; + if(self->db->get().recoveryState != RecoveryState::FULLY_RECOVERED) { + // Cluster is not fully recovered and needs TLogs + // from previous generation for full recovery. + // Currently, snapshot of old tlog generation is not + // supported and hence failing the snapshot request until + // cluster is fully_recovered. + TraceEvent("ExecTransactionConflict") + .detail("TransactionNum", transactionNum); + committed[transactionNum] = ConflictBatch::TransactionConflict; + } else { + auto ranges = self->keyInfo.intersectingRanges(allKeys); + std::set allSources; - if (debugMutation("ProxyCommit", commitVersion, m)) - TraceEvent("ProxyCommitTo", self->dbgid) - .detail("To", "all sources") - .detail("Mutation", m.toString()) - .detail("Version", commitVersion); + if (debugMutation("ProxyCommit", commitVersion, m)) + TraceEvent("ProxyCommitTo", self->dbgid) + .detail("To", "all sources") + .detail("Mutation", m.toString()) + .detail("Version", commitVersion); - for (auto r : ranges) { - auto& tags = r.value().tags; - if (!tags.size()) { - for (auto info : r.value().src_info) { - tags.push_back(info->tag); + for (auto r : ranges) { + auto& tags = r.value().tags; + if (!tags.size()) { + for (auto info : r.value().src_info) { + tags.push_back(info->tag); + } + for (auto info : r.value().dest_info) { + tags.push_back(info->tag); + } + uniquify(tags); } - for (auto info : r.value().dest_info) { - tags.push_back(info->tag); - } - uniquify(tags); + allSources.insert(tags.begin(), tags.end()); } - allSources.insert(tags.begin(), tags.end()); - } - auto param2 = m.param2.toString(); - ExecCmdValueString execArg(param2); - execArg.dbgPrint(); - auto uidStr = execArg.getBinaryArgValue("uid"); - auto tokenStr = "ExecTrace/Proxy/" + uidStr; + auto param2 = m.param2.toString(); + ExecCmdValueString execArg(param2); + execArg.dbgPrint(); + auto uidStr = execArg.getBinaryArgValue("uid"); + auto tokenStr = "ExecTrace/Proxy/" + uidStr; - auto te1 = TraceEvent("ProxyCommitTo", self->dbgid); - te1.detail("To", "all sources"); - te1.detail("Mutation", m.toString()); - te1.detail("Version", commitVersion); - te1.detail("NumTags", allSources.size()); - if (m.param1 == execSnap) { - te1.trackLatest(tokenStr.c_str()); + auto te1 = TraceEvent("ProxyCommitTo", self->dbgid); + te1.detail("To", "all sources"); + te1.detail("Mutation", m.toString()); + te1.detail("Version", commitVersion); + te1.detail("NumTags", allSources.size()); + if (m.param1 == execSnap) { + te1.trackLatest(tokenStr.c_str()); + } + int i = 0; + std::string allTagString; + for (auto& tag : allSources) { + allTagString += tag.toString() + ","; + toCommit.addTag(tag); + } + TraceEvent(SevDebug, "TagInfo").detail("Tags", allTagString); + toCommit.addTypedMessage(m, true /* allLocations */); + toCommit.setHasExecOp(); } - int i = 0; - std::string allTagString; - for (auto& tag : allSources) { - allTagString += tag.toString() + ","; - toCommit.addTag(tag); - } - TraceEvent(SevDebug, "TagInfo").detail("Tags", allTagString); - toCommit.addTypedMessage(m, true /* allLocations */); - toCommit.setHasExecOp(); } else UNREACHABLE(); @@ -1464,21 +1476,21 @@ ACTOR Future masterProxyServerCore( //TraceEvent("ProxyInit1", proxy.id()); // Wait until we can load the "real" logsystem, since we don't support switching them currently - while (!(db->get().master.id() == master.id() && db->get().recoveryState >= RecoveryState::RECOVERY_TRANSACTION)) { + while (!(commitData.db->get().master.id() == master.id() && commitData.db->get().recoveryState >= RecoveryState::RECOVERY_TRANSACTION)) { //TraceEvent("ProxyInit2", proxy.id()).detail("LSEpoch", db->get().logSystemConfig.epoch).detail("Need", epoch); - wait(db->onChange()); + wait(commitData.db->onChange()); } - state Future dbInfoChange = db->onChange(); + state Future dbInfoChange = commitData.db->onChange(); //TraceEvent("ProxyInit3", proxy.id()); - commitData.resolvers = db->get().resolvers; + commitData.resolvers = commitData.db->get().resolvers; ASSERT(commitData.resolvers.size() != 0); auto rs = commitData.keyResolvers.modify(allKeys); for(auto r = rs.begin(); r != rs.end(); ++r) r->value().emplace_back(0,0); - commitData.logSystem = ILogSystem::fromServerDBInfo(proxy.id(), db->get(), false, addActor); + commitData.logSystem = ILogSystem::fromServerDBInfo(proxy.id(), commitData.db->get(), false, addActor); commitData.logAdapter = new LogSystemDiskQueueAdapter(commitData.logSystem, txsTag, Reference>(), false); commitData.txnStateStore = keyValueStoreLogSystem(commitData.logAdapter, proxy.id(), 2e9, true, true, true); @@ -1487,8 +1499,8 @@ ACTOR Future masterProxyServerCore( state int64_t commitBatchesMemoryLimit = std::min(SERVER_KNOBS->COMMIT_BATCHES_MEM_BYTES_HARD_LIMIT, static_cast((SERVER_KNOBS->SERVER_MEM_LIMIT * SERVER_KNOBS->COMMIT_BATCHES_MEM_FRACTION_OF_TOTAL) / SERVER_KNOBS->COMMIT_BATCHES_MEM_TO_TOTAL_MEM_SCALE_FACTOR)); TraceEvent(SevInfo, "CommitBatchesMemoryLimit").detail("BytesLimit", commitBatchesMemoryLimit); - addActor.send(monitorRemoteCommitted(&commitData, db)); - addActor.send(transactionStarter(proxy, db, addActor, &commitData, &healthMetricsReply, &detailedHealthMetricsReply)); + addActor.send(monitorRemoteCommitted(&commitData, commitData.db)); + addActor.send(transactionStarter(proxy, commitData.db, addActor, &commitData, &healthMetricsReply, &detailedHealthMetricsReply)); addActor.send(readRequestServer(proxy, &commitData)); addActor.send(rejoinServer(proxy, &commitData)); addActor.send(healthMetricsRequestServer(proxy, &healthMetricsReply, &detailedHealthMetricsReply)); @@ -1499,21 +1511,21 @@ ACTOR Future masterProxyServerCore( int commitBatchByteLimit = (int)std::min(SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_BYTES_MAX, std::max(SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_BYTES_MIN, - SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_BYTES_SCALE_BASE * pow(db->get().client.proxies.size(), SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_BYTES_SCALE_POWER))); + SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_BYTES_SCALE_BASE * pow(commitData.db->get().client.proxies.size(), SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_BYTES_SCALE_POWER))); commitBatcherActor = commitBatcher(&commitData, batchedCommits, proxy.commit.getFuture(), commitBatchByteLimit, commitBatchesMemoryLimit); loop choose{ when( wait( dbInfoChange ) ) { - dbInfoChange = db->onChange(); - if(db->get().master.id() == master.id() && db->get().recoveryState >= RecoveryState::RECOVERY_TRANSACTION) { - commitData.logSystem = ILogSystem::fromServerDBInfo(proxy.id(), db->get(), false, addActor); + dbInfoChange = commitData.db->onChange(); + if(commitData.db->get().master.id() == master.id() && commitData.db->get().recoveryState >= RecoveryState::RECOVERY_TRANSACTION) { + commitData.logSystem = ILogSystem::fromServerDBInfo(proxy.id(), commitData.db->get(), false, addActor); for(auto it : commitData.tag_popped) { commitData.logSystem->pop(it.second, it.first); } commitData.logSystem->pop(commitData.lastTxsPop, txsTag, 0, tagLocalityRemoteLog); } - Optional newLatencyBandConfig = db->get().latencyBandConfig; + Optional newLatencyBandConfig = commitData.db->get().latencyBandConfig; if(newLatencyBandConfig.present() != commitData.latencyBandConfig.present() || (newLatencyBandConfig.present() && newLatencyBandConfig.get().grvConfig != commitData.latencyBandConfig.get().grvConfig)) @@ -1546,7 +1558,7 @@ ACTOR Future masterProxyServerCore( const vector &trs = batchedRequests.first; int batchBytes = batchedRequests.second; //TraceEvent("MasterProxyCTR", proxy.id()).detail("CommitTransactions", trs.size()).detail("TransactionRate", transactionRate).detail("TransactionQueue", transactionQueue.size()).detail("ReleasedTransactionCount", transactionCount); - if (trs.size() || (db->get().recoveryState >= RecoveryState::ACCEPTING_COMMITS && now() - lastCommit >= SERVER_KNOBS->MAX_COMMIT_BATCH_INTERVAL)) { + if (trs.size() || (commitData.db->get().recoveryState >= RecoveryState::ACCEPTING_COMMITS && now() - lastCommit >= SERVER_KNOBS->MAX_COMMIT_BATCH_INTERVAL)) { lastCommit = now(); if (trs.size() || lastCommitComplete.isReady()) { @@ -1585,7 +1597,7 @@ ACTOR Future masterProxyServerCore( // get the list of workers state std::vector workers = - wait(db->get().clusterInterface.getWorkers.getReply(GetWorkersRequest())); + wait(commitData.db->get().clusterInterface.getWorkers.getReply(GetWorkersRequest())); // send the exec command to the list of workers which are // coordinators diff --git a/fdbserver/workloads/SnapTest.actor.cpp b/fdbserver/workloads/SnapTest.actor.cpp index 02f62ec78b..22f5f0e0c9 100644 --- a/fdbserver/workloads/SnapTest.actor.cpp +++ b/fdbserver/workloads/SnapTest.actor.cpp @@ -217,7 +217,6 @@ public: // workload functions throw operation_failed(); } } - wait(delay(30.0)); } CSimpleIni ini; ini.SetUnicode();