Merge branch 'main' into anoyes/idempotency-status-json

This commit is contained in:
Andrew Noyes 2022-12-14 17:15:22 -08:00
commit fd98e6f474
15 changed files with 401 additions and 31 deletions

View File

@ -224,7 +224,11 @@ if(NOT WIN32)
# Make sure that fdb_c.h is compatible with c90
add_executable(fdb_c90_test test/fdb_c90_test.c)
set_property(TARGET fdb_c90_test PROPERTY C_STANDARD 90)
target_compile_options(fdb_c90_test PRIVATE -Wall -Wextra -Wpedantic -Werror)
if (CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
target_compile_options(fdb_c90_test PRIVATE -Wall -Wextra -Wpedantic -Wno-gnu-line-marker -Werror)
else ()
target_compile_options(fdb_c90_test PRIVATE -Wall -Wextra -Wpedantic -Werror)
endif ()
target_link_libraries(fdb_c90_test PRIVATE fdb_c)
endif()

View File

@ -541,9 +541,11 @@ void initHelp() {
"Fetch the current read version",
"Displays the current read version of the database or currently running transaction.");
helpMap["quota"] = CommandHelp("quota",
"quota [get <tag> [reserved_throughput|total_throughput] | set <tag> "
"[reserved_throughput|total_throughput] <value> | clear <tag>]",
"Get, modify, or clear the throughput quota for the specified tag.");
"quota [get <tag> [reserved_throughput|total_throughput|storage] | "
"set <tag> [reserved_throughput|total_throughput|storage] <value> | "
"clear <tag>]",
"Get, modify, or clear the reserved/total throughput quota (in bytes/s) or "
"storage quota (in bytes) for the specified tag.");
helpMap["reset"] =
CommandHelp("reset",
"reset the current transaction",

View File

@ -21,6 +21,7 @@
#include "fdbclient/IdempotencyId.actor.h"
#include "fdbclient/ReadYourWrites.h"
#include "fdbclient/SystemData.h"
#include "flow/BooleanParam.h"
#include "flow/UnitTest.h"
#include "flow/actorcompiler.h" // this has to be the last include
@ -204,14 +205,17 @@ void decodeIdempotencyKey(KeyRef key, Version& commitVersion, uint8_t& highOrder
reader >> highOrderBatchIndex;
}
// Find the youngest or oldest idempotency id key in `range` (depending on `reverse`)
FDB_BOOLEAN_PARAM(Oldest);
// Find the youngest or oldest idempotency id key in `range` (depending on `oldest`)
// Write the timestamp to `*time` and the version to `*version` when non-null.
ACTOR static Future<Optional<Key>> getBoundary(Reference<ReadYourWritesTransaction> tr,
KeyRange range,
Reverse reverse,
Oldest oldest,
Version* version,
int64_t* time) {
RangeResult result = wait(tr->getRange(range, /*limit*/ 1, Snapshot::False, reverse));
RangeResult result =
wait(tr->getRange(range, /*limit*/ 1, Snapshot::False, oldest ? Reverse::False : Reverse::True));
if (!result.size()) {
return Optional<Key>();
}
@ -263,4 +267,106 @@ ACTOR Future<JsonBuilderObject> getIdmpKeyStatus(Database db) {
wait(tr->onError(e));
}
}
}
}
ACTOR Future<Void> cleanIdempotencyIds(Database db, double minAgeSeconds) {
state int64_t idmpKeySize;
state int64_t candidateDeleteSize;
state KeyRange finalRange;
state Reference<ReadYourWritesTransaction> tr;
// Only assigned to once
state Key oldestKey;
state Version oldestVersion;
state int64_t oldestTime;
// Assigned to multiple times looking for a suitable range
state Version candidateDeleteVersion;
state int64_t candidateDeleteTime;
state KeyRange candidateRangeToClean;
tr = makeReference<ReadYourWritesTransaction>(db);
loop {
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
// Check if any keys are older than minAgeSeconds
Optional<Key> oldestKey_ =
wait(getBoundary(tr, idempotencyIdKeys, Oldest::True, &oldestVersion, &oldestTime));
if (!oldestKey_.present()) {
break;
}
oldestKey = oldestKey_.get();
if (int64_t(now()) - oldestTime < minAgeSeconds) {
break;
}
// Only used for a trace event
wait(store(idmpKeySize, tr->getEstimatedRangeSizeBytes(idempotencyIdKeys)));
// Get the version of the most recent idempotency ID
wait(success(
getBoundary(tr, idempotencyIdKeys, Oldest::False, &candidateDeleteVersion, &candidateDeleteTime)));
// Keep dividing the candidate range until clearing it would not delete something younger than
// minAgeSeconds
loop {
candidateRangeToClean =
KeyRangeRef(oldestKey,
BinaryWriter::toValue(bigEndian64(candidateDeleteVersion + 1), Unversioned())
.withPrefix(idempotencyIdKeys.begin));
// We know that we're okay deleting oldestVersion at this point. Go ahead and do that.
if (oldestVersion == candidateDeleteVersion) {
break;
}
// Find the youngest key in candidate range
wait(success(getBoundary(
tr, candidateRangeToClean, Oldest::False, &candidateDeleteVersion, &candidateDeleteTime)));
// Update the range so that it ends at an idempotency id key. Since we're binary searching, the
// candidate range was probably too large before.
candidateRangeToClean =
KeyRangeRef(oldestKey,
BinaryWriter::toValue(bigEndian64(candidateDeleteVersion + 1), Unversioned())
.withPrefix(idempotencyIdKeys.begin));
wait(store(candidateDeleteSize, tr->getEstimatedRangeSizeBytes(candidateRangeToClean)));
int64_t youngestAge = int64_t(now()) - candidateDeleteTime;
TraceEvent("IdempotencyIdsCleanerCandidateDelete")
.detail("Range", candidateRangeToClean.toString())
.detail("IdmpKeySizeEstimate", idmpKeySize)
.detail("YoungestIdAge", youngestAge)
.detail("MinAgeSeconds", minAgeSeconds)
.detail("ClearRangeSizeEstimate", candidateDeleteSize);
if (youngestAge > minAgeSeconds) {
break;
}
candidateDeleteVersion = (oldestVersion + candidateDeleteVersion) / 2;
}
finalRange = KeyRangeRef(idempotencyIdKeys.begin, candidateRangeToClean.end);
if (!finalRange.empty()) {
tr->addReadConflictRange(finalRange);
tr->clear(finalRange);
tr->set(idempotencyIdsExpiredVersion,
ObjectWriter::toValue(IdempotencyIdsExpiredVersion{ candidateDeleteVersion }, Unversioned()));
TraceEvent("IdempotencyIdsCleanerAttempt")
.detail("Range", finalRange.toString())
.detail("IdmpKeySizeEstimate", idmpKeySize)
.detail("ClearRangeSizeEstimate", candidateDeleteSize)
.detail("ExpiredVersion", candidateDeleteVersion)
.detail("ExpiredVersionAgeEstimate", static_cast<int64_t>(now()) - candidateDeleteTime);
wait(tr->commit());
}
break;
} catch (Error& e) {
TraceEvent("IdempotencyIdsCleanerError").error(e);
wait(tr->onError(e));
}
}
return Void();
}

