Merge branch 'main' of https://github.com/apple/foundationdb into feature/main/wiggleDelay

This commit is contained in:
Xiaoxi Wang 2022-12-11 14:27:22 -08:00
commit c12de23824
126 changed files with 1829 additions and 531 deletions

View File

@ -169,7 +169,6 @@ typedef struct mappedkeyvalue {
* take the shortcut. */
FDBGetRangeReqAndResult getRange;
unsigned char buffer[32];
fdb_bool_t boundaryAndExist;
} FDBMappedKeyValue;
#pragma pack(push, 4)

View File

@ -89,9 +89,9 @@ public:
};
class alignas(64) ThreadStatistics {
uint64_t conflicts;
uint64_t total_errors;
uint64_t total_timeouts;
uint64_t conflicts{ 0 };
uint64_t total_errors{ 0 };
uint64_t total_timeouts{ 0 };
std::array<uint64_t, MAX_OP> ops;
std::array<uint64_t, MAX_OP> errors;
std::array<uint64_t, MAX_OP> timeouts;
@ -101,7 +101,11 @@ class alignas(64) ThreadStatistics {
public:
ThreadStatistics() noexcept {
memset(this, 0, sizeof(ThreadStatistics));
std::fill(ops.begin(), ops.end(), 0);
std::fill(errors.begin(), errors.end(), 0);
std::fill(timeouts.begin(), timeouts.end(), 0);
std::fill(latency_samples.begin(), latency_samples.end(), 0);
std::fill(latency_us_total.begin(), latency_us_total.end(), 0);
sketches.resize(MAX_OP);
}

View File

@ -182,17 +182,14 @@ struct GetMappedRangeResult {
const std::string& value,
const std::string& begin,
const std::string& end,
const std::vector<std::pair<std::string, std::string>>& range_results,
fdb_bool_t boundaryAndExist)
: key(key), value(value), begin(begin), end(end), range_results(range_results),
boundaryAndExist(boundaryAndExist) {}
const std::vector<std::pair<std::string, std::string>>& range_results)
: key(key), value(value), begin(begin), end(end), range_results(range_results) {}
std::string key;
std::string value;
std::string begin;
std::string end;
std::vector<std::pair<std::string, std::string>> range_results;
fdb_bool_t boundaryAndExist;
};
std::vector<MappedKV> mkvs;
// True if values remain in the key range requested.
@ -317,7 +314,6 @@ GetMappedRangeResult get_mapped_range(fdb::Transaction& tr,
auto value = extractString(mkv.value);
auto begin = extractString(mkv.getRange.begin.key);
auto end = extractString(mkv.getRange.end.key);
bool boundaryAndExist = mkv.boundaryAndExist;
// std::cout << "key:" << key << " value:" << value << " begin:" << begin << " end:" << end << std::endl;
std::vector<std::pair<std::string, std::string>> range_results;
@ -328,7 +324,7 @@ GetMappedRangeResult get_mapped_range(fdb::Transaction& tr,
range_results.emplace_back(k, v);
// std::cout << "[" << i << "]" << k << " -> " << v << std::endl;
}
result.mkvs.emplace_back(key, value, begin, end, range_results, boundaryAndExist);
result.mkvs.emplace_back(key, value, begin, end, range_results);
}
return result;
}
@ -1096,9 +1092,7 @@ TEST_CASE("fdb_transaction_get_mapped_range") {
CHECK(!result.more);
int id = beginId;
bool boundary;
for (int i = 0; i < expectSize; i++, id++) {
boundary = i == 0 || i == expectSize - 1;
const auto& mkv = result.mkvs[i];
if (matchIndex == MATCH_INDEX_ALL || i == 0 || i == expectSize - 1) {
CHECK(indexEntryKey(id).compare(mkv.key) == 0);
@ -1109,8 +1103,6 @@ TEST_CASE("fdb_transaction_get_mapped_range") {
} else {
CHECK(EMPTY.compare(mkv.key) == 0);
}
bool empty = mkv.range_results.empty();
CHECK(mkv.boundaryAndExist == (boundary && !empty));
CHECK(EMPTY.compare(mkv.value) == 0);
CHECK(mkv.range_results.size() == SPLIT_SIZE);
for (int split = 0; split < SPLIT_SIZE; split++) {
@ -1154,9 +1146,7 @@ TEST_CASE("fdb_transaction_get_mapped_range_missing_all_secondary") {
CHECK(!result.more);
int id = beginId;
bool boundary;
for (int i = 0; i < expectSize; i++, id++) {
boundary = i == 0 || i == expectSize - 1;
const auto& mkv = result.mkvs[i];
if (matchIndex == MATCH_INDEX_ALL || i == 0 || i == expectSize - 1) {
CHECK(indexEntryKey(id).compare(mkv.key) == 0);
@ -1167,8 +1157,6 @@ TEST_CASE("fdb_transaction_get_mapped_range_missing_all_secondary") {
} else {
CHECK(EMPTY.compare(mkv.key) == 0);
}
bool empty = mkv.range_results.empty();
CHECK(mkv.boundaryAndExist == (boundary && !empty));
CHECK(EMPTY.compare(mkv.value) == 0);
}
break;

View File

@ -612,14 +612,14 @@ JNIEXPORT jobject JNICALL Java_com_apple_foundationdb_FutureMappedResults_Future
FDBMappedKeyValue kvm = kvms[i];
int kvm_count = kvm.getRange.m_size;
// now it has 5 field, key, value, getRange.begin, getRange.end, boundaryAndExist
// now it has 4 field, key, value, getRange.begin, getRange.end
// this needs to change if FDBMappedKeyValue definition is changed.
const int totalFieldFDBMappedKeyValue = 5;
const int totalFieldFDBMappedKeyValue = 4;
const int totalLengths = totalFieldFDBMappedKeyValue + kvm_count * 2;
int totalBytes = kvm.key.key_length + kvm.value.key_length + kvm.getRange.begin.key.key_length +
kvm.getRange.end.key.key_length + sizeof(kvm.boundaryAndExist);
kvm.getRange.end.key.key_length;
for (int i = 0; i < kvm_count; i++) {
auto kv = kvm.getRange.data[i];
totalBytes += kv.key_length + kv.value_length;
@ -663,7 +663,6 @@ JNIEXPORT jobject JNICALL Java_com_apple_foundationdb_FutureMappedResults_Future
cpBytesAndLength(pByte, pLength, kvm.value);
cpBytesAndLength(pByte, pLength, kvm.getRange.begin.key);
cpBytesAndLength(pByte, pLength, kvm.getRange.end.key);
cpBytesAndLengthInner(pByte, pLength, (uint8_t*)&(kvm.boundaryAndExist), sizeof(kvm.boundaryAndExist));
for (int kvm_i = 0; kvm_i < kvm_count; kvm_i++) {
auto kv = kvm.getRange.data[kvm_i];
cpBytesAndLengthInner(pByte, pLength, kv.key, kv.key_length);

View File

@ -209,11 +209,6 @@ class MappedRangeQueryIntegrationTest {
assertByteArrayEquals(indexEntryKey(id), mappedKeyValue.getKey());
assertByteArrayEquals(EMPTY, mappedKeyValue.getValue());
assertByteArrayEquals(indexEntryKey(id), mappedKeyValue.getKey());
if (id == begin || id == end - 1) {
Assertions.assertTrue(mappedKeyValue.getBoundaryAndExist());
} else {
Assertions.assertFalse(mappedKeyValue.getBoundaryAndExist());
}
byte[] prefix = recordKeyPrefix(id);
assertByteArrayEquals(prefix, mappedKeyValue.getRangeBegin());
prefix[prefix.length - 1] = (byte)0x01;

View File

@ -33,27 +33,22 @@ public class MappedKeyValue extends KeyValue {
private final byte[] rangeBegin;
private final byte[] rangeEnd;
private final List<KeyValue> rangeResult;
private final int boundaryAndExist;
// now it has 5 field, key, value, getRange.begin, getRange.end, boundaryAndExist
// now it has 4 fields, key, value, getRange.begin, getRange.end
// this needs to change if FDBMappedKeyValue definition is changed.
private static final int TOTAL_SERIALIZED_FIELD_FDBMappedKeyValue = 5;
private static final int TOTAL_SERIALIZED_FIELD_FDBMappedKeyValue = 4;
public MappedKeyValue(byte[] key, byte[] value, byte[] rangeBegin, byte[] rangeEnd, List<KeyValue> rangeResult,
int boundaryAndExist) {
public MappedKeyValue(byte[] key, byte[] value, byte[] rangeBegin, byte[] rangeEnd, List<KeyValue> rangeResult) {
super(key, value);
this.rangeBegin = rangeBegin;
this.rangeEnd = rangeEnd;
this.rangeResult = rangeResult;
this.boundaryAndExist = boundaryAndExist;
}
public byte[] getRangeBegin() { return rangeBegin; }
public byte[] getRangeEnd() { return rangeEnd; }
public boolean getBoundaryAndExist() { return boundaryAndExist == 0 ? false : true; }
public List<KeyValue> getRangeResult() { return rangeResult; }
public static MappedKeyValue fromBytes(byte[] bytes, int[] lengths) {
@ -69,8 +64,6 @@ public class MappedKeyValue extends KeyValue {
byte[] value = takeBytes(offset, bytes, lengths);
byte[] rangeBegin = takeBytes(offset, bytes, lengths);
byte[] rangeEnd = takeBytes(offset, bytes, lengths);
byte[] boundaryAndExistBytes = takeBytes(offset, bytes, lengths);
int boundaryAndExist = ByteBuffer.wrap(boundaryAndExistBytes).order(ByteOrder.LITTLE_ENDIAN).getInt();
if ((lengths.length - TOTAL_SERIALIZED_FIELD_FDBMappedKeyValue) % 2 != 0) {
throw new IllegalArgumentException("There needs to be an even number of lengths!");
@ -82,7 +75,7 @@ public class MappedKeyValue extends KeyValue {
byte[] v = takeBytes(offset, bytes, lengths);
rangeResult.add(new KeyValue(k, v));
}
return new MappedKeyValue(key, value, rangeBegin, rangeEnd, rangeResult, boundaryAndExist);
return new MappedKeyValue(key, value, rangeBegin, rangeEnd, rangeResult);
}
static class Offset {
@ -109,17 +102,14 @@ public class MappedKeyValue extends KeyValue {
return false;
MappedKeyValue rhs = (MappedKeyValue) obj;
return Arrays.equals(rangeBegin, rhs.rangeBegin)
&& Arrays.equals(rangeEnd, rhs.rangeEnd)
&& Objects.equals(rangeResult, rhs.rangeResult)
&& boundaryAndExist == rhs.boundaryAndExist;
return Arrays.equals(rangeBegin, rhs.rangeBegin) && Arrays.equals(rangeEnd, rhs.rangeEnd) &&
Objects.equals(rangeResult, rhs.rangeResult);
}
@Override
public int hashCode() {
int hashForResult = rangeResult == null ? 0 : rangeResult.hashCode();
return 17 +
(29 * hashForResult + boundaryAndExist + 37 * Arrays.hashCode(rangeBegin) + Arrays.hashCode(rangeEnd));
return 17 + (29 * hashForResult + 37 * Arrays.hashCode(rangeBegin) + Arrays.hashCode(rangeEnd));
}
@Override
@ -128,7 +118,6 @@ public class MappedKeyValue extends KeyValue {
sb.append("rangeBegin=").append(ByteArrayUtil.printable(rangeBegin));
sb.append(", rangeEnd=").append(ByteArrayUtil.printable(rangeEnd));
sb.append(", rangeResult=").append(rangeResult);
sb.append(", boundaryAndExist=").append(boundaryAndExist);
sb.append('}');
return super.toString() + "->" + sb.toString();
}

View File

@ -51,8 +51,6 @@ class MappedRangeResultDirectBufferIterator extends DirectBufferIterator impleme
final byte[] value = getString();
final byte[] rangeBegin = getString();
final byte[] rangeEnd = getString();
final byte[] boundaryAndExistBytes = getString();
final int boundaryAndExist = ByteBuffer.wrap(boundaryAndExistBytes).getInt();
final int rangeResultSize = byteBuffer.getInt();
List<KeyValue> rangeResult = new ArrayList();
for (int i = 0; i < rangeResultSize; i++) {
@ -61,7 +59,7 @@ class MappedRangeResultDirectBufferIterator extends DirectBufferIterator impleme
rangeResult.add(new KeyValue(k, v));
}
current += 1;
return new MappedKeyValue(key, value, rangeBegin, rangeEnd, rangeResult, boundaryAndExist);
return new MappedKeyValue(key, value, rangeBegin, rangeEnd, rangeResult);
}
private byte[] getString() {

View File

@ -47,7 +47,7 @@ struct Hash {
struct _ht { /* the hash table */
int count; /* Number of entries with this hash */
HashElem* chain; /* Pointer to first entry with this hash */
} * ht;
}* ht;
};
/* Each element in the hash table is an instance of the following

View File

@ -4623,17 +4623,17 @@ struct sqlite3_index_info {
unsigned char op; /* Constraint operator */
unsigned char usable; /* True if this constraint is usable */
int iTermOffset; /* Used internally - xBestIndex should ignore */
} * aConstraint; /* Table of WHERE clause constraints */
}* aConstraint; /* Table of WHERE clause constraints */
int nOrderBy; /* Number of terms in the ORDER BY clause */
struct sqlite3_index_orderby {
int iColumn; /* Column number */
unsigned char desc; /* True for DESC. False for ASC. */
} * aOrderBy; /* The ORDER BY clause */
}* aOrderBy; /* The ORDER BY clause */
/* Outputs */
struct sqlite3_index_constraint_usage {
int argvIndex; /* if >0, constraint is part of argv to xFilter */
unsigned char omit; /* Do not code a test for this constraint */
} * aConstraintUsage;
}* aConstraintUsage;
int idxNum; /* Number used to identify the index */
char* idxStr; /* String, possibly obtained from sqlite3_malloc */
int needToFreeIdxStr; /* Free idxStr using sqlite3_free() if true */

View File

@ -63,7 +63,9 @@ ACTOR Future<Void> simpleTimer() {
ACTOR Future<Void> someFuture(Future<int> ready) {
// loop choose {} works as well here - the braces are optional
loop choose {
when(wait(delay(0.5))) { std::cout << "Still waiting...\n"; }
when(wait(delay(0.5))) {
std::cout << "Still waiting...\n";
}
when(int r = wait(ready)) {
std::cout << format("Ready %d\n", r);
wait(delay(double(r)));
@ -84,8 +86,12 @@ ACTOR Future<Void> promiseDemo() {
ACTOR Future<Void> eventLoop(AsyncTrigger* trigger) {
loop choose {
when(wait(delay(0.5))) { std::cout << "Still waiting...\n"; }
when(wait(trigger->onTrigger())) { std::cout << "Triggered!\n"; }
when(wait(delay(0.5))) {
std::cout << "Still waiting...\n";
}
when(wait(trigger->onTrigger())) {
std::cout << "Triggered!\n";
}
}
}
@ -185,7 +191,9 @@ ACTOR Future<Void> echoServer() {
when(GetInterfaceRequest req = waitNext(echoServer.getInterface.getFuture())) {
req.reply.send(echoServer);
}
when(EchoRequest req = waitNext(echoServer.echo.getFuture())) { req.reply.send(req.message); }
when(EchoRequest req = waitNext(echoServer.echo.getFuture())) {
req.reply.send(req.message);
}
when(ReverseRequest req = waitNext(echoServer.reverse.getFuture())) {
req.reply.send(std::string(req.message.rbegin(), req.message.rend()));
}

View File

@ -2595,7 +2595,9 @@ ACTOR Future<Void> expireBackupData(const char* name,
lastProgress = p;
}
}
when(wait(expire)) { break; }
when(wait(expire)) {
break;
}
}
}
@ -2638,7 +2640,9 @@ ACTOR Future<Void> deleteBackupContainer(const char* name,
loop {
choose {
when(wait(done)) { break; }
when(wait(done)) {
break;
}
when(wait(delay(5))) {
if (numDeleted != lastUpdate) {
printf("\r%d...", numDeleted);
@ -3044,7 +3048,7 @@ static std::vector<std::vector<StringRef>> parseLine(std::string& line, bool& er
static void addKeyRange(std::string optionValue, Standalone<VectorRef<KeyRangeRef>>& keyRanges) {
bool err = false, partial = false;
int tokenArray = 0;
[[maybe_unused]] int tokenArray = 0;
auto parsed = parseLine(optionValue, err, partial);

View File

@ -685,7 +685,9 @@ ACTOR template <class T>
Future<T> makeInterruptable(Future<T> f) {
Future<Void> interrupt = LineNoise::onKeyboardInterrupt();
choose {
when(T t = wait(f)) { return t; }
when(T t = wait(f)) {
return t;
}
when(wait(interrupt)) {
f.cancel();
throw operation_cancelled();

View File

@ -1182,7 +1182,9 @@ ACTOR Future<Optional<CoordinatorsResult>> changeQuorumChecker(Transaction* tr,
choose {
when(wait(waitForAll(leaderServers))) {}
when(wait(delay(5.0))) { return CoordinatorsResult::COORDINATOR_UNREACHABLE; }
when(wait(delay(5.0))) {
return CoordinatorsResult::COORDINATOR_UNREACHABLE;
}
}
TraceEvent("ChangeQuorumCheckerSetCoordinatorsKey")
.detail("CurrentCoordinators", old.toString())
@ -1284,7 +1286,9 @@ ACTOR Future<CoordinatorsResult> changeQuorum(Database cx, Reference<IQuorumChan
TaskPriority::CoordinationReply));
choose {
when(wait(waitForAll(leaderServers))) {}
when(wait(delay(5.0))) { return CoordinatorsResult::COORDINATOR_UNREACHABLE; }
when(wait(delay(5.0))) {
return CoordinatorsResult::COORDINATOR_UNREACHABLE;
}
}
tr.set(coordinatorsKey, newClusterConnectionString.toString());

View File

@ -652,7 +652,9 @@ ACTOR Future<Void> asyncDeserializeClusterInterface(Reference<AsyncVar<Value>> s
state Future<Void> deserializer = asyncDeserialize(serializedInfo, knownLeader);
loop {
choose {
when(wait(deserializer)) { UNSTOPPABLE_ASSERT(false); }
when(wait(deserializer)) {
UNSTOPPABLE_ASSERT(false);
}
when(wait(knownLeader->onChange())) {
if (knownLeader->get().present()) {
outKnownLeader->set(knownLeader->get().get().clientInterface);

View File

@ -26,6 +26,7 @@
#include <limits>
#include <memory>
#include <regex>
#include <string>
#include <unordered_set>
#include <tuple>
#include <utility>
@ -1016,7 +1017,9 @@ ACTOR static Future<Void> monitorClientDBInfoChange(DatabaseContext* cx,
proxiesChangeTrigger->trigger();
}
}
when(wait(actors.getResult())) { UNSTOPPABLE_ASSERT(false); }
when(wait(actors.getResult())) {
UNSTOPPABLE_ASSERT(false);
}
}
}
}
@ -1498,9 +1501,10 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<IClusterConnection
int _apiVersion,
IsSwitchable switchable,
Optional<TenantName> defaultTenant)
: lockAware(lockAware), switchable(switchable), connectionRecord(connectionRecord), proxyProvisional(false),
clientLocality(clientLocality), enableLocalityLoadBalance(enableLocalityLoadBalance), defaultTenant(defaultTenant),
internal(internal), cc("TransactionMetrics"), transactionReadVersions("ReadVersions", cc),
: dbId(deterministicRandom()->randomUniqueID()), lockAware(lockAware), switchable(switchable),
connectionRecord(connectionRecord), proxyProvisional(false), clientLocality(clientLocality),
enableLocalityLoadBalance(enableLocalityLoadBalance), defaultTenant(defaultTenant), internal(internal),
cc("TransactionMetrics", dbId.toString()), transactionReadVersions("ReadVersions", cc),
transactionReadVersionsThrottled("ReadVersionsThrottled", cc),
transactionReadVersionsCompleted("ReadVersionsCompleted", cc),
transactionReadVersionBatches("ReadVersionBatches", cc),
@ -1531,11 +1535,11 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<IClusterConnection
transactionsExpensiveClearCostEstCount("ExpensiveClearCostEstCount", cc),
transactionGrvFullBatches("NumGrvFullBatches", cc), transactionGrvTimedOutBatches("NumGrvTimedOutBatches", cc),
transactionCommitVersionNotFoundForSS("CommitVersionNotFoundForSS", cc), anyBGReads(false),
ccBG("BlobGranuleReadMetrics"), bgReadInputBytes("BGReadInputBytes", ccBG),
ccBG("BlobGranuleReadMetrics", dbId.toString()), bgReadInputBytes("BGReadInputBytes", ccBG),
bgReadOutputBytes("BGReadOutputBytes", ccBG), bgReadSnapshotRows("BGReadSnapshotRows", ccBG),
bgReadRowsCleared("BGReadRowsCleared", ccBG), bgReadRowsInserted("BGReadRowsInserted", ccBG),
bgReadRowsUpdated("BGReadRowsUpdated", ccBG), bgLatencies(), bgGranulesPerRequest(), usedAnyChangeFeeds(false),
ccFeed("ChangeFeedClientMetrics"), feedStreamStarts("FeedStreamStarts", ccFeed),
ccFeed("ChangeFeedClientMetrics", dbId.toString()), feedStreamStarts("FeedStreamStarts", ccFeed),
feedMergeStreamStarts("FeedMergeStreamStarts", ccFeed), feedErrors("FeedErrors", ccFeed),
feedNonRetriableErrors("FeedNonRetriableErrors", ccFeed), feedPops("FeedPops", ccFeed),
feedPopsFallback("FeedPopsFallback", ccFeed), latencies(), readLatencies(), commitLatencies(), GRVLatencies(),
@ -1547,8 +1551,6 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<IClusterConnection
specialKeySpace(std::make_unique<SpecialKeySpace>(specialKeys.begin, specialKeys.end, /* test */ false)),
connectToDatabaseEventCacheHolder(format("ConnectToDatabase/%s", dbId.toString().c_str())) {
dbId = deterministicRandom()->randomUniqueID();
TraceEvent("DatabaseContextCreated", dbId).backtrace();
connected = (clientInfo->get().commitProxies.size() && clientInfo->get().grvProxies.size())
@ -3360,7 +3362,6 @@ ACTOR Future<Optional<Value>> getValue(Reference<TransactionState> trState,
span.addAttribute("tenant"_sr, trState->tenant().get());
}
span.addAttribute("key"_sr, key);
trState->cx->validateVersion(ver);
loop {
@ -3402,7 +3403,9 @@ ACTOR Future<Optional<Value>> getValue(Reference<TransactionState> trState,
std::vector<Error>{ transaction_too_old(), future_version() });
}
choose {
when(wait(trState->cx->connectionFileChanged())) { throw transaction_too_old(); }
when(wait(trState->cx->connectionFileChanged())) {
throw transaction_too_old();
}
when(GetValueReply _reply = wait(loadBalance(
trState->cx.getPtr(),
locationInfo.locations,
@ -3549,7 +3552,9 @@ ACTOR Future<Key> getKey(Reference<TransactionState> trState,
state GetKeyReply reply;
try {
choose {
when(wait(trState->cx->connectionFileChanged())) { throw transaction_too_old(); }
when(wait(trState->cx->connectionFileChanged())) {
throw transaction_too_old();
}
when(GetKeyReply _reply = wait(loadBalance(
trState->cx.getPtr(),
locationInfo.locations,
@ -3713,7 +3718,9 @@ ACTOR Future<Version> watchValue(Database cx, Reference<const WatchParameters> p
TaskPriority::DefaultPromiseEndpoint))) {
resp = r;
}
when(wait(cx->connectionRecord ? cx->connectionRecord->onChange() : Never())) { wait(Never()); }
when(wait(cx->connectionRecord ? cx->connectionRecord->onChange() : Never())) {
wait(Never());
}
}
if (watchValueID.present()) {
g_traceBatch.addEvent("WatchValueDebug", watchValueID.get().first(), "NativeAPI.watchValue.After");
@ -4076,7 +4083,9 @@ Future<RangeResultFamily> getExactRange(Reference<TransactionState> trState,
state GetKeyValuesFamilyReply rep;
try {
choose {
when(wait(trState->cx->connectionFileChanged())) { throw transaction_too_old(); }
when(wait(trState->cx->connectionFileChanged())) {
throw transaction_too_old();
}
when(GetKeyValuesFamilyReply _rep = wait(loadBalance(
trState->cx.getPtr(),
locations[shard].locations,
@ -4284,7 +4293,6 @@ int64_t inline getRangeResultFamilyBytes(MappedRangeResultRef result) {
int64_t bytes = 0;
for (const MappedKeyValueRef& mappedKeyValue : result) {
bytes += mappedKeyValue.key.size() + mappedKeyValue.value.size();
bytes += sizeof(mappedKeyValue.boundaryAndExist);
auto& reqAndResult = mappedKeyValue.reqAndResult;
if (std::holds_alternative<GetValueReqAndResultRef>(reqAndResult)) {
auto getValue = std::get<GetValueReqAndResultRef>(reqAndResult);
@ -4976,7 +4984,9 @@ ACTOR Future<Void> getRangeStreamFragment(Reference<TransactionState> trState,
return Void();
}
when(GetKeyValuesStreamReply _rep = waitNext(replyStream.getFuture())) { rep = _rep; }
when(GetKeyValuesStreamReply _rep = waitNext(replyStream.getFuture())) {
rep = _rep;
}
}
++trState->cx->transactionPhysicalReadsCompleted;
} catch (Error& e) {
@ -5473,7 +5483,9 @@ ACTOR Future<Void> watch(Reference<Watch> watch,
loop {
choose {
// NativeAPI watchValue future finishes or errors
when(wait(watch->watchFuture)) { break; }
when(wait(watch->watchFuture)) {
break;
}
when(wait(cx->connectionFileChanged())) {
CODE_PROBE(true, "Recreated a watch after switch");
@ -7065,7 +7077,9 @@ ACTOR Future<GetReadVersionReply> getConsistentReadVersion(SpanContext parentSpa
state Future<Void> onProxiesChanged = cx->onProxiesChanged();
choose {
when(wait(onProxiesChanged)) { onProxiesChanged = cx->onProxiesChanged(); }
when(wait(onProxiesChanged)) {
onProxiesChanged = cx->onProxiesChanged();
}
when(GetReadVersionReply v =
wait(basicLoadBalance(cx->getGrvProxies(UseProvisionalProxies(
flags & GetReadVersionRequest::FLAG_USE_PROVISIONAL_PROXIES)),
@ -7491,7 +7505,9 @@ ACTOR Future<ProtocolVersion> getClusterProtocolImpl(
needToConnect = false;
}
choose {
when(wait(coordinator->onChange())) { needToConnect = true; }
when(wait(coordinator->onChange())) {
needToConnect = true;
}
when(ProtocolVersion pv = wait(protocolVersion)) {
if (!expectedVersion.present() || expectedVersion.get() != pv) {
@ -8878,7 +8894,9 @@ Reference<TransactionLogInfo> Transaction::createTrLogInfoProbabilistically(cons
void Transaction::setTransactionID(UID id) {
ASSERT(getSize() == 0);
trState->spanContext = SpanContext(id, trState->spanContext.spanID);
trState->spanContext = SpanContext(id, trState->spanContext.spanID, trState->spanContext.m_Flags);
tr.spanContext = trState->spanContext;
span.context = trState->spanContext;
}
void Transaction::setToken(uint64_t token) {
@ -9067,8 +9085,12 @@ ACTOR static Future<std::vector<CheckpointMetaData>> getCheckpointMetaDataForRan
}
choose {
when(wait(cx->connectionFileChanged())) { cx->invalidateCache(KeyRef(), range); }
when(wait(waitForAll(futures))) { break; }
when(wait(cx->connectionFileChanged())) {
cx->invalidateCache(KeyRef(), range);
}
when(wait(waitForAll(futures))) {
break;
}
when(wait(delay(timeout))) {
TraceEvent(SevWarn, "GetCheckpointTimeout").detail("Range", range).detail("Version", version);
}
@ -9523,6 +9545,9 @@ ACTOR Future<Void> changeFeedTSSValidator(ChangeFeedStreamRequest req,
Version next = waitNext(data->get().ssStreamSummary.getFuture());
ssSummary.push_back(next);
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled) {
throw;
}
if (e.code() != error_code_end_of_stream) {
data->get().complete();
if (e.code() != error_code_operation_cancelled) {
@ -9672,15 +9697,11 @@ Version ChangeFeedData::getVersion() {
// native api has consumed and processed, them, and then the fdb client has consumed all of the mutations.
ACTOR Future<Void> changeFeedWaitLatest(Reference<ChangeFeedData> self, Version version) {
// wait on SS to have sent up through version
int desired = 0;
int waiting = 0;
std::vector<Future<Void>> allAtLeast;
for (auto& it : self->storageData) {
if (it->version.get() < version) {
waiting++;
if (version > it->desired.get()) {
it->desired.set(version);
desired++;
}
allAtLeast.push_back(it->version.whenAtLeast(version));
}
@ -9739,8 +9760,12 @@ ACTOR Future<Void> changeFeedWhenAtLatest(Reference<ChangeFeedData> self, Versio
// only allowed to use empty versions if you're caught up
Future<Void> waitEmptyVersion = (self->notAtLatest.get() == 0) ? changeFeedWaitLatest(self, version) : Never();
choose {
when(wait(waitEmptyVersion)) { break; }
when(wait(lastReturned)) { break; }
when(wait(waitEmptyVersion)) {
break;
}
when(wait(lastReturned)) {
break;
}
when(wait(self->refresh.getFuture())) {}
when(wait(self->notAtLatest.onChange())) {}
}

View File

@ -209,8 +209,12 @@ class GetGenerationQuorum {
}
try {
choose {
when(ConfigGeneration generation = wait(self->result.getFuture())) { return generation; }
when(wait(self->actors.getResult())) { ASSERT(false); }
when(ConfigGeneration generation = wait(self->result.getFuture())) {
return generation;
}
when(wait(self->actors.getResult())) {
ASSERT(false);
}
}
} catch (Error& e) {
if (e.code() == error_code_failed_to_reach_quorum) {

View File

@ -356,16 +356,24 @@ public:
Req req,
Snapshot snapshot) {
choose {
when(typename Req::Result result = wait(readThrough(ryw, req, snapshot))) { return result; }
when(wait(ryw->resetPromise.getFuture())) { throw internal_error(); }
when(typename Req::Result result = wait(readThrough(ryw, req, snapshot))) {
return result;
}
when(wait(ryw->resetPromise.getFuture())) {
throw internal_error();
}
}
}
ACTOR template <class Req>
static Future<typename Req::Result> readWithConflictRangeSnapshot(ReadYourWritesTransaction* ryw, Req req) {
state SnapshotCache::iterator it(&ryw->cache, &ryw->writes);
choose {
when(typename Req::Result result = wait(read(ryw, req, &it))) { return result; }
when(wait(ryw->resetPromise.getFuture())) { throw internal_error(); }
when(typename Req::Result result = wait(read(ryw, req, &it))) {
return result;
}
when(wait(ryw->resetPromise.getFuture())) {
throw internal_error();
}
}
}
ACTOR template <class Req>
@ -381,7 +389,9 @@ public:
addConflictRange(ryw, req, it.extractWriteMapIterator(), result);
return result;
}
when(wait(ryw->resetPromise.getFuture())) { throw internal_error(); }
when(wait(ryw->resetPromise.getFuture())) {
throw internal_error();
}
}
}
template <class Req>
@ -1201,7 +1211,9 @@ public:
addConflictRangeAndMustUnmodified<backwards>(ryw, req, writes, result);
return result;
}
when(wait(ryw->resetPromise.getFuture())) { throw internal_error(); }
when(wait(ryw->resetPromise.getFuture())) {
throw internal_error();
}
}
}
@ -1452,9 +1464,13 @@ public:
ACTOR static Future<Version> getReadVersion(ReadYourWritesTransaction* ryw) {
choose {
when(Version v = wait(ryw->tr.getReadVersion())) { return v; }
when(Version v = wait(ryw->tr.getReadVersion())) {
return v;
}
when(wait(ryw->resetPromise.getFuture())) { throw internal_error(); }
when(wait(ryw->resetPromise.getFuture())) {
throw internal_error();
}
}
}
};

View File

@ -489,7 +489,9 @@ ACTOR Future<Void> deleteRecursively_impl(Reference<S3BlobStoreEndpoint> b,
loop {
choose {
// Throw if done throws, otherwise don't stop until end_of_stream
when(wait(done)) { done = Never(); }
when(wait(done)) {
done = Never();
}
when(S3BlobStoreEndpoint::ListResult list = waitNext(resultStream.getFuture())) {
for (auto& object : list.objects) {
@ -1205,7 +1207,9 @@ ACTOR Future<S3BlobStoreEndpoint::ListResult> listObjects_impl(Reference<S3BlobS
loop {
choose {
// Throw if done throws, otherwise don't stop until end_of_stream
when(wait(done)) { done = Never(); }
when(wait(done)) {
done = Never();
}
when(S3BlobStoreEndpoint::ListResult info = waitNext(resultStream.getFuture())) {
results.commonPrefixes.insert(

View File

@ -304,7 +304,9 @@ ACTOR Future<RangeResult> SpecialKeySpace::checkRYWValid(SpecialKeySpace* sks,
wait(SpecialKeySpace::getRangeAggregationActor(sks, ryw, begin, end, limits, reverse))) {
return result;
}
when(wait(ryw->resetFuture())) { throw internal_error(); }
when(wait(ryw->resetFuture())) {
throw internal_error();
}
}
}

View File

@ -870,13 +870,13 @@ TaskBucket::TaskBucket(const Subspace& subspace,
AccessSystemKeys sysAccess,
PriorityBatch priorityBatch,
LockAware lockAware)
: cc("TaskBucket"), dispatchSlotChecksStarted("DispatchSlotChecksStarted", cc), dispatchErrors("DispatchErrors", cc),
: dbgid(deterministicRandom()->randomUniqueID()), cc("TaskBucket", dbgid.toString()),
dispatchSlotChecksStarted("DispatchSlotChecksStarted", cc), dispatchErrors("DispatchErrors", cc),
dispatchDoTasks("DispatchDoTasks", cc), dispatchEmptyTasks("DispatchEmptyTasks", cc),
dispatchSlotChecksComplete("DispatchSlotChecksComplete", cc), dbgid(deterministicRandom()->randomUniqueID()),
prefix(subspace), active(prefix.get("ac"_sr)), pauseKey(prefix.pack("pause"_sr)), available(prefix.get("av"_sr)),
available_prioritized(prefix.get("avp"_sr)), timeouts(prefix.get("to"_sr)),
timeout(CLIENT_KNOBS->TASKBUCKET_TIMEOUT_VERSIONS), system_access(sysAccess), priority_batch(priorityBatch),
lockAware(lockAware) {}
dispatchSlotChecksComplete("DispatchSlotChecksComplete", cc), prefix(subspace), active(prefix.get("ac"_sr)),
pauseKey(prefix.pack("pause"_sr)), available(prefix.get("av"_sr)), available_prioritized(prefix.get("avp"_sr)),
timeouts(prefix.get("to"_sr)), timeout(CLIENT_KNOBS->TASKBUCKET_TIMEOUT_VERSIONS), system_access(sysAccess),
priority_batch(priorityBatch), lockAware(lockAware) {}
TaskBucket::~TaskBucket() {}

View File

@ -18,7 +18,7 @@
* limitations under the License.
*/
#include "fdbrpc/Msgpack.h"
#include "flow/Msgpack.h"
#include "fdbclient/Tracing.h"
#include "flow/IRandom.h"
#include "flow/UnitTest.h"
@ -447,8 +447,6 @@ TEST_CASE("/flow/Tracing/AddAttributes") {
SpanContext(deterministicRandom()->randomUniqueID(),
deterministicRandom()->randomUInt64(),
TraceFlags::sampled));
IKnobCollection::getMutableGlobalKnobCollection().setKnob("tracing_span_attributes_enabled",
KnobValueRef::create(bool{ true }));
auto arena = span1.arena;
span1.addAttribute(StringRef(arena, "foo"_sr), StringRef(arena, "bar"_sr));
span1.addAttribute(StringRef(arena, "operation"_sr), StringRef(arena, "grv"_sr));
@ -567,8 +565,6 @@ std::string readMPString(uint8_t* index) {
// Windows doesn't like lack of header and declaration of constructor for FastUDPTracer
#ifndef WIN32
TEST_CASE("/flow/Tracing/FastUDPMessagePackEncoding") {
IKnobCollection::getMutableGlobalKnobCollection().setKnob("tracing_span_attributes_enabled",
KnobValueRef::create(bool{ true }));
Span span1("encoded_span"_loc);
auto request = MsgpackBuffer{ .buffer = std::make_unique<uint8_t[]>(kTraceBufferSize),
.data_size = 0,

View File

@ -112,7 +112,7 @@ void populateVersionVector(VersionVector& vv,
int tagsPerLocality = tagCount / localityCount;
// Populate localities.
for (int i = 0; localities.size() < (size_t)localityCount; i++) {
while (localities.size() < (size_t)localityCount) {
int8_t locality = deterministicRandom()->randomInt(tagLocalityInvalid + 1, INT8_MAX);
if (std::find(localities.begin(), localities.end(), locality) == localities.end()) {
localities.push_back(locality);

View File

@ -439,6 +439,8 @@ public:
void expireThrottles();
UID dbId;
// Key DB-specific information
Reference<AsyncVar<Reference<IClusterConnectionRecord>>> connectionRecord;
AsyncTrigger proxiesChangeTrigger;
@ -521,7 +523,6 @@ public:
// servers by their tags).
std::unordered_map<UID, Tag> ssidTagMapping;
UID dbId;
IsInternal internal; // Only contexts created through the C client and fdbcli are non-internal
PrioritizedTransactionTagMap<ClientTagThrottleData> throttledTags;

View File

@ -814,18 +814,9 @@ struct MappedKeyValueRef : KeyValueRef {
MappedReqAndResultRef reqAndResult;
// boundary KVs are always returned so that caller can use it as a continuation,
// for non-boundary KV, it is always false.
// for boundary KV, it is true only when the secondary query succeeds(return non-empty).
// Note: only MATCH_INDEX_MATCHED_ONLY and MATCH_INDEX_UNMATCHED_ONLY modes can make use of it,
// to decide whether the boudnary is a match/unmatch.
// In the case of MATCH_INDEX_ALL and MATCH_INDEX_NONE, caller should not care if boundary has a match or not.
bool boundaryAndExist;
MappedKeyValueRef() = default;
MappedKeyValueRef(Arena& a, const MappedKeyValueRef& copyFrom) : KeyValueRef(a, copyFrom) {
const auto& reqAndResultCopyFrom = copyFrom.reqAndResult;
boundaryAndExist = copyFrom.boundaryAndExist;
if (std::holds_alternative<GetValueReqAndResultRef>(reqAndResultCopyFrom)) {
auto getValue = std::get<GetValueReqAndResultRef>(reqAndResultCopyFrom);
reqAndResult = GetValueReqAndResultRef(a, getValue);
@ -839,7 +830,7 @@ struct MappedKeyValueRef : KeyValueRef {
bool operator==(const MappedKeyValueRef& rhs) const {
return static_cast<const KeyValueRef&>(*this) == static_cast<const KeyValueRef&>(rhs) &&
reqAndResult == rhs.reqAndResult && boundaryAndExist == rhs.boundaryAndExist;
reqAndResult == rhs.reqAndResult;
}
bool operator!=(const MappedKeyValueRef& rhs) const { return !(rhs == *this); }
@ -849,7 +840,7 @@ struct MappedKeyValueRef : KeyValueRef {
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, ((KeyValueRef&)*this), reqAndResult, boundaryAndExist);
serializer(ar, ((KeyValueRef&)*this), reqAndResult);
}
};

View File

@ -80,7 +80,6 @@ struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> {
* and take the shortcut. */
FDBGetRangeReqAndResult getRange;
unsigned char buffer[32];
bool boundaryAndExist;
} FDBMappedKeyValue;
#pragma pack(push, 4)

View File

@ -80,8 +80,9 @@ public:
Future<Void> done() { return reader; }
private:
Version beginVersion, endVersion, currentBeginVersion;
unsigned pipelineDepth;
[[maybe_unused]] Version beginVersion;
Version endVersion, currentBeginVersion;
[[maybe_unused]] unsigned pipelineDepth;
Future<Void> reader;
};

View File

@ -274,6 +274,7 @@ public:
Database src;
Map<Key, Future<Reference<KeyRangeMap<Version>>>> key_version;
UID dbgid;
CounterCollection cc;
Counter dispatchSlotChecksStarted;
@ -281,7 +282,6 @@ public:
Counter dispatchDoTasks;
Counter dispatchEmptyTasks;
Counter dispatchSlotChecksComplete;
UID dbgid;
double getTimeoutSeconds() const { return (double)timeout / CLIENT_KNOBS->CORE_VERSIONSPERSECOND; }

View File

@ -230,9 +230,7 @@ public:
}
Span& addAttribute(const StringRef& key, const StringRef& value) {
if (FLOW_KNOBS->TRACING_SPAN_ATTRIBUTES_ENABLED) {
attributes.push_back_deep(arena, KeyValueRef(key, value));
}
attributes.push_back_deep(arena, KeyValueRef(key, value));
return *this;
}
@ -273,4 +271,4 @@ struct ITracer {
virtual void trace(Span const& span) = 0;
};
void openTracer(TracerType type);
void openTracer(TracerType type);

View File

@ -685,7 +685,7 @@ public:
}
Future<Void> forgetVersionsBeforeAsync(Version newOldestVersion, TaskPriority taskID = TaskPriority::DefaultYield) {
ASSERT(newOldestVersion <= latestVersion);
ASSERT_LE(newOldestVersion, latestVersion);
auto r = upper_bound(roots.begin(), roots.end(), newOldestVersion, rootsComparator());
auto upper = r;
--r;

View File

@ -13,7 +13,7 @@
#include "json_spirit_value.h"
#include "json_spirit_error_position.h"
//#define BOOST_SPIRIT_THREADSAFE // uncomment for multithreaded use, requires linking to boost.thread
// #define BOOST_SPIRIT_THREADSAFE // uncomment for multithreaded use, requires linking to boost.thread
#include <boost/bind/bind.hpp>
#include <boost/function.hpp>

View File

@ -0,0 +1,62 @@
#include "fdbrpc/DDSketch.h"
#include "flow/Error.h"
#include "flow/IRandom.h"
#include "flow/UnitTest.h"
#include <limits>
#include <random>
#include "flow/actorcompiler.h" // has to be last include
void forceLinkDDSketchTests() {}
TEST_CASE("/fdbrpc/ddsketch/accuracy") {
int TRY = 100, SIZE = 1e6;
const int totalPercentiles = 7;
double targetPercentiles[totalPercentiles] = { .0001, .01, .1, .50, .90, .99, .9999 };
double stat[totalPercentiles] = { 0 };
for (int t = 0; t < TRY; t++) {
DDSketch<double> dd;
std::vector<double> nums;
for (int i = 0; i < SIZE; i++) {
static double a = 1, b = 1; // a skewed distribution
auto y = deterministicRandom()->random01();
auto num = b / pow(1 - y, 1 / a);
nums.push_back(num);
dd.addSample(num);
}
std::sort(nums.begin(), nums.end());
for (int percentID = 0; percentID < totalPercentiles; percentID++) {
double percentile = targetPercentiles[percentID];
double ground = nums[percentile * (SIZE - 1)], ddvalue = dd.percentile(percentile);
double relativeError = fabs(ground - ddvalue) / ground;
stat[percentID] += relativeError;
}
}
for (int percentID = 0; percentID < totalPercentiles; percentID++) {
printf("%.4lf per, relative error %.4lf\n", targetPercentiles[percentID], stat[percentID] / TRY);
}
return Void();
}
TEST_CASE("/fdbrpc/ddsketch/correctness") {
DDSketch<double> dd;
for (int i = 0; i < 4000; i++) {
// This generates a uniform real disitribution between the range of
// [0.0004, 0.01]
double sample = (static_cast<double>(deterministicRandom()->randomSkewedUInt32(40, 1000)) / 100000);
dd.addSample(sample);
}
double p50 = dd.percentile(0.5);
ASSERT(p50 > 0 && p50 != std::numeric_limits<double>::infinity());
double p90 = dd.percentile(0.9);
ASSERT(p90 > 0 && p90 != std::numeric_limits<double>::infinity());
double p95 = dd.percentile(0.95);
ASSERT(p95 > 0 && p95 != std::numeric_limits<double>::infinity());
double p99 = dd.percentile(0.99);
ASSERT(p99 > 0 && p99 != std::numeric_limits<double>::infinity());
double p999 = dd.percentile(0.999);
ASSERT(p999 > 0 && p999 != std::numeric_limits<double>::infinity());
return Void{};
}

View File

@ -53,7 +53,9 @@ ACTOR Future<Void> waitForContinuousFailure(IFailureMonitor* monitor,
choose {
when(wait(monitor->onStateEqual(endpoint, FailureStatus(false)))) {
} // SOMEDAY: Use onStateChanged() for efficiency
when(wait(delay(waitDelay))) { return Void(); }
when(wait(delay(waitDelay))) {
return Void();
}
}
}
}

View File

@ -572,7 +572,9 @@ ACTOR Future<Void> connectionMonitor(Reference<Peer> peer) {
}
break;
}
when(wait(peer->resetPing.onTrigger())) { break; }
when(wait(peer->resetPing.onTrigger())) {
break;
}
}
}
}
@ -668,7 +670,9 @@ ACTOR Future<Void> connectionKeeper(Reference<Peer> self,
choose {
when(wait(self->dataToSend.onTrigger())) {}
when(wait(retryConnectF)) { break; }
when(wait(retryConnectF)) {
break;
}
}
}
@ -717,7 +721,9 @@ ACTOR Future<Void> connectionKeeper(Reference<Peer> self,
self->prependConnectPacket();
reader = connectionReader(self->transport, conn, self, Promise<Reference<Peer>>());
}
when(wait(delay(FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT))) { throw connection_failed(); }
when(wait(delay(FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT))) {
throw connection_failed();
}
}
} catch (Error& e) {
++self->connectFailedCount;
@ -1465,7 +1471,9 @@ ACTOR static Future<Void> connectionIncoming(TransportData* self, Reference<ICon
ASSERT(false);
return Void();
}
when(Reference<Peer> p = wait(onConnected.getFuture())) { p->onIncomingConnection(p, conn, reader); }
when(Reference<Peer> p = wait(onConnected.getFuture())) {
p->onIncomingConnection(p, conn, reader);
}
when(wait(delayJittered(FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT))) {
CODE_PROBE(true, "Incoming connection timed out");
throw timed_out();

View File

@ -50,7 +50,9 @@ ACTOR Future<Void> allAlternativesFailedDelay(Future<Void> okFuture) {
choose {
when(wait(okFuture)) {}
when(wait(::delayJittered(delay))) { throw all_alternatives_failed(); }
when(wait(::delayJittered(delay))) {
throw all_alternatives_failed();
}
}
return Void();
}

View File

@ -77,13 +77,11 @@ bool PolicyOne::selectReplicas(Reference<LocalitySet>& fromServers,
std::vector<LocalityEntry> const& alsoServers,
std::vector<LocalityEntry>& results) {
int totalUsed = 0;
int itemsUsed = 0;
if (alsoServers.size()) {
totalUsed++;
} else if (fromServers->size()) {
auto randomEntry = fromServers->random();
results.push_back(randomEntry);
itemsUsed++;
totalUsed++;
}
return (totalUsed > 0);

View File

@ -19,6 +19,14 @@
*/
#include "fdbrpc/Stats.h"
#include "flow/IRandom.h"
#include "flow/Knobs.h"
#include "flow/OTELMetrics.h"
#include "flow/TDMetric.actor.h"
#include "flow/Trace.h"
#include "flow/flow.h"
#include "flow/network.h"
#include <string>
#include "flow/actorcompiler.h" // has to be last include
Counter::Counter(std::string const& name, CounterCollection& collection)
@ -81,8 +89,36 @@ void Counter::clear() {
metric = 0;
}
void CounterCollection::logToTraceEvent(TraceEvent& te) const {
void CounterCollection::logToTraceEvent(TraceEvent& te) {
NetworkAddress addr = g_network->getLocalAddress();
for (ICounter* c : counters) {
MetricCollection* metrics = MetricCollection::getMetricCollection();
if (metrics != nullptr) {
std::string ip_str = addr.ip.toString();
std::string port_str = std::to_string(addr.port);
uint64_t val = c->getValue();
switch (c->model) {
case MetricsDataModel::OTLP: {
if (metrics->sumMap.find(c->id) != metrics->sumMap.end()) {
metrics->sumMap[c->id].points.emplace_back(static_cast<int64_t>(val));
} else {
metrics->sumMap[c->id] = OTEL::OTELSum(name + "." + c->getName(), val);
}
metrics->sumMap[c->id].points.back().addAttribute("ip", ip_str);
metrics->sumMap[c->id].points.back().addAttribute("port", port_str);
metrics->sumMap[c->id].points.back().startTime = logTime;
}
case MetricsDataModel::STATSD: {
std::vector<std::pair<std::string, std::string>> statsd_attributes{ { "ip", ip_str },
{ "port", port_str } };
metrics->statsd_message.push_back(createStatsdMessage(
c->getName(), StatsDMetric::COUNTER, std::to_string(val) /*, statsd_attributes*/));
}
case MetricsDataModel::NONE:
default: {
}
}
}
te.detail(c->getName().c_str(), c);
c->resetInterval();
}
@ -180,3 +216,84 @@ void LatencyBands::clearBands() {
LatencyBands::~LatencyBands() {
clearBands();
}
LatencySample::LatencySample(std::string name, UID id, double loggingInterval, double accuracy)
: name(name), IMetric(knobToMetricModel(FLOW_KNOBS->METRICS_DATA_MODEL)), id(id), sampleEmit(now()), sketch(accuracy),
latencySampleEventHolder(makeReference<EventCacheHolder>(id.toString() + "/" + name)) {
logger = recurring([this]() { logSample(); }, loggingInterval);
p50id = deterministicRandom()->randomUniqueID();
p90id = deterministicRandom()->randomUniqueID();
p95id = deterministicRandom()->randomUniqueID();
p99id = deterministicRandom()->randomUniqueID();
p999id = deterministicRandom()->randomUniqueID();
}
void LatencySample::addMeasurement(double measurement) {
sketch.addSample(measurement);
}
void LatencySample::logSample() {
double p25 = sketch.percentile(0.25);
double p50 = sketch.mean();
double p90 = sketch.percentile(0.9);
double p95 = sketch.percentile(0.95);
double p99 = sketch.percentile(0.99);
double p99_9 = sketch.percentile(0.999);
TraceEvent(name.c_str(), id)
.detail("Count", sketch.getPopulationSize())
.detail("Elapsed", now() - sampleEmit)
.detail("Min", sketch.min())
.detail("Max", sketch.max())
.detail("Mean", sketch.mean())
.detail("Median", p50)
.detail("P25", p25)
.detail("P90", p90)
.detail("P95", p95)
.detail("P99", p99)
.detail("P99.9", p99_9)
.trackLatest(latencySampleEventHolder->trackingKey);
MetricCollection* metrics = MetricCollection::getMetricCollection();
if (metrics != nullptr) {
NetworkAddress addr = g_network->getLocalAddress();
std::string ip_str = addr.ip.toString();
std::string port_str = std::to_string(addr.port);
switch (model) {
case MetricsDataModel::OTLP: {
if (metrics->histMap.find(IMetric::id) != metrics->histMap.end()) {
metrics->histMap[IMetric::id].points.emplace_back(
sketch.getErrorGuarantee(), sketch.getSamples(), sketch.min(), sketch.max(), sketch.getSum());
} else {
metrics->histMap[IMetric::id] = OTEL::OTELHistogram(
name, sketch.getErrorGuarantee(), sketch.getSamples(), sketch.min(), sketch.max(), sketch.getSum());
}
metrics->histMap[IMetric::id].points.back().addAttribute("ip", ip_str);
metrics->histMap[IMetric::id].points.back().addAttribute("port", port_str);
metrics->histMap[IMetric::id].points.back().startTime = sampleEmit;
createOtelGauge(p50id, name + "p50", p50);
createOtelGauge(p90id, name + "p90", p90);
createOtelGauge(p95id, name + "p95", p95);
createOtelGauge(p99id, name + "p99", p99);
createOtelGauge(p999id, name + "p99_9", p99_9);
}
case MetricsDataModel::STATSD: {
std::vector<std::pair<std::string, std::string>> statsd_attributes{ { "ip", ip_str },
{ "port", port_str } };
auto median_gauge =
createStatsdMessage(name + "p50", StatsDMetric::GAUGE, std::to_string(p50) /*, statsd_attributes*/);
auto p90_gauge =
createStatsdMessage(name + "p90", StatsDMetric::GAUGE, std::to_string(p90) /*, statsd_attributes*/);
auto p95_gauge =
createStatsdMessage(name + "p95", StatsDMetric::GAUGE, std::to_string(p95) /*, statsd_attributes*/);
auto p99_gauge =
createStatsdMessage(name + "p99", StatsDMetric::GAUGE, std::to_string(p99) /*, statsd_attributes*/);
auto p999_gauge =
createStatsdMessage(name + "p99.9", StatsDMetric::GAUGE, std::to_string(p99_9) /*, statsd_attributes*/);
}
case MetricsDataModel::NONE:
default: {
}
}
}
sketch.clear();
sampleEmit = now();
}

View File

@ -257,7 +257,9 @@ ACTOR template <int N, class X>
ACTOR template <class A, class B>
[[flow_allow_discard]] Future<Void> switchTest(FutureStream<A> as, Future<B> oneb) {
loop choose {
when(A a = waitNext(as)) { std::cout << "A " << a << std::endl; }
when(A a = waitNext(as)) {
std::cout << "A " << a << std::endl;
}
when(B b = wait(oneb)) {
std::cout << "B " << b << std::endl;
break;
@ -283,7 +285,7 @@ public:
#if !defined(__INTEL_COMPILER)
void operator delete(void* buf) {
std::cout << "Freeing buffer" << std::endl;
delete[](int*) buf;
delete[] (int*)buf;
}
#endif
@ -614,8 +616,12 @@ void returnCancelRaceTest() {
ACTOR [[flow_allow_discard]] Future<int> chooseTest(Future<int> a, Future<int> b) {
choose {
when(int A = wait(a)) { return A; }
when(int B = wait(b)) { return B; }
when(int A = wait(a)) {
return A;
}
when(int B = wait(b)) {
return B;
}
}
}
@ -960,8 +966,12 @@ ACTOR [[flow_allow_discard]] Future<int> introAdd(Future<int> a, Future<int> b)
ACTOR [[flow_allow_discard]] Future<int> introFirst(Future<int> a, Future<int> b) {
choose {
when(int x = wait(a)) { return x; }
when(int x = wait(b)) { return x; }
when(int x = wait(a)) {
return x;
}
when(int x = wait(b)) {
return x;
}
}
}

View File

@ -52,7 +52,9 @@ Future<T> sendErrorOnShutdown(Future<T> in, bool assertOnCancel = false) {
when(wait(success(g_simulator->getCurrentProcess()->shutdownSignal.getFuture()))) {
throw io_error().asInjectedFault();
}
when(T rep = wait(in)) { return rep; }
when(T rep = wait(in)) {
return rep;
}
}
} catch (Error& e) {
ASSERT(e.code() != error_code_actor_cancelled || !assertOnCancel);
@ -82,7 +84,9 @@ public:
when(wait(success(g_simulator->getCurrentProcess()->shutdownSignal.getFuture()))) {
throw io_error().asInjectedFault();
}
when(Reference<IAsyncFile> f = wait(wrappedFile)) { return makeReference<AsyncFileDetachable>(f); }
when(Reference<IAsyncFile> f = wait(wrappedFile)) {
return makeReference<AsyncFileDetachable>(f);
}
}
}
@ -507,7 +511,9 @@ private:
state bool saveDurable = true;
choose {
when(wait(delay(delayDuration))) {}
when(bool durable = wait(startSyncFuture)) { saveDurable = durable; }
when(bool durable = wait(startSyncFuture)) {
saveDurable = durable;
}
}
debugFileCheck("AsyncFileNonDurableWriteAfterWait", self->filename, dataCopy.begin(), offset, length);
@ -684,7 +690,9 @@ private:
state bool saveDurable = true;
choose {
when(wait(delay(delayDuration))) {}
when(bool durable = wait(startSyncFuture)) { saveDurable = durable; }
when(bool durable = wait(startSyncFuture)) {
saveDurable = durable;
}
}
if (g_network->check_yield(TaskPriority::DefaultYield)) {

View File

@ -32,11 +32,24 @@ template <class T>
class ContinuousSample {
public:
explicit ContinuousSample(int sampleSize)
: sampleSize(sampleSize), populationSize(0), sorted(true), _min(T()), _max(T()) {}
: sampleSize(sampleSize), populationSize(0), sorted(true), _min(T()), _max(T()), _sum(T()) {}
void swap(ContinuousSample<T>& other) {
std::swap(samples, other.samples);
std::swap(_min, other._min);
std::swap(_max, other._max);
std::swap(_sum, other._sum);
std::swap(populationSize, other.populationSize);
std::swap(sorted, other.sorted);
std::swap(sampleSize, other.sampleSize);
}
ContinuousSample<T>& addSample(T sample) {
if (!populationSize)
_min = _max = sample;
_sum = _min = _max = sample;
else {
_sum += sample;
}
populationSize++;
sorted = false;
@ -51,6 +64,10 @@ public:
return *this;
}
std::vector<T> getSamples() const { return samples; }
double sum() const { return _sum; }
double mean() const {
if (!samples.size())
return 0;
@ -78,7 +95,7 @@ public:
samples.clear();
populationSize = 0;
sorted = true;
_min = _max = 0; // Doesn't work for all T
_min = _max = _sum = 0; // Doesn't work for all T
}
uint64_t getPopulationSize() const { return populationSize; }
@ -88,7 +105,7 @@ private:
uint64_t populationSize;
bool sorted;
std::vector<T> samples;
T _min, _max;
T _min, _max, _sum;
void sort() {
if (!sorted && samples.size() > 1)

View File

@ -170,6 +170,7 @@ public:
T min() const { return minValue; }
T max() const { return maxValue; }
T getSum() const { return sum; }
void clear() {
std::fill(buckets.begin(), buckets.end(), 0);
@ -185,6 +186,8 @@ public:
size_t getBucketSize() const { return buckets.size(); }
std::vector<uint32_t> getSamples() const { return buckets; }
DDSketchBase<Impl, T>& mergeWith(const DDSketchBase<Impl, T>& anotherSketch) {
// Must have the same guarantee
ASSERT(fabs(errorGuarantee - anotherSketch.errorGuarantee) < EPS &&
@ -205,7 +208,7 @@ protected:
double errorGuarantee; // As defined in the paper
uint64_t populationSize, zeroPopulationSize; // we need to separately count 0s
std::vector<uint64_t> buckets;
std::vector<uint32_t> buckets;
T minValue, maxValue, sum;
void setBucketSize(size_t capacity) { buckets.resize(capacity, 0); }
};
@ -214,7 +217,7 @@ protected:
template <class T>
class DDSketch : public DDSketchBase<DDSketch<T>, T> {
public:
explicit DDSketch(double errorGuarantee = 0.01)
explicit DDSketch(double errorGuarantee = 0.005)
: DDSketchBase<DDSketch<T>, T>(errorGuarantee), gamma((1.0 + errorGuarantee) / (1.0 - errorGuarantee)),
multiplier(fastLogger::correctingFactor * log(2) / log(gamma)) {
ASSERT(errorGuarantee > 0);
@ -228,7 +231,10 @@ public:
return ceil(fastLogger::fastlog(sample) * multiplier) + offset;
}
T getValue(size_t index) { return fastLogger::reverseLog((index - offset) / multiplier) * 2.0 / (1 + gamma); }
T getValue(size_t index) {
return fastLogger::reverseLog((static_cast<double>(index) - static_cast<double>(offset)) / multiplier) * 2.0 /
(1 + gamma);
}
private:
double gamma, multiplier;
@ -248,7 +254,9 @@ public:
size_t getIndex(T sample) { return ceil(log(sample) / logGamma) + offset; }
T getValue(size_t index) { return (T)(2.0 * pow(gamma, (index - offset)) / (1 + gamma)); }
T getValue(size_t index) {
return (T)(2.0 * pow(gamma, (static_cast<double>(index) - static_cast<double>(offset))) / (1 + gamma));
}
private:
double gamma, logGamma;
@ -292,35 +300,3 @@ private:
};
#endif
TEST_CASE("/fdbrpc/ddsketch/accuracy") {
int TRY = 100, SIZE = 1e6;
const int totalPercentiles = 7;
double targetPercentiles[totalPercentiles] = { .0001, .01, .1, .50, .90, .99, .9999 };
double stat[totalPercentiles] = { 0 };
for (int t = 0; t < TRY; t++) {
DDSketch<double> dd;
std::vector<double> nums;
for (int i = 0; i < SIZE; i++) {
static double a = 1, b = 1; // a skewed distribution
auto y = deterministicRandom()->random01();
auto num = b / pow(1 - y, 1 / a);
nums.push_back(num);
dd.addSample(num);
}
std::sort(nums.begin(), nums.end());
for (int percentID = 0; percentID < totalPercentiles; percentID++) {
double percentile = targetPercentiles[percentID];
double ground = nums[percentile * (SIZE - 1)], ddvalue = dd.percentile(percentile);
double relativeError = fabs(ground - ddvalue) / ground;
stat[percentID] += relativeError;
}
}
for (int percentID = 0; percentID < totalPercentiles; percentID++) {
printf("%.4lf per, relative error %.4lf\n", targetPercentiles[percentID], stat[percentID] / TRY);
}
return Void();
}

View File

@ -20,6 +20,12 @@
#ifndef FDBRPC_STATS_H
#define FDBRPC_STATS_H
#include "flow/Error.h"
#include "flow/IRandom.h"
#include "flow/Knobs.h"
#include "flow/OTELMetrics.h"
#include "flow/serialize.h"
#include <string>
#include <type_traits>
#pragma once
@ -40,8 +46,9 @@ MyCounters() : foo("foo", cc), bar("bar", cc), baz("baz", cc) {}
#include "flow/TDMetric.actor.h"
#include "fdbrpc/DDSketch.h"
struct ICounter {
struct ICounter : public IMetric {
// All counters have a name and value
ICounter() : IMetric(knobToMetricModel(FLOW_KNOBS->METRICS_DATA_MODEL)) {}
virtual std::string const& getName() const = 0;
virtual int64_t getValue() const = 0;
@ -74,8 +81,11 @@ class CounterCollection {
std::string id;
std::vector<struct ICounter*> counters, countersToRemove;
double logTime;
public:
CounterCollection(std::string const& name, std::string const& id = std::string()) : name(name), id(id) {}
CounterCollection(std::string const& name, std::string const& id = std::string())
: name(name), id(id), logTime(0) {}
~CounterCollection() {
for (auto c : countersToRemove)
c->remove();
@ -90,7 +100,7 @@ public:
std::string const& getId() const { return id; }
void logToTraceEvent(TraceEvent& te) const;
void logToTraceEvent(TraceEvent& te);
Future<Void> traceCounters(
std::string const& traceEventName,
@ -100,7 +110,7 @@ public:
std::function<void(TraceEvent&)> const& decorator = [](auto& te) {});
};
struct Counter final : ICounter, NonCopyable {
struct Counter final : public ICounter, NonCopyable {
public:
typedef int64_t Value;
@ -214,48 +224,33 @@ public:
~LatencyBands();
};
class LatencySample {
class LatencySample : public IMetric {
public:
LatencySample(std::string name, UID id, double loggingInterval, double accuracy)
: name(name), id(id), sampleStart(now()), sketch(accuracy),
latencySampleEventHolder(makeReference<EventCacheHolder>(id.toString() + "/" + name)) {
assert(accuracy > 0);
if (accuracy <= 0) {
fmt::print(stderr, "ERROR: LatencySample {} has invalid accuracy ({})", name, accuracy);
}
logger = recurring([this]() { logSample(); }, loggingInterval);
}
void addMeasurement(double measurement) { sketch.addSample(measurement); }
LatencySample(std::string name, UID id, double loggingInterval, double accuracy);
void addMeasurement(double measurement);
private:
std::string name;
UID id;
double sampleStart;
// These UIDs below are needed to emit the tail latencies as gauges
//
// If an OTEL aggregator is able to directly accept and process histograms
// the tail latency gauges won't necessarily be needed anymore since they can be
// calculated directly from the emitted buckets. To support users who have an aggregator
// who cannot accept histograms, the tails latencies are still directly emitted.
UID p50id;
UID p90id;
UID p95id;
UID p99id;
UID p999id;
double sampleEmit;
DDSketch<double> sketch;
Future<Void> logger;
Reference<EventCacheHolder> latencySampleEventHolder;
void logSample() {
TraceEvent(name.c_str(), id)
.detail("Count", sketch.getPopulationSize())
.detail("Elapsed", now() - sampleStart)
.detail("Min", sketch.min())
.detail("Max", sketch.max())
.detail("Mean", sketch.mean())
.detail("Median", sketch.median())
.detail("P25", sketch.percentile(0.25))
.detail("P90", sketch.percentile(0.9))
.detail("P95", sketch.percentile(0.95))
.detail("P99", sketch.percentile(0.99))
.detail("P99.9", sketch.percentile(0.999))
.trackLatest(latencySampleEventHolder->trackingKey);
sketch.clear();
sampleStart = now();
}
void logSample();
};
#endif

View File

@ -194,7 +194,9 @@ ACTOR template <class T>
Future<T> timeoutWarning(Future<T> what, double time, PromiseStream<Void> output) {
state Future<Void> end = delay(time);
loop choose {
when(T t = wait(what)) { return t; }
when(T t = wait(what)) {
return t;
}
when(wait(end)) {
output.send(Void());
end = delay(time);
@ -332,7 +334,9 @@ void endStreamOnDisconnect(Future<Void> signal,
stream.setRequestStreamEndpoint(endpoint);
try {
choose {
when(wait(signal)) { stream.sendError(connection_failed()); }
when(wait(signal)) {
stream.sendError(connection_failed());
}
when(wait(peer.isValid() ? peer->disconnect.getFuture() : Never())) {
stream.sendError(connection_failed());
}
@ -361,7 +365,9 @@ Future<ErrorOr<X>> waitValueOrSignal(Future<X> value,
loop {
try {
choose {
when(X x = wait(value)) { return x; }
when(X x = wait(value)) {
return x;
}
when(wait(signal)) {
return ErrorOr<X>(IFailureMonitor::failureMonitor().knownUnauthorized(endpoint)
? unauthorized_attempt()

View File

@ -99,6 +99,7 @@ public:
LocalityData locality;
ProcessClass startingClass;
TDMetricCollection tdmetrics;
MetricCollection metrics;
ChaosMetrics chaosMetrics;
HistogramRegistry histograms;
std::map<NetworkAddress, Reference<IListener>> listenerMap;

View File

@ -39,7 +39,7 @@ typedef long long int64_t;
#if defined(WIN32) || defined(__WINS__) || defined(__MINGW32__) || defined(_MSC_VER)
#define inline __inline
//#define snprintf _snprintf
// #define snprintf _snprintf
#define usleep(x) Sleep(((x) + 999) / 1000)
#define HAS_FIBERS 1
@ -145,7 +145,7 @@ as errors in my dev settings */
extern "C" {
#endif
//#define IO_CHECK_ALLOC ENABLED(NOT_IN_CLEAN)
// #define IO_CHECK_ALLOC ENABLED(NOT_IN_CLEAN)
#ifdef IO_CHECK_ALLOC
BASEKIT_API size_t io_memsize(void* ptr);

View File

@ -13,8 +13,8 @@
#define CORO_STACK_SIZE 8192
#define CORO_STACK_SIZE_MIN 1024
#else
//#define CORO_DEFAULT_STACK_SIZE (65536/2)
//#define CORO_DEFAULT_STACK_SIZE (65536*4)
// #define CORO_DEFAULT_STACK_SIZE (65536/2)
// #define CORO_DEFAULT_STACK_SIZE (65536*4)
// 128k needed on PPC due to parser
#define CORO_DEFAULT_STACK_SIZE (128 * 1024)
@ -45,7 +45,7 @@
#if defined(WIN32) && defined(HAS_FIBERS)
#define USE_FIBERS
#elif defined(HAS_UCONTEXT)
//#elif defined(HAS_UCONTEXT) && !defined(__x86_64__)
// #elif defined(HAS_UCONTEXT) && !defined(__x86_64__)
#if !defined(USE_UCONTEXT)
#define USE_UCONTEXT
#endif

View File

@ -14,7 +14,7 @@
#endif
#endif
//#define USE_UCONTEXT 1
// #define USE_UCONTEXT 1
#if defined(__OpenBSD__)
#undef USE_UCONTEXT
@ -45,7 +45,7 @@
#endif
#include <sys/utsname.h>
#include <inttypes.h>
//#include "task.h"
// #include "task.h"
#define nil ((void*)0)
#define nelem(x) (sizeof(x) / sizeof((x)[0]))

View File

@ -1357,6 +1357,7 @@ public:
}
m->setGlobal(enNetworkConnections, (flowGlobalType)m->network);
m->setGlobal(enASIOTimedOut, (flowGlobalType) false);
m->setGlobal(INetwork::enMetrics, (flowGlobalType)&m->metrics);
TraceEvent("NewMachine")
.detail("Name", name)

View File

@ -962,7 +962,9 @@ ACTOR Future<Void> pullAsyncData(BackupData* self) {
}
loop choose {
when(wait(r ? r->getMore(TaskPriority::TLogCommit) : Never())) { break; }
when(wait(r ? r->getMore(TaskPriority::TLogCommit) : Never())) {
break;
}
when(wait(logSystemChange)) {
if (self->logSystem.get()) {
r = self->logSystem.get()->peekLogRouter(self->myId, tagAt, self->tag);
@ -1034,7 +1036,9 @@ ACTOR Future<Void> monitorBackupKeyOrPullData(BackupData* self, bool keyPresent)
state Future<Version> committedVersion = self->getMinKnownCommittedVersion();
loop choose {
when(wait(success(present))) { break; }
when(wait(success(present))) {
break;
}
when(wait(success(committedVersion) || delay(SERVER_KNOBS->BACKUP_NOOP_POP_DELAY, self->cx->taskID))) {
if (committedVersion.isReady()) {
self->popVersion =

View File

@ -4035,7 +4035,9 @@ ACTOR Future<Void> blobWorkerRecruiter(
// wait until existing blob workers have been acknowledged so we don't break recruitment invariants
loop choose {
when(wait(self->startRecruiting.onTrigger())) { break; }
when(wait(self->startRecruiting.onTrigger())) {
break;
}
}
loop {
@ -4072,7 +4074,9 @@ ACTOR Future<Void> blobWorkerRecruiter(
}
// when the CC changes, so does the request stream so we need to restart recruiting here
when(wait(recruitBlobWorker->onChange())) { fCandidateWorker = Future<RecruitBlobWorkerReply>(); }
when(wait(recruitBlobWorker->onChange())) {
fCandidateWorker = Future<RecruitBlobWorkerReply>();
}
// signal used to restart the loop and try to recruit the next blob worker
when(wait(self->restartRecruiting.onTrigger())) {}

View File

@ -1452,9 +1452,13 @@ ACTOR Future<BlobFileIndex> checkSplitAndReSnapshot(Reference<BlobWorkerData> bw
// wait for manager stream to become ready, and send a message
loop {
choose {
when(wait(bwData->currentManagerStatusStream.get().onReady())) { break; }
when(wait(bwData->currentManagerStatusStream.get().onReady())) {
break;
}
when(wait(bwData->currentManagerStatusStream.onChange())) {}
when(wait(metadata->resumeSnapshot.getFuture())) { break; }
when(wait(metadata->resumeSnapshot.getFuture())) {
break;
}
}
}
if (metadata->resumeSnapshot.isSet()) {
@ -1493,7 +1497,9 @@ ACTOR Future<BlobFileIndex> checkSplitAndReSnapshot(Reference<BlobWorkerData> bw
// manager change/no response
choose {
when(wait(bwData->currentManagerStatusStream.onChange())) {}
when(wait(metadata->resumeSnapshot.getFuture())) { break; }
when(wait(metadata->resumeSnapshot.getFuture())) {
break;
}
when(wait(delay(1.0))) {}
}
@ -1580,7 +1586,9 @@ ACTOR Future<Void> reevaluateInitialSplit(Reference<BlobWorkerData> bwData,
// wait for manager stream to become ready, and send a message
loop {
choose {
when(wait(bwData->currentManagerStatusStream.get().onReady())) { break; }
when(wait(bwData->currentManagerStatusStream.get().onReady())) {
break;
}
when(wait(bwData->currentManagerStatusStream.onChange())) {}
}
}
@ -1664,8 +1672,12 @@ ACTOR Future<Void> granuleCheckMergeCandidate(Reference<BlobWorkerData> bwData,
// wait for manager stream to become ready, and send a message
loop {
choose {
when(wait(delay(std::max(0.0, sendTimeGiveUp - now())))) { break; }
when(wait(bwData->currentManagerStatusStream.get().onReady())) { break; }
when(wait(delay(std::max(0.0, sendTimeGiveUp - now())))) {
break;
}
when(wait(bwData->currentManagerStatusStream.get().onReady())) {
break;
}
when(wait(bwData->currentManagerStatusStream.onChange())) {}
}
}
@ -1948,7 +1960,9 @@ ACTOR Future<Void> waitOnCFVersion(Reference<GranuleMetadata> metadata, Version
? metadata->activeCFData.get()->whenAtLeast(waitVersion)
: Never();
choose {
when(wait(atLeast)) { break; }
when(wait(atLeast)) {
break;
}
when(wait(metadata->activeCFData.onChange())) {}
}
} catch (Error& e) {
@ -3610,7 +3624,9 @@ ACTOR Future<Void> doBlobGranuleFileRequest(Reference<BlobWorkerData> bwData, Bl
if (!bwData->isFullRestoreMode) {
choose {
when(wait(metadata->readable.getFuture())) {}
when(wait(metadata->cancelled.getFuture())) { throw wrong_shard_server(); }
when(wait(metadata->cancelled.getFuture())) {
throw wrong_shard_server();
}
}
}
@ -3627,7 +3643,9 @@ ACTOR Future<Void> doBlobGranuleFileRequest(Reference<BlobWorkerData> bwData, Bl
if (metadata->historyLoaded.canBeSet()) {
choose {
when(wait(metadata->historyLoaded.getFuture())) {}
when(wait(metadata->cancelled.getFuture())) { throw wrong_shard_server(); }
when(wait(metadata->cancelled.getFuture())) {
throw wrong_shard_server();
}
}
}
@ -3639,7 +3657,9 @@ ACTOR Future<Void> doBlobGranuleFileRequest(Reference<BlobWorkerData> bwData, Bl
when(GranuleFiles f = wait(finalChunks[chunkIdx].second)) {
rangeGranulePair.push_back(std::pair(finalChunks[chunkIdx].first, f));
}
when(wait(metadata->cancelled.getFuture())) { throw wrong_shard_server(); }
when(wait(metadata->cancelled.getFuture())) {
throw wrong_shard_server();
}
}
if (rangeGranulePair.back().second.snapshotFiles.empty()) {
@ -3680,9 +3700,13 @@ ACTOR Future<Void> doBlobGranuleFileRequest(Reference<BlobWorkerData> bwData, Bl
// version on rollback
try {
choose {
when(wait(waitForVersionFuture)) { break; }
when(wait(waitForVersionFuture)) {
break;
}
when(wait(metadata->activeCFData.onChange())) {}
when(wait(metadata->cancelled.getFuture())) { throw wrong_shard_server(); }
when(wait(metadata->cancelled.getFuture())) {
throw wrong_shard_server();
}
}
} catch (Error& e) {
// We can get change feed cancelled from whenAtLeast. This means the change feed may

View File

@ -371,7 +371,9 @@ ACTOR Future<Void> clusterWatchDatabase(ClusterControllerData* cluster,
req.reply.send(Void());
TraceEvent(SevDebug, "BackupWorkerDoneRequest", cluster->id).log();
}
when(wait(collection)) { throw internal_error(); }
when(wait(collection)) {
throw internal_error();
}
}
// failed master (better master exists) could happen while change-coordinators request processing is
// in-progress
@ -432,7 +434,9 @@ ACTOR Future<Void> clusterGetServerInfo(ClusterControllerData::DBInfo* db,
while (db->serverInfo->get().id == knownServerInfoID) {
choose {
when(wait(yieldedFuture(db->serverInfo->onChange()))) {}
when(wait(delayJittered(300))) { break; } // The server might be long gone!
when(wait(delayJittered(300))) {
break;
} // The server might be long gone!
}
}
reply.send(db->serverInfo->get());
@ -2809,7 +2813,9 @@ ACTOR Future<Void> monitorBlobManager(ClusterControllerData* self) {
self->db.clearInterf(ProcessClass::BlobManagerClass);
break;
}
when(wait(self->recruitBlobManager.onChange())) { break; }
when(wait(self->recruitBlobManager.onChange())) {
break;
}
when(wait(self->db.blobGranulesEnabled.onChange())) {
// if there is a blob manager present but blob granules are now disabled, stop the BM
if (!self->db.blobGranulesEnabled.get()) {
@ -2840,7 +2846,9 @@ ACTOR Future<Void> dbInfoUpdater(ClusterControllerData* self) {
state Future<Void> updateDBInfo = self->updateDBInfo.onTrigger();
loop {
choose {
when(wait(updateDBInfo)) { wait(delay(SERVER_KNOBS->DBINFO_BATCH_DELAY) || dbInfoChange); }
when(wait(updateDBInfo)) {
wait(delay(SERVER_KNOBS->DBINFO_BATCH_DELAY) || dbInfoChange);
}
when(wait(dbInfoChange)) {}
}
@ -3221,7 +3229,9 @@ ACTOR Future<Void> clusterControllerCore(ClusterControllerFullInterface interf,
CODE_PROBE(true, "Leader replaced");
return Void();
}
when(ReplyPromise<Void> ping = waitNext(interf.clientInterface.ping.getFuture())) { ping.send(Void()); }
when(ReplyPromise<Void> ping = waitNext(interf.clientInterface.ping.getFuture())) {
ping.send(Void());
}
}
}
@ -3260,7 +3270,9 @@ ACTOR Future<Void> clusterController(ServerCoordinators coordinators,
ASSERT(false);
throw internal_error();
}
when(wait(shouldReplace)) { break; }
when(wait(shouldReplace)) {
break;
}
}
}
if (!shouldReplace.isReady()) {

View File

@ -67,7 +67,9 @@ ACTOR Future<Void> recoveryTerminateOnConflict(UID dbgid,
}
return Void();
}
when(wait(switchedState)) { return Void(); }
when(wait(switchedState)) {
return Void();
}
}
}
@ -921,8 +923,12 @@ ACTOR Future<Standalone<CommitTransactionRef>> provisionalMaster(Reference<Clust
waitNext(parent->provisionalCommitProxies[0].getKeyServersLocations.getFuture())) {
req.reply.send(Never());
}
when(wait(waitCommitProxyFailure)) { throw worker_removed(); }
when(wait(waitGrvProxyFailure)) { throw worker_removed(); }
when(wait(waitCommitProxyFailure)) {
throw worker_removed();
}
when(wait(waitGrvProxyFailure)) {
throw worker_removed();
}
}
}
@ -1127,10 +1133,14 @@ ACTOR Future<Void> readTransactionSystemState(Reference<ClusterRecoveryData> sel
if (self->recoveryTransactionVersion < minRequiredCommitVersion)
self->recoveryTransactionVersion = minRequiredCommitVersion;
}
if (BUGGIFY) {
self->recoveryTransactionVersion += deterministicRandom()->randomInt64(0, 10000000);
// Test randomly increasing the recovery version by a large number.
// When the version epoch is enabled, versions stay in sync with time.
// An offline cluster could see a large version jump when it comes back
// online, so test this behavior in simulation.
if (BUGGIFY) {
self->recoveryTransactionVersion += deterministicRandom()->randomInt64(0, 10000000);
}
}
TraceEvent(getRecoveryEventName(ClusterRecoveryEventType::CLUSTER_RECOVERY_RECOVERING_EVENT_NAME).c_str(),
@ -1577,8 +1587,12 @@ ACTOR Future<Void> clusterRecoveryCore(Reference<ClusterRecoveryData> self) {
break;
}
when(wait(oldLogSystems->onChange())) {}
when(wait(reg)) { throw internal_error(); }
when(wait(recoverAndEndEpoch)) { throw internal_error(); }
when(wait(reg)) {
throw internal_error();
}
when(wait(recoverAndEndEpoch)) {
throw internal_error();
}
}
}

View File

@ -673,7 +673,9 @@ Future<Void> testIgnore(UnitTestParameters params) {
wait(set(env, "class-B"_sr, "test_long"_sr, int64_t{ 1 }));
choose {
when(wait(delay(5))) {}
when(wait(check(env, &TestKnobs::TEST_LONG, Optional<int64_t>{ 1 }))) { ASSERT(false); }
when(wait(check(env, &TestKnobs::TEST_LONG, Optional<int64_t>{ 1 }))) {
ASSERT(false);
}
}
return Void();
}

View File

@ -513,7 +513,9 @@ class ConfigNodeImpl {
when(ConfigTransactionGetKnobsRequest req = waitNext(cti->getKnobs.getFuture())) {
wait(getKnobs(self, req));
}
when(wait(self->kvStore->getError())) { ASSERT(false); }
when(wait(self->kvStore->getError())) {
ASSERT(false);
}
}
}
}
@ -797,7 +799,9 @@ class ConfigNodeImpl {
}
req.reply.send(Void());
}
when(wait(self->kvStore->getError())) { ASSERT(false); }
when(wait(self->kvStore->getError())) {
ASSERT(false);
}
}
}
}

View File

@ -302,7 +302,9 @@ struct MovableCoordinatedStateImpl {
ASSERT(self->lastValue.present() && self->lastCSValue.present());
TraceEvent("StartMove").detail("ConnectionString", nc.toString());
choose {
when(wait(creationTimeout)) { throw new_coordinators_timed_out(); }
when(wait(creationTimeout)) {
throw new_coordinators_timed_out();
}
when(Value ncInitialValue = wait(nccs.read())) {
ASSERT(!ncInitialValue.size()); // The new coordinators must be uninitialized!
}
@ -310,7 +312,9 @@ struct MovableCoordinatedStateImpl {
TraceEvent("FinishedRead").detail("ConnectionString", nc.toString());
choose {
when(wait(creationTimeout)) { throw new_coordinators_timed_out(); }
when(wait(creationTimeout)) {
throw new_coordinators_timed_out();
}
when(wait(nccs.setExclusive(
BinaryWriter::toValue(MovableValue(self->lastValue.get(),
MovableValue::MovingFrom,

View File

@ -47,7 +47,9 @@ class LivenessChecker {
ACTOR static Future<Void> checkStuck(LivenessChecker const* self) {
loop {
choose {
when(wait(delayUntil(self->lastTime.get() + self->threshold))) { return Void(); }
when(wait(delayUntil(self->lastTime.get() + self->threshold))) {
return Void();
}
when(wait(self->lastTime.onChange())) {}
}
}
@ -280,7 +282,9 @@ ACTOR Future<Void> remoteMonitorLeader(int* clientCount,
when(wait(yieldedFuture(currentElectedLeaderOnChange))) {
currentElectedLeaderOnChange = currentElectedLeader->onChange();
}
when(wait(delayJittered(SERVER_KNOBS->CLIENT_REGISTER_INTERVAL))) { break; }
when(wait(delayJittered(SERVER_KNOBS->CLIENT_REGISTER_INTERVAL))) {
break;
}
}
}

View File

@ -1150,7 +1150,7 @@ struct DDQueue : public IDDRelocationQueue {
// canceled inflight relocateData. Launch the relocation for the rd.
void launchQueuedWork(std::set<RelocateData, std::greater<RelocateData>> combined,
const DDEnabledState* ddEnabledState) {
int startedHere = 0;
[[maybe_unused]] int startedHere = 0;
double startTime = now();
// kick off relocators from items in the queue as need be
std::set<RelocateData, std::greater<RelocateData>>::iterator it = combined.begin();
@ -2530,7 +2530,9 @@ ACTOR Future<Void> dataDistributionQueue(Reference<IDDTxnProcessor> db,
debug_setCheckRelocationDuration(false);
}
}
when(KeyRange done = waitNext(rangesComplete.getFuture())) { keysToLaunchFrom = done; }
when(KeyRange done = waitNext(rangesComplete.getFuture())) {
keysToLaunchFrom = done;
}
when(wait(recordMetrics)) {
Promise<int64_t> req;
getAverageShardBytes.send(req);
@ -2633,7 +2635,9 @@ TEST_CASE("/DataDistribution/DDQueue/ServerCounterTrace") {
std::cout << "Start trace counter unit test for " << duration << "s ...\n";
loop choose {
when(wait(counterFuture)) {}
when(wait(finishFuture)) { break; }
when(wait(finishFuture)) {
break;
}
when(wait(delayJittered(2.0))) {
std::vector<UID> team(3);
for (int i = 0; i < team.size(); ++i) {

View File

@ -1449,7 +1449,9 @@ ACTOR Future<Void> fetchShardMetricsList_impl(DataDistributionTracker* self, Get
ACTOR Future<Void> fetchShardMetricsList(DataDistributionTracker* self, GetMetricsListRequest req) {
choose {
when(wait(fetchShardMetricsList_impl(self, req))) {}
when(wait(delay(SERVER_KNOBS->DD_SHARD_METRICS_TIMEOUT))) { req.reply.sendError(timed_out()); }
when(wait(delay(SERVER_KNOBS->DD_SHARD_METRICS_TIMEOUT))) {
req.reply.sendError(timed_out());
}
}
return Void();
}
@ -1492,7 +1494,9 @@ ACTOR Future<Void> dataDistributionTracker(Reference<InitialDataDistribution> in
}
loop choose {
when(Promise<int64_t> req = waitNext(getAverageShardBytes)) { req.send(self.getAverageShardBytes()); }
when(Promise<int64_t> req = waitNext(getAverageShardBytes)) {
req.send(self.getAverageShardBytes());
}
when(wait(loggingTrigger)) {
TraceEvent("DDTrackerStats", self.distributorId)
.detail("Shards", self.shards->size())

View File

@ -135,7 +135,9 @@ public:
loop {
choose {
when(wait(self->buildTeams())) { return Void(); }
when(wait(self->buildTeams())) {
return Void();
}
when(wait(self->restartTeamBuilder.onTrigger())) {}
}
}
@ -525,7 +527,9 @@ public:
while (self->pauseWiggle && !self->pauseWiggle->get() && self->waitUntilRecruited.get()) {
choose {
when(wait(self->waitUntilRecruited.onChange() || self->pauseWiggle->onChange())) {}
when(wait(delay(SERVER_KNOBS->PERPETUAL_WIGGLE_DELAY, g_network->getCurrentTask()))) { break; }
when(wait(delay(SERVER_KNOBS->PERPETUAL_WIGGLE_DELAY, g_network->getCurrentTask()))) {
break;
}
}
}
@ -1361,7 +1365,9 @@ public:
.detail("ConfigStoreType", self->configuration.storageServerStoreType)
.detail("WrongStoreTypeRemoved", server->wrongStoreTypeToRemove.get());
}
when(wait(server->wakeUpTracker.getFuture())) { server->wakeUpTracker = Promise<Void>(); }
when(wait(server->wakeUpTracker.getFuture())) {
server->wakeUpTracker = Promise<Void>();
}
when(wait(storageMetadataTracker)) {}
when(wait(server->ssVersionTooFarBehind.onChange())) {}
when(wait(self->disableFailingLaggingServers.onChange())) {}
@ -2103,7 +2109,9 @@ public:
.detail("ExtraHealthyTeamCount", extraTeamCount)
.detail("HealthyTeamCount", self->healthyTeamCount);
}
when(wait(pauseChanged)) { continue; }
when(wait(pauseChanged)) {
continue;
}
}
}
}
@ -2619,7 +2627,9 @@ public:
}
}
}
when(wait(recruitStorage->onChange())) { fCandidateWorker = Future<RecruitStorageReply>(); }
when(wait(recruitStorage->onChange())) {
fCandidateWorker = Future<RecruitStorageReply>();
}
when(wait(self->zeroHealthyTeams->onChange())) {
if (!pendingTSSCheck && self->zeroHealthyTeams->get() &&
(self->isTssRecruiting || self->tss_info_by_pair.size() > 0)) {
@ -4066,7 +4076,6 @@ void DDTeamCollection::traceAllInfo(bool shouldPrint) const {
void DDTeamCollection::rebuildMachineLocalityMap() {
machineLocalityMap.clear();
int numHealthyMachine = 0;
for (auto& [_, machine] : machine_info) {
if (machine->serversOnMachine.empty()) {
TraceEvent(SevWarn, "RebuildMachineLocalityMapError")
@ -4087,7 +4096,6 @@ void DDTeamCollection::rebuildMachineLocalityMap() {
}
const LocalityEntry& localityEntry = machineLocalityMap.add(locality, &representativeServer->getId());
machine->localityEntry = localityEntry;
++numHealthyMachine;
}
}
@ -5880,43 +5888,43 @@ TEST_CASE("/DataDistribution/GetTeam/DeprioritizeWigglePausedTeam") {
}
TEST_CASE("/DataDistribution/StorageWiggler/NextIdWithMinAge") {
state StorageWiggler wiggler(nullptr);
state Reference<StorageWiggler> wiggler = makeReference<StorageWiggler>(nullptr);
state double startTime = now();
wiggler.addServer(UID(1, 0),
StorageMetadataType(startTime - SERVER_KNOBS->DD_STORAGE_WIGGLE_MIN_SS_AGE_SEC + 5.0,
KeyValueStoreType::SSD_BTREE_V2));
wiggler.addServer(UID(2, 0),
StorageMetadataType(
startTime + SERVER_KNOBS->DD_STORAGE_WIGGLE_MIN_SS_AGE_SEC, KeyValueStoreType::MEMORY, true));
wiggler.addServer(UID(3, 0), StorageMetadataType(startTime - 5.0, KeyValueStoreType::SSD_ROCKSDB_V1, true));
wiggler.addServer(UID(4, 0),
StorageMetadataType(startTime - SERVER_KNOBS->DD_STORAGE_WIGGLE_MIN_SS_AGE_SEC - 1.0,
KeyValueStoreType::SSD_BTREE_V2));
wiggler->addServer(UID(1, 0),
StorageMetadataType(startTime - SERVER_KNOBS->DD_STORAGE_WIGGLE_MIN_SS_AGE_SEC + 5.0,
KeyValueStoreType::SSD_BTREE_V2));
wiggler->addServer(UID(2, 0),
StorageMetadataType(startTime + SERVER_KNOBS->DD_STORAGE_WIGGLE_MIN_SS_AGE_SEC,
KeyValueStoreType::MEMORY,
true));
wiggler->addServer(UID(3, 0), StorageMetadataType(startTime - 5.0, KeyValueStoreType::SSD_ROCKSDB_V1, true));
wiggler->addServer(UID(4, 0),
StorageMetadataType(startTime - SERVER_KNOBS->DD_STORAGE_WIGGLE_MIN_SS_AGE_SEC - 1.0,
KeyValueStoreType::SSD_BTREE_V2));
std::vector<Optional<UID>> correctResult{ UID(3, 0), UID(2, 0), UID(4, 0), Optional<UID>() };
for (int i = 0; i < 4; ++i) {
auto id = wiggler.getNextServerId();
auto id = wiggler->getNextServerId();
ASSERT(id == correctResult[i]);
}
{
std::cout << "Finish Initial Check. Start test getNextWigglingServerID() loop...\n";
// test the getNextWigglingServerID() loop
UID id = wait(DDTeamCollectionImpl::getNextWigglingServerID(Reference<StorageWiggler>::addRef(&wiggler)));
UID id = wait(DDTeamCollectionImpl::getNextWigglingServerID(wiggler));
ASSERT(id == UID(1, 0));
}
std::cout << "Test after addServer() ...\n";
state Future<UID> nextFuture =
DDTeamCollectionImpl::getNextWigglingServerID(Reference<StorageWiggler>::addRef(&wiggler));
state Future<UID> nextFuture = DDTeamCollectionImpl::getNextWigglingServerID(wiggler);
ASSERT(!nextFuture.isReady());
startTime = now();
StorageMetadataType metadata(startTime + SERVER_KNOBS->DD_STORAGE_WIGGLE_MIN_SS_AGE_SEC + 100.0,
KeyValueStoreType::SSD_BTREE_V2);
wiggler.addServer(UID(5, 0), metadata);
wiggler->addServer(UID(5, 0), metadata);
ASSERT(!nextFuture.isReady());
std::cout << "Test after updateServer() ...\n";
StorageWiggler* ptr = &wiggler;
StorageWiggler* ptr = wiggler.getPtr();
wait(trigger(
[ptr]() {
ptr->updateMetadata(UID(5, 0),
@ -5933,22 +5941,22 @@ TEST_CASE("/DataDistribution/StorageWiggler/NextIdWithMinAge") {
TEST_CASE("/DataDistribution/StorageWiggler/NextIdWithTSS") {
state std::unique_ptr<DDTeamCollection> collection =
DDTeamCollectionUnitTest::testMachineTeamCollection(1, Reference<IReplicationPolicy>(new PolicyOne()), 5);
state StorageWiggler wiggler(collection.get());
state Reference<StorageWiggler> wiggler = makeReference<StorageWiggler>(collection.get());
std::cout << "Test when need TSS ... \n";
collection->configuration.usableRegions = 1;
collection->configuration.desiredTSSCount = 1;
state double startTime = now();
wiggler.addServer(UID(1, 0),
StorageMetadataType(startTime + SERVER_KNOBS->DD_STORAGE_WIGGLE_MIN_SS_AGE_SEC + 150.0,
KeyValueStoreType::SSD_BTREE_V2));
wiggler.addServer(UID(2, 0),
StorageMetadataType(startTime + SERVER_KNOBS->DD_STORAGE_WIGGLE_MIN_SS_AGE_SEC + 150.0,
KeyValueStoreType::SSD_BTREE_V2));
ASSERT(!wiggler.getNextServerId(true).present());
ASSERT(wiggler.getNextServerId(collection->reachTSSPairTarget()) == UID(1, 0));
UID id = wait(DDTeamCollectionImpl::getNextWigglingServerID(
Reference<StorageWiggler>::addRef(&wiggler), Optional<Value>(), Optional<Value>(), collection.get()));
wiggler->addServer(UID(1, 0),
StorageMetadataType(startTime + SERVER_KNOBS->DD_STORAGE_WIGGLE_MIN_SS_AGE_SEC + 150.0,
KeyValueStoreType::SSD_BTREE_V2));
wiggler->addServer(UID(2, 0),
StorageMetadataType(startTime + SERVER_KNOBS->DD_STORAGE_WIGGLE_MIN_SS_AGE_SEC + 150.0,
KeyValueStoreType::SSD_BTREE_V2));
ASSERT(!wiggler->getNextServerId(true).present());
ASSERT(wiggler->getNextServerId(collection->reachTSSPairTarget()) == UID(1, 0));
UID id = wait(
DDTeamCollectionImpl::getNextWigglingServerID(wiggler, Optional<Value>(), Optional<Value>(), collection.get()));
ASSERT(now() - startTime < SERVER_KNOBS->DD_STORAGE_WIGGLE_MIN_SS_AGE_SEC + 150.0);
ASSERT(id == UID(2, 0));
return Void();

View File

@ -351,7 +351,9 @@ ACTOR Future<Void> globalConfigRequestServer(GrvProxyData* grvProxyData, GrvProx
Void()) &&
delay(SERVER_KNOBS->GLOBAL_CONFIG_REFRESH_INTERVAL);
}
when(wait(actors.getResult())) { ASSERT(false); }
when(wait(actors.getResult())) {
ASSERT(false);
}
}
}
}

View File

@ -721,7 +721,7 @@ struct IntKeyCursor {
db.checkError("BtreeCloseCursor", sqlite3BtreeCloseCursor(cursor));
} catch (...) {
}
delete[](char*) cursor;
delete[] (char*)cursor;
}
}
};
@ -759,7 +759,7 @@ struct RawCursor {
} catch (...) {
TraceEvent(SevError, "RawCursorDestructionError").log();
}
delete[](char*) cursor;
delete[] (char*)cursor;
}
}
void moveFirst() {
@ -1912,14 +1912,11 @@ private:
readThreads[i].clear();
}
void checkFreePages() {
int iterations = 0;
int64_t freeListSize = freeListPages;
while (!freeTableEmpty && freeListSize < SERVER_KNOBS->CHECK_FREE_PAGE_AMOUNT) {
int deletedPages = cursor->lazyDelete(SERVER_KNOBS->CHECK_FREE_PAGE_AMOUNT);
freeTableEmpty = (deletedPages != SERVER_KNOBS->CHECK_FREE_PAGE_AMOUNT);
springCleaningStats.lazyDeletePages += deletedPages;
++iterations;
freeListSize = conn.freePages();
}

View File

@ -639,7 +639,6 @@ int readRangeInDb(PhysicalShard* shard, const KeyRangeRef range, int rowLimit, i
return 0;
}
int accumulatedRows = 0;
int accumulatedBytes = 0;
rocksdb::Status s;
@ -651,7 +650,6 @@ int readRangeInDb(PhysicalShard* shard, const KeyRangeRef range, int rowLimit, i
cursor->Seek(toSlice(range.begin));
while (cursor->Valid() && toStringRef(cursor->key()) < range.end) {
KeyValueRef kv(toStringRef(cursor->key()), toStringRef(cursor->value()));
++accumulatedRows;
accumulatedBytes += sizeof(KeyValueRef) + kv.expectedSize();
result->push_back_deep(result->arena(), kv);
// Calling `cursor->Next()` is potentially expensive, so short-circut here just in case.
@ -671,7 +669,6 @@ int readRangeInDb(PhysicalShard* shard, const KeyRangeRef range, int rowLimit, i
}
while (cursor->Valid() && toStringRef(cursor->key()) >= range.begin) {
KeyValueRef kv(toStringRef(cursor->key()), toStringRef(cursor->value()));
++accumulatedRows;
accumulatedBytes += sizeof(KeyValueRef) + kv.expectedSize();
result->push_back_deep(result->arena(), kv);
// Calling `cursor->Prev()` is potentially expensive, so short-circut here just in case.

View File

@ -210,8 +210,12 @@ ACTOR Future<Void> tryBecomeLeaderInternal(ServerCoordinators coordinators,
TraceEvent("LeaderBadCandidateTimeout", myInfo.changeID).log();
break;
}
when(wait(candidacies)) { ASSERT(false); }
when(wait(asyncPriorityInfo->onChange())) { break; }
when(wait(candidacies)) {
ASSERT(false);
}
when(wait(asyncPriorityInfo->onChange())) {
break;
}
}
}

View File

@ -327,8 +327,12 @@ class LocalConfigurationImpl {
ACTOR static Future<Void> consume(LocalConfigurationImpl* self, ConfigBroadcastInterface broadcaster) {
loop {
choose {
when(wait(consumeInternal(self, broadcaster))) { ASSERT(false); }
when(wait(self->kvStore->getError())) { ASSERT(false); }
when(wait(consumeInternal(self, broadcaster))) {
ASSERT(false);
}
when(wait(self->kvStore->getError())) {
ASSERT(false);
}
}
}
}

View File

@ -60,11 +60,8 @@ struct LogRouterData {
TaskPriority taskID) {
while (!self->version_messages.empty() && self->version_messages.front().first < before) {
Version version = self->version_messages.front().first;
int64_t messagesErased = 0;
while (!self->version_messages.empty() && self->version_messages.front().first == version) {
++messagesErased;
self->version_messages.pop_front();
}
@ -787,7 +784,9 @@ ACTOR Future<Void> logRouter(TLogInterface interf,
.detail("Locality", req.locality);
state Future<Void> core = logRouterCore(interf, req, db);
loop choose {
when(wait(core)) { return Void(); }
when(wait(core)) {
return Void();
}
when(wait(checkRemoved(db, req.recoveryCount, interf))) {}
}
} catch (Error& e) {

View File

@ -43,7 +43,9 @@ public:
if (!self->cursor->hasMessage()) {
loop {
choose {
when(wait(self->cursor->getMore())) { break; }
when(wait(self->cursor->getMore())) {
break;
}
when(wait(self->localityChanged)) {
self->cursor = self->logSystem->peekTxs(
UID(),

View File

@ -452,7 +452,9 @@ ACTOR Future<Void> serverPeekGetMore(ILogSystem::ServerPeekCursor* self, TaskPri
.detail("Popped", res.popped.present() ? res.popped.get() : 0);
return Void();
}
when(wait(self->interf->onChange())) { self->onlySpilled = false; }
when(wait(self->interf->onChange())) {
self->onlySpilled = false;
}
}
}
} catch (Error& e) {

View File

@ -0,0 +1,147 @@
/*
* MetricClient.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbserver/MetricClient.h"
#include "fdbrpc/Stats.h"
#include "flow/FastRef.h"
#include "flow/IRandom.h"
#include "flow/Knobs.h"
#include "flow/OTELMetrics.h"
#include "flow/TDMetric.actor.h"
#include "flow/Msgpack.h"
#include "flow/Trace.h"
#include "flow/flow.h"
#include <cstddef>
#ifndef WIN32
#include <sys/socket.h>
#endif
#include "flow/actorcompiler.h"
#include "flow/network.h"
UDPMetricClient::UDPMetricClient()
: socket_fd(-1), model(knobToMetricModel(FLOW_KNOBS->METRICS_DATA_MODEL)),
buf{ MsgpackBuffer{ .buffer = std::make_unique<uint8_t[]>(1024), .data_size = 0, .buffer_size = 1024 } },
address((model == STATSD) ? FLOW_KNOBS->STATSD_UDP_EMISSION_ADDR : FLOW_KNOBS->OTEL_UDP_EMISSION_ADDR),
port((model == STATSD) ? FLOW_KNOBS->STATSD_UDP_EMISSION_PORT : FLOW_KNOBS->OTEL_UDP_EMISSION_PORT) {
NetworkAddress destAddress = NetworkAddress::parse(address + ":" + std::to_string(port));
socket = INetworkConnections::net()->createUDPSocket(destAddress);
model = knobToMetricModel(FLOW_KNOBS->METRICS_DATA_MODEL);
}
// Since MSG_DONTWAIT isn't defined for Windows, we need to add a
// ifndef guard here to avoid any compilation issues
void UDPMetricClient::send_packet(int fd, const void* data, size_t len) {
#ifndef WIN32
::send(fd, data, len, MSG_DONTWAIT);
#endif
}
void UDPMetricClient::send(MetricCollection* metrics) {
if (!socket.isReady()) {
return;
}
socket_fd = socket.get()->native_handle();
if (socket_fd == -1)
return;
if (model == OTLP) {
std::vector<std::vector<OTEL::OTELSum>> sums;
std::vector<OTEL::OTELGauge> gauges;
// Define custom serialize functions
auto f_sums = [](const std::vector<OTEL::OTELSum>& vec, MsgpackBuffer& buf) {
typedef void (*func_ptr)(const OTEL::OTELSum&, MsgpackBuffer&);
func_ptr f = OTEL::serialize;
serialize_vector(vec, buf, f);
};
auto f_hists = [](const std::vector<OTEL::OTELHistogram>& vec, MsgpackBuffer& buf) {
typedef void (*func_ptr)(const OTEL::OTELHistogram&, MsgpackBuffer&);
func_ptr f = OTEL::serialize;
serialize_vector(vec, buf, f);
};
auto f_gauge = [](const std::vector<OTEL::OTELGauge>& vec, MsgpackBuffer& buf) {
typedef void (*func_ptr)(const OTEL::OTELGauge&, MsgpackBuffer&);
func_ptr f = OTEL::serialize;
serialize_vector(vec, buf, f);
};
std::vector<OTEL::OTELSum> currentSums;
size_t current_msgpack = 0;
for (const auto& [_, s] : metrics->sumMap) {
if (current_msgpack < MAX_OTELSUM_PACKET_SIZE) {
currentSums.push_back(std::move(s));
current_msgpack += s.getMsgpackBytes();
} else {
sums.push_back(std::move(currentSums));
currentSums.clear();
current_msgpack = 0;
}
}
if (!sums.empty()) {
for (const auto& currSums : sums) {
serialize_ext(currSums, buf, OTEL::OTELMetricType::Sum, f_sums);
send_packet(socket_fd, buf.buffer.get(), buf.data_size);
int error = errno;
TraceEvent("MetricsSumUdpErrno", UID()).detail("Errno", error);
buf.reset();
}
metrics->sumMap.clear();
}
// Each histogram should be in a seperate because of their large sizes
// Expected DDSketch size is ~4200 entries * 9 bytes = 37800
for (const auto& [_, h] : metrics->histMap) {
const std::vector<OTEL::OTELHistogram> singleHist{ std::move(h) };
serialize_ext(singleHist, buf, OTEL::OTELMetricType::Hist, f_hists);
send_packet(socket_fd, buf.buffer.get(), buf.data_size);
int error = errno;
TraceEvent("MetricsHistUdpErrno", UID()).detail("Errno", error);
buf.reset();
}
metrics->histMap.clear();
for (const auto& [_, g] : metrics->gaugeMap) {
gauges.push_back(std::move(g));
}
if (!gauges.empty()) {
serialize_ext(gauges, buf, OTEL::OTELMetricType::Gauge, f_gauge);
send_packet(socket_fd, buf.buffer.get(), buf.data_size);
int error = errno;
TraceEvent("MetricsGaugeUdpErrno", UID()).detail("Errno", error);
metrics->gaugeMap.clear();
buf.reset();
}
} else if (model == MetricsDataModel::STATSD) {
std::string messages;
for (const auto& msg : metrics->statsd_message) {
// Account for max udp packet size (+1 since we add '\n')
if (messages.size() + msg.size() + 1 < IUDPSocket::MAX_PACKET_SIZE) {
messages += (std::move(msg) + '\n');
} else {
send_packet(socket_fd, buf.buffer.get(), buf.data_size);
}
}
if (!messages.empty()) {
send_packet(socket_fd, messages.data(), messages.size());
}
metrics->statsd_message.clear();
}
}

View File

@ -19,13 +19,27 @@
*/
#include <cmath>
#include <cstddef>
#include <memory>
#include "msgpack.hpp"
#include <msgpack/v3/unpack_decl.hpp>
#include <string>
#include "fdbrpc/Stats.h"
#include "flow/Msgpack.h"
#include "flow/ApiVersion.h"
#include "flow/IRandom.h"
#include "flow/Knobs.h"
#include "flow/OTELMetrics.h"
#include "flow/SystemMonitor.h"
#include "flow/UnitTest.h"
#include "flow/TDMetric.actor.h"
#include "fdbclient/DatabaseContext.h"
#include "fdbclient/ReadYourWrites.h"
#include "fdbclient/KeyBackedTypes.h"
#include "fdbserver/MetricLogger.actor.h"
#include "fdbserver/MetricClient.h"
#include "flow/flow.h"
#include "flow/network.h"
#include "flow/actorcompiler.h" // This must be the last #include.
struct MetricsRule {
@ -189,7 +203,7 @@ public:
};
ACTOR Future<Void> dumpMetrics(Database cx, MetricsConfig* config, TDMetricCollection* collection) {
state MetricUpdateBatch batch;
state MetricBatch batch;
state Standalone<MetricKeyRef> mk;
ASSERT(collection != nullptr);
mk.prefix = StringRef(mk.arena(), config->space.key());
@ -225,8 +239,8 @@ ACTOR Future<Void> dumpMetrics(Database cx, MetricsConfig* config, TDMetricColle
state std::map<int, Future<Void>> results;
// Call all of the callbacks, map each index to its resulting future
for (int i = 0, iend = batch.callbacks.size(); i < iend; ++i)
results[i] = batch.callbacks[i](&mdb, &batch);
for (int i = 0, iend = batch.scope.callbacks.size(); i < iend; ++i)
results[i] = batch.scope.callbacks[i](&mdb, &batch.scope);
loop {
state std::map<int, Future<Void>>::iterator cb = results.begin();
@ -249,7 +263,7 @@ ACTOR Future<Void> dumpMetrics(Database cx, MetricsConfig* config, TDMetricColle
// Otherwise, wait to retry
wait(cbtr.onError(lastError));
for (auto& cb : results)
cb.second = batch.callbacks[cb.first](&mdb, &batch);
cb.second = batch.scope.callbacks[cb.first](&mdb, &batch.scope);
}
// If there are more rolltimes then next dump is now, otherwise if no metrics are enabled then it is
@ -267,19 +281,19 @@ ACTOR Future<Void> dumpMetrics(Database cx, MetricsConfig* config, TDMetricColle
loop {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
try {
for (auto& i : batch.inserts) {
for (auto& i : batch.scope.inserts) {
// fprintf(stderr, "%s: dump insert: %s\n", collection->address.toString().c_str(),
// printable(allInsertions[i].key).c_str());
tr.set(i.key, i.value());
}
for (auto& a : batch.appends) {
for (auto& a : batch.scope.appends) {
// fprintf(stderr, "%s: dump append: %s\n", collection->address.toString().c_str(),
// printable(allAppends[i].key).c_str());
tr.atomicOp(a.key, a.value(), MutationRef::AppendIfFits);
}
for (auto& u : batch.updates) {
for (auto& u : batch.scope.updates) {
// fprintf(stderr, "%s: dump update: %s\n", collection->address.toString().c_str(),
// printable(allUpdates[i].first).c_str());
tr.set(u.first, u.second);
@ -403,6 +417,66 @@ ACTOR Future<Void> runMetrics(Future<Database> fcx, Key prefix) {
return Void();
}
ACTOR Future<Void> startMetricsSimulationServer(MetricsDataModel model) {
if (model == MetricsDataModel::NONE) {
return Void{};
}
state uint32_t port = 0;
switch (model) {
case MetricsDataModel::STATSD:
port = FLOW_KNOBS->STATSD_UDP_EMISSION_PORT;
case MetricsDataModel::OTLP:
port = FLOW_KNOBS->OTEL_UDP_EMISSION_PORT;
case MetricsDataModel::NONE:
port = 0;
}
TraceEvent(SevInfo, "MetricsUDPServerStarted").detail("Address", "127.0.0.1").detail("Port", port);
state NetworkAddress localAddress = NetworkAddress::parse("127.0.0.1:" + std::to_string(port));
state Reference<IUDPSocket> serverSocket = wait(INetworkConnections::net()->createUDPSocket(localAddress));
serverSocket->bind(localAddress);
state Standalone<StringRef> packetString = makeString(IUDPSocket::MAX_PACKET_SIZE);
state uint8_t* packet = mutateString(packetString);
loop {
int size = wait(serverSocket->receive(packet, packet + IUDPSocket::MAX_PACKET_SIZE));
auto message = packetString.substr(0, size);
// Let's just focus on statsd for now. For statsd, the message is expected to be seperated by newlines. We need
// to break each statsd metric and verify them individually.
if (model == MetricsDataModel::STATSD) {
std::string statsd_message = message.toString();
auto metrics = splitString(statsd_message, "\n");
for (const auto& metric : metrics) {
ASSERT(verifyStatsdMessage(metric));
}
} else if (model == MetricsDataModel::OTLP) {
msgpack::object_handle result;
msgpack::unpack(result, reinterpret_cast<const char*>(packet), size);
}
}
}
ACTOR Future<Void> runMetrics() {
state MetricCollection* metrics = nullptr;
MetricsDataModel model = knobToMetricModel(FLOW_KNOBS->METRICS_DATA_MODEL);
if (model == MetricsDataModel::NONE) {
return Void{};
}
state UDPMetricClient metricClient;
state Future<Void> metricsActor;
if (g_network->isSimulated()) {
metricsActor = startMetricsSimulationServer(model);
}
loop {
metrics = MetricCollection::getMetricCollection();
if (metrics != nullptr) {
metricClient.send(metrics);
}
wait(delay(FLOW_KNOBS->METRICS_EMISSION_INTERVAL));
}
}
TEST_CASE("/fdbserver/metrics/TraceEvents") {
auto getenv2 = [](const char* s) -> const char* {
s = getenv(s);

View File

@ -768,7 +768,9 @@ ACTOR Future<Void> updateStorage(TLogData* self) {
ACTOR Future<Void> updateStorageLoop(TLogData* self) {
wait(delay(0, TaskPriority::UpdateStorage));
loop { wait(updateStorage(self)); }
loop {
wait(updateStorage(self));
}
}
void commitMessages(Reference<LogData> self,
@ -1592,13 +1594,17 @@ ACTOR Future<Void> restorePersistentState(TLogData* self, LocalityData locality)
choose {
when(wait(updateStorage(self))) {}
when(wait(allRemoved)) { throw worker_removed(); }
when(wait(allRemoved)) {
throw worker_removed();
}
}
}
}
}
}
when(wait(allRemoved)) { throw worker_removed(); }
when(wait(allRemoved)) {
throw worker_removed();
}
}
}
} catch (Error& e) {

View File

@ -1014,7 +1014,9 @@ ACTOR Future<Void> updateStorage(TLogData* self) {
ACTOR Future<Void> updateStorageLoop(TLogData* self) {
wait(delay(0, TaskPriority::UpdateStorage));
loop { wait(updateStorage(self)); }
loop {
wait(updateStorage(self));
}
}
void commitMessages(TLogData* self,
@ -2071,7 +2073,9 @@ ACTOR Future<Void> pullAsyncData(TLogData* self,
while (!endVersion.present() || logData->version.get() < endVersion.get()) {
loop {
choose {
when(wait(r ? r->getMore(TaskPriority::TLogCommit) : Never())) { break; }
when(wait(r ? r->getMore(TaskPriority::TLogCommit) : Never())) {
break;
}
when(wait(dbInfoChange)) {
if (logData->logSystem->get()) {
r = logData->logSystem->get()->peek(logData->logId, tagAt, endVersion, tags, true);
@ -2503,13 +2507,17 @@ ACTOR Future<Void> restorePersistentState(TLogData* self,
choose {
when(wait(updateStorage(self))) {}
when(wait(allRemoved)) { throw worker_removed(); }
when(wait(allRemoved)) {
throw worker_removed();
}
}
}
}
}
}
when(wait(allRemoved)) { throw worker_removed(); }
when(wait(allRemoved)) {
throw worker_removed();
}
}
}
} catch (Error& e) {
@ -2831,7 +2839,9 @@ ACTOR Future<Void> tLog(IKeyValueStore* persistentData,
forwardPromise(req.reply, self.tlogCache.get(req.recruitmentID));
}
}
when(wait(error)) { throw internal_error(); }
when(wait(error)) {
throw internal_error();
}
when(wait(activeSharedChange)) {
if (activeSharedTLog->get() == tlogId) {
self.targetVolatileBytes = SERVER_KNOBS->TLOG_SPILL_THRESHOLD;

View File

@ -1232,7 +1232,9 @@ ACTOR Future<Void> updateStorage(TLogData* self) {
ACTOR Future<Void> updateStorageLoop(TLogData* self) {
wait(delay(0, TaskPriority::UpdateStorage));
loop { wait(updateStorage(self)); }
loop {
wait(updateStorage(self));
}
}
void commitMessages(TLogData* self,
@ -2530,7 +2532,9 @@ ACTOR Future<Void> pullAsyncData(TLogData* self,
while (!endVersion.present() || logData->version.get() < endVersion.get()) {
loop {
choose {
when(wait(r ? r->getMore(TaskPriority::TLogCommit) : Never())) { break; }
when(wait(r ? r->getMore(TaskPriority::TLogCommit) : Never())) {
break;
}
when(wait(dbInfoChange)) {
if (logData->logSystem->get()) {
r = logData->logSystem->get()->peek(logData->logId, tagAt, endVersion, tags, parallelGetMore);
@ -2980,7 +2984,9 @@ ACTOR Future<Void> restorePersistentState(TLogData* self,
choose {
when(wait(updateStorage(self))) {}
when(wait(allRemoved)) { throw worker_removed(); }
when(wait(allRemoved)) {
throw worker_removed();
}
}
}
} else {
@ -2991,7 +2997,9 @@ ACTOR Future<Void> restorePersistentState(TLogData* self,
}
}
}
when(wait(allRemoved)) { throw worker_removed(); }
when(wait(allRemoved)) {
throw worker_removed();
}
}
}
} catch (Error& e) {
@ -3319,7 +3327,9 @@ ACTOR Future<Void> tLog(IKeyValueStore* persistentData,
forwardPromise(req.reply, self.tlogCache.get(req.recruitmentID));
}
}
when(wait(error)) { throw internal_error(); }
when(wait(error)) {
throw internal_error();
}
when(wait(activeSharedTLog->onChange())) {
if (activeSharedTLog->get() == tlogId) {
self.targetVolatileBytes = SERVER_KNOBS->TLOG_SPILL_THRESHOLD;

View File

@ -359,7 +359,7 @@ int64_t extractMaxQueueSize(const std::vector<Future<TraceEventFields>>& message
TraceEvent("QuietDatabaseGotMaxStorageServerQueueSize")
.detail("Stage", "MaxComputed")
.detail("Max", maxQueueSize)
.detail("MaxQueueServer", format("%016" PRIx64, maxQueueServer.first()));
.detail("MaxQueueServer", maxQueueServer);
return maxQueueSize;
}
@ -380,14 +380,14 @@ ACTOR Future<TraceEventFields> getStorageMetricsTimeout(UID storage, WorkerInter
when(wait(timeout)) {
TraceEvent("QuietDatabaseFailure")
.detail("Reason", "Could not fetch StorageMetrics")
.detail("Storage", format("%016" PRIx64, storage.first()));
.detail("Storage", storage);
throw timed_out();
}
}
if (retries > 30) {
TraceEvent("QuietDatabaseFailure")
.detail("Reason", "Could not fetch StorageMetrics x30")
.detail("Storage", format("%016" PRIx64, storage.first()))
.detail("Storage", storage)
.detail("Version", version);
throw timed_out();
}

View File

@ -56,7 +56,9 @@ struct AfterReturn {
ACTOR void sendCommitReply(IKVSCommitRequest commitReq, IKeyValueStore* kvStore, Future<Void> onClosed) {
try {
choose {
when(wait(onClosed)) { commitReq.reply.sendError(remote_kvs_cancelled()); }
when(wait(onClosed)) {
commitReq.reply.sendError(remote_kvs_cancelled());
}
when(wait(kvStore->commit(commitReq.sequential))) {
StorageBytes storageBytes = kvStore->getStorageBytes();
commitReq.reply.send(IKVSCommitReply(storageBytes));
@ -102,8 +104,12 @@ ACTOR Future<Void> runIKVS(OpenKVStoreRequest openReq, IKVSInterface ikvsInterfa
when(IKVSGetValueRequest getReq = waitNext(ikvsInterface.getValue.getFuture())) {
actors.add(cancellableForwardPromise(getReq.reply, kvStore->readValue(getReq.key, getReq.options)));
}
when(IKVSSetRequest req = waitNext(ikvsInterface.set.getFuture())) { kvStore->set(req.keyValue); }
when(IKVSClearRequest req = waitNext(ikvsInterface.clear.getFuture())) { kvStore->clear(req.range); }
when(IKVSSetRequest req = waitNext(ikvsInterface.set.getFuture())) {
kvStore->set(req.keyValue);
}
when(IKVSClearRequest req = waitNext(ikvsInterface.clear.getFuture())) {
kvStore->clear(req.range);
}
when(IKVSCommitRequest commitReq = waitNext(ikvsInterface.commit.getFuture())) {
sendCommitReply(commitReq, kvStore, onClosed.getFuture());
}

View File

@ -251,7 +251,9 @@ ACTOR Future<Void> resolveBatch(Reference<Resolver> self,
}
choose {
when(wait(self->version.whenAtLeast(req.prevVersion))) { break; }
when(wait(self->version.whenAtLeast(req.prevVersion))) {
break;
}
when(wait(self->checkNeededVersion.onTrigger())) {}
}
}
@ -750,7 +752,9 @@ ACTOR Future<Void> resolver(ResolverInterface resolver,
try {
state Future<Void> core = resolverCore(resolver, initReq, db);
loop choose {
when(wait(core)) { return Void(); }
when(wait(core)) {
return Void();
}
when(wait(checkRemoved(db, initReq.recoveryCount, resolver))) {}
}
} catch (Error& e) {

View File

@ -284,12 +284,10 @@ ACTOR static Future<Void> getAndComputeStagingKeys(
.detail("GetKeys", incompleteStagingKeys.size())
.detail("DelayTime", delayTime);
ASSERT(!g_network->isSimulated());
int i = 0;
for (auto& key : incompleteStagingKeys) {
MutationRef m(MutationRef::SetValue, key.first, "0"_sr);
key.second->second.add(m, LogMessageVersion(1));
key.second->second.precomputeResult("GetAndComputeStagingKeys", applierID, batchIndex);
i++;
}
return Void();
}

View File

@ -303,7 +303,9 @@ ACTOR Future<Void> restoreLoaderCore(RestoreLoaderInterface loaderInterf,
TraceEvent("FastRestoreLoaderCoreExitRole", self->id());
break;
}
when(wait(error)) { TraceEvent("FastRestoreLoaderActorCollectionError", self->id()); }
when(wait(error)) {
TraceEvent("FastRestoreLoaderActorCollectionError", self->id());
}
}
} catch (Error& e) {
bool isError = e.code() != error_code_operation_cancelled; // == error_code_broken_promise

View File

@ -72,7 +72,9 @@ void logTraceEvent(const RocksDBLogRecord& record) {
ACTOR Future<Void> rocksDBPeriodicallyLogger(RocksDBLogger* pRecords) {
loop choose {
when(wait(delay(0.1))) { pRecords->consume(); }
when(wait(delay(0.1))) {
pRecords->consume();
}
}
}

View File

@ -327,8 +327,12 @@ TEST_CASE("fdbserver/SimKmsConnector") {
state SimKmsConnector connector("SimKmsConnector");
loop choose {
when(wait(connector.connectorCore(inf))) { throw internal_error(); }
when(wait(testRunWorkload(inf, maxEncryptKeys))) { break; }
when(wait(connector.connectorCore(inf))) {
throw internal_error();
}
when(wait(testRunWorkload(inf, maxEncryptKeys))) {
break;
}
}
return Void();
}

View File

@ -2350,7 +2350,7 @@ void setupSimulatedSystem(std::vector<Future<Void>>* systemActors,
.detail("ConfigString", startingConfigString);
bool requiresExtraDBMachines = !g_simulator->extraDatabases.empty() && !useLocalDatabase;
int assignedMachines = 0, nonVersatileMachines = 0;
int assignedMachines = 0;
bool gradualMigrationPossible = true;
std::vector<ProcessClass::ClassType> processClassesSubSet = { ProcessClass::UnsetClass,
ProcessClass::StatelessClass };
@ -2404,10 +2404,7 @@ void setupSimulatedSystem(std::vector<Future<Void>>* systemActors,
else
processClass = ProcessClass((ProcessClass::ClassType)deterministicRandom()->randomInt(0, 3),
ProcessClass::CommandLineSource); // Unset, Storage, or Transaction
if (processClass ==
ProcessClass::StatelessClass) { // *can't* be assigned to other roles, even in an emergency
nonVersatileMachines++;
}
if (processClass == ProcessClass::UnsetClass || processClass == ProcessClass::StorageClass) {
possible_ss++;
}
@ -2419,11 +2416,9 @@ void setupSimulatedSystem(std::vector<Future<Void>>* systemActors,
if (machine >= machines) {
if (storageCacheMachines > 0 && dc == 0) {
processClass = ProcessClass(ProcessClass::StorageCacheClass, ProcessClass::CommandLineSource);
nonVersatileMachines++;
storageCacheMachines--;
} else if (blobWorkerMachines > 0) { // add blob workers to every DC
processClass = ProcessClass(ProcessClass::BlobWorkerClass, ProcessClass::CommandLineSource);
nonVersatileMachines++;
blobWorkerMachines--;
}
}

View File

@ -295,7 +295,7 @@ private:
FastAllocator<128>::release(this);
INSTRUMENT_RELEASE("SkipListNode128");
} else {
delete[](char*) this;
delete[] (char*)this;
INSTRUMENT_RELEASE("SkipListNodeLarge");
}
}

View File

@ -456,7 +456,9 @@ ACTOR Future<Version> waitForVersionNoTooOld(StorageCacheData* data, Version ver
if (version <= data->version.get())
return version;
choose {
when(wait(data->version.whenAtLeast(version))) { return version; }
when(wait(data->version.whenAtLeast(version))) {
return version;
}
when(wait(delay(SERVER_KNOBS->FUTURE_VERSION_DELAY))) {
if (deterministicRandom()->random01() < 0.001)
TraceEvent(SevWarn, "CacheServerFutureVersion1000x", data->thisServerID)
@ -1848,7 +1850,9 @@ ACTOR Future<Void> pullAsyncData(StorageCacheData* data) {
loop {
loop choose {
when(wait(cursor ? cursor->getMore(TaskPriority::TLogCommit) : Never())) { break; }
when(wait(cursor ? cursor->getMore(TaskPriority::TLogCommit) : Never())) {
break;
}
when(wait(dbInfoChange)) {
dbInfoChange = data->db->onChange();
if (data->db->get().recoveryState >= RecoveryState::ACCEPTING_COMMITS) {
@ -2250,13 +2254,21 @@ ACTOR Future<Void> storageCacheServer(StorageServerInterface ssi,
// actors.add(self->readGuard(req , getValueQ));
actors.add(getValueQ(&self, req));
}
when(WatchValueRequest req = waitNext(ssi.watchValue.getFuture())) { ASSERT(false); }
when(GetKeyRequest req = waitNext(ssi.getKey.getFuture())) { actors.add(getKey(&self, req)); }
when(WatchValueRequest req = waitNext(ssi.watchValue.getFuture())) {
ASSERT(false);
}
when(GetKeyRequest req = waitNext(ssi.getKey.getFuture())) {
actors.add(getKey(&self, req));
}
when(GetKeyValuesRequest req = waitNext(ssi.getKeyValues.getFuture())) {
actors.add(getKeyValues(&self, req));
}
when(GetShardStateRequest req = waitNext(ssi.getShardState.getFuture())) { ASSERT(false); }
when(StorageQueuingMetricsRequest req = waitNext(ssi.getQueuingMetrics.getFuture())) { ASSERT(false); }
when(GetShardStateRequest req = waitNext(ssi.getShardState.getFuture())) {
ASSERT(false);
}
when(StorageQueuingMetricsRequest req = waitNext(ssi.getQueuingMetrics.getFuture())) {
ASSERT(false);
}
// when( ReplyPromise<Version> reply = waitNext(ssi.getVersion.getFuture()) ) {
// ASSERT(false);
//}
@ -2264,21 +2276,39 @@ ACTOR Future<Void> storageCacheServer(StorageServerInterface ssi,
ASSERT(false);
}
when(GetMappedKeyValuesRequest req = waitNext(ssi.getMappedKeyValues.getFuture())) { ASSERT(false); }
when(WaitMetricsRequest req = waitNext(ssi.waitMetrics.getFuture())) { ASSERT(false); }
when(SplitMetricsRequest req = waitNext(ssi.splitMetrics.getFuture())) { ASSERT(false); }
when(GetStorageMetricsRequest req = waitNext(ssi.getStorageMetrics.getFuture())) { ASSERT(false); }
when(ReadHotSubRangeRequest req = waitNext(ssi.getReadHotRanges.getFuture())) { ASSERT(false); }
when(SplitRangeRequest req = waitNext(ssi.getRangeSplitPoints.getFuture())) { ASSERT(false); }
when(GetKeyValuesStreamRequest req = waitNext(ssi.getKeyValuesStream.getFuture())) { ASSERT(false); }
when(ChangeFeedStreamRequest req = waitNext(ssi.changeFeedStream.getFuture())) { ASSERT(false); }
when(GetMappedKeyValuesRequest req = waitNext(ssi.getMappedKeyValues.getFuture())) {
ASSERT(false);
}
when(WaitMetricsRequest req = waitNext(ssi.waitMetrics.getFuture())) {
ASSERT(false);
}
when(SplitMetricsRequest req = waitNext(ssi.splitMetrics.getFuture())) {
ASSERT(false);
}
when(GetStorageMetricsRequest req = waitNext(ssi.getStorageMetrics.getFuture())) {
ASSERT(false);
}
when(ReadHotSubRangeRequest req = waitNext(ssi.getReadHotRanges.getFuture())) {
ASSERT(false);
}
when(SplitRangeRequest req = waitNext(ssi.getRangeSplitPoints.getFuture())) {
ASSERT(false);
}
when(GetKeyValuesStreamRequest req = waitNext(ssi.getKeyValuesStream.getFuture())) {
ASSERT(false);
}
when(ChangeFeedStreamRequest req = waitNext(ssi.changeFeedStream.getFuture())) {
ASSERT(false);
}
when(OverlappingChangeFeedsRequest req = waitNext(ssi.overlappingChangeFeeds.getFuture())) {
// Simulate endpoint not found so that the requester will try another endpoint
// This is a workaround to the fact that storage servers do not have an easy way to enforce this
// request goes only to other storage servers, and in simulation we manage to trigger this behavior
req.reply.sendError(broken_promise());
}
when(ChangeFeedPopRequest req = waitNext(ssi.changeFeedPop.getFuture())) { ASSERT(false); }
when(ChangeFeedPopRequest req = waitNext(ssi.changeFeedPop.getFuture())) {
ASSERT(false);
}
when(ChangeFeedVersionUpdateRequest req = waitNext(ssi.changeFeedVersionUpdate.getFuture())) {
ASSERT(false);
}

View File

@ -51,7 +51,9 @@ public:
interfaceChanged = server->onInterfaceChanged;
resetRequest = Void();
}
when(wait(serverRemoved)) { return Void(); }
when(wait(serverRemoved)) {
return Void();
}
when(wait(resetRequest)) { // To prevent a tight spin loop
if (IFailureMonitor::failureMonitor().getState(ssi.getStorageMetrics.getEndpoint()).isFailed()) {
resetRequest = IFailureMonitor::failureMonitor().onStateEqual(

View File

@ -1445,7 +1445,9 @@ ACTOR Future<Void> updateStorage(TLogData* self) {
ACTOR Future<Void> updateStorageLoop(TLogData* self) {
wait(delay(0, TaskPriority::UpdateStorage));
loop { wait(updateStorage(self)); }
loop {
wait(updateStorage(self));
}
}
void commitMessages(TLogData* self,
@ -1606,7 +1608,9 @@ ACTOR Future<Void> waitForMessagesForTag(Reference<LogData> self, Tag reqTag, Ve
// we want the caller to finish first, otherwise the data structure it is building might not be complete
wait(delay(0.0));
}
when(wait(delay(timeout))) { self->blockingPeekTimeouts += 1; }
when(wait(delay(timeout))) {
self->blockingPeekTimeouts += 1;
}
}
return Void();
}
@ -2795,7 +2799,9 @@ ACTOR Future<Void> pullAsyncData(TLogData* self,
while (!endVersion.present() || logData->version.get() < endVersion.get()) {
loop {
choose {
when(wait(r ? r->getMore(TaskPriority::TLogCommit) : Never())) { break; }
when(wait(r ? r->getMore(TaskPriority::TLogCommit) : Never())) {
break;
}
when(wait(dbInfoChange)) {
if (logData->logSystem->get()) {
r = logData->logSystem->get()->peek(logData->logId, tagAt, endVersion, tags, true);
@ -3276,7 +3282,9 @@ ACTOR Future<Void> restorePersistentState(TLogData* self,
choose {
when(wait(updateStorage(self))) {}
when(wait(allRemoved)) { throw worker_removed(); }
when(wait(allRemoved)) {
throw worker_removed();
}
}
}
} else {
@ -3287,7 +3295,9 @@ ACTOR Future<Void> restorePersistentState(TLogData* self,
}
}
}
when(wait(allRemoved)) { throw worker_removed(); }
when(wait(allRemoved)) {
throw worker_removed();
}
}
}
} catch (Error& e) {
@ -3636,7 +3646,9 @@ ACTOR Future<Void> tLog(IKeyValueStore* persistentData,
forwardPromise(req.reply, self.tlogCache.get(req.recruitmentID));
}
}
when(wait(error)) { throw internal_error(); }
when(wait(error)) {
throw internal_error();
}
when(wait(activeSharedChange)) {
if (activeSharedTLog->get() == tlogId) {
TraceEvent("SharedTLogNowActive", self.dbgid).detail("NowActive", activeSharedTLog->get());

View File

@ -168,7 +168,7 @@ static int asyncReadZeroCopy(sqlite3_file* pFile, void** data, int iAmt, sqlite_
}
static int asyncReleaseZeroCopy(sqlite3_file* pFile, void* data, int iAmt, sqlite_int64 iOfst) {
// printf("-asyncReleaseRef %p +%lld %d <= %p\n", pFile, iOfst, iAmt, data);
delete[](char*) data;
delete[] (char*)data;
return SQLITE_OK;
}
#endif
@ -299,7 +299,7 @@ struct SharedMemoryInfo { // for a file
}
void cleanup() {
for (int i = 0; i < regions.size(); i++)
delete[](uint8_t*) regions[i];
delete[] (uint8_t*)regions[i];
table.erase(filename);
}

View File

@ -2264,7 +2264,9 @@ public:
self->remappedPages[r.originalPageID][r.version] = r.newPageID;
}
}
when(wait(remapRecoverActor)) { remapRecoverActor = Never(); }
when(wait(remapRecoverActor)) {
remapRecoverActor = Never();
}
}
} catch (Error& e) {
if (e.code() != error_code_end_of_stream) {
@ -6722,7 +6724,7 @@ private:
debug_print(addPrefix(context, update->toString()));
if (REDWOOD_DEBUG) {
int c = 0;
[[maybe_unused]] int c = 0;
auto i = mBegin;
while (1) {
debug_printf("%s Mutation %4d '%s': %s\n",
@ -10671,7 +10673,9 @@ TEST_CASE(":/redwood/performance/extentQueue") {
if (entriesRead == m_extentQueue.numEntries)
break;
}
when(wait(queueRecoverActor)) { queueRecoverActor = Never(); }
when(wait(queueRecoverActor)) {
queueRecoverActor = Never();
}
}
} catch (Error& e) {
if (e.code() != error_code_end_of_stream) {

View File

@ -272,8 +272,8 @@ struct ClusterRecoveryData : NonCopyable, ReferenceCounted<ClusterRecoveryData>
masterInterface(masterInterface), masterLifetime(masterLifetimeToken), clusterController(clusterController),
cstate(coordinators, addActor, dbgid), dbInfo(dbInfo), registrationCount(0), addActor(addActor),
recruitmentStalled(makeReference<AsyncVar<bool>>(false)), forceRecovery(forceRecovery), neverCreated(false),
safeLocality(tagLocalityInvalid), primaryLocality(tagLocalityInvalid), cc("Master", dbgid.toString()),
changeCoordinatorsRequests("ChangeCoordinatorsRequests", cc),
safeLocality(tagLocalityInvalid), primaryLocality(tagLocalityInvalid),
cc("ClusterRecoveryData", dbgid.toString()), changeCoordinatorsRequests("ChangeCoordinatorsRequests", cc),
getCommitVersionRequests("GetCommitVersionRequests", cc),
backupWorkerDoneRequests("BackupWorkerDoneRequests", cc),
getLiveCommittedVersionRequests("GetLiveCommittedVersionRequests", cc),

View File

@ -0,0 +1,51 @@
/*
* MetricClient.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "flow/TDMetric.actor.h"
#include "flow/Msgpack.h"
#include "flow/network.h"
#ifndef METRIC_CLIENT_H
#define METRIC_CLIENT_H
class IMetricClient {
protected:
MetricsDataModel model;
public:
virtual void send(MetricCollection*) = 0;
virtual ~IMetricClient() {}
};
class UDPMetricClient : public IMetricClient {
private:
// Since we can't quickly determine the exact packet size for OTELSum in msgpack
// we play on the side of caution and make our maximum 3/4 of the official one
static constexpr uint32_t MAX_OTELSUM_PACKET_SIZE = 0.75 * IUDPSocket::MAX_PACKET_SIZE;
MetricsDataModel model;
Future<Reference<IUDPSocket>> socket;
int socket_fd;
MsgpackBuffer buf;
std::string address;
int port;
void send_packet(int fd, const void* data, size_t len);
public:
UDPMetricClient();
void send(MetricCollection*) override;
};
#endif

View File

@ -30,6 +30,7 @@
#include "flow/actorcompiler.h" // This must be the last #include
ACTOR Future<Void> runMetrics(Future<Database> fcx, Key metricsPrefix);
ACTOR Future<Void> runMetrics();
#include "flow/unactorcompiler.h"
#endif

View File

@ -38,10 +38,10 @@
#include <cstdarg>
#define SevFRMutationInfo SevVerbose
//#define SevFRMutationInfo SevInfo
// #define SevFRMutationInfo SevInfo
#define SevFRDebugInfo SevVerbose
//#define SevFRDebugInfo SevInfo
// #define SevFRDebugInfo SevInfo
struct VersionedMutation {
MutationRef mutation;

View File

@ -1292,7 +1292,9 @@ Future<T> ioTimeoutError(Future<T> what, double time, const char* context = null
}
Future<Void> end = lowPriorityDelay(time);
choose {
when(T t = wait(what)) { return t; }
when(T t = wait(what)) {
return t;
}
when(wait(end)) {
Error err = io_timeout();
if (g_network->isSimulated() && !g_simulator->getCurrentProcess()->isReliable()) {
@ -1326,7 +1328,9 @@ Future<T> ioDegradedOrTimeoutError(Future<T> what,
if (degradedTime < errTime) {
Future<Void> degradedEnd = lowPriorityDelay(degradedTime);
choose {
when(T t = wait(what)) { return t; }
when(T t = wait(what)) {
return t;
}
when(wait(degradedEnd)) {
CODE_PROBE(true, "TLog degraded", probe::func::deduplicate);
TraceEvent(SevWarnAlways, "IoDegraded").log();
@ -1337,7 +1341,9 @@ Future<T> ioDegradedOrTimeoutError(Future<T> what,
Future<Void> end = lowPriorityDelay(errTime - degradedTime);
choose {
when(T t = wait(what)) { return t; }
when(T t = wait(what)) {
return t;
}
when(wait(end)) {
Error err = io_timeout();
if (g_network->isSimulated() && !g_simulator->getCurrentProcess()->isReliable()) {

View File

@ -52,7 +52,7 @@ struct art_tree {
#define ART_PREV -1
#define ART_NEITHER 0
//#define ART_IS_LEAF(x) ( (*((ART_NODE_TYPE*)x) == ART_LEAF))
// #define ART_IS_LEAF(x) ( (*((ART_NODE_TYPE*)x) == ART_LEAF))
template <class T>
static inline bool ART_IS_LEAF(T const& x) {
return *((ART_NODE_TYPE*)x) == ART_LEAF;

View File

@ -1993,7 +1993,9 @@ ACTOR Future<Version> waitForVersionNoTooOld(StorageServer* data, Version versio
if (version <= data->version.get())
return version;
choose {
when(wait(data->version.whenAtLeast(version))) { return version; }
when(wait(data->version.whenAtLeast(version))) {
return version;
}
when(wait(delay(SERVER_KNOBS->FUTURE_VERSION_DELAY))) {
if (deterministicRandom()->random01() < 0.001)
TraceEvent(SevWarn, "ShardServerFutureVersion1000x", data->thisServerID)
@ -2034,7 +2036,6 @@ std::vector<StorageServerShard> StorageServer::getStorageServerShards(KeyRangeRe
ACTOR Future<Void> getValueQ(StorageServer* data, GetValueRequest req) {
state int64_t resultSize = 0;
Span span("SS:getValue"_loc, req.spanContext);
span.addAttribute("key"_sr, req.key);
// Temporarily disabled -- this path is hit a lot
// getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.first();
@ -4875,13 +4876,9 @@ ACTOR Future<GetMappedKeyValuesReply> mapKeyValues(StorageServer* data,
// keep index for boundary index entries, so that caller can use it as a continuation.
result.data[0].key = input.data[0].key;
result.data[0].value = input.data[0].value;
result.data[0].boundaryAndExist = getMappedKeyValueSize(kvms[0]) > 0;
result.data.back().key = input.data[resultSize - 1].key;
result.data.back().value = input.data[resultSize - 1].value;
// index needs to be -1
int index = (resultSize - 1) % SERVER_KNOBS->MAX_PARALLEL_QUICK_GET_VALUE;
result.data.back().boundaryAndExist = getMappedKeyValueSize(kvms[index]) > 0;
}
result.more = input.more || resultSize < sz;
if (pOriginalReq->options.present() && pOriginalReq->options.get().debugID.present())
@ -6367,7 +6364,9 @@ ACTOR Future<Version> fetchChangeFeedApplier(StorageServer* data,
when(wait(changeFeedInfo->fetchLock.take())) {
feedFetchReleaser = FlowLock::Releaser(changeFeedInfo->fetchLock);
}
when(wait(changeFeedInfo->durableFetchVersion.whenAtLeast(endVersion))) { return invalidVersion; }
when(wait(changeFeedInfo->durableFetchVersion.whenAtLeast(endVersion))) {
return invalidVersion;
}
}
state Version startVersion = beginVersion;
@ -8391,6 +8390,8 @@ private:
// Because of data moves, we can get mutations operating on a change feed we don't yet know about, because
// the metadata fetch hasn't started yet
bool createdFeed = false;
bool popMutationLog = false;
bool addMutationToLog = false;
if (feed == data->uidChangeFeed.end() && status != ChangeFeedStatus::CHANGE_FEED_DESTROY) {
createdFeed = true;
@ -8440,6 +8441,7 @@ private:
CODE_PROBE(true, "private mutation for feed scheduled for deletion! Un-mark it as removing");
feed->second->removing = false;
addMutationToLog = true;
// reset fetch versions because everything previously fetched was cleaned up
feed->second->fetchVersion = invalidVersion;
feed->second->durableFetchVersion = NotifiedVersion();
@ -8448,8 +8450,6 @@ private:
feed->second->updateMetadataVersion(currentVersion);
}
bool popMutationLog = false;
bool addMutationToLog = false;
if (popVersion != invalidVersion && status != ChangeFeedStatus::CHANGE_FEED_DESTROY) {
// pop the change feed at pop version, no matter what state it is in
if (popVersion - 1 > feed->second->emptyVersion) {
@ -9020,7 +9020,6 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
}
Span span("SS:update"_loc, spanContext);
span.addAttribute("key"_sr, msg.param1);
// Drop non-private mutations if TSS fault injection is enabled in simulation, or if this is a TSS in
// quarantine.
@ -10355,7 +10354,9 @@ ACTOR Future<Void> waitMetrics(StorageServerMetrics* self, WaitMetricsRequest re
}*/
}
when(wait(timeout)) { timedout = true; }
when(wait(timeout)) {
timedout = true;
}
}
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled)
@ -11475,4 +11476,4 @@ void versionedMapTest() {
printf("PTree node is %d bytes, allocated as %d bytes\n", NSIZE, ASIZE);
printf("%d distinct after %d insertions\n", count, 1000 * 1000);
printf("Memory used: %f MB\n", (after - before) / 1e6);
}
}

View File

@ -2103,7 +2103,9 @@ ACTOR Future<Void> runTests(Reference<IClusterConnectionRecord> connRecord,
}
choose {
when(wait(tests)) { return Void(); }
when(wait(tests)) {
return Void();
}
when(wait(quorum(actors, 1))) {
ASSERT(false);
throw internal_error();

View File

@ -675,18 +675,42 @@ ACTOR Future<Void> registrationClient(
TraceEvent(SevWarn, "WorkerRegisterTimeout").detail("WaitTime", now() - startTime);
}
}
when(wait(ccInterface->onChange())) { break; }
when(wait(ddInterf->onChange())) { break; }
when(wait(rkInterf->onChange())) { break; }
when(wait(csInterf->onChange())) { break; }
when(wait(bmInterf->onChange())) { break; }
when(wait(blobMigratorInterf->onChange())) { break; }
when(wait(ekpInterf->onChange())) { break; }
when(wait(degraded->onChange())) { break; }
when(wait(FlowTransport::transport().onIncompatibleChanged())) { break; }
when(wait(issues->onChange())) { break; }
when(wait(recovered)) { break; }
when(wait(clusterId->onChange())) { break; }
when(wait(ccInterface->onChange())) {
break;
}
when(wait(ddInterf->onChange())) {
break;
}
when(wait(rkInterf->onChange())) {
break;
}
when(wait(csInterf->onChange())) {
break;
}
when(wait(bmInterf->onChange())) {
break;
}
when(wait(blobMigratorInterf->onChange())) {
break;
}
when(wait(ekpInterf->onChange())) {
break;
}
when(wait(degraded->onChange())) {
break;
}
when(wait(FlowTransport::transport().onIncompatibleChanged())) {
break;
}
when(wait(issues->onChange())) {
break;
}
when(wait(recovered)) {
break;
}
when(wait(clusterId->onChange())) {
break;
}
}
}
}
@ -1786,6 +1810,8 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
metricsLogger = runMetrics(database, KeyRef(metricsPrefix));
database->globalConfig->trigger(samplingFrequency, samplingProfilerUpdateFrequency);
}
} else {
metricsLogger = runMetrics();
}
errorForwarders.add(resetAfter(degraded,

View File

@ -670,8 +670,8 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload {
} else {
int targetQueryBytes = (deterministicRandom()->randomInt(1, 20) * targetBytesReadPerQuery) / 10;
int estimatedQueryBytes = 0;
for (int i = 0; estimatedQueryBytes < targetQueryBytes && endKeyIt != threadData->keyData.end();
i++, endKeyIt++) {
for (; estimatedQueryBytes < targetQueryBytes && endKeyIt != threadData->keyData.end();
endKeyIt++) {
// iterate forward until end or target keys have passed
estimatedQueryBytes += (1 + endKeyIt->second.writes.size() - endKeyIt->second.nextClearIdx) *
threadData->targetValLength;

View File

@ -91,7 +91,9 @@ struct IndexScanWorkload : KVWorkload {
ACTOR static Future<Void> serialScans(Database cx, IndexScanWorkload* self) {
state double start = now();
try {
loop { wait(scanDatabase(cx, self)); }
loop {
wait(scanDatabase(cx, self));
}
} catch (...) {
self->totalTimeFetching = now() - start;
throw;

View File

@ -404,7 +404,9 @@ ACTOR Future<Void> testKVStore(KVStoreTestWorkload* workload) {
try {
choose {
when(wait(main)) {}
when(wait(test.store->getError())) { ASSERT(false); }
when(wait(test.store->getError())) {
ASSERT(false);
}
}
} catch (Error& e) {
err = e;

View File

@ -120,7 +120,9 @@ struct KillRegionWorkload : TestWorkload {
wait(success(ManagementAPI::changeConfig(
cx.getReference(), g_simulator->disablePrimary + " repopulate_anti_quorum=1", true)));
choose {
when(wait(waitForStorageRecovered(self))) { break; }
when(wait(waitForStorageRecovered(self))) {
break;
}
when(wait(delay(300.0))) {}
}
}

View File

@ -90,7 +90,9 @@ struct MiniCycleWorkload : TestWorkload {
if (!ok)
return false;
}
when(wait(end)) { break; }
when(wait(end)) {
break;
}
}
}

Some files were not shown because too many files have changed in this diff Show More