Add original changes

This commit is contained in:
Lukas Joswiak 2020-08-27 16:16:05 -07:00
parent 1ca7fe1a05
commit 2a58e775d2
6 changed files with 31 additions and 22 deletions

View File

@ -39,10 +39,10 @@ inline bool isMetadataMutation(MutationRef const& m) {
Reference<StorageInfo> getStorageInfo(UID id, std::map<UID, Reference<StorageInfo>>* storageCache, IKeyValueStore* txnStateStore);
void applyMetadataMutations(ProxyCommitData& proxyCommitData, Arena& arena, Reference<ILogSystem> logSystem,
const VectorRef<MutationRef>& mutations, LogPushData* pToCommit, bool& confChange,
Version popVersion, bool initialCommit);
void applyMetadataMutations(const UID& dbgid, Arena& arena, const VectorRef<MutationRef>& mutations,
IKeyValueStore* txnStateStore);
void applyMetadataMutations(SpanID const& spanContext, ProxyCommitData& proxyCommitData, Arena& arena,
Reference<ILogSystem> logSystem, const VectorRef<MutationRef>& mutations,
LogPushData* pToCommit, bool& confChange, Version popVersion, bool initialCommit);
void applyMetadataMutations(SpanID const& spanContext, const UID& dbgid, Arena& arena,
const VectorRef<MutationRef>& mutations, IKeyValueStore* txnStateStore);
#endif

View File

@ -679,7 +679,7 @@ struct ILogSystem {
// Never returns normally, but throws an error if the subsystem stops working
//Future<Void> push( UID bundle, int64_t seq, VectorRef<TaggedMessageRef> messages );
virtual Future<Version> push( Version prevVersion, Version version, Version knownCommittedVersion, Version minKnownCommittedVersion, struct LogPushData& data, Optional<UID> debugID = Optional<UID>() ) = 0;
virtual Future<Version> push( Version prevVersion, Version version, Version knownCommittedVersion, Version minKnownCommittedVersion, struct LogPushData& data, SpanID const& spanContext, Optional<UID> debugID = Optional<UID>() ) = 0;
// Waits for the version number of the bundle (in this epoch) to be prevVersion (i.e. for all pushes ordered earlier)
// Puts the given messages into the bundle, each with the given tags, and with message versions (version, 0) - (version, N)
// Changes the version number of the bundle to be version (unblocking the next push)

View File

@ -296,6 +296,8 @@ ACTOR Future<Void> addBackupMutations(ProxyCommitData* self, std::map<Key, Mutat
state int yieldBytes = 0;
state BinaryWriter valueWriter(Unversioned());
toCommit->addTransactionInfo(SpanID());
// Serialize the log range mutations within the map
for (; logRangeMutation != logRangeMutations->end(); ++logRangeMutation)
{
@ -357,7 +359,7 @@ ACTOR Future<Void> addBackupMutations(ProxyCommitData* self, std::map<Key, Mutat
auto& tags = self->tagsForKey(backupMutation.param1);
toCommit->addTags(tags);
toCommit->addTypedMessage(backupMutation);
toCommit->writeTypedMessage(backupMutation);
// if (DEBUG_MUTATION("BackupProxyCommit", commitVersion, backupMutation)) {
// TraceEvent("BackupProxyCommitTo", self->dbgid).detail("To", describe(tags)).detail("BackupMutation", backupMutation.toString())
@ -396,7 +398,7 @@ struct CommitBatchContext {
int batchOperations = 0;
Span span = Span("MP:commitBatch"_loc);
Span span;
int64_t batchBytes = 0;
@ -476,7 +478,9 @@ CommitBatchContext::CommitBatchContext(ProxyCommitData* const pProxyCommitData_,
localBatchNumber(++pProxyCommitData->localCommitBatchesStarted), toCommit(pProxyCommitData->logSystem),
committed(trs.size()) {
committed(trs.size()),
span("MP:commitBatch"_loc) {
evaluateBatchSize();
@ -671,7 +675,7 @@ void applyMetadataEffect(CommitBatchContext* self) {
for (int resolver = 0; resolver < self->resolution.size(); resolver++)
committed = committed && self->resolution[resolver].stateMutations[versionIndex][transactionIndex].committed;
if (committed) {
applyMetadataMutations(*self->pProxyCommitData, self->arena, self->pProxyCommitData->logSystem,
applyMetadataMutations(SpanID(), *self->pProxyCommitData, self->arena, self->pProxyCommitData->logSystem,
self->resolution[0].stateMutations[versionIndex][transactionIndex].mutations,
/* pToCommit= */ nullptr, self->forceRecovery,
/* popVersion= */ 0, /* initialCommit */ false);
@ -754,7 +758,7 @@ ACTOR Future<Void> applyMetadataToCommittedTransactions(CommitBatchContext* self
for (t = 0; t < trs.size() && !self->forceRecovery; t++) {
if (self->committed[t] == ConflictBatch::TransactionCommitted && (!self->locked || trs[t].isLockAware())) {
self->commitCount++;
applyMetadataMutations(*pProxyCommitData, self->arena, pProxyCommitData->logSystem,
applyMetadataMutations(trs[t].spanContext, *pProxyCommitData, self->arena, pProxyCommitData->logSystem,
trs[t].transaction.mutations, &self->toCommit, self->forceRecovery,
self->commitVersion + 1, /* initialCommit= */ false);
}
@ -803,6 +807,9 @@ ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) {
state Optional<ClientTrCommitCostEstimation>* trCost = &trs[self->transactionNum].commitCostEstimation;
state int mutationNum = 0;
state VectorRef<MutationRef>* pMutations = &trs[self->transactionNum].transaction.mutations;
self->toCommit.addTransactionInfo(trs[self->transactionNum].spanContext);
for (; mutationNum < pMutations->size(); mutationNum++) {
if(self->yieldBytes > SERVER_KNOBS->DESIRED_TOTAL_BYTES) {
self->yieldBytes = 0;
@ -857,7 +864,7 @@ ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) {
if(pProxyCommitData->cacheInfo[m.param1]) {
self->toCommit.addTag(cacheTag);
}
self->toCommit.addTypedMessage(m);
self->toCommit.writeTypedMessage(m);
}
else if (m.type == MutationRef::ClearRange) {
KeyRangeRef clearRange(KeyRangeRef(m.param1, m.param2));
@ -908,7 +915,7 @@ ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) {
if(pProxyCommitData->needsCacheTag(clearRange)) {
self->toCommit.addTag(cacheTag);
}
self->toCommit.addTypedMessage(m);
self->toCommit.writeTypedMessage(m);
} else {
UNREACHABLE();
}
@ -1049,7 +1056,7 @@ ACTOR Future<Void> postResolution(CommitBatchContext* self) {
if(firstMessage) {
self->toCommit.addTxsTag();
}
self->toCommit.addMessage(StringRef(m.begin(), m.size()), !firstMessage);
self->toCommit.writeMessage(StringRef(m.begin(), m.size()), !firstMessage);
firstMessage = false;
}
@ -1064,7 +1071,7 @@ ACTOR Future<Void> postResolution(CommitBatchContext* self) {
self->commitStartTime = now();
pProxyCommitData->lastStartCommit = self->commitStartTime;
self->loggingComplete = pProxyCommitData->logSystem->push( self->prevVersion, self->commitVersion, pProxyCommitData->committedVersion.get(), pProxyCommitData->minKnownCommittedVersion, self->toCommit, self->debugID );
self->loggingComplete = pProxyCommitData->logSystem->push( self->prevVersion, self->commitVersion, pProxyCommitData->committedVersion.get(), pProxyCommitData->minKnownCommittedVersion, self->toCommit, self->span.context, self->debugID );
if (!self->forceRecovery) {
ASSERT(pProxyCommitData->latestLocalCommitBatchLogging.get() == self->localBatchNumber-1);
@ -1806,7 +1813,7 @@ ACTOR Future<Void> masterProxyServerCore(
Arena arena;
bool confChanges;
applyMetadataMutations(commitData, arena, Reference<ILogSystem>(), mutations,
applyMetadataMutations(SpanID(), commitData, arena, Reference<ILogSystem>(), mutations,
/* pToCommit= */ nullptr, confChanges,
/* popVersion= */ 0, /* initialCommit= */ true);
}

View File

@ -240,6 +240,7 @@ struct TLogCommitReply {
struct TLogCommitRequest {
constexpr static FileIdentifier file_identifier = 4022206;
SpanID spanContext;
Arena arena;
Version prevVersion, version, knownCommittedVersion, minKnownCommittedVersion;
@ -249,11 +250,11 @@ struct TLogCommitRequest {
Optional<UID> debugID;
TLogCommitRequest() {}
TLogCommitRequest( const Arena& a, Version prevVersion, Version version, Version knownCommittedVersion, Version minKnownCommittedVersion, StringRef messages, Optional<UID> debugID )
: arena(a), prevVersion(prevVersion), version(version), knownCommittedVersion(knownCommittedVersion), minKnownCommittedVersion(minKnownCommittedVersion), messages(messages), debugID(debugID) {}
TLogCommitRequest( const SpanID& context, const Arena& a, Version prevVersion, Version version, Version knownCommittedVersion, Version minKnownCommittedVersion, StringRef messages, Optional<UID> debugID )
: spanContext(context), arena(a), prevVersion(prevVersion), version(version), knownCommittedVersion(knownCommittedVersion), minKnownCommittedVersion(minKnownCommittedVersion), messages(messages), debugID(debugID) {}
template <class Ar>
void serialize( Ar& ar ) {
serializer(ar, prevVersion, version, knownCommittedVersion, minKnownCommittedVersion, messages, reply, arena, debugID);
serializer(ar, prevVersion, version, knownCommittedVersion, minKnownCommittedVersion, messages, reply, arena, spanContext, debugID);
}
};

View File

@ -527,7 +527,8 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
Future<Version> push(Version prevVersion, Version version, Version knownCommittedVersion,
Version minKnownCommittedVersion, LogPushData& data, Optional<UID> debugID) final {
Version minKnownCommittedVersion, LogPushData& data,
SpanID const& spanContext, Optional<UID> debugID) final {
// FIXME: Randomize request order as in LegacyLogSystem?
vector<Future<Void>> quorumResults;
vector<Future<TLogCommitReply>> allReplies;
@ -542,7 +543,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
vector<Future<Void>> tLogCommitResults;
for(int loc=0; loc< it->logServers.size(); loc++) {
Standalone<StringRef> msg = data.getMessages(location);
allReplies.push_back( recordPushMetrics( it->connectionResetTrackers[loc], it->logServers[loc]->get().interf().address(), it->logServers[loc]->get().interf().commit.getReply( TLogCommitRequest( msg.arena(), prevVersion, version, knownCommittedVersion, minKnownCommittedVersion, msg, debugID ), TaskPriority::ProxyTLogCommitReply ) ) );
allReplies.push_back( recordPushMetrics( it->connectionResetTrackers[loc], it->logServers[loc]->get().interf().address(), it->logServers[loc]->get().interf().commit.getReply( TLogCommitRequest( spanContext, msg.arena(), prevVersion, version, knownCommittedVersion, minKnownCommittedVersion, msg, debugID ), TaskPriority::ProxyTLogCommitReply ) ) );
Future<Void> commitSuccess = success(allReplies.back());
addActor.get().send(commitSuccess);
tLogCommitResults.push_back(commitSuccess);

View File

@ -1613,7 +1613,7 @@ ACTOR Future<Void> masterCore( Reference<MasterData> self ) {
}
}
applyMetadataMutations(self->dbgid, recoveryCommitRequest.arena, tr.mutations.slice(mmApplied, tr.mutations.size()),
applyMetadataMutations(SpanID(), self->dbgid, recoveryCommitRequest.arena, tr.mutations.slice(mmApplied, tr.mutations.size()),
self->txnStateStore);
mmApplied = tr.mutations.size();