View File

@ -1062,6 +1062,10 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
// Drop in-memory state associated with an idempotency id after this many seconds. Once dropped, this id cannot be
// expired proactively, but will eventually get cleaned up by the idempotency id cleaner.
init( IDEMPOTENCY_ID_IN_MEMORY_LIFETIME, 10);
// Attempt to clean old idempotency ids automatically this often
init( IDEMPOTENCY_IDS_CLEANER_POLLING_INTERVAL, 10);
// Don't clean idempotency ids younger than this
init( IDEMPOTENCY_IDS_MIN_AGE_SECONDS, 3600 * 24 * 7);
// clang-format on

View File

@ -44,6 +44,8 @@ struct CommitResult {
// The type of the value stored at the key |idempotencyIdsExpiredVersion|
struct IdempotencyIdsExpiredVersion {
static constexpr auto file_identifier = 3746945;
// Any version at or below expired might have had its idempotency id expired. Any version greater than `expired`
// definitely has not had it's idempotency id expired.
Version expired = 0;
int64_t expiredTime = 0;
@ -187,7 +189,15 @@ KeyRangeRef makeIdempotencySingleKeyRange(Arena& arena, Version version, uint8_t
void decodeIdempotencyKey(KeyRef key, Version& commitVersion, uint8_t& highOrderBatchIndex);
ACTOR Future<JsonBuilderObject> getIdmpKeyStatus(Database db);
// Delete zero or more idempotency ids older than minAgeSeconds
//
// Normally idempotency ids are deleted as part of the normal commit process, so this only needs to clean ids that
// leaked during a failure scenario. The rate of leaked idempotency ids should be low. The rate is zero during normal
// operation, and proportional to the number of in-flight transactions during a failure scenario.
ACTOR Future<Void> cleanIdempotencyIds(Database db, double minAgeSeconds);
#include "flow/unactorcompiler.h"
#endif

View File

@ -1018,6 +1018,8 @@ public:
// Idempotency ids
double IDEMPOTENCY_ID_IN_MEMORY_LIFETIME;
double IDEMPOTENCY_IDS_CLEANER_POLLING_INTERVAL;
double IDEMPOTENCY_IDS_MIN_AGE_SECONDS;
ServerKnobs(Randomize, ClientKnobs*, IsSimulated);
void initialize(Randomize, ClientKnobs*, IsSimulated);

View File

@ -259,16 +259,23 @@ void LatencySample::logSample() {
std::string port_str = std::to_string(addr.port);
switch (model) {
case MetricsDataModel::OTLP: {
if (metrics->histMap.find(IMetric::id) != metrics->histMap.end()) {
metrics->histMap[IMetric::id].points.emplace_back(
sketch.getErrorGuarantee(), sketch.getSamples(), sketch.min(), sketch.max(), sketch.getSum());
} else {
metrics->histMap[IMetric::id] = OTEL::OTELHistogram(
name, sketch.getErrorGuarantee(), sketch.getSamples(), sketch.min(), sketch.max(), sketch.getSum());
// We only want to emit the entire DDSketch if the knob is set
if (FLOW_KNOBS->METRICS_EMIT_DDSKETCH) {
if (metrics->histMap.find(IMetric::id) != metrics->histMap.end()) {
metrics->histMap[IMetric::id].points.emplace_back(
sketch.getErrorGuarantee(), sketch.getSamples(), sketch.min(), sketch.max(), sketch.getSum());
} else {
metrics->histMap[IMetric::id] = OTEL::OTELHistogram(name,
sketch.getErrorGuarantee(),
sketch.getSamples(),
sketch.min(),
sketch.max(),
sketch.getSum());
}
metrics->histMap[IMetric::id].points.back().addAttribute("ip", ip_str);
metrics->histMap[IMetric::id].points.back().addAttribute("port", port_str);
metrics->histMap[IMetric::id].points.back().startTime = sampleEmit;
}
metrics->histMap[IMetric::id].points.back().addAttribute("ip", ip_str);
metrics->histMap[IMetric::id].points.back().addAttribute("port", port_str);
metrics->histMap[IMetric::id].points.back().startTime = sampleEmit;
createOtelGauge(p50id, name + "p50", p50);
createOtelGauge(p90id, name + "p90", p90);
createOtelGauge(p95id, name + "p95", p95);

View File

@ -2940,10 +2940,19 @@ ACTOR Future<Void> commitProxyServerCore(CommitProxyInterface proxy,
addActor.send(rejoinServer(proxy, &commitData));
addActor.send(ddMetricsRequestServer(proxy, db));
addActor.send(reportTxnTagCommitCost(proxy.id(), db, &commitData.ssTrTagCommitCost));
addActor.send(idempotencyIdsExpireServer(openDBOnServer(db),
proxy.expireIdempotencyId,
commitData.expectedIdempotencyIdCountForKey,
&commitData.idempotencyClears));
auto openDb = openDBOnServer(db);
if (firstProxy) {
addActor.send(recurringAsync(
[openDb = openDb]() { return cleanIdempotencyIds(openDb, SERVER_KNOBS->IDEMPOTENCY_IDS_MIN_AGE_SECONDS); },
SERVER_KNOBS->IDEMPOTENCY_IDS_CLEANER_POLLING_INTERVAL,
true,
SERVER_KNOBS->IDEMPOTENCY_IDS_CLEANER_POLLING_INTERVAL));
}
addActor.send(idempotencyIdsExpireServer(
openDb, proxy.expireIdempotencyId, commitData.expectedIdempotencyIdCountForKey, &commitData.idempotencyClears));
if (SERVER_KNOBS->STORAGE_QUOTA_ENABLED) {
addActor.send(monitorTenantsOverStorageQuota(proxy.id(), db, &commitData));
}

View File

@ -2751,10 +2751,35 @@ static std::deque<Standalone<MutationsAndVersionRef>>::const_iterator searchChan
}
}
// The normal read case for a change feed stream query is that it will first read the disk portion, which is at a lower
// version than the memory portion, and then will effectively switch to reading only the memory portion. The complexity
// lies in the fact that the feed does not know the switchover point ahead of time before reading from disk, and the
// switchover point is constantly changing as the SS persists the in-memory data to disk. As a result, the
// implementation first reads from memory, then reads from disk if necessary, then merges the result and potentially
// discards the in-memory read data if the disk data is large and behind the in-memory data. The goal of
// FeedDiskReadState is that we want to skip doing the full memory read if we still have a lot of disk reads to catch up
// on. In the DISK_CATCHUP phase, the feed query will read only the first row from memory, to
// determine if it's hit the switchover point, instead of reading (potentially) both in the normal phase. We also want
// to default to the normal behavior at the start in case there is not a lot of disk data. This guarantees that if we
// somehow incorrectly went into DISK_CATCHUP when there wasn't much more data on disk, we only have one cycle of
// getChangeFeedMutations in the incorrect mode that returns a smaller result before switching to NORMAL mode.
//
// Put another way, the state transitions are:
//
// STARTING ->
// DISK_CATCHUP (if after the first read, there is more disk data to read before the first memory data)
// NORMAL (otherwise)
// DISK_CATCHUP ->
// still DISK_CATCHUP (if there is still more disk data to read before the first memory data)
// NORMAL (otherwise)
// NORMAL -> NORMAL (always)
enum FeedDiskReadState { STARTING, NORMAL, DISK_CATCHUP };
ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(StorageServer* data,
ChangeFeedStreamRequest req,
bool inverted,
bool atLatest) {
bool atLatest,
FeedDiskReadState* feedDiskReadState) {
state ChangeFeedStreamReply reply;
state ChangeFeedStreamReply memoryReply;
state int remainingLimitBytes = CLIENT_KNOBS->REPLY_BYTE_LIMIT;
@ -2823,9 +2848,15 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
if (req.end > emptyVersion + 1) {
auto it = searchChangeFeedStart(feedInfo->mutations, req.begin, atLatest);
while (it != feedInfo->mutations.end()) {
// If DISK_CATCHUP, only read 1 mutation from the memory queue
if (it->version >= req.end || it->version > dequeVersion || remainingLimitBytes <= 0) {
break;
}
if ((*feedDiskReadState) == FeedDiskReadState::DISK_CATCHUP && !memoryReply.mutations.empty()) {
// so we don't add an empty mutation at the end
remainingLimitBytes = -1;
break;
}
MutationsAndVersionRef m = *it;
if (doFilterMutations) {
m = filterMutations(memoryReply.arena, *it, req.range, inverted);
@ -2980,6 +3011,28 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
lastVersion = version;
lastKnownCommitted = knownCommittedVersion;
}
if ((*feedDiskReadState) == FeedDiskReadState::STARTING ||
(*feedDiskReadState) == FeedDiskReadState::DISK_CATCHUP) {
if (!memoryReply.mutations.empty() && !reply.mutations.empty() &&
reply.mutations.back().version < memoryReply.mutations.front().version && remainingDurableBytes <= 0) {
// if we read a full batch from disk and the entire disk read was still less than the first memory
// mutation, switch to disk_catchup mode
*feedDiskReadState = FeedDiskReadState::DISK_CATCHUP;
CODE_PROBE(true, "Feed switching to disk_catchup mode");
} else {
// for testing
if ((*feedDiskReadState) == FeedDiskReadState::STARTING && BUGGIFY_WITH_PROB(0.001)) {
*feedDiskReadState = FeedDiskReadState::DISK_CATCHUP;
CODE_PROBE(true, "Feed forcing disk_catchup mode");
} else {
// else switch to normal mode
CODE_PROBE(true, "Feed switching to normal mode");
*feedDiskReadState = FeedDiskReadState::NORMAL;
}
}
}
if (remainingDurableBytes > 0) {
reply.arena.dependsOn(memoryReply.arena);
auto it = memoryReply.mutations.begin();
@ -3001,6 +3054,7 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
}
} else {
reply = memoryReply;
*feedDiskReadState = FeedDiskReadState::NORMAL;
}
bool gotAll = remainingLimitBytes > 0 && remainingDurableBytes > 0 && data->version.get() == startVersion;
@ -3159,6 +3213,7 @@ ACTOR Future<Void> changeFeedStreamQ(StorageServer* data, ChangeFeedStreamReques
state Span span("SS:getChangeFeedStream"_loc, req.spanContext);
state bool atLatest = false;
state bool removeUID = false;
state FeedDiskReadState feedDiskReadState = STARTING;
state Optional<Version> blockedVersion;
try {
@ -3244,7 +3299,7 @@ ACTOR Future<Void> changeFeedStreamQ(StorageServer* data, ChangeFeedStreamReques
// keep this as not state variable so it is freed after sending to reduce memory
Future<std::pair<ChangeFeedStreamReply, bool>> feedReplyFuture =
getChangeFeedMutations(data, req, false, atLatest);
getChangeFeedMutations(data, req, false, atLatest, &feedDiskReadState);
if (atLatest && !removeUID && !feedReplyFuture.isReady()) {
data->changeFeedClientVersions[req.reply.getEndpoint().getPrimaryAddress()][req.id] =
blockedVersion.present() ? blockedVersion.get() : data->prevVersion;

View File

@ -46,14 +46,19 @@ struct AutomaticIdempotencyWorkload : TestWorkload {
static constexpr auto NAME = "AutomaticIdempotencyCorrectness";
int64_t numTransactions;
Key keyPrefix;
int64_t minMinAgeSeconds;
double automaticPercentage;
constexpr static double slop = 2.0;
double pollingInterval;
bool ok = true;
AutomaticIdempotencyWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
numTransactions = getOption(options, "numTransactions"_sr, 2500);
keyPrefix = KeyRef(getOption(options, "keyPrefix"_sr, "/autoIdempotency/"_sr));
minMinAgeSeconds = getOption(options, "minMinAgeSeconds"_sr, 15);
automaticPercentage = getOption(options, "automaticPercentage"_sr, 0.1);
pollingInterval = getOption(options, "pollingInterval"_sr, 5.0);
}
Future<Void> setup(Database const& cx) override { return Void(); }
@ -101,6 +106,7 @@ struct AutomaticIdempotencyWorkload : TestWorkload {
wait(runRYWTransaction(db,
[=](Reference<ReadYourWritesTransaction> tr) { return logIdempotencyIds(self, tr); }));
wait(runRYWTransaction(db, [=](Reference<ReadYourWritesTransaction> tr) { return testIdempotency(self, tr); }));
wait(testCleaner(self, db));
return self->ok;
}
@ -164,6 +170,167 @@ struct AutomaticIdempotencyWorkload : TestWorkload {
return Void();
}
ACTOR static Future<int64_t> getOldestCreatedTime(AutomaticIdempotencyWorkload* self, Database db) {
state ReadYourWritesTransaction tr(db);
state RangeResult result;
state Key key;
state Version commitVersion;
loop {
try {
tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
wait(store(result, tr.getRange(idempotencyIdKeys, /*limit*/ 1)));
if (result.empty()) {
TraceEvent("AutomaticIdempotencyNoIdsLeft").log();
return -1;
}
for (const auto& [k, v] : result) {
uint8_t highOrderBatchIndex;
decodeIdempotencyKey(k, commitVersion, highOrderBatchIndex);
// Decode the first idempotency id in the value
BinaryReader valReader(v.begin(), v.size(), IncludeVersion());
int64_t timeStamp; // ignored
valReader >> timeStamp;
uint8_t length;
valReader >> length;
StringRef id{ reinterpret_cast<const uint8_t*>(valReader.readBytes(length)), length };
uint8_t lowOrderBatchIndex;
valReader >> lowOrderBatchIndex;
// Recover the key written in the transaction associated with this idempotency id
BinaryWriter keyWriter(Unversioned());
keyWriter.serializeBytes(self->keyPrefix);
keyWriter.serializeBinaryItem(bigEndian64(commitVersion));
keyWriter.serializeBinaryItem(highOrderBatchIndex);
keyWriter.serializeBinaryItem(lowOrderBatchIndex);
key = keyWriter.toValue();
// We need to use a different transaction because we set READ_SYSTEM_KEYS on this one, and we might
// be using a tenant.
Optional<Value> entry = wait(runRYWTransaction(
db, [key = key](Reference<ReadYourWritesTransaction> tr) { return tr->get(key); }));
if (!entry.present()) {
TraceEvent(SevError, "AutomaticIdempotencyKeyMissing")
.detail("Key", key)
.detail("CommitVersion", commitVersion)
.detail("ReadVersion", tr.getReadVersion().get());
}
ASSERT(entry.present());
auto e = ObjectReader::fromStringRef<ValueType>(entry.get(), Unversioned());
return e.createdTime;
}
ASSERT(false);
} catch (Error& e) {
wait(tr.onError(e));
}
}
}
ACTOR static Future<bool> testCleanerOneIteration(AutomaticIdempotencyWorkload* self,
Database db,
ActorCollection* actors,
int64_t minAgeSeconds,
const std::vector<int64_t>* createdTimes) {
state Future<Void> cleaner = recurringAsync(
[db = db, minAgeSeconds = minAgeSeconds]() { return cleanIdempotencyIds(db, minAgeSeconds); },
self->pollingInterval,
true,
self->pollingInterval);
state int64_t oldestCreatedTime;
state int64_t successes = 0;
actors->add(cleaner);
loop {
// Oldest created time of a transaction from the workload which still has an idempotency id
wait(store(oldestCreatedTime, getOldestCreatedTime(self, db)));
if (oldestCreatedTime == -1) {
return true; // Test can't make meaningful progress anymore
}
// oldestCreatedTime could seem too high if there's a large gap in the age
// of entries, so account for this by making oldestCreatedTime one more than
// the youngest entry that actually got deleted.
auto iter = std::lower_bound(createdTimes->begin(), createdTimes->end(), oldestCreatedTime);
if (iter != createdTimes->begin()) {
--iter;
oldestCreatedTime = *iter + 1;
}
auto maxActualAge = int64_t(now()) - oldestCreatedTime;
if (maxActualAge > minAgeSeconds * self->slop) {
CODE_PROBE(true, "Idempotency cleaner more to clean");
TraceEvent("AutomaticIdempotencyCleanerMoreToClean")
.detail("MaxActualAge", maxActualAge)
.detail("MinAgePolicy", minAgeSeconds);
successes = 0;
// Cleaning should happen eventually
} else if (maxActualAge < minAgeSeconds / self->slop) {
TraceEvent(SevError, "AutomaticIdempotencyCleanedTooMuch")
.detail("MaxActualAge", maxActualAge)
.detail("MinAgePolicy", minAgeSeconds);
self->ok = false;
ASSERT(false);
} else {
++successes;
TraceEvent("AutomaticIdempotencyCleanerSuccess")
.detail("MaxActualAge", maxActualAge)
.detail("MinAgePolicy", minAgeSeconds)
.detail("Successes", successes);
if (successes >= 10) {
break;
}
}
wait(delay(self->pollingInterval));
}
cleaner.cancel();
return false;
}
ACTOR static Future<std::vector<int64_t>> getCreatedTimes(AutomaticIdempotencyWorkload* self,
Reference<ReadYourWritesTransaction> tr) {
RangeResult result = wait(tr->getRange(prefixRange(self->keyPrefix), CLIENT_KNOBS->TOO_MANY));
ASSERT(!result.more);
std::vector<int64_t> createdTimes;
for (const auto& [k, v] : result) {
auto e = ObjectReader::fromStringRef<ValueType>(v, Unversioned());
createdTimes.emplace_back(e.createdTime);
}
std::sort(createdTimes.begin(), createdTimes.end());
return createdTimes;
}
// Check that min age is respected. Also test that we can tolerate concurrent cleaners.
ACTOR static Future<Void> testCleaner(AutomaticIdempotencyWorkload* self, Database db) {
state ActorCollection actors;
state int64_t minAgeSeconds;
state std::vector<int64_t> createdTimes;
// Initialize minAgeSeconds to match the current status
wait(store(minAgeSeconds, fmap([](int64_t t) { return int64_t(now()) - t; }, getOldestCreatedTime(self, db))) &&
store(createdTimes, runRYWTransaction(db, [self = self](Reference<ReadYourWritesTransaction> tr) {
return getCreatedTimes(self, tr);
})));
// Slowly and somewhat randomly allow the cleaner to do more cleaning. Observe that it cleans some, but not too
// much.
loop {
minAgeSeconds *= 1 / (self->slop * 2);
if (minAgeSeconds < self->minMinAgeSeconds) {
break;
}
choose {
when(bool done = wait(testCleanerOneIteration(self, db, &actors, minAgeSeconds, &createdTimes))) {
if (done) {
break;
}
}
when(wait(actors.getResult())) {
ASSERT(false);
}
}
}
return Void();
}
void getMetrics(std::vector<PerfMetric>& m) override {}
};

View File

@ -95,6 +95,7 @@ void FlowKnobs::initialize(Randomize randomize, IsSimulated isSimulated) {
init( STATSD_UDP_EMISSION_PORT, 8125 );
init( OTEL_UDP_EMISSION_ADDR, "127.0.0.1");
init( OTEL_UDP_EMISSION_PORT, 8903 );
init( METRICS_EMIT_DDSKETCH, false ); // Determines if DDSketch buckets will get emitted
//connectionMonitor
init( CONNECTION_MONITOR_LOOP_TIME, isSimulated ? 0.75 : 1.0 ); if( randomize && BUGGIFY ) CONNECTION_MONITOR_LOOP_TIME = 6.0;

View File

@ -222,7 +222,7 @@ LoadedTLSConfig TLSConfig::loadSync() const {
try {
loaded.tlsCertBytes = readFileBytes(certPath, FLOW_KNOBS->CERT_FILE_MAX_SIZE);
} catch (Error& e) {
fprintf(stderr, "Error reading TLS Certificate [%s]: %s\n", certPath.c_str(), e.what());
fprintf(stderr, "Warning: Error reading TLS Certificate [%s]: %s\n", certPath.c_str(), e.what());
throw;
}
} else {
@ -234,7 +234,7 @@ LoadedTLSConfig TLSConfig::loadSync() const {
try {
loaded.tlsKeyBytes = readFileBytes(keyPath, FLOW_KNOBS->CERT_FILE_MAX_SIZE);
} catch (Error& e) {
fprintf(stderr, "Error reading TLS Key [%s]: %s\n", keyPath.c_str(), e.what());
fprintf(stderr, "Warning: Error reading TLS Key [%s]: %s\n", keyPath.c_str(), e.what());
throw;
}
} else {
@ -246,7 +246,7 @@ LoadedTLSConfig TLSConfig::loadSync() const {
try {
loaded.tlsCABytes = readFileBytes(CAPath, FLOW_KNOBS->CERT_FILE_MAX_SIZE);
} catch (Error& e) {
fprintf(stderr, "Error reading TLS CA [%s]: %s\n", CAPath.c_str(), e.what());
fprintf(stderr, "Warning: Error reading TLS CA [%s]: %s\n", CAPath.c_str(), e.what());
throw;
}
} else {
@ -315,13 +315,13 @@ ACTOR Future<LoadedTLSConfig> TLSConfig::loadAsync(const TLSConfig* self) {
wait(waitForAll(reads));
} catch (Error& e) {
if (certIdx != -1 && reads[certIdx].isError()) {
fprintf(stderr, "Failure reading TLS Certificate [%s]: %s\n", certPath.c_str(), e.what());
fprintf(stderr, "Warning: Error reading TLS Certificate [%s]: %s\n", certPath.c_str(), e.what());
} else if (keyIdx != -1 && reads[keyIdx].isError()) {
fprintf(stderr, "Failure reading TLS Key [%s]: %s\n", keyPath.c_str(), e.what());
fprintf(stderr, "Warning: Error reading TLS Key [%s]: %s\n", keyPath.c_str(), e.what());
} else if (caIdx != -1 && reads[caIdx].isError()) {
fprintf(stderr, "Failure reading TLS Key [%s]: %s\n", CAPath.c_str(), e.what());
fprintf(stderr, "Warning: Error reading TLS Key [%s]: %s\n", CAPath.c_str(), e.what());
} else {
fprintf(stderr, "Failure reading TLS needed file: %s\n", e.what());
fprintf(stderr, "Warning: Error reading TLS needed file: %s\n", e.what());
}
throw;

View File

@ -152,6 +152,7 @@ public:
std::string OTEL_UDP_EMISSION_ADDR;
int STATSD_UDP_EMISSION_PORT;
int OTEL_UDP_EMISSION_PORT;
bool METRICS_EMIT_DDSKETCH;
// run loop profiling
double RUN_LOOP_PROFILING_INTERVAL;

View File

@ -66,7 +66,7 @@ enum DataPointFlags { FLAG_NONE = 0, FLAG_NO_RECORDED_VALUE };
class NumberDataPoint {
public:
double startTime; // 9 bytes in msgpack
double startTime = -1; // 9 bytes in msgpack
double recordTime; // 9 bytes in msgpack
std::vector<Attribute> attributes; // Variable size: assume to be 23 bytes
std::variant<int64_t, double> val; // 9 bytes in msgpack

View File

@ -3,6 +3,8 @@ testTitle = 'AutomaticIdempotency'
[[test.workload]]
testName = 'AutomaticIdempotencyCorrectness'
minMinAgeSeconds = 15
pollingInterval = 5.0
[[test.workload]]
testName='Attrition'