Merge remote-tracking branch 'fork/ddsketch' into ddsketch

This commit is contained in:
Sam Gwydir 2022-11-15 12:18:11 -08:00
commit ac729d130d
4 changed files with 34 additions and 53 deletions

View File

@ -1016,9 +1016,7 @@ ACTOR static Future<Void> monitorClientDBInfoChange(DatabaseContext* cx,
proxiesChangeTrigger->trigger(); proxiesChangeTrigger->trigger();
} }
} }
when(wait(actors.getResult())) { when(wait(actors.getResult())) { UNSTOPPABLE_ASSERT(false); }
UNSTOPPABLE_ASSERT(false);
}
} }
} }
} }
@ -1839,13 +1837,13 @@ DatabaseContext::DatabaseContext(const Error& err)
ccBG("BlobGranuleReadMetrics"), bgReadInputBytes("BGReadInputBytes", ccBG), ccBG("BlobGranuleReadMetrics"), bgReadInputBytes("BGReadInputBytes", ccBG),
bgReadOutputBytes("BGReadOutputBytes", ccBG), bgReadSnapshotRows("BGReadSnapshotRows", ccBG), bgReadOutputBytes("BGReadOutputBytes", ccBG), bgReadSnapshotRows("BGReadSnapshotRows", ccBG),
bgReadRowsCleared("BGReadRowsCleared", ccBG), bgReadRowsInserted("BGReadRowsInserted", ccBG), bgReadRowsCleared("BGReadRowsCleared", ccBG), bgReadRowsInserted("BGReadRowsInserted", ccBG),
bgReadRowsUpdated("BGReadRowsUpdated", ccBG), bgLatencies(1000), bgGranulesPerRequest(1000), bgReadRowsUpdated("BGReadRowsUpdated", ccBG), bgLatencies(), bgGranulesPerRequest(), usedAnyChangeFeeds(false),
usedAnyChangeFeeds(false), ccFeed("ChangeFeedClientMetrics"), feedStreamStarts("FeedStreamStarts", ccFeed), ccFeed("ChangeFeedClientMetrics"), feedStreamStarts("FeedStreamStarts", ccFeed),
feedMergeStreamStarts("FeedMergeStreamStarts", ccFeed), feedErrors("FeedErrors", ccFeed), feedMergeStreamStarts("FeedMergeStreamStarts", ccFeed), feedErrors("FeedErrors", ccFeed),
feedNonRetriableErrors("FeedNonRetriableErrors", ccFeed), feedPops("FeedPops", ccFeed), feedNonRetriableErrors("FeedNonRetriableErrors", ccFeed), feedPops("FeedPops", ccFeed),
feedPopsFallback("FeedPopsFallback", ccFeed), latencies(1000), readLatencies(1000), commitLatencies(1000), feedPopsFallback("FeedPopsFallback", ccFeed), latencies(), readLatencies(), commitLatencies(), GRVLatencies(),
GRVLatencies(1000), mutationsPerCommit(1000), bytesPerCommit(1000), sharedStatePtr(nullptr), mutationsPerCommit(), bytesPerCommit(), sharedStatePtr(nullptr), transactionTracingSample(false),
transactionTracingSample(false), smoothMidShardSize(CLIENT_KNOBS->SHARD_STAT_SMOOTH_AMOUNT), smoothMidShardSize(CLIENT_KNOBS->SHARD_STAT_SMOOTH_AMOUNT),
connectToDatabaseEventCacheHolder(format("ConnectToDatabase/%s", dbId.toString().c_str())) {} connectToDatabaseEventCacheHolder(format("ConnectToDatabase/%s", dbId.toString().c_str())) {}
// Static constructor used by server processes to create a DatabaseContext // Static constructor used by server processes to create a DatabaseContext
@ -3423,9 +3421,7 @@ ACTOR Future<Optional<Value>> getValue(Reference<TransactionState> trState,
std::vector<Error>{ transaction_too_old(), future_version() }); std::vector<Error>{ transaction_too_old(), future_version() });
} }
choose { choose {
when(wait(trState->cx->connectionFileChanged())) { when(wait(trState->cx->connectionFileChanged())) { throw transaction_too_old(); }
throw transaction_too_old();
}
when(GetValueReply _reply = wait(loadBalance( when(GetValueReply _reply = wait(loadBalance(
trState->cx.getPtr(), trState->cx.getPtr(),
locationInfo.locations, locationInfo.locations,
@ -3572,9 +3568,7 @@ ACTOR Future<Key> getKey(Reference<TransactionState> trState,
state GetKeyReply reply; state GetKeyReply reply;
try { try {
choose { choose {
when(wait(trState->cx->connectionFileChanged())) { when(wait(trState->cx->connectionFileChanged())) { throw transaction_too_old(); }
throw transaction_too_old();
}
when(GetKeyReply _reply = wait(loadBalance( when(GetKeyReply _reply = wait(loadBalance(
trState->cx.getPtr(), trState->cx.getPtr(),
locationInfo.locations, locationInfo.locations,
@ -3738,9 +3732,7 @@ ACTOR Future<Version> watchValue(Database cx, Reference<const WatchParameters> p
TaskPriority::DefaultPromiseEndpoint))) { TaskPriority::DefaultPromiseEndpoint))) {
resp = r; resp = r;
} }
when(wait(cx->connectionRecord ? cx->connectionRecord->onChange() : Never())) { when(wait(cx->connectionRecord ? cx->connectionRecord->onChange() : Never())) { wait(Never()); }
wait(Never());
}
} }
if (watchValueID.present()) { if (watchValueID.present()) {
g_traceBatch.addEvent("WatchValueDebug", watchValueID.get().first(), "NativeAPI.watchValue.After"); g_traceBatch.addEvent("WatchValueDebug", watchValueID.get().first(), "NativeAPI.watchValue.After");
@ -4059,9 +4051,7 @@ Future<RangeResultFamily> getExactRange(Reference<TransactionState> trState,
state GetKeyValuesFamilyReply rep; state GetKeyValuesFamilyReply rep;
try { try {
choose { choose {
when(wait(trState->cx->connectionFileChanged())) { when(wait(trState->cx->connectionFileChanged())) { throw transaction_too_old(); }
throw transaction_too_old();
}
when(GetKeyValuesFamilyReply _rep = wait(loadBalance( when(GetKeyValuesFamilyReply _rep = wait(loadBalance(
trState->cx.getPtr(), trState->cx.getPtr(),
locations[shard].locations, locations[shard].locations,
@ -4960,9 +4950,7 @@ ACTOR Future<Void> getRangeStreamFragment(Reference<TransactionState> trState,
return Void(); return Void();
} }
when(GetKeyValuesStreamReply _rep = waitNext(replyStream.getFuture())) { when(GetKeyValuesStreamReply _rep = waitNext(replyStream.getFuture())) { rep = _rep; }
rep = _rep;
}
} }
++trState->cx->transactionPhysicalReadsCompleted; ++trState->cx->transactionPhysicalReadsCompleted;
} catch (Error& e) { } catch (Error& e) {
@ -5455,9 +5443,7 @@ ACTOR Future<Void> watch(Reference<Watch> watch,
loop { loop {
choose { choose {
// NativeAPI watchValue future finishes or errors // NativeAPI watchValue future finishes or errors
when(wait(watch->watchFuture)) { when(wait(watch->watchFuture)) { break; }
break;
}
when(wait(cx->connectionFileChanged())) { when(wait(cx->connectionFileChanged())) {
CODE_PROBE(true, "Recreated a watch after switch"); CODE_PROBE(true, "Recreated a watch after switch");
@ -7042,9 +7028,7 @@ ACTOR Future<GetReadVersionReply> getConsistentReadVersion(SpanContext parentSpa
state Future<Void> onProxiesChanged = cx->onProxiesChanged(); state Future<Void> onProxiesChanged = cx->onProxiesChanged();
choose { choose {
when(wait(onProxiesChanged)) { when(wait(onProxiesChanged)) { onProxiesChanged = cx->onProxiesChanged(); }
onProxiesChanged = cx->onProxiesChanged();
}
when(GetReadVersionReply v = when(GetReadVersionReply v =
wait(basicLoadBalance(cx->getGrvProxies(UseProvisionalProxies( wait(basicLoadBalance(cx->getGrvProxies(UseProvisionalProxies(
flags & GetReadVersionRequest::FLAG_USE_PROVISIONAL_PROXIES)), flags & GetReadVersionRequest::FLAG_USE_PROVISIONAL_PROXIES)),
@ -7470,9 +7454,7 @@ ACTOR Future<ProtocolVersion> getClusterProtocolImpl(
needToConnect = false; needToConnect = false;
} }
choose { choose {
when(wait(coordinator->onChange())) { when(wait(coordinator->onChange())) { needToConnect = true; }
needToConnect = true;
}
when(ProtocolVersion pv = wait(protocolVersion)) { when(ProtocolVersion pv = wait(protocolVersion)) {
if (!expectedVersion.present() || expectedVersion.get() != pv) { if (!expectedVersion.present() || expectedVersion.get() != pv) {
@ -9032,12 +9014,8 @@ ACTOR Future<std::vector<CheckpointMetaData>> getCheckpointMetaData(Database cx,
} }
choose { choose {
when(wait(cx->connectionFileChanged())) { when(wait(cx->connectionFileChanged())) { cx->invalidateCache(KeyRef(), keys); }
cx->invalidateCache(KeyRef(), keys); when(wait(waitForAll(futures))) { break; }
}
when(wait(waitForAll(futures))) {
break;
}
when(wait(delay(timeout))) { when(wait(delay(timeout))) {
TraceEvent("GetCheckpointTimeout").detail("Range", keys).detail("Version", version); TraceEvent("GetCheckpointTimeout").detail("Range", keys).detail("Version", version);
} }
@ -9684,12 +9662,8 @@ ACTOR Future<Void> changeFeedWhenAtLatest(Reference<ChangeFeedData> self, Versio
// only allowed to use empty versions if you're caught up // only allowed to use empty versions if you're caught up
Future<Void> waitEmptyVersion = (self->notAtLatest.get() == 0) ? changeFeedWaitLatest(self, version) : Never(); Future<Void> waitEmptyVersion = (self->notAtLatest.get() == 0) ? changeFeedWaitLatest(self, version) : Never();
choose { choose {
when(wait(waitEmptyVersion)) { when(wait(waitEmptyVersion)) { break; }
break; when(wait(lastReturned)) { break; }
}
when(wait(lastReturned)) {
break;
}
when(wait(self->refresh.getFuture())) {} when(wait(self->refresh.getFuture())) {}
when(wait(self->notAtLatest.onChange())) {} when(wait(self->notAtLatest.onChange())) {}
} }

View File

@ -96,7 +96,12 @@ public:
try { try {
buckets.at(index)++; buckets.at(index)++;
} catch (std::out_of_range const& e) { } catch (std::out_of_range const& e) {
fmt::print(stderr, "ERROR: Invalid DDSketch bucket index ({}) at {}/{} for sample: {}\n", e.what(), index, buckets.size(), sample); fmt::print(stderr,
"ERROR: Invalid DDSketch bucket index ({}) at {}/{} for sample: {}\n",
e.what(),
index,
buckets.size(),
sample);
} }
} }
@ -158,7 +163,8 @@ public:
} }
} }
ASSERT(found); ASSERT(found);
if (!found) return -1; if (!found)
return -1;
return static_cast<Impl*>(this)->getValue(index); return static_cast<Impl*>(this)->getValue(index);
} }
@ -213,6 +219,7 @@ public:
multiplier(fastLogger::correctingFactor * log(2) / log(gamma)) { multiplier(fastLogger::correctingFactor * log(2) / log(gamma)) {
ASSERT(errorGuarantee > 0); ASSERT(errorGuarantee > 0);
offset = getIndex(1.0 / DDSketchBase<DDSketch<T>, T>::EPS); offset = getIndex(1.0 / DDSketchBase<DDSketch<T>, T>::EPS);
ASSERT(offset > 0);
this->setBucketSize(2 * offset); this->setBucketSize(2 * offset);
} }

View File

@ -105,9 +105,9 @@ struct TSSMetrics : ReferenceCounted<TSSMetrics>, NonCopyable {
TSSMetrics() TSSMetrics()
: cc("TSSClientMetrics"), requests("Requests", cc), streamComparisons("StreamComparisons", cc), : cc("TSSClientMetrics"), requests("Requests", cc), streamComparisons("StreamComparisons", cc),
ssErrors("SSErrors", cc), tssErrors("TSSErrors", cc), tssTimeouts("TSSTimeouts", cc), ssErrors("SSErrors", cc), tssErrors("TSSErrors", cc), tssTimeouts("TSSTimeouts", cc),
mismatches("Mismatches", cc), SSgetValueLatency(1000), SSgetKeyLatency(1000), SSgetKeyValuesLatency(1000), mismatches("Mismatches", cc), SSgetValueLatency(), SSgetKeyLatency(), SSgetKeyValuesLatency(),
SSgetMappedKeyValuesLatency(1000), TSSgetValueLatency(1000), TSSgetKeyLatency(1000), SSgetMappedKeyValuesLatency(), TSSgetValueLatency(), TSSgetKeyLatency(), TSSgetKeyValuesLatency(),
TSSgetKeyValuesLatency(1000), TSSgetMappedKeyValuesLatency(1000) {} TSSgetMappedKeyValuesLatency() {}
}; };
template <class Rep> template <class Rep>

View File

@ -47,7 +47,7 @@ DESCR struct ReadMetric {
// Common ReadWrite test settings // Common ReadWrite test settings
struct ReadWriteCommon : KVWorkload { struct ReadWriteCommon : KVWorkload {
static constexpr int sampleSize = 10000; static constexpr double sampleError = 0.01;
friend struct ReadWriteCommonImpl; friend struct ReadWriteCommonImpl;
// general test setting // general test setting
@ -88,9 +88,9 @@ struct ReadWriteCommon : KVWorkload {
explicit ReadWriteCommon(WorkloadContext const& wcx) explicit ReadWriteCommon(WorkloadContext const& wcx)
: KVWorkload(wcx), totalReadsMetric("ReadWrite.TotalReads"_sr), totalRetriesMetric("ReadWrite.TotalRetries"_sr), : KVWorkload(wcx), totalReadsMetric("ReadWrite.TotalReads"_sr), totalRetriesMetric("ReadWrite.TotalRetries"_sr),
aTransactions("A Transactions"), bTransactions("B Transactions"), retries("Retries"), latencies(sampleSize), aTransactions("A Transactions"), bTransactions("B Transactions"), retries("Retries"), latencies(sampleError),
readLatencies(sampleSize), commitLatencies(sampleSize), GRVLatencies(sampleSize), fullReadLatencies(sampleSize), readLatencies(sampleError), commitLatencies(sampleError), GRVLatencies(sampleError),
readLatencyTotal(0), readLatencyCount(0), loadTime(0.0), clientBegin(0) { fullReadLatencies(sampleError), readLatencyTotal(0), readLatencyCount(0), loadTime(0.0), clientBegin(0) {
transactionSuccessMetric.init("ReadWrite.SuccessfulTransaction"_sr); transactionSuccessMetric.init("ReadWrite.SuccessfulTransaction"_sr);
transactionFailureMetric.init("ReadWrite.FailedTransaction"_sr); transactionFailureMetric.init("ReadWrite.FailedTransaction"_sr);