solve some code reviews

This commit is contained in:
Xiaoxi Wang 2020-07-14 17:19:55 +00:00
parent c56daf3be7
commit a310faf9d1
6 changed files with 31 additions and 24 deletions

View File

@ -232,7 +232,7 @@ void ClientKnobs::initialize(bool randomize) {
init( TAG_THROTTLE_RECHECK_INTERVAL, 5.0 ); if( randomize && BUGGIFY ) TAG_THROTTLE_RECHECK_INTERVAL = 0.0;
init( TAG_THROTTLE_EXPIRATION_INTERVAL, 60.0 ); if( randomize && BUGGIFY ) TAG_THROTTLE_EXPIRATION_INTERVAL = 1.0;
init( COMMIT_CLEAR_COST_ESTIMATE_METHOD, 0.0);
init( EXPENSIVE_COMMIT_COST_ESTIMATION_FRAC, 0.0);
// clang-format on
}

View File

@ -219,7 +219,7 @@ public:
double TAG_THROTTLE_RECHECK_INTERVAL;
double TAG_THROTTLE_EXPIRATION_INTERVAL;
double COMMIT_CLEAR_COST_ESTIMATE_METHOD; // 0 --> estimate through shard-map; 1 --> estimate by asking StorageServer
double EXPENSIVE_COMMIT_COST_ESTIMATION_FRAC; // 0 --> estimate through shard-map; 1 --> estimate by asking StorageServer
// (more accurate but more expensive)
ClientKnobs();

View File

