Update rare code probe annotations
This commit is contained in:
parent
95cf6f9a0f
commit
c03f60c618
|
@ -965,7 +965,8 @@ ACTOR Future<MonitorLeaderInfo> monitorProxiesOneGeneration(
|
|||
allConnectionsFailed = false;
|
||||
} else {
|
||||
CODE_PROBE(rep.getError().code() == error_code_failed_to_progress,
|
||||
"Coordinator cant talk to cluster controller");
|
||||
"Coordinator cant talk to cluster controller",
|
||||
probe::decoration::rare);
|
||||
TraceEvent("MonitorProxiesConnectFailed")
|
||||
.detail("Error", rep.getError().name())
|
||||
.detail("Coordinator", clientLeaderServer.getAddressString());
|
||||
|
|
|
@ -4771,7 +4771,8 @@ static Future<Void> tssStreamComparison(Request request,
|
|||
TSS_traceMismatch(mismatchEvent, request, ssReply.get(), tssReply.get());
|
||||
|
||||
CODE_PROBE(FLOW_KNOBS->LOAD_BALANCE_TSS_MISMATCH_TRACE_FULL,
|
||||
"Tracing Full TSS Mismatch in stream comparison");
|
||||
"Tracing Full TSS Mismatch in stream comparison",
|
||||
probe::decoration::rare);
|
||||
CODE_PROBE(!FLOW_KNOBS->LOAD_BALANCE_TSS_MISMATCH_TRACE_FULL,
|
||||
"Tracing Partial TSS Mismatch in stream comparison and storing the rest in FDB");
|
||||
|
||||
|
@ -4813,7 +4814,7 @@ maybeDuplicateTSSStreamFragment(Request& req, QueueModel* model, RequestStream<R
|
|||
Optional<TSSEndpointData> tssData = model->getTssData(ssStream->getEndpoint().token.first());
|
||||
|
||||
if (tssData.present()) {
|
||||
CODE_PROBE(true, "duplicating stream to TSS");
|
||||
CODE_PROBE(true, "duplicating stream to TSS", probe::decoration::rare);
|
||||
resetReply(req);
|
||||
// FIXME: optimize to avoid creating new netNotifiedQueueWithAcknowledgements for each stream duplication
|
||||
RequestStream<Request> tssRequestStream(tssData.get().endpoint);
|
||||
|
@ -9406,7 +9407,8 @@ void handleTSSChangeFeedMismatch(const ChangeFeedStreamRequest& request,
|
|||
mismatchEvent.detail("TSSVersion", tssVersion);
|
||||
|
||||
CODE_PROBE(FLOW_KNOBS->LOAD_BALANCE_TSS_MISMATCH_TRACE_FULL,
|
||||
"Tracing Full TSS Feed Mismatch in stream comparison");
|
||||
"Tracing Full TSS Feed Mismatch in stream comparison",
|
||||
probe::decoration::rare);
|
||||
CODE_PROBE(!FLOW_KNOBS->LOAD_BALANCE_TSS_MISMATCH_TRACE_FULL,
|
||||
"Tracing Partial TSS Feed Mismatch in stream comparison and storing the rest in FDB");
|
||||
|
||||
|
|
|
@ -1654,7 +1654,7 @@ Future<RangeResult> ReadYourWritesTransaction::getRange(KeySelector begin,
|
|||
|
||||
// This optimization prevents nullptr operations from being added to the conflict range
|
||||
if (limits.isReached()) {
|
||||
CODE_PROBE(true, "RYW range read limit 0", probe::decoration::rare);
|
||||
CODE_PROBE(true, "RYW range read limit 0");
|
||||
return RangeResult();
|
||||
}
|
||||
|
||||
|
@ -1668,7 +1668,7 @@ Future<RangeResult> ReadYourWritesTransaction::getRange(KeySelector begin,
|
|||
end.removeOrEqual(end.arena());
|
||||
|
||||
if (begin.offset >= end.offset && begin.getKey() >= end.getKey()) {
|
||||
CODE_PROBE(true, "RYW range inverted", probe::decoration::rare);
|
||||
CODE_PROBE(true, "RYW range inverted");
|
||||
return RangeResult();
|
||||
}
|
||||
|
||||
|
@ -1698,7 +1698,7 @@ Future<MappedRangeResult> ReadYourWritesTransaction::getMappedRange(KeySelector
|
|||
if (getDatabase()->apiVersionAtLeast(630)) {
|
||||
if (specialKeys.contains(begin.getKey()) && specialKeys.begin <= end.getKey() &&
|
||||
end.getKey() <= specialKeys.end) {
|
||||
CODE_PROBE(true, "Special key space get range (getMappedRange)");
|
||||
CODE_PROBE(true, "Special key space get range (getMappedRange)", probe::decoration::rare);
|
||||
throw client_invalid_operation(); // Not support special keys.
|
||||
}
|
||||
} else {
|
||||
|
@ -1720,7 +1720,7 @@ Future<MappedRangeResult> ReadYourWritesTransaction::getMappedRange(KeySelector
|
|||
|
||||
// This optimization prevents nullptr operations from being added to the conflict range
|
||||
if (limits.isReached()) {
|
||||
CODE_PROBE(true, "RYW range read limit 0 (getMappedRange)");
|
||||
CODE_PROBE(true, "RYW range read limit 0 (getMappedRange)", probe::decoration::rare);
|
||||
return MappedRangeResult();
|
||||
}
|
||||
|
||||
|
@ -1734,7 +1734,7 @@ Future<MappedRangeResult> ReadYourWritesTransaction::getMappedRange(KeySelector
|
|||
end.removeOrEqual(end.arena());
|
||||
|
||||
if (begin.offset >= end.offset && begin.getKey() >= end.getKey()) {
|
||||
CODE_PROBE(true, "RYW range inverted (getMappedRange)");
|
||||
CODE_PROBE(true, "RYW range inverted (getMappedRange)", probe::decoration::rare);
|
||||
return MappedRangeResult();
|
||||
}
|
||||
|
||||
|
|
|
@ -69,7 +69,7 @@ TEST_CASE("/flow/buggifiedDelay") {
|
|||
});
|
||||
wait(f1 && f2);
|
||||
if (last == 1) {
|
||||
CODE_PROBE(true, "Delays can become ready out of order");
|
||||
CODE_PROBE(true, "Delays can become ready out of order", probe::decoration::rare);
|
||||
return Void();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -216,7 +216,7 @@ bool TokenCacheImpl::validateAndAdd(double currentTime, StringRef token, Network
|
|||
Arena arena;
|
||||
authz::jwt::TokenRef t;
|
||||
if (!authz::jwt::parseToken(arena, t, token)) {
|
||||
CODE_PROBE(true, "Token can't be parsed");
|
||||
CODE_PROBE(true, "Token can't be parsed", probe::decoration::rare);
|
||||
TraceEvent(SevWarn, "InvalidToken")
|
||||
.detail("From", peer)
|
||||
.detail("Reason", "ParseError")
|
||||
|
@ -225,35 +225,35 @@ bool TokenCacheImpl::validateAndAdd(double currentTime, StringRef token, Network
|
|||
}
|
||||
auto key = FlowTransport::transport().getPublicKeyByName(t.keyId);
|
||||
if (!key.present()) {
|
||||
CODE_PROBE(true, "Token referencing non-existing key");
|
||||
CODE_PROBE(true, "Token referencing non-existing key", probe::decoration::rare);
|
||||
TRACE_INVALID_PARSED_TOKEN("UnknownKey", t);
|
||||
return false;
|
||||
} else if (!t.issuedAtUnixTime.present()) {
|
||||
CODE_PROBE(true, "Token has no issued-at field");
|
||||
CODE_PROBE(true, "Token has no issued-at field", probe::decoration::rare);
|
||||
TRACE_INVALID_PARSED_TOKEN("NoIssuedAt", t);
|
||||
return false;
|
||||
} else if (!t.expiresAtUnixTime.present()) {
|
||||
CODE_PROBE(true, "Token has no expiration time");
|
||||
CODE_PROBE(true, "Token has no expiration time", probe::decoration::rare);
|
||||
TRACE_INVALID_PARSED_TOKEN("NoExpirationTime", t);
|
||||
return false;
|
||||
} else if (double(t.expiresAtUnixTime.get()) <= currentTime) {
|
||||
CODE_PROBE(true, "Expired token");
|
||||
CODE_PROBE(true, "Expired token", probe::decoration::rare);
|
||||
TRACE_INVALID_PARSED_TOKEN("Expired", t);
|
||||
return false;
|
||||
} else if (!t.notBeforeUnixTime.present()) {
|
||||
CODE_PROBE(true, "Token has no not-before field");
|
||||
CODE_PROBE(true, "Token has no not-before field", probe::decoration::rare);
|
||||
TRACE_INVALID_PARSED_TOKEN("NoNotBefore", t);
|
||||
return false;
|
||||
} else if (double(t.notBeforeUnixTime.get()) > currentTime) {
|
||||
CODE_PROBE(true, "Tokens not-before is in the future");
|
||||
CODE_PROBE(true, "Tokens not-before is in the future", probe::decoration::rare);
|
||||
TRACE_INVALID_PARSED_TOKEN("TokenNotYetValid", t);
|
||||
return false;
|
||||
} else if (!t.tenants.present()) {
|
||||
CODE_PROBE(true, "Token with no tenants");
|
||||
CODE_PROBE(true, "Token with no tenants", probe::decoration::rare);
|
||||
TRACE_INVALID_PARSED_TOKEN("NoTenants", t);
|
||||
return false;
|
||||
} else if (!authz::jwt::verifyToken(token, key.get())) {
|
||||
CODE_PROBE(true, "Token with invalid signature");
|
||||
CODE_PROBE(true, "Token with invalid signature", probe::decoration::rare);
|
||||
TRACE_INVALID_PARSED_TOKEN("InvalidSignature", t);
|
||||
return false;
|
||||
} else {
|
||||
|
@ -300,7 +300,7 @@ bool TokenCacheImpl::validate(TenantNameRef name, StringRef token) {
|
|||
}
|
||||
}
|
||||
if (!tenantFound) {
|
||||
CODE_PROBE(true, "Valid token doesn't reference tenant");
|
||||
CODE_PROBE(true, "Valid token doesn't reference tenant", probe::decoration::rare);
|
||||
TraceEvent(SevWarn, "TenantTokenMismatch").detail("From", peer).detail("Tenant", name.toString());
|
||||
return false;
|
||||
}
|
||||
|
|
|
@ -2361,7 +2361,7 @@ class UDPSimSocket : public IUDPSocket, ReferenceCounted<UDPSimSocket> {
|
|||
NetworkAddress _localAddress;
|
||||
bool randomDropPacket() {
|
||||
auto res = deterministicRandom()->random01() < .000001;
|
||||
CODE_PROBE(res, "UDP packet drop", probe::context::sim2, probe::assert::simOnly);
|
||||
CODE_PROBE(res, "UDP packet drop", probe::context::sim2, probe::assert::simOnly, probe::decoration::rare);
|
||||
return res;
|
||||
}
|
||||
|
||||
|
|
|
@ -654,7 +654,7 @@ private:
|
|||
TraceEvent("WriteRecoveryKeySet", dbgid).log();
|
||||
if (!initialCommit)
|
||||
txnStateStore->set(KeyValueRef(m.param1, m.param2));
|
||||
CODE_PROBE(true, "Snapshot created, setting writeRecoveryKey in txnStateStore");
|
||||
CODE_PROBE(true, "Snapshot created, setting writeRecoveryKey in txnStateStore", probe::decoration::rare);
|
||||
}
|
||||
|
||||
void checkSetTenantMapPrefix(MutationRef m) {
|
||||
|
|
|
@ -441,7 +441,7 @@ struct BlobManagerData : NonCopyable, ReferenceCounted<BlobManagerData> {
|
|||
// if this granule is not an active granule, it can't be merged
|
||||
auto gIt = workerAssignments.rangeContaining(range.begin);
|
||||
if (gIt->begin() != range.begin || gIt->end() != range.end) {
|
||||
CODE_PROBE(true, "non-active granule reported merge eligible, ignoring");
|
||||
CODE_PROBE(true, "non-active granule reported merge eligible, ignoring", probe::decoration::rare);
|
||||
if (BM_DEBUG) {
|
||||
fmt::print(
|
||||
"BM {0} Ignoring Merge Candidate [{1} - {2}): range mismatch with active granule [{3} - {4})\n",
|
||||
|
@ -1035,7 +1035,7 @@ static bool handleRangeIsAssign(Reference<BlobManagerData> bmData, RangeAssignme
|
|||
if (assignment.assign.get().type == AssignRequestType::Continue) {
|
||||
ASSERT(assignment.worker.present());
|
||||
if (i.range() != assignment.keyRange || i.cvalue() != assignment.worker.get()) {
|
||||
CODE_PROBE(true, "BM assignment out of date");
|
||||
CODE_PROBE(true, "BM assignment out of date", probe::decoration::rare);
|
||||
if (BM_DEBUG) {
|
||||
fmt::print("Out of date re-assign for ({0}, {1}). Assignment must have changed while "
|
||||
"checking split.\n Reassign: [{2} - {3}): {4}\n Existing: [{5} - {6}): {7}\n",
|
||||
|
@ -1602,10 +1602,10 @@ ACTOR Future<Void> reevaluateInitialSplit(Reference<BlobManagerData> bmData,
|
|||
if (retried && prevOwnerEpoch == bmData->epoch && prevGranuleID == granuleID &&
|
||||
prevOwnerSeqno == std::numeric_limits<int64_t>::max()) {
|
||||
// owner didn't change, last iteration of this transaction just succeeded but threw an error.
|
||||
CODE_PROBE(true, "split too big adjustment succeeded after retry");
|
||||
CODE_PROBE(true, "split too big adjustment succeeded after retry", probe::decoration::rare);
|
||||
break;
|
||||
}
|
||||
CODE_PROBE(true, "split too big was since moved to another worker");
|
||||
CODE_PROBE(true, "split too big was since moved to another worker", probe::decoration::rare);
|
||||
if (BM_DEBUG) {
|
||||
fmt::print("BM {0} re-evaluating initial split [{1} - {2}) too big: moved to another worker\n",
|
||||
bmData->epoch,
|
||||
|
@ -1839,7 +1839,7 @@ ACTOR Future<Void> maybeSplitRange(Reference<BlobManagerData> bmData,
|
|||
wait(checkManagerLock(tr, bmData));
|
||||
ForcedPurgeState purgeState = wait(getForcePurgedState(&tr->getTransaction(), granuleRange));
|
||||
if (purgeState != ForcedPurgeState::NonePurged) {
|
||||
CODE_PROBE(true, "Split stopped because of force purge");
|
||||
CODE_PROBE(true, "Split stopped because of force purge", probe::decoration::rare);
|
||||
TraceEvent("GranuleSplitCancelledForcePurge", bmData->id)
|
||||
.detail("Epoch", bmData->epoch)
|
||||
.detail("GranuleRange", granuleRange);
|
||||
|
@ -2635,7 +2635,9 @@ ACTOR Future<Void> attemptMerges(Reference<BlobManagerData> bmData,
|
|||
currentBytes + metrics.bytes > SERVER_KNOBS->BG_SNAPSHOT_FILE_TARGET_BYTES ||
|
||||
currentKeySumBytes >= CLIENT_KNOBS->VALUE_SIZE_LIMIT / 2) {
|
||||
ASSERT(currentBytes <= SERVER_KNOBS->BG_SNAPSHOT_FILE_TARGET_BYTES);
|
||||
CODE_PROBE(currentKeySumBytes >= CLIENT_KNOBS->VALUE_SIZE_LIMIT / 2, "merge early because of key size");
|
||||
CODE_PROBE(currentKeySumBytes >= CLIENT_KNOBS->VALUE_SIZE_LIMIT / 2,
|
||||
"merge early because of key size",
|
||||
probe::decoration::rare);
|
||||
attemptStartMerge(bmData, currentCandidates);
|
||||
currentCandidates.clear();
|
||||
currentBytes = 0;
|
||||
|
@ -3254,7 +3256,7 @@ static void addAssignment(KeyRangeMap<std::tuple<UID, int64_t, int64_t>>& map,
|
|||
if (oldEpoch > newEpoch || (oldEpoch == newEpoch && oldSeqno > newSeqno)) {
|
||||
newer.push_back(std::pair(old.range(), std::tuple(oldWorker, oldEpoch, oldSeqno)));
|
||||
if (old.range() != newRange) {
|
||||
CODE_PROBE(true, "BM Recovery: BWs disagree on range boundaries");
|
||||
CODE_PROBE(true, "BM Recovery: BWs disagree on range boundaries", probe::decoration::rare);
|
||||
anyConflicts = true;
|
||||
}
|
||||
} else {
|
||||
|
@ -3288,7 +3290,8 @@ static void addAssignment(KeyRangeMap<std::tuple<UID, int64_t, int64_t>>& map,
|
|||
std::get<0>(old.value()) = UID();
|
||||
}
|
||||
if (outOfDate.empty() || outOfDate.back() != std::pair(oldWorker, KeyRange(old.range()))) {
|
||||
CODE_PROBE(true, "BM Recovery: Two workers claim ownership of same granule");
|
||||
CODE_PROBE(
|
||||
true, "BM Recovery: Two workers claim ownership of same granule", probe::decoration::rare);
|
||||
outOfDate.push_back(std::pair(oldWorker, old.range()));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2283,7 +2283,7 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
|
|||
// popped up to V+1 is ok. Or in other words, if the last delta @ V, we only missed data
|
||||
// at V+1 onward if popVersion >= V+2
|
||||
if (metadata->bufferedDeltaVersion < metadata->activeCFData.get()->popVersion - 1) {
|
||||
CODE_PROBE(true, "Blob Worker detected popped");
|
||||
CODE_PROBE(true, "Blob Worker detected popped", probe::decoration::rare);
|
||||
TraceEvent("BlobWorkerChangeFeedPopped", bwData->id)
|
||||
.detail("Granule", metadata->keyRange)
|
||||
.detail("GranuleID", startState.granuleID)
|
||||
|
@ -3987,7 +3987,7 @@ ACTOR Future<GranuleStartState> openGranule(Reference<BlobWorkerData> bwData, As
|
|||
|
||||
ForcedPurgeState purgeState = wait(fForcedPurgeState);
|
||||
if (purgeState != ForcedPurgeState::NonePurged) {
|
||||
CODE_PROBE(true, "Worker trying to open force purged granule");
|
||||
CODE_PROBE(true, "Worker trying to open force purged granule", probe::decoration::rare);
|
||||
if (BW_DEBUG) {
|
||||
fmt::print("Granule [{0} - {1}) is force purged on BW {2}, abandoning\n",
|
||||
req.keyRange.begin.printable(),
|
||||
|
|
|
@ -1310,7 +1310,7 @@ ACTOR Future<WriteMutationRefVar> writeMutationFetchEncryptKey(CommitBatchContex
|
|||
wait(getLatestEncryptCipherKey(self->pProxyCommitData->db, domainId, p.first, BlobCipherMetrics::TLOG));
|
||||
self->cipherKeys[domainId] = cipherKey;
|
||||
|
||||
CODE_PROBE(true, "Raw access mutation encryption");
|
||||
CODE_PROBE(true, "Raw access mutation encryption", probe::decoration::rare);
|
||||
ASSERT_NE(domainId, INVALID_ENCRYPT_DOMAIN_ID);
|
||||
encryptedMutation = mutation->encrypt(self->cipherKeys, domainId, *arena, BlobCipherMetrics::TLOG);
|
||||
self->toCommit.writeTypedMessage(encryptedMutation);
|
||||
|
|
|
@ -316,7 +316,7 @@ class ConfigNodeImpl {
|
|||
ACTOR static Future<Void> getConfigClasses(ConfigNodeImpl* self, ConfigTransactionGetConfigClassesRequest req) {
|
||||
state Optional<CoordinatorsHash> locked = wait(getLocked(self));
|
||||
if (locked.present()) {
|
||||
CODE_PROBE(true, "attempting to read config classes from locked ConfigNode");
|
||||
CODE_PROBE(true, "attempting to read config classes from locked ConfigNode", probe::decoration::rare);
|
||||
req.reply.sendError(coordinators_changed());
|
||||
return Void();
|
||||
}
|
||||
|
@ -360,7 +360,7 @@ class ConfigNodeImpl {
|
|||
ACTOR static Future<Void> getKnobs(ConfigNodeImpl* self, ConfigTransactionGetKnobsRequest req) {
|
||||
state Optional<CoordinatorsHash> locked = wait(getLocked(self));
|
||||
if (locked.present()) {
|
||||
CODE_PROBE(true, "attempting to read knobs from locked ConfigNode");
|
||||
CODE_PROBE(true, "attempting to read knobs from locked ConfigNode", probe::decoration::rare);
|
||||
req.reply.sendError(coordinators_changed());
|
||||
return Void();
|
||||
}
|
||||
|
|
|
@ -623,7 +623,9 @@ std::vector<RangeToSplit> findTenantShardBoundaries(KeyRangeMap<ShardTrackedData
|
|||
result.emplace_back(shardContainingTenantEnd, faultLines);
|
||||
}
|
||||
} else {
|
||||
CODE_PROBE(true, "Shards that contain tenant key range not split since shard stats are unavailable");
|
||||
CODE_PROBE(true,
|
||||
"Shards that contain tenant key range not split since shard stats are unavailable",
|
||||
probe::decoration::rare);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1358,7 +1360,7 @@ ACTOR Future<Void> fetchTopKShardMetrics(DataDistributionTracker* self, GetTopKM
|
|||
when(wait(g_network->isSimulated() && BUGGIFY_WITH_PROB(0.01) ? Never()
|
||||
: fetchTopKShardMetrics_impl(self, req))) {}
|
||||
when(wait(delay(SERVER_KNOBS->DD_SHARD_METRICS_TIMEOUT))) {
|
||||
CODE_PROBE(true, "TopK DD_SHARD_METRICS_TIMEOUT", probe::decoration::rare);
|
||||
CODE_PROBE(true, "TopK DD_SHARD_METRICS_TIMEOUT");
|
||||
req.reply.send(GetTopKMetricsReply());
|
||||
}
|
||||
}
|
||||
|
@ -2087,4 +2089,4 @@ TEST_CASE("/DataDistributor/Tracker/FetchTopK") {
|
|||
ASSERT(reply.minReadLoad == -1);
|
||||
|
||||
return Void();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1538,14 +1538,18 @@ ACTOR Future<Void> dataDistributor(DataDistributorInterface di, Reference<AsyncV
|
|||
when(DistributorSnapRequest snapReq = waitNext(di.distributorSnapReq.getFuture())) {
|
||||
auto& snapUID = snapReq.snapUID;
|
||||
if (ddSnapReqResultMap.count(snapUID)) {
|
||||
CODE_PROBE(true, "Data distributor received a duplicate finished snapshot request");
|
||||
CODE_PROBE(true,
|
||||
"Data distributor received a duplicate finished snapshot request",
|
||||
probe::decoration::rare);
|
||||
auto result = ddSnapReqResultMap[snapUID];
|
||||
result.isError() ? snapReq.reply.sendError(result.getError()) : snapReq.reply.send(result.get());
|
||||
TraceEvent("RetryFinishedDistributorSnapRequest")
|
||||
.detail("SnapUID", snapUID)
|
||||
.detail("Result", result.isError() ? result.getError().code() : 0);
|
||||
} else if (ddSnapReqMap.count(snapReq.snapUID)) {
|
||||
CODE_PROBE(true, "Data distributor received a duplicate ongoing snapshot request");
|
||||
CODE_PROBE(true,
|
||||
"Data distributor received a duplicate ongoing snapshot request",
|
||||
probe::decoration::rare);
|
||||
TraceEvent("RetryOngoingDistributorSnapRequest").detail("SnapUID", snapUID);
|
||||
ASSERT(snapReq.snapPayload == ddSnapReqMap[snapUID].snapPayload);
|
||||
ddSnapReqMap[snapUID] = snapReq;
|
||||
|
|
|
@ -49,7 +49,7 @@ bool GrvProxyTagThrottler::TagQueue::isMaxThrottled(double maxThrottleDuration)
|
|||
}
|
||||
|
||||
void GrvProxyTagThrottler::TagQueue::rejectRequests(LatencyBandsMap& latencyBandsMap) {
|
||||
CODE_PROBE(true, "GrvProxyTagThrottler rejecting requests");
|
||||
CODE_PROBE(true, "GrvProxyTagThrottler rejecting requests", probe::decoration::rare);
|
||||
while (!requests.empty()) {
|
||||
auto& delayedReq = requests.front();
|
||||
delayedReq.updateProxyTagThrottledDuration(latencyBandsMap);
|
||||
|
|
|
@ -740,7 +740,8 @@ private:
|
|||
}
|
||||
|
||||
CODE_PROBE(self->enableEncryption && self->uncommittedBytes() > 0,
|
||||
"KeyValueStoreMemory recovered partial transaction while encryption-at-rest is enabled");
|
||||
"KeyValueStoreMemory recovered partial transaction while encryption-at-rest is enabled",
|
||||
probe::decoration::rare);
|
||||
self->semiCommit();
|
||||
|
||||
return Void();
|
||||
|
|
|
@ -131,16 +131,16 @@ void MockStorageServer::setShardStatus(KeyRangeRef range, MockShardStatus status
|
|||
auto ranges = serverKeys.intersectingRanges(range);
|
||||
ASSERT(!ranges.empty());
|
||||
if (ranges.begin().range().contains(range)) {
|
||||
CODE_PROBE(true, "Implicitly split single shard to 3 pieces");
|
||||
CODE_PROBE(true, "Implicitly split single shard to 3 pieces", probe::decoration::rare);
|
||||
threeWayShardSplitting(ranges.begin().range(), range, ranges.begin().cvalue().shardSize, restrictSize);
|
||||
return;
|
||||
}
|
||||
if (ranges.begin().begin() < range.begin) {
|
||||
CODE_PROBE(true, "Implicitly split begin range to 2 pieces");
|
||||
CODE_PROBE(true, "Implicitly split begin range to 2 pieces", probe::decoration::rare);
|
||||
twoWayShardSplitting(ranges.begin().range(), range.begin, ranges.begin().cvalue().shardSize, restrictSize);
|
||||
}
|
||||
if (ranges.end().end() > range.end) {
|
||||
CODE_PROBE(true, "Implicitly split end range to 2 pieces");
|
||||
CODE_PROBE(true, "Implicitly split end range to 2 pieces", probe::decoration::rare);
|
||||
twoWayShardSplitting(ranges.end().range(), range.end, ranges.end().cvalue().shardSize, restrictSize);
|
||||
}
|
||||
ranges = serverKeys.containedRanges(range);
|
||||
|
@ -156,7 +156,7 @@ void MockStorageServer::setShardStatus(KeyRangeRef range, MockShardStatus status
|
|||
if (isStatusTransitionValid(oldStatus, status)) {
|
||||
it.value() = ShardInfo{ status, newSize };
|
||||
} else if (oldStatus == MockShardStatus::COMPLETED && status == MockShardStatus::INFLIGHT) {
|
||||
CODE_PROBE(true, "Shard already on server");
|
||||
CODE_PROBE(true, "Shard already on server", probe::decoration::rare);
|
||||
} else {
|
||||
TraceEvent(SevError, "MockShardStatusTransitionError")
|
||||
.detail("From", oldStatus)
|
||||
|
@ -382,7 +382,7 @@ Future<std::vector<KeyRangeLocationInfo>> MockGlobalState::getKeyRangeLocations(
|
|||
ASSERT_EQ(srcTeam.size(), 1);
|
||||
rep.results.emplace_back(it->range(), extractStorageServerInterfaces(srcTeam.front().servers));
|
||||
}
|
||||
CODE_PROBE(it != ranges.end(), "getKeyRangeLocations is limited", probe::decoration::rare);
|
||||
CODE_PROBE(it != ranges.end(), "getKeyRangeLocations is limited");
|
||||
|
||||
std::vector<KeyRangeLocationInfo> results;
|
||||
for (int shard = 0; shard < rep.results.size(); shard++) {
|
||||
|
|
|
@ -196,7 +196,7 @@ private:
|
|||
|
||||
Standalone<StringRef> e = wait(self->queue->readNext(payloadSize + 1));
|
||||
if (e.size() != payloadSize + 1) {
|
||||
CODE_PROBE(true, "Zero fill within payload");
|
||||
CODE_PROBE(true, "Zero fill within payload", probe::decoration::rare);
|
||||
zeroFillSize = payloadSize + 1 - e.size();
|
||||
break;
|
||||
}
|
||||
|
@ -210,7 +210,7 @@ private:
|
|||
}
|
||||
}
|
||||
if (zeroFillSize) {
|
||||
CODE_PROBE(true, "Fixing a partial commit at the end of the tlog queue");
|
||||
CODE_PROBE(true, "Fixing a partial commit at the end of the tlog queue", probe::decoration::rare);
|
||||
for (int i = 0; i < zeroFillSize; i++)
|
||||
self->queue->push(StringRef((const uint8_t*)"", 1));
|
||||
}
|
||||
|
|
|
@ -170,7 +170,7 @@ private:
|
|||
|
||||
Standalone<StringRef> e = wait(self->queue->readNext(payloadSize + 1));
|
||||
if (e.size() != payloadSize + 1) {
|
||||
CODE_PROBE(true, "Zero fill within payload");
|
||||
CODE_PROBE(true, "Zero fill within payload", probe::decoration::rare);
|
||||
zeroFillSize = payloadSize + 1 - e.size();
|
||||
break;
|
||||
}
|
||||
|
@ -186,7 +186,7 @@ private:
|
|||
}
|
||||
}
|
||||
if (zeroFillSize) {
|
||||
CODE_PROBE(true, "Fixing a partial commit at the end of the tlog queue");
|
||||
CODE_PROBE(true, "Fixing a partial commit at the end of the tlog queue", probe::decoration::rare);
|
||||
for (int i = 0; i < zeroFillSize; i++)
|
||||
self->queue->push(StringRef((const uint8_t*)"", 1));
|
||||
}
|
||||
|
|
|
@ -368,7 +368,7 @@ ACTOR Future<Void> resolveBatch(Reference<Resolver> self,
|
|||
isEncryptionOpSupported(EncryptOperationType::TLOG_ENCRYPTION) ? &cipherKeys
|
||||
: nullptr);
|
||||
}
|
||||
CODE_PROBE(self->forceRecovery, "Resolver detects forced recovery");
|
||||
CODE_PROBE(self->forceRecovery, "Resolver detects forced recovery", probe::decoration::rare);
|
||||
}
|
||||
|
||||
self->resolvedStateTransactions += req.txnStateTransactions.size();
|
||||
|
|
|
@ -172,7 +172,7 @@ private:
|
|||
|
||||
Standalone<StringRef> e = wait(self->queue->readNext(payloadSize + 1));
|
||||
if (e.size() != payloadSize + 1) {
|
||||
CODE_PROBE(true, "Zero fill within payload");
|
||||
CODE_PROBE(true, "Zero fill within payload", probe::decoration::rare);
|
||||
zeroFillSize = payloadSize + 1 - e.size();
|
||||
break;
|
||||
}
|
||||
|
@ -188,7 +188,7 @@ private:
|
|||
}
|
||||
}
|
||||
if (zeroFillSize) {
|
||||
CODE_PROBE(true, "Fixing a partial commit at the end of the tlog queue");
|
||||
CODE_PROBE(true, "Fixing a partial commit at the end of the tlog queue", probe::decoration::rare);
|
||||
for (int i = 0; i < zeroFillSize; i++)
|
||||
self->queue->push(StringRef((const uint8_t*)"", 1));
|
||||
}
|
||||
|
@ -1262,7 +1262,7 @@ ACTOR Future<Void> processPopRequests(TLogData* self, Reference<LogData> logData
|
|||
TraceEvent("PlayIgnoredPop", logData->logId).detail("Tag", tag.toString()).detail("Version", version);
|
||||
ignoredPops.push_back(tLogPopCore(self, tag, version, logData));
|
||||
if (++ignoredPopsPlayed % SERVER_KNOBS->TLOG_POP_BATCH_SIZE == 0) {
|
||||
CODE_PROBE(true, "Yielding while processing pop requests");
|
||||
CODE_PROBE(true, "Yielding while processing pop requests", probe::decoration::rare);
|
||||
wait(yield());
|
||||
}
|
||||
}
|
||||
|
@ -1857,7 +1857,8 @@ Future<Void> tLogPeekMessages(PromiseType replyPromise,
|
|||
}
|
||||
if (sequenceData.isSet()) {
|
||||
if (sequenceData.getFuture().get().first != rep.end) {
|
||||
CODE_PROBE(true, "tlog peek second attempt ended at a different version");
|
||||
CODE_PROBE(
|
||||
true, "tlog peek second attempt ended at a different version", probe::decoration::rare);
|
||||
replyPromise.sendError(operation_obsolete());
|
||||
return Void();
|
||||
}
|
||||
|
|
|
@ -189,7 +189,7 @@ Future<Void> serveStorageMetricsRequests(ServiceType* self, StorageServerInterfa
|
|||
choose {
|
||||
when(state WaitMetricsRequest req = waitNext(ssi.waitMetrics.getFuture())) {
|
||||
if (!req.tenantInfo.present() && !self->isReadable(req.keys)) {
|
||||
CODE_PROBE(true, "waitMetrics immediate wrong_shard_server()");
|
||||
CODE_PROBE(true, "waitMetrics immediate wrong_shard_server()", probe::decoration::rare);
|
||||
self->sendErrorWithPenalty(req.reply, wrong_shard_server(), self->getPenalty());
|
||||
} else {
|
||||
self->addActor(self->waitMetricsTenantAware(req));
|
||||
|
@ -231,4 +231,4 @@ Future<Void> serveStorageMetricsRequests(ServiceType* self, StorageServerInterfa
|
|||
}
|
||||
|
||||
#include "flow/unactorcompiler.h"
|
||||
#endif // FDBSERVER_STORAGEMETRICS_H
|
||||
#endif // FDBSERVER_STORAGEMETRICS_H
|
||||
|
|
|
@ -2331,7 +2331,7 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
|
|||
recruited.initEndpoints();
|
||||
if (blobMigratorInterf->get().present()) {
|
||||
recruited = blobMigratorInterf->get().get();
|
||||
CODE_PROBE(true, "Recruited while already a blob migrator.");
|
||||
CODE_PROBE(true, "Recruited while already a blob migrator.", probe::decoration::rare);
|
||||
} else {
|
||||
startRole(Role::BLOB_MIGRATOR, recruited.id(), interf.id());
|
||||
DUMPTOKEN(recruited.haltBlobMigrator);
|
||||
|
@ -2796,7 +2796,7 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
|
|||
when(state WorkerSnapRequest snapReq = waitNext(interf.workerSnapReq.getFuture())) {
|
||||
std::string snapReqKey = snapReq.snapUID.toString() + snapReq.role.toString();
|
||||
if (snapReqResultMap.count(snapReqKey)) {
|
||||
CODE_PROBE(true, "Worker received a duplicate finished snapshot request");
|
||||
CODE_PROBE(true, "Worker received a duplicate finished snapshot request", probe::decoration::rare);
|
||||
auto result = snapReqResultMap[snapReqKey];
|
||||
result.isError() ? snapReq.reply.sendError(result.getError()) : snapReq.reply.send(result.get());
|
||||
TraceEvent("RetryFinishedWorkerSnapRequest")
|
||||
|
@ -2804,7 +2804,7 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
|
|||
.detail("Role", snapReq.role)
|
||||
.detail("Result", result.isError() ? result.getError().code() : success().code());
|
||||
} else if (snapReqMap.count(snapReqKey)) {
|
||||
CODE_PROBE(true, "Worker received a duplicate ongoing snapshot request");
|
||||
CODE_PROBE(true, "Worker received a duplicate ongoing snapshot request", probe::decoration::rare);
|
||||
TraceEvent("RetryOngoingWorkerSnapRequest")
|
||||
.detail("SnapUID", snapReq.snapUID.toString())
|
||||
.detail("Role", snapReq.role);
|
||||
|
|
|
@ -228,7 +228,7 @@ struct IDDTxnProcessorApiWorkload : TestWorkload {
|
|||
|
||||
// test finish or started but cancelled movement
|
||||
if (deterministicRandom()->coinflip()) {
|
||||
CODE_PROBE(true, "RawMovementApi partial started");
|
||||
CODE_PROBE(true, "RawMovementApi partial started", probe::decoration::rare);
|
||||
return Void();
|
||||
}
|
||||
|
||||
|
@ -320,4 +320,4 @@ struct IDDTxnProcessorApiWorkload : TestWorkload {
|
|||
void getMetrics(std::vector<PerfMetric>& m) override {}
|
||||
};
|
||||
|
||||
WorkloadFactory<IDDTxnProcessorApiWorkload> IDDTxnProcessorApiWorkload;
|
||||
WorkloadFactory<IDDTxnProcessorApiWorkload> IDDTxnProcessorApiWorkload;
|
||||
|
|
Loading…
Reference in New Issue