Separate \xff keyspace from keyResolvers
This is needed to make sure all Resolvers process all metadata mutations. For the commit proxy, we add all read/write conflict ranges for metadata mutations to all resolvers. We also send the lock_aware flag for each transaction to the resolvers.
This commit is contained in:
parent
b47da4f2ea
commit
34e9932778
|
@ -184,9 +184,10 @@ struct CommitTransactionRef {
|
|||
report_conflicting_keys(from.report_conflicting_keys) {}
|
||||
VectorRef<KeyRangeRef> read_conflict_ranges;
|
||||
VectorRef<KeyRangeRef> write_conflict_ranges;
|
||||
VectorRef<MutationRef> mutations;
|
||||
VectorRef<MutationRef> mutations; // metadata mutations
|
||||
Version read_snapshot;
|
||||
bool report_conflicting_keys;
|
||||
bool lock_aware; // set when metadata mutations are present
|
||||
SpanID spanContext;
|
||||
|
||||
template <class Ar>
|
||||
|
@ -198,6 +199,7 @@ struct CommitTransactionRef {
|
|||
mutations,
|
||||
read_snapshot,
|
||||
report_conflicting_keys,
|
||||
lock_aware,
|
||||
spanContext);
|
||||
} else {
|
||||
serializer(ar, read_conflict_ranges, write_conflict_ranges, mutations, read_snapshot);
|
||||
|
@ -205,7 +207,7 @@ struct CommitTransactionRef {
|
|||
serializer(ar, report_conflicting_keys);
|
||||
}
|
||||
if (ar.protocolVersion().hasSpanContext()) {
|
||||
serializer(ar, spanContext);
|
||||
serializer(ar, lock_aware, spanContext);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,8 +21,9 @@
|
|||
#include <algorithm>
|
||||
#include <tuple>
|
||||
|
||||
#include <fdbclient/DatabaseContext.h>
|
||||
#include "fdbclient/Atomic.h"
|
||||
#include "fdbclient/CommitTransaction.h"
|
||||
#include "fdbclient/DatabaseContext.h"
|
||||
#include "fdbclient/FDBTypes.h"
|
||||
#include "fdbclient/Knobs.h"
|
||||
#include "fdbclient/CommitProxyInterface.h"
|
||||
|
@ -87,14 +88,19 @@ ACTOR void discardCommit(UID id, Future<LogSystemDiskQueueAdapter::CommitMessage
|
|||
}
|
||||
|
||||
struct ResolutionRequestBuilder {
|
||||
ProxyCommitData* self;
|
||||
const ProxyCommitData* self;
|
||||
|
||||
// One request per resolver.
|
||||
std::vector<ResolveTransactionBatchRequest> requests;
|
||||
|
||||
// Txn i to resolvers that have i'th data sent
|
||||
std::vector<std::vector<int>> transactionResolverMap;
|
||||
std::vector<CommitTransactionRef*> outTr;
|
||||
std::vector<std::vector<std::vector<int>>>
|
||||
txReadConflictRangeIndexMap; // Used to report conflicting keys, the format is
|
||||
// [CommitTransactionRef_Index][Resolver_Index][Read_Conflict_Range_Index_on_Resolver]
|
||||
// -> read_conflict_range's original index in the commitTransactionRef
|
||||
|
||||
// Used to report conflicting keys, the format is
|
||||
// [CommitTransactionRef_Index][Resolver_Index][Read_Conflict_Range_Index_on_Resolver]
|
||||
// -> read_conflict_range's original index in the commitTransactionRef
|
||||
std::vector<std::vector<std::vector<int>>> txReadConflictRangeIndexMap;
|
||||
|
||||
ResolutionRequestBuilder(ProxyCommitData* self,
|
||||
Version version,
|
||||
|
@ -121,39 +127,10 @@ struct ResolutionRequestBuilder {
|
|||
return *out;
|
||||
}
|
||||
|
||||
void addTransaction(CommitTransactionRequest& trRequest, int transactionNumberInBatch) {
|
||||
auto& trIn = trRequest.transaction;
|
||||
// SOMEDAY: There are a couple of unnecessary O( # resolvers ) steps here
|
||||
outTr.assign(requests.size(), nullptr);
|
||||
ASSERT(transactionNumberInBatch >= 0 && transactionNumberInBatch < 32768);
|
||||
|
||||
bool isTXNStateTransaction = false;
|
||||
for (auto& m : trIn.mutations) {
|
||||
if (m.type == MutationRef::SetVersionstampedKey) {
|
||||
transformVersionstampMutation(m, &MutationRef::param1, requests[0].version, transactionNumberInBatch);
|
||||
trIn.write_conflict_ranges.push_back(requests[0].arena, singleKeyRange(m.param1, requests[0].arena));
|
||||
} else if (m.type == MutationRef::SetVersionstampedValue) {
|
||||
transformVersionstampMutation(m, &MutationRef::param2, requests[0].version, transactionNumberInBatch);
|
||||
}
|
||||
if (isMetadataMutation(m)) {
|
||||
isTXNStateTransaction = true;
|
||||
getOutTransaction(0, trIn.read_snapshot).mutations.push_back(requests[0].arena, m);
|
||||
for (int r = 1; r < requests.size() && SERVER_KNOBS->PROXY_USE_RESOLVER_PRIVATE_MUTATIONS; r++) {
|
||||
// Keep metadata in sync on all resolvers
|
||||
getOutTransaction(r, trIn.read_snapshot).mutations.push_back(requests[r].arena, m);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (isTXNStateTransaction && !trRequest.isLockAware()) {
|
||||
// This mitigates https://github.com/apple/foundationdb/issues/3647. Since this transaction is not lock
|
||||
// aware, if this transaction got a read version then \xff/dbLocked must not have been set at this
|
||||
// transaction's read snapshot. If that changes by commit time, then it won't commit on any proxy because of
|
||||
// a conflict. A client could set a read version manually so this isn't totally bulletproof.
|
||||
trIn.read_conflict_ranges.push_back(trRequest.arena, KeyRangeRef(databaseLockedKey, databaseLockedKeyEnd));
|
||||
}
|
||||
std::vector<std::vector<int>> rCRIndexMap(
|
||||
requests.size()); // [resolver_index][read_conflict_range_index_on_the_resolver]
|
||||
// -> read_conflict_range's original index
|
||||
// Returns a read conflict index map: [resolver_index][read_conflict_range_index_on_the_resolver]
|
||||
// -> read_conflict_range's original index
|
||||
std::vector<std::vector<int>> addReadConflictRanges(CommitTransactionRef& trIn) {
|
||||
std::vector<std::vector<int>> rCRIndexMap(requests.size());
|
||||
for (int idx = 0; idx < trIn.read_conflict_ranges.size(); ++idx) {
|
||||
const auto& r = trIn.read_conflict_ranges[idx];
|
||||
auto ranges = self->keyResolvers.intersectingRanges(r);
|
||||
|
@ -166,6 +143,11 @@ struct ResolutionRequestBuilder {
|
|||
break;
|
||||
}
|
||||
}
|
||||
if (SERVER_KNOBS->PROXY_USE_RESOLVER_PRIVATE_MUTATIONS && systemKeys.intersects(r)) {
|
||||
for (int k = 0; k < self->resolvers.size(); k++) {
|
||||
resolvers.insert(k);
|
||||
}
|
||||
}
|
||||
ASSERT(resolvers.size());
|
||||
for (int resolver : resolvers) {
|
||||
getOutTransaction(resolver, trIn.read_snapshot)
|
||||
|
@ -173,17 +155,73 @@ struct ResolutionRequestBuilder {
|
|||
rCRIndexMap[resolver].push_back(idx);
|
||||
}
|
||||
}
|
||||
txReadConflictRangeIndexMap.push_back(std::move(rCRIndexMap));
|
||||
return rCRIndexMap;
|
||||
}
|
||||
|
||||
void addWriteConflictRanges(CommitTransactionRef& trIn) {
|
||||
for (auto& r : trIn.write_conflict_ranges) {
|
||||
auto ranges = self->keyResolvers.intersectingRanges(r);
|
||||
std::set<int> resolvers;
|
||||
for (auto& ir : ranges)
|
||||
resolvers.insert(ir.value().back().second);
|
||||
for (auto& ir : ranges) {
|
||||
auto& version_resolver = ir.value();
|
||||
if (!version_resolver.empty()) {
|
||||
resolvers.insert(version_resolver.back().second);
|
||||
}
|
||||
}
|
||||
if (SERVER_KNOBS->PROXY_USE_RESOLVER_PRIVATE_MUTATIONS && systemKeys.intersects(r)) {
|
||||
for (int k = 0; k < self->resolvers.size(); k++) {
|
||||
resolvers.insert(k);
|
||||
}
|
||||
}
|
||||
ASSERT(resolvers.size());
|
||||
for (int resolver : resolvers)
|
||||
getOutTransaction(resolver, trIn.read_snapshot)
|
||||
.write_conflict_ranges.push_back(requests[resolver].arena, r);
|
||||
}
|
||||
}
|
||||
|
||||
void addTransaction(CommitTransactionRequest& trRequest, Version ver, int transactionNumberInBatch) {
|
||||
auto& trIn = trRequest.transaction;
|
||||
// SOMEDAY: There are a couple of unnecessary O( # resolvers ) steps here
|
||||
outTr.assign(requests.size(), nullptr);
|
||||
ASSERT(transactionNumberInBatch >= 0 && transactionNumberInBatch < 32768);
|
||||
|
||||
bool isTXNStateTransaction = false;
|
||||
for (auto& m : trIn.mutations) {
|
||||
DEBUG_MUTATION("AddTr", ver, m, self->dbgid).detail("Idx", transactionNumberInBatch);
|
||||
if (m.type == MutationRef::SetVersionstampedKey) {
|
||||
transformVersionstampMutation(m, &MutationRef::param1, requests[0].version, transactionNumberInBatch);
|
||||
trIn.write_conflict_ranges.push_back(requests[0].arena, singleKeyRange(m.param1, requests[0].arena));
|
||||
} else if (m.type == MutationRef::SetVersionstampedValue) {
|
||||
transformVersionstampMutation(m, &MutationRef::param2, requests[0].version, transactionNumberInBatch);
|
||||
}
|
||||
if (isMetadataMutation(m)) {
|
||||
isTXNStateTransaction = true;
|
||||
auto& tr = getOutTransaction(0, trIn.read_snapshot);
|
||||
tr.mutations.push_back(requests[0].arena, m);
|
||||
tr.lock_aware = trRequest.isLockAware();
|
||||
|
||||
for (int r = 1; r < self->resolvers.size() && SERVER_KNOBS->PROXY_USE_RESOLVER_PRIVATE_MUTATIONS; r++) {
|
||||
// Keep metadata in sync on all resolvers
|
||||
auto& outTransaction = getOutTransaction(r, trIn.read_snapshot);
|
||||
outTransaction.mutations.push_back(requests[r].arena, m);
|
||||
outTransaction.lock_aware = trRequest.isLockAware();
|
||||
}
|
||||
}
|
||||
}
|
||||
if (isTXNStateTransaction && !trRequest.isLockAware()) {
|
||||
// This mitigates https://github.com/apple/foundationdb/issues/3647. Since this transaction is not lock
|
||||
// aware, if this transaction got a read version then \xff/dbLocked must not have been set at this
|
||||
// transaction's read snapshot. If that changes by commit time, then it won't commit on any proxy because of
|
||||
// a conflict. A client could set a read version manually so this isn't totally bulletproof.
|
||||
trIn.read_conflict_ranges.push_back(trRequest.arena, KeyRangeRef(databaseLockedKey, databaseLockedKeyEnd));
|
||||
}
|
||||
|
||||
std::vector<std::vector<int>> rCRIndexMap = addReadConflictRanges(trIn);
|
||||
txReadConflictRangeIndexMap.push_back(std::move(rCRIndexMap));
|
||||
|
||||
addWriteConflictRanges(trIn);
|
||||
|
||||
if (isTXNStateTransaction) {
|
||||
for (int r = 0; r < requests.size(); r++) {
|
||||
int transactionNumberInRequest =
|
||||
|
@ -752,7 +790,7 @@ ACTOR Future<Void> getResolution(CommitBatchContext* self) {
|
|||
int conflictRangeCount = 0;
|
||||
self->maxTransactionBytes = 0;
|
||||
for (int t = 0; t < trs.size(); t++) {
|
||||
requests.addTransaction(trs[t], t);
|
||||
requests.addTransaction(trs[t], self->commitVersion, t);
|
||||
conflictRangeCount +=
|
||||
trs[t].transaction.read_conflict_ranges.size() + trs[t].transaction.write_conflict_ranges.size();
|
||||
//TraceEvent("MPTransactionDump", self->dbgid).detail("Snapshot", trs[t].transaction.read_snapshot);
|
||||
|
@ -945,15 +983,18 @@ ACTOR Future<Void> applyMetadataToCommittedTransactions(CommitBatchContext* self
|
|||
self->forceRecovery = false;
|
||||
}
|
||||
}
|
||||
if (SERVER_KNOBS->PROXY_USE_RESOLVER_PRIVATE_MUTATIONS) {
|
||||
ResolveTransactionBatchReply& reply = self->resolution[0];
|
||||
self->toCommit.setMutations(reply.privateMutationCount, reply.privateMutations);
|
||||
}
|
||||
if (self->forceRecovery) {
|
||||
for (; t < trs.size(); t++)
|
||||
self->committed[t] = ConflictBatch::TransactionConflict;
|
||||
TraceEvent(SevWarn, "RestartingTxnSubsystem", pProxyCommitData->dbgid).detail("Stage", "AwaitCommit");
|
||||
}
|
||||
if (SERVER_KNOBS->PROXY_USE_RESOLVER_PRIVATE_MUTATIONS && !self->forceRecovery) {
|
||||
// TODO: when forceRecovery is set, the remaining transactions are marked as conflicted
|
||||
|
||||
ResolveTransactionBatchReply& reply = self->resolution[0];
|
||||
// ASSERT_WE_THINK(privateMutations.size() == reply.privateMutations.size());
|
||||
self->toCommit.setMutations(reply.privateMutationCount, reply.privateMutations);
|
||||
}
|
||||
|
||||
self->lockedKey = pProxyCommitData->txnStateStore->readValue(databaseLockedKey).get();
|
||||
self->lockedAfter = self->lockedKey.present() && self->lockedKey.get().size();
|
||||
|
@ -1531,7 +1572,18 @@ ACTOR Future<Void> reply(CommitBatchContext* self) {
|
|||
if (r->value().size() && r->value().front().first < oldestVersion)
|
||||
r->value().front().first = 0;
|
||||
}
|
||||
pProxyCommitData->keyResolvers.coalesce(allKeys);
|
||||
if (SERVER_KNOBS->PROXY_USE_RESOLVER_PRIVATE_MUTATIONS) {
|
||||
pProxyCommitData->keyResolvers.coalesce(normalKeys);
|
||||
auto& versions = pProxyCommitData->systemKeyVersions;
|
||||
while (versions.size() > 1 && versions[1] < oldestVersion) {
|
||||
versions.pop_front();
|
||||
}
|
||||
if (!versions.empty() && versions[0] < oldestVersion) {
|
||||
versions[0] = 0;
|
||||
}
|
||||
} else {
|
||||
pProxyCommitData->keyResolvers.coalesce(allKeys);
|
||||
}
|
||||
if (pProxyCommitData->keyResolvers.size() != lastSize)
|
||||
TraceEvent("KeyResolverSize", pProxyCommitData->dbgid)
|
||||
.detail("Size", pProxyCommitData->keyResolvers.size());
|
||||
|
@ -2205,9 +2257,12 @@ ACTOR Future<Void> commitProxyServerCore(CommitProxyInterface proxy,
|
|||
"ToResolver_" + commitData.resolvers[i].id().toString(),
|
||||
Histogram::Unit::microseconds));
|
||||
}
|
||||
auto rs = commitData.keyResolvers.modify(allKeys);
|
||||
|
||||
// Initialize keyResolvers map
|
||||
auto rs = commitData.keyResolvers.modify(SERVER_KNOBS->PROXY_USE_RESOLVER_PRIVATE_MUTATIONS ? normalKeys : allKeys);
|
||||
for (auto r = rs.begin(); r != rs.end(); ++r)
|
||||
r->value().emplace_back(0, 0);
|
||||
commitData.systemKeyVersions.push_back(0);
|
||||
|
||||
commitData.logSystem = ILogSystem::fromServerDBInfo(proxy.id(), commitData.db->get(), false, addActor);
|
||||
commitData.logAdapter =
|
||||
|
|
|
@ -176,6 +176,9 @@ struct ProxyCommitData {
|
|||
uint64_t commitVersionRequestNumber;
|
||||
uint64_t mostRecentProcessedRequestNumber;
|
||||
KeyRangeMap<Deque<std::pair<Version, int>>> keyResolvers;
|
||||
// When all resolvers process system keys (for private mutations), the "keyResolvers"
|
||||
// only tracks normalKeys. This is used for tracking versions for systemKeys.
|
||||
Deque<Version> systemKeyVersions;
|
||||
KeyRangeMap<ServerCacheInfo> keyInfo; // keyrange -> all storage servers in all DCs for the keyrange
|
||||
KeyRangeMap<bool> cacheInfo;
|
||||
std::map<Key, ApplyMutationsData> uid_applyMutationsData;
|
||||
|
|
|
@ -246,6 +246,11 @@ ACTOR Future<Void> resolveBatch(Reference<Resolver> self, ResolveTransactionBatc
|
|||
req.version + 1,
|
||||
&self->storageCache,
|
||||
&self->tssMapping);
|
||||
bool isLocked = false;
|
||||
if (SERVER_KNOBS->PROXY_USE_RESOLVER_PRIVATE_MUTATIONS) {
|
||||
auto lockedKey = self->txnStateStore->readValue(databaseLockedKey).get();
|
||||
isLocked = lockedKey.present() && lockedKey.get().size();
|
||||
}
|
||||
for (int t : req.txnStateTransactions) {
|
||||
stateMutations += req.transactions[t].mutations.size();
|
||||
stateBytes += req.transactions[t].mutations.expectedSize();
|
||||
|
@ -254,9 +259,12 @@ ACTOR Future<Void> resolveBatch(Reference<Resolver> self, ResolveTransactionBatc
|
|||
StateTransactionRef(reply.committed[t] == ConflictBatch::TransactionCommitted,
|
||||
req.transactions[t].mutations));
|
||||
|
||||
// for (const auto& m : req.transactions[t].mutations)
|
||||
// DEBUG_MUTATION("Resolver", req.version, m, self->dbgid);
|
||||
|
||||
// Generate private mutations for metadata mutations
|
||||
if (reply.committed[t] == ConflictBatch::TransactionCommitted &&
|
||||
SERVER_KNOBS->PROXY_USE_RESOLVER_PRIVATE_MUTATIONS) {
|
||||
SERVER_KNOBS->PROXY_USE_RESOLVER_PRIVATE_MUTATIONS && (!isLocked || req.transactions[t].lock_aware)) {
|
||||
applyMetadataMutations(req.transactions[t].spanContext, resolverData, req.transactions[t].mutations);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue