Implemented first version of Transaction::getTotalCost, along with workload
This commit is contained in:
parent
914dfd7438
commit
84aa815026
|
@ -3441,6 +3441,8 @@ ACTOR Future<Optional<Value>> getValue(Reference<TransactionState> trState,
|
|||
}
|
||||
trState->cx->getValueCompleted->latency = timer_int() - startTime;
|
||||
trState->cx->getValueCompleted->log();
|
||||
trState->totalCost +=
|
||||
getReadOperationCost(key.size() + (reply.value.present() ? reply.value.get().size() : 0));
|
||||
|
||||
if (getValueID.present()) {
|
||||
g_traceBatch.addEvent("GetValueDebug",
|
||||
|
@ -5751,6 +5753,7 @@ void Transaction::set(const KeyRef& key, const ValueRef& value, AddConflictRange
|
|||
auto r = singleKeyRange(key, req.arena);
|
||||
auto v = ValueRef(req.arena, value);
|
||||
t.mutations.emplace_back(req.arena, MutationRef::SetValue, r.begin, v);
|
||||
trState->totalCost += getWriteOperationCost(key.expectedSize() + value.expectedSize());
|
||||
|
||||
if (addConflictRange) {
|
||||
t.write_conflict_ranges.push_back(req.arena, r);
|
||||
|
@ -5780,6 +5783,7 @@ void Transaction::atomicOp(const KeyRef& key,
|
|||
auto v = ValueRef(req.arena, operand);
|
||||
|
||||
t.mutations.emplace_back(req.arena, operationType, r.begin, v);
|
||||
trState->totalCost += getWriteOperationCost(key.expectedSize());
|
||||
|
||||
if (addConflictRange && operationType != MutationRef::SetVersionstampedKey)
|
||||
t.write_conflict_ranges.push_back(req.arena, r);
|
||||
|
@ -6215,14 +6219,14 @@ ACTOR Future<Optional<ClientTrCommitCostEstimation>> estimateCommitCosts(Referen
|
|||
state int i = 0;
|
||||
|
||||
for (; i < transaction->mutations.size(); ++i) {
|
||||
auto* it = &transaction->mutations[i];
|
||||
auto const& mutation = transaction->mutations[i];
|
||||
|
||||
if (it->type == MutationRef::Type::SetValue || it->isAtomicOp()) {
|
||||
if (mutation.type == MutationRef::Type::SetValue || mutation.isAtomicOp()) {
|
||||
trCommitCosts.opsCount++;
|
||||
trCommitCosts.writeCosts += getWriteOperationCost(it->expectedSize());
|
||||
} else if (it->type == MutationRef::Type::ClearRange) {
|
||||
trCommitCosts.writeCosts += getWriteOperationCost(mutation.expectedSize());
|
||||
} else if (mutation.type == MutationRef::Type::ClearRange) {
|
||||
trCommitCosts.opsCount++;
|
||||
keyRange = KeyRangeRef(it->param1, it->param2);
|
||||
keyRange = KeyRangeRef(mutation.param1, mutation.param2);
|
||||
if (trState->options.expensiveClearCostEstimation) {
|
||||
StorageMetrics m = wait(trState->cx->getStorageMetrics(keyRange, CLIENT_KNOBS->TOO_MANY));
|
||||
trCommitCosts.clearIdxCosts.emplace_back(i, getWriteOperationCost(m.bytes));
|
||||
|
|
|
@ -249,6 +249,9 @@ struct TransactionState : ReferenceCounted<TransactionState> {
|
|||
SpanContext spanContext;
|
||||
UseProvisionalProxies useProvisionalProxies = UseProvisionalProxies::False;
|
||||
bool readVersionObtainedFromGrvProxy;
|
||||
// Measured by summing the bytes accessed by each read and write operations,
|
||||
// after rounding up to the nearest page size and applying a write penalty
|
||||
uint64_t totalCost = 0;
|
||||
|
||||
// Special flag to skip prepending tenant prefix to mutations and conflict ranges
|
||||
// when a dummy, internal transaction gets commited. The sole purpose of commitDummyTransaction() is to
|
||||
|
@ -445,6 +448,8 @@ public:
|
|||
// May be called only after commit() returns success
|
||||
Version getCommittedVersion() const { return trState->committedVersion; }
|
||||
|
||||
uint64_t getTotalCost() const { return trState->totalCost; }
|
||||
|
||||
// Will be fulfilled only after commit() returns success
|
||||
[[nodiscard]] Future<Standalone<StringRef>> getVersionstamp();
|
||||
|
||||
|
@ -568,6 +573,10 @@ inline uint64_t getWriteOperationCost(uint64_t bytes) {
|
|||
return (bytes - 1) / CLIENT_KNOBS->WRITE_COST_BYTE_FACTOR + 1;
|
||||
}
|
||||
|
||||
inline uint64_t getReadOperationCost(uint64_t bytes) {
|
||||
return (bytes - 1) / CLIENT_KNOBS->READ_COST_BYTE_FACTOR + 1;
|
||||
}
|
||||
|
||||
// Create a transaction to set the value of system key \xff/conf/perpetual_storage_wiggle. If enable == true, the value
|
||||
// will be 1. Otherwise, the value will be 0.
|
||||
// Returns the FDB version at which the transaction was committed.
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbclient/NativeAPI.actor.h"
|
||||
#include "fdbserver/Knobs.h"
|
||||
#include "fdbserver/TransactionTagCounter.h"
|
||||
#include "flow/Trace.h"
|
||||
|
@ -90,9 +91,6 @@ class TransactionTagCounterImpl {
|
|||
std::vector<StorageQueuingMetricsReply::TagInfo> previousBusiestTags;
|
||||
Reference<EventCacheHolder> busiestReadTagEventHolder;
|
||||
|
||||
// Round up to the nearest page size
|
||||
static int64_t costFunction(int64_t bytes) { return (bytes - 1) / CLIENT_KNOBS->READ_COST_BYTE_FACTOR + 1; }
|
||||
|
||||
public:
|
||||
TransactionTagCounterImpl(UID thisServerID)
|
||||
: thisServerID(thisServerID), topTags(SERVER_KNOBS->SS_THROTTLE_TAGS_TRACKED),
|
||||
|
@ -101,7 +99,7 @@ public:
|
|||
void addRequest(Optional<TagSet> const& tags, int64_t bytes) {
|
||||
if (tags.present()) {
|
||||
CODE_PROBE(true, "Tracking transaction tag in counter");
|
||||
double cost = costFunction(bytes);
|
||||
auto const cost = getReadOperationCost(bytes);
|
||||
for (auto& tag : tags.get()) {
|
||||
int64_t& count = intervalCounts[TransactionTag(tag, tags.get().getArena())];
|
||||
topTags.incrementCount(tag, count, cost);
|
||||
|
|
|
@ -1984,7 +1984,7 @@ ACTOR Future<Void> getValueQ(StorageServer* data, GetValueRequest req) {
|
|||
data->sendErrorWithPenalty(req.reply, e, data->getPenalty());
|
||||
}
|
||||
|
||||
data->transactionTagCounter.addRequest(req.tags, resultSize);
|
||||
data->transactionTagCounter.addRequest(req.tags, req.key.size() + resultSize);
|
||||
|
||||
++data->counters.finishedQueries;
|
||||
|
||||
|
|
|
@ -0,0 +1,124 @@
|
|||
/*
|
||||
* TransactionCost.actor.cpp
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbserver/workloads/workloads.actor.h"
|
||||
#include "flow/actorcompiler.h"
|
||||
|
||||
class TransactionCostWorkload : public TestWorkload {
|
||||
int iterations{ 1000 };
|
||||
Key prefix;
|
||||
bool debugTransactions{ false };
|
||||
|
||||
static constexpr auto transactionTypes = 3;
|
||||
|
||||
ACTOR static Future<Void> read(Database cx, Optional<UID> debugID) {
|
||||
state Transaction tr(cx);
|
||||
if (debugID.present()) {
|
||||
tr.debugTransaction(debugID.get());
|
||||
}
|
||||
loop {
|
||||
try {
|
||||
ASSERT_EQ(tr.getTotalCost(), 0);
|
||||
wait(success(tr.get("foo"_sr)));
|
||||
ASSERT_EQ(tr.getTotalCost(), 1);
|
||||
return Void();
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> write(Database cx, Optional<UID> debugID) {
|
||||
state Transaction tr(cx);
|
||||
if (debugID.present()) {
|
||||
tr.debugTransaction(debugID.get());
|
||||
}
|
||||
loop {
|
||||
try {
|
||||
ASSERT_EQ(tr.getTotalCost(), 0);
|
||||
tr.set("foo"_sr, "bar"_sr);
|
||||
ASSERT_EQ(tr.getTotalCost(), 1);
|
||||
wait(tr.commit());
|
||||
ASSERT_EQ(tr.getTotalCost(), 1);
|
||||
return Void();
|
||||
} catch (Error& e) {
|
||||
TraceEvent("TransactionCost_Error").error(e);
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> clear(Database cx, Optional<UID> debugID) {
|
||||
state Transaction tr(cx);
|
||||
if (debugID.present()) {
|
||||
tr.debugTransaction(debugID.get());
|
||||
}
|
||||
loop {
|
||||
try {
|
||||
ASSERT_EQ(tr.getTotalCost(), 0);
|
||||
tr.clear(singleKeyRange("foo"_sr));
|
||||
wait(tr.commit());
|
||||
ASSERT_EQ(tr.getTotalCost(), 0);
|
||||
return Void();
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> start(TransactionCostWorkload* self, Database cx) {
|
||||
state uint64_t i = 0;
|
||||
state Future<Void> f;
|
||||
for (; i < self->iterations; ++i) {
|
||||
int rand = deterministicRandom()->randomInt(0, transactionTypes);
|
||||
auto const debugID = self->debugTransactions ? UID(i << 32, i << 32) : Optional<UID>();
|
||||
if (rand == 0) {
|
||||
f = read(cx, debugID);
|
||||
} else if (rand == 1) {
|
||||
f = write(cx, debugID);
|
||||
} else if (rand == 2) {
|
||||
f = clear(cx, debugID);
|
||||
}
|
||||
wait(f);
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
||||
public:
|
||||
TransactionCostWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
|
||||
iterations = getOption(options, "iterations"_sr, 1000);
|
||||
prefix = getOption(options, "prefix"_sr, "transactionCost/"_sr);
|
||||
debugTransactions = getOption(options, "debug"_sr, false);
|
||||
}
|
||||
|
||||
static constexpr auto NAME = "TransactionCost";
|
||||
|
||||
std::string description() const override { return NAME; }
|
||||
|
||||
Future<Void> setup(Database const& cx) override { return Void(); }
|
||||
|
||||
Future<Void> start(Database const& cx) override { return start(this, cx); }
|
||||
|
||||
Future<bool> check(Database const& cx) override { return true; }
|
||||
|
||||
void getMetrics(std::vector<PerfMetric>& m) override {}
|
||||
};
|
||||
|
||||
WorkloadFactory<TransactionCostWorkload> Transaction("TransactionCost");
|
|
@ -240,6 +240,7 @@ if(WITH_PYTHON)
|
|||
add_fdb_test(TEST_FILES rare/RedwoodCorrectnessBTree.toml)
|
||||
add_fdb_test(TEST_FILES rare/RedwoodDeltaTree.toml)
|
||||
add_fdb_test(TEST_FILES rare/Throttling.toml)
|
||||
add_fdb_test(TEST_FILES rare/TransactionCost.toml)
|
||||
add_fdb_test(TEST_FILES rare/TransactionTagApiCorrectness.toml)
|
||||
add_fdb_test(TEST_FILES rare/TransactionTagSwizzledApiCorrectness.toml)
|
||||
add_fdb_test(TEST_FILES rare/WriteTagThrottling.toml)
|
||||
|
|
|
@ -0,0 +1,6 @@
|
|||
[[test]]
|
||||
testTitle = 'TransactionCostTest'
|
||||
|
||||
[[test.workload]]
|
||||
testName = 'TransactionCost'
|
||||
iterations = 1000
|
Loading…
Reference in New Issue