Rename CommitBatch::Context to CommitBatch::CommitBatchContext
This commit is contained in:
parent
d8bb36c4c8
commit
b9f41f3975
|
@ -595,7 +595,7 @@ ACTOR Future<Void> releaseResolvingAfter(ProxyCommitData* self, Future<Void> rel
|
|||
|
||||
namespace CommitBatch {
|
||||
|
||||
struct Context {
|
||||
struct CommitBatchContext {
|
||||
using StoreCommit_t = std::vector<std::pair<Future<LogSystemDiskQueueAdapter::CommitMessage>, Future<Void>>>;
|
||||
|
||||
ProxyCommitData* const pProxyCommitData;
|
||||
|
@ -673,10 +673,7 @@ struct Context {
|
|||
|
||||
double commitStartTime;
|
||||
|
||||
Context(
|
||||
ProxyCommitData*,
|
||||
const std::vector<CommitTransactionRequest>*,
|
||||
const int);
|
||||
CommitBatchContext(ProxyCommitData*, const std::vector<CommitTransactionRequest>*, const int);
|
||||
|
||||
void setupTraceBatch();
|
||||
|
||||
|
@ -684,21 +681,19 @@ private:
|
|||
void evaluateBatchSize();
|
||||
};
|
||||
|
||||
Context::Context(
|
||||
ProxyCommitData* const pProxyCommitData_,
|
||||
const std::vector<CommitTransactionRequest>* trs_,
|
||||
const int currentBatchMemBytesCount) :
|
||||
CommitBatchContext::CommitBatchContext(ProxyCommitData* const pProxyCommitData_,
|
||||
const std::vector<CommitTransactionRequest>* trs_,
|
||||
const int currentBatchMemBytesCount)
|
||||
:
|
||||
|
||||
pProxyCommitData(pProxyCommitData_),
|
||||
trs(std::move(*const_cast<std::vector<CommitTransactionRequest>*>(trs_))),
|
||||
currentBatchMemBytesCount(currentBatchMemBytesCount),
|
||||
pProxyCommitData(pProxyCommitData_), trs(std::move(*const_cast<std::vector<CommitTransactionRequest>*>(trs_))),
|
||||
currentBatchMemBytesCount(currentBatchMemBytesCount),
|
||||
|
||||
startTime(g_network->now()),
|
||||
startTime(g_network->now()),
|
||||
|
||||
localBatchNumber(++pProxyCommitData->localCommitBatchesStarted),
|
||||
toCommit(pProxyCommitData->logSystem),
|
||||
|
||||
committed(trs.size()) {
|
||||
localBatchNumber(++pProxyCommitData->localCommitBatchesStarted), toCommit(pProxyCommitData->logSystem),
|
||||
|
||||
committed(trs.size()) {
|
||||
|
||||
evaluateBatchSize();
|
||||
|
||||
|
@ -717,7 +712,7 @@ Context::Context(
|
|||
ASSERT(SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS <= SERVER_KNOBS->MAX_VERSIONS_IN_FLIGHT);
|
||||
}
|
||||
|
||||
void Context::setupTraceBatch() {
|
||||
void CommitBatchContext::setupTraceBatch() {
|
||||
for (const auto& tr : trs) {
|
||||
if (tr.debugID.present()) {
|
||||
if (!debugID.present()) {
|
||||
|
@ -742,7 +737,7 @@ void Context::setupTraceBatch() {
|
|||
}
|
||||
}
|
||||
|
||||
void Context::evaluateBatchSize() {
|
||||
void CommitBatchContext::evaluateBatchSize() {
|
||||
for (const auto& tr : trs) {
|
||||
const auto& mutations = tr.transaction.mutations;
|
||||
batchOperations += mutations.size();
|
||||
|
@ -750,7 +745,7 @@ void Context::evaluateBatchSize() {
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> preresolutionProcessing(Context* self) {
|
||||
ACTOR Future<Void> preresolutionProcessing(CommitBatchContext* self) {
|
||||
|
||||
state ProxyCommitData* const pProxyCommitData = self->pProxyCommitData;
|
||||
state std::vector<CommitTransactionRequest>& trs = self->trs;
|
||||
|
@ -808,7 +803,7 @@ ACTOR Future<Void> preresolutionProcessing(Context* self) {
|
|||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> getResolution(Context* self) {
|
||||
ACTOR Future<Void> getResolution(CommitBatchContext* self) {
|
||||
// Sending these requests is the fuzzy border between phase 1 and phase 2; it could conceivably overlap with
|
||||
// resolution processing but is still using CPU
|
||||
ProxyCommitData* pProxyCommitData = self->pProxyCommitData;
|
||||
|
@ -882,7 +877,7 @@ void assertResolutionStateMutationsSizeConsistent(
|
|||
}
|
||||
|
||||
// Compute and apply "metadata" effects of each other proxy's most recent batch
|
||||
void applyMetadataEffect(Context* self) {
|
||||
void applyMetadataEffect(CommitBatchContext* self) {
|
||||
bool initialState = self->isMyFirstBatch;
|
||||
self->firstStateMutations = self->isMyFirstBatch;
|
||||
for (int versionIndex = 0; versionIndex < self->resolution[0].stateMutations.size(); versionIndex++) {
|
||||
|
@ -925,7 +920,7 @@ void applyMetadataEffect(Context* self) {
|
|||
}
|
||||
|
||||
/// Determine which transactions actually committed (conservatively) by combining results from the resolvers
|
||||
void determineCommittedTransactions(Context* self) {
|
||||
void determineCommittedTransactions(CommitBatchContext* self) {
|
||||
auto pProxyCommitData = self->pProxyCommitData;
|
||||
const auto& trs = self->trs;
|
||||
|
||||
|
@ -968,7 +963,7 @@ void determineCommittedTransactions(Context* self) {
|
|||
}
|
||||
|
||||
// This first pass through committed transactions deals with "metadata" effects (modifications of txnStateStore, changes to storage servers' responsibilities)
|
||||
ACTOR Future<Void> applyMetadataToCommittedTransactions(Context* self) {
|
||||
ACTOR Future<Void> applyMetadataToCommittedTransactions(CommitBatchContext* self) {
|
||||
auto pProxyCommitData = self->pProxyCommitData;
|
||||
const auto& trs = self->trs;
|
||||
|
||||
|
@ -1012,7 +1007,7 @@ ACTOR Future<Void> applyMetadataToCommittedTransactions(Context* self) {
|
|||
}
|
||||
|
||||
/// This second pass through committed transactions assigns the actual mutations to the appropriate storage servers' tags
|
||||
ACTOR Future<Void> assignMutationsToStorageServers(Context* self) {
|
||||
ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) {
|
||||
state ProxyCommitData* const pProxyCommitData = self->pProxyCommitData;
|
||||
state std::vector<CommitTransactionRequest>& trs = self->trs;
|
||||
|
||||
|
@ -1131,7 +1126,7 @@ ACTOR Future<Void> assignMutationsToStorageServers(Context* self) {
|
|||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> postResolution(Context* self) {
|
||||
ACTOR Future<Void> postResolution(CommitBatchContext* self) {
|
||||
state ProxyCommitData* const pProxyCommitData = self->pProxyCommitData;
|
||||
state std::vector<CommitTransactionRequest>& trs = self->trs;
|
||||
state const int64_t localBatchNumber = self->localBatchNumber;
|
||||
|
@ -1258,7 +1253,7 @@ ACTOR Future<Void> postResolution(Context* self) {
|
|||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> transactionLogging(Context* self) {
|
||||
ACTOR Future<Void> transactionLogging(CommitBatchContext* self) {
|
||||
state ProxyCommitData* const pProxyCommitData = self->pProxyCommitData;
|
||||
|
||||
try {
|
||||
|
@ -1293,7 +1288,7 @@ ACTOR Future<Void> transactionLogging(Context* self) {
|
|||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> reply(Context* self) {
|
||||
ACTOR Future<Void> reply(CommitBatchContext* self) {
|
||||
state ProxyCommitData* const pProxyCommitData = self->pProxyCommitData;
|
||||
|
||||
const Optional<UID>& debugID = self->debugID;
|
||||
|
@ -1438,7 +1433,7 @@ ACTOR Future<Void> commitBatch(
|
|||
vector<CommitTransactionRequest>* trs,
|
||||
int currentBatchMemBytesCount) {
|
||||
//WARNING: this code is run at a high priority (until the first delay(0)), so it needs to do as little work as possible
|
||||
state CommitBatch::Context context(self, trs, currentBatchMemBytesCount);
|
||||
state CommitBatch::CommitBatchContext context(self, trs, currentBatchMemBytesCount);
|
||||
|
||||
// Active load balancing runs at a very high priority (to obtain accurate estimate of memory used by commit batches) so we need to downgrade here
|
||||
wait(delay(0, TaskPriority::ProxyCommit));
|
||||
|
|
Loading…
Reference in New Issue