Merge branch 'main' of github.com:apple/foundationdb into monitorusage
This commit is contained in:
commit
82ab299788
|
@ -34,20 +34,25 @@ Commit proxies would combine idempotency IDs for transactions within a batch. Th
|
|||
|
||||
## Value format
|
||||
```
|
||||
${protocol_version}(${n (1 byte)}${idempotency_id (n bytes)}${low_order_byte_of_batch_index})*
|
||||
${protocol_version}${timestamp}(${n (1 byte)}${idempotency_id (n bytes)}${low_order_byte_of_batch_index})*
|
||||
```
|
||||
|
||||
The batch index for each idempotency id can be reconstructed from the high-order byte stored in the key and the low-order byte stored in the value. This is necessary for an "unknown_committed" transaction to recover its full version stamp. The batch index is a `short int`, i.e. 2 bytes.
|
||||
|
||||
The timestamp is a Unix epoch time, stored as a little-endian signed 64-bit integer.
|
||||
|
||||
# Cleaning up old idempotency ids
|
||||
|
||||
After learning the result of an attempt to commit a transaction with an
|
||||
idempotency id, the client may inform the cluster that it's no longer interested
|
||||
in that id and the cluster can reclaim the space used to store the idempotency
|
||||
id. The happy-path reply to a CommitTransactionRequest will say which proxy this
|
||||
request should be sent to, and all idempotency ids for a database key will be
|
||||
sent to the same proxy so that it can clear the key once it receives all of
|
||||
them. The first proxy will also periodically clean up the oldest idempotency ids, based on a policy determined by two knobs. One knob will control the minimum lifetime of an idempotency id (i.e. don't delete anything younger than 1 day), and the other will control the target byte size of the idempotency keys (e.g. keep 100 MB of idempotency keys around).
|
||||
id. The commit proxy that committed a batch is responsible for cleaning all
|
||||
idempotency kv pairs from that batch, so clients must tell that specific proxy
|
||||
that they're done with the id. The first proxy will also periodically clean up
|
||||
the oldest idempotency ids, based on a policy determined by two knobs. One knob
|
||||
will control the minimum lifetime of an idempotency id (e.g. don't delete
|
||||
anything younger than 1 day), and the other will control the target byte size of
|
||||
the idempotency keys (e.g. keep 100 MB of idempotency keys around).
|
||||
|
||||
# Commit protocol
|
||||
|
||||
|
|
|
@ -1,82 +0,0 @@
|
|||
/*
|
||||
* DatabaseContext.cpp
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbclient/DatabaseContext.h"
|
||||
|
||||
// Looks up the watch metadata stored in watchMap under (tenantId, key).
// Returns a default-constructed Reference<WatchMetadata> when there is no entry.
Reference<WatchMetadata> DatabaseContext::getWatchMetadata(int64_t tenantId, KeyRef key) const {
	const auto entry = watchMap.find(std::make_pair(tenantId, key));
	if (entry != watchMap.end()) {
		return entry->second;
	}
	return Reference<WatchMetadata>();
}
|
||||
|
||||
// Stores `metadata` in watchMap under the (tenant id, key) taken from its parameters, replacing any entry
// already stored under that key.
void DatabaseContext::setWatchMetadata(Reference<WatchMetadata> metadata) {
	const auto& params = metadata->parameters;
	watchMap[WatchMapKey(params->tenant.tenantId, params->key)] = metadata;
	// NOTE: The reference count for the key is deliberately *NOT* updated/reset here, see the source code in
	// getWatchFuture. The reference count could be increased, or the same watch is refreshed, or the watch might
	// be cancelled.
}
|
||||
|
||||
int32_t DatabaseContext::increaseWatchRefCount(const int64_t tenantID, KeyRef key, const Version& version) {
|
||||
const WatchMapKey mapKey(tenantID, key);
|
||||
watchCounterMap[mapKey].insert(version);
|
||||
return watchCounterMap[mapKey].size();
|
||||
}
|
||||
|
||||
int32_t DatabaseContext::decreaseWatchRefCount(const int64_t tenantID, KeyRef key, const Version& version) {
|
||||
const WatchMapKey mapKey(tenantID, key);
|
||||
auto mapKeyIter = watchCounterMap.find(mapKey);
|
||||
if (mapKeyIter == std::end(watchCounterMap)) {
|
||||
// Key does not exist. The metadata might be removed by deleteWatchMetadata already.
|
||||
return 0;
|
||||
}
|
||||
|
||||
auto& versionSet = mapKeyIter->second;
|
||||
auto versionIter = versionSet.find(version);
|
||||
|
||||
if (versionIter == std::end(versionSet)) {
|
||||
// Version not found, the watch might be cleared before.
|
||||
return versionSet.size();
|
||||
}
|
||||
versionSet.erase(versionIter);
|
||||
|
||||
const auto count = versionSet.size();
|
||||
// The metadata might be deleted somewhere else, before calling this decreaseWatchRefCount
|
||||
if (auto metadata = getWatchMetadata(tenantID, key); metadata.isValid() && versionSet.size() == 0) {
|
||||
// It is a *must* to cancel the watchFutureSS manually. watchFutureSS waits for watchStorageServerResp, which
|
||||
// holds a reference to the metadata. If the ACTOR is not cancelled, it indirectly holds a Future waiting for
|
||||
// itself.
|
||||
metadata->watchFutureSS.cancel();
|
||||
deleteWatchMetadata(tenantID, key);
|
||||
}
|
||||
|
||||
return count;
|
||||
}
|
||||
|
||||
void DatabaseContext::deleteWatchMetadata(int64_t tenantId, KeyRef key) {
|
||||
const WatchMapKey mapKey(tenantId, key);
|
||||
watchMap.erase(mapKey);
|
||||
watchCounterMap.erase(mapKey);
|
||||
}
|
||||
|
||||
// Removes every entry from both watchMap and watchCounterMap.
void DatabaseContext::clearWatchMetadata() {
	watchCounterMap.clear();
	watchMap.clear();
}
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* IdempotencyId.cpp
|
||||
* IdempotencyId.actor.cpp
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
|
@ -18,9 +18,11 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbclient/IdempotencyId.h"
|
||||
#include "fdbclient/IdempotencyId.actor.h"
|
||||
#include "fdbclient/ReadYourWrites.h"
|
||||
#include "fdbclient/SystemData.h"
|
||||
#include "flow/UnitTest.h"
|
||||
#include "flow/actorcompiler.h" // this has to be the last include
|
||||
|
||||
struct IdempotencyIdKVBuilderImpl {
|
||||
Optional<Version> commitVersion;
|
||||
|
@ -40,6 +42,7 @@ void IdempotencyIdKVBuilder::add(const IdempotencyIdRef& id, uint16_t batchIndex
|
|||
ASSERT((batchIndex >> 8) == impl->batchIndexHighOrderByte.get());
|
||||
} else {
|
||||
impl->batchIndexHighOrderByte = batchIndex >> 8;
|
||||
impl->value << int64_t(now());
|
||||
}
|
||||
StringRef s = id.asStringRefUnsafe();
|
||||
impl->value << uint8_t(s.size());
|
||||
|
@ -53,19 +56,17 @@ Optional<KeyValue> IdempotencyIdKVBuilder::buildAndClear() {
|
|||
return {};
|
||||
}
|
||||
|
||||
BinaryWriter key{ Unversioned() };
|
||||
key.serializeBytes(idempotencyIdKeys.begin);
|
||||
key << bigEndian64(impl->commitVersion.get());
|
||||
key << impl->batchIndexHighOrderByte.get();
|
||||
|
||||
Value v = impl->value.toValue();
|
||||
|
||||
KeyRef key =
|
||||
makeIdempotencySingleKeyRange(v.arena(), impl->commitVersion.get(), impl->batchIndexHighOrderByte.get()).begin;
|
||||
|
||||
impl->value = BinaryWriter(IncludeVersion());
|
||||
impl->batchIndexHighOrderByte = Optional<uint8_t>();
|
||||
|
||||
Optional<KeyValue> result = KeyValue();
|
||||
result.get().arena() = v.arena();
|
||||
result.get().key = key.toValue(result.get().arena());
|
||||
result.get().key = key;
|
||||
result.get().value = v;
|
||||
return result;
|
||||
}
|
||||
|
@ -86,6 +87,8 @@ Optional<CommitResult> kvContainsIdempotencyId(const KeyValueRef& kv, const Idem
|
|||
|
||||
// Even if id is a substring of value, it may still not actually contain it.
|
||||
BinaryReader reader(kv.value.begin(), kv.value.size(), IncludeVersion());
|
||||
int64_t timestamp; // ignored
|
||||
reader >> timestamp;
|
||||
while (!reader.empty()) {
|
||||
uint8_t length;
|
||||
reader >> length;
|
||||
|
@ -93,13 +96,9 @@ Optional<CommitResult> kvContainsIdempotencyId(const KeyValueRef& kv, const Idem
|
|||
uint8_t lowOrderBatchIndex;
|
||||
reader >> lowOrderBatchIndex;
|
||||
if (candidate == needle) {
|
||||
BinaryReader reader(kv.key.begin(), kv.key.size(), Unversioned());
|
||||
reader.readBytes(idempotencyIdKeys.begin.size());
|
||||
Version commitVersion;
|
||||
reader >> commitVersion;
|
||||
commitVersion = bigEndian64(commitVersion);
|
||||
uint8_t highOrderBatchIndex;
|
||||
reader >> highOrderBatchIndex;
|
||||
decodeIdempotencyKey(kv.key, commitVersion, highOrderBatchIndex);
|
||||
return CommitResult{ commitVersion,
|
||||
static_cast<uint16_t>((uint16_t(highOrderBatchIndex) << 8) |
|
||||
uint16_t(lowOrderBatchIndex)) };
|
||||
|
@ -172,4 +171,35 @@ TEST_CASE("/fdbclient/IdempotencyId/serialization") {
|
|||
ASSERT(t == id);
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
||||
// Builds, in `arena`, a key range that contains only the idempotency key for (version, highOrderBatchIndex).
//
// The buffer written is: idempotencyIdKeys.begin | bigEndian64(version) | highOrderBatchIndex | 0x00.
// The range begins at that buffer without its trailing 0x00 byte and ends at the full buffer.
KeyRangeRef makeIdempotencySingleKeyRange(Arena& arena, Version version, uint8_t highOrderBatchIndex) {
	const auto& prefix = idempotencyIdKeys.begin;
	const auto totalSize = prefix.size() + sizeof(version) + sizeof(highOrderBatchIndex) + /*\x00*/ 1;

	StringRef endKey = makeString(totalSize, arena);
	auto* cursor = mutateString(endKey);

	// Key prefix shared by all idempotency keys.
	memcpy(cursor, prefix.begin(), prefix.size());
	cursor += prefix.size();

	// Commit version, converted with bigEndian64 before being copied byte-for-byte.
	const Version encodedVersion = bigEndian64(version);
	memcpy(cursor, &encodedVersion, sizeof(encodedVersion));
	cursor += sizeof(encodedVersion);

	*cursor++ = highOrderBatchIndex;

	// Trailing zero byte that distinguishes the range end from the range begin.
	*cursor++ = 0;

	ASSERT_EQ(cursor - endKey.begin(), totalSize);

	return KeyRangeRef(endKey.removeSuffix("\x00"_sr), endKey);
}
|
||||
|
||||
// Decodes an idempotency key into its components.
//
// Skips the idempotencyIdKeys.begin prefix, then reads the commit version (passed through bigEndian64) into
// `commitVersion` and the following byte into `highOrderBatchIndex`.
void decodeIdempotencyKey(KeyRef key, Version& commitVersion, uint8_t& highOrderBatchIndex) {
	BinaryReader keyReader(key, Unversioned());

	// The prefix carries no information; just step over it.
	keyReader.readBytes(idempotencyIdKeys.begin.size());

	Version encodedVersion;
	keyReader >> encodedVersion;
	commitVersion = bigEndian64(encodedVersion);

	keyReader >> highOrderBatchIndex;
}
|
|
@ -2163,14 +2163,14 @@ void DatabaseContext::setOption(FDBDatabaseOptions::Option option, Optional<Stri
|
|||
}
|
||||
}
|
||||
|
||||
void DatabaseContext::addWatchCounter() {
|
||||
void DatabaseContext::addWatch() {
|
||||
if (outstandingWatches >= maxOutstandingWatches)
|
||||
throw too_many_watches();
|
||||
|
||||
++outstandingWatches;
|
||||
}
|
||||
|
||||
void DatabaseContext::removeWatchCounter() {
|
||||
void DatabaseContext::removeWatch() {
|
||||
--outstandingWatches;
|
||||
ASSERT(outstandingWatches >= 0);
|
||||
}
|
||||
|
@ -2382,6 +2382,25 @@ Database Database::createSimulatedExtraDatabase(std::string connectionString, Op
|
|||
return db;
|
||||
}
|
||||
|
||||
// Returns the watch metadata stored in watchMap for (tenantId, key), or a default-constructed
// Reference<WatchMetadata> when nothing is stored for that pair.
Reference<WatchMetadata> DatabaseContext::getWatchMetadata(int64_t tenantId, KeyRef key) const {
	const auto found = watchMap.find(std::make_pair(tenantId, key));
	if (found != watchMap.end()) {
		return found->second;
	}
	return Reference<WatchMetadata>();
}
|
||||
|
||||
// Stores `metadata` in watchMap under the (tenant id, key) taken from its parameters, replacing any entry
// already stored under that key.
void DatabaseContext::setWatchMetadata(Reference<WatchMetadata> metadata) {
	const auto& params = metadata->parameters;
	watchMap[std::make_pair(params->tenant.tenantId, params->key)] = metadata;
}
|
||||
|
||||
void DatabaseContext::deleteWatchMetadata(int64_t tenantId, KeyRef key) {
|
||||
watchMap.erase(std::make_pair(tenantId, key));
|
||||
}
|
||||
|
||||
// Removes every entry from watchMap.
void DatabaseContext::clearWatchMetadata() {
	watchMap.clear();
}
|
||||
|
||||
const UniqueOrderedOptionList<FDBTransactionOptions>& Database::getTransactionDefaults() const {
|
||||
ASSERT(db);
|
||||
return db->transactionDefaults;
|
||||
|
@ -3894,50 +3913,6 @@ Future<Void> getWatchFuture(Database cx, Reference<WatchParameters> parameters)
|
|||
return Void();
|
||||
}
|
||||
|
||||
namespace {
|
||||
|
||||
// NOTE: Since an ACTOR could receive multiple exceptions for a single catch clause, e.g. broken promise together with
|
||||
// operation cancelled, If the decreaseWatchRefCount is placed at the catch clause, it might be triggered for multiple
|
||||
// times. One could check if the SAV isSet, but seems a more intuitive way is to use RAII-style constructor/destructor
|
||||
// pair. Yet the object has to be constructed after a wait statement, so it must be trivially-constructible. This
|
||||
// requires move-assignment operator implemented.
|
||||
class WatchRefCountUpdater {
|
||||
Database cx;
|
||||
int64_t tenantID;
|
||||
KeyRef key;
|
||||
Version version;
|
||||
|
||||
public:
|
||||
WatchRefCountUpdater() = default;
|
||||
|
||||
WatchRefCountUpdater(const Database& cx_, const int64_t tenantID_, KeyRef key_, const Version& ver)
|
||||
: cx(cx_), tenantID(tenantID_), key(key_), version(ver) {}
|
||||
|
||||
WatchRefCountUpdater& operator=(WatchRefCountUpdater&& other) {
|
||||
// Since this class is only used by watchValueMap, and it is used *AFTER* a wait statement, this class is first
|
||||
// initialized by default constructor, then, after the wait, this function is called to assign the actual
|
||||
// database, key, etc., to re-initialize this object. At this stage, the reference count can be increased. And
|
||||
// since the database object is moved, the rvalue will have null reference to the DatabaseContext and will not
|
||||
// reduce the reference count.
|
||||
cx = std::move(other.cx);
|
||||
tenantID = std::move(other.tenantID);
|
||||
key = std::move(other.key);
|
||||
version = std::move(other.version);
|
||||
|
||||
cx->increaseWatchRefCount(tenantID, key, version);
|
||||
|
||||
return *this;
|
||||
}
|
||||
|
||||
~WatchRefCountUpdater() {
|
||||
if (cx.getReference()) {
|
||||
cx->decreaseWatchRefCount(tenantID, key, version);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace
|
||||
|
||||
ACTOR Future<Void> watchValueMap(Future<Version> version,
|
||||
TenantInfo tenant,
|
||||
Key key,
|
||||
|
@ -3949,7 +3924,6 @@ ACTOR Future<Void> watchValueMap(Future<Version> version,
|
|||
Optional<UID> debugID,
|
||||
UseProvisionalProxies useProvisionalProxies) {
|
||||
state Version ver = wait(version);
|
||||
state WatchRefCountUpdater watchRefCountUpdater(cx, tenant.tenantId, key, ver);
|
||||
|
||||
wait(getWatchFuture(cx,
|
||||
makeReference<WatchParameters>(
|
||||
|
@ -5482,11 +5456,11 @@ ACTOR Future<Void> watch(Reference<Watch> watch,
|
|||
}
|
||||
}
|
||||
} catch (Error& e) {
|
||||
cx->removeWatchCounter();
|
||||
cx->removeWatch();
|
||||
throw;
|
||||
}
|
||||
|
||||
cx->removeWatchCounter();
|
||||
cx->removeWatch();
|
||||
return Void();
|
||||
}
|
||||
|
||||
|
@ -5497,7 +5471,7 @@ Future<Version> Transaction::getRawReadVersion() {
|
|||
Future<Void> Transaction::watch(Reference<Watch> watch) {
|
||||
++trState->cx->transactionWatchRequests;
|
||||
|
||||
trState->cx->addWatchCounter();
|
||||
trState->cx->addWatch();
|
||||
watches.push_back(watch);
|
||||
return ::watch(
|
||||
watch,
|
||||
|
@ -6168,6 +6142,7 @@ ACTOR static Future<Optional<CommitResult>> determineCommitStatus(Reference<Tran
|
|||
IdempotencyIdRef idempotencyId) {
|
||||
state Transaction tr(trState->cx);
|
||||
state int retries = 0;
|
||||
state Version expiredVersion;
|
||||
state Span span("NAPI:determineCommitStatus"_loc, trState->spanContext);
|
||||
tr.span.setParent(span.context);
|
||||
loop {
|
||||
|
@ -6177,11 +6152,19 @@ ACTOR static Future<Optional<CommitResult>> determineCommitStatus(Reference<Tran
|
|||
tr.trState->authToken = trState->authToken;
|
||||
tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
|
||||
tr.setOption(FDBTransactionOptions::READ_LOCK_AWARE);
|
||||
KeyBackedObjectProperty<IdempotencyIdsExpiredVersion, _Unversioned> expiredKey(idempotencyIdsExpiredVersion,
|
||||
Unversioned());
|
||||
IdempotencyIdsExpiredVersion expiredVal = wait(expiredKey.getD(&tr));
|
||||
expiredVersion = expiredVal.expired;
|
||||
if (expiredVersion >= minPossibleCommitVersion) {
|
||||
throw commit_unknown_result_fatal();
|
||||
}
|
||||
Version rv = wait(tr.getReadVersion());
|
||||
TraceEvent("DetermineCommitStatusAttempt")
|
||||
.detail("IdempotencyId", idempotencyId.asStringRefUnsafe())
|
||||
.detail("Retries", retries)
|
||||
.detail("ReadVersion", rv)
|
||||
.detail("ExpiredVersion", expiredVersion)
|
||||
.detail("MinPossibleCommitVersion", minPossibleCommitVersion)
|
||||
.detail("MaxPossibleCommitVersion", maxPossibleCommitVersion);
|
||||
KeyRange possibleRange =
|
||||
|
@ -6425,6 +6408,12 @@ ACTOR static Future<Void> tryCommit(Reference<TransactionState> trState,
|
|||
|
||||
req.debugID = commitID;
|
||||
state Future<CommitID> reply;
|
||||
// Only gets filled in in the happy path where we don't have to commit on the first proxy or use provisional
|
||||
// proxies
|
||||
state int alternativeChosen = -1;
|
||||
// Only valid if alternativeChosen >= 0
|
||||
state Reference<CommitProxyInfo> proxiesUsed;
|
||||
|
||||
if (trState->options.commitOnFirstProxy) {
|
||||
if (trState->cx->clientInfo->get().firstCommitProxy.present()) {
|
||||
reply = throwErrorOr(brokenPromiseToMaybeDelivered(
|
||||
|
@ -6435,11 +6424,13 @@ ACTOR static Future<Void> tryCommit(Reference<TransactionState> trState,
|
|||
: Never();
|
||||
}
|
||||
} else {
|
||||
reply = basicLoadBalance(trState->cx->getCommitProxies(trState->useProvisionalProxies),
|
||||
proxiesUsed = trState->cx->getCommitProxies(trState->useProvisionalProxies);
|
||||
reply = basicLoadBalance(proxiesUsed,
|
||||
&CommitProxyInterface::commit,
|
||||
req,
|
||||
TaskPriority::DefaultPromiseEndpoint,
|
||||
AtMostOnce::True);
|
||||
AtMostOnce::True,
|
||||
&alternativeChosen);
|
||||
}
|
||||
state double grvTime = now();
|
||||
choose {
|
||||
|
@ -6489,6 +6480,12 @@ ACTOR static Future<Void> tryCommit(Reference<TransactionState> trState,
|
|||
ci.version,
|
||||
req,
|
||||
trState->tenant()));
|
||||
if (trState->automaticIdempotency && alternativeChosen >= 0) {
|
||||
// Automatic idempotency means we're responsible for best effort idempotency id clean up
|
||||
proxiesUsed->getInterface(alternativeChosen)
|
||||
.expireIdempotencyId.send(ExpireIdempotencyIdRequest{
|
||||
ci.version, uint8_t(ci.txnBatchId >> 8), trState->getTenantInfo() });
|
||||
}
|
||||
return Void();
|
||||
} else {
|
||||
// clear the RYW transaction which contains previous conflicting keys
|
||||
|
@ -6974,11 +6971,16 @@ void Transaction::setOption(FDBTransactionOptions::Option option, Optional<Strin
|
|||
throw e;
|
||||
}
|
||||
tr.idempotencyId = IdempotencyIdRef(tr.arena, IdempotencyIdRef(value.get()));
|
||||
trState->automaticIdempotency = false;
|
||||
break;
|
||||
case FDBTransactionOptions::AUTOMATIC_IDEMPOTENCY:
|
||||
validateOptionValueNotPresent(value);
|
||||
tr.idempotencyId = IdempotencyIdRef(
|
||||
tr.arena, IdempotencyIdRef(BinaryWriter::toValue(deterministicRandom()->randomUniqueID(), Unversioned())));
|
||||
if (!tr.idempotencyId.valid()) {
|
||||
tr.idempotencyId = IdempotencyIdRef(
|
||||
tr.arena,
|
||||
IdempotencyIdRef(BinaryWriter::toValue(deterministicRandom()->randomUniqueID(), Unversioned())));
|
||||
}
|
||||
trState->automaticIdempotency = true;
|
||||
break;
|
||||
|
||||
default:
|
||||
|
|
|
@ -1022,6 +1022,10 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
|
|||
// NOTE: 'token-name" can NOT contain '#' character
|
||||
init( REST_KMS_CONNECTOR_VALIDATION_TOKEN_DETAILS, "");
|
||||
|
||||
// Drop in-memory state associated with an idempotency id after this many seconds. Once dropped, this id cannot be
|
||||
// expired proactively, but will eventually get cleaned up by the idempotency id cleaner.
|
||||
init( IDEMPOTENCY_ID_IN_MEMORY_LIFETIME, 10);
|
||||
|
||||
// clang-format on
|
||||
|
||||
if (clientKnobs) {
|
||||
|
|
|
@ -22,7 +22,7 @@
|
|||
#define FDBCLIENT_BUILD_IDEMPOTENCY_ID_MUTATIONS_H
|
||||
|
||||
#include "fdbclient/CommitProxyInterface.h"
|
||||
#include "fdbclient/IdempotencyId.h"
|
||||
#include "fdbclient/IdempotencyId.actor.h"
|
||||
|
||||
#pragma once
|
||||
|
||||
|
|
|
@ -30,7 +30,7 @@
|
|||
#include "fdbclient/FDBTypes.h"
|
||||
#include "fdbclient/GlobalConfig.h"
|
||||
#include "fdbclient/GrvProxyInterface.h"
|
||||
#include "fdbclient/IdempotencyId.h"
|
||||
#include "fdbclient/IdempotencyId.actor.h"
|
||||
#include "fdbclient/StorageServerInterface.h"
|
||||
#include "fdbclient/TagThrottle.actor.h"
|
||||
#include "fdbclient/VersionVector.h"
|
||||
|
@ -61,6 +61,7 @@ struct CommitProxyInterface {
|
|||
RequestStream<struct ProxySnapRequest> proxySnapReq;
|
||||
RequestStream<struct ExclusionSafetyCheckRequest> exclusionSafetyCheckReq;
|
||||
RequestStream<struct GetDDMetricsRequest> getDDMetrics;
|
||||
PublicRequestStream<struct ExpireIdempotencyIdRequest> expireIdempotencyId;
|
||||
|
||||
UID id() const { return commit.getEndpoint().token; }
|
||||
std::string toString() const { return id().shortString(); }
|
||||
|
@ -87,6 +88,8 @@ struct CommitProxyInterface {
|
|||
exclusionSafetyCheckReq =
|
||||
RequestStream<struct ExclusionSafetyCheckRequest>(commit.getEndpoint().getAdjustedEndpoint(8));
|
||||
getDDMetrics = RequestStream<struct GetDDMetricsRequest>(commit.getEndpoint().getAdjustedEndpoint(9));
|
||||
expireIdempotencyId =
|
||||
PublicRequestStream<struct ExpireIdempotencyIdRequest>(commit.getEndpoint().getAdjustedEndpoint(10));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -103,6 +106,7 @@ struct CommitProxyInterface {
|
|||
streams.push_back(proxySnapReq.getReceiver());
|
||||
streams.push_back(exclusionSafetyCheckReq.getReceiver());
|
||||
streams.push_back(getDDMetrics.getReceiver());
|
||||
streams.push_back(expireIdempotencyId.getReceiver());
|
||||
FlowTransport::transport().addEndpoints(streams);
|
||||
}
|
||||
};
|
||||
|
@ -151,6 +155,24 @@ struct ClientDBInfo {
|
|||
}
|
||||
};
|
||||
|
||||
struct ExpireIdempotencyIdRequest {
|
||||
constexpr static FileIdentifier file_identifier = 1900933;
|
||||
Version commitVersion = invalidVersion;
|
||||
uint8_t batchIndexHighByte = 0;
|
||||
TenantInfo tenant;
|
||||
|
||||
ExpireIdempotencyIdRequest() {}
|
||||
ExpireIdempotencyIdRequest(Version commitVersion, uint8_t batchIndexHighByte, TenantInfo tenant)
|
||||
: commitVersion(commitVersion), batchIndexHighByte(batchIndexHighByte), tenant(tenant) {}
|
||||
|
||||
bool verify() const { return tenant.isAuthorized(); }
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, commitVersion, batchIndexHighByte, tenant);
|
||||
}
|
||||
};
|
||||
|
||||
struct CommitID {
|
||||
constexpr static FileIdentifier file_identifier = 14254927;
|
||||
Version version; // returns invalidVersion if transaction conflicts
|
||||
|
|
|
@ -328,32 +328,14 @@ public:
|
|||
// Note: this will never return if the server is running a protocol from FDB 5.0 or older
|
||||
Future<ProtocolVersion> getClusterProtocol(Optional<ProtocolVersion> expectedVersion = Optional<ProtocolVersion>());
|
||||
|
||||
// Increases the counter of the number of watches in this DatabaseContext by 1. If the number of watches is too
|
||||
// many, throws too_many_watches.
|
||||
void addWatchCounter();
|
||||
|
||||
// Decrease the counter of the number of watches in this DatabaseContext by 1
|
||||
void removeWatchCounter();
|
||||
// Update the watch counter for the database
|
||||
void addWatch();
|
||||
void removeWatch();
|
||||
|
||||
// watch map operations
|
||||
|
||||
// Gets the watch metadata per tenant id and key
|
||||
Reference<WatchMetadata> getWatchMetadata(int64_t tenantId, KeyRef key) const;
|
||||
|
||||
// Refreshes the watch metadata. If the same watch is used (this is determined by the tenant id and the key), the
|
||||
// metadata will be updated.
|
||||
void setWatchMetadata(Reference<WatchMetadata> metadata);
|
||||
|
||||
// Removes the watch metadata
|
||||
void deleteWatchMetadata(int64_t tenant, KeyRef key);
|
||||
|
||||
// Increases reference count to the given watch. Returns the number of references to the watch.
|
||||
int32_t increaseWatchRefCount(const int64_t tenant, KeyRef key, const Version& version);
|
||||
|
||||
// Decreases reference count to the given watch. If the reference count is dropped to 0, the watch metadata will be
|
||||
// removed. Returns the number of references to the watch.
|
||||
int32_t decreaseWatchRefCount(const int64_t tenant, KeyRef key, const Version& version);
|
||||
|
||||
void clearWatchMetadata();
|
||||
|
||||
void setOption(FDBDatabaseOptions::Option option, Optional<StringRef> value);
|
||||
|
@ -721,26 +703,8 @@ public:
|
|||
EventCacheHolder connectToDatabaseEventCacheHolder;
|
||||
|
||||
private:
|
||||
using WatchMapKey = std::pair<int64_t, Key>;
|
||||
using WatchMapKeyHasher = boost::hash<WatchMapKey>;
|
||||
using WatchMapValue = Reference<WatchMetadata>;
|
||||
using WatchMap_t = std::unordered_map<WatchMapKey, WatchMapValue, WatchMapKeyHasher>;
|
||||
|
||||
WatchMap_t watchMap;
|
||||
|
||||
// The reason of using a multiset of Versions as counter instead of a simpler integer counter is due to the
|
||||
// possible race condition:
|
||||
//
|
||||
// 1. A watch to key A is set, the watchValueMap ACTOR, noted as X, starts waiting.
|
||||
// 2. All watches are cleared due to connection string change.
|
||||
// 3. The watch to key A is restarted with watchValueMap ACTOR Y.
|
||||
// 4. X receives the cancel exception, and tries to dereference the counter. This causes Y gets cancelled.
|
||||
//
|
||||
// By introducing versions, this race condition is solved.
|
||||
using WatchCounterMapValue = std::multiset<Version>;
|
||||
using WatchCounterMap_t = std::unordered_map<WatchMapKey, WatchCounterMapValue, WatchMapKeyHasher>;
|
||||
// Maps the number of the WatchMapKey being used.
|
||||
WatchCounterMap_t watchCounterMap;
|
||||
std::unordered_map<std::pair<int64_t, Key>, Reference<WatchMetadata>, boost::hash<std::pair<int64_t, Key>>>
|
||||
watchMap;
|
||||
};
|
||||
|
||||
// Similar to tr.onError(), but doesn't require a DatabaseContext.
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* IdempotencyId.h
|
||||
* IdempotencyId.actor.h
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
|
@ -18,8 +18,13 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#ifndef FDBCLIENT_IDEMPOTENCYID_H
|
||||
#define FDBCLIENT_IDEMPOTENCYID_H
|
||||
// When actually compiled (NO_INTELLISENSE), include the generated version of this file. In intellisense use the source
|
||||
// version.
|
||||
#if defined(NO_INTELLISENSE) && !defined(FDBCLIENT_IDEMPOTENCY_ID_ACTOR_G_H)
|
||||
#define FDBCLIENT_IDEMPOTENCY_ID_ACTOR_G_H
|
||||
#include "fdbclient/IdempotencyId.actor.g.h"
|
||||
#elif !defined(FDBCLIENT_IDEMPOTENCY_ID_ACTOR_H)
|
||||
#define FDBCLIENT_IDEMPOTENCY_ID_ACTOR_H
|
||||
|
||||
#pragma once
|
||||
|
||||
|
@ -28,12 +33,24 @@
|
|||
#include "flow/Arena.h"
|
||||
#include "flow/IRandom.h"
|
||||
#include "flow/serialize.h"
|
||||
#include "flow/actorcompiler.h" // this has to be the last include
|
||||
|
||||
struct CommitResult {
|
||||
Version commitVersion;
|
||||
uint16_t batchIndex;
|
||||
};
|
||||
|
||||
// The type of the value stored at the key |idempotencyIdsExpiredVersion|
|
||||
struct IdempotencyIdsExpiredVersion {
|
||||
static constexpr auto file_identifier = 3746945;
|
||||
Version expired = 0;
|
||||
|
||||
template <class Archive>
|
||||
void serialize(Archive& ar) {
|
||||
serializer(ar, expired);
|
||||
}
|
||||
};
|
||||
|
||||
// See design/idempotency_ids.md for more information. Designed so that the common case of a random 16 byte id does not
|
||||
// usually require indirection. Either invalid or an id with length >= 16 and < 256.
|
||||
struct IdempotencyIdRef {
|
||||
|
@ -163,4 +180,10 @@ private:
|
|||
// Check if id is present in kv, and if so return the commit version and batchIndex
|
||||
Optional<CommitResult> kvContainsIdempotencyId(const KeyValueRef& kv, const IdempotencyIdRef& id);
|
||||
|
||||
#endif
|
||||
// Make a range containing only the idempotency key associated with version and highOrderBatchIndex
|
||||
KeyRangeRef makeIdempotencySingleKeyRange(Arena& arena, Version version, uint8_t highOrderBatchIndex);
|
||||
|
||||
void decodeIdempotencyKey(KeyRef key, Version& commitVersion, uint8_t& highOrderBatchIndex);
|
||||
|
||||
#include "flow/unactorcompiler.h"
|
||||
#endif
|
|
@ -268,6 +268,8 @@ struct TransactionState : ReferenceCounted<TransactionState> {
|
|||
// prefix/<key2> : '0' - any keys equal or larger than this key are (definitely) not conflicting keys
|
||||
std::shared_ptr<CoalescedKeyRangeMap<Value>> conflictingKeys;
|
||||
|
||||
bool automaticIdempotency = false;
|
||||
|
||||
// Only available so that Transaction can have a default constructor, for use in state variables
|
||||
TransactionState(TaskPriority taskID, SpanContext spanContext)
|
||||
: taskID(taskID), spanContext(spanContext), tenantSet(false) {}
|
||||
|
|
|
@ -992,6 +992,9 @@ public:
|
|||
std::string REST_KMS_CONNECTOR_GET_ENCRYPTION_KEYS_ENDPOINT;
|
||||
std::string REST_KMS_CONNECTOR_GET_BLOB_METADATA_ENDPOINT;
|
||||
|
||||
// Idempotency ids
|
||||
double IDEMPOTENCY_ID_IN_MEMORY_LIFETIME;
|
||||
|
||||
ServerKnobs(Randomize, ClientKnobs*, IsSimulated);
|
||||
void initialize(Randomize, ClientKnobs*, IsSimulated);
|
||||
};
|
||||
|
|
|
@ -45,7 +45,7 @@ struct CheckpointMetaData {
|
|||
|
||||
constexpr static FileIdentifier file_identifier = 13804342;
|
||||
Version version;
|
||||
KeyRange range;
|
||||
std::vector<KeyRange> ranges;
|
||||
int16_t format; // CheckpointFormat.
|
||||
UID ssID; // Storage server ID on which this checkpoint is created.
|
||||
UID checkpointID; // A unique id for this checkpoint.
|
||||
|
@ -58,11 +58,15 @@ struct CheckpointMetaData {
|
|||
|
||||
CheckpointMetaData() = default;
|
||||
CheckpointMetaData(KeyRange const& range, CheckpointFormat format, UID const& ssID, UID const& checkpointID)
|
||||
: version(invalidVersion), range(range), format(format), ssID(ssID), checkpointID(checkpointID), state(Pending),
|
||||
referenceCount(0), gcTime(0) {}
|
||||
: version(invalidVersion), format(format), ssID(ssID), checkpointID(checkpointID), state(Pending),
|
||||
referenceCount(0), gcTime(0) {
|
||||
this->ranges.push_back(range);
|
||||
}
|
||||
CheckpointMetaData(Version version, KeyRange const& range, CheckpointFormat format, UID checkpointID)
|
||||
: version(version), range(range), format(format), ssID(UID()), checkpointID(checkpointID), state(Pending),
|
||||
referenceCount(0), gcTime(0) {}
|
||||
: version(version), format(format), ssID(UID()), checkpointID(checkpointID), state(Pending), referenceCount(0),
|
||||
gcTime(0) {
|
||||
this->ranges.push_back(range);
|
||||
}
|
||||
|
||||
CheckpointState getState() const { return static_cast<CheckpointState>(state); }
|
||||
|
||||
|
@ -73,7 +77,7 @@ struct CheckpointMetaData {
|
|||
void setFormat(CheckpointFormat format) { this->format = static_cast<int16_t>(format); }
|
||||
|
||||
std::string toString() const {
|
||||
std::string res = "Checkpoint MetaData:\nRange: " + range.toString() + "\nVersion: " + std::to_string(version) +
|
||||
std::string res = "Checkpoint MetaData:\nRange: " + describe(ranges) + "\nVersion: " + std::to_string(version) +
|
||||
"\nFormat: " + std::to_string(format) + "\nServer: " + ssID.toString() +
|
||||
"\nID: " + checkpointID.toString() + "\nState: " + std::to_string(static_cast<int>(state)) +
|
||||
"\n";
|
||||
|
@ -82,7 +86,7 @@ struct CheckpointMetaData {
|
|||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, version, range, format, state, checkpointID, ssID, gcTime, serializedCheckpoint);
|
||||
serializer(ar, version, ranges, format, state, checkpointID, ssID, gcTime, serializedCheckpoint);
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -99,23 +103,28 @@ struct DataMoveMetaData {
|
|||
constexpr static FileIdentifier file_identifier = 13804362;
|
||||
UID id; // A unique id for this data move.
|
||||
Version version;
|
||||
KeyRange range;
|
||||
std::vector<KeyRange> ranges;
|
||||
int priority;
|
||||
std::set<UID> src;
|
||||
std::set<UID> dest;
|
||||
std::set<UID> checkpoints;
|
||||
int16_t phase; // DataMoveMetaData::Phase.
|
||||
int8_t mode;
|
||||
|
||||
DataMoveMetaData() = default;
|
||||
DataMoveMetaData(UID id, Version version, KeyRange range)
|
||||
: id(id), version(version), range(std::move(range)), priority(0) {}
|
||||
DataMoveMetaData(UID id, KeyRange range) : id(id), version(invalidVersion), range(std::move(range)), priority(0) {}
|
||||
DataMoveMetaData(UID id, Version version, KeyRange range) : id(id), version(version), priority(0), mode(0) {
|
||||
this->ranges.push_back(range);
|
||||
}
|
||||
DataMoveMetaData(UID id, KeyRange range) : id(id), version(invalidVersion), priority(0), mode(0) {
|
||||
this->ranges.push_back(range);
|
||||
}
|
||||
|
||||
Phase getPhase() const { return static_cast<Phase>(phase); }
|
||||
|
||||
void setPhase(Phase phase) { this->phase = static_cast<int16_t>(phase); }
|
||||
|
||||
std::string toString() const {
|
||||
std::string res = "DataMoveMetaData: [ID]: " + id.shortString() + " [Range]: " + range.toString() +
|
||||
std::string res = "DataMoveMetaData: [ID]: " + id.shortString() + " [Range]: " + describe(ranges) +
|
||||
" [Phase]: " + std::to_string(static_cast<int>(phase)) +
|
||||
" [Source Servers]: " + describe(src) + " [Destination Servers]: " + describe(dest);
|
||||
return res;
|
||||
|
@ -123,7 +132,7 @@ struct DataMoveMetaData {
|
|||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, id, version, range, phase, src, dest);
|
||||
serializer(ar, id, version, ranges, priority, src, dest, checkpoints, phase, mode);
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -279,7 +279,7 @@ description is not currently required but encouraged.
|
|||
description="Set the transaction size limit in bytes. The size is calculated by combining the sizes of all keys and values written or mutated, all key ranges cleared, and all read and write conflict ranges. (In other words, it includes the total size of all data included in the request to the cluster to commit the transaction.) Large transactions can cause performance problems on FoundationDB clusters, so setting this limit to a smaller value than the default can help prevent the client from accidentally degrading the cluster's performance. This value must be at least 32 and cannot be set to higher than 10,000,000, the default transaction size limit." />
|
||||
<Option name="idempotency_id" code="504"
|
||||
paramType="String" paramDescription="Unique ID"
|
||||
description="Associate this transaction with this ID for the purpose of checking whether or not this transaction has already committed. Must be at least 16 bytes and less than 256 bytes. This feature is in development and not ready for general use."
|
||||
description="Associate this transaction with this ID for the purpose of checking whether or not this transaction has already committed. Must be at least 16 bytes and less than 256 bytes. This feature is in development and not ready for general use. Unless the automatic_idempotency option is set after this option, the client will not automatically attempt to remove this id from the cluster after a successful commit."
|
||||
hidden="true" />
|
||||
<Option name="automatic_idempotency" code="505"
|
||||
description="Automatically assign a random 16 byte idempotency id for this transaction. Prevents commits from failing with ``commit_unknown_result``. WARNING: If you are also using the multiversion client or transaction timeouts, if either cluster_version_changed or transaction_timed_out was thrown during a commit, then that commit may have already succeeded or may succeed in the future. This feature is in development and not ready for general use."
|
||||
|
|
|
@ -757,12 +757,18 @@ Optional<BasicLoadBalancedReply> getBasicLoadBalancedReply(const BasicLoadBalanc
|
|||
Optional<BasicLoadBalancedReply> getBasicLoadBalancedReply(const void*);
|
||||
|
||||
// A simpler version of LoadBalance that does not send second requests, for use where the list of servers is always fresh
|
||||
//
|
||||
// If |alternativeChosen| is not null, then atMostOnce must be True, and if the returned future completes successfully
|
||||
// then *alternativeChosen will be the alternative to which the message was sent. *alternativeChosen must outlive the
|
||||
// returned future.
|
||||
ACTOR template <class Interface, class Request, class Multi, bool P>
|
||||
Future<REPLY_TYPE(Request)> basicLoadBalance(Reference<ModelInterface<Multi>> alternatives,
|
||||
RequestStream<Request, P> Interface::*channel,
|
||||
Request request = Request(),
|
||||
TaskPriority taskID = TaskPriority::DefaultPromiseEndpoint,
|
||||
AtMostOnce atMostOnce = AtMostOnce::False) {
|
||||
AtMostOnce atMostOnce = AtMostOnce::False,
|
||||
int* alternativeChosen = nullptr) {
|
||||
ASSERT(alternativeChosen == nullptr || atMostOnce == AtMostOnce::True);
|
||||
setReplyPriority(request, taskID);
|
||||
if (!alternatives)
|
||||
return Never();
|
||||
|
@ -791,6 +797,9 @@ Future<REPLY_TYPE(Request)> basicLoadBalance(Reference<ModelInterface<Multi>> al
|
|||
useAlt = (nextAlt + alternatives->size() - 1) % alternatives->size();
|
||||
|
||||
stream = &alternatives->get(useAlt, channel);
|
||||
if (alternativeChosen != nullptr) {
|
||||
*alternativeChosen = useAlt;
|
||||
}
|
||||
if (!IFailureMonitor::failureMonitor().getState(stream->getEndpoint()).failed)
|
||||
break;
|
||||
nextAlt = (nextAlt + 1) % alternatives->size();
|
||||
|
|
|
@ -28,7 +28,7 @@
|
|||
#include "fdbclient/CommitTransaction.h"
|
||||
#include "fdbclient/DatabaseContext.h"
|
||||
#include "fdbclient/FDBTypes.h"
|
||||
#include "fdbclient/IdempotencyId.h"
|
||||
#include "fdbclient/IdempotencyId.actor.h"
|
||||
#include "fdbclient/Knobs.h"
|
||||
#include "fdbclient/CommitProxyInterface.h"
|
||||
#include "fdbclient/NativeAPI.actor.h"
|
||||
|
@ -1616,6 +1616,14 @@ ACTOR Future<Void> postResolution(CommitBatchContext* self) {
|
|||
self->toCommit.writeTypedMessage(idempotencyIdSet);
|
||||
});
|
||||
|
||||
for (const auto& m : pProxyCommitData->idempotencyClears) {
|
||||
auto& tags = pProxyCommitData->tagsForKey(m.param1);
|
||||
self->toCommit.addTags(tags);
|
||||
// TODO(nwijetunga): Encrypt these mutations
|
||||
self->toCommit.writeTypedMessage(m);
|
||||
}
|
||||
pProxyCommitData->idempotencyClears = Standalone<VectorRef<MutationRef>>();
|
||||
|
||||
self->toCommit.saveTags(self->writtenTags);
|
||||
|
||||
pProxyCommitData->stats.mutations += self->mutationCount;
|
||||
|
@ -1864,10 +1872,14 @@ ACTOR Future<Void> reply(CommitBatchContext* self) {
|
|||
// Reset all to zero, used to track the correct index of each CommitTransactionRef on each resolver
|
||||
|
||||
std::fill(self->nextTr.begin(), self->nextTr.end(), 0);
|
||||
std::unordered_map<uint8_t, int16_t> idCountsForKey;
|
||||
for (int t = 0; t < self->trs.size(); t++) {
|
||||
auto& tr = self->trs[t];
|
||||
if (self->committed[t] == ConflictBatch::TransactionCommitted && (!self->locked || tr.isLockAware())) {
|
||||
ASSERT_WE_THINK(self->commitVersion != invalidVersion);
|
||||
if (self->trs[t].idempotencyId.valid()) {
|
||||
idCountsForKey[uint8_t(t >> 8)] += 1;
|
||||
}
|
||||
tr.reply.send(CommitID(self->commitVersion, t, self->metadataVersionAfter));
|
||||
} else if (self->committed[t] == ConflictBatch::TransactionTooOld) {
|
||||
tr.reply.sendError(transaction_too_old());
|
||||
|
@ -1914,6 +1926,11 @@ ACTOR Future<Void> reply(CommitBatchContext* self) {
|
|||
}
|
||||
}
|
||||
|
||||
for (auto [highOrderBatchIndex, count] : idCountsForKey) {
|
||||
pProxyCommitData->expectedIdempotencyIdCountForKey.send(
|
||||
ExpectedIdempotencyIdCountForKey{ self->commitVersion, count, highOrderBatchIndex });
|
||||
}
|
||||
|
||||
++pProxyCommitData->stats.commitBatchOut;
|
||||
pProxyCommitData->stats.txnCommitOut += self->trs.size();
|
||||
pProxyCommitData->stats.txnConflicts += self->trs.size() - self->commitCount;
|
||||
|
@ -2469,6 +2486,96 @@ ACTOR Future<Void> reportTxnTagCommitCost(UID myID,
|
|||
}
|
||||
}
|
||||
|
||||
namespace {
|
||||
// Per-key bookkeeping used by idempotencyIdsExpireServer while it waits to learn how many
// idempotency ids a key holds and how many of them clients have asked to expire.
struct ExpireServerEntry {
	// Value of now() stamped when the first message for this key is processed; compared against the
	// purge cutoff to age out stale entries. Explicitly zero-initialized so that a default-constructed
	// entry never carries an indeterminate value.
	int64_t timeReceived = 0;
	// Number of idempotency ids stored under the key; stays 0 until the count is reported.
	int expectedCount = 0;
	// Number of expire requests received for the key so far.
	int receivedCount = 0;
	// True once timeReceived has been stamped.
	bool initialized = false;
};
|
||||
|
||||
struct IdempotencyKey {
|
||||
Version version;
|
||||
uint8_t highOrderBatchIndex;
|
||||
bool operator==(const IdempotencyKey& other) const {
|
||||
return version == other.version && highOrderBatchIndex == other.highOrderBatchIndex;
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace
|
||||
|
||||
namespace std {
|
||||
template <>
|
||||
struct hash<IdempotencyKey> {
|
||||
std::size_t operator()(const IdempotencyKey& key) const {
|
||||
std::size_t seed = 0;
|
||||
boost::hash_combine(seed, std::hash<Version>{}(key.version));
|
||||
boost::hash_combine(seed, std::hash<uint8_t>{}(key.highOrderBatchIndex));
|
||||
return seed;
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace std
|
||||
|
||||
// Tracks, per idempotency key, how many ids the key contains and how many expire requests have been
// received for it. Once the two counts match, a ClearRange mutation covering that single key is
// appended to *idempotencyClears. Entries older than IDEMPOTENCY_ID_IN_MEMORY_LIFETIME are periodically
// dropped from memory without emitting a clear.
ACTOR static Future<Void> idempotencyIdsExpireServer(
    Database db,
    PublicRequestStream<ExpireIdempotencyIdRequest> expireIdempotencyId,
    PromiseStream<ExpectedIdempotencyIdCountForKey> expectedIdempotencyIdCountForKey,
    Standalone<VectorRef<MutationRef>>* idempotencyClears) {
	state std::unordered_map<IdempotencyKey, ExpireServerEntry> entries;
	state std::unordered_map<IdempotencyKey, ExpireServerEntry>::iterator cursor;
	state int64_t cutoff;
	state IdempotencyKey key;
	state ExpireServerEntry* entry = nullptr;
	state Future<Void> purgeTimer = Void();
	loop {
		choose {
			// A client reports that it is done with one id stored under this key.
			when(ExpireIdempotencyIdRequest expireReq = waitNext(expireIdempotencyId.getFuture())) {
				key = IdempotencyKey{ expireReq.commitVersion, expireReq.batchIndexHighByte };
				entry = &entries[key];
				entry->receivedCount += 1;
				CODE_PROBE(entry->expectedCount == 0, "ExpireIdempotencyIdRequest received before count is known");
				if (entry->expectedCount > 0) {
					ASSERT_LE(entry->receivedCount, entry->expectedCount);
				}
			}
			// The commit path reports how many ids were stored under this key.
			when(ExpectedIdempotencyIdCountForKey countReq = waitNext(expectedIdempotencyIdCountForKey.getFuture())) {
				key = IdempotencyKey{ countReq.commitVersion, countReq.batchIndexHighByte };
				entry = &entries[key];
				ASSERT_EQ(entry->expectedCount, 0);
				entry->expectedCount = countReq.idempotencyIdCount;
			}
			// Periodic sweep that forgets entries which have been tracked for too long.
			when(wait(purgeTimer)) {
				purgeTimer = delay(SERVER_KNOBS->IDEMPOTENCY_ID_IN_MEMORY_LIFETIME);
				cutoff = now() - SERVER_KNOBS->IDEMPOTENCY_ID_IN_MEMORY_LIFETIME;
				cursor = entries.begin();
				while (cursor != entries.end()) {
					// Nothing else touches the map while this when block runs, so the iterator
					// remains valid across the wait.
					wait(yield());
					if (cursor->second.timeReceived >= cutoff) {
						++cursor;
					} else {
						cursor = entries.erase(cursor);
					}
				}
				// The sweep did not select an entry, so skip the per-entry handling below.
				continue;
			}
		}
		if (!entry->initialized) {
			// First message seen for this key: stamp its arrival time.
			entry->timeReceived = now();
			entry->initialized = true;
		} else if (entry->receivedCount == entry->expectedCount) {
			// Every id under this key has been released: schedule the key for clearing and forget it.
			auto clearRange =
			    makeIdempotencySingleKeyRange(idempotencyClears->arena(), key.version, key.highOrderBatchIndex);
			idempotencyClears->push_back(idempotencyClears->arena(),
			                             MutationRef(MutationRef::ClearRange, clearRange.begin, clearRange.end));
			entries.erase(key);
		}
	}
}
|
||||
|
||||
namespace {
|
||||
|
||||
struct TransactionStateResolveContext {
|
||||
|
@ -2733,6 +2840,10 @@ ACTOR Future<Void> commitProxyServerCore(CommitProxyInterface proxy,
|
|||
addActor.send(rejoinServer(proxy, &commitData));
|
||||
addActor.send(ddMetricsRequestServer(proxy, db));
|
||||
addActor.send(reportTxnTagCommitCost(proxy.id(), db, &commitData.ssTrTagCommitCost));
|
||||
addActor.send(idempotencyIdsExpireServer(openDBOnServer(db),
|
||||
proxy.expireIdempotencyId,
|
||||
commitData.expectedIdempotencyIdCountForKey,
|
||||
&commitData.idempotencyClears));
|
||||
|
||||
// wait for txnStateStore recovery
|
||||
wait(success(commitData.txnStateStore->readValue(StringRef())));
|
||||
|
|
|
@ -304,6 +304,7 @@ class DDTxnProcessorImpl {
|
|||
for (int i = 0; i < dms.size(); ++i) {
|
||||
auto dataMove = std::make_shared<DataMove>(decodeDataMoveValue(dms[i].value), true);
|
||||
const DataMoveMetaData& meta = dataMove->meta;
|
||||
ASSERT(!meta.ranges.empty());
|
||||
for (const UID& id : meta.src) {
|
||||
auto& dc = server_dc[id];
|
||||
if (std::find(remoteDcIds.begin(), remoteDcIds.end(), dc) != remoteDcIds.end()) {
|
||||
|
@ -325,11 +326,11 @@ class DDTxnProcessorImpl {
|
|||
std::sort(dataMove->primaryDest.begin(), dataMove->primaryDest.end());
|
||||
std::sort(dataMove->remoteDest.begin(), dataMove->remoteDest.end());
|
||||
|
||||
auto ranges = result->dataMoveMap.intersectingRanges(meta.range);
|
||||
auto ranges = result->dataMoveMap.intersectingRanges(meta.ranges.front());
|
||||
for (auto& r : ranges) {
|
||||
ASSERT(!r.value()->valid);
|
||||
}
|
||||
result->dataMoveMap.insert(meta.range, std::move(dataMove));
|
||||
result->dataMoveMap.insert(meta.ranges.front(), std::move(dataMove));
|
||||
++numDataMoves;
|
||||
}
|
||||
|
||||
|
|
|
@ -90,7 +90,7 @@ void DataMove::validateShard(const DDShardInfo& shard, KeyRangeRef range, int pr
|
|||
return;
|
||||
}
|
||||
|
||||
ASSERT(this->meta.range.contains(range));
|
||||
ASSERT(!this->meta.ranges.empty() && this->meta.ranges.front().contains(range));
|
||||
|
||||
if (!shard.hasDest) {
|
||||
TraceEvent(SevError, "DataMoveValidationError")
|
||||
|
@ -494,17 +494,21 @@ public:
|
|||
|
||||
for (; it != self->initData->dataMoveMap.ranges().end(); ++it) {
|
||||
const DataMoveMetaData& meta = it.value()->meta;
|
||||
if (meta.ranges.empty()) {
|
||||
TraceEvent(SevWarnAlways, "EmptyDataMoveRange", self->ddId).detail("DataMoveMetaData", meta.toString());
|
||||
continue;
|
||||
}
|
||||
if (it.value()->isCancelled() || (it.value()->valid && !SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA)) {
|
||||
RelocateShard rs(meta.range, DataMovementReason::RECOVER_MOVE, RelocateReason::OTHER);
|
||||
RelocateShard rs(meta.ranges.front(), DataMovementReason::RECOVER_MOVE, RelocateReason::OTHER);
|
||||
rs.dataMoveId = meta.id;
|
||||
rs.cancelled = true;
|
||||
self->relocationProducer.send(rs);
|
||||
TraceEvent("DDInitScheduledCancelDataMove", self->ddId).detail("DataMove", meta.toString());
|
||||
} else if (it.value()->valid) {
|
||||
TraceEvent(SevDebug, "DDInitFoundDataMove", self->ddId).detail("DataMove", meta.toString());
|
||||
ASSERT(meta.range == it.range());
|
||||
ASSERT(meta.ranges.front() == it.range());
|
||||
// TODO: Persist priority in DataMoveMetaData.
|
||||
RelocateShard rs(meta.range, DataMovementReason::RECOVER_MOVE, RelocateReason::OTHER);
|
||||
RelocateShard rs(meta.ranges.front(), DataMovementReason::RECOVER_MOVE, RelocateReason::OTHER);
|
||||
rs.dataMoveId = meta.id;
|
||||
rs.dataMove = it.value();
|
||||
std::vector<ShardsAffectedByTeamFailure::Team> teams;
|
||||
|
|
|
@ -1662,3 +1662,43 @@ IDiskQueue* openDiskQueue(std::string basename,
|
|||
int64_t fileSizeWarningLimit) {
|
||||
return new DiskQueue_PopUncommitted(basename, ext, dbgid, dqv, fileSizeWarningLimit);
|
||||
}
|
||||
|
||||
// Performance test for the disk queue: 4000 iterations, each pushing one 10e6-byte value and issuing a
// commit. Every second iteration also pops the oldest outstanding location, reading it back first when
// more than 10 locations are still outstanding.
TEST_CASE("performance/fdbserver/DiskQueue") {
	state IDiskQueue* queue =
	    openDiskQueue("test-", "fdq", deterministicRandom()->randomUniqueID(), DiskQueueVersion::V2);
	state std::string payload = std::string(10e6, '.');
	state StringRef payloadRef((uint8_t*)payload.c_str(), 10e6);
	state std::deque<IDiskQueue::location> outstanding;
	state int iteration = 0;
	state Future<Void> inFlightCommit = Void();

	// Drain whatever recovery data exists before generating load.
	bool fullyRecovered = wait(queue->initializeRecovery(0));
	if (!fullyRecovered) {
		loop {
			Standalone<StringRef> chunk = wait(queue->readNext(1e6));
			if (chunk.size() < 1e6) {
				break;
			}
		}
	}

	while (iteration < 4000) {
		if (iteration % 100 == 0) {
			printf("loop count: %d\n", iteration);
		}
		++iteration;
		if (iteration % 2 == 0) {
			state IDiskQueue::location oldest = outstanding.front();
			outstanding.pop_front();
			if (outstanding.size() > 10) {
				Standalone<StringRef> r = wait(queue->read(oldest, outstanding.front(), CheckHashes::True));
			}
			queue->pop(oldest);
		}
		wait(delay(0.001));
		outstanding.push_back(queue->push(payloadRef));
		// Start the next commit before waiting on the previous one, so one commit is always in flight.
		Future<Void> previousCommit = inFlightCommit;
		inFlightCommit = queue->commit();
		wait(previousCommit);
	}

	queue->dispose();
	wait(queue->onClosed());
	return Void();
}
|
|
@ -1287,7 +1287,7 @@ ACTOR static Future<Void> startMoveShards(Database occ,
|
|||
TraceEvent(SevVerbose, "StartMoveShardsFoundDataMove", relocationIntervalId)
|
||||
.detail("DataMoveID", dataMoveId)
|
||||
.detail("DataMove", dataMove.toString());
|
||||
ASSERT(dataMove.range.begin == keys.begin);
|
||||
ASSERT(!dataMove.ranges.empty() && dataMove.ranges.front().begin == keys.begin);
|
||||
if (dataMove.getPhase() == DataMoveMetaData::Deleting) {
|
||||
TraceEvent(SevVerbose, "StartMoveShardsDataMove", relocationIntervalId)
|
||||
.detail("DataMoveBeingDeleted", dataMoveId);
|
||||
|
@ -1296,10 +1296,10 @@ ACTOR static Future<Void> startMoveShards(Database occ,
|
|||
if (dataMove.getPhase() == DataMoveMetaData::Running) {
|
||||
TraceEvent(SevVerbose, "StartMoveShardsDataMove", relocationIntervalId)
|
||||
.detail("DataMoveAlreadyCommitted", dataMoveId);
|
||||
ASSERT(keys == dataMove.range);
|
||||
ASSERT(keys == dataMove.ranges.front());
|
||||
return Void();
|
||||
}
|
||||
begin = dataMove.range.end;
|
||||
begin = dataMove.ranges.front().end;
|
||||
} else {
|
||||
dataMove.id = dataMoveId;
|
||||
TraceEvent(SevVerbose, "StartMoveKeysNewDataMove", relocationIntervalId)
|
||||
|
@ -1441,7 +1441,8 @@ ACTOR static Future<Void> startMoveShards(Database occ,
|
|||
&tr, serverKeysPrefixFor(servers[i]), currentKeys, allKeys, serverKeysValue(dataMoveId)));
|
||||
}
|
||||
|
||||
dataMove.range = KeyRangeRef(keys.begin, currentKeys.end);
|
||||
dataMove.ranges.clear();
|
||||
dataMove.ranges.push_back(KeyRangeRef(keys.begin, currentKeys.end));
|
||||
dataMove.dest.insert(servers.begin(), servers.end());
|
||||
}
|
||||
|
||||
|
@ -1471,7 +1472,7 @@ ACTOR static Future<Void> startMoveShards(Database occ,
|
|||
.detail("DataMoveKey", dataMoveKeyFor(dataMoveId))
|
||||
.detail("CommitVersion", tr.getCommittedVersion())
|
||||
.detail("DeltaRange", currentKeys.toString())
|
||||
.detail("Range", dataMove.range.toString())
|
||||
.detail("Range", describe(dataMove.ranges))
|
||||
.detail("DataMove", dataMove.toString());
|
||||
|
||||
dataMove = DataMoveMetaData();
|
||||
|
@ -1628,7 +1629,8 @@ ACTOR static Future<Void> finishMoveShards(Database occ,
|
|||
throw data_move_cancelled();
|
||||
}
|
||||
ASSERT(dataMove.getPhase() == DataMoveMetaData::Running);
|
||||
range = dataMove.range;
|
||||
ASSERT(!dataMove.ranges.empty());
|
||||
range = dataMove.ranges.front();
|
||||
} else {
|
||||
TraceEvent(SevWarn, "FinishMoveShardsDataMoveDeleted", relocationIntervalId)
|
||||
.detail("DataMoveID", dataMoveId);
|
||||
|
@ -1766,7 +1768,7 @@ ACTOR static Future<Void> finishMoveShards(Database occ,
|
|||
|
||||
wait(waitForAll(actors));
|
||||
|
||||
if (range.end == dataMove.range.end) {
|
||||
if (range.end == dataMove.ranges.front().end) {
|
||||
tr.clear(dataMoveKeyFor(dataMoveId));
|
||||
complete = true;
|
||||
TraceEvent(SevVerbose, "FinishMoveShardsDeleteMetaData", dataMoveId)
|
||||
|
@ -1776,7 +1778,7 @@ ACTOR static Future<Void> finishMoveShards(Database occ,
|
|||
.detail("DataMoveID", dataMoveId)
|
||||
.detail("CurrentRange", range)
|
||||
.detail("NewDataMoveMetaData", dataMove.toString());
|
||||
dataMove.range = KeyRangeRef(range.end, dataMove.range.end);
|
||||
dataMove.ranges.front() = KeyRangeRef(range.end, dataMove.ranges.front().end);
|
||||
tr.set(dataMoveKeyFor(dataMoveId), dataMoveValue(dataMove));
|
||||
}
|
||||
|
||||
|
@ -2229,9 +2231,10 @@ ACTOR Future<Void> removeKeysFromFailedServer(Database cx,
|
|||
Optional<Value> val = wait(tr.get(dataMoveKeyFor(destId)));
|
||||
if (val.present()) {
|
||||
state DataMoveMetaData dataMove = decodeDataMoveValue(val.get());
|
||||
ASSERT(!dataMove.ranges.empty());
|
||||
TraceEvent(SevVerbose, "RemoveRangeFoundDataMove", serverID)
|
||||
.detail("DataMoveMetaData", dataMove.toString());
|
||||
if (range == dataMove.range) {
|
||||
if (range == dataMove.ranges.front()) {
|
||||
tr.clear(dataMoveKeyFor(destId));
|
||||
} else {
|
||||
dataMove.setPhase(DataMoveMetaData::Deleting);
|
||||
|
@ -2350,10 +2353,11 @@ ACTOR Future<Void> cleanUpDataMove(Database occ,
|
|||
Optional<Value> val = wait(tr.get(dataMoveKeyFor(dataMoveId)));
|
||||
if (val.present()) {
|
||||
dataMove = decodeDataMoveValue(val.get());
|
||||
ASSERT(!dataMove.ranges.empty());
|
||||
TraceEvent(SevVerbose, "CleanUpDataMoveMetaData", dataMoveId)
|
||||
.detail("DataMoveID", dataMoveId)
|
||||
.detail("DataMoveMetaData", dataMove.toString());
|
||||
range = dataMove.range;
|
||||
range = dataMove.ranges.front();
|
||||
ASSERT(!range.empty());
|
||||
} else {
|
||||
TraceEvent(SevDebug, "CleanUpDataMoveNotExist", dataMoveId).detail("DataMoveID", dataMoveId);
|
||||
|
@ -2419,14 +2423,14 @@ ACTOR Future<Void> cleanUpDataMove(Database occ,
|
|||
currentShards[i + 1].value);
|
||||
}
|
||||
|
||||
if (range.end == dataMove.range.end) {
|
||||
if (range.end == dataMove.ranges.front().end) {
|
||||
tr.clear(dataMoveKeyFor(dataMoveId));
|
||||
complete = true;
|
||||
TraceEvent(SevVerbose, "CleanUpDataMoveDeleteMetaData", dataMoveId)
|
||||
.detail("DataMoveID", dataMove.toString());
|
||||
|
||||
} else {
|
||||
dataMove.range = KeyRangeRef(range.end, dataMove.range.end);
|
||||
dataMove.ranges.front() = KeyRangeRef(range.end, dataMove.ranges.front().end);
|
||||
dataMove.setPhase(DataMoveMetaData::Deleting);
|
||||
tr.set(dataMoveKeyFor(dataMoveId), dataMoveValue(dataMove));
|
||||
TraceEvent(SevVerbose, "CleanUpDataMovePartial", dataMoveId)
|
||||
|
|
|
@ -753,6 +753,8 @@ ACTOR Future<CheckpointMetaData> fetchRocksDBCheckpoint(Database cx,
|
|||
.detail("InitialState", initialState.toString())
|
||||
.detail("CheckpointDir", dir);
|
||||
|
||||
ASSERT(!initialState.ranges.empty());
|
||||
|
||||
state std::shared_ptr<CheckpointMetaData> metaData = std::make_shared<CheckpointMetaData>(initialState);
|
||||
|
||||
if (metaData->format == RocksDBColumnFamily) {
|
||||
|
@ -771,7 +773,7 @@ ACTOR Future<CheckpointMetaData> fetchRocksDBCheckpoint(Database cx,
|
|||
} else if (metaData->format == RocksDB) {
|
||||
std::shared_ptr<rocksdb::SstFileWriter> writer =
|
||||
std::make_shared<rocksdb::SstFileWriter>(rocksdb::EnvOptions(), rocksdb::Options());
|
||||
wait(fetchCheckpointRange(cx, metaData, metaData->range, dir, writer, cFun));
|
||||
wait(fetchCheckpointRange(cx, metaData, metaData->ranges.front(), dir, writer, cFun));
|
||||
}
|
||||
|
||||
return *metaData;
|
||||
|
|
|
@ -160,6 +160,16 @@ struct ProxyStats {
|
|||
}
|
||||
};
|
||||
|
||||
struct ExpectedIdempotencyIdCountForKey {
|
||||
Version commitVersion = invalidVersion;
|
||||
int16_t idempotencyIdCount = 0;
|
||||
uint8_t batchIndexHighByte = 0;
|
||||
|
||||
ExpectedIdempotencyIdCountForKey() {}
|
||||
ExpectedIdempotencyIdCountForKey(Version commitVersion, int16_t idempotencyIdCount, uint8_t batchIndexHighByte)
|
||||
: commitVersion(commitVersion), idempotencyIdCount(idempotencyIdCount), batchIndexHighByte(batchIndexHighByte) {}
|
||||
};
|
||||
|
||||
struct ProxyCommitData {
|
||||
UID dbgid;
|
||||
int64_t commitBatchesMemBytesCount;
|
||||
|
@ -226,6 +236,9 @@ struct ProxyCommitData {
|
|||
|
||||
bool isEncryptionEnabled = false;
|
||||
|
||||
PromiseStream<ExpectedIdempotencyIdCountForKey> expectedIdempotencyIdCountForKey;
|
||||
Standalone<VectorRef<MutationRef>> idempotencyClears;
|
||||
|
||||
// The tag related to a storage server rarely change, so we keep a vector of tags for each key range to be slightly
|
||||
// more CPU efficient. When a tag related to a storage server does change, we empty out all of these vectors to
|
||||
// signify they must be repopulated. We do not repopulate them immediately to avoid a slow task.
|
||||
|
|
|
@ -2187,8 +2187,8 @@ ACTOR Future<Void> getCheckpointQ(StorageServer* self, GetCheckpointRequest req)
|
|||
std::unordered_map<UID, CheckpointMetaData>::iterator it = self->checkpoints.begin();
|
||||
for (; it != self->checkpoints.end(); ++it) {
|
||||
const CheckpointMetaData& md = it->second;
|
||||
if (md.version == req.version && md.format == req.format && md.range.contains(req.range) &&
|
||||
md.getState() == CheckpointMetaData::Complete) {
|
||||
if (md.version == req.version && md.format == req.format && !md.ranges.empty() &&
|
||||
md.ranges.front().contains(req.range) && md.getState() == CheckpointMetaData::Complete) {
|
||||
req.reply.send(md);
|
||||
TraceEvent(SevDebug, "ServeGetCheckpointEnd", self->thisServerID).detail("Checkpoint", md.toString());
|
||||
break;
|
||||
|
@ -8911,9 +8911,9 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
|
|||
}
|
||||
|
||||
ACTOR Future<Void> createCheckpoint(StorageServer* data, CheckpointMetaData metaData) {
|
||||
ASSERT(metaData.ssID == data->thisServerID);
|
||||
ASSERT(metaData.ssID == data->thisServerID && !metaData.ranges.empty());
|
||||
const CheckpointRequest req(metaData.version,
|
||||
metaData.range,
|
||||
metaData.ranges.front(),
|
||||
static_cast<CheckpointFormat>(metaData.format),
|
||||
metaData.checkpointID,
|
||||
data->folder + rocksdbCheckpointDirPrefix + metaData.checkpointID.toString());
|
||||
|
@ -9405,7 +9405,7 @@ void setAvailableStatus(StorageServer* self, KeyRangeRef keys, bool available) {
|
|||
// When a shard is moved out, delete all related checkpoints created for data move.
|
||||
if (!available) {
|
||||
for (auto& [id, checkpoint] : self->checkpoints) {
|
||||
if (checkpoint.range.intersects(keys)) {
|
||||
if (!checkpoint.ranges.empty() && checkpoint.ranges.front().intersects(keys)) {
|
||||
Key persistCheckpointKey(persistCheckpointKeys.begin.toString() + checkpoint.checkpointID.toString());
|
||||
checkpoint.setState(CheckpointMetaData::Deleting);
|
||||
self->addMutationToMutationLog(
|
||||
|
|
|
@ -24,6 +24,20 @@
|
|||
#include "fdbserver/workloads/workloads.actor.h"
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
||||
namespace {
|
||||
struct ValueType {
|
||||
static constexpr FileIdentifier file_identifier = 9556754;
|
||||
|
||||
Value idempotencyId;
|
||||
int64_t createdTime;
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, idempotencyId, createdTime);
|
||||
}
|
||||
};
|
||||
} // namespace
|
||||
|
||||
// This test launches a bunch of transactions with idempotency ids (so they should be idempotent automatically). Each
|
||||
// transaction sets a version stamped key, and then we check that the right number of transactions were committed.
|
||||
// If a transaction commits multiple times or doesn't commit, that probably indicates a problem with
|
||||
|
@ -32,10 +46,14 @@ struct AutomaticIdempotencyWorkload : TestWorkload {
|
|||
static constexpr auto NAME = "AutomaticIdempotencyCorrectness";
|
||||
int64_t numTransactions;
|
||||
Key keyPrefix;
|
||||
double automaticPercentage;
|
||||
|
||||
bool ok = true;
|
||||
|
||||
AutomaticIdempotencyWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
|
||||
numTransactions = getOption(options, "numTransactions"_sr, 1000);
|
||||
keyPrefix = KeyRef(getOption(options, "keyPrefix"_sr, "automaticIdempotencyKeyPrefix"_sr));
|
||||
numTransactions = getOption(options, "numTransactions"_sr, 2500);
|
||||
keyPrefix = KeyRef(getOption(options, "keyPrefix"_sr, "/autoIdempotency/"_sr));
|
||||
automaticPercentage = getOption(options, "automaticPercentage"_sr, 0.1);
|
||||
}
|
||||
|
||||
Future<Void> setup(Database const& cx) override { return Void(); }
|
||||
|
@ -52,12 +70,20 @@ struct AutomaticIdempotencyWorkload : TestWorkload {
|
|||
TraceEvent("IdempotencyIdWorkloadTransaction").detail("Id", idempotencyId);
|
||||
wait(runRYWTransaction(
|
||||
cx, [self = self, idempotencyId = idempotencyId](Reference<ReadYourWritesTransaction> tr) {
|
||||
// If we don't set AUTOMATIC_IDEMPOTENCY the idempotency id won't automatically get cleaned up, so
|
||||
// it should create work for the cleaner.
|
||||
tr->setOption(FDBTransactionOptions::IDEMPOTENCY_ID, idempotencyId);
|
||||
if (deterministicRandom()->random01() < self->automaticPercentage) {
|
||||
// We also want to exercise the automatic idempotency code path.
|
||||
tr->setOption(FDBTransactionOptions::AUTOMATIC_IDEMPOTENCY);
|
||||
}
|
||||
uint32_t index = self->keyPrefix.size();
|
||||
Value suffix = makeString(14);
|
||||
memset(mutateString(suffix), 0, 10);
|
||||
memcpy(mutateString(suffix) + 10, &index, 4);
|
||||
tr->atomicOp(self->keyPrefix.withSuffix(suffix), idempotencyId, MutationRef::SetVersionstampedKey);
|
||||
tr->atomicOp(self->keyPrefix.withSuffix(suffix),
|
||||
ObjectWriter::toValue(ValueType{ idempotencyId, int64_t(now()) }, Unversioned()),
|
||||
MutationRef::SetVersionstampedKey);
|
||||
return Future<Void>(Void());
|
||||
}));
|
||||
}
|
||||
|
@ -68,10 +94,46 @@ struct AutomaticIdempotencyWorkload : TestWorkload {
|
|||
if (clientId != 0) {
|
||||
return true;
|
||||
}
|
||||
return runRYWTransaction(cx, [this](Reference<ReadYourWritesTransaction> tr) { return _check(this, tr); });
|
||||
return testAll(this, cx);
|
||||
}
|
||||
|
||||
ACTOR static Future<bool> _check(AutomaticIdempotencyWorkload* self, Reference<ReadYourWritesTransaction> tr) {
|
||||
// Runs the verification steps in order: first log the stored idempotency ids, then run the idempotency
// check. Returns the workload's ok flag afterwards.
ACTOR static Future<bool> testAll(AutomaticIdempotencyWorkload* self, Database db) {
	wait(runRYWTransaction(
	    db, [self = self](Reference<ReadYourWritesTransaction> tr) { return logIdempotencyIds(self, tr); }));
	wait(runRYWTransaction(
	    db, [self = self](Reference<ReadYourWritesTransaction> tr) { return testIdempotency(self, tr); }));
	return self->ok;
}
|
||||
|
||||
// Reads every pair in idempotencyIdKeys and emits one trace event per idempotency id found, tagged with
// the commit version and high-order batch index decoded from the key.
ACTOR static Future<Void> logIdempotencyIds(AutomaticIdempotencyWorkload* self,
                                            Reference<ReadYourWritesTransaction> tr) {
	tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
	RangeResult rows = wait(tr->getRange(idempotencyIdKeys, CLIENT_KNOBS->TOO_MANY));
	ASSERT(!rows.more);
	for (const auto& [rowKey, rowValue] : rows) {
		Version commitVersion;
		uint8_t highOrderBatchIndex;
		decodeIdempotencyKey(rowKey, commitVersion, highOrderBatchIndex);

		// After the version header and the timestamp, the value is a sequence of
		// (1-byte length, id bytes, low-order batch index byte) records.
		BinaryReader valReader(rowValue, IncludeVersion());
		int64_t timestamp; // ignored
		valReader >> timestamp;
		while (!valReader.empty()) {
			uint8_t idLength;
			valReader >> idLength;
			StringRef idempotencyId{ reinterpret_cast<const uint8_t*>(valReader.readBytes(idLength)), idLength };
			// Consumed only to advance the reader to the next record.
			uint8_t lowOrderBatchIndex;
			valReader >> lowOrderBatchIndex;
			TraceEvent("IdempotencyIdWorkloadIdCommitted")
			    .detail("CommitVersion", commitVersion)
			    .detail("HighOrderBatchIndex", highOrderBatchIndex)
			    .detail("Id", idempotencyId);
		}
	}
	return Void();
}
|
||||
|
||||
// Check that each transaction committed exactly once.
|
||||
ACTOR static Future<Void> testIdempotency(AutomaticIdempotencyWorkload* self,
|
||||
Reference<ReadYourWritesTransaction> tr) {
|
||||
RangeResult result = wait(tr->getRange(prefixRange(self->keyPrefix), CLIENT_KNOBS->TOO_MANY));
|
||||
ASSERT(!result.more);
|
||||
std::unordered_set<Value> ids;
|
||||
|
@ -79,13 +141,27 @@ struct AutomaticIdempotencyWorkload : TestWorkload {
|
|||
for (const auto& [k, v] : result) {
|
||||
ids.emplace(v);
|
||||
}
|
||||
for (const auto& [k, rawValue] : result) {
|
||||
auto v = ObjectReader::fromStringRef<ValueType>(rawValue, Unversioned());
|
||||
BinaryReader reader(k, Unversioned());
|
||||
reader.readBytes(self->keyPrefix.size());
|
||||
Version commitVersion;
|
||||
reader >> commitVersion;
|
||||
commitVersion = bigEndian64(commitVersion);
|
||||
uint8_t highOrderBatchIndex;
|
||||
reader >> highOrderBatchIndex;
|
||||
TraceEvent("IdempotencyIdWorkloadTransactionCommitted")
|
||||
.detail("CommitVersion", commitVersion)
|
||||
.detail("HighOrderBatchIndex", highOrderBatchIndex)
|
||||
.detail("Key", k)
|
||||
.detail("Id", v.idempotencyId)
|
||||
.detail("CreatedTime", v.createdTime);
|
||||
}
|
||||
if (ids.size() != self->clientCount * self->numTransactions) {
|
||||
for (const auto& [k, v] : result) {
|
||||
TraceEvent("IdempotencyIdWorkloadTransactionCommitted").detail("Id", v);
|
||||
}
|
||||
self->ok = false;
|
||||
}
|
||||
ASSERT_EQ(ids.size(), self->clientCount * self->numTransactions);
|
||||
return true;
|
||||
return Void();
|
||||
}
|
||||
|
||||
void getMetrics(std::vector<PerfMetric>& m) override {}
|
||||
|
|
|
@ -334,7 +334,7 @@ struct PhysicalShardMoveWorkLoad : TestWorkload {
|
|||
dataMoveId,
|
||||
moveKeysLock,
|
||||
&self->cleanUpDataMoveParallelismLock,
|
||||
dataMove.range,
|
||||
dataMove.ranges.front(),
|
||||
&ddEnabledState));
|
||||
TraceEvent("TestCancelDataMoveEnd").detail("DataMove", dataMove.toString());
|
||||
}
|
||||
|
|
|
@ -48,6 +48,7 @@ ERROR( tlog_stopped, 1011, "TLog stopped" )
|
|||
ERROR( server_request_queue_full, 1012, "Server request queue is full" )
|
||||
ERROR( not_committed, 1020, "Transaction not committed due to conflict with another transaction" )
|
||||
ERROR( commit_unknown_result, 1021, "Transaction may or may not have committed" )
|
||||
ERROR( commit_unknown_result_fatal, 1022, "Idempotency id for transaction may have expired, so the commit status of the transaction cannot be determined" )
|
||||
ERROR( transaction_cancelled, 1025, "Operation aborted because the transaction was cancelled" )
|
||||
ERROR( connection_failed, 1026, "Network connection failed" )
|
||||
ERROR( coordinators_changed, 1027, "Coordination servers have changed" )
|
||||
|
|
|
@ -788,7 +788,7 @@ public:
|
|||
T const& get() const { return sav->get(); }
|
||||
T getValue() const { return get(); }
|
||||
|
||||
bool isValid() const { return sav != nullptr; }
|
||||
bool isValid() const { return sav != 0; }
|
||||
bool isReady() const { return sav->isSet(); }
|
||||
bool isError() const { return sav->isError(); }
|
||||
// returns true if get can be called on this future (counterpart of canBeSet on Promises)
|
||||
|
@ -798,12 +798,16 @@ public:
|
|||
return sav->error_state;
|
||||
}
|
||||
|
||||
Future() : sav(nullptr) {}
|
||||
Future() : sav(0) {}
|
||||
Future(const Future<T>& rhs) : sav(rhs.sav) {
|
||||
if (sav)
|
||||
sav->addFutureRef();
|
||||
// if (sav->endpoint.isValid()) std::cout << "Future copied for " << sav->endpoint.key << std::endl;
|
||||
}
|
||||
Future(Future<T>&& rhs) noexcept : sav(rhs.sav) {
|
||||
rhs.sav = 0;
|
||||
// if (sav->endpoint.isValid()) std::cout << "Future moved for " << sav->endpoint.key << std::endl;
|
||||
}
|
||||
Future(Future<T>&& rhs) noexcept : sav(rhs.sav) { rhs.sav = nullptr; }
|
||||
Future(const T& presentValue) : sav(new SAV<T>(1, 0)) { sav->send(presentValue); }
|
||||
Future(T&& presentValue) : sav(new SAV<T>(1, 0)) { sav->send(std::move(presentValue)); }
|
||||
Future(Never) : sav(new SAV<T>(1, 0)) { sav->send(Never()); }
|
||||
|
@ -815,6 +819,7 @@ public:
|
|||
#endif
|
||||
|
||||
~Future() {
|
||||
// if (sav && sav->endpoint.isValid()) std::cout << "Future destroyed for " << sav->endpoint.key << std::endl;
|
||||
if (sav)
|
||||
sav->delFutureRef();
|
||||
}
|
||||
|
@ -830,7 +835,7 @@ public:
|
|||
if (sav)
|
||||
sav->delFutureRef();
|
||||
sav = rhs.sav;
|
||||
rhs.sav = nullptr;
|
||||
rhs.sav = 0;
|
||||
}
|
||||
}
|
||||
bool operator==(const Future& rhs) { return rhs.sav == sav; }
|
||||
|
@ -843,23 +848,25 @@ public:
|
|||
|
||||
void addCallbackAndClear(Callback<T>* cb) {
|
||||
sav->addCallbackAndDelFutureRef(cb);
|
||||
sav = nullptr;
|
||||
sav = 0;
|
||||
}
|
||||
|
||||
void addYieldedCallbackAndClear(Callback<T>* cb) {
|
||||
sav->addYieldedCallbackAndDelFutureRef(cb);
|
||||
sav = nullptr;
|
||||
sav = 0;
|
||||
}
|
||||
|
||||
void addCallbackChainAndClear(Callback<T>* cb) {
|
||||
sav->addCallbackChainAndDelFutureRef(cb);
|
||||
sav = nullptr;
|
||||
sav = 0;
|
||||
}
|
||||
|
||||
int getFutureReferenceCount() const { return sav->getFutureReferenceCount(); }
|
||||
int getPromiseReferenceCount() const { return sav->getPromiseReferenceCount(); }
|
||||
|
||||
explicit Future(SAV<T>* sav) : sav(sav) {}
|
||||
explicit Future(SAV<T>* sav) : sav(sav) {
|
||||
// if (sav->endpoint.isValid()) std::cout << "Future created for " << sav->endpoint.key << std::endl;
|
||||
}
|
||||
|
||||
private:
|
||||
SAV<T>* sav;
|
||||
|
|
|
@ -6,5 +6,5 @@ startDelay = 0
|
|||
|
||||
[[test.workload]]
|
||||
testName = 'UnitTests'
|
||||
maxTestCases = 0
|
||||
maxTestCases = 1
|
||||
testsMatching = '/'
|
||||
|
|
Loading…
Reference in New Issue