Add private mutations to Resolver reply messages

This commit is contained in:
Jingyu Zhou 2021-09-30 17:17:14 -07:00 committed by Dan Lambright
parent ad64ee9858
commit 9ecdbc39ee
5 changed files with 31 additions and 2 deletions

View File

@ -47,6 +47,7 @@
#include "fdbserver/WaitFailure.h"
#include "fdbserver/WorkerInterface.actor.h"
#include "flow/ActorCollection.h"
#include "flow/Error.h"
#include "flow/IRandom.h"
#include "flow/Knobs.h"
#include "flow/Trace.h"
@ -936,6 +937,13 @@ ACTOR Future<Void> applyMetadataToCommittedTransactions(CommitBatchContext* self
self->forceRecovery = false;
}
}
auto privateMutations = self->toCommit.getAllMessages();
ResolveTransactionBatchReply& reply = self->resolution[0];
ASSERT_WE_THINK(privateMutations.size() == reply.privateMutations.size());
for (int i = 0; i < privateMutations.size(); i++) {
std::cout << i << "\n" << printable(privateMutations[i]) << "\n" << printable(reply.privateMutations[i]) << "\n\n";
ASSERT_WE_THINK(privateMutations[i] == reply.privateMutations[i]);
}
if (self->forceRecovery) {
for (; t < trs.size(); t++)
self->committed[t] = ConflictBatch::TransactionConflict;

View File

@ -308,6 +308,15 @@ void LogPushData::writeMessage(StringRef rawMessageWithoutLength, bool usePrevio
}
}
std::vector<Standalone<StringRef>> LogPushData::getAllMessages() {
std::vector<Standalone<StringRef>> results;
results.reserve(messagesWriter.size());
for (int loc = 0; loc < messagesWriter.size(); loc++) {
results.push_back(getMessages(loc));
}
return results;
}
void LogPushData::recordEmptyMessage(int loc, const Standalone<StringRef>& value) {
if (!isEmptyMessage[loc]) {
BinaryWriter w(AssumeVersion(g_network->protocolVersion()));

View File

@ -794,6 +794,9 @@ struct LogPushData : NonCopyable {
Standalone<StringRef> getMessages(int loc) { return messagesWriter[loc].toValue(); }
// Returns all locations' messages, including empty ones.
std::vector<Standalone<StringRef>> getAllMessages();
// Records if a tlog (specified by "loc") will receive an empty version batch message.
// "value" is the message returned by getMessages() call.
void recordEmptyMessage(int loc, const Standalone<StringRef>& value);

View File

@ -257,7 +257,13 @@ ACTOR Future<Void> resolveBatch(Reference<Resolver> self, ResolveTransactionBatc
applyMetadataMutations(SpanID(), resolverData, req.transactions[t].mutations);
}
}
// TODO: add private mutations & resolverData.confChanges to Reply messages
// Adds private mutation messages to the reply message.
auto privateMutations = toCommit.getAllMessages();
for (const auto& mutations : privateMutations) {
reply.privateMutations.push_back(reply.arena, mutations);
reply.arena.dependsOn(mutations.arena());
}
self->resolvedStateTransactions += req.txnStateTransactions.size();
self->resolvedStateMutations += stateMutations;

View File

@ -90,9 +90,12 @@ struct ResolveTransactionBatchReply {
std::map<int, VectorRef<int>>
conflictingKeyRangeMap; // transaction index -> conflicting read_conflict_range ids given by the resolver
// Privatized mutations with tags, one for each TLog location
VectorRef<StringRef> privateMutations;
template <class Archive>
void serialize(Archive& ar) {
serializer(ar, committed, stateMutations, debugID, conflictingKeyRangeMap, arena);
serializer(ar, committed, stateMutations, debugID, conflictingKeyRangeMap, privateMutations, arena);
}
};