@ -3269,7 +3269,8 @@ ACTOR Future<TransactionCommitCostEstimation> estimateCommitCosts(Transaction* s
trCommitCosts.bytesAtomicWrite += it->expectedSize();
trCommitCosts.numAtomicWrite++;
} else if (it->type == MutationRef::Type::ClearRange) {
if (deterministicRandom()->random01() < CLIENT_KNOBS->COMMIT_CLEAR_COST_ESTIMATE_METHOD) {
trCommitCosts.numClear ++;
if (deterministicRandom()->random01() < CLIENT_KNOBS->EXPENSIVE_COMMIT_COST_ESTIMATION_FRAC) {
try {
StorageMetrics m = wait(self->getStorageMetrics(KeyRangeRef(it->param1, it->param2), -1));
trCommitCosts.bytesClearEst += m.bytes;

View File

@ -447,6 +447,7 @@ struct ProxyCommitData {
NotifiedDouble lastCommitTime;
vector<double> commitComputePerOperation;
TransactionTagMap<TransactionCommitCostEstimation> transactionTagCommitCostEst;
//The tag related to a storage server rarely change, so we keep a vector of tags for each key range to be slightly more CPU efficient.
//When a tag related to a storage server does change, we empty out all of these vectors to signify they must be repopulated.
@ -805,9 +806,11 @@ ACTOR Future<Void> releaseResolvingAfter(ProxyCommitData* self, Future<Void> rel
}
// Commit one batch of transactions trs
ACTOR Future<Void> commitBatch(ProxyCommitData* self, vector<CommitTransactionRequest> trs,
int currentBatchMemBytesCount,
TransactionTagMap<TransactionCommitCostEstimation>* transactionTagCommitCostEst) {
ACTOR Future<Void> commitBatch(
ProxyCommitData* self,
vector<CommitTransactionRequest> trs,
int currentBatchMemBytesCount)
{
//WARNING: this code is run at a high priority (until the first delay(0)), so it needs to do as little work as possible
state int64_t localBatchNumber = ++self->localCommitBatchesStarted;
state LogPushData toCommit(self->logSystem);
@ -1338,7 +1341,7 @@ ACTOR Future<Void> commitBatch(ProxyCommitData* self, vector<CommitTransactionRe
trs[t].reply.send(CommitID(commitVersion, t, metadataVersionAfter));
// aggregate commit cost estimation iff committed
for (auto& tag : trs[t].tagSet) {
(*transactionTagCommitCostEst)[tag] += trs[t].commitCostEstimation;
(self->transactionTagCommitCostEst)[tag] += trs[t].commitCostEstimation;
}
}
else if (committed[t] == ConflictBatch::TransactionTooOld) {
@ -1545,10 +1548,12 @@ ACTOR Future<Void> sendGrvReplies(Future<GetReadVersionReply> replyFuture, std::
}
ACTOR static Future<Void> transactionStarter(
MasterProxyInterface proxy, Reference<AsyncVar<ServerDBInfo>> db, PromiseStream<Future<Void>> addActor,
MasterProxyInterface proxy,
Reference<AsyncVar<ServerDBInfo>> db,
PromiseStream<Future<Void>> addActor,
ProxyCommitData* commitData, GetHealthMetricsReply* healthMetricsReply,
GetHealthMetricsReply* detailedHealthMetricsReply,
TransactionTagMap<TransactionCommitCostEstimation>* transactionTagCommitCostEst) {
GetHealthMetricsReply* detailedHealthMetricsReply)
{
state double lastGRVTime = 0;
state PromiseStream<Void> GRVTimer;
state double GRVBatchTime = SERVER_KNOBS->START_TRANSACTION_BATCH_INTERVAL_MIN;
@ -1571,7 +1576,7 @@ ACTOR static Future<Void> transactionStarter(
addActor.send(getRate(proxy.id(), db, &transactionCount, &batchTransactionCount, &normalRateInfo, &batchRateInfo,
healthMetricsReply, detailedHealthMetricsReply, &transactionTagCounter, &throttledTags,
transactionTagCommitCostEst));
&(commitData->transactionTagCommitCostEst)));
addActor.send(queueTransactionStartRequests(db, &systemQueue, &defaultQueue, &batchQueue, proxy.getConsistentReadVersion.getFuture(),
GRVTimer, &lastGRVTime, &GRVBatchTime, replyTimes.getFuture(), &commitData->stats, &batchRateInfo,
&transactionTagCounter));
@ -2070,8 +2075,6 @@ ACTOR Future<Void> masterProxyServerCore(
state GetHealthMetricsReply healthMetricsReply;
state GetHealthMetricsReply detailedHealthMetricsReply;
state TransactionTagMap<TransactionCommitCostEstimation> transactionTagCommitCostEst;
addActor.send( waitFailureServer(proxy.waitFailure.getFuture()) );
addActor.send( traceRole(Role::MASTER_PROXY, proxy.id()) );
@ -2105,7 +2108,7 @@ ACTOR Future<Void> masterProxyServerCore(
TraceEvent(SevInfo, "CommitBatchesMemoryLimit").detail("BytesLimit", commitBatchesMemoryLimit);
addActor.send(monitorRemoteCommitted(&commitData));
addActor.send(transactionStarter(proxy, commitData.db, addActor, &commitData, &healthMetricsReply, &detailedHealthMetricsReply, &transactionTagCommitCostEst));
addActor.send(transactionStarter(proxy, commitData.db, addActor, &commitData, &healthMetricsReply, &detailedHealthMetricsReply));
addActor.send(readRequestServer(proxy, addActor, &commitData));
addActor.send(rejoinServer(proxy, &commitData));
addActor.send(healthMetricsRequestServer(proxy, &healthMetricsReply, &detailedHealthMetricsReply));
@ -2149,7 +2152,7 @@ ACTOR Future<Void> masterProxyServerCore(
lastCommit = now();
if (trs.size() || lastCommitComplete.isReady()) {
lastCommitComplete = commitBatch(&commitData, trs, batchBytes, &transactionTagCommitCostEst);
lastCommitComplete = commitBatch(&commitData, trs, batchBytes);
addActor.send(lastCommitComplete);
}
}

View File

@ -1290,6 +1290,7 @@ ACTOR Future<Void> ratekeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<S
.detail("ByteWrites", tag.second.bytesWrite)
.detail("NumAtomicWrite", tag.second.numAtomicWrite)
.detail("BytesAtomicWrite", tag.second.bytesAtomicWrite)
.detail("NumClear", tag.second.numClear)
.detail("BytesClearEst", tag.second.bytesClearEst);
}
}

View File

@ -78,18 +78,20 @@ struct ClientTagThrottleLimits {
struct TransactionCommitCostEstimation {
int64_t numWrite = 0;
int64_t numAtomicWrite = 0;
int64_t numClear = 0;
unsigned long bytesWrite = 0;
unsigned long bytesClearEst = 0;
unsigned long bytesAtomicWrite = 0;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, bytesWrite, bytesClearEst, bytesAtomicWrite, numWrite, numAtomicWrite);
serializer(ar, bytesWrite, bytesClearEst, bytesAtomicWrite, numWrite, numAtomicWrite, numClear);
}
TransactionCommitCostEstimation& operator += (const TransactionCommitCostEstimation& other) {
numWrite += other.numWrite;
numAtomicWrite += other.numAtomicWrite;
numClear += other.numClear;
bytesWrite += other.bytesWrite;
bytesClearEst += other.bytesClearEst;
bytesAtomicWrite += other.numAtomicWrite;