Merge branch 'cost'
This commit is contained in:
commit
5986ed3a3b
|
@ -157,12 +157,14 @@ struct CommitTransactionRequest : TimedRequest {
|
|||
ReplyPromise<CommitID> reply;
|
||||
uint32_t flags;
|
||||
Optional<UID> debugID;
|
||||
TransactionCommitCostEstimation commitCostEstimation;
|
||||
TagSet tagSet;
|
||||
|
||||
CommitTransactionRequest() : flags(0) {}
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, transaction, reply, arena, flags, debugID);
|
||||
serializer(ar, transaction, reply, arena, flags, debugID, commitCostEstimation, tagSet);
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -3363,6 +3363,34 @@ ACTOR static Future<Void> tryCommit( Database cx, Reference<TransactionLogInfo>
|
|||
}
|
||||
}
|
||||
|
||||
TransactionCommitCostEstimation Transaction::estimateCommitCosts(bool traceEvent) const {
|
||||
auto it = tr.transaction.mutations.cbegin();
|
||||
auto end = tr.transaction.mutations.cend();
|
||||
TransactionCommitCostEstimation trCommitCosts;
|
||||
for (; it != end; ++it) {
|
||||
if (it->type == MutationRef::Type::SetValue) {
|
||||
trCommitCosts.bytesWrite += it->expectedSize();
|
||||
trCommitCosts.numWrite ++;
|
||||
}
|
||||
else if (it->isAtomicOp()) {
|
||||
trCommitCosts.bytesAtomicWrite += it->expectedSize();
|
||||
trCommitCosts.numAtomicWrite ++;
|
||||
}
|
||||
else if (it->type == MutationRef::Type::ClearRange) {
|
||||
// TODO how much data a clear would delete: 1. shard-map 2. storage server
|
||||
}
|
||||
}
|
||||
if (traceEvent) {
|
||||
TraceEvent(SevDebug, "TransactionCostEstimation")
|
||||
.detail("numWrite", trCommitCosts.numWrite)
|
||||
.detail("byteWrites", trCommitCosts.bytesWrite)
|
||||
.detail("numAtomicWrite", trCommitCosts.numAtomicWrite)
|
||||
.detail("bytesAtomicWrite", trCommitCosts.bytesAtomicWrite)
|
||||
.detail("bytesClearEst", trCommitCosts.bytesClearEst);
|
||||
}
|
||||
return trCommitCosts;
|
||||
}
|
||||
|
||||
Future<Void> Transaction::commitMutations() {
|
||||
try {
|
||||
//if this is a read-only transaction return immediately
|
||||
|
@ -3381,6 +3409,8 @@ Future<Void> Transaction::commitMutations() {
|
|||
|
||||
cx->mutationsPerCommit.addSample(tr.transaction.mutations.size());
|
||||
cx->bytesPerCommit.addSample(tr.transaction.mutations.expectedSize());
|
||||
tr.commitCostEstimation = estimateCommitCosts(true);
|
||||
tr.tagSet = options.tags;
|
||||
|
||||
size_t transactionSize = getSize();
|
||||
if (transactionSize > (uint64_t)FLOW_KNOBS->PACKET_WARNING) {
|
||||
|
|
|
@ -320,6 +320,8 @@ public:
|
|||
return Standalone<VectorRef<KeyRangeRef>>(tr.transaction.write_conflict_ranges, tr.arena);
|
||||
}
|
||||
|
||||
TransactionCommitCostEstimation estimateCommitCosts(bool traceEvent = false) const;
|
||||
|
||||
private:
|
||||
Future<Version> getReadVersion(uint32_t flags);
|
||||
Database cx;
|
||||
|
|
|
@ -124,4 +124,16 @@ struct HaltRatekeeperRequest {
|
|||
}
|
||||
};
|
||||
|
||||
struct TransactionCommitCostEstimation {
|
||||
int64_t numWrite = 0;
|
||||
int64_t numAtomicWrite = 0;
|
||||
unsigned long bytesWrite = 0;
|
||||
unsigned long bytesClearEst = 0;
|
||||
unsigned long bytesAtomicWrite = 0;
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, bytesWrite, bytesClearEst, bytesAtomicWrite, numWrite, numAtomicWrite);
|
||||
}
|
||||
};
|
||||
#endif //FDBSERVER_RATEKEEPERINTERFACE_H
|
||||
|
|
Loading…
Reference in New Issue