/*
 * Resolver.actor.cpp
 *
 * This source file is part of the FoundationDB open source project
 *
 * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "fdbclient/NativeAPI.actor.h"
|
2021-04-28 00:32:45 +08:00
|
|
|
#include "fdbclient/Notified.h"
|
2021-09-30 05:09:21 +08:00
|
|
|
#include "fdbclient/StorageServerInterface.h"
|
2021-04-28 00:32:45 +08:00
|
|
|
#include "fdbclient/SystemData.h"
|
2021-09-30 05:09:21 +08:00
|
|
|
#include "fdbserver/ApplyMetadataMutation.h"
|
2020-07-26 01:00:29 +08:00
|
|
|
#include "fdbserver/ConflictSet.h"
|
2021-09-30 05:09:21 +08:00
|
|
|
#include "fdbserver/IKeyValueStore.h"
|
2018-10-20 01:30:13 +08:00
|
|
|
#include "fdbserver/Knobs.h"
|
2021-09-30 05:09:21 +08:00
|
|
|
#include "fdbserver/LogSystem.h"
|
|
|
|
#include "fdbserver/LogSystemDiskQueueAdapter.h"
|
2021-04-28 00:32:45 +08:00
|
|
|
#include "fdbserver/MasterInterface.h"
|
|
|
|
#include "fdbserver/ResolverInterface.h"
|
2021-09-30 05:09:21 +08:00
|
|
|
#include "fdbserver/RestoreUtil.h"
|
2018-10-20 01:30:13 +08:00
|
|
|
#include "fdbserver/ServerDBInfo.h"
|
|
|
|
#include "fdbserver/StorageMetrics.h"
|
2021-04-28 00:32:45 +08:00
|
|
|
#include "fdbserver/WaitFailure.h"
|
|
|
|
#include "fdbserver/WorkerInterface.actor.h"
|
|
|
|
#include "flow/ActorCollection.h"
|
|
|
|
|
2021-03-11 02:06:03 +08:00
|
|
|
#include "flow/actorcompiler.h" // This must be the last #include.
|
2017-05-26 04:48:44 +08:00
|
|
|
|
|
|
|
namespace {
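// Bookkeeping kept for each commit proxy that sends us batches: the replies for batches the proxy has not yet
// acknowledged (keyed by version) and the last version this resolver resolved for it.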
struct ProxyRequestsInfo {
	std::map<Version, ResolveTransactionBatchReply> outstandingBatches;
	Version lastVersion;

	ProxyRequestsInfo() : lastVersion(-1) {}
};
} // namespace

namespace {
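// All of the state owned by a single resolver role: the conflict set used for conflict detection, recently
// resolved metadata ("state") transactions retained for proxies that have not seen them yet, and a read-only
// txnStateStore used to turn committed metadata mutations into private mutations.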
struct Resolver : ReferenceCounted<Resolver> {
	UID dbgid;
	int commitProxyCount, resolverCount;
	NotifiedVersion version;
	AsyncVar<Version> neededVersion;

	Map<Version, Standalone<VectorRef<StateTransactionRef>>> recentStateTransactions;
	Deque<std::pair<Version, int64_t>> recentStateTransactionSizes;
	AsyncVar<int64_t> totalStateBytes;
	AsyncTrigger checkNeededVersion;
	std::map<NetworkAddress, ProxyRequestsInfo> proxyInfoMap;
	ConflictSet* conflictSet;
	TransientStorageMetricSample iopsSample;

	// Use LogSystem as backend for txnStateStore. However, the real commit
	// happens at commit proxies and we never "write" to the LogSystem at
	// Resolvers.
	LogSystemDiskQueueAdapter* logAdapter;
	Reference<ILogSystem> logSystem;
	IKeyValueStore* txnStateStore;

	std::map<UID, Reference<StorageInfo>> storageCache;
	KeyRangeMap<ServerCacheInfo> keyInfo; // keyrange -> all storage servers in all DCs for the keyrange
	std::unordered_map<UID, StorageServerInterface> tssMapping;

	Version debugMinRecentStateVersion;

	CounterCollection cc;
	Counter resolveBatchIn;
	Counter resolveBatchStart;
	Counter resolvedTransactions;
	Counter resolvedBytes;
	Counter resolvedReadConflictRanges;
	Counter resolvedWriteConflictRanges;
	Counter transactionsAccepted;
	Counter transactionsTooOld;
	Counter transactionsConflicted;
	Counter resolvedStateTransactions;
	Counter resolvedStateMutations;
	Counter resolvedStateBytes;
	Counter resolveBatchOut;
	Counter metricsRequests;
	Counter splitRequests;

	Future<Void> logger;

	Resolver(UID dbgid, int commitProxyCount, int resolverCount)
	  : dbgid(dbgid), commitProxyCount(commitProxyCount), resolverCount(resolverCount), version(-1),
	    conflictSet(newConflictSet()), iopsSample(SERVER_KNOBS->KEY_BYTES_PER_SAMPLE), debugMinRecentStateVersion(0),
	    cc("Resolver", dbgid.toString()), resolveBatchIn("ResolveBatchIn", cc),
	    resolveBatchStart("ResolveBatchStart", cc), resolvedTransactions("ResolvedTransactions", cc),
	    resolvedBytes("ResolvedBytes", cc), resolvedReadConflictRanges("ResolvedReadConflictRanges", cc),
	    resolvedWriteConflictRanges("ResolvedWriteConflictRanges", cc),
	    transactionsAccepted("TransactionsAccepted", cc), transactionsTooOld("TransactionsTooOld", cc),
	    transactionsConflicted("TransactionsConflicted", cc),
	    resolvedStateTransactions("ResolvedStateTransactions", cc),
	    resolvedStateMutations("ResolvedStateMutations", cc), resolvedStateBytes("ResolvedStateBytes", cc),
	    resolveBatchOut("ResolveBatchOut", cc), metricsRequests("MetricsRequests", cc),
	    splitRequests("SplitRequests", cc) {
		specialCounter(cc, "Version", [this]() { return this->version.get(); });
		specialCounter(cc, "NeededVersion", [this]() { return this->neededVersion.get(); });
		specialCounter(cc, "TotalStateBytes", [this]() { return this->totalStateBytes.get(); });

		logger = traceCounters("ResolverMetrics", dbgid, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, &cc, "ResolverMetrics");
	}
	~Resolver() { destroyConflictSet(conflictSet); }
};
} // namespace

ACTOR Future<Void> resolveBatch(Reference<Resolver> self, ResolveTransactionBatchRequest req) {
	state Optional<UID> debugID;
	state Span span("R:resolveBatch"_loc, req.spanContext);

	// The first request (prevVersion < 0) comes from the master
	state NetworkAddress proxyAddress =
	    req.prevVersion >= 0 ? req.reply.getEndpoint().getPrimaryAddress() : NetworkAddress();
	state ProxyRequestsInfo& proxyInfo = self->proxyInfoMap[proxyAddress];

	++self->resolveBatchIn;

	if (req.debugID.present()) {
		debugID = nondeterministicRandom()->randomUniqueID();
		g_traceBatch.addAttach("CommitAttachID", req.debugID.get().first(), debugID.get().first());
		g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "Resolver.resolveBatch.Before");
	}

	/*TraceEvent("ResolveBatchStart", self->dbgid).detail("From", proxyAddress).detail("Version", req.version).detail("PrevVersion", req.prevVersion).detail("StateTransactions", req.txnStateTransactions.size())
	    .detail("RecentStateTransactions", self->recentStateTransactionSizes.size()).detail("LastVersion", proxyInfo.lastVersion)
	    .detail("FirstVersion", self->recentStateTransactionSizes.empty() ? -1 : self->recentStateTransactionSizes.front().first)
	    .detail("ResolverVersion", self->version.get());*/

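	// Memory backpressure: while accumulated state-transaction bytes exceed RESOLVER_STATE_MEMORY_LIMIT, delay
	// requests that are not required for progress (this proxy has already seen the oldest retained state
	// transaction and no waiter needs req.version yet).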
	while (self->totalStateBytes.get() > SERVER_KNOBS->RESOLVER_STATE_MEMORY_LIMIT &&
	       self->recentStateTransactionSizes.size() &&
	       proxyInfo.lastVersion > self->recentStateTransactionSizes.front().first &&
	       req.version > self->neededVersion.get()) {
		/*TraceEvent("ResolveBatchDelay").detail("From", proxyAddress).detail("StateBytes", self->totalStateBytes.get()).detail("RecentStateTransactionSize", self->recentStateTransactionSizes.size())
		    .detail("LastVersion", proxyInfo.lastVersion).detail("RequestVersion", req.version).detail("NeededVersion", self->neededVersion.get())
		    .detail("RecentStateVer", self->recentStateTransactions.begin()->key);*/

		wait(self->totalStateBytes.onChange() || self->neededVersion.onChange());
	}

	if (debugID.present()) {
		g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "Resolver.resolveBatch.AfterQueueSizeCheck");
	}

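	// Resolve batches in version order: block until all prior batches have been resolved (self->version reaches
	// req.prevVersion). If this proxy still needs the oldest retained state transactions, advertise req.prevVersion
	// via neededVersion so the backpressure check above lets the prerequisite batches through.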
	loop {
		if (self->recentStateTransactionSizes.size() &&
		    proxyInfo.lastVersion <= self->recentStateTransactionSizes.front().first) {
			self->neededVersion.set(std::max(self->neededVersion.get(), req.prevVersion));
		}

		choose {
			when(wait(self->version.whenAtLeast(req.prevVersion))) { break; }
			when(wait(self->checkNeededVersion.onTrigger())) {}
		}
	}

	if (check_yield(TaskPriority::DefaultEndpoint)) {
		wait(delay(0, TaskPriority::Low) || delay(SERVER_KNOBS->COMMIT_SLEEP_TIME)); // FIXME: Is this still right?
		g_network->setCurrentTask(TaskPriority::DefaultEndpoint);
	}

	if (self->version.get() ==
	    req.prevVersion) { // Not a duplicate (check relies on no waiting between here and self->version.set() below!)
		++self->resolveBatchStart;
		self->resolvedTransactions += req.transactions.size();
		self->resolvedBytes += req.transactions.expectedSize();

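		// Drop replies the proxy has already received: everything at or below req.lastReceivedVersion.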
		if (proxyInfo.lastVersion > 0) {
			proxyInfo.outstandingBatches.erase(proxyInfo.outstandingBatches.begin(),
			                                   proxyInfo.outstandingBatches.upper_bound(req.lastReceivedVersion));
		}

		Version firstUnseenVersion = proxyInfo.lastVersion + 1;
		proxyInfo.lastVersion = req.version;

		if (req.debugID.present())
			g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "Resolver.resolveBatch.AfterOrderer");

		ResolveTransactionBatchReply& reply = proxyInfo.outstandingBatches[req.version];

		std::vector<int> commitList;
		std::vector<int> tooOldList;

		// Detect conflicts
		double expire = now() + SERVER_KNOBS->SAMPLE_EXPIRATION_TIME;
		ConflictBatch conflictBatch(self->conflictSet, &reply.conflictingKeyRangeMap, &reply.arena);
		int keys = 0;
		for (int t = 0; t < req.transactions.size(); t++) {
			conflictBatch.addTransaction(req.transactions[t]);
			self->resolvedReadConflictRanges += req.transactions[t].read_conflict_ranges.size();
			self->resolvedWriteConflictRanges += req.transactions[t].write_conflict_ranges.size();
			keys += req.transactions[t].write_conflict_ranges.size() * 2 +
			        req.transactions[t].read_conflict_ranges.size() * 2;

			if (self->resolverCount > 1) {
				for (auto it : req.transactions[t].write_conflict_ranges)
					self->iopsSample.addAndExpire(
					    it.begin, SERVER_KNOBS->SAMPLE_OFFSET_PER_KEY + it.begin.size(), expire);
				for (auto it : req.transactions[t].read_conflict_ranges)
					self->iopsSample.addAndExpire(
					    it.begin, SERVER_KNOBS->SAMPLE_OFFSET_PER_KEY + it.begin.size(), expire);
			}
		}
		conflictBatch.detectConflicts(
		    req.version, req.version - SERVER_KNOBS->MAX_WRITE_TRANSACTION_LIFE_VERSIONS, commitList, &tooOldList);

		reply.debugID = req.debugID;
		reply.committed.resize(reply.arena, req.transactions.size());
		for (int c = 0; c < commitList.size(); c++)
			reply.committed[commitList[c]] = ConflictBatch::TransactionCommitted;

		for (int c = 0; c < tooOldList.size(); c++) {
			ASSERT(reply.committed[tooOldList[c]] == ConflictBatch::TransactionConflict);
			reply.committed[tooOldList[c]] = ConflictBatch::TransactionTooOld;
		}

		self->transactionsAccepted += commitList.size();
		self->transactionsTooOld += tooOldList.size();
		self->transactionsConflicted += req.transactions.size() - commitList.size() - tooOldList.size();

		ASSERT(req.prevVersion >= 0 ||
		       req.txnStateTransactions.size() == 0); // The master's request should not have any state transactions

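		// Accumulate this batch's metadata ("state") transactions. They are retained in recentStateTransactions so
		// they can be forwarded to proxies that have not seen them yet, and committed ones are applied to the local
		// txnStateStore below to generate private mutations.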
		auto& stateTransactions = self->recentStateTransactions[req.version];
		int64_t stateMutations = 0;
		int64_t stateBytes = 0;
		LogPushData toCommit(self->logSystem); // For accumulating private mutations
		ResolverData resolverData(self->dbgid,
		                          self->logSystem,
		                          self->txnStateStore,
		                          &self->keyInfo,
		                          &toCommit,
		                          req.version + 1,
		                          &self->storageCache,
		                          &self->tssMapping);
		for (int t : req.txnStateTransactions) {
			stateMutations += req.transactions[t].mutations.size();
			stateBytes += req.transactions[t].mutations.expectedSize();
			stateTransactions.push_back_deep(
			    stateTransactions.arena(),
			    StateTransactionRef(reply.committed[t] == ConflictBatch::TransactionCommitted,
			                        req.transactions[t].mutations));

			// Generate private mutations for metadata mutations
			if (reply.committed[t] == ConflictBatch::TransactionCommitted) {
				applyMetadataMutations(req.transactions[t].spanContext, resolverData, req.transactions[t].mutations);
			}
		}

		// Adds private mutation messages to the reply message.
		auto privateMutations = toCommit.getAllMessages();
		for (const auto& mutations : privateMutations) {
			reply.privateMutations.push_back(reply.arena, mutations);
			reply.arena.dependsOn(mutations.arena());
		}
		reply.privateMutationCount = toCommit.getMutationCount();

		self->resolvedStateTransactions += req.txnStateTransactions.size();
		self->resolvedStateMutations += stateMutations;
		self->resolvedStateBytes += stateBytes;

		if (stateBytes > 0)
			self->recentStateTransactionSizes.emplace_back(req.version, stateBytes);

		ASSERT(req.version >= firstUnseenVersion);
		ASSERT(firstUnseenVersion >= self->debugMinRecentStateVersion);

		TEST(firstUnseenVersion == req.version); // Resolver first unseen version is current version

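		// Include in the reply any state transactions from versions this proxy has not yet observed,
		// i.e. [firstUnseenVersion, req.version).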
		auto stateTransactionItr = self->recentStateTransactions.lower_bound(firstUnseenVersion);
		auto endItr = self->recentStateTransactions.lower_bound(req.version);
		for (; stateTransactionItr != endItr; ++stateTransactionItr) {
			reply.stateMutations.push_back(reply.arena, stateTransactionItr->value);
			reply.arena.dependsOn(stateTransactionItr->value.arena());
		}

		//TraceEvent("ResolveBatch", self->dbgid).detail("PrevVersion", req.prevVersion).detail("Version", req.version).detail("StateTransactionVersions", self->recentStateTransactionSizes.size()).detail("StateBytes", stateBytes).detail("FirstVersion", self->recentStateTransactionSizes.empty() ? -1 : self->recentStateTransactionSizes.front().first).detail("StateMutationsIn", req.txnStateTransactions.size()).detail("StateMutationsOut", reply.stateMutations.size()).detail("From", proxyAddress);

		ASSERT(!proxyInfo.outstandingBatches.empty());
		ASSERT(self->proxyInfoMap.size() <= self->commitProxyCount + 1);

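		// Compute the oldest last version across all commit proxies; state transactions at or below it have been
		// delivered everywhere and can be discarded.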
		// SOMEDAY: This is O(n) in number of proxies. O(log n) solution using appropriate data structure?
		Version oldestProxyVersion = req.version;
		for (auto itr = self->proxyInfoMap.begin(); itr != self->proxyInfoMap.end(); ++itr) {
			//TraceEvent("ResolveBatchProxyVersion", self->dbgid).detail("CommitProxy", itr->first).detail("Version", itr->second.lastVersion);
			if (itr->first.isValid()) { // Don't consider the first master request
				oldestProxyVersion = std::min(itr->second.lastVersion, oldestProxyVersion);
			} else {
				// The master's request version should never prevent us from clearing recentStateTransactions
				ASSERT(self->debugMinRecentStateVersion == 0 ||
				       self->debugMinRecentStateVersion > itr->second.lastVersion);
			}
		}

		TEST(oldestProxyVersion == req.version); // The proxy that sent this request has the oldest current version
		TEST(oldestProxyVersion !=
		     req.version); // The proxy that sent this request does not have the oldest current version

		bool anyPopped = false;
		if (firstUnseenVersion <= oldestProxyVersion && self->proxyInfoMap.size() == self->commitProxyCount + 1) {
			TEST(true); // Deleting old state transactions
			self->recentStateTransactions.erase(self->recentStateTransactions.begin(),
			                                    self->recentStateTransactions.upper_bound(oldestProxyVersion));
			self->debugMinRecentStateVersion = oldestProxyVersion + 1;

			while (self->recentStateTransactionSizes.size() &&
			       self->recentStateTransactionSizes.front().first <= oldestProxyVersion) {
				anyPopped = true;
				stateBytes -= self->recentStateTransactionSizes.front().second;
				self->recentStateTransactionSizes.pop_front();
			}
		}

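		// Publishing the new version wakes batches waiting in the ordering loop above. If old state was discarded or
		// the memory limit was just crossed, also trigger checkNeededVersion so queued batches re-evaluate whether
		// they need to advertise a neededVersion.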
		self->version.set(req.version);
		bool breachedLimit = self->totalStateBytes.get() <= SERVER_KNOBS->RESOLVER_STATE_MEMORY_LIMIT &&
		                     self->totalStateBytes.get() + stateBytes > SERVER_KNOBS->RESOLVER_STATE_MEMORY_LIMIT;
		self->totalStateBytes.setUnconditional(self->totalStateBytes.get() + stateBytes);
		if (anyPopped || breachedLimit) {
			self->checkNeededVersion.trigger();
		}

		if (req.debugID.present())
			g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "Resolver.resolveBatch.After");
	} else {
		TEST(true); // Duplicate resolve batch request
		//TraceEvent("DupResolveBatchReq", self->dbgid).detail("From", proxyAddress);
	}

	auto proxyInfoItr = self->proxyInfoMap.find(proxyAddress);

	if (proxyInfoItr != self->proxyInfoMap.end()) {
		auto batchItr = proxyInfoItr->second.outstandingBatches.find(req.version);
		if (batchItr != proxyInfoItr->second.outstandingBatches.end()) {
			req.reply.send(batchItr->second);
		} else {
			TEST(true); // No outstanding batches for version on proxy
			req.reply.send(Never());
		}
	} else {
		ASSERT_WE_THINK(false); // The first non-duplicate request with this proxyAddress, including this one, should
		                        // have inserted this item in the map!
		// TEST(true); // No prior proxy requests
		req.reply.send(Never());
	}

	++self->resolveBatchOut;

	return Void();
}

namespace {

// TODO: refactor with the one in CommitProxyServer.actor.cpp
struct TransactionStateResolveContext {
	// Maximum sequence number for the txnStateRequest; it becomes known when the request with the `last` flag is
	// received.
	Sequence maxSequence = std::numeric_limits<Sequence>::max();

	// Marks the sequences of the transaction state requests received so far; the transaction state is only
	// processed once *all* requests have been received.
	std::unordered_set<Sequence> receivedSequences;

	Reference<Resolver> pResolverData;

	// Pointer to transaction state store, shortcut for commitData.txnStateStore
	IKeyValueStore* pTxnStateStore = nullptr;

	// Actor streams
	PromiseStream<Future<Void>>* pActors = nullptr;

	// Reports whether the transaction state request has been fully processed. This request should only happen
	// during recovery, i.e. once per Resolver.
	bool processed = false;

	TransactionStateResolveContext() = default;

	TransactionStateResolveContext(Reference<Resolver> pResolverData_, PromiseStream<Future<Void>>* pActors_)
	  : pResolverData(pResolverData_), pTxnStateStore(pResolverData_->txnStateStore), pActors(pActors_) {
		ASSERT(pTxnStateStore != nullptr);
	}
};

ACTOR Future<Void> processCompleteTransactionStateRequest(TransactionStateResolveContext* pContext) {
	state KeyRange txnKeys = allKeys;
	state std::map<Tag, UID> tag_uid;

	RangeResult UIDtoTagMap = pContext->pTxnStateStore->readRange(serverTagKeys).get();
	for (const KeyValueRef& kv : UIDtoTagMap) {
		tag_uid[decodeServerTagValue(kv.value)] = decodeServerTagKey(kv.key);
	}

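	// Replay the entire transaction state store in bounded chunks: ordinary keys are re-applied locally as metadata
	// mutations, while keyServers entries are decoded to rebuild keyInfo (key range -> storage server tags).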
	loop {
		wait(yield());

		RangeResult data =
		    pContext->pTxnStateStore
		        ->readRange(txnKeys, SERVER_KNOBS->BUGGIFIED_ROW_LIMIT, SERVER_KNOBS->APPLY_MUTATION_BYTES)
		        .get();
		if (!data.size())
			break;

		((KeyRangeRef&)txnKeys) = KeyRangeRef(keyAfter(data.back().key, txnKeys.arena()), txnKeys.end);

		MutationsVec mutations;
		std::vector<std::pair<MapPair<Key, ServerCacheInfo>, int>> keyInfoData;
		std::vector<UID> src, dest;
		ServerCacheInfo info;
		// NOTE: An ACTOR is compiled into several classes; the `this` captured here refers to one of them,
		// which is why pContext is accessible inside the lambda.
		auto updateTagInfo = [this](const std::vector<UID>& uids,
		                            std::vector<Tag>& tags,
		                            std::vector<Reference<StorageInfo>>& storageInfoItems) {
			for (const auto& id : uids) {
				auto storageInfo = getStorageInfo(id, &pContext->pResolverData->storageCache, pContext->pTxnStateStore);
				ASSERT(storageInfo->tag != invalidTag);
				tags.push_back(storageInfo->tag);
				storageInfoItems.push_back(storageInfo);
			}
		};
		for (auto& kv : data) {
			if (!kv.key.startsWith(keyServersPrefix)) {
				mutations.emplace_back(mutations.arena(), MutationRef::SetValue, kv.key, kv.value);
				continue;
			}

			KeyRef k = kv.key.removePrefix(keyServersPrefix);
			if (k == allKeys.end) {
				continue;
			}
			decodeKeyServersValue(tag_uid, kv.value, src, dest);

			info.tags.clear();

			info.src_info.clear();
			updateTagInfo(src, info.tags, info.src_info);

			info.dest_info.clear();
			updateTagInfo(dest, info.tags, info.dest_info);

			uniquify(info.tags);
			keyInfoData.emplace_back(MapPair<Key, ServerCacheInfo>(k, info), 1);
		}

		// insert keyTag data separately from metadata mutations so that we can do one bulk insert which
		// avoids a lot of map lookups.
		pContext->pResolverData->keyInfo.rawInsert(keyInfoData);

		ResolverData resolverData(
		    pContext->pResolverData->dbgid, pContext->pTxnStateStore, &pContext->pResolverData->keyInfo);

		applyMetadataMutations(SpanID(), resolverData, mutations);
	} // loop

	auto lockedKey = pContext->pTxnStateStore->readValue(databaseLockedKey).get();
	// pContext->pCommitData->locked = lockedKey.present() && lockedKey.get().size();
	// pContext->pCommitData->metadataVersion = pContext->pTxnStateStore->readValue(metadataVersionKey).get();

	pContext->pTxnStateStore->enableSnapshot();

	return Void();
}

ACTOR Future<Void> processTransactionStateRequestPart(TransactionStateResolveContext* pContext,
                                                      TxnStateRequest request) {
	state const TxnStateRequest& req = request;
	state Resolver& resolverData = *pContext->pResolverData;
	state PromiseStream<Future<Void>>& addActor = *pContext->pActors;
	state Sequence& maxSequence = pContext->maxSequence;
	state ReplyPromise<Void> reply = req.reply;
	state std::unordered_set<Sequence>& txnSequences = pContext->receivedSequences;

	ASSERT(pContext->pResolverData.getPtr() != nullptr);
	ASSERT(pContext->pActors != nullptr);

	if (pContext->receivedSequences.count(request.sequence)) {
		// This part has already been received. Still, re-broadcast it to the other commit proxies and resolvers.
		pContext->pActors->send(broadcastTxnRequest(request, SERVER_KNOBS->TXN_STATE_SEND_AMOUNT, true));
		wait(yield());
		return Void();
	}

	if (request.last) {
		// This is the last piece of the sequence, but other pieces may still be on the way.
		pContext->maxSequence = request.sequence + 1;
	}
	pContext->receivedSequences.insert(request.sequence);

	// ASSERT(!pContext->pResolverData->validState.isSet());

	for (auto& kv : request.data) {
		pContext->pTxnStateStore->set(kv, &request.arena);
	}
	pContext->pTxnStateStore->commit(true);

	if (pContext->receivedSequences.size() == pContext->maxSequence) {
		// Received all components of the txnStateRequest
		ASSERT(!pContext->processed);
		wait(processCompleteTransactionStateRequest(pContext));
		pContext->processed = true;
	}

	pContext->pActors->send(broadcastTxnRequest(request, SERVER_KNOBS->TXN_STATE_SEND_AMOUNT, true));
	wait(yield());
	return Void();
}

} // anonymous namespace

ACTOR Future<Void> resolverCore(ResolverInterface resolver,
                                InitializeResolverRequest initReq,
                                Reference<AsyncVar<ServerDBInfo> const> db) {
	state Reference<Resolver> self(new Resolver(resolver.id(), initReq.commitProxyCount, initReq.resolverCount));
	state ActorCollection actors(false);
	state Future<Void> doPollMetrics = self->resolverCount > 1 ? Void() : Future<Void>(Never());
	actors.add(waitFailureServer(resolver.waitFailure.getFuture()));
	actors.add(traceRole(Role::RESOLVER, resolver.id()));

	TraceEvent("ResolverInit", resolver.id()).detail("RecoveryCount", initReq.recoveryCount);

	// Wait until we can load the "real" logsystem, since we don't support switching them currently
	while (!(db->get().master.id() == initReq.masterId &&
	         db->get().recoveryState >= RecoveryState::RECOVERY_TRANSACTION)) {
		//TraceEvent("ResolverInit2", resolver.id()).detail("LSEpoch", db->get().logSystemConfig.epoch);
		wait(db->onChange());
	}

	// Initialize txnStateStore
	state PromiseStream<Future<Void>> addActor;
	state Future<Void> onError =
	    transformError(actorCollection(addActor.getFuture()), broken_promise(), master_resolver_failed());
	self->logSystem = ILogSystem::fromServerDBInfo(resolver.id(), db->get(), false, addActor);
	self->logAdapter = new LogSystemDiskQueueAdapter(self->logSystem, Reference<AsyncVar<PeekTxsInfo>>(), 1, false);
	self->txnStateStore = keyValueStoreLogSystem(self->logAdapter, resolver.id(), 2e9, true, true, true);

	// wait for txnStateStore recovery
	wait(success(self->txnStateStore->readValue(StringRef())));

	// This has to be declared after self->txnStateStore gets initialized
	state TransactionStateResolveContext transactionStateResolveContext(self, &addActor);

	loop choose {
		when(ResolveTransactionBatchRequest batch = waitNext(resolver.resolve.getFuture())) {
			actors.add(resolveBatch(self, batch));
		}
		when(ResolutionMetricsRequest req = waitNext(resolver.metrics.getFuture())) {
			++self->metricsRequests;
			req.reply.send(self->iopsSample.getEstimate(allKeys));
		}
		when(ResolutionSplitRequest req = waitNext(resolver.split.getFuture())) {
			++self->splitRequests;
			ResolutionSplitReply rep;
			rep.key = self->iopsSample.splitEstimate(req.range, req.offset, req.front);
			rep.used = self->iopsSample.getEstimate(req.front ? KeyRangeRef(req.range.begin, rep.key)
			                                                  : KeyRangeRef(rep.key, req.range.end));
			req.reply.send(rep);
		}
		when(wait(actors.getResult())) {}
		when(wait(onError)) {}
		when(wait(doPollMetrics)) {
			self->iopsSample.poll();
			doPollMetrics = delay(SERVER_KNOBS->SAMPLE_POLL_TIME);
		}
		when(TxnStateRequest request = waitNext(resolver.txnState.getFuture())) {
			addActor.send(processTransactionStateRequestPart(&transactionStateResolveContext, request));
		}
	}
}

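// Throws worker_removed() once a recovery at or after recoveryCount no longer lists this resolver in ServerDBInfo.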
ACTOR Future<Void> checkRemoved(Reference<AsyncVar<ServerDBInfo> const> db,
                                uint64_t recoveryCount,
                                ResolverInterface myInterface) {
	loop {
		if (db->get().recoveryCount >= recoveryCount &&
		    !std::count(db->get().resolvers.begin(), db->get().resolvers.end(), myInterface))
			throw worker_removed();
		wait(db->onChange());
	}
}

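// Runs the resolver core and terminates cleanly when the worker is removed by a later recovery or the actor is
// cancelled.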
ACTOR Future<Void> resolver(ResolverInterface resolver,
                            InitializeResolverRequest initReq,
                            Reference<AsyncVar<ServerDBInfo> const> db) {
	try {
		state Future<Void> core = resolverCore(resolver, initReq, db);
		loop choose {
			when(wait(core)) { return Void(); }
			when(wait(checkRemoved(db, initReq.recoveryCount, resolver))) {}
		}
	} catch (Error& e) {
		if (e.code() == error_code_actor_cancelled || e.code() == error_code_worker_removed) {
			TraceEvent("ResolverTerminated", resolver.id()).error(e, true);
			return Void();
		}
		throw;
	}
}