Generate private mutations at Resolvers

This commit is contained in:
Jingyu Zhou 2021-09-29 22:10:10 -07:00 committed by Dan Lambright
parent f79ca8d7fb
commit fbc6f45190
3 changed files with 50 additions and 14 deletions

View File

@ -27,6 +27,7 @@
#include "fdbserver/IKeyValueStore.h"
#include "fdbserver/LogProtocolMessage.h"
#include "fdbserver/LogSystem.h"
#include "flow/Error.h"
Reference<StorageInfo> getStorageInfo(UID id,
std::map<UID, Reference<StorageInfo>>* storageCache,
@ -81,8 +82,9 @@ public:
ResolverData& resolverData_,
const VectorRef<MutationRef>& mutations_)
: spanContext(spanContext_), dbgid(resolverData_.dbgid), arena(resolverData_.arena), mutations(mutations_),
txnStateStore(resolverData_.txnStateStore), confChange(resolverData_.confChanges),
keyInfo(resolverData_.keyInfo), initialCommit(true) {}
txnStateStore(resolverData_.txnStateStore), toCommit(resolverData_.toCommit),
confChange(resolverData_.confChanges), logSystem(resolverData_.logSystem), popVersion(resolverData_.popVersion),
keyInfo(resolverData_.keyInfo), initialCommit(resolverData_.initialCommit), forResolver(true) {}
private:
// The following variables are incoming parameters
@ -120,6 +122,9 @@ private:
// true if the mutations were already written to the txnStateStore as part of recovery
bool initialCommit = false;
// true if called from Resolver
bool forResolver = false;
private:
// The following variables are used internally
@ -256,7 +261,7 @@ private:
void checkSetStorageCachePrefix(MutationRef m) {
if (!m.param1.startsWith(storageCachePrefix))
return;
if (cacheInfo) {
if (cacheInfo || forResolver) {
KeyRef k = m.param1.removePrefix(storageCachePrefix);
// Create a private mutation for storage servers
@ -267,7 +272,7 @@ private:
//TraceEvent(SevDebug, "SendingPrivateMutation", dbgid).detail("Original", m.toString()).detail("Privatized", privatized.toString());
cachedRangeInfo[k] = privatized;
}
if (k != allKeys.end) {
if (cacheInfo && k != allKeys.end) {
KeyRef end = cacheInfo->rangeContaining(k).end();
std::vector<uint16_t> serverIndices;
decodeStorageCacheValue(m.param2, serverIndices);
@ -608,10 +613,7 @@ private:
}
void checkClearTagLocalityListKeys(KeyRangeRef range) {
if (!tagLocalityListKeys.intersects(range)) {
return;
}
if (initialCommit) {
if (!tagLocalityListKeys.intersects(range) || initialCommit) {
return;
}
txnStateStore->clear(range & tagLocalityListKeys);
@ -634,8 +636,12 @@ private:
.detail("PopVersion", popVersion)
.detail("Tag", tag.toString())
.detail("Server", decodeServerTagKey(kv.key));
logSystem->pop(popVersion, decodeServerTagValue(kv.value));
(*tag_popped)[tag] = popVersion;
if (!forResolver) {
logSystem->pop(popVersion, decodeServerTagValue(kv.value));
(*tag_popped)[tag] = popVersion;
}
ASSERT_WE_THINK(forResolver && tag_popped == nullptr);
ASSERT_WE_THINK(!forResolver && tag_popped != nullptr);
if (toCommit) {
MutationRef privatized = m;
@ -697,8 +703,12 @@ private:
.detail("PopVersion", popVersion)
.detail("Tag", tag.toString())
.detail("Version", decodeServerTagHistoryKey(kv.key));
logSystem->pop(popVersion, tag);
(*tag_popped)[tag] = popVersion;
if (!forResolver) {
logSystem->pop(popVersion, tag);
(*tag_popped)[tag] = popVersion;
}
ASSERT_WE_THINK(forResolver && tag_popped == nullptr);
ASSERT_WE_THINK(!forResolver && tag_popped != nullptr);
}
}
if (!initialCommit)

View File

@ -25,11 +25,13 @@
#include "fdbclient/BackupAgent.actor.h"
#include "fdbclient/MutationList.h"
#include "fdbclient/Notified.h"
#include "fdbclient/StorageServerInterface.h"
#include "fdbclient/SystemData.h"
#include "fdbserver/IKeyValueStore.h"
#include "fdbserver/LogProtocolMessage.h"
#include "fdbserver/LogSystem.h"
#include "fdbserver/ProxyCommitData.actor.h"
#include "flow/FastRef.h"
// Resolver's data for applyMetadataMutations() calls.
struct ResolverData {
@ -37,10 +39,25 @@ struct ResolverData {
IKeyValueStore* txnStateStore = nullptr;
KeyRangeMap<ServerCacheInfo>* keyInfo = nullptr;
Arena arena;
bool confChanges;
bool confChanges = false;
bool initialCommit = false;
Reference<ILogSystem> logSystem = Reference<ILogSystem>();
LogPushData* toCommit = nullptr;
Version popVersion = 0; // exclusive, usually set to commitVersion + 1
// For initial broadcast
ResolverData(UID debugId, IKeyValueStore* store, KeyRangeMap<ServerCacheInfo>* info)
: dbgid(debugId), txnStateStore(store), keyInfo(info) {}
: dbgid(debugId), txnStateStore(store), keyInfo(info), initialCommit(true) {}
// For transaction batches that contain metadata mutations
ResolverData(UID debugId,
Reference<ILogSystem> logSystem,
IKeyValueStore* store,
KeyRangeMap<ServerCacheInfo>* info,
LogPushData* toCommit,
Version popVersion)
: dbgid(debugId), txnStateStore(store), keyInfo(info), logSystem(logSystem), toCommit(toCommit),
popVersion(popVersion) {}
};
inline bool isMetadataMutation(MutationRef const& m) {

View File

@ -236,6 +236,9 @@ ACTOR Future<Void> resolveBatch(Reference<Resolver> self, ResolveTransactionBatc
auto& stateTransactions = self->recentStateTransactions[req.version];
int64_t stateMutations = 0;
int64_t stateBytes = 0;
LogPushData toCommit(self->logSystem); // For accumulating private mutations
ResolverData resolverData(
self->dbgid, self->logSystem, self->txnStateStore, &self->keyInfo, &toCommit, req.version + 1);
for (int t : req.txnStateTransactions) {
stateMutations += req.transactions[t].mutations.size();
stateBytes += req.transactions[t].mutations.expectedSize();
@ -243,7 +246,13 @@ ACTOR Future<Void> resolveBatch(Reference<Resolver> self, ResolveTransactionBatc
stateTransactions.arena(),
StateTransactionRef(reply.committed[t] == ConflictBatch::TransactionCommitted,
req.transactions[t].mutations));
// Generate private mutations for metadata mutations
if (reply.committed[t] == ConflictBatch::TransactionCommitted) {
applyMetadataMutations(SpanID(), resolverData, req.transactions[t].mutations);
}
}
// TODO: add private mutations & resolverData.confChanges to Reply messages
self->resolvedStateTransactions += req.txnStateTransactions.size();
self->resolvedStateMutations += stateMutations;