When a commit is cancelled, record all unresponsive/failed TLogs (#11459)

* When a commit is cancelled, record all unresponsive/failed TLogs

* fixup!

* fixup!

* fixup!
Xiaoge Su 2024-07-23 17:26:11 -07:00 committed by GitHub
parent 74990e44bd
commit b324bfb9b9
6 changed files with 222 additions and 134 deletions
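
The gist of the change: each TLog's UID is now carried alongside its commit-reply future, so that when the commit actor is cancelled (for example by the commit proxy's liveness timeout), every TLog whose reply is still outstanding or has errored can be traced by ID. Outside the flow actor framework, the new reporting loop in minVersionWhenReady() reduces to roughly the following plain-C++ sketch; ReplyState and reportUnresponsiveTLogs are illustrative stand-ins for Future<TLogCommitReply> and the TraceEvent calls, not code from this PR.

#include <cstdio>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for the readiness of a Future<TLogCommitReply>.
enum class ReplyState { Ready, Errored, Pending };

// On cancellation, every reply that has not completed successfully is reported
// together with the ID of the TLog it was sent to.
void reportUnresponsiveTLogs(const std::vector<std::pair<std::string, ReplyState>>& replies) {
    int index = 0;
    for (const auto& [tlogID, state] : replies) {
        if (state == ReplyState::Ready)
            continue;
        const char* event = (state == ReplyState::Errored) ? "TLogPushRespondError" : "TLogPushNoResponse";
        std::printf("%s%04d TLogID=%s\n", event, index++, tlogID.c_str());
    }
}

int main() {
    reportUnresponsiveTLogs({ { "0a2b4c", ReplyState::Pending }, { "1d3e5f", ReplyState::Errored } });
    return 0;
}

In the diff below, the same loop lives in the operation_cancelled branch of minVersionWhenReady() and emits the events at SevInfo in simulation and SevWarnAlways otherwise.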


@@ -36,7 +36,6 @@
#include "fdbclient/StorageServerInterface.h"
#include "fdbclient/TagThrottle.actor.h"
#include "fdbclient/VersionVector.h"
#include "fdbrpc/Stats.h"
#include "fdbrpc/TimedRequest.h"


@@ -19,6 +19,7 @@
*/
#include <algorithm>
#include <string_view>
#include <tuple>
#include <variant>
@@ -625,15 +626,27 @@ ACTOR static Future<ResolveTransactionBatchReply> trackResolutionMetrics(Referen
namespace CommitBatch {
constexpr const std::string_view UNSET = std::string_view();
constexpr const std::string_view INITIALIZE = "initialize"sv;
constexpr const std::string_view PRE_RESOLUTION = "preResolution"sv;
constexpr const std::string_view RESOLUTION = "resolution"sv;
constexpr const std::string_view POST_RESOLUTION = "postResolution"sv;
constexpr const std::string_view TRANSACTION_LOGGING = "transactionLogging"sv;
constexpr const std::string_view REPLY = "reply"sv;
constexpr const std::string_view COMPLETE = "complete"sv;
struct CommitBatchContext {
using StoreCommit_t = std::vector<std::pair<Future<LogSystemDiskQueueAdapter::CommitMessage>, Future<Void>>>;
ProxyCommitData* const pProxyCommitData;
std::vector<CommitTransactionRequest> trs;
int currentBatchMemBytesCount;
const int currentBatchMemBytesCount;
double startTime;
// The current stage of batch commit
std::string_view stage = UNSET;
// If encryption is enabled, this value represents the total time (in nanoseconds) spent on encryption in
// the commit proxy for a given commit batch.
Optional<double> encryptionTime;
@@ -2437,14 +2450,12 @@ ACTOR Future<Void> postResolution(CommitBatchContext* self) {
// Issue acs mutation at the end of this commit batch
addAccumulativeChecksumMutations(self);
}
self->loggingComplete = pProxyCommitData->logSystem->push(self->prevVersion,
self->commitVersion,
pProxyCommitData->committedVersion.get(),
pProxyCommitData->minKnownCommittedVersion,
self->toCommit,
span.context,
self->debugID,
tpcvMap);
const auto versionSet = ILogSystem::PushVersionSet{ self->prevVersion,
self->commitVersion,
pProxyCommitData->committedVersion.get(),
pProxyCommitData->minKnownCommittedVersion };
self->loggingComplete =
pProxyCommitData->logSystem->push(versionSet, self->toCommit, span.context, self->debugID, tpcvMap);
float ratio = self->toCommit.getEmptyMessageRatio();
pProxyCommitData->stats.commitBatchingEmptyMessageRatio.addMeasurement(ratio);
@@ -2725,46 +2736,78 @@ ACTOR Future<Void> reply(CommitBatchContext* self) {
return Void();
}
} // namespace CommitBatch
// Commit one batch of transactions trs
ACTOR Future<Void> commitBatch(ProxyCommitData* self,
std::vector<CommitTransactionRequest>* trs,
int currentBatchMemBytesCount) {
ACTOR Future<Void> commitBatchImpl(CommitBatchContext* pContext) {
// WARNING: this code is run at a high priority (until the first delay(0)), so it needs to do as little work as
// possible
state CommitBatch::CommitBatchContext context(self, trs, currentBatchMemBytesCount);
pContext->stage = INITIALIZE;
getCurrentLineage()->modify(&TransactionLineage::operation) = TransactionLineage::Operation::Commit;
// Active load balancing runs at a very high priority (to obtain accurate estimate of memory used by commit batches)
// so we need to downgrade here
wait(delay(0, TaskPriority::ProxyCommit));
context.pProxyCommitData->lastVersionTime = context.startTime;
++context.pProxyCommitData->stats.commitBatchIn;
context.setupTraceBatch();
pContext->pProxyCommitData->lastVersionTime = pContext->startTime;
++pContext->pProxyCommitData->stats.commitBatchIn;
pContext->setupTraceBatch();
/////// Phase 1: Pre-resolution processing (CPU bound except waiting for a version # which is separately pipelined
/// and *should* be available by now (unless empty commit); ordered; currently atomic but could yield)
wait(CommitBatch::preresolutionProcessing(&context));
if (context.rejected) {
self->commitBatchesMemBytesCount -= currentBatchMemBytesCount;
pContext->stage = PRE_RESOLUTION;
wait(CommitBatch::preresolutionProcessing(pContext));
if (pContext->rejected) {
pContext->pProxyCommitData->commitBatchesMemBytesCount -= pContext->currentBatchMemBytesCount;
return Void();
}
/////// Phase 2: Resolution (waiting on the network; pipelined)
wait(CommitBatch::getResolution(&context));
pContext->stage = RESOLUTION;
wait(CommitBatch::getResolution(pContext));
////// Phase 3: Post-resolution processing (CPU bound except for very rare situations; ordered; currently atomic but
/// doesn't need to be)
wait(CommitBatch::postResolution(&context));
pContext->stage = POST_RESOLUTION;
wait(CommitBatch::postResolution(pContext));
/////// Phase 4: Logging (network bound; pipelined up to MAX_READ_TRANSACTION_LIFE_VERSIONS (limited by loop above))
wait(CommitBatch::transactionLogging(&context));
pContext->stage = TRANSACTION_LOGGING;
wait(CommitBatch::transactionLogging(pContext));
/////// Phase 5: Replies (CPU bound; no particular order required, though ordered execution would be best for
/// latency)
wait(CommitBatch::reply(&context));
pContext->stage = REPLY;
wait(CommitBatch::reply(pContext));
pContext->stage = COMPLETE;
return Void();
}
} // namespace CommitBatch
ACTOR Future<Void> commitBatch(ProxyCommitData* pCommitData,
std::vector<CommitTransactionRequest>* trs,
int currentBatchMemBytesCount) {
state CommitBatch::CommitBatchContext context(pCommitData, trs, currentBatchMemBytesCount);
Future<Void> commit = CommitBatch::commitBatchImpl(&context);
// When encryption is enabled, a cipher key fetching issue (e.g., a KMS outage) is detected by the
// encryption monitor. In that case, a commit timeout is expected and the timeout error is suppressed.
// But we still want to trigger recovery occasionally (with COMMIT_PROXY_MAX_LIVENESS_TIMEOUT), in
// the hope that the cipher key fetching issue can be resolved by recovery (e.g., if one CP has a
// networking issue connecting to the EKP, recovery may exclude that CP).
Future<Void> livenessTimeout = timeoutErrorIfCleared(
commit, pCommitData->encryptionMonitor->degraded(), SERVER_KNOBS->COMMIT_PROXY_LIVENESS_TIMEOUT);
Future<Void> maxLivenessTimeout = timeoutError(livenessTimeout, SERVER_KNOBS->COMMIT_PROXY_MAX_LIVENESS_TIMEOUT);
try {
wait(maxLivenessTimeout);
} catch (Error& err) {
TraceEvent(SevInfo, "CommitBatchFailed").detail("Stage", context.stage).detail("ErrorCode", err.code());
throw failed_to_progress();
}
return Void();
}
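
With this refactor, commitBatch() itself owns the liveness watchdog: the batch records which stage it is in, timeoutErrorIfCleared() suppresses the expected timeout while the encryption monitor reports degraded, and COMMIT_PROXY_MAX_LIVENESS_TIMEOUT converts a stalled batch into failed_to_progress() with the last recorded stage attached to the CommitBatchFailed trace. Stripped of the flow framework, the stage-on-failure diagnosis amounts to roughly this plain-C++ sketch (BatchContext and runBatch are illustrative, not the actual proxy types):

#include <cstdio>
#include <stdexcept>
#include <string_view>

// Illustrative context: the real CommitBatchContext stores the stage the same way.
struct BatchContext {
    std::string_view stage = "unset";
};

void runBatch(BatchContext& ctx) {
    ctx.stage = "preResolution";
    ctx.stage = "resolution";
    // Simulate the batch stalling here and the surrounding watchdog firing.
    throw std::runtime_error("timed_out");
}

int main() {
    BatchContext ctx;
    try {
        runBatch(ctx);
    } catch (const std::exception& err) {
        // Mirrors TraceEvent("CommitBatchFailed").detail("Stage", ...).detail("ErrorCode", ...).
        std::printf("CommitBatchFailed Stage=%.*s Error=%s\n",
                    (int)ctx.stage.size(), ctx.stage.data(), err.what());
    }
    return 0;
}

That is the purpose of the new stage member and the initialize/preResolution/resolution/postResolution/transactionLogging/reply/complete constants: when the watchdog fires, the trace shows how far the batch progressed before stalling.
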
@@ -3965,32 +4008,31 @@ ACTOR Future<Void> commitProxyServerCore(CommitProxyInterface proxy,
when(std::pair<std::vector<CommitTransactionRequest>, int> batchedRequests =
waitNext(batchedCommits.getFuture())) {
// WARNING: this code is run at a high priority, so it needs to do as little work as possible
/*
TraceEvent("CommitProxyCTR", proxy.id())
.detail("CommitTransactions", trs.size())
.detail("TransactionRate", transactionRate)
.detail("TransactionQueue", transactionQueue.size())
.detail("ReleasedTransactionCount", transactionCount);
TraceEvent("CommitProxyCore", commitData.dbgid)
.detail("TxSize", trs.size())
.detail("MasterLifetime", masterLifetime.toString())
.detail("DbMasterLifetime", commitData.db->get().masterLifetime.toString())
.detail("RecoveryState", commitData.db->get().recoveryState)
.detail("CCInf", commitData.db->get().clusterInterface.id().toString());
*/
const std::vector<CommitTransactionRequest>& trs = batchedRequests.first;
int batchBytes = batchedRequests.second;
//TraceEvent("CommitProxyCTR", proxy.id()).detail("CommitTransactions", trs.size()).detail("TransactionRate", transactionRate).detail("TransactionQueue", transactionQueue.size()).detail("ReleasedTransactionCount", transactionCount);
//TraceEvent("CommitProxyCore", commitData.dbgid).detail("TxSize", trs.size()).detail("MasterLifetime", masterLifetime.toString()).detail("DbMasterLifetime", commitData.db->get().masterLifetime.toString()).detail("RecoveryState", commitData.db->get().recoveryState).detail("CCInf", commitData.db->get().clusterInterface.id().toString());
if (trs.size() || (commitData.db->get().recoveryState >= RecoveryState::ACCEPTING_COMMITS &&
masterLifetime.isEqual(commitData.db->get().masterLifetime))) {
const int batchBytes = batchedRequests.second;
if (trs.size() ||
(commitData.db->get().recoveryState >= RecoveryState::ACCEPTING_COMMITS &&
masterLifetime.isEqual(commitData.db->get().masterLifetime) && lastCommitComplete.isReady())) {
if (trs.size() || lastCommitComplete.isReady()) {
// When encryption is enabled, a cipher key fetching issue (e.g., a KMS outage) is detected by the
// encryption monitor. In that case, a commit timeout is expected and the timeout error is suppressed.
// But we still want to trigger recovery occasionally (with COMMIT_PROXY_MAX_LIVENESS_TIMEOUT), in
// the hope that the cipher key fetching issue can be resolved by recovery (e.g., if one CP has a
// networking issue connecting to the EKP, recovery may exclude that CP).
lastCommitComplete = transformError(
timeoutError(
timeoutErrorIfCleared(
commitBatch(&commitData,
const_cast<std::vector<CommitTransactionRequest>*>(&batchedRequests.first),
batchBytes),
commitData.encryptionMonitor->degraded(),
SERVER_KNOBS->COMMIT_PROXY_LIVENESS_TIMEOUT),
SERVER_KNOBS->COMMIT_PROXY_MAX_LIVENESS_TIMEOUT),
timed_out(),
failed_to_progress());
addActor.send(lastCommitComplete);
}
lastCommitComplete =
commitBatch(&commitData,
const_cast<std::vector<CommitTransactionRequest>*>(&batchedRequests.first),
batchBytes);
addActor.send(lastCommitComplete);
}
}
when(ProxySnapRequest snapReq = waitNext(proxy.proxySnapReq.getFuture())) {


@@ -20,17 +20,42 @@
#include "fdbserver/TagPartitionedLogSystem.actor.h"
#include <utility>
#include "flow/actorcompiler.h" // This must be the last #include.
ACTOR Future<Version> minVersionWhenReady(Future<Void> f, std::vector<Future<TLogCommitReply>> replies) {
wait(f);
Version minVersion = std::numeric_limits<Version>::max();
for (auto& reply : replies) {
if (reply.isReady() && !reply.isError()) {
minVersion = std::min(minVersion, reply.get().version);
ACTOR Future<Version> minVersionWhenReady(Future<Void> f,
std::vector<std::pair<UID, Future<TLogCommitReply>>> replies) {
try {
wait(f);
Version minVersion = std::numeric_limits<Version>::max();
for (const auto& [_tlogID, reply] : replies) {
if (reply.isReady() && !reply.isError()) {
minVersion = std::min(minVersion, reply.get().version);
}
}
return minVersion;
} catch (Error& err) {
if (err.code() == error_code_operation_cancelled) {
TraceEvent(g_network->isSimulated() ? SevInfo : SevWarnAlways, "TLogPushCancelled");
int index = 0;
for (const auto& [tlogID, reply] : replies) {
if (reply.isReady()) {
continue;
}
std::string message;
if (reply.isError()) {
// FIXME Use C++20 format when it is available
message = format("TLogPushRespondError%04d", index++);
} else {
message = format("TLogPushNoResponse%04d", index++);
}
TraceEvent(g_network->isSimulated() ? SevInfo : SevWarnAlways, message.c_str())
.detail("TLogID", tlogID);
}
}
throw;
}
return minVersion;
}
LogSet::LogSet(const TLogSet& tLogSet)
@@ -545,30 +570,25 @@ ACTOR Future<TLogCommitReply> TagPartitionedLogSystem::recordPushMetrics(Referen
return t;
}
Future<Version> TagPartitionedLogSystem::push(Version prevVersion,
Version version,
Version knownCommittedVersion,
Version minKnownCommittedVersion,
Future<Version> TagPartitionedLogSystem::push(const ILogSystem::PushVersionSet& versionSet,
LogPushData& data,
SpanContext const& spanContext,
Optional<UID> debugID,
Optional<std::unordered_map<uint16_t, Version>> tpcvMap) {
// FIXME: Randomize request order as in LegacyLogSystem?
std::vector<Future<Void>> quorumResults;
std::vector<Future<TLogCommitReply>> allReplies;
int location = 0;
Span span("TPLS:push"_loc, spanContext);
Version prevVersion = versionSet.prevVersion;
std::unordered_map<int, int> tLogCount;
if (SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST) {
int location = 0;
int logGroupLocal = 0;
for (auto& it : tLogs) {
const auto& tpcvMapRef = tpcvMap.get();
for (const auto& it : tLogs) {
if (!it->isLocal) {
continue;
}
for (int loc = 0; loc < it->logServers.size(); loc++) {
if (tpcvMap.get().find(location) != tpcvMap.get().end()) {
for (size_t loc = 0; loc < it->logServers.size(); loc++) {
if (tpcvMapRef.contains(location)) {
tLogCount[logGroupLocal]++;
}
location++;
@@ -576,57 +596,72 @@ Future<Version> TagPartitionedLogSystem::push(Version prevVersion,
logGroupLocal++;
}
}
int logGroupLocal = 0;
for (auto& it : tLogs) {
if (it->isLocal && it->logServers.size()) {
if (it->connectionResetTrackers.size() == 0) {
for (int i = 0; i < it->logServers.size(); i++) {
it->connectionResetTrackers.push_back(makeReference<ConnectionResetInfo>());
}
}
if (it->tlogPushDistTrackers.empty()) {
for (int i = 0; i < it->logServers.size(); i++) {
it->tlogPushDistTrackers.push_back(
Histogram::getHistogram("ToTlog_" + it->logServers[i]->get().interf().uniqueID.toString(),
it->logServers[i]->get().interf().address().toString(),
Histogram::Unit::milliseconds));
}
}
std::vector<Future<Void>> tLogCommitResults;
for (int loc = 0; loc < it->logServers.size(); loc++) {
Standalone<StringRef> msg = data.getMessages(location);
data.recordEmptyMessage(location, msg);
if (SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST) {
if (tpcvMap.get().find(location) != tpcvMap.get().end()) {
prevVersion = tpcvMap.get()[location];
} else {
location++;
continue;
}
}
allReplies.push_back(recordPushMetrics(
it->connectionResetTrackers[loc],
it->tlogPushDistTrackers[loc],
it->logServers[loc]->get().interf().address(),
it->logServers[loc]->get().interf().commit.getReply(TLogCommitRequest(spanContext,
msg.arena(),
prevVersion,
version,
knownCommittedVersion,
minKnownCommittedVersion,
msg,
tLogCount[logGroupLocal],
debugID),
TaskPriority::ProxyTLogCommitReply)));
Future<Void> commitSuccess = success(allReplies.back());
addActor.get().send(commitSuccess);
tLogCommitResults.push_back(commitSuccess);
location++;
}
quorumResults.push_back(quorum(tLogCommitResults, tLogCommitResults.size() - it->tLogWriteAntiQuorum));
logGroupLocal++;
int location = 0;
int logGroupLocal = 0;
std::vector<Future<Void>> quorumResults;
std::vector<std::pair<UID, Future<TLogCommitReply>>> allReplies;
const Span span("TPLS:push"_loc, spanContext);
for (auto& it : tLogs) {
if (!it->isLocal) {
// Remote TLogs should read from LogRouter
continue;
}
if (it->logServers.size() == 0) {
// Empty TLog set
continue;
}
if (it->connectionResetTrackers.size() == 0) {
for (int i = 0; i < it->logServers.size(); i++) {
it->connectionResetTrackers.push_back(makeReference<ConnectionResetInfo>());
}
}
if (it->tlogPushDistTrackers.empty()) {
for (int i = 0; i < it->logServers.size(); i++) {
it->tlogPushDistTrackers.push_back(
Histogram::getHistogram("ToTlog_" + it->logServers[i]->get().interf().uniqueID.toString(),
it->logServers[i]->get().interf().address().toString(),
Histogram::Unit::milliseconds));
}
}
std::vector<Future<Void>> tLogCommitResults;
for (size_t loc = 0; loc < it->logServers.size(); loc++) {
Standalone<StringRef> msg = data.getMessages(location);
data.recordEmptyMessage(location, msg);
if (SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST) {
if (tpcvMap.get().contains(location)) {
prevVersion = tpcvMap.get()[location];
} else {
location++;
continue;
}
}
const auto& interface = it->logServers[loc]->get().interf();
const auto request = TLogCommitRequest(spanContext,
msg.arena(),
prevVersion,
versionSet.version,
versionSet.knownCommittedVersion,
versionSet.minKnownCommittedVersion,
msg,
tLogCount[logGroupLocal],
debugID);
auto tLogReply = recordPushMetrics(it->connectionResetTrackers[loc],
it->tlogPushDistTrackers[loc],
interface.address(),
interface.commit.getReply(request, TaskPriority::ProxyTLogCommitReply));
allReplies.emplace_back(interface.id(), tLogReply);
Future<Void> commitSuccess = success(tLogReply);
addActor.get().send(commitSuccess);
tLogCommitResults.push_back(commitSuccess);
location++;
}
quorumResults.push_back(quorum(tLogCommitResults, tLogCommitResults.size() - it->tLogWriteAntiQuorum));
logGroupLocal++;
}
return minVersionWhenReady(waitForAll(quorumResults), allReplies);
@@ -1697,6 +1732,10 @@ Future<Reference<ILogSystem>> TagPartitionedLogSystem::newEpoch(
recruitmentStalled);
}
LogSystemType TagPartitionedLogSystem::getLogSystemType() const {
return logSystemType;
}
LogSystemConfig TagPartitionedLogSystem::getLogSystemConfig() const {
LogSystemConfig logSystemConfig(epoch);
logSystemConfig.logSystemType = logSystemType;


@@ -208,7 +208,8 @@ ACTOR Future<Void> TLogTestContext::sendPushMessages(TLogTestContext* pTLogTestC
toCommit.addTags(tags);
toCommit.writeTypedMessage(m);
}
Future<Version> loggingComplete = pTLogTestContext->ls->push(prev, next, prev, prev, toCommit, SpanContext());
const auto versionSet = ILogSystem::PushVersionSet{ prev, next, prev, prev };
Future<Version> loggingComplete = pTLogTestContext->ls->push(versionSet, toCommit, SpanContext());
Version ver = wait(loggingComplete);
ASSERT_LE(ver, next);
prev++;


@@ -21,24 +21,25 @@
#ifndef FDBSERVER_LOGSYSTEM_H
#define FDBSERVER_LOGSYSTEM_H
#include <cstdint>
#include <set>
#include <stdint.h>
#include <vector>
#include "fdbserver/SpanContextMessage.h"
#include "fdbclient/DatabaseConfiguration.h"
#include "fdbrpc/Locality.h"
#include "fdbrpc/Replication.h"
#include "fdbrpc/ReplicationPolicy.h"
#include "fdbserver/LogSystemConfig.h"
#include "fdbserver/MutationTracking.h"
#include "fdbserver/OTELSpanContextMessage.h"
#include "fdbserver/SpanContextMessage.h"
#include "fdbserver/TLogInterface.h"
#include "fdbserver/WorkerInterface.actor.h"
#include "fdbclient/DatabaseConfiguration.h"
#include "fdbserver/MutationTracking.h"
#include "flow/Arena.h"
#include "flow/Error.h"
#include "flow/Histogram.h"
#include "flow/IndexedSet.h"
#include "flow/Knobs.h"
#include "fdbrpc/ReplicationPolicy.h"
#include "fdbrpc/Locality.h"
#include "fdbrpc/Replication.h"
struct DBCoreState;
struct TLogSet;
@@ -516,11 +517,14 @@ struct ILogSystem {
virtual Future<Void> onError() const = 0;
// Never returns normally, but throws an error if the subsystem stops working
// Future<Void> push( UID bundle, int64_t seq, VectorRef<TaggedMessageRef> messages );
virtual Future<Version> push(Version prevVersion,
Version version,
Version knownCommittedVersion,
Version minKnownCommittedVersion,
struct PushVersionSet {
Version prevVersion;
Version version;
Version knownCommittedVersion;
Version minKnownCommittedVersion;
};
virtual Future<Version> push(const PushVersionSet& versionSet,
LogPushData& data,
SpanContext const& spanContext,
Optional<UID> debugID = Optional<UID>(),
@@ -658,6 +662,9 @@ struct ILogSystem {
// using fromLogSystemConfig()
virtual LogSystemConfig getLogSystemConfig() const = 0;
// Returns the type of LogSystem; this should be faster than using RTTI
virtual LogSystemType getLogSystemType() const = 0;
virtual Standalone<StringRef> getLogsValue() const = 0;
// Returns when the log system configuration has changed due to a tlog rejoin.
@@ -684,6 +691,7 @@ struct ILogSystem {
virtual void stopRejoins() = 0;
// XXX: Should Tag related functions stay inside TagPartitionedLogSystem??
// Returns the pseudo tag to be popped for the given process class. If the
// process class doesn't use a pseudo tag, returns the same tag.
virtual Tag getPseudoPopTag(Tag tag, ProcessClass::ClassType type) const = 0;


@@ -149,6 +149,8 @@ struct TagPartitionedLogSystem final : ILogSystem, ReferenceCounted<TagPartition
UID getDebugID() const final;
LogSystemType getLogSystemType() const final;
void addPseudoLocality(int8_t locality);
Tag getPseudoPopTag(Tag tag, ProcessClass::ClassType type) const final;
@@ -199,10 +201,7 @@ struct TagPartitionedLogSystem final : ILogSystem, ReferenceCounted<TagPartition
NetworkAddress addr,
Future<TLogCommitReply> in);
Future<Version> push(Version prevVersion,
Version version,
Version knownCommittedVersion,
Version minKnownCommittedVersion,
Future<Version> push(const ILogSystem::PushVersionSet& versionSet,
LogPushData& data,
SpanContext const& spanContext,
Optional<UID> debugID,